| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806 |
- //! SQL-backed Store implementation for SQLite and PostgreSQL.
- //!
- //! Uses `sqlx::Any` for database-agnostic queries. Enable features
- //! `sqlite` or `postgres` to select the backend.
- //!
- //! ```text
- //! let pool = sqlx::any::AnyPoolOptions::new()
- //! .connect("sqlite::memory:").await?;
- //! let store = SqlStore::new(pool);
- //! store.migrate().await?;
- //! ```
- use std::collections::HashSet;
- use async_trait::async_trait;
- use sqlx::{Any, Pool, Row};
- use kuatia_storage::error::StoreError;
- use kuatia_storage::events::{EventStore, LedgerEvent};
- use kuatia_storage::store::*;
- use kuatia_types::*;
- /// SQL-backed [`Store`] implementation.
- pub struct SqlStore {
- pool: Pool<Any>,
- autoid: kuatia_types::autoid::AutoId,
- }
- impl SqlStore {
- /// Create a new SQL store wrapping an existing connection pool.
- pub fn new(pool: Pool<Any>) -> Self {
- Self {
- pool,
- autoid: kuatia_types::autoid::AutoId::new(),
- }
- }
- /// Run database migrations.
- pub async fn migrate(&self) -> Result<(), StoreError> {
- for sql in [
- include_str!("migrations/001_init.sql"),
- include_str!("migrations/002_timestamps_and_columns.sql"),
- include_str!("migrations/003_events.sql"),
- ] {
- for statement in sql.split(';') {
- let trimmed = statement.trim();
- if !trimmed.is_empty() {
- sqlx::query(trimmed)
- .execute(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- }
- }
- }
- Ok(())
- }
- }
- // ---------------------------------------------------------------------------
- // Serialization helpers
- // ---------------------------------------------------------------------------
- fn serialize_policy(policy: &AccountPolicy) -> Result<String, StoreError> {
- serde_json::to_string(policy)
- .map_err(|e| StoreError::Internal(format!("policy serialization: {e}")))
- }
- fn deserialize_policy(s: &str) -> Result<AccountPolicy, StoreError> {
- serde_json::from_str(s).map_err(|e| StoreError::Internal(format!("bad policy: {e}")))
- }
- fn serialize_blob<T: serde::Serialize>(val: &T) -> Result<Vec<u8>, StoreError> {
- serde_json::to_vec(val).map_err(|e| StoreError::Internal(format!("blob serialization: {e}")))
- }
- fn deserialize_blob<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Result<T, StoreError> {
- serde_json::from_slice(bytes).map_err(|e| StoreError::Internal(format!("bad blob: {e}")))
- }
- fn status_to_i16(s: PostingStatus) -> i16 {
- match s {
- PostingStatus::Active => 0,
- PostingStatus::PendingInactive => 1,
- PostingStatus::Inactive => 2,
- }
- }
- fn status_from_i16(v: i16) -> Result<PostingStatus, StoreError> {
- match v {
- 0 => Ok(PostingStatus::Active),
- 1 => Ok(PostingStatus::PendingInactive),
- 2 => Ok(PostingStatus::Inactive),
- _ => Err(StoreError::Internal(format!("bad posting status: {v}"))),
- }
- }
- fn row_to_account(row: &sqlx::any::AnyRow) -> Result<Account, StoreError> {
- let id: i64 = row
- .try_get("id")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let version: i64 = row
- .try_get("version")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let policy_str: String = row
- .try_get("policy")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let flags_bits: i32 = row
- .try_get("flags")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let book: i32 = row
- .try_get("book")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let code: i32 = row
- .try_get("code")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let user_data_bytes: Vec<u8> = row
- .try_get("user_data")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let metadata_bytes: Vec<u8> = row
- .try_get("metadata")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- Ok(Account {
- id: AccountId::new(id),
- version: version as u64,
- policy: deserialize_policy(&policy_str)?,
- flags: AccountFlags::from_bits_truncate(flags_bits as u32),
- book: book as u32,
- code: code as u32,
- user_data: deserialize_blob(&user_data_bytes)?,
- metadata: deserialize_blob(&metadata_bytes)?,
- })
- }
- fn row_to_posting(row: &sqlx::any::AnyRow) -> Result<Posting, StoreError> {
- let transfer_id: Vec<u8> = row
- .try_get("transfer_id")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let idx: i16 = row
- .try_get("idx")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let owner: i64 = row
- .try_get("owner")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let asset: i32 = row
- .try_get("asset")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let value: i64 = row
- .try_get("value")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let status: i16 = row
- .try_get("status")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let mut tid = [0u8; 32];
- tid.copy_from_slice(&transfer_id);
- Ok(Posting {
- id: PostingId {
- transfer: EnvelopeId(tid),
- index: idx as u16,
- },
- owner: AccountId::new(owner),
- asset: AssetId::new(asset as u32),
- value: Cent::from(value),
- status: status_from_i16(status)?,
- })
- }
- // ---------------------------------------------------------------------------
- // AccountStore
- // ---------------------------------------------------------------------------
- #[async_trait]
- impl AccountStore for SqlStore {
- async fn get_account(&self, id: &AccountId) -> Result<Account, StoreError> {
- let row = sqlx::query("SELECT * FROM accounts WHERE id = $1 ORDER BY version DESC LIMIT 1")
- .bind(id.0)
- .fetch_optional(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?
- .ok_or_else(|| StoreError::NotFound(format!("account {id:?}")))?;
- row_to_account(&row)
- }
- async fn get_accounts(&self, ids: &[AccountId]) -> Result<Vec<Account>, StoreError> {
- let mut result = Vec::with_capacity(ids.len());
- for id in ids {
- result.push(self.get_account(id).await?);
- }
- Ok(result)
- }
- async fn create_account(&self, account: Account) -> Result<(), StoreError> {
- let exists = sqlx::query("SELECT 1 FROM accounts WHERE id = $1 LIMIT 1")
- .bind(account.id.0)
- .fetch_optional(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- if exists.is_some() {
- return Err(StoreError::AlreadyExists(format!(
- "account {:?}",
- account.id
- )));
- }
- sqlx::query(
- "INSERT INTO accounts (id, version, policy, flags, book, code, user_data, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"
- )
- .bind(account.id.0)
- .bind(account.version as i64)
- .bind(serialize_policy(&account.policy)?)
- .bind(account.flags.bits() as i32)
- .bind(account.book as i32)
- .bind(account.code as i32)
- .bind(serialize_blob(&account.user_data)?)
- .bind(serialize_blob(&account.metadata)?)
- .execute(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- Ok(())
- }
- async fn append_account_version(&self, account: Account) -> Result<(), StoreError> {
- let current =
- sqlx::query("SELECT version FROM accounts WHERE id = $1 ORDER BY version DESC LIMIT 1")
- .bind(account.id.0)
- .fetch_optional(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?
- .ok_or_else(|| StoreError::NotFound(format!("account {:?}", account.id)))?;
- let current_version: i64 = current
- .try_get("version")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let expected = current_version
- .checked_add(1)
- .ok_or_else(|| StoreError::Internal("account version overflow".to_string()))?;
- if account.version as i64 != expected {
- return Err(StoreError::VersionConflict {
- account: account.id,
- expected: expected as u64,
- actual: account.version,
- });
- }
- sqlx::query(
- "INSERT INTO accounts (id, version, policy, flags, book, code, user_data, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"
- )
- .bind(account.id.0)
- .bind(account.version as i64)
- .bind(serialize_policy(&account.policy)?)
- .bind(account.flags.bits() as i32)
- .bind(account.book as i32)
- .bind(account.code as i32)
- .bind(serialize_blob(&account.user_data)?)
- .bind(serialize_blob(&account.metadata)?)
- .execute(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- Ok(())
- }
- async fn get_account_history(&self, id: &AccountId) -> Result<Vec<Account>, StoreError> {
- let rows = sqlx::query("SELECT * FROM accounts WHERE id = $1 ORDER BY version ASC")
- .bind(id.0)
- .fetch_all(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- if rows.is_empty() {
- return Err(StoreError::NotFound(format!("account {id:?}")));
- }
- rows.iter().map(row_to_account).collect()
- }
- async fn list_accounts(&self) -> Result<Vec<Account>, StoreError> {
- let rows = sqlx::query("SELECT * FROM accounts ORDER BY id, version DESC")
- .fetch_all(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let mut accounts: Vec<Account> = rows.iter().map(row_to_account).collect::<Result<_, _>>()?;
- accounts.dedup_by_key(|a| a.id);
- Ok(accounts)
- }
- }
- // ---------------------------------------------------------------------------
- // PostingStore
- // ---------------------------------------------------------------------------
- #[async_trait]
- impl PostingStore for SqlStore {
- async fn get_postings(&self, ids: &[PostingId]) -> Result<Vec<Posting>, StoreError> {
- let mut result = Vec::with_capacity(ids.len());
- for id in ids {
- let row = sqlx::query("SELECT * FROM postings WHERE transfer_id = $1 AND idx = $2")
- .bind(id.transfer.0.as_slice())
- .bind(id.index as i16)
- .fetch_optional(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?
- .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
- result.push(row_to_posting(&row)?);
- }
- Ok(result)
- }
- async fn get_postings_by_account(
- &self,
- account: &AccountId,
- asset: Option<&AssetId>,
- status: Option<PostingStatus>,
- ) -> Result<Vec<Posting>, StoreError> {
- let rows = match (asset, status) {
- (Some(a), Some(s)) => {
- sqlx::query(
- "SELECT * FROM postings WHERE owner = $1 AND asset = $2 AND status = $3",
- )
- .bind(account.0)
- .bind(a.0 as i32)
- .bind(status_to_i16(s))
- .fetch_all(&self.pool)
- .await
- }
- (Some(a), None) => {
- sqlx::query("SELECT * FROM postings WHERE owner = $1 AND asset = $2")
- .bind(account.0)
- .bind(a.0 as i32)
- .fetch_all(&self.pool)
- .await
- }
- (None, Some(s)) => {
- sqlx::query("SELECT * FROM postings WHERE owner = $1 AND status = $2")
- .bind(account.0)
- .bind(status_to_i16(s))
- .fetch_all(&self.pool)
- .await
- }
- (None, None) => {
- sqlx::query("SELECT * FROM postings WHERE owner = $1")
- .bind(account.0)
- .fetch_all(&self.pool)
- .await
- }
- }
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- rows.iter().map(row_to_posting).collect()
- }
- async fn query_postings(&self, query: &PostingQuery) -> Result<Page<Posting>, StoreError> {
- let (where_clause, count_clause) = {
- let mut w = String::from("WHERE owner = $1");
- let mut idx = 2u32;
- if query.asset.is_some() {
- w.push_str(&format!(" AND asset = ${idx}"));
- idx += 1;
- }
- if query.status.is_some() {
- w.push_str(&format!(" AND status = ${idx}"));
- }
- let c = format!("SELECT COUNT(*) as cnt FROM postings {w}");
- let limit = query.limit.unwrap_or(u32::MAX);
- let offset = query.offset.unwrap_or(0);
- w.push_str(&format!(" LIMIT {limit} OFFSET {offset}"));
- (format!("SELECT * FROM postings {w}"), c)
- };
- // Build count query
- let mut count_q = sqlx::query(&count_clause).bind(query.account.0);
- if let Some(ref a) = query.asset {
- count_q = count_q.bind(a.0 as i32);
- }
- if let Some(s) = query.status {
- count_q = count_q.bind(status_to_i16(s));
- }
- let count_row = count_q
- .fetch_one(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let total: i64 = count_row
- .try_get("cnt")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- // Build data query
- let mut data_q = sqlx::query(&where_clause).bind(query.account.0);
- if let Some(ref a) = query.asset {
- data_q = data_q.bind(a.0 as i32);
- }
- if let Some(s) = query.status {
- data_q = data_q.bind(status_to_i16(s));
- }
- let rows = data_q
- .fetch_all(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let items: Vec<Posting> = rows.iter().map(row_to_posting).collect::<Result<_, _>>()?;
- Ok(Page {
- items,
- total: total as u64,
- })
- }
- async fn reserve_postings(&self, ids: &[PostingId]) -> Result<(), StoreError> {
- // Validate all Active first, then update in a transaction.
- let mut tx = self
- .pool
- .begin()
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- for id in ids {
- let row =
- sqlx::query("SELECT status FROM postings WHERE transfer_id = $1 AND idx = $2")
- .bind(id.transfer.0.as_slice())
- .bind(id.index as i16)
- .fetch_optional(&mut *tx)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?
- .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
- let status: i16 = row
- .try_get("status")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- if status != 0 {
- return Err(StoreError::PostingNotActive(*id));
- }
- }
- for id in ids {
- sqlx::query("UPDATE postings SET status = $1 WHERE transfer_id = $2 AND idx = $3")
- .bind(status_to_i16(PostingStatus::PendingInactive))
- .bind(id.transfer.0.as_slice())
- .bind(id.index as i16)
- .execute(&mut *tx)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- }
- tx.commit()
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- Ok(())
- }
- async fn release_postings(&self, ids: &[PostingId]) -> Result<(), StoreError> {
- let mut tx = self
- .pool
- .begin()
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- for id in ids {
- let row =
- sqlx::query("SELECT status FROM postings WHERE transfer_id = $1 AND idx = $2")
- .bind(id.transfer.0.as_slice())
- .bind(id.index as i16)
- .fetch_optional(&mut *tx)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?
- .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
- let status: i16 = row
- .try_get("status")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- if status == 2 {
- return Err(StoreError::PostingInactive(*id));
- }
- }
- for id in ids {
- sqlx::query("UPDATE postings SET status = $1 WHERE transfer_id = $2 AND idx = $3 AND status = $4")
- .bind(status_to_i16(PostingStatus::Active))
- .bind(id.transfer.0.as_slice())
- .bind(id.index as i16)
- .bind(status_to_i16(PostingStatus::PendingInactive))
- .execute(&mut *tx)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- }
- tx.commit()
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- Ok(())
- }
- async fn finalize_postings(
- &self,
- deactivate: &[PostingId],
- create: &[Posting],
- ) -> Result<(), StoreError> {
- let mut tx = self
- .pool
- .begin()
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- for id in deactivate {
- sqlx::query("UPDATE postings SET status = $1 WHERE transfer_id = $2 AND idx = $3")
- .bind(status_to_i16(PostingStatus::Inactive))
- .bind(id.transfer.0.as_slice())
- .bind(id.index as i16)
- .execute(&mut *tx)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- }
- for posting in create {
- sqlx::query(
- "INSERT INTO postings (transfer_id, idx, owner, asset, value, status) VALUES ($1, $2, $3, $4, $5, $6)"
- )
- .bind(posting.id.transfer.0.as_slice())
- .bind(posting.id.index as i16)
- .bind(posting.owner.0)
- .bind(posting.asset.0 as i32)
- .bind(posting.value.value())
- .bind(status_to_i16(posting.status))
- .execute(&mut *tx)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- }
- tx.commit()
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- Ok(())
- }
- }
- // ---------------------------------------------------------------------------
- // TransferStore
- // ---------------------------------------------------------------------------
- #[async_trait]
- impl TransferStore for SqlStore {
- async fn get_transfer(&self, id: &EnvelopeId) -> Result<Option<EnvelopeRecord>, StoreError> {
- let row = sqlx::query("SELECT transfer, receipt, created_at FROM transfers WHERE id = $1")
- .bind(id.0.as_slice())
- .fetch_optional(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- match row {
- None => Ok(None),
- Some(row) => {
- let transfer_bytes: Vec<u8> = row
- .try_get("transfer")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let receipt_bytes: Vec<u8> = row
- .try_get("receipt")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let created_at: i64 = row
- .try_get("created_at")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- Ok(Some(EnvelopeRecord {
- envelope: deserialize_blob(&transfer_bytes)?,
- receipt: deserialize_blob(&receipt_bytes)?,
- created_at,
- }))
- }
- }
- }
- async fn store_transfer(&self, record: EnvelopeRecord) -> Result<(), StoreError> {
- let tid = record.receipt.transfer_id;
- let transfer_bytes = serialize_blob(&record.envelope)?;
- let receipt_bytes = serialize_blob(&record.receipt)?;
- let mut tx = self
- .pool
- .begin()
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- sqlx::query("INSERT INTO transfers (id, transfer, receipt, created_at, book, code) VALUES ($1, $2, $3, $4, $5, $6)")
- .bind(tid.0.as_slice())
- .bind(&transfer_bytes)
- .bind(&receipt_bytes)
- .bind(record.created_at)
- .bind(record.envelope.book() as i32)
- .bind(record.envelope.code() as i32)
- .execute(&mut *tx)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- // Populate transfer_accounts join table
- let mut account_ids: HashSet<i64> = HashSet::new();
- for np in record.envelope.creates() {
- account_ids.insert(np.owner.0);
- }
- for aid in &account_ids {
- sqlx::query("INSERT INTO transfer_accounts (transfer_id, account_id) VALUES ($1, $2)")
- .bind(tid.0.as_slice())
- .bind(*aid)
- .execute(&mut *tx)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- }
- tx.commit()
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- Ok(())
- }
- async fn get_transfers_for_account(
- &self,
- account: &AccountId,
- ) -> Result<Vec<EnvelopeRecord>, StoreError> {
- let rows = sqlx::query(
- "SELECT t.id, t.transfer, t.receipt, t.created_at FROM transfers t INNER JOIN transfer_accounts ta ON t.id = ta.transfer_id WHERE ta.account_id = $1 ORDER BY t.created_at"
- )
- .bind(account.0)
- .fetch_all(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let mut result = Vec::with_capacity(rows.len());
- for row in &rows {
- let transfer_bytes: Vec<u8> = row
- .try_get("transfer")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let receipt_bytes: Vec<u8> = row
- .try_get("receipt")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let created_at: i64 = row
- .try_get("created_at")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- result.push(EnvelopeRecord {
- envelope: deserialize_blob(&transfer_bytes)?,
- receipt: deserialize_blob(&receipt_bytes)?,
- created_at,
- });
- }
- Ok(result)
- }
- async fn query_transfers(
- &self,
- query: &TransferQuery,
- ) -> Result<Page<EnvelopeRecord>, StoreError> {
- // Load base records, using the account join when available.
- let base_records = if let Some(ref account) = query.account {
- self.get_transfers_for_account(account).await?
- } else {
- let rows = sqlx::query(
- "SELECT transfer, receipt, created_at FROM transfers ORDER BY created_at",
- )
- .fetch_all(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let mut records = Vec::with_capacity(rows.len());
- for row in &rows {
- let transfer_bytes: Vec<u8> = row
- .try_get("transfer")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let receipt_bytes: Vec<u8> = row
- .try_get("receipt")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let created_at: i64 = row
- .try_get("created_at")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- records.push(EnvelopeRecord {
- envelope: deserialize_blob(&transfer_bytes)?,
- receipt: deserialize_blob(&receipt_bytes)?,
- created_at,
- });
- }
- records
- };
- // Filter in memory for remaining conditions.
- let filtered: Vec<EnvelopeRecord> = base_records
- .into_iter()
- .filter(|r| {
- if let Some(from) = query.from_ts
- && r.created_at < from
- {
- return false;
- }
- if let Some(to) = query.to_ts
- && r.created_at >= to
- {
- return false;
- }
- if let Some(book) = query.book
- && r.envelope.book() != book
- {
- return false;
- }
- if let Some(code) = query.code
- && r.envelope.code() != code
- {
- return false;
- }
- true
- })
- .collect();
- let total = filtered.len() as u64;
- let offset = query.offset.unwrap_or(0) as usize;
- let limit = query.limit.unwrap_or(u32::MAX) as usize;
- let items = filtered.into_iter().skip(offset).take(limit).collect();
- Ok(Page { items, total })
- }
- }
- // ---------------------------------------------------------------------------
- // SagaStore
- // ---------------------------------------------------------------------------
- #[async_trait]
- impl SagaStore for SqlStore {
- async fn save_saga(&self, id: &i64, data: Vec<u8>) -> Result<(), StoreError> {
- sqlx::query("INSERT OR REPLACE INTO sagas (id, data) VALUES ($1, $2)")
- .bind(*id)
- .bind(&data)
- .execute(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- Ok(())
- }
- async fn list_pending_sagas(&self) -> Result<Vec<(i64, Vec<u8>)>, StoreError> {
- let rows = sqlx::query("SELECT id, data FROM sagas")
- .fetch_all(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let mut result = Vec::with_capacity(rows.len());
- for row in &rows {
- let id: i64 = row
- .try_get("id")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let data: Vec<u8> = row
- .try_get("data")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- result.push((id, data));
- }
- Ok(result)
- }
- async fn delete_saga(&self, id: &i64) -> Result<(), StoreError> {
- sqlx::query("DELETE FROM sagas WHERE id = $1")
- .bind(*id)
- .execute(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- Ok(())
- }
- }
- // ---------------------------------------------------------------------------
- // EventStore
- // ---------------------------------------------------------------------------
- #[async_trait]
- impl EventStore for SqlStore {
- async fn append_event(&self, event: &LedgerEvent) -> Result<u64, StoreError> {
- let kind_str =
- serde_json::to_string(&event.kind).map_err(|e| StoreError::Internal(e.to_string()))?;
- let data = serialize_blob(event)?;
- let seq = self.autoid.next() as u64;
- sqlx::query("INSERT INTO events (seq, timestamp, kind, data) VALUES ($1, $2, $3, $4)")
- .bind(seq as i64)
- .bind(event.timestamp)
- .bind(&kind_str)
- .bind(&data)
- .execute(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- Ok(seq)
- }
- async fn get_events_since(
- &self,
- after_seq: u64,
- limit: u32,
- ) -> Result<Vec<LedgerEvent>, StoreError> {
- let rows = sqlx::query("SELECT seq, data FROM events WHERE seq > $1 ORDER BY seq LIMIT $2")
- .bind(after_seq as i64)
- .bind(limit as i32)
- .fetch_all(&self.pool)
- .await
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let mut events = Vec::with_capacity(rows.len());
- for row in &rows {
- let seq: i64 = row
- .try_get("seq")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let data: Vec<u8> = row
- .try_get("data")
- .map_err(|e| StoreError::Internal(e.to_string()))?;
- let mut event: LedgerEvent = deserialize_blob(&data)?;
- event.seq = seq as u64;
- events.push(event);
- }
- Ok(events)
- }
- }
|