Browse Source

fix race condition when concurrent payments are processed for the same payment_id

Dustin Dannenhauer 2 weeks ago
parent
commit
5a50e243ff

+ 163 - 0
crates/cdk-integration-tests/tests/mint.rs

@@ -98,3 +98,166 @@ async fn test_correct_keyset() {
 
     assert_ne!(new_keyset_info.id, keyset_info.id);
 }
+
+/// Test concurrent payment processing to verify race condition fix
+///
+/// This test simulates the real-world race condition where multiple concurrent
+/// payment notifications arrive for the same payment_id. Before the fix, this
+/// would cause "Payment ID already exists" errors. After the fix, all but one
+/// should gracefully handle the duplicate and return a Duplicate error.
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn test_concurrent_duplicate_payment_handling() {
+    use cdk::cdk_database::{MintDatabase, MintQuotesDatabase};
+    use cdk::mint::MintQuote;
+    use cdk::Amount;
+    use cashu::PaymentMethod;
+    use cdk_common::payment::PaymentIdentifier;
+    use tokio::task::JoinSet;
+
+    // Create a test mint with in-memory database
+    let mnemonic = Mnemonic::generate(12).unwrap();
+    let fee_reserve = FeeReserve {
+        min_fee_reserve: 1.into(),
+        percent_fee_reserve: 1.0,
+    };
+
+    let database = Arc::new(memory::empty().await.expect("valid db instance"));
+
+    let fake_wallet = FakeWallet::new(
+        fee_reserve,
+        HashMap::default(),
+        HashSet::default(),
+        0,
+        CurrencyUnit::Sat,
+    );
+
+    let mut mint_builder = MintBuilder::new(database.clone());
+
+    mint_builder = mint_builder
+        .with_name("concurrent test mint".to_string())
+        .with_description("testing concurrent payment handling".to_string());
+
+    mint_builder
+        .add_payment_processor(
+            CurrencyUnit::Sat,
+            PaymentMethod::Bolt11,
+            MintMeltLimits::new(1, 5_000),
+            Arc::new(fake_wallet),
+        )
+        .await
+        .unwrap();
+
+    let mint = mint_builder
+        .build_with_seed(database.clone(), &mnemonic.to_seed_normalized(""))
+        .await
+        .unwrap();
+
+    let quote_ttl = QuoteTTL::new(10000, 10000);
+    mint.set_quote_ttl(quote_ttl).await.unwrap();
+
+    // Create a mint quote
+    let current_time = cdk::util::unix_time();
+    let mint_quote = MintQuote::new(
+        None,
+        "concurrent_test_invoice".to_string(),
+        CurrencyUnit::Sat,
+        Some(Amount::from(1000)),
+        current_time + 3600, // expires in 1 hour
+        PaymentIdentifier::CustomId("test_lookup_id".to_string()),
+        None,
+        Amount::ZERO,
+        Amount::ZERO,
+        PaymentMethod::Bolt11,
+        current_time,
+        vec![],
+        vec![],
+    );
+
+    // Add the quote to the database
+    {
+        let mut tx = MintDatabase::begin_transaction(&*database).await.unwrap();
+        tx.add_mint_quote(mint_quote.clone()).await.unwrap();
+        tx.commit().await.unwrap();
+    }
+
+    // Simulate 10 concurrent payment notifications with the SAME payment_id
+    let payment_id = "duplicate_payment_test_12345";
+    let mut join_set = JoinSet::new();
+
+    for i in 0..10 {
+        let db_clone = database.clone();
+        let quote_id = mint_quote.id.clone();
+        let payment_id_clone = payment_id.to_string();
+
+        join_set.spawn(async move {
+            let mut tx = MintDatabase::begin_transaction(&*db_clone).await.unwrap();
+            let result = tx
+                .increment_mint_quote_amount_paid(&quote_id, Amount::from(10), payment_id_clone)
+                .await;
+
+            if result.is_ok() {
+                tx.commit().await.unwrap();
+            }
+
+            (i, result)
+        });
+    }
+
+    // Collect results
+    let mut success_count = 0;
+    let mut duplicate_errors = 0;
+    let mut other_errors = Vec::new();
+
+    while let Some(result) = join_set.join_next().await {
+        let (task_id, db_result) = result.unwrap();
+        match db_result {
+            Ok(_) => success_count += 1,
+            Err(e) => {
+                let err_str = format!("{:?}", e);
+                if err_str.contains("Duplicate") {
+                    duplicate_errors += 1;
+                } else {
+                    other_errors.push((task_id, err_str));
+                }
+            }
+        }
+    }
+
+    // Verify results
+    assert_eq!(
+        success_count, 1,
+        "Exactly one task should successfully process the payment (got {})",
+        success_count
+    );
+    assert_eq!(
+        duplicate_errors, 9,
+        "Nine tasks should receive Duplicate error (got {})",
+        duplicate_errors
+    );
+    assert!(
+        other_errors.is_empty(),
+        "No unexpected errors should occur. Got: {:?}",
+        other_errors
+    );
+
+    // Verify the quote was incremented exactly once
+    let final_quote = MintQuotesDatabase::get_mint_quote(&*database, &mint_quote.id)
+        .await
+        .unwrap()
+        .expect("Quote should exist");
+
+    assert_eq!(
+        final_quote.amount_paid(),
+        Amount::from(10),
+        "Quote amount should be incremented exactly once"
+    );
+    assert_eq!(
+        final_quote.payments.len(),
+        1,
+        "Should have exactly one payment recorded"
+    );
+    assert_eq!(
+        final_quote.payments[0].payment_id, payment_id,
+        "Payment ID should match"
+    );
+}

