Ver código fonte

Add support for BLMOVE / BRPOPLPUSH

Cesar Rodas 2 anos atrás
pai
commit
6cf86542fa
6 arquivos alterados com 142 adições e 53 exclusões
  1. 94 47
      src/cmd/list.rs
  2. 6 3
      src/cmd/server.rs
  3. 14 0
      src/connection/connections.rs
  4. 7 1
      src/connection/mod.rs
  5. 18 0
      src/dispatcher/mod.rs
  6. 3 2
      src/value/mod.rs

+ 94 - 47
src/cmd/list.rs

@@ -14,6 +14,7 @@ use std::collections::VecDeque;
 use tokio::time::{sleep, Duration, Instant};
 
 #[allow(clippy::needless_range_loop)]
+/// Removes an element from a list
 fn remove_element(
     conn: &Connection,
     key: &Bytes,
@@ -70,6 +71,53 @@ fn remove_element(
     Ok(result)
 }
 
+#[inline]
+/// Handles the timeout/sleep logic for all blocking commands.
+async fn handle_timeout(conn: &Connection, timeout: Option<Instant>) -> Result<bool, Error> {
+    if let Some(timeout) = timeout {
+        if Instant::now() >= timeout {
+            conn.unblock(UnblockReason::Timeout);
+            return Ok(true);
+        }
+    }
+    if conn.status() == ConnectionStatus::ExecutingTx {
+        conn.unblock(UnblockReason::Timeout);
+        return Ok(true);
+    }
+
+    if let Some(reason) = conn.has_been_unblocked_externally() {
+        match reason {
+            UnblockReason::Error => Err(Error::UnblockByError),
+            UnblockReason::Timeout => Ok(true),
+        }
+    } else {
+        // Check if the connection is still alive by entering the loop and check if the socket/unixsocket
+        // is still alive
+        conn.append_response(Value::Ignore);
+        sleep(Duration::from_millis(100)).await;
+        Ok(false)
+    }
+}
+
+/// Parses timeout and returns an instant or none if it should wait forever.
+#[inline]
+fn parse_timeout(arg: &Bytes) -> Result<Option<Instant>, Error> {
+    let raw_timeout = bytes_to_number::<f64>(arg)?;
+    if raw_timeout < 0f64 {
+        return Err(Error::NegativeNumber("timeout".to_owned()));
+    }
+
+    if raw_timeout == 0.0 {
+        return Ok(None);
+    }
+
+    Ok(Some(
+        Instant::now()
+            .checked_add(Duration::from_millis((raw_timeout * 1000f64).round() as u64))
+            .unwrap_or_else(far_future),
+    ))
+}
+
 /// BLPOP is a blocking list pop primitive. It is the blocking version of LPOP because it blocks
 /// the connection when there are no elements to pop from any of the given lists. An element is
 /// popped from the head of the first list that is non-empty, with the given keys being checked in
@@ -88,46 +136,61 @@ pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             };
         }
 
-        if let Some(timeout) = timeout {
-            if Instant::now() >= timeout {
-                conn.unblock(UnblockReason::Timeout);
-                break;
-            }
-        }
-
-        if conn.status() == ConnectionStatus::ExecutingTx {
-            conn.unblock(UnblockReason::Timeout);
+        if handle_timeout(&conn, timeout).await? {
             break;
         }
-
-        if let Some(reason) = conn.is_unblocked() {
-            return match reason {
-                UnblockReason::Error => Err(Error::UnblockByError),
-                UnblockReason::Timeout => Ok(Value::Null),
-            };
-        }
-
-        sleep(Duration::from_millis(100)).await;
     }
 
     Ok(Value::Null)
 }
 
-fn parse_timeout(arg: &Bytes) -> Result<Option<Instant>, Error> {
-    let raw_timeout = bytes_to_number::<f64>(arg)?;
-    if raw_timeout < 0f64 {
-        return Err(Error::NegativeNumber("timeout".to_owned()));
-    }
+/// BLMOVE is the blocking variant of LMOVE. When source contains elements, this
+/// command behaves exactly like LMOVE. When used inside a MULTI/EXEC block,
+/// this command behaves exactly like LMOVE. When source is empty, Redis will
+/// block the connection until another client pushes to it or until timeout (a
+/// double value specifying the maximum number of seconds to block) is reached.
+/// A timeout of zero can be used to block indefinitely.
+///
+/// This command comes in place of the now deprecated BRPOPLPUSH. Doing BLMOVE
+/// RIGHT LEFT is equivalent.
+///
+/// See LMOVE for more information.
+pub async fn blmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let timeout = parse_timeout(&args[5])?;
+    conn.block();
 
-    if raw_timeout == 0.0 {
-        return Ok(None);
+    loop {
+        match lmove(&conn, &args).await? {
+            Value::Null => (),
+            n => return Ok(n),
+        };
+        if handle_timeout(&conn, timeout).await? {
+            break;
+        }
     }
 
-    Ok(Some(
-        Instant::now()
-            .checked_add(Duration::from_micros((raw_timeout * 1000f64).round() as u64))
-            .unwrap_or_else(far_future),
-    ))
+    Ok(Value::Null)
+}
+
+/// BRPOPLPUSH is the blocking variant of RPOPLPUSH. When source contains
+/// elements, this command behaves exactly like RPOPLPUSH. When used inside a
+/// MULTI/EXEC block, this command behaves exactly like RPOPLPUSH. When source
+/// is empty, Redis will block the connection until another client pushes to it
+/// or until timeout is reached. A timeout of zero can be used to block
+/// indefinitely.
+pub async fn brpoplpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    blmove(
+        conn,
+        &[
+            "blmove".into(),
+            args[1].clone(),
+            args[2].clone(),
+            "RIGHT".into(),
+            "LEFT".into(),
+            args[3].clone(),
+        ],
+    )
+    .await
 }
 
 /// BRPOP is a blocking list pop primitive. It is the blocking version of RPOP because it blocks
