Browse Source

Add multiple databases support (#32)

* Minor improvements

Minor improvements needed to make it easier to add support for multiple
databases.

* Add databases pool

This new struct is going to host all databases in the server. Each
database will have their own Db, to keep the codebase simple.

New methods are going to be added to move and copy databases from
databases atomically.
César D. Rodas 3 years ago
parent
commit
fd6033eb13
10 changed files with 278 additions and 119 deletions
  1. 66 5
      src/cmd/client.rs
  2. 11 8
      src/cmd/mod.rs
  3. 6 2
      src/cmd/string.rs
  4. 8 11
      src/connection/connections.rs
  5. 25 11
      src/connection/mod.rs
  6. 69 71
      src/db/mod.rs
  7. 66 0
      src/db/pool.rs
  8. 9 0
      src/dispatcher/mod.rs
  9. 3 0
      src/error.rs
  10. 15 11
      src/server.rs

+ 66 - 5
src/cmd/client.rs

@@ -1,6 +1,11 @@
 //!  # Client-group command handlers
 //!  # Client-group command handlers
 
 
-use crate::{connection::Connection, error::Error, option, value::Value};
+use crate::{
+    connection::Connection,
+    error::Error,
+    option,
+    value::{bytes_to_number, Value},
+};
 use bytes::Bytes;
 use bytes::Bytes;
 use std::sync::Arc;
 use std::sync::Arc;
 
 
@@ -25,13 +30,13 @@ pub async fn client(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 
 
     match sub.to_lowercase().as_str() {
     match sub.to_lowercase().as_str() {
         "id" => Ok((conn.id() as i64).into()),
         "id" => Ok((conn.id() as i64).into()),
-        "info" => Ok(conn.as_string().into()),
+        "info" => Ok(conn.to_string().into()),
         "getname" => Ok(option!(conn.name())),
         "getname" => Ok(option!(conn.name())),
         "list" => {
         "list" => {
-            let mut v: Vec<Value> = vec![];
+            let mut list_client = "".to_owned();
             conn.all_connections()
             conn.all_connections()
-                .iter(&mut |conn: Arc<Connection>| v.push(conn.as_string().into()));
-            Ok(v.into())
+                .iter(&mut |conn: Arc<Connection>| list_client.push_str(&conn.to_string()));
+            Ok(list_client.into())
         }
         }
         "setname" => {
         "setname" => {
             let name = String::from_utf8_lossy(&args[2]).to_string();
             let name = String::from_utf8_lossy(&args[2]).to_string();
@@ -53,6 +58,12 @@ pub async fn echo(_conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(Value::new(&args[1]))
     Ok(Value::new(&args[1]))
 }
 }
 
 
+/// Select the Redis logical database having the specified zero-based numeric
+/// index. New connections always use the database 0.
+pub async fn select(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    conn.selectdb(bytes_to_number(&args[1])?)
+}
+
 /// "ping" command handler
 /// "ping" command handler
 ///
 ///
 /// Documentation:
 /// Documentation:
@@ -73,3 +84,53 @@ pub async fn reset(conn: &Connection, _: &[Bytes]) -> Result<Value, Error> {
     conn.reset();
     conn.reset();
     Ok(Value::String("RESET".to_owned()))
     Ok(Value::String("RESET".to_owned()))
 }
 }
+
+#[cfg(test)]
+mod test {
+    use crate::{
+        cmd::test::{create_connection, run_command},
+        error::Error,
+        value::Value,
+    };
+
+    #[tokio::test]
+    async fn select() {
+        let c = create_connection();
+        assert_eq!(
+            Ok(Value::Integer(3)),
+            run_command(&c, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await
+        );
+        assert_eq!(
+            Ok(Value::Blob("1".into())),
+            run_command(&c, &["hget", "foo", "f1"]).await
+        );
+        assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "1"]).await);
+        assert_eq!(
+            Ok(Value::Null),
+            run_command(&c, &["hget", "foo", "f1"]).await
+        );
+        assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "0"]).await);
+        assert_eq!(
+            Ok(Value::Blob("1".into())),
+            run_command(&c, &["hget", "foo", "f1"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn select_err_0() {
+        let c = create_connection();
+        assert_eq!(
+            Err(Error::NotANumber),
+            run_command(&c, &["select", "-1"]).await
+        );
+    }
+
+    #[tokio::test]
+    async fn select_err_1() {
+        let c = create_connection();
+        assert_eq!(
+            Err(Error::NotSuchDatabase),
+            run_command(&c, &["select", "10000000"]).await
+        );
+    }
+}

+ 11 - 8
src/cmd/mod.rs

@@ -25,7 +25,7 @@ pub fn now() -> Duration {
 mod test {
 mod test {
     use crate::{
     use crate::{
         connection::{connections::Connections, Connection},
         connection::{connections::Connections, Connection},
-        db::Db,
+        db::pool::Databases,
         dispatcher::Dispatcher,
         dispatcher::Dispatcher,
         error::Error,
         error::Error,
         value::Value,
         value::Value,
@@ -38,21 +38,21 @@ mod test {
     use tokio::sync::mpsc::Receiver;
     use tokio::sync::mpsc::Receiver;
 
 
     pub fn create_connection() -> Arc<Connection> {
     pub fn create_connection() -> Arc<Connection> {
-        let db = Arc::new(Db::new(1000));
-        let all_connections = Arc::new(Connections::new(db.clone()));
+        let (default_db, all_dbs) = Databases::new(16, 1000);
+        let all_connections = Arc::new(Connections::new(all_dbs));
 
 
         let client = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
         let client = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
 
 
-        all_connections.new_connection(db.clone(), client).1
+        all_connections.new_connection(default_db, client).1
     }
     }
 
 
     pub fn create_connection_and_pubsub() -> (Receiver<Value>, Arc<Connection>) {
     pub fn create_connection_and_pubsub() -> (Receiver<Value>, Arc<Connection>) {
-        let db = Arc::new(Db::new(1000));
-        let all_connections = Arc::new(Connections::new(db.clone()));
+        let (default_db, all_dbs) = Databases::new(16, 1000);
+        let all_connections = Arc::new(Connections::new(all_dbs));
 
 
         let client = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
         let client = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
 
 
-        all_connections.new_connection(db.clone(), client)
+        all_connections.new_connection(default_db, client)
     }
     }
 
 
     pub fn create_new_connection_from_connection(
     pub fn create_new_connection_from_connection(
@@ -62,7 +62,10 @@ mod test {
 
 
         let client = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
         let client = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
 
 
-        all_connections.new_connection(all_connections.db(), client)
+        all_connections.new_connection(
+            all_connections.get_databases().get(0).expect("DB(0)"),
+            client,
+        )
     }
     }
 
 
     pub async fn run_command(conn: &Connection, cmd: &[&str]) -> Result<Value, Error> {
     pub async fn run_command(conn: &Connection, cmd: &[&str]) -> Result<Value, Error> {

+ 6 - 2
src/cmd/string.rs

@@ -1,8 +1,12 @@
 //! # String command handlers
 //! # String command handlers
 use super::now;
 use super::now;
 use crate::{
 use crate::{
-    check_arg, connection::Connection, db::Override, error::Error, try_get_arg,
-    value::bytes_to_number, value::Value,
+    check_arg,
+    connection::Connection,
+    db::Override,
+    error::Error,
+    try_get_arg,
+    value::{bytes_to_number, Value},
 };
 };
 use bytes::Bytes;
 use bytes::Bytes;
 use std::{
 use std::{

+ 8 - 11
src/connection/connections.rs

@@ -3,7 +3,7 @@
 //! This mod keeps track of all active conections. There is one instance of this mod per running
 //! This mod keeps track of all active conections. There is one instance of this mod per running
 //! server.
 //! server.
 use super::{pubsub_connection::PubsubClient, pubsub_server::Pubsub, Connection, ConnectionInfo};
 use super::{pubsub_connection::PubsubClient, pubsub_server::Pubsub, Connection, ConnectionInfo};
-use crate::{db::Db, dispatcher::Dispatcher, value::Value};
+use crate::{db::pool::Databases, db::Db, dispatcher::Dispatcher, value::Value};
 use parking_lot::RwLock;
 use parking_lot::RwLock;
 use std::{collections::BTreeMap, net::SocketAddr, sync::Arc};
 use std::{collections::BTreeMap, net::SocketAddr, sync::Arc};
 
 
@@ -13,7 +13,7 @@ use tokio::sync::mpsc;
 #[derive(Debug)]
 #[derive(Debug)]
 pub struct Connections {
 pub struct Connections {
     connections: RwLock<BTreeMap<u128, Arc<Connection>>>,
     connections: RwLock<BTreeMap<u128, Arc<Connection>>>,
-    db: Arc<Db>,
+    dbs: Arc<Databases>,
     pubsub: Arc<Pubsub>,
     pubsub: Arc<Pubsub>,
     dispatcher: Arc<Dispatcher>,
     dispatcher: Arc<Dispatcher>,
     counter: RwLock<u128>,
     counter: RwLock<u128>,
@@ -21,20 +21,19 @@ pub struct Connections {
 
 
 impl Connections {
 impl Connections {
     /// Returns a new instance of connections.
     /// Returns a new instance of connections.
-    pub fn new(db: Arc<Db>) -> Self {
+    pub fn new(dbs: Arc<Databases>) -> Self {
         Self {
         Self {
             counter: RwLock::new(0),
             counter: RwLock::new(0),
-            db,
+            dbs,
             pubsub: Arc::new(Pubsub::new()),
             pubsub: Arc::new(Pubsub::new()),
             dispatcher: Arc::new(Dispatcher::new()),
             dispatcher: Arc::new(Dispatcher::new()),
             connections: RwLock::new(BTreeMap::new()),
             connections: RwLock::new(BTreeMap::new()),
         }
         }
     }
     }
 
 
-    /// Returns the database
-    #[allow(dead_code)]
-    pub fn db(&self) -> Arc<Db> {
-        self.db.clone()
+    /// Returns all databases
+    pub fn get_databases(&self) -> Arc<Databases> {
+        self.dbs.clone()
     }
     }
 
 
     /// Returns the dispatcher instance
     /// Returns the dispatcher instance
@@ -66,11 +65,9 @@ impl Connections {
 
 
         let conn = Arc::new(Connection {
         let conn = Arc::new(Connection {
             id: *id,
             id: *id,
-            db: db.new_db_instance(*id),
             addr,
             addr,
             all_connections: self.clone(),
             all_connections: self.clone(),
-            current_db: 0,
-            info: RwLock::new(ConnectionInfo::new()),
+            info: RwLock::new(ConnectionInfo::new(db.new_db_instance(*id))),
             pubsub_client: PubsubClient::new(pubsub_sender),
             pubsub_client: PubsubClient::new(pubsub_sender),
         });
         });
 
 

+ 25 - 11
src/connection/mod.rs

@@ -32,6 +32,8 @@ impl Default for ConnectionStatus {
 /// Connection information
 /// Connection information
 #[derive(Debug)]
 #[derive(Debug)]
 pub struct ConnectionInfo {
 pub struct ConnectionInfo {
+    current_db: usize,
+    db: Arc<Db>,
     name: Option<String>,
     name: Option<String>,
     watch_keys: Vec<(Bytes, u128)>,
     watch_keys: Vec<(Bytes, u128)>,
     tx_keys: HashSet<Bytes>,
     tx_keys: HashSet<Bytes>,
@@ -43,8 +45,6 @@ pub struct ConnectionInfo {
 #[derive(Debug)]
 #[derive(Debug)]
 pub struct Connection {
 pub struct Connection {
     id: u128,
     id: u128,
-    db: Db,
-    current_db: u32,
     all_connections: Arc<connections::Connections>,
     all_connections: Arc<connections::Connections>,
     addr: SocketAddr,
     addr: SocketAddr,
     info: RwLock<ConnectionInfo>,
     info: RwLock<ConnectionInfo>,
@@ -53,10 +53,12 @@ pub struct Connection {
 
 
 impl ConnectionInfo {
 impl ConnectionInfo {
     /// Creates a new connection
     /// Creates a new connection
-    fn new() -> Self {
+    fn new(db: Arc<Db>) -> Self {
         Self {
         Self {
             name: None,
             name: None,
             watch_keys: vec![],
             watch_keys: vec![],
+            db,
+            current_db: 0,
             tx_keys: HashSet::new(),
             tx_keys: HashSet::new(),
             commands: None,
             commands: None,
             status: ConnectionStatus::Normal,
             status: ConnectionStatus::Normal,
@@ -69,8 +71,8 @@ impl Connection {
     ///
     ///
     /// The database object is unique to this connection but most of its internal structure is
     /// The database object is unique to this connection but most of its internal structure is
     /// shared (like the entries).
     /// shared (like the entries).
-    pub fn db(&self) -> &Db {
-        &self.db
+    pub fn db(&self) -> Arc<Db> {
+        self.info.read().db.clone()
     }
     }
 
 
     /// Returns the global pubsub server
     /// Returns the global pubsub server
@@ -165,7 +167,7 @@ impl Connection {
         let watch_keys = &self.info.read().watch_keys;
         let watch_keys = &self.info.read().watch_keys;
 
 
         for key in watch_keys.iter() {
         for key in watch_keys.iter() {
-            if self.db.get_version(&key.0) != key.1 {
+            if self.info.read().db.get_version(&key.0) != key.1 {
                 return true;
                 return true;
             }
             }
         }
         }
@@ -244,14 +246,26 @@ impl Connection {
         r.name = Some(name);
         r.name = Some(name);
     }
     }
 
 
+    /// Changes the current db for the current connection
+    pub fn selectdb(&self, db: usize) -> Result<Value, Error> {
+        let mut info = self.info.write();
+        info.db = self
+            .all_connections
+            .get_databases()
+            .get(db)?
+            .new_db_instance(self.id);
+        info.current_db = db;
+        Ok(Value::Ok)
+    }
+}
+
+impl ToString for Connection {
     /// Returns a string representation of this connection
     /// Returns a string representation of this connection
-    pub fn as_string(&self) -> String {
+    fn to_string(&self) -> String {
+        let info = self.info.read();
         format!(
         format!(
             "id={} addr={} name={:?} db={}\r\n",
             "id={} addr={} name={:?} db={}\r\n",
-            self.id,
-            self.addr,
-            self.info.read().name,
-            self.current_db
+            self.id, self.addr, info.name, info.current_db
         )
         )
     }
     }
 }
 }

+ 69 - 71
src/db/mod.rs

@@ -4,6 +4,7 @@
 //! database module.
 //! database module.
 mod entry;
 mod entry;
 mod expiration;
 mod expiration;
+pub mod pool;
 
 
 use crate::{error::Error, value::Value};
 use crate::{error::Error, value::Value};
 use bytes::{BufMut, Bytes, BytesMut};
 use bytes::{BufMut, Bytes, BytesMut};
@@ -48,15 +49,15 @@ impl Default for Override {
     }
     }
 }
 }
 
 
-/// Databas structure
+/// Database structure
 ///
 ///
 /// Each connection has their own clone of the database and the conn_id is stored in each instance.
 /// Each connection has their own clone of the database and the conn_id is stored in each instance.
-/// The entries property is shared for all connections.
+/// The slots property is shared for all connections.
 ///
 ///
 /// To avoid lock contention this database is *not* a single HashMap, instead it is a vector of
 /// To avoid lock contention this database is *not* a single HashMap, instead it is a vector of
 /// HashMaps. Each key is presharded and a bucket is selected. By doing this pre-step instead of
 /// HashMaps. Each key is presharded and a bucket is selected. By doing this pre-step instead of
 /// locking the entire database, only a small portion is locked (shared or exclusively) at a time,
 /// locking the entire database, only a small portion is locked (shared or exclusively) at a time,
-/// making this database implementation thread-friendly. The number of slots available cannot be
+/// making this database implementation thread-friendly. The number of number_of_slots available cannot be
 /// changed at runtime.
 /// changed at runtime.
 ///
 ///
 /// The database is also aware of other connections locking other keys exclusively (for
 /// The database is also aware of other connections locking other keys exclusively (for
@@ -74,19 +75,21 @@ pub struct Db {
     ///
     ///
     /// Because all operations are always key specific, the key is used to hash
     /// Because all operations are always key specific, the key is used to hash
     /// and select to which HashMap the data might be stored.
     /// and select to which HashMap the data might be stored.
-    entries: Arc<Vec<RwLock<HashMap<Bytes, Entry>>>>,
+    slots: Arc<Vec<RwLock<HashMap<Bytes, Entry>>>>,
 
 
     /// Data structure to store all expiring keys
     /// Data structure to store all expiring keys
     expirations: Arc<Mutex<ExpirationDb>>,
     expirations: Arc<Mutex<ExpirationDb>>,
 
 
     /// Number of HashMaps that are available.
     /// Number of HashMaps that are available.
-    slots: usize,
+    number_of_slots: usize,
 
 
-    /// A Database is attached to a conn_id. The entries and expiration data
-    /// structures are shared between all connections.
+    /// Current connection  ID
     ///
     ///
-    /// This particular database instace is attached to a conn_id, used to block
-    /// all keys in case of a transaction.
+    /// A Database is attached to a conn_id. The slots and expiration data
+    /// structures are shared between all connections, regardless of conn_id.
+    ///
+    /// This particular database instace is attached to a conn_id, which is used
+    /// to lock keys exclusively for transactions and other atomic operations.
     conn_id: u128,
     conn_id: u128,
 
 
     /// HashMap of all blocked keys by other connections. If a key appears in
     /// HashMap of all blocked keys by other connections. If a key appears in
@@ -97,19 +100,17 @@ pub struct Db {
 
 
 impl Db {
 impl Db {
     /// Creates a new database instance
     /// Creates a new database instance
-    pub fn new(slots: usize) -> Self {
-        let mut entries = vec![];
-
-        for _i in 0..slots {
-            entries.push(RwLock::new(HashMap::new()));
-        }
+    pub fn new(number_of_slots: usize) -> Self {
+        let slots = (0..number_of_slots)
+            .map(|_| RwLock::new(HashMap::new()))
+            .collect();
 
 
         Self {
         Self {
-            entries: Arc::new(entries),
+            slots: Arc::new(slots),
             expirations: Arc::new(Mutex::new(ExpirationDb::new())),
             expirations: Arc::new(Mutex::new(ExpirationDb::new())),
             conn_id: 0,
             conn_id: 0,
             tx_key_locks: Arc::new(RwLock::new(HashMap::new())),
             tx_key_locks: Arc::new(RwLock::new(HashMap::new())),
-            slots,
+            number_of_slots,
         }
         }
     }
     }
 
 
@@ -118,14 +119,14 @@ impl Db {
     /// This is particular useful when locking keys exclusively.
     /// This is particular useful when locking keys exclusively.
     ///
     ///
     /// All the internal data are shjared through an Arc.
     /// All the internal data are shjared through an Arc.
-    pub fn new_db_instance(self: Arc<Db>, conn_id: u128) -> Db {
-        Self {
-            entries: self.entries.clone(),
+    pub fn new_db_instance(self: Arc<Db>, conn_id: u128) -> Arc<Db> {
+        Arc::new(Self {
+            slots: self.slots.clone(),
             tx_key_locks: self.tx_key_locks.clone(),
             tx_key_locks: self.tx_key_locks.clone(),
             expirations: self.expirations.clone(),
             expirations: self.expirations.clone(),
             conn_id,
             conn_id,
-            slots: self.slots,
-        }
+            number_of_slots: self.number_of_slots,
+        })
     }
     }
 
 
     #[inline]
     #[inline]
@@ -136,7 +137,7 @@ impl Db {
     /// quick hashing algorithm to select a 'slot' or HashMap where it may be
     /// quick hashing algorithm to select a 'slot' or HashMap where it may be
     /// hosted.
     /// hosted.
     fn get_slot(&self, key: &Bytes) -> usize {
     fn get_slot(&self, key: &Bytes) -> usize {
-        let id = (hash(key) as usize) % self.entries.len();
+        let id = (hash(key) as usize) % self.number_of_slots;
         trace!("selected slot {} for key {:?}", id, key);
         trace!("selected slot {} for key {:?}", id, key);
 
 
         let waiting = Duration::from_nanos(100);
         let waiting = Duration::from_nanos(100);
@@ -217,8 +218,8 @@ impl Db {
         key: &Bytes,
         key: &Bytes,
         incr_by: T,
         incr_by: T,
     ) -> Result<Value, Error> {
     ) -> Result<Value, Error> {
-        let mut entries = self.entries[self.get_slot(key)].write();
-        match entries.get_mut(key) {
+        let mut slots = self.slots[self.get_slot(key)].write();
+        match slots.get_mut(key) {
             Some(x) => {
             Some(x) => {
                 let value = x.get();
                 let value = x.get();
                 let mut number: T = value.try_into()?;
                 let mut number: T = value.try_into()?;
@@ -230,7 +231,7 @@ impl Db {
                 Ok(number.into())
                 Ok(number.into())
             }
             }
             None => {
             None => {
-                entries.insert(
+                slots.insert(
                     key.clone(),
                     key.clone(),
                     Entry::new(Value::Blob(incr_by.to_string().as_str().into()), None),
                     Entry::new(Value::Blob(incr_by.to_string().as_str().into()), None),
                 );
                 );
@@ -241,8 +242,8 @@ impl Db {
 
 
     /// Removes any expiration associated with a given key
     /// Removes any expiration associated with a given key
     pub fn persist(&self, key: &Bytes) -> Value {
     pub fn persist(&self, key: &Bytes) -> Value {
-        let mut entries = self.entries[self.get_slot(key)].write();
-        entries
+        let mut slots = self.slots[self.get_slot(key)].write();
+        slots
             .get_mut(key)
             .get_mut(key)
             .filter(|x| x.is_valid())
             .filter(|x| x.is_valid())
             .map_or(0.into(), |x| {
             .map_or(0.into(), |x| {
@@ -258,10 +259,10 @@ impl Db {
 
 
     /// Set time to live for a given key
     /// Set time to live for a given key
     pub fn set_ttl(&self, key: &Bytes, expires_in: Duration) -> Value {
     pub fn set_ttl(&self, key: &Bytes, expires_in: Duration) -> Value {
-        let mut entries = self.entries[self.get_slot(key)].write();
+        let mut slots = self.slots[self.get_slot(key)].write();
         let expires_at = Instant::now() + expires_in;
         let expires_at = Instant::now() + expires_in;
 
 
-        entries
+        slots
             .get_mut(key)
             .get_mut(key)
             .filter(|x| x.is_valid())
             .filter(|x| x.is_valid())
             .map_or(0.into(), |x| {
             .map_or(0.into(), |x| {
@@ -278,8 +279,8 @@ impl Db {
     /// command will make sure it holds a string large enough to be able to set
     /// command will make sure it holds a string large enough to be able to set
     /// value at offset.
     /// value at offset.
     pub fn set_range(&self, key: &Bytes, offset: u64, data: &[u8]) -> Result<Value, Error> {
     pub fn set_range(&self, key: &Bytes, offset: u64, data: &[u8]) -> Result<Value, Error> {
-        let mut entries = self.entries[self.get_slot(key)].write();
-        let value = entries.get_mut(key).map(|value| {
+        let mut slots = self.slots[self.get_slot(key)].write();
+        let value = slots.get_mut(key).map(|value| {
             if !value.is_valid() {
             if !value.is_valid() {
                 self.expirations.lock().remove(key);
                 self.expirations.lock().remove(key);
                 value.persist();
                 value.persist();
@@ -302,7 +303,7 @@ impl Db {
                 bytes.resize(length, 0);
                 bytes.resize(length, 0);
                 let writer = &mut bytes[offset as usize..];
                 let writer = &mut bytes[offset as usize..];
                 writer.copy_from_slice(data);
                 writer.copy_from_slice(data);
-                entries.insert(key.clone(), Entry::new(Value::new(&bytes), None));
+                slots.insert(key.clone(), Entry::new(Value::new(&bytes), None));
                 Ok(bytes.len().into())
                 Ok(bytes.len().into())
             }
             }
             _ => Err(Error::WrongType),
             _ => Err(Error::WrongType),
@@ -316,7 +317,7 @@ impl Db {
         keys.iter()
         keys.iter()
             .filter_map(|key| {
             .filter_map(|key| {
                 expirations.remove(key);
                 expirations.remove(key);
-                self.entries[self.get_slot(key)].write().remove(key)
+                self.slots[self.get_slot(key)].write().remove(key)
             })
             })
             .filter(|key| key.is_valid())
             .filter(|key| key.is_valid())
             .count()
             .count()
@@ -328,8 +329,8 @@ impl Db {
         let mut matches = 0;
         let mut matches = 0;
         keys.iter()
         keys.iter()
             .map(|key| {
             .map(|key| {
-                let entries = self.entries[self.get_slot(key)].read();
-                if entries.get(key).is_some() {
+                let slots = self.slots[self.get_slot(key)].read();
+                if slots.get(key).is_some() {
                     matches += 1;
                     matches += 1;
                 }
                 }
             })
             })
@@ -355,14 +356,14 @@ impl Db {
         F1: FnOnce(&Value) -> Result<Value, Error>,
         F1: FnOnce(&Value) -> Result<Value, Error>,
         F2: FnOnce() -> Result<Value, Error>,
         F2: FnOnce() -> Result<Value, Error>,
     {
     {
-        let entries = self.entries[self.get_slot(key)].read();
-        let entry = entries.get(key).filter(|x| x.is_valid()).map(|e| e.get());
+        let slots = self.slots[self.get_slot(key)].read();
+        let entry = slots.get(key).filter(|x| x.is_valid()).map(|e| e.get());
 
 
         if let Some(entry) = entry {
         if let Some(entry) = entry {
             found(entry)
             found(entry)
         } else {
         } else {
             // drop lock
             // drop lock
-            drop(entries);
+            drop(slots);
 
 
             not_found()
             not_found()
         }
         }
@@ -370,8 +371,8 @@ impl Db {
 
 
     /// Updates the entry version of a given key
     /// Updates the entry version of a given key
     pub fn bump_version(&self, key: &Bytes) -> bool {
     pub fn bump_version(&self, key: &Bytes) -> bool {
-        let mut entries = self.entries[self.get_slot(key)].write();
-        entries
+        let mut slots = self.slots[self.get_slot(key)].write();
+        slots
             .get_mut(key)
             .get_mut(key)
             .filter(|x| x.is_valid())
             .filter(|x| x.is_valid())
             .map(|entry| {
             .map(|entry| {
@@ -382,8 +383,8 @@ impl Db {
 
 
     /// Returns the version of a given key
     /// Returns the version of a given key
     pub fn get_version(&self, key: &Bytes) -> u128 {
     pub fn get_version(&self, key: &Bytes) -> u128 {
-        let entries = self.entries[self.get_slot(key)].read();
-        entries
+        let slots = self.slots[self.get_slot(key)].read();
+        slots
             .get(key)
             .get(key)
             .filter(|x| x.is_valid())
             .filter(|x| x.is_valid())
             .map(|entry| entry.version())
             .map(|entry| entry.version())
@@ -392,8 +393,8 @@ impl Db {
 
 
     /// Get a copy of an entry
     /// Get a copy of an entry
     pub fn get(&self, key: &Bytes) -> Value {
     pub fn get(&self, key: &Bytes) -> Value {
-        let entries = self.entries[self.get_slot(key)].read();
-        entries
+        let slots = self.slots[self.get_slot(key)].read();
+        slots
             .get(key)
             .get(key)
             .filter(|x| x.is_valid())
             .filter(|x| x.is_valid())
             .map_or(Value::Null, |x| x.clone_value())
             .map_or(Value::Null, |x| x.clone_value())
@@ -401,8 +402,8 @@ impl Db {
 
 
     /// Get a copy of an entry and modifies the expiration of the key
     /// Get a copy of an entry and modifies the expiration of the key
     pub fn getex(&self, key: &Bytes, expires_in: Option<Duration>, make_persistent: bool) -> Value {
     pub fn getex(&self, key: &Bytes, expires_in: Option<Duration>, make_persistent: bool) -> Value {
-        let mut entries = self.entries[self.get_slot(key)].write();
-        entries
+        let mut slots = self.slots[self.get_slot(key)].write();
+        slots
             .get_mut(key)
             .get_mut(key)
             .filter(|x| x.is_valid())
             .filter(|x| x.is_valid())
             .map(|value| {
             .map(|value| {
@@ -423,8 +424,8 @@ impl Db {
     pub fn get_multi(&self, keys: &[Bytes]) -> Value {
     pub fn get_multi(&self, keys: &[Bytes]) -> Value {
         keys.iter()
         keys.iter()
             .map(|key| {
             .map(|key| {
-                let entries = self.entries[self.get_slot(key)].read();
-                entries
+                let slots = self.slots[self.get_slot(key)].read();
+                slots
                     .get(key)
                     .get(key)
                     .filter(|x| x.is_valid() && x.is_clonable())
                     .filter(|x| x.is_valid() && x.is_clonable())
                     .map_or(Value::Null, |x| x.clone_value())
                     .map_or(Value::Null, |x| x.clone_value())
@@ -435,9 +436,9 @@ impl Db {
 
 
     /// Get a key or set a new value for the given key.
     /// Get a key or set a new value for the given key.
     pub fn getset(&self, key: &Bytes, value: Value) -> Value {
     pub fn getset(&self, key: &Bytes, value: Value) -> Value {
-        let mut entries = self.entries[self.get_slot(key)].write();
+        let mut slots = self.slots[self.get_slot(key)].write();
         self.expirations.lock().remove(key);
         self.expirations.lock().remove(key);
-        entries
+        slots
             .insert(key.clone(), Entry::new(value, None))
             .insert(key.clone(), Entry::new(value, None))
             .filter(|x| x.is_valid())
             .filter(|x| x.is_valid())
             .map_or(Value::Null, |x| x.clone_value())
             .map_or(Value::Null, |x| x.clone_value())
@@ -445,8 +446,8 @@ impl Db {
 
 
     /// Takes an entry from the database.
     /// Takes an entry from the database.
     pub fn getdel(&self, key: &Bytes) -> Value {
     pub fn getdel(&self, key: &Bytes) -> Value {
-        let mut entries = self.entries[self.get_slot(key)].write();
-        entries.remove(key).map_or(Value::Null, |x| {
+        let mut slots = self.slots[self.get_slot(key)].write();
+        slots.remove(key).map_or(Value::Null, |x| {
             self.expirations.lock().remove(key);
             self.expirations.lock().remove(key);
             x.clone_value()
             x.clone_value()
         })
         })
@@ -454,10 +455,10 @@ impl Db {
     ///
     ///
     /// Set a key, value with an optional expiration time
     /// Set a key, value with an optional expiration time
     pub fn append(&self, key: &Bytes, value_to_append: &Bytes) -> Result<Value, Error> {
     pub fn append(&self, key: &Bytes, value_to_append: &Bytes) -> Result<Value, Error> {
-        let mut entries = self.entries[self.get_slot(key)].write();
-        let mut entry = entries.get_mut(key).filter(|x| x.is_valid());
+        let mut slots = self.slots[self.get_slot(key)].write();
+        let mut entry = slots.get_mut(key).filter(|x| x.is_valid());
 
 
-        if let Some(entry) = entries.get_mut(key).filter(|x| x.is_valid()) {
+        if let Some(entry) = slots.get_mut(key).filter(|x| x.is_valid()) {
             match entry.get_mut() {
             match entry.get_mut() {
                 Value::Blob(value) => {
                 Value::Blob(value) => {
                     value.put(value_to_append.as_ref());
                     value.put(value_to_append.as_ref());
@@ -466,7 +467,7 @@ impl Db {
                 _ => Err(Error::WrongType),
                 _ => Err(Error::WrongType),
             }
             }
         } else {
         } else {
-            entries.insert(key.clone(), Entry::new(Value::new(value_to_append), None));
+            slots.insert(key.clone(), Entry::new(Value::new(value_to_append), None));
             Ok(value_to_append.len().into())
             Ok(value_to_append.len().into())
         }
         }
     }
     }
@@ -488,8 +489,8 @@ impl Db {
 
 
         if !override_all {
         if !override_all {
             for key in keys.iter() {
             for key in keys.iter() {
-                let entries = self.entries[self.get_slot(key)].read();
-                if entries.get(key).is_some() {
+                let slots = self.slots[self.get_slot(key)].read();
+                if slots.get(key).is_some() {
                     self.unlock_keys(&keys);
                     self.unlock_keys(&keys);
                     return 0.into();
                     return 0.into();
                 }
                 }
@@ -497,8 +498,8 @@ impl Db {
         }
         }
 
 
         for (i, _) in key_values.iter().enumerate().step_by(2) {
         for (i, _) in key_values.iter().enumerate().step_by(2) {
-            let mut entries = self.entries[self.get_slot(&key_values[i])].write();
-            entries.insert(
+            let mut slots = self.slots[self.get_slot(&key_values[i])].write();
+            slots.insert(
                 key_values[i].clone(),
                 key_values[i].clone(),
                 Entry::new(Value::new(&key_values[i + 1]), None),
                 Entry::new(Value::new(&key_values[i + 1]), None),
             );
             );
@@ -528,9 +529,9 @@ impl Db {
         keep_ttl: bool,
         keep_ttl: bool,
         return_previous: bool,
         return_previous: bool,
     ) -> Value {
     ) -> Value {
-        let mut entries = self.entries[self.get_slot(key)].write();
+        let mut slots = self.slots[self.get_slot(key)].write();
         let expires_at = expires_in.map(|duration| Instant::now() + duration);
         let expires_at = expires_in.map(|duration| Instant::now() + duration);
-        let previous = entries.get(key);
+        let previous = slots.get(key);
 
 
         let expires_at = if keep_ttl {
         let expires_at = if keep_ttl {
             if let Some(previous) = previous {
             if let Some(previous) = previous {
@@ -570,7 +571,7 @@ impl Db {
             self.expirations.lock().remove(key);
             self.expirations.lock().remove(key);
         }
         }
 
 
-        entries.insert(key.clone(), Entry::new(value, expires_at));
+        slots.insert(key.clone(), Entry::new(value, expires_at));
 
 
         if let Some(to_return) = to_return {
         if let Some(to_return) = to_return {
             to_return
             to_return
@@ -583,11 +584,8 @@ impl Db {
 
 
     /// Returns the TTL of a given key
     /// Returns the TTL of a given key
     pub fn ttl(&self, key: &Bytes) -> Option<Option<Instant>> {
     pub fn ttl(&self, key: &Bytes) -> Option<Option<Instant>> {
-        let entries = self.entries[self.get_slot(key)].read();
-        entries
-            .get(key)
-            .filter(|x| x.is_valid())
-            .map(|x| x.get_ttl())
+        let slots = self.slots[self.get_slot(key)].read();
+        slots.get(key).filter(|x| x.is_valid()).map(|x| x.get_ttl())
     }
     }
 
 
     /// Check whether a given key is in the list of keys to be purged or not.
     /// Check whether a given key is in the list of keys to be purged or not.
@@ -614,8 +612,8 @@ impl Db {
 
 
         keys.iter()
         keys.iter()
             .map(|key| {
             .map(|key| {
-                let mut entries = self.entries[self.get_slot(key)].write();
-                if entries.remove(key).is_some() {
+                let mut slots = self.slots[self.get_slot(key)].write();
+                if slots.remove(key).is_some() {
                     trace!("Removed key {:?} due timeout", key);
                     trace!("Removed key {:?} due timeout", key);
                     removed += 1;
                     removed += 1;
                 }
                 }

+ 66 - 0
src/db/pool.rs

@@ -0,0 +1,66 @@
+//! # pool of databases
+//!
+//! Redis and microredis support multiple databases.
+//!
+//! Each database is completely independent from each other. There are a few
+//! commands that allows multiple databases to interact with each other (to move
+//! or copy entires atomically).
+//!
+//! This struct will hold an Arc for each database to share databases between
+//! connections.
+use super::Db;
+use crate::error::Error;
+use std::sync::Arc;
+
+/// Databases
+#[derive(Debug)]
+pub struct Databases {
+    databases: Vec<Arc<Db>>,
+}
+
+impl Databases {
+    /// Creates new pool of databases.
+    ///
+    /// The default database is returned along side the pool
+    pub fn new(databases: usize, number_of_slots: usize) -> (Arc<Db>, Arc<Self>) {
+        let databases = (0..databases)
+            .map(|_| Arc::new(Db::new(number_of_slots)))
+            .collect::<Vec<Arc<Db>>>();
+
+        (databases[0].clone(), Arc::new(Self { databases }))
+    }
+
+    /// Returns a single database or None
+    pub fn get(&self, db: usize) -> Result<Arc<Db>, Error> {
+        self.databases
+            .get(db)
+            .map(|db| db.clone())
+            .ok_or(Error::NotSuchDatabase)
+    }
+}
+
+/// Database iterator
+pub struct DatabasesIterator<'a> {
+    databases: &'a Databases,
+    index: usize,
+}
+
+impl<'a> IntoIterator for &'a Databases {
+    type Item = Arc<Db>;
+    type IntoIter = DatabasesIterator<'a>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        DatabasesIterator {
+            databases: self,
+            index: 0,
+        }
+    }
+}
+
+impl<'a> Iterator for DatabasesIterator<'a> {
+    type Item = Arc<Db>;
+    fn next(&mut self) -> Option<Self::Item> {
+        self.index += 1;
+        self.databases.get(self.index - 1).ok()
+    }
+}

+ 9 - 0
src/dispatcher/mod.rs

@@ -807,6 +807,15 @@ dispatcher! {
             0,
             0,
             false,
             false,
         },
         },
+        select {
+            cmd::client::select,
+            [Flag::Fast Flag::Stale Flag::Loading],
+            2,
+            0,
+            0,
+            0,
+            false,
+        }
     },
     },
     transaction {
     transaction {
         discard {
         discard {

+ 3 - 0
src/error.rs

@@ -32,6 +32,8 @@ pub enum Error {
     NotANumber,
     NotANumber,
     /// The connection is not in a transaction
     /// The connection is not in a transaction
     NotInTx,
     NotInTx,
+    /// The requested database does not exists
+    NotSuchDatabase,
     /// The connection is in a transaction and nested transactions are not supported
     /// The connection is in a transaction and nested transactions are not supported
     NestedTx,
     NestedTx,
     /// Wrong data type
     /// Wrong data type
@@ -59,6 +61,7 @@ impl From<Error> for Value {
             Error::OutOfRange => "index out of range".to_owned(),
             Error::OutOfRange => "index out of range".to_owned(),
             Error::Syntax => "syntax error".to_owned(),
             Error::Syntax => "syntax error".to_owned(),
             Error::NotFound => "no such key".to_owned(),
             Error::NotFound => "no such key".to_owned(),
+            Error::NotSuchDatabase => "DB index is out of range".to_owned(),
             Error::NestedTx => "calls can not be nested".to_owned(),
             Error::NestedTx => "calls can not be nested".to_owned(),
             Error::PubsubOnly(x) => format!("Can't execute '{}': only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context", x),
             Error::PubsubOnly(x) => format!("Can't execute '{}': only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context", x),
             Error::WrongArgument(x, y) => format!(
             Error::WrongArgument(x, y) => format!(

+ 15 - 11
src/server.rs

@@ -4,7 +4,7 @@
 //! metrics.
 //! metrics.
 use crate::{
 use crate::{
     connection::{connections::Connections, ConnectionStatus},
     connection::{connections::Connections, ConnectionStatus},
-    db::Db,
+    db::pool::Databases,
     value::Value,
     value::Value,
 };
 };
 use bytes::{Buf, Bytes, BytesMut};
 use bytes::{Buf, Bytes, BytesMut};
@@ -110,17 +110,21 @@ pub async fn serve(addr: String) -> Result<(), Box<dyn Error>> {
     let listener = TcpListener::bind(&addr).await?;
     let listener = TcpListener::bind(&addr).await?;
     info!("Listening on: {}", addr);
     info!("Listening on: {}", addr);
 
 
-    let db = Arc::new(Db::new(1000));
-    let all_connections = Arc::new(Connections::new(db.clone()));
+    let (default_db, all_dbs) = Databases::new(16, 1000);
+    let all_connections = Arc::new(Connections::new(all_dbs.clone()));
     let all_connections_for_metrics = all_connections.clone();
     let all_connections_for_metrics = all_connections.clone();
 
 
-    let db_for_purging = db.clone();
-    tokio::spawn(async move {
-        loop {
-            db_for_purging.purge();
-            sleep(Duration::from_millis(5000)).await;
-        }
-    });
+    all_dbs
+        .into_iter()
+        .map(|db_for_purging| {
+            tokio::spawn(async move {
+                loop {
+                    db_for_purging.purge();
+                    sleep(Duration::from_millis(5000)).await;
+                }
+            });
+        })
+        .for_each(drop);
 
 
     tokio::spawn(async move {
     tokio::spawn(async move {
         server_metrics(all_connections_for_metrics.clone()).await;
         server_metrics(all_connections_for_metrics.clone()).await;
@@ -130,7 +134,7 @@ pub async fn serve(addr: String) -> Result<(), Box<dyn Error>> {
         match listener.accept().await {
         match listener.accept().await {
             Ok((socket, addr)) => {
             Ok((socket, addr)) => {
                 let all_connections = all_connections.clone();
                 let all_connections = all_connections.clone();
-                let (mut pubsub, conn) = all_connections.new_connection(db.clone(), addr);
+                let (mut pubsub, conn) = all_connections.new_connection(default_db.clone(), addr);
 
 
                 tokio::spawn(async move {
                 tokio::spawn(async move {
                     let mut transport = Framed::new(socket, RedisParser);
                     let mut transport = Framed::new(socket, RedisParser);