瀏覽代碼

Working on a universal filter

Cesar Rodas 11 月之前
父節點
當前提交
ebc556236d

+ 65 - 55
src/main.rs

@@ -4,7 +4,7 @@ use actix_web::{
 };
 use serde::{Deserialize, Serialize};
 use serde_json::json;
-use verax::{AccountId, AnyAmount, AnyId, Asset, RevId, Status, Type};
+use verax::{AccountId, AnyAmount, AnyId, Asset, Filter, RevId, Status, Tag, Type};
 
 #[derive(Deserialize)]
 pub struct Movement {
@@ -19,7 +19,7 @@ pub struct Deposit {
     #[serde(flatten)]
     pub amount: AnyAmount,
     pub memo: String,
-    pub tags: Vec<String>,
+    pub tags: Vec<Tag>,
     pub status: Status,
 }
 
@@ -41,7 +41,7 @@ impl Deposit {
         Ok(if !self.tags.is_empty() {
             ledger
                 ._inner
-                .set_tags(&zdeposit.id, self.tags, "Update tags".to_owned())
+                .set_tags(zdeposit.revision_id, self.tags, "Update tags".to_owned())
                 .await?
         } else {
             zdeposit
@@ -66,7 +66,7 @@ pub struct UpdateTransaction {
 impl UpdateTransaction {
     pub async fn to_ledger_transaction(
         self,
-        id: &RevId,
+        id: RevId,
         ledger: &Ledger,
     ) -> Result<verax::Transaction, verax::Error> {
         ledger
@@ -132,63 +132,73 @@ async fn get_balance(info: web::Path<AccountId>, ledger: web::Data<Ledger>) -> i
 
 #[get("/{id}")]
 async fn get_info(info: web::Path<AnyId>, ledger: web::Data<Ledger>) -> impl Responder {
-    let result = match info.0 {
-        AnyId::Account(account_id) => ledger
-            ._inner
-            .get_transactions(
-                &account_id,
-                vec![Type::Deposit, Type::Withdrawal, Type::Transaction],
-            )
-            .await
-            .map(|transactions| HttpResponse::Ok().json(transactions)),
-        AnyId::Revision(rev_id) => ledger
-            ._inner
-            .get_transaction_by_revision(&rev_id)
-            .await
-            .map(|rev| {
+    let (cache_for_ever, filter) = match info.0 {
+        AnyId::Account(account_id) => (
+            false,
+            Filter {
+                accounts: vec![account_id],
+                kind: vec![Type::Deposit, Type::Withdrawal, Type::Transaction],
+                ..Default::default()
+            },
+        ),
+        AnyId::Revision(rev_id) => (
+            true,
+            Filter {
+                revisions: vec![rev_id],
+                limit: 1,
+                ..Default::default()
+            },
+        ),
+
+        AnyId::Transaction(transaction_id) => (
+            false,
+            Filter {
+                ids: vec![transaction_id],
+                limit: 1,
+                ..Default::default()
+            },
+        ),
+
+        AnyId::Payment(payment_id) => {
+            let _ = ledger
+                ._inner
+                .get_payment_info(&payment_id)
+                .await
+                .map(|tx| HttpResponse::Ok().json(tx));
+
+            todo!()
+        }
+    };
+
+    let limit = filter.limit;
+
+    ledger
+        ._inner
+        .get_transactions(filter)
+        .await
+        .map(|results| {
+            let json_response = if limit == 1 {
+                serde_json::to_value(&results[0])
+            } else {
+                serde_json::to_value(&results)
+            }
+            .unwrap();
+
+            if cache_for_ever {
                 HttpResponse::Ok()
                     .header(
                         "Cache-Control",
                         "public, max-age=31536000, s-maxage=31536000, immutable",
                     )
                     .header("Vary", "Accept-Encoding")
-                    .json(rev)
-            }),
-
-        AnyId::Transaction(transaction_id) => ledger
-            ._inner
-            .get_transaction(&transaction_id)
-            .await
-            .map(|tx| {
-                if ledger
-                    ._inner
-                    .get_status_manager()
-                    .is_final(&tx.revision.status)
-                {
-                    HttpResponse::Ok()
-                        .header(
-                            "Cache-Control",
-                            "public, max-age=31536000, s-maxage=31536000, immutable",
-                        )
-                        .header("Vary", "Accept-Encoding")
-                        .json(tx)
-                } else {
-                    HttpResponse::Ok().json(tx)
-                }
-            }),
-        AnyId::Payment(payment_id) => ledger
-            ._inner
-            .get_payment_info(&payment_id)
-            .await
-            .map(|tx| HttpResponse::Ok().json(tx)),
-    };
-
-    match result {
-        Ok(x) => x,
-        Err(err) => {
+                    .json(json_response)
+            } else {
+                HttpResponse::Ok().json(json_response)
+            }
+        })
+        .map_err(|err| {
             HttpResponse::InternalServerError().json(json!({ "text": err.to_string(), "err": err}))
-        }
-    }
+        })
 }
 
 #[post("/deposit")]
@@ -226,7 +236,7 @@ async fn update_status(
 ) -> impl Responder {
     match item
         .into_inner()
-        .to_ledger_transaction(&info.0, &ledger)
+        .to_ledger_transaction(info.0, &ledger)
         .await
     {
         Ok(tx) => HttpResponse::Accepted().json(tx),

+ 4 - 0
utxo/src/error.rs

@@ -8,6 +8,10 @@ pub enum Error {
     #[error("Transaction: {0}")]
     Transaction(#[from] transaction::Error),
 
+    /// Transaction not found
+    #[error("Transaction not found")]
+    TxNotFound,
+
     /// An internal conversion error
     #[error("Conversion overflow: {0}")]
     Overflow(String),

+ 145 - 0
utxo/src/filter.rs

@@ -0,0 +1,145 @@
+use crate::{AccountId, RevId, Tag, TxId, Type};
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+
+/// Filter transactions
+///
+/// All this filters options are AND, meaning that an object must match all the
+/// requirements in order to be included in the resultset.
+#[derive(Clone, Debug, Serialize, Deserialize, Default)]
+pub struct Filter {
+    /// List of transaction IDs to query
+    #[serde(default, skip_serializing_if = "Vec::is_empty")]
+    pub ids: Vec<TxId>,
+    /// List of revisions to query
+    pub revisions: Vec<RevId>,
+    /// List of accounts to query their transactions
+    pub accounts: Vec<AccountId>,
+    /// List of transaction types-kind
+    pub kind: Vec<Type>,
+    /// List of transactions by tags
+    pub tags: Vec<Tag>,
+    /// List transactions newer than this timestamp
+    #[serde(
+        default,
+        with = "option_ts_seconds",
+        skip_serializing_if = "Option::is_none"
+    )]
+    pub since: Option<DateTime<Utc>>,
+    /// List transactions upto this timestamp
+    #[serde(
+        default,
+        with = "option_ts_seconds",
+        skip_serializing_if = "Option::is_none"
+    )]
+    pub until: Option<DateTime<Utc>>,
+    /// Limit for transactions
+    #[serde(default, skip_serializing_if = "is_zero")]
+    pub limit: usize,
+    /// Skip the first `skip` transactions
+    #[serde(default, skip_serializing_if = "is_zero")]
+    pub skip: usize,
+}
+
+impl Filter {
+    /// Adds a given kind to the filter
+    pub fn kind(mut self, kind: Type) -> Self {
+        self.kind.push(kind);
+        self
+    }
+
+    /// Adds a given tag to the filter
+    pub fn account(mut self, account: AccountId) -> Self {
+        self.accounts.push(account);
+        self
+    }
+
+    /// Adds until timestamp to the filter
+    pub fn until(mut self, time: DateTime<Utc>) -> Self {
+        self.until = Some(time);
+        self
+    }
+
+    /// Adds since timestamp to the filter
+    pub fn since(mut self, time: DateTime<Utc>) -> Self {
+        self.since = Some(time);
+        self
+    }
+}
+
+impl From<Vec<Tag>> for Filter {
+    fn from(value: Vec<Tag>) -> Self {
+        Filter {
+            tags: value,
+            ..Default::default()
+        }
+    }
+}
+
+impl From<TxId> for Filter {
+    fn from(value: TxId) -> Self {
+        Filter {
+            ids: vec![value],
+            ..Default::default()
+        }
+    }
+}
+
+impl From<RevId> for Filter {
+    fn from(value: RevId) -> Self {
+        Filter {
+            revisions: vec![value],
+            ..Default::default()
+        }
+    }
+}
+
+impl From<AccountId> for Filter {
+    fn from(value: AccountId) -> Self {
+        Filter {
+            accounts: vec![value],
+            ..Default::default()
+        }
+    }
+}
+
+#[inline]
+fn is_zero(number: &usize) -> bool {
+    *number == 0
+}
+
+pub(crate) mod option_ts_seconds {
+    use chrono::{DateTime, LocalResult, TimeZone, Utc};
+    use serde::{de, Deserialize, Deserializer, Serializer};
+
+    pub fn serialize<S>(date: &Option<DateTime<Utc>>, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        match date {
+            Some(date) => serializer.serialize_i64(date.timestamp()),
+            None => serializer.serialize_none(),
+        }
+    }
+
+    pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<DateTime<Utc>>, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let timestamp: Option<i64> = Deserialize::deserialize(deserializer)?;
+        let timestamp = if let Some(timestamp) = timestamp {
+            timestamp
+        } else {
+            return Ok(None);
+        };
+
+        match Utc.timestamp_opt(timestamp, 0) {
+            LocalResult::None => Err(de::Error::custom("Invalid timestamp")),
+            LocalResult::Ambiguous(min, max) => Err(de::Error::custom(format!(
+                "Timestamp {0} is ambiguous (min={1}, max={2})",
+                timestamp, min, max
+            ))),
+            LocalResult::Single(ts) => Ok(Some(ts)),
+        }
+    }
+}

+ 33 - 38
utxo/src/ledger.rs

@@ -1,7 +1,7 @@
 use crate::{
     amount::AmountCents, config::Config, status::StatusManager, storage::Storage,
-    transaction::Type, AccountId, Amount, Error, PaymentFrom, PaymentId, RevId, Status, Tag,
-    Transaction, TxId,
+    transaction::Type, AccountId, Amount, Error, Filter, PaymentFrom, PaymentId, RevId, Status,
+    Tag, Transaction, TxId,
 };
 use std::{cmp::Ordering, collections::HashMap, sync::Arc};
 
@@ -267,45 +267,26 @@ where
         todo!()
     }
 
-    /// Returns the transaction object by a given revision id
-    pub async fn get_transaction_by_revision(
-        &self,
-        revision_id: &RevId,
-    ) -> Result<Transaction, Error> {
-        Ok(self
-            .config
-            .storage
-            .get_transaction(revision_id.into())
-            .await?)
-    }
-
     /// Returns the transaction object by a given transaction id
-    pub async fn get_transaction(&self, transaction_id: &TxId) -> Result<Transaction, Error> {
-        Ok(self
-            .config
+    pub async fn get_transaction(&self, transaction_id: TxId) -> Result<Transaction, Error> {
+        let filter = Filter {
+            ids: vec![transaction_id],
+            limit: 1,
+            ..Default::default()
+        };
+        self.config
             .storage
-            .get_transaction(transaction_id.clone().into())
-            .await?)
+            .find(filter)
+            .await?
+            .pop()
+            .ok_or(Error::TxNotFound)
     }
 
     /// Returns all transactions from a given account. It can be optionally be
     /// sorted by transaction type. The transactions are sorted from newest to
     /// oldest.
-    pub async fn get_transactions(
-        &self,
-        account_id: &AccountId,
-        types: Vec<Type>,
-    ) -> Result<Vec<Transaction>, Error> {
-        let types = if types.is_empty() {
-            vec![Type::Transaction, Type::Deposit, Type::Withdrawal]
-        } else {
-            types
-        };
-        Ok(self
-            .config
-            .storage
-            .get_transactions(account_id, &types, &[])
-            .await?)
+    pub async fn get_transactions(&self, filter: Filter) -> Result<Vec<Transaction>, Error> {
+        Ok(self.config.storage.find(filter).await?)
     }
 
     /// Returns the status manager
@@ -316,15 +297,22 @@ where
     /// Updates a transaction and updates their tags to this given set
     pub async fn set_tags(
         &self,
-        transaction_id: &TxId,
+        revision_id: RevId,
         tags: Vec<Tag>,
         reason: String,
     ) -> Result<Transaction, Error> {
+        let filter = Filter {
+            revisions: vec![revision_id],
+            limit: 1,
+            ..Default::default()
+        };
         Ok(self
             .config
             .storage
-            .get_transaction(transaction_id.into())
+            .find(filter)
             .await?
+            .pop()
+            .ok_or(Error::TxNotFound)?
             .set_tags(&self.config, tags, reason)
             .await?)
     }
@@ -333,15 +321,22 @@ where
     /// new transaction object is returned, otherwise an error is returned.
     pub async fn change_status(
         &self,
-        revision_id: &RevId,
+        revision_id: RevId,
         new_status: Status,
         reason: String,
     ) -> Result<Transaction, Error> {
+        let filter = Filter {
+            revisions: vec![revision_id],
+            limit: 1,
+            ..Default::default()
+        };
         Ok(self
             .config
             .storage
-            .get_transaction(revision_id.into())
+            .find(filter)
             .await?
+            .pop()
+            .ok_or(Error::TxNotFound)?
             .change_status(&self.config, new_status, reason)
             .await?)
     }

+ 3 - 70
utxo/src/storage/cache/mod.rs

@@ -1,9 +1,8 @@
 //! Cache storage implementation.
 use crate::{
     amount::AmountCents,
-    storage::{self, Error, FilterId},
-    AccountId, Amount, Asset, BaseTx, PaymentFrom, PaymentId, RevId, Revision, Tag, Transaction,
-    TxId, Type,
+    storage::{self, Error},
+    AccountId, Amount, Asset, BaseTx, Filter, PaymentFrom, PaymentId, RevId, Revision, TxId,
 };
 use std::{collections::HashMap, sync::Arc};
 use tokio::sync::RwLock;
@@ -30,8 +29,6 @@ where
 pub struct Storage {
     payments: SharedHashMap<PaymentId, PaymentFrom>,
     balances: SharedHashMap<AccountId, Vec<Amount>>,
-    transactions: SharedHashMap<TxId, BaseTx>,
-    revisions: SharedHashMap<RevId, Revision>,
     revisions_by_transactions: SharedHashMap<TxId, Vec<RevId>>,
 }
 
@@ -80,59 +77,6 @@ where
         }
     }
 
-    async fn get_transaction_and_revision(
-        &self,
-        id: FilterId,
-    ) -> Result<(BaseTx, Revision), Error> {
-        let tx_cache = self.storage.transactions.read().await;
-        let rev_by_tx_cache = self.storage.revisions_by_transactions.read().await;
-        let rev_cache = self.storage.revisions.read().await;
-        match &id {
-            FilterId::LatestRevision(id) => {
-                if let Some(Some(Some(Some((tx, rev))))) = tx_cache.get(id).map(|tx| {
-                    rev_by_tx_cache.get(id).map(|revs| {
-                        revs.last().map(|last_rev| {
-                            rev_cache.get(last_rev).map(|rev| (tx.clone(), rev.clone()))
-                        })
-                    })
-                }) {
-                    return Ok((tx, rev));
-                }
-            }
-            FilterId::RevisionId(rev_id) => {
-                if let Some(Some((tx, rev))) = rev_cache.get(rev_id).map(|revision| {
-                    tx_cache
-                        .get(revision.transaction_id())
-                        .map(|tx| (tx.clone(), revision.clone()))
-                }) {
-                    return Ok((tx, rev));
-                }
-            }
-        }
-        drop(tx_cache);
-        drop(rev_by_tx_cache);
-        drop(rev_cache);
-
-        let (base_tx, rev) = self.inner.get_transaction_and_revision(id).await?;
-
-        self.storage.transactions.write().await.insert(
-            base_tx
-                .id()
-                .map_err(|e| Error::Encoding(e.to_string()))?
-                .clone(),
-            base_tx.clone(),
-        );
-
-        self.storage.revisions.write().await.insert(
-            rev.rev_id()
-                .map_err(|e| Error::Encoding(e.to_string()))?
-                .clone(),
-            rev.clone(),
-        );
-
-        Ok((base_tx, rev))
-    }
-
     async fn get_revisions(&self, transaction_id: &TxId) -> Result<Vec<RevId>, Error> {
         let cache = self.storage.revisions_by_transactions.read().await;
         if let Some(revisions) = cache.get(transaction_id) {
@@ -151,7 +95,7 @@ where
 
     async fn find_base_tx_and_revision(
         &self,
-        mut filter: Filter,
+        filter: Filter,
     ) -> Result<Vec<(BaseTx, Revision)>, Error> {
         self.inner.find_base_tx_and_revision(filter).await
     }
@@ -176,17 +120,6 @@ where
             .get_positive_unspent_payments(account, asset, target_amount)
             .await
     }
-
-    async fn get_transactions_with_latest_revision(
-        &self,
-        account: &AccountId,
-        types: &[Type],
-        tags: &[Tag],
-    ) -> Result<Vec<(BaseTx, Revision)>, Error> {
-        self.inner
-            .get_transactions_with_latest_revision(account, types, tags)
-            .await
-    }
 }
 
 #[cfg(test)]

+ 41 - 69
utxo/src/storage/mod.rs

@@ -68,48 +68,6 @@ impl TryFrom<u32> for ReceivedPaymentStatus {
     }
 }
 
-/// IDs to query transactions
-#[derive(Debug, Clone)]
-pub enum FilterId {
-    /// By transaction ID with the latest revision
-    LatestRevision(TxId),
-    /// A transaction at a given revision
-    RevisionId(RevId),
-}
-
-impl From<TxId> for FilterId {
-    fn from(id: TxId) -> Self {
-        FilterId::LatestRevision(id)
-    }
-}
-
-impl From<&TxId> for FilterId {
-    fn from(id: &TxId) -> Self {
-        FilterId::LatestRevision(id.clone())
-    }
-}
-
-impl From<RevId> for FilterId {
-    fn from(id: RevId) -> Self {
-        FilterId::RevisionId(id)
-    }
-}
-
-impl From<&RevId> for FilterId {
-    fn from(id: &RevId) -> Self {
-        FilterId::RevisionId(id.clone())
-    }
-}
-
-impl ToString for FilterId {
-    fn to_string(&self) -> String {
-        match self {
-            FilterId::LatestRevision(id) => id.to_string(),
-            FilterId::RevisionId(id) => id.to_string(),
-        }
-    }
-}
-
 #[derive(thiserror::Error, Debug, Serialize)]
 /// Storage error
 pub enum Error {
@@ -329,26 +287,9 @@ pub trait Storage {
         mut filter: Filter,
     ) -> Result<Vec<(BaseTx, Revision)>, Error>;
 
-    /// Returns a transaction by id, the transaction is returned as a tuple of base transaction and
-    /// the current revision
-    async fn get_transaction_and_revision(
-        &self,
-        transaction_id: FilterId,
-    ) -> Result<(BaseTx, Revision), Error>;
-
     /// List all revisions for a given transaction
     async fn get_revisions(&self, transaction_id: &TxId) -> Result<Vec<RevId>, Error>;
 
-    /// Returns a list of a transactions for a given account (and optionally
-    /// filter by types). The list of transactions is expected to be sorted by
-    /// date desc (newest transactions first),
-    async fn get_transactions_with_latest_revision(
-        &self,
-        account: &AccountId,
-        types: &[Type],
-        tags: &[Tag],
-    ) -> Result<Vec<(BaseTx, Revision)>, Error>;
-
     /// Returns a revision with a transaction object by id
     async fn find(&self, filter: Filter) -> Result<Vec<Transaction>, Error> {
         let mut results = vec![];
@@ -472,7 +413,11 @@ pub mod test {
             .await
             .expect("update tx");
         storing.rollback().await.expect("rollback");
-        assert!(storage.get_transaction((&deposit.id).into()).await.is_err());
+        assert!(storage
+            .find(deposit.id.clone().into())
+            .await
+            .expect("result")
+            .is_empty());
 
         let mut storing = storage.begin().await.expect("valid tx");
         storing
@@ -492,7 +437,10 @@ pub mod test {
             .await
             .expect("update tx");
         storing.commit().await.expect("commit");
-        assert!(storage.get_transaction((&deposit.id).into()).await.is_ok());
+        assert_eq!(
+            storage.find(deposit.id.into()).await.expect("result").len(),
+            1
+        );
     }
 
     pub async fn transaction_not_available_until_commit<T>(storage: T)
@@ -526,9 +474,17 @@ pub mod test {
             )
             .await
             .expect("update tx");
-        assert!(storage.get_transaction((&deposit.id).into()).await.is_err());
+        assert!(storage
+            .find(deposit.id.clone().into())
+            .await
+            .expect("results")
+            .is_empty());
         storing.rollback().await.expect("rollback");
-        assert!(storage.get_transaction((&deposit.id).into()).await.is_err());
+        assert!(storage
+            .find(deposit.id.clone().into())
+            .await
+            .expect("results")
+            .is_empty());
 
         let mut storing = storage.begin().await.expect("valid tx");
         storing
@@ -548,7 +504,14 @@ pub mod test {
             .await
             .expect("update tx");
         storing.commit().await.expect("commit");
-        assert!(storage.get_transaction((&deposit.id).into()).await.is_ok());
+        assert_eq!(
+            1,
+            storage
+                .find(deposit.id.into())
+                .await
+                .expect("results")
+                .len()
+        );
     }
 
     pub async fn does_not_update_spent_payments<T>(storage: T)
@@ -848,10 +811,15 @@ pub mod test {
 
         batch.commit().await.expect("valid commit");
 
+        let filter = Filter {
+            kind: vec![Type::Deposit],
+            ..Default::default()
+        };
+
         assert_eq!(
             1,
             storage
-                .get_transactions(&account1, &[Type::Deposit], &[])
+                .find(filter.clone().account(account1.clone()))
                 .await
                 .expect("valid tx")
                 .len()
@@ -859,7 +827,7 @@ pub mod test {
         assert_eq!(
             1,
             storage
-                .get_transactions(&account2, &[Type::Deposit], &[])
+                .find(filter.clone().account(account2.clone()))
                 .await
                 .expect("valid tx")
                 .len()
@@ -867,16 +835,20 @@ pub mod test {
         assert_eq!(
             0,
             storage
-                .get_transactions(&account3, &[Type::Deposit], &[])
+                .find(filter.account(account3.clone()))
                 .await
                 .expect("valid tx")
                 .len()
         );
-        for account in &[account1, account2, account3] {
+        for account in [account1, account2, account3] {
             assert_eq!(
                 0,
                 storage
-                    .get_transactions(account, &[Type::Withdrawal], &[])
+                    .find(Filter {
+                        kind: vec![Type::Withdrawal],
+                        accounts: vec![account],
+                        ..Default::default()
+                    })
                     .await
                     .expect("valid tx")
                     .len()

+ 1 - 1
utxo/src/storage/sqlite/batch.rs

@@ -315,7 +315,7 @@ impl<'a> storage::Batch<'a> for Batch<'a> {
             .map_err(|e| Error::Storage(e.to_string()))?;
 
         for tag in tags {
-            sqlx::query(r#"INSERT INTO "transaction_by_tags" VALUES(?, ?)"#)
+            sqlx::query(r#"INSERT INTO "transactions_by_tags" VALUES(?, ?)"#)
                 .bind(transaction_id.to_string())
                 .bind(tag.as_str())
                 .execute(&mut *self.inner)

+ 69 - 165
utxo/src/storage/sqlite/mod.rs

@@ -1,10 +1,9 @@
 //! SQLite storage layer for Verax
 use crate::{
     amount::AmountCents,
-    storage::{Error, FilterId, Storage},
+    storage::{Error, Storage},
     transaction::{Revision, Type},
-    AccountId, Amount, Asset, BaseTx, Filter, PaymentFrom, PaymentId, RevId, Tag, Transaction,
-    TxId,
+    AccountId, Amount, Asset, BaseTx, Filter, PaymentFrom, PaymentId, RevId, TxId,
 };
 use borsh::from_slice;
 use futures::TryStreamExt;
@@ -23,6 +22,7 @@ pub struct SQLite {
     db: sqlx::SqlitePool,
 }
 
+#[derive(Debug, Clone)]
 enum PrimaryFilter {
     Id(Vec<TxId>),
     Revision(Vec<RevId>),
@@ -31,16 +31,17 @@ enum PrimaryFilter {
     Stream,
 }
 
+#[derive(Debug, Clone)]
 struct Cursor {
     primary_filter: PrimaryFilter,
     filter: Filter,
-    skip: usize,
 }
 
 /// Receives a cursor and does the initial loading of transactions that may match the requested
 /// filter.
 ///
 /// The function will load the full transaction of candidates and return them.
+#[allow(warnings)]
 async fn find_candidates<'e, 'c: 'e, E>(
     executor: E,
     cursor: &mut Cursor,
@@ -61,6 +62,34 @@ where
     };
 
     let rows = match &cursor.primary_filter {
+        PrimaryFilter::Revision(rev_ids) => {
+            let sql = format!(
+                r#"
+                    SELECT
+                        "bt"."blob",
+                        "b"."blob"
+                    FROM
+                        "base_transactions" as "bt",
+                        "revisions" as "b"
+                    WHERE
+                        "b"."revision_id" IN ({})
+                        AND "b"."transaction_id" = "bt"."transaction_id"
+                        {} {}
+                    ORDER BY "b"."created_at" DESC
+                    LIMIT {} OFFSET {}
+                    "#,
+                "?,".repeat(rev_ids.len()).trim_end_matches(","),
+                since,
+                until,
+                limit,
+                cursor.filter.skip
+            );
+            let mut query = sqlx::query(&sql);
+            for id in rev_ids.iter() {
+                query = query.bind(id.to_string());
+            }
+            query.fetch_all(executor).await
+        }
         PrimaryFilter::Id(ids) => {
             let sql = format!(
                 r#"
@@ -77,11 +106,13 @@ where
                         AND "t"."transaction_id" = "bt"."transaction_id"
                         {} {}
                     ORDER BY "t"."created_at" DESC
-                    LIMIT ? OFFSET ?
+                    LIMIT {} OFFSET {}
                     "#,
                 "?,".repeat(ids.len()).trim_end_matches(","),
                 since,
-                until
+                until,
+                limit,
+                cursor.filter.skip
             );
             let mut query = sqlx::query(&sql);
             for id in ids.iter() {
@@ -89,41 +120,41 @@ where
             }
             query
                 .bind(limit as i64)
-                .bind(cursor.skip as i64)
+                .bind(cursor.filter.skip as i64)
                 .fetch_all(executor)
                 .await
         }
-        PrimaryFilter::Revision(rev_ids) => {
+        PrimaryFilter::Account(account_ids) => {
             let sql = format!(
                 r#"
                     SELECT
                         "bt"."blob",
                         "b"."blob"
                     FROM
+                        "transaction_accounts" as "ta",
                         "transactions" as "t",
                         "base_transactions" as "bt",
                         "revisions" as "b"
                     WHERE
-                        "b"."revision_id" IN ({})
+                        "ta"."account_id" IN ({})
+                        AND "t"."transaction_id" = "ta"."transaction_id"
                         AND "t"."revision_id" = "b"."revision_id"
                         AND "t"."transaction_id" = "bt"."transaction_id"
                         {} {}
-                    ORDER BY "t"."created_at" DESC
-                    LIMIT ? OFFSET ?
+                    ORDER BY "ta"."created_at" DESC
+                    LIMIT {} OFFSET {}
                     "#,
-                "?,".repeat(rev_ids.len()).trim_end_matches(","),
+                "?,".repeat(account_ids.len()).trim_end_matches(","),
                 since,
-                until
+                until,
+                limit,
+                cursor.filter.skip
             );
             let mut query = sqlx::query(&sql);
-            for id in rev_ids.iter() {
+            for id in account_ids.iter() {
                 query = query.bind(id.to_string());
             }
-            query
-                .bind(limit as i64)
-                .bind(cursor.skip as i64)
-                .fetch_all(executor)
-                .await
+            query.fetch_all(executor).await
         }
         _ => todo!(),
     };
@@ -147,7 +178,7 @@ where
         })
         .collect::<Result<Vec<_>, Error>>()?;
 
-    cursor.skip += results.len();
+    cursor.filter.skip += results.len();
 
     Ok(results)
 }
@@ -192,7 +223,6 @@ impl SQLite {
         CREATE TABLE IF NOT EXISTS "transactions_by_tags" (
             "transaction_id" VARCHAR(67),
             "tag" VARCHAR(250) NOT NULL,
-            "created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
             PRIMARY KEY ("transaction_id", "tag")
         );
         CREATE TABLE IF NOT EXISTS "payments" (
@@ -463,64 +493,6 @@ impl Storage for SQLite {
         .collect::<Result<Result<Vec<_>, _>, _>>()??)
     }
 
-    async fn get_transaction_and_revision(
-        &self,
-        id: FilterId,
-    ) -> Result<(BaseTx, Revision), Error> {
-        let mut conn = self
-            .db
-            .acquire()
-            .await
-            .map_err(|e| Error::Storage(e.to_string()))?;
-        let row = sqlx::query(match id {
-            FilterId::LatestRevision(_) => {
-                r#"
-            SELECT
-                "bt"."blob",
-                "b"."blob"
-            FROM
-                "transactions" as "t",
-                "base_transactions" as "bt",
-                "revisions" as "b"
-            WHERE
-                "t"."transaction_id" = ?
-                AND "t"."revision_id" = "b"."revision_id"
-                AND "t"."transaction_id" = "bt"."transaction_id"
-        "#
-            }
-            FilterId::RevisionId(_) => {
-                r#"
-            SELECT
-                "bt"."blob",
-                "b"."blob"
-            FROM
-                "base_transactions" as "bt",
-                "revisions" as "b"
-            WHERE
-                "b"."revision_id" = ?
-                AND "b"."transaction_id" = "bt"."transaction_id"
-                "#
-            }
-        })
-        .bind(id.to_string())
-        .fetch_one(&mut *conn)
-        .await
-        .map_err(|e| Error::Storage(e.to_string()))?;
-
-        let transaction = row
-            .try_get::<Vec<u8>, usize>(0)
-            .map_err(|e| Error::Storage(e.to_string()))?;
-
-        let revision = row
-            .try_get::<Vec<u8>, usize>(1)
-            .map_err(|e| Error::Storage(e.to_string()))?;
-
-        Ok((
-            from_slice::<BaseTx>(&transaction)?,
-            from_slice::<Revision>(&revision)?,
-        ))
-    }
-
     async fn find_base_tx_and_revision(
         &self,
         mut filter: Filter,
@@ -552,7 +524,6 @@ impl Storage for SQLite {
         let mut cursor = Cursor {
             primary_filter,
             filter,
-            skip: 0,
         };
 
         let candidate_limits = 20;
@@ -573,6 +544,20 @@ impl Storage for SQLite {
                 {
                     continue;
                 }
+                if !cursor.filter.revisions.is_empty()
+                    && cursor
+                        .filter
+                        .revisions
+                        .binary_search(&candidate.1.rev_id().expect("vv"))
+                        .is_err()
+                {
+                    continue;
+                }
+                if !cursor.filter.kind.is_empty()
+                    && cursor.filter.kind.binary_search(&candidate.0.typ).is_err()
+                {
+                    continue;
+                }
                 if !cursor.filter.tags.is_empty() {
                     let mut found = false;
                     for tag in candidate.1.tags.iter() {
@@ -595,91 +580,10 @@ impl Storage for SQLite {
             }
         }
 
-        Ok(if cursor.filter.limit != 0 {
-            results.split_off(cursor.filter.limit)
-        } else {
-            results
-        })
-    }
-
-    async fn get_transactions_with_latest_revision(
-        &self,
-        account: &AccountId,
-        types: &[Type],
-        _tags: &[Tag],
-    ) -> Result<Vec<(BaseTx, Revision)>, Error> {
-        let mut conn = self
-            .db
-            .acquire()
-            .await
-            .map_err(|e| Error::Storage(e.to_string()))?;
-
-        let sql = if types.is_empty() {
-            r#"SELECT
-                "t"."transaction_id",
-                "t"."revision_id",
-                "bt"."blob",
-                "b"."blob"
-            FROM
-                "transaction_accounts" as "ta",
-                "base_transactions" as "bt",
-                "transactions" as "t",
-                "revisions" as "b"
-            WHERE
-                "ta"."account_id" = ?
-                AND "t"."transaction_id" = "ta"."transaction_id"
-                AND "t"."revision_id" = "b"."revision_id"
-                AND "t"."transaction_id" = "bt"."transaction_id"
-            ORDER BY "ta"."id" DESC"#
-                .to_owned()
-        } else {
-            let types = types
-                .into_iter()
-                .map(|t| u32::from(t).to_string())
-                .collect::<Vec<_>>()
-                .join(",");
-            format!(
-                r#"SELECT
-                    "t"."transaction_id",
-                    "t"."revision_id",
-                    "bt"."blob",
-                    "b"."blob"
-                FROM
-                    "transaction_accounts" as "ta",
-                    "base_transactions" as "bt",
-                    "transactions" as "t",
-                    "revisions" as "b"
-                WHERE
-                    "account_id" = ?
-                    AND "t"."transaction_id" = "ta"."transaction_id"
-                    AND "t"."revision_id" = "b"."revision_id"
-                    AND "t"."transaction_id" = "bt"."transaction_id"
-                    AND "ta"."type" IN ({types})
-                ORDER BY "ta"."id" DESC"#,
-            )
-        };
-
-        sqlx::query(&sql)
-            .bind(account.to_string())
-            .fetch_all(&mut *conn)
-            .await
-            .map_err(|e| Error::Storage(e.to_string()))?
-            .into_iter()
-            .map(|row| {
-                let base_tx = row
-                    .try_get::<Vec<u8>, usize>(2)
-                    .map_err(|e| Error::Storage(e.to_string()))?;
-
-                let revision = row
-                    .try_get::<Vec<u8>, usize>(3)
-                    .map_err(|e| Error::Storage(e.to_string()))?;
-
-                Ok((
-                    from_slice::<BaseTx>(&base_tx)?,
-                    from_slice::<Revision>(&revision)?,
-                ))
-            })
-            .collect::<Result<Vec<_>, _>>()
+        if cursor.filter.limit != 0 {
+            results.truncate(cursor.filter.limit);
+        }
+        Ok(results)
     }
 }
 

+ 6 - 6
utxo/src/tests/deposit.rs

@@ -25,7 +25,7 @@ async fn pending_deposit_and_failure() {
         .is_empty());
 
     ledger
-        .change_status(&rev_id, "failed".into(), "failed due test".to_owned())
+        .change_status(rev_id, "failed".into(), "failed due test".to_owned())
         .await
         .expect("valid tx");
 
@@ -130,7 +130,7 @@ async fn balance_decreases_while_pending_spending_and_confirm() {
     assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
 
     ledger
-        .change_status(&rev_id, "settled".into(), "ready".to_owned())
+        .change_status(rev_id, "settled".into(), "ready".to_owned())
         .await
         .expect("valid tx");
 
@@ -187,7 +187,7 @@ async fn balance_decreases_while_pending_spending_and_cancel() {
     assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
 
     ledger
-        .change_status(&rev_id, "cancelled".into(), "cancelled by test".to_owned())
+        .change_status(rev_id, "cancelled".into(), "cancelled by test".to_owned())
         .await
         .expect("valid tx");
 
@@ -238,7 +238,7 @@ async fn balance_decreases_while_pending_spending_and_failed() {
     assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
 
     let new_rev_id = ledger
-        .change_status(&rev_id, "processing".into(), "processing now".to_owned())
+        .change_status(rev_id, "processing".into(), "processing now".to_owned())
         .await
         .expect("valid tx")
         .revision_id;
@@ -254,7 +254,7 @@ async fn balance_decreases_while_pending_spending_and_failed() {
         "Transaction: Error in status: Invalid transition from processing to cancelled".to_owned(),
         ledger
             .change_status(
-                &new_rev_id,
+                new_rev_id.clone(),
                 "cancelled".into(),
                 "cancelled by user".to_owned()
             )
@@ -271,7 +271,7 @@ async fn balance_decreases_while_pending_spending_and_failed() {
     assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
 
     ledger
-        .change_status(&new_rev_id, "failed".into(), "it has failed".to_owned())
+        .change_status(new_rev_id, "failed".into(), "it has failed".to_owned())
         .await
         .expect("valid");
 

+ 14 - 9
utxo/src/tests/tx.rs

@@ -1,5 +1,5 @@
 use super::{deposit, get_asset_manager_and_ledger};
-use crate::{AccountId, Asset, Type};
+use crate::{AccountId, Asset, Filter, Type};
 
 #[tokio::test]
 async fn multi_account_transfers() {
@@ -39,18 +39,23 @@ async fn multi_account_transfers() {
         ledger.get_balance(&target).await.expect("balance")
     );
 
-    let rev_id = ledger
-        .get_transactions(&accounts[0], vec![Type::Exchange])
-        .await
-        .expect("valid")[0]
+    let filter = Filter {
+        accounts: vec![accounts[0].clone()],
+        kind: vec![Type::Exchange],
+        ..Default::default()
+    };
+
+    let rev_id = ledger.get_transactions(filter).await.expect("valid")[0]
         .revision_id
         .clone();
 
     for _ in &accounts {
-        let txs = ledger
-            .get_transactions(&accounts[0], vec![Type::Exchange])
-            .await
-            .expect("valid");
+        let filter = Filter {
+            accounts: vec![accounts[0].clone()],
+            kind: vec![Type::Exchange],
+            ..Default::default()
+        };
+        let txs = ledger.get_transactions(filter).await.expect("valid");
         assert_eq!(1, txs.len());
         assert_eq!(rev_id, txs[0].revision_id);
     }

+ 1 - 1
utxo/src/tests/withdrawal.rs

@@ -198,7 +198,7 @@ async fn cancelled_withdrawal() {
     );
 
     ledger
-        .change_status(&rev_id, "cancelled".into(), "cancelled by test".to_owned())
+        .change_status(rev_id, "cancelled".into(), "cancelled by test".to_owned())
         .await
         .expect("valid tx");