use crate::{ amount::AmountCents, broadcaster::Broadcaster, config::Config, status::{InternalStatus, StatusManager}, storage::{Batch, ReceivedPaymentStatus, Storage}, transaction::Error as TxError, transaction::Type, worker::WorkerManager, AccountId, Amount, Error, Filter, PaymentFrom, PaymentId, RevId, Status, Tag, Transaction, TxId, }; use std::{cmp::Ordering, collections::HashMap, sync::Arc}; use tokio::sync::mpsc::{self, Receiver, Sender}; /// The Verax ledger #[derive(Debug)] pub struct Ledger where S: Storage + Sync + Send, { config: Config, broadcaster: WorkerManager, } impl Ledger where S: Storage + Sync + Send, { /// Creates a new ledger instance pub fn new(config: Config) -> Arc { Arc::new(Self { config, broadcaster: WorkerManager::new(Broadcaster::default()), }) } /// Subscribes to new transactions and revisions with a given filter pub async fn subscribe(&self, filter: Filter) -> (Sender, Receiver) { let (sender, receiver) = mpsc::channel(100); self.subscribe_with_sender(filter, sender.clone()).await; (sender, receiver) } /// Subscribe to new transactions and revisions with a given filter and a receiver. pub async fn subscribe_with_sender(&self, filter: Filter, sender: Sender) { self.broadcaster.subscribe(filter, sender).await; } /// Returns the total number of active subscribers pub async fn subscribers(&self) -> usize { self.broadcaster.subscribers().await } /// The internal usage is to select unspent payments for each account to create new /// transactions. The external API however does not expose that level of usage, instead it /// exposes a simple API to move funds using accounts to debit from and accounts to credit to. A /// single transaction can use multiple accounts to debit and credit, instead of a single /// account. /// /// This function selects the unspent payments to be used in a transaction, in a descending /// order (making sure to include any negative deposit). /// /// This function returns a vector of payments to be used as inputs and optionally a dependent /// transaction to be executed first. This transaction is an internal transaction and it settles /// immediately. It is used to split an existing payment into two payments, one to be used as /// input and the other to be used as change. This is done to avoid locking any change amount /// until the main transaction settles. async fn select_payments_from_accounts( &self, payments: Vec<(AccountId, Amount)>, ) -> Result<(Option, Vec), Error> { let mut to_spend = HashMap::<_, AmountCents>::new(); for (account_id, amount) in payments.into_iter() { let id = (account_id, amount.asset().clone()); if let Some(value) = to_spend.get_mut(&id) { *value = value .checked_add(amount.cents()) .ok_or(Error::Overflow("cents".to_owned()))?; } else { to_spend.insert(id, amount.cents()); } } let mut change_input = vec![]; let mut change_output = vec![]; let mut payments: Vec = vec![]; for ((account, asset), mut to_spend_cents) in to_spend.into_iter() { let iterator = self .config .storage .get_unspent_payments(&account, &asset, to_spend_cents) .await?; for payment in iterator.into_iter() { let cents = payment.amount.cents(); to_spend_cents = to_spend_cents .checked_sub(cents) .ok_or(Error::Underflow("cents".to_owned()))?; payments.push(payment); match to_spend_cents.cmp(&0) { Ordering::Equal => { // No change amount, we are done with this input break; } Ordering::Less => { // There is a change amount, we need to split the last // input into two payment_ids into the same accounts in // a transaction that will settle immediately, otherwise // the change amount will be unspendable until this // transaction settles. By doing so the current // operation will have no change and it can safely take // its time to settle without making any change amount // unspendable. let to_spend_cents = to_spend_cents.abs(); let input = payments .pop() .ok_or(Error::InsufficientBalance(account.clone(), asset.clone()))?; change_input.push(input); change_output.push(( account.clone(), asset.new_amount( cents .checked_sub(to_spend_cents) .ok_or(Error::Underflow("change cents".to_owned()))?, ), )); change_output.push((account.clone(), asset.new_amount(to_spend_cents))); // Go to the next payment break; } _ => { // We need more funds, continue to the selecting the // available payment if any } } } if to_spend_cents > 0 { // We don't have enough payment to cover the to_spend_cents // Return an insufficient balance error return Err(Error::InsufficientBalance(account, asset.clone())); } } let exchange_tx = if change_input.is_empty() { None } else { let total = u16::try_from(change_input.len()).map_err(|e| Error::Overflow(e.to_string()))?; let split_input = Transaction::new( "Exchange transaction".to_owned(), // Set the change transaction as settled. This is an // internal transaction to split existing payments // into exact new payments, so the main transaction has no // change. self.config.status.default_spendable(), Type::Exchange, change_input, change_output, ) .await?; let creates = &split_input.creates; for i in 0..total { // Spend the new payment let index = i .checked_mul(2) .ok_or(Error::Overflow("index overflow".to_owned()))?; let uindex: usize = index.into(); payments.push(PaymentFrom { id: PaymentId { transaction: split_input.id.clone(), position: index, }, from: creates[uindex].to.clone(), amount: creates[uindex].amount.clone(), }); } Some(split_input) }; Ok((exchange_tx, payments)) } #[inline] /// Persist a new base transaction /// /// This operation should only happen once, because if it is executed multiple times the storage /// layer should fail. Base transactions are not allowed to be ammened, only revisions. async fn store_base_transaction( transaction: &Transaction, batch: &mut S::Batch<'_>, ) -> Result<(), Error> { let spends = transaction .spends .iter() .map(|x| x.id.clone()) .collect::>(); batch .spend_payments( &transaction.revision.transaction_id, spends, ReceivedPaymentStatus::Locked, ) .await?; batch .create_payments( &transaction.revision.transaction_id, &transaction.creates, ReceivedPaymentStatus::Locked, ) .await?; for account in transaction.accounts() { batch .relate_account_to_transaction( &transaction.revision.transaction_id, &account, transaction.typ, ) .await?; } batch .store_base_transaction( &transaction.revision.transaction_id, &transaction.transaction, ) .await?; Ok(()) } /// Stores the current transaction object to the storage layer. /// /// This method is not idempotent, and it will fail if the transaction if the requested update /// is not allowed. /// /// This function will store the base transaction if it is the first revision, and will create a /// new revision otherwise. pub async fn store(&self, transaction: Transaction) -> Result { transaction.validate()?; let mut batch = self.config.storage.begin().await?; if transaction.revision.previous.is_none() { Self::store_base_transaction(&transaction, &mut batch).await?; } let (created_updated, spent_updated) = match self .config .status .internal_type(&transaction.revision.status) { InternalStatus::Reverted => { batch .update_transaction_payments( &transaction.id, ReceivedPaymentStatus::Failed, ReceivedPaymentStatus::Spendable, ) .await? } InternalStatus::Spendable => { batch .update_transaction_payments( &transaction.id, ReceivedPaymentStatus::Spendable, ReceivedPaymentStatus::Spent, ) .await? } _ => (transaction.creates.len(), transaction.spends.len()), }; if transaction.creates.len() != created_updated || transaction.spends.len() != spent_updated { return Err(Error::Transaction(TxError::NoUpdate)); } if self .config .status .is_spendable(&transaction.revision.status) { batch .update_transaction_payments( &transaction.id, ReceivedPaymentStatus::Spendable, ReceivedPaymentStatus::Spent, ) .await?; } batch .store_revision(&transaction.revision_id, &transaction.revision) .await?; batch .tag_transaction( &transaction.id, &transaction.transaction, &transaction.revision.tags, ) .await?; batch .update_transaction_revision( &transaction.id, &transaction.revision_id, transaction.revision.previous.as_ref(), ) .await?; batch.commit().await?; // The transaction is persisted and now it is time to broadcast it to any possible listener self.broadcaster.process(transaction.clone()); Ok(transaction) } /// Creates a new transaction and returns it. /// /// The input is pretty simple, take this amounts from these given accounts /// and send them to these accounts (and amounts). The job of this function /// is to figure it out how to do it. This function will make sure that each /// account has enough balance, selecting the unspent payments from each /// account that will be spent. It will also return a list of transactions /// that will be used to return the change to the accounts, these accounts /// can be settled immediately so no other funds required to perform the /// transaction are locked. /// /// This functions performs read only operations on top of the storage layer /// and it will guarantee execution (meaning that it will not lock any /// funds, so these transactions may fail if any selected payment is spent /// between the time the transaction is created and executed). /// /// A NewTransaction struct is returned, the change_transactions should be /// executed and set as settled before the transaction is executed, /// otherwise it will fail. A failure in any execution will render the /// entire operation as failed but no funds will be locked. pub async fn new_transaction( &self, reference: String, status: Status, from: Vec<(AccountId, Amount)>, to: Vec<(AccountId, Amount)>, ) -> Result { let (change_transaction, payments) = self.select_payments_from_accounts(from).await?; if let Some(change_tx) = change_transaction { self.store(change_tx).await?; } self.store(Transaction::new(reference, status, Type::Transaction, payments, to).await?) .await } /// Return the balances from a given account /// /// The balance is a vector of Amounts, one for each asset. The balance will /// return only spendable amounts, meaning that any amount that is locked in /// a transaction will not be returned. /// /// TODO: Return locked funds as well. pub async fn get_balance(&self, account: &AccountId) -> Result, Error> { Ok(self.config.storage.get_balance(account).await?) } /// Creates an external deposit /// /// Although a deposit can have multiple output payments, to different /// accounts and amounts, to keep the upstream API simple, this function /// only accepts a single account and amount to credit pub async fn deposit( &self, account: &AccountId, amount: Amount, status: Status, tags: Vec, reference: String, ) -> Result { self.store(Transaction::new_external_deposit( reference, status, tags, vec![(account.clone(), amount)], )?) .await } /// Creates a new withdrawal transaction and returns it. /// /// Although a transaction supports multiple inputs to be burned, from /// different accounts, to keep things simple, this function only supports a /// single input (single account and single amount). This is because the /// natural behaviour is to have withdrawals from a single account. pub async fn withdrawal( &self, account: &AccountId, amount: Amount, status: Status, reference: String, ) -> Result { let (change_transactions, payments) = self .select_payments_from_accounts(vec![(account.clone(), amount)]) .await?; for change_tx in change_transactions.into_iter() { self.store(change_tx).await?; } self.store(Transaction::new_external_withdrawal( reference, status, payments, )?) .await } /// Returns the payment object by a given payment id pub async fn get_payment_info(&self, _payment_id: &PaymentId) -> Result { todo!() } /// Returns the transaction object by a given transaction id pub async fn get_transaction(&self, transaction_id: TxId) -> Result { let filter = Filter { ids: vec![transaction_id], limit: 1, ..Default::default() }; self.config .storage .find(filter) .await? .pop() .ok_or(Error::TxNotFound) } /// Returns all transactions from a given account. It can be optionally be /// sorted by transaction type. The transactions are sorted from newest to /// oldest. pub async fn get_transactions(&self, filter: Filter) -> Result, Error> { Ok(self.config.storage.find(filter).await?) } /// Returns the status manager pub fn get_status_manager(&self) -> &StatusManager { &self.config.status } /// Updates a transaction and updates their tags to this given set pub async fn set_tags( &self, revision_id: RevId, tags: Vec, reason: String, ) -> Result { let filter = Filter { revisions: vec![revision_id], limit: 1, ..Default::default() }; self.store( self.config .storage .find(filter) .await? .pop() .ok_or(Error::TxNotFound)? .set_tags(tags, reason)?, ) .await } /// Attempts to change the status of a given transaction id. On success the /// new transaction object is returned, otherwise an error is returned. pub async fn change_status( &self, revision_id: RevId, new_status: Status, reason: String, ) -> Result { let filter = Filter { revisions: vec![revision_id], limit: 1, ..Default::default() }; self.store( self.config .storage .find(filter) .await? .pop() .ok_or(Error::TxNotFound)? .change_status(&self.config, new_status, reason)?, ) .await } }