// batch.rs
  1. use crate::{
  2. storage::{self, Error},
  3. AccountId, Payment, PaymentId, Status, Transaction, TransactionId,
  4. };
  5. use sqlx::{Row, Sqlite, Transaction as SqlxTransaction};
  6. use std::marker::PhantomData;
  7. pub struct Batch<'a> {
  8. inner: SqlxTransaction<'a, Sqlite>,
  9. x: PhantomData<&'a ()>,
  10. }
  11. impl<'a> Batch<'a> {
  12. pub fn new(inner: SqlxTransaction<'a, Sqlite>) -> Batch<'a> {
  13. Self {
  14. inner,
  15. x: PhantomData,
  16. }
  17. }
  18. }
  19. #[async_trait::async_trait]
  20. impl<'a> storage::Batch<'a> for Batch<'a> {
  21. async fn rollback(self) -> Result<(), Error> {
  22. self.inner
  23. .rollback()
  24. .await
  25. .map_err(|e| Error::Storage(e.to_string()))
  26. }
  27. async fn commit(self) -> Result<(), Error> {
  28. self.inner
  29. .commit()
  30. .await
  31. .map_err(|e| Error::Storage(e.to_string()))
  32. }
  33. async fn spend_payment(
  34. &mut self,
  35. payment_id: &PaymentId,
  36. status: Status,
  37. transaction_id: &TransactionId,
  38. ) -> Result<(), Error> {
  39. let result = sqlx::query(
  40. r#"
  41. UPDATE payments SET "spent_by" = ?
  42. WHERE "transaction_id" = ? AND "position_id" = ? AND ("spent_by" IS NULL OR "spent_by" = ?)
  43. "#,
  44. )
  45. .bind(if status.is_rollback() {
  46. None
  47. } else {
  48. Some(transaction_id.to_string())
  49. })
  50. .bind(payment_id.transaction.to_string())
  51. .bind(payment_id.position.to_string())
  52. .bind(transaction_id.to_string())
  53. .execute(&mut *self.inner)
  54. .await
  55. .map_err(|e| Error::SpendPayment(e.to_string()))?;
  56. if result.rows_affected() == 1 {
  57. Ok(())
  58. } else {
  59. Err(Error::NoUpdate)
  60. }
  61. }
  62. async fn get_payment_status(
  63. &mut self,
  64. transaction_id: &TransactionId,
  65. ) -> Result<Option<Status>, Error> {
  66. let row = sqlx::query(
  67. r#"
  68. SELECT
  69. "p"."status"
  70. FROM
  71. "payments" "p"
  72. WHERE
  73. "p"."transaction_id" = ?
  74. LIMIT 1
  75. "#,
  76. )
  77. .bind(transaction_id.to_string())
  78. .fetch_optional(&mut *self.inner)
  79. .await
  80. .map_err(|e| Error::Storage(e.to_string()))?;
  81. if let Some(row) = row {
  82. let status = row
  83. .try_get::<u32, usize>(0)
  84. .map_err(|_| Error::Storage("failed to parse status".to_owned()))?;
  85. status
  86. .try_into()
  87. .map(|x| Some(x))
  88. .map_err(|_| Error::Storage("failed to parse status".to_owned()))
  89. } else {
  90. return Ok(None);
  91. }
  92. }
  93. async fn store_new_payment(&mut self, payment: &Payment) -> Result<(), Error> {
  94. sqlx::query(
  95. r#"
  96. INSERT INTO payments("transaction_id", "position_id", "to", "cents", "asset_id", "status")
  97. VALUES (?, ?, ?, ?, ?, ?)
  98. ON CONFLICT("transaction_id", "position_id")
  99. DO UPDATE SET "status" = excluded."status"
  100. "#,
  101. )
  102. .bind(payment.id.transaction.to_string())
  103. .bind(payment.id.position.to_string())
  104. .bind(payment.to.to_string())
  105. .bind(payment.amount.cents().to_string())
  106. .bind(payment.amount.asset().id)
  107. .bind::<u32>((&payment.status).into())
  108. .execute(&mut *self.inner)
  109. .await
  110. .map_err(|e| Error::Storage(e.to_string()))?;
  111. Ok(())
  112. }
  113. async fn store_transaction(&mut self, transaction: &Transaction) -> Result<(), Error> {
  114. sqlx::query(
  115. r#"
  116. INSERT INTO "transactions"("transaction_id", "status", "type", "reference", "created_at", "updated_at")
  117. VALUES(?, ?, ?, ?, ?, ?)
  118. ON CONFLICT("transaction_id")
  119. DO UPDATE SET "status" = excluded."status", "updated_at" = excluded."updated_at"
  120. "#,
  121. )
  122. .bind(transaction.id().to_string())
  123. .bind::<u32>(transaction.status().into())
  124. .bind::<u32>(transaction.typ().into())
  125. .bind(transaction.reference())
  126. .bind(transaction.created_at())
  127. .bind(transaction.updated_at())
  128. .execute(&mut *self.inner)
  129. .await
  130. .map_err(|e| Error::Storage(e.to_string()))?;
  131. for payment in transaction.spends().iter() {
  132. sqlx::query(
  133. r#"
  134. INSERT INTO "transaction_input_payments"("transaction_id", "payment_transaction_id", "payment_position_id")
  135. VALUES(?, ?, ?)
  136. ON CONFLICT("transaction_id", "payment_transaction_id", "payment_position_id")
  137. DO NOTHING
  138. "#,
  139. )
  140. .bind(transaction.id().to_string())
  141. .bind(payment.id.transaction.to_string())
  142. .bind(payment.id.position.to_string())
  143. .execute(&mut *self.inner)
  144. .await
  145. .map_err(|e| Error::Storage(e.to_string()))?;
  146. }
  147. Ok(())
  148. }
  149. async fn relate_account_to_transaction(
  150. &mut self,
  151. transaction: &Transaction,
  152. account: &AccountId,
  153. ) -> Result<(), Error> {
  154. sqlx::query(
  155. r#"
  156. INSERT INTO "transaction_accounts"("transaction_id", "account_id", "type", "created_at", "updated_at")
  157. VALUES(?, ?, ?, ?, ?)
  158. ON CONFLICT("transaction_id", "account_id")
  159. DO NOTHING
  160. "#,
  161. )
  162. .bind(transaction.id().to_string())
  163. .bind(account.to_string())
  164. .bind::<u32>(transaction.typ().into())
  165. .bind(transaction.created_at())
  166. .bind(transaction.updated_at())
  167. .execute(&mut *self.inner)
  168. .await
  169. .map_err(|e| Error::Storage(e.to_string()))?;
  170. Ok(())
  171. }
  172. }