瀏覽代碼

Add inflight holds: authorize, confirm, partially confirm, void

Callers need to reserve funds for a multi-leg trade without settling it, then
confirm it fully or in parts, or void it. The ledger is append-only with
derived balances, so a durable hold has to be real committed state, not a lock
or a transient reservation.

Model an inflight transaction as the ordinary trade with every destination
rewritten to a fresh per-destination NoOverdraft holding account flagged
INFLIGHT. Committing that rewritten transfer parks the funds. Confirm and void
are ordinary commits from the holds to their destinations or back to the
funders recorded in the authorize transfer's leg table. State is derived from
balances and the metadata role tags on the settling transfers, so nothing new
is persisted: no store, no migration, and resolve is unchanged.
Over-confirmation is blocked by the NoOverdraft hold, and concurrent
confirmations serialize on the shared holding posting.

The inflight facts live in an inflight metadata namespace on the holding
accounts and the transfers, keeping the whole lifecycle self-describing.
Cesar Rodas 11 小時之前
父節點
當前提交
64ed89456c

+ 4 - 1
crates/kuatia-types/src/lib.rs

@@ -587,7 +587,10 @@ bitflags::bitflags! {
         const FROZEN = 1 << 0;
         /// Terminal — no further activity.
         const CLOSED = 1 << 1;
-        // Bits 2–7: reserved for future system flags.
+        /// Holding account for an inflight (authorize/confirm/void) transaction.
+        /// Parks funds between authorize and settlement; closed once drained.
+        const INFLIGHT = 1 << 2;
+        // Bits 3–7: reserved for future system flags.
         // Bits 8–31: user-defined.
         /// User-defined flag 0.
         const USER_0 = 1 << 8;

+ 33 - 1
crates/kuatia/src/error.rs

@@ -4,7 +4,8 @@
 //! and from storage, so callers get a single error type from every API.
 
 use kuatia_core::{
-    AccountId, BookId, EnvelopeId, OverflowError, PostingId, SelectionError, ValidationError,
+    AccountId, AssetId, BookId, EnvelopeId, OverflowError, PostingId, SelectionError,
+    ValidationError,
 };
 use kuatia_storage::error::StoreError;
 
@@ -29,6 +30,23 @@ pub enum LedgerError {
     AccountAlreadyClosed(AccountId),
     /// A transfer named a book that does not exist.
     BookNotFound(BookId),
+    /// The referenced inflight transaction does not exist (no authorize record).
+    InflightNotFound(EnvelopeId),
+    /// The referenced transfer is not an inflight authorize, or its metadata is
+    /// malformed.
+    NotInflightTransaction(EnvelopeId),
+    /// The destination already has an open inflight hold; only one is allowed at
+    /// a time per account.
+    InflightAlreadyOpen(AccountId),
+    /// The inflight transaction has no leg matching this destination and asset.
+    InflightLegNotFound {
+        /// The destination account with no matching leg.
+        destination: AccountId,
+        /// The asset with no matching leg.
+        asset: AssetId,
+    },
+    /// An inflight movement must move between two distinct accounts.
+    InflightSelfMovement(AccountId),
     /// Monetary arithmetic overflow.
     Overflow,
     /// A saga step failed and its compensation also failed.
@@ -52,6 +70,20 @@ impl std::fmt::Display for LedgerError {
             Self::AccountNotEmpty(id) => write!(f, "account not empty: {id:?}"),
             Self::AccountAlreadyClosed(id) => write!(f, "account already closed: {id:?}"),
             Self::BookNotFound(id) => write!(f, "book not found: {id:?}"),
+            Self::InflightNotFound(id) => write!(f, "inflight transaction not found: {id:?}"),
+            Self::NotInflightTransaction(id) => {
+                write!(f, "not an inflight authorize transaction: {id:?}")
+            }
+            Self::InflightAlreadyOpen(id) => {
+                write!(f, "account already has an open inflight hold: {id:?}")
+            }
+            Self::InflightLegNotFound { destination, asset } => write!(
+                f,
+                "inflight leg not found for destination {destination:?} asset {asset:?}"
+            ),
+            Self::InflightSelfMovement(id) => {
+                write!(f, "inflight movement must have distinct from/to: {id:?}")
+            }
             Self::Overflow => write!(f, "monetary amount overflow"),
             Self::CompensationFailed {
                 original,

+ 598 - 0
crates/kuatia/src/inflight.rs

@@ -0,0 +1,598 @@
+//! Inflight holds: authorize funds now, confirm (fully or partially) or void
+//! later.
+//!
+//! An inflight transaction is an ordinary trade whose every destination is
+//! rewritten to a fresh per-destination holding account (`NoOverdraft`, flagged
+//! [`AccountFlags::INFLIGHT`]). Committing that rewritten transfer parks the
+//! funds. Confirm and void are ordinary commits that move a hold's balance to
+//! its destination or back to its funder. Nothing new is stored: the authorize
+//! transfer's metadata carries the leg table, and every artifact is tagged in an
+//! `inflight.` metadata namespace so the lifecycle is read, not inferred.
+//!
+//! See `doc/adr/0004-inflight-holds-via-holding-accounts.md`.
+
+use std::collections::{BTreeMap, BTreeSet};
+use std::sync::Arc;
+
+use kuatia_core::{
+    Account, AccountFlags, AccountId, AccountPolicy, AssetId, BookId, Cent, EnvelopeId, Metadata,
+    Receipt, SelectionError, Transfer, TransferBuilder,
+};
+use kuatia_storage::store::EnvelopeRecord;
+use kuatia_types::PostingStatus;
+
+use crate::error::LedgerError;
+use crate::ledger::Ledger;
+
+// Metadata keys (all under the `inflight.` namespace).
+const K_ROLE: &str = "inflight.role";
+const K_TX: &str = "inflight.tx";
+const K_DEST: &str = "inflight.destination";
+const K_LEGS: &str = "inflight.legs";
+
+// `inflight.role` values.
+const ROLE_AUTHORIZE: u8 = 0;
+const ROLE_HOLD: u8 = 1;
+const ROLE_CONFIRM: u8 = 2;
+const ROLE_VOID: u8 = 3;
+
+const LEG_BYTES: usize = 8 + 8 + 8 + 4 + 16; // destination, hold, funder, asset, amount
+
+/// One leg of an inflight transaction: an amount of an asset funded by `funder`,
+/// parked in `hold`, destined for `destination`.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct InflightLeg {
+    /// Account the funds settle to on confirm.
+    pub destination: AccountId,
+    /// Per-destination holding account parking the funds.
+    pub hold: AccountId,
+    /// Account that funded this leg (the funds return here on void).
+    pub funder: AccountId,
+    /// Asset being held.
+    pub asset: AssetId,
+    /// Amount authorized for this leg.
+    pub amount: Cent,
+}
+
+/// Result of [`Ledger::authorize`].
+#[derive(Debug, Clone)]
+pub struct Authorization {
+    /// Handle for the inflight transaction: the authorize transfer's id.
+    pub inflight: EnvelopeId,
+    /// Receipt of the authorize commit.
+    pub receipt: Receipt,
+    /// The legs, one per original movement.
+    pub legs: Vec<InflightLeg>,
+}
+
+/// Lifecycle state of an inflight transaction, derived from balances and the
+/// settling transfers.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum InflightState {
+    /// Nothing settled yet; the full authorized amount is still held.
+    Held,
+    /// Some funds settled, some still held.
+    PartiallyConfirmed,
+    /// Fully settled to destinations.
+    Confirmed,
+    /// Fully returned to funders.
+    Voided,
+    /// Fully settled, but a mix of confirmed and voided legs.
+    Mixed,
+}
+
+/// Per-(destination, asset) status line.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct InflightLegStatus {
+    /// Destination account.
+    pub destination: AccountId,
+    /// Holding account.
+    pub hold: AccountId,
+    /// Asset.
+    pub asset: AssetId,
+    /// Amount originally authorized.
+    pub authorized: Cent,
+    /// Amount confirmed to the destination so far.
+    pub confirmed: Cent,
+    /// Amount returned to funders so far.
+    pub voided: Cent,
+    /// Amount still held (`= authorized - confirmed - voided`).
+    pub held: Cent,
+}
+
+/// Derived status of an inflight transaction.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct InflightStatus {
+    /// The inflight handle.
+    pub inflight: EnvelopeId,
+    /// One entry per (destination, asset).
+    pub legs: Vec<InflightLegStatus>,
+    /// Overall state.
+    pub state: InflightState,
+}
+
+// ---------------------------------------------------------------------------
+// Metadata encoding
+// ---------------------------------------------------------------------------
+
+fn encode_legs(legs: &[InflightLeg]) -> Vec<u8> {
+    let mut buf = Vec::with_capacity(4 + legs.len() * LEG_BYTES);
+    buf.extend_from_slice(&(legs.len() as u32).to_be_bytes());
+    for l in legs {
+        buf.extend_from_slice(&l.destination.0.to_be_bytes());
+        buf.extend_from_slice(&l.hold.0.to_be_bytes());
+        buf.extend_from_slice(&l.funder.0.to_be_bytes());
+        buf.extend_from_slice(&l.asset.0.to_be_bytes());
+        buf.extend_from_slice(&l.amount.to_canonical_bytes());
+    }
+    buf
+}
+
+fn malformed(tid: EnvelopeId) -> LedgerError {
+    LedgerError::NotInflightTransaction(tid)
+}
+
+fn read_i64(bytes: &[u8], off: usize, tid: EnvelopeId) -> Result<i64, LedgerError> {
+    let slice: [u8; 8] = bytes
+        .get(off..off + 8)
+        .and_then(|s| s.try_into().ok())
+        .ok_or_else(|| malformed(tid))?;
+    Ok(i64::from_be_bytes(slice))
+}
+
+fn decode_legs(bytes: &[u8], tid: EnvelopeId) -> Result<Vec<InflightLeg>, LedgerError> {
+    let count_slice: [u8; 4] = bytes
+        .get(0..4)
+        .and_then(|s| s.try_into().ok())
+        .ok_or_else(|| malformed(tid))?;
+    let n = u32::from_be_bytes(count_slice) as usize;
+    let mut out = Vec::with_capacity(n);
+    let mut off = 4;
+    for _ in 0..n {
+        let destination = AccountId::new(read_i64(bytes, off, tid)?);
+        let hold = AccountId::new(read_i64(bytes, off + 8, tid)?);
+        let funder = AccountId::new(read_i64(bytes, off + 16, tid)?);
+        let asset_slice: [u8; 4] = bytes
+            .get(off + 24..off + 28)
+            .and_then(|s| s.try_into().ok())
+            .ok_or_else(|| malformed(tid))?;
+        let amount_slice: [u8; 16] = bytes
+            .get(off + 28..off + 44)
+            .and_then(|s| s.try_into().ok())
+            .ok_or_else(|| malformed(tid))?;
+        out.push(InflightLeg {
+            destination,
+            hold,
+            funder,
+            asset: AssetId::new(u32::from_be_bytes(asset_slice)),
+            amount: Cent::from_canonical_bytes(&amount_slice)?,
+        });
+        off += LEG_BYTES;
+    }
+    Ok(out)
+}
+
+fn hold_metadata(destination: AccountId) -> Metadata {
+    let mut m = Metadata::new();
+    m.insert(K_ROLE.to_string(), vec![ROLE_HOLD]);
+    m.insert(K_DEST.to_string(), destination.0.to_be_bytes().to_vec());
+    m
+}
+
+fn authorize_metadata(base: &Metadata, legs: &[InflightLeg]) -> Metadata {
+    let mut m = base.clone();
+    m.insert(K_ROLE.to_string(), vec![ROLE_AUTHORIZE]);
+    m.insert(K_LEGS.to_string(), encode_legs(legs));
+    m
+}
+
+fn settle_metadata(role: u8, tx: EnvelopeId, destination: AccountId) -> Metadata {
+    let mut m = Metadata::new();
+    m.insert(K_ROLE.to_string(), vec![role]);
+    m.insert(K_TX.to_string(), tx.0.to_vec());
+    m.insert(K_DEST.to_string(), destination.0.to_be_bytes().to_vec());
+    m
+}
+
+fn role_of(meta: &Metadata) -> Option<u8> {
+    meta.get(K_ROLE).and_then(|v| v.first().copied())
+}
+
+impl Ledger {
+    // -----------------------------------------------------------------------
+    // Authorize
+    // -----------------------------------------------------------------------
+
+    /// Authorize a trade without settling it. Each movement's destination is
+    /// rewritten to a fresh per-destination holding account, and the rewritten
+    /// transfer is committed, parking the funds. Returns a handle used by
+    /// [`confirm_all`](Self::confirm_all), [`confirm`](Self::confirm), and
+    /// [`void`](Self::void).
+    ///
+    /// Every movement must move between two distinct accounts. A destination
+    /// that already has an open inflight hold is rejected.
+    pub async fn authorize(
+        self: &Arc<Self>,
+        transfer: Transfer,
+    ) -> Result<Authorization, LedgerError> {
+        // One holding account per distinct destination.
+        let mut dest_to_hold: BTreeMap<AccountId, AccountId> = BTreeMap::new();
+        for m in &transfer.movements {
+            if m.from == m.to {
+                return Err(LedgerError::InflightSelfMovement(m.from));
+            }
+            dest_to_hold.entry(m.to).or_default();
+        }
+
+        // Enforce one open inflight per destination, then create the holds.
+        for (dest, hold) in &dest_to_hold {
+            if self.open_inflight_hold_for(dest).await?.is_some() {
+                return Err(LedgerError::InflightAlreadyOpen(*dest));
+            }
+            let mut acct = Account::new(*hold, AccountPolicy::NoOverdraft);
+            acct.flags = AccountFlags::INFLIGHT;
+            acct.book = transfer.book;
+            acct.metadata = hold_metadata(*dest);
+            self.create_account(acct).await?;
+        }
+
+        // Rewrite each movement to its hold and record the leg table.
+        let mut legs = Vec::with_capacity(transfer.movements.len());
+        let mut builder = TransferBuilder::new()
+            .book(transfer.book)
+            .user_data(transfer.user_data.clone());
+        for m in &transfer.movements {
+            let hold = dest_to_hold[&m.to];
+            legs.push(InflightLeg {
+                destination: m.to,
+                hold,
+                funder: m.from,
+                asset: m.asset,
+                amount: m.amount,
+            });
+            builder = builder.movement(m.from, hold, m.asset, m.amount);
+        }
+        let rewritten = builder
+            .metadata(authorize_metadata(&transfer.metadata, &legs))
+            .build();
+
+        let receipt = self.commit(rewritten).await?;
+        Ok(Authorization {
+            inflight: receipt.transfer_id,
+            receipt,
+            legs,
+        })
+    }
+
+    // -----------------------------------------------------------------------
+    // Confirm
+    // -----------------------------------------------------------------------
+
+    /// Confirm the entire inflight transaction: sweep every hold's remaining
+    /// balance to its destination and close the drained holds.
+    pub async fn confirm_all(
+        self: &Arc<Self>,
+        inflight: &EnvelopeId,
+    ) -> Result<Vec<Receipt>, LedgerError> {
+        let (record, legs) = self.load_inflight(inflight).await?;
+        let book = record.envelope.book();
+        let mut receipts = Vec::new();
+        for hold in holds_of(&legs) {
+            let dest = destination_of(&legs, hold, *inflight)?;
+            for asset in assets_of(&legs, hold) {
+                let bal = self.balance(&hold, &asset).await?;
+                if bal.is_positive() {
+                    receipts.push(
+                        self.settle(book, *inflight, hold, dest, dest, asset, bal, ROLE_CONFIRM)
+                            .await?,
+                    );
+                }
+            }
+            self.close_if_drained(&hold).await?;
+        }
+        Ok(receipts)
+    }
+
+    /// Confirm a slice of one leg: move `amount` of `asset` from the destination's
+    /// hold to the destination. `amount` must not exceed the amount still held;
+    /// the `NoOverdraft` hold makes over-confirmation impossible regardless.
+    pub async fn confirm(
+        self: &Arc<Self>,
+        inflight: &EnvelopeId,
+        destination: &AccountId,
+        asset: &AssetId,
+        amount: Cent,
+    ) -> Result<Receipt, LedgerError> {
+        let (record, legs) = self.load_inflight(inflight).await?;
+        let leg = legs
+            .iter()
+            .find(|l| &l.destination == destination && &l.asset == asset)
+            .ok_or(LedgerError::InflightLegNotFound {
+                destination: *destination,
+                asset: *asset,
+            })?;
+        let hold = leg.hold;
+        let held = self.balance(&hold, asset).await?;
+        if amount > held {
+            return Err(LedgerError::Selection(SelectionError::InsufficientFunds {
+                available: held,
+                requested: amount,
+            }));
+        }
+        let receipt = self
+            .settle(
+                record.envelope.book(),
+                *inflight,
+                hold,
+                *destination,
+                *destination,
+                *asset,
+                amount,
+                ROLE_CONFIRM,
+            )
+            .await?;
+        self.close_if_drained(&hold).await?;
+        Ok(receipt)
+    }
+
+    // -----------------------------------------------------------------------
+    // Void
+    // -----------------------------------------------------------------------
+
+    /// Void the entire inflight transaction: return every hold's remaining
+    /// balance to the funders recorded in the leg table and close the holds.
+    pub async fn void(
+        self: &Arc<Self>,
+        inflight: &EnvelopeId,
+    ) -> Result<Vec<Receipt>, LedgerError> {
+        let (record, legs) = self.load_inflight(inflight).await?;
+        let book = record.envelope.book();
+        let mut receipts = Vec::new();
+        for hold in holds_of(&legs) {
+            let dest = destination_of(&legs, hold, *inflight)?;
+            for asset in assets_of(&legs, hold) {
+                let mut remaining = self.balance(&hold, &asset).await?;
+                // Return to funders in leg order, each up to what it funded. For
+                // the common single-funder-per-(hold, asset) case this returns the
+                // whole remaining balance to that funder.
+                let mut funders: Vec<(AccountId, Cent)> = legs
+                    .iter()
+                    .filter(|l| l.hold == hold && l.asset == asset)
+                    .map(|l| (l.funder, l.amount))
+                    .collect();
+                // Ensure any co-funding rounding leftover lands on the last funder.
+                if let Some(last) = funders.last_mut() {
+                    last.1 = Cent::from(i64::MAX);
+                }
+                for (funder, cap) in funders {
+                    if !remaining.is_positive() {
+                        break;
+                    }
+                    let give = if cap < remaining { cap } else { remaining };
+                    if give.is_positive() {
+                        receipts.push(
+                            self.settle(
+                                book, *inflight, hold, funder, dest, asset, give, ROLE_VOID,
+                            )
+                            .await?,
+                        );
+                        remaining = remaining.checked_sub(give)?;
+                    }
+                }
+            }
+            self.close_if_drained(&hold).await?;
+        }
+        Ok(receipts)
+    }
+
+    // -----------------------------------------------------------------------
+    // Status / queries
+    // -----------------------------------------------------------------------
+
+    /// Derived status of an inflight transaction: per-leg authorized, confirmed,
+    /// voided, and still-held amounts, plus an overall state. All figures come
+    /// from balances and the metadata-tagged settling transfers.
+    pub async fn inflight_status(
+        &self,
+        inflight: &EnvelopeId,
+    ) -> Result<InflightStatus, LedgerError> {
+        let (_record, legs) = self.load_inflight(inflight).await?;
+
+        // Authorized per (hold, asset).
+        let mut authorized: BTreeMap<(AccountId, AssetId), Cent> = BTreeMap::new();
+        for l in &legs {
+            let e = authorized.entry((l.hold, l.asset)).or_insert(Cent::ZERO);
+            *e = e.checked_add(l.amount)?;
+        }
+
+        // Confirmed / voided per (hold, asset), summed from settle transfers.
+        let mut confirmed: BTreeMap<(AccountId, AssetId), Cent> = BTreeMap::new();
+        let mut voided: BTreeMap<(AccountId, AssetId), Cent> = BTreeMap::new();
+        for hold in holds_of(&legs) {
+            for record in self.history(&hold).await? {
+                let role = match role_of(record.envelope.metadata()) {
+                    Some(r @ (ROLE_CONFIRM | ROLE_VOID)) => r,
+                    _ => continue,
+                };
+                for np in record.envelope.creates() {
+                    if np.owner == hold {
+                        continue; // change returned to the hold, not settled out
+                    }
+                    let bucket = if role == ROLE_CONFIRM {
+                        &mut confirmed
+                    } else {
+                        &mut voided
+                    };
+                    let e = bucket.entry((hold, np.asset)).or_insert(Cent::ZERO);
+                    *e = e.checked_add(np.value)?;
+                }
+            }
+        }
+
+        let mut lines = Vec::new();
+        for ((hold, asset), auth) in &authorized {
+            let held = self.balance(hold, asset).await?;
+            let dest = destination_of(&legs, *hold, *inflight)?;
+            lines.push(InflightLegStatus {
+                destination: dest,
+                hold: *hold,
+                asset: *asset,
+                authorized: *auth,
+                confirmed: confirmed
+                    .get(&(*hold, *asset))
+                    .copied()
+                    .unwrap_or(Cent::ZERO),
+                voided: voided.get(&(*hold, *asset)).copied().unwrap_or(Cent::ZERO),
+                held,
+            });
+        }
+
+        let state = overall_state(&lines);
+        Ok(InflightStatus {
+            inflight: *inflight,
+            legs: lines,
+            state,
+        })
+    }
+
+    /// List the holding accounts of every currently open inflight (an
+    /// `INFLIGHT`-flagged account that is not closed).
+    pub async fn list_open_inflights(&self) -> Result<Vec<AccountId>, LedgerError> {
+        Ok(self
+            .list_accounts()
+            .await?
+            .into_iter()
+            .filter(|a| a.flags.contains(AccountFlags::INFLIGHT) && !a.is_closed())
+            .map(|a| a.id)
+            .collect())
+    }
+
+    // -----------------------------------------------------------------------
+    // Internal helpers
+    // -----------------------------------------------------------------------
+
+    /// Load the authorize transfer and decode its leg table.
+    async fn load_inflight(
+        &self,
+        inflight: &EnvelopeId,
+    ) -> Result<(EnvelopeRecord, Vec<InflightLeg>), LedgerError> {
+        let record = self
+            .store()
+            .get_transfer(inflight)
+            .await?
+            .ok_or(LedgerError::InflightNotFound(*inflight))?;
+        if role_of(record.envelope.metadata()) != Some(ROLE_AUTHORIZE) {
+            return Err(LedgerError::NotInflightTransaction(*inflight));
+        }
+        let bytes = record
+            .envelope
+            .metadata()
+            .get(K_LEGS)
+            .ok_or_else(|| malformed(*inflight))?;
+        let legs = decode_legs(bytes, *inflight)?;
+        Ok((record, legs))
+    }
+
+    /// Find an open (not-closed) inflight holding account for `destination`.
+    async fn open_inflight_hold_for(
+        &self,
+        destination: &AccountId,
+    ) -> Result<Option<AccountId>, LedgerError> {
+        let want = destination.0.to_be_bytes();
+        for a in self.list_accounts().await? {
+            if a.flags.contains(AccountFlags::INFLIGHT)
+                && !a.is_closed()
+                && a.metadata.get(K_DEST).map(|v| v.as_slice()) == Some(want.as_slice())
+            {
+                return Ok(Some(a.id));
+            }
+        }
+        Ok(None)
+    }
+
+    /// Commit a `hold -> target` settling transfer tagged with the inflight role.
+    #[allow(clippy::too_many_arguments)]
+    async fn settle(
+        self: &Arc<Self>,
+        book: BookId,
+        inflight: EnvelopeId,
+        hold: AccountId,
+        target: AccountId,
+        destination: AccountId,
+        asset: AssetId,
+        amount: Cent,
+        role: u8,
+    ) -> Result<Receipt, LedgerError> {
+        let tx = TransferBuilder::new()
+            .book(book)
+            .pay(hold, target, asset, amount)
+            .metadata(settle_metadata(role, inflight, destination))
+            .build();
+        self.commit(tx).await
+    }
+
+    /// Close a holding account once it has no live (Active/PendingInactive)
+    /// postings left. No-op if already closed or still holding funds.
+    async fn close_if_drained(&self, hold: &AccountId) -> Result<(), LedgerError> {
+        let live = self
+            .store()
+            .get_postings_by_account(hold, None, None)
+            .await?
+            .into_iter()
+            .any(|p| p.status != PostingStatus::Inactive);
+        if live {
+            return Ok(());
+        }
+        if !self.get_account(hold).await?.is_closed() {
+            self.close(hold).await?;
+        }
+        Ok(())
+    }
+}
+
+fn holds_of(legs: &[InflightLeg]) -> BTreeSet<AccountId> {
+    legs.iter().map(|l| l.hold).collect()
+}
+
+fn assets_of(legs: &[InflightLeg], hold: AccountId) -> BTreeSet<AssetId> {
+    legs.iter()
+        .filter(|l| l.hold == hold)
+        .map(|l| l.asset)
+        .collect()
+}
+
+fn destination_of(
+    legs: &[InflightLeg],
+    hold: AccountId,
+    inflight: EnvelopeId,
+) -> Result<AccountId, LedgerError> {
+    legs.iter()
+        .find(|l| l.hold == hold)
+        .map(|l| l.destination)
+        .ok_or_else(|| malformed(inflight))
+}
+
+fn overall_state(lines: &[InflightLegStatus]) -> InflightState {
+    let mut any_held = false;
+    let mut any_confirmed = false;
+    let mut any_voided = false;
+    for l in lines {
+        if l.held.is_positive() {
+            any_held = true;
+        }
+        if l.confirmed.is_positive() {
+            any_confirmed = true;
+        }
+        if l.voided.is_positive() {
+            any_voided = true;
+        }
+    }
+    match (any_held, any_confirmed, any_voided) {
+        (true, false, false) => InflightState::Held,
+        (true, _, _) => InflightState::PartiallyConfirmed,
+        (false, true, true) => InflightState::Mixed,
+        (false, false, true) => InflightState::Voided,
+        // Fully settled to destinations, or an empty/zero authorization.
+        (false, _, false) => InflightState::Confirmed,
+    }
+}

