pubsub_connection.rs 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. //! # Pubsub client
  2. //!
  3. //! Each connection has a pubsub client which is created, even on normal connection mode.
  4. use super::Connection;
  5. use crate::value::Value;
  6. use bytes::Bytes;
  7. use glob::Pattern;
  8. use parking_lot::RwLock;
  9. use std::collections::HashMap;
  10. use tokio::sync::mpsc;
  11. /// Pubsubclient
  12. #[derive(Debug)]
  13. pub struct PubsubClient {
  14. meta: RwLock<MetaData>,
  15. sender: mpsc::Sender<Value>,
  16. }
  17. /// Metadata associated with a pubsub client
  18. #[derive(Debug)]
  19. struct MetaData {
  20. subscriptions: HashMap<Bytes, bool>,
  21. psubscriptions: HashMap<Pattern, bool>,
  22. is_psubcribed: bool,
  23. }
  24. impl PubsubClient {
  25. /// Creates a new pubsub client instance
  26. pub fn new(sender: mpsc::Sender<Value>) -> Self {
  27. Self {
  28. meta: RwLock::new(MetaData {
  29. subscriptions: HashMap::new(),
  30. psubscriptions: HashMap::new(),
  31. is_psubcribed: false,
  32. }),
  33. sender,
  34. }
  35. }
  36. /// Unsubscribe from pattern subscriptions
  37. pub fn punsubscribe(&self, channels: &[Pattern], conn: &Connection) {
  38. let mut meta = self.meta.write();
  39. channels
  40. .iter()
  41. .map(|channel| meta.psubscriptions.remove(channel))
  42. .for_each(drop);
  43. drop(meta);
  44. conn.pubsub().punsubscribe(channels, conn);
  45. if self.total_subs() == 0 {
  46. conn.reset();
  47. }
  48. }
  49. /// Unsubscribe from channels
  50. pub fn unsubscribe(&self, channels: &[Bytes], conn: &Connection) {
  51. let mut meta = self.meta.write();
  52. channels
  53. .iter()
  54. .map(|channel| meta.subscriptions.remove(channel))
  55. .for_each(drop);
  56. drop(meta);
  57. conn.pubsub().unsubscribe(channels, conn);
  58. if self.total_subs() == 0 {
  59. conn.reset();
  60. }
  61. }
  62. /// Return list of subscriptions for this connection
  63. pub fn subscriptions(&self) -> Vec<Bytes> {
  64. self.meta
  65. .read()
  66. .subscriptions
  67. .keys()
  68. .cloned()
  69. .collect::<Vec<Bytes>>()
  70. }
  71. /// Return list of pattern subscriptions
  72. pub fn psubscriptions(&self) -> Vec<Pattern> {
  73. self.meta
  74. .read()
  75. .psubscriptions
  76. .keys()
  77. .cloned()
  78. .collect::<Vec<Pattern>>()
  79. }
  80. /// Return total number of subscriptions + psubscription
  81. pub fn total_subs(&self) -> usize {
  82. let meta = self.meta.read();
  83. meta.subscriptions.len() + meta.psubscriptions.len()
  84. }
  85. /// Creates a new subscription
  86. pub fn new_subscription(&self, channel: &Bytes) {
  87. let mut meta = self.meta.write();
  88. meta.subscriptions.insert(channel.clone(), true);
  89. }
  90. /// Creates a new pattern subscription
  91. pub fn new_psubscription(&self, channel: &Pattern) {
  92. let mut meta = self.meta.write();
  93. meta.psubscriptions.insert(channel.clone(), true);
  94. }
  95. /// Does this connection has a pattern subscription?
  96. pub fn is_psubcribed(&self) -> bool {
  97. self.meta.read().is_psubcribed
  98. }
  99. /// Keeps a record about this connection using pattern suscription
  100. pub fn make_psubcribed(&self) {
  101. self.meta.write().is_psubcribed = true;
  102. }
  103. /// Returns a copy of the pubsub sender. This sender object can be used to send messages (from
  104. /// other connections) to this connection.
  105. pub fn sender(&self) -> mpsc::Sender<Value> {
  106. self.sender.clone()
  107. }
  108. /// Sends a message
  109. #[inline]
  110. pub fn send(&self, message: Value) {
  111. let _ = self.sender.try_send(message);
  112. }
  113. }