pubsub.rs 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. //! Pub-sub producer
  2. use std::cmp::Ordering;
  3. use std::collections::{BTreeMap, HashSet};
  4. use std::sync::atomic::AtomicUsize;
  5. use std::sync::Arc;
  6. use parking_lot::RwLock;
  7. use tokio::sync::mpsc;
  8. use super::subscriber::{ActiveSubscription, SubscriptionRequest};
  9. use super::{Error, Event, Spec, Subscriber};
  10. /// Default channel size for subscription buffering
  11. pub const DEFAULT_CHANNEL_SIZE: usize = 10_000;
  12. /// Subscriber Receiver
  13. pub type SubReceiver<S> = mpsc::Receiver<(Arc<<S as Spec>::SubscriptionId>, <S as Spec>::Event)>;
  14. /// Internal Index Tree
  15. pub type TopicTree<T> = Arc<
  16. RwLock<
  17. BTreeMap<
  18. // Index with a subscription unique ID
  19. (<T as Spec>::Topic, usize),
  20. Subscriber<T>,
  21. >,
  22. >,
  23. >;
  24. /// Manager
  25. pub struct Pubsub<S>
  26. where
  27. S: Spec + 'static,
  28. {
  29. inner: Arc<S>,
  30. listeners_topics: TopicTree<S>,
  31. unique_subscription_counter: AtomicUsize,
  32. active_subscribers: Arc<AtomicUsize>,
  33. }
  34. impl<S> Pubsub<S>
  35. where
  36. S: Spec + 'static,
  37. {
  38. /// Create a new instance
  39. pub fn new(inner: Arc<S>) -> Self {
  40. Self {
  41. inner,
  42. listeners_topics: Default::default(),
  43. unique_subscription_counter: 0.into(),
  44. active_subscribers: Arc::new(0.into()),
  45. }
  46. }
  47. /// Total number of active subscribers, it is not the number of active topics being subscribed
  48. pub fn active_subscribers(&self) -> usize {
  49. self.active_subscribers
  50. .load(std::sync::atomic::Ordering::Relaxed)
  51. }
  52. /// Publish an event to all listenrs
  53. #[inline(always)]
  54. fn publish_internal(event: S::Event, listeners_index: &TopicTree<S>) -> Result<(), Error> {
  55. let index_storage = listeners_index.read();
  56. let mut sent = HashSet::new();
  57. for topic in event.get_topics() {
  58. for ((subscription_index, unique_id), sender) in
  59. index_storage.range((topic.clone(), 0)..)
  60. {
  61. if subscription_index.cmp(&topic) != Ordering::Equal {
  62. break;
  63. }
  64. if sent.contains(&unique_id) {
  65. continue;
  66. }
  67. sent.insert(unique_id);
  68. sender.send(event.clone());
  69. }
  70. }
  71. Ok(())
  72. }
  73. /// Broadcast an event to all listeners
  74. #[inline(always)]
  75. pub fn publish<E>(&self, event: E)
  76. where
  77. E: Into<S::Event>,
  78. {
  79. let topics = self.listeners_topics.clone();
  80. let event = event.into();
  81. #[cfg(not(target_arch = "wasm32"))]
  82. tokio::spawn(async move {
  83. let _ = Self::publish_internal(event, &topics);
  84. });
  85. #[cfg(target_arch = "wasm32")]
  86. wasm_bindgen_futures::spawn_local(async move {
  87. let _ = Self::publish_internal(event, &topics);
  88. });
  89. }
  90. /// Broadcast an event to all listeners right away, blocking the current thread
  91. ///
  92. /// This function takes an Arc to the storage struct, the event_id, the kind
  93. /// and the vent to broadcast
  94. #[inline(always)]
  95. pub fn publish_now<E>(&self, event: E) -> Result<(), Error>
  96. where
  97. E: Into<S::Event>,
  98. {
  99. let event = event.into();
  100. Self::publish_internal(event, &self.listeners_topics)
  101. }
  102. /// Subscribe proving custom sender/receiver mpsc
  103. #[inline(always)]
  104. pub fn subscribe_with<I>(
  105. &self,
  106. request: I,
  107. sender: &mpsc::Sender<(Arc<I::SubscriptionId>, S::Event)>,
  108. receiver: Option<SubReceiver<S>>,
  109. ) -> Result<ActiveSubscription<S>, Error>
  110. where
  111. I: SubscriptionRequest<
  112. Topic = <S::Event as Event>::Topic,
  113. SubscriptionId = S::SubscriptionId,
  114. >,
  115. {
  116. let subscription_name = request.subscription_name();
  117. let sender = Subscriber::new(subscription_name.clone(), sender);
  118. let mut index_storage = self.listeners_topics.write();
  119. let subscription_internal_id = self
  120. .unique_subscription_counter
  121. .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
  122. self.active_subscribers
  123. .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
  124. let subscribed_to = request.try_get_topics()?;
  125. for index in subscribed_to.iter() {
  126. index_storage.insert((index.clone(), subscription_internal_id), sender.clone());
  127. }
  128. drop(index_storage);
  129. let inner = self.inner.clone();
  130. let subscribed_to_for_spawn = subscribed_to.clone();
  131. #[cfg(not(target_arch = "wasm32"))]
  132. tokio::spawn(async move {
  133. // TODO: Ignore topics broadcasted from fetch_events _if_ any real time has been broadcasted already.
  134. inner.fetch_events(subscribed_to_for_spawn, sender).await;
  135. });
  136. #[cfg(target_arch = "wasm32")]
  137. wasm_bindgen_futures::spawn_local(async move {
  138. inner.fetch_events(subscribed_to_for_spawn, sender).await;
  139. });
  140. Ok(ActiveSubscription::new(
  141. subscription_internal_id,
  142. subscription_name,
  143. self.active_subscribers.clone(),
  144. self.listeners_topics.clone(),
  145. subscribed_to,
  146. receiver,
  147. ))
  148. }
  149. /// Subscribe
  150. pub fn subscribe<I>(&self, request: I) -> Result<ActiveSubscription<S>, Error>
  151. where
  152. I: SubscriptionRequest<
  153. Topic = <S::Event as Event>::Topic,
  154. SubscriptionId = S::SubscriptionId,
  155. >,
  156. {
  157. let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
  158. self.subscribe_with(request, &sender, Some(receiver))
  159. }
  160. }