Sfoglia il codice sorgente

Added first version key expirations.

The first version is pretty simple, just keep track when a value would
expire and check that value before returning any value (in db.rs, mainly
in `get` and `getset` methods and any reading operation).

The second version would spawn a worker that would run every few seconds
to remove expired keys and release some memory.

The first version is enough to be compatible with redis promised
behaviour of "removing" expired keys.
Cesar Rodas 3 anni fa
parent
commit
a4d6be1c81
6 ha cambiato i file con 172 aggiunte e 30 eliminazioni
  1. 35 0
      src/cmd/key.rs
  2. 1 0
      src/cmd/mod.rs
  3. 4 4
      src/cmd/string.rs
  4. 102 21
      src/db/mod.rs
  5. 20 0
      src/dispatcher.rs
  6. 10 5
      src/value.rs

+ 35 - 0
src/cmd/key.rs

@@ -0,0 +1,35 @@
+use crate::{connection::Connection, error::Error, value::bytes_to_number, value::Value};
+use bytes::Bytes;
+use tokio::time::{Duration, Instant};
+
+pub fn del(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    Ok(conn.db().del(&args[1..]))
+}
+
+pub fn expire(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let expires_at: i64 = bytes_to_number(&args[2])?;
+
+    if expires_at <= 0 {
+        return Ok(conn.db().del(&args[1..2]));
+    }
+
+    let expires_at = Duration::new(expires_at as u64, 0);
+
+    Ok(conn
+        .db()
+        .expire(&args[1], expires_at))
+}
+
+pub fn persist(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    Ok(conn.db().persist(&args[1]))
+}
+
+pub fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let ttl = match conn.db().ttl(&args[1]) {
+        Some(Some(ttl)) => (ttl - Instant::now()).as_secs() as i64,
+        Some(None) => -1,
+        None => -2,
+    };
+
+    Ok(ttl.into())
+}

+ 1 - 0
src/cmd/mod.rs

@@ -1,2 +1,3 @@
 pub mod client;
 pub mod client;
+pub mod key;
 pub mod string;
 pub mod string;

+ 4 - 4
src/cmd/string.rs

@@ -1,6 +1,6 @@
 use crate::{connection::Connection, error::Error, value::Value};
 use crate::{connection::Connection, error::Error, value::Value};
-use std::convert::TryInto;
 use bytes::Bytes;
 use bytes::Bytes;
+use std::convert::TryInto;
 
 
 pub fn incr_by(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub fn incr_by(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let by: i64 = (&Value::Blob(args[2].to_owned())).try_into()?;
     let by: i64 = (&Value::Blob(args[2].to_owned())).try_into()?;
@@ -16,13 +16,13 @@ pub fn decr(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 }
 }
 
 
 pub fn get(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub fn get(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    conn.db().get(&args[1])
+    Ok(conn.db().get(&args[1]))
 }
 }
 
 
 pub fn set(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub fn set(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    conn.db().set(&args[1], &Value::Blob(args[2].to_owned()))
+    Ok(conn.db().set(&args[1], &Value::Blob(args[2].to_owned())))
 }
 }
 
 
 pub fn getset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 pub fn getset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    conn.db().getset(&args[1], &Value::Blob(args[2].to_owned()))
+    Ok(conn.db().getset(&args[1], &Value::Blob(args[2].to_owned())))
 }
 }

+ 102 - 21
src/db/mod.rs

@@ -2,14 +2,47 @@ use crate::{error::Error, value::Value};
 use bytes::Bytes;
 use bytes::Bytes;
 use log::trace;
 use log::trace;
 use seahash::hash;
 use seahash::hash;
