key.rs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. //! # Key-related command handlers
  2. use super::now;
  3. use crate::{
  4. check_arg,
  5. connection::Connection,
  6. db::scan::Scan,
  7. error::Error,
  8. value::bytes_to_number,
  9. value::{cursor::Cursor, Value},
  10. };
  11. use bytes::Bytes;
  12. use std::convert::TryInto;
  13. use std::time::{SystemTime, UNIX_EPOCH};
  14. use tokio::time::{Duration, Instant};
  15. /// This command copies the value stored at the source key to the destination
  16. /// key.
  17. ///
  18. /// By default, the destination key is created in the logical database used by
  19. /// the connection. The DB option allows specifying an alternative logical
  20. /// database index for the destination key.
  21. ///
  22. /// The command returns an error when the destination key already exists. The
  23. /// REPLACE option removes the destination key before copying the value to it.
  24. pub async fn copy(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  25. let mut skip = 3;
  26. let target_db = if args.len() > 4 && check_arg!(args, 3, "DB") {
  27. skip += 2;
  28. Some(
  29. conn.all_connections()
  30. .get_databases()
  31. .get(bytes_to_number(&args[4])?)?
  32. .clone(),
  33. )
  34. } else {
  35. None
  36. };
  37. let replace = match args
  38. .get(skip)
  39. .map(|m| String::from_utf8_lossy(m).to_uppercase())
  40. {
  41. Some(value) => {
  42. if value == "REPLACE" {
  43. true
  44. } else {
  45. return Err(Error::Syntax);
  46. }
  47. }
  48. None => false,
  49. };
  50. let result = if conn
  51. .db()
  52. .copy(&args[1], &args[2], replace.into(), target_db)?
  53. {
  54. 1
  55. } else {
  56. 0
  57. };
  58. Ok(result.into())
  59. }
  60. /// Removes the specified keys. A key is ignored if it does not exist.
  61. pub async fn del(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  62. Ok(conn.db().del(&args[1..]))
  63. }
  64. /// Returns if key exists.
  65. pub async fn exists(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  66. Ok(conn.db().exists(&args[1..]).into())
  67. }
  68. /// Set a timeout on key. After the timeout has expired, the key will automatically be deleted. A
  69. /// key with an associated timeout is often said to be volatile in Redis terminology.
  70. ///
  71. /// The timeout will only be cleared by commands that delete or overwrite the contents of the key,
  72. /// including DEL, SET, GETSET and all the *STORE commands. This means that all the operations that
  73. /// conceptually alter the value stored at the key without replacing it with a new one will leave
  74. /// the timeout untouched. For instance, incrementing the value of a key with INCR, pushing a new
  75. /// value into a list with LPUSH, or altering the field value of a hash with HSET are all
  76. /// operations that will leave the timeout untouched.
  77. ///
  78. /// The timeout can also be cleared, turning the key back into a persistent key, using the PERSIST
  79. /// command.
  80. pub async fn expire(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  81. let expires_in: i64 = bytes_to_number(&args[2])?;
  82. if expires_in <= 0 {
  83. // Delete key right away
  84. return Ok(conn.db().del(&args[1..2]));
  85. }
  86. let expires_in: u64 = expires_in as u64;
  87. let expires_at = if check_arg!(args, 0, "EXPIRE") {
  88. Duration::from_secs(expires_in)
  89. } else {
  90. Duration::from_millis(expires_in)
  91. };
  92. Ok(conn.db().set_ttl(&args[1], expires_at))
  93. }
  94. /// Returns the string representation of the type of the value stored at key.
  95. /// The different types that can be returned are: string, list, set, zset, hash
  96. /// and stream.
  97. pub async fn data_type(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  98. Ok(conn.db().get_data_type(&args[1]).into())
  99. }
  100. /// EXPIREAT has the same effect and semantic as EXPIRE, but instead of specifying the number of
  101. /// seconds representing the TTL (time to live), it takes an absolute Unix timestamp (seconds since
  102. /// January 1, 1970). A timestamp in the past will delete the key immediately.
  103. pub async fn expire_at(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  104. let secs = check_arg!(args, 0, "EXPIREAT");
  105. let expires_at: i64 = bytes_to_number(&args[2])?;
  106. let expires_in: i64 = if secs {
  107. expires_at - now().as_secs() as i64
  108. } else {
  109. expires_at - now().as_millis() as i64
  110. };
  111. if expires_in <= 0 {
  112. // Delete key right away
  113. return Ok(conn.db().del(&args[1..2]));
  114. }
  115. let expires_in: u64 = expires_in as u64;
  116. let expires_at = if secs {
  117. Duration::from_secs(expires_in)
  118. } else {
  119. Duration::from_millis(expires_in)
  120. };
  121. Ok(conn.db().set_ttl(&args[1], expires_at))
  122. }
  123. /// Returns the absolute Unix timestamp (since January 1, 1970) in seconds at which the given key
  124. /// will expire.
  125. pub async fn expire_time(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  126. let ttl = match conn.db().ttl(&args[1]) {
  127. Some(Some(ttl)) => {
  128. // Is there a better way? There should be!
  129. if check_arg!(args, 0, "EXPIRETIME") {
  130. let secs: i64 = (ttl - Instant::now()).as_secs() as i64;
  131. secs + (now().as_secs() as i64)
  132. } else {
  133. let secs: i64 = (ttl - Instant::now()).as_millis() as i64;
  134. secs + (now().as_millis() as i64)
  135. }
  136. }
  137. Some(None) => -1,
  138. None => -2,
  139. };
  140. Ok(ttl.into())
  141. }
  142. /// Returns all keys that matches a given pattern
  143. pub async fn keys(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  144. Ok(conn.db().get_all_keys(&args[1])?.into())
  145. }
  146. /// Move key from the currently selected database (see SELECT) to the specified
  147. /// destination database. When key already exists in the destination database,
  148. /// or it does not exist in the source database, it does nothing. It is possible
  149. /// to use MOVE as a locking primitive because of this.
  150. pub async fn move_key(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  151. let target_db = conn
  152. .all_connections()
  153. .get_databases()
  154. .get(bytes_to_number(&args[2])?)?;
  155. Ok(if conn.db().move_key(&args[1], target_db)? {
  156. 1.into()
  157. } else {
  158. 0.into()
  159. })
  160. }
  161. /// Return information about the object/value stored in the database
  162. pub async fn object(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  163. let subcommand = String::from_utf8_lossy(&args[1]).to_lowercase();
  164. let expected_args = if subcommand == "help" { 2 } else { 3 };
  165. if expected_args != args.len() {
  166. return Err(Error::SubCommandNotFound(
  167. subcommand.into(),
  168. String::from_utf8_lossy(&args[0]).into(),
  169. ));
  170. }
  171. match subcommand.as_str() {
  172. "help" => super::help::object(),
  173. "refcount" => Ok(if conn.db().exists(&[args[2].clone()]) == 1 {
  174. 1.into()
  175. } else {
  176. Value::Null
  177. }),
  178. _ => Err(Error::SubCommandNotFound(
  179. subcommand.into(),
  180. String::from_utf8_lossy(&args[0]).into(),
  181. )),
  182. }
  183. }
  184. /// Renames key to newkey. It returns an error when key does not exist. If
  185. /// newkey already exists it is overwritten, when this happens RENAME executes
  186. /// an implicit DEL operation, so if the deleted key contains a very big value
  187. /// it may cause high latency even if RENAME itself is usually a constant-time
  188. /// operation.
  189. pub async fn rename(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  190. let is_rename = check_arg!(args, 0, "RENAME");
  191. if conn.db().rename(&args[1], &args[2], is_rename.into())? {
  192. Ok(if is_rename { Value::Ok } else { 1.into() })
  193. } else {
  194. Ok(0.into())
  195. }
  196. }
  197. /// SCAN is a cursor based iterator. This means that at every call of the
  198. /// command, the server returns an updated cursor that the user needs to use as
  199. /// the cursor argument in the next call.
  200. pub async fn scan(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  201. let cursor: Cursor = (&args[1]).try_into()?;
  202. Ok(conn.db().scan(cursor, None, None, None)?.into())
  203. }
  204. /// Returns the remaining time to live of a key that has a timeout. This introspection capability
  205. /// allows a Redis client to check how many seconds a given key will continue to be part of the
  206. /// dataset.
  207. pub async fn ttl(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  208. let ttl = match conn.db().ttl(&args[1]) {
  209. Some(Some(ttl)) => {
  210. let ttl = ttl - Instant::now();
  211. if check_arg!(args, 0, "TTL") {
  212. ttl.as_secs() as i64
  213. } else {
  214. ttl.as_millis() as i64
  215. }
  216. }
  217. Some(None) => -1,
  218. None => -2,
  219. };
  220. Ok(ttl.into())
  221. }
  222. /// Remove the existing timeout on key, turning the key from volatile (a key with an expire set) to
  223. /// persistent (a key that will never expire as no timeout is associated).
  224. pub async fn persist(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  225. Ok(conn.db().persist(&args[1]))
  226. }
  227. #[cfg(test)]
  228. mod test {
  229. use crate::{
  230. cmd::test::{create_connection, run_command},
  231. error::Error,
  232. value::Value,
  233. };
  234. #[tokio::test]
  235. async fn del() {
  236. let c = create_connection();
  237. assert_eq!(
  238. Ok(Value::Integer(1)),
  239. run_command(&c, &["incr", "foo"]).await
  240. );
  241. assert_eq!(
  242. Ok(Value::Integer(1)),
  243. run_command(&c, &["exists", "foo"]).await
  244. );
  245. assert_eq!(
  246. Ok(Value::Integer(1)),
  247. run_command(&c, &["del", "foo"]).await
  248. );
  249. assert_eq!(
  250. Ok(Value::Integer(0)),
  251. run_command(&c, &["del", "foo"]).await
  252. );
  253. assert_eq!(
  254. Ok(Value::Integer(0)),
  255. run_command(&c, &["exists", "foo"]).await
  256. );
  257. }
  258. #[tokio::test]
  259. async fn _type() {
  260. let c = create_connection();
  261. assert_eq!(
  262. Ok(Value::Integer(1)),
  263. run_command(&c, &["incr", "foo"]).await
  264. );
  265. assert_eq!(
  266. Ok(Value::Integer(1)),
  267. run_command(&c, &["hset", "hash", "foo", "bar"]).await
  268. );
  269. assert_eq!(
  270. Ok(Value::Integer(2)),
  271. run_command(&c, &["sadd", "set", "foo", "bar"]).await
  272. );
  273. assert_eq!(Ok("set".into()), run_command(&c, &["type", "set"]).await);
  274. assert_eq!(Ok("hash".into()), run_command(&c, &["type", "hash"]).await);
  275. assert_eq!(Ok("string".into()), run_command(&c, &["type", "foo"]).await);
  276. }
  277. #[tokio::test]
  278. async fn expire_and_persist() {
  279. let c = create_connection();
  280. assert_eq!(
  281. Ok(Value::Integer(1)),
  282. run_command(&c, &["incr", "foo"]).await
  283. );
  284. assert_eq!(
  285. Ok(Value::Integer(1)),
  286. run_command(&c, &["pexpire", "foo", "6000"]).await
  287. );
  288. match run_command(&c, &["pttl", "foo"]).await {
  289. Ok(Value::Integer(n)) => {
  290. assert!(n < 6000 && n > 5900);
  291. }
  292. _ => unreachable!(),
  293. };
  294. assert_eq!(
  295. Ok(Value::Integer(1)),
  296. run_command(&c, &["persist", "foo"]).await
  297. );
  298. assert_eq!(
  299. Ok(Value::Integer(-1)),
  300. run_command(&c, &["pttl", "foo"]).await
  301. );
  302. assert_eq!(
  303. Ok(Value::Integer(1)),
  304. run_command(&c, &["del", "foo"]).await
  305. );
  306. assert_eq!(
  307. Ok(Value::Integer(-2)),
  308. run_command(&c, &["pttl", "foo"]).await
  309. );
  310. }
  311. #[tokio::test]
  312. async fn copy() {
  313. let c = create_connection();
  314. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  315. assert_eq!(Ok(1.into()), run_command(&c, &["copy", "foo", "bar"]).await);
  316. assert_eq!(Ok(2.into()), run_command(&c, &["incr", "foo"]).await);
  317. assert_eq!(Ok(0.into()), run_command(&c, &["copy", "foo", "bar"]).await);
  318. assert_eq!(
  319. Ok(Value::Array(vec!["2".into(), "1".into()])),
  320. run_command(&c, &["mget", "foo", "bar"]).await
  321. );
  322. assert_eq!(
  323. Ok(1.into()),
  324. run_command(&c, &["copy", "foo", "bar", "replace"]).await
  325. );
  326. assert_eq!(
  327. Ok(Value::Array(vec!["2".into(), "2".into()])),
  328. run_command(&c, &["mget", "foo", "bar"]).await
  329. );
  330. }
  331. #[tokio::test]
  332. async fn copy_different_db() {
  333. let c = create_connection();
  334. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  335. assert_eq!(
  336. Ok(1.into()),
  337. run_command(&c, &["copy", "foo", "bar", "db", "2"]).await
  338. );
  339. assert_eq!(Ok(2.into()), run_command(&c, &["incr", "foo"]).await);
  340. assert_eq!(
  341. Ok(0.into()),
  342. run_command(&c, &["copy", "foo", "bar", "db", "2"]).await
  343. );
  344. assert_eq!(
  345. Ok(Value::Array(vec!["2".into(), Value::Null])),
  346. run_command(&c, &["mget", "foo", "bar"]).await
  347. );
  348. assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "2"]).await);
  349. assert_eq!(
  350. Ok(Value::Array(vec![Value::Null, "1".into()])),
  351. run_command(&c, &["mget", "foo", "bar"]).await
  352. );
  353. assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "0"]).await);
  354. assert_eq!(
  355. Ok(1.into()),
  356. run_command(&c, &["copy", "foo", "bar", "db", "2", "replace"]).await
  357. );
  358. assert_eq!(
  359. Ok(Value::Array(vec!["2".into(), Value::Null])),
  360. run_command(&c, &["mget", "foo", "bar"]).await
  361. );
  362. assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "2"]).await);
  363. assert_eq!(
  364. Ok(Value::Array(vec![Value::Null, "2".into()])),
  365. run_command(&c, &["mget", "foo", "bar"]).await
  366. );
  367. }
  368. #[tokio::test]
  369. async fn copy_same_db() {
  370. let c = create_connection();
  371. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  372. assert_eq!(
  373. Err(Error::SameEntry),
  374. run_command(&c, &["copy", "foo", "foo"]).await
  375. );
  376. assert_eq!(
  377. Err(Error::SameEntry),
  378. run_command(&c, &["copy", "foo", "foo", "db", "0"]).await
  379. );
  380. }
  381. #[tokio::test]
  382. async fn _move() {
  383. let c = create_connection();
  384. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  385. assert_eq!(Ok(1.into()), run_command(&c, &["move", "foo", "2"]).await);
  386. assert_eq!(Ok(Value::Null), run_command(&c, &["get", "foo"]).await);
  387. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  388. assert_eq!(Ok(0.into()), run_command(&c, &["move", "foo", "2"]).await);
  389. assert_eq!(Ok(Value::Ok), run_command(&c, &["select", "2"]).await);
  390. assert_eq!(Ok("1".into()), run_command(&c, &["get", "foo"]).await);
  391. }
  392. #[tokio::test]
  393. async fn _move_same_db() {
  394. let c = create_connection();
  395. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  396. assert_eq!(
  397. Err(Error::SameEntry),
  398. run_command(&c, &["move", "foo", "0"]).await
  399. );
  400. }
  401. #[tokio::test]
  402. async fn rename() {
  403. let c = create_connection();
  404. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  405. assert_eq!(
  406. Ok(Value::Ok),
  407. run_command(&c, &["rename", "foo", "bar-1650"]).await
  408. );
  409. assert_eq!(
  410. Ok(Value::Ok),
  411. run_command(&c, &["rename", "bar-1650", "xxx"]).await
  412. );
  413. assert_eq!(
  414. Err(Error::NotFound),
  415. run_command(&c, &["rename", "foo", "bar"]).await
  416. );
  417. }
  418. #[tokio::test]
  419. async fn renamenx() {
  420. let c = create_connection();
  421. assert_eq!(Ok(1.into()), run_command(&c, &["incr", "foo"]).await);
  422. assert_eq!(
  423. Ok(1.into()),
  424. run_command(&c, &["renamenx", "foo", "bar-1650"]).await
  425. );
  426. assert_eq!(
  427. Ok(1.into()),
  428. run_command(&c, &["renamenx", "bar-1650", "xxx"]).await
  429. );
  430. assert_eq!(
  431. Err(Error::NotFound),
  432. run_command(&c, &["renamenx", "foo", "bar"]).await
  433. );
  434. assert_eq!(
  435. Ok(Value::Ok),
  436. run_command(&c, &["set", "bar-1650", "xxx"]).await
  437. );
  438. assert_eq!(
  439. Ok(0.into()),
  440. run_command(&c, &["renamenx", "xxx", "bar-1650"]).await
  441. );
  442. }
  443. }