Просмотр исходного кода

Close double-spend races and atomicity gaps in the storage layer

A concurrent saga or raw transfer could double-spend a posting: the SQL store
checked posting status with a SELECT, then mutated with an UPDATE keyed only on
the primary key, so under PostgreSQL READ COMMITTED two transactions could both
pass the read and both apply the write. Reservation, finalize, and lifecycle
paths had matching gaps, and the account snapshot pinned at validation was never
re-checked at commit.

Make the status/reservation precondition the authorization boundary via
conditional UPDATE ... WHERE + rows_affected checks, matching the InMemory
reference. Remove the split finalize_postings/store_transfer write APIs so all
posting mutation flows through the atomic commit_transfer (which also fixes the
created-only transfer indexing). Propagate lifecycle event-append errors, reject
close on any non-Inactive posting, and re-check account versions atomically at
commit via new account_guards on the plan and commit request.
Cesar Rodas 6 дней назад
Родитель
Сommit
93e35fed20

+ 9 - 0
crates/kuatia-core/src/validate.rs

@@ -45,6 +45,10 @@ pub struct Plan {
     pub postings_to_create: Vec<Posting>,
     pub postings_to_create: Vec<Posting>,
     /// CAS guards for CappedOverdraft accounts: (account, asset, expected_balance).
     /// CAS guards for CappedOverdraft accounts: (account, asset, expected_balance).
     pub cas_guards: Vec<(AccountId, AssetId, Cent)>,
     pub cas_guards: Vec<(AccountId, AssetId, Cent)>,
+    /// Account version guards: (account, expected_version). Each pinned account
+    /// snapshot is re-checked atomically at commit so a concurrent
+    /// freeze/unfreeze/close (which bump `version`) aborts the transfer.
+    pub account_guards: Vec<(AccountId, u64)>,
 }
 }
 
 
 // ---------------------------------------------------------------------------
 // ---------------------------------------------------------------------------
@@ -280,6 +284,9 @@ pub fn validate_and_plan(input: PlanInput<'_>) -> Result<Plan, ValidationError>
     }
     }
 
 
     // 5b. Snapshot pinning: each account_snapshot must match current state.
     // 5b. Snapshot pinning: each account_snapshot must match current state.
+    //     Emit a version guard per pinned account so the commit boundary can
+    //     re-check atomically against concurrent lifecycle mutations.
+    let mut account_guards: Vec<(AccountId, u64)> = Vec::new();
     for snap in envelope.account_snapshots() {
     for snap in envelope.account_snapshots() {
         let account = input
         let account = input
             .accounts
             .accounts
@@ -293,6 +300,7 @@ pub fn validate_and_plan(input: PlanInput<'_>) -> Result<Plan, ValidationError>
                 actual,
                 actual,
             });
             });
         }
         }
+        account_guards.push((snap.account, account.version));
     }
     }
 
 
     // 5c. Book policy: gate which assets and accounts may participate. Enforced
     // 5c. Book policy: gate which assets and accounts may participate. Enforced
@@ -475,6 +483,7 @@ pub fn validate_and_plan(input: PlanInput<'_>) -> Result<Plan, ValidationError>
         postings_to_deactivate,
         postings_to_deactivate,
         postings_to_create,
         postings_to_create,
         cas_guards,
         cas_guards,
+        account_guards,
     })
     })
 }
 }
 
 

+ 72 - 117
crates/kuatia-storage-sql/src/lib.rs

@@ -436,7 +436,11 @@ impl PostingStore for SqlStore {
         ids: &[PostingId],
         ids: &[PostingId],
         reservation: ReservationId,
         reservation: ReservationId,
     ) -> Result<(), StoreError> {
     ) -> Result<(), StoreError> {
-        // Validate all Active first, then update in a transaction.
+        // Conditional update: each posting transitions Active → PendingInactive
+        // only if it is still Active. The status precondition lives in the WHERE
+        // clause so a concurrent transition cannot slip between a check and the
+        // write; `rows_affected() == 1` is the authorization. A missing row and a
+        // non-Active row are indistinguishable here — both are "not reservable".
         let mut tx = self
         let mut tx = self
             .pool
             .pool
             .begin()
             .begin()
@@ -444,33 +448,20 @@ impl PostingStore for SqlStore {
             .map_err(|e| StoreError::Internal(e.to_string()))?;
             .map_err(|e| StoreError::Internal(e.to_string()))?;
 
 
         for id in ids {
         for id in ids {
-            let row =
-                sqlx::query("SELECT status FROM postings WHERE transfer_id = $1 AND idx = $2")
-                    .bind(id.transfer.0.as_slice())
-                    .bind(id.index as i16)
-                    .fetch_optional(&mut *tx)
-                    .await
-                    .map_err(|e| StoreError::Internal(e.to_string()))?
-                    .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
-            let status: i16 = row
-                .try_get("status")
-                .map_err(|e| StoreError::Internal(e.to_string()))?;
-            if status != 0 {
-                return Err(StoreError::PostingNotActive(*id));
-            }
-        }
-
-        for id in ids {
-            sqlx::query(
-                "UPDATE postings SET status = $1, reservation = $2 WHERE transfer_id = $3 AND idx = $4",
+            let res = sqlx::query(
+                "UPDATE postings SET status = $1, reservation = $2 WHERE transfer_id = $3 AND idx = $4 AND status = $5",
             )
             )
             .bind(status_to_i16(PostingStatus::PendingInactive))
             .bind(status_to_i16(PostingStatus::PendingInactive))
             .bind(reservation.0)
             .bind(reservation.0)
             .bind(id.transfer.0.as_slice())
             .bind(id.transfer.0.as_slice())
             .bind(id.index as i16)
             .bind(id.index as i16)
+            .bind(status_to_i16(PostingStatus::Active))
             .execute(&mut *tx)
             .execute(&mut *tx)
             .await
             .await
             .map_err(|e| StoreError::Internal(e.to_string()))?;
             .map_err(|e| StoreError::Internal(e.to_string()))?;
+            if res.rows_affected() != 1 {
+                return Err(StoreError::PostingNotActive(*id));
+            }
         }
         }
 
 
         tx.commit()
         tx.commit()
@@ -490,6 +481,9 @@ impl PostingStore for SqlStore {
             .await
             .await
             .map_err(|e| StoreError::Internal(e.to_string()))?;
             .map_err(|e| StoreError::Internal(e.to_string()))?;
 
 
+        // Classify then conditionally release in one pass. Releasing an Active
+        // posting is a no-op (matches InMemory); releasing a reserved one must
+        // affect exactly one row, otherwise it changed under us concurrently.
         for id in ids {
         for id in ids {
             let row = sqlx::query(
             let row = sqlx::query(
                 "SELECT status, reservation FROM postings WHERE transfer_id = $1 AND idx = $2",
                 "SELECT status, reservation FROM postings WHERE transfer_id = $1 AND idx = $2",
@@ -512,69 +506,29 @@ impl PostingStore for SqlStore {
             if status == 1 && owner != Some(reservation.0) {
             if status == 1 && owner != Some(reservation.0) {
                 return Err(StoreError::ReservationMismatch(*id));
                 return Err(StoreError::ReservationMismatch(*id));
             }
             }
-        }
 
 
-        for id in ids {
-            sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4")
+            let res = sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4 AND reservation = $5")
                 .bind(status_to_i16(PostingStatus::Active))
                 .bind(status_to_i16(PostingStatus::Active))
                 .bind(id.transfer.0.as_slice())
                 .bind(id.transfer.0.as_slice())
                 .bind(id.index as i16)
                 .bind(id.index as i16)
                 .bind(status_to_i16(PostingStatus::PendingInactive))
                 .bind(status_to_i16(PostingStatus::PendingInactive))
+                .bind(reservation.0)
                 .execute(&mut *tx)
                 .execute(&mut *tx)
                 .await
                 .await
                 .map_err(|e| StoreError::Internal(e.to_string()))?;
                 .map_err(|e| StoreError::Internal(e.to_string()))?;
-        }
-
-        tx.commit()
-            .await
-            .map_err(|e| StoreError::Internal(e.to_string()))?;
-        Ok(())
-    }
-
-    async fn finalize_postings(
-        &self,
-        deactivate: &[PostingId],
-        create: &[Posting],
-    ) -> Result<(), StoreError> {
-        let mut tx = self
-            .pool
-            .begin()
-            .await
-            .map_err(|e| StoreError::Internal(e.to_string()))?;
-
-        for id in deactivate {
-            let res = sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3")
-                .bind(status_to_i16(PostingStatus::Inactive))
-                .bind(id.transfer.0.as_slice())
-                .bind(id.index as i16)
-                .execute(&mut *tx)
-                .await
-                .map_err(|e| StoreError::Internal(e.to_string()))?;
-            if res.rows_affected() == 0 {
-                return Err(StoreError::NotFound(format!("posting {id:?}")));
+            // status == 1 was reserved by us → the row must flip; status == 0
+            // (Active) is a no-op with zero rows affected.
+            if status == 1 && res.rows_affected() != 1 {
+                return Err(StoreError::ReservationMismatch(*id));
             }
             }
         }
         }
 
 
-        for posting in create {
-            sqlx::query(
-                "INSERT INTO postings (transfer_id, idx, owner, asset, value, status) VALUES ($1, $2, $3, $4, $5, $6)"
-            )
-                .bind(posting.id.transfer.0.as_slice())
-                .bind(posting.id.index as i16)
-                .bind(posting.owner.0)
-                .bind(posting.asset.0 as i32)
-                .bind(posting.value.value())
-                .bind(status_to_i16(posting.status))
-                .execute(&mut *tx)
-                .await
-                .map_err(|e| StoreError::Internal(e.to_string()))?;
-        }
-
         tx.commit()
         tx.commit()
             .await
             .await
             .map_err(|e| StoreError::Internal(e.to_string()))?;
             .map_err(|e| StoreError::Internal(e.to_string()))?;
         Ok(())
         Ok(())
     }
     }
+
 }
 }
 
 
 // ---------------------------------------------------------------------------
 // ---------------------------------------------------------------------------
@@ -611,48 +565,6 @@ impl TransferStore for SqlStore {
         }
         }
     }
     }
 
 
