Эх сурвалжийг харах

WIP: Move broadcasting of transaction to subscribers to a different thread

Cesar Rodas 10 сар өмнө
parent
commit
20b888030a

+ 90 - 55
utxo/src/ledger.rs

@@ -9,10 +9,16 @@ use std::{
     sync::{atomic::AtomicUsize, Arc},
 };
 use tokio::sync::{
-    mpsc::{self, error::TrySendError, Receiver, Sender},
+    mpsc::{self, channel, error::TrySendError, Receiver, Sender},
     RwLock,
 };
 
+#[derive(Debug)]
+enum BroadcastMessage<T> {
+    Shutdown,
+    Message(T),
+}
+
 /// The Verax ledger
 #[derive(Debug)]
 pub struct Ledger<S>
@@ -20,9 +26,21 @@ where
     S: Storage + Sync + Send,
 {
     config: Config<S>,
-    senders: RwLock<HashMap<usize, Sender<Transaction>>>,
-    subscriptions: RwLock<HashMap<Filter, Vec<usize>>>,
-    sender_index: AtomicUsize,
+    brodcaster: Sender<BroadcastMessage<Transaction>>,
+    subscribers: Arc<RwLock<HashMap<usize, Sender<Transaction>>>>,
+    subscriptions: Arc<RwLock<HashMap<Filter, Vec<usize>>>>,
+    subscriber_index: AtomicUsize,
+}
+
+impl<S> Drop for Ledger<S>
+where
+    S: Storage + Sync + Send,
+{
+    fn drop(&mut self) {
+        self.brodcaster
+            .try_send(BroadcastMessage::Shutdown)
+            .expect("Successful delivery of shutdown signal");
+    }
 }
 
 impl<S> Ledger<S>
