12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424 |
- //! # in-memory database
- //!
- //! This database module is the core of the miniredis project. All other modules are built
- //! around this database module.
- use self::utils::{far_future, ExpirationOpts, Override};
- use crate::{
- error::Error,
- value::{bytes_to_number, cursor::Cursor, typ::Typ, VDebug, Value},
- };
- use bytes::{BufMut, Bytes, BytesMut};
- use entry::{unique_id, Entry};
- use expiration::ExpirationDb;
- use glob::Pattern;
- use log::trace;
- use num_traits::CheckedAdd;
- use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
- use rand::{prelude::SliceRandom, Rng};
- use seahash::hash;
- use std::{
- collections::{HashMap, VecDeque},
- convert::{TryFrom, TryInto},
- ops::{Deref, DerefMut},
- str::FromStr,
- sync::Arc,
- thread,
- };
- use tokio::{
- sync::broadcast::{self, Receiver, Sender},
- time::{Duration, Instant},
- };
- mod entry;
- mod expiration;
- pub mod pool;
- pub mod scan;
- pub(crate) mod utils;
/// Read-only handle to the entry stored under a single key.
///
/// The handle owns a shared (read) guard over the slot that may contain the
/// key and keeps it for as long as the handle is alive.
pub struct RefValue<'a> {
    /// Key that is looked up inside `slot`.
    key: &'a Bytes,
    /// Read guard over the slot (HashMap) in which `key` may be stored.
    slot: RwLockReadGuard<'a, HashMap<Bytes, Entry>>,
}
- impl<'a> RefValue<'a> {
- /// test
- #[inline(always)]
- pub fn into_inner(self) -> Value {
- self.slot
- .get(self.key)
- .filter(|x| x.is_valid())
- .map(|x| {
- if x.is_scalar() {
- x.get().clone()
- } else {
- Error::WrongType.into()
- }
- })
- .unwrap_or_default()
- }
- /// test
- pub fn inner(&self) -> Option<RwLockReadGuard<'_, Value>> {
- self.slot
- .get(self.key)
- .filter(|x| x.is_valid())
- .map(|x| x.get())
- }
- /// test
- pub fn inner_mut(&self) -> Option<RwLockWriteGuard<'_, Value>> {
- self.slot
- .get(self.key)
- .filter(|x| x.is_valid())
- .map(|x| x.get_mut())
- }
- /// map_mut
- #[inline(always)]
- pub fn map<T, F>(self, f: F) -> Option<T>
- where
- F: FnOnce(&Value) -> T,
- {
- self.slot.get(self.key).filter(|x| x.is_valid()).map(|x| {
- let value = x.get();
- f(value.deref())
- })
- }
- /// map_mut
- #[inline(always)]
- pub fn map_mut<T, F>(self, f: F) -> Option<T>
- where
- F: FnOnce(&mut Value) -> T,
- {
- self.slot.get(self.key).filter(|x| x.is_valid()).map(|x| {
- let mut value = x.get_mut();
- f(value.deref_mut())
- })
- }
- /// Returns the version of a given key
- #[inline(always)]
- pub fn version(&self) -> usize {
- self.slot
- .get(self.key)
- .filter(|x| x.is_valid())
- .map(|x| x.version())
- .unwrap_or_default()
- }
- }
/// Database structure
///
/// Each connection has its own clone of the database and the conn_id is stored in each instance.
/// The slots property is shared by all connections.
///
/// To avoid lock contention this database is *not* a single HashMap, instead it is a vector of
/// HashMaps. Each key is pre-sharded and a bucket is selected. By doing this pre-step instead of
/// locking the entire database, only a small portion is locked (shared or exclusively) at a time,
/// making this database implementation thread-friendly. The number of slots available cannot be
/// changed at runtime.
///
/// The database is also aware of other connections locking other keys exclusively (for
/// transactions).
///
/// Each entry is wrapped with an entry::Entry struct, which is aware of expirations and data
/// versioning (in practice the nanosecond of last modification).
#[derive(Debug)]
pub struct Db {
    /// A vector of hashmaps.
    ///
    /// Instead of having a single HashMap, and having all threads fighting for
    /// locking that single HashMap, we have a vector of N HashMaps
    /// (configurable), which in theory allows faster reads and writes.
    ///
    /// Because all operations are always key specific, the key is hashed to
    /// select the HashMap in which the data might be stored.
    slots: Arc<Vec<RwLock<HashMap<Bytes, Entry>>>>,
    /// Data structure to store all expiring keys
    expirations: Arc<Mutex<ExpirationDb>>,
    /// Key changes subscriptions hash. This hash contains all the senders to
    /// key subscriptions. If a key does not exist here it means that no-one
    /// wants to be notified of changes to that key.
    change_subscriptions: Arc<RwLock<HashMap<Bytes, Sender<()>>>>,
    /// Number of HashMaps that are available.
    number_of_slots: usize,
    /// Database's unique ID. This is an internal identifier to avoid deadlocks
    /// when copying and moving data between databases.
    pub db_id: usize,
    /// Current connection ID
    ///
    /// A Database is attached to a conn_id. The slots and expiration data
    /// structures are shared between all connections, regardless of conn_id.
    ///
    /// This particular database instance is attached to a conn_id, which is used
    /// to lock keys exclusively for transactions and other atomic operations.
    conn_id: u128,
    /// HashMap of all keys blocked by connections, mapping each key to the
    /// conn_id holding it. If a key appears in here and it is not being held
    /// by the current connection, the current connection must wait.
    tx_key_locks: Arc<RwLock<HashMap<Bytes, u128>>>,
}
- impl Db {
- /// Creates a new database instance
- pub fn new(number_of_slots: usize) -> Self {
- let slots = (0..number_of_slots)
- .map(|_| RwLock::new(HashMap::new()))
- .collect();
- Self {
- slots: Arc::new(slots),
- expirations: Arc::new(Mutex::new(ExpirationDb::new())),
- change_subscriptions: Arc::new(RwLock::new(HashMap::new())),
- conn_id: 0,
- db_id: unique_id(),
- tx_key_locks: Arc::new(RwLock::new(HashMap::new())),
- number_of_slots,
- }
- }
- /// Creates a new Database instance bound to a connection.
- ///
- /// This is particular useful when locking keys exclusively.
- ///
- /// All the internal data are shjared through an Arc.
- pub fn new_db_instance(self: Arc<Db>, conn_id: u128) -> Arc<Db> {
- Arc::new(Self {
- slots: self.slots.clone(),
- tx_key_locks: self.tx_key_locks.clone(),
- expirations: self.expirations.clone(),
- change_subscriptions: self.change_subscriptions.clone(),
- conn_id,
- db_id: self.db_id,
- number_of_slots: self.number_of_slots,
- })
- }
- #[inline]
- /// Returns a slot where a key may be hosted.
- ///
- /// In order to avoid too much locks, instead of having a single hash a
- /// database instance is a set of hashes. Each key is pre-shared with a
- /// quick hashing algorithm to select a 'slot' or HashMap where it may be
- /// hosted.
- fn get_slot(&self, key: &Bytes) -> usize {
- let id = (hash(key) as usize) % self.number_of_slots;
- trace!("selected slot {} for key {:?}", id, key);
- let waiting = Duration::from_nanos(100);
- while let Some(blocker) = self.tx_key_locks.read().get(key) {
- // Loop while the key we are trying to access is being blocked by a
- // connection in a transaction
- if *blocker == self.conn_id {
- // the key is being blocked by ourself, it is safe to break the
- // waiting loop
- break;
- }
- thread::sleep(waiting);
- }
- id
- }
- /// Locks keys exclusively
- ///
- /// The locked keys are only accessible (read or write) by the connection
- /// that locked them, any other connection must wait until the locking
- /// connection releases them.
- ///
- /// This is used to simulate redis transactions. Transaction in Redis are
- /// atomic but pausing a multi threaded Redis just to keep the same promises
- /// was a bit extreme, that's the reason why a transaction will lock
- /// exclusively all keys involved.
- pub fn lock_keys(&self, keys: &[Bytes]) {
- let waiting = Duration::from_nanos(100);
- loop {
- let mut lock = self.tx_key_locks.write();
- let mut i = 0;
- for key in keys.iter() {
- if let Some(blocker) = lock.get(key) {
- if *blocker == self.conn_id {
- // It is blocked by us already.
- i += 1;
- continue;
- }
- // It is blocked by another tx, we need to break
- // and retry to gain the lock over this key
- break;
- }
- lock.insert(key.clone(), self.conn_id);
- i += 1;
- }
- if i == keys.len() {
- // All the involved keys are successfully being blocked
- // exclusively.
- break;
- }
- // We need to sleep a bit and retry.
- drop(lock);
- thread::sleep(waiting);
- }
- }
- /// Releases the lock on keys
- pub fn unlock_keys(&self, keys: &[Bytes]) {
- let mut lock = self.tx_key_locks.write();
- for key in keys.iter() {
- lock.remove(key);
- }
- }
- /// Return debug info for a key
- pub fn debug(&self, key: &Bytes) -> Result<VDebug, Error> {
- let slot = self.slots[self.get_slot(key)].read();
- slot.get(key)
- .filter(|x| x.is_valid())
- .map(|x| x.get().debug())
- .ok_or(Error::NotFound)
- }
- /// Return the digest for each key. This used for testing only
- pub fn digest(&self, keys: &[Bytes]) -> Result<Vec<Value>, Error> {
- Ok(keys
- .iter()
- .map(|key| {
- let slot = self.slots[self.get_slot(key)].read();
- Value::new(
- slot.get(key)
- .filter(|v| v.is_valid())
- .map(|v| hex::encode(v.digest()))
- .unwrap_or("00000".into())
- .as_bytes(),
- )
- })
- .collect::<Vec<Value>>())
- }
- /// Flushes the entire database
- pub fn flushdb(&self) -> Result<Value, Error> {
- self.expirations.lock().flush();
- self.slots
- .iter()
- .map(|s| {
- let mut s = s.write();
- s.clear();
- })
- .for_each(drop);
- Ok(Value::Ok)
- }
- /// Checks if the database is empty
- pub fn is_empty(&self) -> bool {
- for slot in self.slots.iter() {
- if slot.read().len() > 0 {
- return false;
- }
- }
- true
- }
- /// Returns the number of elements in the database
- pub fn len(&self) -> Result<usize, Error> {
- self.purge();
- Ok(self.slots.iter().map(|s| s.read().len()).sum())
- }
- /// Round numbers to store efficiently, specially float numbers. For instance `1.00` will be converted to `1`.
- fn round_numbers<T>(number: T) -> Bytes
- where
- T: ToString,
- {
- let number_to_str = number.to_string();
- if number_to_str.find('.').is_none() {
- return Bytes::copy_from_slice(number_to_str.as_bytes());
- }
- let number_to_str = number_to_str
- .trim_end_matches(|c| c == '0' || c == '.')
- .to_string();
- Bytes::copy_from_slice(if number_to_str.is_empty() {
- b"0"
- } else {
- number_to_str.as_bytes()
- })
- }
- // Converts a given number to a correct Value, it should be used with Self::round_numbers()
- fn number_to_value(number: &[u8]) -> Result<Value, Error> {
- if number.iter().any(|x| *x == b'.') {
- Ok(Value::new(number))
- } else {
- Ok(Value::Integer(bytes_to_number(number)?))
- }
- }
- /// Increment a sub-key in a hash
- ///
- /// If the stored value cannot be converted into a number an error will be thrown
- pub fn hincrby<T>(
- &self,
- key: &Bytes,
- sub_key: &Bytes,
- incr_by: &Bytes,
- typ: &str,
- ) -> Result<Value, Error>
- where
- T: ToString
- + FromStr
- + CheckedAdd
- + for<'a> TryFrom<&'a Value, Error = Error>
- + Into<Value>
- + Copy,
- {
- let slot_id = self.get_slot(key);
- let slot = self.slots[slot_id].read();
- let mut incr_by: T =
- bytes_to_number(incr_by).map_err(|_| Error::NotANumberType(typ.to_owned()))?;
- if let Some(x) = slot
- .get(key)
- .filter(|x| x.is_valid())
- .map(|x| x.get_mut())
- .map(|mut x| match x.deref_mut() {
- Value::Hash(ref mut h) => {
- if let Some(n) = h.get(sub_key) {
- incr_by = incr_by
- .checked_add(
- &bytes_to_number(n)
- .map_err(|_| Error::NotANumberType(typ.to_owned()))?,
- )
- .ok_or(Error::Overflow)?;
- }
- let incr_by_bytes = Self::round_numbers(incr_by);
- h.insert(sub_key.clone(), incr_by_bytes.clone());
- Self::number_to_value(&incr_by_bytes)
- }
- _ => Err(Error::WrongType),
- })
- {
- return x;
- }
- drop(slot);
- #[allow(clippy::mutable_key_type)]
- let mut h = HashMap::new();
- let incr_by_bytes = Self::round_numbers(incr_by);
- h.insert(sub_key.clone(), incr_by_bytes.clone());
- let _ = self.slots[slot_id]
- .write()
- .insert(key.clone(), Entry::new(h.into(), None));
- Self::number_to_value(&incr_by_bytes)
- }
- /// Increments a key's value by a given number
- ///
- /// If the stored value cannot be converted into a number an error will be
- /// thrown.
- pub fn incr<T>(&self, key: &Bytes, incr_by: T) -> Result<T, Error>
- where
- T: ToString + CheckedAdd + for<'a> TryFrom<&'a Value, Error = Error> + Into<Value> + Copy,
- {
- let slot_id = self.get_slot(key);
- let slot = self.slots[slot_id].read();
- if let Some(entry) = slot.get(key).filter(|x| x.is_valid()) {
- if !entry.is_scalar() {
- return Err(Error::WrongType);
- }
- let mut value = entry.get_mut();
- let mut number: T = (&*value).try_into()?;
- number = incr_by.checked_add(&number).ok_or(Error::Overflow)?;
- *value = Value::Blob(Self::round_numbers(number));
- entry.bump_version();
- Ok(number)
- } else {
- drop(slot);
- self.slots[slot_id].write().insert(
- key.clone(),
- Entry::new(Value::Blob(Self::round_numbers(incr_by)), None),
- );
- Ok(incr_by)
- }
- }
- /// Removes any expiration associated with a given key
- pub fn persist(&self, key: &Bytes) -> Value {
- let slot = self.slots[self.get_slot(key)].read();
- slot.get(key)
- .filter(|x| x.is_valid())
- .map_or(0.into(), |x| {
- if x.has_ttl() {
- self.expirations.lock().remove(key);
- x.persist();
- 1.into()
- } else {
- 0.into()
- }
- })
- }
- /// Set time to live for a given key
- pub fn set_ttl(
- &self,
- key: &Bytes,
- expires_in: Duration,
- opts: ExpirationOpts,
- ) -> Result<Value, Error> {
- if opts.if_none && (opts.replace_only || opts.greater_than || opts.lower_than) {
- return Err(Error::OptsNotCompatible("NX and XX, GT or LT".to_owned()));
- }
- if opts.greater_than && opts.lower_than {
- return Err(Error::OptsNotCompatible("GT and LT".to_owned()));
- }
- let slot = self.slots[self.get_slot(key)].read();
- let expires_at = Instant::now()
- .checked_add(expires_in)
- .unwrap_or_else(far_future);
- Ok(slot
- .get(key)
- .filter(|x| x.is_valid())
- .map_or(0.into(), |x| {
- let current_expire = x.get_ttl();
- if opts.if_none && current_expire.is_some() {
- return 0.into();
- }
- if opts.replace_only && current_expire.is_none() {
- return 0.into();
- }
- if opts.greater_than {
- if let Some(current_expire) = current_expire {
- if expires_at <= current_expire {
- return 0.into();
- }
- } else {
- return 0.into();
- }
- }
- if opts.lower_than {
- if let Some(current_expire) = current_expire {
- if expires_at >= current_expire {
- return 0.into();
- }
- }
- }
- self.expirations.lock().add(key, expires_at);
- x.set_ttl(expires_at);
- 1.into()
- }))
- }
- /// Overwrites part of the string stored at key, starting at the specified
- /// offset, for the entire length of value. If the offset is larger than the
- /// current length of the string at key, the string is padded with zero-bytes to
- /// make offset fit. Non-existing keys are considered as empty strings, so this
- /// command will make sure it holds a string large enough to be able to set
- /// value at offset.
- pub fn set_range(&self, key: &Bytes, offset: i128, data: &[u8]) -> Result<Value, Error> {
- let slot_id = self.get_slot(key);
- let slot = self.slots[slot_id].read();
- let mut value = slot
- .get(key)
- .map(|value| {
- value.ensure_blob_is_mutable()?;
- if !value.is_valid() {
- self.expirations.lock().remove(key);
- value.persist();
- }
- Ok::<_, Error>(value.get_mut())
- })
- .transpose()?;
- if offset < 0 {
- return Err(Error::OutOfRange);
- }
- if offset >= 512 * 1024 * 1024 - 4 {
- return Err(Error::MaxAllowedSize);
- }
- let length = offset as usize + data.len();
- if let Some(value) = value.as_mut() {
- match value.deref_mut() {
- Value::BlobRw(ref mut bytes) => {
- if bytes.capacity() < length {
- bytes.resize(length, 0);
- }
- let writer = &mut bytes[offset as usize..length];
- writer.copy_from_slice(data);
- Ok(bytes.len().into())
- }
- _ => Err(Error::WrongType),
- }
- } else {
- drop(value);
- drop(slot);
- if data.is_empty() {
- return Ok(0.into());
- }
- let mut bytes = BytesMut::new();
- bytes.resize(length, 0);
- let writer = &mut bytes[offset as usize..];
- writer.copy_from_slice(data);
- self.slots[slot_id]
- .write()
- .insert(key.clone(), Entry::new(Value::new(&bytes), None));
- Ok(bytes.len().into())
- }
- }
- /// Copies a key
- pub fn copy(
- &self,
- source: Bytes,
- target: Bytes,
- replace: Override,
- target_db: Option<Arc<Db>>,
- ) -> Result<bool, Error> {
- let slot = self.slots[self.get_slot(&source)].read();
- let value = if let Some(value) = slot.get(&source).filter(|x| x.is_valid()) {
- value.clone()
- } else {
- return Ok(false);
- };
- drop(slot);
- if let Some(db) = target_db {
- if db.db_id == self.db_id && source == target {
- return Err(Error::SameEntry);
- }
- if replace == Override::No && db.exists(&[target.clone()]) > 0 {
- return Ok(false);
- }
- let ttl = value.get_ttl().map(|v| v - Instant::now());
- let _ = db.set_advanced(target, value.take_value(), ttl, replace, false, false);
- Ok(true)
- } else {
- if source == target {
- return Err(Error::SameEntry);
- }
- if replace == Override::No && self.exists(&[target.clone()]) > 0 {
- return Ok(false);
- }
- let mut slot = self.slots[self.get_slot(&target)].write();
- slot.insert(target, value);
- Ok(true)
- }
- }
- /// Moves a given key between databases
- pub fn move_key(&self, source: Bytes, target_db: Arc<Db>) -> Result<bool, Error> {
- if self.db_id == target_db.db_id {
- return Err(Error::SameEntry);
- }
- let mut slot = self.slots[self.get_slot(&source)].write();
- let (expires_in, value) = if let Some(value) = slot.get(&source).filter(|v| v.is_valid()) {
- (
- value.get_ttl().map(|t| t - Instant::now()),
- value.get().clone(),
- )
- } else {
- return Ok(false);
- };
- if Value::Integer(1)
- == target_db.set_advanced(
- source.clone(),
- value,
- expires_in,
- Override::No,
- false,
- false,
- )
- {
- slot.remove(&source);
- Ok(true)
- } else {
- Ok(false)
- }
- }
- /// Return a random key from the database
- pub fn randomkey(&self) -> Result<Value, Error> {
- let mut rng = rand::thread_rng();
- let mut candidates = self
- .slots
- .iter()
- .filter_map(|slot| {
- let slot = slot.read();
- if slot.is_empty() {
- None
- } else {
- slot.iter()
- .nth(rng.gen_range(0..slot.len()))
- .map(|(k, _v)| k.clone())
- }
- })
- .collect::<Vec<Bytes>>();
- candidates.shuffle(&mut rng);
- Ok(candidates.first().into())
- }
- /// Renames a key
- pub fn rename(
- &self,
- source: &Bytes,
- target: &Bytes,
- override_value: Override,
- ) -> Result<bool, Error> {
- let slot1 = self.get_slot(source);
- let slot2 = self.get_slot(target);
- let result = if slot1 == slot2 {
- let mut slot = self.slots[slot1].write();
- if override_value == Override::No && slot.get(target).is_some() {
- return Ok(false);
- }
- if let Some(value) = slot.remove(source) {
- slot.insert(target.clone(), value);
- Ok(true)
- } else {
- Err(Error::NotFound)
- }
- } else {
- let mut slot1 = self.slots[slot1].write();
- let mut slot2 = self.slots[slot2].write();
- if override_value == Override::No && slot2.get(target).is_some() {
- return Ok(false);
- }
- if let Some(value) = slot1.remove(source) {
- slot2.insert(target.clone(), value);
- Ok(true)
- } else {
- Err(Error::NotFound)
- }
- };
- if result.is_ok() {
- self.bump_version(source);
- self.bump_version(target);
- }
- result
- }
- /// Removes keys from the database
- pub fn del(&self, keys: &[Bytes]) -> Value {
- let mut expirations = self.expirations.lock();
- keys.iter()
- .filter_map(|key| {
- expirations.remove(key);
- self.slots[self.get_slot(key)].write().remove(key)
- })
- .filter(|key| key.is_valid())
- .count()
- .into()
- }
- /// Returns all keys that matches a given pattern. This is a very expensive command.
- pub fn get_all_keys(&self, pattern: &Bytes) -> Result<Vec<Value>, Error> {
- let pattern = String::from_utf8_lossy(pattern);
- let pattern =
- Pattern::new(&pattern).map_err(|_| Error::InvalidPattern(pattern.to_string()))?;
- Ok(self
- .slots
- .iter()
- .flat_map(|slot| {
- slot.read()
- .keys()
- .filter(|key| {
- let str_key = String::from_utf8_lossy(key);
- pattern.matches(&str_key)
- })
- .map(|key| Value::new(key))
- .collect::<Vec<Value>>()
- })
- .collect())
- }
- /// Check if keys exists in the database
- pub fn exists(&self, keys: &[Bytes]) -> usize {
- let mut matches = 0;
- keys.iter()
- .map(|key| {
- let slot = self.slots[self.get_slot(key)].read();
- if let Some(key) = slot.get(key) {
- matches += if key.is_valid() { 1 } else { 0 };
- }
- })
- .for_each(drop);
- matches
- }
- /// Updates the entry version of a given key
- pub fn bump_version(&self, key: &Bytes) -> bool {
- let slot = self.slots[self.get_slot(key)].read();
- let to_return = slot
- .get(key)
- .filter(|x| x.is_valid())
- .map(|entry| {
- entry.bump_version();
- })
- .is_some();
- drop(slot);
- if to_return {
- let senders = self.change_subscriptions.read();
- if let Some(sender) = senders.get(key) {
- if sender.receiver_count() == 0 {
- // Garbage collection
- drop(senders);
- self.change_subscriptions.write().remove(key);
- } else {
- // Notify
- let _ = sender.send(());
- }
- }
- }
- to_return
- }
- /// Subscribe to key changes.
- pub fn subscribe_to_key_changes(&self, keys: &[Bytes]) -> Vec<Receiver<()>> {
- let mut subscriptions = self.change_subscriptions.write();
- keys.iter()
- .map(|key| {
- if let Some(sender) = subscriptions.get(key) {
- sender.subscribe()
- } else {
- let (sender, receiver) = broadcast::channel(1);
- subscriptions.insert(key.clone(), sender);
- receiver
- }
- })
- .collect()
- }
- /// Returns the name of the value type
- pub fn get_data_type(&self, key: &Bytes) -> String {
- let slot = self.slots[self.get_slot(key)].read();
- slot.get(key)
- .filter(|x| x.is_valid())
- .map_or("none".to_owned(), |x| {
- Typ::get_type(&x.get()).to_string().to_lowercase()
- })
- }
- /// Get a ref value
- pub fn get<'a>(&'a self, key: &'a Bytes) -> RefValue<'a> {
- RefValue {
- slot: self.slots[self.get_slot(key)].read(),
- key,
- }
- }
- /// Get a copy of an entry and modifies the expiration of the key
- pub fn getex(&self, key: &Bytes, expires_in: Option<Duration>, make_persistent: bool) -> Value {
- let slot = self.slots[self.get_slot(key)].read();
- slot.get(key)
- .filter(|x| x.is_valid())
- .map(|value| {
- if make_persistent {
- self.expirations.lock().remove(key);
- value.persist();
- } else if let Some(expires_in) = expires_in {
- let expires_at = Instant::now()
- .checked_add(expires_in)
- .unwrap_or_else(far_future);
- self.expirations.lock().add(key, expires_at);
- value.set_ttl(expires_at);
- }
- value
- })
- .map_or(Value::Null, |x| x.clone_value())
- }
- /// Get multiple copies of entries
- pub fn get_multi(&self, keys: VecDeque<Bytes>) -> Value {
- keys.iter()
- .map(|key| {
- let slot = self.slots[self.get_slot(key)].read();
- slot.get(key)
- .filter(|x| x.is_valid() && x.is_scalar())
- .map_or(Value::Null, |x| x.clone_value())
- })
- .collect::<Vec<Value>>()
- .into()
- }
- /// Get a key or set a new value for the given key.
- pub fn getset(&self, key: &Bytes, value: Value) -> Value {
- let mut slot = self.slots[self.get_slot(key)].write();
- self.expirations.lock().remove(key);
- slot.insert(key.clone(), Entry::new(value, None))
- .filter(|x| x.is_valid())
- .map_or(Value::Null, |x| x.clone_value())
- }
- /// Takes an entry from the database.
- pub fn getdel(&self, key: &Bytes) -> Value {
- let mut slot = self.slots[self.get_slot(key)].write();
- slot.remove(key).map_or(Value::Null, |x| {
- self.expirations.lock().remove(key);
- x.clone_value()
- })
- }
- /// Set a key, value with an optional expiration time
- pub fn append(&self, key: &Bytes, value_to_append: &Bytes) -> Result<Value, Error> {
- let slot = self.slots[self.get_slot(key)].read();
- if let Some(entry) = slot.get(key).filter(|x| x.is_valid()) {
- entry.ensure_blob_is_mutable()?;
- match *entry.get_mut() {
- Value::BlobRw(ref mut value) => {
- value.put(value_to_append.as_ref());
- Ok(value.len().into())
- }
- _ => Err(Error::WrongType),
- }
- } else {
- drop(slot);
- let mut slot = self.slots[self.get_slot(key)].write();
- slot.insert(key.clone(), Entry::new(Value::new(value_to_append), None));
- Ok(value_to_append.len().into())
- }
- }
- /// Set multiple key/value pairs. Are involved keys are locked exclusively
- /// like a transaction.
- ///
- /// If override_all is set to false, all entries must be new entries or the
- /// entire operation fails, in this case 1 or is returned. Otherwise `Ok` is
- /// returned.
- pub fn multi_set(
- &self,
- key_values: VecDeque<Bytes>,
- override_all: bool,
- ) -> Result<Value, Error> {
- if key_values.len() % 2 == 1 {
- return Err(Error::Syntax);
- }
- let mut keys = vec![];
- let mut values = vec![];
- key_values.into_iter().enumerate().for_each(|(key, val)| {
- if key % 2 == 0 {
- keys.push(val);
- } else {
- values.push(val);
- }
- });
- let to_lock = keys.clone();
- self.lock_keys(&to_lock);
- if !override_all {
- for key in keys.iter() {
- let slot = self.slots[self.get_slot(key)].read();
- if slot.get(key).is_some() {
- self.unlock_keys(&keys);
- return Ok(0.into());
- }
- }
- }
- let mut values = values.into_iter();
- for key in keys.into_iter() {
- let mut slot = self.slots[self.get_slot(&key)].write();
- if let Some(value) = values.next() {
- slot.insert(key, Entry::new(Value::Blob(value), None));
- }
- }
- self.unlock_keys(&to_lock);
- if override_all {
- Ok(Value::Ok)
- } else {
- Ok(1.into())
- }
- }
- /// Set a key, value with an optional expiration time
- pub fn set(&self, key: Bytes, value: Value, expires_in: Option<Duration>) -> Value {
- self.set_advanced(key, value, expires_in, Default::default(), false, false)
- }
- /// Set a value in the database with various settings
- pub fn set_advanced(
- &self,
- key: Bytes,
- value: Value,
- expires_in: Option<Duration>,
- override_value: Override,
- keep_ttl: bool,
- return_previous: bool,
- ) -> Value {
- let mut slot = self.slots[self.get_slot(&key)].write();
- let expires_at = expires_in.map(|duration| {
- Instant::now()
- .checked_add(duration)
- .unwrap_or_else(far_future)
- });
- let previous = slot.get(&key).filter(|x| x.is_valid());
- let expires_at = if keep_ttl {
- if let Some(previous) = previous {
- previous.get_ttl()
- } else {
- expires_at
- }
- } else {
- expires_at
- };
- let to_return = if return_previous {
- let previous = previous.map_or(Value::Null, |v| v.clone_value());
- if previous.is_err() {
- // Error while trying to clone the previous value to return, we
- // must halt and return immediately.
- return previous;
- }
- Some(previous)
- } else {
- None
- };
- match override_value {
- Override::No => {
- if previous.is_some() {
- return if let Some(to_return) = to_return {
- to_return
- } else {
- 0.into()
- };
- }
- }
- Override::Only => {
- if previous.is_none() {
- return if let Some(to_return) = to_return {
- to_return
- } else {
- 0.into()
- };
- }
- }
- _ => {}
- };
- if let Some(expires_at) = expires_at {
- self.expirations.lock().add(&key, expires_at);
- } else {
- // Make sure to remove the new key (or replaced) from the
- // expiration table (from any possible past value).
- self.expirations.lock().remove(&key);
- }
- slot.insert(key, Entry::new(value, expires_at));
- if let Some(to_return) = to_return {
- to_return
- } else if override_value == Override::Yes {
- Value::Ok
- } else {
- 1.into()
- }
- }
- /// Returns the TTL of a given key
- pub fn ttl(&self, key: &Bytes) -> Option<Option<Instant>> {
- let slot = self.slots[self.get_slot(key)].read();
- slot.get(key).filter(|x| x.is_valid()).map(|x| x.get_ttl())
- }
- /// Check whether a given key is in the list of keys to be purged or not.
- /// This function is mainly used for unit testing
- pub fn is_key_in_expiration_list(&self, key: &Bytes) -> bool {
- self.expirations.lock().has(key)
- }
- /// Remove expired entries from the database.
- ///
- /// This function should be called from a background thread every few seconds. Calling it more
- /// often is a waste of resources.
- ///
- /// Expired keys are automatically hidden by the database, this process is just claiming back
- /// the memory from those expired keys.
- pub fn purge(&self) -> u64 {
- let mut expirations = self.expirations.lock();
- let mut removed = 0;
- trace!("Watching {} keys for expirations", expirations.len());
- let keys = expirations.get_expired_keys(None);
- drop(expirations);
- keys.iter()
- .map(|key| {
- let mut slot = self.slots[self.get_slot(key)].write();
- if slot.remove(key).is_some() {
- trace!("Removed key {:?} due timeout", key);
- removed += 1;
- }
- })
- .for_each(drop);
- removed
- }
- }
- impl scan::Scan for Db {
- fn scan(
- &self,
- cursor: Cursor,
- pattern: Option<Bytes>,
- count: Option<usize>,
- typ: Option<Typ>,
- ) -> Result<scan::Result, Error> {
- let mut keys = vec![];
- let mut slot_id = cursor.bucket as usize;
- let mut last_pos = cursor.last_position as usize;
- let pattern = pattern
- .map(|pattern| {
- let pattern = String::from_utf8_lossy(&pattern);
- Pattern::new(&pattern).map_err(|_| Error::InvalidPattern(pattern.to_string()))
- })
- .transpose()?;
- loop {
- let slot = if let Some(value) = self.slots.get(slot_id) {
- value.read()
- } else {
- // We iterated through all the entries, time to signal that to
- // the client but returning a "0" cursor.
- slot_id = 0;
- last_pos = 0;
- break;
- };
- for (key, value) in slot.iter().skip(last_pos) {
- if !value.is_valid() {
- // Entry still exists in memory but it is not longer valid
- // and will soon be gargabe collected.
- last_pos += 1;
- continue;
- }
- if let Some(pattern) = &pattern {
- let str_key = String::from_utf8_lossy(key);
- if !pattern.matches(&str_key) {
- last_pos += 1;
- continue;
- }
- }
- if let Some(typ) = &typ {
- if !typ.is_value_type(&value.get()) {
- last_pos += 1;
- continue;
- }
- }
- keys.push(Value::new(key));
- last_pos += 1;
- if keys.len() == count.unwrap_or(10) {
- break;
- }
- }
- if keys.len() == count.unwrap_or(10) {
- break;
- }
- last_pos = 0;
- slot_id += 1;
- }
- Ok(scan::Result {
- cursor: Cursor::new(slot_id as u16, last_pos as u64)?,
- result: keys,
- })
- }
- }
#[cfg(test)]
mod test {
    use super::*;
    use crate::{bytes, db::scan::Scan, value::float::Float};
    use std::str::FromStr;

