| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- //! Storage abstraction separating the pure decision logic from IO.
- //!
- //! The [`Store`] trait composes focused sub-traits, each a dumb instruction
- //! follower: writes apply one update and report an affected-row count (or an I/O
- //! error). The saga, not the store, interprets counts and owns idempotency and
- //! compensation.
- //! - [`AccountStore`] — account CRUD and versioning
- //! - [`PostingStore`] — posting reads and lifecycle transitions
- //! - [`TransferStore`] — transfer persistence and queries
- //! - [`SagaStore`] — saga state for crash recovery
- //! - [`EventStore`] — the ledger event log
- //! - [`BookStore`] — book persistence
- use async_trait::async_trait;
- use kuatia_types::{
- Account, AccountId, AssetId, Book, BookId, Envelope, EnvelopeId, Posting, PostingId,
- PostingStatus, Receipt, ReservationId,
- };
- use crate::error::StoreError;
- use crate::events::EventStore;
- /// Pairs a committed transfer with its receipt.
- #[derive(Debug, Clone)]
- pub struct EnvelopeRecord {
- /// The envelope that was committed.
- pub envelope: Envelope,
- /// The receipt proving commitment.
- pub receipt: Receipt,
- /// Unix milliseconds when this record was created.
- pub created_at: i64,
- }
- /// Pagination and filtering parameters for posting queries.
- #[derive(Debug, Clone)]
- pub struct PostingQuery {
- /// Filter to postings owned by this account.
- pub account: AccountId,
- /// Filter by asset.
- pub asset: Option<AssetId>,
- /// Filter by posting status.
- pub status: Option<PostingStatus>,
- /// Max results to return.
- pub limit: Option<u32>,
- /// Number of results to skip.
- pub offset: Option<u32>,
- }
- /// Pagination and filtering parameters for transfer queries.
- #[derive(Debug, Clone, Default)]
- pub struct TransferQuery {
- /// Filter to transfers involving this account.
- pub account: Option<AccountId>,
- /// Inclusive lower bound (unix millis).
- pub from_ts: Option<i64>,
- /// Exclusive upper bound (unix millis).
- pub to_ts: Option<i64>,
- /// Filter by book.
- pub book: Option<BookId>,
- /// Max results to return.
- pub limit: Option<u32>,
- /// Number of results to skip.
- pub offset: Option<u32>,
- }
/// One page of query results, plus the size of the full result set.
#[derive(Debug, Clone)]
pub struct Page<T> {
    /// Items that fall inside this page.
    pub items: Vec<T>,
    /// Count of all matching items, measured before pagination was applied.
    pub total: u64,
}
- // ---------------------------------------------------------------------------
- // Sub-traits
- // ---------------------------------------------------------------------------
- /// Account persistence: create, version, query.
- #[async_trait]
- pub trait AccountStore: Send + Sync {
- /// Fetch a single account by id.
- async fn get_account(&self, id: &AccountId) -> Result<Account, StoreError>;
- /// Fetch multiple accounts by id.
- async fn get_accounts(&self, ids: &[AccountId]) -> Result<Vec<Account>, StoreError>;
- /// Persist a new account (version 1).
- async fn create_account(&self, account: Account) -> Result<(), StoreError>;
- /// Append a new version to an existing account.
- async fn append_account_version(&self, account: Account) -> Result<(), StoreError>;
- /// Return the full version history for an account.
- async fn get_account_history(&self, id: &AccountId) -> Result<Vec<Account>, StoreError>;
- /// List all accounts (latest version of each).
- async fn list_accounts(&self) -> Result<Vec<Account>, StoreError>;
- }
- /// Posting persistence: reads and lifecycle transitions.
- #[async_trait]
- pub trait PostingStore: Send + Sync {
- /// Fetch postings by their ids.
- async fn get_postings(&self, ids: &[PostingId]) -> Result<Vec<Posting>, StoreError>;
- /// Return postings owned by an account, optionally filtered by asset and/or status.
- async fn get_postings_by_account(
- &self,
- account: &AccountId,
- asset: Option<&AssetId>,
- status: Option<PostingStatus>,
- ) -> Result<Vec<Posting>, StoreError>;
- /// Reserve postings: `Active → PendingInactive`, stamping `reservation` as
- /// the owner token. A dumb instruction — each id flips only if still `Active`;
- /// returns the **number of rows reserved** (0 ≤ n ≤ ids.len()). It does not
- /// error on a short count; the caller (saga) interprets it.
- async fn reserve_postings(
- &self,
- ids: &[PostingId],
- reservation: ReservationId,
- ) -> Result<u64, StoreError>;
- /// Release postings: `PendingInactive` owned by `reservation` → `Active`,
- /// clearing the owner. A dumb instruction — only postings reserved by this
- /// `reservation` flip; returns the **number of rows released**. Releasing an
- /// `Active` (already released) or differently-owned posting simply does not
- /// count. The caller interprets the result.
- async fn release_postings(
- &self,
- ids: &[PostingId],
- reservation: ReservationId,
- ) -> Result<u64, StoreError>;
- /// Deactivate postings: flip to `Inactive`. A dumb instruction — it applies
- /// the conditional update and returns the **number of rows changed**; it does
- /// not decide whether that count is correct. The caller (saga) interprets it.
- /// - `reservation == None` (raw): only postings still `Active` flip.
- /// - `reservation == Some(rid)`: only postings `PendingInactive` owned by
- /// `rid` flip.
- /// Returns the count of postings actually transitioned (0 ≤ n ≤ ids.len()).
- async fn deactivate_postings(
- &self,
- ids: &[PostingId],
- reservation: Option<ReservationId>,
- ) -> Result<u64, StoreError>;
- /// Insert postings if absent (idempotent). A dumb instruction — inserts each
- /// posting unless one with the same id already exists, and returns the
- /// **number of rows inserted** (already-present postings contribute 0). The
- /// caller decides what a short count means.
- async fn insert_postings(&self, postings: &[Posting]) -> Result<u64, StoreError>;
- /// Query postings with filtering and pagination.
- async fn query_postings(&self, query: &PostingQuery) -> Result<Page<Posting>, StoreError> {
- let all = self
- .get_postings_by_account(&query.account, query.asset.as_ref(), query.status)
- .await?;
- let total = all.len() as u64;
- let offset = query.offset.unwrap_or(0) as usize;
- let limit = query.limit.unwrap_or(u32::MAX) as usize;
- let items = all.into_iter().skip(offset).take(limit).collect();
- Ok(Page { items, total })
- }
- }
- /// Transfer persistence: store and query committed transfers.
- #[async_trait]
- pub trait TransferStore: Send + Sync {
- /// Fetch a transfer record by its content-addressed id.
- async fn get_transfer(&self, id: &EnvelopeId) -> Result<Option<EnvelopeRecord>, StoreError>;
- /// Persist a transfer record if absent (idempotent) and index it under every
- /// account in `involved` (both created and consumed owners — the caller
- /// supplies the set so storage computes nothing). A dumb instruction:
- /// returns **1** if the transfer row was newly inserted, **0** if it already
- /// existed. The caller decides what `0` means.
- async fn store_transfer(
- &self,
- record: EnvelopeRecord,
- involved: &[AccountId],
- ) -> Result<u64, StoreError>;
- /// Return all transfers involving the given account.
- async fn get_transfers_for_account(
- &self,
- account: &AccountId,
- ) -> Result<Vec<EnvelopeRecord>, StoreError>;
- /// Query transfers with filtering and pagination.
- async fn query_transfers(
- &self,
- query: &TransferQuery,
- ) -> Result<Page<EnvelopeRecord>, StoreError> {
- // Default in-memory implementation
- let all = if let Some(ref account) = query.account {
- self.get_transfers_for_account(account).await?
- } else {
- return Err(StoreError::Internal(
- "query_transfers requires account filter in default implementation".into(),
- ));
- };
- let filtered: Vec<EnvelopeRecord> = all
- .into_iter()
- .filter(|r| {
- if let Some(from) = query.from_ts
- && r.created_at < from
- {
- return false;
- }
- if let Some(to) = query.to_ts
- && r.created_at >= to
- {
- return false;
- }
- if let Some(book) = query.book
- && r.envelope.book() != book
- {
- return false;
- }
- true
- })
- .collect();
- let total = filtered.len() as u64;
- let offset = query.offset.unwrap_or(0) as usize;
- let limit = query.limit.unwrap_or(u32::MAX) as usize;
- let items = filtered.into_iter().skip(offset).take(limit).collect();
- Ok(Page { items, total })
- }
- }
- /// Saga state persistence for crash recovery.
- #[async_trait]
- pub trait SagaStore: Send + Sync {
- /// Persist a saga execution state.
- async fn save_saga(&self, id: &i64, data: Vec<u8>) -> Result<(), StoreError>;
- /// Load all pending (incomplete) saga states.
- async fn list_pending_sagas(&self) -> Result<Vec<(i64, Vec<u8>)>, StoreError>;
- /// Delete a completed saga state.
- async fn delete_saga(&self, id: &i64) -> Result<(), StoreError>;
- }
- /// Book persistence.
- #[async_trait]
- pub trait BookStore: Send + Sync {
- /// Create a new book.
- async fn create_book(&self, book: Book) -> Result<(), StoreError>;
- /// Fetch a book by id.
- async fn get_book(&self, id: &BookId) -> Result<Book, StoreError>;
- /// List all books.
- async fn list_books(&self) -> Result<Vec<Book>, StoreError>;
- }
- // ---------------------------------------------------------------------------
- // Composite trait
- // ---------------------------------------------------------------------------
- /// Async storage abstraction composing all sub-traits.
- pub trait Store:
- AccountStore + PostingStore + TransferStore + SagaStore + EventStore + BookStore
- {
- }
- impl<T: AccountStore + PostingStore + TransferStore + SagaStore + EventStore + BookStore> Store
- for T
- {
- }
|