+ 40 - 42
crates/cdk-sql-common/src/mint/mod.rs

@@ -765,25 +765,8 @@ where
             return Err(Error::Duplicate);
         }
 
-        // Check if payment_id already exists in mint_quote_payments
-        let exists = query(
-            r#"
-            SELECT payment_id
-            FROM mint_quote_payments
-            WHERE payment_id = :payment_id
-            FOR UPDATE
-            "#,
-        )?
-        .bind("payment_id", payment_id.clone())
-        .fetch_one(&self.inner)
-        .await?;
-
-        if exists.is_some() {
-            tracing::error!("Payment ID already exists: {}", payment_id);
-            return Err(database::Error::Duplicate);
-        }
-
-        // Get current amount_paid from quote
+        // Step 1: Lock the mint_quote row FIRST to prevent deadlocks
+        // This establishes a consistent lock ordering across all transactions
         let current_amount = query(
             r#"
             SELECT amount_paid
@@ -796,7 +779,7 @@ where
         .fetch_one(&self.inner)
         .await
         .inspect_err(|err| {
-            tracing::error!("SQLite could not get mint quote amount_paid: {}", err);
+            tracing::error!("Could not get mint quote amount_paid: {}", err);
         })?;
 
         let current_amount_paid = if let Some(current_amount) = current_amount {
@@ -806,7 +789,41 @@ where
             Amount::ZERO
         };
 
-        // Calculate new amount_paid with overflow check
+        // Step 2: Try to insert payment_id - this will fail fast if it's a duplicate
+        // The UNIQUE constraint on payment_id will prevent duplicate insertions atomically
+        let insert_result = query(
+            r#"
+            INSERT INTO mint_quote_payments
+            (quote_id, payment_id, amount, timestamp)
+            VALUES (:quote_id, :payment_id, :amount, :timestamp)
+            "#,
+        )?
+        .bind("quote_id", quote_id.to_string())
+        .bind("payment_id", payment_id.clone())
+        .bind("amount", amount_paid.to_i64())
+        .bind("timestamp", unix_time() as i64)
+        .execute(&self.inner)
+        .await;
+
+        // Check if insert failed due to duplicate payment_id
+        match insert_result {
+            Ok(_) => {
+                // Insert succeeded - now safe to update the amount
+            }
+            Err(err) => {
+                // Check if error is due to UNIQUE constraint violation
+                let err_msg = err.to_string().to_lowercase();
+                if err_msg.contains("unique") || err_msg.contains("duplicate") {
+                    tracing::debug!("Payment ID already processed: {}", payment_id);
+                    return Err(database::Error::Duplicate);
+                } else {
+                    tracing::error!("Could not insert payment ID: {}", err);
+                    return Err(err);
+                }
+            }
+        }
+
+        // Step 3: Calculate new amount_paid with overflow check
         let new_amount_paid = current_amount_paid
             .checked_add(amount_paid)
             .ok_or_else(|| database::Error::AmountOverflow)?;
@@ -818,7 +835,7 @@ where
             new_amount_paid
         );
 