    /// Stores the three keys shared by the `del` and `ttl` tests: `expired`
    /// (0s TTL), `valid` (no TTL) and `expiring` (5s TTL), in that order.
    fn seed_ttl_fixture(db: &Db) {
        let fixture: Vec<(Bytes, Option<Duration>)> = vec![
            (bytes!(b"expired"), Some(Duration::from_secs(0))),
            (bytes!(b"valid"), None),
            (bytes!(b"expiring"), Some(Duration::from_secs(5))),
        ];
        for (key, expires_in) in fixture {
            db.set(key, Value::Ok, expires_in);
        }
    }

    /// Stores the keys "0", "1", ... up to (but excluding) `total`, without TTL.
    fn fill_numbered_keys(db: &Db, total: u64) {
        for n in 0u64..total {
            db.set(Bytes::from(n.to_string()), Value::Ok, None);
        }
    }

    /// Cursor a client sends to begin a fresh scan.
    fn start_cursor() -> Cursor {
        Cursor::from_str("0").unwrap()
    }

    /// Incrementing a non-numeric blob fails with `NotANumber` and the stored
    /// value is left as it was.
    #[test]
    fn incr_wrong_type() {
        let db = Db::new(100);
        db.set(bytes!(b"num"), Value::Blob(bytes!("some string")), None);

        let outcome = db.incr(&bytes!("num"), 1);
        assert!(outcome.is_err());
        assert_eq!(Error::NotANumber, outcome.expect_err("should fail"));

        let stored = db.get(&bytes!("num")).into_inner();
        assert_eq!(Value::Blob(bytes!("some string")), stored);
    }

