Parcourir la source

Do not perform external calls during a database transaction. (#954)

The codebase was used to correctly perform signatory calls during a database
transaction, as the signatory was previously exclusively in process. However, a
few months ago, it was changed to be a trait that can be either local or
remote. Making external calls to services, adding latency, during an ongoing
database transaction is a bad idea because it will lock the rows until the
service call is finalized, which is unpredictable.

The issue is even worse in our pipeline where the SQLite storage driver is used
with the ":memory:" path, which forces the Database pool to have a size of 1.
Since our tests run in parallel, they would randomly fail.

This issue was failing in the CI, but the error was not making the pipeline
fail. This bug was fixed as well.
C il y a 2 mois
Parent
commit
64f7b07855

+ 2 - 5
crates/cashu/src/nuts/nut04.rs

@@ -331,7 +331,7 @@ mod tests {
 
         match settings.options {
             Some(MintMethodOptions::Bolt11 { description }) => {
-                assert_eq!(description, true);
+                assert!(description);
             }
             _ => panic!("Expected Bolt11 options with description = true"),
         }
@@ -363,10 +363,7 @@ mod tests {
 
         match settings.options {
             Some(MintMethodOptions::Bolt11 { description }) => {
-                assert_eq!(
-                    description, true,
-                    "Top-level description should take precedence"
-                );
+                assert!(description, "Top-level description should take precedence");
             }
             _ => panic!("Expected Bolt11 options with description = true"),
         }

+ 2 - 5
crates/cashu/src/nuts/nut05.rs

@@ -405,7 +405,7 @@ mod tests {
 
         match settings.options {
             Some(MeltMethodOptions::Bolt11 { amountless }) => {
-                assert_eq!(amountless, true);
+                assert!(amountless);
             }
             _ => panic!("Expected Bolt11 options with amountless = true"),
         }
@@ -437,10 +437,7 @@ mod tests {
 
         match settings.options {
             Some(MeltMethodOptions::Bolt11 { amountless }) => {
-                assert_eq!(
-                    amountless, true,
-                    "Top-level amountless should take precedence"
-                );
+                assert!(amountless, "Top-level amountless should take precedence");
             }
             _ => panic!("Expected Bolt11 options with amountless = true"),
         }

+ 1 - 1
crates/cashu/src/nuts/nut18/payment_request.rs

@@ -404,7 +404,7 @@ mod tests {
         let bolt11 = Bolt11Invoice::from_str(bolt11).unwrap();
 
         let nut10 = SpendingConditions::HTLCConditions {
-            data: bolt11.payment_hash().clone(),
+            data: *bolt11.payment_hash(),
             conditions: None,
         };
 

+ 1 - 1
crates/cdk-integration-tests/src/lib.rs

@@ -177,7 +177,7 @@ pub async fn wait_for_mint_to_be_paid(
         result = timeout_future => {
             match result {
                 Ok(payment_result) => payment_result,
-                Err(_) => Err(anyhow!("Timeout waiting for mint quote to be paid")),
+                Err(_) => Err(anyhow!("Timeout waiting for mint quote ({}) to be paid", mint_quote_id)),
             }
         }
         result = periodic_task => {

+ 1 - 0
crates/cdk-integration-tests/tests/fake_wallet.rs

@@ -1204,6 +1204,7 @@ async fn test_fake_mint_duplicate_proofs_swap() {
 
     let blinded_message = pre_mint.blinded_messages();
 
+    let inputs = vec![proofs[0].clone()];
     let outputs = vec![blinded_message[0].clone(), blinded_message[0].clone()];
 
     let swap_request = SwapRequest::new(inputs, outputs);

+ 1 - 2
crates/cdk-integration-tests/tests/integration_tests_pure.rs

@@ -321,8 +321,7 @@ async fn test_attempt_to_swap_by_overflowing() {
             cdk::Error::AmountOverflow => (),
             cdk::Error::AmountError(_) => (),
             _ => {
-                println!("{:?}", err);
-                panic!("Wrong error returned in swap overflow")
+                panic!("Wrong error returned in swap overflow {:?}", err);
             }
         },
     }

+ 8 - 0
crates/cdk-sql-common/src/pool.rs

@@ -8,6 +8,8 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::{Arc, Condvar, Mutex};
 use std::time::Duration;
 
+use tokio::time::Instant;
+
 use crate::database::DatabaseConnector;
 
 /// Pool error
@@ -163,6 +165,7 @@ where
         timeout: Duration,
     ) -> Result<PooledResource<RM>, Error<RM::Error>> {
         let mut resources = self.queue.lock().map_err(|_| Error::Poison)?;
+        let time = Instant::now();
 
         loop {
             if let Some((stale, resource)) = resources.pop() {
@@ -197,6 +200,11 @@ where
                 .map_err(|_| Error::Poison)
                 .and_then(|(lock, timeout_result)| {
                     if timeout_result.timed_out() {
+                        tracing::warn!(
+                            "Timeout waiting for the resource (pool size: {}). Waited {} ms",
+                            self.max_size,
+                            time.elapsed().as_millis()
+                        );
                         Err(Error::Timeout)
                     } else {
                         Ok(lock)

+ 6 - 7
crates/cdk/src/mint/issue/mod.rs

@@ -496,6 +496,12 @@ impl Mint {
 
         self.check_mint_quote_paid(&mut mint_quote).await?;
 
+        // get the blind signatures before having starting the db transaction, if there are any
+        // rollbacks this blind_signatures will be lost, and the signature is stateless. It is not a
+        // good idea to call an external service (which is really a trait, it could be anything
+        // anywhere) while keeping a database transaction on-going
+        let blind_signatures = self.blind_sign(mint_request.outputs.clone()).await?;
+
         let mut tx = self.localstore.begin_transaction().await?;
 
         let mint_quote = tx
@@ -568,13 +574,6 @@ impl Mint {
         let unit = unit.ok_or(Error::UnsupportedUnit).unwrap();
         ensure_cdk!(unit == mint_quote.unit, Error::UnsupportedUnit);
 
-        let mut blind_signatures = Vec::with_capacity(mint_request.outputs.len());
-
-        for blinded_message in mint_request.outputs.iter() {
-            let blind_signature = self.blind_sign(blinded_message.clone()).await?;
-            blind_signatures.push(blind_signature);
-        }
-
         tx.add_blind_signatures(
             &mint_request
                 .outputs

+ 24 - 11
crates/cdk/src/mint/melt.rs

@@ -448,6 +448,7 @@ impl Mint {
     pub async fn verify_melt_request(
         &self,
         tx: &mut Box<dyn MintTransaction<'_, database::Error> + Send + Sync + '_>,
+        input_verification: Verification,
         melt_request: &MeltRequest<Uuid>,
     ) -> Result<(ProofWriter, MeltQuote), Error> {
         let (state, quote) = tx
@@ -467,7 +468,7 @@ impl Mint {
         let Verification {
             amount: input_amount,
             unit: input_unit,
-        } = self.verify_inputs(melt_request.inputs()).await?;
+        } = input_verification;
 
         ensure_cdk!(input_unit.is_some(), Error::UnsupportedUnit);
 
@@ -545,10 +546,12 @@ impl Mint {
             }
         }
 
+        let verification = self.verify_inputs(melt_request.inputs()).await?;
+
         let mut tx = self.localstore.begin_transaction().await?;
 
         let (proof_writer, quote) = self
-            .verify_melt_request(&mut tx, melt_request)
+            .verify_melt_request(&mut tx, verification, melt_request)
             .await
             .map_err(|err| {
                 tracing::debug!("Error attempting to verify melt quote: {}", err);
@@ -785,7 +788,6 @@ impl Mint {
                 let change_target = melt_request.inputs_amount()? - total_spent - fee;
 
                 let mut amounts = change_target.split();
-                let mut change_sigs = Vec::with_capacity(amounts.len());
 
                 if outputs.len().lt(&amounts.len()) {
                     tracing::debug!(
@@ -800,15 +802,20 @@ impl Mint {
                     amounts.sort_by(|a, b| b.cmp(a));
                 }
 
-                let mut outputs = outputs;
+                let mut blinded_messages = vec![];
 
-                for (amount, blinded_message) in amounts.iter().zip(&mut outputs) {
+                for (amount, mut blinded_message) in amounts.iter().zip(outputs.clone()) {
                     blinded_message.amount = *amount;
-
-                    let blinded_signature = self.blind_sign(blinded_message.clone()).await?;
-                    change_sigs.push(blinded_signature)
+                    blinded_messages.push(blinded_message);
                 }
 
+                // commit db transaction before calling the signatory
+                tx.commit().await?;
+
+                let change_sigs = self.blind_sign(blinded_messages).await?;
+
+                let mut tx = self.localstore.begin_transaction().await?;
+
                 tx.add_blind_signatures(
                     &outputs[0..change_sigs.len()]
                         .iter()
@@ -820,7 +827,16 @@ impl Mint {
                 .await?;
 
                 change = Some(change_sigs);
+
+                proof_writer.commit();
+                tx.commit().await?;
+            } else {
+                proof_writer.commit();
+                tx.commit().await?;
             }
+        } else {
+            proof_writer.commit();
+            tx.commit().await?;
         }
 
         self.pubsub_manager.melt_quote_status(
@@ -830,9 +846,6 @@ impl Mint {
             MeltQuoteState::Paid,
         );
 
-        proof_writer.commit();
-        tx.commit().await?;
-
         Ok(MeltQuoteBolt11Response {
             amount: quote.amount,
             paid: Some(true),

+ 12 - 11
crates/cdk/src/mint/mod.rs

@@ -201,7 +201,7 @@ impl Mint {
     /// - Invoice payment monitoring across all configured payment processors
     ///
     /// Future services may include:
-    /// - Quote cleanup and expiration management  
+    /// - Quote cleanup and expiration management
     /// - Periodic database maintenance
     /// - Health check monitoring
     /// - Metrics collection
@@ -475,6 +475,7 @@ impl Mint {
                     match result {
                         Ok(mut stream) => {
                             while let Some(request_lookup_id) = stream.next().await {
+                                tracing::debug!("Received payment {:?}", request_lookup_id);
                                 if let Err(e) = Self::handle_payment_notification(
                                     &localstore,
                                     &pubsub_manager,
@@ -516,6 +517,11 @@ impl Mint {
             .get_mint_quote_by_request_lookup_id(&wait_payment_response.payment_identifier)
             .await
         {
+            tracing::debug!(
+                "Received payment {} for quote-id {}",
+                wait_payment_response.payment_identifier,
+                mint_quote.id
+            );
             Self::handle_mint_quote_payment(
                 &mut tx,
                 &mint_quote,
@@ -525,7 +531,7 @@ impl Mint {
             .await?;
         } else {
             tracing::warn!(
-                "Could not get request for request lookup id {:?}.",
+                "Could not get request for request lookup id {:?}",
                 wait_payment_response.payment_identifier
             );
         }
@@ -635,13 +641,9 @@ impl Mint {
     #[tracing::instrument(skip_all)]
     pub async fn blind_sign(
         &self,
-        blinded_message: BlindedMessage,
-    ) -> Result<BlindSignature, Error> {
-        self.signatory
-            .blind_sign(vec![blinded_message])
-            .await?
-            .pop()
-            .ok_or(Error::Internal)
+        blinded_messages: Vec<BlindedMessage>,
+    ) -> Result<Vec<BlindSignature>, Error> {
+        self.signatory.blind_sign(blinded_messages).await
     }
 
     /// Verify [`Proof`] meets conditions and is signed
@@ -699,7 +701,6 @@ impl Mint {
                 return Err(Error::Internal);
             }
         };
-        tracing::error!("internal stuff");
 
         // Mint quote has already been settled, proofs should not be burned or held.
         if mint_quote.state() == MintQuoteState::Issued
@@ -716,7 +717,7 @@ impl Mint {
         if let Some(amount) = mint_quote.amount {
             if amount > inputs_amount_quote_unit {
                 tracing::debug!(
-                    "Not enough inuts provided: {} needed {}",
+                    "Not enough inputs provided: {} needed {}",
                     inputs_amount_quote_unit,
                     amount
                 );

+ 20 - 8
crates/cdk/src/mint/swap.rs

@@ -12,10 +12,29 @@ impl Mint {
         &self,
         swap_request: SwapRequest,
     ) -> Result<SwapResponse, Error> {
+        // Do the external call before beginning the db transaction
+        // Check any overflow before talking to the signatory
+        swap_request.input_amount()?;
+        swap_request.output_amount()?;
+
+        let promises = self.blind_sign(swap_request.outputs().to_owned()).await?;
+        let input_verification =
+            self.verify_inputs(swap_request.inputs())
+                .await
+                .map_err(|err| {
+                    tracing::debug!("Input verification failed: {:?}", err);
+                    err
+                })?;
+
         let mut tx = self.localstore.begin_transaction().await?;
 
         if let Err(err) = self
-            .verify_transaction_balanced(&mut tx, swap_request.inputs(), swap_request.outputs())
+            .verify_transaction_balanced(
+                &mut tx,
+                input_verification,
+                swap_request.inputs(),
+                swap_request.outputs(),
+            )
             .await
         {
             tracing::debug!("Attempt to swap unbalanced transaction, aborting: {err}");
@@ -30,13 +49,6 @@ impl Mint {
             .add_proofs(&mut tx, swap_request.inputs())
             .await?;
 
-        let mut promises = Vec::with_capacity(swap_request.outputs().len());
-
-        for blinded_message in swap_request.outputs() {
-            let blinded_signature = self.blind_sign(blinded_message.clone()).await?;
-            promises.push(blinded_signature);
-        }
-
         proof_writer
             .update_proofs_states(&mut tx, &input_ys, State::Spent)
             .await?;

+ 3 - 9
crates/cdk/src/mint/verification.rs

@@ -58,10 +58,7 @@ impl Mint {
     ///
     /// Checks that the outputs are all of the same unit and the keyset is active
     #[instrument(skip_all)]
-    pub async fn verify_outputs_keyset(
-        &self,
-        outputs: &[BlindedMessage],
-    ) -> Result<CurrencyUnit, Error> {
+    pub fn verify_outputs_keyset(&self, outputs: &[BlindedMessage]) -> Result<CurrencyUnit, Error> {
         let mut keyset_units = HashSet::new();
 
         let output_keyset_ids: HashSet<Id> = outputs.iter().map(|p| p.keyset_id).collect();
@@ -189,7 +186,7 @@ impl Mint {
         Mint::check_outputs_unique(outputs)?;
         self.check_output_already_signed(tx, outputs).await?;
 
-        let unit = self.verify_outputs_keyset(outputs).await?;
+        let unit = self.verify_outputs_keyset(outputs)?;
 
         let amount = Amount::try_sum(outputs.iter().map(|o| o.amount).collect::<Vec<Amount>>())?;
 
@@ -221,6 +218,7 @@ impl Mint {
     pub async fn verify_transaction_balanced(
         &self,
         tx: &mut Box<dyn cdk_database::MintTransaction<'_, cdk_database::Error> + Send + Sync + '_>,
+        input_verification: Verification,
         inputs: &Proofs,
         outputs: &[BlindedMessage],
     ) -> Result<(), Error> {
@@ -228,10 +226,6 @@ impl Mint {
             tracing::debug!("Output verification failed: {:?}", err);
             err
         })?;
-        let input_verification = self.verify_inputs(inputs).await.map_err(|err| {
-            tracing::debug!("Input verification failed: {:?}", err);
-            err
-        })?;
 
         if output_verification.unit != input_verification.unit {
             tracing::debug!(

+ 18 - 0
justfile

@@ -79,8 +79,19 @@ test-all db="memory":
     #!/usr/bin/env bash
     just test {{db}}
     ./misc/itests.sh "{{db}}"
+    status=$?
+    if [ $status -ne 0 ]; then
+       echo "Failed test with status {$status}"
+       exit $status
+    fi
     ./misc/fake_itests.sh "{{db}}" external_signatory
+    status=$?
+    if [ $status -ne 0 ]; then
+       echo "Failed test with status {$status}"
+       exit $status
+    fi
     ./misc/fake_itests.sh "{{db}}"
+    exit $?
     
 test-nutshell:
   #!/usr/bin/env bash
@@ -150,12 +161,19 @@ goose-changelog-commits *COMMITS="5":
 itest db:
   #!/usr/bin/env bash
   ./misc/itests.sh "{{db}}"
+  exit $?
 
   
 fake-mint-itest db:
   #!/usr/bin/env bash
   ./misc/fake_itests.sh "{{db}}" external_signatory
+  status=$?
+  if [ $status -ne 0 ]; then
+     echo "Failed test with status {$status}"
+     exit $status
+  fi
   ./misc/fake_itests.sh "{{db}}"
+  exit $?
 
   
 itest-payment-processor ln: