1
0

mod.rs 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424
  1. //! # in-memory database
  2. //!
  3. //! This database module is the core of the miniredis project. All other modules around this
  4. //! database module.
  5. use self::utils::{far_future, ExpirationOpts, Override};
  6. use crate::{
  7. error::Error,
  8. value::{bytes_to_number, cursor::Cursor, typ::Typ, VDebug, Value},
  9. };
  10. use bytes::{BufMut, Bytes, BytesMut};
  11. use entry::{unique_id, Entry};
  12. use expiration::ExpirationDb;
  13. use glob::Pattern;
  14. use log::trace;
  15. use num_traits::CheckedAdd;
  16. use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
  17. use rand::{prelude::SliceRandom, Rng};
  18. use seahash::hash;
  19. use std::{
  20. collections::{HashMap, VecDeque},
  21. convert::{TryFrom, TryInto},
  22. ops::{Deref, DerefMut},
  23. str::FromStr,
  24. sync::Arc,
  25. thread,
  26. };
  27. use tokio::{
  28. sync::broadcast::{self, Receiver, Sender},
  29. time::{Duration, Instant},
  30. };
  31. mod entry;
  32. mod expiration;
  33. pub mod pool;
  34. pub mod scan;
  35. pub(crate) mod utils;
  36. /// Read only reference
  37. pub struct RefValue<'a> {
  38. key: &'a Bytes,
  39. slot: RwLockReadGuard<'a, HashMap<Bytes, Entry>>,
  40. }
  41. impl<'a> RefValue<'a> {
  42. /// test
  43. #[inline(always)]
  44. pub fn into_inner(self) -> Value {
  45. self.slot
  46. .get(self.key)
  47. .filter(|x| x.is_valid())
  48. .map(|x| {
  49. if x.is_scalar() {
  50. x.get().clone()
  51. } else {
  52. Error::WrongType.into()
  53. }
  54. })
  55. .unwrap_or_default()
  56. }
  57. /// test
  58. pub fn inner(&self) -> Option<RwLockReadGuard<'_, Value>> {
  59. self.slot
  60. .get(self.key)
  61. .filter(|x| x.is_valid())
  62. .map(|x| x.get())
  63. }
  64. /// test
  65. pub fn inner_mut(&self) -> Option<RwLockWriteGuard<'_, Value>> {
  66. self.slot
  67. .get(self.key)
  68. .filter(|x| x.is_valid())
  69. .map(|x| x.get_mut())
  70. }
  71. /// map_mut
  72. #[inline(always)]
  73. pub fn map<T, F>(self, f: F) -> Option<T>
  74. where
  75. F: FnOnce(&Value) -> T,
  76. {
  77. self.slot.get(self.key).filter(|x| x.is_valid()).map(|x| {
  78. let value = x.get();
  79. f(value.deref())
  80. })
  81. }
  82. /// map_mut
  83. #[inline(always)]
  84. pub fn map_mut<T, F>(self, f: F) -> Option<T>
  85. where
  86. F: FnOnce(&mut Value) -> T,
  87. {
  88. self.slot.get(self.key).filter(|x| x.is_valid()).map(|x| {
  89. let mut value = x.get_mut();
  90. f(value.deref_mut())
  91. })
  92. }
  93. /// Returns the version of a given key
  94. #[inline(always)]
  95. pub fn version(&self) -> usize {
  96. self.slot
  97. .get(self.key)
  98. .filter(|x| x.is_valid())
  99. .map(|x| x.version())
  100. .unwrap_or_default()
  101. }
  102. }
  103. /// Database structure
  104. ///
  105. /// Each connection has their own clone of the database and the conn_id is stored in each instance.
  106. /// The slots property is shared for all connections.
  107. ///
  108. /// To avoid lock contention this database is *not* a single HashMap, instead it is a vector of
  109. /// HashMaps. Each key is pre-sharded and a bucket is selected. By doing this pre-step instead of
  110. /// locking the entire database, only a small portion is locked (shared or exclusively) at a time,
  111. /// making this database implementation thread-friendly. The number of number_of_slots available cannot be
  112. /// changed at runtime.
  113. ///
  114. /// The database is also aware of other connections locking other keys exclusively (for
  115. /// transactions).
  116. ///
  117. /// Each entry is wrapped with an entry::Entry struct, which is aware of expirations and data
  118. /// versioning (in practice the nanosecond of last modification).
  119. #[derive(Debug)]
  120. pub struct Db {
  121. /// A vector of hashmaps.
  122. ///
  123. /// Instead of having a single HashMap, and having all threads fighting for
  124. /// blocking the single HashMap, we have a vector of N HashMap
  125. /// (configurable), which in theory allow to have faster reads and writes.
  126. ///
  127. /// Because all operations are always key specific, the key is used to hash
  128. /// and select to which HashMap the data might be stored.
  129. slots: Arc<Vec<RwLock<HashMap<Bytes, Entry>>>>,
  130. /// Data structure to store all expiring keys
  131. expirations: Arc<Mutex<ExpirationDb>>,
  132. /// Key changes subscriptions hash. This hash contains all the senders to
  133. /// key subscriptions. If a key does not exists here it means that no-one
  134. /// wants to be notified of the current key changes.
  135. change_subscriptions: Arc<RwLock<HashMap<Bytes, Sender<()>>>>,
  136. /// Number of HashMaps that are available.
  137. number_of_slots: usize,
  138. /// Databases unique ID. This is an internal identifier to avoid deadlocks
  139. /// when copying and moving data between databases.
  140. pub db_id: usize,
  141. /// Current connection ID
  142. ///
  143. /// A Database is attached to a conn_id. The slots and expiration data
  144. /// structures are shared between all connections, regardless of conn_id.
  145. ///
  146. /// This particular database instance is attached to a conn_id, which is used
  147. /// to lock keys exclusively for transactions and other atomic operations.
  148. conn_id: u128,
  149. /// HashMap of all blocked keys by other connections. If a key appears in
  150. /// here and it is not being hold by the current connection, current
  151. /// connection must wait.
  152. tx_key_locks: Arc<RwLock<HashMap<Bytes, u128>>>,
  153. }
  154. impl Db {
  155. /// Creates a new database instance
  156. pub fn new(number_of_slots: usize) -> Self {
  157. let slots = (0..number_of_slots)
  158. .map(|_| RwLock::new(HashMap::new()))
  159. .collect();
  160. Self {
  161. slots: Arc::new(slots),
  162. expirations: Arc::new(Mutex::new(ExpirationDb::new())),
  163. change_subscriptions: Arc::new(RwLock::new(HashMap::new())),
  164. conn_id: 0,
  165. db_id: unique_id(),
  166. tx_key_locks: Arc::new(RwLock::new(HashMap::new())),
  167. number_of_slots,
  168. }
  169. }
  170. /// Creates a new Database instance bound to a connection.
  171. ///
  172. /// This is particular useful when locking keys exclusively.
  173. ///
  174. /// All the internal data are shjared through an Arc.
  175. pub fn new_db_instance(self: Arc<Db>, conn_id: u128) -> Arc<Db> {
  176. Arc::new(Self {
  177. slots: self.slots.clone(),
  178. tx_key_locks: self.tx_key_locks.clone(),
  179. expirations: self.expirations.clone(),
  180. change_subscriptions: self.change_subscriptions.clone(),
  181. conn_id,
  182. db_id: self.db_id,
  183. number_of_slots: self.number_of_slots,
  184. })
  185. }
  186. #[inline]
  187. /// Returns a slot where a key may be hosted.
  188. ///
  189. /// In order to avoid too much locks, instead of having a single hash a
  190. /// database instance is a set of hashes. Each key is pre-shared with a
  191. /// quick hashing algorithm to select a 'slot' or HashMap where it may be
  192. /// hosted.
  193. fn get_slot(&self, key: &Bytes) -> usize {
  194. let id = (hash(key) as usize) % self.number_of_slots;
  195. trace!("selected slot {} for key {:?}", id, key);
  196. let waiting = Duration::from_nanos(100);
  197. while let Some(blocker) = self.tx_key_locks.read().get(key) {
  198. // Loop while the key we are trying to access is being blocked by a
  199. // connection in a transaction
  200. if *blocker == self.conn_id {
  201. // the key is being blocked by ourself, it is safe to break the
  202. // waiting loop
  203. break;
  204. }
  205. thread::sleep(waiting);
  206. }
  207. id
  208. }
  209. /// Locks keys exclusively
  210. ///
  211. /// The locked keys are only accessible (read or write) by the connection
  212. /// that locked them, any other connection must wait until the locking
  213. /// connection releases them.
  214. ///
  215. /// This is used to simulate redis transactions. Transaction in Redis are
  216. /// atomic but pausing a multi threaded Redis just to keep the same promises
  217. /// was a bit extreme, that's the reason why a transaction will lock
  218. /// exclusively all keys involved.
  219. pub fn lock_keys(&self, keys: &[Bytes]) {
  220. let waiting = Duration::from_nanos(100);
  221. loop {
  222. let mut lock = self.tx_key_locks.write();
  223. let mut i = 0;
  224. for key in keys.iter() {
  225. if let Some(blocker) = lock.get(key) {
  226. if *blocker == self.conn_id {
  227. // It is blocked by us already.
  228. i += 1;
  229. continue;
  230. }
  231. // It is blocked by another tx, we need to break
  232. // and retry to gain the lock over this key
  233. break;
  234. }
  235. lock.insert(key.clone(), self.conn_id);
  236. i += 1;
  237. }
  238. if i == keys.len() {
  239. // All the involved keys are successfully being blocked
  240. // exclusively.
  241. break;
  242. }
  243. // We need to sleep a bit and retry.
  244. drop(lock);
  245. thread::sleep(waiting);
  246. }
  247. }
  248. /// Releases the lock on keys
  249. pub fn unlock_keys(&self, keys: &[Bytes]) {
  250. let mut lock = self.tx_key_locks.write();
  251. for key in keys.iter() {
  252. lock.remove(key);
  253. }
  254. }
  255. /// Return debug info for a key
  256. pub fn debug(&self, key: &Bytes) -> Result<VDebug, Error> {
  257. let slot = self.slots[self.get_slot(key)].read();
  258. slot.get(key)
  259. .filter(|x| x.is_valid())
  260. .map(|x| x.get().debug())
  261. .ok_or(Error::NotFound)
  262. }
  263. /// Return the digest for each key. This used for testing only
  264. pub fn digest(&self, keys: &[Bytes]) -> Result<Vec<Value>, Error> {
  265. Ok(keys
  266. .iter()
  267. .map(|key| {
  268. let slot = self.slots[self.get_slot(key)].read();
  269. Value::new(
  270. slot.get(key)
  271. .filter(|v| v.is_valid())
  272. .map(|v| hex::encode(v.digest()))
  273. .unwrap_or("00000".into())
  274. .as_bytes(),
  275. )
  276. })
  277. .collect::<Vec<Value>>())
  278. }
  279. /// Flushes the entire database
  280. pub fn flushdb(&self) -> Result<Value, Error> {
  281. self.expirations.lock().flush();
  282. self.slots
  283. .iter()
  284. .map(|s| {
  285. let mut s = s.write();
  286. s.clear();
  287. })
  288. .for_each(drop);
  289. Ok(Value::Ok)
  290. }
  291. /// Checks if the database is empty
  292. pub fn is_empty(&self) -> bool {
  293. for slot in self.slots.iter() {
  294. if slot.read().len() > 0 {
  295. return false;
  296. }
  297. }
  298. true
  299. }
  300. /// Returns the number of elements in the database
  301. pub fn len(&self) -> Result<usize, Error> {
  302. self.purge();
  303. Ok(self.slots.iter().map(|s| s.read().len()).sum())
  304. }
  305. /// Round numbers to store efficiently, specially float numbers. For instance `1.00` will be converted to `1`.
  306. fn round_numbers<T>(number: T) -> Bytes
  307. where
  308. T: ToString,
  309. {
  310. let number_to_str = number.to_string();
  311. if number_to_str.find('.').is_none() {
  312. return Bytes::copy_from_slice(number_to_str.as_bytes());
  313. }
  314. let number_to_str = number_to_str
  315. .trim_end_matches(|c| c == '0' || c == '.')
  316. .to_string();
  317. Bytes::copy_from_slice(if number_to_str.is_empty() {
  318. b"0"
  319. } else {
  320. number_to_str.as_bytes()
  321. })
  322. }
  323. // Converts a given number to a correct Value, it should be used with Self::round_numbers()
  324. fn number_to_value(number: &[u8]) -> Result<Value, Error> {
  325. if number.iter().any(|x| *x == b'.') {
  326. Ok(Value::new(number))
  327. } else {
  328. Ok(Value::Integer(bytes_to_number(number)?))
  329. }
  330. }
  331. /// Increment a sub-key in a hash
  332. ///
  333. /// If the stored value cannot be converted into a number an error will be thrown
  334. pub fn hincrby<T>(
  335. &self,
  336. key: &Bytes,
  337. sub_key: &Bytes,
  338. incr_by: &Bytes,
  339. typ: &str,
  340. ) -> Result<Value, Error>
  341. where
  342. T: ToString
  343. + FromStr
  344. + CheckedAdd
  345. + for<'a> TryFrom<&'a Value, Error = Error>
  346. + Into<Value>
  347. + Copy,
  348. {
  349. let slot_id = self.get_slot(key);
  350. let slot = self.slots[slot_id].read();
  351. let mut incr_by: T =
  352. bytes_to_number(incr_by).map_err(|_| Error::NotANumberType(typ.to_owned()))?;
  353. if let Some(x) = slot
  354. .get(key)
  355. .filter(|x| x.is_valid())
  356. .map(|x| x.get_mut())
  357. .map(|mut x| match x.deref_mut() {
  358. Value::Hash(ref mut h) => {
  359. if let Some(n) = h.get(sub_key) {
  360. incr_by = incr_by
  361. .checked_add(
  362. &bytes_to_number(n)
  363. .map_err(|_| Error::NotANumberType(typ.to_owned()))?,
  364. )
  365. .ok_or(Error::Overflow)?;
  366. }
  367. let incr_by_bytes = Self::round_numbers(incr_by);
  368. h.insert(sub_key.clone(), incr_by_bytes.clone());
  369. Self::number_to_value(&incr_by_bytes)
  370. }
  371. _ => Err(Error::WrongType),
  372. })
  373. {
  374. return x;
  375. }
  376. drop(slot);
  377. #[allow(clippy::mutable_key_type)]
  378. let mut h = HashMap::new();
  379. let incr_by_bytes = Self::round_numbers(incr_by);
  380. h.insert(sub_key.clone(), incr_by_bytes.clone());
  381. let _ = self.slots[slot_id]
  382. .write()
  383. .insert(key.clone(), Entry::new(h.into(), None));
  384. Self::number_to_value(&incr_by_bytes)
  385. }
  386. /// Increments a key's value by a given number
  387. ///
  388. /// If the stored value cannot be converted into a number an error will be
  389. /// thrown.
  390. pub fn incr<T>(&self, key: &Bytes, incr_by: T) -> Result<T, Error>
  391. where
  392. T: ToString + CheckedAdd + for<'a> TryFrom<&'a Value, Error = Error> + Into<Value> + Copy,
  393. {
  394. let slot_id = self.get_slot(key);
  395. let slot = self.slots[slot_id].read();
  396. if let Some(entry) = slot.get(key).filter(|x| x.is_valid()) {
  397. if !entry.is_scalar() {
  398. return Err(Error::WrongType);
  399. }
  400. let mut value = entry.get_mut();
  401. let mut number: T = (&*value).try_into()?;
  402. number = incr_by.checked_add(&number).ok_or(Error::Overflow)?;
  403. *value = Value::Blob(Self::round_numbers(number));
  404. entry.bump_version();
  405. Ok(number)
  406. } else {
  407. drop(slot);
  408. self.slots[slot_id].write().insert(
  409. key.clone(),
  410. Entry::new(Value::Blob(Self::round_numbers(incr_by)), None),
  411. );
  412. Ok(incr_by)
  413. }
  414. }
  415. /// Removes any expiration associated with a given key
  416. pub fn persist(&self, key: &Bytes) -> Value {
  417. let slot = self.slots[self.get_slot(key)].read();
  418. slot.get(key)
  419. .filter(|x| x.is_valid())
  420. .map_or(0.into(), |x| {
  421. if x.has_ttl() {
  422. self.expirations.lock().remove(key);
  423. x.persist();
  424. 1.into()
  425. } else {
  426. 0.into()
  427. }
  428. })
  429. }
  430. /// Set time to live for a given key
  431. pub fn set_ttl(
  432. &self,
  433. key: &Bytes,
  434. expires_in: Duration,
  435. opts: ExpirationOpts,
  436. ) -> Result<Value, Error> {
  437. if opts.if_none && (opts.replace_only || opts.greater_than || opts.lower_than) {
  438. return Err(Error::OptsNotCompatible("NX and XX, GT or LT".to_owned()));
  439. }
  440. if opts.greater_than && opts.lower_than {
  441. return Err(Error::OptsNotCompatible("GT and LT".to_owned()));
  442. }
  443. let slot = self.slots[self.get_slot(key)].read();
  444. let expires_at = Instant::now()
  445. .checked_add(expires_in)
  446. .unwrap_or_else(far_future);
  447. Ok(slot
  448. .get(key)
  449. .filter(|x| x.is_valid())
  450. .map_or(0.into(), |x| {
  451. let current_expire = x.get_ttl();
  452. if opts.if_none && current_expire.is_some() {
  453. return 0.into();
  454. }
  455. if opts.replace_only && current_expire.is_none() {
  456. return 0.into();
  457. }
  458. if opts.greater_than {
  459. if let Some(current_expire) = current_expire {
  460. if expires_at <= current_expire {
  461. return 0.into();
  462. }
  463. } else {
  464. return 0.into();
  465. }
  466. }
  467. if opts.lower_than {
  468. if let Some(current_expire) = current_expire {
  469. if expires_at >= current_expire {
  470. return 0.into();
  471. }
  472. }
  473. }
  474. self.expirations.lock().add(key, expires_at);
  475. x.set_ttl(expires_at);
  476. 1.into()
  477. }))
  478. }
  479. /// Overwrites part of the string stored at key, starting at the specified
  480. /// offset, for the entire length of value. If the offset is larger than the
  481. /// current length of the string at key, the string is padded with zero-bytes to
  482. /// make offset fit. Non-existing keys are considered as empty strings, so this
  483. /// command will make sure it holds a string large enough to be able to set
  484. /// value at offset.
  485. pub fn set_range(&self, key: &Bytes, offset: i128, data: &[u8]) -> Result<Value, Error> {
  486. let slot_id = self.get_slot(key);
  487. let slot = self.slots[slot_id].read();
  488. let mut value = slot
  489. .get(key)
  490. .map(|value| {
  491. value.ensure_blob_is_mutable()?;
  492. if !value.is_valid() {
  493. self.expirations.lock().remove(key);
  494. value.persist();
  495. }
  496. Ok::<_, Error>(value.get_mut())
  497. })
  498. .transpose()?;
  499. if offset < 0 {
  500. return Err(Error::OutOfRange);
  501. }
  502. if offset >= 512 * 1024 * 1024 - 4 {
  503. return Err(Error::MaxAllowedSize);
  504. }
  505. let length = offset as usize + data.len();
  506. if let Some(value) = value.as_mut() {
  507. match value.deref_mut() {
  508. Value::BlobRw(ref mut bytes) => {
  509. if bytes.capacity() < length {
  510. bytes.resize(length, 0);
  511. }
  512. let writer = &mut bytes[offset as usize..length];
  513. writer.copy_from_slice(data);
  514. Ok(bytes.len().into())
  515. }
  516. _ => Err(Error::WrongType),
  517. }
  518. } else {
  519. drop(value);
  520. drop(slot);
  521. if data.is_empty() {
  522. return Ok(0.into());
  523. }
  524. let mut bytes = BytesMut::new();
  525. bytes.resize(length, 0);
  526. let writer = &mut bytes[offset as usize..];
  527. writer.copy_from_slice(data);
  528. self.slots[slot_id]
  529. .write()
  530. .insert(key.clone(), Entry::new(Value::new(&bytes), None));
  531. Ok(bytes.len().into())
  532. }
  533. }
  534. /// Copies a key
  535. pub fn copy(
  536. &self,
  537. source: Bytes,
  538. target: Bytes,
  539. replace: Override,
  540. target_db: Option<Arc<Db>>,
  541. ) -> Result<bool, Error> {
  542. let slot = self.slots[self.get_slot(&source)].read();
  543. let value = if let Some(value) = slot.get(&source).filter(|x| x.is_valid()) {
  544. value.clone()
  545. } else {
  546. return Ok(false);
  547. };
  548. drop(slot);
  549. if let Some(db) = target_db {
  550. if db.db_id == self.db_id && source == target {
  551. return Err(Error::SameEntry);
  552. }
  553. if replace == Override::No && db.exists(&[target.clone()]) > 0 {
  554. return Ok(false);
  555. }
  556. let ttl = value.get_ttl().map(|v| v - Instant::now());
  557. let _ = db.set_advanced(target, value.take_value(), ttl, replace, false, false);
  558. Ok(true)
  559. } else {
  560. if source == target {
  561. return Err(Error::SameEntry);
  562. }
  563. if replace == Override::No && self.exists(&[target.clone()]) > 0 {
  564. return Ok(false);
  565. }
  566. let mut slot = self.slots[self.get_slot(&target)].write();
  567. slot.insert(target, value);
  568. Ok(true)
  569. }
  570. }
  571. /// Moves a given key between databases
  572. pub fn move_key(&self, source: Bytes, target_db: Arc<Db>) -> Result<bool, Error> {
  573. if self.db_id == target_db.db_id {
  574. return Err(Error::SameEntry);
  575. }
  576. let mut slot = self.slots[self.get_slot(&source)].write();
  577. let (expires_in, value) = if let Some(value) = slot.get(&source).filter(|v| v.is_valid()) {
  578. (
  579. value.get_ttl().map(|t| t - Instant::now()),
  580. value.get().clone(),
  581. )
  582. } else {
  583. return Ok(false);
  584. };
  585. if Value::Integer(1)
  586. == target_db.set_advanced(
  587. source.clone(),
  588. value,
  589. expires_in,
  590. Override::No,
  591. false,
  592. false,
  593. )
  594. {
  595. slot.remove(&source);
  596. Ok(true)
  597. } else {
  598. Ok(false)
  599. }
  600. }
  601. /// Return a random key from the database
  602. pub fn randomkey(&self) -> Result<Value, Error> {
  603. let mut rng = rand::thread_rng();
  604. let mut candidates = self
  605. .slots
  606. .iter()
  607. .filter_map(|slot| {
  608. let slot = slot.read();
  609. if slot.is_empty() {
  610. None
  611. } else {
  612. slot.iter()
  613. .nth(rng.gen_range(0..slot.len()))
  614. .map(|(k, _v)| k.clone())
  615. }
  616. })
  617. .collect::<Vec<Bytes>>();
  618. candidates.shuffle(&mut rng);
  619. Ok(candidates.first().into())
  620. }
  621. /// Renames a key
  622. pub fn rename(
  623. &self,
  624. source: &Bytes,
  625. target: &Bytes,
  626. override_value: Override,
  627. ) -> Result<bool, Error> {
  628. let slot1 = self.get_slot(source);
  629. let slot2 = self.get_slot(target);
  630. let result = if slot1 == slot2 {
  631. let mut slot = self.slots[slot1].write();
  632. if override_value == Override::No && slot.get(target).is_some() {
  633. return Ok(false);
  634. }
  635. if let Some(value) = slot.remove(source) {
  636. slot.insert(target.clone(), value);
  637. Ok(true)
  638. } else {
  639. Err(Error::NotFound)
  640. }
  641. } else {
  642. let mut slot1 = self.slots[slot1].write();
  643. let mut slot2 = self.slots[slot2].write();
  644. if override_value == Override::No && slot2.get(target).is_some() {
  645. return Ok(false);
  646. }
  647. if let Some(value) = slot1.remove(source) {
  648. slot2.insert(target.clone(), value);
  649. Ok(true)
  650. } else {
  651. Err(Error::NotFound)
  652. }
  653. };
  654. if result.is_ok() {
  655. self.bump_version(source);
  656. self.bump_version(target);
  657. }
  658. result
  659. }
  660. /// Removes keys from the database
  661. pub fn del(&self, keys: &[Bytes]) -> Value {
  662. let mut expirations = self.expirations.lock();
  663. keys.iter()
  664. .filter_map(|key| {
  665. expirations.remove(key);
  666. self.slots[self.get_slot(key)].write().remove(key)
  667. })
  668. .filter(|key| key.is_valid())
  669. .count()
  670. .into()
  671. }
  672. /// Returns all keys that matches a given pattern. This is a very expensive command.
  673. pub fn get_all_keys(&self, pattern: &Bytes) -> Result<Vec<Value>, Error> {
  674. let pattern = String::from_utf8_lossy(pattern);
  675. let pattern =
  676. Pattern::new(&pattern).map_err(|_| Error::InvalidPattern(pattern.to_string()))?;
  677. Ok(self
  678. .slots
  679. .iter()
  680. .flat_map(|slot| {
  681. slot.read()
  682. .keys()
  683. .filter(|key| {
  684. let str_key = String::from_utf8_lossy(key);
  685. pattern.matches(&str_key)
  686. })
  687. .map(|key| Value::new(key))
  688. .collect::<Vec<Value>>()
  689. })
  690. .collect())
  691. }
  692. /// Check if keys exists in the database
  693. pub fn exists(&self, keys: &[Bytes]) -> usize {
  694. let mut matches = 0;
  695. keys.iter()
  696. .map(|key| {
  697. let slot = self.slots[self.get_slot(key)].read();
  698. if let Some(key) = slot.get(key) {
  699. matches += if key.is_valid() { 1 } else { 0 };
  700. }
  701. })
  702. .for_each(drop);
  703. matches
  704. }
  705. /// Updates the entry version of a given key
  706. pub fn bump_version(&self, key: &Bytes) -> bool {
  707. let slot = self.slots[self.get_slot(key)].read();
  708. let to_return = slot
  709. .get(key)
  710. .filter(|x| x.is_valid())
  711. .map(|entry| {
  712. entry.bump_version();
  713. })
  714. .is_some();
  715. drop(slot);
  716. if to_return {
  717. let senders = self.change_subscriptions.read();
  718. if let Some(sender) = senders.get(key) {
  719. if sender.receiver_count() == 0 {
  720. // Garbage collection
  721. drop(senders);
  722. self.change_subscriptions.write().remove(key);
  723. } else {
  724. // Notify
  725. let _ = sender.send(());
  726. }
  727. }
  728. }
  729. to_return
  730. }
  731. /// Subscribe to key changes.
  732. pub fn subscribe_to_key_changes(&self, keys: &[Bytes]) -> Vec<Receiver<()>> {
  733. let mut subscriptions = self.change_subscriptions.write();
  734. keys.iter()
  735. .map(|key| {
  736. if let Some(sender) = subscriptions.get(key) {
  737. sender.subscribe()
  738. } else {
  739. let (sender, receiver) = broadcast::channel(1);
  740. subscriptions.insert(key.clone(), sender);
  741. receiver
  742. }
  743. })
  744. .collect()
  745. }
  746. /// Returns the name of the value type
  747. pub fn get_data_type(&self, key: &Bytes) -> String {
  748. let slot = self.slots[self.get_slot(key)].read();
  749. slot.get(key)
  750. .filter(|x| x.is_valid())
  751. .map_or("none".to_owned(), |x| {
  752. Typ::get_type(&x.get()).to_string().to_lowercase()
  753. })
  754. }
  755. /// Get a ref value
  756. pub fn get<'a>(&'a self, key: &'a Bytes) -> RefValue<'a> {
  757. RefValue {
  758. slot: self.slots[self.get_slot(key)].read(),
  759. key,
  760. }
  761. }
  762. /// Get a copy of an entry and modifies the expiration of the key
  763. pub fn getex(&self, key: &Bytes, expires_in: Option<Duration>, make_persistent: bool) -> Value {
  764. let slot = self.slots[self.get_slot(key)].read();
  765. slot.get(key)
  766. .filter(|x| x.is_valid())
  767. .map(|value| {
  768. if make_persistent {
  769. self.expirations.lock().remove(key);
  770. value.persist();
  771. } else if let Some(expires_in) = expires_in {
  772. let expires_at = Instant::now()
  773. .checked_add(expires_in)
  774. .unwrap_or_else(far_future);
  775. self.expirations.lock().add(key, expires_at);
  776. value.set_ttl(expires_at);
  777. }
  778. value
  779. })
  780. .map_or(Value::Null, |x| x.clone_value())
  781. }
  782. /// Get multiple copies of entries
  783. pub fn get_multi(&self, keys: VecDeque<Bytes>) -> Value {
  784. keys.iter()
  785. .map(|key| {
  786. let slot = self.slots[self.get_slot(key)].read();
  787. slot.get(key)
  788. .filter(|x| x.is_valid() && x.is_scalar())
  789. .map_or(Value::Null, |x| x.clone_value())
  790. })
  791. .collect::<Vec<Value>>()
  792. .into()
  793. }
  794. /// Get a key or set a new value for the given key.
  795. pub fn getset(&self, key: &Bytes, value: Value) -> Value {
  796. let mut slot = self.slots[self.get_slot(key)].write();
  797. self.expirations.lock().remove(key);
  798. slot.insert(key.clone(), Entry::new(value, None))
  799. .filter(|x| x.is_valid())
  800. .map_or(Value::Null, |x| x.clone_value())
  801. }
  802. /// Takes an entry from the database.
  803. pub fn getdel(&self, key: &Bytes) -> Value {
  804. let mut slot = self.slots[self.get_slot(key)].write();
  805. slot.remove(key).map_or(Value::Null, |x| {
  806. self.expirations.lock().remove(key);
  807. x.clone_value()
  808. })
  809. }
  810. /// Set a key, value with an optional expiration time
  811. pub fn append(&self, key: &Bytes, value_to_append: &Bytes) -> Result<Value, Error> {
  812. let slot = self.slots[self.get_slot(key)].read();
  813. if let Some(entry) = slot.get(key).filter(|x| x.is_valid()) {
  814. entry.ensure_blob_is_mutable()?;
  815. match *entry.get_mut() {
  816. Value::BlobRw(ref mut value) => {
  817. value.put(value_to_append.as_ref());
  818. Ok(value.len().into())
  819. }
  820. _ => Err(Error::WrongType),
  821. }
  822. } else {
  823. drop(slot);
  824. let mut slot = self.slots[self.get_slot(key)].write();
  825. slot.insert(key.clone(), Entry::new(Value::new(value_to_append), None));
  826. Ok(value_to_append.len().into())
  827. }
  828. }
  829. /// Set multiple key/value pairs. Are involved keys are locked exclusively
  830. /// like a transaction.
  831. ///
  832. /// If override_all is set to false, all entries must be new entries or the
  833. /// entire operation fails, in this case 1 or is returned. Otherwise `Ok` is
  834. /// returned.
  835. pub fn multi_set(
  836. &self,
  837. key_values: VecDeque<Bytes>,
  838. override_all: bool,
  839. ) -> Result<Value, Error> {
  840. if key_values.len() % 2 == 1 {
  841. return Err(Error::Syntax);
  842. }
  843. let mut keys = vec![];
  844. let mut values = vec![];
  845. key_values.into_iter().enumerate().for_each(|(key, val)| {
  846. if key % 2 == 0 {
  847. keys.push(val);
  848. } else {
  849. values.push(val);
  850. }
  851. });
  852. let to_lock = keys.clone();
  853. self.lock_keys(&to_lock);
  854. if !override_all {
  855. for key in keys.iter() {
  856. let slot = self.slots[self.get_slot(key)].read();
  857. if slot.get(key).is_some() {
  858. self.unlock_keys(&keys);
  859. return Ok(0.into());
  860. }
  861. }
  862. }
  863. let mut values = values.into_iter();
  864. for key in keys.into_iter() {
  865. let mut slot = self.slots[self.get_slot(&key)].write();
  866. if let Some(value) = values.next() {
  867. slot.insert(key, Entry::new(Value::Blob(value), None));
  868. }
  869. }
  870. self.unlock_keys(&to_lock);
  871. if override_all {
  872. Ok(Value::Ok)
  873. } else {
  874. Ok(1.into())
  875. }
  876. }
  877. /// Set a key, value with an optional expiration time
  878. pub fn set(&self, key: Bytes, value: Value, expires_in: Option<Duration>) -> Value {
  879. self.set_advanced(key, value, expires_in, Default::default(), false, false)
  880. }
  881. /// Set a value in the database with various settings
  882. pub fn set_advanced(
  883. &self,
  884. key: Bytes,
  885. value: Value,
  886. expires_in: Option<Duration>,
  887. override_value: Override,
  888. keep_ttl: bool,
  889. return_previous: bool,
  890. ) -> Value {
  891. let mut slot = self.slots[self.get_slot(&key)].write();
  892. let expires_at = expires_in.map(|duration| {
  893. Instant::now()
  894. .checked_add(duration)
  895. .unwrap_or_else(far_future)
  896. });
  897. let previous = slot.get(&key).filter(|x| x.is_valid());
  898. let expires_at = if keep_ttl {
  899. if let Some(previous) = previous {
  900. previous.get_ttl()
  901. } else {
  902. expires_at
  903. }
  904. } else {
  905. expires_at
  906. };
  907. let to_return = if return_previous {
  908. let previous = previous.map_or(Value::Null, |v| v.clone_value());
  909. if previous.is_err() {
  910. // Error while trying to clone the previous value to return, we
  911. // must halt and return immediately.
  912. return previous;
  913. }
  914. Some(previous)
  915. } else {
  916. None
  917. };
  918. match override_value {
  919. Override::No => {
  920. if previous.is_some() {
  921. return if let Some(to_return) = to_return {
  922. to_return
  923. } else {
  924. 0.into()
  925. };
  926. }
  927. }
  928. Override::Only => {
  929. if previous.is_none() {
  930. return if let Some(to_return) = to_return {
  931. to_return
  932. } else {
  933. 0.into()
  934. };
  935. }
  936. }
  937. _ => {}
  938. };
  939. if let Some(expires_at) = expires_at {
  940. self.expirations.lock().add(&key, expires_at);
  941. } else {
  942. // Make sure to remove the new key (or replaced) from the
  943. // expiration table (from any possible past value).
  944. self.expirations.lock().remove(&key);
  945. }
  946. slot.insert(key, Entry::new(value, expires_at));
  947. if let Some(to_return) = to_return {
  948. to_return
  949. } else if override_value == Override::Yes {
  950. Value::Ok
  951. } else {
  952. 1.into()
  953. }
  954. }
  955. /// Returns the TTL of a given key
  956. pub fn ttl(&self, key: &Bytes) -> Option<Option<Instant>> {
  957. let slot = self.slots[self.get_slot(key)].read();
  958. slot.get(key).filter(|x| x.is_valid()).map(|x| x.get_ttl())
  959. }
  960. /// Check whether a given key is in the list of keys to be purged or not.
  961. /// This function is mainly used for unit testing
  962. pub fn is_key_in_expiration_list(&self, key: &Bytes) -> bool {
  963. self.expirations.lock().has(key)
  964. }
  965. /// Remove expired entries from the database.
  966. ///
  967. /// This function should be called from a background thread every few seconds. Calling it more
  968. /// often is a waste of resources.
  969. ///
  970. /// Expired keys are automatically hidden by the database, this process is just claiming back
  971. /// the memory from those expired keys.
  972. pub fn purge(&self) -> u64 {
  973. let mut expirations = self.expirations.lock();
  974. let mut removed = 0;
  975. trace!("Watching {} keys for expirations", expirations.len());
  976. let keys = expirations.get_expired_keys(None);
  977. drop(expirations);
  978. keys.iter()
  979. .map(|key| {
  980. let mut slot = self.slots[self.get_slot(key)].write();
  981. if slot.remove(key).is_some() {
  982. trace!("Removed key {:?} due timeout", key);
  983. removed += 1;
  984. }
  985. })
  986. .for_each(drop);
  987. removed
  988. }
  989. }
  990. impl scan::Scan for Db {
  991. fn scan(
  992. &self,
  993. cursor: Cursor,
  994. pattern: Option<Bytes>,
  995. count: Option<usize>,
  996. typ: Option<Typ>,
  997. ) -> Result<scan::Result, Error> {
  998. let mut keys = vec![];
  999. let mut slot_id = cursor.bucket as usize;
  1000. let mut last_pos = cursor.last_position as usize;
  1001. let pattern = pattern
  1002. .map(|pattern| {
  1003. let pattern = String::from_utf8_lossy(&pattern);
  1004. Pattern::new(&pattern).map_err(|_| Error::InvalidPattern(pattern.to_string()))
  1005. })
  1006. .transpose()?;
  1007. loop {
  1008. let slot = if let Some(value) = self.slots.get(slot_id) {
  1009. value.read()
  1010. } else {
  1011. // We iterated through all the entries, time to signal that to
  1012. // the client but returning a "0" cursor.
  1013. slot_id = 0;
  1014. last_pos = 0;
  1015. break;
  1016. };
  1017. for (key, value) in slot.iter().skip(last_pos) {
  1018. if !value.is_valid() {
  1019. // Entry still exists in memory but it is not longer valid
  1020. // and will soon be gargabe collected.
  1021. last_pos += 1;
  1022. continue;
  1023. }
  1024. if let Some(pattern) = &pattern {
  1025. let str_key = String::from_utf8_lossy(key);
  1026. if !pattern.matches(&str_key) {
  1027. last_pos += 1;
  1028. continue;
  1029. }
  1030. }
  1031. if let Some(typ) = &typ {
  1032. if !typ.is_value_type(&value.get()) {
  1033. last_pos += 1;
  1034. continue;
  1035. }
  1036. }
  1037. keys.push(Value::new(key));
  1038. last_pos += 1;
  1039. if keys.len() == count.unwrap_or(10) {
  1040. break;
  1041. }
  1042. }
  1043. if keys.len() == count.unwrap_or(10) {
  1044. break;
  1045. }
  1046. last_pos = 0;
  1047. slot_id += 1;
  1048. }
  1049. Ok(scan::Result {
  1050. cursor: Cursor::new(slot_id as u16, last_pos as u64)?,
  1051. result: keys,
  1052. })
  1053. }
  1054. }
  1055. #[cfg(test)]
  1056. mod test {
  1057. use super::*;
  1058. use crate::{bytes, db::scan::Scan, value::float::Float};
  1059. use std::str::FromStr;
  1060. #[test]
  1061. fn incr_wrong_type() {
  1062. let db = Db::new(100);
  1063. db.set(bytes!(b"num"), Value::Blob(bytes!("some string")), None);
  1064. let r = db.incr(&bytes!("num"), 1);
  1065. assert!(r.is_err());
  1066. assert_eq!(Error::NotANumber, r.expect_err("should fail"));
  1067. assert_eq!(
  1068. Value::Blob(bytes!("some string")),
  1069. db.get(&bytes!("num")).into_inner()
  1070. );
  1071. }
  1072. #[test]
  1073. fn incr_blob_float() {
  1074. let db = Db::new(100);
  1075. db.set(bytes!(b"num"), Value::Blob(bytes!("1.1")), None);
  1076. assert_eq!(Ok(2.2.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
  1077. assert_eq!(
  1078. Value::Blob(bytes!("2.2")),
  1079. db.get(&bytes!("num")).into_inner()
  1080. );
  1081. }
  1082. #[test]
  1083. fn incr_blob_int_float() {
  1084. let db = Db::new(100);
  1085. db.set(bytes!(b"num"), Value::Blob(bytes!("1")), None);
  1086. assert_eq!(Ok(2.1.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
  1087. assert_eq!(
  1088. Value::Blob(bytes!("2.1")),
  1089. db.get(&bytes!("num")).into_inner()
  1090. );
  1091. }
  1092. #[test]
  1093. fn incr_blob_int() {
  1094. let db = Db::new(100);
  1095. db.set(bytes!(b"num"), Value::Blob(bytes!("1")), None);
  1096. assert_eq!(Ok(2), db.incr(&bytes!("num"), 1));
  1097. assert_eq!(
  1098. Value::Blob(bytes!("2")),
  1099. db.get(&bytes!("num")).into_inner()
  1100. );
  1101. }
  1102. #[test]
  1103. fn incr_blob_int_set() {
  1104. let db = Db::new(100);
  1105. assert_eq!(Ok(1), db.incr(&bytes!("num"), 1));
  1106. assert_eq!(
  1107. Value::Blob(bytes!("1")),
  1108. db.get(&bytes!("num")).into_inner()
  1109. );
  1110. }
  1111. #[test]
  1112. fn incr_blob_float_set() {
  1113. let db = Db::new(100);
  1114. assert_eq!(Ok(1.1.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
  1115. assert_eq!(
  1116. Value::Blob(bytes!("1.1")),
  1117. db.get(&bytes!("num")).into_inner()
  1118. );
  1119. }
  1120. #[test]
  1121. fn del() {
  1122. let db = Db::new(100);
  1123. db.set(bytes!(b"expired"), Value::Ok, Some(Duration::from_secs(0)));
  1124. db.set(bytes!(b"valid"), Value::Ok, None);
  1125. db.set(bytes!(b"expiring"), Value::Ok, Some(Duration::from_secs(5)));
  1126. assert_eq!(
  1127. Value::Integer(2),
  1128. db.del(&[
  1129. bytes!(b"expired"),
  1130. bytes!(b"valid"),
  1131. bytes!(b"expiring"),
  1132. bytes!(b"not_existing_key")
  1133. ])
  1134. );
  1135. }
  1136. #[test]
  1137. fn ttl() {
  1138. let db = Db::new(100);
  1139. db.set(bytes!(b"expired"), Value::Ok, Some(Duration::from_secs(0)));
  1140. db.set(bytes!(b"valid"), Value::Ok, None);
  1141. db.set(bytes!(b"expiring"), Value::Ok, Some(Duration::from_secs(5)));
  1142. assert_eq!(None, db.ttl(&bytes!(b"expired")));
  1143. assert_eq!(None, db.ttl(&bytes!(b"not_existing_key")));
  1144. assert_eq!(Some(None), db.ttl(&bytes!(b"valid")));
  1145. assert!(matches!(db.ttl(&bytes!(b"expiring")), Some(Some(_))));
  1146. }
  1147. #[test]
  1148. fn persist_bug() {
  1149. let db = Db::new(100);
  1150. db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(1)));
  1151. assert_eq!(Value::Ok, db.get(&bytes!(b"one")).into_inner());
  1152. assert!(db.is_key_in_expiration_list(&bytes!(b"one")));
  1153. db.persist(&bytes!(b"one"));
  1154. assert!(!db.is_key_in_expiration_list(&bytes!(b"one")));
  1155. }
  1156. #[test]
  1157. fn purge_keys() {
  1158. let db = Db::new(100);
  1159. db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
  1160. // Expired keys should not be returned, even if they are not yet
  1161. // removed by the purge process.
  1162. assert_eq!(Value::Null, db.get(&bytes!(b"one")).into_inner());
  1163. // Purge twice
  1164. assert_eq!(1, db.purge());
  1165. assert_eq!(0, db.purge());
  1166. assert_eq!(Value::Null, db.get(&bytes!(b"one")).into_inner());
  1167. }
  1168. #[test]
  1169. fn replace_purge_keys() {
  1170. let db = Db::new(100);
  1171. db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
  1172. // Expired keys should not be returned, even if they are not yet
  1173. // removed by the purge process.
  1174. assert_eq!(Value::Null, db.get(&bytes!(b"one")).into_inner());
  1175. db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(5)));
  1176. assert_eq!(Value::Ok, db.get(&bytes!(b"one")).into_inner());
  1177. // Purge should return 0 as the expired key has been removed already
  1178. assert_eq!(0, db.purge());
  1179. }
  1180. #[test]
  1181. fn scan_skip_expired() {
  1182. let db = Db::new(100);
  1183. db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
  1184. db.set(bytes!(b"two"), Value::Ok, Some(Duration::from_secs(0)));
  1185. for i in 0u64..20u64 {
  1186. let key: Bytes = i.to_string().into();
  1187. db.set(key, Value::Ok, None);
  1188. }
  1189. let result = db
  1190. .scan(Cursor::from_str("0").unwrap(), None, None, None)
  1191. .unwrap();
  1192. // first 10 records
  1193. assert_eq!(10, result.result.len());
  1194. // make sure the cursor is valid
  1195. assert_ne!("0", result.cursor.to_string());
  1196. let result = db.scan(result.cursor, None, None, None).unwrap();
  1197. // 10 more records
  1198. assert_eq!(10, result.result.len());
  1199. // make sure the cursor is valid
  1200. assert_ne!("0", result.cursor.to_string());
  1201. let result = db.scan(result.cursor, None, None, None).unwrap();
  1202. // No more results!
  1203. assert_eq!(0, result.result.len());
  1204. assert_eq!("0", result.cursor.to_string());
  1205. }
  1206. #[test]
  1207. fn scan_limit() {
  1208. let db = Db::new(10);
  1209. db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
  1210. db.set(bytes!(b"two"), Value::Ok, Some(Duration::from_secs(0)));
  1211. for i in 0u64..2000u64 {
  1212. let key: Bytes = i.to_string().into();
  1213. db.set(key, Value::Ok, None);
  1214. }
  1215. let result = db
  1216. .scan(Cursor::from_str("0").unwrap(), None, Some(2), None)
  1217. .unwrap();
  1218. assert_eq!(2, result.result.len());
  1219. assert_ne!("0", result.cursor.to_string());
  1220. }
  1221. #[test]
  1222. fn scan_filter() {
  1223. let db = Db::new(100);
  1224. db.set(bytes!(b"fone"), Value::Ok, None);
  1225. db.set(bytes!(b"ftwo"), Value::Ok, None);
  1226. for i in 0u64..20u64 {
  1227. let key: Bytes = i.to_string().into();
  1228. db.set(key, Value::Ok, None);
  1229. }
  1230. let result = db
  1231. .scan(
  1232. Cursor::from_str("0").unwrap(),
  1233. Some(bytes!(b"f*")),
  1234. None,
  1235. None,
  1236. )
  1237. .unwrap();
  1238. assert_eq!(2, result.result.len());
  1239. assert_eq!("0", result.cursor.to_string());
  1240. }
  1241. #[tokio::test]
  1242. async fn lock_keys() {
  1243. let db1 = Arc::new(Db::new(100));
  1244. let db2 = db1.clone().new_db_instance(2);
  1245. let db3 = db1.clone().new_db_instance(3);
  1246. let shared = Arc::new(RwLock::new(1));
  1247. let shared1 = shared.clone();
  1248. let shared2 = shared.clone();
  1249. let shared3 = shared.clone();
  1250. let _ = tokio::join!(
  1251. tokio::spawn(async move {
  1252. db1.lock_keys(&["test".into()]);
  1253. let mut x = shared1.write();
  1254. *x = 2;
  1255. thread::sleep(Duration::from_secs(1));
  1256. db1.unlock_keys(&["test".into()]);
  1257. }),
  1258. tokio::spawn(async move {
  1259. db2.lock_keys(&["test".into(), "bar".into()]);
  1260. let mut x = shared2.write();
  1261. if *x == 2 {
  1262. *x = 5;
  1263. }
  1264. thread::sleep(Duration::from_secs(2));
  1265. db2.unlock_keys(&["test".into(), "bar".into()]);
  1266. }),
  1267. tokio::spawn(async move {
  1268. thread::sleep(Duration::from_millis(500));
  1269. assert_eq!(4, db3.get_slot(&"test".into()));
  1270. let mut x = shared3.write();
  1271. if *x == 5 {
  1272. *x = 6;
  1273. }
  1274. }),
  1275. );
  1276. assert_eq!(6, *shared.read());
  1277. }
  1278. }