    /// A blob holding a float can be incremented by a float.
    #[test]
    fn incr_blob_float() {
        let db = Db::new(100);
        db.set(bytes!(b"num"), Value::Blob(bytes!("1.1")), None);

        let outcome = db.incr::<Float>(&bytes!("num"), 1.1.into());
        assert_eq!(Ok(2.2.into()), outcome);

        let stored = db.get(&bytes!("num")).into_inner();
        assert_eq!(Value::Blob(bytes!("2.2")), stored);
    }

    /// A blob holding an integer can be incremented by a float.
    #[test]
    fn incr_blob_int_float() {
        let db = Db::new(100);
        db.set(bytes!(b"num"), Value::Blob(bytes!("1")), None);

        let outcome = db.incr::<Float>(&bytes!("num"), 1.1.into());
        assert_eq!(Ok(2.1.into()), outcome);

        let stored = db.get(&bytes!("num")).into_inner();
        assert_eq!(Value::Blob(bytes!("2.1")), stored);
    }

    /// A blob holding an integer can be incremented by an integer.
    #[test]
    fn incr_blob_int() {
        let db = Db::new(100);
        db.set(bytes!(b"num"), Value::Blob(bytes!("1")), None);

        let outcome = db.incr(&bytes!("num"), 1);
        assert_eq!(Ok(2), outcome);

        let stored = db.get(&bytes!("num")).into_inner();
        assert_eq!(Value::Blob(bytes!("2")), stored);
    }

