Преглед изворни кода

Make the Store the atomic invariant boundary for commits

A static review found the pure validator was strong but the commit path
was split across separate storage calls, so a crash could change balances
without recording the transfer, and declared protections (CAS guards, book
policy, overdraft) were computed but never enforced. This reworks the Store
trait to be the single place ledger state changes.

- Add CommitStore::commit_transfer: postings, transfer record, the
  both-sided account index, and events apply in one transaction. Events are
  no longer best-effort. CAS guards are enforced (retryable Conflict) and
  consumed postings are authorized against a ReservationId so only the
  reserving saga can finalize or release them.
- Model overdraft as a negative posting covering a shortfall, down to the
  floor for CappedOverdraft and unbounded for UncappedOverdraft; resolve()
  mints it and validation permits negative postings on every policy except
  NoOverdraft.
- Enforce BookPolicy (allowed assets/accounts/flags) in validation.
- Make the SQL backend portable: per-backend migrations (BLOB vs BYTEA),
  a migrations ledger for idempotency, and portable upserts.
- Stop content-addressed ids depending on randomness: deterministic
  DEFAULT_BOOK, and move the AutoId timestamp base to a fixed recent epoch.
- Sync all documentation and project context to the new behavior.
Cesar Rodas пре 1 недеља
родитељ
комит
c715bd3924

+ 9 - 7
CLAUDE.md

@@ -10,7 +10,7 @@ Kuatia is an append-only, auditable, multi-asset UTXO-style ledger library in Ru
 crates/
   kuatia-types/     Domain types: AccountId, Posting, Movement, Cent, AutoId, etc.
   kuatia-core/      Pure, sync, no-IO logic: validation, hashing, posting selection
-  kuatia-storage/   Store trait (6 sub-traits), InMemoryStore, conformance tests
+  kuatia-storage/   Store trait (7 sub-traits), InMemoryStore, conformance tests
   kuatia-storage-sql/  SQL backend: SQLite/PostgreSQL via sqlx
   kuatia/           Async layer: Ledger resource, saga pipeline, intent API
 doc/
@@ -28,7 +28,8 @@ doc/
 - **Movement**: `{ from, to, asset, amount }` — the fundamental unit of intent. All operations (pay, deposit, withdraw) are one or more movements.
 - **Envelope**: concrete postings to consume and create — the resolved form of movements.
 - **Conservation**: for each asset, `sum(consumed) == sum(created)`.
-- **Account policies**: NoOverdraft, CappedOverdraft, UncappedOverdraft, SystemAccount, ExternalAccount. Only SystemAccount and ExternalAccount may hold negative postings.
+- **Account policies**: NoOverdraft, CappedOverdraft, UncappedOverdraft, SystemAccount, ExternalAccount. Only `NoOverdraft` forbids negative postings; the other four permit them. An overdraft is a negative posting that covers a shortfall — down to the floor for `CappedOverdraft`, unbounded for `UncappedOverdraft`.
+- **Atomic commit**: `CommitStore::commit_transfer` is the single atomic boundary — postings, transfer record, account index, and events apply in one transaction. It enforces `CappedOverdraft` CAS guards and reservation ownership. `reserve_postings`/`release_postings` carry a `ReservationId` so only the owning saga can finalize/release a reserved posting.
 
 ## Architecture
 
@@ -42,7 +43,7 @@ doc/
 
 Two-pass:
 1. For each movement, create output posting on `to` and accumulate net debit on `from`.
-2. For each (account, asset) with positive net debit, select postings (greedy largest-first) and compute change.
+2. For each (account, asset) with positive net debit, select postings (greedy largest-first) and compute change. If positive postings are insufficient: `CappedOverdraft`/`UncappedOverdraft` accounts consume all positives and create a negative posting for the shortfall (floor enforced in validation); other policies fail with `InsufficientFunds`.
 
 Deposit: two movements cancel to zero net debit on the system account — no posting selection needed.
 
@@ -54,9 +55,10 @@ Deposit: two movements cancel to zero net debit on the system account — no pos
 4. Consumed postings Active or PendingInactive
 5. Referenced accounts exist, not frozen, not closed
 6. Account snapshot pinning
-7. Per-asset conservation
-8. Negative postings only on SystemAccount/ExternalAccount
-9. Policy enforcement (balance floor)
+7. Book policy (if a book is loaded): referenced assets/accounts/flags allowed by the book
+8. Per-asset conservation
+9. Negative postings forbidden only on `NoOverdraft` (allowed on overdraft/system/external)
+10. Policy enforcement (balance floor)
 
 ## Testing
 
