Browse Source

Added `keys` command (#14)

* Added `keys` command

Added `keys` command.

* Add TYPE command

* Added UNLINK which is an alias to DEL

* Added COPY command

* Added support for MOVE

Also renamed all command definitions to uppercase to avoid using
reserved keywords.

* Added RENAME and RENAMENX

* Added OBJECT command handler

Also improved the help sector adding a cmd/help.rs which hosts all the
help texts for all commands.

* Added Cursor type (#35)

The cursor type is serialized an string with integers. The integer
contains a tuple with (bucket_id, last_position) and a checksum. If the
cursor is invalid a new cursor with position (0, 0) is deserialized
instead.
César D. Rodas 3 years ago
parent
commit
495cff1d12
16 changed files with 1246 additions and 248 deletions
  1. 92 88
      Cargo.lock
  2. 11 7
      Cargo.toml
  3. 48 0
      src/cmd/help.rs
  4. 310 19
      src/cmd/key.rs
  5. 1 0
      src/cmd/mod.rs
  6. 5 7
      src/cmd/pubsub.rs
  7. 1 15
      src/cmd/server.rs
  8. 4 0
      src/db/entry.rs
  9. 306 6
      src/db/mod.rs
  10. 36 0
      src/db/scan.rs
  11. 181 100
      src/dispatcher/mod.rs
  12. 27 2
      src/error.rs
  13. 4 4
      src/macros.rs
  14. 119 0
      src/value/cursor.rs
  15. 12 0
      src/value/mod.rs
  16. 89 0
      src/value/typ.rs

+ 92 - 88
Cargo.lock

@@ -19,9 +19,9 @@ dependencies = [
 
 [[package]]
 name = "aspect"
-version = "0.2.1"
+version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ba3f55a9bdf36dc7756a50fe239b5c457304a0c72fe5d8125ea4612fc161c6ca"
+checksum = "b3927b415bba088539aaaf872d19752c7d00101a25ead1d123fcd7633f9c224d"
 dependencies = [
  "aspect-weave",
 ]
@@ -41,9 +41,12 @@ dependencies = [
 
 [[package]]
 name = "atomic"
-version = "0.4.6"
+version = "0.5.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "64f46ca51dca4837f1520754d1c8c36636356b81553d928dc9c177025369a06e"
+checksum = "b88d82667eca772c4aa12f0f1348b3ae643424c8876448f3f7bd5787032e234c"
+dependencies = [
+ "autocfg",
+]
 
 [[package]]
 name = "atty"
@@ -122,32 +125,17 @@ checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
 
 [[package]]
 name = "cfg-if"
-version = "0.1.10"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
-
-[[package]]
-name = "cfg-if"
 version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
 [[package]]
-name = "cloudabi"
-version = "0.0.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
-dependencies = [
- "bitflags",
-]
-
-[[package]]
 name = "crc32fast"
-version = "1.3.0"
+version = "1.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "738c290dfaea84fc1ca15ad9c168d083b05a714e1efddd8edaab678dc28d2836"
+checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if",
 ]
 
 [[package]]
@@ -156,7 +144,7 @@ version = "0.5.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if",
  "crossbeam-utils",
 ]
 
@@ -166,7 +154,7 @@ version = "0.8.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if",
  "lazy_static",
 ]
 
@@ -206,7 +194,7 @@ version = "1.0.22"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1e6988e897c1c9c485f43b47a529cef42fde0547f9d8d41a7062518f1d8fc53f"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if",
  "crc32fast",
  "libc",
  "miniz_oxide",
@@ -314,16 +302,16 @@ version = "0.2.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if",
  "libc",
  "wasi",
 ]
 
 [[package]]
 name = "glob"
-version = "0.2.11"
+version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8be18de09a56b60ed0edf84bc9df007e30040691af7acd1c41874faac5895bfb"
+checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
 
 [[package]]
 name = "hashbrown"
@@ -355,6 +343,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "heck"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
+
+[[package]]
 name = "hermit-abi"
 version = "0.1.19"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -386,7 +380,7 @@ version = "0.1.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if",
 ]
 
 [[package]]
@@ -415,18 +409,9 @@ checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125"
 
 [[package]]
 name = "lock_api"
-version = "0.3.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75"
-dependencies = [
- "scopeguard",
-]
-
-[[package]]
-name = "lock_api"
-version = "0.4.5"
+version = "0.4.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109"
+checksum = "88943dd7ef4a2e5a4bfa2753aaab3013e34ce2533d1996fb18ef591e315e2b3b"
 dependencies = [
  "scopeguard",
 ]
@@ -437,7 +422,7 @@ version = "0.4.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if",
 ]
 
 [[package]]
@@ -448,26 +433,26 @@ checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
 
 [[package]]
 name = "metered"
-version = "0.4.0"
+version = "0.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b2c319dbe936d3b6b12d592e1061cade60305169012de8d2d3a86cc622d6a37a"
+checksum = "3f36652b6bf9c822adae73dc00def57f8ace762f14428339b2a7dd6589d372ff"
 dependencies = [
  "aspect",
  "atomic",
  "hdrhistogram",
  "metered-macro",
- "parking_lot 0.10.2",
+ "parking_lot",
  "serde",
 ]
 
 [[package]]
 name = "metered-macro"
