use crate::{ changelog::Changelog, storage::{self, Error}, transaction, AccountId, Payment, PaymentId, Status, Transaction, TransactionId, }; use serde::{de::DeserializeOwned, Serialize}; use sqlx::{Row, Sqlite, Transaction as SqlxTransaction}; use std::marker::PhantomData; /// Creates a new Batch for SQLite /// /// Batches are a group of updates to the databases, in which all of the are /// executed or none. Same concept as a rdbms transaction. pub struct Batch<'a> { inner: SqlxTransaction<'a, Sqlite>, x: PhantomData<&'a ()>, } impl<'a> Batch<'a> { /// Creates a new instance pub fn new(inner: SqlxTransaction<'a, Sqlite>) -> Batch<'a> { Self { inner, x: PhantomData, } } } #[async_trait::async_trait] impl<'a> storage::Batch<'a> for Batch<'a> { async fn rollback(self) -> Result<(), Error> { self.inner .rollback() .await .map_err(|e| Error::Storage(e.to_string())) } async fn commit(self) -> Result<(), Error> { self.inner .commit() .await .map_err(|e| Error::Storage(e.to_string())) } async fn store_changelogs( &mut self, changelog: &[Changelog], ) -> Result<(), Error> { for change in changelog.iter() { let change_bytes = bincode::serialize(&change.change).map_err(|e| Error::Storage(e.to_string()))?; sqlx::query( r#" INSERT INTO "changelog"("id", "previous", "object_id", "change", "created_at") VALUES(?, ?, ?, ?, ?) ON CONFLICT("id") DO NOTHING "#, ) .bind(change.id().map_err(|e| Error::Storage(e.to_string()))?) .bind(&change.previous) .bind(&change.object_id) .bind(change_bytes) .bind(change.updated_at) .execute(&mut *self.inner) .await .map_err(|e| Error::Storage(e.to_string()))?; } Ok(()) } async fn update_payment( &mut self, payment_id: &PaymentId, spent_by: &TransactionId, spent_status: Status, ) -> Result<(), Error> { let settled: u32 = Status::Settled.into(); let spent_by_val = if spent_status.is_rollback() { None } else { Some(spent_by.to_string()) }; let spent_by_status_val: Option = if spent_status.is_rollback() { None } else { Some(spent_status.into()) }; let result = sqlx::query( r#" UPDATE "payments" SET "spent_by" = ?, "spent_by_status" = ? WHERE "transaction_id" = ? AND "position_id" = ? AND "status" = ? AND ("spent_by_status" IS NULL OR "spent_by_status" != ?) AND ("spent_by" = ? OR "spent_by" IS NULL) "#, ) .bind(spent_by_val) .bind(spent_by_status_val) .bind(payment_id.transaction.to_string()) .bind(payment_id.position.to_string()) .bind(settled) .bind(settled) .bind(spent_by.to_string()) .execute(&mut *self.inner) .await .map_err(|e| Error::SpendPayment(e.to_string()))?; if result.rows_affected() == 1 { Ok(()) } else { Err(Error::NoUpdate) } } async fn get_payment_status( &mut self, transaction_id: &TransactionId, ) -> Result, Error> { let row = sqlx::query( r#" SELECT "p"."status" FROM "payments" "p" WHERE "p"."transaction_id" = ? LIMIT 1 "#, ) .bind(transaction_id.to_string()) .fetch_optional(&mut *self.inner) .await .map_err(|e| Error::Storage(e.to_string()))?; if let Some(row) = row { let status = row .try_get::(0) .map_err(|_| Error::Storage("failed to parse status".to_owned()))?; status .try_into() .map(|x| Some(x)) .map_err(|_| Error::Storage("failed to parse status".to_owned())) } else { return Ok(None); } } async fn store_new_payment(&mut self, payment: &Payment) -> Result<(), Error> { sqlx::query( r#" INSERT INTO payments("transaction_id", "position_id", "to", "cents", "asset_id", "status", "spent_by_status") VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT("transaction_id", "position_id") DO UPDATE SET "status" = excluded."status" "#, ) .bind(payment.id.transaction.to_string()) .bind(payment.id.position.to_string()) .bind(payment.to.to_string()) .bind(payment.amount.cents().to_string()) .bind(payment.amount.asset().id) .bind::((&payment.status).into()) .bind::>(None) .execute(&mut *self.inner) .await .map_err(|e| Error::Storage(e.to_string()))?; Ok(()) } async fn store_transaction(&mut self, transaction: &Transaction) -> Result<(), Error> { sqlx::query( r#" INSERT INTO "transactions"("transaction_id", "status", "type", "reference", "last_version", "created_at", "updated_at") VALUES(?, ?, ?, ?, ?, ?, ?) ON CONFLICT("transaction_id") DO UPDATE SET "status" = excluded."status", "updated_at" = excluded."updated_at", "last_version" = excluded."last_version" "#, ) .bind(transaction.id().to_string()) .bind::(transaction.status().into()) .bind::(transaction.typ().into()) .bind(transaction.reference()) .bind(transaction.last_version()) .bind(transaction.created_at()) .bind(transaction.updated_at()) .execute(&mut *self.inner) .await .map_err(|e| Error::Storage(e.to_string()))?; for payment in transaction.spends().iter() { sqlx::query( r#" INSERT INTO "transaction_input_payments"("transaction_id", "payment_transaction_id", "payment_position_id") VALUES(?, ?, ?) ON CONFLICT("transaction_id", "payment_transaction_id", "payment_position_id") DO NOTHING "#, ) .bind(transaction.id().to_string()) .bind(payment.id.transaction.to_string()) .bind(payment.id.position.to_string()) .execute(&mut *self.inner) .await .map_err(|e| Error::Storage(e.to_string()))?; } Ok(()) } async fn relate_account_to_transaction( &mut self, transaction: &Transaction, account: &AccountId, ) -> Result<(), Error> { sqlx::query( r#" INSERT INTO "transaction_accounts"("transaction_id", "account_id", "type", "created_at", "updated_at") VALUES(?, ?, ?, ?, ?) ON CONFLICT("transaction_id", "account_id") DO NOTHING "#, ) .bind(transaction.id().to_string()) .bind(account.to_string()) .bind::(transaction.typ().into()) .bind(transaction.created_at()) .bind(transaction.updated_at()) .execute(&mut *self.inner) .await .map_err(|e| Error::Storage(e.to_string()))?; Ok(()) } }