@@ -82,7 +84,7 @@ cargo test -p kuatia        # integration + saga tests
    ^sign (always 0 = positive)
   ```
   - Bit 63: always 0 (keeps i64 positive)
-  - Bits 62–23: Unix milliseconds (40 bits ≈ 34.8 years from epoch)
+  - Bits 62–23: milliseconds since `KUATIA_EPOCH_MS` (2026-01-01T00:00:00Z), not the Unix epoch — 40 bits ≈ 34.8 years going forward (until ~2060)
   - Bits 22–0: lower 23 bits of CRC32 of context-specific data (e.g. serialized event)
   - When no data is provided, an internal atomic counter is used (wraps on 23-bit overflow)
   - Implementation: `AutoId` in `kuatia-types/src/autoid.rs`, includes inline CRC32 (IEEE)

+ 1 - 1
README.md

@@ -38,7 +38,7 @@ balance is always the sum of its active postings.
 |-------|---------|
 | **kuatia-types** | Domain types — `AccountId`, `Posting`, `Transfer`, `Cent`, etc. |
 | **kuatia-core** | Pure, sans-IO decision logic — validation, hashing, posting selection. |
-| **kuatia-storage** | `Store` trait (6 sub-traits), `InMemoryStore`, `store_tests!` conformance macro. |
+| **kuatia-storage** | `Store` trait (7 sub-traits), `InMemoryStore`, `store_tests!` conformance macro. |
 | **kuatia-storage-sql** | SQL-backed `Store` — SQLite and PostgreSQL via sqlx. |
 | **kuatia** | Async resource layer — `Ledger`, saga commit pipeline, intent-layer API. |
 

+ 4 - 3
crates/kuatia-core/README.md

@@ -23,8 +23,9 @@ with golden vectors. Depends only on `kuatia-types` and `sha2`.
 4. All consumed postings are Active or PendingInactive
 5. All accounts exist, not frozen, not closed
 6. Account snapshot pinning (OCC)
-7. Per-asset conservation: `sum(consumed) == sum(created)`
-8. Negative postings only on SystemAccount or ExternalAccount
-9. Account policy enforcement (overdraft limits)
+7. Book policy (if a book is loaded): referenced assets/accounts/flags allowed
+8. Per-asset conservation: `sum(consumed) == sum(created)`
+9. Negative postings forbidden only on `NoOverdraft` (allowed on overdraft/system/external)
+10. Account policy enforcement (overdraft limits)
 
 Returns a `Plan` on success, or a `ValidationError` describing the violation.

+ 12 - 14
crates/kuatia-core/src/posting_selection.rs

@@ -92,16 +92,15 @@ mod tests {
     use kuatia_types::*;
 
     fn make_posting(index: u16, value: i64) -> Posting {
-        Posting {
-            id: PostingId {
+        Posting::new(
+            PostingId {
                 transfer: EnvelopeId([1; 32]),
                 index,
             },
-            owner: AccountId::new(1),
-            asset: AssetId::new(1),
-            value: Cent::from(value),
-            status: PostingStatus::Active,
-        }
+            AccountId::new(1),
+            AssetId::new(1),
+            Cent::from(value),
+        )
     }
 
     #[test]
@@ -155,16 +154,15 @@ mod tests {
 
     #[test]
     fn ignores_negative_postings() {
-        let negative = Posting {
-            id: PostingId {
+        let negative = Posting::new(
+            PostingId {
                 transfer: EnvelopeId([1; 32]),
                 index: 0,
             },
-            owner: AccountId::new(1),
-            asset: AssetId::new(1),
-            value: Cent::from(-100),
-            status: PostingStatus::Active,
-        };
+            AccountId::new(1),
+            AssetId::new(1),
+            Cent::from(-100),
+        );
         let good = make_posting(1, 50);
         let postings = vec![negative, good];
         let result = select_postings(&postings, AssetId::new(1), Cent::from(50)).unwrap();

+ 108 - 14
crates/kuatia-core/src/validate.rs

@@ -26,6 +26,11 @@ pub struct PlanInput<'a> {
     pub accounts: &'a HashMap<AccountId, Account>,
     /// Current balances keyed by (account, asset).
     pub balances: &'a HashMap<(AccountId, AssetId), Cent>,
+    /// The book gating this transfer, if one is loaded. `Some` enforces the
+    /// book's [`BookPolicy`] (allowed assets/accounts/flags); `None` means the
+    /// implicit unrestricted default book. The async layer is responsible for
+    /// rejecting a *named* book id that has no row before reaching here.
+    pub book: Option<&'a Book>,
 }
 
 /// The validated effects to apply atomically. Produced only when every
@@ -101,7 +106,7 @@ pub enum ValidationError {
         /// The actual current snapshot hash.
         actual: [u8; 32],
     },
-    /// A negative posting targets an account that is not a system or external account.
+    /// A negative posting targets an account whose policy forbids offset positions.
     NegativePostingOnNonSystemAccount {
         /// The account that would receive the negative posting.
         account: AccountId,
@@ -110,6 +115,20 @@ pub enum ValidationError {
         /// The negative value.
         value: Cent,
     },
+    /// An asset is not permitted by the transfer's book policy.
+    BookAssetNotAllowed {
+        /// The book whose policy rejected the asset.
+        book: BookId,
+        /// The disallowed asset.
+        asset: AssetId,
+    },
+    /// An account is not permitted to participate by the transfer's book policy.
+    BookAccountNotAllowed {
+        /// The book whose policy rejected the account.
+        book: BookId,
+        /// The disallowed account.
+        account: AccountId,
+    },
     /// An arithmetic operation overflowed.
     Overflow,
 }
@@ -172,9 +191,15 @@ impl std::fmt::Display for ValidationError {
             } => {
                 write!(
                     f,
-                    "negative posting ({value}) on non-system account {account:?}/{asset:?}"
+                    "negative posting ({value}) on account {account:?}/{asset:?} whose policy forbids offsets"
                 )
             }
+            Self::BookAssetNotAllowed { book, asset } => {
+                write!(f, "asset {asset:?} not allowed by book {book:?}")
+            }
+            Self::BookAccountNotAllowed { book, account } => {
+                write!(f, "account {account:?} not allowed by book {book:?}")
+            }
             Self::Overflow => write!(f, "monetary amount overflow"),
         }
     }
@@ -270,6 +295,47 @@ pub fn validate_and_plan(input: PlanInput<'_>) -> Result<Plan, ValidationError>
         }
     }
 
+    // 5c. Book policy: gate which assets and accounts may participate. Enforced
+    //     only when a book is loaded; an empty policy field means "no restriction".
+    if let Some(book) = input.book {
+        let policy = &book.policy;
+
+        if !policy.allowed_assets.is_empty() {
+            let mut referenced_assets: HashSet<AssetId> = HashSet::new();
+            for pid in envelope.consumes() {
+                referenced_assets.insert(consumed_by_id[pid].asset);
+            }
+            for np in envelope.creates() {
+                referenced_assets.insert(np.asset);
+            }
+            for asset in &referenced_assets {
+                if !policy.allowed_assets.contains(asset) {
+                    return Err(ValidationError::BookAssetNotAllowed {
+                        book: book.id,
+                        asset: *asset,
+                    });
+                }
+            }
+        }
+
+        let no_account_restriction =
+            policy.allowed_accounts.is_empty() && policy.allowed_flags.is_empty();
+        if !no_account_restriction {
+            for aid in &all_account_ids {
+                let account = &input.accounts[aid];
+                let listed = policy.allowed_accounts.contains(aid);
+                let flag_match = !policy.allowed_flags.is_empty()
+                    && account.flags.intersects(policy.allowed_flags);
+                if !(listed || flag_match) {
+                    return Err(ValidationError::BookAccountNotAllowed {
+                        book: book.id,
+                        account: *aid,
+                    });
+                }
+            }
+        }
+    }
+
     // 6. Per-asset conservation: Σ consumed == Σ created
     let mut consumed_by_asset: HashMap<AssetId, Cent> = HashMap::new();
     for pid in envelope.consumes() {
@@ -301,7 +367,9 @@ pub fn validate_and_plan(input: PlanInput<'_>) -> Result<Plan, ValidationError>
         }
     }
 
-    // 7. Negative postings may only target system or external accounts.
+    // 7. Negative postings (offset positions) may target system, external, or
+    //    overdraft accounts. Overdraft floors are enforced separately in step 8.
+    //    Only NoOverdraft forbids holding a negative posting.
     for np in envelope.creates() {
         if np.value.is_negative() {
             let account = input
@@ -309,8 +377,11 @@ pub fn validate_and_plan(input: PlanInput<'_>) -> Result<Plan, ValidationError>
                 .get(&np.owner)
                 .ok_or(ValidationError::AccountNotFound(np.owner))?;
             match account.policy {
-                AccountPolicy::SystemAccount | AccountPolicy::ExternalAccount => {}
-                _ => {
+                AccountPolicy::SystemAccount
+                | AccountPolicy::ExternalAccount
+                | AccountPolicy::UncappedOverdraft
+                | AccountPolicy::CappedOverdraft { .. } => {}
+                AccountPolicy::NoOverdraft => {
                     return Err(ValidationError::NegativePostingOnNonSystemAccount {
                         account: np.owner,
                         asset: np.asset,
@@ -386,15 +457,16 @@ pub fn validate_and_plan(input: PlanInput<'_>) -> Result<Plan, ValidationError>
         .creates
         .iter()
         .enumerate()
-        .map(|(i, np)| Posting {
-            id: PostingId {
-                transfer: tid,
-                index: i as u16,
-            },
-            owner: np.owner,
-            asset: np.asset,
-            value: np.value,
-            status: PostingStatus::Active,
+        .map(|(i, np)| {
+            Posting::new(
+                PostingId {
+                    transfer: tid,
+                    index: i as u16,
+                },
+                np.owner,
+                np.asset,
+                np.value,
+            )
         })
         .collect();
 
@@ -466,6 +538,7 @@ mod tests {
             consumed_postings: &[],
             accounts: &accounts,
             balances: &balances,
+            book: None,
         };
 
         let plan = validate_and_plan(input).unwrap();
@@ -490,6 +563,7 @@ mod tests {
             consumed_postings: &[],
             accounts: &accounts,
             balances: &balances,
+            book: None,
         };
 
         assert_eq!(
@@ -520,6 +594,7 @@ mod tests {
             consumed_postings: &[],
             accounts: &accounts,
             balances: &balances,
+            book: None,
         };
 
         match validate_and_plan(input) {
@@ -549,6 +624,7 @@ mod tests {
             consumed_postings: &[],
             accounts: &accounts,
             balances: &balances,
+            book: None,
         };
 
         assert_eq!(
@@ -569,6 +645,7 @@ mod tests {
             asset: AssetId::new(1),
             value: Cent::from(100),
             status: PostingStatus::Inactive, // already consumed
+            reservation: None,
         };
         let envelope = Envelope {
             consumes: vec![pid],
@@ -593,6 +670,7 @@ mod tests {
             consumed_postings: &[posting],
             accounts: &accounts,
             balances: &balances,
+            book: None,
         };
 
         assert_eq!(
@@ -613,6 +691,7 @@ mod tests {
             consumed_postings: &[],
             accounts: &accounts,
             balances: &balances,
+            book: None,
         };
 
         assert_eq!(
@@ -633,6 +712,7 @@ mod tests {
             consumed_postings: &[],
             accounts: &accounts,
             balances: &balances,
+            book: None,
         };
 
         assert_eq!(
@@ -653,6 +733,7 @@ mod tests {
             asset: AssetId::new(1),
             value: Cent::from(50),
             status: PostingStatus::Active,
+            reservation: None,
         };
         // Try to send 50 but create 100 for recipient (conservation will fail first,
         // but let's test overdraft with a valid conservation)
@@ -683,6 +764,7 @@ mod tests {
             consumed_postings: &[posting],
             accounts: &accounts,
             balances: &balances,
+            book: None,
         };
 
         match validate_and_plan(input) {
@@ -705,6 +787,7 @@ mod tests {
             asset: AssetId::new(1),
             value: Cent::from(100),
             status: PostingStatus::Active,
+            reservation: None,
         };
         let envelope = Envelope {
             consumes: vec![pid],
@@ -737,6 +820,7 @@ mod tests {
             consumed_postings: &[posting],
             accounts: &accounts,
             balances: &balances,
+            book: None,
         };
 
         let plan = validate_and_plan(input).unwrap();
@@ -757,6 +841,7 @@ mod tests {
             asset: AssetId::new(1),
             value: Cent::from(100),
             status: PostingStatus::Active,
+            reservation: None,
         };
         let envelope = Envelope {
             consumes: vec![pid],
@@ -789,6 +874,7 @@ mod tests {
             consumed_postings: &[posting],
             accounts: &accounts,
             balances: &balances,
+            book: None,
         };
 
         match validate_and_plan(input) {
@@ -814,6 +900,7 @@ mod tests {
             asset: AssetId::new(1),
             value: Cent::from(100),
             status: PostingStatus::Active,
+            reservation: None,
         };
         let envelope = Envelope {
             consumes: vec![pid],
@@ -841,6 +928,7 @@ mod tests {
             consumed_postings: &[posting],
             accounts: &accounts,
             balances: &balances,
+            book: None,
         };
 
         let plan = validate_and_plan(input).unwrap();
@@ -868,6 +956,7 @@ mod tests {
             consumed_postings: &[],
             accounts: &accounts,
             balances: &balances,
+            book: None,
         };
 
         assert_eq!(
@@ -889,6 +978,7 @@ mod tests {
             asset: AssetId::new(1),
             value: Cent::from(100),
             status: PostingStatus::Active,
+            reservation: None,
         };
         let envelope = Envelope {
             consumes: vec![pid],
@@ -923,6 +1013,7 @@ mod tests {
             consumed_postings: &[posting],
             accounts: &accounts,
             balances: &balances,
+            book: None,
         };
 
         let plan = validate_and_plan(input).unwrap();
@@ -963,6 +1054,7 @@ mod tests {
             consumed_postings: &[],
             accounts: &accounts,
             balances: &balances,
+            book: None,
         };
 
         assert_eq!(
@@ -1001,6 +1093,7 @@ mod tests {
             consumed_postings: &[],
             accounts: &accounts,
             balances: &balances,
+            book: None,
         };
 
         assert_eq!(
@@ -1026,6 +1119,7 @@ mod tests {
             consumed_postings: &[],
             accounts: &accounts,
             balances: &balances,
+            book: None,
         };
 
         let plan = validate_and_plan(input).unwrap();

+ 7 - 1
crates/kuatia-storage-sql/README.md

@@ -15,7 +15,13 @@ kuatia-storage-sql = { features = ["sqlite"] }   # or "postgres"
 | Feature | Backend | Status |
 |---------|---------|--------|
 | `sqlite` (default) | SQLite via sqlx | Conformance tests pass |
-| `postgres` | PostgreSQL via sqlx | Feature-flagged, needs running instance |
+| `postgres` | PostgreSQL via sqlx | Portable DDL/queries; needs a running instance to test |
+
+The backend is detected at migration time and the matching DDL is applied from
+`src/migrations/{sqlite,postgres}/` (SQLite uses `BLOB`, PostgreSQL uses
+`BYTEA`). Applied migrations are tracked in a `_migrations` table, so
+`migrate()` is idempotent. Upserts use portable `ON CONFLICT … DO UPDATE`, and
+all ids are generated in Rust (no `AUTOINCREMENT`/`SERIAL`).
 
 ## Usage
 

+ 257 - 32
crates/kuatia-storage-sql/src/lib.rs

@@ -35,14 +35,37 @@ impl SqlStore {
         }
     }
 
-    /// Run database migrations.
+    /// Run database migrations. Idempotent: a `_migrations` ledger records what
+    /// has been applied, so re-running is a no-op. The DDL is selected per
+    /// backend (SQLite uses `BLOB`, PostgreSQL uses `BYTEA`).
     pub async fn migrate(&self) -> Result<(), StoreError> {
-        for sql in [
-            include_str!("migrations/001_init.sql"),
-            include_str!("migrations/002_timestamps_and_columns.sql"),
-            include_str!("migrations/003_events.sql"),
-            include_str!("migrations/004_books.sql"),
-        ] {
+        // Detect the backend. `sqlite_version()` exists only on SQLite.
+        let is_sqlite = sqlx::query("SELECT sqlite_version()")
+            .fetch_optional(&self.pool)
+            .await
+            .is_ok();
+
+        sqlx::query("CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)")
+            .execute(&self.pool)
+            .await
+            .map_err(|e| StoreError::Internal(e.to_string()))?;
+
+        let migrations: &[(&str, &str)] = if is_sqlite {
+            &[("001_init", include_str!("migrations/sqlite/001_init.sql"))]
+        } else {
+            &[("001_init", include_str!("migrations/postgres/001_init.sql"))]
+        };
+
+        for (name, sql) in migrations {
+            let applied = sqlx::query("SELECT 1 FROM _migrations WHERE name = $1")
+                .bind(*name)
+                .fetch_optional(&self.pool)
+                .await
+                .map_err(|e| StoreError::Internal(e.to_string()))?;
+            if applied.is_some() {
+                continue;
+            }
+
             for statement in sql.split(';') {
                 let trimmed = statement.trim();
                 if !trimmed.is_empty() {
@@ -52,6 +75,12 @@ impl SqlStore {
                         .map_err(|e| StoreError::Internal(e.to_string()))?;
                 }
             }
+
+            sqlx::query("INSERT INTO _migrations (name) VALUES ($1)")
+                .bind(*name)
+                .execute(&self.pool)
+                .await
+                .map_err(|e| StoreError::Internal(e.to_string()))?;
         }
         Ok(())
     }
@@ -148,6 +177,9 @@ fn row_to_posting(row: &sqlx::any::AnyRow) -> Result<Posting, StoreError> {
     let status: i16 = row
         .try_get("status")
         .map_err(|e| StoreError::Internal(e.to_string()))?;
+    let reservation: Option<i64> = row
+        .try_get("reservation")
+        .map_err(|e| StoreError::Internal(e.to_string()))?;
 
     let mut tid = [0u8; 32];
     tid.copy_from_slice(&transfer_id);
@@ -161,6 +193,7 @@ fn row_to_posting(row: &sqlx::any::AnyRow) -> Result<Posting, StoreError> {
         asset: AssetId::new(asset as u32),
         value: Cent::from(value),
         status: status_from_i16(status)?,
+        reservation: reservation.map(ReservationId::new),
     })
 }
 
@@ -398,7 +431,11 @@ impl PostingStore for SqlStore {
         })
     }
 
-    async fn reserve_postings(&self, ids: &[PostingId]) -> Result<(), StoreError> {
+    async fn reserve_postings(
+        &self,
+        ids: &[PostingId],
+        reservation: ReservationId,
+    ) -> Result<(), StoreError> {
         // Validate all Active first, then update in a transaction.
         let mut tx = self
             .pool
@@ -424,13 +461,16 @@ impl PostingStore for SqlStore {
         }
 
         for id in ids {
-            sqlx::query("UPDATE postings SET status = $1 WHERE transfer_id = $2 AND idx = $3")
-                .bind(status_to_i16(PostingStatus::PendingInactive))
-                .bind(id.transfer.0.as_slice())
-                .bind(id.index as i16)
-                .execute(&mut *tx)
-                .await
-                .map_err(|e| StoreError::Internal(e.to_string()))?;
+            sqlx::query(
+                "UPDATE postings SET status = $1, reservation = $2 WHERE transfer_id = $3 AND idx = $4",
+            )
+            .bind(status_to_i16(PostingStatus::PendingInactive))
+            .bind(reservation.0)
+            .bind(id.transfer.0.as_slice())
+            .bind(id.index as i16)
+            .execute(&mut *tx)
+            .await
+            .map_err(|e| StoreError::Internal(e.to_string()))?;
         }
 
         tx.commit()
@@ -439,7 +479,11 @@ impl PostingStore for SqlStore {
         Ok(())
     }
 
-    async fn release_postings(&self, ids: &[PostingId]) -> Result<(), StoreError> {
+    async fn release_postings(
+        &self,
+        ids: &[PostingId],
+        reservation: ReservationId,
+    ) -> Result<(), StoreError> {
         let mut tx = self
             .pool
             .begin()
@@ -447,24 +491,31 @@ impl PostingStore for SqlStore {
             .map_err(|e| StoreError::Internal(e.to_string()))?;
 
         for id in ids {
-            let row =
-                sqlx::query("SELECT status FROM postings WHERE transfer_id = $1 AND idx = $2")
-                    .bind(id.transfer.0.as_slice())
-                    .bind(id.index as i16)
-                    .fetch_optional(&mut *tx)
-                    .await
-                    .map_err(|e| StoreError::Internal(e.to_string()))?
-                    .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
+            let row = sqlx::query(
+                "SELECT status, reservation FROM postings WHERE transfer_id = $1 AND idx = $2",
+            )
+            .bind(id.transfer.0.as_slice())
+            .bind(id.index as i16)
+            .fetch_optional(&mut *tx)
+            .await
+            .map_err(|e| StoreError::Internal(e.to_string()))?
+            .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
             let status: i16 = row
                 .try_get("status")
                 .map_err(|e| StoreError::Internal(e.to_string()))?;
             if status == 2 {
                 return Err(StoreError::PostingInactive(*id));
             }
+            let owner: Option<i64> = row
+                .try_get("reservation")
+                .map_err(|e| StoreError::Internal(e.to_string()))?;
+            if status == 1 && owner != Some(reservation.0) {
+                return Err(StoreError::ReservationMismatch(*id));
+            }
         }
 
         for id in ids {
-            sqlx::query("UPDATE postings SET status = $1 WHERE transfer_id = $2 AND idx = $3 AND status = $4")
+            sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4")
                 .bind(status_to_i16(PostingStatus::Active))
                 .bind(id.transfer.0.as_slice())
                 .bind(id.index as i16)
@@ -492,13 +543,16 @@ impl PostingStore for SqlStore {
             .map_err(|e| StoreError::Internal(e.to_string()))?;
 
         for id in deactivate {
-            sqlx::query("UPDATE postings SET status = $1 WHERE transfer_id = $2 AND idx = $3")
+            let res = sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3")
                 .bind(status_to_i16(PostingStatus::Inactive))
                 .bind(id.transfer.0.as_slice())
                 .bind(id.index as i16)
                 .execute(&mut *tx)
                 .await
                 .map_err(|e| StoreError::Internal(e.to_string()))?;
+            if res.rows_affected() == 0 {
+                return Err(StoreError::NotFound(format!("posting {id:?}")));
+            }
         }
 
         for posting in create {
@@ -705,12 +759,15 @@ impl TransferStore for SqlStore {
 #[async_trait]
 impl SagaStore for SqlStore {
     async fn save_saga(&self, id: &i64, data: Vec<u8>) -> Result<(), StoreError> {
-        sqlx::query("INSERT OR REPLACE INTO sagas (id, data) VALUES ($1, $2)")
-            .bind(*id)
-            .bind(&data)
-            .execute(&self.pool)
-            .await
-            .map_err(|e| StoreError::Internal(e.to_string()))?;
+        sqlx::query(
+            "INSERT INTO sagas (id, data) VALUES ($1, $2) \
+             ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data",
+        )
+        .bind(*id)
+        .bind(&data)
+        .execute(&self.pool)
+        .await
+        .map_err(|e| StoreError::Internal(e.to_string()))?;
         Ok(())
     }
 
@@ -849,3 +906,171 @@ impl BookStore for SqlStore {
             .collect()
     }
 }
+
+// ---------------------------------------------------------------------------
+// CommitStore — the single atomic commit boundary
+// ---------------------------------------------------------------------------
+
+#[async_trait]
+impl CommitStore for SqlStore {
+    async fn commit_transfer(&self, req: CommitRequest<'_>) -> Result<(), StoreError> {
+        let tid = req.record.receipt.transfer_id;
+        let transfer_bytes = serialize_blob(&req.record.envelope)?;
+        let receipt_bytes = serialize_blob(&req.record.receipt)?;
+
+        let mut tx = self
+            .pool
+            .begin()
+            .await
+            .map_err(|e| StoreError::Internal(e.to_string()))?;
+
+        // 1. Idempotency: already committed?
+        let exists = sqlx::query("SELECT 1 FROM transfers WHERE id = $1")
+            .bind(tid.0.as_slice())
+            .fetch_optional(&mut *tx)
+            .await
+            .map_err(|e| StoreError::Internal(e.to_string()))?;
+        if exists.is_some() {
+            return Ok(());
+        }
+
+        // 2. CAS guards — recompute Σ(non-Inactive) in Rust and compare.
+        for (account, asset, expected) in req.cas_guards {
+            let rows = sqlx::query(
+                "SELECT value FROM postings WHERE owner = $1 AND asset = $2 AND status != $3",
+            )
+            .bind(account.0)
+            .bind(asset.0 as i32)
+            .bind(status_to_i16(PostingStatus::Inactive))
+            .fetch_all(&mut *tx)
+            .await
+            .map_err(|e| StoreError::Internal(e.to_string()))?;
+            let mut sum: i64 = 0;
+            for row in &rows {
+                let v: i64 = row
+                    .try_get("value")
+                    .map_err(|e| StoreError::Internal(e.to_string()))?;
+                sum = sum
+                    .checked_add(v)
+                    .ok_or_else(|| StoreError::Internal("balance overflow during cas".into()))?;
+            }
+            if sum != expected.value() {
+                return Err(StoreError::Conflict {
+                    account: *account,
+                    asset: *asset,
+                });
+            }
+        }
+
+        // 3. Authorize consumed postings and collect their owners.
+        let mut account_ids: HashSet<i64> = HashSet::new();
+        for pid in req.deactivate {
+            let row = sqlx::query(
+                "SELECT owner, status, reservation FROM postings WHERE transfer_id = $1 AND idx = $2",
+            )
+            .bind(pid.transfer.0.as_slice())
+            .bind(pid.index as i16)
+            .fetch_optional(&mut *tx)
+            .await
+            .map_err(|e| StoreError::Internal(e.to_string()))?
+            .ok_or(StoreError::ReservationMismatch(*pid))?;
+            let owner: i64 = row
+                .try_get("owner")
+                .map_err(|e| StoreError::Internal(e.to_string()))?;
+            let status: i16 = row
+                .try_get("status")
+                .map_err(|e| StoreError::Internal(e.to_string()))?;
+            let reservation: Option<i64> = row
+                .try_get("reservation")
+                .map_err(|e| StoreError::Internal(e.to_string()))?;
+            match req.reservation {
+                None => {
+                    if status != status_to_i16(PostingStatus::Active) {
+                        return Err(StoreError::ReservationMismatch(*pid));
+                    }
+                }
+                Some(rid) => {
+                    if status != status_to_i16(PostingStatus::PendingInactive)
+                        || reservation != Some(rid.0)
+                    {
+                        return Err(StoreError::ReservationMismatch(*pid));
+                    }
+                }
+            }
+            account_ids.insert(owner);
+        }
+
+        // 4. Deactivate consumed postings, asserting each affects exactly one row.
+        for pid in req.deactivate {
+            let res = sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3")
+                .bind(status_to_i16(PostingStatus::Inactive))
+                .bind(pid.transfer.0.as_slice())
+                .bind(pid.index as i16)
+                .execute(&mut *tx)
+                .await
+                .map_err(|e| StoreError::Internal(e.to_string()))?;
+            if res.rows_affected() != 1 {
+                return Err(StoreError::ReservationMismatch(*pid));
+            }
+        }
+
+        // 5. Insert created postings (Active, unreserved → reservation defaults NULL).
+        for posting in req.create {
+            sqlx::query(
+                "INSERT INTO postings (transfer_id, idx, owner, asset, value, status) VALUES ($1, $2, $3, $4, $5, $6)"
+            )
+                .bind(posting.id.transfer.0.as_slice())
+                .bind(posting.id.index as i16)
+                .bind(posting.owner.0)
+                .bind(posting.asset.0 as i32)
+                .bind(posting.value.value())
+                .bind(status_to_i16(posting.status))
+                .execute(&mut *tx)
+                .await
+                .map_err(|e| StoreError::Internal(e.to_string()))?;
+            account_ids.insert(posting.owner.0);
+        }
+
+        // 6. Persist the transfer record.
+        sqlx::query("INSERT INTO transfers (id, transfer, receipt, created_at, book) VALUES ($1, $2, $3, $4, $5)")
+            .bind(tid.0.as_slice())
+            .bind(&transfer_bytes)
+            .bind(&receipt_bytes)
+            .bind(req.record.created_at)
+            .bind(req.record.envelope.book().0)
+            .execute(&mut *tx)
+            .await
+            .map_err(|e| StoreError::Internal(e.to_string()))?;
+
+        // 7. Index transfer_accounts from BOTH created and consumed owners.
+        for aid in &account_ids {
+            sqlx::query("INSERT INTO transfer_accounts (transfer_id, account_id) VALUES ($1, $2)")
+                .bind(tid.0.as_slice())
+                .bind(*aid)
+                .execute(&mut *tx)
+                .await
+                .map_err(|e| StoreError::Internal(e.to_string()))?;
+        }
+
+        // 8. Append events within the same transaction.
+        for event in req.events {
+            let kind_str = serde_json::to_string(&event.kind)
+                .map_err(|e| StoreError::Internal(e.to_string()))?;
+            let data = serialize_blob(event)?;
+            let seq = self.autoid.next() as u64;
+            sqlx::query("INSERT INTO events (seq, timestamp, kind, data) VALUES ($1, $2, $3, $4)")
+                .bind(seq as i64)
+                .bind(event.timestamp)
+                .bind(&kind_str)
+                .bind(&data)
+                .execute(&mut *tx)
+                .await
+                .map_err(|e| StoreError::Internal(e.to_string()))?;
+        }
+
+        tx.commit()
+            .await
+            .map_err(|e| StoreError::Internal(e.to_string()))?;
+        Ok(())
+    }
+}

+ 0 - 4
crates/kuatia-storage-sql/src/migrations/002_timestamps_and_columns.sql

@@ -1,4 +0,0 @@
-ALTER TABLE transfers ADD COLUMN created_at BIGINT NOT NULL DEFAULT 0;
-ALTER TABLE transfers ADD COLUMN book BIGINT NOT NULL DEFAULT 0;
-CREATE INDEX idx_transfers_created_at ON transfers(created_at);
-CREATE INDEX idx_transfers_book ON transfers(book);

+ 0 - 6
crates/kuatia-storage-sql/src/migrations/003_events.sql

@@ -1,6 +0,0 @@
-CREATE TABLE IF NOT EXISTS events (
-    seq       BIGINT PRIMARY KEY,
-    timestamp BIGINT NOT NULL,
-    kind      TEXT NOT NULL,
-    data      BLOB NOT NULL
-);

+ 0 - 5
crates/kuatia-storage-sql/src/migrations/004_books.sql

@@ -1,5 +0,0 @@
-CREATE TABLE IF NOT EXISTS books (
-    id       BIGINT PRIMARY KEY,
-    name     TEXT NOT NULL,
-    data     BLOB NOT NULL
-);

+ 60 - 0
crates/kuatia-storage-sql/src/migrations/postgres/001_init.sql

@@ -0,0 +1,60 @@
+CREATE TABLE IF NOT EXISTS accounts (
+    id          BIGINT NOT NULL,
+    version     BIGINT NOT NULL,
+    policy      TEXT NOT NULL,
+    flags       INTEGER NOT NULL,
+    book        BIGINT NOT NULL,
+    user_data   BYTEA NOT NULL,
+    metadata    BYTEA NOT NULL,
+    PRIMARY KEY (id, version)
+);
+
+CREATE TABLE IF NOT EXISTS postings (
+    transfer_id BYTEA NOT NULL,
+    idx         SMALLINT NOT NULL,
+    owner       BIGINT NOT NULL,
+    asset       INTEGER NOT NULL,
+    value       BIGINT NOT NULL,
+    status      SMALLINT NOT NULL,
+    reservation BIGINT,
+    PRIMARY KEY (transfer_id, idx)
+);
+
+CREATE INDEX IF NOT EXISTS idx_postings_owner ON postings(owner, asset, status);
+
+CREATE TABLE IF NOT EXISTS transfers (
+    id         BYTEA PRIMARY KEY,
+    transfer   BYTEA NOT NULL,
+    receipt    BYTEA NOT NULL,
+    created_at BIGINT NOT NULL DEFAULT 0,
+    book       BIGINT NOT NULL DEFAULT 0
+);
+
+CREATE INDEX IF NOT EXISTS idx_transfers_created_at ON transfers(created_at);
+CREATE INDEX IF NOT EXISTS idx_transfers_book ON transfers(book);
+
+CREATE TABLE IF NOT EXISTS transfer_accounts (
+    transfer_id BYTEA NOT NULL,
+    account_id  BIGINT NOT NULL,
+    PRIMARY KEY (transfer_id, account_id)
+);
+
+CREATE INDEX IF NOT EXISTS idx_xfer_acct ON transfer_accounts(account_id);
+
+CREATE TABLE IF NOT EXISTS sagas (
+    id   BIGINT PRIMARY KEY,
+    data BYTEA NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS events (
+    seq       BIGINT PRIMARY KEY,
+    timestamp BIGINT NOT NULL,
+    kind      TEXT NOT NULL,
+    data      BYTEA NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS books (
+    id   BIGINT PRIMARY KEY,
+    name TEXT NOT NULL,
+    data BYTEA NOT NULL
+);

+ 23 - 4
crates/kuatia-storage-sql/src/migrations/001_init.sql → crates/kuatia-storage-sql/src/migrations/sqlite/001_init.sql

@@ -3,7 +3,7 @@ CREATE TABLE IF NOT EXISTS accounts (
     version     BIGINT NOT NULL,
     policy      TEXT NOT NULL,
     flags       INTEGER NOT NULL,
-    book     BIGINT NOT NULL,
+    book        BIGINT NOT NULL,
     user_data   BLOB NOT NULL,
     metadata    BLOB NOT NULL,
     PRIMARY KEY (id, version)
@@ -16,17 +16,23 @@ CREATE TABLE IF NOT EXISTS postings (
     asset       INTEGER NOT NULL,
     value       BIGINT NOT NULL,
     status      SMALLINT NOT NULL,
+    reservation BIGINT,
     PRIMARY KEY (transfer_id, idx)
 );
 
 CREATE INDEX IF NOT EXISTS idx_postings_owner ON postings(owner, asset, status);
 
 CREATE TABLE IF NOT EXISTS transfers (
-    id       BLOB PRIMARY KEY,
-    transfer BLOB NOT NULL,
-    receipt  BLOB NOT NULL
+    id         BLOB PRIMARY KEY,
+    transfer   BLOB NOT NULL,
+    receipt    BLOB NOT NULL,
+    created_at BIGINT NOT NULL DEFAULT 0,
+    book       BIGINT NOT NULL DEFAULT 0
 );
 
+CREATE INDEX IF NOT EXISTS idx_transfers_created_at ON transfers(created_at);
+CREATE INDEX IF NOT EXISTS idx_transfers_book ON transfers(book);
+
 CREATE TABLE IF NOT EXISTS transfer_accounts (
     transfer_id BLOB NOT NULL,
     account_id  BIGINT NOT NULL,
@@ -39,3 +45,16 @@ CREATE TABLE IF NOT EXISTS sagas (
     id   BIGINT PRIMARY KEY,
     data BLOB NOT NULL
 );
+
+CREATE TABLE IF NOT EXISTS events (
+    seq       BIGINT PRIMARY KEY,
+    timestamp BIGINT NOT NULL,
+    kind      TEXT NOT NULL,
+    data      BLOB NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS books (
+    id   BIGINT PRIMARY KEY,
+    name TEXT NOT NULL,
+    data BLOB NOT NULL
+);

+ 15 - 0
crates/kuatia-storage-sql/tests/sqlite.rs

@@ -16,3 +16,18 @@ async fn new_store() -> SqlStore {
 }
 
 kuatia_storage::store_tests!(new_store);
+
+/// migrate() is idempotent: running it repeatedly on the same DB is a no-op.
+#[tokio::test]
+async fn migrate_is_idempotent() {
+    sqlx::any::install_default_drivers();
+    let pool = sqlx::any::AnyPoolOptions::new()
+        .max_connections(1)
+        .connect("sqlite::memory:")
+        .await
+        .unwrap();
+    let store = SqlStore::new(pool);
+    store.migrate().await.unwrap();
+    store.migrate().await.unwrap();
+    store.migrate().await.unwrap();
+}

+ 4 - 3
crates/kuatia-storage/README.md

@@ -2,7 +2,7 @@
 
 Storage abstraction for the kuatia ledger.
 
-Defines the `Store` trait (composed of six sub-traits), provides an
+Defines the `Store` trait (composed of seven sub-traits), provides an
 in-memory implementation for tests, and exports a `store_tests!` conformance
 macro that any backend can use to validate its implementation.
 
@@ -11,13 +11,14 @@ macro that any backend can use to validate its implementation.
 | Trait | Purpose |
 |-------|---------|
 | `AccountStore` | Account CRUD and versioning |
-| `PostingStore` | Posting reads, reserve/release/finalize lifecycle |
+| `PostingStore` | Posting reads, reserve/release/finalize lifecycle (reserve/release carry a `ReservationId`) |
 | `TransferStore` | Transfer persistence and queries |
 | `SagaStore` | Saga state for crash recovery |
 | `EventStore` | Append-only ledger event log |
 | `BookStore` | Book (transfer policy scope) persistence |
+| `CommitStore` | `commit_transfer` — the single atomic commit boundary (postings + transfer + index + events) |
 
-`Store` is a blanket trait — any type implementing all six sub-traits is a `Store`.
+`Store` is a blanket trait — any type implementing all seven sub-traits is a `Store`.
 
 ## Conformance testing
 

+ 16 - 1
crates/kuatia-storage/src/error.rs

@@ -1,6 +1,6 @@
 //! Error types for storage implementations.
 
-use kuatia_types::{AccountId, PostingId};
+use kuatia_types::{AccountId, AssetId, PostingId};
 
 /// Errors produced by [`Store`](crate::store::Store) implementations.
 #[derive(Debug)]
@@ -24,6 +24,17 @@ pub enum StoreError {
     PostingNotActive(PostingId),
     /// Attempted to release a void (Inactive) posting.
     PostingInactive(PostingId),
+    /// A CAS guard's expected balance no longer matches the current balance —
+    /// a concurrent transfer moved it. The caller should re-validate and retry.
+    Conflict {
+        /// Account whose balance changed under the guard.
+        account: AccountId,
+        /// Asset whose balance changed.
+        asset: AssetId,
+    },
+    /// A posting's reservation owner did not match the caller's reservation —
+    /// it is reserved by a different saga (or not reserved at all).
+    ReservationMismatch(PostingId),
 }
 
 impl std::fmt::Display for StoreError {
@@ -44,6 +55,10 @@ impl std::fmt::Display for StoreError {
             Self::Internal(msg) => write!(f, "internal error: {msg}"),
             Self::PostingNotActive(id) => write!(f, "posting not active: {id:?}"),
             Self::PostingInactive(id) => write!(f, "posting is void (inactive): {id:?}"),
+            Self::Conflict { account, asset } => {
+                write!(f, "cas conflict on {account:?}/{asset:?}: balance changed")
+            }
+            Self::ReservationMismatch(id) => write!(f, "reservation mismatch on posting {id:?}"),
         }
     }
 }

+ 1 - 1
crates/kuatia-storage/src/lib.rs

@@ -1,6 +1,6 @@
 //! Storage abstraction for the ledger.
 //!
-//! Provides the [`Store`](store::Store) trait (composed of four sub-traits),
+//! Provides the [`Store`](store::Store) trait (composed of seven sub-traits),
 //! an in-memory implementation, and a conformance test suite macro.
 
 pub mod error;

+ 127 - 13
crates/kuatia-storage/src/mem_store.rs

@@ -8,13 +8,15 @@ use tokio::sync::RwLock;
 
 use kuatia_types::autoid::AutoId;
 use kuatia_types::{
-    Account, AccountId, AssetId, EnvelopeId, Book, BookId, Posting, PostingId, PostingStatus,
+    Account, AccountId, AssetId, Book, BookId, Cent, EnvelopeId, Posting, PostingId, PostingStatus,
+    ReservationId,
 };
 
 use crate::error::StoreError;
 use crate::events::{EventStore, LedgerEvent};
 use crate::store::{
-    AccountStore, EnvelopeRecord, BookStore, PostingStore, SagaStore, TransferStore,
+    AccountStore, BookStore, CommitRequest, CommitStore, EnvelopeRecord, PostingStore, SagaStore,
+    TransferStore,
 };
 
 /// In-memory [`Store`](crate::store::Store) implementation backed by `RwLock<HashMap>`.
@@ -162,7 +164,11 @@ impl PostingStore for InMemoryStore {
             .collect())
     }
 
-    async fn reserve_postings(&self, ids: &[PostingId]) -> Result<(), StoreError> {
+    async fn reserve_postings(
+        &self,
+        ids: &[PostingId],
+        reservation: ReservationId,
+    ) -> Result<(), StoreError> {
         let mut postings = self.postings.write().await;
         for id in ids {
             let posting = postings
@@ -173,22 +179,31 @@ impl PostingStore for InMemoryStore {
             }
         }
         for id in ids {
-            postings
+            let posting = postings
                 .get_mut(id)
-                .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?
-                .status = PostingStatus::PendingInactive;
+                .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
+            posting.status = PostingStatus::PendingInactive;
+            posting.reservation = Some(reservation);
         }
         Ok(())
     }
 
-    async fn release_postings(&self, ids: &[PostingId]) -> Result<(), StoreError> {
+    async fn release_postings(
+        &self,
+        ids: &[PostingId],
+        reservation: ReservationId,
+    ) -> Result<(), StoreError> {
         let mut postings = self.postings.write().await;
         for id in ids {
             let posting = postings
                 .get(id)
                 .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
-            if posting.status == PostingStatus::Inactive {
-                return Err(StoreError::PostingInactive(*id));
+            match posting.status {
+                PostingStatus::Inactive => return Err(StoreError::PostingInactive(*id)),
+                PostingStatus::PendingInactive if posting.reservation != Some(reservation) => {
+                    return Err(StoreError::ReservationMismatch(*id));
+                }
+                _ => {}
             }
         }
         for id in ids {
@@ -197,6 +212,7 @@ impl PostingStore for InMemoryStore {
                 .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
             if posting.status == PostingStatus::PendingInactive {
                 posting.status = PostingStatus::Active;
+                posting.reservation = None;
             }
         }
         Ok(())
@@ -209,9 +225,11 @@ impl PostingStore for InMemoryStore {
     ) -> Result<(), StoreError> {
         let mut postings = self.postings.write().await;
         for pid in deactivate {
-            if let Some(p) = postings.get_mut(pid) {
-                p.status = PostingStatus::Inactive;
-            }
+            let p = postings
+                .get_mut(pid)
+                .ok_or_else(|| StoreError::NotFound(format!("posting {pid:?}")))?;
+            p.status = PostingStatus::Inactive;
+            p.reservation = None;
         }
         for posting in create {
             postings.insert(posting.id, posting.clone());
@@ -241,8 +259,10 @@ impl TransferStore for InMemoryStore {
         &self,
         account: &AccountId,
     ) -> Result<Vec<EnvelopeRecord>, StoreError> {
-        let transfers = self.transfers.read().await;
+        // Lock order postings → transfers must match `commit_transfer` to avoid
+        // an AB–BA deadlock.
         let postings = self.postings.read().await;
+        let transfers = self.transfers.read().await;
         let mut result: Vec<EnvelopeRecord> = transfers
             .values()
             .filter(|record| {
@@ -348,3 +368,97 @@ impl BookStore for InMemoryStore {
         Ok(books.values().cloned().collect())
     }
 }
+
+// ---------------------------------------------------------------------------
+// CommitStore
+// ---------------------------------------------------------------------------
+
+#[async_trait]
+impl CommitStore for InMemoryStore {
+    async fn commit_transfer(&self, req: CommitRequest<'_>) -> Result<(), StoreError> {
+        // Lock order postings → transfers → events; every reader that takes more
+        // than one of these must follow the same order.
+        let mut postings = self.postings.write().await;
+        let mut transfers = self.transfers.write().await;
+        let mut events = self.events.write().await;
+
+        let tid = req.record.receipt.transfer_id;
+
+        // 1. Idempotency: a prior attempt already committed this transfer.
+        if transfers.contains_key(&tid) {
+            return Ok(());
+        }
+
+        // 2. CAS guards — recompute each balance (Σ non-Inactive postings) before
+        //    any mutation, matching how validation snapshotted it.
+        for (account, asset, expected) in req.cas_guards {
+            let balance = Cent::checked_sum(
+                postings
+                    .values()
+                    .filter(|p| {
+                        p.owner == *account
+                            && p.asset == *asset
+                            && p.status != PostingStatus::Inactive
+                    })
+                    .map(|p| p.value),
+            )
+            .map_err(|_| StoreError::Internal("balance overflow during cas".into()))?;
+            if balance != *expected {
+                return Err(StoreError::Conflict {
+                    account: *account,
+                    asset: *asset,
+                });
+            }
+        }
+
+        // 3. Authorize every consumed posting against the reservation.
+        for pid in req.deactivate {
+            let posting = postings
+                .get(pid)
+                .ok_or(StoreError::ReservationMismatch(*pid))?;
+            match req.reservation {
+                None => {
+                    if posting.status != PostingStatus::Active {
+                        return Err(StoreError::ReservationMismatch(*pid));
+                    }
+                }
+                Some(rid) => {
+                    if posting.status != PostingStatus::PendingInactive
+                        || posting.reservation != Some(rid)
+                    {
+                        return Err(StoreError::ReservationMismatch(*pid));
+                    }
+                }
+            }
+        }
+
+        // 4. Deactivate consumed postings.
+        for pid in req.deactivate {
+            let posting = postings
+                .get_mut(pid)
+                .ok_or(StoreError::ReservationMismatch(*pid))?;
+            posting.status = PostingStatus::Inactive;
+            posting.reservation = None;
+        }
+
+        // 5. Insert created postings.
+        for posting in req.create {
+            postings.insert(posting.id, posting.clone());
+        }
+
+        // 6. Persist the transfer record.
+        transfers.insert(tid, req.record);
+
+        // 7. Append events in-transaction, assigning sequence numbers.
+        for event in req.events {
+            let seq = self.autoid.next() as u64;
+            events.push(LedgerEvent {
+                seq,
+                timestamp: event.timestamp,
+                kind: event.kind.clone(),
+            });
+        }
+
+        Ok(())
+    }
+}

+ 69 - 14
crates/kuatia-storage/src/store.rs

@@ -1,19 +1,22 @@
 //! Storage abstraction separating the pure decision logic from IO.
 //!
-//! The [`Store`] trait composes four focused sub-traits:
+//! The [`Store`] trait composes six focused sub-traits:
 //! - [`AccountStore`] — account CRUD and versioning
 //! - [`PostingStore`] — posting reads and lifecycle transitions
 //! - [`TransferStore`] — transfer persistence and queries
 //! - [`SagaStore`] — saga state for crash recovery
+//! - [`EventStore`](crate::events::EventStore) — the ledger event log
+//! - [`BookStore`] — book persistence
+//! - [`CommitStore`] — the single atomic commit boundary
 
 use async_trait::async_trait;
 use kuatia_types::{
-    Account, AccountId, AssetId, Envelope, EnvelopeId, Book, BookId, Posting, PostingId,
-    PostingStatus, Receipt,
+    Account, AccountId, AssetId, Book, BookId, Cent, Envelope, EnvelopeId, Posting, PostingId,
+    PostingStatus, Receipt, ReservationId,
 };
 
 use crate::error::StoreError;
-use crate::events::EventStore;
+use crate::events::{EventStore, LedgerEvent};
 
 /// Pairs a committed transfer with its receipt.
 #[derive(Debug, Clone)]
@@ -26,6 +29,27 @@ pub struct EnvelopeRecord {
     pub created_at: i64,
 }
 
+/// Everything one atomic commit must persist together. Carries decomposed
+/// primitives (not `kuatia_core::Plan`) so this crate need not depend on the
+/// pure-core crate.
+pub struct CommitRequest<'a> {
+    /// Consumed postings to mark `Inactive`.
+    pub deactivate: &'a [PostingId],
+    /// New postings to insert (already `Active`, from the validated plan).
+    pub create: &'a [Posting],
+    /// `(account, asset, expected_balance)` guards to verify before mutating —
+    /// a mismatch means a concurrent transfer moved the balance ([`StoreError::Conflict`]).
+    pub cas_guards: &'a [(AccountId, AssetId, Cent)],
+    /// Reservation authorizing consumption of `deactivate`.
+    /// - `None` — raw path: the postings must be `Active`.
+    /// - `Some(rid)` — saga path: the postings must be `PendingInactive` owned by `rid`.
+    pub reservation: Option<ReservationId>,
+    /// The transfer record to persist.
+    pub record: EnvelopeRecord,
+    /// Events to append within the same transaction (e.g. `TransferCommitted`).
+    pub events: &'a [LedgerEvent],
+}
+
 /// Pagination and filtering parameters for posting queries.
 #[derive(Debug, Clone)]
 pub struct PostingQuery {
@@ -100,15 +124,24 @@ pub trait PostingStore: Send + Sync {
         asset: Option<&AssetId>,
         status: Option<PostingStatus>,
     ) -> Result<Vec<Posting>, StoreError>;
-    /// Reserve postings: Active → PendingInactive.
-    /// Atomic: if any posting is not Active, the entire batch fails.
-    async fn reserve_postings(&self, ids: &[PostingId]) -> Result<(), StoreError>;
-    /// Release postings back from reservation.
-    /// - PendingInactive → Active
+    /// Reserve postings: Active → PendingInactive, stamping `reservation` as the
+    /// owner token. Atomic: if any posting is not Active, the entire batch fails.
+    async fn reserve_postings(
+        &self,
+        ids: &[PostingId],
+        reservation: ReservationId,
+    ) -> Result<(), StoreError>;
+    /// Release postings reserved under `reservation`, back from reservation.
+    /// - PendingInactive owned by `reservation` → Active (clears the owner)
+    /// - PendingInactive owned by a different reservation → fail ([`StoreError::ReservationMismatch`])
     /// - Active → no-op (already released)
     /// - Inactive → fail (void posting cannot be released)
-    /// Atomic: if any posting is Inactive, the entire batch fails.
-    async fn release_postings(&self, ids: &[PostingId]) -> Result<(), StoreError>;
+    /// Atomic: if any posting fails its check, the entire batch fails.
+    async fn release_postings(
+        &self,
+        ids: &[PostingId],
+        reservation: ReservationId,
+    ) -> Result<(), StoreError>;
     /// Deactivate postings and insert newly created postings.
     async fn finalize_postings(
         &self,
@@ -213,17 +246,39 @@ pub trait BookStore: Send + Sync {
     async fn list_books(&self) -> Result<Vec<Book>, StoreError>;
 }
 
+/// The single atomic commit boundary — the one place ledger state changes.
+#[async_trait]
+pub trait CommitStore: Send + Sync {
+    /// Apply a validated transfer atomically: enforce CAS guards, authorize and
+    /// deactivate consumed postings, insert created postings, persist the
+    /// transfer record (indexed by **both** created and consumed account owners),
+    /// and append the events — all in one critical section.
+    ///
+    /// Idempotent on the transfer id: if already committed, returns `Ok(())`.
+    /// Returns [`StoreError::Conflict`] (retryable) if a guard balance changed,
+    /// or [`StoreError::ReservationMismatch`] if a consumed posting is not owned
+    /// as `req.reservation` requires.
+    async fn commit_transfer(&self, req: CommitRequest<'_>) -> Result<(), StoreError>;
+}
+
 // ---------------------------------------------------------------------------
 // Composite trait
 // ---------------------------------------------------------------------------
 
 /// Async storage abstraction composing all sub-traits.
 pub trait Store:
-    AccountStore + PostingStore + TransferStore + SagaStore + EventStore + BookStore
+    AccountStore + PostingStore + TransferStore + SagaStore + EventStore + BookStore + CommitStore
 {
 }
 
-impl<T: AccountStore + PostingStore + TransferStore + SagaStore + EventStore + BookStore> Store
-    for T
+impl<
+    T: AccountStore
+        + PostingStore
+        + TransferStore
+        + SagaStore
+        + EventStore
+        + BookStore
+        + CommitStore,
+> Store for T
 {
 }

+ 196 - 15
crates/kuatia-storage/src/store_tests.rs

@@ -38,16 +38,15 @@ fn make_posting(
     asset: u32,
     value: i64,
 ) -> Posting {
-    Posting {
-        id: PostingId {
+    Posting::new(
+        PostingId {
             transfer: EnvelopeId(transfer_hash),
             index,
         },
-        owner: AccountId::new(owner),
-        asset: AssetId::new(asset),
-        value: Cent::from(value),
-        status: PostingStatus::Active,
-    }
+        AccountId::new(owner),
+        AssetId::new(asset),
+        Cent::from(value),
+    )
 }
 
 fn make_envelope_with_book(book: BookId) -> (Envelope, EnvelopeId) {
@@ -324,7 +323,7 @@ pub async fn reserve_postings_batch(store: &(impl Store + 'static)) {
         .await
         .unwrap();
 
-    store.reserve_postings(&[p1.id, p2.id]).await.unwrap();
+    store.reserve_postings(&[p1.id, p2.id], ReservationId::new(1)).await.unwrap();
 
     let got = store.get_postings(&[p1.id, p2.id]).await.unwrap();
     assert!(
@@ -342,9 +341,9 @@ pub async fn reserve_non_active_fails(store: &(impl Store + 'static)) {
         .await
         .unwrap();
 
-    store.reserve_postings(&[p1.id]).await.unwrap();
+    store.reserve_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
 
-    let err = store.reserve_postings(&[p1.id, p2.id]).await.unwrap_err();
+    let err = store.reserve_postings(&[p1.id, p2.id], ReservationId::new(1)).await.unwrap_err();
     assert!(matches!(err, StoreError::PostingNotActive(_)));
 
     let got = store.get_postings(&[p2.id]).await.unwrap();
@@ -358,9 +357,9 @@ pub async fn release_postings_batch(store: &(impl Store + 'static)) {
         .finalize_postings(&[], std::slice::from_ref(&p1))
         .await
         .unwrap();
-    store.reserve_postings(&[p1.id]).await.unwrap();
+    store.reserve_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
 
-    store.release_postings(&[p1.id]).await.unwrap();
+    store.release_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
 
     let got = store.get_postings(&[p1.id]).await.unwrap();
     assert_eq!(got[0].status, PostingStatus::Active);
@@ -374,7 +373,7 @@ pub async fn release_active_is_noop(store: &(impl Store + 'static)) {
         .await
         .unwrap();
 
-    store.release_postings(&[p1.id]).await.unwrap();
+    store.release_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
 
     let got = store.get_postings(&[p1.id]).await.unwrap();
     assert_eq!(got[0].status, PostingStatus::Active);
@@ -390,7 +389,7 @@ pub async fn release_inactive_fails(store: &(impl Store + 'static)) {
 
     store.finalize_postings(&[p1.id], &[]).await.unwrap();
 
-    let err = store.release_postings(&[p1.id]).await.unwrap_err();
+    let err = store.release_postings(&[p1.id], ReservationId::new(1)).await.unwrap_err();
     assert!(matches!(err, StoreError::PostingInactive(_)));
 }
 
@@ -401,7 +400,7 @@ pub async fn finalize_deactivates_postings(store: &(impl Store + 'static)) {
         .finalize_postings(&[], std::slice::from_ref(&p1))
         .await
         .unwrap();
-    store.reserve_postings(&[p1.id]).await.unwrap();
+    store.reserve_postings(&[p1.id], ReservationId::new(1)).await.unwrap();
 
     let p2 = make_posting([2; 32], 0, 1, 1, 100);
     store
@@ -417,6 +416,183 @@ pub async fn finalize_deactivates_postings(store: &(impl Store + 'static)) {
 }
 
 // ---------------------------------------------------------------------------
+// CommitStore tests
+// ---------------------------------------------------------------------------
+
+/// Build a transfer record that spends `consumed` (owned by account 1) entirely
+/// into a new posting owned by account 2, with the given transfer id.
+fn commit_record(tid: EnvelopeId, consumes: Vec<PostingId>) -> EnvelopeRecord {
+    let envelope = EnvelopeBuilder::new()
+        .consumes(consumes)
+        .creates(vec![NewPosting {
+            owner: AccountId::new(2),
+            asset: AssetId::new(1),
+            value: Cent::from(100),
+            payer: None,
+        }])
+        .build();
+    EnvelopeRecord {
+        envelope,
+        receipt: Receipt { transfer_id: tid },
+        created_at: 1000,
+    }
+}
+
+/// commit_transfer applies postings, transfer, account index (both sides), and
+/// events atomically; the consumed-only owner is indexed for history.
+pub async fn commit_transfer_atomic(store: &(impl Store + 'static)) {
+    let consumed = make_posting([7; 32], 0, 1, 1, 100); // owned by account 1
+    store
+        .finalize_postings(&[], std::slice::from_ref(&consumed))
+        .await
+        .unwrap();
+
+    let created = make_posting([8; 32], 0, 2, 1, 100); // owned by account 2
+    let tid = EnvelopeId([8; 32]);
+    let events = [LedgerEvent {
+        seq: 0,
+        timestamp: 1000,
+        kind: LedgerEventKind::TransferCommitted { transfer_id: tid },
+    }];
+    store
+        .commit_transfer(CommitRequest {
+            deactivate: &[consumed.id],
+            create: std::slice::from_ref(&created),
+            cas_guards: &[],
+            reservation: None,
+            record: commit_record(tid, vec![consumed.id]),
+            events: &events,
+        })
+        .await
+        .unwrap();
+
+    // Consumed posting is now void; created posting exists and is active.
+    assert_eq!(
+        store.get_postings(&[consumed.id]).await.unwrap()[0].status,
+        PostingStatus::Inactive
+    );
+    assert_eq!(
+        store.get_postings(&[created.id]).await.unwrap()[0].status,
+        PostingStatus::Active
+    );
+
+    // Transfer record is retrievable.
+    assert!(store.get_transfer(&tid).await.unwrap().is_some());
+
+    // History indexes BOTH the created owner (2) and the consumed-only owner (1).
+    assert_eq!(
+        store
+            .get_transfers_for_account(&AccountId::new(2))
+            .await
+            .unwrap()
+            .len(),
+        1
+    );
+    assert_eq!(
+        store
+            .get_transfers_for_account(&AccountId::new(1))
+            .await
+            .unwrap()
+            .len(),
+        1
+    );
+
+    // The event was appended in the same commit.
+    assert_eq!(store.get_events_since(0, 10).await.unwrap().len(), 1);
+}
+
+/// A second commit of the same transfer id is a no-op (idempotent).
+pub async fn commit_transfer_idempotent(store: &(impl Store + 'static)) {
+    let consumed = make_posting([7; 32], 0, 1, 1, 100);
+    store
+        .finalize_postings(&[], std::slice::from_ref(&consumed))
+        .await
+        .unwrap();
+    let created = make_posting([8; 32], 0, 2, 1, 100);
+    let tid = EnvelopeId([8; 32]);
+    store
+        .commit_transfer(CommitRequest {
+            deactivate: &[],
+            create: std::slice::from_ref(&created),
+            cas_guards: &[],
+            reservation: None,
+            record: commit_record(tid, vec![]),
+            events: &[],
+        })
+        .await
+        .unwrap();
+    // Second commit returns Ok without inserting a duplicate posting/event.
+    store
+        .commit_transfer(CommitRequest {
+            deactivate: &[],
+            create: std::slice::from_ref(&created),
+            cas_guards: &[],
+            reservation: None,
+            record: commit_record(tid, vec![]),
+            events: &[],
+        })
+        .await
+        .unwrap();
+    assert!(store.get_events_since(0, 10).await.unwrap().is_empty());
+}
+
+/// commit_transfer rejects consuming a posting reserved by a different saga.
+pub async fn commit_transfer_reservation_mismatch(store: &(impl Store + 'static)) {
+    let consumed = make_posting([7; 32], 0, 1, 1, 100);
+    store
+        .finalize_postings(&[], std::slice::from_ref(&consumed))
+        .await
+        .unwrap();
+    // Reserved under reservation 1.
+    store
+        .reserve_postings(&[consumed.id], ReservationId::new(1))
+        .await
+        .unwrap();
+
+    let created = make_posting([8; 32], 0, 2, 1, 100);
+    let tid = EnvelopeId([8; 32]);
+    // Committing under reservation 2 must fail.
+    let err = store
+        .commit_transfer(CommitRequest {
+            deactivate: &[consumed.id],
+            create: std::slice::from_ref(&created),
+            cas_guards: &[],
+            reservation: Some(ReservationId::new(2)),
+            record: commit_record(tid, vec![consumed.id]),
+            events: &[],
+        })
+        .await
+        .unwrap_err();
+    assert!(matches!(err, StoreError::ReservationMismatch(_)));
+}
+
+/// commit_transfer aborts with Conflict when a CAS guard's balance is stale.
+pub async fn commit_transfer_cas_conflict(store: &(impl Store + 'static)) {
+    let consumed = make_posting([7; 32], 0, 1, 1, 100);
+    store
+        .finalize_postings(&[], std::slice::from_ref(&consumed))
+        .await
+        .unwrap();
+    let created = make_posting([8; 32], 0, 2, 1, 100);
+    let tid = EnvelopeId([8; 32]);
+    // Guard claims account 1 holds 50, but it actually holds 100.
+    let err = store
+        .commit_transfer(CommitRequest {
+            deactivate: &[consumed.id],
+            create: std::slice::from_ref(&created),
+            cas_guards: &[(AccountId::new(1), AssetId::new(1), Cent::from(50))],
+            reservation: None,
+            record: commit_record(tid, vec![consumed.id]),
+            events: &[],
+        })
+        .await
+        .unwrap_err();
+    assert!(matches!(err, StoreError::Conflict { .. }));
+    // The transfer was not committed.
+    assert!(store.get_transfer(&tid).await.unwrap().is_none());
+}
+
+// ---------------------------------------------------------------------------
 // TransferStore tests
 // ---------------------------------------------------------------------------
 
@@ -752,6 +928,11 @@ macro_rules! store_tests {
             release_active_is_noop,
             release_inactive_fails,
             finalize_deactivates_postings,
+            // CommitStore
+            commit_transfer_atomic,
+            commit_transfer_idempotent,
+            commit_transfer_reservation_mismatch,
+            commit_transfer_cas_conflict,
             // TransferStore
             store_and_get_transfer,
             get_missing_transfer,

+ 14 - 3
crates/kuatia-types/src/autoid.rs

@@ -10,9 +10,15 @@
 //! ```
 //!
 //! - Bit 63: always 0 (keeps i64 positive)
