Browse Source

BLMOVE / BRPOPLPUSH (#51)

* Add support for BLMOVE / BRPOPLPUSH

* Experiment with a better blocked version of the commands

The blocked versions must return right away to the main loop to detect
sudden disconnections. On disconnections the blocked command must be
stopped.

The current implementation is naive and it will never release unless the
client disconnects gracefully.

* Make BLMOVE atomic

Lock both keys to alter. Also fixed a bug when the key is already
blocked by the same connection.

* Add block tests

* Fix unit tests

Unit tests were broke after the block workers updates

* Run unit tests on the pipeline
César D. Rodas 2 years ago
parent
commit
47a2d1aab9
10 changed files with 615 additions and 116 deletions
  1. 4 1
      Makefile
  2. 254 89
      src/cmd/list.rs
  3. 6 3
      src/cmd/server.rs
  4. 14 0
      src/connection/connections.rs
  5. 26 1
      src/connection/mod.rs
  6. 2 0
      src/db/mod.rs
  7. 18 0
      src/dispatcher/mod.rs
  8. 46 20
      src/server.rs
  9. 3 2
      src/value/mod.rs
  10. 242 0
      tests/unit/type/list.tcl

+ 4 - 1
Makefile

@@ -17,6 +17,9 @@ test-single: build
 		--tags -consistency \
 		--tags -cli \
 		--tags -needs:config-maxmemory
+unit-test:
+	cargo test --release
+
 test: build
 	./runtest  --clients 5 \
 		--skipunit unit/dump \
@@ -53,4 +56,4 @@ test: build
 		--tags -consistency \
 		--tags -cli \
 		--tags -needs:config-maxmemory
-ci: fmt clippy build test
+ci: fmt clippy build unit-test test

+ 254 - 89
src/cmd/list.rs

@@ -14,6 +14,8 @@ use std::collections::VecDeque;
 use tokio::time::{sleep, Duration, Instant};
 
 #[allow(clippy::needless_range_loop)]
+/// Removes an element from a list
+#[inline]
 fn remove_element(
     conn: &Connection,
     key: &Bytes,
@@ -70,49 +72,29 @@ fn remove_element(
     Ok(result)
 }
 
-/// BLPOP is a blocking list pop primitive. It is the blocking version of LPOP because it blocks
-/// the connection when there are no elements to pop from any of the given lists. An element is
-/// popped from the head of the first list that is non-empty, with the given keys being checked in
-/// the order that they are given.
-pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    let timeout = parse_timeout(&args[args.len() - 1])?;
-    let len = args.len() - 1;
-
-    conn.block();
-
-    loop {
-        for key in args[1..len].iter() {
-            match remove_element(conn, key, None, true)? {
-                Value::Null => (),
-                n => return Ok(vec![Value::new(&key), n].into()),
-            };
-        }
-
-        if let Some(timeout) = timeout {
-            if Instant::now() >= timeout {
-                conn.unblock(UnblockReason::Timeout);
-                break;
-            }
-        }
-
-        if conn.status() == ConnectionStatus::ExecutingTx {
+#[inline]
+/// Handles the timeout/sleep logic for all blocking commands.
+async fn handle_timeout(conn: &Connection, timeout: Option<Instant>) -> Result<bool, Error> {
+    if let Some(timeout) = timeout {
+        if Instant::now() >= timeout {
             conn.unblock(UnblockReason::Timeout);
-            break;
+            return Ok(true);
         }
+    }
 
-        if let Some(reason) = conn.is_unblocked() {
-            return match reason {
-                UnblockReason::Error => Err(Error::UnblockByError),
-                UnblockReason::Timeout => Ok(Value::Null),
-            };
+    if let Some(reason) = conn.has_been_unblocked_externally() {
+        match reason {
+            UnblockReason::Error => Err(Error::UnblockByError),
+            _ => Ok(true),
         }
-
-        sleep(Duration::from_millis(100)).await;
+    } else {
+        sleep(Duration::from_millis(5)).await;
+        Ok(false)
     }
-
-    Ok(Value::Null)
 }
 
+/// Parses timeout and returns an instant or none if it should wait forever.
+#[inline]
 fn parse_timeout(arg: &Bytes) -> Result<Option<Instant>, Error> {
     let raw_timeout = bytes_to_number::<f64>(arg)?;
     if raw_timeout < 0f64 {
@@ -125,51 +107,202 @@ fn parse_timeout(arg: &Bytes) -> Result<Option<Instant>, Error> {
 
     Ok(Some(
         Instant::now()
-            .checked_add(Duration::from_micros((raw_timeout * 1000f64).round() as u64))
+            .checked_add(Duration::from_millis(
+                (raw_timeout * 1_000f64).round() as u64
+            ))
             .unwrap_or_else(far_future),
     ))
 }
 
-/// BRPOP is a blocking list pop primitive. It is the blocking version of RPOP because it blocks
+/// BLPOP is a blocking list pop primitive. It is the blocking version of LPOP because it blocks
 /// the connection when there are no elements to pop from any of the given lists. An element is
-/// popped from the tail of the first list that is non-empty, with the given keys being checked in
+/// popped from the head of the first list that is non-empty, with the given keys being checked in
 /// the order that they are given.
-pub async fn brpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    let timeout = parse_timeout(&args[args.len() - 1])?;
-    let len = args.len() - 1;
-
-    conn.block();
-
-    loop {
-        for key in args[1..len].iter() {
-            match remove_element(conn, key, None, false)? {
+pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let blpop_task = |conn: &Connection, args: &[Bytes]| -> Result<Value, Error> {
+        for key in (1..args.len() - 1) {
+            let key = &args[key];
+            match remove_element(&conn, key, None, true)? {
                 Value::Null => (),
                 n => return Ok(vec![Value::new(&key), n].into()),
             };
         }
+        Ok(Value::Null)
+    };
+
+    if conn.is_executing_tx() {
+        return blpop_task(conn, args);
+    }
+
+    let timeout = parse_timeout(&args[args.len() - 1])?;
+    let conn = conn.clone();
+    let args = args.to_vec();
 
-        if let Some(timeout) = timeout {
-            if Instant::now() >= timeout {
-                conn.unblock(UnblockReason::Timeout);
-                break;
+    conn.block();
+
+    tokio::spawn(async move {
+        loop {
+            match blpop_task(&conn, &args) {
+                Ok(Value::Null) => {}
+                Ok(x) => {
+                    conn.append_response(x);
+                    conn.unblock(UnblockReason::Finished);
+                    break;
+                }
+                Err(x) => {
+                    conn.append_response(x.into());
+                    conn.unblock(UnblockReason::Finished);
+                    break;
+                }
+            }
+
+            match handle_timeout(&conn, timeout).await {
+                Ok(true) => {
+                    conn.append_response(Value::Null);
+                    break;
+                }
+                Err(x) => {
+                    conn.append_response(x.into());
+                    break;
+                }
+                _ => {}
             }
         }
-        if conn.status() == ConnectionStatus::ExecutingTx {
-            conn.unblock(UnblockReason::Timeout);
-            break;
-        }
+    });
+
+    Ok(Value::Ignore)
+}
+
+/// BLMOVE is the blocking variant of LMOVE. When source contains elements, this
+/// command behaves exactly like LMOVE. When used inside a MULTI/EXEC block,
+/// this command behaves exactly like LMOVE. When source is empty, Redis will
+/// block the connection until another client pushes to it or until timeout (a
+/// double value specifying the maximum number of seconds to block) is reached.
+/// A timeout of zero can be used to block indefinitely.
+///
+/// This command comes in place of the now deprecated BRPOPLPUSH. Doing BLMOVE
+/// RIGHT LEFT is equivalent.
+///
+/// See LMOVE for more information.
+pub async fn blmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    if conn.is_executing_tx() {
+        return lmove(&conn, &args).await;
+    }
 
-        if let Some(reason) = conn.is_unblocked() {
-            return match reason {
-                UnblockReason::Error => Err(Error::UnblockByError),
-                UnblockReason::Timeout => Ok(Value::Null),
+    let timeout = parse_timeout(&args[5])?;
+    conn.block();
+
+    let conn = conn.clone();
+    let args = args.to_vec();
+    tokio::spawn(async move {
+        loop {
+            match lmove(&conn, &args).await {
+                Ok(Value::Null) => (),
+                Ok(n) => {
+                    conn.append_response(n);
+                    conn.unblock(UnblockReason::Finished);
+                    break;
+                }
+                Err(x) => {
+                    conn.append_response(x.into());
+                    conn.unblock(UnblockReason::Finished);
+                    break;
+                }
             };
+            match handle_timeout(&conn, timeout).await {
+                Ok(true) => {
+                    conn.append_response(Value::Null);
+                    break;
+                }
+                Err(x) => {
+                    conn.append_response(x.into());
+                    break;
+                }
+                _ => {}
+            }
         }
+    });
+
+    Ok(Value::Ignore)
+}
+
+/// BRPOPLPUSH is the blocking variant of RPOPLPUSH. When source contains
+/// elements, this command behaves exactly like RPOPLPUSH. When used inside a
+/// MULTI/EXEC block, this command behaves exactly like RPOPLPUSH. When source
+/// is empty, Redis will block the connection until another client pushes to it
+/// or until timeout is reached. A timeout of zero can be used to block
+/// indefinitely.
+pub async fn brpoplpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    blmove(
+        conn,
+        &[
+            "blmove".into(),
+            args[1].clone(),
+            args[2].clone(),
+            "RIGHT".into(),
+            "LEFT".into(),
+            args[3].clone(),
+        ],
+    )
+    .await
+}
 
-        sleep(Duration::from_millis(100)).await;
+/// BRPOP is a blocking list pop primitive. It is the blocking version of RPOP because it blocks
+/// the connection when there are no elements to pop from any of the given lists. An element is
+/// popped from the tail of the first list that is non-empty, with the given keys being checked in
+/// the order that they are given.
+pub async fn brpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let brpop_task = |conn: &Connection, args: &[Bytes]| -> Result<Value, Error> {
+        for key in (1..args.len() - 1) {
+            let key = &args[key];
+            match remove_element(&conn, key, None, false)? {
+                Value::Null => (),
+                n => return Ok(vec![Value::new(&key), n].into()),
+            };
+        }
+        Ok(Value::Null)
+    };
+    if conn.is_executing_tx() {
+        return brpop_task(conn, args);
     }
 
-    Ok(Value::Null)
+    let timeout = parse_timeout(&args[args.len() - 1])?;
+    let conn = conn.clone();
+    let args = args.to_vec();
+
+    conn.block();
+
+    tokio::spawn(async move {
+        loop {
+            match brpop_task(&conn, &args) {
+                Ok(Value::Null) => {}
+                Ok(x) => {
+                    conn.append_response(x);
+                    conn.unblock(UnblockReason::Finished);
+                    break;
+                }
+                Err(x) => {
+                    conn.append_response(x.into());
+                    conn.unblock(UnblockReason::Finished);
+                    break;
+                }
+            }
+
+            match handle_timeout(&conn, timeout).await {
+                Ok(true) => {
+                    conn.append_response(Value::Null);
+                    break;
+                }
+                Err(x) => {
+                    conn.append_response(x.into());
+                    break;
+                }
+                _ => {}
+            }
+        }
+    });
+
+    Ok(Value::Ignore)
 }
 
 /// Returns the element at index index in the list stored at key. The index is zero-based, so 0
@@ -286,7 +419,12 @@ pub async fn lmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
         return Err(Error::Syntax);
     };
 
-    let result = conn.db().get_map_or(
+    let db = conn.db();
+
+    /// Lock keys to alter exclusively
+    db.lock_keys(&args[1..=2]);
+
+    let result = db.get_map_or(
         &args[1],
         |v| match v {
             Value::List(source) => conn.db().get_map_or(
@@ -314,10 +452,11 @@ pub async fn lmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
                     _ => Err(Error::WrongType),
                 },
                 || {
+                    let mut source = source.write();
                     let element = if source_is_left {
-                        source.write().pop_front()
+                        source.pop_front()
                     } else {
-                        source.write().pop_back()
+                        source.pop_back()
                     };
 
                     if let Some(element) = element {
@@ -334,11 +473,17 @@ pub async fn lmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             _ => Err(Error::WrongType),
         },
         || Ok(Value::Null),
-    )?;
+    );
 
-    conn.db().bump_version(&args[1]);
+    /// release the lock on keys
+    db.unlock_keys(&args[1..=2]);
 
-    Ok(result)
+    if result != Ok(Value::Null) {
+        conn.db().bump_version(&args[1]);
+        conn.db().bump_version(&args[2]);
+    }
+
+    result
 }
 
 /// Removes and returns the first elements of the list stored at key.
@@ -772,7 +917,7 @@ pub async fn rpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 #[cfg(test)]
 mod test {
     use crate::{
-        cmd::test::{create_connection, run_command},
+        cmd::test::{create_connection, create_connection_and_pubsub, run_command},
         error::Error,
         value::Value,
     };
@@ -780,7 +925,7 @@ mod test {
 
     #[tokio::test]
     async fn blpop_no_waiting() {
-        let c = create_connection();
+        let (mut recv, c) = create_connection_and_pubsub();
 
         assert_eq!(
             Ok(Value::Integer(5)),
@@ -788,30 +933,37 @@ mod test {
         );
 
         assert_eq!(
-            Ok(Value::Array(vec![
+            Ok(Value::Ignore),
+            run_command(&c, &["blpop", "foo", "1"]).await
+        );
+
+        assert_eq!(
+            Some(Value::Array(vec![
                 Value::Blob("foo".into()),
                 Value::Blob("5".into()),
             ])),
-            run_command(&c, &["blpop", "foo", "1"]).await
+            recv.recv().await
         );
     }
 
     #[tokio::test]
     async fn blpop_timeout() {
-        let c = create_connection();
+        let (mut recv, c) = create_connection_and_pubsub();
         let x = Instant::now();
 
         assert_eq!(
-            Ok(Value::Null),
+            Ok(Value::Ignore),
             run_command(&c, &["blpop", "foobar", "1"]).await
         );
 
-        assert!(Instant::now() - x <= Duration::from_millis(1000));
+        assert_eq!(Some(Value::Null), recv.recv().await,);
+
+        assert!(Instant::now() - x >= Duration::from_millis(1000));
     }
 
     #[tokio::test]
     async fn blpop_wait_insert() {
-        let c = create_connection();
+        let (mut recv, c) = create_connection_and_pubsub();
         let x = Instant::now();
 
         // Query command that will block connection until some data is inserted
@@ -819,7 +971,10 @@ mod test {
         //
         // We are issuing the command, sleeping a little bit then adding data to
         // foobar, before actually waiting on the result.
-        let waiting = run_command(&c, &["blpop", "foobar", "foo", "bar", "5"]);
+        assert_eq!(
+            Ok(Value::Ignore),
+            run_command(&c, &["blpop", "foobar", "foo", "bar", "5"]).await
+        );
 
         // Sleep 1 second before inserting new data
         sleep(Duration::from_millis(1000)).await;
@@ -831,11 +986,11 @@ mod test {
 
         // Read the output of the first blpop command now.
         assert_eq!(
-            Ok(Value::Array(vec![
+            Some(Value::Array(vec![
                 Value::Blob("foo".into()),
                 Value::Blob("5".into()),
             ])),
-            waiting.await
+            recv.recv().await,
         );
 
         assert!(Instant::now() - x > Duration::from_millis(1000));
@@ -960,7 +1115,7 @@ mod test {
 
     #[tokio::test]
     async fn brpop_no_waiting() {
-        let c = create_connection();
+        let (mut recv, c) = create_connection_and_pubsub();
 
         assert_eq!(
             Ok(Value::Integer(5)),
@@ -968,30 +1123,37 @@ mod test {
         );
 
         assert_eq!(
-            Ok(Value::Array(vec![
+            Ok(Value::Ignore),
+            run_command(&c, &["brpop", "foo", "1"]).await
+        );
+
+        assert_eq!(
+            Some(Value::Array(vec![
                 Value::Blob("foo".into()),
                 Value::Blob("5".into()),
             ])),
-            run_command(&c, &["brpop", "foo", "1"]).await
+            recv.recv().await,
         );
     }
 
     #[tokio::test]
     async fn brpop_timeout() {
-        let c = create_connection();
+        let (mut recv, c) = create_connection_and_pubsub();
         let x = Instant::now();
 
         assert_eq!(
-            Ok(Value::Null),
+            Ok(Value::Ignore),
             run_command(&c, &["brpop", "foobar", "1"]).await
         );
 
-        assert!(Instant::now() - x < Duration::from_millis(1000));
+        assert_eq!(Some(Value::Null), recv.recv().await,);
+
+        assert!(Instant::now() - x >= Duration::from_millis(1000));
     }
 
     #[tokio::test]
     async fn brpop_wait_insert() {
-        let c = create_connection();
+        let (mut recv, c) = create_connection_and_pubsub();
         let x = Instant::now();
 
         // Query command that will block connection until some data is inserted
@@ -999,7 +1161,10 @@ mod test {
         //
         // We are issuing the command, sleeping a little bit then adding data to
         // foobar, before actually waiting on the result.
-        let waiting = run_command(&c, &["brpop", "foobar", "foo", "bar", "5"]);
+        assert_eq!(
+            Ok(Value::Ignore),
+            run_command(&c, &["brpop", "foobar", "foo", "bar", "5"]).await
+        );
 
         // Sleep 1 second before inserting new data
         sleep(Duration::from_millis(1000)).await;
@@ -1011,11 +1176,11 @@ mod test {
 
         // Read the output of the first blpop command now.
         assert_eq!(
-            Ok(Value::Array(vec![
+            Some(Value::Array(vec![
                 Value::Blob("foo".into()),
                 Value::Blob("5".into()),
             ])),
-            waiting.await
+            recv.recv().await,
         );
 
         assert!(Instant::now() - x > Duration::from_millis(1000));

+ 6 - 3
src/cmd/server.rs

@@ -79,12 +79,15 @@ pub async fn debug(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 
 /// The INFO command returns information and statistics about the server in a
 /// format that is simple to parse by computers and easy to read by humans.
-pub async fn info(_: &Connection, _: &[Bytes]) -> Result<Value, Error> {
+pub async fn info(conn: &Connection, _: &[Bytes]) -> Result<Value, Error> {
+    let connections = conn.all_connections();
     Ok(Value::Blob(
         format!(
-            "redis_version: {}\r\nredis_git_sha1:{}\r\n",
+            "redis_version: {}\r\nredis_git_sha1:{}\r\n\r\nconnected_clients:{}\r\nblocked_clients:{}\r\n",
             git_version!(),
-            git_version!()
+            git_version!(),
+            connections.total_connections(),
+            connections.total_blocked_connections(),
         )
         .as_str()
         .into(),

+ 14 - 0
src/connection/connections.rs

@@ -79,6 +79,20 @@ impl Connections {
         self.connections.read().get(&conn_id).cloned()
     }
 
+    /// Return total number of connections
+    pub fn total_connections(&self) -> usize {
+        self.connections.read().len()
+    }
+
+    /// REturn total number of blocked connections
+    pub fn total_blocked_connections(&self) -> usize {
+        self.connections
+            .read()
+            .iter()
+            .map(|(_, conn)| if conn.is_blocked() { 1 } else { 0 })
+            .sum()
+    }
+
     /// Iterates over all connections
     pub fn iter(&self, f: &mut dyn FnMut(Arc<Connection>)) {
         for (_, value) in self.connections.read().iter() {

+ 26 - 1
src/connection/mod.rs

@@ -38,6 +38,8 @@ pub enum UnblockReason {
     Timeout,
     /// Throw an error
     Error,
+    /// Operation finished successfully
+    Finished,
 }
 
 /// Connection information
@@ -90,6 +92,13 @@ impl Connection {
         self.info.read().db.clone()
     }
 
+    /// Creates a clone connection
+    pub fn clone(&self) -> Arc<Connection> {
+        self.all_connections
+            .get_by_conn_id(self.id)
+            .expect("Connection must be registered")
+    }
+
     /// Returns the global pubsub server
     pub fn pubsub(&self) -> Arc<Pubsub> {
         self.all_connections.pubsub()
@@ -138,11 +147,19 @@ impl Connection {
     }
 
     /// If the current connection has been externally unblocked
-    pub fn is_unblocked(&self) -> Option<UnblockReason> {
+    #[inline]
+    pub fn has_been_unblocked_externally(&self) -> Option<UnblockReason> {
         self.info.read().unblock_reason
     }
 
+    /// Is the current connection blocked?
+    #[inline]
+    pub fn is_blocked(&self) -> bool {
+        self.info.read().is_blocked
+    }
+
     /// Connection ID
+    #[inline]
     pub fn id(&self) -> u128 {
         self.id
     }
@@ -207,10 +224,17 @@ impl Connection {
     }
 
     /// Returns the status of the connection
+    #[inline]
     pub fn status(&self) -> ConnectionStatus {
         self.info.read().status
     }
 
+    /// Is connection executing transaction?
+    #[inline]
+    pub fn is_executing_tx(&self) -> bool {
+        self.info.read().status == ConnectionStatus::ExecutingTx
+    }
+
     /// Watches keys. In a transaction watched keys are a mechanism to discard a transaction if
     /// some value changed since the moment the command was queued until the execution time.
     pub fn watch_key(&self, keys: &[(&Bytes, u128)]) {
@@ -285,6 +309,7 @@ impl Connection {
     /// all_connection lists.
     pub fn destroy(self: Arc<Connection>) {
         let pubsub = self.pubsub();
+        self.clone().unblock(UnblockReason::Timeout);
         pubsub.unsubscribe(&self.pubsub_client.subscriptions(), &self, false);
         pubsub.punsubscribe(&self.pubsub_client.psubscriptions(), &self, false);
         self.all_connections.clone().remove(self);

+ 2 - 0
src/db/mod.rs

@@ -172,6 +172,7 @@ impl Db {
                 if let Some(blocker) = lock.get(key) {
                     if *blocker == self.conn_id {
                         // It is blocked by us already.
+                        i += 1;
                         continue;
                     }
                     // It is blocked by another tx, we need to break
@@ -710,6 +711,7 @@ impl Db {
     }
 
     /// Returns the version of a given key
+    #[inline]
     pub fn get_version(&self, key: &Bytes) -> u128 {
         let slot = self.slots[self.get_slot(key)].read();
         slot.get(key)

+ 18 - 0
src/dispatcher/mod.rs

@@ -196,6 +196,24 @@ dispatcher! {
             1,
             true,
         },
+        BRPOPLPUSH {
+            cmd::list::brpoplpush,
+            [Flag::Write Flag::NoScript],
+            4,
+            1,
+            2,
+            1,
+            true,
+        },
+        BLMOVE {
+            cmd::list::blmove,
+            [Flag::Write Flag::NoScript],
+            6,
+            1,
+            2,
+            1,
+            true,
+        },
         LINDEX {
             cmd::list::lindex,
             [Flag::ReadOnly],

+ 46 - 20
src/server.rs

@@ -6,6 +6,7 @@ use crate::{
     config::Config,
     connection::{connections::Connections, Connection, ConnectionStatus},
     db::{pool::Databases, Db},
+    dispatcher::Dispatcher,
     error::Error,
     value::Value,
 };
@@ -171,6 +172,20 @@ async fn serve_unixsocket(
     Ok(())
 }
 
+#[inline]
+async fn execute_command(
+    conn: &Connection,
+    dispatcher: &Dispatcher,
+    args: &[Bytes],
+) -> Option<Value> {
+    match dispatcher.execute(&conn, args).await {
+        Ok(result) => Some(result),
+        Err(Error::EmptyLine) => Some(Value::Ignore),
+        Err(Error::Quit) => None,
+        Err(err) => Some(err.into()),
+    }
+}
+
 /// Handles a new connection
 ///
 /// The new connection can be created from a new TCP or Unix stream.
@@ -182,37 +197,48 @@ async fn handle_new_connection<T: AsyncReadExt + AsyncWriteExt + Unpin, A: ToStr
     addr: A,
 ) {
     let (mut pubsub, conn) = all_connections.new_connection(default_db, addr);
+    let dispatcher = all_connections.get_dispatcher();
+    // Commands are being buffered when the client is blocked.
+    let mut buffered_commands: Vec<Vec<Bytes>> = vec![];
     trace!("New connection {}", conn.id());
 
     loop {
         tokio::select! {
             Some(msg) = pubsub.recv() => {
+                // Pub-sub message
                 if transport.send(msg).await.is_err() {
                     break;
                 }
+                'outer: for args in buffered_commands.iter() {
+                    // Client sent commands while the connection was blocked,
+                    // now it is time to process them one by one
+                    match execute_command(&conn, &dispatcher, &args).await {
+                        Some(result) => if result != Value::Ignore && transport.send(result).await.is_err() {
+                            break 'outer;
+                        },
+                        None => {
+                            let _ = transport.send(Value::Ok).await;
+                            break 'outer;
+                        }
+                    }
+                }
+                buffered_commands.clear();
             }
             result = transport.next() => match result {
-                Some(Ok(args)) => match all_connections.get_dispatcher().execute(&conn, &args).await {
-                    Ok(result) => {
-                        if result == Value::Ignore {
+                Some(Ok(args)) => {
+                        if conn.is_blocked() {
+                            buffered_commands.push(args.clone());
                             continue;
                         }
-                        if transport.send(result).await.is_err() {
-                            break;
-                        }
-                    },
-                    Err(Error::EmptyLine) => {
-                        // do nothing
-                    },
-                    Err(Error::Quit) => {
-                        let _ = transport.send(Value::Ok).await;
-                        break;
-                    }
-                    Err(err) => {
-                        if transport.send(err.into()).await.is_err() {
-                            break;
-                        }
-                    }
+                        match execute_command(&conn, &dispatcher, &args).await {
+                            Some(result) => if result != Value::Ignore && transport.send(result).await.is_err() {
+                               break;
+                            },
+                            None => {
+                                let _ = transport.send(Value::Ok).await;
+                                break;
+                            }
+                        };
                 },
                 Some(Err(e)) => {
                     warn!("error on decoding from socket; error = {:?}", e);
@@ -227,7 +253,7 @@ async fn handle_new_connection<T: AsyncReadExt + AsyncWriteExt + Unpin, A: ToStr
 
 /// Spawn redis server
 ///
-/// Spawn a redis server. This function will create Conections object, the in-memory database, the
+/// Spawn a redis server. This function will create Connections object, the in-memory database, the
 /// purge process and the TCP server.
 ///
 /// This process is also listening for any incoming message through the internal pub-sub.

+ 3 - 2
src/value/mod.rs

@@ -1,6 +1,6 @@
 //! # Redis Value
 //!
-//! All redis internal data structures and values are absracted in this mod.
+//! All redis internal data structures and values are abstracted in this mod.
 pub mod checksum;
 pub mod cursor;
 pub mod expiration;
@@ -26,7 +26,7 @@ use std::{
 pub enum Value {
     /// Hash. This type cannot be serialized
     Hash(locked::Value<HashMap<Bytes, Bytes>>),
-    /// List. This type cannot be sreialized
+    /// List. This type cannot be serialized
     List(locked::Value<VecDeque<checksum::Value>>),
     /// Set. This type cannot be serialized
     Set(locked::Value<HashSet<Bytes>>),
@@ -126,6 +126,7 @@ impl Value {
 impl From<&Value> for Vec<u8> {
     fn from(value: &Value) -> Vec<u8> {
         match value {
+            Value::Ignore => b"".to_vec(),
             Value::Null => b"*-1\r\n".to_vec(),
             Value::Array(x) => {
                 let mut s: Vec<u8> = format!("*{}\r\n", x.len()).into();

+ 242 - 0
tests/unit/type/list.tcl

@@ -195,6 +195,48 @@ start_server {
             assert_equal 1 [r llen blist2{t}]
         }
 
+        test "BRPOPLPUSH - $type" {
+            r del target{t}
+            r rpush target{t} bar
+
+            set rd [redis_deferring_client]
+            create_list blist{t} "a b $large c d"
+
+            $rd brpoplpush blist{t} target{t} 1
+            assert_equal d [$rd read]
+
+            assert_equal d [r lpop target{t}]
+            assert_equal "a b $large c" [r lrange blist{t} 0 -1]
+        }
+
+        foreach wherefrom {left right} {
+            foreach whereto {left right} {
+                test "BLMOVE $wherefrom $whereto - $type" {
+                    r del target{t}
+                    r rpush target{t} bar
+
+                    set rd [redis_deferring_client]
+                    create_list blist{t} "a b $large c d"
+
+                    $rd blmove blist{t} target{t} $wherefrom $whereto 1
+                    set poppedelement [$rd read]
+
+                    if {$wherefrom eq "right"} {
+                        assert_equal d $poppedelement
+                        assert_equal "a b $large c" [r lrange blist{t} 0 -1]
+                    } else {
+                        assert_equal a $poppedelement
+                        assert_equal "b $large c d" [r lrange blist{t} 0 -1]
+                    }
+
+                    if {$whereto eq "right"} {
+                        assert_equal $poppedelement [r rpop target{t}]
+                    } else {
+                        assert_equal $poppedelement [r lpop target{t}]
+                    }
+                }
+            }
+        }
     }
 
     test "BLPOP, LPUSH + DEL should not awake blocked client" {
@@ -273,6 +315,206 @@ start_server {
         assert_equal foo [lindex [r lrange blist 0 -1] 0]
     }
 
+    test "BRPOPLPUSH with zero timeout should block indefinitely" {
+        set rd [redis_deferring_client]
+        r del blist{t} target{t}
+        r rpush target{t} bar
+        $rd brpoplpush blist{t} target{t} 0
+        wait_for_blocked_clients_count 1
+        r rpush blist{t} foo
+        assert_equal foo [$rd read]
+        assert_equal {foo bar} [r lrange target{t} 0 -1]
+    }
+
+    foreach wherefrom {left right} {
+        foreach whereto {left right} {
+            test "BLMOVE $wherefrom $whereto with zero timeout should block indefinitely" {
+                set rd [redis_deferring_client]
+                r del blist{t} target{t}
+                r rpush target{t} bar
+                $rd blmove blist{t} target{t} $wherefrom $whereto 0
+                wait_for_blocked_clients_count 1
+                r rpush blist{t} foo
+                assert_equal foo [$rd read]
+                if {$whereto eq "right"} {
+                    assert_equal {bar foo} [r lrange target{t} 0 -1]
+                } else {
+                    assert_equal {foo bar} [r lrange target{t} 0 -1]
+                }
+            }
+        }
+    }
+
+    foreach wherefrom {left right} {
+        foreach whereto {left right} {
+            test "BLMOVE ($wherefrom, $whereto) with a client BLPOPing the target list" {
+                set rd [redis_deferring_client]
+                set rd2 [redis_deferring_client]
+                r del blist{t} target{t}
+                $rd2 blpop target{t} 0
+                $rd blmove blist{t} target{t} $wherefrom $whereto 0
+                wait_for_blocked_clients_count 2
+                r rpush blist{t} foo
+                assert_equal foo [$rd read]
+                assert_equal {target{t} foo} [$rd2 read]
+                assert_equal 0 [r exists target{t}]
+            }
+        }
+    }
+
+    test "BRPOPLPUSH with wrong source type" {
+        set rd [redis_deferring_client]
+        r del blist{t} target{t}
+        r set blist{t} nolist
+        $rd brpoplpush blist{t} target{t} 1
+        assert_error "WRONGTYPE*" {$rd read}
+    }
+
+    test "BRPOPLPUSH with wrong destination type" {
+        set rd [redis_deferring_client]
+        r del blist{t} target{t}
+        r set target{t} nolist
+        r lpush blist{t} foo
+        $rd brpoplpush blist{t} target{t} 1
+        assert_error "WRONGTYPE*" {$rd read}
+
+        set rd [redis_deferring_client]
+        r del blist{t} target{t}
+        r set target{t} nolist
+        $rd brpoplpush blist{t} target{t} 0
+        wait_for_blocked_clients_count 1
+        r rpush blist{t} foo
+        assert_error "WRONGTYPE*" {$rd read}
+        assert_equal {foo} [r lrange blist{t} 0 -1]
+    }
+
+    test "BRPOPLPUSH maintains order of elements after failure" {
+        set rd [redis_deferring_client]
+        r del blist{t} target{t}
+        r set target{t} nolist
+        $rd brpoplpush blist{t} target{t} 0
+        r rpush blist{t} a b c
+        assert_error "WRONGTYPE*" {$rd read}
+        r lrange blist{t} 0 -1
+    } {a b c}
+
+    test "BRPOPLPUSH with multiple blocked clients" {
+        set rd1 [redis_deferring_client]
+        set rd2 [redis_deferring_client]
+        r del blist{t} target1{t} target2{t}
+        r set target1{t} nolist
+        $rd1 brpoplpush blist{t} target1{t} 0
+        $rd2 brpoplpush blist{t} target2{t} 0
+        r lpush blist{t} foo
+
+        assert_error "WRONGTYPE*" {$rd1 read}
+        assert_equal {foo} [$rd2 read]
+        assert_equal {foo} [r lrange target2{t} 0 -1]
+    }
+
+    test "Linked LMOVEs" {
+      set rd1 [redis_deferring_client]
+      set rd2 [redis_deferring_client]
+
+      r del list1{t} list2{t} list3{t}
+
+      $rd1 blmove list1{t} list2{t} right left 0
+      $rd2 blmove list2{t} list3{t} left right 0
+
+      r rpush list1{t} foo
+
+      after 50
+
+      assert_equal {} [r lrange list1{t} 0 -1]
+      assert_equal {} [r lrange list2{t} 0 -1]
+      assert_equal {foo} [r lrange list3{t} 0 -1]
+    }
+
+    test "Circular BRPOPLPUSH" {
+      set rd1 [redis_deferring_client]
+      set rd2 [redis_deferring_client]
+
+      r del list1{t} list2{t}
+
+      $rd1 brpoplpush list1{t} list2{t} 0
+      $rd2 brpoplpush list2{t} list1{t} 0
+
+      r rpush list1{t} foo
+
+      assert_equal {foo} [r lrange list1{t} 0 -1]
+      assert_equal {} [r lrange list2{t} 0 -1]
+    }
+
+    test "Self-referential BRPOPLPUSH" {
+      set rd [redis_deferring_client]
+
+      r del blist{t}
+
+      $rd brpoplpush blist{t} blist{t} 0
+
+      r rpush blist{t} foo
+
+      assert_equal {foo} [r lrange blist{t} 0 -1]
+    }
+
+    test "BRPOPLPUSH inside a transaction" {
+        r del xlist{t} target{t}
+        r lpush xlist{t} foo
+        r lpush xlist{t} bar
+
+        r multi
+        r brpoplpush xlist{t} target{t} 0
+        r brpoplpush xlist{t} target{t} 0
+        r brpoplpush xlist{t} target{t} 0
+        r lrange xlist{t} 0 -1
+        r lrange target{t} 0 -1
+        r exec
+    } {foo bar {} {} {bar foo}}
+
+    test "PUSH resulting from BRPOPLPUSH affect WATCH" {
+        set blocked_client [redis_deferring_client]
+        set watching_client [redis_deferring_client]
+        r del srclist{t} dstlist{t} somekey{t}
+        r set somekey{t} somevalue
+        $blocked_client brpoplpush srclist{t} dstlist{t} 0
+        $watching_client watch dstlist{t}
+        $watching_client read
+        $watching_client multi
+        $watching_client read
+        $watching_client get somekey{t}
+        $watching_client read
+        r lpush srclist{t} element
+        after 50
+        $watching_client exec
+        $watching_client read
+    } {}
+
+    test "BRPOPLPUSH does not affect WATCH while still blocked" {
+        set blocked_client [redis_deferring_client]
+        set watching_client [redis_deferring_client]
+        r del srclist{t} dstlist{t} somekey{t}
+        r set somekey{t} somevalue
+        $blocked_client brpoplpush srclist{t} dstlist{t} 0
+        $watching_client watch dstlist{t}
+        $watching_client read
+        $watching_client multi
+        $watching_client read
+        $watching_client get somekey{t}
+        $watching_client read
+        $watching_client exec
+        # Blocked BLPOPLPUSH may create problems, unblock it.
+        r lpush srclist{t} element
+        $watching_client read
+    } {somevalue}
+
+    test {BRPOPLPUSH timeout} {
+      set rd [redis_deferring_client]
+
+      $rd brpoplpush foo_list{t} bar_list{t} 1
+      wait_for_blocked_clients_count 1
+      wait_for_blocked_clients_count 0 500 10
+      $rd read
+    } {}
 
     test "BLPOP when new key is moved into place" {
         set rd [redis_deferring_client]