Ver código fonte

Add support for Transactions (#9)

* Introducing Entry::Version

Entry::Version is the nano-second of the last update (or creation) of
the entry. This is needed to keep track of key changes for transaction
support.

When reading values through `get_map_or` be sure to bump the version
(calling `db.bump_version`) outside of the callbacks to avoid deadlocks.

Check the documentation for WATCH/UNWATCH/EXEC commands.

* Added support for `watch` and `unwatch`

This command will keep a list of watched keys and their current version.
The `unwatch` command will clean up that list.

The list will be used by `multi`/`exec` to skip a transaction if the
watched keys changed after being watched.

* Added code to extract keys from commands

The idea is to keep track of which keys are passed in each command, this
will help keep track of which keys should be locked exclusively when a
transaction is executed.

* Added function to keep track of keys

* Adding code to start, exec and discard transactions

* Remove deref() call

* Added support for multi/exec.

This implementation will not lock the entire database to process one
transaction at a time, as the original implementation of Redis does,
instead the list of keys affected in the transaction are locked
exclusively by the transaction. Any other connection trying to read or
update locked keys will have to wait in-line.

This implementation is multi-threaded and async I/O, that is why I don't
want to lock-up the entire database exclusively for one transaction,
instead I'm trying to keep the same behaviour as Redis does by locking
all keys involved exclusively.

TODO:
* [ ] Maybe create the loop/await async, instead of blocking the current
      thread.
César D. Rodas 3 anos atrás
pai
commit
d39001273c
15 arquivos alterados com 1022 adições e 102 exclusões
  1. 1 1
      src/cmd/client.rs
  2. 26 8
      src/cmd/hash.rs
  3. 57 25
      src/cmd/list.rs
  4. 1 0
      src/cmd/mod.rs
  5. 25 8
      src/cmd/set.rs
  6. 4 4
      src/cmd/string.rs
  7. 203 0
      src/cmd/transaction.rs
  8. 150 18
      src/connection.rs
  9. 26 2
      src/db/entry.rs
  10. 116 17
      src/db/mod.rs
  11. 356 14
      src/dispatcher.rs
  12. 6 0
      src/error.rs
  13. 46 1
      src/macros.rs
  14. 1 2
      src/server.rs
  15. 4 2
      src/value/mod.rs

+ 1 - 1
src/cmd/client.rs

@@ -30,7 +30,7 @@ pub async fn client(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
         "setname" => {
         "setname" => {
             let name = unsafe { std::str::from_utf8_unchecked(&args[2]) }.to_string();
             let name = unsafe { std::str::from_utf8_unchecked(&args[2]) }.to_string();
             conn.set_name(name);
             conn.set_name(name);
-            Ok(Value::OK)
+            Ok(Value::Ok)
         }
         }
         _ => Err(Error::WrongArgument(
         _ => Err(Error::WrongArgument(
             "client".to_owned(),
             "client".to_owned(),

+ 26 - 8
src/cmd/hash.rs

@@ -11,7 +11,7 @@ use std::{
 };
 };
 
 
 pub async fn hdel(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn hdel(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    conn.db().get_map_or(
+    let result = conn.db().get_map_or(
         &args[1],
         &args[1],
         |v| match v {
         |v| match v {
             Value::Hash(h) => {
             Value::Hash(h) => {
@@ -29,7 +29,11 @@ pub async fn hdel(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             _ => Err(Error::WrongType),
             _ => Err(Error::WrongType),
         },
         },
         || Ok(0.into()),
         || Ok(0.into()),
-    )
+    )?;
+
+    conn.db().bump_version(&args[1]);
+
+    Ok(result)
 }
 }
 
 
 pub async fn hexists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn hexists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
@@ -88,7 +92,7 @@ pub async fn hincrby<
     conn: &Connection,
     conn: &Connection,
     args: &[Bytes],
     args: &[Bytes],
 ) -> Result<Value, Error> {
 ) -> Result<Value, Error> {
-    conn.db().get_map_or(
+    let result = conn.db().get_map_or(
         &args[1],
         &args[1],
         |v| match v {
         |v| match v {
             Value::Hash(h) => {
             Value::Hash(h) => {
@@ -112,7 +116,11 @@ pub async fn hincrby<
             conn.db().set(&args[1], h.into(), None);
             conn.db().set(&args[1], h.into(), None);
             Ok(incr_by.into())
             Ok(incr_by.into())
         },
         },
-    )
+    )?;
+
+    conn.db().bump_version(&args[1]);
+
+    Ok(result)
 }
 }
 
 
 pub async fn hkeys(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn hkeys(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
@@ -241,7 +249,7 @@ pub async fn hset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     if args.len() % 2 == 1 {
     if args.len() % 2 == 1 {
         return Err(Error::InvalidArgsCount("hset".to_owned()));
         return Err(Error::InvalidArgsCount("hset".to_owned()));
     }
     }
-    conn.db().get_map_or(
+    let result = conn.db().get_map_or(
         &args[1],
         &args[1],
         |v| match v {
         |v| match v {
             Value::Hash(h) => {
             Value::Hash(h) => {
@@ -266,11 +274,15 @@ pub async fn hset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             conn.db().set(&args[1], h.into(), None);
             conn.db().set(&args[1], h.into(), None);
             Ok(len.into())
             Ok(len.into())
         },
         },
-    )
+    )?;
+
+    conn.db().bump_version(&args[1]);
+
+    Ok(result)
 }
 }
 
 
 pub async fn hsetnx(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn hsetnx(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    conn.db().get_map_or(
+    let result = conn.db().get_map_or(
         &args[1],
         &args[1],
         |v| match v {
         |v| match v {
             Value::Hash(h) => {
             Value::Hash(h) => {
@@ -295,7 +307,13 @@ pub async fn hsetnx(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             conn.db().set(&args[1], h.into(), None);
             conn.db().set(&args[1], h.into(), None);
             Ok(len.into())
             Ok(len.into())
         },
         },
-    )
+    )?;
+
+    if result == Value::Integer(1) {
+        conn.db().bump_version(&args[1]);
+    }
+
+    Ok(result)
 }
 }
 
 
 pub async fn hstrlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn hstrlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {

+ 57 - 25
src/cmd/list.rs

@@ -13,7 +13,7 @@ fn remove_element(
     count: usize,
     count: usize,
     front: bool,
     front: bool,
 ) -> Result<Value, Error> {
 ) -> Result<Value, Error> {
-    conn.db().get_map_or(
+    let result = conn.db().get_map_or(
         key,
         key,
         |v| match v {
         |v| match v {
             Value::List(x) => {
             Value::List(x) => {
@@ -50,7 +50,11 @@ fn remove_element(
             _ => Err(Error::WrongType),
             _ => Err(Error::WrongType),
         },
         },
         || Ok(Value::Null),
         || Ok(Value::Null),
-    )
+    )?;
+
+    conn.db().bump_version(key);
+
+    Ok(result)
 }
 }
 
 
 pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
@@ -65,7 +69,7 @@ pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             };
             };
         }
         }
 
 
-        if Instant::now() >= timeout {
+        if Instant::now() >= timeout || conn.is_executing_transaction() {
             break;
             break;
         }
         }
 
 
@@ -87,7 +91,7 @@ pub async fn brpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             };
             };
         }
         }
 
 
-        if Instant::now() >= timeout {
+        if Instant::now() >= timeout || conn.is_executing_transaction() {
             break;
             break;
         }
         }
 
 
@@ -127,7 +131,7 @@ pub async fn linsert(conn: &Connection, args: &[Bytes]) -> Result<Value, Error>
         return Err(Error::Syntax);
         return Err(Error::Syntax);
     };
     };
 
 
-    conn.db().get_map_or(
+    let result = conn.db().get_map_or(
         &args[1],
         &args[1],
         |v| match v {
         |v| match v {
             Value::List(x) => {
             Value::List(x) => {
@@ -161,7 +165,11 @@ pub async fn linsert(conn: &Connection, args: &[Bytes]) -> Result<Value, Error>
             _ => Err(Error::WrongType),
             _ => Err(Error::WrongType),
         },
         },
         || Ok(0.into()),
         || Ok(0.into()),
-    )
+    )?;
+
+    conn.db().bump_version(&args[1]);
+
+    Ok(result)
 }
 }
 
 
 pub async fn llen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn llen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
@@ -192,7 +200,7 @@ pub async fn lmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
         return Err(Error::Syntax);
         return Err(Error::Syntax);
     };
     };
 
 
-    conn.db().get_map_or(
+    let result = conn.db().get_map_or(
         &args[1],
         &args[1],
         |v| match v {
         |v| match v {
             Value::List(source) => conn.db().get_map_or(
             Value::List(source) => conn.db().get_map_or(
@@ -240,7 +248,11 @@ pub async fn lmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             _ => Err(Error::WrongType),
             _ => Err(Error::WrongType),
         },
         },
         || Ok(Value::Null),
         || Ok(Value::Null),
-    )
+    )?;
+
+    conn.db().bump_version(&args[1]);
+
+    Ok(result)
 }
 }
 
 
 pub async fn lpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn lpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
@@ -330,7 +342,7 @@ pub async fn lpos(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn lpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn lpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let is_push_x = check_arg!(args, 0, "LPUSHX");
     let is_push_x = check_arg!(args, 0, "LPUSHX");
 
 
-    conn.db().get_map_or(
+    let result = conn.db().get_map_or(
         &args[1],
         &args[1],
         |v| match v {
         |v| match v {
             Value::List(x) => {
             Value::List(x) => {
@@ -356,7 +368,11 @@ pub async fn lpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             conn.db().set(&args[1], h.into(), None);
             conn.db().set(&args[1], h.into(), None);
             Ok(len.into())
             Ok(len.into())
         },
         },
-    )
+    )?;
+
+    conn.db().bump_version(&args[1]);
+
+    Ok(result)
 }
 }
 
 
 pub async fn lrange(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn lrange(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
@@ -391,7 +407,7 @@ pub async fn lrange(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 }
 }
 
 
 pub async fn lrem(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn lrem(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    conn.db().get_map_or(
+    let result = conn.db().get_map_or(
         &args[1],
         &args[1],
         |v| match v {
         |v| match v {
             Value::List(x) => {
             Value::List(x) => {
@@ -434,11 +450,15 @@ pub async fn lrem(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             _ => Err(Error::WrongType),
             _ => Err(Error::WrongType),
         },
         },
         || Ok(0.into()),
         || Ok(0.into()),
-    )
+    )?;
+
+    conn.db().bump_version(&args[1]);
+
+    Ok(result)
 }
 }
 
 
 pub async fn lset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn lset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    conn.db().get_map_or(
+    let result = conn.db().get_map_or(
         &args[1],
         &args[1],
         |v| match v {
         |v| match v {
             Value::List(x) => {
             Value::List(x) => {
@@ -451,7 +471,7 @@ pub async fn lset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 
 
                 if let Some(x) = x.get_mut(index as usize) {
                 if let Some(x) = x.get_mut(index as usize) {
                     *x = checksum::Value::new(args[3].clone());
                     *x = checksum::Value::new(args[3].clone());
-                    Ok(Value::OK)
+                    Ok(Value::Ok)
                 } else {
                 } else {
                     Err(Error::OutOfRange)
                     Err(Error::OutOfRange)
                 }
                 }
@@ -459,11 +479,15 @@ pub async fn lset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             _ => Err(Error::WrongType),
             _ => Err(Error::WrongType),
         },
         },
         || Err(Error::NotFound),
         || Err(Error::NotFound),
-    )
+    )?;
+
+    conn.db().bump_version(&args[1]);
+
+    Ok(result)
 }
 }
 
 
 pub async fn ltrim(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn ltrim(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    conn.db().get_map_or(
+    let result = conn.db().get_map_or(
         &args[1],
         &args[1],
         |v| match v {
         |v| match v {
             Value::List(x) => {
             Value::List(x) => {
@@ -486,12 +510,16 @@ pub async fn ltrim(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
                     retain
                     retain
                 });
                 });
 
 
-                Ok(Value::OK)
+                Ok(Value::Ok)
             }
             }
             _ => Err(Error::WrongType),
             _ => Err(Error::WrongType),
         },
         },
-        || Ok(Value::OK),
-    )
+        || Ok(Value::Ok),
+    )?;
+
+    conn.db().bump_version(&args[1]);
+
+    Ok(result)
 }
 }
 
 
 pub async fn rpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn rpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
@@ -521,7 +549,7 @@ pub async fn rpoplpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error
 pub async fn rpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn rpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let is_push_x = check_arg!(args, 0, "RPUSHX");
     let is_push_x = check_arg!(args, 0, "RPUSHX");
 
 
-    conn.db().get_map_or(
+    let result = conn.db().get_map_or(
         &args[1],
         &args[1],
         |v| match v {
         |v| match v {
             Value::List(x) => {
             Value::List(x) => {
@@ -547,7 +575,11 @@ pub async fn rpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             conn.db().set(&args[1], h.into(), None);
             conn.db().set(&args[1], h.into(), None);
             Ok(len.into())
             Ok(len.into())
         },
         },
-    )
+    )?;
+
+    conn.db().bump_version(&args[1]);
+
+    Ok(result)
 }
 }
 
 
 #[cfg(test)]
 #[cfg(test)]
@@ -1210,17 +1242,17 @@ mod test {
         );
         );
 
 
         assert_eq!(
         assert_eq!(
-            Ok(Value::OK),
+            Ok(Value::Ok),
             run_command(&c, &["lset", "foo", "-1", "6"]).await,
             run_command(&c, &["lset", "foo", "-1", "6"]).await,
         );
         );
 
 
         assert_eq!(
         assert_eq!(
-            Ok(Value::OK),
+            Ok(Value::Ok),
             run_command(&c, &["lset", "foo", "-2", "7"]).await,
             run_command(&c, &["lset", "foo", "-2", "7"]).await,
         );
         );
 
 
         assert_eq!(
         assert_eq!(
-            Ok(Value::OK),
+            Ok(Value::Ok),
             run_command(&c, &["lset", "foo", "0", "8"]).await,
             run_command(&c, &["lset", "foo", "0", "8"]).await,
         );
         );
 
 
@@ -1265,7 +1297,7 @@ mod test {
         );
         );
 
 
         assert_eq!(
         assert_eq!(
-            Ok(Value::OK),
+            Ok(Value::Ok),
             run_command(&c, &["ltrim", "foo", "1", "-2"]).await
             run_command(&c, &["ltrim", "foo", "1", "-2"]).await
         );
         );
 
 

+ 1 - 0
src/cmd/mod.rs

@@ -4,6 +4,7 @@ pub mod key;
 pub mod list;
 pub mod list;
 pub mod set;
 pub mod set;
 pub mod string;
 pub mod string;
+pub mod transaction;
 
 
 #[cfg(test)]
 #[cfg(test)]
 mod test {
 mod test {

+ 25 - 8
src/cmd/set.rs

@@ -62,7 +62,7 @@ where
 }
 }
 
 
 pub async fn sadd(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn sadd(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    conn.db().get_map_or(
+    let result = conn.db().get_map_or(
         &args[1],
         &args[1],
         |v| match v {
         |v| match v {
             Value::Set(x) => {
             Value::Set(x) => {
@@ -95,7 +95,11 @@ pub async fn sadd(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 
 
             Ok(len.into())
             Ok(len.into())
         },
         },
-    )
+    )?;
+
+    conn.db().bump_version(&args[1]);
+
+    Ok(result)
 }
 }
 
 
 pub async fn scard(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn scard(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
@@ -212,7 +216,7 @@ pub async fn smismember(conn: &Connection, args: &[Bytes]) -> Result<Value, Erro
 }
 }
 
 
 pub async fn smove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn smove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    conn.db().get_map_or(
+    let result = conn.db().get_map_or(
         &args[1],
         &args[1],
         |v| match v {
         |v| match v {
             Value::Set(set1) => {
             Value::Set(set1) => {
@@ -247,12 +251,17 @@ pub async fn smove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             _ => Err(Error::WrongType),
             _ => Err(Error::WrongType),
         },
         },
         || Ok(0.into()),
         || Ok(0.into()),
-    )
+    )?;
+
+    conn.db().bump_version(&args[1]);
+    conn.db().bump_version(&args[3]);
+
+    Ok(result)
 }
 }
 
 
 pub async fn spop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn spop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let rand = srandmember(conn, args).await?;
     let rand = srandmember(conn, args).await?;
-    conn.db().get_map_or(
+    let result = conn.db().get_map_or(
         &args[1],
         &args[1],
         |v| match v {
         |v| match v {
             Value::Set(x) => {
             Value::Set(x) => {
@@ -275,7 +284,11 @@ pub async fn spop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             _ => Err(Error::WrongType),
             _ => Err(Error::WrongType),
         },
         },
         || Ok(0.into()),
         || Ok(0.into()),
-    )
+    )?;
+
+    conn.db().bump_version(&args[1]);
+
+    Ok(result)
 }
 }
 
 
 pub async fn srandmember(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn srandmember(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
@@ -312,7 +325,7 @@ pub async fn srandmember(conn: &Connection, args: &[Bytes]) -> Result<Value, Err
 }
 }
 
 
 pub async fn srem(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn srem(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    conn.db().get_map_or(
+    let result = conn.db().get_map_or(
         &args[1],
         &args[1],
         |v| match v {
         |v| match v {
             Value::Set(x) => {
             Value::Set(x) => {
@@ -330,7 +343,11 @@ pub async fn srem(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             _ => Err(Error::WrongType),
             _ => Err(Error::WrongType),
         },
         },
         || Ok(0.into()),
         || Ok(0.into()),
-    )
+    )?;
+
+    conn.db().bump_version(&args[1]);
+
+    Ok(result)
 }
 }
 
 
 pub async fn sunion(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub async fn sunion(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {

+ 4 - 4
src/cmd/string.rs

@@ -144,7 +144,7 @@ mod test {
     async fn get_and_set() {
     async fn get_and_set() {
         let c = create_connection();
         let c = create_connection();
         let x = run_command(&c, &["set", "foo", "bar"]).await;
         let x = run_command(&c, &["set", "foo", "bar"]).await;
-        assert_eq!(Ok(Value::OK), x);
+        assert_eq!(Ok(Value::Ok), x);
 
 
         let x = run_command(&c, &["get", "foo"]).await;
         let x = run_command(&c, &["get", "foo"]).await;
         assert_eq!(Ok(Value::Blob("bar".into())), x);
         assert_eq!(Ok(Value::Blob("bar".into())), x);
@@ -154,7 +154,7 @@ mod test {
     async fn getdel() {
     async fn getdel() {
         let c = create_connection();
         let c = create_connection();
         let x = run_command(&c, &["set", "foo", "bar"]).await;
         let x = run_command(&c, &["set", "foo", "bar"]).await;
-        assert_eq!(Ok(Value::OK), x);
+        assert_eq!(Ok(Value::Ok), x);
 
 
         assert_eq!(
         assert_eq!(
             Ok(Value::Blob("bar".into())),
             Ok(Value::Blob("bar".into())),
@@ -168,7 +168,7 @@ mod test {
     async fn getset() {
     async fn getset() {
         let c = create_connection();
         let c = create_connection();
         let x = run_command(&c, &["set", "foo", "bar"]).await;
         let x = run_command(&c, &["set", "foo", "bar"]).await;
-        assert_eq!(Ok(Value::OK), x);
+        assert_eq!(Ok(Value::Ok), x);
 
 
         assert_eq!(
         assert_eq!(
             Ok(Value::Blob("bar".into())),
             Ok(Value::Blob("bar".into())),
@@ -185,7 +185,7 @@ mod test {
     async fn strlen() {
     async fn strlen() {
         let c = create_connection();
         let c = create_connection();
         let x = run_command(&c, &["set", "foo", "bar"]).await;
         let x = run_command(&c, &["set", "foo", "bar"]).await;
-        assert_eq!(Ok(Value::OK), x);
+        assert_eq!(Ok(Value::Ok), x);
 
 
         let x = run_command(&c, &["strlen", "foo"]).await;
         let x = run_command(&c, &["strlen", "foo"]).await;
         assert_eq!(Ok(Value::Integer(3)), x);
         assert_eq!(Ok(Value::Integer(3)), x);

+ 203 - 0
src/cmd/transaction.rs

@@ -0,0 +1,203 @@
+use crate::{connection::Connection, dispatcher::Dispatcher, error::Error, value::Value};
+use bytes::Bytes;
+
+pub async fn discard(conn: &Connection, _: &[Bytes]) -> Result<Value, Error> {
+    conn.stop_transaction()
+}
+
+pub async fn multi(conn: &Connection, _: &[Bytes]) -> Result<Value, Error> {
+    conn.start_transaction()
+}
+
+pub async fn exec(conn: &Connection, _: &[Bytes]) -> Result<Value, Error> {
+    if !conn.in_transaction() {
+        return Err(Error::NotInTx);
+    }
+
+    if conn.did_keys_change() {
+        let _ = conn.stop_transaction();
+        return Ok(Value::Null);
+    }
+
+    let db = conn.db();
+    let locked_keys = conn.get_tx_keys();
+
+    conn.start_executing_transaction();
+
+    db.lock_keys(&locked_keys);
+
+    let mut results = vec![];
+
+    if let Some(commands) = conn.get_queue_commands() {
+        for args in commands.iter() {
+            let result = match Dispatcher::new(args) {
+                Ok(handler) => handler
+                    .execute(conn, args)
+                    .await
+                    .unwrap_or_else(|x| x.into()),
+                Err(err) => err.into(),
+            };
+            results.push(result);
+        }
+    }
+
+    db.unlock_keys(&locked_keys);
+    let _ = conn.stop_transaction();
+
+    Ok(results.into())
+}
+
+pub async fn watch(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    conn.watch_key(
+        &(&args[1..])
+            .iter()
+            .map(|key| (key, conn.db().get_version(key)))
+            .collect::<Vec<(&Bytes, u128)>>(),
+    );
+    Ok(Value::Ok)
+}
+
+pub async fn unwatch(conn: &Connection, _: &[Bytes]) -> Result<Value, Error> {
+    conn.discard_watched_keys();
+    Ok(Value::Ok)
+}
+
+#[cfg(test)]
+mod test {
+    use crate::dispatcher::Dispatcher;
+    use crate::{
+        cmd::test::{create_connection, run_command},
+        error::Error,
+        value::Value,
+    };
+    use bytes::Bytes;
+
+    #[tokio::test]
+    async fn test_exec() {
+        let c = create_connection();
+
+        assert_eq!(Ok(Value::Ok), run_command(&c, &["multi"]).await);
+        assert_eq!(Ok(Value::Queued), run_command(&c, &["get", "foo"]).await);
+        assert_eq!(
+            Ok(Value::Queued),
+            run_command(&c, &["set", "foo", "foo"]).await
+        );
+        assert_eq!(Ok(Value::Queued), run_command(&c, &["get", "foo"]).await);
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Null,
+                Value::Ok,
+                Value::Blob("foo".into()),
+            ])),
+            run_command(&c, &["exec"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn test_nested_multi() {
+        let c = create_connection();
+
+        assert_eq!(Ok(Value::Ok), run_command(&c, &["multi"]).await);
+        assert_eq!(Err(Error::NestedTx), run_command(&c, &["multi"]).await);
+        assert_eq!(Ok(Value::Queued), run_command(&c, &["get", "foo"]).await);
+        assert_eq!(
+            Ok(Value::Queued),
+            run_command(&c, &["set", "foo", "foo"]).await
+        );
+        assert_eq!(Ok(Value::Queued), run_command(&c, &["get", "foo"]).await);
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Null,
+                Value::Ok,
+                Value::Blob("foo".into()),
+            ])),
+            run_command(&c, &["exec"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn test_discard() {
+        let c = create_connection();
+
+        assert_eq!(Ok(Value::Ok), run_command(&c, &["multi"]).await);
+        assert_eq!(Ok(Value::Queued), run_command(&c, &["get", "foo"]).await);
+        assert_eq!(
+            Ok(Value::Queued),
+            run_command(&c, &["set", "foo", "foo"]).await
+        );
+        assert_eq!(Ok(Value::Queued), run_command(&c, &["get", "foo"]).await);
+        assert_eq!(Ok(Value::Ok), run_command(&c, &["discard"]).await);
+        assert_eq!(
+            Err(Error::NotInTx),
+            run_command(&c, &["exec"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn test_exec_watch_changes() {
+        let c = create_connection();
+
+        assert_eq!(
+            Ok(Value::Ok),
+            run_command(&c, &["watch", "foo", "bar"]).await
+        );
+        assert_eq!(Ok(Value::Ok), run_command(&c, &["set", "foo", "bar"]).await);
+        assert_eq!(Ok(Value::Ok), run_command(&c, &["multi"]).await);
+        assert_eq!(Ok(Value::Queued), run_command(&c, &["get", "foo"]).await);
+        assert_eq!(
+            Ok(Value::Queued),
+            run_command(&c, &["set", "foo", "foo"]).await
+        );
+        assert_eq!(Ok(Value::Queued), run_command(&c, &["get", "foo"]).await);
+        assert_eq!(Ok(Value::Null), run_command(&c, &["exec"]).await);
+    }
+
+    #[test]
+    fn test_extract_keys() {
+        assert_eq!(vec!["foo"], get_keys(&["get", "foo"]));
+        assert_eq!(vec!["foo"], get_keys(&["set", "foo", "bar"]));
+        assert_eq!(vec!["foo", "bar"], get_keys(&["mget", "foo", "bar"]));
+        assert_eq!(
+            vec!["key", "key1", "key2"],
+            get_keys(&["SINTERSTORE", "key", "key1", "key2"])
+        );
+    }
+
+    #[tokio::test]
+    async fn test_exec_brpop_not_waiting() {
+        let c = create_connection();
+
+        assert_eq!(Ok(Value::Ok), run_command(&c, &["multi"]).await);
+        assert_eq!(Ok(Value::Queued), run_command(&c, &["brpop", "foo", "1000"]).await);
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Null,
+            ])),
+            run_command(&c, &["exec"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn test_exec_blpop_not_waiting() {
+        let c = create_connection();
+
+        assert_eq!(Ok(Value::Ok), run_command(&c, &["multi"]).await);
+        assert_eq!(Ok(Value::Queued), run_command(&c, &["blpop", "foo", "1000"]).await);
+        assert_eq!(
+            Ok(Value::Array(vec![
+                Value::Null,
+            ])),
+            run_command(&c, &["exec"]).await
+        );
+    }
+
+    fn get_keys(args: &[&str]) -> Vec<Bytes> {
+        let args: Vec<Bytes> = args.iter().map(|s| Bytes::from(s.to_string())).collect();
+        Dispatcher::new(&args)
+            .unwrap()
+            .get_keys(&args)
+            .iter()
+            .map(|k| (*k).clone())
+            .collect()
+    }
+}

+ 150 - 18
src/connection.rs

@@ -1,13 +1,37 @@
-use crate::db::Db;
-use std::collections::BTreeMap;
-use std::net::SocketAddr;
-use std::sync::{Arc, RwLock};
+use crate::{db::Db, error::Error, value::Value};
+use bytes::Bytes;
+use std::{
+    collections::{BTreeMap, HashSet},
+    net::SocketAddr,
+    sync::{Arc, RwLock},
+};
 
 
+#[derive(Debug)]
 pub struct Connections {
 pub struct Connections {
     connections: RwLock<BTreeMap<u128, Arc<Connection>>>,
     connections: RwLock<BTreeMap<u128, Arc<Connection>>>,
     counter: RwLock<u128>,
     counter: RwLock<u128>,
 }
 }
 
 
+#[derive(Debug)]
+pub struct ConnectionInfo {
+    pub name: Option<String>,
+    pub watch_keys: Vec<(Bytes, u128)>,
+    pub tx_keys: HashSet<Bytes>,
+    pub in_transaction: bool,
+    pub in_executing_transaction: bool,
+    pub commands: Option<Vec<Vec<Bytes>>>,
+}
+
+#[derive(Debug)]
+pub struct Connection {
+    id: u128,
+    db: Db,
+    current_db: u32,
+    connections: Arc<Connections>,
+    addr: SocketAddr,
+    info: RwLock<ConnectionInfo>,
+}
+
 impl Connections {
 impl Connections {
     pub fn new() -> Self {
     pub fn new() -> Self {
         Self {
         Self {
@@ -27,17 +51,18 @@ impl Connections {
         addr: SocketAddr,
         addr: SocketAddr,
     ) -> Arc<Connection> {
     ) -> Arc<Connection> {
         let mut id = self.counter.write().unwrap();
         let mut id = self.counter.write().unwrap();
+        *id += 1;
 
 
         let conn = Arc::new(Connection {
         let conn = Arc::new(Connection {
             id: *id,
             id: *id,
-            db,
+            db: db.new_db_instance(*id),
             addr,
             addr,
             connections: self.clone(),
             connections: self.clone(),
             current_db: 0,
             current_db: 0,
-            name: RwLock::new(None),
+            info: RwLock::new(ConnectionInfo::new()),
         });
         });
+
         self.connections.write().unwrap().insert(*id, conn.clone());
         self.connections.write().unwrap().insert(*id, conn.clone());
-        *id += 1;
         conn
         conn
     }
     }
 
 
@@ -48,13 +73,17 @@ impl Connections {
     }
     }
 }
 }
 
 
-pub struct Connection {
-    id: u128,
-    db: Arc<Db>,
-    current_db: u32,
-    connections: Arc<Connections>,
-    addr: SocketAddr,
-    name: RwLock<Option<String>>,
+impl ConnectionInfo {
+    fn new() -> Self {
+        Self {
+            name: None,
+            watch_keys: vec![],
+            tx_keys: HashSet::new(),
+            commands: None,
+            in_transaction: false,
+            in_executing_transaction: false,
+        }
+    }
 }
 }
 
 
 impl Connection {
 impl Connection {
@@ -66,6 +95,109 @@ impl Connection {
         self.id
         self.id
     }
     }
 
 
+    pub fn stop_transaction(&self) -> Result<Value, Error> {
+        let info = &mut self.info.write().unwrap();
+        if info.in_transaction {
+            info.commands = None;
+            info.watch_keys.clear();
+            info.tx_keys.clear();
+            info.in_transaction = false;
+            info.in_executing_transaction = true;
+
+            Ok(Value::Ok)
+        } else {
+            Err(Error::NotInTx)
+        }
+    }
+
+    pub fn start_transaction(&self) -> Result<Value, Error> {
+        let mut info = self.info.write().unwrap();
+        if !info.in_transaction {
+            info.in_transaction = true;
+            Ok(Value::Ok)
+        } else {
+            Err(Error::NestedTx)
+        }
+    }
+
+    /// We are inside a MULTI, most transactions are rather queued for later
+    /// execution instead of being executed right away.
+    pub fn in_transaction(&self) -> bool {
+        self.info.read().unwrap().in_transaction
+    }
+
+    /// The commands are being executed inside a transaction (by EXEC). It is
+    /// important to keep track of this because some commands change their
+    /// behaviour.
+    pub fn is_executing_transaction(&self) -> bool {
+        self.info.read().unwrap().in_executing_transaction
+    }
+
+    /// EXEC has been called and we need to keep track
+    pub fn start_executing_transaction(&self) {
+        let info = &mut self.info.write().unwrap();
+        info.in_executing_transaction = true;
+    }
+
+    pub fn watch_key(&self, keys: &[(&Bytes, u128)]) {
+        let watch_keys = &mut self.info.write().unwrap().watch_keys;
+        keys.iter()
+            .map(|(bytes, version)| {
+                watch_keys.push(((*bytes).clone(), *version));
+            })
+            .for_each(drop);
+    }
+
+    pub fn did_keys_change(&self) -> bool {
+        let watch_keys = &self.info.read().unwrap().watch_keys;
+
+        for key in watch_keys.iter() {
+            if self.db.get_version(&key.0) != key.1 {
+                return true;
+            }
+        }
+
+        false
+    }
+
+    pub fn discard_watched_keys(&self) {
+        let watch_keys = &mut self.info.write().unwrap().watch_keys;
+        watch_keys.clear();
+    }
+
+    pub fn get_tx_keys(&self) -> Vec<Bytes> {
+        self.info
+            .read()
+            .unwrap()
+            .tx_keys
+            .iter()
+            .cloned()
+            .collect::<Vec<Bytes>>()
+    }
+
+    pub fn queue_command(&self, args: &[Bytes]) {
+        let info = &mut self.info.write().unwrap();
+        let commands = info.commands.get_or_insert(vec![]);
+        commands.push(args.iter().map(|m| (*m).clone()).collect());
+    }
+
+    pub fn get_queue_commands(&self) -> Option<Vec<Vec<Bytes>>> {
+        let info = &mut self.info.write().unwrap();
+        info.watch_keys = vec![];
+        info.in_transaction = false;
+        info.commands.take()
+    }
+
+    pub fn tx_keys(&self, keys: Vec<&Bytes>) {
+        #[allow(clippy::mutable_key_type)]
+        let tx_keys = &mut self.info.write().unwrap().tx_keys;
+        keys.iter()
+            .map(|k| {
+                tx_keys.insert((*k).clone());
+            })
+            .for_each(drop);
+    }
+
     pub fn destroy(self: Arc<Connection>) {
     pub fn destroy(self: Arc<Connection>) {
         self.connections.clone().remove(self);
         self.connections.clone().remove(self);
     }
     }
@@ -75,12 +207,12 @@ impl Connection {
     }
     }
 
 
     pub fn name(&self) -> Option<String> {
     pub fn name(&self) -> Option<String> {
-        self.name.read().unwrap().clone()
+        self.info.read().unwrap().name.clone()
     }
     }
 
 
     pub fn set_name(&self, name: String) {
     pub fn set_name(&self, name: String) {
-        let mut r = self.name.write().unwrap();
-        *r = Some(name);
+        let mut r = self.info.write().unwrap();
+        r.name = Some(name);
     }
     }
 
 
     #[allow(dead_code)]
     #[allow(dead_code)]
@@ -93,7 +225,7 @@ impl Connection {
             "id={} addr={} name={:?} db={}\r\n",
             "id={} addr={} name={:?} db={}\r\n",
             self.id,
             self.id,
             self.addr,
             self.addr,
-            self.name.read().unwrap(),
+            self.info.read().unwrap().name,
             self.current_db
             self.current_db
         )
         )
     }
     }

+ 26 - 2
src/db/entry.rs

@@ -1,12 +1,21 @@
 use crate::{error::Error, value::Value};
 use crate::{error::Error, value::Value};
+use std::time::SystemTime;
 use tokio::time::Instant;
 use tokio::time::Instant;
 
 
 #[derive(Debug)]
 #[derive(Debug)]
 pub struct Entry {
 pub struct Entry {
     pub value: Value,
     pub value: Value,
+    pub version: u128,
     expires_at: Option<Instant>,
     expires_at: Option<Instant>,
 }
 }
 
 
+pub fn new_version() -> u128 {
+    SystemTime::now()
+        .duration_since(SystemTime::UNIX_EPOCH)
+        .expect("get millis error")
+        .as_nanos()
+}
+
 /// Database Entry
 /// Database Entry
 ///
 ///
 /// A database entry is a Value associated with an optional ttl.
 /// A database entry is a Value associated with an optional ttl.
@@ -16,7 +25,15 @@ pub struct Entry {
 /// so more frequently.
 /// so more frequently.
 impl Entry {
 impl Entry {
     pub fn new(value: Value, expires_at: Option<Instant>) -> Self {
     pub fn new(value: Value, expires_at: Option<Instant>) -> Self {
-        Self { value, expires_at }
+        Self {
+            value,
+            expires_at,
+            version: new_version(),
+        }
+    }
+
+    pub fn bump_version(&mut self) {
+        self.version = new_version();
     }
     }
 
 
     pub fn persist(&mut self) {
     pub fn persist(&mut self) {
@@ -33,16 +50,23 @@ impl Entry {
 
 
     pub fn set_ttl(&mut self, expires_at: Instant) {
     pub fn set_ttl(&mut self, expires_at: Instant) {
         self.expires_at = Some(expires_at);
         self.expires_at = Some(expires_at);
+        self.version = new_version();
+    }
+
+    pub fn version(&self) -> u128 {
+        self.version
     }
     }
 
 
     /// Changes the value that is wrapped in this entry, the TTL (expired_at) is
     /// Changes the value that is wrapped in this entry, the TTL (expired_at) is
     /// not affected.
     /// not affected.
     pub fn change_value(&mut self, value: Value) {
     pub fn change_value(&mut self, value: Value) {
         self.value = value;
         self.value = value;
+        self.version = new_version();
     }
     }
 
 
     #[allow(dead_code)]
     #[allow(dead_code)]
     pub fn get_mut(&mut self) -> &mut Value {
     pub fn get_mut(&mut self) -> &mut Value {
+        self.version = new_version();
         &mut self.value
         &mut self.value
     }
     }
 
 
@@ -71,7 +95,7 @@ impl Entry {
                 | Value::Float(_)
                 | Value::Float(_)
                 | Value::String(_)
                 | Value::String(_)
                 | Value::Null
                 | Value::Null
-                | Value::OK
+                | Value::Ok
         )
         )
     }
     }
 
 

+ 116 - 17
src/db/mod.rs

@@ -3,7 +3,7 @@ mod expiration;
 
 
 use crate::{error::Error, value::Value};
 use crate::{error::Error, value::Value};
 use bytes::Bytes;
 use bytes::Bytes;
-use entry::Entry;
+use entry::{new_version, Entry};
 use expiration::ExpirationDb;
 use expiration::ExpirationDb;
 use log::trace;
 use log::trace;
 use seahash::hash;
 use seahash::hash;
@@ -11,7 +11,8 @@ use std::{
     collections::HashMap,
     collections::HashMap,
     convert::{TryFrom, TryInto},
     convert::{TryFrom, TryInto},
     ops::AddAssign,
     ops::AddAssign,
-    sync::{Mutex, RwLock},
+    sync::{Arc, Mutex, RwLock},
+    thread,
 };
 };
 use tokio::time::{Duration, Instant};
 use tokio::time::{Duration, Instant};
 
 
@@ -25,13 +26,25 @@ pub struct Db {
     ///
     ///
     /// Because all operations are always key specific, the key is used to hash
     /// Because all operations are always key specific, the key is used to hash
     /// and select to which HashMap the data might be stored.
     /// and select to which HashMap the data might be stored.
-    entries: Vec<RwLock<HashMap<Bytes, Entry>>>,
+    entries: Arc<Vec<RwLock<HashMap<Bytes, Entry>>>>,
 
 
     /// Data structure to store all expiring keys
     /// Data structure to store all expiring keys
-    expirations: Mutex<ExpirationDb>,
+    expirations: Arc<Mutex<ExpirationDb>>,
 
 
     /// Number of HashMaps that are available.
     /// Number of HashMaps that are available.
     slots: usize,
     slots: usize,
+
+    // A Database is attached to a conn_id. The entries and expiration data
+    // structures are shared between all connections.
+    //
+    // This particular database instace is attached to a conn_id, used to block
+    // all keys in case of a transaction.
+    conn_id: u128,
+
+    // HashMap of all blocked keys by other connections. If a key appears in
+    // here and it is not being hold by the current connection, current
+    // connection must wait.
+    tx_key_locks: Arc<RwLock<HashMap<Bytes, u128>>>,
 }
 }
 
 
 impl Db {
 impl Db {
@@ -43,19 +56,85 @@ impl Db {
         }
         }
 
 
         Self {
         Self {
-            entries,
-            expirations: Mutex::new(ExpirationDb::new()),
+            entries: Arc::new(entries),
+            expirations: Arc::new(Mutex::new(ExpirationDb::new())),
+            conn_id: 0,
+            tx_key_locks: Arc::new(RwLock::new(HashMap::new())),
             slots,
             slots,
         }
         }
     }
     }
 
 
+    pub fn new_db_instance(self: Arc<Db>, conn_id: u128) -> Db {
+        Self {
+            entries: self.entries.clone(),
+            tx_key_locks: self.tx_key_locks.clone(),
+            expirations: self.expirations.clone(),
+            conn_id,
+            slots: self.slots,
+        }
+    }
+
     #[inline]
     #[inline]
     fn get_slot(&self, key: &Bytes) -> usize {
     fn get_slot(&self, key: &Bytes) -> usize {
         let id = (hash(key) as usize) % self.entries.len();
         let id = (hash(key) as usize) % self.entries.len();
         trace!("selected slot {} for key {:?}", id, key);
         trace!("selected slot {} for key {:?}", id, key);
+
+        let waiting = Duration::from_nanos(100);
+
+        while let Some(blocker) = self.tx_key_locks.read().unwrap().get(key) {
+            // Loop while the key we are trying to access is being blocked by a
+            // connection in a transaction
+            if *blocker == self.conn_id {
+                // the key is being blocked by ourself, it is safe to break the
+                // waiting loop
+                break;
+            }
+
+            thread::sleep(waiting);
+        }
+
         id
         id
     }
     }
 
 
+    pub fn lock_keys(&self, keys: &[Bytes]) {
+        let waiting = Duration::from_nanos(100);
+        loop {
+            let mut lock = self.tx_key_locks.write().unwrap();
+            let mut i = 0;
+
+            for key in keys.iter() {
+                if let Some(blocker) = lock.get(key) {
+                    if *blocker == self.conn_id {
+                        // It is blocked by us already.
+                        continue;
+                    }
+                    // It is blocked by another tx, we need to break
+                    // and retry to gain the lock over this key
+                    break;
+                }
+                lock.insert(key.clone(), self.conn_id);
+                i += 1;
+            }
+
+            if i == keys.len() {
+                // All the involved keys are successfully being blocked
+                // exclusely.
+                break;
+            }
+
+            // We need to sleep a bit and retry.
+            drop(lock);
+            thread::sleep(waiting);
+        }
+    }
+
+    pub fn unlock_keys(&self, keys: &[Bytes]) {
+        let mut lock = self.tx_key_locks.write().unwrap();
+        for key in keys.iter() {
+            lock.remove(key);
+        }
+    }
+
     pub fn incr<
     pub fn incr<
         T: ToString + AddAssign + for<'a> TryFrom<&'a Value, Error = Error> + Into<Value> + Copy,
         T: ToString + AddAssign + for<'a> TryFrom<&'a Value, Error = Error> + Into<Value> + Copy,
     >(
     >(
@@ -164,6 +243,26 @@ impl Db {
         }
         }
     }
     }
 
 
+    pub fn bump_version(&self, key: &Bytes) -> bool {
+        let mut entries = self.entries[self.get_slot(key)].write().unwrap();
+        entries
+            .get_mut(key)
+            .filter(|x| x.is_valid())
+            .map(|entry| {
+                entry.bump_version();
+            })
+            .is_some()
+    }
+
+    pub fn get_version(&self, key: &Bytes) -> u128 {
+        let entries = self.entries[self.get_slot(key)].read().unwrap();
+        entries
+            .get(key)
+            .filter(|x| x.is_valid())
+            .map(|entry| entry.version())
+            .unwrap_or_else(new_version)
+    }
+
     pub fn get(&self, key: &Bytes) -> Value {
     pub fn get(&self, key: &Bytes) -> Value {
         let entries = self.entries[self.get_slot(key)].read().unwrap();
         let entries = self.entries[self.get_slot(key)].read().unwrap();
         entries
         entries
@@ -210,7 +309,7 @@ impl Db {
             self.expirations.lock().unwrap().add(key, expires_at);
             self.expirations.lock().unwrap().add(key, expires_at);
         }
         }
         entries.insert(key.clone(), Entry::new(value, expires_at));
         entries.insert(key.clone(), Entry::new(value, expires_at));
-        Value::OK
+        Value::Ok
     }
     }
 
 
     pub fn ttl(&self, key: &Bytes) -> Option<Option<Instant>> {
     pub fn ttl(&self, key: &Bytes) -> Option<Option<Instant>> {
@@ -305,11 +404,11 @@ mod test {
     #[test]
     #[test]
     fn del() {
     fn del() {
         let db = Db::new(100);
         let db = Db::new(100);
-        db.set(&bytes!(b"expired"), Value::OK, Some(Duration::from_secs(0)));
-        db.set(&bytes!(b"valid"), Value::OK, None);
+        db.set(&bytes!(b"expired"), Value::Ok, Some(Duration::from_secs(0)));
+        db.set(&bytes!(b"valid"), Value::Ok, None);
         db.set(
         db.set(
             &bytes!(b"expiring"),
             &bytes!(b"expiring"),
-            Value::OK,
+            Value::Ok,
             Some(Duration::from_secs(5)),
             Some(Duration::from_secs(5)),
         );
         );
 
 
@@ -327,11 +426,11 @@ mod test {
     #[test]
     #[test]
     fn ttl() {
     fn ttl() {
         let db = Db::new(100);
         let db = Db::new(100);
-        db.set(&bytes!(b"expired"), Value::OK, Some(Duration::from_secs(0)));
-        db.set(&bytes!(b"valid"), Value::OK, None);
+        db.set(&bytes!(b"expired"), Value::Ok, Some(Duration::from_secs(0)));
+        db.set(&bytes!(b"valid"), Value::Ok, None);
         db.set(
         db.set(
             &bytes!(b"expiring"),
             &bytes!(b"expiring"),
-            Value::OK,
+            Value::Ok,
             Some(Duration::from_secs(5)),
             Some(Duration::from_secs(5)),
         );
         );
 
 
@@ -347,7 +446,7 @@ mod test {
     #[test]
     #[test]
     fn purge_keys() {
     fn purge_keys() {
         let db = Db::new(100);
         let db = Db::new(100);
-        db.set(&bytes!(b"one"), Value::OK, Some(Duration::from_secs(0)));
+        db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
         // Expired keys should not be returned, even if they are not yet
         // Expired keys should not be returned, even if they are not yet
         // removed by the purge process.
         // removed by the purge process.
         assert_eq!(Value::Null, db.get(&bytes!(b"one")));
         assert_eq!(Value::Null, db.get(&bytes!(b"one")));
@@ -362,13 +461,13 @@ mod test {
     #[test]
     #[test]
     fn replace_purge_keys() {
     fn replace_purge_keys() {
         let db = Db::new(100);
         let db = Db::new(100);
-        db.set(&bytes!(b"one"), Value::OK, Some(Duration::from_secs(0)));
+        db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
         // Expired keys should not be returned, even if they are not yet
         // Expired keys should not be returned, even if they are not yet
         // removed by the purge process.
         // removed by the purge process.
         assert_eq!(Value::Null, db.get(&bytes!(b"one")));
         assert_eq!(Value::Null, db.get(&bytes!(b"one")));
 
 
-        db.set(&bytes!(b"one"), Value::OK, Some(Duration::from_secs(5)));
-        assert_eq!(Value::OK, db.get(&bytes!(b"one")));
+        db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(5)));
+        assert_eq!(Value::Ok, db.get(&bytes!(b"one")));
 
 
         // Purge should return 0 as the expired key has been removed already
         // Purge should return 0 as the expired key has been removed already
         assert_eq!(0, db.purge());
         assert_eq!(0, db.purge());

+ 356 - 14
src/dispatcher.rs

@@ -13,95 +13,151 @@ async fn do_time(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
     Ok(vec![seconds.as_str(), millis.as_str()].into())
     Ok(vec![seconds.as_str(), millis.as_str()].into())
 }
 }
 
 
-async fn do_command(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
-    let now = SystemTime::now();
-    let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
-    let in_ms: i128 =
-        since_the_epoch.as_secs() as i128 * 1000 + since_the_epoch.subsec_millis() as i128;
-    Ok(format!("{}", in_ms).as_str().into())
-}
-
 dispatcher! {
 dispatcher! {
     set {
     set {
         sadd {
         sadd {
             cmd::set::sadd,
             cmd::set::sadd,
             [""],
             [""],
             -3,
             -3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         scard {
         scard {
             cmd::set::scard,
             cmd::set::scard,
             [""],
             [""],
             2,
             2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         sdiff {
         sdiff {
             cmd::set::sdiff,
             cmd::set::sdiff,
             [""],
             [""],
             -2,
             -2,
+            1,
+            -1,
+            1,
+            true,
         },
         },
         sdiffstore {
         sdiffstore {
             cmd::set::sdiffstore,
             cmd::set::sdiffstore,
             [""],
             [""],
             -3,
             -3,
+            1,
+            -1,
+            1,
+            true,
         },
         },
         sinter {
         sinter {
             cmd::set::sinter,
             cmd::set::sinter,
             [""],
             [""],
             -2,
             -2,
+            1,
+            -1,
+            1,
+            true,
         },
         },
         sintercard {
         sintercard {
             cmd::set::sintercard,
             cmd::set::sintercard,
             [""],
             [""],
             -2,
             -2,
+            1,
+            -1,
+            1,
+            true,
         },
         },
         sinterstore {
         sinterstore {
             cmd::set::sinterstore,
             cmd::set::sinterstore,
             [""],
             [""],
             -3,
             -3,
+            1,
+            -1,
+            1,
+            true,
         },
         },
         sismember {
         sismember {
             cmd::set::sismember,
             cmd::set::sismember,
             [""],
             [""],
             3,
             3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         smembers {
         smembers {
             cmd::set::smembers,
             cmd::set::smembers,
             [""],
             [""],
             2,
             2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         smismember {
         smismember {
             cmd::set::smismember,
             cmd::set::smismember,
             [""],
             [""],
             -3,
             -3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         smove {
         smove {
             cmd::set::smove,
             cmd::set::smove,
             [""],
             [""],
             4,
             4,
+            1,
+            2,
+            1,
+            true,
         },
         },
         spop {
         spop {
             cmd::set::spop,
             cmd::set::spop,
             [""],
             [""],
             -2,
             -2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         srandmember {
         srandmember {
             cmd::set::srandmember,
             cmd::set::srandmember,
             [""],
             [""],
             -2,
             -2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         srem {
         srem {
             cmd::set::srem,
             cmd::set::srem,
             [""],
             [""],
             -3,
             -3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         sunion {
         sunion {
             cmd::set::sunion,
             cmd::set::sunion,
             [""],
             [""],
             -2,
             -2,
+            1,
+            -1,
+            1,
+            true,
         },
         },
         sunionstore {
         sunionstore {
             cmd::set::sunionstore,
             cmd::set::sunionstore,
             [""],
             [""],
             -2,
             -2,
+            1,
+            -1,
+            1,
+            true,
         },
         },
     },
     },
     list {
     list {
@@ -109,91 +165,163 @@ dispatcher! {
             cmd::list::blpop,
             cmd::list::blpop,
             [""],
             [""],
             -3,
             -3,
+            1,
+            -2,
+            1,
+            true,
         },
         },
         brpop {
         brpop {
             cmd::list::brpop,
             cmd::list::brpop,
             [""],
             [""],
             -3,
             -3,
+            1,
+            -2,
+            1,
+            true,
         },
         },
         lindex {
         lindex {
             cmd::list::lindex,
             cmd::list::lindex,
             [""],
             [""],
             3,
             3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         linsert {
         linsert {
             cmd::list::linsert,
             cmd::list::linsert,
             [""],
             [""],
             5,
             5,
+            1,
+            1,
+            1,
+            true,
         },
         },
         llen {
         llen {
             cmd::list::llen,
             cmd::list::llen,
             [""],
             [""],
             2,
             2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         lmove {
         lmove {
             cmd::list::lmove,
             cmd::list::lmove,
             [""],
             [""],
             5,
             5,
+            1,
+            2,
+            1,
+            true,
         },
         },
         lpop {
         lpop {
             cmd::list::lpop,
             cmd::list::lpop,
             [""],
             [""],
             -2,
             -2,
+            1,
+            -2,
+            1,
+            true,
         },
         },
         lpos {
         lpos {
             cmd::list::lpos,
             cmd::list::lpos,
             [""],
             [""],
             -2,
             -2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         lpush {
         lpush {
             cmd::list::lpush,
             cmd::list::lpush,
             [""],
             [""],
             -3,
             -3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         lpushx {
         lpushx {
             cmd::list::lpush,
             cmd::list::lpush,
             [""],
             [""],
             -3,
             -3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         lrange {
         lrange {
             cmd::list::lrange,
             cmd::list::lrange,
             [""],
             [""],
             4,
             4,
+            1,
+            1,
+            1,
+            true,
         },
         },
         lrem {
         lrem {
             cmd::list::lrem,
             cmd::list::lrem,
             [""],
             [""],
             4,
             4,
+            1,
+            1,
+            1,
+            true,
         },
         },
         lset {
         lset {
             cmd::list::lset,
             cmd::list::lset,
             [""],
             [""],
             4,
             4,
+            1,
+            1,
+            1,
+            true,
         },
         },
         ltrim {
         ltrim {
             cmd::list::ltrim,
             cmd::list::ltrim,
             [""],
             [""],
             4,
             4,
+            1,
+            1,
+            1,
+            true,
         },
         },
         rpop {
         rpop {
             cmd::list::rpop,
             cmd::list::rpop,
             [""],
             [""],
             -2,
             -2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         rpoplpush {
         rpoplpush {
             cmd::list::rpoplpush,
             cmd::list::rpoplpush,
             [""],
             [""],
             3,
             3,
+            1,
+            2,
+            1,
+            true,
         },
         },
         rpush {
         rpush {
             cmd::list::rpush,
             cmd::list::rpush,
             [""],
             [""],
             -3,
             -3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         rpushx {
         rpushx {
             cmd::list::rpush,
             cmd::list::rpush,
             [""],
             [""],
             -3,
             -3,
+            1,
+            1,
+            1,
+            true,
         },
         },
     },
     },
     hash {
     hash {
@@ -201,76 +329,136 @@ dispatcher! {
             cmd::hash::hdel,
             cmd::hash::hdel,
             [""],
             [""],
             -2,
             -2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         hexists {
         hexists {
             cmd::hash::hexists,
             cmd::hash::hexists,
             [""],
             [""],
             3,
             3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         hget {
         hget {
             cmd::hash::hget,
             cmd::hash::hget,
             [""],
             [""],
             3,
             3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         hgetall {
         hgetall {
             cmd::hash::hgetall,
             cmd::hash::hgetall,
             [""],
             [""],
             2,
             2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         hincrby {
         hincrby {
             cmd::hash::hincrby::<i64>,
             cmd::hash::hincrby::<i64>,
             [""],
             [""],
             4,
             4,
+            1,
+            1,
+            1,
+            true,
         },
         },
         hincrbyfloat {
         hincrbyfloat {
             cmd::hash::hincrby::<f64>,
             cmd::hash::hincrby::<f64>,
             [""],
             [""],
             4,
             4,
+            1,
+            1,
+            1,
+            true,
         },
         },
         hkeys {
         hkeys {
             cmd::hash::hkeys,
             cmd::hash::hkeys,
             [""],
             [""],
             2,
             2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         hlen {
         hlen {
             cmd::hash::hlen,
             cmd::hash::hlen,
             [""],
             [""],
             2,
             2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         hmget {
         hmget {
             cmd::hash::hmget,
             cmd::hash::hmget,
             [""],
             [""],
             -3,
             -3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         hmset {
         hmset {
             cmd::hash::hset,
             cmd::hash::hset,
             [""],
             [""],
             -3,
             -3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         hrandfield {
         hrandfield {
             cmd::hash::hrandfield,
             cmd::hash::hrandfield,
             [""],
             [""],
             -2,
             -2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         hset {
         hset {
             cmd::hash::hset,
             cmd::hash::hset,
             [""],
             [""],
             -4,
             -4,
+            1,
+            1,
+            1,
+            true,
         },
         },
         hsetnx {
         hsetnx {
             cmd::hash::hsetnx,
             cmd::hash::hsetnx,
             [""],
             [""],
-            -4,
+            4,
+            1,
+            1,
+            1,
+            true,
         },
         },
         hstrlen {
         hstrlen {
             cmd::hash::hstrlen,
             cmd::hash::hstrlen,
             [""],
             [""],
             3,
             3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         hvals {
         hvals {
             cmd::hash::hvals,
             cmd::hash::hvals,
             [""],
             [""],
             2,
             2,
+            1,
+            1,
+            1,
+            true,
         },
         },
     },
     },
     keys {
     keys {
@@ -278,56 +466,100 @@ dispatcher! {
             cmd::key::del,
             cmd::key::del,
             ["random" "loading" "stale"],
             ["random" "loading" "stale"],
             -2,
             -2,
+            1,
+            -1,
+            1,
+            true,
         },
         },
         exists {
         exists {
             cmd::key::exists,
             cmd::key::exists,
             ["read" "fast"],
             ["read" "fast"],
             -2,
             -2,
+            1,
+            -1,
+            1,
+            true,
         },
         },
         expire {
         expire {
             cmd::key::expire,
             cmd::key::expire,
             ["read" "write" "fast"],
             ["read" "write" "fast"],
             3,
             3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         expireat {
         expireat {
             cmd::key::expire_at,
             cmd::key::expire_at,
             ["read" "write" "fast"],
             ["read" "write" "fast"],
             3,
             3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         expiretime {
         expiretime {
             cmd::key::expire_time,
             cmd::key::expire_time,
             ["read" "write" "fast"],
             ["read" "write" "fast"],
             2,
             2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         persist {
         persist {
             cmd::key::persist,
             cmd::key::persist,
             ["write" "fast"],
             ["write" "fast"],
             2,
             2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         pexpire {
         pexpire {
             cmd::key::expire,
             cmd::key::expire,
             ["read" "write" "fast"],
             ["read" "write" "fast"],
             3,
             3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         pexpireat {
         pexpireat {
             cmd::key::expire_at,
             cmd::key::expire_at,
             ["read" "write" "fast"],
             ["read" "write" "fast"],
             3,
             3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         pexpiretime {
         pexpiretime {
             cmd::key::expire_time,
             cmd::key::expire_time,
             ["read" "write" "fast"],
             ["read" "write" "fast"],
             2,
             2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         pttl {
         pttl {
             cmd::key::ttl,
             cmd::key::ttl,
             ["read" "read"],
             ["read" "read"],
             2,
             2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         ttl {
         ttl {
             cmd::key::ttl,
             cmd::key::ttl,
             ["read" "read"],
             ["read" "read"],
             2,
             2,
+            1,
+            1,
+            1,
+            true,
         },
         },
     },
     },
     string {
     string {
@@ -335,66 +567,118 @@ dispatcher! {
             cmd::string::decr,
             cmd::string::decr,
             ["write" "denyoom" "fast"],
             ["write" "denyoom" "fast"],
             2,
             2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         decrby {
         decrby {
             cmd::string::decr_by,
             cmd::string::decr_by,
             ["write" "denyoom" "fast"],
             ["write" "denyoom" "fast"],
             3,
             3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         get {
         get {
             cmd::string::get,
             cmd::string::get,
             ["random" "loading" "stale"],
             ["random" "loading" "stale"],
             2,
             2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         getdel {
         getdel {
             cmd::string::getdel,
             cmd::string::getdel,
             ["random" "loading" "stale"],
             ["random" "loading" "stale"],
             2,
             2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         getset {
         getset {
             cmd::string::getset,
             cmd::string::getset,
             ["random" "loading" "stale"],
             ["random" "loading" "stale"],
-            -3,
+            3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         incr {
         incr {
             cmd::string::incr,
             cmd::string::incr,
             ["write" "denyoom" "fast"],
             ["write" "denyoom" "fast"],
             2,
             2,
+            1,
+            1,
+            1,
+            true,
         },
         },
         incrby {
         incrby {
             cmd::string::incr_by,
             cmd::string::incr_by,
             ["write" "denyoom" "fast"],
             ["write" "denyoom" "fast"],
             3,
             3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         incrbyfloat {
         incrbyfloat {
             cmd::string::incr_by_float,
             cmd::string::incr_by_float,
             ["write" "denyoom" "fast"],
             ["write" "denyoom" "fast"],
             3,
             3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         mget {
         mget {
             cmd::string::mget,
             cmd::string::mget,
             ["random" "loading" "stale"],
             ["random" "loading" "stale"],
             -2,
             -2,
+            1,
+            -1,
+            1,
+            true,
         },
         },
         set {
         set {
             cmd::string::set,
             cmd::string::set,
             ["random" "loading" "stale"],
             ["random" "loading" "stale"],
             -3,
             -3,
+            1,
+            1,
+            1,
+            true,
         },
         },
         setex {
         setex {
             cmd::string::setex,
             cmd::string::setex,
             ["random" "loading" "stale"],
             ["random" "loading" "stale"],
             4,
             4,
+            1,
+            1,
+            1,
+            true,
         },
         },
         psetex {
         psetex {
             cmd::string::setex,
             cmd::string::setex,
             ["random" "loading" "stale"],
             ["random" "loading" "stale"],
             4,
             4,
+            1,
+            1,
+            1,
+            true,
         },
         },
         strlen {
         strlen {
             cmd::string::strlen,
             cmd::string::strlen,
             ["random" "fast"],
             ["random" "fast"],
             2,
             2,
+            1,
+            1,
+            1,
+            true,
         }
         }
     },
     },
     connection {
     connection {
@@ -402,28 +686,86 @@ dispatcher! {
             cmd::client::client,
             cmd::client::client,
             ["random" "loading" "stale"],
             ["random" "loading" "stale"],
             -2,
             -2,
+            0,
+            0,
+            0,
+            true,
         },
         },
         echo {
         echo {
             cmd::client::echo,
             cmd::client::echo,
             ["random" "loading" "stale"],
             ["random" "loading" "stale"],
             2,
             2,
+            0,
+            0,
+            0,
+            true,
         },
         },
         ping {
         ping {
             cmd::client::ping,
             cmd::client::ping,
             ["random" "loading" "stale"],
             ["random" "loading" "stale"],
             -1,
             -1,
+            0,
+            0,
+            0,
+            true,
         },
         },
     },
     },
-    server {
-        command  {
-            do_command,
-            ["random" "loading" "stale"],
+    transaction {
+        discard {
+            cmd::transaction::discard,
+            [""],
+            1,
+            0,
+            0,
+            0,
+            false,
+        },
+        exec {
+            cmd::transaction::exec,
+            [""],
+            1,
+            0,
+            0,
+            0,
+            false,
+        },
+        multi {
+            cmd::transaction::multi,
+            [""],
             1,
             1,
+            0,
+            0,
+            0,
+            false,
         },
         },
+        watch {
+            cmd::transaction::watch,
+            [""],
+            -2,
+            1,
+            -1,
+            1,
+            false,
+        },
+        unwatch {
+            cmd::transaction::unwatch,
+            [""],
+            1,
+            0,
+            0,
+            0,
+            true,
+        },
+    },
+    server {
         time {
         time {
             do_time,
             do_time,
             ["random" "loading" "stale"],
             ["random" "loading" "stale"],
             1,
             1,
+            0,
+            0,
+            0,
+            true,
         },
         },
     }
     }
 }
 }

+ 6 - 0
src/error.rs

@@ -10,6 +10,8 @@ pub enum Error {
     OutOfRange,
     OutOfRange,
     Syntax,
     Syntax,
     NotANumber,
     NotANumber,
+    NotInTx,
+    NestedTx,
     WrongType,
     WrongType,
 }
 }
 
 
@@ -17,6 +19,8 @@ impl From<Error> for Value {
     fn from(value: Error) -> Value {
     fn from(value: Error) -> Value {
         let err_type = match value {
         let err_type = match value {
             Error::WrongType => "WRONGTYPE",
             Error::WrongType => "WRONGTYPE",
+            Error::NestedTx => "ERR MULTI",
+            Error::NotInTx => "ERR EXEC",
             _ => "ERR",
             _ => "ERR",
         };
         };
 
 
@@ -24,10 +28,12 @@ impl From<Error> for Value {
             Error::CommandNotFound(x) => format!("unknown command `{}`", x),
             Error::CommandNotFound(x) => format!("unknown command `{}`", x),
             Error::InvalidArgsCount(x) => format!("wrong number of arguments for '{}' command", x),
             Error::InvalidArgsCount(x) => format!("wrong number of arguments for '{}' command", x),
             Error::Protocol(x, y) => format!("Protocol error: expected '{}', got '{}'", x, y),
             Error::Protocol(x, y) => format!("Protocol error: expected '{}', got '{}'", x, y),
+            Error::NotInTx => " without MULTI".to_owned(),
             Error::NotANumber => "value is not an integer or out of range".to_owned(),
             Error::NotANumber => "value is not an integer or out of range".to_owned(),
             Error::OutOfRange => "index out of range".to_owned(),
             Error::OutOfRange => "index out of range".to_owned(),
             Error::Syntax => "syntax error".to_owned(),
             Error::Syntax => "syntax error".to_owned(),
             Error::NotFound => "no such key".to_owned(),
             Error::NotFound => "no such key".to_owned(),
+            Error::NestedTx => "calls can not be nested".to_owned(),
             Error::WrongArgument(x, y) => format!(
             Error::WrongArgument(x, y) => format!(
                 "Unknown subcommand or wrong number of arguments for '{}'. Try {} HELP.",
                 "Unknown subcommand or wrong number of arguments for '{}'. Try {} HELP.",
                 y, x
                 y, x

+ 46 - 1
src/macros.rs

@@ -6,6 +6,10 @@ macro_rules! dispatcher {
                 $handler:expr,
                 $handler:expr,
                 [$($tag:tt)+],
                 [$($tag:tt)+],
                 $min_args:expr,
                 $min_args:expr,
+                $key_start:expr,
+                $key_stop:expr,
+                $key_step:expr,
+                $queueable:expr,
             }),+$(,)?
             }),+$(,)?
         }),+$(,)?
         }),+$(,)?
     }=>  {
     }=>  {
@@ -18,6 +22,9 @@ macro_rules! dispatcher {
                 pub struct Command {
                 pub struct Command {
                     pub tags: &'static [&'static str],
                     pub tags: &'static [&'static str],
                     pub min_args: i32,
                     pub min_args: i32,
+                    pub key_start: i32,
+                    pub key_stop: i32,
+                    pub key_step: usize,
                 }
                 }
 
 
                 impl Command {
                 impl Command {
@@ -25,6 +32,9 @@ macro_rules! dispatcher {
                         Self {
                         Self {
                             tags: &[$($tag,)+],
                             tags: &[$($tag,)+],
                             min_args: $min_args,
                             min_args: $min_args,
+                            key_start: $key_start,
+                            key_stop: $key_stop,
+                            key_step: $key_step,
                         }
                         }
                     }
                     }
                 }
                 }
@@ -32,7 +42,38 @@ macro_rules! dispatcher {
                 #[async_trait]
                 #[async_trait]
                 impl ExecutableCommand for Command {
                 impl ExecutableCommand for Command {
                     async fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
                     async fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-                        $handler(conn, args).await
+                        if conn.in_transaction() && self.is_queueable() {
+                            conn.queue_command(args);
+                            conn.tx_keys(self.get_keys(args));
+                            Ok(Value::Queued)
+                        } else {
+                            $handler(conn, args).await
+                        }
+                    }
+
+                    fn is_queueable(&self) -> bool {
+                        $queueable
+                    }
+
+                    fn get_keys<'a>(&self, args: &'a [Bytes]) -> Vec<&'a Bytes> {
+                        let start = self.key_start;
+                        let stop  = if self.key_stop > 0 {
+                            self.key_stop
+                        } else {
+                            (args.len() as i32) + self.key_stop
+                        };
+
+                        if start == 0 {
+                            return vec![];
+                        }
+
+                        let mut result = vec![];
+
+                        for i in (start .. stop+1).step_by(self.key_step) {
+                            result.push(&args[i as usize]);
+                        }
+
+                        result
                     }
                     }
 
 
                     fn check_number_args(&self, n: usize) -> bool {
                     fn check_number_args(&self, n: usize) -> bool {
@@ -61,6 +102,10 @@ macro_rules! dispatcher {
         pub trait ExecutableCommand {
         pub trait ExecutableCommand {
             async fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error>;
             async fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error>;
 
 
+            fn is_queueable(&self) -> bool;
+
+            fn get_keys<'a>(&self, args: &'a [Bytes]) -> Vec<&'a Bytes>;
+
             fn check_number_args(&self, n: usize) -> bool;
             fn check_number_args(&self, n: usize) -> bool;
 
 
             fn group(&self) -> &'static str;
             fn group(&self) -> &'static str;

+ 1 - 2
src/server.rs

@@ -3,7 +3,7 @@ use bytes::{Buf, Bytes, BytesMut};
 use futures::SinkExt;
 use futures::SinkExt;
 use log::{info, trace, warn};
 use log::{info, trace, warn};
 use redis_zero_protocol_parser::{parse_server, Error as RedisError};
 use redis_zero_protocol_parser::{parse_server, Error as RedisError};
-use std::{error::Error, io, ops::Deref, sync::Arc};
+use std::{error::Error, io, sync::Arc};
 use tokio::{
 use tokio::{
     net::TcpListener,
     net::TcpListener,
     time::{sleep, Duration},
     time::{sleep, Duration},
@@ -76,7 +76,6 @@ pub async fn serve(addr: String) -> Result<(), Box<dyn Error>> {
                             Ok(args) => match Dispatcher::new(&args) {
                             Ok(args) => match Dispatcher::new(&args) {
                                 Ok(handler) => {
                                 Ok(handler) => {
                                     let r = handler
                                     let r = handler
-                                        .deref()
                                         .execute(&conn, &args)
                                         .execute(&conn, &args)
                                         .await
                                         .await
                                         .unwrap_or_else(|x| x.into());
                                         .unwrap_or_else(|x| x.into());

+ 4 - 2
src/value/mod.rs

@@ -24,7 +24,8 @@ pub enum Value {
     Float(f64),
     Float(f64),
     BigInteger(i128),
     BigInteger(i128),
     Null,
     Null,
-    OK,
+    Queued,
+    Ok,
 }
 }
 
 
 impl From<&Value> for Vec<u8> {
 impl From<&Value> for Vec<u8> {
@@ -51,7 +52,8 @@ impl From<&Value> for Vec<u8> {
             }
             }
             Value::Err(x, y) => format!("-{} {}\r\n", x, y).into(),
             Value::Err(x, y) => format!("-{} {}\r\n", x, y).into(),
             Value::String(x) => format!("+{}\r\n", x).into(),
             Value::String(x) => format!("+{}\r\n", x).into(),
-            Value::OK => "+OK\r\n".into(),
+            Value::Queued => "+QUEUED\r\n".into(),
+            Value::Ok => "+OK\r\n".into(),
             _ => b"-WRONGTYPE Operation against a key holding the wrong kind of value\r\n".to_vec(),
             _ => b"-WRONGTYPE Operation against a key holding the wrong kind of value\r\n".to_vec(),
         }
         }
     }
     }