-        // Update the amount_paid
+        // Step 4: Update the amount_paid
         query(
             r#"
             UPDATE mint_quote
@@ -831,26 +848,7 @@ where
         .execute(&self.inner)
         .await
         .inspect_err(|err| {
-            tracing::error!("SQLite could not update mint quote amount_paid: {}", err);
-        })?;
-
-        // Add payment_id to mint_quote_payments table
-        query(
-            r#"
-            INSERT INTO mint_quote_payments
-            (quote_id, payment_id, amount, timestamp)
-            VALUES (:quote_id, :payment_id, :amount, :timestamp)
-            "#,
-        )?
-        .bind("quote_id", quote_id.to_string())
-        .bind("payment_id", payment_id)
-        .bind("amount", amount_paid.to_i64())
-        .bind("timestamp", unix_time() as i64)
-        .execute(&self.inner)
-        .await
-        .map_err(|err| {
-            tracing::error!("SQLite could not insert payment ID: {}", err);
-            err
+            tracing::error!("Could not update mint quote amount_paid: {}", err);
         })?;
 
         Ok(new_amount_paid)

+ 25 - 9
crates/cdk/src/mint/ln.rs

@@ -3,6 +3,7 @@ use std::sync::Arc;
 
 use cdk_common::amount::to_unit;
 use cdk_common::common::PaymentProcessorKey;
+use cdk_common::database;
 use cdk_common::database::DynMintDatabase;
 use cdk_common::mint::MintQuote;
 use cdk_common::payment::DynMintPayment;
@@ -83,15 +84,30 @@ impl Mint {
 
                 let amount_paid = to_unit(payment.payment_amount, &payment.unit, &quote.unit)?;
 
-                quote.increment_amount_paid(amount_paid)?;
-                quote.add_payment(amount_paid, payment.payment_id.clone(), unix_time())?;
-
-                let total_paid = tx
-                    .increment_mint_quote_amount_paid(&quote.id, amount_paid, payment.payment_id)
-                    .await?;
-
-                if let Some(pubsub_manager) = pubsub_manager.as_ref() {
-                    pubsub_manager.mint_quote_payment(quote, total_paid);
+                match tx
+                    .increment_mint_quote_amount_paid(
+                        &quote.id,
+                        amount_paid,
+                        payment.payment_id.clone(),
+                    )
+                    .await
+                {
+                    Ok(total_paid) => {
+                        quote.increment_amount_paid(amount_paid)?;
+                        quote.add_payment(amount_paid, payment.payment_id.clone(), unix_time())?;
+                        if let Some(pubsub_manager) = pubsub_manager.as_ref() {
+                            pubsub_manager.mint_quote_payment(quote, total_paid);
+                        }
+                    }
+                    Err(database::Error::Duplicate) => {
+                        tracing::debug!(
+                            "Payment ID {} already processed (caught race condition in check_mint_quote_paid)",
+                            payment.payment_id
+                        );
+                        // This is fine - another concurrent request already processed this payment
+                        // The in-memory check at line 49 can miss this due to stale data
+                    }
+                    Err(e) => return Err(e.into()),
                 }
             }
         }

+ 17 - 4
crates/cdk/src/mint/mod.rs

@@ -749,14 +749,27 @@ impl Mint {
                     payment_amount_quote_unit
                 );
 
-                let total_paid = tx
+                match tx
                     .increment_mint_quote_amount_paid(
                         &mint_quote.id,
                         payment_amount_quote_unit,
-                        wait_payment_response.payment_id,
+                        wait_payment_response.payment_id.clone(),
                     )
-                    .await?;
-                pubsub_manager.mint_quote_payment(mint_quote, total_paid);
+                    .await
+                {
+                    Ok(total_paid) => {
+                        pubsub_manager.mint_quote_payment(mint_quote, total_paid);
+                    }
+                    Err(database::Error::Duplicate) => {
+                        tracing::info!(
+                            "Payment ID {} already processed (caught race condition)",
+                            wait_payment_response.payment_id
+                        );
+                        // This is fine - another concurrent request already processed this payment
+                        // The in-memory check at line 712 can miss this due to stale data
+                    }
+                    Err(e) => return Err(e.into()),
+                }
             }
         } else {
             tracing::info!("Received payment notification for already seen payment.");