Ver código fonte

Code refactory

Cesar Rodas 3 anos atrás
pai
commit
1905444efe
6 arquivos alterados com 74 adições e 23 exclusões
  1. 3 3
      src/cmd/key.rs
  2. 19 2
      src/cmd/string.rs
  3. 1 0
      src/connection.rs
  4. 22 5
      src/db/entry.rs
  5. 19 13
      src/db/mod.rs
  6. 10 0
      src/dispatcher.rs

+ 3 - 3
src/cmd/key.rs

@@ -34,7 +34,7 @@ pub fn expire(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
         Duration::from_millis(expires_in as u64)
     };
 
-    Ok(conn.db().add_expiration(&args[1], expires_at))
+    Ok(conn.db().set_ttl(&args[1], expires_at))
 }
 
 pub fn expire_at(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
@@ -57,7 +57,7 @@ pub fn expire_at(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
         Duration::from_millis(expires_in as u64)
     };
 
-    Ok(conn.db().add_expiration(&args[1], expires_at))
+    Ok(conn.db().set_ttl(&args[1], expires_at))
 }
 
 pub fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
@@ -97,5 +97,5 @@ pub fn expire_time(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 }
 
 pub fn persist(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    Ok(conn.db().remove_expiration(&args[1]))
+    Ok(conn.db().persist(&args[1]))
 }

+ 19 - 2
src/cmd/string.rs

@@ -1,6 +1,9 @@
-use crate::{connection::Connection, error::Error, value::Value};
+use crate::{
+    check_arg, connection::Connection, error::Error, value::bytes_to_number, value::Value,
+};
 use bytes::Bytes;
 use std::{convert::TryInto, ops::Neg};