-use std::collections::{BTreeMap, HashMap};
-use std::convert::TryInto;
-use std::sync::RwLock;
-use tokio::time::Instant;
+use std::{
+    collections::{BTreeMap, HashMap},
+    convert::TryInto,
+    sync::RwLock,
+};
+use tokio::time::{Duration, Instant};
+
+#[derive(Debug)]
+pub struct Entry {
+    pub value: Value,
+    pub expires_at: Option<Instant>,
+}
+
+impl Entry {
+    pub fn new(value: Value) -> Self {
+        Self {
+            value,
+            expires_at: None,
+        }
+    }
+
+    pub fn change_value(&mut self, value: Value) {
+        self.value = value;
+    }
+
+    pub fn get_mut(&mut self) -> &mut Value {
+        &mut self.value
+    }
+
+    pub fn get(&self) -> &Value {
+        &self.value
+    }
+
+    pub fn is_valid(&self) -> bool {
+        self.expires_at.map_or(true, |x| x > Instant::now())
+    }
+}
 
 
 #[derive(Debug)]
 #[derive(Debug)]
 pub struct Db {
 pub struct Db {
-    entries: Vec<RwLock<HashMap<Bytes, Value>>>,
+    entries: Vec<RwLock<HashMap<Bytes, Entry>>>,
     expirations: RwLock<BTreeMap<(Instant, u64), String>>,
     expirations: RwLock<BTreeMap<(Instant, u64), String>>,
     slots: usize,
     slots: usize,
 }
 }
@@ -38,38 +71,86 @@ impl Db {
 
 
     pub fn incr(&self, key: &Bytes, incr_by: i64) -> Result<Value, Error> {
     pub fn incr(&self, key: &Bytes, incr_by: i64) -> Result<Value, Error> {
         let mut entries = self.entries[self.get_slot(key)].write().unwrap();
         let mut entries = self.entries[self.get_slot(key)].write().unwrap();
-        match entries.get(key) {
+        match entries.get_mut(key) {
             Some(x) => {
             Some(x) => {
-                let mut val: i64 = x.try_into()?;
+                let value = x.get();
+                let mut number: i64 = value.try_into()?;
 
 
-                val += incr_by;
+                number += incr_by;
 
 
-                entries.insert(key.clone(), format!("{}", val).as_str().into());
+                x.change_value(format!("{}", number).as_str().into());
 
 
-                Ok(val.into())
+                Ok(number.into())
             }
             }
             None => {
             None => {
-                entries.insert(key.clone(), incr_by.into());
-                Ok((incr_by  as i64).into())
+                entries.insert(key.clone(), Entry::new(incr_by.into()));
+                Ok((incr_by as i64).into())
             }
             }
         }
         }
     }
     }
 
 
-    pub fn get(&self, key: &Bytes) -> Result<Value, Error> {
+    pub fn persist(&self, key: &Bytes) -> Value {
+        let mut entries = self.entries[self.get_slot(key)].write().unwrap();
+        entries
+            .get_mut(key)
+            .filter(|x| x.is_valid())
+            .map_or(0_i64.into(), |mut x| {
+                let ret = x.expires_at.map_or(0_i64, |_| 1_i64);
+                x.expires_at = None;
+                ret.into()
+            })
+    }
+
+    pub fn expire(&self, key: &Bytes, time: Duration) -> Value {
+        let mut entries = self.entries[self.get_slot(key)].write().unwrap();
+        entries
+            .get_mut(key)
+            .filter(|x| x.is_valid())
+            .map_or(0_i64.into(), |mut x| {
+                x.expires_at = Some(Instant::now() + time);
+                1_i64.into()
+            })
+    }
+
+    pub fn del(&self, keys: &[Bytes]) -> Value {
+        let mut deleted = 0_i64;
+        keys.iter().map(|key| {
+            let mut entries = self.entries[self.get_slot(key)].write().unwrap();
+            if entries.remove(key).is_some() {
+                deleted += 1;
+            }
+        }).for_each(drop);
+
+        deleted.into()
+    }
+
+    pub fn get(&self, key: &Bytes) -> Value {
         let entries = self.entries[self.get_slot(key)].read().unwrap();
         let entries = self.entries[self.get_slot(key)].read().unwrap();
-        Ok(entries.get(key).cloned().unwrap_or(Value::Null))
+        entries
+            .get(key)
+            .filter(|x| x.is_valid())
+            .map_or(Value::Null, |x| x.get().clone())
     }
     }
 
 
-    pub fn getset(&self, key: &Bytes, value: &Value) -> Result<Value, Error> {
+    pub fn getset(&self, key: &Bytes, value: &Value) -> Value {
         let mut entries = self.entries[self.get_slot(key)].write().unwrap();
         let mut entries = self.entries[self.get_slot(key)].write().unwrap();
-        let prev = entries.get(key).cloned().unwrap_or(Value::Null);
-        entries.insert(key.clone(), value.clone());
-        Ok(prev)
+        entries
+            .insert(key.clone(), Entry::new(value.clone()))
+            .filter(|x| x.is_valid())
+            .map_or(Value::Null, |x| x.get().clone())
     }
     }
 
 
-    pub fn set(&self, key: &Bytes, value: &Value) -> Result<Value, Error> {
+    pub fn set(&self, key: &Bytes, value: &Value) -> Value {
         let mut entries = self.entries[self.get_slot(key)].write().unwrap();
         let mut entries = self.entries[self.get_slot(key)].write().unwrap();
-        entries.insert(key.clone(), value.clone());
-        Ok(Value::OK)
+        entries.insert(key.clone(), Entry::new(value.clone()));
+        Value::OK
+    }
+
+    pub fn ttl(&self, key: &Bytes) -> Option<Option<Instant>> {
+        let entries = self.entries[self.get_slot(key)].read().unwrap();
+        entries
+            .get(key)
+            .filter(|x| x.is_valid())
+            .map(|x| x.expires_at)
     }
     }
 }
 }

