Browse Source

First server working

Cesar Rodas 3 years ago
parent
commit
660239748f
6 changed files with 241 additions and 7 deletions
  1. 15 0
      Cargo.toml
  2. 9 0
      src/db/mod.rs
  3. 35 0
      src/dispatcher.rs
  4. 105 0
      src/macros.rs
  5. 30 5
      src/main.rs
  6. 47 2
      src/value.rs

+ 15 - 0
Cargo.toml

@@ -0,0 +1,15 @@
+[package]
+name = "microredis"
+version = "0.1.0"
+authors = ["Cesar Rodas <cesar@rodasm.com.py>"]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+redis-zero-parser = {path = "parser"}
+tokio={version="1", features = ["full", "tracing"] }
+tokio-util={version="^0.6", features = ["full"] }
+futures = { version = "0.3.0", features = ["thread-pool"]}
+tokio-stream="0.1"
+bytes = "1"

+ 9 - 0
src/db/mod.rs

@@ -0,0 +1,9 @@
+use crate::value::Value;
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc, RwLock};
+use tokio::time::Instant;
+
+pub struct Db {
+    entries: Arc<RwLock<HashMap<String, Value>>>,
+    expiration: Arc<RwLock<BTreeMap<(Instant, u64), String>>>,
+}

+ 35 - 0
src/dispatcher.rs

@@ -0,0 +1,35 @@
+use crate::{dispatcher, value::Value};
+use std::time::SystemTime;
+use std::time::UNIX_EPOCH;
+
+fn do_time(_args: &[Value]) -> Result<Value, String> {
+    let now = SystemTime::now();
+    let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
+    let seconds = format!("{}", since_the_epoch.as_secs());
+    let millis = format!("{}", since_the_epoch.subsec_millis());
+
+    Ok(vec![seconds.as_str(), millis.as_str()].into())
+}
+
+fn do_command(_args: &[Value]) -> Result<Value, String> {
+    let now = SystemTime::now();
+    let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
+    let in_ms: i128 =
+        since_the_epoch.as_secs() as i128 * 1000 + since_the_epoch.subsec_millis() as i128;
+    Ok(format!("{}", in_ms).as_str().into())
+}
+
+dispatcher! {
+    command  {
+        do_command,
+        ["random" "loading" "stale"],
+    },
+    get {
+        do_command,
+        ["random" "loading" "stale"],
+    },
+    time {
+        do_time,
+        ["random" "loading" "stale"],
+    },
+}

+ 105 - 0
src/macros.rs

@@ -0,0 +1,105 @@
+#[macro_export]
+macro_rules! dispatcher {
+    {
+        $($command:ident {
+            $handler:ident,
+            [$($tag:tt)+],
+        },)+$(,)?
+    }=>  {
+        $(
+            #[allow(non_snake_case, non_camel_case_types)]
+            pub mod $command {
+                use super::*;
+
+                pub struct Command {
+                    pub tags: &'static [&'static str],
+                }
+
+                impl Command {
+                    pub fn new() -> Self {
+                        Self {
+                            tags: &[$($tag,)+],
+                        }
+                    }
+                }
+
+                impl ExecutableCommand for Command {
+                    fn execute(&self, args: &[Value]) -> Result<Value, String> {
+                        $handler(args)
+                    }
+
+                    fn name(&self) -> &'static str {
+                        stringify!($command)
+                    }
+                }
+            }
+        )+
+        use std::ops::Deref;
+
+        pub trait ExecutableCommand {
+            fn execute(&self, args: &[Value]) -> Result<Value, String>;
+
+            fn name(&self) -> &'static str;
+        }
+
+        #[allow(non_snake_case, non_camel_case_types)]
+        pub enum Dispatcher {
+            $(
+                $command($command::Command),
+            )+
+        }
+
+        impl Dispatcher {
+            pub fn new(command: &Value) -> Result<Self, String> {
+                let command = match command {
+                    Value::String(x) => Ok(x.as_str()),
+                    Value::Blob(x) => Ok(unsafe { std::str::from_utf8_unchecked(&x) }),
+                    _ => Err("Invalid type"),
+                }?;
+
+                match command.to_lowercase().as_str() {
+                $(
+                    stringify!($command) => Ok(Self::$command($command::Command::new())),
+                )+
+                    _ => Err(format!("Command ({}) not found", command)),
+                }
+            }
+        }
+
+        impl Deref for Dispatcher {
+            type Target = dyn ExecutableCommand + Sync + Send;
+
+            fn deref(&self) -> &(dyn ExecutableCommand + Sync + Send + 'static) {
+                match self {
+                    $(
+                        Self::$command(v) => v as &(dyn ExecutableCommand + Sync + Send),
+                    )+
+                }
+            }
+        }
+    }
+}
+
+#[macro_export]
+macro_rules! value_try_from {
+    {$type: ty, $value: expr} => {
+        impl From<$type> for Value {
+            fn from(value: $type) -> Value {
+                $value(value.into())
+            }
+        }
+
+        value_vec_try_from!($type);
+    }
+}
+
+#[macro_export]
+macro_rules! value_vec_try_from {
+    {$type: ty} => {
+        impl From<Vec<$type>> for Value {
+            fn from(value: Vec<$type>) -> Value {
+                Value::Array(value.iter().map(|x| (*x).into()).collect())
+            }
+        }
+    }
+}

