瀏覽代碼

Moved broadcasting to their own thread with their own abstraction

Cesar Rodas 10 月之前
父節點
當前提交
8523f83ae9
共有 3 個文件被更改,包括 193 次插入105 次删除
  1. 102 105
      utxo/src/ledger.rs
  2. 3 0
      utxo/src/storage/mod.rs
  3. 88 0
      utxo/src/worker.rs

+ 102 - 105
utxo/src/ledger.rs

@@ -1,22 +1,112 @@
 use crate::{
-    amount::AmountCents, config::Config, status::StatusManager, storage::Storage,
-    transaction::Type, AccountId, Amount, Error, Filter, PaymentFrom, PaymentId, RevId, Status,
-    Tag, Transaction, TxId,
+    amount::AmountCents,
+    config::Config,
+    status::StatusManager,
+    storage::Storage,
+    transaction::Type,
+    worker::{Worker, WorkerManager},
+    AccountId, Amount, Error, Filter, PaymentFrom, PaymentId, RevId, Status, Tag, Transaction,
+    TxId,
 };
+use async_trait::async_trait;
 use std::{
     cmp::Ordering,
     collections::HashMap,
     sync::{atomic::AtomicUsize, Arc},
 };
 use tokio::sync::{
-    mpsc::{self, channel, error::TrySendError, Receiver, Sender},
+    mpsc::{self, error::TrySendError, Receiver, Sender},
     RwLock,
 };
 
 #[derive(Debug)]
