1
0
Эх сурвалжийг харах

Add server parsing

Add special "Server parsing" that will accept only an array of blobs.
Any other data passed would fail right away.

This is to be compatible with redis server. The full parser should still
be used for other purposes, like a future redis-load balancer.

Added also INCR command support and other types conversions.
Cesar Rodas 3 жил өмнө
parent
commit
9d8241fb17

+ 34 - 0
parser/src/lib.rs

@@ -23,9 +23,43 @@ pub enum Error {
     InvalidLength,
     InvalidBoolean,
     InvalidNumber,
+    ProtocolError(u8, u8),
     NewLine,
 }
 
+pub fn parse_server(bytes: &[u8]) -> Result<(&[u8], Vec<&[u8]>), Error> {
+    let (bytes, byte) = next!(bytes);
+    match byte {
+        b'*' => parse_server_array(bytes),
+        _ => Err(Error::ProtocolError(b'*', byte)),
+    }
+}
+
+fn parse_server_array(bytes: &[u8]) -> Result<(&[u8], Vec<&[u8]>), Error> {
+    let (bytes, len) = read_line_number!(bytes, i32);
+    if len <= 0 {
+        return Err(Error::ProtocolError(b'x', b'y'));
+    }
+
+    let mut v = vec![];
+    let mut bytes = bytes;
+
+    for _i in 0..len {
+        let n = next!(bytes);
+        let r = match n.1 {
+            b'$' => parse_blob(n.0),
+            _ => Err(Error::ProtocolError(b'$', n.1)),
+        }?;
+        bytes = r.0;
+        v.push(match r.1 {
+            Value::Blob(x) => Ok(x),
+            _ => Err(Error::ProtocolError(b'x', b'y')),
+        }?);
+    }
+
+    Ok((bytes, v))
+}
+
 pub fn parse(bytes: &[u8]) -> Result<(&[u8], Value), Error> {
     let (bytes, byte) = next!(bytes);
     match byte {

+ 1 - 0
src/commands/mod.rs

@@ -0,0 +1 @@
+pub mod string;

+ 14 - 0
src/commands/string.rs

@@ -0,0 +1,14 @@
+use crate::{db::Db, error::Error, value::Value};
+use bytes::Bytes;
+
+pub fn incr(db: &Db, args: &[Bytes]) -> Result<Value, Error> {
+    db.incr(&args[1], 1)
+}
+
+pub fn get(db: &Db, args: &[Bytes]) -> Result<Value, Error> {
+    db.get(&args[1])
+}
+
+pub fn set(db: &Db, args: &[Bytes]) -> Result<Value, Error> {
+    db.set(&args[1], &Value::Blob(args[2].clone()))
+}

+ 25 - 15
src/db/mod.rs

@@ -2,6 +2,7 @@ use crate::{error::Error, value::Value};
 use bytes::Bytes;
 use seahash::hash;
 use std::collections::{BTreeMap, HashMap};
+use std::convert::TryInto;
 use std::sync::RwLock;
 use tokio::time::Instant;
 
@@ -31,24 +32,33 @@ impl Db {
         (hash(key) as usize) % self.entries.len()
     }
 
-    pub fn get(&self, key: &Value) -> Result<Value, Error> {
-        match key {
-            Value::Blob(key) => {
-                let entries = self.entries[self.get_slot(key)].read().unwrap();
-                Ok(entries.get(key).cloned().unwrap_or(Value::Null))
+    pub fn incr(&self, key: &Bytes, incr_by: i64) -> Result<Value, Error> {
+        let mut entries = self.entries[self.get_slot(key)].write().unwrap();
+        match entries.get(key) {
+            Some(x) => {
+                let mut val: i64 = x.try_into()?;
+
+                val += incr_by;
+
+                entries.insert(key.clone(), format!("{}", val).as_str().into());
+
+                Ok(val.into())
             }
-            _ => Err(Error::WrongType),
+            None => {
+                entries.insert(key.clone(), "1".into());
+                Ok((1 as i64).into())
+            },
         }
     }
 
-    pub fn set(&self, key: &Value, value: &Value) -> Result<Value, Error> {
-        match key {
-            Value::Blob(key) => {
-                let mut entries = self.entries[self.get_slot(key)].write().unwrap();
-                entries.insert(key.clone(), value.clone());
-                Ok(Value::OK)
-            }
-            _ => Err(Error::WrongType),
-        }
+    pub fn get(&self, key: &Bytes) -> Result<Value, Error> {
+        let entries = self.entries[self.get_slot(key)].read().unwrap();
+        Ok(entries.get(key).cloned().unwrap_or(Value::Null))
+    }
+
+    pub fn set(&self, key: &Bytes, value: &Value) -> Result<Value, Error> {
+        let mut entries = self.entries[self.get_slot(key)].write().unwrap();
+        entries.insert(key.clone(), value.clone());
+        Ok(Value::OK)
     }
 }

+ 11 - 13
src/dispatcher.rs

@@ -1,9 +1,10 @@
-use crate::{db::Db, dispatcher, error::Error, value::Value};
+use crate::{commands, db::Db, dispatcher, error::Error, value::Value};
+use bytes::Bytes;
 use std::convert::TryInto;
 use std::time::SystemTime;
 use std::time::UNIX_EPOCH;
 
-fn do_time(_db: &Db, _args: &[Value]) -> Result<Value, Error> {
+fn do_time(_db: &Db, _args: &[Bytes]) -> Result<Value, Error> {
     let now = SystemTime::now();
     let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
     let seconds = format!("{}", since_the_epoch.as_secs());
@@ -12,7 +13,7 @@ fn do_time(_db: &Db, _args: &[Value]) -> Result<Value, Error> {
     Ok(vec![seconds.as_str(), millis.as_str()].into())
 }
 
-fn do_command(_db: &Db, _args: &[Value]) -> Result<Value, Error> {
+fn do_command(_db: &Db, _args: &[Bytes]) -> Result<Value, Error> {
     let now = SystemTime::now();
     let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
     let in_ms: i128 =
@@ -20,27 +21,24 @@ fn do_command(_db: &Db, _args: &[Value]) -> Result<Value, Error> {
     Ok(format!("{}", in_ms).as_str().into())
 }
 
-fn get(db: &Db, args: &[Value]) -> Result<Value, Error> {
-    db.get(&args[1])
-}
-
-fn set(db: &Db, args: &[Value]) -> Result<Value, Error> {
-    db.set(&args[1], &args[2])
-}
-
 dispatcher! {
     command  {
         do_command,
         ["random" "loading" "stale"],
         1,
     },
+    incr {
+        commands::string::incr,
+        ["write" "denyoom" "fast"],
+        2,
+    },
     get {
-        get,
+        commands::string::get,
         ["random" "loading" "stale"],
         2,
     },
     set {
-        set,
+        commands::string::set,
         ["random" "loading" "stale"],
         -3,
     },

+ 5 - 9
src/macros.rs

@@ -2,7 +2,7 @@
 macro_rules! dispatcher {
     {
         $($command:ident {
-            $handler:ident,
+            $handler:expr,
             [$($tag:tt)+],
             $min_args:expr,
         },)+$(,)?
@@ -27,7 +27,7 @@ macro_rules! dispatcher {
                 }
 
                 impl ExecutableCommand for Command {
-                    fn execute(&self, db: &Db, args: &[Value]) -> Result<Value, Error> {
+                    fn execute(&self, db: &Db, args: &[Bytes]) -> Result<Value, Error> {
                         $handler(db, args)
                     }
 
@@ -49,7 +49,7 @@ macro_rules! dispatcher {
         use std::ops::Deref;
 
         pub trait ExecutableCommand {
-            fn execute(&self, db: &Db, args: &[Value]) -> Result<Value, Error>;
+            fn execute(&self, db: &Db, args: &[Bytes]) -> Result<Value, Error>;
 
             fn check_number_args(&self, n: usize) -> bool;
 
@@ -64,12 +64,8 @@ macro_rules! dispatcher {
         }
 
         impl Dispatcher {
-            pub fn new(args: &[Value]) -> Result<Self, Error> {
-                let command = match &args[0] {
-                    Value::String(x) => Ok(x.as_str()),
-                    Value::Blob(x) => Ok(unsafe { std::str::from_utf8_unchecked(&x) }),
-                    _ => Err(Error::ProtocolError("$".to_string(), "*".to_string())),
-                }?;
+            pub fn new(args: &[Bytes]) -> Result<Self, Error> {
+                let command = unsafe { std::str::from_utf8_unchecked(&args[0]) };
 
                 let command = match command.to_lowercase().as_str() {
                 $(

+ 10 - 10
src/main.rs

@@ -1,14 +1,14 @@
+mod commands;
 mod db;
 mod dispatcher;
 mod error;
 mod macros;
 mod value;
 
-use bytes::{Buf, BytesMut};
+use bytes::{Buf, Bytes, BytesMut};
 use dispatcher::Dispatcher;
 use futures::SinkExt;
-use redis_zero_parser::{parse, Error as RedisError};
-use std::convert::TryFrom;
+use redis_zero_parser::{parse_server, Error as RedisError};
 use std::env;
 use std::error::Error;
 use std::ops::Deref;
@@ -38,7 +38,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
 
                     while let Some(result) = transport.next().await {
                         match result {
-                            Ok(Value::Array(args)) => match Dispatcher::new(&args) {
+                            Ok(args) => match Dispatcher::new(&args) {
                                 Ok(handler) => {
                                     let r = handler
                                         .deref()
@@ -51,9 +51,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
                                     transport.send(err).await;
                                 }
                             },
-                            Ok(x) => {
-                                println!("Invalid message {:?}", x);
-                            }
                             Err(e) => {
                                 println!("error on decoding from socket; error = {:?}", e);
                                 break;
@@ -82,17 +79,20 @@ impl Encoder<Value> for RedisParser {
 }
 
 impl Decoder for RedisParser {
-    type Item = Value;
+    type Item = Vec<Bytes>;
     type Error = io::Error;
 
     fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Self::Item>> {
         let (frame, proccesed) = {
-            let (unused, val) = match parse(src) {
+            let (unused, mut val) = match parse_server(src) {
                 Ok((buf, val)) => (buf, val),
                 Err(RedisError::Partial) => return Ok(None),
                 Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "something")),
             };
-            (Value::try_from(&val).unwrap(), src.len() - unused.len())
+            (
+                val.iter_mut().map(|e| Bytes::copy_from_slice(e)).collect(),
+                src.len() - unused.len(),
+            )
         };
 
         src.advance(proccesed);

+ 23 - 8
src/value.rs

@@ -1,7 +1,7 @@
-use crate::{value_try_from, value_vec_try_from};
+use crate::{error::Error, value_try_from, value_vec_try_from};
 use bytes::{Bytes, BytesMut};
 use redis_zero_parser::Value as ParsedValue;
-use std::convert::TryFrom;
+use std::convert::{TryFrom, TryInto};
 
 #[derive(Debug, PartialEq, Clone)]
 pub enum Value {
@@ -45,17 +45,32 @@ impl From<&Value> for Vec<u8> {
     }
 }
 
+impl TryFrom<&Value> for i64 {
+    type Error = Error;
+
+    fn try_from(val: &Value) -> Result<Self, Self::Error> {
+        match val {
+            Value::BigInteger(x) => (*x).try_into().map_err(|_| Error::NotANumber),
+            Value::Integer(x) => Ok(*x),
+            Value::Blob(x) => {
+                let x = unsafe { std::str::from_utf8_unchecked(x) };
+                x.parse::<i64>().map_err(|_| Error::NotANumber)
+            }
+            Value::String(x) => x.parse::<i64>().map_err(|_| Error::NotANumber),
+            _ => Err(Error::NotANumber),
+        }
+    }
+}
+
 impl From<Value> for Vec<u8> {
     fn from(value: Value) -> Vec<u8> {
         (&value).into()
     }
 }
 
-impl<'a> TryFrom<&ParsedValue<'a>> for Value {
-    type Error = &'static str;
-
-    fn try_from(value: &ParsedValue) -> Result<Self, Self::Error> {
-        Ok(match value {
+impl<'a> From<&ParsedValue<'a>> for Value {
+    fn from(value: &ParsedValue) -> Self {
+        match value {
             ParsedValue::String(x) => Self::String(x.to_string()),
             ParsedValue::Blob(x) => Self::Blob(Bytes::copy_from_slice(*x)),
             ParsedValue::Array(x) => {
@@ -67,7 +82,7 @@ impl<'a> TryFrom<&ParsedValue<'a>> for Value {
             ParsedValue::Float(x) => Self::Float(*x),
             ParsedValue::Error(x, y) => Self::Err(x.to_string(), y.to_string()),
             ParsedValue::Null => Self::Null,
-        })
+        }
     }
 }