mod.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. mod entry;
  2. mod expiration;
  3. use crate::{error::Error, value::Value};
  4. use bytes::Bytes;
  5. use entry::Entry;
  6. use expiration::ExpirationDb;
  7. use log::trace;
  8. use seahash::hash;
  9. use std::{
  10. collections::HashMap,
  11. convert::{TryFrom, TryInto},
  12. ops::AddAssign,
  13. sync::{Mutex, RwLock},
  14. };
  15. use tokio::time::{Duration, Instant};
  16. #[derive(Debug)]
  17. pub struct Db {
  18. /// A vector of hashmaps.
  19. ///
  20. /// Instead of having a single HashMap, and having all threads fighting for
  21. /// blocking the single HashMap, we have a vector of N HashMap
  22. /// (configurable), which in theory allow to have faster reads and writes.
  23. ///
  24. /// Because all operations are always key specific, the key is used to hash
  25. /// and select to which HashMap the data might be stored.
  26. entries: Vec<RwLock<HashMap<Bytes, Entry>>>,
  27. /// Data structure to store all expiring keys
  28. expirations: Mutex<ExpirationDb>,
  29. /// Number of HashMaps that are available.
  30. slots: usize,
  31. }
  32. impl Db {
  33. pub fn new(slots: usize) -> Self {
  34. let mut entries = vec![];
  35. for _i in 0..slots {
  36. entries.push(RwLock::new(HashMap::new()));
  37. }
  38. Self {
  39. entries,
  40. expirations: Mutex::new(ExpirationDb::new()),
  41. slots,
  42. }
  43. }
  44. #[inline]
  45. fn get_slot(&self, key: &Bytes) -> usize {
  46. let id = (hash(key) as usize) % self.entries.len();
  47. trace!("selected slot {} for key {:?}", id, key);
  48. id
  49. }
  50. pub fn incr<
  51. T: ToString + AddAssign + for<'a> TryFrom<&'a Value, Error = Error> + Into<Value> + Copy,
  52. >(
  53. &self,
  54. key: &Bytes,
  55. incr_by: T,
  56. ) -> Result<Value, Error> {
  57. let mut entries = self.entries[self.get_slot(key)].write().unwrap();
  58. match entries.get_mut(key) {
  59. Some(x) => {
  60. let value = x.get();
  61. let mut number: T = value.try_into()?;
  62. number += incr_by;
  63. x.change_value(number.to_string().as_str().into());
  64. Ok(number.into())
  65. }
  66. None => {
  67. entries.insert(
  68. key.clone(),
  69. Entry::new(incr_by.to_string().as_str().into(), None),
  70. );
  71. Ok((incr_by as T).into())
  72. }
  73. }
  74. }
  75. pub fn persist(&self, key: &Bytes) -> Value {
  76. let mut entries = self.entries[self.get_slot(key)].write().unwrap();
  77. entries
  78. .get_mut(key)
  79. .filter(|x| x.is_valid())
  80. .map_or(0.into(), |x| {
  81. if x.has_ttl() {
  82. x.persist();
  83. 1.into()
  84. } else {
  85. 0.into()
  86. }
  87. })
  88. }
  89. pub fn set_ttl(&self, key: &Bytes, expires_in: Duration) -> Value {
  90. let mut entries = self.entries[self.get_slot(key)].write().unwrap();
  91. let expires_at = Instant::now() + expires_in;
  92. entries
  93. .get_mut(key)
  94. .filter(|x| x.is_valid())
  95. .map_or(0.into(), |x| {
  96. self.expirations.lock().unwrap().add(key, expires_at);
  97. x.set_ttl(expires_at);
  98. 1.into()
  99. })
  100. }
  101. pub fn del(&self, keys: &[Bytes]) -> Value {
  102. let mut deleted = 0;
  103. let mut expirations = self.expirations.lock().unwrap();
  104. keys.iter()
  105. .map(|key| {
  106. let mut entries = self.entries[self.get_slot(key)].write().unwrap();
  107. if let Some(entry) = entries.remove(key) {
  108. expirations.remove(key);
  109. if entry.is_valid() {
  110. deleted += 1;
  111. }
  112. }
  113. })
  114. .for_each(drop);
  115. deleted.into()
  116. }
  117. pub fn exists(&self, keys: &[Bytes]) -> Value {
  118. let mut matches = 0;
  119. keys.iter()
  120. .map(|key| {
  121. let entries = self.entries[self.get_slot(key)].read().unwrap();
  122. if entries.get(key).is_some() {
  123. matches += 1;
  124. }
  125. })
  126. .for_each(drop);
  127. matches.into()
  128. }
  129. pub fn get_map_or<F1, F2>(&self, key: &Bytes, found: F1, not_found: F2) -> Result<Value, Error>
  130. where
  131. F1: FnOnce(&Value) -> Result<Value, Error>,
  132. F2: FnOnce() -> Result<Value, Error>,
  133. {
  134. let entries = self.entries[self.get_slot(key)].read().unwrap();
  135. let entry = entries.get(key).filter(|x| x.is_valid()).map(|e| e.get());
  136. if let Some(entry) = entry {
  137. found(entry)
  138. } else {
  139. // drop lock
  140. drop(entries);
  141. not_found()
  142. }
  143. }
  144. pub fn get(&self, key: &Bytes) -> Value {
  145. let entries = self.entries[self.get_slot(key)].read().unwrap();
  146. entries
  147. .get(key)
  148. .filter(|x| x.is_valid())
  149. .map_or(Value::Null, |x| x.clone_value())
  150. }
  151. pub fn get_multi(&self, keys: &[Bytes]) -> Value {
  152. keys.iter()
  153. .map(|key| {
  154. let entries = self.entries[self.get_slot(key)].read().unwrap();
  155. entries
  156. .get(key)
  157. .filter(|x| x.is_valid() && x.is_clonable())
  158. .map_or(Value::Null, |x| x.clone_value())
  159. })
  160. .collect::<Vec<Value>>()
  161. .into()
  162. }
  163. pub fn getset(&self, key: &Bytes, value: &Value) -> Value {
  164. let mut entries = self.entries[self.get_slot(key)].write().unwrap();
  165. self.expirations.lock().unwrap().remove(key);
  166. entries
  167. .insert(key.clone(), Entry::new(value.clone(), None))
  168. .filter(|x| x.is_valid())
  169. .map_or(Value::Null, |x| x.clone_value())
  170. }
  171. pub fn getdel(&self, key: &Bytes) -> Value {
  172. let mut entries = self.entries[self.get_slot(key)].write().unwrap();
  173. entries.remove(key).map_or(Value::Null, |x| {
  174. self.expirations.lock().unwrap().remove(key);
  175. x.clone_value()
  176. })
  177. }
  178. pub fn set(&self, key: &Bytes, value: Value, expires_in: Option<Duration>) -> Value {
  179. let mut entries = self.entries[self.get_slot(key)].write().unwrap();
  180. let expires_at = expires_in.map(|duration| Instant::now() + duration);
  181. if let Some(expires_at) = expires_at {
  182. self.expirations.lock().unwrap().add(key, expires_at);
  183. }
  184. entries.insert(key.clone(), Entry::new(value, expires_at));
  185. Value::OK
  186. }
  187. pub fn ttl(&self, key: &Bytes) -> Option<Option<Instant>> {
  188. let entries = self.entries[self.get_slot(key)].read().unwrap();
  189. entries
  190. .get(key)
  191. .filter(|x| x.is_valid())
  192. .map(|x| x.get_ttl())
  193. }
  194. pub fn purge(&self) -> u64 {
  195. let mut expirations = self.expirations.lock().unwrap();
  196. let mut removed = 0;
  197. trace!("Watching {} keys for expirations", expirations.len());
  198. let keys = expirations.get_expired_keys(None);
  199. drop(expirations);
  200. keys.iter()
  201. .map(|key| {
  202. let mut entries = self.entries[self.get_slot(key)].write().unwrap();
  203. if entries.remove(key).is_some() {
  204. trace!("Removed key {:?} due timeout", key);
  205. removed += 1;
  206. }
  207. })
  208. .for_each(drop);
  209. removed
  210. }
  211. }
  212. #[cfg(test)]
  213. mod test {
  214. use super::*;
  215. use crate::bytes;
  216. #[test]
  217. fn incr_wrong_type() {
  218. let db = Db::new(100);
  219. db.set(&bytes!(b"num"), Value::Blob(bytes!("some string")), None);
  220. let r = db.incr(&bytes!("num"), 1);
  221. assert!(r.is_err());
  222. assert_eq!(Error::NotANumber, r.expect_err("should fail"));
  223. assert_eq!(Value::Blob(bytes!("some string")), db.get(&bytes!("num")));
  224. }
  225. #[test]
  226. fn incr_blob_float() {
  227. let db = Db::new(100);
  228. db.set(&bytes!(b"num"), Value::Blob(bytes!("1.1")), None);
  229. assert_eq!(Value::Float(2.2), db.incr(&bytes!("num"), 1.1).unwrap());
  230. assert_eq!(Value::Blob(bytes!("2.2")), db.get(&bytes!("num")));
  231. }
  232. #[test]
  233. fn incr_blob_int_float() {
  234. let db = Db::new(100);
  235. db.set(&bytes!(b"num"), Value::Blob(bytes!("1")), None);
  236. assert_eq!(Value::Float(2.1), db.incr(&bytes!("num"), 1.1).unwrap());
  237. assert_eq!(Value::Blob(bytes!("2.1")), db.get(&bytes!("num")));
  238. }
  239. #[test]
  240. fn incr_blob_int() {
  241. let db = Db::new(100);
  242. db.set(&bytes!(b"num"), Value::Blob(bytes!("1")), None);
  243. assert_eq!(Value::Integer(2), db.incr(&bytes!("num"), 1).unwrap());
  244. assert_eq!(Value::Blob(bytes!("2")), db.get(&bytes!("num")));
  245. }
  246. #[test]
  247. fn incr_blob_int_set() {
  248. let db = Db::new(100);
  249. assert_eq!(Value::Integer(1), db.incr(&bytes!("num"), 1).unwrap());
  250. assert_eq!(Value::Blob(bytes!("1")), db.get(&bytes!("num")));
  251. }
  252. #[test]
  253. fn incr_blob_float_set() {
  254. let db = Db::new(100);
  255. assert_eq!(Value::Float(1.1), db.incr(&bytes!("num"), 1.1).unwrap());
  256. assert_eq!(Value::Blob(bytes!("1.1")), db.get(&bytes!("num")));
  257. }
  258. #[test]
  259. fn del() {
  260. let db = Db::new(100);
  261. db.set(&bytes!(b"expired"), Value::OK, Some(Duration::from_secs(0)));
  262. db.set(&bytes!(b"valid"), Value::OK, None);
  263. db.set(
  264. &bytes!(b"expiring"),
  265. Value::OK,
  266. Some(Duration::from_secs(5)),
  267. );
  268. assert_eq!(
  269. Value::Integer(2),
  270. db.del(&[
  271. bytes!(b"expired"),
  272. bytes!(b"valid"),
  273. bytes!(b"expiring"),
  274. bytes!(b"not_existing_key")
  275. ])
  276. );
  277. }
  278. #[test]
  279. fn ttl() {
  280. let db = Db::new(100);
  281. db.set(&bytes!(b"expired"), Value::OK, Some(Duration::from_secs(0)));
  282. db.set(&bytes!(b"valid"), Value::OK, None);
  283. db.set(
  284. &bytes!(b"expiring"),
  285. Value::OK,
  286. Some(Duration::from_secs(5)),
  287. );
  288. assert_eq!(None, db.ttl(&bytes!(b"expired")));
  289. assert_eq!(None, db.ttl(&bytes!(b"not_existing_key")));
  290. assert_eq!(Some(None), db.ttl(&bytes!(b"valid")));
  291. assert!(match db.ttl(&bytes!(b"expiring")) {
  292. Some(Some(_)) => true,
  293. _ => false,
  294. });
  295. }
  296. #[test]
  297. fn purge_keys() {
  298. let db = Db::new(100);
  299. db.set(&bytes!(b"one"), Value::OK, Some(Duration::from_secs(0)));
  300. // Expired keys should not be returned, even if they are not yet
  301. // removed by the purge process.
  302. assert_eq!(Value::Null, db.get(&bytes!(b"one")));
  303. // Purge twice
  304. assert_eq!(1, db.purge());
  305. assert_eq!(0, db.purge());
  306. assert_eq!(Value::Null, db.get(&bytes!(b"one")));
  307. }
  308. #[test]
  309. fn replace_purge_keys() {
  310. let db = Db::new(100);
  311. db.set(&bytes!(b"one"), Value::OK, Some(Duration::from_secs(0)));
  312. // Expired keys should not be returned, even if they are not yet
  313. // removed by the purge process.
  314. assert_eq!(Value::Null, db.get(&bytes!(b"one")));
  315. db.set(&bytes!(b"one"), Value::OK, Some(Duration::from_secs(5)));
  316. assert_eq!(Value::OK, db.get(&bytes!(b"one")));
  317. // Purge should return 0 as the expired key has been removed already
  318. assert_eq!(0, db.purge());
  319. }
  320. }