pubsub_connection.rs 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. //! # Pubsub client
  2. //!
  3. //! Each connection has a pubsub client which is created, even on normal connection mode.
  4. use super::Connection;
  5. use crate::value::Value;
  6. use bytes::Bytes;
  7. use glob::Pattern;
  8. use parking_lot::RwLock;
  9. use std::collections::HashMap;
  10. use tokio::sync::mpsc;
  11. /// Pubsubclient
  12. #[derive(Debug)]
  13. pub struct PubsubClient {
  14. meta: RwLock<MetaData>,
  15. sender: mpsc::UnboundedSender<Value>,
  16. }
  17. /// Metadata associated with a pubsub client
  18. #[derive(Debug)]
  19. struct MetaData {
  20. subscriptions: HashMap<Bytes, bool>,
  21. psubscriptions: HashMap<Pattern, bool>,
  22. is_psubcribed: bool,
  23. id: usize,
  24. }
  25. impl PubsubClient {
  26. /// Creates a new pubsub client instance
  27. pub fn new(sender: mpsc::UnboundedSender<Value>) -> Self {
  28. Self {
  29. meta: RwLock::new(MetaData {
  30. subscriptions: HashMap::new(),
  31. psubscriptions: HashMap::new(),
  32. is_psubcribed: false,
  33. id: 0,
  34. }),
  35. sender,
  36. }
  37. }
  38. /// Unsubscribe from pattern subscriptions
  39. pub fn punsubscribe(&self, channels: &[Pattern], conn: &Connection) -> u32 {
  40. let mut meta = self.meta.write();
  41. channels
  42. .iter()
  43. .map(|channel| meta.psubscriptions.remove(channel))
  44. .for_each(drop);
  45. if meta.psubscriptions.len() + meta.subscriptions.len() == 0 {
  46. drop(meta);
  47. conn.reset();
  48. }
  49. conn.pubsub().punsubscribe(channels, conn)
  50. }
  51. /// Unsubscribe from channels
  52. pub fn unsubscribe(&self, channels: &[Bytes], conn: &Connection) -> u32 {
  53. let mut meta = self.meta.write();
  54. channels
  55. .iter()
  56. .map(|channel| meta.subscriptions.remove(channel))
  57. .for_each(drop);
  58. if meta.psubscriptions.len() + meta.subscriptions.len() == 0 {
  59. drop(meta);
  60. conn.reset();
  61. }
  62. conn.pubsub().unsubscribe(channels, conn)
  63. }
  64. /// Return list of subscriptions for this connection
  65. pub fn subscriptions(&self) -> Vec<Bytes> {
  66. self.meta
  67. .read()
  68. .subscriptions
  69. .keys()
  70. .cloned()
  71. .collect::<Vec<Bytes>>()
  72. }
  73. /// Return list of pattern subscriptions
  74. pub fn psubscriptions(&self) -> Vec<Pattern> {
  75. self.meta
  76. .read()
  77. .psubscriptions
  78. .keys()
  79. .cloned()
  80. .collect::<Vec<Pattern>>()
  81. }
  82. /// Creates a new subscription and returns the ID for this new subscription.
  83. pub fn new_subscription(&self, channel: &Bytes) -> usize {
  84. let mut meta = self.meta.write();
  85. meta.subscriptions.insert(channel.clone(), true);
  86. meta.id += 1;
  87. meta.id
  88. }
  89. /// Creates a new pattern subscription and returns the ID for this new subscription.
  90. pub fn new_psubscription(&self, channel: &Pattern) -> usize {
  91. let mut meta = self.meta.write();
  92. meta.psubscriptions.insert(channel.clone(), true);
  93. meta.id += 1;
  94. meta.id
  95. }
  96. /// Does this connection has a pattern subscription?
  97. pub fn is_psubcribed(&self) -> bool {
  98. self.meta.read().is_psubcribed
  99. }
  100. /// Keeps a record about this connection using pattern suscription
  101. pub fn make_psubcribed(&self) {
  102. self.meta.write().is_psubcribed = true;
  103. }
  104. /// Returns a copy of the pubsub sender. This sender object can be used to send messages (from
  105. /// other connections) to this connection.
  106. pub fn sender(&self) -> mpsc::UnboundedSender<Value> {
  107. self.sender.clone()
  108. }
  109. }