    /// Incrementing a key that was never set stores the increment itself.
    #[test]
    fn incr_blob_int_set() {
        let db = Db::new(100);

        let outcome = db.incr(&bytes!("num"), 1);
        assert_eq!(Ok(1), outcome);

        let stored = db.get(&bytes!("num")).into_inner();
        assert_eq!(Value::Blob(bytes!("1")), stored);
    }

    /// Same as `incr_blob_int_set`, with a float increment.
    #[test]
    fn incr_blob_float_set() {
        let db = Db::new(100);

        let outcome = db.incr::<Float>(&bytes!("num"), 1.1.into());
        assert_eq!(Ok(1.1.into()), outcome);

        let stored = db.get(&bytes!("num")).into_inner();
        assert_eq!(Value::Blob(bytes!("1.1")), stored);
    }

    /// Deleting four keys (one stored with a 0s TTL, one never stored)
    /// reports 2 deletions.
    #[test]
    fn del() {
        let db = Db::new(100);
        seed_ttl_fixture(&db);

        let targets = [
            bytes!(b"expired"),
            bytes!(b"valid"),
            bytes!(b"expiring"),
            bytes!(b"not_existing_key"),
        ];
        assert_eq!(Value::Integer(2), db.del(&targets));
    }

    /// `ttl` yields `None` for the 0s-TTL and unknown keys, `Some(None)` for a
    /// key without TTL and `Some(Some(_))` for a key with a pending TTL.
    #[test]
    fn ttl() {
        let db = Db::new(100);
        seed_ttl_fixture(&db);

        assert_eq!(None, db.ttl(&bytes!(b"expired")));
        assert_eq!(None, db.ttl(&bytes!(b"not_existing_key")));
        assert_eq!(Some(None), db.ttl(&bytes!(b"valid")));

        let expiring = db.ttl(&bytes!(b"expiring"));
        assert!(expiring.flatten().is_some());
    }

