Przeglądaj źródła

Melt pending quote check (#1531)

* feat: check with ln on melt quote check

* feat: add debug to payment id

* feat: centralize proof update
tsk 1 tydzień temu
rodzic
commit
a658dbfd05

+ 16 - 1
crates/cdk-common/src/payment.rs

@@ -82,7 +82,7 @@ impl From<Infallible> for Error {
 }
 
 /// Payment identifier types
-#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize, Serialize)]
+#[derive(Clone, Hash, PartialEq, Eq, Deserialize, Serialize)]
 #[serde(tag = "type", content = "value")]
 pub enum PaymentIdentifier {
     /// Label identifier
@@ -151,6 +151,21 @@ impl std::fmt::Display for PaymentIdentifier {
     }
 }
 
+impl std::fmt::Debug for PaymentIdentifier {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            PaymentIdentifier::PaymentHash(h) => write!(f, "PaymentHash({})", hex::encode(h)),
+            PaymentIdentifier::Bolt12PaymentHash(h) => {
+                write!(f, "Bolt12PaymentHash({})", hex::encode(h))
+            }
+            PaymentIdentifier::PaymentId(h) => write!(f, "PaymentId({})", hex::encode(h)),
+            PaymentIdentifier::Label(s) => write!(f, "Label({})", s),
+            PaymentIdentifier::OfferId(s) => write!(f, "OfferId({})", s),
+            PaymentIdentifier::CustomId(s) => write!(f, "CustomId({})", s),
+        }
+    }
+}
+
 /// Options for creating a BOLT11 incoming payment request
 #[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
 pub struct Bolt11IncomingPaymentOptions {

+ 5 - 2
crates/cdk/src/mint/melt/melt_saga/compensation.rs

@@ -9,13 +9,15 @@ use cdk_common::{Error, PublicKey, QuoteId};
 use tracing::instrument;
 use uuid::Uuid;
 
+use crate::mint::subscription::PubSubManager;
+
 /// Trait for compensating actions in the saga pattern.
 ///
 /// Compensating actions are registered as steps complete and executed in reverse
 /// order (LIFO) if the saga fails. Each action should be idempotent.
 #[async_trait]
 pub trait CompensatingAction: Send + Sync {
-    async fn execute(&self, db: &DynMintDatabase) -> Result<(), Error>;
+    async fn execute(&self, db: &DynMintDatabase, pubsub: &PubSubManager) -> Result<(), Error>;
     fn name(&self) -> &'static str;
 }
 
@@ -46,7 +48,7 @@ pub struct RemoveMeltSetup {
 #[async_trait]
 impl CompensatingAction for RemoveMeltSetup {
     #[instrument(skip_all)]
-    async fn execute(&self, db: &DynMintDatabase) -> Result<(), Error> {
+    async fn execute(&self, db: &DynMintDatabase, pubsub: &PubSubManager) -> Result<(), Error> {
         tracing::info!(
             "Compensation: Removing melt setup for quote {} ({} proofs, {} blinded messages, saga {})",
             self.quote_id,
@@ -57,6 +59,7 @@ impl CompensatingAction for RemoveMeltSetup {
 
         super::super::shared::rollback_melt_quote(
             db,
+            pubsub,
             &self.quote_id,
             &self.input_ys,
             &self.blinded_secrets,

+ 1 - 1
crates/cdk/src/mint/melt/melt_saga/mod.rs

@@ -1003,7 +1003,7 @@ impl<S> MeltSaga<S> {
 
         while let Some(compensation) = compensations.pop_front() {
             tracing::debug!("Running compensation: {}", compensation.name());
-            if let Err(e) = compensation.execute(&self.db).await {
+            if let Err(e) = compensation.execute(&self.db, &self.pubsub).await {
                 tracing::error!(
                     "Compensation {} failed: {}. Continuing...",
                     compensation.name(),

+ 3 - 1
crates/cdk/src/mint/melt/mod.rs

@@ -461,7 +461,7 @@ impl Mint {
     ) -> Result<MeltQuoteBolt11Response<QuoteId>, Error> {
         #[cfg(feature = "prometheus")]
         METRICS.inc_in_flight_requests("check_melt_quote");
-        let quote = match self.localstore.get_melt_quote(quote_id).await {
+        let mut quote = match self.localstore.get_melt_quote(quote_id).await {
             Ok(Some(quote)) => quote,
             Ok(None) => {
                 #[cfg(feature = "prometheus")]
@@ -483,6 +483,8 @@ impl Mint {
             }
         };
 
+        self.handle_pending_melt_quote(&mut quote).await?;
+
         let blind_signatures = match self
             .localstore
             .get_blind_signatures_for_quote(quote_id)

+ 22 - 1
crates/cdk/src/mint/melt/shared.rs

@@ -74,6 +74,7 @@ pub fn get_keyset_fee_and_amounts(
 /// Returns database errors if transaction fails
 pub async fn rollback_melt_quote(
     db: &DynMintDatabase,
+    pubsub: &PubSubManager,
     quote_id: &QuoteId,
     input_ys: &[PublicKey],
     blinded_secrets: &[PublicKey],
@@ -93,9 +94,22 @@ pub async fn rollback_melt_quote(
 
     let mut tx = db.begin_transaction().await?;
 
+    let mut proofs_recovered = false;
+
     // Remove input proofs
     if !input_ys.is_empty() {
-        tx.remove_proofs(input_ys, Some(quote_id.clone())).await?;
+        match tx.remove_proofs(input_ys, Some(quote_id.clone())).await {
+            Ok(_) => {
+                proofs_recovered = true;
+            }
+            Err(database::Error::AttemptRemoveSpentProof) => {
+                tracing::warn!(
+                    "Proofs already spent or missing during rollback for quote {}",
+                    quote_id
+                );
+            }
+            Err(e) => return Err(e.into()),
+        }
     }
 
     // Remove blinded messages (change outputs)
@@ -132,6 +146,13 @@ pub async fn rollback_melt_quote(
 
     tx.commit().await?;
 
+    // Publish proof state changes
+    if proofs_recovered {
+        for pk in input_ys.iter() {
+            pubsub.proof_state((*pk, State::Unspent));
+        }
+    }
+
     tracing::info!(
         "Successfully rolled back melt quote {} and deleted saga {}",
         quote_id,

+ 1 - 0
crates/cdk/src/mint/mod.rs

@@ -39,6 +39,7 @@ mod keysets;
 mod ln;
 mod melt;
 mod proofs;
+mod saga_recovery;
 mod start_up_check;
 mod subscription;
 mod swap;

+ 94 - 0
crates/cdk/src/mint/saga_recovery.rs

@@ -0,0 +1,94 @@
+//! Shared saga recovery logic for melt operations.
+//!
+//! This module contains functions used by both startup recovery and on-demand quote checking
+//! to process melt saga outcomes consistently.
+
+use cdk_common::database::DynMintDatabase;
+use cdk_common::mint::{MeltQuote, Saga};
+use cdk_common::nuts::MeltQuoteState;
+use cdk_common::payment::MakePaymentResponse;
+use tracing::instrument;
+
+use crate::mint::subscription::PubSubManager;
+use crate::mint::Mint;
+use crate::Error;
+
+/// Process the outcome of a melt saga based on LN payment status.
+///
+/// This function handles the shared logic for deciding whether to finalize, compensate, or skip
+/// a melt operation based on the payment response from the LN backend.
+///
+/// # Arguments
+/// * `saga` - The melt saga being processed
+/// * `quote` - The melt quote associated with the saga
+/// * `payment_response` - The payment status from the LN backend
+/// * `db` - Database handle
+/// * `pubsub` - PubSub manager for notifications
+/// * `mint` - Mint instance for signing operations
+///
+/// # Returns
+/// Ok(()) on success, or an error if processing fails
+#[instrument(skip_all)]
+pub(crate) async fn process_melt_saga_outcome(
+    saga: &Saga,
+    quote: &mut MeltQuote,
+    payment_response: &MakePaymentResponse,
+    db: &DynMintDatabase,
+    pubsub: &PubSubManager,
+    mint: &Mint,
+) -> Result<(), Error> {
+    match payment_response.status {
+        MeltQuoteState::Paid => {
+            tracing::info!(
+                "Finalizing paid melt quote {} (saga {})",
+                quote.id,
+                saga.operation_id
+            );
+            super::melt::shared::finalize_melt_quote(
+                mint,
+                db,
+                pubsub,
+                quote,
+                payment_response.total_spent.clone(),
+                payment_response.payment_proof.clone(),
+                &payment_response.payment_lookup_id,
+            )
+            .await?;
+            // Delete saga after successful finalization
+            let mut tx = db.begin_transaction().await?;
+            tx.delete_saga(&saga.operation_id).await?;
+            tx.commit().await?;
+        }
+        MeltQuoteState::Unpaid | MeltQuoteState::Failed => {
+            tracing::info!(
+                "Compensating failed melt quote {} (saga {})",
+                quote.id,
+                saga.operation_id
+            );
+            let input_ys = db.get_proof_ys_by_operation_id(&saga.operation_id).await?;
+            let blinded_secrets = db
+                .get_blinded_secrets_by_operation_id(&saga.operation_id)
+                .await?;
+            super::melt::shared::rollback_melt_quote(
+                db,
+                pubsub,
+                &quote.id,
+                &input_ys,
+                &blinded_secrets,
+                &saga.operation_id,
+            )
+            .await?;
+
+            quote.state = MeltQuoteState::Unpaid;
+        }
+        MeltQuoteState::Pending | MeltQuoteState::Unknown => {
+            tracing::debug!(
+                "Melt quote {} (saga {}) payment status still {}, skipping action",
+                quote.id,
+                saga.operation_id,
+                payment_response.status
+            );
+        }
+    }
+    Ok(())
+}

+ 101 - 147
crates/cdk/src/mint/start_up_check.rs

@@ -5,8 +5,7 @@
 
 use std::str::FromStr;
 
-use cdk_common::database::Error as DatabaseError;
-use cdk_common::mint::OperationKind;
+use cdk_common::mint::{OperationKind, Saga};
 use cdk_common::QuoteId;
 
 use super::{Error, Mint};
@@ -15,6 +14,23 @@ use crate::mint::{MeltQuote, MeltQuoteState};
 use crate::types::PaymentProcessorKey;
 
 impl Mint {
+    /// Get incomplete melt saga by quote_id
+    async fn get_melt_saga_by_quote_id(&self, quote_id: &str) -> Result<Option<Saga>, Error> {
+        let incomplete_sagas = self
+            .localstore
+            .get_incomplete_sagas(OperationKind::Melt)
+            .await?;
+
+        for saga in incomplete_sagas {
+            if let Some(ref qid) = saga.quote_id {
+                if qid == quote_id {
+                    return Ok(Some(saga));
+                }
+            }
+        }
+        Ok(None)
+    }
+
     /// Checks the payment status of a melt quote with the LN backend
     ///
     /// This is a helper function used by saga recovery to determine whether to
@@ -145,7 +161,10 @@ impl Mint {
             };
 
             // Execute compensation (includes saga deletion)
-            if let Err(e) = compensation.execute(&self.localstore).await {
+            if let Err(e) = compensation
+                .execute(&self.localstore, &self.pubsub_manager)
+                .await
+            {
                 tracing::error!(
                     "Failed to compensate saga {}: {}. Continuing...",
                     saga.operation_id,
@@ -296,7 +315,7 @@ impl Mint {
                 }
             };
 
-            let quote = match self.localstore.get_melt_quote(&quote_id_parsed).await {
+            let mut quote = match self.localstore.get_melt_quote(&quote_id_parsed).await {
                 Ok(Some(q)) => q,
                 Ok(None) => {
                     tracing::warn!(
@@ -457,68 +476,54 @@ impl Mint {
                     Ok(payment_response) => {
                         match payment_response.status {
                             MeltQuoteState::Paid => {
-                                // Payment succeeded - finalize instead of compensating
-                                tracing::info!(
-                                    "Saga {} for quote {} - payment PAID on LN backend, will finalize",
-                                    saga.operation_id,
-                                    quote_id
-                                );
-
-                                if let Err(err) = self
-                                    .finalize_paid_melt_quote(
-                                        &quote,
-                                        payment_response.total_spent,
-                                        payment_response.payment_proof,
-                                        &payment_response.payment_lookup_id,
-                                    )
-                                    .await
+                                if let Err(err) = super::saga_recovery::process_melt_saga_outcome(
+                                    &saga,
+                                    &mut quote,
+                                    &payment_response,
+                                    &self.localstore,
+                                    &self.pubsub_manager,
+                                    self,
+                                )
+                                .await
                                 {
                                     tracing::error!(
-                                        "Failed to finalize paid melt saga {}: {}. Will retry on next recovery cycle.",
+                                        "Failed to process paid melt saga {}: {}. Will retry on next recovery cycle.",
                                         saga.operation_id,
                                         err
                                     );
                                     continue;
                                 }
-
-                                // Delete saga after successful finalization
-                                let mut tx = self.localstore.begin_transaction().await?;
-                                if let Err(e) = tx.delete_saga(&saga.operation_id).await {
+                                continue; // Saga handled
+                            }
+                            MeltQuoteState::Unpaid | MeltQuoteState::Failed => {
+                                if let Err(err) = super::saga_recovery::process_melt_saga_outcome(
+                                    &saga,
+                                    &mut quote,
+                                    &payment_response,
+                                    &self.localstore,
+                                    &self.pubsub_manager,
+                                    self,
+                                )
+                                .await
+                                {
                                     tracing::error!(
-                                        "Failed to delete saga {}: {}. Will retry on next recovery cycle.",
+                                        "Failed to process failed melt saga {}: {}. Will retry on next recovery cycle.",
                                         saga.operation_id,
-                                        e
+                                        err
                                     );
-                                    tx.rollback().await?;
                                     continue;
                                 }
-                                tx.commit().await?;
-                                tracing::info!(
-                                    "Successfully recovered and finalized melt saga {}",
-                                    saga.operation_id
-                                );
-
-                                continue; // Skip compensation, saga handled
-                            }
-                            MeltQuoteState::Unpaid | MeltQuoteState::Failed => {
-                                // Payment failed - compensate
-                                tracing::info!(
-                                    "Saga {} for quote {} - payment {} on LN backend, will compensate",
-                                    saga.operation_id,
-                                    quote_id,
-                                    payment_response.status
-                                );
-                                true
+                                continue; // Saga handled
                             }
                             MeltQuoteState::Pending | MeltQuoteState::Unknown => {
                                 // Payment still pending - skip for check_pending_melt_quotes
                                 tracing::info!(
-                                    "Saga {} for quote {} - payment {} on LN backend, skipping (will be handled by check_pending_melt_quotes)",
+                                    "Saga {} for quote {} - payment {} on LN backend, skipping",
                                     saga.operation_id,
                                     quote_id,
                                     payment_response.status
                                 );
-                                continue; // Skip this saga, don't compensate or finalize
+                                continue; // Skip this saga
                             }
                         }
                     }
@@ -544,112 +549,23 @@ impl Mint {
                     blinded_secrets.len()
                 );
 
-                let mut tx = self.localstore.begin_transaction().await?;
-
-                // Remove blinded messages (change outputs)
-                if !blinded_secrets.is_empty() {
-                    if let Err(e) = tx.delete_blinded_messages(&blinded_secrets).await {
-                        tracing::error!(
-                            "Failed to delete blinded messages for saga {}: {}",
-                            saga.operation_id,
-                            e
-                        );
-                        tx.rollback().await?;
-                        continue;
-                    }
-                }
-
-                // Remove proofs (inputs)
-                if !input_ys.is_empty() {
-                    match tx.remove_proofs(&input_ys, None).await {
-                        Ok(()) => {}
-                        Err(DatabaseError::AttemptRemoveSpentProof) => {
-                            // Proofs are already spent or missing - this is okay for compensation.
-                            // The goal is to make proofs unusable, and they already are.
-                            // Continue with saga deletion to avoid infinite recovery loop.
-                            tracing::warn!(
-                                "Saga {} compensation: proofs already spent or missing, proceeding with saga cleanup",
-                                saga.operation_id
-                            );
-                        }
-                        Err(e) => {
-                            tracing::error!(
-                                "Failed to remove proofs for saga {}: {}",
-                                saga.operation_id,
-                                e
-                            );
-                            tx.rollback().await?;
-                            continue;
-                        }
-                    }
-                }
-
-                // Reset quote state to Unpaid (melt-specific, unlike swap)
-                // Acquire lock on the quote first
-                let mut locked_quote = match tx.get_melt_quote(&quote_id_parsed).await {
-                    Ok(Some(q)) => q,
-                    Ok(None) => {
-                        tracing::warn!(
-                            "Melt quote {} not found for saga {} - may have been cleaned up",
-                            quote_id_parsed,
-                            saga.operation_id
-                        );
-                        // Continue with saga deletion even if quote is gone
-                        if let Err(e) = tx.delete_saga(&saga.operation_id).await {
-                            tracing::error!("Failed to delete saga {}: {}", saga.operation_id, e);
-                            tx.rollback().await?;
-                            continue;
-                        }
-                        tx.commit().await?;
-                        continue;
-                    }
-                    Err(e) => {
-                        tracing::error!(
-                            "Failed to get quote for saga {}: {}",
-                            saga.operation_id,
-                            e
-                        );
-                        tx.rollback().await?;
-                        continue;
-                    }
-                };
-
-                if let Err(e) = tx
-                    .update_melt_quote_state(&mut locked_quote, MeltQuoteState::Unpaid, None)
-                    .await
+                if let Err(err) = super::melt::shared::rollback_melt_quote(
+                    &self.localstore,
+                    &self.pubsub_manager,
+                    &quote_id_parsed,
+                    &input_ys,
+                    &blinded_secrets,
+                    &saga.operation_id,
+                )
+                .await
                 {
                     tracing::error!(
-                        "Failed to reset quote state for saga {}: {}",
-                        saga.operation_id,
-                        e
-                    );
-                    tx.rollback().await?;
-                    continue;
-                }
-
-                // Delete melt request tracking record
-                if let Err(e) = tx.delete_melt_request(&quote_id_parsed).await {
-                    tracing::error!(
-                        "Failed to delete melt request for saga {}: {}",
+                        "Failed to rollback melt quote {} for saga {}: {}",
+                        quote_id_parsed,
                         saga.operation_id,
-                        e
+                        err
                     );
-                    // Don't fail if melt request doesn't exist - it might not have been created yet
-                }
-
-                // Delete saga after successful compensation
-                if let Err(e) = tx.delete_saga(&saga.operation_id).await {
-                    tracing::error!("Failed to delete saga for {}: {}", saga.operation_id, e);
-                    tx.rollback().await?;
-                    continue;
                 }
-
-                tx.commit().await?;
-
-                tracing::info!(
-                    "Successfully recovered and compensated melt saga {}",
-                    saga.operation_id
-                );
             }
         }
 
@@ -660,4 +576,42 @@ impl Mint {
 
         Ok(())
     }
+
+    /// Handle pending melt quote by resuming the saga
+    pub(crate) async fn handle_pending_melt_quote(
+        &self,
+        quote: &mut MeltQuote,
+    ) -> Result<(), Error> {
+        if quote.state != MeltQuoteState::Pending {
+            return Ok(());
+        }
+
+        let saga = match self
+            .get_melt_saga_by_quote_id(&quote.id.to_string())
+            .await?
+        {
+            Some(saga) => saga,
+            None => {
+                tracing::warn!(
+                    "No saga found for pending melt quote {}, cannot resume",
+                    quote.id
+                );
+                return Ok(());
+            }
+        };
+
+        let payment_response = self.check_melt_payment_status(quote).await?;
+
+        super::saga_recovery::process_melt_saga_outcome(
+            &saga,
+            quote,
+            &payment_response,
+            &self.localstore,
+            &self.pubsub_manager,
+            self,
+        )
+        .await?;
+
+        Ok(())
+    }
 }

+ 2 - 0
crates/cdk/src/mint/subscription.rs

@@ -62,6 +62,8 @@ impl MintPubSubSpec {
             match idx {
                 NotificationId::ProofState(pk) => public_keys.push(*pk),
                 NotificationId::MeltQuoteBolt11(uuid) | NotificationId::MeltQuoteBolt12(uuid) => {
+                    // TODO: In the HTTP handler, we check with the LN backend if a payment is in a pending quote state to resolve stuck payments.
+                    // Implement similar logic here for WebSocket-only wallets.
                     if let Some(melt_quote) = self
                         .db
                         .get_melt_quote(uuid)

+ 4 - 2
crates/cdk/src/mint/swap/swap_saga/compensation.rs

@@ -4,9 +4,11 @@ use cdk_common::{Error, PublicKey};
 use tracing::instrument;
 use uuid::Uuid;
 
+use crate::mint::subscription::PubSubManager;
+
 #[async_trait]
 pub trait CompensatingAction: Send + Sync {
-    async fn execute(&self, db: &DynMintDatabase) -> Result<(), Error>;
+    async fn execute(&self, db: &DynMintDatabase, pubsub: &PubSubManager) -> Result<(), Error>;
     fn name(&self) -> &'static str;
 }
 
@@ -31,7 +33,7 @@ pub struct RemoveSwapSetup {
 #[async_trait]
 impl CompensatingAction for RemoveSwapSetup {
     #[instrument(skip_all)]
-    async fn execute(&self, db: &DynMintDatabase) -> Result<(), Error> {
+    async fn execute(&self, db: &DynMintDatabase, _pubsub: &PubSubManager) -> Result<(), Error> {
         if self.blinded_secrets.is_empty() && self.input_ys.is_empty() {
             return Ok(());
         }

+ 1 - 1
crates/cdk/src/mint/swap/swap_saga/mod.rs

@@ -484,7 +484,7 @@ impl<S> SwapSaga<'_, S> {
 
         while let Some(compensation) = compensations.pop_front() {
             tracing::debug!("Running compensation: {}", compensation.name());
-            if let Err(e) = compensation.execute(&self.db).await {
+            if let Err(e) = compensation.execute(&self.db, &self.pubsub).await {
                 tracing::error!(
                     "Compensation {} failed: {}. Continuing...",
                     compensation.name(),