// saga.rs
  1. #![allow(missing_docs)]
  2. use std::sync::Arc;
  3. use kuatia::ledger::Ledger;
  4. use kuatia::mem_store::InMemoryStore;
  5. use kuatia::saga::*;
  6. use kuatia_core::*;
  7. use legend::{ExecutionResult, legend};
  8. use std::collections::BTreeMap;
  9. fn usd() -> AssetId {
  10. AssetId::new(1)
  11. }
  12. fn account(id: i64) -> AccountId {
  13. AccountId::new(id)
  14. }
  15. fn external() -> AccountId {
  16. AccountId::new(99)
  17. }
  18. fn make_account(id: i64, policy: AccountPolicy) -> Account {
  19. Account {
  20. id: AccountId::new(id),
  21. version: 1,
  22. policy,
  23. flags: AccountFlags::empty(),
  24. book: 0,
  25. code: 0,
  26. user_data: UserData::default(),
  27. metadata: BTreeMap::new(),
  28. }
  29. }
  30. async fn setup_ledger() -> Arc<Ledger> {
  31. let store = InMemoryStore::new();
  32. let ledger = Arc::new(Ledger::new(store));
  33. for (id, policy) in [
  34. (1, AccountPolicy::NoOverdraft),
  35. (2, AccountPolicy::NoOverdraft),
  36. (3, AccountPolicy::NoOverdraft),
  37. (99, AccountPolicy::ExternalAccount),
  38. ] {
  39. ledger
  40. .store()
  41. .create_account(make_account(id, policy))
  42. .await
  43. .unwrap();
  44. }
  45. ledger
  46. }
  47. // Define a two-step saga: deposit then pay
  48. legend! {
  49. FundAndPay<LedgerCtx, SagaError> {
  50. deposit: DepositMovementStep,
  51. pay: PayMovementStep,
  52. }
  53. }
  54. #[tokio::test]
  55. async fn saga_happy_path() {
  56. let ledger = setup_ledger().await;
  57. let saga = FundAndPay::new(FundAndPayInputs {
  58. deposit: DepositInput {
  59. to: account(1),
  60. asset: usd(),
  61. amount: Cent::from(100),
  62. external: external(),
  63. },
  64. pay: PayInput {
  65. from: account(1),
  66. to: account(2),
  67. asset: usd(),
  68. amount: Cent::from(60),
  69. },
  70. });
  71. let ctx = LedgerCtx::new(ledger.clone());
  72. let execution = saga.build(ctx);
  73. match execution.start().await {
  74. ExecutionResult::Completed(e) => {
  75. assert_eq!(e.context().receipts.len(), 2);
  76. }
  77. other => panic!("expected Completed, got {:?}", result_debug(&other)),
  78. }
  79. assert_eq!(
  80. ledger.balance(&account(1), &usd()).await.unwrap(),
  81. Cent::from(40)
  82. );
  83. assert_eq!(
  84. ledger.balance(&account(2), &usd()).await.unwrap(),
  85. Cent::from(60)
  86. );
  87. assert_eq!(
  88. ledger.balance(&external(), &usd()).await.unwrap(),
  89. Cent::from(-100)
  90. );
  91. }
  92. // Define a saga that will fail on the second step and trigger compensation
  93. legend! {
  94. DepositAndOverspend<LedgerCtx, SagaError> {
  95. deposit: DepositMovementStep,
  96. pay: PayMovementStep,
  97. }
  98. }
  99. #[tokio::test]
  100. async fn saga_compensation_on_failure() {
  101. let ledger = setup_ledger().await;
  102. // Deposit 50 then try to pay 100 -> pay fails -> deposit should be reversed
  103. let saga = DepositAndOverspend::new(DepositAndOverspendInputs {
  104. deposit: DepositInput {
  105. to: account(1),
  106. asset: usd(),
  107. amount: Cent::from(50),
  108. external: external(),
  109. },
  110. pay: PayInput {
  111. from: account(1),
  112. to: account(2),
  113. asset: usd(),
  114. amount: Cent::from(100), // more than available
  115. },
  116. });
  117. let ctx = LedgerCtx::new(ledger.clone());
  118. let execution = saga.build(ctx);
  119. match execution.start().await {
  120. ExecutionResult::Failed(_, _err) => {
  121. // The deposit should have been compensated (reversed)
  122. // Note: balances won't be exactly 0 because the deposit reversal
  123. // creates new postings, but the net effect should be zero
  124. assert_eq!(
  125. ledger.balance(&account(1), &usd()).await.unwrap(),
  126. Cent::ZERO
  127. );
  128. assert_eq!(
  129. ledger.balance(&external(), &usd()).await.unwrap(),
  130. Cent::ZERO
  131. );
  132. }
  133. other => panic!("expected Failed, got {:?}", result_debug(&other)),
  134. }
  135. }
  136. // Three-step saga
  137. legend! {
  138. ThreeStepFlow<LedgerCtx, SagaError> {
  139. deposit: DepositMovementStep,
  140. pay_ab: PayMovementStep,
  141. pay_bc: PayMovementStep,
  142. }
  143. }
  144. #[tokio::test]
  145. async fn saga_three_steps_happy() {
  146. let ledger = setup_ledger().await;
  147. let saga = ThreeStepFlow::new(ThreeStepFlowInputs {
  148. deposit: DepositInput {
  149. to: account(1),
  150. asset: usd(),
  151. amount: Cent::from(100),
  152. external: external(),
  153. },
  154. pay_ab: PayInput {
  155. from: account(1),
  156. to: account(2),
  157. asset: usd(),
  158. amount: Cent::from(60),
  159. },
  160. pay_bc: PayInput {
  161. from: account(2),
  162. to: account(3),
  163. asset: usd(),
  164. amount: Cent::from(30),
  165. },
  166. });
  167. let ctx = LedgerCtx::new(ledger.clone());
  168. let execution = saga.build(ctx);
  169. match execution.start().await {
  170. ExecutionResult::Completed(e) => {
  171. assert_eq!(e.context().receipts.len(), 3);
  172. }
  173. other => panic!("expected Completed, got {:?}", result_debug(&other)),
  174. }
  175. assert_eq!(
  176. ledger.balance(&account(1), &usd()).await.unwrap(),
  177. Cent::from(40)
  178. );
  179. assert_eq!(
  180. ledger.balance(&account(2), &usd()).await.unwrap(),
  181. Cent::from(30)
  182. );
  183. assert_eq!(
  184. ledger.balance(&account(3), &usd()).await.unwrap(),
  185. Cent::from(30)
  186. );
  187. }
  188. fn result_debug<Ctx, Err, Steps>(r: &ExecutionResult<Ctx, Err, Steps>) -> &'static str
  189. where
  190. Ctx: Send + Sync,
  191. Err: Send + Sync + Clone,
  192. Steps: legend::hlist::InstructionList<Ctx, Err>,
  193. {
  194. match r {
  195. ExecutionResult::Completed(_) => "Completed",
  196. ExecutionResult::Paused(_) => "Paused",
  197. ExecutionResult::Failed(_, _) => "Failed",
  198. ExecutionResult::CompensationFailed { .. } => "CompensationFailed",
  199. }
  200. }