Ver código fonte

Making each cmd handler async.

Making each controller async will allow flexibility and will make coding
blocking operations (B* list) and pub-sub much easier.

Added `blpop` / `brpop` support. This command is special because it may
lock the connection waiting for data to be inserted by another
connection. By making all commands handlers async the logic of 'waiting'
is super simple.

This retry every 100ms mechanism is the first and simple version, it
should be revisited to implement with Tokio/Notify or mpsc instead of
keep hitting the db every 100ms.
Cesar Rodas 3 anos atrás
pai
commit
0222d6a520
11 arquivos alterados com 364 adições e 213 exclusões
  1. 1 0
      Cargo.toml
  2. 3 3
      src/cmd/client.rs
  3. 52 52
      src/cmd/hash.rs
  4. 56 23
      src/cmd/key.rs
  5. 159 62
      src/cmd/list.rs
  6. 2 2
      src/cmd/mod.rs
  7. 63 63
      src/cmd/string.rs
  8. 17 2
      src/dispatcher.rs
  9. 7 3
      src/macros.rs
  10. 1 0
      src/server.rs
  11. 3 3
      src/value/mod.rs

+ 1 - 0
Cargo.toml

@@ -10,6 +10,7 @@ edition = "2018"
 redis-zero-protocol-parser = {path = "redis-protocol-parser"}
 tokio={version="1", features = ["full", "tracing"] }
 tokio-util={version="^0.6", features = ["full"] }
+async-trait = "0.1.50"
 crc32fast="^1.2"
 futures = { version = "0.3.0", features = ["thread-pool"]}
 tokio-stream="0.1"

+ 3 - 3
src/cmd/client.rs

@@ -2,7 +2,7 @@ use crate::{connection::Connection, error::Error, option, value::Value};
 use bytes::Bytes;
 use std::sync::Arc;
 
