123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- //! Broadcaster implementation
- use crate::{worker::Worker, Filter, FilterableValue, Transaction};
- use async_trait::async_trait;
- use std::{
- collections::HashMap,
- sync::atomic::{AtomicBool, AtomicUsize},
- };
- use tokio::sync::{
- mpsc::{error::TrySendError, Sender},
- RwLock,
- };
- #[derive(Debug)]
- /// Broadcaster
- ///
- /// This structure broadcasts the transactions to all subscribers in a separated working thread.
- pub struct Broadcaster {
- subscribers: RwLock<HashMap<usize, Sender<Transaction>>>,
- subscriptions: RwLock<HashMap<FilterableValue, Vec<(Filter, usize)>>>,
- is_there_any_subscriber: AtomicBool,
- index: AtomicUsize,
- }
- impl Default for Broadcaster {
- fn default() -> Self {
- Self {
- subscribers: RwLock::new(HashMap::<usize, Sender<_>>::new()),
- subscriptions: RwLock::new(HashMap::<FilterableValue, Vec<_>>::new()),
- is_there_any_subscriber: false.into(),
- index: 0.into(),
- }
- }
- }
- impl Broadcaster {
- pub async fn subscribers(&self) -> usize {
- self.subscribers.read().await.len()
- }
- /// Adds a subscriber to new transactions given a filter
- pub async fn subscribe(&self, filter: Filter, sender: Sender<Transaction>) {
- let mut listeners = self.subscriptions.write().await;
- let filter = filter.prepare();
- let sender_index = self
- .index
- .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
- self.subscribers.write().await.insert(sender_index, sender);
- self.is_there_any_subscriber
- .store(true, std::sync::atomic::Ordering::Release);
- let (primary_filter, filter) = filter.get_primary_filter();
- let key_filter: Vec<FilterableValue> = primary_filter.into();
- for key_filter in key_filter {
- if let Some(previous_values) = listeners.get_mut(&key_filter) {
- previous_values.push((filter.clone(), sender_index));
- } else {
- listeners.insert(key_filter, vec![(filter.clone(), sender_index)]);
- }
- }
- }
- }
- #[async_trait]
- impl Worker for Broadcaster {
- type Payload = Transaction;
- fn process_request(&self) -> bool {
- self.is_there_any_subscriber
- .load(std::sync::atomic::Ordering::Acquire)
- }
- async fn handler(&self, transaction: Self::Payload) {
- let listeners = self.subscriptions.read().await;
- let senders = self.subscribers.read().await;
- let mut subscriptions_to_reindex = vec![];
- let mut senders_to_remove = vec![];
- for primary_filter in transaction.get_filterable_fields() {
- let listeners = if let Some(listeners) = listeners.get(&primary_filter) {
- listeners
- } else {
- continue;
- };
- for (filter, sender_index) in listeners {
- if filter.matches(&transaction.transaction, &transaction.revision) {
- if let Some(Err(TrySendError::Closed(_))) = senders
- .get(sender_index)
- .map(|sender| sender.try_send(transaction.clone()))
- {
- senders_to_remove.push(*sender_index);
- subscriptions_to_reindex.push(primary_filter.clone());
- }
- }
- }
- }
- drop(listeners);
- drop(senders);
- if !senders_to_remove.is_empty() {
- let mut listeners = self.subscriptions.write().await;
- let mut senders = self.subscribers.write().await;
- for to_remove in &senders_to_remove {
- senders.remove(to_remove);
- }
- for to_rebuild in subscriptions_to_reindex {
- if let Some(list_of_senders) = listeners.remove(&to_rebuild) {
- listeners.insert(
- to_rebuild,
- list_of_senders
- .into_iter()
- .filter(|x| senders.contains_key(&x.1))
- .collect::<Vec<_>>(),
- );
- }
- }
- drop(listeners);
- drop(senders);
- if self.subscribers().await == 0 {
- self.is_there_any_subscriber
- .store(false, std::sync::atomic::Ordering::Release);
- }
- }
- }
- }
|