key.rs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589
  1. //! # Key-related command handlers
  2. use super::now;
  3. use crate::{
  4. check_arg,
  5. connection::Connection,
  6. db::scan::Scan,
  7. error::Error,
  8. value::{bytes_to_number, cursor::Cursor, typ::Typ, Value},
  9. };
  10. use bytes::Bytes;
  11. use std::{
  12. convert::TryInto,
  13. str::FromStr,
  14. time::{SystemTime, UNIX_EPOCH},
  15. };
  16. use tokio::time::{Duration, Instant};
  17. /// This command copies the value stored at the source key to the destination
  18. /// key.
  19. ///
  20. /// By default, the destination key is created in the logical database used by
  21. /// the connection. The DB option allows specifying an alternative logical
  22. /// database index for the destination key.
  23. ///
  24. /// The command returns an error when the destination key already exists. The
  25. /// REPLACE option removes the destination key before copying the value to it.
  26. pub async fn copy(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  27. let mut skip = 3;
  28. let target_db = if args.len() > 4 && check_arg!(args, 3, "DB") {
  29. skip += 2;
  30. Some(
  31. conn.all_connections()
  32. .get_databases()
  33. .get(bytes_to_number(&args[4])?)?
  34. .clone(),
  35. )
  36. } else {
  37. None
  38. };
  39. let replace = match args
  40. .get(skip)
  41. .map(|m| String::from_utf8_lossy(m).to_uppercase())
  42. {
  43. Some(value) => {
  44. if value == "REPLACE" {
  45. true
  46. } else {
  47. return Err(Error::Syntax);
  48. }
  49. }
  50. None => false,
  51. };
  52. let result = if conn
  53. .db()
  54. .copy(&args[1], &args[2], replace.into(), target_db)?
  55. {
  56. 1
  57. } else {
  58. 0
  59. };
  60. Ok(result.into())
  61. }
  62. /// Removes the specified keys. A key is ignored if it does not exist.
  63. pub async fn del(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  64. Ok(conn.db().del(&args[1..]))
  65. }
  66. /// Returns if key exists.
  67. pub async fn exists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  68. Ok(conn.db().exists(&args[1..]).into())
  69. }
  70. /// Set a timeout on key. After the timeout has expired, the key will automatically be deleted. A
  71. /// key with an associated timeout is often said to be volatile in Redis terminology.
  72. ///
  73. /// The timeout will only be cleared by commands that delete or overwrite the contents of the key,
  74. /// including DEL, SET, GETSET and all the *STORE commands. This means that all the operations that
  75. /// conceptually alter the value stored at the key without replacing it with a new one will leave
  76. /// the timeout untouched. For instance, incrementing the value of a key with INCR, pushing a new
  77. /// value into a list with LPUSH, or altering the field value of a hash with HSET are all
  78. /// operations that will leave the timeout untouched.
  79. ///
  80. /// The timeout can also be cleared, turning the key back into a persistent key, using the PERSIST
  81. /// command.
  82. pub async fn expire(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  83. let expires_in: i64 = bytes_to_number(&args[2])?;
  84. if expires_in <= 0 {
  85. // Delete key right away
  86. return Ok(conn.db().del(&args[1..2]));
  87. }
  88. let expires_in: u64 = expires_in as u64;
  89. let expires_at = if check_arg!(args, 0, "EXPIRE") {
  90. Duration::from_secs(expires_in)
  91. } else {
  92. Duration::from_millis(expires_in)
  93. };
  94. Ok(conn.db().set_ttl(&args[1], expires_at))
  95. }
  96. /// Returns the string representation of the type of the value stored at key.
  97. /// The different types that can be returned are: string, list, set, zset, hash
  98. /// and stream.
  99. pub async fn data_type(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  100. Ok(conn.db().get_data_type(&args[1]).into())
  101. }
  102. /// EXPIREAT has the same effect and semantic as EXPIRE, but instead of specifying the number of
  103. /// seconds representing the TTL (time to live), it takes an absolute Unix timestamp (seconds since
  104. /// January 1, 1970). A timestamp in the past will delete the key immediately.
  105. pub async fn expire_at(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  106. let secs = check_arg!(args, 0, "EXPIREAT");
  107. let expires_at: i64 = bytes_to_number(&args[2])?;
  108. let expires_in: i64 = if secs {
  109. expires_at - now().as_secs() as i64
  110. } else {
  111. expires_at - now().as_millis() as i64
  112. };
  113. if expires_in <= 0 {
  114. // Delete key right away
  115. return Ok(conn.db().del(&args[1..2]));
  116. }
  117. let expires_in: u64 = expires_in as u64;
  118. let expires_at = if secs {
  119. Duration::from_secs(expires_in)
  120. } else {
  121. Duration::from_millis(expires_in)
  122. };
  123. Ok(conn.db().set_ttl(&args[1], expires_at))
  124. }
  125. /// Returns the absolute Unix timestamp (since January 1, 1970) in seconds at which the given key
  126. /// will expire.
  127. pub async fn expire_time(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  128. let ttl = match conn.db().ttl(&args[1]) {
  129. Some(Some(ttl)) => {
  130. // Is there a better way? There should be!
  131. if check_arg!(args, 0, "EXPIRETIME") {
  132. let secs: i64 = (ttl - Instant::now()).as_secs() as i64;
  133. secs + (now().as_secs() as i64)
  134. } else {
  135. let secs: i64 = (ttl - Instant::now()).as_millis() as i64;
  136. secs + (now().as_millis() as i64)
  137. }
  138. }
  139. Some(None) => -1,
  140. None => -2,
  141. };
  142. Ok(ttl.into())
  143. }
  144. /// Returns all keys that matches a given pattern
  145. pub async fn keys(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  146. Ok(conn.db().get_all_keys(&args[1])?.into())
  147. }
  148. /// Move key from the currently selected database (see SELECT) to the specified
  149. /// destination database. When key already exists in the destination database,
  150. /// or it does not exist in the source database, it does nothing. It is possible
  151. /// to use MOVE as a locking primitive because of this.
  152. pub async fn move_key(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  153. let target_db = conn
  154. .all_connections()
  155. .get_databases()
  156. .get(bytes_to_number(&args[2])?)?;
  157. Ok(if conn.db().move_key(&args[1], target_db)? {
  158. 1.into()
  159. } else {
  160. 0.into()
  161. })
  162. }
  163. /// Return information about the object/value stored in the database
  164. pub async fn object(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  165. let subcommand = String::from_utf8_lossy(&args[1]).to_lowercase();
  166. let expected_args = if subcommand == "help" { 2 } else { 3 };
  167. if expected_args != args.len() {
  168. return Err(Error::SubCommandNotFound(
  169. subcommand.into(),
  170. String::from_utf8_lossy(&args[0]).into(),
  171. ));
  172. }
  173. match subcommand.as_str() {
  174. "help" => super::help::object(),
  175. "refcount" => Ok(if conn.db().exists(&[args[2].clone()]) == 1 {
  176. 1.into()
  177. } else {
  178. Value::Null
  179. }),
  180. _ => Err(Error::SubCommandNotFound(
  181. subcommand.into(),
  182. String::from_utf8_lossy(&args[0]).into(),
  183. )),
  184. }
  185. }
  186. /// Renames key to newkey. It returns an error when key does not exist. If
  187. /// newkey already exists it is overwritten, when this happens RENAME executes
  188. /// an implicit DEL operation, so if the deleted key contains a very big value
  189. /// it may cause high latency even if RENAME itself is usually a constant-time
  190. /// operation.
  191. pub async fn rename(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  192. let is_rename = check_arg!(args, 0, "RENAME");
  193. if conn.db().rename(&args[1], &args[2], is_rename.into())? {
  194. Ok(if is_rename { Value::Ok } else { 1.into() })
  195. } else {
  196. Ok(0.into())
  197. }
  198. }
  199. /// SCAN is a cursor based iterator. This means that at every call of the
  200. /// command, the server returns an updated cursor that the user needs to use as
  201. /// the cursor argument in the next call.
  202. pub async fn scan(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  203. let cursor: Cursor = (&args[1]).try_into()?;
  204. let mut current = 2;
  205. let pattern = if check_arg!(args, current, "MATCH") {
  206. current += 2;
  207. Some(
  208. args.get(current - 1)
  209. .ok_or_else(|| Error::InvalidArgsCount("SCAN".to_owned()))?,
  210. )
  211. } else {
  212. None
  213. };
  214. let count = if check_arg!(args, current, "COUNT") {
  215. current += 2;
  216. let number: usize = bytes_to_number(
  217. args.get(current - 1)
  218. .ok_or_else(|| Error::InvalidArgsCount("SCAN".to_owned()))?,
  219. )?;
  220. Some(number)
  221. } else {
  222. None
  223. };
  224. let typ = if check_arg!(args, current, "TYPE") {
  225. current += 2;
  226. Some(
  227. Typ::from_str(&String::from_utf8_lossy(
  228. args.get(current - 1)
  229. .ok_or_else(|| Error::InvalidArgsCount("SCAN".to_owned()))?,
  230. ))
  231. .map_err(|_| Error::Syntax)?,
  232. )
  233. } else {
  234. None
  235. };
  236. if current != args.len() {
  237. return Err(Error::Syntax);
  238. }
  239. Ok(conn.db().scan(cursor, pattern, count, typ)?.into())
  240. }
  241. /// Returns the remaining time to live of a key that has a timeout. This introspection capability
  242. /// allows a Redis client to check how many seconds a given key will continue to be part of the
  243. /// dataset.
  244. pub async fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  245. let ttl = match conn.db().ttl(&args[1]) {
  246. Some(Some(ttl)) => {
  247. let ttl = ttl - Instant::now();
  248. if check_arg!(args, 0, "TTL") {
  249. ttl.as_secs() as i64
  250. } else {
  251. ttl.as_millis() as i64
  252. }
  253. }
  254. Some(None) => -1,
  255. None => -2,
  256. };
  257. Ok(ttl.into())
  258. }
  259. /// Remove the existing timeout on key, turning the key from volatile (a key with an expire set) to
  260. /// persistent (a key that will never expire as no timeout is associated).
  261. pub async fn persist(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  262. Ok(conn.db().persist(&args[1]))
  263. }
  264. #[cfg(test)]
  265. mod test {
  266. use std::convert::TryInto;
  267. use crate::{
  268. cmd::test::{create_connection, run_command},
  269. error::Error,
  270. value::Value,
  271. };
  272. #[tokio::test]
  273. async fn del() {
  274. let c = create_connection();
  275. assert_eq!(
  276. Ok(Value::Integer(1)),
  277. run_command(&c, &["incr", "foo"]).await
  278. );
  279. assert_eq!(
  280. Ok(Value::Integer(1)),
  281. run_command(&c, &["exists", "foo"]).await
  282. );
  283. assert_eq!(
  284. Ok(Value::Integer(1)),
  285. run_command(&c, &["del", "foo"]).await
  286. );
  287. assert_eq!(
  288. Ok(Value::Integer(0)),
  289. run_command(&c, &["del", "foo"]).await
  290. );
  291. assert_eq!(
  292. Ok(Value::Integer(0)),
  293. run_command(&c, &["exists", "foo"]).await
  294. );
  295. }
  296. #[tokio::test]
  297. async fn _type() {
  298. let c = create_connection();
  299. assert_eq!(
  300. Ok(Value::Integer(1)),
  301. run_command(&c, &["incr", "foo"]).await
  302. );
  303. assert_eq!(
  304. Ok(Value::Integer(1)),
  305. run_command(&c, &["hset", "hash", "foo", "bar"]).await
  306. );
  307. assert_eq!(
  308. Ok(Value::Integer(2)),
  309. run_command(&c, &["sadd", "set", "foo", "bar"]).await
  310. );
  311. assert_eq!(Ok("set".into()), run_command(&c, &["type", "set"]).await);
  312. assert_eq!(Ok("hash".into()), run_command(&c, &["type", "hash"]).await);
  313. assert_eq!(Ok("string".into()), run_command(&c, &["type", "foo"]).await);
  314. }
  315. #[tokio::test]
  316. async fn expire_and_persist() {
  317. let c = create_connection();
  318. assert_eq!(
  319. Ok(Value::Integer(1)),
  320. run_command(&c, &["incr", "foo"]).await
  321. );
  322. assert_eq!(
  323. Ok(Value::Integer(1)),
  324. run_command(&c, &["pexpire", "foo", "6000"]).await
  325. );
  326. match run_command(&c, &["pttl", "foo"]).await {
  327. Ok(Value::Integer(n)) => {
  328. assert!(n < 6000 && n > 5900);
  329. }
  330. _ => unreachable!(),
  331. };
  332. assert_eq!(
  333. Ok(Value::Integer(1)),
  334. run_command(&c, &["persist", "foo"]).await
  335. );
  336. assert_eq!(
  337. Ok(Value::Integer(-1)),
  338. run_command(&c, &["pttl", "foo"]).await
  339. );
  340. assert_eq!(
  341. Ok(Value::Integer(1)),
  342. run_command(&c, &["del", "foo"]).await
  343. );
  344. assert_eq!(
  345. Ok(Value::Integer(-2)),
  346. run_command(&c, &["pttl", "foo"]).await
  347. );
  348. }
  349. #[tokio::test]
  350. async fn copy() {
  351. let c = create_connection();
  352. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  353. assert_eq!(Ok(1.into()), run_command(&c, &["copy", "foo", "bar"]).await);
  354. assert_eq!(Ok(2.into()), run_command(&c, &["incr", "foo"]).await);
  355. assert_eq!(Ok(0.into()), run_command(&c, &["copy", "foo", "bar"]).await);
  356. assert_eq!(
  357. Ok(Value::Array(vec!["2".into(), "1".into()])),
  358. run_command(&c, &["mget", "foo", "bar"]).await
  359. );
  360. assert_eq!(
  361. Ok(1.into()),
  362. run_command(&c, &["copy", "foo", "bar", "replace"]).await
  363. );
  364. assert_eq!(
  365. Ok(Value::Array(vec!["2".into(), "2".into()])),
  366. run_command(&c, &["mget", "foo", "bar"]).await
  367. );
  368. }
  369. #[tokio::test]
  370. async fn copy_different_db() {
  371. let c = create_connection();
  372. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  373. assert_eq!(
  374. Ok(1.into()),
  375. run_command(&c, &["copy", "foo", "bar", "db", "2"]).await
  376. );
  377. assert_eq!(Ok(2.into()), run_command(&c, &["incr", "foo"]).await);
  378. assert_eq!(
  379. Ok(0.into()),
  380. run_command(&c, &["copy", "foo", "bar", "db", "2"]).await
  381. );
  382. assert_eq!(
  383. Ok(Value::Array(vec!["2".into(), Value::Null])),
  384. run_command(&c, &["mget", "foo", "bar"]).await
  385. );
  386. assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "2"]).await);
  387. assert_eq!(
  388. Ok(Value::Array(vec![Value::Null, "1".into()])),
  389. run_command(&c, &["mget", "foo", "bar"]).await
  390. );
  391. assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "0"]).await);
  392. assert_eq!(
  393. Ok(1.into()),
  394. run_command(&c, &["copy", "foo", "bar", "db", "2", "replace"]).await
  395. );
  396. assert_eq!(
  397. Ok(Value::Array(vec!["2".into(), Value::Null])),
  398. run_command(&c, &["mget", "foo", "bar"]).await
  399. );
  400. assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "2"]).await);
  401. assert_eq!(
  402. Ok(Value::Array(vec![Value::Null, "2".into()])),
  403. run_command(&c, &["mget", "foo", "bar"]).await
  404. );
  405. }
  406. #[tokio::test]
  407. async fn copy_same_db() {
  408. let c = create_connection();
  409. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  410. assert_eq!(
  411. Err(Error::SameEntry),
  412. run_command(&c, &["copy", "foo", "foo"]).await
  413. );
  414. assert_eq!(
  415. Err(Error::SameEntry),
  416. run_command(&c, &["copy", "foo", "foo", "db", "0"]).await
  417. );
  418. }
  419. #[tokio::test]
  420. async fn _move() {
  421. let c = create_connection();
  422. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  423. assert_eq!(Ok(1.into()), run_command(&c, &["move", "foo", "2"]).await);
  424. assert_eq!(Ok(Value::Null), run_command(&c, &["get", "foo"]).await);
  425. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  426. assert_eq!(Ok(0.into()), run_command(&c, &["move", "foo", "2"]).await);
  427. assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "2"]).await);
  428. assert_eq!(Ok("1".into()), run_command(&c, &["get", "foo"]).await);
  429. }
  430. #[tokio::test]
  431. async fn _move_same_db() {
  432. let c = create_connection();
  433. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  434. assert_eq!(
  435. Err(Error::SameEntry),
  436. run_command(&c, &["move", "foo", "0"]).await
  437. );
  438. }
  439. #[tokio::test]
  440. async fn rename() {
  441. let c = create_connection();
  442. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  443. assert_eq!(
  444. Ok(Value::Ok),
  445. run_command(&c, &["rename", "foo", "bar-1650"]).await
  446. );
  447. assert_eq!(
  448. Ok(Value::Ok),
  449. run_command(&c, &["rename", "bar-1650", "xxx"]).await
  450. );
  451. assert_eq!(
  452. Err(Error::NotFound),
  453. run_command(&c, &["rename", "foo", "bar"]).await
  454. );
  455. }
  456. #[tokio::test]
  457. async fn renamenx() {
  458. let c = create_connection();
  459. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  460. assert_eq!(
  461. Ok(1.into()),
  462. run_command(&c, &["renamenx", "foo", "bar-1650"]).await
  463. );
  464. assert_eq!(
  465. Ok(1.into()),
  466. run_command(&c, &["renamenx", "bar-1650", "xxx"]).await
  467. );
  468. assert_eq!(
  469. Err(Error::NotFound),
  470. run_command(&c, &["renamenx", "foo", "bar"]).await
  471. );
  472. assert_eq!(
  473. Ok(Value::Ok),
  474. run_command(&c, &["set", "bar-1650", "xxx"]).await
  475. );
  476. assert_eq!(
  477. Ok(0.into()),
  478. run_command(&c, &["renamenx", "xxx", "bar-1650"]).await
  479. );
  480. }
  481. #[tokio::test]
  482. async fn scan_no_args() {
  483. let c = create_connection();
  484. for i in (1..100) {
  485. assert_eq!(
  486. Ok(1.into()),
  487. run_command(&c, &["incr", &format!("foo-{}", i)]).await
  488. );
  489. }
  490. let r: Vec<Value> = run_command(&c, &["scan", "0"])
  491. .await
  492. .unwrap()
  493. .try_into()
  494. .unwrap();
  495. let values: Vec<Value> = r[1].clone().try_into().unwrap();
  496. assert_eq!(2, r.len());
  497. assert_eq!(10, values.len());
  498. }
  499. #[tokio::test]
  500. async fn scan_with_count_match() {
  501. let c = create_connection();
  502. for i in (1..100) {
  503. assert_eq!(
  504. Ok(1.into()),
  505. run_command(&c, &["incr", &format!("foo-{}", i)]).await
  506. );
  507. }
  508. let r: Vec<Value> = run_command(&c, &["scan", "0", "match", "foo-1*", "count", "50"])
  509. .await
  510. .unwrap()
  511. .try_into()
  512. .unwrap();
  513. let values: Vec<Value> = r[1].clone().try_into().unwrap();
  514. assert_eq!(2, r.len());
  515. assert_eq!(11, values.len());
  516. }
  517. #[tokio::test]
  518. async fn scan_with_count() {
  519. let c = create_connection();
  520. for i in (1..100) {
  521. assert_eq!(
  522. Ok(1.into()),
  523. run_command(&c, &["incr", &format!("foo-{}", i)]).await
  524. );
  525. }
  526. let r: Vec<Value> = run_command(&c, &["scan", "0", "count", "50"])
  527. .await
  528. .unwrap()
  529. .try_into()
  530. .unwrap();
  531. let values: Vec<Value> = r[1].clone().try_into().unwrap();
  532. assert_eq!(2, r.len());
  533. assert_eq!(50, values.len());
  534. }
  535. }