-pub fn client(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn client(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let sub = unsafe { std::str::from_utf8_unchecked(&args[1]) }.to_string();
 
     let expected = match sub.to_lowercase().as_str() {
@@ -39,11 +39,11 @@ pub fn client(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     }
 }
 
-pub fn echo(_conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn echo(_conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(Value::Blob(args[1].to_owned()))
 }
 
-pub fn ping(_conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn ping(_conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     match args.len() {
         1 => Ok(Value::String("PONG".to_owned())),
         2 => Ok(Value::Blob(args[1].to_owned())),

+ 52 - 52
src/cmd/hash.rs

@@ -10,7 +10,7 @@ use std::{
     str::FromStr,
 };
 
-pub fn hdel(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hdel(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -32,7 +32,7 @@ pub fn hdel(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hexists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hexists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -47,7 +47,7 @@ pub fn hexists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -62,7 +62,7 @@ pub fn hget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hgetall(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hgetall(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -82,7 +82,7 @@ pub fn hgetall(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hincrby<
+pub async fn hincrby<
     T: ToString + FromStr + AddAssign + for<'a> TryFrom<&'a Value, Error = Error> + Into<Value> + Copy,
 >(
     conn: &Connection,
@@ -115,7 +115,7 @@ pub fn hincrby<
     )
 }
 
-pub fn hkeys(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hkeys(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -134,7 +134,7 @@ pub fn hkeys(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -145,7 +145,7 @@ pub fn hlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hmget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hmget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -170,7 +170,7 @@ pub fn hmget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hrandfield(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hrandfield(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let (count, with_values) = match args.len() {
         2 => (None, false),
         3 => (Some(bytes_to_number::<i64>(&args[2])?), false),
@@ -237,7 +237,7 @@ pub fn hrandfield(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     if args.len() % 2 == 1 {
         return Err(Error::InvalidArgsCount("hset".to_owned()));
     }
@@ -269,7 +269,7 @@ pub fn hset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hsetnx(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hsetnx(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -298,7 +298,7 @@ pub fn hsetnx(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hstrlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hstrlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -313,7 +313,7 @@ pub fn hstrlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn hvals(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn hvals(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -339,25 +339,25 @@ mod test {
         value::Value,
     };
 
-    #[test]
-    fn hget() {
+    #[tokio::test]
+    async fn hget() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
 
         assert_eq!(Ok(Value::Integer(3)), r);
 
-        let r = run_command(&c, &["hget", "foo", "f1"]);
+        let r = run_command(&c, &["hget", "foo", "f1"]).await;
         assert_eq!(Ok(Value::Blob("1".into())), r);
     }
 
-    #[test]
-    fn hgetall() {
+    #[tokio::test]
+    async fn hgetall() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
 
         assert_eq!(Ok(Value::Integer(3)), r);
 
-        let r = run_command(&c, &["hgetall", "foo"]);
+        let r = run_command(&c, &["hgetall", "foo"]).await;
         match r {
             Ok(Value::Array(x)) => {
                 assert_eq!(6, x.len());
@@ -371,14 +371,14 @@ mod test {
         };
     }
 
-    #[test]
-    fn hrandfield() {
+    #[tokio::test]
+    async fn hrandfield() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
 
         assert_eq!(Ok(Value::Integer(3)), r);
 
-        let r = run_command(&c, &["hrandfield", "foo"]);
+        let r = run_command(&c, &["hrandfield", "foo"]).await;
         match r {
             Ok(Value::Blob(x)) => {
                 let x = unsafe { std::str::from_utf8_unchecked(&x) };
@@ -388,14 +388,14 @@ mod test {
         };
     }
 
-    #[test]
-    fn hmget() {
+    #[tokio::test]
+    async fn hmget() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
 
         assert_eq!(Ok(Value::Integer(3)), r);
 
-        let r = run_command(&c, &["hmget", "foo", "f1", "f2"]);
+        let r = run_command(&c, &["hmget", "foo", "f1", "f2"]).await;
         assert_eq!(
             Ok(Value::Array(vec![
                 Value::Blob("1".into()),
@@ -405,70 +405,70 @@ mod test {
         );
     }
 
-    #[test]
-    fn hexists() {
+    #[tokio::test]
+    async fn hexists() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
 
         assert_eq!(Ok(Value::Integer(3)), r);
 
         assert_eq!(
             Ok(Value::Integer(1)),
-            run_command(&c, &["hexists", "foo", "f1"])
+            run_command(&c, &["hexists", "foo", "f1"]).await
         );
         assert_eq!(
             Ok(Value::Integer(1)),
-            run_command(&c, &["hexists", "foo", "f3"])
+            run_command(&c, &["hexists", "foo", "f3"]).await
         );
         assert_eq!(
             Ok(Value::Integer(0)),
-            run_command(&c, &["hexists", "foo", "f4"])
+            run_command(&c, &["hexists", "foo", "f4"]).await
         );
     }
-    #[test]
-    fn hstrlen() {
+    #[tokio::test]
+    async fn hstrlen() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
 
         assert_eq!(Ok(Value::Integer(3)), r);
 
-        let r = run_command(&c, &["hstrlen", "foo", "f1"]);
+        let r = run_command(&c, &["hstrlen", "foo", "f1"]).await;
         assert_eq!(Ok(Value::Integer(1)), r);
     }
 
-    #[test]
-    fn hlen() {
+    #[tokio::test]
+    async fn hlen() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
 
         assert_eq!(Ok(Value::Integer(3)), r);
 
-        let r = run_command(&c, &["hset", "foo", "f1", "2", "f4", "2", "f5", "3"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "2", "f4", "2", "f5", "3"]).await;
         assert_eq!(Ok(Value::Integer(2)), r);
 
-        let r = run_command(&c, &["hlen", "foo"]);
+        let r = run_command(&c, &["hlen", "foo"]).await;
         assert_eq!(Ok(Value::Integer(5)), r);
     }
 
-    #[test]
-    fn hkeys() {
+    #[tokio::test]
+    async fn hkeys() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1"]).await;
 
         assert_eq!(Ok(Value::Integer(1)), r);
 
-        let r = run_command(&c, &["hkeys", "foo"]);
+        let r = run_command(&c, &["hkeys", "foo"]).await;
         assert_eq!(Ok(Value::Array(vec![Value::Blob("f1".into()),])), r);
     }
 
-    #[test]
-    fn hvals() {
+    #[tokio::test]
+    async fn hvals() {
         let c = create_connection();
-        let r = run_command(&c, &["hset", "foo", "f1", "1"]);
+        let r = run_command(&c, &["hset", "foo", "f1", "1"]).await;
 
         assert_eq!(Ok(Value::Integer(1)), r);
 
-        let r = run_command(&c, &["hvals", "foo"]);
+        let r = run_command(&c, &["hvals", "foo"]).await;
         assert_eq!(Ok(Value::Array(vec![Value::Blob("1".into()),])), r);
     }
 }

+ 56 - 23
src/cmd/key.rs

@@ -12,15 +12,15 @@ pub fn now() -> Duration {
         .expect("Time went backwards")
 }
 
-pub fn del(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn del(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().del(&args[1..]))
 }
 
-pub fn exists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn exists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().exists(&args[1..]))
 }
 
-pub fn expire(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn expire(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let expires_in: i64 = bytes_to_number(&args[2])?;
 
     if expires_in <= 0 {
@@ -37,7 +37,7 @@ pub fn expire(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().set_ttl(&args[1], expires_at))
 }
 
-pub fn expire_at(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn expire_at(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let secs = check_arg!(args, 0, "EXPIREAT");
     let expires_at: i64 = bytes_to_number(&args[2])?;
     let expires_in: i64 = if secs {
@@ -60,7 +60,7 @@ pub fn expire_at(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().set_ttl(&args[1], expires_at))
 }
 
-pub fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let ttl = match conn.db().ttl(&args[1]) {
         Some(Some(ttl)) => {
             let ttl = ttl - Instant::now();
@@ -77,7 +77,7 @@ pub fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(ttl.into())
 }
 
-pub fn expire_time(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn expire_time(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let ttl = match conn.db().ttl(&args[1]) {
         Some(Some(ttl)) => {
             // Is there a better way? There should be!
@@ -96,7 +96,7 @@ pub fn expire_time(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(ttl.into())
 }
 
-pub fn persist(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn persist(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().persist(&args[1]))
 }
 
@@ -107,28 +107,61 @@ mod test {
         value::Value,
     };
 
-    #[test]
-    fn del() {
+    #[tokio::test]
+    async fn del() {
         let c = create_connection();
-        assert_eq!(Ok(Value::Integer(1)), run_command(&c, &["incr", "foo"]));
-        assert_eq!(Ok(Value::Integer(1)), run_command(&c, &["exists", "foo"]));
-        assert_eq!(Ok(Value::Integer(1)), run_command(&c, &["del", "foo"]));
-        assert_eq!(Ok(Value::Integer(0)), run_command(&c, &["del", "foo"]));
-        assert_eq!(Ok(Value::Integer(0)), run_command(&c, &["exists", "foo"]));
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["incr", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["exists", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["del", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(0)),
+            run_command(&c, &["del", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(0)),
+            run_command(&c, &["exists", "foo"]).await
+        );
     }
 
-    #[test]
-    fn expire_and_persist() {
+    #[tokio::test]
+    async fn expire_and_persist() {
         let c = create_connection();
-        assert_eq!(Ok(Value::Integer(1)), run_command(&c, &["incr", "foo"]));
         assert_eq!(
             Ok(Value::Integer(1)),
-            run_command(&c, &["pexpire", "foo", "6000"])
+            run_command(&c, &["incr", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["pexpire", "foo", "6000"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(5999)),
+            run_command(&c, &["pttl", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["persist", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(-1)),
+            run_command(&c, &["pttl", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["del", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(-2)),
+            run_command(&c, &["pttl", "foo"]).await
         );
-        assert_eq!(Ok(Value::Integer(5999)), run_command(&c, &["pttl", "foo"]));
-        assert_eq!(Ok(Value::Integer(1)), run_command(&c, &["persist", "foo"]));
-        assert_eq!(Ok(Value::Integer(-1)), run_command(&c, &["pttl", "foo"]));
-        assert_eq!(Ok(Value::Integer(1)), run_command(&c, &["del", "foo"]));
-        assert_eq!(Ok(Value::Integer(-2)), run_command(&c, &["pttl", "foo"]));
     }
 }

+ 159 - 62
src/cmd/list.rs

@@ -4,7 +4,9 @@ use crate::{
 };
 use bytes::Bytes;
 use std::collections::LinkedList;
+use tokio::time::{sleep, Duration, Instant};
 
+#[allow(clippy::needless_range_loop)]
 fn remove_element(
     conn: &Connection,
     key: &Bytes,
@@ -12,7 +14,7 @@ fn remove_element(
     front: bool,
 ) -> Result<Value, Error> {
     conn.db().get_map_or(
-        &key,
+        key,
         |v| match v {
             Value::List(x) => {
                 let mut x = x.write();
@@ -39,7 +41,7 @@ fn remove_element(
                     .map(|x| x.as_ref().unwrap().clone_value())
                     .collect();
 
-                Ok(if ret.len() == 0 {
+                Ok(if ret.is_empty() {
                     Value::Null
                 } else {
                     ret.into()
@@ -51,8 +53,51 @@ fn remove_element(
     )
 }
 
+pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let timeout = Instant::now() + Duration::from_secs(bytes_to_number(&args[args.len() - 1])?);
+    let len = args.len() - 1;
 
-pub fn llen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    loop {
+        for key in args[1..len].iter() {
+            match remove_element(conn, key, 0, true)? {
+                Value::Null => (),
+                n => return Ok(vec![Value::Blob(key.clone()), n].into()),
+            };
+        }
+
+        if Instant::now() >= timeout {
+            break;
+        }
+
+        sleep(Duration::from_millis(100)).await;
+    }
+
+    Ok(Value::Null)
+}
+
+pub async fn brpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let timeout = Instant::now() + Duration::from_secs(bytes_to_number(&args[args.len() - 1])?);
+    let len = args.len() - 1;
+
+    loop {
+        for key in args[1..len].iter() {
+            match remove_element(conn, key, 0, false)? {
+                Value::Null => (),
+                n => return Ok(vec![Value::Blob(key.clone()), n].into()),
+            };
+        }
+
+        if Instant::now() >= timeout {
+            break;
+        }
+
+        sleep(Duration::from_millis(100)).await;
+    }
+
+    Ok(Value::Null)
+}
+
+pub async fn llen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -63,7 +108,7 @@ pub fn llen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn lpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn lpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let count = if args.len() > 2 {
         bytes_to_number(&args[2])?
     } else {
@@ -73,7 +118,7 @@ pub fn lpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     remove_element(conn, &args[1], count, true)
 }
 
-pub fn lpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn lpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let is_push_x = check_arg!(args, 0, "LPUSHX");
 
     conn.db().get_map_or(
@@ -105,7 +150,7 @@ pub fn lpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn lrange(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn lrange(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
         |v| match v {
@@ -136,7 +181,17 @@ pub fn lrange(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
-pub fn rpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn rpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let count = if args.len() > 2 {
+        bytes_to_number(&args[2])?
+    } else {
+        0
+    };
+
+    remove_element(conn, &args[1], count, false)
+}
+
+pub async fn rpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let is_push_x = check_arg!(args, 0, "RPUSHX");
 
     conn.db().get_map_or(
@@ -175,37 +230,38 @@ mod test {
         value::Value,
     };
 
-    #[test]
-    fn llen() {
+    #[tokio::test]
+    async fn llen() {
         let c = create_connection();
 
         assert_eq!(
-            run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]),
-            run_command(&c, &["llen", "foo"])
+            run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await,
+            run_command(&c, &["llen", "foo"]).await
         );
 
-        assert_eq!(Ok(Value::Integer(0)), run_command(&c, &["llen", "foobar"]));
+        assert_eq!(
+            Ok(Value::Integer(0)),
+            run_command(&c, &["llen", "foobar"]).await
+        );
     }
 
-    #[test]
-    fn lpop() {
+    #[tokio::test]
+    async fn lpop() {
         let c = create_connection();
 
         assert_eq!(
             Ok(Value::Integer(5)),
-            run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]),
+            run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await,
         );
 
         assert_eq!(
             Ok(Value::Blob("5".into())),
-            run_command(&c, &["lpop", "foo"])
+            run_command(&c, &["lpop", "foo"]).await
         );
 
         assert_eq!(
-            Ok(Value::Array(vec![
-                Value::Blob("4".into())
-            ])),
-            run_command(&c, &["lpop", "foo", "1"])
+            Ok(Value::Array(vec![Value::Blob("4".into())])),
+            run_command(&c, &["lpop", "foo", "1"]).await
         );
 
         assert_eq!(
@@ -214,29 +270,29 @@ mod test {
                 Value::Blob("2".into()),
                 Value::Blob("1".into()),
             ])),
-            run_command(&c, &["lpop", "foo", "55"])
+            run_command(&c, &["lpop", "foo", "55"]).await
         );
 
         assert_eq!(
             Ok(Value::Null),
-            run_command(&c, &["lpop", "foo", "55"])
+            run_command(&c, &["lpop", "foo", "55"]).await
         );
 
+        assert_eq!(Ok(Value::Null), run_command(&c, &["lpop", "foo"]).await);
+
         assert_eq!(
-            Ok(Value::Null),
-            run_command(&c, &["lpop", "foo"])
+            Ok(Value::Integer(0)),
+            run_command(&c, &["llen", "foobar"]).await
         );
-
-        assert_eq!(Ok(Value::Integer(0)), run_command(&c, &["llen", "foobar"]));
     }
 
-    #[test]
-    fn lpush() {
+    #[tokio::test]
+    async fn lpush() {
         let c = create_connection();
 
         assert_eq!(
             Ok(Value::Integer(5)),
-            run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"])
+            run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await
         );
 
         assert_eq!(
@@ -247,12 +303,12 @@ mod test {
                 Value::Blob("2".into()),
                 Value::Blob("1".into()),
             ])),
-            run_command(&c, &["lrange", "foo", "0", "-1"])
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
         );
 
         assert_eq!(
             Ok(Value::Integer(10)),
-            run_command(&c, &["lpush", "foo", "6", "7", "8", "9", "10"])
+            run_command(&c, &["lpush", "foo", "6", "7", "8", "9", "10"]).await
         );
 
         assert_eq!(
@@ -268,22 +324,22 @@ mod test {
                 Value::Blob("2".into()),
                 Value::Blob("1".into()),
             ])),
-            run_command(&c, &["lrange", "foo", "0", "-1"])
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
         );
     }
 
-    #[test]
-    fn lpush_simple() {
+    #[tokio::test]
+    async fn lpush_simple() {
         let c = create_connection();
 
         assert_eq!(
             Ok(Value::Integer(1)),
-            run_command(&c, &["lpush", "foo", "world"])
+            run_command(&c, &["lpush", "foo", "world"]).await
         );
 
         assert_eq!(
             Ok(Value::Integer(2)),
-            run_command(&c, &["lpush", "foo", "hello"])
+            run_command(&c, &["lpush", "foo", "hello"]).await
         );
 
         assert_eq!(
@@ -291,22 +347,63 @@ mod test {
                 Value::Blob("hello".into()),
                 Value::Blob("world".into()),
             ])),
-            run_command(&c, &["lrange", "foo", "0", "-1"])
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn rpop() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(5)),
+            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Blob("5".into())),
+            run_command(&c, &["rpop", "foo"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![Value::Blob("4".into())])),
+            run_command(&c, &["rpop", "foo", "1"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Blob("3".into()),
+                Value::Blob("2".into()),
+                Value::Blob("1".into()),
+            ])),
+            run_command(&c, &["rpop", "foo", "55"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Null),
+            run_command(&c, &["rpop", "foo", "55"]).await
+        );
+
+        assert_eq!(Ok(Value::Null), run_command(&c, &["rpop", "foo"]).await);
+
+        assert_eq!(
+            Ok(Value::Integer(0)),
+            run_command(&c, &["llen", "foobar"]).await
         );
     }
 
-    #[test]
-    fn rpush_simple() {
+    #[tokio::test]
+    async fn rpush_simple() {
         let c = create_connection();
 
         assert_eq!(
             Ok(Value::Integer(1)),
-            run_command(&c, &["rpush", "foo", "world"])
+            run_command(&c, &["rpush", "foo", "world"]).await
         );
 
         assert_eq!(
             Ok(Value::Integer(2)),
-            run_command(&c, &["rpush", "foo", "hello"])
+            run_command(&c, &["rpush", "foo", "hello"]).await
         );
 
         assert_eq!(
@@ -314,17 +411,17 @@ mod test {
                 Value::Blob("world".into()),
                 Value::Blob("hello".into()),
             ])),
-            run_command(&c, &["lrange", "foo", "0", "-1"])
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
         );
     }
 
-    #[test]
-    fn lrange() {
+    #[tokio::test]
+    async fn lrange() {
         let c = create_connection();
 
         assert_eq!(
             Ok(Value::Integer(5)),
-            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"])
+            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await
         );
 
         assert_eq!(
@@ -335,7 +432,7 @@ mod test {
                 Value::Blob("4".into()),
                 Value::Blob("5".into()),
             ])),
-            run_command(&c, &["lrange", "foo", "0", "-1"])
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
         );
 
         assert_eq!(
@@ -345,7 +442,7 @@ mod test {
                 Value::Blob("3".into()),
                 Value::Blob("4".into()),
             ])),
-            run_command(&c, &["lrange", "foo", "0", "-2"])
+            run_command(&c, &["lrange", "foo", "0", "-2"]).await
         );
 
         assert_eq!(
@@ -353,22 +450,22 @@ mod test {
                 Value::Blob("4".into()),
                 Value::Blob("5".into()),
             ])),
-            run_command(&c, &["lrange", "foo", "-2", "-1"])
+            run_command(&c, &["lrange", "foo", "-2", "-1"]).await
         );
 
         assert_eq!(
             Ok(Value::Array(vec![Value::Blob("3".into()),])),
-            run_command(&c, &["lrange", "foo", "-3", "-3"])
+            run_command(&c, &["lrange", "foo", "-3", "-3"]).await
         );
     }
 
-    #[test]
-    fn rpush() {
+    #[tokio::test]
+    async fn rpush() {
         let c = create_connection();
 
         assert_eq!(
             Ok(Value::Integer(5)),
-            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"])
+            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await
         );
 
         assert_eq!(
@@ -379,12 +476,12 @@ mod test {
                 Value::Blob("4".into()),
                 Value::Blob("5".into()),
             ])),
-            run_command(&c, &["lrange", "foo", "0", "-1"])
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
         );
 
         assert_eq!(
             Ok(Value::Integer(10)),
-            run_command(&c, &["rpush", "foo", "6", "7", "8", "9", "10"])
+            run_command(&c, &["rpush", "foo", "6", "7", "8", "9", "10"]).await
         );
 
         assert_eq!(
@@ -400,17 +497,17 @@ mod test {
                 Value::Blob("9".into()),
                 Value::Blob("10".into()),
             ])),
-            run_command(&c, &["lrange", "foo", "0", "-1"])
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
         );
     }
 
-    #[test]
-    fn rpushx() {
+    #[tokio::test]
+    async fn rpushx() {
         let c = create_connection();
 
         assert_eq!(
             Ok(Value::Integer(5)),
-            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"])
+            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await
         );
 
         assert_eq!(
@@ -421,12 +518,12 @@ mod test {
                 Value::Blob("4".into()),
                 Value::Blob("5".into()),
             ])),
-            run_command(&c, &["lrange", "foo", "0", "-1"])
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
         );
 
         assert_eq!(
             Ok(Value::Integer(10)),
-            run_command(&c, &["rpushx", "foo", "6", "7", "8", "9", "10"])
+            run_command(&c, &["rpushx", "foo", "6", "7", "8", "9", "10"]).await
         );
 
         assert_eq!(
@@ -442,12 +539,12 @@ mod test {
                 Value::Blob("9".into()),
                 Value::Blob("10".into()),
             ])),
-            run_command(&c, &["lrange", "foo", "0", "-1"])
+            run_command(&c, &["lrange", "foo", "0", "-1"]).await
         );
 
         assert_eq!(
             Ok(Value::Integer(0)),
-            run_command(&c, &["rpushx", "foobar", "6", "7", "8", "9", "10"])
+            run_command(&c, &["rpushx", "foobar", "6", "7", "8", "9", "10"]).await
         );
     }
 }

+ 2 - 2
src/cmd/mod.rs

@@ -29,11 +29,11 @@ mod test {
         all_connections.new_connection(db.clone(), client)
     }
 
-    pub fn run_command(conn: &Connection, cmd: &[&str]) -> Result<Value, Error> {
+    pub async fn run_command(conn: &Connection, cmd: &[&str]) -> Result<Value, Error> {
         let args: Vec<Bytes> = cmd.iter().map(|s| Bytes::from(s.to_string())).collect();
 
         let handler = Dispatcher::new(&args)?;
 
-        handler.deref().execute(&conn, &args)
+        handler.deref().execute(&conn, &args).await
     }
 }

+ 63 - 63
src/cmd/string.rs

@@ -5,52 +5,52 @@ use bytes::Bytes;
 use std::{convert::TryInto, ops::Neg};
 use tokio::time::Duration;
 
-pub fn incr(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn incr(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().incr(&args[1], 1)
 }
 
-pub fn incr_by(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn incr_by(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let by: i64 = bytes_to_number(&args[2])?;
     conn.db().incr(&args[1], by)
 }
 
-pub fn incr_by_float(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn incr_by_float(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let by: f64 = bytes_to_number(&args[2])?;
     conn.db().incr(&args[1], by)
 }
 
-pub fn decr(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn decr(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().incr(&args[1], -1)
 }
 
-pub fn decr_by(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn decr_by(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let by: i64 = (&Value::Blob(args[2].to_owned())).try_into()?;
     conn.db().incr(&args[1], by.neg())
 }
 
-pub fn get(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn get(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().get(&args[1]))
 }
 
-pub fn getdel(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn getdel(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().getdel(&args[1]))
 }
 
-pub fn getset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn getset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().getset(&args[1], &Value::Blob(args[2].to_owned())))
 }
 
-pub fn mget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn mget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().get_multi(&args[1..]))
 }
 
-pub fn set(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn set(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn
         .db()
         .set(&args[1], Value::Blob(args[2].to_owned()), None))
 }
 
-pub fn setex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn setex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let ttl = if check_arg!(args, 0, "SETEX") {
         Duration::from_secs(bytes_to_number(&args[2])?)
     } else {
@@ -62,7 +62,7 @@ pub fn setex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
         .set(&args[1], Value::Blob(args[2].to_owned()), Some(ttl)))
 }
 
-pub fn strlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+pub async fn strlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     match conn.db().get(&args[1]) {
         Value::Blob(x) => Ok((x.len() as i64).into()),
         Value::String(x) => Ok((x.len() as i64).into()),
@@ -79,137 +79,137 @@ mod test {
         value::Value,
     };
 
-    #[test]
-    fn incr() {
+    #[tokio::test]
+    async fn incr() {
         let c = create_connection();
-        let r = run_command(&c, &["incr", "foo"]);
+        let r = run_command(&c, &["incr", "foo"]).await;
         assert_eq!(Ok(Value::Integer(1)), r);
-        let r = run_command(&c, &["incr", "foo"]);
+        let r = run_command(&c, &["incr", "foo"]).await;
         assert_eq!(Ok(Value::Integer(2)), r);
 
-        let x = run_command(&c, &["get", "foo"]);
+        let x = run_command(&c, &["get", "foo"]).await;
         assert_eq!(Ok(Value::Blob("2".into())), x);
     }
 
-    #[test]
-    fn incr_do_not_affect_ttl() {
+    #[tokio::test]
+    async fn incr_do_not_affect_ttl() {
         let c = create_connection();
-        let r = run_command(&c, &["incr", "foo"]);
+        let r = run_command(&c, &["incr", "foo"]).await;
         assert_eq!(Ok(Value::Integer(1)), r);
 
-        let r = run_command(&c, &["expire", "foo", "60"]);
+        let r = run_command(&c, &["expire", "foo", "60"]).await;
         assert_eq!(Ok(Value::Integer(1)), r);
 
-        let r = run_command(&c, &["ttl", "foo"]);
+        let r = run_command(&c, &["ttl", "foo"]).await;
         assert_eq!(Ok(Value::Integer(59)), r);
 
-        let r = run_command(&c, &["incr", "foo"]);
+        let r = run_command(&c, &["incr", "foo"]).await;
         assert_eq!(Ok(Value::Integer(2)), r);
 
-        let r = run_command(&c, &["ttl", "foo"]);
+        let r = run_command(&c, &["ttl", "foo"]).await;
         assert_eq!(Ok(Value::Integer(59)), r);
     }
 
-    #[test]
-    fn decr() {
+    #[tokio::test]
+    async fn decr() {
         let c = create_connection();
-        let r = run_command(&c, &["decr", "foo"]);
+        let r = run_command(&c, &["decr", "foo"]).await;
         assert_eq!(Ok(Value::Integer(-1)), r);
-        let r = run_command(&c, &["decr", "foo"]);
+        let r = run_command(&c, &["decr", "foo"]).await;
         assert_eq!(Ok(Value::Integer(-2)), r);
-        let x = run_command(&c, &["get", "foo"]);
+        let x = run_command(&c, &["get", "foo"]).await;
         assert_eq!(Ok(Value::Blob("-2".into())), x);
     }
 
-    #[test]
-    fn decr_do_not_affect_ttl() {
+    #[tokio::test]
+    async fn decr_do_not_affect_ttl() {
         let c = create_connection();
-        let r = run_command(&c, &["decr", "foo"]);
+        let r = run_command(&c, &["decr", "foo"]).await;
         assert_eq!(Ok(Value::Integer(-1)), r);
 
-        let r = run_command(&c, &["expire", "foo", "60"]);
+        let r = run_command(&c, &["expire", "foo", "60"]).await;
         assert_eq!(Ok(Value::Integer(1)), r);
 
-        let r = run_command(&c, &["ttl", "foo"]);
+        let r = run_command(&c, &["ttl", "foo"]).await;
         assert_eq!(Ok(Value::Integer(59)), r);
 
-        let r = run_command(&c, &["decr", "foo"]);
+        let r = run_command(&c, &["decr", "foo"]).await;
         assert_eq!(Ok(Value::Integer(-2)), r);
 
-        let r = run_command(&c, &["ttl", "foo"]);
+        let r = run_command(&c, &["ttl", "foo"]).await;
         assert_eq!(Ok(Value::Integer(59)), r);
     }
 
-    #[test]
-    fn get_and_set() {
+    #[tokio::test]
+    async fn get_and_set() {
         let c = create_connection();
-        let x = run_command(&c, &["set", "foo", "bar"]);
+        let x = run_command(&c, &["set", "foo", "bar"]).await;
         assert_eq!(Ok(Value::OK), x);
 
-        let x = run_command(&c, &["get", "foo"]);
+        let x = run_command(&c, &["get", "foo"]).await;
         assert_eq!(Ok(Value::Blob("bar".into())), x);
     }
 
-    #[test]
-    fn getdel() {
+    #[tokio::test]
+    async fn getdel() {
         let c = create_connection();
-        let x = run_command(&c, &["set", "foo", "bar"]);
+        let x = run_command(&c, &["set", "foo", "bar"]).await;
         assert_eq!(Ok(Value::OK), x);
 
         assert_eq!(
             Ok(Value::Blob("bar".into())),
-            run_command(&c, &["getdel", "foo"])
+            run_command(&c, &["getdel", "foo"]).await
         );
 
-        assert_eq!(Ok(Value::Null), run_command(&c, &["get", "foo"]));
+        assert_eq!(Ok(Value::Null), run_command(&c, &["get", "foo"]).await);
     }
 
-    #[test]
-    fn getset() {
+    #[tokio::test]
+    async fn getset() {
         let c = create_connection();
-        let x = run_command(&c, &["set", "foo", "bar"]);
+        let x = run_command(&c, &["set", "foo", "bar"]).await;
         assert_eq!(Ok(Value::OK), x);
 
         assert_eq!(
             Ok(Value::Blob("bar".into())),
-            run_command(&c, &["getset", "foo", "1"])
+            run_command(&c, &["getset", "foo", "1"]).await
         );
 
         assert_eq!(
             Ok(Value::Blob("1".into())),
-            run_command(&c, &["get", "foo"])
+            run_command(&c, &["get", "foo"]).await
         );
     }
 
-    #[test]
-    fn strlen() {
+    #[tokio::test]
+    async fn strlen() {
         let c = create_connection();
-        let x = run_command(&c, &["set", "foo", "bar"]);
+        let x = run_command(&c, &["set", "foo", "bar"]).await;
         assert_eq!(Ok(Value::OK), x);
 
-        let x = run_command(&c, &["strlen", "foo"]);
+        let x = run_command(&c, &["strlen", "foo"]).await;
         assert_eq!(Ok(Value::Integer(3)), x);
 
-        let x = run_command(&c, &["strlen", "foxxo"]);
+        let x = run_command(&c, &["strlen", "foxxo"]).await;
         assert_eq!(Ok(Value::Integer(0)), x);
     }
 
-    #[test]
-    fn wrong_type() {
+    #[tokio::test]
+    async fn wrong_type() {
         let c = create_connection();
-        let _ = run_command(&c, &["hset", "xxx", "key", "foo"]);
-        let _ = run_command(&c, &["incr", "foo"]);
+        let _ = run_command(&c, &["hset", "xxx", "key", "foo"]).await;
+        let _ = run_command(&c, &["incr", "foo"]).await;
 
-        let x = run_command(&c, &["strlen", "xxx"]);
+        let x = run_command(&c, &["strlen", "xxx"]).await;
         assert_eq!(Ok(Error::WrongType.into()), x);
 
-        let x = run_command(&c, &["get", "xxx"]);
+        let x = run_command(&c, &["get", "xxx"]).await;
         assert_eq!(Ok(Error::WrongType.into()), x);
 
-        let x = run_command(&c, &["get", "xxx"]);
+        let x = run_command(&c, &["get", "xxx"]).await;
         assert_eq!(Ok(Error::WrongType.into()), x);
 
-        let x = run_command(&c, &["mget", "xxx", "foo"]);
+        let x = run_command(&c, &["mget", "xxx", "foo"]).await;
         assert_eq!(
             Ok(Value::Array(vec![Value::Null, Value::Blob("1".into()),])),
             x

+ 17 - 2
src/dispatcher.rs

@@ -4,7 +4,7 @@ use std::convert::TryInto;
 use std::time::SystemTime;
 use std::time::UNIX_EPOCH;
 
-fn do_time(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
+async fn do_time(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
     let now = SystemTime::now();
     let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
     let seconds = format!("{}", since_the_epoch.as_secs());
@@ -13,7 +13,7 @@ fn do_time(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
     Ok(vec![seconds.as_str(), millis.as_str()].into())
 }
 
-fn do_command(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
+async fn do_command(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
     let now = SystemTime::now();
     let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
     let in_ms: i128 =
@@ -23,6 +23,16 @@ fn do_command(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
 
 dispatcher! {
     list {
+        blpop {
+            cmd::list::blpop,
+            [""],
+            -3,
+        },
+        brpop {
+            cmd::list::brpop,
+            [""],
+            -3,
+        },
         llen {
             cmd::list::llen,
             [""],
@@ -48,6 +58,11 @@ dispatcher! {
             [""],
             4,
         },
+        rpop {
+            cmd::list::rpop,
+            [""],
+            -2,
+        },
         rpush {
             cmd::list::rpush,
             [""],

+ 7 - 3
src/macros.rs

@@ -13,6 +13,7 @@ macro_rules! dispatcher {
             #[allow(non_snake_case, non_camel_case_types)]
             pub mod $command {
                 use super::*;
+                use async_trait::async_trait;
 
                 pub struct Command {
                     pub tags: &'static [&'static str],
@@ -28,9 +29,10 @@ macro_rules! dispatcher {
                     }
                 }
 
+                #[async_trait]
                 impl ExecutableCommand for Command {
-                    fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-                        $handler(conn, args)
+                    async fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+                        $handler(conn, args).await
                     }
 
                     fn check_number_args(&self, n: usize) -> bool {
@@ -53,9 +55,11 @@ macro_rules! dispatcher {
             }
         )+)+
         use std::ops::Deref;
+        use async_trait::async_trait;
 
+        #[async_trait]
         pub trait ExecutableCommand {
-            fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error>;
+            async fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error>;
 
             fn check_number_args(&self, n: usize) -> bool;
 

+ 1 - 0
src/server.rs

@@ -78,6 +78,7 @@ pub async fn serve(addr: String) -> Result<(), Box<dyn Error>> {
                                     let r = handler
                                         .deref()
                                         .execute(&conn, &args)
+                                        .await
                                         .unwrap_or_else(|x| x.into());
                                     if transport.send(r).await.is_err() {
                                         break;

+ 3 - 3
src/value/mod.rs

@@ -36,7 +36,7 @@ impl From<&Value> for Vec<u8> {
                     let b: Vec<u8> = i.into();
                     s.extend(b);
                 }
-                s.to_vec()
+                s
             }
             Value::Integer(x) => format!(":{}\r\n", x).into(),
             Value::BigInteger(x) => format!("({}\r\n", x).into(),
@@ -96,7 +96,7 @@ impl From<Value> for Vec<u8> {
 impl<'a> From<&ParsedValue<'a>> for Value {
     fn from(value: &ParsedValue) -> Self {
         match value {
-            ParsedValue::String(x) => Self::String(x.to_string()),
+            ParsedValue::String(x) => Self::String((*x).to_string()),
             ParsedValue::Blob(x) => Self::Blob(Bytes::copy_from_slice(*x)),
             ParsedValue::Array(x) => {
                 Self::Array(x.iter().map(|x| Value::try_from(x).unwrap()).collect())
@@ -105,7 +105,7 @@ impl<'a> From<&ParsedValue<'a>> for Value {
             ParsedValue::BigInteger(x) => Self::BigInteger(*x),
             ParsedValue::Integer(x) => Self::Integer(*x),
             ParsedValue::Float(x) => Self::Float(*x),
-            ParsedValue::Error(x, y) => Self::Err(x.to_string(), y.to_string()),
+            ParsedValue::Error(x, y) => Self::Err((*x).to_string(), (*y).to_string()),
             ParsedValue::Null => Self::Null,
         }
     }