store.rs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. //! Storage abstraction separating the pure decision logic from IO.
  2. //!
  3. //! The [`Store`] trait composes focused sub-traits, each a dumb instruction
  4. //! follower: writes apply one update and report an affected-row count (or an I/O
  5. //! error). The saga, not the store, interprets counts and owns idempotency and
  6. //! compensation.
  7. //! - [`AccountStore`] — account CRUD and versioning
  8. //! - [`PostingStore`] — posting reads and lifecycle transitions
  9. //! - [`TransferStore`] — transfer persistence and queries
  10. //! - [`SagaStore`] — saga state for crash recovery
  11. //! - [`EventStore`] — the ledger event log
  12. //! - [`BookStore`] — book persistence
  13. use async_trait::async_trait;
  14. use kuatia_types::{
  15. Account, AccountId, AssetId, Book, BookId, Envelope, EnvelopeId, Posting, PostingId,
  16. PostingStatus, Receipt, ReservationId,
  17. };
  18. use crate::error::StoreError;
  19. use crate::events::EventStore;
  20. /// Pairs a committed transfer with its receipt.
  21. #[derive(Debug, Clone)]
  22. pub struct EnvelopeRecord {
  23. /// The envelope that was committed.
  24. pub envelope: Envelope,
  25. /// The receipt proving commitment.
  26. pub receipt: Receipt,
  27. /// Unix milliseconds when this record was created.
  28. pub created_at: i64,
  29. }
  30. /// Pagination and filtering parameters for posting queries.
  31. #[derive(Debug, Clone)]
  32. pub struct PostingQuery {
  33. /// Filter to postings owned by this account.
  34. pub account: AccountId,
  35. /// Filter by asset.
  36. pub asset: Option<AssetId>,
  37. /// Filter by posting status.
  38. pub status: Option<PostingStatus>,
  39. /// Max results to return.
  40. pub limit: Option<u32>,
  41. /// Number of results to skip.
  42. pub offset: Option<u32>,
  43. }
  44. /// Pagination and filtering parameters for transfer queries.
  45. #[derive(Debug, Clone, Default)]
  46. pub struct TransferQuery {
  47. /// Filter to transfers involving this account.
  48. pub account: Option<AccountId>,
  49. /// Inclusive lower bound (unix millis).
  50. pub from_ts: Option<i64>,
  51. /// Exclusive upper bound (unix millis).
  52. pub to_ts: Option<i64>,
  53. /// Filter by book.
  54. pub book: Option<BookId>,
  55. /// Max results to return.
  56. pub limit: Option<u32>,
  57. /// Number of results to skip.
  58. pub offset: Option<u32>,
  59. }
  /// One page of query results plus the size of the full result set.
  #[derive(Clone, Debug)]
  pub struct Page<T> {
      /// Items that fall inside this page.
      pub items: Vec<T>,
      /// Number of items that matched the query before pagination was applied.
      pub total: u64,
  }
  68. // ---------------------------------------------------------------------------
  69. // Sub-traits
  70. // ---------------------------------------------------------------------------
  71. /// Account persistence: create, version, query.
  72. #[async_trait]
  73. pub trait AccountStore: Send + Sync {
  74. /// Fetch a single account by id.
  75. async fn get_account(&self, id: &AccountId) -> Result<Account, StoreError>;
  76. /// Fetch multiple accounts by id.
  77. async fn get_accounts(&self, ids: &[AccountId]) -> Result<Vec<Account>, StoreError>;
  78. /// Persist a new account (version 1).
  79. async fn create_account(&self, account: Account) -> Result<(), StoreError>;
  80. /// Append a new version to an existing account.
  81. async fn append_account_version(&self, account: Account) -> Result<(), StoreError>;
  82. /// Return the full version history for an account.
  83. async fn get_account_history(&self, id: &AccountId) -> Result<Vec<Account>, StoreError>;
  84. /// List all accounts (latest version of each).
  85. async fn list_accounts(&self) -> Result<Vec<Account>, StoreError>;
  86. }
  87. /// Posting persistence: reads and lifecycle transitions.
  88. #[async_trait]
  89. pub trait PostingStore: Send + Sync {
  90. /// Fetch postings by their ids.
  91. async fn get_postings(&self, ids: &[PostingId]) -> Result<Vec<Posting>, StoreError>;
  92. /// Return postings owned by an account, optionally filtered by asset and/or status.
  93. async fn get_postings_by_account(
  94. &self,
  95. account: &AccountId,
  96. asset: Option<&AssetId>,
  97. status: Option<PostingStatus>,
  98. ) -> Result<Vec<Posting>, StoreError>;
  99. /// Reserve postings: `Active → PendingInactive`, stamping `reservation` as
  100. /// the owner token. A dumb instruction — each id flips only if still `Active`;
  101. /// returns the **number of rows reserved** (0 ≤ n ≤ ids.len()). It does not
  102. /// error on a short count; the caller (saga) interprets it.
  103. async fn reserve_postings(
  104. &self,
  105. ids: &[PostingId],
  106. reservation: ReservationId,
  107. ) -> Result<u64, StoreError>;
  108. /// Release postings: `PendingInactive` owned by `reservation` → `Active`,
  109. /// clearing the owner. A dumb instruction — only postings reserved by this
  110. /// `reservation` flip; returns the **number of rows released**. Releasing an
  111. /// `Active` (already released) or differently-owned posting simply does not
  112. /// count. The caller interprets the result.
  113. async fn release_postings(
  114. &self,
  115. ids: &[PostingId],
  116. reservation: ReservationId,
  117. ) -> Result<u64, StoreError>;
  118. /// Deactivate postings: flip to `Inactive`. A dumb instruction — it applies
  119. /// the conditional update and returns the **number of rows changed**; it does
  120. /// not decide whether that count is correct. The caller (saga) interprets it.
  121. /// - `reservation == None` (raw): only postings still `Active` flip.
  122. /// - `reservation == Some(rid)`: only postings `PendingInactive` owned by
  123. /// `rid` flip.
  124. /// Returns the count of postings actually transitioned (0 ≤ n ≤ ids.len()).
  125. async fn deactivate_postings(
  126. &self,
  127. ids: &[PostingId],
  128. reservation: Option<ReservationId>,
  129. ) -> Result<u64, StoreError>;
  130. /// Insert postings if absent (idempotent). A dumb instruction — inserts each
  131. /// posting unless one with the same id already exists, and returns the
  132. /// **number of rows inserted** (already-present postings contribute 0). The
  133. /// caller decides what a short count means.
  134. async fn insert_postings(&self, postings: &[Posting]) -> Result<u64, StoreError>;
  135. /// Query postings with filtering and pagination.
  136. async fn query_postings(&self, query: &PostingQuery) -> Result<Page<Posting>, StoreError> {
  137. let all = self
  138. .get_postings_by_account(&query.account, query.asset.as_ref(), query.status)
  139. .await?;
  140. let total = all.len() as u64;
  141. let offset = query.offset.unwrap_or(0) as usize;
  142. let limit = query.limit.unwrap_or(u32::MAX) as usize;
  143. let items = all.into_iter().skip(offset).take(limit).collect();
  144. Ok(Page { items, total })
  145. }
  146. }
  147. /// Transfer persistence: store and query committed transfers.
  148. #[async_trait]
  149. pub trait TransferStore: Send + Sync {
  150. /// Fetch a transfer record by its content-addressed id.
  151. async fn get_transfer(&self, id: &EnvelopeId) -> Result<Option<EnvelopeRecord>, StoreError>;
  152. /// Persist a transfer record if absent (idempotent) and index it under every
  153. /// account in `involved` (both created and consumed owners — the caller
  154. /// supplies the set so storage computes nothing). A dumb instruction:
  155. /// returns **1** if the transfer row was newly inserted, **0** if it already
  156. /// existed. The caller decides what `0` means.
  157. async fn store_transfer(
  158. &self,
  159. record: EnvelopeRecord,
  160. involved: &[AccountId],
  161. ) -> Result<u64, StoreError>;
  162. /// Return all transfers involving the given account.
  163. async fn get_transfers_for_account(
  164. &self,
  165. account: &AccountId,
  166. ) -> Result<Vec<EnvelopeRecord>, StoreError>;
  167. /// Query transfers with filtering and pagination.
  168. async fn query_transfers(
  169. &self,
  170. query: &TransferQuery,
  171. ) -> Result<Page<EnvelopeRecord>, StoreError> {
  172. // Default in-memory implementation
  173. let all = if let Some(ref account) = query.account {
  174. self.get_transfers_for_account(account).await?
  175. } else {
  176. return Err(StoreError::Internal(
  177. "query_transfers requires account filter in default implementation".into(),
  178. ));
  179. };
  180. let filtered: Vec<EnvelopeRecord> = all
  181. .into_iter()
  182. .filter(|r| {
  183. if let Some(from) = query.from_ts
  184. && r.created_at < from
  185. {
  186. return false;
  187. }
  188. if let Some(to) = query.to_ts
  189. && r.created_at >= to
  190. {
  191. return false;
  192. }
  193. if let Some(book) = query.book
  194. && r.envelope.book() != book
  195. {
  196. return false;
  197. }
  198. true
  199. })
  200. .collect();
  201. let total = filtered.len() as u64;
  202. let offset = query.offset.unwrap_or(0) as usize;
  203. let limit = query.limit.unwrap_or(u32::MAX) as usize;
  204. let items = filtered.into_iter().skip(offset).take(limit).collect();
  205. Ok(Page { items, total })
  206. }
  207. }
  208. /// Saga state persistence for crash recovery.
  209. #[async_trait]
  210. pub trait SagaStore: Send + Sync {
  211. /// Persist a saga execution state.
  212. async fn save_saga(&self, id: &i64, data: Vec<u8>) -> Result<(), StoreError>;
  213. /// Load all pending (incomplete) saga states.
  214. async fn list_pending_sagas(&self) -> Result<Vec<(i64, Vec<u8>)>, StoreError>;
  215. /// Delete a completed saga state.
  216. async fn delete_saga(&self, id: &i64) -> Result<(), StoreError>;
  217. }
  218. /// Book persistence.
  219. #[async_trait]
  220. pub trait BookStore: Send + Sync {
  221. /// Create a new book.
  222. async fn create_book(&self, book: Book) -> Result<(), StoreError>;
  223. /// Fetch a book by id.
  224. async fn get_book(&self, id: &BookId) -> Result<Book, StoreError>;
  225. /// List all books.
  226. async fn list_books(&self) -> Result<Vec<Book>, StoreError>;
  227. }
  228. // ---------------------------------------------------------------------------
  229. // Composite trait
  230. // ---------------------------------------------------------------------------
  231. /// Async storage abstraction composing all sub-traits.
  232. pub trait Store:
  233. AccountStore + PostingStore + TransferStore + SagaStore + EventStore + BookStore
  234. {
  235. }
  236. impl<T: AccountStore + PostingStore + TransferStore + SagaStore + EventStore + BookStore> Store
  237. for T
  238. {
  239. }