ledger.rs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. use crate::{
  2. amount::AmountCents,
  3. broadcaster::Broadcaster,
  4. config::Config,
  5. status::{InternalStatus, StatusManager},
  6. storage::{Batch, ReceivedPaymentStatus, Storage},
  7. transaction::Error as TxError,
  8. transaction::Type,
  9. worker::WorkerManager,
  10. AccountId, Amount, Error, Filter, PaymentFrom, PaymentId, RevId, Status, Tag, Transaction,
  11. TxId,
  12. };
  13. use std::{cmp::Ordering, collections::HashMap, sync::Arc};
  14. use tokio::sync::mpsc::{self, Receiver, Sender};
  15. /// The Verax ledger
  16. #[derive(Debug)]
  17. pub struct Ledger<S>
  18. where
  19. S: Storage + Sync + Send,
  20. {
  21. config: Config<S>,
  22. broadcaster: WorkerManager<Broadcaster>,
  23. }
  24. impl<S> Ledger<S>
  25. where
  26. S: Storage + Sync + Send,
  27. {
  28. /// Creates a new ledger instance
  29. pub fn new(config: Config<S>) -> Arc<Self> {
  30. Arc::new(Self {
  31. config,
  32. broadcaster: WorkerManager::new(Broadcaster::default()),
  33. })
  34. }
  35. /// Subscribes to new transactions and revisions with a given filter
  36. pub async fn subscribe(&self, filter: Filter) -> (Sender<Transaction>, Receiver<Transaction>) {
  37. let (sender, receiver) = mpsc::channel(100);
  38. self.subscribe_with_sender(filter, sender.clone()).await;
  39. (sender, receiver)
  40. }
  41. /// Subscribe to new transactions and revisions with a given filter and a receiver.
  42. pub async fn subscribe_with_sender(&self, filter: Filter, sender: Sender<Transaction>) {
  43. self.broadcaster.subscribe(filter, sender).await;
  44. }
  45. /// Returns the total number of active subscribers
  46. pub async fn subscribers(&self) -> usize {
  47. self.broadcaster.subscribers().await
  48. }
  49. /// The internal usage is to select unspent payments for each account to create new
  50. /// transactions. The external API however does not expose that level of usage, instead it
  51. /// exposes a simple API to move funds using accounts to debit from and accounts to credit to. A
  52. /// single transaction can use multiple accounts to debit and credit, instead of a single
  53. /// account.
  54. ///
  55. /// This function selects the unspent payments to be used in a transaction, in a descending
  56. /// order (making sure to include any negative deposit).
  57. ///
  58. /// This function returns a vector of payments to be used as inputs and optionally a dependent
  59. /// transaction to be executed first. This transaction is an internal transaction and it settles
  60. /// immediately. It is used to split an existing payment into two payments, one to be used as
  61. /// input and the other to be used as change. This is done to avoid locking any change amount
  62. /// until the main transaction settles.
  63. async fn select_payments_from_accounts(
  64. &self,
  65. payments: Vec<(AccountId, Amount)>,
  66. ) -> Result<(Option<Transaction>, Vec<PaymentFrom>), Error> {
  67. let mut to_spend = HashMap::<_, AmountCents>::new();
  68. for (account_id, amount) in payments.into_iter() {
  69. let id = (account_id, amount.asset().clone());
  70. if let Some(value) = to_spend.get_mut(&id) {
  71. *value = value
  72. .checked_add(amount.cents())
  73. .ok_or(Error::Overflow("cents".to_owned()))?;
  74. } else {
  75. to_spend.insert(id, amount.cents());
  76. }
  77. }
  78. let mut change_input = vec![];
  79. let mut change_output = vec![];
  80. let mut payments: Vec<PaymentFrom> = vec![];
  81. for ((account, asset), mut to_spend_cents) in to_spend.into_iter() {
  82. let iterator = self
  83. .config
  84. .storage
  85. .get_unspent_payments(&account, &asset, to_spend_cents)
  86. .await?;
  87. for payment in iterator.into_iter() {
  88. let cents = payment.amount.cents();
  89. to_spend_cents = to_spend_cents
  90. .checked_sub(cents)
  91. .ok_or(Error::Underflow("cents".to_owned()))?;
  92. payments.push(payment);
  93. match to_spend_cents.cmp(&0) {
  94. Ordering::Equal => {
  95. // No change amount, we are done with this input
  96. break;
  97. }
  98. Ordering::Less => {
  99. // There is a change amount, we need to split the last
  100. // input into two payment_ids into the same accounts in
  101. // a transaction that will settle immediately, otherwise
  102. // the change amount will be unspendable until this
  103. // transaction settles. By doing so the current
  104. // operation will have no change and it can safely take
  105. // its time to settle without making any change amount
  106. // unspendable.
  107. let to_spend_cents = to_spend_cents.abs();
  108. let input = payments
  109. .pop()
  110. .ok_or(Error::InsufficientBalance(account.clone(), asset.clone()))?;
  111. change_input.push(input);
  112. change_output.push((
  113. account.clone(),
  114. asset.new_amount(
  115. cents
  116. .checked_sub(to_spend_cents)
  117. .ok_or(Error::Underflow("change cents".to_owned()))?,
  118. ),
  119. ));
  120. change_output.push((account.clone(), asset.new_amount(to_spend_cents)));
  121. // Go to the next payment
  122. break;
  123. }
  124. _ => {
  125. // We need more funds, continue to the selecting the
  126. // available payment if any
  127. }
  128. }
  129. }
  130. if to_spend_cents > 0 {
  131. // We don't have enough payment to cover the to_spend_cents
  132. // Return an insufficient balance error
  133. return Err(Error::InsufficientBalance(account, asset.clone()));
  134. }
  135. }
  136. let exchange_tx = if change_input.is_empty() {
  137. None
  138. } else {
  139. let total =
  140. u16::try_from(change_input.len()).map_err(|e| Error::Overflow(e.to_string()))?;
  141. let split_input = Transaction::new(
  142. "Exchange transaction".to_owned(),
  143. // Set the change transaction as settled. This is an
  144. // internal transaction to split existing payments
  145. // into exact new payments, so the main transaction has no
  146. // change.
  147. self.config.status.default_spendable(),
  148. Type::Exchange,
  149. change_input,
  150. change_output,
  151. )
  152. .await?;
  153. let creates = &split_input.creates;
  154. for i in 0..total {
  155. // Spend the new payment
  156. let index = i
  157. .checked_mul(2)
  158. .ok_or(Error::Overflow("index overflow".to_owned()))?;
  159. let uindex: usize = index.into();
  160. payments.push(PaymentFrom {
  161. id: PaymentId {
  162. transaction: split_input.id.clone(),
  163. position: index,
  164. },
  165. from: creates[uindex].to.clone(),
  166. amount: creates[uindex].amount.clone(),
  167. });
  168. }
  169. Some(split_input)
  170. };
  171. Ok((exchange_tx, payments))
  172. }
  173. #[inline]
  174. /// Persist a new base transaction
  175. ///
  176. /// This operation should only happen once, because if it is executed multiple times the storage
  177. /// layer should fail. Base transactions are not allowed to be ammened, only revisions.
  178. async fn store_base_transaction(
  179. transaction: &Transaction,
  180. batch: &mut S::Batch<'_>,
  181. ) -> Result<(), Error> {
  182. let spends = transaction
  183. .spends
  184. .iter()
  185. .map(|x| x.id.clone())
  186. .collect::<Vec<_>>();
  187. batch
  188. .spend_payments(
  189. &transaction.revision.transaction_id,
  190. spends,
  191. ReceivedPaymentStatus::Locked,
  192. )
  193. .await?;
  194. batch
  195. .create_payments(
  196. &transaction.revision.transaction_id,
  197. &transaction.creates,
  198. ReceivedPaymentStatus::Locked,
  199. )
  200. .await?;
  201. for account in transaction.accounts() {
  202. batch
  203. .relate_account_to_transaction(
  204. &transaction.revision.transaction_id,
  205. &account,
  206. transaction.typ,
  207. )
  208. .await?;
  209. }
  210. batch
  211. .store_base_transaction(
  212. &transaction.revision.transaction_id,
  213. &transaction.transaction,
  214. )
  215. .await?;
  216. Ok(())
  217. }
  218. /// Stores the current transaction object to the storage layer.
  219. ///
  220. /// This method is not idempotent, and it will fail if the transaction if the requested update
  221. /// is not allowed.
  222. ///
  223. /// This function will store the base transaction if it is the first revision, and will create a
  224. /// new revision otherwise.
  225. pub async fn store(&self, transaction: Transaction) -> Result<Transaction, Error> {
  226. transaction.validate()?;
  227. let mut batch = self.config.storage.begin().await?;
  228. if transaction.revision.previous.is_none() {
  229. Self::store_base_transaction(&transaction, &mut batch).await?;
  230. }
  231. let (created_updated, spent_updated) = match self
  232. .config
  233. .status
  234. .internal_type(&transaction.revision.status)
  235. {
  236. InternalStatus::Reverted => {
  237. batch
  238. .update_transaction_payments(
  239. &transaction.id,
  240. ReceivedPaymentStatus::Failed,
  241. ReceivedPaymentStatus::Spendable,
  242. )
  243. .await?
  244. }
  245. InternalStatus::Spendable => {
  246. batch
  247. .update_transaction_payments(
  248. &transaction.id,
  249. ReceivedPaymentStatus::Spendable,
  250. ReceivedPaymentStatus::Spent,
  251. )
  252. .await?
  253. }
  254. _ => (transaction.creates.len(), transaction.spends.len()),
  255. };
  256. if transaction.creates.len() != created_updated || transaction.spends.len() != spent_updated
  257. {
  258. return Err(Error::Transaction(TxError::NoUpdate));
  259. }
  260. if self
  261. .config
  262. .status
  263. .is_spendable(&transaction.revision.status)
  264. {
  265. batch
  266. .update_transaction_payments(
  267. &transaction.id,
  268. ReceivedPaymentStatus::Spendable,
  269. ReceivedPaymentStatus::Spent,
  270. )
  271. .await?;
  272. }
  273. batch
  274. .store_revision(&transaction.revision_id, &transaction.revision)
  275. .await?;
  276. batch
  277. .tag_transaction(
  278. &transaction.id,
  279. &transaction.transaction,
  280. &transaction.revision.tags,
  281. )
  282. .await?;
  283. batch
  284. .update_transaction_revision(
  285. &transaction.id,
  286. &transaction.revision_id,
  287. transaction.revision.previous.as_ref(),
  288. )
  289. .await?;
  290. batch.commit().await?;
  291. // The transaction is persisted and now it is time to broadcast it to any possible listener
  292. self.broadcaster.process(transaction.clone());
  293. Ok(transaction)
  294. }
  295. /// Creates a new transaction and returns it.
  296. ///
  297. /// The input is pretty simple, take this amounts from these given accounts
  298. /// and send them to these accounts (and amounts). The job of this function
  299. /// is to figure it out how to do it. This function will make sure that each
  300. /// account has enough balance, selecting the unspent payments from each
  301. /// account that will be spent. It will also return a list of transactions
  302. /// that will be used to return the change to the accounts, these accounts
  303. /// can be settled immediately so no other funds required to perform the
  304. /// transaction are locked.
  305. ///
  306. /// This functions performs read only operations on top of the storage layer
  307. /// and it will guarantee execution (meaning that it will not lock any
  308. /// funds, so these transactions may fail if any selected payment is spent
  309. /// between the time the transaction is created and executed).
  310. ///
  311. /// A NewTransaction struct is returned, the change_transactions should be
  312. /// executed and set as settled before the transaction is executed,
  313. /// otherwise it will fail. A failure in any execution will render the
  314. /// entire operation as failed but no funds will be locked.
  315. pub async fn new_transaction(
  316. &self,
  317. reference: String,
  318. status: Status,
  319. from: Vec<(AccountId, Amount)>,
  320. to: Vec<(AccountId, Amount)>,
  321. ) -> Result<Transaction, Error> {
  322. let (change_transaction, payments) = self.select_payments_from_accounts(from).await?;
  323. if let Some(change_tx) = change_transaction {
  324. self.store(change_tx).await?;
  325. }
  326. self.store(Transaction::new(reference, status, Type::Transaction, payments, to).await?)
  327. .await
  328. }
  329. /// Return the balances from a given account
  330. ///
  331. /// The balance is a vector of Amounts, one for each asset. The balance will
  332. /// return only spendable amounts, meaning that any amount that is locked in
  333. /// a transaction will not be returned.
  334. ///
  335. /// TODO: Return locked funds as well.
  336. pub async fn get_balance(&self, account: &AccountId) -> Result<Vec<Amount>, Error> {
  337. Ok(self.config.storage.get_balance(account).await?)
  338. }
  339. /// Creates an external deposit
  340. ///
  341. /// Although a deposit can have multiple output payments, to different
  342. /// accounts and amounts, to keep the upstream API simple, this function
  343. /// only accepts a single account and amount to credit
  344. pub async fn deposit(
  345. &self,
  346. account: &AccountId,
  347. amount: Amount,
  348. status: Status,
  349. tags: Vec<Tag>,
  350. reference: String,
  351. ) -> Result<Transaction, Error> {
  352. self.store(Transaction::new_external_deposit(
  353. reference,
  354. status,
  355. tags,
  356. vec![(account.clone(), amount)],
  357. )?)
  358. .await
  359. }
  360. /// Creates a new withdrawal transaction and returns it.
  361. ///
  362. /// Although a transaction supports multiple inputs to be burned, from
  363. /// different accounts, to keep things simple, this function only supports a
  364. /// single input (single account and single amount). This is because the
  365. /// natural behaviour is to have withdrawals from a single account.
  366. pub async fn withdrawal(
  367. &self,
  368. account: &AccountId,
  369. amount: Amount,
  370. status: Status,
  371. reference: String,
  372. ) -> Result<Transaction, Error> {
  373. let (change_transactions, payments) = self
  374. .select_payments_from_accounts(vec![(account.clone(), amount)])
  375. .await?;
  376. for change_tx in change_transactions.into_iter() {
  377. self.store(change_tx).await?;
  378. }
  379. self.store(Transaction::new_external_withdrawal(
  380. reference, status, payments,
  381. )?)
  382. .await
  383. }
  384. /// Returns the payment object by a given payment id
  385. pub async fn get_payment_info(&self, _payment_id: &PaymentId) -> Result<PaymentFrom, Error> {
  386. todo!()
  387. }
  388. /// Returns the transaction object by a given transaction id
  389. pub async fn get_transaction(&self, transaction_id: TxId) -> Result<Transaction, Error> {
  390. let filter = Filter {
  391. ids: vec![transaction_id],
  392. limit: 1,
  393. ..Default::default()
  394. };
  395. self.config
  396. .storage
  397. .find(filter)
  398. .await?
  399. .pop()
  400. .ok_or(Error::TxNotFound)
  401. }
  402. /// Returns all transactions from a given account. It can be optionally be
  403. /// sorted by transaction type. The transactions are sorted from newest to
  404. /// oldest.
  405. pub async fn get_transactions(&self, filter: Filter) -> Result<Vec<Transaction>, Error> {
  406. Ok(self.config.storage.find(filter).await?)
  407. }
  408. /// Returns the status manager
  409. pub fn get_status_manager(&self) -> &StatusManager {
  410. &self.config.status
  411. }
  412. /// Updates a transaction and updates their tags to this given set
  413. pub async fn set_tags(
  414. &self,
  415. revision_id: RevId,
  416. tags: Vec<Tag>,
  417. reason: String,
  418. ) -> Result<Transaction, Error> {
  419. let filter = Filter {
  420. revisions: vec![revision_id],
  421. limit: 1,
  422. ..Default::default()
  423. };
  424. self.store(
  425. self.config
  426. .storage
  427. .find(filter)
  428. .await?
  429. .pop()
  430. .ok_or(Error::TxNotFound)?
  431. .set_tags(tags, reason)?,
  432. )
  433. .await
  434. }
  435. /// Attempts to change the status of a given transaction id. On success the
  436. /// new transaction object is returned, otherwise an error is returned.
  437. pub async fn change_status(
  438. &self,
  439. revision_id: RevId,
  440. new_status: Status,
  441. reason: String,
  442. ) -> Result<Transaction, Error> {
  443. let filter = Filter {
  444. revisions: vec![revision_id],
  445. limit: 1,
  446. ..Default::default()
  447. };
  448. self.store(
  449. self.config
  450. .storage
  451. .find(filter)
  452. .await?
  453. .pop()
  454. .ok_or(Error::TxNotFound)?
  455. .change_status(&self.config, new_status, reason)?,
  456. )
  457. .await
  458. }
  459. }