Browse Source

Don't use callback and Guard with a RwLock the Value inside Entry (#63)

* Do not use callbacks

Instead return a reference to the Guarded HashMap and a reference to the
Key to be read.

Also update Entry to not need a mutable instance ever. Use an internal
mutex for the expiration and atomic for the object ids.

Do not use time for the object ids, use an atomic counter instead

* Guard inner value inside an Entry with a RwLock

The main goal is to avoid locking the entire slot as Write() to update
 a single value. Slots should be locked as write to add/remove new keys,
 not for individual updates. That's suboptimal.

 A requirement for that is to guard the Value with a RwLock

* Remove shared wrapper for Value::Hash values

Non scalar values have a proper locking mechanism inside an Entry, that
is enough to drop the former `shared` wrapper.

* Remove shared from Value::List

* Remove value::shared

* Fix dead lock issues

* Fix bump_version calls
César D. Rodas 1 year ago
parent
commit
ca961975e3
10 changed files with 1128 additions and 1014 deletions
  1. 198 174
      src/cmd/hash.rs
  2. 366 319
      src/cmd/list.rs
  3. 239 211
      src/cmd/set.rs
  4. 20 12
      src/cmd/string.rs
  5. 2 2
      src/cmd/transaction.rs
  6. 3 3
      src/connection/mod.rs
  7. 64 45
      src/db/entry.rs
  8. 230 173
      src/db/mod.rs
  9. 0 68
      src/value/locked.rs
  10. 6 7
      src/value/mod.rs

+ 198 - 174
src/cmd/hash.rs

@@ -7,7 +7,10 @@ use crate::{
 };
 use bytes::Bytes;
 use rand::Rng;
-use std::collections::{BTreeMap, HashMap, VecDeque};
+use std::{
+    collections::{BTreeMap, HashMap, VecDeque},
+    ops::Deref,
+};
 
 /// Removes the specified fields from the hash stored at key. Specified fields that do not exist
 /// within this hash are ignored. If key does not exist, it is treated as an empty hash and this
@@ -15,24 +18,26 @@ use std::collections::{BTreeMap, HashMap, VecDeque};
 pub async fn hdel(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
     let mut is_empty = false;
     let key = args.pop_front().ok_or(Error::Syntax)?;
-    let result = conn.db().get_map(&key, |v| match v {
-        Some(Value::Hash(h)) => {
-            let mut h = h.write();
-            let mut total: i64 = 0;
-
-            for key in args.into_iter() {
-                if h.remove(&key).is_some() {
-                    total += 1;
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|v| match v {
+            Value::Hash(h) => {
+                let mut total: i64 = 0;
+
+                for key in args.into_iter() {
+                    if h.remove(&key).is_some() {
+                        total += 1;
+                    }
                 }
-            }
 
-            is_empty = h.len() == 0;
+                is_empty = h.is_empty();
 
-            Ok(total.into())
-        }
-        None => Ok(0.into()),
-        _ => Err(Error::WrongType),
-    })?;
+                Ok(total.into())
+            }
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(0.into()))?;
 
     if is_empty {
         let _ = conn.db().del(&[key]);
@@ -45,47 +50,49 @@ pub async fn hdel(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value,
 
 /// Returns if field is an existing field in the hash stored at key.
 pub async fn hexists(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Hash(h)) => Ok(if h.read().get(&args[1]).is_some() {
-            1.into()
-        } else {
-            0.into()
-        }),
-        None => Ok(0.into()),
-        _ => Err(Error::WrongType),
-    })
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::Hash(h) => Ok(if h.get(&args[1]).is_some() {
+                1.into()
+            } else {
+                0.into()
+            }),
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(0.into()))
 }
 
 /// Returns the value associated with field in the hash stored at key.
 pub async fn hget(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Hash(h)) => Ok(h
-            .read()
-            .get(&args[1])
-            .map(|v| Value::new(v))
-            .unwrap_or_default()),
-        None => Ok(Value::Null),
-        _ => Err(Error::WrongType),
-    })
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::Hash(h) => Ok(h.get(&args[1]).map(|v| Value::new(v)).unwrap_or_default()),
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(Value::Null))
 }
 
 /// Returns all fields and values of the hash stored at key. In the returned value, every field
 /// name is followed by its value, so the length of the reply is twice the size of the hash.
 pub async fn hgetall(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Hash(h)) => {
-            let mut ret = vec![];
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::Hash(h) => {
+                let mut ret = vec![];
 
-            for (key, value) in h.read().iter() {
-                ret.push(Value::new(key));
-                ret.push(Value::new(value));
-            }
+                for (key, value) in h.iter() {
+                    ret.push(Value::new(key));
+                    ret.push(Value::new(value));
+                }
 
-            Ok(ret.into())
-        }
-        None => Ok(Value::Array(vec![])),
-        _ => Err(Error::WrongType),
-    })
+                Ok(ret.into())
+            }
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(Value::Array(vec![])))
 }
 
 /// Increment the specified field of a hash stored at key, and representing a number, by the
@@ -118,50 +125,56 @@ pub async fn hincrby_float(conn: &Connection, args: VecDeque<Bytes>) -> Result<V
 
 /// Returns all field names in the hash stored at key.
 pub async fn hkeys(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Hash(h)) => {
-            let mut ret = vec![];
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::Hash(h) => {
+                let mut ret = vec![];
+
+                for key in h.keys() {
+                    ret.push(Value::new(key));
+                }
 
-            for key in h.read().keys() {
-                ret.push(Value::new(key));
+                Ok(ret.into())
             }
-
-            Ok(ret.into())
-        }
-        None => Ok(Value::Array(vec![])),
-        _ => Err(Error::WrongType),
-    })
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(Value::Array(vec![])))
 }
 
 /// Returns the number of fields contained in the hash stored at key.
 pub async fn hlen(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Hash(h)) => Ok(h.read().len().into()),
