Ver código fonte

Experiment with a better blocked version of the commands

The blocked versions must return right away to the main loop to detect
sudden disconnections. On disconnections the blocked command must be
stopped.

The current implementation is naive and it will never release unless the
client disconnects gracefully.
Cesar Rodas 2 anos atrás
pai
commit
842aa90fe1
3 arquivos alterados com 193 adições e 62 exclusões
  1. 128 42
      src/cmd/list.rs
  2. 19 0
      src/connection/mod.rs
  3. 46 20
      src/server.rs

+ 128 - 42
src/cmd/list.rs

@@ -15,6 +15,7 @@ use tokio::time::{sleep, Duration, Instant};
 
 #[allow(clippy::needless_range_loop)]
 /// Removes an element from a list
+#[inline]
 fn remove_element(
     conn: &Connection,
     key: &Bytes,
@@ -80,20 +81,13 @@ async fn handle_timeout(conn: &Connection, timeout: Option<Instant>) -> Result<b
             return Ok(true);
         }
     }
-    if conn.status() == ConnectionStatus::ExecutingTx {
-        conn.unblock(UnblockReason::Timeout);
-        return Ok(true);
-    }
 
     if let Some(reason) = conn.has_been_unblocked_externally() {
         match reason {
             UnblockReason::Error => Err(Error::UnblockByError),
-            UnblockReason::Timeout => Ok(true),
+            _ => Ok(true),
         }
     } else {
-        // Check if the connection is still alive by entering the loop and check if the socket/unixsocket
-        // is still alive
-        conn.append_response(Value::Ignore);
         sleep(Duration::from_millis(100)).await;
         Ok(false)
     }
@@ -113,7 +107,9 @@ fn parse_timeout(arg: &Bytes) -> Result<Option<Instant>, Error> {
 
     Ok(Some(
         Instant::now()
-            .checked_add(Duration::from_millis((raw_timeout * 1000f64).round() as u64))
+            .checked_add(Duration::from_millis(
+                (raw_timeout * 1_000f64).round() as u64
+            ))
             .unwrap_or_else(far_future),
     ))
 }