-enum BroadcastMessage<T> {
-    Shutdown,
-    Message(T),
+struct Broadcaster {
+    subscribers: RwLock<HashMap<usize, Sender<Transaction>>>,
+    subscriptions: RwLock<HashMap<Filter, Vec<usize>>>,
+    index: AtomicUsize,
+}
+
+impl Broadcaster {
+    fn new() -> Self {
+        Self {
+            subscribers: RwLock::new(HashMap::<usize, Sender<_>>::new()),
+            subscriptions: RwLock::new(HashMap::<Filter, Vec<_>>::new()),
+            index: 0.into(),
+        }
+    }
+
+    pub async fn subscribers(&self) -> usize {
+        self.subscribers.read().await.len()
+    }
+
+    pub async fn subscribe(&self, filter: Filter) -> Receiver<Transaction> {
+        let (sender, receiver) = mpsc::channel(100);
+        let mut listeners = self.subscriptions.write().await;
+        let filter = filter.prepare();
+
+        let sender_index = self
+            .index
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+
+        self.subscribers.write().await.insert(sender_index, sender);
+
+        if let Some(previous_values) = listeners.get_mut(&filter) {
+            previous_values.push(sender_index);
+        } else {
+            listeners.insert(filter, vec![sender_index]);
+        }
+
+        receiver
+    }
+}
+
+#[async_trait]
+impl Worker for Broadcaster {
+    type Message = Transaction;
+
+    async fn handler(&self, transaction: Self::Message) {
+        let listeners = self.subscriptions.read().await;
+        let senders = self.subscribers.read().await;
+
+        let mut subscriptions_to_reindex = vec![];
+        let mut senders_to_remove = vec![];
+
+        for (filter, listeners) in listeners.iter() {
+            if filter.matches(&transaction.transaction, &transaction.revision) {
+                for sender_index in listeners.iter() {
+                    if let Some(Err(TrySendError::Closed(_))) = senders
+                        .get(sender_index)
+                        .map(|sender| sender.try_send(transaction.clone()))
+                    {
+                        senders_to_remove.push(*sender_index);
+                        subscriptions_to_reindex.push(filter.clone());
+                    }
+                }
+            }
+        }
+
+        drop(listeners);
+        drop(senders);
+
+        if !senders_to_remove.is_empty() {
+            let mut listeners = self.subscriptions.write().await;
+            let mut senders = self.subscribers.write().await;
+
+            for to_remove in &senders_to_remove {
+                senders.remove(to_remove);
+            }
+
+            for to_rebuild in &subscriptions_to_reindex {
+                if let Some(list_of_senders) = listeners.get_mut(to_rebuild) {
+                    *list_of_senders = list_of_senders
+                        .into_iter()
+                        .filter(|x| senders.contains_key(*x))
+                        .map(|x| *x)
+                        .collect::<Vec<_>>();
+                }
+            }
+        }
+    }
 }
 
 /// The Verax ledger
@@ -26,21 +116,7 @@ where
     S: Storage + Sync + Send,
 {
     config: Config<S>,
-    brodcaster: Sender<BroadcastMessage<Transaction>>,
-    subscribers: Arc<RwLock<HashMap<usize, Sender<Transaction>>>>,
-    subscriptions: Arc<RwLock<HashMap<Filter, Vec<usize>>>>,
-    subscriber_index: AtomicUsize,
-}
-
-impl<S> Drop for Ledger<S>
-where
-    S: Storage + Sync + Send,
-{
-    fn drop(&mut self) {
-        self.brodcaster
-            .try_send(BroadcastMessage::Shutdown)
-            .expect("Successful delivery of shutdown signal");
-    }
+    brodcaster: WorkerManager<Broadcaster>,
 }
 
 impl<S> Ledger<S>
@@ -49,97 +125,20 @@ where
 {
     /// Creates a new ledger instance
     pub fn new(config: Config<S>) -> Arc<Self> {
-        let (sender_to_worker, mut receiver) = channel::<BroadcastMessage<Transaction>>(10_000);
-        let senders = Arc::new(RwLock::new(HashMap::<usize, Sender<_>>::new()));
-        let subscriptions = Arc::new(RwLock::new(HashMap::<Filter, Vec<_>>::new()));
-
-        let sender_for_worker = senders.clone();
-        let subscriptions_for_worker = subscriptions.clone();
-
-        tokio::spawn(async move {
-            while let Some(message) = receiver.recv().await {
-                match message {
-                    BroadcastMessage::Shutdown => break,
-                    BroadcastMessage::Message(transaction) => {
-                        let listeners = subscriptions_for_worker.read().await;
-                        let senders = sender_for_worker.read().await;
-
-                        let mut subscriptions_to_reindex = vec![];
-                        let mut senders_to_remove = vec![];
-
-                        for (filter, listeners) in listeners.iter() {
-                            if filter.matches(&transaction.transaction, &transaction.revision) {
-                                for sender_index in listeners.iter() {
-                                    if let Some(Err(TrySendError::Closed(_))) = senders
-                                        .get(sender_index)
-                                        .map(|sender| sender.try_send(transaction.clone()))
-                                    {
-                                        senders_to_remove.push(*sender_index);
-                                        subscriptions_to_reindex.push(filter.clone());
-                                    }
-                                }
-                            }
-                        }
-
-                        drop(listeners);
-                        drop(senders);
-
-                        if !senders_to_remove.is_empty() {
-                            let mut listeners = subscriptions_for_worker.write().await;
-                            let mut senders = sender_for_worker.write().await;
-
-                            for to_remove in &senders_to_remove {
-                                senders.remove(to_remove);
-                            }
-
-                            for to_rebuild in &subscriptions_to_reindex {
-                                if let Some(list_of_senders) = listeners.get_mut(to_rebuild) {
-                                    *list_of_senders = list_of_senders
-                                        .into_iter()
-                                        .filter(|x| senders.contains_key(*x))
-                                        .map(|x| *x)
-                                        .collect::<Vec<_>>();
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        });
-
         Arc::new(Self {
             config,
-            brodcaster: sender_to_worker,
-            subscribers: senders,
-            subscriber_index: AtomicUsize::new(0),
-            subscriptions,
+            brodcaster: WorkerManager::new(Broadcaster::new()),
         })
     }
 
     /// Subscribes to new transactions and revisions with a given filter
     pub async fn subscribe(&self, filter: Filter) -> Receiver<Transaction> {
-        let (sender, receiver) = mpsc::channel(100);
-        let mut listeners = self.subscriptions.write().await;
-        let filter = filter.prepare();
-
-        let sender_index = self
-            .subscriber_index
-            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-
-        self.subscribers.write().await.insert(sender_index, sender);
-
-        if let Some(previous_values) = listeners.get_mut(&filter) {
-            previous_values.push(sender_index);
-        } else {
-            listeners.insert(filter, vec![sender_index]);
-        }
-
-        receiver
+        self.brodcaster.subscribe(filter).await
     }
 
     /// Returns the total number of active subscribers
     pub async fn subscribers(&self) -> usize {
-        self.subscribers.read().await.len()
+        self.brodcaster.subscribers().await
     }
 
     /// The internal usage is to select unspent payments for each account to
@@ -283,9 +282,7 @@ where
     async fn persist(&self, mut transaction: Transaction) -> Result<Transaction, Error> {
         transaction.persist(&self.config).await?;
 
-        let _ = self
-            .brodcaster
-            .try_send(BroadcastMessage::Message(transaction.clone()));
+        let _ = self.brodcaster.send(transaction.clone());
         Ok(transaction)
     }
 

+ 3 - 0
utxo/src/storage/mod.rs

@@ -699,6 +699,9 @@ pub mod test {
         sleep(Duration::from_secs(1)).await;
 
         assert_eq!(0, ledger.subscribers().await);
+
+        drop(ledger);
+        sleep(Duration::from_secs(1)).await;
     }
 
     pub async fn find_transactions_by_tags<T>(storage: T)

+ 88 - 0
utxo/src/worker.rs

@@ -0,0 +1,88 @@
+//! Creates a worker thread and exposes a sender to communidate with
+use async_trait::async_trait;
+use std::{
+    ops::Deref,
+    sync::{atomic::AtomicBool, Arc},
+    time::Duration,
+};
+use tokio::{
+    sync::mpsc::{channel, Sender},
+    time::sleep,
+};
+
+const CHECK_WORKER_IN_SCOPE_MS: u64 = 50;
+const WORKER_BUFFER_SIZE: usize = 1_000;
+
+/// Worker trait
+///
+/// The worker trait has the definition of the code the worker has to perform in a different thread
+#[async_trait]
+pub trait Worker: Send + Sync {
+    type Message: Send + Sync;
+
+    async fn handler(&self, message: Self::Message);
+}
+
+/// Worker manager
+///
+/// The worker manager manages the instances of the Worker trait, which is executed asynchronously
+/// in a separate thread from the send() context.
+///
+/// The logic of having one or more instances of the Worker trait is abstracted in this structure.
+#[derive(Debug)]
+pub struct WorkerManager<W: Worker> {
+    sender: Sender<W::Message>,
+    is_running: Arc<AtomicBool>,
+    worker: Arc<W>,
+}
+
+impl<W: Worker> Drop for WorkerManager<W> {
+    fn drop(&mut self) {
+        self.is_running
+            .store(false, std::sync::atomic::Ordering::Release);
+    }
+}
+
+impl<W: Worker> Deref for WorkerManager<W> {
+    type Target = Arc<W>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.worker
+    }
+}
+
+impl<W: Worker + 'static> WorkerManager<W> {
+    pub fn new(worker: W) -> Self {
+        let (sender, mut receiver) = channel(WORKER_BUFFER_SIZE);
+        let worker = Arc::new(worker);
+        let is_running = Arc::new(AtomicBool::new(true));
+
+        let worker_for_thread = worker.clone();
+        let worker_in_scope = is_running.clone();
+        tokio::spawn(async move {
+            loop {
+                tokio::select! {
+                    Some(message) = receiver.recv() => {
+                        worker_for_thread.handler(message).await
+                    }
+                    _ = sleep(Duration::from_millis(CHECK_WORKER_IN_SCOPE_MS))  => {}
+                }
+
+                if worker_in_scope.load(std::sync::atomic::Ordering::Acquire) == false {
+                    break;
+                }
+            }
+        });
+
+        Self {
+            sender,
+            is_running,
+            worker,
+        }
+    }
+
+    /// Sends a message to be processed in another thread
+    pub fn send(&self, message: W::Message) {
+        self.sender.try_send(message).expect("foo");
+    }
+}