Pārlūkot izejas kodu

Working on a better data model

Update by using a revid
Cesar Rodas 10 mēneši atpakaļ
vecāks
revīzija
3050ce8b52

+ 47 - 43
client.js

@@ -33,45 +33,47 @@ async function deposit(account, amount, asset) {
 }
 
 async function trade(amount, asset, from, amount_to, asset_to, to) {
+  const request = {
+    memo: "trade",
+    debit: [
+      {
+        account: from,
+        amount: amount.toString(),
+        asset
+      },
+      {
+        account: to,
+        amount: amount_to.toString(),
+        asset: asset_to,
+      }
+    ],
+    credit: [
+      {
+        account: to,
+        amount: (amount * (1 - percentage)).toString(),
+        asset
+      },
+      {
+        account: from,
+        amount: amount_to.toString(),
+        asset: asset_to
+      },
+      {
+        account: fee,
+        amount: (amount * percentage).toString(),
+        asset
+      }
+    ],
+    status: 'pending',
+    asset,
+  }
+  console.log(request)
   const response = (await fetch("http://127.0.0.1:8080/tx", {
     method: "POST",
     headers: {
       "Content-Type": "application/json",
     },
-    body: JSON.stringify({
-      memo: "trade",
-      debit: [
-        {
-          account: from,
-          amount: amount.toString(),
-          asset
-        },
-        {
-          account: to,
-          amount: amount_to.toString(),
-          asset: asset_to,
-        }
-      ],
-      credit: [
-        {
-          account: to,
-          amount: (amount * (1 - percentage)).toString(),
-          asset
-        },
-        {
-          account: from,
-          amount: amount_to.toString(),
-          asset: asset_to
-        },
-        {
-          account: fee,
-          amount: (amount * percentage).toString(),
-          asset
-        }
-      ],
-      status: 'pending',
-      asset,
-    })
+    body: JSON.stringify(request)
   }));
   return response.json();
 }
@@ -90,19 +92,21 @@ async function change_status(id, s_status) {
   return response.json();
 }
 
