hash.rs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615
  1. //! # Hash command handlers
  2. use crate::{
  3. check_arg,
  4. connection::Connection,
  5. error::Error,
  6. value::{bytes_to_number, float::Float, Value},
  7. };
  8. use bytes::Bytes;
  9. use rand::Rng;
  10. use std::collections::{BTreeMap, HashMap, VecDeque};
  11. /// Removes the specified fields from the hash stored at key. Specified fields that do not exist
  12. /// within this hash are ignored. If key does not exist, it is treated as an empty hash and this
  13. /// command returns 0.
  14. pub async fn hdel(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
  15. let mut is_empty = false;
  16. let key = args.pop_front().ok_or(Error::Syntax)?;
  17. let result = conn.db().get_map(&key, |v| match v {
  18. Some(Value::Hash(h)) => {
  19. let mut h = h.write();
  20. let mut total: i64 = 0;
  21. for key in args.into_iter() {
  22. if h.remove(&key).is_some() {
  23. total += 1;
  24. }
  25. }
  26. is_empty = h.len() == 0;
  27. Ok(total.into())
  28. }
  29. None => Ok(0.into()),
  30. _ => Err(Error::WrongType),
  31. })?;
  32. if is_empty {
  33. let _ = conn.db().del(&[key]);
  34. } else {
  35. conn.db().bump_version(&key);
  36. }
  37. Ok(result)
  38. }
  39. /// Returns if field is an existing field in the hash stored at key.
  40. pub async fn hexists(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
  41. conn.db().get_map(&args[0], |v| match v {
  42. Some(Value::Hash(h)) => Ok(if h.read().get(&args[1]).is_some() {
  43. 1.into()
  44. } else {
  45. 0.into()
  46. }),
  47. None => Ok(0.into()),
  48. _ => Err(Error::WrongType),
  49. })
  50. }
  51. /// Returns the value associated with field in the hash stored at key.
  52. pub async fn hget(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
  53. conn.db().get_map(&args[0], |v| match v {
  54. Some(Value::Hash(h)) => Ok(h
  55. .read()
  56. .get(&args[1])
  57. .map(|v| Value::new(v))
  58. .unwrap_or_default()),
  59. None => Ok(Value::Null),
  60. _ => Err(Error::WrongType),
  61. })
  62. }
  63. /// Returns all fields and values of the hash stored at key. In the returned value, every field
  64. /// name is followed by its value, so the length of the reply is twice the size of the hash.
  65. pub async fn hgetall(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
  66. conn.db().get_map(&args[0], |v| match v {
  67. Some(Value::Hash(h)) => {
  68. let mut ret = vec![];
  69. for (key, value) in h.read().iter() {
  70. ret.push(Value::new(key));
  71. ret.push(Value::new(value));
  72. }
  73. Ok(ret.into())
  74. }
  75. None => Ok(Value::Array(vec![])),
  76. _ => Err(Error::WrongType),
  77. })
  78. }
  79. /// Increment the specified field of a hash stored at key, and representing a number, by the
  80. /// specified increment. If the increment value is negative, the result is to have the hash field
  81. /// value decremented instead of incremented. If the field does not exist, it is set to 0 before
  82. /// performing the operation.
  83. pub async fn hincrby_int(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
  84. let result = conn
  85. .db()
  86. .hincrby::<i64>(&args[0], &args[1], &args[2], "an integer")?;
  87. conn.db().bump_version(&args[0]);
  88. Ok(result)
  89. }
  90. /// Increment the specified field of a hash stored at key, and representing a number, by the
  91. /// specified increment. If the increment value is negative, the result is to have the hash field
  92. /// value decremented instead of incremented. If the field does not exist, it is set to 0 before
  93. /// performing the operation.
  94. pub async fn hincrby_float(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
  95. let result = conn
  96. .db()
  97. .hincrby::<Float>(&args[0], &args[1], &args[2], "a float")?;
  98. conn.db().bump_version(&args[0]);
  99. Ok(result)
  100. }
  101. /// Returns all field names in the hash stored at key.
  102. pub async fn hkeys(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
  103. conn.db().get_map(&args[0], |v| match v {
  104. Some(Value::Hash(h)) => {
  105. let mut ret = vec![];
  106. for key in h.read().keys() {
  107. ret.push(Value::new(key));
  108. }
  109. Ok(ret.into())
  110. }
  111. None => Ok(Value::Array(vec![])),
  112. _ => Err(Error::WrongType),
  113. })
  114. }
  115. /// Returns the number of fields contained in the hash stored at key.
  116. pub async fn hlen(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
  117. conn.db().get_map(&args[0], |v| match v {
  118. Some(Value::Hash(h)) => Ok(h.read().len().into()),
  119. None => Ok(0.into()),
  120. _ => Err(Error::WrongType),
  121. })
  122. }
  123. /// Returns the values associated with the specified fields in the hash stored at key.
  124. pub async fn hmget(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
  125. let key = args.pop_front().ok_or(Error::Syntax)?;
  126. conn.db().get_map(&key, |v| match v {
  127. Some(Value::Hash(h)) => {
  128. let h = h.read();
  129. Ok(args
  130. .iter()
  131. .map(|key| h.get(key).map(|v| Value::new(v)).unwrap_or_default())
  132. .collect::<Vec<Value>>()
  133. .into())
  134. }
  135. None => Ok(args
  136. .iter()
  137. .map(|_| Value::Null)
  138. .collect::<Vec<Value>>()
  139. .into()),
  140. _ => Err(Error::WrongType),
  141. })
  142. }
  143. /// Returns random keys (or values) from a hash
  144. pub async fn hrandfield(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
  145. let (count, with_values) = match args.len() {
  146. 1 => (None, false),
  147. 2 => (Some(bytes_to_number::<i64>(&args[1])?), false),
  148. 3 => {
  149. if !(check_arg!(args, 2, "WITHVALUES")) {
  150. return Err(Error::Syntax);
  151. }
  152. (Some(bytes_to_number::<i64>(&args[1])?), true)
  153. }
  154. _ => return Err(Error::InvalidArgsCount("hrandfield".to_owned())),
  155. };
  156. let (count, single, repeat) = match count {
  157. Some(count) if count > 0 => (count, false, 1),
  158. Some(count) => (count.abs(), false, count.abs()),
  159. _ => (1, true, 1),
  160. };
  161. conn.db().get_map(&args[0], |v| match v {
  162. Some(Value::Hash(h)) => {
  163. let mut ret = vec![];
  164. let mut i = 0;
  165. let mut rand_sorted = BTreeMap::new();
  166. let mut rng = rand::thread_rng();
  167. let h = h.read();
  168. for _ in 0..repeat {
  169. for (key, value) in h.iter() {
  170. let rand = rng.gen::<u64>();
  171. rand_sorted.insert((rand, i), (key, value));
  172. i += 1;
  173. }
  174. }
  175. i = 0;
  176. for val in rand_sorted.values() {
  177. if single {
  178. return Ok(Value::new(val.0));
  179. }
  180. if i == count {
  181. break;
  182. }
  183. ret.push(Value::new(val.0));
  184. if with_values {
  185. ret.push(Value::new(val.1));
  186. }
  187. i += 1;
  188. }
  189. Ok(ret.into())
  190. }
  191. None => Ok(Value::Array(vec![])),
  192. _ => Err(Error::WrongType),
  193. })
  194. }
  195. /// Sets field in the hash stored at key to value. If key does not exist, a new key holding a hash
  196. /// is created. If field already exists in the hash, it is overwritten.
  197. pub async fn hmset(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
  198. let key = args.pop_front().ok_or(Error::Syntax)?;
  199. if args.len() % 2 == 1 {
  200. return Err(Error::InvalidArgsCount("hset".to_owned()));
  201. }
  202. let result = conn.db().get_map(&key, |v| match v {
  203. Some(Value::Hash(h)) => {
  204. let mut h = h.write();
  205. loop {
  206. if args.is_empty() {
  207. break;
  208. }
  209. let key = args.pop_front().ok_or(Error::Syntax)?;
  210. let value = args.pop_front().ok_or(Error::Syntax)?;
  211. h.insert(key, value);
  212. }
  213. Ok(Value::Ok)
  214. }
  215. None => {
  216. #[allow(clippy::mutable_key_type)]
  217. let mut h = HashMap::new();
  218. loop {
  219. if args.is_empty() {
  220. break;
  221. }
  222. let key = args.pop_front().ok_or(Error::Syntax)?;
  223. let value = args.pop_front().ok_or(Error::Syntax)?;
  224. h.insert(key, value);
  225. }
  226. let _len = h.len();
  227. conn.db().set(key.clone(), h.into(), None);
  228. Ok(Value::Ok)
  229. }
  230. _ => Err(Error::WrongType),
  231. })?;
  232. conn.db().bump_version(&key);
  233. Ok(result)
  234. }
  235. /// Sets field in the hash stored at key to value. If key does not exist, a new key holding a hash
  236. /// is created. If field already exists in the hash, it is overwritten.
  237. pub async fn hset(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
  238. let key = args.pop_front().ok_or(Error::Syntax)?;
  239. if args.len() % 2 == 1 {
  240. return Err(Error::InvalidArgsCount("hset".to_owned()));
  241. }
  242. let result = conn.db().get_map(&key, |v| match v {
  243. Some(Value::Hash(h)) => {
  244. let mut h = h.write();
  245. let mut e: i64 = 0;
  246. loop {
  247. if args.is_empty() {
  248. break;
  249. }
  250. let key = args.pop_front().ok_or(Error::Syntax)?;
  251. let value = args.pop_front().ok_or(Error::Syntax)?;
  252. if h.insert(key, value).is_none() {
  253. e += 1;
  254. }
  255. }
  256. Ok(e.into())
  257. }
  258. None => {
  259. #[allow(clippy::mutable_key_type)]
  260. let mut h = HashMap::new();
  261. loop {
  262. if args.is_empty() {
  263. break;
  264. }
  265. let key = args.pop_front().ok_or(Error::Syntax)?;
  266. let value = args.pop_front().ok_or(Error::Syntax)?;
  267. h.insert(key, value);
  268. }
  269. let len = h.len();
  270. conn.db().set(key.clone(), h.into(), None);
  271. Ok(len.into())
  272. }
  273. _ => Err(Error::WrongType),
  274. })?;
  275. conn.db().bump_version(&key);
  276. Ok(result)
  277. }
  278. /// Sets field in the hash stored at key to value, only if field does not yet exist. If key does
  279. /// not exist, a new key holding a hash is created. If field already exists, this operation has no
  280. /// effect.
  281. pub async fn hsetnx(conn: &Connection, mut args: VecDeque<Bytes>) -> Result<Value, Error> {
  282. let key = args.pop_front().ok_or(Error::Syntax)?;
  283. let sub_key = args.pop_front().ok_or(Error::Syntax)?;
  284. let value = args.pop_front().ok_or(Error::Syntax)?;
  285. let result = conn.db().get_map(&key, |v| match v {
  286. Some(Value::Hash(h)) => {
  287. let mut h = h.write();
  288. if h.get(&sub_key).is_some() {
  289. Ok(0.into())
  290. } else {
  291. h.insert(sub_key, value);
  292. Ok(1.into())
  293. }
  294. }
  295. None => {
  296. #[allow(clippy::mutable_key_type)]
  297. let mut h = HashMap::new();
  298. h.insert(sub_key, value);
  299. let len = h.len();
  300. conn.db().set(key.clone(), h.into(), None);
  301. Ok(len.into())
  302. }
  303. _ => Err(Error::WrongType),
  304. })?;
  305. if result == Value::Integer(1) {
  306. conn.db().bump_version(&key);
  307. }
  308. Ok(result)
  309. }
  310. /// Returns the string length of the value associated with field in the hash stored at key. If the
  311. /// key or the field do not exist, 0 is returned.
  312. pub async fn hstrlen(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
  313. conn.db().get_map(&args[0], |v| match v {
  314. Some(Value::Hash(h)) => Ok(h
  315. .read()
  316. .get(&args[1])
  317. .map(|v| v.len())
  318. .unwrap_or_default()
  319. .into()),
  320. None => Ok(0.into()),
  321. _ => Err(Error::WrongType),
  322. })
  323. }
  324. /// Returns all values in the hash stored at key.
  325. pub async fn hvals(conn: &Connection, args: VecDeque<Bytes>) -> Result<Value, Error> {
  326. conn.db().get_map(&args[0], |v| match v {
  327. Some(Value::Hash(h)) => {
  328. let mut ret = vec![];
  329. for value in h.read().values() {
  330. ret.push(Value::new(value));
  331. }
  332. Ok(ret.into())
  333. }
  334. None => Ok(Value::Array(vec![])),
  335. _ => Err(Error::WrongType),
  336. })
  337. }
  #[cfg(test)]
  mod test {
      use crate::{
          cmd::test::{create_connection, invalid_type, run_command},
          value::Value,
      };

      /// HGET returns the stored value of a single field.
      #[tokio::test]
      async fn hget() {
          let conn = create_connection();
          let created = run_command(&conn, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
          assert_eq!(Ok(Value::Integer(3)), created);

          let fetched = run_command(&conn, &["hget", "foo", "f1"]).await;
          assert_eq!(Ok(Value::Blob("1".into())), fetched);
      }

      /// HGETALL replies with field/value pairs, starting with one of the stored fields.
      #[tokio::test]
      async fn hgetall() {
          let conn = create_connection();
          let created = run_command(&conn, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
          assert_eq!(Ok(Value::Integer(3)), created);

          match run_command(&conn, &["hgetall", "foo"]).await {
              Ok(Value::Array(items)) => {
                  assert_eq!(6, items.len());
                  let names = [
                      Value::Blob("f1".into()),
                      Value::Blob("f2".into()),
                      Value::Blob("f3".into()),
                  ];
                  assert!(names.contains(&items[0]));
              }
              _ => unreachable!(),
          }
      }

      /// HRANDFIELD without a count replies with one of the stored field names.
      #[tokio::test]
      async fn hrandfield() {
          let conn = create_connection();
          let created = run_command(&conn, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
          assert_eq!(Ok(Value::Integer(3)), created);

          match run_command(&conn, &["hrandfield", "foo"]).await {
              Ok(Value::Blob(raw)) => {
                  let name = String::from_utf8_lossy(&raw);
                  assert!(matches!(&*name, "f1" | "f2" | "f3"));
              }
              _ => unreachable!(),
          }
      }

      /// HMGET returns the requested values in request order.
      #[tokio::test]
      async fn hmget() {
          let conn = create_connection();
          let created = run_command(&conn, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
          assert_eq!(Ok(Value::Integer(3)), created);

          let fetched = run_command(&conn, &["hmget", "foo", "f1", "f2"]).await;
          let expected = vec![Value::Blob("1".into()), Value::Blob("2".into())];
          assert_eq!(Ok(Value::Array(expected)), fetched);
      }

      /// HEXISTS distinguishes stored fields from missing ones.
      #[tokio::test]
      async fn hexists() {
          let conn = create_connection();
          let created = run_command(&conn, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
          assert_eq!(Ok(Value::Integer(3)), created);

          let first = run_command(&conn, &["hexists", "foo", "f1"]).await;
          assert_eq!(Ok(Value::Integer(1)), first);

          let third = run_command(&conn, &["hexists", "foo", "f3"]).await;
          assert_eq!(Ok(Value::Integer(1)), third);

          let missing = run_command(&conn, &["hexists", "foo", "f4"]).await;
          assert_eq!(Ok(Value::Integer(0)), missing);
      }

      /// HSTRLEN reports the byte length of a stored value.
      #[tokio::test]
      async fn hstrlen() {
          let conn = create_connection();
          let created = run_command(&conn, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
          assert_eq!(Ok(Value::Integer(3)), created);

          let length = run_command(&conn, &["hstrlen", "foo", "f1"]).await;
          assert_eq!(Ok(Value::Integer(1)), length);
      }

      /// HSET only counts newly added fields, and HLEN sees the union of both writes.
      #[tokio::test]
      async fn hlen() {
          let conn = create_connection();
          let created = run_command(&conn, &["hset", "foo", "f1", "1", "f2", "2", "f3", "3"]).await;
          assert_eq!(Ok(Value::Integer(3)), created);

          let added = run_command(&conn, &["hset", "foo", "f1", "2", "f4", "2", "f5", "3"]).await;
          assert_eq!(Ok(Value::Integer(2)), added);

          let size = run_command(&conn, &["hlen", "foo"]).await;
          assert_eq!(Ok(Value::Integer(5)), size);
      }

      /// HKEYS lists the single stored field name.
      #[tokio::test]
      async fn hkeys() {
          let conn = create_connection();
          let created = run_command(&conn, &["hset", "foo", "f1", "1"]).await;
          assert_eq!(Ok(Value::Integer(1)), created);

          let names = run_command(&conn, &["hkeys", "foo"]).await;
          assert_eq!(Ok(Value::Array(vec![Value::Blob("f1".into())])), names);
      }

      /// HVALS lists the single stored value.
      #[tokio::test]
      async fn hvals() {
          let conn = create_connection();
          let created = run_command(&conn, &["hset", "foo", "f1", "1"]).await;
          assert_eq!(Ok(Value::Integer(1)), created);

          let contents = run_command(&conn, &["hvals", "foo"]).await;
          assert_eq!(Ok(Value::Array(vec![Value::Blob("1".into())])), contents);
      }

      /// Removing the last field makes TTL answer -2 instead of -1 for the key.
      #[tokio::test]
      async fn hdel_remove_empty_hash() {
          let conn = create_connection();
          let created = run_command(&conn, &["hset", "foo", "f1", "1", "f2", "1"]).await;
          assert_eq!(Ok(Value::Integer(2)), created);

          let first_removal = run_command(&conn, &["hdel", "foo", "f1"]).await;
          assert_eq!(Ok(1.into()), first_removal);
          let ttl_with_field_left = run_command(&conn, &["ttl", "foo"]).await;
          assert_eq!(Ok(Value::Integer(-1)), ttl_with_field_left);

          let second_removal = run_command(&conn, &["hdel", "foo", "f2"]).await;
          assert_eq!(Ok(1.into()), second_removal);
          let ttl_after_last_field = run_command(&conn, &["ttl", "foo"]).await;
          assert_eq!(Ok(Value::Integer(-2)), ttl_after_last_field);
      }

      /// HINCRBY creates the field on first use and accepts negative increments.
      #[tokio::test]
      async fn hincrby() {
          let conn = create_connection();

          let incremented = run_command(&conn, &["hincrby", "foo", "f1", "1"]).await;
          assert_eq!(Ok(Value::Integer(1)), incremented);

          let decremented = run_command(&conn, &["hincrby", "foo", "f1", "-10"]).await;
          assert_eq!(Ok(Value::Integer(-9)), decremented);

          let stored = run_command(&conn, &["hget", "foo", "f1"]).await;
          assert_eq!(Ok(Value::Blob("-9".into())), stored);
      }

      /// HSETNX writes a field once and refuses to overwrite it afterwards.
      #[tokio::test]
      async fn hsetnx() {
          let conn = create_connection();

          let first_write = run_command(&conn, &["hsetnx", "foo", "xxx", "1"]).await;
          assert_eq!(Ok(Value::Integer(1)), first_write);

          let repeated_write = run_command(&conn, &["hsetnx", "foo", "xxx", "1"]).await;
          assert_eq!(Ok(Value::Integer(0)), repeated_write);

          let other_field = run_command(&conn, &["hsetnx", "foo", "bar", "1"]).await;
          assert_eq!(Ok(Value::Integer(1)), other_field);

          let size = run_command(&conn, &["hlen", "foo"]).await;
          assert_eq!(Ok(Value::Integer(2)), size);
      }

      /// HLEN on a key that was never written answers 0.
      #[tokio::test]
      async fn hlen_non_existing() {
          let conn = create_connection();
          let size = run_command(&conn, &["hlen", "foo"]).await;
          assert_eq!(Ok(Value::Integer(0)), size);
      }

      /// Runs every hash command through the shared `invalid_type` check.
      #[tokio::test]
      async fn invalid_types() {
          invalid_type(&["hdel", "key", "bar", "1"]).await;
          invalid_type(&["hexists", "key", "bar"]).await;
          invalid_type(&["hget", "key", "bar"]).await;
          invalid_type(&["hgetall", "key"]).await;
          invalid_type(&["hincrby", "key", "bar", "1"]).await;
          invalid_type(&["hincrbyfloat", "key", "bar", "1"]).await;
          invalid_type(&["hkeys", "key"]).await;
          invalid_type(&["hlen", "key"]).await;
          invalid_type(&["hstrlen", "key", "foo"]).await;
          invalid_type(&["hmget", "key", "1", "2"]).await;
          invalid_type(&["hrandfield", "key"]).await;
          invalid_type(&["hset", "key", "bar", "1"]).await;
          invalid_type(&["hsetnx", "key", "bar", "1"]).await;
          invalid_type(&["hvals", "key"]).await;
      }
  }