@@ -123,25 +119,58 @@ fn parse_timeout(arg: &Bytes) -> Result<Option<Instant>, Error> {
 /// popped from the head of the first list that is non-empty, with the given keys being checked in
 /// the order that they are given.
 pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    let timeout = parse_timeout(&args[args.len() - 1])?;
-    let len = args.len() - 1;
-
-    conn.block();
-
-    loop {
-        for key in args[1..len].iter() {
-            match remove_element(conn, key, None, true)? {
+    let blpop_task = |conn: &Connection, args: &[Bytes]| -> Result<Value, Error> {
+        for key in (1..args.len() - 1) {
+            let key = &args[key];
+            match remove_element(&conn, key, None, true)? {
                 Value::Null => (),
                 n => return Ok(vec![Value::new(&key), n].into()),
             };
         }
+        Ok(Value::Null)
+    };
 
-        if handle_timeout(&conn, timeout).await? {
-            break;
-        }
+    if conn.is_executing_tx() {
+        return blpop_task(conn, args);
     }
 
-    Ok(Value::Null)
+    let timeout = parse_timeout(&args[args.len() - 1])?;
+    let conn = conn.clone();
+    let args = args.to_vec();
+
+    conn.block();
+
+    tokio::spawn(async move {
+        loop {
+            match blpop_task(&conn, &args) {
+                Ok(Value::Null) => {}
+                Ok(x) => {
+                    conn.append_response(x);
+                    conn.unblock(UnblockReason::Finished);
+                    break;
+                }
+                Err(x) => {
+                    conn.append_response(x.into());
+                    conn.unblock(UnblockReason::Finished);
+                    break;
+                }
+            }
+
+            match handle_timeout(&conn, timeout).await {
+                Ok(true) => {
+                    conn.append_response(Value::Null);
+                    break;
+                }
+                Err(x) => {
+                    conn.append_response(x.into());
+                    break;
+                }
+                _ => {}
+            }
+        }
+    });
+
+    Ok(Value::Ignore)
 }
 
 /// BLMOVE is the blocking variant of LMOVE. When source contains elements, this
@@ -156,20 +185,45 @@ pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 ///
 /// See LMOVE for more information.
 pub async fn blmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    if conn.is_executing_tx() {
+        return lmove(&conn, &args).await;
+    }
+
     let timeout = parse_timeout(&args[5])?;
     conn.block();
 
-    loop {
-        match lmove(&conn, &args).await? {
-            Value::Null => (),
-            n => return Ok(n),
-        };
-        if handle_timeout(&conn, timeout).await? {
-            break;
+    let conn = conn.clone();
+    let args = args.to_vec();
+    tokio::spawn(async move {
+        loop {
+            match lmove(&conn, &args).await {
+                Ok(Value::Null) => (),
+                Ok(n) => {
+                    conn.append_response(n);
+                    conn.unblock(UnblockReason::Finished);
+                    break;
+                }
+                Err(x) => {
+                    conn.append_response(x.into());
+                    conn.unblock(UnblockReason::Finished);
+                    break;
+                }
+            };
+            match handle_timeout(&conn, timeout).await {
+                Ok(true) => {
+                    conn.append_response(Value::Null);
+                    break;
+                }
+                Err(x) => {
+                    conn.append_response(x.into());
+                    break;
+                }
+                _ => {}
+            }
         }
-    }
+    });
 
-    Ok(Value::Null)
+    Ok(Value::Ignore)
 }
 
 /// BRPOPLPUSH is the blocking variant of RPOPLPUSH. When source contains
@@ -198,25 +252,57 @@ pub async fn brpoplpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Erro
 /// popped from the tail of the first list that is non-empty, with the given keys being checked in
 /// the order that they are given.
 pub async fn brpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    let timeout = parse_timeout(&args[args.len() - 1])?;
-    let len = args.len() - 1;
-
-    conn.block();
-
-    loop {
-        for key in args[1..len].iter() {
-            match remove_element(conn, key, None, false)? {
+    let brpop_task = |conn: &Connection, args: &[Bytes]| -> Result<Value, Error> {
+        for key in (1..args.len() - 1) {
+            let key = &args[key];
+            match remove_element(&conn, key, None, false)? {
                 Value::Null => (),
                 n => return Ok(vec![Value::new(&key), n].into()),
             };
         }
+        Ok(Value::Null)
+    };
+    if conn.is_executing_tx() {
+        return brpop_task(conn, args);
+    }
 
-        if handle_timeout(&conn, timeout).await? {
-            break;
+    let timeout = parse_timeout(&args[args.len() - 1])?;
+    let conn = conn.clone();
+    let args = args.to_vec();
+
+    conn.block();
+
+    tokio::spawn(async move {
+        loop {
+            match brpop_task(&conn, &args) {
+                Ok(Value::Null) => {}
+                Ok(x) => {
+                    conn.append_response(x);
+                    conn.unblock(UnblockReason::Finished);
+                    break;
+                }
+                Err(x) => {
+                    conn.append_response(x.into());
+                    conn.unblock(UnblockReason::Finished);
+                    break;
+                }
+            }
+
+            match handle_timeout(&conn, timeout).await {
+                Ok(true) => {
+                    conn.append_response(Value::Null);
+                    break;
+                }
+                Err(x) => {
+                    conn.append_response(x.into());
+                    break;
+                }
+                _ => {}
+            }
         }
-    }
+    });
 
-    Ok(Value::Null)
+    Ok(Value::Ignore)
 }
 
 /// Returns the element at index index in the list stored at key. The index is zero-based, so 0

+ 19 - 0
src/connection/mod.rs

@@ -38,6 +38,8 @@ pub enum UnblockReason {
     Timeout,
     /// Throw an error
     Error,
+    /// Operation finished successfully
+    Finished,
 }
 
 /// Connection information
@@ -90,6 +92,13 @@ impl Connection {
         self.info.read().db.clone()
     }
 
+    /// Creates a clone connection
+    pub fn clone(&self) -> Arc<Connection> {
+        self.all_connections
+            .get_by_conn_id(self.id)
+            .expect("Connection must be registered")
+    }
+
     /// Returns the global pubsub server
     pub fn pubsub(&self) -> Arc<Pubsub> {
         self.all_connections.pubsub()
@@ -138,16 +147,19 @@ impl Connection {
     }
 
     /// If the current connection has been externally unblocked
+    #[inline]
     pub fn has_been_unblocked_externally(&self) -> Option<UnblockReason> {
         self.info.read().unblock_reason
     }
 
     /// Is the current connection blocked?
+    #[inline]
     pub fn is_blocked(&self) -> bool {
         self.info.read().is_blocked
     }
 
     /// Connection ID
+    #[inline]
     pub fn id(&self) -> u128 {
         self.id
     }
@@ -212,10 +224,17 @@ impl Connection {
     }
 
     /// Returns the status of the connection
+    #[inline]
     pub fn status(&self) -> ConnectionStatus {
         self.info.read().status
     }
 
+    /// Is connection executing transaction?
+    #[inline]
+    pub fn is_executing_tx(&self) -> bool {
+        self.info.read().status == ConnectionStatus::ExecutingTx
+    }
+
     /// Watches keys. In a transaction watched keys are a mechanism to discard a transaction if
     /// some value changed since the moment the command was queued until the execution time.
     pub fn watch_key(&self, keys: &[(&Bytes, u128)]) {

+ 46 - 20
src/server.rs

@@ -6,6 +6,7 @@ use crate::{
     config::Config,
     connection::{connections::Connections, Connection, ConnectionStatus},
     db::{pool::Databases, Db},
+    dispatcher::Dispatcher,
     error::Error,
     value::Value,
 };
@@ -171,6 +172,20 @@ async fn serve_unixsocket(
     Ok(())
 }
 
+#[inline]
+async fn execute_command(
+    conn: &Connection,
+    dispatcher: &Dispatcher,
+    args: &[Bytes],
+) -> Option<Value> {
+    match dispatcher.execute(&conn, args).await {
+        Ok(result) => Some(result),
+        Err(Error::EmptyLine) => Some(Value::Ignore),
+        Err(Error::Quit) => None,
+        Err(err) => Some(err.into()),
+    }
+}
+
 /// Handles a new connection
 ///
 /// The new connection can be created from a new TCP or Unix stream.
@@ -182,37 +197,48 @@ async fn handle_new_connection<T: AsyncReadExt + AsyncWriteExt + Unpin, A: ToStr
     addr: A,
 ) {
     let (mut pubsub, conn) = all_connections.new_connection(default_db, addr);
+    let dispatcher = all_connections.get_dispatcher();
+    // Commands are being buffered when the client is blocked.
+    let mut buffered_commands: Vec<Vec<Bytes>> = vec![];
     trace!("New connection {}", conn.id());
 
     loop {
         tokio::select! {
             Some(msg) = pubsub.recv() => {
+                // Pub-sub message
                 if transport.send(msg).await.is_err() {
                     break;
                 }
+                'outer: for args in buffered_commands.iter() {
+                    // Client sent commands while the connection was blocked,
+                    // now it is time to process them one by one
+                    match execute_command(&conn, &dispatcher, &args).await {
+                        Some(result) => if result != Value::Ignore && transport.send(result).await.is_err() {
+                            break 'outer;
+                        },
+                        None => {
+                            let _ = transport.send(Value::Ok).await;
+                            break 'outer;
+                        }
+                    }
+                }
+                buffered_commands.clear();
             }
             result = transport.next() => match result {
-                Some(Ok(args)) => match all_connections.get_dispatcher().execute(&conn, &args).await {
-                    Ok(result) => {
-                        if result == Value::Ignore {
+                Some(Ok(args)) => {
+                        if conn.is_blocked() {
+                            buffered_commands.push(args.clone());
                             continue;
                         }
-                        if transport.send(result).await.is_err() {
-                            break;
-                        }
-                    },
-                    Err(Error::EmptyLine) => {
-                        // do nothing
-                    },
-                    Err(Error::Quit) => {
-                        let _ = transport.send(Value::Ok).await;
-                        break;
-                    }
-                    Err(err) => {
-                        if transport.send(err.into()).await.is_err() {
-                            break;
-                        }
-                    }
+                        match execute_command(&conn, &dispatcher, &args).await {
+                            Some(result) => if result != Value::Ignore && transport.send(result).await.is_err() {
+                               break;
+                            },
+                            None => {
+                                let _ = transport.send(Value::Ok).await;
+                                break;
+                            }
+                        };
                 },
                 Some(Err(e)) => {
                     warn!("error on decoding from socket; error = {:?}", e);
@@ -227,7 +253,7 @@ async fn handle_new_connection<T: AsyncReadExt + AsyncWriteExt + Unpin, A: ToStr
 
 /// Spawn redis server
 ///
-/// Spawn a redis server. This function will create Conections object, the in-memory database, the
+/// Spawn a redis server. This function will create Connections object, the in-memory database, the
 /// purge process and the TCP server.
 ///
 /// This process is also listening for any incoming message through the internal pub-sub.