Переглянути джерело

Move broadcaster code to their own file

Cesar Rodas 10 місяців тому
батько
коміт
a9fa2f91e5
4 змінених файлів з 123 додано та 113 видалено
  1. 104 0
      utxo/src/broadcaster.rs
  2. 6 109
      utxo/src/ledger.rs
  3. 1 0
      utxo/src/lib.rs
  4. 12 4
      utxo/src/worker.rs

+ 104 - 0
utxo/src/broadcaster.rs

@@ -0,0 +1,104 @@
+//! Broadcaster implementation
+use crate::{worker::Worker, Filter, Transaction};
+use async_trait::async_trait;
+use std::{collections::HashMap, sync::atomic::AtomicUsize};
+use tokio::sync::{
+    mpsc::{self, error::TrySendError, Receiver, Sender},
+    RwLock,
+};
+
+#[derive(Debug)]
+/// Broadcaster
+///
+/// This structure broadcasts the transactions to all subscribers in a separated working thread.
+pub struct Broadcaster {
+    subscribers: RwLock<HashMap<usize, Sender<Transaction>>>,
+    subscriptions: RwLock<HashMap<Filter, Vec<usize>>>,
+    index: AtomicUsize,
+}
+
+impl Default for Broadcaster {
+    fn default() -> Self {
+        Self {
+            subscribers: RwLock::new(HashMap::<usize, Sender<_>>::new()),
+            subscriptions: RwLock::new(HashMap::<Filter, Vec<_>>::new()),
+            index: 0.into(),
+        }
+    }
+}
+
+impl Broadcaster {
+    pub async fn subscribers(&self) -> usize {
+        self.subscribers.read().await.len()
+    }
+
+    /// Adds a subscriber to new transactions given a filter
+    pub async fn subscribe(&self, filter: Filter) -> Receiver<Transaction> {
+        let (sender, receiver) = mpsc::channel(100);
+        let mut listeners = self.subscriptions.write().await;
+        let filter = filter.prepare();
+
+        let sender_index = self
+            .index
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+
+        self.subscribers.write().await.insert(sender_index, sender);
+
+        if let Some(previous_values) = listeners.get_mut(&filter) {
+            previous_values.push(sender_index);
+        } else {
+            listeners.insert(filter, vec![sender_index]);
+        }
+
+        receiver
+    }
+}
+
+#[async_trait]
+impl Worker for Broadcaster {
+    type Payload = Transaction;
+
+    async fn handler(&self, transaction: Self::Payload) {
+        let listeners = self.subscriptions.read().await;
+        let senders = self.subscribers.read().await;
+
+        let mut subscriptions_to_reindex = vec![];
+        let mut senders_to_remove = vec![];
+
+        for (filter, listeners) in listeners.iter() {
+            if filter.matches(&transaction.transaction, &transaction.revision) {
+                for sender_index in listeners.iter() {
+                    if let Some(Err(TrySendError::Closed(_))) = senders
+                        .get(sender_index)
+                        .map(|sender| sender.try_send(transaction.clone()))
+                    {
+                        senders_to_remove.push(*sender_index);
+                        subscriptions_to_reindex.push(filter.clone());
+                    }
+                }
+            }
+        }
+
+        drop(listeners);
+        drop(senders);
+
+        if !senders_to_remove.is_empty() {
+            let mut listeners = self.subscriptions.write().await;
+            let mut senders = self.subscribers.write().await;
+
+            for to_remove in &senders_to_remove {
+                senders.remove(to_remove);
+            }
+
+            for to_rebuild in &subscriptions_to_reindex {
+                if let Some(list_of_senders) = listeners.get_mut(to_rebuild) {
+                    *list_of_senders = list_of_senders
+                        .iter()
+                        .filter(|x| senders.contains_key(*x))
+                        .copied()
+                        .collect::<Vec<_>>();
+                }
+            }
+        }
+    }
+}

+ 6 - 109
utxo/src/ledger.rs

@@ -1,113 +1,10 @@
 use crate::{
-    amount::AmountCents,
-    config::Config,
-    status::StatusManager,
-    storage::Storage,
-    transaction::Type,
-    worker::{Worker, WorkerManager},
-    AccountId, Amount, Error, Filter, PaymentFrom, PaymentId, RevId, Status, Tag, Transaction,
-    TxId,
+    amount::AmountCents, broadcaster::Broadcaster, config::Config, status::StatusManager,
+    storage::Storage, transaction::Type, worker::WorkerManager, AccountId, Amount, Error, Filter,
+    PaymentFrom, PaymentId, RevId, Status, Tag, Transaction, TxId,
 };
