Przeglądaj źródła

Refactor melt quote locking from state transition to load time

Moves the sibling quote locking logic out of `update_melt_quote_state` and into
a new `load_melt_quotes_exclusively` function that combines quote loading with
defensive locking.

Previously, the SQL layer checked for conflicting quotes during state
transitions inside `update_melt_quote_state`. This approach had a subtle race
window: between loading a quote and transitioning its state, another concurrent
operation could modify a sibling quote. By moving the locking earlier—to the
moment the quote is loaded—we eliminate this window entirely.

The new architecture:

1. **`get_melt_quotes_by_request_lookup_id`**: Low-level database method that
retrieves and locks all melt quotes sharing a lookup identifier using `FOR
UPDATE`.

2. **`load_melt_quotes_exclusively`**: Application-layer function that loads a
quote by ID, locks all siblings, and validates no sibling is already pending or
paid. Returns the quote ready for safe state transitions.

3. **Deadlock handling**: Adds `Error::Locked` variant and maps PostgreSQL
deadlock detection (`T_R_DEADLOCK_DETECTED`) to this error, which is then
converted to `Duplicate` to signal the conflict to callers.

The refactored flow is used in three critical paths:
- Melt saga setup (before transitioning to pending)
- Melt saga finalization (before completing payment)
- Startup recovery via `finalize_melt_quote`

This approach mirrors the existing mint quote locking pattern and provides
consistent protection against BOLT12-related race conditions where multiple
quotes can share the same payment lookup identifier.
Cesar Rodas 1 miesiąc temu
rodzic
commit
9c1a806e4a

+ 21 - 1
crates/cdk-common/src/database/mint/mod.rs

@@ -8,7 +8,7 @@ use cashu::Amount;
 
 use super::{DbTransactionFinalizer, Error};
 use crate::database::Acquired;
