瀏覽代碼

More changes to the realtime API

Worked on the cleanup of the subscription API

Here is the recording of the PR making process: https://www.youtube.com/watch?v=NXvJN3X3zjg
Cesar Rodas 10 月之前
父節點
當前提交
68a851b7f9
共有 6 個文件被更改,包括 117 次插入26 次删除
  1. 4 1
      utxo/src/filter.rs
  2. 71 23
      utxo/src/ledger.rs
  3. 31 0
      utxo/src/storage/mod.rs
  4. 1 0
      utxo/src/tests/deposit.rs
  5. 7 1
      utxo/src/tests/mod.rs
  6. 3 1
      utxo/src/transaction/mod.rs

+ 4 - 1
utxo/src/filter.rs

@@ -1,4 +1,4 @@
-use crate::{AccountId, BaseTx, RevId, Revision, Tag, TxId, Type};
+use crate::{AccountId, BaseTx, RevId, Revision, Status, Tag, TxId, Type};
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 
@@ -20,6 +20,9 @@ pub struct Filter {
     /// List of transaction types-kind
     #[serde(rename = "type", skip_serializing_if = "Vec::is_empty")]
     pub typ: Vec<Type>,
+    /// List of statuses to query
+    #[serde(skip_serializing_if = "Vec::is_empty")]
+    pub status: Vec<Status>,
     /// List of transactions by tags
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub tags: Vec<Tag>,

+ 71 - 23
utxo/src/ledger.rs

@@ -3,9 +3,13 @@ use crate::{
     transaction::Type, AccountId, Amount, Error, Filter, PaymentFrom, PaymentId, RevId, Status,
     Tag, Transaction, TxId,
 };
-use std::{cmp::Ordering, collections::HashMap, sync::Arc};
+use std::{
+    cmp::Ordering,
+    collections::HashMap,
+    sync::{atomic::AtomicUsize, Arc},
+};
 use tokio::sync::{
-    mpsc::{self, Receiver, Sender},
+    mpsc::{self, error::TrySendError, Receiver, Sender},
     RwLock,
 };
 
@@ -16,7 +20,9 @@ where
     S: Storage + Sync + Send,
 {
     config: Config<S>,
-    listeners: RwLock<HashMap<Filter, Vec<Sender<Transaction>>>>,
+    senders: RwLock<HashMap<usize, Sender<Transaction>>>,
+    subscriptions: RwLock<HashMap<Filter, Vec<usize>>>,
+    sender_index: AtomicUsize,
 }
 
 impl<S> Ledger<S>
@@ -27,25 +33,38 @@ where
     pub fn new(config: Config<S>) -> Arc<Self> {
         Arc::new(Self {
             config,
-            listeners: RwLock::new(HashMap::new()),
+            senders: RwLock::new(HashMap::new()),
+            sender_index: AtomicUsize::new(0),
+            subscriptions: RwLock::new(HashMap::new()),
         })
     }
 
-    /// Creates a new subscription for future transactions
+    /// Subscribes to new transactions and revisions with a given filter
     pub async fn subscribe(&self, filter: Filter) -> Receiver<Transaction> {
         let (sender, receiver) = mpsc::channel(100);
-        let mut listeners = self.listeners.write().await;
+        let mut listeners = self.subscriptions.write().await;
         let filter = filter.prepare();
 
+        let sender_index = self
+            .sender_index
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+
+        self.senders.write().await.insert(sender_index, sender);
+
         if let Some(previous_values) = listeners.get_mut(&filter) {
-            previous_values.push(sender);
+            previous_values.push(sender_index);
         } else {
-            listeners.insert(filter, vec![sender]);
+            listeners.insert(filter, vec![sender_index]);
         }
 
         receiver
     }
 
+    /// Returns the total number of active subscribers
+    pub async fn subscribers(&self) -> usize {
+        self.senders.read().await.len()
+    }
+
     /// The internal usage is to select unspent payments for each account to
     /// create new transactions. The external API however does not expose that
     /// level of usage, instead it exposes a simple API to move funds using
@@ -187,24 +206,51 @@ where
     async fn persist(&self, mut transaction: Transaction) -> Result<Transaction, Error> {
         transaction.persist(&self.config).await?;
 
-        let listeners = self.listeners.read().await;
-        for (filter, listeners) in listeners.iter() {
-            println!(
-                "filter {:#?} -> {:#?} -> {}",
-                filter,
-                transaction.revision.tags,
-                filter.matches(&transaction.transaction, &transaction.revision)
-            );
-            if filter.matches(&transaction.transaction, &transaction.revision) {
-                for listener in listeners.iter() {
-                    listener
-                        .send(transaction.clone())
-                        .await
-                        .expect("listener failed");
+        {
+            // TODO: Move this block of code to a different thread so spreading messages won't make
+            // creating transaction slower
+            let listeners = self.subscriptions.read().await;
+            let senders = self.senders.read().await;
+
+            let mut subscriptions_to_reindex = vec![];
+            let mut senders_to_remove = vec![];
+
+            for (filter, listeners) in listeners.iter() {
+                if filter.matches(&transaction.transaction, &transaction.revision) {
+                    for sender_index in listeners.iter() {
+                        if let Some(Err(TrySendError::Closed(_))) = senders
+                            .get(sender_index)
+                            .map(|sender| sender.try_send(transaction.clone()))
+                        {
+                            senders_to_remove.push(*sender_index);
+                            subscriptions_to_reindex.push(filter.clone());
+                        }
+                    }
                 }
             }
-        }
 
+            drop(listeners);
+            drop(senders);
+
+            if !senders_to_remove.is_empty() {
+                let mut listeners = self.subscriptions.write().await;
+                let mut senders = self.senders.write().await;
+
+                for to_remove in &senders_to_remove {
+                    senders.remove(to_remove);
+                }
+
+                for to_rebuild in &subscriptions_to_reindex {
+                    if let Some(list_of_senders) = listeners.get_mut(to_rebuild) {
+                        *list_of_senders = list_of_senders
+                            .into_iter()
+                            .filter(|x| senders.contains_key(*x))
+                            .map(|x| *x)
+                            .collect::<Vec<_>>();
+                    }
+                }
+            }
+        };
         Ok(transaction)
     }
 
@@ -264,11 +310,13 @@ where
         account: &AccountId,
         amount: Amount,
         status: Status,
+        tags: Vec<Tag>,
         reference: String,
     ) -> Result<Transaction, Error> {
         self.persist(Transaction::new_external_deposit(
             reference,
             status,
+            tags,
             vec![(account.clone(), amount)],
         )?)
         .await

+ 31 - 0
utxo/src/storage/mod.rs

@@ -370,6 +370,7 @@ pub mod test {
         let mut pending = Transaction::new_external_deposit(
             "test reference".to_owned(),
             "pending".into(),
+            vec![],
             vec![(
                 "alice".parse().expect("account"),
                 asset.from_human("100.99").expect("valid amount"),
@@ -403,6 +404,7 @@ pub mod test {
         let deposit = Transaction::new_external_deposit(
             "test reference".to_owned(),
             "settled".into(),
+            vec![],
             vec![(
                 "alice".parse().expect("account"),
                 asset.from_human("100.99").expect("valid amount"),
@@ -465,6 +467,7 @@ pub mod test {
         let deposit = Transaction::new_external_deposit(
             "test reference".to_owned(),
             "settled".into(),
+            vec![],
             vec![(
                 "alice".parse().expect("account"),
                 usd.from_human("100.99").expect("valid amount"),
@@ -612,6 +615,8 @@ pub mod test {
             })
             .await;
 
+        assert_eq!(1, ledger.subscribers().await);
+
         for i in 0..10 {
             let usd: Asset = "USD/2".parse().expect("valid asset");
             let account = format!("account-{}", i).parse().expect("valid account");
@@ -622,6 +627,7 @@ pub mod test {
                     usd.from_human(&format!("10{}.99", i))
                         .expect("valid amount"),
                     "settled".into(),
+                    vec![],
                     format!("test deposit {}", i),
                 )
                 .await
@@ -657,6 +663,7 @@ pub mod test {
 
         let expectactions = vec!["100.99", "102.99", "104.99", "106.99", "108.99", ""];
 
+        assert_eq!(1, ledger.subscribers().await);
         for expectation in expectactions {
             assert_eq!(
                 expectation.to_string(),
@@ -666,6 +673,28 @@ pub mod test {
                     .unwrap_or_default()
             )
         }
+        assert_eq!(1, ledger.subscribers().await);
+
+        drop(subscription);
+
+        // TODO: Update this test when the drop() triggers the subscribers hashmap to cleanup
+        assert_eq!(1, ledger.subscribers().await);
+
+        let usd: Asset = "USD/2".parse().expect("valid asset");
+        let account = "account-99".parse().expect("valid account");
+
+        ledger
+            .deposit(
+                &account,
+                usd.from_human(&"1010.99").expect("valid amount"),
+                "settled".into(),
+                vec!["even".parse().expect("valid tag")],
+                "test deposit after the subscription listener is dropped".to_owned(),
+            )
+            .await
+            .expect("valid deposit");
+
+        assert_eq!(0, ledger.subscribers().await);
     }
 
     pub async fn find_transactions_by_tags<T>(storage: T)
@@ -686,6 +715,7 @@ pub mod test {
                     &account,
                     usd.from_human("100.99").expect("valid amount"),
                     "settled".into(),
+                    vec![],
                     format!("test deposit {}", i),
                 )
                 .await
@@ -950,6 +980,7 @@ pub mod test {
         let deposit = Transaction::new_external_deposit(
             "test reference".to_owned(),
             "settled".into(),
+            vec![],
             vec![(
                 account1.clone(),
                 usd.from_human("100.99").expect("valid amount"),

+ 1 - 0
utxo/src/tests/deposit.rs

@@ -11,6 +11,7 @@ async fn pending_deposit_and_failure() {
             &source,
             usd.from_human("30").expect("amount"),
             "processing".into(),
+            vec![],
             "Test".to_owned(),
         )
         .await

+ 7 - 1
utxo/src/tests/mod.rs

@@ -52,7 +52,13 @@ where
     S: Storage + Send + Sync,
 {
     ledger
-        .deposit(account_id, amount, "settled".into(), "Test".to_owned())
+        .deposit(
+            account_id,
+            amount,
+            "settled".into(),
+            vec![],
+            "Test".to_owned(),
+        )
         .await
         .expect("valid tx")
         .revision_id

+ 3 - 1
utxo/src/transaction/mod.rs

@@ -131,14 +131,16 @@ impl Transaction {
     pub fn new_external_deposit(
         reference: String,
         status: Status,
+        tags: Vec<Tag>,
         creates: Vec<(AccountId, Amount)>,
     ) -> Result<Self, Error> {
         let creates = creates
             .into_iter()
             .map(|(to, amount)| PaymentTo { to, amount })
             .collect();
-        let (transaction, revision) =
+        let (transaction, mut revision) =
             BaseTx::new(Vec::new(), creates, reference, Type::Deposit, status)?;
+        revision.tags = tags;
         let revision_id = revision.rev_id()?;
 
         Ok(Self {