Selaa lähdekoodia

Optimized dispatcher (#15)

On each parsed command send by clients a new dispatcher object would
have been created to be thrown away right when the command has been
executed. This was a bit suboptimal, although easier to implement.

This MR creates a single dispatcher object and will return a reference
to the handler function based on the command to be executed. Having a
single object, shared for all connections is far more efficient *and*
also required to gather metrics.
César D. Rodas 3 vuotta sitten
vanhempi
säilyke
5ea4149739
5 muutettua tiedostoa jossa 31 lisäystä ja 29 poistoa
  1. 3 3
      src/cmd/mod.rs
  2. 3 3
      src/cmd/transaction.rs
  3. 7 1
      src/connection/connections.rs
  4. 16 20
      src/macros.rs
  5. 2 2
      src/server.rs

+ 3 - 3
src/cmd/mod.rs

@@ -19,7 +19,6 @@ mod test {
     use bytes::Bytes;
     use std::{
         net::{IpAddr, Ipv4Addr, SocketAddr},
-        ops::Deref,
         sync::Arc,
     };
     use tokio::sync::mpsc::UnboundedReceiver;
@@ -55,8 +54,9 @@ mod test {
     pub async fn run_command(conn: &Connection, cmd: &[&str]) -> Result<Value, Error> {
         let args: Vec<Bytes> = cmd.iter().map(|s| Bytes::from(s.to_string())).collect();
 
-        let handler = Dispatcher::new(&args)?;
+        let dispatcher = Dispatcher::new();
+        let handler = dispatcher.get_handler(&args)?;
 
-        handler.deref().execute(conn, &args).await
+        handler.execute(conn, &args).await
     }
 }

+ 3 - 3
src/cmd/transaction.rs

@@ -1,6 +1,5 @@
 use crate::{
     connection::{Connection, ConnectionStatus},
-    dispatcher::Dispatcher,
     error::Error,
     value::Value,
 };
@@ -33,7 +32,7 @@ pub async fn exec(conn: &Connection, _: &[Bytes]) -> Result<Value, Error> {
 
     if let Some(commands) = conn.get_queue_commands() {
         for args in commands.iter() {
-            let result = match Dispatcher::new(args) {
+            let result = match conn.all_connections().get_dispatcher().get_handler(args) {
                 Ok(handler) => handler
                     .execute(conn, args)
                     .await
@@ -195,7 +194,8 @@ mod test {
 
     fn get_keys(args: &[&str]) -> Vec<Bytes> {
         let args: Vec<Bytes> = args.iter().map(|s| Bytes::from(s.to_string())).collect();
-        if let Ok(cmd) = Dispatcher::new(&args) {
+        let d = Dispatcher::new();
+        if let Ok(cmd) = d.get_handler(&args) {
             cmd.get_keys(&args).iter().map(|k| (*k).clone()).collect()
         } else {
             vec![]

+ 7 - 1
src/connection/connections.rs

@@ -1,5 +1,5 @@
 use super::{pubsub_connection::PubsubClient, pubsub_server::Pubsub, Connection, ConnectionInfo};
-use crate::{db::Db, value::Value};
+use crate::{db::Db, dispatcher::Dispatcher, value::Value};
 use parking_lot::RwLock;
 use std::{collections::BTreeMap, net::SocketAddr, sync::Arc};
 
@@ -10,6 +10,7 @@ pub struct Connections {
     connections: RwLock<BTreeMap<u128, Arc<Connection>>>,
     db: Arc<Db>,
     pubsub: Arc<Pubsub>,
+    dispatcher: Arc<Dispatcher>,
     counter: RwLock<u128>,
 }
 
@@ -19,6 +20,7 @@ impl Connections {
             counter: RwLock::new(0),
             db,
             pubsub: Arc::new(Pubsub::new()),
+            dispatcher: Arc::new(Dispatcher::new()),
             connections: RwLock::new(BTreeMap::new()),
         }
     }
@@ -28,6 +30,10 @@ impl Connections {
         self.db.clone()
     }
 
+    pub fn get_dispatcher(&self) -> Arc<Dispatcher> {
+        self.dispatcher.clone()
+    }
+
     pub fn pubsub(&self) -> Arc<Pubsub> {
         self.pubsub.clone()
     }

+ 16 - 20
src/macros.rs

@@ -19,6 +19,7 @@ macro_rules! dispatcher {
                 use super::*;
                 use async_trait::async_trait;
 
+                #[derive(Debug)]
                 pub struct Command {
                     pub tags: &'static [&'static str],
                     pub min_args: i32,
@@ -104,7 +105,6 @@ macro_rules! dispatcher {
         )+)+
 
         use async_trait::async_trait;
-        use std::ops::Deref;
 
         #[async_trait]
         pub trait ExecutableCommand {
@@ -124,22 +124,30 @@ macro_rules! dispatcher {
         }
 
         #[allow(non_snake_case, non_camel_case_types)]
-        pub enum Dispatcher {
+        #[derive(Debug)]
+        pub struct Dispatcher {
             $($(
-                $command($command::Command),
+                $command: $command::Command,
             )+)+
         }
 
         impl Dispatcher {
-            pub fn new(args: &[Bytes]) -> Result<Self, Error> {
+            pub fn new() -> Self {
+                Self {
+                    $($(
+                        $command: $command::Command::new(),
+                    )+)+
+                }
+            }
+            pub fn get_handler(&self, args: &[Bytes]) -> Result<&(dyn ExecutableCommand + Send + Sync + 'static), Error> {
                 let command = String::from_utf8_lossy(&args[0]).to_lowercase();
 
-                let command = match command.as_str() {
+                let command: &(dyn ExecutableCommand + Send + Sync + 'static) = match command.as_str() {
                 $($(
-                    stringify!($command) => Ok(Self::$command($command::Command::new())),
+                    stringify!($command) => &self.$command,
                 )+)+
-                    _ => Err(Error::CommandNotFound(command.into())),
-                }?;
+                    _ => return Err(Error::CommandNotFound(command.into())),
+                };
 
                 if ! command.check_number_args(args.len()) {
                     Err(Error::InvalidArgsCount(command.name().into()))
@@ -148,18 +156,6 @@ macro_rules! dispatcher {
                 }
             }
         }
-
-        impl Deref for Dispatcher {
-            type Target = dyn ExecutableCommand + Sync + Send;
-
-            fn deref(&self) -> &(dyn ExecutableCommand + Sync + Send + 'static) {
-                match self {
-                    $($(
-                        Self::$command(v) => v as &(dyn ExecutableCommand + Sync + Send),
-                    )+)+
-                }
-            }
-        }
     }
 }
 

+ 2 - 2
src/server.rs

@@ -1,7 +1,6 @@
 use crate::{
     connection::{connections::Connections, ConnectionStatus},
     db::Db,
-    dispatcher::Dispatcher,
     value::Value,
 };
 use bytes::{Buf, Bytes, BytesMut};
@@ -69,6 +68,7 @@ pub async fn serve(addr: String) -> Result<(), Box<dyn Error>> {
     loop {
         match listener.accept().await {
             Ok((socket, addr)) => {
+                let all_connections = all_connections.clone();
                 let (mut pubsub, conn) = all_connections.new_connection(db.clone(), addr);
 
                 tokio::spawn(async move {
@@ -84,7 +84,7 @@ pub async fn serve(addr: String) -> Result<(), Box<dyn Error>> {
                                 }
                             }
                             result = transport.next() => match result {
-                            Some(Ok(args)) => match Dispatcher::new(&args) {
+                            Some(Ok(args)) => match all_connections.get_dispatcher().get_handler(&args) {
                                 Ok(handler) => {
                                     match handler
                                         .execute(&conn, &args)