瀏覽代碼

WIP: Working on having the subscription API

This API will allow third party to subscribe to new transactions and revisions
that matches a required filter.

TODO: Move the APIs to persist to a single place in the ledger, let the
Transaction do the validation and return the new revision.

TODO: Move the broadcasting of events to their own dedicated thread

TODO: Remove stale listeners
Cesar Rodas 10 月之前
父節點
當前提交
73bcc4d307

+ 49 - 2
utxo/src/filter.rs

@@ -1,4 +1,4 @@
-use crate::{AccountId, RevId, Tag, TxId, Type};
+use crate::{AccountId, BaseTx, RevId, Revision, Tag, TxId, Type};
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 
@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
 ///
 /// All this filters options are AND, meaning that an object must match all the
 /// requirements in order to be included in the resultset.
-#[derive(Clone, Debug, Serialize, Deserialize, Default)]
+#[derive(Clone, Debug, Serialize, PartialEq, Eq, Hash, Deserialize, Default)]
 pub struct Filter {
     /// List of transaction IDs to query
     #[serde(skip_serializing_if = "Vec::is_empty")]
@@ -46,6 +46,53 @@ pub struct Filter {
 }
 
 impl Filter {
+    /// Takes the filter and sorts it. This is a pre-requist for matching
+    pub fn prepare(mut self) -> Self {
+        self.ids.sort();
+        self.revisions.sort();
+        self.accounts.sort();
+        self.typ.sort();
+        self.tags.sort();
+        self
+    }
+
+    /// Check if the base transaction and the revision matches the current cursor filter
+    ///
+    /// Before this function is called, the filter needs to be prepareted with `prepare` function
+    pub fn matches(&self, base: &BaseTx, revision: &Revision) -> bool {
+        if !self.ids.is_empty() && self.ids.binary_search(&revision.transaction_id).is_err() {
+            return false;
+        }
+
+        if !self.revisions.is_empty()
+            && self
+                .revisions
+                .binary_search(&revision.rev_id().expect("vv"))
+                .is_err()
+        {
+            return false;
+        }
+
+        if !self.typ.is_empty() && self.typ.binary_search(&base.typ).is_err() {
+            return false;
+        }
+
+        if !self.tags.is_empty() {
+            let mut found = false;
+            for tag in revision.tags.iter() {
+                if self.tags.binary_search(tag).is_ok() {
+                    found = true;
+                    break;
+                }
+            }
+            if !found {
+                return false;
+            }
+        }
+
+        true
+    }
+
     /// Adds a given kind to the filter
     pub fn kind(mut self, kind: Type) -> Self {
         self.typ.push(kind);

+ 80 - 42
utxo/src/ledger.rs

@@ -4,6 +4,10 @@ use crate::{
     Tag, Transaction, TxId,
 };
 use std::{cmp::Ordering, collections::HashMap, sync::Arc};
+use tokio::sync::{
+    mpsc::{self, Receiver, Sender},
+    RwLock,
+};
 
 /// The Verax ledger
 #[derive(Debug)]
@@ -11,18 +15,8 @@ pub struct Ledger<S>
 where
     S: Storage + Sync + Send,
 {
-    config: Arc<Config<S>>,
-}
-
-impl<S> Clone for Ledger<S>
-where
-    S: Storage + Sync + Send,
-{
-    fn clone(&self) -> Self {
-        Self {
-            config: self.config.clone(),
-        }
-    }
+    config: Config<S>,
+    listeners: RwLock<HashMap<Filter, Vec<Sender<Transaction>>>>,
 }
 
 impl<S> Ledger<S>
@@ -30,10 +24,26 @@ where
     S: Storage + Sync + Send,
 {
     /// Creates a new ledger instance
-    pub fn new(config: Config<S>) -> Self {
-        Self {
-            config: Arc::new(config),
+    pub fn new(config: Config<S>) -> Arc<Self> {
+        Arc::new(Self {
+            config,
+            listeners: RwLock::new(HashMap::new()),
+        })
+    }
+
+    /// Creates a new subscription for future transactions
+    pub async fn subscribe(&self, filter: Filter) -> Receiver<Transaction> {
+        let (sender, receiver) = mpsc::channel(100);
+        let mut listeners = self.listeners.write().await;
+        let filter = filter.prepare();
+
+        if let Some(previous_values) = listeners.get_mut(&filter) {
+            previous_values.push(sender);
+        } else {
+            listeners.insert(filter, vec![sender]);
         }
+
+        receiver
     }
 
     /// The internal usage is to select unspent payments for each account to
@@ -48,9 +58,9 @@ where
     ///
     /// This function returns a vector of payments to be used as inputs and
     /// optionally a dependent transaction to be executed first. This
-    /// transaction is an internal transaction and it settles immediately. It is
-    /// used to split an existing payment into two payments, one to be used as
-    /// input and the other to be used as change. This is done to avoid locking
+    /// transaction is an internal transaction and it settles immediately. It
+    /// isutxo/src/storage/cache/mod.rs used to split an existing payment into two payments, one
+    /// to be used as input and the other to be used as change. This is done to avoid locking
     /// any change amount until the main transaction settles.
     async fn select_payments_from_accounts(
         &self,
@@ -172,6 +182,32 @@ where
         Ok((exchange_tx, payments))
     }
 
+    /// TODO: Move the whole logic of persisting the transaction and the revision into this layer,
+    /// instead of having them at the transaction layer
+    async fn persist(&self, mut transaction: Transaction) -> Result<Transaction, Error> {
+        transaction.persist(&self.config).await?;
+
+        let listeners = self.listeners.read().await;
+        for (filter, listeners) in listeners.iter() {
+            println!(
+                "filter {:#?} -> {:#?} -> {}",
+                filter,
+                transaction.revision.tags,
+                filter.matches(&transaction.transaction, &transaction.revision)
+            );
+            if filter.matches(&transaction.transaction, &transaction.revision) {
+                for listener in listeners.iter() {
+                    listener
+                        .send(transaction.clone())
+                        .await
+                        .expect("listener failed");
+                }
+            }
+        }
+
+        Ok(transaction)
+    }
+
     /// Creates a new transaction and returns it.
     ///
     /// The input is pretty simple, take this amounts from these given accounts
@@ -200,13 +236,11 @@ where
         to: Vec<(AccountId, Amount)>,
     ) -> Result<Transaction, Error> {
         let (change_transaction, payments) = self.select_payments_from_accounts(from).await?;
-        if let Some(mut change_tx) = change_transaction {
-            change_tx.persist(&self.config).await?;
+        if let Some(change_tx) = change_transaction {
+            self.persist(change_tx).await?;
         }
-        let mut transaction =
-            Transaction::new(reference, status, Type::Transaction, payments, to).await?;
-        transaction.persist(&self.config).await?;
-        Ok(transaction)
+        self.persist(Transaction::new(reference, status, Type::Transaction, payments, to).await?)
+            .await
     }
 
     /// Return the balances from a given account
@@ -232,10 +266,12 @@ where
         status: Status,
         reference: String,
     ) -> Result<Transaction, Error> {
-        let mut transaction =
-            Transaction::new_external_deposit(reference, status, vec![(account.clone(), amount)])?;
-        transaction.persist(&self.config).await?;
-        Ok(transaction)
+        self.persist(Transaction::new_external_deposit(
+            reference,
+            status,
+            vec![(account.clone(), amount)],
+        )?)
+        .await
     }
 
     /// Creates a new withdrawal transaction and returns it.
@@ -254,12 +290,13 @@ where
         let (change_transactions, payments) = self
             .select_payments_from_accounts(vec![(account.clone(), amount)])
             .await?;
-        for mut change_tx in change_transactions.into_iter() {
-            change_tx.persist(&self.config).await?;
+        for change_tx in change_transactions.into_iter() {
+            self.persist(change_tx).await?;
         }
-        let mut transaction = Transaction::new_external_withdrawal(reference, status, payments)?;
-        transaction.persist(&self.config).await?;
-        Ok(transaction)
+        self.persist(Transaction::new_external_withdrawal(
+            reference, status, payments,
+        )?)
+        .await
     }
 
     /// Returns the payment object by a given payment id
@@ -306,15 +343,16 @@ where
             limit: 1,
             ..Default::default()
         };
-        Ok(self
-            .config
-            .storage
-            .find(filter)
-            .await?
-            .pop()
-            .ok_or(Error::TxNotFound)?
-            .set_tags(&self.config, tags, reason)
-            .await?)
+        self.persist(
+            self.config
+                .storage
+                .find(filter)
+                .await?
+                .pop()
+                .ok_or(Error::TxNotFound)?
+                .set_tags(tags, reason)?,
+        )
+        .await
     }
 
     /// Attempts to change the status of a given transaction id. On success the

+ 1 - 1
utxo/src/storage/cache/mod.rs

@@ -149,7 +149,7 @@ mod test {
 
     storage_test_suite!();
 
-    pub async fn get_ledger_and_asset_manager() -> Ledger<Cache<SQLite>> {
+    pub async fn get_ledger_and_asset_manager() -> Arc<Ledger<Cache<SQLite>>> {
         let pool = SqlitePoolOptions::new()
             .max_connections(1)
             .idle_timeout(None)

+ 5 - 48
utxo/src/storage/cursor.rs

@@ -1,7 +1,7 @@
 //! Cursor implementation
 use crate::{AccountId, BaseTx, Filter, RevId, Revision, Tag, TxId, Type};
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 /// The primary filter is used to filter the transactions before applying the other filters. Think
 /// of it like which key is used to filter transactions quickly before applying the other filters.
 pub enum PrimaryFilter {
@@ -19,7 +19,7 @@ pub enum PrimaryFilter {
     Stream,
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug, PartialEq, Eq, Hash, Clone)]
 /// The cursor
 pub struct Cursor {
     /// The primary filter
@@ -44,59 +44,16 @@ impl From<Filter> for Cursor {
             PrimaryFilter::Stream
         };
 
-        filter.ids.sort();
-        filter.revisions.sort();
-        filter.accounts.sort();
-        filter.typ.sort();
-        filter.tags.sort();
-
         Self {
             primary_filter,
-            filter,
+            filter: filter.prepare(),
         }
     }
 }
 
 impl Cursor {
     /// Check if the base transaction and the revision matches the current cursor filter
-    pub fn matches(&self, base: &BaseTx, revision: &Revision) -> bool {
-        if !self.filter.ids.is_empty()
-            && self
-                .filter
-                .ids
-                .binary_search(&revision.transaction_id)
-                .is_err()
-        {
-            return false;
-        }
-
-        if !self.filter.revisions.is_empty()
-            && self
-                .filter
-                .revisions
-                .binary_search(&revision.rev_id().expect("vv"))
-                .is_err()
-        {
-            return false;
-        }
-
-        if !self.filter.typ.is_empty() && self.filter.typ.binary_search(&base.typ).is_err() {
-            return false;
-        }
-
-        if !self.filter.tags.is_empty() {
-            let mut found = false;
-            for tag in revision.tags.iter() {
-                if self.filter.tags.binary_search(tag).is_ok() {
-                    found = true;
-                    break;
-                }
-            }
-            if !found {
-                return false;
-            }
-        }
-
-        true
+    pub fn matches(&self, base_tx: &BaseTx, revision: &Revision) -> bool {
+        self.filter.matches(base_tx, revision)
     }
 }

+ 73 - 0
utxo/src/storage/mod.rs

@@ -353,6 +353,7 @@ pub mod test {
             $crate::storage_unit_test!(relate_account_to_transaction);
             $crate::storage_unit_test!(find_transactions_by_tags);
             $crate::storage_unit_test!(not_spendable_new_payments_not_spendable);
+            $crate::storage_unit_test!(subscribe_realtime);
         };
     }
 
@@ -595,6 +596,78 @@ pub mod test {
         }
     }
 
+    pub async fn subscribe_realtime<T>(storage: T)
+    where
+        T: Storage + Send + Sync,
+    {
+        let ledger = Ledger::new(Config {
+            storage,
+            status: Default::default(),
+        });
+
+        let mut subscription = ledger
+            .subscribe(Filter {
+                tags: vec!["even".parse().expect("valid tag")],
+                ..Default::default()
+            })
+            .await;
+
+        for i in 0..10 {
+            let usd: Asset = "USD/2".parse().expect("valid asset");
+            let account = format!("account-{}", i).parse().expect("valid account");
+
+            let deposit = ledger
+                .deposit(
+                    &account,
+                    usd.from_human(&format!("10{}.99", i))
+                        .expect("valid amount"),
+                    "settled".into(),
+                    format!("test deposit {}", i),
+                )
+                .await
+                .expect("valid deposit");
+
+            if i % 2 == 0 {
+                ledger
+                    .set_tags(
+                        deposit.revision_id,
+                        vec![
+                            "even".parse().expect("valid tag"),
+                            "even".parse().expect("valid tag"),
+                            "all".parse().expect("valid tag"),
+                        ],
+                        "add tags".to_owned(),
+                    )
+                    .await
+                    .expect("tag tx");
+            } else {
+                ledger
+                    .set_tags(
+                        deposit.revision_id,
+                        vec![
+                            "odd".parse().expect("valid tag"),
+                            "all".parse().expect("valid tag"),
+                        ],
+                        "add tags".to_owned(),
+                    )
+                    .await
+                    .expect("tag tx");
+            }
+        }
+
+        let expectactions = vec!["100.99", "102.99", "104.99", "106.99", "108.99", ""];
+
+        for expectation in expectactions {
+            assert_eq!(
+                expectation.to_string(),
+                subscription
+                    .try_recv()
+                    .map(|t| t.creates[0].amount.to_string())
+                    .unwrap_or_default()
+            )
+        }
+    }
+
     pub async fn find_transactions_by_tags<T>(storage: T)
     where
         T: Storage + Send + Sync,

+ 3 - 2
utxo/src/tests/mod.rs

@@ -3,9 +3,10 @@ use crate::{
     AccountId, Amount, Error, Ledger, RevId, Status,
 };
 use sqlx::sqlite::SqlitePoolOptions;
+use std::sync::Arc;
 
 #[allow(unused)]
-pub async fn get_file_asset_manager_and_ledger<'a>(name: &str) -> Ledger<SQLite> {
+pub async fn get_file_asset_manager_and_ledger<'a>(name: &str) -> Arc<Ledger<SQLite>> {
     let pool = SqlitePoolOptions::new()
         .max_connections(1)
         .idle_timeout(None)
@@ -19,7 +20,7 @@ pub async fn get_file_asset_manager_and_ledger<'a>(name: &str) -> Ledger<SQLite>
     Ledger::new(db.into())
 }
 
-pub async fn get_asset_manager_and_ledger() -> Ledger<SQLite> {
+pub async fn get_asset_manager_and_ledger() -> Arc<Ledger<SQLite>> {
     let pool = SqlitePoolOptions::new()
         .max_connections(1)
         .idle_timeout(None)

+ 3 - 13
utxo/src/transaction/mod.rs

@@ -172,15 +172,7 @@ impl Transaction {
     }
 
     /// Updates the transaction tags
-    pub async fn set_tags<S>(
-        self,
-        config: &Config<S>,
-        new_tags: Vec<Tag>,
-        reason: String,
-    ) -> Result<Self, Error>
-    where
-        S: Storage + Sync + Send,
-    {
+    pub fn set_tags(self, new_tags: Vec<Tag>, reason: String) -> Result<Self, Error> {
         let new_revision = Revision {
             transaction_id: self.revision.transaction_id,
             changelog: reason,
@@ -193,15 +185,13 @@ impl Transaction {
         let mut revisions = self.revisions;
         revisions.push(revision_id.clone());
 
-        let mut new_transaction = Transaction {
+        Ok(Transaction {
             id: self.id,
             revisions,
             revision_id,
             transaction: self.transaction,
             revision: new_revision,
-        };
-        new_transaction.persist(config).await?;
-        Ok(new_transaction)
+        })
     }
 
     /// Prepares a new revision to change the transaction status

+ 1 - 0
utxo/src/transaction/typ.rs

@@ -13,6 +13,7 @@ pub enum Error {
     PartialEq,
     PartialOrd,
     Ord,
+    Hash,
     Eq,
     Serialize,
     Deserialize,