-version = "0.4.0"
+version = "0.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3315377bb2a24f4fae8ecf3730253d5862517373be232e9bd512bfed2f05425d"
+checksum = "1e7a3760cebfa88dc786e9a75654e01297b28a0ab28cd94e686a42f96823b9ff"
 dependencies = [
  "aspect-weave",
- "heck",
+ "heck 0.3.3",
  "indexmap",
  "proc-macro2",
  "quote",
@@ -479,6 +464,7 @@ dependencies = [
 name = "microredis"
 version = "0.1.0"
 dependencies = [
+ "byteorder",
  "bytes",
  "crc32fast",
  "env_logger",
@@ -486,13 +472,16 @@ dependencies = [
  "glob",
  "log",
  "metered",
- "parking_lot 0.11.2",
+ "parking_lot",
  "rand",
  "redis-zero-protocol-parser",
  "seahash",
  "serde",
  "serde_json",
  "serde_prometheus",
+ "strum",
+ "strum_macros",
+ "thiserror",
  "tokio",
  "tokio-stream",
  "tokio-util",
@@ -583,37 +572,13 @@ checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5"
 
 [[package]]
 name = "parking_lot"
-version = "0.10.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e"
-dependencies = [
- "lock_api 0.3.4",
- "parking_lot_core 0.7.2",
-]
-
-[[package]]
-name = "parking_lot"
 version = "0.11.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
 dependencies = [
  "instant",
- "lock_api 0.4.5",
- "parking_lot_core 0.8.5",
-]
-
-[[package]]
-name = "parking_lot_core"
-version = "0.7.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3"
-dependencies = [
- "cfg-if 0.1.10",
- "cloudabi",
- "libc",
- "redox_syscall 0.1.57",
- "smallvec",
- "winapi",
+ "lock_api",
+ "parking_lot_core",
 ]
 
 [[package]]
@@ -622,10 +587,10 @@ version = "0.8.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if",
  "instant",
  "libc",
- "redox_syscall 0.2.10",
+ "redox_syscall",
  "smallvec",
  "winapi",
 ]
@@ -714,12 +679,6 @@ checksum = "299d79f6c9095164339b8ed3c47951772538a375e6811e76ffe9a4544f2cdbf5"
 
 [[package]]
 name = "redox_syscall"
-version = "0.1.57"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
-
-[[package]]
-name = "redox_syscall"
 version = "0.2.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff"
@@ -745,6 +704,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
 
 [[package]]
+name = "rustversion"
+version = "1.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f2cc38e8fa666e2de3c4aba7edeb5ffc5246c1c2ed0e3d17e560aeeba736b23f"
+
+[[package]]
 name = "ryu"
 version = "1.0.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -764,18 +729,18 @@ checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b"
 
 [[package]]
 name = "serde"
-version = "1.0.132"
+version = "1.0.136"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8b9875c23cf305cd1fd7eb77234cbb705f21ea6a72c637a5c6db5fe4b8e7f008"
+checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789"
 dependencies = [
  "serde_derive",
 ]
 
 [[package]]
 name = "serde_derive"
-version = "1.0.132"
+version = "1.0.136"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ecc0db5cb2556c0e558887d9bbdcf6ac4471e83ff66cf696e5419024d1606276"
+checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -851,6 +816,25 @@ dependencies = [
 ]
 
 [[package]]
+name = "strum"
+version = "0.24.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e96acfc1b70604b8b2f1ffa4c57e59176c7dbb05d556c71ecd2f5498a1dee7f8"
+
+[[package]]
+name = "strum_macros"
+version = "0.24.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6878079b17446e4d3eba6192bb0a2950d5b14f0ed8424b852310e5a94345d0ef"
+dependencies = [
+ "heck 0.4.0",
+ "proc-macro2",
+ "quote",
+ "rustversion",
+ "syn",
+]
+
+[[package]]
 name = "syn"
 version = "1.0.84"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -882,6 +866,26 @@ dependencies = [
 ]
 
 [[package]]
+name = "thiserror"
+version = "1.0.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417"
+dependencies = [
+ "thiserror-impl",
+]
+
+[[package]]
+name = "thiserror-impl"
+version = "1.0.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
 name = "tokio"
 version = "1.15.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -893,7 +897,7 @@ dependencies = [
  "mio",
  "num_cpus",
  "once_cell",
- "parking_lot 0.11.2",
+ "parking_lot",
  "pin-project-lite",
  "signal-hook-registry",
  "tokio-macros",
@@ -945,7 +949,7 @@ version = "0.1.29"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if",
  "pin-project-lite",
  "tracing-core",
 ]

+ 11 - 7
Cargo.toml

@@ -7,20 +7,24 @@ edition = "2018"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+byteorder = "1.2.2"
 redis-zero-protocol-parser = "^0.2"
 tokio={version="1", features = ["full", "tracing"] }
-parking_lot="^0.11"
+parking_lot="0.11.2"
 tokio-util={version="^0.6", features = ["full"] }
-crc32fast="^1.2"
+crc32fast="1.3.2"
 futures = { version = "0.3.0", features = ["thread-pool"]}
 tokio-stream="0.1"
 seahash = "4"
 log="0.4"
-glob="^0.2"
-metered="^0.4"
-serde="^1.0"
-serde_json = "^1.0"
-serde_prometheus="^0.1"
+glob="0.3.0"
+metered="0.8.0"
+serde="1.0.136"
+serde_json = "1.0.70"
+serde_prometheus="0.1.6"
 env_logger = "0.8.4"
 bytes = "1"
 rand = "0.8.0"
+thiserror = "1.0.30"
+strum = "0.24"
+strum_macros = "0.24"

+ 48 - 0
src/cmd/help.rs

@@ -0,0 +1,48 @@
+//! # All help text to keep other controllers clean
+use crate::{error::Error, value::Value};
+
+fn convert_to_result(text: &[&str]) -> Result<Value, Error> {
+    Ok(Value::Array(
+        text.iter()
+            .map(|text| Value::String(text.to_string()))
+            .collect(),
+    ))
+}
+
+/// Help text for OBJECT command
+pub fn object() -> Result<Value, Error> {
+    convert_to_result(&[
+        "OBJECT <subcommand> arg arg ... arg. Subcommands are:",
+        "ENCODING <key> -- Return the kind of internal representation used in order to store the value associated with a key.",
+        "FREQ <key> -- Return the access frequency index of the key. The returned integer is proportional to the logarithm of the recent access frequency of the key.",
+        "IDLETIME <key> -- Return the idle time of the key, that is the approximated number of seconds elapsed since the last access to the key.",
+        "REFCOUNT <key> -- Return the number of references of the value associated with the specified key.",
+    ])
+}
+
+/// Help text for PUBSUB command
+pub fn pubsub() -> Result<Value, Error> {
+    convert_to_result(&[
+        "PUBSUB <subcommand> arg arg ... arg. Subcommands are:",
+        "CHANNELS [<pattern>] -- Return the currently active channels matching a pattern (default: all).",
+        "NUMPAT -- Return number of subscriptions to patterns.",
+        "NUMSUB [channel-1 .. channel-N] -- Returns the number of subscribers for the specified channels (excluding patterns, default: none).",
+    ])
+}
+
+/// Help text for COMMAND command
+pub fn command() -> Result<Value, Error> {
+    convert_to_result(&[
+        "COMMAND <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
+        "(no subcommand)",
+        "\tReturn details about all Redis commands",
+        "COUNT",
+        "\tReturn the total number of commands in this Redis server.",
+        "GETKEYS <full-command>",
+        "\tReturn the keys from a full Redis command.",
+        "INFO [<command-name> ...]",
+        "Return details about multiple Redis commands.",
+        "HELP",
+        "\tPrints this help.",
+    ])
+}

+ 310 - 19
src/cmd/key.rs

@@ -1,12 +1,65 @@
 //! # Key-related command handlers
 use super::now;
 use crate::{
-    check_arg, connection::Connection, error::Error, value::bytes_to_number, value::Value,
+    check_arg,
+    connection::Connection,
+    db::scan::Scan,
+    error::Error,
+    value::bytes_to_number,
+    value::{cursor::Cursor, Value},
 };
 use bytes::Bytes;
+use std::convert::TryInto;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tokio::time::{Duration, Instant};
 
+/// This command copies the value stored at the source key to the destination
+/// key.
+///
+/// By default, the destination key is created in the logical database used by
+/// the connection. The DB option allows specifying an alternative logical
+/// database index for the destination key.
+///
+/// The command returns an error when the destination key already exists. The
+/// REPLACE option removes the destination key before copying the value to it.
+pub async fn copy(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let mut skip = 3;
+    let target_db = if args.len() > 4 && check_arg!(args, 3, "DB") {
+        skip += 2;
+        Some(
+            conn.all_connections()
+                .get_databases()
+                .get(bytes_to_number(&args[4])?)?
+                .clone(),
+        )
+    } else {
+        None
+    };
+    let replace = match args
+        .get(skip)
+        .map(|m| String::from_utf8_lossy(m).to_uppercase())
+    {
+        Some(value) => {
+            if value == "REPLACE" {
+                true
+            } else {
+                return Err(Error::Syntax);
+            }
+        }
+        None => false,
+    };
+    let result = if conn
+        .db()
+        .copy(&args[1], &args[2], replace.into(), target_db)?
+    {
+        1
+    } else {
+        0
+    };
+
+    Ok(result.into())
+}
+
 /// Removes the specified keys. A key is ignored if it does not exist.
 pub async fn del(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().del(&args[1..]))
@@ -14,7 +67,7 @@ pub async fn del(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 
 /// Returns if key exists.
 pub async fn exists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    Ok(conn.db().exists(&args[1..]))
+    Ok(conn.db().exists(&args[1..]).into())
 }
 
 /// Set a timeout on key. After the timeout has expired, the key will automatically be deleted. A
@@ -48,6 +101,13 @@ pub async fn expire(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().set_ttl(&args[1], expires_at))
 }
 
+/// Returns the string representation of the type of the value stored at key.
+/// The different types that can be returned are: string, list, set, zset, hash
+/// and stream.
+pub async fn data_type(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    Ok(conn.db().get_data_type(&args[1]).into())
+}
+
 /// EXPIREAT has the same effect and semantic as EXPIRE, but instead of specifying the number of
 /// seconds representing the TTL (time to live), it takes an absolute Unix timestamp (seconds since
 /// January 1, 1970). A timestamp in the past will delete the key immediately.
@@ -76,17 +136,18 @@ pub async fn expire_at(conn: &Connection, args: &[Bytes]) -> Result<Value, Error
     Ok(conn.db().set_ttl(&args[1], expires_at))
 }
 
