mod.rs 43 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312
  1. //! # in-memory database
  2. //!
  3. //! This database module is the core of the miniredis project. All other modules around this
  4. //! database module.
  5. use self::utils::{far_future, ExpirationOpts, Override};
  6. use crate::{
  7. error::Error,
  8. value::{
  9. bytes_to_number,
  10. cursor::Cursor,
  11. typ::{Typ, ValueTyp},
  12. VDebug, Value,
  13. },
  14. };
  15. use bytes::{BufMut, Bytes, BytesMut};
  16. use core::num;
  17. use entry::{new_version, Entry};
  18. use expiration::ExpirationDb;
  19. use futures::Future;
  20. use glob::Pattern;
  21. use log::trace;
  22. use num_traits::CheckedAdd;
  23. use parking_lot::{Mutex, RwLock};
  24. use rand::{prelude::SliceRandom, Rng};
  25. use seahash::hash;
  26. use std::{
  27. collections::HashMap,
  28. convert::{TryFrom, TryInto},
  29. ops::{AddAssign, Deref},
  30. str::FromStr,
  31. sync::Arc,
  32. thread,
  33. };
  34. use tokio::{
  35. sync::broadcast::{self, Receiver, Sender},
  36. time::{Duration, Instant},
  37. };
  38. mod entry;
  39. mod expiration;
  40. pub mod pool;
  41. pub mod scan;
  42. pub(crate) mod utils;
  43. /// Database structure
  44. ///
  45. /// Each connection has their own clone of the database and the conn_id is stored in each instance.
  46. /// The slots property is shared for all connections.
  47. ///
  48. /// To avoid lock contention this database is *not* a single HashMap, instead it is a vector of
  49. /// HashMaps. Each key is pre-sharded and a bucket is selected. By doing this pre-step instead of
  50. /// locking the entire database, only a small portion is locked (shared or exclusively) at a time,
  51. /// making this database implementation thread-friendly. The number of number_of_slots available cannot be
  52. /// changed at runtime.
  53. ///
  54. /// The database is also aware of other connections locking other keys exclusively (for
  55. /// transactions).
  56. ///
  57. /// Each entry is wrapped with an entry::Entry struct, which is aware of expirations and data
  58. /// versioning (in practice the nanosecond of last modification).
  59. #[derive(Debug)]
  60. pub struct Db {
  61. /// A vector of hashmaps.
  62. ///
  63. /// Instead of having a single HashMap, and having all threads fighting for
  64. /// blocking the single HashMap, we have a vector of N HashMap
  65. /// (configurable), which in theory allow to have faster reads and writes.
  66. ///
  67. /// Because all operations are always key specific, the key is used to hash
  68. /// and select to which HashMap the data might be stored.
  69. slots: Arc<Vec<RwLock<HashMap<Bytes, Entry>>>>,
  70. /// Data structure to store all expiring keys
  71. expirations: Arc<Mutex<ExpirationDb>>,
  72. /// Key changes subscriptions hash. This hash contains all the senders to
  73. /// key subscriptions. If a key does not exists here it means that no-one
  74. /// wants to be notified of the current key changes.
  75. change_subscriptions: Arc<RwLock<HashMap<Bytes, Sender<()>>>>,
  76. /// Number of HashMaps that are available.
  77. number_of_slots: usize,
  78. /// Databases unique ID. This is an internal identifier to avoid deadlocks
  79. /// when copying and moving data between databases.
  80. pub db_id: u128,
  81. /// Current connection ID
  82. ///
  83. /// A Database is attached to a conn_id. The slots and expiration data
  84. /// structures are shared between all connections, regardless of conn_id.
  85. ///
  86. /// This particular database instance is attached to a conn_id, which is used
  87. /// to lock keys exclusively for transactions and other atomic operations.
  88. conn_id: u128,
  89. /// HashMap of all blocked keys by other connections. If a key appears in
  90. /// here and it is not being hold by the current connection, current
  91. /// connection must wait.
  92. tx_key_locks: Arc<RwLock<HashMap<Bytes, u128>>>,
  93. }
  94. impl Db {
  95. /// Creates a new database instance
  96. pub fn new(number_of_slots: usize) -> Self {
  97. let slots = (0..number_of_slots)
  98. .map(|_| RwLock::new(HashMap::new()))
  99. .collect();
  100. Self {
  101. slots: Arc::new(slots),
  102. expirations: Arc::new(Mutex::new(ExpirationDb::new())),
  103. change_subscriptions: Arc::new(RwLock::new(HashMap::new())),
  104. conn_id: 0,
  105. db_id: new_version(),
  106. tx_key_locks: Arc::new(RwLock::new(HashMap::new())),
  107. number_of_slots,
  108. }
  109. }
  110. /// Creates a new Database instance bound to a connection.
  111. ///
  112. /// This is particular useful when locking keys exclusively.
  113. ///
  114. /// All the internal data are shjared through an Arc.
  115. pub fn new_db_instance(self: Arc<Db>, conn_id: u128) -> Arc<Db> {
  116. Arc::new(Self {
  117. slots: self.slots.clone(),
  118. tx_key_locks: self.tx_key_locks.clone(),
  119. expirations: self.expirations.clone(),
  120. change_subscriptions: self.change_subscriptions.clone(),
  121. conn_id,
  122. db_id: self.db_id,
  123. number_of_slots: self.number_of_slots,
  124. })
  125. }
  126. #[inline]
  127. /// Returns a slot where a key may be hosted.
  128. ///
  129. /// In order to avoid too much locks, instead of having a single hash a
  130. /// database instance is a set of hashes. Each key is pre-shared with a
  131. /// quick hashing algorithm to select a 'slot' or HashMap where it may be
  132. /// hosted.
  133. fn get_slot(&self, key: &Bytes) -> usize {
  134. let id = (hash(key) as usize) % self.number_of_slots;
  135. trace!("selected slot {} for key {:?}", id, key);
  136. let waiting = Duration::from_nanos(100);
  137. while let Some(blocker) = self.tx_key_locks.read().get(key) {
  138. // Loop while the key we are trying to access is being blocked by a
  139. // connection in a transaction
  140. if *blocker == self.conn_id {
  141. // the key is being blocked by ourself, it is safe to break the
  142. // waiting loop
  143. break;
  144. }
  145. thread::sleep(waiting);
  146. }
  147. id
  148. }
  149. /// Locks keys exclusively
  150. ///
  151. /// The locked keys are only accessible (read or write) by the connection
  152. /// that locked them, any other connection must wait until the locking
  153. /// connection releases them.
  154. ///
  155. /// This is used to simulate redis transactions. Transaction in Redis are
  156. /// atomic but pausing a multi threaded Redis just to keep the same promises
  157. /// was a bit extreme, that's the reason why a transaction will lock
  158. /// exclusively all keys involved.
  159. pub fn lock_keys(&self, keys: &[Bytes]) {
  160. let waiting = Duration::from_nanos(100);
  161. loop {
  162. let mut lock = self.tx_key_locks.write();
  163. let mut i = 0;
  164. for key in keys.iter() {
  165. if let Some(blocker) = lock.get(key) {
  166. if *blocker == self.conn_id {
  167. // It is blocked by us already.
  168. i += 1;
  169. continue;
  170. }
  171. // It is blocked by another tx, we need to break
  172. // and retry to gain the lock over this key
  173. break;
  174. }
  175. lock.insert(key.clone(), self.conn_id);
  176. i += 1;
  177. }
  178. if i == keys.len() {
  179. // All the involved keys are successfully being blocked
  180. // exclusively.
  181. break;
  182. }
  183. // We need to sleep a bit and retry.
  184. drop(lock);
  185. thread::sleep(waiting);
  186. }
  187. }
  188. /// Releases the lock on keys
  189. pub fn unlock_keys(&self, keys: &[Bytes]) {
  190. let mut lock = self.tx_key_locks.write();
  191. for key in keys.iter() {
  192. lock.remove(key);
  193. }
  194. }
  195. /// Return debug info for a key
  196. pub fn debug(&self, key: &Bytes) -> Result<VDebug, Error> {
  197. let slot = self.slots[self.get_slot(key)].read();
  198. Ok(slot
  199. .get(key)
  200. .filter(|x| x.is_valid())
  201. .ok_or(Error::NotFound)?
  202. .value
  203. .debug())
  204. }
  205. /// Return the digest for each key. This used for testing only
  206. pub fn digest(&self, keys: &[Bytes]) -> Result<Vec<Value>, Error> {
  207. Ok(keys
  208. .iter()
  209. .map(|key| {
  210. let slot = self.slots[self.get_slot(key)].read();
  211. Value::Blob(
  212. slot.get(key)
  213. .filter(|v| v.is_valid())
  214. .map(|v| hex::encode(&v.value.digest()))
  215. .unwrap_or("00000".into())
  216. .as_str()
  217. .into(),
  218. )
  219. })
  220. .collect::<Vec<Value>>())
  221. }
  222. /// Flushes the entire database
  223. pub fn flushdb(&self) -> Result<Value, Error> {
  224. self.expirations.lock().flush();
  225. self.slots
  226. .iter()
  227. .map(|s| {
  228. let mut s = s.write();
  229. s.clear();
  230. })
  231. .for_each(drop);
  232. Ok(Value::Ok)
  233. }
  234. /// Returns the number of elements in the database
  235. pub fn len(&self) -> Result<usize, Error> {
  236. self.purge();
  237. Ok(self.slots.iter().map(|s| s.read().len()).sum())
  238. }
  239. /// Round numbers to store efficiently, specially float numbers. For instance `1.00` will be converted to `1`.
  240. fn round_numbers<T>(number: T) -> BytesMut
  241. where
  242. T: ToString,
  243. {
  244. let number_to_str = number.to_string();
  245. if number_to_str.find('.').is_none() {
  246. return number_to_str.as_bytes().into();
  247. }
  248. let number_to_str = number_to_str
  249. .trim_end_matches(|c| c == '0' || c == '.')
  250. .to_string();
  251. if number_to_str.is_empty() {
  252. "0"
  253. } else {
  254. number_to_str.as_str()
  255. }
  256. .into()
  257. }
  258. // Converts a given number to a correct Value, it should be used with Self::round_numbers()
  259. fn number_to_value(number: &[u8]) -> Result<Value, Error> {
  260. if number.iter().find(|x| **x == b'.').is_some() {
  261. Ok(Value::new(number))
  262. } else {
  263. Ok(Value::Integer(bytes_to_number(number)?))
  264. }
  265. }
  266. /// Increment a sub-key in a hash
  267. ///
  268. /// If the stored value cannot be converted into a number an error will be thrown
  269. pub fn hincrby<T>(
  270. &self,
  271. key: &Bytes,
  272. sub_key: &Bytes,
  273. incr_by: &Bytes,
  274. typ: &str,
  275. ) -> Result<Value, Error>
  276. where
  277. T: ToString
  278. + FromStr
  279. + CheckedAdd
  280. + for<'a> TryFrom<&'a Value, Error = Error>
  281. + Into<Value>
  282. + Copy,
  283. {
  284. let mut slot = self.slots[self.get_slot(key)].write();
  285. let mut incr_by: T =
  286. bytes_to_number(incr_by).map_err(|_| Error::NotANumberType(typ.to_owned()))?;
  287. match slot.get_mut(key).filter(|x| x.is_valid()).map(|x| x.get()) {
  288. Some(Value::Hash(h)) => {
  289. let mut h = h.write();
  290. if let Some(n) = h.get(sub_key) {
  291. incr_by = incr_by
  292. .checked_add(
  293. &bytes_to_number(n)
  294. .map_err(|_| Error::NotANumberType(typ.to_owned()))?,
  295. )
  296. .ok_or(Error::Overflow)?;
  297. }
  298. let incr_by_bytes = Self::round_numbers(incr_by).freeze();
  299. h.insert(sub_key.clone(), incr_by_bytes.clone());
  300. Self::number_to_value(&incr_by_bytes)
  301. }
  302. None => {
  303. #[allow(clippy::mutable_key_type)]
  304. let mut h = HashMap::new();
  305. let incr_by_bytes = Self::round_numbers(incr_by).freeze();
  306. h.insert(sub_key.clone(), incr_by_bytes.clone());
  307. let _ = slot.insert(key.clone(), Entry::new(h.into(), None));
  308. Self::number_to_value(&incr_by_bytes)
  309. }
  310. _ => Err(Error::WrongType),
  311. }
  312. }
  313. /// Increments a key's value by a given number
  314. ///
  315. /// If the stored value cannot be converted into a number an error will be
  316. /// thrown.
  317. pub fn incr<T>(&self, key: &Bytes, incr_by: T) -> Result<T, Error>
  318. where
  319. T: ToString + CheckedAdd + for<'a> TryFrom<&'a Value, Error = Error> + Into<Value> + Copy,
  320. {
  321. let mut slot = self.slots[self.get_slot(key)].write();
  322. match slot.get_mut(key).filter(|x| x.is_valid()) {
  323. Some(x) => {
  324. if !x.is_clonable() {
  325. return Err(Error::WrongType);
  326. }
  327. let value = x.get();
  328. let mut number: T = value.try_into()?;
  329. number = incr_by.checked_add(&number).ok_or(Error::Overflow)?;
  330. x.change_value(Value::Blob(Self::round_numbers(number)));
  331. Ok(number)
  332. }
  333. None => {
  334. slot.insert(
  335. key.clone(),
  336. Entry::new(Value::Blob(Self::round_numbers(incr_by)), None),
  337. );
  338. Ok(incr_by)
  339. }
  340. }
  341. }
  342. /// Removes any expiration associated with a given key
  343. pub fn persist(&self, key: &Bytes) -> Value {
  344. let mut slot = self.slots[self.get_slot(key)].write();
  345. slot.get_mut(key)
  346. .filter(|x| x.is_valid())
  347. .map_or(0.into(), |x| {
  348. if x.has_ttl() {
  349. self.expirations.lock().remove(key);
  350. x.persist();
  351. 1.into()
  352. } else {
  353. 0.into()
  354. }
  355. })
  356. }
  357. /// Set time to live for a given key
  358. pub fn set_ttl(
  359. &self,
  360. key: &Bytes,
  361. expires_in: Duration,
  362. opts: ExpirationOpts,
  363. ) -> Result<Value, Error> {
  364. if (opts.NX && opts.XX) || (opts.NX && opts.GT) || (opts.NX && opts.LT) {
  365. return Err(Error::OptsNotCompatible("NX and XX, GT or LT".to_owned()));
  366. }
  367. if opts.GT && opts.LT {
  368. return Err(Error::OptsNotCompatible("GT and LT".to_owned()));
  369. }
  370. let mut slot = self.slots[self.get_slot(key)].write();
  371. let expires_at = Instant::now()
  372. .checked_add(expires_in)
  373. .unwrap_or_else(far_future);
  374. Ok(slot
  375. .get_mut(key)
  376. .filter(|x| x.is_valid())
  377. .map_or(0.into(), |x| {
  378. let current_expire = x.get_ttl();
  379. if opts.NX && current_expire.is_some() {
  380. return 0.into();
  381. }
  382. if opts.XX && current_expire.is_none() {
  383. return 0.into();
  384. }
  385. if opts.GT {
  386. if let Some(current_expire) = current_expire {
  387. if expires_at <= current_expire {
  388. return 0.into();
  389. }
  390. } else {
  391. return 0.into();
  392. }
  393. }
  394. if opts.LT {
  395. if let Some(current_expire) = current_expire {
  396. if expires_at >= current_expire {
  397. return 0.into();
  398. }
  399. }
  400. }
  401. self.expirations.lock().add(key, expires_at);
  402. x.set_ttl(expires_at);
  403. 1.into()
  404. }))
  405. }
  406. /// Overwrites part of the string stored at key, starting at the specified
  407. /// offset, for the entire length of value. If the offset is larger than the
  408. /// current length of the string at key, the string is padded with zero-bytes to
  409. /// make offset fit. Non-existing keys are considered as empty strings, so this
  410. /// command will make sure it holds a string large enough to be able to set
  411. /// value at offset.
  412. pub fn set_range(&self, key: &Bytes, offset: i128, data: &[u8]) -> Result<Value, Error> {
  413. let mut slot = self.slots[self.get_slot(key)].write();
  414. let value = slot.get_mut(key).map(|value| {
  415. if !value.is_valid() {
  416. self.expirations.lock().remove(key);
  417. value.persist();
  418. }
  419. value.get_mut()
  420. });
  421. if offset < 0 {
  422. return Err(Error::OutOfRange);
  423. }
  424. if offset >= 512 * 1024 * 1024 - 4 {
  425. return Err(Error::MaxAllowedSize);
  426. }
  427. let length = offset as usize + data.len();
  428. match value {
  429. Some(Value::Blob(bytes)) => {
  430. if bytes.capacity() < length {
  431. bytes.resize(length, 0);
  432. }
  433. let writer = &mut bytes[offset as usize..length];
  434. writer.copy_from_slice(data);
  435. Ok(bytes.len().into())
  436. }
  437. None => {
  438. if data.len() == 0 {
  439. return Ok(0.into());
  440. }
  441. let mut bytes = BytesMut::new();
  442. bytes.resize(length, 0);
  443. let writer = &mut bytes[offset as usize..];
  444. writer.copy_from_slice(data);
  445. slot.insert(key.clone(), Entry::new(Value::new(&bytes), None));
  446. Ok(bytes.len().into())
  447. }
  448. _ => Err(Error::WrongType),
  449. }
  450. }
  451. /// Copies a key
  452. pub fn copy(
  453. &self,
  454. source: &Bytes,
  455. target: &Bytes,
  456. replace: Override,
  457. target_db: Option<Arc<Db>>,
  458. ) -> Result<bool, Error> {
  459. let slot = self.slots[self.get_slot(source)].read();
  460. let value = if let Some(value) = slot.get(source).filter(|x| x.is_valid()) {
  461. value.clone()
  462. } else {
  463. return Ok(false);
  464. };
  465. drop(slot);
  466. if let Some(db) = target_db {
  467. if db.db_id == self.db_id && source == target {
  468. return Err(Error::SameEntry);
  469. }
  470. if replace == Override::No && db.exists(&[target.clone()]) > 0 {
  471. return Ok(false);
  472. }
  473. let _ = db.set_advanced(
  474. target,
  475. value.value.clone(),
  476. value.get_ttl().map(|v| v - Instant::now()),
  477. replace,
  478. false,
  479. false,
  480. );
  481. Ok(true)
  482. } else {
  483. if source == target {
  484. return Err(Error::SameEntry);
  485. }
  486. if replace == Override::No && self.exists(&[target.clone()]) > 0 {
  487. return Ok(false);
  488. }
  489. let mut slot = self.slots[self.get_slot(target)].write();
  490. slot.insert(target.clone(), value);
  491. Ok(true)
  492. }
  493. }
  494. /// Moves a given key between databases
  495. pub fn move_key(&self, source: &Bytes, target_db: Arc<Db>) -> Result<bool, Error> {
  496. if self.db_id == target_db.db_id {
  497. return Err(Error::SameEntry);
  498. }
  499. let mut slot = self.slots[self.get_slot(source)].write();
  500. let (expires_in, value) = if let Some(value) = slot.get(source).filter(|v| v.is_valid()) {
  501. (
  502. value.get_ttl().map(|t| t - Instant::now()),
  503. value.value.clone(),
  504. )
  505. } else {
  506. return Ok(false);
  507. };
  508. if Value::Integer(1)
  509. == target_db.set_advanced(&source, value, expires_in, Override::No, false, false)
  510. {
  511. slot.remove(source);
  512. Ok(true)
  513. } else {
  514. Ok(false)
  515. }
  516. }
  517. /// Return a random key from the database
  518. pub fn randomkey(&self) -> Result<Value, Error> {
  519. let mut rng = rand::thread_rng();
  520. let mut candidates = self
  521. .slots
  522. .iter()
  523. .map(|slot| {
  524. let slot = slot.read();
  525. if slot.is_empty() {
  526. None
  527. } else {
  528. slot.iter()
  529. .skip(rng.gen_range((0..slot.len())))
  530. .next()
  531. .map(|(k, v)| k.clone())
  532. }
  533. })
  534. .filter_map(|v| v)
  535. .collect::<Vec<Bytes>>();
  536. candidates.shuffle(&mut rng);
  537. Ok(candidates.get(0).into())
  538. }
  539. /// Renames a key
  540. pub fn rename(
  541. &self,
  542. source: &Bytes,
  543. target: &Bytes,
  544. override_value: Override,
  545. ) -> Result<bool, Error> {
  546. let slot1 = self.get_slot(source);
  547. let slot2 = self.get_slot(target);
  548. let result = if slot1 == slot2 {
  549. let mut slot = self.slots[slot1].write();
  550. if override_value == Override::No && slot.get(target).is_some() {
  551. return Ok(false);
  552. }
  553. if let Some(value) = slot.remove(source) {
  554. slot.insert(target.clone(), value);
  555. Ok(true)
  556. } else {
  557. Err(Error::NotFound)
  558. }
  559. } else {
  560. let mut slot1 = self.slots[slot1].write();
  561. let mut slot2 = self.slots[slot2].write();
  562. if override_value == Override::No && slot2.get(target).is_some() {
  563. return Ok(false);
  564. }
  565. if let Some(value) = slot1.remove(source) {
  566. slot2.insert(target.clone(), value);
  567. Ok(true)
  568. } else {
  569. Err(Error::NotFound)
  570. }
  571. };
  572. if result.is_ok() {
  573. self.bump_version(source);
  574. self.bump_version(target);
  575. }
  576. result
  577. }
  578. /// Removes keys from the database
  579. pub fn del(&self, keys: &[Bytes]) -> Value {
  580. let mut expirations = self.expirations.lock();
  581. keys.iter()
  582. .filter_map(|key| {
  583. expirations.remove(key);
  584. self.slots[self.get_slot(key)].write().remove(key)
  585. })
  586. .filter(|key| key.is_valid())
  587. .count()
  588. .into()
  589. }
  590. /// Returns all keys that matches a given pattern. This is a very expensive command.
  591. pub fn get_all_keys(&self, pattern: &Bytes) -> Result<Vec<Value>, Error> {
  592. let pattern = String::from_utf8_lossy(pattern);
  593. let pattern =
  594. Pattern::new(&pattern).map_err(|_| Error::InvalidPattern(pattern.to_string()))?;
  595. Ok(self
  596. .slots
  597. .iter()
  598. .map(|slot| {
  599. slot.read()
  600. .keys()
  601. .filter(|key| {
  602. let str_key = String::from_utf8_lossy(key);
  603. pattern.matches(&str_key)
  604. })
  605. .map(|key| Value::new(key))
  606. .collect::<Vec<Value>>()
  607. })
  608. .flatten()
  609. .collect())
  610. }
  611. /// Check if keys exists in the database
  612. pub fn exists(&self, keys: &[Bytes]) -> usize {
  613. let mut matches = 0;
  614. keys.iter()
  615. .map(|key| {
  616. let slot = self.slots[self.get_slot(key)].read();
  617. if let Some(key) = slot.get(key) {
  618. matches += if key.is_valid() { 1 } else { 0 };
  619. }
  620. })
  621. .for_each(drop);
  622. matches
  623. }
  624. /// get_map_or
  625. ///
  626. /// Instead of returning an entry of the database, to avoid cloning, this function will
  627. /// execute a callback function with the entry as a parameter. If no record is found another
  628. /// callback function is going to be executed, dropping the lock before doing so.
  629. ///
  630. /// If an entry is found, the lock is not dropped before doing the callback. Avoid inserting
  631. /// new entries. In this case the value is passed by reference, so it is possible to modify the
  632. /// entry itself.
  633. ///
  634. /// This function is useful to read non-scalar values from the database. Non-scalar values are
  635. /// forbidden to clone, attempting cloning will end-up in an error (Error::WrongType)
  636. pub fn get_map_or<F1, F2>(&self, key: &Bytes, found: F1, not_found: F2) -> Result<Value, Error>
  637. where
  638. F1: FnOnce(&Value) -> Result<Value, Error>,
  639. F2: FnOnce() -> Result<Value, Error>,
  640. {
  641. let slot = self.slots[self.get_slot(key)].read();
  642. let entry = slot.get(key).filter(|x| x.is_valid()).map(|e| e.get());
  643. if let Some(entry) = entry {
  644. found(entry)
  645. } else {
  646. // drop lock
  647. drop(slot);
  648. not_found()
  649. }
  650. }
  651. /// Updates the entry version of a given key
  652. pub fn bump_version(&self, key: &Bytes) -> bool {
  653. let mut slot = self.slots[self.get_slot(key)].write();
  654. let to_return = slot
  655. .get_mut(key)
  656. .filter(|x| x.is_valid())
  657. .map(|entry| {
  658. entry.bump_version();
  659. })
  660. .is_some();
  661. drop(slot);
  662. if to_return {
  663. let senders = self.change_subscriptions.read();
  664. if let Some(sender) = senders.get(key) {
  665. if sender.receiver_count() == 0 {
  666. // Garbage collection
  667. drop(senders);
  668. self.change_subscriptions.write().remove(key);
  669. } else {
  670. // Notify
  671. let _ = sender.send(());
  672. }
  673. }
  674. }
  675. to_return
  676. }
  677. /// Subscribe to key changes.
  678. pub fn subscribe_to_key_changes(&self, keys: &[Bytes]) -> Vec<Receiver<()>> {
  679. let mut subscriptions = self.change_subscriptions.write();
  680. keys.iter()
  681. .map(|key| {
  682. if let Some(sender) = subscriptions.get(key) {
  683. sender.subscribe()
  684. } else {
  685. let (sender, receiver) = broadcast::channel(1);
  686. subscriptions.insert(key.clone(), sender);
  687. receiver
  688. }
  689. })
  690. .collect()
  691. }
  692. /// Returns the version of a given key
  693. #[inline]
  694. pub fn get_version(&self, key: &Bytes) -> u128 {
  695. let slot = self.slots[self.get_slot(key)].read();
  696. slot.get(key)
  697. .filter(|x| x.is_valid())
  698. .map(|entry| entry.version())
  699. .unwrap_or_default()
  700. }
  701. /// Returns the name of the value type
  702. pub fn get_data_type(&self, key: &Bytes) -> String {
  703. let slot = self.slots[self.get_slot(key)].read();
  704. slot.get(key)
  705. .filter(|x| x.is_valid())
  706. .map_or("none".to_owned(), |x| {
  707. Typ::get_type(x.get()).to_string().to_lowercase()
  708. })
  709. }
  710. /// Get a copy of an entry
  711. pub fn get(&self, key: &Bytes) -> Value {
  712. let slot = self.slots[self.get_slot(key)].read();
  713. slot.get(key)
  714. .filter(|x| x.is_valid())
  715. .map_or(Value::Null, |x| x.clone_value())
  716. }
  717. /// Get a copy of an entry and modifies the expiration of the key
  718. pub fn getex(&self, key: &Bytes, expires_in: Option<Duration>, make_persistent: bool) -> Value {
  719. let mut slot = self.slots[self.get_slot(key)].write();
  720. slot.get_mut(key)
  721. .filter(|x| x.is_valid())
  722. .map(|value| {
  723. if make_persistent {
  724. self.expirations.lock().remove(key);
  725. value.persist();
  726. } else if let Some(expires_in) = expires_in {
  727. let expires_at = Instant::now()
  728. .checked_add(expires_in)
  729. .unwrap_or_else(far_future);
  730. self.expirations.lock().add(key, expires_at);
  731. value.set_ttl(expires_at);
  732. }
  733. value
  734. })
  735. .map_or(Value::Null, |x| x.clone_value())
  736. }
  737. /// Get multiple copies of entries
  738. pub fn get_multi(&self, keys: &[Bytes]) -> Value {
  739. keys.iter()
  740. .map(|key| {
  741. let slot = self.slots[self.get_slot(key)].read();
  742. slot.get(key)
  743. .filter(|x| x.is_valid() && x.is_clonable())
  744. .map_or(Value::Null, |x| x.clone_value())
  745. })
  746. .collect::<Vec<Value>>()
  747. .into()
  748. }
  749. /// Get a key or set a new value for the given key.
  750. pub fn getset(&self, key: &Bytes, value: Value) -> Value {
  751. let mut slot = self.slots[self.get_slot(key)].write();
  752. self.expirations.lock().remove(key);
  753. slot.insert(key.clone(), Entry::new(value, None))
  754. .filter(|x| x.is_valid())
  755. .map_or(Value::Null, |x| x.clone_value())
  756. }
  757. /// Takes an entry from the database.
  758. pub fn getdel(&self, key: &Bytes) -> Value {
  759. let mut slot = self.slots[self.get_slot(key)].write();
  760. slot.remove(key).map_or(Value::Null, |x| {
  761. self.expirations.lock().remove(key);
  762. x.clone_value()
  763. })
  764. }
  765. /// Set a key, value with an optional expiration time
  766. pub fn append(&self, key: &Bytes, value_to_append: &Bytes) -> Result<Value, Error> {
  767. let mut slot = self.slots[self.get_slot(key)].write();
  768. let mut entry = slot.get_mut(key).filter(|x| x.is_valid());
  769. if let Some(entry) = slot.get_mut(key).filter(|x| x.is_valid()) {
  770. match entry.get_mut() {
  771. Value::Blob(value) => {
  772. value.put(value_to_append.as_ref());
  773. Ok(value.len().into())
  774. }
  775. _ => Err(Error::WrongType),
  776. }
  777. } else {
  778. slot.insert(key.clone(), Entry::new(Value::new(value_to_append), None));
  779. Ok(value_to_append.len().into())
  780. }
  781. }
  782. /// Set multiple key/value pairs. Are involved keys are locked exclusively
  783. /// like a transaction.
  784. ///
  785. /// If override_all is set to false, all entries must be new entries or the
  786. /// entire operation fails, in this case 1 or is returned. Otherwise `Ok` is
  787. /// returned.
  788. pub fn multi_set(&self, key_values: &[Bytes], override_all: bool) -> Result<Value, Error> {
  789. if key_values.len() % 2 == 1 {
  790. return Err(Error::Syntax);
  791. }
  792. let keys = key_values
  793. .iter()
  794. .step_by(2)
  795. .cloned()
  796. .collect::<Vec<Bytes>>();
  797. self.lock_keys(&keys);
  798. if !override_all {
  799. for key in keys.iter() {
  800. let slot = self.slots[self.get_slot(key)].read();
  801. if slot.get(key).is_some() {
  802. self.unlock_keys(&keys);
  803. return Ok(0.into());
  804. }
  805. }
  806. }
  807. for (i, _) in key_values.iter().enumerate().step_by(2) {
  808. let mut slot = self.slots[self.get_slot(&key_values[i])].write();
  809. slot.insert(
  810. key_values[i].clone(),
  811. Entry::new(Value::new(&key_values[i + 1]), None),
  812. );
  813. }
  814. self.unlock_keys(&keys);
  815. if override_all {
  816. Ok(Value::Ok)
  817. } else {
  818. Ok(1.into())
  819. }
  820. }
  821. /// Set a key, value with an optional expiration time
  822. pub fn set(&self, key: &Bytes, value: Value, expires_in: Option<Duration>) -> Value {
  823. self.set_advanced(key, value, expires_in, Default::default(), false, false)
  824. }
  825. /// Set a value in the database with various settings
  826. pub fn set_advanced(
  827. &self,
  828. key: &Bytes,
  829. value: Value,
  830. expires_in: Option<Duration>,
  831. override_value: Override,
  832. keep_ttl: bool,
  833. return_previous: bool,
  834. ) -> Value {
  835. let mut slot = self.slots[self.get_slot(key)].write();
  836. let expires_at = expires_in.map(|duration| {
  837. Instant::now()
  838. .checked_add(duration)
  839. .unwrap_or_else(far_future)
  840. });
  841. let previous = slot.get(key).filter(|x| x.is_valid());
  842. let expires_at = if keep_ttl {
  843. if let Some(previous) = previous {
  844. previous.get_ttl()
  845. } else {
  846. expires_at
  847. }
  848. } else {
  849. expires_at
  850. };
  851. let to_return = if return_previous {
  852. let previous = previous.map_or(Value::Null, |v| v.clone_value());
  853. if previous.is_err() {
  854. // Error while trying to clone the previous value to return, we
  855. // must halt and return immediately.
  856. return previous;
  857. }
  858. Some(previous)
  859. } else {
  860. None
  861. };
  862. match override_value {
  863. Override::No => {
  864. if previous.is_some() {
  865. return if let Some(to_return) = to_return {
  866. to_return
  867. } else {
  868. 0.into()
  869. };
  870. }
  871. }
  872. Override::Only => {
  873. if previous.is_none() {
  874. return if let Some(to_return) = to_return {
  875. to_return
  876. } else {
  877. 0.into()
  878. };
  879. }
  880. }
  881. _ => {}
  882. };
  883. if let Some(expires_at) = expires_at {
  884. self.expirations.lock().add(key, expires_at);
  885. } else {
  886. /// Make sure to remove the new key (or replaced) from the
  887. /// expiration table (from any possible past value).
  888. self.expirations.lock().remove(key);
  889. }
  890. slot.insert(key.clone(), Entry::new(value, expires_at));
  891. if let Some(to_return) = to_return {
  892. to_return
  893. } else if override_value == Override::Yes {
  894. Value::Ok
  895. } else {
  896. 1.into()
  897. }
  898. }
  899. /// Returns the TTL of a given key
  900. pub fn ttl(&self, key: &Bytes) -> Option<Option<Instant>> {
  901. let slot = self.slots[self.get_slot(key)].read();
  902. slot.get(key).filter(|x| x.is_valid()).map(|x| x.get_ttl())
  903. }
  904. /// Check whether a given key is in the list of keys to be purged or not.
  905. /// This function is mainly used for unit testing
  906. pub fn is_key_in_expiration_list(&self, key: &Bytes) -> bool {
  907. self.expirations.lock().has(key)
  908. }
  909. /// Remove expired entries from the database.
  910. ///
  911. /// This function should be called from a background thread every few seconds. Calling it more
  912. /// often is a waste of resources.
  913. ///
  914. /// Expired keys are automatically hidden by the database, this process is just claiming back
  915. /// the memory from those expired keys.
  916. pub fn purge(&self) -> u64 {
  917. let mut expirations = self.expirations.lock();
  918. let mut removed = 0;
  919. trace!("Watching {} keys for expirations", expirations.len());
  920. let keys = expirations.get_expired_keys(None);
  921. drop(expirations);
  922. keys.iter()
  923. .map(|key| {
  924. let mut slot = self.slots[self.get_slot(key)].write();
  925. if slot.remove(key).is_some() {
  926. trace!("Removed key {:?} due timeout", key);
  927. removed += 1;
  928. }
  929. })
  930. .for_each(drop);
  931. removed
  932. }
  933. }
  934. impl scan::Scan for Db {
  935. fn scan(
  936. &self,
  937. cursor: Cursor,
  938. pattern: Option<&Bytes>,
  939. count: Option<usize>,
  940. typ: Option<Typ>,
  941. ) -> Result<scan::Result, Error> {
  942. let mut keys = vec![];
  943. let mut slot_id = cursor.bucket as usize;
  944. let mut last_pos = cursor.last_position as usize;
  945. let pattern = pattern
  946. .map(|pattern| {
  947. let pattern = String::from_utf8_lossy(pattern);
  948. Pattern::new(&pattern).map_err(|_| Error::InvalidPattern(pattern.to_string()))
  949. })
  950. .transpose()?;
  951. loop {
  952. let slot = if let Some(value) = self.slots.get(slot_id) {
  953. value.read()
  954. } else {
  955. // We iterated through all the entries, time to signal that to
  956. // the client but returning a "0" cursor.
  957. slot_id = 0;
  958. last_pos = 0;
  959. break;
  960. };
  961. for (key, value) in slot.iter().skip(last_pos) {
  962. if !value.is_valid() {
  963. // Entry still exists in memory but it is not longer valid
  964. // and will soon be gargabe collected.
  965. last_pos += 1;
  966. continue;
  967. }
  968. if let Some(pattern) = &pattern {
  969. let str_key = String::from_utf8_lossy(key);
  970. if !pattern.matches(&str_key) {
  971. last_pos += 1;
  972. continue;
  973. }
  974. }
  975. if let Some(typ) = &typ {
  976. if !typ.is_value_type(value.get()) {
  977. last_pos += 1;
  978. continue;
  979. }
  980. }
  981. keys.push(Value::new(key));
  982. last_pos += 1;
  983. if keys.len() == count.unwrap_or(10) {
  984. break;
  985. }
  986. }
  987. if keys.len() == count.unwrap_or(10) {
  988. break;
  989. }
  990. last_pos = 0;
  991. slot_id += 1;
  992. }
  993. Ok(scan::Result {
  994. cursor: Cursor::new(slot_id as u16, last_pos as u64)?,
  995. result: keys,
  996. })
  997. }
  998. }
  999. #[cfg(test)]
  1000. mod test {
  1001. use super::*;
  1002. use crate::{bytes, db::scan::Scan, value::float::Float};
  1003. use std::str::FromStr;
  1004. #[test]
  1005. fn incr_wrong_type() {
  1006. let db = Db::new(100);
  1007. db.set(&bytes!(b"num"), Value::Blob(bytes!("some string")), None);
  1008. let r = db.incr(&bytes!("num"), 1);
  1009. assert!(r.is_err());
  1010. assert_eq!(Error::NotANumber, r.expect_err("should fail"));
  1011. assert_eq!(Value::Blob(bytes!("some string")), db.get(&bytes!("num")));
  1012. }
  1013. #[test]
  1014. fn incr_blob_float() {
  1015. let db = Db::new(100);
  1016. db.set(&bytes!(b"num"), Value::Blob(bytes!("1.1")), None);
  1017. assert_eq!(Ok(2.2.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
  1018. assert_eq!(Value::Blob(bytes!("2.2")), db.get(&bytes!("num")));
  1019. }
  1020. #[test]
  1021. fn incr_blob_int_float() {
  1022. let db = Db::new(100);
  1023. db.set(&bytes!(b"num"), Value::Blob(bytes!("1")), None);
  1024. assert_eq!(Ok(2.1.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
  1025. assert_eq!(Value::Blob(bytes!("2.1")), db.get(&bytes!("num")));
  1026. }
  1027. #[test]
  1028. fn incr_blob_int() {
  1029. let db = Db::new(100);
  1030. db.set(&bytes!(b"num"), Value::Blob(bytes!("1")), None);
  1031. assert_eq!(Ok(2), db.incr(&bytes!("num"), 1));
  1032. assert_eq!(Value::Blob(bytes!("2")), db.get(&bytes!("num")));
  1033. }
  1034. #[test]
  1035. fn incr_blob_int_set() {
  1036. let db = Db::new(100);
  1037. assert_eq!(Ok(1), db.incr(&bytes!("num"), 1));
  1038. assert_eq!(Value::Blob(bytes!("1")), db.get(&bytes!("num")));
  1039. }
  1040. #[test]
  1041. fn incr_blob_float_set() {
  1042. let db = Db::new(100);
  1043. assert_eq!(Ok(1.1.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
  1044. assert_eq!(Value::Blob(bytes!("1.1")), db.get(&bytes!("num")));
  1045. }
  1046. #[test]
  1047. fn del() {
  1048. let db = Db::new(100);
  1049. db.set(&bytes!(b"expired"), Value::Ok, Some(Duration::from_secs(0)));
  1050. db.set(&bytes!(b"valid"), Value::Ok, None);
  1051. db.set(
  1052. &bytes!(b"expiring"),
  1053. Value::Ok,
  1054. Some(Duration::from_secs(5)),
  1055. );
  1056. assert_eq!(
  1057. Value::Integer(2),
  1058. db.del(&[
  1059. bytes!(b"expired"),
  1060. bytes!(b"valid"),
  1061. bytes!(b"expiring"),
  1062. bytes!(b"not_existing_key")
  1063. ])
  1064. );
  1065. }
  1066. #[test]
  1067. fn ttl() {
  1068. let db = Db::new(100);
  1069. db.set(&bytes!(b"expired"), Value::Ok, Some(Duration::from_secs(0)));
  1070. db.set(&bytes!(b"valid"), Value::Ok, None);
  1071. db.set(
  1072. &bytes!(b"expiring"),
  1073. Value::Ok,
  1074. Some(Duration::from_secs(5)),
  1075. );
  1076. assert_eq!(None, db.ttl(&bytes!(b"expired")));
  1077. assert_eq!(None, db.ttl(&bytes!(b"not_existing_key")));
  1078. assert_eq!(Some(None), db.ttl(&bytes!(b"valid")));
  1079. assert!(match db.ttl(&bytes!(b"expiring")) {
  1080. Some(Some(_)) => true,
  1081. _ => false,
  1082. });
  1083. }
  1084. #[test]
  1085. fn persist_bug() {
  1086. let db = Db::new(100);
  1087. db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(1)));
  1088. assert_eq!(Value::Ok, db.get(&bytes!(b"one")));
  1089. assert!(db.is_key_in_expiration_list(&bytes!(b"one")));
  1090. db.persist(&bytes!(b"one"));
  1091. assert!(!db.is_key_in_expiration_list(&bytes!(b"one")));
  1092. }
  1093. #[test]
  1094. fn purge_keys() {
  1095. let db = Db::new(100);
  1096. db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
  1097. // Expired keys should not be returned, even if they are not yet
  1098. // removed by the purge process.
  1099. assert_eq!(Value::Null, db.get(&bytes!(b"one")));
  1100. // Purge twice
  1101. assert_eq!(1, db.purge());
  1102. assert_eq!(0, db.purge());
  1103. assert_eq!(Value::Null, db.get(&bytes!(b"one")));
  1104. }
  1105. #[test]
  1106. fn replace_purge_keys() {
  1107. let db = Db::new(100);
  1108. db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
  1109. // Expired keys should not be returned, even if they are not yet
  1110. // removed by the purge process.
  1111. assert_eq!(Value::Null, db.get(&bytes!(b"one")));
  1112. db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(5)));
  1113. assert_eq!(Value::Ok, db.get(&bytes!(b"one")));
  1114. // Purge should return 0 as the expired key has been removed already
  1115. assert_eq!(0, db.purge());
  1116. }
  1117. #[test]
  1118. fn scan_skip_expired() {
  1119. let db = Db::new(100);
  1120. db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
  1121. db.set(&bytes!(b"two"), Value::Ok, Some(Duration::from_secs(0)));
  1122. for i in 0u64..20u64 {
  1123. let key: Bytes = i.to_string().into();
  1124. db.set(&key, Value::Ok, None);
  1125. }
  1126. let result = db
  1127. .scan(Cursor::from_str("0").unwrap(), None, None, None)
  1128. .unwrap();
  1129. // first 10 records
  1130. assert_eq!(10, result.result.len());
  1131. // make sure the cursor is valid
  1132. assert_ne!("0", result.cursor.to_string());
  1133. let result = db.scan(result.cursor, None, None, None).unwrap();
  1134. // 10 more records
  1135. assert_eq!(10, result.result.len());
  1136. // make sure the cursor is valid
  1137. assert_ne!("0", result.cursor.to_string());
  1138. let result = db.scan(result.cursor, None, None, None).unwrap();
  1139. // No more results!
  1140. assert_eq!(0, result.result.len());
  1141. assert_eq!("0", result.cursor.to_string());
  1142. }
  1143. #[test]
  1144. fn scan_limit() {
  1145. let db = Db::new(10);
  1146. db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
  1147. db.set(&bytes!(b"two"), Value::Ok, Some(Duration::from_secs(0)));
  1148. for i in 0u64..2000u64 {
  1149. let key: Bytes = i.to_string().into();
  1150. db.set(&key, Value::Ok, None);
  1151. }
  1152. let result = db
  1153. .scan(Cursor::from_str("0").unwrap(), None, Some(2), None)
  1154. .unwrap();
  1155. assert_eq!(2, result.result.len());
  1156. assert_ne!("0", result.cursor.to_string());
  1157. }
  1158. #[test]
  1159. fn scan_filter() {
  1160. let db = Db::new(100);
  1161. db.set(&bytes!(b"fone"), Value::Ok, None);
  1162. db.set(&bytes!(b"ftwo"), Value::Ok, None);
  1163. for i in 0u64..20u64 {
  1164. let key: Bytes = i.to_string().into();
  1165. db.set(&key, Value::Ok, None);
  1166. }
  1167. let result = db
  1168. .scan(
  1169. Cursor::from_str("0").unwrap(),
  1170. Some(&bytes!(b"f*")),
  1171. None,
  1172. None,
  1173. )
  1174. .unwrap();
  1175. assert_eq!(2, result.result.len());
  1176. assert_eq!("0", result.cursor.to_string());
  1177. }
  1178. }