Ver código fonte

Added client name/client id

Cesar Rodas 3 anos atrás
pai
commit
d5cd3c18b1
9 arquivos alterados com 165 adições e 28 exclusões
  1. 45 0
      src/cmd/client.rs
  2. 1 0
      src/cmd/mod.rs
  3. 58 6
      src/connection.rs
  4. 1 1
      src/db/mod.rs
  5. 19 4
      src/dispatcher.rs
  6. 5 0
      src/error.rs
  7. 13 2
      src/macros.rs
  8. 16 15
      src/main.rs
  9. 7 0
      src/value.rs

+ 45 - 0
src/cmd/client.rs

@@ -0,0 +1,45 @@
+use crate::{connection::Connection, error::Error, option, value::Value};
+use bytes::Bytes;
+
+pub fn client(conn: &mut Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let sub = unsafe { std::str::from_utf8_unchecked(&args[1]) }.to_string();
+
+    let expected = match sub.to_lowercase().as_str() {
+        "setname" => 3,
+        _ => 2,
+    };
+
+    if args.len() != expected {
+        return Err(Error::WrongArgument(
+            "client".to_owned(),
+            sub.to_uppercase(),
+        ));
+    }
+
+    match sub.to_lowercase().as_str() {
+        "id" => Ok((conn.id() as i64).into()),
+        "info" => Ok(conn.info().as_str().into()),
+        "getname" => Ok(option!(conn.name().to_owned())),
+        "setname" => {
+            let name = unsafe { std::str::from_utf8_unchecked(&args[2]) }.to_string();
+            conn.set_name(name);
+            Ok(Value::OK)
+        }
+        _ => Err(Error::WrongArgument(
+            "client".to_owned(),
+            sub.to_uppercase(),
+        )),
+    }
+}
+
+pub fn echo(_conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    Ok(Value::Blob(args[1].to_owned()))
+}
+
+pub fn ping(_conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    match args.len() {
+        1 => Ok(Value::String("PONG".to_owned())),
+        2 => Ok(Value::Blob(args[1].to_owned())),
+        _ => Err(Error::InvalidArgsCount("ping".to_owned())),
+    }
+}

+ 1 - 0
src/cmd/mod.rs

@@ -1 +1,2 @@
+pub mod client;
 pub mod string;

+ 58 - 6
src/connection.rs

@@ -1,17 +1,69 @@
 use crate::db::Db;
-use std::sync::Arc;
+use std::collections::BTreeMap;
+use std::net::SocketAddr;
+use std::sync::{Arc, Mutex};
+
+pub struct Connections {
+    connections: BTreeMap<u128, Arc<Mutex<Connection>>>,
+    counter: u128,
+}
+
+impl Connections {
+    pub fn new() -> Self {
+        Self {
+            counter: 0,
+            connections: BTreeMap::new(),
+        }
+    }
+
+    pub fn new_connection(&mut self, db: Arc<Db>, addr: SocketAddr) -> Arc<Mutex<Connection>> {
+        let id = self.counter;
+        let conn = Arc::new(Mutex::new(Connection {
+            id,
+            db,
+            addr,
+            current_db: 0,
+            name: None,
+        }));
+        self.counter += 1;
+        self.connections.insert(id, conn.clone());
+        conn
+    }
+}
 
 pub struct Connection {
+    id: u128,
     db: Arc<Db>,
-    current_db: i32,
+    current_db: u32,
+    addr: SocketAddr,
+    name: Option<String>,
 }
 
 impl Connection {
-    pub fn new(db: Arc<Db>) -> Self {
-        Self { db, current_db: 0 }
-    }
-
     pub fn db(&self) -> &Db {
         &self.db
     }
+
+    pub fn id(&self) -> u128 {
+        self.id
+    }
+
+    pub fn name(&self) -> &Option<String> {
+        &self.name
+    }
+
+    pub fn set_name(&mut self, name: String) {
+        self.name = Some(name);
+    }
+
+    pub fn current_db(&self) -> u32 {
+        self.current_db
+    }
+
+    pub fn info(&self) -> String {
+        format!(
+            "id={} addr={} name={:?} db={}\r\n",
+            self.id, self.addr, self.name, self.current_db
+        )
+    }
 }

+ 1 - 1
src/db/mod.rs

@@ -50,7 +50,7 @@ impl Db {
             }
             None => {
                 entries.insert(key.clone(), "1".into());
-                Ok((1 as i64).into())
+                Ok((1_i64).into())
             }
         }
     }

+ 19 - 4
src/dispatcher.rs

@@ -27,21 +27,31 @@ dispatcher! {
         ["random" "loading" "stale"],
         1,
     },