-use async_trait::async_trait;
-use std::{
-    cmp::Ordering,
-    collections::HashMap,
-    sync::{atomic::AtomicUsize, Arc},
-};
-use tokio::sync::{
-    mpsc::{self, error::TrySendError, Receiver, Sender},
-    RwLock,
-};
-
-#[derive(Debug)]
-struct Broadcaster {
-    subscribers: RwLock<HashMap<usize, Sender<Transaction>>>,
-    subscriptions: RwLock<HashMap<Filter, Vec<usize>>>,
-    index: AtomicUsize,
-}
-
-impl Broadcaster {
-    fn new() -> Self {
-        Self {
-            subscribers: RwLock::new(HashMap::<usize, Sender<_>>::new()),
-            subscriptions: RwLock::new(HashMap::<Filter, Vec<_>>::new()),
-            index: 0.into(),
-        }
-    }
-
-    pub async fn subscribers(&self) -> usize {
-        self.subscribers.read().await.len()
-    }
-
-    pub async fn subscribe(&self, filter: Filter) -> Receiver<Transaction> {
-        let (sender, receiver) = mpsc::channel(100);
-        let mut listeners = self.subscriptions.write().await;
-        let filter = filter.prepare();
-
-        let sender_index = self
-            .index
-            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-
-        self.subscribers.write().await.insert(sender_index, sender);
-
-        if let Some(previous_values) = listeners.get_mut(&filter) {
-            previous_values.push(sender_index);
-        } else {
-            listeners.insert(filter, vec![sender_index]);
-        }
-
-        receiver
-    }
-}
-
-#[async_trait]
-impl Worker for Broadcaster {
-    type Message = Transaction;
-
-    async fn handler(&self, transaction: Self::Message) {
-        let listeners = self.subscriptions.read().await;
-        let senders = self.subscribers.read().await;
-
-        let mut subscriptions_to_reindex = vec![];
-        let mut senders_to_remove = vec![];
-
-        for (filter, listeners) in listeners.iter() {
-            if filter.matches(&transaction.transaction, &transaction.revision) {
-                for sender_index in listeners.iter() {
-                    if let Some(Err(TrySendError::Closed(_))) = senders
-                        .get(sender_index)
-                        .map(|sender| sender.try_send(transaction.clone()))
-                    {
-                        senders_to_remove.push(*sender_index);
-                        subscriptions_to_reindex.push(filter.clone());
-                    }
-                }
-            }
-        }
-
-        drop(listeners);
-        drop(senders);
-
-        if !senders_to_remove.is_empty() {
-            let mut listeners = self.subscriptions.write().await;
-            let mut senders = self.subscribers.write().await;
-
-            for to_remove in &senders_to_remove {
-                senders.remove(to_remove);
-            }
-
-            for to_rebuild in &subscriptions_to_reindex {
-                if let Some(list_of_senders) = listeners.get_mut(to_rebuild) {
-                    *list_of_senders = list_of_senders
-                        .iter()
-                        .filter(|x| senders.contains_key(*x))
-                        .copied()
-                        .collect::<Vec<_>>();
-                }
-            }
-        }
-    }
-}
+use std::{cmp::Ordering, collections::HashMap, sync::Arc};
+use tokio::sync::mpsc::Receiver;
 
 /// The Verax ledger
 #[derive(Debug)]
@@ -127,7 +24,7 @@ where
     pub fn new(config: Config<S>) -> Arc<Self> {
         Arc::new(Self {
             config,
-            brodcaster: WorkerManager::new(Broadcaster::new()),
+            brodcaster: WorkerManager::new(Broadcaster::default()),
         })
     }
 

+ 1 - 0
utxo/src/lib.rs

@@ -25,6 +25,7 @@
 
 mod amount;
 mod asset;
+mod broadcaster;
 mod config;
 mod error;
 mod filter;

+ 12 - 4
utxo/src/worker.rs

@@ -18,9 +18,10 @@ const WORKER_BUFFER_SIZE: usize = 1_000;
 /// The worker trait has the definition of the code the worker has to perform in a different thread
 #[async_trait]
 pub trait Worker: Send + Sync {
-    type Message: Send + Sync;
+    type Payload: Send + Sync;
 
-    async fn handler(&self, message: Self::Message);
+    /// Method to be executed with a given task
+    async fn handler(&self, payload: Self::Payload);
 }
 
 /// Worker manager
@@ -29,9 +30,15 @@ pub trait Worker: Send + Sync {
 /// in a separate thread from the send() context.
 ///
 /// The logic of having one or more instances of the Worker trait is abstracted in this structure.
+///
+/// TODO:
+///  * Boot the spawn thread on demand (right now it is at instantiation time)
+///  * Respawn on  the worker thread need (while the Worker is in memory / not dropped)
+///  * Kill off the worker thread after a long time without any activity. This will reboot (step 1)
+///    when needed
 #[derive(Debug)]
 pub struct WorkerManager<W: Worker> {
-    sender: Sender<W::Message>,
+    sender: Sender<W::Payload>,
     is_running: Arc<AtomicBool>,
     worker: Arc<W>,
 }
@@ -52,6 +59,7 @@ impl<W: Worker> Deref for WorkerManager<W> {
 }
 
 impl<W: Worker + 'static> WorkerManager<W> {
+    /// Creates a new WorkerManager given a struct that implements the Worker trait
     pub fn new(worker: W) -> Self {
         let (sender, mut receiver) = channel(WORKER_BUFFER_SIZE);
         let worker = Arc::new(worker);
@@ -82,7 +90,7 @@ impl<W: Worker + 'static> WorkerManager<W> {
     }
 
     /// Sends a message to be processed in another thread
-    pub fn send(&self, message: W::Message) {
+    pub fn send(&self, message: W::Payload) {
         self.sender.try_send(message).expect("foo");
     }
 }