//! Subscription manager //! //! This crate provides a subscription manager or matching engine for //! subscriptions and events. //! //! This crate provides a generic efficient way of keeping track of //! subscriptions and check an event to get their listeners //! //! Each subscription has a droppable struct that will remove the subscription //! on Drop. //! //! Any delivery mechanism or any other form of communication is not part of //! this crate #![deny(missing_docs, warnings)] use nostr_rs_types::types::{Event, Filter}; use std::{ collections::{BTreeMap, HashSet}, fmt::Debug, hash::Hash, ops::{Deref, DerefMut}, sync::{atomic::AtomicUsize, Arc}, }; use tokio::sync::{RwLock, RwLockWriteGuard}; mod filter; mod index; pub use self::{ filter::SortedFilter, index::{CompoundIndex, Index}, }; /// Subscription value pub struct Subscription where T: Sync + Send, { /// inner object inner: T, /// Reverse index /// /// This is a reverse index of the filters, it is only used to update the /// main shared index when this subscription is dropped. reverse_index: Vec>, } impl Deref for Subscription where T: Sync + Send, { type Target = T; fn deref(&self) -> &Self::Target { &self.inner } } impl DerefMut for Subscription where T: Sync + Send, { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner } } /// Active subscription /// /// This is a droppable struct that will remove the subscription from the /// manager on Drop. /// /// The callee must keep this struct alive to keep the subscription alive. pub struct ActiveSubscription where I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static, T: Sync + Send + 'static, { /// Subscription ID pub id: I, manager: Option>>, } impl Drop for ActiveSubscription where I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static, T: Sync + Send + 'static, { fn drop(&mut self) { if let Some(manager) = self.manager.take() { manager.unsubscribe(self); } } } /// Subscription manager /// /// This is the main struct that keeps track of all the subscriptions /// /// The generic type `I` is the type of the subscription ID (which is outside of /// the scope of this crate) and the T which is space to keep aditional data /// associate with a subscription #[derive(Default)] pub struct SubscriptionManager where I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static, T: Sync + Send + 'static, { subscriptions: RwLock>>, index: RwLock>, total_subscribers: AtomicUsize, } impl SubscriptionManager where I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static, T: Sync + Send + 'static, { fn unsubscribe(self: Arc, subscription: &mut ActiveSubscription) { let id_to_remove = subscription.id.clone(); self.total_subscribers .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); tokio::spawn(async move { let mut subscriptions = self.subscriptions.write().await; let mut indexes = self.index.write().await; if let Some(subscription) = subscriptions.remove(&id_to_remove) { for single_indexes in subscription.reverse_index.iter() { for index in single_indexes.iter() { indexes.remove(&(index.clone(), id_to_remove.clone())); } } } }); } /// Get the subscriptions list as a mutable reference pub async fn subscriptions_mut(&self) -> RwLockWriteGuard<'_, BTreeMap>> { self.subscriptions.write().await } /// Get active listeners for this event pub async fn get_subscribers(self: &Arc, event: &Event) -> Vec { let indexes = self.index.read().await; let event_index = Index::from(event); let mut matched = HashSet::new(); for idx in event_index { let start_index = (idx.clone(), I::default()); for ((current_idx, subscription_id), filter) in indexes.range(&start_index..) { if current_idx != &idx { break; } if !matched.contains(subscription_id) && filter.check_event(event) { matched.insert(subscription_id.clone()); } } } matched.into_iter().collect() } /// Returns the total number of subscribers pub fn total_subscribers(&self) -> usize { self.total_subscribers .load(std::sync::atomic::Ordering::Relaxed) } /// Creates a subscription and returns an active subscription struct /// /// The return object must be kept alive to keep the subscription alive pub async fn subscribe( self: &Arc, id: I, mut filters: Vec, inner: T, ) -> ActiveSubscription { let mut subscriptions = self.subscriptions.write().await; let mut indexes = self.index.write().await; let reverse_index: Vec<_> = filters .iter_mut() .map(|f| { let event_index = <&mut Filter as Into>::into(f).split(); (f.clone(), event_index) }) .collect(); for (filter, single_indexes) in reverse_index.iter() { for index in single_indexes.iter() { indexes.insert((index.clone(), id.clone()), filter.clone().into()); } } self.total_subscribers .fetch_add(1, std::sync::atomic::Ordering::Relaxed); subscriptions.insert( id.clone(), Subscription { reverse_index: reverse_index.into_iter().map(|(_, index)| index).collect(), inner, }, ); ActiveSubscription { id, manager: Some(self.clone()), } } }