@@ -148,25 +211,9 @@ pub async fn brpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
             };
         }
 
-        if let Some(timeout) = timeout {
-            if Instant::now() >= timeout {
-                conn.unblock(UnblockReason::Timeout);
-                break;
-            }
-        }
-        if conn.status() == ConnectionStatus::ExecutingTx {
-            conn.unblock(UnblockReason::Timeout);
+        if handle_timeout(&conn, timeout).await? {
             break;
         }
-
-        if let Some(reason) = conn.is_unblocked() {
-            return match reason {
-                UnblockReason::Error => Err(Error::UnblockByError),
-                UnblockReason::Timeout => Ok(Value::Null),
-            };
-        }
-
-        sleep(Duration::from_millis(100)).await;
     }
 
     Ok(Value::Null)

+ 6 - 3
src/cmd/server.rs

@@ -79,12 +79,15 @@ pub async fn debug(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 
 /// The INFO command returns information and statistics about the server in a
 /// format that is simple to parse by computers and easy to read by humans.
-pub async fn info(_: &Connection, _: &[Bytes]) -> Result<Value, Error> {
+pub async fn info(conn: &Connection, _: &[Bytes]) -> Result<Value, Error> {
+    let connections = conn.all_connections();
     Ok(Value::Blob(
         format!(
-            "redis_version: {}\r\nredis_git_sha1:{}\r\n",
+            "redis_version: {}\r\nredis_git_sha1:{}\r\n\r\nconnected_clients:{}\r\nblocked_clients:{}\r\n",
             git_version!(),
-            git_version!()
+            git_version!(),
+            connections.total_connections(),
+            connections.total_blocked_connections(),
         )
         .as_str()
         .into(),

+ 14 - 0
src/connection/connections.rs

@@ -79,6 +79,20 @@ impl Connections {
         self.connections.read().get(&conn_id).cloned()
     }
 
+    /// Return total number of connections
+    pub fn total_connections(&self) -> usize {
+        self.connections.read().len()
+    }
+
+    /// REturn total number of blocked connections
+    pub fn total_blocked_connections(&self) -> usize {
+        self.connections
+            .read()
+            .iter()
+            .map(|(_, conn)| if conn.is_blocked() { 1 } else { 0 })
+            .sum()
+    }
+
     /// Iterates over all connections
     pub fn iter(&self, f: &mut dyn FnMut(Arc<Connection>)) {
         for (_, value) in self.connections.read().iter() {

+ 7 - 1
src/connection/mod.rs

@@ -138,10 +138,15 @@ impl Connection {
     }
 
     /// If the current connection has been externally unblocked
-    pub fn is_unblocked(&self) -> Option<UnblockReason> {
+    pub fn has_been_unblocked_externally(&self) -> Option<UnblockReason> {
         self.info.read().unblock_reason
     }
 
+    /// Is the current connection blocked?
+    pub fn is_blocked(&self) -> bool {
+        self.info.read().is_blocked
+    }
+
     /// Connection ID
     pub fn id(&self) -> u128 {
         self.id
@@ -285,6 +290,7 @@ impl Connection {
     /// all_connection lists.
     pub fn destroy(self: Arc<Connection>) {
         let pubsub = self.pubsub();
+        self.clone().unblock(UnblockReason::Timeout);
         pubsub.unsubscribe(&self.pubsub_client.subscriptions(), &self, false);
         pubsub.punsubscribe(&self.pubsub_client.psubscriptions(), &self, false);
         self.all_connections.clone().remove(self);

+ 18 - 0
src/dispatcher/mod.rs

@@ -196,6 +196,24 @@ dispatcher! {
             1,
             true,
         },
+        BRPOPLPUSH {
+            cmd::list::brpoplpush,
+            [Flag::Write Flag::NoScript],
+            4,
+            1,
+            2,
+            1,
+            true,
+        },
+        BLMOVE {
+            cmd::list::blmove,
+            [Flag::Write Flag::NoScript],
+            6,
+            1,
+            2,
+            1,
+            true,
+        },
         LINDEX {
             cmd::list::lindex,
             [Flag::ReadOnly],

+ 3 - 2
src/value/mod.rs

@@ -1,6 +1,6 @@
 //! # Redis Value
 //!
-//! All redis internal data structures and values are absracted in this mod.
+//! All redis internal data structures and values are abstracted in this mod.
 pub mod checksum;
 pub mod cursor;
 pub mod expiration;
@@ -26,7 +26,7 @@ use std::{
 pub enum Value {
     /// Hash. This type cannot be serialized
     Hash(locked::Value<HashMap<Bytes, Bytes>>),
-    /// List. This type cannot be sreialized
+    /// List. This type cannot be serialized
     List(locked::Value<VecDeque<checksum::Value>>),
     /// Set. This type cannot be serialized
     Set(locked::Value<HashSet<Bytes>>),
@@ -126,6 +126,7 @@ impl Value {
 impl From<&Value> for Vec<u8> {
     fn from(value: &Value) -> Vec<u8> {
         match value {
+            Value::Ignore => b"".to_vec(),
             Value::Null => b"*-1\r\n".to_vec(),
             Value::Array(x) => {
                 let mut s: Vec<u8> = format!("*{}\r\n", x.len()).into();