-//! - Bits 62–23: Unix milliseconds (40 bits ≈ 34.8 years from epoch)
+//! - Bits 62–23: milliseconds since [`KUATIA_EPOCH_MS`] (40 bits ≈ 34.8 years)
 //! - Bits 22–0: lower 23 bits of CRC32 of context data, or an internal
 //!   counter that wraps on overflow when no data is provided.
+//!
+//! The millisecond field counts from a fixed recent epoch
+//! ([`KUATIA_EPOCH_MS`] = 2026-01-01T00:00:00Z) rather than the Unix epoch, so
+//! the 40-bit window gives ~34.8 years of range *going forward* (until ~2060)
+//! instead of a window already partly elapsed since 1970. Collision resistance
+//! within a millisecond comes from the CRC32 tail (for content-keyed ids).
 
 use std::sync::atomic::{AtomicU32, Ordering};
 use std::time::{SystemTime, UNIX_EPOCH};
@@ -21,6 +27,10 @@ const TIMESTAMP_BITS: u32 = 40;
 const TAIL_BITS: u32 = 23;
 const TAIL_MASK: u32 = (1 << TAIL_BITS) - 1;
 
+/// Custom epoch for the timestamp field: 2026-01-01T00:00:00Z in Unix
+/// milliseconds. Ids generated before this instant clamp to 0.
+pub const KUATIA_EPOCH_MS: u64 = 1_767_225_600_000;
+
 /// Snowflake-style ID generator.
 ///
 /// Each generator holds an internal counter used when no CRC32 data is
