瀏覽代碼

Merge pull request #6 from crodas/feature/list

Add List related commands
César D. Rodas 3 年之前
父節點
當前提交
55036d3e07
共有 15 個文件被更改,包括 1960 次插入201 次删除
  1. 2 0
      Cargo.toml
  2. 3 3
      src/cmd/client.rs
  3. 61 61
      src/cmd/hash.rs
  4. 56 23
      src/cmd/key.rs
  5. 1480 0
      src/cmd/list.rs
  6. 3 2
      src/cmd/mod.rs
  7. 64 64
      src/cmd/string.rs
  8. 7 7
      src/db/mod.rs
  9. 94 2
      src/dispatcher.rs
  10. 4 0
      src/error.rs
  11. 7 3
      src/macros.rs
  12. 1 0
      src/server.rs
  13. 106 0
      src/value/checksum.rs
  14. 55 0
      src/value/locked.rs
  15. 17 36
      src/value/mod.rs

+ 2 - 0
Cargo.toml

@@ -10,6 +10,8 @@ edition = "2018"
 redis-zero-protocol-parser = {path = "redis-protocol-parser"}
 tokio={version="1", features = ["full", "tracing"] }
 tokio-util={version="^0.6", features = ["full"] }
+async-trait = "0.1.50"
+crc32fast="^1.2"
 futures = { version = "0.3.0", features = ["thread-pool"]}
 tokio-stream="0.1"
 seahash = "4"

+ 3 - 3
src/cmd/client.rs

@@ -2,7 +2,7 @@ use crate::{connection::Connection, error::Error, option, value::Value};
 use bytes::Bytes;
 use std::sync::Arc;
 