-use crate::mint::{self, MintKeySetInfo, MintQuote as MintMintQuote, Operation};
+use crate::mint::{self, MeltQuote, MintKeySetInfo, MintQuote as MintMintQuote, Operation};
 use crate::nuts::{
     BlindSignature, BlindedMessage, CurrencyUnit, Id, MeltQuoteState, Proof, Proofs, PublicKey,
     State,
@@ -157,6 +157,26 @@ pub trait QuotesTransaction {
     /// Add [`mint::MeltQuote`]
     async fn add_melt_quote(&mut self, quote: mint::MeltQuote) -> Result<(), Self::Err>;
 
+    /// Retrieves all melt quotes matching a payment lookup identifier and locks them for update.
+    ///
+    /// This method returns multiple quotes because certain payment methods (notably BOLT12 offers)
+    /// can generate multiple payment attempts that share the same lookup identifier. Locking all
+    /// related quotes prevents race conditions where concurrent melt operations could interfere
+    /// with each other, potentially leading to double-spending or state inconsistencies.
+    ///
+    /// The returned quotes are locked within the current transaction to ensure safe concurrent
+    /// modification. This is essential during melt saga initiation and finalization to guarantee
+    /// atomic state transitions across all related quotes.
+    ///
+    /// # Arguments
+    ///
+    /// * `request_lookup_id` - The payment identifier used by the Lightning backend to track
+    ///   payment state (e.g., payment hash, offer ID, or label).
+    async fn get_melt_quotes_by_request_lookup_id(
+        &mut self,
+        request_lookup_id: &PaymentIdentifier,
+    ) -> Result<Vec<Acquired<MeltQuote>>, Self::Err>;
+
     /// Updates the request lookup id for a melt quote.
     ///
     /// Requires an [`Acquired`] melt quote to ensure the row is locked before modification.

+ 5 - 0
crates/cdk-common/src/database/mod.rs

@@ -206,6 +206,11 @@ pub enum Error {
     /// Duplicate entry
     #[error("Duplicate entry")]
     Duplicate,
+
+    /// Locked resource
+    #[error("Locked resource")]
+    Locked,
+
     /// Amount overflow
     #[error("Amount overflow")]
     AmountOverflow,

+ 4 - 0
crates/cdk-postgres/src/db.rs

@@ -14,6 +14,10 @@ fn to_pgsql_error(err: PgError) -> Error {
         if code == SqlState::INTEGRITY_CONSTRAINT_VIOLATION || code == SqlState::UNIQUE_VIOLATION {
             return Error::Duplicate;
         }
+
+        if code == SqlState::T_R_DEADLOCK_DETECTED {
+            return Error::Locked;
+        }
     }
 
     Error::Database(Box::new(err))

+ 55 - 41
crates/cdk-sql-common/src/mint/mod.rs

@@ -656,6 +656,52 @@ where
         .transpose()
 }
 
+#[inline]
+async fn get_melt_quotes_by_request_lookup_id_inner<T>(
+    executor: &T,
+    request_lookup_id: &PaymentIdentifier,
+    for_update: bool,
+) -> Result<Vec<mint::MeltQuote>, Error>
+where
+    T: DatabaseExecutor,
+{
+    let for_update_clause = if for_update { "FOR UPDATE" } else { "" };
+    let query_str = format!(
+        r#"
+        SELECT
+            id,
+            unit,
+            amount,
+            request,
+            fee_reserve,
+            expiry,
+            state,
+            payment_preimage,
+            request_lookup_id,
+            created_time,
+            paid_time,
+            payment_method,
+            options,
+            request_lookup_id_kind
+        FROM
+            melt_quote
+        WHERE
+            request_lookup_id = :request_lookup_id
+            AND request_lookup_id_kind = :request_lookup_id_kind
+        {for_update_clause}
+        "#
+    );
+
+    query(&query_str)?
+        .bind("request_lookup_id", request_lookup_id.to_string())
+        .bind("request_lookup_id_kind", request_lookup_id.kind())
+        .fetch_all(executor)
+        .await?
+        .into_iter()
+        .map(sql_row_to_melt_quote)
+        .collect::<Result<Vec<_>, _>>()
+}
+
 #[async_trait]
 impl<RM> MintKeyDatabaseTransaction<'_, Error> for SQLTransaction<RM>
 where
@@ -1189,47 +1235,6 @@ where
 
         check_melt_quote_state_transition(old_state, state)?;
 
-        // When transitioning to Pending, lock all quotes with the same lookup_id
-        // and check if any are already pending or paid
-        if state == MeltQuoteState::Pending {
-            if let Some(ref lookup_id) = quote.request_lookup_id {
-                // Lock all quotes with the same lookup_id to prevent race conditions
-                let locked_quotes: Vec<(String, String)> = query(
-                    r#"
-                    SELECT id, state
-                    FROM melt_quote
-                    WHERE request_lookup_id = :lookup_id
-                    FOR UPDATE
-                    "#,
-                )?
-                .bind("lookup_id", lookup_id.to_string())
-                .fetch_all(&self.inner)
-                .await?
-                .into_iter()
-                .map(|row| {
-                    unpack_into!(let (id, state) = row);
-                    Ok((column_as_string!(id), column_as_string!(state)))
-                })
-                .collect::<Result<Vec<_>, Error>>()?;
-
-                // Check if any other quote with the same lookup_id is pending or paid
-                let has_conflict = locked_quotes.iter().any(|(id, s)| {
-                    id != &quote.id.to_string()
-                        && (s == &MeltQuoteState::Pending.to_string()
-                            || s == &MeltQuoteState::Paid.to_string())
-                });
-
-                if has_conflict {
-                    tracing::warn!(
-                        "Cannot transition quote {} to Pending: another quote with lookup_id {} is already pending or paid",
-                        quote.id,
-                        lookup_id
-                    );
-                    return Err(Error::Duplicate);
-                }
-            }
-        }
-
         let rec = if state == MeltQuoteState::Paid {
             let current_time = unix_time();
             quote.paid_time = Some(current_time);
@@ -1284,6 +1289,15 @@ where
             .map(|quote| quote.map(|inner| inner.into()))
     }
 
+    async fn get_melt_quotes_by_request_lookup_id(
+        &mut self,
+        request_lookup_id: &PaymentIdentifier,
+    ) -> Result<Vec<Acquired<mint::MeltQuote>>, Self::Err> {
+        get_melt_quotes_by_request_lookup_id_inner(&self.inner, request_lookup_id, true)
+            .await
+            .map(|quote| quote.into_iter().map(|inner| inner.into()).collect())
+    }
+
     async fn get_mint_quote_by_request(
         &mut self,
         request: &str,

+ 12 - 17
crates/cdk/src/mint/melt/melt_saga/mod.rs

@@ -16,6 +16,7 @@ use tracing::instrument;
 use self::compensation::{CompensatingAction, RemoveMeltSetup};
 use self::state::{Initial, PaymentConfirmed, SettlementDecision, SetupComplete};
 use crate::cdk_payment::MakePaymentResponse;
+use crate::mint::melt::shared;
 use crate::mint::subscription::PubSubManager;
 use crate::mint::verification::Verification;
 use crate::mint::{MeltQuoteBolt11Response, MeltRequest};
@@ -202,6 +203,15 @@ impl MeltSaga<Initial> {
 
         let mut tx = self.db.begin_transaction().await?;
 
+        let mut quote =
+            match shared::load_melt_quotes_exclusively(&mut tx, melt_request.quote()).await {
+                Ok(quote) => quote,
+                Err(err) => {
+                    tx.rollback().await?;
+                    return Err(err);
+                }
+            };
+
         // Calculate fee to create Operation with actual amounts
         let fee_breakdown = self.mint.get_proofs_fee(melt_request.inputs()).await?;
 
@@ -281,19 +291,6 @@ impl MeltSaga<Initial> {
             );
         }
 
-        // Get and lock the quote
-        let mut quote = match tx.get_melt_quote(melt_request.quote()).await {
-            Ok(Some(q)) => q,
-            Ok(None) => {
-                tx.rollback().await?;
-                return Err(Error::UnknownQuote);
-            }
-            Err(err) => {
-                tx.rollback().await?;
-                return Err(err.into());
-            }
-        };
-
         let previous_state = quote.state;
 
         // Publish proof state changes
@@ -888,10 +885,8 @@ impl MeltSaga<PaymentConfirmed> {
         let mut tx = self.db.begin_transaction().await?;
 
         // Acquire lock on the quote for safe state update
-        let mut quote = tx
-            .get_melt_quote(&self.state_data.quote.id)
-            .await?
-            .ok_or(Error::UnknownQuote)?;
+        let mut quote =
+            shared::load_melt_quotes_exclusively(&mut tx, &self.state_data.quote.id).await?;
 
         // Get melt request info (needed for validation and change)
         let MeltRequestInfo {

+ 2 - 0
crates/cdk/src/mint/melt/melt_saga/tests.rs

@@ -2755,6 +2755,7 @@ async fn test_duplicate_lookup_id_prevents_second_pending() {
         .await
         .unwrap()
         .expect("Quote 1 should exist");
+
     let quote2 = mint
         .localstore
         .get_melt_quote(&quote_response2.quote)
@@ -2846,6 +2847,7 @@ async fn test_duplicate_lookup_id_prevents_second_pending() {
         .await
         .unwrap()
         .unwrap();
+
     assert_eq!(
         still_unpaid_quote2.state,
         MeltQuoteState::Unpaid,

+ 69 - 11
crates/cdk/src/mint/melt/shared.rs

@@ -244,6 +244,73 @@ pub async fn process_melt_change(
     Ok((Some(change_sigs), tx))
 }
 
+/// Loads a melt quote and acquires exclusive locks on all related quotes.
+///
+/// This function combines quote loading with defensive locking to prevent race conditions in BOLT12
+/// scenarios where multiple melt quotes can share the same `request_lookup_id`. It performs three
+/// operations atomically:
+///
+/// 1. Loads the melt quote by ID 2. Acquires row-level locks on all sibling quotes sharing the same
+///    lookup identifier 3. Validates that no sibling quote is already in `Pending` or `Paid` state
+///
+/// This ensures that when a melt operation begins, no other concurrent melt can proceed on a
+/// related quote, preventing double-spending and state inconsistencies.
+///
+/// # Arguments
+///
+/// * `tx` - The active database transaction used to load and acquire locks. * `quote_id` - The ID
+///   of the melt quote to load and process.
+///
+/// # Returns
+///
+/// The loaded and locked melt quote, ready for state transitions.
+///
+/// # Errors
+///
+/// * [`Error::UnknownQuote`] if no quote exists with the given ID. * [`Error::Database(Duplicate)`]
+///   if another quote with the same lookup ID is already pending or paid, indicating a conflicting
+///   concurrent melt operation.
+pub async fn load_melt_quotes_exclusively(
+    tx: &mut Box<dyn database::MintTransaction<database::Error> + Send + Sync>,
+    quote_id: &QuoteId,
+) -> Result<Acquired<MeltQuote>, Error> {
+    let quote = tx
+        .get_melt_quote(quote_id)
+        .await
+        .map_err(|e| match e {
+            database::Error::Locked => database::Error::Duplicate,
+            e => e,
+        })?
+        .ok_or(Error::UnknownQuote)?;
+
+    // Lock any other quotes so they cannot be modified
+    let locked_quotes = if let Some(request_lookup_id) = quote.request_lookup_id.as_ref() {
+        tx.get_melt_quotes_by_request_lookup_id(request_lookup_id)
+            .await
+            .map_err(|e| match e {
+                database::Error::Locked => database::Error::Duplicate,
+                e => e,
+            })?
+    } else {
+        vec![]
+    };
+
+    if locked_quotes.iter().any(|locked_quote| {
+        locked_quote.id != quote.id
+            && (locked_quote.state == MeltQuoteState::Pending
+                || locked_quote.state == MeltQuoteState::Paid)
+    }) {
+        tracing::warn!(
+            "Cannot transition quote {} to Pending: another quote with lookup_id {:?} is already pending or paid",
+            quote.id,
+            quote.request_lookup_id,
+        );
+        return Err(Error::Database(crate::cdk_database::Error::Duplicate));
+    }
+
+    Ok(quote)
+}
+
 /// Finalizes a melt quote by updating proofs, quote state, and publishing changes.
 ///
 /// This function performs the core finalization operations that are common to both
@@ -400,17 +467,8 @@ pub async fn finalize_melt_quote(
     let mut tx = db.begin_transaction().await?;
 
     // Acquire lock on the quote for safe state update
-    let mut locked_quote = match tx.get_melt_quote(&quote.id).await? {
-        Some(q) => q,
-        None => {
-            tracing::warn!(
-                "Melt quote {} not found - may have been completed already",
-                quote.id
-            );
-            tx.rollback().await?;
-            return Ok(None);
-        }
-    };
+
+    let mut locked_quote = load_melt_quotes_exclusively(&mut tx, &quote.id).await?;
 
     // Get melt request info
     let melt_request_info = match tx.get_melt_request_and_blinded_messages(&quote.id).await? {