+ 4 - 0
crates/kuatia/src/lib.rs

@@ -5,6 +5,7 @@
 //! commit pipeline (load → plan → apply) behind a convenient async API.
 
 pub mod error;
+pub mod inflight;
 pub mod ledger;
 pub mod saga;
 
@@ -22,6 +23,9 @@ pub mod prelude {
     pub use kuatia_core::*;
 
     pub use crate::error::LedgerError;
+    pub use crate::inflight::{
+        Authorization, InflightLeg, InflightLegStatus, InflightState, InflightStatus,
+    };
     pub use crate::ledger::Ledger;
     pub use kuatia_storage::mem_store::InMemoryStore;
     pub use kuatia_storage::store::Store;

+ 296 - 0
crates/kuatia/tests/inflight.rs

@@ -0,0 +1,296 @@
+//! Integration tests for inflight holds (authorize / confirm / void).
+//!
+//! The running example is the ADR's confirmed trade between A and B with a fee
+//! account, spanning two assets:
+//!
+//! ```text
+//! A -> B   -> 100 EUR
+//! B -> A   ->  10 BTC
+//! A -> fee ->   1 BTC
+//! B -> fee ->   1 EUR
+//! ```
+//!
+//! Authorized, the funds park in per-destination holding accounts; `fee`'s hold
+//! collects EUR from B and BTC from A.
+
+#![allow(missing_docs)]
+
+use std::collections::BTreeMap;
+use std::sync::Arc;
+
+use kuatia::prelude::*;
+
+fn eur() -> AssetId {
+    AssetId::new(1)
+}
+fn btc() -> AssetId {
+    AssetId::new(2)
+}
+fn a() -> AccountId {
+    AccountId::new(1)
+}
+fn b() -> AccountId {
+    AccountId::new(2)
+}
+fn fee() -> AccountId {
+    AccountId::new(3)
+}
+fn ext() -> AccountId {
+    AccountId::new(99)
+}
+
+fn make_account(id: i64, policy: AccountPolicy) -> Account {
+    Account {
+        id: AccountId::new(id),
+        version: 1,
+        policy,
+        flags: AccountFlags::empty(),
+        book: BookId(0),
+        user_data: UserData::default(),
+        metadata: BTreeMap::new(),
+    }
+}
+
+async fn deposit(ledger: &Arc<Ledger>, to: AccountId, asset: AssetId, amount: i64) {
+    let t = TransferBuilder::new()
+        .deposit(to, asset, Cent::from(amount), ext())
+        .unwrap()
+        .build();
+    ledger.commit(t).await.unwrap();
+}
+
+/// A ledger with accounts A, B, fee, external; A holds 100 EUR + 1 BTC, B holds
+/// 10 BTC + 1 EUR.
+async fn setup() -> Arc<Ledger> {
+    let ledger = Arc::new(Ledger::new(InMemoryStore::new()));
+    for id in [1, 2, 3] {
+        ledger
+            .store()
+            .create_account(make_account(id, AccountPolicy::NoOverdraft))
+            .await
+            .unwrap();
+    }
+    ledger
+        .store()
+        .create_account(make_account(99, AccountPolicy::ExternalAccount))
+        .await
+        .unwrap();
+    deposit(&ledger, a(), eur(), 100).await;
+    deposit(&ledger, a(), btc(), 1).await;
+    deposit(&ledger, b(), btc(), 10).await;
+    deposit(&ledger, b(), eur(), 1).await;
+    ledger
+}
+
+fn trade() -> Transfer {
+    TransferBuilder::new()
+        .pay(a(), b(), eur(), Cent::from(100))
+        .pay(b(), a(), btc(), Cent::from(10))
+        .pay(a(), fee(), btc(), Cent::from(1))
+        .pay(b(), fee(), eur(), Cent::from(1))
+        .build()
+}
+
+async fn bal(ledger: &Arc<Ledger>, account: AccountId, asset: AssetId) -> Cent {
+    ledger.balance(&account, &asset).await.unwrap()
+}
+
+/// After authorize, funds leave the payers and sit in the holds; the payers'
+/// balances drop to zero and nothing has reached the destinations yet.
+#[tokio::test]
+async fn authorize_parks_funds_in_holds() {
+    let ledger = setup().await;
+    let auth = ledger.authorize(trade()).await.unwrap();
+
+    // Payers emptied.
+    assert_eq!(bal(&ledger, a(), eur()).await, Cent::ZERO);
+    assert_eq!(bal(&ledger, a(), btc()).await, Cent::ZERO);
+    assert_eq!(bal(&ledger, b(), eur()).await, Cent::ZERO);
+    assert_eq!(bal(&ledger, b(), btc()).await, Cent::ZERO);
+
+    // Destinations untouched.
+    assert_eq!(bal(&ledger, b(), eur()).await, Cent::ZERO);
+    assert_eq!(bal(&ledger, fee(), eur()).await, Cent::ZERO);
+
+    // Three holds are open, and status reports everything Held.
+    assert_eq!(ledger.list_open_inflights().await.unwrap().len(), 3);
+    let status = ledger.inflight_status(&auth.inflight).await.unwrap();
+    assert_eq!(status.state, InflightState::Held);
+    let total_held: Cent = Cent::checked_sum(status.legs.iter().map(|l| l.held)).unwrap();
+    let total_auth: Cent = Cent::checked_sum(status.legs.iter().map(|l| l.authorized)).unwrap();
+    assert_eq!(total_held, total_auth);
+}
+
+/// Confirming the whole transaction settles every leg to its destination and
+/// closes the holds. The net result equals the original trade.
+#[tokio::test]
+async fn confirm_all_settles_to_destinations() {
+    let ledger = setup().await;
+    let auth = ledger.authorize(trade()).await.unwrap();
+
+    ledger.confirm_all(&auth.inflight).await.unwrap();
+
+    assert_eq!(bal(&ledger, b(), eur()).await, Cent::from(100));
+    assert_eq!(bal(&ledger, a(), btc()).await, Cent::from(10));
+    assert_eq!(bal(&ledger, fee(), eur()).await, Cent::from(1));
+    assert_eq!(bal(&ledger, fee(), btc()).await, Cent::from(1));
+
+    // Holds drained and closed.
+    assert!(ledger.list_open_inflights().await.unwrap().is_empty());
+    let status = ledger.inflight_status(&auth.inflight).await.unwrap();
+    assert_eq!(status.state, InflightState::Confirmed);
+}
+
+/// Voiding returns every held posting to the funder recorded in the leg table,
+/// including the multi-asset fee hold funded by two different accounts.
+#[tokio::test]
+async fn void_returns_funds_to_funders() {
+    let ledger = setup().await;
+    let auth = ledger.authorize(trade()).await.unwrap();
+
+    ledger.void(&auth.inflight).await.unwrap();
+
+    // Everyone is back where they started.
+    assert_eq!(bal(&ledger, a(), eur()).await, Cent::from(100));
+    assert_eq!(bal(&ledger, a(), btc()).await, Cent::from(1));
+    assert_eq!(bal(&ledger, b(), btc()).await, Cent::from(10));
+    assert_eq!(bal(&ledger, b(), eur()).await, Cent::from(1));
+    assert_eq!(bal(&ledger, fee(), eur()).await, Cent::ZERO);
+    assert_eq!(bal(&ledger, fee(), btc()).await, Cent::ZERO);
+
+    assert!(ledger.list_open_inflights().await.unwrap().is_empty());
+    let status = ledger.inflight_status(&auth.inflight).await.unwrap();
+    assert_eq!(status.state, InflightState::Voided);
+}
+
+/// A partial confirm delivers a slice and leaves the remainder held. Confirming
+/// the rest drains and closes the hold.
+#[tokio::test]
+async fn partial_confirm_then_confirm_remainder() {
+    let ledger = setup().await;
+    let auth = ledger.authorize(trade()).await.unwrap();
+
+    ledger
+        .confirm(&auth.inflight, &b(), &eur(), Cent::from(40))
+        .await
+        .unwrap();
+    assert_eq!(bal(&ledger, b(), eur()).await, Cent::from(40));
+
+    // The B/EUR leg is partially confirmed.
+    let status = ledger.inflight_status(&auth.inflight).await.unwrap();
+    let leg = status
+        .legs
+        .iter()
+        .find(|l| l.destination == b() && l.asset == eur())
+        .unwrap();
+    assert_eq!(leg.authorized, Cent::from(100));
+    assert_eq!(leg.confirmed, Cent::from(40));
+    assert_eq!(leg.held, Cent::from(60));
+    assert_eq!(status.state, InflightState::PartiallyConfirmed);
+
+    // Confirm the rest.
+    ledger
+        .confirm(&auth.inflight, &b(), &eur(), Cent::from(60))
+        .await
+        .unwrap();
+    assert_eq!(bal(&ledger, b(), eur()).await, Cent::from(100));
+    // The B hold is now closed (its only asset drained).
+    assert!(
+        !ledger
+            .list_open_inflights()
+            .await
+            .unwrap()
+            .contains(&leg.hold)
+    );
+}
+
+/// A partial confirm followed by a void: the slice reaches the destination and
+/// the remainder returns to the funder.
+#[tokio::test]
+async fn partial_confirm_then_void_remainder() {
+    let ledger = setup().await;
+    let auth = ledger.authorize(trade()).await.unwrap();
+
+    ledger
+        .confirm(&auth.inflight, &b(), &eur(), Cent::from(40))
+        .await
+        .unwrap();
+    ledger.void(&auth.inflight).await.unwrap();
+
+    // B kept the confirmed 40 EUR from its own hold, and got its 1 EUR fee
+    // contribution back from the (now voided) fee hold: 41 total. A got the
+    // remaining 60 EUR of B's hold back.
+    assert_eq!(bal(&ledger, b(), eur()).await, Cent::from(41));
+    assert_eq!(bal(&ledger, a(), eur()).await, Cent::from(60));
+
+    let status = ledger.inflight_status(&auth.inflight).await.unwrap();
+    let leg = status
+        .legs
+        .iter()
+        .find(|l| l.destination == b() && l.asset == eur())
+        .unwrap();
+    assert_eq!(leg.confirmed, Cent::from(40));
+    assert_eq!(leg.voided, Cent::from(60));
+    assert_eq!(leg.held, Cent::ZERO);
+    assert_eq!(status.state, InflightState::Mixed);
+}
+
+/// Confirming more than is held is rejected. The `NoOverdraft` hold makes
+/// over-confirmation impossible.
+#[tokio::test]
+async fn over_confirm_is_rejected() {
+    let ledger = setup().await;
+    let auth = ledger.authorize(trade()).await.unwrap();
+
+    let err = ledger
+        .confirm(&auth.inflight, &b(), &eur(), Cent::from(101))
+        .await
+        .unwrap_err();
+    assert!(matches!(err, LedgerError::Selection(_)));
+    // Nothing moved.
+    assert_eq!(bal(&ledger, b(), eur()).await, Cent::ZERO);
+}
+
+/// Only one open inflight is allowed per destination account at a time.
+#[tokio::test]
+async fn one_open_inflight_per_account() {
+    let ledger = setup().await;
+    let _auth = ledger.authorize(trade()).await.unwrap();
+
+    // A second authorize touching B (an open destination) is rejected.
+    deposit(&ledger, a(), eur(), 10).await;
+    let again = TransferBuilder::new()
+        .pay(a(), b(), eur(), Cent::from(10))
+        .build();
+    let err = ledger.authorize(again).await.unwrap_err();
+    assert!(matches!(err, LedgerError::InflightAlreadyOpen(id) if id == b()));
+}
+
+/// After a full confirm closes the holds, a fresh inflight to the same
+/// destinations is allowed again.
+#[tokio::test]
+async fn reauthorize_after_settlement() {
+    let ledger = setup().await;
+    let auth = ledger.authorize(trade()).await.unwrap();
+    ledger.confirm_all(&auth.inflight).await.unwrap();
+
+    // B now holds 100 EUR; authorize a new hold of 30 of it to fee.
+    let again = TransferBuilder::new()
+        .pay(b(), fee(), eur(), Cent::from(30))
+        .build();
+    let auth2 = ledger.authorize(again).await.unwrap();
+    assert_eq!(bal(&ledger, b(), eur()).await, Cent::from(70));
+    ledger.confirm_all(&auth2.inflight).await.unwrap();
+    assert_eq!(bal(&ledger, fee(), eur()).await, Cent::from(31));
+}
+
+/// Operating on a non-inflight or unknown transfer id is a clean error.
+#[tokio::test]
+async fn unknown_inflight_is_an_error() {
+    let ledger = setup().await;
+    let bogus = EnvelopeId([7u8; 32]);
+    assert!(matches!(
+        ledger.confirm_all(&bogus).await.unwrap_err(),
+        LedgerError::InflightNotFound(_)
+    ));
+}

+ 1 - 1
doc/adr/0004-inflight-holds-via-holding-accounts.md

@@ -1,6 +1,6 @@
 # Inflight holds via per-destination holding accounts
 
-* Status: proposed
+* Status: accepted
 * Authors: Cesar Rodas
 * Date: 2026-07-03
 * Targeted modules: `kuatia` (`ledger`, `saga`), `kuatia-types`

+ 117 - 0
doc/inflight.md

@@ -0,0 +1,117 @@
+# Inflight holds
+
+Inflight holds let you reserve funds now and settle later: authorize a trade,
+then confirm it (in full or in parts) or void it. This is the
+authorization/capture pattern, applied to a multi-leg trade.
+
+The design and its rationale are in
+[adr/0004-inflight-holds-via-holding-accounts.md](adr/0004-inflight-holds-via-holding-accounts.md).
+This page is the usage guide.
+
+## Model
+
+An inflight transaction is an ordinary trade whose every destination is
+rewritten to a fresh per-destination **holding account** (`NoOverdraft`, flagged
+`INFLIGHT`). Committing that rewritten transfer parks the funds:
+
+```text
+Confirmed trade            Inflight form
+-----------------          --------------------------------
+A -> B   -> 100 EUR        A -> hold(B)   -> 100 EUR
+B -> A   ->  10 BTC        B -> hold(A)   ->  10 BTC
+A -> fee ->   1 BTC        A -> hold(fee) ->   1 BTC
+B -> fee ->   1 EUR        B -> hold(fee) ->   1 EUR
+```
+
+A hold is keyed by destination, so `hold(fee)` collects EUR from B and BTC from
+A. Each holding posting's funder is recorded in the authorize transfer's leg
+table, so a void returns each posting to the account that paid it.
+
+Nothing new is stored. The authorize transfer is the record: its `EnvelopeId` is
+the inflight handle, and its metadata carries the leg table. Every artifact
+(holding accounts, authorize, confirm, void) is tagged in an `inflight.`
+metadata namespace, so the lifecycle is read from recorded fields, not inferred.
+
+## Lifecycle
+
+```mermaid
+stateDiagram-v2
+    [*] --> Held: authorize
+    Held --> Held: confirm (partial)
+    Held --> Confirmed: confirm_all / drained
+    Held --> Voided: void
+    Confirmed --> [*]
+    Voided --> [*]
+```
+
+Every operation is an ordinary `commit`, so idempotency, conservation, and crash
+recovery are inherited unchanged. A hold closes automatically once drained.
+
+## API
+
+All methods hang off `Ledger`.
+
+```rust
+use kuatia::prelude::*;
+
+// Authorize the trade. Funds leave A and B and park in the holds.
+let trade = TransferBuilder::new()
+    .pay(a, b, eur, Cent::from(100))
+    .pay(b, a, btc, Cent::from(10))
+    .pay(a, fee, btc, Cent::from(1))
+    .pay(b, fee, eur, Cent::from(1))
+    .build();
+let auth = ledger.authorize(trade).await?;
+
+// Confirm one leg partially: deliver 40 EUR of B's hold to B now.
+ledger.confirm(&auth.inflight, &b, &eur, Cent::from(40)).await?;
+
+// Confirm everything else and close the holds.
+ledger.confirm_all(&auth.inflight).await?;
+
+// ...or return everything to the funders instead.
+ledger.void(&auth.inflight).await?;
+
+// Derived status: per-leg authorized / confirmed / voided / held, plus state.
+let status = ledger.inflight_status(&auth.inflight).await?;
+
+// The holding accounts of every open inflight.
+let open = ledger.list_open_inflights().await?;
+```
+
+`authorize` returns an `Authorization { inflight, receipt, legs }`. The
+`inflight` field (an `EnvelopeId`) is the handle passed to every other call.
+
+## Guarantees
+
+- **Over-confirmation is impossible.** A hold is `NoOverdraft`, so confirming
+  more than it holds fails validation. The sum of confirmations can never exceed
+  the authorized amount.
+- **No double-spend under concurrency.** Concurrent confirmations serialize on
+  the shared holding posting via the reservation protocol. On contention, one
+  wins and the caller retries the other against the new remaining balance.
+- **State is derived.** The amount still held on a leg is `balance(hold, asset)`.
+  Confirmed and voided amounts are summed from the metadata-tagged settling
+  transfers. Nothing mutable is stored.
+
+## Constraints and limitations
+
+- **One open inflight per account.** A destination with an open hold cannot be
+  the destination of a second authorize until the first is confirmed or voided.
+- **Distinct movements.** Every movement in an authorize must move between two
+  different accounts.
+- **Void needs an open payer.** Voiding returns funds to the original funder, so
+  that account must still be open.
+- **Single funder per `(hold, asset)`.** When two accounts fund the same asset
+  into the same destination hold, a partially-confirmed remainder cannot be
+  split back to each funder exactly; void returns it in leg order.
+- **Books.** A hold is created in the authorize transfer's book. If that book
+  restricts participation by flag or account, it must admit the holds (for
+  example by allowing the `INFLIGHT` flag).
+
+## Where it lives
+
+- `crates/kuatia/src/inflight.rs` — the API and metadata schema.
+- `crates/kuatia/tests/inflight.rs` — authorize, confirm, partial confirm, void,
+  over-confirm rejection, one-open-per-account, and status tests.
+- `AccountFlags::INFLIGHT` — `crates/kuatia-types/src/lib.rs`.