Ver código fonte

Remove shared wrapper for Value::Hash values

Non scalar values have a proper locking mechanism inside an Entry, that
is enough to drop the former `shared` wrapper.
Cesar Rodas 1 ano atrás
pai
commit
d41fe22f52
4 arquivos alterados com 268 adições e 206 exclusões
  1. 198 174
      src/cmd/hash.rs
  2. 2 0
      src/db/entry.rs
  3. 66 30
      src/db/mod.rs
  4. 2 2
      src/value/mod.rs

+ 198 - 174
src/cmd/hash.rs

@@ -7,7 +7,10 @@ use crate::{
 };
 use bytes::Bytes;
 use rand::Rng;
-use std::collections::{BTreeMap, HashMap, VecDeque};
+use std::{
+    collections::{BTreeMap, HashMap, VecDeque},
+    ops::Deref,
+};
 
 /// Removes the specified fields from the hash stored at key. Specified fields that do not exist
 /// within this hash are ignored. If key does not exist, it is treated as an empty hash and this
@@ -15,24 +18,26 @@ use std::collections::{BTreeMap, HashMap, VecDeque};
 pub async fn hdel(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
     let mut is_empty = false;
     let key = args.pop_front().ok_or(Error::Syntax)?;
-    let result = conn.db().get_map(&key, |v| match v {
-        Some(Value::Hash(h)) => {
-            let mut h = h.write();
-            let mut total: i64 = 0;
-
-            for key in args.into_iter() {
-                if h.remove(&key).is_some() {
-                    total += 1;
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|v| match v {
+            Value::Hash(h) => {
+                let mut total: i64 = 0;
+
+                for key in args.into_iter() {
+                    if h.remove(&key).is_some() {
+                        total += 1;
+                    }
                 }
-            }
 
-            is_empty = h.len() == 0;
+                is_empty = h.is_empty();
 
-            Ok(total.into())
-        }
-        None => Ok(0.into()),
-        _ => Err(Error::WrongType),
-    })?;
+                Ok(total.into())
+            }
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(0.into()))?;
 
     if is_empty {
         let _ = conn.db().del(&[key]);
@@ -45,47 +50,49 @@ pub async fn hdel(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value,
 
 /// Returns if field is an existing field in the hash stored at key.
 pub async fn hexists(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Hash(h)) => Ok(if h.read().get(&args[1]).is_some() {
-            1.into()
-        } else {
-            0.into()
-        }),
-        None => Ok(0.into()),
-        _ => Err(Error::WrongType),
-    })
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::Hash(h) => Ok(if h.get(&args[1]).is_some() {
+                1.into()
+            } else {
+                0.into()
+            }),
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(0.into()))
 }
 
 /// Returns the value associated with field in the hash stored at key.
 pub async fn hget(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Hash(h)) => Ok(h
-            .read()
-            .get(&args[1])
-            .map(|v| Value::new(v))
-            .unwrap_or_default()),
-        None => Ok(Value::Null),
-        _ => Err(Error::WrongType),
-    })
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::Hash(h) => Ok(h.get(&args[1]).map(|v| Value::new(v)).unwrap_or_default()),
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(Value::Null))
 }
 
 /// Returns all fields and values of the hash stored at key. In the returned value, every field
 /// name is followed by its value, so the length of the reply is twice the size of the hash.
 pub async fn hgetall(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Hash(h)) => {
-            let mut ret = vec![];
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::Hash(h) => {
+                let mut ret = vec![];
 
-            for (key, value) in h.read().iter() {
-                ret.push(Value::new(key));
-                ret.push(Value::new(value));
-            }
+                for (key, value) in h.iter() {
+                    ret.push(Value::new(key));
+                    ret.push(Value::new(value));
+                }
 
-            Ok(ret.into())
-        }
-        None => Ok(Value::Array(vec![])),
-        _ => Err(Error::WrongType),
-    })
+                Ok(ret.into())
+            }
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(Value::Array(vec![])))
 }
 
 /// Increment the specified field of a hash stored at key, and representing a number, by the
