|
@@ -1,7 +1,7 @@
|
|
|
//! # List command handlers
|
|
|
use crate::{
|
|
|
check_arg,
|
|
|
- connection::{Connection, ConnectionStatus, UnblockReason},
|
|
|
+ connection::{Connection, UnblockReason},
|
|
|
db::utils::far_future,
|
|
|
error::Error,
|
|
|
try_get_arg, try_get_arg_str,
|
|
@@ -11,9 +11,9 @@ use crate::{
|
|
|
};
|
|
|
use bytes::Bytes;
|
|
|
use futures::{stream::FuturesUnordered, Future, StreamExt};
|
|
|
-use std::{collections::VecDeque, ops::Deref, sync::Arc};
|
|
|
+use std::{collections::VecDeque, sync::Arc};
|
|
|
use tokio::{
|
|
|
- sync::broadcast::{self, error::RecvError, Receiver},
|
|
|
+ sync::broadcast::{self, Receiver},
|
|
|
time::{sleep, Duration, Instant},
|
|
|
};
|
|
|
|
|
@@ -74,9 +74,8 @@ fn remove_element(
|
|
|
}
|
|
|
|
|
|
#[inline]
|
|
|
-async fn wait_for_event(receiver: &mut Receiver<()>) -> () {
|
|
|
+async fn wait_for_event(receiver: &mut Receiver<()>) {
|
|
|
let _ = receiver.recv().await;
|
|
|
- ()
|
|
|
}
|
|
|
|
|
|
#[inline]
|
|
@@ -93,10 +92,10 @@ async fn schedule_blocking_task<F, T>(
|
|
|
conn.block();
|
|
|
|
|
|
let mut timeout_rx = if let Some(timeout) = timeout {
|
|
|
- let (mut timeout_sx, timeout_rx) = broadcast::channel::<()>(1);
|
|
|
+ let (timeout_sx, timeout_rx) = broadcast::channel::<()>(1);
|
|
|
// setup timeout triggering event
|
|
|
let conn_for_timeout = conn.clone();
|
|
|
- let keys_to_watch_for_timeout = keys_to_watch.clone();
|
|
|
+ let _keys_to_watch_for_timeout = keys_to_watch.clone();
|
|
|
let block_id = conn.get_block_id();
|
|
|
tokio::spawn(async move {
|
|
|
sleep(timeout - Instant::now()).await;
|
|
@@ -107,7 +106,7 @@ async fn schedule_blocking_task<F, T>(
|
|
|
conn_for_timeout.unblock(UnblockReason::Timeout);
|
|
|
conn_for_timeout.append_response(Value::Null);
|
|
|
// Notify timeout event to the worker thread
|
|
|
- timeout_sx.send(());
|
|
|
+ let _ = timeout_sx.send(());
|
|
|
});
|
|
|
Some(timeout_rx)
|
|
|
} else {
|
|
@@ -144,7 +143,7 @@ async fn schedule_blocking_task<F, T>(
|
|
|
|
|
|
let mut futures = changes_watchers
|
|
|
.iter_mut()
|
|
|
- .map(|c| wait_for_event(c))
|
|
|
+ .map(wait_for_event)
|
|
|
.collect::<FuturesUnordered<_>>();
|
|
|
|
|
|
if let Some(ref mut timeout_rx) = &mut timeout_rx {
|
|
@@ -202,12 +201,12 @@ pub async fn blpop(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
|
|
|
};
|
|
|
|
|
|
if conn.is_executing_tx() {
|
|
|
- return blpop_task(conn.clone(), args, 1).await;
|
|
|
+ return blpop_task(conn.get_connection(), args, 1).await;
|
|
|
}
|
|
|
|
|
|
let timeout = parse_timeout(&args.pop_back().ok_or(Error::Syntax)?)?;
|
|
|
- let conn = conn.clone();
|
|
|
- let keys_to_watch = args.iter().map(|c| c.clone()).collect::<Vec<_>>();
|
|
|
+ let conn = conn.get_connection();
|
|
|
+ let keys_to_watch = args.iter().cloned().collect::<Vec<_>>();
|
|
|
|
|
|
conn.block();
|
|
|
|
|
@@ -229,14 +228,14 @@ pub async fn blpop(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
|
|
|
/// See LMOVE for more information.
|
|
|
pub async fn blmove(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
if conn.is_executing_tx() {
|
|
|
- return lmove(&conn, args).await;
|
|
|
+ return lmove(conn, args).await;
|
|
|
}
|
|
|
|
|
|
let timeout = parse_timeout(&args.pop_back().ok_or(Error::Syntax)?)?;
|
|
|
let keys_to_watch = vec![args[0].clone(), args[1].clone()];
|
|
|
|
|
|
schedule_blocking_task(
|
|
|
- conn.clone(),
|
|
|
+ conn.get_connection(),
|
|
|
keys_to_watch,
|
|
|
|conn, args, _| async move { lmove(&conn, args).await },
|
|
|
args,
|
|
@@ -288,13 +287,20 @@ pub async fn brpop(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
|
|
|
};
|
|
|
|
|
|
if conn.is_executing_tx() {
|
|
|
- return brpop_task(conn.clone(), args, 1).await;
|
|
|
+ return brpop_task(conn.get_connection(), args, 1).await;
|
|
|
}
|
|
|
|
|
|
let timeout = parse_timeout(&args.pop_back().ok_or(Error::Syntax)?)?;
|
|
|
let keys_to_watch = args.iter().cloned().collect();
|
|
|
|
|
|
- schedule_blocking_task(conn.clone(), keys_to_watch, brpop_task, args, timeout).await;
|
|
|
+ schedule_blocking_task(
|
|
|
+ conn.get_connection(),
|
|
|
+ keys_to_watch,
|
|
|
+ brpop_task,
|
|
|
+ args,
|
|
|
+ timeout,
|
|
|
+ )
|
|
|
+ .await;
|
|
|
|
|
|
Ok(Value::Ignore)
|
|
|
}
|
|
@@ -306,13 +312,11 @@ pub async fn brpop(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
|
|
|
pub async fn lindex(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
conn.db().get_map(&args[0], |v| match v {
|
|
|
Some(Value::List(x)) => {
|
|
|
- let mut index: i64 = bytes_to_number(&args[1])?;
|
|
|
+ let index: i64 = bytes_to_number(&args[1])?;
|
|
|
let x = x.read();
|
|
|
|
|
|
let index = if index < 0 {
|
|
|
- x.len()
|
|
|
- .checked_sub((index * -1) as usize)
|
|
|
- .unwrap_or(x.len())
|
|
|
+ x.len().checked_sub(-index as usize).unwrap_or(x.len())
|
|
|
} else {
|
|
|
index as usize
|
|
|
};
|
|
@@ -357,7 +361,7 @@ pub async fn linsert(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Val
|
|
|
if id > x.len() {
|
|
|
x.push_back(value);
|
|
|
} else {
|
|
|
- x.insert(id as usize, value);
|
|
|
+ x.insert(id, value);
|
|
|
}
|
|
|
|
|
|
found = true;
|
|
@@ -416,7 +420,7 @@ pub async fn lmove(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
|
|
|
|
|
|
let db = conn.db();
|
|
|
|
|
|
- /// Lock keys to alter exclusively
|
|
|
+ // Lock keys to alter exclusively
|
|
|
db.lock_keys(&to_lock);
|
|
|
|
|
|
let mut to_create = None;
|
|
@@ -470,7 +474,7 @@ pub async fn lmove(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
|
|
|
conn.db().set(destination.clone(), to_create.into(), None);
|
|
|
}
|
|
|
|
|
|
- /// release the lock on keys
|
|
|
+ // release the lock on keys
|
|
|
db.unlock_keys(&to_lock);
|
|
|
|
|
|
if result != Ok(Value::Null) {
|
|
@@ -488,7 +492,7 @@ pub async fn lmove(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value
|
|
|
/// the list's length.
|
|
|
pub async fn lpop(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
let count = match args.get(1) {
|
|
|
- Some(v) => Some(bytes_to_number(&v)?),
|
|
|
+ Some(v) => Some(bytes_to_number(v)?),
|
|
|
None => None,
|
|
|
};
|
|
|
|
|
@@ -513,9 +517,9 @@ pub async fn lpos(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Err
|
|
|
|
|
|
let next = try_get_arg!(args, index + 1);
|
|
|
match try_get_arg_str!(args, index).to_uppercase().as_str() {
|
|
|
- "RANK" => rank = Some(bytes_to_number::<i64>(&next)?),
|
|
|
- "COUNT" => count = Some(bytes_to_number::<usize>(&next)?),
|
|
|
- "MAXLEN" => max_len = Some(bytes_to_number::<usize>(&next)?),
|
|
|
+ "RANK" => rank = Some(bytes_to_number::<i64>(next)?),
|
|
|
+ "COUNT" => count = Some(bytes_to_number::<usize>(next)?),
|
|
|
+ "MAXLEN" => max_len = Some(bytes_to_number::<usize>(next)?),
|
|
|
_ => return Err(Error::Syntax),
|
|
|
}
|
|
|
|
|
@@ -527,7 +531,7 @@ pub async fn lpos(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Err
|
|
|
return Err(Error::InvalidRank("RANK".to_owned()));
|
|
|
}
|
|
|
if rank < 0 {
|
|
|
- (true, Some((rank * -1) as usize))
|
|
|
+ (true, Some(-rank as usize))
|
|
|
} else {
|
|
|
(false, Some(rank as usize))
|
|
|
}
|
|
@@ -583,23 +587,19 @@ pub async fn lpos(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Err
|
|
|
let rank = rank - 1;
|
|
|
|
|
|
let result = if rank < result.len() {
|
|
|
- (&result[rank..]).to_vec()
|
|
|
+ result[rank..].to_vec()
|
|
|
} else {
|
|
|
vec![]
|
|
|
};
|
|
|
|
|
|
return Ok(if let Some(count) = count {
|
|
|
if count > 0 && count < result.len() {
|
|
|
- (&result[0..count]).to_vec().into()
|
|
|
+ result[0..count].to_vec().into()
|
|
|
} else {
|
|
|
result.to_vec().into()
|
|
|
}
|
|
|
} else {
|
|
|
- result
|
|
|
- .to_vec()
|
|
|
- .get(0)
|
|
|
- .map(|c| c.clone())
|
|
|
- .unwrap_or_default()
|
|
|
+ result.to_vec().first().cloned().unwrap_or_default()
|
|
|
});
|
|
|
}
|
|
|
|
|
@@ -666,9 +666,7 @@ pub async fn lpushx(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Valu
|
|
|
}
|
|
|
Ok(x.len().into())
|
|
|
}
|
|
|
- None => {
|
|
|
- return Ok(0.into());
|
|
|
- }
|
|
|
+ None => Ok(0.into()),
|
|
|
_ => Err(Error::WrongType),
|
|
|
})?;
|
|
|
|
|
@@ -691,18 +689,16 @@ pub async fn lrange(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, E
|
|
|
let x = x.read();
|
|
|
|
|
|
let start = if start < 0 {
|
|
|
- x.len()
|
|
|
- .checked_sub((start * -1) as usize)
|
|
|
- .unwrap_or_default()
|
|
|
+ x.len().checked_sub(-start as usize).unwrap_or_default()
|
|
|
} else {
|
|
|
- (start as usize)
|
|
|
+ start as usize
|
|
|
};
|
|
|
|
|
|
let end = if end < 0 {
|
|
|
- if let Some(x) = x.len().checked_sub((end * -1) as usize) {
|
|
|
+ if let Some(x) = x.len().checked_sub(-end as usize) {
|
|
|
x
|
|
|
} else {
|
|
|
- return Ok(Value::Array((vec![])));
|
|
|
+ return Ok(Value::Array(vec![]));
|
|
|
}
|
|
|
} else {
|
|
|
end as usize
|
|
@@ -847,7 +843,7 @@ pub async fn ltrim(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Er
|
|
|
/// list's length.
|
|
|
pub async fn rpop(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
let count = match args.get(1) {
|
|
|
- Some(v) => Some(bytes_to_number(&v)?),
|
|
|
+ Some(v) => Some(bytes_to_number(v)?),
|
|
|
None => None,
|
|
|
};
|
|
|
|