+ 30 - 5
src/main.rs

@@ -1,17 +1,23 @@
+mod dispatcher;
+mod db;
+mod macros;
 mod value;
 
 use bytes::{Buf, BytesMut};
+use dispatcher::Dispatcher;
+use futures::SinkExt;
 use redis_zero_parser::{parse, Error as RedisError};
 use std::convert::TryFrom;
 use std::env;
 use std::error::Error;
+use std::ops::Deref;
 use std::{
     io,
     sync::{Arc, Mutex},
 };
 use tokio::net::{TcpListener, TcpStream};
 use tokio_stream::StreamExt;
-use tokio_util::codec::{Decoder, Framed};
+use tokio_util::codec::{Decoder, Encoder, Framed};
 use value::Value;
 
 #[tokio::main]
@@ -27,12 +33,21 @@ async fn main() -> Result<(), Box<dyn Error>> {
         match listener.accept().await {
             Ok((socket, _)) => {
                 tokio::spawn(async move {
-                    let mut lines = Framed::new(socket, RedisParser);
+                    let mut transport = Framed::new(socket, RedisParser);
 
-                    while let Some(result) = lines.next().await {
+                    while let Some(result) = transport.next().await {
                         match result {
-                            Ok(line) => {
-                                println!("x => ({:?})", line);
+                            Ok(Value::Array(args)) => match Dispatcher::new(&args[0]) {
+                                Ok(handler) => {
+                                    let r = handler.deref().execute(&args);
+                                    transport.send(r.unwrap()).await;
+                                }
+                                Err(err) => {
+                                    println!("invalid command {:?}", err);
+                                }
+                            },
+                            Ok(x) => {
+                                println!("Invalid message {:?}", x);
                             }
                             Err(e) => {
                                 println!("error on decoding from socket; error = {:?}", e);
@@ -51,6 +66,16 @@ async fn main() -> Result<(), Box<dyn Error>> {
 
 struct RedisParser;
 
+impl Encoder<Value> for RedisParser {
+    type Error = io::Error;
+
+    fn encode(&mut self, response: Value, dst: &mut BytesMut) -> io::Result<()> {
+        let v: Vec<u8> = response.into();
+        dst.extend_from_slice(&v);
+        Ok(())
+    }
+}
+
 impl Decoder for RedisParser {
     type Item = Value;
     type Error = io::Error;

+ 47 - 2
src/value.rs

@@ -1,10 +1,12 @@
+use crate::{value_try_from, value_vec_try_from};
 use redis_zero_parser::Value as ParsedValue;
+use bytes::{Bytes, BytesMut};
 use std::convert::TryFrom;
 
 #[derive(Debug, PartialEq, Clone)]
 pub enum Value {
     Array(Vec<Value>),
-    Blob(Vec<u8>),
+    Blob(Bytes),
     String(String),
     Err(String, String),
     Integer(i64),
@@ -14,13 +16,45 @@ pub enum Value {
     Null,
 }
 
+impl From<&Value> for Vec<u8> {
+    fn from(value: &Value) -> Vec<u8> {
+        match value {
+            Value::Null => b"*-1\r\n".to_vec(),
+            Value::Array(x) => {
+                let mut s: Vec<u8> = format!("*{}\r\n", x.len()).into();
+                for i in x.iter() {
+                    let b: Vec<u8> = i.into();
+                    s.extend(b);
+                }
+                s.to_vec()
+            }
+            Value::Integer(x) => format!(":{}\r\n", x).into(),
+            Value::BigInteger(x) => format!("({}\r\n", x).into(),
+            Value::Blob(x) => {
+                let s = format!("${}\r\n", x.len());
+                let mut s: BytesMut = s.as_str().as_bytes().into();
+                s.extend_from_slice(&x);
+                s.extend_from_slice(b"\r\n");
+                s.to_vec()
+            }
+            _ => b"*-1\r\n".to_vec(),
+        }
+    }
+}
+
+impl From<Value> for Vec<u8> {
+    fn from(value: Value) -> Vec<u8> {
+        (&value).into()
+    }
+}
+
 impl<'a> TryFrom<&ParsedValue<'a>> for Value {
     type Error = &'static str;
 
     fn try_from(value: &ParsedValue) -> Result<Self, Self::Error> {
         Ok(match value {
             ParsedValue::String(x) => Self::String(x.to_string()),
-            ParsedValue::Blob(x) => Self::Blob(x.to_vec()),
+            ParsedValue::Blob(x) => Self::Blob(Bytes::copy_from_slice(*x)),
             ParsedValue::Array(x) => {
                 Self::Array(x.iter().map(|x| Value::try_from(x).unwrap()).collect())
             }
@@ -33,3 +67,14 @@ impl<'a> TryFrom<&ParsedValue<'a>> for Value {
         })
     }
 }
+
+value_try_from!(i64, Value::Integer);
+value_try_from!(i128, Value::BigInteger);
+
+impl From<&str> for Value {
+    fn from(value: &str) -> Value {
+        Value::Blob(Bytes::copy_from_slice(value.as_bytes()))
+    }
+}
+
+value_vec_try_from!(&str);