
Adding docs comments (#12)

César D. Rodas 3 years ago
parent
commit
7e40c4cf81

+ 40 - 0
redis-protocol-parser/src/lib.rs

@@ -1,32 +1,65 @@
+//! # A zero-copy redis protocol parser
+//!
+//! A zero-copy parser for the redis wire protocol.
+
+#![deny(missing_docs)]
+#![deny(warnings)]
+
 #[macro_use]
 mod macros;
 
 use std::convert::TryInto;
 
+/// Redis Value.
 #[derive(Debug, PartialEq, Clone)]
 pub enum Value<'a> {
+    /// Vector of values
     Array(Vec<Value<'a>>),
+    /// Binary data
     Blob(&'a [u8]),
+    /// String. Newlines are not allowed
     String(&'a str),
+    /// Error
     Error(&'a str, &'a str),
+    /// Integer
     Integer(i64),
+    /// Boolean
     Boolean(bool),
+    /// Float number
     Float(f64),
+    /// Big integers
     BigInteger(i128),
+    /// Null
     Null,
 }
 
+/// Protocol errors
 #[derive(Debug, Clone, Eq, PartialEq)]
 pub enum Error {
+    /// The data is incomplete. This is not an error per se, but rather a
+    /// mechanism to let the caller know they should keep buffering data before
+    /// calling the parser again.
     Partial,
+    /// Unexpected first byte after a new line
     InvalidPrefix,
+    /// Invalid data length
     InvalidLength,
+    /// Parsed value is not boolean
     InvalidBoolean,
+    /// Parsed data is not a number
     InvalidNumber,
+    /// Protocol error
     Protocol(u8, u8),
+    /// Missing new line
     NewLine,
 }
 
+/// Parses data in the format that redis expects
+///
+/// Although the protocol is much wider, at the top level redis only expects
+/// an array of blobs.
+///
+/// The first value is returned alongside the unconsumed stream of bytes.
 pub fn parse_server(bytes: &[u8]) -> Result<(&[u8], Vec<&[u8]>), Error> {
     let (bytes, byte) = next!(bytes);
     match byte {
@@ -35,6 +68,9 @@ pub fn parse_server(bytes: &[u8]) -> Result<(&[u8], Vec<&[u8]>), Error> {
     }
 }
 
+/// Parses an array from a stream of bytes
+///
+/// The first value is returned alongside the unconsumed stream of bytes.
 fn parse_server_array(bytes: &[u8]) -> Result<(&[u8], Vec<&[u8]>), Error> {
     let (bytes, len) = read_line_number!(bytes, i32);
     if len <= 0 {
@@ -60,6 +96,10 @@ fn parse_server_array(bytes: &[u8]) -> Result<(&[u8], Vec<&[u8]>), Error> {
     Ok((bytes, v))
 }
 
+/// Parses redis values from a stream of bytes. If the data is incomplete,
+/// Err(Error::Partial) is returned.
+///
+/// The first value is returned alongside the unconsumed stream of bytes.
 pub fn parse(bytes: &[u8]) -> Result<(&[u8], Value), Error> {
     let (bytes, byte) = next!(bytes);
     match byte {

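Based on the signatures above, a caller is expected to loop on `Error::Partial` while buffering. A minimal sketch of that loop (the crate name `redis_protocol_parser` is assumed from the directory name):

```rust
use redis_protocol_parser::{parse, Error};

// Returns how many bytes were consumed, or None if more data is needed.
fn try_consume(buffer: &[u8]) -> Result<Option<usize>, Error> {
    match parse(buffer) {
        Ok((rest, value)) => {
            // `value` borrows from `buffer` (zero-copy); `rest` is the
            // unconsumed tail to keep for the next call.
            println!("parsed: {:?}", value);
            Ok(Some(buffer.len() - rest.len()))
        }
        // Not a real failure: keep reading from the socket and retry.
        Err(Error::Partial) => Ok(None),
        Err(e) => Err(e),
    }
}
```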
+ 20 - 2
src/cmd/client.rs

@@ -1,7 +1,13 @@
+//! # Client-group command handlers
+
 use crate::{connection::Connection, error::Error, option, value::Value};
 use bytes::Bytes;
 use std::sync::Arc;
 
+/// "client" command handler
+///
+/// Documentation:
+///  * <https://redis.io/commands/client-id>
 pub async fn client(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let sub = String::from_utf8_lossy(&args[1]);
 
@@ -19,12 +25,12 @@ pub async fn client(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 
     match sub.to_lowercase().as_str() {
         "id" => Ok((conn.id() as i64).into()),
-        "info" => Ok(conn.info().as_str().into()),
+        "info" => Ok(conn.as_string().into()),
         "getname" => Ok(option!(conn.name())),
         "list" => {
             let mut v: Vec<Value> = vec![];
             conn.all_connections()
-                .iter(&mut |conn: Arc<Connection>| v.push(conn.info().as_str().into()));
+                .iter(&mut |conn: Arc<Connection>| v.push(conn.as_string().into()));
             Ok(v.into())
         }
         "setname" => {
@@ -39,10 +45,18 @@ pub async fn client(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     }
 }
 
+/// "echo" command handler
+///
+/// Documentation:
+///  * <https://redis.io/commands/echo>
 pub async fn echo(_conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(Value::Blob(args[1].to_owned()))
 }
 
+/// "ping" command handler
+///
+/// Documentation:
+///  * <https://redis.io/commands/ping>
 pub async fn ping(_conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     match args.len() {
         1 => Ok(Value::String("PONG".to_owned())),
@@ -51,6 +65,10 @@ pub async fn ping(_conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     }
 }
 
+/// "reset" command handler
+///
+/// Documentation:
+///  * <https://redis.io/commands/reset>
 pub async fn reset(conn: &Connection, _: &[Bytes]) -> Result<Value, Error> {
     conn.reset();
     Ok(Value::String("RESET".to_owned()))

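All handlers in this group share the `async fn(conn, args) -> Result<Value, Error>` shape, and `args[0]` is the command name itself (note `check_arg!(args, 0, ...)` elsewhere in this commit), so real arguments start at `args[1]`. A hedged sketch of a direct invocation, assuming the module's types are in scope:

```rust
use bytes::Bytes;

// Hypothetical direct call, not the crate's dispatcher: `PING hello`
// arrives as ["PING", "hello"], so the message lives at args[1].
async fn example(conn: &Connection) -> Result<Value, Error> {
    let args = vec![Bytes::from("PING"), Bytes::from("hello")];
    ping(conn, &args).await // a bare ["PING"] would reply with "PONG"
}
```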
+ 24 - 0
src/cmd/hash.rs

@@ -1,3 +1,4 @@
+//! # Hash command handlers
 use crate::{
     check_arg, connection::Connection, error::Error, value::bytes_to_number, value::Value,
 };
@@ -10,6 +11,9 @@ use std::{
     str::FromStr,
 };
 
+/// Removes the specified fields from the hash stored at key. Specified fields that do not exist
+/// within this hash are ignored. If key does not exist, it is treated as an empty hash and this
+/// command returns 0.
 pub async fn hdel(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let result = conn.db().get_map_or(
         &args[1],
@@ -36,6 +40,7 @@ pub async fn hdel(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(result)
 }
 
+/// Returns if field is an existing field in the hash stored at key.
 pub async fn hexists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
@@ -51,6 +56,7 @@ pub async fn hexists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error>
     )
 }
 
+/// Returns the value associated with field in the hash stored at key.
 pub async fn hget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
@@ -66,6 +72,8 @@ pub async fn hget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
+/// Returns all fields and values of the hash stored at key. In the returned value, every field
+/// name is followed by its value, so the length of the reply is twice the size of the hash.
 pub async fn hgetall(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
@@ -86,6 +94,10 @@ pub async fn hgetall(conn: &Connection, args: &[Bytes]) -> Result<Value, Error>
     )
 }
 
+/// Increment the specified field of a hash stored at key, and representing a number, by the
+/// specified increment. If the increment value is negative, the result is to have the hash field
+/// value decremented instead of incremented. If the field does not exist, it is set to 0 before
+/// performing the operation.
 pub async fn hincrby<
     T: ToString + FromStr + AddAssign + for<'a> TryFrom<&'a Value, Error = Error> + Into<Value> + Copy,
 >(
@@ -123,6 +135,7 @@ pub async fn hincrby<
     Ok(result)
 }
 
+/// Returns all field names in the hash stored at key.
 pub async fn hkeys(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
@@ -142,6 +155,7 @@ pub async fn hkeys(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
+/// Returns the number of fields contained in the hash stored at key.
 pub async fn hlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
@@ -153,6 +167,7 @@ pub async fn hlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
+/// Returns the values associated with the specified fields in the hash stored at key.
 pub async fn hmget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
@@ -178,6 +193,7 @@ pub async fn hmget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
+/// Returns random fields (optionally with their values) from a hash
 pub async fn hrandfield(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let (count, with_values) = match args.len() {
         2 => (None, false),
@@ -242,6 +258,8 @@ pub async fn hrandfield(conn: &Connection, args: &[Bytes]) -> Result<Value, Erro
     )
 }
 
+/// Sets field in the hash stored at key to value. If key does not exist, a new key holding a hash
+/// is created. If field already exists in the hash, it is overwritten.
 pub async fn hset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     if args.len() % 2 == 1 {
         return Err(Error::InvalidArgsCount("hset".to_owned()));
@@ -278,6 +296,9 @@ pub async fn hset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(result)
 }
 
+/// Sets field in the hash stored at key to value, only if field does not yet exist. If key does
+/// not exist, a new key holding a hash is created. If field already exists, this operation has no
+/// effect.
 pub async fn hsetnx(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let result = conn.db().get_map_or(
         &args[1],
@@ -313,6 +334,8 @@ pub async fn hsetnx(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(result)
 }
 
+/// Returns the string length of the value associated with field in the hash stored at key. If the
+/// key or the field do not exist, 0 is returned.
 pub async fn hstrlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
@@ -328,6 +351,7 @@ pub async fn hstrlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error>
     )
 }
 
+/// Returns all values in the hash stored at key.
 pub async fn hvals(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],

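`hincrby` is generic over the numeric type, so one handler can back both the integer and the float flavor of the command. A sketch of how it might be instantiated (the dispatch itself is an assumption, not shown in this commit):

```rust
// Hypothetical dispatch: i64 and f64 both satisfy the
// ToString + FromStr + AddAssign + TryFrom<&Value> bounds.
let result = if is_float_command {
    hincrby::<f64>(conn, args).await // HINCRBYFLOAT
} else {
    hincrby::<i64>(conn, args).await // HINCRBY
};
```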
+ 27 - 1
src/cmd/key.rs

@@ -1,3 +1,4 @@
+//! # Key-related command handlers
 use crate::{
     check_arg, connection::Connection, error::Error, value::bytes_to_number, value::Value,
 };
@@ -5,21 +6,36 @@ use bytes::Bytes;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tokio::time::{Duration, Instant};
 
-pub fn now() -> Duration {
+/// Returns the current time
+fn now() -> Duration {
     let start = SystemTime::now();
     start
         .duration_since(UNIX_EPOCH)
         .expect("Time went backwards")
 }
 
+/// Removes the specified keys. A key is ignored if it does not exist.
 pub async fn del(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().del(&args[1..]))
 }
 
+/// Returns if key exists.
 pub async fn exists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().exists(&args[1..]))
 }
 
+/// Set a timeout on key. After the timeout has expired, the key will automatically be deleted. A
+/// key with an associated timeout is often said to be volatile in Redis terminology.
+///
+/// The timeout will only be cleared by commands that delete or overwrite the contents of the key,
+/// including DEL, SET, GETSET and all the *STORE commands. This means that all the operations that
+/// conceptually alter the value stored at the key without replacing it with a new one will leave
+/// the timeout untouched. For instance, incrementing the value of a key with INCR, pushing a new
+/// value into a list with LPUSH, or altering the field value of a hash with HSET are all
+/// operations that will leave the timeout untouched.
+///
+/// The timeout can also be cleared, turning the key back into a persistent key, using the PERSIST
+/// command.
 pub async fn expire(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let expires_in: i64 = bytes_to_number(&args[2])?;
 
@@ -37,6 +53,9 @@ pub async fn expire(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().set_ttl(&args[1], expires_at))
 }
 
+/// EXPIREAT has the same effect and semantic as EXPIRE, but instead of specifying the number of
+/// seconds representing the TTL (time to live), it takes an absolute Unix timestamp (seconds since
+/// January 1, 1970). A timestamp in the past will delete the key immediately.
 pub async fn expire_at(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let secs = check_arg!(args, 0, "EXPIREAT");
     let expires_at: i64 = bytes_to_number(&args[2])?;
@@ -60,6 +79,9 @@ pub async fn expire_at(conn: &Connection, args: &[Bytes]) -> Result<Value, Error
     Ok(conn.db().set_ttl(&args[1], expires_at))
 }
 
+/// Returns the remaining time to live of a key that has a timeout. This introspection capability
+/// allows a Redis client to check how many seconds a given key will continue to be part of the
+/// dataset.
 pub async fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let ttl = match conn.db().ttl(&args[1]) {
         Some(Some(ttl)) => {
@@ -77,6 +99,8 @@ pub async fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(ttl.into())
 }
 
+/// Returns the absolute Unix timestamp (since January 1, 1970) in seconds at which the given key
+/// will expire.
 pub async fn expire_time(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let ttl = match conn.db().ttl(&args[1]) {
         Some(Some(ttl)) => {
@@ -96,6 +120,8 @@ pub async fn expire_time(conn: &Connection, args: &[Bytes]) -> Result<Value, Err
     Ok(ttl.into())
 }
 
+/// Remove the existing timeout on key, turning the key from volatile (a key with an expire set) to
+/// persistent (a key that will never expire as no timeout is associated).
 pub async fn persist(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().persist(&args[1]))
 }

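The EXPIRE/EXPIREAT distinction reduces to one computation: EXPIRE receives seconds relative to now, EXPIREAT an absolute Unix timestamp, and both end up as a deadline handed to `set_ttl`. A standalone sketch of that arithmetic (illustrative names):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Same helper as the `now()` introduced in this commit.
fn now() -> Duration {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards")
}

// EXPIRE key 10 -> deadline is now + 10s
fn deadline_from_relative(seconds: u64) -> Duration {
    now() + Duration::from_secs(seconds)
}

// EXPIREAT key 1700000000 -> the timestamp itself is the deadline; if it
// is in the past the key is deleted immediately.
fn deadline_from_absolute(unix_ts: u64) -> Duration {
    Duration::from_secs(unix_ts)
}
```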
+ 63 - 0
src/cmd/list.rs

@@ -1,3 +1,4 @@
+//! # List command handlers
 use crate::{
     check_arg,
     connection::{Connection, ConnectionStatus},
@@ -57,6 +58,10 @@ fn remove_element(
     Ok(result)
 }
 
+/// BLPOP is a blocking list pop primitive. It is the blocking version of LPOP because it blocks
+/// the connection when there are no elements to pop from any of the given lists. An element is
+/// popped from the head of the first list that is non-empty, with the given keys being checked in
+/// the order that they are given.
 pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let timeout = Instant::now() + Duration::from_secs(bytes_to_number(&args[args.len() - 1])?);
     let len = args.len() - 1;
@@ -79,6 +84,10 @@ pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(Value::Null)
 }
 
+/// BRPOP is a blocking list pop primitive. It is the blocking version of RPOP because it blocks
+/// the connection when there are no elements to pop from any of the given lists. An element is
+/// popped from the tail of the first list that is non-empty, with the given keys being checked in
+/// the order that they are given.
 pub async fn brpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let timeout = Instant::now() + Duration::from_secs(bytes_to_number(&args[args.len() - 1])?);
     let len = args.len() - 1;
@@ -101,6 +110,10 @@ pub async fn brpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(Value::Null)
 }
 
+/// Returns the element at index index in the list stored at key. The index is zero-based, so 0
+/// means the first element, 1 the second element and so on. Negative indices can be used to
+/// designate elements starting at the tail of the list. Here, -1 means the last element, -2 means
+/// the penultimate and so forth.
 pub async fn lindex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
@@ -122,6 +135,9 @@ pub async fn lindex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
+/// Inserts element in the list stored at key either before or after the reference value pivot.
+///
+/// When key does not exist, it is considered an empty list and no operation is performed.
 pub async fn linsert(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let is_before = if check_arg!(args, 2, "BEFORE") {
         true
@@ -172,6 +188,8 @@ pub async fn linsert(conn: &Connection, args: &[Bytes]) -> Result<Value, Error>
     Ok(result)
 }
 
+/// Returns the length of the list stored at key. If key does not exist, it is interpreted as an
+/// empty list and 0 is returned. An error is returned when the value stored at key is not a list.
 pub async fn llen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
@@ -183,6 +201,9 @@ pub async fn llen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
+/// Atomically returns and removes the first/last element (head/tail depending on the wherefrom
+/// argument) of the list stored at source, and pushes the element at the first/last element
+/// (head/tail depending on the whereto argument) of the list stored at destination.
 pub async fn lmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let source_is_left = if check_arg!(args, 3, "LEFT") {
         true
@@ -255,6 +276,11 @@ pub async fn lmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(result)
 }
 
+/// Removes and returns the first elements of the list stored at key.
+///
+/// By default, the command pops a single element from the beginning of the list. When provided
+/// with the optional count argument, the reply will consist of up to count elements, depending on
+/// the list's length.
 pub async fn lpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let count = if args.len() > 2 {
         bytes_to_number(&args[2])?
@@ -265,6 +291,10 @@ pub async fn lpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     remove_element(conn, &args[1], count, true)
 }
 
+/// The command returns the index of matching elements inside a Redis list. By default, when no
+/// options are given, it will scan the list from head to tail, looking for the first match of
+/// "element". If the element is found, its index (the zero-based position in the list) is
+/// returned. Otherwise, if no match is found, nil is returned.
 pub async fn lpos(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let mut index = 3;
     let element = checksum::Ref::new(&args[2]);
@@ -339,6 +369,15 @@ pub async fn lpos(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
+/// Insert all the specified values at the head of the list stored at key. If key does not exist,
+/// it is created as empty list before performing the push operations. When key holds a value that
+/// is not a list, an error is returned.
+///
+/// It is possible to push multiple elements using a single command call just specifying multiple
+/// arguments at the end of the command. Elements are inserted one after the other to the head of
+/// the list, from the leftmost element to the rightmost element. So for instance the command LPUSH
+/// mylist a b c will result into a list containing c as first element, b as second element and a
+/// as third element.
 pub async fn lpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let is_push_x = check_arg!(args, 0, "LPUSHX");
 
@@ -375,6 +414,12 @@ pub async fn lpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(result)
 }
 
+/// Returns the specified elements of the list stored at key. The offsets start and stop are
+/// zero-based indexes, with 0 being the first element of the list (the head of the list), 1 being
+/// the next element and so on.
+///
+/// These offsets can also be negative numbers indicating offsets starting at the end of the list.
+/// For example, -1 is the last element of the list, -2 the penultimate, and so on.
 pub async fn lrange(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
@@ -406,6 +451,7 @@ pub async fn lrange(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
+/// Removes the first count occurrences of elements equal to element from the list stored at key
 pub async fn lrem(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let result = conn.db().get_map_or(
         &args[1],
@@ -457,6 +503,10 @@ pub async fn lrem(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(result)
 }
 
+/// Sets the list element at index to element. For more information on the index argument, see
+/// LINDEX.
+///
+/// An error is returned for out of range indexes.
 pub async fn lset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let result = conn.db().get_map_or(
         &args[1],
@@ -486,6 +536,9 @@ pub async fn lset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(result)
 }
 
+/// Trim an existing list so that it will contain only the specified range of elements specified.
+/// Both start and stop are zero-based indexes, where 0 is the first element of the list (the
+/// head), 1 the next element and so on.
 pub async fn ltrim(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let result = conn.db().get_map_or(
         &args[1],
@@ -522,6 +575,11 @@ pub async fn ltrim(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(result)
 }
 
+/// Removes and returns the last elements of the list stored at key.
+///
+/// By default, the command pops a single element from the end of the list. When provided with the
+/// optional count argument, the reply will consist of up to count elements, depending on the
+/// list's length.
 pub async fn rpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let count = if args.len() > 2 {
         bytes_to_number(&args[2])?
@@ -532,6 +590,8 @@ pub async fn rpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     remove_element(conn, &args[1], count, false)
 }
 
+/// Atomically returns and removes the last element (tail) of the list stored at source, and pushes
+/// the element at the first element (head) of the list stored at destination.
 pub async fn rpoplpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     lmove(
         conn,
@@ -546,6 +606,9 @@ pub async fn rpoplpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error
     .await
 }
 
+/// Insert all the specified values at the tail of the list stored at key. If key does not exist,
+/// it is created as empty list before performing the push operation. When key holds a value that
+/// is not a list, an error is returned.
 pub async fn rpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let is_push_x = check_arg!(args, 0, "RPUSHX");
 

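BLPOP/BRPOP compute a deadline up front (`Instant::now() + Duration::from_secs(...)`, as shown above) and reply with `Null` once it passes. A standalone sketch of that blocking shape, assuming a simple polling strategy (the actual wake-up mechanism is not shown in this commit):

```rust
use tokio::time::{sleep, Duration, Instant};

// Poll `try_pop` until it yields an element or the deadline passes.
async fn blocking_pop<T>(
    mut try_pop: impl FnMut() -> Option<T>,
    timeout_secs: u64,
) -> Option<T> {
    let deadline = Instant::now() + Duration::from_secs(timeout_secs);
    while Instant::now() < deadline {
        if let Some(v) = try_pop() {
            return Some(v); // an element arrived before the timeout
        }
        sleep(Duration::from_millis(100)).await; // back off and retry
    }
    None // timed out: the command replies with Null
}
```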
+ 4 - 0
src/cmd/metrics.rs

@@ -1,6 +1,10 @@
+//! # Metrics command handlers
 use crate::{connection::Connection, error::Error, value::Value};
 use bytes::Bytes;
 
+/// Dumps metrics from commands. If no argument is passed, all commands' metrics are dumped.
+///
+/// The metrics are serialized as JSON.
 pub async fn metrics(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let dispatcher = conn.all_connections().get_dispatcher();
     let mut result: Vec<Value> = vec![];

+ 2 - 1
src/cmd/mod.rs

@@ -1,10 +1,11 @@
+//! # All commands handlers
 pub mod client;
 pub mod hash;
 pub mod key;
 pub mod list;
+pub mod metrics;
 pub mod pubsub;
 pub mod set;
-pub mod metrics;
 pub mod string;
 pub mod transaction;
 

+ 6 - 0
src/cmd/pubsub.rs

@@ -1,11 +1,14 @@
+//! # Pubsub command handlers
 use crate::{check_arg, connection::Connection, error::Error, value::Value};
 use bytes::Bytes;
 use glob::Pattern;
 
+/// Posts a message to the given channel.
 pub async fn publish(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.pubsub().publish(&args[1], &args[2]).await.into())
 }
 
+/// Handler for all "pubsub" subcommands
 pub async fn pubsub(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     match String::from_utf8_lossy(&args[1]).to_lowercase().as_str() {
         "channels" => Ok(Value::Array(
@@ -40,6 +43,7 @@ pub async fn pubsub(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     }
 }
 
+/// Subscribes the client to the specified channels.
 pub async fn subscribe(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let pubsub = conn.pubsub();
 
@@ -54,6 +58,7 @@ pub async fn subscribe(conn: &Connection, args: &[Bytes]) -> Result<Value, Error
     conn.start_pubsub()
 }
 
+/// Unsubscribes the client from the given patterns, or from all of them if none is given.
 pub async fn punsubscribe(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let channels = if args.len() == 1 {
         conn.pubsub_client().psubscriptions()
@@ -70,6 +75,7 @@ pub async fn punsubscribe(conn: &Connection, args: &[Bytes]) -> Result<Value, Er
     Ok(conn.pubsub_client().punsubscribe(&channels, conn).into())
 }
 
+/// Unsubscribes the client from the given channels, or from all of them if none is given.
 pub async fn unsubscribe(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let channels = if args.len() == 1 {
         conn.pubsub_client().subscriptions()

+ 70 - 0
src/cmd/set.rs

@@ -1,3 +1,4 @@
+//! # Set command handlers
 use crate::{connection::Connection, error::Error, value::bytes_to_number, value::Value};
 use bytes::Bytes;
 use rand::Rng;
@@ -61,6 +62,9 @@ where
     )
 }
 
+/// Add the specified members to the set stored at key. Specified members that are already a member
+/// of this set are ignored. If key does not exist, a new set is created before adding the
+/// specified members.
 pub async fn sadd(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let result = conn.db().get_map_or(
         &args[1],
@@ -102,6 +106,7 @@ pub async fn sadd(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(result)
 }
 
+/// Returns the set cardinality (number of elements) of the set stored at key.
 pub async fn scard(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
@@ -113,6 +118,10 @@ pub async fn scard(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     )
 }
 
+/// Returns the members of the set resulting from the difference between the first set and all the
+/// successive sets.
+///
+/// Keys that do not exist are considered to be empty sets.
 pub async fn sdiff(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     compare_sets(conn, &args[1..], |all_entries, elements| {
         for element in elements.iter() {
@@ -125,6 +134,10 @@ pub async fn sdiff(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     .await
 }
 
+/// This command is equal to SDIFF, but instead of returning the resulting set, it is stored in
+/// destination.
+///
+/// If destination already exists, it is overwritten.
 pub async fn sdiffstore(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     if let Value::Array(values) = sdiff(conn, &args[1..]).await? {
         Ok(store(conn, &args[1], &values).into())
@@ -133,6 +146,11 @@ pub async fn sdiffstore(conn: &Connection, args: &[Bytes]) -> Result<Value, Erro
     }
 }
 
+/// Returns the members of the set resulting from the intersection of all the given sets.
+///
+/// Keys that do not exist are considered to be empty sets. With one of the keys being an empty
+/// set, the resulting set is also empty (since set intersection with an empty set always results
+/// in an empty set).
 pub async fn sinter(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     compare_sets(conn, &args[1..], |all_entries, elements| {
         all_entries.retain(|element| elements.contains(element));
@@ -148,6 +166,13 @@ pub async fn sinter(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     .await
 }
 
+/// This command is similar to SINTER, but instead of returning the result set, it returns just the
+/// cardinality of the result. Returns the cardinality of the set which would result from the
+/// intersection of all the given sets.
+///
+/// Keys that do not exist are considered to be empty sets. With one of the keys being an empty
+/// set, the resulting set is also empty (since set intersection with an empty set always results
+/// in an empty set).
 pub async fn sintercard(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     if let Ok(Value::Array(x)) = sinter(conn, args).await {
         Ok(x.len().into())
@@ -156,6 +181,10 @@ pub async fn sintercard(conn: &Connection, args: &[Bytes]) -> Result<Value, Erro
     }
 }
 
+/// This command is equal to SINTER, but instead of returning the resulting set, it is stored in
+/// destination.
+///
+/// If destination already exists, it is overwritten.
 pub async fn sinterstore(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     if let Value::Array(values) = sinter(conn, &args[1..]).await? {
         Ok(store(conn, &args[1], &values).into())
@@ -164,6 +193,7 @@ pub async fn sinterstore(conn: &Connection, args: &[Bytes]) -> Result<Value, Err
     }
 }
 
+/// Returns if member is a member of the set stored at key.
 pub async fn sismember(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
@@ -181,6 +211,9 @@ pub async fn sismember(conn: &Connection, args: &[Bytes]) -> Result<Value, Error
     )
 }
 
+/// Returns all the members of the set value stored at key.
+///
+/// This has the same effect as running SINTER with one argument key.
 pub async fn smembers(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
@@ -197,6 +230,10 @@ pub async fn smembers(conn: &Connection, args: &[Bytes]) -> Result<Value, Error>
     )
 }
 
+/// Returns whether each member is a member of the set stored at key.
+///
+/// For every member, 1 is returned if the value is a member of the set, or 0 if the element is not
+/// a member of the set or if key does not exist.
 pub async fn smismember(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
@@ -215,6 +252,14 @@ pub async fn smismember(conn: &Connection, args: &[Bytes]) -> Result<Value, Erro
     )
 }
 
+/// Move member from the set at source to the set at destination. This operation is atomic. In
+/// every given moment the element will appear to be a member of source or destination for other
+/// clients.
+///
+/// If the source set does not exist or does not contain the specified element, no operation is
+/// performed and 0 is returned. Otherwise, the element is removed from the source set and added to
+/// the destination set. When the specified element already exists in the destination set, it is
+/// only removed from the source set.
 pub async fn smove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let result = conn.db().get_map_or(
         &args[1],
@@ -259,6 +304,14 @@ pub async fn smove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(result)
 }
 
+/// Removes and returns one or more random members from the set value store at key.
+///
+/// This operation is similar to SRANDMEMBER, that returns one or more random elements from a set
+/// but does not remove it.
+///
+/// By default, the command pops a single member from the set. When provided with the optional
+/// count argument, the reply will consist of up to count members, depending on the set's
+/// cardinality.
 pub async fn spop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let rand = srandmember(conn, args).await?;
     let result = conn.db().get_map_or(
@@ -291,6 +344,15 @@ pub async fn spop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(result)
 }
 
+/// When called with just the key argument, return a random element from the set value stored at
+/// key.
+///
+/// If the provided count argument is positive, return an array of distinct elements. The array's
+/// length is either count or the set's cardinality (SCARD), whichever is lower.
+///
+/// If called with a negative count, the behavior changes and the command is allowed to return the
+/// same element multiple times. In this case, the number of returned elements is the absolute
+/// value of the specified count.
 pub async fn srandmember(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().get_map_or(
         &args[1],
@@ -324,6 +386,9 @@ pub async fn srandmember(conn: &Connection, args: &[Bytes]) -> Result<Value, Err
     )
 }
 
+/// Remove the specified members from the set stored at key. Specified members that are not a
+/// member of this set are ignored. If key does not exist, it is treated as an empty set and this
+/// command returns 0.
 pub async fn srem(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let result = conn.db().get_map_or(
         &args[1],
@@ -350,6 +415,7 @@ pub async fn srem(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(result)
 }
 
+/// Returns the members of the set resulting from the union of all the given sets.
 pub async fn sunion(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     compare_sets(conn, &args[1..], |all_entries, elements| {
         for element in elements.iter() {
@@ -361,6 +427,10 @@ pub async fn sunion(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     .await
 }
 
+/// This command is equal to SUNION, but instead of returning the resulting set, it is stored in
+/// destination.
+///
+/// If destination already exists, it is overwritten.
 pub async fn sunionstore(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     if let Value::Array(values) = sunion(conn, &args[1..]).await? {
         Ok(store(conn, &args[1], &values).into())

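SDIFF, SINTER and SUNION all funnel through `compare_sets` with a closure that folds each successive set into the accumulated entries, as the snippets above show. The three folds, as a std-only sketch over `HashSet<String>`:

```rust
use std::collections::HashSet;

type Set = HashSet<String>;

// SDIFF: start from the first set, drop anything found in a later set.
fn sdiff(first: &Set, rest: &[Set]) -> Set {
    let mut all = first.clone();
    for s in rest {
        all.retain(|e| !s.contains(e));
    }
    all
}

// SINTER: keep only members present in every set.
fn sinter(first: &Set, rest: &[Set]) -> Set {
    let mut all = first.clone();
    for s in rest {
        all.retain(|e| s.contains(e));
    }
    all
}

// SUNION: accumulate every member of every set.
fn sunion(first: &Set, rest: &[Set]) -> Set {
    let mut all = first.clone();
    for s in rest {
        all.extend(s.iter().cloned());
    }
    all
}
```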
+ 42 - 2
src/cmd/string.rs

@@ -1,3 +1,4 @@
+//! # String command handlers
 use crate::{
     check_arg, connection::Connection, error::Error, value::bytes_to_number, value::Value,
 };
@@ -5,51 +6,88 @@ use bytes::Bytes;
 use std::{convert::TryInto, ops::Neg};
 use tokio::time::Duration;
 
+/// Increments the number stored at key by one. If the key does not exist, it is set to 0 before
+/// performing the operation. An error is returned if the key contains a value of the wrong type or
+/// contains a string that can not be represented as integer. This operation is limited to 64 bit
+/// signed integers.
 pub async fn incr(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    conn.db().incr(&args[1], 1)
+    conn.db().incr(&args[1], 1_i64)
 }
 
+/// Increments the number stored at key by increment. If the key does not exist, it is set to 0
+/// before performing the operation. An error is returned if the key contains a value of the wrong
+/// type or contains a string that can not be represented as integer. This operation is limited to
+/// 64 bit signed integers.
 pub async fn incr_by(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let by: i64 = bytes_to_number(&args[2])?;
     conn.db().incr(&args[1], by)
 }
 
+/// Increment the string representing a floating point number stored at key by the specified
+/// increment. By using a negative increment value, the result is that the value stored at the key
+/// is decremented (by the obvious properties of addition). If the key does not exist, it is set to
+/// 0 before performing the operation.
 pub async fn incr_by_float(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let by: f64 = bytes_to_number(&args[2])?;
     conn.db().incr(&args[1], by)
 }
 
+/// Decrements the number stored at key by one. If the key does not exist, it is set to 0 before
+/// performing the operation. An error is returned if the key contains a value of the wrong type or
+/// contains a string that can not be represented as integer. This operation is limited to 64 bit
+/// signed integers.
 pub async fn decr(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    conn.db().incr(&args[1], -1)
+    conn.db().incr(&args[1], -1_i64)
 }
 
+/// Decrements the number stored at key by decrement. If the key does not exist, it is set to 0
+/// before performing the operation. An error is returned if the key contains a value of the wrong
+/// type or contains a string that can not be represented as integer. This operation is limited to
+/// 64 bit signed integers.
 pub async fn decr_by(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let by: i64 = (&Value::Blob(args[2].to_owned())).try_into()?;
     conn.db().incr(&args[1], by.neg())
 }
 
+/// Get the value of key. If the key does not exist the special value nil is returned. An error is
+/// returned if the value stored at key is not a string, because GET only handles string values.
 pub async fn get(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().get(&args[1]))
 }
 
+/// Get the value of key and delete the key. This command is similar to GET, except for the fact
+/// that it also deletes the key on success (if and only if the key's value type is a string).
 pub async fn getdel(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().getdel(&args[1]))
 }
 
+/// Atomically sets key to value and returns the old value stored at key. Returns an error when key
+/// exists but does not hold a string value. Any previous time to live associated with the key is
+/// discarded on successful SET operation.
 pub async fn getset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().getset(&args[1], &Value::Blob(args[2].to_owned())))
 }
 
+/// Returns the values of all specified keys. For every key that does not hold a string value or
+/// does not exist, the special value nil is returned. Because of this, the operation never fails.
 pub async fn mget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().get_multi(&args[1..]))
 }
 
+/// Set key to hold the string value. If key already holds a value, it is overwritten, regardless
+/// of its type. Any previous time to live associated with the key is discarded on successful SET
+/// operation.
 pub async fn set(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn
         .db()
         .set(&args[1], Value::Blob(args[2].to_owned()), None))
 }
 
+/// Set key to hold the string value and set key to timeout after a given number of seconds. This
+/// command is equivalent to executing the following commands:
+///
+/// ```text
+/// SET mykey value
+/// EXPIRE mykey seconds
+/// ```
 pub async fn setex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let ttl = if check_arg!(args, 0, "SETEX") {
         Duration::from_secs(bytes_to_number(&args[2])?)
@@ -62,6 +100,8 @@ pub async fn setex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
         .set(&args[1], Value::Blob(args[2].to_owned()), Some(ttl)))
 }
 
+/// Returns the length of the string value stored at key. An error is returned when key holds a
+/// non-string value.
 pub async fn strlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     match conn.db().get(&args[1]) {
         Value::Blob(x) => Ok(x.len().into()),

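The INCR/DECR family shares one rule: a missing key counts as 0, the delta is applied as a 64-bit signed integer, and a non-numeric value is an error. A std-only sketch of that rule:

```rust
// Illustrative only; the crate implements this inside Db::incr.
fn apply_incr(current: Option<&str>, by: i64) -> Result<i64, &'static str> {
    let n: i64 = match current {
        None => 0, // absent key behaves as 0
        Some(s) => s.parse().map_err(|_| "value is not an integer")?,
    };
    n.checked_add(by)
        .ok_or("increment or decrement would overflow")
}
```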
+ 16 - 0
src/cmd/transaction.rs

@@ -1,3 +1,4 @@
+//! # Transaction command handlers
 use crate::{
     connection::{Connection, ConnectionStatus},
     error::Error,
@@ -5,14 +6,25 @@ use crate::{
 };
 use bytes::Bytes;
 
+/// Flushes all previously queued commands in a transaction and restores the connection state to
+/// normal.
+///
+/// If WATCH was used, DISCARD unwatches all keys watched by the connection
 pub async fn discard(conn: &Connection, _: &[Bytes]) -> Result<Value, Error> {
     conn.stop_transaction()
 }
 
+/// Marks the start of a transaction block. Subsequent commands will be queued for atomic execution
+/// using EXEC.
 pub async fn multi(conn: &Connection, _: &[Bytes]) -> Result<Value, Error> {
     conn.start_transaction()
 }
 
+/// Executes all previously queued commands in a transaction and restores the connection state to
+/// normal.
+///
+/// When using WATCH, EXEC will execute commands only if the watched keys were not modified,
+/// allowing for a check-and-set mechanism.
 pub async fn exec(conn: &Connection, _: &[Bytes]) -> Result<Value, Error> {
     if conn.status() != ConnectionStatus::Multi {
         return Err(Error::NotInTx);
@@ -49,6 +61,7 @@ pub async fn exec(conn: &Connection, _: &[Bytes]) -> Result<Value, Error> {
     Ok(results.into())
 }
 
+/// Marks the given keys to be watched for conditional execution of a transaction.
 pub async fn watch(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.watch_key(
         &(&args[1..])
@@ -59,6 +72,9 @@ pub async fn watch(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(Value::Ok)
 }
 
+/// Flushes all the previously watched keys for a transaction.
+///
+/// If you call EXEC or DISCARD, there's no need to manually call UNWATCH.
 pub async fn unwatch(conn: &Connection, _: &[Bytes]) -> Result<Value, Error> {
     conn.discard_watched_keys();
     Ok(Value::Ok)

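The WATCH/EXEC check-and-set described above boils down to comparing versions: a version is recorded per watched key, and EXEC runs the queued commands only if no watched key's version changed in the meantime (cf. `did_keys_change` further down in this commit). A standalone sketch with illustrative types:

```rust
// Illustrative types, not the crate's API.
struct Watched {
    key: String,
    version_at_watch: u128, // recorded when WATCH ran
}

// EXEC proceeds only when every watched key is still at its recorded version.
fn may_exec(watched: &[Watched], current_version: impl Fn(&str) -> u128) -> bool {
    watched
        .iter()
        .all(|w| current_version(&w.key) == w.version_at_watch)
}
```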
+ 12 - 0
src/connection/connections.rs

@@ -1,3 +1,7 @@
+//! # Connections object
+//!
+//! This module keeps track of all active connections. There is one instance of this module per running
+//! server.
 use super::{pubsub_connection::PubsubClient, pubsub_server::Pubsub, Connection, ConnectionInfo};
 use crate::{db::Db, dispatcher::Dispatcher, value::Value};
 use parking_lot::RwLock;
@@ -5,6 +9,7 @@ use std::{collections::BTreeMap, net::SocketAddr, sync::Arc};
 
 use tokio::sync::mpsc;
 
+/// The collection of all active connections
 #[derive(Debug)]
 pub struct Connections {
     connections: RwLock<BTreeMap<u128, Arc<Connection>>>,
@@ -15,6 +20,7 @@ pub struct Connections {
 }
 
 impl Connections {
+    /// Returns a new instance of connections.
     pub fn new(db: Arc<Db>) -> Self {
         Self {
             counter: RwLock::new(0),
@@ -25,24 +31,29 @@ impl Connections {
         }
     }
 
+    /// Returns the database
     #[allow(dead_code)]
     pub fn db(&self) -> Arc<Db> {
         self.db.clone()
     }
 
+    /// Returns the dispatcher instance
     pub fn get_dispatcher(&self) -> Arc<Dispatcher> {
         self.dispatcher.clone()
     }
 
+    /// Returns the pubsub server instance
     pub fn pubsub(&self) -> Arc<Pubsub> {
         self.pubsub.clone()
     }
 
+    /// Removes a connection from the set of active connections
     pub fn remove(self: Arc<Connections>, conn: Arc<Connection>) {
         let id = conn.id();
         self.connections.write().remove(&id);
     }
 
+    /// Creates a new connection
     pub fn new_connection(
         self: &Arc<Connections>,
         db: Arc<Db>,
@@ -67,6 +78,7 @@ impl Connections {
         (pubsub_receiver, conn)
     }
 
+    /// Iterates over all connections
     pub fn iter(&self, f: &mut dyn FnMut(Arc<Connection>)) {
         for (_, value) in self.connections.read().iter() {
             f(value.clone())

+ 65 - 27
src/connection/mod.rs

@@ -1,3 +1,4 @@
+//! # Connection module
 use crate::{db::Db, error::Error, value::Value};
 use bytes::Bytes;
 use parking_lot::RwLock;
@@ -9,11 +10,16 @@ pub mod connections;
 pub mod pubsub_connection;
 pub mod pubsub_server;
 
+/// Possible status of connections
 #[derive(Debug, Clone, Copy, Eq, PartialEq)]
 pub enum ConnectionStatus {
+    /// The connection is in a MULTI stage and commands are being queued
     Multi,
+    /// The connection is executing a transaction
     ExecutingTx,
+    /// The connection is in pub-sub only mode
     Pubsub,
+    /// The connection is a normal connection
     Normal,
 }
 
@@ -23,15 +29,17 @@ impl Default for ConnectionStatus {
     }
 }
 
+/// Connection information
 #[derive(Debug)]
 pub struct ConnectionInfo {
-    pub name: Option<String>,
-    pub watch_keys: Vec<(Bytes, u128)>,
-    pub tx_keys: HashSet<Bytes>,
-    pub status: ConnectionStatus,
-    pub commands: Option<Vec<Vec<Bytes>>>,
+    name: Option<String>,
+    watch_keys: Vec<(Bytes, u128)>,
+    tx_keys: HashSet<Bytes>,
+    status: ConnectionStatus,
+    commands: Option<Vec<Vec<Bytes>>>,
 }
 
+/// Connection
 #[derive(Debug)]
 pub struct Connection {
     id: u128,
@@ -44,6 +52,7 @@ pub struct Connection {
 }
 
 impl ConnectionInfo {
+    /// Creates a new connection info object
     fn new() -> Self {
         Self {
             name: None,
@@ -56,24 +65,46 @@ impl ConnectionInfo {
 }
 
 impl Connection {
+    /// Returns the connection's database.
+    ///
+    /// The database object is unique to this connection but most of its internal structure is
+    /// shared (like the entries).
     pub fn db(&self) -> &Db {
         &self.db
     }
 
+    /// Returns the global pubsub server
     pub fn pubsub(&self) -> Arc<Pubsub> {
         self.all_connections.pubsub()
     }
 
+    /// Returns a reference to the pubsub client
     pub fn pubsub_client(&self) -> &pubsub_connection::PubsubClient {
         &self.pubsub_client
     }
 
+    /// Switch the connection to a pub-sub only mode
+    pub fn start_pubsub(&self) -> Result<Value, Error> {
+        let mut info = self.info.write();
+        match info.status {
+            ConnectionStatus::Normal | ConnectionStatus::Pubsub => {
+                info.status = ConnectionStatus::Pubsub;
+                Ok(Value::Ok)
+            }
+            _ => Err(Error::NestedTx),
+        }
+    }
+
+    /// Connection ID
     pub fn id(&self) -> u128 {
         self.id
     }
 
+    /// Drops a multi/transaction and resets the connection
+    ///
+    /// If the connection was not in a MULTI stage an error is thrown.
     pub fn stop_transaction(&self) -> Result<Value, Error> {
-        let info = &mut self.info.write();
+        let mut info = self.info.write();
         if info.status == ConnectionStatus::Multi {
             info.commands = None;
             info.watch_keys.clear();
@@ -86,6 +117,9 @@ impl Connection {
         }
     }
 
+    /// Starts a transaction/multi
+    ///
+    /// Nested transactions are not possible.
     pub fn start_transaction(&self) -> Result<Value, Error> {
         let mut info = self.info.write();
         if info.status == ConnectionStatus::Normal {
@@ -96,17 +130,7 @@ impl Connection {
         }
     }
 
-    pub fn start_pubsub(&self) -> Result<Value, Error> {
-        let mut info = self.info.write();
-        match info.status {
-            ConnectionStatus::Normal | ConnectionStatus::Pubsub => {
-                info.status = ConnectionStatus::Pubsub;
-                Ok(Value::Ok)
-            }
-            _ => Err(Error::NestedTx),
-        }
-    }
-
+    /// Resets the current connection.
     pub fn reset(&self) {
         let mut info = self.info.write();
         info.status = ConnectionStatus::Normal;
@@ -120,10 +144,13 @@ impl Connection {
         pubsub.punsubscribe(&self.pubsub_client.psubscriptions(), self);
     }
 
+    /// Returns the status of the connection
     pub fn status(&self) -> ConnectionStatus {
         self.info.read().status
     }
 
+    /// Watches keys. In a transaction, watched keys are a mechanism to discard the transaction
+    /// if any watched value changed between the moment it was watched and execution time.
     pub fn watch_key(&self, keys: &[(&Bytes, u128)]) {
         let watch_keys = &mut self.info.write().watch_keys;
         keys.iter()
@@ -133,6 +160,7 @@ impl Connection {
             .for_each(drop);
     }
 
+    /// Returns true if any of the watched keys changed their value
     pub fn did_keys_change(&self) -> bool {
         let watch_keys = &self.info.read().watch_keys;
 
@@ -145,11 +173,17 @@ impl Connection {
         false
     }
 
+    /// Resets the watched keys list
     pub fn discard_watched_keys(&self) {
-        let watch_keys = &mut self.info.write().watch_keys;
-        watch_keys.clear();
+        self.info.write().watch_keys.clear()
     }
 
+    /// Returns the list of keys that are involved in a transaction. These keys will be locked
+    /// exclusively, even if they don't exist, during the execution of a transaction.
+    ///
+    /// The original Redis implementation does not need this promise because only one
+    /// transaction is executed at a time; in microredis, transactions reserve their keys and do
+    /// not prevent other connections from continuing to modify the database.
     pub fn get_tx_keys(&self) -> Vec<Bytes> {
         self.info
             .read()
@@ -159,19 +193,22 @@ impl Connection {
             .collect::<Vec<Bytes>>()
     }
 
+    /// Queues a command for later execution
     pub fn queue_command(&self, args: &[Bytes]) {
-        let info = &mut self.info.write();
+        let mut info = self.info.write();
         let commands = info.commands.get_or_insert(vec![]);
         commands.push(args.iter().map(|m| (*m).clone()).collect());
     }
 
+    /// Returns a list of queued commands.
     pub fn get_queue_commands(&self) -> Option<Vec<Vec<Bytes>>> {
-        let info = &mut self.info.write();
+        let mut info = self.info.write();
         info.watch_keys = vec![];
         info.status = ConnectionStatus::ExecutingTx;
         info.commands.take()
     }
 
+    /// Records the list of transaction keys
     pub fn tx_keys(&self, keys: Vec<&Bytes>) {
         #[allow(clippy::mutable_key_type)]
         let tx_keys = &mut self.info.write().tx_keys;
@@ -182,6 +219,8 @@ impl Connection {
             .for_each(drop);
     }
 
+    /// Disconnects from the server, unsubscribes from all pubsub channels, and removes itself
+    /// from the all_connections list.
     pub fn destroy(self: Arc<Connection>) {
         let pubsub = self.pubsub();
         pubsub.unsubscribe(&self.pubsub_client.subscriptions(), &self);
@@ -189,25 +228,24 @@ impl Connection {
         self.all_connections.clone().remove(self);
     }
 
+    /// Returns the all_connections (Connections) instance
     pub fn all_connections(&self) -> Arc<connections::Connections> {
         self.all_connections.clone()
     }
 
+    /// Returns the connection name
     pub fn name(&self) -> Option<String> {
         self.info.read().name.clone()
     }
 
+    /// Sets a connection name
     pub fn set_name(&self, name: String) {
         let mut r = self.info.write();
         r.name = Some(name);
     }
 
-    #[allow(dead_code)]
-    pub fn current_db(&self) -> u32 {
-        self.current_db
-    }
-
-    pub fn info(&self) -> String {
+    /// Returns a string representation of this connection
+    pub fn as_string(&self) -> String {
         format!(
             "id={} addr={} name={:?} db={}\r\n",
             self.id,

+ 16 - 0
src/connection/pubsub_connection.rs

@@ -1,3 +1,6 @@
+//! # Pubsub client
+//!
+//! Each connection has a pubsub client, which is created even in normal connection mode.
 use super::Connection;
 use crate::value::Value;
 use bytes::Bytes;
@@ -6,12 +9,14 @@ use parking_lot::RwLock;
 use std::collections::HashMap;
 use tokio::sync::mpsc;
 
+/// Pubsub client
 #[derive(Debug)]
 pub struct PubsubClient {
     meta: RwLock<MetaData>,
     sender: mpsc::UnboundedSender<Value>,
 }
 
+/// Metadata associated with a pubsub client
 #[derive(Debug)]
 struct MetaData {
     subscriptions: HashMap<Bytes, bool>,
@@ -21,6 +26,7 @@ struct MetaData {
 }
 
 impl PubsubClient {
+    /// Creates a new pubsub client instance
     pub fn new(sender: mpsc::UnboundedSender<Value>) -> Self {
         Self {
             meta: RwLock::new(MetaData {
@@ -33,6 +39,7 @@ impl PubsubClient {
         }
     }
 
+    /// Unsubscribe from pattern subscriptions
     pub fn punsubscribe(&self, channels: &[Pattern], conn: &Connection) -> u32 {
         let mut meta = self.meta.write();
         channels
@@ -46,6 +53,7 @@ impl PubsubClient {
         conn.pubsub().punsubscribe(channels, conn)
     }
 
+    /// Unsubscribe from channels
     pub fn unsubscribe(&self, channels: &[Bytes], conn: &Connection) -> u32 {
         let mut meta = self.meta.write();
         channels
@@ -59,6 +67,7 @@ impl PubsubClient {
         conn.pubsub().unsubscribe(channels, conn)
     }
 
+    /// Returns the list of subscriptions for this connection
     pub fn subscriptions(&self) -> Vec<Bytes> {
         self.meta
             .read()
@@ -68,6 +77,7 @@ impl PubsubClient {
             .collect::<Vec<Bytes>>()
     }
 
+    /// Returns the list of pattern subscriptions
     pub fn psubscriptions(&self) -> Vec<Pattern> {
         self.meta
             .read()
@@ -77,6 +87,7 @@ impl PubsubClient {
             .collect::<Vec<Pattern>>()
     }
 
+    /// Creates a new subscription and returns the ID for this new subscription.
     pub fn new_subscription(&self, channel: &Bytes) -> usize {
         let mut meta = self.meta.write();
         meta.subscriptions.insert(channel.clone(), true);
@@ -84,6 +95,7 @@ impl PubsubClient {
         meta.id
     }
 
+    /// Creates a new pattern subscription and returns the ID for this new subscription.
     pub fn new_psubscription(&self, channel: &Pattern) -> usize {
         let mut meta = self.meta.write();
         meta.psubscriptions.insert(channel.clone(), true);
@@ -91,14 +103,18 @@ impl PubsubClient {
         meta.id
     }
 
+    /// Does this connection have a pattern subscription?
     pub fn is_psubcribed(&self) -> bool {
         self.meta.read().is_psubcribed
     }
 
+    /// Records that this connection is using pattern subscriptions
     pub fn make_psubcribed(&self) {
         self.meta.write().is_psubcribed = true;
     }
 
+    /// Returns a copy of the pubsub sender. This sender object can be used to send messages (from
+    /// other connections) to this connection.
     pub fn sender(&self) -> mpsc::UnboundedSender<Value> {
         self.sender.clone()
     }

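The sender hand-off described above is a plain tokio channel: each connection holds the receiving half, and publishers clone the sending half to push values into it. In miniature:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // String stands in for the crate's Value type here.
    let (tx, mut rx) = mpsc::unbounded_channel::<String>();

    // Another connection gets a clone of `tx` (cf. PubsubClient::sender()).
    let publisher = tx.clone();
    publisher.send("hello".to_string()).unwrap();

    assert_eq!(rx.recv().await, Some("hello".to_string()));
}
```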
+ 14 - 0
src/connection/pubsub_server.rs

@@ -1,3 +1,6 @@
+//! # Pubsub server
+//!
+//! There is one instance of this mod active per server instance.
 use crate::{connection::Connection, error::Error, value::Value};
 use bytes::Bytes;
 use glob::Pattern;
@@ -8,6 +11,7 @@ use tokio::sync::mpsc;
 type Sender = mpsc::UnboundedSender<Value>;
 type Subscription = HashMap<u128, Sender>;
 
+/// Pubsub global server structure
 #[derive(Debug)]
 pub struct Pubsub {
     subscriptions: RwLock<HashMap<Bytes, Subscription>>,
@@ -16,6 +20,7 @@ pub struct Pubsub {
 }
 
 impl Pubsub {
+    /// Creates a new Pubsub instance
     pub fn new() -> Self {
         Self {
             subscriptions: RwLock::new(HashMap::new()),
@@ -24,14 +29,17 @@ impl Pubsub {
         }
     }
 
+    /// Returns a list of all channels with subscriptions
     pub fn channels(&self) -> Vec<Bytes> {
         self.subscriptions.read().keys().cloned().collect()
     }
 
+    /// Returns the number of pattern subscriptions
     pub fn get_number_of_psubscribers(&self) -> i64 {
         *(self.number_of_psubscriptions.read())
     }
 
+    /// Returns the number of subscribers for the given channels
     pub fn get_number_of_subscribers(&self, channels: &[Bytes]) -> Vec<(Bytes, usize)> {
         let subscribers = self.subscriptions.read();
         let mut ret = vec![];
@@ -46,6 +54,7 @@ impl Pubsub {
         ret
     }
 
+    /// Subscribe to patterns
     pub fn psubscribe(&self, channels: &[Bytes], conn: &Connection) -> Result<(), Error> {
         let mut subscriptions = self.psubscriptions.write();
 
@@ -80,6 +89,8 @@ impl Pubsub {
         Ok(())
     }
 
+    /// Publishes a new message. This broadcasts to channel subscribers and to any
+    /// pattern subscription that matches the published channel.
     pub async fn publish(&self, channel: &Bytes, message: &Bytes) -> u32 {
         let mut i = 0;
 
@@ -115,6 +126,7 @@ impl Pubsub {
         i
     }
 
+    /// Unsubscribe from a pattern subscription
     pub fn punsubscribe(&self, channels: &[Pattern], conn: &Connection) -> u32 {
         let mut all_subs = self.psubscriptions.write();
         let conn_id = conn.id();
@@ -141,6 +153,7 @@ impl Pubsub {
         removed
     }
 
+    /// Subscribe connection to channels
     pub fn subscribe(&self, channels: &[Bytes], conn: &Connection) {
         let mut subscriptions = self.subscriptions.write();
 
@@ -167,6 +180,7 @@ impl Pubsub {
             .for_each(drop);
     }
 
+    /// Removes connection subscription to channels.
     pub fn unsubscribe(&self, channels: &[Bytes], conn: &Connection) -> u32 {
         let mut all_subs = self.subscriptions.write();
         let conn_id = conn.id();

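`publish` fans a message out twice: once to exact-channel subscribers and once to every pattern subscription whose glob matches the channel, returning how many receivers were reached. A sketch of that matching logic (illustrative types, using the `glob` crate already imported here):

```rust
use glob::Pattern;

// channel/pattern -> subscribed connection ids
fn fan_out(
    channel: &str,
    subs: &[(String, Vec<u128>)],
    psubs: &[(Pattern, Vec<u128>)],
) -> u32 {
    let mut delivered = 0;
    for (chan, conns) in subs {
        if chan.as_str() == channel {
            delivered += conns.len() as u32; // exact-channel subscribers
        }
    }
    for (pat, conns) in psubs {
        if pat.matches(channel) {
            delivered += conns.len() as u32; // glob pattern subscribers
        }
    }
    delivered // the real server also sends the message through each sender
}
```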
+ 79 - 1
src/db/mod.rs

@@ -1,3 +1,7 @@
+//! # In-Memory database
+//!
+//! This database module is the core of the microredis project. All other modules are built
+//! around this database module.
 mod entry;
 mod expiration;
 
@@ -17,6 +21,22 @@ use std::{
 };
 use tokio::time::{Duration, Instant};
 
+/// Database structure
+///
+/// Each connection has its own clone of the database and the conn_id is stored in each instance.
+/// The entries property is shared for all connections.
+///
+/// To avoid lock contention this database is *not* a single HashMap, instead it is a vector of
+/// HashMaps. Each key is hashed to select a bucket. By doing this pre-step instead of
+/// locking the entire database, only a small portion is locked (shared or exclusively) at a time,
+/// making this database implementation thread-friendly. The number of slots available cannot be
+/// changed at runtime.
+///
+/// The database is also aware of other connections locking other keys exclusively (for
+/// transactions).
+///
+/// Each entry is wrapped with an entry::Entry struct, which is aware of expirations and data
+/// versioning (in practice the nanosecond of last modification).
 #[derive(Debug)]
 pub struct Db {
     /// A vector of hashmaps.
@@ -49,6 +69,7 @@ pub struct Db {
 }
 
 impl Db {
+    /// Creates a new database instance
     pub fn new(slots: usize) -> Self {
         let mut entries = vec![];
 
@@ -65,6 +86,11 @@ impl Db {
         }
     }
 
+    /// Creates a new database instance bound to a connection.
+    ///
+    /// This is particularly useful when locking keys exclusively.
+    ///
+    /// All the internal data is shared through an Arc.
     pub fn new_db_instance(self: Arc<Db>, conn_id: u128) -> Db {
         Self {
             entries: self.entries.clone(),
@@ -76,6 +102,12 @@ impl Db {
     }
 
     #[inline]
+    /// Returns the slot where a key may be hosted.
+    ///
+    /// In order to avoid locking too much, a database instance is a set of
+    /// hashes instead of a single hash. Each key is pre-sharded with a quick
+    /// hashing algorithm to select the 'slot' or HashMap where it may be
+    /// hosted.
     fn get_slot(&self, key: &Bytes) -> usize {
         let id = (hash(key) as usize) % self.entries.len();
         trace!("selected slot {} for key {:?}", id, key);
@@ -97,6 +129,16 @@ impl Db {
         id
     }
 
+    /// Locks keys exclusively
+    ///
+    /// The locked keys are only accessible (for reading or writing) by the
+    /// connection that locked them; any other connection must wait until the
+    /// locking connection releases them.
+    ///
+    /// This is used to simulate Redis transactions. Transactions in Redis are
+    /// atomic, but pausing a multi-threaded Redis just to keep that promise
+    /// would be a bit extreme; that is why a transaction instead locks all the
+    /// keys involved exclusively.
     pub fn lock_keys(&self, keys: &[Bytes]) {
         let waiting = Duration::from_nanos(100);
         loop {
@@ -119,7 +161,7 @@ impl Db {
 
             if i == keys.len() {
                 // All the involved keys are successfully being blocked
-                // exclusely.
+                // exclusively.
                 break;
             }
 
@@ -129,6 +171,7 @@ impl Db {
         }
     }
 
+    /// Releases the lock on keys
     pub fn unlock_keys(&self, keys: &[Bytes]) {
         let mut lock = self.tx_key_locks.write();
         for key in keys.iter() {
@@ -136,6 +179,10 @@ impl Db {
         }
     }
 
+    /// Increments a key's value by a given number
+    ///
+    /// If the stored value cannot be converted into a number an error will be
+    /// thrown.
     pub fn incr<
         T: ToString + AddAssign + for<'a> TryFrom<&'a Value, Error = Error> + Into<Value> + Copy,
     >(
@@ -165,6 +212,7 @@ impl Db {
         }
     }
 
+    /// Removes any expiration associated with a given key
     pub fn persist(&self, key: &Bytes) -> Value {
         let mut entries = self.entries[self.get_slot(key)].write();
         entries
@@ -180,6 +228,7 @@ impl Db {
             })
     }
 
+    /// Set time to live for a given key
     pub fn set_ttl(&self, key: &Bytes, expires_in: Duration) -> Value {
         let mut entries = self.entries[self.get_slot(key)].write();
         let expires_at = Instant::now() + expires_in;
@@ -194,6 +243,7 @@ impl Db {
             })
     }
 
+    /// Removes keys from the database
     pub fn del(&self, keys: &[Bytes]) -> Value {
         let mut expirations = self.expirations.lock();
 
@@ -207,6 +257,7 @@ impl Db {
             .into()
     }
 
+    /// Checks how many of the given keys exist in the database
     pub fn exists(&self, keys: &[Bytes]) -> Value {
         let mut matches = 0;
         keys.iter()
@@ -221,6 +272,18 @@ impl Db {
         matches.into()
     }
 
+    /// get_map_or
+    ///
+    /// Instead of returning a database entry, this function executes a callback with the entry
+    /// as a parameter, which avoids cloning. If no record is found, another callback is executed
+    /// instead, and the lock is dropped before doing so.
+    ///
+    /// If an entry is found, the lock is not dropped before invoking the callback, so avoid
+    /// inserting new entries from it. The value is passed by reference, so it is possible to
+    /// modify the entry itself.
+    ///
+    /// This function is useful for reading non-scalar values from the database. Non-scalar
+    /// values cannot be cloned; attempting to do so ends up in an error (Error::WrongType).
     pub fn get_map_or<F1, F2>(&self, key: &Bytes, found: F1, not_found: F2) -> Result<Value, Error>
     where
         F1: FnOnce(&Value) -> Result<Value, Error>,
@@ -239,6 +302,7 @@ impl Db {
         }
     }
 
+    /// Updates the entry version of a given key
     pub fn bump_version(&self, key: &Bytes) -> bool {
         let mut entries = self.entries[self.get_slot(key)].write();
         entries
@@ -250,6 +314,7 @@ impl Db {
             .is_some()
     }
 
+    /// Returns the version of a given key
     pub fn get_version(&self, key: &Bytes) -> u128 {
         let entries = self.entries[self.get_slot(key)].read();
         entries
@@ -259,6 +324,7 @@ impl Db {
             .unwrap_or_else(new_version)
     }
 
+    /// Get a copy of an entry
     pub fn get(&self, key: &Bytes) -> Value {
         let entries = self.entries[self.get_slot(key)].read();
         entries
@@ -267,6 +333,7 @@ impl Db {
             .map_or(Value::Null, |x| x.clone_value())
     }
 
+    /// Get multiple copies of entries
     pub fn get_multi(&self, keys: &[Bytes]) -> Value {
         keys.iter()
             .map(|key| {
@@ -280,6 +347,7 @@ impl Db {
             .into()
     }
 
+    /// Sets a new value for the given key, returning the previous value.
     pub fn getset(&self, key: &Bytes, value: &Value) -> Value {
         let mut entries = self.entries[self.get_slot(key)].write();
         self.expirations.lock().remove(key);
@@ -289,6 +357,7 @@ impl Db {
             .map_or(Value::Null, |x| x.clone_value())
     }
 
+    /// Takes an entry from the database.
     pub fn getdel(&self, key: &Bytes) -> Value {
         let mut entries = self.entries[self.get_slot(key)].write();
         entries.remove(key).map_or(Value::Null, |x| {
@@ -297,6 +366,7 @@ impl Db {
         })
     }
 
+    /// Set a key, value with an optional expiration time
     pub fn set(&self, key: &Bytes, value: Value, expires_in: Option<Duration>) -> Value {
         let mut entries = self.entries[self.get_slot(key)].write();
         let expires_at = expires_in.map(|duration| Instant::now() + duration);
@@ -308,6 +378,7 @@ impl Db {
         Value::Ok
     }
 
+    /// Returns the TTL of a given key
     pub fn ttl(&self, key: &Bytes) -> Option<Option<Instant>> {
         let entries = self.entries[self.get_slot(key)].read();
         entries
@@ -316,6 +387,13 @@ impl Db {
             .map(|x| x.get_ttl())
     }
 
+    /// Removes expired entries from the database.
+    ///
+    /// This function should be called from a background thread every few seconds. Calling it
+    /// more often is a waste of resources.
+    ///
+    /// Expired keys are automatically hidden by the database; this process merely reclaims the
+    /// memory held by those expired keys.
     pub fn purge(&self) -> u64 {
         let mut expirations = self.expirations.lock();
         let mut removed = 0;

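The pre-sharding described in the Db docs boils down to: hash the key, pick one slot, and lock only that slot. A self-contained sketch of the idea, with std's DefaultHasher and RwLock standing in for the crate's actual hash function and parking_lot locks:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::RwLock;

struct Db {
    // A vector of hashmaps; each slot is locked independently.
    entries: Vec<RwLock<HashMap<Vec<u8>, Vec<u8>>>>,
}

impl Db {
    fn new(slots: usize) -> Self {
        Self {
            entries: (0..slots).map(|_| RwLock::new(HashMap::new())).collect(),
        }
    }

    // Pre-shard: hash the key to select the slot that may host it.
    fn get_slot(&self, key: &[u8]) -> usize {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) % self.entries.len()
    }

    fn set(&self, key: &[u8], value: &[u8]) {
        // Only one slot is locked exclusively; all others stay available.
        let mut slot = self.entries[self.get_slot(key)].write().unwrap();
        slot.insert(key.to_vec(), value.to_vec());
    }
}

fn main() {
    let db = Db::new(16);
    db.set(b"foo", b"bar");
}
```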
+ 8 - 0
src/dispatcher.rs

@@ -1,3 +1,10 @@
+//! # Dispatcher
+//!
+//! This is where every command is defined. Each command has a definition and a handler; the
+//! handlers are Rust functions.
+//!
+//! Each command is defined with the dispatcher macro, which generates efficient and
+//! developer-friendly code.
 use crate::{
     cmd,
     connection::{Connection, ConnectionStatus},
@@ -10,6 +17,7 @@ use std::convert::TryInto;
 use std::time::SystemTime;
 use std::time::UNIX_EPOCH;
 
+/// Returns the server time
 async fn do_time(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
     let now = SystemTime::now();
     let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");

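The handler above reports the pair of values the TIME command returns: the Unix timestamp in seconds and the elapsed microseconds within the current second. A standalone sketch of the same computation, with the return shape simplified to a tuple:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Returns (seconds, microseconds) since the Unix epoch.
fn server_time() -> (u64, u32) {
    let since_the_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    (since_the_epoch.as_secs(), since_the_epoch.subsec_micros())
}

fn main() {
    let (secs, micros) = server_time();
    println!("{} {}", secs, micros);
}
```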
+ 18 - 0
src/error.rs

@@ -1,20 +1,38 @@
+//! # Redis Error
+//!
+//! All redis errors are abstracted in this mod.
 use crate::value::Value;
 
+/// Redis errors
 #[derive(Debug, Eq, PartialEq)]
 pub enum Error {
+    /// A command is not found
     CommandNotFound(String),
+    /// Invalid number of arguments
     InvalidArgsCount(String),
+    /// The glob-pattern is not valid
     InvalidPattern(String),
+    /// Internal Error
     Internal,
+    /// Protocol error
     Protocol(String, String),
+    /// Unexpected argument
     WrongArgument(String, String),
+    /// Command not found
     NotFound,
+    /// Index out of range
     OutOfRange,
+    /// The connection is in pub-sub only mode and the current command is not compatible.
     PubsubOnly(String),
+    /// Syntax error
     Syntax,
+    /// Byte cannot be converted to a number
     NotANumber,
+    /// The connection is not in a transaction
     NotInTx,
+    /// The connection is in a transaction and nested transactions are not supported
     NestedTx,
+    /// Wrong data type
     WrongType,
 }
 

+ 14 - 0
src/lib.rs

@@ -0,0 +1,14 @@
+//! # Microredis: A multi-threaded implementation of Redis
+//!
+//! In-memory database compatible with Redis.
+#![deny(missing_docs)]
+#![allow(warnings)]
+
+pub mod cmd;
+pub mod connection;
+pub mod db;
+pub mod dispatcher;
+pub mod error;
+pub mod macros;
+pub mod server;
+pub mod value;

+ 52 - 6
src/macros.rs

@@ -1,3 +1,16 @@
+//! # Macros
+//!
+//! All macros are defined in this module
+
+/// Dispatcher macro
+///
+/// All commands and their definitions are declared in this macro.
+///
+/// This macro generates Rust code for each command and encapsulates all the logic needed for
+/// fast execution.
+///
+/// Using macros allows generating code that is quite efficient at run time and easy to extend
+/// at writing time.
 #[macro_export]
 macro_rules! dispatcher {
     {
@@ -16,21 +29,26 @@ macro_rules! dispatcher {
         $($(
             #[allow(non_snake_case, non_camel_case_types)]
             pub mod $command {
+                //! # Command mod
+                //!
+                //! Each individual command is defined in its own namespace
                 use super::*;
                 use async_trait::async_trait;
                 use metered::measure;
 
+                /// Command definition
                 #[derive(Debug)]
                 pub struct Command {
-                    pub tags: &'static [&'static str],
-                    pub min_args: i32,
-                    pub key_start: i32,
-                    pub key_stop: i32,
-                    pub key_step: usize,
-                    pub metrics: Metrics,
+                    tags: &'static [&'static str],
+                    min_args: i32,
+                    key_start: i32,
+                    key_stop: i32,
+                    key_step: usize,
+                    metrics: Metrics,
                 }
 
                 impl Command {
+                    /// Creates a new command
                     pub fn new() -> Self {
                         Self {
                             tags: &[$($tag,)+],
@@ -129,25 +147,35 @@ macro_rules! dispatcher {
         use async_trait::async_trait;
         use metered::{Throughput, HitCount, ErrorCount, InFlight, ResponseTime};
 
+        /// Executable command trait
         #[async_trait]
         pub trait ExecutableCommand {
+            /// Call the command handler
             async fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error>;
 
+            /// Returns a reference to the metrics
             fn metrics(&self) -> &Metrics;
 
+            /// Can this command be queued in a transaction or should it be executed right away?
             fn is_queueable(&self) -> bool;
 
+            /// Can this command be executed in a pub-sub only mode?
             fn is_pubsub_executable(&self) -> bool;
 
+            /// Returns all database keys from the command arguments
             fn get_keys<'a>(&self, args: &'a [Bytes]) -> Vec<&'a Bytes>;
 
+            /// Checks if a given number of args is expected by this command
             fn check_number_args(&self, n: usize) -> bool;
 
+            /// Command group
             fn group(&self) -> &'static str;
 
+            /// Command name
             fn name(&self) -> &'static str;
         }
 
+        /// Metrics struct for each command
         #[derive(Debug, Default, serde::Serialize)]
         pub struct Metrics {
             hit_count: HitCount,
@@ -157,6 +185,7 @@ macro_rules! dispatcher {
             throughput: Throughput,
         }
 
+        /// Metrics for all defined commands
         #[derive(serde::Serialize)]
         pub struct ServiceMetricRegistry<'a> {
             $($(
@@ -164,6 +193,9 @@ macro_rules! dispatcher {
             )+)+
         }
 
+        /// Dispatcher struct
+        ///
+        /// The list of commands is generated in this structure by this macro.
         #[allow(non_snake_case, non_camel_case_types)]
         #[derive(Debug)]
         pub struct Dispatcher {
@@ -173,6 +205,7 @@ macro_rules! dispatcher {
         }
 
         impl Dispatcher {
+            /// Creates a new dispatcher.
             pub fn new() -> Self {
                 Self {
                     $($(
@@ -181,6 +214,7 @@ macro_rules! dispatcher {
                 }
             }
 
+            /// Returns all metrics objects
             pub fn get_service_metric_registry(&self) -> ServiceMetricRegistry {
                 ServiceMetricRegistry {
                     $($(
@@ -189,6 +223,7 @@ macro_rules! dispatcher {
                 }
             }
 
+            /// Returns the handlers for defined commands.
             pub fn get_all_commands(&self) -> Vec<&(dyn ExecutableCommand + Send + Sync + 'static)> {
                 vec![
                 $($(
@@ -197,6 +232,7 @@ macro_rules! dispatcher {
                 ]
             }
 
+            /// Returns a command handler for a given command
             pub fn get_handler_for_command(&self, command: &str) -> Result<&(dyn ExecutableCommand + Send + Sync + 'static), Error> {
                 match command {
                 $($(
@@ -206,6 +242,11 @@ macro_rules! dispatcher {
                 }
             }
 
+            /// Returns the command handler
+            ///
+            /// Before returning the command handler, this function makes sure the minimum
+            /// required arguments are provided. This pre-validation means each command handler
+            /// needs less logic when reading the provided arguments.
             pub fn get_handler(&self, args: &[Bytes]) -> Result<&(dyn ExecutableCommand + Send + Sync + 'static), Error> {
                 let command = String::from_utf8_lossy(&args[0]).to_lowercase();
                 let command = self.get_handler_for_command(&command)?;
@@ -219,6 +260,7 @@ macro_rules! dispatcher {
     }
 }
 
+/// Generates From/Into implementations between a given type and a Value
 #[macro_export]
 macro_rules! value_try_from {
     {$type: ty, $value: expr} => {
@@ -232,6 +274,7 @@ macro_rules! value_try_from {
     }
 }
 
+/// Generates From/Into implementations from a Vec of a given type to a Value::Array
 #[macro_export]
 macro_rules! value_vec_try_from {
     {$type: ty} => {
@@ -243,6 +286,7 @@ macro_rules! value_vec_try_from {
     }
 }
 
+/// Converts an Option<T> to a Value. If the option is None, Value::Null is returned.
 #[macro_export]
 macro_rules! option {
     {$type: expr} => {
@@ -254,6 +298,7 @@ macro_rules! option {
     }
 }
 
+/// Checks whether the command argument at position $pos equals $command
 #[macro_export]
 macro_rules! check_arg {
     {$args: tt, $pos: tt, $command: tt} => {{
@@ -266,6 +311,7 @@ macro_rules! check_arg {
     }}
 }
 
+/// Converts a stream to Bytes
 #[macro_export]
 macro_rules! bytes {
     ($content:tt) => {

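Most of these helper macros are tiny. As a self-contained sketch of what the option! macro expands to, with a stripped-down Value enum standing in for the crate's real one:

```rust
#[derive(Debug, PartialEq)]
enum Value {
    Integer(i64),
    Null,
}

impl From<i64> for Value {
    fn from(v: i64) -> Self {
        Value::Integer(v)
    }
}

// Converts an Option<T> into a Value, falling back to Value::Null.
macro_rules! option {
    ($expr: expr) => {
        match $expr {
            Some(x) => x.into(),
            None => Value::Null,
        }
    };
}

fn main() {
    let some: Value = option!(Some(42i64));
    let none: Value = option!(None::<i64>);
    assert_eq!(some, Value::Integer(42));
    assert_eq!(none, Value::Null);
}
```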
+ 1 - 9
src/main.rs

@@ -1,12 +1,4 @@
-mod cmd;
-mod connection;
-mod db;
-mod dispatcher;
-mod error;
-mod macros;
-mod server;
-mod value;
-
+use microredis::server;
 use std::{env, error::Error};
 
 #[tokio::main]

+ 19 - 2
src/server.rs

@@ -1,3 +1,7 @@
+//! # Server
+//!
+//! Redis TCP server. This module also includes a simple HTTP server to expose the
+//! Prometheus metrics.
 use crate::{
     connection::{connections::Connections, ConnectionStatus},
     db::Db,
@@ -16,6 +20,7 @@ use tokio::{
 use tokio_stream::StreamExt;
 use tokio_util::codec::{Decoder, Encoder, Framed};
 
+/// Redis Parser Encoder/Decoder
 struct RedisParser;
 
 impl Encoder<Value> for RedisParser {
@@ -51,6 +56,10 @@ impl Decoder for RedisParser {
     }
 }
 
+/// Spawns a very simple HTTP server to serve metrics.
+///
+/// The incoming HTTP request is discarded and the response is always the metrics in the
+/// Prometheus format.
 pub async fn server_metrics(all_connections: Arc<Connections>) {
     info!("Listening on 127.0.0.1:7878 for metrics");
     let listener = tokio::net::TcpListener::bind("127.0.0.1:7878")
@@ -76,7 +85,7 @@ pub async fn server_metrics(all_connections: Arc<Connections>) {
             Some("redis"),
             globals.clone(),
         )
-        .unwrap_or("".to_owned());
+        .unwrap_or_else(|_| "".to_owned());
 
         let response = format!(
             "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
@@ -84,11 +93,19 @@ pub async fn server_metrics(all_connections: Arc<Connections>) {
             serialized
         );
 
-        let _ = stream.write_all(&response.as_bytes()).await;
+        let _ = stream.write_all(response.as_bytes()).await;
         let _ = stream.flush().await;
     }
 }
 
+/// Spawns the Redis server
+///
+/// This function creates the Connections object, the in-memory database, the purge process and
+/// the TCP server.
+///
+/// This process also listens for any incoming message through the internal pub-sub.
+///
+/// This function blocks the main thread and never exits.
 pub async fn serve(addr: String) -> Result<(), Box<dyn Error>> {
     let listener = TcpListener::bind(&addr).await?;
     info!("Listening on: {}", addr);

+ 15 - 0
src/value/checksum.rs

@@ -1,3 +1,6 @@
+//! # Checksum value
+//!
+//! Wraps a structure and makes instances faster to compare with each other via a fast checksum.
 use crate::value;
 use bytes::Bytes;
 use crc32fast::Hasher as Crc32Hasher;
@@ -13,18 +16,27 @@ fn calculate_checksum(bytes: &Bytes) -> Option<u32> {
     }
 }
 
+/// Ref
+///
+/// Holds a reference to bytes and calculates the checksum if needed. This is useful for
+/// comparing raw bytes with a Value.
 pub struct Ref<'a> {
     bytes: &'a Bytes,
     checksum: Option<u32>,
 }
 
 impl<'a> Ref<'a> {
+    /// Creates a new instance
     pub fn new(bytes: &'a Bytes) -> Self {
         let checksum = calculate_checksum(bytes);
         Self { bytes, checksum }
     }
 }
 
+/// Value
+///
+/// Similar to Ref, but it takes ownership of the bytes instead of borrowing them. This
+/// object is comparable with a Ref.
 #[derive(Debug, Clone)]
 pub struct Value {
     bytes: Bytes,
@@ -32,15 +44,18 @@ pub struct Value {
 }
 
 impl Value {
+    /// Creates a new instance
     pub fn new(bytes: Bytes) -> Self {
         let checksum = calculate_checksum(&bytes);
         Self { bytes, checksum }
     }
 
+    /// Clones the underlying value
     pub fn clone_value(&self) -> value::Value {
         value::Value::Blob(self.bytes.clone())
     }
 
+    /// Whether it has a checksum or not
     pub fn has_checksum(&self) -> bool {
         self.checksum.is_some()
     }

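The point of storing a checksum next to the bytes is that two large blobs can be ruled unequal without scanning them. A sketch of that idea with crc32fast (the fast_eq helper and the size threshold are illustrative, not this crate's API):

```rust
use bytes::Bytes;
use crc32fast::Hasher as Crc32Hasher;

// Only checksum payloads large enough that a byte-by-byte comparison
// would be expensive (illustrative threshold).
fn calculate_checksum(bytes: &Bytes) -> Option<u32> {
    if bytes.len() < 1024 {
        None
    } else {
        let mut hasher = Crc32Hasher::new();
        hasher.update(bytes);
        Some(hasher.finalize())
    }
}

fn fast_eq(a: &Bytes, b: &Bytes) -> bool {
    match (calculate_checksum(a), calculate_checksum(b)) {
        // Different checksums: definitely not equal, no full scan needed.
        (Some(ca), Some(cb)) if ca != cb => false,
        // Same checksum, or too small to checksum: compare the bytes.
        _ => a == b,
    }
}

fn main() {
    let a = Bytes::from(vec![7u8; 4096]);
    let b = Bytes::from(vec![7u8; 4096]);
    assert!(fast_eq(&a, &b));
}
```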
+ 13 - 0
src/value/locked.rs

@@ -1,5 +1,15 @@
+//! # Locked Value
+//!
+//! Wraps a structure and makes it read-write lockable. This is a very simple abstraction
+//! on top of a RwLock.
+
 use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
 
+/// Locked Value
+///
+/// This is a very simple data structure to wrap a value behind a read/write lock.
+///
+/// The wrapped objects are comparable and clonable.
 #[derive(Debug)]
 pub struct Value<T: Clone + PartialEq>(pub RwLock<T>);
 
@@ -16,14 +26,17 @@ impl<T: PartialEq + Clone> PartialEq for Value<T> {
 }
 
 impl<T: PartialEq + Clone> Value<T> {
+    /// Creates a new instance
     pub fn new(obj: T) -> Self {
         Self(RwLock::new(obj))
     }
 
+    /// Acquire a write lock and return the wrapped Value
     pub fn write(&self) -> RwLockWriteGuard<'_, T> {
         self.0.write()
     }
 
+    /// Acquire a read lock and return the wrapped Value
     pub fn read(&self) -> RwLockReadGuard<'_, T> {
         self.0.read()
     }

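Using the locked wrapper is straightforward. A usage sketch with parking_lot (re-declaring a simplified copy of the wrapper so the example is self-contained):

```rust
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};

// Simplified copy of the wrapper defined above.
#[derive(Debug)]
struct Value<T: Clone + PartialEq>(RwLock<T>);

impl<T: Clone + PartialEq> Value<T> {
    fn new(obj: T) -> Self {
        Self(RwLock::new(obj))
    }

    fn write(&self) -> RwLockWriteGuard<'_, T> {
        self.0.write()
    }

    fn read(&self) -> RwLockReadGuard<'_, T> {
        self.0.read()
    }
}

fn main() {
    let counter = Value::new(0i64);
    // Writers take the lock exclusively...
    *counter.write() += 1;
    // ...while readers can share it.
    assert_eq!(*counter.read(), 1);
}
```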
+ 24 - 0
src/value/mod.rs

@@ -1,3 +1,6 @@
+//! # Redis Value
+//!
+//! All Redis internal data structures and values are abstracted in this module.
 pub mod checksum;
 pub mod locked;
 
@@ -10,21 +13,38 @@ use std::{
     str::FromStr,
 };
 
+/// Redis Value.
+///
+/// This enum represents all data structures that are supported by Redis
 #[derive(Debug, PartialEq, Clone)]
 pub enum Value {
+    /// Hash. This type cannot be serialized
     Hash(locked::Value<HashMap<Bytes, Bytes>>),
+    /// List. This type cannot be serialized
     List(locked::Value<VecDeque<checksum::Value>>),
+    /// Set. This type cannot be serialized
     Set(locked::Value<HashSet<Bytes>>),
+    /// Vector/Array of values
     Array(Vec<Value>),
+    /// Bytes/Strings/Binary data
     Blob(Bytes),
+    /// String. This type does not allow new lines
     String(String),
+    /// An error
     Err(String, String),
+    /// Integer
     Integer(i64),
+    /// Boolean
     Boolean(bool),
+    /// Float number
     Float(f64),
+    /// Big number
     BigInteger(i128),
+    /// Null
     Null,
+    /// The command has been Queued
     Queued,
+    /// Ok
     Ok,
 }
 
@@ -85,6 +105,10 @@ impl TryFrom<&Value> for f64 {
         }
     }
 }
+
+/// Tries to convert bytes into a number
+///
+/// If the conversion fails, an Error::NotANumber error is returned.
 pub fn bytes_to_number<T: FromStr>(bytes: &Bytes) -> Result<T, Error> {
     let x = String::from_utf8_lossy(bytes);
     x.parse::<T>().map_err(|_| Error::NotANumber)
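A quick usage sketch of bytes_to_number, with a simplified Error enum standing in for the crate's error type:

```rust
use bytes::Bytes;
use std::str::FromStr;

#[derive(Debug, PartialEq)]
enum Error {
    NotANumber,
}

// Same shape as the function above: lossy-decode the bytes, then parse.
fn bytes_to_number<T: FromStr>(bytes: &Bytes) -> Result<T, Error> {
    String::from_utf8_lossy(bytes)
        .parse::<T>()
        .map_err(|_| Error::NotANumber)
}

fn main() {
    let n: i64 = bytes_to_number(&Bytes::from("42")).unwrap();
    assert_eq!(n, 42);
    assert_eq!(
        bytes_to_number::<i64>(&Bytes::from("foo")),
        Err(Error::NotANumber)
    );
}
```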