-    incr {
-        cmd::string::incr,
-        ["write" "denyoom" "fast"],
-        2,
+    client {
+        cmd::client::client,
+        ["random" "loading" "stale"],
+        -2,
     },
     decr {
         cmd::string::decr,
         ["write" "denyoom" "fast"],
         2,
     },
+    echo {
+        cmd::client::echo,
+        ["random" "loading" "stale"],
+        2,
+    },
     get {
         cmd::string::get,
         ["random" "loading" "stale"],
         2,
     },
+    incr {
+        cmd::string::incr,
+        ["write" "denyoom" "fast"],
+        2,
+    },
     set {
         cmd::string::set,
         ["random" "loading" "stale"],
@@ -52,6 +62,11 @@ dispatcher! {
         ["random" "loading" "stale"],
         -3,
     },
+    ping {
+        cmd::client::ping,
+        ["random" "loading" "stale"],
+        -1,
+    },
     time {
         do_time,
         ["random" "loading" "stale"],

+ 5 - 0
src/error.rs

@@ -4,6 +4,7 @@ pub enum Error {
     CommandNotFound(String),
     InvalidArgsCount(String),
     ProtocolError(String, String),
+    WrongArgument(String, String),
     NotANumber,
     WrongType,
 }
@@ -20,6 +21,10 @@ impl From<Error> for Value {
             Error::InvalidArgsCount(x) => format!("wrong number of arguments for '{}' command", x),
             Error::ProtocolError(x, y) => format!("Protocol error: expected '{}', got '{}'", x, y),
             Error::NotANumber => "value is not an integer or out of range".to_string(),
+            Error::WrongArgument(x, y) => format!(
+                "Unknown subcommand or wrong number of arguments for '{}'. Try {} HELP.",
+                y, x
+            ),
             Error::WrongType => {
                 "Operation against a key holding the wrong kind of value".to_string()
             }

+ 13 - 2
src/macros.rs

@@ -27,7 +27,7 @@ macro_rules! dispatcher {
                 }
 
                 impl ExecutableCommand for Command {
-                    fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+                    fn execute(&self, conn: &mut Connection, args: &[Bytes]) -> Result<Value, Error> {
                         $handler(conn, args)
                     }
 
@@ -49,7 +49,7 @@ macro_rules! dispatcher {
         use std::ops::Deref;
 
         pub trait ExecutableCommand {
-            fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error>;
+            fn execute(&self, conn: &mut Connection, args: &[Bytes]) -> Result<Value, Error>;
 
             fn check_number_args(&self, n: usize) -> bool;
 
@@ -119,3 +119,14 @@ macro_rules! value_vec_try_from {
         }
     }
 }
+
+#[macro_export]
+macro_rules! option {
+    {$type: expr} => {
+        if let Some(val) = $type {
+            val.into()
+        } else {
+            Value::Null
+        }
+    }
+}

+ 16 - 15
src/main.rs

@@ -7,15 +7,12 @@ mod macros;
 mod value;
 
 use bytes::{Buf, Bytes, BytesMut};
-use connection::Connection;
+use connection::Connections;
 use dispatcher::Dispatcher;
 use futures::SinkExt;
-use log::info;
+use log::{info, trace, warn};
 use redis_zero_protocol_parser::{parse_server, Error as RedisError};
-use std::env;
-use std::error::Error;
-use std::ops::Deref;
-use std::{io, sync::Arc};
+use std::{env, error::Error, io, ops::Deref, sync::Arc};
 use tokio::net::TcpListener;
 use tokio_stream::StreamExt;
 use tokio_util::codec::{Decoder, Encoder, Framed};
@@ -33,32 +30,38 @@ async fn main() -> Result<(), Box<dyn Error>> {
     info!("Listening on: {}", addr);
 
     let db = Arc::new(db::Db::new(1000));
+    let mut all_connections = Connections::new();
 
     loop {
         match listener.accept().await {
-            Ok((socket, _)) => {
-                let conn = Connection::new(db.clone());
+            Ok((socket, addr)) => {
+                let conn = all_connections.new_connection(db.clone(), addr);
 
                 tokio::spawn(async move {
                     let mut transport = Framed::new(socket, RedisParser);
 
+                    trace!("New connection {}", conn.lock().unwrap().id());
+
                     while let Some(result) = transport.next().await {
                         match result {
                             Ok(args) => match Dispatcher::new(&args) {
                                 Ok(handler) => {
                                     let r = handler
                                         .deref()
-                                        .execute(&conn, &args)
+                                        .execute(&mut conn.lock().unwrap(), &args)
                                         .unwrap_or_else(|x| x.into());
-                                    transport.send(r).await;
+                                    if transport.send(r).await.is_err() {
+                                        break;
+                                    }
                                 }
                                 Err(err) => {
-                                    let err: Value = err.into();
-                                    transport.send(err).await;
+                                    if transport.send(err.into()).await.is_err() {
+                                        break;
+                                    }
                                 }
                             },
                             Err(e) => {
-                                println!("error on decoding from socket; error = {:?}", e);
+                                warn!("error on decoding from socket; error = {:?}", e);
                                 break;
                             }
                         }
@@ -68,8 +71,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
             Err(e) => println!("error accepting socket; error = {:?}", e),
         }
     }
-
-    Ok(())
 }
 
 struct RedisParser;

+ 7 - 0
src/value.rs

@@ -39,6 +39,7 @@ impl From<&Value> for Vec<u8> {
                 s.to_vec()
             }
             Value::Err(x, y) => format!("-{} {}\r\n", x, y).into(),
+            Value::String(x) => format!("+{}\r\n", x).into(),
             Value::OK => "+OK\r\n".into(),
             _ => b"*-1\r\n".to_vec(),
         }
@@ -96,3 +97,9 @@ impl From<&str> for Value {
 }
 
 value_vec_try_from!(&str);
+
+impl From<String> for Value {
+    fn from(value: String) -> Value {
+        value.as_str().into()
+    }
+}