瀏覽代碼

Extended Worker trait

Added `process_request` to allow the implementation to have a logic to drop
requests if it makes no sense to process them

The trait has a default implementation for `process_request`.
Cesar Rodas 10 月之前
父節點
當前提交
aba9a009cd
共有 3 個文件被更改,包括 32 次插入5 次删除
  1. 22 1
      utxo/src/broadcaster.rs
  2. 1 2
      utxo/src/ledger.rs
  3. 9 2
      utxo/src/worker.rs

+ 22 - 1
utxo/src/broadcaster.rs

@@ -1,7 +1,10 @@
 //! Broadcaster implementation
 use crate::{worker::Worker, Filter, Transaction};
 use async_trait::async_trait;
-use std::{collections::HashMap, sync::atomic::AtomicUsize};
+use std::{
+    collections::HashMap,
+    sync::atomic::{AtomicBool, AtomicUsize},
+};
 use tokio::sync::{
     mpsc::{self, error::TrySendError, Receiver, Sender},
     RwLock,
@@ -14,6 +17,7 @@ use tokio::sync::{
 pub struct Broadcaster {
     subscribers: RwLock<HashMap<usize, Sender<Transaction>>>,
     subscriptions: RwLock<HashMap<Filter, Vec<usize>>>,
+    is_there_any_subscriber: AtomicBool,
     index: AtomicUsize,
 }
 
@@ -22,6 +26,7 @@ impl Default for Broadcaster {
         Self {
             subscribers: RwLock::new(HashMap::<usize, Sender<_>>::new()),
             subscriptions: RwLock::new(HashMap::<Filter, Vec<_>>::new()),
+            is_there_any_subscriber: false.into(),
             index: 0.into(),
         }
     }
@@ -44,6 +49,9 @@ impl Broadcaster {
 
         self.subscribers.write().await.insert(sender_index, sender);
 
+        self.is_there_any_subscriber
+            .store(true, std::sync::atomic::Ordering::Release);
+
         if let Some(previous_values) = listeners.get_mut(&filter) {
             previous_values.push(sender_index);
         } else {
@@ -58,6 +66,11 @@ impl Broadcaster {
 impl Worker for Broadcaster {
     type Payload = Transaction;
 
+    async fn process_request(&self) -> bool {
+        self.is_there_any_subscriber
+            .load(std::sync::atomic::Ordering::Acquire)
+    }
+
     async fn handler(&self, transaction: Self::Payload) {
         let listeners = self.subscriptions.read().await;
         let senders = self.subscribers.read().await;
@@ -99,6 +112,14 @@ impl Worker for Broadcaster {
                         .collect::<Vec<_>>();
                 }
             }
+
+            drop(listeners);
+            drop(senders);
+
+            if self.subscribers().await == 0 {
+                self.is_there_any_subscriber
+                    .store(false, std::sync::atomic::Ordering::Release);
+            }
         }
     }
 }

+ 1 - 2
utxo/src/ledger.rs

@@ -178,8 +178,7 @@ where
     /// instead of having them at the transaction layer
     async fn persist(&self, mut transaction: Transaction) -> Result<Transaction, Error> {
         transaction.persist(&self.config).await?;
-
-        self.brodcaster.send(transaction.clone());
+        self.brodcaster.process(transaction.clone()).await;
         Ok(transaction)
     }
 

+ 9 - 2
utxo/src/worker.rs

@@ -22,6 +22,11 @@ pub trait Worker: Send + Sync {
 
     /// Method to be executed with a given task
     async fn handler(&self, payload: Self::Payload);
+
+    /// Whether or not to process the request
+    async fn process_request(&self) -> bool {
+        true
+    }
 }
 
 /// Worker manager
@@ -90,7 +95,9 @@ impl<W: Worker + 'static> WorkerManager<W> {
     }
 
     /// Sends a message to be processed in another thread
-    pub fn send(&self, message: W::Payload) {
-        self.sender.try_send(message).expect("foo");
+    pub async fn process(&self, message: W::Payload) {
+        if self.worker.process_request().await {
+            self.sender.try_send(message).expect("foo");
+        }
     }
 }