mod.rs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. mod entry;
  2. mod expiration;
  3. use crate::{error::Error, value::Value};
  4. use bytes::Bytes;
  5. use entry::{new_version, Entry};
  6. use expiration::ExpirationDb;
  7. use log::trace;
  8. use parking_lot::{Mutex, RwLock};
  9. use seahash::hash;
  10. use std::{
  11. collections::HashMap,
  12. convert::{TryFrom, TryInto},
  13. ops::AddAssign,
  14. sync::Arc,
  15. thread,
  16. };
  17. use tokio::time::{Duration, Instant};
  18. #[derive(Debug)]
  19. pub struct Db {
  20. /// A vector of hashmaps.
  21. ///
  22. /// Instead of having a single HashMap, and having all threads fighting for
  23. /// blocking the single HashMap, we have a vector of N HashMap
  24. /// (configurable), which in theory allow to have faster reads and writes.
  25. ///
  26. /// Because all operations are always key specific, the key is used to hash
  27. /// and select to which HashMap the data might be stored.
  28. entries: Arc<Vec<RwLock<HashMap<Bytes, Entry>>>>,
  29. /// Data structure to store all expiring keys
  30. expirations: Arc<Mutex<ExpirationDb>>,
  31. /// Number of HashMaps that are available.
  32. slots: usize,
  33. /// A Database is attached to a conn_id. The entries and expiration data
  34. /// structures are shared between all connections.
  35. ///
  36. /// This particular database instace is attached to a conn_id, used to block
  37. /// all keys in case of a transaction.
  38. conn_id: u128,
  39. /// HashMap of all blocked keys by other connections. If a key appears in
  40. /// here and it is not being hold by the current connection, current
  41. /// connection must wait.
  42. tx_key_locks: Arc<RwLock<HashMap<Bytes, u128>>>,
  43. }
  44. impl Db {
  45. pub fn new(slots: usize) -> Self {
  46. let mut entries = vec![];
  47. for _i in 0..slots {
  48. entries.push(RwLock::new(HashMap::new()));
  49. }
  50. Self {
  51. entries: Arc::new(entries),
  52. expirations: Arc::new(Mutex::new(ExpirationDb::new())),
  53. conn_id: 0,
  54. tx_key_locks: Arc::new(RwLock::new(HashMap::new())),
  55. slots,
  56. }
  57. }
  58. pub fn new_db_instance(self: Arc<Db>, conn_id: u128) -> Db {
  59. Self {
  60. entries: self.entries.clone(),
  61. tx_key_locks: self.tx_key_locks.clone(),
  62. expirations: self.expirations.clone(),
  63. conn_id,
  64. slots: self.slots,
  65. }
  66. }
  67. #[inline]
  68. fn get_slot(&self, key: &Bytes) -> usize {
  69. let id = (hash(key) as usize) % self.entries.len();
  70. trace!("selected slot {} for key {:?}", id, key);
  71. let waiting = Duration::from_nanos(100);
  72. while let Some(blocker) = self.tx_key_locks.read().get(key) {
  73. // Loop while the key we are trying to access is being blocked by a
  74. // connection in a transaction
  75. if *blocker == self.conn_id {
  76. // the key is being blocked by ourself, it is safe to break the
  77. // waiting loop
  78. break;
  79. }
  80. thread::sleep(waiting);
  81. }
  82. id
  83. }
  84. pub fn lock_keys(&self, keys: &[Bytes]) {
  85. let waiting = Duration::from_nanos(100);
  86. loop {
  87. let mut lock = self.tx_key_locks.write();
  88. let mut i = 0;
  89. for key in keys.iter() {
  90. if let Some(blocker) = lock.get(key) {
  91. if *blocker == self.conn_id {
  92. // It is blocked by us already.
  93. continue;
  94. }
  95. // It is blocked by another tx, we need to break
  96. // and retry to gain the lock over this key
  97. break;
  98. }
  99. lock.insert(key.clone(), self.conn_id);
  100. i += 1;
  101. }
  102. if i == keys.len() {
  103. // All the involved keys are successfully being blocked
  104. // exclusely.
  105. break;
  106. }
  107. // We need to sleep a bit and retry.
  108. drop(lock);
  109. thread::sleep(waiting);
  110. }
  111. }
  112. pub fn unlock_keys(&self, keys: &[Bytes]) {
  113. let mut lock = self.tx_key_locks.write();
  114. for key in keys.iter() {
  115. lock.remove(key);
  116. }
  117. }
  118. pub fn incr<
  119. T: ToString + AddAssign + for<'a> TryFrom<&'a Value, Error = Error> + Into<Value> + Copy,
  120. >(
  121. &self,
  122. key: &Bytes,
  123. incr_by: T,
  124. ) -> Result<Value, Error> {
  125. let mut entries = self.entries[self.get_slot(key)].write();
  126. match entries.get_mut(key) {
  127. Some(x) => {
  128. let value = x.get();
  129. let mut number: T = value.try_into()?;
  130. number += incr_by;
  131. x.change_value(number.to_string().as_str().into());
  132. Ok(number.into())
  133. }
  134. None => {
  135. entries.insert(
  136. key.clone(),
  137. Entry::new(incr_by.to_string().as_str().into(), None),
  138. );
  139. Ok((incr_by as T).into())
  140. }
  141. }
  142. }
  143. pub fn persist(&self, key: &Bytes) -> Value {
  144. let mut entries = self.entries[self.get_slot(key)].write();
  145. entries
  146. .get_mut(key)
  147. .filter(|x| x.is_valid())
  148. .map_or(0.into(), |x| {
  149. if x.has_ttl() {
  150. x.persist();
  151. 1.into()
  152. } else {
  153. 0.into()
  154. }
  155. })
  156. }
  157. pub fn set_ttl(&self, key: &Bytes, expires_in: Duration) -> Value {
  158. let mut entries = self.entries[self.get_slot(key)].write();
  159. let expires_at = Instant::now() + expires_in;
  160. entries
  161. .get_mut(key)
  162. .filter(|x| x.is_valid())
  163. .map_or(0.into(), |x| {
  164. self.expirations.lock().add(key, expires_at);
  165. x.set_ttl(expires_at);
  166. 1.into()
  167. })
  168. }
  169. pub fn del(&self, keys: &[Bytes]) -> Value {
  170. let mut expirations = self.expirations.lock();
  171. keys.iter()
  172. .filter_map(|key| {
  173. expirations.remove(key);
  174. self.entries[self.get_slot(key)].write().remove(key)
  175. })
  176. .filter(|key| key.is_valid())
  177. .count()
  178. .into()
  179. }
  180. pub fn exists(&self, keys: &[Bytes]) -> Value {
  181. let mut matches = 0;
  182. keys.iter()
  183. .map(|key| {
  184. let entries = self.entries[self.get_slot(key)].read();
  185. if entries.get(key).is_some() {
  186. matches += 1;
  187. }
  188. })
  189. .for_each(drop);
  190. matches.into()
  191. }
  192. pub fn get_map_or<F1, F2>(&self, key: &Bytes, found: F1, not_found: F2) -> Result<Value, Error>
  193. where
  194. F1: FnOnce(&Value) -> Result<Value, Error>,
  195. F2: FnOnce() -> Result<Value, Error>,
  196. {
  197. let entries = self.entries[self.get_slot(key)].read();
  198. let entry = entries.get(key).filter(|x| x.is_valid()).map(|e| e.get());
  199. if let Some(entry) = entry {
  200. found(entry)
  201. } else {
  202. // drop lock
  203. drop(entries);
  204. not_found()
  205. }
  206. }
  207. pub fn bump_version(&self, key: &Bytes) -> bool {
  208. let mut entries = self.entries[self.get_slot(key)].write();
  209. entries
  210. .get_mut(key)
  211. .filter(|x| x.is_valid())
  212. .map(|entry| {
  213. entry.bump_version();
  214. })
  215. .is_some()
  216. }
  217. pub fn get_version(&self, key: &Bytes) -> u128 {
  218. let entries = self.entries[self.get_slot(key)].read();
  219. entries
  220. .get(key)
  221. .filter(|x| x.is_valid())
  222. .map(|entry| entry.version())
  223. .unwrap_or_else(new_version)
  224. }
  225. pub fn get(&self, key: &Bytes) -> Value {
  226. let entries = self.entries[self.get_slot(key)].read();
  227. entries
  228. .get(key)
  229. .filter(|x| x.is_valid())
  230. .map_or(Value::Null, |x| x.clone_value())
  231. }
  232. pub fn get_multi(&self, keys: &[Bytes]) -> Value {
  233. keys.iter()
  234. .map(|key| {
  235. let entries = self.entries[self.get_slot(key)].read();
  236. entries
  237. .get(key)
  238. .filter(|x| x.is_valid() && x.is_clonable())
  239. .map_or(Value::Null, |x| x.clone_value())
  240. })
  241. .collect::<Vec<Value>>()
  242. .into()
  243. }
  244. pub fn getset(&self, key: &Bytes, value: &Value) -> Value {
  245. let mut entries = self.entries[self.get_slot(key)].write();
  246. self.expirations.lock().remove(key);
  247. entries
  248. .insert(key.clone(), Entry::new(value.clone(), None))
  249. .filter(|x| x.is_valid())
  250. .map_or(Value::Null, |x| x.clone_value())
  251. }
  252. pub fn getdel(&self, key: &Bytes) -> Value {
  253. let mut entries = self.entries[self.get_slot(key)].write();
  254. entries.remove(key).map_or(Value::Null, |x| {
  255. self.expirations.lock().remove(key);
  256. x.clone_value()
  257. })
  258. }
  259. pub fn set(&self, key: &Bytes, value: Value, expires_in: Option<Duration>) -> Value {
  260. let mut entries = self.entries[self.get_slot(key)].write();
  261. let expires_at = expires_in.map(|duration| Instant::now() + duration);
  262. if let Some(expires_at) = expires_at {
  263. self.expirations.lock().add(key, expires_at);
  264. }
  265. entries.insert(key.clone(), Entry::new(value, expires_at));
  266. Value::Ok
  267. }
  268. pub fn ttl(&self, key: &Bytes) -> Option<Option<Instant>> {
  269. let entries = self.entries[self.get_slot(key)].read();
  270. entries
  271. .get(key)
  272. .filter(|x| x.is_valid())
  273. .map(|x| x.get_ttl())
  274. }
  275. pub fn purge(&self) -> u64 {
  276. let mut expirations = self.expirations.lock();
  277. let mut removed = 0;
  278. trace!("Watching {} keys for expirations", expirations.len());
  279. let keys = expirations.get_expired_keys(None);
  280. drop(expirations);
  281. keys.iter()
  282. .map(|key| {
  283. let mut entries = self.entries[self.get_slot(key)].write();
  284. if entries.remove(key).is_some() {
  285. trace!("Removed key {:?} due timeout", key);
  286. removed += 1;
  287. }
  288. })
  289. .for_each(drop);
  290. removed
  291. }
  292. }
  #[cfg(test)]
  mod test {
      use super::*;
      use crate::bytes;

      /// Stores three keys used by the expiration tests: one that is already
      /// expired, one without a TTL, and one expiring in five seconds.
      fn seed_expiry_fixture(db: &Db) {
          db.set(&bytes!(b"expired"), Value::Ok, Some(Duration::from_secs(0)));
          db.set(&bytes!(b"valid"), Value::Ok, None);
          db.set(&bytes!(b"expiring"), Value::Ok, Some(Duration::from_secs(5)));
      }

      #[test]
      fn incr_wrong_type() {
          let db = Db::new(100);
          db.set(&bytes!(b"num"), Value::Blob(bytes!("some string")), None);

          let outcome = db.incr(&bytes!("num"), 1);
          assert!(outcome.is_err());
          assert_eq!(Error::NotANumber, outcome.expect_err("should fail"));

          // The failed increment must leave the stored value untouched.
          let stored = db.get(&bytes!("num"));
          assert_eq!(Value::Blob(bytes!("some string")), stored);
      }

      #[test]
      fn incr_blob_float() {
          let db = Db::new(100);
          db.set(&bytes!(b"num"), Value::Blob(bytes!("1.1")), None);

          let incremented = db.incr(&bytes!("num"), 1.1);
          assert_eq!(Ok(Value::Float(2.2)), incremented);

          let stored = db.get(&bytes!("num"));
          assert_eq!(Value::Blob(bytes!("2.2")), stored);
      }

      #[test]
      fn incr_blob_int_float() {
          let db = Db::new(100);
          db.set(&bytes!(b"num"), Value::Blob(bytes!("1")), None);

          let incremented = db.incr(&bytes!("num"), 1.1);
          assert_eq!(Ok(Value::Float(2.1)), incremented);

          let stored = db.get(&bytes!("num"));
          assert_eq!(Value::Blob(bytes!("2.1")), stored);
      }

      #[test]
      fn incr_blob_int() {
          let db = Db::new(100);
          db.set(&bytes!(b"num"), Value::Blob(bytes!("1")), None);

          let incremented = db.incr(&bytes!("num"), 1);
          assert_eq!(Ok(Value::Integer(2)), incremented);

          let stored = db.get(&bytes!("num"));
          assert_eq!(Value::Blob(bytes!("2")), stored);
      }

      #[test]
      fn incr_blob_int_set() {
          let db = Db::new(100);

          // Incrementing a missing key creates it with the increment itself.
          let incremented = db.incr(&bytes!("num"), 1);
          assert_eq!(Ok(Value::Integer(1)), incremented);

          let stored = db.get(&bytes!("num"));
          assert_eq!(Value::Blob(bytes!("1")), stored);
      }

      #[test]
      fn incr_blob_float_set() {
          let db = Db::new(100);

          let incremented = db.incr(&bytes!("num"), 1.1);
          assert_eq!(Ok(Value::Float(1.1)), incremented);

          let stored = db.get(&bytes!("num"));
          assert_eq!(Value::Blob(bytes!("1.1")), stored);
      }

      #[test]
      fn del() {
          let db = Db::new(100);
          seed_expiry_fixture(&db);

          let to_delete = [
              bytes!(b"expired"),
              bytes!(b"valid"),
              bytes!(b"expiring"),
              bytes!(b"not_existing_key"),
          ];
          // Only "valid" and "expiring" were still alive.
          assert_eq!(Value::Integer(2), db.del(&to_delete));
      }

      #[test]
      fn ttl() {
          let db = Db::new(100);
          seed_expiry_fixture(&db);

          assert_eq!(None, db.ttl(&bytes!(b"expired")));
          assert_eq!(None, db.ttl(&bytes!(b"not_existing_key")));
          assert_eq!(Some(None), db.ttl(&bytes!(b"valid")));
          assert!(matches!(db.ttl(&bytes!(b"expiring")), Some(Some(_))));
      }

      #[test]
      fn purge_keys() {
          let db = Db::new(100);
          db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));

          // An expired key is already invisible before the purge runs.
          assert_eq!(Value::Null, db.get(&bytes!(b"one")));

          // The first purge removes it, the second finds nothing left.
          let first_pass = db.purge();
          assert_eq!(1, first_pass);
          let second_pass = db.purge();
          assert_eq!(0, second_pass);

          assert_eq!(Value::Null, db.get(&bytes!(b"one")));
      }

      #[test]
      fn replace_purge_keys() {
          let db = Db::new(100);
          db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));

          // An expired key is already invisible before the purge runs.
          assert_eq!(Value::Null, db.get(&bytes!(b"one")));

          // Overwriting it with a fresh TTL brings it back...
          db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(5)));
          assert_eq!(Value::Ok, db.get(&bytes!(b"one")));

          // ...so there is nothing left for the purge to remove.
          assert_eq!(0, db.purge());
      }
  }