Ver código fonte

Do not use callbacks

Instead return a reference to the Guarded HashMap and a reference to the
Key to be read.

Also update Entry to not need a mutable instance ever. Use an internal
mutex for the expiration and atomic for the object ids.

Do not use time for the object ids, use an atomic counter instead
Cesar Rodas 1 ano atrás
pai
commit
bb3a083c21
7 arquivos alterados com 133 adições e 82 exclusões
  1. 6 6
      src/cmd/string.rs
  2. 1 1
      src/cmd/transaction.rs
  3. 2 2
      src/connection/mod.rs
  4. 35 32
      src/db/entry.rs
  5. 82 34
      src/db/mod.rs
  6. 7 7
      src/value/mod.rs
  7. 0 0
      src/value/shared.rs

+ 6 - 6
src/cmd/string.rs

@@ -12,7 +12,7 @@ use std::{
     cmp::min,
     collections::VecDeque,
     convert::TryInto,
-    ops::{Bound, Neg},
+    ops::{Bound, Deref, Neg},
 };
 
 /// If key already exists and is a string, this command appends the value at the
@@ -77,7 +77,7 @@ pub async fn decr_by(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value,
 /// Get the value of key. If the key does not exist the special value nil is returned. An error is
 /// returned if the value stored at key is not a string, because GET only handles string values.
 pub async fn get(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    Ok(conn.db().get(&args[0]))
+    Ok(conn.db().get(&args[0]).inner())
 }
 
 /// Get the value of key and optionally set its expiration. GETEX is similar to
@@ -124,9 +124,9 @@ pub async fn getex(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Er
 /// Get the value of key. If the key does not exist the special value nil is returned. An error is
 /// returned if the value stored at key is not a string, because GET only handles string values.
 pub async fn getrange(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    let bytes = match conn.db().get(&args[0]) {
-        Value::Blob(binary) => binary,
-        Value::BlobRw(binary) => binary.freeze(),
+    let bytes = match conn.db().get(&args[0]).deref() {
+        Value::Blob(binary) => binary.clone(),
+        Value::BlobRw(binary) => binary.clone().freeze(),
         Value::Null => return Ok("".into()),
         _ => return Err(Error::WrongType),
     };
@@ -350,7 +350,7 @@ pub async fn setnx(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
 /// Returns the length of the string value stored at key. An error is returned when key holds a
 /// non-string value.
 pub async fn strlen(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    match conn.db().get(&args[0]) {
+    match conn.db().get(&args[0]).deref() {
         Value::Blob(x) => Ok(x.len().into()),
         Value::String(x) => Ok(x.len().into()),
         Value::Null => Ok(0.into()),

+ 1 - 1
src/cmd/transaction.rs

@@ -77,7 +77,7 @@ pub async fn watch(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Er
                 let v = conn.db().get_version(&key);
                 (key, v)
             })
-            .collect::<Vec<(Bytes, u128)>>(),
+            .collect::<Vec<(Bytes, usize)>>(),
     );
     Ok(Value::Ok)
 }

+ 2 - 2
src/connection/mod.rs

@@ -46,7 +46,7 @@ pub struct ConnectionInfo {
     current_db: usize,
     db: Arc<Db>,
     name: Option<String>,
-    watch_keys: Vec<(Bytes, u128)>,
+    watch_keys: Vec<(Bytes, usize)>,
     tx_keys: HashSet<Bytes>,
     status: ConnectionStatus,
     commands: Option<Vec<VecDeque<Bytes>>>,
@@ -267,7 +267,7 @@ impl Connection {
 
     /// Watches keys. In a transaction watched keys are a mechanism to discard a transaction if
     /// some value changed since the moment the command was queued until the execution time.
-    pub fn watch_key(&self, keys: Vec<(Bytes, u128)>) {
+    pub fn watch_key(&self, keys: Vec<(Bytes, usize)>) {
         let watch_keys = &mut self.info.write().watch_keys;
         keys.into_iter()
             .map(|value| {

+ 35 - 32
src/db/entry.rs

@@ -1,19 +1,20 @@
 use crate::{error::Error, value::Value};
-use std::time::SystemTime;
+use parking_lot::Mutex;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use tokio::time::Instant;
 
 #[derive(Debug)]
 pub struct Entry {
     pub value: Value,
-    pub version: u128,
-    expires_at: Option<Instant>,
+    pub version: AtomicUsize,
+    expires_at: Mutex<Option<Instant>>,
 }
 
-pub fn new_version() -> u128 {
-    SystemTime::now()
-        .duration_since(SystemTime::UNIX_EPOCH)
-        .expect("get millis error")
-        .as_nanos()
+static LAST_VERSION: AtomicUsize = AtomicUsize::new(0);
+
+/// Returns a new version
+pub fn unique_id() -> usize {
+    LAST_VERSION.fetch_add(1, Ordering::Relaxed)
 }
 
 /// Database Entry
@@ -27,50 +28,54 @@ impl Entry {
     pub fn new(value: Value, expires_at: Option<Instant>) -> Self {
         Self {
             value,
-            expires_at,
-            version: new_version(),
+            expires_at: Mutex::new(expires_at),
+            version: AtomicUsize::new(LAST_VERSION.fetch_add(1, Ordering::Relaxed)),
         }
     }
 
-    pub fn bump_version(&mut self) {
-        self.version = new_version();
+    #[inline(always)]
+    pub fn bump_version(&self) {
+        self.version.store(
+            LAST_VERSION.fetch_add(1, Ordering::Relaxed),
+            Ordering::Relaxed,
+        )
     }
 
-    pub fn persist(&mut self) {
-        self.expires_at = None;
+    pub fn persist(&self) {
+        *self.expires_at.lock() = None;
     }
 
     pub fn clone(&self) -> Self {
-        Self::new(self.value.clone(), self.expires_at)
+        Self::new(self.value.clone(), *self.expires_at.lock())
     }
 
     pub fn get_ttl(&self) -> Option<Instant> {
-        self.expires_at
+        *self.expires_at.lock()
     }
 
     pub fn has_ttl(&self) -> bool {
-        self.expires_at.is_some()
+        self.expires_at.lock().is_some()
     }
 
-    pub fn set_ttl(&mut self, expires_at: Instant) {
-        self.expires_at = Some(expires_at);
-        self.version = new_version();
+    pub fn set_ttl(&self, expires_at: Instant) {
+        *self.expires_at.lock() = Some(expires_at);
+        self.bump_version()
     }
 
-    pub fn version(&self) -> u128 {
-        self.version
+    pub fn version(&self) -> usize {
+        self.version.load(Ordering::Relaxed)
     }
 
     /// Changes the value that is wrapped in this entry, the TTL (expired_at) is
     /// not affected.
     pub fn change_value(&mut self, value: Value) {
         self.value = value;
-        self.version = new_version();
+        self.bump_version()
     }
 
     #[allow(dead_code)]
     pub fn get_mut(&mut self) -> &mut Value {
-        self.version = new_version();
+        self.bump_version();
         &mut self.value
     }
 
@@ -83,13 +88,11 @@ impl Entry {
     /// behaviour we can schedule the purge thread to run every few seconds or
     /// even minutes instead of once every second.
     pub fn is_valid(&self) -> bool {
-        self.expires_at.map_or(true, |x| x > Instant::now())
+        self.expires_at.lock().map_or(true, |x| x > Instant::now())
     }
 
-    /// Whether or not the value is clonable. Special types like hashes should
-    /// not be clonable because those types cannot be returned to the user with
-    /// the `get` command.
-    pub fn is_clonable(&self) -> bool {
+    /// Whether or not the value is scalar
+    pub fn is_scalar(&self) -> bool {
         matches!(
             &self.value,
             Value::Boolean(_)
@@ -107,7 +110,7 @@ impl Entry {
     /// Clone a value. If the value is not clonable an error is Value::Error is
     /// returned instead
     pub fn clone_value(&self) -> Value {
-        if self.is_clonable() {
+        if self.is_scalar() {
             self.value.clone()
         } else {
             Error::WrongType.into()
@@ -140,7 +143,7 @@ mod test {
 
     #[test]
     fn persist() {
-        let mut e = Entry::new(Value::Null, Some(Instant::now()));
+        let e = Entry::new(Value::Null, Some(Instant::now()));
         assert!(!e.is_valid());
         e.persist();
         assert!(e.is_valid());
@@ -148,7 +151,7 @@ mod test {
 
     #[test]
     fn update_ttl() {
-        let mut e = Entry::new(Value::Null, Some(Instant::now()));
+        let e = Entry::new(Value::Null, Some(Instant::now()));
         assert!(!e.is_valid());
         e.persist();
         assert!(e.is_valid());

+ 82 - 34
src/db/mod.rs

@@ -8,19 +8,18 @@ use crate::{
     value::{bytes_to_number, cursor::Cursor, typ::Typ, VDebug, Value},
 };
 use bytes::{BufMut, Bytes, BytesMut};
-
-use entry::{new_version, Entry};
+use entry::{unique_id, Entry};
 use expiration::ExpirationDb;
-
 use glob::Pattern;
 use log::trace;
 use num_traits::CheckedAdd;
-use parking_lot::{Mutex, RwLock};
+use parking_lot::{Mutex, RwLock, RwLockReadGuard};
 use rand::{prelude::SliceRandom, Rng};
 use seahash::hash;
 use std::{
     collections::{HashMap, VecDeque},
     convert::{TryFrom, TryInto},
+    ops::Deref,
     str::FromStr,
     sync::Arc,
     thread,
@@ -36,6 +35,52 @@ pub mod pool;
 pub mod scan;
 pub(crate) mod utils;
 
+/// Read only reference
+pub struct RefValue<'a> {
+    key: &'a Bytes,
+    slot: RwLockReadGuard<'a, HashMap<Bytes, Entry>>,
+}
+
+impl<'a> RefValue<'a> {
+    /// test
+    #[inline(always)]
+    pub fn inner(self) -> Value {
+        self.slot
+            .get(self.key)
+            .filter(|x| x.is_valid())
+            .map(|x| {
+                if x.is_scalar() {
+                    x.get().clone()
+                } else {
+                    Error::WrongType.into()
+                }
+            })
+            .unwrap_or_default()
+    }
+
+    /// Returns the version of a given key
+    #[inline(always)]
+    pub fn version(&self) -> usize {
+        self.slot
+            .get(self.key)
+            .filter(|x| x.is_valid())
+            .map(|x| x.version())
+            .unwrap_or_default()
+    }
+}
+
+impl Deref for RefValue<'_> {
+    type Target = Value;
+
+    fn deref(&self) -> &Self::Target {
+        self.slot
+            .get(self.key)
+            .filter(|x| x.is_valid())
+            .map(|x| x.get())
+            .unwrap_or(&Value::Null)
+    }
+}
+
 /// Database structure
 ///
 /// Each connection has their own clone of the database and the conn_id is stored in each instance.
@@ -77,7 +122,7 @@ pub struct Db {
 
     /// Databases unique ID. This is an internal identifier to avoid deadlocks
     /// when copying and moving data between databases.
-    pub db_id: u128,
+    pub db_id: usize,
 
     /// Current connection  ID
     ///
@@ -106,7 +151,7 @@ impl Db {
             expirations: Arc::new(Mutex::new(ExpirationDb::new())),
             change_subscriptions: Arc::new(RwLock::new(HashMap::new())),
             conn_id: 0,
-            db_id: new_version(),
+            db_id: unique_id(),
             tx_key_locks: Arc::new(RwLock::new(HashMap::new())),
             number_of_slots,
         }
@@ -357,7 +402,7 @@ impl Db {
         let mut slot = self.slots[self.get_slot(key)].write();
         match slot.get_mut(key).filter(|x| x.is_valid()) {
             Some(x) => {
-                if !x.is_clonable() {
+                if !x.is_scalar() {
                     return Err(Error::WrongType);
                 }
                 let value = x.get();
@@ -381,8 +426,8 @@ impl Db {
 
     /// Removes any expiration associated with a given key
     pub fn persist(&self, key: &Bytes) -> Value {
-        let mut slot = self.slots[self.get_slot(key)].write();
-        slot.get_mut(key)
+        let slot = self.slots[self.get_slot(key)].read();
+        slot.get(key)
             .filter(|x| x.is_valid())
             .map_or(0.into(), |x| {
                 if x.has_ttl() {
@@ -410,13 +455,13 @@ impl Db {
             return Err(Error::OptsNotCompatible("GT and LT".to_owned()));
         }
 
-        let mut slot = self.slots[self.get_slot(key)].write();
+        let slot = self.slots[self.get_slot(key)].read();
         let expires_at = Instant::now()
             .checked_add(expires_in)
             .unwrap_or_else(far_future);
 
         Ok(slot
-            .get_mut(key)
+            .get(key)
             .filter(|x| x.is_valid())
             .map_or(0.into(), |x| {
                 let current_expire = x.get_ttl();
@@ -732,9 +777,9 @@ impl Db {
 
     /// Updates the entry version of a given key
     pub fn bump_version(&self, key: &Bytes) -> bool {
-        let mut slot = self.slots[self.get_slot(key)].write();
+        let slot = self.slots[self.get_slot(key)].read();
         let to_return = slot
-            .get_mut(key)
+            .get(key)
             .filter(|x| x.is_valid())
             .map(|entry| {
                 entry.bump_version();
@@ -775,7 +820,7 @@ impl Db {
 
     /// Returns the version of a given key
     #[inline]
-    pub fn get_version(&self, key: &Bytes) -> u128 {
+    pub fn get_version(&self, key: &Bytes) -> usize {
         let slot = self.slots[self.get_slot(key)].read();
         slot.get(key)
             .filter(|x| x.is_valid())
@@ -793,18 +838,18 @@ impl Db {
             })
     }
 
-    /// Get a copy of an entry
-    pub fn get(&self, key: &Bytes) -> Value {
-        let slot = self.slots[self.get_slot(key)].read();
-        slot.get(key)
-            .filter(|x| x.is_valid())
-            .map_or(Value::Null, |x| x.clone_value())
+    /// Get a ref value
+    pub fn get<'a>(&'a self, key: &'a Bytes) -> RefValue<'a> {
+        RefValue {
+            slot: self.slots[self.get_slot(key)].read(),
+            key,
+        }
     }
 
     /// Get a copy of an entry and modifies the expiration of the key
     pub fn getex(&self, key: &Bytes, expires_in: Option<Duration>, make_persistent: bool) -> Value {
-        let mut slot = self.slots[self.get_slot(key)].write();
-        slot.get_mut(key)
+        let slot = self.slots[self.get_slot(key)].read();
+        slot.get(key)
             .filter(|x| x.is_valid())
             .map(|value| {
                 if make_persistent {
@@ -828,7 +873,7 @@ impl Db {
             .map(|key| {
                 let slot = self.slots[self.get_slot(key)].read();
                 slot.get(key)
-                    .filter(|x| x.is_valid() && x.is_clonable())
+                    .filter(|x| x.is_valid() && x.is_scalar())
                     .map_or(Value::Null, |x| x.clone_value())
             })
             .collect::<Vec<Value>>()
@@ -1147,7 +1192,10 @@ mod test {
 
         assert!(r.is_err());
         assert_eq!(Error::NotANumber, r.expect_err("should fail"));
-        assert_eq!(Value::Blob(bytes!("some string")), db.get(&bytes!("num")));
+        assert_eq!(
+            Value::Blob(bytes!("some string")),
+            db.get(&bytes!("num")).inner()
+        );
     }
 
     #[test]
@@ -1156,7 +1204,7 @@ mod test {
         db.set(bytes!(b"num"), Value::Blob(bytes!("1.1")), None);
 
         assert_eq!(Ok(2.2.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
-        assert_eq!(Value::Blob(bytes!("2.2")), db.get(&bytes!("num")));
+        assert_eq!(Value::Blob(bytes!("2.2")), db.get(&bytes!("num")).inner());
     }
 
     #[test]
@@ -1165,7 +1213,7 @@ mod test {
         db.set(bytes!(b"num"), Value::Blob(bytes!("1")), None);
 
         assert_eq!(Ok(2.1.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
-        assert_eq!(Value::Blob(bytes!("2.1")), db.get(&bytes!("num")));
+        assert_eq!(Value::Blob(bytes!("2.1")), db.get(&bytes!("num")).inner());
     }
 
     #[test]
@@ -1174,21 +1222,21 @@ mod test {
         db.set(bytes!(b"num"), Value::Blob(bytes!("1")), None);
 
         assert_eq!(Ok(2), db.incr(&bytes!("num"), 1));
-        assert_eq!(Value::Blob(bytes!("2")), db.get(&bytes!("num")));
+        assert_eq!(Value::Blob(bytes!("2")), db.get(&bytes!("num")).inner());
     }
 
     #[test]
     fn incr_blob_int_set() {
         let db = Db::new(100);
         assert_eq!(Ok(1), db.incr(&bytes!("num"), 1));
-        assert_eq!(Value::Blob(bytes!("1")), db.get(&bytes!("num")));
+        assert_eq!(Value::Blob(bytes!("1")), db.get(&bytes!("num")).inner());
     }
 
     #[test]
     fn incr_blob_float_set() {
         let db = Db::new(100);
         assert_eq!(Ok(1.1.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
-        assert_eq!(Value::Blob(bytes!("1.1")), db.get(&bytes!("num")));
+        assert_eq!(Value::Blob(bytes!("1.1")), db.get(&bytes!("num")).inner());
     }
 
     #[test]
@@ -1226,7 +1274,7 @@ mod test {
     fn persist_bug() {
         let db = Db::new(100);
         db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(1)));
-        assert_eq!(Value::Ok, db.get(&bytes!(b"one")));
+        assert_eq!(Value::Ok, db.get(&bytes!(b"one")).inner());
         assert!(db.is_key_in_expiration_list(&bytes!(b"one")));
         db.persist(&bytes!(b"one"));
         assert!(!db.is_key_in_expiration_list(&bytes!(b"one")));
@@ -1238,13 +1286,13 @@ mod test {
         db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
         // Expired keys should not be returned, even if they are not yet
         // removed by the purge process.
-        assert_eq!(Value::Null, db.get(&bytes!(b"one")));
+        assert_eq!(Value::Null, db.get(&bytes!(b"one")).inner());
 
         // Purge twice
         assert_eq!(1, db.purge());
         assert_eq!(0, db.purge());
 
-        assert_eq!(Value::Null, db.get(&bytes!(b"one")));
+        assert_eq!(Value::Null, db.get(&bytes!(b"one")).inner());
     }
 
     #[test]
@@ -1253,10 +1301,10 @@ mod test {
         db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
         // Expired keys should not be returned, even if they are not yet
         // removed by the purge process.
-        assert_eq!(Value::Null, db.get(&bytes!(b"one")));
+        assert_eq!(Value::Null, db.get(&bytes!(b"one")).inner());
 
         db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(5)));
-        assert_eq!(Value::Ok, db.get(&bytes!(b"one")));
+        assert_eq!(Value::Ok, db.get(&bytes!(b"one")).inner());
 
         // Purge should return 0 as the expired key has been removed already
         assert_eq!(0, db.purge());

+ 7 - 7
src/value/mod.rs

@@ -5,7 +5,7 @@ pub mod checksum;
 pub mod cursor;
 pub mod expiration;
 pub mod float;
-pub mod locked;
+pub mod shared;
 pub mod typ;
 
 use crate::{error::Error, value_try_from, value_vec_try_from};
@@ -24,11 +24,11 @@ use std::{
 #[derive(Debug, PartialEq, Clone)]
 pub enum Value {
     /// Hash. This type cannot be serialized
-    Hash(locked::Value<HashMap<Bytes, Bytes>>),
+    Hash(shared::Value<HashMap<Bytes, Bytes>>),
     /// List. This type cannot be serialized
-    List(locked::Value<VecDeque<checksum::Value>>),
+    List(shared::Value<VecDeque<checksum::Value>>),
     /// Set. This type cannot be serialized
-    Set(locked::Value<HashSet<Bytes>>),
+    Set(shared::Value<HashSet<Bytes>>),
     /// Vector/Array of values
     Array(Vec<Value>),
     /// Bytes/Strings/Binary data
@@ -272,19 +272,19 @@ impl From<&str> for Value {
 
 impl From<HashMap<Bytes, Bytes>> for Value {
     fn from(value: HashMap<Bytes, Bytes>) -> Value {
-        Value::Hash(locked::Value::new(value))
+        Value::Hash(shared::Value::new(value))
     }
 }
 
 impl From<VecDeque<checksum::Value>> for Value {
     fn from(value: VecDeque<checksum::Value>) -> Value {
-        Value::List(locked::Value::new(value))
+        Value::List(shared::Value::new(value))
     }
 }
 
 impl From<HashSet<Bytes>> for Value {
     fn from(value: HashSet<Bytes>) -> Value {
-        Value::Set(locked::Value::new(value))
+        Value::Set(shared::Value::new(value))
     }
 }
 

+ 0 - 0
src/value/locked.rs → src/value/shared.rs