
Added RANDOMKEY command (#49)

* Added RANDOMKEY command

* Added RANDOMKEY
* Improved CI process
* Improved Makefile
* Running keyspace tests

* Added more tests to run

* Fixed RESET issues

After calling RESET, the server should no longer send an unsubscribe
notification for every channel the client was subscribed to.

Also removed a protocol test that triggers an issue where the server
connection would freeze. That issue will be addressed in another pull
request.
César D. Rodas 2 years ago
commit a3fc949c9a
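
The RESET fix below threads a notify flag through Pubsub::unsubscribe
and Pubsub::punsubscribe: explicit UNSUBSCRIBE/PUNSUBSCRIBE commands
pass true and keep their per-channel replies, while RESET and
connection teardown pass false so subscriptions are dropped silently.
A minimal, self-contained sketch of that pattern (hypothetical Client
type, not the server's real structs):

struct Client {
    subscriptions: Vec<String>,
    replies: Vec<String>,
}

impl Client {
    fn unsubscribe(&mut self, channels: &[String], notify: bool) {
        for channel in channels {
            self.subscriptions.retain(|c| c != channel);
            if notify {
                // Only explicit unsubscribes push a per-channel reply.
                self.replies
                    .push(format!("unsubscribe {} {}", channel, self.subscriptions.len()));
            }
        }
    }

    fn reset(&mut self) {
        // RESET clears every subscription without emitting replies.
        let channels = self.subscriptions.clone();
        self.unsubscribe(&channels, false);
    }
}

fn main() {
    let mut client = Client {
        subscriptions: vec!["news".into(), "chat".into()],
        replies: Vec::new(),
    };
    client.reset();
    assert!(client.subscriptions.is_empty());
    assert!(client.replies.is_empty()); // silent teardown after RESET
}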

+ 10 - 4
.github/workflows/ci.yml

@@ -1,6 +1,13 @@
 name: CI
 
-on: [push, pull_request]
+on:
+  push:
+    branches: [ main, master, develop ]
+  pull_request:
+    branches: [ main, master, develop ]
+
+env:
+  CARGO_TERM_COLOR: always
 
 jobs:
 
@@ -8,9 +15,8 @@ jobs:
     runs-on: ubuntu-latest
     steps:
     - uses: actions/checkout@v3
-    - name: build
-      run: cargo build --release
     - name: test
       run: |
+        rustup component add clippy
         sudo apt-get install tcl8.6 tclx
-        make test
+        make ci

+ 0 - 31
.github/workflows/rust.yml

@@ -1,31 +0,0 @@
-name: Rust
-
-on:
-  push:
-    branches: [ main, master, develop ]
-  pull_request:
-    branches: [ main, master, develop ]
-
-env:
-  CARGO_TERM_COLOR: always
-
-jobs:
-  build:
-    runs-on: ubuntu-latest
-
-    steps:
-    - uses: actions/checkout@v2
-    - name: Build
-      run: cargo build --verbose
-    - name: Run tests
-      run: cargo test --all --verbose
-
-  clippy_check:
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v1
-      - run: rustup component add clippy
-      - uses: actions-rs/clippy-check@v1
-        with:
-          token: ${{ secrets.GITHUB_TOKEN }}
-          args: --all-features

+ 21 - 3
Makefile

@@ -1,11 +1,26 @@
+fmt:
+	cargo fmt
+clippy:
+	cargo clippy --release
 build:
 	cargo build --release
+test-single: build
+	./runtest  --clients 1 \
+		--single unit/other \
+		--ignore-encoding \
+		--tags -needs:repl \
+		--tags -leaks \
+		--tags -needs:debug \
+		--tags -needs:save \
+		--tags -external:skip \
+		--tags -consistency \
+		--tags -cli \
+		--tags -needs:config-maxmemory
 test: build
 	./runtest  --clients 5 \
 		--skipunit unit/dump \
 		--skipunit unit/auth \
 		--skipunit unit/protocol \
-		--skipunit unit/keyspace \
 		--skipunit unit/scan \
 		--skipunit unit/info \
 		--skipunit unit/type/zset \
@@ -13,7 +28,6 @@ test: build
 		--skipunit unit/type/stream \
 		--skipunit unit/type/stream-cgroups \
 		--skipunit unit/sort \
-		--skipunit unit/other \
 		--skipunit unit/aofrw \
 		--skipunit unit/acl \
 		--skipunit unit/latency-monitor \
@@ -32,5 +46,9 @@ test: build
 		--tags -needs:repl \
 		--tags -leaks \
 		--tags -needs:debug \
+		--tags -needs:save \
 		--tags -external:skip \
-		--tags -cli --tags -needs:config-maxmemory --stop 2>&1
+		--tags -consistency \
+		--tags -cli \
+		--tags -needs:config-maxmemory
+ci: fmt clippy build test
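
The new ci target chains fmt, clippy, build and test, so the GitHub
workflow above and a local run of make ci execute the same formatting,
lint, build and test steps. Note that cargo fmt rewrites files rather
than failing on formatting drift; cargo fmt -- --check would be the
stricter choice for CI.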

+ 7 - 2
src/cmd/key.rs

@@ -33,7 +33,7 @@ pub async fn copy(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
         Some(
             conn.all_connections()
                 .get_databases()
-                .get(bytes_to_number(&args[4])?)?
+                .get(bytes_to_int(&args[4])?)?
                 .clone(),
         )
     } else {
@@ -157,7 +157,7 @@ pub async fn move_key(conn: &Connection, args: &[Bytes]) -> Result<Value, Error>
     let target_db = conn
         .all_connections()
         .get_databases()
-        .get(bytes_to_number(&args[2])?)?;
+        .get(bytes_to_int(&args[2])?)?;
 
     Ok(if conn.db().move_key(&args[1], target_db)? {
         1.into()
@@ -193,6 +193,11 @@ pub async fn object(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
     }
 }
 
+/// Return a random key from the currently selected database.
+pub async fn randomkey(conn: &Connection, _: &[Bytes]) -> Result<Value, Error> {
+    conn.db().randomkey()
+}
+
 /// Renames key to newkey. It returns an error when key does not exist. If
 /// newkey already exists it is overwritten, when this happens RENAME executes
 /// an implicit DEL operation, so if the deleted key contains a very big value

+ 5 - 4
src/connection/mod.rs

@@ -194,14 +194,15 @@ impl Connection {
         info.watch_keys = vec![];
         info.commands = None;
         info.tx_keys = HashSet::new();
+        drop(info);
 
         let pubsub = self.pubsub();
         let pubsub_client = self.pubsub_client();
         if !pubsub_client.subscriptions().is_empty() {
-            pubsub.unsubscribe(&self.pubsub_client.subscriptions(), self);
+            pubsub.unsubscribe(&self.pubsub_client.subscriptions(), self, false);
         }
         if !pubsub_client.psubscriptions().is_empty() {
-            pubsub.punsubscribe(&self.pubsub_client.psubscriptions(), self);
+            pubsub.punsubscribe(&self.pubsub_client.psubscriptions(), self, false);
         }
     }
 
@@ -284,8 +285,8 @@ impl Connection {
     /// all_connection lists.
     pub fn destroy(self: Arc<Connection>) {
         let pubsub = self.pubsub();
-        pubsub.unsubscribe(&self.pubsub_client.subscriptions(), &self);
-        pubsub.punsubscribe(&self.pubsub_client.psubscriptions(), &self);
+        pubsub.unsubscribe(&self.pubsub_client.subscriptions(), &self, false);
+        pubsub.punsubscribe(&self.pubsub_client.psubscriptions(), &self, false);
         self.all_connections.clone().remove(self);
     }
 

+ 2 - 2
src/connection/pubsub_connection.rs

@@ -45,7 +45,7 @@ impl PubsubClient {
             .map(|channel| meta.psubscriptions.remove(channel))
             .for_each(drop);
         drop(meta);
-        conn.pubsub().punsubscribe(channels, conn);
+        conn.pubsub().punsubscribe(channels, conn, true);
 
         if self.total_subs() == 0 {
             conn.reset();
@@ -60,7 +60,7 @@ impl PubsubClient {
             .map(|channel| meta.subscriptions.remove(channel))
             .for_each(drop);
         drop(meta);
-        conn.pubsub().unsubscribe(channels, conn);
+        conn.pubsub().unsubscribe(channels, conn, true);
 
         if self.total_subs() == 0 {
             conn.reset();

+ 16 - 12
src/connection/pubsub_server.rs

@@ -129,7 +129,7 @@ impl Pubsub {
     }
 
     /// Unsubscribe from a pattern subscription
-    pub fn punsubscribe(&self, channels: &[Pattern], conn: &Connection) {
+    pub fn punsubscribe(&self, channels: &[Pattern], conn: &Connection, notify: bool) {
         if channels.is_empty() {
             return conn.append_response(Value::Array(vec![
                 "punsubscribe".into(),
@@ -149,11 +149,13 @@ impl Pubsub {
                     }
                 }
 
-                conn.append_response(Value::Array(vec![
-                    "punsubscribe".into(),
-                    channel.as_str().into(),
-                    conn.pubsub_client().total_subs().into(),
-                ]));
+                if notify {
+                    conn.append_response(Value::Array(vec![
+                        "punsubscribe".into(),
+                        channel.as_str().into(),
+                        conn.pubsub_client().total_subs().into(),
+                    ]));
+                }
             })
             .for_each(drop);
     }
@@ -188,7 +190,7 @@ impl Pubsub {
     }
 
     /// Removes connection subscription to channels.
-    pub fn unsubscribe(&self, channels: &[Bytes], conn: &Connection) {
+    pub fn unsubscribe(&self, channels: &[Bytes], conn: &Connection, notify: bool) {
         if channels.is_empty() {
             return conn.append_response(Value::Array(vec![
                 "unsubscribe".into(),
@@ -208,11 +210,13 @@ impl Pubsub {
                         all_subs.remove(channel);
                     }
                 }
-                conn.append_response(Value::Array(vec![
-                    "unsubscribe".into(),
-                    Value::new(&channel),
-                    (all_subs.len() + total_psubs).into(),
-                ]));
+                if notify {
+                    conn.append_response(Value::Array(vec![
+                        "unsubscribe".into(),
+                        Value::new(&channel),
+                        (all_subs.len() + total_psubs).into(),
+                    ]));
+                }
             })
             .for_each(drop);
     }

+ 24 - 0
src/db/mod.rs

@@ -20,6 +20,7 @@ use glob::Pattern;
 use log::trace;
 use num_traits::CheckedAdd;
 use parking_lot::{Mutex, RwLock};
+use rand::{prelude::SliceRandom, Rng};
 use seahash::hash;
 use std::{
     collections::HashMap,
@@ -555,6 +556,29 @@ impl Db {
         }
     }
 
+    /// Return a random key from the database
+    pub fn randomkey(&self) -> Result<Value, Error> {
+        let mut rng = rand::thread_rng();
+        let mut candidates = self
+            .slots
+            .iter()
+            .map(|slot| {
+                let slot = slot.read();
+                if slot.is_empty() {
+                    None
+                } else {
+                    slot.iter()
+                        .skip(rng.gen_range(0..slot.len()))
+                        .next()
+                        .map(|(k, _)| k.clone())
+                }
+            })
+            .flatten()
+            .collect::<Vec<Bytes>>();
+        candidates.shuffle(&mut rng);
+        Ok(candidates.first().into())
+    }
+
     /// Renames a key
     pub fn rename(
         &self,
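
The selection strategy in randomkey is two-stage: take one random
entry from each non-empty slot, then shuffle the per-slot candidates
and return the first. A compacted sketch of the same idea over plain
HashMap slots (hypothetical standalone function, not the server's Db
type):

use rand::{prelude::SliceRandom, Rng};
use std::collections::HashMap;

/// One random candidate per non-empty slot, then a shuffle across slots.
fn random_key(slots: &[HashMap<String, String>]) -> Option<String> {
    let mut rng = rand::thread_rng();
    let mut candidates: Vec<String> = slots
        .iter()
        .filter(|slot| !slot.is_empty())
        .filter_map(|slot| {
            // HashMap iteration order is unspecified, so skipping a random
            // number of entries picks an arbitrary key from this slot.
            slot.keys().nth(rng.gen_range(0..slot.len())).cloned()
        })
        .collect();
    candidates.shuffle(&mut rng);
    candidates.pop()
}

Because one candidate survives per slot regardless of slot size, keys
in sparsely populated slots are more likely to be returned. RANDOMKEY
does not promise a uniform distribution, so this is an acceptable
trade-off for avoiding a scan of the whole keyspace.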

+ 9 - 0
src/dispatcher/mod.rs

@@ -605,6 +605,15 @@ dispatcher! {
             1,
             true,
         },
+        RANDOMKEY {
+            cmd::key::randomkey,
+            [Flag::ReadOnly Flag::Random],
+            1,
+            0,
+            0,
+            0,
+            true,
+        },
         RENAME {
             cmd::key::rename,
             [Flag::Write],
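
In Redis's command table RANDOMKEY is flagged readonly and random: it
performs no writes and may return different results for identical
inputs, which matters for script replication. Presumably Flag::ReadOnly
and Flag::Random in the dispatcher entry above mirror those semantics.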

+ 1 - 1
src/error.rs

@@ -83,7 +83,7 @@ pub enum Error {
     #[error("GT and LT options at the same time are not compatible")]
     InvalidExpireOpts,
     /// The connection is not in a transaction
-    #[error(" without MULTI")]
+    #[error("without MULTI")]
     NotInTx,
     /// Transaction was aborted
     #[error("Transaction discarded because of previous errors.")]

+ 1 - 91
tests/unit/keyspace.tcl

@@ -282,37 +282,6 @@ start_server {tags {"keyspace"}} {
         assert_equal $digest [debug_digest_value newset2{t}]
     }
 
-    test {COPY basic usage for ziplist sorted set} {
-        r del zset1{t} newzset1{t}
-        r zadd zset1{t} 123 foobar
-        assert_encoding ziplist zset1{t}
-        r copy zset1{t} newzset1{t}
-        set digest [debug_digest_value zset1{t}]
-        assert_equal $digest [debug_digest_value newzset1{t}]
-        assert_equal 1 [r object refcount zset1{t}]
-        assert_equal 1 [r object refcount newzset1{t}]
-        r del zset1{t}
-        assert_equal $digest [debug_digest_value newzset1{t}]
-    }
-
-     test {COPY basic usage for skiplist sorted set} {
-        r del zset2{t} newzset2{t}
-        set original_max [lindex [r config get zset-max-ziplist-entries] 1]
-        r config set zset-max-ziplist-entries 0
-        for {set j 0} {$j < 130} {incr j} {
-            r zadd zset2{t} [randomInt 50] ele-[randomInt 10]
-        }
-        assert_encoding skiplist zset2{t}
-        r copy zset2{t} newzset2{t}
-        set digest [debug_digest_value zset2{t}]
-        assert_equal $digest [debug_digest_value newzset2{t}]
-        assert_equal 1 [r object refcount zset2{t}]
-        assert_equal 1 [r object refcount newzset2{t}]
-        r del zset2{t}
-        assert_equal $digest [debug_digest_value newzset2{t}]
-        r config set zset-max-ziplist-entries $original_max
-    }
-
     test {COPY basic usage for listpack hash} {
         r del hash1{t} newhash1{t}
         r hset hash1{t} tmp 17179869184
@@ -326,67 +295,8 @@ start_server {tags {"keyspace"}} {
         assert_equal $digest [debug_digest_value newhash1{t}]
     }
 
-    test {COPY basic usage for hashtable hash} {
-        r del hash2{t} newhash2{t}
-        set original_max [lindex [r config get hash-max-ziplist-entries] 1]
-        r config set hash-max-ziplist-entries 0
-        for {set i 0} {$i < 64} {incr i} {
-            r hset hash2{t} [randomValue] [randomValue]
-        }
-        assert_encoding hashtable hash2{t}
-        r copy hash2{t} newhash2{t}
-        set digest [debug_digest_value hash2{t}]
-        assert_equal $digest [debug_digest_value newhash2{t}]
-        assert_equal 1 [r object refcount hash2{t}]
-        assert_equal 1 [r object refcount newhash2{t}]
-        r del hash2{t}
-        assert_equal $digest [debug_digest_value newhash2{t}]
-        r config set hash-max-ziplist-entries $original_max
-    }
-
-    test {COPY basic usage for stream} {
-        r del mystream{t} mynewstream{t}
-        for {set i 0} {$i < 1000} {incr i} {
-            r XADD mystream{t} * item 2 value b
-        }
-        r copy mystream{t} mynewstream{t}
-        set digest [debug_digest_value mystream{t}]
-        assert_equal $digest [debug_digest_value mynewstream{t}]
-        assert_equal 1 [r object refcount mystream{t}]
-        assert_equal 1 [r object refcount mynewstream{t}]
-        r del mystream{t}
-        assert_equal $digest [debug_digest_value mynewstream{t}]
-    }
-
-    test {COPY basic usage for stream-cgroups} {
-        r del x{t}
-        r XADD x{t} 100 a 1
-        set id [r XADD x{t} 101 b 1]
-        r XADD x{t} 102 c 1
-        r XADD x{t} 103 e 1
-        r XADD x{t} 104 f 1
-        r XADD x{t} 105 g 1
-        r XGROUP CREATE x{t} g1 0
-        r XGROUP CREATE x{t} g2 0
-        r XREADGROUP GROUP g1 Alice COUNT 1 STREAMS x{t} >
-        r XREADGROUP GROUP g1 Bob COUNT 1 STREAMS x{t} >
-        r XREADGROUP GROUP g1 Bob NOACK COUNT 1 STREAMS x{t} >
-        r XREADGROUP GROUP g2 Charlie COUNT 4 STREAMS x{t} >
-        r XGROUP SETID x{t} g1 $id
-        r XREADGROUP GROUP g1 Dave COUNT 3 STREAMS x{t} >
-        r XDEL x{t} 103
-
-        r copy x{t} newx{t}
-        set info [r xinfo stream x{t} full]
-        assert_equal $info [r xinfo stream newx{t} full]
-        assert_equal 1 [r object refcount x{t}]
-        assert_equal 1 [r object refcount newx{t}]
-        r del x{t}
-        assert_equal $info [r xinfo stream newx{t} full]
-        r flushdb
-    }
-
     test {MOVE basic usage} {
+        r flushdb
         r set mykey foobar
         r move mykey 10
         set res {}

+ 0 - 120
tests/unit/other.tcl

@@ -54,60 +54,6 @@ start_server {tags {"other"}} {
         set _ $err
     } {*index is out of range*} {cluster:skip}
 
-    tags {consistency} {
-        proc check_consistency {dumpname code} {
-            set dump [csvdump r]
-            set sha1 [r debug digest]
-
-            uplevel 1 $code
-
-            set sha1_after [r debug digest]
-            if {$sha1 eq $sha1_after} {
-                return 1
-            }
-
-            # Failed
-            set newdump [csvdump r]
-            puts "Consistency test failed!"
-            puts "You can inspect the two dumps in /tmp/${dumpname}*.txt"
-
-            set fd [open /tmp/${dumpname}1.txt w]
-            puts $fd $dump
-            close $fd
-            set fd [open /tmp/${dumpname}2.txt w]
-            puts $fd $newdump
-            close $fd
-
-            return 0
-        }
-
-        if {$::accurate} {set numops 10000} else {set numops 1000}
-        test {Check consistency of different data types after a reload} {
-            r flushdb
-            createComplexDataset r $numops usetag
-            if {$::ignoredigest} {
-                set _ 1
-            } else {
-                check_consistency {repldump} {
-                    r debug reload
-                }
-            }
-        } {1}
-
-        test {Same dataset digest if saving/reloading as AOF?} {
-            if {$::ignoredigest} {
-                set _ 1
-            } else {
-                check_consistency {aofdump} {
-                    r config set aof-use-rdb-preamble no
-                    r bgrewriteaof
-                    waitForBgrewriteaof r
-                    r debug loadaof
-                }
-            }
-        } {1} {needs:debug}
-    }
-
     test {EXPIRES after a reload (snapshot + append only file rewrite)} {
         r flushdb
         r set x 10
@@ -164,41 +110,6 @@ start_server {tags {"other"}} {
         r config set appendonly no
     } {OK} {needs:debug}
 
-    tags {protocol} {
-        test {PIPELINING stresser (also a regression for the old epoll bug)} {
-            if {$::tls} {
-                set fd2 [::tls::socket [srv host] [srv port]]
-            } else {
-                set fd2 [socket [srv host] [srv port]]
-            }
-            fconfigure $fd2 -encoding binary -translation binary
-            puts -nonewline $fd2 "SELECT 9\r\n"
-            flush $fd2
-            gets $fd2
-
-            for {set i 0} {$i < 100000} {incr i} {
-                set q {}
-                set val "0000${i}0000"
-                append q "SET key:$i $val\r\n"
-                puts -nonewline $fd2 $q
-                set q {}
-                append q "GET key:$i\r\n"
-                puts -nonewline $fd2 $q
-            }
-            flush $fd2
-
-            for {set i 0} {$i < 100000} {incr i} {
-                gets $fd2 line
-                gets $fd2 count
-                set count [string range $count 1 end]
-                set val [read $fd2 $count]
-                read $fd2 2
-            }
-            close $fd2
-            set _ 1
-        } {1}
-    }
-
     test {APPEND basics} {
         r del foo
         list [r append foo bar] [r get foo] \
@@ -255,27 +166,6 @@ start_server {tags {"other"}} {
         r save
     } {OK} {needs:save}
 
-    test {RESET clears client state} {
-        r client setname test-client
-        r client tracking on
-
-        assert_equal [r reset] "RESET"
-        set client [r client list]
-        assert_match {*name= *} $client
-        assert_match {*flags=N *} $client
-    } {} {needs:reset}
-
-    test {RESET clears MONITOR state} {
-        set rd [redis_deferring_client]
-        $rd monitor
-        assert_equal [$rd read] "OK"
-
-        $rd reset
-        assert_equal [$rd read] "RESET"
-
-        assert_no_match {*flags=O*} [r client list]
-    } {} {needs:reset}
-
     test {RESET clears and discards MULTI state} {
         r multi
         r set key-a a
@@ -292,16 +182,6 @@ start_server {tags {"other"}} {
         # confirm we're not subscribed by executing another command
         r set key val
     } {OK} {needs:reset}
-
-    test {RESET clears authenticated state} {
-        r acl setuser user1 on >secret +@all
-        r auth user1 secret
-        assert_equal [r acl whoami] user1
-
-        r reset
-
-        assert_equal [r acl whoami] default
-    } {} {needs:reset}
 }
 
 start_server {tags {"other external:skip"}} {