-    async fn store_transfer(&self, record: EnvelopeRecord) -> Result<(), StoreError> {
-        let tid = record.receipt.transfer_id;
-        let transfer_bytes = serialize_blob(&record.envelope)?;
-        let receipt_bytes = serialize_blob(&record.receipt)?;
-
-        let mut tx = self
-            .pool
-            .begin()
-            .await
-            .map_err(|e| StoreError::Internal(e.to_string()))?;
-
-        sqlx::query("INSERT INTO transfers (id, transfer, receipt, created_at, book) VALUES ($1, $2, $3, $4, $5)")
-            .bind(tid.0.as_slice())
-            .bind(&transfer_bytes)
-            .bind(&receipt_bytes)
-            .bind(record.created_at)
-            .bind(record.envelope.book().0)
-            .execute(&mut *tx)
-            .await
-            .map_err(|e| StoreError::Internal(e.to_string()))?;
-
-        // Populate transfer_accounts join table
-        let mut account_ids: HashSet<i64> = HashSet::new();
-        for np in record.envelope.creates() {
-            account_ids.insert(np.owner.0);
-        }
-
-        for aid in &account_ids {
-            sqlx::query("INSERT INTO transfer_accounts (transfer_id, account_id) VALUES ($1, $2)")
-                .bind(tid.0.as_slice())
-                .bind(*aid)
-                .execute(&mut *tx)
-                .await
-                .map_err(|e| StoreError::Internal(e.to_string()))?;
-        }
-
-        tx.commit()
-            .await
-            .map_err(|e| StoreError::Internal(e.to_string()))?;
-        Ok(())
-    }
-
     async fn get_transfers_for_account(
     async fn get_transfers_for_account(
         &self,
         &self,
         account: &AccountId,
         account: &AccountId,
@@ -962,6 +874,29 @@ impl CommitStore for SqlStore {
             }
             }
         }
         }
 
 
+        // 2b. Account version guards — re-check each pinned account version in the
+        //     same transaction so a concurrent freeze/unfreeze/close (which bumps
+        //     version) aborts the commit instead of racing past validation.
+        for (account, expected) in req.account_guards {
+            let actual: i64 = sqlx::query(
+                "SELECT version FROM accounts WHERE id = $1 ORDER BY version DESC LIMIT 1",
+            )
+            .bind(account.0)
+            .fetch_optional(&mut *tx)
+            .await
+            .map_err(|e| StoreError::Internal(e.to_string()))?
+            .ok_or(StoreError::NotFound(format!("account {account:?}")))?
+            .try_get("version")
+            .map_err(|e| StoreError::Internal(e.to_string()))?;
+            if actual as u64 != *expected {
+                return Err(StoreError::VersionConflict {
+                    account: *account,
+                    expected: *expected,
+                    actual: actual as u64,
+                });
+            }
+        }
+
         // 3. Authorize consumed postings and collect their owners.
         // 3. Authorize consumed postings and collect their owners.
         let mut account_ids: HashSet<i64> = HashSet::new();
         let mut account_ids: HashSet<i64> = HashSet::new();
         for pid in req.deactivate {
         for pid in req.deactivate {
@@ -1000,15 +935,35 @@ impl CommitStore for SqlStore {
             account_ids.insert(owner);
             account_ids.insert(owner);
         }
         }
 
 
-        // 4. Deactivate consumed postings, asserting each affects exactly one row.
+        // 4. Deactivate consumed postings. The authorization precondition lives in
+        //    the WHERE clause (not just the step-3 read) so two concurrent commits
+        //    cannot both deactivate the same posting under READ COMMITTED. The raw
+        //    path requires it still Active; the saga path requires it still
+        //    PendingInactive owned by this reservation. `rows_affected() == 1` is
+        //    the authorization.
         for pid in req.deactivate {
         for pid in req.deactivate {
-            let res = sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3")
-                .bind(status_to_i16(PostingStatus::Inactive))
-                .bind(pid.transfer.0.as_slice())
-                .bind(pid.index as i16)
-                .execute(&mut *tx)
-                .await
-                .map_err(|e| StoreError::Internal(e.to_string()))?;
+            let res = match req.reservation {
+                None => {
+                    sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4")
+                        .bind(status_to_i16(PostingStatus::Inactive))
+                        .bind(pid.transfer.0.as_slice())
+                        .bind(pid.index as i16)
+                        .bind(status_to_i16(PostingStatus::Active))
+                        .execute(&mut *tx)
+                        .await
+                }
+                Some(rid) => {
+                    sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4 AND reservation = $5")
+                        .bind(status_to_i16(PostingStatus::Inactive))
+                        .bind(pid.transfer.0.as_slice())
+                        .bind(pid.index as i16)
+                        .bind(status_to_i16(PostingStatus::PendingInactive))
+                        .bind(rid.0)
+                        .execute(&mut *tx)
+                        .await
+                }
+            }
+            .map_err(|e| StoreError::Internal(e.to_string()))?;
             if res.rows_affected() != 1 {
             if res.rows_affected() != 1 {
                 return Err(StoreError::ReservationMismatch(*pid));
                 return Err(StoreError::ReservationMismatch(*pid));
             }
             }

+ 22 - 26
crates/kuatia-storage/src/mem_store.rs

@@ -218,24 +218,6 @@ impl PostingStore for InMemoryStore {
         Ok(())
         Ok(())
     }
     }
 
 
-    async fn finalize_postings(
-        &self,
-        deactivate: &[PostingId],
-        create: &[Posting],
-    ) -> Result<(), StoreError> {
-        let mut postings = self.postings.write().await;
-        for pid in deactivate {
-            let p = postings
-                .get_mut(pid)
-                .ok_or_else(|| StoreError::NotFound(format!("posting {pid:?}")))?;
-            p.status = PostingStatus::Inactive;
-            p.reservation = None;
-        }
-        for posting in create {
-            postings.insert(posting.id, posting.clone());
-        }
-        Ok(())
-    }
 }
 }
 
 
 // ---------------------------------------------------------------------------
 // ---------------------------------------------------------------------------
