expiration.rs 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. use bytes::Bytes;
  2. use std::collections::{BTreeMap, HashMap};
  3. use tokio::time::Instant;
  4. /// ExpirationId
  5. ///
  6. /// The internal data structure is a B-Tree and the key is the expiration time,
  7. /// all data are naturally sorted by expiration time. Because it is possible
  8. /// that different keys expire at the same instant, an internal counter is added
  9. /// to the ID to make each ID unique (and sorted by Expiration Time +
  10. /// Incremental
  11. /// counter).
  12. #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Copy, Clone)]
  13. pub struct ExpirationId(pub (Instant, u64));
  14. #[derive(Debug)]
  15. pub struct ExpirationDb {
  16. /// B-Tree Map of expiring keys
  17. expiring_keys: BTreeMap<ExpirationId, Bytes>,
  18. /// Hash which contains the keys and their ExpirationId.
  19. keys: HashMap<Bytes, ExpirationId>,
  20. next_id: u64,
  21. }
  22. impl ExpirationDb {
  23. pub fn new() -> Self {
  24. Self {
  25. expiring_keys: BTreeMap::new(),
  26. keys: HashMap::new(),
  27. next_id: 0,
  28. }
  29. }
  30. pub fn add(&mut self, key: &Bytes, expires_at: Instant) {
  31. let entry_id = ExpirationId((expires_at, self.next_id));
  32. if let Some(prev) = self.keys.remove(key) {
  33. // Another key with expiration is already known, it has
  34. // to be removed before adding a new one
  35. self.expiring_keys.remove(&prev);
  36. }
  37. self.expiring_keys.insert(entry_id, key.clone());
  38. self.keys.insert(key.clone(), entry_id);
  39. self.next_id += 1;
  40. }
  41. pub fn remove(&mut self, key: &Bytes) -> bool {
  42. if let Some(prev) = self.keys.remove(key) {
  43. // Another key with expiration is already known, it has
  44. // to be removed before adding a new one
  45. self.expiring_keys.remove(&prev);
  46. true
  47. } else {
  48. false
  49. }
  50. }
  51. pub fn len(&self) -> usize {
  52. self.expiring_keys.len()
  53. }
  54. /// Returns a list of expired keys, these keys are removed from the internal
  55. /// data structure which is keeping track of expiring keys.
  56. pub fn get_expired_keys(&mut self, now: Option<Instant>) -> Vec<Bytes> {
  57. let now = now.unwrap_or_else(Instant::now);
  58. let mut expiring_keys = vec![];
  59. for (key, value) in self.expiring_keys.iter_mut() {
  60. if key.0 .0 > now {
  61. break;
  62. }
  63. expiring_keys.push((*key, value.clone()));
  64. self.keys.remove(value);
  65. }
  66. expiring_keys
  67. .iter()
  68. .map(|(k, v)| {
  69. self.expiring_keys.remove(k);
  70. v.to_owned()
  71. })
  72. .collect()
  73. }
  74. }
  75. #[cfg(test)]
  76. mod test {
  77. use super::*;
  78. use tokio::time::{Duration, Instant};
  79. #[test]
  80. fn two_entires_same_expiration() {
  81. let mut db = ExpirationDb::new();
  82. let key1 = Bytes::from(&b"key"[..]);
  83. let key2 = Bytes::from(&b"bar"[..]);
  84. let key3 = Bytes::from(&b"xxx"[..]);
  85. let expiration = Instant::now() + Duration::from_secs(5);
  86. db.add(&key1, expiration);
  87. db.add(&key2, expiration);
  88. db.add(&key3, expiration);
  89. assert_eq!(3, db.len());
  90. }
  91. #[test]
  92. fn remove_prev_expiration() {
  93. let mut db = ExpirationDb::new();
  94. let key1 = Bytes::from(&b"key"[..]);
  95. let key2 = Bytes::from(&b"bar"[..]);
  96. let expiration = Instant::now() + Duration::from_secs(5);
  97. db.add(&key1, expiration);
  98. db.add(&key2, expiration);
  99. db.add(&key1, expiration);
  100. assert_eq!(2, db.len());
  101. }
  102. #[test]
  103. fn get_expiration() {
  104. let mut db = ExpirationDb::new();
  105. let keys = vec![
  106. (
  107. Bytes::from(&b"hix"[..]),
  108. Instant::now() + Duration::from_secs(15),
  109. ),
  110. (
  111. Bytes::from(&b"key"[..]),
  112. Instant::now() + Duration::from_secs(2),
  113. ),
  114. (
  115. Bytes::from(&b"bar"[..]),
  116. Instant::now() + Duration::from_secs(3),
  117. ),
  118. (
  119. Bytes::from(&b"hi"[..]),
  120. Instant::now() + Duration::from_secs(3),
  121. ),
  122. ];
  123. keys.iter()
  124. .map(|v| {
  125. db.add(&v.0, v.1);
  126. })
  127. .for_each(drop);
  128. assert_eq!(db.len(), keys.len());
  129. assert_eq!(0, db.get_expired_keys(Some(Instant::now())).len());
  130. assert_eq!(db.len(), keys.len());
  131. assert_eq!(
  132. vec![keys[1].0.clone()],
  133. db.get_expired_keys(Some(Instant::now() + Duration::from_secs(2)))
  134. );
  135. assert_eq!(3, db.len());
  136. assert_eq!(
  137. vec![keys[2].0.clone(), keys[3].0.clone()],
  138. db.get_expired_keys(Some(Instant::now() + Duration::from_secs(4)))
  139. );
  140. assert_eq!(1, db.len());
  141. }
  142. #[test]
  143. pub fn remove() {
  144. let mut db = ExpirationDb::new();
  145. let keys = vec![
  146. (
  147. Bytes::from(&b"hix"[..]),
  148. Instant::now() + Duration::from_secs(15),
  149. ),
  150. (
  151. Bytes::from(&b"key"[..]),
  152. Instant::now() + Duration::from_secs(2),
  153. ),
  154. (
  155. Bytes::from(&b"bar"[..]),
  156. Instant::now() + Duration::from_secs(3),
  157. ),
  158. (
  159. Bytes::from(&b"hi"[..]),
  160. Instant::now() + Duration::from_secs(3),
  161. ),
  162. ];
  163. keys.iter()
  164. .map(|v| {
  165. db.add(&v.0, v.1);
  166. })
  167. .for_each(drop);
  168. assert_eq!(keys.len(), db.len());
  169. assert!(db.remove(&keys[0].0));
  170. assert!(!db.remove(&keys[0].0));
  171. assert_eq!(keys.len() - 1, db.len());
  172. }
  173. }