|
@@ -87,10 +87,10 @@ async fn schedule_blocking_task<F, T>(
|
|
|
conn: Arc<Connection>,
|
|
|
keys_to_watch: Vec<Bytes>,
|
|
|
worker: F,
|
|
|
- args: Vec<Bytes>,
|
|
|
+ args: VecDeque<Bytes>,
|
|
|
timeout: Option<Instant>,
|
|
|
) where
|
|
|
- F: Fn(Arc<Connection>, Vec<Bytes>, usize) -> T + Send + Sync + 'static,
|
|
|
+ F: Fn(Arc<Connection>, VecDeque<Bytes>, usize) -> T + Send + Sync + 'static,
|
|
|
T: Future<Output = Result<Value, Error>> + Send + Sync + 'static,
|
|
|
{
|
|
|
let (mut timeout_sx, mut timeout_rx) = broadcast::channel::<()>(1);
|
|
@@ -124,7 +124,7 @@ async fn schedule_blocking_task<F, T>(
|
|
|
|
|
|
loop {
|
|
|
// Run task
|
|
|
- match worker(conn.clone(), args.to_vec(), attempt).await {
|
|
|
+ match worker(conn.clone(), args.clone(), attempt).await {
|
|
|
Ok(Value::Ignore | Value::Null) => {}
|
|
|
Ok(result) => {
|
|
|
conn.append_response(result);
|
|
@@ -183,13 +183,12 @@ fn parse_timeout(arg: &Bytes) -> Result<Option<Instant>, Error> {
|
|
|
/// the connection when there are no elements to pop from any of the given lists. An element is
|
|
|
/// popped from the head of the first list that is non-empty, with the given keys being checked in
|
|
|
/// the order that they are given.
|
|
|
-pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
- let blpop_task = |conn: Arc<Connection>, args: Vec<Bytes>, attempt| async move {
|
|
|
- for key in (1..args.len() - 1) {
|
|
|
- let key = &args[key];
|
|
|
+pub async fn blpop(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
+ let blpop_task = |conn: Arc<Connection>, args: VecDeque<Bytes>, attempt| async move {
|
|
|
+ for key in args.iter() {
|
|
|
match remove_element(&conn, key, None, true) {
|
|
|
Ok(Value::Null) => (),
|
|
|
- Ok(n) => return Ok(vec![Value::new(&key), n].into()),
|
|
|
+ Ok(n) => return Ok(vec![Value::Blob(key.clone()), n].into()),
|
|
|
Err(x) => {
|
|
|
if attempt == 1 {
|
|
|
return Err(x);
|
|
@@ -201,13 +200,12 @@ pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
};
|
|
|
|
|
|
if conn.is_executing_tx() {
|
|
|
- return blpop_task(conn.clone(), args.to_vec(), 1).await;
|
|
|
+ return blpop_task(conn.clone(), args, 1).await;
|
|
|
}
|
|
|
|
|
|
let timeout = parse_timeout(&args[args.len() - 1])?;
|
|
|
let conn = conn.clone();
|
|
|
- let args = args.to_vec();
|
|
|
- let keys_to_watch = (&args[1..args.len() - 1]).to_vec();
|
|
|
+ let keys_to_watch = args.iter().map(|c| c.clone()).collect::<Vec<_>>();
|
|
|
|
|
|
conn.block();
|
|
|
|
|
@@ -227,19 +225,19 @@ pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
/// RIGHT LEFT is equivalent.
|
|
|
///
|
|
|
/// See LMOVE for more information.
|
|
|
-pub async fn blmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
+pub async fn blmove(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
if conn.is_executing_tx() {
|
|
|
- return lmove(&conn, &args).await;
|
|
|
+ return lmove(&conn, args).await;
|
|
|
}
|
|
|
|
|
|
- let timeout = parse_timeout(&args[5])?;
|
|
|
- let keys_to_watch = (&args[1..=2]).to_vec();
|
|
|
+ let timeout = parse_timeout(&args[4])?;
|
|
|
+ let keys_to_watch = vec![args[0].clone(), args[1].clone()];
|
|
|
|
|
|
schedule_blocking_task(
|
|
|
conn.clone(),
|
|
|
keys_to_watch,
|
|
|
- |conn, args, _| async move { lmove(&conn, &args).await },
|
|
|
- args.to_vec(),
|
|
|
+ |conn, args, _| async move { lmove(&conn, args).await },
|
|
|
+ args,
|
|
|
timeout,
|
|
|
)
|
|
|
.await;
|
|
@@ -253,17 +251,16 @@ pub async fn blmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
/// is empty, Redis will block the connection until another client pushes to it
|
|
|
/// or until timeout is reached. A timeout of zero can be used to block
|
|
|
/// indefinitely.
|
|
|
-pub async fn brpoplpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
+pub async fn brpoplpush(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
blmove(
|
|
|
conn,
|
|
|
- &[
|
|
|
- "blmove".into(),
|
|
|
- args[1].clone(),
|
|
|
- args[2].clone(),
|
|
|
+ VecDeque::from([
|
|
|
+ args.pop_front().ok_or(Error::Syntax)?,
|
|
|
+ args.pop_front().ok_or(Error::Syntax)?,
|
|
|
"RIGHT".into(),
|
|
|
"LEFT".into(),
|
|
|
- args[3].clone(),
|
|
|
- ],
|
|
|
+ args.pop_front().ok_or(Error::Syntax)?,
|
|
|
+ ]),
|
|
|
)
|
|
|
.await
|
|
|
}
|
|
@@ -272,13 +269,12 @@ pub async fn brpoplpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Erro
|
|
|
/// the connection when there are no elements to pop from any of the given lists. An element is
|
|
|
/// popped from the tail of the first list that is non-empty, with the given keys being checked in
|
|
|
/// the order that they are given.
|
|
|
-pub async fn brpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
- let brpop_task = |conn: Arc<Connection>, args: Vec<Bytes>, attempt| async move {
|
|
|
- for key in (1..args.len() - 1) {
|
|
|
- let key = &args[key];
|
|
|
+pub async fn brpop(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
+ let brpop_task = |conn: Arc<Connection>, args: VecDeque<Bytes>, attempt| async move {
|
|
|
+ for key in args.iter() {
|
|
|
match remove_element(&conn, key, None, false) {
|
|
|
Ok(Value::Null) => (),
|
|
|
- Ok(n) => return Ok(vec![Value::new(&key), n].into()),
|
|
|
+ Ok(n) => return Ok(vec![Value::Blob(key.clone()), n].into()),
|
|
|
Err(x) => {
|
|
|
if attempt == 1 {
|
|
|
return Err(x);
|
|
@@ -290,20 +286,13 @@ pub async fn brpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
};
|
|
|
|
|
|
if conn.is_executing_tx() {
|
|
|
- return brpop_task(conn.clone(), args.to_vec(), 1).await;
|
|
|
+ return brpop_task(conn.clone(), args, 1).await;
|
|
|
}
|
|
|
|
|
|
let timeout = parse_timeout(&args[args.len() - 1])?;
|
|
|
- let keys_to_watch = (&args[1..args.len() - 1]).to_vec();
|
|
|
+ let keys_to_watch = args.iter().cloned().collect();
|
|
|
|
|
|
- schedule_blocking_task(
|
|
|
- conn.clone(),
|
|
|
- keys_to_watch,
|
|
|
- brpop_task,
|
|
|
- args.to_vec(),
|
|
|
- timeout,
|
|
|
- )
|
|
|
- .await;
|
|
|
+ schedule_blocking_task(conn.clone(), keys_to_watch, brpop_task, args, timeout).await;
|
|
|
|
|
|
Ok(Value::Ignore)
|
|
|
}
|
|
@@ -312,12 +301,12 @@ pub async fn brpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
/// means the first element, 1 the second element and so on. Negative indices can be used to
|
|
|
/// designate elements starting at the tail of the list. Here, -1 means the last element, -2 means
|
|
|
/// the penultimate and so forth.
|
|
|
-pub async fn lindex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
+pub async fn lindex(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
conn.db().get_map_or(
|
|
|
- &args[1],
|
|
|
+ &args[0],
|
|
|
|v| match v {
|
|
|
Value::List(x) => {
|
|
|
- let mut index: i64 = bytes_to_number(&args[2])?;
|
|
|
+ let mut index: i64 = bytes_to_number(&args[1])?;
|
|
|
let x = x.read();
|
|
|
|
|
|
let index = if index < 0 {
|
|
@@ -339,20 +328,26 @@ pub async fn lindex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
/// Inserts element in the list stored at key either before or after the reference value pivot.
|
|
|
///
|
|
|
/// When key does not exist, it is considered an empty list and no operation is performed.
|
|
|
-pub async fn linsert(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
- let is_before = if check_arg!(args, 2, "BEFORE") {
|
|
|
- true
|
|
|
- } else if check_arg!(args, 2, "AFTER") {
|
|
|
- false
|
|
|
- } else {
|
|
|
- return Err(Error::Syntax);
|
|
|
+pub async fn linsert(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
+ let key = args.pop_front().ok_or(Error::Syntax)?;
|
|
|
+ let direction = args.pop_front().ok_or(Error::Syntax)?;
|
|
|
+ let pivot = args.pop_front().ok_or(Error::Syntax)?;
|
|
|
+ let value = args.pop_front().ok_or(Error::Syntax)?;
|
|
|
+
|
|
|
+ let is_before = match String::from_utf8_lossy(&direction)
|
|
|
+ .to_ascii_uppercase()
|
|
|
+ .as_str()
|
|
|
+ {
|
|
|
+ "BEFORE" => true,
|
|
|
+ "AFTER" => false,
|
|
|
+ _ => return Err(Error::Syntax),
|
|
|
};
|
|
|
|
|
|
let result = conn.db().get_map_or(
|
|
|
- &args[1],
|
|
|
+ &key,
|
|
|
|v| match v {
|
|
|
Value::List(x) => {
|
|
|
- let pivot = checksum::Ref::new(&args[3]);
|
|
|
+ let pivot = checksum::Ref::new(&pivot);
|
|
|
let mut x = x.write();
|
|
|
let mut found = false;
|
|
|
|
|
@@ -360,7 +355,7 @@ pub async fn linsert(conn: &Connection, args: &[Bytes]) -> Result<Value, Error>
|
|
|
if *val == pivot {
|
|
|
let id = if is_before { key } else { key + 1 };
|
|
|
|
|
|
- let value = checksum::Value::new(args[4].clone());
|
|
|
+ let value = checksum::Value::new(value);
|
|
|
|
|
|
if id > x.len() {
|
|
|
x.push_back(value);
|
|
@@ -384,16 +379,16 @@ pub async fn linsert(conn: &Connection, args: &[Bytes]) -> Result<Value, Error>
|
|
|
|| Ok(0.into()),
|
|
|
)?;
|
|
|
|
|
|
- conn.db().bump_version(&args[1]);
|
|
|
+ conn.db().bump_version(&key);
|
|
|
|
|
|
Ok(result)
|
|
|
}
|
|
|
|
|
|
/// Returns the length of the list stored at key. If key does not exist, it is interpreted as an
|
|
|
/// empty list and 0 is returned. An error is returned when the value stored at key is not a list.
|
|
|
-pub async fn llen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
+pub async fn llen(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
conn.db().get_map_or(
|
|
|
- &args[1],
|
|
|
+ &args[0],
|
|
|
|v| match v {
|
|
|
Value::List(x) => Ok(x.read().len().into()),
|
|
|
_ => Err(Error::WrongType),
|
|
@@ -405,35 +400,39 @@ pub async fn llen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
/// Atomically returns and removes the first/last element (head/tail depending on the wherefrom
|
|
|
/// argument) of the list stored at source, and pushes the element at the first/last element
|
|
|
/// (head/tail depending on the whereto argument) of the list stored at destination.
|
|
|
-pub async fn lmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
- let source_is_left = if check_arg!(args, 3, "LEFT") {
|
|
|
+pub async fn lmove(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
+ let source_is_left = if check_arg!(args, 2, "LEFT") {
|
|
|
true
|
|
|
- } else if check_arg!(args, 3, "RIGHT") {
|
|
|
+ } else if check_arg!(args, 2, "RIGHT") {
|
|
|
false
|
|
|
} else {
|
|
|
return Err(Error::Syntax);
|
|
|
};
|
|
|
|
|
|
- let target_is_left = if check_arg!(args, 4, "LEFT") {
|
|
|
+ let target_is_left = if check_arg!(args, 3, "LEFT") {
|
|
|
true
|
|
|
- } else if check_arg!(args, 4, "RIGHT") {
|
|
|
+ } else if check_arg!(args, 3, "RIGHT") {
|
|
|
false
|
|
|
} else {
|
|
|
return Err(Error::Syntax);
|
|
|
};
|
|
|
|
|
|
+ let source = args.pop_front().ok_or(Error::Syntax)?;
|
|
|
+ let destination = args.pop_front().ok_or(Error::Syntax)?;
|
|
|
+ let to_lock = vec![source.clone(), destination.clone()];
|
|
|
+
|
|
|
let db = conn.db();
|
|
|
|
|
|
/// Lock keys to alter exclusively
|
|
|
- db.lock_keys(&args[1..=2]);
|
|
|
+ db.lock_keys(&to_lock);
|
|
|
|
|
|
let mut to_create = None;
|
|
|
|
|
|
let result = db.get_map_or(
|
|
|
- &args[1],
|
|
|
+ &source,
|
|
|
|v| match v {
|
|
|
Value::List(source) => conn.db().get_map_or(
|
|
|
- &args[2],
|
|
|
+ &destination,
|
|
|
|v| match v {
|
|
|
Value::List(target) => {
|
|
|
let element = if source_is_left {
|
|
@@ -481,15 +480,15 @@ pub async fn lmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
);
|
|
|
|
|
|
if let Some(to_create) = to_create {
|
|
|
- conn.db().set(&args[2], to_create.into(), None);
|
|
|
+ conn.db().set(destination.clone(), to_create.into(), None);
|
|
|
}
|
|
|
|
|
|
/// release the lock on keys
|
|
|
- db.unlock_keys(&args[1..=2]);
|
|
|
+ db.unlock_keys(&to_lock);
|
|
|
|
|
|
if result != Ok(Value::Null) {
|
|
|
- conn.db().bump_version(&args[1]);
|
|
|
- conn.db().bump_version(&args[2]);
|
|
|
+ conn.db().bump_version(&source);
|
|
|
+ conn.db().bump_version(&destination);
|
|
|
}
|
|
|
|
|
|
result
|
|
@@ -500,26 +499,26 @@ pub async fn lmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
/// By default, the command pops a single element from the beginning of the list. When provided
|
|
|
/// with the optional count argument, the reply will consist of up to count elements, depending on
|
|
|
/// the list's length.
|
|
|
-pub async fn lpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
- let count = match args.get(2) {
|
|
|
+pub async fn lpop(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
+ let count = match args.get(1) {
|
|
|
Some(v) => Some(bytes_to_number(&v)?),
|
|
|
None => None,
|
|
|
};
|
|
|
|
|
|
- remove_element(conn, &args[1], count, true)
|
|
|
+ remove_element(conn, &args[0], count, true)
|
|
|
}
|
|
|
|
|
|
/// The command returns the index of matching elements inside a Redis list. By default, when no
|
|
|
/// options are given, it will scan the list from head to tail, looking for the first match of
|
|
|
/// "element". If the element is found, its index (the zero-based position in the list) is
|
|
|
/// returned. Otherwise, if no match is found, nil is returned.
|
|
|
-pub async fn lpos(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
- let element = checksum::Ref::new(&args[2]);
|
|
|
+pub async fn lpos(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
+ let element = checksum::Ref::new(&args[1]);
|
|
|
let mut rank = None;
|
|
|
let mut count = None;
|
|
|
let mut max_len = None;
|
|
|
|
|
|
- let mut index = 3;
|
|
|
+ let mut index = 2;
|
|
|
loop {
|
|
|
if args.len() <= index {
|
|
|
break;
|
|
@@ -552,7 +551,7 @@ pub async fn lpos(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
let max_len = max_len.unwrap_or_default();
|
|
|
|
|
|
conn.db().get_map_or(
|
|
|
- &args[1],
|
|
|
+ &args[0],
|
|
|
|v| match v {
|
|
|
Value::List(x) => {
|
|
|
let x = x.read();
|
|
@@ -646,15 +645,14 @@ pub async fn lpos(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
/// the list, from the leftmost element to the rightmost element. So for instance the command LPUSH
|
|
|
/// mylist a b c will result into a list containing c as first element, b as second element and a
|
|
|
/// as third element.
|
|
|
-pub async fn lpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
- let is_push_x = check_arg!(args, 0, "LPUSHX");
|
|
|
-
|
|
|
+pub async fn lpush(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
+ let key = args.pop_front().ok_or(Error::Syntax)?;
|
|
|
let result = conn.db().get_map_or(
|
|
|
- &args[1],
|
|
|
+ &key,
|
|
|
|v| match v {
|
|
|
Value::List(x) => {
|
|
|
let mut x = x.write();
|
|
|
- for val in args.iter().skip(2) {
|
|
|
+ for val in args.clone().into_iter() {
|
|
|
x.push_front(checksum::Value::new(val.clone()));
|
|
|
}
|
|
|
Ok(x.len().into())
|
|
@@ -662,23 +660,43 @@ pub async fn lpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
_ => Err(Error::WrongType),
|
|
|
},
|
|
|
|| {
|
|
|
- if is_push_x {
|
|
|
- return Ok(0.into());
|
|
|
- }
|
|
|
let mut h = VecDeque::new();
|
|
|
|
|
|
- for val in args.iter().skip(2) {
|
|
|
- h.push_front(checksum::Value::new(val.clone()));
|
|
|
+ for val in args.clone().into_iter() {
|
|
|
+ h.push_front(checksum::Value::new(val));
|
|
|
}
|
|
|
|
|
|
let len = h.len();
|
|
|
- conn.db().set(&args[1], h.into(), None);
|
|
|
+ conn.db().set(key.clone(), h.into(), None);
|
|
|
Ok(len.into())
|
|
|
},
|
|
|
)?;
|
|
|
|
|
|
- conn.db().bump_version(&args[1]);
|
|
|
+ conn.db().bump_version(&key);
|
|
|
+ Ok(result)
|
|
|
+}
|
|
|
+
|
|
|
+/// LPUSHX key element
|
|
|
+pub async fn lpushx(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
+ let key = args.pop_front().ok_or(Error::Syntax)?;
|
|
|
+ let result = conn.db().get_map_or(
|
|
|
+ &key,
|
|
|
+ |v| match v {
|
|
|
+ Value::List(x) => {
|
|
|
+ let mut x = x.write();
|
|
|
+ for val in args.into_iter() {
|
|
|
+ x.push_front(checksum::Value::new(val));
|
|
|
+ }
|
|
|
+ Ok(x.len().into())
|
|
|
+ }
|
|
|
+ _ => Err(Error::WrongType),
|
|
|
+ },
|
|
|
+ || {
|
|
|
+ return Ok(0.into());
|
|
|
+ },
|
|
|
+ )?;
|
|
|
|
|
|
+ conn.db().bump_version(&key);
|
|
|
Ok(result)
|
|
|
}
|
|
|
|
|
@@ -688,13 +706,13 @@ pub async fn lpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
///
|
|
|
/// These offsets can also be negative numbers indicating offsets starting at the end of the list.
|
|
|
/// For example, -1 is the last element of the list, -2 the penultimate, and so on.
|
|
|
-pub async fn lrange(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
+pub async fn lrange(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
conn.db().get_map_or(
|
|
|
- &args[1],
|
|
|
+ &args[0],
|
|
|
|v| match v {
|
|
|
Value::List(x) => {
|
|
|
- let start: i64 = bytes_to_number(&args[2])?;
|
|
|
- let end: i64 = bytes_to_number(&args[3])?;
|
|
|
+ let start: i64 = bytes_to_number(&args[1])?;
|
|
|
+ let end: i64 = bytes_to_number(&args[2])?;
|
|
|
let mut ret = vec![];
|
|
|
let x = x.read();
|
|
|
|
|
@@ -731,13 +749,13 @@ pub async fn lrange(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
}
|
|
|
|
|
|
/// Removes the first count occurrences of elements equal to element from the list stored at key
|
|
|
-pub async fn lrem(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
+pub async fn lrem(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
let result = conn.db().get_map_or(
|
|
|
- &args[1],
|
|
|
+ &args[0],
|
|
|
|v| match v {
|
|
|
Value::List(x) => {
|
|
|
- let element = checksum::Ref::new(&args[3]);
|
|
|
- let limit: i64 = bytes_to_number(&args[2])?;
|
|
|
+ let element = checksum::Ref::new(&args[2]);
|
|
|
+ let limit: i64 = bytes_to_number(&args[1])?;
|
|
|
let mut x = x.write();
|
|
|
|
|
|
let (is_reverse, limit) = if limit < 0 {
|
|
@@ -777,7 +795,7 @@ pub async fn lrem(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
|| Ok(0.into()),
|
|
|
)?;
|
|
|
|
|
|
- conn.db().bump_version(&args[1]);
|
|
|
+ conn.db().bump_version(&args[0]);
|
|
|
|
|
|
Ok(result)
|
|
|
}
|
|
@@ -786,12 +804,16 @@ pub async fn lrem(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
/// LINDEX.
|
|
|
///
|
|
|
/// An error is returned for out of range indexes.
|
|
|
-pub async fn lset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
+pub async fn lset(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
+ let key = args.pop_front().ok_or(Error::Syntax)?;
|
|
|
+ let index = args.pop_front().ok_or(Error::Syntax)?;
|
|
|
+ let value = args.pop_front().ok_or(Error::Syntax)?;
|
|
|
+
|
|
|
let result = conn.db().get_map_or(
|
|
|
- &args[1],
|
|
|
+ &key,
|
|
|
|v| match v {
|
|
|
Value::List(x) => {
|
|
|
- let mut index: i64 = bytes_to_number(&args[2])?;
|
|
|
+ let mut index: i64 = bytes_to_number(&index)?;
|
|
|
let mut x = x.write();
|
|
|
|
|
|
if index < 0 {
|
|
@@ -799,7 +821,7 @@ pub async fn lset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
}
|
|
|
|
|
|
if let Some(x) = x.get_mut(index as usize) {
|
|
|
- *x = checksum::Value::new(args[3].clone());
|
|
|
+ *x = checksum::Value::new(value);
|
|
|
Ok(Value::Ok)
|
|
|
} else {
|
|
|
Err(Error::OutOfRange)
|
|
@@ -810,7 +832,7 @@ pub async fn lset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
|| Err(Error::NotFound),
|
|
|
)?;
|
|
|
|
|
|
- conn.db().bump_version(&args[1]);
|
|
|
+ conn.db().bump_version(&key);
|
|
|
|
|
|
Ok(result)
|
|
|
}
|
|
@@ -818,13 +840,13 @@ pub async fn lset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
/// Trim an existing list so that it will contain only the specified range of elements specified.
|
|
|
/// Both start and stop are zero-based indexes, where 0 is the first element of the list (the
|
|
|
/// head), 1 the next element and so on.
|
|
|
-pub async fn ltrim(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
+pub async fn ltrim(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
let result = conn.db().get_map_or(
|
|
|
- &args[1],
|
|
|
+ &args[0],
|
|
|
|v| match v {
|
|
|
Value::List(x) => {
|
|
|
- let mut start: i64 = bytes_to_number(&args[2])?;
|
|
|
- let mut end: i64 = bytes_to_number(&args[3])?;
|
|
|
+ let mut start: i64 = bytes_to_number(&args[1])?;
|
|
|
+ let mut end: i64 = bytes_to_number(&args[2])?;
|
|
|
let mut x = x.write();
|
|
|
|
|
|
if start < 0 {
|
|
@@ -859,27 +881,26 @@ pub async fn ltrim(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
/// By default, the command pops a single element from the end of the list. When provided with the
|
|
|
/// optional count argument, the reply will consist of up to count elements, depending on the
|
|
|
/// list's length.
|
|
|
-pub async fn rpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
- let count = match args.get(2) {
|
|
|
+pub async fn rpop(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
+ let count = match args.get(1) {
|
|
|
Some(v) => Some(bytes_to_number(&v)?),
|
|
|
None => None,
|
|
|
};
|
|
|
|
|
|
- remove_element(conn, &args[1], count, false)
|
|
|
+ remove_element(conn, &args[0], count, false)
|
|
|
}
|
|
|
|
|
|
/// Atomically returns and removes the last element (tail) of the list stored at source, and pushes
|
|
|
/// the element at the first element (head) of the list stored at destination.
|
|
|
-pub async fn rpoplpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
+pub async fn rpoplpush(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
lmove(
|
|
|
conn,
|
|
|
- &[
|
|
|
- "lmove".into(),
|
|
|
- args[1].clone(),
|
|
|
- args[2].clone(),
|
|
|
+ VecDeque::from([
|
|
|
+ args.pop_front().ok_or(Error::Syntax)?,
|
|
|
+ args.pop_front().ok_or(Error::Syntax)?,
|
|
|
"RIGHT".into(),
|
|
|
"LEFT".into(),
|
|
|
- ],
|
|
|
+ ]),
|
|
|
)
|
|
|
.await
|
|
|
}
|
|
@@ -887,39 +908,60 @@ pub async fn rpoplpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error
|
|
|
/// Insert all the specified values at the tail of the list stored at key. If key does not exist,
|
|
|
/// it is created as empty list before performing the push operation. When key holds a value that
|
|
|
/// is not a list, an error is returned.
|
|
|
-pub async fn rpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
|
|
|
- let is_push_x = check_arg!(args, 0, "RPUSHX");
|
|
|
-
|
|
|
+pub async fn rpushx(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
+ let key = args.pop_front().ok_or(Error::Syntax)?;
|
|
|
let result = conn.db().get_map_or(
|
|
|
- &args[1],
|
|
|
+ &key,
|
|
|
|v| match v {
|
|
|
Value::List(x) => {
|
|
|
let mut x = x.write();
|
|
|
- for val in args.iter().skip(2) {
|
|
|
- x.push_back(checksum::Value::new(val.clone()));
|
|
|
+ for val in args.into_iter() {
|
|
|
+ x.push_back(checksum::Value::new(val));
|
|
|
}
|
|
|
Ok(x.len().into())
|
|
|
}
|
|
|
_ => Err(Error::WrongType),
|
|
|
},
|
|
|
|| {
|
|
|
- if is_push_x {
|
|
|
- return Ok(0.into());
|
|
|
+ return Ok(0.into());
|
|
|
+ },
|
|
|
+ )?;
|
|
|
+
|
|
|
+ conn.db().bump_version(&key);
|
|
|
+ Ok(result)
|
|
|
+}
|
|
|
+
|
|
|
+/// Insert all the specified values at the tail of the list stored at key. If key does not exist,
|
|
|
+/// it is created as empty list before performing the push operation. When key holds a value that
|
|
|
+/// is not a list, an error is returned.
|
|
|
+pub async fn rpush(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
|
|
|
+ let key = args.pop_front().ok_or(Error::Syntax)?;
|
|
|
+ let result = conn.db().get_map_or(
|
|
|
+ &key,
|
|
|
+ |v| match v {
|
|
|
+ Value::List(x) => {
|
|
|
+ let mut x = x.write();
|
|
|
+ for val in args.clone().into_iter() {
|
|
|
+ x.push_back(checksum::Value::new(val));
|
|
|
+ }
|
|
|
+ Ok(x.len().into())
|
|
|
}
|
|
|
+ _ => Err(Error::WrongType),
|
|
|
+ },
|
|
|
+ || {
|
|
|
let mut h = VecDeque::new();
|
|
|
|
|
|
- for val in args.iter().skip(2) {
|
|
|
- h.push_back(checksum::Value::new(val.clone()));
|
|
|
+ for val in args.clone().into_iter() {
|
|
|
+ h.push_back(checksum::Value::new(val));
|
|
|
}
|
|
|
|
|
|
let len = h.len();
|
|
|
- conn.db().set(&args[1], h.into(), None);
|
|
|
+ conn.db().set(key.clone(), h.into(), None);
|
|
|
Ok(len.into())
|
|
|
},
|
|
|
)?;
|
|
|
|
|
|
- conn.db().bump_version(&args[1]);
|
|
|
-
|
|
|
+ conn.db().bump_version(&key);
|
|
|
Ok(result)
|
|
|
}
|
|
|
|