Kaynağa Gözat

WIP: Move primaryfilter to their

Cesar Rodas 10 ay önce
ebeveyn
işleme
3a351a6364

+ 32 - 18
utxo/src/broadcaster.rs

@@ -1,5 +1,5 @@
 //! Broadcaster implementation
-use crate::{worker::Worker, Filter, Transaction};
+use crate::{worker::Worker, Filter, FilterableValue, Transaction};
 use async_trait::async_trait;
 use std::{
     collections::HashMap,
@@ -16,7 +16,7 @@ use tokio::sync::{
 /// This structure broadcasts the transactions to all subscribers in a separated working thread.
 pub struct Broadcaster {
     subscribers: RwLock<HashMap<usize, Sender<Transaction>>>,
-    subscriptions: RwLock<HashMap<Filter, Vec<usize>>>,
+    subscriptions: RwLock<HashMap<FilterableValue, Vec<(Filter, usize)>>>,
     is_there_any_subscriber: AtomicBool,
     index: AtomicUsize,
 }
@@ -25,7 +25,7 @@ impl Default for Broadcaster {
     fn default() -> Self {
         Self {
             subscribers: RwLock::new(HashMap::<usize, Sender<_>>::new()),
-            subscriptions: RwLock::new(HashMap::<Filter, Vec<_>>::new()),
+            subscriptions: RwLock::new(HashMap::<FilterableValue, Vec<_>>::new()),
             is_there_any_subscriber: false.into(),
             index: 0.into(),
         }
@@ -52,10 +52,16 @@ impl Broadcaster {
         self.is_there_any_subscriber
             .store(true, std::sync::atomic::Ordering::Release);
 
-        if let Some(previous_values) = listeners.get_mut(&filter) {
-            previous_values.push(sender_index);
-        } else {
-            listeners.insert(filter, vec![sender_index]);
+        let (primary_filter, filter) = filter.get_primary_filter();
+        let key_filter: Vec<FilterableValue> = primary_filter.into();
+        println!("{:#?}", key_filter);
+
+        for key_filter in key_filter {
+            if let Some(previous_values) = listeners.get_mut(&key_filter) {
+                previous_values.push((filter.clone(), sender_index));
+            } else {
+                listeners.insert(key_filter, vec![(filter.clone(), sender_index)]);
+            }
         }
 
         receiver
@@ -78,15 +84,21 @@ impl Worker for Broadcaster {
         let mut subscriptions_to_reindex = vec![];
         let mut senders_to_remove = vec![];
 
-        for (filter, listeners) in listeners.iter() {
-            if filter.matches(&transaction.transaction, &transaction.revision) {
-                for sender_index in listeners.iter() {
+        for primary_filter in transaction.get_filterable_fields() {
+            let listeners = if let Some(listeners) = listeners.get(&primary_filter) {
+                listeners
+            } else {
+                continue;
+            };
+
+            for (filter, sender_index) in listeners {
+                if filter.matches(&transaction.transaction, &transaction.revision) {
                     if let Some(Err(TrySendError::Closed(_))) = senders
                         .get(sender_index)
                         .map(|sender| sender.try_send(transaction.clone()))
                     {
                         senders_to_remove.push(*sender_index);
-                        subscriptions_to_reindex.push(filter.clone());
+                        subscriptions_to_reindex.push(primary_filter.clone());
                     }
                 }
             }
@@ -103,13 +115,15 @@ impl Worker for Broadcaster {
                 senders.remove(to_remove);
             }
 
-            for to_rebuild in &subscriptions_to_reindex {
-                if let Some(list_of_senders) = listeners.get_mut(to_rebuild) {
-                    *list_of_senders = list_of_senders
-                        .iter()
-                        .filter(|x| senders.contains_key(*x))
-                        .copied()
-                        .collect::<Vec<_>>();
+            for to_rebuild in subscriptions_to_reindex {
+                if let Some(list_of_senders) = listeners.remove(&to_rebuild) {
+                    listeners.insert(
+                        to_rebuild,
+                        list_of_senders
+                            .into_iter()
+                            .filter(|x| senders.contains_key(&x.1))
+                            .collect::<Vec<_>>(),
+                    );
                 }
             }
 

+ 86 - 0
utxo/src/filter.rs

@@ -2,6 +2,70 @@ use crate::{AccountId, BaseTx, RevId, Revision, Status, Tag, TxId, Type};
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+/// The primary filter is used to filter the transactions before applying the other filters. Think
+/// of it like which key is used to filter transactions quickly before applying the other filters.
+pub enum PrimaryFilter {
+    /// By transaction ID
+    TxId(Vec<TxId>),
+    /// By revision ID
+    Revision(Vec<RevId>),
+    /// By accounts
+    Account(Vec<AccountId>),
+    /// By transaction type
+    Type(Vec<Type>),
+    /// By transaction status
+    Status(Vec<Status>),
+    /// By tags
+    Tags(Vec<Tag>),
+    /// By transaction status
+    Stream,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[allow(warnings)]
+/// Filterable Value
+///
+/// Individual value from a transaction, and their type, that can be used to filter a transaction,
+/// either at realtime throught the subscription API, or through the filter API in the storage layer
+pub enum FilterableValue {
+    /// Transaction ID
+    TxId(TxId),
+    /// Revision ID
+    Revision(RevId),
+    /// Account
+    Account(AccountId),
+    /// Status
+    Status(Status),
+    /// Transaction type
+    Type(Type),
+    /// Tag
+    Tag(Tag),
+    /// Any other filterable value. This is being used to subscribe to all new transactions
+    Anything,
+}
+
+impl From<PrimaryFilter> for Vec<FilterableValue> {
+    fn from(value: PrimaryFilter) -> Self {
+        match value {
+            PrimaryFilter::TxId(ids) => ids.into_iter().map(FilterableValue::TxId).collect(),
+            PrimaryFilter::Revision(revisions) => revisions
+                .into_iter()
+                .map(FilterableValue::Revision)
+                .collect(),
+            PrimaryFilter::Account(accounts) => {
+                accounts.into_iter().map(FilterableValue::Account).collect()
+            }
+            PrimaryFilter::Status(statuses) => {
+                statuses.into_iter().map(FilterableValue::Status).collect()
+            }
+            PrimaryFilter::Type(types) => types.into_iter().map(FilterableValue::Type).collect(),
+            PrimaryFilter::Tags(tags) => tags.into_iter().map(FilterableValue::Tag).collect(),
+            PrimaryFilter::Stream => vec![FilterableValue::Anything],
+        }
+    }
+}
+
 /// Filter transactions
 ///
 /// All this filters options are AND, meaning that an object must match all the
@@ -49,6 +113,28 @@ pub struct Filter {
 }
 
 impl Filter {
+    /// Extracts the PrimaryFilter from the filter and prepares the fitler to be used with `matches`
+    /// method
+    pub fn get_primary_filter(mut self) -> (PrimaryFilter, Self) {
+        let primary_filter = if !self.revisions.is_empty() {
+            PrimaryFilter::Revision(std::mem::take(&mut self.revisions))
+        } else if !self.ids.is_empty() {
+            PrimaryFilter::TxId(std::mem::take(&mut self.ids))
+        } else if !self.accounts.is_empty() {
+            PrimaryFilter::Account(std::mem::take(&mut self.accounts))
+        } else if !self.typ.is_empty() {
+            PrimaryFilter::Type(std::mem::take(&mut self.typ))
+        } else if !self.tags.is_empty() {
+            PrimaryFilter::Tags(std::mem::take(&mut self.tags))
+        } else if !self.status.is_empty() {
+            PrimaryFilter::Status(std::mem::take(&mut self.status))
+        } else {
+            PrimaryFilter::Stream
+        };
+
+        (primary_filter, self.prepare())
+    }
+
     /// Takes the filter and sorts it. This is a pre-requist for matching
     pub fn prepare(mut self) -> Self {
         self.ids.sort();

+ 1 - 1
utxo/src/lib.rs

@@ -47,7 +47,7 @@ pub use self::{
     amount::{Amount, AnyAmount, HumanAmount},
     asset::Asset,
     error::Error,
-    filter::Filter,
+    filter::{Filter, FilterableValue, PrimaryFilter},
     id::*,
     ledger::Ledger,
     payment::PaymentFrom,

+ 4 - 35
utxo/src/storage/cursor.rs

@@ -1,23 +1,5 @@
 //! Cursor implementation
-use crate::{AccountId, BaseTx, Filter, RevId, Revision, Tag, TxId, Type};
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-/// The primary filter is used to filter the transactions before applying the other filters. Think
-/// of it like which key is used to filter transactions quickly before applying the other filters.
-pub enum PrimaryFilter {
-    /// By transaction ID
-    TxId(Vec<TxId>),
-    /// By revision ID
-    Revision(Vec<RevId>),
-    /// By accounts
-    Account(Vec<AccountId>),
-    /// By transaction type
-    Type(Vec<Type>),
-    /// By tags
-    Tags(Vec<Tag>),
-    /// By transaction status
-    Stream,
-}
+use crate::{BaseTx, Filter, PrimaryFilter, Revision};
 
 #[derive(Debug, PartialEq, Eq, Hash, Clone)]
 /// The cursor
@@ -29,24 +11,11 @@ pub struct Cursor {
 }
 
 impl From<Filter> for Cursor {
-    fn from(mut filter: Filter) -> Self {
-        let primary_filter = if !filter.revisions.is_empty() {
-            PrimaryFilter::Revision(std::mem::take(&mut filter.revisions))
-        } else if !filter.ids.is_empty() {
-            PrimaryFilter::TxId(std::mem::take(&mut filter.ids))
-        } else if !filter.accounts.is_empty() {
-            PrimaryFilter::Account(std::mem::take(&mut filter.accounts))
-        } else if !filter.typ.is_empty() {
-            PrimaryFilter::Type(std::mem::take(&mut filter.typ))
-        } else if !filter.tags.is_empty() {
-            PrimaryFilter::Tags(std::mem::take(&mut filter.tags))
-        } else {
-            PrimaryFilter::Stream
-        };
-
+    fn from(filter: Filter) -> Self {
+        let (primary_filter, filter) = filter.get_primary_filter();
         Self {
             primary_filter,
-            filter: filter.prepare(),
+            filter,
         }
     }
 }

+ 1 - 2
utxo/src/storage/mod.rs

@@ -12,7 +12,7 @@ mod cursor;
 #[cfg(any(feature = "sqlite", test))]
 pub mod sqlite;
 pub use self::cache::Cache;
-pub use self::cursor::{Cursor, PrimaryFilter};
+pub use self::cursor::Cursor;
 #[cfg(any(feature = "sqlite", test))]
 pub use self::sqlite::SQLite;
 
@@ -644,7 +644,6 @@ pub mod test {
                         deposit.revision_id,
                         vec![
                             "even".parse().expect("valid tag"),
-                            "even".parse().expect("valid tag"),
                             "all".parse().expect("valid tag"),
                         ],
                         "add tags".to_owned(),

+ 32 - 2
utxo/src/storage/sqlite/mod.rs

@@ -1,10 +1,10 @@
 //! SQLite storage layer for Verax
-use super::{Cursor, PrimaryFilter, ReceivedPaymentStatus};
+use super::{Cursor, ReceivedPaymentStatus};
 use crate::{
     amount::AmountCents,
     storage::{Error, Storage},
     transaction::Revision,
-    AccountId, Amount, Asset, BaseTx, Filter, PaymentFrom, PaymentId, RevId, TxId,
+    AccountId, Amount, Asset, BaseTx, Filter, PaymentFrom, PaymentId, PrimaryFilter, RevId, TxId,
 };
 use borsh::from_slice;
 use futures::TryStreamExt;
@@ -201,6 +201,36 @@ where
             }
             query.fetch_all(executor).await
         }
+        PrimaryFilter::Status(statuses) => {
+            let sql = format!(
+                r#"
+                  SELECT
+                      "bt"."blob",
+                      "b"."blob"
+                  FROM
+                      "transactions" as "t",
+                      "base_transactions" as "bt",
+                      "revisions" as "b"
+                  WHERE
+                      "b"."status" IN ({})
+                      AND "t"."revision_id" = "b"."revision_id"
+                      AND "t"."transaction_id" = "bt"."transaction_id"
+                      {} {}
+                  ORDER BY "t"."created_at" DESC
+                  LIMIT {} OFFSET {}
+                  "#,
+                "?,".repeat(statuses.len()).trim_end_matches(","),
+                since,
+                until,
+                limit,
+                cursor.filter.skip
+            );
+            let mut query = sqlx::query(&sql);
+            for status in statuses.iter() {
+                query = query.bind(status.to_string());
+            }
+            query.fetch_all(executor).await
+        }
         PrimaryFilter::Stream => {
             let sql = format!(
                 r#"

+ 24 - 2
utxo/src/transaction/mod.rs

@@ -1,6 +1,6 @@
 use crate::{
-    config::Config, payment::PaymentTo, storage::Storage, AccountId, Amount, MaxLengthString,
-    PaymentFrom, RevId, Status, TxId,
+    config::Config, payment::PaymentTo, storage::Storage, AccountId, Amount, FilterableValue,
+    MaxLengthString, PaymentFrom, RevId, Status, TxId,
 };
 use chrono::{DateTime, TimeZone, Utc};
 use serde::{Deserialize, Serialize};
@@ -120,6 +120,28 @@ impl Transaction {
         })
     }
 
+    /// Returns the filterable fields
+    pub fn get_filterable_fields(&self) -> Vec<FilterableValue> {
+        let mut filters = vec![
+            FilterableValue::Anything,
+            FilterableValue::Type(self.transaction.typ.clone()),
+            FilterableValue::TxId(self.id.clone()),
+            FilterableValue::Status(self.revision.status.clone()),
+            FilterableValue::Revision(self.revision_id.clone()),
+            FilterableValue::Type(self.typ),
+        ];
+
+        for account in self.accounts() {
+            filters.push(FilterableValue::Account(account));
+        }
+
+        for tag in &self.revision.tags {
+            filters.push(FilterableValue::Tag(tag.clone()));
+        }
+
+        filters
+    }
+
     /// Creates a new external deposit transaction
     ///
     /// All transactions must be balanced, same amounts that are spent should be