Ver código fonte

Added Connection struct

This connection will have an Arc's reference to the shared Databases and
all settings related to the connection.
Cesar Rodas 3 anos atrás
pai
commit
fec2a234a2
6 arquivos alterados com 43 adições e 23 exclusões
  1. 11 11
      src/cmd/string.rs
  2. 17 0
      src/connection.rs
  3. 3 2
      src/db/mod.rs
  4. 3 3
      src/dispatcher.rs
  5. 3 3
      src/macros.rs
  6. 6 4
      src/main.rs

+ 11 - 11
src/cmd/string.rs

@@ -1,22 +1,22 @@
-use crate::{db::Db, error::Error, value::Value};
+use crate::{connection::Connection, error::Error, value::Value};
 use bytes::Bytes;
 
-pub fn incr(db: &Db, args: &[Bytes]) -> Result<Value, Error> {
-    db.incr(&args[1], 1)
+pub fn incr(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    conn.db().incr(&args[1], 1)
 }
 
-pub fn decr(db: &Db, args: &[Bytes]) -> Result<Value, Error> {
-    db.incr(&args[1], -1)
+pub fn decr(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    conn.db().incr(&args[1], -1)
 }
 
-pub fn get(db: &Db, args: &[Bytes]) -> Result<Value, Error> {
-    db.get(&args[1])
+pub fn get(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    conn.db().get(&args[1])
 }
 
-pub fn set(db: &Db, args: &[Bytes]) -> Result<Value, Error> {
-    db.set(&args[1], &Value::Blob(args[2].to_owned()))
+pub fn set(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    conn.db().set(&args[1], &Value::Blob(args[2].to_owned()))
 }
 
-pub fn getset(db: &Db, args: &[Bytes]) -> Result<Value, Error> {
-    db.getset(&args[1], &Value::Blob(args[2].to_owned()))
+pub fn getset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    conn.db().getset(&args[1], &Value::Blob(args[2].to_owned()))
 }

+ 17 - 0
src/connection.rs

@@ -0,0 +1,17 @@
+use crate::db::Db;
+use std::sync::Arc;
+
+pub struct Connection {
+    db: Arc<Db>,
+    current_db: i32,
+}
+
+impl Connection {
+    pub fn new(db: Arc<Db>) -> Self {
+        Self { db, current_db: 0 }
+    }
+
+    pub fn db(&self) -> &Db {
+        &self.db
+    }
+}

+ 3 - 2
src/db/mod.rs

@@ -1,6 +1,6 @@
 use crate::{error::Error, value::Value};
-use log::trace;
 use bytes::Bytes;
+use log::trace;
 use seahash::hash;
 use std::collections::{BTreeMap, HashMap};
 use std::convert::TryInto;
@@ -29,6 +29,7 @@ impl Db {
         }
     }
 
+    #[inline]
     fn get_slot(&self, key: &Bytes) -> usize {
         let id = (hash(key) as usize) % self.entries.len();
         trace!("selected slot {} for key {:?}", id, key);
@@ -50,7 +51,7 @@ impl Db {
             None => {
                 entries.insert(key.clone(), "1".into());
                 Ok((1 as i64).into())
-            },
+            }
         }
     }
 

+ 3 - 3
src/dispatcher.rs

@@ -1,10 +1,10 @@
-use crate::{cmd, db::Db, dispatcher, error::Error, value::Value};
+use crate::{cmd, connection::Connection, dispatcher, error::Error, value::Value};
 use bytes::Bytes;
 use std::convert::TryInto;
 use std::time::SystemTime;
 use std::time::UNIX_EPOCH;
 
-fn do_time(_db: &Db, _args: &[Bytes]) -> Result<Value, Error> {
+fn do_time(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
     let now = SystemTime::now();
     let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
     let seconds = format!("{}", since_the_epoch.as_secs());
@@ -13,7 +13,7 @@ fn do_time(_db: &Db, _args: &[Bytes]) -> Result<Value, Error> {
     Ok(vec![seconds.as_str(), millis.as_str()].into())
 }
 
-fn do_command(_db: &Db, _args: &[Bytes]) -> Result<Value, Error> {
+fn do_command(_conn: &Connection, _args: &[Bytes]) -> Result<Value, Error> {
     let now = SystemTime::now();
     let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
     let in_ms: i128 =

+ 3 - 3
src/macros.rs

@@ -27,8 +27,8 @@ macro_rules! dispatcher {
                 }
 
                 impl ExecutableCommand for Command {
-                    fn execute(&self, db: &Db, args: &[Bytes]) -> Result<Value, Error> {
-                        $handler(db, args)
+                    fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+                        $handler(conn, args)
                     }
 
                     fn check_number_args(&self, n: usize) -> bool {
@@ -49,7 +49,7 @@ macro_rules! dispatcher {
         use std::ops::Deref;
 
         pub trait ExecutableCommand {
-            fn execute(&self, db: &Db, args: &[Bytes]) -> Result<Value, Error>;
+            fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error>;
 
             fn check_number_args(&self, n: usize) -> bool;
 

+ 6 - 4
src/main.rs

@@ -1,4 +1,5 @@
 mod cmd;
+mod connection;
 mod db;
 mod dispatcher;
 mod error;
@@ -6,10 +7,11 @@ mod macros;
 mod value;
 
 use bytes::{Buf, Bytes, BytesMut};
+use connection::Connection;
 use dispatcher::Dispatcher;
 use futures::SinkExt;
-use redis_zero_protocol_parser::{parse_server, Error as RedisError};
 use log::info;
+use redis_zero_protocol_parser::{parse_server, Error as RedisError};
 use std::env;
 use std::error::Error;
 use std::ops::Deref;
@@ -27,7 +29,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
 
     env_logger::init();
 
-
     let listener = TcpListener::bind(&addr).await?;
     info!("Listening on: {}", addr);
 
@@ -36,7 +37,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
     loop {
         match listener.accept().await {
             Ok((socket, _)) => {
-                let db = db.clone();
+                let conn = Connection::new(db.clone());
+
                 tokio::spawn(async move {
                     let mut transport = Framed::new(socket, RedisParser);
 
@@ -46,7 +48,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
                                 Ok(handler) => {
                                     let r = handler
                                         .deref()
-                                        .execute(&db, &args)
+                                        .execute(&conn, &args)
                                         .unwrap_or_else(|x| x.into());
                                     transport.send(r).await;
                                 }