Bladeren bron

Working on change back

Cesar Rodas 1 jaar geleden
bovenliggende
commit
68cd570376

+ 11 - 0
utxo/src/amount.rs

@@ -39,4 +39,15 @@ impl Amount {
     pub fn cents(&self) -> AmountCents {
         self.cents
     }
+
+    pub fn checked_add(&self, other: &Self) -> Option<Self> {
+        if self.asset != other.asset {
+            return None;
+        }
+
+        self.cents.checked_add(other.cents).map(|cents| Self {
+            asset: self.asset,
+            cents,
+        })
+    }
 }

+ 8 - 1
utxo/src/asset.rs

@@ -1,3 +1,4 @@
+use crate::{amount::AmountCents, Amount};
 use serde::{Deserialize, Serialize};
 use std::fmt::Display;
 
@@ -14,7 +15,13 @@ pub type AssetId = u32;
 #[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)]
 pub struct Asset {
     pub id: AssetId,
-    precision: u8,
+    pub(crate) precision: u8,
+}
+
+impl Asset {
+    pub fn new_amount(&self, cents: AmountCents) -> Amount {
+        Amount::new(*self, cents)
+    }
 }
 
 impl Display for Asset {

+ 17 - 6
utxo/src/asset_manager.rs

@@ -3,23 +3,25 @@ use std::{collections::HashMap, sync::Arc};
 
 #[derive(Clone, Debug, Eq, PartialEq)]
 pub struct AssetManager {
-    assets: HashMap<AssetId, AssetDefinition>,
+    assets: Arc<HashMap<AssetId, AssetDefinition>>,
 }
 
 #[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize)]
 pub struct AssetDefinition {
     #[serde(flatten)]
-    asset: Asset,
+    pub asset: Asset,
     name: Arc<str>,
 }
 
 impl AssetManager {
     pub fn new(assets: Vec<AssetDefinition>) -> Self {
         Self {
-            assets: assets
-                .into_iter()
-                .map(|asset| (asset.asset.id, asset))
-                .collect(),
+            assets: Arc::new(
+                assets
+                    .into_iter()
+                    .map(|asset| (asset.asset.id, asset))
+                    .collect(),
+            ),
         }
     }
 
@@ -41,3 +43,12 @@ impl AssetManager {
         self.asset(id).map(|asset| Amount::new(asset, cents))
     }
 }
+
+impl AssetDefinition {
+    pub fn new(id: u32, name: &str, precision: u8) -> Self {
+        Self {
+            asset: Asset { id, precision },
+            name: name.into(),
+        }
+    }
+}

+ 101 - 0
utxo/src/id.rs

@@ -0,0 +1,101 @@
+use serde::Serialize;
+use sha2::{Digest, Sha256};
+use std::fmt::Display;
+
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
+    #[error("Invalid length for {0}: {1} (expected: {2})")]
+    InvalidLength(String, usize, usize),
+}
+
+macro_rules! Id {
+    ($id:ident, $suffix:expr) => {
+        #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
+        pub struct $id {
+            bytes: [u8; 32],
+        }
+
+        impl $id {
+            pub fn new(bytes: [u8; 32]) -> Self {
+                Self { bytes }
+            }
+        }
+
+        impl std::str::FromStr for $id {
+            type Err = Error;
+            fn from_str(value: &str) -> Result<Self, Self::Err> {
+                Ok(Self::try_from(value).unwrap_or_else(|_| {
+                    let mut hasher = Sha256::new();
+                    hasher.update(&value);
+                    Self {
+                        bytes: hasher.finalize().into(),
+                    }
+                }))
+            }
+        }
+
+        impl TryFrom<&str> for $id {
+            type Error = Error;
+            fn try_from(value: &str) -> Result<Self, Self::Error> {
+                if $suffix.len() + 64 != value.len() {
+                    return Err(Error::InvalidLength(
+                        stringify!($id).to_owned(),
+                        value.len(),
+                        $suffix.len() + 64,
+                    ));
+                }
+
+                if !value.starts_with($suffix) {
+                    return Err(Error::InvalidLength(
+                        stringify!($id).to_owned(),
+                        value.len(),
+                        $suffix.len() + 64,
+                    ));
+                }
+
+                let bytes = hex::decode(&value[$suffix.len()..]).map_err(|_| {
+                    Error::InvalidLength(stringify!($id).to_owned(), value.len(), 32)
+                })?;
+                bytes.try_into()
+            }
+        }
+
+        impl TryFrom<Vec<u8>> for $id {
+            type Error = Error;
+
+            fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
+                if value.len() != 32 {
+                    return Err(Error::InvalidLength(
+                        stringify!($id).to_owned(),
+                        value.len(),
+                        32,
+                    ));
+                }
+                let mut bytes = [0u8; 32];
+                bytes.copy_from_slice(&value);
+                Ok(Self { bytes })
+            }
+        }
+
+        impl Display for $id {
+            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                write!(f, "{}{}", $suffix, hex::encode(self.bytes))
+            }
+        }
+
+        impl AsRef<[u8]> for $id {
+            fn as_ref(&self) -> &[u8] {
+                &self.bytes
+            }
+        }
+
+        impl AsRef<[u8; 32]> for $id {
+            fn as_ref(&self) -> &[u8; 32] {
+                &self.bytes
+            }
+        }
+    };
+}
+
+Id!(AccountId, "account");
+Id!(TransactionId, "tx");

+ 162 - 78
utxo/src/ledger.rs

@@ -1,57 +1,45 @@
 use crate::{
-    amount::AmountCents, storage, AccountId, Amount, AssetManager, Batch, Error, PaymentId,
-    Storage, Transaction,
+    AccountId, Amount, Batch, Error, Payment, Status, Storage, Transaction, TransactionId,
 };
-use futures::TryStreamExt;
-use std::{collections::HashMap, sync::Arc};
+use std::{cmp::Ordering, collections::HashMap};
 
 #[derive(Clone, Debug)]
-pub struct Ledger<B, I, S>
+pub struct Ledger<'a, B, S>
 where
-    B: Batch,
-    I: TryStreamExt<Ok = (PaymentId, AmountCents), Error = storage::Error> + Unpin,
-    S: Storage<B, I>,
+    B: Batch<'a>,
+    S: Storage<'a, B> + Sync + Send,
 {
-    storage: Arc<S>,
-    asset_manager: Arc<AssetManager>,
-    _phantom_batch: std::marker::PhantomData<B>,
-    _phantom_iterator: std::marker::PhantomData<I>,
+    storage: S,
+    _phantom: std::marker::PhantomData<&'a B>,
 }
 
-impl<B, I, S> Ledger<B, I, S>
+impl<'a, B, S> Ledger<'a, B, S>
 where
