1
0

key.rs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585
  1. //! # Key-related command handlers
  2. use super::now;
  3. use crate::{
  4. check_arg,
  5. connection::Connection,
  6. db::{scan::Scan, utils::ExpirationOpts},
  7. error::Error,
  8. value::{
  9. bytes_to_int, bytes_to_number, cursor::Cursor, expiration::Expiration, typ::Typ, Value,
  10. },
  11. };
  12. use bytes::Bytes;
  13. use std::{
  14. convert::TryInto,
  15. str::FromStr,
  16. time::{SystemTime, UNIX_EPOCH},
  17. };
  18. use tokio::time::{Duration, Instant};
  19. /// This command copies the value stored at the source key to the destination
  20. /// key.
  21. ///
  22. /// By default, the destination key is created in the logical database used by
  23. /// the connection. The DB option allows specifying an alternative logical
  24. /// database index for the destination key.
  25. ///
  26. /// The command returns an error when the destination key already exists. The
  27. /// REPLACE option removes the destination key before copying the value to it.
  28. pub async fn copy(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  29. let mut skip = 3;
  30. let target_db = if args.len() > 4 && check_arg!(args, 3, "DB") {
  31. skip += 2;
  32. Some(
  33. conn.all_connections()
  34. .get_databases()
  35. .get(bytes_to_int(&args[4])?)?
  36. .clone(),
  37. )
  38. } else {
  39. None
  40. };
  41. let replace = match args
  42. .get(skip)
  43. .map(|m| String::from_utf8_lossy(m).to_uppercase())
  44. {
  45. Some(value) => {
  46. if value == "REPLACE" {
  47. true
  48. } else {
  49. return Err(Error::Syntax);
  50. }
  51. }
  52. None => false,
  53. };
  54. let result = if conn
  55. .db()
  56. .copy(&args[1], &args[2], replace.into(), target_db)?
  57. {
  58. 1
  59. } else {
  60. 0
  61. };
  62. Ok(result.into())
  63. }
  64. /// Removes the specified keys. A key is ignored if it does not exist.
  65. pub async fn del(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  66. Ok(conn.db().del(&args[1..]))
  67. }
  68. /// Returns if key exists.
  69. pub async fn exists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  70. Ok(conn.db().exists(&args[1..]).into())
  71. }
  72. /// Set a timeout on key. After the timeout has expired, the key will automatically be deleted. A
  73. /// key with an associated timeout is often said to be volatile in Redis terminology.
  74. ///
  75. /// The timeout will only be cleared by commands that delete or overwrite the contents of the key,
  76. /// including DEL, SET, GETSET and all the *STORE commands. This means that all the operations that
  77. /// conceptually alter the value stored at the key without replacing it with a new one will leave
  78. /// the timeout untouched. For instance, incrementing the value of a key with INCR, pushing a new
  79. /// value into a list with LPUSH, or altering the field value of a hash with HSET are all
  80. /// operations that will leave the timeout untouched.
  81. ///
  82. /// The timeout can also be cleared, turning the key back into a persistent key, using the PERSIST
  83. /// command.
  84. pub async fn expire(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  85. let is_milliseconds = check_arg!(args, 0, "PEXPIRE");
  86. let expires_at = Expiration::new(&args[2], is_milliseconds, false, &args[0])?;
  87. if expires_at.is_negative {
  88. // Delete key right away
  89. return Ok(conn.db().del(&args[1..2]));
  90. }
  91. conn.db()
  92. .set_ttl(&args[1], expires_at.try_into()?, (&args[3..]).try_into()?)
  93. }
  94. /// Returns the string representation of the type of the value stored at key.
  95. /// The different types that can be returned are: string, list, set, zset, hash
  96. /// and stream.
  97. pub async fn data_type(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  98. Ok(conn.db().get_data_type(&args[1]).into())
  99. }
  100. /// EXPIREAT has the same effect and semantic as EXPIRE, but instead of specifying the number of
  101. /// seconds representing the TTL (time to live), it takes an absolute Unix timestamp (seconds since
  102. /// January 1, 1970). A timestamp in the past will delete the key immediately.
  103. pub async fn expire_at(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  104. let is_milliseconds = check_arg!(args, 0, "PEXPIREAT");
  105. let expires_at = Expiration::new(&args[2], is_milliseconds, true, &args[0])?;
  106. if expires_at.is_negative {
  107. // Delete key right away
  108. return Ok(conn.db().del(&args[1..2]));
  109. }
  110. conn.db()
  111. .set_ttl(&args[1], expires_at.try_into()?, (&args[3..]).try_into()?)
  112. }
  113. /// Returns the absolute Unix timestamp (since January 1, 1970) in seconds at which the given key
  114. /// will expire.
  115. pub async fn expire_time(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  116. let ttl = match conn.db().ttl(&args[1]) {
  117. Some(Some(ttl)) => {
  118. // Is there a better way? There should be!
  119. if check_arg!(args, 0, "EXPIRETIME") {
  120. let secs: i64 = (ttl - Instant::now()).as_secs() as i64;
  121. secs + 1 + (now().as_secs() as i64)
  122. } else {
  123. let secs: i64 = (ttl - Instant::now()).as_millis() as i64;
  124. secs + 1 + (now().as_millis() as i64)
  125. }
  126. }
  127. Some(None) => -1,
  128. None => -2,
  129. };
  130. Ok(ttl.into())
  131. }
  132. /// Returns all keys that matches a given pattern
  133. pub async fn keys(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  134. Ok(conn.db().get_all_keys(&args[1])?.into())
  135. }
  136. /// Move key from the currently selected database (see SELECT) to the specified
  137. /// destination database. When key already exists in the destination database,
  138. /// or it does not exist in the source database, it does nothing. It is possible
  139. /// to use MOVE as a locking primitive because of this.
  140. pub async fn move_key(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  141. let target_db = conn
  142. .all_connections()
  143. .get_databases()
  144. .get(bytes_to_int(&args[2])?)?;
  145. Ok(if conn.db().move_key(&args[1], target_db)? {
  146. 1.into()
  147. } else {
  148. 0.into()
  149. })
  150. }
  151. /// Return information about the object/value stored in the database
  152. pub async fn object(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  153. let subcommand = String::from_utf8_lossy(&args[1]).to_lowercase();
  154. let expected_args = if subcommand == "help" { 2 } else { 3 };
  155. if expected_args != args.len() {
  156. return Err(Error::SubCommandNotFound(
  157. subcommand.into(),
  158. String::from_utf8_lossy(&args[0]).into(),
  159. ));
  160. }
  161. match subcommand.as_str() {
  162. "help" => super::help::object(),
  163. "refcount" => Ok(if conn.db().exists(&[args[2].clone()]) == 1 {
  164. 1.into()
  165. } else {
  166. Value::Null
  167. }),
  168. _ => Err(Error::SubCommandNotFound(
  169. subcommand.into(),
  170. String::from_utf8_lossy(&args[0]).into(),
  171. )),
  172. }
  173. }
  174. /// Return a random key from the currently selected database.
  175. pub async fn randomkey(conn: &Connection, _: &[Bytes]) -> Result<Value, Error> {
  176. conn.db().randomkey()
  177. }
  178. /// Renames key to newkey. It returns an error when key does not exist. If
  179. /// newkey already exists it is overwritten, when this happens RENAME executes
  180. /// an implicit DEL operation, so if the deleted key contains a very big value
  181. /// it may cause high latency even if RENAME itself is usually a constant-time
  182. /// operation.
  183. pub async fn rename(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  184. let is_rename = check_arg!(args, 0, "RENAME");
  185. if conn.db().rename(&args[1], &args[2], is_rename.into())? {
  186. Ok(if is_rename { Value::Ok } else { 1.into() })
  187. } else {
  188. Ok(0.into())
  189. }
  190. }
  191. /// SCAN is a cursor based iterator. This means that at every call of the
  192. /// command, the server returns an updated cursor that the user needs to use as
  193. /// the cursor argument in the next call.
  194. pub async fn scan(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  195. let cursor: Cursor = (&args[1]).try_into()?;
  196. let mut current = 2;
  197. let mut pattern = None;
  198. let mut count = None;
  199. let mut typ = None;
  200. for i in (2..args.len()).step_by(2) {
  201. let value = args
  202. .get(i + 1)
  203. .ok_or(Error::InvalidArgsCount("SCAN".to_owned()))?;
  204. match String::from_utf8_lossy(&args[i]).to_uppercase().as_str() {
  205. "MATCH" => pattern = Some(value),
  206. "COUNT" => {
  207. count = Some(
  208. bytes_to_number(value)
  209. .map_err(|_| Error::InvalidArgsCount("SCAN".to_owned()))?,
  210. )
  211. }
  212. "TYPE" => {
  213. typ = Some(
  214. Typ::from_str(&String::from_utf8_lossy(&value)).map_err(|_| Error::Syntax)?,
  215. )
  216. }
  217. _ => return Err(Error::Syntax),
  218. }
  219. }
  220. Ok(conn.db().scan(cursor, pattern, count, typ)?.into())
  221. }
  222. /// Returns the remaining time to live of a key that has a timeout. This introspection capability
  223. /// allows a Redis client to check how many seconds a given key will continue to be part of the
  224. /// dataset.
  225. pub async fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  226. let ttl = match conn.db().ttl(&args[1]) {
  227. Some(Some(ttl)) => {
  228. let ttl = ttl - Instant::now();
  229. if check_arg!(args, 0, "TTL") {
  230. ttl.as_secs() as i64 + 1
  231. } else {
  232. ttl.as_millis() as i64
  233. }
  234. }
  235. Some(None) => -1,
  236. None => -2,
  237. };
  238. Ok(ttl.into())
  239. }
  240. /// Remove the existing timeout on key, turning the key from volatile (a key with an expire set) to
  241. /// persistent (a key that will never expire as no timeout is associated).
  242. pub async fn persist(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  243. Ok(conn.db().persist(&args[1]))
  244. }
  245. #[cfg(test)]
  246. mod test {
  247. use std::convert::TryInto;
  248. use crate::{
  249. cmd::test::{create_connection, run_command},
  250. error::Error,
  251. value::Value,
  252. };
  253. #[tokio::test]
  254. async fn del() {
  255. let c = create_connection();
  256. assert_eq!(
  257. Ok(Value::Integer(1)),
  258. run_command(&c, &["incr", "foo"]).await
  259. );
  260. assert_eq!(
  261. Ok(Value::Integer(1)),
  262. run_command(&c, &["exists", "foo"]).await
  263. );
  264. assert_eq!(
  265. Ok(Value::Integer(1)),
  266. run_command(&c, &["del", "foo"]).await
  267. );
  268. assert_eq!(
  269. Ok(Value::Integer(0)),
  270. run_command(&c, &["del", "foo"]).await
  271. );
  272. assert_eq!(
  273. Ok(Value::Integer(0)),
  274. run_command(&c, &["exists", "foo"]).await
  275. );
  276. }
  277. #[tokio::test]
  278. async fn _type() {
  279. let c = create_connection();
  280. assert_eq!(
  281. Ok(Value::Integer(1)),
  282. run_command(&c, &["incr", "foo"]).await
  283. );
  284. assert_eq!(
  285. Ok(Value::Integer(1)),
  286. run_command(&c, &["hset", "hash", "foo", "bar"]).await
  287. );
  288. assert_eq!(
  289. Ok(Value::Integer(2)),
  290. run_command(&c, &["sadd", "set", "foo", "bar"]).await
  291. );
  292. assert_eq!(Ok("set".into()), run_command(&c, &["type", "set"]).await);
  293. assert_eq!(Ok("hash".into()), run_command(&c, &["type", "hash"]).await);
  294. assert_eq!(Ok("string".into()), run_command(&c, &["type", "foo"]).await);
  295. }
  296. #[tokio::test]
  297. async fn expire_and_persist() {
  298. let c = create_connection();
  299. assert_eq!(
  300. Ok(Value::Integer(1)),
  301. run_command(&c, &["incr", "foo"]).await
  302. );
  303. assert_eq!(
  304. Ok(Value::Integer(1)),
  305. run_command(&c, &["pexpire", "foo", "6000"]).await
  306. );
  307. match run_command(&c, &["pttl", "foo"]).await {
  308. Ok(Value::Integer(n)) => {
  309. assert!(n < 6000 && n > 5900);
  310. }
  311. _ => unreachable!(),
  312. };
  313. assert_eq!(
  314. Ok(Value::Integer(1)),
  315. run_command(&c, &["persist", "foo"]).await
  316. );
  317. assert_eq!(
  318. Ok(Value::Integer(-1)),
  319. run_command(&c, &["pttl", "foo"]).await
  320. );
  321. assert_eq!(
  322. Ok(Value::Integer(1)),
  323. run_command(&c, &["del", "foo"]).await
  324. );
  325. assert_eq!(
  326. Ok(Value::Integer(-2)),
  327. run_command(&c, &["pttl", "foo"]).await
  328. );
  329. }
  330. #[tokio::test]
  331. async fn expire2() {
  332. let c = create_connection();
  333. assert_eq!(
  334. Ok(Value::Integer(1)),
  335. run_command(&c, &["incr", "foo"]).await
  336. );
  337. assert_eq!(
  338. Err(Error::InvalidExpire("pexpire".to_owned())),
  339. run_command(&c, &["pexpire", "foo", "9223372036854770000"]).await
  340. );
  341. }
  342. #[tokio::test]
  343. async fn copy() {
  344. let c = create_connection();
  345. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  346. assert_eq!(Ok(1.into()), run_command(&c, &["copy", "foo", "bar"]).await);
  347. assert_eq!(Ok(2.into()), run_command(&c, &["incr", "foo"]).await);
  348. assert_eq!(Ok(0.into()), run_command(&c, &["copy", "foo", "bar"]).await);
  349. assert_eq!(
  350. Ok(Value::Array(vec!["2".into(), "1".into()])),
  351. run_command(&c, &["mget", "foo", "bar"]).await
  352. );
  353. assert_eq!(
  354. Ok(1.into()),
  355. run_command(&c, &["copy", "foo", "bar", "replace"]).await
  356. );
  357. assert_eq!(
  358. Ok(Value::Array(vec!["2".into(), "2".into()])),
  359. run_command(&c, &["mget", "foo", "bar"]).await
  360. );
  361. }
  362. #[tokio::test]
  363. async fn copy_different_db() {
  364. let c = create_connection();
  365. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  366. assert_eq!(
  367. Ok(1.into()),
  368. run_command(&c, &["copy", "foo", "bar", "db", "2"]).await
  369. );
  370. assert_eq!(Ok(2.into()), run_command(&c, &["incr", "foo"]).await);
  371. assert_eq!(
  372. Ok(0.into()),
  373. run_command(&c, &["copy", "foo", "bar", "db", "2"]).await
  374. );
  375. assert_eq!(
  376. Ok(Value::Array(vec!["2".into(), Value::Null])),
  377. run_command(&c, &["mget", "foo", "bar"]).await
  378. );
  379. assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "2"]).await);
  380. assert_eq!(
  381. Ok(Value::Array(vec![Value::Null, "1".into()])),
  382. run_command(&c, &["mget", "foo", "bar"]).await
  383. );
  384. assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "0"]).await);
  385. assert_eq!(
  386. Ok(1.into()),
  387. run_command(&c, &["copy", "foo", "bar", "db", "2", "replace"]).await
  388. );
  389. assert_eq!(
  390. Ok(Value::Array(vec!["2".into(), Value::Null])),
  391. run_command(&c, &["mget", "foo", "bar"]).await
  392. );
  393. assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "2"]).await);
  394. assert_eq!(
  395. Ok(Value::Array(vec![Value::Null, "2".into()])),
  396. run_command(&c, &["mget", "foo", "bar"]).await
  397. );
  398. }
  399. #[tokio::test]
  400. async fn copy_same_db() {
  401. let c = create_connection();
  402. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  403. assert_eq!(
  404. Err(Error::SameEntry),
  405. run_command(&c, &["copy", "foo", "foo"]).await
  406. );
  407. assert_eq!(
  408. Err(Error::SameEntry),
  409. run_command(&c, &["copy", "foo", "foo", "db", "0"]).await
  410. );
  411. }
  412. #[tokio::test]
  413. async fn _move() {
  414. let c = create_connection();
  415. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  416. assert_eq!(Ok(1.into()), run_command(&c, &["move", "foo", "2"]).await);
  417. assert_eq!(Ok(Value::Null), run_command(&c, &["get", "foo"]).await);
  418. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  419. assert_eq!(Ok(0.into()), run_command(&c, &["move", "foo", "2"]).await);
  420. assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "2"]).await);
  421. assert_eq!(Ok("1".into()), run_command(&c, &["get", "foo"]).await);
  422. }
  423. #[tokio::test]
  424. async fn _move_same_db() {
  425. let c = create_connection();
  426. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  427. assert_eq!(
  428. Err(Error::SameEntry),
  429. run_command(&c, &["move", "foo", "0"]).await
  430. );
  431. }
  432. #[tokio::test]
  433. async fn rename() {
  434. let c = create_connection();
  435. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  436. assert_eq!(
  437. Ok(Value::Ok),
  438. run_command(&c, &["rename", "foo", "bar-1650"]).await
  439. );
  440. assert_eq!(
  441. Ok(Value::Ok),
  442. run_command(&c, &["rename", "bar-1650", "xxx"]).await
  443. );
  444. assert_eq!(
  445. Err(Error::NotFound),
  446. run_command(&c, &["rename", "foo", "bar"]).await
  447. );
  448. }
  449. #[tokio::test]
  450. async fn renamenx() {
  451. let c = create_connection();
  452. assert_eq!(Ok(1i64.into()), run_command(&c, &["incr", "foo"]).await);
  453. assert_eq!(
  454. Ok(1i64.into()),
  455. run_command(&c, &["renamenx", "foo", "bar-1650"]).await
  456. );
  457. assert_eq!(
  458. Ok(1i64.into()),
  459. run_command(&c, &["renamenx", "bar-1650", "xxx"]).await
  460. );
  461. assert_eq!(
  462. Err(Error::NotFound),
  463. run_command(&c, &["renamenx", "foo", "bar"]).await
  464. );
  465. assert_eq!(
  466. Ok(Value::Ok),
  467. run_command(&c, &["set", "bar-1650", "xxx"]).await
  468. );
  469. assert_eq!(
  470. Ok(0i64.into()),
  471. run_command(&c, &["renamenx", "xxx", "bar-1650"]).await
  472. );
  473. assert_eq!(
  474. Ok(Value::Blob("1".into())),
  475. run_command(&c, &["get", "xxx"]).await
  476. );
  477. }
  478. #[tokio::test]
  479. async fn scan_no_args() {
  480. let c = create_connection();
  481. for i in (1..100) {
  482. assert_eq!(
  483. Ok(1.into()),
  484. run_command(&c, &["incr", &format!("foo-{}", i)]).await
  485. );
  486. }
  487. let r: Vec<Value> = run_command(&c, &["scan", "0"])
  488. .await
  489. .unwrap()
  490. .try_into()
  491. .unwrap();
  492. let values: Vec<Value> = r[1].clone().try_into().unwrap();
  493. assert_eq!(2, r.len());
  494. assert_eq!(10, values.len());
  495. }
  496. #[tokio::test]
  497. async fn scan_with_count_match() {
  498. let c = create_connection();
  499. for i in (1..100) {
  500. assert_eq!(
  501. Ok(1.into()),
  502. run_command(&c, &["incr", &format!("foo-{}", i)]).await
  503. );
  504. }
  505. let r: Vec<Value> = run_command(&c, &["scan", "0", "match", "foo-1*", "count", "50"])
  506. .await
  507. .unwrap()
  508. .try_into()
  509. .unwrap();
  510. let values: Vec<Value> = r[1].clone().try_into().unwrap();
  511. assert_eq!(2, r.len());
  512. assert_eq!(11, values.len());
  513. }
  514. #[tokio::test]
  515. async fn scan_with_count() {
  516. let c = create_connection();
  517. for i in (1..100) {
  518. assert_eq!(
  519. Ok(1.into()),
  520. run_command(&c, &["incr", &format!("foo-{}", i)]).await
  521. );
  522. }
  523. let r: Vec<Value> = run_command(&c, &["scan", "0", "count", "50"])
  524. .await
  525. .unwrap()
  526. .try_into()
  527. .unwrap();
  528. let values: Vec<Value> = r[1].clone().try_into().unwrap();
  529. assert_eq!(2, r.len());
  530. assert_eq!(50, values.len());
  531. }
  532. }