Преглед изворни кода

Fix race condition when concurrent payments are processed for the same payment_id (#1304)

* fix race condition when concurrent payments are processed for the same payment_id


---------

Co-authored-by: Dustin Dannenhauer <dustin.td@gmail.com>
C пре 2 месеци
родитељ
комит
2fa92cb344
3 измењених фајлова са 203 додато и 14 уклоњено
  1. 163 0
      crates/cdk-integration-tests/tests/mint.rs
  2. 24 10
      crates/cdk/src/mint/ln.rs
  3. 16 4
      crates/cdk/src/mint/mod.rs

+ 163 - 0
crates/cdk-integration-tests/tests/mint.rs

@@ -98,3 +98,166 @@ async fn test_correct_keyset() {
 
     assert_ne!(new_keyset_info.id, keyset_info.id);
 }
+
+/// Test concurrent payment processing to verify race condition fix
+///
+/// This test simulates the real-world race condition where multiple concurrent
+/// payment notifications arrive for the same payment_id. Before the fix, this
+/// would cause "Payment ID already exists" errors. After the fix, all but one
+/// should gracefully handle the duplicate and return a Duplicate error.
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn test_concurrent_duplicate_payment_handling() {
+    use cashu::PaymentMethod;
+    use cdk::cdk_database::{MintDatabase, MintQuotesDatabase};
+    use cdk::mint::MintQuote;
+    use cdk::Amount;
+    use cdk_common::payment::PaymentIdentifier;
+    use tokio::task::JoinSet;
+
+    // Create a test mint with in-memory database
+    let mnemonic = Mnemonic::generate(12).unwrap();
+    let fee_reserve = FeeReserve {
+        min_fee_reserve: 1.into(),
+        percent_fee_reserve: 1.0,
+    };
+
+    let database = Arc::new(memory::empty().await.expect("valid db instance"));
+
+    let fake_wallet = FakeWallet::new(
+        fee_reserve,
+        HashMap::default(),
+        HashSet::default(),
+        0,
+        CurrencyUnit::Sat,
+    );
+
+    let mut mint_builder = MintBuilder::new(database.clone());
+
+    mint_builder = mint_builder
+        .with_name("concurrent test mint".to_string())
+        .with_description("testing concurrent payment handling".to_string());
+
+    mint_builder
+        .add_payment_processor(
+            CurrencyUnit::Sat,
+            PaymentMethod::Bolt11,
+            MintMeltLimits::new(1, 5_000),
+            Arc::new(fake_wallet),
+        )
+        .await
+        .unwrap();
+
+    let mint = mint_builder
+        .build_with_seed(database.clone(), &mnemonic.to_seed_normalized(""))
+        .await
+        .unwrap();
+
+    let quote_ttl = QuoteTTL::new(10000, 10000);
+    mint.set_quote_ttl(quote_ttl).await.unwrap();
+
+    // Create a mint quote
+    let current_time = cdk::util::unix_time();
+    let mint_quote = MintQuote::new(
+        None,
+        "concurrent_test_invoice".to_string(),
+        CurrencyUnit::Sat,
+        Some(Amount::from(1000)),
+        current_time + 3600, // expires in 1 hour
+        PaymentIdentifier::CustomId("test_lookup_id".to_string()),
+        None,
+        Amount::ZERO,
+        Amount::ZERO,
+        PaymentMethod::Bolt11,
+        current_time,
+        vec![],
+        vec![],
+    );
+
+    // Add the quote to the database
+    {
+        let mut tx = MintDatabase::begin_transaction(&*database).await.unwrap();
+        tx.add_mint_quote(mint_quote.clone()).await.unwrap();
+        tx.commit().await.unwrap();
+    }
+
+    // Simulate 10 concurrent payment notifications with the SAME payment_id
+    let payment_id = "duplicate_payment_test_12345";
+    let mut join_set = JoinSet::new();
+
+    for i in 0..10 {
+        let db_clone = database.clone();
+        let quote_id = mint_quote.id.clone();
+        let payment_id_clone = payment_id.to_string();
+
+        join_set.spawn(async move {
+            let mut tx = MintDatabase::begin_transaction(&*db_clone).await.unwrap();
+            let result = tx
+                .increment_mint_quote_amount_paid(&quote_id, Amount::from(10), payment_id_clone)
+                .await;
+
+            if result.is_ok() {
+                tx.commit().await.unwrap();
+            }
+
+            (i, result)
+        });
+    }
+
+    // Collect results
+    let mut success_count = 0;
+    let mut duplicate_errors = 0;
+    let mut other_errors = Vec::new();
+
+    while let Some(result) = join_set.join_next().await {
+        let (task_id, db_result) = result.unwrap();
+        match db_result {
+            Ok(_) => success_count += 1,
+            Err(e) => {
+                let err_str = format!("{:?}", e);
+                if err_str.contains("Duplicate") {
+                    duplicate_errors += 1;
+                } else {
+                    other_errors.push((task_id, err_str));
+                }
+            }
+        }
+    }
+
+    // Verify results
+    assert_eq!(
+        success_count, 1,
+        "Exactly one task should successfully process the payment (got {})",
+        success_count
+    );
+    assert_eq!(
+        duplicate_errors, 9,
+        "Nine tasks should receive Duplicate error (got {})",
+        duplicate_errors
+    );
+    assert!(
+        other_errors.is_empty(),
+        "No unexpected errors should occur. Got: {:?}",
+        other_errors
+    );
+
+    // Verify the quote was incremented exactly once
+    let final_quote = MintQuotesDatabase::get_mint_quote(&*database, &mint_quote.id)
+        .await
+        .unwrap()
+        .expect("Quote should exist");
+
+    assert_eq!(
+        final_quote.amount_paid(),
+        Amount::from(10),
+        "Quote amount should be incremented exactly once"
+    );
+    assert_eq!(
+        final_quote.payments.len(),
+        1,
+        "Should have exactly one payment recorded"
+    );
+    assert_eq!(
+        final_quote.payments[0].payment_id, payment_id,
+        "Payment ID should match"
+    );
+}

+ 24 - 10
crates/cdk/src/mint/ln.rs

@@ -7,7 +7,7 @@ use cdk_common::database::DynMintDatabase;
 use cdk_common::mint::MintQuote;
 use cdk_common::payment::DynMintPayment;
 use cdk_common::util::unix_time;
-use cdk_common::{Amount, MintQuoteState, PaymentMethod};
+use cdk_common::{database, Amount, MintQuoteState, PaymentMethod};
 use tracing::instrument;
 
 use super::subscription::PubSubManager;
@@ -83,15 +83,29 @@ impl Mint {
 
                 let amount_paid = to_unit(payment.payment_amount, &payment.unit, &quote.unit)?;
 
-                quote.increment_amount_paid(amount_paid)?;
-                quote.add_payment(amount_paid, payment.payment_id.clone(), unix_time())?;
-
-                let total_paid = tx
-                    .increment_mint_quote_amount_paid(&quote.id, amount_paid, payment.payment_id)
-                    .await?;
-
-                if let Some(pubsub_manager) = pubsub_manager.as_ref() {
-                    pubsub_manager.mint_quote_payment(quote, total_paid);
+                match tx
+                    .increment_mint_quote_amount_paid(
+                        &quote.id,
+                        amount_paid,
+                        payment.payment_id.clone(),
+                    )
+                    .await
+                {
+                    Ok(total_paid) => {
+                        quote.increment_amount_paid(amount_paid)?;
+                        quote.add_payment(amount_paid, payment.payment_id.clone(), unix_time())?;
+                        if let Some(pubsub_manager) = pubsub_manager.as_ref() {
+                            pubsub_manager.mint_quote_payment(quote, total_paid);
+                        }
+                    }
+                    Err(database::Error::Duplicate) => {
+                        tracing::debug!(
+                            "Payment ID {} already processed (caught race condition in check_mint_quote_paid)",
+                            payment.payment_id
+                        );
+                        // This is fine - another concurrent request already processed this payment
+                    }
+                    Err(e) => return Err(e.into()),
                 }
             }
         }

+ 16 - 4
crates/cdk/src/mint/mod.rs

@@ -749,14 +749,26 @@ impl Mint {
                     payment_amount_quote_unit
                 );
 
-                let total_paid = tx
+                match tx
                     .increment_mint_quote_amount_paid(
                         &mint_quote.id,
                         payment_amount_quote_unit,
-                        wait_payment_response.payment_id,
+                        wait_payment_response.payment_id.clone(),
                     )
-                    .await?;
-                pubsub_manager.mint_quote_payment(mint_quote, total_paid);
+                    .await
+                {
+                    Ok(total_paid) => {
+                        pubsub_manager.mint_quote_payment(mint_quote, total_paid);
+                    }
+                    Err(database::Error::Duplicate) => {
+                        tracing::info!(
+                            "Payment ID {} already processed (caught race condition)",
+                            wait_payment_response.payment_id
+                        );
+                        // This is fine - another concurrent request already processed this payment
+                    }
+                    Err(e) => return Err(e.into()),
+                }
             }
         } else {
             tracing::info!("Received payment notification for already seen payment.");