pubsub.rs 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. use crate::{check_arg, connection::Connection, error::Error, value::Value};
  2. use bytes::Bytes;
  3. use glob::Pattern;
  4. pub async fn publish(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  5. Ok(conn.pubsub().publish(&args[1], &args[2]).await.into())
  6. }
  7. pub async fn pubsub(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  8. match String::from_utf8_lossy(&args[1]).to_lowercase().as_str() {
  9. "channels" => Ok(Value::Array(
  10. conn.pubsub()
  11. .channels()
  12. .iter()
  13. .map(|v| Value::Blob(v.clone()))
  14. .collect(),
  15. )),
  16. "help" => Ok(Value::Array(vec![
  17. Value::String("PUBSUB <subcommand> arg arg ... arg. Subcommands are:".to_owned()),
  18. Value::String("CHANNELS [<pattern>] -- Return the currently active channels matching a pattern (default: all).".to_owned()),
  19. Value::String("NUMPAT -- Return number of subscriptions to patterns.".to_owned()),
  20. Value::String("NUMSUB [channel-1 .. channel-N] -- Returns the number of subscribers for the specified channels (excluding patterns, default: none).".to_owned()),
  21. ])),
  22. "numpat" => Ok(conn.pubsub().get_number_of_psubscribers().into()),
  23. "numsub" => Ok(conn
  24. .pubsub()
  25. .get_number_of_subscribers(&args[2..])
  26. .iter()
  27. .map(|(channel, subs)| vec![Value::Blob(channel.clone()), (*subs).into()])
  28. .flatten()
  29. .collect::<Vec<Value>>()
  30. .into()),
  31. cmd => Ok(Value::Err(
  32. "ERR".to_owned(),
  33. format!(
  34. "Unknown subcommand or wrong number of arguments for '{}'. Try PUBSUB HELP.",
  35. cmd
  36. ),
  37. )),
  38. }
  39. }
  40. pub async fn subscribe(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  41. let pubsub = conn.pubsub();
  42. let channels = &args[1..];
  43. if check_arg!(args, 0, "PSUBSCRIBE") {
  44. pubsub.psubscribe(channels, conn)?;
  45. } else {
  46. pubsub.subscribe(channels, conn);
  47. }
  48. conn.start_pubsub()
  49. }
  50. pub async fn punsubscribe(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  51. let channels = if args.len() == 1 {
  52. conn.pubsub_client().psubscriptions()
  53. } else {
  54. (&args[1..])
  55. .iter()
  56. .map(|channel| {
  57. let channel = String::from_utf8_lossy(channel);
  58. Pattern::new(&channel).map_err(|_| Error::InvalidPattern(channel.to_string()))
  59. })
  60. .collect::<Result<Vec<Pattern>, Error>>()?
  61. };
  62. Ok(conn.pubsub_client().punsubscribe(&channels, conn).into())
  63. }
  64. pub async fn unsubscribe(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
  65. let channels = if args.len() == 1 {
  66. conn.pubsub_client().subscriptions()
  67. } else {
  68. (&args[1..]).to_vec()
  69. };
  70. Ok(conn.pubsub_client().unsubscribe(&channels, conn).into())
  71. }
  72. #[cfg(test)]
  73. mod test {
  74. use crate::{
  75. cmd::test::{
  76. create_connection_and_pubsub, create_new_connection_from_connection, run_command,
  77. },
  78. value::Value,
  79. };
  80. use tokio::sync::mpsc::UnboundedReceiver;
  81. async fn test_subscription_confirmation_and_first_message(
  82. msg: &str,
  83. channel: &str,
  84. recv: &mut UnboundedReceiver<Value>,
  85. ) {
  86. assert_eq!(
  87. Some(Value::Array(vec![
  88. "subscribe".into(),
  89. channel.into(),
  90. 1.into()
  91. ])),
  92. recv.recv().await
  93. );
  94. assert_eq!(
  95. Some(Value::Array(vec![
  96. Value::Blob("message".into()),
  97. channel.into(),
  98. msg.into()
  99. ])),
  100. recv.recv().await
  101. );
  102. }
  103. #[tokio::test]
  104. async fn test_subscribe_multiple_channels() {
  105. let (mut recv, c1) = create_connection_and_pubsub();
  106. assert_eq!(
  107. Ok(Value::Ok),
  108. run_command(&c1, &["subscribe", "foo", "bar"]).await
  109. );
  110. assert_eq!(
  111. Some(Value::Array(vec![
  112. "subscribe".into(),
  113. "foo".into(),
  114. 1.into()
  115. ])),
  116. recv.recv().await
  117. );
  118. assert_eq!(
  119. Some(Value::Array(vec![
  120. "subscribe".into(),
  121. "bar".into(),
  122. 2.into()
  123. ])),
  124. recv.recv().await
  125. );
  126. }
  127. #[tokio::test]
  128. async fn test_subscribe_multiple_channels_one_by_one() {
  129. let (mut recv, c1) = create_connection_and_pubsub();
  130. assert_eq!(Ok(Value::Ok), run_command(&c1, &["subscribe", "foo"]).await);
  131. assert_eq!(Ok(Value::Ok), run_command(&c1, &["subscribe", "bar"]).await);
  132. assert_eq!(
  133. Some(Value::Array(vec![
  134. "subscribe".into(),
  135. "foo".into(),
  136. 1.into()
  137. ])),
  138. recv.recv().await
  139. );
  140. assert_eq!(
  141. Some(Value::Array(vec![
  142. "subscribe".into(),
  143. "bar".into(),
  144. 2.into()
  145. ])),
  146. recv.recv().await
  147. );
  148. }
  149. #[tokio::test]
  150. async fn test_unsubscribe_with_args() {
  151. let (mut recv, c1) = create_connection_and_pubsub();
  152. assert_eq!(
  153. Ok(Value::Ok),
  154. run_command(&c1, &["subscribe", "foo", "bar"]).await
  155. );
  156. assert_eq!(
  157. Ok(Value::Integer(2)),
  158. run_command(&c1, &["unsubscribe", "foo", "bar"]).await
  159. );
  160. assert_eq!(
  161. Some(Value::Array(vec![
  162. "subscribe".into(),
  163. "foo".into(),
  164. 1.into()
  165. ])),
  166. recv.recv().await
  167. );
  168. assert_eq!(
  169. Some(Value::Array(vec![
  170. "subscribe".into(),
  171. "bar".into(),
  172. 2.into()
  173. ])),
  174. recv.recv().await
  175. );
  176. assert_eq!(
  177. Some(Value::Array(vec![
  178. "unsubscribe".into(),
  179. "foo".into(),
  180. 1.into()
  181. ])),
  182. recv.recv().await
  183. );
  184. assert_eq!(
  185. Some(Value::Array(vec![
  186. "unsubscribe".into(),
  187. "bar".into(),
  188. 1.into()
  189. ])),
  190. recv.recv().await
  191. );
  192. }
  193. #[tokio::test]
  194. async fn pubsub_publish() {
  195. let (mut sub1, c1) = create_connection_and_pubsub();
  196. let (mut sub2, c2) = create_new_connection_from_connection(&c1);
  197. let (_, c3) = create_new_connection_from_connection(&c1);
  198. assert_eq!(Ok(Value::Ok), run_command(&c1, &["subscribe", "foo"]).await);
  199. assert_eq!(Ok(Value::Ok), run_command(&c2, &["subscribe", "foo"]).await);
  200. let msg = "foo - message";
  201. let _ = run_command(&c3, &["publish", "foo", msg]).await;
  202. test_subscription_confirmation_and_first_message(msg, "foo", &mut sub1).await;
  203. test_subscription_confirmation_and_first_message(msg, "foo", &mut sub2).await;
  204. }
  205. #[tokio::test]
  206. async fn pubsub_numpat() {
  207. let (_, c1) = create_connection_and_pubsub();
  208. let (_, c2) = create_new_connection_from_connection(&c1);
  209. assert_eq!(
  210. Ok(Value::Integer(0)),
  211. run_command(&c1, &["pubsub", "numpat"]).await
  212. );
  213. let _ = run_command(&c2, &["psubscribe", "foo", "bar*", "xxx*"]).await;
  214. assert_eq!(
  215. Ok(Value::Integer(1)),
  216. run_command(&c1, &["pubsub", "numpat"]).await
  217. );
  218. }
  219. }