瀏覽代碼

Introduce cursor

Cesar Rodas 10 月之前
父節點
當前提交
a73c4caa09
共有 5 個文件被更改,包括 210 次插入104 次删除
  1. 7 3
      utxo/src/filter.rs
  2. 102 0
      utxo/src/storage/cursor.rs
  3. 5 2
      utxo/src/storage/mod.rs
  4. 94 97
      utxo/src/storage/sqlite/mod.rs
  5. 2 2
      utxo/src/tests/tx.rs

+ 7 - 3
utxo/src/filter.rs

@@ -9,15 +9,19 @@ use serde::{Deserialize, Serialize};
 #[derive(Clone, Debug, Serialize, Deserialize, Default)]
 pub struct Filter {
     /// List of transaction IDs to query
-    #[serde(default, skip_serializing_if = "Vec::is_empty")]
+    #[serde(skip_serializing_if = "Vec::is_empty")]
     pub ids: Vec<TxId>,
     /// List of revisions to query
+    #[serde(skip_serializing_if = "Vec::is_empty")]
     pub revisions: Vec<RevId>,
     /// List of accounts to query their transactions
+    #[serde(skip_serializing_if = "Vec::is_empty")]
     pub accounts: Vec<AccountId>,
     /// List of transaction types-kind
-    pub kind: Vec<Type>,
+    #[serde(rename = "type", skip_serializing_if = "Vec::is_empty")]
+    pub typ: Vec<Type>,
     /// List of transactions by tags
+    #[serde(skip_serializing_if = "Vec::is_empty")]
     pub tags: Vec<Tag>,
     /// List transactions newer than this timestamp
     #[serde(
@@ -44,7 +48,7 @@ pub struct Filter {
 impl Filter {
     /// Adds a given kind to the filter
     pub fn kind(mut self, kind: Type) -> Self {
-        self.kind.push(kind);
+        self.typ.push(kind);
         self
     }
 

+ 102 - 0
utxo/src/storage/cursor.rs

@@ -0,0 +1,102 @@
+//! Cursor implementation
+use crate::{AccountId, BaseTx, Filter, RevId, Revision, Tag, TxId, Type};
+
+#[derive(Debug, Clone)]
+/// The primary filter is used to filter the transactions before applying the other filters. Think
+/// of it like which key is used to filter transactions quickly before applying the other filters.
+pub enum PrimaryFilter {
+    /// By transaction ID
+    Id(Vec<TxId>),
+    /// By revision ID
+    Revision(Vec<RevId>),
+    /// By accounts
+    Account(Vec<AccountId>),
+    /// By transaction type
+    Type(Vec<Type>),
+    /// By tags
+    Tags(Vec<Tag>),
+    /// By transaction status
+    Stream,
+}
+
+#[derive(Debug, Clone)]
+/// The cursor
+pub struct Cursor {
+    /// The primary filter
+    pub primary_filter: PrimaryFilter,
+    /// The secondary filter
+    pub filter: Filter,
+}
+
+impl From<Filter> for Cursor {
+    fn from(mut filter: Filter) -> Self {
+        let primary_filter = if !filter.revisions.is_empty() {
+            PrimaryFilter::Revision(std::mem::take(&mut filter.revisions))
+        } else if !filter.ids.is_empty() {
+            PrimaryFilter::Id(std::mem::take(&mut filter.ids))
+        } else if !filter.accounts.is_empty() {
+            PrimaryFilter::Account(std::mem::take(&mut filter.accounts))
+        } else if !filter.typ.is_empty() {
+            PrimaryFilter::Type(std::mem::take(&mut filter.typ))
+        } else if !filter.tags.is_empty() {
+            PrimaryFilter::Tags(std::mem::take(&mut filter.tags))
+        } else {
+            PrimaryFilter::Stream
+        };
+
+        filter.ids.sort();
+        filter.revisions.sort();
+        filter.accounts.sort();
+        filter.typ.sort();
+        filter.tags.sort();
+
+        Self {
+            primary_filter,
+            filter,
+        }
+    }
+}
+
+impl Cursor {
+    /// Check if the base transaction and the revision matches the current cursor filter
+    pub fn matches(&self, base: &BaseTx, revision: &Revision) -> bool {
+        if !self.filter.ids.is_empty()
+            && self
+                .filter
+                .ids
+                .binary_search(&revision.transaction_id)
+                .is_err()
+        {
+            return false;
+        }
+
+        if !self.filter.revisions.is_empty()
+            && self
+                .filter
+                .revisions
+                .binary_search(&revision.rev_id().expect("vv"))
+                .is_err()
+        {
+            return false;
+        }
+
+        if !self.filter.typ.is_empty() && self.filter.typ.binary_search(&base.typ).is_err() {
+            return false;
+        }
+
+        if !self.filter.tags.is_empty() {
+            let mut found = false;
+            for tag in revision.tags.iter() {
+                if self.filter.tags.binary_search(&tag).is_ok() {
+                    found = true;
+                    break;
+                }
+            }
+            if !found {
+                return false;
+            }
+        }
+
+        true
+    }
+}

+ 5 - 2
utxo/src/storage/mod.rs

@@ -7,9 +7,12 @@ use crate::{
 use serde::Serialize;
 
 pub mod cache;
+mod cursor;
+
 #[cfg(any(feature = "sqlite", test))]
 pub mod sqlite;
 pub use self::cache::Cache;
+pub use self::cursor::{Cursor, PrimaryFilter};
 #[cfg(any(feature = "sqlite", test))]
 pub use self::sqlite::SQLite;
 
@@ -812,7 +815,7 @@ pub mod test {
         batch.commit().await.expect("valid commit");
 
         let filter = Filter {
-            kind: vec![Type::Deposit],
+            typ: vec![Type::Deposit],
             ..Default::default()
         };
 
@@ -845,7 +848,7 @@ pub mod test {
                 0,
                 storage
                     .find(Filter {
-                        kind: vec![Type::Withdrawal],
+                        typ: vec![Type::Withdrawal],
                         accounts: vec![account],
                         ..Default::default()
                     })

+ 94 - 97
utxo/src/storage/sqlite/mod.rs

@@ -1,8 +1,9 @@
 //! SQLite storage layer for Verax
+use super::{Cursor, PrimaryFilter, ReceivedPaymentStatus};
 use crate::{
     amount::AmountCents,
     storage::{Error, Storage},
-    transaction::{Revision, Type},
+    transaction::Revision,
     AccountId, Amount, Asset, BaseTx, Filter, PaymentFrom, PaymentId, RevId, TxId,
 };
 use borsh::from_slice;
@@ -15,28 +16,11 @@ mod batch;
 pub use batch::Batch;
 pub use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
 
-use super::ReceivedPaymentStatus;
-
 /// SQLite storage layer for Verax
 pub struct SQLite {
     db: sqlx::SqlitePool,
 }
 
-#[derive(Debug, Clone)]
-enum PrimaryFilter {
-    Id(Vec<TxId>),
-    Revision(Vec<RevId>),
-    Account(Vec<AccountId>),
-    Type(Vec<Type>),
-    Stream,
-}
-
-#[derive(Debug, Clone)]
-struct Cursor {
-    primary_filter: PrimaryFilter,
-    filter: Filter,
-}
-
 /// Receives a cursor and does the initial loading of transactions that may match the requested
 /// filter.
 ///
@@ -62,6 +46,40 @@ where
     };
 
     let rows = match &cursor.primary_filter {
+        PrimaryFilter::Id(ids) => {
+            let sql = format!(
+                r#"
+                    SELECT
+                        "bt"."blob",
+                        "b"."blob"
+                    FROM
+                        "transactions" as "t",
+                        "base_transactions" as "bt",
+                        "revisions" as "b"
+                    WHERE
+                        "t"."transaction_id" IN ({})
+                        AND "t"."revision_id" = "b"."revision_id"
+                        AND "t"."transaction_id" = "bt"."transaction_id"
+                        {} {}
+                    ORDER BY "t"."created_at" DESC
+                    LIMIT {} OFFSET {}
+                    "#,
+                "?,".repeat(ids.len()).trim_end_matches(","),
+                since,
+                until,
+                limit,
+                cursor.filter.skip
+            );
+            let mut query = sqlx::query(&sql);
+            for id in ids.iter() {
+                query = query.bind(id.to_string());
+            }
+            query
+                .bind(limit as i64)
+                .bind(cursor.filter.skip as i64)
+                .fetch_all(executor)
+                .await
+        }
         PrimaryFilter::Revision(rev_ids) => {
             let sql = format!(
                 r#"
@@ -90,73 +108,105 @@ where
             }
             query.fetch_all(executor).await
         }
-        PrimaryFilter::Id(ids) => {
+        PrimaryFilter::Account(account_ids) => {
             let sql = format!(
                 r#"
                     SELECT
                         "bt"."blob",
                         "b"."blob"
                     FROM
+                        "transaction_accounts" as "ta",
                         "transactions" as "t",
                         "base_transactions" as "bt",
                         "revisions" as "b"
                     WHERE
-                        "t"."transaction_id" IN ({})
+                        "ta"."account_id" IN ({})
+                        AND "t"."transaction_id" = "ta"."transaction_id"
                         AND "t"."revision_id" = "b"."revision_id"
                         AND "t"."transaction_id" = "bt"."transaction_id"
                         {} {}
-                    ORDER BY "t"."created_at" DESC
+                    ORDER BY "ta"."created_at" DESC
                     LIMIT {} OFFSET {}
                     "#,
-                "?,".repeat(ids.len()).trim_end_matches(","),
+                "?,".repeat(account_ids.len()).trim_end_matches(","),
                 since,
                 until,
                 limit,
                 cursor.filter.skip
             );
             let mut query = sqlx::query(&sql);
-            for id in ids.iter() {
+            for id in account_ids.iter() {
                 query = query.bind(id.to_string());
             }
-            query
-                .bind(limit as i64)
-                .bind(cursor.filter.skip as i64)
-                .fetch_all(executor)
-                .await
+            query.fetch_all(executor).await
         }
-        PrimaryFilter::Account(account_ids) => {
+        PrimaryFilter::Type(types) => {
             let sql = format!(
                 r#"
                     SELECT
                         "bt"."blob",
                         "b"."blob"
                     FROM
-                        "transaction_accounts" as "ta",
                         "transactions" as "t",
                         "base_transactions" as "bt",
                         "revisions" as "b"
                     WHERE
-                        "ta"."account_id" IN ({})
-                        AND "t"."transaction_id" = "ta"."transaction_id"
+                        "bt"."type" IN ({})
                         AND "t"."revision_id" = "b"."revision_id"
                         AND "t"."transaction_id" = "bt"."transaction_id"
                         {} {}
-                    ORDER BY "ta"."created_at" DESC
+                    ORDER BY "bt"."created_at" DESC
                     LIMIT {} OFFSET {}
                     "#,
-                "?,".repeat(account_ids.len()).trim_end_matches(","),
+                "?,".repeat(types.len()).trim_end_matches(","),
                 since,
                 until,
                 limit,
                 cursor.filter.skip
             );
             let mut query = sqlx::query(&sql);
-            for id in account_ids.iter() {
+            for id in types.iter() {
                 query = query.bind(id.to_string());
             }
             query.fetch_all(executor).await
         }
-        _ => todo!(),
+        PrimaryFilter::Tags(tags) => {
+            let sql = format!(
+                r#"
+                SELECT "bt"."blob", "b"."blob"
+                FROM "transactions" as "t"
+                JOIN "base_transactions" as "bt" ON "t"."transaction_id" = "bt"."transaction_id"
+                JOIN "revisions" as "b" ON "t"."revision_id" = "b"."revision_id"
+                JOIN "transaction_tags" as "tt" ON "t"."transaction_id" = "tt"."transaction_id"
+                WHERE "tt"."tag" IN (?)
+                {} {}
+                LIMIT {} OFFSET {}
+                "#,
+                since, until, limit, cursor.filter.skip
+            );
+            todo!()
+        }
+        PrimaryFilter::Stream => {
+            let sql = format!(
+                r#"
+                    SELECT
+                        "bt"."blob",
+                        "b"."blob"
+                    FROM
+                        "transactions" as "t",
+                        "base_transactions" as "bt",
+                        "revisions" as "b"
+                    WHERE
+                        "t"."revision_id" = "b"."revision_id"
+                        AND "t"."transaction_id" = "bt"."transaction_id"
+                        {} {}
+                    ORDER BY "t"."created_at" DESC
+                    LIMIT {} OFFSET {}
+                    "#,
+                since, until, limit, cursor.filter.skip
+            );
+            sqlx::query(&sql).fetch_all(executor).await
+        }
     };
 
     let results = rows
@@ -495,7 +545,7 @@ impl Storage for SQLite {
 
     async fn find_base_tx_and_revision(
         &self,
-        mut filter: Filter,
+        filter: Filter,
     ) -> Result<Vec<(BaseTx, Revision)>, Error> {
         let mut conn = self
             .db
@@ -503,28 +553,7 @@ impl Storage for SQLite {
             .await
             .map_err(|e| Error::Storage(e.to_string()))?;
 
-        let primary_filter = if !filter.ids.is_empty() {
-            PrimaryFilter::Id(std::mem::take(&mut filter.ids))
-        } else if !filter.revisions.is_empty() {
-            PrimaryFilter::Revision(std::mem::take(&mut filter.revisions))
-        } else if !filter.accounts.is_empty() {
-            PrimaryFilter::Account(std::mem::take(&mut filter.accounts))
-        } else if !filter.kind.is_empty() {
-            PrimaryFilter::Type(std::mem::take(&mut filter.kind))
-        } else {
-            PrimaryFilter::Stream
-        };
-
-        filter.ids.sort();
-        filter.revisions.sort();
-        filter.accounts.sort();
-        filter.kind.sort();
-        filter.tags.sort();
-
-        let mut cursor = Cursor {
-            primary_filter,
-            filter,
-        };
+        let mut cursor: Cursor = filter.into();
 
         let candidate_limits = 20;
         let mut results = vec![];
@@ -534,44 +563,12 @@ impl Storage for SQLite {
             // if there are fewer candidates than requested it means there are no more resultset
             let no_more_candidates = candidates.len() != candidate_limits;
 
-            for candidate in candidates.into_iter() {
-                if !cursor.filter.ids.is_empty()
-                    && cursor
-                        .filter
-                        .ids
-                        .binary_search(&candidate.1.transaction_id)
-                        .is_err()
-                {
-                    continue;
-                }
-                if !cursor.filter.revisions.is_empty()
-                    && cursor
-                        .filter
-                        .revisions
-                        .binary_search(&candidate.1.rev_id().expect("vv"))
-                        .is_err()
-                {
-                    continue;
-                }
-                if !cursor.filter.kind.is_empty()
-                    && cursor.filter.kind.binary_search(&candidate.0.typ).is_err()
-                {
-                    continue;
-                }
-                if !cursor.filter.tags.is_empty() {
-                    let mut found = false;
-                    for tag in candidate.1.tags.iter() {
-                        if cursor.filter.tags.binary_search(&tag).is_ok() {
-                            found = true;
-                            break;
-                        }
-                    }
-                    if !found {
-                        continue;
-                    }
-                }
-                results.push(candidate);
-            }
+            let mut iteration_result = candidates
+                .into_iter()
+                .filter(|(base, revision)| cursor.matches(base, revision))
+                .collect::<Vec<_>>();
+
+            results.append(&mut iteration_result);
 
             if no_more_candidates
                 || (cursor.filter.limit != 0 && cursor.filter.limit >= results.len())

+ 2 - 2
utxo/src/tests/tx.rs

@@ -41,7 +41,7 @@ async fn multi_account_transfers() {
 
     let filter = Filter {
         accounts: vec![accounts[0].clone()],
-        kind: vec![Type::Exchange],
+        typ: vec![Type::Exchange],
         ..Default::default()
     };
 
@@ -52,7 +52,7 @@ async fn multi_account_transfers() {
     for _ in &accounts {
         let filter = Filter {
             accounts: vec![accounts[0].clone()],
-            kind: vec![Type::Exchange],
+            typ: vec![Type::Exchange],
             ..Default::default()
         };
         let txs = ledger.get_transactions(filter).await.expect("valid");