|
@@ -37,29 +37,27 @@ pub const DEFAULT_CHANNEL_SIZE: usize = 10;
|
|
/// The content of the notification is not relevant to this scope and it is up
|
|
/// The content of the notification is not relevant to this scope and it is up
|
|
/// to the application, therefore the generic T is used instead of a specific
|
|
/// to the application, therefore the generic T is used instead of a specific
|
|
/// type
|
|
/// type
|
|
-pub struct Manager<T, I, F>
|
|
|
|
|
|
+pub struct Manager<E, F>
|
|
where
|
|
where
|
|
- T: Indexable<Type = I> + Clone + Send + Sync + 'static,
|
|
|
|
- I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
|
|
|
|
- F: OnNewSubscription<Index = I, Event = T> + 'static,
|
|
|
|
|
|
+ E: Indexable + Clone + Send + Sync + 'static,
|
|
|
|
+ F: OnNewSubscription<Index = E::Index, Event = E> + 'static,
|
|
{
|
|
{
|
|
- indexes: IndexTree<T, I>,
|
|
|
|
|
|
+ indexes: IndexTree<E, E::Index>,
|
|
on_new_subscription: Option<F>,
|
|
on_new_subscription: Option<F>,
|
|
- unsubscription_sender: mpsc::Sender<(SubId, Vec<Index<I>>)>,
|
|
|
|
|
|
+ unsubscription_sender: mpsc::Sender<(SubId, Vec<Index<E::Index>>)>,
|
|
active_subscriptions: Arc<AtomicUsize>,
|
|
active_subscriptions: Arc<AtomicUsize>,
|
|
background_subscription_remover: Option<JoinHandle<()>>,
|
|
background_subscription_remover: Option<JoinHandle<()>>,
|
|
}
|
|
}
|
|
|
|
|
|
-impl<T, I, F> Default for Manager<T, I, F>
|
|
|
|
|
|
+impl<E, F> Default for Manager<E, F>
|
|
where
|
|
where
|
|
- T: Indexable<Type = I> + Clone + Send + Sync + 'static,
|
|
|
|
- I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
|
|
|
|
- F: OnNewSubscription<Index = I, Event = T> + 'static,
|
|
|
|
|
|
+ E: Indexable + Clone + Send + Sync + 'static,
|
|
|
|
+ F: OnNewSubscription<Index = E::Index, Event = E> + 'static,
|
|
{
|
|
{
|
|
fn default() -> Self {
|
|
fn default() -> Self {
|
|
let (sender, receiver) = mpsc::channel(DEFAULT_REMOVE_SIZE);
|
|
let (sender, receiver) = mpsc::channel(DEFAULT_REMOVE_SIZE);
|
|
let active_subscriptions: Arc<AtomicUsize> = Default::default();
|
|
let active_subscriptions: Arc<AtomicUsize> = Default::default();
|
|
- let storage: IndexTree<T, I> = Arc::new(Default::default());
|
|
|
|
|
|
+ let storage: IndexTree<E, E::Index> = Arc::new(Default::default());
|
|
|
|
|
|
Self {
|
|
Self {
|
|
background_subscription_remover: Some(tokio::spawn(Self::remove_subscription(
|
|
background_subscription_remover: Some(tokio::spawn(Self::remove_subscription(
|
|
@@ -75,11 +73,10 @@ where
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-impl<T, I, F> From<F> for Manager<T, I, F>
|
|
|
|
|
|
+impl<E, F> From<F> for Manager<E, F>
|
|
where
|
|
where
|
|
- T: Indexable<Type = I> + Clone + Send + Sync + 'static,
|
|
|
|
- I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
|
|
|
|
- F: OnNewSubscription<Index = I, Event = T> + 'static,
|
|
|
|
|
|
+ E: Indexable + Clone + Send + Sync + 'static,
|
|
|
|
+ F: OnNewSubscription<Index = E::Index, Event = E> + 'static,
|
|
{
|
|
{
|
|
fn from(value: F) -> Self {
|
|
fn from(value: F) -> Self {
|
|
let mut manager: Self = Default::default();
|
|
let mut manager: Self = Default::default();
|
|
@@ -88,18 +85,17 @@ where
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-impl<T, I, F> Manager<T, I, F>
|
|
|
|
|
|
+impl<E, F> Manager<E, F>
|
|
where
|
|
where
|
|
- T: Indexable<Type = I> + Clone + Send + Sync + 'static,
|
|
|
|
- I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
|
|
|
|
- F: OnNewSubscription<Index = I, Event = T> + 'static,
|
|
|
|
|
|
+ E: Indexable + Clone + Send + Sync + 'static,
|
|
|
|
+ F: OnNewSubscription<Index = E::Index, Event = E> + 'static,
|
|
{
|
|
{
|
|
- #[inline]
|
|
|
|
|
|
+ #[inline(always)]
|
|
/// Broadcast an event to all listeners
|
|
/// Broadcast an event to all listeners
|
|
///
|
|
///
|
|
/// This function takes an Arc to the storage struct, the event_id, the kind
|
|
/// This function takes an Arc to the storage struct, the event_id, the kind
|
|
/// and the vent to broadcast
|
|
/// and the vent to broadcast
|
|
- async fn broadcast_impl(storage: &IndexTree<T, I>, event: T) {
|
|
|
|
|
|
+ async fn broadcast_impl(storage: &IndexTree<E, E::Index>, event: E) {
|
|
let index_storage = storage.read().await;
|
|
let index_storage = storage.read().await;
|
|
let mut sent = HashSet::new();
|
|
let mut sent = HashSet::new();
|
|
for index in event.to_indexes() {
|
|
for index in event.to_indexes() {
|
|
@@ -121,7 +117,7 @@ where
|
|
///
|
|
///
|
|
/// This public method will not block the caller, it will spawn a new task
|
|
/// This public method will not block the caller, it will spawn a new task
|
|
/// instead
|
|
/// instead
|
|
- pub fn broadcast(&self, event: T) {
|
|
|
|
|
|
+ pub fn broadcast(&self, event: E) {
|
|
let storage = self.indexes.clone();
|
|
let storage = self.indexes.clone();
|
|
tokio::spawn(async move {
|
|
tokio::spawn(async move {
|
|
Self::broadcast_impl(&storage, event).await;
|
|
Self::broadcast_impl(&storage, event).await;
|
|
@@ -131,7 +127,7 @@ where
|
|
/// Broadcasts an event to all listeners
|
|
/// Broadcasts an event to all listeners
|
|
///
|
|
///
|
|
/// This method is async and will await for the broadcast to be completed
|
|
/// This method is async and will await for the broadcast to be completed
|
|
- pub async fn broadcast_async(&self, event: T) {
|
|
|
|
|
|
+ pub async fn broadcast_async(&self, event: E) {
|
|
Self::broadcast_impl(&self.indexes, event).await;
|
|
Self::broadcast_impl(&self.indexes, event).await;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -140,8 +136,8 @@ where
|
|
async fn subscribe_inner(
|
|
async fn subscribe_inner(
|
|
&self,
|
|
&self,
|
|
sub_id: SubId,
|
|
sub_id: SubId,
|
|
- indexes: Vec<Index<I>>,
|
|
|
|
- ) -> ActiveSubscription<T, I> {
|
|
|
|
|
|
+ indexes: Vec<Index<E::Index>>,
|
|
|
|
+ ) -> ActiveSubscription<E, E::Index> {
|
|
let (sender, receiver) = mpsc::channel(10);
|
|
let (sender, receiver) = mpsc::channel(10);
|
|
if let Some(on_new_subscription) = self.on_new_subscription.as_ref() {
|
|
if let Some(on_new_subscription) = self.on_new_subscription.as_ref() {
|
|
match on_new_subscription
|
|
match on_new_subscription
|
|
@@ -181,20 +177,20 @@ where
|
|
}
|
|
}
|
|
|
|
|
|
/// Try to subscribe to a specific event
|
|
/// Try to subscribe to a specific event
|
|
- pub async fn try_subscribe<P: AsRef<SubId> + TryInto<Vec<Index<I>>>>(
|
|
|
|
|
|
+ pub async fn try_subscribe<P: AsRef<SubId> + TryInto<Vec<Index<E::Index>>>>(
|
|
&self,
|
|
&self,
|
|
params: P,
|
|
params: P,
|
|
- ) -> Result<ActiveSubscription<T, I>, P::Error> {
|
|
|
|
|
|
+ ) -> Result<ActiveSubscription<E, E::Index>, P::Error> {
|
|
Ok(self
|
|
Ok(self
|
|
.subscribe_inner(params.as_ref().clone(), params.try_into()?)
|
|
.subscribe_inner(params.as_ref().clone(), params.try_into()?)
|
|
.await)
|
|
.await)
|
|
}
|
|
}
|
|
|
|
|
|
/// Subscribe to a specific event
|
|
/// Subscribe to a specific event
|
|
- pub async fn subscribe<P: AsRef<SubId> + Into<Vec<Index<I>>>>(
|
|
|
|
|
|
+ pub async fn subscribe<P: AsRef<SubId> + Into<Vec<Index<E::Index>>>>(
|
|
&self,
|
|
&self,
|
|
params: P,
|
|
params: P,
|
|
- ) -> ActiveSubscription<T, I> {
|
|
|
|
|
|
+ ) -> ActiveSubscription<E, E::Index> {
|
|
self.subscribe_inner(params.as_ref().clone(), params.into())
|
|
self.subscribe_inner(params.as_ref().clone(), params.into())
|
|
.await
|
|
.await
|
|
}
|
|
}
|
|
@@ -209,8 +205,8 @@ where
|
|
/// This task will run in the background (and will be dropped when the [`Manager`]
|
|
/// This task will run in the background (and will be dropped when the [`Manager`]
|
|
/// is) and will remove subscriptions from the storage struct it is dropped.
|
|
/// is) and will remove subscriptions from the storage struct it is dropped.
|
|
async fn remove_subscription(
|
|
async fn remove_subscription(
|
|
- mut receiver: mpsc::Receiver<(SubId, Vec<Index<I>>)>,
|
|
|
|
- storage: IndexTree<T, I>,
|
|
|
|
|
|
+ mut receiver: mpsc::Receiver<(SubId, Vec<Index<E::Index>>)>,
|
|
|
|
+ storage: IndexTree<E, E::Index>,
|
|
active_subscriptions: Arc<AtomicUsize>,
|
|
active_subscriptions: Arc<AtomicUsize>,
|
|
) {
|
|
) {
|
|
while let Some((sub_id, indexes)) = receiver.recv().await {
|
|
while let Some((sub_id, indexes)) = receiver.recv().await {
|
|
@@ -228,11 +224,10 @@ where
|
|
}
|
|
}
|
|
|
|
|
|
/// Manager goes out of scope, stop all background tasks
|
|
/// Manager goes out of scope, stop all background tasks
|
|
-impl<T, I, F> Drop for Manager<T, I, F>
|
|
|
|
|
|
+impl<E, F> Drop for Manager<E, F>
|
|
where
|
|
where
|
|
- T: Indexable<Type = I> + Clone + Send + Sync + 'static,
|
|
|
|
- I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
|
|
|
|
- F: OnNewSubscription<Index = I, Event = T> + 'static,
|
|
|
|
|
|
+ E: Indexable + Clone + Send + Sync + 'static,
|
|
|
|
+ F: OnNewSubscription<Index = E::Index, Event = E> + 'static,
|
|
{
|
|
{
|
|
fn drop(&mut self) {
|
|
fn drop(&mut self) {
|
|
if let Some(handler) = self.background_subscription_remover.take() {
|
|
if let Some(handler) = self.background_subscription_remover.take() {
|