lib.rs 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806
  1. //! SQL-backed Store implementation for SQLite and PostgreSQL.
  2. //!
  3. //! Uses `sqlx::Any` for database-agnostic queries. Enable features
  4. //! `sqlite` or `postgres` to select the backend.
  5. //!
  6. //! ```text
  7. //! let pool = sqlx::any::AnyPoolOptions::new()
  8. //! .connect("sqlite::memory:").await?;
  9. //! let store = SqlStore::new(pool);
  10. //! store.migrate().await?;
  11. //! ```
  12. use std::collections::HashSet;
  13. use async_trait::async_trait;
  14. use sqlx::{Any, Pool, Row};
  15. use kuatia_storage::error::StoreError;
  16. use kuatia_storage::events::{EventStore, LedgerEvent};
  17. use kuatia_storage::store::*;
  18. use kuatia_types::*;
  19. /// SQL-backed [`Store`] implementation.
  20. pub struct SqlStore {
  21. pool: Pool<Any>,
  22. autoid: kuatia_types::autoid::AutoId,
  23. }
  24. impl SqlStore {
  25. /// Create a new SQL store wrapping an existing connection pool.
  26. pub fn new(pool: Pool<Any>) -> Self {
  27. Self {
  28. pool,
  29. autoid: kuatia_types::autoid::AutoId::new(),
  30. }
  31. }
  32. /// Run database migrations.
  33. pub async fn migrate(&self) -> Result<(), StoreError> {
  34. for sql in [
  35. include_str!("migrations/001_init.sql"),
  36. include_str!("migrations/002_timestamps_and_columns.sql"),
  37. include_str!("migrations/003_events.sql"),
  38. ] {
  39. for statement in sql.split(';') {
  40. let trimmed = statement.trim();
  41. if !trimmed.is_empty() {
  42. sqlx::query(trimmed)
  43. .execute(&self.pool)
  44. .await
  45. .map_err(|e| StoreError::Internal(e.to_string()))?;
  46. }
  47. }
  48. }
  49. Ok(())
  50. }
  51. }
  52. // ---------------------------------------------------------------------------
  53. // Serialization helpers
  54. // ---------------------------------------------------------------------------
  55. fn serialize_policy(policy: &AccountPolicy) -> Result<String, StoreError> {
  56. serde_json::to_string(policy)
  57. .map_err(|e| StoreError::Internal(format!("policy serialization: {e}")))
  58. }
  59. fn deserialize_policy(s: &str) -> Result<AccountPolicy, StoreError> {
  60. serde_json::from_str(s).map_err(|e| StoreError::Internal(format!("bad policy: {e}")))
  61. }
  62. fn serialize_blob<T: serde::Serialize>(val: &T) -> Result<Vec<u8>, StoreError> {
  63. serde_json::to_vec(val).map_err(|e| StoreError::Internal(format!("blob serialization: {e}")))
  64. }
  65. fn deserialize_blob<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Result<T, StoreError> {
  66. serde_json::from_slice(bytes).map_err(|e| StoreError::Internal(format!("bad blob: {e}")))
  67. }
  68. fn status_to_i16(s: PostingStatus) -> i16 {
  69. match s {
  70. PostingStatus::Active => 0,
  71. PostingStatus::PendingInactive => 1,
  72. PostingStatus::Inactive => 2,
  73. }
  74. }
  75. fn status_from_i16(v: i16) -> Result<PostingStatus, StoreError> {
  76. match v {
  77. 0 => Ok(PostingStatus::Active),
  78. 1 => Ok(PostingStatus::PendingInactive),
  79. 2 => Ok(PostingStatus::Inactive),
  80. _ => Err(StoreError::Internal(format!("bad posting status: {v}"))),
  81. }
  82. }
  83. fn row_to_account(row: &sqlx::any::AnyRow) -> Result<Account, StoreError> {
  84. let id: i64 = row
  85. .try_get("id")
  86. .map_err(|e| StoreError::Internal(e.to_string()))?;
  87. let version: i64 = row
  88. .try_get("version")
  89. .map_err(|e| StoreError::Internal(e.to_string()))?;
  90. let policy_str: String = row
  91. .try_get("policy")
  92. .map_err(|e| StoreError::Internal(e.to_string()))?;
  93. let flags_bits: i32 = row
  94. .try_get("flags")
  95. .map_err(|e| StoreError::Internal(e.to_string()))?;
  96. let book: i32 = row
  97. .try_get("book")
  98. .map_err(|e| StoreError::Internal(e.to_string()))?;
  99. let code: i32 = row
  100. .try_get("code")
  101. .map_err(|e| StoreError::Internal(e.to_string()))?;
  102. let user_data_bytes: Vec<u8> = row
  103. .try_get("user_data")
  104. .map_err(|e| StoreError::Internal(e.to_string()))?;
  105. let metadata_bytes: Vec<u8> = row
  106. .try_get("metadata")
  107. .map_err(|e| StoreError::Internal(e.to_string()))?;
  108. Ok(Account {
  109. id: AccountId::new(id),
  110. version: version as u64,
  111. policy: deserialize_policy(&policy_str)?,
  112. flags: AccountFlags::from_bits_truncate(flags_bits as u32),
  113. book: book as u32,
  114. code: code as u32,
  115. user_data: deserialize_blob(&user_data_bytes)?,
  116. metadata: deserialize_blob(&metadata_bytes)?,
  117. })
  118. }
  119. fn row_to_posting(row: &sqlx::any::AnyRow) -> Result<Posting, StoreError> {
  120. let transfer_id: Vec<u8> = row
  121. .try_get("transfer_id")
  122. .map_err(|e| StoreError::Internal(e.to_string()))?;
  123. let idx: i16 = row
  124. .try_get("idx")
  125. .map_err(|e| StoreError::Internal(e.to_string()))?;
  126. let owner: i64 = row
  127. .try_get("owner")
  128. .map_err(|e| StoreError::Internal(e.to_string()))?;
  129. let asset: i32 = row
  130. .try_get("asset")
  131. .map_err(|e| StoreError::Internal(e.to_string()))?;
  132. let value: i64 = row
  133. .try_get("value")
  134. .map_err(|e| StoreError::Internal(e.to_string()))?;
  135. let status: i16 = row
  136. .try_get("status")
  137. .map_err(|e| StoreError::Internal(e.to_string()))?;
  138. let mut tid = [0u8; 32];
  139. tid.copy_from_slice(&transfer_id);
  140. Ok(Posting {
  141. id: PostingId {
  142. transfer: EnvelopeId(tid),
  143. index: idx as u16,
  144. },
  145. owner: AccountId::new(owner),
  146. asset: AssetId::new(asset as u32),
  147. value: Cent::from(value),
  148. status: status_from_i16(status)?,
  149. })
  150. }
  151. // ---------------------------------------------------------------------------
  152. // AccountStore
  153. // ---------------------------------------------------------------------------
  154. #[async_trait]
  155. impl AccountStore for SqlStore {
  156. async fn get_account(&self, id: &AccountId) -> Result<Account, StoreError> {
  157. let row = sqlx::query("SELECT * FROM accounts WHERE id = $1 ORDER BY version DESC LIMIT 1")
  158. .bind(id.0)
  159. .fetch_optional(&self.pool)
  160. .await
  161. .map_err(|e| StoreError::Internal(e.to_string()))?
  162. .ok_or_else(|| StoreError::NotFound(format!("account {id:?}")))?;
  163. row_to_account(&row)
  164. }
  165. async fn get_accounts(&self, ids: &[AccountId]) -> Result<Vec<Account>, StoreError> {
  166. let mut result = Vec::with_capacity(ids.len());
  167. for id in ids {
  168. result.push(self.get_account(id).await?);
  169. }
  170. Ok(result)
  171. }
  172. async fn create_account(&self, account: Account) -> Result<(), StoreError> {
  173. let exists = sqlx::query("SELECT 1 FROM accounts WHERE id = $1 LIMIT 1")
  174. .bind(account.id.0)
  175. .fetch_optional(&self.pool)
  176. .await
  177. .map_err(|e| StoreError::Internal(e.to_string()))?;
  178. if exists.is_some() {
  179. return Err(StoreError::AlreadyExists(format!(
  180. "account {:?}",
  181. account.id
  182. )));
  183. }
  184. sqlx::query(
  185. "INSERT INTO accounts (id, version, policy, flags, book, code, user_data, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"
  186. )
  187. .bind(account.id.0)
  188. .bind(account.version as i64)
  189. .bind(serialize_policy(&account.policy)?)
  190. .bind(account.flags.bits() as i32)
  191. .bind(account.book as i32)
  192. .bind(account.code as i32)
  193. .bind(serialize_blob(&account.user_data)?)
  194. .bind(serialize_blob(&account.metadata)?)
  195. .execute(&self.pool)
  196. .await
  197. .map_err(|e| StoreError::Internal(e.to_string()))?;
  198. Ok(())
  199. }
  200. async fn append_account_version(&self, account: Account) -> Result<(), StoreError> {
  201. let current =
  202. sqlx::query("SELECT version FROM accounts WHERE id = $1 ORDER BY version DESC LIMIT 1")
  203. .bind(account.id.0)
  204. .fetch_optional(&self.pool)
  205. .await
  206. .map_err(|e| StoreError::Internal(e.to_string()))?
  207. .ok_or_else(|| StoreError::NotFound(format!("account {:?}", account.id)))?;
  208. let current_version: i64 = current
  209. .try_get("version")
  210. .map_err(|e| StoreError::Internal(e.to_string()))?;
  211. let expected = current_version
  212. .checked_add(1)
  213. .ok_or_else(|| StoreError::Internal("account version overflow".to_string()))?;
  214. if account.version as i64 != expected {
  215. return Err(StoreError::VersionConflict {
  216. account: account.id,
  217. expected: expected as u64,
  218. actual: account.version,
  219. });
  220. }
  221. sqlx::query(
  222. "INSERT INTO accounts (id, version, policy, flags, book, code, user_data, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"
  223. )
  224. .bind(account.id.0)
  225. .bind(account.version as i64)
  226. .bind(serialize_policy(&account.policy)?)
  227. .bind(account.flags.bits() as i32)
  228. .bind(account.book as i32)
  229. .bind(account.code as i32)
  230. .bind(serialize_blob(&account.user_data)?)
  231. .bind(serialize_blob(&account.metadata)?)
  232. .execute(&self.pool)
  233. .await
  234. .map_err(|e| StoreError::Internal(e.to_string()))?;
  235. Ok(())
  236. }
  237. async fn get_account_history(&self, id: &AccountId) -> Result<Vec<Account>, StoreError> {
  238. let rows = sqlx::query("SELECT * FROM accounts WHERE id = $1 ORDER BY version ASC")
  239. .bind(id.0)
  240. .fetch_all(&self.pool)
  241. .await
  242. .map_err(|e| StoreError::Internal(e.to_string()))?;
  243. if rows.is_empty() {
  244. return Err(StoreError::NotFound(format!("account {id:?}")));
  245. }
  246. rows.iter().map(row_to_account).collect()
  247. }
  248. async fn list_accounts(&self) -> Result<Vec<Account>, StoreError> {
  249. let rows = sqlx::query("SELECT * FROM accounts ORDER BY id, version DESC")
  250. .fetch_all(&self.pool)
  251. .await
  252. .map_err(|e| StoreError::Internal(e.to_string()))?;
  253. let mut accounts: Vec<Account> = rows.iter().map(row_to_account).collect::<Result<_, _>>()?;
  254. accounts.dedup_by_key(|a| a.id);
  255. Ok(accounts)
  256. }
  257. }
  258. // ---------------------------------------------------------------------------
  259. // PostingStore
  260. // ---------------------------------------------------------------------------
  261. #[async_trait]
  262. impl PostingStore for SqlStore {
  263. async fn get_postings(&self, ids: &[PostingId]) -> Result<Vec<Posting>, StoreError> {
  264. let mut result = Vec::with_capacity(ids.len());
  265. for id in ids {
  266. let row = sqlx::query("SELECT * FROM postings WHERE transfer_id = $1 AND idx = $2")
  267. .bind(id.transfer.0.as_slice())
  268. .bind(id.index as i16)
  269. .fetch_optional(&self.pool)
  270. .await
  271. .map_err(|e| StoreError::Internal(e.to_string()))?
  272. .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
  273. result.push(row_to_posting(&row)?);
  274. }
  275. Ok(result)
  276. }
  277. async fn get_postings_by_account(
  278. &self,
  279. account: &AccountId,
  280. asset: Option<&AssetId>,
  281. status: Option<PostingStatus>,
  282. ) -> Result<Vec<Posting>, StoreError> {
  283. let rows = match (asset, status) {
  284. (Some(a), Some(s)) => {
  285. sqlx::query(
  286. "SELECT * FROM postings WHERE owner = $1 AND asset = $2 AND status = $3",
  287. )
  288. .bind(account.0)
  289. .bind(a.0 as i32)
  290. .bind(status_to_i16(s))
  291. .fetch_all(&self.pool)
  292. .await
  293. }
  294. (Some(a), None) => {
  295. sqlx::query("SELECT * FROM postings WHERE owner = $1 AND asset = $2")
  296. .bind(account.0)
  297. .bind(a.0 as i32)
  298. .fetch_all(&self.pool)
  299. .await
  300. }
  301. (None, Some(s)) => {
  302. sqlx::query("SELECT * FROM postings WHERE owner = $1 AND status = $2")
  303. .bind(account.0)
  304. .bind(status_to_i16(s))
  305. .fetch_all(&self.pool)
  306. .await
  307. }
  308. (None, None) => {
  309. sqlx::query("SELECT * FROM postings WHERE owner = $1")
  310. .bind(account.0)
  311. .fetch_all(&self.pool)
  312. .await
  313. }
  314. }
  315. .map_err(|e| StoreError::Internal(e.to_string()))?;
  316. rows.iter().map(row_to_posting).collect()
  317. }
  318. async fn query_postings(&self, query: &PostingQuery) -> Result<Page<Posting>, StoreError> {
  319. let (where_clause, count_clause) = {
  320. let mut w = String::from("WHERE owner = $1");
  321. let mut idx = 2u32;
  322. if query.asset.is_some() {
  323. w.push_str(&format!(" AND asset = ${idx}"));
  324. idx += 1;
  325. }
  326. if query.status.is_some() {
  327. w.push_str(&format!(" AND status = ${idx}"));
  328. }
  329. let c = format!("SELECT COUNT(*) as cnt FROM postings {w}");
  330. let limit = query.limit.unwrap_or(u32::MAX);
  331. let offset = query.offset.unwrap_or(0);
  332. w.push_str(&format!(" LIMIT {limit} OFFSET {offset}"));
  333. (format!("SELECT * FROM postings {w}"), c)
  334. };
  335. // Build count query
  336. let mut count_q = sqlx::query(&count_clause).bind(query.account.0);
  337. if let Some(ref a) = query.asset {
  338. count_q = count_q.bind(a.0 as i32);
  339. }
  340. if let Some(s) = query.status {
  341. count_q = count_q.bind(status_to_i16(s));
  342. }
  343. let count_row = count_q
  344. .fetch_one(&self.pool)
  345. .await
  346. .map_err(|e| StoreError::Internal(e.to_string()))?;
  347. let total: i64 = count_row
  348. .try_get("cnt")
  349. .map_err(|e| StoreError::Internal(e.to_string()))?;
  350. // Build data query
  351. let mut data_q = sqlx::query(&where_clause).bind(query.account.0);
  352. if let Some(ref a) = query.asset {
  353. data_q = data_q.bind(a.0 as i32);
  354. }
  355. if let Some(s) = query.status {
  356. data_q = data_q.bind(status_to_i16(s));
  357. }
  358. let rows = data_q
  359. .fetch_all(&self.pool)
  360. .await
  361. .map_err(|e| StoreError::Internal(e.to_string()))?;
  362. let items: Vec<Posting> = rows.iter().map(row_to_posting).collect::<Result<_, _>>()?;
  363. Ok(Page {
  364. items,
  365. total: total as u64,
  366. })
  367. }
  368. async fn reserve_postings(&self, ids: &[PostingId]) -> Result<(), StoreError> {
  369. // Validate all Active first, then update in a transaction.
  370. let mut tx = self
  371. .pool
  372. .begin()
  373. .await
  374. .map_err(|e| StoreError::Internal(e.to_string()))?;
  375. for id in ids {
  376. let row =
  377. sqlx::query("SELECT status FROM postings WHERE transfer_id = $1 AND idx = $2")
  378. .bind(id.transfer.0.as_slice())
  379. .bind(id.index as i16)
  380. .fetch_optional(&mut *tx)
  381. .await
  382. .map_err(|e| StoreError::Internal(e.to_string()))?
  383. .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
  384. let status: i16 = row
  385. .try_get("status")
  386. .map_err(|e| StoreError::Internal(e.to_string()))?;
  387. if status != 0 {
  388. return Err(StoreError::PostingNotActive(*id));
  389. }
  390. }
  391. for id in ids {
  392. sqlx::query("UPDATE postings SET status = $1 WHERE transfer_id = $2 AND idx = $3")
  393. .bind(status_to_i16(PostingStatus::PendingInactive))
  394. .bind(id.transfer.0.as_slice())
  395. .bind(id.index as i16)
  396. .execute(&mut *tx)
  397. .await
  398. .map_err(|e| StoreError::Internal(e.to_string()))?;
  399. }
  400. tx.commit()
  401. .await
  402. .map_err(|e| StoreError::Internal(e.to_string()))?;
  403. Ok(())
  404. }
  405. async fn release_postings(&self, ids: &[PostingId]) -> Result<(), StoreError> {
  406. let mut tx = self
  407. .pool
  408. .begin()
  409. .await
  410. .map_err(|e| StoreError::Internal(e.to_string()))?;
  411. for id in ids {
  412. let row =
  413. sqlx::query("SELECT status FROM postings WHERE transfer_id = $1 AND idx = $2")
  414. .bind(id.transfer.0.as_slice())
  415. .bind(id.index as i16)
  416. .fetch_optional(&mut *tx)
  417. .await
  418. .map_err(|e| StoreError::Internal(e.to_string()))?
  419. .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
  420. let status: i16 = row
  421. .try_get("status")
  422. .map_err(|e| StoreError::Internal(e.to_string()))?;
  423. if status == 2 {
  424. return Err(StoreError::PostingInactive(*id));
  425. }
  426. }
  427. for id in ids {
  428. sqlx::query("UPDATE postings SET status = $1 WHERE transfer_id = $2 AND idx = $3 AND status = $4")
  429. .bind(status_to_i16(PostingStatus::Active))
  430. .bind(id.transfer.0.as_slice())
  431. .bind(id.index as i16)
  432. .bind(status_to_i16(PostingStatus::PendingInactive))
  433. .execute(&mut *tx)
  434. .await
  435. .map_err(|e| StoreError::Internal(e.to_string()))?;
  436. }
  437. tx.commit()
  438. .await
  439. .map_err(|e| StoreError::Internal(e.to_string()))?;
  440. Ok(())
  441. }
  442. async fn finalize_postings(
  443. &self,
  444. deactivate: &[PostingId],
  445. create: &[Posting],
  446. ) -> Result<(), StoreError> {
  447. let mut tx = self
  448. .pool
  449. .begin()
  450. .await
  451. .map_err(|e| StoreError::Internal(e.to_string()))?;
  452. for id in deactivate {
  453. sqlx::query("UPDATE postings SET status = $1 WHERE transfer_id = $2 AND idx = $3")
  454. .bind(status_to_i16(PostingStatus::Inactive))
  455. .bind(id.transfer.0.as_slice())
  456. .bind(id.index as i16)
  457. .execute(&mut *tx)
  458. .await
  459. .map_err(|e| StoreError::Internal(e.to_string()))?;
  460. }
  461. for posting in create {
  462. sqlx::query(
  463. "INSERT INTO postings (transfer_id, idx, owner, asset, value, status) VALUES ($1, $2, $3, $4, $5, $6)"
  464. )
  465. .bind(posting.id.transfer.0.as_slice())
  466. .bind(posting.id.index as i16)
  467. .bind(posting.owner.0)
  468. .bind(posting.asset.0 as i32)
  469. .bind(posting.value.value())
  470. .bind(status_to_i16(posting.status))
  471. .execute(&mut *tx)
  472. .await
  473. .map_err(|e| StoreError::Internal(e.to_string()))?;
  474. }
  475. tx.commit()
  476. .await
  477. .map_err(|e| StoreError::Internal(e.to_string()))?;
  478. Ok(())
  479. }
  480. }
  481. // ---------------------------------------------------------------------------
  482. // TransferStore
  483. // ---------------------------------------------------------------------------
  484. #[async_trait]
  485. impl TransferStore for SqlStore {
  486. async fn get_transfer(&self, id: &EnvelopeId) -> Result<Option<EnvelopeRecord>, StoreError> {
  487. let row = sqlx::query("SELECT transfer, receipt, created_at FROM transfers WHERE id = $1")
  488. .bind(id.0.as_slice())
  489. .fetch_optional(&self.pool)
  490. .await
  491. .map_err(|e| StoreError::Internal(e.to_string()))?;
  492. match row {
  493. None => Ok(None),
  494. Some(row) => {
  495. let transfer_bytes: Vec<u8> = row
  496. .try_get("transfer")
  497. .map_err(|e| StoreError::Internal(e.to_string()))?;
  498. let receipt_bytes: Vec<u8> = row
  499. .try_get("receipt")
  500. .map_err(|e| StoreError::Internal(e.to_string()))?;
  501. let created_at: i64 = row
  502. .try_get("created_at")
  503. .map_err(|e| StoreError::Internal(e.to_string()))?;
  504. Ok(Some(EnvelopeRecord {
  505. envelope: deserialize_blob(&transfer_bytes)?,
  506. receipt: deserialize_blob(&receipt_bytes)?,
  507. created_at,
  508. }))
  509. }
  510. }
  511. }
  512. async fn store_transfer(&self, record: EnvelopeRecord) -> Result<(), StoreError> {
  513. let tid = record.receipt.transfer_id;
  514. let transfer_bytes = serialize_blob(&record.envelope)?;
  515. let receipt_bytes = serialize_blob(&record.receipt)?;
  516. let mut tx = self
  517. .pool
  518. .begin()
  519. .await
  520. .map_err(|e| StoreError::Internal(e.to_string()))?;
  521. sqlx::query("INSERT INTO transfers (id, transfer, receipt, created_at, book, code) VALUES ($1, $2, $3, $4, $5, $6)")
  522. .bind(tid.0.as_slice())
  523. .bind(&transfer_bytes)
  524. .bind(&receipt_bytes)
  525. .bind(record.created_at)
  526. .bind(record.envelope.book() as i32)
  527. .bind(record.envelope.code() as i32)
  528. .execute(&mut *tx)
  529. .await
  530. .map_err(|e| StoreError::Internal(e.to_string()))?;
  531. // Populate transfer_accounts join table
  532. let mut account_ids: HashSet<i64> = HashSet::new();
  533. for np in record.envelope.creates() {
  534. account_ids.insert(np.owner.0);
  535. }
  536. for aid in &account_ids {
  537. sqlx::query("INSERT INTO transfer_accounts (transfer_id, account_id) VALUES ($1, $2)")
  538. .bind(tid.0.as_slice())
  539. .bind(*aid)
  540. .execute(&mut *tx)
  541. .await
  542. .map_err(|e| StoreError::Internal(e.to_string()))?;
  543. }
  544. tx.commit()
  545. .await
  546. .map_err(|e| StoreError::Internal(e.to_string()))?;
  547. Ok(())
  548. }
  549. async fn get_transfers_for_account(
  550. &self,
  551. account: &AccountId,
  552. ) -> Result<Vec<EnvelopeRecord>, StoreError> {
  553. let rows = sqlx::query(
  554. "SELECT t.id, t.transfer, t.receipt, t.created_at FROM transfers t INNER JOIN transfer_accounts ta ON t.id = ta.transfer_id WHERE ta.account_id = $1 ORDER BY t.created_at"
  555. )
  556. .bind(account.0)
  557. .fetch_all(&self.pool)
  558. .await
  559. .map_err(|e| StoreError::Internal(e.to_string()))?;
  560. let mut result = Vec::with_capacity(rows.len());
  561. for row in &rows {
  562. let transfer_bytes: Vec<u8> = row
  563. .try_get("transfer")
  564. .map_err(|e| StoreError::Internal(e.to_string()))?;
  565. let receipt_bytes: Vec<u8> = row
  566. .try_get("receipt")
  567. .map_err(|e| StoreError::Internal(e.to_string()))?;
  568. let created_at: i64 = row
  569. .try_get("created_at")
  570. .map_err(|e| StoreError::Internal(e.to_string()))?;
  571. result.push(EnvelopeRecord {
  572. envelope: deserialize_blob(&transfer_bytes)?,
  573. receipt: deserialize_blob(&receipt_bytes)?,
  574. created_at,
  575. });
  576. }
  577. Ok(result)
  578. }
  579. async fn query_transfers(
  580. &self,
  581. query: &TransferQuery,
  582. ) -> Result<Page<EnvelopeRecord>, StoreError> {
  583. // Load base records, using the account join when available.
  584. let base_records = if let Some(ref account) = query.account {
  585. self.get_transfers_for_account(account).await?
  586. } else {
  587. let rows = sqlx::query(
  588. "SELECT transfer, receipt, created_at FROM transfers ORDER BY created_at",
  589. )
  590. .fetch_all(&self.pool)
  591. .await
  592. .map_err(|e| StoreError::Internal(e.to_string()))?;
  593. let mut records = Vec::with_capacity(rows.len());
  594. for row in &rows {
  595. let transfer_bytes: Vec<u8> = row
  596. .try_get("transfer")
  597. .map_err(|e| StoreError::Internal(e.to_string()))?;
  598. let receipt_bytes: Vec<u8> = row
  599. .try_get("receipt")
  600. .map_err(|e| StoreError::Internal(e.to_string()))?;
  601. let created_at: i64 = row
  602. .try_get("created_at")
  603. .map_err(|e| StoreError::Internal(e.to_string()))?;
  604. records.push(EnvelopeRecord {
  605. envelope: deserialize_blob(&transfer_bytes)?,
  606. receipt: deserialize_blob(&receipt_bytes)?,
  607. created_at,
  608. });
  609. }
  610. records
  611. };
  612. // Filter in memory for remaining conditions.
  613. let filtered: Vec<EnvelopeRecord> = base_records
  614. .into_iter()
  615. .filter(|r| {
  616. if let Some(from) = query.from_ts
  617. && r.created_at < from
  618. {
  619. return false;
  620. }
  621. if let Some(to) = query.to_ts
  622. && r.created_at >= to
  623. {
  624. return false;
  625. }
  626. if let Some(book) = query.book
  627. && r.envelope.book() != book
  628. {
  629. return false;
  630. }
  631. if let Some(code) = query.code
  632. && r.envelope.code() != code
  633. {
  634. return false;
  635. }
  636. true
  637. })
  638. .collect();
  639. let total = filtered.len() as u64;
  640. let offset = query.offset.unwrap_or(0) as usize;
  641. let limit = query.limit.unwrap_or(u32::MAX) as usize;
  642. let items = filtered.into_iter().skip(offset).take(limit).collect();
  643. Ok(Page { items, total })
  644. }
  645. }
  646. // ---------------------------------------------------------------------------
  647. // SagaStore
  648. // ---------------------------------------------------------------------------
  649. #[async_trait]
  650. impl SagaStore for SqlStore {
  651. async fn save_saga(&self, id: &i64, data: Vec<u8>) -> Result<(), StoreError> {
  652. sqlx::query("INSERT OR REPLACE INTO sagas (id, data) VALUES ($1, $2)")
  653. .bind(*id)
  654. .bind(&data)
  655. .execute(&self.pool)
  656. .await
  657. .map_err(|e| StoreError::Internal(e.to_string()))?;
  658. Ok(())
  659. }
  660. async fn list_pending_sagas(&self) -> Result<Vec<(i64, Vec<u8>)>, StoreError> {
  661. let rows = sqlx::query("SELECT id, data FROM sagas")
  662. .fetch_all(&self.pool)
  663. .await
  664. .map_err(|e| StoreError::Internal(e.to_string()))?;
  665. let mut result = Vec::with_capacity(rows.len());
  666. for row in &rows {
  667. let id: i64 = row
  668. .try_get("id")
  669. .map_err(|e| StoreError::Internal(e.to_string()))?;
  670. let data: Vec<u8> = row
  671. .try_get("data")
  672. .map_err(|e| StoreError::Internal(e.to_string()))?;
  673. result.push((id, data));
  674. }
  675. Ok(result)
  676. }
  677. async fn delete_saga(&self, id: &i64) -> Result<(), StoreError> {
  678. sqlx::query("DELETE FROM sagas WHERE id = $1")
  679. .bind(*id)
  680. .execute(&self.pool)
  681. .await
  682. .map_err(|e| StoreError::Internal(e.to_string()))?;
  683. Ok(())
  684. }
  685. }
  686. // ---------------------------------------------------------------------------
  687. // EventStore
  688. // ---------------------------------------------------------------------------
  689. #[async_trait]
  690. impl EventStore for SqlStore {
  691. async fn append_event(&self, event: &LedgerEvent) -> Result<u64, StoreError> {
  692. let kind_str =
  693. serde_json::to_string(&event.kind).map_err(|e| StoreError::Internal(e.to_string()))?;
  694. let data = serialize_blob(event)?;
  695. let seq = self.autoid.next() as u64;
  696. sqlx::query("INSERT INTO events (seq, timestamp, kind, data) VALUES ($1, $2, $3, $4)")
  697. .bind(seq as i64)
  698. .bind(event.timestamp)
  699. .bind(&kind_str)
  700. .bind(&data)
  701. .execute(&self.pool)
  702. .await
  703. .map_err(|e| StoreError::Internal(e.to_string()))?;
  704. Ok(seq)
  705. }
  706. async fn get_events_since(
  707. &self,
  708. after_seq: u64,
  709. limit: u32,
  710. ) -> Result<Vec<LedgerEvent>, StoreError> {
  711. let rows = sqlx::query("SELECT seq, data FROM events WHERE seq > $1 ORDER BY seq LIMIT $2")
  712. .bind(after_seq as i64)
  713. .bind(limit as i32)
  714. .fetch_all(&self.pool)
  715. .await
  716. .map_err(|e| StoreError::Internal(e.to_string()))?;
  717. let mut events = Vec::with_capacity(rows.len());
  718. for row in &rows {
  719. let seq: i64 = row
  720. .try_get("seq")
  721. .map_err(|e| StoreError::Internal(e.to_string()))?;
  722. let data: Vec<u8> = row
  723. .try_get("data")
  724. .map_err(|e| StoreError::Internal(e.to_string()))?;
  725. let mut event: LedgerEvent = deserialize_blob(&data)?;
  726. event.seq = seq as u64;
  727. events.push(event);
  728. }
  729. Ok(events)
  730. }
  731. }