lib.rs 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. //! Subscription manager
  2. //!
  3. //! This crate provides a subscription manager or matching engine for
  4. //! subscriptions and events.
  5. //!
  6. //! This crate provides a generic efficient way of keeping track of
  7. //! subscriptions and check an event to get their listeners
  8. //!
  9. //! Each subscription has a droppable struct that will remove the subscription
  10. //! on Drop.
  11. //!
  12. //! Any delivery mechanism or any other form of communication is not part of
  13. //! this crate
  14. #![deny(missing_docs, warnings)]
  15. use nostr_rs_types::types::{Event, Filter};
  16. use std::{
  17. collections::{BTreeMap, HashSet},
  18. fmt::Debug,
  19. hash::Hash,
  20. ops::{Deref, DerefMut},
  21. sync::{atomic::AtomicUsize, Arc},
  22. };
  23. use tokio::sync::{RwLock, RwLockWriteGuard};
  24. mod filter;
  25. mod index;
  26. pub use self::{
  27. filter::SortedFilter,
  28. index::{CompoundIndex, Index},
  29. };
  30. /// Subscription value
  31. pub struct Subscription<T>
  32. where
  33. T: Sync + Send,
  34. {
  35. /// inner object
  36. inner: T,
  37. /// Reverse index
  38. ///
  39. /// This is a reverse index of the filters, it is only used to update the
  40. /// main shared index when this subscription is dropped.
  41. reverse_index: Vec<Vec<Index>>,
  42. }
  43. impl<T> Deref for Subscription<T>
  44. where
  45. T: Sync + Send,
  46. {
  47. type Target = T;
  48. fn deref(&self) -> &Self::Target {
  49. &self.inner
  50. }
  51. }
  52. impl<T> DerefMut for Subscription<T>
  53. where
  54. T: Sync + Send,
  55. {
  56. fn deref_mut(&mut self) -> &mut Self::Target {
  57. &mut self.inner
  58. }
  59. }
  60. /// Active subscription
  61. ///
  62. /// This is a droppable struct that will remove the subscription from the
  63. /// manager on Drop.
  64. ///
  65. /// The callee must keep this struct alive to keep the subscription alive.
  66. pub struct ActiveSubscription<I, T>
  67. where
  68. I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
  69. T: Sync + Send + 'static,
  70. {
  71. /// Subscription ID
  72. pub id: I,
  73. manager: Option<Arc<SubscriptionManager<I, T>>>,
  74. }
  75. impl<I, T> Drop for ActiveSubscription<I, T>
  76. where
  77. I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
  78. T: Sync + Send + 'static,
  79. {
  80. fn drop(&mut self) {
  81. if let Some(manager) = self.manager.take() {
  82. manager.unsubscribe(self);
  83. }
  84. }
  85. }
  86. /// Subscription manager
  87. ///
  88. /// This is the main struct that keeps track of all the subscriptions
  89. ///
  90. /// The generic type `I` is the type of the subscription ID (which is outside of
  91. /// the scope of this crate) and the T which is space to keep aditional data
  92. /// associate with a subscription
  93. #[derive(Default)]
  94. pub struct SubscriptionManager<I, T>
  95. where
  96. I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
  97. T: Sync + Send + 'static,
  98. {
  99. subscriptions: RwLock<BTreeMap<I, Subscription<T>>>,
  100. index: RwLock<BTreeMap<(Index, I), SortedFilter>>,
  101. total_subscribers: AtomicUsize,
  102. }
  103. impl<I, T> SubscriptionManager<I, T>
  104. where
  105. I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
  106. T: Sync + Send + 'static,
  107. {
  108. fn unsubscribe(self: Arc<Self>, subscription: &mut ActiveSubscription<I, T>) {
  109. let id_to_remove = subscription.id.clone();
  110. self.total_subscribers
  111. .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
  112. tokio::spawn(async move {
  113. let mut subscriptions = self.subscriptions.write().await;
  114. let mut indexes = self.index.write().await;
  115. if let Some(subscription) = subscriptions.remove(&id_to_remove) {
  116. for single_indexes in subscription.reverse_index.iter() {
  117. for index in single_indexes.iter() {
  118. indexes.remove(&(index.clone(), id_to_remove.clone()));
  119. }
  120. }
  121. }
  122. });
  123. }
  124. /// Get the subscriptions list as a mutable reference
  125. pub async fn subscriptions_mut(&self) -> RwLockWriteGuard<'_, BTreeMap<I, Subscription<T>>> {
  126. self.subscriptions.write().await
  127. }
  128. /// Get active listeners for this event
  129. pub async fn get_subscribers(self: &Arc<Self>, event: &Event) -> Vec<I> {
  130. let indexes = self.index.read().await;
  131. let event_index = Index::from(event);
  132. let mut matched = HashSet::new();
  133. for idx in event_index {
  134. let start_index = (idx.clone(), I::default());
  135. for ((current_idx, subscription_id), filter) in indexes.range(&start_index..) {
  136. if current_idx != &idx {
  137. break;
  138. }
  139. if !matched.contains(subscription_id) && filter.check_event(event) {
  140. matched.insert(subscription_id.clone());
  141. }
  142. }
  143. }
  144. matched.into_iter().collect()
  145. }
  146. /// Returns the total number of subscribers
  147. pub fn total_subscribers(&self) -> usize {
  148. self.total_subscribers
  149. .load(std::sync::atomic::Ordering::Relaxed)
  150. }
  151. /// Creates a subscription and returns an active subscription struct
  152. ///
  153. /// The return object must be kept alive to keep the subscription alive
  154. pub async fn subscribe(
  155. self: &Arc<Self>,
  156. id: I,
  157. mut filters: Vec<Filter>,
  158. inner: T,
  159. ) -> ActiveSubscription<I, T> {
  160. let mut subscriptions = self.subscriptions.write().await;
  161. let mut indexes = self.index.write().await;
  162. let reverse_index: Vec<_> = filters
  163. .iter_mut()
  164. .map(|f| {
  165. let event_index = <&mut Filter as Into<CompoundIndex>>::into(f).split();
  166. (f.clone(), event_index)
  167. })
  168. .collect();
  169. for (filter, single_indexes) in reverse_index.iter() {
  170. for index in single_indexes.iter() {
  171. indexes.insert((index.clone(), id.clone()), filter.clone().into());
  172. }
  173. }
  174. self.total_subscribers
  175. .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
  176. subscriptions.insert(
  177. id.clone(),
  178. Subscription {
  179. reverse_index: reverse_index.into_iter().map(|(_, index)| index).collect(),
  180. inner,
  181. },
  182. );
  183. ActiveSubscription {
  184. id,
  185. manager: Some(self.clone()),
  186. }
  187. }
  188. }