123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236 |
- //! # in-memory database
- //!
- //! This database module is the core of the miniredis project. All other modules around this
- //! database module.
- use self::utils::{far_future, ExpirationOpts, Override};
- use crate::{
- error::Error,
- value::{
- bytes_to_number,
- cursor::Cursor,
- typ::{Typ, ValueTyp},
- VDebug, Value,
- },
- };
- use bytes::{BufMut, Bytes, BytesMut};
- use core::num;
- use entry::{new_version, Entry};
- use expiration::ExpirationDb;
- use glob::Pattern;
- use log::trace;
- use num_traits::CheckedAdd;
- use parking_lot::{Mutex, RwLock};
- use seahash::hash;
- use std::{
- collections::HashMap,
- convert::{TryFrom, TryInto},
- ops::{AddAssign, Deref},
- str::FromStr,
- sync::Arc,
- thread,
- };
- use tokio::time::{Duration, Instant};
- mod entry;
- mod expiration;
- pub mod pool;
- pub mod scan;
- pub(crate) mod utils;
- /// Database structure
- ///
- /// Each connection has their own clone of the database and the conn_id is stored in each instance.
- /// The slots property is shared for all connections.
- ///
- /// To avoid lock contention this database is *not* a single HashMap, instead it is a vector of
- /// HashMaps. Each key is pre-sharded and a bucket is selected. By doing this pre-step instead of
- /// locking the entire database, only a small portion is locked (shared or exclusively) at a time,
- /// making this database implementation thread-friendly. The number of number_of_slots available cannot be
- /// changed at runtime.
- ///
- /// The database is also aware of other connections locking other keys exclusively (for
- /// transactions).
- ///
- /// Each entry is wrapped with an entry::Entry struct, which is aware of expirations and data
- /// versioning (in practice the nanosecond of last modification).
- #[derive(Debug)]
- pub struct Db {
- /// A vector of hashmaps.
- ///
- /// Instead of having a single HashMap, and having all threads fighting for
- /// blocking the single HashMap, we have a vector of N HashMap
- /// (configurable), which in theory allow to have faster reads and writes.
- ///
- /// Because all operations are always key specific, the key is used to hash
- /// and select to which HashMap the data might be stored.
- slots: Arc<Vec<RwLock<HashMap<Bytes, Entry>>>>,
- /// Data structure to store all expiring keys
- expirations: Arc<Mutex<ExpirationDb>>,
- /// Number of HashMaps that are available.
- number_of_slots: usize,
- /// Databases unique ID. This is an internal identifier to avoid deadlocks
- /// when copying and moving data between databases.
- pub db_id: u128,
- /// Current connection ID
- ///
- /// A Database is attached to a conn_id. The slots and expiration data
- /// structures are shared between all connections, regardless of conn_id.
- ///
- /// This particular database instace is attached to a conn_id, which is used
- /// to lock keys exclusively for transactions and other atomic operations.
- conn_id: u128,
- /// HashMap of all blocked keys by other connections. If a key appears in
- /// here and it is not being hold by the current connection, current
- /// connection must wait.
- tx_key_locks: Arc<RwLock<HashMap<Bytes, u128>>>,
- }
- impl Db {
- /// Creates a new database instance
- pub fn new(number_of_slots: usize) -> Self {
- let slots = (0..number_of_slots)
- .map(|_| RwLock::new(HashMap::new()))
- .collect();
- Self {
- slots: Arc::new(slots),
- expirations: Arc::new(Mutex::new(ExpirationDb::new())),
- conn_id: 0,
- db_id: new_version(),
- tx_key_locks: Arc::new(RwLock::new(HashMap::new())),
- number_of_slots,
- }
- }
- /// Creates a new Database instance bound to a connection.
- ///
- /// This is particular useful when locking keys exclusively.
- ///
- /// All the internal data are shjared through an Arc.
- pub fn new_db_instance(self: Arc<Db>, conn_id: u128) -> Arc<Db> {
- Arc::new(Self {
- slots: self.slots.clone(),
- tx_key_locks: self.tx_key_locks.clone(),
- expirations: self.expirations.clone(),
- conn_id,
- db_id: self.db_id,
- number_of_slots: self.number_of_slots,
- })
- }
- #[inline]
- /// Returns a slot where a key may be hosted.
- ///
- /// In order to avoid too much locks, instead of having a single hash a
- /// database instance is a set of hashes. Each key is pre-shared with a
- /// quick hashing algorithm to select a 'slot' or HashMap where it may be
- /// hosted.
- fn get_slot(&self, key: &Bytes) -> usize {
- let id = (hash(key) as usize) % self.number_of_slots;
- trace!("selected slot {} for key {:?}", id, key);
- let waiting = Duration::from_nanos(100);
- while let Some(blocker) = self.tx_key_locks.read().get(key) {
- // Loop while the key we are trying to access is being blocked by a
- // connection in a transaction
- if *blocker == self.conn_id {
- // the key is being blocked by ourself, it is safe to break the
- // waiting loop
- break;
- }
- thread::sleep(waiting);
- }
- id
- }
- /// Locks keys exclusively
- ///
- /// The locked keys are only accesible (read or write) by the connection
- /// that locked them, any other connection must wait until the locking
- /// connection releases them.
- ///
- /// This is used to simulate redis transactions. Transaction in Redis are
- /// atomic but pausing a multi threaded Redis just to keep the same promises
- /// was a bit extreme, that's the reason why a transaction will lock
- /// exclusively all keys involved.
- pub fn lock_keys(&self, keys: &[Bytes]) {
- let waiting = Duration::from_nanos(100);
- loop {
- let mut lock = self.tx_key_locks.write();
- let mut i = 0;
- for key in keys.iter() {
- if let Some(blocker) = lock.get(key) {
- if *blocker == self.conn_id {
- // It is blocked by us already.
- continue;
- }
- // It is blocked by another tx, we need to break
- // and retry to gain the lock over this key
- break;
- }
- lock.insert(key.clone(), self.conn_id);
- i += 1;
- }
- if i == keys.len() {
- // All the involved keys are successfully being blocked
- // exclusively.
- break;
- }
- // We need to sleep a bit and retry.
- drop(lock);
- thread::sleep(waiting);
- }
- }
- /// Releases the lock on keys
- pub fn unlock_keys(&self, keys: &[Bytes]) {
- let mut lock = self.tx_key_locks.write();
- for key in keys.iter() {
- lock.remove(key);
- }
- }
- /// Return debug info for a key
- pub fn debug(&self, key: &Bytes) -> Result<VDebug, Error> {
- let slot = self.slots[self.get_slot(key)].read();
- Ok(slot
- .get(key)
- .filter(|x| x.is_valid())
- .ok_or(Error::NotFound)?
- .value
- .debug())
- }
- /// Return the digest for each key. This used for testing only
- pub fn digest(&self, keys: &[Bytes]) -> Result<Vec<Value>, Error> {
- Ok(keys
- .iter()
- .map(|key| {
- let slot = self.slots[self.get_slot(key)].read();
- Value::Blob(
- slot.get(key)
- .filter(|v| v.is_valid())
- .map(|v| hex::encode(&v.value.digest()))
- .unwrap_or("00000".into())
- .as_str()
- .into(),
- )
- })
- .collect::<Vec<Value>>())
- }
- /// Flushes the entire database
- pub fn flushdb(&self) -> Result<Value, Error> {
- self.expirations.lock().flush();
- self.slots
- .iter()
- .map(|s| {
- let mut s = s.write();
- s.clear();
- })
- .for_each(drop);
- Ok(Value::Ok)
- }
- /// Returns the number of elements in the database
- pub fn len(&self) -> Result<usize, Error> {
- self.purge();
- Ok(self.slots.iter().map(|s| s.read().len()).sum())
- }
- /// Round numbers to store efficiently, specially float numbers. For instance `1.00` will be converted to `1`.
- fn round_numbers<T>(number: T) -> BytesMut
- where
- T: ToString,
- {
- let number_to_str = number.to_string();
- if number_to_str.find('.').is_none() {
- return number_to_str.as_bytes().into();
- }
- let number_to_str = number_to_str
- .trim_end_matches(|c| c == '0' || c == '.')
- .to_string();
- if number_to_str.is_empty() {
- "0"
- } else {
- number_to_str.as_str()
- }
- .into()
- }
- // Converts a given number to a correct Value, it should be used with Self::round_numbers()
- fn number_to_value(number: &[u8]) -> Result<Value, Error> {
- if number.iter().find(|x| **x == b'.').is_some() {
- Ok(Value::new(number))
- } else {
- Ok(Value::Integer(bytes_to_number(number)?))
- }
- }
- /// Increment a sub-key in a hash
- ///
- /// If the stored value cannot be converted into a number an error will be thrown
- pub fn hincrby<T>(
- &self,
- key: &Bytes,
- sub_key: &Bytes,
- incr_by: &Bytes,
- typ: &str,
- ) -> Result<Value, Error>
- where
- T: ToString
- + FromStr
- + CheckedAdd
- + for<'a> TryFrom<&'a Value, Error = Error>
- + Into<Value>
- + Copy,
- {
- let mut slot = self.slots[self.get_slot(key)].write();
- let mut incr_by: T =
- bytes_to_number(incr_by).map_err(|_| Error::NotANumberType(typ.to_owned()))?;
- match slot.get_mut(key).filter(|x| x.is_valid()).map(|x| x.get()) {
- Some(Value::Hash(h)) => {
- let mut h = h.write();
- if let Some(n) = h.get(sub_key) {
- incr_by = incr_by
- .checked_add(
- &bytes_to_number(n)
- .map_err(|_| Error::NotANumberType(typ.to_owned()))?,
- )
- .ok_or(Error::Overflow)?;
- }
- let incr_by_bytes = Self::round_numbers(incr_by).freeze();
- h.insert(sub_key.clone(), incr_by_bytes.clone());
- Self::number_to_value(&incr_by_bytes)
- }
- None => {
- #[allow(clippy::mutable_key_type)]
- let mut h = HashMap::new();
- let incr_by_bytes = Self::round_numbers(incr_by).freeze();
- h.insert(sub_key.clone(), incr_by_bytes.clone());
- let _ = slot.insert(key.clone(), Entry::new(h.into(), None));
- Self::number_to_value(&incr_by_bytes)
- }
- _ => Err(Error::WrongType),
- }
- }
- /// Increments a key's value by a given number
- ///
- /// If the stored value cannot be converted into a number an error will be
- /// thrown.
- pub fn incr<T>(&self, key: &Bytes, incr_by: T) -> Result<T, Error>
- where
- T: ToString + CheckedAdd + for<'a> TryFrom<&'a Value, Error = Error> + Into<Value> + Copy,
- {
- let mut slot = self.slots[self.get_slot(key)].write();
- match slot.get_mut(key).filter(|x| x.is_valid()) {
- Some(x) => {
- if !x.is_clonable() {
- return Err(Error::WrongType);
- }
- let value = x.get();
- let mut number: T = value.try_into()?;
- number = incr_by.checked_add(&number).ok_or(Error::Overflow)?;
- x.change_value(Value::Blob(Self::round_numbers(number)));
- Ok(number)
- }
- None => {
- slot.insert(
- key.clone(),
- Entry::new(Value::Blob(Self::round_numbers(incr_by)), None),
- );
- Ok(incr_by)
- }
- }
- }
- /// Removes any expiration associated with a given key
- pub fn persist(&self, key: &Bytes) -> Value {
- let mut slot = self.slots[self.get_slot(key)].write();
- slot.get_mut(key)
- .filter(|x| x.is_valid())
- .map_or(0.into(), |x| {
- if x.has_ttl() {
- self.expirations.lock().remove(key);
- x.persist();
- 1.into()
- } else {
- 0.into()
- }
- })
- }
- /// Set time to live for a given key
- pub fn set_ttl(
- &self,
- key: &Bytes,
- expires_in: Duration,
- opts: ExpirationOpts,
- ) -> Result<Value, Error> {
- if (opts.NX && opts.XX) || (opts.NX && opts.GT) || (opts.NX && opts.LT) {
- return Err(Error::OptsNotCompatible("NX and XX, GT or LT".to_owned()));
- }
- if opts.GT && opts.LT {
- return Err(Error::OptsNotCompatible("GT and LT".to_owned()));
- }
- let mut slot = self.slots[self.get_slot(key)].write();
- let expires_at = Instant::now()
- .checked_add(expires_in)
- .unwrap_or_else(far_future);
- Ok(slot
- .get_mut(key)
- .filter(|x| x.is_valid())
- .map_or(0.into(), |x| {
- let current_expire = x.get_ttl();
- if opts.NX && current_expire.is_some() {
- return 0.into();
- }
- if opts.XX && current_expire.is_none() {
- return 0.into();
- }
- if opts.GT {
- if let Some(current_expire) = current_expire {
- if expires_at <= current_expire {
- return 0.into();
- }
- } else {
- return 0.into();
- }
- }
- if opts.LT {
- if let Some(current_expire) = current_expire {
- if expires_at >= current_expire {
- return 0.into();
- }
- }
- }
- self.expirations.lock().add(key, expires_at);
- x.set_ttl(expires_at);
- 1.into()
- }))
- }
- /// Overwrites part of the string stored at key, starting at the specified
- /// offset, for the entire length of value. If the offset is larger than the
- /// current length of the string at key, the string is padded with zero-bytes to
- /// make offset fit. Non-existing keys are considered as empty strings, so this
- /// command will make sure it holds a string large enough to be able to set
- /// value at offset.
- pub fn set_range(&self, key: &Bytes, offset: i128, data: &[u8]) -> Result<Value, Error> {
- let mut slot = self.slots[self.get_slot(key)].write();
- let value = slot.get_mut(key).map(|value| {
- if !value.is_valid() {
- self.expirations.lock().remove(key);
- value.persist();
- }
- value.get_mut()
- });
- if offset < 0 {
- return Err(Error::OutOfRange);
- }
- if offset >= 512 * 1024 * 1024 - 4 {
- return Err(Error::MaxAllowedSize);
- }
- let length = offset as usize + data.len();
- match value {
- Some(Value::Blob(bytes)) => {
- if bytes.capacity() < length {
- bytes.resize(length, 0);
- }
- let writer = &mut bytes[offset as usize..length];
- writer.copy_from_slice(data);
- Ok(bytes.len().into())
- }
- None => {
- if data.len() == 0 {
- return Ok(0.into());
- }
- let mut bytes = BytesMut::new();
- bytes.resize(length, 0);
- let writer = &mut bytes[offset as usize..];
- writer.copy_from_slice(data);
- slot.insert(key.clone(), Entry::new(Value::new(&bytes), None));
- Ok(bytes.len().into())
- }
- _ => Err(Error::WrongType),
- }
- }
- /// Copies a key
- pub fn copy(
- &self,
- source: &Bytes,
- target: &Bytes,
- replace: Override,
- target_db: Option<Arc<Db>>,
- ) -> Result<bool, Error> {
- let slot = self.slots[self.get_slot(source)].read();
- let value = if let Some(value) = slot.get(source).filter(|x| x.is_valid()) {
- value.clone()
- } else {
- return Ok(false);
- };
- drop(slot);
- if let Some(db) = target_db {
- if db.db_id == self.db_id && source == target {
- return Err(Error::SameEntry);
- }
- if replace == Override::No && db.exists(&[target.clone()]) > 0 {
- return Ok(false);
- }
- let _ = db.set_advanced(
- target,
- value.value.clone(),
- value.get_ttl().map(|v| v - Instant::now()),
- replace,
- false,
- false,
- );
- Ok(true)
- } else {
- if source == target {
- return Err(Error::SameEntry);
- }
- if replace == Override::No && self.exists(&[target.clone()]) > 0 {
- return Ok(false);
- }
- let mut slot = self.slots[self.get_slot(target)].write();
- slot.insert(target.clone(), value);
- Ok(true)
- }
- }
- /// Moves a given key between databases
- pub fn move_key(&self, source: &Bytes, target_db: Arc<Db>) -> Result<bool, Error> {
- if self.db_id == target_db.db_id {
- return Err(Error::SameEntry);
- }
- let mut slot = self.slots[self.get_slot(source)].write();
- let (expires_in, value) = if let Some(value) = slot.get(source).filter(|v| v.is_valid()) {
- (
- value.get_ttl().map(|t| t - Instant::now()),
- value.value.clone(),
- )
- } else {
- return Ok(false);
- };
- if Value::Integer(1)
- == target_db.set_advanced(&source, value, expires_in, Override::No, false, false)
- {
- slot.remove(source);
- Ok(true)
- } else {
- Ok(false)
- }
- }
- /// Renames a key
- pub fn rename(
- &self,
- source: &Bytes,
- target: &Bytes,
- override_value: Override,
- ) -> Result<bool, Error> {
- let slot1 = self.get_slot(source);
- let slot2 = self.get_slot(target);
- if slot1 == slot2 {
- let mut slot = self.slots[slot1].write();
- if override_value == Override::No && slot.get(target).is_some() {
- return Ok(false);
- }
- if let Some(value) = slot.remove(source) {
- slot.insert(target.clone(), value);
- Ok(true)
- } else {
- Err(Error::NotFound)
- }
- } else {
- let mut slot1 = self.slots[slot1].write();
- let mut slot2 = self.slots[slot2].write();
- if override_value == Override::No && slot2.get(target).is_some() {
- return Ok(false);
- }
- if let Some(value) = slot1.remove(source) {
- slot2.insert(target.clone(), value);
- Ok(true)
- } else {
- Err(Error::NotFound)
- }
- }
- }
- /// Removes keys from the database
- pub fn del(&self, keys: &[Bytes]) -> Value {
- let mut expirations = self.expirations.lock();
- keys.iter()
- .filter_map(|key| {
- expirations.remove(key);
- self.slots[self.get_slot(key)].write().remove(key)
- })
- .filter(|key| key.is_valid())
- .count()
- .into()
- }
- /// Returns all keys that matches a given pattern. This is a very expensive command.
- pub fn get_all_keys(&self, pattern: &Bytes) -> Result<Vec<Value>, Error> {
- let pattern = String::from_utf8_lossy(pattern);
- let pattern =
- Pattern::new(&pattern).map_err(|_| Error::InvalidPattern(pattern.to_string()))?;
- Ok(self
- .slots
- .iter()
- .map(|slot| {
- slot.read()
- .keys()
- .filter(|key| {
- let str_key = String::from_utf8_lossy(key);
- pattern.matches(&str_key)
- })
- .map(|key| Value::new(key))
- .collect::<Vec<Value>>()
- })
- .flatten()
- .collect())
- }
- /// Check if keys exists in the database
- pub fn exists(&self, keys: &[Bytes]) -> usize {
- let mut matches = 0;
- keys.iter()
- .map(|key| {
- let slot = self.slots[self.get_slot(key)].read();
- if let Some(key) = slot.get(key) {
- matches += if key.is_valid() { 1 } else { 0 };
- }
- })
- .for_each(drop);
- matches
- }
- /// get_map_or
- ///
- /// Instead of returning an entry of the database, to avoid clonning, this function will
- /// execute a callback function with the entry as a parameter. If no record is found another
- /// callback function is going to be executed, dropping the lock before doing so.
- ///
- /// If an entry is found, the lock is not dropped before doing the callback. Avoid inserting
- /// new entries. In this case the value is passed by reference, so it is possible to modify the
- /// entry itself.
- ///
- /// This function is useful to read non-scalar values from the database. Non-scalar values are
- /// forbidden to clone, attempting cloning will endup in an error (Error::WrongType)
- pub fn get_map_or<F1, F2>(&self, key: &Bytes, found: F1, not_found: F2) -> Result<Value, Error>
- where
- F1: FnOnce(&Value) -> Result<Value, Error>,
- F2: FnOnce() -> Result<Value, Error>,
- {
- let slot = self.slots[self.get_slot(key)].read();
- let entry = slot.get(key).filter(|x| x.is_valid()).map(|e| e.get());
- if let Some(entry) = entry {
- found(entry)
- } else {
- // drop lock
- drop(slot);
- not_found()
- }
- }
- /// Updates the entry version of a given key
- pub fn bump_version(&self, key: &Bytes) -> bool {
- let mut slot = self.slots[self.get_slot(key)].write();
- slot.get_mut(key)
- .filter(|x| x.is_valid())
- .map(|entry| {
- entry.bump_version();
- })
- .is_some()
- }
- /// Returns the version of a given key
- pub fn get_version(&self, key: &Bytes) -> u128 {
- let slot = self.slots[self.get_slot(key)].read();
- slot.get(key)
- .filter(|x| x.is_valid())
- .map(|entry| entry.version())
- .unwrap_or_default()
- }
- /// Returns the name of the value type
- pub fn get_data_type(&self, key: &Bytes) -> String {
- let slot = self.slots[self.get_slot(key)].read();
- slot.get(key)
- .filter(|x| x.is_valid())
- .map_or("none".to_owned(), |x| {
- Typ::get_type(x.get()).to_string().to_lowercase()
- })
- }
- /// Get a copy of an entry
- pub fn get(&self, key: &Bytes) -> Value {
- let slot = self.slots[self.get_slot(key)].read();
- slot.get(key)
- .filter(|x| x.is_valid())
- .map_or(Value::Null, |x| x.clone_value())
- }
- /// Get a copy of an entry and modifies the expiration of the key
- pub fn getex(&self, key: &Bytes, expires_in: Option<Duration>, make_persistent: bool) -> Value {
- let mut slot = self.slots[self.get_slot(key)].write();
- slot.get_mut(key)
- .filter(|x| x.is_valid())
- .map(|value| {
- if make_persistent {
- self.expirations.lock().remove(key);
- value.persist();
- } else if let Some(expires_in) = expires_in {
- let expires_at = Instant::now()
- .checked_add(expires_in)
- .unwrap_or_else(far_future);
- self.expirations.lock().add(key, expires_at);
- value.set_ttl(expires_at);
- }
- value
- })
- .map_or(Value::Null, |x| x.clone_value())
- }
- /// Get multiple copies of entries
- pub fn get_multi(&self, keys: &[Bytes]) -> Value {
- keys.iter()
- .map(|key| {
- let slot = self.slots[self.get_slot(key)].read();
- slot.get(key)
- .filter(|x| x.is_valid() && x.is_clonable())
- .map_or(Value::Null, |x| x.clone_value())
- })
- .collect::<Vec<Value>>()
- .into()
- }
- /// Get a key or set a new value for the given key.
- pub fn getset(&self, key: &Bytes, value: Value) -> Value {
- let mut slot = self.slots[self.get_slot(key)].write();
- self.expirations.lock().remove(key);
- slot.insert(key.clone(), Entry::new(value, None))
- .filter(|x| x.is_valid())
- .map_or(Value::Null, |x| x.clone_value())
- }
- /// Takes an entry from the database.
- pub fn getdel(&self, key: &Bytes) -> Value {
- let mut slot = self.slots[self.get_slot(key)].write();
- slot.remove(key).map_or(Value::Null, |x| {
- self.expirations.lock().remove(key);
- x.clone_value()
- })
- }
- /// Set a key, value with an optional expiration time
- pub fn append(&self, key: &Bytes, value_to_append: &Bytes) -> Result<Value, Error> {
- let mut slot = self.slots[self.get_slot(key)].write();
- let mut entry = slot.get_mut(key).filter(|x| x.is_valid());
- if let Some(entry) = slot.get_mut(key).filter(|x| x.is_valid()) {
- match entry.get_mut() {
- Value::Blob(value) => {
- value.put(value_to_append.as_ref());
- Ok(value.len().into())
- }
- _ => Err(Error::WrongType),
- }
- } else {
- slot.insert(key.clone(), Entry::new(Value::new(value_to_append), None));
- Ok(value_to_append.len().into())
- }
- }
- /// Set multiple key/value pairs. Are involved keys are locked exclusively
- /// like a transaction.
- ///
- /// If override_all is set to false, all entries must be new entries or the
- /// entire operation fails, in this case 1 or is returned. Otherwise `Ok` is
- /// returned.
- pub fn multi_set(&self, key_values: &[Bytes], override_all: bool) -> Result<Value, Error> {
- if key_values.len() % 2 == 1 {
- return Err(Error::Syntax);
- }
- let keys = key_values
- .iter()
- .step_by(2)
- .cloned()
- .collect::<Vec<Bytes>>();
- self.lock_keys(&keys);
- if !override_all {
- for key in keys.iter() {
- let slot = self.slots[self.get_slot(key)].read();
- if slot.get(key).is_some() {
- self.unlock_keys(&keys);
- return Ok(0.into());
- }
- }
- }
- for (i, _) in key_values.iter().enumerate().step_by(2) {
- let mut slot = self.slots[self.get_slot(&key_values[i])].write();
- slot.insert(
- key_values[i].clone(),
- Entry::new(Value::new(&key_values[i + 1]), None),
- );
- }
- self.unlock_keys(&keys);
- if override_all {
- Ok(Value::Ok)
- } else {
- Ok(1.into())
- }
- }
- /// Set a key, value with an optional expiration time
- pub fn set(&self, key: &Bytes, value: Value, expires_in: Option<Duration>) -> Value {
- self.set_advanced(key, value, expires_in, Default::default(), false, false)
- }
- /// Set a value in the database with various settings
- pub fn set_advanced(
- &self,
- key: &Bytes,
- value: Value,
- expires_in: Option<Duration>,
- override_value: Override,
- keep_ttl: bool,
- return_previous: bool,
- ) -> Value {
- let mut slot = self.slots[self.get_slot(key)].write();
- let expires_at = expires_in.map(|duration| {
- Instant::now()
- .checked_add(duration)
- .unwrap_or_else(far_future)
- });
- let previous = slot.get(key).filter(|x| x.is_valid());
- let expires_at = if keep_ttl {
- if let Some(previous) = previous {
- previous.get_ttl()
- } else {
- expires_at
- }
- } else {
- expires_at
- };
- let to_return = if return_previous {
- let previous = previous.map_or(Value::Null, |v| v.clone_value());
- if previous.is_err() {
- // Error while trying to clone the previous value to return, we
- // must halt and return immediately.
- return previous;
- }
- Some(previous)
- } else {
- None
- };
- match override_value {
- Override::No => {
- if previous.is_some() {
- return if let Some(to_return) = to_return {
- to_return
- } else {
- 0.into()
- };
- }
- }
- Override::Only => {
- if previous.is_none() {
- return if let Some(to_return) = to_return {
- to_return
- } else {
- 0.into()
- };
- }
- }
- _ => {}
- };
- if let Some(expires_at) = expires_at {
- self.expirations.lock().add(key, expires_at);
- } else {
- /// Make sure to remove the new key (or replaced) from the
- /// expiration table (from any possible past value).
- self.expirations.lock().remove(key);
- }
- slot.insert(key.clone(), Entry::new(value, expires_at));
- if let Some(to_return) = to_return {
- to_return
- } else if override_value == Override::Yes {
- Value::Ok
- } else {
- 1.into()
- }
- }
- /// Returns the TTL of a given key
- pub fn ttl(&self, key: &Bytes) -> Option<Option<Instant>> {
- let slot = self.slots[self.get_slot(key)].read();
- slot.get(key).filter(|x| x.is_valid()).map(|x| x.get_ttl())
- }
- /// Check whether a given key is in the list of keys to be purged or not.
- /// This function is mainly used for unit testing
- pub fn is_key_in_expiration_list(&self, key: &Bytes) -> bool {
- self.expirations.lock().has(key)
- }
- /// Remove expired entries from the database.
- ///
- /// This function should be called from a background thread every few seconds. Calling it more
- /// often is a waste of resources.
- ///
- /// Expired keys are automatically hidden by the database, this process is just claiming back
- /// the memory from those expired keys.
- pub fn purge(&self) -> u64 {
- let mut expirations = self.expirations.lock();
- let mut removed = 0;
- trace!("Watching {} keys for expirations", expirations.len());
- let keys = expirations.get_expired_keys(None);
- drop(expirations);
- keys.iter()
- .map(|key| {
- let mut slot = self.slots[self.get_slot(key)].write();
- if slot.remove(key).is_some() {
- trace!("Removed key {:?} due timeout", key);
- removed += 1;
- }
- })
- .for_each(drop);
- removed
- }
- }
- impl scan::Scan for Db {
- fn scan(
- &self,
- cursor: Cursor,
- pattern: Option<&Bytes>,
- count: Option<usize>,
- typ: Option<Typ>,
- ) -> Result<scan::Result, Error> {
- let mut keys = vec![];
- let mut slot_id = cursor.bucket as usize;
- let mut last_pos = cursor.last_position as usize;
- let pattern = pattern
- .map(|pattern| {
- let pattern = String::from_utf8_lossy(pattern);
- Pattern::new(&pattern).map_err(|_| Error::InvalidPattern(pattern.to_string()))
- })
- .transpose()?;
- loop {
- let slot = if let Some(value) = self.slots.get(slot_id) {
- value.read()
- } else {
- // We iterated through all the entries, time to signal that to
- // the client but returning a "0" cursor.
- slot_id = 0;
- last_pos = 0;
- break;
- };
- for (key, value) in slot.iter().skip(last_pos) {
- if !value.is_valid() {
- // Entry still exists in memory but it is not longer valid
- // and will soon be gargabe collected.
- last_pos += 1;
- continue;
- }
- if let Some(pattern) = &pattern {
- let str_key = String::from_utf8_lossy(key);
- if !pattern.matches(&str_key) {
- last_pos += 1;
- continue;
- }
- }
- if let Some(typ) = &typ {
- if !typ.is_value_type(value.get()) {
- last_pos += 1;
- continue;
- }
- }
- keys.push(Value::new(key));
- last_pos += 1;
- if keys.len() == count.unwrap_or(10) {
- break;
- }
- }
- if keys.len() == count.unwrap_or(10) {
- break;
- }
- last_pos = 0;
- slot_id += 1;
- }
- Ok(scan::Result {
- cursor: Cursor::new(slot_id as u16, last_pos as u64)?,
- result: keys,
- })
- }
- }
- #[cfg(test)]
- mod test {
- use super::*;
- use crate::{bytes, db::scan::Scan, value::float::Float};
- use std::str::FromStr;
- #[test]
- fn incr_wrong_type() {
- let db = Db::new(100);
- db.set(&bytes!(b"num"), Value::Blob(bytes!("some string")), None);
- let r = db.incr(&bytes!("num"), 1);
- assert!(r.is_err());
- assert_eq!(Error::NotANumber, r.expect_err("should fail"));
- assert_eq!(Value::Blob(bytes!("some string")), db.get(&bytes!("num")));
- }
- #[test]
- fn incr_blob_float() {
- let db = Db::new(100);
- db.set(&bytes!(b"num"), Value::Blob(bytes!("1.1")), None);
- assert_eq!(Ok(2.2.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
- assert_eq!(Value::Blob(bytes!("2.2")), db.get(&bytes!("num")));
- }
- #[test]
- fn incr_blob_int_float() {
- let db = Db::new(100);
- db.set(&bytes!(b"num"), Value::Blob(bytes!("1")), None);
- assert_eq!(Ok(2.1.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
- assert_eq!(Value::Blob(bytes!("2.1")), db.get(&bytes!("num")));
- }
- #[test]
- fn incr_blob_int() {
- let db = Db::new(100);
- db.set(&bytes!(b"num"), Value::Blob(bytes!("1")), None);
- assert_eq!(Ok(2), db.incr(&bytes!("num"), 1));
- assert_eq!(Value::Blob(bytes!("2")), db.get(&bytes!("num")));
- }
- #[test]
- fn incr_blob_int_set() {
- let db = Db::new(100);
- assert_eq!(Ok(1), db.incr(&bytes!("num"), 1));
- assert_eq!(Value::Blob(bytes!("1")), db.get(&bytes!("num")));
- }
- #[test]
- fn incr_blob_float_set() {
- let db = Db::new(100);
- assert_eq!(Ok(1.1.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
- assert_eq!(Value::Blob(bytes!("1.1")), db.get(&bytes!("num")));
- }
- #[test]
- fn del() {
- let db = Db::new(100);
- db.set(&bytes!(b"expired"), Value::Ok, Some(Duration::from_secs(0)));
- db.set(&bytes!(b"valid"), Value::Ok, None);
- db.set(
- &bytes!(b"expiring"),
- Value::Ok,
- Some(Duration::from_secs(5)),
- );
- assert_eq!(
- Value::Integer(2),
- db.del(&[
- bytes!(b"expired"),
- bytes!(b"valid"),
- bytes!(b"expiring"),
- bytes!(b"not_existing_key")
- ])
- );
- }
- #[test]
- fn ttl() {
- let db = Db::new(100);
- db.set(&bytes!(b"expired"), Value::Ok, Some(Duration::from_secs(0)));
- db.set(&bytes!(b"valid"), Value::Ok, None);
- db.set(
- &bytes!(b"expiring"),
- Value::Ok,
- Some(Duration::from_secs(5)),
- );
- assert_eq!(None, db.ttl(&bytes!(b"expired")));
- assert_eq!(None, db.ttl(&bytes!(b"not_existing_key")));
- assert_eq!(Some(None), db.ttl(&bytes!(b"valid")));
- assert!(match db.ttl(&bytes!(b"expiring")) {
- Some(Some(_)) => true,
- _ => false,
- });
- }
- #[test]
- fn persist_bug() {
- let db = Db::new(100);
- db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(1)));
- assert_eq!(Value::Ok, db.get(&bytes!(b"one")));
- assert!(db.is_key_in_expiration_list(&bytes!(b"one")));
- db.persist(&bytes!(b"one"));
- assert!(!db.is_key_in_expiration_list(&bytes!(b"one")));
- }
- #[test]
- fn purge_keys() {
- let db = Db::new(100);
- db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
- // Expired keys should not be returned, even if they are not yet
- // removed by the purge process.
- assert_eq!(Value::Null, db.get(&bytes!(b"one")));
- // Purge twice
- assert_eq!(1, db.purge());
- assert_eq!(0, db.purge());
- assert_eq!(Value::Null, db.get(&bytes!(b"one")));
- }
- #[test]
- fn replace_purge_keys() {
- let db = Db::new(100);
- db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
- // Expired keys should not be returned, even if they are not yet
- // removed by the purge process.
- assert_eq!(Value::Null, db.get(&bytes!(b"one")));
- db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(5)));
- assert_eq!(Value::Ok, db.get(&bytes!(b"one")));
- // Purge should return 0 as the expired key has been removed already
- assert_eq!(0, db.purge());
- }
- #[test]
- fn scan_skip_expired() {
- let db = Db::new(100);
- db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
- db.set(&bytes!(b"two"), Value::Ok, Some(Duration::from_secs(0)));
- for i in 0u64..20u64 {
- let key: Bytes = i.to_string().into();
- db.set(&key, Value::Ok, None);
- }
- let result = db
- .scan(Cursor::from_str("0").unwrap(), None, None, None)
- .unwrap();
- // first 10 records
- assert_eq!(10, result.result.len());
- // make sure the cursor is valid
- assert_ne!("0", result.cursor.to_string());
- let result = db.scan(result.cursor, None, None, None).unwrap();
- // 10 more records
- assert_eq!(10, result.result.len());
- // make sure the cursor is valid
- assert_ne!("0", result.cursor.to_string());
- let result = db.scan(result.cursor, None, None, None).unwrap();
- // No more results!
- assert_eq!(0, result.result.len());
- assert_eq!("0", result.cursor.to_string());
- }
- #[test]
- fn scan_limit() {
- let db = Db::new(10);
- db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
- db.set(&bytes!(b"two"), Value::Ok, Some(Duration::from_secs(0)));
- for i in 0u64..2000u64 {
- let key: Bytes = i.to_string().into();
- db.set(&key, Value::Ok, None);
- }
- let result = db
- .scan(Cursor::from_str("0").unwrap(), None, Some(2), None)
- .unwrap();
- assert_eq!(2, result.result.len());
- assert_ne!("0", result.cursor.to_string());
- }
- #[test]
- fn scan_filter() {
- let db = Db::new(100);
- db.set(&bytes!(b"fone"), Value::Ok, None);
- db.set(&bytes!(b"ftwo"), Value::Ok, None);
- for i in 0u64..20u64 {
- let key: Bytes = i.to_string().into();
- db.set(&key, Value::Ok, None);
- }
- let result = db
- .scan(
- Cursor::from_str("0").unwrap(),
- Some(&bytes!(b"f*")),
- None,
- None,
- )
- .unwrap();
- assert_eq!(2, result.result.len());
- assert_eq!("0", result.cursor.to_string());
- }
- }
|