+use tokio::time::Duration;
 
 pub fn incr(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     conn.db().incr(&args[1], 1)
@@ -42,7 +45,21 @@ pub fn mget(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
 }
 
 pub fn set(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    Ok(conn.db().set(&args[1], &Value::Blob(args[2].to_owned())))
+    Ok(conn
+        .db()
+        .set(&args[1], &Value::Blob(args[2].to_owned()), None))
+}
+
+pub fn setex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let ttl = if check_arg!(args, 0, "SETEX") {
+        Duration::from_secs(bytes_to_number(&args[2])?)
+    } else {
+        Duration::from_millis(bytes_to_number(&args[2])?)
+    };
+
+    Ok(conn
+        .db()
+        .set(&args[1], &Value::Blob(args[2].to_owned()), Some(ttl)))
 }
 
 pub fn strlen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {

+ 1 - 0
src/connection.rs

@@ -56,6 +56,7 @@ impl Connection {
         self.name = Some(name);
     }
 
+    #[allow(dead_code)]
     pub fn current_db(&self) -> u32 {
         self.current_db
     }

+ 22 - 5
src/db/entry.rs

@@ -1,33 +1,50 @@
 use crate::value::Value;
-use tokio::time::Instant;
+use tokio::time::{Duration, Instant};
 
 #[derive(Debug)]
 pub struct Entry {
     pub value: Value,
-    pub expires_at: Option<Instant>,
+    expires_at: Option<Instant>,
 }
 
 /// Database Entry
 ///
-/// A database entry is a Value associated with an optional expiration time.
+/// A database entry is a Value associated with an optional ttl.
 ///
 /// The database will never return an entry if has expired already, by having
 /// this promise we can run the purge process every few seconds instead of doing
 /// so more frequently.
 impl Entry {
-    pub fn new(value: Value) -> Self {
+    pub fn new(value: Value, expires_in: Option<Duration>) -> Self {
         Self {
             value,
-            expires_at: None,
+            expires_at: expires_in.map(|duration| Instant::now() + duration),
         }
     }
 
+    pub fn persist(&mut self) {
+        self.expires_at = None;
+    }
+
+    pub fn get_ttl(&self) -> Option<Instant> {
+        self.expires_at
+    }
+
+    pub fn has_ttl(&self) -> bool {
+        self.expires_at.is_some()
+    }
+
+    pub fn set_ttl(&mut self, expires_in: Duration) {
+        self.expires_at = Some(Instant::now() + expires_in);
+    }
+
     /// Changes the value that is wrapped in this entry, the TTL (expired_at) is
     /// not affected.
     pub fn change_value(&mut self, value: Value) {
         self.value = value;
     }
 
+    #[allow(dead_code)]
     pub fn get_mut(&mut self) -> &mut Value {
         &mut self.value
     }

+ 19 - 13
src/db/mod.rs

@@ -78,31 +78,37 @@ impl Db {
                 Ok(number.into())
             }
             None => {
-                entries.insert(key.clone(), Entry::new(incr_by.to_string().as_str().into()));
+                entries.insert(
+                    key.clone(),
+                    Entry::new(incr_by.to_string().as_str().into(), None),
+                );
                 Ok((incr_by as T).into())
             }
         }
     }
 
-    pub fn remove_expiration(&self, key: &Bytes) -> Value {
+    pub fn persist(&self, key: &Bytes) -> Value {
         let mut entries = self.entries[self.get_slot(key)].write().unwrap();
         entries
             .get_mut(key)
             .filter(|x| x.is_valid())
-            .map_or(0_i64.into(), |mut x| {
-                let ret = x.expires_at.map_or(0_i64, |_| 1_i64);
-                x.expires_at = None;
-                ret.into()
+            .map_or(0_i64.into(), |x| {
+                if x.has_ttl() {
+                    x.persist();
+                    1_i64.into()
+                } else {
+                    0_i64.into()
+                }
             })
     }
 
-    pub fn add_expiration(&self, key: &Bytes, time: Duration) -> Value {
+    pub fn set_ttl(&self, key: &Bytes, expiration: Duration) -> Value {
         let mut entries = self.entries[self.get_slot(key)].write().unwrap();
         entries
             .get_mut(key)
             .filter(|x| x.is_valid())
-            .map_or(0_i64.into(), |mut x| {
-                x.expires_at = Some(Instant::now() + time);
+            .map_or(0_i64.into(), |x| {
+                x.set_ttl(expiration);
                 1_i64.into()
             })
     }
@@ -156,7 +162,7 @@ impl Db {
     pub fn getset(&self, key: &Bytes, value: &Value) -> Value {
         let mut entries = self.entries[self.get_slot(key)].write().unwrap();
         entries
-            .insert(key.clone(), Entry::new(value.clone()))
+            .insert(key.clone(), Entry::new(value.clone(), None))
             .filter(|x| x.is_valid())
             .map_or(Value::Null, |x| x.get().clone())
     }
@@ -166,9 +172,9 @@ impl Db {
         entries.remove(key).map_or(Value::Null, |x| x.get().clone())
     }
 
-    pub fn set(&self, key: &Bytes, value: &Value) -> Value {
+    pub fn set(&self, key: &Bytes, value: &Value, expires: Option<Duration>) -> Value {
         let mut entries = self.entries[self.get_slot(key)].write().unwrap();
-        entries.insert(key.clone(), Entry::new(value.clone()));
+        entries.insert(key.clone(), Entry::new(value.clone(), expires));
         Value::OK
     }
 
@@ -177,6 +183,6 @@ impl Db {
         entries
             .get(key)
             .filter(|x| x.is_valid())
-            .map(|x| x.expires_at)
+            .map(|x| x.get_ttl())
     }
 }

+ 10 - 0
src/dispatcher.rs

@@ -130,6 +130,16 @@ dispatcher! {
             ["random" "loading" "stale"],
             -3,
         },
+        setex {
+            cmd::string::setex,
+            ["random" "loading" "stale"],
+            4,
+        },
+        psetex {
+            cmd::string::setex,
+            ["random" "loading" "stale"],
+            4,
+        },
         strlen {
             cmd::string::strlen,
             ["random" "fast"],