| 
					
				 | 
			
			
				@@ -250,6 +250,135 @@ mod test { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         cmd::test::{create_connection, run_command}, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         value::Value, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     }; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    use tokio::time::{sleep, Duration, Instant}; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    #[tokio::test] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    async fn blpop_no_waiting() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let c = create_connection(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        assert_eq!( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Ok(Value::Integer(5)), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        assert_eq!( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Ok(Value::Array(vec![ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                Value::Blob("foo".into()), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                Value::Blob("5".into()), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            ])), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            run_command(&c, &["blpop", "foo", "1"]).await 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    #[tokio::test] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    async fn blpop_timeout() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let c = create_connection(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let x = Instant::now(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        assert_eq!( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Ok(Value::Null), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            run_command(&c, &["blpop", "foobar", "1"]).await 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        assert!(Instant::now() - x > Duration::from_millis(1000)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    #[tokio::test] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    async fn blpop_wait_insert() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let c = create_connection(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let x = Instant::now(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Query command that will block connection until some data is inserted 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // to foobar, foo, bar or the 5 seconds timeout happens. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // We are issuing the command, sleeping a little bit then adding data to 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // foobar, before actually waiting on the result. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let waiting = run_command(&c, &["blpop", "foobar", "foo", "bar", "5"]); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Sleep 1 second before inserting new data 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        sleep(Duration::from_millis(1000)).await; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        assert_eq!( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Ok(Value::Integer(5)), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Read the output of the first blpop command now. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        assert_eq!( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Ok(Value::Array(vec![ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                Value::Blob("foo".into()), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                Value::Blob("5".into()), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            ])), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            waiting.await 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        assert!(Instant::now() - x > Duration::from_millis(1000)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        assert!(Instant::now() - x < Duration::from_millis(5000)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    #[tokio::test] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    async fn brpop_no_waiting() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let c = create_connection(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        assert_eq!( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Ok(Value::Integer(5)), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        assert_eq!( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Ok(Value::Array(vec![ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                Value::Blob("foo".into()), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                Value::Blob("5".into()), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            ])), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            run_command(&c, &["brpop", "foo", "1"]).await 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    #[tokio::test] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    async fn brpop_timeout() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let c = create_connection(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let x = Instant::now(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        assert_eq!( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Ok(Value::Null), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            run_command(&c, &["brpop", "foobar", "1"]).await 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        assert!(Instant::now() - x > Duration::from_millis(1000)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    #[tokio::test] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    async fn brpop_wait_insert() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let c = create_connection(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let x = Instant::now(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Query command that will block connection until some data is inserted 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // to foobar, foo, bar or the 5 seconds timeout happens. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // We are issuing the command, sleeping a little bit then adding data to 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // foobar, before actually waiting on the result. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let waiting = run_command(&c, &["brpop", "foobar", "foo", "bar", "5"]); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Sleep 1 second before inserting new data 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        sleep(Duration::from_millis(1000)).await; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        assert_eq!( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Ok(Value::Integer(5)), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Read the output of the first blpop command now. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        assert_eq!( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Ok(Value::Array(vec![ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                Value::Blob("foo".into()), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                Value::Blob("5".into()), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            ])), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            waiting.await 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        assert!(Instant::now() - x > Duration::from_millis(1000)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        assert!(Instant::now() - x < Duration::from_millis(5000)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     #[tokio::test] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     async fn lindex() { 
			 |