-pub fn client(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn client(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let sub = unsafe { std::str::from_utf8_unchecked(&args[1]) }.to_string();
 
     let expected = match sub.to_lowercase().as_str() {
@@ -39,11 +39,11 @@ pub fn client(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     }
 }
 
-pub fn echo(_conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn echo(_conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(Value::Blob(args[1].to_owned()))
 }
 
-pub fn ping(_conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn ping(_conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     match args.len() {
         1 => Ok(Value::String("PONG".to_owned())),
         2 => Ok(Value::Blob(args[1].to_owned())),

+ 61 - 61
src/cmd/hash.rs

@@ -10,7 +10,7 @@ use std::{
     str::FromStr,
 };
 
-pub fn hdel(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hdel(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -28,26 +28,26 @@ pub fn hdel(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             }
             _ => Err(Error::WrongType),
         },
-        || Ok(0_i64.into()),
+        || Ok(0.into()),
     )
 }
 
-pub fn hexists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hexists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
             Value::Hash(h) => Ok(if h.read().get(&args[2]).is_some() {
-                1_i64.into()
+                1.into()
             } else {
-                0_i64.into()
+                0.into()
             }),
             _ => Err(Error::WrongType),
         },
-        || Ok(0_i64.into()),
+        || Ok(0.into()),
     )
 }
 
-pub fn hget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -62,7 +62,7 @@ pub fn hget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hgetall(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hgetall(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -82,7 +82,7 @@ pub fn hgetall(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hincrby<
+pub async fn hincrby<
     T: ToString + FromStr + AddAssign + for<'a> TryFrom<&'a Value, Error = Error> + Into<Value> + Copy,
 >(
     conn: &Connection,
@@ -115,7 +115,7 @@ pub fn hincrby<
     )
 }
 
-pub fn hkeys(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hkeys(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -134,18 +134,18 @@ pub fn hkeys(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
             Value::Hash(h) => Ok((h.read().len() as i64).into()),
             _ => Err(Error::WrongType),
         },
-        || Ok(0_i64.into()),
+        || Ok(0.into()),
     )
 }
 
-pub fn hmget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hmget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -170,7 +170,7 @@ pub fn hmget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hrandfield(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hrandfield(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let (count, with_values) = match args.len() {
         2 => (None, false),
         3 => (Some(bytes_to_number::<i64>(&args[2])?), false),
@@ -237,7 +237,7 @@ pub fn hrandfield(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     if args.len() % 2 == 1 {
         return Err(Error::InvalidArgsCount("hset".to_owned()));
     }
@@ -269,7 +269,7 @@ pub fn hset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hsetnx(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hsetnx(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -277,10 +277,10 @@ pub fn hsetnx(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
                 let mut h = h.write();
 
                 if h.get(&args[2]).is_some() {
-                    Ok(0_i64.into())
+                    Ok(0.into())
                 } else {
                     h.insert(args[2].clone(), args[3].clone());
-                    Ok(1_i64.into())
+                    Ok(1.into())
                 }
             }
             _ => Err(Error::WrongType),
@@ -298,22 +298,22 @@ pub fn hsetnx(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hstrlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hstrlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
             Value::Hash(h) => Ok(if let Some(v) = h.read().get(&args[2]) {
                 (v.len() as i64).into()
             } else {
-                0_i64.into()
+                0.into()
             }),
             _ => Err(Error::WrongType),
         },
-        || Ok(0_i64.into()),
+        || Ok(0.into()),
     )
 }
 
-pub fn hvals(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hvals(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -339,25 +339,25 @@ mod test {
         value::Value,
     };
 
-    #[test]
-    fn hget() {
+    #[tokio::test]
+    async fn hget() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
 
         assert_eq!(Ok(Value::Integer(3)), r);
 
-        let r = run_command(&c, &["hget", "foo", "f1"]);
+        let r = run_command(&c, &["hget", "foo", "f1"]).await;
         assert_eq!(Ok(Value::Blob("1".into())), r);
     }
 
-    #[test]
-    fn hgetall() {
+    #[tokio::test]
+    async fn hgetall() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
 
         assert_eq!(Ok(Value::Integer(3)), r);
 
-        let r = run_command(&c, &["hgetall", "foo"]);
+        let r = run_command(&c, &["hgetall", "foo"]).await;
         match r {
             Ok(Value::Array(x)) => {
                 assert_eq!(6, x.len());
@@ -371,14 +371,14 @@ mod test {
         };
     }
 
-    #[test]
-    fn hrandfield() {
+    #[tokio::test]
+    async fn hrandfield() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
 
         assert_eq!(Ok(Value::Integer(3)), r);
 
-        let r = run_command(&c, &["hrandfield", "foo"]);
+        let r = run_command(&c, &["hrandfield", "foo"]).await;
         match r {
             Ok(Value::Blob(x)) => {
                 let x = unsafe { std::str::from_utf8_unchecked(&x) };
@@ -388,14 +388,14 @@ mod test {
         };
     }
 
-    #[test]
-    fn hmget() {
+    #[tokio::test]
+    async fn hmget() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
 
         assert_eq!(Ok(Value::Integer(3)), r);
 
-        let r = run_command(&c, &["hmget", "foo", "f1", "f2"]);
+        let r = run_command(&c, &["hmget", "foo", "f1", "f2"]).await;
         assert_eq!(
             Ok(Value::Array(vec![
                 Value::Blob("1".into()),
@@ -405,70 +405,70 @@ mod test {
         );
     }
 
-    #[test]
-    fn hexists() {
+    #[tokio::test]
+    async fn hexists() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
 
         assert_eq!(Ok(Value::Integer(3)), r);
 
         assert_eq!(
             Ok(Value::Integer(1)),
-            run_command(&c, &["hexists", "foo", "f1"])
+            run_command(&c, &["hexists", "foo", "f1"]).await
         );
         assert_eq!(
             Ok(Value::Integer(1)),
-            run_command(&c, &["hexists", "foo", "f3"])
+            run_command(&c, &["hexists", "foo", "f3"]).await
         );
         assert_eq!(
             Ok(Value::Integer(0)),
-            run_command(&c, &["hexists", "foo", "f4"])
+            run_command(&c, &["hexists", "foo", "f4"]).await
         );
     }
-    #[test]
-    fn hstrlen() {
+    #[tokio::test]
+    async fn hstrlen() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
 
         assert_eq!(Ok(Value::Integer(3)), r);
 
-        let r = run_command(&c, &["hstrlen", "foo", "f1"]);
+        let r = run_command(&c, &["hstrlen", "foo", "f1"]).await;
         assert_eq!(Ok(Value::Integer(1)), r);
     }
 
-    #[test]
-    fn hlen() {
+    #[tokio::test]
+    async fn hlen() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
 
         assert_eq!(Ok(Value::Integer(3)), r);
 
-        let r = run_command(&c, &["hset", "foo", "f1", "2", "f4", "2", "f5", "3"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "2", "f4", "2", "f5", "3"]).await;
         assert_eq!(Ok(Value::Integer(2)), r);
 
-        let r = run_command(&c, &["hlen", "foo"]);
+        let r = run_command(&c, &["hlen", "foo"]).await;
         assert_eq!(Ok(Value::Integer(5)), r);
     }
 
-    #[test]
-    fn hkeys() {
+    #[tokio::test]
+    async fn hkeys() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1"]).await;
 
         assert_eq!(Ok(Value::Integer(1)), r);
 
-        let r = run_command(&c, &["hkeys", "foo"]);
+        let r = run_command(&c, &["hkeys", "foo"]).await;
         assert_eq!(Ok(Value::Array(vec![Value::Blob("f1".into()),])), r);
     }
 
-    #[test]
-    fn hvals() {
+    #[tokio::test]
+    async fn hvals() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1"]).await;
 
         assert_eq!(Ok(Value::Integer(1)), r);
 
-        let r = run_command(&c, &["hvals", "foo"]);
+        let r = run_command(&c, &["hvals", "foo"]).await;
         assert_eq!(Ok(Value::Array(vec![Value::Blob("1".into()),])), r);
     }
 }

+ 56 - 23
src/cmd/key.rs

@@ -12,15 +12,15 @@ pub fn now() -> Duration {
         .expect("Time went backwards")
 }
 
-pub fn del(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn del(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().del(&args[1..]))
 }
 
-pub fn exists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn exists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().exists(&args[1..]))
 }
 
-pub fn expire(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn expire(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let expires_in: i64 = bytes_to_number(&args[2])?;
 
     if expires_in <= 0 {
@@ -37,7 +37,7 @@ pub fn expire(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().set_ttl(&args[1], expires_at))
 }
 
-pub fn expire_at(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn expire_at(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let secs = check_arg!(args, 0, "EXPIREAT");
     let expires_at: i64 = bytes_to_number(&args[2])?;
     let expires_in: i64 = if secs {
@@ -60,7 +60,7 @@ pub fn expire_at(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().set_ttl(&args[1], expires_at))
 }
 
-pub fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let ttl = match conn.db().ttl(&args[1]) {
         Some(Some(ttl)) => {
             let ttl = ttl - Instant::now();
@@ -77,7 +77,7 @@ pub fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(ttl.into())
 }
 
-pub fn expire_time(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn expire_time(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let ttl = match conn.db().ttl(&args[1]) {
         Some(Some(ttl)) => {
             // Is there a better way? There should be!
@@ -96,7 +96,7 @@ pub fn expire_time(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(ttl.into())
 }
 
-pub fn persist(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn persist(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().persist(&args[1]))
 }
 
@@ -107,28 +107,61 @@ mod test {
         value::Value,
     };
 
-    #[test]
-    fn del() {
+    #[tokio::test]
+    async fn del() {
         let c = create_connection();
-        assert_eq!(Ok(Value::Integer(1)), run_command(&c, &["incr", "foo"]));
-        assert_eq!(Ok(Value::Integer(1)), run_command(&c, &["exists", "foo"]));
-        assert_eq!(Ok(Value::Integer(1)), run_command(&c, &["del", "foo"]));
-        assert_eq!(Ok(Value::Integer(0)), run_command(&c, &["del", "foo"]));
-        assert_eq!(Ok(Value::Integer(0)), run_command(&c, &["exists", "foo"]));
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["incr", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["exists", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["del", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(0)),
+            run_command(&c, &["del", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(0)),
+            run_command(&c, &["exists", "foo"]).await
+        );
     }
 
-    #[test]
-    fn expire_and_persist() {
+    #[tokio::test]
+    async fn expire_and_persist() {
         let c = create_connection();
-        assert_eq!(Ok(Value::Integer(1)), run_command(&c, &["incr", "foo"]));
         assert_eq!(
             Ok(Value::Integer(1)),
-            run_command(&c, &["pexpire", "foo", "6000"])
+            run_command(&c, &["incr", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["pexpire", "foo", "6000"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(5999)),
+            run_command(&c, &["pttl", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["persist", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(-1)),
+            run_command(&c, &["pttl", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["del", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(-2)),
+            run_command(&c, &["pttl", "foo"]).await
         );
-        assert_eq!(Ok(Value::Integer(5999)), run_command(&c, &["pttl", "foo"]));
-        assert_eq!(Ok(Value::Integer(1)), run_command(&c, &["persist", "foo"]));
-        assert_eq!(Ok(Value::Integer(-1)), run_command(&c, &["pttl", "foo"]));
-        assert_eq!(Ok(Value::Integer(1)), run_command(&c, &["del", "foo"]));
-        assert_eq!(Ok(Value::Integer(-2)), run_command(&c, &["pttl", "foo"]));
     }
 }

+ 1480 - 0
src/cmd/list.rs

@@ -0,0 +1,1480 @@
+use crate::{
+    check_arg, connection::Connection, error::Error, value::bytes_to_number, value::checksum,
+    value::Value,
+};
+use bytes::Bytes;
+use std::collections::VecDeque;
+use tokio::time::{sleep, Duration, Instant};
+
+#[allow(clippy::needless_range_loop)]
+fn remove_element(
+    conn: &Connection,
+    key: &Bytes,
+    count: usize,
+    front: bool,
+) -> Result<Value, Error> {
+    conn.db().get_map_or(
+        key,
+        |v| match v {
+            Value::List(x) => {
+                let mut x = x.write();
+
+                if count == 0 {
+                    // Return a single element
+                    return Ok((if front { x.pop_front() } else { x.pop_back() })
+                        .map_or(Value::Null, |x| x.clone_value()));
+                }
+
+                let mut ret = vec![None; count];
+
+                for i in 0..count {
+                    if front {
+                        ret[i] = x.pop_front();
+                    } else {
+                        ret[i] = x.pop_back();
+                    }
+                }
+
+                let ret: Vec<Value> = ret
+                    .iter()
+                    .filter(|v| v.is_some())
+                    .map(|x| x.as_ref().unwrap().clone_value())
+                    .collect();
+
+                Ok(if ret.is_empty() {
+                    Value::Null
+                } else {
+                    ret.into()
+                })
+            }
+            _ => Err(Error::WrongType),
+        },
+        || Ok(Value::Null),
+    )
+}
+
+pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let timeout = Instant::now() + Duration::from_secs(bytes_to_number(&args[args.len() - 1])?);
+    let len = args.len() - 1;
+
+    loop {
+        for key in args[1..len].iter() {
+            match remove_element(conn, key, 0, true)? {
+                Value::Null => (),
+                n => return Ok(vec![Value::Blob(key.clone()), n].into()),
+            };
+        }
+
+        if Instant::now() >= timeout {
+            break;
+        }
+
+        sleep(Duration::from_millis(100)).await;
+    }
+
+    Ok(Value::Null)
+}
+
+pub async fn brpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let timeout = Instant::now() + Duration::from_secs(bytes_to_number(&args[args.len() - 1])?);
+    let len = args.len() - 1;
+
+    loop {
+        for key in args[1..len].iter() {
+            match remove_element(conn, key, 0, false)? {
+                Value::Null => (),
+                n => return Ok(vec![Value::Blob(key.clone()), n].into()),
+            };
+        }
+
+        if Instant::now() >= timeout {
+            break;
+        }
+
+        sleep(Duration::from_millis(100)).await;
+    }
+
+    Ok(Value::Null)
+}
+
+pub async fn lindex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    conn.db().get_map_or(
+        &args[1],
+        |v| match v {
+            Value::List(x) => {
+                let mut index: i64 = bytes_to_number(&args[2])?;
+                let x = x.read();
+
+                if index < 0 {
+                    index += x.len() as i64;
+                }
+
+                Ok(x.get(index as usize)
+                    .map_or(Value::Null, |x| x.clone_value()))
+            }
+            _ => Err(Error::WrongType),
+        },
+        || Ok(0.into()),
+    )
+}
+
+pub async fn linsert(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let is_before = if check_arg!(args, 2, "BEFORE") {
+        true
+    } else if check_arg!(args, 2, "AFTER") {
+        false
+    } else {
+        return Err(Error::Syntax);
+    };
+
+    conn.db().get_map_or(
+        &args[1],
+        |v| match v {
+            Value::List(x) => {
+                let pivot = checksum::Value::new(args[3].clone());
+                let mut x = x.write();
+                let mut found = false;
+
+                for (key, val) in x.iter().enumerate() {
+                    if *val == pivot {
+                        let id = if is_before { key } else { key + 1 };
+
+                        let value = checksum::Value::new(args[4].clone());
+
+                        if id > x.len() {
+                            x.push_back(value);
+                        } else {
+                            x.insert(id as usize, value);
+                        }
+
+                        found = true;
+                        break;
+                    }
+                }
+
+                if found {
+                    Ok((x.len() as i64).into())
+                } else {
+                    Ok((-1).into())
+                }
+            }
+            _ => Err(Error::WrongType),
+        },
+        || Ok(0.into()),
+    )
+}
+
+pub async fn llen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    conn.db().get_map_or(
+        &args[1],
+        |v| match v {
+            Value::List(x) => Ok((x.read().len() as i64).into()),
+            _ => Err(Error::WrongType),
+        },
+        || Ok(0.into()),
+    )
+}
+
+pub async fn lmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let source_is_left = if check_arg!(args, 3, "LEFT") {
+        true
+    } else if check_arg!(args, 3, "RIGHT") {
+        false
+    } else {
+        return Err(Error::Syntax);
+    };
+
+    let target_is_left = if check_arg!(args, 4, "LEFT") {
+        true
+    } else if check_arg!(args, 4, "RIGHT") {
+        false
+    } else {
+        return Err(Error::Syntax);
+    };
+
+    conn.db().get_map_or(
+        &args[1],
+        |v| match v {
+            Value::List(source) => conn.db().get_map_or(
+                &args[2],
+                |v| match v {
+                    Value::List(target) => {
+                        let element = if source_is_left {
+                            source.write().pop_front()
+                        } else {
+                            source.write().pop_back()
+                        };
+
+                        if let Some(element) = element {
+                            let ret = element.clone_value();
+                            if target_is_left {
+                                target.write().push_front(element);
+                            } else {
+                                target.write().push_back(element);
+                            }
+                            Ok(ret)
+                        } else {
+                            Ok(Value::Null)
+                        }
+                    }
+                    _ => Err(Error::WrongType),
+                },
+                || {
+                    let element = if source_is_left {
+                        source.write().pop_front()
+                    } else {
+                        source.write().pop_back()
+                    };
+
+                    if let Some(element) = element {
+                        let ret = element.clone_value();
+                        let mut h = VecDeque::new();
+                        h.push_front(element);
+                        conn.db().set(&args[2], h.into(), None);
+                        Ok(ret)
+                    } else {
+                        Ok(Value::Null)
+                    }
+                },
+            ),
+            _ => Err(Error::WrongType),
+        },
+        || Ok(Value::Null),
+    )
+}
+
+pub async fn lpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let count = if args.len() > 2 {
+        bytes_to_number(&args[2])?
+    } else {
+        0
+    };
+
+    remove_element(conn, &args[1], count, true)
+}
+
+pub async fn lpos(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let mut index = 3;
+    let element = checksum::Value::new(args[2].clone());
+    let rank = if check_arg!(args, index, "RANK") {
+        index += 2;
+        Some(bytes_to_number::<usize>(&args[index - 1])?)
+    } else {
+        None
+    };
+    let count = if check_arg!(args, index, "COUNT") {
+        index += 2;
+        Some(bytes_to_number::<usize>(&args[index - 1])?)
+    } else {
+        None
+    };
+    let max_len = if check_arg!(args, index, "MAXLEN") {
+        index += 2;
+        bytes_to_number::<i64>(&args[index - 1])?
+    } else {
+        -1
+    };
+
+    if index != args.len() {
+        return Err(Error::Syntax);
+    }
+
+    conn.db().get_map_or(
+        &args[1],
+        |v| match v {
+            Value::List(x) => {
+                let x = x.read();
+                let mut ret: Vec<Value> = vec![];
+
+                for (i, val) in x.iter().enumerate() {
+                    if *val == element {
+                        // Match!
+                        if let Some(count) = count {
+                            ret.push((i as i64).into());
+                            if ret.len() > count {
+                                return Ok(ret.into());
+                            }
+                        } else if let Some(rank) = rank {
+                            ret.push((i as i64).into());
+                            if ret.len() == rank {
+                                return Ok(ret[rank - 1].clone());
+                            }
+                        } else {
+                            // return first match!
+                            return Ok((i as i64).into());
+                        }
+                    }
+                    if (i as i64) == max_len {
+                        break;
+                    }
+                }
+
+                if count.is_some() {
+                    Ok(ret.into())
+                } else {
+                    Ok(Value::Null)
+                }
+            }
+            _ => Err(Error::WrongType),
+        },
+        || {
+            Ok(if count.is_some() {
+                Value::Array(vec![])
+            } else {
+                Value::Null
+            })
+        },
+    )
+}
+
+pub async fn lpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let is_push_x = check_arg!(args, 0, "LPUSHX");
+
+    conn.db().get_map_or(
+        &args[1],
+        |v| match v {
+            Value::List(x) => {
+                let mut x = x.write();
+                for val in args.iter().skip(2) {
+                    x.push_front(checksum::Value::new(val.clone()));
+                }
+                Ok((x.len() as i64).into())
+            }
+            _ => Err(Error::WrongType),
+        },
+        || {
+            if is_push_x {
+                return Ok(0.into());
+            }
+            let mut h = VecDeque::new();
+
+            for val in args.iter().skip(2) {
+                h.push_front(checksum::Value::new(val.clone()));
+            }
+
+            let len = h.len() as i64;
+            conn.db().set(&args[1], h.into(), None);
+            Ok(len.into())
+        },
+    )
+}
+
+pub async fn lrange(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    conn.db().get_map_or(
+        &args[1],
+        |v| match v {
+            Value::List(x) => {
+                let mut start: i64 = bytes_to_number(&args[2])?;
+                let mut end: i64 = bytes_to_number(&args[3])?;
+                let mut ret = vec![];
+                let x = x.read();
+
+                if start < 0 {
+                    start += x.len() as i64;
+                }
+
+                if end < 0 {
+                    end += x.len() as i64;
+                }
+
+                for (i, val) in x.iter().enumerate() {
+                    if i >= start as usize && i <= end as usize {
+                        ret.push(val.clone_value());
+                    }
+                }
+                Ok(ret.into())
+            }
+            _ => Err(Error::WrongType),
+        },
+        || Ok(Value::Array(vec![])),
+    )
+}
+
+pub async fn lrem(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    conn.db().get_map_or(
+        &args[1],
+        |v| match v {
+            Value::List(x) => {
+                let element = checksum::Value::new(args[3].clone());
+                let limit: i64 = bytes_to_number(&args[2])?;
+                let mut x = x.write();
+
+                let (is_reverse, limit) = if limit < 0 {
+                    (true, -limit)
+                } else {
+                    (false, limit)
+                };
+
+                let mut keep = vec![true; x.len()];
+                let mut removed = 0;
+                let len = x.len();
+
+                for i in 0..len {
+                    let i = if is_reverse { len - 1 - i } else { i };
+
+                    println!("{}", i);
+                    if let Some(value) = x.get(i) {
+                        println!("{} {:?} {:?}", i, *value, element);
+                        if *value == element {
+                            keep[i] = false;
+                            removed += 1;
+                            if removed == limit {
+                                break;
+                            }
+                        }
+                    }
+                }
+
+                let mut i = 0;
+                x.retain(|_| {
+                    i += 1;
+                    keep[i - 1]
+                });
+
+                Ok(removed.into())
+            }
+            _ => Err(Error::WrongType),
+        },
+        || Ok(0.into()),
+    )
+}
+
+pub async fn lset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    conn.db().get_map_or(
+        &args[1],
+        |v| match v {
+            Value::List(x) => {
+                let mut index: i64 = bytes_to_number(&args[2])?;
+                let mut x = x.write();
+
+                if index < 0 {
+                    index += x.len() as i64;
+                }
+
+                if let Some(x) = x.get_mut(index as usize) {
+                    *x = checksum::Value::new(args[3].clone());
+                    Ok(Value::OK)
+                } else {
+                    Err(Error::OutOfRange)
+                }
+            }
+            _ => Err(Error::WrongType),
+        },
+        || Err(Error::NotFound),
+    )
+}
+
+pub async fn ltrim(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    conn.db().get_map_or(
+        &args[1],
+        |v| match v {
+            Value::List(x) => {
+                let mut start: i64 = bytes_to_number(&args[2])?;
+                let mut end: i64 = bytes_to_number(&args[3])?;
+                let mut x = x.write();
+
+                if start < 0 {
+                    start += x.len() as i64;
+                }
+
+                if end < 0 {
+                    end += x.len() as i64;
+                }
+
+                let mut i = 0;
+                x.retain(|_| {
+                    let retain = i >= start && i <= end;
+                    i += 1;
+                    retain
+                });
+
+                Ok(Value::OK)
+            }
+            _ => Err(Error::WrongType),
+        },
+        || Ok(Value::OK),
+    )
+}
+
+pub async fn rpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let count = if args.len() > 2 {
+        bytes_to_number(&args[2])?
+    } else {
+        0
+    };
+
+    remove_element(conn, &args[1], count, false)
+}
+
+pub async fn rpoplpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    lmove(
+        conn,
+        &[
+            "lmove".into(),
+            args[1].clone(),
+            args[2].clone(),
+            "RIGHT".into(),
+            "LEFT".into(),
+        ],
+    )
+    .await
+}
+
+pub async fn rpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let is_push_x = check_arg!(args, 0, "RPUSHX");
+
+    conn.db().get_map_or(
+        &args[1],
+        |v| match v {
+            Value::List(x) => {
+                let mut x = x.write();
+                for val in args.iter().skip(2) {
+                    x.push_back(checksum::Value::new(val.clone()));
+                }
+                Ok((x.len() as i64).into())
+            }
+            _ => Err(Error::WrongType),
+        },
+        || {
+            if is_push_x {
+                return Ok(0.into());
+            }
+            let mut h = VecDeque::new();
+
+            for val in args.iter().skip(2) {
+                h.push_back(checksum::Value::new(val.clone()));
+            }
+
+            let len = h.len() as i64;
+            conn.db().set(&args[1], h.into(), None);
+            Ok(len.into())
+        },
+    )
+}
+
+#[cfg(test)]
+mod test {
+    use crate::{
+        cmd::test::{create_connection, run_command},
+        error::Error,
+        value::Value,
+    };
+    use tokio::time::{sleep, Duration, Instant};
+
+    #[tokio::test]
+    async fn blpop_no_waiting() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await,
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("foo".into()),
+                Value::Blob("5".into()),
+            ])),
+            run_command(&c, &["blpop", "foo", "1"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn blpop_timeout() {
+        let c = create_connection();
+        let x = Instant::now();
+
+        assert_eq!(
+            Ok(Value::Null),
+            run_command(&c, &["blpop", "foobar", "1"]).await
+        );
+
+        assert!(Instant::now() - x > Duration::from_millis(1000));
+    }
+
+    #[tokio::test]
+    async fn blpop_wait_insert() {
+        let c = create_connection();
+        let x = Instant::now();
+
+        // Query command that will block connection until some data is inserted
+        // to foobar, foo, bar or the 5 seconds timeout happens.
+        //
+        // We are issuing the command, sleeping a little bit then adding data to
+        // foobar, before actually waiting on the result.
+        let waiting = run_command(&c, &["blpop", "foobar", "foo", "bar", "5"]);
+
+        // Sleep 1 second before inserting new data
+        sleep(Duration::from_millis(1000)).await;
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await,
+        );
+
+        // Read the output of the first blpop command now.
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("foo".into()),
+                Value::Blob("5".into()),
+            ])),
+            waiting.await
+        );
+
+        assert!(Instant::now() - x > Duration::from_millis(1000));
+        assert!(Instant::now() - x < Duration::from_millis(5000));
+    }
+
+    #[tokio::test]
+    async fn lrem_1() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(
+                &c,
+                &["rpush", "mylist", "hello", "hello", "world", "hello", "hello"]
+            )
+            .await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(3)),
+            run_command(&c, &["lrem", "mylist", "3", "hello"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("world".into()),
+                Value::Blob("hello".into()),
+            ])),
+            run_command(&c, &["lrange", "mylist", "0", "-1"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn lrem_2() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(
+                &c,
+                &["rpush", "mylist", "hello", "hello", "world", "hello", "hello"]
+            )
+            .await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(2)),
+            run_command(&c, &["lrem", "mylist", "-2", "hello"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("hello".into()),
+                Value::Blob("hello".into()),
+                Value::Blob("world".into()),
+            ])),
+            run_command(&c, &["lrange", "mylist", "0", "-1"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["lrem", "mylist", "1", "hello"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("hello".into()),
+                Value::Blob("world".into()),
+            ])),
+            run_command(&c, &["lrange", "mylist", "0", "-1"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn lrem_3() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(
+                &c,
+                &["rpush", "mylist", "hello", "hello", "world", "hello", "hello"]
+            )
+            .await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(4)),
+            run_command(&c, &["lrem", "mylist", "-100", "hello"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![Value::Blob("world".into()),])),
+            run_command(&c, &["lrange", "mylist", "0", "-1"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn lrem_4() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(
+                &c,
+                &["rpush", "mylist", "hello", "hello", "world", "hello", "hello"]
+            )
+            .await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(4)),
+            run_command(&c, &["lrem", "mylist", "100", "hello"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![Value::Blob("world".into()),])),
+            run_command(&c, &["lrange", "mylist", "0", "-1"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn brpop_no_waiting() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await,
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("foo".into()),
+                Value::Blob("5".into()),
+            ])),
+            run_command(&c, &["brpop", "foo", "1"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn brpop_timeout() {
+        let c = create_connection();
+        let x = Instant::now();
+
+        assert_eq!(
+            Ok(Value::Null),
+            run_command(&c, &["brpop", "foobar", "1"]).await
+        );
+
+        assert!(Instant::now() - x > Duration::from_millis(1000));
+    }
+
+    #[tokio::test]
+    async fn brpop_wait_insert() {
+        let c = create_connection();
+        let x = Instant::now();
+
+        // Query command that will block connection until some data is inserted
+        // to foobar, foo, bar or the 5 seconds timeout happens.
+        //
+        // We are issuing the command, sleeping a little bit then adding data to
+        // foobar, before actually waiting on the result.
+        let waiting = run_command(&c, &["brpop", "foobar", "foo", "bar", "5"]);
+
+        // Sleep 1 second before inserting new data
+        sleep(Duration::from_millis(1000)).await;
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await,
+        );
+
+        // Read the output of the first blpop command now.
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("foo".into()),
+                Value::Blob("5".into()),
+            ])),
+            waiting.await
+        );
+
+        assert!(Instant::now() - x > Duration::from_millis(1000));
+        assert!(Instant::now() - x < Duration::from_millis(5000));
+    }
+
+    #[tokio::test]
+    async fn lindex() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("5".into()),
+                Value::Blob("4".into()),
+                Value::Blob("3".into()),
+                Value::Blob("2".into()),
+                Value::Blob("1".into()),
+            ])),
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Blob("5".into())),
+            run_command(&c, &["lindex", "foo", "0"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Blob("1".into())),
+            run_command(&c, &["lindex", "foo", "-1"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Null),
+            run_command(&c, &["lindex", "foo", "-100"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Null),
+            run_command(&c, &["lindex", "foo", "100"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn linsert_syntax_err() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(2)),
+            run_command(&c, &["rpush", "foo", "hello", "world"]).await
+        );
+
+        assert_eq!(
+            Err(Error::Syntax),
+            run_command(&c, &["linsert", "foo", "beforex", "world", "there"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn linsert_before() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(2)),
+            run_command(&c, &["rpush", "foo", "hello", "world"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(3)),
+            run_command(&c, &["linsert", "foo", "before", "world", "there"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("hello".into()),
+                Value::Blob("there".into()),
+                Value::Blob("world".into()),
+            ])),
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await,
+        );
+    }
+
+    #[tokio::test]
+    async fn linsert_after() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(2)),
+            run_command(&c, &["rpush", "foo", "hello", "world"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(3)),
+            run_command(&c, &["linsert", "foo", "after", "world", "there"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("hello".into()),
+                Value::Blob("world".into()),
+                Value::Blob("there".into()),
+            ])),
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await,
+        );
+    }
+
+    #[tokio::test]
+    async fn linsert_before_after() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(2)),
+            run_command(&c, &["rpush", "foo", "hello", "world"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(3)),
+            run_command(&c, &["linsert", "foo", "after", "world", "there1"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(4)),
+            run_command(&c, &["linsert", "foo", "before", "world", "there2"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("hello".into()),
+                Value::Blob("there2".into()),
+                Value::Blob("world".into()),
+                Value::Blob("there1".into()),
+            ])),
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await,
+        );
+    }
+
+    #[tokio::test]
+    async fn linsert_not_found() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(2)),
+            run_command(&c, &["rpush", "foo", "hello", "world"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(-1)),
+            run_command(&c, &["linsert", "foo", "after", "worldx", "there"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(-1)),
+            run_command(&c, &["linsert", "foo", "before", "worldx", "there"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn llen() {
+        let c = create_connection();
+
+        assert_eq!(
+            run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await,
+            run_command(&c, &["llen", "foo"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(0)),
+            run_command(&c, &["llen", "foobar"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn lmove_1() {
+        let c = create_connection();
+
+        assert_eq!(
+            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await,
+            run_command(&c, &["llen", "foo"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Blob("1".into())),
+            run_command(&c, &["lmove", "foo", "bar", "left", "left"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![Value::Blob("1".into()),])),
+            run_command(&c, &["lrange", "bar", "0", "-1"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Blob("5".into())),
+            run_command(&c, &["lmove", "foo", "bar", "right", "left"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("5".into()),
+                Value::Blob("1".into()),
+            ])),
+            run_command(&c, &["lrange", "bar", "0", "-1"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn lpop() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await,
+        );
+
+        assert_eq!(
+            Ok(Value::Blob("5".into())),
+            run_command(&c, &["lpop", "foo"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![Value::Blob("4".into())])),
+            run_command(&c, &["lpop", "foo", "1"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("3".into()),
+                Value::Blob("2".into()),
+                Value::Blob("1".into()),
+            ])),
+            run_command(&c, &["lpop", "foo", "55"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Null),
+            run_command(&c, &["lpop", "foo", "55"]).await
+        );
+
+        assert_eq!(Ok(Value::Null), run_command(&c, &["lpop", "foo"]).await);
+
+        assert_eq!(
+            Ok(Value::Integer(0)),
+            run_command(&c, &["llen", "foobar"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn lpos_single_match() {
+        let c = create_connection();
+        assert_eq!(
+            Ok(Value::Integer(11)),
+            run_command(
+                &c,
+                &["RPUSH", "mylist", "a", "b", "c", "d", "1", "2", "3", "4", "3", "3", "3"]
+            )
+            .await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(6)),
+            run_command(&c, &["lpos", "mylist", "3"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn lpos_single_skip() {
+        let c = create_connection();
+        assert_eq!(
+            Ok(Value::Integer(11)),
+            run_command(
+                &c,
+                &["RPUSH", "mylist", "a", "b", "c", "d", "1", "2", "3", "4", "3", "3", "3"]
+            )
+            .await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(8)),
+            run_command(&c, &["lpos", "mylist", "3", "rank", "2"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn lpos_single_skip_max_len() {
+        let c = create_connection();
+        assert_eq!(
+            Ok(Value::Integer(11)),
+            run_command(
+                &c,
+                &["RPUSH", "mylist", "a", "b", "c", "d", "1", "2", "3", "4", "3", "3", "3"]
+            )
+            .await
+        );
+
+        assert_eq!(
+            Ok(Value::Null),
+            run_command(&c, &["lpos", "mylist", "3", "rank", "2", "maxlen", "7"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn lpos_not_found() {
+        let c = create_connection();
+        assert_eq!(
+            Ok(Value::Array(vec![])),
+            run_command(&c, &["lpos", "mylist", "3", "count", "5", "maxlen", "9"]).await
+        );
+        assert_eq!(
+            Ok(Value::Null),
+            run_command(&c, &["lpos", "mylist", "3"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn lpos() {
+        let c = create_connection();
+        assert_eq!(
+            Ok(Value::Integer(11)),
+            run_command(
+                &c,
+                &["RPUSH", "mylist", "a", "b", "c", "d", "1", "2", "3", "4", "3", "3", "3"]
+            )
+            .await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Integer(6),
+                Value::Integer(8),
+                Value::Integer(9),
+            ])),
+            run_command(&c, &["lpos", "mylist", "3", "count", "5", "maxlen", "9"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn lpush() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("5".into()),
+                Value::Blob("4".into()),
+                Value::Blob("3".into()),
+                Value::Blob("2".into()),
+                Value::Blob("1".into()),
+            ])),
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(10)),
+            run_command(&c, &["lpush", "foo", "6", "7", "8", "9", "10"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("10".into()),
+                Value::Blob("9".into()),
+                Value::Blob("8".into()),
+                Value::Blob("7".into()),
+                Value::Blob("6".into()),
+                Value::Blob("5".into()),
+                Value::Blob("4".into()),
+                Value::Blob("3".into()),
+                Value::Blob("2".into()),
+                Value::Blob("1".into()),
+            ])),
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn lpush_simple() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["lpush", "foo", "world"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(2)),
+            run_command(&c, &["lpush", "foo", "hello"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("hello".into()),
+                Value::Blob("world".into()),
+            ])),
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn lset() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::OK),
+            run_command(&c, &["lset", "foo", "-1", "6"]).await,
+        );
+
+        assert_eq!(
+            Ok(Value::OK),
+            run_command(&c, &["lset", "foo", "-2", "7"]).await,
+        );
+
+        assert_eq!(
+            Ok(Value::OK),
+            run_command(&c, &["lset", "foo", "0", "8"]).await,
+        );
+
+        assert_eq!(
+            Err(Error::OutOfRange),
+            run_command(&c, &["lset", "foo", "55", "8"]).await,
+        );
+
+        assert_eq!(
+            Err(Error::OutOfRange),
+            run_command(&c, &["lset", "foo", "-55", "8"]).await,
+        );
+
+        assert_eq!(
+            Err(Error::NotFound),
+            run_command(&c, &["lset", "key_not_exists", "-55", "8"]).await,
+        );
+
+        assert_eq!(
+            Ok(Value::Blob("6".into())),
+            run_command(&c, &["rpop", "foo"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Blob("7".into())),
+            run_command(&c, &["rpop", "foo"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Blob("8".into())),
+            run_command(&c, &["lpop", "foo"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn ltrim() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::OK),
+            run_command(&c, &["ltrim", "foo", "1", "-2"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("2".into()),
+                Value::Blob("3".into()),
+                Value::Blob("4".into()),
+            ])),
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn rpop() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Blob("5".into())),
+            run_command(&c, &["rpop", "foo"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![Value::Blob("4".into())])),
+            run_command(&c, &["rpop", "foo", "1"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("3".into()),
+                Value::Blob("2".into()),
+                Value::Blob("1".into()),
+            ])),
+            run_command(&c, &["rpop", "foo", "55"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Null),
+            run_command(&c, &["rpop", "foo", "55"]).await
+        );
+
+        assert_eq!(Ok(Value::Null), run_command(&c, &["rpop", "foo"]).await);
+
+        assert_eq!(
+            Ok(Value::Integer(0)),
+            run_command(&c, &["llen", "foobar"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn rpush_simple() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["rpush", "foo", "world"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(2)),
+            run_command(&c, &["rpush", "foo", "hello"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("world".into()),
+                Value::Blob("hello".into()),
+            ])),
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn lrange() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("1".into()),
+                Value::Blob("2".into()),
+                Value::Blob("3".into()),
+                Value::Blob("4".into()),
+                Value::Blob("5".into()),
+            ])),
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("1".into()),
+                Value::Blob("2".into()),
+                Value::Blob("3".into()),
+                Value::Blob("4".into()),
+            ])),
+            run_command(&c, &["lrange", "foo", "0", "-2"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("4".into()),
+                Value::Blob("5".into()),
+            ])),
+            run_command(&c, &["lrange", "foo", "-2", "-1"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![Value::Blob("3".into()),])),
+            run_command(&c, &["lrange", "foo", "-3", "-3"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn rpush() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("1".into()),
+                Value::Blob("2".into()),
+                Value::Blob("3".into()),
+                Value::Blob("4".into()),
+                Value::Blob("5".into()),
+            ])),
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(10)),
+            run_command(&c, &["rpush", "foo", "6", "7", "8", "9", "10"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("1".into()),
+                Value::Blob("2".into()),
+                Value::Blob("3".into()),
+                Value::Blob("4".into()),
+                Value::Blob("5".into()),
+                Value::Blob("6".into()),
+                Value::Blob("7".into()),
+                Value::Blob("8".into()),
+                Value::Blob("9".into()),
+                Value::Blob("10".into()),
+            ])),
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn rpushx() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("1".into()),
+                Value::Blob("2".into()),
+                Value::Blob("3".into()),
+                Value::Blob("4".into()),
+                Value::Blob("5".into()),
+            ])),
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(10)),
+            run_command(&c, &["rpushx", "foo", "6", "7", "8", "9", "10"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("1".into()),
+                Value::Blob("2".into()),
+                Value::Blob("3".into()),
+                Value::Blob("4".into()),
+                Value::Blob("5".into()),
+                Value::Blob("6".into()),
+                Value::Blob("7".into()),
+                Value::Blob("8".into()),
+                Value::Blob("9".into()),
+                Value::Blob("10".into()),
+            ])),
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Integer(0)),
+            run_command(&c, &["rpushx", "foobar", "6", "7", "8", "9", "10"]).await
+        );
+    }
+}

+ 3 - 2
src/cmd/mod.rs

@@ -1,6 +1,7 @@
 pub mod client;
 pub mod hash;
 pub mod key;
+pub mod list;
 pub mod string;
 
 #[cfg(test)]
@@ -28,11 +29,11 @@ mod test {
         all_connections.new_connection(db.clone(), client)
     }
 
-    pub fn run_command(conn: &Connection, cmd: &[&str]) -> Result<Value, Error> {
+    pub async fn run_command(conn: &Connection, cmd: &[&str]) -> Result<Value, Error> {
         let args: Vec<Bytes> = cmd.iter().map(|s| Bytes::from(s.to_string())).collect();
 
         let handler = Dispatcher::new(&args)?;
 
-        handler.deref().execute(&conn, &args)
+        handler.deref().execute(&conn, &args).await
     }
 }

+ 64 - 64
src/cmd/string.rs

@@ -5,52 +5,52 @@ use bytes::Bytes;
 use std::{convert::TryInto, ops::Neg};
 use tokio::time::Duration;
 
-pub fn incr(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn incr(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().incr(&args[1], 1)
 }
 
-pub fn incr_by(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn incr_by(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let by: i64 = bytes_to_number(&args[2])?;
     conn.db().incr(&args[1], by)
 }
 
-pub fn incr_by_float(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn incr_by_float(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let by: f64 = bytes_to_number(&args[2])?;
     conn.db().incr(&args[1], by)
 }
 
-pub fn decr(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn decr(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().incr(&args[1], -1)
 }
 
-pub fn decr_by(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn decr_by(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let by: i64 = (&Value::Blob(args[2].to_owned())).try_into()?;
     conn.db().incr(&args[1], by.neg())
 }
 
-pub fn get(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn get(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().get(&args[1]))
 }
 
-pub fn getdel(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn getdel(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().getdel(&args[1]))
 }
 
-pub fn getset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn getset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().getset(&args[1], &Value::Blob(args[2].to_owned())))
 }
 
-pub fn mget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn mget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().get_multi(&args[1..]))
 }
 
-pub fn set(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn set(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn
         .db()
         .set(&args[1], Value::Blob(args[2].to_owned()), None))
 }
 
-pub fn setex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn setex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let ttl = if check_arg!(args, 0, "SETEX") {
         Duration::from_secs(bytes_to_number(&args[2])?)
     } else {
@@ -62,11 +62,11 @@ pub fn setex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
         .set(&args[1], Value::Blob(args[2].to_owned()), Some(ttl)))
 }
 
-pub fn strlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn strlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     match conn.db().get(&args[1]) {
         Value::Blob(x) => Ok((x.len() as i64).into()),
         Value::String(x) => Ok((x.len() as i64).into()),
-        Value::Null => Ok(0_i64.into()),
+        Value::Null => Ok(0.into()),
         _ => Ok(Error::WrongType.into()),
     }
 }
@@ -79,137 +79,137 @@ mod test {
         value::Value,
     };
 
-    #[test]
-    fn incr() {
+    #[tokio::test]
+    async fn incr() {
         let c = create_connection();
-        let r = run_command(&c, &["incr", "foo"]);
+        let r = run_command(&c, &["incr", "foo"]).await;
         assert_eq!(Ok(Value::Integer(1)), r);
-        let r = run_command(&c, &["incr", "foo"]);
+        let r = run_command(&c, &["incr", "foo"]).await;
         assert_eq!(Ok(Value::Integer(2)), r);
 
-        let x = run_command(&c, &["get", "foo"]);
+        let x = run_command(&c, &["get", "foo"]).await;
         assert_eq!(Ok(Value::Blob("2".into())), x);
     }
 
-    #[test]
-    fn incr_do_not_affect_ttl() {
+    #[tokio::test]
+    async fn incr_do_not_affect_ttl() {
         let c = create_connection();
-        let r = run_command(&c, &["incr", "foo"]);
+        let r = run_command(&c, &["incr", "foo"]).await;
         assert_eq!(Ok(Value::Integer(1)), r);
 
-        let r = run_command(&c, &["expire", "foo", "60"]);
+        let r = run_command(&c, &["expire", "foo", "60"]).await;
         assert_eq!(Ok(Value::Integer(1)), r);
 
-        let r = run_command(&c, &["ttl", "foo"]);
+        let r = run_command(&c, &["ttl", "foo"]).await;
         assert_eq!(Ok(Value::Integer(59)), r);
 
-        let r = run_command(&c, &["incr", "foo"]);
+        let r = run_command(&c, &["incr", "foo"]).await;
         assert_eq!(Ok(Value::Integer(2)), r);
 
-        let r = run_command(&c, &["ttl", "foo"]);
+        let r = run_command(&c, &["ttl", "foo"]).await;
         assert_eq!(Ok(Value::Integer(59)), r);
     }
 
-    #[test]
-    fn decr() {
+    #[tokio::test]
+    async fn decr() {
         let c = create_connection();
-        let r = run_command(&c, &["decr", "foo"]);
+        let r = run_command(&c, &["decr", "foo"]).await;
         assert_eq!(Ok(Value::Integer(-1)), r);
-        let r = run_command(&c, &["decr", "foo"]);
+        let r = run_command(&c, &["decr", "foo"]).await;
         assert_eq!(Ok(Value::Integer(-2)), r);
-        let x = run_command(&c, &["get", "foo"]);
+        let x = run_command(&c, &["get", "foo"]).await;
         assert_eq!(Ok(Value::Blob("-2".into())), x);
     }
 
-    #[test]
-    fn decr_do_not_affect_ttl() {
+    #[tokio::test]
+    async fn decr_do_not_affect_ttl() {
         let c = create_connection();
-        let r = run_command(&c, &["decr", "foo"]);
+        let r = run_command(&c, &["decr", "foo"]).await;
         assert_eq!(Ok(Value::Integer(-1)), r);
 
-        let r = run_command(&c, &["expire", "foo", "60"]);
+        let r = run_command(&c, &["expire", "foo", "60"]).await;
         assert_eq!(Ok(Value::Integer(1)), r);
 
-        let r = run_command(&c, &["ttl", "foo"]);
+        let r = run_command(&c, &["ttl", "foo"]).await;
         assert_eq!(Ok(Value::Integer(59)), r);
 
-        let r = run_command(&c, &["decr", "foo"]);
+        let r = run_command(&c, &["decr", "foo"]).await;
         assert_eq!(Ok(Value::Integer(-2)), r);
 
-        let r = run_command(&c, &["ttl", "foo"]);
+        let r = run_command(&c, &["ttl", "foo"]).await;
         assert_eq!(Ok(Value::Integer(59)), r);
     }
 
-    #[test]
-    fn get_and_set() {
+    #[tokio::test]
+    async fn get_and_set() {
         let c = create_connection();
-        let x = run_command(&c, &["set", "foo", "bar"]);
+        let x = run_command(&c, &["set", "foo", "bar"]).await;
         assert_eq!(Ok(Value::OK), x);
 
-        let x = run_command(&c, &["get", "foo"]);
+        let x = run_command(&c, &["get", "foo"]).await;
         assert_eq!(Ok(Value::Blob("bar".into())), x);
     }
 
-    #[test]
-    fn getdel() {
+    #[tokio::test]
+    async fn getdel() {
         let c = create_connection();
-        let x = run_command(&c, &["set", "foo", "bar"]);
+        let x = run_command(&c, &["set", "foo", "bar"]).await;
         assert_eq!(Ok(Value::OK), x);
 
         assert_eq!(
             Ok(Value::Blob("bar".into())),
-            run_command(&c, &["getdel", "foo"])
+            run_command(&c, &["getdel", "foo"]).await
         );
 
-        assert_eq!(Ok(Value::Null), run_command(&c, &["get", "foo"]));
+        assert_eq!(Ok(Value::Null), run_command(&c, &["get", "foo"]).await);
     }
 
-    #[test]
-    fn getset() {
+    #[tokio::test]
+    async fn getset() {
         let c = create_connection();
-        let x = run_command(&c, &["set", "foo", "bar"]);
+        let x = run_command(&c, &["set", "foo", "bar"]).await;
         assert_eq!(Ok(Value::OK), x);
 
         assert_eq!(
             Ok(Value::Blob("bar".into())),
-            run_command(&c, &["getset", "foo", "1"])
+            run_command(&c, &["getset", "foo", "1"]).await
         );
 
         assert_eq!(
             Ok(Value::Blob("1".into())),
-            run_command(&c, &["get", "foo"])
+            run_command(&c, &["get", "foo"]).await
         );
     }
 
-    #[test]
-    fn strlen() {
+    #[tokio::test]
+    async fn strlen() {
         let c = create_connection();
-        let x = run_command(&c, &["set", "foo", "bar"]);
+        let x = run_command(&c, &["set", "foo", "bar"]).await;
         assert_eq!(Ok(Value::OK), x);
 
-        let x = run_command(&c, &["strlen", "foo"]);
+        let x = run_command(&c, &["strlen", "foo"]).await;
         assert_eq!(Ok(Value::Integer(3)), x);
 
-        let x = run_command(&c, &["strlen", "foxxo"]);
+        let x = run_command(&c, &["strlen", "foxxo"]).await;
         assert_eq!(Ok(Value::Integer(0)), x);
     }
 
-    #[test]
-    fn wrong_type() {
+    #[tokio::test]
+    async fn wrong_type() {
         let c = create_connection();
-        let _ = run_command(&c, &["hset", "xxx", "key", "foo"]);
-        let _ = run_command(&c, &["incr", "foo"]);
+        let _ = run_command(&c, &["hset", "xxx", "key", "foo"]).await;
+        let _ = run_command(&c, &["incr", "foo"]).await;
 
-        let x = run_command(&c, &["strlen", "xxx"]);
+        let x = run_command(&c, &["strlen", "xxx"]).await;
         assert_eq!(Ok(Error::WrongType.into()), x);
 
-        let x = run_command(&c, &["get", "xxx"]);
+        let x = run_command(&c, &["get", "xxx"]).await;
         assert_eq!(Ok(Error::WrongType.into()), x);
 
-        let x = run_command(&c, &["get", "xxx"]);
+        let x = run_command(&c, &["get", "xxx"]).await;
         assert_eq!(Ok(Error::WrongType.into()), x);
 
-        let x = run_command(&c, &["mget", "xxx", "foo"]);
+        let x = run_command(&c, &["mget", "xxx", "foo"]).await;
         assert_eq!(
             Ok(Value::Array(vec![Value::Null, Value::Blob("1".into()),])),
             x

+ 7 - 7
src/db/mod.rs

@@ -90,12 +90,12 @@ impl Db {
         entries
             .get_mut(key)
             .filter(|x| x.is_valid())
-            .map_or(0_i64.into(), |x| {
+            .map_or(0.into(), |x| {
                 if x.has_ttl() {
                     x.persist();
-                    1_i64.into()
+                    1.into()
                 } else {
-                    0_i64.into()
+                    0.into()
                 }
             })
     }
@@ -107,15 +107,15 @@ impl Db {
         entries
             .get_mut(key)
             .filter(|x| x.is_valid())
-            .map_or(0_i64.into(), |x| {
+            .map_or(0.into(), |x| {
                 self.expirations.lock().unwrap().add(key, expires_at);
                 x.set_ttl(expires_at);
-                1_i64.into()
+                1.into()
             })
     }
 
     pub fn del(&self, keys: &[Bytes]) -> Value {
-        let mut deleted = 0_i64;
+        let mut deleted = 0;
         let mut expirations = self.expirations.lock().unwrap();
         keys.iter()
             .map(|key| {
@@ -133,7 +133,7 @@ impl Db {
     }
 
     pub fn exists(&self, keys: &[Bytes]) -> Value {
-        let mut matches = 0_i64;
+        let mut matches = 0;
         keys.iter()
             .map(|key| {
                 let entries = self.entries[self.get_slot(key)].read().unwrap();

+ 94 - 2
src/dispatcher.rs

@@ -4,7 +4,7 @@ use std::convert::TryInto;
 use std::time::SystemTime;
 use std::time::UNIX_EPOCH;
 
-fn do_time(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
+async fn do_time(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
     let now = SystemTime::now();
     let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
     let seconds = format!("{}", since_the_epoch.as_secs());
@@ -13,7 +13,7 @@ fn do_time(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
     Ok(vec![seconds.as_str(), millis.as_str()].into())
 }
 
-fn do_command(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
+async fn do_command(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
     let now = SystemTime::now();
     let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
     let in_ms: i128 =
@@ -22,6 +22,98 @@ fn do_command(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
 }
 
 dispatcher! {
+    list {
+        blpop {
+            cmd::list::blpop,
+            [""],
+            -3,
+        },
+        brpop {
+            cmd::list::brpop,
+            [""],
+            -3,
+        },
+        lindex {
+            cmd::list::lindex,
+            [""],
+            3,
+        },
+        linsert {
+            cmd::list::linsert,
+            [""],
+            5,
+        },
+        llen {
+            cmd::list::llen,
+            [""],
+            2,
+        },
+        lmove {
+            cmd::list::lmove,
+            [""],
+            5,
+        },
+        lpop {
+            cmd::list::lpop,
+            [""],
+            -2,
+        },
+        lpos {
+            cmd::list::lpos,
+            [""],
+            -2,
+        },
+        lpush {
+            cmd::list::lpush,
+            [""],
+            -3,
+        },
+        lpushx {
+            cmd::list::lpush,
+            [""],
+            -3,
+        },
+        lrange {
+            cmd::list::lrange,
+            [""],
+            4,
+        },
+        lrem {
+            cmd::list::lrem,
+            [""],
+            4,
+        },
+        lset {
+            cmd::list::lset,
+            [""],
+            4,
+        },
+        ltrim {
+            cmd::list::ltrim,
+            [""],
+            4,
+        },
+        rpop {
+            cmd::list::rpop,
+            [""],
+            -2,
+        },
+        rpoplpush {
+            cmd::list::rpoplpush,
+            [""],
+            3,
+        },
+        rpush {
+            cmd::list::rpush,
+            [""],
+            -3,
+        },
+        rpushx {
+            cmd::list::rpush,
+            [""],
+            -3,
+        },
+    },
     hash {
         hdel {
             cmd::hash::hdel,

+ 4 - 0
src/error.rs

@@ -6,6 +6,8 @@ pub enum Error {
     InvalidArgsCount(String),
     Protocol(String, String),
     WrongArgument(String, String),
+    NotFound,
+    OutOfRange,
     Syntax,
     NotANumber,
     WrongType,
@@ -23,7 +25,9 @@ impl From<Error> for Value {
             Error::InvalidArgsCount(x) => format!("wrong number of arguments for '{}' command", x),
             Error::Protocol(x, y) => format!("Protocol error: expected '{}', got '{}'", x, y),
             Error::NotANumber => "value is not an integer or out of range".to_owned(),
+            Error::OutOfRange => "index out of range".to_owned(),
             Error::Syntax => "syntax error".to_owned(),
+            Error::NotFound => "no such key".to_owned(),
             Error::WrongArgument(x, y) => format!(
                 "Unknown subcommand or wrong number of arguments for '{}'. Try {} HELP.",
                 y, x

+ 7 - 3
src/macros.rs

@@ -13,6 +13,7 @@ macro_rules! dispatcher {
             #[allow(non_snake_case, non_camel_case_types)]
             pub mod $command {
                 use super::*;
+                use async_trait::async_trait;
 
                 pub struct Command {
                     pub tags: &'static [&'static str],
@@ -28,9 +29,10 @@ macro_rules! dispatcher {
                     }
                 }
 
+                #[async_trait]
                 impl ExecutableCommand for Command {
-                    fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-                        $handler(conn, args)
+                    async fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+                        $handler(conn, args).await
                     }
 
                     fn check_number_args(&self, n: usize) -> bool {
@@ -53,9 +55,11 @@ macro_rules! dispatcher {
             }
         )+)+
         use std::ops::Deref;
+        use async_trait::async_trait;
 
+        #[async_trait]
         pub trait ExecutableCommand {
-            fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error>;
+            async fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error>;
 
             fn check_number_args(&self, n: usize) -> bool;
 

+ 1 - 0
src/server.rs

@@ -78,6 +78,7 @@ pub async fn serve(addr: String) -> Result<(), Box<dyn Error>> {
                                     let r = handler
                                         .deref()
                                         .execute(&conn, &args)
+                                        .await
                                         .unwrap_or_else(|x| x.into());
                                     if transport.send(r).await.is_err() {
                                         break;

+ 106 - 0
src/value/checksum.rs

@@ -0,0 +1,106 @@
+use crate::value;
+use bytes::Bytes;
+use crc32fast::Hasher;
+
+#[derive(Debug, Clone)]
+pub struct Value {
+    bytes: Bytes,
+    checksum: Option<u32>,
+}
+
+impl Value {
+    pub fn new(bytes: Bytes) -> Self {
+        let checksum = Self::calculate_checksum(&bytes);
+        Self { bytes, checksum }
+    }
+
+    pub fn clone_value(&self) -> value::Value {
+        value::Value::Blob(self.bytes.clone())
+    }
+
+    pub fn has_checksum(&self) -> bool {
+        self.checksum.is_some()
+    }
+
+    fn calculate_checksum(bytes: &Bytes) -> Option<u32> {
+        if bytes.len() < 1024 {
+            None
+        } else {
+            let mut hasher = Hasher::new();
+            hasher.update(bytes);
+            Some(hasher.finalize())
+        }
+    }
+}
+
+impl PartialEq for Value {
+    fn eq(&self, other: &Value) -> bool {
+        if self.checksum == other.checksum && self.bytes.len() == other.bytes.len() {
+            // The data have the same checksum now perform a more extensive
+            // comparision
+            return self.bytes.eq(&other.bytes);
+        }
+
+        false
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::bytes;
+
+    #[test]
+    fn does_not_have_checksum() {
+        let data = Value::new(bytes!(b"one"));
+        assert!(!data.has_checksum())
+    }
+
+    #[test]
+    fn has_checksum() {
+        let data = Value::new(bytes!(
+            b"
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+        "
+        ));
+        assert!(data.has_checksum())
+    }
+
+    #[test]
+    fn compare() {
+        let data1 = Value::new(bytes!(
+            b"
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+            one one one one one one one one one one one one one one one one one one
+        "
+        ));
+        assert!(data1.clone() == data1.clone());
+
+        let data2 = Value::new(bytes!(b"one"));
+        assert!(data2 == data2.clone());
+        assert!(data1 != data2);
+    }
+}

+ 55 - 0
src/value/locked.rs

@@ -0,0 +1,55 @@
+use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
+
+#[derive(Debug)]
+pub struct Value<T: Clone + PartialEq>(pub RwLock<T>);
+
+impl<T: Clone + PartialEq> Clone for Value<T> {
+    fn clone(&self) -> Self {
+        Self(RwLock::new(self.0.read().unwrap().clone()))
+    }
+}
+
+impl<T: PartialEq + Clone> PartialEq for Value<T> {
+    fn eq(&self, other: &Value<T>) -> bool {
+        self.0.read().unwrap().eq(&other.0.read().unwrap())
+    }
+}
+
+impl<T: PartialEq + Clone> Value<T> {
+    pub fn new(obj: T) -> Self {
+        Self(RwLock::new(obj))
+    }
+
+    pub fn write(&self) -> RwLockWriteGuard<'_, T> {
+        self.0.write().unwrap()
+    }
+
+    pub fn read(&self) -> RwLockReadGuard<'_, T> {
+        self.0.read().unwrap()
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn locked_eq1() {
+        let a = Value::new(1);
+        let b = Value::new(1);
+        assert!(a == b);
+    }
+
+    #[test]
+    fn locked_eq2() {
+        let a = Value::new(1);
+        let b = Value::new(2);
+        assert!(a != b);
+    }
+
+    #[test]
+    fn locked_clone() {
+        let a = Value::new((1, 2, 3));
+        assert!(a == a.clone());
+    }
+}

+ 17 - 36
src/value.rs → src/value/mod.rs

@@ -1,45 +1,19 @@
+pub mod checksum;
+pub mod locked;
+
 use crate::{error::Error, value_try_from, value_vec_try_from};
 use bytes::{Bytes, BytesMut};
 use redis_zero_protocol_parser::Value as ParsedValue;
 use std::{
-    collections::HashMap,
+    collections::{HashMap, VecDeque},
     convert::{TryFrom, TryInto},
     str::FromStr,
-    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
 };
 
-#[derive(Debug)]
-pub struct LockedValue<T: Clone + PartialEq>(pub RwLock<T>);
-
-impl<T: Clone + PartialEq> Clone for LockedValue<T> {
-    fn clone(&self) -> Self {
-        Self(RwLock::new(self.0.read().unwrap().clone()))
-    }
-}
-
-impl<T: PartialEq + Clone> PartialEq for LockedValue<T> {
-    fn eq(&self, other: &LockedValue<T>) -> bool {
-        self.0.read().unwrap().eq(&other.0.read().unwrap())
-    }
-}
-
-impl<T: PartialEq + Clone> LockedValue<T> {
-    pub fn new(obj: T) -> Self {
-        Self(RwLock::new(obj))
-    }
-
-    pub fn write(&self) -> RwLockWriteGuard<'_, T> {
-        self.0.write().unwrap()
-    }
-
-    pub fn read(&self) -> RwLockReadGuard<'_, T> {
-        self.0.read().unwrap()
-    }
-}
-
 #[derive(Debug, PartialEq, Clone)]
 pub enum Value {
-    Hash(LockedValue<HashMap<Bytes, Bytes>>),
+    Hash(locked::Value<HashMap<Bytes, Bytes>>),
+    List(locked::Value<VecDeque<checksum::Value>>),
     Array(Vec<Value>),
     Blob(Bytes),
     String(String),
@@ -62,7 +36,7 @@ impl From<&Value> for Vec<u8> {
                     let b: Vec<u8> = i.into();
                     s.extend(b);
                 }
-                s.to_vec()
+                s
             }
             Value::Integer(x) => format!(":{}\r\n", x).into(),
             Value::BigInteger(x) => format!("({}\r\n", x).into(),
@@ -122,7 +96,7 @@ impl From<Value> for Vec<u8> {
 impl<'a> From<&ParsedValue<'a>> for Value {
     fn from(value: &ParsedValue) -> Self {
         match value {
-            ParsedValue::String(x) => Self::String(x.to_string()),
+            ParsedValue::String(x) => Self::String((*x).to_string()),
             ParsedValue::Blob(x) => Self::Blob(Bytes::copy_from_slice(*x)),
             ParsedValue::Array(x) => {
                 Self::Array(x.iter().map(|x| Value::try_from(x).unwrap()).collect())
@@ -131,13 +105,14 @@ impl<'a> From<&ParsedValue<'a>> for Value {
             ParsedValue::BigInteger(x) => Self::BigInteger(*x),
             ParsedValue::Integer(x) => Self::Integer(*x),
             ParsedValue::Float(x) => Self::Float(*x),
-            ParsedValue::Error(x, y) => Self::Err(x.to_string(), y.to_string()),
+            ParsedValue::Error(x, y) => Self::Err((*x).to_string(), (*y).to_string()),
             ParsedValue::Null => Self::Null,
         }
     }
 }
 
 value_try_from!(f64, Value::Float);
+value_try_from!(i32, Value::Integer);
 value_try_from!(i64, Value::Integer);
 value_try_from!(i128, Value::BigInteger);
 
@@ -149,7 +124,13 @@ impl From<&str> for Value {
 
 impl From<HashMap<Bytes, Bytes>> for Value {
     fn from(value: HashMap<Bytes, Bytes>) -> Value {
-        Value::Hash(LockedValue::new(value))
+        Value::Hash(locked::Value::new(value))
+    }
+}
+
+impl From<VecDeque<checksum::Value>> for Value {
+    fn from(value: VecDeque<checksum::Value>) -> Value {
+        Value::List(locked::Value::new(value))
     }
 }