subscriber.rs 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. //! Active subscription
  2. use std::fmt::Debug;
  3. use std::sync::atomic::AtomicUsize;
  4. use std::sync::{Arc, Mutex};
  5. use tokio::sync::mpsc;
  6. use super::pubsub::{SubReceiver, TopicTree};
  7. use super::{Error, Spec};
  8. /// Subscription request
  9. pub trait SubscriptionRequest {
  10. /// Topics
  11. type Topic;
  12. /// Subscription Id
  13. type SubscriptionId;
  14. /// Try to get topics from the request
  15. fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error>;
  16. /// Get the subscription name
  17. fn subscription_name(&self) -> Arc<Self::SubscriptionId>;
  18. }
  19. /// Active Subscription
  20. pub struct ActiveSubscription<S>
  21. where
  22. S: Spec + 'static,
  23. {
  24. id: usize,
  25. name: Arc<S::SubscriptionId>,
  26. active_subscribers: Arc<AtomicUsize>,
  27. topics: TopicTree<S>,
  28. subscribed_to: Vec<S::Topic>,
  29. receiver: Option<SubReceiver<S>>,
  30. }
  31. impl<S> ActiveSubscription<S>
  32. where
  33. S: Spec + 'static,
  34. {
  35. /// Creates a new instance
  36. pub fn new(
  37. id: usize,
  38. name: Arc<S::SubscriptionId>,
  39. active_subscribers: Arc<AtomicUsize>,
  40. topics: TopicTree<S>,
  41. subscribed_to: Vec<S::Topic>,
  42. receiver: Option<SubReceiver<S>>,
  43. ) -> Self {
  44. Self {
  45. id,
  46. name,
  47. active_subscribers,
  48. subscribed_to,
  49. topics,
  50. receiver,
  51. }
  52. }
  53. /// Receives the next event
  54. pub async fn recv(&mut self) -> Option<S::Event> {
  55. self.receiver.as_mut()?.recv().await.map(|(_, event)| event)
  56. }
  57. /// Try receive an event or return Noen right away
  58. pub fn try_recv(&mut self) -> Option<S::Event> {
  59. self.receiver
  60. .as_mut()?
  61. .try_recv()
  62. .ok()
  63. .map(|(_, event)| event)
  64. }
  65. /// Get the subscription name
  66. pub fn name(&self) -> &S::SubscriptionId {
  67. &self.name
  68. }
  69. }
  70. impl<S> Drop for ActiveSubscription<S>
  71. where
  72. S: Spec + 'static,
  73. {
  74. fn drop(&mut self) {
  75. // remove the listener
  76. let mut topics = self.topics.write();
  77. for index in self.subscribed_to.drain(..) {
  78. topics.remove(&(index, self.id));
  79. }
  80. // decrement the number of active subscribers
  81. self.active_subscribers
  82. .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
  83. }
  84. }
  85. /// Lightweight sink used by producers to send events to subscribers.
  86. ///
  87. /// You usually do not construct a `Subscriber` directly — it is provided to you in
  88. /// the [`Spec::fetch_events`] callback so you can backfill a new subscription.
  89. #[derive(Debug)]
  90. pub struct Subscriber<S>
  91. where
  92. S: Spec + 'static,
  93. {
  94. subscription: Arc<S::SubscriptionId>,
  95. inner: mpsc::Sender<(Arc<S::SubscriptionId>, S::Event)>,
  96. latest: Arc<Mutex<Option<S::Event>>>,
  97. }
  98. impl<S> Clone for Subscriber<S>
  99. where
  100. S: Spec + 'static,
  101. {
  102. fn clone(&self) -> Self {
  103. Self {
  104. subscription: self.subscription.clone(),
  105. inner: self.inner.clone(),
  106. latest: self.latest.clone(),
  107. }
  108. }
  109. }
  110. impl<S> Subscriber<S>
  111. where
  112. S: Spec + 'static,
  113. {
  114. /// Create a new instance
  115. pub fn new(
  116. subscription: Arc<S::SubscriptionId>,
  117. inner: &mpsc::Sender<(Arc<S::SubscriptionId>, S::Event)>,
  118. ) -> Self {
  119. Self {
  120. inner: inner.clone(),
  121. subscription,
  122. latest: Arc::new(Mutex::new(None)),
  123. }
  124. }
  125. /// Send a message
  126. pub fn send(&self, event: S::Event) {
  127. let mut latest = if let Ok(reader) = self.latest.lock() {
  128. reader
  129. } else {
  130. let _ = self.inner.try_send((self.subscription.to_owned(), event));
  131. return;
  132. };
  133. if let Some(last_event) = latest.replace(event.clone()) {
  134. if last_event == event {
  135. return;
  136. }
  137. }
  138. let _ = self.inner.try_send((self.subscription.to_owned(), event));
  139. }
  140. }