123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
//! Subscription manager
//!
//! This crate provides a subscription manager, or matching engine, for
//! subscriptions and events.
//!
//! It offers a generic and efficient way of keeping track of subscriptions
//! and of checking an event against them to get its listeners.
//!
//! Each subscription has a droppable struct that will remove the subscription
//! on Drop.
//!
//! Any delivery mechanism or any other form of communication is not part of
//! this crate.
- #![deny(missing_docs, warnings)]
- use nostr_rs_types::types::{Event, Filter};
- use std::{
- collections::{BTreeMap, HashSet},
- fmt::Debug,
- hash::Hash,
- ops::{Deref, DerefMut},
- sync::{atomic::AtomicUsize, Arc},
- };
- use tokio::sync::{RwLock, RwLockWriteGuard};
- mod filter;
- mod index;
- pub use self::{
- filter::SortedFilter,
- index::{CompoundIndex, Index},
- };
- /// Subscription value
- pub struct Subscription<T>
- where
- T: Sync + Send,
- {
- /// inner object
- inner: T,
- /// Reverse index
- ///
- /// This is a reverse index of the filters, it is only used to update the
- /// main shared index when this subscription is dropped.
- reverse_index: Vec<Vec<Index>>,
- }
- impl<T> Deref for Subscription<T>
- where
- T: Sync + Send,
- {
- type Target = T;
- fn deref(&self) -> &Self::Target {
- &self.inner
- }
- }
- impl<T> DerefMut for Subscription<T>
- where
- T: Sync + Send,
- {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.inner
- }
- }
- /// Active subscription
- ///
- /// This is a droppable struct that will remove the subscription from the
- /// manager on Drop.
- ///
- /// The callee must keep this struct alive to keep the subscription alive.
- pub struct ActiveSubscription<I, T>
- where
- I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
- T: Sync + Send + 'static,
- {
- /// Subscription ID
- pub id: I,
- manager: Option<Arc<SubscriptionManager<I, T>>>,
- }
- impl<I, T> Drop for ActiveSubscription<I, T>
- where
- I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
- T: Sync + Send + 'static,
- {
- fn drop(&mut self) {
- if let Some(manager) = self.manager.take() {
- manager.unsubscribe(self);
- }
- }
- }
- /// Subscription manager
- ///
- /// This is the main struct that keeps track of all the subscriptions
- ///
- /// The generic type `I` is the type of the subscription ID (which is outside of
- /// the scope of this crate) and the T which is space to keep aditional data
- /// associate with a subscription
- #[derive(Default)]
- pub struct SubscriptionManager<I, T>
- where
- I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
- T: Sync + Send + 'static,
- {
- subscriptions: RwLock<BTreeMap<I, Subscription<T>>>,
- index: RwLock<BTreeMap<(Index, I), SortedFilter>>,
- total_subscribers: AtomicUsize,
- }
- impl<I, T> SubscriptionManager<I, T>
- where
- I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
- T: Sync + Send + 'static,
- {
- fn unsubscribe(self: Arc<Self>, subscription: &mut ActiveSubscription<I, T>) {
- let id_to_remove = subscription.id.clone();
- self.total_subscribers
- .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
- tokio::spawn(async move {
- let mut subscriptions = self.subscriptions.write().await;
- let mut indexes = self.index.write().await;
- if let Some(subscription) = subscriptions.remove(&id_to_remove) {
- for single_indexes in subscription.reverse_index.iter() {
- for index in single_indexes.iter() {
- indexes.remove(&(index.clone(), id_to_remove.clone()));
- }
- }
- }
- });
- }
- /// Get the subscriptions list as a mutable reference
- pub async fn subscriptions_mut(&self) -> RwLockWriteGuard<'_, BTreeMap<I, Subscription<T>>> {
- self.subscriptions.write().await
- }
- /// Get active listeners for this event
- pub async fn get_subscribers(self: &Arc<Self>, event: &Event) -> Vec<I> {
- let indexes = self.index.read().await;
- let event_index = Index::from(event);
- let mut matched = HashSet::new();
- for idx in event_index {
- let start_index = (idx.clone(), I::default());
- for ((current_idx, subscription_id), filter) in indexes.range(&start_index..) {
- if current_idx != &idx {
- break;
- }
- if !matched.contains(subscription_id) && filter.check_event(event) {
- matched.insert(subscription_id.clone());
- }
- }
- }
- matched.into_iter().collect()
- }
- /// Returns the total number of subscribers
- pub fn total_subscribers(&self) -> usize {
- self.total_subscribers
- .load(std::sync::atomic::Ordering::Relaxed)
- }
- /// Creates a subscription and returns an active subscription struct
- ///
- /// The return object must be kept alive to keep the subscription alive
- pub async fn subscribe(
- self: &Arc<Self>,
- id: I,
- mut filters: Vec<Filter>,
- inner: T,
- ) -> ActiveSubscription<I, T> {
- let mut subscriptions = self.subscriptions.write().await;
- let mut indexes = self.index.write().await;
- let reverse_index: Vec<_> = filters
- .iter_mut()
- .map(|f| {
- let event_index = <&mut Filter as Into<CompoundIndex>>::into(f).split();
- (f.clone(), event_index)
- })
- .collect();
- for (filter, single_indexes) in reverse_index.iter() {
- for index in single_indexes.iter() {
- indexes.insert((index.clone(), id.clone()), filter.clone().into());
- }
- }
- self.total_subscribers
- .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
- subscriptions.insert(
- id.clone(),
- Subscription {
- reverse_index: reverse_index.into_iter().map(|(_, index)| index).collect(),
- inner,
- },
- );
- ActiveSubscription {
- id,
- manager: Some(self.clone()),
- }
- }
- }
|