@@ -249,12 +231,6 @@ impl TransferStore for InMemoryStore {
         Ok(transfers.get(id).cloned())
         Ok(transfers.get(id).cloned())
     }
     }
 
 
-    async fn store_transfer(&self, record: EnvelopeRecord) -> Result<(), StoreError> {
-        let mut transfers = self.transfers.write().await;
-        transfers.insert(record.receipt.transfer_id, record);
-        Ok(())
-    }
-
     async fn get_transfers_for_account(
     async fn get_transfers_for_account(
         &self,
         &self,
         account: &AccountId,
         account: &AccountId,
@@ -376,8 +352,11 @@ impl BookStore for InMemoryStore {
 #[async_trait]
 #[async_trait]
 impl CommitStore for InMemoryStore {
 impl CommitStore for InMemoryStore {
     async fn commit_transfer(&self, req: CommitRequest<'_>) -> Result<(), StoreError> {
     async fn commit_transfer(&self, req: CommitRequest<'_>) -> Result<(), StoreError> {
-        // Lock order postings → transfers → events; every reader that takes more
-        // than one of these must follow the same order.
+        // Lock order accounts → postings → transfers → events; every reader that
+        // takes more than one of these must follow the same order. Holding the
+        // accounts read lock for the whole commit keeps the version guard (step
+        // 2b) atomic against a concurrent lifecycle mutation.
+        let accounts = self.accounts.read().await;
         let mut postings = self.postings.write().await;
         let mut postings = self.postings.write().await;
         let mut transfers = self.transfers.write().await;
         let mut transfers = self.transfers.write().await;
         let mut events = self.events.write().await;
         let mut events = self.events.write().await;
@@ -411,6 +390,23 @@ impl CommitStore for InMemoryStore {
             }
             }
         }
         }
 
 
+        // 2b. Account version guards — a concurrent freeze/unfreeze/close bumps
+        //     the version, invalidating the snapshot pinned at validation.
+        for (account, expected) in req.account_guards {
+            let actual = accounts
+                .get(account)
+                .and_then(|versions| versions.last())
+                .map(|a| a.version)
+                .ok_or(StoreError::NotFound(format!("account {account:?}")))?;
+            if actual != *expected {
+                return Err(StoreError::VersionConflict {
+                    account: *account,
+                    expected: *expected,
+                    actual,
+                });
+            }
+        }
+
         // 3. Authorize every consumed posting against the reservation.
         // 3. Authorize every consumed posting against the reservation.
         for pid in req.deactivate {
         for pid in req.deactivate {
             let posting = postings
             let posting = postings

+ 4 - 8
crates/kuatia-storage/src/store.rs

@@ -40,6 +40,10 @@ pub struct CommitRequest<'a> {
     /// `(account, asset, expected_balance)` guards to verify before mutating —
     /// `(account, asset, expected_balance)` guards to verify before mutating —
     /// a mismatch means a concurrent transfer moved the balance ([`StoreError::Conflict`]).
     /// a mismatch means a concurrent transfer moved the balance ([`StoreError::Conflict`]).
     pub cas_guards: &'a [(AccountId, AssetId, Cent)],
     pub cas_guards: &'a [(AccountId, AssetId, Cent)],
+    /// `(account, expected_version)` guards re-checked atomically at commit — a
+    /// mismatch means a concurrent lifecycle mutation (freeze/unfreeze/close)
+    /// bumped the account version after validation ([`StoreError::VersionConflict`]).
+    pub account_guards: &'a [(AccountId, u64)],
     /// Reservation authorizing consumption of `deactivate`.
     /// Reservation authorizing consumption of `deactivate`.
     /// - `None` — raw path: the postings must be `Active`.
     /// - `None` — raw path: the postings must be `Active`.
     /// - `Some(rid)` — saga path: the postings must be `PendingInactive` owned by `rid`.
     /// - `Some(rid)` — saga path: the postings must be `PendingInactive` owned by `rid`.
@@ -142,12 +146,6 @@ pub trait PostingStore: Send + Sync {
         ids: &[PostingId],
         ids: &[PostingId],
         reservation: ReservationId,
         reservation: ReservationId,
     ) -> Result<(), StoreError>;
     ) -> Result<(), StoreError>;
-    /// Deactivate postings and insert newly created postings.
-    async fn finalize_postings(
-        &self,
-        deactivate: &[PostingId],
-        create: &[Posting],
-    ) -> Result<(), StoreError>;
 
 
     /// Query postings with filtering and pagination.
     /// Query postings with filtering and pagination.
     async fn query_postings(&self, query: &PostingQuery) -> Result<Page<Posting>, StoreError> {
     async fn query_postings(&self, query: &PostingQuery) -> Result<Page<Posting>, StoreError> {
@@ -171,8 +169,6 @@ pub trait PostingStore: Send + Sync {
 pub trait TransferStore: Send + Sync {
 pub trait TransferStore: Send + Sync {
     /// Fetch a transfer record by its content-addressed id.
     /// Fetch a transfer record by its content-addressed id.
     async fn get_transfer(&self, id: &EnvelopeId) -> Result<Option<EnvelopeRecord>, StoreError>;
     async fn get_transfer(&self, id: &EnvelopeId) -> Result<Option<EnvelopeRecord>, StoreError>;
-    /// Persist a committed transfer and its receipt.
-    async fn store_transfer(&self, record: EnvelopeRecord) -> Result<(), StoreError>;
     /// Return all transfers involving the given account.
     /// Return all transfers involving the given account.
     async fn get_transfers_for_account(
     async fn get_transfers_for_account(
         &self,
         &self,

+ 231 - 119
crates/kuatia-storage/src/store_tests.rs

@@ -96,6 +96,88 @@ fn make_envelope() -> (Envelope, EnvelopeId) {
     (t, tid)
     (t, tid)
 }
 }
 
 
+/// Seed `create` as Active postings. Since the split write APIs are gone,
+/// `commit_transfer` is the only mutation path; we wrap it in a throwaway
+/// transfer whose envelope mirrors the seeded postings so both the SQL index
+/// (`transfer_accounts`, built from `req.create`) and the InMemory index (built
+/// from the envelope) stay consistent. `tag` keeps the seed transfer id unique
+/// within a test so idempotency doesn't swallow the insert.
+async fn seed_active(store: &(impl Store + 'static), tag: u8, create: &[Posting]) {
+    let creates: Vec<NewPosting> = create
+        .iter()
+        .map(|p| NewPosting {
+            owner: p.owner,
+            asset: p.asset,
+            value: p.value,
+            payer: None,
+        })
+        .collect();
+    let envelope = EnvelopeBuilder::new().creates(creates).build();
+    let mut tid_bytes = [0u8; 32];
+    tid_bytes[0] = tag;
+    let tid = EnvelopeId(tid_bytes);
+    store
+        .commit_transfer(CommitRequest {
+            deactivate: &[],
+            create,
+            cas_guards: &[],
+            account_guards: &[],
+            reservation: None,
+            record: EnvelopeRecord {
+                envelope,
+                receipt: Receipt { transfer_id: tid },
+                created_at: 0,
+            },
+            events: &[],
+        })
+        .await
+        .unwrap();
+}
+
+/// Persist `envelope` as a committed transfer, deriving its created postings the
+/// way the ledger does (`PostingId { transfer: tid, index }`). The faithful
+/// replacement for the removed `store_transfer` — it populates the account index
+/// on both backends.
+async fn commit_envelope(
+    store: &(impl Store + 'static),
+    envelope: Envelope,
+    tid: EnvelopeId,
+    created_at: i64,
+) {
+    let create: Vec<Posting> = envelope
+        .creates()
+        .iter()
+        .enumerate()
+        .map(|(i, np)| {
+            Posting::new(
+                PostingId {
+                    transfer: tid,
+                    index: i as u16,
+                },
+                np.owner,
+                np.asset,
+                np.value,
+            )
+        })
+        .collect();
+    store
+        .commit_transfer(CommitRequest {
+            deactivate: &[],
+            create: &create,
+            cas_guards: &[],
+            account_guards: &[],
+            reservation: None,
+            record: EnvelopeRecord {
+                envelope,
+                receipt: Receipt { transfer_id: tid },
+                created_at,
+            },
+            events: &[],
+        })
+        .await
+        .unwrap();
+}
+
 // ---------------------------------------------------------------------------
 // ---------------------------------------------------------------------------
 // AccountStore tests
 // AccountStore tests
 // ---------------------------------------------------------------------------
 // ---------------------------------------------------------------------------
@@ -199,13 +281,10 @@ pub async fn list_accounts(store: &(impl Store + 'static)) {
 // PostingStore tests
 // PostingStore tests
 // ---------------------------------------------------------------------------
 // ---------------------------------------------------------------------------
 
 
-/// Finalize with empty deactivate creates new postings.
-pub async fn finalize_creates_postings(store: &(impl Store + 'static)) {
+/// Committing with empty deactivate creates new postings.
+pub async fn commit_creates_postings(store: &(impl Store + 'static)) {
     let p = make_posting([1; 32], 0, 1, 1, 100);
     let p = make_posting([1; 32], 0, 1, 1, 100);
-    store
-        .finalize_postings(&[], std::slice::from_ref(&p))
-        .await
-        .unwrap();
+    seed_active(store, 200, std::slice::from_ref(&p)).await;
 
 
     let got = store.get_postings(&[p.id]).await.unwrap();
     let got = store.get_postings(&[p.id]).await.unwrap();
     assert_eq!(got.len(), 1);
     assert_eq!(got.len(), 1);
@@ -227,7 +306,7 @@ pub async fn get_postings_by_account_filters(store: &(impl Store + 'static)) {
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
     let p2 = make_posting([1; 32], 1, 1, 2, 200);
     let p2 = make_posting([1; 32], 1, 1, 2, 200);
     let p3 = make_posting([1; 32], 2, 2, 1, 300);
     let p3 = make_posting([1; 32], 2, 2, 1, 300);
-    store.finalize_postings(&[], &[p1, p2, p3]).await.unwrap();
+    seed_active(store, 200, &[p1, p2, p3]).await;
 
 
     let all = store
     let all = store
         .get_postings_by_account(&AccountId::new(1), None, None)
         .get_postings_by_account(&AccountId::new(1), None, None)
@@ -255,7 +334,7 @@ pub async fn query_postings_pagination(store: &(impl Store + 'static)) {
     let postings: Vec<Posting> = (0..5)
     let postings: Vec<Posting> = (0..5)
         .map(|i| make_posting([1; 32], i, 1, 1, (i as i64 + 1) * 100))
         .map(|i| make_posting([1; 32], i, 1, 1, (i as i64 + 1) * 100))
         .collect();
         .collect();
-    store.finalize_postings(&[], &postings).await.unwrap();
+    seed_active(store, 200, &postings).await;
 
 
     // Page 1: first 2
     // Page 1: first 2
     let page1 = store
     let page1 = store
@@ -318,10 +397,7 @@ pub async fn query_postings_pagination(store: &(impl Store + 'static)) {
 pub async fn reserve_postings_batch(store: &(impl Store + 'static)) {
 pub async fn reserve_postings_batch(store: &(impl Store + 'static)) {
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
     let p2 = make_posting([1; 32], 1, 1, 1, 200);
     let p2 = make_posting([1; 32], 1, 1, 1, 200);
-    store
-        .finalize_postings(&[], &[p1.clone(), p2.clone()])
-        .await
-        .unwrap();
+    seed_active(store, 200, &[p1.clone(), p2.clone()]).await;
 
 
     store.reserve_postings(&[p1.id, p2.id], ReservationId::new(1)).await.unwrap();
     store.reserve_postings(&[p1.id, p2.id], ReservationId::new(1)).await.unwrap();
 
 
@@ -336,10 +412,7 @@ pub async fn reserve_postings_batch(store: &(impl Store + 'static)) {
 pub async fn reserve_non_active_fails(store: &(impl Store + 'static)) {
 pub async fn reserve_non_active_fails(store: &(impl Store + 'static)) {
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
     let p2 = make_posting([1; 32], 1, 1, 1, 200);
     let p2 = make_posting([1; 32], 1, 1, 1, 200);
-    store
-        .finalize_postings(&[], &[p1.clone(), p2.clone()])
-        .await
-        .unwrap();
+    seed_active(store, 200, &[p1.clone(), p2.clone()]).await;
 
 
     store.reserve_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
     store.reserve_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
 
 
@@ -353,10 +426,7 @@ pub async fn reserve_non_active_fails(store: &(impl Store + 'static)) {
 /// Release reserved postings back to Active.
 /// Release reserved postings back to Active.
 pub async fn release_postings_batch(store: &(impl Store + 'static)) {
 pub async fn release_postings_batch(store: &(impl Store + 'static)) {
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
-    store
-        .finalize_postings(&[], std::slice::from_ref(&p1))
-        .await
-        .unwrap();
+    seed_active(store, 200, std::slice::from_ref(&p1)).await;
     store.reserve_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
     store.reserve_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
 
 
     store.release_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
     store.release_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
@@ -368,10 +438,7 @@ pub async fn release_postings_batch(store: &(impl Store + 'static)) {
 /// Releasing an Active posting is a no-op (succeeds silently).
 /// Releasing an Active posting is a no-op (succeeds silently).
 pub async fn release_active_is_noop(store: &(impl Store + 'static)) {
 pub async fn release_active_is_noop(store: &(impl Store + 'static)) {
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
-    store
-        .finalize_postings(&[], std::slice::from_ref(&p1))
-        .await
-        .unwrap();
+    seed_active(store, 200, std::slice::from_ref(&p1)).await;
 
 
     store.release_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
     store.release_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
 
 
@@ -382,29 +449,45 @@ pub async fn release_active_is_noop(store: &(impl Store + 'static)) {
 /// Releasing an Inactive (void) posting fails.
 /// Releasing an Inactive (void) posting fails.
 pub async fn release_inactive_fails(store: &(impl Store + 'static)) {
 pub async fn release_inactive_fails(store: &(impl Store + 'static)) {
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
+    seed_active(store, 200, std::slice::from_ref(&p1)).await;
+
+    // Deactivate p1 (raw path: still Active) so the release sees a void posting.
     store
     store
-        .finalize_postings(&[], std::slice::from_ref(&p1))
+        .commit_transfer(CommitRequest {
+            deactivate: &[p1.id],
+            create: &[],
+            cas_guards: &[],
+            account_guards: &[],
+            reservation: None,
+            record: commit_record(EnvelopeId([3; 32]), vec![p1.id]),
+            events: &[],
+        })
         .await
         .await
         .unwrap();
         .unwrap();
 
 
-    store.finalize_postings(&[p1.id], &[]).await.unwrap();
-
     let err = store.release_postings(&[p1.id], ReservationId::new(1)).await.unwrap_err();
     let err = store.release_postings(&[p1.id], ReservationId::new(1)).await.unwrap_err();
     assert!(matches!(err, StoreError::PostingInactive(_)));
     assert!(matches!(err, StoreError::PostingInactive(_)));
 }
 }
 
 
-/// Finalize transitions PendingInactive → Inactive.
-pub async fn finalize_deactivates_postings(store: &(impl Store + 'static)) {
+/// Committing a reserved posting transitions it PendingInactive → Inactive while
+/// inserting the newly created posting.
+pub async fn commit_deactivates_postings(store: &(impl Store + 'static)) {
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
     let p1 = make_posting([1; 32], 0, 1, 1, 100);
-    store
-        .finalize_postings(&[], std::slice::from_ref(&p1))
-        .await
-        .unwrap();
+    seed_active(store, 200, std::slice::from_ref(&p1)).await;
     store.reserve_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
     store.reserve_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
 
 
     let p2 = make_posting([2; 32], 0, 1, 1, 100);
     let p2 = make_posting([2; 32], 0, 1, 1, 100);
+    // Saga path: p1 is PendingInactive owned by reservation 1.
     store
     store
-        .finalize_postings(&[p1.id], std::slice::from_ref(&p2))
+        .commit_transfer(CommitRequest {
+            deactivate: &[p1.id],
+            create: std::slice::from_ref(&p2),
+            cas_guards: &[],
+            account_guards: &[],
+            reservation: Some(ReservationId::new(1)),
+            record: commit_record(EnvelopeId([2; 32]), vec![p1.id]),
+            events: &[],
+        })
         .await
         .await
         .unwrap();
         .unwrap();
 
 
@@ -442,10 +525,7 @@ fn commit_record(tid: EnvelopeId, consumes: Vec<PostingId>) -> EnvelopeRecord {
 /// events atomically; the consumed-only owner is indexed for history.
 /// events atomically; the consumed-only owner is indexed for history.
 pub async fn commit_transfer_atomic(store: &(impl Store + 'static)) {
 pub async fn commit_transfer_atomic(store: &(impl Store + 'static)) {
     let consumed = make_posting([7; 32], 0, 1, 1, 100); // owned by account 1
     let consumed = make_posting([7; 32], 0, 1, 1, 100); // owned by account 1
-    store
-        .finalize_postings(&[], std::slice::from_ref(&consumed))
-        .await
-        .unwrap();
+    seed_active(store, 200, std::slice::from_ref(&consumed)).await;
 
 
     let created = make_posting([8; 32], 0, 2, 1, 100); // owned by account 2
     let created = make_posting([8; 32], 0, 2, 1, 100); // owned by account 2
     let tid = EnvelopeId([8; 32]);
     let tid = EnvelopeId([8; 32]);
@@ -459,6 +539,7 @@ pub async fn commit_transfer_atomic(store: &(impl Store + 'static)) {
             deactivate: &[consumed.id],
             deactivate: &[consumed.id],
             create: std::slice::from_ref(&created),
             create: std::slice::from_ref(&created),
             cas_guards: &[],
             cas_guards: &[],
+            account_guards: &[],
             reservation: None,
             reservation: None,
             record: commit_record(tid, vec![consumed.id]),
             record: commit_record(tid, vec![consumed.id]),
             events: &events,
             events: &events,
@@ -480,6 +561,8 @@ pub async fn commit_transfer_atomic(store: &(impl Store + 'static)) {
     assert!(store.get_transfer(&tid).await.unwrap().is_some());
     assert!(store.get_transfer(&tid).await.unwrap().is_some());
 
 
     // History indexes BOTH the created owner (2) and the consumed-only owner (1).
     // History indexes BOTH the created owner (2) and the consumed-only owner (1).
+    // Account 2 appears only in this transfer; account 1 appears here and in the
+    // seed transfer that funded it, so its history contains this transfer.
     assert_eq!(
     assert_eq!(
         store
         store
             .get_transfers_for_account(&AccountId::new(2))
             .get_transfers_for_account(&AccountId::new(2))
@@ -488,13 +571,13 @@ pub async fn commit_transfer_atomic(store: &(impl Store + 'static)) {
             .len(),
             .len(),
         1
         1
     );
     );
-    assert_eq!(
+    assert!(
         store
         store
             .get_transfers_for_account(&AccountId::new(1))
             .get_transfers_for_account(&AccountId::new(1))
             .await
             .await
             .unwrap()
             .unwrap()
-            .len(),
-        1
+            .iter()
+            .any(|r| r.receipt.transfer_id == tid)
     );
     );
 
 
     // The event was appended in the same commit.
     // The event was appended in the same commit.
@@ -504,10 +587,7 @@ pub async fn commit_transfer_atomic(store: &(impl Store + 'static)) {
 /// A second commit of the same transfer id is a no-op (idempotent).
 /// A second commit of the same transfer id is a no-op (idempotent).
 pub async fn commit_transfer_idempotent(store: &(impl Store + 'static)) {
 pub async fn commit_transfer_idempotent(store: &(impl Store + 'static)) {
     let consumed = make_posting([7; 32], 0, 1, 1, 100);
     let consumed = make_posting([7; 32], 0, 1, 1, 100);
-    store
-        .finalize_postings(&[], std::slice::from_ref(&consumed))
-        .await
-        .unwrap();
+    seed_active(store, 200, std::slice::from_ref(&consumed)).await;
     let created = make_posting([8; 32], 0, 2, 1, 100);
     let created = make_posting([8; 32], 0, 2, 1, 100);
     let tid = EnvelopeId([8; 32]);
     let tid = EnvelopeId([8; 32]);
     store
     store
@@ -515,6 +595,7 @@ pub async fn commit_transfer_idempotent(store: &(impl Store + 'static)) {
             deactivate: &[],
             deactivate: &[],
             create: std::slice::from_ref(&created),
             create: std::slice::from_ref(&created),
             cas_guards: &[],
             cas_guards: &[],
+            account_guards: &[],
             reservation: None,
             reservation: None,
             record: commit_record(tid, vec![]),
             record: commit_record(tid, vec![]),
             events: &[],
             events: &[],
@@ -527,6 +608,7 @@ pub async fn commit_transfer_idempotent(store: &(impl Store + 'static)) {
             deactivate: &[],
             deactivate: &[],
             create: std::slice::from_ref(&created),
             create: std::slice::from_ref(&created),
             cas_guards: &[],
             cas_guards: &[],
+            account_guards: &[],
             reservation: None,
             reservation: None,
             record: commit_record(tid, vec![]),
             record: commit_record(tid, vec![]),
             events: &[],
             events: &[],
@@ -539,10 +621,7 @@ pub async fn commit_transfer_idempotent(store: &(impl Store + 'static)) {
 /// commit_transfer rejects consuming a posting reserved by a different saga.
 /// commit_transfer rejects consuming a posting reserved by a different saga.
 pub async fn commit_transfer_reservation_mismatch(store: &(impl Store + 'static)) {
 pub async fn commit_transfer_reservation_mismatch(store: &(impl Store + 'static)) {
     let consumed = make_posting([7; 32], 0, 1, 1, 100);
     let consumed = make_posting([7; 32], 0, 1, 1, 100);
-    store
-        .finalize_postings(&[], std::slice::from_ref(&consumed))
-        .await
-        .unwrap();
+    seed_active(store, 200, std::slice::from_ref(&consumed)).await;
     // Reserved under reservation 1.
     // Reserved under reservation 1.
     store
     store
         .reserve_postings(&[consumed.id], ReservationId::new(1))
         .reserve_postings(&[consumed.id], ReservationId::new(1))
@@ -557,6 +636,7 @@ pub async fn commit_transfer_reservation_mismatch(store: &(impl Store + 'static)
             deactivate: &[consumed.id],
             deactivate: &[consumed.id],
             create: std::slice::from_ref(&created),
             create: std::slice::from_ref(&created),
             cas_guards: &[],
             cas_guards: &[],
+            account_guards: &[],
             reservation: Some(ReservationId::new(2)),
             reservation: Some(ReservationId::new(2)),
             record: commit_record(tid, vec![consumed.id]),
             record: commit_record(tid, vec![consumed.id]),
             events: &[],
             events: &[],
@@ -569,10 +649,7 @@ pub async fn commit_transfer_reservation_mismatch(store: &(impl Store + 'static)
 /// commit_transfer aborts with Conflict when a CAS guard's balance is stale.
 /// commit_transfer aborts with Conflict when a CAS guard's balance is stale.
 pub async fn commit_transfer_cas_conflict(store: &(impl Store + 'static)) {
 pub async fn commit_transfer_cas_conflict(store: &(impl Store + 'static)) {
     let consumed = make_posting([7; 32], 0, 1, 1, 100);
     let consumed = make_posting([7; 32], 0, 1, 1, 100);
-    store
-        .finalize_postings(&[], std::slice::from_ref(&consumed))
-        .await
-        .unwrap();
+    seed_active(store, 200, std::slice::from_ref(&consumed)).await;
     let created = make_posting([8; 32], 0, 2, 1, 100);
     let created = make_posting([8; 32], 0, 2, 1, 100);
     let tid = EnvelopeId([8; 32]);
     let tid = EnvelopeId([8; 32]);
     // Guard claims account 1 holds 50, but it actually holds 100.
     // Guard claims account 1 holds 50, but it actually holds 100.
@@ -581,6 +658,7 @@ pub async fn commit_transfer_cas_conflict(store: &(impl Store + 'static)) {
             deactivate: &[consumed.id],
             deactivate: &[consumed.id],
             create: std::slice::from_ref(&created),
             create: std::slice::from_ref(&created),
             cas_guards: &[(AccountId::new(1), AssetId::new(1), Cent::from(50))],
             cas_guards: &[(AccountId::new(1), AssetId::new(1), Cent::from(50))],
+            account_guards: &[],
             reservation: None,
             reservation: None,
             record: commit_record(tid, vec![consumed.id]),
             record: commit_record(tid, vec![consumed.id]),
             events: &[],
             events: &[],
@@ -593,18 +671,94 @@ pub async fn commit_transfer_cas_conflict(store: &(impl Store + 'static)) {
 }
 }
 
 
 // ---------------------------------------------------------------------------
 // ---------------------------------------------------------------------------
+// Race regressions — the conditional-update / guard fixes. Expressed
+// sequentially (the conformance harness holds a single `&store`); the second
+// attempt is what must fail.
+// ---------------------------------------------------------------------------
+
+/// Reserving an already-reserved posting fails — no two reservations can own it.
+pub async fn reserve_twice_second_fails(store: &(impl Store + 'static)) {
+    let p1 = make_posting([1; 32], 0, 1, 1, 100);
+    seed_active(store, 200, std::slice::from_ref(&p1)).await;
+
+    store.reserve_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
+    let err = store
+        .reserve_postings(&[p1.id], ReservationId::new(2))
+        .await
+        .unwrap_err();
+    assert!(matches!(err, StoreError::PostingNotActive(_)));
+}
+
+/// A posting cannot be consumed twice: once committed (Inactive), a second raw
+/// commit consuming it is rejected — the double-spend guard.
+pub async fn commit_double_spend_second_fails(store: &(impl Store + 'static)) {
+    let consumed = make_posting([7; 32], 0, 1, 1, 100);
+    seed_active(store, 200, std::slice::from_ref(&consumed)).await;
+
+    let created1 = make_posting([8; 32], 0, 2, 1, 100);
+    store
+        .commit_transfer(CommitRequest {
+            deactivate: &[consumed.id],
+            create: std::slice::from_ref(&created1),
+            cas_guards: &[],
+            account_guards: &[],
+            reservation: None,
+            record: commit_record(EnvelopeId([8; 32]), vec![consumed.id]),
+            events: &[],
+        })
+        .await
+        .unwrap();
+
+    let created2 = make_posting([9; 32], 0, 2, 1, 100);
+    let err = store
+        .commit_transfer(CommitRequest {
+            deactivate: &[consumed.id],
+            create: std::slice::from_ref(&created2),
+            cas_guards: &[],
+            account_guards: &[],
+            reservation: None,
+            record: commit_record(EnvelopeId([9; 32]), vec![consumed.id]),
+            events: &[],
+        })
+        .await
+        .unwrap_err();
+    assert!(matches!(err, StoreError::ReservationMismatch(_)));
+}
+
+/// A commit whose pinned account version is stale aborts with VersionConflict,
+/// closing the validate→commit window against a concurrent lifecycle mutation.
+pub async fn commit_stale_account_guard_fails(store: &(impl Store + 'static)) {
+    let acc = make_account(1, AccountPolicy::NoOverdraft); // version 1
+    store.create_account(acc.clone()).await.unwrap();
+    // A concurrent freeze/close would bump the version like this.
+    let mut bumped = acc.clone();
+    bumped.version = 2;
+    store.append_account_version(bumped).await.unwrap();
+
+    let created = make_posting([8; 32], 0, 1, 1, 100);
+    let err = store
+        .commit_transfer(CommitRequest {
+            deactivate: &[],
+            create: std::slice::from_ref(&created),
+            cas_guards: &[],
+            account_guards: &[(AccountId::new(1), 1)],
+            reservation: None,
+            record: commit_record(EnvelopeId([8; 32]), vec![]),
+            events: &[],
+        })
+        .await
+        .unwrap_err();
+    assert!(matches!(err, StoreError::VersionConflict { .. }));
+}
+
+// ---------------------------------------------------------------------------
 // TransferStore tests
 // TransferStore tests
 // ---------------------------------------------------------------------------
 // ---------------------------------------------------------------------------
 
 
-/// Store a transfer record and retrieve by id.
-pub async fn store_and_get_transfer(store: &(impl Store + 'static)) {
+/// Commit a transfer and retrieve it by id.
+pub async fn commit_and_get_transfer(store: &(impl Store + 'static)) {
     let (envelope, tid) = make_envelope();
     let (envelope, tid) = make_envelope();
-    let record = EnvelopeRecord {
-        envelope,
-        receipt: Receipt { transfer_id: tid },
-        created_at: 1000,
-    };
-    store.store_transfer(record.clone()).await.unwrap();
+    commit_envelope(store, envelope, tid, 1000).await;
 
 
     let got = store.get_transfer(&tid).await.unwrap();
     let got = store.get_transfer(&tid).await.unwrap();
     assert!(got.is_some());
     assert!(got.is_some());
@@ -620,12 +774,7 @@ pub async fn get_missing_transfer(store: &(impl Store + 'static)) {
 /// Query transfers by account.
 /// Query transfers by account.
 pub async fn get_transfers_for_account(store: &(impl Store + 'static)) {
 pub async fn get_transfers_for_account(store: &(impl Store + 'static)) {
     let (envelope, tid) = make_envelope();
     let (envelope, tid) = make_envelope();
-    let record = EnvelopeRecord {
-        envelope,
-        receipt: Receipt { transfer_id: tid },
-        created_at: 1000,
-    };
-    store.store_transfer(record).await.unwrap();
+    commit_envelope(store, envelope, tid, 1000).await;
 
 
     let records = store
     let records = store
         .get_transfers_for_account(&AccountId::new(1))
         .get_transfers_for_account(&AccountId::new(1))
@@ -640,15 +789,10 @@ pub async fn get_transfers_for_account(store: &(impl Store + 'static)) {
     assert!(empty.is_empty());
     assert!(empty.is_empty());
 }
 }
 
 
-/// Verify that created_at roundtrips through store/retrieve.
-pub async fn store_transfer_preserves_created_at(store: &(impl Store + 'static)) {
+/// Verify that created_at roundtrips through commit/retrieve.
+pub async fn commit_preserves_created_at(store: &(impl Store + 'static)) {
     let (envelope, tid) = make_envelope();
     let (envelope, tid) = make_envelope();
-    let record = EnvelopeRecord {
-        envelope,
-        receipt: Receipt { transfer_id: tid },
-        created_at: 1718000000000,
-    };
-    store.store_transfer(record.clone()).await.unwrap();
+    commit_envelope(store, envelope, tid, 1718000000000).await;
 
 
     let got = store.get_transfer(&tid).await.unwrap().unwrap();
     let got = store.get_transfer(&tid).await.unwrap().unwrap();
     assert_eq!(got.created_at, 1718000000000);
     assert_eq!(got.created_at, 1718000000000);
@@ -661,24 +805,10 @@ pub async fn store_transfer_preserves_created_at(store: &(impl Store + 'static))
 /// Query transfers by date range.
 /// Query transfers by date range.
 pub async fn query_transfers_by_date_range(store: &(impl Store + 'static)) {
 pub async fn query_transfers_by_date_range(store: &(impl Store + 'static)) {
     let (e1, t1) = make_envelope();
     let (e1, t1) = make_envelope();
-    store
-        .store_transfer(EnvelopeRecord {
-            envelope: e1,
-            receipt: Receipt { transfer_id: t1 },
-            created_at: 1000,
-        })
-        .await
-        .unwrap();
+    commit_envelope(store, e1, t1, 1000).await;
 
 
     let (e2, t2) = make_envelope_with_book(BookId(1));
     let (e2, t2) = make_envelope_with_book(BookId(1));
-    store
-        .store_transfer(EnvelopeRecord {
-            envelope: e2,
-            receipt: Receipt { transfer_id: t2 },
-            created_at: 2000,
-        })
-        .await
-        .unwrap();
+    commit_envelope(store, e2, t2, 2000).await;
 
 
     let page = store
     let page = store
         .query_transfers(&TransferQuery {
         .query_transfers(&TransferQuery {
@@ -700,14 +830,7 @@ pub async fn query_transfers_pagination(store: &(impl Store + 'static)) {
         tid_bytes[0] = i + 10;
         tid_bytes[0] = i + 10;
         let (envelope, _) = make_envelope();
         let (envelope, _) = make_envelope();
         let tid = EnvelopeId(tid_bytes);
         let tid = EnvelopeId(tid_bytes);
-        store
-            .store_transfer(EnvelopeRecord {
-                envelope,
-                receipt: Receipt { transfer_id: tid },
-                created_at: (i as i64 + 1) * 1000,
-            })
-            .await
-            .unwrap();
+        commit_envelope(store, envelope, tid, (i as i64 + 1) * 1000).await;
     }
     }
 
 
     let page = store
     let page = store
@@ -738,24 +861,10 @@ pub async fn query_transfers_pagination(store: &(impl Store + 'static)) {
 /// Query transfers by book.
 /// Query transfers by book.
 pub async fn query_transfers_by_book(store: &(impl Store + 'static)) {
 pub async fn query_transfers_by_book(store: &(impl Store + 'static)) {
     let (e1, t1) = make_envelope(); // book = 0
     let (e1, t1) = make_envelope(); // book = 0
-    store
-        .store_transfer(EnvelopeRecord {
-            envelope: e1,
-            receipt: Receipt { transfer_id: t1 },
-            created_at: 1000,
-        })
-        .await
-        .unwrap();
+    commit_envelope(store, e1, t1, 1000).await;
 
 
     let (e2, t2) = make_envelope_with_book(BookId(5));
     let (e2, t2) = make_envelope_with_book(BookId(5));
-    store
-        .store_transfer(EnvelopeRecord {
-            envelope: e2,
-            receipt: Receipt { transfer_id: t2 },
-            created_at: 2000,
-        })
-        .await
-        .unwrap();
+    commit_envelope(store, e2, t2, 2000).await;
 
 
     let page = store
     let page = store
         .query_transfers(&TransferQuery {
         .query_transfers(&TransferQuery {
@@ -918,7 +1027,7 @@ macro_rules! store_tests {
             get_account_history,
             get_account_history,
             list_accounts,
             list_accounts,
             // PostingStore
             // PostingStore
-            finalize_creates_postings,
+            commit_creates_postings,
             get_postings_missing_fails,
             get_postings_missing_fails,
             get_postings_by_account_filters,
             get_postings_by_account_filters,
             query_postings_pagination,
             query_postings_pagination,
@@ -927,17 +1036,20 @@ macro_rules! store_tests {
             release_postings_batch,
             release_postings_batch,
             release_active_is_noop,
             release_active_is_noop,
             release_inactive_fails,
             release_inactive_fails,
-            finalize_deactivates_postings,
+            commit_deactivates_postings,
             // CommitStore
             // CommitStore
             commit_transfer_atomic,
             commit_transfer_atomic,
             commit_transfer_idempotent,
             commit_transfer_idempotent,
             commit_transfer_reservation_mismatch,
             commit_transfer_reservation_mismatch,
             commit_transfer_cas_conflict,
             commit_transfer_cas_conflict,
+            reserve_twice_second_fails,
+            commit_double_spend_second_fails,
+            commit_stale_account_guard_fails,
             // TransferStore
             // TransferStore
-            store_and_get_transfer,
+            commit_and_get_transfer,
             get_missing_transfer,
             get_missing_transfer,
             get_transfers_for_account,
             get_transfers_for_account,
-            store_transfer_preserves_created_at,
+            commit_preserves_created_at,
             // TransferQuery
             // TransferQuery
             query_transfers_by_date_range,
             query_transfers_by_date_range,
             query_transfers_pagination,
             query_transfers_pagination,

+ 17 - 16
crates/kuatia/src/ledger.rs

@@ -161,6 +161,7 @@ impl Ledger {
                 deactivate: &plan.postings_to_deactivate,
                 deactivate: &plan.postings_to_deactivate,
                 create: &plan.postings_to_create,
                 create: &plan.postings_to_create,
                 cas_guards: &plan.cas_guards,
                 cas_guards: &plan.cas_guards,
+                account_guards: &plan.account_guards,
                 reservation: None,
                 reservation: None,
                 record: EnvelopeRecord {
                 record: EnvelopeRecord {
                     envelope: envelope.clone(),
                     envelope: envelope.clone(),
@@ -458,14 +459,13 @@ impl Ledger {
         next.version = next.version.checked_add(1).ok_or(LedgerError::Overflow)?;
         next.version = next.version.checked_add(1).ok_or(LedgerError::Overflow)?;
         next.flags |= kuatia_core::AccountFlags::FROZEN;
         next.flags |= kuatia_core::AccountFlags::FROZEN;
         self.store.append_account_version(next).await?;
         self.store.append_account_version(next).await?;
-        let _ = self
-            .store
+        self.store
             .append_event(&LedgerEvent {
             .append_event(&LedgerEvent {
                 seq: 0,
                 seq: 0,
                 timestamp: now_millis()?,
                 timestamp: now_millis()?,
                 kind: LedgerEventKind::AccountFrozen { account_id: *id },
                 kind: LedgerEventKind::AccountFrozen { account_id: *id },
             })
             })
-            .await;
+            .await?;
         Ok(())
         Ok(())
     }
     }
 
 
@@ -484,14 +484,13 @@ impl Ledger {
         next.version = next.version.checked_add(1).ok_or(LedgerError::Overflow)?;
         next.version = next.version.checked_add(1).ok_or(LedgerError::Overflow)?;
         next.flags.remove(kuatia_core::AccountFlags::FROZEN);
         next.flags.remove(kuatia_core::AccountFlags::FROZEN);
         self.store.append_account_version(next).await?;
         self.store.append_account_version(next).await?;
-        let _ = self
-            .store
+        self.store
             .append_event(&LedgerEvent {
             .append_event(&LedgerEvent {
                 seq: 0,
                 seq: 0,
                 timestamp: now_millis()?,
                 timestamp: now_millis()?,
                 kind: LedgerEventKind::AccountUnfrozen { account_id: *id },
                 kind: LedgerEventKind::AccountUnfrozen { account_id: *id },
             })
             })
-            .await;
+            .await?;
         Ok(())
         Ok(())
     }
     }
 
 
@@ -506,12 +505,16 @@ impl Ledger {
         if current.is_closed() {
         if current.is_closed() {
             return Err(LedgerError::AccountAlreadyClosed(*id));
             return Err(LedgerError::AccountAlreadyClosed(*id));
         }
         }
-        let has_active = !self
+        // Reject if any posting is still live — Active or PendingInactive
+        // (reserved, i.e. a transfer in flight). Only fully Inactive postings
+        // (or none) permit a close.
+        let blocking = self
             .store
             .store
-            .get_postings_by_account(id, None, Some(PostingStatus::Active))
+            .get_postings_by_account(id, None, None)
             .await?
             .await?
-            .is_empty();
-        if has_active {
+            .into_iter()
+            .any(|p| p.status != PostingStatus::Inactive);
+        if blocking {
             return Err(LedgerError::AccountNotEmpty(*id));
             return Err(LedgerError::AccountNotEmpty(*id));
         }
         }
         let mut next = current.clone();
         let mut next = current.clone();
@@ -519,14 +522,13 @@ impl Ledger {
         next.flags |= kuatia_core::AccountFlags::CLOSED;
         next.flags |= kuatia_core::AccountFlags::CLOSED;
         next.flags.remove(kuatia_core::AccountFlags::FROZEN);
         next.flags.remove(kuatia_core::AccountFlags::FROZEN);
         self.store.append_account_version(next).await?;
         self.store.append_account_version(next).await?;
-        let _ = self
-            .store
+        self.store
             .append_event(&LedgerEvent {
             .append_event(&LedgerEvent {
                 seq: 0,
                 seq: 0,
                 timestamp: now_millis()?,
                 timestamp: now_millis()?,
                 kind: LedgerEventKind::AccountClosed { account_id: *id },
                 kind: LedgerEventKind::AccountClosed { account_id: *id },
             })
             })
-            .await;
+            .await?;
         Ok(())
         Ok(())
     }
     }
 
 
@@ -600,14 +602,13 @@ impl Ledger {
     pub async fn create_account(&self, account: kuatia_core::Account) -> Result<(), LedgerError> {
     pub async fn create_account(&self, account: kuatia_core::Account) -> Result<(), LedgerError> {
         let id = account.id;
         let id = account.id;
         self.store.create_account(account).await?;
         self.store.create_account(account).await?;
-        let _ = self
-            .store
+        self.store
             .append_event(&LedgerEvent {
             .append_event(&LedgerEvent {
                 seq: 0,
                 seq: 0,
                 timestamp: now_millis()?,
                 timestamp: now_millis()?,
                 kind: LedgerEventKind::AccountCreated { account_id: id },
                 kind: LedgerEventKind::AccountCreated { account_id: id },
             })
             })
-            .await;
+            .await?;
         Ok(())
         Ok(())
     }
     }
 
 

+ 1 - 0
crates/kuatia/src/saga.rs

@@ -346,6 +346,7 @@ impl Step<LedgerCtx, SagaError> for FinalizeTransferStep {
                     deactivate: &plan.postings_to_deactivate,
                     deactivate: &plan.postings_to_deactivate,
                     create: &plan.postings_to_create,
                     create: &plan.postings_to_create,
                     cas_guards: &plan.cas_guards,
                     cas_guards: &plan.cas_guards,
+                    account_guards: &plan.account_guards,
                     reservation: Some(ctx.reservation),
                     reservation: Some(ctx.reservation),
                     record: EnvelopeRecord {
                     record: EnvelopeRecord {
                         envelope: envelope.clone(),
                         envelope: envelope.clone(),

+ 23 - 0
crates/kuatia/tests/integration.rs

@@ -531,6 +531,29 @@ async fn close_account_with_balance_rejected() {
 }
 }
 
 
 #[tokio::test]
 #[tokio::test]
+async fn close_rejects_reserved_postings() {
+    let ledger = setup_ledger().await;
+
+    deposit(&ledger, account(1), usd(), Cent::from(100), external()).await;
+
+    // Reserve the account's only posting (a transfer in flight): Active → PendingInactive.
+    let postings = ledger
+        .store()
+        .get_postings_by_account(&account(1), Some(&usd()), Some(PostingStatus::Active))
+        .await
+        .unwrap();
+    ledger
+        .store()
+        .reserve_postings(&[postings[0].id], ReservationId::new(1))
+        .await
+        .unwrap();
+
+    // Close must reject: the posting is live (PendingInactive), not Inactive.
+    let result = ledger.close(&account(1)).await;
+    assert!(result.is_err());
+}
+
+#[tokio::test]
 async fn freeze_closed_account_rejected() {
 async fn freeze_closed_account_rejected() {
     let ledger = setup_ledger().await;
     let ledger = setup_ledger().await;