@@ -63,10 +73,11 @@ impl AutoId {
     }
 
     fn now_ms() -> u64 {
-        SystemTime::now()
+        let unix_ms = SystemTime::now()
             .duration_since(UNIX_EPOCH)
             .unwrap_or_default()
-            .as_millis() as u64
+            .as_millis() as u64;
+        unix_ms.saturating_sub(KUATIA_EPOCH_MS)
     }
 
     fn pack(ms: u64, tail: u32) -> i64 {

+ 62 - 5
crates/kuatia-types/src/lib.rs

@@ -421,8 +421,29 @@ impl fmt::Debug for BookId {
     }
 }
 
+/// The implicit book used when a transfer does not name one. Fixed so that two
+/// otherwise-identical transfers hash to the same [`EnvelopeId`] — a random
+/// default would break content-addressed idempotency.
+pub const DEFAULT_BOOK: BookId = BookId(0);
+
 impl Default for BookId {
+    /// Deterministic: returns [`DEFAULT_BOOK`]. Use [`BookId::generate`] to mint
+    /// a fresh unique id for a real book.
     fn default() -> Self {
+        DEFAULT_BOOK
+    }
+}
+
+impl BookId {
+    /// Create a `BookId` from an `i64`.
+    pub const fn new(id: i64) -> Self {
+        Self(id)
+    }
+
+    /// Mint a fresh, process-unique book id. Unlike [`Default`], this is not
+    /// stable across calls — use it when creating a new [`Book`], never for the
+    /// implicit book of a transfer.
+    pub fn generate() -> Self {
         thread_local! {
             static GEN: crate::autoid::AutoId = crate::autoid::AutoId::new();
         }
@@ -430,13 +451,33 @@ impl Default for BookId {
     }
 }
 
-impl BookId {
-    /// Create a `BookId` from an `i64`.
+/// Identifies a reservation — the owner token stamped on a posting while it is
+/// `PendingInactive`, so only the saga that reserved it may finalize or release it.
+#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
+pub struct ReservationId(pub i64);
+
+impl fmt::Debug for ReservationId {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "ReservationId({})", self.0)
+    }
+}
+
+impl ReservationId {
+    /// Create a `ReservationId` from an `i64`.
     pub const fn new(id: i64) -> Self {
         Self(id)
     }
 }
 
