concurrency.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. //! Concurrency tests for the saga commit pipeline over `InMemoryStore`.
  2. //!
  3. //! `InMemoryStore` guards each field with a `tokio::RwLock`, so every individual
  4. //! `Store` primitive is atomic. A saga, however, is a *sequence* of primitives
  5. //! with no overarching lock, so the interesting races live between primitives
  6. //! across concurrent sagas that share one `Arc<Ledger>`. The generated
  7. //! conformance suite only drives the store sequentially, so none of this is
  8. //! covered there.
  9. //!
  10. //! These tests run on a multi-thread runtime and use `tokio::spawn` so the
  11. //! sagas genuinely interleave rather than run to completion one at a time.
  12. #![allow(missing_docs)]
  13. use std::collections::BTreeMap;
  14. use std::sync::Arc;
  15. use kuatia::ledger::Ledger;
  16. use kuatia::mem_store::InMemoryStore;
  17. use kuatia_core::*;
  18. fn usd() -> AssetId {
  19. AssetId::new(1)
  20. }
  21. fn account(id: i64) -> AccountId {
  22. AccountId::new(id)
  23. }
  24. fn external() -> AccountId {
  25. AccountId::new(99)
  26. }
  27. fn make_account(id: i64, policy: AccountPolicy) -> Account {
  28. Account {
  29. id: AccountId::new(id),
  30. version: 1,
  31. policy,
  32. flags: AccountFlags::empty(),
  33. book: BookId(0),
  34. user_data: UserData::default(),
  35. metadata: BTreeMap::new(),
  36. }
  37. }
  38. /// A ledger with `NoOverdraft` accounts `1..=n` plus an external account.
  39. async fn ledger_with_accounts(n: i64) -> Arc<Ledger> {
  40. let ledger = Arc::new(Ledger::new(InMemoryStore::new()));
  41. for id in 1..=n {
  42. ledger
  43. .store()
  44. .create_account(make_account(id, AccountPolicy::NoOverdraft))
  45. .await
  46. .unwrap();
  47. }
  48. ledger
  49. .store()
  50. .create_account(make_account(99, AccountPolicy::ExternalAccount))
  51. .await
  52. .unwrap();
  53. ledger
  54. }
  55. async fn deposit(ledger: &Arc<Ledger>, to: AccountId, amount: Cent) {
  56. let transfer = TransferBuilder::new()
  57. .deposit(to, usd(), amount, external())
  58. .unwrap()
  59. .build();
  60. ledger.commit(transfer).await.unwrap();
  61. }
  62. // ---------------------------------------------------------------------------
  63. // 1. Double-spend prevention (the headline invariant)
  64. // ---------------------------------------------------------------------------
  65. /// Many transfers concurrently try to spend the *same* funded posting to
  66. /// different recipients. Exactly one may win: the winner's `reserve_postings`
  67. /// flips the single Active posting to `PendingInactive`, and every other saga's
  68. /// reserve returns zero for a fresh reservation, so it fails and compensates.
  69. /// The ledger stays conserved: the payer ends at zero and exactly one recipient
  70. /// receives the full amount.
  71. #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
  72. async fn concurrent_double_spend_has_one_winner() {
  73. const RECIPIENTS: i64 = 8;
  74. let ledger = ledger_with_accounts(1 + RECIPIENTS).await;
  75. // Account 1 holds a single Active posting of 100.
  76. deposit(&ledger, account(1), Cent::from(100)).await;
  77. // Fire one full-balance payment per recipient, all at once.
  78. let mut handles = Vec::new();
  79. for recipient in 2..=(1 + RECIPIENTS) {
  80. let ledger = Arc::clone(&ledger);
  81. handles.push(tokio::spawn(async move {
  82. let transfer = TransferBuilder::new()
  83. .pay(account(1), account(recipient), usd(), Cent::from(100))
  84. .build();
  85. ledger.commit(transfer).await
  86. }));
  87. }
  88. let mut winners = 0;
  89. for h in handles {
  90. if h.await.unwrap().is_ok() {
  91. winners += 1;
  92. }
  93. }
  94. assert_eq!(winners, 1, "exactly one concurrent spend may succeed");
  95. // Conservation: payer drained, exactly one recipient credited, total = 100.
  96. assert_eq!(
  97. ledger.balance(&account(1), &usd()).await.unwrap(),
  98. Cent::ZERO
  99. );
  100. let mut credited = 0;
  101. let mut total = Cent::ZERO;
  102. for recipient in 2..=(1 + RECIPIENTS) {
  103. let bal = ledger.balance(&account(recipient), &usd()).await.unwrap();
  104. if bal != Cent::ZERO {
  105. credited += 1;
  106. assert_eq!(bal, Cent::from(100));
  107. }
  108. total = total.checked_add(bal).unwrap();
  109. }
  110. assert_eq!(credited, 1, "exactly one recipient is credited");
  111. assert_eq!(total, Cent::from(100), "value is conserved");
  112. }
  113. // ---------------------------------------------------------------------------
  114. // 2. Idempotency
  115. // ---------------------------------------------------------------------------
  116. /// Re-committing an already-committed envelope returns the same receipt and does
  117. /// not move value a second time. This is the sequential idempotency contract
  118. /// that `commit_envelope` guarantees via its content-addressed short-circuit.
  119. #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
  120. async fn recommit_same_envelope_is_idempotent() {
  121. let ledger = ledger_with_accounts(2).await;
  122. deposit(&ledger, account(1), Cent::from(100)).await;
  123. let transfer = TransferBuilder::new()
  124. .pay(account(1), account(2), usd(), Cent::from(50))
  125. .build();
  126. let envelope = ledger.resolve(&transfer).await.unwrap();
  127. let first = ledger.commit_envelope(envelope.clone()).await.unwrap();
  128. let second = ledger.commit_envelope(envelope).await.unwrap();
  129. assert_eq!(first, second, "replay returns the original receipt");
  130. assert_eq!(
  131. ledger.balance(&account(1), &usd()).await.unwrap(),
  132. Cent::from(50)
  133. );
  134. assert_eq!(
  135. ledger.balance(&account(2), &usd()).await.unwrap(),
  136. Cent::from(50)
  137. );
  138. }
  139. /// The same envelope committed concurrently from many tasks. Because the
  140. /// content-addressed id is the idempotency key, value moves exactly once no
  141. /// matter how the sagas interleave: some tasks win or observe the stored
  142. /// transfer and return its receipt; the rest lose the reservation race and
  143. /// fail. Every successful receipt is identical, and the balances move once.
  144. #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
  145. async fn concurrent_identical_commits_move_value_once() {
  146. const TASKS: usize = 8;
  147. let ledger = ledger_with_accounts(2).await;
  148. deposit(&ledger, account(1), Cent::from(100)).await;
  149. let transfer = TransferBuilder::new()
  150. .pay(account(1), account(2), usd(), Cent::from(50))
  151. .build();
  152. let envelope = ledger.resolve(&transfer).await.unwrap();
  153. let mut handles = Vec::new();
  154. for _ in 0..TASKS {
  155. let ledger = Arc::clone(&ledger);
  156. let envelope = envelope.clone();
  157. handles.push(tokio::spawn(async move {
  158. ledger.commit_envelope(envelope).await
  159. }));
  160. }
  161. let mut receipts = Vec::new();
  162. for h in handles {
  163. if let Ok(receipt) = h.await.unwrap() {
  164. receipts.push(receipt);
  165. }
  166. }
  167. assert!(!receipts.is_empty(), "at least one commit succeeds");
  168. let first = &receipts[0];
  169. assert!(
  170. receipts.iter().all(|r| r == first),
  171. "every successful commit returns the same receipt"
  172. );
  173. // Value moved exactly once, and exactly one transfer is stored.
  174. assert_eq!(
  175. ledger.balance(&account(1), &usd()).await.unwrap(),
  176. Cent::from(50)
  177. );
  178. assert_eq!(
  179. ledger.balance(&account(2), &usd()).await.unwrap(),
  180. Cent::from(50)
  181. );
  182. assert!(
  183. ledger
  184. .store()
  185. .get_transfer(&first.transfer_id)
  186. .await
  187. .unwrap()
  188. .is_some(),
  189. "the committed transfer is persisted"
  190. );
  191. }
  192. // ---------------------------------------------------------------------------
  193. // 3. Freeze vs. commit race
  194. // ---------------------------------------------------------------------------
  195. /// Freezing an account concurrently with a payment out of it must leave a
  196. /// consistent state. The account is versioned and the commit pins the snapshot
  197. /// it validated against, so the two serialize one way or the other: either the
  198. /// payment finalizes first (against the unfrozen snapshot) and the freeze lands
  199. /// on top, or the freeze bumps the version first and the commit's last-step
  200. /// re-validation rejects the now-frozen account. There is no middle ground where
  201. /// value moves out of a frozen account against a stale snapshot. Value is always
  202. /// conserved and the payment is all-or-nothing.
  203. #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
  204. async fn freeze_during_commit_stays_consistent() {
  205. // Race is timing-dependent; run several fresh rounds to sample interleavings.
  206. for _ in 0..24 {
  207. let ledger = ledger_with_accounts(2).await;
  208. deposit(&ledger, account(1), Cent::from(100)).await;
  209. let freezer = {
  210. let ledger = Arc::clone(&ledger);
  211. tokio::spawn(async move { ledger.freeze(&account(1)).await })
  212. };
  213. let payer = {
  214. let ledger = Arc::clone(&ledger);
  215. tokio::spawn(async move {
  216. let transfer = TransferBuilder::new()
  217. .pay(account(1), account(2), usd(), Cent::from(50))
  218. .build();
  219. ledger.commit(transfer).await
  220. })
  221. };
  222. freezer.await.unwrap().expect("freeze always succeeds");
  223. let paid = payer.await.unwrap().is_ok();
  224. let b1 = ledger.balance(&account(1), &usd()).await.unwrap();
  225. let b2 = ledger.balance(&account(2), &usd()).await.unwrap();
  226. // Conservation and all-or-nothing, keyed on whether the pay committed.
  227. assert_eq!(
  228. b1.checked_add(b2).unwrap(),
  229. Cent::from(100),
  230. "value is conserved regardless of who won"
  231. );
  232. if paid {
  233. assert_eq!(b1, Cent::from(50));
  234. assert_eq!(b2, Cent::from(50));
  235. } else {
  236. assert_eq!(b1, Cent::from(100));
  237. assert_eq!(b2, Cent::ZERO);
  238. }
  239. // The account is frozen either way; no further payment may leave it.
  240. assert!(ledger.get_account(&account(1)).await.unwrap().is_frozen());
  241. let after = TransferBuilder::new()
  242. .pay(account(1), account(2), usd(), Cent::from(10))
  243. .build();
  244. assert!(
  245. ledger.commit(after).await.is_err(),
  246. "a frozen account cannot pay"
  247. );
  248. }
  249. }
  250. // ---------------------------------------------------------------------------
  251. // 4. Disjoint transfers all commit and conserve
  252. // ---------------------------------------------------------------------------
  253. /// Concurrent transfers over non-overlapping accounts never contend, so all of
  254. /// them commit and total value is conserved. This is the throughput counterpart
  255. /// to the double-spend test: parallelism is only constrained where postings are
  256. /// actually shared.
  257. #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
  258. async fn disjoint_transfers_all_commit_and_conserve() {
  259. const PAIRS: i64 = 8;
  260. // Accounts 1..=2*PAIRS: odd = payer (funded), even = payee.
  261. let ledger = ledger_with_accounts(2 * PAIRS).await;
  262. for k in 0..PAIRS {
  263. deposit(&ledger, account(2 * k + 1), Cent::from(100)).await;
  264. }
  265. let mut handles = Vec::new();
  266. for k in 0..PAIRS {
  267. let ledger = Arc::clone(&ledger);
  268. handles.push(tokio::spawn(async move {
  269. let transfer = TransferBuilder::new()
  270. .pay(
  271. account(2 * k + 1),
  272. account(2 * k + 2),
  273. usd(),
  274. Cent::from(100),
  275. )
  276. .build();
  277. ledger.commit(transfer).await
  278. }));
  279. }
  280. for h in handles {
  281. h.await.unwrap().expect("disjoint transfers never contend");
  282. }
  283. let mut total = Cent::ZERO;
  284. for id in 1..=(2 * PAIRS) {
  285. let bal = ledger.balance(&account(id), &usd()).await.unwrap();
  286. let expected = if id % 2 == 0 {
  287. Cent::from(100)
  288. } else {
  289. Cent::ZERO
  290. };
  291. assert_eq!(bal, expected, "account {id} settled");
  292. total = total.checked_add(bal).unwrap();
  293. }
  294. assert_eq!(total, Cent::from(100 * PAIRS), "value is conserved");
  295. }
  296. // ---------------------------------------------------------------------------
  297. // 5. Overdraft floor is best-effort under concurrency (documented limitation)
  298. // ---------------------------------------------------------------------------
  299. /// Documents a known, accepted limitation: the `CappedOverdraft` floor is
  300. /// re-checked at the last step before writing, but that check is not atomic
  301. /// with the write. Two overdrafts that each pass the floor check against the
  302. /// same pre-transfer balance can both commit and jointly push the account below
  303. /// its floor. See `doc/transfers.md`.
  304. ///
  305. /// This test is `#[ignore]`d because the breach is timing-dependent, so it is
  306. /// executable documentation rather than a CI assertion. What always holds, and
  307. /// what it does assert, is per-asset conservation: the overdraft's negative
  308. /// postings are real value owed, never minted. If a run drives the account below
  309. /// the floor, that is the documented behavior, not a conservation failure.
  310. #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
  311. #[ignore = "documents the best-effort overdraft floor; breach is timing-dependent"]
  312. async fn overdraft_floor_is_best_effort_under_concurrency() {
  313. let floor = Cent::from(-100);
  314. let mut observed_breach = false;
  315. const PAYEES: i64 = 8;
  316. for _ in 0..64 {
  317. let ledger = Arc::new(Ledger::new(InMemoryStore::new()));
  318. ledger
  319. .store()
  320. .create_account(make_account(1, AccountPolicy::CappedOverdraft { floor }))
  321. .await
  322. .unwrap();
  323. for payee in 2..=(1 + PAYEES) {
  324. ledger
  325. .store()
  326. .create_account(make_account(payee, AccountPolicy::NoOverdraft))
  327. .await
  328. .unwrap();
  329. }
  330. // One payment of 60 to each distinct payee from an empty overdraft
  331. // account (distinct payees keep the envelopes distinct, so they are not
  332. // collapsed by content-addressed idempotency). Each alone projects to
  333. // -60 (within the -100 floor); any two that slip through the last-step
  334. // floor check together already breach it.
  335. let mut handles = Vec::new();
  336. for payee in 2..=(1 + PAYEES) {
  337. let ledger = Arc::clone(&ledger);
  338. handles.push(tokio::spawn(async move {
  339. let transfer = TransferBuilder::new()
  340. .pay(account(1), account(payee), usd(), Cent::from(60))
  341. .build();
  342. ledger.commit(transfer).await
  343. }));
  344. }
  345. for h in handles {
  346. let _ = h.await.unwrap();
  347. }
  348. let mut total = ledger.balance(&account(1), &usd()).await.unwrap();
  349. for payee in 2..=(1 + PAYEES) {
  350. total = total
  351. .checked_add(ledger.balance(&account(payee), &usd()).await.unwrap())
  352. .unwrap();
  353. }
  354. assert_eq!(
  355. total,
  356. Cent::ZERO,
  357. "value is conserved even when the floor is breached"
  358. );
  359. if ledger.balance(&account(1), &usd()).await.unwrap() < floor {
  360. observed_breach = true;
  361. }
  362. }
  363. eprintln!(
  364. "overdraft floor breach observed under concurrency: {observed_breach} \
  365. (best-effort by design; see doc/transfers.md)"
  366. );
  367. }