@@ -118,50 +125,56 @@ pub async fn hincrby_float(conn: &Connection, args: VecDeque<Bytes>) -> Result<V
 
 /// Returns all field names in the hash stored at key.
 pub async fn hkeys(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Hash(h)) => {
-            let mut ret = vec![];
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::Hash(h) => {
+                let mut ret = vec![];
+
+                for key in h.keys() {
+                    ret.push(Value::new(key));
+                }
 
-            for key in h.read().keys() {
-                ret.push(Value::new(key));
+                Ok(ret.into())
             }
-
-            Ok(ret.into())
-        }
-        None => Ok(Value::Array(vec![])),
-        _ => Err(Error::WrongType),
-    })
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(Value::Array(vec![])))
 }
 
 /// Returns the number of fields contained in the hash stored at key.
 pub async fn hlen(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Hash(h)) => Ok(h.read().len().into()),
-        None => Ok(0.into()),
-        _ => Err(Error::WrongType),
-    })
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::Hash(h) => Ok(h.len().into()),
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(0.into()))
 }
 
 /// Returns the values associated with the specified fields in the hash stored at key.
 pub async fn hmget(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
     let key = args.pop_front().ok_or(Error::Syntax)?;
-    conn.db().get_map(&key, |v| match v {
-        Some(Value::Hash(h)) => {
-            let h = h.read();
-
-            Ok(args
+    let r = conn
+        .db()
+        .get(&key)
+        .map::<Result<Value, _>, _>(|v| match v {
+            Value::Hash(h) => Ok(args
                 .iter()
                 .map(|key| h.get(key).map(|v| Value::new(v)).unwrap_or_default())
                 .collect::<Vec<Value>>()
+                .into()),
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or_else(|| {
+            Ok(args
+                .iter()
+                .map(|_| Value::Null)
+                .collect::<Vec<Value>>()
                 .into())
-        }
-        None => Ok(args
-            .iter()
-            .map(|_| Value::Null)
-            .collect::<Vec<Value>>()
-            .into()),
-        _ => Err(Error::WrongType),
-    })
+        })?;
+    Ok(r)
 }
 
 /// Returns random keys (or values) from a hash
@@ -184,46 +197,48 @@ pub async fn hrandfield(conn: &Connection, args: VecDeque<Bytes>) -> Result<Valu
         _ => (1, true, 1),
     };
 
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Hash(h)) => {
-            let mut ret = vec![];
-            let mut i = 0;
-            let mut rand_sorted = BTreeMap::new();
-            let mut rng = rand::thread_rng();
-            let h = h.read();
-
-            for _ in 0..repeat {
-                for (key, value) in h.iter() {
-                    let rand = rng.gen::<u64>();
-                    rand_sorted.insert((rand, i), (key, value));
-                    i += 1;
+    conn.db()
+        .get(&args[0])
+        .inner()
+        .map(|v| match v.deref() {
+            Value::Hash(h) => {
+                let mut ret = vec![];
+                let mut i = 0;
+                let mut rand_sorted = BTreeMap::new();
+                let mut rng = rand::thread_rng();
+
+                for _ in 0..repeat {
+                    for (key, value) in h.iter() {
+                        let rand = rng.gen::<u64>();
+                        rand_sorted.insert((rand, i), (key, value));
+                        i += 1;
+                    }
                 }
-            }
 
-            i = 0;
-            for val in rand_sorted.values() {
-                if single {
-                    return Ok(Value::new(val.0));
-                }
+                i = 0;
+                for val in rand_sorted.values() {
+                    if single {
+                        return Ok(Value::new(val.0));
+                    }
 
-                if i == count {
-                    break;
-                }
+                    if i == count {
+                        break;
+                    }
 
-                ret.push(Value::new(val.0));
+                    ret.push(Value::new(val.0));
 
-                if with_values {
-                    ret.push(Value::new(val.1));
+                    if with_values {
+                        ret.push(Value::new(val.1));
+                    }
+
+                    i += 1;
                 }
 
-                i += 1;
+                Ok(ret.into())
             }
-
-            Ok(ret.into())
-        }
-        None => Ok(Value::Array(vec![])),
-        _ => Err(Error::WrongType),
-    })
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(Value::Array(vec![])))
 }
 
 /// Sets field in the hash stored at key to value. If key does not exist, a new key holding a hash
@@ -233,20 +248,24 @@ pub async fn hmset(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
     if args.len() % 2 == 1 {
         return Err(Error::InvalidArgsCount("hset".to_owned()));
     }
-    let result = conn.db().get_map(&key, |v| match v {
-        Some(Value::Hash(h)) => {
-            let mut h = h.write();
-            loop {
-                if args.is_empty() {
-                    break;
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|v| match v {
+            Value::Hash(h) => {
+                loop {
+                    if args.is_empty() {
+                        break;
+                    }
+                    let key = args.pop_front().ok_or(Error::Syntax)?;
+                    let value = args.pop_front().ok_or(Error::Syntax)?;
+                    h.insert(key, value);
                 }
-                let key = args.pop_front().ok_or(Error::Syntax)?;
-                let value = args.pop_front().ok_or(Error::Syntax)?;
-                h.insert(key, value);
+                Ok(Value::Ok)
             }
-            Ok(Value::Ok)
-        }
-        None => {
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or_else(|| {
             #[allow(clippy::mutable_key_type)]
             let mut h = HashMap::new();
             loop {
@@ -260,9 +279,7 @@ pub async fn hmset(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
             let _len = h.len();
             conn.db().set(key.clone(), h.into(), None);
             Ok(Value::Ok)
-        }
-        _ => Err(Error::WrongType),
-    })?;
+        })?;
 
     conn.db().bump_version(&key);
 
@@ -276,23 +293,27 @@ pub async fn hset(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value,
     if args.len() % 2 == 1 {
         return Err(Error::InvalidArgsCount("hset".to_owned()));
     }
-    let result = conn.db().get_map(&key, |v| match v {
-        Some(Value::Hash(h)) => {
-            let mut h = h.write();
-            let mut e: i64 = 0;
-            loop {
-                if args.is_empty() {
-                    break;
-                }
-                let key = args.pop_front().ok_or(Error::Syntax)?;
-                let value = args.pop_front().ok_or(Error::Syntax)?;
-                if h.insert(key, value).is_none() {
-                    e += 1;
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|v| match v {
+            Value::Hash(h) => {
+                let mut e: i64 = 0;
+                loop {
+                    if args.is_empty() {
+                        break;
+                    }
+                    let key = args.pop_front().ok_or(Error::Syntax)?;
+                    let value = args.pop_front().ok_or(Error::Syntax)?;
+                    if h.insert(key, value).is_none() {
+                        e += 1;
+                    }
                 }
+                Ok(e.into())
             }
-            Ok(e.into())
-        }
-        None => {
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or_else(|| {
             #[allow(clippy::mutable_key_type)]
             let mut h = HashMap::new();
             loop {
@@ -306,9 +327,7 @@ pub async fn hset(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value,
             let len = h.len();
             conn.db().set(key.clone(), h.into(), None);
             Ok(len.into())
-        }
-        _ => Err(Error::WrongType),
-    })?;
+        })?;
 
     conn.db().bump_version(&key);
 
@@ -318,31 +337,33 @@ pub async fn hset(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value,
 /// Sets field in the hash stored at key to value, only if field does not yet exist. If key does
 /// not exist, a new key holding a hash is created. If field already exists, this operation has no
 /// effect.
+#[allow(warnings)]
 pub async fn hsetnx(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
     let key = args.pop_front().ok_or(Error::Syntax)?;
     let sub_key = args.pop_front().ok_or(Error::Syntax)?;
     let value = args.pop_front().ok_or(Error::Syntax)?;
-    let result = conn.db().get_map(&key, |v| match v {
-        Some(Value::Hash(h)) => {
-            let mut h = h.write();
-
-            if h.get(&sub_key).is_some() {
-                Ok(0.into())
-            } else {
-                h.insert(sub_key, value);
-                Ok(1.into())
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|mut v| match v {
+            Value::Hash(ref mut h) => {
+                if h.get(&sub_key).is_some() {
+                    Ok(0.into())
+                } else {
+                    h.insert(sub_key.clone(), value.clone());
+                    Ok(1.into())
+                }
             }
-        }
-        None => {
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or_else(|| {
             #[allow(clippy::mutable_key_type)]
             let mut h = HashMap::new();
             h.insert(sub_key, value);
             let len = h.len();
             conn.db().set(key.clone(), h.into(), None);
             Ok(len.into())
-        }
-        _ => Err(Error::WrongType),
-    })?;
+        })?;
 
     if result == Value::Integer(1) {
         conn.db().bump_version(&key);
@@ -354,33 +375,36 @@ pub async fn hsetnx(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Valu
 /// Returns the string length of the value associated with field in the hash stored at key. If the
 /// key or the field do not exist, 0 is returned.
 pub async fn hstrlen(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Hash(h)) => Ok(h
-            .read()
-            .get(&args[1])
-            .map(|v| v.len())
-            .unwrap_or_default()
-            .into()),
-        None => Ok(0.into()),
-        _ => Err(Error::WrongType),
-    })
+    conn.db()
+        .get(&args[0])
+        .map(|x| match x {
+            Value::Hash(h) => match h.get(&args[1]) {
+                Some(v) => Ok(v.len().into()),
+                None => Ok(0.into()),
+            },
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or_else(|| Ok(0.into()))
 }
 
 /// Returns all values in the hash stored at key.
 pub async fn hvals(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Hash(h)) => {
-            let mut ret = vec![];
+    if let Some(ref value) = conn.db().get(&args[0]).inner() {
+        match value.deref() {
+            Value::Hash(h) => {
+                let mut ret = vec![];
 
-            for value in h.read().values() {
-                ret.push(Value::new(value));
-            }
+                for value in h.values() {
+                    ret.push(Value::new(value));
+                }
 
-            Ok(ret.into())
+                Ok(ret.into())
+            }
+            _ => Err(Error::WrongType),
         }
-        None => Ok(Value::Array(vec![])),
-        _ => Err(Error::WrongType),
-    })
+    } else {
+        Ok(Value::Array(vec![]))
+    }
 }
 
 #[cfg(test)]

+ 2 - 0
src/db/entry.rs

@@ -34,10 +34,12 @@ impl Entry {
         }
     }
 
+    #[inline(always)]
     pub fn take_value(self) -> Value {
         self.value.into_inner()
     }
 
+    #[inline(always)]
     pub fn digest(&self) -> Vec<u8> {
         self.value.read().digest()
     }

+ 66 - 30
src/db/mod.rs

@@ -13,13 +13,13 @@ use expiration::ExpirationDb;
 use glob::Pattern;
 use log::trace;
 use num_traits::CheckedAdd;
-use parking_lot::{Mutex, RwLock, RwLockReadGuard};
+use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
 use rand::{prelude::SliceRandom, Rng};
 use seahash::hash;
 use std::{
     collections::{HashMap, VecDeque},
     convert::{TryFrom, TryInto},
-    ops::DerefMut,
+    ops::{Deref, DerefMut},
     str::FromStr,
     sync::Arc,
     thread,
@@ -66,6 +66,38 @@ impl<'a> RefValue<'a> {
             .map(|x| x.get())
     }
 
+    /// test
+    pub fn inner_mut(&self) -> Option<RwLockWriteGuard<'_, Value>> {
+        self.slot
+            .get(self.key)
+            .filter(|x| x.is_valid())
+            .map(|x| x.get_mut())
+    }
+
+    /// map_mut
+    #[inline(always)]
+    pub fn map<T, F>(self, f: F) -> Option<T>
+    where
+        F: FnOnce(&Value) -> T,
+    {
+        self.slot.get(self.key).filter(|x| x.is_valid()).map(|x| {
+            let value = x.get();
+            f(value.deref())
+        })
+    }
+
+    /// map_mut
+    #[inline(always)]
+    pub fn map_mut<T, F>(self, f: F) -> Option<T>
+    where
+        F: FnOnce(&mut Value) -> T,
+    {
+        self.slot.get(self.key).filter(|x| x.is_valid()).map(|x| {
+            let mut value = x.get_mut();
+            f(value.deref_mut())
+        })
+    }
+
     /// Returns the version of a given key
     #[inline(always)]
     pub fn version(&self) -> usize {
@@ -359,36 +391,40 @@ impl Db {
         let mut incr_by: T =
             bytes_to_number(incr_by).map_err(|_| Error::NotANumberType(typ.to_owned()))?;
 
-        if let Some(entry) = slot.get(key).filter(|x| x.is_valid()) {
-            let mut value = entry.get_mut();
-            if let Value::Hash(h) = value.deref_mut() {
-                let mut h = h.write();
-                if let Some(n) = h.get(sub_key) {
-                    incr_by = incr_by
-                        .checked_add(
-                            &bytes_to_number(n)
-                                .map_err(|_| Error::NotANumberType(typ.to_owned()))?,
-                        )
-                        .ok_or(Error::Overflow)?;
-                }
-                let incr_by_bytes = Self::round_numbers(incr_by);
-                h.insert(sub_key.clone(), incr_by_bytes.clone());
+        if let Some(x) = slot
+            .get(key)
+            .filter(|x| x.is_valid())
+            .map(|x| x.get_mut())
+            .map(|mut x| match x.deref_mut() {
+                Value::Hash(ref mut h) => {
+                    if let Some(n) = h.get(sub_key) {
+                        incr_by = incr_by
+                            .checked_add(
+                                &bytes_to_number(n)
+                                    .map_err(|_| Error::NotANumberType(typ.to_owned()))?,
+                            )
+                            .ok_or(Error::Overflow)?;
+                    }
+                    let incr_by_bytes = Self::round_numbers(incr_by);
+                    h.insert(sub_key.clone(), incr_by_bytes.clone());
 
-                Self::number_to_value(&incr_by_bytes)
-            } else {
-                Err(Error::WrongType)
-            }
-        } else {
-            drop(slot);
-            #[allow(clippy::mutable_key_type)]
-            let mut h = HashMap::new();
-            let incr_by_bytes = Self::round_numbers(incr_by);
-            h.insert(sub_key.clone(), incr_by_bytes.clone());
-            let _ = self.slots[slot_id]
-                .write()
-                .insert(key.clone(), Entry::new(h.into(), None));
-            Self::number_to_value(&incr_by_bytes)
+                    Self::number_to_value(&incr_by_bytes)
+                }
+                _ => Err(Error::WrongType),
+            })
+        {
+            return x;
         }
+
+        drop(slot);
+        #[allow(clippy::mutable_key_type)]
+        let mut h = HashMap::new();
+        let incr_by_bytes = Self::round_numbers(incr_by);
+        h.insert(sub_key.clone(), incr_by_bytes.clone());
+        let _ = self.slots[slot_id]
+            .write()
+            .insert(key.clone(), Entry::new(h.into(), None));
+        Self::number_to_value(&incr_by_bytes)
     }
 
     /// Increments a key's value by a given number

+ 2 - 2
src/value/mod.rs

@@ -24,7 +24,7 @@ use std::{
 #[derive(Debug, PartialEq, Clone)]
 pub enum Value {
     /// Hash. This type cannot be serialized
-    Hash(shared::Value<HashMap<Bytes, Bytes>>),
+    Hash(HashMap<Bytes, Bytes>),
     /// List. This type cannot be serialized
     List(shared::Value<VecDeque<checksum::Value>>),
     /// Set. This type cannot be serialized
@@ -272,7 +272,7 @@ impl From<&str> for Value {
 
 impl From<HashMap<Bytes, Bytes>> for Value {
     fn from(value: HashMap<Bytes, Bytes>) -> Value {
-        Value::Hash(shared::Value::new(value))
+        Value::Hash(value)
     }
 }