Ver código fonte

Merge pull request #1 from crodas/feature/expiration-with-keys

Added first version key expirations.
César D. Rodas 3 anos atrás
pai
commit
ee943fd053
6 arquivos alterados com 172 adições e 30 exclusões
  1. 35 0
      src/cmd/key.rs
  2. 1 0
      src/cmd/mod.rs
  3. 4 4
      src/cmd/string.rs
  4. 102 21
      src/db/mod.rs
  5. 20 0
      src/dispatcher.rs
  6. 10 5
      src/value.rs

+ 35 - 0
src/cmd/key.rs

@@ -0,0 +1,35 @@
+use crate::{connection::Connection, error::Error, value::bytes_to_number, value::Value};
+use bytes::Bytes;
+use tokio::time::{Duration, Instant};
+
+pub fn del(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    Ok(conn.db().del(&args[1..]))
+}
+
+pub fn expire(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let expires_at: i64 = bytes_to_number(&args[2])?;
+
+    if expires_at <= 0 {
+        return Ok(conn.db().del(&args[1..2]));
+    }
+
+    let expires_at = Duration::new(expires_at as u64, 0);
+
+    Ok(conn
+        .db()
+        .expire(&args[1], expires_at))
+}
+
+pub fn persist(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    Ok(conn.db().persist(&args[1]))
+}
+
+pub fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let ttl = match conn.db().ttl(&args[1]) {
+        Some(Some(ttl)) => (ttl - Instant::now()).as_secs() as i64,
+        Some(None) => -1,
+        None => -2,
+    };
+
+    Ok(ttl.into())
+}

+ 1 - 0
src/cmd/mod.rs

@@ -1,2 +1,3 @@
 pub mod client;
+pub mod key;
 pub mod string;

+ 4 - 4
src/cmd/string.rs

@@ -1,6 +1,6 @@
 use crate::{connection::Connection, error::Error, value::Value};
-use std::convert::TryInto;
 use bytes::Bytes;
+use std::convert::TryInto;
 
 pub fn incr_by(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let by: i64 = (&Value::Blob(args[2].to_owned())).try_into()?;
@@ -16,13 +16,13 @@ pub fn decr(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 }
 
 pub fn get(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    conn.db().get(&args[1])
+    Ok(conn.db().get(&args[1]))
 }
 
 pub fn set(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    conn.db().set(&args[1], &Value::Blob(args[2].to_owned()))
+    Ok(conn.db().set(&args[1], &Value::Blob(args[2].to_owned())))
 }
 
 pub fn getset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    conn.db().getset(&args[1], &Value::Blob(args[2].to_owned()))
+    Ok(conn.db().getset(&args[1], &Value::Blob(args[2].to_owned())))
 }

+ 102 - 21
src/db/mod.rs

@@ -2,14 +2,47 @@ use crate::{error::Error, value::Value};
 use bytes::Bytes;
 use log::trace;
 use seahash::hash;
-use std::collections::{BTreeMap, HashMap};
-use std::convert::TryInto;
-use std::sync::RwLock;
-use tokio::time::Instant;
+use std::{
+    collections::{BTreeMap, HashMap},
+    convert::TryInto,
+    sync::RwLock,
+};
+use tokio::time::{Duration, Instant};
+
+#[derive(Debug)]
+pub struct Entry {
+    pub value: Value,
+    pub expires_at: Option<Instant>,
+}
+
+impl Entry {
+    pub fn new(value: Value) -> Self {
+        Self {
+            value,
+            expires_at: None,
+        }
+    }
+
+    pub fn change_value(&mut self, value: Value) {
+        self.value = value;
+    }
+
+    pub fn get_mut(&mut self) -> &mut Value {
+        &mut self.value
+    }
+
+    pub fn get(&self) -> &Value {
+        &self.value
+    }
+
+    pub fn is_valid(&self) -> bool {
+        self.expires_at.map_or(true, |x| x > Instant::now())
+    }
+}
 
 #[derive(Debug)]
 pub struct Db {
-    entries: Vec<RwLock<HashMap<Bytes, Value>>>,
+    entries: Vec<RwLock<HashMap<Bytes, Entry>>>,
     expirations: RwLock<BTreeMap<(Instant, u64), String>>,
     slots: usize,
 }
