Browse Source

Use BytesMut for Value::Blob instead Bytes (#30)

BytesMut is needed in order to implement an efficient way to change the
stored values. Right now the whole value has to be replaced with a new
value.

Using Bytes instead of BytesMut is particular ineficient for APPEND and
SETRANGE
César D. Rodas 3 years ago
parent
commit
7e67e4ceec

+ 2 - 2
src/cmd/client.rs

@@ -50,7 +50,7 @@ pub async fn client(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 /// Documentation:
 ///  * <https://redis.io/commands/echo>
 pub async fn echo(_conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    Ok(Value::Blob(args[1].to_owned()))
+    Ok(Value::new(&args[1]))
 }
 
 /// "ping" command handler
@@ -60,7 +60,7 @@ pub async fn echo(_conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn ping(_conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     match args.len() {
         1 => Ok(Value::String("PONG".to_owned())),
-        2 => Ok(Value::Blob(args[1].to_owned())),
+        2 => Ok(Value::new(&args[1])),
         _ => Err(Error::InvalidArgsCount("ping".to_owned())),
     }
 }

+ 9 - 9
src/cmd/hash.rs

@@ -62,7 +62,7 @@ pub async fn hget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
         &args[1],
         |v| match v {
             Value::Hash(h) => Ok(if let Some(v) = h.read().get(&args[2]) {
-                Value::Blob(v.clone())
+                Value::new(&v)
             } else {
                 Value::Null
             }),
@@ -82,8 +82,8 @@ pub async fn hgetall(conn: &Connection, args: &[Bytes]) -> Result<Value, Error>
                 let mut ret = vec![];
 
                 for (key, value) in h.read().iter() {
-                    ret.push(Value::Blob(key.clone()));
-                    ret.push(Value::Blob(value.clone()));
+                    ret.push(Value::new(&key));
+                    ret.push(Value::new(&value));
                 }
 
                 Ok(ret.into())
@@ -144,7 +144,7 @@ pub async fn hkeys(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
                 let mut ret = vec![];
 
                 for key in h.read().keys() {
-                    ret.push(Value::Blob(key.clone()));
+                    ret.push(Value::new(&key));
                 }
 
                 Ok(ret.into())
@@ -179,7 +179,7 @@ pub async fn hmget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
                     .iter()
                     .map(|key| {
                         if let Some(value) = h.get(key) {
-                            Value::Blob(value.clone())
+                            Value::new(&value)
                         } else {
                             Value::Null
                         }
@@ -234,17 +234,17 @@ pub async fn hrandfield(conn: &Connection, args: &[Bytes]) -> Result<Value, Erro
                 i = 0;
                 for val in rand_sorted.values() {
                     if single {
-                        return Ok(Value::Blob(val.0.clone()));
+                        return Ok(Value::new(&val.0));
                     }
 
                     if i == count {
                         break;
                     }
 
-                    ret.push(Value::Blob(val.0.clone()));
+                    ret.push(Value::new(&val.0));
 
                     if with_values {
-                        ret.push(Value::Blob(val.1.clone()));
+                        ret.push(Value::new(&val.1));
                     }
 
                     i += 1;
@@ -360,7 +360,7 @@ pub async fn hvals(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
                 let mut ret = vec![];
 
                 for value in h.read().values() {
-                    ret.push(Value::Blob(value.clone()));
+                    ret.push(Value::new(&value));
                 }
 
                 Ok(ret.into())

+ 6 - 4
src/cmd/list.rs

@@ -63,14 +63,15 @@ fn remove_element(
 /// popped from the head of the first list that is non-empty, with the given keys being checked in
 /// the order that they are given.
 pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    let timeout = Instant::now() + Duration::from_secs(bytes_to_number(&args[args.len() - 1])?);
+    let timeout =
+        Instant::now() + Duration::from_secs(bytes_to_number::<u64>(&args[args.len() - 1])?);
     let len = args.len() - 1;
 
     loop {
         for key in args[1..len].iter() {
             match remove_element(conn, key, 0, true)? {
                 Value::Null => (),
-                n => return Ok(vec![Value::Blob(key.clone()), n].into()),
+                n => return Ok(vec![Value::new(&key), n].into()),
             };
         }
 
@@ -89,14 +90,15 @@ pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 /// popped from the tail of the first list that is non-empty, with the given keys being checked in
 /// the order that they are given.
 pub async fn brpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    let timeout = Instant::now() + Duration::from_secs(bytes_to_number(&args[args.len() - 1])?);
+    let timeout =
+        Instant::now() + Duration::from_secs(bytes_to_number::<u64>(&args[args.len() - 1])?);
     let len = args.len() - 1;
 
     loop {
         for key in args[1..len].iter() {
             match remove_element(conn, key, 0, false)? {
                 Value::Null => (),
-                n => return Ok(vec![Value::Blob(key.clone()), n].into()),
+                n => return Ok(vec![Value::new(&key), n].into()),
             };
         }
 

+ 2 - 2
src/cmd/pubsub.rs

@@ -15,7 +15,7 @@ pub async fn pubsub(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             conn.pubsub()
                 .channels()
                 .iter()
-                .map(|v| Value::Blob(v.clone()))
+                .map(|v| Value::new(&v))
                 .collect(),
         )),
         "help" => Ok(Value::Array(vec![
@@ -29,7 +29,7 @@ pub async fn pubsub(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             .pubsub()
             .get_number_of_subscribers(&args[2..])
             .iter()
-            .map(|(channel, subs)| vec![Value::Blob(channel.clone()), (*subs).into()])
+            .map(|(channel, subs)| vec![Value::new(&channel), (*subs).into()])
             .flatten()
             .collect::<Vec<Value>>()
             .into()),

+ 1 - 1
src/cmd/server.rs

@@ -50,7 +50,7 @@ pub async fn command(conn: &Connection, args: &[Bytes]) -> Result<Value, Error>
                 command
                     .get_keys(args)
                     .iter()
-                    .map(|key| Value::Blob((*key).clone()))
+                    .map(|key| Value::new(*key))
                     .collect(),
             ))
         }

+ 7 - 7
src/cmd/set.rs

@@ -11,7 +11,7 @@ fn store(conn: &Connection, key: &Bytes, values: &[Value]) -> i64 {
 
     for val in values.iter() {
         if let Value::Blob(blob) = val {
-            if x.insert(blob.clone()) {
+            if x.insert(blob.clone().freeze()) {
                 len += 1;
             }
         }
@@ -52,7 +52,7 @@ where
 
                 Ok(all_entries
                     .iter()
-                    .map(|entry| Value::Blob(entry.clone()))
+                    .map(|entry| Value::new(&entry))
                     .collect::<Vec<Value>>()
                     .into())
             }
@@ -221,7 +221,7 @@ pub async fn smembers(conn: &Connection, args: &[Bytes]) -> Result<Value, Error>
             Value::Set(x) => Ok(x
                 .read()
                 .iter()
-                .map(|x| Value::Blob(x.clone()))
+                .map(|x| Value::new(x))
                 .collect::<Vec<Value>>()
                 .into()),
             _ => Err(Error::WrongType),
@@ -321,12 +321,12 @@ pub async fn spop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
                 let mut x = x.write();
                 match &rand {
                     Value::Blob(value) => {
-                        x.remove(value);
+                        x.remove(value.as_ref().into());
                     }
                     Value::Array(values) => {
                         for value in values.iter() {
                             if let Value::Blob(value) = value {
-                                x.remove(value);
+                                x.remove(value.as_ref().into());
                             }
                         }
                     }
@@ -370,12 +370,12 @@ pub async fn srandmember(conn: &Connection, args: &[Bytes]) -> Result<Value, Err
 
                 if args.len() == 2 {
                     let item = items[0].0.clone();
-                    Ok(Value::Blob(item))
+                    Ok(Value::new(&item))
                 } else {
                     let len: usize = min(items.len(), bytes_to_number(&args[2])?);
                     Ok(items[0..len]
                         .iter()
-                        .map(|item| Value::Blob(item.0.clone()))
+                        .map(|item| Value::new(&item.0))
                         .collect::<Vec<Value>>()
                         .into())
                 }

+ 43 - 11
src/cmd/string.rs

@@ -58,7 +58,7 @@ pub async fn decr(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 /// type or contains a string that can not be represented as integer. This operation is limited to
 /// 64 bit signed integers.
 pub async fn decr_by(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    let by: i64 = (&Value::Blob(args[2].to_owned())).try_into()?;
+    let by: i64 = (&Value::new(&args[2])).try_into()?;
     conn.db().incr(&args[1], by.neg())
 }
 
@@ -142,7 +142,11 @@ pub async fn getrange(conn: &Connection, args: &[Bytes]) -> Result<Value, Error>
             }
 
             Ok(Value::Blob(
-                binary.slice((Bound::Included(start), Bound::Included(end))),
+                binary
+                    .freeze()
+                    .slice((Bound::Included(start), Bound::Included(end)))
+                    .as_ref()
+                    .into(),
             ))
         }
         Value::Null => Ok("".into()),
@@ -160,7 +164,7 @@ pub async fn getdel(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 /// exists but does not hold a string value. Any previous time to live associated with the key is
 /// discarded on successful SET operation.
 pub async fn getset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    Ok(conn.db().getset(&args[1], &Value::Blob(args[2].to_owned())))
+    Ok(conn.db().getset(&args[1], Value::new(&args[2])))
 }
 
 /// Returns the values of all specified keys. For every key that does not hold a string value or
@@ -174,9 +178,7 @@ pub async fn mget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 /// operation.
 pub async fn set(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     match args.len() {
-        3 => Ok(conn
-            .db()
-            .set(&args[1], Value::Blob(args[2].to_owned()), None)),
+        3 => Ok(conn.db().set(&args[1], Value::new(&args[2]), None)),
         4 | 5 | 6 | 7 => {
             let mut offset = 3;
             let mut expiration = None;
@@ -252,7 +254,7 @@ pub async fn set(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             Ok(
                 match conn.db().set_advanced(
                     &args[1],
-                    Value::Blob(args[2].to_owned()),
+                    Value::new(&args[2]),
                     expiration,
                     override_value,
                     keep_ttl,
@@ -305,9 +307,7 @@ pub async fn setex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
         Duration::from_millis(bytes_to_number(&args[2])?)
     };
 
-    Ok(conn
-        .db()
-        .set(&args[1], Value::Blob(args[3].to_owned()), Some(ttl)))
+    Ok(conn.db().set(&args[1], Value::new(&args[3]), Some(ttl)))
 }
 
 /// Set key to hold string value if key does not exist. In that case, it is
@@ -316,7 +316,7 @@ pub async fn setex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn setnx(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().set_advanced(
         &args[1],
-        Value::Blob(args[2].to_owned()),
+        Value::new(&args[2]),
         None,
         Override::No,
         false,
@@ -335,6 +335,17 @@ pub async fn strlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     }
 }
 
+/// Overwrites part of the string stored at key, starting at the specified
+/// offset, for the entire length of value. If the offset is larger than the
+/// current length of the string at key, the string is padded with zero-bytes to
+/// make offset fit. Non-existing keys are considered as empty strings, so this
+/// command will make sure it holds a string large enough to be able to set
+/// value at offset.
+pub async fn setrange(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    conn.db()
+        .set_range(&args[1], bytes_to_number(&args[2])?, &args[3])
+}
+
 #[cfg(test)]
 mod test {
     use crate::{
@@ -686,4 +697,25 @@ mod test {
             x
         );
     }
+
+    #[tokio::test]
+    async fn test_set_range() {
+        let c = create_connection();
+        assert_eq!(
+            Ok(23.into()),
+            run_command(&c, &["setrange", "foo", "20", "xxx"]).await,
+        );
+        assert_eq!(
+            Ok(23.into()),
+            run_command(&c, &["setrange", "foo", "2", "xxx"]).await,
+        );
+        assert_eq!(
+            Ok(33.into()),
+            run_command(&c, &["setrange", "foo", "30", "xxx"]).await,
+        );
+        assert_eq!(
+            Ok("\0\0xxx\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0xxx\0\0\0\0\0\0\0xxx".into()),
+            run_command(&c, &["get", "foo"]).await,
+        );
+    }
 }

+ 7 - 7
src/connection/pubsub_server.rs

@@ -79,7 +79,7 @@ impl Pubsub {
             let _ = conn.pubsub_client().sender().try_send(
                 vec![
                     "psubscribe".into(),
-                    Value::Blob(bytes_channel.clone()),
+                    Value::new(&bytes_channel),
                     conn.pubsub_client().new_psubscription(&channel).into(),
                 ]
                 .into(),
@@ -99,8 +99,8 @@ impl Pubsub {
                 if sender
                     .try_send(Value::Array(vec![
                         "message".into(),
-                        Value::Blob(channel.clone()),
-                        Value::Blob(message.clone()),
+                        Value::new(&channel),
+                        Value::new(&message),
                     ]))
                     .is_ok()
                 {
@@ -120,8 +120,8 @@ impl Pubsub {
                 let _ = sub.try_send(Value::Array(vec![
                     "pmessage".into(),
                     pattern.as_str().into(),
-                    Value::Blob(channel.clone()),
-                    Value::Blob(message.clone()),
+                    Value::new(channel),
+                    Value::new(message),
                 ]));
                 i += 1;
             }
@@ -175,7 +175,7 @@ impl Pubsub {
                 let _ = conn.pubsub_client().sender().try_send(
                     vec![
                         "subscribe".into(),
-                        Value::Blob(channel.clone()),
+                        Value::new(&channel),
                         conn.pubsub_client().new_subscription(channel).into(),
                     ]
                     .into(),
@@ -196,7 +196,7 @@ impl Pubsub {
                     if let Some(sender) = subs.remove(&conn_id) {
                         let _ = sender.try_send(Value::Array(vec![
                             "unsubscribe".into(),
-                            Value::Blob(channel.clone()),
+                            Value::new(&channel),
                             1.into(),
                         ]));
                         removed += 1;

+ 2 - 0
src/db/entry.rs

@@ -99,6 +99,8 @@ impl Entry {
         )
     }
 
+    /// Clone a value. If the value is not clonable an error is Value::Error is
+    /// returned instead
     pub fn clone_value(&self) -> Value {
         if self.is_clonable() {
             self.value.clone()

+ 9 - 9
src/db/expiration.rs

@@ -130,10 +130,10 @@ mod test {
     fn get_expiration() {
         let mut db = ExpirationDb::new();
         let keys = vec![
-            (bytes!(b"hix"), Instant::now() + Duration::from_secs(15)),
-            (bytes!(b"key"), Instant::now() + Duration::from_secs(2)),
-            (bytes!(b"bar"), Instant::now() + Duration::from_secs(3)),
-            (bytes!(b"hi"), Instant::now() + Duration::from_secs(3)),
+            ("hix".into(), Instant::now() + Duration::from_secs(15)),
+            ("key".into(), Instant::now() + Duration::from_secs(2)),
+            ("bar".into(), Instant::now() + Duration::from_secs(3)),
+            ("hi".into(), Instant::now() + Duration::from_secs(3)),
         ];
 
         keys.iter()
@@ -164,15 +164,15 @@ mod test {
     pub fn remove() {
         let mut db = ExpirationDb::new();
         let keys = vec![
-            (bytes!(b"hix"), Instant::now() + Duration::from_secs(15)),
-            (bytes!(b"key"), Instant::now() + Duration::from_secs(2)),
-            (bytes!(b"bar"), Instant::now() + Duration::from_secs(3)),
-            (bytes!(b"hi"), Instant::now() + Duration::from_secs(3)),
+            ("hix".into(), Instant::now() + Duration::from_secs(15)),
+            ("key".into(), Instant::now() + Duration::from_secs(2)),
+            ("bar".into(), Instant::now() + Duration::from_secs(3)),
+            ("hi".into(), Instant::now() + Duration::from_secs(3)),
         ];
 
         keys.iter()
             .map(|v| {
-                db.add(&v.0, v.1);
+                db.add(&(v.0), v.1);
             })
             .for_each(drop);
 

+ 54 - 16
src/db/mod.rs

@@ -1,4 +1,4 @@
-//! # In-Memory database
+//! # in-memory database
 //!
 //! This database module is the core of the miniredis project. All other modules around this
 //! database module.
@@ -6,7 +6,7 @@ mod entry;
 mod expiration;
 
 use crate::{error::Error, value::Value};
-use bytes::Bytes;
+use bytes::{BufMut, Bytes, BytesMut};
 use entry::{new_version, Entry};
 use expiration::ExpirationDb;
 use log::trace;
@@ -225,14 +225,14 @@ impl Db {
 
                 number += incr_by;
 
-                x.change_value(number.to_string().as_str().into());
+                x.change_value(Value::Blob(number.to_string().as_str().into()));
 
                 Ok(number.into())
             }
             None => {
                 entries.insert(
                     key.clone(),
-                    Entry::new(incr_by.to_string().as_str().into(), None),
+                    Entry::new(Value::Blob(incr_by.to_string().as_str().into()), None),
                 );
                 Ok((incr_by as T).into())
             }
@@ -271,6 +271,44 @@ impl Db {
             })
     }
 
+    /// Overwrites part of the string stored at key, starting at the specified
+    /// offset, for the entire length of value. If the offset is larger than the
+    /// current length of the string at key, the string is padded with zero-bytes to
+    /// make offset fit. Non-existing keys are considered as empty strings, so this
+    /// command will make sure it holds a string large enough to be able to set
+    /// value at offset.
+    pub fn set_range(&self, key: &Bytes, offset: u64, data: &[u8]) -> Result<Value, Error> {
+        let mut entries = self.entries[self.get_slot(key)].write();
+        let value = entries.get_mut(key).map(|value| {
+            if !value.is_valid() {
+                self.expirations.lock().remove(key);
+                value.persist();
+            }
+            value.get_mut()
+        });
+
+        let length = offset as usize + data.len();
+        match value {
+            Some(Value::Blob(bytes)) => {
+                if bytes.capacity() < length {
+                    bytes.resize(length, 0);
+                }
+                let writer = &mut bytes[offset as usize..length];
+                writer.copy_from_slice(data);
+                Ok(bytes.len().into())
+            }
+            None => {
+                let mut bytes = BytesMut::new();
+                bytes.resize(length, 0);
+                let writer = &mut bytes[offset as usize..];
+                writer.copy_from_slice(data);
+                entries.insert(key.clone(), Entry::new(Value::new(&bytes), None));
+                Ok(bytes.len().into())
+            }
+            _ => Err(Error::WrongType),
+        }
+    }
+
     /// Removes keys from the database
     pub fn del(&self, keys: &[Bytes]) -> Value {
         let mut expirations = self.expirations.lock();
@@ -396,11 +434,11 @@ impl Db {
     }
 
     /// Get a key or set a new value for the given key.
-    pub fn getset(&self, key: &Bytes, value: &Value) -> Value {
+    pub fn getset(&self, key: &Bytes, value: Value) -> Value {
         let mut entries = self.entries[self.get_slot(key)].write();
         self.expirations.lock().remove(key);
         entries
-            .insert(key.clone(), Entry::new(value.clone(), None))
+            .insert(key.clone(), Entry::new(value, None))
             .filter(|x| x.is_valid())
             .map_or(Value::Null, |x| x.clone_value())
     }
@@ -420,20 +458,15 @@ impl Db {
         let mut entry = entries.get_mut(key).filter(|x| x.is_valid());
 
         if let Some(entry) = entries.get_mut(key).filter(|x| x.is_valid()) {
-            match entry.get() {
+            match entry.get_mut() {
                 Value::Blob(value) => {
-                    let binary: Bytes = [value.as_ref(), value_to_append.as_ref()].concat().into();
-                    let len = binary.len();
-                    entry.change_value(Value::Blob(binary));
-                    Ok(len.into())
+                    value.put(value_to_append.as_ref());
+                    Ok(value.len().into())
                 }
                 _ => Err(Error::WrongType),
             }
         } else {
-            entries.insert(
-                key.clone(),
-                Entry::new(Value::Blob(value_to_append.clone()), None),
-            );
+            entries.insert(key.clone(), Entry::new(Value::new(value_to_append), None));
             Ok(value_to_append.len().into())
         }
     }
@@ -467,7 +500,7 @@ impl Db {
             let mut entries = self.entries[self.get_slot(&key_values[i])].write();
             entries.insert(
                 key_values[i].clone(),
-                Entry::new(Value::Blob(key_values[i + 1].clone()), None),
+                Entry::new(Value::new(&key_values[i + 1]), None),
             );
         }
 
@@ -531,7 +564,12 @@ impl Db {
 
         if let Some(expires_at) = expires_at {
             self.expirations.lock().add(key, expires_at);
+        } else {
+            /// Make sure to remove the new key (or replaced) from the
+            /// expiration table (from any possible past value).
+            self.expirations.lock().remove(key);
         }
+
         entries.insert(key.clone(), Entry::new(value, expires_at));
 
         if let Some(to_return) = to_return {

+ 9 - 0
src/dispatcher/mod.rs

@@ -760,6 +760,15 @@ dispatcher! {
             1,
             true,
         },
+        setrange {
+            cmd::string::setrange,
+            [Flag::Write],
+            4,
+            1,
+            1,
+            1,
+            true,
+        }
     },
     connection {
         client {

+ 1 - 1
src/macros.rs

@@ -230,6 +230,6 @@ macro_rules! try_get_arg {
 #[macro_export]
 macro_rules! bytes {
     ($content:tt) => {
-        Bytes::from(&$content[..])
+        (&$content[..]).into()
     };
 }

+ 1 - 1
src/value/checksum.rs

@@ -52,7 +52,7 @@ impl Value {
 
     /// Clone the underlying value
     pub fn clone_value(&self) -> value::Value {
-        value::Value::Blob(self.bytes.clone())
+        value::Value::new(&self.bytes)
     }
 
     /// Whether it has a checksum or not

+ 12 - 5
src/value/mod.rs

@@ -27,7 +27,7 @@ pub enum Value {
     /// Vector/Array of values
     Array(Vec<Value>),
     /// Bytes/Strings/Binary data
-    Blob(Bytes),
+    Blob(BytesMut),
     /// String. This type does not allowe new lines
     String(String),
     /// An error
@@ -48,6 +48,13 @@ pub enum Value {
     Ok,
 }
 
+impl Value {
+    /// Creates a new Redis value from a stream of bytes
+    pub fn new(value: &[u8]) -> Self {
+        Self::Blob(value.into())
+    }
+}
+
 impl From<&Value> for Vec<u8> {
     fn from(value: &Value) -> Vec<u8> {
         match value {
@@ -86,7 +93,7 @@ impl TryFrom<&Value> for i64 {
         match val {
             Value::BigInteger(x) => (*x).try_into().map_err(|_| Error::NotANumber),
             Value::Integer(x) => Ok(*x),
-            Value::Blob(x) => bytes_to_number::<i64>(x),
+            Value::Blob(x) => bytes_to_number::<i64>(&x),
             Value::String(x) => x.parse::<i64>().map_err(|_| Error::NotANumber),
             _ => Err(Error::NotANumber),
         }
@@ -109,7 +116,7 @@ impl TryFrom<&Value> for f64 {
 /// Tries to converts bytes data into a number
 ///
 /// If the convertion fails a Error::NotANumber error is returned.
-pub fn bytes_to_number<T: FromStr>(bytes: &Bytes) -> Result<T, Error> {
+pub fn bytes_to_number<T: FromStr>(bytes: &[u8]) -> Result<T, Error> {
     let x = String::from_utf8_lossy(bytes);
     x.parse::<T>().map_err(|_| Error::NotANumber)
 }
@@ -124,7 +131,7 @@ impl<'a> From<&ParsedValue<'a>> for Value {
     fn from(value: &ParsedValue) -> Self {
         match value {
             ParsedValue::String(x) => Self::String((*x).to_string()),
-            ParsedValue::Blob(x) => Self::Blob(Bytes::copy_from_slice(*x)),
+            ParsedValue::Blob(x) => Self::new(*x),
             ParsedValue::Array(x) => Self::Array(x.iter().map(|x| x.into()).collect()),
             ParsedValue::Boolean(x) => Self::Boolean(*x),
             ParsedValue::BigInteger(x) => Self::BigInteger(*x),
@@ -150,7 +157,7 @@ impl From<usize> for Value {
 
 impl From<&str> for Value {
     fn from(value: &str) -> Value {
-        Value::Blob(Bytes::copy_from_slice(value.as_bytes()))
+        Value::Blob(value.as_bytes().into())
     }
 }