Эх сурвалжийг харах

Working on a universal filter

Cesar Rodas 11 сар өмнө
parent
commit
54d7d01787

+ 2 - 0
utxo/src/lib.rs

@@ -27,6 +27,7 @@ mod amount;
 mod asset;
 mod config;
 mod error;
+mod filter;
 mod id;
 mod ledger;
 mod payment;
@@ -44,6 +45,7 @@ pub use self::{
     amount::{Amount, AnyAmount, HumanAmount},
     asset::Asset,
     error::Error,
+    filter::Filter,
     id::*,
     ledger::Ledger,
     payment::PaymentFrom,

+ 9 - 1
utxo/src/storage/cache/mod.rs

@@ -2,7 +2,8 @@
 use crate::{
     amount::AmountCents,
     storage::{self, Error, FilterId},
-    AccountId, Amount, Asset, BaseTx, PaymentFrom, PaymentId, RevId, Revision, Tag, TxId, Type,
+    AccountId, Amount, Asset, BaseTx, PaymentFrom, PaymentId, RevId, Revision, Tag, Transaction,
+    TxId, Type,
 };
 use std::{collections::HashMap, sync::Arc};
 use tokio::sync::RwLock;
@@ -148,6 +149,13 @@ where
         }
     }
 
+    async fn find_base_tx_and_revision(
+        &self,
+        mut filter: Filter,
+    ) -> Result<Vec<(BaseTx, Revision)>, Error> {
+        self.inner.find_base_tx_and_revision(filter).await
+    }
+
     async fn get_negative_unspent_payments(
         &self,
         account: &AccountId,

+ 11 - 36
utxo/src/storage/mod.rs

@@ -1,7 +1,7 @@
 //! Storage layer trait
 use crate::{
     amount::AmountCents, payment::PaymentTo, transaction::Type, AccountId, Amount, Asset, BaseTx,
-    PaymentFrom, PaymentId, RevId, Revision, Tag, Transaction, TxId,
+    Filter, PaymentFrom, PaymentId, RevId, Revision, Tag, Transaction, TxId,
 };
 //use chrono::{DateTime, Utc};
 use serde::Serialize;
@@ -323,16 +323,11 @@ pub trait Storage {
         Ok(payments)
     }
 
-    /*
-    ///
-    async fn get_revision(&self, revision_id: &TxId) -> Result<Transaction, Error>;
-
-    ///
-    async fn get_all_revisions(
+    /// Find transactions with a given filter
+    async fn find_base_tx_and_revision(
         &self,
-        transaction_id: &TxId,
-    ) -> Result<Vec<(TxId, String, DateTime<Utc>)>, Error>;
-    */
+        mut filter: Filter,
+    ) -> Result<Vec<(BaseTx, Revision)>, Error>;
 
     /// Returns a transaction by id, the transaction is returned as a tuple of base transaction and
     /// the current revision
@@ -355,31 +350,10 @@ pub trait Storage {
     ) -> Result<Vec<(BaseTx, Revision)>, Error>;
 
     /// Returns a revision with a transaction object by id
