expiration.rs 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. use bytes::Bytes;
  2. use std::{
  3. collections::{BTreeMap, HashMap},
  4. ops::Deref,
  5. };
  6. use tokio::time::Instant;
  7. /// ExpirationId
  8. ///
  9. /// The internal data structure is a B-Tree and the key is the expiration time,
  10. /// all data are naturally sorted by expiration time. Because it is possible
  11. /// that different keys expire at the same instant, an internal counter is added
  12. /// to the ID to make each ID unique (and sorted by Expiration Time +
  13. /// Incremental
  14. /// counter).
  15. #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Copy, Clone)]
  16. pub struct ExpirationId(pub (Instant, u64));
  17. impl Deref for ExpirationId {
  18. type Target = Instant;
  19. fn deref(&self) -> &Self::Target {
  20. &self.0 .0
  21. }
  22. }
  23. #[derive(Debug)]
  24. pub struct ExpirationDb {
  25. /// B-Tree Map of expiring keys
  26. expiring_keys: BTreeMap<ExpirationId, Bytes>,
  27. /// Hash which contains the keys and their ExpirationId.
  28. keys: HashMap<Bytes, ExpirationId>,
  29. next_id: u64,
  30. }
  31. impl ExpirationDb {
  32. pub fn new() -> Self {
  33. Self {
  34. expiring_keys: BTreeMap::new(),
  35. keys: HashMap::new(),
  36. next_id: 0,
  37. }
  38. }
  39. pub fn add(&mut self, key: &Bytes, expires_at: Instant) {
  40. let entry_id = ExpirationId((expires_at, self.next_id));
  41. if let Some(prev) = self.keys.remove(key) {
  42. // Another key with expiration is already known, it has
  43. // to be removed before adding a new one
  44. self.expiring_keys.remove(&prev);
  45. }
  46. self.expiring_keys.insert(entry_id, key.clone());
  47. self.keys.insert(key.clone(), entry_id);
  48. self.next_id += 1;
  49. }
  50. pub fn has(&self, key: &Bytes) -> bool {
  51. self.keys.get(key).is_some()
  52. }
  53. pub fn remove(&mut self, key: &Bytes) -> bool {
  54. if let Some(prev) = self.keys.remove(key) {
  55. // Another key with expiration is already known, it has
  56. // to be removed before adding a new one
  57. self.expiring_keys.remove(&prev);
  58. true
  59. } else {
  60. false
  61. }
  62. }
  63. pub fn len(&self) -> usize {
  64. self.expiring_keys.len()
  65. }
  66. /// Returns a list of expired keys, these keys are removed from the internal
  67. /// data structure which is keeping track of expiring keys.
  68. pub fn get_expired_keys(&mut self, now: Option<Instant>) -> Vec<Bytes> {
  69. let now = now.unwrap_or_else(Instant::now);
  70. let mut expired_keys = vec![];
  71. for (key, value) in self.expiring_keys.iter_mut() {
  72. if **key > now {
  73. break;
  74. }
  75. expired_keys.push((*key, value.clone()));
  76. self.keys.remove(value);
  77. }
  78. expired_keys
  79. .iter()
  80. .map(|(k, v)| {
  81. self.expiring_keys.remove(k);
  82. v.to_owned()
  83. })
  84. .collect()
  85. }
  86. }
  87. #[cfg(test)]
  88. mod test {
  89. use super::*;
  90. use crate::bytes;
  91. use tokio::time::{Duration, Instant};
  92. #[test]
  93. fn two_entires_same_expiration() {
  94. let mut db = ExpirationDb::new();
  95. let key1 = bytes!(b"key");
  96. let key2 = bytes!(b"bar");
  97. let key3 = bytes!(b"xxx");
  98. let expiration = Instant::now() + Duration::from_secs(5);
  99. db.add(&key1, expiration);
  100. db.add(&key2, expiration);
  101. db.add(&key3, expiration);
  102. assert_eq!(3, db.len());
  103. }
  104. #[test]
  105. fn remove_prev_expiration() {
  106. let mut db = ExpirationDb::new();
  107. let key1 = bytes!(b"key");
  108. let key2 = bytes!(b"bar");
  109. let expiration = Instant::now() + Duration::from_secs(5);
  110. db.add(&key1, expiration);
  111. db.add(&key2, expiration);
  112. db.add(&key1, expiration);
  113. assert_eq!(2, db.len());
  114. }
  115. #[test]
  116. fn get_expiration() {
  117. let mut db = ExpirationDb::new();
  118. let keys = vec![
  119. ("hix".into(), Instant::now() + Duration::from_secs(15)),
  120. ("key".into(), Instant::now() + Duration::from_secs(2)),
  121. ("bar".into(), Instant::now() + Duration::from_secs(3)),
  122. ("hi".into(), Instant::now() + Duration::from_secs(3)),
  123. ];
  124. keys.iter()
  125. .map(|v| {
  126. db.add(&v.0, v.1);
  127. })
  128. .for_each(drop);
  129. assert_eq!(db.len(), keys.len());
  130. assert_eq!(0, db.get_expired_keys(Some(Instant::now())).len());
  131. assert_eq!(db.len(), keys.len());
  132. assert_eq!(
  133. vec![keys[1].0.clone()],
  134. db.get_expired_keys(Some(Instant::now() + Duration::from_secs(2)))
  135. );
  136. assert_eq!(3, db.len());
  137. assert_eq!(
  138. vec![keys[2].0.clone(), keys[3].0.clone()],
  139. db.get_expired_keys(Some(Instant::now() + Duration::from_secs(4)))
  140. );
  141. assert_eq!(1, db.len());
  142. }
  143. #[test]
  144. pub fn remove() {
  145. let mut db = ExpirationDb::new();
  146. let keys = vec![
  147. ("hix".into(), Instant::now() + Duration::from_secs(15)),
  148. ("key".into(), Instant::now() + Duration::from_secs(2)),
  149. ("bar".into(), Instant::now() + Duration::from_secs(3)),
  150. ("hi".into(), Instant::now() + Duration::from_secs(3)),
  151. ];
  152. keys.iter()
  153. .map(|v| {
  154. db.add(&(v.0), v.1);
  155. })
  156. .for_each(drop);
  157. assert_eq!(keys.len(), db.len());
  158. assert!(db.remove(&keys[0].0));
  159. assert!(!db.remove(&keys[0].0));
  160. assert_eq!(keys.len() - 1, db.len());
  161. }
  162. }