mod.rs 40 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236
  1. //! # in-memory database
  2. //!
  3. //! This database module is the core of the miniredis project. All other modules around this
  4. //! database module.
  5. use self::utils::{far_future, ExpirationOpts, Override};
  6. use crate::{
  7. error::Error,
  8. value::{
  9. bytes_to_number,
  10. cursor::Cursor,
  11. typ::{Typ, ValueTyp},
  12. VDebug, Value,
  13. },
  14. };
  15. use bytes::{BufMut, Bytes, BytesMut};
  16. use core::num;
  17. use entry::{new_version, Entry};
  18. use expiration::ExpirationDb;
  19. use glob::Pattern;
  20. use log::trace;
  21. use num_traits::CheckedAdd;
  22. use parking_lot::{Mutex, RwLock};
  23. use seahash::hash;
  24. use std::{
  25. collections::HashMap,
  26. convert::{TryFrom, TryInto},
  27. ops::{AddAssign, Deref},
  28. str::FromStr,
  29. sync::Arc,
  30. thread,
  31. };
  32. use tokio::time::{Duration, Instant};
  33. mod entry;
  34. mod expiration;
  35. pub mod pool;
  36. pub mod scan;
  37. pub(crate) mod utils;
  38. /// Database structure
  39. ///
  40. /// Each connection has their own clone of the database and the conn_id is stored in each instance.
  41. /// The slots property is shared for all connections.
  42. ///
  43. /// To avoid lock contention this database is *not* a single HashMap, instead it is a vector of
  44. /// HashMaps. Each key is pre-sharded and a bucket is selected. By doing this pre-step instead of
  45. /// locking the entire database, only a small portion is locked (shared or exclusively) at a time,
  46. /// making this database implementation thread-friendly. The number of number_of_slots available cannot be
  47. /// changed at runtime.
  48. ///
  49. /// The database is also aware of other connections locking other keys exclusively (for
  50. /// transactions).
  51. ///
  52. /// Each entry is wrapped with an entry::Entry struct, which is aware of expirations and data
  53. /// versioning (in practice the nanosecond of last modification).
  54. #[derive(Debug)]
  55. pub struct Db {
  56. /// A vector of hashmaps.
  57. ///
  58. /// Instead of having a single HashMap, and having all threads fighting for
  59. /// blocking the single HashMap, we have a vector of N HashMap
  60. /// (configurable), which in theory allow to have faster reads and writes.
  61. ///
  62. /// Because all operations are always key specific, the key is used to hash
  63. /// and select to which HashMap the data might be stored.
  64. slots: Arc<Vec<RwLock<HashMap<Bytes, Entry>>>>,
  65. /// Data structure to store all expiring keys
  66. expirations: Arc<Mutex<ExpirationDb>>,
  67. /// Number of HashMaps that are available.
  68. number_of_slots: usize,
  69. /// Databases unique ID. This is an internal identifier to avoid deadlocks
  70. /// when copying and moving data between databases.
  71. pub db_id: u128,
  72. /// Current connection ID
  73. ///
  74. /// A Database is attached to a conn_id. The slots and expiration data
  75. /// structures are shared between all connections, regardless of conn_id.
  76. ///
  77. /// This particular database instace is attached to a conn_id, which is used
  78. /// to lock keys exclusively for transactions and other atomic operations.
  79. conn_id: u128,
  80. /// HashMap of all blocked keys by other connections. If a key appears in
  81. /// here and it is not being hold by the current connection, current
  82. /// connection must wait.
  83. tx_key_locks: Arc<RwLock<HashMap<Bytes, u128>>>,
  84. }
  85. impl Db {
  86. /// Creates a new database instance
  87. pub fn new(number_of_slots: usize) -> Self {
  88. let slots = (0..number_of_slots)
  89. .map(|_| RwLock::new(HashMap::new()))
  90. .collect();
  91. Self {
  92. slots: Arc::new(slots),
  93. expirations: Arc::new(Mutex::new(ExpirationDb::new())),
  94. conn_id: 0,
  95. db_id: new_version(),
  96. tx_key_locks: Arc::new(RwLock::new(HashMap::new())),
  97. number_of_slots,
  98. }
  99. }
  100. /// Creates a new Database instance bound to a connection.
  101. ///
  102. /// This is particular useful when locking keys exclusively.
  103. ///
  104. /// All the internal data are shjared through an Arc.
  105. pub fn new_db_instance(self: Arc<Db>, conn_id: u128) -> Arc<Db> {
  106. Arc::new(Self {
  107. slots: self.slots.clone(),
  108. tx_key_locks: self.tx_key_locks.clone(),
  109. expirations: self.expirations.clone(),
  110. conn_id,
  111. db_id: self.db_id,
  112. number_of_slots: self.number_of_slots,
  113. })
  114. }
  115. #[inline]
  116. /// Returns a slot where a key may be hosted.
  117. ///
  118. /// In order to avoid too much locks, instead of having a single hash a
  119. /// database instance is a set of hashes. Each key is pre-shared with a
  120. /// quick hashing algorithm to select a 'slot' or HashMap where it may be
  121. /// hosted.
  122. fn get_slot(&self, key: &Bytes) -> usize {
  123. let id = (hash(key) as usize) % self.number_of_slots;
  124. trace!("selected slot {} for key {:?}", id, key);
  125. let waiting = Duration::from_nanos(100);
  126. while let Some(blocker) = self.tx_key_locks.read().get(key) {
  127. // Loop while the key we are trying to access is being blocked by a
  128. // connection in a transaction
  129. if *blocker == self.conn_id {
  130. // the key is being blocked by ourself, it is safe to break the
  131. // waiting loop
  132. break;
  133. }
  134. thread::sleep(waiting);
  135. }
  136. id
  137. }
  138. /// Locks keys exclusively
  139. ///
  140. /// The locked keys are only accesible (read or write) by the connection
  141. /// that locked them, any other connection must wait until the locking
  142. /// connection releases them.
  143. ///
  144. /// This is used to simulate redis transactions. Transaction in Redis are
  145. /// atomic but pausing a multi threaded Redis just to keep the same promises
  146. /// was a bit extreme, that's the reason why a transaction will lock
  147. /// exclusively all keys involved.
  148. pub fn lock_keys(&self, keys: &[Bytes]) {
  149. let waiting = Duration::from_nanos(100);
  150. loop {
  151. let mut lock = self.tx_key_locks.write();
  152. let mut i = 0;
  153. for key in keys.iter() {
  154. if let Some(blocker) = lock.get(key) {
  155. if *blocker == self.conn_id {
  156. // It is blocked by us already.
  157. continue;
  158. }
  159. // It is blocked by another tx, we need to break
  160. // and retry to gain the lock over this key
  161. break;
  162. }
  163. lock.insert(key.clone(), self.conn_id);
  164. i += 1;
  165. }
  166. if i == keys.len() {
  167. // All the involved keys are successfully being blocked
  168. // exclusively.
  169. break;
  170. }
  171. // We need to sleep a bit and retry.
  172. drop(lock);
  173. thread::sleep(waiting);
  174. }
  175. }
  176. /// Releases the lock on keys
  177. pub fn unlock_keys(&self, keys: &[Bytes]) {
  178. let mut lock = self.tx_key_locks.write();
  179. for key in keys.iter() {
  180. lock.remove(key);
  181. }
  182. }
  183. /// Return debug info for a key
  184. pub fn debug(&self, key: &Bytes) -> Result<VDebug, Error> {
  185. let slot = self.slots[self.get_slot(key)].read();
  186. Ok(slot
  187. .get(key)
  188. .filter(|x| x.is_valid())
  189. .ok_or(Error::NotFound)?
  190. .value
  191. .debug())
  192. }
  193. /// Return the digest for each key. This used for testing only
  194. pub fn digest(&self, keys: &[Bytes]) -> Result<Vec<Value>, Error> {
  195. Ok(keys
  196. .iter()
  197. .map(|key| {
  198. let slot = self.slots[self.get_slot(key)].read();
  199. Value::Blob(
  200. slot.get(key)
  201. .filter(|v| v.is_valid())
  202. .map(|v| hex::encode(&v.value.digest()))
  203. .unwrap_or("00000".into())
  204. .as_str()
  205. .into(),
  206. )
  207. })
  208. .collect::<Vec<Value>>())
  209. }
  210. /// Flushes the entire database
  211. pub fn flushdb(&self) -> Result<Value, Error> {
  212. self.expirations.lock().flush();
  213. self.slots
  214. .iter()
  215. .map(|s| {
  216. let mut s = s.write();
  217. s.clear();
  218. })
  219. .for_each(drop);
  220. Ok(Value::Ok)
  221. }
  222. /// Returns the number of elements in the database
  223. pub fn len(&self) -> Result<usize, Error> {
  224. self.purge();
  225. Ok(self.slots.iter().map(|s| s.read().len()).sum())
  226. }
  227. /// Round numbers to store efficiently, specially float numbers. For instance `1.00` will be converted to `1`.
  228. fn round_numbers<T>(number: T) -> BytesMut
  229. where
  230. T: ToString,
  231. {
  232. let number_to_str = number.to_string();
  233. if number_to_str.find('.').is_none() {
  234. return number_to_str.as_bytes().into();
  235. }
  236. let number_to_str = number_to_str
  237. .trim_end_matches(|c| c == '0' || c == '.')
  238. .to_string();
  239. if number_to_str.is_empty() {
  240. "0"
  241. } else {
  242. number_to_str.as_str()
  243. }
  244. .into()
  245. }
  246. // Converts a given number to a correct Value, it should be used with Self::round_numbers()
  247. fn number_to_value(number: &[u8]) -> Result<Value, Error> {
  248. if number.iter().find(|x| **x == b'.').is_some() {
  249. Ok(Value::new(number))
  250. } else {
  251. Ok(Value::Integer(bytes_to_number(number)?))
  252. }
  253. }
  254. /// Increment a sub-key in a hash
  255. ///
  256. /// If the stored value cannot be converted into a number an error will be thrown
  257. pub fn hincrby<T>(
  258. &self,
  259. key: &Bytes,
  260. sub_key: &Bytes,
  261. incr_by: &Bytes,
  262. typ: &str,
  263. ) -> Result<Value, Error>
  264. where
  265. T: ToString
  266. + FromStr
  267. + CheckedAdd
  268. + for<'a> TryFrom<&'a Value, Error = Error>
  269. + Into<Value>
  270. + Copy,
  271. {
  272. let mut slot = self.slots[self.get_slot(key)].write();
  273. let mut incr_by: T =
  274. bytes_to_number(incr_by).map_err(|_| Error::NotANumberType(typ.to_owned()))?;
  275. match slot.get_mut(key).filter(|x| x.is_valid()).map(|x| x.get()) {
  276. Some(Value::Hash(h)) => {
  277. let mut h = h.write();
  278. if let Some(n) = h.get(sub_key) {
  279. incr_by = incr_by
  280. .checked_add(
  281. &bytes_to_number(n)
  282. .map_err(|_| Error::NotANumberType(typ.to_owned()))?,
  283. )
  284. .ok_or(Error::Overflow)?;
  285. }
  286. let incr_by_bytes = Self::round_numbers(incr_by).freeze();
  287. h.insert(sub_key.clone(), incr_by_bytes.clone());
  288. Self::number_to_value(&incr_by_bytes)
  289. }
  290. None => {
  291. #[allow(clippy::mutable_key_type)]
  292. let mut h = HashMap::new();
  293. let incr_by_bytes = Self::round_numbers(incr_by).freeze();
  294. h.insert(sub_key.clone(), incr_by_bytes.clone());
  295. let _ = slot.insert(key.clone(), Entry::new(h.into(), None));
  296. Self::number_to_value(&incr_by_bytes)
  297. }
  298. _ => Err(Error::WrongType),
  299. }
  300. }
  301. /// Increments a key's value by a given number
  302. ///
  303. /// If the stored value cannot be converted into a number an error will be
  304. /// thrown.
  305. pub fn incr<T>(&self, key: &Bytes, incr_by: T) -> Result<T, Error>
  306. where
  307. T: ToString + CheckedAdd + for<'a> TryFrom<&'a Value, Error = Error> + Into<Value> + Copy,
  308. {
  309. let mut slot = self.slots[self.get_slot(key)].write();
  310. match slot.get_mut(key).filter(|x| x.is_valid()) {
  311. Some(x) => {
  312. if !x.is_clonable() {
  313. return Err(Error::WrongType);
  314. }
  315. let value = x.get();
  316. let mut number: T = value.try_into()?;
  317. number = incr_by.checked_add(&number).ok_or(Error::Overflow)?;
  318. x.change_value(Value::Blob(Self::round_numbers(number)));
  319. Ok(number)
  320. }
  321. None => {
  322. slot.insert(
  323. key.clone(),
  324. Entry::new(Value::Blob(Self::round_numbers(incr_by)), None),
  325. );
  326. Ok(incr_by)
  327. }
  328. }
  329. }
  330. /// Removes any expiration associated with a given key
  331. pub fn persist(&self, key: &Bytes) -> Value {
  332. let mut slot = self.slots[self.get_slot(key)].write();
  333. slot.get_mut(key)
  334. .filter(|x| x.is_valid())
  335. .map_or(0.into(), |x| {
  336. if x.has_ttl() {
  337. self.expirations.lock().remove(key);
  338. x.persist();
  339. 1.into()
  340. } else {
  341. 0.into()
  342. }
  343. })
  344. }
  345. /// Set time to live for a given key
  346. pub fn set_ttl(
  347. &self,
  348. key: &Bytes,
  349. expires_in: Duration,
  350. opts: ExpirationOpts,
  351. ) -> Result<Value, Error> {
  352. if (opts.NX && opts.XX) || (opts.NX && opts.GT) || (opts.NX && opts.LT) {
  353. return Err(Error::OptsNotCompatible("NX and XX, GT or LT".to_owned()));
  354. }
  355. if opts.GT && opts.LT {
  356. return Err(Error::OptsNotCompatible("GT and LT".to_owned()));
  357. }
  358. let mut slot = self.slots[self.get_slot(key)].write();
  359. let expires_at = Instant::now()
  360. .checked_add(expires_in)
  361. .unwrap_or_else(far_future);
  362. Ok(slot
  363. .get_mut(key)
  364. .filter(|x| x.is_valid())
  365. .map_or(0.into(), |x| {
  366. let current_expire = x.get_ttl();
  367. if opts.NX && current_expire.is_some() {
  368. return 0.into();
  369. }
  370. if opts.XX && current_expire.is_none() {
  371. return 0.into();
  372. }
  373. if opts.GT {
  374. if let Some(current_expire) = current_expire {
  375. if expires_at <= current_expire {
  376. return 0.into();
  377. }
  378. } else {
  379. return 0.into();
  380. }
  381. }
  382. if opts.LT {
  383. if let Some(current_expire) = current_expire {
  384. if expires_at >= current_expire {
  385. return 0.into();
  386. }
  387. }
  388. }
  389. self.expirations.lock().add(key, expires_at);
  390. x.set_ttl(expires_at);
  391. 1.into()
  392. }))
  393. }
  394. /// Overwrites part of the string stored at key, starting at the specified
  395. /// offset, for the entire length of value. If the offset is larger than the
  396. /// current length of the string at key, the string is padded with zero-bytes to
  397. /// make offset fit. Non-existing keys are considered as empty strings, so this
  398. /// command will make sure it holds a string large enough to be able to set
  399. /// value at offset.
  400. pub fn set_range(&self, key: &Bytes, offset: i128, data: &[u8]) -> Result<Value, Error> {
  401. let mut slot = self.slots[self.get_slot(key)].write();
  402. let value = slot.get_mut(key).map(|value| {
  403. if !value.is_valid() {
  404. self.expirations.lock().remove(key);
  405. value.persist();
  406. }
  407. value.get_mut()
  408. });
  409. if offset < 0 {
  410. return Err(Error::OutOfRange);
  411. }
  412. if offset >= 512 * 1024 * 1024 - 4 {
  413. return Err(Error::MaxAllowedSize);
  414. }
  415. let length = offset as usize + data.len();
  416. match value {
  417. Some(Value::Blob(bytes)) => {
  418. if bytes.capacity() < length {
  419. bytes.resize(length, 0);
  420. }
  421. let writer = &mut bytes[offset as usize..length];
  422. writer.copy_from_slice(data);
  423. Ok(bytes.len().into())
  424. }
  425. None => {
  426. if data.len() == 0 {
  427. return Ok(0.into());
  428. }
  429. let mut bytes = BytesMut::new();
  430. bytes.resize(length, 0);
  431. let writer = &mut bytes[offset as usize..];
  432. writer.copy_from_slice(data);
  433. slot.insert(key.clone(), Entry::new(Value::new(&bytes), None));
  434. Ok(bytes.len().into())
  435. }
  436. _ => Err(Error::WrongType),
  437. }
  438. }
  439. /// Copies a key
  440. pub fn copy(
  441. &self,
  442. source: &Bytes,
  443. target: &Bytes,
  444. replace: Override,
  445. target_db: Option<Arc<Db>>,
  446. ) -> Result<bool, Error> {
  447. let slot = self.slots[self.get_slot(source)].read();
  448. let value = if let Some(value) = slot.get(source).filter(|x| x.is_valid()) {
  449. value.clone()
  450. } else {
  451. return Ok(false);
  452. };
  453. drop(slot);
  454. if let Some(db) = target_db {
  455. if db.db_id == self.db_id && source == target {
  456. return Err(Error::SameEntry);
  457. }
  458. if replace == Override::No && db.exists(&[target.clone()]) > 0 {
  459. return Ok(false);
  460. }
  461. let _ = db.set_advanced(
  462. target,
  463. value.value.clone(),
  464. value.get_ttl().map(|v| v - Instant::now()),
  465. replace,
  466. false,
  467. false,
  468. );
  469. Ok(true)
  470. } else {
  471. if source == target {
  472. return Err(Error::SameEntry);
  473. }
  474. if replace == Override::No && self.exists(&[target.clone()]) > 0 {
  475. return Ok(false);
  476. }
  477. let mut slot = self.slots[self.get_slot(target)].write();
  478. slot.insert(target.clone(), value);
  479. Ok(true)
  480. }
  481. }
  482. /// Moves a given key between databases
  483. pub fn move_key(&self, source: &Bytes, target_db: Arc<Db>) -> Result<bool, Error> {
  484. if self.db_id == target_db.db_id {
  485. return Err(Error::SameEntry);
  486. }
  487. let mut slot = self.slots[self.get_slot(source)].write();
  488. let (expires_in, value) = if let Some(value) = slot.get(source).filter(|v| v.is_valid()) {
  489. (
  490. value.get_ttl().map(|t| t - Instant::now()),
  491. value.value.clone(),
  492. )
  493. } else {
  494. return Ok(false);
  495. };
  496. if Value::Integer(1)
  497. == target_db.set_advanced(&source, value, expires_in, Override::No, false, false)
  498. {
  499. slot.remove(source);
  500. Ok(true)
  501. } else {
  502. Ok(false)
  503. }
  504. }
  505. /// Renames a key
  506. pub fn rename(
  507. &self,
  508. source: &Bytes,
  509. target: &Bytes,
  510. override_value: Override,
  511. ) -> Result<bool, Error> {
  512. let slot1 = self.get_slot(source);
  513. let slot2 = self.get_slot(target);
  514. if slot1 == slot2 {
  515. let mut slot = self.slots[slot1].write();
  516. if override_value == Override::No && slot.get(target).is_some() {
  517. return Ok(false);
  518. }
  519. if let Some(value) = slot.remove(source) {
  520. slot.insert(target.clone(), value);
  521. Ok(true)
  522. } else {
  523. Err(Error::NotFound)
  524. }
  525. } else {
  526. let mut slot1 = self.slots[slot1].write();
  527. let mut slot2 = self.slots[slot2].write();
  528. if override_value == Override::No && slot2.get(target).is_some() {
  529. return Ok(false);
  530. }
  531. if let Some(value) = slot1.remove(source) {
  532. slot2.insert(target.clone(), value);
  533. Ok(true)
  534. } else {
  535. Err(Error::NotFound)
  536. }
  537. }
  538. }
  539. /// Removes keys from the database
  540. pub fn del(&self, keys: &[Bytes]) -> Value {
  541. let mut expirations = self.expirations.lock();
  542. keys.iter()
  543. .filter_map(|key| {
  544. expirations.remove(key);
  545. self.slots[self.get_slot(key)].write().remove(key)
  546. })
  547. .filter(|key| key.is_valid())
  548. .count()
  549. .into()
  550. }
  551. /// Returns all keys that matches a given pattern. This is a very expensive command.
  552. pub fn get_all_keys(&self, pattern: &Bytes) -> Result<Vec<Value>, Error> {
  553. let pattern = String::from_utf8_lossy(pattern);
  554. let pattern =
  555. Pattern::new(&pattern).map_err(|_| Error::InvalidPattern(pattern.to_string()))?;
  556. Ok(self
  557. .slots
  558. .iter()
  559. .map(|slot| {
  560. slot.read()
  561. .keys()
  562. .filter(|key| {
  563. let str_key = String::from_utf8_lossy(key);
  564. pattern.matches(&str_key)
  565. })
  566. .map(|key| Value::new(key))
  567. .collect::<Vec<Value>>()
  568. })
  569. .flatten()
  570. .collect())
  571. }
  572. /// Check if keys exists in the database
  573. pub fn exists(&self, keys: &[Bytes]) -> usize {
  574. let mut matches = 0;
  575. keys.iter()
  576. .map(|key| {
  577. let slot = self.slots[self.get_slot(key)].read();
  578. if let Some(key) = slot.get(key) {
  579. matches += if key.is_valid() { 1 } else { 0 };
  580. }
  581. })
  582. .for_each(drop);
  583. matches
  584. }
  585. /// get_map_or
  586. ///
  587. /// Instead of returning an entry of the database, to avoid clonning, this function will
  588. /// execute a callback function with the entry as a parameter. If no record is found another
  589. /// callback function is going to be executed, dropping the lock before doing so.
  590. ///
  591. /// If an entry is found, the lock is not dropped before doing the callback. Avoid inserting
  592. /// new entries. In this case the value is passed by reference, so it is possible to modify the
  593. /// entry itself.
  594. ///
  595. /// This function is useful to read non-scalar values from the database. Non-scalar values are
  596. /// forbidden to clone, attempting cloning will endup in an error (Error::WrongType)
  597. pub fn get_map_or<F1, F2>(&self, key: &Bytes, found: F1, not_found: F2) -> Result<Value, Error>
  598. where
  599. F1: FnOnce(&Value) -> Result<Value, Error>,
  600. F2: FnOnce() -> Result<Value, Error>,
  601. {
  602. let slot = self.slots[self.get_slot(key)].read();
  603. let entry = slot.get(key).filter(|x| x.is_valid()).map(|e| e.get());
  604. if let Some(entry) = entry {
  605. found(entry)
  606. } else {
  607. // drop lock
  608. drop(slot);
  609. not_found()
  610. }
  611. }
  612. /// Updates the entry version of a given key
  613. pub fn bump_version(&self, key: &Bytes) -> bool {
  614. let mut slot = self.slots[self.get_slot(key)].write();
  615. slot.get_mut(key)
  616. .filter(|x| x.is_valid())
  617. .map(|entry| {
  618. entry.bump_version();
  619. })
  620. .is_some()
  621. }
  622. /// Returns the version of a given key
  623. pub fn get_version(&self, key: &Bytes) -> u128 {
  624. let slot = self.slots[self.get_slot(key)].read();
  625. slot.get(key)
  626. .filter(|x| x.is_valid())
  627. .map(|entry| entry.version())
  628. .unwrap_or_default()
  629. }
  630. /// Returns the name of the value type
  631. pub fn get_data_type(&self, key: &Bytes) -> String {
  632. let slot = self.slots[self.get_slot(key)].read();
  633. slot.get(key)
  634. .filter(|x| x.is_valid())
  635. .map_or("none".to_owned(), |x| {
  636. Typ::get_type(x.get()).to_string().to_lowercase()
  637. })
  638. }
  639. /// Get a copy of an entry
  640. pub fn get(&self, key: &Bytes) -> Value {
  641. let slot = self.slots[self.get_slot(key)].read();
  642. slot.get(key)
  643. .filter(|x| x.is_valid())
  644. .map_or(Value::Null, |x| x.clone_value())
  645. }
  646. /// Get a copy of an entry and modifies the expiration of the key
  647. pub fn getex(&self, key: &Bytes, expires_in: Option<Duration>, make_persistent: bool) -> Value {
  648. let mut slot = self.slots[self.get_slot(key)].write();
  649. slot.get_mut(key)
  650. .filter(|x| x.is_valid())
  651. .map(|value| {
  652. if make_persistent {
  653. self.expirations.lock().remove(key);
  654. value.persist();
  655. } else if let Some(expires_in) = expires_in {
  656. let expires_at = Instant::now()
  657. .checked_add(expires_in)
  658. .unwrap_or_else(far_future);
  659. self.expirations.lock().add(key, expires_at);
  660. value.set_ttl(expires_at);
  661. }
  662. value
  663. })
  664. .map_or(Value::Null, |x| x.clone_value())
  665. }
  666. /// Get multiple copies of entries
  667. pub fn get_multi(&self, keys: &[Bytes]) -> Value {
  668. keys.iter()
  669. .map(|key| {
  670. let slot = self.slots[self.get_slot(key)].read();
  671. slot.get(key)
  672. .filter(|x| x.is_valid() && x.is_clonable())
  673. .map_or(Value::Null, |x| x.clone_value())
  674. })
  675. .collect::<Vec<Value>>()
  676. .into()
  677. }
  678. /// Get a key or set a new value for the given key.
  679. pub fn getset(&self, key: &Bytes, value: Value) -> Value {
  680. let mut slot = self.slots[self.get_slot(key)].write();
  681. self.expirations.lock().remove(key);
  682. slot.insert(key.clone(), Entry::new(value, None))
  683. .filter(|x| x.is_valid())
  684. .map_or(Value::Null, |x| x.clone_value())
  685. }
  686. /// Takes an entry from the database.
  687. pub fn getdel(&self, key: &Bytes) -> Value {
  688. let mut slot = self.slots[self.get_slot(key)].write();
  689. slot.remove(key).map_or(Value::Null, |x| {
  690. self.expirations.lock().remove(key);
  691. x.clone_value()
  692. })
  693. }
  694. /// Set a key, value with an optional expiration time
  695. pub fn append(&self, key: &Bytes, value_to_append: &Bytes) -> Result<Value, Error> {
  696. let mut slot = self.slots[self.get_slot(key)].write();
  697. let mut entry = slot.get_mut(key).filter(|x| x.is_valid());
  698. if let Some(entry) = slot.get_mut(key).filter(|x| x.is_valid()) {
  699. match entry.get_mut() {
  700. Value::Blob(value) => {
  701. value.put(value_to_append.as_ref());
  702. Ok(value.len().into())
  703. }
  704. _ => Err(Error::WrongType),
  705. }
  706. } else {
  707. slot.insert(key.clone(), Entry::new(Value::new(value_to_append), None));
  708. Ok(value_to_append.len().into())
  709. }
  710. }
  711. /// Set multiple key/value pairs. Are involved keys are locked exclusively
  712. /// like a transaction.
  713. ///
  714. /// If override_all is set to false, all entries must be new entries or the
  715. /// entire operation fails, in this case 1 or is returned. Otherwise `Ok` is
  716. /// returned.
  717. pub fn multi_set(&self, key_values: &[Bytes], override_all: bool) -> Result<Value, Error> {
  718. if key_values.len() % 2 == 1 {
  719. return Err(Error::Syntax);
  720. }
  721. let keys = key_values
  722. .iter()
  723. .step_by(2)
  724. .cloned()
  725. .collect::<Vec<Bytes>>();
  726. self.lock_keys(&keys);
  727. if !override_all {
  728. for key in keys.iter() {
  729. let slot = self.slots[self.get_slot(key)].read();
  730. if slot.get(key).is_some() {
  731. self.unlock_keys(&keys);
  732. return Ok(0.into());
  733. }
  734. }
  735. }
  736. for (i, _) in key_values.iter().enumerate().step_by(2) {
  737. let mut slot = self.slots[self.get_slot(&key_values[i])].write();
  738. slot.insert(
  739. key_values[i].clone(),
  740. Entry::new(Value::new(&key_values[i + 1]), None),
  741. );
  742. }
  743. self.unlock_keys(&keys);
  744. if override_all {
  745. Ok(Value::Ok)
  746. } else {
  747. Ok(1.into())
  748. }
  749. }
  750. /// Set a key, value with an optional expiration time
  751. pub fn set(&self, key: &Bytes, value: Value, expires_in: Option<Duration>) -> Value {
  752. self.set_advanced(key, value, expires_in, Default::default(), false, false)
  753. }
  754. /// Set a value in the database with various settings
  755. pub fn set_advanced(
  756. &self,
  757. key: &Bytes,
  758. value: Value,
  759. expires_in: Option<Duration>,
  760. override_value: Override,
  761. keep_ttl: bool,
  762. return_previous: bool,
  763. ) -> Value {
  764. let mut slot = self.slots[self.get_slot(key)].write();
  765. let expires_at = expires_in.map(|duration| {
  766. Instant::now()
  767. .checked_add(duration)
  768. .unwrap_or_else(far_future)
  769. });
  770. let previous = slot.get(key).filter(|x| x.is_valid());
  771. let expires_at = if keep_ttl {
  772. if let Some(previous) = previous {
  773. previous.get_ttl()
  774. } else {
  775. expires_at
  776. }
  777. } else {
  778. expires_at
  779. };
  780. let to_return = if return_previous {
  781. let previous = previous.map_or(Value::Null, |v| v.clone_value());
  782. if previous.is_err() {
  783. // Error while trying to clone the previous value to return, we
  784. // must halt and return immediately.
  785. return previous;
  786. }
  787. Some(previous)
  788. } else {
  789. None
  790. };
  791. match override_value {
  792. Override::No => {
  793. if previous.is_some() {
  794. return if let Some(to_return) = to_return {
  795. to_return
  796. } else {
  797. 0.into()
  798. };
  799. }
  800. }
  801. Override::Only => {
  802. if previous.is_none() {
  803. return if let Some(to_return) = to_return {
  804. to_return
  805. } else {
  806. 0.into()
  807. };
  808. }
  809. }
  810. _ => {}
  811. };
  812. if let Some(expires_at) = expires_at {
  813. self.expirations.lock().add(key, expires_at);
  814. } else {
  815. /// Make sure to remove the new key (or replaced) from the
  816. /// expiration table (from any possible past value).
  817. self.expirations.lock().remove(key);
  818. }
  819. slot.insert(key.clone(), Entry::new(value, expires_at));
  820. if let Some(to_return) = to_return {
  821. to_return
  822. } else if override_value == Override::Yes {
  823. Value::Ok
  824. } else {
  825. 1.into()
  826. }
  827. }
  828. /// Returns the TTL of a given key
  829. pub fn ttl(&self, key: &Bytes) -> Option<Option<Instant>> {
  830. let slot = self.slots[self.get_slot(key)].read();
  831. slot.get(key).filter(|x| x.is_valid()).map(|x| x.get_ttl())
  832. }
  833. /// Check whether a given key is in the list of keys to be purged or not.
  834. /// This function is mainly used for unit testing
  835. pub fn is_key_in_expiration_list(&self, key: &Bytes) -> bool {
  836. self.expirations.lock().has(key)
  837. }
  838. /// Remove expired entries from the database.
  839. ///
  840. /// This function should be called from a background thread every few seconds. Calling it more
  841. /// often is a waste of resources.
  842. ///
  843. /// Expired keys are automatically hidden by the database, this process is just claiming back
  844. /// the memory from those expired keys.
  845. pub fn purge(&self) -> u64 {
  846. let mut expirations = self.expirations.lock();
  847. let mut removed = 0;
  848. trace!("Watching {} keys for expirations", expirations.len());
  849. let keys = expirations.get_expired_keys(None);
  850. drop(expirations);
  851. keys.iter()
  852. .map(|key| {
  853. let mut slot = self.slots[self.get_slot(key)].write();
  854. if slot.remove(key).is_some() {
  855. trace!("Removed key {:?} due timeout", key);
  856. removed += 1;
  857. }
  858. })
  859. .for_each(drop);
  860. removed
  861. }
  862. }
  863. impl scan::Scan for Db {
  864. fn scan(
  865. &self,
  866. cursor: Cursor,
  867. pattern: Option<&Bytes>,
  868. count: Option<usize>,
  869. typ: Option<Typ>,
  870. ) -> Result<scan::Result, Error> {
  871. let mut keys = vec![];
  872. let mut slot_id = cursor.bucket as usize;
  873. let mut last_pos = cursor.last_position as usize;
  874. let pattern = pattern
  875. .map(|pattern| {
  876. let pattern = String::from_utf8_lossy(pattern);
  877. Pattern::new(&pattern).map_err(|_| Error::InvalidPattern(pattern.to_string()))
  878. })
  879. .transpose()?;
  880. loop {
  881. let slot = if let Some(value) = self.slots.get(slot_id) {
  882. value.read()
  883. } else {
  884. // We iterated through all the entries, time to signal that to
  885. // the client but returning a "0" cursor.
  886. slot_id = 0;
  887. last_pos = 0;
  888. break;
  889. };
  890. for (key, value) in slot.iter().skip(last_pos) {
  891. if !value.is_valid() {
  892. // Entry still exists in memory but it is not longer valid
  893. // and will soon be gargabe collected.
  894. last_pos += 1;
  895. continue;
  896. }
  897. if let Some(pattern) = &pattern {
  898. let str_key = String::from_utf8_lossy(key);
  899. if !pattern.matches(&str_key) {
  900. last_pos += 1;
  901. continue;
  902. }
  903. }
  904. if let Some(typ) = &typ {
  905. if !typ.is_value_type(value.get()) {
  906. last_pos += 1;
  907. continue;
  908. }
  909. }
  910. keys.push(Value::new(key));
  911. last_pos += 1;
  912. if keys.len() == count.unwrap_or(10) {
  913. break;
  914. }
  915. }
  916. if keys.len() == count.unwrap_or(10) {
  917. break;
  918. }
  919. last_pos = 0;
  920. slot_id += 1;
  921. }
  922. Ok(scan::Result {
  923. cursor: Cursor::new(slot_id as u16, last_pos as u64)?,
  924. result: keys,
  925. })
  926. }
  927. }
  928. #[cfg(test)]
  929. mod test {
  930. use super::*;
  931. use crate::{bytes, db::scan::Scan, value::float::Float};
  932. use std::str::FromStr;
  933. #[test]
  934. fn incr_wrong_type() {
  935. let db = Db::new(100);
  936. db.set(&bytes!(b"num"), Value::Blob(bytes!("some string")), None);
  937. let r = db.incr(&bytes!("num"), 1);
  938. assert!(r.is_err());
  939. assert_eq!(Error::NotANumber, r.expect_err("should fail"));
  940. assert_eq!(Value::Blob(bytes!("some string")), db.get(&bytes!("num")));
  941. }
  942. #[test]
  943. fn incr_blob_float() {
  944. let db = Db::new(100);
  945. db.set(&bytes!(b"num"), Value::Blob(bytes!("1.1")), None);
  946. assert_eq!(Ok(2.2.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
  947. assert_eq!(Value::Blob(bytes!("2.2")), db.get(&bytes!("num")));
  948. }
  949. #[test]
  950. fn incr_blob_int_float() {
  951. let db = Db::new(100);
  952. db.set(&bytes!(b"num"), Value::Blob(bytes!("1")), None);
  953. assert_eq!(Ok(2.1.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
  954. assert_eq!(Value::Blob(bytes!("2.1")), db.get(&bytes!("num")));
  955. }
  956. #[test]
  957. fn incr_blob_int() {
  958. let db = Db::new(100);
  959. db.set(&bytes!(b"num"), Value::Blob(bytes!("1")), None);
  960. assert_eq!(Ok(2), db.incr(&bytes!("num"), 1));
  961. assert_eq!(Value::Blob(bytes!("2")), db.get(&bytes!("num")));
  962. }
  963. #[test]
  964. fn incr_blob_int_set() {
  965. let db = Db::new(100);
  966. assert_eq!(Ok(1), db.incr(&bytes!("num"), 1));
  967. assert_eq!(Value::Blob(bytes!("1")), db.get(&bytes!("num")));
  968. }
  969. #[test]
  970. fn incr_blob_float_set() {
  971. let db = Db::new(100);
  972. assert_eq!(Ok(1.1.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
  973. assert_eq!(Value::Blob(bytes!("1.1")), db.get(&bytes!("num")));
  974. }
  975. #[test]
  976. fn del() {
  977. let db = Db::new(100);
  978. db.set(&bytes!(b"expired"), Value::Ok, Some(Duration::from_secs(0)));
  979. db.set(&bytes!(b"valid"), Value::Ok, None);
  980. db.set(
  981. &bytes!(b"expiring"),
  982. Value::Ok,
  983. Some(Duration::from_secs(5)),
  984. );
  985. assert_eq!(
  986. Value::Integer(2),
  987. db.del(&[
  988. bytes!(b"expired"),
  989. bytes!(b"valid"),
  990. bytes!(b"expiring"),
  991. bytes!(b"not_existing_key")
  992. ])
  993. );
  994. }
  995. #[test]
  996. fn ttl() {
  997. let db = Db::new(100);
  998. db.set(&bytes!(b"expired"), Value::Ok, Some(Duration::from_secs(0)));
  999. db.set(&bytes!(b"valid"), Value::Ok, None);
  1000. db.set(
  1001. &bytes!(b"expiring"),
  1002. Value::Ok,
  1003. Some(Duration::from_secs(5)),
  1004. );
  1005. assert_eq!(None, db.ttl(&bytes!(b"expired")));
  1006. assert_eq!(None, db.ttl(&bytes!(b"not_existing_key")));
  1007. assert_eq!(Some(None), db.ttl(&bytes!(b"valid")));
  1008. assert!(match db.ttl(&bytes!(b"expiring")) {
  1009. Some(Some(_)) => true,
  1010. _ => false,
  1011. });
  1012. }
  1013. #[test]
  1014. fn persist_bug() {
  1015. let db = Db::new(100);
  1016. db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(1)));
  1017. assert_eq!(Value::Ok, db.get(&bytes!(b"one")));
  1018. assert!(db.is_key_in_expiration_list(&bytes!(b"one")));
  1019. db.persist(&bytes!(b"one"));
  1020. assert!(!db.is_key_in_expiration_list(&bytes!(b"one")));
  1021. }
  1022. #[test]
  1023. fn purge_keys() {
  1024. let db = Db::new(100);
  1025. db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
  1026. // Expired keys should not be returned, even if they are not yet
  1027. // removed by the purge process.
  1028. assert_eq!(Value::Null, db.get(&bytes!(b"one")));
  1029. // Purge twice
  1030. assert_eq!(1, db.purge());
  1031. assert_eq!(0, db.purge());
  1032. assert_eq!(Value::Null, db.get(&bytes!(b"one")));
  1033. }
  1034. #[test]
  1035. fn replace_purge_keys() {
  1036. let db = Db::new(100);
  1037. db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
  1038. // Expired keys should not be returned, even if they are not yet
  1039. // removed by the purge process.
  1040. assert_eq!(Value::Null, db.get(&bytes!(b"one")));
  1041. db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(5)));
  1042. assert_eq!(Value::Ok, db.get(&bytes!(b"one")));
  1043. // Purge should return 0 as the expired key has been removed already
  1044. assert_eq!(0, db.purge());
  1045. }
  1046. #[test]
  1047. fn scan_skip_expired() {
  1048. let db = Db::new(100);
  1049. db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
  1050. db.set(&bytes!(b"two"), Value::Ok, Some(Duration::from_secs(0)));
  1051. for i in 0u64..20u64 {
  1052. let key: Bytes = i.to_string().into();
  1053. db.set(&key, Value::Ok, None);
  1054. }
  1055. let result = db
  1056. .scan(Cursor::from_str("0").unwrap(), None, None, None)
  1057. .unwrap();
  1058. // first 10 records
  1059. assert_eq!(10, result.result.len());
  1060. // make sure the cursor is valid
  1061. assert_ne!("0", result.cursor.to_string());
  1062. let result = db.scan(result.cursor, None, None, None).unwrap();
  1063. // 10 more records
  1064. assert_eq!(10, result.result.len());
  1065. // make sure the cursor is valid
  1066. assert_ne!("0", result.cursor.to_string());
  1067. let result = db.scan(result.cursor, None, None, None).unwrap();
  1068. // No more results!
  1069. assert_eq!(0, result.result.len());
  1070. assert_eq!("0", result.cursor.to_string());
  1071. }
  1072. #[test]
  1073. fn scan_limit() {
  1074. let db = Db::new(10);
  1075. db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
  1076. db.set(&bytes!(b"two"), Value::Ok, Some(Duration::from_secs(0)));
  1077. for i in 0u64..2000u64 {
  1078. let key: Bytes = i.to_string().into();
  1079. db.set(&key, Value::Ok, None);
  1080. }
  1081. let result = db
  1082. .scan(Cursor::from_str("0").unwrap(), None, Some(2), None)
  1083. .unwrap();
  1084. assert_eq!(2, result.result.len());
  1085. assert_ne!("0", result.cursor.to_string());
  1086. }
  1087. #[test]
  1088. fn scan_filter() {
  1089. let db = Db::new(100);
  1090. db.set(&bytes!(b"fone"), Value::Ok, None);
  1091. db.set(&bytes!(b"ftwo"), Value::Ok, None);
  1092. for i in 0u64..20u64 {
  1093. let key: Bytes = i.to_string().into();
  1094. db.set(&key, Value::Ok, None);
  1095. }
  1096. let result = db
  1097. .scan(
  1098. Cursor::from_str("0").unwrap(),
  1099. Some(&bytes!(b"f*")),
  1100. None,
  1101. None,
  1102. )
  1103. .unwrap();
  1104. assert_eq!(2, result.result.len());
  1105. assert_eq!("0", result.cursor.to_string());
  1106. }
  1107. }