Ver código fonte

Make storage a dumb instruction-follower; move commit + recovery into the saga

The commit path bundled everything into one monolithic store transaction
(commit_transfer) and exposed two confusing entry points (commit /
commit_atomic).  That put a lot of domain assumptions in storage — it
interpreted state, enforced guards, decided idempotency and error semantics —
and the saga's durable crash recovery was designed (SagaStore, legend
pause/resume) but never wired, so that single transaction was the only thing
protecting against a half-applied commit.

Invert it. Storage becomes a dumb instruction-follower: every write applies one
update and returns the number of affected rows (or an I/O error), never
deciding state, idempotency, or compensation. The saga reads each count and
owns the logic (full continue / partial error+compensate / zero
read-and-check-same-envelope).  Crash-safety moves to a write-ahead PendingSaga
record plus idempotent roll-forward in Ledger::recover(), so a crash at any
point converges without a global transaction and leaves no orphaned
reservations.

- Storage primitives now return counts: reserve/release/deactivate_postings,
  insert_postings, store_transfer(record, involved), idempotent append_event
  (dedup_key in 001_init.sql). Drop the semantic write-outcome StoreError
  variants.
- Unify on commit(transfer) = resolve + commit_envelope (reserve -> validate ->
  finalize on the primitives); reverse() uses it; remove commit_atomic and the
  CommitStore/CommitRequest/commit_transfer atomic boundary and Plan guard
  fields.
- Add Ledger::recover() with write-ahead persistence; prove it with recovery
  and mid-finalize crash-injection tests.
- Tradeoff: the CappedOverdraft floor and freeze/close guards are now
  validate-time and best-effort under concurrency; double-spend safety still
  holds via the reservation protocol. Recorded in
   doc/adr/0001-dumb-storage-saga-recovery.md.
- Sweep docs, READMEs, module docs, and examples to the new model.
Cesar Rodas 6 dias atrás
pai
commit
45b57f2319

BIN
Archive.zip


+ 5 - 3
CLAUDE.md

@@ -29,14 +29,16 @@ doc/
 - **Envelope**: concrete postings to consume and create — the resolved form of movements.
 - **Conservation**: for each asset, `sum(consumed) == sum(created)`.
 - **Account policies**: NoOverdraft, CappedOverdraft, UncappedOverdraft, SystemAccount, ExternalAccount. Only `NoOverdraft` forbids negative postings; the other four permit them. An overdraft is a negative posting that covers a shortfall — down to the floor for `CappedOverdraft`, unbounded for `UncappedOverdraft`.
-- **Atomic commit**: `CommitStore::commit_transfer` is the single atomic boundary — postings, transfer record, account index, and events apply in one transaction. It enforces `CappedOverdraft` CAS guards and reservation ownership. `reserve_postings`/`release_postings` carry a `ReservationId` so only the owning saga can finalize/release a reserved posting.
+- **Dumb storage**: the `Store` is a thin instruction follower. Write methods apply one update and return the **number of affected rows** (or an I/O error) — they never interpret counts, decide state, enforce idempotency, or compensate. The saga owns all of that. There is no monolithic `commit_transfer`; commit is a sequence of dumb primitives (`reserve_postings`, `deactivate_postings`, `insert_postings`, `store_transfer`, `append_event`), each idempotent. See [doc/adr/0001-dumb-storage-saga-recovery.md](doc/adr/0001-dumb-storage-saga-recovery.md).
 
 ## Architecture
 
 - **Pure core / async layer separation**: kuatia-core has zero IO, fully deterministic, testable with golden vectors. kuatia adds async Store trait and saga pipeline.
-- **Saga commit pipeline**: reserve → validate → finalize, with automatic retry and LIFO compensation via the `legend` crate.
+- **Saga commit pipeline**: every commit is the envelope saga `reserve → validate → finalize`, with automatic retry and LIFO compensation via the `legend` crate. `commit(transfer)` = resolve (read-only) then `commit_envelope`; `reverse()` builds a reversal envelope and runs the same path. There is one commit path, not a separate "atomic" one.
+- **Count interpretation**: the saga reads each primitive's affected-row count — full = continue; partial = error → compensate; zero = read state and continue only if this same envelope/reservation already applied it (idempotency).
+- **Durable recovery**: a write-ahead `PendingSaga {envelope, reservation}` is persisted via `SagaStore` before the saga mutates anything. `Ledger::recover()` (call on startup) force-completes any pending saga through the idempotent primitives — converging from a crash at any point (pre-reserve, reserved, or mid-finalize). Roll-forward, not rollback, so there are no orphaned `PendingInactive` postings to reconcile.
 - **Content-addressed transfers**: EnvelopeId = double-SHA-256 of canonical bytes. Provides idempotency and tamper evidence.
-- **Append-only accounts**: versioned, never modified in place. Snapshot pinning prevents TOCTOU races.
+- **Append-only accounts**: versioned, never modified in place. Snapshot pinning (validate-time) prevents TOCTOU races; under the dumb-storage model the overdraft-floor and freeze/close guards are validate-time and best-effort under concurrency.
 - **Store uses `Arc<dyn Store>`**: Ledger is non-generic, enabling concrete saga types.
 
 ## Resolve algorithm

+ 1 - 0
Cargo.lock

@@ -590,6 +590,7 @@ dependencies = [
  "kuatia-types",
  "legend",
  "serde",
+ "serde_json",
  "sqlx",
  "tokio",
  "tracing",

+ 4 - 20
crates/kuatia-core/src/validate.rs

@@ -43,12 +43,6 @@ pub struct Plan {
     pub postings_to_deactivate: Vec<PostingId>,
     /// New postings to persist.
     pub postings_to_create: Vec<Posting>,
-    /// CAS guards for CappedOverdraft accounts: (account, asset, expected_balance).
-    pub cas_guards: Vec<(AccountId, AssetId, Cent)>,
-    /// Account version guards: (account, expected_version). Each pinned account
-    /// snapshot is re-checked atomically at commit so a concurrent
-    /// freeze/unfreeze/close (which bump `version`) aborts the transfer.
-    pub account_guards: Vec<(AccountId, u64)>,
 }
 
 // ---------------------------------------------------------------------------
@@ -284,9 +278,6 @@ pub fn validate_and_plan(input: PlanInput<'_>) -> Result<Plan, ValidationError>
     }
 
     // 5b. Snapshot pinning: each account_snapshot must match current state.
-    //     Emit a version guard per pinned account so the commit boundary can
-    //     re-check atomically against concurrent lifecycle mutations.
-    let mut account_guards: Vec<(AccountId, u64)> = Vec::new();
     for snap in envelope.account_snapshots() {
         let account = input
             .accounts
@@ -300,7 +291,6 @@ pub fn validate_and_plan(input: PlanInput<'_>) -> Result<Plan, ValidationError>
                 actual,
             });
         }
-        account_guards.push((snap.account, account.version));
     }
 
     // 5c. Book policy: gate which assets and accounts may participate. Enforced
@@ -414,8 +404,6 @@ pub fn validate_and_plan(input: PlanInput<'_>) -> Result<Plan, ValidationError>
         *entry = entry.checked_add(np.value)?;
     }
 
-    let mut cas_guards = Vec::new();
-
     for ((account_id, asset_id), delta) in &deltas {
         let current_balance = input
             .balances
@@ -445,8 +433,6 @@ pub fn validate_and_plan(input: PlanInput<'_>) -> Result<Plan, ValidationError>
                         projected,
                     });
                 }
-                // Emit CAS guard for write-skew prevention
-                cas_guards.push((*account_id, *asset_id, current_balance));
             }
             AccountPolicy::UncappedOverdraft
             | AccountPolicy::SystemAccount
@@ -482,8 +468,6 @@ pub fn validate_and_plan(input: PlanInput<'_>) -> Result<Plan, ValidationError>
         transfer_id: tid,
         postings_to_deactivate,
         postings_to_create,
-        cas_guards,
-        account_guards,
     })
 }
 
@@ -832,10 +816,9 @@ mod tests {
             book: None,
         };
 
+        // A CappedOverdraft spend within the floor validates and produces a plan.
         let plan = validate_and_plan(input).unwrap();
-        // Should have a CAS guard for the capped account
-        assert_eq!(plan.cas_guards.len(), 1);
-        assert_eq!(plan.cas_guards[0].0, AccountId::new(1));
+        assert!(!plan.postings_to_create.is_empty());
     }
 
     #[test]
@@ -940,8 +923,9 @@ mod tests {
             book: None,
         };
 
+        // UncappedOverdraft permits the negative projection; the plan validates.
         let plan = validate_and_plan(input).unwrap();
-        assert!(plan.cas_guards.is_empty());
+        assert!(!plan.postings_to_create.is_empty());
     }
 
     #[test]

+ 166 - 266
crates/kuatia-storage-sql/src/lib.rs

@@ -10,8 +10,6 @@
 //! store.migrate().await?;
 //! ```
 
-use std::collections::HashSet;
-
 use async_trait::async_trait;
 use sqlx::{Any, Pool, Row};
 
