Cesar Rodas 10 сар өмнө
parent
commit
4016361249

+ 9 - 2
utxo/src/storage/cache/batch.rs

@@ -170,9 +170,16 @@ where
             .await
     }
 
-    async fn tag_transaction(&mut self, transaction_id: &TxId, tags: &[Tag]) -> Result<(), Error> {
+    async fn tag_transaction(
+        &mut self,
+        transaction_id: &TxId,
+        base_tx: &BaseTx,
+        tags: &[Tag],
+    ) -> Result<(), Error> {
         self.to_invalidate
             .insert(Ids::Transaction(transaction_id.clone()), ());
-        self.inner.tag_transaction(transaction_id, tags).await
+        self.inner
+            .tag_transaction(transaction_id, base_tx, tags)
+            .await
     }
 }

+ 2 - 2
utxo/src/storage/cursor.rs

@@ -6,7 +6,7 @@ use crate::{AccountId, BaseTx, Filter, RevId, Revision, Tag, TxId, Type};
 /// of it like which key is used to filter transactions quickly before applying the other filters.
 pub enum PrimaryFilter {
     /// By transaction ID
-    Id(Vec<TxId>),
+    TxId(Vec<TxId>),
     /// By revision ID
     Revision(Vec<RevId>),
     /// By accounts
@@ -33,7 +33,7 @@ impl From<Filter> for Cursor {
         let primary_filter = if !filter.revisions.is_empty() {
             PrimaryFilter::Revision(std::mem::take(&mut filter.revisions))
         } else if !filter.ids.is_empty() {
-            PrimaryFilter::Id(std::mem::take(&mut filter.ids))
+            PrimaryFilter::TxId(std::mem::take(&mut filter.ids))
         } else if !filter.accounts.is_empty() {
             PrimaryFilter::Account(std::mem::take(&mut filter.accounts))
         } else if !filter.typ.is_empty() {

+ 96 - 2
utxo/src/storage/mod.rs

@@ -193,7 +193,12 @@ pub trait Batch<'a> {
 
     /// Sets the tags for a given transaction. Any tag not included in this
     /// vector should be removed
-    async fn tag_transaction(&mut self, transaction_id: &TxId, tags: &[Tag]) -> Result<(), Error>;
+    async fn tag_transaction(
+        &mut self,
+        transaction_id: &TxId,
+        base_tx: &BaseTx,
+        tags: &[Tag],
+    ) -> Result<(), Error>;
 
     /// Creates a relationship between an account and a transaction.
     ///
@@ -317,7 +322,7 @@ pub mod test {
     use std::collections::HashMap;
 
     use super::*;
-    use crate::{config::Config, status::StatusManager, Transaction};
+    use crate::{config::Config, status::StatusManager, Ledger, Transaction};
     use rand::Rng;
 
     #[macro_export]
@@ -342,6 +347,7 @@ pub mod test {
             $crate::storage_unit_test!(does_not_spend_unspendable_payments);
             $crate::storage_unit_test!(spend_spendable_payments);
             $crate::storage_unit_test!(relate_account_to_transaction);
+            $crate::storage_unit_test!(find_transactions_by_tags);
             $crate::storage_unit_test!(not_spendable_new_payments_not_spendable);
         };
     }
@@ -585,6 +591,94 @@ pub mod test {
         }
     }
 
+    pub async fn find_transactions_by_tags<T>(storage: T)
+    where
+        T: Storage + Send + Sync,
+    {
+        let ledger = Ledger::new(Config {
+            storage,
+            status: Default::default(),
+        });
+
+        for i in 0..10 {
+            let usd: Asset = "USD/2".parse().expect("valid asset");
+            let account = format!("account-{}", i).parse().expect("valid account");
+
+            let deposit = ledger
+                .deposit(
+                    &account,
+                    usd.from_human("100.99").expect("valid amount"),
+                    "settled".into(),
+                    format!("test deposit {}", i),
+                )
+                .await
+                .expect("valid deposit");
+
+            if i % 2 == 0 {
+                ledger
+                    .set_tags(
+                        deposit.revision_id,
+                        vec![
+                            "even".parse().expect("valid tag"),
+                            "even".parse().expect("valid tag"),
+                            "all".parse().expect("valid tag"),
+                        ],
+                        "add tags".to_owned(),
+                    )
+                    .await
+                    .expect("tag tx");
+            } else {
+                ledger
+                    .set_tags(
+                        deposit.revision_id,
+                        vec![
+                            "odd".parse().expect("valid tag"),
+                            "all".parse().expect("valid tag"),
+                        ],
+                        "add tags".to_owned(),
+                    )
+                    .await
+                    .expect("tag tx");
+            }
+        }
+
+        assert_eq!(
+            5,
+            ledger
+                .get_transactions(Filter {
+                    tags: vec!["even".parse().expect("valid tag")],
+                    ..Default::default()
+                })
+                .await
+                .expect("valid filter")
+                .len()
+        );
+
+        assert_eq!(
+            5,
+            ledger
+                .get_transactions(Filter {
+                    tags: vec!["odd".parse().expect("valid tag")],
+                    ..Default::default()
+                })
+                .await
+                .expect("valid filter")
+                .len()
+        );
+
+        assert_eq!(
+            10,
+            ledger
+                .get_transactions(Filter {
+                    tags: vec!["all".parse().expect("valid tag")],
+                    ..Default::default()
+                })
+                .await
+                .expect("valid filter")
+                .len()
+        );
+    }
+
     pub async fn spend_spendable_payments<T>(storage: T)
     where
         T: Storage + Send + Sync,

+ 10 - 2
utxo/src/storage/sqlite/batch.rs

@@ -307,7 +307,12 @@ impl<'a> storage::Batch<'a> for Batch<'a> {
         Ok(())
     }
 
-    async fn tag_transaction(&mut self, transaction_id: &TxId, tags: &[Tag]) -> Result<(), Error> {
+    async fn tag_transaction(
+        &mut self,
+        transaction_id: &TxId,
+        base_tx: &BaseTx,
+        tags: &[Tag],
+    ) -> Result<(), Error> {
         sqlx::query(r#"DELETE FROM "transactions_by_tags" WHERE "transaction_id" = ? "#)
             .bind(transaction_id.to_string())
             .execute(&mut *self.inner)
@@ -315,9 +320,12 @@ impl<'a> storage::Batch<'a> for Batch<'a> {
             .map_err(|e| Error::Storage(e.to_string()))?;
 
         for tag in tags {
-            sqlx::query(r#"INSERT INTO "transactions_by_tags" VALUES(?, ?)"#)
+            sqlx::query(r#"INSERT INTO "transactions_by_tags"("transaction_id", "tag", "created_at") VALUES(?, ?, ?)
+                ON CONFLICT("transaction_id", "tag", "created_at") DO NOTHING
+                "#)
                 .bind(transaction_id.to_string())
                 .bind(tag.as_str())
+                .bind(base_tx.created_at.timestamp() as i64)
                 .execute(&mut *self.inner)
                 .await
                 .map_err(|e| Error::Storage(e.to_string()))?;

+ 30 - 13
utxo/src/storage/sqlite/mod.rs

@@ -46,7 +46,7 @@ where
     };
 
     let rows = match &cursor.primary_filter {
-        PrimaryFilter::Id(ids) => {
+        PrimaryFilter::TxId(ids) => {
             let sql = format!(
                 r#"
                     SELECT
@@ -173,18 +173,33 @@ where
         PrimaryFilter::Tags(tags) => {
             let sql = format!(
                 r#"
-                SELECT "bt"."blob", "b"."blob"
-                FROM "transactions" as "t"
-                JOIN "base_transactions" as "bt" ON "t"."transaction_id" = "bt"."transaction_id"
-                JOIN "revisions" as "b" ON "t"."revision_id" = "b"."revision_id"
-                JOIN "transaction_tags" as "tt" ON "t"."transaction_id" = "tt"."transaction_id"
-                WHERE "tt"."tag" IN (?)
-                {} {}
-                LIMIT {} OFFSET {}
-                "#,
-                since, until, limit, cursor.filter.skip
+                    SELECT
+                        "bt"."blob",
+                        "b"."blob"
+                    FROM
+                        "transactions" as "t",
+                        "base_transactions" as "bt",
+                        "revisions" as "b"
+                    WHERE
+                        "t"."transaction_id" IN (SELECT DISTINCT "transaction_id" FROM "transactions_by_tags" WHERE "tag" IN ({}) ORDER BY "created_at" DESC)
+                        AND "t"."revision_id" = "b"."revision_id"
+                        AND "t"."transaction_id" = "bt"."transaction_id"
+                        {} {}
+                    ORDER BY "t"."created_at" DESC
+                    LIMIT {} OFFSET {}
+                    "#,
+                "?,".repeat(tags.len()).trim_end_matches(","),
+                since,
+                until,
+                limit,
+                cursor.filter.skip
             );
-            todo!()
+
+            let mut query = sqlx::query(&sql);
+            for id in tags.iter() {
+                query = query.bind(id.to_string());
+            }
+            query.fetch_all(executor).await
         }
         PrimaryFilter::Stream => {
             let sql = format!(
@@ -273,8 +288,10 @@ impl SQLite {
         CREATE TABLE IF NOT EXISTS "transactions_by_tags" (
             "transaction_id" VARCHAR(67),
             "tag" VARCHAR(250) NOT NULL,
-            PRIMARY KEY ("transaction_id", "tag")
+            "created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
+            PRIMARY KEY ("tag", "created_at", "transaction_id")
         );
+        CREATE INDEX IF NOT EXISTS "transaction_id_and_tags" ON "transactions_by_tags" ("transaction_id");
         CREATE TABLE IF NOT EXISTS "payments" (
             "payment_id" VARCHAR(80) NOT NULL PRIMARY KEY,
             "to" VARCHAR(64) NOT NULL,

+ 3 - 1
utxo/src/transaction/mod.rs

@@ -349,7 +349,9 @@ impl Transaction {
             .store_revision(&self.revision_id, &self.revision)
             .await?;
 
-        batch.tag_transaction(&self.id, &self.revision.tags).await?;
+        batch
+            .tag_transaction(&self.id, &self.transaction, &self.revision.tags)
+            .await?;
 
         batch
             .update_transaction_revision(