|
@@ -23,7 +23,6 @@ pub(crate) fn now_millis() -> Result<i64, LedgerError> {
|
|
|
}
|
|
}
|
|
|
use crate::saga::{
|
|
use crate::saga::{
|
|
|
FinalizeInput, FinalizeTransferStep, LedgerCtx, ReserveInput, ReservePostingsStep, SagaError,
|
|
FinalizeInput, FinalizeTransferStep, LedgerCtx, ReserveInput, ReservePostingsStep, SagaError,
|
|
|
- ValidateInput, ValidateTransferStep,
|
|
|
|
|
};
|
|
};
|
|
|
use kuatia_storage::error::StoreError;
|
|
use kuatia_storage::error::StoreError;
|
|
|
use kuatia_storage::events::{LedgerEvent, LedgerEventKind};
|
|
use kuatia_storage::events::{LedgerEvent, LedgerEventKind};
|
|
@@ -35,20 +34,33 @@ mod envelope_saga {
|
|
|
legend! {
|
|
legend! {
|
|
|
EnvelopeSaga<LedgerCtx, SagaError> {
|
|
EnvelopeSaga<LedgerCtx, SagaError> {
|
|
|
reserve: ReservePostingsStep,
|
|
reserve: ReservePostingsStep,
|
|
|
- validate: ValidateTransferStep,
|
|
|
|
|
finalize: FinalizeTransferStep,
|
|
finalize: FinalizeTransferStep,
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
use envelope_saga::*;
|
|
use envelope_saga::*;
|
|
|
|
|
|
|
|
|
|
+/// Phase of an in-flight commit, persisted with the write-ahead record so
|
|
|
|
|
+/// recovery knows whether validation has completed.
|
|
|
|
|
+#[derive(Clone, Copy, PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)]
|
|
|
|
|
+enum SagaPhase {
|
|
|
|
|
+ /// Saved before reserve. Validation has not necessarily run, so recovery must
|
|
|
|
|
+ /// re-reserve and re-validate before it can commit.
|
|
|
|
|
+ Reserving,
|
|
|
|
|
+ /// Saved at the start of finalize — after validation passed and just before
|
|
|
|
|
+ /// the consumed postings begin turning `Inactive` (the point of no return).
|
|
|
|
|
+ /// Recovery rolls forward without re-validating.
|
|
|
|
|
+ Finalizing,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
/// Write-ahead record for an in-flight commit, persisted via `SagaStore` before
|
|
/// Write-ahead record for an in-flight commit, persisted via `SagaStore` before
|
|
|
/// the saga mutates anything and removed once it reaches a terminal state. On
|
|
/// the saga mutates anything and removed once it reaches a terminal state. On
|
|
|
-/// startup [`Ledger::recover`] re-drives any that survive a crash.
|
|
|
|
|
|
|
+/// startup [`Ledger::recover`] completes any that survive a crash.
|
|
|
#[derive(serde::Serialize, serde::Deserialize)]
|
|
#[derive(serde::Serialize, serde::Deserialize)]
|
|
|
struct PendingSaga {
|
|
struct PendingSaga {
|
|
|
envelope: Envelope,
|
|
envelope: Envelope,
|
|
|
reservation: kuatia_core::ReservationId,
|
|
reservation: kuatia_core::ReservationId,
|
|
|
|
|
+ phase: SagaPhase,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/// Async ledger resource composing the commit pipeline.
|
|
/// Async ledger resource composing the commit pipeline.
|
|
@@ -250,7 +262,7 @@ impl Ledger {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// -----------------------------------------------------------------------
|
|
// -----------------------------------------------------------------------
|
|
|
- // Commit: every commit is the envelope saga (reserve -> validate -> finalize)
|
|
|
|
|
|
|
+ // Commit: every commit is the envelope saga (reserve -> finalize; finalize re-validates)
|
|
|
// -----------------------------------------------------------------------
|
|
// -----------------------------------------------------------------------
|
|
|
|
|
|
|
|
/// Commit a [`Transfer`] intent. Resolves it into a concrete envelope, then
|
|
/// Commit a [`Transfer`] intent. Resolves it into a concrete envelope, then
|
|
@@ -288,25 +300,32 @@ impl Ledger {
|
|
|
return Ok(record.receipt);
|
|
return Ok(record.receipt);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Write-ahead: persist {envelope, reservation} so recovery can re-drive.
|
|
|
|
|
|
|
+ // Write-ahead: persist {envelope, reservation, phase=Reserving} before any
|
|
|
|
|
+ // mutation. The finalize step bumps the phase to Finalizing.
|
|
|
let reservation = kuatia_core::ReservationId::default();
|
|
let reservation = kuatia_core::ReservationId::default();
|
|
|
let saga_id = reservation.0;
|
|
let saga_id = reservation.0;
|
|
|
- let blob = serde_json::to_vec(&PendingSaga {
|
|
|
|
|
- envelope: envelope.clone(),
|
|
|
|
|
- reservation,
|
|
|
|
|
- })
|
|
|
|
|
- .map_err(|e| LedgerError::Store(StoreError::Internal(e.to_string())))?;
|
|
|
|
|
- self.store.save_saga(&saga_id, blob).await?;
|
|
|
|
|
|
|
+ self.save_pending(&envelope, reservation, SagaPhase::Reserving)
|
|
|
|
|
+ .await?;
|
|
|
|
|
|
|
|
let result = self.drive_envelope_saga(envelope, reservation).await;
|
|
let result = self.drive_envelope_saga(envelope, reservation).await;
|
|
|
|
|
|
|
|
- // Terminal: drop the pending record whether we committed or compensated.
|
|
|
|
|
- self.store.delete_saga(&saga_id).await?;
|
|
|
|
|
|
|
+ // Delete the pending record only when it is safe: on success, or on a
|
|
|
|
|
+ // failure that never reached finalize (phase still Reserving → the saga's
|
|
|
|
|
+ // compensation released our reservation, nothing of ours was applied). If
|
|
|
|
|
+ // finalize started (Finalizing) and failed, keep it so `recover()` rolls
|
|
|
|
|
+ // the half-applied commit forward.
|
|
|
|
|
+ let safe_to_delete = match &result {
|
|
|
|
|
+ Ok(_) => true,
|
|
|
|
|
+ Err(_) => self.read_pending_phase(saga_id).await? != Some(SagaPhase::Finalizing),
|
|
|
|
|
+ };
|
|
|
|
|
+ if safe_to_delete {
|
|
|
|
|
+ self.store.delete_saga(&saga_id).await?;
|
|
|
|
|
+ }
|
|
|
result
|
|
result
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- /// Build and run the envelope saga to a terminal outcome, returning the
|
|
|
|
|
- /// resulting receipt.
|
|
|
|
|
|
|
+ /// Build and run the envelope saga (reserve → finalize) to a terminal
|
|
|
|
|
+ /// outcome, returning the resulting receipt.
|
|
|
async fn drive_envelope_saga(
|
|
async fn drive_envelope_saga(
|
|
|
self: &Arc<Self>,
|
|
self: &Arc<Self>,
|
|
|
envelope: Envelope,
|
|
envelope: Envelope,
|
|
@@ -314,7 +333,6 @@ impl Ledger {
|
|
|
) -> Result<Receipt, LedgerError> {
|
|
) -> Result<Receipt, LedgerError> {
|
|
|
let saga = EnvelopeSaga::new(EnvelopeSagaInputs {
|
|
let saga = EnvelopeSaga::new(EnvelopeSagaInputs {
|
|
|
reserve: ReserveInput,
|
|
reserve: ReserveInput,
|
|
|
- validate: ValidateInput,
|
|
|
|
|
finalize: FinalizeInput,
|
|
finalize: FinalizeInput,
|
|
|
});
|
|
});
|
|
|
let ctx = LedgerCtx::for_envelope(Arc::clone(self), envelope, reservation);
|
|
let ctx = LedgerCtx::for_envelope(Arc::clone(self), envelope, reservation);
|
|
@@ -350,14 +368,16 @@ impl Ledger {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- /// Re-drive every pending saga to completion. Call on startup to recover
|
|
|
|
|
- /// commits interrupted by a crash, returning how many were processed.
|
|
|
|
|
|
|
+ /// Complete every pending saga left by a crash. Call on startup; returns how
|
|
|
|
|
+ /// many were processed.
|
|
|
///
|
|
///
|
|
|
- /// Recovery does **not** re-run reserve/validate — those reject already-
|
|
|
|
|
- /// consumed postings, and the envelope was already validated when first
|
|
|
|
|
- /// committed. Instead it force-completes the envelope through the idempotent
|
|
|
|
|
- /// primitives with the original reservation, so a crash at any point
|
|
|
|
|
- /// (pre-reserve, reserved, or mid-finalize) converges to the committed state.
|
|
|
|
|
|
|
+ /// Recovery branches on the persisted phase. A `Reserving` saga had not
|
|
|
|
|
+ /// necessarily validated, so it is re-run through the real saga (which
|
|
|
|
|
+ /// re-reserves and **re-validates** — aborting cleanly if the postings were
|
|
|
|
|
+ /// taken or an account was frozen meanwhile). A `Finalizing` saga had already
|
|
|
|
|
+ /// validated and owns its postings, so it is rolled forward through the
|
|
|
|
|
+ /// verified `finalize_envelope`. Either way the record is removed only once
|
|
|
|
|
+ /// the work is committed or safely abandoned.
|
|
|
#[instrument(skip(self), name = "ledger.recover")]
|
|
#[instrument(skip(self), name = "ledger.recover")]
|
|
|
pub async fn recover(self: &Arc<Self>) -> Result<usize, LedgerError> {
|
|
pub async fn recover(self: &Arc<Self>) -> Result<usize, LedgerError> {
|
|
|
let pending = self.store.list_pending_sagas().await?;
|
|
let pending = self.store.list_pending_sagas().await?;
|
|
@@ -366,35 +386,101 @@ impl Ledger {
|
|
|
let PendingSaga {
|
|
let PendingSaga {
|
|
|
envelope,
|
|
envelope,
|
|
|
reservation,
|
|
reservation,
|
|
|
|
|
+ phase,
|
|
|
} = serde_json::from_slice(&blob)
|
|
} = serde_json::from_slice(&blob)
|
|
|
.map_err(|e| LedgerError::Store(StoreError::Internal(e.to_string())))?;
|
|
.map_err(|e| LedgerError::Store(StoreError::Internal(e.to_string())))?;
|
|
|
- self.complete_envelope(&envelope, reservation).await?;
|
|
|
|
|
- self.store.delete_saga(&saga_id).await?;
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // Already committed (crashed after store_transfer) → nothing to do.
|
|
|
|
|
+ if self.store.get_transfer(&envelope_id(&envelope)).await?.is_some() {
|
|
|
|
|
+ self.store.delete_saga(&saga_id).await?;
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ match phase {
|
|
|
|
|
+ SagaPhase::Finalizing => {
|
|
|
|
|
+ // Validation passed and the postings are ours; roll forward.
|
|
|
|
|
+ // Keep the record if completion fails so a later run retries.
|
|
|
|
|
+ if self.finalize_envelope(&envelope, reservation).await.is_ok() {
|
|
|
|
|
+ self.store.delete_saga(&saga_id).await?;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ SagaPhase::Reserving => {
|
|
|
|
|
+ // Re-run the validating saga. On failure, delete only if it did
|
|
|
|
|
+ // not reach finalize (clean abort); otherwise keep for next run.
|
|
|
|
|
+ let result = self.drive_envelope_saga(envelope, reservation).await;
|
|
|
|
|
+ let safe_to_delete = result.is_ok()
|
|
|
|
|
+ || self.read_pending_phase(saga_id).await?
|
|
|
|
|
+ != Some(SagaPhase::Finalizing);
|
|
|
|
|
+ if safe_to_delete {
|
|
|
|
|
+ self.store.delete_saga(&saga_id).await?;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
Ok(count)
|
|
Ok(count)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- /// Idempotently push `envelope`'s postings and record to their committed
|
|
|
|
|
- /// state. Safe to call from any partial point: each primitive no-ops what is
|
|
|
|
|
- /// already done. Used by [`recover`].
|
|
|
|
|
- async fn complete_envelope(
|
|
|
|
|
|
|
+ /// Idempotently finalize `envelope` to its committed state, **verifying every
|
|
|
|
|
+ /// step's end-state**. Used by the saga's finalize step and by recovery.
|
|
|
|
|
+ ///
|
|
|
|
|
+ /// When the consumed postings are still pre-deactivation it re-validates
|
|
|
|
|
+ /// against current state (the last-step floor / freeze-close guard) and then
|
|
|
|
|
+ /// marks the saga `Finalizing` (the point of no return). Once any consumed
|
|
|
|
|
+ /// posting is already `Inactive` — a prior attempt or recovery passed that
|
|
|
|
|
+ /// point — it rolls forward without re-validating (validation rejects
|
|
|
|
|
+ /// `Inactive`). It never creates or stores anything unless **all** consumed
|
|
|
|
|
+ /// postings are confirmed `Inactive`, which is the double-spend guard.
|
|
|
|
|
+ pub(crate) async fn finalize_envelope(
|
|
|
&self,
|
|
&self,
|
|
|
envelope: &Envelope,
|
|
envelope: &Envelope,
|
|
|
reservation: kuatia_core::ReservationId,
|
|
reservation: kuatia_core::ReservationId,
|
|
|
- ) -> Result<(), LedgerError> {
|
|
|
|
|
|
|
+ ) -> Result<Receipt, LedgerError> {
|
|
|
let tid = envelope_id(envelope);
|
|
let tid = envelope_id(envelope);
|
|
|
- if self.store.get_transfer(&tid).await?.is_some() {
|
|
|
|
|
- return Ok(()); // already committed
|
|
|
|
|
|
|
+ if let Some(record) = self.store.get_transfer(&tid).await? {
|
|
|
|
|
+ return Ok(record.receipt); // already committed
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
let consumes = envelope.consumes();
|
|
let consumes = envelope.consumes();
|
|
|
- // Reserve then deactivate: this drives Active → PendingInactive → Inactive,
|
|
|
|
|
- // and each call no-ops anything already past that state.
|
|
|
|
|
- self.store.reserve_postings(consumes, reservation).await?;
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // Read consumed postings (also captures their owners for indexing).
|
|
|
|
|
+ let consumed = if consumes.is_empty() {
|
|
|
|
|
+ Vec::new()
|
|
|
|
|
+ } else {
|
|
|
|
|
+ self.store.get_postings(consumes).await?
|
|
|
|
|
+ };
|
|
|
|
|
+ let past_no_return = consumed
|
|
|
|
|
+ .iter()
|
|
|
|
|
+ .any(|p| p.status == PostingStatus::Inactive);
|
|
|
|
|
+
|
|
|
|
|
+ // Last-step boundary re-check: re-validate floor + freeze/close + snapshots
|
|
|
|
|
+ // against current state, but only while it is still safe (validation
|
|
|
|
|
+ // rejects already-`Inactive` consumed postings).
|
|
|
|
|
+ if !past_no_return {
|
|
|
|
|
+ let loaded = self.load(envelope).await?;
|
|
|
|
|
+ self.plan(envelope, &loaded)?;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Point of no return: record Finalizing before any posting turns Inactive.
|
|
|
|
|
+ self.save_pending(envelope, reservation, SagaPhase::Finalizing)
|
|
|
|
|
+ .await?;
|
|
|
|
|
+
|
|
|
|
|
+ // Deactivate consumed postings (PendingInactive owned by us → Inactive),
|
|
|
|
|
+ // then assert ALL consumed postings are Inactive. This is the double-spend
|
|
|
|
|
+ // guard: do not create/store unless the inputs were really consumed by us.
|
|
|
self.store
|
|
self.store
|
|
|
.deactivate_postings(consumes, Some(reservation))
|
|
.deactivate_postings(consumes, Some(reservation))
|
|
|
.await?;
|
|
.await?;
|
|
|
|
|
+ if !consumes.is_empty() {
|
|
|
|
|
+ let after = self.store.get_postings(consumes).await?;
|
|
|
|
|
+ if after.len() != consumes.len()
|
|
|
|
|
+ || after.iter().any(|p| p.status != PostingStatus::Inactive)
|
|
|
|
|
+ {
|
|
|
|
|
+ return Err(LedgerError::Store(StoreError::Internal(
|
|
|
|
|
+ "finalize: consumed postings not all inactive (contended or not reserved by this saga)".into(),
|
|
|
|
|
+ )));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
|
|
+ // Created postings, derived deterministically from the envelope.
|
|
|
let created: Vec<Posting> = envelope
|
|
let created: Vec<Posting> = envelope
|
|
|
.creates()
|
|
.creates()
|
|
|
.iter()
|
|
.iter()
|
|
@@ -412,25 +498,38 @@ impl Ledger {
|
|
|
})
|
|
})
|
|
|
.collect();
|
|
.collect();
|
|
|
self.store.insert_postings(&created).await?;
|
|
self.store.insert_postings(&created).await?;
|
|
|
|
|
+ if !created.is_empty() {
|
|
|
|
|
+ let ids: Vec<PostingId> = created.iter().map(|p| p.id).collect();
|
|
|
|
|
+ if self.store.get_postings(&ids).await?.len() != created.len() {
|
|
|
|
|
+ return Err(LedgerError::Store(StoreError::Internal(
|
|
|
|
|
+ "finalize: created postings missing after insert".into(),
|
|
|
|
|
+ )));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
|
|
+ // Index both created and consumed owners.
|
|
|
let mut involved: Vec<AccountId> = created.iter().map(|p| p.owner).collect();
|
|
let mut involved: Vec<AccountId> = created.iter().map(|p| p.owner).collect();
|
|
|
- if !consumes.is_empty() {
|
|
|
|
|
- let consumed = self.store.get_postings(consumes).await?;
|
|
|
|
|
- involved.extend(consumed.iter().map(|p| p.owner));
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ involved.extend(consumed.iter().map(|p| p.owner));
|
|
|
involved.sort();
|
|
involved.sort();
|
|
|
involved.dedup();
|
|
involved.dedup();
|
|
|
|
|
|
|
|
|
|
+ let receipt = Receipt { transfer_id: tid };
|
|
|
self.store
|
|
self.store
|
|
|
.store_transfer(
|
|
.store_transfer(
|
|
|
EnvelopeRecord {
|
|
EnvelopeRecord {
|
|
|
envelope: envelope.clone(),
|
|
envelope: envelope.clone(),
|
|
|
- receipt: Receipt { transfer_id: tid },
|
|
|
|
|
|
|
+ receipt: receipt.clone(),
|
|
|
created_at: now_millis()?,
|
|
created_at: now_millis()?,
|
|
|
},
|
|
},
|
|
|
&involved,
|
|
&involved,
|
|
|
)
|
|
)
|
|
|
.await?;
|
|
.await?;
|
|
|
|
|
+ if self.store.get_transfer(&tid).await?.is_none() {
|
|
|
|
|
+ return Err(LedgerError::Store(StoreError::Internal(
|
|
|
|
|
+ "finalize: transfer record missing after store".into(),
|
|
|
|
|
+ )));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
self.store
|
|
self.store
|
|
|
.append_event(&LedgerEvent {
|
|
.append_event(&LedgerEvent {
|
|
|
seq: 0,
|
|
seq: 0,
|
|
@@ -438,9 +537,38 @@ impl Ledger {
|
|
|
kind: LedgerEventKind::TransferCommitted { transfer_id: tid },
|
|
kind: LedgerEventKind::TransferCommitted { transfer_id: tid },
|
|
|
})
|
|
})
|
|
|
.await?;
|
|
.await?;
|
|
|
|
|
+ Ok(receipt)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /// Persist the write-ahead pending-saga record (upsert on the reservation id).
|
|
|
|
|
+ async fn save_pending(
|
|
|
|
|
+ &self,
|
|
|
|
|
+ envelope: &Envelope,
|
|
|
|
|
+ reservation: kuatia_core::ReservationId,
|
|
|
|
|
+ phase: SagaPhase,
|
|
|
|
|
+ ) -> Result<(), LedgerError> {
|
|
|
|
|
+ let blob = serde_json::to_vec(&PendingSaga {
|
|
|
|
|
+ envelope: envelope.clone(),
|
|
|
|
|
+ reservation,
|
|
|
|
|
+ phase,
|
|
|
|
|
+ })
|
|
|
|
|
+ .map_err(|e| LedgerError::Store(StoreError::Internal(e.to_string())))?;
|
|
|
|
|
+ self.store.save_saga(&reservation.0, blob).await?;
|
|
|
Ok(())
|
|
Ok(())
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /// Read the persisted phase of a pending saga, if it still exists.
|
|
|
|
|
+ async fn read_pending_phase(&self, saga_id: i64) -> Result<Option<SagaPhase>, LedgerError> {
|
|
|
|
|
+ for (id, blob) in self.store.list_pending_sagas().await? {
|
|
|
|
|
+ if id == saga_id {
|
|
|
|
|
+ let pending: PendingSaga = serde_json::from_slice(&blob)
|
|
|
|
|
+ .map_err(|e| LedgerError::Store(StoreError::Internal(e.to_string())))?;
|
|
|
|
|
+ return Ok(Some(pending.phase));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ Ok(None)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
// -----------------------------------------------------------------------
|
|
// -----------------------------------------------------------------------
|
|
|
// Reverse
|
|
// Reverse
|
|
|
// -----------------------------------------------------------------------
|
|
// -----------------------------------------------------------------------
|
|
@@ -740,7 +868,7 @@ pub struct LoadedState {
|
|
|
#[cfg(test)]
|
|
#[cfg(test)]
|
|
|
mod recovery_tests {
|
|
mod recovery_tests {
|
|
|
use super::*;
|
|
use super::*;
|
|
|
- use kuatia_core::{Account, AccountFlags, TransferBuilder, UserData};
|
|
|
|
|
|
|
+ use kuatia_core::{Account, AccountFlags, ReservationId, TransferBuilder, UserData};
|
|
|
use kuatia_storage::mem_store::InMemoryStore;
|
|
use kuatia_storage::mem_store::InMemoryStore;
|
|
|
use std::collections::BTreeMap;
|
|
use std::collections::BTreeMap;
|
|
|
|
|
|
|
@@ -756,106 +884,125 @@ mod recovery_tests {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- /// A commit interrupted right after its write-ahead record (before any step)
|
|
|
|
|
- /// is completed by `recover()`: the postings move and the record is cleared.
|
|
|
|
|
- #[tokio::test]
|
|
|
|
|
- async fn recover_redrives_pending_saga() {
|
|
|
|
|
|
|
+ async fn funded_ledger() -> Arc<Ledger> {
|
|
|
let ledger = Arc::new(Ledger::new(InMemoryStore::new()));
|
|
let ledger = Arc::new(Ledger::new(InMemoryStore::new()));
|
|
|
for (id, p) in [
|
|
for (id, p) in [
|
|
|
(1, AccountPolicy::NoOverdraft),
|
|
(1, AccountPolicy::NoOverdraft),
|
|
|
(2, AccountPolicy::NoOverdraft),
|
|
(2, AccountPolicy::NoOverdraft),
|
|
|
|
|
+ (3, AccountPolicy::NoOverdraft),
|
|
|
(99, AccountPolicy::ExternalAccount),
|
|
(99, AccountPolicy::ExternalAccount),
|
|
|
] {
|
|
] {
|
|
|
ledger.store().create_account(acct(id, p)).await.unwrap();
|
|
ledger.store().create_account(acct(id, p)).await.unwrap();
|
|
|
}
|
|
}
|
|
|
- // Fund account 1.
|
|
|
|
|
let deposit = TransferBuilder::new()
|
|
let deposit = TransferBuilder::new()
|
|
|
.deposit(AccountId::new(1), AssetId::new(1), Cent::from(100), AccountId::new(99))
|
|
.deposit(AccountId::new(1), AssetId::new(1), Cent::from(100), AccountId::new(99))
|
|
|
.unwrap()
|
|
.unwrap()
|
|
|
.build();
|
|
.build();
|
|
|
ledger.commit(deposit).await.unwrap();
|
|
ledger.commit(deposit).await.unwrap();
|
|
|
|
|
+ ledger
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- // Resolve a pay envelope but persist it as a pending saga WITHOUT running
|
|
|
|
|
- // it — simulating a crash right after the write-ahead record.
|
|
|
|
|
- let pay = TransferBuilder::new()
|
|
|
|
|
|
|
+ fn pay_transfer() -> Transfer {
|
|
|
|
|
+ TransferBuilder::new()
|
|
|
.pay(AccountId::new(1), AccountId::new(2), AssetId::new(1), Cent::from(40))
|
|
.pay(AccountId::new(1), AccountId::new(2), AssetId::new(1), Cent::from(40))
|
|
|
- .build();
|
|
|
|
|
- let envelope = ledger.resolve(&pay).await.unwrap();
|
|
|
|
|
- let reservation = kuatia_core::ReservationId::default();
|
|
|
|
|
|
|
+ .build()
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ async fn save_pending(ledger: &Arc<Ledger>, envelope: &Envelope, rid: ReservationId, phase: SagaPhase) {
|
|
|
let blob = serde_json::to_vec(&PendingSaga {
|
|
let blob = serde_json::to_vec(&PendingSaga {
|
|
|
- envelope,
|
|
|
|
|
- reservation,
|
|
|
|
|
|
|
+ envelope: envelope.clone(),
|
|
|
|
|
+ reservation: rid,
|
|
|
|
|
+ phase,
|
|
|
})
|
|
})
|
|
|
.unwrap();
|
|
.unwrap();
|
|
|
- ledger.store().save_saga(&reservation.0, blob).await.unwrap();
|
|
|
|
|
|
|
+ ledger.store().save_saga(&rid.0, blob).await.unwrap();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /// A commit interrupted right after its write-ahead record (phase Reserving,
|
|
|
|
|
+ /// before any step) is re-run and completed by `recover()`.
|
|
|
|
|
+ #[tokio::test]
|
|
|
|
|
+ async fn recover_redrives_reserving_saga() {
|
|
|
|
|
+ let ledger = funded_ledger().await;
|
|
|
|
|
+ let envelope = ledger.resolve(&pay_transfer()).await.unwrap();
|
|
|
|
|
+ let rid = ReservationId::default();
|
|
|
|
|
+ save_pending(&ledger, &envelope, rid, SagaPhase::Reserving).await;
|
|
|
|
|
|
|
|
- // Recover re-drives it to completion.
|
|
|
|
|
assert_eq!(ledger.recover().await.unwrap(), 1);
|
|
assert_eq!(ledger.recover().await.unwrap(), 1);
|
|
|
- assert_eq!(
|
|
|
|
|
- ledger.balance(&AccountId::new(2), &AssetId::new(1)).await.unwrap(),
|
|
|
|
|
- Cent::from(40)
|
|
|
|
|
- );
|
|
|
|
|
- assert_eq!(
|
|
|
|
|
- ledger.balance(&AccountId::new(1), &AssetId::new(1)).await.unwrap(),
|
|
|
|
|
- Cent::from(60)
|
|
|
|
|
- );
|
|
|
|
|
|
|
+ assert_eq!(ledger.balance(&AccountId::new(2), &AssetId::new(1)).await.unwrap(), Cent::from(40));
|
|
|
|
|
+ assert_eq!(ledger.balance(&AccountId::new(1), &AssetId::new(1)).await.unwrap(), Cent::from(60));
|
|
|
assert!(ledger.store().list_pending_sagas().await.unwrap().is_empty());
|
|
assert!(ledger.store().list_pending_sagas().await.unwrap().is_empty());
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- /// A commit that crashed **mid-finalize** — consumed posting already flipped
|
|
|
|
|
- /// to Inactive but the transfer record not yet written — is still completed by
|
|
|
|
|
- /// `recover()` (reserve/validate are skipped; the primitives no-op the done work).
|
|
|
|
|
|
|
+ /// A commit that crashed mid-finalize (phase Finalizing; the consumed posting
|
|
|
|
|
+ /// is already Inactive) is rolled forward by `recover()`.
|
|
|
#[tokio::test]
|
|
#[tokio::test]
|
|
|
async fn recover_completes_partial_finalize() {
|
|
async fn recover_completes_partial_finalize() {
|
|
|
- let ledger = Arc::new(Ledger::new(InMemoryStore::new()));
|
|
|
|
|
- for (id, p) in [
|
|
|
|
|
- (1, AccountPolicy::NoOverdraft),
|
|
|
|
|
- (2, AccountPolicy::NoOverdraft),
|
|
|
|
|
- (99, AccountPolicy::ExternalAccount),
|
|
|
|
|
- ] {
|
|
|
|
|
- ledger.store().create_account(acct(id, p)).await.unwrap();
|
|
|
|
|
- }
|
|
|
|
|
- let deposit = TransferBuilder::new()
|
|
|
|
|
- .deposit(AccountId::new(1), AssetId::new(1), Cent::from(100), AccountId::new(99))
|
|
|
|
|
- .unwrap()
|
|
|
|
|
- .build();
|
|
|
|
|
- ledger.commit(deposit).await.unwrap();
|
|
|
|
|
-
|
|
|
|
|
- // Resolve a pay envelope and manually run the commit halfway: reserve the
|
|
|
|
|
- // consumed posting and deactivate it (now Inactive), then "crash" — the
|
|
|
|
|
- // transfer record and created postings were never written.
|
|
|
|
|
- let pay = TransferBuilder::new()
|
|
|
|
|
- .pay(AccountId::new(1), AccountId::new(2), AssetId::new(1), Cent::from(40))
|
|
|
|
|
- .build();
|
|
|
|
|
- let envelope = ledger.resolve(&pay).await.unwrap();
|
|
|
|
|
- let reservation = kuatia_core::ReservationId::default();
|
|
|
|
|
|
|
+ let ledger = funded_ledger().await;
|
|
|
|
|
+ let envelope = ledger.resolve(&pay_transfer()).await.unwrap();
|
|
|
|
|
+ let rid = ReservationId::default();
|
|
|
|
|
+ // Run the commit halfway: reserve + deactivate the consumed posting.
|
|
|
let consumes = envelope.consumes().to_vec();
|
|
let consumes = envelope.consumes().to_vec();
|
|
|
- ledger.store().reserve_postings(&consumes, reservation).await.unwrap();
|
|
|
|
|
- let n = ledger
|
|
|
|
|
|
|
+ ledger.store().reserve_postings(&consumes, rid).await.unwrap();
|
|
|
|
|
+ assert_eq!(ledger.store().deactivate_postings(&consumes, Some(rid)).await.unwrap(), 1);
|
|
|
|
|
+ save_pending(&ledger, &envelope, rid, SagaPhase::Finalizing).await;
|
|
|
|
|
+
|
|
|
|
|
+ assert_eq!(ledger.recover().await.unwrap(), 1);
|
|
|
|
|
+ assert_eq!(ledger.balance(&AccountId::new(2), &AssetId::new(1)).await.unwrap(), Cent::from(40));
|
|
|
|
|
+ assert_eq!(ledger.balance(&AccountId::new(1), &AssetId::new(1)).await.unwrap(), Cent::from(60));
|
|
|
|
|
+ assert!(ledger.store().list_pending_sagas().await.unwrap().is_empty());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /// Recovery of a `Reserving` saga re-validates against current state: if an
|
|
|
|
|
+ /// account was frozen after the write-ahead record, the commit is abandoned —
|
|
|
|
|
+ /// no postings move, the reservation is released, and the record is cleared.
|
|
|
|
|
+ #[tokio::test]
|
|
|
|
|
+ async fn recover_revalidates_and_aborts_when_account_frozen() {
|
|
|
|
|
+ let ledger = funded_ledger().await;
|
|
|
|
|
+ let envelope = ledger.resolve(&pay_transfer()).await.unwrap();
|
|
|
|
|
+ let tid = envelope_id(&envelope);
|
|
|
|
|
+ let rid = ReservationId::default();
|
|
|
|
|
+ save_pending(&ledger, &envelope, rid, SagaPhase::Reserving).await;
|
|
|
|
|
+
|
|
|
|
|
+ // A freeze lands before recovery runs.
|
|
|
|
|
+ ledger.freeze(&AccountId::new(1)).await.unwrap();
|
|
|
|
|
+
|
|
|
|
|
+ assert_eq!(ledger.recover().await.unwrap(), 1);
|
|
|
|
|
+ // Nothing committed; balances unchanged; reservation released.
|
|
|
|
|
+ assert!(ledger.store().get_transfer(&tid).await.unwrap().is_none());
|
|
|
|
|
+ assert_eq!(ledger.balance(&AccountId::new(1), &AssetId::new(1)).await.unwrap(), Cent::from(100));
|
|
|
|
|
+ assert_eq!(ledger.balance(&AccountId::new(2), &AssetId::new(1)).await.unwrap(), Cent::ZERO);
|
|
|
|
|
+ let active = ledger
|
|
|
.store()
|
|
.store()
|
|
|
- .deactivate_postings(&consumes, Some(reservation))
|
|
|
|
|
|
|
+ .get_postings_by_account(&AccountId::new(1), Some(&AssetId::new(1)), Some(PostingStatus::Active))
|
|
|
.await
|
|
.await
|
|
|
.unwrap();
|
|
.unwrap();
|
|
|
- assert_eq!(n, 1); // consumed posting is now Inactive
|
|
|
|
|
|
|
+ assert_eq!(active.len(), 1); // back to Active
|
|
|
|
|
+ assert!(ledger.store().list_pending_sagas().await.unwrap().is_empty());
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- let blob = serde_json::to_vec(&PendingSaga {
|
|
|
|
|
- envelope,
|
|
|
|
|
- reservation,
|
|
|
|
|
- })
|
|
|
|
|
- .unwrap();
|
|
|
|
|
- ledger.store().save_saga(&reservation.0, blob).await.unwrap();
|
|
|
|
|
|
|
+ /// Recovery cannot double-spend: if the consumed posting was taken by another
|
|
|
|
|
+ /// transfer while the saga was pending, recovery aborts without creating or
|
|
|
|
|
+ /// storing anything.
|
|
|
|
|
+ #[tokio::test]
|
|
|
|
|
+ async fn recover_does_not_double_spend_a_taken_posting() {
|
|
|
|
|
+ let ledger = funded_ledger().await;
|
|
|
|
|
+ let envelope = ledger.resolve(&pay_transfer()).await.unwrap();
|
|
|
|
|
+ let tid = envelope_id(&envelope);
|
|
|
|
|
+ let rid = ReservationId::default();
|
|
|
|
|
+ save_pending(&ledger, &envelope, rid, SagaPhase::Reserving).await;
|
|
|
|
|
+
|
|
|
|
|
+ // Another transfer consumes account 1's posting and commits.
|
|
|
|
|
+ let steal = TransferBuilder::new()
|
|
|
|
|
+ .pay(AccountId::new(1), AccountId::new(3), AssetId::new(1), Cent::from(50))
|
|
|
|
|
+ .build();
|
|
|
|
|
+ ledger.commit(steal).await.unwrap();
|
|
|
|
|
|
|
|
- // Recovery finishes the commit despite reserve/validate being unable to
|
|
|
|
|
- // re-run over the already-consumed posting.
|
|
|
|
|
assert_eq!(ledger.recover().await.unwrap(), 1);
|
|
assert_eq!(ledger.recover().await.unwrap(), 1);
|
|
|
- assert_eq!(
|
|
|
|
|
- ledger.balance(&AccountId::new(2), &AssetId::new(1)).await.unwrap(),
|
|
|
|
|
- Cent::from(40)
|
|
|
|
|
- );
|
|
|
|
|
- assert_eq!(
|
|
|
|
|
- ledger.balance(&AccountId::new(1), &AssetId::new(1)).await.unwrap(),
|
|
|
|
|
- Cent::from(60)
|
|
|
|
|
- );
|
|
|
|
|
|
|
+ // Our envelope never committed; only the stealing transfer applied.
|
|
|
|
|
+ assert!(ledger.store().get_transfer(&tid).await.unwrap().is_none());
|
|
|
|
|
+ assert_eq!(ledger.balance(&AccountId::new(1), &AssetId::new(1)).await.unwrap(), Cent::from(50));
|
|
|
|
|
+ assert_eq!(ledger.balance(&AccountId::new(3), &AssetId::new(1)).await.unwrap(), Cent::from(50));
|
|
|
|
|
+ assert_eq!(ledger.balance(&AccountId::new(2), &AssetId::new(1)).await.unwrap(), Cent::ZERO);
|
|
|
assert!(ledger.store().list_pending_sagas().await.unwrap().is_empty());
|
|
assert!(ledger.store().list_pending_sagas().await.unwrap().is_empty());
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|