-        None => Ok(0.into()),
-        _ => Err(Error::WrongType),
-    })
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::Hash(h) => Ok(h.len().into()),
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(0.into()))
 }
 
 /// Returns the values associated with the specified fields in the hash stored at key.
 pub async fn hmget(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
     let key = args.pop_front().ok_or(Error::Syntax)?;
-    conn.db().get_map(&key, |v| match v {
-        Some(Value::Hash(h)) => {
-            let h = h.read();
-
-            Ok(args
+    let r = conn
+        .db()
+        .get(&key)
+        .map::<Result<Value, _>, _>(|v| match v {
+            Value::Hash(h) => Ok(args
                 .iter()
                 .map(|key| h.get(key).map(|v| Value::new(v)).unwrap_or_default())
                 .collect::<Vec<Value>>()
+                .into()),
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or_else(|| {
+            Ok(args
+                .iter()
+                .map(|_| Value::Null)
+                .collect::<Vec<Value>>()
                 .into())
-        }
-        None => Ok(args
-            .iter()
-            .map(|_| Value::Null)
-            .collect::<Vec<Value>>()
-            .into()),
-        _ => Err(Error::WrongType),
-    })
+        })?;
+    Ok(r)
 }
 
 /// Returns random keys (or values) from a hash
@@ -184,46 +197,48 @@ pub async fn hrandfield(conn: &Connection, args: VecDeque<Bytes>) -> Result<Valu
         _ => (1, true, 1),
     };
 
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Hash(h)) => {
-            let mut ret = vec![];
-            let mut i = 0;
-            let mut rand_sorted = BTreeMap::new();
-            let mut rng = rand::thread_rng();
-            let h = h.read();
-
-            for _ in 0..repeat {
-                for (key, value) in h.iter() {
-                    let rand = rng.gen::<u64>();
-                    rand_sorted.insert((rand, i), (key, value));
-                    i += 1;
+    conn.db()
+        .get(&args[0])
+        .inner()
+        .map(|v| match v.deref() {
+            Value::Hash(h) => {
+                let mut ret = vec![];
+                let mut i = 0;
+                let mut rand_sorted = BTreeMap::new();
+                let mut rng = rand::thread_rng();
+
+                for _ in 0..repeat {
+                    for (key, value) in h.iter() {
+                        let rand = rng.gen::<u64>();
+                        rand_sorted.insert((rand, i), (key, value));
+                        i += 1;
+                    }
                 }
-            }
 
-            i = 0;
-            for val in rand_sorted.values() {
-                if single {
-                    return Ok(Value::new(val.0));
-                }
+                i = 0;
+                for val in rand_sorted.values() {
+                    if single {
+                        return Ok(Value::new(val.0));
+                    }
 
-                if i == count {
-                    break;
-                }
+                    if i == count {
+                        break;
+                    }
 
-                ret.push(Value::new(val.0));
+                    ret.push(Value::new(val.0));
 
-                if with_values {
-                    ret.push(Value::new(val.1));
+                    if with_values {
+                        ret.push(Value::new(val.1));
+                    }
+
+                    i += 1;
                 }
 
-                i += 1;
+                Ok(ret.into())
             }
-
-            Ok(ret.into())
-        }
-        None => Ok(Value::Array(vec![])),
-        _ => Err(Error::WrongType),
-    })
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(Value::Array(vec![])))
 }
 
 /// Sets field in the hash stored at key to value. If key does not exist, a new key holding a hash
@@ -233,20 +248,24 @@ pub async fn hmset(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
     if args.len() % 2 == 1 {
         return Err(Error::InvalidArgsCount("hset".to_owned()));
     }
-    let result = conn.db().get_map(&key, |v| match v {
-        Some(Value::Hash(h)) => {
-            let mut h = h.write();
-            loop {
-                if args.is_empty() {
-                    break;
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|v| match v {
+            Value::Hash(h) => {
+                loop {
+                    if args.is_empty() {
+                        break;
+                    }
+                    let key = args.pop_front().ok_or(Error::Syntax)?;
+                    let value = args.pop_front().ok_or(Error::Syntax)?;
+                    h.insert(key, value);
                 }
-                let key = args.pop_front().ok_or(Error::Syntax)?;
-                let value = args.pop_front().ok_or(Error::Syntax)?;
-                h.insert(key, value);
+                Ok(Value::Ok)
             }
-            Ok(Value::Ok)
-        }
-        None => {
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or_else(|| {
             #[allow(clippy::mutable_key_type)]
             let mut h = HashMap::new();
             loop {
@@ -260,9 +279,7 @@ pub async fn hmset(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
             let _len = h.len();
             conn.db().set(key.clone(), h.into(), None);
             Ok(Value::Ok)
-        }
-        _ => Err(Error::WrongType),
-    })?;
+        })?;
 
     conn.db().bump_version(&key);
 
@@ -276,23 +293,27 @@ pub async fn hset(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value,
     if args.len() % 2 == 1 {
         return Err(Error::InvalidArgsCount("hset".to_owned()));
     }
-    let result = conn.db().get_map(&key, |v| match v {
-        Some(Value::Hash(h)) => {
-            let mut h = h.write();
-            let mut e: i64 = 0;
-            loop {
-                if args.is_empty() {
-                    break;
-                }
-                let key = args.pop_front().ok_or(Error::Syntax)?;
-                let value = args.pop_front().ok_or(Error::Syntax)?;
-                if h.insert(key, value).is_none() {
-                    e += 1;
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|v| match v {
+            Value::Hash(h) => {
+                let mut e: i64 = 0;
+                loop {
+                    if args.is_empty() {
+                        break;
+                    }
+                    let key = args.pop_front().ok_or(Error::Syntax)?;
+                    let value = args.pop_front().ok_or(Error::Syntax)?;
+                    if h.insert(key, value).is_none() {
+                        e += 1;
+                    }
                 }
+                Ok(e.into())
             }
-            Ok(e.into())
-        }
-        None => {
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or_else(|| {
             #[allow(clippy::mutable_key_type)]
             let mut h = HashMap::new();
             loop {
@@ -306,9 +327,7 @@ pub async fn hset(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value,
             let len = h.len();
             conn.db().set(key.clone(), h.into(), None);
             Ok(len.into())
-        }
-        _ => Err(Error::WrongType),
-    })?;
+        })?;
 
     conn.db().bump_version(&key);
 
@@ -318,31 +337,33 @@ pub async fn hset(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value,
 /// Sets field in the hash stored at key to value, only if field does not yet exist. If key does
 /// not exist, a new key holding a hash is created. If field already exists, this operation has no
 /// effect.
+#[allow(warnings)]
 pub async fn hsetnx(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
     let key = args.pop_front().ok_or(Error::Syntax)?;
     let sub_key = args.pop_front().ok_or(Error::Syntax)?;
     let value = args.pop_front().ok_or(Error::Syntax)?;
-    let result = conn.db().get_map(&key, |v| match v {
-        Some(Value::Hash(h)) => {
-            let mut h = h.write();
-
-            if h.get(&sub_key).is_some() {
-                Ok(0.into())
-            } else {
-                h.insert(sub_key, value);
-                Ok(1.into())
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|mut v| match v {
+            Value::Hash(ref mut h) => {
+                if h.get(&sub_key).is_some() {
+                    Ok(0.into())
+                } else {
+                    h.insert(sub_key.clone(), value.clone());
+                    Ok(1.into())
+                }
             }
-        }
-        None => {
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or_else(|| {
             #[allow(clippy::mutable_key_type)]
             let mut h = HashMap::new();
             h.insert(sub_key, value);
             let len = h.len();
             conn.db().set(key.clone(), h.into(), None);
             Ok(len.into())
-        }
-        _ => Err(Error::WrongType),
-    })?;
+        })?;
 
     if result == Value::Integer(1) {
         conn.db().bump_version(&key);
@@ -354,33 +375,36 @@ pub async fn hsetnx(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Valu
 /// Returns the string length of the value associated with field in the hash stored at key. If the
 /// key or the field do not exist, 0 is returned.
 pub async fn hstrlen(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Hash(h)) => Ok(h
-            .read()
-            .get(&args[1])
-            .map(|v| v.len())
-            .unwrap_or_default()
-            .into()),
-        None => Ok(0.into()),
-        _ => Err(Error::WrongType),
-    })
+    conn.db()
+        .get(&args[0])
+        .map(|x| match x {
+            Value::Hash(h) => match h.get(&args[1]) {
+                Some(v) => Ok(v.len().into()),
+                None => Ok(0.into()),
+            },
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or_else(|| Ok(0.into()))
 }
 
 /// Returns all values in the hash stored at key.
 pub async fn hvals(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Hash(h)) => {
-            let mut ret = vec![];
+    if let Some(ref value) = conn.db().get(&args[0]).inner() {
+        match value.deref() {
+            Value::Hash(h) => {
+                let mut ret = vec![];
 
-            for value in h.read().values() {
-                ret.push(Value::new(value));
-            }
+                for value in h.values() {
+                    ret.push(Value::new(value));
+                }
 
-            Ok(ret.into())
+                Ok(ret.into())
+            }
+            _ => Err(Error::WrongType),
         }
-        None => Ok(Value::Array(vec![])),
-        _ => Err(Error::WrongType),
-    })
+    } else {
+        Ok(Value::Array(vec![]))
+    }
 }
 
 #[cfg(test)]

+ 366 - 319
src/cmd/list.rs

@@ -28,41 +28,41 @@ fn remove_element(
 ) -> Result<Value, Error> {
     let db = conn.db();
     let mut new_len = 0;
-    let result = db.get_map(key, |v| match v {
-        Some(Value::List(x)) => {
-            let mut x = x.write();
-
-            let limit = if let Some(limit) = limit {
-                limit
-            } else {
-                // Return a single element
-                let ret = Ok((if front { x.pop_front() } else { x.pop_back() })
-                    .map_or(Value::Null, |x| x.clone_value()));
-                new_len = x.len();
-                return ret;
-            };
+    let result = db
+        .get(key)
+        .map_mut(|v| match v {
+            Value::List(x) => {
+                let limit = if let Some(limit) = limit {
+                    limit
+                } else {
+                    // Return a single element
+                    let ret = Ok((if front { x.pop_front() } else { x.pop_back() })
+                        .map_or(Value::Null, |x| x.clone_value()));
+                    new_len = x.len();
+                    return ret;
+                };
 
-            let mut ret = vec![None; limit];
+                let mut ret = vec![None; limit];
 
-            for i in 0..limit {
-                if front {
-                    ret[i] = x.pop_front();
-                } else {
-                    ret[i] = x.pop_back();
+                for i in 0..limit {
+                    if front {
+                        ret[i] = x.pop_front();
+                    } else {
+                        ret[i] = x.pop_back();
+                    }
                 }
+                new_len = x.len();
+
+                Ok(ret
+                    .iter()
+                    .flatten()
+                    .map(|m| m.clone_value())
+                    .collect::<Vec<Value>>()
+                    .into())
             }
-            new_len = x.len();
-
-            Ok(ret
-                .iter()
-                .flatten()
-                .map(|m| m.clone_value())
-                .collect::<Vec<Value>>()
-                .into())
-        }
-        None => Ok(Value::Null),
-        _ => Err(Error::WrongType),
-    })?;
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(Value::Null))?;
 
     if new_len == 0 {
         let _ = db.del(&[key.clone()]);
@@ -310,22 +310,23 @@ pub async fn brpop(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
 /// designate elements starting at the tail of the list. Here, -1 means the last element, -2 means
 /// the penultimate and so forth.
 pub async fn lindex(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::List(x)) => {
-            let index: i64 = bytes_to_number(&args[1])?;
-            let x = x.read();
-
-            let index = if index < 0 {
-                x.len().checked_sub(-index as usize).unwrap_or(x.len())
-            } else {
-                index as usize
-            };
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::List(x) => {
+                let index: i64 = bytes_to_number(&args[1])?;
+
+                let index = if index < 0 {
+                    x.len().checked_sub(-index as usize).unwrap_or(x.len())
+                } else {
+                    index as usize
+                };
 
-            Ok(x.get(index).map_or(Value::Null, |x| x.clone_value()))
-        }
-        None => Ok(Value::Null),
-        _ => Err(Error::WrongType),
-    })
+                Ok(x.get(index).map_or(Value::Null, |x| x.clone_value()))
+            }
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(Value::Null))
 }
 
 /// Inserts element in the list stored at key either before or after the reference value pivot.
@@ -346,38 +347,40 @@ pub async fn linsert(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Val
         _ => return Err(Error::Syntax),
     };
 
-    let result = conn.db().get_map(&key, |v| match v {
-        Some(Value::List(x)) => {
-            let pivot = checksum::Ref::new(&pivot);
-            let mut x = x.write();
-            let mut found = false;
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|v| match v {
+            Value::List(x) => {
+                let pivot = checksum::Ref::new(&pivot);
+                let mut found = false;
 
-            for (key, val) in x.iter().enumerate() {
-                if *val == pivot {
-                    let id = if is_before { key } else { key + 1 };
+                for (key, val) in x.iter().enumerate() {
+                    if *val == pivot {
+                        let id = if is_before { key } else { key + 1 };
 
-                    let value = checksum::Value::new(value);
+                        let value = checksum::Value::new(value);
 
-                    if id > x.len() {
-                        x.push_back(value);
-                    } else {
-                        x.insert(id, value);
-                    }
+                        if id > x.len() {
+                            x.push_back(value);
+                        } else {
+                            x.insert(id, value);
+                        }
 
-                    found = true;
-                    break;
+                        found = true;
+                        break;
+                    }
                 }
-            }
 
-            if found {
-                Ok(x.len().into())
-            } else {
-                Ok((-1).into())
+                if found {
+                    Ok(x.len().into())
+                } else {
+                    Ok((-1).into())
+                }
             }
-        }
-        None => Ok(0.into()),
-        _ => Err(Error::WrongType),
-    })?;
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(0.into()))?;
 
     conn.db().bump_version(&key);
 
@@ -387,11 +390,13 @@ pub async fn linsert(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Val
 /// Returns the length of the list stored at key. If key does not exist, it is interpreted as an
 /// empty list and 0 is returned. An error is returned when the value stored at key is not a list.
 pub async fn llen(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::List(x)) => Ok(x.read().len().into()),
-        None => Ok(0.into()),
-        _ => Err(Error::WrongType),
-    })
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::List(x) => Ok(x.len().into()),
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(0.into()))
 }
 
 /// Atomically returns and removes the first/last element (head/tail depending on the wherefrom
@@ -424,51 +429,77 @@ pub async fn lmove(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
     db.lock_keys(&to_lock);
 
     let mut to_create = None;
-
-    let result = db.get_map(&source, |v| match v {
-        Some(Value::List(source)) => conn.db().get_map(&destination, |v| match v {
-            Some(Value::List(target)) => {
-                let element = if source_is_left {
-                    source.write().pop_front()
-                } else {
-                    source.write().pop_back()
-                };
-
-                if let Some(element) = element {
-                    let ret = element.clone_value();
-                    if target_is_left {
-                        target.write().push_front(element);
+    let is_same_key = source == destination;
+
+    let result = db
+        .get(&source)
+        .map_mut(|v| match v {
+            Value::List(source) => {
+                if is_same_key {
+                    // take a different approach to avoid a deadlock
+                    let element = if source_is_left {
+                        source.pop_front()
                     } else {
-                        target.write().push_back(element);
-                    }
-                    Ok(ret)
-                } else {
-                    Ok(Value::Null)
+                        source.pop_back()
+                    };
+                    return if let Some(element) = element {
+                        let ret = element.clone_value();
+                        if target_is_left {
+                            source.push_front(element);
+                        } else {
+                            source.push_back(element);
+                        }
+                        Ok(ret)
+                    } else {
+                        Ok(Value::Null)
+                    };
                 }
-            }
-            None => {
-                let mut source = source.write();
-                let element = if source_is_left {
-                    source.pop_front()
-                } else {
-                    source.pop_back()
-                };
 
-                if let Some(element) = element {
-                    let ret = element.clone_value();
-                    let mut h = VecDeque::new();
-                    h.push_front(element);
-                    to_create = Some(h);
-                    Ok(ret)
-                } else {
-                    Ok(Value::Null)
-                }
+                conn.db()
+                    .get(&destination)
+                    .map_mut(|v| match v {
+                        Value::List(target) => {
+                            let element = if source_is_left {
+                                source.pop_front()
+                            } else {
+                                source.pop_back()
+                            };
+
+                            if let Some(element) = element {
+                                let ret = element.clone_value();
+                                if target_is_left {
+                                    target.push_front(element);
+                                } else {
+                                    target.push_back(element);
+                                }
+                                Ok(ret)
+                            } else {
+                                Ok(Value::Null)
+                            }
+                        }
+                        _ => Err(Error::WrongType),
+                    })
+                    .unwrap_or_else(|| {
+                        let element = if source_is_left {
+                            source.pop_front()
+                        } else {
+                            source.pop_back()
+                        };
+
+                        if let Some(element) = element {
+                            let ret = element.clone_value();
+                            let mut h = VecDeque::new();
+                            h.push_front(element);
+                            to_create = Some(h);
+                            Ok(ret)
+                        } else {
+                            Ok(Value::Null)
+                        }
+                    })
             }
             _ => Err(Error::WrongType),
-        }),
-        None => Ok(Value::Null),
-        _ => Err(Error::WrongType),
-    });
+        })
+        .unwrap_or(Ok(Value::Null));
 
     if let Some(to_create) = to_create {
         conn.db().set(destination.clone(), to_create.into(), None);
@@ -541,81 +572,82 @@ pub async fn lpos(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Err
 
     let max_len = max_len.unwrap_or_default();
 
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::List(x)) => {
-            let x = x.read();
-            let mut result: Vec<Value> = vec![];
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::List(x) => {
+                let mut result: Vec<Value> = vec![];
 
-            let mut values = x
-                .iter()
-                .enumerate()
-                .collect::<Vec<(usize, &checksum::Value)>>();
+                let mut values = x
+                    .iter()
+                    .enumerate()
+                    .collect::<Vec<(usize, &checksum::Value)>>();
 
-            if must_reverse {
-                values.reverse();
-            }
+                if must_reverse {
+                    values.reverse();
+                }
 
-            let mut checks = 1;
-
-            for (id, val) in values.iter() {
-                if **val == element {
-                    // Match!
-                    if let Some(count) = count {
-                        result.push((*id).into());
-                        if result.len() == count && count != 0 && rank.is_none() {
-                            // There is no point in keep looping. No RANK provided, COUNT is not 0
-                            // therefore we can return the vector of result as IS
-                            return Ok(result.into());
-                        }
-                    } else if let Some(rank) = rank {
-                        result.push((*id).into());
-                        if result.len() == rank {
+                let mut checks = 1;
+
+                for (id, val) in values.iter() {
+                    if **val == element {
+                        // Match!
+                        if let Some(count) = count {
+                            result.push((*id).into());
+                            if result.len() == count && count != 0 && rank.is_none() {
+                                // There is no point in keep looping. No RANK provided, COUNT is not 0
+                                // therefore we can return the vector of result as IS
+                                return Ok(result.into());
+                            }
+                        } else if let Some(rank) = rank {
+                            result.push((*id).into());
+                            if result.len() == rank {
+                                return Ok((*id).into());
+                            }
+                        } else {
+                            // return first match!
                             return Ok((*id).into());
                         }
-                    } else {
-                        // return first match!
-                        return Ok((*id).into());
                     }
+                    if checks == max_len {
+                        break;
+                    }
+                    checks += 1;
                 }
-                if checks == max_len {
-                    break;
-                }
-                checks += 1;
-            }
-
-            if let Some(rank) = rank {
-                let rank = rank - 1;
 
-                let result = if rank < result.len() {
-                    result[rank..].to_vec()
-                } else {
-                    vec![]
-                };
+                if let Some(rank) = rank {
+                    let rank = rank - 1;
 
-                return Ok(if let Some(count) = count {
-                    if count > 0 && count < result.len() {
-                        result[0..count].to_vec().into()
+                    let result = if rank < result.len() {
+                        result[rank..].to_vec()
                     } else {
-                        result.to_vec().into()
-                    }
-                } else {
-                    result.to_vec().first().cloned().unwrap_or_default()
-                });
-            }
+                        vec![]
+                    };
+
+                    return Ok(if let Some(count) = count {
+                        if count > 0 && count < result.len() {
+                            result[0..count].to_vec().into()
+                        } else {
+                            result.to_vec().into()
+                        }
+                    } else {
+                        result.to_vec().first().cloned().unwrap_or_default()
+                    });
+                }
 
-            if count.is_some() {
-                Ok(result.into())
-            } else {
-                Ok(Value::Null)
+                if count.is_some() {
+                    Ok(result.into())
+                } else {
+                    Ok(Value::Null)
+                }
             }
-        }
-        None => Ok(if count.is_some() {
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(if count.is_some() {
             Value::Array(vec![])
         } else {
             Value::Null
-        }),
-        _ => Err(Error::WrongType),
-    })
+        }))
 }
 
 /// Insert all the specified values at the head of the list stored at key. If key does not exist,
@@ -629,15 +661,19 @@ pub async fn lpos(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Err
 /// as third element.
 pub async fn lpush(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
     let key = args.pop_front().ok_or(Error::Syntax)?;
-    let result = conn.db().get_map(&key, |v| match v {
-        Some(Value::List(x)) => {
-            let mut x = x.write();
-            for val in args.into_iter() {
-                x.push_front(checksum::Value::new(val.clone()));
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|v| match v {
+            Value::List(x) => {
+                for val in args.clone().into_iter() {
+                    x.push_front(checksum::Value::new(val.clone()));
+                }
+                Ok(x.len().into())
             }
-            Ok(x.len().into())
-        }
-        None => {
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or_else(|| {
             let mut h = VecDeque::new();
 
             for val in args.into_iter() {
@@ -647,9 +683,7 @@ pub async fn lpush(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
             let len = h.len();
             conn.db().set(key.clone(), h.into(), None);
             Ok(len.into())
-        }
-        _ => Err(Error::WrongType),
-    })?;
+        })?;
 
     conn.db().bump_version(&key);
     Ok(result)
@@ -658,17 +692,19 @@ pub async fn lpush(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
 /// LPUSHX key element
 pub async fn lpushx(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
     let key = args.pop_front().ok_or(Error::Syntax)?;
-    let result = conn.db().get_map(&key, |v| match v {
-        Some(Value::List(x)) => {
-            let mut x = x.write();
-            for val in args.into_iter() {
-                x.push_front(checksum::Value::new(val));
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|v| match v {
+            Value::List(x) => {
+                for val in args.into_iter() {
+                    x.push_front(checksum::Value::new(val));
+                }
+                Ok(x.len().into())
             }
-            Ok(x.len().into())
-        }
-        None => Ok(0.into()),
-        _ => Err(Error::WrongType),
-    })?;
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(0.into()))?;
 
     conn.db().bump_version(&key);
     Ok(result)
@@ -681,85 +717,88 @@ pub async fn lpushx(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Valu
 /// These offsets can also be negative numbers indicating offsets starting at the end of the list.
 /// For example, -1 is the last element of the list, -2 the penultimate, and so on.
 pub async fn lrange(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::List(x)) => {
-            let start: i64 = bytes_to_number(&args[1])?;
-            let end: i64 = bytes_to_number(&args[2])?;
-            let mut ret = vec![];
-            let x = x.read();
-
-            let start = if start < 0 {
-                x.len().checked_sub(-start as usize).unwrap_or_default()
-            } else {
-                start as usize
-            };
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::List(x) => {
+                let start: i64 = bytes_to_number(&args[1])?;
+                let end: i64 = bytes_to_number(&args[2])?;
+                let mut ret = vec![];
+
+                let start = if start < 0 {
+                    x.len().checked_sub(-start as usize).unwrap_or_default()
+                } else {
+                    start as usize
+                };
 
-            let end = if end < 0 {
-                if let Some(x) = x.len().checked_sub(-end as usize) {
-                    x
+                let end = if end < 0 {
+                    if let Some(x) = x.len().checked_sub(-end as usize) {
+                        x
+                    } else {
+                        return Ok(Value::Array(vec![]));
+                    }
                 } else {
-                    return Ok(Value::Array(vec![]));
-                }
-            } else {
-                end as usize
-            };
+                    end as usize
+                };
 
-            for (i, val) in x.iter().enumerate().skip(start) {
-                if i > end {
-                    break;
+                for (i, val) in x.iter().enumerate().skip(start) {
+                    if i > end {
+                        break;
+                    }
+                    ret.push(val.clone_value());
                 }
-                ret.push(val.clone_value());
+                Ok(ret.into())
             }
-            Ok(ret.into())
-        }
-        None => Ok(Value::Array(vec![])),
-        _ => Err(Error::WrongType),
-    })
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(Value::Array(vec![])))
 }
 
 /// Removes the first count occurrences of elements equal to element from the list stored at key
 pub async fn lrem(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    let result = conn.db().get_map(&args[0], |v| match v {
-        Some(Value::List(x)) => {
-            let element = checksum::Ref::new(&args[2]);
-            let limit: i64 = bytes_to_number(&args[1])?;
-            let mut x = x.write();
-
-            let (is_reverse, limit) = if limit < 0 {
-                (true, -limit)
-            } else {
-                (false, limit)
-            };
+    let result = conn
+        .db()
+        .get(&args[0])
+        .map_mut(|v| match v {
+            Value::List(x) => {
+                let element = checksum::Ref::new(&args[2]);
+                let limit: i64 = bytes_to_number(&args[1])?;
+
+                let (is_reverse, limit) = if limit < 0 {
+                    (true, -limit)
+                } else {
+                    (false, limit)
+                };
 
-            let mut keep = vec![true; x.len()];
-            let mut removed = 0;
-            let len = x.len();
+                let mut keep = vec![true; x.len()];
+                let mut removed = 0;
+                let len = x.len();
 
-            for i in 0..len {
-                let i = if is_reverse { len - 1 - i } else { i };
+                for i in 0..len {
+                    let i = if is_reverse { len - 1 - i } else { i };
 
-                if let Some(value) = x.get(i) {
-                    if *value == element {
-                        keep[i] = false;
-                        removed += 1;
-                        if removed == limit {
-                            break;
+                    if let Some(value) = x.get(i) {
+                        if *value == element {
+                            keep[i] = false;
+                            removed += 1;
+                            if removed == limit {
+                                break;
+                            }
                         }
                     }
                 }
-            }
 
-            let mut i = 0;
-            x.retain(|_| {
-                i += 1;
-                keep[i - 1]
-            });
+                let mut i = 0;
+                x.retain(|_| {
+                    i += 1;
+                    keep[i - 1]
+                });
 
-            Ok(removed.into())
-        }
-        None => Ok(0.into()),
-        _ => Err(Error::WrongType),
-    })?;
+                Ok(removed.into())
+            }
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(0.into()))?;
 
     conn.db().bump_version(&args[0]);
 
@@ -775,25 +814,27 @@ pub async fn lset(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value,
     let index = args.pop_front().ok_or(Error::Syntax)?;
     let value = args.pop_front().ok_or(Error::Syntax)?;
 
-    let result = conn.db().get_map(&key, |v| match v {
-        Some(Value::List(x)) => {
-            let mut index: i64 = bytes_to_number(&index)?;
-            let mut x = x.write();
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|v| match v {
+            Value::List(x) => {
+                let mut index: i64 = bytes_to_number(&index)?;
 
-            if index < 0 {
-                index += x.len() as i64;
-            }
+                if index < 0 {
+                    index += x.len() as i64;
+                }
 
-            if let Some(x) = x.get_mut(index as usize) {
-                *x = checksum::Value::new(value);
-                Ok(Value::Ok)
-            } else {
-                Err(Error::OutOfRange)
+                if let Some(x) = x.get_mut(index as usize) {
+                    *x = checksum::Value::new(value);
+                    Ok(Value::Ok)
+                } else {
+                    Err(Error::OutOfRange)
+                }
             }
-        }
-        None => Err(Error::NotFound),
-        _ => Err(Error::WrongType),
-    })?;
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Err(Error::NotFound))?;
 
     conn.db().bump_version(&key);
 
@@ -804,32 +845,34 @@ pub async fn lset(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value,
 /// Both start and stop are zero-based indexes, where 0 is the first element of the list (the
 /// head), 1 the next element and so on.
 pub async fn ltrim(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    let result = conn.db().get_map(&args[0], |v| match v {
-        Some(Value::List(x)) => {
-            let mut start: i64 = bytes_to_number(&args[1])?;
-            let mut end: i64 = bytes_to_number(&args[2])?;
-            let mut x = x.write();
-
-            if start < 0 {
-                start += x.len() as i64;
-            }
+    let result = conn
+        .db()
+        .get(&args[0])
+        .map_mut(|v| match v {
+            Value::List(x) => {
+                let mut start: i64 = bytes_to_number(&args[1])?;
+                let mut end: i64 = bytes_to_number(&args[2])?;
+
+                if start < 0 {
+                    start += x.len() as i64;
+                }
 
-            if end < 0 {
-                end += x.len() as i64;
-            }
+                if end < 0 {
+                    end += x.len() as i64;
+                }
 
-            let mut i = 0;
-            x.retain(|_| {
-                let retain = i >= start && i <= end;
-                i += 1;
-                retain
-            });
+                let mut i = 0;
+                x.retain(|_| {
+                    let retain = i >= start && i <= end;
+                    i += 1;
+                    retain
+                });
 
-            Ok(Value::Ok)
-        }
-        None => Ok(Value::Ok),
-        _ => Err(Error::WrongType),
-    })?;
+                Ok(Value::Ok)
+            }
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(Value::Ok))?;
 
     conn.db().bump_version(&args[1]);
 
@@ -870,17 +913,19 @@ pub async fn rpoplpush(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<V
 /// is not a list, an error is returned.
 pub async fn rpushx(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
     let key = args.pop_front().ok_or(Error::Syntax)?;
-    let result = conn.db().get_map(&key, |v| match v {
-        Some(Value::List(x)) => {
-            let mut x = x.write();
-            for val in args.into_iter() {
-                x.push_back(checksum::Value::new(val));
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|v| match v {
+            Value::List(x) => {
+                for val in args.into_iter() {
+                    x.push_back(checksum::Value::new(val));
+                }
+                Ok(x.len().into())
             }
-            Ok(x.len().into())
-        }
-        None => Ok(0.into()),
-        _ => Err(Error::WrongType),
-    })?;
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(0.into()))?;
 
     conn.db().bump_version(&key);
     Ok(result)
@@ -891,15 +936,19 @@ pub async fn rpushx(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Valu
 /// is not a list, an error is returned.
 pub async fn rpush(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
     let key = args.pop_front().ok_or(Error::Syntax)?;
-    let result = conn.db().get_map(&key, |v| match v {
-        Some(Value::List(x)) => {
-            let mut x = x.write();
-            for val in args.into_iter() {
-                x.push_back(checksum::Value::new(val));
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|v| match v {
+            Value::List(x) => {
+                for val in args.clone().into_iter() {
+                    x.push_back(checksum::Value::new(val));
+                }
+                Ok(x.len().into())
             }
-            Ok(x.len().into())
-        }
-        None => {
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or_else(|| {
             let mut h = VecDeque::new();
 
             for val in args.into_iter() {
@@ -909,9 +958,7 @@ pub async fn rpush(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
             let len = h.len();
             conn.db().set(key.clone(), h.into(), None);
             Ok(len.into())
-        }
-        _ => Err(Error::WrongType),
-    })?;
+        })?;
 
     conn.db().bump_version(&key);
     Ok(result)

+ 239 - 211
src/cmd/set.rs

@@ -32,53 +32,63 @@ where
     F1: Fn(&mut HashSet<Bytes>, &HashSet<Bytes>) -> bool,
 {
     let top_key = keys.pop_front().ok_or(Error::Syntax)?;
-    conn.db().get_map(&top_key, |v| match v {
-        Some(Value::Set(x)) => {
-            #[allow(clippy::mutable_key_type)]
-            let mut all_entries = x.read().clone();
-            for key in keys.iter() {
-                let mut do_break = false;
-                let mut found = false;
-                let _ = conn.db().get_map(key, |v| match v {
-                    Some(Value::Set(x)) => {
-                        found = true;
-                        if !op(&mut all_entries, &x.read()) {
-                            do_break = true;
-                        }
-                        Ok(Value::Null)
+    conn.db()
+        .get(&top_key)
+        .map(|v| match v {
+            Value::Set(x) => {
+                #[allow(clippy::mutable_key_type)]
+                let mut all_entries = x.clone();
+                for key in keys.iter() {
+                    let mut do_break = false;
+                    let mut found = false;
+                    let _ = conn
+                        .db()
+                        .get(key)
+                        .map(|v| match v {
+                            Value::Set(x) => {
+                                found = true;
+                                if !op(&mut all_entries, x) {
+                                    do_break = true;
+                                }
+                                Ok(Value::Null)
+                            }
+                            _ => Err(Error::WrongType),
+                        })
+                        .unwrap_or(Ok(Value::Null))?;
+                    if !found && !op(&mut all_entries, &HashSet::new()) {
+                        break;
+                    }
+                    if do_break {
+                        break;
                     }
-                    None => Ok(Value::Null),
-                    _ => Err(Error::WrongType),
-                })?;
-                if !found && !op(&mut all_entries, &HashSet::new()) {
-                    break;
-                }
-                if do_break {
-                    break;
                 }
-            }
 
-            Ok(all_entries
-                .iter()
-                .map(|entry| Value::new(entry))
-                .collect::<Vec<Value>>()
-                .into())
-        }
-        None => {
+                Ok(all_entries
+                    .iter()
+                    .map(|entry| Value::new(entry))
+                    .collect::<Vec<Value>>()
+                    .into())
+            }
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or_else(|| {
             #[allow(clippy::mutable_key_type)]
             let mut all_entries: HashSet<Bytes> = HashSet::new();
             for key in keys.iter() {
                 let mut do_break = false;
-                let _ = conn.db().get_map(key, |v| match v {
-                    Some(Value::Set(x)) => {
-                        if !op(&mut all_entries, &x.read()) {
-                            do_break = true;
+                let _ = conn
+                    .db()
+                    .get(key)
+                    .map(|v| match v {
+                        Value::Set(x) => {
+                            if !op(&mut all_entries, x) {
+                                do_break = true;
+                            }
+                            Ok(Value::Null)
                         }
-                        Ok(Value::Null)
-                    }
-                    None => Ok(Value::Null),
-                    _ => Err(Error::WrongType),
-                })?;
+                        _ => Err(Error::WrongType),
+                    })
+                    .unwrap_or(Ok(Value::Null))?;
                 if do_break {
                     break;
                 }
@@ -89,9 +99,7 @@ where
                 .map(|entry| Value::new(entry))
                 .collect::<Vec<Value>>()
                 .into())
-        }
-        _ => Err(Error::WrongType),
-    })
+        })
 }
 
 /// Add the specified members to the set stored at key. Specified members that are already a member
@@ -100,21 +108,24 @@ where
 pub async fn sadd(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
     let key = args.pop_front().ok_or(Error::Syntax)?;
     let key_for_not_found = key.clone();
-    let result = conn.db().get_map(&key, |v| match v {
-        Some(Value::Set(x)) => {
-            let mut x = x.write();
-
-            let mut len = 0;
-
-            for val in args.into_iter() {
-                if x.insert(val) {
-                    len += 1;
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|v| match v {
+            Value::Set(x) => {
+                let mut len = 0;
+
+                for val in args.clone().into_iter() {
+                    if x.insert(val) {
+                        len += 1;
+                    }
                 }
-            }
 
-            Ok(len.into())
-        }
-        None => {
+                Ok(len.into())
+            }
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or_else(|| {
             #[allow(clippy::mutable_key_type)]
             let mut x = HashSet::new();
             let mut len = 0;
@@ -127,9 +138,7 @@ pub async fn sadd(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value,
 
             conn.db().set(key_for_not_found, x.into(), None);
             Ok(len.into())
-        }
-        _ => Err(Error::WrongType),
-    })?;
+        })?;
 
     conn.db().bump_version(&key);
 
@@ -138,11 +147,13 @@ pub async fn sadd(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value,
 
 /// Returns the set cardinality (number of elements) of the set stored at key.
 pub async fn scard(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Set(x)) => Ok(x.read().len().into()),
-        None => Ok(0.into()),
-        _ => Err(Error::WrongType),
-    })
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::Set(x) => Ok(x.len().into()),
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(0.into()))
 }
 
 /// Returns the members of the set resulting from the difference between the first set and all the
@@ -234,33 +245,36 @@ pub async fn sinterstore(conn: &Connection, mut args: VecDeque<Bytes>) -> Result
 
 /// Returns if member is a member of the set stored at key.
 pub async fn sismember(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Set(x)) => {
-            if x.read().contains(&args[1]) {
-                Ok(1.into())
-            } else {
-                Ok(0.into())
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::Set(x) => {
+                if x.contains(&args[1]) {
+                    Ok(1.into())
+                } else {
+                    Ok(0.into())
+                }
             }
-        }
-        None => Ok(0.into()),
-        _ => Err(Error::WrongType),
-    })
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(0.into()))
 }
 
 /// Returns all the members of the set value stored at key.
 ///
 /// This has the same effect as running SINTER with one argument key.
 pub async fn smembers(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Set(x)) => Ok(x
-            .read()
-            .iter()
-            .map(|x| Value::new(x))
-            .collect::<Vec<Value>>()
-            .into()),
-        None => Ok(Value::Array(vec![])),
-        _ => Err(Error::WrongType),
-    })
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::Set(x) => Ok(x
+                .iter()
+                .map(|x| Value::new(x))
+                .collect::<Vec<Value>>()
+                .into()),
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(Value::Array(vec![])))
 }
 
 /// Returns whether each member is a member of the set stored at key.
@@ -269,18 +283,17 @@ pub async fn smembers(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value,
 /// a member of the set or if key does not exist.
 pub async fn smismember(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
     let key = args.pop_front().ok_or(Error::Syntax)?;
-    conn.db().get_map(&key, |v| match v {
-        Some(Value::Set(x)) => {
-            let x = x.read();
-            Ok(args
+    conn.db()
+        .get(&key)
+        .map(|v| match v {
+            Value::Set(x) => Ok(args
                 .iter()
                 .map(|member| if x.contains(member) { 1 } else { 0 })
                 .collect::<Vec<i32>>()
-                .into())
-        }
-        None => Ok(args.iter().map(|_| 0.into()).collect::<Vec<Value>>().into()),
-        _ => Err(Error::WrongType),
-    })
+                .into()),
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or_else(|| Ok(args.iter().map(|_| 0.into()).collect::<Vec<Value>>().into()))
 }
 
 /// Move member from the set at source to the set at destination. This operation is atomic. In
@@ -299,41 +312,51 @@ pub async fn smove(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
     let source = args.pop_front().ok_or(Error::Syntax)?;
     let destination = args.pop_front().ok_or(Error::Syntax)?;
     let member = args.pop_front().ok_or(Error::Syntax)?;
-    let result = conn.db().get_map(&source, |v| match v {
-        Some(Value::Set(set1)) => conn.db().get_map(&destination, |v| match v {
-            Some(Value::Set(set2)) => {
-                let mut set1 = set1.write();
-                if !set1.contains(&member) {
-                    return Ok(0.into());
-                }
-
+    let mut to_insert = None;
+    let result = conn
+        .db()
+        .get(&source)
+        .map_mut(|v| match v {
+            Value::Set(set1) => {
                 if source == destination {
-                    return Ok(1.into());
+                    return Ok(if set1.contains(&member) { 1 } else { 0 }.into());
                 }
 
-                let mut set2 = set2.write();
-                set1.remove(&member);
-                if set2.insert(member.clone()) {
-                    Ok(1.into())
-                } else {
-                    Ok(0.into())
-                }
-            }
-            None => {
-                set1.write().remove(&member);
-                #[allow(clippy::mutable_key_type)]
-                let mut x = HashSet::new();
-                x.insert(member.clone());
-                conn.db().set(destination.clone(), x.into(), None);
-                Ok(1.into())
+                conn.db()
+                    .get(&destination)
+                    .map_mut(|v| match v {
+                        Value::Set(set2) => {
+                            if !set1.contains(&member) {
+                                return Ok(0.into());
+                            }
+
+                            set1.remove(&member);
+                            if set2.insert(member.clone()) {
+                                Ok(1.into())
+                            } else {
+                                conn.db().bump_version(&source);
+                                Ok(0.into())
+                            }
+                        }
+                        _ => Err(Error::WrongType),
+                    })
+                    .unwrap_or_else(|| {
+                        set1.remove(&member);
+                        let mut x = HashSet::new();
+                        x.insert(member.clone());
+                        to_insert = Some(x);
+                        Ok(1.into())
+                    })
             }
             _ => Err(Error::WrongType),
-        }),
-        None => Ok(0.into()),
-        _ => Err(Error::WrongType),
-    })?;
+        })
+        .unwrap_or(Ok(0.into()))?;
 
-    if result == Value::Integer(1) {
+    if let Some(x) = to_insert {
+        conn.db().set(destination.clone(), x.into(), None);
+    }
+
+    if let Value::Integer(1) = result {
         conn.db().bump_version(&source);
         conn.db().bump_version(&destination);
     }
@@ -353,29 +376,31 @@ pub async fn spop(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value,
     let rand = srandmember(conn, args.clone()).await?;
     let key = args.pop_front().ok_or(Error::Syntax)?;
     let mut should_remove = false;
-    let result = conn.db().get_map(&key, |v| match v {
-        Some(Value::Set(x)) => {
-            let mut x = x.write();
-            match &rand {
-                Value::Blob(value) => {
-                    x.remove(value.as_ref());
-                }
-                Value::Array(values) => {
-                    for value in values.iter() {
-                        if let Value::Blob(value) = value {
-                            x.remove(value.as_ref());
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|v| match v {
+            Value::Set(x) => {
+                match &rand {
+                    Value::Blob(value) => {
+                        x.remove(value.as_ref());
+                    }
+                    Value::Array(values) => {
+                        for value in values.iter() {
+                            if let Value::Blob(value) = value {
+                                x.remove(value.as_ref());
+                            }
                         }
                     }
-                }
-                _ => unreachable!(),
-            };
+                    _ => unreachable!(),
+                };
 
-            should_remove = x.is_empty();
-            Ok(rand)
-        }
-        None => Ok(Value::Null),
-        _ => Err(Error::WrongType),
-    })?;
+                should_remove = x.is_empty();
+                Ok(rand)
+            }
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(Value::Null))?;
 
     if should_remove {
         let _ = conn.db().del(&[key]);
@@ -396,67 +421,68 @@ pub async fn spop(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value,
 /// same element multiple times. In this case, the number of returned elements is the absolute
 /// value of the specified count.
 pub async fn srandmember(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    conn.db().get_map(&args[0], |v| match v {
-        Some(Value::Set(x)) => {
-            let mut rng = rand::thread_rng();
-            let set = x.read();
-
-            let mut items = set
-                .iter()
-                .map(|x| (x, rng.gen()))
-                .collect::<Vec<(&Bytes, i128)>>();
-
-            items.sort_by(|a, b| a.1.cmp(&b.1));
-
-            if args.len() == 1 {
-                // Two arguments provided, return the first element or null if the array is null
-                if items.is_empty() {
-                    Ok(Value::Null)
-                } else {
-                    let item = items[0].0.clone();
-                    Ok(Value::new(&item))
-                }
-            } else {
-                if items.is_empty() {
-                    return Ok(Value::Array(vec![]));
-                }
-                let len = bytes_to_number::<i64>(&args[1])?;
-
-                if len > 0 {
-                    // required length is positive, return *up* to the requested number and no duplicated allowed
-                    let len: usize = min(items.len(), len as usize);
-                    Ok(items[0..len]
-                        .iter()
-                        .map(|item| Value::new(item.0))
-                        .collect::<Vec<Value>>()
-                        .into())
+    conn.db()
+        .get(&args[0])
+        .map(|v| match v {
+            Value::Set(set) => {
+                let mut rng = rand::thread_rng();
+
+                let mut items = set
+                    .iter()
+                    .map(|x| (x, rng.gen()))
+                    .collect::<Vec<(&Bytes, i128)>>();
+
+                items.sort_by(|a, b| a.1.cmp(&b.1));
+
+                if args.len() == 1 {
+                    // Two arguments provided, return the first element or null if the array is null
+                    if items.is_empty() {
+                        Ok(Value::Null)
+                    } else {
+                        let item = items[0].0.clone();
+                        Ok(Value::new(&item))
+                    }
                 } else {
-                    // duplicated results are allowed and the requested number must be returned
-                    let len = -len as usize;
-                    let total = items.len() - 1;
-                    let mut i = 0;
-                    let items = (0..len)
-                        .map(|_| {
-                            let r = (items[i].0, rng.gen());
-                            i = if i >= total { 0 } else { i + 1 };
-                            r
-                        })
-                        .collect::<Vec<(&Bytes, i128)>>();
-                    Ok(items
-                        .iter()
-                        .map(|item| Value::new(item.0))
-                        .collect::<Vec<Value>>()
-                        .into())
+                    if items.is_empty() {
+                        return Ok(Value::Array(vec![]));
+                    }
+                    let len = bytes_to_number::<i64>(&args[1])?;
+
+                    if len > 0 {
+                        // required length is positive, return *up* to the requested number and no duplicated allowed
+                        let len: usize = min(items.len(), len as usize);
+                        Ok(items[0..len]
+                            .iter()
+                            .map(|item| Value::new(item.0))
+                            .collect::<Vec<Value>>()
+                            .into())
+                    } else {
+                        // duplicated results are allowed and the requested number must be returned
+                        let len = -len as usize;
+                        let total = items.len() - 1;
+                        let mut i = 0;
+                        let items = (0..len)
+                            .map(|_| {
+                                let r = (items[i].0, rng.gen());
+                                i = if i >= total { 0 } else { i + 1 };
+                                r
+                            })
+                            .collect::<Vec<(&Bytes, i128)>>();
+                        Ok(items
+                            .iter()
+                            .map(|item| Value::new(item.0))
+                            .collect::<Vec<Value>>()
+                            .into())
+                    }
                 }
             }
-        }
-        None => Ok(if args.len() == 1 {
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(if args.len() == 1 {
             Value::Null
         } else {
             Value::Array(vec![])
-        }),
-        _ => Err(Error::WrongType),
-    })
+        }))
 }
 
 /// Remove the specified members from the set stored at key. Specified members that are not a
@@ -464,22 +490,24 @@ pub async fn srandmember(conn: &Connection, args: VecDeque<Bytes>) -> Result<Val
 /// command returns 0.
 pub async fn srem(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
     let key = args.pop_front().ok_or(Error::Syntax)?;
-    let result = conn.db().get_map(&key, |v| match v {
-        Some(Value::Set(x)) => {
-            let mut set = x.write();
-            let mut i = 0;
-
-            args.into_iter().for_each(|value| {
-                if set.remove(&value) {
-                    i += 1;
-                }
-            });
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|v| match v {
+            Value::Set(set) => {
+                let mut i = 0;
+
+                args.into_iter().for_each(|value| {
+                    if set.remove(&value) {
+                        i += 1;
+                    }
+                });
 
-            Ok(i.into())
-        }
-        None => Ok(0.into()),
-        _ => Err(Error::WrongType),
-    })?;
+                Ok(i.into())
+            }
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or(Ok(0.into()))?;
 
     conn.db().bump_version(&key);
 

+ 20 - 12
src/cmd/string.rs

@@ -12,7 +12,7 @@ use std::{
     cmp::min,
     collections::VecDeque,
     convert::TryInto,
-    ops::{Bound, Neg},
+    ops::{Bound, Deref, Neg},
 };
 
 /// If key already exists and is a string, this command appends the value at the
@@ -77,7 +77,7 @@ pub async fn decr_by(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value,
 /// Get the value of key. If the key does not exist the special value nil is returned. An error is
 /// returned if the value stored at key is not a string, because GET only handles string values.
 pub async fn get(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    Ok(conn.db().get(&args[0]))
+    Ok(conn.db().get(&args[0]).into_inner())
 }
 
 /// Get the value of key and optionally set its expiration. GETEX is similar to
@@ -124,11 +124,15 @@ pub async fn getex(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Er
 /// Get the value of key. If the key does not exist the special value nil is returned. An error is
 /// returned if the value stored at key is not a string, because GET only handles string values.
 pub async fn getrange(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    let bytes = match conn.db().get(&args[0]) {
-        Value::Blob(binary) => binary,
-        Value::BlobRw(binary) => binary.freeze(),
-        Value::Null => return Ok("".into()),
-        _ => return Err(Error::WrongType),
+    let bytes = if let Some(value) = conn.db().get(&args[0]).inner() {
+        match value.deref() {
+            Value::Blob(binary) => binary.clone(),
+            Value::BlobRw(binary) => binary.clone().freeze(),
+            Value::Null => return Ok("".into()),
+            _ => return Err(Error::WrongType),
+        }
+    } else {
+        return Ok("".into());
     };
 
     let start = bytes_to_number::<i64>(&args[1])?;
@@ -350,11 +354,15 @@ pub async fn setnx(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
 /// Returns the length of the string value stored at key. An error is returned when key holds a
 /// non-string value.
 pub async fn strlen(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
-    match conn.db().get(&args[0]) {
-        Value::Blob(x) => Ok(x.len().into()),
-        Value::String(x) => Ok(x.len().into()),
-        Value::Null => Ok(0.into()),
-        _ => Ok(Error::WrongType.into()),
+    if let Some(value) = conn.db().get(&args[0]).inner() {
+        match value.deref() {
+            Value::Blob(x) => Ok(x.len().into()),
+            Value::String(x) => Ok(x.len().into()),
+            Value::Null => Ok(0.into()),
+            _ => Ok(Error::WrongType.into()),
+        }
+    } else {
+        Ok(0.into())
     }
 }
 

+ 2 - 2
src/cmd/transaction.rs

@@ -74,10 +74,10 @@ pub async fn watch(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Er
     conn.watch_key(
         args.into_iter()
             .map(|key| {
-                let v = conn.db().get_version(&key);
+                let v = conn.db().get(&key).version();
                 (key, v)
             })
-            .collect::<Vec<(Bytes, u128)>>(),
+            .collect::<Vec<(Bytes, usize)>>(),
     );
     Ok(Value::Ok)
 }

+ 3 - 3
src/connection/mod.rs

@@ -46,7 +46,7 @@ pub struct ConnectionInfo {
     current_db: usize,
     db: Arc<Db>,
     name: Option<String>,
-    watch_keys: Vec<(Bytes, u128)>,
+    watch_keys: Vec<(Bytes, usize)>,
     tx_keys: HashSet<Bytes>,
     status: ConnectionStatus,
     commands: Option<Vec<VecDeque<Bytes>>>,
@@ -267,7 +267,7 @@ impl Connection {
 
     /// Watches keys. In a transaction watched keys are a mechanism to discard a transaction if
     /// some value changed since the moment the command was queued until the execution time.
-    pub fn watch_key(&self, keys: Vec<(Bytes, u128)>) {
+    pub fn watch_key(&self, keys: Vec<(Bytes, usize)>) {
         let watch_keys = &mut self.info.write().watch_keys;
         keys.into_iter()
             .map(|value| {
@@ -281,7 +281,7 @@ impl Connection {
         let watch_keys = &self.info.read().watch_keys;
 
         for key in watch_keys.iter() {
-            if self.info.read().db.get_version(&key.0) != key.1 {
+            if self.info.read().db.get(&key.0).version() != key.1 {
                 return true;
             }
         }

+ 64 - 45
src/db/entry.rs

@@ -1,19 +1,21 @@
 use crate::{error::Error, value::Value};
-use std::time::SystemTime;
+use bytes::BytesMut;
+use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
+use std::sync::atomic::{AtomicUsize, Ordering};
 use tokio::time::Instant;
 
 #[derive(Debug)]
 pub struct Entry {
-    pub value: Value,
-    pub version: u128,
-    expires_at: Option<Instant>,
+    value: RwLock<Value>,
+    version: AtomicUsize,
+    expires_at: Mutex<Option<Instant>>,
 }
 
-pub fn new_version() -> u128 {
-    SystemTime::now()
-        .duration_since(SystemTime::UNIX_EPOCH)
-        .expect("get millis error")
-        .as_nanos()
+static LAST_VERSION: AtomicUsize = AtomicUsize::new(0);
+
+/// Returns a new version
+pub fn unique_id() -> usize {
+    LAST_VERSION.fetch_add(1, Ordering::Relaxed)
 }
 
 /// Database Entry
@@ -26,56 +28,75 @@ pub fn new_version() -> u128 {
 impl Entry {
     pub fn new(value: Value, expires_at: Option<Instant>) -> Self {
         Self {
-            value,
-            expires_at,
-            version: new_version(),
+            value: RwLock::new(value),
+            expires_at: Mutex::new(expires_at),
+            version: AtomicUsize::new(LAST_VERSION.fetch_add(1, Ordering::Relaxed)),
         }
     }
 
-    pub fn bump_version(&mut self) {
-        self.version = new_version();
+    #[inline(always)]
+    pub fn take_value(self) -> Value {
+        self.value.into_inner()
+    }
+
+    #[inline(always)]
+    pub fn digest(&self) -> Vec<u8> {
+        self.value.read().digest()
     }
 
-    pub fn persist(&mut self) {
-        self.expires_at = None;
+    #[inline(always)]
+    pub fn bump_version(&self) {
+        self.version.store(
+            LAST_VERSION.fetch_add(1, Ordering::Relaxed),
+            Ordering::Relaxed,
+        )
+    }
+
+    pub fn persist(&self) {
+        *self.expires_at.lock() = None;
     }
 
     pub fn clone(&self) -> Self {
-        Self::new(self.value.clone(), self.expires_at)
+        Self::new(self.value.read().clone(), *self.expires_at.lock())
     }
 
     pub fn get_ttl(&self) -> Option<Instant> {
-        self.expires_at
+        *self.expires_at.lock()
     }
 
     pub fn has_ttl(&self) -> bool {
-        self.expires_at.is_some()
+        self.expires_at.lock().is_some()
     }
 
-    pub fn set_ttl(&mut self, expires_at: Instant) {
-        self.expires_at = Some(expires_at);
-        self.version = new_version();
+    pub fn set_ttl(&self, expires_at: Instant) {
+        *self.expires_at.lock() = Some(expires_at);
+        self.bump_version()
     }
 
-    pub fn version(&self) -> u128 {
-        self.version
+    pub fn version(&self) -> usize {
+        self.version.load(Ordering::Relaxed)
     }
 
-    /// Changes the value that is wrapped in this entry, the TTL (expired_at) is
-    /// not affected.
-    pub fn change_value(&mut self, value: Value) {
-        self.value = value;
-        self.version = new_version();
+    pub fn get(&self) -> RwLockReadGuard<'_, Value> {
+        self.value.read()
     }
 
-    #[allow(dead_code)]
-    pub fn get_mut(&mut self) -> &mut Value {
-        self.version = new_version();
-        &mut self.value
+    pub fn get_mut(&self) -> RwLockWriteGuard<'_, Value> {
+        self.value.write()
     }
 
-    pub fn get(&self) -> &Value {
-        &self.value
+    pub fn ensure_blob_is_mutable(&self) -> Result<(), Error> {
+        self.bump_version();
+        let mut val = self.get_mut();
+        match *val {
+            Value::Blob(ref mut data) => {
+                let rw_data = BytesMut::from(&data[..]);
+                *val = Value::BlobRw(rw_data);
+                Ok(())
+            }
+            Value::BlobRw(_) => Ok(()),
+            _ => Err(Error::WrongType),
+        }
     }
 
     /// If the Entry should be taken as valid, if this function returns FALSE
@@ -83,15 +104,13 @@ impl Entry {
     /// behaviour we can schedule the purge thread to run every few seconds or
     /// even minutes instead of once every second.
     pub fn is_valid(&self) -> bool {
-        self.expires_at.map_or(true, |x| x > Instant::now())
+        self.expires_at.lock().map_or(true, |x| x > Instant::now())
     }
 
-    /// Whether or not the value is clonable. Special types like hashes should
-    /// not be clonable because those types cannot be returned to the user with
-    /// the `get` command.
-    pub fn is_clonable(&self) -> bool {
+    /// Whether or not the value is scalar
+    pub fn is_scalar(&self) -> bool {
         matches!(
-            &self.value,
+            *self.value.read(),
             Value::Boolean(_)
                 | Value::Blob(_)
                 | Value::BlobRw(_)
@@ -107,8 +126,8 @@ impl Entry {
     /// Clone a value. If the value is not clonable an error is Value::Error is
     /// returned instead
     pub fn clone_value(&self) -> Value {
-        if self.is_clonable() {
-            self.value.clone()
+        if self.is_scalar() {
+            self.value.read().clone()
         } else {
             Error::WrongType.into()
         }
@@ -140,7 +159,7 @@ mod test {
 
     #[test]
     fn persist() {
-        let mut e = Entry::new(Value::Null, Some(Instant::now()));
+        let e = Entry::new(Value::Null, Some(Instant::now()));
         assert!(!e.is_valid());
         e.persist();
         assert!(e.is_valid());
@@ -148,7 +167,7 @@ mod test {
 
     #[test]
     fn update_ttl() {
-        let mut e = Entry::new(Value::Null, Some(Instant::now()));
+        let e = Entry::new(Value::Null, Some(Instant::now()));
         assert!(!e.is_valid());
         e.persist();
         assert!(e.is_valid());

+ 230 - 173
src/db/mod.rs

@@ -8,19 +8,18 @@ use crate::{
     value::{bytes_to_number, cursor::Cursor, typ::Typ, VDebug, Value},
 };
 use bytes::{BufMut, Bytes, BytesMut};
-
-use entry::{new_version, Entry};
+use entry::{unique_id, Entry};
 use expiration::ExpirationDb;
-
 use glob::Pattern;
 use log::trace;
 use num_traits::CheckedAdd;
-use parking_lot::{Mutex, RwLock};
+use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
 use rand::{prelude::SliceRandom, Rng};
 use seahash::hash;
 use std::{
     collections::{HashMap, VecDeque},
     convert::{TryFrom, TryInto},
+    ops::{Deref, DerefMut},
     str::FromStr,
     sync::Arc,
     thread,
@@ -36,6 +35,80 @@ pub mod pool;
 pub mod scan;
 pub(crate) mod utils;
 
+/// Read only reference
+pub struct RefValue<'a> {
+    key: &'a Bytes,
+    slot: RwLockReadGuard<'a, HashMap<Bytes, Entry>>,
+}
+
+impl<'a> RefValue<'a> {
+    /// test
+    #[inline(always)]
+    pub fn into_inner(self) -> Value {
+        self.slot
+            .get(self.key)
+            .filter(|x| x.is_valid())
+            .map(|x| {
+                if x.is_scalar() {
+                    x.get().clone()
+                } else {
+                    Error::WrongType.into()
+                }
+            })
+            .unwrap_or_default()
+    }
+
+    /// test
+    pub fn inner(&self) -> Option<RwLockReadGuard<'_, Value>> {
+        self.slot
+            .get(self.key)
+            .filter(|x| x.is_valid())
+            .map(|x| x.get())
+    }
+
+    /// test
+    pub fn inner_mut(&self) -> Option<RwLockWriteGuard<'_, Value>> {
+        self.slot
+            .get(self.key)
+            .filter(|x| x.is_valid())
+            .map(|x| x.get_mut())
+    }
+
+    /// map_mut
+    #[inline(always)]
+    pub fn map<T, F>(self, f: F) -> Option<T>
+    where
+        F: FnOnce(&Value) -> T,
+    {
+        self.slot.get(self.key).filter(|x| x.is_valid()).map(|x| {
+            let value = x.get();
+            f(value.deref())
+        })
+    }
+
+    /// map_mut
+    #[inline(always)]
+    pub fn map_mut<T, F>(self, f: F) -> Option<T>
+    where
+        F: FnOnce(&mut Value) -> T,
+    {
+        self.slot.get(self.key).filter(|x| x.is_valid()).map(|x| {
+            let mut value = x.get_mut();
+            f(value.deref_mut())
+        })
+    }
+
+    /// Returns the version of a given key
+    #[inline(always)]
+    pub fn version(&self) -> usize {
+        self.slot
+            .get(self.key)
+            .filter(|x| x.is_valid())
+            .map(|x| x.version())
+            .unwrap_or_default()
+    }
+}
+
 /// Database structure
 ///
 /// Each connection has their own clone of the database and the conn_id is stored in each instance.
@@ -77,7 +150,7 @@ pub struct Db {
 
     /// Databases unique ID. This is an internal identifier to avoid deadlocks
     /// when copying and moving data between databases.
-    pub db_id: u128,
+    pub db_id: usize,
 
     /// Current connection  ID
     ///
@@ -106,7 +179,7 @@ impl Db {
             expirations: Arc::new(Mutex::new(ExpirationDb::new())),
             change_subscriptions: Arc::new(RwLock::new(HashMap::new())),
             conn_id: 0,
-            db_id: new_version(),
+            db_id: unique_id(),
             tx_key_locks: Arc::new(RwLock::new(HashMap::new())),
             number_of_slots,
         }
@@ -211,12 +284,10 @@ impl Db {
     /// Return debug info for a key
     pub fn debug(&self, key: &Bytes) -> Result<VDebug, Error> {
         let slot = self.slots[self.get_slot(key)].read();
-        Ok(slot
-            .get(key)
+        slot.get(key)
             .filter(|x| x.is_valid())
-            .ok_or(Error::NotFound)?
-            .value
-            .debug())
+            .map(|x| x.get().debug())
+            .ok_or(Error::NotFound)
     }
 
     /// Return the digest for each key. This used for testing only
@@ -228,7 +299,7 @@ impl Db {
                 Value::new(
                     slot.get(key)
                         .filter(|v| v.is_valid())
-                        .map(|v| hex::encode(v.value.digest()))
+                        .map(|v| hex::encode(v.digest()))
                         .unwrap_or("00000".into())
                         .as_bytes(),
                 )
@@ -315,35 +386,45 @@ impl Db {
             + Into<Value>
             + Copy,
     {
-        let mut slot = self.slots[self.get_slot(key)].write();
+        let slot_id = self.get_slot(key);
+        let slot = self.slots[slot_id].read();
         let mut incr_by: T =
             bytes_to_number(incr_by).map_err(|_| Error::NotANumberType(typ.to_owned()))?;
-        match slot.get_mut(key).filter(|x| x.is_valid()).map(|x| x.get()) {
-            Some(Value::Hash(h)) => {
-                let mut h = h.write();
-                if let Some(n) = h.get(sub_key) {
-                    incr_by = incr_by
-                        .checked_add(
-                            &bytes_to_number(n)
-                                .map_err(|_| Error::NotANumberType(typ.to_owned()))?,
-                        )
-                        .ok_or(Error::Overflow)?;
-                }
-                let incr_by_bytes = Self::round_numbers(incr_by);
-                h.insert(sub_key.clone(), incr_by_bytes.clone());
 
-                Self::number_to_value(&incr_by_bytes)
-            }
-            None => {
-                #[allow(clippy::mutable_key_type)]
-                let mut h = HashMap::new();
-                let incr_by_bytes = Self::round_numbers(incr_by);
-                h.insert(sub_key.clone(), incr_by_bytes.clone());
-                let _ = slot.insert(key.clone(), Entry::new(h.into(), None));
-                Self::number_to_value(&incr_by_bytes)
-            }
-            _ => Err(Error::WrongType),
+        if let Some(x) = slot
+            .get(key)
+            .filter(|x| x.is_valid())
+            .map(|x| x.get_mut())
+            .map(|mut x| match x.deref_mut() {
+                Value::Hash(ref mut h) => {
+                    if let Some(n) = h.get(sub_key) {
+                        incr_by = incr_by
+                            .checked_add(
+                                &bytes_to_number(n)
+                                    .map_err(|_| Error::NotANumberType(typ.to_owned()))?,
+                            )
+                            .ok_or(Error::Overflow)?;
+                    }
+                    let incr_by_bytes = Self::round_numbers(incr_by);
+                    h.insert(sub_key.clone(), incr_by_bytes.clone());
+
+                    Self::number_to_value(&incr_by_bytes)
+                }
+                _ => Err(Error::WrongType),
+            })
+        {
+            return x;
         }
+
+        drop(slot);
+        #[allow(clippy::mutable_key_type)]
+        let mut h = HashMap::new();
+        let incr_by_bytes = Self::round_numbers(incr_by);
+        h.insert(sub_key.clone(), incr_by_bytes.clone());
+        let _ = self.slots[slot_id]
+            .write()
+            .insert(key.clone(), Entry::new(h.into(), None));
+        Self::number_to_value(&incr_by_bytes)
     }
 
     /// Increments a key's value by a given number
@@ -354,35 +435,34 @@ impl Db {
     where
         T: ToString + CheckedAdd + for<'a> TryFrom<&'a Value, Error = Error> + Into<Value> + Copy,
     {
-        let mut slot = self.slots[self.get_slot(key)].write();
-        match slot.get_mut(key).filter(|x| x.is_valid()) {
-            Some(x) => {
-                if !x.is_clonable() {
-                    return Err(Error::WrongType);
-                }
-                let value = x.get();
-                let mut number: T = value.try_into()?;
-
-                number = incr_by.checked_add(&number).ok_or(Error::Overflow)?;
-
-                x.change_value(Value::Blob(Self::round_numbers(number)));
+        let slot_id = self.get_slot(key);
+        let slot = self.slots[slot_id].read();
 
-                Ok(number)
-            }
-            None => {
-                slot.insert(
-                    key.clone(),
-                    Entry::new(Value::Blob(Self::round_numbers(incr_by)), None),
-                );
-                Ok(incr_by)
+        if let Some(entry) = slot.get(key).filter(|x| x.is_valid()) {
+            if !entry.is_scalar() {
+                return Err(Error::WrongType);
             }
+            let mut value = entry.get_mut();
+            let mut number: T = (&*value).try_into()?;
+
+            number = incr_by.checked_add(&number).ok_or(Error::Overflow)?;
+            *value = Value::Blob(Self::round_numbers(number));
+            entry.bump_version();
+            Ok(number)
+        } else {
+            drop(slot);
+            self.slots[slot_id].write().insert(
+                key.clone(),
+                Entry::new(Value::Blob(Self::round_numbers(incr_by)), None),
+            );
+            Ok(incr_by)
         }
     }
 
     /// Removes any expiration associated with a given key
     pub fn persist(&self, key: &Bytes) -> Value {
-        let mut slot = self.slots[self.get_slot(key)].write();
-        slot.get_mut(key)
+        let slot = self.slots[self.get_slot(key)].read();
+        slot.get(key)
             .filter(|x| x.is_valid())
             .map_or(0.into(), |x| {
                 if x.has_ttl() {
@@ -410,13 +490,13 @@ impl Db {
             return Err(Error::OptsNotCompatible("GT and LT".to_owned()));
         }
 
-        let mut slot = self.slots[self.get_slot(key)].write();
+        let slot = self.slots[self.get_slot(key)].read();
         let expires_at = Instant::now()
             .checked_add(expires_in)
             .unwrap_or_else(far_future);
 
         Ok(slot
-            .get_mut(key)
+            .get(key)
             .filter(|x| x.is_valid())
             .map_or(0.into(), |x| {
                 let current_expire = x.get_ttl();
@@ -457,22 +537,20 @@ impl Db {
     /// command will make sure it holds a string large enough to be able to set
     /// value at offset.
     pub fn set_range(&self, key: &Bytes, offset: i128, data: &[u8]) -> Result<Value, Error> {
-        let mut slot = self.slots[self.get_slot(key)].write();
-
-        if let Some(entry) = slot.get_mut(key).filter(|x| x.is_valid()) {
-            if let Value::Blob(data) = entry.get() {
-                let rw_data = BytesMut::from(&data[..]);
-                entry.change_value(Value::BlobRw(rw_data));
-            }
-        }
+        let slot_id = self.get_slot(key);
+        let slot = self.slots[slot_id].read();
 
-        let value = slot.get_mut(key).map(|value| {
-            if !value.is_valid() {
-                self.expirations.lock().remove(key);
-                value.persist();
-            }
-            value.get_mut()
-        });
+        let mut value = slot
+            .get(key)
+            .map(|value| {
+                value.ensure_blob_is_mutable()?;
+                if !value.is_valid() {
+                    self.expirations.lock().remove(key);
+                    value.persist();
+                }
+                Ok::<_, Error>(value.get_mut())
+            })
+            .transpose()?;
 
         if offset < 0 {
             return Err(Error::OutOfRange);
@@ -483,27 +561,32 @@ impl Db {
         }
 
         let length = offset as usize + data.len();
-        match value {
-            Some(Value::BlobRw(bytes)) => {
-                if bytes.capacity() < length {
-                    bytes.resize(length, 0);
+        if let Some(value) = value.as_mut() {
+            match value.deref_mut() {
+                Value::BlobRw(ref mut bytes) => {
+                    if bytes.capacity() < length {
+                        bytes.resize(length, 0);
+                    }
+                    let writer = &mut bytes[offset as usize..length];
+                    writer.copy_from_slice(data);
+                    Ok(bytes.len().into())
                 }
-                let writer = &mut bytes[offset as usize..length];
-                writer.copy_from_slice(data);
-                Ok(bytes.len().into())
+                _ => Err(Error::WrongType),
             }
-            None => {
-                if data.is_empty() {
-                    return Ok(0.into());
-                }
-                let mut bytes = BytesMut::new();
-                bytes.resize(length, 0);
-                let writer = &mut bytes[offset as usize..];
-                writer.copy_from_slice(data);
-                slot.insert(key.clone(), Entry::new(Value::new(&bytes), None));
-                Ok(bytes.len().into())
+        } else {
+            drop(value);
+            drop(slot);
+            if data.is_empty() {
+                return Ok(0.into());
             }
-            _ => Err(Error::WrongType),
+            let mut bytes = BytesMut::new();
+            bytes.resize(length, 0);
+            let writer = &mut bytes[offset as usize..];
+            writer.copy_from_slice(data);
+            self.slots[slot_id]
+                .write()
+                .insert(key.clone(), Entry::new(Value::new(&bytes), None));
+            Ok(bytes.len().into())
         }
     }
 
@@ -530,14 +613,9 @@ impl Db {
             if replace == Override::No && db.exists(&[target.clone()]) > 0 {
                 return Ok(false);
             }
-            let _ = db.set_advanced(
-                target,
-                value.value.clone(),
-                value.get_ttl().map(|v| v - Instant::now()),
-                replace,
-                false,
-                false,
-            );
+
+            let ttl = value.get_ttl().map(|v| v - Instant::now());
+            let _ = db.set_advanced(target, value.take_value(), ttl, replace, false, false);
             Ok(true)
         } else {
             if source == target {
@@ -563,7 +641,7 @@ impl Db {
         let (expires_in, value) = if let Some(value) = slot.get(&source).filter(|v| v.is_valid()) {
             (
                 value.get_ttl().map(|t| t - Instant::now()),
-                value.value.clone(),
+                value.get().clone(),
             )
         } else {
             return Ok(false);
@@ -702,39 +780,11 @@ impl Db {
         matches
     }
 
-    /// get_map_or
-    ///
-    /// Instead of returning an entry of the database, to avoid cloning, this function will
-    /// execute a callback function with the entry as a parameter. If no record is found another
-    /// callback function is going to be executed, dropping the lock before doing so.
-    ///
-    /// If an entry is found, the lock is not dropped before doing the callback. Avoid inserting
-    /// new entries. In this case the value is passed by reference, so it is possible to modify the
-    /// entry itself.
-    ///
-    /// This function is useful to read non-scalar values from the database. Non-scalar values are
-    /// forbidden to clone, attempting cloning will end-up in an error (Error::WrongType)
-    pub fn get_map<F1>(&self, key: &Bytes, found: F1) -> Result<Value, Error>
-    where
-        F1: FnOnce(Option<&Value>) -> Result<Value, Error>,
-    {
-        let slot = self.slots[self.get_slot(key)].read();
-        let entry = slot.get(key).filter(|x| x.is_valid()).map(|e| e.get());
-
-        if let Some(entry) = entry {
-            found(Some(entry))
-        } else {
-            // drop lock
-            drop(slot);
-            found(None)
-        }
-    }
-
     /// Updates the entry version of a given key
     pub fn bump_version(&self, key: &Bytes) -> bool {
-        let mut slot = self.slots[self.get_slot(key)].write();
+        let slot = self.slots[self.get_slot(key)].read();
         let to_return = slot
-            .get_mut(key)
+            .get(key)
             .filter(|x| x.is_valid())
             .map(|entry| {
                 entry.bump_version();
@@ -773,38 +823,28 @@ impl Db {
             .collect()
     }
 
-    /// Returns the version of a given key
-    #[inline]
-    pub fn get_version(&self, key: &Bytes) -> u128 {
-        let slot = self.slots[self.get_slot(key)].read();
-        slot.get(key)
-            .filter(|x| x.is_valid())
-            .map(|entry| entry.version())
-            .unwrap_or_default()
-    }
-
     /// Returns the name of the value type
     pub fn get_data_type(&self, key: &Bytes) -> String {
         let slot = self.slots[self.get_slot(key)].read();
         slot.get(key)
             .filter(|x| x.is_valid())
             .map_or("none".to_owned(), |x| {
-                Typ::get_type(x.get()).to_string().to_lowercase()
+                Typ::get_type(&x.get()).to_string().to_lowercase()
             })
     }
 
-    /// Get a copy of an entry
-    pub fn get(&self, key: &Bytes) -> Value {
-        let slot = self.slots[self.get_slot(key)].read();
-        slot.get(key)
-            .filter(|x| x.is_valid())
-            .map_or(Value::Null, |x| x.clone_value())
+    /// Get a ref value
+    pub fn get<'a>(&'a self, key: &'a Bytes) -> RefValue<'a> {
+        RefValue {
+            slot: self.slots[self.get_slot(key)].read(),
+            key,
+        }
     }
 
     /// Get a copy of an entry and modifies the expiration of the key
     pub fn getex(&self, key: &Bytes, expires_in: Option<Duration>, make_persistent: bool) -> Value {
-        let mut slot = self.slots[self.get_slot(key)].write();
-        slot.get_mut(key)
+        let slot = self.slots[self.get_slot(key)].read();
+        slot.get(key)
             .filter(|x| x.is_valid())
             .map(|value| {
                 if make_persistent {
@@ -828,7 +868,7 @@ impl Db {
             .map(|key| {
                 let slot = self.slots[self.get_slot(key)].read();
                 slot.get(key)
-                    .filter(|x| x.is_valid() && x.is_clonable())
+                    .filter(|x| x.is_valid() && x.is_scalar())
                     .map_or(Value::Null, |x| x.clone_value())
             })
             .collect::<Vec<Value>>()
@@ -855,21 +895,20 @@ impl Db {
 
     /// Set a key, value with an optional expiration time
     pub fn append(&self, key: &Bytes, value_to_append: &Bytes) -> Result<Value, Error> {
-        let mut slot = self.slots[self.get_slot(key)].write();
+        let slot = self.slots[self.get_slot(key)].read();
 
-        if let Some(entry) = slot.get_mut(key).filter(|x| x.is_valid()) {
-            if let Value::Blob(data) = entry.get() {
-                let rw_data = BytesMut::from(&data[..]);
-                entry.change_value(Value::BlobRw(rw_data));
-            }
-            match entry.get_mut() {
-                Value::BlobRw(value) => {
+        if let Some(entry) = slot.get(key).filter(|x| x.is_valid()) {
+            entry.ensure_blob_is_mutable()?;
+            match *entry.get_mut() {
+                Value::BlobRw(ref mut value) => {
                     value.put(value_to_append.as_ref());
                     Ok(value.len().into())
                 }
                 _ => Err(Error::WrongType),
             }
         } else {
+            drop(slot);
+            let mut slot = self.slots[self.get_slot(key)].write();
             slot.insert(key.clone(), Entry::new(Value::new(value_to_append), None));
             Ok(value_to_append.len().into())
         }
@@ -1105,7 +1144,7 @@ impl scan::Scan for Db {
                     }
                 }
                 if let Some(typ) = &typ {
-                    if !typ.is_value_type(value.get()) {
+                    if !typ.is_value_type(&value.get()) {
                         last_pos += 1;
                         continue;
                     }
@@ -1147,7 +1186,10 @@ mod test {
 
         assert!(r.is_err());
         assert_eq!(Error::NotANumber, r.expect_err("should fail"));
-        assert_eq!(Value::Blob(bytes!("some string")), db.get(&bytes!("num")));
+        assert_eq!(
+            Value::Blob(bytes!("some string")),
+            db.get(&bytes!("num")).into_inner()
+        );
     }
 
     #[test]
@@ -1156,7 +1198,10 @@ mod test {
         db.set(bytes!(b"num"), Value::Blob(bytes!("1.1")), None);
 
         assert_eq!(Ok(2.2.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
-        assert_eq!(Value::Blob(bytes!("2.2")), db.get(&bytes!("num")));
+        assert_eq!(
+            Value::Blob(bytes!("2.2")),
+            db.get(&bytes!("num")).into_inner()
+        );
     }
 
     #[test]
@@ -1165,7 +1210,10 @@ mod test {
         db.set(bytes!(b"num"), Value::Blob(bytes!("1")), None);
 
         assert_eq!(Ok(2.1.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
-        assert_eq!(Value::Blob(bytes!("2.1")), db.get(&bytes!("num")));
+        assert_eq!(
+            Value::Blob(bytes!("2.1")),
+            db.get(&bytes!("num")).into_inner()
+        );
     }
 
     #[test]
@@ -1174,21 +1222,30 @@ mod test {
         db.set(bytes!(b"num"), Value::Blob(bytes!("1")), None);
 
         assert_eq!(Ok(2), db.incr(&bytes!("num"), 1));
-        assert_eq!(Value::Blob(bytes!("2")), db.get(&bytes!("num")));
+        assert_eq!(
+            Value::Blob(bytes!("2")),
+            db.get(&bytes!("num")).into_inner()
+        );
     }
 
     #[test]
     fn incr_blob_int_set() {
         let db = Db::new(100);
         assert_eq!(Ok(1), db.incr(&bytes!("num"), 1));
-        assert_eq!(Value::Blob(bytes!("1")), db.get(&bytes!("num")));
+        assert_eq!(
+            Value::Blob(bytes!("1")),
+            db.get(&bytes!("num")).into_inner()
+        );
     }
 
     #[test]
     fn incr_blob_float_set() {
         let db = Db::new(100);
         assert_eq!(Ok(1.1.into()), db.incr::<Float>(&bytes!("num"), 1.1.into()));
-        assert_eq!(Value::Blob(bytes!("1.1")), db.get(&bytes!("num")));
+        assert_eq!(
+            Value::Blob(bytes!("1.1")),
+            db.get(&bytes!("num")).into_inner()
+        );
     }
 
     #[test]
@@ -1226,7 +1283,7 @@ mod test {
     fn persist_bug() {
         let db = Db::new(100);
         db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(1)));
-        assert_eq!(Value::Ok, db.get(&bytes!(b"one")));
+        assert_eq!(Value::Ok, db.get(&bytes!(b"one")).into_inner());
         assert!(db.is_key_in_expiration_list(&bytes!(b"one")));
         db.persist(&bytes!(b"one"));
         assert!(!db.is_key_in_expiration_list(&bytes!(b"one")));
@@ -1238,13 +1295,13 @@ mod test {
         db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
         // Expired keys should not be returned, even if they are not yet
         // removed by the purge process.
-        assert_eq!(Value::Null, db.get(&bytes!(b"one")));
+        assert_eq!(Value::Null, db.get(&bytes!(b"one")).into_inner());
 
         // Purge twice
         assert_eq!(1, db.purge());
         assert_eq!(0, db.purge());
 
-        assert_eq!(Value::Null, db.get(&bytes!(b"one")));
+        assert_eq!(Value::Null, db.get(&bytes!(b"one")).into_inner());
     }
 
     #[test]
@@ -1253,10 +1310,10 @@ mod test {
         db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
         // Expired keys should not be returned, even if they are not yet
         // removed by the purge process.
-        assert_eq!(Value::Null, db.get(&bytes!(b"one")));
+        assert_eq!(Value::Null, db.get(&bytes!(b"one")).into_inner());
 
         db.set(bytes!(b"one"), Value::Ok, Some(Duration::from_secs(5)));
-        assert_eq!(Value::Ok, db.get(&bytes!(b"one")));
+        assert_eq!(Value::Ok, db.get(&bytes!(b"one")).into_inner());
 
         // Purge should return 0 as the expired key has been removed already
         assert_eq!(0, db.purge());

+ 0 - 68
src/value/locked.rs

@@ -1,68 +0,0 @@
-//! # Locked Value
-//!
-//! Wraps any a structure and makes it read-write lockable. This is a very simple abstraction on
-//! top of a RwLock.
-
-use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
-
-/// Locked Value
-///
-/// This is a very simple data structure to wrap a value behind a read/write lock.
-///
-/// The wrap objects are comparable and clonable.
-#[derive(Debug)]
-pub struct Value<T: Clone + PartialEq>(pub RwLock<T>);
-
-impl<T: Clone + PartialEq> Clone for Value<T> {
-    fn clone(&self) -> Self {
-        Self(RwLock::new(self.0.read().clone()))
-    }
-}
-
-impl<T: PartialEq + Clone> PartialEq for Value<T> {
-    fn eq(&self, other: &Value<T>) -> bool {
-        self.0.read().eq(&other.0.read())
-    }
-}
-
-impl<T: PartialEq + Clone> Value<T> {
-    /// Creates a new instance
-    pub fn new(obj: T) -> Self {
-        Self(RwLock::new(obj))
-    }
-
-    /// Acquire a write lock and return the wrapped Value
-    pub fn write(&self) -> RwLockWriteGuard<'_, T> {
-        self.0.write()
-    }
-
-    /// Acquire a read lock and return the wrapped Value
-    pub fn read(&self) -> RwLockReadGuard<'_, T> {
-        self.0.read()
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use super::*;
-
-    #[test]
-    fn locked_eq1() {
-        let a = Value::new(1);
-        let b = Value::new(1);
-        assert!(a == b);
-    }
-
-    #[test]
-    fn locked_eq2() {
-        let a = Value::new(1);
-        let b = Value::new(2);
-        assert!(a != b);
-    }
-
-    #[test]
-    fn locked_clone() {
-        let a = Value::new((1, 2, 3));
-        assert!(a == a.clone());
-    }
-}

+ 6 - 7
src/value/mod.rs

@@ -5,7 +5,6 @@ pub mod checksum;
 pub mod cursor;
 pub mod expiration;
 pub mod float;
-pub mod locked;
 pub mod typ;
 
 use crate::{error::Error, value_try_from, value_vec_try_from};
@@ -24,11 +23,11 @@ use std::{
 #[derive(Debug, PartialEq, Clone)]
 pub enum Value {
     /// Hash. This type cannot be serialized
-    Hash(locked::Value<HashMap<Bytes, Bytes>>),
+    Hash(HashMap<Bytes, Bytes>),
     /// List. This type cannot be serialized
-    List(locked::Value<VecDeque<checksum::Value>>),
+    List(VecDeque<checksum::Value>),
     /// Set. This type cannot be serialized
-    Set(locked::Value<HashSet<Bytes>>),
+    Set(HashSet<Bytes>),
     /// Vector/Array of values
     Array(Vec<Value>),
     /// Bytes/Strings/Binary data
@@ -272,19 +271,19 @@ impl From<&str> for Value {
 
 impl From<HashMap<Bytes, Bytes>> for Value {
     fn from(value: HashMap<Bytes, Bytes>) -> Value {
-        Value::Hash(locked::Value::new(value))
+        Value::Hash(value)
     }
 }
 
 impl From<VecDeque<checksum::Value>> for Value {
     fn from(value: VecDeque<checksum::Value>) -> Value {
-        Value::List(locked::Value::new(value))
+        Value::List(value)
     }
 }
 
 impl From<HashSet<Bytes>> for Value {
     fn from(value: HashSet<Bytes>) -> Value {
-        Value::Set(locked::Value::new(value))
+        Value::Set(value)
     }
 }