| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- #![allow(missing_docs)]
- use std::sync::Arc;
- use kuatia::ledger::Ledger;
- use kuatia::mem_store::InMemoryStore;
- use kuatia::saga::*;
- use kuatia_core::*;
- use legend::{ExecutionResult, legend};
- use std::collections::BTreeMap;
- fn usd() -> AssetId {
- AssetId::new(1)
- }
- fn account(id: i64) -> AccountId {
- AccountId::new(id)
- }
- fn external() -> AccountId {
- AccountId::new(99)
- }
- fn make_account(id: i64, policy: AccountPolicy) -> Account {
- Account {
- id: AccountId::new(id),
- version: 1,
- policy,
- flags: AccountFlags::empty(),
- book: 0,
- code: 0,
- user_data: UserData::default(),
- metadata: BTreeMap::new(),
- }
- }
- async fn setup_ledger() -> Arc<Ledger> {
- let store = InMemoryStore::new();
- let ledger = Arc::new(Ledger::new(store));
- for (id, policy) in [
- (1, AccountPolicy::NoOverdraft),
- (2, AccountPolicy::NoOverdraft),
- (3, AccountPolicy::NoOverdraft),
- (99, AccountPolicy::ExternalAccount),
- ] {
- ledger
- .store()
- .create_account(make_account(id, policy))
- .await
- .unwrap();
- }
- ledger
- }
- // Define a two-step saga: deposit then pay
- legend! {
- FundAndPay<LedgerCtx, SagaError> {
- deposit: DepositMovementStep,
- pay: PayMovementStep,
- }
- }
- #[tokio::test]
- async fn saga_happy_path() {
- let ledger = setup_ledger().await;
- let saga = FundAndPay::new(FundAndPayInputs {
- deposit: DepositInput {
- to: account(1),
- asset: usd(),
- amount: Cent::from(100),
- external: external(),
- },
- pay: PayInput {
- from: account(1),
- to: account(2),
- asset: usd(),
- amount: Cent::from(60),
- },
- });
- let ctx = LedgerCtx::new(ledger.clone());
- let execution = saga.build(ctx);
- match execution.start().await {
- ExecutionResult::Completed(e) => {
- assert_eq!(e.context().receipts.len(), 2);
- }
- other => panic!("expected Completed, got {:?}", result_debug(&other)),
- }
- assert_eq!(
- ledger.balance(&account(1), &usd()).await.unwrap(),
- Cent::from(40)
- );
- assert_eq!(
- ledger.balance(&account(2), &usd()).await.unwrap(),
- Cent::from(60)
- );
- assert_eq!(
- ledger.balance(&external(), &usd()).await.unwrap(),
- Cent::from(-100)
- );
- }
- // Define a saga that will fail on the second step and trigger compensation
- legend! {
- DepositAndOverspend<LedgerCtx, SagaError> {
- deposit: DepositMovementStep,
- pay: PayMovementStep,
- }
- }
- #[tokio::test]
- async fn saga_compensation_on_failure() {
- let ledger = setup_ledger().await;
- // Deposit 50 then try to pay 100 -> pay fails -> deposit should be reversed
- let saga = DepositAndOverspend::new(DepositAndOverspendInputs {
- deposit: DepositInput {
- to: account(1),
- asset: usd(),
- amount: Cent::from(50),
- external: external(),
- },
- pay: PayInput {
- from: account(1),
- to: account(2),
- asset: usd(),
- amount: Cent::from(100), // more than available
- },
- });
- let ctx = LedgerCtx::new(ledger.clone());
- let execution = saga.build(ctx);
- match execution.start().await {
- ExecutionResult::Failed(_, _err) => {
- // The deposit should have been compensated (reversed)
- // Note: balances won't be exactly 0 because the deposit reversal
- // creates new postings, but the net effect should be zero
- assert_eq!(
- ledger.balance(&account(1), &usd()).await.unwrap(),
- Cent::ZERO
- );
- assert_eq!(
- ledger.balance(&external(), &usd()).await.unwrap(),
- Cent::ZERO
- );
- }
- other => panic!("expected Failed, got {:?}", result_debug(&other)),
- }
- }
- // Three-step saga
- legend! {
- ThreeStepFlow<LedgerCtx, SagaError> {
- deposit: DepositMovementStep,
- pay_ab: PayMovementStep,
- pay_bc: PayMovementStep,
- }
- }
- #[tokio::test]
- async fn saga_three_steps_happy() {
- let ledger = setup_ledger().await;
- let saga = ThreeStepFlow::new(ThreeStepFlowInputs {
- deposit: DepositInput {
- to: account(1),
- asset: usd(),
- amount: Cent::from(100),
- external: external(),
- },
- pay_ab: PayInput {
- from: account(1),
- to: account(2),
- asset: usd(),
- amount: Cent::from(60),
- },
- pay_bc: PayInput {
- from: account(2),
- to: account(3),
- asset: usd(),
- amount: Cent::from(30),
- },
- });
- let ctx = LedgerCtx::new(ledger.clone());
- let execution = saga.build(ctx);
- match execution.start().await {
- ExecutionResult::Completed(e) => {
- assert_eq!(e.context().receipts.len(), 3);
- }
- other => panic!("expected Completed, got {:?}", result_debug(&other)),
- }
- assert_eq!(
- ledger.balance(&account(1), &usd()).await.unwrap(),
- Cent::from(40)
- );
- assert_eq!(
- ledger.balance(&account(2), &usd()).await.unwrap(),
- Cent::from(30)
- );
- assert_eq!(
- ledger.balance(&account(3), &usd()).await.unwrap(),
- Cent::from(30)
- );
- }
- fn result_debug<Ctx, Err, Steps>(r: &ExecutionResult<Ctx, Err, Steps>) -> &'static str
- where
- Ctx: Send + Sync,
- Err: Send + Sync + Clone,
- Steps: legend::hlist::InstructionList<Ctx, Err>,
- {
- match r {
- ExecutionResult::Completed(_) => "Completed",
- ExecutionResult::Paused(_) => "Paused",
- ExecutionResult::Failed(_, _) => "Failed",
- ExecutionResult::CompensationFailed { .. } => "CompensationFailed",
- }
- }
|