Cesar Rodas 10 ヶ月 前
コミット
cf4ccd836e
3 ファイル変更90 行追加38 行削除
  1. 40 24
      utxo/src/broadcaster.rs
  2. 27 12
      utxo/src/storage/cursor.rs
  3. 23 2
      utxo/src/transaction/mod.rs

+ 40 - 24
utxo/src/broadcaster.rs

@@ -1,5 +1,9 @@
 //! Broadcaster implementation
-use crate::{worker::Worker, Filter, Transaction};
+use crate::{
+    storage::{Cursor, PrimaryFilter},
+    worker::Worker,
+    Filter, Transaction,
+};
 use async_trait::async_trait;
 use std::{
     collections::HashMap,
@@ -16,7 +20,7 @@ use tokio::sync::{
 /// This structure broadcasts the transactions to all subscribers in a separated working thread.
 pub struct Broadcaster {
     subscribers: RwLock<HashMap<usize, Sender<Transaction>>>,
-    subscriptions: RwLock<HashMap<Filter, Vec<usize>>>,
+    subscriptions: RwLock<HashMap<PrimaryFilter, Vec<(Filter, usize)>>>,
     is_there_any_subscriber: AtomicBool,
     index: AtomicUsize,
 }
@@ -25,7 +29,7 @@ impl Default for Broadcaster {
     fn default() -> Self {
         Self {
             subscribers: RwLock::new(HashMap::<usize, Sender<_>>::new()),
-            subscriptions: RwLock::new(HashMap::<Filter, Vec<_>>::new()),
+            subscriptions: RwLock::new(HashMap::<PrimaryFilter, Vec<_>>::new()),
             is_there_any_subscriber: false.into(),
             index: 0.into(),
         }
@@ -52,10 +56,14 @@ impl Broadcaster {
         self.is_there_any_subscriber
             .store(true, std::sync::atomic::Ordering::Release);
 
-        if let Some(previous_values) = listeners.get_mut(&filter) {
-            previous_values.push(sender_index);
-        } else {
-            listeners.insert(filter, vec![sender_index]);
+        let cursor: Cursor = filter.into();
+
+        for primary_filter in cursor.primary_filter.into_iter() {
+            if let Some(previous_values) = listeners.get_mut(&primary_filter) {
+                previous_values.push((cursor.filter.clone(), sender_index));
+            } else {
+                listeners.insert(primary_filter, vec![(cursor.filter.clone(), sender_index)]);
+            }
         }
 
         receiver
@@ -78,16 +86,22 @@ impl Worker for Broadcaster {
         let mut subscriptions_to_reindex = vec![];
         let mut senders_to_remove = vec![];
 
-        for (filter, listeners) in listeners.iter() {
-            if filter.matches(&transaction.transaction, &transaction.revision) {
-                for sender_index in listeners.iter() {
-                    if let Some(Err(TrySendError::Closed(_))) = senders
-                        .get(sender_index)
-                        .map(|sender| sender.try_send(transaction.clone()))
-                    {
-                        senders_to_remove.push(*sender_index);
-                        subscriptions_to_reindex.push(filter.clone());
-                    }
+        for primary_filter in transaction.get_primary_filters().into_iter() {
+            let filters = if let Some(filters) = listeners.get(&primary_filter) {
+                filters
+            } else {
+                continue;
+            };
+            for (filter, sender_index) in filters.iter() {
+                if !filter.matches(&transaction.transaction, &transaction.revision) {
+                    continue;
+                }
+                if let Some(Err(TrySendError::Closed(_))) = senders
+                    .get(sender_index)
+                    .map(|sender| sender.try_send(transaction.clone()))
+                {
+                    senders_to_remove.push(*sender_index);
+                    subscriptions_to_reindex.push((primary_filter.clone(), filter.clone()));
                 }
             }
         }
@@ -103,13 +117,15 @@ impl Worker for Broadcaster {
                 senders.remove(to_remove);
             }
 
-            for to_rebuild in &subscriptions_to_reindex {
-                if let Some(list_of_senders) = listeners.get_mut(to_rebuild) {
-                    *list_of_senders = list_of_senders
-                        .iter()
-                        .filter(|x| senders.contains_key(*x))
-                        .copied()
-                        .collect::<Vec<_>>();
+            for to_rebuild in subscriptions_to_reindex {
+                if let Some(list_of_senders) = listeners.remove(&to_rebuild.0) {
+                    listeners.insert(
+                        to_rebuild.0,
+                        list_of_senders
+                            .into_iter()
+                            .filter(|x| senders.contains_key(&x.1))
+                            .collect::<Vec<_>>(),
+                    );
                 }
             }
 

+ 27 - 12
utxo/src/storage/cursor.rs

@@ -6,15 +6,15 @@ use crate::{AccountId, BaseTx, Filter, RevId, Revision, Tag, TxId, Type};
 /// of it like which key is used to filter transactions quickly before applying the other filters.
 pub enum PrimaryFilter {
     /// By transaction ID
-    TxId(Vec<TxId>),
+    TxId(TxId),
     /// By revision ID
-    Revision(Vec<RevId>),
+    Revision(RevId),
     /// By accounts
-    Account(Vec<AccountId>),
+    Account(AccountId),
     /// By transaction type
-    Type(Vec<Type>),
+    Type(Type),
     /// By tags
-    Tags(Vec<Tag>),
+    Tags(Tag),
     /// By transaction status
     Stream,
 }
@@ -23,7 +23,7 @@ pub enum PrimaryFilter {
 /// The cursor
 pub struct Cursor {
     /// The primary filter
-    pub primary_filter: PrimaryFilter,
+    pub primary_filter: Vec<PrimaryFilter>,
     /// The secondary filter
     pub filter: Filter,
 }
@@ -31,17 +31,32 @@ pub struct Cursor {
 impl From<Filter> for Cursor {
     fn from(mut filter: Filter) -> Self {
         let primary_filter = if !filter.revisions.is_empty() {
-            PrimaryFilter::Revision(std::mem::take(&mut filter.revisions))
+            std::mem::take(&mut filter.revisions)
+                .into_iter()
+                .map(|r| PrimaryFilter::Revision(r))
+                .collect()
         } else if !filter.ids.is_empty() {
-            PrimaryFilter::TxId(std::mem::take(&mut filter.ids))
+            std::mem::take(&mut filter.ids)
+                .into_iter()
+                .map(|id| PrimaryFilter::TxId(id))
+                .collect()
         } else if !filter.accounts.is_empty() {
-            PrimaryFilter::Account(std::mem::take(&mut filter.accounts))
+            std::mem::take(&mut filter.accounts)
+                .into_iter()
+                .map(|account| PrimaryFilter::Account(account))
+                .collect()
         } else if !filter.typ.is_empty() {
-            PrimaryFilter::Type(std::mem::take(&mut filter.typ))
+            std::mem::take(&mut filter.typ)
+                .into_iter()
+                .map(|typ| PrimaryFilter::Type(typ))
+                .collect()
         } else if !filter.tags.is_empty() {
-            PrimaryFilter::Tags(std::mem::take(&mut filter.tags))
+            std::mem::take(&mut filter.tags)
+                .into_iter()
+                .map(|tag| PrimaryFilter::Tags(tag))
+                .collect()
         } else {
-            PrimaryFilter::Stream
+            vec![PrimaryFilter::Stream]
         };
 
         Self {

+ 23 - 2
utxo/src/transaction/mod.rs

@@ -1,6 +1,8 @@
 use crate::{
-    config::Config, payment::PaymentTo, storage::Storage, AccountId, Amount, MaxLengthString,
-    PaymentFrom, RevId, Status, TxId,
+    config::Config,
+    payment::PaymentTo,
+    storage::{PrimaryFilter, Storage},
+    AccountId, Amount, MaxLengthString, PaymentFrom, RevId, Status, TxId,
 };
 use chrono::{DateTime, TimeZone, Utc};
 use serde::{Deserialize, Serialize};
@@ -120,6 +122,25 @@ impl Transaction {
         })
     }
 
+    /// Get the primary filters for this transaction
+    pub fn get_primary_filters(&self) -> Vec<PrimaryFilter> {
+        let mut filters = vec![
+            PrimaryFilter::TxId(self.id.clone()),
+            PrimaryFilter::Revision(self.revision_id.clone()),
+            PrimaryFilter::Type(self.typ.clone()),
+        ];
+
+        for account in self.accounts() {
+            filters.push(PrimaryFilter::Account(account));
+        }
+
+        for tag in self.revision.tags.clone() {
+            filters.push(PrimaryFilter::Tags(tag));
+        }
+
+        filters
+    }
+
     /// Creates a new external deposit transaction
     ///
     /// All transactions must be balanced, same amounts that are spent should be