Преглед изворни кода

Improve the subscription API

* Made subscribe() to return a sender and receiver
* Added subscribe_with_sender which allows to use an existing sender to
  subscribe to another filter
Cesar Rodas пре 10 месеци
родитељ
комит
fe66b5b9dd
3 измењених фајлова са 87 додато и 14 уклоњено
  1. 2 6
      utxo/src/broadcaster.rs
  2. 14 7
      utxo/src/ledger.rs
  3. 71 1
      utxo/src/storage/mod.rs

+ 2 - 6
utxo/src/broadcaster.rs

@@ -6,7 +6,7 @@ use std::{
     sync::atomic::{AtomicBool, AtomicUsize},
 };
 use tokio::sync::{
-    mpsc::{self, error::TrySendError, Receiver, Sender},
+    mpsc::{error::TrySendError, Sender},
     RwLock,
 };
 
@@ -38,8 +38,7 @@ impl Broadcaster {
     }
 
     /// Adds a subscriber to new transactions given a filter
-    pub async fn subscribe(&self, filter: Filter) -> Receiver<Transaction> {
-        let (sender, receiver) = mpsc::channel(100);
+    pub async fn subscribe(&self, filter: Filter, sender: Sender<Transaction>) {
         let mut listeners = self.subscriptions.write().await;
         let filter = filter.prepare();
 
@@ -54,7 +53,6 @@ impl Broadcaster {
 
         let (primary_filter, filter) = filter.get_primary_filter();
         let key_filter: Vec<FilterableValue> = primary_filter.into();
-        println!("{:#?}", key_filter);
 
         for key_filter in key_filter {
             if let Some(previous_values) = listeners.get_mut(&key_filter) {
@@ -63,8 +61,6 @@ impl Broadcaster {
                 listeners.insert(key_filter, vec![(filter.clone(), sender_index)]);
             }
         }
-
-        receiver
     }
 }
 

+ 14 - 7
utxo/src/ledger.rs

@@ -11,7 +11,7 @@ use crate::{
     TxId,
 };
 use std::{cmp::Ordering, collections::HashMap, sync::Arc};
-use tokio::sync::mpsc::Receiver;
+use tokio::sync::mpsc::{self, Receiver, Sender};
 
 /// The Verax ledger
 #[derive(Debug)]
@@ -20,7 +20,7 @@ where
     S: Storage + Sync + Send,
 {
     config: Config<S>,
-    brodcaster: WorkerManager<Broadcaster>,
+    broadcaster: WorkerManager<Broadcaster>,
 }
 
 impl<S> Ledger<S>
@@ -31,18 +31,25 @@ where
     pub fn new(config: Config<S>) -> Arc<Self> {
         Arc::new(Self {
             config,
-            brodcaster: WorkerManager::new(Broadcaster::default()),
+            broadcaster: WorkerManager::new(Broadcaster::default()),
         })
     }
 
     /// Subscribes to new transactions and revisions with a given filter
-    pub async fn subscribe(&self, filter: Filter) -> Receiver<Transaction> {
-        self.brodcaster.subscribe(filter).await
+    pub async fn subscribe(&self, filter: Filter) -> (Sender<Transaction>, Receiver<Transaction>) {
+        let (sender, receiver) = mpsc::channel(100);
+        self.subscribe_with_sender(filter, sender.clone()).await;
+        (sender, receiver)
+    }
+
+    /// Subscribe to new transactions and revisions with a given filter and a receiver.
+    pub async fn subscribe_with_sender(&self, filter: Filter, sender: Sender<Transaction>) {
+        self.broadcaster.subscribe(filter, sender).await;
     }
 
     /// Returns the total number of active subscribers
     pub async fn subscribers(&self) -> usize {
-        self.brodcaster.subscribers().await
+        self.broadcaster.subscribers().await
     }
 
     /// The internal usage is to select unspent payments for each account to create new
@@ -309,7 +316,7 @@ where
         batch.commit().await?;
 
         // The transaction is persisted and now it is time to broadcast it to any possible listener
-        self.brodcaster.process(transaction.clone());
+        self.broadcaster.process(transaction.clone());
 
         Ok(transaction)
     }

+ 71 - 1
utxo/src/storage/mod.rs

@@ -352,6 +352,7 @@ pub mod test {
             $crate::storage_unit_test!(spend_spendable_payments);
             $crate::storage_unit_test!(relate_account_to_transaction);
             $crate::storage_unit_test!(find_transactions_by_tags);
+            $crate::storage_unit_test!(find_transactions_by_status);
             $crate::storage_unit_test!(not_spendable_new_payments_not_spendable);
             $crate::storage_unit_test!(subscribe_realtime);
         };
@@ -613,7 +614,7 @@ pub mod test {
             status: Default::default(),
         });
 
-        let mut subscription = ledger
+        let (_, mut subscription) = ledger
             .subscribe(Filter {
                 tags: vec!["even".parse().expect("valid tag")],
                 ..Default::default()
@@ -797,6 +798,75 @@ pub mod test {
         );
     }
 
+    pub async fn find_transactions_by_status<T>(storage: T)
+    where
+        T: Storage + Send + Sync,
+    {
+        let ledger = Ledger::new(Config {
+            storage,
+            status: Default::default(),
+        });
+
+        for i in 0..10 {
+            let usd: Asset = "USD/2".parse().expect("valid asset");
+            let account = format!("account-{}", i).parse().expect("valid account");
+
+            ledger
+                .deposit(
+                    &account,
+                    usd.from_human("100.99").expect("valid amount"),
+                    if i % 2 == 0 {
+                        "even".into()
+                    } else {
+                        "odd".into()
+                    },
+                    vec![],
+                    format!("test deposit {}", i),
+                )
+                .await
+                .expect("valid deposit");
+        }
+
+        assert_eq!(
+            5,
+            ledger
+                .get_transactions(Filter {
+                    status: vec!["even".parse().expect("valid tag")],
+                    ..Default::default()
+                })
+                .await
+                .expect("valid filter")
+                .len()
+        );
+
+        assert_eq!(
+            5,
+            ledger
+                .get_transactions(Filter {
+                    status: vec!["odd".parse().expect("valid tag")],
+                    ..Default::default()
+                })
+                .await
+                .expect("valid filter")
+                .len()
+        );
+
+        assert_eq!(
+            10,
+            ledger
+                .get_transactions(Filter {
+                    status: vec![
+                        "even".parse().expect("valid tag"),
+                        "odd".parse().expect("valid tag")
+                    ],
+                    ..Default::default()
+                })
+                .await
+                .expect("valid filter")
+                .len()
+        );
+    }
+
     pub async fn spend_spendable_payments<T>(storage: T)
     where
         T: Storage + Send + Sync,