+impl Default for ReservationId {
+    fn default() -> Self {
+        thread_local! {
+            static GEN: crate::autoid::AutoId = crate::autoid::AutoId::new();
+        }
+        GEN.with(|g| Self(g.next()))
+    }
+}
+
 // ---------------------------------------------------------------------------
 // Book
 // ---------------------------------------------------------------------------
@@ -476,7 +517,7 @@ impl BookBuilder {
     pub fn new(name: impl Into<String>) -> Self {
         Self {
             book: Book {
-                id: BookId::default(),
+                id: BookId::generate(),
                 name: name.into(),
                 policy: BookPolicy {
                     allowed_assets: Vec::new(),
@@ -549,8 +590,8 @@ pub enum PostingStatus {
 /// A signed amount of one asset, owned by exactly one account.
 ///
 /// A positive posting is value controlled by the account; a negative posting is
-/// an offset position (issuance, external flow, or system balancing), only
-/// allowed on `SystemAccount` or `ExternalAccount`.
+/// an offset position (issuance, external flow, overdraft, or system balancing).
+/// Negative postings are allowed on every policy except `NoOverdraft`.
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct Posting {
     /// Unique identifier derived from the creating transfer.
@@ -563,9 +604,25 @@ pub struct Posting {
     pub value: Cent,
     /// Lifecycle state — only `Active` postings count toward balance.
     pub status: PostingStatus,
+    /// Owner token while `PendingInactive`. `Some(rid)` iff reserved by saga
+    /// `rid`; `None` when `Active` or `Inactive`. Only the holder of a matching
+    /// `ReservationId` may finalize or release a reserved posting.
+    pub reservation: Option<ReservationId>,
 }
 
 impl Posting {
+    /// Construct an `Active`, unreserved posting.
+    pub fn new(id: PostingId, owner: AccountId, asset: AssetId, value: Cent) -> Self {
+        Self {
+            id,
+            owner,
+            asset,
+            value,
+            status: PostingStatus::Active,
+            reservation: None,
+        }
+    }
+
     /// Returns `true` if this posting's status is [`PostingStatus::Active`].
     pub fn is_active(&self) -> bool {
         self.status == PostingStatus::Active

+ 4 - 1
crates/kuatia/src/error.rs

@@ -4,7 +4,7 @@
 //! and from storage, so callers get a single error type from every API.
 
 use kuatia_core::{
-    AccountId, EnvelopeId, OverflowError, PostingId, SelectionError, ValidationError,
+    AccountId, BookId, EnvelopeId, OverflowError, PostingId, SelectionError, ValidationError,
 };
 use kuatia_storage::error::StoreError;
 
@@ -27,6 +27,8 @@ pub enum LedgerError {
     AccountNotEmpty(AccountId),
     /// The account is already closed.
     AccountAlreadyClosed(AccountId),
+    /// A transfer named a book that does not exist.
+    BookNotFound(BookId),
     /// Monetary arithmetic overflow.
     Overflow,
     /// A saga step failed and its compensation also failed.
@@ -49,6 +51,7 @@ impl std::fmt::Display for LedgerError {
             Self::AccountNotFound(id) => write!(f, "account not found: {id:?}"),
             Self::AccountNotEmpty(id) => write!(f, "account not empty: {id:?}"),
             Self::AccountAlreadyClosed(id) => write!(f, "account already closed: {id:?}"),
+            Self::BookNotFound(id) => write!(f, "book not found: {id:?}"),
             Self::Overflow => write!(f, "monetary amount overflow"),
             Self::CompensationFailed {
                 original,

+ 92 - 40
crates/kuatia/src/ledger.rs

@@ -7,9 +7,9 @@ use legend::{ExecutionResult, legend};
 use tracing::instrument;
 
 use kuatia_core::{
-    AccountId, AccountSnapshotId, AssetId, Cent, Envelope, EnvelopeBuilder, EnvelopeId, NewPosting,
-    PlanInput, Posting, PostingId, PostingStatus, Receipt, Transfer, account_snapshot_id,
-    envelope_id, select_postings, validate_and_plan,
+    AccountId, AccountPolicy, AccountSnapshotId, AssetId, Book, Cent, DEFAULT_BOOK, Envelope,
+    EnvelopeBuilder, EnvelopeId, NewPosting, PlanInput, Posting, PostingId, PostingStatus, Receipt,
+    SelectionError, Transfer, account_snapshot_id, envelope_id, select_postings, validate_and_plan,
 };
 
 use crate::error::LedgerError;
@@ -25,8 +25,9 @@ use crate::saga::{
     FinalizeInput, FinalizeTransferStep, LedgerCtx, ReserveInput, ReservePostingsStep,
     ResolveInput, ResolveStep, SagaError, ValidateInput, ValidateTransferStep,
 };
+use kuatia_storage::error::StoreError;
 use kuatia_storage::events::{LedgerEvent, LedgerEventKind};
-use kuatia_storage::store::{EnvelopeRecord, Store};
+use kuatia_storage::store::{CommitRequest, EnvelopeRecord, Store};
 
 #[allow(missing_docs)]
 mod transfer_saga {
@@ -99,10 +100,21 @@ impl Ledger {
             balances.insert((*account_id, *asset_id), bal);
         }
 
+        // Load the gating book. A missing named (non-default) book is an error;
+        // a missing default book means "unrestricted" (no policy to enforce).
+        let book_id = envelope.book();
+        let book = match self.store.get_book(&book_id).await {
+            Ok(b) => Some(b),
+            Err(StoreError::NotFound(_)) if book_id == DEFAULT_BOOK => None,
+            Err(StoreError::NotFound(_)) => return Err(LedgerError::BookNotFound(book_id)),
+            Err(e) => return Err(e.into()),
+        };
+
         Ok(LoadedState {
             consumed_postings,
             accounts,
             balances,
+            book,
         })
     }
 
@@ -117,6 +129,7 @@ impl Ledger {
             consumed_postings: &loaded.consumed_postings,
             accounts: &loaded.accounts,
             balances: &loaded.balances,
+            book: loaded.book.as_ref(),
         };
         Ok(validate_and_plan(input)?)
     }
@@ -128,32 +141,35 @@ impl Ledger {
         envelope: &Envelope,
         plan: &kuatia_core::Plan,
     ) -> Result<Receipt, LedgerError> {
-        self.store
-            .finalize_postings(&plan.postings_to_deactivate, &plan.postings_to_create)
-            .await?;
-
         let receipt = Receipt {
             transfer_id: plan.transfer_id,
         };
 
-        self.store
-            .store_transfer(EnvelopeRecord {
-                envelope: envelope.clone(),
-                receipt: receipt.clone(),
-                created_at: now_millis()?,
-            })
-            .await?;
+        // Raw path: consumed postings are Active (never reserved), so pass
+        // `reservation: None`. Postings, transfer record, account index, and
+        // event commit atomically.
+        let events = [LedgerEvent {
+            seq: 0,
+            timestamp: now_millis()?,
+            kind: LedgerEventKind::TransferCommitted {
+                transfer_id: receipt.transfer_id,
+            },
+        }];
 
-        let _ = self
-            .store
-            .append_event(&LedgerEvent {
-                seq: 0,
-                timestamp: now_millis()?,
-                kind: LedgerEventKind::TransferCommitted {
-                    transfer_id: receipt.transfer_id,
+        self.store
+            .commit_transfer(CommitRequest {
+                deactivate: &plan.postings_to_deactivate,
+                create: &plan.postings_to_create,
+                cas_guards: &plan.cas_guards,
+                reservation: None,
+                record: EnvelopeRecord {
+                    envelope: envelope.clone(),
+                    receipt: receipt.clone(),
+                    created_at: now_millis()?,
                 },
+                events: &events,
             })
-            .await;
+            .await?;
 
         Ok(receipt)
     }
@@ -195,24 +211,58 @@ impl Ledger {
                 .store
                 .get_postings_by_account(account, Some(asset), Some(PostingStatus::Active))
                 .await?;
-            let selected = select_postings(&available, *asset, *net_debit)?;
-
-            let consumed_sum = Cent::checked_sum(
-                available
-                    .iter()
-                    .filter(|p| selected.contains(&p.id))
-                    .map(|p| p.value),
+            let total_positive = Cent::checked_sum(
+                available.iter().filter(|p| p.value.is_positive()).map(|p| p.value),
             )?;
-            let change = consumed_sum.checked_sub(*net_debit)?;
-
-            consumes.extend_from_slice(&selected);
-            if change.is_positive() {
-                creates.push(NewPosting {
-                    owner: *account,
-                    asset: *asset,
-                    value: change,
-                    payer: None,
-                });
+
+            if total_positive >= *net_debit {
+                // Enough positive postings: select a subset and compute change.
+                let selected = select_postings(&available, *asset, *net_debit)?;
+                let consumed_sum = Cent::checked_sum(
+                    available
+                        .iter()
+                        .filter(|p| selected.contains(&p.id))
+                        .map(|p| p.value),
+                )?;
+                let change = consumed_sum.checked_sub(*net_debit)?;
+
+                consumes.extend_from_slice(&selected);
+                if change.is_positive() {
+                    creates.push(NewPosting {
+                        owner: *account,
+                        asset: *asset,
+                        value: change,
+                        payer: None,
+                    });
+                }
+            } else {
+                // Not enough positive postings. Overdraft accounts cover the
+                // shortfall with a negative posting (an offset position); the
+                // floor is enforced later in validation. Any other policy fails.
+                let policy = self.store.get_account(account).await?.policy;
+                match policy {
+                    AccountPolicy::CappedOverdraft { .. } | AccountPolicy::UncappedOverdraft => {
+                        let positives: Vec<PostingId> = available
+                            .iter()
+                            .filter(|p| p.value.is_positive())
+                            .map(|p| p.id)
+                            .collect();
+                        consumes.extend_from_slice(&positives);
+                        let shortfall = net_debit.checked_sub(total_positive)?;
+                        creates.push(NewPosting {
+                            owner: *account,
+                            asset: *asset,
+                            value: shortfall.checked_neg()?,
+                            payer: None,
+                        });
+                    }
+                    _ => {
+                        return Err(LedgerError::Selection(SelectionError::InsufficientFunds {
+                            available: total_positive,
+                            requested: *net_debit,
+                        }));
+                    }
+                }
             }
         }
 
@@ -600,4 +650,6 @@ pub struct LoadedState {
     pub accounts: HashMap<AccountId, kuatia_core::Account>,
     /// Current balances for all referenced (account, asset) pairs.
     pub balances: HashMap<(AccountId, AssetId), Cent>,
+    /// The book gating this transfer, if one is loaded (`None` = unrestricted default).
+    pub book: Option<Book>,
 }

+ 32 - 24
crates/kuatia/src/saga.rs

@@ -30,14 +30,14 @@ use serde::{Deserialize, Serialize};
 use tracing::Instrument;
 
 use kuatia_core::{
-    AccountId, AssetId, Cent, Envelope, Plan, PlanInput, PostingId, Receipt, Transfer,
-    TransferBuilder, validate_and_plan,
+    AccountId, AssetId, Cent, Envelope, Plan, PlanInput, PostingId, Receipt, ReservationId,
+    Transfer, TransferBuilder, validate_and_plan,
 };
 
 use crate::error::LedgerError;
 use crate::ledger::{Ledger, now_millis};
 use kuatia_storage::events::{LedgerEvent, LedgerEventKind};
-use kuatia_storage::store::EnvelopeRecord;
+use kuatia_storage::store::{CommitRequest, EnvelopeRecord};
 
 // ---------------------------------------------------------------------------
 // Saga error -- serializable + cloneable wrapper
@@ -85,6 +85,9 @@ pub struct LedgerCtx {
     pub plan: Option<Plan>,
     /// Resolved envelope produced by the resolve step.
     pub envelope: Option<Envelope>,
+    /// Reservation owner token for this saga's reserved postings. Serialized so
+    /// it survives pause/recovery, keeping ownership stable across restarts.
+    pub reservation: ReservationId,
     #[serde(skip)]
     ledger: Option<Arc<Ledger>>,
 }
@@ -109,6 +112,7 @@ impl LedgerCtx {
             reserved_postings: Vec::new(),
             plan: None,
             envelope: None,
+            reservation: ReservationId::default(),
             ledger: Some(ledger),
         }
     }
@@ -207,7 +211,7 @@ impl Step<LedgerCtx, SagaError> for ReservePostingsStep {
 
             ctx.ledger()?
                 .store()
-                .reserve_postings(&posting_ids)
+                .reserve_postings(&posting_ids, ctx.reservation)
                 .await
                 .map_err(|e| SagaError::from(LedgerError::Store(e)))?;
             ctx.reserved_postings.extend_from_slice(&posting_ids);
@@ -223,7 +227,7 @@ impl Step<LedgerCtx, SagaError> for ReservePostingsStep {
     ) -> Result<CompensationOutcome, SagaError> {
         ctx.ledger()?
             .store()
-            .release_postings(&ctx.reserved_postings)
+            .release_postings(&ctx.reserved_postings, ctx.reservation)
             .await
             .map_err(|e| SagaError::from(LedgerError::Store(e)))?;
         ctx.reserved_postings.clear();
@@ -270,6 +274,7 @@ impl Step<LedgerCtx, SagaError> for ValidateTransferStep {
                 consumed_postings: &loaded.consumed_postings,
                 accounts: &loaded.accounts,
                 balances: &loaded.balances,
+                book: loaded.book.as_ref(),
             };
 
             let plan =
@@ -322,33 +327,36 @@ impl Step<LedgerCtx, SagaError> for FinalizeTransferStep {
 
             let store = ctx.ledger()?.store();
 
-            store
-                .finalize_postings(&plan.postings_to_deactivate, &plan.postings_to_create)
-                .await
-                .map_err(|e| SagaError::from(LedgerError::Store(e)))?;
-
             let receipt = Receipt {
                 transfer_id: plan.transfer_id,
             };
+
+            // Saga path: our postings are PendingInactive under `ctx.reservation`.
+            // Postings, transfer record, account index, and event commit atomically.
+            let events = [LedgerEvent {
+                seq: 0,
+                timestamp: now_millis().map_err(SagaError::from)?,
+                kind: LedgerEventKind::TransferCommitted {
+                    transfer_id: receipt.transfer_id,
+                },
+            }];
+
             store
-                .store_transfer(EnvelopeRecord {
-                    envelope: envelope.clone(),
-                    receipt: receipt.clone(),
-                    created_at: now_millis().map_err(SagaError::from)?,
+                .commit_transfer(CommitRequest {
+                    deactivate: &plan.postings_to_deactivate,
+                    create: &plan.postings_to_create,
+                    cas_guards: &plan.cas_guards,
+                    reservation: Some(ctx.reservation),
+                    record: EnvelopeRecord {
+                        envelope: envelope.clone(),
+                        receipt: receipt.clone(),
+                        created_at: now_millis().map_err(SagaError::from)?,
+                    },
+                    events: &events,
                 })
                 .await
                 .map_err(|e| SagaError::from(LedgerError::Store(e)))?;
 
-            let _ = store
-                .append_event(&LedgerEvent {
-                    seq: 0,
-                    timestamp: now_millis().map_err(SagaError::from)?,
-                    kind: LedgerEventKind::TransferCommitted {
-                        transfer_id: receipt.transfer_id,
-                    },
-                })
-                .await;
-
             ctx.receipts.push(receipt);
             ctx.reserved_postings.clear();
             Ok(StepOutcome::Continue)

+ 126 - 0
crates/kuatia/tests/integration.rs

@@ -713,3 +713,129 @@ async fn account_hash_changes_with_version() {
     let h2 = kuatia_core::account_hash(&acc);
     assert_ne!(h1, h2);
 }
+
+// ---------------------------------------------------------------------------
+// Overdraft via negative postings
+// ---------------------------------------------------------------------------
+
+#[tokio::test]
+async fn capped_overdraft_creates_negative_posting() {
+    let store = InMemoryStore::new();
+    let ledger = Arc::new(Ledger::new(store));
+    for (id, policy) in [
+        (10, AccountPolicy::CappedOverdraft { floor: Cent::from(-200) }),
+        (2, AccountPolicy::NoOverdraft),
+        (99, AccountPolicy::ExternalAccount),
+    ] {
+        ledger.store().create_account(make_account(id, policy)).await.unwrap();
+    }
+
+    // Fund account 10 with 50, then pay 100 — overdraft covers the 50 shortfall.
+    deposit(&ledger, account(10), usd(), Cent::from(50), external()).await;
+    pay(&ledger, account(10), account(2), usd(), Cent::from(100)).await;
+
+    assert_eq!(ledger.balance(&account(10), &usd()).await.unwrap(), Cent::from(-50));
+    assert_eq!(ledger.balance(&account(2), &usd()).await.unwrap(), Cent::from(100));
+
+    // A negative posting now backs the overdraft.
+    let postings = ledger
+        .store()
+        .get_postings_by_account(&account(10), Some(&usd()), Some(PostingStatus::Active))
+        .await
+        .unwrap();
+    assert!(postings.iter().any(|p| p.value == Cent::from(-50)));
+}
+
+#[tokio::test]
+async fn capped_overdraft_respects_floor() {
+    let store = InMemoryStore::new();
+    let ledger = Arc::new(Ledger::new(store));
+    for (id, policy) in [
+        (10, AccountPolicy::CappedOverdraft { floor: Cent::from(-80) }),
+        (2, AccountPolicy::NoOverdraft),
+        (99, AccountPolicy::ExternalAccount),
+    ] {
+        ledger.store().create_account(make_account(id, policy)).await.unwrap();
+    }
+
+    // Paying 100 from an empty account would project to -100, below the -80 floor.
+    let transfer = TransferBuilder::new()
+        .pay(account(10), account(2), usd(), Cent::from(100))
+        .build();
+    assert!(ledger.commit(transfer).await.is_err());
+    assert_eq!(ledger.balance(&account(10), &usd()).await.unwrap(), Cent::ZERO);
+}
+
+#[tokio::test]
+async fn uncapped_overdraft_allows_arbitrary_negative() {
+    let store = InMemoryStore::new();
+    let ledger = Arc::new(Ledger::new(store));
+    for (id, policy) in [
+        (10, AccountPolicy::UncappedOverdraft),
+        (2, AccountPolicy::NoOverdraft),
+        (99, AccountPolicy::ExternalAccount),
+    ] {
+        ledger.store().create_account(make_account(id, policy)).await.unwrap();
+    }
+
+    pay(&ledger, account(10), account(2), usd(), Cent::from(1_000_000)).await;
+    assert_eq!(
+        ledger.balance(&account(10), &usd()).await.unwrap(),
+        Cent::from(-1_000_000)
+    );
+}
+
+// ---------------------------------------------------------------------------
+// Book policy enforcement
+// ---------------------------------------------------------------------------
+
+#[tokio::test]
+async fn book_policy_rejects_disallowed_asset() {
+    let ledger = setup_ledger().await;
+    // Book 5 permits only EUR.
+    let book = BookBuilder::new("eur-only")
+        .id(BookId::new(5))
+        .allow_asset(eur())
+        .build();
+    ledger.store().create_book(book).await.unwrap();
+
+    deposit(&ledger, account(1), usd(), Cent::from(100), external()).await;
+
+    // Paying USD under a EUR-only book is rejected, balance unchanged.
+    let transfer = TransferBuilder::new()
+        .book(BookId::new(5))
+        .pay(account(1), account(2), usd(), Cent::from(50))
+        .build();
+    assert!(ledger.commit(transfer).await.is_err());
+    assert_eq!(ledger.balance(&account(1), &usd()).await.unwrap(), Cent::from(100));
+}
+
+#[tokio::test]
+async fn transfer_in_missing_named_book_is_rejected() {
+    let ledger = setup_ledger().await;
+    deposit(&ledger, account(1), usd(), Cent::from(100), external()).await;
+
+    let transfer = TransferBuilder::new()
+        .book(BookId::new(404))
+        .pay(account(1), account(2), usd(), Cent::from(50))
+        .build();
+    assert!(ledger.commit(transfer).await.is_err());
+    assert_eq!(ledger.balance(&account(1), &usd()).await.unwrap(), Cent::from(100));
+}
+
+// ---------------------------------------------------------------------------
+// Content-addressed determinism
+// ---------------------------------------------------------------------------
+
+#[tokio::test]
+async fn identical_transfers_share_envelope_id() {
+    // Two independently-built default-book transfers must hash identically.
+    let a = TransferBuilder::new()
+        .pay(account(1), account(2), usd(), Cent::from(10))
+        .build();
+    let b = TransferBuilder::new()
+        .pay(account(1), account(2), usd(), Cent::from(10))
+        .build();
+    assert_eq!(a.book, b.book, "default book must be deterministic");
+    assert_eq!(a.book, DEFAULT_BOOK);
+}

+ 6 - 6
doc/accounts.md

@@ -2,7 +2,7 @@
 
 ## Overview
 
-An account is a versioned entity that owns postings. Balance is never stored — it is always computed as the sum of active postings for a given (account, asset) pair.
+An account is a versioned entity that owns postings. Balance is never stored — it is always computed from postings for a given (account, asset) pair. The **ledger balance** sums non-`Inactive` postings (`Active + PendingInactive`); the **available balance** sums only `Active` postings (excluding those reserved for an in-flight transfer). `balance()` returns the ledger balance.
 
 ## Structure
 
@@ -23,14 +23,14 @@ Each account has a policy that controls what balance constraints apply:
 | Policy | Balance floor | Negative postings | CAS guard |
 |--------|--------------|-------------------|-----------|
 | `NoOverdraft` | `>= 0` | No | No |
-| `CappedOverdraft { floor }` | `>= floor` | No | Yes |
-| `UncappedOverdraft` | None | No | No |
+| `CappedOverdraft { floor }` | `>= floor` | Yes (down to floor) | Yes |
+| `UncappedOverdraft` | None | Yes (unbounded) | No |
 | `SystemAccount` | None | Yes | No |
 | `ExternalAccount` | None | Yes | No |
 
-Only `SystemAccount` and `ExternalAccount` may hold negative postings (offset positions). Validation rejects any transfer that would create a negative posting on another account type.
+An overdraft is represented as a **negative posting** (an offset position) assigned to the account to cover a shortfall. When an account's positive postings are insufficient for a debit, the resolve step consumes them all and creates a negative posting for the remainder. `NoOverdraft` accounts forbid this; validation rejects any transfer that would create a negative posting on a `NoOverdraft` account. `CappedOverdraft`'s floor bounds how negative the balance may go; `UncappedOverdraft`, `SystemAccount`, and `ExternalAccount` are unbounded.
 
-`CappedOverdraft` accounts emit CAS (Compare-And-Swap) guards during validation to prevent write-skew — two concurrent transfers could each pass validation independently but together push the balance below the floor.
+`CappedOverdraft` accounts emit CAS (Compare-And-Swap) guards during validation to prevent write-skew — two concurrent transfers could each pass validation independently but together push the balance below the floor. The guards are enforced atomically inside `commit_transfer` (the commit aborts with a retryable conflict if a guarded balance changed since validation).
 
 ## Lifecycle
 
@@ -101,4 +101,4 @@ Boundary accounts representing the outside world (banks, payment processors). Th
 
 ### Credit accounts (`CappedOverdraft`)
 
-Accounts with a negative floor (e.g. credit lines). The floor is the maximum allowed overdraft. Write-skew prevention via CAS guards ensures concurrent transfers respect the floor.
+Accounts with a negative floor (e.g. credit lines). The floor is the maximum allowed overdraft. When the account's positive postings are insufficient for a debit, a negative posting is created to cover the shortfall, down to the floor. Write-skew prevention via CAS guards (enforced inside `commit_transfer`) ensures concurrent transfers respect the floor.

+ 21 - 9
doc/architecture.md

@@ -4,7 +4,7 @@
 
 Value is stored as **postings** — signed amounts of a single asset owned by exactly one account. A positive posting is value controlled by the account; a negative posting is an offset position (issuance, external flow, or system balancing).
 
-Account balance = sum of active postings for that (account, asset) pair. There is no mutable balance field to drift out of sync.
+Account balance = sum of non-`Inactive` postings (`Active + PendingInactive`) for that (account, asset) pair. There is no mutable balance field to drift out of sync.
 
 Consumed postings are marked inactive but never deleted, preserving a full audit trail.
 
@@ -38,7 +38,7 @@ This separation ensures the auditable heart of the system is fully deterministic
 
 ## Store Sub-Trait Architecture
 
-The `Store` trait is a composite of five focused sub-traits, each responsible for a single domain:
+The `Store` trait is a composite of seven focused sub-traits, each responsible for a single domain:
 
 ```mermaid
 classDiagram
@@ -53,8 +53,8 @@ classDiagram
     class PostingStore {
         +get_postings(ids)
         +get_postings_by_account(account, asset?, status?)
-        +reserve_postings(ids)
-        +release_postings(ids)
+        +reserve_postings(ids, reservation)
+        +release_postings(ids, reservation)
         +finalize_postings(deactivate, create)
     }
     class TransferStore {
@@ -72,6 +72,14 @@ classDiagram
         +append_event(event)
         +get_events_since(after_seq, limit)
     }
+    class BookStore {
+        +create_book(book)
+        +get_book(id)
+        +list_books()
+    }
+    class CommitStore {
+        +commit_transfer(req)
+    }
     class Store {
         <<composite>>
     }
@@ -80,8 +88,12 @@ classDiagram
     Store --|> TransferStore
     Store --|> SagaStore
     Store --|> EventStore
+    Store --|> BookStore
+    Store --|> CommitStore
 ```
 
+`CommitStore::commit_transfer` is the single atomic commit boundary — it applies posting deactivations/creations, the transfer record, the both-sided account index, and events in one transaction, enforcing `CappedOverdraft` CAS guards and reservation ownership.
+
 The store only persists and reads — all domain logic (balance computation, validation, policy enforcement) lives in the Ledger and `kuatia-core`.
 
 ## Saga Commit Pipeline
@@ -164,7 +176,7 @@ All domain types implement deterministic binary serialization (`ToBytes` trait)
 
 ## Append-Only Account Versioning
 
-Accounts are never modified in place. Each mutation (freeze, unfreeze, close, balance change) appends a new snapshot with an incremented `version` field (starts at 1 on creation).
+Accounts are never modified in place. Each account mutation (freeze, unfreeze, close, or a policy/flags change) appends a new snapshot with an incremented `version` field (starts at 1 on creation). Note that transfers do **not** bump account versions — balances are derived from postings, not stored on the account.
 
 The store enforces that each new version is exactly `current + 1`, preventing gaps or overwrites. The full version history is queryable via `account_history()`.
 
@@ -189,18 +201,18 @@ Each account has a policy controlling its balance floor and whether it may hold
 | Policy | Balance floor | Negative postings | CAS guard |
 |--------|--------------|-------------------|-----------|
 | `NoOverdraft` | `>= 0` | No | No |
-| `CappedOverdraft { floor }` | `>= floor` | No | Yes |
-| `UncappedOverdraft` | None | No | No |
+| `CappedOverdraft { floor }` | `>= floor` | Yes (down to floor) | Yes |
+| `UncappedOverdraft` | None | Yes (unbounded) | No |
 | `SystemAccount` | None | Yes | No |
 | `ExternalAccount` | None | Yes | No |
 
-Only `SystemAccount` and `ExternalAccount` may receive negative postings (offset positions). Validation rejects any transfer that would create a negative posting on another account type.
+An overdraft is a **negative posting** assigned to the account to cover a shortfall. Only `NoOverdraft` forbids negative postings; validation rejects a negative posting on a `NoOverdraft` account. `CappedOverdraft`'s floor (enforced in validation, with concurrency protected by CAS guards) bounds the negative balance; the other policies are unbounded.
 
 ## CAS (Compare-And-Swap) Guards for CappedOverdraft
 
 `CappedOverdraft` accounts have a balance floor that is not backed by the UTXO model alone — two concurrent transfers could each pass validation but together push the balance below the floor (write-skew).
 
-The validation phase emits `cas_guards: Vec<(AccountId, AssetId, Cent)>` for these accounts. The saga pipeline handles isolation via the reserve step (Active → PendingInactive), which prevents concurrent transfers from consuming the same postings.
+The validation phase emits `cas_guards: Vec<(AccountId, AssetId, Cent)>` for these accounts. They are enforced atomically inside `commit_transfer`: before mutating any state it recomputes each guarded balance and aborts with a retryable `Conflict` if it changed since validation. The saga pipeline additionally isolates the consumed postings via the reserve step (Active → PendingInactive), stamping each reserved posting with a `ReservationId` so only the reserving saga can finalize or release it.
 
 Other policies do not need CAS guards: `NoOverdraft` is fully UTXO-backed (you can only spend postings you own), and unconstrained policies have no floor to violate.
 

+ 23 - 13
doc/crates.md

@@ -26,7 +26,8 @@ Pure, sans-IO (Input/Output) decision logic. No async runtime, near-zero depende
 | `OverflowError` | Returned when a `Cent` operation would overflow or underflow |
 | `PostingStatus` | Posting lifecycle: `Active`, `PendingInactive`, `Inactive` |
 | `Amount` | Parser/formatter for decimal strings. Not stored — use at API boundaries only |
-| `Posting` | Signed amount of one asset owned by one account. Has `status: PostingStatus` |
+| `Posting` | Signed amount of one asset owned by one account. Has `status: PostingStatus` and `reservation: Option<ReservationId>` (owner token while `PendingInactive`) |
+| `ReservationId` | Owner token stamped on a reserved posting so only the reserving saga may finalize/release it |
 | `NewPosting` | Posting to be created (no id yet — assigned during validation) |
 | `Transfer` | Atomic unit: consumes postings + creates postings + metadata |
 | `EnvelopeBuilder` | Fluent builder for `Transfer` construction |
@@ -36,7 +37,7 @@ Pure, sans-IO (Input/Output) decision logic. No async runtime, near-zero depende
 | `UserData` | Fixed 28 bytes (u128 + u64 + u32) for correlation IDs, external refs |
 | `Metadata` | `BTreeMap<String, Vec<u8>>` for free-form key-value data |
 | `Receipt` | Confirmation of a committed transfer (contains `transfer_id`) |
-| `AutoId` | Snowflake-inspired i64 ID generator — `[0][40-bit ms][23-bit CRC32 or counter]`. Lives in `kuatia-types::autoid` |
+| `AutoId` | Snowflake-inspired i64 ID generator — `[0][40-bit ms][23-bit CRC32 or counter]`. The ms field counts from `KUATIA_EPOCH_MS` (2026-01-01T00:00:00Z), giving ~34.8 years forward. Lives in `kuatia-types::autoid` |
 
 ### Validation Invariants
 
@@ -49,9 +50,10 @@ graph TD
     C --> D[4. Posting active or reserved]
     D --> E[5. Account existence & lifecycle]
     E --> F[6. Snapshot pinning]
-    F --> G[7. Per-asset conservation]
-    G --> H[8. Negative posting restriction]
-    H --> J[9. Policy enforcement]
+    F --> BP[7. Book policy]
+    BP --> G[8. Per-asset conservation]
+    G --> H[9. Negative posting restriction]
+    H --> J[10. Policy enforcement]
     J --> I[Plan]
     style I fill:#e8f5e9
 ```
@@ -62,9 +64,10 @@ graph TD
 4. **Posting active or reserved** — consumed postings must be `Active` or `PendingInactive` (prevents double-spend)
 5. **Account existence & lifecycle** — all referenced accounts exist, not frozen, not closed
 6. **Snapshot pinning** — account snapshots (if provided) must match current state
-7. **Per-asset conservation** — `sum(consumed) == sum(created)` for each asset
-8. **Negative posting restriction** — negative postings only allowed on `SystemAccount` or `ExternalAccount`
-9. **Policy enforcement** — projected balance satisfies account's floor
+7. **Book policy** — when a book is loaded, referenced assets/accounts/flags must be allowed by the book
+8. **Per-asset conservation** — `sum(consumed) == sum(created)` for each asset
+9. **Negative posting restriction** — negative postings forbidden only on `NoOverdraft` (allowed on overdraft/system/external)
+10. **Policy enforcement** — projected balance satisfies account's floor
 
 Output is a `Plan` containing `transfer_id`, `postings_to_deactivate`, `postings_to_create`, and `cas_guards` (Compare-And-Swap guards for concurrency safety).
 
@@ -79,7 +82,7 @@ Async resource layer. Depends on `kuatia-core`, `tokio`, `async-trait`, `serde`,
 | Module | Purpose |
 |--------|---------|
 | `kuatia` | `Ledger` — primary API (non-generic, uses `Arc<dyn Store>`), saga commit pipeline, intent layer |
-| `store` | `Store` composite trait + sub-traits (`AccountStore`, `PostingStore`, `TransferStore`, `SagaStore`) |
+| `store` | `Store` composite trait + sub-traits (`AccountStore`, `PostingStore`, `TransferStore`, `SagaStore`, `EventStore`, `BookStore`, `CommitStore`) |
 | `error` | `StoreError`, `LedgerError` — unified error hierarchy |
 | `mem_store` | `InMemoryStore` — in-memory `Store` implementation for tests |
 | `saga` | Pipeline steps (reserve, validate, finalize) + high-level legend step adapters |
@@ -94,7 +97,7 @@ Driven by a `TransferSaga` defined via `legend!` — four steps with automatic r
 graph LR
     A[resolve] -->|Envelope| B[reserve_postings]
     B -->|batch Active→PendingInactive| C[validate_and_plan]
-    C -->|Plan| D[finalize + store + emit event]
+    C -->|Plan| D[commit_transfer atomically]
     D --> E[Receipt]
     style E fill:#e8f5e9
 ```
@@ -159,7 +162,7 @@ Transfers are built via `TransferBuilder` and committed with `ledger.commit(tran
 
 ### Store Trait
 
-The `Store` trait is a composite of five focused sub-traits:
+The `Store` trait is a composite of seven focused sub-traits:
 
 ```mermaid
 graph TB
@@ -168,13 +171,17 @@ graph TB
     Store --> TransferStore
     Store --> SagaStore
     Store --> EventStore
+    Store --> BookStore
+    Store --> CommitStore
 ```
 
 - **`AccountStore`**: `get_account`, `get_accounts`, `create_account`, `append_account_version`, `get_account_history`, `list_accounts`
-- **`PostingStore`**: `get_postings`, `get_postings_by_account(account, asset?, status?)`, `query_postings(query)`, `reserve_postings`, `release_postings`, `finalize_postings`
+- **`PostingStore`**: `get_postings`, `get_postings_by_account(account, asset?, status?)`, `query_postings(query)`, `reserve_postings(ids, reservation)`, `release_postings(ids, reservation)`, `finalize_postings`
 - **`TransferStore`**: `get_transfer`, `store_transfer`, `get_transfers_for_account`, `query_transfers`
 - **`EventStore`**: `append_event`, `get_events_since`
 - **`SagaStore`**: `save_saga`, `list_pending_sagas`, `delete_saga`
+- **`BookStore`**: `create_book`, `get_book`, `list_books`
+- **`CommitStore`**: `commit_transfer(req)` — the single atomic commit boundary. It applies posting deactivations/creations, the transfer record, the both-sided account index, and events in one critical section, enforcing `CappedOverdraft` CAS guards and reservation ownership. `reserve_postings`/`release_postings`/`finalize_postings` remain as lower-level primitives; `commit_transfer` is the production commit path.
 
 #### Batch posting operations
 
@@ -212,6 +219,7 @@ LedgerError
 ├── AccountNotFound
 ├── AccountNotEmpty              // can't close with active postings
 ├── AccountAlreadyClosed
+├── BookNotFound                 // transfer named a book that does not exist
 ├── Overflow                     // monetary arithmetic overflow
 └── CompensationFailed           // saga compensation failed (original + compensation errors)
 ```
@@ -223,7 +231,9 @@ StoreError
 ├── VersionConflict { account, expected, actual }
 ├── Internal(String)
 ├── PostingNotActive(PostingId)   // reserve_postings: posting not Active
-└── PostingInactive(PostingId)    // release_postings: posting is void
+├── PostingInactive(PostingId)    // release_postings: posting is void
+├── Conflict { account, asset }   // commit_transfer: CAS guard balance changed (retryable)
+└── ReservationMismatch(PostingId) // posting reserved by a different saga
 ```
 
 ### Saga Steps

+ 3 - 3
doc/glossary.md

@@ -9,9 +9,9 @@
 A signed amount of one asset owned by one account. The fundamental unit of value in the ledger. Postings are immutable once created — consumed postings are marked `Inactive` but never deleted.
 
 - **Positive posting**: value controlled by the account.
-- **Negative posting**: an offset position — only allowed for accounts whose policy permits issuance, external flow, or system balancing (`SystemAccount`, `ExternalAccount`).
+- **Negative posting**: an offset position — allowed on any policy except `NoOverdraft`. It represents issuance, external flow, system balancing (`SystemAccount`, `ExternalAccount`), or an overdraft (`CappedOverdraft`/`UncappedOverdraft`).
 
-Lifecycle: `Active` → `PendingInactive` (reserved by saga) → `Inactive` (consumed).
+Lifecycle: `Active` → `PendingInactive` (reserved by a saga, stamped with its `ReservationId`) → `Inactive` (consumed). **Ledger balance** sums `Active + PendingInactive` postings; **available balance** sums only `Active` (postings reserved for an in-flight transfer are not available to spend).
 
 ### Account
 
@@ -55,7 +55,7 @@ For every transfer, for each asset: `sum(consumed) == sum(created)`. This is the
 
 ### AutoId
 
-Snowflake-inspired `i64` identifier: `[0 sign bit][40-bit ms timestamp][23-bit counter or CRC32]`. Generated in Rust — the database never assigns IDs.
+Snowflake-inspired `i64` identifier: `[0 sign bit][40-bit ms timestamp][23-bit counter or CRC32]`. The timestamp counts milliseconds since `KUATIA_EPOCH_MS` (2026-01-01T00:00:00Z), giving ~34.8 years of range going forward. Generated in Rust — the database never assigns IDs.
 
 ---
 

+ 10 - 6
doc/transfers.md

@@ -109,9 +109,10 @@ For each movement:
 
 For each `(account, asset)` pair where net debit > 0:
 1. Query active postings for that account and asset
-2. Run greedy largest-first selection to cover the net debit
-3. Compute change = selected sum - net debit
-4. If change > 0, create a change posting returning the remainder to the account
+2. If positive postings cover the net debit: run greedy largest-first selection, compute change = selected sum − net debit, and (if change > 0) create a change posting returning the remainder to the account.
+3. If positive postings are **insufficient**:
+   - For `CappedOverdraft` / `UncappedOverdraft` accounts: consume all positive postings and create a **negative posting** for the shortfall (`net_debit − total_positive`). The `CappedOverdraft` floor is enforced later in validation.
+   - For any other policy: fail with `InsufficientFunds`.
 
 Pairs with net debit <= 0 (e.g. the external account in a deposit) are skipped — no posting selection needed.
 
@@ -187,8 +188,11 @@ Every envelope passes through `validate_and_plan()` before being applied. The va
 4. All consumed postings are Active or PendingInactive
 5. All referenced accounts exist, not frozen, not closed
 6. Account snapshot pinning (if provided)
-7. Per-asset conservation: `sum(consumed) == sum(created)`
-8. Negative postings only on SystemAccount or ExternalAccount
-9. Policy enforcement: projected balance satisfies account floor
+7. Book policy (if a book is loaded): referenced assets/accounts/flags allowed by the book
+8. Per-asset conservation: `sum(consumed) == sum(created)`
+9. Negative postings forbidden only on `NoOverdraft` accounts (allowed on overdraft/system/external)
+10. Policy enforcement: projected balance satisfies account floor
+
+After validation, the effects are applied through a single atomic `commit_transfer` (postings, transfer record, account index, and events commit together or not at all), which also enforces the `CappedOverdraft` CAS guards.
 
 See [architecture.md](architecture.md) for details on each check.