|
@@ -917,7 +917,7 @@ pub async fn rpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
#[cfg(test)]
|
|
|
mod test {
|
|
|
use crate::{
|
|
|
- cmd::test::{create_connection, run_command},
|
|
|
+ cmd::test::{create_connection, create_connection_and_pubsub, run_command},
|
|
|
error::Error,
|
|
|
value::Value,
|
|
|
};
|
|
@@ -925,7 +925,7 @@ mod test {
|
|
|
|
|
|
#[tokio::test]
|
|
|
async fn blpop_no_waiting() {
|
|
|
- let c = create_connection();
|
|
|
+ let (mut recv, c) = create_connection_and_pubsub();
|
|
|
|
|
|
assert_eq!(
|
|
|
Ok(Value::Integer(5)),
|
|
@@ -933,30 +933,37 @@ mod test {
|
|
|
);
|
|
|
|
|
|
assert_eq!(
|
|
|
- Ok(Value::Array(vec![
|
|
|
+ Ok(Value::Ignore),
|
|
|
+ run_command(&c, &["blpop", "foo", "1"]).await
|
|
|
+ );
|
|
|
+
|
|
|
+ assert_eq!(
|
|
|
+ Some(Value::Array(vec![
|
|
|
Value::Blob("foo".into()),
|
|
|
Value::Blob("5".into()),
|
|
|
])),
|
|
|
- run_command(&c, &["blpop", "foo", "1"]).await
|
|
|
+ recv.recv().await
|
|
|
);
|
|
|
}
|
|
|
|
|
|
#[tokio::test]
|
|
|
async fn blpop_timeout() {
|
|
|
- let c = create_connection();
|
|
|
+ let (mut recv, c) = create_connection_and_pubsub();
|
|
|
let x = Instant::now();
|
|
|
|
|
|
assert_eq!(
|
|
|
- Ok(Value::Null),
|
|
|
+ Ok(Value::Ignore),
|
|
|
run_command(&c, &["blpop", "foobar", "1"]).await
|
|
|
);
|
|
|
|
|
|
- assert!(Instant::now() - x <= Duration::from_millis(1000));
|
|
|
+ assert_eq!(Some(Value::Null), recv.recv().await,);
|
|
|
+
|
|
|
+ assert!(Instant::now() - x >= Duration::from_millis(1000));
|
|
|
}
|
|
|
|
|
|
#[tokio::test]
|
|
|
async fn blpop_wait_insert() {
|
|
|
- let c = create_connection();
|
|
|
+ let (mut recv, c) = create_connection_and_pubsub();
|
|
|
let x = Instant::now();
|
|
|
|
|
|
// Query command that will block connection until some data is inserted
|
|
@@ -964,7 +971,10 @@ mod test {
|
|
|
//
|
|
|
// We are issuing the command, sleeping a little bit then adding data to
|
|
|
// foobar, before actually waiting on the result.
|
|
|
- let waiting = run_command(&c, &["blpop", "foobar", "foo", "bar", "5"]);
|
|
|
+ assert_eq!(
|
|
|
+ Ok(Value::Ignore),
|
|
|
+ run_command(&c, &["blpop", "foobar", "foo", "bar", "5"]).await
|
|
|
+ );
|
|
|
|
|
|
// Sleep 1 second before inserting new data
|
|
|
sleep(Duration::from_millis(1000)).await;
|
|
@@ -976,11 +986,11 @@ mod test {
|
|
|
|
|
|
// Read the output of the first blpop command now.
|
|
|
assert_eq!(
|
|
|
- Ok(Value::Array(vec![
|
|
|
+ Some(Value::Array(vec![
|
|
|
Value::Blob("foo".into()),
|
|
|
Value::Blob("5".into()),
|
|
|
])),
|
|
|
- waiting.await
|
|
|
+ recv.recv().await,
|
|
|
);
|
|
|
|
|
|
assert!(Instant::now() - x > Duration::from_millis(1000));
|
|
@@ -1105,7 +1115,7 @@ mod test {
|
|
|
|
|
|
#[tokio::test]
|
|
|
async fn brpop_no_waiting() {
|
|
|
- let c = create_connection();
|
|
|
+ let (mut recv, c) = create_connection_and_pubsub();
|
|
|
|
|
|
assert_eq!(
|
|
|
Ok(Value::Integer(5)),
|
|
@@ -1113,30 +1123,37 @@ mod test {
|
|
|
);
|
|
|
|
|
|
assert_eq!(
|
|
|
- Ok(Value::Array(vec![
|
|
|
+ Ok(Value::Ignore),
|
|
|
+ run_command(&c, &["brpop", "foo", "1"]).await
|
|
|
+ );
|
|
|
+
|
|
|
+ assert_eq!(
|
|
|
+ Some(Value::Array(vec![
|
|
|
Value::Blob("foo".into()),
|
|
|
Value::Blob("5".into()),
|
|
|
])),
|
|
|
- run_command(&c, &["brpop", "foo", "1"]).await
|
|
|
+ recv.recv().await,
|
|
|
);
|
|
|
}
|
|
|
|
|
|
#[tokio::test]
|
|
|
async fn brpop_timeout() {
|
|
|
- let c = create_connection();
|
|
|
+ let (mut recv, c) = create_connection_and_pubsub();
|
|
|
let x = Instant::now();
|
|
|
|
|
|
assert_eq!(
|
|
|
- Ok(Value::Null),
|
|
|
+ Ok(Value::Ignore),
|
|
|
run_command(&c, &["brpop", "foobar", "1"]).await
|
|
|
);
|
|
|
|
|
|
- assert!(Instant::now() - x < Duration::from_millis(1000));
|
|
|
+ assert_eq!(Some(Value::Null), recv.recv().await,);
|
|
|
+
|
|
|
+ assert!(Instant::now() - x >= Duration::from_millis(1000));
|
|
|
}
|
|
|
|
|
|
#[tokio::test]
|
|
|
async fn brpop_wait_insert() {
|
|
|
- let c = create_connection();
|
|
|
+ let (mut recv, c) = create_connection_and_pubsub();
|
|
|
let x = Instant::now();
|
|
|
|
|
|
// Query command that will block connection until some data is inserted
|
|
@@ -1144,7 +1161,10 @@ mod test {
|
|
|
//
|
|
|
// We are issuing the command, sleeping a little bit then adding data to
|
|
|
// foobar, before actually waiting on the result.
|
|
|
- let waiting = run_command(&c, &["brpop", "foobar", "foo", "bar", "5"]);
|
|
|
+ assert_eq!(
|
|
|
+ Ok(Value::Ignore),
|
|
|
+ run_command(&c, &["brpop", "foobar", "foo", "bar", "5"]).await
|
|
|
+ );
|
|
|
|
|
|
// Sleep 1 second before inserting new data
|
|
|
sleep(Duration::from_millis(1000)).await;
|
|
@@ -1156,11 +1176,11 @@ mod test {
|
|
|
|
|
|
// Read the output of the first blpop command now.
|
|
|
assert_eq!(
|
|
|
- Ok(Value::Array(vec![
|
|
|
+ Some(Value::Array(vec![
|
|
|
Value::Blob("foo".into()),
|
|
|
Value::Blob("5".into()),
|
|
|
])),
|
|
|
- waiting.await
|
|
|
+ recv.recv().await,
|
|
|
);
|
|
|
|
|
|
assert!(Instant::now() - x > Duration::from_millis(1000));
|