ledger.rs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564
  1. use crate::{
  2. amount::AmountCents,
  3. broadcaster::Broadcaster,
  4. config::Config,
  5. status::{InternalStatus, StatusManager},
  6. storage::{AccountTransactionType, Batch, ReceivedPaymentStatus, Storage},
  7. token::TokenPayload,
  8. transaction::{Error as TxError, Type},
  9. worker::WorkerManager,
  10. AccountId, Amount, Error, Filter, PaymentFrom, PaymentId, RevId, Status, Tag, Transaction,
  11. TxId,
  12. };
  13. use std::{cmp::Ordering, collections::HashMap, sync::Arc};
  14. use tokio::sync::mpsc::{self, Receiver, Sender};
  15. /// The Verax ledger
  16. #[derive(Debug)]
  17. pub struct Ledger<S>
  18. where
  19. S: Storage + Sync + Send,
  20. {
  21. config: Config<S>,
  22. broadcaster: WorkerManager<Broadcaster>,
  23. }
  24. impl<S> Ledger<S>
  25. where
  26. S: Storage + Sync + Send,
  27. {
  28. /// Creates a new ledger instance
  29. pub fn new(config: Config<S>) -> Arc<Self> {
  30. Arc::new(Self {
  31. config,
  32. broadcaster: WorkerManager::new(Broadcaster::default()),
  33. })
  34. }
  35. /// Subscribes to new transactions and revisions with a given filter
  36. pub async fn subscribe(&self, filter: Filter) -> (Sender<Transaction>, Receiver<Transaction>) {
  37. let (sender, receiver) = mpsc::channel(100);
  38. self.subscribe_with_sender(filter, sender.clone()).await;
  39. (sender, receiver)
  40. }
  41. /// Subscribe to new transactions and revisions with a given filter and a receiver.
  42. pub async fn subscribe_with_sender(&self, filter: Filter, sender: Sender<Transaction>) {
  43. self.broadcaster.subscribe(filter, sender).await;
  44. }
  45. /// Returns the total number of active subscribers
  46. pub async fn subscribers(&self) -> usize {
  47. self.broadcaster.subscribers().await
  48. }
  49. /// The internal usage is to select unspent payments for each account to create new
  50. /// transactions. The external API however does not expose that level of usage, instead it
  51. /// exposes a simple API to move funds using accounts to debit from and accounts to credit to. A
  52. /// single transaction can use multiple accounts to debit and credit, instead of a single
  53. /// account.
  54. ///
  55. /// This function selects the unspent payments to be used in a transaction, in a descending
  56. /// order (making sure to include any negative deposit).
  57. ///
  58. /// This function returns a vector of payments to be used as inputs and optionally a dependent
  59. /// transaction to be executed first. This transaction is an internal transaction and it settles
  60. /// immediately. It is used to split an existing payment into two payments, one to be used as
  61. /// input and the other to be used as change. This is done to avoid locking any change amount
  62. /// until the main transaction settles.
  63. async fn select_payments_from_accounts(
  64. &self,
  65. payments: Vec<(AccountId, Amount)>,
  66. ) -> Result<(Option<Transaction>, Vec<PaymentFrom>), Error> {
  67. let mut to_spend = HashMap::<_, AmountCents>::new();
  68. for (account_id, amount) in payments.into_iter() {
  69. let id = (account_id, amount.asset().clone());
  70. if let Some(value) = to_spend.get_mut(&id) {
  71. *value = value
  72. .checked_add(amount.cents())
  73. .ok_or(Error::Overflow("cents".to_owned()))?;
  74. } else {
  75. to_spend.insert(id, amount.cents());
  76. }
  77. }
  78. let mut change_input = vec![];
  79. let mut change_output = vec![];
  80. let mut payments: Vec<PaymentFrom> = vec![];
  81. for ((account, asset), mut to_spend_cents) in to_spend.into_iter() {
  82. let iterator = self
  83. .config
  84. .storage
  85. .get_unspent_payments(&account, &asset, to_spend_cents)
  86. .await?;
  87. for payment in iterator.into_iter() {
  88. let cents = payment.amount.cents();
  89. to_spend_cents = to_spend_cents
  90. .checked_sub(cents)
  91. .ok_or(Error::Underflow("cents".to_owned()))?;
  92. payments.push(payment);
  93. match to_spend_cents.cmp(&0) {
  94. Ordering::Equal => {
  95. // No change amount, we are done with this input
  96. break;
  97. }
  98. Ordering::Less => {
  99. // There is a change amount, we need to split the last
  100. // input into two payment_ids into the same accounts in
  101. // a transaction that will settle immediately, otherwise
  102. // the change amount will be unspendable until this
  103. // transaction settles. By doing so the current
  104. // operation will have no change and it can safely take
  105. // its time to settle without making any change amount
  106. // unspendable.
  107. let to_spend_cents = to_spend_cents.abs();
  108. let input = payments
  109. .pop()
  110. .ok_or(Error::InsufficientBalance(account.clone(), asset.clone()))?;
  111. change_input.push(input);
  112. change_output.push((
  113. account.clone(),
  114. asset.new_amount(
  115. cents
  116. .checked_sub(to_spend_cents)
  117. .ok_or(Error::Underflow("change cents".to_owned()))?,
  118. ),
  119. ));
  120. change_output.push((account.clone(), asset.new_amount(to_spend_cents)));
  121. // Go to the next payment
  122. break;
  123. }
  124. _ => {
  125. // We need more funds, continue to the selecting the
  126. // available payment if any
  127. }
  128. }
  129. }
  130. if to_spend_cents > 0 {
  131. // We don't have enough payment to cover the to_spend_cents
  132. // Return an insufficient balance error
  133. return Err(Error::InsufficientBalance(account, asset.clone()));
  134. }
  135. }
  136. let exchange_tx = if change_input.is_empty() {
  137. None
  138. } else {
  139. let total =
  140. u16::try_from(change_input.len()).map_err(|e| Error::Overflow(e.to_string()))?;
  141. let split_input = Transaction::new(
  142. "Exchange transaction".to_owned(),
  143. // Set the change transaction as settled. This is an
  144. // internal transaction to split existing payments
  145. // into exact new payments, so the main transaction has no
  146. // change.
  147. self.config.status.default_spendable(),
  148. Type::Exchange,
  149. change_input,
  150. change_output,
  151. )
  152. .await?;
  153. let creates = &split_input.creates;
  154. for i in 0..total {
  155. // Spend the new payment
  156. let index = i
  157. .checked_mul(2)
  158. .ok_or(Error::Overflow("index overflow".to_owned()))?;
  159. let uindex: usize = index.into();
  160. payments.push(PaymentFrom {
  161. id: PaymentId {
  162. transaction: split_input.id.clone(),
  163. position: index,
  164. },
  165. from: creates[uindex].to.clone(),
  166. amount: creates[uindex].amount.clone(),
  167. });
  168. }
  169. Some(split_input)
  170. };
  171. Ok((exchange_tx, payments))
  172. }
  173. #[inline]
  174. /// Persist a new base transaction
  175. ///
  176. /// This operation should only happen once, because if it is executed multiple times the storage
  177. /// layer should fail. Base transactions are not allowed to be ammened, only revisions.
  178. async fn store_base_transaction(
  179. transaction: &Transaction,
  180. batch: &mut S::Batch<'_>,
  181. ) -> Result<(), Error> {
  182. let spends = transaction
  183. .spends
  184. .iter()
  185. .map(|x| x.id.clone())
  186. .collect::<Vec<_>>();
  187. batch
  188. .spend_payments(
  189. &transaction.revision.transaction_id,
  190. spends,
  191. ReceivedPaymentStatus::Locked,
  192. )
  193. .await?;
  194. batch
  195. .create_payments(
  196. &transaction.revision.transaction_id,
  197. &transaction.creates,
  198. ReceivedPaymentStatus::Locked,
  199. )
  200. .await?;
  201. let accounts = transaction.accounts();
  202. for account in accounts.spends {
  203. batch
  204. .relate_account_to_transaction(
  205. AccountTransactionType::Spends,
  206. &transaction.revision.transaction_id,
  207. &account,
  208. transaction.typ,
  209. )
  210. .await?;
  211. }
  212. for account in accounts.receives {
  213. batch
  214. .relate_account_to_transaction(
  215. AccountTransactionType::Receives,
  216. &transaction.revision.transaction_id,
  217. &account,
  218. transaction.typ,
  219. )
  220. .await?;
  221. }
  222. batch
  223. .store_base_transaction(
  224. &transaction.revision.transaction_id,
  225. &transaction.transaction,
  226. )
  227. .await?;
  228. Ok(())
  229. }
  230. /// Stores the current transaction object to the storage layer.
  231. ///
  232. /// This method is not idempotent, and it will fail if the transaction if the requested update
  233. /// is not allowed.
  234. ///
  235. /// This function will store the base transaction if it is the first revision, and will create a
  236. /// new revision otherwise.
  237. pub async fn store(&self, transaction: Transaction) -> Result<Transaction, Error> {
  238. transaction.validate()?;
  239. if let Some(previous) = &transaction.revision.previous {
  240. // Although this operation to check the previous version is being performed outside of
  241. // the writer batch, it is safe to do so because the previous version is
  242. // already stored in the storage layer, and the batch will make sure
  243. // the previous version is the current revision, or else the entire operation will be
  244. // revered
  245. if let Some(lock_token) = self
  246. .config
  247. .storage
  248. .find(Filter {
  249. revisions: vec![previous.clone()],
  250. ..Default::default()
  251. })
  252. .await?
  253. .pop()
  254. .ok_or(Error::TxNotFound)?
  255. .revision
  256. .locked
  257. {
  258. self.config
  259. .token_manager
  260. .verify(lock_token, &transaction.revision.update_token)?
  261. }
  262. }
  263. let mut batch = self.config.storage.begin().await?;
  264. if transaction.revision.previous.is_none() {
  265. Self::store_base_transaction(&transaction, &mut batch).await?;
  266. }
  267. let (created_updated, spent_updated) = match self
  268. .config
  269. .status
  270. .internal_type(&transaction.revision.status)
  271. {
  272. InternalStatus::Reverted => {
  273. batch
  274. .update_transaction_payments(
  275. &transaction.id,
  276. ReceivedPaymentStatus::Failed,
  277. ReceivedPaymentStatus::Spendable,
  278. )
  279. .await?
  280. }
  281. InternalStatus::Spendable => {
  282. batch
  283. .update_transaction_payments(
  284. &transaction.id,
  285. ReceivedPaymentStatus::Spendable,
  286. ReceivedPaymentStatus::Spent,
  287. )
  288. .await?
  289. }
  290. _ => (transaction.creates.len(), transaction.spends.len()),
  291. };
  292. if transaction.creates.len() != created_updated || transaction.spends.len() != spent_updated
  293. {
  294. return Err(Error::Transaction(TxError::NoUpdate));
  295. }
  296. if self
  297. .config
  298. .status
  299. .is_spendable(&transaction.revision.status)
  300. {
  301. batch
  302. .update_transaction_payments(
  303. &transaction.id,
  304. ReceivedPaymentStatus::Spendable,
  305. ReceivedPaymentStatus::Spent,
  306. )
  307. .await?;
  308. }
  309. batch
  310. .store_revision(&transaction.revision_id, &transaction.revision)
  311. .await?;
  312. batch
  313. .tag_transaction(
  314. &transaction.id,
  315. &transaction.transaction,
  316. &transaction.revision.tags,
  317. )
  318. .await?;
  319. batch
  320. .update_transaction_revision(
  321. &transaction.id,
  322. &transaction.revision_id,
  323. transaction.revision.previous.as_ref(),
  324. )
  325. .await?;
  326. batch.commit().await?;
  327. // The transaction is persisted and now it is time to broadcast it to any possible listener
  328. self.broadcaster.process(transaction.clone());
  329. Ok(transaction)
  330. }
  331. /// Creates a new transaction and returns it.
  332. ///
  333. /// The input is pretty simple, take this amounts from these given accounts
  334. /// and send them to these accounts (and amounts). The job of this function
  335. /// is to figure it out how to do it. This function will make sure that each
  336. /// account has enough balance, selecting the unspent payments from each
  337. /// account that will be spent. It will also return a list of transactions
  338. /// that will be used to return the change to the accounts, these accounts
  339. /// can be settled immediately so no other funds required to perform the
  340. /// transaction are locked.
  341. ///
  342. /// This functions performs read only operations on top of the storage layer
  343. /// and it will guarantee execution (meaning that it will not lock any
  344. /// funds, so these transactions may fail if any selected payment is spent
  345. /// between the time the transaction is created and executed).
  346. ///
  347. /// A NewTransaction struct is returned, the change_transactions should be
  348. /// executed and set as settled before the transaction is executed,
  349. /// otherwise it will fail. A failure in any execution will render the
  350. /// entire operation as failed but no funds will be locked.
  351. pub async fn new_transaction(
  352. &self,
  353. reference: String,
  354. status: Status,
  355. from: Vec<(AccountId, Amount)>,
  356. to: Vec<(AccountId, Amount)>,
  357. ) -> Result<Transaction, Error> {
  358. let (change_transaction, payments) = self.select_payments_from_accounts(from).await?;
  359. if let Some(change_tx) = change_transaction {
  360. self.store(change_tx).await?;
  361. }
  362. self.store(Transaction::new(reference, status, Type::Transaction, payments, to).await?)
  363. .await
  364. }
  365. /// Return the balances from a given account
  366. ///
  367. /// The balance is a vector of Amounts, one for each asset. The balance will
  368. /// return only spendable amounts, meaning that any amount that is locked in
  369. /// a transaction will not be returned.
  370. ///
  371. /// TODO: Return locked funds as well.
  372. pub async fn get_balance(&self, account: &AccountId) -> Result<Vec<Amount>, Error> {
  373. Ok(self.config.storage.get_balance(account).await?)
  374. }
  375. /// Creates an external deposit
  376. ///
  377. /// Although a deposit can have multiple output payments, to different
  378. /// accounts and amounts, to keep the upstream API simple, this function
  379. /// only accepts a single account and amount to credit
  380. pub async fn deposit(
  381. &self,
  382. account: &AccountId,
  383. amount: Amount,
  384. status: Status,
  385. tags: Vec<Tag>,
  386. reference: String,
  387. ) -> Result<Transaction, Error> {
  388. self.store(Transaction::new_external_deposit(
  389. reference,
  390. status,
  391. tags,
  392. vec![(account.clone(), amount)],
  393. )?)
  394. .await
  395. }
  396. /// Creates a new withdrawal transaction and returns it.
  397. ///
  398. /// Although a transaction supports multiple inputs to be burned, from
  399. /// different accounts, to keep things simple, this function only supports a
  400. /// single input (single account and single amount). This is because the
  401. /// natural behaviour is to have withdrawals from a single account.
  402. pub async fn withdrawal(
  403. &self,
  404. account: &AccountId,
  405. amount: Amount,
  406. status: Status,
  407. reference: String,
  408. ) -> Result<Transaction, Error> {
  409. let (change_transactions, payments) = self
  410. .select_payments_from_accounts(vec![(account.clone(), amount)])
  411. .await?;
  412. for change_tx in change_transactions.into_iter() {
  413. self.store(change_tx).await?;
  414. }
  415. self.store(Transaction::new_external_withdrawal(
  416. reference, status, payments,
  417. )?)
  418. .await
  419. }
  420. /// Returns the payment object by a given payment id (not implemented yet: the body is `todo!()`, so awaiting it panics)
  421. pub async fn get_payment_info(&self, _payment_id: &PaymentId) -> Result<PaymentFrom, Error> {
  422. todo!()
  423. }
  424. /// Returns the transaction object by a given transaction id
  425. pub async fn get_transaction(&self, transaction_id: TxId) -> Result<Transaction, Error> {
  426. let filter = Filter {
  427. ids: vec![transaction_id],
  428. limit: 1,
  429. ..Default::default()
  430. };
  431. self.config
  432. .storage
  433. .find(filter)
  434. .await?
  435. .pop()
  436. .ok_or(Error::TxNotFound)
  437. }
  438. /// Returns all transactions from a given account. It can be optionally be
  439. /// sorted by transaction type. The transactions are sorted from newest to
  440. /// oldest.
  441. pub async fn get_transactions(&self, filter: Filter) -> Result<Vec<Transaction>, Error> {
  442. Ok(self.config.storage.find(filter).await?)
  443. }
  444. /// Returns the status manager
  445. pub fn get_status_manager(&self) -> &StatusManager {
  446. &self.config.status
  447. }
  448. /// Locks the transaction and returns a token that can be used to unlock it.
  449. ///
  450. /// Locked transactions cannot be updated without the TokenPayload or until it expires.
  451. pub async fn lock_transaction(
  452. &self,
  453. transaction_id: TxId,
  454. owner: String,
  455. ) -> Result<(Transaction, TokenPayload), Error> {
  456. let filter = Filter {
  457. ids: vec![transaction_id.clone()],
  458. limit: 1,
  459. ..Default::default()
  460. };
  461. let (new_revision, secret) = self
  462. .config
  463. .storage
  464. .find(filter)
  465. .await?
  466. .pop()
  467. .ok_or(Error::TxNotFound)?
  468. .lock_transaction(owner, &self.config.token_manager)?;
  469. Ok((self.store(new_revision).await?, secret))
  470. }
  471. /// Updates a transaction and updates their tags to this given set
  472. pub async fn set_tags(
  473. &self,
  474. revision_id: RevId,
  475. tags: Vec<Tag>,
  476. reason: String,
  477. update_token: Option<TokenPayload>,
  478. ) -> Result<Transaction, Error> {
  479. let filter = Filter {
  480. revisions: vec![revision_id],
  481. limit: 1,
  482. ..Default::default()
  483. };
  484. self.store(
  485. self.config
  486. .storage
  487. .find(filter)
  488. .await?
  489. .pop()
  490. .ok_or(Error::TxNotFound)?
  491. .set_tags(tags, reason, update_token)?,
  492. )
  493. .await
  494. }
  495. /// Attempts to change the status of a given transaction id. On success the
  496. /// new transaction object is returned, otherwise an error is returned.
  497. pub async fn change_status(
  498. &self,
  499. revision_id: RevId,
  500. new_status: Status,
  501. reason: String,
  502. update_token: Option<TokenPayload>,
  503. ) -> Result<Transaction, Error> {
  504. let filter = Filter {
  505. revisions: vec![revision_id],
  506. limit: 1,
  507. ..Default::default()
  508. };
  509. self.store(
  510. self.config
  511. .storage
  512. .find(filter)
  513. .await?
  514. .pop()
  515. .ok_or(Error::TxNotFound)?
  516. .change_status(&self.config, new_status, reason, update_token)?,
  517. )
  518. .await
  519. }
  520. }