-    async fn get_transaction(&self, id: FilterId) -> Result<Transaction, Error> {
-        let (base_tx, revision) = self.get_transaction_and_revision(id).await?;
-        (
-            self.get_revisions(&revision.transaction_id).await?,
-            base_tx,
-            revision,
-        )
-            .try_into()
-            .map_err(|e: crate::transaction::Error| Error::Encoding(e.to_string()))
-    }
-
-    /// Get list of transactions
-    async fn get_transactions(
-        &self,
-        account: &AccountId,
-        types: &[Type],
-        tags: &[Tag],
-    ) -> Result<Vec<Transaction>, Error> {
-        let mut transactions = Vec::new();
-        for (base_tx, revision) in self
-            .get_transactions_with_latest_revision(account, types, tags)
-            .await?
-            .into_iter()
-        {
-            transactions.push(
+    async fn find(&self, filter: Filter) -> Result<Vec<Transaction>, Error> {
+        let mut results = vec![];
+        for (base_tx, revision) in self.find_base_tx_and_revision(filter).await?.into_iter() {
+            results.push(
                 (
                     self.get_revisions(&revision.transaction_id).await?,
                     base_tx,
@@ -389,7 +363,8 @@ pub trait Storage {
                     .map_err(|e: crate::transaction::Error| Error::Encoding(e.to_string()))?,
             );
         }
-        Ok(transactions)
+
+        Ok(results)
     }
 }
 

+ 212 - 1
utxo/src/storage/sqlite/mod.rs

@@ -3,7 +3,8 @@ use crate::{
     amount::AmountCents,
     storage::{Error, FilterId, Storage},
     transaction::{Revision, Type},
-    AccountId, Amount, Asset, BaseTx, PaymentFrom, PaymentId, RevId, Tag, TxId,
+    AccountId, Amount, Asset, BaseTx, Filter, PaymentFrom, PaymentId, RevId, Tag, Transaction,
+    TxId,
 };
 use borsh::from_slice;
 use futures::TryStreamExt;
@@ -22,6 +23,135 @@ pub struct SQLite {
     db: sqlx::SqlitePool,
 }
 
+enum PrimaryFilter {
+    Id(Vec<TxId>),
+    Revision(Vec<RevId>),
+    Account(Vec<AccountId>),
+    Type(Vec<Type>),
+    Stream,
+}
+
+struct Cursor {
+    primary_filter: PrimaryFilter,
+    filter: Filter,
+    skip: usize,
+}
+
+/// Receives a cursor and does the initial loading of transactions that may match the requested
+/// filter.
+///
+/// The function will load the full transaction of candidates and return them.
+async fn find_candidates<'e, 'c: 'e, E>(
+    executor: E,
+    cursor: &mut Cursor,
+    limit: usize,
+) -> Result<Vec<(BaseTx, Revision)>, Error>
+where
+    E: sqlx::Executor<'c, Database = sqlx::Sqlite>,
+{
+    let since = if let Some(since) = cursor.filter.since {
+        format!("AND created_at <= {}", since.timestamp())
+    } else {
+        "".to_owned()
+    };
+    let until = if let Some(until) = cursor.filter.until {
+        format!("AND created_at >= {}", until.timestamp())
+    } else {
+        "".to_owned()
+    };
+
+    let rows = match &cursor.primary_filter {
+        PrimaryFilter::Id(ids) => {
+            let sql = format!(
+                r#"
+                    SELECT
+                        "bt"."blob",
+                        "b"."blob"
+                    FROM
+                        "transactions" as "t",
+                        "base_transactions" as "bt",
+                        "revisions" as "b"
+                    WHERE
+                        "t"."transaction_id" IN ({})
+                        AND "t"."revision_id" = "b"."revision_id"
+                        AND "t"."transaction_id" = "bt"."transaction_id"
+                        {} {}
+                    ORDER BY "t"."created_at" DESC
+                    LIMIT ? OFFSET ?
+                    "#,
+                "?,".repeat(ids.len()).trim_end_matches(","),
+                since,
+                until
+            );
+            let mut query = sqlx::query(&sql);
+            for id in ids.iter() {
+                query = query.bind(id.to_string());
+            }
+            query
+                .bind(limit as i64)
+                .bind(cursor.skip as i64)
+                .fetch_all(executor)
+                .await
+        }
+        PrimaryFilter::Revision(rev_ids) => {
+            let sql = format!(
+                r#"
+                    SELECT
+                        "bt"."blob",
+                        "b"."blob"
+                    FROM
+                        "transactions" as "t",
+                        "base_transactions" as "bt",
+                        "revisions" as "b"
+                    WHERE
+                        "b"."revision_id" IN ({})
+                        AND "t"."revision_id" = "b"."revision_id"
+                        AND "t"."transaction_id" = "bt"."transaction_id"
+                        {} {}
+                    ORDER BY "t"."created_at" DESC
+                    LIMIT ? OFFSET ?
+                    "#,
+                "?,".repeat(rev_ids.len()).trim_end_matches(","),
+                since,
+                until
+            );
+            let mut query = sqlx::query(&sql);
+            for id in rev_ids.iter() {
+                query = query.bind(id.to_string());
+            }
+            query
+                .bind(limit as i64)
+                .bind(cursor.skip as i64)
+                .fetch_all(executor)
+                .await
+        }
+        _ => todo!(),
+    };
+
+    let results = rows
+        .map_err(|e| Error::Storage(e.to_string()))?
+        .into_iter()
+        .map(|row| {
+            let base_tx = row
+                .try_get::<Vec<u8>, usize>(0)
+                .map_err(|e| Error::Storage(e.to_string()))?;
+
+            let revision = row
+                .try_get::<Vec<u8>, usize>(1)
+                .map_err(|e| Error::Storage(e.to_string()))?;
+
+            Ok((
+                from_slice::<BaseTx>(&base_tx)?,
+                from_slice::<Revision>(&revision)?,
+            ))
+        })
+        .collect::<Result<Vec<_>, Error>>()?;
+
+    cursor.skip += results.len();
+
+    Ok(results)
+}
+
 impl SQLite {
     /// Creates a new Verax SQLite storage layer. It takes an instance of the asset_manager
     pub fn new(db: sqlx::SqlitePool) -> Self {
@@ -391,6 +521,87 @@ impl Storage for SQLite {
         ))
     }
 
+    async fn find_base_tx_and_revision(
+        &self,
+        mut filter: Filter,
+    ) -> Result<Vec<(BaseTx, Revision)>, Error> {
+        let mut conn = self
+            .db
+            .acquire()
+            .await
+            .map_err(|e| Error::Storage(e.to_string()))?;
+
+        let primary_filter = if !filter.ids.is_empty() {
+            PrimaryFilter::Id(std::mem::take(&mut filter.ids))
+        } else if !filter.revisions.is_empty() {
+            PrimaryFilter::Revision(std::mem::take(&mut filter.revisions))
+        } else if !filter.accounts.is_empty() {
+            PrimaryFilter::Account(std::mem::take(&mut filter.accounts))
+        } else if !filter.kind.is_empty() {
+            PrimaryFilter::Type(std::mem::take(&mut filter.kind))
+        } else {
+            PrimaryFilter::Stream
+        };
+
+        filter.ids.sort();
+        filter.revisions.sort();
+        filter.accounts.sort();
+        filter.kind.sort();
+        filter.tags.sort();
+
+        let mut cursor = Cursor {
+            primary_filter,
+            filter,
+            skip: 0,
+        };
+
+        let candidate_limits = 20;
+        let mut results = vec![];
+
+        loop {
+            let candidates = find_candidates(&mut *conn, &mut cursor, candidate_limits).await?;
+            // if there are fewer candidates than requested it means there are no more resultset
+            let no_more_candidates = candidates.len() != candidate_limits;
+
+            for candidate in candidates.into_iter() {
+                if !cursor.filter.ids.is_empty()
+                    && cursor
+                        .filter
+                        .ids
+                        .binary_search(&candidate.1.transaction_id)
+                        .is_err()
+                {
+                    continue;
+                }
+                if !cursor.filter.tags.is_empty() {
+                    let mut found = false;
+                    for tag in candidate.1.tags.iter() {
+                        if cursor.filter.tags.binary_search(&tag).is_ok() {
+                            found = true;
+                            break;
+                        }
+                    }
+                    if !found {
+                        continue;
+                    }
+                }
+                results.push(candidate);
+            }
+
+            if no_more_candidates
+                || (cursor.filter.limit != 0 && cursor.filter.limit >= results.len())
+            {
+                break;
+            }
+        }
+
+        Ok(if cursor.filter.limit != 0 {
+            results.split_off(cursor.filter.limit)
+        } else {
+            results
+        })
+    }
+
     async fn get_transactions_with_latest_revision(
         &self,
         account: &AccountId,

+ 3 - 0
utxo/src/transaction/typ.rs

@@ -11,6 +11,9 @@ pub enum Error {
     Clone,
     Copy,
     PartialEq,
+    PartialOrd,
+    Ord,
+    Eq,
     Serialize,
     Deserialize,
     borsh::BorshDeserialize,