12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312 |
- //! # in-memory database
- //!
- //! This database module is the core of the miniredis project. All other modules are built
- //! around this database module.
- use self::utils::{far_future, ExpirationOpts, Override};
- use crate::{
- error::Error,
- value::{
- bytes_to_number,
- cursor::Cursor,
- typ::{Typ, ValueTyp},
- VDebug, Value,
- },
- };
- use bytes::{BufMut, Bytes, BytesMut};
- use core::num;
- use entry::{new_version, Entry};
- use expiration::ExpirationDb;
- use futures::Future;
- use glob::Pattern;
- use log::trace;
- use num_traits::CheckedAdd;
- use parking_lot::{Mutex, RwLock};
- use rand::{prelude::SliceRandom, Rng};
- use seahash::hash;
- use std::{
- collections::HashMap,
- convert::{TryFrom, TryInto},
- ops::{AddAssign, Deref},
- str::FromStr,
- sync::Arc,
- thread,
- };
- use tokio::{
- sync::broadcast::{self, Receiver, Sender},
- time::{Duration, Instant},
- };
- mod entry;
- mod expiration;
- pub mod pool;
- pub mod scan;
- pub(crate) mod utils;
- /// Database structure
- ///
- /// Each connection has their own clone of the database and the conn_id is stored in each instance.
- /// The slots property is shared for all connections.
- ///
- /// To avoid lock contention this database is *not* a single HashMap, instead it is a vector of
- /// HashMaps. Each key is pre-sharded and a bucket is selected. By doing this pre-step instead of
- /// locking the entire database, only a small portion is locked (shared or exclusively) at a time,
- /// making this database implementation thread-friendly. The number of number_of_slots available cannot be
- /// changed at runtime.
- ///
- /// The database is also aware of other connections locking other keys exclusively (for
- /// transactions).
- ///
- /// Each entry is wrapped with an entry::Entry struct, which is aware of expirations and data
- /// versioning (in practice the nanosecond of last modification).
- #[derive(Debug)]
- pub struct Db {
- /// A vector of hashmaps.
- ///
- /// Instead of having a single HashMap, and having all threads fighting for
- /// blocking the single HashMap, we have a vector of N HashMap
- /// (configurable), which in theory allow to have faster reads and writes.
- ///
- /// Because all operations are always key specific, the key is used to hash
- /// and select to which HashMap the data might be stored.
- slots: Arc<Vec<RwLock<HashMap<Bytes, Entry>>>>,
- /// Data structure to store all expiring keys
- expirations: Arc<Mutex<ExpirationDb>>,
- /// Key changes subscriptions hash. This hash contains all the senders to
- /// key subscriptions. If a key does not exists here it means that no-one
- /// wants to be notified of the current key changes.
- change_subscriptions: Arc<RwLock<HashMap<Bytes, Sender<()>>>>,
- /// Number of HashMaps that are available.
- number_of_slots: usize,
- /// Databases unique ID. This is an internal identifier to avoid deadlocks
- /// when copying and moving data between databases.
- pub db_id: u128,
- /// Current connection ID
- ///
- /// A Database is attached to a conn_id. The slots and expiration data
- /// structures are shared between all connections, regardless of conn_id.
- ///
- /// This particular database instance is attached to a conn_id, which is used
- /// to lock keys exclusively for transactions and other atomic operations.
- conn_id: u128,
- /// HashMap of all blocked keys by other connections. If a key appears in
- /// here and it is not being hold by the current connection, current
- /// connection must wait.
- tx_key_locks: Arc<RwLock<HashMap<Bytes, u128>>>,
- }
- impl Db {
- /// Creates a new database instance
- pub fn new(number_of_slots: usize) -> Self {
- let slots = (0..number_of_slots)
- .map(|_| RwLock::new(HashMap::new()))
- .collect();
- Self {
- slots: Arc::new(slots),
- expirations: Arc::new(Mutex::new(ExpirationDb::new())),
- change_subscriptions: Arc::new(RwLock::new(HashMap::new())),
- conn_id: 0,
- db_id: new_version(),
- tx_key_locks: Arc::new(RwLock::new(HashMap::new())),
- number_of_slots,
- }
- }
- /// Creates a new Database instance bound to a connection.
- ///
- /// This is particular useful when locking keys exclusively.
- ///
- /// All the internal data are shjared through an Arc.
- pub fn new_db_instance(self: Arc<Db>, conn_id: u128) -> Arc<Db> {
- Arc::new(Self {
- slots: self.slots.clone(),
- tx_key_locks: self.tx_key_locks.clone(),
- expirations: self.expirations.clone(),
- change_subscriptions: self.change_subscriptions.clone(),
- conn_id,
- db_id: self.db_id,
- number_of_slots: self.number_of_slots,
- })
- }
- #[inline]
- /// Returns a slot where a key may be hosted.
- ///
- /// In order to avoid too much locks, instead of having a single hash a
- /// database instance is a set of hashes. Each key is pre-shared with a
- /// quick hashing algorithm to select a 'slot' or HashMap where it may be
- /// hosted.
- fn get_slot(&self, key: &Bytes) -> usize {
- let id = (hash(key) as usize) % self.number_of_slots;
- trace!("selected slot {} for key {:?}", id, key);
- let waiting = Duration::from_nanos(100);
- while let Some(blocker) = self.tx_key_locks.read().get(key) {
- // Loop while the key we are trying to access is being blocked by a
- // connection in a transaction
- if *blocker == self.conn_id {
- // the key is being blocked by ourself, it is safe to break the
- // waiting loop
- break;
- }
- thread::sleep(waiting);
- }
- id
- }
- /// Locks keys exclusively
- ///
- /// The locked keys are only accessible (read or write) by the connection
- /// that locked them, any other connection must wait until the locking
- /// connection releases them.
- ///
- /// This is used to simulate redis transactions. Transaction in Redis are
- /// atomic but pausing a multi threaded Redis just to keep the same promises
- /// was a bit extreme, that's the reason why a transaction will lock
- /// exclusively all keys involved.
- pub fn lock_keys(&self, keys: &[Bytes]) {
- let waiting = Duration::from_nanos(100);
- loop {
- let mut lock = self.tx_key_locks.write();
- let mut i = 0;
- for key in keys.iter() {
- if let Some(blocker) = lock.get(key) {
- if *blocker == self.conn_id {
- // It is blocked by us already.
- i += 1;
- continue;
- }
- // It is blocked by another tx, we need to break
- // and retry to gain the lock over this key
- break;
- }
- lock.insert(key.clone(), self.conn_id);
- i += 1;
- }
- if i == keys.len() {
- // All the involved keys are successfully being blocked
- // exclusively.
- break;
- }
- // We need to sleep a bit and retry.
- drop(lock);
- thread::sleep(waiting);
- }
- }
- /// Releases the lock on keys
- pub fn unlock_keys(&self, keys: &[Bytes]) {
- let mut lock = self.tx_key_locks.write();
- for key in keys.iter() {
- lock.remove(key);
- }
- }
- /// Return debug info for a key
- pub fn debug(&self, key: &Bytes) -> Result<VDebug, Error> {
- let slot = self.slots[self.get_slot(key)].read();
- Ok(slot
- .get(key)
- .filter(|x| x.is_valid())
- .ok_or(Error::NotFound)?
- .value
- .debug())
- }
- /// Return the digest for each key. This used for testing only
- pub fn digest(&self, keys: &[Bytes]) -> Result<Vec<Value>, Error> {
- Ok(keys
- .iter()
- .map(|key| {
- let slot = self.slots[self.get_slot(key)].read();
- Value::Blob(
- slot.get(key)
- .filter(|v| v.is_valid())
- .map(|v| hex::encode(&v.value.digest()))
- .unwrap_or("00000".into())
- .as_str()
- .into(),
- )
- })
- .collect::<Vec<Value>>())
- }
- /// Flushes the entire database
- pub fn flushdb(&self) -> Result<Value, Error> {
- self.expirations.lock().flush();
- self.slots
- .iter()
- .map(|s| {
- let mut s = s.write();
- s.clear();
- })
- .for_each(drop);
- Ok(Value::Ok)
- }
- /// Returns the number of elements in the database
- pub fn len(&self) -> Result<usize, Error> {
- self.purge();
- Ok(self.slots.iter().map(|s| s.read().len()).sum())
- }
- /// Round numbers to store efficiently, specially float numbers. For instance `1.00` will be converted to `1`.
- fn round_numbers<T>(number: T) -> BytesMut
- where
- T: ToString,
- {
- let number_to_str = number.to_string();
- if number_to_str.find('.').is_none() {
- return number_to_str.as_bytes().into();
- }
- let number_to_str = number_to_str
- .trim_end_matches(|c| c == '0' || c == '.')
- .to_string();
- if number_to_str.is_empty() {
- "0"
- } else {
- number_to_str.as_str()
- }
- .into()
- }
- // Converts a given number to a correct Value, it should be used with Self::round_numbers()
- fn number_to_value(number: &[u8]) -> Result<Value, Error> {
- if number.iter().find(|x| **x == b'.').is_some() {
- Ok(Value::new(number))
- } else {
- Ok(Value::Integer(bytes_to_number(number)?))
- }
- }
- /// Increment a sub-key in a hash
- ///
- /// If the stored value cannot be converted into a number an error will be thrown
- pub fn hincrby<T>(
- &self,
- key: &Bytes,
- sub_key: &Bytes,
- incr_by: &Bytes,
- typ: &str,
- ) -> Result<Value, Error>
- where
- T: ToString
- + FromStr
- + CheckedAdd
- + for<'a> TryFrom<&'a Value, Error = Error>
- + Into<Value>
- + Copy,
- {
- let mut slot = self.slots[self.get_slot(key)].write();
- let mut incr_by: T =
- bytes_to_number(incr_by).map_err(|_| Error::NotANumberType(typ.to_owned()))?;
- match slot.get_mut(key).filter(|x| x.is_valid()).map(|x| x.get()) {
- Some(Value::Hash(h)) => {
- let mut h = h.write();
- if let Some(n) = h.get(sub_key) {
- incr_by = incr_by
- .checked_add(
- &bytes_to_number(n)
- .map_err(|_| Error::NotANumberType(typ.to_owned()))?,
- )
- .ok_or(Error::Overflow)?;
- }
- let incr_by_bytes = Self::round_numbers(incr_by).freeze();
- h.insert(sub_key.clone(), incr_by_bytes.clone());
- Self::number_to_value(&incr_by_bytes)
- }
- None => {
- #[allow(clippy::mutable_key_type)]
- let mut h = HashMap::new();
- let incr_by_bytes = Self::round_numbers(incr_by).freeze();
- h.insert(sub_key.clone(), incr_by_bytes.clone());
- let _ = slot.insert(key.clone(), Entry::new(h.into(), None));
- Self::number_to_value(&incr_by_bytes)
- }
- _ => Err(Error::WrongType),
- }
- }
- /// Increments a key's value by a given number
- ///
- /// If the stored value cannot be converted into a number an error will be
- /// thrown.
- pub fn incr<T>(&self, key: &Bytes, incr_by: T) -> Result<T, Error>
- where
- T: ToString + CheckedAdd + for<'a> TryFrom<&'a Value, Error = Error> + Into<Value> + Copy,
- {
- let mut slot = self.slots[self.get_slot(key)].write();
- match slot.get_mut(key).filter(|x| x.is_valid()) {
- Some(x) => {
- if !x.is_clonable() {
- return Err(Error::WrongType);
- }
- let value = x.get();
- let mut number: T = value.try_into()?;
- number = incr_by.checked_add(&number).ok_or(Error::Overflow)?;
- x.change_value(Value::Blob(Self::round_numbers(number)));
- Ok(number)
- }
- None => {
- slot.insert(
- key.clone(),
- Entry::new(Value::Blob(Self::round_numbers(incr_by)), None),
- );
- Ok(incr_by)
- }
- }
- }
- /// Removes any expiration associated with a given key
- pub fn persist(&self, key: &Bytes) -> Value {
- let mut slot = self.slots[self.get_slot(key)].write();
- slot.get_mut(key)
- .filter(|x| x.is_valid())
- .map_or(0.into(), |x| {
- if x.has_ttl() {
- self.expirations.lock().remove(key);
- x.persist();
- 1.into()
- } else {
- 0.into()
- }
- })
- }
- /// Set time to live for a given key
- pub fn set_ttl(
- &self,
- key: &Bytes,
- expires_in: Duration,
- opts: ExpirationOpts,
- ) -> Result<Value, Error> {
- if (opts.NX && opts.XX) || (opts.NX && opts.GT) || (opts.NX && opts.LT) {
- return Err(Error::OptsNotCompatible("NX and XX, GT or LT".to_owned()));
- }
- if opts.GT && opts.LT {
- return Err(Error::OptsNotCompatible("GT and LT".to_owned()));
- }
- let mut slot = self.slots[self.get_slot(key)].write();
- let expires_at = Instant::now()
- .checked_add(expires_in)
- .unwrap_or_else(far_future);
- Ok(slot
- .get_mut(key)
- .filter(|x| x.is_valid())
- .map_or(0.into(), |x| {
- let current_expire = x.get_ttl();
- if opts.NX && current_expire.is_some() {
- return 0.into();
- }
- if opts.XX && current_expire.is_none() {
- return 0.into();
- }
- if opts.GT {
- if let Some(current_expire) = current_expire {
- if expires_at <= current_expire {
- return 0.into();
- }
- } else {
- return 0.into();
- }
- }
- if opts.LT {
- if let Some(current_expire) = current_expire {
- if expires_at >= current_expire {
- return 0.into();
- }
- }
- }
- self.expirations.lock().add(key, expires_at);
- x.set_ttl(expires_at);
- 1.into()
- }))
- }
- /// Overwrites part of the string stored at key, starting at the specified
- /// offset, for the entire length of value. If the offset is larger than the
- /// current length of the string at key, the string is padded with zero-bytes to
- /// make offset fit. Non-existing keys are considered as empty strings, so this
- /// command will make sure it holds a string large enough to be able to set
- /// value at offset.
- pub fn set_range(&self, key: &Bytes, offset: i128, data: &[u8]) -> Result<Value, Error> {
- let mut slot = self.slots[self.get_slot(key)].write();
- let value = slot.get_mut(key).map(|value| {
- if !value.is_valid() {
- self.expirations.lock().remove(key);
- value.persist();
- }
- value.get_mut()
- });
- if offset < 0 {
- return Err(Error::OutOfRange);
- }
- if offset >= 512 * 1024 * 1024 - 4 {
- return Err(Error::MaxAllowedSize);
- }
- let length = offset as usize + data.len();
- match value {
- Some(Value::Blob(bytes)) => {
- if bytes.capacity() < length {
- bytes.resize(length, 0);
- }
- let writer = &mut bytes[offset as usize..length];
- writer.copy_from_slice(data);
- Ok(bytes.len().into())
- }
- None => {
- if data.len() == 0 {
- return Ok(0.into());
- }
- let mut bytes = BytesMut::new();
- bytes.resize(length, 0);
- let writer = &mut bytes[offset as usize..];
- writer.copy_from_slice(data);
- slot.insert(key.clone(), Entry::new(Value::new(&bytes), None));
- Ok(bytes.len().into())
- }
- _ => Err(Error::WrongType),
- }
- }
- /// Copies a key
- pub fn copy(
- &self,
- source: &Bytes,
- target: &Bytes,
- replace: Override,
- target_db: Option<Arc<Db>>,
- ) -> Result<bool, Error> {
- let slot = self.slots[self.get_slot(source)].read();
- let value = if let Some(value) = slot.get(source).filter(|x| x.is_valid()) {
- value.clone()
- } else {
- return Ok(false);
- };
- drop(slot);
- if let Some(db) = target_db {
- if db.db_id == self.db_id && source == target {
- return Err(Error::SameEntry);
- }
- if replace == Override::No && db.exists(&[target.clone()]) > 0 {
- return Ok(false);
- }
- let _ = db.set_advanced(
- target,
- value.value.clone(),
- value.get_ttl().map(|v| v - Instant::now()),
- replace,
- false,
- false,
- );
- Ok(true)
- } else {
- if source == target {
- return Err(Error::SameEntry);
- }
- if replace == Override::No && self.exists(&[target.clone()]) > 0 {
- return Ok(false);
- }
- let mut slot = self.slots[self.get_slot(target)].write();
- slot.insert(target.clone(), value);
- Ok(true)
- }
- }
- /// Moves a given key between databases
- pub fn move_key(&self, source: &Bytes, target_db: Arc<Db>) -> Result<bool, Error> {
- if self.db_id == target_db.db_id {
- return Err(Error::SameEntry);
- }
- let mut slot = self.slots[self.get_slot(source)].write();
- let (expires_in, value) = if let Some(value) = slot.get(source).filter(|v| v.is_valid()) {
- (
- value.get_ttl().map(|t| t - Instant::now()),
- value.value.clone(),
- )
- } else {
- return Ok(false);
- };
- if Value::Integer(1)
- == target_db.set_advanced(&source, value, expires_in, Override::No, false, false)
- {
- slot.remove(source);
- Ok(true)
- } else {
- Ok(false)
- }
- }
- /// Return a random key from the database
- pub fn randomkey(&self) -> Result<Value, Error> {
- let mut rng = rand::thread_rng();
- let mut candidates = self
- .slots
- .iter()
- .map(|slot| {
- let slot = slot.read();
- if slot.is_empty() {
- None
- } else {
- slot.iter()
- .skip(rng.gen_range((0..slot.len())))
- .next()
- .map(|(k, v)| k.clone())
- }
- })
- .filter_map(|v| v)
- .collect::<Vec<Bytes>>();
- candidates.shuffle(&mut rng);
- Ok(candidates.get(0).into())
- }
- /// Renames a key
- pub fn rename(
- &self,
- source: &Bytes,
- target: &Bytes,
- override_value: Override,
- ) -> Result<bool, Error> {
- let slot1 = self.get_slot(source);
- let slot2 = self.get_slot(target);
- let result = if slot1 == slot2 {
- let mut slot = self.slots[slot1].write();
- if override_value == Override::No && slot.get(target).is_some() {
- return Ok(false);
- }
- if let Some(value) = slot.remove(source) {
- slot.insert(target.clone(), value);
- Ok(true)
- } else {
- Err(Error::NotFound)
- }
- } else {
- let mut slot1 = self.slots[slot1].write();
- let mut slot2 = self.slots[slot2].write();
- if override_value == Override::No && slot2.get(target).is_some() {
- return Ok(false);
- }
- if let Some(value) = slot1.remove(source) {
- slot2.insert(target.clone(), value);
- Ok(true)
- } else {
- Err(Error::NotFound)
- }
- };
- if result.is_ok() {
- self.bump_version(source);
- self.bump_version(target);
- }
- result
- }
- /// Removes keys from the database
- pub fn del(&self, keys: &[Bytes]) -> Value {
- let mut expirations = self.expirations.lock();
- keys.iter()
- .filter_map(|key| {
- expirations.remove(key);
- self.slots[self.get_slot(key)].write().remove(key)
- })
- .filter(|key| key.is_valid())
- .count()
- .into()
- }
- /// Returns all keys that matches a given pattern. This is a very expensive command.
- pub fn get_all_keys(&self, pattern: &Bytes) -> Result<Vec<Value>, Error> {
- let pattern = String::from_utf8_lossy(pattern);
- let pattern =
- Pattern::new(&pattern).map_err(|_| Error::InvalidPattern(pattern.to_string()))?;
- Ok(self
- .slots
- .iter()
- .map(|slot| {
- slot.read()
- .keys()
- .filter(|key| {
- let str_key = String::from_utf8_lossy(key);
- pattern.matches(&str_key)
- })
- .map(|key| Value::new(key))
- .collect::<Vec<Value>>()
- })
- .flatten()
- .collect())
- }
- /// Check if keys exists in the database
- pub fn exists(&self, keys: &[Bytes]) -> usize {
- let mut matches = 0;
- keys.iter()
- .map(|key| {
- let slot = self.slots[self.get_slot(key)].read();
- if let Some(key) = slot.get(key) {
- matches += if key.is_valid() { 1 } else { 0 };
- }
- })
- .for_each(drop);
- matches
- }
- /// get_map_or
- ///
- /// Instead of returning an entry of the database, to avoid cloning, this function will
- /// execute a callback function with the entry as a parameter. If no record is found another
- /// callback function is going to be executed, dropping the lock before doing so.
- ///
- /// If an entry is found, the lock is not dropped before doing the callback. Avoid inserting
- /// new entries. In this case the value is passed by reference, so it is possible to modify the
- /// entry itself.
- ///
- /// This function is useful to read non-scalar values from the database. Non-scalar values are
- /// forbidden to clone, attempting cloning will end-up in an error (Error::WrongType)
- pub fn get_map_or<F1, F2>(&self, key: &Bytes, found: F1, not_found: F2) -> Result<Value, Error>
- where
- F1: FnOnce(&Value) -> Result<Value, Error>,
- F2: FnOnce() -> Result<Value, Error>,
- {
- let slot = self.slots[self.get_slot(key)].read();
- let entry = slot.get(key).filter(|x| x.is_valid()).map(|e| e.get());
- if let Some(entry) = entry {
- found(entry)
- } else {
- // drop lock
- drop(slot);
- not_found()
- }
- }
- /// Updates the entry version of a given key
- pub fn bump_version(&self, key: &Bytes) -> bool {
- let mut slot = self.slots[self.get_slot(key)].write();
- let to_return = slot
- .get_mut(key)
- .filter(|x| x.is_valid())
- .map(|entry| {
- entry.bump_version();
- })
- .is_some();
- drop(slot);
- if to_return {
- let senders = self.change_subscriptions.read();
- if let Some(sender) = senders.get(key) {
- if sender.receiver_count() == 0 {
- // Garbage collection
- drop(senders);
- self.change_subscriptions.write().remove(key);
- } else {
- // Notify
- let _ = sender.send(());
- }
- }
- }
- to_return
- }
- /// Subscribe to key changes.
- pub fn subscribe_to_key_changes(&self, keys: &[Bytes]) -> Vec<Receiver<()>> {
- let mut subscriptions = self.change_subscriptions.write();
- keys.iter()
- .map(|key| {
- if let Some(sender) = subscriptions.get(key) {
- sender.subscribe()
- } else {
- let (sender, receiver) = broadcast::channel(1);
- subscriptions.insert(key.clone(), sender);
- receiver
- }
- })
- .collect()
- }
- /// Returns the version of a given key
- #[inline]
- pub fn get_version(&self, key: &Bytes) -> u128 {
- let slot = self.slots[self.get_slot(key)].read();
- slot.get(key)
- .filter(|x| x.is_valid())
- .map(|entry| entry.version())
- .unwrap_or_default()
- }
- /// Returns the name of the value type
- pub fn get_data_type(&self, key: &Bytes) -> String {
- let slot = self.slots[self.get_slot(key)].read();
- slot.get(key)
- .filter(|x| x.is_valid())
- .map_or("none".to_owned(), |x| {
- Typ::get_type(x.get()).to_string().to_lowercase()
- })
- }
- /// Get a copy of an entry
- pub fn get(&self, key: &Bytes) -> Value {
- let slot = self.slots[self.get_slot(key)].read();
- slot.get(key)
- .filter(|x| x.is_valid())
- .map_or(Value::Null, |x| x.clone_value())
- }
- /// Get a copy of an entry and modifies the expiration of the key
- pub fn getex(&self, key: &Bytes, expires_in: Option<Duration>, make_persistent: bool) -> Value {
- let mut slot = self.slots[self.get_slot(key)].write();
- slot.get_mut(key)
- .filter(|x| x.is_valid())
- .map(|value| {
- if make_persistent {
- self.expirations.lock().remove(key);
- value.persist();
- } else if let Some(expires_in) = expires_in {
- let expires_at = Instant::now()
- .checked_add(expires_in)
- .unwrap_or_else(far_future);
- self.expirations.lock().add(key, expires_at);
- value.set_ttl(expires_at);
- }
- value
- })
- .map_or(Value::Null, |x| x.clone_value())
- }
- /// Get multiple copies of entries
- pub fn get_multi(&self, keys: &[Bytes]) -> Value {
- keys.iter()
- .map(|key| {
- let slot = self.slots[self.get_slot(key)].read();
- slot.get(key)
- .filter(|x| x.is_valid() && x.is_clonable())
- .map_or(Value::Null, |x| x.clone_value())
- })
- .collect::<Vec<Value>>()
- .into()
- }
- /// Get a key or set a new value for the given key.
- pub fn getset(&self, key: &Bytes, value: Value) -> Value {
- let mut slot = self.slots[self.get_slot(key)].write();
- self.expirations.lock().remove(key);
- slot.insert(key.clone(), Entry::new(value, None))
- .filter(|x| x.is_valid())
- .map_or(Value::Null, |x| x.clone_value())
- }
- /// Takes an entry from the database.
- pub fn getdel(&self, key: &Bytes) -> Value {
- let mut slot = self.slots[self.get_slot(key)].write();
- slot.remove(key).map_or(Value::Null, |x| {
- self.expirations.lock().remove(key);
- x.clone_value()
- })
- }
- /// Set a key, value with an optional expiration time
- pub fn append(&self, key: &Bytes, value_to_append: &Bytes) -> Result<Value, Error> {
- let mut slot = self.slots[self.get_slot(key)].write();
- let mut entry = slot.get_mut(key).filter(|x| x.is_valid());
- if let Some(entry) = slot.get_mut(key).filter(|x| x.is_valid()) {
- match entry.get_mut() {
- Value::Blob(value) => {
- value.put(value_to_append.as_ref());
- Ok(value.len().into())
- }
- _ => Err(Error::WrongType),
- }
- } else {
- slot.insert(key.clone(), Entry::new(Value::new(value_to_append), None));
- Ok(value_to_append.len().into())
- }
- }
- /// Set multiple key/value pairs. Are involved keys are locked exclusively
- /// like a transaction.
- ///
- /// If override_all is set to false, all entries must be new entries or the
- /// entire operation fails, in this case 1 or is returned. Otherwise `Ok` is
- /// returned.
- pub fn multi_set(&self, key_values: &[Bytes], override_all: bool) -> Result<Value, Error> {
- if key_values.len() % 2 == 1 {
- return Err(Error::Syntax);
- }
- let keys = key_values
- .iter()
- .step_by(2)
- .cloned()
- .collect::<Vec<Bytes>>();
- self.lock_keys(&keys);
- if !override_all {
- for key in keys.iter() {
- let slot = self.slots[self.get_slot(key)].read();
- if slot.get(key).is_some() {
- self.unlock_keys(&keys);
- return Ok(0.into());
- }
- }
- }
- for (i, _) in key_values.iter().enumerate().step_by(2) {
- let mut slot = self.slots[self.get_slot(&key_values[i])].write();
- slot.insert(
- key_values[i].clone(),
- Entry::new(Value::new(&key_values[i + 1]), None),
- );
- }
- self.unlock_keys(&keys);
- if override_all {
- Ok(Value::Ok)
- } else {
- Ok(1.into())
- }
- }
- /// Set a key, value with an optional expiration time
- pub fn set(&self, key: &Bytes, value: Value, expires_in: Option<Duration>) -> Value {
- self.set_advanced(key, value, expires_in, Default::default(), false, false)
- }
- /// Set a value in the database with various settings
- pub fn set_advanced(
- &self,
- key: &Bytes,
- value: Value,
- expires_in: Option<Duration>,
- override_value: Override,
- keep_ttl: bool,
- return_previous: bool,
- ) -> Value {
- let mut slot = self.slots[self.get_slot(key)].write();
- let expires_at = expires_in.map(|duration| {
- Instant::now()
- .checked_add(duration)
- .unwrap_or_else(far_future)
- });
- let previous = slot.get(key).filter(|x| x.is_valid());
- let expires_at = if keep_ttl {
- if let Some(previous) = previous {
- previous.get_ttl()
- } else {
- expires_at
- }
- } else {
- expires_at
- };
- let to_return = if return_previous {
- let previous = previous.map_or(Value::Null, |v| v.clone_value());
- if previous.is_err() {
- // Error while trying to clone the previous value to return, we
- // must halt and return immediately.
- return previous;
- }
- Some(previous)
- } else {
- None
- };
- match override_value {
- Override::No => {
- if previous.is_some() {
- return if let Some(to_return) = to_return {
- to_return
- } else {
- 0.into()
- };
- }
- }
- Override::Only => {
- if previous.is_none() {
- return if let Some(to_return) = to_return {
- to_return
- } else {
- 0.into()
- };
- }
- }
- _ => {}
- };
- if let Some(expires_at) = expires_at {
- self.expirations.lock().add(key, expires_at);
- } else {
- /// Make sure to remove the new key (or replaced) from the
- /// expiration table (from any possible past value).
- self.expirations.lock().remove(key);
- }
- slot.insert(key.clone(), Entry::new(value, expires_at));
- if let Some(to_return) = to_return {
- to_return
- } else if override_value == Override::Yes {
- Value::Ok
- } else {
- 1.into()
- }
- }
- /// Returns the TTL of a given key
- pub fn ttl(&self, key: &Bytes) -> Option<Option<Instant>> {
- let slot = self.slots[self.get_slot(key)].read();
- slot.get(key).filter(|x| x.is_valid()).map(|x| x.get_ttl())
- }
- /// Check whether a given key is in the list of keys to be purged or not.
- /// This function is mainly used for unit testing
- pub fn is_key_in_expiration_list(&self, key: &Bytes) -> bool {
- self.expirations.lock().has(key)
- }
- /// Remove expired entries from the database.
- ///
- /// This function should be called from a background thread every few seconds. Calling it more
- /// often is a waste of resources.
- ///
- /// Expired keys are automatically hidden by the database, this process is just claiming back
- /// the memory from those expired keys.
- pub fn purge(&self) -> u64 {
- let mut expirations = self.expirations.lock();
- let mut removed = 0;
- trace!("Watching {} keys for expirations", expirations.len());
- let keys = expirations.get_expired_keys(None);
- drop(expirations);
- keys.iter()
- .map(|key| {
- let mut slot = self.slots[self.get_slot(key)].write();
- if slot.remove(key).is_some() {
- trace!("Removed key {:?} due timeout", key);
- removed += 1;
- }
- })
- .for_each(drop);
- removed
- }
- }
- impl scan::Scan for Db {
- fn scan(
- &self,
- cursor: Cursor,
- pattern: Option<&Bytes>,
- count: Option<usize>,
- typ: Option<Typ>,
- ) -> Result<scan::Result, Error> {
- let mut keys = vec![];
- let mut slot_id = cursor.bucket as usize;
- let mut last_pos = cursor.last_position as usize;
- let pattern = pattern
- .map(|pattern| {
- let pattern = String::from_utf8_lossy(pattern);
- Pattern::new(&pattern).map_err(|_| Error::InvalidPattern(pattern.to_string()))
- })
- .transpose()?;
- loop {
- let slot = if let Some(value) = self.slots.get(slot_id) {
- value.read()
- } else {
- // We iterated through all the entries, time to signal that to
- // the client but returning a "0" cursor.
- slot_id = 0;
- last_pos = 0;
- break;
- };
- for (key, value) in slot.iter().skip(last_pos) {
- if !value.is_valid() {
- // Entry still exists in memory but it is not longer valid
- // and will soon be gargabe collected.
- last_pos += 1;
- continue;
- }
- if let Some(pattern) = &pattern {
- let str_key = String::from_utf8_lossy(key);
- if !pattern.matches(&str_key) {
- last_pos += 1;
- continue;
- }
- }
- if let Some(typ) = &typ {
- if !typ.is_value_type(value.get()) {
- last_pos += 1;
- continue;
- }
- }
- keys.push(Value::new(key));
- last_pos += 1;
- if keys.len() == count.unwrap_or(10) {
- break;
- }
- }
- if keys.len() == count.unwrap_or(10) {
- break;
- }
- last_pos = 0;
- slot_id += 1;
- }
- Ok(scan::Result {
- cursor: Cursor::new(slot_id as u16, last_pos as u64)?,
- result: keys,
- })
- }
- }
#[cfg(test)]
mod test {
    use super::*;
    use crate::{bytes, db::scan::Scan, value::float::Float};
    use std::str::FromStr;