    /// After `persist`, the key must no longer be in the expiration list.
    #[test]
    fn persist_bug() {
        let db = Db::new(100);
        let key: Bytes = bytes!(b"one");
        db.set(key.clone(), Value::Ok, Some(Duration::from_secs(1)));

        assert_eq!(Value::Ok, db.get(&key).into_inner());
        assert!(db.is_key_in_expiration_list(&key));

        db.persist(&key);
        assert!(!db.is_key_in_expiration_list(&key));
    }

    /// An expired key is hidden from `get` before it is purged, is counted by
    /// the first purge, and is not counted again by the second.
    #[test]
    fn purge_keys() {
        let db = Db::new(100);
        let key: Bytes = bytes!(b"one");
        db.set(key.clone(), Value::Ok, Some(Duration::from_secs(0)));

        // Hidden right away, even though the purge has not run yet.
        assert_eq!(Value::Null, db.get(&key).into_inner());

        let first_pass = db.purge();
        let second_pass = db.purge();
        assert_eq!(1, first_pass);
        assert_eq!(0, second_pass);

        assert_eq!(Value::Null, db.get(&key).into_inner());
    }

    /// Overwriting an expired key before the purge runs leaves nothing for
    /// the purge to remove.
    #[test]
    fn replace_purge_keys() {
        let db = Db::new(100);
        let key: Bytes = bytes!(b"one");
        db.set(key.clone(), Value::Ok, Some(Duration::from_secs(0)));

        // Hidden right away, even though the purge has not run yet.
        assert_eq!(Value::Null, db.get(&key).into_inner());

        db.set(key.clone(), Value::Ok, Some(Duration::from_secs(5)));
        assert_eq!(Value::Ok, db.get(&key).into_inner());

        // The expired value was already replaced, so the purge removes nothing.
        assert_eq!(0, db.purge());
    }