@@ -38,38 +71,86 @@ impl Db {
 
     pub fn incr(&self, key: &Bytes, incr_by: i64) -> Result<Value, Error> {
         let mut entries = self.entries[self.get_slot(key)].write().unwrap();
-        match entries.get(key) {
+        match entries.get_mut(key) {
             Some(x) => {
-                let mut val: i64 = x.try_into()?;
+                let value = x.get();
+                let mut number: i64 = value.try_into()?;
 
-                val += incr_by;
+                number += incr_by;
 
-                entries.insert(key.clone(), format!("{}", val).as_str().into());
+                x.change_value(format!("{}", number).as_str().into());
 
-                Ok(val.into())
+                Ok(number.into())
             }
             None => {
-                entries.insert(key.clone(), incr_by.into());
-                Ok((incr_by  as i64).into())
+                entries.insert(key.clone(), Entry::new(incr_by.into()));
+                Ok((incr_by as i64).into())
             }
         }
     }
 
-    pub fn get(&self, key: &Bytes) -> Result<Value, Error> {
+    pub fn persist(&self, key: &Bytes) -> Value {
+        let mut entries = self.entries[self.get_slot(key)].write().unwrap();
+        entries
+            .get_mut(key)
+            .filter(|x| x.is_valid())
+            .map_or(0_i64.into(), |mut x| {
+                let ret = x.expires_at.map_or(0_i64, |_| 1_i64);
+                x.expires_at = None;
+                ret.into()
+            })
+    }
+
+    pub fn expire(&self, key: &Bytes, time: Duration) -> Value {
+        let mut entries = self.entries[self.get_slot(key)].write().unwrap();
+        entries
+            .get_mut(key)
+            .filter(|x| x.is_valid())
+            .map_or(0_i64.into(), |mut x| {
+                x.expires_at = Some(Instant::now() + time);
+                1_i64.into()
+            })
+    }
+
+    pub fn del(&self, keys: &[Bytes]) -> Value {
+        let mut deleted = 0_i64;
+        keys.iter().map(|key| {
+            let mut entries = self.entries[self.get_slot(key)].write().unwrap();
+            if entries.remove(key).is_some() {
+                deleted += 1;
+            }
+        }).for_each(drop);
+
+        deleted.into()
+    }
+
+    pub fn get(&self, key: &Bytes) -> Value {
         let entries = self.entries[self.get_slot(key)].read().unwrap();
-        Ok(entries.get(key).cloned().unwrap_or(Value::Null))
+        entries
+            .get(key)
+            .filter(|x| x.is_valid())
+            .map_or(Value::Null, |x| x.get().clone())
     }
 
-    pub fn getset(&self, key: &Bytes, value: &Value) -> Result<Value, Error> {
+    pub fn getset(&self, key: &Bytes, value: &Value) -> Value {
         let mut entries = self.entries[self.get_slot(key)].write().unwrap();
-        let prev = entries.get(key).cloned().unwrap_or(Value::Null);
-        entries.insert(key.clone(), value.clone());
-        Ok(prev)
+        entries
+            .insert(key.clone(), Entry::new(value.clone()))
+            .filter(|x| x.is_valid())
+            .map_or(Value::Null, |x| x.get().clone())
     }
 
-    pub fn set(&self, key: &Bytes, value: &Value) -> Result<Value, Error> {
+    pub fn set(&self, key: &Bytes, value: &Value) -> Value {
         let mut entries = self.entries[self.get_slot(key)].write().unwrap();
-        entries.insert(key.clone(), value.clone());
-        Ok(Value::OK)
+        entries.insert(key.clone(), Entry::new(value.clone()));
+        Value::OK
+    }
+
+    pub fn ttl(&self, key: &Bytes) -> Option<Option<Instant>> {
+        let entries = self.entries[self.get_slot(key)].read().unwrap();
+        entries
+            .get(key)
+            .filter(|x| x.is_valid())
+            .map(|x| x.expires_at)
     }
 }

+ 20 - 0
src/dispatcher.rs

@@ -42,6 +42,16 @@ dispatcher! {
         ["random" "loading" "stale"],
         2,
     },
+    expire {
+        cmd::key::expire,
+        ["read" "write" "fast"],
+        3,
+    },
+    del {
+        cmd::key::del,
+        ["random" "loading" "stale"],
+        -2,
+    },
     get {
         cmd::string::get,
         ["random" "loading" "stale"],
@@ -57,6 +67,16 @@ dispatcher! {
         ["write" "denyoom" "fast"],
         3,
     },
+    persist {
+        cmd::key::persist,
+        ["write" "fast"],
+        2,
+    },
+    ttl {
+        cmd::key::ttl,
+        ["read" "read"],
+        2,
+    },
     set {
         cmd::string::set,
         ["random" "loading" "stale"],

+ 10 - 5
src/value.rs

@@ -1,7 +1,10 @@
 use crate::{error::Error, value_try_from, value_vec_try_from};
 use bytes::{Bytes, BytesMut};
 use redis_zero_protocol_parser::Value as ParsedValue;
-use std::convert::{TryFrom, TryInto};
+use std::{
+    convert::{TryFrom, TryInto},
+    str::FromStr
+};
 
 #[derive(Debug, PartialEq, Clone)]
 pub enum Value {
@@ -53,16 +56,18 @@ impl TryFrom<&Value> for i64 {
         match val {
             Value::BigInteger(x) => (*x).try_into().map_err(|_| Error::NotANumber),
             Value::Integer(x) => Ok(*x),
-            Value::Blob(x) => {
-                let x = unsafe { std::str::from_utf8_unchecked(x) };
-                x.parse::<i64>().map_err(|_| Error::NotANumber)
-            }
+            Value::Blob(x) => bytes_to_number::<i64>(&x),
             Value::String(x) => x.parse::<i64>().map_err(|_| Error::NotANumber),
             _ => Err(Error::NotANumber),
         }
     }
 }
 
+pub fn bytes_to_number<T: FromStr>(bytes: &Bytes) -> Result<T, Error> {
+    let x = unsafe { std::str::from_utf8_unchecked(bytes) };
+    x.parse::<T>().map_err(|_| Error::NotANumber)
+}
+
 impl From<Value> for Vec<u8> {
     fn from(value: Value) -> Vec<u8> {
         (&value).into()