123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497 |
- use crate::{
- amount::AmountCents,
- broadcaster::Broadcaster,
- config::Config,
- status::{InternalStatus, StatusManager},
- storage::{Batch, ReceivedPaymentStatus, Storage},
- transaction::Error as TxError,
- transaction::Type,
- worker::WorkerManager,
- AccountId, Amount, Error, Filter, PaymentFrom, PaymentId, RevId, Status, Tag, Transaction,
- TxId,
- };
- use std::{cmp::Ordering, collections::HashMap, sync::Arc};
- use tokio::sync::mpsc::{self, Receiver, Sender};
- /// The Verax ledger
- #[derive(Debug)]
- pub struct Ledger<S>
- where
- S: Storage + Sync + Send,
- {
- config: Config<S>,
- broadcaster: WorkerManager<Broadcaster>,
- }
- impl<S> Ledger<S>
- where
- S: Storage + Sync + Send,
- {
- /// Creates a new ledger instance
- pub fn new(config: Config<S>) -> Arc<Self> {
- Arc::new(Self {
- config,
- broadcaster: WorkerManager::new(Broadcaster::default()),
- })
- }
- /// Subscribes to new transactions and revisions with a given filter
- pub async fn subscribe(&self, filter: Filter) -> (Sender<Transaction>, Receiver<Transaction>) {
- let (sender, receiver) = mpsc::channel(100);
- self.subscribe_with_sender(filter, sender.clone()).await;
- (sender, receiver)
- }
- /// Subscribe to new transactions and revisions with a given filter and a receiver.
- pub async fn subscribe_with_sender(&self, filter: Filter, sender: Sender<Transaction>) {
- self.broadcaster.subscribe(filter, sender).await;
- }
- /// Returns the total number of active subscribers
- pub async fn subscribers(&self) -> usize {
- self.broadcaster.subscribers().await
- }
- /// The internal usage is to select unspent payments for each account to create new
- /// transactions. The external API however does not expose that level of usage, instead it
- /// exposes a simple API to move funds using accounts to debit from and accounts to credit to. A
- /// single transaction can use multiple accounts to debit and credit, instead of a single
- /// account.
- ///
- /// This function selects the unspent payments to be used in a transaction, in a descending
- /// order (making sure to include any negative deposit).
- ///
- /// This function returns a vector of payments to be used as inputs and optionally a dependent
- /// transaction to be executed first. This transaction is an internal transaction and it settles
- /// immediately. It is used to split an existing payment into two payments, one to be used as
- /// input and the other to be used as change. This is done to avoid locking any change amount
- /// until the main transaction settles.
- async fn select_payments_from_accounts(
- &self,
- payments: Vec<(AccountId, Amount)>,
- ) -> Result<(Option<Transaction>, Vec<PaymentFrom>), Error> {
- let mut to_spend = HashMap::<_, AmountCents>::new();
- for (account_id, amount) in payments.into_iter() {
- let id = (account_id, amount.asset().clone());
- if let Some(value) = to_spend.get_mut(&id) {
- *value = value
- .checked_add(amount.cents())
- .ok_or(Error::Overflow("cents".to_owned()))?;
- } else {
- to_spend.insert(id, amount.cents());
- }
- }
- let mut change_input = vec![];
- let mut change_output = vec![];
- let mut payments: Vec<PaymentFrom> = vec![];
- for ((account, asset), mut to_spend_cents) in to_spend.into_iter() {
- let iterator = self
- .config
- .storage
- .get_unspent_payments(&account, &asset, to_spend_cents)
- .await?;
- for payment in iterator.into_iter() {
- let cents = payment.amount.cents();
- to_spend_cents = to_spend_cents
- .checked_sub(cents)
- .ok_or(Error::Underflow("cents".to_owned()))?;
- payments.push(payment);
- match to_spend_cents.cmp(&0) {
- Ordering::Equal => {
- // No change amount, we are done with this input
- break;
- }
- Ordering::Less => {
- // There is a change amount, we need to split the last
- // input into two payment_ids into the same accounts in
- // a transaction that will settle immediately, otherwise
- // the change amount will be unspendable until this
- // transaction settles. By doing so the current
- // operation will have no change and it can safely take
- // its time to settle without making any change amount
- // unspendable.
- let to_spend_cents = to_spend_cents.abs();
- let input = payments
- .pop()
- .ok_or(Error::InsufficientBalance(account.clone(), asset.clone()))?;
- change_input.push(input);
- change_output.push((
- account.clone(),
- asset.new_amount(
- cents
- .checked_sub(to_spend_cents)
- .ok_or(Error::Underflow("change cents".to_owned()))?,
- ),
- ));
- change_output.push((account.clone(), asset.new_amount(to_spend_cents)));
- // Go to the next payment
- break;
- }
- _ => {
- // We need more funds, continue to the selecting the
- // available payment if any
- }
- }
- }
- if to_spend_cents > 0 {
- // We don't have enough payment to cover the to_spend_cents
- // Return an insufficient balance error
- return Err(Error::InsufficientBalance(account, asset.clone()));
- }
- }
- let exchange_tx = if change_input.is_empty() {
- None
- } else {
- let total =
- u16::try_from(change_input.len()).map_err(|e| Error::Overflow(e.to_string()))?;
- let split_input = Transaction::new(
- "Exchange transaction".to_owned(),
- // Set the change transaction as settled. This is an
- // internal transaction to split existing payments
- // into exact new payments, so the main transaction has no
- // change.
- self.config.status.default_spendable(),
- Type::Exchange,
- change_input,
- change_output,
- )
- .await?;
- let creates = &split_input.creates;
- for i in 0..total {
- // Spend the new payment
- let index = i
- .checked_mul(2)
- .ok_or(Error::Overflow("index overflow".to_owned()))?;
- let uindex: usize = index.into();
- payments.push(PaymentFrom {
- id: PaymentId {
- transaction: split_input.id.clone(),
- position: index,
- },
- from: creates[uindex].to.clone(),
- amount: creates[uindex].amount.clone(),
- });
- }
- Some(split_input)
- };
- Ok((exchange_tx, payments))
- }
- #[inline]
- /// Persist a new base transaction
- ///
- /// This operation should only happen once, because if it is executed multiple times the storage
- /// layer should fail. Base transactions are not allowed to be ammened, only revisions.
- async fn store_base_transaction(
- transaction: &Transaction,
- batch: &mut S::Batch<'_>,
- ) -> Result<(), Error> {
- let spends = transaction
- .spends
- .iter()
- .map(|x| x.id.clone())
- .collect::<Vec<_>>();
- batch
- .spend_payments(
- &transaction.revision.transaction_id,
- spends,
- ReceivedPaymentStatus::Locked,
- )
- .await?;
- batch
- .create_payments(
- &transaction.revision.transaction_id,
- &transaction.creates,
- ReceivedPaymentStatus::Locked,
- )
- .await?;
- for account in transaction.accounts() {
- batch
- .relate_account_to_transaction(
- &transaction.revision.transaction_id,
- &account,
- transaction.typ,
- )
- .await?;
- }
- batch
- .store_base_transaction(
- &transaction.revision.transaction_id,
- &transaction.transaction,
- )
- .await?;
- Ok(())
- }
- /// Stores the current transaction object to the storage layer.
- ///
- /// This method is not idempotent, and it will fail if the transaction if the requested update
- /// is not allowed.
- ///
- /// This function will store the base transaction if it is the first revision, and will create a
- /// new revision otherwise.
- pub async fn store(&self, transaction: Transaction) -> Result<Transaction, Error> {
- transaction.validate()?;
- let mut batch = self.config.storage.begin().await?;
- if transaction.revision.previous.is_none() {
- Self::store_base_transaction(&transaction, &mut batch).await?;
- }
- let (created_updated, spent_updated) = match self
- .config
- .status
- .internal_type(&transaction.revision.status)
- {
- InternalStatus::Reverted => {
- batch
- .update_transaction_payments(
- &transaction.id,
- ReceivedPaymentStatus::Failed,
- ReceivedPaymentStatus::Spendable,
- )
- .await?
- }
- InternalStatus::Spendable => {
- batch
- .update_transaction_payments(
- &transaction.id,
- ReceivedPaymentStatus::Spendable,
- ReceivedPaymentStatus::Spent,
- )
- .await?
- }
- _ => (transaction.creates.len(), transaction.spends.len()),
- };
- if transaction.creates.len() != created_updated || transaction.spends.len() != spent_updated
- {
- return Err(Error::Transaction(TxError::NoUpdate));
- }
- if self
- .config
- .status
- .is_spendable(&transaction.revision.status)
- {
- batch
- .update_transaction_payments(
- &transaction.id,
- ReceivedPaymentStatus::Spendable,
- ReceivedPaymentStatus::Spent,
- )
- .await?;
- }
- batch
- .store_revision(&transaction.revision_id, &transaction.revision)
- .await?;
- batch
- .tag_transaction(
- &transaction.id,
- &transaction.transaction,
- &transaction.revision.tags,
- )
- .await?;
- batch
- .update_transaction_revision(
- &transaction.id,
- &transaction.revision_id,
- transaction.revision.previous.as_ref(),
- )
- .await?;
- batch.commit().await?;
- // The transaction is persisted and now it is time to broadcast it to any possible listener
- self.broadcaster.process(transaction.clone());
- Ok(transaction)
- }
- /// Creates a new transaction and returns it.
- ///
- /// The input is pretty simple, take this amounts from these given accounts
- /// and send them to these accounts (and amounts). The job of this function
- /// is to figure it out how to do it. This function will make sure that each
- /// account has enough balance, selecting the unspent payments from each
- /// account that will be spent. It will also return a list of transactions
- /// that will be used to return the change to the accounts, these accounts
- /// can be settled immediately so no other funds required to perform the
- /// transaction are locked.
- ///
- /// This functions performs read only operations on top of the storage layer
- /// and it will guarantee execution (meaning that it will not lock any
- /// funds, so these transactions may fail if any selected payment is spent
- /// between the time the transaction is created and executed).
- ///
- /// A NewTransaction struct is returned, the change_transactions should be
- /// executed and set as settled before the transaction is executed,
- /// otherwise it will fail. A failure in any execution will render the
- /// entire operation as failed but no funds will be locked.
- pub async fn new_transaction(
- &self,
- reference: String,
- status: Status,
- from: Vec<(AccountId, Amount)>,
- to: Vec<(AccountId, Amount)>,
- ) -> Result<Transaction, Error> {
- let (change_transaction, payments) = self.select_payments_from_accounts(from).await?;
- if let Some(change_tx) = change_transaction {
- self.store(change_tx).await?;
- }
- self.store(Transaction::new(reference, status, Type::Transaction, payments, to).await?)
- .await
- }
- /// Return the balances from a given account
- ///
- /// The balance is a vector of Amounts, one for each asset. The balance will
- /// return only spendable amounts, meaning that any amount that is locked in
- /// a transaction will not be returned.
- ///
- /// TODO: Return locked funds as well.
- pub async fn get_balance(&self, account: &AccountId) -> Result<Vec<Amount>, Error> {
- Ok(self.config.storage.get_balance(account).await?)
- }
- /// Creates an external deposit
- ///
- /// Although a deposit can have multiple output payments, to different
- /// accounts and amounts, to keep the upstream API simple, this function
- /// only accepts a single account and amount to credit
- pub async fn deposit(
- &self,
- account: &AccountId,
- amount: Amount,
- status: Status,
- tags: Vec<Tag>,
- reference: String,
- ) -> Result<Transaction, Error> {
- self.store(Transaction::new_external_deposit(
- reference,
- status,
- tags,
- vec![(account.clone(), amount)],
- )?)
- .await
- }
- /// Creates a new withdrawal transaction and returns it.
- ///
- /// Although a transaction supports multiple inputs to be burned, from
- /// different accounts, to keep things simple, this function only supports a
- /// single input (single account and single amount). This is because the
- /// natural behaviour is to have withdrawals from a single account.
- pub async fn withdrawal(
- &self,
- account: &AccountId,
- amount: Amount,
- status: Status,
- reference: String,
- ) -> Result<Transaction, Error> {
- let (change_transactions, payments) = self
- .select_payments_from_accounts(vec![(account.clone(), amount)])
- .await?;
- for change_tx in change_transactions.into_iter() {
- self.store(change_tx).await?;
- }
- self.store(Transaction::new_external_withdrawal(
- reference, status, payments,
- )?)
- .await
- }
- /// Returns the payment object by a given payment id
- pub async fn get_payment_info(&self, _payment_id: &PaymentId) -> Result<PaymentFrom, Error> {
- todo!()
- }
- /// Returns the transaction object by a given transaction id
- pub async fn get_transaction(&self, transaction_id: TxId) -> Result<Transaction, Error> {
- let filter = Filter {
- ids: vec![transaction_id],
- limit: 1,
- ..Default::default()
- };
- self.config
- .storage
- .find(filter)
- .await?
- .pop()
- .ok_or(Error::TxNotFound)
- }
- /// Returns all transactions from a given account. It can be optionally be
- /// sorted by transaction type. The transactions are sorted from newest to
- /// oldest.
- pub async fn get_transactions(&self, filter: Filter) -> Result<Vec<Transaction>, Error> {
- Ok(self.config.storage.find(filter).await?)
- }
- /// Returns the status manager
- pub fn get_status_manager(&self) -> &StatusManager {
- &self.config.status
- }
- /// Updates a transaction and updates their tags to this given set
- pub async fn set_tags(
- &self,
- revision_id: RevId,
- tags: Vec<Tag>,
- reason: String,
- ) -> Result<Transaction, Error> {
- let filter = Filter {
- revisions: vec![revision_id],
- limit: 1,
- ..Default::default()
- };
- self.store(
- self.config
- .storage
- .find(filter)
- .await?
- .pop()
- .ok_or(Error::TxNotFound)?
- .set_tags(tags, reason)?,
- )
- .await
- }
- /// Attempts to change the status of a given transaction id. On success the
- /// new transaction object is returned, otherwise an error is returned.
- pub async fn change_status(
- &self,
- revision_id: RevId,
- new_status: Status,
- reason: String,
- ) -> Result<Transaction, Error> {
- let filter = Filter {
- revisions: vec![revision_id],
- limit: 1,
- ..Default::default()
- };
- self.store(
- self.config
- .storage
- .find(filter)
- .await?
- .pop()
- .ok_or(Error::TxNotFound)?
- .change_status(&self.config, new_status, reason)?,
- )
- .await
- }
- }
|