1
0

list.rs 41 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510
  1. use crate::{
  2. check_arg, connection::Connection, error::Error, value::bytes_to_number, value::checksum,
  3. value::Value,
  4. };
  5. use bytes::Bytes;
  6. use std::collections::VecDeque;
  7. use tokio::time::{sleep, Duration, Instant};
  8. #[allow(clippy::needless_range_loop)]
  9. fn remove_element(
  10. conn: &Connection,
  11. key: &Bytes,
  12. count: usize,
  13. front: bool,
  14. ) -> Result<Value, Error> {
  15. let result = conn.db().get_map_or(
  16. key,
  17. |v| match v {
  18. Value::List(x) => {
  19. let mut x = x.write();
  20. if count == 0 {
  21. // Return a single element
  22. return Ok((if front { x.pop_front() } else { x.pop_back() })
  23. .map_or(Value::Null, |x| x.clone_value()));
  24. }
  25. let mut ret = vec![None; count];
  26. for i in 0..count {
  27. if front {
  28. ret[i] = x.pop_front();
  29. } else {
  30. ret[i] = x.pop_back();
  31. }
  32. }
  33. let ret: Vec<Value> = ret
  34. .iter()
  35. .filter(|v| v.is_some())
  36. .map(|x| x.as_ref().unwrap().clone_value())
  37. .collect();
  38. Ok(if ret.is_empty() {
  39. Value::Null
  40. } else {
  41. ret.into()
  42. })
  43. }
  44. _ => Err(Error::WrongType),
  45. },
  46. || Ok(Value::Null),
  47. )?;
  48. conn.db().bump_version(key);
  49. Ok(result)
  50. }
  51. pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  52. let timeout = Instant::now() + Duration::from_secs(bytes_to_number(&args[args.len() - 1])?);
  53. let len = args.len() - 1;
  54. loop {
  55. for key in args[1..len].iter() {
  56. match remove_element(conn, key, 0, true)? {
  57. Value::Null => (),
  58. n => return Ok(vec![Value::Blob(key.clone()), n].into()),
  59. };
  60. }
  61. if Instant::now() >= timeout || conn.is_executing_transaction() {
  62. break;
  63. }
  64. sleep(Duration::from_millis(100)).await;
  65. }
  66. Ok(Value::Null)
  67. }
  68. pub async fn brpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  69. let timeout = Instant::now() + Duration::from_secs(bytes_to_number(&args[args.len() - 1])?);
  70. let len = args.len() - 1;
  71. loop {
  72. for key in args[1..len].iter() {
  73. match remove_element(conn, key, 0, false)? {
  74. Value::Null => (),
  75. n => return Ok(vec![Value::Blob(key.clone()), n].into()),
  76. };
  77. }
  78. if Instant::now() >= timeout || conn.is_executing_transaction() {
  79. break;
  80. }
  81. sleep(Duration::from_millis(100)).await;
  82. }
  83. Ok(Value::Null)
  84. }
  85. pub async fn lindex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  86. conn.db().get_map_or(
  87. &args[1],
  88. |v| match v {
  89. Value::List(x) => {
  90. let mut index: i64 = bytes_to_number(&args[2])?;
  91. let x = x.read();
  92. if index < 0 {
  93. index += x.len() as i64;
  94. }
  95. Ok(x.get(index as usize)
  96. .map_or(Value::Null, |x| x.clone_value()))
  97. }
  98. _ => Err(Error::WrongType),
  99. },
  100. || Ok(0.into()),
  101. )
  102. }
  103. pub async fn linsert(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  104. let is_before = if check_arg!(args, 2, "BEFORE") {
  105. true
  106. } else if check_arg!(args, 2, "AFTER") {
  107. false
  108. } else {
  109. return Err(Error::Syntax);
  110. };
  111. let result = conn.db().get_map_or(
  112. &args[1],
  113. |v| match v {
  114. Value::List(x) => {
  115. let pivot = checksum::Ref::new(&args[3]);
  116. let mut x = x.write();
  117. let mut found = false;
  118. for (key, val) in x.iter().enumerate() {
  119. if *val == pivot {
  120. let id = if is_before { key } else { key + 1 };
  121. let value = checksum::Value::new(args[4].clone());
  122. if id > x.len() {
  123. x.push_back(value);
  124. } else {
  125. x.insert(id as usize, value);
  126. }
  127. found = true;
  128. break;
  129. }
  130. }
  131. if found {
  132. Ok(x.len().into())
  133. } else {
  134. Ok((-1).into())
  135. }
  136. }
  137. _ => Err(Error::WrongType),
  138. },
  139. || Ok(0.into()),
  140. )?;
  141. conn.db().bump_version(&args[1]);
  142. Ok(result)
  143. }
  144. pub async fn llen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  145. conn.db().get_map_or(
  146. &args[1],
  147. |v| match v {
  148. Value::List(x) => Ok(x.read().len().into()),
  149. _ => Err(Error::WrongType),
  150. },
  151. || Ok(0.into()),
  152. )
  153. }
  154. pub async fn lmove(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  155. let source_is_left = if check_arg!(args, 3, "LEFT") {
  156. true
  157. } else if check_arg!(args, 3, "RIGHT") {
  158. false
  159. } else {
  160. return Err(Error::Syntax);
  161. };
  162. let target_is_left = if check_arg!(args, 4, "LEFT") {
  163. true
  164. } else if check_arg!(args, 4, "RIGHT") {
  165. false
  166. } else {
  167. return Err(Error::Syntax);
  168. };
  169. let result = conn.db().get_map_or(
  170. &args[1],
  171. |v| match v {
  172. Value::List(source) => conn.db().get_map_or(
  173. &args[2],
  174. |v| match v {
  175. Value::List(target) => {
  176. let element = if source_is_left {
  177. source.write().pop_front()
  178. } else {
  179. source.write().pop_back()
  180. };
  181. if let Some(element) = element {
  182. let ret = element.clone_value();
  183. if target_is_left {
  184. target.write().push_front(element);
  185. } else {
  186. target.write().push_back(element);
  187. }
  188. Ok(ret)
  189. } else {
  190. Ok(Value::Null)
  191. }
  192. }
  193. _ => Err(Error::WrongType),
  194. },
  195. || {
  196. let element = if source_is_left {
  197. source.write().pop_front()
  198. } else {
  199. source.write().pop_back()
  200. };
  201. if let Some(element) = element {
  202. let ret = element.clone_value();
  203. let mut h = VecDeque::new();
  204. h.push_front(element);
  205. conn.db().set(&args[2], h.into(), None);
  206. Ok(ret)
  207. } else {
  208. Ok(Value::Null)
  209. }
  210. },
  211. ),
  212. _ => Err(Error::WrongType),
  213. },
  214. || Ok(Value::Null),
  215. )?;
  216. conn.db().bump_version(&args[1]);
  217. Ok(result)
  218. }
  219. pub async fn lpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  220. let count = if args.len() > 2 {
  221. bytes_to_number(&args[2])?
  222. } else {
  223. 0
  224. };
  225. remove_element(conn, &args[1], count, true)
  226. }
  227. pub async fn lpos(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  228. let mut index = 3;
  229. let element = checksum::Ref::new(&args[2]);
  230. let rank = if check_arg!(args, index, "RANK") {
  231. index += 2;
  232. Some(bytes_to_number::<usize>(&args[index - 1])?)
  233. } else {
  234. None
  235. };
  236. let count = if check_arg!(args, index, "COUNT") {
  237. index += 2;
  238. Some(bytes_to_number::<usize>(&args[index - 1])?)
  239. } else {
  240. None
  241. };
  242. let max_len = if check_arg!(args, index, "MAXLEN") {
  243. index += 2;
  244. bytes_to_number::<i64>(&args[index - 1])?
  245. } else {
  246. -1
  247. };
  248. if index != args.len() {
  249. return Err(Error::Syntax);
  250. }
  251. conn.db().get_map_or(
  252. &args[1],
  253. |v| match v {
  254. Value::List(x) => {
  255. let x = x.read();
  256. let mut ret: Vec<Value> = vec![];
  257. for (i, val) in x.iter().enumerate() {
  258. if *val == element {
  259. // Match!
  260. if let Some(count) = count {
  261. ret.push(i.into());
  262. if ret.len() > count {
  263. return Ok(ret.into());
  264. }
  265. } else if let Some(rank) = rank {
  266. ret.push(i.into());
  267. if ret.len() == rank {
  268. return Ok(ret[rank - 1].clone());
  269. }
  270. } else {
  271. // return first match!
  272. return Ok(i.into());
  273. }
  274. }
  275. if (i as i64) == max_len {
  276. break;
  277. }
  278. }
  279. if count.is_some() {
  280. Ok(ret.into())
  281. } else {
  282. Ok(Value::Null)
  283. }
  284. }
  285. _ => Err(Error::WrongType),
  286. },
  287. || {
  288. Ok(if count.is_some() {
  289. Value::Array(vec![])
  290. } else {
  291. Value::Null
  292. })
  293. },
  294. )
  295. }
  296. pub async fn lpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  297. let is_push_x = check_arg!(args, 0, "LPUSHX");
  298. let result = conn.db().get_map_or(
  299. &args[1],
  300. |v| match v {
  301. Value::List(x) => {
  302. let mut x = x.write();
  303. for val in args.iter().skip(2) {
  304. x.push_front(checksum::Value::new(val.clone()));
  305. }
  306. Ok(x.len().into())
  307. }
  308. _ => Err(Error::WrongType),
  309. },
  310. || {
  311. if is_push_x {
  312. return Ok(0.into());
  313. }
  314. let mut h = VecDeque::new();
  315. for val in args.iter().skip(2) {
  316. h.push_front(checksum::Value::new(val.clone()));
  317. }
  318. let len = h.len();
  319. conn.db().set(&args[1], h.into(), None);
  320. Ok(len.into())
  321. },
  322. )?;
  323. conn.db().bump_version(&args[1]);
  324. Ok(result)
  325. }
  326. pub async fn lrange(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  327. conn.db().get_map_or(
  328. &args[1],
  329. |v| match v {
  330. Value::List(x) => {
  331. let mut start: i64 = bytes_to_number(&args[2])?;
  332. let mut end: i64 = bytes_to_number(&args[3])?;
  333. let mut ret = vec![];
  334. let x = x.read();
  335. if start < 0 {
  336. start += x.len() as i64;
  337. }
  338. if end < 0 {
  339. end += x.len() as i64;
  340. }
  341. for (i, val) in x.iter().enumerate() {
  342. if i >= start as usize && i <= end as usize {
  343. ret.push(val.clone_value());
  344. }
  345. }
  346. Ok(ret.into())
  347. }
  348. _ => Err(Error::WrongType),
  349. },
  350. || Ok(Value::Array(vec![])),
  351. )
  352. }
  353. pub async fn lrem(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  354. let result = conn.db().get_map_or(
  355. &args[1],
  356. |v| match v {
  357. Value::List(x) => {
  358. let element = checksum::Ref::new(&args[3]);
  359. let limit: i64 = bytes_to_number(&args[2])?;
  360. let mut x = x.write();
  361. let (is_reverse, limit) = if limit < 0 {
  362. (true, -limit)
  363. } else {
  364. (false, limit)
  365. };
  366. let mut keep = vec![true; x.len()];
  367. let mut removed = 0;
  368. let len = x.len();
  369. for i in 0..len {
  370. let i = if is_reverse { len - 1 - i } else { i };
  371. if let Some(value) = x.get(i) {
  372. if *value == element {
  373. keep[i] = false;
  374. removed += 1;
  375. if removed == limit {
  376. break;
  377. }
  378. }
  379. }
  380. }
  381. let mut i = 0;
  382. x.retain(|_| {
  383. i += 1;
  384. keep[i - 1]
  385. });
  386. Ok(removed.into())
  387. }
  388. _ => Err(Error::WrongType),
  389. },
  390. || Ok(0.into()),
  391. )?;
  392. conn.db().bump_version(&args[1]);
  393. Ok(result)
  394. }
  395. pub async fn lset(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  396. let result = conn.db().get_map_or(
  397. &args[1],
  398. |v| match v {
  399. Value::List(x) => {
  400. let mut index: i64 = bytes_to_number(&args[2])?;
  401. let mut x = x.write();
  402. if index < 0 {
  403. index += x.len() as i64;
  404. }
  405. if let Some(x) = x.get_mut(index as usize) {
  406. *x = checksum::Value::new(args[3].clone());
  407. Ok(Value::Ok)
  408. } else {
  409. Err(Error::OutOfRange)
  410. }
  411. }
  412. _ => Err(Error::WrongType),
  413. },
  414. || Err(Error::NotFound),
  415. )?;
  416. conn.db().bump_version(&args[1]);
  417. Ok(result)
  418. }
  419. pub async fn ltrim(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  420. let result = conn.db().get_map_or(
  421. &args[1],
  422. |v| match v {
  423. Value::List(x) => {
  424. let mut start: i64 = bytes_to_number(&args[2])?;
  425. let mut end: i64 = bytes_to_number(&args[3])?;
  426. let mut x = x.write();
  427. if start < 0 {
  428. start += x.len() as i64;
  429. }
  430. if end < 0 {
  431. end += x.len() as i64;
  432. }
  433. let mut i = 0;
  434. x.retain(|_| {
  435. let retain = i >= start && i <= end;
  436. i += 1;
  437. retain
  438. });
  439. Ok(Value::Ok)
  440. }
  441. _ => Err(Error::WrongType),
  442. },
  443. || Ok(Value::Ok),
  444. )?;
  445. conn.db().bump_version(&args[1]);
  446. Ok(result)
  447. }
  448. pub async fn rpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  449. let count = if args.len() > 2 {
  450. bytes_to_number(&args[2])?
  451. } else {
  452. 0
  453. };
  454. remove_element(conn, &args[1], count, false)
  455. }
  456. pub async fn rpoplpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  457. lmove(
  458. conn,
  459. &[
  460. "lmove".into(),
  461. args[1].clone(),
  462. args[2].clone(),
  463. "RIGHT".into(),
  464. "LEFT".into(),
  465. ],
  466. )
  467. .await
  468. }
  469. pub async fn rpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  470. let is_push_x = check_arg!(args, 0, "RPUSHX");
  471. let result = conn.db().get_map_or(
  472. &args[1],
  473. |v| match v {
  474. Value::List(x) => {
  475. let mut x = x.write();
  476. for val in args.iter().skip(2) {
  477. x.push_back(checksum::Value::new(val.clone()));
  478. }
  479. Ok(x.len().into())
  480. }
  481. _ => Err(Error::WrongType),
  482. },
  483. || {
  484. if is_push_x {
  485. return Ok(0.into());
  486. }
  487. let mut h = VecDeque::new();
  488. for val in args.iter().skip(2) {
  489. h.push_back(checksum::Value::new(val.clone()));
  490. }
  491. let len = h.len();
  492. conn.db().set(&args[1], h.into(), None);
  493. Ok(len.into())
  494. },
  495. )?;
  496. conn.db().bump_version(&args[1]);
  497. Ok(result)
  498. }
  499. #[cfg(test)]
  500. mod test {
  501. use crate::{
  502. cmd::test::{create_connection, run_command},
  503. error::Error,
  504. value::Value,
  505. };
  506. use tokio::time::{sleep, Duration, Instant};
  507. #[tokio::test]
  508. async fn blpop_no_waiting() {
  509. let c = create_connection();
  510. assert_eq!(
  511. Ok(Value::Integer(5)),
  512. run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await,
  513. );
  514. assert_eq!(
  515. Ok(Value::Array(vec![
  516. Value::Blob("foo".into()),
  517. Value::Blob("5".into()),
  518. ])),
  519. run_command(&c, &["blpop", "foo", "1"]).await
  520. );
  521. }
  522. #[tokio::test]
  523. async fn blpop_timeout() {
  524. let c = create_connection();
  525. let x = Instant::now();
  526. assert_eq!(
  527. Ok(Value::Null),
  528. run_command(&c, &["blpop", "foobar", "1"]).await
  529. );
  530. assert!(Instant::now() - x > Duration::from_millis(1000));
  531. }
  532. #[tokio::test]
  533. async fn blpop_wait_insert() {
  534. let c = create_connection();
  535. let x = Instant::now();
  536. // Query command that will block connection until some data is inserted
  537. // to foobar, foo, bar or the 5 seconds timeout happens.
  538. //
  539. // We are issuing the command, sleeping a little bit then adding data to
  540. // foobar, before actually waiting on the result.
  541. let waiting = run_command(&c, &["blpop", "foobar", "foo", "bar", "5"]);
  542. // Sleep 1 second before inserting new data
  543. sleep(Duration::from_millis(1000)).await;
  544. assert_eq!(
  545. Ok(Value::Integer(5)),
  546. run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await,
  547. );
  548. // Read the output of the first blpop command now.
  549. assert_eq!(
  550. Ok(Value::Array(vec![
  551. Value::Blob("foo".into()),
  552. Value::Blob("5".into()),
  553. ])),
  554. waiting.await
  555. );
  556. assert!(Instant::now() - x > Duration::from_millis(1000));
  557. assert!(Instant::now() - x < Duration::from_millis(5000));
  558. }
  559. #[tokio::test]
  560. async fn lrem_1() {
  561. let c = create_connection();
  562. assert_eq!(
  563. Ok(Value::Integer(5)),
  564. run_command(
  565. &c,
  566. &["rpush", "mylist", "hello", "hello", "world", "hello", "hello"]
  567. )
  568. .await
  569. );
  570. assert_eq!(
  571. Ok(Value::Integer(3)),
  572. run_command(&c, &["lrem", "mylist", "3", "hello"]).await
  573. );
  574. assert_eq!(
  575. Ok(Value::Array(vec![
  576. Value::Blob("world".into()),
  577. Value::Blob("hello".into()),
  578. ])),
  579. run_command(&c, &["lrange", "mylist", "0", "-1"]).await
  580. );
  581. }
  582. #[tokio::test]
  583. async fn lrem_2() {
  584. let c = create_connection();
  585. assert_eq!(
  586. Ok(Value::Integer(5)),
  587. run_command(
  588. &c,
  589. &["rpush", "mylist", "hello", "hello", "world", "hello", "hello"]
  590. )
  591. .await
  592. );
  593. assert_eq!(
  594. Ok(Value::Integer(2)),
  595. run_command(&c, &["lrem", "mylist", "-2", "hello"]).await
  596. );
  597. assert_eq!(
  598. Ok(Value::Array(vec![
  599. Value::Blob("hello".into()),
  600. Value::Blob("hello".into()),
  601. Value::Blob("world".into()),
  602. ])),
  603. run_command(&c, &["lrange", "mylist", "0", "-1"]).await
  604. );
  605. assert_eq!(
  606. Ok(Value::Integer(1)),
  607. run_command(&c, &["lrem", "mylist", "1", "hello"]).await
  608. );
  609. assert_eq!(
  610. Ok(Value::Array(vec![
  611. Value::Blob("hello".into()),
  612. Value::Blob("world".into()),
  613. ])),
  614. run_command(&c, &["lrange", "mylist", "0", "-1"]).await
  615. );
  616. }
  617. #[tokio::test]
  618. async fn lrem_3() {
  619. let c = create_connection();
  620. assert_eq!(
  621. Ok(Value::Integer(5)),
  622. run_command(
  623. &c,
  624. &["rpush", "mylist", "hello", "hello", "world", "hello", "hello"]
  625. )
  626. .await
  627. );
  628. assert_eq!(
  629. Ok(Value::Integer(4)),
  630. run_command(&c, &["lrem", "mylist", "-100", "hello"]).await
  631. );
  632. assert_eq!(
  633. Ok(Value::Array(vec![Value::Blob("world".into()),])),
  634. run_command(&c, &["lrange", "mylist", "0", "-1"]).await
  635. );
  636. }
  637. #[tokio::test]
  638. async fn lrem_4() {
  639. let c = create_connection();
  640. assert_eq!(
  641. Ok(Value::Integer(5)),
  642. run_command(
  643. &c,
  644. &["rpush", "mylist", "hello", "hello", "world", "hello", "hello"]
  645. )
  646. .await
  647. );
  648. assert_eq!(
  649. Ok(Value::Integer(4)),
  650. run_command(&c, &["lrem", "mylist", "100", "hello"]).await
  651. );
  652. assert_eq!(
  653. Ok(Value::Array(vec![Value::Blob("world".into()),])),
  654. run_command(&c, &["lrange", "mylist", "0", "-1"]).await
  655. );
  656. }
  657. #[tokio::test]
  658. async fn brpop_no_waiting() {
  659. let c = create_connection();
  660. assert_eq!(
  661. Ok(Value::Integer(5)),
  662. run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await,
  663. );
  664. assert_eq!(
  665. Ok(Value::Array(vec![
  666. Value::Blob("foo".into()),
  667. Value::Blob("5".into()),
  668. ])),
  669. run_command(&c, &["brpop", "foo", "1"]).await
  670. );
  671. }
  672. #[tokio::test]
  673. async fn brpop_timeout() {
  674. let c = create_connection();
  675. let x = Instant::now();
  676. assert_eq!(
  677. Ok(Value::Null),
  678. run_command(&c, &["brpop", "foobar", "1"]).await
  679. );
  680. assert!(Instant::now() - x > Duration::from_millis(1000));
  681. }
  682. #[tokio::test]
  683. async fn brpop_wait_insert() {
  684. let c = create_connection();
  685. let x = Instant::now();
  686. // Query command that will block connection until some data is inserted
  687. // to foobar, foo, bar or the 5 seconds timeout happens.
  688. //
  689. // We are issuing the command, sleeping a little bit then adding data to
  690. // foobar, before actually waiting on the result.
  691. let waiting = run_command(&c, &["brpop", "foobar", "foo", "bar", "5"]);
  692. // Sleep 1 second before inserting new data
  693. sleep(Duration::from_millis(1000)).await;
  694. assert_eq!(
  695. Ok(Value::Integer(5)),
  696. run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await,
  697. );
  698. // Read the output of the first blpop command now.
  699. assert_eq!(
  700. Ok(Value::Array(vec![
  701. Value::Blob("foo".into()),
  702. Value::Blob("5".into()),
  703. ])),
  704. waiting.await
  705. );
  706. assert!(Instant::now() - x > Duration::from_millis(1000));
  707. assert!(Instant::now() - x < Duration::from_millis(5000));
  708. }
  709. #[tokio::test]
  710. async fn lindex() {
  711. let c = create_connection();
  712. assert_eq!(
  713. Ok(Value::Integer(5)),
  714. run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await
  715. );
  716. assert_eq!(
  717. Ok(Value::Array(vec![
  718. Value::Blob("5".into()),
  719. Value::Blob("4".into()),
  720. Value::Blob("3".into()),
  721. Value::Blob("2".into()),
  722. Value::Blob("1".into()),
  723. ])),
  724. run_command(&c, &["lrange", "foo", "0", "-1"]).await
  725. );
  726. assert_eq!(
  727. Ok(Value::Blob("5".into())),
  728. run_command(&c, &["lindex", "foo", "0"]).await
  729. );
  730. assert_eq!(
  731. Ok(Value::Blob("1".into())),
  732. run_command(&c, &["lindex", "foo", "-1"]).await
  733. );
  734. assert_eq!(
  735. Ok(Value::Null),
  736. run_command(&c, &["lindex", "foo", "-100"]).await
  737. );
  738. assert_eq!(
  739. Ok(Value::Null),
  740. run_command(&c, &["lindex", "foo", "100"]).await
  741. );
  742. }
  743. #[tokio::test]
  744. async fn linsert_syntax_err() {
  745. let c = create_connection();
  746. assert_eq!(
  747. Ok(Value::Integer(2)),
  748. run_command(&c, &["rpush", "foo", "hello", "world"]).await
  749. );
  750. assert_eq!(
  751. Err(Error::Syntax),
  752. run_command(&c, &["linsert", "foo", "beforex", "world", "there"]).await
  753. );
  754. }
  755. #[tokio::test]
  756. async fn linsert_before() {
  757. let c = create_connection();
  758. assert_eq!(
  759. Ok(Value::Integer(2)),
  760. run_command(&c, &["rpush", "foo", "hello", "world"]).await
  761. );
  762. assert_eq!(
  763. Ok(Value::Integer(3)),
  764. run_command(&c, &["linsert", "foo", "before", "world", "there"]).await
  765. );
  766. assert_eq!(
  767. Ok(Value::Array(vec![
  768. Value::Blob("hello".into()),
  769. Value::Blob("there".into()),
  770. Value::Blob("world".into()),
  771. ])),
  772. run_command(&c, &["lrange", "foo", "0", "-1"]).await,
  773. );
  774. }
  775. #[tokio::test]
  776. async fn linsert_after() {
  777. let c = create_connection();
  778. assert_eq!(
  779. Ok(Value::Integer(2)),
  780. run_command(&c, &["rpush", "foo", "hello", "world"]).await
  781. );
  782. assert_eq!(
  783. Ok(Value::Integer(3)),
  784. run_command(&c, &["linsert", "foo", "after", "world", "there"]).await
  785. );
  786. assert_eq!(
  787. Ok(Value::Array(vec![
  788. Value::Blob("hello".into()),
  789. Value::Blob("world".into()),
  790. Value::Blob("there".into()),
  791. ])),
  792. run_command(&c, &["lrange", "foo", "0", "-1"]).await,
  793. );
  794. }
  795. #[tokio::test]
  796. async fn linsert_before_after() {
  797. let c = create_connection();
  798. assert_eq!(
  799. Ok(Value::Integer(2)),
  800. run_command(&c, &["rpush", "foo", "hello", "world"]).await
  801. );
  802. assert_eq!(
  803. Ok(Value::Integer(3)),
  804. run_command(&c, &["linsert", "foo", "after", "world", "there1"]).await
  805. );
  806. assert_eq!(
  807. Ok(Value::Integer(4)),
  808. run_command(&c, &["linsert", "foo", "before", "world", "there2"]).await
  809. );
  810. assert_eq!(
  811. Ok(Value::Array(vec![
  812. Value::Blob("hello".into()),
  813. Value::Blob("there2".into()),
  814. Value::Blob("world".into()),
  815. Value::Blob("there1".into()),
  816. ])),
  817. run_command(&c, &["lrange", "foo", "0", "-1"]).await,
  818. );
  819. }
  820. #[tokio::test]
  821. async fn linsert_not_found() {
  822. let c = create_connection();
  823. assert_eq!(
  824. Ok(Value::Integer(2)),
  825. run_command(&c, &["rpush", "foo", "hello", "world"]).await
  826. );
  827. assert_eq!(
  828. Ok(Value::Integer(-1)),
  829. run_command(&c, &["linsert", "foo", "after", "worldx", "there"]).await
  830. );
  831. assert_eq!(
  832. Ok(Value::Integer(-1)),
  833. run_command(&c, &["linsert", "foo", "before", "worldx", "there"]).await
  834. );
  835. }
  836. #[tokio::test]
  837. async fn llen() {
  838. let c = create_connection();
  839. assert_eq!(
  840. run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await,
  841. run_command(&c, &["llen", "foo"]).await
  842. );
  843. assert_eq!(
  844. Ok(Value::Integer(0)),
  845. run_command(&c, &["llen", "foobar"]).await
  846. );
  847. }
  848. #[tokio::test]
  849. async fn lmove_1() {
  850. let c = create_connection();
  851. assert_eq!(
  852. run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await,
  853. run_command(&c, &["llen", "foo"]).await
  854. );
  855. assert_eq!(
  856. Ok(Value::Blob("1".into())),
  857. run_command(&c, &["lmove", "foo", "bar", "left", "left"]).await
  858. );
  859. assert_eq!(
  860. Ok(Value::Array(vec![Value::Blob("1".into()),])),
  861. run_command(&c, &["lrange", "bar", "0", "-1"]).await
  862. );
  863. assert_eq!(
  864. Ok(Value::Blob("5".into())),
  865. run_command(&c, &["lmove", "foo", "bar", "right", "left"]).await
  866. );
  867. assert_eq!(
  868. Ok(Value::Array(vec![
  869. Value::Blob("5".into()),
  870. Value::Blob("1".into()),
  871. ])),
  872. run_command(&c, &["lrange", "bar", "0", "-1"]).await
  873. );
  874. }
  875. #[tokio::test]
  876. async fn lpop() {
  877. let c = create_connection();
  878. assert_eq!(
  879. Ok(Value::Integer(5)),
  880. run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await,
  881. );
  882. assert_eq!(
  883. Ok(Value::Blob("5".into())),
  884. run_command(&c, &["lpop", "foo"]).await
  885. );
  886. assert_eq!(
  887. Ok(Value::Array(vec![Value::Blob("4".into())])),
  888. run_command(&c, &["lpop", "foo", "1"]).await
  889. );
  890. assert_eq!(
  891. Ok(Value::Array(vec![
  892. Value::Blob("3".into()),
  893. Value::Blob("2".into()),
  894. Value::Blob("1".into()),
  895. ])),
  896. run_command(&c, &["lpop", "foo", "55"]).await
  897. );
  898. assert_eq!(
  899. Ok(Value::Null),
  900. run_command(&c, &["lpop", "foo", "55"]).await
  901. );
  902. assert_eq!(Ok(Value::Null), run_command(&c, &["lpop", "foo"]).await);
  903. assert_eq!(
  904. Ok(Value::Integer(0)),
  905. run_command(&c, &["llen", "foobar"]).await
  906. );
  907. }
  908. #[tokio::test]
  909. async fn lpos_single_match() {
  910. let c = create_connection();
  911. assert_eq!(
  912. Ok(Value::Integer(11)),
  913. run_command(
  914. &c,
  915. &["RPUSH", "mylist", "a", "b", "c", "d", "1", "2", "3", "4", "3", "3", "3"]
  916. )
  917. .await
  918. );
  919. assert_eq!(
  920. Ok(Value::Integer(6)),
  921. run_command(&c, &["lpos", "mylist", "3"]).await
  922. );
  923. }
  924. #[tokio::test]
  925. async fn lpos_single_skip() {
  926. let c = create_connection();
  927. assert_eq!(
  928. Ok(Value::Integer(11)),
  929. run_command(
  930. &c,
  931. &["RPUSH", "mylist", "a", "b", "c", "d", "1", "2", "3", "4", "3", "3", "3"]
  932. )
  933. .await
  934. );
  935. assert_eq!(
  936. Ok(Value::Integer(8)),
  937. run_command(&c, &["lpos", "mylist", "3", "rank", "2"]).await
  938. );
  939. }
  940. #[tokio::test]
  941. async fn lpos_single_skip_max_len() {
  942. let c = create_connection();
  943. assert_eq!(
  944. Ok(Value::Integer(11)),
  945. run_command(
  946. &c,
  947. &["RPUSH", "mylist", "a", "b", "c", "d", "1", "2", "3", "4", "3", "3", "3"]
  948. )
  949. .await
  950. );
  951. assert_eq!(
  952. Ok(Value::Null),
  953. run_command(&c, &["lpos", "mylist", "3", "rank", "2", "maxlen", "7"]).await
  954. );
  955. }
  956. #[tokio::test]
  957. async fn lpos_not_found() {
  958. let c = create_connection();
  959. assert_eq!(
  960. Ok(Value::Array(vec![])),
  961. run_command(&c, &["lpos", "mylist", "3", "count", "5", "maxlen", "9"]).await
  962. );
  963. assert_eq!(
  964. Ok(Value::Null),
  965. run_command(&c, &["lpos", "mylist", "3"]).await
  966. );
  967. }
  968. #[tokio::test]
  969. async fn lpos() {
  970. let c = create_connection();
  971. assert_eq!(
  972. Ok(Value::Integer(11)),
  973. run_command(
  974. &c,
  975. &["RPUSH", "mylist", "a", "b", "c", "d", "1", "2", "3", "4", "3", "3", "3"]
  976. )
  977. .await
  978. );
  979. assert_eq!(
  980. Ok(Value::Array(vec![
  981. Value::Integer(6),
  982. Value::Integer(8),
  983. Value::Integer(9),
  984. ])),
  985. run_command(&c, &["lpos", "mylist", "3", "count", "5", "maxlen", "9"]).await
  986. );
  987. }
  988. #[tokio::test]
  989. async fn lpush() {
  990. let c = create_connection();
  991. assert_eq!(
  992. Ok(Value::Integer(5)),
  993. run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await
  994. );
  995. assert_eq!(
  996. Ok(Value::Array(vec![
  997. Value::Blob("5".into()),
  998. Value::Blob("4".into()),
  999. Value::Blob("3".into()),
  1000. Value::Blob("2".into()),
  1001. Value::Blob("1".into()),
  1002. ])),
  1003. run_command(&c, &["lrange", "foo", "0", "-1"]).await
  1004. );
  1005. assert_eq!(
  1006. Ok(Value::Integer(10)),
  1007. run_command(&c, &["lpush", "foo", "6", "7", "8", "9", "10"]).await
  1008. );
  1009. assert_eq!(
  1010. Ok(Value::Array(vec![
  1011. Value::Blob("10".into()),
  1012. Value::Blob("9".into()),
  1013. Value::Blob("8".into()),
  1014. Value::Blob("7".into()),
  1015. Value::Blob("6".into()),
  1016. Value::Blob("5".into()),
  1017. Value::Blob("4".into()),
  1018. Value::Blob("3".into()),
  1019. Value::Blob("2".into()),
  1020. Value::Blob("1".into()),
  1021. ])),
  1022. run_command(&c, &["lrange", "foo", "0", "-1"]).await
  1023. );
  1024. }
  1025. #[tokio::test]
  1026. async fn lpush_simple() {
  1027. let c = create_connection();
  1028. assert_eq!(
  1029. Ok(Value::Integer(1)),
  1030. run_command(&c, &["lpush", "foo", "world"]).await
  1031. );
  1032. assert_eq!(
  1033. Ok(Value::Integer(2)),
  1034. run_command(&c, &["lpush", "foo", "hello"]).await
  1035. );
  1036. assert_eq!(
  1037. Ok(Value::Array(vec![
  1038. Value::Blob("hello".into()),
  1039. Value::Blob("world".into()),
  1040. ])),
  1041. run_command(&c, &["lrange", "foo", "0", "-1"]).await
  1042. );
  1043. }
  1044. #[tokio::test]
  1045. async fn lset() {
  1046. let c = create_connection();
  1047. assert_eq!(
  1048. Ok(Value::Integer(5)),
  1049. run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await
  1050. );
  1051. assert_eq!(
  1052. Ok(Value::Ok),
  1053. run_command(&c, &["lset", "foo", "-1", "6"]).await,
  1054. );
  1055. assert_eq!(
  1056. Ok(Value::Ok),
  1057. run_command(&c, &["lset", "foo", "-2", "7"]).await,
  1058. );
  1059. assert_eq!(
  1060. Ok(Value::Ok),
  1061. run_command(&c, &["lset", "foo", "0", "8"]).await,
  1062. );
  1063. assert_eq!(
  1064. Err(Error::OutOfRange),
  1065. run_command(&c, &["lset", "foo", "55", "8"]).await,
  1066. );
  1067. assert_eq!(
  1068. Err(Error::OutOfRange),
  1069. run_command(&c, &["lset", "foo", "-55", "8"]).await,
  1070. );
  1071. assert_eq!(
  1072. Err(Error::NotFound),
  1073. run_command(&c, &["lset", "key_not_exists", "-55", "8"]).await,
  1074. );
  1075. assert_eq!(
  1076. Ok(Value::Blob("6".into())),
  1077. run_command(&c, &["rpop", "foo"]).await
  1078. );
  1079. assert_eq!(
  1080. Ok(Value::Blob("7".into())),
  1081. run_command(&c, &["rpop", "foo"]).await
  1082. );
  1083. assert_eq!(
  1084. Ok(Value::Blob("8".into())),
  1085. run_command(&c, &["lpop", "foo"]).await
  1086. );
  1087. }
  1088. #[tokio::test]
  1089. async fn ltrim() {
  1090. let c = create_connection();
  1091. assert_eq!(
  1092. Ok(Value::Integer(5)),
  1093. run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await
  1094. );
  1095. assert_eq!(
  1096. Ok(Value::Ok),
  1097. run_command(&c, &["ltrim", "foo", "1", "-2"]).await
  1098. );
  1099. assert_eq!(
  1100. Ok(Value::Array(vec![
  1101. Value::Blob("2".into()),
  1102. Value::Blob("3".into()),
  1103. Value::Blob("4".into()),
  1104. ])),
  1105. run_command(&c, &["lrange", "foo", "0", "-1"]).await
  1106. );
  1107. }
  1108. #[tokio::test]
  1109. async fn rpop() {
  1110. let c = create_connection();
  1111. assert_eq!(
  1112. Ok(Value::Integer(5)),
  1113. run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await
  1114. );
  1115. assert_eq!(
  1116. Ok(Value::Blob("5".into())),
  1117. run_command(&c, &["rpop", "foo"]).await
  1118. );
  1119. assert_eq!(
  1120. Ok(Value::Array(vec![Value::Blob("4".into())])),
  1121. run_command(&c, &["rpop", "foo", "1"]).await
  1122. );
  1123. assert_eq!(
  1124. Ok(Value::Array(vec![
  1125. Value::Blob("3".into()),
  1126. Value::Blob("2".into()),
  1127. Value::Blob("1".into()),
  1128. ])),
  1129. run_command(&c, &["rpop", "foo", "55"]).await
  1130. );
  1131. assert_eq!(
  1132. Ok(Value::Null),
  1133. run_command(&c, &["rpop", "foo", "55"]).await
  1134. );
  1135. assert_eq!(Ok(Value::Null), run_command(&c, &["rpop", "foo"]).await);
  1136. assert_eq!(
  1137. Ok(Value::Integer(0)),
  1138. run_command(&c, &["llen", "foobar"]).await
  1139. );
  1140. }
  1141. #[tokio::test]
  1142. async fn rpush_simple() {
  1143. let c = create_connection();
  1144. assert_eq!(
  1145. Ok(Value::Integer(1)),
  1146. run_command(&c, &["rpush", "foo", "world"]).await
  1147. );
  1148. assert_eq!(
  1149. Ok(Value::Integer(2)),
  1150. run_command(&c, &["rpush", "foo", "hello"]).await
  1151. );
  1152. assert_eq!(
  1153. Ok(Value::Array(vec![
  1154. Value::Blob("world".into()),
  1155. Value::Blob("hello".into()),
  1156. ])),
  1157. run_command(&c, &["lrange", "foo", "0", "-1"]).await
  1158. );
  1159. }
  1160. #[tokio::test]
  1161. async fn lrange() {
  1162. let c = create_connection();
  1163. assert_eq!(
  1164. Ok(Value::Integer(5)),
  1165. run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await
  1166. );
  1167. assert_eq!(
  1168. Ok(Value::Array(vec![
  1169. Value::Blob("1".into()),
  1170. Value::Blob("2".into()),
  1171. Value::Blob("3".into()),
  1172. Value::Blob("4".into()),
  1173. Value::Blob("5".into()),
  1174. ])),
  1175. run_command(&c, &["lrange", "foo", "0", "-1"]).await
  1176. );
  1177. assert_eq!(
  1178. Ok(Value::Array(vec![
  1179. Value::Blob("1".into()),
  1180. Value::Blob("2".into()),
  1181. Value::Blob("3".into()),
  1182. Value::Blob("4".into()),
  1183. ])),
  1184. run_command(&c, &["lrange", "foo", "0", "-2"]).await
  1185. );
  1186. assert_eq!(
  1187. Ok(Value::Array(vec![
  1188. Value::Blob("4".into()),
  1189. Value::Blob("5".into()),
  1190. ])),
  1191. run_command(&c, &["lrange", "foo", "-2", "-1"]).await
  1192. );
  1193. assert_eq!(
  1194. Ok(Value::Array(vec![Value::Blob("3".into()),])),
  1195. run_command(&c, &["lrange", "foo", "-3", "-3"]).await
  1196. );
  1197. }
  1198. #[tokio::test]
  1199. async fn rpush() {
  1200. let c = create_connection();
  1201. assert_eq!(
  1202. Ok(Value::Integer(5)),
  1203. run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await
  1204. );
  1205. assert_eq!(
  1206. Ok(Value::Array(vec![
  1207. Value::Blob("1".into()),
  1208. Value::Blob("2".into()),
  1209. Value::Blob("3".into()),
  1210. Value::Blob("4".into()),
  1211. Value::Blob("5".into()),
  1212. ])),
  1213. run_command(&c, &["lrange", "foo", "0", "-1"]).await
  1214. );
  1215. assert_eq!(
  1216. Ok(Value::Integer(10)),
  1217. run_command(&c, &["rpush", "foo", "6", "7", "8", "9", "10"]).await
  1218. );
  1219. assert_eq!(
  1220. Ok(Value::Array(vec![
  1221. Value::Blob("1".into()),
  1222. Value::Blob("2".into()),
  1223. Value::Blob("3".into()),
  1224. Value::Blob("4".into()),
  1225. Value::Blob("5".into()),
  1226. Value::Blob("6".into()),
  1227. Value::Blob("7".into()),
  1228. Value::Blob("8".into()),
  1229. Value::Blob("9".into()),
  1230. Value::Blob("10".into()),
  1231. ])),
  1232. run_command(&c, &["lrange", "foo", "0", "-1"]).await
  1233. );
  1234. }
  1235. #[tokio::test]
  1236. async fn rpushx() {
  1237. let c = create_connection();
  1238. assert_eq!(
  1239. Ok(Value::Integer(5)),
  1240. run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await
  1241. );
  1242. assert_eq!(
  1243. Ok(Value::Array(vec![
  1244. Value::Blob("1".into()),
  1245. Value::Blob("2".into()),
  1246. Value::Blob("3".into()),
  1247. Value::Blob("4".into()),
  1248. Value::Blob("5".into()),
  1249. ])),
  1250. run_command(&c, &["lrange", "foo", "0", "-1"]).await
  1251. );
  1252. assert_eq!(
  1253. Ok(Value::Integer(10)),
  1254. run_command(&c, &["rpushx", "foo", "6", "7", "8", "9", "10"]).await
  1255. );
  1256. assert_eq!(
  1257. Ok(Value::Array(vec![
  1258. Value::Blob("1".into()),
  1259. Value::Blob("2".into()),
  1260. Value::Blob("3".into()),
  1261. Value::Blob("4".into()),
  1262. Value::Blob("5".into()),
  1263. Value::Blob("6".into()),
  1264. Value::Blob("7".into()),
  1265. Value::Blob("8".into()),
  1266. Value::Blob("9".into()),
  1267. Value::Blob("10".into()),
  1268. ])),
  1269. run_command(&c, &["lrange", "foo", "0", "-1"]).await
  1270. );
  1271. assert_eq!(
  1272. Ok(Value::Integer(0)),
  1273. run_command(&c, &["rpushx", "foobar", "6", "7", "8", "9", "10"]).await
  1274. );
  1275. }
  1276. }