-async function test() {
-  let d = (await deposit(addr1, 1512312, "BTC/8"));
+async function depositAndConfirm(account, amount, asset) {
+  let d = (await deposit(account, amount, asset));
   dbg(d);
-  return
-  dbg(await change_status(d._id, 'settled'));
-  d = (await deposit(addr2, 1001234, "USD/4"));
-  dbg(await change_status(d._id, 'settled'));
+  dbg(await change_status(d._rev, 'settled'));
+}
 
-  const t = await trade(1, "BTC/8", addr1, 26751.11, "USD/4", addr2);
+async function test() {
+  await depositAndConfirm(addr1, 111512312, "BTC/8");
+  await depositAndConfirm(addr2, 300512312, "USD/4");
+  let t = await trade(1, "BTC/8", addr1, 26751.11, "USD/4", addr2);
   dbg(t);
-  dbg(await change_status(t._id, 'processing',));
+  t = await change_status(t._rev, 'processing',);
+  dbg(t)
   console.log('set settle')
-  dbg(await change_status(t._id, 'settled'));
+  dbg(await change_status(t._rev, 'settled'));
   dbg(await get_balance(addr1));
   dbg(await get_balance(addr2));
   dbg(await get_balance(fee));

+ 19 - 5
src/main.rs

@@ -4,7 +4,7 @@ use actix_web::{
 };
 use serde::{Deserialize, Serialize};
 use serde_json::json;
-use verax::{AccountId, AnyAmount, AnyId, Asset, Status, TxId, Type};
+use verax::{AccountId, AnyAmount, AnyId, Asset, RevId, Status, Type};
 
 #[derive(Deserialize)]
 pub struct Movement {
@@ -66,7 +66,7 @@ pub struct UpdateTransaction {
 impl UpdateTransaction {
     pub async fn to_ledger_transaction(
         self,
-        id: &TxId,
+        id: &RevId,
         ledger: &Ledger,
     ) -> Result<verax::Transaction, verax::Error> {
         ledger
@@ -141,6 +141,20 @@ async fn get_info(info: web::Path<AnyId>, ledger: web::Data<Ledger>) -> impl Res
             )
             .await
             .map(|transactions| HttpResponse::Ok().json(transactions)),
+        AnyId::Revision(rev_id) => ledger
+            ._inner
+            .get_transaction_by_revision(&rev_id)
+            .await
+            .map(|rev| {
+                HttpResponse::Ok()
+                    .header(
+                        "Cache-Control",
+                        "public, max-age=31536000, s-maxage=31536000, immutable",
+                    )
+                    .header("Vary", "Accept-Encoding")
+                    .json(rev)
+            }),
+
         AnyId::Transaction(transaction_id) => ledger
             ._inner
             .get_transaction(&transaction_id)
@@ -206,7 +220,7 @@ async fn create_transaction(
 
 #[post("/{id}")]
 async fn update_status(
-    info: web::Path<TxId>,
+    info: web::Path<RevId>,
     item: web::Json<UpdateTransaction>,
     ledger: web::Data<Ledger>,
 ) -> impl Responder {
@@ -247,8 +261,8 @@ async fn main() -> std::io::Result<()> {
     let storage = verax::storage::SQLite::new(pool.clone());
     storage.setup().await.expect("setup");
 
-    let inner_storage = verax::storage::SQLite::new(pool.clone());
-    let storage = verax::storage::Cache::new(inner_storage);
+    let storage = verax::storage::SQLite::new(pool.clone());
+    let storage = verax::storage::Cache::new(storage);
     let ledger = verax::Ledger::new(storage.into());
 
     HttpServer::new(move || {

+ 8 - 2
utxo/src/id/mod.rs

@@ -82,6 +82,8 @@ crate::BinaryId!(RevId, "rev");
 pub enum AnyId {
     /// TxId
     Transaction(TxId),
+    /// Revision ID
+    Revision(RevId),
     /// Payment
     Payment(PaymentId),
     /// Account ID
@@ -99,8 +101,12 @@ impl FromStr for AnyId {
                     transaction: tx.parse()?,
                     position: (pos[1..]).parse()?,
                 }));
-            } else {
-                return Ok(Self::Transaction(value.parse()?));
+            } else if let Ok(id) = value.parse() {
+                return Ok(Self::Transaction(id));
+            }
+        } else if value.starts_with("rev") {
+            if let Ok(id) = value.parse() {
+                return Ok(Self::Revision(id));
             }
         }
 

+ 22 - 5
utxo/src/ledger.rs

@@ -1,6 +1,7 @@
 use crate::{
     amount::AmountCents, config::Config, status::StatusManager, storage::Storage,
-    transaction::Type, AccountId, Amount, Error, PaymentFrom, PaymentId, Status, Transaction, TxId,
+    transaction::Type, AccountId, Amount, Error, PaymentFrom, PaymentId, RevId, Status,
+    Transaction, TxId,
 };
 use std::{cmp::Ordering, collections::HashMap, sync::Arc};
 
@@ -266,9 +267,25 @@ where
         todo!()
     }
 
+    /// Returns the transaction object by a given revision id
+    pub async fn get_transaction_by_revision(
+        &self,
+        revision_id: &RevId,
+    ) -> Result<Transaction, Error> {
+        Ok(self
+            .config
+            .storage
+            .get_transaction(revision_id.into())
+            .await?)
+    }
+
     /// Returns the transaction object by a given transaction id
     pub async fn get_transaction(&self, transaction_id: &TxId) -> Result<Transaction, Error> {
-        Ok(self.config.storage.get_transaction(transaction_id).await?)
+        Ok(self
+            .config
+            .storage
+            .get_transaction(transaction_id.clone().into())
+            .await?)
     }
 
     /// Returns all transactions from a given account. It can be optionally be
@@ -306,7 +323,7 @@ where
         Ok(self
             .config
             .storage
-            .get_transaction(transaction_id)
+            .get_transaction(transaction_id.into())
             .await?
             .set_tags(&self.config, tags, reason)
             .await?)
@@ -316,14 +333,14 @@ where
     /// new transaction object is returned, otherwise an error is returned.
     pub async fn change_status(
         &self,
-        transaction_id: &TxId,
+        revision_id: &RevId,
         new_status: Status,
         reason: String,
     ) -> Result<Transaction, Error> {
         Ok(self
             .config
             .storage
-            .get_transaction(transaction_id)
+            .get_transaction(revision_id.into())
             .await?
             .change_status(&self.config, new_status, reason)
             .await?)

+ 1 - 1
utxo/src/lib.rs

@@ -48,6 +48,6 @@ pub use self::{
     ledger::Ledger,
     payment::PaymentFrom,
     serde::*,
-    status::Status,
+    status::{Status, StatusManager},
     transaction::*,
 };

+ 26 - 8
utxo/src/status.rs

@@ -3,23 +3,38 @@ use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 
 /// Status type
+///
+/// The statuses are bare strings of at most 10 characters
 pub type Status = MaxLengthString<10>;
 
 #[derive(Debug, Serialize, Deserialize)]
+/// Status manager
+///
+/// This struct manages the status of the payments and their life cycle.
 pub struct StatusManager {
+    /// List of all statuses
     statuses: Vec<Status>,
+    /// List of all spendable statuses
     spendable: Vec<Status>,
+    /// The default spendable status
     default_spendable: Status,
+    /// List of all reverted statuses
     reverted: Vec<Status>,
-    withdrawable: Vec<Status>,
     transition: HashMap<Status, Vec<Status>>,
 }
 
 #[derive(Debug, Clone, Copy)]
-pub enum StatusType {
+/// Internal status type
+///
+/// There could be many statuses, but they are translated into three types:
+pub enum InternalStatus {
+    /// The payment is spendable, because it is final and it has been successfully processed
     Spendable,
+    /// The payment is reverted, because it has been cancelled or failed
     Reverted,
-    NoChange,
+    /// The payment is not created but still not final, it may be visible or not on the account's
+    /// balance, but it is not spendable
+    Pending,
 }
 
 /// Status error object
@@ -33,6 +48,7 @@ pub enum Error {
 }
 
 impl StatusManager {
+    /// Creates a new status from a given string
     pub fn new_status(&self, status_name: &str) -> Result<Status, Error> {
         self.statuses
             .iter()
@@ -41,20 +57,23 @@ impl StatusManager {
             .ok_or(Error::UnknownStatus(status_name.to_owned()))
     }
 
-    pub fn typ(&self, status: &Status) -> StatusType {
+    /// Translated the status into an internal status type
+    pub fn internal_type(&self, status: &Status) -> InternalStatus {
         if self.is_spendable(status) {
-            StatusType::Spendable
+            InternalStatus::Spendable
         } else if self.is_reverted(status) {
-            StatusType::Reverted
+            InternalStatus::Reverted
         } else {
-            StatusType::NoChange
+            InternalStatus::Pending
         }
     }
 
+    /// Whether the status is spendable or not
     pub fn spendables(&self) -> &[Status] {
         &self.spendable
     }
 
+    /// Whether the status is reverted or not
     pub fn default_spendable(&self) -> Status {
         self.default_spendable.clone()
     }
@@ -105,7 +124,6 @@ impl Default for StatusManager {
             default_spendable: settled.clone(),
             spendable: vec![settled.clone()],
             reverted: vec![cancelled.clone(), failed.clone()],
-            withdrawable: vec![settled.clone()],
             transition: {
                 let mut map = HashMap::new();
                 map.insert(

+ 10 - 19
utxo/src/storage/cache/batch.rs

@@ -1,10 +1,10 @@
-use super::CacheStorage;
+use super::Storage;
 use crate::{
     payment::PaymentTo,
     storage::{Batch, Error, ReceivedPaymentStatus},
-    AccountId, Amount, BaseTx, PaymentFrom, PaymentId, RevId, Revision, Transaction, TxId,
+    AccountId, BaseTx, PaymentId, RevId, Revision, TxId,
 };
-use std::{collections::HashMap, marker::PhantomData};
+use std::{collections::HashMap, marker::PhantomData, sync::Arc};
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub enum Ids {
@@ -18,9 +18,7 @@ where
     B: Batch<'a> + Send,
 {
     inner: B,
-    payments: CacheStorage<PaymentId, PaymentFrom>,
-    balances: CacheStorage<AccountId, Vec<Amount>>,
-    transactions: CacheStorage<TxId, Transaction>,
+    storage: Arc<Storage>,
     to_invalidate: HashMap<Ids, ()>,
     _phantom: PhantomData<&'a ()>,
 }
@@ -29,17 +27,10 @@ impl<'a, B> CacheBatch<'a, B>
 where
     B: Batch<'a> + Send,
 {
-    pub fn new(
-        batch: B,
-        payments: CacheStorage<PaymentId, PaymentFrom>,
-        balances: CacheStorage<AccountId, Vec<Amount>>,
-        transactions: CacheStorage<TxId, Transaction>,
-    ) -> Self {
+    pub fn new(batch: B, storage: Arc<Storage>) -> Self {
         Self {
             inner: batch,
-            payments,
-            balances,
-            transactions,
+            storage,
             to_invalidate: HashMap::new(),
             _phantom: PhantomData,
         }
@@ -52,16 +43,16 @@ where
     B: Batch<'a> + Send,
 {
     async fn commit(self) -> Result<(), Error> {
-        let mut payments = self.payments.write().await;
-        let mut balances = self.balances.write().await;
-        let mut transactions = self.transactions.write().await;
+        let mut payments = self.storage.payments.write().await;
+        let mut balances = self.storage.balances.write().await;
+        let mut revision_by_transactions = self.storage.revisions_by_transactions.write().await;
 
         self.inner.commit().await?;
 
         for (id, _) in self.to_invalidate {
             match id {
                 Ids::Transaction(id) => {
-                    transactions.remove(&id);
+                    revision_by_transactions.remove(&id);
                 }
                 Ids::Payment(id) => {
                     payments.remove(&id);

+ 91 - 30
utxo/src/storage/cache/mod.rs

@@ -1,15 +1,15 @@
 //! Cache storage implementation.
 use crate::{
     amount::AmountCents,
-    storage::{Error, Storage},
-    AccountId, Amount, Asset, PaymentFrom, PaymentId, Transaction, TxId, Type,
+    storage::{self, Error, FilterId},
+    AccountId, Amount, Asset, BaseTx, PaymentFrom, PaymentId, RevId, Revision, TxId, Type,
 };
 use std::{collections::HashMap, sync::Arc};
 use tokio::sync::RwLock;
 
 mod batch;
 
-pub(crate) type CacheStorage<K, V> = Arc<RwLock<HashMap<K, V>>>;
+type SharedHashMap<K, V> = RwLock<HashMap<K, V>>;
 
 /// Cache storage implementation.
 ///
@@ -18,33 +18,39 @@ pub(crate) type CacheStorage<K, V> = Arc<RwLock<HashMap<K, V>>>;
 /// invalidation.
 pub struct Cache<S>
 where
-    S: Storage + Sync + Send,
+    S: storage::Storage + Sync + Send,
 {
-    payments: CacheStorage<PaymentId, PaymentFrom>,
-    balances: CacheStorage<AccountId, Vec<Amount>>,
-    transactions: CacheStorage<TxId, Transaction>,
     inner: S,
+    storage: Arc<Storage>,
+}
+
+#[derive(Debug, Default)]
+/// Cache storage.
+pub struct Storage {
+    payments: SharedHashMap<PaymentId, PaymentFrom>,
+    balances: SharedHashMap<AccountId, Vec<Amount>>,
+    transactions: SharedHashMap<TxId, BaseTx>,
+    revisions: SharedHashMap<RevId, Revision>,
+    revisions_by_transactions: SharedHashMap<TxId, Vec<RevId>>,
 }
 
 impl<S> Cache<S>
 where
-    S: Storage + Sync + Send,
+    S: storage::Storage + Sync + Send,
 {
     /// Create a new cache storage.
     pub fn new(storage: S) -> Self {
         Self {
-            payments: Arc::new(RwLock::new(HashMap::new())),
-            balances: Arc::new(RwLock::new(HashMap::new())),
-            transactions: Arc::new(RwLock::new(HashMap::new())),
+            storage: Arc::new(Storage::default()),
             inner: storage,
         }
     }
 }
 
 #[async_trait::async_trait]
-impl<S> Storage for Cache<S>
+impl<S> storage::Storage for Cache<S>
 where
-    S: Storage + Sync + Send,
+    S: storage::Storage + Sync + Send,
 {
     type Batch<'a> = batch::CacheBatch<'a, S::Batch<'a>>
     where
@@ -53,20 +59,19 @@ where
     async fn begin<'a>(&'a self) -> Result<Self::Batch<'a>, Error> {
         Ok(batch::CacheBatch::new(
             self.inner.begin().await?,
-            self.payments.clone(),
-            self.balances.clone(),
-            self.transactions.clone(),
+            self.storage.clone(),
         ))
     }
 
     async fn get_balance(&self, account_id: &AccountId) -> Result<Vec<Amount>, Error> {
-        let cache = self.balances.read().await;
+        let cache = self.storage.balances.read().await;
         if let Some(balances) = cache.get(account_id).cloned() {
             Ok(balances)
         } else {
             drop(cache);
             let result = self.inner.get_balance(account_id).await?;
-            self.balances
+            self.storage
+                .balances
                 .write()
                 .await
                 .insert(account_id.clone(), result.clone());
@@ -74,18 +79,72 @@ where
         }
     }
 
-    async fn get_transaction(&self, id: &TxId) -> Result<Transaction, Error> {
-        let transactions = self.transactions.read().await;
-        if let Some(transaction) = transactions.get(id).cloned() {
-            Ok(transaction)
+    async fn get_transaction_and_revision(
+        &self,
+        id: FilterId,
+    ) -> Result<(BaseTx, Revision), Error> {
+        let tx_cache = self.storage.transactions.read().await;
+        let rev_by_tx_cache = self.storage.revisions_by_transactions.read().await;
+        let rev_cache = self.storage.revisions.read().await;
+        match &id {
+            FilterId::LatestRevision(id) => {
+                if let Some(Some(Some(Some((tx, rev))))) = tx_cache.get(id).map(|tx| {
+                    rev_by_tx_cache.get(id).map(|revs| {
+                        revs.last().map(|last_rev| {
+                            rev_cache.get(last_rev).map(|rev| (tx.clone(), rev.clone()))
+                        })
+                    })
+                }) {
+                    return Ok((tx, rev));
+                }
+            }
+            FilterId::RevisionId(rev_id) => {
+                if let Some(Some((tx, rev))) = rev_cache.get(rev_id).map(|revision| {
+                    tx_cache
+                        .get(revision.transaction_id())
+                        .map(|tx| (tx.clone(), revision.clone()))
+                }) {
+                    return Ok((tx, rev));
+                }
+            }
+        }
+        drop(tx_cache);
+        drop(rev_by_tx_cache);
+        drop(rev_cache);
+
+        let (base_tx, rev) = self.inner.get_transaction_and_revision(id).await?;
+
+        self.storage.transactions.write().await.insert(
+            base_tx
+                .id()
+                .map_err(|e| Error::Encoding(e.to_string()))?
+                .clone(),
+            base_tx.clone(),
+        );
+
+        self.storage.revisions.write().await.insert(
+            rev.rev_id()
+                .map_err(|e| Error::Encoding(e.to_string()))?
+                .clone(),
+            rev.clone(),
+        );
+
+        Ok((base_tx, rev))
+    }
+
+    async fn get_revisions(&self, transaction_id: &TxId) -> Result<Vec<RevId>, Error> {
+        let cache = self.storage.revisions_by_transactions.read().await;
+        if let Some(revisions) = cache.get(transaction_id) {
+            Ok(revisions.clone())
         } else {
-            drop(transactions);
-            let result = self.inner.get_transaction(id).await?;
-            self.transactions
+            drop(cache);
+            let revisions = self.inner.get_revisions(transaction_id).await?;
+            self.storage
+                .revisions_by_transactions
                 .write()
                 .await
-                .insert(id.clone(), result.clone());
-            Ok(result)
+                .insert(transaction_id.clone(), revisions.clone());
+            Ok(revisions)
         }
     }
 
@@ -100,13 +159,15 @@ where
             .await
     }
 
-    async fn get_transactions(
+    async fn get_transactions_with_latest_revision(
         &self,
         account: &AccountId,
         types: &[Type],
         tags: &[String],
-    ) -> Result<Vec<Transaction>, Error> {
-        self.inner.get_transactions(account, types, tags).await
+    ) -> Result<Vec<(BaseTx, Revision)>, Error> {
+        self.inner
+            .get_transactions_with_latest_revision(account, types, tags)
+            .await
     }
 }
 

+ 109 - 17
utxo/src/storage/mod.rs

@@ -68,6 +68,48 @@ impl TryFrom<u32> for ReceivedPaymentStatus {
     }
 }
 
+/// IDs to query transactions
+#[derive(Debug, Clone)]
+pub enum FilterId {
+    /// By transaction ID with the latest revision
+    LatestRevision(TxId),
+    /// A transaction at a given revision
+    RevisionId(RevId),
+}
+
+impl From<TxId> for FilterId {
+    fn from(id: TxId) -> Self {
+        FilterId::LatestRevision(id)
+    }
+}
+
+impl From<&TxId> for FilterId {
+    fn from(id: &TxId) -> Self {
+        FilterId::LatestRevision(id.clone())
+    }
+}
+
+impl From<RevId> for FilterId {
+    fn from(id: RevId) -> Self {
+        FilterId::RevisionId(id)
+    }
+}
+
+impl From<&RevId> for FilterId {
+    fn from(id: &RevId) -> Self {
+        FilterId::RevisionId(id.clone())
+    }
+}
+
+impl ToString for FilterId {
+    fn to_string(&self) -> String {
+        match self {
+            FilterId::LatestRevision(id) => id.to_string(),
+            FilterId::RevisionId(id) => id.to_string(),
+        }
+    }
+}
+
 #[derive(thiserror::Error, Debug, Serialize)]
 /// Storage error
 pub enum Error {
@@ -84,6 +126,11 @@ pub enum Error {
     /// The date format is invalid
     InvalidDate(String),
 
+    #[error("Invalid id: {0}")]
+    #[serde(serialize_with = "crate::serialize_error_to_string")]
+    /// Invalid Id
+    InvalidId(#[from] crate::id::Error),
+
     #[error("Spend payment: {0}")]
     /// The requested payment has been spent already
     SpendPayment(String),
@@ -255,18 +302,63 @@ pub trait Storage {
     ) -> Result<Vec<(TxId, String, DateTime<Utc>)>, Error>;
     */
 
-    /// Returns a revision with a transaction object by id
-    async fn get_transaction(&self, transaction_id: &TxId) -> Result<Transaction, Error>;
+    /// Returns a transaction by id, the transaction is returned as a tuple of base transaction and
+    /// the current revision
+    async fn get_transaction_and_revision(
+        &self,
+        transaction_id: FilterId,
+    ) -> Result<(BaseTx, Revision), Error>;
+
+    /// List all revisions for a given transaction
+    async fn get_revisions(&self, transaction_id: &TxId) -> Result<Vec<RevId>, Error>;
 
     /// Returns a list of a transactions for a given account (and optionally
     /// filter by types). The list of transactions is expected to be sorted by
-    /// date desc (newest transactions first)
+    /// date desc (newest transactions first),
+    async fn get_transactions_with_latest_revision(
+        &self,
+        account: &AccountId,
+        types: &[Type],
+        tags: &[String],
+    ) -> Result<Vec<(BaseTx, Revision)>, Error>;
+
+    /// Returns a revision with a transaction object by id
+    async fn get_transaction(&self, id: FilterId) -> Result<Transaction, Error> {
+        let (base_tx, revision) = self.get_transaction_and_revision(id).await?;
+        (
+            self.get_revisions(&revision.transaction_id).await?,
+            base_tx,
+            revision,
+        )
+            .try_into()
+            .map_err(|e: crate::transaction::Error| Error::Encoding(e.to_string()))
+    }
+
+    /// Get list of transactions
     async fn get_transactions(
         &self,
         account: &AccountId,
         types: &[Type],
         tags: &[String],
-    ) -> Result<Vec<Transaction>, Error>;
+    ) -> Result<Vec<Transaction>, Error> {
+        let mut transactions = Vec::new();
+        for (base_tx, revision) in self
+            .get_transactions_with_latest_revision(account, types, tags)
+            .await?
+            .into_iter()
+        {
+            transactions.push(
+                (
+                    self.get_revisions(&revision.transaction_id).await?,
+                    base_tx,
+                    revision,
+                )
+                    .try_into()
+                    .map_err(|e: crate::transaction::Error| Error::Encoding(e.to_string()))?,
+            );
+        }
+        Ok(transactions)
+    }
 }
 
 #[cfg(test)]
@@ -341,7 +433,7 @@ pub mod test {
 
     pub async fn transaction<T>(storage: T)
     where
-        T: Storage,
+        T: Storage + Send + Sync,
     {
         let asset: Asset = "USD/2".parse().expect("valid asset");
         let deposit = Transaction::new_external_deposit(
@@ -371,7 +463,7 @@ pub mod test {
             .await
             .expect("update tx");
         storing.rollback().await.expect("rollback");
-        assert!(storage.get_transaction(&deposit.id).await.is_err());
+        assert!(storage.get_transaction((&deposit.id).into()).await.is_err());
 
         let mut storing = storage.begin().await.expect("valid tx");
         storing
@@ -391,12 +483,12 @@ pub mod test {
             .await
             .expect("update tx");
         storing.commit().await.expect("commit");
-        assert!(storage.get_transaction(&deposit.id).await.is_ok());
+        assert!(storage.get_transaction((&deposit.id).into()).await.is_ok());
     }
 
     pub async fn transaction_not_available_until_commit<T>(storage: T)
     where
-        T: Storage,
+        T: Storage + Send + Sync,
     {
         let usd: Asset = "USD/2".parse().expect("valid asset");
         let deposit = Transaction::new_external_deposit(
@@ -425,9 +517,9 @@ pub mod test {
             )
             .await
             .expect("update tx");
-        assert!(storage.get_transaction(&deposit.id).await.is_err());
+        assert!(storage.get_transaction((&deposit.id).into()).await.is_err());
         storing.rollback().await.expect("rollback");
-        assert!(storage.get_transaction(&deposit.id).await.is_err());
+        assert!(storage.get_transaction((&deposit.id).into()).await.is_err());
 
         let mut storing = storage.begin().await.expect("valid tx");
         storing
@@ -447,12 +539,12 @@ pub mod test {
             .await
             .expect("update tx");
         storing.commit().await.expect("commit");
-        assert!(storage.get_transaction(&deposit.id).await.is_ok());
+        assert!(storage.get_transaction((&deposit.id).into()).await.is_ok());
     }
 
     pub async fn does_not_update_spent_payments<T>(storage: T)
     where
-        T: Storage,
+        T: Storage + Send + Sync,
     {
         let mut writer = storage.begin().await.expect("writer");
         let mut rng = rand::thread_rng();
@@ -520,7 +612,7 @@ pub mod test {
 
     pub async fn spend_spendable_payments<T>(storage: T)
     where
-        T: Storage,
+        T: Storage + Send + Sync,
     {
         let mut writer = storage.begin().await.expect("writer");
         let mut rng = rand::thread_rng();
@@ -571,7 +663,7 @@ pub mod test {
 
     pub async fn does_not_spend_unspendable_payments<T>(storage: T)
     where
-        T: Storage,
+        T: Storage + Send + Sync,
     {
         let mut writer = storage.begin().await.expect("writer");
         let mut rng = rand::thread_rng();
@@ -623,7 +715,7 @@ pub mod test {
 
     pub async fn sorted_unspent_payments<T>(storage: T)
     where
-        T: Storage,
+        T: Storage + Send + Sync,
     {
         let mut writer = storage.begin().await.expect("writer");
         let accounts: Vec<AccountId> = (0..10)
@@ -687,7 +779,7 @@ pub mod test {
 
     pub async fn relate_account_to_transaction<T>(storage: T)
     where
-        T: Storage,
+        T: Storage + Send + Sync,
     {
         let account1: AccountId = "alice1".parse().expect("account");
         let account2: AccountId = "alice2".parse().expect("account");
@@ -769,7 +861,7 @@ pub mod test {
 
     pub async fn not_spendable_new_payments_not_spendable<T>(storage: T)
     where
-        T: Storage,
+        T: Storage + Send + Sync,
     {
         let mut writer = storage.begin().await.expect("writer");
         let mut rng = rand::thread_rng();

+ 57 - 22
utxo/src/storage/sqlite/mod.rs

@@ -1,14 +1,14 @@
 //! SQLite storage layer for Verax
 use crate::{
     amount::AmountCents,
-    storage::{Error, Storage},
+    storage::{Error, FilterId, Storage},
     transaction::{Revision, Type},
-    AccountId, Amount, Asset, BaseTx, PaymentFrom, PaymentId, Transaction, TxId,
+    AccountId, Amount, Asset, BaseTx, PaymentFrom, PaymentId, RevId, TxId,
 };
 use borsh::from_slice;
 use futures::TryStreamExt;
 use sqlx::{sqlite::SqliteRow, Executor, Row};
-use std::collections::HashMap;
+use std::{collections::HashMap, str::FromStr};
 
 mod batch;
 
@@ -242,17 +242,42 @@ impl Storage for SQLite {
         }
     }
 
-    async fn get_transaction(&self, transaction_id: &TxId) -> Result<Transaction, Error> {
+    async fn get_revisions(&self, transaction_id: &TxId) -> Result<Vec<RevId>, Error> {
         let mut conn = self
             .db
             .acquire()
             .await
             .map_err(|e| Error::Storage(e.to_string()))?;
 
-        let row = sqlx::query(
+        Ok(sqlx::query(
             r#"
+            SELECT "revision_id" FROM "revisions" WHERE "transaction_id" = ? ORDER BY "created_at" ASC
+            "#,
+        )
+        .bind(transaction_id.to_string())
+        .fetch_all(&mut *conn)
+        .await
+        .map_err(|e| Error::Storage(e.to_string()))?
+        .into_iter().map(|x| {
+              x.try_get::<String, usize>(0)
+                .map(|x| RevId::from_str(&x))
+                .map_err(|e| Error::Storage(e.to_string()))
+        }).collect::<Result<Result<Vec<_>, _>, _>>()??)
+    }
+
+    async fn get_transaction_and_revision(
+        &self,
+        id: FilterId,
+    ) -> Result<(BaseTx, Revision), Error> {
+        let mut conn = self
+            .db
+            .acquire()
+            .await
+            .map_err(|e| Error::Storage(e.to_string()))?;
+        let row = sqlx::query(match id {
+            FilterId::LatestRevision(_) => {
+                r#"
             SELECT
-                "t"."revision_id" as "current_id",
                 "bt"."blob",
                 "b"."blob"
             FROM
@@ -263,35 +288,47 @@ impl Storage for SQLite {
                 "t"."transaction_id" = ?
                 AND "t"."revision_id" = "b"."revision_id"
                 AND "t"."transaction_id" = "bt"."transaction_id"
-        "#,
-        )
-        .bind(transaction_id.to_string())
+        "#
+            }
+            FilterId::RevisionId(_) => {
+                r#"
+            SELECT
+                "bt"."blob",
+                "b"."blob"
+            FROM
+                "base_transactions" as "bt",
+                "revisions" as "b"
+            WHERE
+                "b"."revision_id" = ?
+                AND "b"."transaction_id" = "bt"."transaction_id"
+                "#
+            }
+        })
+        .bind(id.to_string())
         .fetch_one(&mut *conn)
         .await
         .map_err(|e| Error::Storage(e.to_string()))?;
 
         let transaction = row
-            .try_get::<Vec<u8>, usize>(1)
+            .try_get::<Vec<u8>, usize>(0)
             .map_err(|e| Error::Storage(e.to_string()))?;
 
         let revision = row
-            .try_get::<Vec<u8>, usize>(2)
+            .try_get::<Vec<u8>, usize>(1)
             .map_err(|e| Error::Storage(e.to_string()))?;
 
-        (
+        Ok((
             from_slice::<BaseTx>(&transaction)?,
             from_slice::<Revision>(&revision)?,
-        )
-            .try_into()
-            .map_err(|e: crate::transaction::Error| Error::Encoding(e.to_string()))
+        ))
     }
 
-    async fn get_transactions(
+    async fn get_transactions_with_latest_revision(
         &self,
         account: &AccountId,
         types: &[Type],
         _tags: &[String],
-    ) -> Result<Vec<Transaction>, Error> {
+    ) -> Result<Vec<(BaseTx, Revision)>, Error> {
         let mut conn = self
             .db
             .acquire()
@@ -358,14 +395,12 @@ impl Storage for SQLite {
                     .try_get::<Vec<u8>, usize>(3)
                     .map_err(|e| Error::Storage(e.to_string()))?;
 
-                (
+                Ok((
                     from_slice::<BaseTx>(&base_tx)?,
                     from_slice::<Revision>(&revision)?,
-                )
-                    .try_into()
-                    .map_err(|e: crate::transaction::Error| Error::Encoding(e.to_string()))
+                ))
             })
-            .collect::<Result<Vec<Transaction>, Error>>()
+            .collect::<Result<Vec<_>, _>>()
     }
 }
 

+ 21 - 16
utxo/src/tests/deposit.rs

@@ -6,7 +6,7 @@ async fn pending_deposit_and_failure() {
     let source = "account1".parse::<AccountId>().expect("account");
     let ledger = get_asset_manager_and_ledger().await;
     let usd: Asset = "USD/2".parse().expect("asset");
-    let id = ledger
+    let rev_id = ledger
         .deposit(
             &source,
             usd.from_human("30").expect("amount"),
@@ -15,7 +15,7 @@ async fn pending_deposit_and_failure() {
         )
         .await
         .expect("valid tx")
-        .id()
+        .revision_id
         .clone();
 
     assert!(ledger
@@ -25,7 +25,7 @@ async fn pending_deposit_and_failure() {
         .is_empty());
 
     ledger
-        .change_status(&id, "failed".into(), "failed due test".to_owned())
+        .change_status(&rev_id, "failed".into(), "failed due test".to_owned())
         .await
         .expect("valid tx");
 
@@ -107,7 +107,7 @@ async fn balance_decreases_while_pending_spending_and_confirm() {
         ledger.get_balance(&source).await.expect("balance")
     );
 
-    let id = ledger
+    let rev_id = ledger
         .new_transaction(
             "Exchange one".to_owned(),
             "pending".into(),
@@ -119,7 +119,7 @@ async fn balance_decreases_while_pending_spending_and_confirm() {
         )
         .await
         .expect("valid tx")
-        .id()
+        .revision_id
         .clone();
 
     assert_eq!(
@@ -130,7 +130,7 @@ async fn balance_decreases_while_pending_spending_and_confirm() {
     assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
 
     ledger
-        .change_status(&id, "settled".into(), "ready".to_owned())
+        .change_status(&rev_id, "settled".into(), "ready".to_owned())
         .await
         .expect("valid tx");
 
@@ -164,7 +164,7 @@ async fn balance_decreases_while_pending_spending_and_cancel() {
         ledger.get_balance(&source).await.expect("balance")
     );
 
-    let id = ledger
+    let rev_id = ledger
         .new_transaction(
             "Exchange one".to_owned(),
             "pending".into(),
@@ -176,7 +176,7 @@ async fn balance_decreases_while_pending_spending_and_cancel() {
         )
         .await
         .expect("valid tx")
-        .id()
+        .revision_id
         .clone();
 
     assert_eq!(
@@ -187,7 +187,7 @@ async fn balance_decreases_while_pending_spending_and_cancel() {
     assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
 
     ledger
-        .change_status(&id, "cancelled".into(), "cancelled by test".to_owned())
+        .change_status(&rev_id, "cancelled".into(), "cancelled by test".to_owned())
         .await
         .expect("valid tx");
 
@@ -215,7 +215,7 @@ async fn balance_decreases_while_pending_spending_and_failed() {
         ledger.get_balance(&source).await.expect("balance")
     );
 
-    let tx = ledger
+    let rev_id = ledger
         .new_transaction(
             "Exchange one".to_owned(),
             "pending".into(),
@@ -227,7 +227,7 @@ async fn balance_decreases_while_pending_spending_and_failed() {
         )
         .await
         .expect("valid tx")
-        .id()
+        .revision_id
         .clone();
 
     assert_eq!(
@@ -237,10 +237,11 @@ async fn balance_decreases_while_pending_spending_and_failed() {
     assert!(ledger.get_balance(&dest).await.expect("balance").is_empty());
     assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
 
-    ledger
-        .change_status(&tx, "processing".into(), "processing now".to_owned())
+    let new_rev_id = ledger
+        .change_status(&rev_id, "processing".into(), "processing now".to_owned())
         .await
-        .expect("valid tx");
+        .expect("valid tx")
+        .revision_id;
 
     assert_eq!(
         vec![usd.from_human("17").expect("amount"),],
@@ -252,7 +253,11 @@ async fn balance_decreases_while_pending_spending_and_failed() {
     assert_eq!(
         "Transaction: Error in status: Invalid transition from processing to cancelled".to_owned(),
         ledger
-            .change_status(&tx, "cancelled".into(), "cancelled by user".to_owned())
+            .change_status(
+                &new_rev_id,
+                "cancelled".into(),
+                "cancelled by user".to_owned()
+            )
             .await
             .unwrap_err()
             .to_string()
@@ -266,7 +271,7 @@ async fn balance_decreases_while_pending_spending_and_failed() {
     assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
 
     ledger
-        .change_status(&tx, "failed".into(), "it has failed".to_owned())
+        .change_status(&new_rev_id, "failed".into(), "it has failed".to_owned())
         .await
         .expect("valid");
 

+ 6 - 5
utxo/src/tests/mod.rs

@@ -1,5 +1,6 @@
 use crate::{
-    storage::sqlite::SQLite, storage::Storage, AccountId, Amount, Error, Ledger, Status, TxId,
+    storage::{sqlite::SQLite, Storage},
+    AccountId, Amount, Error, Ledger, RevId, Status,
 };
 use sqlx::sqlite::SqlitePoolOptions;
 
@@ -37,15 +38,15 @@ pub async fn withdrawal(
     account_id: &AccountId,
     status: Status,
     amount: Amount,
-) -> Result<TxId, Error> {
+) -> Result<RevId, Error> {
     Ok(ledger
         .withdrawal(account_id, amount, status, "Test".to_owned())
         .await?
-        .id()
+        .revision_id
         .clone())
 }
 
-pub async fn deposit<S>(ledger: &Ledger<S>, account_id: &AccountId, amount: Amount) -> TxId
+pub async fn deposit<S>(ledger: &Ledger<S>, account_id: &AccountId, amount: Amount) -> RevId
 where
     S: Storage + Send + Sync,
 {
@@ -53,7 +54,7 @@ where
         .deposit(account_id, amount, "settled".into(), "Test".to_owned())
         .await
         .expect("valid tx")
-        .id()
+        .revision_id
         .clone()
 }
 

+ 3 - 3
utxo/src/tests/tx.rs

@@ -39,11 +39,11 @@ async fn multi_account_transfers() {
         ledger.get_balance(&target).await.expect("balance")
     );
 
-    let exchange_id = ledger
+    let rev_id = ledger
         .get_transactions(&accounts[0], vec![Type::Exchange])
         .await
         .expect("valid")[0]
-        .id()
+        .revision_id
         .clone();
 
     for _ in &accounts {
@@ -52,6 +52,6 @@ async fn multi_account_transfers() {
             .await
             .expect("valid");
         assert_eq!(1, txs.len());
-        assert_eq!(exchange_id, *txs[0].id());
+        assert_eq!(rev_id, txs[0].revision_id);
     }
 }

+ 2 - 2
utxo/src/tests/withdrawal.rs

@@ -174,7 +174,7 @@ async fn cancelled_withdrawal() {
         ledger.get_balance(&fee).await.expect("balance")
     );
 
-    let tx_id = withdrawal(
+    let rev_id = withdrawal(
         &ledger,
         &source,
         "pending".into(),
@@ -198,7 +198,7 @@ async fn cancelled_withdrawal() {
     );
 
     ledger
-        .change_status(&tx_id, "cancelled".into(), "cancelled by test".to_owned())
+        .change_status(&rev_id, "cancelled".into(), "cancelled by test".to_owned())
         .await
         .expect("valid tx");
 

+ 51 - 36
utxo/src/transaction/mod.rs

@@ -1,7 +1,7 @@
 use crate::{
     config::Config,
     payment::PaymentTo,
-    status::StatusType,
+    status::InternalStatus,
     storage::{Batch, ReceivedPaymentStatus, Storage},
     AccountId, Amount, PaymentFrom, RevId, Status, TxId,
 };
@@ -55,6 +55,10 @@ pub struct Transaction {
     #[serde(rename = "_id")]
     pub id: TxId,
 
+    /// List of all revisions
+    #[serde(rename = "_all_revs")]
+    pub revisions: Vec<RevId>,
+
     /// Revision ID
     #[serde(rename = "_rev")]
     pub revision_id: RevId,
@@ -76,15 +80,16 @@ impl Deref for Transaction {
     }
 }
 
-impl TryFrom<(BaseTx, Revision)> for Transaction {
+impl TryFrom<(Vec<RevId>, BaseTx, Revision)> for Transaction {
     type Error = Error;
 
-    fn try_from(value: (BaseTx, Revision)) -> Result<Self, Self::Error> {
+    fn try_from(value: (Vec<RevId>, BaseTx, Revision)) -> Result<Self, Self::Error> {
         Ok(Self {
-            id: value.0.id()?,
-            revision_id: value.1.rev_id()?,
-            revision: value.1,
-            transaction: value.0,
+            id: value.1.id()?,
+            revisions: value.0,
+            revision_id: value.2.rev_id()?,
+            transaction: value.1,
+            revision: value.2,
         })
     }
 }
@@ -104,10 +109,12 @@ impl Transaction {
             .collect();
 
         let (transaction, revision) = BaseTx::new(spends, creates, reference, typ, status)?;
+        let revision_id = revision.rev_id()?;
 
         Ok(Self {
             id: transaction.id()?,
-            revision_id: revision.rev_id()?,
+            revisions: vec![revision_id.clone()],
+            revision_id,
             revision,
             transaction,
         })
@@ -129,10 +136,12 @@ impl Transaction {
             .collect();
         let (transaction, revision) =
             BaseTx::new(Vec::new(), creates, reference, Type::Deposit, status)?;
+        let revision_id = revision.rev_id()?;
 
         Ok(Self {
             id: transaction.id()?,
-            revision_id: revision.rev_id()?,
+            revisions: vec![revision_id.clone()],
+            revision_id,
             revision,
             transaction,
         })
@@ -148,20 +157,17 @@ impl Transaction {
     ) -> Result<Self, Error> {
         let (transaction, revision) =
             BaseTx::new(spends, Vec::new(), reference, Type::Withdrawal, status)?;
+        let revision_id = revision.rev_id()?;
 
         Ok(Self {
             id: transaction.id()?,
-            revision_id: revision.rev_id()?,
+            revisions: vec![revision_id.clone()],
+            revision_id,
             transaction,
             revision,
         })
     }
 
-    /// Returns the transaction ID
-    pub fn id(&self) -> &TxId {
-        &self.id
-    }
-
     /// Updates the transaction tags
     pub async fn set_tags<S>(
         self,
@@ -180,10 +186,14 @@ impl Transaction {
             status: self.revision.status,
             created_at: Utc::now(),
         };
+        let revision_id = new_revision.rev_id()?;
+        let mut revisions = self.revisions;
+        revisions.push(revision_id.clone());
 
         let mut new_transaction = Transaction {
             id: self.id,
-            revision_id: new_revision.rev_id()?,
+            revisions,
+            revision_id,
             transaction: self.transaction,
             revision: new_revision,
         };
@@ -219,12 +229,16 @@ impl Transaction {
             status: new_status,
             created_at: Utc::now(),
         };
+        let revision_id = new_revision.rev_id()?;
+
         let mut new_transaction = Transaction {
             id: self.id,
-            revision_id: new_revision.rev_id()?,
+            revisions: self.revisions,
+            revision_id: revision_id.clone(),
             transaction: self.transaction,
             revision: new_revision,
         };
+        new_transaction.revisions.push(revision_id);
         new_transaction.persist(config).await?;
         Ok(new_transaction)
     }
@@ -291,25 +305,26 @@ impl Transaction {
             self.store_transaction::<S>(&mut batch).await?;
         }
 
-        let (created_updated, spent_updated) = match config.status.typ(&self.revision.status) {
-            StatusType::Reverted => batch
-                .update_transaction_payments(
-                    &self.id,
-                    ReceivedPaymentStatus::Failed,
-                    ReceivedPaymentStatus::Spendable,
-                )
-                .await
-                .expect("foo0"),
-            StatusType::Spendable => batch
-                .update_transaction_payments(
-                    &self.id,
-                    ReceivedPaymentStatus::Spendable,
-                    ReceivedPaymentStatus::Spent,
-                )
-                .await
-                .expect("foo1"),
-            _ => (self.creates.len(), self.spends.len()),
-        };
+        let (created_updated, spent_updated) =
+            match config.status.internal_type(&self.revision.status) {
+                InternalStatus::Reverted => batch
+                    .update_transaction_payments(
+                        &self.id,
+                        ReceivedPaymentStatus::Failed,
+                        ReceivedPaymentStatus::Spendable,
+                    )
+                    .await
+                    .expect("foo0"),
+                InternalStatus::Spendable => batch
+                    .update_transaction_payments(
+                        &self.id,
+                        ReceivedPaymentStatus::Spendable,
+                        ReceivedPaymentStatus::Spent,
+                    )
+                    .await
+                    .expect("foo1"),
+                _ => (self.creates.len(), self.spends.len()),
+            };
 
         if self.creates.len() != created_updated || self.spends.len() != spent_updated {
             return Err(Error::NoUpdate);