123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475 |
- use crate::{
- amount::AmountCents,
- asset::AssetId,
- storage::{Error, Storage},
- transaction::{from_db, Type},
- AccountId, Amount, Asset, AssetManager, Payment, PaymentId, Status, TransactionId,
- };
- use chrono::NaiveDateTime;
- use futures::TryStreamExt;
- use sqlx::{sqlite::SqliteRow, Executor, Row};
- use std::{collections::HashMap, marker::PhantomData};
- mod batch;
- pub use batch::Batch;
- pub use sqlx::sqlite::SqlitePoolOptions;
- pub struct Sqlite<'a> {
- db: sqlx::SqlitePool,
- asset_manager: AssetManager,
- _phantom: PhantomData<&'a ()>,
- }
- impl<'a> Sqlite<'a> {
- pub fn new(db: sqlx::SqlitePool, asset_manager: AssetManager) -> Self {
- Self {
- db,
- asset_manager,
- _phantom: PhantomData,
- }
- }
- pub async fn setup(&self) -> Result<(), sqlx::Error> {
- let mut x = self.db.begin().await?;
- x.execute(
- r#"CREATE TABLE IF NOT EXISTS "payments" (
- "transaction_id" VARCHAR(66) NOT NULL,
- "position_id" INTEGER NOT NULL,
- "asset_id" TEXT NOT NULL,
- "cents" TEXT NOT NULL,
- "status" INTEGER NOT NULL,
- "to" VARCHAR(71) NOT NULL,
- "spent_by" TEXT,
- "created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
- "updated_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
- PRIMARY KEY ("transaction_id", "position_id")
- );
- CREATE INDEX IF NOT EXISTS payments_to ON payments ("to", "asset_id", "spent_by", "status");
- CREATE TABLE IF NOT EXISTS "transactions" (
- "transaction_id" VARCHAR(66) NOT NULL,
- "status" INTEGER NOT NULL,
- "type" INTEGER NOT NULL,
- "reference" TEXT NOT NULL,
- "created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
- "updated_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
- PRIMARY KEY ("transaction_id")
- );
- CREATE TABLE IF NOT EXISTS "transaction_accounts" (
- "transaction_id" VARCHAR(66) NOT NULL,
- "account_id" VARCHAR(71) NOT NULL,
- "type" INTEGER NOT NULL,
- "created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
- "updated_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
- PRIMARY KEY("account_id", "transaction_id")
- );
- CREATE INDEX IF NOT EXISTS "type" ON "transaction_accounts" ("account_id", "type", "created_at");
- CREATE TABLE IF NOT EXISTS "transaction_input_payments" (
- "transaction_id" VARCHAR(66) NOT NULL,
- "payment_transaction_id" VARCHAR(66) NOT NULL,
- "payment_position_id" INTEGER NOT NULL,
- "created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
- "updated_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
- PRIMARY KEY ("transaction_id", "payment_transaction_id", "payment_position_id")
- );
- "#,
- )
- .await
- .expect("valid");
- x.commit().await?;
- Ok(())
- }
- #[inline]
- fn sql_row_to_payment(&self, row: SqliteRow) -> Result<Payment, Error> {
- let id = PaymentId {
- transaction: row
- .try_get::<String, usize>(0)
- .map_err(|_| Error::Storage("Invalid payment_id".to_string()))?
- .as_str()
- .try_into()
- .map_err(|_| Error::Storage("Invalid transaction_id length".to_string()))?,
- position: row
- .try_get::<i64, usize>(1)
- .map_err(|_| Error::Storage("Invalid payment_id".to_string()))?
- .try_into()
- .map_err(|_| Error::Storage("Invalid payment_id".to_string()))?,
- };
- let cents = row
- .try_get::<String, usize>(3)
- .map_err(|_| Error::Storage("Invalid cents".to_string()))?
- .parse::<i128>()
- .map_err(|_| Error::Storage("Invalid cents".to_string()))?;
- Ok(Payment {
- id,
- amount: self
- .asset_manager
- .asset(
- row.try_get::<String, usize>(2)
- .map_err(|_| Error::Storage("Invalid asset_id".to_string()))?
- .parse()
- .map_err(|_| Error::Storage("Invalid asset_id".to_string()))?,
- )
- .map_err(|e| Error::Storage(e.to_string()))?
- .new_amount(cents),
- to: row
- .try_get::<String, usize>(4)
- .map_err(|_| Error::Storage("Invalid `to`".to_string()))?
- .as_str()
- .try_into()
- .map_err(|_| Error::Storage("Invalid `to`".to_string()))?,
- status: row
- .try_get::<u32, usize>(5)
- .map_err(|_| Error::Storage("Invalid `status`".to_string()))?
- .try_into()
- .map_err(|_| Error::Storage("Invalid status".to_string()))?,
- spent_by: row
- .try_get::<Option<String>, usize>(6)
- .map_err(|_| Error::Storage("Invalid spent_by".to_string()))?
- .map(|s| s.as_str().try_into())
- .transpose()
- .map_err(|_| Error::Storage("Invalid spent_by".to_string()))?,
- })
- }
- }
- #[async_trait::async_trait]
- impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
- async fn begin(&'a self) -> Result<Batch<'a>, Error> {
- self.db
- .begin()
- .await
- .map(|x| Batch::new(x))
- .map_err(|x| Error::Storage(x.to_string()))
- }
- async fn get_payment(&self, id: PaymentId) -> Result<Payment, Error> {
- let mut conn = self
- .db
- .acquire()
- .await
- .map_err(|e| Error::Storage(e.to_string()))?;
- let row = sqlx::query(
- r#"
- SELECT
- "p"."transaction_id",
- "p"."position_id",
- "p"."asset_id",
- "p"."cents",
- "p"."to",
- "p"."status",
- "p"."spent_by"
- FROM
- "payments" "p"
- WHERE
- "p"."transaction_id" = ?
- AND "p"."position_id" = ?
- LIMIT 1
- "#,
- )
- .bind(id.transaction.to_string())
- .bind(id.position.to_string())
- .fetch_optional(&mut *conn)
- .await
- .map_err(|e| Error::Storage(e.to_string()))?
- .ok_or(Error::NotFound)?;
- self.sql_row_to_payment(row)
- }
- async fn get_balance(&self, account: &AccountId) -> Result<Vec<Amount>, Error> {
- let mut conn = self
- .db
- .acquire()
- .await
- .map_err(|e| Error::Storage(e.to_string()))?;
- let mut result = sqlx::query(
- r#"
- SELECT
- "asset_id",
- "cents"
- FROM
- "payments"
- WHERE
- "to" = ? AND "spent_by" IS NULL AND status = ?
- "#,
- )
- .bind(account.to_string())
- .bind::<u32>(Status::Settled.into())
- .fetch(&mut *conn);
- let mut balances = HashMap::<Asset, Amount>::new();
- while let Some(row) = result
- .try_next()
- .await
- .map_err(|e| Error::Storage(e.to_string()))?
- {
- let asset = self
- .asset_manager
- .asset(
- row.try_get::<String, usize>(0)
- .map_err(|_| Error::Storage("Invalid asset_id".to_string()))?
- .parse()
- .map_err(|_| Error::Storage("Invalid asset_id".to_string()))?,
- )
- .map_err(|e| Error::Storage(e.to_string()))?;
- let cents = row
- .try_get::<String, usize>(1)
- .map_err(|_| Error::Storage("Invalid cents".to_string()))?
- .parse::<i128>()
- .map_err(|_| Error::Storage("Invalid cents".to_string()))?;
- let new_amount = asset.new_amount(cents);
- if let Some(amount) = balances.get_mut(&asset) {
- *amount = amount
- .checked_add(&new_amount)
- .ok_or(Error::Storage("amount overflow".to_owned()))?;
- } else {
- balances.insert(asset, new_amount);
- }
- }
- Ok(balances.into_iter().map(|(_, v)| v).collect())
- }
- async fn get_unspent_payments(
- &self,
- account: &AccountId,
- asset: AssetId,
- mut target_amount: AmountCents,
- ) -> Result<Vec<Payment>, Error> {
- let mut conn = self
- .db
- .acquire()
- .await
- .map_err(|e| Error::Storage(e.to_string()))?;
- let mut result = sqlx::query(
- r#"
- SELECT
- "p"."transaction_id",
- "p"."position_id",
- "p"."asset_id",
- "p"."cents",
- "p"."to",
- "p"."status",
- "p"."spent_by"
- FROM
- "payments" as "p"
- WHERE
- "p"."to" = ? AND "p"."asset_id" = ? AND status = ? AND "p"."spent_by" IS NULL
- ORDER BY cents ASC
- "#,
- )
- .bind(account.to_string())
- .bind(asset.to_string())
- .bind::<u32>(Status::Settled.into())
- .fetch(&mut *conn);
- let mut to_return = vec![];
- while let Some(row) = result
- .try_next()
- .await
- .map_err(|e| Error::Storage(e.to_string()))?
- {
- let row = self.sql_row_to_payment(row)?;
- target_amount -= row.amount.cents();
- to_return.push(row);
- if target_amount <= 0 {
- break;
- }
- }
- if target_amount <= 0 {
- Ok(to_return)
- } else {
- Err(Error::NotEnoughUnspentPayments(target_amount))
- }
- }
- async fn get_transaction(
- &self,
- transaction_id: &TransactionId,
- ) -> Result<from_db::Transaction, Error> {
- let mut conn = self
- .db
- .acquire()
- .await
- .map_err(|e| Error::Storage(e.to_string()))?;
- let transaction_row = sqlx::query(
- r#"
- SELECT
- "t"."status",
- "t"."type",
- "t"."reference",
- "t"."created_at",
- "t"."updated_at"
- FROM
- "transactions" "t"
- WHERE
- "t"."transaction_id" = ?
- "#,
- )
- .bind(transaction_id.to_string())
- .fetch_optional(&mut *conn)
- .await
- .map_err(|e| Error::Storage(e.to_string()))?
- .ok_or(Error::NotFound)?;
- let mut spend_result = sqlx::query(
- r#"
- SELECT
- "p"."transaction_id",
- "p"."position_id",
- "p"."asset_id",
- "p"."cents",
- "p"."to",
- "p"."status",
- "p"."spent_by"
- FROM
- "transaction_input_payments" "tp"
- INNER JOIN
- "payments" "p"
- ON (
- "tp"."payment_transaction_id" = "p"."transaction_id"
- AND "tp"."payment_position_id" = "p"."position_id"
- )
- WHERE
- "tp"."transaction_id" = ?
- "#,
- )
- .bind(transaction_id.to_string())
- .fetch(&mut *conn);
- let mut spend = vec![];
- while let Some(row) = spend_result
- .try_next()
- .await
- .map_err(|e| Error::Storage(e.to_string()))?
- {
- spend.push(self.sql_row_to_payment(row)?);
- }
- drop(spend_result);
- let mut create_result = sqlx::query(
- r#"
- SELECT
- "p"."transaction_id",
- "p"."position_id",
- "p"."asset_id",
- "p"."cents",
- "p"."to",
- "p"."status",
- "p"."spent_by"
- FROM
- "payments" "p"
- WHERE
- "p"."transaction_id" = ?
- "#,
- )
- .bind(transaction_id.to_string())
- .fetch(&mut *conn);
- let mut create = vec![];
- while let Some(row) = create_result
- .try_next()
- .await
- .map_err(|e| Error::Storage(e.to_string()))?
- {
- create.push(self.sql_row_to_payment(row)?);
- }
- let status = transaction_row
- .try_get::<u32, usize>(0)
- .map_err(|_| Error::Storage("Invalid status".to_string()))?
- .try_into()
- .map_err(|_| Error::Storage("Invalid status".to_string()))?;
- let typ = transaction_row
- .try_get::<u32, usize>(1)
- .map_err(|_| Error::Storage("Invalid type".to_string()))?
- .try_into()
- .map_err(|_| Error::Storage("Invalid type".to_string()))?;
- let reference = transaction_row
- .try_get::<String, usize>(2)
- .map_err(|_| Error::Storage("Invalid reference".to_string()))?;
- let created_at = transaction_row
- .try_get::<NaiveDateTime, usize>(3)
- .map_err(|e| Error::InvalidDate(e.to_string()))?
- .and_utc();
- let updated_at = transaction_row
- .try_get::<NaiveDateTime, usize>(4)
- .map_err(|e| Error::InvalidDate(e.to_string()))?
- .and_utc();
- Ok(from_db::Transaction {
- id: transaction_id.clone(),
- spend,
- create,
- typ,
- status,
- reference,
- created_at,
- updated_at,
- })
- }
- async fn get_transactions(
- &self,
- account: &AccountId,
- typ: Option<Type>,
- ) -> Result<Vec<from_db::Transaction>, Error> {
- let mut conn = self
- .db
- .acquire()
- .await
- .map_err(|e| Error::Storage(e.to_string()))?;
- let sql = sqlx::query(if typ.is_some() {
- r#"SELECT "transaction_id" FROM "transaction_accounts" WHERE "account_id" = ? AND "type" = ?" ORDER BY "created_at" DESC"#
- } else {
- r#"SELECT "transaction_id" FROM "transaction_accounts" WHERE "account_id" = ? ORDER BY "created_at" DESC"#
- }).bind(account.to_string());
- let ids = if let Some(typ) = typ {
- sql.bind::<u32>(typ.into())
- } else {
- sql
- }
- .fetch_all(&mut *conn)
- .await
- .map_err(|e| Error::Storage(e.to_string()))?
- .iter()
- .map(|row| {
- let id: Result<TransactionId, _> = row
- .try_get::<String, usize>(0)
- .map_err(|_| Error::Storage("Invalid transaction_id".to_string()))?
- .as_str()
- .try_into();
- id.map_err(|_| Error::Storage("Invalid transaction_id length".to_string()))
- })
- .collect::<Result<Vec<_>, Error>>()?;
- drop(conn);
- let mut transactions = vec![];
- for id in ids.into_iter() {
- transactions.push(self.get_transaction(&id).await?);
- }
- Ok(transactions)
- }
- }
|