broadcaster.rs 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. //! Broadcaster implementation
  2. use crate::{worker::Worker, Filter, FilterableValue, Transaction};
  3. use async_trait::async_trait;
  4. use std::{
  5. collections::HashMap,
  6. sync::atomic::{AtomicBool, AtomicUsize},
  7. };
  8. use tokio::sync::{
  9. mpsc::{error::TrySendError, Sender},
  10. RwLock,
  11. };
  12. #[derive(Debug)]
  13. /// Broadcaster
  14. ///
  15. /// This structure broadcasts the transactions to all subscribers in a separated working thread.
  16. pub struct Broadcaster {
  17. subscribers: RwLock<HashMap<usize, Sender<Transaction>>>,
  18. subscriptions: RwLock<HashMap<FilterableValue, Vec<(Filter, usize)>>>,
  19. is_there_any_subscriber: AtomicBool,
  20. index: AtomicUsize,
  21. }
  22. impl Default for Broadcaster {
  23. fn default() -> Self {
  24. Self {
  25. subscribers: RwLock::new(HashMap::<usize, Sender<_>>::new()),
  26. subscriptions: RwLock::new(HashMap::<FilterableValue, Vec<_>>::new()),
  27. is_there_any_subscriber: false.into(),
  28. index: 0.into(),
  29. }
  30. }
  31. }
  32. impl Broadcaster {
  33. pub async fn subscribers(&self) -> usize {
  34. self.subscribers.read().await.len()
  35. }
  36. /// Adds a subscriber to new transactions given a filter
  37. pub async fn subscribe(&self, filter: Filter, sender: Sender<Transaction>) {
  38. let mut listeners = self.subscriptions.write().await;
  39. let filter = filter.prepare();
  40. let sender_index = self
  41. .index
  42. .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
  43. self.subscribers.write().await.insert(sender_index, sender);
  44. self.is_there_any_subscriber
  45. .store(true, std::sync::atomic::Ordering::Release);
  46. let (primary_filter, filter) = filter.get_primary_filter();
  47. let key_filter: Vec<FilterableValue> = primary_filter.into();
  48. for key_filter in key_filter {
  49. if let Some(previous_values) = listeners.get_mut(&key_filter) {
  50. previous_values.push((filter.clone(), sender_index));
  51. } else {
  52. listeners.insert(key_filter, vec![(filter.clone(), sender_index)]);
  53. }
  54. }
  55. }
  56. }
  57. #[async_trait]
  58. impl Worker for Broadcaster {
  59. type Payload = Transaction;
  60. fn process_request(&self) -> bool {
  61. self.is_there_any_subscriber
  62. .load(std::sync::atomic::Ordering::Acquire)
  63. }
  64. async fn handler(&self, transaction: Self::Payload) {
  65. let listeners = self.subscriptions.read().await;
  66. let senders = self.subscribers.read().await;
  67. let mut subscriptions_to_reindex = vec![];
  68. let mut senders_to_remove = vec![];
  69. for primary_filter in transaction.get_filterable_fields() {
  70. let listeners = if let Some(listeners) = listeners.get(&primary_filter) {
  71. listeners
  72. } else {
  73. continue;
  74. };
  75. for (filter, sender_index) in listeners {
  76. if filter.matches(&transaction.transaction, &transaction.revision) {
  77. if let Some(Err(TrySendError::Closed(_))) = senders
  78. .get(sender_index)
  79. .map(|sender| sender.try_send(transaction.clone()))
  80. {
  81. senders_to_remove.push(*sender_index);
  82. subscriptions_to_reindex.push(primary_filter.clone());
  83. }
  84. }
  85. }
  86. }
  87. drop(listeners);
  88. drop(senders);
  89. if !senders_to_remove.is_empty() {
  90. let mut listeners = self.subscriptions.write().await;
  91. let mut senders = self.subscribers.write().await;
  92. for to_remove in &senders_to_remove {
  93. senders.remove(to_remove);
  94. }
  95. for to_rebuild in subscriptions_to_reindex {
  96. if let Some(list_of_senders) = listeners.remove(&to_rebuild) {
  97. listeners.insert(
  98. to_rebuild,
  99. list_of_senders
  100. .into_iter()
  101. .filter(|x| senders.contains_key(&x.1))
  102. .collect::<Vec<_>>(),
  103. );
  104. }
  105. }
  106. drop(listeners);
  107. drop(senders);
  108. if self.subscribers().await == 0 {
  109. self.is_there_any_subscriber
  110. .store(false, std::sync::atomic::Ordering::Release);
  111. }
  112. }
  113. }
  114. }