    /// Two pages of 10 cover the 20 live keys; the two 0s-TTL keys never show
    /// up and the third page is empty with a "0" cursor.
    #[test]
    fn scan_skip_expired() {
        let db = Db::new(100);
        db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
        db.set(bytes!(b"two"), Value::Ok, Some(Duration::from_secs(0)));
        fill_numbered_keys(&db, 20);

        // First page: 10 records and a cursor to continue from.
        let first_page = db.scan(start_cursor(), None, None, None).unwrap();
        assert_eq!(10, first_page.result.len());
        assert_ne!("0", first_page.cursor.to_string());

        // Second page: 10 more records, still a continuation cursor.
        let second_page = db.scan(first_page.cursor, None, None, None).unwrap();
        assert_eq!(10, second_page.result.len());
        assert_ne!("0", second_page.cursor.to_string());

        // Third page: nothing left and the cursor is back to "0".
        let last_page = db.scan(second_page.cursor, None, None, None).unwrap();
        assert_eq!(0, last_page.result.len());
        assert_eq!("0", last_page.cursor.to_string());
    }

    /// An explicit count of 2 caps the page at two keys and leaves a
    /// continuation cursor.
    #[test]
    fn scan_limit() {
        let db = Db::new(10);
        db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
        db.set(bytes!(b"two"), Value::Ok, Some(Duration::from_secs(0)));
        fill_numbered_keys(&db, 2000);

        let page = db.scan(start_cursor(), None, Some(2), None).unwrap();
        assert_eq!(2, page.result.len());
        assert_ne!("0", page.cursor.to_string());
    }

