saga.rs 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. #![allow(missing_docs)]
  2. use std::sync::Arc;
  3. use kuatia::ledger::Ledger;
  4. use kuatia::mem_store::InMemoryStore;
  5. use kuatia::saga::*;
  6. use kuatia_core::*;
  7. use legend::{ExecutionResult, legend};
  8. use std::collections::BTreeMap;
  9. fn usd() -> AssetId {
  10. AssetId::new(1)
  11. }
  12. fn account(id: i64) -> AccountId {
  13. AccountId::new(id)
  14. }
  15. fn external() -> AccountId {
  16. AccountId::new(99)
  17. }
  18. fn make_account(id: i64, policy: AccountPolicy) -> Account {
  19. Account {
  20. id: AccountId::new(id),
  21. version: 1,
  22. policy,
  23. flags: AccountFlags::empty(),
  24. book: BookId(0),
  25. user_data: UserData::default(),
  26. metadata: BTreeMap::new(),
  27. }
  28. }
  29. async fn setup_ledger() -> Arc<Ledger> {
  30. let store = InMemoryStore::new();
  31. let ledger = Arc::new(Ledger::new(store));
  32. for (id, policy) in [
  33. (1, AccountPolicy::NoOverdraft),
  34. (2, AccountPolicy::NoOverdraft),
  35. (3, AccountPolicy::NoOverdraft),
  36. (99, AccountPolicy::ExternalAccount),
  37. ] {
  38. ledger
  39. .store()
  40. .create_account(make_account(id, policy))
  41. .await
  42. .unwrap();
  43. }
  44. ledger
  45. }
  46. // Define a two-step saga: deposit then pay
  47. legend! {
  48. FundAndPay<LedgerCtx, SagaError> {
  49. deposit: DepositMovementStep,
  50. pay: PayMovementStep,
  51. }
  52. }
  53. #[tokio::test]
  54. async fn saga_happy_path() {
  55. let ledger = setup_ledger().await;
  56. let saga = FundAndPay::new(FundAndPayInputs {
  57. deposit: DepositInput {
  58. to: account(1),
  59. asset: usd(),
  60. amount: Cent::from(100),
  61. external: external(),
  62. },
  63. pay: PayInput {
  64. from: account(1),
  65. to: account(2),
  66. asset: usd(),
  67. amount: Cent::from(60),
  68. },
  69. });
  70. let ctx = LedgerCtx::new(ledger.clone());
  71. let execution = saga.build(ctx);
  72. match execution.start().await {
  73. ExecutionResult::Completed(e) => {
  74. assert_eq!(e.context().receipts.len(), 2);
  75. }
  76. other => panic!("expected Completed, got {:?}", result_debug(&other)),
  77. }
  78. assert_eq!(
  79. ledger.balance(&account(1), &usd()).await.unwrap(),
  80. Cent::from(40)
  81. );
  82. assert_eq!(
  83. ledger.balance(&account(2), &usd()).await.unwrap(),
  84. Cent::from(60)
  85. );
  86. assert_eq!(
  87. ledger.balance(&external(), &usd()).await.unwrap(),
  88. Cent::from(-100)
  89. );
  90. }
  91. // Define a saga that will fail on the second step and trigger compensation
  92. legend! {
  93. DepositAndOverspend<LedgerCtx, SagaError> {
  94. deposit: DepositMovementStep,
  95. pay: PayMovementStep,
  96. }
  97. }
  98. #[tokio::test]
  99. async fn saga_compensation_on_failure() {
  100. let ledger = setup_ledger().await;
  101. // Deposit 50 then try to pay 100 -> pay fails -> deposit should be reversed
  102. let saga = DepositAndOverspend::new(DepositAndOverspendInputs {
  103. deposit: DepositInput {
  104. to: account(1),
  105. asset: usd(),
  106. amount: Cent::from(50),
  107. external: external(),
  108. },
  109. pay: PayInput {
  110. from: account(1),
  111. to: account(2),
  112. asset: usd(),
  113. amount: Cent::from(100), // more than available
  114. },
  115. });
  116. let ctx = LedgerCtx::new(ledger.clone());
  117. let execution = saga.build(ctx);
  118. match execution.start().await {
  119. ExecutionResult::Failed(_, _err) => {
  120. // The deposit should have been compensated (reversed)
  121. // Note: balances won't be exactly 0 because the deposit reversal
  122. // creates new postings, but the net effect should be zero
  123. assert_eq!(
  124. ledger.balance(&account(1), &usd()).await.unwrap(),
  125. Cent::ZERO
  126. );
  127. assert_eq!(
  128. ledger.balance(&external(), &usd()).await.unwrap(),
  129. Cent::ZERO
  130. );
  131. }
  132. other => panic!("expected Failed, got {:?}", result_debug(&other)),
  133. }
  134. }
  135. // Three-step saga
  136. legend! {
  137. ThreeStepFlow<LedgerCtx, SagaError> {
  138. deposit: DepositMovementStep,
  139. pay_ab: PayMovementStep,
  140. pay_bc: PayMovementStep,
  141. }
  142. }
  143. #[tokio::test]
  144. async fn saga_three_steps_happy() {
  145. let ledger = setup_ledger().await;
  146. let saga = ThreeStepFlow::new(ThreeStepFlowInputs {
  147. deposit: DepositInput {
  148. to: account(1),
  149. asset: usd(),
  150. amount: Cent::from(100),
  151. external: external(),
  152. },
  153. pay_ab: PayInput {
  154. from: account(1),
  155. to: account(2),
  156. asset: usd(),
  157. amount: Cent::from(60),
  158. },
  159. pay_bc: PayInput {
  160. from: account(2),
  161. to: account(3),
  162. asset: usd(),
  163. amount: Cent::from(30),
  164. },
  165. });
  166. let ctx = LedgerCtx::new(ledger.clone());
  167. let execution = saga.build(ctx);
  168. match execution.start().await {
  169. ExecutionResult::Completed(e) => {
  170. assert_eq!(e.context().receipts.len(), 3);
  171. }
  172. other => panic!("expected Completed, got {:?}", result_debug(&other)),
  173. }
  174. assert_eq!(
  175. ledger.balance(&account(1), &usd()).await.unwrap(),
  176. Cent::from(40)
  177. );
  178. assert_eq!(
  179. ledger.balance(&account(2), &usd()).await.unwrap(),
  180. Cent::from(30)
  181. );
  182. assert_eq!(
  183. ledger.balance(&account(3), &usd()).await.unwrap(),
  184. Cent::from(30)
  185. );
  186. }
  187. fn result_debug<Ctx, Err, Steps>(r: &ExecutionResult<Ctx, Err, Steps>) -> &'static str
  188. where
  189. Ctx: Send + Sync,
  190. Err: Send + Sync + Clone,
  191. Steps: legend::hlist::InstructionList<Ctx, Err>,
  192. {
  193. match r {
  194. ExecutionResult::Completed(_) => "Completed",
  195. ExecutionResult::Paused(_) => "Paused",
  196. ExecutionResult::Failed(_, _) => "Failed",
  197. ExecutionResult::CompensationFailed { .. } => "CompensationFailed",
  198. }
  199. }