entry.rs 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. use crate::{error::Error, value::Value};
  2. use bytes::BytesMut;
  3. use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
  4. use std::sync::atomic::{AtomicUsize, Ordering};
  5. use tokio::time::Instant;
  6. #[derive(Debug)]
  7. pub struct Entry {
  8. value: RwLock<Value>,
  9. version: AtomicUsize,
  10. expires_at: Mutex<Option<Instant>>,
  11. }
  12. static LAST_VERSION: AtomicUsize = AtomicUsize::new(0);
  13. /// Returns a new version
  14. pub fn unique_id() -> usize {
  15. LAST_VERSION.fetch_add(1, Ordering::Relaxed)
  16. }
  17. /// Database Entry
  18. ///
  19. /// A database entry is a Value associated with an optional ttl.
  20. ///
  21. /// The database will never return an entry if has expired already, by having
  22. /// this promise we can run the purge process every few seconds instead of doing
  23. /// so more frequently.
  24. impl Entry {
  25. pub fn new(value: Value, expires_at: Option<Instant>) -> Self {
  26. Self {
  27. value: RwLock::new(value),
  28. expires_at: Mutex::new(expires_at),
  29. version: AtomicUsize::new(LAST_VERSION.fetch_add(1, Ordering::Relaxed)),
  30. }
  31. }
  32. #[inline(always)]
  33. pub fn take_value(self) -> Value {
  34. self.value.into_inner()
  35. }
  36. #[inline(always)]
  37. pub fn digest(&self) -> Vec<u8> {
  38. self.value.read().digest()
  39. }
  40. #[inline(always)]
  41. pub fn bump_version(&self) {
  42. self.version.store(
  43. LAST_VERSION.fetch_add(1, Ordering::Relaxed),
  44. Ordering::Relaxed,
  45. )
  46. }
  47. pub fn persist(&self) {
  48. *self.expires_at.lock() = None;
  49. }
  50. pub fn clone(&self) -> Self {
  51. Self::new(self.value.read().clone(), *self.expires_at.lock())
  52. }
  53. pub fn get_ttl(&self) -> Option<Instant> {
  54. *self.expires_at.lock()
  55. }
  56. pub fn has_ttl(&self) -> bool {
  57. self.expires_at.lock().is_some()
  58. }
  59. pub fn set_ttl(&self, expires_at: Instant) {
  60. *self.expires_at.lock() = Some(expires_at);
  61. self.bump_version()
  62. }
  63. pub fn version(&self) -> usize {
  64. self.version.load(Ordering::Relaxed)
  65. }
  66. pub fn get(&self) -> RwLockReadGuard<'_, Value> {
  67. self.value.read()
  68. }
  69. pub fn get_mut(&self) -> RwLockWriteGuard<'_, Value> {
  70. self.value.write()
  71. }
  72. pub fn ensure_blob_is_mutable(&self) -> Result<(), Error> {
  73. let mut val = self.get_mut();
  74. match *val {
  75. Value::Blob(ref mut data) => {
  76. let rw_data = BytesMut::from(&data[..]);
  77. *val = Value::BlobRw(rw_data);
  78. Ok(())
  79. }
  80. Value::BlobRw(_) => Ok(()),
  81. _ => Err(Error::WrongType),
  82. }
  83. }
  84. /// If the Entry should be taken as valid, if this function returns FALSE
  85. /// the callee should behave as if the key was not found. By having this
  86. /// behaviour we can schedule the purge thread to run every few seconds or
  87. /// even minutes instead of once every second.
  88. pub fn is_valid(&self) -> bool {
  89. self.expires_at.lock().map_or(true, |x| x > Instant::now())
  90. }
  91. /// Whether or not the value is scalar
  92. pub fn is_scalar(&self) -> bool {
  93. matches!(
  94. *self.value.read(),
  95. Value::Boolean(_)
  96. | Value::Blob(_)
  97. | Value::BlobRw(_)
  98. | Value::BigInteger(_)
  99. | Value::Integer(_)
  100. | Value::Float(_)
  101. | Value::String(_)
  102. | Value::Null
  103. | Value::Ok
  104. )
  105. }
  106. /// Clone a value. If the value is not clonable an error is Value::Error is
  107. /// returned instead
  108. pub fn clone_value(&self) -> Value {
  109. if self.is_scalar() {
  110. self.value.read().clone()
  111. } else {
  112. Error::WrongType.into()
  113. }
  114. }
  115. }
  116. #[cfg(test)]
  117. mod test {
  118. use super::*;
  119. use tokio::time::Duration;
  120. #[test]
  121. fn is_valid_without_expiration() {
  122. let e = Entry::new(Value::Null, None);
  123. assert!(e.is_valid());
  124. }
  125. #[test]
  126. fn is_valid() {
  127. let e = (
  128. Entry::new(Value::Null, Some(Instant::now() - Duration::from_secs(5))),
  129. Entry::new(Value::Null, Some(Instant::now())),
  130. Entry::new(Value::Null, Some(Instant::now() + Duration::from_secs(5))),
  131. );
  132. assert!(!e.0.is_valid());
  133. assert!(!e.1.is_valid());
  134. assert!(e.2.is_valid());
  135. }
  136. #[test]
  137. fn persist() {
  138. let e = Entry::new(Value::Null, Some(Instant::now()));
  139. assert!(!e.is_valid());
  140. e.persist();
  141. assert!(e.is_valid());
  142. }
  143. #[test]
  144. fn update_ttl() {
  145. let e = Entry::new(Value::Null, Some(Instant::now()));
  146. assert!(!e.is_valid());
  147. e.persist();
  148. assert!(e.is_valid());
  149. e.set_ttl(Instant::now());
  150. assert!(!e.is_valid());
  151. }
  152. }