-/// Returns the remaining time to live of a key that has a timeout. This introspection capability
-/// allows a Redis client to check how many seconds a given key will continue to be part of the
-/// dataset.
-pub async fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+/// Returns the absolute Unix timestamp (since January 1, 1970) in seconds at which the given key
+/// will expire.
+pub async fn expire_time(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let ttl = match conn.db().ttl(&args[1]) {
         Some(Some(ttl)) => {
-            let ttl = ttl - Instant::now();
-            if check_arg!(args, 0, "TTL") {
-                ttl.as_secs() as i64
+            // Is there a better way? There should be!
+            if check_arg!(args, 0, "EXPIRETIME") {
+                let secs: i64 = (ttl - Instant::now()).as_secs() as i64;
+                secs + (now().as_secs() as i64)
             } else {
-                ttl.as_millis() as i64
+                let secs: i64 = (ttl - Instant::now()).as_millis() as i64;
+                secs + (now().as_millis() as i64)
             }
         }
         Some(None) => -1,
@@ -96,18 +157,88 @@ pub async fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(ttl.into())
 }
 
-/// Returns the absolute Unix timestamp (since January 1, 1970) in seconds at which the given key
-/// will expire.
-pub async fn expire_time(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+/// Returns all keys that matches a given pattern
+pub async fn keys(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    Ok(conn.db().get_all_keys(&args[1])?.into())
+}
+
+/// Move key from the currently selected database (see SELECT) to the specified
+/// destination database. When key already exists in the destination database,
+/// or it does not exist in the source database, it does nothing. It is possible
+/// to use MOVE as a locking primitive because of this.
+pub async fn move_key(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let target_db = conn
+        .all_connections()
+        .get_databases()
+        .get(bytes_to_number(&args[2])?)?;
+
+    Ok(if conn.db().move_key(&args[1], target_db)? {
+        1.into()
+    } else {
+        0.into()
+    })
+}
+
+/// Return information about the object/value stored in the database
+pub async fn object(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let subcommand = String::from_utf8_lossy(&args[1]).to_lowercase();
+
+    let expected_args = if subcommand == "help" { 2 } else { 3 };
+
+    if expected_args != args.len() {
+        return Err(Error::SubCommandNotFound(
+            subcommand.into(),
+            String::from_utf8_lossy(&args[0]).into(),
+        ));
+    }
+
+    match subcommand.as_str() {
+        "help" => super::help::object(),
+        "refcount" => Ok(if conn.db().exists(&[args[2].clone()]) == 1 {
+            1.into()
+        } else {
+            Value::Null
+        }),
+        _ => Err(Error::SubCommandNotFound(
+            subcommand.into(),
+            String::from_utf8_lossy(&args[0]).into(),
+        )),
+    }
+}
+
+/// Renames key to newkey. It returns an error when key does not exist. If
+/// newkey already exists it is overwritten, when this happens RENAME executes
+/// an implicit DEL operation, so if the deleted key contains a very big value
+/// it may cause high latency even if RENAME itself is usually a constant-time
+/// operation.
+pub async fn rename(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let is_rename = check_arg!(args, 0, "RENAME");
+    if conn.db().rename(&args[1], &args[2], is_rename.into())? {
+        Ok(if is_rename { Value::Ok } else { 1.into() })
+    } else {
+        Ok(0.into())
+    }
+}
+
+/// SCAN is a cursor based iterator. This means that at every call of the
+/// command, the server returns an updated cursor that the user needs to use as
+/// the cursor argument in the next call.
+pub async fn scan(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let cursor: Cursor = (&args[1]).try_into()?;
+    Ok(conn.db().scan(cursor, None, None, None)?.into())
+}
+
+/// Returns the remaining time to live of a key that has a timeout. This introspection capability
+/// allows a Redis client to check how many seconds a given key will continue to be part of the
+/// dataset.
+pub async fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let ttl = match conn.db().ttl(&args[1]) {
         Some(Some(ttl)) => {
-            // Is there a better way? There should be!
-            if check_arg!(args, 0, "EXPIRETIME") {
-                let secs: i64 = (ttl - Instant::now()).as_secs() as i64;
-                secs + (now().as_secs() as i64)
+            let ttl = ttl - Instant::now();
+            if check_arg!(args, 0, "TTL") {
+                ttl.as_secs() as i64
             } else {
-                let secs: i64 = (ttl - Instant::now()).as_millis() as i64;
-                secs + (now().as_millis() as i64)
+                ttl.as_millis() as i64
             }
         }
         Some(None) => -1,
@@ -127,6 +258,7 @@ pub async fn persist(conn: &Connection, args: &[Bytes]) -> Result<Value, Error>
 mod test {
     use crate::{
         cmd::test::{create_connection, run_command},
+        error::Error,
         value::Value,
     };
 
@@ -154,6 +286,25 @@ mod test {
             run_command(&c, &["exists", "foo"]).await
         );
     }
+    #[tokio::test]
+    async fn _type() {
+        let c = create_connection();
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["incr", "foo"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(1)),
+            run_command(&c, &["hset", "hash", "foo", "bar"]).await
+        );
+        assert_eq!(
+            Ok(Value::Integer(2)),
+            run_command(&c, &["sadd", "set", "foo", "bar"]).await
+        );
+        assert_eq!(Ok("set".into()), run_command(&c, &["type", "set"]).await);
+        assert_eq!(Ok("hash".into()), run_command(&c, &["type", "hash"]).await);
+        assert_eq!(Ok("string".into()), run_command(&c, &["type", "foo"]).await);
+    }
 
     #[tokio::test]
     async fn expire_and_persist() {
@@ -191,4 +342,144 @@ mod test {
             run_command(&c, &["pttl", "foo"]).await
         );
     }
+
+    #[tokio::test]
+    async fn copy() {
+        let c = create_connection();
+        assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
+        assert_eq!(Ok(1.into()), run_command(&c, &["copy", "foo", "bar"]).await);
+        assert_eq!(Ok(2.into()), run_command(&c, &["incr", "foo"]).await);
+        assert_eq!(Ok(0.into()), run_command(&c, &["copy", "foo", "bar"]).await);
+        assert_eq!(
+            Ok(Value::Array(vec!["2".into(), "1".into()])),
+            run_command(&c, &["mget", "foo", "bar"]).await
+        );
+        assert_eq!(
+            Ok(1.into()),
+            run_command(&c, &["copy", "foo", "bar", "replace"]).await
+        );
+        assert_eq!(
+            Ok(Value::Array(vec!["2".into(), "2".into()])),
+            run_command(&c, &["mget", "foo", "bar"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn copy_different_db() {
+        let c = create_connection();
+        assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
+        assert_eq!(
+            Ok(1.into()),
+            run_command(&c, &["copy", "foo", "bar", "db", "2"]).await
+        );
+        assert_eq!(Ok(2.into()), run_command(&c, &["incr", "foo"]).await);
+        assert_eq!(
+            Ok(0.into()),
+            run_command(&c, &["copy", "foo", "bar", "db", "2"]).await
+        );
+        assert_eq!(
+            Ok(Value::Array(vec!["2".into(), Value::Null])),
+            run_command(&c, &["mget", "foo", "bar"]).await
+        );
+        assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "2"]).await);
+        assert_eq!(
+            Ok(Value::Array(vec![Value::Null, "1".into()])),
+            run_command(&c, &["mget", "foo", "bar"]).await
+        );
+        assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "0"]).await);
+        assert_eq!(
+            Ok(1.into()),
+            run_command(&c, &["copy", "foo", "bar", "db", "2", "replace"]).await
+        );
+        assert_eq!(
+            Ok(Value::Array(vec!["2".into(), Value::Null])),
+            run_command(&c, &["mget", "foo", "bar"]).await
+        );
+        assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "2"]).await);
+        assert_eq!(
+            Ok(Value::Array(vec![Value::Null, "2".into()])),
+            run_command(&c, &["mget", "foo", "bar"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn copy_same_db() {
+        let c = create_connection();
+        assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
+        assert_eq!(
+            Err(Error::SameEntry),
+            run_command(&c, &["copy", "foo", "foo"]).await
+        );
+        assert_eq!(
+            Err(Error::SameEntry),
+            run_command(&c, &["copy", "foo", "foo", "db", "0"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn _move() {
+        let c = create_connection();
+        assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
+        assert_eq!(Ok(1.into()), run_command(&c, &["move", "foo", "2"]).await);
+        assert_eq!(Ok(Value::Null), run_command(&c, &["get", "foo"]).await);
+        assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
+        assert_eq!(Ok(0.into()), run_command(&c, &["move", "foo", "2"]).await);
+        assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "2"]).await);
+        assert_eq!(Ok("1".into()), run_command(&c, &["get", "foo"]).await);
+    }
+
+    #[tokio::test]
+    async fn _move_same_db() {
+        let c = create_connection();
+        assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
+        assert_eq!(
+            Err(Error::SameEntry),
+            run_command(&c, &["move", "foo", "0"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn rename() {
+        let c = create_connection();
+        assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
+        assert_eq!(
+            Ok(Value::Ok),
+            run_command(&c, &["rename", "foo", "bar-1650"]).await
+        );
+        assert_eq!(
+            Ok(Value::Ok),
+            run_command(&c, &["rename", "bar-1650", "xxx"]).await
+        );
+        assert_eq!(
+            Err(Error::NotFound),
+            run_command(&c, &["rename", "foo", "bar"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn renamenx() {
+        let c = create_connection();
+        assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
+        assert_eq!(
+            Ok(1.into()),
+            run_command(&c, &["renamenx", "foo", "bar-1650"]).await
+        );
+        assert_eq!(
+            Ok(1.into()),
+            run_command(&c, &["renamenx", "bar-1650", "xxx"]).await
+        );
+        assert_eq!(
+            Err(Error::NotFound),
+            run_command(&c, &["renamenx", "foo", "bar"]).await
+        );
+
+        assert_eq!(
+            Ok(Value::Ok),
+            run_command(&c, &["set", "bar-1650", "xxx"]).await
+        );
+        assert_eq!(
+            Ok(0.into()),
+            run_command(&c, &["renamenx", "xxx", "bar-1650"]).await
+        );
+    }
 }

+ 1 - 0
src/cmd/mod.rs

@@ -4,6 +4,7 @@ use tokio::time::{Duration, Instant};
 
 pub mod client;
 pub mod hash;
+pub mod help;
 pub mod key;
 pub mod list;
 pub mod metrics;

+ 5 - 7
src/cmd/pubsub.rs

@@ -18,12 +18,7 @@ pub async fn pubsub(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
                 .map(|v| Value::new(&v))
                 .collect(),
         )),
-        "help" => Ok(Value::Array(vec![
-            Value::String("PUBSUB <subcommand> arg arg ... arg. Subcommands are:".to_owned()),
-            Value::String("CHANNELS [<pattern>] -- Return the currently active channels matching a pattern (default: all).".to_owned()),
-            Value::String("NUMPAT -- Return number of subscriptions to patterns.".to_owned()),
-            Value::String("NUMSUB [channel-1 .. channel-N] -- Returns the number of subscribers for the specified channels (excluding patterns, default: none).".to_owned()),
-        ])),
+        "help" => super::help::pubsub(),
         "numpat" => Ok(conn.pubsub().get_number_of_psubscribers().into()),
         "numsub" => Ok(conn
             .pubsub()
@@ -33,7 +28,10 @@ pub async fn pubsub(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             .flatten()
             .collect::<Vec<Value>>()
             .into()),
-        cmd => Err(Error::SubCommandNotFound(cmd.into(), String::from_utf8_lossy(&args[0]).into())),
+        cmd => Err(Error::SubCommandNotFound(
+            cmd.into(),
+            String::from_utf8_lossy(&args[0]).into(),
+        )),
     }
 }
 

+ 1 - 15
src/cmd/server.rs

@@ -54,21 +54,7 @@ pub async fn command(conn: &Connection, args: &[Bytes]) -> Result<Value, Error>
                     .collect(),
             ))
         }
-        "help" => Ok(Value::Array(vec![
-            Value::String(
-                "COMMAND <subcommand> [<arg> [value] [opt] ...]. Subcommands are:".into(),
-            ),
-            Value::String("(no subcommand)".into()),
-            Value::String("\tReturn details about all Redis commands".into()),
-            Value::String("COUNT".into()),
-            Value::String("\tReturn the total number of commands in this Redis server.".into()),
-            Value::String("GETKEYS <full-command>".into()),
-            Value::String("\tReturn the keys from a full Redis command.".into()),
-            Value::String("INFO [<command-name> ...]".into()),
-            Value::String("Return details about multiple Redis commands.".into()),
-            Value::String("HELP".into()),
-            Value::String("\tPrints this help.".into()),
-        ])),
+        "help" => super::help::command(),
         cmd => Err(Error::SubCommandNotFound(
             cmd.into(),
             String::from_utf8_lossy(&args[0]).into(),

+ 4 - 0
src/db/entry.rs

@@ -40,6 +40,10 @@ impl Entry {
         self.expires_at = None;
     }
 
+    pub fn clone(&self) -> Self {
+        Self::new(self.value.clone(), self.expires_at.clone())
+    }
+
     pub fn get_ttl(&self) -> Option<Instant> {
         self.expires_at
     }

+ 306 - 6
src/db/mod.rs

@@ -5,18 +5,27 @@
 mod entry;
 mod expiration;
 pub mod pool;
-
-use crate::{error::Error, value::Value};
+pub mod scan;
+
+use crate::{
+    error::Error,
+    value::{
+        cursor::Cursor,
+        typ::{Typ, ValueTyp},
+        Value,
+    },
+};
 use bytes::{BufMut, Bytes, BytesMut};
 use entry::{new_version, Entry};
 use expiration::ExpirationDb;
+use glob::Pattern;
 use log::trace;
 use parking_lot::{Mutex, RwLock};
 use seahash::hash;
 use std::{
     collections::HashMap,
     convert::{TryFrom, TryInto},
-    ops::AddAssign,
+    ops::{AddAssign, Deref},
     sync::Arc,
     thread,
 };
@@ -83,6 +92,10 @@ pub struct Db {
     /// Number of HashMaps that are available.
     number_of_slots: usize,
 
+    /// Databases unique ID. This is an internal identifier to avoid deadlocks
+    /// when copying and moving data between databases.
+    pub db_id: u128,
+
     /// Current connection  ID
     ///
     /// A Database is attached to a conn_id. The slots and expiration data
@@ -109,6 +122,7 @@ impl Db {
             slots: Arc::new(slots),
             expirations: Arc::new(Mutex::new(ExpirationDb::new())),
             conn_id: 0,
+            db_id: new_version(),
             tx_key_locks: Arc::new(RwLock::new(HashMap::new())),
             number_of_slots,
         }
@@ -125,6 +139,7 @@ impl Db {
             tx_key_locks: self.tx_key_locks.clone(),
             expirations: self.expirations.clone(),
             conn_id,
+            db_id: self.db_id,
             number_of_slots: self.number_of_slots,
         })
     }
@@ -310,6 +325,121 @@ impl Db {
         }
     }
 
+    /// Copies a key
+    pub fn copy(
+        &self,
+        source: &Bytes,
+        target: &Bytes,
+        replace: Override,
+        target_db: Option<Arc<Db>>,
+    ) -> Result<bool, Error> {
+        let slots = self.slots[self.get_slot(source)].read();
+        let value = if let Some(value) = slots.get(source).filter(|x| x.is_valid()) {
+            value.clone()
+        } else {
+            return Ok(false);
+        };
+        drop(slots);
+
+        if let Some(db) = target_db {
+            if db.db_id == self.db_id && source == target {
+                return Err(Error::SameEntry);
+            }
+            if replace == Override::No && db.exists(&[target.clone()]) > 0 {
+                return Ok(false);
+            }
+            let _ = db.set_advanced(
+                target,
+                value.value.clone(),
+                value.get_ttl().map(|v| v - Instant::now()),
+                replace,
+                false,
+                false,
+            );
+            Ok(true)
+        } else {
+            if source == target {
+                return Err(Error::SameEntry);
+            }
+
+            if replace == Override::No && self.exists(&[target.clone()]) > 0 {
+                return Ok(false);
+            }
+            let mut slots = self.slots[self.get_slot(target)].write();
+            slots.insert(target.clone(), value);
+
+            Ok(true)
+        }
+    }
+
+    /// Moves a given key between databases
+    pub fn move_key(&self, source: &Bytes, target_db: Arc<Db>) -> Result<bool, Error> {
+        if self.db_id == target_db.db_id {
+            return Err(Error::SameEntry);
+        }
+        let mut slot = self.slots[self.get_slot(source)].write();
+        let (expires_in, value) = if let Some(value) = slot.get(source).filter(|v| v.is_valid()) {
+            (
+                value.get_ttl().map(|t| t - Instant::now()),
+                value.value.clone(),
+            )
+        } else {
+            return Ok(false);
+        };
+
+        if Value::Integer(1)
+            == target_db.set_advanced(&source, value, expires_in, Override::No, false, false)
+        {
+            slot.remove(source);
+            Ok(true)
+        } else {
+            Ok(false)
+        }
+    }
+
+    /// Renames a key
+    pub fn rename(
+        &self,
+        source: &Bytes,
+        target: &Bytes,
+        override_value: Override,
+    ) -> Result<bool, Error> {
+        let slot1 = self.get_slot(source);
+        let slot2 = self.get_slot(target);
+
+        if slot1 == slot2 {
+            let mut slot = self.slots[slot1].write();
+
+            if let Some(value) = slot.remove(source) {
+                Ok(
+                    if override_value == Override::No && slot.get(target).is_some() {
+                        false
+                    } else {
+                        slot.insert(target.clone(), value);
+                        true
+                    },
+                )
+            } else {
+                Err(Error::NotFound)
+            }
+        } else {
+            let mut slot1 = self.slots[slot1].write();
+            let mut slot2 = self.slots[slot2].write();
+            if let Some(value) = slot1.remove(source) {
+                Ok(
+                    if override_value == Override::No && slot2.get(target).is_some() {
+                        false
+                    } else {
+                        slot2.insert(target.clone(), value);
+                        true
+                    },
+                )
+            } else {
+                Err(Error::NotFound)
+            }
+        }
+    }
+
     /// Removes keys from the database
     pub fn del(&self, keys: &[Bytes]) -> Value {
         let mut expirations = self.expirations.lock();
@@ -324,8 +454,30 @@ impl Db {
             .into()
     }
 
+    /// Returns all keys that matches a given pattern. This is a very expensive command.
+    pub fn get_all_keys(&self, pattern: &Bytes) -> Result<Vec<Value>, Error> {
+        let pattern = String::from_utf8_lossy(pattern);
+        let pattern =
+            Pattern::new(&pattern).map_err(|_| Error::InvalidPattern(pattern.to_string()))?;
+        Ok(self
+            .slots
+            .iter()
+            .map(|slot| {
+                slot.read()
+                    .keys()
+                    .filter(|key| {
+                        let str_key = String::from_utf8_lossy(key);
+                        pattern.matches(&str_key)
+                    })
+                    .map(|key| Value::new(key))
+                    .collect::<Vec<Value>>()
+            })
+            .flatten()
+            .collect())
+    }
+
     /// Check if keys exists in the database
-    pub fn exists(&self, keys: &[Bytes]) -> Value {
+    pub fn exists(&self, keys: &[Bytes]) -> usize {
         let mut matches = 0;
         keys.iter()
             .map(|key| {
@@ -336,7 +488,7 @@ impl Db {
             })
             .for_each(drop);
 
-        matches.into()
+        matches
     }
 
     /// get_map_or
@@ -391,6 +543,17 @@ impl Db {
             .unwrap_or_else(new_version)
     }
 
+    /// Returns the name of the value type
+    pub fn get_data_type(&self, key: &Bytes) -> String {
+        let slots = self.slots[self.get_slot(key)].read();
+        slots
+            .get(key)
+            .filter(|x| x.is_valid())
+            .map_or("none".to_owned(), |x| {
+                Typ::get_type(x.get()).to_string().to_lowercase()
+            })
+    }
+
     /// Get a copy of an entry
     pub fn get(&self, key: &Bytes) -> Value {
         let slots = self.slots[self.get_slot(key)].read();
@@ -624,10 +787,82 @@ impl Db {
     }
 }
 
+impl scan::Scan for Db {
+    fn scan(
+        &self,
+        cursor: Cursor,
+        pattern: Option<&Bytes>,
+        count: Option<usize>,
+        typ: Option<Typ>,
+    ) -> Result<scan::Result, Error> {
+        let mut keys = vec![];
+        let mut slot_id = cursor.bucket as usize;
+        let mut last_pos = cursor.last_position as usize;
+        let pattern = pattern
+            .map(|pattern| {
+                let pattern = String::from_utf8_lossy(pattern);
+                Pattern::new(&pattern).map_err(|_| Error::InvalidPattern(pattern.to_string()))
+            })
+            .transpose()?;
+
+        loop {
+            let slot = if let Some(value) = self.slots.get(slot_id) {
+                value.read()
+            } else {
+                // We iterated through all the entries, time to signal that to
+                // the client but returning a "0" cursor.
+                slot_id = 0;
+                last_pos = 0;
+                break;
+            };
+
+            for (key, value) in slot.iter().skip(last_pos) {
+                if !value.is_valid() {
+                    // Entry still exists in memory but it is not longer valid
+                    // and will soon be gargabe collected.
+                    last_pos += 1;
+                    continue;
+                }
+                if let Some(pattern) = &pattern {
+                    let str_key = String::from_utf8_lossy(key);
+                    if !pattern.matches(&str_key) {
+                        last_pos += 1;
+                        continue;
+                    }
+                }
+                if let Some(typ) = &typ {
+                    if !typ.is_value_type(value.get()) {
+                        last_pos += 1;
+                        continue;
+                    }
+                }
+                keys.push(Value::new(key));
+                last_pos += 1;
+                if keys.len() == count.unwrap_or(10) {
+                    break;
+                }
+            }
+
+            if keys.len() == count.unwrap_or(10) {
+                break;
+            }
+
+            last_pos = 0;
+            slot_id += 1;
+        }
+
+        Ok(scan::Result {
+            cursor: Cursor::new(slot_id as u16, last_pos as u64)?,
+            result: keys,
+        })
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
-    use crate::bytes;
+    use crate::{bytes, db::scan::Scan};
+    use std::str::FromStr;
 
     #[test]
     fn incr_wrong_type() {
@@ -763,4 +998,69 @@ mod test {
         // Purge should return 0 as the expired key has been removed already
         assert_eq!(0, db.purge());
     }
+
+    #[test]
+    fn scan_skip_expired() {
+        let db = Db::new(100);
+        db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
+        db.set(&bytes!(b"two"), Value::Ok, Some(Duration::from_secs(0)));
+        for i in 0u64..20u64 {
+            let key: Bytes = i.to_string().into();
+            db.set(&key, Value::Ok, None);
+        }
+        let result = db
+            .scan(Cursor::from_str("0").unwrap(), None, None, None)
+            .unwrap();
+        // first 10 records
+        assert_eq!(10, result.result.len());
+        // make sure the cursor is valid
+        assert_ne!("0", result.cursor.to_string());
+        let result = db.scan(result.cursor, None, None, None).unwrap();
+        // 10 more records
+        assert_eq!(10, result.result.len());
+        // make sure the cursor is valid
+        assert_ne!("0", result.cursor.to_string());
+
+        let result = db.scan(result.cursor, None, None, None).unwrap();
+        // No more results!
+        assert_eq!(0, result.result.len());
+        assert_eq!("0", result.cursor.to_string());
+    }
+
+    #[test]
+    fn scan_limit() {
+        let db = Db::new(10);
+        db.set(&bytes!(b"one"), Value::Ok, Some(Duration::from_secs(0)));
+        db.set(&bytes!(b"two"), Value::Ok, Some(Duration::from_secs(0)));
+        for i in 0u64..2000u64 {
+            let key: Bytes = i.to_string().into();
+            db.set(&key, Value::Ok, None);
+        }
+        let result = db
+            .scan(Cursor::from_str("0").unwrap(), None, Some(2), None)
+            .unwrap();
+        assert_eq!(2, result.result.len());
+        assert_ne!("0", result.cursor.to_string());
+    }
+
+    #[test]
+    fn scan_filter() {
+        let db = Db::new(100);
+        db.set(&bytes!(b"fone"), Value::Ok, None);
+        db.set(&bytes!(b"ftwo"), Value::Ok, None);
+        for i in 0u64..20u64 {
+            let key: Bytes = i.to_string().into();
+            db.set(&key, Value::Ok, None);
+        }
+        let result = db
+            .scan(
+                Cursor::from_str("0").unwrap(),
+                Some(&bytes!(b"f*")),
+                None,
+                None,
+            )
+            .unwrap();
+        assert_eq!(2, result.result.len());
+        assert_eq!("0", result.cursor.to_string());
+    }
 }

+ 36 - 0
src/db/scan.rs

@@ -0,0 +1,36 @@
+//! # Scan trait
+//!
+//! This trait must be implemented for all data structure that implements
+//! SCAN-like commands.
+use crate::{
+    error::Error,
+    value::{cursor::Cursor, typ::Typ, Value},
+};
+use bytes::Bytes;
+
+/// Result of an scan
+pub struct Result {
+    /// New cursor to be used in the next request
+    pub cursor: Cursor,
+    /// Result
+    pub result: Vec<Value>,
+}
+
+impl From<Result> for Value {
+    fn from(v: Result) -> Value {
+        Value::Array(vec![v.cursor.to_string().into(), Value::Array(v.result)])
+    }
+}
+
+/// Scan trait
+pub trait Scan {
+    /// Scans the current data struct and returns a sub-set of the result and a
+    /// cursor to continue where it left off.
+    fn scan(
+        &self,
+        cursor: Cursor,
+        pattern: Option<&Bytes>,
+        count: Option<usize>,
+        typ: Option<Typ>,
+    ) -> std::result::Result<Result, Error>;
+}

+ 181 - 100
src/dispatcher/mod.rs

@@ -21,7 +21,7 @@ pub mod command;
 /// Returns the server time
 dispatcher! {
     set {
-        sadd {
+        SADD {
             cmd::set::sadd,
             [Flag::Write Flag::DenyOom Flag::Fast],
             -3,
@@ -30,7 +30,7 @@ dispatcher! {
             1,
             true,
         },
-        scard {
+        SCARD {
             cmd::set::scard,
             [Flag::ReadOnly Flag::Fast],
             2,
@@ -39,7 +39,7 @@ dispatcher! {
             1,
             true,
         },
-        sdiff {
+        SDIFF {
             cmd::set::sdiff,
             [Flag::ReadOnly Flag::SortForScript],
             -2,
@@ -48,7 +48,7 @@ dispatcher! {
             1,
             true,
         },
-        sdiffstore {
+        SDIFFSTORE {
             cmd::set::sdiffstore,
             [Flag::Write Flag::DenyOom],
             -3,
@@ -57,7 +57,7 @@ dispatcher! {
             1,
             true,
         },
-        sinter {
+        SINTER {
             cmd::set::sinter,
             [Flag::ReadOnly Flag::SortForScript],
             -2,
@@ -66,7 +66,7 @@ dispatcher! {
             1,
             true,
         },
-        sintercard {
+        SINTERCARD {
             cmd::set::sintercard,
             [Flag::ReadOnly],
             -2,
@@ -75,7 +75,7 @@ dispatcher! {
             1,
             true,
         },
-        sinterstore {
+        SINTERSTORE {
             cmd::set::sinterstore,
             [Flag::Write Flag::DenyOom],
             -3,
@@ -84,7 +84,7 @@ dispatcher! {
             1,
             true,
         },
-        sismember {
+        SISMEMBER {
             cmd::set::sismember,
             [Flag::ReadOnly Flag::Fast],
             3,
@@ -93,7 +93,7 @@ dispatcher! {
             1,
             true,
         },
-        smembers {
+        SMEMBERS {
             cmd::set::smembers,
             [Flag::ReadOnly Flag::SortForScript],
             2,
@@ -102,7 +102,7 @@ dispatcher! {
             1,
             true,
         },
-        smismember {
+        SMISMEMBER {
             cmd::set::smismember,
             [Flag::ReadOnly Flag::Fast],
             -3,
@@ -111,7 +111,7 @@ dispatcher! {
             1,
             true,
         },
-        smove {
+        SMOVE {
             cmd::set::smove,
             [Flag::Write Flag::Fast],
             4,
@@ -120,7 +120,7 @@ dispatcher! {
             1,
             true,
         },
-        spop {
+        SPOP {
             cmd::set::spop,
             [Flag::Write Flag::Random Flag::Fast],
             -2,
@@ -129,7 +129,7 @@ dispatcher! {
             1,
             true,
         },
-        srandmember {
+        SRANDMEMBER {
             cmd::set::srandmember,
             [Flag::ReadOnly Flag::Random],
             -2,
@@ -138,7 +138,7 @@ dispatcher! {
             1,
             true,
         },
-        srem {
+        SREM {
             cmd::set::srem,
             [Flag::Write Flag::Fast],
             -3,
@@ -147,7 +147,7 @@ dispatcher! {
             1,
             true,
         },
-        sunion {
+        SUNION {
             cmd::set::sunion,
             [Flag::ReadOnly Flag::SortForScript],
             -2,
@@ -156,7 +156,7 @@ dispatcher! {
             1,
             true,
         },
-        sunionstore {
+        SUNIONSTORE {
             cmd::set::sunionstore,
             [Flag::Write Flag::DenyOom],
             -2,
@@ -167,7 +167,7 @@ dispatcher! {
         },
     },
     metrics {
-        metrics {
+        METRICS {
             cmd::metrics::metrics,
             [Flag::ReadOnly Flag::Fast],
             -1,
@@ -178,7 +178,7 @@ dispatcher! {
         },
     },
     list {
-        blpop {
+        BLPOP {
             cmd::list::blpop,
             [Flag::Write Flag::NoScript],
             -3,
@@ -187,7 +187,7 @@ dispatcher! {
             1,
             true,
         },
-        brpop {
+        BRPOP {
             cmd::list::brpop,
             [Flag::Write Flag::NoScript],
             -3,
@@ -196,7 +196,7 @@ dispatcher! {
             1,
             true,
         },
-        lindex {
+        LINDEX {
             cmd::list::lindex,
             [Flag::ReadOnly],
             3,
@@ -205,7 +205,7 @@ dispatcher! {
             1,
             true,
         },
-        linsert {
+        LINSERT {
             cmd::list::linsert,
             [Flag::Write Flag::DenyOom],
             5,
@@ -214,7 +214,7 @@ dispatcher! {
             1,
             true,
         },
-        llen {
+        LLEN {
             cmd::list::llen,
             [Flag::ReadOnly Flag::Fast],
             2,
@@ -223,7 +223,7 @@ dispatcher! {
             1,
             true,
         },
-        lmove {
+        LMOVE {
             cmd::list::lmove,
             [Flag::Write Flag::DenyOom],
             5,
@@ -232,7 +232,7 @@ dispatcher! {
             1,
             true,
         },
-        lpop {
+        LPOP {
             cmd::list::lpop,
             [Flag::Write Flag::DenyOom],
             -2,
@@ -241,7 +241,7 @@ dispatcher! {
             1,
             true,
         },
-        lpos {
+        LPOS {
             cmd::list::lpos,
             [Flag::ReadOnly],
             -2,
@@ -250,7 +250,7 @@ dispatcher! {
             1,
             true,
         },
-        lpush {
+        LPUSH {
             cmd::list::lpush,
             [Flag::Write Flag::DenyOom Flag::Fast],
             -3,
@@ -259,7 +259,7 @@ dispatcher! {
             1,
             true,
         },
-        lpushx {
+        LPUSHX {
             cmd::list::lpush,
             [Flag::Write Flag::DenyOom Flag::Fast],
             -3,
@@ -268,7 +268,7 @@ dispatcher! {
             1,
             true,
         },
-        lrange {
+        LRANGE {
             cmd::list::lrange,
             [Flag::ReadOnly],
             4,
@@ -277,7 +277,7 @@ dispatcher! {
             1,
             true,
         },
-        lrem {
+        LREM {
             cmd::list::lrem,
             [Flag::Write],
             4,
@@ -286,7 +286,7 @@ dispatcher! {
             1,
             true,
         },
-        lset {
+        LSET {
             cmd::list::lset,
             [Flag::Write Flag::DenyOom],
             4,
@@ -295,7 +295,7 @@ dispatcher! {
             1,
             true,
         },
-        ltrim {
+        LTRIM {
             cmd::list::ltrim,
             [Flag::Write],
             4,
@@ -304,7 +304,7 @@ dispatcher! {
             1,
             true,
         },
-        rpop {
+        RPOP {
             cmd::list::rpop,
             [Flag::Write Flag::Fast],
             -2,
@@ -313,7 +313,7 @@ dispatcher! {
             1,
             true,
         },
-        rpoplpush {
+        RPOPLPUSH {
             cmd::list::rpoplpush,
             [Flag::Write Flag::DenyOom],
             3,
@@ -322,7 +322,7 @@ dispatcher! {
             1,
             true,
         },
-        rpush {
+        RPUSH {
             cmd::list::rpush,
             [Flag::Write Flag::DenyOom Flag::Fast],
             -3,
@@ -331,7 +331,7 @@ dispatcher! {
             1,
             true,
         },
-        rpushx {
+        RPUSHX {
             cmd::list::rpush,
             [Flag::Write Flag::DenyOom Flag::Fast],
             -3,
@@ -342,7 +342,7 @@ dispatcher! {
         },
     },
     hash {
-        hdel {
+        HDEL {
             cmd::hash::hdel,
             [Flag::Write Flag::Fast],
             -2,
@@ -351,7 +351,7 @@ dispatcher! {
             1,
             true,
         },
-        hexists {
+        HEXISTS {
             cmd::hash::hexists,
             [Flag::ReadOnly Flag::Fast],
             3,
@@ -360,7 +360,7 @@ dispatcher! {
             1,
             true,
         },
-        hget {
+        HGET {
             cmd::hash::hget,
             [Flag::ReadOnly Flag::Fast],
             3,
@@ -369,7 +369,7 @@ dispatcher! {
             1,
             true,
         },
-        hgetall {
+        HGETALL {
             cmd::hash::hgetall,
             [Flag::ReadOnly Flag::Random],
             2,
@@ -378,7 +378,7 @@ dispatcher! {
             1,
             true,
         },
-        hincrby {
+        HINCRBY {
             cmd::hash::hincrby::<i64>,
             [Flag::Write Flag::DenyOom Flag::Fast],
             4,
@@ -387,7 +387,7 @@ dispatcher! {
             1,
             true,
         },
-        hincrbyfloat {
+        HINCRBYFLOAT {
             cmd::hash::hincrby::<f64>,
             [Flag::Write Flag::DenyOom Flag::Fast],
             4,
@@ -396,7 +396,7 @@ dispatcher! {
             1,
             true,
         },
-        hkeys {
+        HKEYS {
             cmd::hash::hkeys,
             [Flag::ReadOnly Flag::SortForScript],
             2,
@@ -405,7 +405,7 @@ dispatcher! {
             1,
             true,
         },
-        hlen {
+        HLEN {
             cmd::hash::hlen,
             [Flag::ReadOnly Flag::Fast],
             2,
@@ -414,7 +414,7 @@ dispatcher! {
             1,
             true,
         },
-        hmget {
+        HMGET {
             cmd::hash::hmget,
             [Flag::ReadOnly Flag::Fast],
             -3,
@@ -423,7 +423,7 @@ dispatcher! {
             1,
             true,
         },
-        hmset {
+        HMSET {
             cmd::hash::hset,
             [Flag::Write Flag::DenyOom Flag::Fast],
             -3,
@@ -432,7 +432,7 @@ dispatcher! {
             1,
             true,
         },
-        hrandfield {
+        HRANDFIELD {
             cmd::hash::hrandfield,
             [Flag::ReadOnly Flag::ReadOnly],
             -2,
@@ -441,7 +441,7 @@ dispatcher! {
             1,
             true,
         },
-        hset {
+        HSET {
             cmd::hash::hset,
             [Flag::Write Flag::DenyOom Flag::Fast],
             -4,
@@ -450,7 +450,7 @@ dispatcher! {
             1,
             true,
         },
-        hsetnx {
+        HSETNX {
             cmd::hash::hsetnx,
             [Flag::Write Flag::DenyOom Flag::Fast],
             4,
@@ -459,7 +459,7 @@ dispatcher! {
             1,
             true,
         },
-        hstrlen {
+        HSTRLEN {
             cmd::hash::hstrlen,
             [Flag::ReadOnly Flag::Fast],
             3,
@@ -468,7 +468,7 @@ dispatcher! {
             1,
             true,
         },
-        hvals {
+        HVALS {
             cmd::hash::hvals,
             [Flag::ReadOnly Flag::SortForScript],
             2,
@@ -479,7 +479,16 @@ dispatcher! {
         },
     },
     keys {
-        del {
+        COPY {
+            cmd::key::copy,
+            [Flag::Write Flag::DenyOom],
+            -3,
+            1,
+            2,
+            1,
+            true,
+        },
+        DEL {
             cmd::key::del,
             [Flag::Write],
             -2,
@@ -488,7 +497,7 @@ dispatcher! {
             1,
             true,
         },
-        exists {
+        EXISTS {
             cmd::key::exists,
             [Flag::ReadOnly Flag::Fast],
             -2,
@@ -497,7 +506,7 @@ dispatcher! {
             1,
             true,
         },
-        expire {
+        EXPIRE {
             cmd::key::expire,
             [Flag::Write Flag::Fast],
             3,
@@ -506,7 +515,7 @@ dispatcher! {
             1,
             true,
         },
-        expireat {
+        EXPIREAT {
             cmd::key::expire_at,
             [Flag::Write Flag::Fast],
             3,
@@ -515,7 +524,7 @@ dispatcher! {
             1,
             true,
         },
-        expiretime {
+        EXPIRETIME {
             cmd::key::expire_time,
             [Flag::Write Flag::Fast],
             2,
@@ -524,7 +533,34 @@ dispatcher! {
             1,
             true,
         },
-        persist {
+        KEYS {
+            cmd::key::keys,
+            [Flag::ReadOnly Flag::SortForScript],
+            2,
+            0,
+            0,
+            0,
+            true,
+        },
+        MOVE {
+            cmd::key::move_key,
+            [Flag::Write Flag::Fast],
+            3,
+            1,
+            1,
+            1,
+            true,
+        },
+        OBJECT {
+            cmd::key::object,
+            [Flag::ReadOnly Flag::Random],
+            -2,
+            2,
+            2,
+            1,
+            true,
+        },
+        PERSIST {
             cmd::key::persist,
             [Flag::Write Flag::Fast],
             2,
@@ -533,7 +569,7 @@ dispatcher! {
             1,
             true,
         },
-        pexpire {
+        PEXPIRE {
             cmd::key::expire,
             [Flag::Write Flag::Fast],
             3,
@@ -542,7 +578,7 @@ dispatcher! {
             1,
             true,
         },
-        pexpireat {
+        PEXPIREAT {
             cmd::key::expire_at,
             [Flag::Write Flag::Fast],
             3,
@@ -551,7 +587,7 @@ dispatcher! {
             1,
             true,
         },
-        pexpiretime {
+        PEXPIRETIME {
             cmd::key::expire_time,
             [Flag::Write Flag::Fast],
             2,
@@ -560,7 +596,7 @@ dispatcher! {
             1,
             true,
         },
-        pttl {
+        PTTL {
             cmd::key::ttl,
             [Flag::ReadOnly Flag::Random Flag::Fast],
             2,
@@ -569,7 +605,34 @@ dispatcher! {
             1,
             true,
         },
-        ttl {
+        RENAME {
+            cmd::key::rename,
+            [Flag::Write],
+            3,
+            1,
+            2,
+            1,
+            true,
+        },
+        RENAMENX {
+            cmd::key::rename,
+            [Flag::Write Flag::Write],
+            3,
+            1,
+            2,
+            1,
+            true,
+        },
+        SCAN {
+            cmd::key::scan,
+            [Flag::ReadOnly Flag::Random],
+            -2,
+            0,
+            0,
+            0,
+            true,
+        },
+        TTL {
             cmd::key::ttl,
             [Flag::ReadOnly Flag::Random Flag::Fast],
             2,
@@ -578,9 +641,27 @@ dispatcher! {
             1,
             true,
         },
+        TYPE {
+            cmd::key::data_type,
+            [Flag::ReadOnly Flag::Fast],
+            2,
+            1,
+            1,
+            1,
+            true,
+        },
+        UNLINK {
+            cmd::key::del,
+            [Flag::Write Flag::Fast],
+            -2,
+            1,
+            -1,
+            1,
+            true,
+        },
     },
     string {
-        append {
+        APPEND {
             cmd::string::append,
             [Flag::Write Flag::DenyOom Flag::Fast],
             3,
@@ -589,7 +670,7 @@ dispatcher! {
             1,
             true,
         },
-        decr {
+        DECR {
             cmd::string::decr,
             [Flag::Write Flag::DenyOom Flag::Fast],
             2,
@@ -598,7 +679,7 @@ dispatcher! {
             1,
             true,
         },
-        decrby {
+        DECRBY {
             cmd::string::decr_by,
             [Flag::Write Flag::DenyOom Flag::Fast],
             3,
@@ -607,7 +688,7 @@ dispatcher! {
             1,
             true,
         },
-        get {
+        GET {
             cmd::string::get,
             [Flag::ReadOnly Flag::Fast],
             2,
@@ -616,7 +697,7 @@ dispatcher! {
             1,
             true,
         },
-        getex {
+        GETEX {
             cmd::string::getex,
             [Flag::Write Flag::Fast],
             -2,
@@ -625,7 +706,7 @@ dispatcher! {
             1,
             true,
         },
-        getrange {
+        GETRANGE {
             cmd::string::getrange,
             [Flag::ReadOnly],
             4,
@@ -634,7 +715,7 @@ dispatcher! {
             1,
             true,
         },
-        getdel {
+        GETDEL {
             cmd::string::getdel,
             [Flag::Write Flag::Fast],
             2,
@@ -643,7 +724,7 @@ dispatcher! {
             1,
             true,
         },
-        getset {
+        GETSET {
             cmd::string::getset,
             [Flag::Write Flag::DenyOom Flag::Fast],
             3,
@@ -652,7 +733,7 @@ dispatcher! {
             1,
             true,
         },
-        incr {
+        INCR {
             cmd::string::incr,
             [Flag::Write Flag::DenyOom Flag::Fast],
             2,
@@ -661,7 +742,7 @@ dispatcher! {
             1,
             true,
         },
-        incrby {
+        INCRBY {
             cmd::string::incr_by,
             [Flag::Write Flag::DenyOom Flag::Fast],
             3,
@@ -670,7 +751,7 @@ dispatcher! {
             1,
             true,
         },
-        incrbyfloat {
+        INCRBYFLOAT {
             cmd::string::incr_by_float,
             [Flag::Write Flag::DenyOom Flag::Fast],
             3,
@@ -679,7 +760,7 @@ dispatcher! {
             1,
             true,
         },
-        mget {
+        MGET {
             cmd::string::mget,
             [Flag::ReadOnly Flag::Fast],
             -2,
@@ -688,7 +769,7 @@ dispatcher! {
             1,
             true,
         },
-        mset {
+        MSET {
             cmd::string::mset,
             [Flag::Write Flag::DenyOom],
             -2,
@@ -697,7 +778,7 @@ dispatcher! {
             1,
             true,
         },
-        msetnx {
+        MSETNX {
             cmd::string::msetnx,
             [Flag::Write Flag::DenyOom],
             -2,
@@ -706,7 +787,7 @@ dispatcher! {
             1,
             true,
         },
-        set {
+        SET {
             cmd::string::set,
             [Flag::Write Flag::DenyOom],
             -3,
@@ -715,7 +796,7 @@ dispatcher! {
             1,
             true,
         },
-        setex {
+        SETEX {
             cmd::string::setex,
             [Flag::Write Flag::DenyOom],
             4,
@@ -724,7 +805,7 @@ dispatcher! {
             1,
             true,
         },
-        setnx {
+        SETNX {
             cmd::string::setnx,
             [Flag::Write Flag::DenyOom],
             3,
@@ -733,7 +814,7 @@ dispatcher! {
             1,
             true,
         },
-        psetex {
+        PSETEX {
             cmd::string::setex,
             [Flag::Write Flag::DenyOom],
             4,
@@ -742,7 +823,7 @@ dispatcher! {
             1,
             true,
         },
-        strlen {
+        STRLEN {
             cmd::string::strlen,
             [Flag::ReadOnly Flag::Fast],
             2,
@@ -751,7 +832,7 @@ dispatcher! {
             1,
             true,
         },
-        substr {
+        SUBSTR {
             cmd::string::getrange,
             [Flag::ReadOnly],
             2,
@@ -760,7 +841,7 @@ dispatcher! {
             1,
             true,
         },
-        setrange {
+        SETRANGE {
             cmd::string::setrange,
             [Flag::Write],
             4,
@@ -771,7 +852,7 @@ dispatcher! {
         }
     },
     connection {
-        client {
+        CLIENT {
             cmd::client::client,
             [Flag::Admin Flag::NoScript Flag::Random Flag::Loading Flag::Stale],
             -2,
@@ -780,7 +861,7 @@ dispatcher! {
             0,
             true,
         },
-        echo {
+        ECHO {
             cmd::client::echo,
             [Flag::Fast],
             2,
@@ -789,7 +870,7 @@ dispatcher! {
             0,
             true,
         },
-        ping {
+        PING {
             cmd::client::ping,
             [Flag::Stale Flag::Fast],
             -1,
@@ -798,7 +879,7 @@ dispatcher! {
             0,
             true,
         },
-        reset {
+        RESET {
             cmd::client::reset,
             [Flag::NoScript Flag::Loading Flag::Stale Flag::Fast],
             1,
@@ -807,7 +888,7 @@ dispatcher! {
             0,
             false,
         },
-        select {
+        SELECT {
             cmd::client::select,
             [Flag::Fast Flag::Stale Flag::Loading],
             2,
@@ -818,7 +899,7 @@ dispatcher! {
         }
     },
     transaction {
-        discard {
+        DISCARD {
             cmd::transaction::discard,
             [Flag::NoScript Flag::Loading Flag::Stale Flag::Fast],
             1,
@@ -827,7 +908,7 @@ dispatcher! {
             0,
             false,
         },
-        exec {
+        EXEC {
             cmd::transaction::exec,
             [Flag::NoScript Flag::Loading Flag::Stale Flag::SkipMonitor Flag::SkipSlowlog],
             1,
@@ -836,7 +917,7 @@ dispatcher! {
             0,
             false,
         },
-        multi {
+        MULTI {
             cmd::transaction::multi,
             [Flag::NoScript Flag::Loading Flag::Stale Flag::Fast],
             1,
@@ -845,7 +926,7 @@ dispatcher! {
             0,
             false,
         },
-        watch {
+        WATCH {
             cmd::transaction::watch,
             [Flag::NoScript Flag::Loading Flag::Stale Flag::Fast],
             -2,
@@ -854,7 +935,7 @@ dispatcher! {
             1,
             false,
         },
-        unwatch {
+        UNWATCH {
             cmd::transaction::unwatch,
             [Flag::NoScript Flag::Loading Flag::Stale Flag::Fast],
             1,
@@ -865,7 +946,7 @@ dispatcher! {
         },
     },
     pubsub {
-        publish {
+        PUBLISH {
             cmd::pubsub::publish,
             [Flag::PubSub Flag::Loading Flag::Stale Flag::Fast Flag::MayReplicate],
             3,
@@ -874,7 +955,7 @@ dispatcher! {
             0,
             true,
         },
-        pubsub {
+        PUBSUB {
             cmd::pubsub::pubsub,
             [Flag::PubSub Flag::Random Flag::Loading Flag::Stale],
             -2,
@@ -883,7 +964,7 @@ dispatcher! {
             0,
             true,
         },
-        psubscribe {
+        PSUBSCRIBE {
             cmd::pubsub::subscribe,
             [Flag::PubSub Flag::Random Flag::Loading Flag::Stale],
             -2,
@@ -892,7 +973,7 @@ dispatcher! {
             0,
             true,
         },
-        punsubscribe {
+        PUNSUBSCRIBE {
             cmd::pubsub::punsubscribe,
             [Flag::PubSub Flag::Random Flag::Loading Flag::Stale],
             -1,
@@ -901,7 +982,7 @@ dispatcher! {
             0,
             true,
         },
-        subscribe {
+        SUBSCRIBE {
             cmd::pubsub::subscribe,
             [Flag::PubSub Flag::Random Flag::Loading Flag::Stale],
             -2,
@@ -910,7 +991,7 @@ dispatcher! {
             0,
             true,
         },
-        unsubscribe {
+        UNSUBSCRIBE {
             cmd::pubsub::unsubscribe,
             [Flag::PubSub Flag::Random Flag::Loading Flag::Stale],
             -1,
@@ -921,7 +1002,7 @@ dispatcher! {
         },
     },
     server {
-        command {
+        COMMAND {
             cmd::server::command,
             [Flag::Random Flag::Loading Flag::Stale],
             -1,
@@ -930,7 +1011,7 @@ dispatcher! {
             0,
             true,
         },
-        time {
+        TIME {
             cmd::server::time,
             [Flag::Random Flag::Loading Flag::Stale Flag::Fast],
             1,

+ 27 - 2
src/error.rs

@@ -2,42 +2,65 @@
 //!
 //! All redis errors are abstracted in this mod.
 use crate::value::Value;
+use thiserror::Error;
 
 /// Redis errors
-#[derive(Debug, Eq, PartialEq)]
+#[derive(Debug, PartialEq, Error)]
 pub enum Error {
     /// A command is not found
+    #[error("Command {0} not found")]
     CommandNotFound(String),
     /// A sub-command is not found
+    #[error("Subcommand {0} / {1} not found")]
     SubCommandNotFound(String, String),
     /// Invalid number of arguments
+    #[error("Invalid number of argumetns for command {0}")]
     InvalidArgsCount(String),
     /// The glob-pattern is not valid
+    #[error("Invalid pattern {0}")]
     InvalidPattern(String),
     /// Internal Error
+    #[error("Internal error")]
     Internal,
     /// Protocol error
+    #[error("Protocol error {0} expecting {1}")]
     Protocol(String, String),
     /// Unexpected argument
+    #[error("Wrong argument {1} for command {0}")]
     WrongArgument(String, String),
-    /// Command not found
+    /// Key not found
+    #[error("Key not found")]
     NotFound,
     /// Index out of range
+    #[error("Index out of range")]
     OutOfRange,
+    /// Attempting to move or copy to the same key
+    #[error("Cannot move same key")]
+    SameEntry,
     /// The connection is in pubsub only mode and the current command is not compabible.
+    #[error("Invalid command {0} in pubsub mode")]
     PubsubOnly(String),
     /// Syntax error
+    #[error("Syntax error")]
     Syntax,
     /// Byte cannot be converted to a number
+    #[error("Not a number")]
     NotANumber,
     /// The connection is not in a transaction
+    #[error("Not in a transaction")]
     NotInTx,
     /// The requested database does not exists
+    #[error("Database does not exists")]
     NotSuchDatabase,
     /// The connection is in a transaction and nested transactions are not supported
+    #[error("Nested transaction not allowed")]
     NestedTx,
     /// Wrong data type
+    #[error("Wrong type")]
     WrongType,
+    /// Cursor error
+    #[error("Error while creating or parsing the cursor: {0}")]
+    Cursor(#[from] crate::value::cursor::Error),
 }
 
 impl From<Error> for Value {
@@ -50,6 +73,7 @@ impl From<Error> for Value {
         };
 
         let err_msg = match value {
+            Error::Cursor(_) => "internal error".to_owned(),
             Error::CommandNotFound(x) => format!("unknown command `{}`", x),
             Error::SubCommandNotFound(x, y) => format!("Unknown subcommand or wrong number of arguments for '{}'. Try {} HELP.", x, y),
             Error::InvalidArgsCount(x) => format!("wrong number of arguments for '{}' command", x),
@@ -57,6 +81,7 @@ impl From<Error> for Value {
             Error::Internal => "internal error".to_owned(),
             Error::Protocol(x, y) => format!("Protocol error: expected '{}', got '{}'", x, y),
             Error::NotInTx => " without MULTI".to_owned(),
+            Error::SameEntry => "source and destination objects are the same".to_owned(),
             Error::NotANumber => "value is not an integer or out of range".to_owned(),
             Error::OutOfRange => "index out of range".to_owned(),
             Error::Syntax => "syntax error".to_owned(),

+ 4 - 4
src/macros.rs

@@ -87,7 +87,7 @@ macro_rules! dispatcher {
             /// Returns a command handler for a given command
             #[inline(always)]
             pub fn get_handler_for_command(&self, command: &str) -> Result<&command::Command, Error> {
-                match command.to_lowercase().as_str() {
+                match command.to_uppercase().as_str() {
                 $($(
                     stringify!($command) => Ok(&self.$command),
                 )+)+
@@ -102,7 +102,7 @@ macro_rules! dispatcher {
             /// has fewer logic when reading the provided arguments.
             #[inline(always)]
             pub fn get_handler(&self, args: &[Bytes]) -> Result<&command::Command, Error> {
-                let command = String::from_utf8_lossy(&args[0]).to_lowercase();
+                let command = String::from_utf8_lossy(&args[0]).to_uppercase();
                 let command = self.get_handler_for_command(&command)?;
                 if ! command.check_number_args(args.len()) {
                     Err(Error::InvalidArgsCount(command.name().into()))
@@ -119,8 +119,8 @@ macro_rules! dispatcher {
             #[inline(always)]
             pub fn execute<'a>(&'a self, conn: &'a Connection, args: &'a [Bytes]) -> futures::future::BoxFuture<'a, Result<Value, Error>> {
                 async move {
-                    let command = String::from_utf8_lossy(&args[0]);
-                    match command.to_lowercase().as_str() {
+                    let command = String::from_utf8_lossy(&args[0]).to_uppercase();
+                    match command.as_str() {
                         $($(
                             stringify!($command) => {
                                 let command = &self.$command;

+ 119 - 0
src/value/cursor.rs

@@ -0,0 +1,119 @@
+//! Cursor implementation
+
+use byteorder::{LittleEndian, WriteBytesExt};
+use bytes::Bytes;
+use crc32fast::Hasher as Crc32Hasher;
+use std::{
+    convert::TryFrom,
+    io,
+    num::{IntErrorKind, ParseIntError},
+    str::FromStr,
+};
+use thiserror::Error;
+
+/// Error
+#[derive(Error, Debug, PartialEq)]
+pub enum Error {
+    #[error("Parsing Error")]
+    /// Parsing Int error
+    Int(#[from] ParseIntError),
+    #[error("I/O Error")]
+    /// I/O error
+    Io,
+}
+
+/// Cursor.
+///
+/// Redis cursors are stateless. They serialize into a u128 integer information
+/// about the latest processed bucket and the last position with a checksum
+/// value to make sure the number is valid.
+#[derive(Debug, Eq, PartialEq)]
+pub struct Cursor {
+    checksum: u32,
+    /// Current Bucket ID
+    pub bucket: u16,
+    /// Last position of the key that was processed
+    pub last_position: u64,
+}
+
+impl Cursor {
+    /// Creates a new cursor
+    pub fn new(bucket: u16, last_position: u64) -> Result<Self, Error> {
+        let mut hasher = Crc32Hasher::new();
+        let mut buf = vec![];
+        buf.write_u16::<LittleEndian>(bucket)
+            .map_err(|_| Error::Io)?;
+        buf.write_u64::<LittleEndian>(last_position)
+            .map_err(|_| Error::Io)?;
+        hasher.update(&buf);
+        Ok(Self {
+            checksum: hasher.finalize(),
+            bucket,
+            last_position,
+        })
+    }
+
+    /// Serializes the cursor a  single u128 integer
+    pub fn serialize(&self) -> u128 {
+        let bucket: u128 = self.bucket.into();
+        let last_position: u128 = self.last_position as u128;
+        if bucket == last_position && bucket == 0 {
+            return 0;
+        }
+        let checksum: u128 = self.checksum.into();
+        (checksum << 80) | (bucket << 64) | (last_position)
+    }
+}
+
+impl FromStr for Cursor {
+    type Err = Error;
+
+    /// Deserializes a cursor from a string. The string must be a valid number.
+    /// If the number is invalid or the checksum is not valid a new cursor with
+    /// position 0,0 is returned.
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let raw_number: u128 = u128::from_str(s)?;
+        let checksum: u32 = (raw_number >> 80) as u32;
+        let cursor = Self::new((raw_number >> 64) as u16, raw_number as u64)?;
+        if cursor.checksum == checksum {
+            Ok(cursor)
+        } else {
+            Ok(Self::new(0, 0)?)
+        }
+    }
+}
+
+impl TryFrom<&Bytes> for Cursor {
+    type Error = Error;
+
+    fn try_from(v: &Bytes) -> Result<Self, Self::Error> {
+        Cursor::from_str(&String::from_utf8_lossy(v))
+    }
+}
+
+impl ToString for Cursor {
+    fn to_string(&self) -> String {
+        self.serialize().to_string()
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    fn serialize_end() {
+        let x = Cursor::new(0, 0).unwrap();
+        assert_eq!("0", x.to_string());
+    }
+
+    #[test]
+    fn serialize() {
+        for e in 0..255 {
+            for i in 1..10000 {
+                let x = Cursor::new(e, i).unwrap();
+                let y = Cursor::from_str(&x.to_string()).unwrap();
+                assert_eq!(x, y);
+            }
+        }
+    }
+}

+ 12 - 0
src/value/mod.rs

@@ -2,7 +2,9 @@
 //!
 //! All redis internal data structures and values are absracted in this mod.
 pub mod checksum;
+pub mod cursor;
 pub mod locked;
+pub mod typ;
 
 use crate::{error::Error, value_try_from, value_vec_try_from};
 use bytes::{Bytes, BytesMut};
@@ -53,6 +55,16 @@ impl Value {
     pub fn new(value: &[u8]) -> Self {
         Self::Blob(value.into())
     }
+
+    /// Returns the internal encoding of the redis
+    pub fn encoding(&self) -> &str {
+        match self {
+            Self::Hash(_) | Self::Set(_) => "hashtable",
+            Self::List(_) => "linkedlist",
+            Self::Array(_) => "vector",
+            _ => "embstr",
+        }
+    }
 }
 
 impl From<&Value> for Vec<u8> {

+ 89 - 0
src/value/typ.rs

@@ -0,0 +1,89 @@
+//! # Value Type mod
+//!
+use crate::value::Value;
+use std::str::FromStr;
+use strum_macros::{Display, EnumString};
+
+/// Value Type
+#[derive(EnumString, Display, PartialEq, Copy, Clone)]
+pub enum ValueTyp {
+    /// Set
+    #[strum(ascii_case_insensitive)]
+    Set,
+    /// Hash
+    #[strum(ascii_case_insensitive)]
+    Hash,
+    /// List
+    #[strum(ascii_case_insensitive)]
+    List,
+    /// Fallback
+    #[strum(ascii_case_insensitive)]
+    String,
+}
+
+/// Type
+pub struct Typ {
+    typ: ValueTyp,
+    is_negated: bool,
+}
+
+impl Typ {
+    /// Returns the type from a given value
+    pub fn get_type(value: &Value) -> ValueTyp {
+        match value {
+            Value::Hash(_) => ValueTyp::Hash,
+            Value::List(_) => ValueTyp::List,
+            Value::Set(_) => ValueTyp::Set,
+            _ => ValueTyp::String,
+        }
+    }
+
+    /// Whether the current type is negated or not
+    pub fn is_negated(&self) -> bool {
+        self.is_negated
+    }
+
+    /// Checks if a given value is of the same type
+    pub fn is_value_type(&self, value: &Value) -> bool {
+        let t = Self::get_type(value);
+        if self.is_negated {
+            t != self.typ
+        } else {
+            t == self.typ
+        }
+    }
+}
+
+impl FromStr for Typ {
+    type Err = strum::ParseError;
+    fn from_str(input: &str) -> Result<Self, Self::Err> {
+        if input.chars().next() == Some('!') {
+            Ok(Self {
+                typ: ValueTyp::from_str(&input[1..])?,
+                is_negated: true,
+            })
+        } else {
+            Ok(Self {
+                typ: ValueTyp::from_str(input)?,
+                is_negated: false,
+            })
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn type_parsing_test_1() {
+        let t = Typ::from_str("!set").unwrap();
+        assert!(t.is_negated());
+    }
+
+    #[test]
+    fn type_parsing_test_2() {
+        let t = Typ::from_str("set").unwrap();
+        assert!(!t.is_negated());
+    }
+}