list.rs 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741
  1. use crate::{
  2. check_arg, connection::Connection, error::Error, value::bytes_to_number, value::checksum,
  3. value::Value,
  4. };
  5. use bytes::Bytes;
  6. use std::collections::VecDeque;
  7. use tokio::time::{sleep, Duration, Instant};
  8. #[allow(clippy::needless_range_loop)]
  9. fn remove_element(
  10. conn: &Connection,
  11. key: &Bytes,
  12. count: usize,
  13. front: bool,
  14. ) -> Result<Value, Error> {
  15. conn.db().get_map_or(
  16. key,
  17. |v| match v {
  18. Value::List(x) => {
  19. let mut x = x.write();
  20. if count == 0 {
  21. // Return a single element
  22. return Ok((if front { x.pop_front() } else { x.pop_back() })
  23. .map_or(Value::Null, |x| x.clone_value()));
  24. }
  25. let mut ret = vec![None; count];
  26. for i in 0..count {
  27. if front {
  28. ret[i] = x.pop_front();
  29. } else {
  30. ret[i] = x.pop_back();
  31. }
  32. }
  33. let ret: Vec<Value> = ret
  34. .iter()
  35. .filter(|v| v.is_some())
  36. .map(|x| x.as_ref().unwrap().clone_value())
  37. .collect();
  38. Ok(if ret.is_empty() {
  39. Value::Null
  40. } else {
  41. ret.into()
  42. })
  43. }
  44. _ => Err(Error::WrongType),
  45. },
  46. || Ok(Value::Null),
  47. )
  48. }
  49. pub async fn blpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  50. let timeout = Instant::now() + Duration::from_secs(bytes_to_number(&args[args.len() - 1])?);
  51. let len = args.len() - 1;
  52. loop {
  53. for key in args[1..len].iter() {
  54. match remove_element(conn, key, 0, true)? {
  55. Value::Null => (),
  56. n => return Ok(vec![Value::Blob(key.clone()), n].into()),
  57. };
  58. }
  59. if Instant::now() >= timeout {
  60. break;
  61. }
  62. sleep(Duration::from_millis(100)).await;
  63. }
  64. Ok(Value::Null)
  65. }
  66. pub async fn brpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  67. let timeout = Instant::now() + Duration::from_secs(bytes_to_number(&args[args.len() - 1])?);
  68. let len = args.len() - 1;
  69. loop {
  70. for key in args[1..len].iter() {
  71. match remove_element(conn, key, 0, false)? {
  72. Value::Null => (),
  73. n => return Ok(vec![Value::Blob(key.clone()), n].into()),
  74. };
  75. }
  76. if Instant::now() >= timeout {
  77. break;
  78. }
  79. sleep(Duration::from_millis(100)).await;
  80. }
  81. Ok(Value::Null)
  82. }
  83. pub async fn lindex(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  84. conn.db().get_map_or(
  85. &args[1],
  86. |v| match v {
  87. Value::List(x) => {
  88. let mut index: i64 = bytes_to_number(&args[2])?;
  89. let x = x.read();
  90. if index < 0 {
  91. index += x.len() as i64;
  92. }
  93. Ok(x.get(index as usize)
  94. .map_or(Value::Null, |x| x.clone_value()))
  95. }
  96. _ => Err(Error::WrongType),
  97. },
  98. || Ok(0.into()),
  99. )
  100. }
  101. pub async fn llen(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  102. conn.db().get_map_or(
  103. &args[1],
  104. |v| match v {
  105. Value::List(x) => Ok((x.read().len() as i64).into()),
  106. _ => Err(Error::WrongType),
  107. },
  108. || Ok(0.into()),
  109. )
  110. }
  111. pub async fn lpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  112. let count = if args.len() > 2 {
  113. bytes_to_number(&args[2])?
  114. } else {
  115. 0
  116. };
  117. remove_element(conn, &args[1], count, true)
  118. }
  119. pub async fn lpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  120. let is_push_x = check_arg!(args, 0, "LPUSHX");
  121. conn.db().get_map_or(
  122. &args[1],
  123. |v| match v {
  124. Value::List(x) => {
  125. let mut x = x.write();
  126. for val in args.iter().skip(2) {
  127. x.push_front(checksum::Value::new(val.clone()));
  128. }
  129. Ok((x.len() as i64).into())
  130. }
  131. _ => Err(Error::WrongType),
  132. },
  133. || {
  134. if is_push_x {
  135. return Ok(0.into());
  136. }
  137. let mut h = VecDeque::new();
  138. for val in args.iter().skip(2) {
  139. h.push_front(checksum::Value::new(val.clone()));
  140. }
  141. let len = h.len() as i64;
  142. conn.db().set(&args[1], h.into(), None);
  143. Ok(len.into())
  144. },
  145. )
  146. }
  147. pub async fn lrange(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  148. conn.db().get_map_or(
  149. &args[1],
  150. |v| match v {
  151. Value::List(x) => {
  152. let mut start: i64 = bytes_to_number(&args[2])?;
  153. let mut end: i64 = bytes_to_number(&args[3])?;
  154. let mut ret = vec![];
  155. let x = x.read();
  156. if start < 0 {
  157. start += x.len() as i64;
  158. }
  159. if end < 0 {
  160. end += x.len() as i64;
  161. }
  162. for (i, val) in x.iter().enumerate() {
  163. if i >= start as usize && i <= end as usize {
  164. ret.push(val.clone_value());
  165. }
  166. }
  167. Ok(ret.into())
  168. }
  169. _ => Err(Error::WrongType),
  170. },
  171. || Ok(Value::Array(vec![])),
  172. )
  173. }
  174. pub async fn rpop(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  175. let count = if args.len() > 2 {
  176. bytes_to_number(&args[2])?
  177. } else {
  178. 0
  179. };
  180. remove_element(conn, &args[1], count, false)
  181. }
  182. pub async fn rpush(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  183. let is_push_x = check_arg!(args, 0, "RPUSHX");
  184. conn.db().get_map_or(
  185. &args[1],
  186. |v| match v {
  187. Value::List(x) => {
  188. let mut x = x.write();
  189. for val in args.iter().skip(2) {
  190. x.push_back(checksum::Value::new(val.clone()));
  191. }
  192. Ok((x.len() as i64).into())
  193. }
  194. _ => Err(Error::WrongType),
  195. },
  196. || {
  197. if is_push_x {
  198. return Ok(0.into());
  199. }
  200. let mut h = VecDeque::new();
  201. for val in args.iter().skip(2) {
  202. h.push_back(checksum::Value::new(val.clone()));
  203. }
  204. let len = h.len() as i64;
  205. conn.db().set(&args[1], h.into(), None);
  206. Ok(len.into())
  207. },
  208. )
  209. }
  #[cfg(test)]
  mod test {
      use crate::{
          cmd::test::{create_connection, run_command},
          value::Value,
      };
      use tokio::time::{sleep, Duration, Instant};

      /// Builds the `Value::Array` of `Value::Blob`s that the list commands are
      /// expected to reply with.
      fn blobs(items: &[&'static str]) -> Value {
          Value::Array(
              items
                  .iter()
                  .map(|item| Value::Blob((*item).into()))
                  .collect(),
          )
      }

      #[tokio::test]
      async fn blpop_no_waiting() {
          let c = create_connection();

          let pushed = run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await;
          assert_eq!(Ok(Value::Integer(5)), pushed);

          let popped = run_command(&c, &["blpop", "foo", "1"]).await;
          assert_eq!(Ok(blobs(&["foo", "5"])), popped);
      }

      #[tokio::test]
      async fn blpop_timeout() {
          let c = create_connection();
          let started = Instant::now();

          let popped = run_command(&c, &["blpop", "foobar", "1"]).await;
          assert_eq!(Ok(Value::Null), popped);

          assert!(started.elapsed() > Duration::from_secs(1));
      }

      #[tokio::test]
      async fn blpop_wait_insert() {
          let c = create_connection();
          let started = Instant::now();

          // Blocking pop over foobar, foo and bar with a 5 second timeout. The
          // command is built first and only awaited after data has been pushed
          // to foo.
          let waiting = run_command(&c, &["blpop", "foobar", "foo", "bar", "5"]);

          // Sleep 1 second before inserting new data
          sleep(Duration::from_secs(1)).await;

          let pushed = run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await;
          assert_eq!(Ok(Value::Integer(5)), pushed);

          // Read the output of the blpop command now.
          assert_eq!(Ok(blobs(&["foo", "5"])), waiting.await);

          assert!(started.elapsed() > Duration::from_secs(1));
          assert!(started.elapsed() < Duration::from_secs(5));
      }

      #[tokio::test]
      async fn brpop_no_waiting() {
          let c = create_connection();

          let pushed = run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await;
          assert_eq!(Ok(Value::Integer(5)), pushed);

          let popped = run_command(&c, &["brpop", "foo", "1"]).await;
          assert_eq!(Ok(blobs(&["foo", "5"])), popped);
      }

      #[tokio::test]
      async fn brpop_timeout() {
          let c = create_connection();
          let started = Instant::now();

          let popped = run_command(&c, &["brpop", "foobar", "1"]).await;
          assert_eq!(Ok(Value::Null), popped);

          assert!(started.elapsed() > Duration::from_secs(1));
      }

      #[tokio::test]
      async fn brpop_wait_insert() {
          let c = create_connection();
          let started = Instant::now();

          // Blocking pop over foobar, foo and bar with a 5 second timeout. The
          // command is built first and only awaited after data has been pushed
          // to foo.
          let waiting = run_command(&c, &["brpop", "foobar", "foo", "bar", "5"]);

          // Sleep 1 second before inserting new data
          sleep(Duration::from_secs(1)).await;

          let pushed = run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await;
          assert_eq!(Ok(Value::Integer(5)), pushed);

          // Read the output of the brpop command now.
          assert_eq!(Ok(blobs(&["foo", "5"])), waiting.await);

          assert!(started.elapsed() > Duration::from_secs(1));
          assert!(started.elapsed() < Duration::from_secs(5));
      }

      #[tokio::test]
      async fn lindex() {
          let c = create_connection();

          let pushed = run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await;
          assert_eq!(Ok(Value::Integer(5)), pushed);

          let everything = run_command(&c, &["lrange", "foo", "0", "-1"]).await;
          assert_eq!(Ok(blobs(&["5", "4", "3", "2", "1"])), everything);

          let head = run_command(&c, &["lindex", "foo", "0"]).await;
          assert_eq!(Ok(Value::Blob("5".into())), head);

          let tail = run_command(&c, &["lindex", "foo", "-1"]).await;
          assert_eq!(Ok(Value::Blob("1".into())), tail);

          // Indexes outside the list, on either side, reply with Null.
          let before_head = run_command(&c, &["lindex", "foo", "-100"]).await;
          assert_eq!(Ok(Value::Null), before_head);

          let past_tail = run_command(&c, &["lindex", "foo", "100"]).await;
          assert_eq!(Ok(Value::Null), past_tail);
      }

      #[tokio::test]
      async fn llen() {
          let c = create_connection();

          // The reply of the push is the new length, which llen must match.
          let pushed = run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await;
          let length = run_command(&c, &["llen", "foo"]).await;
          assert_eq!(pushed, length);

          let missing = run_command(&c, &["llen", "foobar"]).await;
          assert_eq!(Ok(Value::Integer(0)), missing);
      }

      #[tokio::test]
      async fn lpop() {
          let c = create_connection();

          let pushed = run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await;
          assert_eq!(Ok(Value::Integer(5)), pushed);

          // No count: the element itself is returned.
          let single = run_command(&c, &["lpop", "foo"]).await;
          assert_eq!(Ok(Value::Blob("5".into())), single);

          // With a count the reply is an array, even for one element.
          let counted = run_command(&c, &["lpop", "foo", "1"]).await;
          assert_eq!(Ok(blobs(&["4"])), counted);

          // A count larger than the list drains what is left.
          let drained = run_command(&c, &["lpop", "foo", "55"]).await;
          assert_eq!(Ok(blobs(&["3", "2", "1"])), drained);

          let exhausted = run_command(&c, &["lpop", "foo", "55"]).await;
          assert_eq!(Ok(Value::Null), exhausted);

          assert_eq!(Ok(Value::Null), run_command(&c, &["lpop", "foo"]).await);

          let missing = run_command(&c, &["llen", "foobar"]).await;
          assert_eq!(Ok(Value::Integer(0)), missing);
      }

      #[tokio::test]
      async fn lpush() {
          let c = create_connection();

          let first = run_command(&c, &["lpush", "foo", "1", "2", "3", "4", "5"]).await;
          assert_eq!(Ok(Value::Integer(5)), first);

          let after_first = run_command(&c, &["lrange", "foo", "0", "-1"]).await;
          assert_eq!(Ok(blobs(&["5", "4", "3", "2", "1"])), after_first);

          let second = run_command(&c, &["lpush", "foo", "6", "7", "8", "9", "10"]).await;
          assert_eq!(Ok(Value::Integer(10)), second);

          let after_second = run_command(&c, &["lrange", "foo", "0", "-1"]).await;
          assert_eq!(
              Ok(blobs(&["10", "9", "8", "7", "6", "5", "4", "3", "2", "1"])),
              after_second
          );
      }

      #[tokio::test]
      async fn lpush_simple() {
          let c = create_connection();

          let first = run_command(&c, &["lpush", "foo", "world"]).await;
          assert_eq!(Ok(Value::Integer(1)), first);

          let second = run_command(&c, &["lpush", "foo", "hello"]).await;
          assert_eq!(Ok(Value::Integer(2)), second);

          let everything = run_command(&c, &["lrange", "foo", "0", "-1"]).await;
          assert_eq!(Ok(blobs(&["hello", "world"])), everything);
      }

      #[tokio::test]
      async fn rpop() {
          let c = create_connection();

          let pushed = run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await;
          assert_eq!(Ok(Value::Integer(5)), pushed);

          // No count: the element itself is returned.
          let single = run_command(&c, &["rpop", "foo"]).await;
          assert_eq!(Ok(Value::Blob("5".into())), single);

          // With a count the reply is an array, even for one element.
          let counted = run_command(&c, &["rpop", "foo", "1"]).await;
          assert_eq!(Ok(blobs(&["4"])), counted);

          // A count larger than the list drains what is left.
          let drained = run_command(&c, &["rpop", "foo", "55"]).await;
          assert_eq!(Ok(blobs(&["3", "2", "1"])), drained);

          let exhausted = run_command(&c, &["rpop", "foo", "55"]).await;
          assert_eq!(Ok(Value::Null), exhausted);

          assert_eq!(Ok(Value::Null), run_command(&c, &["rpop", "foo"]).await);

          let missing = run_command(&c, &["llen", "foobar"]).await;
          assert_eq!(Ok(Value::Integer(0)), missing);
      }

      #[tokio::test]
      async fn rpush_simple() {
          let c = create_connection();

          let first = run_command(&c, &["rpush", "foo", "world"]).await;
          assert_eq!(Ok(Value::Integer(1)), first);

          let second = run_command(&c, &["rpush", "foo", "hello"]).await;
          assert_eq!(Ok(Value::Integer(2)), second);

          let everything = run_command(&c, &["lrange", "foo", "0", "-1"]).await;
          assert_eq!(Ok(blobs(&["world", "hello"])), everything);
      }

      #[tokio::test]
      async fn lrange() {
          let c = create_connection();

          let pushed = run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await;
          assert_eq!(Ok(Value::Integer(5)), pushed);

          let everything = run_command(&c, &["lrange", "foo", "0", "-1"]).await;
          assert_eq!(Ok(blobs(&["1", "2", "3", "4", "5"])), everything);

          let all_but_last = run_command(&c, &["lrange", "foo", "0", "-2"]).await;
          assert_eq!(Ok(blobs(&["1", "2", "3", "4"])), all_but_last);

          let last_two = run_command(&c, &["lrange", "foo", "-2", "-1"]).await;
          assert_eq!(Ok(blobs(&["4", "5"])), last_two);

          let middle = run_command(&c, &["lrange", "foo", "-3", "-3"]).await;
          assert_eq!(Ok(blobs(&["3"])), middle);
      }

      #[tokio::test]
      async fn rpush() {
          let c = create_connection();

          let first = run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await;
          assert_eq!(Ok(Value::Integer(5)), first);

          let after_first = run_command(&c, &["lrange", "foo", "0", "-1"]).await;
          assert_eq!(Ok(blobs(&["1", "2", "3", "4", "5"])), after_first);

          let second = run_command(&c, &["rpush", "foo", "6", "7", "8", "9", "10"]).await;
          assert_eq!(Ok(Value::Integer(10)), second);

          let after_second = run_command(&c, &["lrange", "foo", "0", "-1"]).await;
          assert_eq!(
              Ok(blobs(&["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"])),
              after_second
          );
      }

      #[tokio::test]
      async fn rpushx() {
          let c = create_connection();

          let first = run_command(&c, &["rpush", "foo", "1", "2", "3", "4", "5"]).await;
          assert_eq!(Ok(Value::Integer(5)), first);

          let after_first = run_command(&c, &["lrange", "foo", "0", "-1"]).await;
          assert_eq!(Ok(blobs(&["1", "2", "3", "4", "5"])), after_first);

          // The key already holds a list, so rpushx appends to it.
          let second = run_command(&c, &["rpushx", "foo", "6", "7", "8", "9", "10"]).await;
          assert_eq!(Ok(Value::Integer(10)), second);

          let after_second = run_command(&c, &["lrange", "foo", "0", "-1"]).await;
          assert_eq!(
              Ok(blobs(&["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"])),
              after_second
          );

          // Nothing is stored under foobar, so rpushx replies with 0.
          let missing = run_command(&c, &["rpushx", "foobar", "6", "7", "8", "9", "10"]).await;
          assert_eq!(Ok(Value::Integer(0)), missing);
      }
  }