Selaa lähdekoodia

Working on sorted sets commands

Cesar Rodas 1 vuosi sitten
vanhempi
säilyke
23726a7fe3
5 muutettua tiedostoa jossa 122 lisäystä ja 78 poistoa
  1. 31 2
      src/cmd/sorted_set.rs
  2. 9 0
      src/dispatcher/mod.rs
  3. 1 1
      src/value/mod.rs
  4. 49 51
      src/value/sorted_set/insert.rs
  5. 32 24
      src/value/sorted_set/mod.rs

+ 31 - 2
src/cmd/sorted_set.rs

@@ -1,6 +1,4 @@
 //! # Sorted Set command handlers
-use std::collections::VecDeque;
-
 use crate::{
     connection::Connection,
     error::Error,
@@ -12,6 +10,7 @@ use crate::{
 };
 use bytes::Bytes;
 use float_ord::FloatOrd;
+use std::collections::VecDeque;
 
 /// Adds all the specified members with the specified scores to the sorted set
 /// stored at key. It is possible to specify multiple score / member pairs. If a
@@ -102,6 +101,36 @@ pub async fn zadd(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value,
     Ok(result)
 }
 
+/// Increments the score of member in the sorted set stored at key by increment.
+/// If member does not exist in the sorted set, it is added with increment as
+/// its score (as if its previous score was 0.0). If key does not exist, a new
+/// sorted set with the specified member as its sole member is created.
+pub async fn zincr_by(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
+    let key = args.pop_front().ok_or(Error::Syntax)?;
+    let score = bytes_to_number::<f64>(&args.pop_front().ok_or(Error::Syntax)?)?;
+    let value = args.pop_front().ok_or(Error::Syntax)?;
+    let option = IOption::incr();
+    let result = conn
+        .db()
+        .get(&key)
+        .map_mut(|v| match v {
+            Value::SortedSet(x) => {
+                let _ = x.insert(FloatOrd(score), value.clone(), &option);
+                Ok(x.get_score(&value).unwrap_or_default().into())
+            }
+            _ => Err(Error::WrongType),
+        })
+        .unwrap_or_else(|| {
+            #[allow(clippy::mutable_key_type)]
+            let mut x = SortedSet::new();
+            let _ = x.insert(FloatOrd(score), value.clone(), &option);
+            Ok(x.get_score(&value).unwrap_or_default().into())
+        })?;
+
+    conn.db().bump_version(&key);
+    Ok(result)
+}
+
 /// Returns the sorted set cardinality (number of elements) of the sorted set
 /// stored at key.
 pub async fn zcard(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {

+ 9 - 0
src/dispatcher/mod.rs

@@ -47,6 +47,15 @@ dispatcher! {
             1,
             true,
         },
+        ZINCRBY {
+            cmd::sorted_set::zincr_by,
+            [Flag::Write Flag::DenyOom Flag::Fast],
+            4,
+            1,
+            1,
+            1,
+            true,
+        }
     },
     set {
         SADD {

+ 1 - 1
src/value/mod.rs

@@ -244,7 +244,7 @@ pub fn bytes_to_range<T: FromStr>(bytes: &[u8]) -> Result<Bound<T>, Error> {
 }
 
 /// Converts bytes to a Range of float FloatOrd numbers
-pub fn bytes_to_range_floatord(bytes: &[u8]) -> Result<Bound<FloatOrd<f64>>, Error> {
+pub fn bytes_to_range_floatord(bytes: &[u8]) -> Result<Bound<sorted_set::Score>, Error> {
     match bytes_to_range(bytes)? {
         Bound::Included(n) => Ok(Bound::Included(FloatOrd(n))),
         Bound::Excluded(n) => Ok(Bound::Excluded(FloatOrd(n))),

+ 49 - 51
src/value/sorted_set/insert.rs

@@ -32,65 +32,63 @@ pub struct IOption {
 }
 
 impl IOption {
+    /// Creates a new instance where only incr is set to true
+    pub fn incr() -> Self {
+        Self {
+            incr: true,
+            ..Default::default()
+        }
+    }
     /// Creates a new instance
     pub fn new(args: &mut VecDeque<Bytes>) -> Result<Self, Error> {
         let mut update_policy = None;
         let mut update_policy_score = None;
         let mut return_change = false;
         let mut incr = false;
-        loop {
-            match args.get(0) {
-                Some(t) => {
-                    let command = String::from_utf8_lossy(t);
-                    match command.to_uppercase().as_str() {
-                        "NX" => {
-                            if update_policy == Some(IPolicy::XX) {
-                                return Err(Error::OptsNotCompatible("XX AND NX".to_owned()));
-                            }
-                            update_policy = Some(IPolicy::NX);
-                            args.pop_front();
-                        }
-                        "XX" => {
-                            if update_policy == Some(IPolicy::NX) {
-                                return Err(Error::OptsNotCompatible("XX AND NX".to_owned()));
-                            }
-                            update_policy = Some(IPolicy::XX);
-                            args.pop_front();
-                        }
-                        "LT" => {
-                            if update_policy == Some(IPolicy::NX)
-                                || update_policy_score == Some(UPolicyScore::GT)
-                            {
-                                return Err(Error::OptsNotCompatible(
-                                    "GT, LT, and/or NX".to_owned(),
-                                ));
-                            }
-                            update_policy_score = Some(UPolicyScore::LT);
-                            args.pop_front();
-                        }
-                        "GT" => {
-                            if update_policy == Some(IPolicy::NX)
-                                || update_policy_score == Some(UPolicyScore::LT)
-                            {
-                                return Err(Error::OptsNotCompatible(
-                                    "GT, LT, and/or NX".to_owned(),
-                                ));
-                            }
-                            update_policy_score = Some(UPolicyScore::GT);
-                            args.pop_front();
-                        }
-                        "CH" => {
-                            return_change = true;
-                            args.pop_front();
-                        }
-                        "INCR" => {
-                            incr = true;
-                            args.pop_front();
-                        }
-                        _ => break,
+        while let Some(t) = args.front() {
+            let command = String::from_utf8_lossy(t);
+            match command.to_uppercase().as_str() {
+                "NX" => {
+                    if update_policy == Some(IPolicy::XX) {
+                        return Err(Error::OptsNotCompatible("XX AND NX".to_owned()));
+                    }
+                    update_policy = Some(IPolicy::NX);
+                    args.pop_front();
+                }
+                "XX" => {
+                    if update_policy == Some(IPolicy::NX) {
+                        return Err(Error::OptsNotCompatible("XX AND NX".to_owned()));
+                    }
+                    update_policy = Some(IPolicy::XX);
+                    args.pop_front();
+                }
+                "LT" => {
+                    if update_policy == Some(IPolicy::NX)
+                        || update_policy_score == Some(UPolicyScore::GT)
+                    {
+                        return Err(Error::OptsNotCompatible("GT, LT, and/or NX".to_owned()));
+                    }
+                    update_policy_score = Some(UPolicyScore::LT);
+                    args.pop_front();
+                }
+                "GT" => {
+                    if update_policy == Some(IPolicy::NX)
+                        || update_policy_score == Some(UPolicyScore::LT)
+                    {
+                        return Err(Error::OptsNotCompatible("GT, LT, and/or NX".to_owned()));
                     }
+                    update_policy_score = Some(UPolicyScore::GT);
+                    args.pop_front();
+                }
+                "CH" => {
+                    return_change = true;
+                    args.pop_front();
+                }
+                "INCR" => {
+                    incr = true;
+                    args.pop_front();
                 }
-                None => break,
+                _ => break,
             }
         }
         Ok(Self {

+ 32 - 24
src/value/sorted_set/mod.rs

@@ -12,11 +12,16 @@ mod insert;
 pub use insert::{IOption, IResult};
 use insert::{IPolicy, UPolicyScore};
 
+/// Sorted set score
+pub type Score = FloatOrd<f64>;
+/// Sorted set data with score
+pub type DataWithScore = (Score, Bytes);
+
 /// Sorted set structure
 #[derive(Debug, Clone)]
 pub struct SortedSet {
-    set: HashMap<Bytes, (FloatOrd<f64>, usize)>,
-    order: BTreeMap<(FloatOrd<f64>, Bytes), usize>,
+    set: HashMap<Bytes, (Score, usize)>,
+    order: BTreeMap<DataWithScore, usize>,
 }
 
 impl PartialEq for SortedSet {
@@ -25,6 +30,12 @@ impl PartialEq for SortedSet {
     }
 }
 
+impl Default for SortedSet {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl SortedSet {
     /// Creates a new instance
     pub fn new() -> Self {
@@ -40,6 +51,11 @@ impl SortedSet {
         self.order.clear();
     }
 
+    /// Returns `true` if the map contains no elements.
+    pub fn is_empty(&self) -> bool {
+        self.set.is_empty()
+    }
+
     /// Returns the number of elements in the set
     pub fn len(&self) -> usize {
         self.set.len()
@@ -49,7 +65,7 @@ impl SortedSet {
     /// If the set did not have this value present, true is returned.
     ///
     /// If the set did have this value present, false is returned.
-    pub fn insert(&mut self, score: FloatOrd<f64>, value: Bytes, option: &IOption) -> IResult {
+    pub fn insert(&mut self, score: Score, value: Bytes, option: &IOption) -> IResult {
         if let Some((current_score, _)) = self.set.get(&value).cloned() {
             if option.insert_policy == Some(IPolicy::NX) {
                 return IResult::NoOp;
@@ -93,8 +109,8 @@ impl SortedSet {
     }
 
     /// Returns a reference to the score in the set, if any, that is equal to the given value.
-    pub fn get_score(&self, value: &Bytes) -> Option<FloatOrd<f64>> {
-        self.set.get(value).map(|(value, _)| *value)
+    pub fn get_score(&self, value: &Bytes) -> Option<f64> {
+        self.set.get(value).map(|(value, _)| value.0)
     }
 
     /// Returns all the values sorted by their score
@@ -104,9 +120,9 @@ impl SortedSet {
 
     #[inline]
     fn convert_to_range(
-        min: Bound<FloatOrd<f64>>,
-        max: Bound<FloatOrd<f64>>,
-    ) -> (Bound<(FloatOrd<f64>, Bytes)>, Bound<(FloatOrd<f64>, Bytes)>) {
+        min: Bound<Score>,
+        max: Bound<Score>,
+    ) -> (Bound<DataWithScore>, Bound<DataWithScore>) {
         let min_bytes = Bytes::new();
         let max_bytes = Bytes::copy_from_slice(&vec![255u8; 4096]);
 
@@ -125,20 +141,12 @@ impl SortedSet {
     }
 
     /// Get total number of values in a score range
-    pub fn count_values_by_score_range(
-        &self,
-        min: Bound<FloatOrd<f64>>,
-        max: Bound<FloatOrd<f64>>,
-    ) -> usize {
+    pub fn count_values_by_score_range(&self, min: Bound<Score>, max: Bound<Score>) -> usize {
         self.order.range(Self::convert_to_range(min, max)).count()
     }
 
     /// Get values in a score range
-    pub fn get_values_by_score_range(
-        &self,
-        min: Bound<FloatOrd<f64>>,
-        max: Bound<FloatOrd<f64>>,
-    ) -> Vec<Bytes> {
+    pub fn get_values_by_score_range(&self, min: Bound<Score>, max: Bound<Score>) -> Vec<Bytes> {
         self.order
             .range(Self::convert_to_range(min, max))
             .map(|(k, _)| k.1.clone())
@@ -148,13 +156,11 @@ impl SortedSet {
     /// Adds the position in the set to each value based on their score
     #[inline]
     fn update_value_position(&mut self) {
-        let mut i = 0;
-        for ((_, key), value) in self.order.iter_mut() {
+        for (i, ((_, key), value)) in self.order.iter_mut().enumerate() {
             *value = i;
             if let Some(value) = self.set.get_mut(key) {
                 value.1 = i;
             }
-            i += 1;
         }
     }
 
@@ -171,8 +177,10 @@ mod test {
     #[test]
     fn basic_usage() {
         let mut set: SortedSet = SortedSet::new();
-        let mut op = IOption::default();
-        op.insert_policy = Some(IPolicy::NX);
+        let mut op = IOption {
+            insert_policy: Some(IPolicy::NX),
+            ..Default::default()
+        };
 
         assert_eq!(
             set.insert(FloatOrd(1.0), "2".into(), &op),
@@ -189,7 +197,7 @@ mod test {
         assert_eq!(set.insert(FloatOrd(2.0), "2".into(), &op), IResult::Updated);
 
         //assert_eq!(vec![3, 2], set.get_values());
-        assert_eq!(Some(FloatOrd(3.0)), set.get_score(&"2".into()));
+        assert_eq!(Some(3.0), set.get_score(&"2".into()));
         assert_eq!(Some(1), set.get_value_pos(&"2".into()));
         assert_eq!(Some(0), set.get_value_pos(&"3".into()));
         assert_eq!(None, set.get_value_pos(&"5".into()));