Browse Source

Add support for code coverage (#56)

* Add support for code coverage

* Adding more test coverage

* Minor code improvements

Replaced an abscure macro for a generic Into

* Add more tests
César D. Rodas 2 years ago
parent
commit
7870922f60

+ 38 - 7
.github/workflows/ci.yml

@@ -10,6 +10,18 @@ env:
   CARGO_TERM_COLOR: always
 
 jobs:
+  rust-test:
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/checkout@v3
+    - name: prepare
+      run: rustup component add clippy
+    - name: fmt
+      run: make fmt
+    - name: clippy
+      run: make clippy
+    - name: unit test
+      run: make unit-test
 
   test-ubuntu-latest:
     runs-on: ubuntu-latest
@@ -19,12 +31,31 @@ jobs:
       run: |
         rustup component add clippy
         sudo apt-get install tcl8.6 tclx
-    - name: Fmt
-      run: make fmt
-    - name: Clippy
-      run: make clippy
-    - name: Unit test
-      run: make unit-test
-    - name: Test
+    - name: test
       run: make test
 
+  check-code-coverage:
+    name: Code coverage
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v2
+
+      - name: Install stable toolchain
+        uses: actions-rs/toolchain@v1
+        with:
+          toolchain: stable
+          override: true
+
+      - name: Run cargo-tarpaulin
+        uses: actions-rs/tarpaulin@v0.1
+        with:
+          version: '0.15.0'
+          args: '--all -v'
+          out-type: 'Html'
+
+      - name: Archive code coverage results
+        uses: actions/upload-artifact@v1
+        with:
+          name: code-coverage-report
+          path: tarpaulin-report.html

+ 7 - 0
Cargo.lock

@@ -604,6 +604,7 @@ dependencies = [
  "metered",
  "num-traits",
  "parking_lot 0.11.2",
+ "paste",
  "rand",
  "redis-config-parser",
  "redis-zero-protocol-parser",
@@ -741,6 +742,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "paste"
+version = "1.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0c520e05135d6e763148b6426a837e239041653ba7becd2e538c076c738025fc"
+
+[[package]]
 name = "pin-project-lite"
 version = "0.2.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"

+ 1 - 0
Cargo.toml

@@ -34,6 +34,7 @@ thiserror = "1.0.30"
 strum = "0.24"
 strum_macros = "0.24"
 num-traits = "0.2.15"
+paste = "1.0.7"
 
 [workspace]
 members = ["redis-config-parser"]

+ 129 - 2
src/cmd/client.rs

@@ -3,7 +3,6 @@
 use crate::{
     connection::{Connection, ConnectionStatus, UnblockReason},
     error::Error,
-    option,
     value::{bytes_to_int, bytes_to_number, Value},
 };
 use bytes::Bytes;
@@ -34,7 +33,7 @@ pub async fn client(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     match sub.to_lowercase().as_str() {
         "id" => Ok((conn.id() as i64).into()),
         "info" => Ok(conn.to_string().into()),
-        "getname" => Ok(option!(conn.name())),
+        "getname" => Ok(conn.name().into()),
         "list" => {
             let mut list_client = "".to_owned();
             conn.all_connections()
@@ -126,6 +125,7 @@ mod test {
         error::Error,
         value::Value,
     };
+    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 
     #[tokio::test]
     async fn select() {
@@ -167,4 +167,131 @@ mod test {
             run_command(&c, &["select", "10000000"]).await
         );
     }
+
+    #[tokio::test]
+    async fn client_wrong_args() {
+        let c = create_connection();
+        assert_eq!(
+            Err(Error::WrongArgument("client".to_owned(), "ID".to_owned())),
+            run_command(&c, &["client", "id", "xxx"]).await
+        );
+        assert_eq!(
+            Err(Error::WrongArgument("client".to_owned(), "XXX".to_owned())),
+            run_command(&c, &["client", "xxx"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn client_id() {
+        let c = create_connection();
+        assert_eq!(Ok(1.into()), run_command(&c, &["client", "id"]).await);
+        assert_eq!(
+            Ok("id=1 addr=127.0.0.1:8080 name=None db=0\r\n".into()),
+            run_command(&c, &["client", "info"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn client_set_name() {
+        let c = create_connection();
+        assert_eq!(
+            Ok(Value::Null),
+            run_command(&c, &["client", "getname"]).await
+        );
+        assert_eq!(
+            Ok(Value::Ok),
+            run_command(&c, &["client", "setname", "test"]).await
+        );
+        assert_eq!(
+            Ok("test".into()),
+            run_command(&c, &["client", "getname"]).await
+        );
+        assert_eq!(Ok(1.into()), run_command(&c, &["client", "id"]).await);
+    }
+
+    #[tokio::test]
+    async fn client_unblock_1() {
+        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
+        let c1 = create_connection();
+        let (mut c2_recv, c2) = c1.all_connections().new_connection(c1.db(), addr);
+
+        // unblock, will fail because c2 is not blocked
+        assert_eq!(
+            Ok(0.into()),
+            run_command(&c1, &["client", "unblock", "2", "error"]).await,
+        );
+
+        // block c2
+        c2.block();
+
+        // unblock c2 and return an error
+        assert_eq!(
+            Ok(1.into()),
+            run_command(&c1, &["client", "unblock", "2", "error"]).await,
+        );
+        assert!(!c2.is_blocked());
+
+        // read from c2 the error
+        assert_eq!(
+            Some(Value::Err(
+                "UNBLOCKED".into(),
+                "client unblocked via CLIENT UNBLOCK".into()
+            )),
+            c2_recv.recv().await
+        );
+    }
+
+    #[tokio::test]
+    async fn client_unblock_2() {
+        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
+        let c1 = create_connection();
+        let (mut c2_recv, c2) = c1.all_connections().new_connection(c1.db(), addr);
+        // block c2
+        c2.block();
+
+        // unblock c2 and return an null
+        assert_eq!(
+            Ok(1.into()),
+            run_command(&c1, &["client", "unblock", "2", "timeout"]).await,
+        );
+        assert!(!c2.is_blocked());
+
+        // read from c2 the error
+        assert_eq!(Some(Value::Null), c2_recv.recv().await);
+    }
+
+    #[tokio::test]
+    async fn client_unblock_3() {
+        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
+        let c1 = create_connection();
+        let (mut c2_recv, c2) = c1.all_connections().new_connection(c1.db(), addr);
+        // block c2
+        c2.block();
+
+        // unblock c2 and return an null
+        assert_eq!(
+            Ok(1.into()),
+            run_command(&c1, &["client", "unblock", "2"]).await,
+        );
+        assert!(!c2.is_blocked());
+
+        // read from c2 the error
+        assert_eq!(Some(Value::Null), c2_recv.recv().await);
+    }
+
+    #[tokio::test]
+    async fn client_unblock_4() {
+        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
+        let c1 = create_connection();
+        let (mut c2_recv, c2) = c1.all_connections().new_connection(c1.db(), addr);
+        // block c2
+        c2.block();
+
+        // unblock c2 and return an null
+        assert_eq!(
+            Err(Error::Syntax),
+            run_command(&c1, &["client", "unblock", "2", "wrong-arg"]).await,
+        );
+        assert!(c2.is_blocked());
+    }
 }

+ 91 - 1
src/cmd/hash.rs

@@ -382,7 +382,7 @@ pub async fn hvals(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 #[cfg(test)]
 mod test {
     use crate::{
-        cmd::test::{create_connection, run_command},
+        cmd::test::{create_connection, invalid_type, run_command},
         value::Value,
     };
 
@@ -518,4 +518,94 @@ mod test {
         let r = run_command(&c, &["hvals", "foo"]).await;
         assert_eq!(Ok(Value::Array(vec![Value::Blob("1".into()),])), r);
     }
+
+    #[tokio::test]
+    async fn hdel_remove_empty_hash() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(2)),
+            run_command(&c, &["hset", "foo", "f1", "1", "f2", "1"]).await
+        );
+
+        assert_eq!(Ok(1.into()), run_command(&c, &["hdel", "foo", "f1",]).await);
+        assert_eq!(
+            Ok(Value::Integer(-1)),
+            run_command(&c, &["ttl", "foo"]).await
+        );
+        assert_eq!(Ok(1.into()), run_command(&c, &["hdel", "foo", "f2",]).await);
+        assert_eq!(
+            Ok(Value::Integer(-2)),
+            run_command(&c, &["ttl", "foo"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn hincrby() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["hincrby", "foo", "f1", "1"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(-9)),
+            run_command(&c, &["hincrby", "foo", "f1", "-10"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Blob("-9".into())),
+            run_command(&c, &["hget", "foo", "f1"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn hsetnx() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["hsetnx", "foo", "xxx", "1"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(0)),
+            run_command(&c, &["hsetnx", "foo", "xxx", "1"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["hsetnx", "foo", "bar", "1"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(2)),
+            run_command(&c, &["hlen", "foo"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn hlen_non_existing() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Integer(0)),
+            run_command(&c, &["hlen", "foo"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn invalid_types() {
+        invalid_type(&["hdel", "key", "bar", "1"]).await;
+        invalid_type(&["hexists", "key", "bar"]).await;
+        invalid_type(&["hget", "key", "bar"]).await;
+        invalid_type(&["hgetall", "key"]).await;
+        invalid_type(&["hincrby", "key", "bar", "1"]).await;
+        invalid_type(&["hincrbyfloat", "key", "bar", "1"]).await;
+        invalid_type(&["hkeys", "key"]).await;
+        invalid_type(&["hlen", "key"]).await;
+        invalid_type(&["hstrlen", "key", "foo"]).await;
+        invalid_type(&["hmget", "key", "1", "2"]).await;
+        invalid_type(&["hrandfield", "key"]).await;
+        invalid_type(&["hset", "key", "bar", "1"]).await;
+        invalid_type(&["hsetnx", "key", "bar", "1"]).await;
+        invalid_type(&["hvals", "key"]).await;
+    }
 }

+ 42 - 0
src/cmd/key.rs

@@ -563,6 +563,48 @@ mod test {
     }
 
     #[tokio::test]
+    async fn scan_with_type_1() {
+        let c = create_connection();
+        for i in (1..100) {
+            assert_eq!(
+                Ok(1.into()),
+                run_command(&c, &["incr", &format!("foo-{}", i)]).await
+            );
+        }
+
+        let r: Vec<Value> = run_command(&c, &["scan", "0", "type", "hash"])
+            .await
+            .unwrap()
+            .try_into()
+            .unwrap();
+        let values: Vec<Value> = r[1].clone().try_into().unwrap();
+
+        assert_eq!(2, r.len());
+        assert_eq!(0, values.len());
+    }
+
+    #[tokio::test]
+    async fn scan_with_type_2() {
+        let c = create_connection();
+        for i in (1..100) {
+            assert_eq!(
+                Ok(1.into()),
+                run_command(&c, &["incr", &format!("foo-{}", i)]).await
+            );
+        }
+
+        let r: Vec<Value> = run_command(&c, &["scan", "0", "type", "!hash"])
+            .await
+            .unwrap()
+            .try_into()
+            .unwrap();
+        let values: Vec<Value> = r[1].clone().try_into().unwrap();
+
+        assert_eq!(2, r.len());
+        assert_eq!(10, values.len());
+    }
+
+    #[tokio::test]
     async fn scan_with_count() {
         let c = create_connection();
         for i in (1..100) {

+ 21 - 0
src/cmd/mod.rs

@@ -56,6 +56,12 @@ mod test {
         all_connections.new_connection(default_db, client)
     }
 
+    pub async fn invalid_type(cmd: &[&str]) {
+        let c = create_connection();
+        let _ = run_command(&c, &["set", "key", "test"]).await;
+        assert_eq!(Err(Error::WrongType), run_command(&c, cmd).await);
+    }
+
     pub fn create_new_connection_from_connection(
         conn: &Connection,
     ) -> (Receiver<Value>, Arc<Connection>) {
@@ -75,4 +81,19 @@ mod test {
         let dispatcher = Dispatcher::new();
         dispatcher.execute(conn, &args).await
     }
+
+    #[tokio::test]
+    async fn total_connections() {
+        let c = create_connection();
+        let all_connections = c.all_connections();
+        assert_eq!(1, all_connections.total_connections());
+        let client = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
+        let c2 = c.all_connections().new_connection(c.db(), client).1;
+        c2.block();
+        assert_eq!(2, all_connections.total_connections());
+        assert_eq!(1, all_connections.total_blocked_connections());
+        c2.destroy();
+        assert_eq!(1, all_connections.total_connections());
+        assert_eq!(0, all_connections.total_blocked_connections());
+    }
 }

+ 82 - 1
src/cmd/pubsub.rs

@@ -85,10 +85,13 @@ pub async fn unsubscribe(conn: &Connection, args: &[Bytes]) -> Result<Value, Err
 mod test {
     use crate::{
         cmd::test::{
-            create_connection_and_pubsub, create_new_connection_from_connection, run_command,
+            create_connection, create_connection_and_pubsub, create_new_connection_from_connection,
+            run_command,
         },
+        error::Error,
         value::Value,
     };
+    use std::convert::TryInto;
     use tokio::sync::mpsc::Receiver;
 
     async fn test_subscription_confirmation_and_first_message(
@@ -177,6 +180,49 @@ mod test {
     }
 
     #[tokio::test]
+    async fn test_unsubscribe_with_no_args() {
+        let (mut recv, c1) = create_connection_and_pubsub();
+
+        assert_eq!(
+            Ok(Value::Ignore),
+            run_command(&c1, &["subscribe", "foo", "bar"]).await
+        );
+
+        assert_eq!(Ok(Value::Ignore), run_command(&c1, &["unsubscribe"]).await);
+
+        assert_eq!(
+            Some(Value::Array(vec![
+                "subscribe".into(),
+                "foo".into(),
+                1.into()
+            ])),
+            recv.recv().await
+        );
+
+        assert_eq!(
+            Some(Value::Array(vec![
+                "subscribe".into(),
+                "bar".into(),
+                2.into()
+            ])),
+            recv.recv().await
+        );
+
+        let x: Vec<Vec<Value>> = vec![
+            recv.recv().await.unwrap().try_into().unwrap(),
+            recv.recv().await.unwrap().try_into().unwrap(),
+        ];
+
+        assert_eq!(Value::Blob("unsubscribe".into()), x[0][0]);
+        assert_eq!(Value::Integer(1), x[0][2]);
+        assert_eq!(Value::Blob("unsubscribe".into()), x[1][0]);
+        assert_eq!(Value::Integer(0), x[1][2]);
+
+        assert!(x[0][1] == "foo".into() || x[0][1] == "bar".into());
+        assert!(x[1][1] == "foo".into() || x[1][1] == "bar".into());
+    }
+
+    #[tokio::test]
     async fn test_unsubscribe_with_args() {
         let (mut recv, c1) = create_connection_and_pubsub();
 
@@ -241,6 +287,14 @@ mod test {
             Ok(Value::Ignore),
             run_command(&c2, &["subscribe", "foo"]).await
         );
+        assert_eq!(
+            Ok(Value::Array(vec!["foo".into(), 2.into()])),
+            run_command(&c2, &["pubsub", "numsub", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Array(vec!["foo".into()])),
+            run_command(&c2, &["pubsub", "channels"]).await
+        );
 
         let msg = "foo - message";
 
@@ -251,6 +305,15 @@ mod test {
     }
 
     #[tokio::test]
+    async fn pubsub_not_found() {
+        let c1 = create_connection();
+        assert_eq!(
+            Err(Error::SubCommandNotFound("foo".into(), "pubsub".into())),
+            run_command(&c1, &["pubsub", "foo"]).await
+        );
+    }
+
+    #[tokio::test]
     async fn pubsub_numpat() {
         let (_, c1) = create_connection_and_pubsub();
         let (_, c2) = create_new_connection_from_connection(&c1);
@@ -266,5 +329,23 @@ mod test {
             Ok(Value::Integer(3)),
             run_command(&c1, &["pubsub", "numpat"]).await
         );
+
+        let _ = run_command(&c2, &["punsubscribe", "barx*"]).await;
+        assert_eq!(
+            Ok(Value::Integer(3)),
+            run_command(&c2, &["pubsub", "numpat"]).await
+        );
+
+        let _ = run_command(&c2, &["punsubscribe", "bar*"]).await;
+        assert_eq!(
+            Ok(Value::Integer(2)),
+            run_command(&c2, &["pubsub", "numpat"]).await
+        );
+
+        let _ = run_command(&c2, &["punsubscribe"]).await;
+        assert_eq!(
+            Ok(Value::Integer(0)),
+            run_command(&c2, &["pubsub", "numpat"]).await
+        );
     }
 }

+ 156 - 0
src/cmd/server.rs

@@ -134,3 +134,159 @@ pub async fn time(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
 pub async fn quit(_: &Connection, _: &[Bytes]) -> Result<Value, Error> {
     Err(Error::Quit)
 }
+
+#[cfg(test)]
+mod test {
+    use crate::{
+        cmd::test::{create_connection, run_command},
+        error::Error,
+        value::Value,
+    };
+
+    #[tokio::test]
+    async fn digest() {
+        let c = create_connection();
+        let _ = run_command(&c, &["hset", "foo0", "f1", "1", "f2", "2", "f3", "3"]).await;
+        let _ = run_command(&c, &["set", "foo1", "f1"]).await;
+        let _ = run_command(&c, &["rpush", "foo2", "f1"]).await;
+        let _ = run_command(&c, &["sadd", "foo3", "f1"]).await;
+        assert_eq!(
+            Ok(Value::Array(vec![
+                "30c7a6a3e846cda0ec6bec93bbcead474c8b735d81d6b13043e8e7bd1287465b".into(),
+                "c9c7eecf5cc340e36731787d8844a5b166d9611718fc12f0fa6501f711aad8a5".into(),
+                "30c7a6a3e846cda0ec6bec93bbcead474c8b735d81d6b13043e8e7bd1287465b".into(),
+                "30c7a6a3e846cda0ec6bec93bbcead474c8b735d81d6b13043e8e7bd1287465b".into(),
+            ])),
+            run_command(
+                &c,
+                &["debug", "digest-value", "foo0", "foo1", "foo2", "foo3"]
+            )
+            .await
+        );
+    }
+
+    #[tokio::test]
+    async fn debug() {
+        let c = create_connection();
+        let _ = run_command(&c, &["hset", "foo0", "f1", "1", "f2", "2", "f3", "3"]).await;
+        let _ = run_command(&c, &["set", "foo1", "f1"]).await;
+        let _ = run_command(&c, &["rpush", "foo2", "f1"]).await;
+        let _ = run_command(&c, &["sadd", "foo3", "f1"]).await;
+        match run_command(&c, &["debug", "object", "foo0"]).await {
+            Ok(Value::Blob(s)) => {
+                let s = String::from_utf8_lossy(&s);
+                assert!(s.contains("hashtable"))
+            }
+            _ => panic!("Unxpected response"),
+        };
+        match run_command(&c, &["debug", "object", "foo1"]).await {
+            Ok(Value::Blob(s)) => {
+                let s = String::from_utf8_lossy(&s);
+                assert!(s.contains("embstr"));
+            }
+            _ => panic!("Unxpected response"),
+        };
+        match run_command(&c, &["debug", "object", "foo2"]).await {
+            Ok(Value::Blob(s)) => {
+                let s = String::from_utf8_lossy(&s);
+                assert!(s.contains("linkedlist"));
+            }
+            _ => panic!("Unxpected response"),
+        };
+        match run_command(&c, &["debug", "object", "foo3"]).await {
+            Ok(Value::Blob(s)) => {
+                let s = String::from_utf8_lossy(&s);
+                assert!(s.contains("hashtable"));
+            }
+            _ => panic!("Unxpected response"),
+        };
+    }
+
+    #[tokio::test]
+    async fn command_info() {
+        let c = create_connection();
+        assert_eq!(
+            Ok(Value::Array(vec![Value::Array(vec![
+                "CLIENT".into(),
+                Value::Integer(-2),
+                Value::Array(vec![
+                    "admin".into(),
+                    "noscript".into(),
+                    "random".into(),
+                    "loading".into(),
+                    "stale".into(),
+                ]),
+                0.into(),
+                0.into(),
+                0.into(),
+            ])])),
+            run_command(&c, &["command", "info", "client"]).await,
+        );
+        assert_eq!(
+            Ok(Value::Array(vec![Value::Array(vec![
+                "QUIT".into(),
+                1.into(),
+                Value::Array(vec![
+                    "random".into(),
+                    "loading".into(),
+                    "stale".into(),
+                    "fast".into()
+                ]),
+                0.into(),
+                0.into(),
+                0.into(),
+            ])])),
+            run_command(&c, &["command", "info", "quit"]).await,
+        );
+    }
+
+    #[tokio::test]
+    async fn flush() {
+        let c = create_connection();
+        let _ = run_command(&c, &["hset", "foo0", "f1", "1", "f2", "2", "f3", "3"]).await;
+        let _ = run_command(&c, &["set", "foo1", "f1"]).await;
+        let _ = run_command(&c, &["rpush", "foo2", "f1"]).await;
+        let _ = run_command(&c, &["sadd", "foo3", "f1"]).await;
+        assert_eq!(Ok(Value::Integer(4)), run_command(&c, &["dbsize"]).await);
+        let _ = run_command(&c, &["flushdb"]).await;
+        assert_eq!(Ok(Value::Integer(0)), run_command(&c, &["dbsize"]).await);
+    }
+
+    #[tokio::test]
+    async fn flushall() {
+        let c = create_connection();
+        let _ = run_command(&c, &["hset", "foo0", "f1", "1", "f2", "2", "f3", "3"]).await;
+        let _ = run_command(&c, &["set", "foo1", "f1"]).await;
+        let _ = run_command(&c, &["rpush", "foo2", "f1"]).await;
+        let _ = run_command(&c, &["sadd", "foo3", "f1"]).await;
+        assert_eq!(Ok(Value::Integer(4)), run_command(&c, &["dbsize"]).await);
+        let _ = run_command(&c, &["select", "3"]).await;
+        let _ = run_command(&c, &["flushall"]).await;
+        let _ = run_command(&c, &["select", "0"]).await;
+        assert_eq!(Ok(Value::Integer(0)), run_command(&c, &["dbsize"]).await);
+    }
+
+    #[tokio::test]
+    async fn get_keys_1() {
+        let c = create_connection();
+        assert_eq!(
+            Ok(Value::Array(vec!["foo0".into()])),
+            run_command(
+                &c,
+                &["command", "getkeys", "hset", "foo0", "f1", "1", "f2", "2", "f3", "3"]
+            )
+            .await
+        );
+    }
+    #[tokio::test]
+    async fn get_keys_2() {
+        let c = create_connection();
+        assert_eq!(
+            Err(Error::SubCommandNotFound(
+                "getkeys".to_owned(),
+                "command".to_owned()
+            )),
+            run_command(&c, &["command", "getkeys"]).await
+        );
+    }
+}

+ 9 - 0
src/cmd/string.rs

@@ -747,6 +747,15 @@ mod test {
     }
 
     #[tokio::test]
+    async fn test_set_px() {
+        let c = create_connection();
+        assert_eq!(
+            Ok(Value::Ok),
+            run_command(&c, &["set", "foo", "20", "px", "1234"]).await,
+        );
+    }
+
+    #[tokio::test]
     async fn test_invalid_ts() {
         let c = create_connection();
         assert_eq!(

+ 60 - 0
src/config.rs

@@ -89,3 +89,63 @@ pub async fn parse(path: String) -> Result<Config, Error> {
     let content = tokio::fs::read(path).await?;
     Ok(from_slice(&content)?)
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use redis_config_parser::de::from_str;
+
+    #[test]
+    fn parse() {
+        let config = "always-show-logo yes
+notify-keyspace-events KEA
+daemonize no
+pidfile /var/run/redis.pid
+port 21111
+timeout 0
+bind 127.0.0.1
+loglevel verbose
+logfile ''
+databases 16
+latency-monitor-threshold 1
+save 60 10000
+rdbcompression yes
+dbfilename dump.rdb
+dir ./tests/tmp/server.43948.1
+slave-serve-stale-data yes
+appendonly no
+appendfsync everysec
+no-appendfsync-on-rewrite no
+activerehashing yes
+unixsocket /Users/crodas/projects/rust/microredis/tests/tmp/server.43948.1/socket
+";
+
+        let config: Config = from_str(&config).unwrap();
+        assert!(!config.daemonize);
+        assert_eq!(21111, config.port);
+        assert_eq!(vec!["127.0.0.1"], config.bind);
+        assert_eq!(vec!["127.0.0.1:21111"], config.get_tcp_hostnames());
+        assert_eq!(LogLevel::Debug, config.log.level);
+        assert_eq!(Some("".to_owned()), config.log.file);
+        assert_eq!(16, config.databases);
+        assert_eq!(
+            Some(
+                "/Users/crodas/projects/rust/microredis/tests/tmp/server.43948.1/socket".to_owned()
+            ),
+            config.unixsocket
+        );
+    }
+
+    #[test]
+    fn test_default_config() {
+        let config = Config::default();
+        assert!(!config.daemonize);
+        assert_eq!(6379, config.port);
+        assert_eq!(vec!["127.0.0.1"], config.bind);
+        assert_eq!(vec!["127.0.0.1:6379"], config.get_tcp_hostnames());
+        assert_eq!(LogLevel::Debug, config.log.level);
+        assert_eq!(None, config.log.file);
+        assert_eq!(16, config.databases);
+        assert_eq!(None, config.unixsocket);
+    }
+}

+ 1 - 1
src/connection/connections.rs

@@ -46,7 +46,7 @@ impl Connections {
     }
 
     /// Removes a connection from the connections
-    pub fn remove(self: Arc<Connections>, conn: Arc<Connection>) {
+    pub fn remove(self: &Arc<Connections>, conn: Arc<Connection>) {
         let id = conn.id();
         self.connections.write().remove(&id);
     }

+ 3 - 3
src/connection/mod.rs

@@ -78,7 +78,7 @@ impl ConnectionInfo {
             current_db: 0,
             tx_keys: HashSet::new(),
             commands: None,
-            status: ConnectionStatus::Normal,
+            status: ConnectionStatus::default(),
             blocked_notification: None,
             is_blocked: false,
             block_id: 0,
@@ -208,7 +208,7 @@ impl Connection {
                 info.commands = None;
                 info.watch_keys.clear();
                 info.tx_keys.clear();
-                info.status = ConnectionStatus::Normal;
+                info.status = ConnectionStatus::default();
 
                 Ok(Value::Ok)
             }
@@ -238,7 +238,7 @@ impl Connection {
     /// Resets the current connection.
     pub fn reset(&self) {
         let mut info = self.info.write();
-        info.status = ConnectionStatus::Normal;
+        info.status = ConnectionStatus::default();
         info.name = None;
         info.watch_keys = vec![];
         info.commands = None;

+ 38 - 0
src/db/mod.rs

@@ -1309,4 +1309,42 @@ mod test {
         assert_eq!(2, result.result.len());
         assert_eq!("0", result.cursor.to_string());
     }
+
+    #[tokio::test]
+    async fn lock_keys() {
+        let db1 = Arc::new(Db::new(100));
+        let db2 = db1.clone().new_db_instance(2);
+        let db3 = db1.clone().new_db_instance(3);
+        let shared = Arc::new(RwLock::new(1));
+        let shared1 = shared.clone();
+        let shared2 = shared.clone();
+        let shared3 = shared.clone();
+        tokio::join!(
+            tokio::spawn(async move {
+                db1.lock_keys(&["test".into()]);
+                let mut x = shared1.write();
+                *x = 2;
+                thread::sleep(Duration::from_secs(1));
+                db1.unlock_keys(&["test".into()]);
+            }),
+            tokio::spawn(async move {
+                db2.lock_keys(&["test".into(), "bar".into()]);
+                let mut x = shared2.write();
+                if *x == 2 {
+                    *x = 5;
+                }
+                thread::sleep(Duration::from_secs(2));
+                db2.unlock_keys(&["test".into(), "bar".into()]);
+            }),
+            tokio::spawn(async move {
+                thread::sleep(Duration::from_millis(500));
+                assert_eq!(4, db3.get_slot(&"test".into()));
+                let mut x = shared3.write();
+                if *x == 5 {
+                    *x = 6;
+                }
+            }),
+        );
+        assert_eq!(6, *shared.read());
+    }
 }

+ 43 - 2
src/db/utils.rs

@@ -1,4 +1,5 @@
 use crate::error::Error;
+use bytes::Bytes;
 use std::convert::TryFrom;
 use tokio::time::{Duration, Instant};
 
@@ -46,10 +47,10 @@ pub struct ExpirationOpts {
     pub LT: bool,
 }
 
-impl TryFrom<&[bytes::Bytes]> for ExpirationOpts {
+impl TryFrom<&[Bytes]> for ExpirationOpts {
     type Error = Error;
 
-    fn try_from(args: &[bytes::Bytes]) -> Result<Self, Self::Error> {
+    fn try_from(args: &[Bytes]) -> Result<Self, Self::Error> {
         let mut expiration_opts = Self::default();
         for arg in args.iter() {
             match String::from_utf8_lossy(arg).to_uppercase().as_str() {
@@ -63,3 +64,43 @@ impl TryFrom<&[bytes::Bytes]> for ExpirationOpts {
         Ok(expiration_opts)
     }
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use std::convert::TryInto;
+
+    #[test]
+    fn parsing_expiration_1() {
+        let opts = vec![
+            Bytes::copy_from_slice(b"nx"),
+            Bytes::copy_from_slice(b"Xx"),
+            Bytes::copy_from_slice(b"GT"),
+            Bytes::copy_from_slice(b"lT"),
+        ];
+        let x: ExpirationOpts = opts.as_slice().try_into().unwrap();
+        assert!(x.NX);
+        assert!(x.XX);
+        assert!(x.GT);
+        assert!(x.LT);
+    }
+
+    #[test]
+    fn parsing_expiration_2() {
+        let opts = vec![Bytes::copy_from_slice(b"nx")];
+        let x: ExpirationOpts = opts.as_slice().try_into().unwrap();
+
+        assert!(x.NX);
+        assert!(!x.XX);
+        assert!(!x.GT);
+        assert!(!x.LT);
+    }
+
+    #[test]
+    fn parsing_expiration_3() {
+        let opts = vec![Bytes::copy_from_slice(b"xxx")];
+        let x: Result<ExpirationOpts, _> = opts.as_slice().try_into();
+
+        assert!(x.is_err());
+    }
+}

+ 0 - 12
src/macros.rs

@@ -203,18 +203,6 @@ macro_rules! value_vec_try_from {
     }
 }
 
-/// Converts an Option<T> to Value. If the option is None Value::Null is returned.
-#[macro_export]
-macro_rules! option {
-    {$type: expr} => {
-        if let Some(val) = $type {
-            val.into()
-        } else {
-            Value::Null
-        }
-    }
-}
-
 /// Check if a given command argument in a position $pos is eq to a $command
 #[macro_export]
 macro_rules! check_arg {

+ 0 - 9
src/value/expiration.rs

@@ -53,15 +53,6 @@ impl Expiration {
             command: command.to_string(),
         })
     }
-
-    /// Fails if the timestamp is negative
-    pub fn must_be_positive(&self) -> Result<(), Error> {
-        if self.is_negative {
-            Err(Error::InvalidExpire(self.command.to_string()))
-        } else {
-            Ok(())
-        }
-    }
 }
 
 impl TryInto<Duration> for Expiration {

+ 115 - 3
src/value/mod.rs

@@ -148,6 +148,13 @@ impl From<&Value> for Vec<u8> {
             }
             Value::Err(x, y) => format!("-{} {}\r\n", x, y).into(),
             Value::String(x) => format!("+{}\r\n", x).into(),
+            Value::Boolean(x) => {
+                if *x {
+                    "#t\r\n".into()
+                } else {
+                    "#f\r\n".into()
+                }
+            }
             Value::Queued => "+QUEUED\r\n".into(),
             Value::Ok => "+OK\r\n".into(),
             _ => b"-WRONGTYPE Operation against a key holding the wrong kind of value\r\n".to_vec(),
@@ -233,9 +240,9 @@ impl From<Value> for Vec<u8> {
     }
 }
 
-impl From<Option<&Bytes>> for Value {
-    fn from(v: Option<&Bytes>) -> Self {
-        if let Some(v) = v {
+impl<T: Into<Value>> From<Option<T>> for Value {
+    fn from(value: Option<T>) -> Self {
+        if let Some(v) = value {
             v.into()
         } else {
             Value::Null
@@ -297,3 +304,108 @@ impl TryInto<Vec<Value>> for Value {
         }
     }
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use paste::paste;
+
+    macro_rules! serialize_deserialize {
+        ($name:ty, $x:expr, $str:expr) => {
+            paste! {
+                #[test]
+                fn [<serialize_and_deserialize_ $name>]() {
+                    let raw_bytes: Vec<u8> = $x.into();
+                    let parsed: ParsedValue = redis_zero_protocol_parser::parse(&raw_bytes).unwrap().1;
+                    assert_eq!(Value::String($str.to_owned()), (&parsed).into());
+                }
+            }
+        };
+        ($name:ty, $x:expr) => {
+            paste! {
+                #[test]
+                fn [<serialize_and_deserialize_ $name>]() {
+                    let raw_bytes: Vec<u8> = $x.into();
+                    let parsed: ParsedValue = redis_zero_protocol_parser::parse(&raw_bytes).unwrap().1;
+                    assert_eq!($x, (&parsed).into());
+                }
+            }
+        };
+    }
+
+    macro_rules! try_into {
+        ($name:ty, $x:expr, $ty:ty, $expected:expr) => {
+            paste! {
+                #[test]
+                fn [<try_into_ $ty _ $name>]() {
+                    let val: Result<$ty, _> = (&$x).try_into();
+                    assert_eq!(val, $expected);
+                }
+            }
+        };
+    }
+
+    serialize_deserialize!(null, Value::Null);
+    serialize_deserialize!(blob, Value::Blob("test".into()));
+    serialize_deserialize!(int, Value::Integer(1.into()));
+    serialize_deserialize!(bigint, Value::BigInteger(1.into()));
+    serialize_deserialize!(_true, Value::Boolean(true));
+    serialize_deserialize!(_false, Value::Boolean(false));
+    serialize_deserialize!(float, Value::Float(1.2));
+    serialize_deserialize!(string, Value::String("test".into()));
+    serialize_deserialize!(array, Value::Array(vec!["test".into(), Value::Float(1.2)]));
+    serialize_deserialize!(err, Value::Err("foo".to_owned(), "bar".to_owned()));
+    serialize_deserialize!(queued, Value::Queued, "QUEUED");
+    serialize_deserialize!(ok, Value::Ok, "OK");
+    try_into!(biginteger, Value::BigInteger(1), i64, Ok(1));
+    try_into!(integer, Value::Integer(2), i64, Ok(2));
+    try_into!(blob, Value::Blob("3".into()), i64, Ok(3));
+    try_into!(string, Value::String("4".into()), i64, Ok(4));
+    try_into!(ok, Value::Ok, i64, Err(Error::NotANumber));
+    try_into!(
+        string_1,
+        Value::String("foo".into()),
+        i64,
+        Err(Error::NotANumber)
+    );
+    try_into!(float, Value::Float(2.1), f64, Ok(2.1));
+    try_into!(blob, Value::Blob("3.1".into()), f64, Ok(3.1));
+    try_into!(string, Value::String("4.1".into()), f64, Ok(4.1));
+    try_into!(ok, Value::Ok, f64, Err(Error::NotANumber));
+    try_into!(
+        string_1,
+        Value::String("foo".into()),
+        f64,
+        Err(Error::NotANumber)
+    );
+
+    #[test]
+    fn debug() {
+        let x = Value::Null;
+        assert_eq!(Value::Blob("Value at:0x6000004a8840 refcount:1 encoding:embstr serializedlength:5 lru:13421257 lru_seconds_idle:367".into()), x.debug().into());
+    }
+
+    #[test]
+    fn test_try_into_array() {
+        let x: Result<Vec<Value>, _> = Value::Null.try_into();
+        assert_eq!(Err(Error::Internal), x);
+    }
+
+    #[test]
+    fn serialize_none() {
+        let x: Option<Bytes> = None;
+        assert_eq!(Value::Null, x.as_ref().into());
+    }
+
+    #[test]
+    fn serialize_bytes() {
+        let x: Option<Bytes> = Some("test".into());
+        assert_eq!(Value::Blob("test".into()), x.as_ref().into());
+    }
+
+    #[test]
+    fn test_is_err() {
+        assert!(Value::Err("foo".to_owned(), "bar".to_owned()).is_err());
+        assert!(!Value::Null.is_err());
+    }
+}

+ 1 - 0
tests/unit/pubsub.tcl

@@ -65,6 +65,7 @@ start_server {tags {"pubsub network"}} {
         set rd1 [redis_deferring_client]
         assert_equal {1 2 3} [subscribe $rd1 {chan1 chan2 chan3}]
         unsubscribe $rd1
+        after 10
         assert_equal 0 [r publish chan1 hello]
         assert_equal 0 [r publish chan2 hello]
         assert_equal 0 [r publish chan3 hello]