123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- use crate::{
- storage::{self, Error},
- AccountId, Payment, PaymentId, Status, Transaction, TransactionId,
- };
- use sqlx::{Row, Sqlite, Transaction as SqlxTransaction};
- use std::marker::PhantomData;
- pub struct Batch<'a> {
- inner: SqlxTransaction<'a, Sqlite>,
- x: PhantomData<&'a ()>,
- }
- impl<'a> Batch<'a> {
- pub fn new(inner: SqlxTransaction<'a, Sqlite>) -> Batch<'a> {
- Self {
- inner,
- x: PhantomData,
- }
- }
- }
- #[async_trait::async_trait]
- impl<'a> storage::Batch<'a> for Batch<'a> {
- async fn rollback(self) -> Result<(), Error> {
- self.inner
- .rollback()
- .await
- .map_err(|e| Error::Storage(e.to_string()))
- }
- async fn commit(self) -> Result<(), Error> {
- self.inner
- .commit()
- .await
- .map_err(|e| Error::Storage(e.to_string()))
- }
- async fn spend_payment(
- &mut self,
- payment_id: &PaymentId,
- status: Status,
- transaction_id: &TransactionId,
- ) -> Result<(), Error> {
- let result = sqlx::query(
- r#"
- UPDATE payments SET "spent_by" = ?
- WHERE "transaction_id" = ? AND "position_id" = ? AND ("spent_by" IS NULL OR "spent_by" = ?)
- "#,
- )
- .bind(if status.is_rollback() {
- None
- } else {
- Some(transaction_id.to_string())
- })
- .bind(payment_id.transaction.to_string())
- .bind(payment_id.position.to_string())
- .bind(transaction_id.to_string())
- .execute(&mut *self.inner)
- .await
- .map_err(|e| Error::SpendPayment(e.to_string()))?;
- if result.rows_affected() == 1 {
- Ok(())
- } else {
- Err(Error::NoUpdate)
- }
- }
- async fn get_payment_status(
- &mut self,
- transaction_id: &TransactionId,
- ) -> Result<Option<Status>, Error> {
- let row = sqlx::query(
- r#"
- SELECT
- "p"."status"
- FROM
- "payments" "p"
- WHERE
- "p"."transaction_id" = ?
- LIMIT 1
- "#,
- )
- .bind(transaction_id.to_string())
- .fetch_optional(&mut *self.inner)
- .await
- .map_err(|e| Error::Storage(e.to_string()))?;
- if let Some(row) = row {
- let status = row
- .try_get::<u32, usize>(0)
- .map_err(|_| Error::Storage("failed to parse status".to_owned()))?;
- status
- .try_into()
- .map(|x| Some(x))
- .map_err(|_| Error::Storage("failed to parse status".to_owned()))
- } else {
- return Ok(None);
- }
- }
- async fn store_new_payment(&mut self, payment: &Payment) -> Result<(), Error> {
- sqlx::query(
- r#"
- INSERT INTO payments("transaction_id", "position_id", "to", "cents", "asset_id", "status")
- VALUES (?, ?, ?, ?, ?, ?)
- ON CONFLICT("transaction_id", "position_id")
- DO UPDATE SET "status" = excluded."status"
- "#,
- )
- .bind(payment.id.transaction.to_string())
- .bind(payment.id.position.to_string())
- .bind(payment.to.to_string())
- .bind(payment.amount.cents().to_string())
- .bind(payment.amount.asset().id)
- .bind::<u32>((&payment.status).into())
- .execute(&mut *self.inner)
- .await
- .map_err(|e| Error::Storage(e.to_string()))?;
- Ok(())
- }
- async fn store_transaction(&mut self, transaction: &Transaction) -> Result<(), Error> {
- sqlx::query(
- r#"
- INSERT INTO "transactions"("transaction_id", "status", "type", "reference", "created_at", "updated_at")
- VALUES(?, ?, ?, ?, ?, ?)
- ON CONFLICT("transaction_id")
- DO UPDATE SET "status" = excluded."status", "updated_at" = excluded."updated_at"
- "#,
- )
- .bind(transaction.id().to_string())
- .bind::<u32>(transaction.status().into())
- .bind::<u32>(transaction.typ().into())
- .bind(transaction.reference())
- .bind(transaction.created_at())
- .bind(transaction.updated_at())
- .execute(&mut *self.inner)
- .await
- .map_err(|e| Error::Storage(e.to_string()))?;
- for payment in transaction.spends().iter() {
- sqlx::query(
- r#"
- INSERT INTO "transaction_input_payments"("transaction_id", "payment_transaction_id", "payment_position_id")
- VALUES(?, ?, ?)
- ON CONFLICT("transaction_id", "payment_transaction_id", "payment_position_id")
- DO NOTHING
- "#,
- )
- .bind(transaction.id().to_string())
- .bind(payment.id.transaction.to_string())
- .bind(payment.id.position.to_string())
- .execute(&mut *self.inner)
- .await
- .map_err(|e| Error::Storage(e.to_string()))?;
- }
- Ok(())
- }
- async fn relate_account_to_transaction(
- &mut self,
- transaction: &Transaction,
- account: &AccountId,
- ) -> Result<(), Error> {
- sqlx::query(
- r#"
- INSERT INTO "transaction_accounts"("transaction_id", "account_id", "type", "created_at", "updated_at")
- VALUES(?, ?, ?, ?, ?)
- ON CONFLICT("transaction_id", "account_id")
- DO NOTHING
- "#,
- )
- .bind(transaction.id().to_string())
- .bind(account.to_string())
- .bind::<u32>(transaction.typ().into())
- .bind(transaction.created_at())
- .bind(transaction.updated_at())
- .execute(&mut *self.inner)
- .await
- .map_err(|e| Error::Storage(e.to_string()))?;
- Ok(())
- }
- }
|