@@ -31,11 +49,70 @@ where
 {
     /// Creates a new ledger instance
     pub fn new(config: Config<S>) -> Arc<Self> {
+        let (sender_to_worker, mut receiver) = channel::<BroadcastMessage<Transaction>>(10_000);
+        let senders = Arc::new(RwLock::new(HashMap::<usize, Sender<_>>::new()));
+        let subscriptions = Arc::new(RwLock::new(HashMap::<Filter, Vec<_>>::new()));
+
+        let sender_for_worker = senders.clone();
+        let subscriptions_for_worker = subscriptions.clone();
+
+        tokio::spawn(async move {
+            while let Some(message) = receiver.recv().await {
+                match message {
+                    BroadcastMessage::Shutdown => break,
+                    BroadcastMessage::Message(transaction) => {
+                        let listeners = subscriptions_for_worker.read().await;
+                        let senders = sender_for_worker.read().await;
+
+                        let mut subscriptions_to_reindex = vec![];
+                        let mut senders_to_remove = vec![];
+
+                        for (filter, listeners) in listeners.iter() {
+                            if filter.matches(&transaction.transaction, &transaction.revision) {
+                                for sender_index in listeners.iter() {
+                                    if let Some(Err(TrySendError::Closed(_))) = senders
+                                        .get(sender_index)
+                                        .map(|sender| sender.try_send(transaction.clone()))
+                                    {
+                                        senders_to_remove.push(*sender_index);
+                                        subscriptions_to_reindex.push(filter.clone());
+                                    }
+                                }
+                            }
+                        }
+
+                        drop(listeners);
+                        drop(senders);
+
+                        if !senders_to_remove.is_empty() {
+                            let mut listeners = subscriptions_for_worker.write().await;
+                            let mut senders = sender_for_worker.write().await;
+
+                            for to_remove in &senders_to_remove {
+                                senders.remove(to_remove);
+                            }
+
+                            for to_rebuild in &subscriptions_to_reindex {
+                                if let Some(list_of_senders) = listeners.get_mut(to_rebuild) {
+                                    *list_of_senders = list_of_senders
+                                        .into_iter()
+                                        .filter(|x| senders.contains_key(*x))
+                                        .map(|x| *x)
+                                        .collect::<Vec<_>>();
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        });
+
         Arc::new(Self {
             config,
-            senders: RwLock::new(HashMap::new()),
-            sender_index: AtomicUsize::new(0),
-            subscriptions: RwLock::new(HashMap::new()),
+            brodcaster: sender_to_worker,
+            subscribers: senders,
+            subscriber_index: AtomicUsize::new(0),
+            subscriptions,
         })
     }
 
@@ -46,10 +123,10 @@ where
         let filter = filter.prepare();
 
         let sender_index = self
-            .sender_index
+            .subscriber_index
             .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
 
-        self.senders.write().await.insert(sender_index, sender);
+        self.subscribers.write().await.insert(sender_index, sender);
 
         if let Some(previous_values) = listeners.get_mut(&filter) {
             previous_values.push(sender_index);
@@ -62,7 +139,7 @@ where
 
     /// Returns the total number of active subscribers
     pub async fn subscribers(&self) -> usize {
-        self.senders.read().await.len()
+        self.subscribers.read().await.len()
     }
 
     /// The internal usage is to select unspent payments for each account to
@@ -206,51 +283,9 @@ where
     async fn persist(&self, mut transaction: Transaction) -> Result<Transaction, Error> {
         transaction.persist(&self.config).await?;
 
-        {
-            // TODO: Move this block of code to a different thread so spreading messages won't make
-            // creating transaction slower
-            let listeners = self.subscriptions.read().await;
-            let senders = self.senders.read().await;
-
-            let mut subscriptions_to_reindex = vec![];
-            let mut senders_to_remove = vec![];
-
-            for (filter, listeners) in listeners.iter() {
-                if filter.matches(&transaction.transaction, &transaction.revision) {
-                    for sender_index in listeners.iter() {
-                        if let Some(Err(TrySendError::Closed(_))) = senders
-                            .get(sender_index)
-                            .map(|sender| sender.try_send(transaction.clone()))
-                        {
-                            senders_to_remove.push(*sender_index);
-                            subscriptions_to_reindex.push(filter.clone());
-                        }
-                    }
-                }
-            }
-
-            drop(listeners);
-            drop(senders);
-
-            if !senders_to_remove.is_empty() {
-                let mut listeners = self.subscriptions.write().await;
-                let mut senders = self.senders.write().await;
-
-                for to_remove in &senders_to_remove {
-                    senders.remove(to_remove);
-                }
-
-                for to_rebuild in &subscriptions_to_reindex {
-                    if let Some(list_of_senders) = listeners.get_mut(to_rebuild) {
-                        *list_of_senders = list_of_senders
-                            .into_iter()
-                            .filter(|x| senders.contains_key(*x))
-                            .map(|x| *x)
-                            .collect::<Vec<_>>();
-                    }
-                }
-            }
-        };
+        let _ = self
+            .brodcaster
+            .try_send(BroadcastMessage::Message(transaction.clone()));
         Ok(transaction)
     }
 

+ 1 - 0
utxo/src/lib.rs

@@ -37,6 +37,7 @@ pub mod storage;
 #[cfg(test)]
 mod tests;
 mod transaction;
+mod worker;
 
 #[cfg(test)]
 pub use self::storage::test as storage_test;

+ 6 - 2
utxo/src/storage/mod.rs

@@ -323,11 +323,11 @@ pub trait Storage {
 
 #[cfg(test)]
 pub mod test {
-    use std::collections::HashMap;
-
     use super::*;
     use crate::{config::Config, status::StatusManager, Ledger, Transaction};
     use rand::Rng;
+    use std::{collections::HashMap, time::Duration};
+    use tokio::time::sleep;
 
     #[macro_export]
     macro_rules! storage_unit_test {
@@ -661,6 +661,8 @@ pub mod test {
             }
         }
 
+        sleep(Duration::from_secs(1)).await;
+
         let expectactions = vec!["100.99", "102.99", "104.99", "106.99", "108.99", ""];
 
         assert_eq!(1, ledger.subscribers().await);
@@ -694,6 +696,8 @@ pub mod test {
             .await
             .expect("valid deposit");
 
+        sleep(Duration::from_secs(1)).await;
+
         assert_eq!(0, ledger.subscribers().await);
     }