@@ -435,18 +433,16 @@ impl PostingStore for SqlStore {
         &self,
         ids: &[PostingId],
         reservation: ReservationId,
-    ) -> Result<(), StoreError> {
-        // Conditional update: each posting transitions Active → PendingInactive
-        // only if it is still Active. The status precondition lives in the WHERE
-        // clause so a concurrent transition cannot slip between a check and the
-        // write; `rows_affected() == 1` is the authorization. A missing row and a
-        // non-Active row are indistinguishable here — both are "not reservable".
+    ) -> Result<u64, StoreError> {
+        // Dumb instruction: each id flips Active → PendingInactive (the status
+        // precondition is in the WHERE so it is atomic). Return the count of rows
+        // changed; the caller decides what a short count means.
         let mut tx = self
             .pool
             .begin()
             .await
             .map_err(|e| StoreError::Internal(e.to_string()))?;
-
+        let mut reserved: u64 = 0;
         for id in ids {
             let res = sqlx::query(
                 "UPDATE postings SET status = $1, reservation = $2 WHERE transfer_id = $3 AND idx = $4 AND status = $5",
@@ -459,54 +455,30 @@ impl PostingStore for SqlStore {
             .execute(&mut *tx)
             .await
             .map_err(|e| StoreError::Internal(e.to_string()))?;
-            if res.rows_affected() != 1 {
-                return Err(StoreError::PostingNotActive(*id));
-            }
+            reserved += res.rows_affected();
         }
 
         tx.commit()
             .await
             .map_err(|e| StoreError::Internal(e.to_string()))?;
-        Ok(())
+        Ok(reserved)
     }
 
     async fn release_postings(
         &self,
         ids: &[PostingId],
         reservation: ReservationId,
-    ) -> Result<(), StoreError> {
+    ) -> Result<u64, StoreError> {
+        // Dumb instruction: each id reserved by `reservation` flips
+        // PendingInactive → Active. Return the count released; an already-Active
+        // or differently-owned posting simply does not count.
         let mut tx = self
             .pool
             .begin()
             .await
             .map_err(|e| StoreError::Internal(e.to_string()))?;
-
-        // Classify then conditionally release in one pass. Releasing an Active
-        // posting is a no-op (matches InMemory); releasing a reserved one must
-        // affect exactly one row, otherwise it changed under us concurrently.
+        let mut released: u64 = 0;
         for id in ids {
-            let row = sqlx::query(
-                "SELECT status, reservation FROM postings WHERE transfer_id = $1 AND idx = $2",
-            )
-            .bind(id.transfer.0.as_slice())
-            .bind(id.index as i16)
-            .fetch_optional(&mut *tx)
-            .await
-            .map_err(|e| StoreError::Internal(e.to_string()))?
-            .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
-            let status: i16 = row
-                .try_get("status")
-                .map_err(|e| StoreError::Internal(e.to_string()))?;
-            if status == 2 {
-                return Err(StoreError::PostingInactive(*id));
-            }
-            let owner: Option<i64> = row
-                .try_get("reservation")
-                .map_err(|e| StoreError::Internal(e.to_string()))?;
-            if status == 1 && owner != Some(reservation.0) {
-                return Err(StoreError::ReservationMismatch(*id));
-            }
-
             let res = sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4 AND reservation = $5")
                 .bind(status_to_i16(PostingStatus::Active))
                 .bind(id.transfer.0.as_slice())
@@ -516,19 +488,86 @@ impl PostingStore for SqlStore {
                 .execute(&mut *tx)
                 .await
                 .map_err(|e| StoreError::Internal(e.to_string()))?;
-            // status == 1 was reserved by us → the row must flip; status == 0
-            // (Active) is a no-op with zero rows affected.
-            if status == 1 && res.rows_affected() != 1 {
-                return Err(StoreError::ReservationMismatch(*id));
-            }
+            released += res.rows_affected();
         }
 
         tx.commit()
             .await
             .map_err(|e| StoreError::Internal(e.to_string()))?;
-        Ok(())
+        Ok(released)
+    }
+
+    async fn deactivate_postings(
+        &self,
+        ids: &[PostingId],
+        reservation: Option<ReservationId>,
+    ) -> Result<u64, StoreError> {
+        let mut tx = self
+            .pool
+            .begin()
+            .await
+            .map_err(|e| StoreError::Internal(e.to_string()))?;
+        let mut changed: u64 = 0;
+        for id in ids {
+            // The precondition is the instruction; the count is the result. The
+            // caller decides what a short count means.
+            let res = match reservation {
+                None => {
+                    sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4")
+                        .bind(status_to_i16(PostingStatus::Inactive))
+                        .bind(id.transfer.0.as_slice())
+                        .bind(id.index as i16)
+                        .bind(status_to_i16(PostingStatus::Active))
+                        .execute(&mut *tx)
+                        .await
+                }
+                Some(rid) => {
+                    sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4 AND reservation = $5")
+                        .bind(status_to_i16(PostingStatus::Inactive))
+                        .bind(id.transfer.0.as_slice())
+                        .bind(id.index as i16)
+                        .bind(status_to_i16(PostingStatus::PendingInactive))
+                        .bind(rid.0)
+                        .execute(&mut *tx)
+                        .await
+                }
+            }
+            .map_err(|e| StoreError::Internal(e.to_string()))?;
+            changed += res.rows_affected();
+        }
+        tx.commit()
+            .await
+            .map_err(|e| StoreError::Internal(e.to_string()))?;
+        Ok(changed)
     }
 
+    async fn insert_postings(&self, postings: &[Posting]) -> Result<u64, StoreError> {
+        let mut tx = self
+            .pool
+            .begin()
+            .await
+            .map_err(|e| StoreError::Internal(e.to_string()))?;
+        let mut inserted: u64 = 0;
+        for posting in postings {
+            let res = sqlx::query(
+                "INSERT INTO postings (transfer_id, idx, owner, asset, value, status) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (transfer_id, idx) DO NOTHING"
+            )
+                .bind(posting.id.transfer.0.as_slice())
+                .bind(posting.id.index as i16)
+                .bind(posting.owner.0)
+                .bind(posting.asset.0 as i32)
+                .bind(posting.value.value())
+                .bind(status_to_i16(posting.status))
+                .execute(&mut *tx)
+                .await
+                .map_err(|e| StoreError::Internal(e.to_string()))?;
+            inserted += res.rows_affected();
+        }
+        tx.commit()
+            .await
+            .map_err(|e| StoreError::Internal(e.to_string()))?;
+        Ok(inserted)
+    }
 }
 
 // ---------------------------------------------------------------------------
@@ -565,6 +604,49 @@ impl TransferStore for SqlStore {
         }
     }
 
+    async fn store_transfer(
+        &self,
+        record: EnvelopeRecord,
+        involved: &[AccountId],
+    ) -> Result<u64, StoreError> {
+        let tid = record.receipt.transfer_id;
+        let transfer_bytes = serialize_blob(&record.envelope)?;
+        let receipt_bytes = serialize_blob(&record.receipt)?;
+
+        let mut tx = self
+            .pool
+            .begin()
+            .await
+            .map_err(|e| StoreError::Internal(e.to_string()))?;
+
+        let res = sqlx::query("INSERT INTO transfers (id, transfer, receipt, created_at, book) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (id) DO NOTHING")
+            .bind(tid.0.as_slice())
+            .bind(&transfer_bytes)
+            .bind(&receipt_bytes)
+            .bind(record.created_at)
+            .bind(record.envelope.book().0)
+            .execute(&mut *tx)
+            .await
+            .map_err(|e| StoreError::Internal(e.to_string()))?;
+        let inserted = res.rows_affected();
+
+        // Index every involved account (caller supplies the set; storage does no
+        // computation). Idempotent so a replay is harmless.
+        for account in involved {
+            sqlx::query("INSERT INTO transfer_accounts (transfer_id, account_id) VALUES ($1, $2) ON CONFLICT (transfer_id, account_id) DO NOTHING")
+                .bind(tid.0.as_slice())
+                .bind(account.0)
+                .execute(&mut *tx)
+                .await
+                .map_err(|e| StoreError::Internal(e.to_string()))?;
+        }
+
+        tx.commit()
+            .await
+            .map_err(|e| StoreError::Internal(e.to_string()))?;
+        Ok(inserted)
+    }
+
     async fn get_transfers_for_account(
         &self,
         account: &AccountId,
@@ -723,16 +805,44 @@ impl EventStore for SqlStore {
         let data = serialize_blob(event)?;
         let seq = self.autoid.next() as u64;
 
-        sqlx::query("INSERT INTO events (seq, timestamp, kind, data) VALUES ($1, $2, $3, $4)")
-            .bind(seq as i64)
-            .bind(event.timestamp)
-            .bind(&kind_str)
-            .bind(&data)
-            .execute(&self.pool)
-            .await
-            .map_err(|e| StoreError::Internal(e.to_string()))?;
-
-        Ok(seq)
+        // Idempotent on the dedup key: a replayed transfer event conflicts on
+        // `dedup_key` and returns the existing seq instead of a duplicate row.
+        match kuatia_storage::events::event_dedup_key(&event.kind) {
+            Some(eid) => {
+                let res = sqlx::query("INSERT INTO events (seq, timestamp, kind, data, dedup_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (dedup_key) DO NOTHING")
+                    .bind(seq as i64)
+                    .bind(event.timestamp)
+                    .bind(&kind_str)
+                    .bind(&data)
+                    .bind(eid.0.as_slice())
+                    .execute(&self.pool)
+                    .await
+                    .map_err(|e| StoreError::Internal(e.to_string()))?;
+                if res.rows_affected() == 0 {
+                    let row = sqlx::query("SELECT seq FROM events WHERE dedup_key = $1")
+                        .bind(eid.0.as_slice())
+                        .fetch_one(&self.pool)
+                        .await
+                        .map_err(|e| StoreError::Internal(e.to_string()))?;
+                    let existing: i64 = row
+                        .try_get("seq")
+                        .map_err(|e| StoreError::Internal(e.to_string()))?;
+                    return Ok(existing as u64);
+                }
+                Ok(seq)
+            }
+            None => {
+                sqlx::query("INSERT INTO events (seq, timestamp, kind, data) VALUES ($1, $2, $3, $4)")
+                    .bind(seq as i64)
+                    .bind(event.timestamp)
+                    .bind(&kind_str)
+                    .bind(&data)
+                    .execute(&self.pool)
+                    .await
+                    .map_err(|e| StoreError::Internal(e.to_string()))?;
+                Ok(seq)
+            }
+        }
     }
 
     async fn get_events_since(
@@ -819,213 +929,3 @@ impl BookStore for SqlStore {
     }
 }
 
-// ---------------------------------------------------------------------------
-// CommitStore — the single atomic commit boundary
-// ---------------------------------------------------------------------------
-
-#[async_trait]
-impl CommitStore for SqlStore {
-    async fn commit_transfer(&self, req: CommitRequest<'_>) -> Result<(), StoreError> {
-        let tid = req.record.receipt.transfer_id;
-        let transfer_bytes = serialize_blob(&req.record.envelope)?;
-        let receipt_bytes = serialize_blob(&req.record.receipt)?;
-
-        let mut tx = self
-            .pool
-            .begin()
-            .await
-            .map_err(|e| StoreError::Internal(e.to_string()))?;
-
-        // 1. Idempotency: already committed?
-        let exists = sqlx::query("SELECT 1 FROM transfers WHERE id = $1")
-            .bind(tid.0.as_slice())
-            .fetch_optional(&mut *tx)
-            .await
-            .map_err(|e| StoreError::Internal(e.to_string()))?;
-        if exists.is_some() {
-            return Ok(());
-        }
-
-        // 2. CAS guards — recompute Σ(non-Inactive) in Rust and compare.
-        for (account, asset, expected) in req.cas_guards {
-            let rows = sqlx::query(
-                "SELECT value FROM postings WHERE owner = $1 AND asset = $2 AND status != $3",
-            )
-            .bind(account.0)
-            .bind(asset.0 as i32)
-            .bind(status_to_i16(PostingStatus::Inactive))
-            .fetch_all(&mut *tx)
-            .await
-            .map_err(|e| StoreError::Internal(e.to_string()))?;
-            let mut sum: i64 = 0;
-            for row in &rows {
-                let v: i64 = row
-                    .try_get("value")
-                    .map_err(|e| StoreError::Internal(e.to_string()))?;
-                sum = sum
-                    .checked_add(v)
-                    .ok_or_else(|| StoreError::Internal("balance overflow during cas".into()))?;
-            }
-            if sum != expected.value() {
-                return Err(StoreError::Conflict {
-                    account: *account,
-                    asset: *asset,
-                });
-            }
-        }
-
-        // 2b. Account version guards — re-check each pinned account version in the
-        //     same transaction so a concurrent freeze/unfreeze/close (which bumps
-        //     version) aborts the commit instead of racing past validation.
-        for (account, expected) in req.account_guards {
-            let actual: i64 = sqlx::query(
-                "SELECT version FROM accounts WHERE id = $1 ORDER BY version DESC LIMIT 1",
-            )
-            .bind(account.0)
-            .fetch_optional(&mut *tx)
-            .await
-            .map_err(|e| StoreError::Internal(e.to_string()))?
-            .ok_or(StoreError::NotFound(format!("account {account:?}")))?
-            .try_get("version")
-            .map_err(|e| StoreError::Internal(e.to_string()))?;
-            if actual as u64 != *expected {
-                return Err(StoreError::VersionConflict {
-                    account: *account,
-                    expected: *expected,
-                    actual: actual as u64,
-                });
-            }
-        }
-
-        // 3. Authorize consumed postings and collect their owners.
-        let mut account_ids: HashSet<i64> = HashSet::new();
-        for pid in req.deactivate {
-            let row = sqlx::query(
-                "SELECT owner, status, reservation FROM postings WHERE transfer_id = $1 AND idx = $2",
-            )
-            .bind(pid.transfer.0.as_slice())
-            .bind(pid.index as i16)
-            .fetch_optional(&mut *tx)
-            .await
-            .map_err(|e| StoreError::Internal(e.to_string()))?
-            .ok_or(StoreError::ReservationMismatch(*pid))?;
-            let owner: i64 = row
-                .try_get("owner")
-                .map_err(|e| StoreError::Internal(e.to_string()))?;
-            let status: i16 = row
-                .try_get("status")
-                .map_err(|e| StoreError::Internal(e.to_string()))?;
-            let reservation: Option<i64> = row
-                .try_get("reservation")
-                .map_err(|e| StoreError::Internal(e.to_string()))?;
-            match req.reservation {
-                None => {
-                    if status != status_to_i16(PostingStatus::Active) {
-                        return Err(StoreError::ReservationMismatch(*pid));
-                    }
-                }
-                Some(rid) => {
-                    if status != status_to_i16(PostingStatus::PendingInactive)
-                        || reservation != Some(rid.0)
-                    {
-                        return Err(StoreError::ReservationMismatch(*pid));
-                    }
-                }
-            }
-            account_ids.insert(owner);
-        }
-
-        // 4. Deactivate consumed postings. The authorization precondition lives in
-        //    the WHERE clause (not just the step-3 read) so two concurrent commits
-        //    cannot both deactivate the same posting under READ COMMITTED. The raw
-        //    path requires it still Active; the saga path requires it still
-        //    PendingInactive owned by this reservation. `rows_affected() == 1` is
-        //    the authorization.
-        for pid in req.deactivate {
-            let res = match req.reservation {
-                None => {
-                    sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4")
-                        .bind(status_to_i16(PostingStatus::Inactive))
-                        .bind(pid.transfer.0.as_slice())
-                        .bind(pid.index as i16)
-                        .bind(status_to_i16(PostingStatus::Active))
-                        .execute(&mut *tx)
-                        .await
-                }
-                Some(rid) => {
-                    sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4 AND reservation = $5")
-                        .bind(status_to_i16(PostingStatus::Inactive))
-                        .bind(pid.transfer.0.as_slice())
-                        .bind(pid.index as i16)
-                        .bind(status_to_i16(PostingStatus::PendingInactive))
-                        .bind(rid.0)
-                        .execute(&mut *tx)
-                        .await
-                }
-            }
-            .map_err(|e| StoreError::Internal(e.to_string()))?;
-            if res.rows_affected() != 1 {
-                return Err(StoreError::ReservationMismatch(*pid));
-            }
-        }
-
-        // 5. Insert created postings (Active, unreserved → reservation defaults NULL).
-        for posting in req.create {
-            sqlx::query(
-                "INSERT INTO postings (transfer_id, idx, owner, asset, value, status) VALUES ($1, $2, $3, $4, $5, $6)"
-            )
-                .bind(posting.id.transfer.0.as_slice())
-                .bind(posting.id.index as i16)
-                .bind(posting.owner.0)
-                .bind(posting.asset.0 as i32)
-                .bind(posting.value.value())
-                .bind(status_to_i16(posting.status))
-                .execute(&mut *tx)
-                .await
-                .map_err(|e| StoreError::Internal(e.to_string()))?;
-            account_ids.insert(posting.owner.0);
-        }
-
-        // 6. Persist the transfer record.
-        sqlx::query("INSERT INTO transfers (id, transfer, receipt, created_at, book) VALUES ($1, $2, $3, $4, $5)")
-            .bind(tid.0.as_slice())
-            .bind(&transfer_bytes)
-            .bind(&receipt_bytes)
-            .bind(req.record.created_at)
-            .bind(req.record.envelope.book().0)
-            .execute(&mut *tx)
-            .await
-            .map_err(|e| StoreError::Internal(e.to_string()))?;
-
-        // 7. Index transfer_accounts from BOTH created and consumed owners.
-        for aid in &account_ids {
-            sqlx::query("INSERT INTO transfer_accounts (transfer_id, account_id) VALUES ($1, $2)")
-                .bind(tid.0.as_slice())
-                .bind(*aid)
-                .execute(&mut *tx)
-                .await
-                .map_err(|e| StoreError::Internal(e.to_string()))?;
-        }
-
-        // 8. Append events within the same transaction.
-        for event in req.events {
-            let kind_str = serde_json::to_string(&event.kind)
-                .map_err(|e| StoreError::Internal(e.to_string()))?;
-            let data = serialize_blob(event)?;
-            let seq = self.autoid.next() as u64;
-            sqlx::query("INSERT INTO events (seq, timestamp, kind, data) VALUES ($1, $2, $3, $4)")
-                .bind(seq as i64)
-                .bind(event.timestamp)
-                .bind(&kind_str)
-                .bind(&data)
-                .execute(&mut *tx)
-                .await
-                .map_err(|e| StoreError::Internal(e.to_string()))?;
-        }
-
-        tx.commit()
-            .await
-            .map_err(|e| StoreError::Internal(e.to_string()))?;
-        Ok(())
-    }
-}

+ 2 - 1
crates/kuatia-storage-sql/src/migrations/postgres/001_init.sql

@@ -50,7 +50,8 @@ CREATE TABLE IF NOT EXISTS events (
     seq       BIGINT PRIMARY KEY,
     timestamp BIGINT NOT NULL,
     kind      TEXT NOT NULL,
-    data      BYTEA NOT NULL
+    data      BYTEA NOT NULL,
+    dedup_key BYTEA UNIQUE
 );
 
 CREATE TABLE IF NOT EXISTS books (

+ 2 - 1
crates/kuatia-storage-sql/src/migrations/sqlite/001_init.sql

@@ -50,7 +50,8 @@ CREATE TABLE IF NOT EXISTS events (
     seq       BIGINT PRIMARY KEY,
     timestamp BIGINT NOT NULL,
     kind      TEXT NOT NULL,
-    data      BLOB NOT NULL
+    data      BLOB NOT NULL,
+    dedup_key BLOB UNIQUE
 );
 
 CREATE TABLE IF NOT EXISTS books (

+ 10 - 5
crates/kuatia-storage/README.md

@@ -11,14 +11,19 @@ macro that any backend can use to validate its implementation.
 | Trait | Purpose |
 |-------|---------|
 | `AccountStore` | Account CRUD and versioning |
-| `PostingStore` | Posting reads, reserve/release/finalize lifecycle (reserve/release carry a `ReservationId`) |
-| `TransferStore` | Transfer persistence and queries |
+| `PostingStore` | Posting reads + lifecycle: `reserve`/`release`/`deactivate`/`insert` postings (reserve/release/deactivate carry a `ReservationId`) |
+| `TransferStore` | Transfer persistence (`store_transfer`) and queries |
 | `SagaStore` | Saga state for crash recovery |
-| `EventStore` | Append-only ledger event log |
+| `EventStore` | Append-only ledger event log (idempotent on a per-transfer dedup key) |
 | `BookStore` | Book (transfer policy scope) persistence |
-| `CommitStore` | `commit_transfer` — the single atomic commit boundary (postings + transfer + index + events) |
 
-`Store` is a blanket trait — any type implementing all seven sub-traits is a `Store`.
+The store is a **dumb instruction follower**: write methods apply one update and
+return the **number of affected rows** (or an I/O error). They do not interpret
+counts, decide state, enforce idempotency, or compensate — the saga in the
+`kuatia` crate does. There is no `commit_transfer`; commit is a sequence of these
+primitives, each idempotent.
+
+`Store` is a blanket trait — any type implementing the sub-traits is a `Store`.
 
 ## Conformance testing
 

+ 5 - 22
crates/kuatia-storage/src/error.rs

@@ -1,8 +1,12 @@
 //! Error types for storage implementations.
 
-use kuatia_types::{AccountId, AssetId, PostingId};
+use kuatia_types::AccountId;
 
 /// Errors produced by [`Store`](crate::store::Store) implementations.
+///
+/// The store is a dumb instruction follower: writes report affected-row counts,
+/// not semantic verdicts, so there are no "posting not active"/"reservation
+/// mismatch"/"cas conflict" variants — the saga derives those from counts.
 #[derive(Debug)]
 pub enum StoreError {
     /// The requested entity was not found.
@@ -20,21 +24,6 @@ pub enum StoreError {
     },
     /// Catch-all for unexpected internal errors.
     Internal(String),
-    /// Attempted to reserve a posting that is not Active.
-    PostingNotActive(PostingId),
-    /// Attempted to release a void (Inactive) posting.
-    PostingInactive(PostingId),
-    /// A CAS guard's expected balance no longer matches the current balance —
-    /// a concurrent transfer moved it. The caller should re-validate and retry.
-    Conflict {
-        /// Account whose balance changed under the guard.
-        account: AccountId,
-        /// Asset whose balance changed.
-        asset: AssetId,
-    },
-    /// A posting's reservation owner did not match the caller's reservation —
-    /// it is reserved by a different saga (or not reserved at all).
-    ReservationMismatch(PostingId),
 }
 
 impl std::fmt::Display for StoreError {
@@ -53,12 +42,6 @@ impl std::fmt::Display for StoreError {
                 )
             }
             Self::Internal(msg) => write!(f, "internal error: {msg}"),
-            Self::PostingNotActive(id) => write!(f, "posting not active: {id:?}"),
-            Self::PostingInactive(id) => write!(f, "posting is void (inactive): {id:?}"),
-            Self::Conflict { account, asset } => {
-                write!(f, "cas conflict on {account:?}/{asset:?}: balance changed")
-            }
-            Self::ReservationMismatch(id) => write!(f, "reservation mismatch on posting {id:?}"),
         }
     }
 }

+ 18 - 3
crates/kuatia-storage/src/events.rs

@@ -48,12 +48,27 @@ pub struct LedgerEvent {
     pub kind: LedgerEventKind,
 }
 
+/// The idempotency key for an event, if it has a natural one. Replayable events
+/// (a committed transfer, re-driven by saga recovery) dedup on their transfer
+/// id; events with no natural identity (account lifecycle) return `None` and may
+/// recur.
+pub fn event_dedup_key(kind: &LedgerEventKind) -> Option<EnvelopeId> {
+    match kind {
+        LedgerEventKind::TransferCommitted { transfer_id } => Some(*transfer_id),
+        LedgerEventKind::AccountCreated { .. }
+        | LedgerEventKind::AccountFrozen { .. }
+        | LedgerEventKind::AccountUnfrozen { .. }
+        | LedgerEventKind::AccountClosed { .. } => None,
+    }
+}
+
 /// Persistent event log for ledger events.
 #[async_trait]
 pub trait EventStore: Send + Sync {
-    /// Append an event and return its assigned sequence number.
-    ///
-    /// The `seq` field on the input event is ignored -- the store assigns it.
+    /// Append an event and return its sequence number. Idempotent on the event's
+    /// [`event_dedup_key`]: appending an event whose key already exists does not
+    /// insert a duplicate and returns the existing seq. The `seq` field on the
+    /// input is ignored -- the store assigns it.
     async fn append_event(&self, event: &LedgerEvent) -> Result<u64, StoreError>;
 
     /// Return events with sequence numbers greater than `after_seq`, up to `limit`.

+ 89 - 152
crates/kuatia-storage/src/mem_store.rs

@@ -8,15 +8,14 @@ use tokio::sync::RwLock;
 
 use kuatia_types::autoid::AutoId;
 use kuatia_types::{
-    Account, AccountId, AssetId, Book, BookId, Cent, EnvelopeId, Posting, PostingId, PostingStatus,
+    Account, AccountId, AssetId, Book, BookId, EnvelopeId, Posting, PostingId, PostingStatus,
     ReservationId,
 };
 
 use crate::error::StoreError;
 use crate::events::{EventStore, LedgerEvent};
 use crate::store::{
-    AccountStore, BookStore, CommitRequest, CommitStore, EnvelopeRecord, PostingStore, SagaStore,
-    TransferStore,
+    AccountStore, BookStore, EnvelopeRecord, PostingStore, SagaStore, TransferStore,
 };
 
 /// In-memory [`Store`](crate::store::Store) implementation backed by `RwLock<HashMap>`.
@@ -168,56 +167,82 @@ impl PostingStore for InMemoryStore {
         &self,
         ids: &[PostingId],
         reservation: ReservationId,
-    ) -> Result<(), StoreError> {
+    ) -> Result<u64, StoreError> {
         let mut postings = self.postings.write().await;
+        let mut reserved: u64 = 0;
         for id in ids {
-            let posting = postings
-                .get(id)
-                .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
-            if posting.status != PostingStatus::Active {
-                return Err(StoreError::PostingNotActive(*id));
+            let Some(posting) = postings.get_mut(id) else {
+                continue; // dumb: a missing row just doesn't count
+            };
+            if posting.status == PostingStatus::Active {
+                posting.status = PostingStatus::PendingInactive;
+                posting.reservation = Some(reservation);
+                reserved += 1;
             }
         }
-        for id in ids {
-            let posting = postings
-                .get_mut(id)
-                .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
-            posting.status = PostingStatus::PendingInactive;
-            posting.reservation = Some(reservation);
-        }
-        Ok(())
+        Ok(reserved)
     }
 
     async fn release_postings(
         &self,
         ids: &[PostingId],
         reservation: ReservationId,
-    ) -> Result<(), StoreError> {
+    ) -> Result<u64, StoreError> {
         let mut postings = self.postings.write().await;
+        let mut released: u64 = 0;
         for id in ids {
-            let posting = postings
-                .get(id)
-                .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
-            match posting.status {
-                PostingStatus::Inactive => return Err(StoreError::PostingInactive(*id)),
-                PostingStatus::PendingInactive if posting.reservation != Some(reservation) => {
-                    return Err(StoreError::ReservationMismatch(*id));
-                }
-                _ => {}
+            let Some(posting) = postings.get_mut(id) else {
+                continue;
+            };
+            if posting.status == PostingStatus::PendingInactive
+                && posting.reservation == Some(reservation)
+            {
+                posting.status = PostingStatus::Active;
+                posting.reservation = None;
+                released += 1;
             }
         }
+        Ok(released)
+    }
+
+    async fn deactivate_postings(
+        &self,
+        ids: &[PostingId],
+        reservation: Option<ReservationId>,
+    ) -> Result<u64, StoreError> {
+        let mut postings = self.postings.write().await;
+        let mut changed: u64 = 0;
         for id in ids {
-            let posting = postings
-                .get_mut(id)
-                .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
-            if posting.status == PostingStatus::PendingInactive {
-                posting.status = PostingStatus::Active;
+            let Some(posting) = postings.get_mut(id) else {
+                continue; // dumb: a missing row just doesn't count
+            };
+            let matches = match reservation {
+                None => posting.status == PostingStatus::Active,
+                Some(rid) => {
+                    posting.status == PostingStatus::PendingInactive
+                        && posting.reservation == Some(rid)
+                }
+            };
+            if matches {
+                posting.status = PostingStatus::Inactive;
                 posting.reservation = None;
+                changed += 1;
             }
         }
-        Ok(())
+        Ok(changed)
     }
 
+    async fn insert_postings(&self, postings: &[Posting]) -> Result<u64, StoreError> {
+        let mut store = self.postings.write().await;
+        let mut inserted: u64 = 0;
+        for posting in postings {
+            if !store.contains_key(&posting.id) {
+                store.insert(posting.id, posting.clone());
+                inserted += 1;
+            }
+        }
+        Ok(inserted)
+    }
 }
 
 // ---------------------------------------------------------------------------
@@ -231,12 +256,29 @@ impl TransferStore for InMemoryStore {
         Ok(transfers.get(id).cloned())
     }
 
+    async fn store_transfer(
+        &self,
+        record: EnvelopeRecord,
+        _involved: &[AccountId],
+    ) -> Result<u64, StoreError> {
+        // `_involved` is ignored here: `get_transfers_for_account` derives the
+        // involved accounts from the stored envelope (creates owners + consumed
+        // posting owners), which matches the set the caller passes. The SQL
+        // backend instead persists `involved` into its `transfer_accounts` index.
+        let mut transfers = self.transfers.write().await;
+        if transfers.contains_key(&record.receipt.transfer_id) {
+            return Ok(0);
+        }
+        transfers.insert(record.receipt.transfer_id, record);
+        Ok(1)
+    }
+
     async fn get_transfers_for_account(
         &self,
         account: &AccountId,
     ) -> Result<Vec<EnvelopeRecord>, StoreError> {
-        // Lock order postings → transfers must match `commit_transfer` to avoid
-        // an AB–BA deadlock.
+        // Acquire postings → transfers in a consistent order to avoid an AB–BA
+        // deadlock with any reader that takes both.
         let postings = self.postings.read().await;
         let transfers = self.transfers.read().await;
         let mut result: Vec<EnvelopeRecord> = transfers
@@ -291,14 +333,22 @@ impl SagaStore for InMemoryStore {
 #[async_trait]
 impl EventStore for InMemoryStore {
     async fn append_event(&self, event: &LedgerEvent) -> Result<u64, StoreError> {
-        let seq = self.autoid.next() as u64;
         let mut events = self.events.write().await;
-        let stored = LedgerEvent {
+        // Idempotent on the dedup key: a replayed transfer event returns the
+        // existing seq instead of inserting a duplicate.
+        if let Some(key) = crate::events::event_dedup_key(&event.kind)
+            && let Some(existing) = events
+                .iter()
+                .find(|e| crate::events::event_dedup_key(&e.kind) == Some(key))
+        {
+            return Ok(existing.seq);
+        }
+        let seq = self.autoid.next() as u64;
+        events.push(LedgerEvent {
             seq,
             timestamp: event.timestamp,
             kind: event.kind.clone(),
-        };
-        events.push(stored);
+        });
         Ok(seq)
     }
 
@@ -345,116 +395,3 @@ impl BookStore for InMemoryStore {
     }
 }
 
-// ---------------------------------------------------------------------------
-// CommitStore
-// ---------------------------------------------------------------------------
-
-#[async_trait]
-impl CommitStore for InMemoryStore {
-    async fn commit_transfer(&self, req: CommitRequest<'_>) -> Result<(), StoreError> {
-        // Lock order accounts → postings → transfers → events; every reader that
-        // takes more than one of these must follow the same order. Holding the
-        // accounts read lock for the whole commit keeps the version guard (step
-        // 2b) atomic against a concurrent lifecycle mutation.
-        let accounts = self.accounts.read().await;
-        let mut postings = self.postings.write().await;
-        let mut transfers = self.transfers.write().await;
-        let mut events = self.events.write().await;
-
-        let tid = req.record.receipt.transfer_id;
-
-        // 1. Idempotency: a prior attempt already committed this transfer.
-        if transfers.contains_key(&tid) {
-            return Ok(());
-        }
-
-        // 2. CAS guards — recompute each balance (Σ non-Inactive postings) before
-        //    any mutation, matching how validation snapshotted it.
-        for (account, asset, expected) in req.cas_guards {
-            let balance = Cent::checked_sum(
-                postings
-                    .values()
-                    .filter(|p| {
-                        p.owner == *account
-                            && p.asset == *asset
-                            && p.status != PostingStatus::Inactive
-                    })
-                    .map(|p| p.value),
-            )
-            .map_err(|_| StoreError::Internal("balance overflow during cas".into()))?;
-            if balance != *expected {
-                return Err(StoreError::Conflict {
-                    account: *account,
-                    asset: *asset,
-                });
-            }
-        }
-
-        // 2b. Account version guards — a concurrent freeze/unfreeze/close bumps
-        //     the version, invalidating the snapshot pinned at validation.
-        for (account, expected) in req.account_guards {
-            let actual = accounts
-                .get(account)
-                .and_then(|versions| versions.last())
-                .map(|a| a.version)
-                .ok_or(StoreError::NotFound(format!("account {account:?}")))?;
-            if actual != *expected {
-                return Err(StoreError::VersionConflict {
-                    account: *account,
-                    expected: *expected,
-                    actual,
-                });
-            }
-        }
-
-        // 3. Authorize every consumed posting against the reservation.
-        for pid in req.deactivate {
-            let posting = postings
-                .get(pid)
-                .ok_or(StoreError::ReservationMismatch(*pid))?;
-            match req.reservation {
-                None => {
-                    if posting.status != PostingStatus::Active {
-                        return Err(StoreError::ReservationMismatch(*pid));
-                    }
-                }
-                Some(rid) => {
-                    if posting.status != PostingStatus::PendingInactive
-                        || posting.reservation != Some(rid)
-                    {
-                        return Err(StoreError::ReservationMismatch(*pid));
-                    }
-                }
-            }
-        }
-
-        // 4. Deactivate consumed postings.
-        for pid in req.deactivate {
-            let posting = postings
-                .get_mut(pid)
-                .ok_or(StoreError::ReservationMismatch(*pid))?;
-            posting.status = PostingStatus::Inactive;
-            posting.reservation = None;
-        }
-
-        // 5. Insert created postings.
-        for posting in req.create {
-            postings.insert(posting.id, posting.clone());
-        }
-
-        // 6. Persist the transfer record.
-        transfers.insert(tid, req.record);
-
-        // 7. Append events in-transaction, assigning sequence numbers.
-        for event in req.events {
-            let seq = self.autoid.next() as u64;
-            events.push(LedgerEvent {
-                seq,
-                timestamp: event.timestamp,
-                kind: event.kind.clone(),
-            });
-        }
-
-        Ok(())
-    }
-}

+ 49 - 64
crates/kuatia-storage/src/store.rs

@@ -1,22 +1,24 @@
 //! Storage abstraction separating the pure decision logic from IO.
 //!
-//! The [`Store`] trait composes six focused sub-traits:
+//! The [`Store`] trait composes focused sub-traits, each a dumb instruction
+//! follower: writes apply one update and report an affected-row count (or an I/O
+//! error). The saga, not the store, interprets counts and owns idempotency and
+//! compensation.
 //! - [`AccountStore`] — account CRUD and versioning
 //! - [`PostingStore`] — posting reads and lifecycle transitions
 //! - [`TransferStore`] — transfer persistence and queries
 //! - [`SagaStore`] — saga state for crash recovery
 //! - [`EventStore`](crate::events::EventStore) — the ledger event log
 //! - [`BookStore`] — book persistence
-//! - [`CommitStore`] — the single atomic commit boundary
 
 use async_trait::async_trait;
 use kuatia_types::{
-    Account, AccountId, AssetId, Book, BookId, Cent, Envelope, EnvelopeId, Posting, PostingId,
+    Account, AccountId, AssetId, Book, BookId, Envelope, EnvelopeId, Posting, PostingId,
     PostingStatus, Receipt, ReservationId,
 };
 
 use crate::error::StoreError;
-use crate::events::{EventStore, LedgerEvent};
+use crate::events::EventStore;
 
 /// Pairs a committed transfer with its receipt.
 #[derive(Debug, Clone)]
@@ -29,31 +31,6 @@ pub struct EnvelopeRecord {
     pub created_at: i64,
 }
 
-/// Everything one atomic commit must persist together. Carries decomposed
-/// primitives (not `kuatia_core::Plan`) so this crate need not depend on the
-/// pure-core crate.
-pub struct CommitRequest<'a> {
-    /// Consumed postings to mark `Inactive`.
-    pub deactivate: &'a [PostingId],
-    /// New postings to insert (already `Active`, from the validated plan).
-    pub create: &'a [Posting],
-    /// `(account, asset, expected_balance)` guards to verify before mutating —
-    /// a mismatch means a concurrent transfer moved the balance ([`StoreError::Conflict`]).
-    pub cas_guards: &'a [(AccountId, AssetId, Cent)],
-    /// `(account, expected_version)` guards re-checked atomically at commit — a
-    /// mismatch means a concurrent lifecycle mutation (freeze/unfreeze/close)
-    /// bumped the account version after validation ([`StoreError::VersionConflict`]).
-    pub account_guards: &'a [(AccountId, u64)],
-    /// Reservation authorizing consumption of `deactivate`.
-    /// - `None` — raw path: the postings must be `Active`.
-    /// - `Some(rid)` — saga path: the postings must be `PendingInactive` owned by `rid`.
-    pub reservation: Option<ReservationId>,
-    /// The transfer record to persist.
-    pub record: EnvelopeRecord,
-    /// Events to append within the same transaction (e.g. `TransferCommitted`).
-    pub events: &'a [LedgerEvent],
-}
-
 /// Pagination and filtering parameters for posting queries.
 #[derive(Debug, Clone)]
 pub struct PostingQuery {
@@ -128,24 +105,44 @@ pub trait PostingStore: Send + Sync {
         asset: Option<&AssetId>,
         status: Option<PostingStatus>,
     ) -> Result<Vec<Posting>, StoreError>;
-    /// Reserve postings: Active → PendingInactive, stamping `reservation` as the
-    /// owner token. Atomic: if any posting is not Active, the entire batch fails.
+    /// Reserve postings: `Active → PendingInactive`, stamping `reservation` as
+    /// the owner token. A dumb instruction — each id flips only if still `Active`;
+    /// returns the **number of rows reserved** (0 ≤ n ≤ ids.len()). It does not
+    /// error on a short count; the caller (saga) interprets it.
     async fn reserve_postings(
         &self,
         ids: &[PostingId],
         reservation: ReservationId,
-    ) -> Result<(), StoreError>;
-    /// Release postings reserved under `reservation`, back from reservation.
-    /// - PendingInactive owned by `reservation` → Active (clears the owner)
-    /// - PendingInactive owned by a different reservation → fail ([`StoreError::ReservationMismatch`])
-    /// - Active → no-op (already released)
-    /// - Inactive → fail (void posting cannot be released)
-    /// Atomic: if any posting fails its check, the entire batch fails.
+    ) -> Result<u64, StoreError>;
+    /// Release postings: `PendingInactive` owned by `reservation` → `Active`,
+    /// clearing the owner. A dumb instruction — only postings reserved by this
+    /// `reservation` flip; returns the **number of rows released**. Releasing an
+    /// `Active` (already released) or differently-owned posting simply does not
+    /// count. The caller interprets the result.
     async fn release_postings(
         &self,
         ids: &[PostingId],
         reservation: ReservationId,
-    ) -> Result<(), StoreError>;
+    ) -> Result<u64, StoreError>;
+
+    /// Deactivate postings: flip to `Inactive`. A dumb instruction — it applies
+    /// the conditional update and returns the **number of rows changed**; it does
+    /// not decide whether that count is correct. The caller (saga) interprets it.
+    /// - `reservation == None` (raw): only postings still `Active` flip.
+    /// - `reservation == Some(rid)`: only postings `PendingInactive` owned by
+    ///   `rid` flip.
+    /// Returns the count of postings actually transitioned (0 ≤ n ≤ ids.len()).
+    async fn deactivate_postings(
+        &self,
+        ids: &[PostingId],
+        reservation: Option<ReservationId>,
+    ) -> Result<u64, StoreError>;
+
+    /// Insert postings if absent (idempotent). A dumb instruction — inserts each
+    /// posting unless one with the same id already exists, and returns the
+    /// **number of rows inserted** (already-present postings contribute 0). The
+    /// caller decides what a short count means.
+    async fn insert_postings(&self, postings: &[Posting]) -> Result<u64, StoreError>;
 
     /// Query postings with filtering and pagination.
     async fn query_postings(&self, query: &PostingQuery) -> Result<Page<Posting>, StoreError> {
@@ -169,6 +166,16 @@ pub trait PostingStore: Send + Sync {
 pub trait TransferStore: Send + Sync {
     /// Fetch a transfer record by its content-addressed id.
     async fn get_transfer(&self, id: &EnvelopeId) -> Result<Option<EnvelopeRecord>, StoreError>;
+    /// Persist a transfer record if absent (idempotent) and index it under every
+    /// account in `involved` (both created and consumed owners — the caller
+    /// supplies the set so storage computes nothing). A dumb instruction:
+    /// returns **1** if the transfer row was newly inserted, **0** if it already
+    /// existed. The caller decides what `0` means.
+    async fn store_transfer(
+        &self,
+        record: EnvelopeRecord,
+        involved: &[AccountId],
+    ) -> Result<u64, StoreError>;
     /// Return all transfers involving the given account.
     async fn get_transfers_for_account(
         &self,
@@ -242,39 +249,17 @@ pub trait BookStore: Send + Sync {
     async fn list_books(&self) -> Result<Vec<Book>, StoreError>;
 }
 
-/// The single atomic commit boundary — the one place ledger state changes.
-#[async_trait]
-pub trait CommitStore: Send + Sync {
-    /// Apply a validated transfer atomically: enforce CAS guards, authorize and
-    /// deactivate consumed postings, insert created postings, persist the
-    /// transfer record (indexed by **both** created and consumed account owners),
-    /// and append the events — all in one critical section.
-    ///
-    /// Idempotent on the transfer id: if already committed, returns `Ok(())`.
-    /// Returns [`StoreError::Conflict`] (retryable) if a guard balance changed,
-    /// or [`StoreError::ReservationMismatch`] if a consumed posting is not owned
-    /// as `req.reservation` requires.
-    async fn commit_transfer(&self, req: CommitRequest<'_>) -> Result<(), StoreError>;
-}
-
 // ---------------------------------------------------------------------------
 // Composite trait
 // ---------------------------------------------------------------------------
 
 /// Async storage abstraction composing all sub-traits.
 pub trait Store:
-    AccountStore + PostingStore + TransferStore + SagaStore + EventStore + BookStore + CommitStore
+    AccountStore + PostingStore + TransferStore + SagaStore + EventStore + BookStore
 {
 }
 
-impl<
-    T: AccountStore
-        + PostingStore
-        + TransferStore
-        + SagaStore
-        + EventStore
-        + BookStore
-        + CommitStore,
-> Store for T
+impl<T: AccountStore + PostingStore + TransferStore + SagaStore + EventStore + BookStore> Store
+    for T
 {
 }

+ 134 - 317
crates/kuatia-storage/src/store_tests.rs

@@ -96,48 +96,15 @@ fn make_envelope() -> (Envelope, EnvelopeId) {
     (t, tid)
 }
 
-/// Seed `create` as Active postings. Since the split write APIs are gone,
-/// `commit_transfer` is the only mutation path; we wrap it in a throwaway
-/// transfer whose envelope mirrors the seeded postings so both the SQL index
-/// (`transfer_accounts`, built from `req.create`) and the InMemory index (built
-/// from the envelope) stay consistent. `tag` keeps the seed transfer id unique
-/// within a test so idempotency doesn't swallow the insert.
-async fn seed_active(store: &(impl Store + 'static), tag: u8, create: &[Posting]) {
-    let creates: Vec<NewPosting> = create
-        .iter()
-        .map(|p| NewPosting {
-            owner: p.owner,
-            asset: p.asset,
-            value: p.value,
-            payer: None,
-        })
-        .collect();
-    let envelope = EnvelopeBuilder::new().creates(creates).build();
-    let mut tid_bytes = [0u8; 32];
-    tid_bytes[0] = tag;
-    let tid = EnvelopeId(tid_bytes);
-    store
-        .commit_transfer(CommitRequest {
-            deactivate: &[],
-            create,
-            cas_guards: &[],
-            account_guards: &[],
-            reservation: None,
-            record: EnvelopeRecord {
-                envelope,
-                receipt: Receipt { transfer_id: tid },
-                created_at: 0,
-            },
-            events: &[],
-        })
-        .await
-        .unwrap();
+/// Seed `create` as Active postings via the dumb `insert_postings` primitive.
+/// `tag` is unused now (kept so existing call sites read unchanged).
+async fn seed_active(store: &(impl Store + 'static), _tag: u8, create: &[Posting]) {
+    store.insert_postings(create).await.unwrap();
 }
 
 /// Persist `envelope` as a committed transfer, deriving its created postings the
-/// way the ledger does (`PostingId { transfer: tid, index }`). The faithful
-/// replacement for the removed `store_transfer` — it populates the account index
-/// on both backends.
+/// way the ledger does (`PostingId { transfer: tid, index }`) and indexing the
+/// created owners — the same shape the saga produces.
 async fn commit_envelope(
     store: &(impl Store + 'static),
     envelope: Envelope,
@@ -160,20 +127,19 @@ async fn commit_envelope(
             )
         })
         .collect();
+    let mut involved: Vec<AccountId> = create.iter().map(|p| p.owner).collect();
+    involved.sort();
+    involved.dedup();
+    store.insert_postings(&create).await.unwrap();
     store
-        .commit_transfer(CommitRequest {
-            deactivate: &[],
-            create: &create,
-            cas_guards: &[],
-            account_guards: &[],
-            reservation: None,
-            record: EnvelopeRecord {
+        .store_transfer(
+            EnvelopeRecord {
                 envelope,
                 receipt: Receipt { transfer_id: tid },
                 created_at,
             },
-            events: &[],
-        })
+            &involved,
+        )
         .await
         .unwrap();
 }
@@ -408,19 +374,25 @@ pub async fn reserve_postings_batch(store: &(impl Store + 'static)) {
     );
 }
 
-/// Reserve fails if any posting is not Active — no partial mutation.
-pub async fn reserve_non_active_fails(store: &(impl Store + 'static)) {
+/// Reserve only flips the still-Active postings and reports that count; an
+/// already-reserved posting in the batch is skipped (the saga interprets the
+/// short count).
+pub async fn reserve_skips_non_active(store: &(impl Store + 'static)) {
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
     let p2 = make_posting([1; 32], 1, 1, 1, 200);
     seed_active(store, 200, &[p1.clone(), p2.clone()]).await;
 
-    store.reserve_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
-
-    let err = store.reserve_postings(&[p1.id, p2.id], ReservationId::new(1)).await.unwrap_err();
-    assert!(matches!(err, StoreError::PostingNotActive(_)));
+    assert_eq!(store.reserve_postings(&[p1.id], ReservationId::new(1)).await.unwrap(), 1);
 
-    let got = store.get_postings(&[p2.id]).await.unwrap();
-    assert_eq!(got[0].status, PostingStatus::Active);
+    // p1 already PendingInactive → only p2 (still Active) reserves.
+    assert_eq!(
+        store.reserve_postings(&[p1.id, p2.id], ReservationId::new(1)).await.unwrap(),
+        1
+    );
+    assert_eq!(
+        store.get_postings(&[p2.id]).await.unwrap()[0].status,
+        PostingStatus::PendingInactive
+    );
 }
 
 /// Release reserved postings back to Active.
@@ -446,31 +418,23 @@ pub async fn release_active_is_noop(store: &(impl Store + 'static)) {
     assert_eq!(got[0].status, PostingStatus::Active);
 }
 
-/// Releasing an Inactive (void) posting fails.
-pub async fn release_inactive_fails(store: &(impl Store + 'static)) {
+/// Releasing an Inactive (void) posting is a no-op: zero rows released.
+pub async fn release_inactive_zero(store: &(impl Store + 'static)) {
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
     seed_active(store, 200, std::slice::from_ref(&p1)).await;
 
     // Deactivate p1 (raw path: still Active) so the release sees a void posting.
-    store
-        .commit_transfer(CommitRequest {
-            deactivate: &[p1.id],
-            create: &[],
-            cas_guards: &[],
-            account_guards: &[],
-            reservation: None,
-            record: commit_record(EnvelopeId([3; 32]), vec![p1.id]),
-            events: &[],
-        })
-        .await
-        .unwrap();
+    assert_eq!(store.deactivate_postings(&[p1.id], None).await.unwrap(), 1);
 
-    let err = store.release_postings(&[p1.id], ReservationId::new(1)).await.unwrap_err();
-    assert!(matches!(err, StoreError::PostingInactive(_)));
+    assert_eq!(store.release_postings(&[p1.id], ReservationId::new(1)).await.unwrap(), 0);
+    assert_eq!(
+        store.get_postings(&[p1.id]).await.unwrap()[0].status,
+        PostingStatus::Inactive
+    );
 }
 
-/// Committing a reserved posting transitions it PendingInactive → Inactive while
-/// inserting the newly created posting.
+/// Deactivating a reserved posting (saga path) transitions it
+/// PendingInactive → Inactive while a separate insert adds the created posting.
 pub async fn commit_deactivates_postings(store: &(impl Store + 'static)) {
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
     seed_active(store, 200, std::slice::from_ref(&p1)).await;
@@ -478,18 +442,11 @@ pub async fn commit_deactivates_postings(store: &(impl Store + 'static)) {
 
     let p2 = make_posting([2; 32], 0, 1, 1, 100);
     // Saga path: p1 is PendingInactive owned by reservation 1.
-    store
-        .commit_transfer(CommitRequest {
-            deactivate: &[p1.id],
-            create: std::slice::from_ref(&p2),
-            cas_guards: &[],
-            account_guards: &[],
-            reservation: Some(ReservationId::new(1)),
-            record: commit_record(EnvelopeId([2; 32]), vec![p1.id]),
-            events: &[],
-        })
-        .await
-        .unwrap();
+    assert_eq!(
+        store.deactivate_postings(&[p1.id], Some(ReservationId::new(1))).await.unwrap(),
+        1
+    );
+    store.insert_postings(std::slice::from_ref(&p2)).await.unwrap();
 
     let got = store.get_postings(&[p1.id]).await.unwrap();
     assert_eq!(got[0].status, PostingStatus::Inactive);
@@ -499,256 +456,116 @@ pub async fn commit_deactivates_postings(store: &(impl Store + 'static)) {
 }
 
 // ---------------------------------------------------------------------------
-// CommitStore tests
+// Dumb count-returning primitives (storage reports counts, never interprets)
 // ---------------------------------------------------------------------------
 
-/// Build a transfer record that spends `consumed` (owned by account 1) entirely
-/// into a new posting owned by account 2, with the given transfer id.
-fn commit_record(tid: EnvelopeId, consumes: Vec<PostingId>) -> EnvelopeRecord {
-    let envelope = EnvelopeBuilder::new()
-        .consumes(consumes)
-        .creates(vec![NewPosting {
-            owner: AccountId::new(2),
-            asset: AssetId::new(1),
-            value: Cent::from(100),
-            payer: None,
-        }])
-        .build();
-    EnvelopeRecord {
-        envelope,
-        receipt: Receipt { transfer_id: tid },
-        created_at: 1000,
-    }
+/// `insert_postings` reports how many rows were newly inserted; already-present
+/// postings contribute zero (idempotent).
+pub async fn insert_postings_counts(store: &(impl Store + 'static)) {
+    let p1 = make_posting([3; 32], 0, 1, 1, 100);
+    let p2 = make_posting([3; 32], 1, 1, 1, 200);
+    assert_eq!(store.insert_postings(std::slice::from_ref(&p1)).await.unwrap(), 1);
+    // p1 already present, p2 new → 1
+    assert_eq!(store.insert_postings(&[p1.clone(), p2.clone()]).await.unwrap(), 1);
+    // both present → 0
+    assert_eq!(store.insert_postings(&[p1, p2]).await.unwrap(), 0);
+}
+
+/// `deactivate_postings` (raw path) flips Active→Inactive and reports the count;
+/// a replay over already-Inactive postings reports zero.
+pub async fn deactivate_postings_counts(store: &(impl Store + 'static)) {
+    let p1 = make_posting([4; 32], 0, 1, 1, 100);
+    let p2 = make_posting([4; 32], 1, 1, 1, 200);
+    store.insert_postings(&[p1.clone(), p2.clone()]).await.unwrap();
+
+    assert_eq!(store.deactivate_postings(&[p1.id, p2.id], None).await.unwrap(), 2);
+    // replay: already Inactive → 0
+    assert_eq!(store.deactivate_postings(&[p1.id, p2.id], None).await.unwrap(), 0);
+    assert_eq!(
+        store.get_postings(&[p1.id]).await.unwrap()[0].status,
+        PostingStatus::Inactive
+    );
 }
 
-/// commit_transfer applies postings, transfer, account index (both sides), and
-/// events atomically; the consumed-only owner is indexed for history.
-pub async fn commit_transfer_atomic(store: &(impl Store + 'static)) {
-    let consumed = make_posting([7; 32], 0, 1, 1, 100); // owned by account 1
-    seed_active(store, 200, std::slice::from_ref(&consumed)).await;
-
-    let created = make_posting([8; 32], 0, 2, 1, 100); // owned by account 2
-    let tid = EnvelopeId([8; 32]);
-    let events = [LedgerEvent {
-        seq: 0,
-        timestamp: 1000,
-        kind: LedgerEventKind::TransferCommitted { transfer_id: tid },
-    }];
-    store
-        .commit_transfer(CommitRequest {
-            deactivate: &[consumed.id],
-            create: std::slice::from_ref(&created),
-            cas_guards: &[],
-            account_guards: &[],
-            reservation: None,
-            record: commit_record(tid, vec![consumed.id]),
-            events: &events,
-        })
-        .await
-        .unwrap();
+/// `deactivate_postings` (saga path) only flips postings reserved by the given
+/// reservation; a non-matching reservation reports zero.
+pub async fn deactivate_postings_saga_path(store: &(impl Store + 'static)) {
+    let p1 = make_posting([5; 32], 0, 1, 1, 100);
+    store.insert_postings(std::slice::from_ref(&p1)).await.unwrap();
+    store.reserve_postings(&[p1.id], ReservationId::new(7)).await.unwrap();
 
-    // Consumed posting is now void; created posting exists and is active.
+    // wrong reservation → 0 (storage doesn't error; the saga decides)
     assert_eq!(
-        store.get_postings(&[consumed.id]).await.unwrap()[0].status,
-        PostingStatus::Inactive
+        store.deactivate_postings(&[p1.id], Some(ReservationId::new(8))).await.unwrap(),
+        0
     );
+    // right reservation → 1
     assert_eq!(
-        store.get_postings(&[created.id]).await.unwrap()[0].status,
-        PostingStatus::Active
+        store.deactivate_postings(&[p1.id], Some(ReservationId::new(7))).await.unwrap(),
+        1
     );
+}
 
-    // Transfer record is retrievable.
-    assert!(store.get_transfer(&tid).await.unwrap().is_some());
+/// `store_transfer` returns 1 when the record is newly inserted, 0 on replay,
+/// and indexes the involved accounts.
+pub async fn store_transfer_counts(store: &(impl Store + 'static)) {
+    let (envelope, tid) = make_envelope(); // creates owners 1 and 99
+    let record = EnvelopeRecord {
+        envelope,
+        receipt: Receipt { transfer_id: tid },
+        created_at: 1000,
+    };
+    let involved = [AccountId::new(1), AccountId::new(99)];
 
-    // History indexes BOTH the created owner (2) and the consumed-only owner (1).
-    // Account 2 appears only in this transfer; account 1 appears here and in the
-    // seed transfer that funded it, so its history contains this transfer.
+    assert_eq!(store.store_transfer(record.clone(), &involved).await.unwrap(), 1);
+    // replay → 0
+    assert_eq!(store.store_transfer(record, &involved).await.unwrap(), 0);
+    assert!(store.get_transfer(&tid).await.unwrap().is_some());
     assert_eq!(
-        store
-            .get_transfers_for_account(&AccountId::new(2))
-            .await
-            .unwrap()
-            .len(),
+        store.get_transfers_for_account(&AccountId::new(1)).await.unwrap().len(),
         1
     );
-    assert!(
-        store
-            .get_transfers_for_account(&AccountId::new(1))
-            .await
-            .unwrap()
-            .iter()
-            .any(|r| r.receipt.transfer_id == tid)
-    );
-
-    // The event was appended in the same commit.
-    assert_eq!(store.get_events_since(0, 10).await.unwrap().len(), 1);
-}
-
-/// A second commit of the same transfer id is a no-op (idempotent).
-pub async fn commit_transfer_idempotent(store: &(impl Store + 'static)) {
-    let consumed = make_posting([7; 32], 0, 1, 1, 100);
-    seed_active(store, 200, std::slice::from_ref(&consumed)).await;
-    let created = make_posting([8; 32], 0, 2, 1, 100);
-    let tid = EnvelopeId([8; 32]);
-    store
-        .commit_transfer(CommitRequest {
-            deactivate: &[],
-            create: std::slice::from_ref(&created),
-            cas_guards: &[],
-            account_guards: &[],
-            reservation: None,
-            record: commit_record(tid, vec![]),
-            events: &[],
-        })
-        .await
-        .unwrap();
-    // Second commit returns Ok without inserting a duplicate posting/event.
-    store
-        .commit_transfer(CommitRequest {
-            deactivate: &[],
-            create: std::slice::from_ref(&created),
-            cas_guards: &[],
-            account_guards: &[],
-            reservation: None,
-            record: commit_record(tid, vec![]),
-            events: &[],
-        })
-        .await
-        .unwrap();
-    assert!(store.get_events_since(0, 10).await.unwrap().is_empty());
-}
-
-/// commit_transfer rejects consuming a posting reserved by a different saga.
-pub async fn commit_transfer_reservation_mismatch(store: &(impl Store + 'static)) {
-    let consumed = make_posting([7; 32], 0, 1, 1, 100);
-    seed_active(store, 200, std::slice::from_ref(&consumed)).await;
-    // Reserved under reservation 1.
-    store
-        .reserve_postings(&[consumed.id], ReservationId::new(1))
-        .await
-        .unwrap();
-
-    let created = make_posting([8; 32], 0, 2, 1, 100);
-    let tid = EnvelopeId([8; 32]);
-    // Committing under reservation 2 must fail.
-    let err = store
-        .commit_transfer(CommitRequest {
-            deactivate: &[consumed.id],
-            create: std::slice::from_ref(&created),
-            cas_guards: &[],
-            account_guards: &[],
-            reservation: Some(ReservationId::new(2)),
-            record: commit_record(tid, vec![consumed.id]),
-            events: &[],
-        })
-        .await
-        .unwrap_err();
-    assert!(matches!(err, StoreError::ReservationMismatch(_)));
-}
-
-/// commit_transfer aborts with Conflict when a CAS guard's balance is stale.
-pub async fn commit_transfer_cas_conflict(store: &(impl Store + 'static)) {
-    let consumed = make_posting([7; 32], 0, 1, 1, 100);
-    seed_active(store, 200, std::slice::from_ref(&consumed)).await;
-    let created = make_posting([8; 32], 0, 2, 1, 100);
-    let tid = EnvelopeId([8; 32]);
-    // Guard claims account 1 holds 50, but it actually holds 100.
-    let err = store
-        .commit_transfer(CommitRequest {
-            deactivate: &[consumed.id],
-            create: std::slice::from_ref(&created),
-            cas_guards: &[(AccountId::new(1), AssetId::new(1), Cent::from(50))],
-            account_guards: &[],
-            reservation: None,
-            record: commit_record(tid, vec![consumed.id]),
-            events: &[],
-        })
-        .await
-        .unwrap_err();
-    assert!(matches!(err, StoreError::Conflict { .. }));
-    // The transfer was not committed.
-    assert!(store.get_transfer(&tid).await.unwrap().is_none());
 }
 
 // ---------------------------------------------------------------------------
-// Race regressions — the conditional-update / guard fixes. Expressed
-// sequentially (the conformance harness holds a single `&store`); the second
-// attempt is what must fail.
+// Reservation / double-spend regressions (sequential — the conformance harness
+// holds a single `&store`; the second attempt is what must report zero).
 // ---------------------------------------------------------------------------
 
-/// Reserving an already-reserved posting fails — no two reservations can own it.
-pub async fn reserve_twice_second_fails(store: &(impl Store + 'static)) {
+/// A posting reserved by one reservation cannot be reserved by another: the
+/// second reserve flips zero rows (the saga reads the count to know it lost).
+pub async fn reserve_twice_second_zero(store: &(impl Store + 'static)) {
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
     seed_active(store, 200, std::slice::from_ref(&p1)).await;
 
-    store.reserve_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
-    let err = store
-        .reserve_postings(&[p1.id], ReservationId::new(2))
-        .await
-        .unwrap_err();
-    assert!(matches!(err, StoreError::PostingNotActive(_)));
+    assert_eq!(store.reserve_postings(&[p1.id], ReservationId::new(1)).await.unwrap(), 1);
+    assert_eq!(store.reserve_postings(&[p1.id], ReservationId::new(2)).await.unwrap(), 0);
 }
 
-/// A posting cannot be consumed twice: once committed (Inactive), a second raw
-/// commit consuming it is rejected — the double-spend guard.
-pub async fn commit_double_spend_second_fails(store: &(impl Store + 'static)) {
+/// A posting cannot be deactivated twice: once Inactive, a second raw deactivate
+/// reports zero — the double-spend guard at the storage layer.
+pub async fn deactivate_twice_second_zero(store: &(impl Store + 'static)) {
     let consumed = make_posting([7; 32], 0, 1, 1, 100);
     seed_active(store, 200, std::slice::from_ref(&consumed)).await;
 
-    let created1 = make_posting([8; 32], 0, 2, 1, 100);
-    store
-        .commit_transfer(CommitRequest {
-            deactivate: &[consumed.id],
-            create: std::slice::from_ref(&created1),
-            cas_guards: &[],
-            account_guards: &[],
-            reservation: None,
-            record: commit_record(EnvelopeId([8; 32]), vec![consumed.id]),
-            events: &[],
-        })
-        .await
-        .unwrap();
-
-    let created2 = make_posting([9; 32], 0, 2, 1, 100);
-    let err = store
-        .commit_transfer(CommitRequest {
-            deactivate: &[consumed.id],
-            create: std::slice::from_ref(&created2),
-            cas_guards: &[],
-            account_guards: &[],
-            reservation: None,
-            record: commit_record(EnvelopeId([9; 32]), vec![consumed.id]),
-            events: &[],
-        })
-        .await
-        .unwrap_err();
-    assert!(matches!(err, StoreError::ReservationMismatch(_)));
+    assert_eq!(store.deactivate_postings(&[consumed.id], None).await.unwrap(), 1);
+    assert_eq!(store.deactivate_postings(&[consumed.id], None).await.unwrap(), 0);
 }
 
-/// A commit whose pinned account version is stale aborts with VersionConflict,
-/// closing the validate→commit window against a concurrent lifecycle mutation.
-pub async fn commit_stale_account_guard_fails(store: &(impl Store + 'static)) {
-    let acc = make_account(1, AccountPolicy::NoOverdraft); // version 1
-    store.create_account(acc.clone()).await.unwrap();
-    // A concurrent freeze/close would bump the version like this.
-    let mut bumped = acc.clone();
-    bumped.version = 2;
-    store.append_account_version(bumped).await.unwrap();
-
-    let created = make_posting([8; 32], 0, 1, 1, 100);
-    let err = store
-        .commit_transfer(CommitRequest {
-            deactivate: &[],
-            create: std::slice::from_ref(&created),
-            cas_guards: &[],
-            account_guards: &[(AccountId::new(1), 1)],
-            reservation: None,
-            record: commit_record(EnvelopeId([8; 32]), vec![]),
-            events: &[],
-        })
-        .await
-        .unwrap_err();
-    assert!(matches!(err, StoreError::VersionConflict { .. }));
+/// `append_event` is idempotent on a transfer's dedup key: re-appending the same
+/// `TransferCommitted` returns the existing seq and does not duplicate the row.
+pub async fn append_event_idempotent(store: &(impl Store + 'static)) {
+    let event = LedgerEvent {
+        seq: 0,
+        timestamp: 1000,
+        kind: LedgerEventKind::TransferCommitted {
+            transfer_id: EnvelopeId([8; 32]),
+        },
+    };
+    let seq1 = store.append_event(&event).await.unwrap();
+    let seq2 = store.append_event(&event).await.unwrap();
+    assert_eq!(seq1, seq2);
+    assert_eq!(store.get_events_since(0, 10).await.unwrap().len(), 1);
 }
 
 // ---------------------------------------------------------------------------
@@ -1032,19 +849,19 @@ macro_rules! store_tests {
             get_postings_by_account_filters,
             query_postings_pagination,
             reserve_postings_batch,
-            reserve_non_active_fails,
+            reserve_skips_non_active,
             release_postings_batch,
             release_active_is_noop,
-            release_inactive_fails,
+            release_inactive_zero,
             commit_deactivates_postings,
-            // CommitStore
-            commit_transfer_atomic,
-            commit_transfer_idempotent,
-            commit_transfer_reservation_mismatch,
-            commit_transfer_cas_conflict,
-            reserve_twice_second_fails,
-            commit_double_spend_second_fails,
-            commit_stale_account_guard_fails,
+            insert_postings_counts,
+            deactivate_postings_counts,
+            deactivate_postings_saga_path,
+            store_transfer_counts,
+            // Reservation / double-spend regressions
+            reserve_twice_second_zero,
+            deactivate_twice_second_zero,
+            append_event_idempotent,
             // TransferStore
             commit_and_get_transfer,
             get_missing_transfer,

+ 1 - 0
crates/kuatia/Cargo.toml

@@ -15,6 +15,7 @@ kuatia-storage = { path = "../kuatia-storage" }
 legend = "0.1"
 tokio = { version = "1", features = ["sync", "rt", "macros"] }
 serde = { version = "1", features = ["derive"] }
+serde_json = "1"
 async-trait = "0.1"
 tracing = "0.1"
 

+ 25 - 11
crates/kuatia/README.md

@@ -25,17 +25,31 @@ let receipt = ledger.commit(transfer).await?;
 | `.withdraw(from, asset, amount, external)` | Send value to an external destination |
 | `.movement(from, to, asset, amount)` | Raw movement for custom operations |
 
-### Saga commit
-
-`commit(transfer)` runs the four-step pipeline with automatic compensation:
-1. **Resolve** — convert Transfer intent into concrete Envelope
-2. **Reserve** — batch CAS: Active → PendingInactive
-3. **Validate** — pure `validate_and_plan()`
-4. **Finalize** — one atomic `commit_transfer`: deactivate consumed postings, create new ones, persist the transfer record and account index, and emit the event — all in a single transaction
-
-### Atomic commit
-
-`commit_atomic(envelope)` — single-pass load → plan → apply. Used by `reverse()`.
+### Commit
+
+Every commit is the **envelope saga** (reserve → validate → finalize), driven by
+`legend` with automatic retry and LIFO compensation:
+
+- `commit(transfer)` — resolves the intent into a concrete envelope (read-only),
+  then runs `commit_envelope`.
+- `commit_envelope(envelope)` — the one commit path. Persists a write-ahead
+  `PendingSaga` record, then:
+  1. **Reserve** — `reserve_postings`: Active → PendingInactive, stamped with this saga's `ReservationId`
+  2. **Validate** — pure `validate_and_plan()`
+  3. **Finalize** — the dumb, idempotent primitives in sequence: `deactivate_postings` → `insert_postings` → `store_transfer` → `append_event`
+- `reverse(id)` — builds a reversal envelope and runs the same path.
+
+The store reports an **affected-row count** for each primitive; the saga
+interprets it (full = continue, partial = error → compensate, zero = read state
+and continue only if this same envelope already applied it). There is no
+monolithic `commit_transfer` and no separate "atomic" path.
+
+### Crash recovery
+
+`recover()` — call on startup. It force-completes any `PendingSaga` left by a
+crash, pushing the envelope through the idempotent primitives so a commit
+interrupted at any point (pre-reserve, reserved, or mid-finalize) converges to
+the committed state. Roll-forward, not rollback.
 
 ### Account lifecycle
 

+ 5 - 1
crates/kuatia/examples/create_accounts.rs

@@ -63,5 +63,9 @@ async fn connect() -> Result<Arc<Ledger>, Box<dyn std::error::Error>> {
         .await?;
     let store = SqlStore::new(pool);
     store.migrate().await?;
-    Ok(Arc::new(Ledger::new(store)))
+    let ledger = Arc::new(Ledger::new(store));
+    // On startup, finish any commit a crash interrupted (idempotent roll-forward).
+    // A clean store has nothing pending, so this returns 0.
+    ledger.recover().await?;
+    Ok(ledger)
 }

+ 4 - 1
crates/kuatia/examples/fund_and_trade.rs

@@ -96,5 +96,8 @@ async fn connect() -> Result<Arc<Ledger>, Box<dyn std::error::Error>> {
         .await?;
     let store = SqlStore::new(pool);
     store.migrate().await?;
-    Ok(Arc::new(Ledger::new(store)))
+    let ledger = Arc::new(Ledger::new(store));
+    // On startup, finish any commit a crash interrupted (idempotent roll-forward).
+    ledger.recover().await?;
+    Ok(ledger)
 }

+ 4 - 1
crates/kuatia/examples/withdraw.rs

@@ -71,5 +71,8 @@ async fn connect() -> Result<Arc<Ledger>, Box<dyn std::error::Error>> {
         .await?;
     let store = SqlStore::new(pool);
     store.migrate().await?;
-    Ok(Arc::new(Ledger::new(store)))
+    let ledger = Arc::new(Ledger::new(store));
+    // On startup, finish any commit a crash interrupted (idempotent roll-forward).
+    ledger.recover().await?;
+    Ok(ledger)
 }

+ 293 - 88
crates/kuatia/src/ledger.rs

@@ -22,28 +22,36 @@ pub(crate) fn now_millis() -> Result<i64, LedgerError> {
         .as_millis() as i64)
 }
 use crate::saga::{
-    FinalizeInput, FinalizeTransferStep, LedgerCtx, ReserveInput, ReservePostingsStep,
-    ResolveInput, ResolveStep, SagaError, ValidateInput, ValidateTransferStep,
+    FinalizeInput, FinalizeTransferStep, LedgerCtx, ReserveInput, ReservePostingsStep, SagaError,
+    ValidateInput, ValidateTransferStep,
 };
 use kuatia_storage::error::StoreError;
 use kuatia_storage::events::{LedgerEvent, LedgerEventKind};
-use kuatia_storage::store::{CommitRequest, EnvelopeRecord, Store};
+use kuatia_storage::store::{EnvelopeRecord, Store};
 
 #[allow(missing_docs)]
-mod transfer_saga {
+mod envelope_saga {
     use super::*;
     legend! {
-        TransferSaga<LedgerCtx, SagaError> {
-            resolve: ResolveStep,
+        EnvelopeSaga<LedgerCtx, SagaError> {
             reserve: ReservePostingsStep,
             validate: ValidateTransferStep,
             finalize: FinalizeTransferStep,
         }
     }
 }
-use transfer_saga::*;
+use envelope_saga::*;
+
+/// Write-ahead record for an in-flight commit, persisted via `SagaStore` before
+/// the saga mutates anything and removed once it reaches a terminal state. On
+/// startup [`Ledger::recover`] re-drives any that survive a crash.
+#[derive(serde::Serialize, serde::Deserialize)]
+struct PendingSaga {
+    envelope: Envelope,
+    reservation: kuatia_core::ReservationId,
+}
 
-/// Async ledger resource composing the four-phase commit pipeline.
+/// Async ledger resource composing the commit pipeline.
 pub struct Ledger {
     store: Arc<dyn Store>,
 }
@@ -134,47 +142,6 @@ impl Ledger {
         Ok(validate_and_plan(input)?)
     }
 
-    /// Phase 3: apply the plan to storage and return a receipt.
-    #[instrument(skip(self, envelope, plan), name = "ledger.apply")]
-    pub async fn apply(
-        &self,
-        envelope: &Envelope,
-        plan: &kuatia_core::Plan,
-    ) -> Result<Receipt, LedgerError> {
-        let receipt = Receipt {
-            transfer_id: plan.transfer_id,
-        };
-
-        // Raw path: consumed postings are Active (never reserved), so pass
-        // `reservation: None`. Postings, transfer record, account index, and
-        // event commit atomically.
-        let events = [LedgerEvent {
-            seq: 0,
-            timestamp: now_millis()?,
-            kind: LedgerEventKind::TransferCommitted {
-                transfer_id: receipt.transfer_id,
-            },
-        }];
-
-        self.store
-            .commit_transfer(CommitRequest {
-                deactivate: &plan.postings_to_deactivate,
-                create: &plan.postings_to_create,
-                cas_guards: &plan.cas_guards,
-                account_guards: &plan.account_guards,
-                reservation: None,
-                record: EnvelopeRecord {
-                    envelope: envelope.clone(),
-                    receipt: receipt.clone(),
-                    created_at: now_millis()?,
-                },
-                events: &events,
-            })
-            .await?;
-
-        Ok(receipt)
-    }
-
     // -----------------------------------------------------------------------
     // Resolve: Transfer (intent) -> Envelope (concrete postings)
     // -----------------------------------------------------------------------
@@ -283,12 +250,31 @@ impl Ledger {
     }
 
     // -----------------------------------------------------------------------
-    // Atomic commit (raw path -- used by reverse() and internal callers)
+    // Commit: every commit is the envelope saga (reserve -> validate -> finalize)
     // -----------------------------------------------------------------------
 
-    /// Load, validate, and apply an envelope in one shot (no saga).
-    #[instrument(skip(self, envelope), name = "ledger.commit_atomic")]
-    pub async fn commit_atomic(&self, mut envelope: Envelope) -> Result<Receipt, LedgerError> {
+    /// Commit a [`Transfer`] intent. Resolves it into a concrete envelope, then
+    /// drives the envelope saga. Resolution is read-only, so a crash before the
+    /// saga's write-ahead record leaves no partial state.
+    #[instrument(skip(self, transfer), fields(book = transfer.book.0), name = "ledger.commit")]
+    pub async fn commit(self: &Arc<Self>, transfer: Transfer) -> Result<Receipt, LedgerError> {
+        let envelope = self.resolve(&transfer).await?;
+        self.commit_envelope(envelope).await
+    }
+
+    /// Commit a pre-resolved [`Envelope`] through the saga pipeline (reserve ->
+    /// validate -> finalize). This is the single commit path; `commit()` and
+    /// `reverse()` both funnel through it.
+    ///
+    /// Before running, the saga (envelope + reservation) is persisted as a
+    /// pending record so a crash mid-commit is completed by [`recover`]. The
+    /// record is deleted once the saga reaches a terminal state. The commit is
+    /// idempotent on the content-addressed transfer id.
+    #[instrument(skip(self, envelope), name = "ledger.commit_envelope")]
+    pub async fn commit_envelope(
+        self: &Arc<Self>,
+        mut envelope: Envelope,
+    ) -> Result<Receipt, LedgerError> {
         if envelope.account_snapshots().is_empty() {
             let mut ids: Vec<AccountId> = envelope.creates().iter().map(|p| p.owner).collect();
             ids.sort();
@@ -296,76 +282,172 @@ impl Ledger {
             envelope.set_account_snapshots(self.resolve_snapshots(&ids).await?);
         }
 
+        // Idempotency: an already-committed transfer returns its receipt.
         let tid = envelope_id(&envelope);
         if let Some(record) = self.store.get_transfer(&tid).await? {
             return Ok(record.receipt);
         }
 
-        let loaded = self.load(&envelope).await?;
-        let plan = self.plan(&envelope, &loaded)?;
-        self.apply(&envelope, &plan).await
-    }
+        // Write-ahead: persist {envelope, reservation} so recovery can re-drive.
+        let reservation = kuatia_core::ReservationId::default();
+        let saga_id = reservation.0;
+        let blob = serde_json::to_vec(&PendingSaga {
+            envelope: envelope.clone(),
+            reservation,
+        })
+        .map_err(|e| LedgerError::Store(StoreError::Internal(e.to_string())))?;
+        self.store.save_saga(&saga_id, blob).await?;
 
-    // -----------------------------------------------------------------------
-    // Saga commit: resolve -> reserve -> validate -> finalize (via legend)
-    // -----------------------------------------------------------------------
+        let result = self.drive_envelope_saga(envelope, reservation).await;
 
-    /// Commit a transfer intent using the saga pipeline driven by legend.
-    ///
-    /// Steps: resolve movements into envelope -> reserve consumed postings ->
-    /// validate -> finalize.
-    /// On failure, legend compensates completed steps in reverse order.
-    #[instrument(skip(self, transfer), fields(book = transfer.book.0), name = "ledger.commit")]
-    pub async fn commit(self: &Arc<Self>, transfer: Transfer) -> Result<Receipt, LedgerError> {
-        let saga = TransferSaga::new(TransferSagaInputs {
-            resolve: ResolveInput {
-                transfer: transfer.clone(),
-            },
+        // Terminal: drop the pending record whether we committed or compensated.
+        self.store.delete_saga(&saga_id).await?;
+        result
+    }
+
+    /// Build and run the envelope saga to a terminal outcome, returning the
+    /// resulting receipt.
+    async fn drive_envelope_saga(
+        self: &Arc<Self>,
+        envelope: Envelope,
+        reservation: kuatia_core::ReservationId,
+    ) -> Result<Receipt, LedgerError> {
+        let saga = EnvelopeSaga::new(EnvelopeSagaInputs {
             reserve: ReserveInput,
             validate: ValidateInput,
             finalize: FinalizeInput,
         });
-
-        let ctx = LedgerCtx::new(Arc::clone(self));
+        let ctx = LedgerCtx::for_envelope(Arc::clone(self), envelope, reservation);
         let execution = saga.build(ctx);
 
         match execution.start().await {
             ExecutionResult::Completed(e) => {
                 let ctx = e.into_context();
                 ctx.receipts.last().cloned().ok_or_else(|| {
-                    LedgerError::Store(kuatia_storage::error::StoreError::Internal(
+                    LedgerError::Store(StoreError::Internal(
                         "saga completed but no receipt".into(),
                     ))
                 })
             }
-            ExecutionResult::Failed(_, err) => Err(LedgerError::Store(
-                kuatia_storage::error::StoreError::Internal(err.message),
-            )),
+            ExecutionResult::Failed(_, err) => {
+                Err(LedgerError::Store(StoreError::Internal(err.message)))
+            }
             ExecutionResult::CompensationFailed {
                 original_error,
                 compensation_error,
                 ..
             } => Err(LedgerError::CompensationFailed {
-                original: Box::new(LedgerError::Store(
-                    kuatia_storage::error::StoreError::Internal(original_error.message),
-                )),
-                compensation: Box::new(LedgerError::Store(
-                    kuatia_storage::error::StoreError::Internal(compensation_error.message),
-                )),
+                original: Box::new(LedgerError::Store(StoreError::Internal(
+                    original_error.message,
+                ))),
+                compensation: Box::new(LedgerError::Store(StoreError::Internal(
+                    compensation_error.message,
+                ))),
             }),
-            ExecutionResult::Paused(_) => Err(LedgerError::Store(
-                kuatia_storage::error::StoreError::Internal("saga paused unexpectedly".into()),
-            )),
+            ExecutionResult::Paused(_) => Err(LedgerError::Store(StoreError::Internal(
+                "saga paused unexpectedly".into(),
+            ))),
         }
     }
 
+    /// Re-drive every pending saga to completion. Call on startup to recover
+    /// commits interrupted by a crash, returning how many were processed.
+    ///
+    /// Recovery does **not** re-run reserve/validate — those reject already-
+    /// consumed postings, and the envelope was already validated when first
+    /// committed. Instead it force-completes the envelope through the idempotent
+    /// primitives with the original reservation, so a crash at any point
+    /// (pre-reserve, reserved, or mid-finalize) converges to the committed state.
+    #[instrument(skip(self), name = "ledger.recover")]
+    pub async fn recover(self: &Arc<Self>) -> Result<usize, LedgerError> {
+        let pending = self.store.list_pending_sagas().await?;
+        let count = pending.len();
+        for (saga_id, blob) in pending {
+            let PendingSaga {
+                envelope,
+                reservation,
+            } = serde_json::from_slice(&blob)
+                .map_err(|e| LedgerError::Store(StoreError::Internal(e.to_string())))?;
+            self.complete_envelope(&envelope, reservation).await?;
+            self.store.delete_saga(&saga_id).await?;
+        }
+        Ok(count)
+    }
+
+    /// Idempotently push `envelope`'s postings and record to their committed
+    /// state. Safe to call from any partial point: each primitive no-ops what is
+    /// already done. Used by [`recover`].
+    async fn complete_envelope(
+        &self,
+        envelope: &Envelope,
+        reservation: kuatia_core::ReservationId,
+    ) -> Result<(), LedgerError> {
+        let tid = envelope_id(envelope);
+        if self.store.get_transfer(&tid).await?.is_some() {
+            return Ok(()); // already committed
+        }
+
+        let consumes = envelope.consumes();
+        // Reserve then deactivate: this drives Active → PendingInactive → Inactive,
+        // and each call no-ops anything already past that state.
+        self.store.reserve_postings(consumes, reservation).await?;
+        self.store
+            .deactivate_postings(consumes, Some(reservation))
+            .await?;
+
+        let created: Vec<Posting> = envelope
+            .creates()
+            .iter()
+            .enumerate()
+            .map(|(i, np)| {
+                Posting::new(
+                    PostingId {
+                        transfer: tid,
+                        index: i as u16,
+                    },
+                    np.owner,
+                    np.asset,
+                    np.value,
+                )
+            })
+            .collect();
+        self.store.insert_postings(&created).await?;
+
+        let mut involved: Vec<AccountId> = created.iter().map(|p| p.owner).collect();
+        if !consumes.is_empty() {
+            let consumed = self.store.get_postings(consumes).await?;
+            involved.extend(consumed.iter().map(|p| p.owner));
+        }
+        involved.sort();
+        involved.dedup();
+
+        self.store
+            .store_transfer(
+                EnvelopeRecord {
+                    envelope: envelope.clone(),
+                    receipt: Receipt { transfer_id: tid },
+                    created_at: now_millis()?,
+                },
+                &involved,
+            )
+            .await?;
+        self.store
+            .append_event(&LedgerEvent {
+                seq: 0,
+                timestamp: now_millis()?,
+                kind: LedgerEventKind::TransferCommitted { transfer_id: tid },
+            })
+            .await?;
+        Ok(())
+    }
+
     // -----------------------------------------------------------------------
     // Reverse
     // -----------------------------------------------------------------------
 
     /// Create and commit a reversal envelope for the given envelope id.
     #[instrument(skip(self), name = "ledger.reverse")]
-    pub async fn reverse(&self, id: &EnvelopeId) -> Result<Receipt, LedgerError> {
+    pub async fn reverse(self: &Arc<Self>, id: &EnvelopeId) -> Result<Receipt, LedgerError> {
         let record = self
             .store
             .get_transfer(id)
@@ -407,7 +489,7 @@ impl Ledger {
             .metadata(original.metadata().clone())
             .build();
 
-        self.commit_atomic(reverse_envelope).await
+        self.commit_envelope(reverse_envelope).await
     }
 
     // -----------------------------------------------------------------------
@@ -654,3 +736,126 @@ pub struct LoadedState {
     /// The book gating this transfer, if one is loaded (`None` = unrestricted default).
     pub book: Option<Book>,
 }
+
+#[cfg(test)]
+mod recovery_tests {
+    use super::*;
+    use kuatia_core::{Account, AccountFlags, TransferBuilder, UserData};
+    use kuatia_storage::mem_store::InMemoryStore;
+    use std::collections::BTreeMap;
+
+    fn acct(id: i64, policy: AccountPolicy) -> Account {
+        Account {
+            id: AccountId::new(id),
+            version: 1,
+            policy,
+            flags: AccountFlags::empty(),
+            book: kuatia_core::BookId(0),
+            user_data: UserData::default(),
+            metadata: BTreeMap::new(),
+        }
+    }
+
+    /// A commit interrupted right after its write-ahead record (before any step)
+    /// is completed by `recover()`: the postings move and the record is cleared.
+    #[tokio::test]
+    async fn recover_redrives_pending_saga() {
+        let ledger = Arc::new(Ledger::new(InMemoryStore::new()));
+        for (id, p) in [
+            (1, AccountPolicy::NoOverdraft),
+            (2, AccountPolicy::NoOverdraft),
+            (99, AccountPolicy::ExternalAccount),
+        ] {
+            ledger.store().create_account(acct(id, p)).await.unwrap();
+        }
+        // Fund account 1.
+        let deposit = TransferBuilder::new()
+            .deposit(AccountId::new(1), AssetId::new(1), Cent::from(100), AccountId::new(99))
+            .unwrap()
+            .build();
+        ledger.commit(deposit).await.unwrap();
+
+        // Resolve a pay envelope but persist it as a pending saga WITHOUT running
+        // it — simulating a crash right after the write-ahead record.
+        let pay = TransferBuilder::new()
+            .pay(AccountId::new(1), AccountId::new(2), AssetId::new(1), Cent::from(40))
+            .build();
+        let envelope = ledger.resolve(&pay).await.unwrap();
+        let reservation = kuatia_core::ReservationId::default();
+        let blob = serde_json::to_vec(&PendingSaga {
+            envelope,
+            reservation,
+        })
+        .unwrap();
+        ledger.store().save_saga(&reservation.0, blob).await.unwrap();
+
+        // Recover re-drives it to completion.
+        assert_eq!(ledger.recover().await.unwrap(), 1);
+        assert_eq!(
+            ledger.balance(&AccountId::new(2), &AssetId::new(1)).await.unwrap(),
+            Cent::from(40)
+        );
+        assert_eq!(
+            ledger.balance(&AccountId::new(1), &AssetId::new(1)).await.unwrap(),
+            Cent::from(60)
+        );
+        assert!(ledger.store().list_pending_sagas().await.unwrap().is_empty());
+    }
+
+    /// A commit that crashed **mid-finalize** — consumed posting already flipped
+    /// to Inactive but the transfer record not yet written — is still completed by
+    /// `recover()` (reserve/validate are skipped; the primitives no-op the done work).
+    #[tokio::test]
+    async fn recover_completes_partial_finalize() {
+        let ledger = Arc::new(Ledger::new(InMemoryStore::new()));
+        for (id, p) in [
+            (1, AccountPolicy::NoOverdraft),
+            (2, AccountPolicy::NoOverdraft),
+            (99, AccountPolicy::ExternalAccount),
+        ] {
+            ledger.store().create_account(acct(id, p)).await.unwrap();
+        }
+        let deposit = TransferBuilder::new()
+            .deposit(AccountId::new(1), AssetId::new(1), Cent::from(100), AccountId::new(99))
+            .unwrap()
+            .build();
+        ledger.commit(deposit).await.unwrap();
+
+        // Resolve a pay envelope and manually run the commit halfway: reserve the
+        // consumed posting and deactivate it (now Inactive), then "crash" — the
+        // transfer record and created postings were never written.
+        let pay = TransferBuilder::new()
+            .pay(AccountId::new(1), AccountId::new(2), AssetId::new(1), Cent::from(40))
+            .build();
+        let envelope = ledger.resolve(&pay).await.unwrap();
+        let reservation = kuatia_core::ReservationId::default();
+        let consumes = envelope.consumes().to_vec();
+        ledger.store().reserve_postings(&consumes, reservation).await.unwrap();
+        let n = ledger
+            .store()
+            .deactivate_postings(&consumes, Some(reservation))
+            .await
+            .unwrap();
+        assert_eq!(n, 1); // consumed posting is now Inactive
+
+        let blob = serde_json::to_vec(&PendingSaga {
+            envelope,
+            reservation,
+        })
+        .unwrap();
+        ledger.store().save_saga(&reservation.0, blob).await.unwrap();
+
+        // Recovery finishes the commit despite reserve/validate being unable to
+        // re-run over the already-consumed posting.
+        assert_eq!(ledger.recover().await.unwrap(), 1);
+        assert_eq!(
+            ledger.balance(&AccountId::new(2), &AssetId::new(1)).await.unwrap(),
+            Cent::from(40)
+        );
+        assert_eq!(
+            ledger.balance(&AccountId::new(1), &AssetId::new(1)).await.unwrap(),
+            Cent::from(60)
+        );
+        assert!(ledger.store().list_pending_sagas().await.unwrap().is_empty());
+    }
+}

+ 149 - 47
crates/kuatia/src/saga.rs

@@ -1,21 +1,24 @@
 //! Legend saga step adapters for the ledger.
 //!
 //! Provides [`Step`](legend::Step) implementations so the ledger can participate
-//! in multi-resource saga workflows. Each step commits a transfer in `execute`
-//! and reverses it in `compensate`, giving you automatic rollback across
+//! in multi-resource saga workflows, with automatic LIFO compensation across
 //! resource boundaries.
 //!
-//! # Transfer pipeline saga
+//! # Envelope pipeline saga
 //!
-//! The core transfer pipeline is broken into four saga steps:
+//! A commit is three saga steps over a pre-resolved [`Envelope`] (resolution
+//! runs before the saga, in `Ledger::commit`):
 //!
-//! 1. **ResolveStep** -- resolve a `Transfer` intent into an `Envelope`
-//! 2. **ReservePostingsStep** -- CAS each consumed posting from Active to PendingInactive
-//! 3. **ValidateTransferStep** -- load accounts/balances, run `validate_and_plan()`
-//! 4. **FinalizeTransferStep** -- PendingInactive to Inactive, create new postings, store envelope
+//! 1. **ReservePostingsStep** -- `reserve_postings`: Active → PendingInactive, stamped with the saga's `ReservationId`
+//! 2. **ValidateTransferStep** -- load accounts/balances, run `validate_and_plan()`
+//! 3. **FinalizeTransferStep** -- the dumb primitives in sequence: `deactivate_postings` → `insert_postings` → `store_transfer` → `append_event`
 //!
-//! The `TransferSaga` is defined via `legend!` in `ledger.rs` and driven by
-//! `commit()`.
+//! Each step issues dumb storage instructions and **interprets the affected-row
+//! count** itself (full = continue; partial = error → compensate; zero = read
+//! state and continue only if this same envelope/reservation already applied it).
+//! See [`verify_postings`]. The `EnvelopeSaga` is defined via `legend!` in
+//! `ledger.rs` and driven by `commit_envelope()`; crash recovery re-completes a
+//! persisted saga via `Ledger::recover`.
 //!
 //! # High-level composition
 //!
@@ -30,14 +33,45 @@ use serde::{Deserialize, Serialize};
 use tracing::Instrument;
 
 use kuatia_core::{
-    AccountId, AssetId, Cent, Envelope, Plan, PlanInput, PostingId, Receipt, ReservationId,
-    Transfer, TransferBuilder, validate_and_plan,
+    AccountId, AssetId, Cent, Envelope, Plan, PlanInput, Posting, PostingId, PostingStatus, Receipt,
+    ReservationId, Transfer, TransferBuilder, validate_and_plan,
 };
 
 use crate::error::LedgerError;
 use crate::ledger::{Ledger, now_millis};
 use kuatia_storage::events::{LedgerEvent, LedgerEventKind};
-use kuatia_storage::store::{CommitRequest, EnvelopeRecord};
+use kuatia_storage::store::{EnvelopeRecord, Store};
+
+/// Interpret a dumb primitive's affected-row `count` against the `ids` it
+/// targeted. `count == ids.len()` is success. A short count is acceptable only if
+/// the shortfall is already in the desired end-state — a prior attempt (or this
+/// saga, replayed by recovery) already applied it — verified by reading the
+/// postings and checking `ok`. Otherwise it is a genuine failure (contended or
+/// concurrently modified) and the saga compensates.
+async fn verify_postings(
+    store: &dyn Store,
+    ids: &[PostingId],
+    count: u64,
+    ok: impl Fn(&Posting) -> bool,
+    what: &str,
+) -> Result<(), SagaError> {
+    if count == ids.len() as u64 {
+        return Ok(());
+    }
+    let postings = store
+        .get_postings(ids)
+        .await
+        .map_err(|e| SagaError::from(LedgerError::Store(e)))?;
+    if postings.len() == ids.len() && postings.iter().all(&ok) {
+        return Ok(());
+    }
+    Err(SagaError {
+        message: format!(
+            "{what}: storage applied {count}/{} rows and the end-state is not satisfied",
+            ids.len()
+        ),
+    })
+}
 
 // ---------------------------------------------------------------------------
 // Saga error -- serializable + cloneable wrapper
@@ -117,6 +151,23 @@ impl LedgerCtx {
         }
     }
 
+    /// Create a context for the envelope pipeline (reserve → validate → finalize)
+    /// with a pre-resolved envelope and an explicit reservation.
+    pub fn for_envelope(
+        ledger: Arc<Ledger>,
+        envelope: Envelope,
+        reservation: ReservationId,
+    ) -> Self {
+        Self {
+            receipts: Vec::new(),
+            reserved_postings: Vec::new(),
+            plan: None,
+            envelope: Some(envelope),
+            reservation,
+            ledger: Some(ledger),
+        }
+    }
+
     /// Re-inject the ledger handle after deserializing a paused execution.
     pub fn inject_ledger(&mut self, ledger: Arc<Ledger>) {
         self.ledger = Some(ledger);
@@ -204,16 +255,32 @@ impl Step<LedgerCtx, SagaError> for ReservePostingsStep {
 
     async fn execute(ctx: &mut LedgerCtx, _input: &ReserveInput) -> Result<StepOutcome, SagaError> {
         async {
-            let envelope = ctx.envelope.as_ref().ok_or(SagaError {
-                message: "no envelope in context -- resolve step must run first".into(),
-            })?;
-            let posting_ids: Vec<PostingId> = envelope.consumes().to_vec();
-
-            ctx.ledger()?
-                .store()
-                .reserve_postings(&posting_ids, ctx.reservation)
+            let posting_ids: Vec<PostingId> = ctx
+                .envelope
+                .as_ref()
+                .ok_or(SagaError {
+                    message: "no envelope in context -- resolve step must run first".into(),
+                })?
+                .consumes()
+                .to_vec();
+            let rid = ctx.reservation;
+            let ledger = ctx.ledger_arc()?;
+            let store = ledger.store();
+
+            let reserved = store
+                .reserve_postings(&posting_ids, rid)
                 .await
                 .map_err(|e| SagaError::from(LedgerError::Store(e)))?;
+            // Storage reports the count; the saga decides. A short count is fine
+            // only if the shortfall is already reserved by us (idempotent replay).
+            verify_postings(
+                store,
+                &posting_ids,
+                reserved,
+                |p| p.status == PostingStatus::PendingInactive && p.reservation == Some(rid),
+                "reserve",
+            )
+            .await?;
             ctx.reserved_postings.extend_from_slice(&posting_ids);
             Ok(StepOutcome::Continue)
         }
@@ -320,40 +387,75 @@ impl Step<LedgerCtx, SagaError> for FinalizeTransferStep {
             let plan = ctx.plan.take().ok_or(SagaError {
                 message: "no plan in context -- validate step must run first".into(),
             })?;
-
-            let envelope = ctx.envelope.as_ref().ok_or(SagaError {
-                message: "no envelope in context -- resolve step must run first".into(),
-            })?;
-
-            let store = ctx.ledger()?.store();
-
+            let rid = ctx.reservation;
+            let ledger = ctx.ledger_arc()?;
+            let store = ledger.store();
             let receipt = Receipt {
                 transfer_id: plan.transfer_id,
             };
 
-            // Saga path: our postings are PendingInactive under `ctx.reservation`.
-            // Postings, transfer record, account index, and event commit atomically.
-            let events = [LedgerEvent {
-                seq: 0,
-                timestamp: now_millis().map_err(SagaError::from)?,
-                kind: LedgerEventKind::TransferCommitted {
-                    transfer_id: receipt.transfer_id,
-                },
-            }];
+            // Commit is a sequence of dumb, idempotent primitives. Each is its own
+            // atomic update; the saga sequences them and a crash mid-sequence is
+            // completed by idempotent roll-forward in recovery.
+
+            // 1. Deactivate the reserved consumed postings (saga path).
+            let deactivated = store
+                .deactivate_postings(&plan.postings_to_deactivate, Some(rid))
+                .await
+                .map_err(|e| SagaError::from(LedgerError::Store(e)))?;
+            verify_postings(
+                store,
+                &plan.postings_to_deactivate,
+                deactivated,
+                |p| p.status == PostingStatus::Inactive,
+                "deactivate",
+            )
+            .await?;
+
+            // Involved accounts to index: created owners + consumed owners. The
+            // saga supplies the set so storage computes nothing.
+            let mut involved: Vec<AccountId> =
+                plan.postings_to_create.iter().map(|p| p.owner).collect();
+            if !plan.postings_to_deactivate.is_empty() {
+                let consumed = store
+                    .get_postings(&plan.postings_to_deactivate)
+                    .await
+                    .map_err(|e| SagaError::from(LedgerError::Store(e)))?;
+                involved.extend(consumed.iter().map(|p| p.owner));
+            }
+            involved.sort();
+            involved.dedup();
+
+            // 2. Insert created postings (idempotent).
+            store
+                .insert_postings(&plan.postings_to_create)
+                .await
+                .map_err(|e| SagaError::from(LedgerError::Store(e)))?;
 
+            // 3. Persist the transfer record + account index (idempotent).
+            let envelope = ctx.envelope.as_ref().ok_or(SagaError {
+                message: "no envelope in context -- resolve step must run first".into(),
+            })?;
             store
-                .commit_transfer(CommitRequest {
-                    deactivate: &plan.postings_to_deactivate,
-                    create: &plan.postings_to_create,
-                    cas_guards: &plan.cas_guards,
-                    account_guards: &plan.account_guards,
-                    reservation: Some(ctx.reservation),
-                    record: EnvelopeRecord {
+                .store_transfer(
+                    EnvelopeRecord {
                         envelope: envelope.clone(),
                         receipt: receipt.clone(),
                         created_at: now_millis().map_err(SagaError::from)?,
                     },
-                    events: &events,
+                    &involved,
+                )
+                .await
+                .map_err(|e| SagaError::from(LedgerError::Store(e)))?;
+
+            // 4. Append the committed event (idempotent on the transfer id).
+            store
+                .append_event(&LedgerEvent {
+                    seq: 0,
+                    timestamp: now_millis().map_err(SagaError::from)?,
+                    kind: LedgerEventKind::TransferCommitted {
+                        transfer_id: receipt.transfer_id,
+                    },
                 })
                 .await
                 .map_err(|e| SagaError::from(LedgerError::Store(e)))?;
@@ -371,7 +473,7 @@ impl Step<LedgerCtx, SagaError> for FinalizeTransferStep {
         _input: &FinalizeInput,
     ) -> Result<CompensationOutcome, SagaError> {
         if let Some(receipt) = ctx.receipts.pop() {
-            ctx.ledger()?.reverse(&receipt.transfer_id).await?;
+            ctx.ledger_arc()?.reverse(&receipt.transfer_id).await?;
         }
         Ok(CompensationOutcome::Completed)
     }
@@ -432,7 +534,7 @@ async fn compensate_last_receipt(ctx: &mut LedgerCtx) -> Result<CompensationOutc
     let receipt = ctx.receipts.pop().ok_or(SagaError {
         message: "no receipt to compensate".into(),
     })?;
-    ctx.ledger()?.reverse(&receipt.transfer_id).await?;
+    ctx.ledger_arc()?.reverse(&receipt.transfer_id).await?;
     Ok(CompensationOutcome::Completed)
 }
 

+ 4 - 4
crates/kuatia/tests/integration.rs

@@ -252,8 +252,8 @@ async fn idempotent_commit() {
         ])
         .build();
 
-    let r1 = ledger.commit_atomic(envelope.clone()).await.unwrap();
-    let r2 = ledger.commit_atomic(envelope).await.unwrap();
+    let r1 = ledger.commit_envelope(envelope.clone()).await.unwrap();
+    let r2 = ledger.commit_envelope(envelope).await.unwrap();
 
     assert_eq!(r1.transfer_id, r2.transfer_id);
     // Balance should only be 100, not 200 (second commit was a no-op)
@@ -437,7 +437,7 @@ async fn fx_trade_via_market_account() {
         ])
         .build();
 
-    ledger.commit_atomic(envelope).await.unwrap();
+    ledger.commit_envelope(envelope).await.unwrap();
 
     // Verify
     assert_eq!(
@@ -715,7 +715,7 @@ async fn stale_snapshot_rejected() {
         .account_snapshots(vec![stale_snapshot])
         .build();
 
-    let result = ledger.commit_atomic(envelope).await;
+    let result = ledger.commit_envelope(envelope).await;
     assert!(result.is_err());
 }
 

+ 1 - 1
doc/accounting-mapping.md

@@ -86,7 +86,7 @@ These differ in **grain** — one record vs. the collection of all records.
   - **`Envelope`** is the *resolved* form produced by `resolve()`:
     `{ consumes: Vec<PostingId>, creates: Vec<NewPosting>, account_snapshots,
     book, … }`. It names the concrete postings to spend and create.
-  - Committing one (`commit` / `commit_atomic`) returns a **`Receipt {
+  - Committing one (`commit` / `commit_envelope`) returns a **`Receipt {
     transfer_id }`** identifying the committed envelope — the `EnvelopeId`, which
     is content-addressed (the double-SHA-256 of the canonical envelope bytes).
 - **Transfer log = the accounting journal.** The append-only, ordered sequence of

+ 2 - 2
doc/accounts.md

@@ -30,7 +30,7 @@ Each account has a policy that controls what balance constraints apply:
 
 An overdraft is represented as a **negative posting** (an offset position) assigned to the account to cover a shortfall. When an account's positive postings are insufficient for a debit, the resolve step consumes them all and creates a negative posting for the remainder. `NoOverdraft` accounts forbid this; validation rejects any transfer that would create a negative posting on a `NoOverdraft` account. `CappedOverdraft`'s floor bounds how negative the balance may go; `UncappedOverdraft`, `SystemAccount`, and `ExternalAccount` are unbounded.
 
-`CappedOverdraft` accounts emit CAS (Compare-And-Swap) guards during validation to prevent write-skew — two concurrent transfers could each pass validation independently but together push the balance below the floor. The guards are enforced atomically inside `commit_transfer` (the commit aborts with a retryable conflict if a guarded balance changed since validation).
+`CappedOverdraft`'s floor is checked during validation. Under the dumb-storage model there is no atomic re-check at commit, so the floor is **best-effort under concurrency** — two concurrent transfers could each pass validation independently but together push the balance below the floor (write-skew). Double-spend safety is unaffected: the reservation protocol (an atomic conditional `reserve_postings`) still guarantees a posting cannot be consumed twice. See [accounting-mapping.md](accounting-mapping.md) and the ADR at [adr/0001-dumb-storage-saga-recovery.md](adr/0001-dumb-storage-saga-recovery.md).
 
 ## Lifecycle
 
@@ -101,4 +101,4 @@ Boundary accounts representing the outside world (banks, payment processors). Th
 
 ### Credit accounts (`CappedOverdraft`)
 
-Accounts with a negative floor (e.g. credit lines). The floor is the maximum allowed overdraft. When the account's positive postings are insufficient for a debit, a negative posting is created to cover the shortfall, down to the floor. Write-skew prevention via CAS guards (enforced inside `commit_transfer`) ensures concurrent transfers respect the floor.
+Accounts with a negative floor (e.g. credit lines). The floor is the maximum allowed overdraft. When the account's positive postings are insufficient for a debit, a negative posting is created to cover the shortfall, down to the floor. The floor is enforced at validation time and is best-effort under concurrency (see above).

+ 85 - 0
doc/adr/0001-dumb-storage-saga-recovery.md

@@ -0,0 +1,85 @@
+# ADR 0001 — Dumb storage + durable saga recovery
+
+Status: Accepted
+
+## Context
+
+A prior change (commit `c715bd3`, "Make the Store the atomic invariant boundary
+for commits") introduced `CommitStore::commit_transfer` — a single database
+transaction that bundled ~8 responsibilities: deactivate consumed postings,
+insert created postings, persist the transfer record, index `transfer_accounts`
+(both created and consumed owners), check `CappedOverdraft` CAS balance guards,
+check account-version guards, enforce reservation ownership, and append events.
+It also left two confusing public commit entry points (`Ledger::commit` for
+intent, `Ledger::commit_atomic` for a pre-resolved envelope) that both funneled
+into it.
+
+Two problems motivated revisiting this:
+
+1. **The storage layer carried a lot of domain assumptions.** A "dumb record
+   keeper" was the stated goal, yet `commit_transfer` interpreted state, decided
+   idempotency, enforced guards, and chose error semantics.
+2. **Crash recovery was never finished.** `SagaStore`
+   (`save_saga`/`list_pending_sagas`/`delete_saga`) and `legend`'s pause/resume
+   plumbing existed but nothing used them; `ExecutionResult::Paused` was treated
+   as an error. So the saga ran entirely in memory and `commit_transfer`'s single
+   transaction was the *only* crash-safety.
+
+## Decision
+
+Invert the design.
+
+- **Storage is a dumb instruction follower.** Every `Store` write method applies
+  one update and returns the **number of affected rows** (`Result<u64,
+  StoreError>`), or a genuine I/O error. It never interprets the count, decides
+  state, enforces idempotency, or compensates. The conditional `WHERE` clause is
+  the instruction; the count is the result. `commit_transfer`, `CommitStore`,
+  `CommitRequest`, and the semantic write-outcome `StoreError` variants
+  (`Conflict`, `ReservationMismatch`, `PostingNotActive`, `PostingInactive`) are
+  removed. The write primitives are `reserve_postings`, `release_postings`,
+  `deactivate_postings`, `insert_postings`, `store_transfer(record, involved)`,
+  and an idempotent `append_event` (dedup on the transfer id).
+
+- **The saga owns interpretation and idempotency.** A commit is the saga calling
+  those primitives in sequence and reading each count: full = continue; partial =
+  error → compensate; **zero = read state and continue only if this same
+  envelope/reservation already applied it**. (`verify_postings` in `saga.rs`.)
+
+- **One commit path.** `commit(transfer)` resolves the intent into an envelope
+  (read-only) then runs `commit_envelope`, the envelope saga (reserve → validate
+  → finalize). `commit_envelope(envelope)` serves pre-built/FX envelopes;
+  `reverse()` uses it. `commit_atomic` is gone.
+
+- **Durable recovery via write-ahead + roll-forward.** `commit_envelope`
+  persists a `PendingSaga {envelope, reservation}` via `SagaStore` before
+  mutating anything, and deletes it on terminal. `Ledger::recover()` (startup)
+  force-completes any surviving pending saga through the idempotent primitives,
+  using the original reservation. It does **not** re-run reserve/validate (those
+  reject already-consumed postings); it converges from a crash at any point
+  (pre-reserve / reserved / mid-finalize). Because recovery is roll-forward, the
+  reservation protocol never leaves orphaned `PendingInactive` postings, so no
+  separate reconciliation pass is needed.
+
+`legend`'s pause/resume is for external waits, not crash checkpoints, so durable
+recovery is this write-ahead layer around legend, not serialization of the
+in-flight execution.
+
+## Consequences
+
+- **Double-spend safety: preserved.** It comes from the reservation protocol —
+  `reserve_postings` is a single atomic conditional update, so two sagas cannot
+  both claim the same posting.
+- **Crash-safety: preserved, differently.** Not one transaction, but write-ahead
+  + idempotent roll-forward. Nothing is silently lost; a crash mid-finalize is
+  completed by `recover()`.
+- **Overdraft floor + freeze/close guards: now best-effort under concurrency.**
+  They are checked at validation time, not re-checked atomically at commit (the
+  `cas_guards`/`account_guards` and their commit-time re-check are removed). A
+  concurrent, unrelated balance change or a freeze/close between validation and
+  finalize has a small TOCTOU window. Accepted tradeoff for a dumb storage layer.
+- **Simpler, more testable surface.** Storage has no domain logic; all commit
+  correctness lives in one place (the saga) with per-primitive count-conformance
+  tests and crash-injection recovery tests.
+
+This supersedes the `c715bd3` atomic-boundary decision and parts of the
+`93e35fe` follow-up (the conditional-update/guard hardening of `commit_transfer`).

+ 75 - 45
doc/architecture.md

@@ -38,7 +38,7 @@ This separation ensures the auditable heart of the system is fully deterministic
 
 ## Store Sub-Trait Architecture
 
-The `Store` trait is a composite of seven focused sub-traits, each responsible for a single domain:
+The `Store` trait is a composite of focused sub-traits, each responsible for a single domain. Every write method is a **dumb instruction**: it applies one update and returns the number of affected rows (or an I/O error). It never interprets the count, decides state, enforces idempotency, or compensates — the saga does.
 
 ```mermaid
 classDiagram
@@ -53,13 +53,14 @@ classDiagram
     class PostingStore {
         +get_postings(ids)
         +get_postings_by_account(account, asset?, status?)
-        +reserve_postings(ids, reservation)
-        +release_postings(ids, reservation)
-        +finalize_postings(deactivate, create)
+        +reserve_postings(ids, reservation) u64
+        +release_postings(ids, reservation) u64
+        +deactivate_postings(ids, reservation?) u64
+        +insert_postings(postings) u64
     }
     class TransferStore {
         +get_transfer(id)
-        +store_transfer(record)
+        +store_transfer(record, involved) u64
         +get_transfers_for_account(account)
         +query_transfers(query)
     }
@@ -77,9 +78,6 @@ classDiagram
         +get_book(id)
         +list_books()
     }
-    class CommitStore {
-        +commit_transfer(req)
-    }
     class Store {
         <<composite>>
     }
@@ -89,16 +87,18 @@ classDiagram
     Store --|> SagaStore
     Store --|> EventStore
     Store --|> BookStore
-    Store --|> CommitStore
 ```
 
-`CommitStore::commit_transfer` is the single atomic commit boundary — it applies posting deactivations/creations, the transfer record, the both-sided account index, and events in one transaction, enforcing `CappedOverdraft` CAS guards and reservation ownership.
+There is no single atomic commit boundary. A commit is a sequence of the dumb primitives above (`reserve_postings` → `deactivate_postings` → `insert_postings` → `store_transfer` → `append_event`), each its own atomic update and each idempotent. The saga sequences them and interprets their counts; a crash mid-sequence is completed by roll-forward recovery (see below).
 
-The store only persists and reads — all domain logic (balance computation, validation, policy enforcement) lives in the Ledger and `kuatia-core`.
+The store only persists and reads — all domain logic (balance computation, validation, policy enforcement, and the interpretation of primitive counts) lives in the Ledger/saga and `kuatia-core`.
 
 ## Saga Commit Pipeline
 
-The intent layer uses a saga-based pipeline that breaks the commit into four independently-persisted steps:
+Every commit is the envelope saga. `commit(transfer)` resolves the intent into a
+concrete envelope (read-only), then runs `commit_envelope`, which persists a
+write-ahead `PendingSaga` record and drives three steps. The finalize step calls
+the dumb primitives one by one and interprets each affected-row count.
 
 ```mermaid
 sequenceDiagram
@@ -109,11 +109,11 @@ sequenceDiagram
     participant F as FinalizeStep
     participant S as Store
 
-    C->>L: commit(transfer)
+    C->>L: commit(transfer) → resolve → commit_envelope(envelope)
+    L->>S: save_saga(PendingSaga{envelope, reservation})
     L->>R: execute
-    R->>S: reserve_postings(ids)
-    Note over S: Active → PendingInactive (atomic batch)
-    R-->>L: reserved_postings tracked in LedgerCtx
+    R->>S: reserve_postings(ids, rid) → count
+    Note over R: interpret count (full / partial / zero+read)
 
     L->>V: execute
     V->>S: get_postings, get_accounts, get_postings_by_account
@@ -121,15 +121,17 @@ sequenceDiagram
     V-->>L: Plan stored in LedgerCtx
 
     L->>F: execute
-    F->>S: finalize_postings(deactivate, create)
-    Note over S: PendingInactive → Inactive + insert new
-    F->>S: store_transfer(record)
+    F->>S: deactivate_postings(consumed, rid) → count
+    F->>S: insert_postings(created) → count
+    F->>S: store_transfer(record, involved) → count
+    F->>S: append_event(committed) → count
     F-->>L: Receipt
-
+    L->>S: delete_saga(...)
     L-->>C: Receipt
 ```
 
-On failure, legend compensates completed steps in LIFO order:
+On in-process failure, legend compensates completed steps in LIFO order; a crash
+is handled instead by recovery (below).
 
 ```mermaid
 sequenceDiagram
@@ -149,21 +151,37 @@ sequenceDiagram
 
 Each step is a small, shard-local operation with automatic compensation on failure. This design avoids cross-shard transactions: no single step touches multiple shards atomically.
 
-## Raw Three-Phase Commit
+## Durable Crash Recovery
 
-A lower-level `commit_atomic()` method runs the traditional atomic pipeline in a single pass without reservation. Used internally by `reverse()` and available for callers who need direct control.
+There is no single atomic transaction, so crash-safety comes from a write-ahead
+record plus idempotent roll-forward. `commit_envelope` persists a `PendingSaga
+{envelope, reservation}` via `SagaStore` **before** the saga mutates anything,
+and deletes it once the saga reaches a terminal state.
+
+`Ledger::recover()` (call on startup) re-completes any surviving pending saga. It
+does **not** re-run reserve/validate (those reject already-consumed postings);
+instead it force-completes the envelope through the idempotent primitives with
+the original reservation:
 
 ```mermaid
 graph LR
-    A[load] -->|LoadedState| B[plan]
-    B -->|Plan| C[apply]
-    C -->|Receipt| D[done]
-    style A fill:#e1f5fe
-    style B fill:#fff3e0
-    style C fill:#e8f5e9
+    A[get_transfer?] -->|exists| Z[done]
+    A -->|missing| B[reserve_postings]
+    B --> C[deactivate_postings]
+    C --> D[insert_postings]
+    D --> E[store_transfer]
+    E --> F[append_event]
+    F --> Z
 ```
 
-The three phases can also be called independently: `load()`, `plan()`, `apply()`.
+Because each primitive no-ops what is already done, recovery converges from a
+crash at any point — pre-reserve (postings still Active), reserved
+(PendingInactive), or mid-finalize (already Inactive). It is roll-forward, not
+rollback, so the reservation protocol never leaves orphaned `PendingInactive`
+postings for a separate reconciliation pass to clean up.
+
+`reverse()` builds a reversal envelope and runs the same `commit_envelope` path —
+there is no separate raw/atomic entry point.
 
 ## Content-Addressed Transfers
 
@@ -198,23 +216,35 @@ Conservation boundaries are **per-asset only**. The `book` field on transfers an
 
 Each account has a policy controlling its balance floor and whether it may hold negative postings:
 
-| Policy | Balance floor | Negative postings | CAS guard |
-|--------|--------------|-------------------|-----------|
-| `NoOverdraft` | `>= 0` | No | No |
-| `CappedOverdraft { floor }` | `>= floor` | Yes (down to floor) | Yes |
-| `UncappedOverdraft` | None | Yes (unbounded) | No |
-| `SystemAccount` | None | Yes | No |
-| `ExternalAccount` | None | Yes | No |
+| Policy | Balance floor | Negative postings |
+|--------|--------------|-------------------|
+| `NoOverdraft` | `>= 0` | No |
+| `CappedOverdraft { floor }` | `>= floor` | Yes (down to floor) |
+| `UncappedOverdraft` | None | Yes (unbounded) |
+| `SystemAccount` | None | Yes |
+| `ExternalAccount` | None | Yes |
 
-An overdraft is a **negative posting** assigned to the account to cover a shortfall. Only `NoOverdraft` forbids negative postings; validation rejects a negative posting on a `NoOverdraft` account. `CappedOverdraft`'s floor (enforced in validation, with concurrency protected by CAS guards) bounds the negative balance; the other policies are unbounded.
+An overdraft is a **negative posting** assigned to the account to cover a shortfall. Only `NoOverdraft` forbids negative postings; validation rejects a negative posting on a `NoOverdraft` account. `CappedOverdraft`'s floor (checked in validation) bounds the negative balance; the other policies are unbounded.
 
-## CAS (Compare-And-Swap) Guards for CappedOverdraft
+## The CappedOverdraft Floor Under Concurrency
 
-`CappedOverdraft` accounts have a balance floor that is not backed by the UTXO model alone — two concurrent transfers could each pass validation but together push the balance below the floor (write-skew).
+`CappedOverdraft` accounts have a balance floor that is not backed by the UTXO
+model alone — two concurrent transfers could each pass validation but together
+push the balance below the floor (write-skew).
 
-The validation phase emits `cas_guards: Vec<(AccountId, AssetId, Cent)>` for these accounts. They are enforced atomically inside `commit_transfer`: before mutating any state it recomputes each guarded balance and aborts with a retryable `Conflict` if it changed since validation. The saga pipeline additionally isolates the consumed postings via the reserve step (Active → PendingInactive), stamping each reserved posting with a `ReservationId` so only the reserving saga can finalize or release it.
+Under the dumb-storage model the floor is checked at **validation time** and is
+**best-effort under concurrency**: there is no atomic re-check at commit (the
+earlier `cas_guards`-inside-`commit_transfer` mechanism was removed with the
+atomic boundary). Double-spend safety still holds unconditionally — the
+reservation protocol (`reserve_postings` is a single atomic conditional update,
+so two sagas cannot both claim the same posting) prevents consuming a posting
+twice. What is best-effort is specifically the *floor* on a `CappedOverdraft`
+account when unrelated concurrent activity moves its balance between validation
+and finalize. This tradeoff is recorded in
+[doc/adr/0001-dumb-storage-saga-recovery.md](adr/0001-dumb-storage-saga-recovery.md).
 
-Other policies do not need CAS guards: `NoOverdraft` is fully UTXO-backed (you can only spend postings you own), and unconstrained policies have no floor to violate.
+`NoOverdraft` is fully UTXO-backed (you can only spend postings you own), and the
+unconstrained policies have no floor to violate.
 
 ## No Sequential Hash Chain
 
@@ -243,11 +273,11 @@ Postings follow a three-state lifecycle managed by the saga pipeline:
 
 ```mermaid
 stateDiagram-v2
-    [*] --> Active: created by finalize
+    [*] --> Active: insert_postings
     Active --> PendingInactive: reserve_postings
     PendingInactive --> Active: release_postings (compensation)
-    PendingInactive --> Inactive: finalize_postings
-    Active --> Active: release_postings (no-op)
+    PendingInactive --> Inactive: deactivate_postings(reservation)
+    Active --> Inactive: deactivate_postings(None)
 ```
 
 | State | Available | In balance | Description |

+ 55 - 51
doc/crates.md

@@ -69,7 +69,7 @@ graph TD
 9. **Negative posting restriction** — negative postings forbidden only on `NoOverdraft` (allowed on overdraft/system/external)
 10. **Policy enforcement** — projected balance satisfies account's floor
 
-Output is a `Plan` containing `transfer_id`, `postings_to_deactivate`, `postings_to_create`, and `cas_guards` (Compare-And-Swap guards for concurrency safety).
+Output is a `Plan` containing `transfer_id`, `postings_to_deactivate`, and `postings_to_create`.
 
 ---
 
@@ -82,49 +82,45 @@ Async resource layer. Depends on `kuatia-core`, `tokio`, `async-trait`, `serde`,
 | Module | Purpose |
 |--------|---------|
 | `kuatia` | `Ledger` — primary API (non-generic, uses `Arc<dyn Store>`), saga commit pipeline, intent layer |
-| `store` | `Store` composite trait + sub-traits (`AccountStore`, `PostingStore`, `TransferStore`, `SagaStore`, `EventStore`, `BookStore`, `CommitStore`) |
+| `store` | `Store` composite trait + sub-traits (`AccountStore`, `PostingStore`, `TransferStore`, `SagaStore`, `EventStore`, `BookStore`) |
 | `error` | `StoreError`, `LedgerError` — unified error hierarchy |
 | `mem_store` | `InMemoryStore` — in-memory `Store` implementation for tests |
 | `saga` | Pipeline steps (reserve, validate, finalize) + high-level legend step adapters |
 
 ### Ledger API
 
-#### Saga Commit (default for intent layer)
+#### Commit (the envelope saga)
 
-Driven by a `TransferSaga` defined via `legend!` — four steps with automatic retry and LIFO compensation:
+`commit(transfer)` resolves the intent into an envelope (read-only) then runs the
+`EnvelopeSaga` (defined via `legend!`) — three steps with automatic retry and
+LIFO compensation. Finalize calls the dumb primitives one by one and interprets
+each affected-row count:
 
 ```mermaid
 graph LR
-    A[resolve] -->|Envelope| B[reserve_postings]
-    B -->|batch Active→PendingInactive| C[validate_and_plan]
-    C -->|Plan| D[commit_transfer atomically]
-    D --> E[Receipt]
+    A[resolve] -->|Envelope| W[save PendingSaga]
+    W --> B[reserve_postings]
+    B -->|Active→PendingInactive| C[validate_and_plan]
+    C -->|Plan| D[deactivate → insert → store_transfer → append_event]
+    D --> E[Receipt + delete PendingSaga]
     style E fill:#e8f5e9
 ```
 
-Note: `commit` requires `Arc<Ledger>` (takes `self: &Arc<Self>`).
+Note: `commit`/`commit_envelope`/`reverse`/`recover` require `Arc<Ledger>`.
 
-#### Raw Three-Phase Commit
+#### Crash recovery
 
-```mermaid
-graph LR
-    A["load()"] -->|LoadedState| B["plan()"]
-    B -->|Plan| C["apply()"]
-    C --> D[Receipt]
-    style A fill:#e1f5fe
-    style B fill:#fff3e0
-    style C fill:#e8f5e9
-```
-
-`commit_atomic(transfer)` runs all three in one shot. Used by `reverse()` and available for direct callers.
+`recover()` re-completes any `PendingSaga` left by a crash, pushing the envelope
+through the idempotent primitives (roll-forward). Call it on startup.
 
 #### Convenience
 
 | Method | Description |
 |--------|-------------|
-| `commit(transfer)` | Saga pipeline: resolve → reserve → validate → finalize with retry and compensation (requires `Arc<Ledger>`) |
-| `commit_atomic(transfer)` | Raw atomic pipeline: load → plan → apply (used by `reverse()`) |
-| `reverse(transfer_id)` | Creates compensating transfer that undoes the original |
+| `commit(transfer)` | Resolve intent → `commit_envelope` (requires `Arc<Ledger>`) |
+| `commit_envelope(envelope)` | The one commit path: write-ahead → reserve → validate → finalize (for pre-built/FX envelopes) |
+| `reverse(transfer_id)` | Builds a compensating envelope and runs `commit_envelope` |
+| `recover()` | Force-completes pending sagas after a crash (call on startup) |
 
 #### Intent Layer
 
@@ -162,7 +158,9 @@ Transfers are built via `TransferBuilder` and committed with `ledger.commit(tran
 
 ### Store Trait
 
-The `Store` trait is a composite of seven focused sub-traits:
+The `Store` trait is a composite of focused sub-traits. Every write method is a
+dumb instruction returning the number of affected rows (`u64`); the saga
+interprets the count.
 
 ```mermaid
 graph TB
@@ -172,36 +170,43 @@ graph TB
     Store --> SagaStore
     Store --> EventStore
     Store --> BookStore
-    Store --> CommitStore
 ```
 
 - **`AccountStore`**: `get_account`, `get_accounts`, `create_account`, `append_account_version`, `get_account_history`, `list_accounts`
-- **`PostingStore`**: `get_postings`, `get_postings_by_account(account, asset?, status?)`, `query_postings(query)`, `reserve_postings(ids, reservation)`, `release_postings(ids, reservation)`, `finalize_postings`
-- **`TransferStore`**: `get_transfer`, `store_transfer`, `get_transfers_for_account`, `query_transfers`
-- **`EventStore`**: `append_event`, `get_events_since`
-- **`SagaStore`**: `save_saga`, `list_pending_sagas`, `delete_saga`
+- **`PostingStore`**: `get_postings`, `get_postings_by_account(account, asset?, status?)`, `query_postings(query)`, and the dumb write primitives `reserve_postings(ids, reservation) -> u64`, `release_postings(ids, reservation) -> u64`, `deactivate_postings(ids, reservation?) -> u64`, `insert_postings(postings) -> u64`
+- **`TransferStore`**: `get_transfer`, `store_transfer(record, involved) -> u64`, `get_transfers_for_account`, `query_transfers`
+- **`EventStore`**: `append_event` (idempotent on a per-transfer dedup key), `get_events_since`
+- **`SagaStore`**: `save_saga`, `list_pending_sagas`, `delete_saga` — the write-ahead store the saga and `recover()` use
 - **`BookStore`**: `create_book`, `get_book`, `list_books`
-- **`CommitStore`**: `commit_transfer(req)` — the single atomic commit boundary. It applies posting deactivations/creations, the transfer record, the both-sided account index, and events in one critical section, enforcing `CappedOverdraft` CAS guards and reservation ownership. `reserve_postings`/`release_postings`/`finalize_postings` remain as lower-level primitives; `commit_transfer` is the production commit path.
+
+There is no `CommitStore`/`commit_transfer`: a commit is the saga calling these
+primitives in sequence, each idempotent, with crash-safety from write-ahead
+recovery rather than a single transaction.
 
 #### Batch posting operations
 
-`reserve_postings` and `release_postings` operate on batches with atomic semantics:
+`reserve_postings`/`release_postings`/`deactivate_postings` apply each id's
+conditional update and return how many rows changed (the saga decides what a
+short count means):
 
 ```mermaid
 stateDiagram-v2
-    [*] --> Active: created by finalize
+    [*] --> Active: insert_postings
     Active --> PendingInactive: reserve_postings
     PendingInactive --> Active: release_postings
-    PendingInactive --> Inactive: finalize_postings
-    Active --> Active: release_postings (no-op)
-    note right of Inactive: void — release_postings fails
+    PendingInactive --> Inactive: deactivate_postings(reservation)
+    Active --> Inactive: deactivate_postings(None)
 ```
 
-| Operation | Active | PendingInactive | Inactive |
-|-----------|--------|-----------------|----------|
-| `reserve_postings` | → PendingInactive | **fail** | **fail** |
-| `release_postings` | no-op | → Active | **fail** (void) |
-| `finalize_postings` | → Inactive | → Inactive | — |
+Each cell is the count a primitive returns (1 = flipped, 0 = no-op / not
+applicable). The saga interprets a 0:
+
+| Operation | Active | PendingInactive (this rid) | Inactive |
+|-----------|--------|----------------------------|----------|
+| `reserve_postings(rid)` | → PendingInactive (1) | 0 | 0 |
+| `release_postings(rid)` | 0 | → Active (1) | 0 |
+| `deactivate_postings(Some rid)` | 0 | → Inactive (1) | 0 |
+| `deactivate_postings(None)` | → Inactive (1) | 0 | 0 |
 
 If any posting in the batch fails validation, the entire batch is rejected and no state changes.
 
@@ -228,24 +233,23 @@ LedgerError
 StoreError
 ├── NotFound(String)
 ├── AlreadyExists(String)
-├── VersionConflict { account, expected, actual }
-├── Internal(String)
-├── PostingNotActive(PostingId)   // reserve_postings: posting not Active
-├── PostingInactive(PostingId)    // release_postings: posting is void
-├── Conflict { account, asset }   // commit_transfer: CAS guard balance changed (retryable)
-└── ReservationMismatch(PostingId) // posting reserved by a different saga
+├── VersionConflict { account, expected, actual }  // append_account_version: stale version
+└── Internal(String)
 ```
 
+The store has no semantic write-outcome errors (no "posting not active",
+"reservation mismatch", "cas conflict") — writes return affected-row counts and
+the saga derives meaning from them.
+
 ### Saga Steps
 
-#### Pipeline steps (used internally by `commit`)
+#### Envelope pipeline steps (used internally by `commit_envelope`; resolution runs before the saga)
 
 | Step | Execute | Compensate | Retry |
 |------|---------|------------|-------|
-| `ResolveStep` | Convert Transfer intent into Envelope | No-op | None |
-| `ReservePostingsStep` | Batch reserve `Active → PendingInactive` | Batch release back to `Active` | 3 |
+| `ReservePostingsStep` | `reserve_postings` `Active → PendingInactive`, interpret count | Release back to `Active` | 3 |
 | `ValidateTransferStep` | Load state, `validate_and_plan()` | No-op | None |
-| `FinalizeTransferStep` | Finalize postings, store transfer, emit event | `reverse(transfer_id)` | 3 |
+| `FinalizeTransferStep` | `deactivate_postings` → `insert_postings` → `store_transfer` → `append_event` | `reverse(transfer_id)` | 3 |
 
 #### High-level steps (for custom saga composition with `legend!`)
 

+ 13 - 1
doc/glossary.md

@@ -33,7 +33,19 @@ One or more movements to execute atomically. Built via `TransferBuilder`, commit
 
 ### Envelope
 
-The resolved, concrete form of a transfer: which postings to consume and which to create. Produced internally by the resolve step. Available for direct use via `commit_atomic(envelope)`.
+The resolved, concrete form of a transfer: which postings to consume and which to create. Produced by the resolve step (`commit`), or built directly and committed via `commit_envelope(envelope)`.
+
+### Dumb storage
+
+The design where every `Store` write method applies one update and returns the **number of affected rows** (or an I/O error), never interpreting that count, deciding state, enforcing idempotency, or compensating. The saga reads the count and decides: full = continue; partial = error → compensate; zero = read state and continue only if this same envelope/reservation already applied it.
+
+### Reservation protocol
+
+The concurrency-control mechanism for consumed postings: `reserve_postings` atomically flips `Active → PendingInactive` stamped with a `ReservationId`, so two sagas cannot both claim the same posting. This (not a global transaction) is what prevents double-spend.
+
+### PendingSaga / recovery
+
+A write-ahead record `{envelope, reservation}` persisted via `SagaStore` before a commit mutates anything. `Ledger::recover()` (startup) force-completes any pending saga through the idempotent primitives — roll-forward, converging from a crash at any point.
 
 ### Book
 

+ 13 - 5
doc/transfers.md

@@ -160,15 +160,18 @@ A single transfer can contain multiple movements of different types. All movemen
 Transfer → resolve → Envelope → reserve → validate → finalize → Receipt
 ```
 
-Four-phase pipeline with automatic retry and LIFO compensation on failure. Used by `ledger.commit(transfer)`.
+Resolution is read-only; `commit(transfer)` resolves then runs the envelope saga
+(reserve → validate → finalize) with automatic retry and LIFO compensation.
 
-### Atomic commit
+### Committing a pre-built envelope
 
 ```
-Envelope → load → plan → apply → Receipt
+Envelope → reserve → validate → finalize → Receipt
 ```
 
-Single-pass pipeline without reservation. Used by `ledger.commit_atomic(envelope)` and internally by `reverse()`.
+`ledger.commit_envelope(envelope)` runs the same saga for an envelope you already
+hold (e.g. a hand-built multi-asset/FX envelope, or a reversal). `reverse()` uses
+it. There is no separate single-pass "atomic" path.
 
 ## Reversal
 
@@ -193,6 +196,11 @@ Every envelope passes through `validate_and_plan()` before being applied. The va
 9. Negative postings forbidden only on `NoOverdraft` accounts (allowed on overdraft/system/external)
 10. Policy enforcement: projected balance satisfies account floor
 
-After validation, the effects are applied through a single atomic `commit_transfer` (postings, transfer record, account index, and events commit together or not at all), which also enforces the `CappedOverdraft` CAS guards.
+After validation, the finalize step applies the effects through a sequence of
+dumb, idempotent store primitives (`deactivate_postings` → `insert_postings` →
+`store_transfer` → `append_event`). There is no single transaction; crash-safety
+comes from a write-ahead `PendingSaga` record plus `recover()` roll-forward. The
+`CappedOverdraft` floor is checked in validation (step 10) and is best-effort
+under concurrency — see [architecture.md](architecture.md).
 
 See [architecture.md](architecture.md) for details on each check.