mod.rs 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433
  1. //! # in-memory database
  2. //!
  //! This database module is the core of the miniredis project. All other modules are built
  //! around this database module.
  5. use self::utils::{far_future, ExpirationOpts, Override};
  6. use crate::{
  7. error::Error,
  8. value::{bytes_to_number, cursor::Cursor, typ::Typ, VDebug, Value},
  9. };
  10. use bytes::{BufMut, Bytes, BytesMut};
  11. use entry::{unique_id, Entry};
  12. use expiration::ExpirationDb;
  13. use glob::Pattern;
  14. use log::trace;
  15. use num_traits::CheckedAdd;
  16. use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
  17. use rand::{prelude::SliceRandom, Rng};
  18. use seahash::hash;
  19. use std::{
  20. collections::{HashMap, VecDeque},
  21. convert::{TryFrom, TryInto},
  22. ops::{Deref, DerefMut},
  23. str::FromStr,
  24. sync::Arc,
  25. thread,
  26. };
  27. use tokio::{
  28. sync::broadcast::{self, Receiver, Sender},
  29. time::{Duration, Instant},
  30. };
  31. mod entry;
  32. mod expiration;
  33. pub mod pool;
  34. pub mod scan;
  35. pub(crate) mod utils;
  36. /// Read only reference
  37. pub struct RefValue<'a> {
  38. key: &'a Bytes,
  39. slot: RwLockReadGuard<'a, HashMap<Bytes, Entry>>,
  40. }
  41. impl<'a> RefValue<'a> {
  42. /// Consumes the RefValue and a new cloned Value, only if the Value is
  43. /// scalar, otherwise a WrongType error is returned (casted as a Value)
  44. #[inline(always)]
  45. pub fn into_inner(self) -> Value {
  46. self.slot
  47. .get(self.key)
  48. .filter(|x| x.is_valid())
  49. .map(|x| {
  50. if x.is_scalar() {
  51. x.get().clone()
  52. } else {
  53. Error::WrongType.into()
  54. }
  55. })
  56. .unwrap_or_default()
  57. }
  58. /// Gets an optional reference to the read guarded value
  59. pub fn inner(&self) -> Option<RwLockReadGuard<'_, Value>> {
  60. self.slot
  61. .get(self.key)
  62. .filter(|x| x.is_valid())
  63. .map(|x| x.get())
  64. }
  65. /// Gets an optional reference to the write guarded value
  66. pub fn inner_mut(&self) -> Option<RwLockWriteGuard<'_, Value>> {
  67. self.slot
  68. .get(self.key)
  69. .filter(|x| x.is_valid())
  70. .map(|x| x.get_mut())
  71. }
  72. /// map
  73. ///
  74. /// Maps an `Option<T>` to `Option<U>` by applying a function to a contained
  75. /// value (if `Some`) or returns `None` (if `None`). The RefValue is
  76. /// consumed so the underlying ReadGuard is dropped so slot is free as this
  77. /// function returns
  78. #[inline(always)]
  79. pub fn map<T, F>(self, f: F) -> Option<T>
  80. where
  81. F: FnOnce(&Value) -> T,
  82. {
  83. self.slot.get(self.key).filter(|x| x.is_valid()).map(|x| {
  84. let value = x.get();
  85. f(value.deref())
  86. })
  87. }
  88. /// map_ut
  89. ///
  90. /// Maps an `Option<&mut T>` to `Option<&mut U>` by applying a function to a contained
  91. /// value (if `Some`) or returns `None` (if `None`). The RefValue is
  92. /// consumed so the underlying ReadGuard is dropped so slot is free as this
  93. /// function returns
  94. #[inline(always)]
  95. pub fn map_mut<T, F>(self, f: F) -> Option<T>
  96. where
  97. F: FnOnce(&mut Value) -> T,
  98. {
  99. self.slot.get(self.key).filter(|x| x.is_valid()).map(|x| {
  100. let mut value = x.get_mut();
  101. f(value.deref_mut())
  102. })
  103. }
  104. /// Returns the version of a given key
  105. #[inline(always)]
  106. pub fn version(&self) -> usize {
  107. self.slot
  108. .get(self.key)
  109. .filter(|x| x.is_valid())
  110. .map(|x| x.version())
  111. .unwrap_or_default()
  112. }
  113. }
  114. /// Database structure
  115. ///
  116. /// Each connection has their own clone of the database and the conn_id is stored in each instance.
  117. /// The slots property is shared for all connections.
  118. ///
  119. /// To avoid lock contention this database is *not* a single HashMap, instead it is a vector of
  120. /// HashMaps. Each key is pre-sharded and a bucket is selected. By doing this pre-step instead of
  121. /// locking the entire database, only a small portion is locked (shared or exclusively) at a time,
  122. /// making this database implementation thread-friendly. The number of number_of_slots available cannot be
  123. /// changed at runtime.
  124. ///
  125. /// The database is also aware of other connections locking other keys exclusively (for
  126. /// transactions).
  127. ///
  128. /// Each entry is wrapped with an entry::Entry struct, which is aware of expirations and data
  129. /// versioning (in practice the nanosecond of last modification).
  130. #[derive(Debug)]
  131. pub struct Db {
  132. /// A vector of hashmaps.
  133. ///
  134. /// Instead of having a single HashMap, and having all threads fighting for
  135. /// blocking the single HashMap, we have a vector of N HashMap
  136. /// (configurable), which in theory allow to have faster reads and writes.
  137. ///
  138. /// Because all operations are always key specific, the key is used to hash
  139. /// and select to which HashMap the data might be stored.
  140. slots: Arc<Vec<RwLock<HashMap<Bytes, Entry>>>>,
  141. /// Data structure to store all expiring keys
  142. expirations: Arc<Mutex<ExpirationDb>>,
  143. /// Key changes subscriptions hash. This hash contains all the senders to
  144. /// key subscriptions. If a key does not exists here it means that no-one
  145. /// wants to be notified of the current key changes.
  146. change_subscriptions: Arc<RwLock<HashMap<Bytes, Sender<()>>>>,
  147. /// Number of HashMaps that are available.
  148. number_of_slots: usize,
  149. /// Databases unique ID. This is an internal identifier to avoid deadlocks
  150. /// when copying and moving data between databases.
  151. pub db_id: usize,
  152. /// Current connection ID
  153. ///
  154. /// A Database is attached to a conn_id. The slots and expiration data
  155. /// structures are shared between all connections, regardless of conn_id.
  156. ///
  157. /// This particular database instance is attached to a conn_id, which is used
  158. /// to lock keys exclusively for transactions and other atomic operations.
  159. conn_id: u128,
  160. /// HashMap of all blocked keys by other connections. If a key appears in
  161. /// here and it is not being hold by the current connection, current
  162. /// connection must wait.
  163. tx_key_locks: Arc<RwLock<HashMap<Bytes, u128>>>,
  164. }
  165. impl Db {
  166. /// Creates a new database instance
  167. pub fn new(number_of_slots: usize) -> Self {
  168. let slots = (0..number_of_slots)
  169. .map(|_| RwLock::new(HashMap::new()))
  170. .collect();
  171. Self {
  172. slots: Arc::new(slots),
  173. expirations: Arc::new(Mutex::new(ExpirationDb::new())),
  174. change_subscriptions: Arc::new(RwLock::new(HashMap::new())),
  175. conn_id: 0,
  176. db_id: unique_id(),
  177. tx_key_locks: Arc::new(RwLock::new(HashMap::new())),
  178. number_of_slots,
  179. }
  180. }
  181. /// Attaches a conn_id to a database instance.
  182. ///
  183. /// This conn_id is used to lock entries for a given conn_id.
  184. pub fn set_conn_id(self: Arc<Db>, conn_id: u128) -> Arc<Db> {
  185. Arc::new(Self {
  186. slots: self.slots.clone(),
  187. tx_key_locks: self.tx_key_locks.clone(),
  188. expirations: self.expirations.clone(),
  189. change_subscriptions: self.change_subscriptions.clone(),
  190. conn_id,
  191. db_id: self.db_id,
  192. number_of_slots: self.number_of_slots,
  193. })
  194. }
  195. #[inline]
  196. /// Returns a slot where a key may be hosted.
  197. ///
  198. /// In order to avoid too much locks, instead of having a single hash a
  199. /// database instance is a set of hashes. Each key is pre-shared with a
  200. /// quick hashing algorithm to select a 'slot' or HashMap where it may be
  201. /// hosted.
  202. fn get_slot(&self, key: &Bytes) -> usize {
  203. let id = (hash(key) as usize) % self.number_of_slots;
  204. trace!("selected slot {} for key {:?}", id, key);
  205. let waiting = Duration::from_nanos(100);
  206. while let Some(blocker) = self.tx_key_locks.read().get(key) {
  207. // Loop while the key we are trying to access is being blocked by a
  208. // connection in a transaction
  209. if *blocker == self.conn_id {
  210. // the key is being blocked by ourself, it is safe to break the
  211. // waiting loop
  212. break;
  213. }
  214. thread::sleep(waiting);
  215. }
  216. id
  217. }
  218. /// Locks keys exclusively
  219. ///
  220. /// The locked keys are only accessible (read or write) by the connection
  221. /// that locked them, any other connection must wait until the locking
  222. /// connection releases them.
  223. ///
  224. /// This is used to simulate redis transactions. Transaction in Redis are
  225. /// atomic but pausing a multi threaded Redis just to keep the same promises
  226. /// was a bit extreme, that's the reason why a transaction will lock
  227. /// exclusively all keys involved.
  228. pub fn lock_keys(&self, keys: &[Bytes]) {
  229. let waiting = Duration::from_nanos(100);
  230. loop {
  231. let mut lock = self.tx_key_locks.write();
  232. let mut i = 0;
  233. for key in keys.iter() {
  234. if let Some(blocker) = lock.get(key) {
  235. if *blocker == self.conn_id {
  236. // It is blocked by us already.
  237. i += 1;
  238. continue;
  239. }
  240. // It is blocked by another tx, we need to break
  241. // and retry to gain the lock over this key
  242. break;
  243. }
  244. lock.insert(key.clone(), self.conn_id);
  245. i += 1;
  246. }
  247. if i == keys.len() {
  248. // All the involved keys are successfully being blocked
  249. // exclusively.
  250. break;
  251. }
  252. // We need to sleep a bit and retry.
  253. drop(lock);
  254. thread::sleep(waiting);
  255. }
  256. }
  257. /// Releases the lock on keys
  258. pub fn unlock_keys(&self, keys: &[Bytes]) {
  259. let mut lock = self.tx_key_locks.write();
  260. for key in keys.iter() {
  261. lock.remove(key);
  262. }
  263. }
  264. /// Return debug info for a key
  265. pub fn debug(&self, key: &Bytes) -> Result<VDebug, Error> {
  266. let slot = self.slots[self.get_slot(key)].read();
  267. slot.get(key)
  268. .filter(|x| x.is_valid())
  269. .map(|x| x.get().debug())
  270. .ok_or(Error::NotFound)
  271. }
  272. /// Return the digest for each key. This used for testing only
  273. pub fn digest(&self, keys: &[Bytes]) -> Result<Vec<Value>, Error> {
  274. Ok(keys
  275. .iter()
  276. .map(|key| {
  277. let slot = self.slots[self.get_slot(key)].read();
  278. Value::new(
  279. slot.get(key)
  280. .filter(|v| v.is_valid())
  281. .map(|v| hex::encode(v.digest()))
  282. .unwrap_or("00000".into())
  283. .as_bytes(),
  284. )
  285. })
  286. .collect::<Vec<Value>>())
  287. }
  288. /// Flushes the entire database
  289. pub fn flushdb(&self) -> Result<Value, Error> {
  290. self.expirations.lock().flush();
  291. self.slots
  292. .iter()
  293. .map(|s| {
  294. let mut s = s.write();
  295. s.clear();
  296. })
  297. .for_each(drop);
  298. Ok(Value::Ok)
  299. }
  300. /// Checks if the database is empty
  301. pub fn is_empty(&self) -> bool {
  302. for slot in self.slots.iter() {
  303. if slot.read().len() > 0 {
  304. return false;
  305. }
  306. }
  307. true
  308. }
  309. /// Returns the number of elements in the database
  310. pub fn len(&self) -> Result<usize, Error> {
  311. self.purge();
  312. Ok(self.slots.iter().map(|s| s.read().len()).sum())
  313. }
  314. /// Round numbers to store efficiently, specially float numbers. For instance `1.00` will be converted to `1`.
  315. fn round_numbers<T>(number: T) -> Bytes
  316. where
  317. T: ToString,
  318. {
  319. let number_to_str = number.to_string();
  320. if number_to_str.find('.').is_none() {
  321. return Bytes::copy_from_slice(number_to_str.as_bytes());
  322. }
  323. let number_to_str = number_to_str
  324. .trim_end_matches(|c| c == '0' || c == '.')
  325. .to_string();
  326. Bytes::copy_from_slice(if number_to_str.is_empty() {
  327. b"0"
  328. } else {
  329. number_to_str.as_bytes()
  330. })
  331. }
  332. // Converts a given number to a correct Value, it should be used with Self::round_numbers()
  333. fn number_to_value(number: &[u8]) -> Result<Value, Error> {
  334. if number.iter().any(|x| *x == b'.') {
  335. Ok(Value::new(number))
  336. } else {
  337. Ok(Value::Integer(bytes_to_number(number)?))
  338. }
  339. }
  340. /// Increment a sub-key in a hash
  341. ///
  342. /// If the stored value cannot be converted into a number an error will be thrown
  343. pub fn hincrby<T>(
  344. &self,
  345. key: &Bytes,
  346. sub_key: &Bytes,
  347. incr_by: &Bytes,
  348. typ: &str,
  349. ) -> Result<Value, Error>
  350. where
  351. T: ToString
  352. + FromStr
  353. + CheckedAdd
  354. + for<'a> TryFrom<&'a Value, Error = Error>
  355. + Into<Value>
  356. + Copy,
  357. {
  358. let slot_id = self.get_slot(key);
  359. let slot = self.slots[slot_id].read();
  360. let mut incr_by: T =
  361. bytes_to_number(incr_by).map_err(|_| Error::NotANumberType(typ.to_owned()))?;
  362. if let Some(x) = slot
  363. .get(key)
  364. .filter(|x| x.is_valid())
  365. .map(|x| x.get_mut())
  366. .map(|mut x| match x.deref_mut() {
  367. Value::Hash(ref mut h) => {
  368. if let Some(n) = h.get(sub_key) {
  369. incr_by = incr_by
  370. .checked_add(
  371. &bytes_to_number(n)
  372. .map_err(|_| Error::NotANumberType(typ.to_owned()))?,
  373. )
  374. .ok_or(Error::Overflow)?;
  375. }
  376. let incr_by_bytes = Self::round_numbers(incr_by);
  377. h.insert(sub_key.clone(), incr_by_bytes.clone());
  378. Self::number_to_value(&incr_by_bytes)
  379. }
  380. _ => Err(Error::WrongType),
  381. })
  382. {
  383. return x;
  384. }
  385. drop(slot);
  386. #[allow(clippy::mutable_key_type)]
  387. let mut h = HashMap::new();
  388. let incr_by_bytes = Self::round_numbers(incr_by);
  389. h.insert(sub_key.clone(), incr_by_bytes.clone());
  390. let _ = self.slots[slot_id]
  391. .write()
  392. .insert(key.clone(), Entry::new(h.into(), None));
  393. Self::number_to_value(&incr_by_bytes)
  394. }
  395. /// Increments a key's value by a given number
  396. ///
  397. /// If the stored value cannot be converted into a number an error will be
  398. /// thrown.
  399. pub fn incr<T>(&self, key: &Bytes, incr_by: T) -> Result<T, Error>
  400. where
  401. T: ToString + CheckedAdd + for<'a> TryFrom<&'a Value, Error = Error> + Into<Value> + Copy,
  402. {
  403. let slot_id = self.get_slot(key);
  404. let slot = self.slots[slot_id].read();
  405. if let Some(entry) = slot.get(key).filter(|x| x.is_valid()) {
  406. if !entry.is_scalar() {
  407. return Err(Error::WrongType);
  408. }
  409. let mut value = entry.get_mut();
  410. let mut number: T = (&*value).try_into()?;
  411. number = incr_by.checked_add(&number).ok_or(Error::Overflow)?;
  412. *value = Value::Blob(Self::round_numbers(number));
  413. entry.bump_version();
  414. Ok(number)
  415. } else {
  416. drop(slot);
  417. self.slots[slot_id].write().insert(
  418. key.clone(),
  419. Entry::new(Value::Blob(Self::round_numbers(incr_by)), None),
  420. );
  421. Ok(incr_by)
  422. }
  423. }
  424. /// Removes any expiration associated with a given key
  425. pub fn persist(&self, key: &Bytes) -> Value {
  426. let slot = self.slots[self.get_slot(key)].read();
  427. slot.get(key)
  428. .filter(|x| x.is_valid())
  429. .map_or(0.into(), |x| {
  430. if x.has_ttl() {
  431. self.expirations.lock().remove(key);
  432. x.persist();
  433. 1.into()
  434. } else {
  435. 0.into()
  436. }
  437. })
  438. }
  439. /// Set time to live for a given key
  440. pub fn set_ttl(
  441. &self,
  442. key: &Bytes,
  443. expires_in: Duration,
  444. opts: ExpirationOpts,
  445. ) -> Result<Value, Error> {
  446. if opts.if_none && (opts.replace_only || opts.greater_than || opts.lower_than) {
  447. return Err(Error::OptsNotCompatible("NX and XX, GT or LT".to_owned()));
  448. }
  449. if opts.greater_than && opts.lower_than {
  450. return Err(Error::OptsNotCompatible("GT and LT".to_owned()));
  451. }
  452. let slot = self.slots[self.get_slot(key)].read();
  453. let expires_at = Instant::now()
  454. .checked_add(expires_in)
  455. .unwrap_or_else(far_future);
  456. Ok(slot
  457. .get(key)
  458. .filter(|x| x.is_valid())
  459. .map_or(0.into(), |x| {
  460. let current_expire = x.get_ttl();
  461. if opts.if_none && current_expire.is_some() {
  462. return 0.into();
  463. }
  464. if opts.replace_only && current_expire.is_none() {
  465. return 0.into();
  466. }
  467. if opts.greater_than {
  468. if let Some(current_expire) = current_expire {
  469. if expires_at <= current_expire {
  470. return 0.into();
  471. }
  472. } else {
  473. return 0.into();
  474. }
  475. }
  476. if opts.lower_than {
  477. if let Some(current_expire) = current_expire {
  478. if expires_at >= current_expire {
  479. return 0.into();
  480. }
  481. }
  482. }
  483. self.expirations.lock().add(key, expires_at);
  484. x.set_ttl(expires_at);
  485. 1.into()
  486. }))
  487. }
  488. /// Overwrites part of the string stored at key, starting at the specified
  489. /// offset, for the entire length of value. If the offset is larger than the
  490. /// current length of the string at key, the string is padded with zero-bytes to
  491. /// make offset fit. Non-existing keys are considered as empty strings, so this
  492. /// command will make sure it holds a string large enough to be able to set
  493. /// value at offset.
  494. pub fn set_range(&self, key: &Bytes, offset: i128, data: &[u8]) -> Result<Value, Error> {
  495. let slot_id = self.get_slot(key);
  496. let slot = self.slots[slot_id].read();
  497. let mut value = slot
  498. .get(key)
  499. .map(|value| {
  500. value.ensure_blob_is_mutable()?;
  501. if !value.is_valid() {
  502. self.expirations.lock().remove(key);
  503. value.persist();
  504. }
  505. Ok::<_, Error>(value.get_mut())
  506. })
  507. .transpose()?;
  508. if offset < 0 {
  509. return Err(Error::OutOfRange);
  510. }
  511. if offset >= 512 * 1024 * 1024 - 4 {
  512. return Err(Error::MaxAllowedSize);
  513. }
  514. let length = offset as usize + data.len();
  515. if let Some(value) = value.as_mut() {
  516. match value.deref_mut() {
  517. Value::BlobRw(ref mut bytes) => {
  518. if bytes.capacity() < length {
  519. bytes.resize(length, 0);
  520. }
  521. let writer = &mut bytes[offset as usize..length];
  522. writer.copy_from_slice(data);
  523. Ok(bytes.len().into())
  524. }
  525. _ => Err(Error::WrongType),
  526. }
  527. } else {
  528. drop(value);
  529. drop(slot);
  530. if data.is_empty() {
  531. return Ok(0.into());
  532. }
  533. let mut bytes = BytesMut::new();
  534. bytes.resize(length, 0);
  535. let writer = &mut bytes[offset as usize..];
  536. writer.copy_from_slice(data);
  537. self.slots[slot_id]
  538. .write()
  539. .insert(key.clone(), Entry::new(Value::new(&bytes), None));
  540. Ok(bytes.len().into())
  541. }
  542. }
  543. /// Copies a key
  544. pub fn copy(
  545. &self,
  546. source: Bytes,
  547. target: Bytes,
  548. replace: Override,
  549. target_db: Option<Arc<Db>>,
  550. ) -> Result<bool, Error> {
  551. let slot = self.slots[self.get_slot(&source)].read();
  552. let value = if let Some(value) = slot.get(&source).filter(|x| x.is_valid()) {
  553. value.clone()
  554. } else {
  555. return Ok(false);
  556. };
  557. drop(slot);
  558. if let Some(db) = target_db {
  559. if db.db_id == self.db_id && source == target {
  560. return Err(Error::SameEntry);
  561. }
  562. if replace == Override::No && db.exists(&[target.clone()]) > 0 {
  563. return Ok(false);
  564. }
  565. let ttl = value.get_ttl().map(|v| v - Instant::now());
  566. let _ = db.set_advanced(target, value.take_value(), ttl, replace, false, false);
  567. Ok(true)
  568. } else {
  569. if source == target {
  570. return Err(Error::SameEntry);
  571. }
  572. if replace == Override::No && self.exists(&[target.clone()]) > 0 {
  573. return Ok(false);
  574. }
  575. let mut slot = self.slots[self.get_slot(&target)].write();
  576. slot.insert(target, value);
  577. Ok(true)
  578. }
  579. }
  580. /// Moves a given key between databases
  581. pub fn move_key(&self, source: Bytes, target_db: Arc<Db>) -> Result<bool, Error> {
  582. if self.db_id == target_db.db_id {
  583. return Err(Error::SameEntry);
  584. }
  585. let mut slot = self.slots[self.get_slot(&source)].write();
  586. let (expires_in, value) = if let Some(value) = slot.get(&source).filter(|v| v.is_valid()) {
  587. (
  588. value.get_ttl().map(|t| t - Instant::now()),
  589. value.get().clone(),
  590. )
  591. } else {
  592. return Ok(false);
  593. };
  594. if Value::Integer(1)
  595. == target_db.set_advanced(
  596. source.clone(),
  597. value,
  598. expires_in,
  599. Override::No,
  600. false,
  601. false,
  602. )
  603. {
  604. slot.remove(&source);
  605. Ok(true)
  606. } else {
  607. Ok(false)
  608. }
  609. }
  610. /// Return a random key from the database
  611. pub fn randomkey(&self) -> Result<Value, Error> {
  612. let mut rng = rand::thread_rng();
  613. let mut candidates = self
  614. .slots
  615. .iter()
  616. .filter_map(|slot| {
  617. let slot = slot.read();
  618. if slot.is_empty() {
  619. None
  620. } else {
  621. slot.iter()
  622. .nth(rng.gen_range(0..slot.len()))
  623. .map(|(k, _v)| k.clone())
  624. }
  625. })
  626. .collect::<Vec<Bytes>>();
  627. candidates.shuffle(&mut rng);
  628. Ok(candidates.first().into())
  629. }
  630. /// Renames a key
  631. pub fn rename(
  632. &self,
  633. source: &Bytes,
  634. target: &Bytes,
  635. override_value: Override,
  636. ) -> Result<bool, Error> {
  637. let slot1 = self.get_slot(source);
  638. let slot2 = self.get_slot(target);
  639. let result = if slot1 == slot2 {
  640. let mut slot = self.slots[slot1].write();
  641. if override_value == Override::No && slot.get(target).is_some() {
  642. return Ok(false);
  643. }
  644. if let Some(value) = slot.remove(source) {
  645. slot.insert(target.clone(), value);
  646. Ok(true)
  647. } else {
  648. Err(Error::NotFound)
  649. }
  650. } else {
  651. let mut slot1 = self.slots[slot1].write();
  652. let mut slot2 = self.slots[slot2].write();
  653. if override_value == Override::No && slot2.get(target).is_some() {
  654. return Ok(false);
  655. }
  656. if let Some(value) = slot1.remove(source) {
  657. slot2.insert(target.clone(), value);
  658. Ok(true)
  659. } else {
  660. Err(Error::NotFound)
  661. }
  662. };
  663. if result.is_ok() {
  664. self.bump_version(source);
  665. self.bump_version(target);
  666. }
  667. result
  668. }
  669. /// Removes keys from the database
  670. pub fn del(&self, keys: &[Bytes]) -> Value {
  671. let mut expirations = self.expirations.lock();
  672. keys.iter()
  673. .filter_map(|key| {
  674. expirations.remove(key);
  675. self.slots[self.get_slot(key)].write().remove(key)
  676. })
  677. .filter(|key| key.is_valid())
  678. .count()
  679. .into()
  680. }
  681. /// Returns all keys that matches a given pattern. This is a very expensive command.
  682. pub fn get_all_keys(&self, pattern: &Bytes) -> Result<Vec<Value>, Error> {
  683. let pattern = String::from_utf8_lossy(pattern);
  684. let pattern =
  685. Pattern::new(&pattern).map_err(|_| Error::InvalidPattern(pattern.to_string()))?;
  686. Ok(self
  687. .slots
  688. .iter()
  689. .flat_map(|slot| {
  690. slot.read()
  691. .keys()
  692. .filter(|key| {
  693. let str_key = String::from_utf8_lossy(key);
  694. pattern.matches(&str_key)
  695. })
  696. .map(|key| Value::new(key))
  697. .collect::<Vec<Value>>()
  698. })
  699. .collect())
  700. }
  701. /// Check if keys exists in the database
  702. pub fn exists(&self, keys: &[Bytes]) -> usize {
  703. let mut matches = 0;
  704. keys.iter()
  705. .map(|key| {
  706. let slot = self.slots[self.get_slot(key)].read();
  707. if let Some(key) = slot.get(key) {
  708. matches += if key.is_valid() { 1 } else { 0 };
  709. }
  710. })
  711. .for_each(drop);
  712. matches
  713. }
  714. /// Updates the entry version of a given key
  715. pub fn bump_version(&self, key: &Bytes) -> bool {
  716. let slot = self.slots[self.get_slot(key)].read();
  717. let to_return = slot
  718. .get(key)
  719. .filter(|x| x.is_valid())
  720. .map(|entry| {
  721. entry.bump_version();
  722. })
  723. .is_some();
  724. drop(slot);
  725. if to_return {
  726. let senders = self.change_subscriptions.read();
  727. if let Some(sender) = senders.get(key) {
  728. if sender.receiver_count() == 0 {
  729. // Garbage collection
  730. drop(senders);
  731. self.change_subscriptions.write().remove(key);
  732. } else {
  733. // Notify
  734. let _ = sender.send(());
  735. }
  736. }
  737. }
  738. to_return
  739. }
  740. /// Subscribe to key changes.
  741. pub fn subscribe_to_key_changes(&self, keys: &[Bytes]) -> Vec<Receiver<()>> {
  742. let mut subscriptions = self.change_subscriptions.write();
  743. keys.iter()
  744. .map(|key| {
  745. if let Some(sender) = subscriptions.get(key) {
  746. sender.subscribe()
  747. } else {
  748. let (sender, receiver) = broadcast::channel(1);
  749. subscriptions.insert(key.clone(), sender);
  750. receiver
  751. }
  752. })
  753. .collect()
  754. }
  755. /// Returns the name of the value type
  756. pub fn get_data_type(&self, key: &Bytes) -> String {
  757. let slot = self.slots[self.get_slot(key)].read();
  758. slot.get(key)
  759. .filter(|x| x.is_valid())
  760. .map_or("none".to_owned(), |x| {
  761. Typ::get_type(&x.get()).to_string().to_lowercase()
  762. })
  763. }
  764. /// Get a ref value
  765. pub fn get<'a>(&'a self, key: &'a Bytes) -> RefValue<'a> {
  766. RefValue {
  767. slot: self.slots[self.get_slot(key)].read(),
  768. key,
  769. }
  770. }
  771. /// Get a copy of an entry and modifies the expiration of the key
  772. pub fn getex(&self, key: &Bytes, expires_in: Option<Duration>, make_persistent: bool) -> Value {
  773. let slot = self.slots[self.get_slot(key)].read();
  774. slot.get(key)
  775. .filter(|x| x.is_valid())
  776. .map(|value| {
  777. if make_persistent {
  778. self.expirations.lock().remove(key);
  779. value.persist();
  780. } else if let Some(expires_in) = expires_in {
  781. let expires_at = Instant::now()
  782. .checked_add(expires_in)
  783. .unwrap_or_else(far_future);
  784. self.expirations.lock().add(key, expires_at);
  785. value.set_ttl(expires_at);
  786. }
  787. value
  788. })
  789. .map_or(Value::Null, |x| x.clone_value())
  790. }
  791. /// Get multiple copies of entries
  792. pub fn get_multi(&self, keys: VecDeque<Bytes>) -> Value {
  793. keys.iter()
  794. .map(|key| {
  795. let slot = self.slots[self.get_slot(key)].read();
  796. slot.get(key)
  797. .filter(|x| x.is_valid() && x.is_scalar())
  798. .map_or(Value::Null, |x| x.clone_value())
  799. })
  800. .collect::<Vec<Value>>()
  801. .into()
  802. }
  803. /// Get a key or set a new value for the given key.
  804. pub fn getset(&self, key: &Bytes, value: Value) -> Value {
  805. let mut slot = self.slots[self.get_slot(key)].write();
  806. self.expirations.lock().remove(key);
  807. slot.insert(key.clone(), Entry::new(value, None))
  808. .filter(|x| x.is_valid())
  809. .map_or(Value::Null, |x| x.clone_value())
  810. }
  811. /// Takes an entry from the database.
  812. pub fn getdel(&self, key: &Bytes) -> Value {
  813. let mut slot = self.slots[self.get_slot(key)].write();
  814. slot.remove(key).map_or(Value::Null, |x| {
  815. self.expirations.lock().remove(key);
  816. x.clone_value()
  817. })
  818. }
  819. /// Set a key, value with an optional expiration time
  820. pub fn append(&self, key: &Bytes, value_to_append: &Bytes) -> Result<Value, Error> {
  821. let slot = self.slots[self.get_slot(key)].read();
  822. if let Some(entry) = slot.get(key).filter(|x| x.is_valid()) {
  823. entry.ensure_blob_is_mutable()?;
  824. match *entry.get_mut() {
  825. Value::BlobRw(ref mut value) => {
  826. value.put(value_to_append.as_ref());
  827. Ok(value.len().into())
  828. }
  829. _ => Err(Error::WrongType),
  830. }
  831. } else {
  832. drop(slot);
  833. let mut slot = self.slots[self.get_slot(key)].write();
  834. slot.insert(key.clone(), Entry::new(Value::new(value_to_append), None));
  835. Ok(value_to_append.len().into())
  836. }
  837. }
  838. /// Set multiple key/value pairs. Are involved keys are locked exclusively
  839. /// like a transaction.
  840. ///
  841. /// If override_all is set to false, all entries must be new entries or the
  842. /// entire operation fails, in this case 1 or is returned. Otherwise `Ok` is
  843. /// returned.
  844. pub fn multi_set(
  845. &self,
  846. key_values: VecDeque<Bytes>,
  847. override_all: bool,
  848. ) -> Result<Value, Error> {
  849. if key_values.len() % 2 == 1 {
  850. return Err(Error::Syntax);
  851. }
  852. let mut keys = vec![];
  853. let mut values = vec![];
  854. key_values.into_iter().enumerate().for_each(|(key, val)| {
  855. if key % 2 == 0 {
  856. keys.push(val);
  857. } else {
  858. values.push(val);
  859. }
  860. });
  861. let to_lock = keys.clone();
  862. self.lock_keys(&to_lock);
  863. if !override_all {
  864. for key in keys.iter() {
  865. let slot = self.slots[self.get_slot(key)].read();
  866. if slot.get(key).is_some() {
  867. self.unlock_keys(&keys);
  868. return Ok(0.into());
  869. }
  870. }
  871. }
  872. let mut values = values.into_iter();
  873. for key in keys.into_iter() {
  874. let mut slot = self.slots[self.get_slot(&key)].write();
  875. if let Some(value) = values.next() {
  876. slot.insert(key, Entry::new(Value::Blob(value), None));
  877. }
  878. }
  879. self.unlock_keys(&to_lock);
  880. if override_all {
  881. Ok(Value::Ok)
  882. } else {
  883. Ok(1.into())
  884. }
  885. }
  886. /// Set a key, value with an optional expiration time
  887. pub fn set(&self, key: Bytes, value: Value, expires_in: Option<Duration>) -> Value {
  888. self.set_advanced(key, value, expires_in, Default::default(), false, false)
  889. }
  890. /// Set a value in the database with various settings
  891. pub fn set_advanced(
  892. &self,
  893. key: Bytes,
  894. value: Value,
  895. expires_in: Option<Duration>,
  896. override_value: Override,
  897. keep_ttl: bool,
  898. return_previous: bool,
  899. ) -> Value {
  900. let mut slot = self.slots[self.get_slot(&key)].write();
  901. let expires_at = expires_in.map(|duration| {
  902. Instant::now()
  903. .checked_add(duration)
  904. .unwrap_or_else(far_future)
  905. });
  906. let previous = slot.get(&key).filter(|x| x.is_valid());
  907. let expires_at = if keep_ttl {
  908. if let Some(previous) = previous {
  909. previous.get_ttl()
  910. } else {
  911. expires_at
  912. }
  913. } else {
  914. expires_at
  915. };
  916. let to_return = if return_previous {
  917. let previous = previous.map_or(Value::Null, |v| v.clone_value());
  918. if previous.is_err() {
  919. // Error while trying to clone the previous value to return, we
  920. // must halt and return immediately.
  921. return previous;
  922. }
  923. Some(previous)
  924. } else {
  925. None
  926. };
  927. match override_value {
  928. Override::No => {
  929. if previous.is_some() {
  930. return if let Some(to_return) = to_return {
  931. to_return
  932. } else {
  933. 0.into()
  934. };
  935. }
  936. }
  937. Override::Only => {
  938. if previous.is_none() {
  939. return if let Some(to_return) = to_return {
  940. to_return
  941. } else {
  942. 0.into()
  943. };
  944. }
  945. }
  946. _ => {}
  947. };
  948. if let Some(expires_at) = expires_at {
  949. self.expirations.lock().add(&key, expires_at);
  950. } else {
  951. // Make sure to remove the new key (or replaced) from the
  952. // expiration table (from any possible past value).
  953. self.expirations.lock().remove(&key);
  954. }
  955. slot.insert(key, Entry::new(value, expires_at));
  956. if let Some(to_return) = to_return {
  957. to_return
  958. } else if override_value == Override::Yes {
  959. Value::Ok
  960. } else {
  961. 1.into()
  962. }
  963. }
  964. /// Returns the TTL of a given key
  965. pub fn ttl(&self, key: &Bytes) -> Option<Option<Instant>> {
  966. let slot = self.slots[self.get_slot(key)].read();
  967. slot.get(key).filter(|x| x.is_valid()).map(|x| x.get_ttl())
  968. }
  969. /// Check whether a given key is in the list of keys to be purged or not.
  970. /// This function is mainly used for unit testing
  971. pub fn is_key_in_expiration_list(&self, key: &Bytes) -> bool {
  972. self.expirations.lock().has(key)
  973. }
  974. /// Remove expired entries from the database.
  975. ///
  976. /// This function should be called from a background thread every few seconds. Calling it more
  977. /// often is a waste of resources.
  978. ///
  979. /// Expired keys are automatically hidden by the database, this process is just claiming back
  980. /// the memory from those expired keys.
  981. pub fn purge(&self) -> u64 {
  982. let mut expirations = self.expirations.lock();
  983. let mut removed = 0;
  984. trace!("Watching {} keys for expirations", expirations.len());
  985. let keys = expirations.get_expired_keys(None);
  986. drop(expirations);
  987. keys.iter()
  988. .map(|key| {
  989. let mut slot = self.slots[self.get_slot(key)].write();
  990. if slot.remove(key).is_some() {
  991. trace!("Removed key {:?} due timeout", key);
  992. removed += 1;
  993. }
  994. })
  995. .for_each(drop);
  996. removed
  997. }
  998. }
  999. impl scan::Scan for Db {
  1000. fn scan(
  1001. &self,
  1002. cursor: Cursor,
  1003. pattern: Option<Bytes>,
  1004. count: Option<usize>,
  1005. typ: Option<Typ>,
  1006. ) -> Result<scan::Result, Error> {
  1007. let mut keys = vec![];
  1008. let mut slot_id = cursor.bucket as usize;
  1009. let mut last_pos = cursor.last_position as usize;
  1010. let pattern = pattern
  1011. .map(|pattern| {
  1012. let pattern = String::from_utf8_lossy(&pattern);
  1013. Pattern::new(&pattern).map_err(|_| Error::InvalidPattern(pattern.to_string()))
  1014. })
  1015. .transpose()?;
  1016. loop {
  1017. let slot = if let Some(value) = self.slots.get(slot_id) {
  1018. value.read()
  1019. } else {
  1020. // We iterated through all the entries, time to signal that to
  1021. // the client but returning a "0" cursor.
  1022. slot_id = 0;
  1023. last_pos = 0;
  1024. break;
  1025. };
  1026. for (key, value) in slot.iter().skip(last_pos) {
  1027. if !value.is_valid() {
  1028. // Entry still exists in memory but it is not longer valid
  1029. // and will soon be gargabe collected.
  1030. last_pos += 1;
  1031. continue;
  1032. }
  1033. if let Some(pattern) = &pattern {
  1034. let str_key = String::from_utf8_lossy(key);
  1035. if !pattern.matches(&str_key) {
  1036. last_pos += 1;
  1037. continue;
  1038. }
  1039. }
  1040. if let Some(typ) = &typ {
  1041. if !typ.is_value_type(&value.get()) {
  1042. last_pos += 1;
  1043. continue;
  1044. }
  1045. }
  1046. keys.push(Value::new(key));
  1047. last_pos += 1;
  1048. if keys.len() == count.unwrap_or(10) {
  1049. break;
  1050. }
  1051. }
  1052. if keys.len() == count.unwrap_or(10) {
  1053. break;
  1054. }
  1055. last_pos = 0;
  1056. slot_id += 1;
  1057. }
  1058. Ok(scan::Result {
  1059. cursor: Cursor::new(slot_id as u16, last_pos as u64)?,
  1060. result: keys,
  1061. })
  1062. }
  1063. }
  #[cfg(test)]
  mod test {
      use super::*;
      use crate::{bytes, db::scan::Scan, value::float::Float};
      use std::str::FromStr;

      /// Stores `Value::Ok`, without expiration, under the decimal text of
      /// every number in `0..total`.
      fn fill_numeric_keys(db: &Db, total: u64) {
          for number in 0..total {
              let key: Bytes = number.to_string().into();
              db.set(key, Value::Ok, None);
          }
      }

      /// Incrementing a blob that is not a number fails and leaves it untouched.
      #[test]
      fn incr_wrong_type() {
          let db = Db::new(100);
          db.set(bytes!(b"num"), Value::Blob(bytes!("some string")), None);

          let outcome = db.incr(&bytes!("num"), 1);

          assert!(outcome.is_err());
          assert_eq!(Error::NotANumber, outcome.expect_err("should fail"));
          assert_eq!(
              Value::Blob(bytes!("some string")),
              db.get(&bytes!("num")).into_inner()
          );
      }

      /// A float stored as a blob can be incremented by a float.
      #[test]
      fn incr_blob_float() {
          let db = Db::new(100);
          db.set(bytes!(b"num"), Value::Blob(bytes!("1.1")), None);

          assert_eq!(Ok(2.2.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
          assert_eq!(
              Value::Blob(bytes!("2.2")),
              db.get(&bytes!("num")).into_inner()
          );
      }

      /// An integer stored as a blob can be incremented by a float.
      #[test]
      fn incr_blob_int_float() {
          let db = Db::new(100);
          db.set(bytes!(b"num"), Value::Blob(bytes!("1")), None);

          assert_eq!(Ok(2.1.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
          assert_eq!(
              Value::Blob(bytes!("2.1")),
              db.get(&bytes!("num")).into_inner()
          );
      }

      /// An integer stored as a blob can be incremented by an integer.
      #[test]
      fn incr_blob_int() {
          let db = Db::new(100);
          db.set(bytes!(b"num"), Value::Blob(bytes!("1")), None);

          assert_eq!(Ok(2), db.incr(&bytes!("num"), 1));
          assert_eq!(
              Value::Blob(bytes!("2")),
              db.get(&bytes!("num")).into_inner()
          );
      }

      /// Incrementing a missing key by an integer creates it.
      #[test]
      fn incr_blob_int_set() {
          let db = Db::new(100);

          assert_eq!(Ok(1), db.incr(&bytes!("num"), 1));
          assert_eq!(
              Value::Blob(bytes!("1")),
              db.get(&bytes!("num")).into_inner()
          );
      }

      /// Incrementing a missing key by a float creates it.
      #[test]
      fn incr_blob_float_set() {
          let db = Db::new(100);

          assert_eq!(Ok(1.1.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
          assert_eq!(
              Value::Blob(bytes!("1.1")),
              db.get(&bytes!("num")).into_inner()
          );
      }

      /// Only the two keys that are still valid count as deleted.
      #[test]
      fn del() {
          let db = Db::new(100);
          db.set(bytes!(b"expired"), Value::Ok, Some(Duration::from_secs(0)));
          db.set(bytes!(b"valid"), Value::Ok, None);
          db.set(bytes!(b"expiring"), Value::Ok, Some(Duration::from_secs(5)));

          let targets = [
              bytes!(b"expired"),
              bytes!(b"valid"),
              bytes!(b"expiring"),
              bytes!(b"not_existing_key"),
          ];

          assert_eq!(Value::Integer(2), db.del(&targets));
      }

      /// `ttl` tells apart missing, persistent and expiring keys.
      #[test]
      fn ttl() {
          let db = Db::new(100);
          db.set(bytes!(b"expired"), Value::Ok, Some(Duration::from_secs(0)));
          db.set(bytes!(b"valid"), Value::Ok, None);
          db.set(bytes!(b"expiring"), Value::Ok, Some(Duration::from_secs(5)));

          assert_eq!(None, db.ttl(&bytes!(b"expired")));
          assert_eq!(None, db.ttl(&bytes!(b"not_existing_key")));
          assert_eq!(Some(None), db.ttl(&bytes!(b"valid")));
          assert!(matches!(db.ttl(&bytes!(b"expiring")), Some(Some(_))));
      }

      /// Persisting a key takes it out of the expiration list.
      #[test]
      fn persist_bug() {
          let db = Db::new(100);
          db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(1)));

          assert_eq!(Value::Ok, db.get(&bytes!(b"one")).into_inner());
          assert!(db.is_key_in_expiration_list(&bytes!(b"one")));

          db.persist(&bytes!(b"one"));

          assert!(!db.is_key_in_expiration_list(&bytes!(b"one")));
      }

      /// An expired key is hidden right away and reclaimed by the first purge.
      #[test]
      fn purge_keys() {
          let db = Db::new(100);
          db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));

          // Hidden from readers even though the purge process has not removed
          // it yet.
          assert_eq!(Value::Null, db.get(&bytes!(b"one")).into_inner());

          // The first purge reclaims it, the second has nothing left to do.
          assert_eq!(1, db.purge());
          assert_eq!(0, db.purge());
          assert_eq!(Value::Null, db.get(&bytes!(b"one")).into_inner());
      }

      /// Replacing an expired key before the purge leaves nothing to purge.
      #[test]
      fn replace_purge_keys() {
          let db = Db::new(100);
          db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));

          // Hidden from readers even though the purge process has not removed
          // it yet.
          assert_eq!(Value::Null, db.get(&bytes!(b"one")).into_inner());

          db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(5)));
          assert_eq!(Value::Ok, db.get(&bytes!(b"one")).into_inner());

          // The expired entry was already replaced, so purge removes nothing.
          assert_eq!(0, db.purge());
      }

      /// Scanning returns the 20 valid keys in two pages and skips expired ones.
      #[test]
      fn scan_skip_expired() {
          let db = Db::new(100);
          db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
          db.set(bytes!(b"two"), Value::Ok, Some(Duration::from_secs(0)));
          fill_numeric_keys(&db, 20);

          let first_page = db
              .scan(Cursor::from_str("0").unwrap(), None, None, None)
              .unwrap();
          // First 10 records, and the iteration is not finished.
          assert_eq!(10, first_page.result.len());
          assert_ne!("0", first_page.cursor.to_string());

          let second_page = db.scan(first_page.cursor, None, None, None).unwrap();
          // 10 more records, and the iteration is still not finished.
          assert_eq!(10, second_page.result.len());
          assert_ne!("0", second_page.cursor.to_string());

          let last_page = db.scan(second_page.cursor, None, None, None).unwrap();
          // No more results!
          assert_eq!(0, last_page.result.len());
          assert_eq!("0", last_page.cursor.to_string());
      }

      /// An explicit count limits the size of a page.
      #[test]
      fn scan_limit() {
          let db = Db::new(10);
          db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
          db.set(bytes!(b"two"), Value::Ok, Some(Duration::from_secs(0)));
          fill_numeric_keys(&db, 2000);

          let page = db
              .scan(Cursor::from_str("0").unwrap(), None, Some(2), None)
              .unwrap();

          assert_eq!(2, page.result.len());
          assert_ne!("0", page.cursor.to_string());
      }

      /// A pattern restricts the scan to the matching keys.
      #[test]
      fn scan_filter() {
          let db = Db::new(100);
          db.set(bytes!(b"fone"), Value::Ok, None);
          db.set(bytes!(b"ftwo"), Value::Ok, None);
          fill_numeric_keys(&db, 20);

          let page = db
              .scan(
                  Cursor::from_str("0").unwrap(),
                  Some(bytes!(b"f*")),
                  None,
                  None,
              )
              .unwrap();

          assert_eq!(2, page.result.len());
          assert_eq!("0", page.cursor.to_string());
      }

      /// Three connections contend for the same key; the shared counter only
      /// reaches 6 if they ran in the expected order.
      #[tokio::test]
      async fn lock_keys() {
          let db1 = Arc::new(Db::new(100));
          let db2 = db1.clone().set_conn_id(2);
          let db3 = db1.clone().set_conn_id(3);
          let counter = Arc::new(RwLock::new(1));
          let counter1 = counter.clone();
          let counter2 = counter.clone();
          let counter3 = counter.clone();

          let _ = tokio::join!(
              tokio::spawn(async move {
                  db1.lock_keys(&["test".into()]);
                  let mut guard = counter1.write();
                  *guard = 2;
                  thread::sleep(Duration::from_secs(1));
                  db1.unlock_keys(&["test".into()]);
              }),
              tokio::spawn(async move {
                  db2.lock_keys(&["test".into(), "bar".into()]);
                  let mut guard = counter2.write();
                  if *guard == 2 {
                      *guard = 5;
                  }
                  thread::sleep(Duration::from_secs(2));
                  db2.unlock_keys(&["test".into(), "bar".into()]);
              }),
              tokio::spawn(async move {
                  thread::sleep(Duration::from_millis(500));
                  assert_eq!(4, db3.get_slot(&"test".into()));
                  let mut guard = counter3.write();
                  if *guard == 5 {
                      *guard = 6;
                  }
              }),
          );

          assert_eq!(6, *counter.read());
      }
  }