1
0
Эх сурвалжийг харах

Merge pull request #2 from crodas/feature/more-expire-cmds

Adding more expiration related commands
César D. Rodas 3 жил өмнө
parent
commit
47ebe0e11a
6 өөрчлөгдсөн 166 нэмэгдсэн , 54 устгасан
  1. 71 7
      src/cmd/key.rs
  2. 46 0
      src/db/entry.rs
  3. 3 44
      src/db/mod.rs
  4. 30 0
      src/dispatcher.rs
  5. 13 0
      src/macros.rs
  6. 3 3
      src/server.rs

+ 71 - 7
src/cmd/key.rs

@@ -1,33 +1,97 @@
-use crate::{connection::Connection, error::Error, value::bytes_to_number, value::Value};
+use crate::{
+    check_arg, connection::Connection, error::Error, value::bytes_to_number, value::Value,
+};
 use bytes::Bytes;
+use std::time::{SystemTime, UNIX_EPOCH};
 use tokio::time::{Duration, Instant};
 
+pub fn now() -> Duration {
+    let start = SystemTime::now();
+    start
+        .duration_since(UNIX_EPOCH)
+        .expect("Time went backwards")
+}
+
 pub fn del(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     Ok(conn.db().del(&args[1..]))
 }
 
 pub fn expire(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    let expires_at: i64 = bytes_to_number(&args[2])?;
+    let expires_in: i64 = bytes_to_number(&args[2])?;
 
-    if expires_at <= 0 {
+    if expires_in <= 0 {
+        // Delete key right away
         return Ok(conn.db().del(&args[1..2]));
     }
 
-    let expires_at = Duration::new(expires_at as u64, 0);
+    let expires_at = if check_arg!(args, 0, "EXPIRES") {
+        Duration::from_secs(expires_in as u64)
+    } else {
+        Duration::from_millis(expires_in as u64)
+    };
 
     Ok(conn.db().expire(&args[1], expires_at))
 }
 
-pub fn persist(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
-    Ok(conn.db().persist(&args[1]))
+pub fn expire_at(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let secs = check_arg!(args, 0, "EXPIREAT");
+    let expires_at: i64 = bytes_to_number(&args[2])?;
+    let expires_in: i64 = if secs {
+        expires_at - now().as_secs() as i64
+    } else {
+        expires_at - now().as_millis() as i64
+    };
+
+    if expires_in <= 0 {
+        // Delete key right away
+        return Ok(conn.db().del(&args[1..2]));
+    }
+
+    let expires_at = if secs {
+        Duration::from_secs(expires_in as u64)
+    } else {
+        Duration::from_millis(expires_in as u64)
+    };
+
+    Ok(conn.db().expire(&args[1], expires_at))
 }
 
 pub fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     let ttl = match conn.db().ttl(&args[1]) {
-        Some(Some(ttl)) => (ttl - Instant::now()).as_secs() as i64,
+        Some(Some(ttl)) => {
+            let ttl = ttl - Instant::now();
+            if check_arg!(args, 0, "TTL") {
+                ttl.as_secs() as i64
+            } else {
+                ttl.as_millis() as i64
+            }
+        }
         Some(None) => -1,
         None => -2,
     };
 
     Ok(ttl.into())
 }
+
+pub fn expire_time(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let ttl = match conn.db().ttl(&args[1]) {
+        Some(Some(ttl)) => {
+            // Is there a better way? There should be!
+            if check_arg!(args, 0, "EXPIRETIME") {
+                let secs: i64 = (ttl - Instant::now()).as_secs() as i64;
+                secs + (now().as_secs() as i64)
+            } else {
+                let secs: i64 = (ttl - Instant::now()).as_millis() as i64;
+                secs + (now().as_millis() as i64)
+            }
+        }
+        Some(None) => -1,
+        None => -2,
+    };
+
+    Ok(ttl.into())
+}
+
+pub fn persist(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    Ok(conn.db().persist(&args[1]))
+}

+ 46 - 0
src/db/entry.rs

@@ -0,0 +1,46 @@
+use crate::value::Value;
+use tokio::time::Instant;
+
+#[derive(Debug)]
+pub struct Entry {
+    pub value: Value,
+    pub expires_at: Option<Instant>,
+}
+
+/// Database Entry
+///
+/// A database entry is a Value associated with an optional expiration time.
+///
+/// The database will never return an entry if has expired already, by having
+/// this promise we can run the purge process every few seconds instead of doing
+/// so more frequently.
+impl Entry {
+    pub fn new(value: Value) -> Self {
+        Self {
+            value,
+            expires_at: None,
+        }
+    }
+
+    /// Changes the value that is wrapped in this entry, the TTL (expired_at) is
+    /// not affected.
+    pub fn change_value(&mut self, value: Value) {
+        self.value = value;
+    }
+
+    pub fn get_mut(&mut self) -> &mut Value {
+        &mut self.value
+    }
+
+    pub fn get(&self) -> &Value {
+        &self.value
+    }
+
+    /// If the Entry should be taken as valid, if this function returns FALSE
+    /// the callee should behave as if the key was not found. By having this
+    /// behaviour we can schedule the purge thread to run every few seconds or
+    /// even minutes instead of once every second.
+    pub fn is_valid(&self) -> bool {
+        self.expires_at.map_or(true, |x| x > Instant::now())
+    }
+}

+ 3 - 44
src/db/mod.rs

@@ -1,5 +1,8 @@
+pub mod entry;
+
 use crate::{error::Error, value::Value};
 use bytes::Bytes;
+use entry::Entry;
 use log::trace;
 use seahash::hash;
 use std::{
@@ -10,50 +13,6 @@ use std::{
 use tokio::time::{Duration, Instant};
 
 #[derive(Debug)]
-pub struct Entry {
-    pub value: Value,
-    pub expires_at: Option<Instant>,
-}
-
-/// Database Entry
-///
-/// A database entry is a Value associated with an optional expiration time.
-///
-/// The database will never return an entry if has expired already, by having
-/// this promise we can run the purge process every few seconds instead of doing
-/// so more frequently.
-impl Entry {
-    pub fn new(value: Value) -> Self {
-        Self {
-            value,
-            expires_at: None,
-        }
-    }
-
-    /// Changes the value that is wrapped in this entry, the TTL (expired_at) is
-    /// not affected.
-    pub fn change_value(&mut self, value: Value) {
-        self.value = value;
-    }
-
-    pub fn get_mut(&mut self) -> &mut Value {
-        &mut self.value
-    }
-
-    pub fn get(&self) -> &Value {
-        &self.value
-    }
-
-    /// If the Entry should be taken as valid, if this function returns FALSE
-    /// the callee should behave as if the key was not found. By having this
-    /// behaviour we can schedule the purge thread to run every few seconds or
-    /// even minutes instead of once every second.
-    pub fn is_valid(&self) -> bool {
-        self.expires_at.map_or(true, |x| x > Instant::now())
-    }
-}
-
-#[derive(Debug)]
 pub struct Db {
     /// A vector of hashmaps.
     ///

+ 30 - 0
src/dispatcher.rs

@@ -47,6 +47,16 @@ dispatcher! {
         ["read" "write" "fast"],
         3,
     },
+    expireat {
+        cmd::key::expire_at,
+        ["read" "write" "fast"],
+        3,
+    },
+    expiretime {
+        cmd::key::expire_time,
+        ["read" "write" "fast"],
+        2,
+    },
     del {
         cmd::key::del,
         ["random" "loading" "stale"],
@@ -77,6 +87,26 @@ dispatcher! {
         ["read" "read"],
         2,
     },
+    pexpire {
+        cmd::key::expire,
+        ["read" "write" "fast"],
+        3,
+    },
+    pexpireat {
+        cmd::key::expire_at,
+        ["read" "write" "fast"],
+        3,
+    },
+    pexpiretime {
+        cmd::key::expire_time,
+        ["read" "write" "fast"],
+        2,
+    },
+    pttl {
+        cmd::key::ttl,
+        ["read" "read"],
+        2,
+    },
     set {
         cmd::string::set,
         ["random" "loading" "stale"],

+ 13 - 0
src/macros.rs

@@ -130,3 +130,16 @@ macro_rules! option {
         }
     }
 }
+
+#[macro_export]
+macro_rules! check_arg {
+    {$args: tt, $pos: tt, $command: tt} => {{
+        match $args.get($pos) {
+            Some(bytes) => {
+                let command = unsafe { std::str::from_utf8_unchecked(&bytes) };
+                command.to_uppercase() == $command
+            },
+            None => false,
+        }
+    }}
+}

+ 3 - 3
src/server.rs

@@ -1,4 +1,4 @@
-use crate::{connection::Connections, dispatcher::Dispatcher, value::Value, db::Db};
+use crate::{connection::Connections, db::Db, dispatcher::Dispatcher, value::Value};
 use bytes::{Buf, Bytes, BytesMut};
 use futures::SinkExt;
 use log::{info, trace, warn};
@@ -26,13 +26,13 @@ impl Decoder for RedisParser {
 
     fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Self::Item>> {
         let (frame, proccesed) = {
-            let (unused, mut val) = match parse_server(src) {
+            let (unused, val) = match parse_server(src) {
                 Ok((buf, val)) => (buf, val),
                 Err(RedisError::Partial) => return Ok(None),
                 Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "something")),
             };
             (
-                val.iter_mut().map(|e| Bytes::copy_from_slice(e)).collect(),
+                val.iter().map(|e| Bytes::copy_from_slice(e)).collect(),
                 src.len() - unused.len(),
             )
         };