Kaynağa Gözat

Check change unique (#1112)

* fix(cdk): prevent duplicate blinded message processing with database constraints

Add unique constraints on blinded_message column in both PostgreSQL and SQLite databases, and implement application-level checks to prevent duplicate blinded messages from being processed. Also ensure proper cleanup of melt requests after successful processing.

* feat: db tests for unique

* refactor(cdk-sql): consolidate blinded messages into blind signature table

Migrate from separate blinded_messages table to unified blind_signature table.
Add signed_time column and make c column nullable to track both pending
blind messages (c=NULL) and completed signatures. Update insert/update
logic to handle upsert scenarios for blind signature completion.

* refactor(cdk-sql): remove unique constraint migration and filter queries for signed messages

Remove database-level unique constraint on blinded_message and instead filter
queries to only consider messages with signatures (c IS NOT NULL

* refactor(database): improve blinded message duplicate detection using database constraints

Replace manual duplicate checking with database constraint handling for better
reliability and simplified code flow in melt request processing.

* refactor(cdk-sql): optimize blind signature processing with batch queries

Replace individual queries per blinded message with single batch query
and HashMap lookup to eliminate N+1 query performance issue.

* fix: signed time to swap sigs

* refactor(cdk): split blinded message handling and improve duplicate detection

- Split add_melt_request_and_blinded_messages into separate methods
- Add blinded messages to database before signing in swap operations
- Improve duplicate output detection with proper error handling
- Make add_blinded_messages method accept optional quote_id for flexibility

* refactor(cdk): add BlindedMessageWriter for improved transaction rollback

- Add BlindedMessageWriter component for managing blinded message state
- Implement proper rollback mechanisms in swap operations
- Add delete_blinded_messages database interface for cleanup
- Improve error handling with better state management
thesimplekid 1 ay önce
ebeveyn
işleme
4b04d10383

+ 2 - 1
.typos.toml

@@ -6,5 +6,6 @@ extend-ignore-re = [
     "casshuAeyJ0b2tlbiI6W3sibWludCI6Imh0dHBzOi8vODMzMy5zcGFjZTozMzM4IiwicHJvb2ZzIjpbeyJhbW91bnQiOjIsImlkIjoiMDA5YTFmMjkzMjUzZTQxZSIsInNlY3JldCI6IjQwNzkxNWJjMjEyYmU2MWE3N2UzZTZkMmFlYjRjNzI3OTgwYmRhNTFjZDA2YTZhZmMyOWUyODYxNzY4YTc4MzciLCJDIjoiMDJiYzkwOTc5OTdkODFhZmIyY2M3MzQ2YjVlNDM0NWE5MzQ2YmQyYTUwNmViNzk1ODU5OGE3MmYwY2Y4NTE2M2VhIn0seyJhbW91bnQiOjgsImlkIjoiMDA5YTFmMjkzMjUzZTQxZSIsInNlY3JldCI6ImZlMTUxMDkzMTRlNjFkNzc1NmIwZjhlZTBmMjNhNjI0YWNhYTNmNGUwNDJmNjE0MzNjNzI4YzcwNTdiOTMxYmUiLCJDIjoiMDI5ZThlNTA1MGI4OTBhN2Q2YzA5NjhkYjE2YmMxZDVkNWZhMDQwZWExZGUyODRmNmVjNjlkNjEyOTlmNjcxMDU5In1dfV0sInVuaXQiOiJzYXQiLCJtZW1vIjoiVGhhbmsgeW91LiJ9",
     "autheticator",
     "Gam",
-    "flate2"
+    "flate2",
+    "lnbc[A-Za-z0-9-_]+"
 ]

+ 14 - 2
crates/cdk-common/src/database/mint/mod.rs

@@ -132,15 +132,27 @@ pub trait QuotesTransaction<'a> {
     /// Mint Quotes Database Error
     type Err: Into<Error> + From<Error>;
 
-    /// Add melt_request with quote_id, inputs_amount, and blinded_messages
-    async fn add_melt_request_and_blinded_messages(
+    /// Add melt_request with quote_id, inputs_amount, and inputs_fee
+    async fn add_melt_request(
         &mut self,
         quote_id: &QuoteId,
         inputs_amount: Amount,
         inputs_fee: Amount,
+    ) -> Result<(), Self::Err>;
+
+    /// Add blinded_messages for a quote_id
+    async fn add_blinded_messages(
+        &mut self,
+        quote_id: Option<&QuoteId>,
         blinded_messages: &[BlindedMessage],
     ) -> Result<(), Self::Err>;
 
+    /// Delete blinded_messages by their blinded secrets
+    async fn delete_blinded_messages(
+        &mut self,
+        blinded_secrets: &[PublicKey],
+    ) -> Result<(), Self::Err>;
+
     /// Get melt_request and associated blinded_messages by quote_id
     async fn get_melt_request_and_blinded_messages(
         &mut self,

+ 197 - 1
crates/cdk-common/src/database/mint/test/mint.rs

@@ -1,8 +1,14 @@
 //! Payments
 
+use std::str::FromStr;
+
+use cashu::quote_id::QuoteId;
+use cashu::{Amount, Id, SecretKey};
+
 use crate::database::mint::test::unique_string;
 use crate::database::mint::{Database, Error, KeysDatabase};
-use crate::mint::MintQuote;
+use crate::database::MintSignaturesDatabase;
+use crate::mint::{MeltPaymentRequest, MeltQuote, MintQuote};
 use crate::payment::PaymentIdentifier;
 
 /// Add a mint quote
@@ -404,3 +410,193 @@ where
         .await
         .is_err());
 }
+/// Successful melt with unique blinded messages
+pub async fn add_melt_request_unique_blinded_messages<DB>(db: DB)
+where
+    DB: Database<Error> + KeysDatabase<Err = Error> + MintSignaturesDatabase<Err = Error>,
+{
+    let inputs_amount = Amount::from(100u64);
+    let inputs_fee = Amount::from(1u64);
+    let keyset_id = Id::from_str("001711afb1de20cb").unwrap();
+
+    // Create a dummy blinded message
+    let blinded_secret = SecretKey::generate().public_key();
+    let blinded_message = cashu::BlindedMessage {
+        blinded_secret,
+        keyset_id,
+        amount: Amount::from(100u64),
+        witness: None,
+    };
+    let blinded_messages = vec![blinded_message];
+
+    let mut tx = Database::begin_transaction(&db).await.unwrap();
+    let quote = MeltQuote::new(MeltPaymentRequest::Bolt11 { bolt11: "lnbc330n1p5d85skpp5344v3ktclujsjl3h09wgsfm7zytumr7h7zhrl857f5w8nv0a52zqdqqcqzzsxqyz5vqrzjqvueefmrckfdwyyu39m0lf24sqzcr9vcrmxrvgfn6empxz7phrjxvrttncqq0lcqqyqqqqlgqqqqqqgq2qsp5j3rrg8kvpemqxtf86j8tjm90wq77c7ende4e5qmrerq4xsg02vhq9qxpqysgqjltywgyk6uc5qcgwh8xnzmawl2tjlhz8d28tgp3yx8xwtz76x0jqkfh6mmq70hervjxs0keun7ur0spldgll29l0dnz3md50d65sfqqqwrwpsu".parse().unwrap() }, cashu::CurrencyUnit::Sat, 33.into(), Amount::ZERO, 0, None, None, cashu::PaymentMethod::Bolt11);
+    tx.add_melt_quote(quote.clone()).await.unwrap();
+    tx.add_melt_request(&quote.id, inputs_amount, inputs_fee)
+        .await
+        .unwrap();
+    tx.add_blinded_messages(Some(&quote.id), &blinded_messages)
+        .await
+        .unwrap();
+    tx.commit().await.unwrap();
+
+    // Verify retrieval
+    let mut tx = Database::begin_transaction(&db).await.unwrap();
+    let retrieved = tx
+        .get_melt_request_and_blinded_messages(&quote.id)
+        .await
+        .unwrap()
+        .unwrap();
+    assert_eq!(retrieved.inputs_amount, inputs_amount);
+    assert_eq!(retrieved.inputs_fee, inputs_fee);
+    assert_eq!(retrieved.change_outputs.len(), 1);
+    assert_eq!(retrieved.change_outputs[0].amount, Amount::from(100u64));
+    tx.commit().await.unwrap();
+}
+
+/// Reject melt with duplicate blinded message (already signed)
+pub async fn reject_melt_duplicate_blinded_signature<DB>(db: DB)
+where
+    DB: Database<Error> + KeysDatabase<Err = Error> + MintSignaturesDatabase<Err = Error>,
+{
+    let quote_id1 = QuoteId::new_uuid();
+    let inputs_amount = Amount::from(100u64);
+    let inputs_fee = Amount::from(1u64);
+    let keyset_id = Id::from_str("001711afb1de20cb").unwrap();
+
+    // Create a dummy blinded message
+    let blinded_secret = SecretKey::generate().public_key();
+    let blinded_message = cashu::BlindedMessage {
+        blinded_secret,
+        keyset_id,
+        amount: Amount::from(100u64),
+        witness: None,
+    };
+    let blinded_messages = vec![blinded_message.clone()];
+
+    // First, "sign" it by adding to blind_signature (simulate successful mint)
+    let mut tx = Database::begin_transaction(&db).await.unwrap();
+    let c = SecretKey::generate().public_key();
+    let blind_sig = cashu::BlindSignature {
+        amount: Amount::from(100u64),
+        keyset_id,
+        c,
+        dleq: None,
+    };
+    let blinded_secrets = vec![blinded_message.blinded_secret];
+    tx.add_blind_signatures(&blinded_secrets, &[blind_sig], Some(quote_id1))
+        .await
+        .unwrap();
+    tx.commit().await.unwrap();
+
+    // Now try to add melt request with the same blinded message - should fail due to constraint
+    let mut tx = Database::begin_transaction(&db).await.unwrap();
+    let quote2 = MeltQuote::new(MeltPaymentRequest::Bolt11 { bolt11: "lnbc330n1p5d85skpp5344v3ktclujsjl3h09wgsfm7zytumr7h7zhrl857f5w8nv0a52zqdqqcqzzsxqyz5vqrzjqvueefmrckfdwyyu39m0lf24sqzcr9vcrmxrvgfn6empxz7phrjxvrttncqq0lcqqyqqqqlgqqqqqqgq2qsp5j3rrg8kvpemqxtf86j8tjm90wq77c7ende4e5qmrerq4xsg02vhq9qxpqysgqjltywgyk6uc5qcgwh8xnzmawl2tjlhz8d28tgp3yx8xwtz76x0jqkfh6mmq70hervjxs0keun7ur0spldgll29l0dnz3md50d65sfqqqwrwpsu".parse().unwrap() }, cashu::CurrencyUnit::Sat, 33.into(), Amount::ZERO, 0, None, None, cashu::PaymentMethod::Bolt11);
+    tx.add_melt_quote(quote2.clone()).await.unwrap();
+    tx.add_melt_request(&quote2.id, inputs_amount, inputs_fee)
+        .await
+        .unwrap();
+    let result = tx
+        .add_blinded_messages(Some(&quote2.id), &blinded_messages)
+        .await;
+    assert!(result.is_err() && matches!(result.unwrap_err(), Error::Duplicate));
+    tx.rollback().await.unwrap(); // Rollback to avoid partial state
+}
+
+/// Reject duplicate blinded message insert via DB constraint (different quotes)
+pub async fn reject_duplicate_blinded_message_db_constraint<DB>(db: DB)
+where
+    DB: Database<Error> + KeysDatabase<Err = Error>,
+{
+    let inputs_amount = Amount::from(100u64);
+    let inputs_fee = Amount::from(1u64);
+    let keyset_id = Id::from_str("001711afb1de20cb").unwrap();
+
+    // Create a dummy blinded message
+    let blinded_secret = SecretKey::generate().public_key();
+    let blinded_message = cashu::BlindedMessage {
+        blinded_secret,
+        keyset_id,
+        amount: Amount::from(100u64),
+        witness: None,
+    };
+    let blinded_messages = vec![blinded_message];
+
+    // First insert succeeds
+    let mut tx = Database::begin_transaction(&db).await.unwrap();
+    let quote = MeltQuote::new(MeltPaymentRequest::Bolt11 { bolt11: "lnbc330n1p5d85skpp5344v3ktclujsjl3h09wgsfm7zytumr7h7zhrl857f5w8nv0a52zqdqqcqzzsxqyz5vqrzjqvueefmrckfdwyyu39m0lf24sqzcr9vcrmxrvgfn6empxz7phrjxvrttncqq0lcqqyqqqqlgqqqqqqgq2qsp5j3rrg8kvpemqxtf86j8tjm90wq77c7ende4e5qmrerq4xsg02vhq9qxpqysgqjltywgyk6uc5qcgwh8xnzmawl2tjlhz8d28tgp3yx8xwtz76x0jqkfh6mmq70hervjxs0keun7ur0spldgll29l0dnz3md50d65sfqqqwrwpsu".parse().unwrap() }, cashu::CurrencyUnit::Sat, 33.into(), Amount::ZERO, 0, None, None, cashu::PaymentMethod::Bolt11);
+    tx.add_melt_quote(quote.clone()).await.unwrap();
+    tx.add_melt_request(&quote.id, inputs_amount, inputs_fee)
+        .await
+        .unwrap();
+    assert!(tx
+        .add_blinded_messages(Some(&quote.id), &blinded_messages)
+        .await
+        .is_ok());
+    tx.commit().await.unwrap();
+
+    // Second insert with same blinded_message but different quote_id should fail due to unique constraint on blinded_message
+    let mut tx = Database::begin_transaction(&db).await.unwrap();
+    let quote = MeltQuote::new(MeltPaymentRequest::Bolt11 { bolt11: "lnbc330n1p5d85skpp5344v3ktclujsjl3h09wgsfm7zytumr7h7zhrl857f5w8nv0a52zqdqqcqzzsxqyz5vqrzjqvueefmrckfdwyyu39m0lf24sqzcr9vcrmxrvgfn6empxz7phrjxvrttncqq0lcqqyqqqqlgqqqqqqgq2qsp5j3rrg8kvpemqxtf86j8tjm90wq77c7ende4e5qmrerq4xsg02vhq9qxpqysgqjltywgyk6uc5qcgwh8xnzmawl2tjlhz8d28tgp3yx8xwtz76x0jqkfh6mmq70hervjxs0keun7ur0spldgll29l0dnz3md50d65sfqqqwrwpsu".parse().unwrap() }, cashu::CurrencyUnit::Sat, 33.into(), Amount::ZERO, 0, None, None, cashu::PaymentMethod::Bolt11);
+    tx.add_melt_quote(quote.clone()).await.unwrap();
+    tx.add_melt_request(&quote.id, inputs_amount, inputs_fee)
+        .await
+        .unwrap();
+    let result = tx
+        .add_blinded_messages(Some(&quote.id), &blinded_messages)
+        .await;
+    // Expect a database error due to unique violation
+    assert!(result.is_err()); // Specific error might be DB-specific, e.g., SqliteError or PostgresError
+    tx.rollback().await.unwrap();
+}
+
+/// Cleanup of melt request after processing
+pub async fn cleanup_melt_request_after_processing<DB>(db: DB)
+where
+    DB: Database<Error> + KeysDatabase<Err = Error>,
+{
+    let inputs_amount = Amount::from(100u64);
+    let inputs_fee = Amount::from(1u64);
+    let keyset_id = Id::from_str("001711afb1de20cb").unwrap();
+
+    // Create dummy blinded message
+    let blinded_secret = SecretKey::generate().public_key();
+    let blinded_message = cashu::BlindedMessage {
+        blinded_secret,
+        keyset_id,
+        amount: Amount::from(100u64),
+        witness: None,
+    };
+    let blinded_messages = vec![blinded_message];
+
+    // Insert melt request
+    let mut tx1 = Database::begin_transaction(&db).await.unwrap();
+    let quote = MeltQuote::new(MeltPaymentRequest::Bolt11 { bolt11: "lnbc330n1p5d85skpp5344v3ktclujsjl3h09wgsfm7zytumr7h7zhrl857f5w8nv0a52zqdqqcqzzsxqyz5vqrzjqvueefmrckfdwyyu39m0lf24sqzcr9vcrmxrvgfn6empxz7phrjxvrttncqq0lcqqyqqqqlgqqqqqqgq2qsp5j3rrg8kvpemqxtf86j8tjm90wq77c7ende4e5qmrerq4xsg02vhq9qxpqysgqjltywgyk6uc5qcgwh8xnzmawl2tjlhz8d28tgp3yx8xwtz76x0jqkfh6mmq70hervjxs0keun7ur0spldgll29l0dnz3md50d65sfqqqwrwpsu".parse().unwrap() }, cashu::CurrencyUnit::Sat, 33.into(), Amount::ZERO, 0, None, None, cashu::PaymentMethod::Bolt11);
+    tx1.add_melt_quote(quote.clone()).await.unwrap();
+    tx1.add_melt_request(&quote.id, inputs_amount, inputs_fee)
+        .await
+        .unwrap();
+    tx1.add_blinded_messages(Some(&quote.id), &blinded_messages)
+        .await
+        .unwrap();
+    tx1.commit().await.unwrap();
+
+    // Simulate processing: get and delete
+    let mut tx2 = Database::begin_transaction(&db).await.unwrap();
+    let _retrieved = tx2
+        .get_melt_request_and_blinded_messages(&quote.id)
+        .await
+        .unwrap()
+        .unwrap();
+    tx2.delete_melt_request(&quote.id).await.unwrap();
+    tx2.commit().await.unwrap();
+
+    // Verify melt_request is deleted
+    let mut tx3 = Database::begin_transaction(&db).await.unwrap();
+    let retrieved = tx3
+        .get_melt_request_and_blinded_messages(&quote.id)
+        .await
+        .unwrap();
+    assert!(retrieved.is_none());
+    tx3.commit().await.unwrap();
+}

+ 5 - 1
crates/cdk-common/src/database/mint/test/mod.rs

@@ -235,7 +235,11 @@ macro_rules! mint_db_test {
             reject_over_issue_same_tx,
             reject_over_issue_different_tx,
             reject_over_issue_with_payment,
-            reject_over_issue_with_payment_different_tx
+            reject_over_issue_with_payment_different_tx,
+            add_melt_request_unique_blinded_messages,
+            reject_melt_duplicate_blinded_signature,
+            reject_duplicate_blinded_message_db_constraint,
+            cleanup_melt_request_after_processing
         );
     };
     ($make_db_fn:ident, $($name:ident),+ $(,)?) => {

+ 23 - 0
crates/cdk-sql-common/src/mint/migrations/postgres/20250924215800_migrate_blinded_messages_to_blind_signatures.sql

@@ -0,0 +1,23 @@
+-- Remove NOT NULL constraint from c column in blind_signature table
+ALTER TABLE blind_signature ALTER COLUMN c DROP NOT NULL;
+
+-- Add signed_time column to blind_signature table
+ALTER TABLE blind_signature ADD COLUMN signed_time INTEGER NULL;
+
+-- Update existing records to set signed_time equal to created_time for existing signatures
+UPDATE blind_signature SET signed_time = created_time WHERE c IS NOT NULL;
+
+-- Insert data from blinded_messages table into blind_signature table with NULL c column
+INSERT INTO blind_signature (blinded_message, amount, keyset_id, c, quote_id, created_time, signed_time)
+SELECT blinded_message, amount, keyset_id, NULL as c, quote_id, 0 as created_time, NULL as signed_time
+FROM blinded_messages
+WHERE NOT EXISTS (
+    SELECT 1 FROM blind_signature 
+    WHERE blind_signature.blinded_message = blinded_messages.blinded_message
+);
+
+-- Create index on quote_id if it does not exist
+CREATE INDEX IF NOT EXISTS blind_signature_quote_id_index ON blind_signature(quote_id);
+
+-- Drop the blinded_messages table as data has been migrated
+DROP TABLE IF EXISTS blinded_messages;

+ 40 - 0
crates/cdk-sql-common/src/mint/migrations/sqlite/20250924215800_migrate_blinded_messages_to_blind_signatures.sql

@@ -0,0 +1,40 @@
+-- Remove NOT NULL constraint from c column in blind_signature table
+-- SQLite does not support ALTER COLUMN directly, so we need to recreate the table
+
+-- Step 1 - Create new table with nullable c column and signed_time column
+CREATE TABLE blind_signature_new (
+    blinded_message BLOB PRIMARY KEY,
+    amount INTEGER NOT NULL,
+    keyset_id TEXT NOT NULL,
+    c BLOB NULL,
+    dleq_e TEXT,
+    dleq_s TEXT,
+    quote_id TEXT,
+    created_time INTEGER NOT NULL DEFAULT 0,
+    signed_time INTEGER
+);
+
+-- Step 2 - Copy existing data from old blind_signature table
+INSERT INTO blind_signature_new (blinded_message, amount, keyset_id, c, dleq_e, dleq_s, quote_id, created_time)
+SELECT blinded_message, amount, keyset_id, c, dleq_e, dleq_s, quote_id, created_time
+FROM blind_signature;
+
+-- Step 3 - Insert data from blinded_messages table with NULL c column
+INSERT INTO blind_signature_new (blinded_message, amount, keyset_id, c, quote_id, created_time)
+SELECT blinded_message, amount, keyset_id, NULL as c, quote_id, 0 as created_time
+FROM blinded_messages
+WHERE NOT EXISTS (
+    SELECT 1 FROM blind_signature_new 
+    WHERE blind_signature_new.blinded_message = blinded_messages.blinded_message
+);
+
+-- Step 4 - Drop old table and rename new table
+DROP TABLE blind_signature;
+ALTER TABLE blind_signature_new RENAME TO blind_signature;
+
+-- Step 5 - Recreate indexes
+CREATE INDEX IF NOT EXISTS keyset_id_index ON blind_signature(keyset_id);
+CREATE INDEX IF NOT EXISTS blind_signature_quote_id_index ON blind_signature(quote_id);
+
+-- Step 6 - Drop the blinded_messages table as data has been migrated
+DROP TABLE IF EXISTS blinded_messages;

+ 199 - 45
crates/cdk-sql-common/src/mint/mod.rs

@@ -546,13 +546,13 @@ where
 {
     type Err = Error;
 
-    async fn add_melt_request_and_blinded_messages(
+    async fn add_melt_request(
         &mut self,
         quote_id: &QuoteId,
         inputs_amount: Amount,
         inputs_fee: Amount,
-        blinded_messages: &[BlindedMessage],
     ) -> Result<(), Self::Err> {
+        // Insert melt_request
         query(
             r#"
             INSERT INTO melt_request
@@ -567,26 +567,79 @@ where
         .execute(&self.inner)
         .await?;
 
+        Ok(())
+    }
+
+    async fn add_blinded_messages(
+        &mut self,
+        quote_id: Option<&QuoteId>,
+        blinded_messages: &[BlindedMessage],
+    ) -> Result<(), Self::Err> {
+        let current_time = unix_time();
+
+        // Insert blinded_messages directly into blind_signature with c = NULL
+        // Let the database constraint handle duplicate detection
         for message in blinded_messages {
-            query(
+            match query(
                 r#"
-                INSERT INTO blinded_messages
-                (quote_id, blinded_message, keyset_id, amount)
+                INSERT INTO blind_signature
+                (blinded_message, amount, keyset_id, c, quote_id, created_time)
                 VALUES
-                (:quote_id, :blinded_message, :keyset_id, :amount)
+                (:blinded_message, :amount, :keyset_id, NULL, :quote_id, :created_time)
                 "#,
             )?
-            .bind("quote_id", quote_id.to_string())
             .bind(
                 "blinded_message",
                 message.blinded_secret.to_bytes().to_vec(),
             )
-            .bind("keyset_id", message.keyset_id.to_string())
             .bind("amount", message.amount.to_i64())
+            .bind("keyset_id", message.keyset_id.to_string())
+            .bind("quote_id", quote_id.map(|q| q.to_string()))
+            .bind("created_time", current_time as i64)
             .execute(&self.inner)
-            .await?;
+            .await
+            {
+                Ok(_) => continue,
+                Err(database::Error::Duplicate) => {
+                    // Primary key constraint violation - blinded message already exists
+                    // This could be either:
+                    // 1. Already signed (c IS NOT NULL) - definitely an error
+                    // 2. Already pending (c IS NULL) - also an error
+                    return Err(database::Error::Duplicate);
+                }
+                Err(err) => return Err(err),
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn delete_blinded_messages(
+        &mut self,
+        blinded_secrets: &[PublicKey],
+    ) -> Result<(), Self::Err> {
+        if blinded_secrets.is_empty() {
+            return Ok(());
         }
 
+        // Delete blinded messages from blind_signature table where c IS NULL
+        // (only delete unsigned blinded messages)
+        query(
+            r#"
+            DELETE FROM blind_signature
+            WHERE blinded_message IN (:blinded_secrets) AND c IS NULL
+            "#,
+        )?
+        .bind_vec(
+            "blinded_secrets",
+            blinded_secrets
+                .iter()
+                .map(|secret| secret.to_bytes().to_vec())
+                .collect(),
+        )
+        .execute(&self.inner)
+        .await?;
+
         Ok(())
     }
 
@@ -610,11 +663,12 @@ where
             let inputs_amount: u64 = column_as_number!(row[0].clone());
             let inputs_fee: u64 = column_as_number!(row[1].clone());
 
+            // Get blinded messages from blind_signature table where c IS NULL
             let blinded_messages_rows = query(
                 r#"
                 SELECT blinded_message, keyset_id, amount
-                FROM blinded_messages
-                WHERE quote_id = :quote_id
+                FROM blind_signature
+                WHERE quote_id = :quote_id AND c IS NULL
                 "#,
             )?
             .bind("quote_id", quote_id.to_string())
@@ -650,6 +704,7 @@ where
     }
 
     async fn delete_melt_request(&mut self, quote_id: &QuoteId) -> Result<(), Self::Err> {
+        // Delete from melt_request table
         query(
             r#"
             DELETE FROM melt_request
@@ -660,6 +715,17 @@ where
         .execute(&self.inner)
         .await?;
 
+        // Also delete blinded messages (where c IS NULL) from blind_signature table
+        query(
+            r#"
+            DELETE FROM blind_signature
+            WHERE quote_id = :quote_id AND c IS NULL
+            "#,
+        )?
+        .bind("quote_id", quote_id.to_string())
+        .execute(&self.inner)
+        .await?;
+
         Ok(())
     }
 
@@ -1560,34 +1626,122 @@ where
     ) -> Result<(), Self::Err> {
         let current_time = unix_time();
 
+        if blinded_messages.len() != blind_signatures.len() {
+            return Err(database::Error::Internal(
+                "Mismatched array lengths for blinded messages and blind signatures".to_string(),
+            ));
+        }
+
+        // Select all existing rows for the given blinded messages at once
+        let mut existing_rows = query(
+            r#"
+            SELECT blinded_message, c, dleq_e, dleq_s
+            FROM blind_signature
+            WHERE blinded_message IN (:blinded_messages)
+            FOR UPDATE
+            "#,
+        )?
+        .bind_vec(
+            "blinded_messages",
+            blinded_messages
+                .iter()
+                .map(|message| message.to_bytes().to_vec())
+                .collect(),
+        )
+        .fetch_all(&self.inner)
+        .await?
+        .into_iter()
+        .map(|mut row| {
+            Ok((
+                column_as_string!(&row.remove(0), PublicKey::from_hex, PublicKey::from_slice),
+                (row[0].clone(), row[1].clone(), row[2].clone()),
+            ))
+        })
+        .collect::<Result<HashMap<_, _>, Error>>()?;
+
+        // Iterate over the provided blinded messages and signatures
         for (message, signature) in blinded_messages.iter().zip(blind_signatures) {
-            query(
-                r#"
-                    INSERT INTO blind_signature
-                    (blinded_message, amount, keyset_id, c, quote_id, dleq_e, dleq_s, created_time)
-                    VALUES
-                    (:blinded_message, :amount, :keyset_id, :c, :quote_id, :dleq_e, :dleq_s, :created_time)
-                "#,
-            )?
-            .bind("blinded_message", message.to_bytes().to_vec())
-            .bind("amount", u64::from(signature.amount) as i64)
-            .bind("keyset_id", signature.keyset_id.to_string())
-            .bind("c", signature.c.to_bytes().to_vec())
-            .bind("quote_id", quote_id.as_ref().map(|q| match q {
-                QuoteId::BASE64(s) => s.to_string(),
-                QuoteId::UUID(u) => u.hyphenated().to_string(),
-            }))
-            .bind(
-                "dleq_e",
-                signature.dleq.as_ref().map(|dleq| dleq.e.to_secret_hex()),
-            )
-            .bind(
-                "dleq_s",
-                signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
-            )
-            .bind("created_time", current_time as i64)
-            .execute(&self.inner)
-            .await?;
+            match existing_rows.remove(message) {
+                None => {
+                    // Unknown blind message: Insert new row with all columns
+                    query(
+                        r#"
+                        INSERT INTO blind_signature
+                        (blinded_message, amount, keyset_id, c, quote_id, dleq_e, dleq_s, created_time, signed_time)
+                        VALUES
+                        (:blinded_message, :amount, :keyset_id, :c, :quote_id, :dleq_e, :dleq_s, :created_time, :signed_time)
+                        "#,
+                    )?
+                    .bind("blinded_message", message.to_bytes().to_vec())
+                    .bind("amount", u64::from(signature.amount) as i64)
+                    .bind("keyset_id", signature.keyset_id.to_string())
+                    .bind("c", signature.c.to_bytes().to_vec())
+                    .bind("quote_id", quote_id.as_ref().map(|q| q.to_string()))
+                    .bind(
+                        "dleq_e",
+                        signature.dleq.as_ref().map(|dleq| dleq.e.to_secret_hex()),
+                    )
+                    .bind(
+                        "dleq_s",
+                        signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
+                    )
+                    .bind("created_time", current_time as i64)
+                    .bind("signed_time", current_time as i64)
+                    .execute(&self.inner)
+                    .await?;
+                }
+                Some((c, _dleq_e, _dleq_s)) => {
+                    // Blind message exists: check if c is NULL
+                    match c {
+                        Column::Null => {
+                            // Blind message with no c: Update with missing columns c, dleq_e, dleq_s
+                            query(
+                                r#"
+                                UPDATE blind_signature
+                                SET c = :c, dleq_e = :dleq_e, dleq_s = :dleq_s, signed_time = :signed_time, amount = :amount
+                                WHERE blinded_message = :blinded_message
+                                "#,
+                            )?
+                            .bind("c", signature.c.to_bytes().to_vec())
+                            .bind(
+                                "dleq_e",
+                                signature.dleq.as_ref().map(|dleq| dleq.e.to_secret_hex()),
+                            )
+                            .bind(
+                                "dleq_s",
+                                signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
+                            )
+                            .bind("blinded_message", message.to_bytes().to_vec())
+                            .bind("signed_time", current_time as i64)
+                            .bind("amount", u64::from(signature.amount) as i64)
+                            .execute(&self.inner)
+                            .await?;
+                        }
+                        _ => {
+                            // Blind message already has c: Error
+                            tracing::error!(
+                                "Attempting to add signature to message already signed {}",
+                                message
+                            );
+
+                            return Err(database::Error::Duplicate);
+                        }
+                    }
+                }
+            }
+        }
+
+        debug_assert!(
+            existing_rows.is_empty(),
+            "Unexpected existing rows remain: {:?}",
+            existing_rows.keys().collect::<Vec<_>>()
+        );
+
+        if !existing_rows.is_empty() {
+            tracing::error!("Did not check all existing rows");
+            return Err(Error::Internal(
+                "Did not check all existing rows".to_string(),
+            ));
         }
 
         Ok(())
@@ -1607,14 +1761,14 @@ where
                 blinded_message
             FROM
                 blind_signature
-            WHERE blinded_message IN (:y)
+            WHERE blinded_message IN (:b) AND c IS NOT NULL
             "#,
         )?
         .bind_vec(
-            "y",
+            "b",
             blinded_messages
                 .iter()
-                .map(|y| y.to_bytes().to_vec())
+                .map(|b| b.to_bytes().to_vec())
                 .collect(),
         )
         .fetch_all(&self.inner)
@@ -1660,11 +1814,11 @@ where
                 blinded_message
             FROM
                 blind_signature
-            WHERE blinded_message IN (:blinded_message)
+            WHERE blinded_message IN (:b) AND c IS NOT NULL
             "#,
         )?
         .bind_vec(
-            "blinded_message",
+            "b",
             blinded_messages
                 .iter()
                 .map(|b_| b_.to_bytes().to_vec())
@@ -1706,7 +1860,7 @@ where
             FROM
                 blind_signature
             WHERE
-                keyset_id=:keyset_id
+                keyset_id=:keyset_id AND c IS NOT NULL
             "#,
         )?
         .bind("keyset_id", keyset_id.to_string())
@@ -1734,7 +1888,7 @@ where
             FROM
                 blind_signature
             WHERE
-                quote_id=:quote_id
+                quote_id=:quote_id AND c IS NOT NULL
             "#,
         )?
         .bind("quote_id", quote_id.to_string())

+ 152 - 0
crates/cdk/src/mint/blinded_message_writer.rs

@@ -0,0 +1,152 @@
+//! Blinded message writer
+use std::collections::HashSet;
+
+use cdk_common::database::{self, DynMintDatabase, MintTransaction};
+use cdk_common::nuts::BlindedMessage;
+use cdk_common::{Error, PublicKey, QuoteId};
+
+type Tx<'a, 'b> = Box<dyn MintTransaction<'a, database::Error> + Send + Sync + 'b>;
+
+/// Blinded message writer
+///
+/// This is a blinded message writer that emulates a database transaction but without holding the
+/// transaction alive while waiting for external events to be fully committed to the database;
+/// instead, it maintains a `pending` state.
+///
+/// This struct allows for premature exit on error, enabling it to remove blinded messages that
+/// were added during the operation.
+///
+/// This struct is not fully ACID. If the process exits due to a panic, and the `Drop` function
+/// cannot be run, the cleanup process should reset the state.
+pub struct BlindedMessageWriter {
+    db: Option<DynMintDatabase>,
+    added_blinded_secrets: Option<HashSet<PublicKey>>,
+}
+
+impl BlindedMessageWriter {
+    /// Creates a new BlindedMessageWriter on top of the database
+    pub fn new(db: DynMintDatabase) -> Self {
+        Self {
+            db: Some(db),
+            added_blinded_secrets: Some(Default::default()),
+        }
+    }
+
+    /// The changes are permanent, consume the struct removing the database, so the Drop does
+    /// nothing
+    pub fn commit(mut self) {
+        self.db.take();
+        self.added_blinded_secrets.take();
+    }
+
+    /// Add blinded messages
+    pub async fn add_blinded_messages(
+        &mut self,
+        tx: &mut Tx<'_, '_>,
+        quote_id: Option<QuoteId>,
+        blinded_messages: &[BlindedMessage],
+    ) -> Result<Vec<PublicKey>, Error> {
+        let added_secrets = if let Some(secrets) = self.added_blinded_secrets.as_mut() {
+            secrets
+        } else {
+            return Err(Error::Internal);
+        };
+
+        if let Some(err) = tx
+            .add_blinded_messages(quote_id.as_ref(), blinded_messages)
+            .await
+            .err()
+        {
+            return match err {
+                cdk_common::database::Error::Duplicate => Err(Error::DuplicateOutputs),
+                err => Err(Error::Database(err)),
+            };
+        }
+
+        let blinded_secrets: Vec<PublicKey> = blinded_messages
+            .iter()
+            .map(|bm| bm.blinded_secret)
+            .collect();
+
+        for blinded_secret in &blinded_secrets {
+            added_secrets.insert(*blinded_secret);
+        }
+
+        Ok(blinded_secrets)
+    }
+
+    /// Rollback all changes in this BlindedMessageWriter consuming it.
+    pub async fn rollback(mut self) -> Result<(), Error> {
+        let db = if let Some(db) = self.db.take() {
+            db
+        } else {
+            return Ok(());
+        };
+        let mut tx = db.begin_transaction().await?;
+        let blinded_secrets: Vec<PublicKey> =
+            if let Some(secrets) = self.added_blinded_secrets.take() {
+                secrets.into_iter().collect()
+            } else {
+                return Ok(());
+            };
+
+        if !blinded_secrets.is_empty() {
+            tracing::info!("Rollback {} blinded messages", blinded_secrets.len(),);
+
+            remove_blinded_messages(&mut tx, &blinded_secrets).await?;
+        }
+
+        tx.commit().await?;
+
+        Ok(())
+    }
+}
+
+/// Removes blinded messages from the database
+#[inline(always)]
+async fn remove_blinded_messages(
+    tx: &mut Tx<'_, '_>,
+    blinded_secrets: &[PublicKey],
+) -> Result<(), Error> {
+    tx.delete_blinded_messages(blinded_secrets)
+        .await
+        .map_err(Error::Database)
+}
+
+#[inline(always)]
+async fn rollback_blinded_messages(
+    db: DynMintDatabase,
+    blinded_secrets: Vec<PublicKey>,
+) -> Result<(), Error> {
+    let mut tx = db.begin_transaction().await?;
+    remove_blinded_messages(&mut tx, &blinded_secrets).await?;
+    tx.commit().await?;
+
+    Ok(())
+}
+
+impl Drop for BlindedMessageWriter {
+    fn drop(&mut self) {
+        let db = if let Some(db) = self.db.take() {
+            db
+        } else {
+            tracing::debug!("Blinded message writer dropped after commit, no need to rollback.");
+            return;
+        };
+        let blinded_secrets: Vec<PublicKey> =
+            if let Some(secrets) = self.added_blinded_secrets.take() {
+                secrets.into_iter().collect()
+            } else {
+                return;
+            };
+
+        if !blinded_secrets.is_empty() {
+            tracing::debug!("Blinded message writer dropper with messages attempting to remove.");
+            tokio::spawn(async move {
+                if let Err(err) = rollback_blinded_messages(db, blinded_secrets).await {
+                    tracing::error!("Failed to rollback blinded messages in Drop: {}", err);
+                }
+            });
+        }
+    }
+}

+ 11 - 1
crates/cdk/src/mint/melt.rs

@@ -636,10 +636,15 @@ impl Mint {
 
         let inputs_fee = self.get_proofs_fee(melt_request.inputs()).await?;
 
-        tx.add_melt_request_and_blinded_messages(
+        tx.add_melt_request(
             melt_request.quote_id(),
             melt_request.inputs_amount()?,
             inputs_fee,
+        )
+        .await?;
+
+        tx.add_blinded_messages(
+            Some(melt_request.quote_id()),
             melt_request.outputs().as_ref().unwrap_or(&Vec::new()),
         )
         .await?;
@@ -997,6 +1002,9 @@ impl Mint {
                 change = Some(change_sigs);
 
                 proof_writer.commit();
+
+                tx.delete_melt_request(&quote.id).await?;
+
                 tx.commit().await?;
             } else {
                 tracing::info!(
@@ -1006,11 +1014,13 @@ impl Mint {
                     total_spent
                 );
                 proof_writer.commit();
+                tx.delete_melt_request(&quote.id).await?;
                 tx.commit().await?;
             }
         } else {
             tracing::debug!("No change required for melt {}", quote.id);
             proof_writer.commit();
+            tx.delete_melt_request(&quote.id).await?;
             tx.commit().await?;
         }
 

+ 1 - 0
crates/cdk/src/mint/mod.rs

@@ -34,6 +34,7 @@ use crate::{cdk_database, Amount};
 
 #[cfg(feature = "auth")]
 pub(crate) mod auth;
+mod blinded_message_writer;
 mod builder;
 mod check_spendable;
 mod issue;

+ 48 - 4
crates/cdk/src/mint/swap.rs

@@ -2,6 +2,7 @@
 use cdk_prometheus::METRICS;
 use tracing::instrument;
 
+use super::blinded_message_writer::BlindedMessageWriter;
 use super::nut11::{enforce_sig_flag, EnforceSigFlag};
 use super::proof_writer::ProofWriter;
 use super::{Mint, PublicKey, SigFlag, State, SwapRequest, SwapResponse};
@@ -21,11 +22,41 @@ impl Mint {
         swap_request.input_amount()?;
         swap_request.output_amount()?;
 
+        // We add blinded messages to db before attempting to sign
+        // this ensures that they are unique and have not been used before
+        let mut blinded_message_writer = BlindedMessageWriter::new(self.localstore.clone());
+        let mut tx = self.localstore.begin_transaction().await?;
+
+        match blinded_message_writer
+            .add_blinded_messages(&mut tx, None, swap_request.outputs())
+            .await
+        {
+            Ok(_) => {
+                tx.commit().await?;
+            }
+            Err(err) => {
+                #[cfg(feature = "prometheus")]
+                {
+                    METRICS.dec_in_flight_requests("process_swap_request");
+                    METRICS.record_mint_operation("process_swap_request", false);
+                    METRICS.record_error();
+                }
+                return Err(err);
+            }
+        }
+
         let promises = self.blind_sign(swap_request.outputs().to_owned()).await?;
         let input_verification =
             self.verify_inputs(swap_request.inputs())
                 .await
                 .map_err(|err| {
+                    #[cfg(feature = "prometheus")]
+                    {
+                        METRICS.dec_in_flight_requests("process_swap_request");
+                        METRICS.record_mint_operation("process_swap_request", false);
+                        METRICS.record_error();
+                    }
+
                     tracing::debug!("Input verification failed: {:?}", err);
                     err
                 })?;
@@ -49,14 +80,21 @@ impl Mint {
                 METRICS.record_error();
             }
 
+            tx.rollback().await?;
+            blinded_message_writer.rollback().await?;
+
             return Err(err);
         };
 
         let validate_sig_result = self.validate_sig_flag(&swap_request).await;
-        if validate_sig_result.is_err() {
+
+        if let Err(err) = validate_sig_result {
+            tx.rollback().await?;
+            blinded_message_writer.rollback().await?;
+
             #[cfg(feature = "prometheus")]
             self.record_swap_failure("process_swap_request");
-            return Err(validate_sig_result.err().unwrap());
+            return Err(err);
         }
         let mut proof_writer =
             ProofWriter::new(self.localstore.clone(), self.pubsub_manager.clone());
@@ -72,6 +110,8 @@ impl Mint {
                     METRICS.record_mint_operation("process_swap_request", false);
                     METRICS.record_error();
                 }
+                tx.rollback().await?;
+                blinded_message_writer.rollback().await?;
                 return Err(err);
             }
         };
@@ -80,10 +120,13 @@ impl Mint {
             .update_proofs_states(&mut tx, &input_ys, State::Spent)
             .await;
 
-        if update_proof_states_result.is_err() {
+        if let Err(err) = update_proof_states_result {
             #[cfg(feature = "prometheus")]
             self.record_swap_failure("process_swap_request");
-            return Err(update_proof_states_result.err().unwrap());
+
+            tx.rollback().await?;
+            blinded_message_writer.rollback().await?;
+            return Err(err);
         }
 
         tx.add_blind_signatures(
@@ -98,6 +141,7 @@ impl Mint {
         .await?;
 
         proof_writer.commit();
+        blinded_message_writer.commit();
         tx.commit().await?;
 
         let response = SwapResponse::new(promises);