ledger.rs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
  1. use crate::{
  2. amount::AmountCents,
  3. broadcaster::Broadcaster,
  4. config::Config,
  5. status::{InternalStatus, StatusManager},
  6. storage::{AccountTransactionType, Batch, ReceivedPaymentStatus, Storage},
  7. transaction::{Error as TxError, Type},
  8. worker::WorkerManager,
  9. AccountId, Amount, Error, Filter, PaymentFrom, PaymentId, RevId, Status, Tag, Transaction,
  10. TxId,
  11. };
  12. use std::{cmp::Ordering, collections::HashMap, sync::Arc};
  13. use tokio::sync::mpsc::{self, Receiver, Sender};
  14. /// The Verax ledger
  15. #[derive(Debug)]
  16. pub struct Ledger<S>
  17. where
  18. S: Storage + Sync + Send,
  19. {
  20. config: Config<S>,
  21. broadcaster: WorkerManager<Broadcaster>,
  22. }
  23. impl<S> Ledger<S>
  24. where
  25. S: Storage + Sync + Send,
  26. {
  27. /// Creates a new ledger instance
  28. pub fn new(config: Config<S>) -> Arc<Self> {
  29. Arc::new(Self {
  30. config,
  31. broadcaster: WorkerManager::new(Broadcaster::default()),
  32. })
  33. }
  34. /// Subscribes to new transactions and revisions with a given filter
  35. pub async fn subscribe(&self, filter: Filter) -> (Sender<Transaction>, Receiver<Transaction>) {
  36. let (sender, receiver) = mpsc::channel(100);
  37. self.subscribe_with_sender(filter, sender.clone()).await;
  38. (sender, receiver)
  39. }
  40. /// Subscribe to new transactions and revisions with a given filter and a receiver.
  41. pub async fn subscribe_with_sender(&self, filter: Filter, sender: Sender<Transaction>) {
  42. self.broadcaster.subscribe(filter, sender).await;
  43. }
  44. /// Returns the total number of active subscribers
  45. pub async fn subscribers(&self) -> usize {
  46. self.broadcaster.subscribers().await
  47. }
  48. /// The internal usage is to select unspent payments for each account to create new
  49. /// transactions. The external API however does not expose that level of usage, instead it
  50. /// exposes a simple API to move funds using accounts to debit from and accounts to credit to. A
  51. /// single transaction can use multiple accounts to debit and credit, instead of a single
  52. /// account.
  53. ///
  54. /// This function selects the unspent payments to be used in a transaction, in a descending
  55. /// order (making sure to include any negative deposit).
  56. ///
  57. /// This function returns a vector of payments to be used as inputs and optionally a dependent
  58. /// transaction to be executed first. This transaction is an internal transaction and it settles
  59. /// immediately. It is used to split an existing payment into two payments, one to be used as
  60. /// input and the other to be used as change. This is done to avoid locking any change amount
  61. /// until the main transaction settles.
  62. async fn select_payments_from_accounts(
  63. &self,
  64. payments: Vec<(AccountId, Amount)>,
  65. ) -> Result<(Option<Transaction>, Vec<PaymentFrom>), Error> {
  66. let mut to_spend = HashMap::<_, AmountCents>::new();
  67. for (account_id, amount) in payments.into_iter() {
  68. let id = (account_id, amount.asset().clone());
  69. if let Some(value) = to_spend.get_mut(&id) {
  70. *value = value
  71. .checked_add(amount.cents())
  72. .ok_or(Error::Overflow("cents".to_owned()))?;
  73. } else {
  74. to_spend.insert(id, amount.cents());
  75. }
  76. }
  77. let mut change_input = vec![];
  78. let mut change_output = vec![];
  79. let mut payments: Vec<PaymentFrom> = vec![];
  80. for ((account, asset), mut to_spend_cents) in to_spend.into_iter() {
  81. let iterator = self
  82. .config
  83. .storage
  84. .get_unspent_payments(&account, &asset, to_spend_cents)
  85. .await?;
  86. for payment in iterator.into_iter() {
  87. let cents = payment.amount.cents();
  88. to_spend_cents = to_spend_cents
  89. .checked_sub(cents)
  90. .ok_or(Error::Underflow("cents".to_owned()))?;
  91. payments.push(payment);
  92. match to_spend_cents.cmp(&0) {
  93. Ordering::Equal => {
  94. // No change amount, we are done with this input
  95. break;
  96. }
  97. Ordering::Less => {
  98. // There is a change amount, we need to split the last
  99. // input into two payment_ids into the same accounts in
  100. // a transaction that will settle immediately, otherwise
  101. // the change amount will be unspendable until this
  102. // transaction settles. By doing so the current
  103. // operation will have no change and it can safely take
  104. // its time to settle without making any change amount
  105. // unspendable.
  106. let to_spend_cents = to_spend_cents.abs();
  107. let input = payments
  108. .pop()
  109. .ok_or(Error::InsufficientBalance(account.clone(), asset.clone()))?;
  110. change_input.push(input);
  111. change_output.push((
  112. account.clone(),
  113. asset.new_amount(
  114. cents
  115. .checked_sub(to_spend_cents)
  116. .ok_or(Error::Underflow("change cents".to_owned()))?,
  117. ),
  118. ));
  119. change_output.push((account.clone(), asset.new_amount(to_spend_cents)));
  120. // Go to the next payment
  121. break;
  122. }
  123. _ => {
  124. // We need more funds, continue to the selecting the
  125. // available payment if any
  126. }
  127. }
  128. }
  129. if to_spend_cents > 0 {
  130. // We don't have enough payment to cover the to_spend_cents
  131. // Return an insufficient balance error
  132. return Err(Error::InsufficientBalance(account, asset.clone()));
  133. }
  134. }
  135. let exchange_tx = if change_input.is_empty() {
  136. None
  137. } else {
  138. let total =
  139. u16::try_from(change_input.len()).map_err(|e| Error::Overflow(e.to_string()))?;
  140. let split_input = Transaction::new(
  141. "Exchange transaction".to_owned(),
  142. // Set the change transaction as settled. This is an
  143. // internal transaction to split existing payments
  144. // into exact new payments, so the main transaction has no
  145. // change.
  146. self.config.status.default_spendable(),
  147. Type::Exchange,
  148. change_input,
  149. change_output,
  150. )
  151. .await?;
  152. let creates = &split_input.creates;
  153. for i in 0..total {
  154. // Spend the new payment
  155. let index = i
  156. .checked_mul(2)
  157. .ok_or(Error::Overflow("index overflow".to_owned()))?;
  158. let uindex: usize = index.into();
  159. payments.push(PaymentFrom {
  160. id: PaymentId {
  161. transaction: split_input.id.clone(),
  162. position: index,
  163. },
  164. from: creates[uindex].to.clone(),
  165. amount: creates[uindex].amount.clone(),
  166. });
  167. }
  168. Some(split_input)
  169. };
  170. Ok((exchange_tx, payments))
  171. }
  172. #[inline]
  173. /// Persist a new base transaction
  174. ///
  175. /// This operation should only happen once, because if it is executed multiple times the storage
  176. /// layer should fail. Base transactions are not allowed to be ammened, only revisions.
  177. async fn store_base_transaction(
  178. transaction: &Transaction,
  179. batch: &mut S::Batch<'_>,
  180. ) -> Result<(), Error> {
  181. let spends = transaction
  182. .spends
  183. .iter()
  184. .map(|x| x.id.clone())
  185. .collect::<Vec<_>>();
  186. batch
  187. .spend_payments(
  188. &transaction.revision.transaction_id,
  189. spends,
  190. ReceivedPaymentStatus::Locked,
  191. )
  192. .await?;
  193. batch
  194. .create_payments(
  195. &transaction.revision.transaction_id,
  196. &transaction.creates,
  197. ReceivedPaymentStatus::Locked,
  198. )
  199. .await?;
  200. let accounts = transaction.accounts();
  201. for account in accounts.spends {
  202. batch
  203. .relate_account_to_transaction(
  204. AccountTransactionType::Spends,
  205. &transaction.revision.transaction_id,
  206. &account,
  207. transaction.typ,
  208. )
  209. .await?;
  210. }
  211. for account in accounts.receives {
  212. batch
  213. .relate_account_to_transaction(
  214. AccountTransactionType::Receives,
  215. &transaction.revision.transaction_id,
  216. &account,
  217. transaction.typ,
  218. )
  219. .await?;
  220. }
  221. batch
  222. .store_base_transaction(
  223. &transaction.revision.transaction_id,
  224. &transaction.transaction,
  225. )
  226. .await?;
  227. Ok(())
  228. }
  229. /// Stores the current transaction object to the storage layer.
  230. ///
  231. /// This method is not idempotent, and it will fail if the transaction if the requested update
  232. /// is not allowed.
  233. ///
  234. /// This function will store the base transaction if it is the first revision, and will create a
  235. /// new revision otherwise.
  236. pub async fn store(&self, transaction: Transaction) -> Result<Transaction, Error> {
  237. transaction.validate()?;
  238. if let Some(previous) = &transaction.revision.previous {
  239. // Although this operation to check the previous version is being performed outside of
  240. // the writer batch, it is safe to do so because the previous version is
  241. // already stored in the storage layer, and the batch will make sure
  242. // the previous version is the current revision, or else the entire operation will be
  243. // revered
  244. if let Some(lock_token) = self
  245. .config
  246. .storage
  247. .find(Filter {
  248. revisions: vec![previous.clone()],
  249. ..Default::default()
  250. })
  251. .await?
  252. .pop()
  253. .ok_or(Error::TxNotFound)?
  254. .revision
  255. .locked
  256. {
  257. self.config
  258. .token_manager
  259. .verify(lock_token, &transaction.revision.update_token)?
  260. }
  261. }
  262. let mut batch = self.config.storage.begin().await?;
  263. if transaction.revision.previous.is_none() {
  264. Self::store_base_transaction(&transaction, &mut batch).await?;
  265. }
  266. let (created_updated, spent_updated) = match self
  267. .config
  268. .status
  269. .internal_type(&transaction.revision.status)
  270. {
  271. InternalStatus::Reverted => {
  272. batch
  273. .update_transaction_payments(
  274. &transaction.id,
  275. ReceivedPaymentStatus::Failed,
  276. ReceivedPaymentStatus::Spendable,
  277. )
  278. .await?
  279. }
  280. InternalStatus::Spendable => {
  281. batch
  282. .update_transaction_payments(
  283. &transaction.id,
  284. ReceivedPaymentStatus::Spendable,
  285. ReceivedPaymentStatus::Spent,
  286. )
  287. .await?
  288. }
  289. _ => (transaction.creates.len(), transaction.spends.len()),
  290. };
  291. if transaction.creates.len() != created_updated || transaction.spends.len() != spent_updated
  292. {
  293. return Err(Error::Transaction(TxError::NoUpdate));
  294. }
  295. if self
  296. .config
  297. .status
  298. .is_spendable(&transaction.revision.status)
  299. {
  300. batch
  301. .update_transaction_payments(
  302. &transaction.id,
  303. ReceivedPaymentStatus::Spendable,
  304. ReceivedPaymentStatus::Spent,
  305. )
  306. .await?;
  307. }
  308. batch
  309. .store_revision(&transaction.revision_id, &transaction.revision)
  310. .await?;
  311. batch
  312. .tag_transaction(
  313. &transaction.id,
  314. &transaction.transaction,
  315. &transaction.revision.tags,
  316. )
  317. .await?;
  318. batch
  319. .update_transaction_revision(
  320. &transaction.id,
  321. &transaction.revision_id,
  322. transaction.revision.previous.as_ref(),
  323. )
  324. .await?;
  325. batch.commit().await?;
  326. // The transaction is persisted and now it is time to broadcast it to any possible listener
  327. self.broadcaster.process(transaction.clone());
  328. Ok(transaction)
  329. }
  330. /// Creates a new transaction and returns it.
  331. ///
  332. /// The input is pretty simple, take this amounts from these given accounts
  333. /// and send them to these accounts (and amounts). The job of this function
  334. /// is to figure it out how to do it. This function will make sure that each
  335. /// account has enough balance, selecting the unspent payments from each
  336. /// account that will be spent. It will also return a list of transactions
  337. /// that will be used to return the change to the accounts, these accounts
  338. /// can be settled immediately so no other funds required to perform the
  339. /// transaction are locked.
  340. ///
  341. /// This functions performs read only operations on top of the storage layer
  342. /// and it will guarantee execution (meaning that it will not lock any
  343. /// funds, so these transactions may fail if any selected payment is spent
  344. /// between the time the transaction is created and executed).
  345. ///
  346. /// A NewTransaction struct is returned, the change_transactions should be
  347. /// executed and set as settled before the transaction is executed,
  348. /// otherwise it will fail. A failure in any execution will render the
  349. /// entire operation as failed but no funds will be locked.
  350. pub async fn new_transaction(
  351. &self,
  352. reference: String,
  353. status: Status,
  354. from: Vec<(AccountId, Amount)>,
  355. to: Vec<(AccountId, Amount)>,
  356. ) -> Result<Transaction, Error> {
  357. let (change_transaction, payments) = self.select_payments_from_accounts(from).await?;
  358. if let Some(change_tx) = change_transaction {
  359. self.store(change_tx).await?;
  360. }
  361. self.store(Transaction::new(reference, status, Type::Transaction, payments, to).await?)
  362. .await
  363. }
  364. /// Return the balances from a given account
  365. ///
  366. /// The balance is a vector of Amounts, one for each asset. The balance will
  367. /// return only spendable amounts, meaning that any amount that is locked in
  368. /// a transaction will not be returned.
  369. ///
  370. /// TODO: Return locked funds as well.
  371. pub async fn get_balance(&self, account: &AccountId) -> Result<Vec<Amount>, Error> {
  372. Ok(self.config.storage.get_balance(account).await?)
  373. }
  374. /// Creates an external deposit
  375. ///
  376. /// Although a deposit can have multiple output payments, to different
  377. /// accounts and amounts, to keep the upstream API simple, this function
  378. /// only accepts a single account and amount to credit
  379. pub async fn deposit(
  380. &self,
  381. account: &AccountId,
  382. amount: Amount,
  383. status: Status,
  384. tags: Vec<Tag>,
  385. reference: String,
  386. ) -> Result<Transaction, Error> {
  387. self.store(Transaction::new_external_deposit(
  388. reference,
  389. status,
  390. tags,
  391. vec![(account.clone(), amount)],
  392. )?)
  393. .await
  394. }
  395. /// Creates a new withdrawal transaction and returns it.
  396. ///
  397. /// Although a transaction supports multiple inputs to be burned, from
  398. /// different accounts, to keep things simple, this function only supports a
  399. /// single input (single account and single amount). This is because the
  400. /// natural behaviour is to have withdrawals from a single account.
  401. pub async fn withdrawal(
  402. &self,
  403. account: &AccountId,
  404. amount: Amount,
  405. status: Status,
  406. reference: String,
  407. ) -> Result<Transaction, Error> {
  408. let (change_transactions, payments) = self
  409. .select_payments_from_accounts(vec![(account.clone(), amount)])
  410. .await?;
  411. for change_tx in change_transactions.into_iter() {
  412. self.store(change_tx).await?;
  413. }
  414. self.store(Transaction::new_external_withdrawal(
  415. reference, status, payments,
  416. )?)
  417. .await
  418. }
  419. /// Returns the payment object by a given payment id
  420. pub async fn get_payment_info(&self, _payment_id: &PaymentId) -> Result<PaymentFrom, Error> {
  421. todo!()
  422. }
  423. /// Returns the transaction object by a given transaction id
  424. pub async fn get_transaction(&self, transaction_id: TxId) -> Result<Transaction, Error> {
  425. let filter = Filter {
  426. ids: vec![transaction_id],
  427. limit: 1,
  428. ..Default::default()
  429. };
  430. self.config
  431. .storage
  432. .find(filter)
  433. .await?
  434. .pop()
  435. .ok_or(Error::TxNotFound)
  436. }
  437. /// Returns all transactions from a given account. It can be optionally be
  438. /// sorted by transaction type. The transactions are sorted from newest to
  439. /// oldest.
  440. pub async fn get_transactions(&self, filter: Filter) -> Result<Vec<Transaction>, Error> {
  441. Ok(self.config.storage.find(filter).await?)
  442. }
  443. /// Returns the status manager
  444. pub fn get_status_manager(&self) -> &StatusManager {
  445. &self.config.status
  446. }
  447. /// TODO
  448. pub async fn lock_transaction(&self, transaction_id: TxId) -> Result<Vec<u8>, Error> {
  449. let filter = Filter {
  450. ids: vec![transaction_id.clone()],
  451. limit: 1,
  452. ..Default::default()
  453. };
  454. let (new_revision, secret) = self
  455. .config
  456. .storage
  457. .find(filter)
  458. .await?
  459. .pop()
  460. .ok_or(Error::TxNotFound)?
  461. .lock_transaction(&self.config.token_manager)?;
  462. self.store(new_revision).await?;
  463. Ok(secret)
  464. }
  465. /// Updates a transaction and updates their tags to this given set
  466. pub async fn set_tags(
  467. &self,
  468. revision_id: RevId,
  469. tags: Vec<Tag>,
  470. reason: String,
  471. update_token: Option<Vec<u8>>,
  472. ) -> Result<Transaction, Error> {
  473. let filter = Filter {
  474. revisions: vec![revision_id],
  475. limit: 1,
  476. ..Default::default()
  477. };
  478. self.store(
  479. self.config
  480. .storage
  481. .find(filter)
  482. .await?
  483. .pop()
  484. .ok_or(Error::TxNotFound)?
  485. .set_tags(tags, reason, update_token)?,
  486. )
  487. .await
  488. }
  489. /// Attempts to change the status of a given transaction id. On success the
  490. /// new transaction object is returned, otherwise an error is returned.
  491. pub async fn change_status(
  492. &self,
  493. revision_id: RevId,
  494. new_status: Status,
  495. reason: String,
  496. update_token: Option<Vec<u8>>,
  497. ) -> Result<Transaction, Error> {
  498. let filter = Filter {
  499. revisions: vec![revision_id],
  500. limit: 1,
  501. ..Default::default()
  502. };
  503. self.store(
  504. self.config
  505. .storage
  506. .find(filter)
  507. .await?
  508. .pop()
  509. .ok_or(Error::TxNotFound)?
  510. .change_status(&self.config, new_status, reason, update_token)?,
  511. )
  512. .await
  513. }
  514. }