-    B: Batch,
-    I: TryStreamExt<Ok = (PaymentId, AmountCents), Error = storage::Error> + Unpin,
-    S: Storage<B, I>,
+    B: Batch<'a>,
+    S: Storage<'a, B> + Sync + Send,
 {
-    pub fn new(storage: S, asset_manager: AssetManager) -> Self {
+    pub fn new(storage: S) -> Self {
         Self {
-            storage: Arc::new(storage),
-            asset_manager: Arc::new(asset_manager),
-            _phantom_batch: std::marker::PhantomData,
-            _phantom_iterator: std::marker::PhantomData,
+            storage,
+            _phantom: std::marker::PhantomData,
         }
     }
 
-    pub async fn get_balance(&self, account: AccountId) -> Result<Vec<Amount>, Error> {
-        self.storage
-            .get_balance(account)
-            .await?
-            .into_iter()
-            .map(|(asset, cents)| self.asset_manager.amount(asset, cents))
-            .collect::<Result<Vec<_>, _>>()
-    }
-
-    async fn normalize_payments_to_spend(
+    /// Selects the unspent payments to be used as inputs of the new transaction.
+    ///
+    /// This function also returns a list of transactions that will be used as
+    /// exchanged transactions, to make sure the main transaction doesn't hold
+    /// extra funds, by splitting any large unspent payments into two, one that
+    /// matches exactly the needed amount, and another one that will be used as
+    /// change. These exchange transaction are internal transactions and they
+    /// are created as settled.
+    async fn create_inputs_to_pay_from_accounts(
         &self,
         payments: Vec<(AccountId, Amount)>,
-    ) -> Result<Vec<(AccountId, Amount)>, Error> {
+    ) -> Result<(Vec<Transaction>, Vec<Payment>), Error> {
         let mut to_spend = HashMap::new();
-        let mut balances = HashMap::new();
+
         for (account_id, amount) in payments.into_iter() {
-            if balances.get(&account_id).is_none() {
-                balances.insert(account_id, self.get_balance(account_id).await?);
-            }
             let id = (account_id, *amount.asset());
             if let Some(value) = to_spend.get_mut(&id) {
                 *value += amount.cents();
@@ -60,57 +48,153 @@ where
             }
         }
 
+        let mut change = vec![];
+        let mut payments: Vec<Payment> = vec![];
+
         for ((account, asset), mut to_spend_cents) in to_spend.into_iter() {
-            let mut payment_ids: Vec<PaymentId> = vec![];
-            let mut iterator = self.storage.get_unspend_payments(account, asset.id).await?;
-            while let Some((payment_id, cents)) = iterator.try_next().await? {
-                payment_ids.push(payment_id);
+            let iterator = self
+                .storage
+                .get_unspent_payments(&account, asset.id, to_spend_cents)
+                .await?;
+            for payment in iterator.into_iter() {
+                let cents = payment.amount.cents();
                 to_spend_cents -= cents;
-                if to_spend_cents < 0 {
-                    // We have enough payment_id to to cover the to_spend_cents
-                    // Any negative number will be the return to the account
-                    break;
+                payments.push(payment);
+                match to_spend_cents.cmp(&0) {
+                    Ordering::Equal => {
+                        // No change amount, we are done with this input
+                        break;
+                    }
+                    Ordering::Less => {
+                        // There is a change amount, we need to split the last
+                        // input into two payment_ids into the same accounts in
+                        // a transaction that will settle immediately, otherwise
+                        // the change amount will be unspentable until this
+                        // transaction settles. By doing so the current
+                        // operation will have no change and it can safely take
+                        // its time to settle without making any change amount
+                        // unspentable.
+                        let to_spend_cents = to_spend_cents.abs();
+                        let input = payments
+                            .pop()
+                            .ok_or(Error::InsufficientBalance(account, asset.id))?;
+                        let split_input = Transaction::new(
+                            "Exchange".to_owned(),
+                            // Set the change transaction as settled. This is an
+                            // internal transaction to split an existing payment
+                            // into two. Since this is an internal transaction it
+                            // can be settled immediately.
+                            //
+                            // Because this internal transaction is being settled
+                            // immediately, the other payment can be used right away,
+                            // otherwise it would be locked until the main
+                            // transaction settles.
+                            Status::Settled,
+                            vec![input],
+                            vec![
+                                (account, asset.new_amount(cents - to_spend_cents)),
+                                (account, asset.new_amount(to_spend_cents)),
+                            ],
+                        )
+                        .await?;
+                        // Spend the new payment
+                        payments.push(split_input.created()[0].clone());
+                        // Return the split payment transaction to be executed
+                        // later as a pre-requisite for the new transaction
+                        change.push(split_input);
+
+                        // Go to the next payment input or exit the loop
+                        break;
+                    }
+                    _ => {
+                        // We need more funds, continue to the selecting the
+                        // available payment if any
+                    }
                 }
             }
+
+            if to_spend_cents > 0 {
+                // We don't have enough payment to cover the to_spend_cents
+                // Return an insufficient balance error
+                return Err(Error::InsufficientBalance(account, asset.id));
+            }
         }
 
-        /*
-               to_spend
-                   .into_iter()
-                   .map(|((account_id, asset), cents)| (account_id, Amount::new(asset, cents)))
-                   .collect::<Vec<_>>()
-                   .into_iter()
-                   .map(|(id, to_debit)| {
-                       balances
-                           .get(&id)
-                           .ok_or_else(|| Error::InsufficientBalance(id, to_debit.asset().id))
-                           .map(|balances| {
-                               if let Some(available) = balances
-                                   .iter()
-                                   .find(|asset| asset.asset() == to_debit.asset())
-                                   .map(|amount| amount.cents())
-                               {
-                                   if to_debit.cents() > available {
-                                       Err(Error::InsufficientBalance(id, to_debit.asset().id))
-                                   } else {
-                                       Ok((id, to_debit))
-                                   }
-                               } else {
-                                   Err(Error::InsufficientBalance(id, to_debit.asset().id))
-                               }
-                           })
-                   })
-                   .collect::<Result<Result<Vec<_>, _>, _>>()?
-        */
-        todo!()
+        Ok((change, payments))
     }
 
-    pub async fn new_transaction<'a>(
-        &self,
+    /// Creates a new transaction and returns it.
+    ///
+    /// The input is pretty simple, take this amounts from these given accounts
+    /// and send them to these accounts (and amounts). The job of this function
+    /// is to figure it out how to do it. This function will make sure that each
+    /// account has enough balance, selecting the unspent payments from each
+    /// account that will be spent. It will also return a list of transactions
+    /// that will be used to return the change to the accounts, these accounts
+    /// can be settled immediately so no other funds required to perform the
+    /// transaction are locked.
+    ///
+    /// This functions performs read only operations on top of the storage layer
+    /// and it will guarantee execution (meaning that it will not lock any
+    /// funds, so these transactions may fail if any selected payment is spent
+    /// between the time the transaction is created and executed).
+    ///
+    /// A NewTransaction struct is returned, the change_transactions should be
+    /// executed and set as settled before the transaction is executed,
+    /// otherwise it will fail. A failure in any execution will render the
+    /// entire operation as failed but no funds will be locked.
+    pub async fn new_transaction(
+        &'a self,
+        reference: String,
+        status: Status,
         from: Vec<(AccountId, Amount)>,
         to: Vec<(AccountId, Amount)>,
-    ) -> Result<Transaction<'a, B, I, S>, Error> {
-        let from = self.normalize_payments_to_spend(from).await?;
-        todo!()
+    ) -> Result<Transaction, Error> {
+        let (change_transactions, payments) = self.create_inputs_to_pay_from_accounts(from).await?;
+
+        for mut change_tx in change_transactions.into_iter() {
+            change_tx.persist(&self.storage).await?;
+        }
+
+        let mut transaction = Transaction::new(reference, status, payments, to).await?;
+
+        transaction.persist(&self.storage).await?;
+
+        Ok(transaction)
+    }
+
+    /// Return the balances from a given account
+    ///
+    /// The balance is a vector of Amounts, one for each asset. The balance will
+    /// return only spendable amounts, meaning that any amount that is locked in
+    /// a transaction will not be returned.
+    ///
+    /// TODO: Return locked funds as well.
+    pub async fn get_balance(&self, account: &AccountId) -> Result<Vec<Amount>, Error> {
+        Ok(self.storage.get_balance(account).await?)
+    }
+
+    pub async fn deposit(
+        &'a self,
+        account: &AccountId,
+        amount: Amount,
+        status: Status,
+        reference: String,
+    ) -> Result<Transaction, Error> {
+        let mut transaction =
+            Transaction::new_external_deposit(reference, status, vec![(*account, amount)])?;
+        transaction.persist(&self.storage).await?;
+        Ok(transaction)
+    }
+
+    pub async fn change_status(
+        &'a self,
+        transaction_id: &TransactionId,
+        new_status: Status,
+    ) -> Result<Transaction, Error> {
+        let mut tx = self.storage.get_transaction(transaction_id).await?;
+        tx.change_status(new_status)?;
+        tx.persist(&self.storage).await?;
+        Ok(tx)
     }
 }

+ 4 - 2
utxo/src/lib.rs

@@ -1,22 +1,24 @@
-mod account_id;
 mod amount;
 mod asset;
 mod asset_manager;
 mod error;
+mod id;
 mod ledger;
 mod payment;
 #[cfg(any(feature = "sqlite", test))]
 mod sqlite;
 mod status;
 mod storage;
+#[cfg(test)]
+mod tests;
 mod transaction;
 
 pub use self::{
-    account_id::AccountId,
     amount::Amount,
     asset::Asset,
     asset_manager::AssetManager,
     error::Error,
+    id::*,
     ledger::Ledger,
     payment::{Payment, PaymentId},
     status::Status,

+ 6 - 4
utxo/src/payment.rs

@@ -1,15 +1,15 @@
-use crate::{AccountId, Amount};
+use crate::{AccountId, Amount, Status, TransactionId};
 use serde::Serialize;
 
 #[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize)]
 pub struct PaymentId {
-    pub transaction: AccountId,
+    pub transaction: TransactionId,
     pub position: usize,
 }
 
 impl ToString for PaymentId {
     fn to_string(&self) -> String {
-        format!("{}:{}", hex::encode(self.transaction), self.position)
+        format!("{}:{}", self.transaction, self.position)
     }
 }
 
@@ -19,5 +19,7 @@ pub struct Payment {
     pub to: AccountId,
     pub amount: Amount,
     #[serde(skip)]
-    pub spent_by: Option<AccountId>,
+    pub status: Status,
+    #[serde(skip)]
+    pub spent_by: Option<TransactionId>,
 }

+ 160 - 0
utxo/src/sqlite/batch.rs

@@ -0,0 +1,160 @@
+use crate::{
+    storage::{self, Error},
+    Payment, PaymentId, Status, Transaction, TransactionId,
+};
+use sqlx::{Row, Sqlite, Transaction as SqlxTransaction};
+use std::marker::PhantomData;
+
+pub struct Batch<'a> {
+    inner: SqlxTransaction<'a, Sqlite>,
+    x: PhantomData<&'a ()>,
+}
+
+impl<'a> Batch<'a> {
+    pub fn new(inner: SqlxTransaction<'a, Sqlite>) -> Batch<'a> {
+        Self {
+            inner,
+            x: PhantomData,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl<'a> storage::Batch<'a> for Batch<'a> {
+    async fn spend_payment(
+        &mut self,
+        payment_id: PaymentId,
+        status: Status,
+        transaction_id: TransactionId,
+    ) -> Result<(), Error> {
+        let result = sqlx::query(
+            r#"
+                UPDATE payments SET "spent_by" = ?
+                WHERE "transaction_id" = ? AND "position_id" = ? AND ("spent_by" IS NULL OR "spent_by" = ?)
+            "#,
+        )
+        .bind(if status.is_rollback() {
+            None
+        } else {
+            Some(transaction_id.to_string())
+        })
+        .bind(payment_id.transaction.to_string())
+        .bind(payment_id.position.to_string())
+        .bind(transaction_id.to_string())
+        .execute(&mut *self.inner)
+        .await
+        .map_err(|e| Error::SpendPayment(e.to_string()))?;
+        if result.rows_affected() == 1 {
+            Ok(())
+        } else {
+            Err(Error::NoUpdate)
+        }
+    }
+
+    async fn get_payment_status(
+        &mut self,
+        transaction_id: &TransactionId,
+    ) -> Result<Option<Status>, Error> {
+        let row = sqlx::query(
+            r#"
+            SELECT
+                "p"."status"
+            FROM
+                "payments" "p"
+            WHERE
+                "p"."transaction_id" = ?
+            LIMIT 1
+            "#,
+        )
+        .bind(transaction_id.to_string())
+        .fetch_optional(&mut *self.inner)
+        .await
+        .map_err(|e| Error::Storage(e.to_string()))?;
+
+        if let Some(row) = row {
+            let status = row
+                .try_get::<u32, usize>(0)
+                .map_err(|_| Error::Storage("failed to parse status".to_owned()))?;
+
+            status
+                .try_into()
+                .map(|x| Some(x))
+                .map_err(|_| Error::Storage("failed to parse status".to_owned()))
+        } else {
+            return Ok(None);
+        }
+    }
+
+    async fn rollback(self) -> Result<(), Error> {
+        self.inner
+            .rollback()
+            .await
+            .map_err(|e| Error::Storage(e.to_string()))
+    }
+
+    async fn commit(self) -> Result<(), Error> {
+        self.inner
+            .commit()
+            .await
+            .map_err(|e| Error::Storage(e.to_string()))
+    }
+
+    async fn store_new_payments(&mut self, payments: &[Payment]) -> Result<(), Error> {
+        for payment in payments.iter() {
+            sqlx::query(
+                r#"
+                INSERT INTO payments("transaction_id", "position_id", "to", "cents", "asset_id", "status")
+                VALUES (?, ?, ?, ?, ?, ?)
+                ON CONFLICT("transaction_id", "position_id")
+                    DO UPDATE SET "status" = excluded."status"
+            "#,
+            )
+            .bind(payment.id.transaction.to_string())
+            .bind(payment.id.position.to_string())
+            .bind(payment.to.to_string())
+            .bind(payment.amount.cents().to_string())
+            .bind(payment.amount.asset().id)
+            .bind::<u32>((&payment.status).into())
+            .execute(&mut *self.inner)
+            .await
+            .map_err(|e| Error::Storage(e.to_string()))?;
+        }
+        Ok(())
+    }
+
+    async fn store_transaction(&mut self, transaction: &Transaction) -> Result<(), Error> {
+        sqlx::query(
+            r#"
+                INSERT INTO "transactions"("transaction_id", "status", "reference")
+                VALUES(?, ?, ?)
+                ON CONFLICT("transaction_id")
+                    DO UPDATE SET "status" = excluded."status", "reference" = excluded."reference"
+            "#,
+        )
+        .bind(transaction.id.to_string())
+        .bind::<u32>((&transaction.status).into())
+        .bind(transaction.reference.to_string())
+        .execute(&mut *self.inner)
+        .await
+        .map_err(|e| Error::Storage(e.to_string()))?;
+
+        for payment in transaction.spend.iter() {
+            sqlx::query(
+                r#"
+            INSERT INTO "transaction_payments"("transaction_id", "payment_transaction_id", "payment_position_id")
+            VALUES(?, ?, ?)
+            ON CONFLICT("transaction_id", "payment_transaction_id", "payment_position_id")
+                DO NOTHING
+            "#,
+            )
+            .bind(transaction.id.to_string())
+            .bind(payment.id.transaction.to_string())
+            .bind(payment.id.position.to_string())
+            .execute(&mut *self.inner)
+            .await
+            .map_err(|e| Error::Storage(e.to_string()))?;
+        }
+
+        Ok(())
+    }
+}

+ 365 - 57
utxo/src/sqlite/mod.rs

@@ -1,81 +1,389 @@
-use std::marker::PhantomData;
-
 use crate::{
-    storage::{self, Error},
-    AccountId, Payment, PaymentId,
+    amount::AmountCents, asset::AssetId, storage::Error, AccountId, Amount, Asset, AssetManager,
+    Payment, PaymentId, Status, Storage, Transaction, TransactionId,
 };
-use sqlx::{Sqlite, Transaction as SqlxTransaction};
+use futures::TryStreamExt;
+use sqlx::{sqlite::SqliteRow, Executor, Row};
+use std::{collections::HashMap, marker::PhantomData};
+
+mod batch;
+
+pub use batch::Batch;
+
+pub struct Sqlite<'a> {
+    db: sqlx::SqlitePool,
+    asset_manager: AssetManager,
+    _phantom: PhantomData<&'a ()>,
+}
+
+impl<'a> Sqlite<'a> {
+    pub fn new(db: sqlx::SqlitePool, asset_manager: AssetManager) -> Self {
+        Self {
+            db,
+            asset_manager,
+            _phantom: PhantomData,
+        }
+    }
+
+    pub async fn setup(&self) -> Result<(), sqlx::Error> {
+        let mut x = self.db.begin().await?;
+        x.execute(
+            r#"CREATE TABLE IF NOT EXISTS "payments" (
+                "transaction_id" VARCHAR(66) NOT NULL,
+                "position_id" INTEGER NOT NULL,
+                "asset_id" TEXT NOT NULL,
+                "cents" TEXT NOT NULL,
+                "status" INTEGER NOT NULL,
+                "to" VARCHAR(71) NOT NULL,
+                "spent_by" TEXT,
+                "created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
+                "updated_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
+                PRIMARY KEY ("transaction_id", "position_id")
+            );
+            CREATE INDEX IF NOT EXISTS payments_to ON payments ("to", "asset_id", "status", "spent_by");
+            CREATE TABLE IF NOT EXISTS "transactions" (
+                "transaction_id" VARCHAR(66) NOT NULL,
+                "status" INTEGER NOT NULL,
+                "reference" TEXT NOT NULL,
+                "created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
+                "updated_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
+                PRIMARY KEY ("transaction_id")
+            );
+            CREATE TABLE IF NOT EXISTS "transaction_payments" (
+                "transaction_id" VARCHAR(66) NOT NULL,
+                "payment_transaction_id" VARCHAR(66) NOT NULL,
+                "payment_position_id"  INTEGER NOT NULL,
+                "created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
+                "updated_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
+                PRIMARY KEY ("transaction_id", "payment_transaction_id", "payment_position_id")
+            );
+        "#,
+        )
+        .await
+        .expect("valid");
+        x.commit().await?;
+        Ok(())
+    }
+
+    #[inline]
+    fn sql_row_to_payment(&self, row: SqliteRow) -> Result<Payment, Error> {
+        let id = PaymentId {
+            transaction: row
+                .try_get::<String, usize>(0)
+                .map_err(|_| Error::Storage("Invalid payment_id".to_string()))?
+                .as_str()
+                .try_into()
+                .map_err(|_| Error::Storage("Invalid transaction_id length".to_string()))?,
+            position: row
+                .try_get::<i64, usize>(1)
+                .map_err(|_| Error::Storage("Invalid payment_id".to_string()))?
+                .try_into()
+                .map_err(|_| Error::Storage("Invalid payment_id".to_string()))?,
+        };
+
+        let cents = row
+            .try_get::<String, usize>(3)
+            .map_err(|_| Error::Storage("Invalid cents".to_string()))?
+            .parse::<i128>()
+            .map_err(|_| Error::Storage("Invalid cents".to_string()))?;
 
-pub struct Batch<'a> {
-    inner: SqlxTransaction<'a, Sqlite>,
-    x: PhantomData<&'a ()>,
+        Ok(Payment {
+            id,
+            amount: self
+                .asset_manager
+                .asset(
+                    row.try_get::<String, usize>(2)
+                        .map_err(|_| Error::Storage("Invalid asset_id".to_string()))?
+                        .parse()
+                        .map_err(|_| Error::Storage("Invalid asset_id".to_string()))?,
+                )
+                .map_err(|e| Error::Storage(e.to_string()))?
+                .new_amount(cents),
+            to: row
+                .try_get::<String, usize>(4)
+                .map_err(|_| Error::Storage("Invalid `to`".to_string()))?
+                .as_str()
+                .try_into()
+                .map_err(|_| Error::Storage("Invalid `to`".to_string()))?,
+            status: row
+                .try_get::<u32, usize>(5)
+                .map_err(|_| Error::Storage("Invalid `status`".to_string()))?
+                .try_into()
+                .map_err(|_| Error::Storage("Invalid status".to_string()))?,
+            spent_by: row
+                .try_get::<Option<String>, usize>(6)
+                .map_err(|_| Error::Storage("Invalid spent_by".to_string()))?
+                .map(|s| s.as_str().try_into())
+                .transpose()
+                .map_err(|_| Error::Storage("Invalid spent_by".to_string()))?,
+        })
+    }
 }
 
 #[async_trait::async_trait]
-impl<'a> storage::Batch for Batch<'a> {
-    async fn spend_payment(
-        &mut self,
-        payment_id: PaymentId,
-        transaction_id: AccountId,
-    ) -> Result<(), Error> {
-        let result = sqlx::query(
+impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
+    async fn begin(&'a self) -> Result<Batch<'a>, Error> {
+        self.db
+            .begin()
+            .await
+            .map(|x| Batch::new(x))
+            .map_err(|x| Error::Storage(x.to_string()))
+    }
+
+    async fn get_payment(&self, id: PaymentId) -> Result<Payment, Error> {
+        let mut conn = self
+            .db
+            .acquire()
+            .await
+            .map_err(|e| Error::Storage(e.to_string()))?;
+
+        let row = sqlx::query(
             r#"
-                UPDATE payments SET spent_by = ?
-                WHERE transaction_id = ? AND position_id = ? AND spent_by IS NULL
+            SELECT
+                "p"."transaction_id",
+                "p"."position_id",
+                "p"."asset_id",
+                "p"."cents",
+                "p"."to",
+                "p"."status",
+                "p"."spent_by"
+            FROM
+                "payments" "p"
+            WHERE
+                "p"."transaction_id" = ?
+                AND "p"."position_id" = ?
+            LIMIT 1
             "#,
         )
-        .bind(transaction_id.to_string())
-        .bind(payment_id.transaction.to_string())
-        .bind(payment_id.position.to_string())
-        .execute(&mut *self.inner)
+        .bind(id.transaction.to_string())
+        .bind(id.position.to_string())
+        .fetch_optional(&mut *conn)
         .await
-        .map_err(|e| Error::Storage(e.to_string()))?;
-        if result.rows_affected() == 1 {
-            Ok(())
-        } else {
-            Err(Error::NoUpdate)
-        }
+        .map_err(|e| Error::Storage(e.to_string()))?
+        .ok_or(Error::NotFound)?;
+
+        self.sql_row_to_payment(row)
     }
 
-    async fn rollback(self) -> Result<(), Error> {
-        self.inner
-            .rollback()
+    async fn get_balance(&self, account: &AccountId) -> Result<Vec<Amount>, Error> {
+        let mut conn = self
+            .db
+            .acquire()
             .await
-            .map_err(|e| Error::Storage(e.to_string()))
-    }
+            .map_err(|e| Error::Storage(e.to_string()))?;
 
-    async fn commit(self) -> Result<(), Error> {
-        self.inner
-            .commit()
+        let mut result = sqlx::query(
+            r#"
+            SELECT
+                "asset_id",
+                "cents"
+            FROM
+                "payments"
+            WHERE
+                "to" = ? AND status = ? AND "spent_by" IS NULL
+            "#,
+        )
+        .bind(account.to_string())
+        .bind::<u32>(Status::Settled.into())
+        .fetch(&mut *conn);
+
+        let mut balances = HashMap::<Asset, Amount>::new();
+
+        while let Some(row) = result
+            .try_next()
             .await
-            .map_err(|e| Error::Storage(e.to_string()))
+            .map_err(|e| Error::Storage(e.to_string()))?
+        {
+            let asset = self
+                .asset_manager
+                .asset(
+                    row.try_get::<String, usize>(0)
+                        .map_err(|_| Error::Storage("Invalid asset_id".to_string()))?
+                        .parse()
+                        .map_err(|_| Error::Storage("Invalid asset_id".to_string()))?,
+                )
+                .map_err(|e| Error::Storage(e.to_string()))?;
+
+            let cents = row
+                .try_get::<String, usize>(1)
+                .map_err(|_| Error::Storage("Invalid cents".to_string()))?
+                .parse::<i128>()
+                .map_err(|_| Error::Storage("Invalid cents".to_string()))?;
+
+            let new_amount = asset.new_amount(cents);
+
+            if let Some(amount) = balances.get_mut(&asset) {
+                *amount = amount
+                    .checked_add(&new_amount)
+                    .ok_or(Error::Storage("amount overflow".to_owned()))?;
+            } else {
+                balances.insert(asset, new_amount);
+            }
+        }
+
+        Ok(balances.into_iter().map(|(_, v)| v).collect())
     }
 
-    async fn store_new_payments(&mut self, payments: &[Payment]) -> Result<(), Error> {
-        for payment in payments.iter() {
-            sqlx::query(
-                r#"
-                INSERT INTO payments(transaction_id, position_id, to, amount, asset, spent_by)
-                VALUES (?, ?, ?, ?, ?, ?)
-            "#,
-            )
-            .bind(payment.id.transaction.to_string())
-            .bind(payment.id.position.to_string())
-            .bind(payment.to.to_string())
-            .bind(payment.amount.cents().to_string())
-            .bind(payment.amount.asset().id)
-            .bind(payment.spent_by.map(|id| id.to_string()))
-            .execute(&mut *self.inner)
+    async fn get_unspent_payments(
+        &self,
+        account: &AccountId,
+        asset: AssetId,
+        mut target_amount: AmountCents,
+    ) -> Result<Vec<Payment>, Error> {
+        let mut conn = self
+            .db
+            .acquire()
             .await
             .map_err(|e| Error::Storage(e.to_string()))?;
+        let mut result = sqlx::query(
+            r#"
+            SELECT
+                "p"."transaction_id",
+                "p"."position_id",
+                "p"."asset_id",
+                "p"."cents",
+                "p"."to",
+                "p"."status",
+                "p"."spent_by"
+            FROM
+                "payments" as "p"
+            WHERE
+                "p"."to" = ? AND "p"."asset_id" = ? AND status = ? AND "p"."spent_by" IS NULL
+            ORDER BY cents ASC
+            "#,
+        )
+        .bind(account.to_string())
+        .bind(asset.to_string())
+        .bind::<u32>(Status::Settled.into())
+        .fetch(&mut *conn);
+
+        let mut to_return = vec![];
+        while let Some(row) = result
+            .try_next()
+            .await
+            .map_err(|e| Error::Storage(e.to_string()))?
+        {
+            let row = self.sql_row_to_payment(row)?;
+            target_amount -= row.amount.cents();
+            to_return.push(row);
+            if target_amount <= 0 {
+                break;
+            }
+        }
+
+        if target_amount <= 0 {
+            Ok(to_return)
+        } else {
+            Err(Error::NotEnoughUnspentPayments(target_amount))
         }
-        Ok(())
     }
 
-    async fn store_transaction(&mut self) -> Result<(), Error> {
-        todo!()
+    async fn get_transaction(&self, transaction_id: &TransactionId) -> Result<Transaction, Error> {
+        let mut conn = self
+            .db
+            .acquire()
+            .await
+            .map_err(|e| Error::Storage(e.to_string()))?;
+
+        let transaction_row = sqlx::query(
+            r#"
+            SELECT
+                "t"."status",
+                "t"."reference"
+            FROM
+                "transactions" "t"
+            WHERE
+                "t"."transaction_id" = ?
+            "#,
+        )
+        .bind(transaction_id.to_string())
+        .fetch_optional(&mut *conn)
+        .await
+        .map_err(|e| Error::Storage(e.to_string()))?
+        .ok_or(Error::NotFound)?;
+
+        let mut spend_result = sqlx::query(
+            r#"
+            SELECT
+                "p"."transaction_id",
+                "p"."position_id",
+                "p"."asset_id",
+                "p"."cents",
+                "p"."to",
+                "p"."status",
+                "p"."spent_by"
+            FROM
+                "payments" "p"
+            INNER JOIN 
+                "transaction_payments" "tp" 
+                ON (
+                    "tp"."payment_transaction_id" = "p"."transaction_id" 
+                    AND "tp"."payment_position_id" = "p"."position_id"
+                )
+            WHERE
+                "tp"."transaction_id" = ?
+            "#,
+        )
+        .bind(transaction_id.to_string())
+        .fetch(&mut *conn);
+
+        let mut spend = vec![];
+
+        while let Some(row) = spend_result
+            .try_next()
+            .await
+            .map_err(|e| Error::Storage(e.to_string()))?
+        {
+            spend.push(self.sql_row_to_payment(row)?);
+        }
+
+        drop(spend_result);
+
+        let mut create_result = sqlx::query(
+            r#"
+            SELECT
+                "p"."transaction_id",
+                "p"."position_id",
+                "p"."asset_id",
+                "p"."cents",
+                "p"."to",
+                "p"."status",
+                "p"."spent_by"
+            FROM
+                "payments" "p"
+            WHERE
+                "p"."transaction_id" = ?
+            "#,
+        )
+        .bind(transaction_id.to_string())
+        .fetch(&mut *conn);
+
+        let mut create = vec![];
+
+        while let Some(row) = create_result
+            .try_next()
+            .await
+            .map_err(|e| Error::Storage(e.to_string()))?
+        {
+            create.push(self.sql_row_to_payment(row)?);
+        }
+
+        let status = transaction_row
+            .try_get::<u32, usize>(0)
+            .map_err(|_| Error::Storage("Invalid status".to_string()))?
+            .try_into()
+            .map_err(|_| Error::Storage("Invalid status".to_string()))?;
+        let reference = transaction_row
+            .try_get::<String, usize>(1)
+            .map_err(|_| Error::Storage("Invalid reference".to_string()))?;
+
+        Ok(Transaction {
+            id: transaction_id.clone(),
+            is_external_deposit: spend.is_empty(),
+            spend,
+            create,
+            status,
+            reference,
+        })
     }
 }
-
-#[cfg(test)]
-mod test {}

+ 58 - 2
utxo/src/status.rs

@@ -37,6 +37,50 @@ impl Default for Status {
     }
 }
 
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
+    #[error("Invalid status: {0}")]
+    InvalidStatus(u32),
+}
+
+impl TryFrom<u32> for Status {
+    type Error = Error;
+    fn try_from(value: u32) -> Result<Self, Self::Error> {
+        match value {
+            0 => Ok(Self::Pending),
+            10 => Ok(Self::Processing),
+            20 => Ok(Self::Cancelled),
+            30 => Ok(Self::Settled),
+            40 => Ok(Self::Failed),
+            _ => Err(Error::InvalidStatus(value)),
+        }
+    }
+}
+
+impl From<Status> for u32 {
+    fn from(value: Status) -> Self {
+        match value {
+            Status::Pending => 0,
+            Status::Processing => 10,
+            Status::Cancelled => 20,
+            Status::Settled => 30,
+            Status::Failed => 40,
+        }
+    }
+}
+
+impl From<&Status> for u32 {
+    fn from(value: &Status) -> Self {
+        match value {
+            Status::Pending => 0,
+            Status::Processing => 10,
+            Status::Cancelled => 20,
+            Status::Settled => 30,
+            Status::Failed => 40,
+        }
+    }
+}
+
 impl Status {
     /// Checks if the current status can transition to the new state.
     pub fn can_transition_to(&self, new_status: &Status) -> bool {
@@ -46,13 +90,20 @@ impl Status {
             match self {
                 Self::Pending => true,
                 Self::Processing => {
-                    matches!(new_status, Self::Cancelled | Self::Settled | Self::Failed)
+                    matches!(new_status, Self::Settled | Self::Failed)
                 }
                 _ => false,
             }
         }
     }
 
+    /// Checks if the current status is a rollback operation.
+    ///
+    /// In a rollback operation any previously spent payments are released by the storage layer.
+    pub fn is_rollback(&self) -> bool {
+        matches!(self, Self::Cancelled | Self::Failed)
+    }
+
     /// Checks if the transaction status is finalized
     pub fn is_finalized(&self) -> bool {
         matches!(self, Self::Cancelled | Self::Settled | Self::Failed)
@@ -72,6 +123,7 @@ mod test {
         assert!(status.can_transition_to(&Status::Settled));
         assert!(status.can_transition_to(&Status::Failed));
         assert!(!status.is_finalized());
+        assert!(!status.is_rollback());
     }
 
     #[test]
@@ -79,10 +131,11 @@ mod test {
         let status = Status::Processing;
         assert!(!status.can_transition_to(&Status::Pending));
         assert!(!status.can_transition_to(&Status::Processing));
-        assert!(status.can_transition_to(&Status::Cancelled));
+        assert!(!status.can_transition_to(&Status::Cancelled));
         assert!(status.can_transition_to(&Status::Settled));
         assert!(status.can_transition_to(&Status::Failed));
         assert!(!status.is_finalized());
+        assert!(!status.is_rollback());
     }
 
     #[test]
@@ -94,6 +147,7 @@ mod test {
         assert!(!status.can_transition_to(&Status::Settled));
         assert!(!status.can_transition_to(&Status::Failed));
         assert!(status.is_finalized());
+        assert!(status.is_rollback());
     }
 
     #[test]
@@ -105,6 +159,7 @@ mod test {
         assert!(!status.can_transition_to(&Status::Settled));
         assert!(!status.can_transition_to(&Status::Failed));
         assert!(status.is_finalized());
+        assert!(!status.is_rollback());
     }
 
     #[test]
@@ -116,5 +171,6 @@ mod test {
         assert!(!status.can_transition_to(&Status::Settled));
         assert!(!status.can_transition_to(&Status::Failed));
         assert!(status.is_finalized());
+        assert!(status.is_rollback());
     }
 }

+ 44 - 11
utxo/src/storage.rs

@@ -1,43 +1,76 @@
-use crate::{amount::AmountCents, asset::AssetId, AccountId, Payment, PaymentId};
-use futures::TryStreamExt;
+use crate::{
+    amount::AmountCents, asset::AssetId, AccountId, Amount, Payment, PaymentId, Status,
+    Transaction, TransactionId,
+};
 
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
     #[error("Storage error: {0}")]
     Storage(String),
 
+    #[error("Spend payment: {0}")]
+    SpendPayment(String),
+
     #[error("No storage update when expecting")]
     NoUpdate,
+
+    #[error("Record not found")]
+    NotFound,
+
+    /// TODO: Convert the AmountCents to Amount for better error reporting upstream
+    #[error("Not enough unspent payments (missing {0} cents)")]
+    NotEnoughUnspentPayments(AmountCents),
 }
 
 #[async_trait::async_trait]
-pub trait Batch {
+pub trait Batch<'a> {
     async fn spend_payment(
         &mut self,
         payment_id: PaymentId,
-        transaction_id: AccountId,
+        status: Status,
+        transaction_id: TransactionId,
     ) -> Result<(), Error>;
 
     async fn rollback(self) -> Result<(), Error>;
 
+    async fn get_payment_status(
+        &mut self,
+        transaction_id: &TransactionId,
+    ) -> Result<Option<Status>, Error>;
+
     async fn commit(self) -> Result<(), Error>;
 
     async fn store_new_payments(&mut self, outputs: &[Payment]) -> Result<(), Error>;
 
-    async fn store_transaction(&mut self) -> Result<(), Error>;
+    async fn store_transaction(&mut self, transaction: &Transaction) -> Result<(), Error>;
 }
 
 #[async_trait::async_trait]
-pub trait Storage<B, I>
+pub trait Storage<'a, B>
 where
-    B: Batch,
-    I: TryStreamExt<Ok = (PaymentId, AmountCents), Error = Error> + Unpin,
+    B: Batch<'a>,
 {
+    async fn begin(&'a self) -> Result<B, Error>;
+
     async fn get_payment(&self, id: PaymentId) -> Result<Payment, Error>;
 
-    async fn begin(&self) -> Result<B, Error>;
+    async fn get_unspent_payment(&self, id: PaymentId) -> Result<Payment, Error> {
+        let payment = self.get_payment(id).await?;
+        if payment.spent_by.is_some() {
+            Err(Error::NotFound)
+        } else {
+            Ok(payment)
+        }
+    }
+
+    async fn get_balance(&self, account: &AccountId) -> Result<Vec<Amount>, Error>;
 
-    async fn get_balance(&self, account: AccountId) -> Result<Vec<(AssetId, AmountCents)>, Error>;
+    async fn get_unspent_payments(
+        &self,
+        account: &AccountId,
+        asset: AssetId,
+        target_amount: AmountCents,
+    ) -> Result<Vec<Payment>, Error>;
 
-    async fn get_unspend_payments(&self, account: AccountId, asset: AssetId) -> Result<I, Error>;
+    async fn get_transaction(&self, transaction_id: &TransactionId) -> Result<Transaction, Error>;
 }

+ 240 - 0
utxo/src/tests/deposit.rs

@@ -0,0 +1,240 @@
+use crate::{
+    sqlite::{Batch, Sqlite},
+    tests::get_instance,
+    AccountId, Amount, Ledger, Status, TransactionId,
+};
+
+pub async fn deposit(
+    ledger: &Ledger<'static, Batch<'static>, Sqlite<'static>>,
+    account_id: &AccountId,
+    amount: Amount,
+) -> TransactionId {
+    ledger
+        .deposit(account_id, amount, Status::Settled, "Test".to_owned())
+        .await
+        .expect("valid tx")
+        .id
+}
+
+#[tokio::test]
+async fn deposit_and_transfer() {
+    let source = "account1".parse::<AccountId>().expect("account");
+    let dest = "account2".parse::<AccountId>().expect("account");
+    let fee = "fee".parse::<AccountId>().expect("account");
+    let (assets, ledger) = get_instance().await;
+
+    deposit(&ledger, &source, assets.amount(2, 1000).expect("amount")).await;
+    deposit(&ledger, &source, assets.amount(2, 2000).expect("amount")).await;
+
+    assert_eq!(
+        vec![assets.amount(2, 3000).expect("amount")],
+        ledger.get_balance(&source).await.expect("balance")
+    );
+
+    ledger
+        .new_transaction(
+            "Exchange one".to_owned(),
+            Status::Settled,
+            vec![(source.clone(), assets.amount(2, 1300).expect("amount"))],
+            vec![
+                (dest.clone(), assets.amount(2, 1250).expect("amount")),
+                (fee.clone(), assets.amount(2, 50).expect("amount")),
+            ],
+        )
+        .await
+        .expect("valid tx");
+
+    assert_eq!(
+        vec![assets.amount(2, 1700).expect("amount")],
+        ledger.get_balance(&source).await.expect("balance")
+    );
+    assert_eq!(
+        vec![assets.amount(2, 1250).expect("amount")],
+        ledger.get_balance(&dest).await.expect("balance")
+    );
+    assert_eq!(
+        vec![assets.amount(2, 50).expect("amount")],
+        ledger.get_balance(&fee).await.expect("balance")
+    );
+}
+
+#[tokio::test]
+async fn balance_decreases_while_pending_spending_and_confirm() {
+    let source = "account1".parse::<AccountId>().expect("account");
+    let dest = "account2".parse::<AccountId>().expect("account");
+    let fee = "fee".parse::<AccountId>().expect("account");
+    let (assets, ledger) = get_instance().await;
+
+    deposit(&ledger, &source, assets.amount(2, 1000).expect("amount")).await;
+    deposit(&ledger, &source, assets.amount(2, 2000).expect("amount")).await;
+
+    assert_eq!(
+        vec![assets.amount(2, 3000).expect("amount")],
+        ledger.get_balance(&source).await.expect("balance")
+    );
+
+    let id = ledger
+        .new_transaction(
+            "Exchange one".to_owned(),
+            Status::Pending,
+            vec![(source.clone(), assets.amount(2, 1300).expect("amount"))],
+            vec![
+                (dest.clone(), assets.amount(2, 1250).expect("amount")),
+                (fee.clone(), assets.amount(2, 50).expect("amount")),
+            ],
+        )
+        .await
+        .expect("valid tx")
+        .id;
+
+    assert_eq!(
+        vec![assets.amount(2, 1700).expect("amount")],
+        ledger.get_balance(&source).await.expect("balance")
+    );
+    assert!(ledger.get_balance(&dest).await.expect("balance").is_empty());
+    assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
+
+    ledger
+        .change_status(&id, Status::Settled)
+        .await
+        .expect("valid tx");
+
+    assert_eq!(
+        vec![assets.amount(2, 1700).expect("amount")],
+        ledger.get_balance(&source).await.expect("balance")
+    );
+    assert_eq!(
+        vec![assets.amount(2, 1250).expect("amount")],
+        ledger.get_balance(&dest).await.expect("balance")
+    );
+    assert_eq!(
+        vec![assets.amount(2, 50).expect("amount")],
+        ledger.get_balance(&fee).await.expect("balance")
+    );
+}
+
+#[tokio::test]
+async fn balance_decreases_while_pending_spending_and_cancel() {
+    let source = "account1".parse::<AccountId>().expect("account");
+    let dest = "account2".parse::<AccountId>().expect("account");
+    let fee = "fee".parse::<AccountId>().expect("account");
+    let (assets, ledger) = get_instance().await;
+
+    deposit(&ledger, &source, assets.amount(2, 1000).expect("amount")).await;
+    deposit(&ledger, &source, assets.amount(2, 2000).expect("amount")).await;
+
+    assert_eq!(
+        vec![assets.amount(2, 3000).expect("amount")],
+        ledger.get_balance(&source).await.expect("balance")
+    );
+
+    let id = ledger
+        .new_transaction(
+            "Exchange one".to_owned(),
+            Status::Pending,
+            vec![(source.clone(), assets.amount(2, 1300).expect("amount"))],
+            vec![
+                (dest.clone(), assets.amount(2, 1250).expect("amount")),
+                (fee.clone(), assets.amount(2, 50).expect("amount")),
+            ],
+        )
+        .await
+        .expect("valid tx")
+        .id;
+
+    assert_eq!(
+        vec![assets.amount(2, 1700).expect("amount")],
+        ledger.get_balance(&source).await.expect("balance")
+    );
+    assert!(ledger.get_balance(&dest).await.expect("balance").is_empty());
+    assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
+
+    ledger
+        .change_status(&id, Status::Cancelled)
+        .await
+        .expect("valid tx");
+
+    assert_eq!(
+        vec![assets.amount(2, 3000).expect("amount")],
+        ledger.get_balance(&source).await.expect("balance")
+    );
+    assert!(ledger.get_balance(&dest).await.expect("balance").is_empty());
+    assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
+}
+
+#[tokio::test]
+async fn balance_decreases_while_pending_spending_and_failed() {
+    let source = "account1".parse::<AccountId>().expect("account");
+    let dest = "account2".parse::<AccountId>().expect("account");
+    let fee = "fee".parse::<AccountId>().expect("account");
+    let (assets, ledger) = get_instance().await;
+
+    deposit(&ledger, &source, assets.amount(2, 1000).expect("amount")).await;
+    deposit(&ledger, &source, assets.amount(2, 2000).expect("amount")).await;
+
+    assert_eq!(
+        vec![assets.amount(2, 3000).expect("amount")],
+        ledger.get_balance(&source).await.expect("balance")
+    );
+
+    let id = ledger
+        .new_transaction(
+            "Exchange one".to_owned(),
+            Status::Pending,
+            vec![(source.clone(), assets.amount(2, 1300).expect("amount"))],
+            vec![
+                (dest.clone(), assets.amount(2, 1250).expect("amount")),
+                (fee.clone(), assets.amount(2, 50).expect("amount")),
+            ],
+        )
+        .await
+        .expect("valid tx")
+        .id;
+
+    assert_eq!(
+        vec![assets.amount(2, 1700).expect("amount")],
+        ledger.get_balance(&source).await.expect("balance")
+    );
+    assert!(ledger.get_balance(&dest).await.expect("balance").is_empty());
+    assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
+
+    ledger
+        .change_status(&id, Status::Processing)
+        .await
+        .expect("valid tx");
+
+    assert_eq!(
+        vec![assets.amount(2, 1700).expect("amount")],
+        ledger.get_balance(&source).await.expect("balance")
+    );
+    assert!(ledger.get_balance(&dest).await.expect("balance").is_empty());
+    assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
+
+    assert_eq!(
+        "Transaction: Status transition from Processing to Cancelled is not allowed".to_owned(),
+        ledger
+            .change_status(&id, Status::Cancelled)
+            .await
+            .unwrap_err()
+            .to_string()
+    );
+
+    assert_eq!(
+        vec![assets.amount(2, 1700).expect("amount")],
+        ledger.get_balance(&source).await.expect("balance")
+    );
+    assert!(ledger.get_balance(&dest).await.expect("balance").is_empty());
+    assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
+
+    ledger
+        .change_status(&id, Status::Failed)
+        .await
+        .expect("valid");
+
+    assert_eq!(
+        vec![assets.amount(2, 3000).expect("amount")],
+        ledger.get_balance(&source).await.expect("balance")
+    );
+    assert!(ledger.get_balance(&dest).await.expect("balance").is_empty());
+    assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
+}

+ 31 - 0
utxo/src/tests/mod.rs

@@ -0,0 +1,31 @@
+use crate::{
+    asset_manager::AssetDefinition,
+    sqlite::{Batch, Sqlite},
+    AssetManager, Ledger,
+};
+use sqlx::sqlite::SqlitePoolOptions;
+
+pub async fn get_instance() -> (
+    AssetManager,
+    Ledger<'static, Batch<'static>, Sqlite<'static>>,
+) {
+    let pool = SqlitePoolOptions::new()
+        .max_connections(1)
+        .idle_timeout(None)
+        .max_lifetime(None)
+        .connect(":memory:")
+        //.connect("sqlite:///tmp/test.db")
+        .await
+        .expect("pool");
+
+    let assets = AssetManager::new(vec![
+        AssetDefinition::new(1, "BTC", 8),
+        AssetDefinition::new(2, "USD", 4),
+    ]);
+
+    let db = Sqlite::new(pool, assets.clone());
+    db.setup().await.expect("setup");
+    (assets, Ledger::new(db))
+}
+
+mod deposit;

+ 6 - 0
utxo/src/transaction/error.rs

@@ -5,6 +5,9 @@ pub enum Error {
     #[error("Payment {0} is already spent")]
     SpentPayment(usize),
 
+    #[error("Payment {0} is in the incorrect status {1}")]
+    InvalidPaymentStatus(usize, Status),
+
     #[error(
         "Payment {0} is not a valid amount. Spending {1} and issuing {2}. Both amounts must match"
     )]
@@ -28,6 +31,9 @@ pub enum Error {
     #[error("Storage: {0}")]
     Storage(#[from] storage::Error),
 
+    #[error("Internal error at serializing: {0}")]
+    Internal(#[from] Box<bincode::ErrorKind>),
+
     #[error("Overflow")]
     Overflow,
 }

+ 130 - 56
utxo/src/transaction/mod.rs

@@ -1,7 +1,6 @@
 use crate::{
-    amount::AmountCents, storage, AccountId, Asset, Batch, Payment, PaymentId, Status, Storage,
+    amount::AmountCents, AccountId, Amount, Asset, Batch, Payment, Status, Storage, TransactionId,
 };
-use futures::{future::join_all, TryStreamExt};
 use sha2::{Digest, Sha256};
 use std::collections::HashMap;
 
@@ -35,60 +34,130 @@ pub use error::Error;
 /// either as settled, cancelled or failed. A higher layer should split any
 /// available payment to be spend into a new transaction, and then finalize the
 /// transaction, and reserve only the exact amount to be spent, otherwise
-/// unrelated funds will be held unspendable until the transaction is finalized.
+/// unrelated funds will be held unspentable until the transaction is finalized.
 #[derive(Debug, Clone)]
-pub struct Transaction<'a, B, I, S>
-where
-    B: Batch,
-    I: TryStreamExt<Ok = (PaymentId, AmountCents), Error = storage::Error> + Unpin,
-    S: Storage<B, I>,
-{
-    storage: &'a S,
-    id: Option<AccountId>,
-    spend: Vec<Payment>,
-    create: Vec<Payment>,
-    status: Status,
-    _phantom_batch: std::marker::PhantomData<B>,
-    _phantom_iterator: std::marker::PhantomData<I>,
+pub struct Transaction {
+    pub(crate) id: TransactionId,
+    pub(crate) spend: Vec<Payment>,
+    #[allow(dead_code)]
+    pub(crate) reference: String,
+    pub(crate) create: Vec<Payment>,
+    pub(crate) status: Status,
+    pub(crate) is_external_deposit: bool,
 }
 
-impl<'a, B, I, S> Transaction<'a, B, I, S>
-where
-    B: Batch,
-    I: TryStreamExt<Ok = (PaymentId, AmountCents), Error = storage::Error> + Unpin,
-    S: Storage<B, I>,
-{
+impl Transaction {
+    pub fn new_external_deposit(
+        reference: String,
+        status: Status,
+        pay_to: Vec<(AccountId, Amount)>,
+    ) -> Result<Transaction, Error> {
+        let mut hasher = Sha256::new();
+        for (account, amount) in pay_to.iter() {
+            hasher.update(&bincode::serialize(&(account, amount))?);
+        }
+
+        let id = TransactionId::new(hasher.finalize().into());
+
+        Ok(Self {
+            id,
+            spend: vec![],
+            reference,
+            create: pay_to
+                .into_iter()
+                .enumerate()
+                .map(|(position, (to, amount))| Payment {
+                    id: crate::PaymentId {
+                        transaction: id,
+                        position,
+                    },
+                    to,
+                    amount,
+                    spent_by: None,
+                    status: status.clone(),
+                })
+                .collect(),
+            is_external_deposit: true,
+            status,
+        })
+    }
+
     pub async fn new(
-        storage: &'a S,
-        input_ids: Vec<PaymentId>,
-    ) -> Result<Transaction<'a, B, I, S>, Error> {
-        let futures = input_ids
-            .into_iter()
-            .map(|id| storage.get_payment(id))
-            .collect::<Vec<_>>();
-        let spend = join_all(futures)
-            .await
-            .into_iter()
-            .collect::<Result<Vec<Payment>, _>>()?;
+        reference: String,
+        status: Status,
+        spend: Vec<Payment>,
+        pay_to: Vec<(AccountId, Amount)>,
+    ) -> Result<Transaction, Error> {
+        let mut hasher = Sha256::new();
+        for input in spend.iter() {
+            hasher.update(&bincode::serialize(&input.id)?);
+        }
+
+        for (account, amount) in pay_to.iter() {
+            hasher.update(&bincode::serialize(&(account, amount))?);
+        }
+
+        let id = TransactionId::new(hasher.finalize().into());
+
         for (i, input) in spend.iter().enumerate() {
-            if input.spent_by.is_some() {
+            if input.spent_by.is_some() && input.spent_by != Some(id) {
                 return Err(Error::SpentPayment(i));
             }
+            if input.spent_by.is_none() && input.status != Status::Settled {
+                return Err(Error::InvalidPaymentStatus(i, input.status.clone()));
+            }
         }
 
         Ok(Self {
-            storage,
-            id: None,
-            spend,
-            status: Status::default(),
-            create: Vec::new(),
-            _phantom_batch: std::marker::PhantomData,
-            _phantom_iterator: std::marker::PhantomData,
+            id,
+            reference,
+            spend: spend
+                .into_iter()
+                .map(|mut input| {
+                    input.spent_by = Some(id);
+                    input
+                })
+                .collect(),
+            create: pay_to
+                .into_iter()
+                .enumerate()
+                .map(|(position, (to, amount))| Payment {
+                    id: crate::PaymentId {
+                        transaction: id,
+                        position,
+                    },
+                    to,
+                    amount,
+                    spent_by: None,
+                    status: status.clone(),
+                })
+                .collect(),
+            is_external_deposit: false,
+            status,
         })
     }
 
+    pub async fn settle<'a, B, S>(&mut self, storage: &'a S) -> Result<(), Error>
+    where
+        B: Batch<'a>,
+        S: Storage<'a, B> + Sync + Send,
+    {
+        self.change_status(Status::Settled)?;
+        self.persist::<B, S>(storage).await
+    }
+
+    #[inline]
     pub fn change_status(&mut self, new_status: Status) -> Result<(), Error> {
         if self.status.can_transition_to(&new_status) {
+            self.spend.iter_mut().for_each(|payment| {
+                payment.status = new_status.clone();
+                if new_status.is_rollback() {
+                    payment.spent_by = None;
+                }
+            });
+            self.create.iter_mut().for_each(|payment| {
+                payment.status = new_status.clone();
+            });
             self.status = new_status;
             Ok(())
         } else {
@@ -100,12 +169,15 @@ where
     }
 
     fn validate(&mut self) -> Result<(), Error> {
-        let mut sha256 = Sha256::new();
+        if self.is_external_deposit {
+            return Ok(());
+        }
+
         let mut debit = HashMap::<Asset, AmountCents>::new();
         let mut credit = HashMap::<Asset, AmountCents>::new();
 
         for (i, input) in self.spend.iter().enumerate() {
-            if input.spent_by.is_some() {
+            if input.spent_by.is_some() && input.spent_by != Some(self.id) {
                 return Err(Error::SpentPayment(i));
             }
             if let Some(value) = debit.get_mut(input.amount.asset()) {
@@ -117,8 +189,6 @@ where
             } else {
                 debit.insert(*input.amount.asset(), input.amount.cents());
             }
-            let bytes: Vec<u8> = bincode::serialize(&input).unwrap();
-            sha256.update(&bytes);
         }
 
         for (i, output) in self.create.iter().enumerate() {
@@ -134,8 +204,6 @@ where
             } else {
                 credit.insert(*output.amount.asset(), output.amount.cents());
             }
-            let bytes: Vec<u8> = bincode::serialize(&output).unwrap();
-            sha256.update(&bytes);
         }
 
         for (asset, credit_amount) in credit.into_iter() {
@@ -152,7 +220,6 @@ where
             return Err(Error::MissingPaymentAsset(asset));
         }
 
-        self.id = Some(AccountId(sha256.finalize().into()));
         Ok(())
     }
 
@@ -164,19 +231,26 @@ where
         &self.create
     }
 
-    pub async fn persist(mut self) -> Result<Transaction<'a, B, I, S>, Error> {
-        if self.status.is_finalized() {
-            return Err(Error::TransactionUpdatesNotAllowed);
+    pub async fn persist<'a, B, S>(&mut self, storage: &'a S) -> Result<(), Error>
+    where
+        B: Batch<'a>,
+        S: Storage<'a, B> + Sync + Send,
+    {
+        let mut batch = storage.begin().await?;
+        if let Some(status) = batch.get_payment_status(&self.id).await? {
+            if status.is_finalized() {
+                return Err(Error::TransactionUpdatesNotAllowed);
+            }
         }
         self.validate()?;
-        let mut batch = self.storage.begin().await?;
+        batch.store_transaction(self).await?;
         batch.store_new_payments(&self.create).await?;
-        let id = self.id.as_ref().unwrap();
         for input in self.spend.iter_mut() {
-            batch.spend_payment(input.id, *id).await?;
-            input.spent_by = Some(*id);
+            batch
+                .spend_payment(input.id, self.status.clone(), self.id)
+                .await?;
         }
         batch.commit().await?;
-        Ok(self)
+        Ok(())
     }
 }