    /// The `f*` pattern keeps only the two keys starting with "f" and the
    /// scan finishes in a single call.
    #[test]
    fn scan_filter() {
        let db = Db::new(100);
        db.set(bytes!(b"fone"), Value::Ok, None);
        db.set(bytes!(b"ftwo"), Value::Ok, None);
        fill_numbered_keys(&db, 20);

        let glob = Some(bytes!(b"f*"));
        let page = db.scan(start_cursor(), glob, None, None).unwrap();
        assert_eq!(2, page.result.len());
        assert_eq!("0", page.cursor.to_string());
    }

    /// Three tasks share a counter; the expected final value of 6 requires
    /// them to have touched it in the order first -> second -> third.
    #[tokio::test]
    async fn lock_keys() {
        let first_db = Arc::new(Db::new(100));
        let second_db = first_db.clone().new_db_instance(2);
        let third_db = first_db.clone().new_db_instance(3);

        let counter = Arc::new(RwLock::new(1));
        let first_counter = counter.clone();
        let second_counter = counter.clone();
        let third_counter = counter.clone();

        let _ = tokio::join!(
            tokio::spawn(async move {
                first_db.lock_keys(&["test".into()]);
                let mut guard = first_counter.write();
                *guard = 2;
                thread::sleep(Duration::from_secs(1));
                first_db.unlock_keys(&["test".into()]);
            }),
            tokio::spawn(async move {
                second_db.lock_keys(&["test".into(), "bar".into()]);
                let mut guard = second_counter.write();
                if *guard == 2 {
                    *guard = 5;
                }
                thread::sleep(Duration::from_secs(2));
                second_db.unlock_keys(&["test".into(), "bar".into()]);
            }),
            tokio::spawn(async move {
                thread::sleep(Duration::from_millis(500));
                assert_eq!(4, third_db.get_slot(&"test".into()));
                let mut guard = third_counter.write();
                if *guard == 5 {
                    *guard = 6;
                }
            }),
        );

        assert_eq!(6, *counter.read());
    }
}
|