+ 20 - 0
src/dispatcher.rs

@@ -42,6 +42,16 @@ dispatcher! {
         ["random" "loading" "stale"],
         ["random" "loading" "stale"],
         2,
         2,
     },
     },
+    expire {
+        cmd::key::expire,
+        ["read" "write" "fast"],
+        3,
+    },
+    del {
+        cmd::key::del,
+        ["random" "loading" "stale"],
+        -2,
+    },
     get {
     get {
         cmd::string::get,
         cmd::string::get,
         ["random" "loading" "stale"],
         ["random" "loading" "stale"],
@@ -57,6 +67,16 @@ dispatcher! {
         ["write" "denyoom" "fast"],
         ["write" "denyoom" "fast"],
         3,
         3,
     },
     },
+    persist {
+        cmd::key::persist,
+        ["write" "fast"],
+        2,
+    },
+    ttl {
+        cmd::key::ttl,
+        ["read" "read"],
+        2,
+    },
     set {
     set {
         cmd::string::set,
         cmd::string::set,
         ["random" "loading" "stale"],
         ["random" "loading" "stale"],

+ 10 - 5
src/value.rs

@@ -1,7 +1,10 @@
 use crate::{error::Error, value_try_from, value_vec_try_from};
 use crate::{error::Error, value_try_from, value_vec_try_from};
 use bytes::{Bytes, BytesMut};
 use bytes::{Bytes, BytesMut};
 use redis_zero_protocol_parser::Value as ParsedValue;
 use redis_zero_protocol_parser::Value as ParsedValue;
-use std::convert::{TryFrom, TryInto};
+use std::{
+    convert::{TryFrom, TryInto},
+    str::FromStr
+};
 
 
 #[derive(Debug, PartialEq, Clone)]
 #[derive(Debug, PartialEq, Clone)]
 pub enum Value {
 pub enum Value {
@@ -53,16 +56,18 @@ impl TryFrom<&Value> for i64 {
         match val {
         match val {
             Value::BigInteger(x) => (*x).try_into().map_err(|_| Error::NotANumber),
             Value::BigInteger(x) => (*x).try_into().map_err(|_| Error::NotANumber),
             Value::Integer(x) => Ok(*x),
             Value::Integer(x) => Ok(*x),
-            Value::Blob(x) => {
-                let x = unsafe { std::str::from_utf8_unchecked(x) };
-                x.parse::<i64>().map_err(|_| Error::NotANumber)
-            }
+            Value::Blob(x) => bytes_to_number::<i64>(&x),
             Value::String(x) => x.parse::<i64>().map_err(|_| Error::NotANumber),
             Value::String(x) => x.parse::<i64>().map_err(|_| Error::NotANumber),
             _ => Err(Error::NotANumber),
             _ => Err(Error::NotANumber),
         }
         }
     }
     }
 }
 }
 
 
+pub fn bytes_to_number<T: FromStr>(bytes: &Bytes) -> Result<T, Error> {
+    let x = unsafe { std::str::from_utf8_unchecked(bytes) };
+    x.parse::<T>().map_err(|_| Error::NotANumber)
+}
+
 impl From<Value> for Vec<u8> {
 impl From<Value> for Vec<u8> {
     fn from(value: Value) -> Vec<u8> {
     fn from(value: Value) -> Vec<u8> {
         (&value).into()
         (&value).into()