    /// Stores the keys `"0"`, `"1"`, … up to (but excluding) `count`, each
    /// holding `Value::Ok` and no expiration.
    fn set_numeric_keys(db: &Db, count: u64) {
        for i in 0..count {
            let key: Bytes = i.to_string().into();
            db.set(&key, Value::Ok, None);
        }
    }

    /// Incrementing a non-numeric blob fails and leaves the value untouched.
    #[test]
    fn incr_wrong_type() {
        let db = Db::new(100);
        db.set(&bytes!(b"num"), Value::Blob(bytes!("some string")), None);

        let r = db.incr(&bytes!("num"), 1);

        assert!(r.is_err());
        assert_eq!(Error::NotANumber, r.expect_err("should fail"));
        assert_eq!(Value::Blob(bytes!("some string")), db.get(&bytes!("num")));
    }

    /// A blob holding a float can be incremented by a float.
    #[test]
    fn incr_blob_float() {
        let db = Db::new(100);
        db.set(&bytes!(b"num"), Value::Blob(bytes!("1.1")), None);

        assert_eq!(Ok(2.2.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
        assert_eq!(Value::Blob(bytes!("2.2")), db.get(&bytes!("num")));
    }

    /// A blob holding an integer can be incremented by a float.
    #[test]
    fn incr_blob_int_float() {
        let db = Db::new(100);
        db.set(&bytes!(b"num"), Value::Blob(bytes!("1")), None);

        assert_eq!(Ok(2.1.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
        assert_eq!(Value::Blob(bytes!("2.1")), db.get(&bytes!("num")));
    }

    /// A blob holding an integer can be incremented by an integer.
    #[test]
    fn incr_blob_int() {
        let db = Db::new(100);
        db.set(&bytes!(b"num"), Value::Blob(bytes!("1")), None);

        assert_eq!(Ok(2), db.incr(&bytes!("num"), 1));
        assert_eq!(Value::Blob(bytes!("2")), db.get(&bytes!("num")));
    }

    /// Incrementing a missing key by an integer creates it with that value.
    #[test]
    fn incr_blob_int_set() {
        let db = Db::new(100);

        assert_eq!(Ok(1), db.incr(&bytes!("num"), 1));
        assert_eq!(Value::Blob(bytes!("1")), db.get(&bytes!("num")));
    }

    /// Incrementing a missing key by a float creates it with that value.
    #[test]
    fn incr_blob_float_set() {
        let db = Db::new(100);

        assert_eq!(Ok(1.1.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
        assert_eq!(Value::Blob(bytes!("1.1")), db.get(&bytes!("num")));
    }

    /// `del` reports two deletions out of an already expired key, a
    /// persistent key, a key that has not expired yet and a missing key.
    #[test]
    fn del() {
        let db = Db::new(100);
        db.set(&bytes!(b"expired"), Value::Ok, Some(Duration::from_secs(0)));
        db.set(&bytes!(b"valid"), Value::Ok, None);
        db.set(
            &bytes!(b"expiring"),
            Value::Ok,
            Some(Duration::from_secs(5)),
        );

        assert_eq!(
            Value::Integer(2),
            db.del(&[
                bytes!(b"expired"),
                bytes!(b"valid"),
                bytes!(b"expiring"),
                bytes!(b"not_existing_key")
            ])
        );
    }

    /// `ttl` hides expired and missing keys, reports `Some(None)` for a key
    /// without expiration and `Some(Some(_))` for a key that will expire.
    #[test]
    fn ttl() {
        let db = Db::new(100);
        db.set(&bytes!(b"expired"), Value::Ok, Some(Duration::from_secs(0)));
        db.set(&bytes!(b"valid"), Value::Ok, None);
        db.set(
            &bytes!(b"expiring"),
            Value::Ok,
            Some(Duration::from_secs(5)),
        );

        assert_eq!(None, db.ttl(&bytes!(b"expired")));
        assert_eq!(None, db.ttl(&bytes!(b"not_existing_key")));
        assert_eq!(Some(None), db.ttl(&bytes!(b"valid")));
        assert!(matches!(db.ttl(&bytes!(b"expiring")), Some(Some(_))));
    }

    /// Persisting a key removes it from the expiration list.
    #[test]
    fn persist_bug() {
        let db = Db::new(100);
        db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(1)));
        assert_eq!(Value::Ok, db.get(&bytes!(b"one")));
        assert!(db.is_key_in_expiration_list(&bytes!(b"one")));

        db.persist(&bytes!(b"one"));

        assert!(!db.is_key_in_expiration_list(&bytes!(b"one")));
    }

    /// An expired key is hidden before the purge and reclaimed exactly once.
    #[test]
    fn purge_keys() {
        let db = Db::new(100);
        db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
        // Expired keys should not be returned, even if they are not yet
        // removed by the purge process.
        assert_eq!(Value::Null, db.get(&bytes!(b"one")));

        // Purge twice
        assert_eq!(1, db.purge());
        assert_eq!(0, db.purge());

        assert_eq!(Value::Null, db.get(&bytes!(b"one")));
    }

    /// Overwriting an expired key before the purge runs leaves nothing for
    /// the purge to reclaim.
    #[test]
    fn replace_purge_keys() {
        let db = Db::new(100);
        db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
        // Expired keys should not be returned, even if they are not yet
        // removed by the purge process.
        assert_eq!(Value::Null, db.get(&bytes!(b"one")));

        db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(5)));
        assert_eq!(Value::Ok, db.get(&bytes!(b"one")));

        // Purge should return 0 as the expired key has been removed already
        assert_eq!(0, db.purge());
    }

    /// Scanning with the default count pages through the 20 live keys in two
    /// batches of 10 and never reports the two expired keys.
    #[test]
    fn scan_skip_expired() {
        let db = Db::new(100);
        db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
        db.set(&bytes!(b"two"), Value::Ok, Some(Duration::from_secs(0)));
        set_numeric_keys(&db, 20);

        let result = db
            .scan(Cursor::from_str("0").unwrap(), None, None, None)
            .unwrap();
        // first 10 records
        assert_eq!(10, result.result.len());
        // make sure the cursor is valid
        assert_ne!("0", result.cursor.to_string());

        let result = db.scan(result.cursor, None, None, None).unwrap();
        // 10 more records
        assert_eq!(10, result.result.len());
        // make sure the cursor is valid
        assert_ne!("0", result.cursor.to_string());

        let result = db.scan(result.cursor, None, None, None).unwrap();
        // No more results!
        assert_eq!(0, result.result.len());
        assert_eq!("0", result.cursor.to_string());
    }

    /// An explicit count limits the size of the returned batch.
    #[test]
    fn scan_limit() {
        let db = Db::new(10);
        db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
        db.set(&bytes!(b"two"), Value::Ok, Some(Duration::from_secs(0)));
        set_numeric_keys(&db, 2000);

        let result = db
            .scan(Cursor::from_str("0").unwrap(), None, Some(2), None)
            .unwrap();

        assert_eq!(2, result.result.len());
        assert_ne!("0", result.cursor.to_string());
    }

    /// A glob pattern restricts the scan to the matching keys only.
    #[test]
    fn scan_filter() {
        let db = Db::new(100);
        db.set(&bytes!(b"fone"), Value::Ok, None);
        db.set(&bytes!(b"ftwo"), Value::Ok, None);
        set_numeric_keys(&db, 20);

        let result = db
            .scan(
                Cursor::from_str("0").unwrap(),
                Some(&bytes!(b"f*")),
                None,
                None,
            )
            .unwrap();

        assert_eq!(2, result.result.len());
        assert_eq!("0", result.cursor.to_string());
    }
}
|