use crate::{ storage::{self, Error}, AccountId, Payment, PaymentId, Status, Transaction, TransactionId, }; use sqlx::{Row, Sqlite, Transaction as SqlxTransaction}; use std::marker::PhantomData; pub struct Batch<'a> { inner: SqlxTransaction<'a, Sqlite>, x: PhantomData<&'a ()>, } impl<'a> Batch<'a> { pub fn new(inner: SqlxTransaction<'a, Sqlite>) -> Batch<'a> { Self { inner, x: PhantomData, } } } #[async_trait::async_trait] impl<'a> storage::Batch<'a> for Batch<'a> { async fn rollback(self) -> Result<(), Error> { self.inner .rollback() .await .map_err(|e| Error::Storage(e.to_string())) } async fn commit(self) -> Result<(), Error> { self.inner .commit() .await .map_err(|e| Error::Storage(e.to_string())) } async fn spend_payment( &mut self, payment_id: &PaymentId, status: Status, transaction_id: &TransactionId, ) -> Result<(), Error> { let result = sqlx::query( r#" UPDATE payments SET "spent_by" = ? WHERE "transaction_id" = ? AND "position_id" = ? AND ("spent_by" IS NULL OR "spent_by" = ?) "#, ) .bind(if status.is_rollback() { None } else { Some(transaction_id.to_string()) }) .bind(payment_id.transaction.to_string()) .bind(payment_id.position.to_string()) .bind(transaction_id.to_string()) .execute(&mut *self.inner) .await .map_err(|e| Error::SpendPayment(e.to_string()))?; if result.rows_affected() == 1 { Ok(()) } else { Err(Error::NoUpdate) } } async fn get_payment_status( &mut self, transaction_id: &TransactionId, ) -> Result, Error> { let row = sqlx::query( r#" SELECT "p"."status" FROM "payments" "p" WHERE "p"."transaction_id" = ? LIMIT 1 "#, ) .bind(transaction_id.to_string()) .fetch_optional(&mut *self.inner) .await .map_err(|e| Error::Storage(e.to_string()))?; if let Some(row) = row { let status = row .try_get::(0) .map_err(|_| Error::Storage("failed to parse status".to_owned()))?; status .try_into() .map(|x| Some(x)) .map_err(|_| Error::Storage("failed to parse status".to_owned())) } else { return Ok(None); } } async fn store_new_payment(&mut self, payment: &Payment) -> Result<(), Error> { sqlx::query( r#" INSERT INTO payments("transaction_id", "position_id", "to", "cents", "asset_id", "status") VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT("transaction_id", "position_id") DO UPDATE SET "status" = excluded."status" "#, ) .bind(payment.id.transaction.to_string()) .bind(payment.id.position.to_string()) .bind(payment.to.to_string()) .bind(payment.amount.cents().to_string()) .bind(payment.amount.asset().id) .bind::((&payment.status).into()) .execute(&mut *self.inner) .await .map_err(|e| Error::Storage(e.to_string()))?; Ok(()) } async fn store_transaction(&mut self, transaction: &Transaction) -> Result<(), Error> { sqlx::query( r#" INSERT INTO "transactions"("transaction_id", "status", "type", "reference", "created_at", "updated_at") VALUES(?, ?, ?, ?, ?, ?) ON CONFLICT("transaction_id") DO UPDATE SET "status" = excluded."status", "updated_at" = excluded."updated_at" "#, ) .bind(transaction.id().to_string()) .bind::(transaction.status().into()) .bind::(transaction.typ().into()) .bind(transaction.reference()) .bind(transaction.created_at()) .bind(transaction.updated_at()) .execute(&mut *self.inner) .await .map_err(|e| Error::Storage(e.to_string()))?; for payment in transaction.spends().iter() { sqlx::query( r#" INSERT INTO "transaction_input_payments"("transaction_id", "payment_transaction_id", "payment_position_id") VALUES(?, ?, ?) ON CONFLICT("transaction_id", "payment_transaction_id", "payment_position_id") DO NOTHING "#, ) .bind(transaction.id().to_string()) .bind(payment.id.transaction.to_string()) .bind(payment.id.position.to_string()) .execute(&mut *self.inner) .await .map_err(|e| Error::Storage(e.to_string()))?; } Ok(()) } async fn relate_account_to_transaction( &mut self, transaction: &Transaction, account: &AccountId, ) -> Result<(), Error> { sqlx::query( r#" INSERT INTO "transaction_accounts"("transaction_id", "account_id", "type", "created_at", "updated_at") VALUES(?, ?, ?, ?, ?) ON CONFLICT("transaction_id", "account_id") DO NOTHING "#, ) .bind(transaction.id().to_string()) .bind(account.to_string()) .bind::(transaction.typ().into()) .bind(transaction.created_at()) .bind(transaction.updated_at()) .execute(&mut *self.inner) .await .map_err(|e| Error::Storage(e.to_string()))?; Ok(()) } }