ソースを参照

Add row locking mechanism for transaction safety

Add a LockedRows tracking system to enforce that resources must be
read (and locked) before being modified within a transaction. This
prevents race conditions and ensures data consistency.

Key changes:
- Add locked_row module with RowId enum and LockedRows tracker
- Change add_mint_quote to return the quote and lock it
- Change increment_mint_quote_amount_paid to take quote by value
  instead of quote_id, returning the updated quote
- Change increment_mint_quote_amount_issued to take quote by value
  instead of quote_id, returning the updated quote
- Add get_proofs_states to ProofsTransaction trait
- Auto-lock quotes when fetched via get_mint_quote, get_melt_quote,
  and related lookup methods
- Add state transition checks before updating proof states in
  melt and swap sagas

This ensures the database layer can reject modifications to resources
that weren't properly read first, forcing callers to always work with
fresh data from the database.
Cesar Rodas 3 週間 前
コミット
9519887bc6

+ 127 - 0
crates/cdk-common/src/database/locked_row.rs

@@ -0,0 +1,127 @@
+//! Row locking mechanism for database transactions.
+//!
+//! This module provides a mechanism for database layers to track which rows are currently
+//! locked within a transaction. The primary advantage is ensuring that upper layers always
+//! read the latest state from the database and properly lock resources before modifications.
+//!
+//! By requiring explicit locking before updates, this prevents race conditions and ensures
+//! data consistency when multiple operations might attempt to modify the same resources
+//! concurrently.
+
+use std::collections::HashSet;
+
+use cashu::quote_id::QuoteId;
+use cashu::PublicKey;
+
+use crate::database::Error;
+
+/// Identifies a database row that can be locked.
+///
+/// This enum represents the different types of resources that can be locked
+/// during a database transaction, allowing for type-safe tracking of locked rows.
+#[derive(Debug, Hash, Eq, PartialEq)]
+pub enum RowId {
+    /// A proof identified by its public key.
+    Proof(PublicKey),
+    /// A quote identified by its quote ID.
+    Quote(QuoteId),
+}
+
+impl From<PublicKey> for RowId {
+    fn from(value: PublicKey) -> Self {
+        RowId::Proof(value)
+    }
+}
+
+impl From<&PublicKey> for RowId {
+    fn from(value: &PublicKey) -> Self {
+        RowId::Proof(*value)
+    }
+}
+
+impl From<&QuoteId> for RowId {
+    fn from(value: &QuoteId) -> Self {
+        RowId::Quote(value.to_owned())
+    }
+}
+
+/// Tracks which rows are currently locked within a transaction.
+///
+/// This structure maintains a set of locked row identifiers, allowing the database
+/// layer to verify that rows have been properly locked before allowing modifications.
+/// This ensures that:
+///
+/// - Resources are read from the database before being modified (forcing fresh reads)
+/// - Multiple concurrent operations cannot modify the same resource simultaneously
+/// - Updates to unlocked rows are rejected, preventing accidental data corruption
+#[derive(Debug, Default)]
+pub struct LockedRows {
+    inner: HashSet<RowId>,
+}
+
+impl LockedRows {
+    /// Locks a single row, marking it as acquired for modification.
+    ///
+    /// After locking, any subsequent calls to [`is_locked`](Self::is_locked) for this
+    /// row will succeed. This should be called when reading a row that will be modified.
+    pub fn lock<T>(&mut self, record_id: T)
+    where
+        T: Into<RowId>,
+    {
+        self.inner.insert(record_id.into());
+    }
+
+    /// Locks multiple rows at once.
+    ///
+    /// This is a convenience method equivalent to calling [`lock`](Self::lock)
+    /// for each item in the collection.
+    pub fn lock_many<T>(&mut self, records_id: Vec<T>)
+    where
+        T: Into<RowId>,
+    {
+        records_id.into_iter().for_each(|record_id| {
+            self.inner.insert(record_id.into());
+        });
+    }
+
+    /// Verifies that all specified rows are currently locked.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`Error::UpdatingUnlockedRecord`] if any of the specified rows
+    /// have not been locked. This prevents modifications to rows that weren't
+    /// properly read first.
+    pub fn is_locked_many<T>(&self, records_id: Vec<T>) -> Result<(), Error>
+    where
+        T: Into<RowId>,
+    {
+        records_id
+            .into_iter()
+            .map(|resource_id| {
+                let id = resource_id.into();
+                self.inner
+                    .contains(&id)
+                    .then_some(())
+                    .ok_or(Error::UpdatingUnlockedRecord(id))
+            })
+            .collect::<Result<(), _>>()
+    }
+
+    /// Verifies that a single row is currently locked.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`Error::UpdatingUnlockedRecord`] if the specified row has not
+    /// been locked. This prevents modifications to rows that weren't properly
+    /// read first.
+    pub fn is_locked<T>(&self, resource_id: T) -> Result<(), Error>
+    where
+        T: Into<RowId>,
+    {
+        let id = resource_id.into();
+        self.inner
+            .contains(&id)
+            .then_some(())
+            .ok_or(Error::UpdatingUnlockedRecord(id))
+    }
+}

+ 12 - 6
crates/cdk-common/src/database/mint/mod.rs

@@ -117,20 +117,20 @@ pub trait QuotesTransaction {
         quote_id: &QuoteId,
     ) -> Result<Option<MintMintQuote>, Self::Err>;
     /// Add [`MintMintQuote`]
-    async fn add_mint_quote(&mut self, quote: MintMintQuote) -> Result<(), Self::Err>;
+    async fn add_mint_quote(&mut self, quote: MintMintQuote) -> Result<MintMintQuote, Self::Err>;
     /// Increment amount paid [`MintMintQuote`]
     async fn increment_mint_quote_amount_paid(
         &mut self,
-        quote_id: &QuoteId,
+        quote: mint::MintQuote,
         amount_paid: Amount,
         payment_id: String,
-    ) -> Result<Amount, Self::Err>;
+    ) -> Result<mint::MintQuote, Self::Err>;
     /// Increment amount paid [`MintMintQuote`]
     async fn increment_mint_quote_amount_issued(
         &mut self,
-        quote_id: &QuoteId,
+        quote: mint::MintQuote,
         amount_issued: Amount,
-    ) -> Result<Amount, Self::Err>;
+    ) -> Result<mint::MintQuote, Self::Err>;
 
     /// Get [`mint::MeltQuote`] and lock it for update in this transaction
     async fn get_melt_quote(
@@ -223,6 +223,12 @@ pub trait ProofsTransaction {
         proofs_state: State,
     ) -> Result<Vec<Option<State>>, Self::Err>;
 
+    /// get proofs states
+    async fn get_proofs_states(
+        &mut self,
+        ys: &[PublicKey],
+    ) -> Result<Vec<Option<State>>, Self::Err>;
+
     /// Remove [`Proofs`]
     async fn remove_proofs(
         &mut self,
@@ -232,7 +238,7 @@ pub trait ProofsTransaction {
 
     /// Get ys by quote id
     async fn get_proof_ys_by_quote_id(
-        &self,
+        &mut self,
         quote_id: &QuoteId,
     ) -> Result<Vec<PublicKey>, Self::Err>;
 }

+ 274 - 54
crates/cdk-common/src/database/mint/test/mint.rs

@@ -88,24 +88,24 @@ where
     );
 
     let mut tx = Database::begin_transaction(&db).await.unwrap();
-    assert!(tx.add_mint_quote(mint_quote.clone()).await.is_ok());
+    let mint_quote = tx.add_mint_quote(mint_quote).await.unwrap();
 
     let p1 = unique_string();
     let p2 = unique_string();
 
-    let new_paid_amount = tx
-        .increment_mint_quote_amount_paid(&mint_quote.id, 100.into(), p1.clone())
+    let mint_quote = tx
+        .increment_mint_quote_amount_paid(mint_quote, 100.into(), p1.clone())
         .await
         .unwrap();
 
-    assert_eq!(new_paid_amount, 100.into());
+    assert_eq!(mint_quote.amount_paid(), 100.into());
 
-    let new_paid_amount = tx
-        .increment_mint_quote_amount_paid(&mint_quote.id, 250.into(), p2.clone())
+    let mint_quote = tx
+        .increment_mint_quote_amount_paid(mint_quote, 250.into(), p2.clone())
         .await
         .unwrap();
 
-    assert_eq!(new_paid_amount, 350.into());
+    assert_eq!(mint_quote.amount_paid(), 350.into());
 
     tx.commit().await.unwrap();
 
@@ -150,19 +150,19 @@ where
     let p2 = unique_string();
 
     let mut tx = Database::begin_transaction(&db).await.unwrap();
-    tx.add_mint_quote(mint_quote.clone()).await.unwrap();
-    let new_paid_amount = tx
-        .increment_mint_quote_amount_paid(&mint_quote.id, 100.into(), p1.clone())
+    let mint_quote = tx.add_mint_quote(mint_quote.clone()).await.unwrap();
+    let mint_quote = tx
+        .increment_mint_quote_amount_paid(mint_quote, 100.into(), p1.clone())
         .await
         .unwrap();
 
-    assert_eq!(new_paid_amount, 100.into());
+    assert_eq!(mint_quote.amount_paid(), 100.into());
 
-    let new_paid_amount = tx
-        .increment_mint_quote_amount_paid(&mint_quote.id, 250.into(), p2.clone())
+    let mint_quote = tx
+        .increment_mint_quote_amount_paid(mint_quote, 250.into(), p2.clone())
         .await
         .unwrap();
-    assert_eq!(new_paid_amount, 350.into());
+    assert_eq!(mint_quote.amount_paid(), 350.into());
     tx.commit().await.unwrap();
 
     let mint_quote_from_db = db
@@ -213,14 +213,14 @@ where
     let p1 = unique_string();
 
     let mut tx = Database::begin_transaction(&db).await.unwrap();
-    tx.add_mint_quote(mint_quote.clone()).await.unwrap();
-    let amount_paid = tx
-        .increment_mint_quote_amount_paid(&mint_quote.id, 100.into(), p1.clone())
+    let mint_quote = tx.add_mint_quote(mint_quote.clone()).await.unwrap();
+    let mint_quote = tx
+        .increment_mint_quote_amount_paid(mint_quote, 100.into(), p1.clone())
         .await
         .unwrap();
 
     assert!(tx
-        .increment_mint_quote_amount_paid(&mint_quote.id, 100.into(), p1)
+        .increment_mint_quote_amount_paid(mint_quote.clone(), 100.into(), p1)
         .await
         .is_err());
     tx.commit().await.unwrap();
@@ -230,7 +230,7 @@ where
         .await
         .unwrap()
         .expect("mint_from_db");
-    assert_eq!(mint_quote_from_db.amount_paid(), amount_paid);
+    assert_eq!(mint_quote_from_db.amount_paid(), mint_quote.amount_paid());
     assert_eq!(mint_quote_from_db.payments.len(), 1);
 }
 
@@ -258,16 +258,21 @@ where
     );
 
     let mut tx = Database::begin_transaction(&db).await.unwrap();
-    tx.add_mint_quote(mint_quote.clone()).await.unwrap();
-    let amount_paid = tx
-        .increment_mint_quote_amount_paid(&mint_quote.id, 100.into(), p1.clone())
+    let mint_quote = tx.add_mint_quote(mint_quote.clone()).await.unwrap();
+    let mint_quote = tx
+        .increment_mint_quote_amount_paid(mint_quote, 100.into(), p1.clone())
         .await
         .unwrap();
     tx.commit().await.unwrap();
 
     let mut tx = Database::begin_transaction(&db).await.unwrap();
+    let mint_quote = tx
+        .get_mint_quote(&mint_quote.id)
+        .await
+        .expect("no error")
+        .expect("quote");
     assert!(tx
-        .increment_mint_quote_amount_paid(&mint_quote.id, 100.into(), p1)
+        .increment_mint_quote_amount_paid(mint_quote.clone(), 100.into(), p1)
         .await
         .is_err());
     tx.commit().await.unwrap(); // although in theory nothing has changed, let's try it out
@@ -277,7 +282,7 @@ where
         .await
         .unwrap()
         .expect("mint_from_db");
-    assert_eq!(mint_quote_from_db.amount_paid(), amount_paid);
+    assert_eq!(mint_quote_from_db.amount_paid(), mint_quote.amount_paid());
     assert_eq!(mint_quote_from_db.payments.len(), 1);
 }
 
@@ -303,9 +308,9 @@ where
     );
 
     let mut tx = Database::begin_transaction(&db).await.unwrap();
-    tx.add_mint_quote(mint_quote.clone()).await.unwrap();
+    let mint_quote = tx.add_mint_quote(mint_quote.clone()).await.unwrap();
     assert!(tx
-        .increment_mint_quote_amount_issued(&mint_quote.id, 100.into())
+        .increment_mint_quote_amount_issued(mint_quote, 100.into())
         .await
         .is_err());
 }
@@ -336,8 +341,13 @@ where
     tx.commit().await.unwrap();
 
     let mut tx = Database::begin_transaction(&db).await.unwrap();
+    let mint_quote = tx
+        .get_mint_quote(&mint_quote.id)
+        .await
+        .expect("no error")
+        .expect("quote");
     assert!(tx
-        .increment_mint_quote_amount_issued(&mint_quote.id, 100.into())
+        .increment_mint_quote_amount_issued(mint_quote, 100.into())
         .await
         .is_err());
 }
@@ -365,12 +375,13 @@ where
 
     let p1 = unique_string();
     let mut tx = Database::begin_transaction(&db).await.unwrap();
-    tx.add_mint_quote(mint_quote.clone()).await.unwrap();
-    tx.increment_mint_quote_amount_paid(&mint_quote.id, 100.into(), p1.clone())
+    let mint_quote = tx.add_mint_quote(mint_quote.clone()).await.unwrap();
+    let mint_quote = tx
+        .increment_mint_quote_amount_paid(mint_quote, 100.into(), p1.clone())
         .await
         .unwrap();
     assert!(tx
-        .increment_mint_quote_amount_issued(&mint_quote.id, 101.into())
+        .increment_mint_quote_amount_issued(mint_quote, 101.into())
         .await
         .is_err());
 }
@@ -398,15 +409,21 @@ where
 
     let p1 = unique_string();
     let mut tx = Database::begin_transaction(&db).await.unwrap();
-    tx.add_mint_quote(mint_quote.clone()).await.unwrap();
-    tx.increment_mint_quote_amount_paid(&mint_quote.id, 100.into(), p1.clone())
+    let mint_quote = tx.add_mint_quote(mint_quote).await.unwrap();
+    let quote_id = mint_quote.id.clone();
+    tx.increment_mint_quote_amount_paid(mint_quote, 100.into(), p1.clone())
         .await
         .unwrap();
     tx.commit().await.unwrap();
 
     let mut tx = Database::begin_transaction(&db).await.unwrap();
+    let mint_quote = tx
+        .get_mint_quote(&quote_id)
+        .await
+        .expect("no error")
+        .expect("quote");
     assert!(tx
-        .increment_mint_quote_amount_issued(&mint_quote.id, 101.into())
+        .increment_mint_quote_amount_issued(mint_quote, 101.into())
         .await
         .is_err());
 }
@@ -1034,25 +1051,35 @@ where
 
     // Add quote
     let mut tx = Database::begin_transaction(&db).await.unwrap();
-    tx.add_mint_quote(mint_quote.clone()).await.unwrap();
+    let mint_quote = tx.add_mint_quote(mint_quote).await.unwrap();
     tx.commit().await.unwrap();
 
     // Increment amount paid first time
     let mut tx = Database::begin_transaction(&db).await.unwrap();
-    let new_total = tx
-        .increment_mint_quote_amount_paid(&mint_quote.id, 300.into(), "payment_1".to_string())
+    let mint_quote = tx
+        .get_mint_quote(&mint_quote.id)
+        .await
+        .expect("valid quote")
+        .expect("valid result");
+    let mint_quote = tx
+        .increment_mint_quote_amount_paid(mint_quote, 300.into(), "payment_1".to_string())
         .await
         .unwrap();
-    assert_eq!(new_total, 300.into());
+    assert_eq!(mint_quote.amount_paid(), 300.into());
     tx.commit().await.unwrap();
 
     // Increment amount paid second time
     let mut tx = Database::begin_transaction(&db).await.unwrap();
-    let new_total = tx
-        .increment_mint_quote_amount_paid(&mint_quote.id, 200.into(), "payment_2".to_string())
+    let mint_quote = tx
+        .get_mint_quote(&mint_quote.id)
+        .await
+        .expect("valid quote")
+        .expect("valid result");
+    let mint_quote = tx
+        .increment_mint_quote_amount_paid(mint_quote, 200.into(), "payment_2".to_string())
         .await
         .unwrap();
-    assert_eq!(new_total, 500.into());
+    assert_eq!(mint_quote.amount_paid(), 500.into());
     tx.commit().await.unwrap();
 
     // Verify final state
@@ -1090,27 +1117,42 @@ where
 
     // First increment amount_paid to allow issuing
     let mut tx = Database::begin_transaction(&db).await.unwrap();
-    tx.increment_mint_quote_amount_paid(&mint_quote.id, 1000.into(), "payment_1".to_string())
+    let mint_quote = tx
+        .get_mint_quote(&mint_quote.id)
+        .await
+        .expect("valid quote")
+        .expect("valid result");
+    tx.increment_mint_quote_amount_paid(mint_quote.clone(), 1000.into(), "payment_1".to_string())
         .await
         .unwrap();
     tx.commit().await.unwrap();
 
     // Increment amount issued first time
     let mut tx = Database::begin_transaction(&db).await.unwrap();
-    let new_total = tx
-        .increment_mint_quote_amount_issued(&mint_quote.id, 400.into())
+    let mint_quote = tx
+        .get_mint_quote(&mint_quote.id)
+        .await
+        .expect("valid quote")
+        .expect("valid result");
+    let mint_quote = tx
+        .increment_mint_quote_amount_issued(mint_quote, 400.into())
         .await
         .unwrap();
-    assert_eq!(new_total, 400.into());
+    assert_eq!(mint_quote.amount_issued(), 400.into());
     tx.commit().await.unwrap();
 
     // Increment amount issued second time
     let mut tx = Database::begin_transaction(&db).await.unwrap();
-    let new_total = tx
-        .increment_mint_quote_amount_issued(&mint_quote.id, 300.into())
+    let mint_quote = tx
+        .get_mint_quote(&mint_quote.id)
+        .await
+        .expect("valid quote")
+        .expect("valid result");
+    let mint_quote = tx
+        .increment_mint_quote_amount_issued(mint_quote, 300.into())
         .await
         .unwrap();
-    assert_eq!(new_total, 700.into());
+    assert_eq!(mint_quote.amount_issued(), 700.into());
     tx.commit().await.unwrap();
 
     // Verify final state
@@ -1335,17 +1377,27 @@ where
 
     // First payment with payment_id "payment_1"
     let mut tx = Database::begin_transaction(&db).await.unwrap();
-    let new_total = tx
-        .increment_mint_quote_amount_paid(&mint_quote.id, 300.into(), "payment_1".to_string())
+    let mint_quote = tx
+        .get_mint_quote(&mint_quote.id)
+        .await
+        .expect("valid quote")
+        .expect("valid result");
+    let mint_quote = tx
+        .increment_mint_quote_amount_paid(mint_quote, 300.into(), "payment_1".to_string())
         .await
         .unwrap();
-    assert_eq!(new_total, 300.into());
+    assert_eq!(mint_quote.amount_paid(), 300.into());
     tx.commit().await.unwrap();
 
     // Try to add the same payment_id again - should fail with Duplicate error
     let mut tx = Database::begin_transaction(&db).await.unwrap();
+    let mint_quote = tx
+        .get_mint_quote(&mint_quote.id)
+        .await
+        .expect("valid quote")
+        .expect("valid result");
     let result = tx
-        .increment_mint_quote_amount_paid(&mint_quote.id, 300.into(), "payment_1".to_string())
+        .increment_mint_quote_amount_paid(mint_quote.clone(), 300.into(), "payment_1".to_string())
         .await;
 
     assert!(
@@ -1360,14 +1412,182 @@ where
 
     // A different payment_id should succeed
     let mut tx = Database::begin_transaction(&db).await.unwrap();
-    let new_total = tx
-        .increment_mint_quote_amount_paid(&mint_quote.id, 200.into(), "payment_2".to_string())
+    let mint_quote = tx
+        .get_mint_quote(&mint_quote.id)
+        .await
+        .expect("valid quote")
+        .expect("valid result");
+    let mint_quote = tx
+        .increment_mint_quote_amount_paid(mint_quote, 200.into(), "payment_2".to_string())
         .await
         .unwrap();
-    assert_eq!(new_total, 500.into());
+    assert_eq!(mint_quote.amount_paid(), 500.into());
     tx.commit().await.unwrap();
 
     // Verify final state
     let retrieved = db.get_mint_quote(&mint_quote.id).await.unwrap().unwrap();
     assert_eq!(retrieved.amount_paid(), 500.into());
 }
+
+/// Test that modifying a mint quote without loading it first fails
+///
+/// This test verifies that the database layer rejects attempts to modify
+/// records that weren't first read (and locked) within the transaction.
+pub async fn modify_mint_quote_without_loading_fails<DB>(db: DB)
+where
+    DB: Database<Error> + KeysDatabase<Err = Error>,
+{
+    use crate::database::mint::test::unique_string;
+
+    // First, create a mint quote in the database
+    let mint_quote = MintQuote::new(
+        None,
+        "".to_owned(),
+        cashu::CurrencyUnit::Sat,
+        None,
+        0,
+        PaymentIdentifier::CustomId(unique_string()),
+        None,
+        1000.into(),
+        500.into(), // already has some amount paid
+        cashu::PaymentMethod::Bolt11,
+        0,
+        vec![],
+        vec![],
+    );
+
+    let mut tx = Database::begin_transaction(&db).await.unwrap();
+    let mint_quote = tx.add_mint_quote(mint_quote).await.unwrap();
+    tx.commit().await.unwrap();
+
+    // Now try to modify the quote in a new transaction WITHOUT loading it first
+    let mut tx = Database::begin_transaction(&db).await.unwrap();
+
+    // Clone the quote from our local variable (simulating having a stale reference)
+    // and try to increment amount_paid without first calling get_mint_quote
+    let result = tx
+        .increment_mint_quote_amount_paid(mint_quote.clone(), 100.into(), unique_string())
+        .await;
+
+    // This should fail because we didn't load (and lock) the quote first
+    assert!(
+        result.is_err(),
+        "Modifying mint quote without loading should fail"
+    );
+
+    // Verify the error is the expected UpdatingUnlockedRecord error
+    let err = result.unwrap_err();
+    assert!(
+        matches!(err, Error::UpdatingUnlockedRecord(_)),
+        "Expected UpdatingUnlockedRecord error, got: {:?}",
+        err
+    );
+
+    tx.rollback().await.unwrap();
+}
+
+/// Test that incrementing amount_issued without loading the quote first fails
+pub async fn increment_amount_issued_without_loading_fails<DB>(db: DB)
+where
+    DB: Database<Error> + KeysDatabase<Err = Error>,
+{
+    use crate::database::mint::test::unique_string;
+
+    // Create a mint quote with amount_paid already set
+    let mint_quote = MintQuote::new(
+        None,
+        "".to_owned(),
+        cashu::CurrencyUnit::Sat,
+        None,
+        0,
+        PaymentIdentifier::CustomId(unique_string()),
+        None,
+        1000.into(),
+        500.into(), // has paid amount to allow issuing
+        cashu::PaymentMethod::Bolt11,
+        0,
+        vec![],
+        vec![],
+    );
+
+    let mut tx = Database::begin_transaction(&db).await.unwrap();
+    let mint_quote = tx.add_mint_quote(mint_quote).await.unwrap();
+    tx.commit().await.unwrap();
+
+    // Try to increment amount_issued without loading the quote first
+    let mut tx = Database::begin_transaction(&db).await.unwrap();
+
+    let result = tx
+        .increment_mint_quote_amount_issued(mint_quote.clone(), 100.into())
+        .await;
+
+    // This should fail
+    assert!(
+        result.is_err(),
+        "Incrementing amount_issued without loading should fail"
+    );
+
+    let err = result.unwrap_err();
+    assert!(
+        matches!(err, Error::UpdatingUnlockedRecord(_)),
+        "Expected UpdatingUnlockedRecord error, got: {:?}",
+        err
+    );
+
+    tx.rollback().await.unwrap();
+}
+
+/// Test that loading the quote first allows modifications
+pub async fn modify_mint_quote_after_loading_succeeds<DB>(db: DB)
+where
+    DB: Database<Error> + KeysDatabase<Err = Error>,
+{
+    use crate::database::mint::test::unique_string;
+
+    let mint_quote = MintQuote::new(
+        None,
+        "".to_owned(),
+        cashu::CurrencyUnit::Sat,
+        None,
+        0,
+        PaymentIdentifier::CustomId(unique_string()),
+        None,
+        1000.into(),
+        0.into(),
+        cashu::PaymentMethod::Bolt11,
+        0,
+        vec![],
+        vec![],
+    );
+
+    let mut tx = Database::begin_transaction(&db).await.unwrap();
+    tx.add_mint_quote(mint_quote.clone()).await.unwrap();
+    tx.commit().await.unwrap();
+
+    // Now load the quote first, then modify it
+    let mut tx = Database::begin_transaction(&db).await.unwrap();
+
+    // First load the quote (this should lock it)
+    let loaded_quote = tx
+        .get_mint_quote(&mint_quote.id)
+        .await
+        .unwrap()
+        .expect("quote should exist");
+
+    // Now modification should succeed
+    let result = tx
+        .increment_mint_quote_amount_paid(loaded_quote, 100.into(), unique_string())
+        .await;
+
+    assert!(
+        result.is_ok(),
+        "Modifying after loading should succeed, got: {:?}",
+        result.err()
+    );
+
+    tx.commit().await.unwrap();
+
+    // Verify the modification was persisted
+    let retrieved = db.get_mint_quote(&mint_quote.id).await.unwrap().unwrap();
+    assert_eq!(retrieved.amount_paid(), 100.into());
+}

+ 5 - 54
crates/cdk-common/src/database/mint/test/mod.rs

@@ -9,8 +9,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
 
 // For derivation path parsing
 use bitcoin::bip32::DerivationPath;
-use cashu::secret::Secret;
-use cashu::{Amount, CurrencyUnit, SecretKey};
+use cashu::CurrencyUnit;
 
 use super::*;
 use crate::database::KVStoreDatabase;
@@ -57,56 +56,6 @@ where
     keyset_id
 }
 
-/// State transition test
-pub async fn state_transition<DB>(db: DB)
-where
-    DB: Database<crate::database::Error> + KeysDatabase<Err = crate::database::Error>,
-{
-    let keyset_id = setup_keyset(&db).await;
-
-    let proofs = vec![
-        Proof {
-            amount: Amount::from(100),
-            keyset_id,
-            secret: Secret::generate(),
-            c: SecretKey::generate().public_key(),
-            witness: None,
-            dleq: None,
-        },
-        Proof {
-            amount: Amount::from(200),
-            keyset_id,
-            secret: Secret::generate(),
-            c: SecretKey::generate().public_key(),
-            witness: None,
-            dleq: None,
-        },
-    ];
-
-    // Add proofs to database
-    let mut tx = Database::begin_transaction(&db).await.unwrap();
-    tx.add_proofs(
-        proofs.clone(),
-        None,
-        &Operation::new_swap(Amount::ZERO, Amount::ZERO, Amount::ZERO),
-    )
-    .await
-    .unwrap();
-
-    // Mark one proof as `pending`
-    assert!(tx
-        .update_proofs_states(&[proofs[0].y().unwrap()], State::Pending)
-        .await
-        .is_ok());
-
-    // Attempt to select the `pending` proof, as `pending` again (which should fail)
-    assert!(tx
-        .update_proofs_states(&[proofs[0].y().unwrap()], State::Pending)
-        .await
-        .is_err());
-    tx.commit().await.unwrap();
-}
-
 /// Test KV store functionality including write, read, list, update, and remove operations
 pub async fn kvstore_functionality<DB>(db: DB)
 where
@@ -238,7 +187,6 @@ macro_rules! mint_db_test {
     ($make_db_fn:ident) => {
         mint_db_test!(
             $make_db_fn,
-            state_transition,
             add_and_find_proofs,
             add_duplicate_proofs,
             kvstore_functionality,
@@ -306,7 +254,10 @@ macro_rules! mint_db_test {
             get_mint_quote_by_request_lookup_id_in_transaction,
             get_blind_signatures_in_transaction,
             reject_duplicate_payment_ids,
-            remove_spent_proofs_should_fail
+            remove_spent_proofs_should_fail,
+            modify_mint_quote_without_loading_fails,
+            increment_amount_issued_without_loading_fails,
+            modify_mint_quote_after_loading_succeeds
         );
     };
     ($make_db_fn:ident, $($name:ident),+ $(,)?) => {

+ 8 - 0
crates/cdk-common/src/database/mint/test/proofs.rs

@@ -705,6 +705,10 @@ where
 
     // Transition proofs to Pending state
     let mut tx = Database::begin_transaction(&db).await.unwrap();
+    let _records = tx
+        .get_proof_ys_by_quote_id(&quote_id)
+        .await
+        .expect("valid records");
     tx.update_proofs_states(&ys, State::Pending).await.unwrap();
     tx.commit().await.unwrap();
 
@@ -716,6 +720,10 @@ where
 
     // Now transition proofs to Spent state
     let mut tx = Database::begin_transaction(&db).await.unwrap();
+    let _records = tx
+        .get_proof_ys_by_quote_id(&quote_id)
+        .await
+        .expect("valid records");
     tx.update_proofs_states(&ys, State::Spent).await.unwrap();
     tx.commit().await.unwrap();
 

+ 7 - 0
crates/cdk-common/src/database/mod.rs

@@ -1,6 +1,7 @@
 //! CDK Database
 
 mod kvstore;
+pub mod locked_row;
 
 #[cfg(feature = "mint")]
 pub mod mint;
@@ -32,6 +33,8 @@ pub use wallet::{
     DynWalletDatabaseTransaction,
 };
 
+use crate::database::locked_row::RowId;
+
 /// Type alias for dynamic Wallet Database
 #[cfg(feature = "wallet")]
 pub type DynWalletDatabase = std::sync::Arc<dyn WalletDatabase<Error> + Send + Sync>;
@@ -131,6 +134,10 @@ pub enum Error {
     #[error(transparent)]
     Database(Box<dyn std::error::Error + Send + Sync>),
 
+    /// Misusage of update
+    #[error("Attempting to update record without previously locking ita {0:?}")]
+    UpdatingUnlockedRecord(RowId),
+
     /// Duplicate entry
     #[error("Duplicate entry")]
     Duplicate,

+ 3 - 0
crates/cdk-common/src/error.rs

@@ -47,6 +47,9 @@ pub enum Error {
     /// Amount overflow
     #[error("Amount Overflow")]
     AmountOverflow,
+    /// Over issue - tried to issue more than paid
+    #[error("Cannot issue more than amount paid")]
+    OverIssue,
     /// Witness missing or invalid
     #[error("Signature missing or invalid")]
     SignatureMissingOrInvalid,

+ 12 - 1
crates/cdk-common/src/mint.rs

@@ -471,15 +471,26 @@ impl MintQuote {
     }
 
     /// Increment the amount issued on the mint quote by a given amount
+    ///
+    /// # Errors
+    /// Returns an error if the new issued amount would exceed the paid amount
+    /// (can't issue more than what's been paid) or if the addition would overflow.
     #[instrument(skip(self))]
     pub fn increment_amount_issued(
         &mut self,
         additional_amount: Amount,
     ) -> Result<Amount, crate::Error> {
-        self.amount_issued = self
+        let new_amount_issued = self
             .amount_issued
             .checked_add(additional_amount)
             .ok_or(crate::Error::AmountOverflow)?;
+
+        // Can't issue more than what's been paid
+        if new_amount_issued > self.amount_paid {
+            return Err(crate::Error::OverIssue);
+        }
+
+        self.amount_issued = new_amount_issued;
         Ok(self.amount_issued)
     }
 

+ 6 - 1
crates/cdk-integration-tests/tests/mint.rs

@@ -203,8 +203,13 @@ async fn test_concurrent_duplicate_payment_handling() {
 
         join_set.spawn(async move {
             let mut tx = MintDatabase::begin_transaction(&*db_clone).await.unwrap();
+            let quote_from_db = tx
+                .get_mint_quote(&quote_id)
+                .await
+                .expect("no error")
+                .expect("some value");
             let result = tx
-                .increment_mint_quote_amount_paid(&quote_id, Amount::from(10), payment_id_clone)
+                .increment_mint_quote_amount_paid(quote_from_db, Amount::from(10), payment_id_clone)
                 .await;
 
             if result.is_ok() {

+ 8 - 1
crates/cdk-mint-rpc/src/proto/server.rs

@@ -665,8 +665,15 @@ impl CdkMint for MintRPCServer {
                     .await
                     .map_err(|_| Status::internal("Could not start db transaction".to_string()))?;
 
+                // Re-fetch the mint quote within the transaction to lock it
+                let mint_quote = tx
+                    .get_mint_quote(&quote_id)
+                    .await
+                    .map_err(|_| Status::internal("Could not get quote in transaction".to_string()))?
+                    .ok_or(Status::invalid_argument("Quote not found in transaction".to_string()))?;
+
                 self.mint
-                    .pay_mint_quote(&mut tx, &mint_quote, response)
+                    .pay_mint_quote(&mut tx, mint_quote, response)
                     .await
                     .map_err(|_| Status::internal("Could not process payment".to_string()))?;
 

+ 6 - 6
crates/cdk-sql-common/src/mint/auth/mod.rs

@@ -121,6 +121,7 @@ where
     }
 
     async fn add_proof(&mut self, proof: AuthProof) -> Result<(), database::Error> {
+        let y = proof.y()?;
         if let Err(err) = query(
             r#"
                 INSERT INTO proof
@@ -129,7 +130,7 @@ where
                 (:y, :keyset_id, :secret, :c, :state)
                 "#,
         )?
-        .bind("y", proof.y()?.to_bytes().to_vec())
+        .bind("y", y.to_bytes().to_vec())
         .bind("keyset_id", proof.keyset_id.to_string())
         .bind("secret", proof.secret.to_string())
         .bind("c", proof.c.to_bytes().to_vec())
@@ -139,6 +140,7 @@ where
         {
             tracing::debug!("Attempting to add known proof. Skipping.... {:?}", err);
         }
+        self.locked_records.lock(y);
         Ok(())
     }
 
@@ -147,6 +149,7 @@ where
         y: &PublicKey,
         proofs_state: State,
     ) -> Result<Option<State>, Self::Err> {
+        self.locked_records.is_locked(y)?;
         let current_state = query(r#"SELECT state FROM proof WHERE y = :y FOR UPDATE"#)?
             .bind("y", y.to_bytes().to_vec())
             .pluck(&self.inner)
@@ -154,12 +157,8 @@ where
             .map(|state| Ok::<_, Error>(column_as_string!(state, State::from_str)))
             .transpose()?;
 
-        query(r#"UPDATE proof SET state = :new_state WHERE state = :state AND y = :y"#)?
+        query(r#"UPDATE proof SET state = :new_state WHERE  y = :y"#)?
             .bind("y", y.to_bytes().to_vec())
-            .bind(
-                "state",
-                current_state.as_ref().map(|state| state.to_string()),
-            )
             .bind("new_state", proofs_state.to_string())
             .execute(&self.inner)
             .await?;
@@ -255,6 +254,7 @@ where
                 self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
             )
             .await?,
+            locked_records: Default::default(),
         }))
     }
 

+ 124 - 120
crates/cdk-sql-common/src/mint/mod.rs

@@ -15,6 +15,7 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use bitcoin::bip32::DerivationPath;
+use cdk_common::database::locked_row::LockedRows;
 use cdk_common::database::mint::{
     CompletedOperationsDatabase, CompletedOperationsTransaction, SagaDatabase, SagaTransaction,
 };
@@ -31,7 +32,7 @@ use cdk_common::nut00::ProofsMethods;
 use cdk_common::payment::PaymentIdentifier;
 use cdk_common::quote_id::QuoteId;
 use cdk_common::secret::Secret;
-use cdk_common::state::{check_melt_quote_state_transition, check_state_transition};
+use cdk_common::state::check_melt_quote_state_transition;
 use cdk_common::util::unix_time;
 use cdk_common::{
     Amount, BlindSignature, BlindSignatureDleq, BlindedMessage, CurrencyUnit, Id, MeltQuoteState,
@@ -78,31 +79,7 @@ where
     RM: DatabasePool + 'static,
 {
     inner: ConnectionWithTransaction<RM::Connection, PooledResource<RM>>,
-}
-
-#[inline(always)]
-async fn get_current_states<C>(
-    conn: &C,
-    ys: &[PublicKey],
-) -> Result<HashMap<PublicKey, State>, Error>
-where
-    C: DatabaseExecutor + Send + Sync,
-{
-    if ys.is_empty() {
-        return Ok(Default::default());
-    }
-    query(r#"SELECT y, state FROM proof WHERE y IN (:ys)"#)?
-        .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
-        .fetch_all(conn)
-        .await?
-        .into_iter()
-        .map(|row| {
-            Ok((
-                column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice),
-                column_as_string!(&row[1], State::from_str),
-            ))
-        })
-        .collect::<Result<HashMap<_, _>, _>>()
+    locked_records: LockedRows,
 }
 
 impl<RM> SQLMintDatabase<RM>
@@ -166,6 +143,9 @@ where
         }?;
 
         for proof in proofs {
+            let y = proof.y()?;
+            self.locked_records.lock(&y);
+
             query(
                 r#"
                   INSERT INTO proof
@@ -174,7 +154,7 @@ where
                   (:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time, :operation_kind, :operation_id)
                   "#,
             )?
-            .bind("y", proof.y()?.to_bytes().to_vec())
+            .bind("y", y.to_bytes().to_vec())
             .bind("amount", proof.amount.to_i64())
             .bind("keyset_id", proof.keyset_id.to_string())
             .bind("secret", proof.secret.to_string())
@@ -200,7 +180,9 @@ where
         ys: &[PublicKey],
         new_state: State,
     ) -> Result<Vec<Option<State>>, Self::Err> {
-        let mut current_states = get_current_states(&self.inner, ys).await?;
+        self.locked_records.is_locked_many(ys.to_owned())?;
+
+        let mut current_states = get_current_states(&self.inner, ys, true).await?;
 
         if current_states.len() != ys.len() {
             tracing::warn!(
@@ -211,10 +193,6 @@ where
             return Err(database::Error::ProofNotFound);
         }
 
-        for state in current_states.values() {
-            check_state_transition(*state, new_state)?;
-        }
-
         query(r#"UPDATE proof SET state = :new_state WHERE y IN (:ys)"#)?
             .bind("new_state", new_state.to_string())
             .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
@@ -261,7 +239,7 @@ where
 
         if total_deleted != ys.len() {
             // Query current states to provide detailed logging
-            let current_states = get_current_states(&self.inner, ys).await?;
+            let current_states = get_current_states(&self.inner, ys, true).await?;
 
             let missing_count = ys.len() - current_states.len();
             let spent_count = current_states
@@ -300,7 +278,7 @@ where
     }
 
     async fn get_proof_ys_by_quote_id(
-        &self,
+        &mut self,
         quote_id: &QuoteId,
     ) -> Result<Vec<PublicKey>, Self::Err> {
         Ok(query(
@@ -315,16 +293,37 @@ where
                 proof
             WHERE
                 quote_id = :quote_id
+            FOR UPDATE
             "#,
         )?
         .bind("quote_id", quote_id.to_string())
         .fetch_all(&self.inner)
         .await?
         .into_iter()
-        .map(sql_row_to_proof)
+        .map(|row| {
+            sql_row_to_proof(row).and_then(|row| {
+                let _ = row.y().map(|c| self.locked_records.lock(c));
+                Ok(row)
+            })
+        })
         .collect::<Result<Vec<Proof>, _>>()?
         .ys()?)
     }
+
+    async fn get_proofs_states(
+        &mut self,
+        ys: &[PublicKey],
+    ) -> Result<Vec<Option<State>>, Self::Err> {
+        let mut current_states =
+            get_current_states(&self.inner, ys, true)
+                .await
+                .inspect(|public_keys| {
+                    self.locked_records
+                        .lock_many(public_keys.keys().collect::<Vec<_>>());
+                })?;
+
+        Ok(ys.iter().map(|y| current_states.remove(y)).collect())
+    }
 }
 
 #[async_trait]
@@ -363,6 +362,37 @@ where
 }
 
 #[inline(always)]
+async fn get_current_states<C>(
+    conn: &C,
+    ys: &[PublicKey],
+    for_update: bool,
+) -> Result<HashMap<PublicKey, State>, Error>
+where
+    C: DatabaseExecutor + Send + Sync,
+{
+    if ys.is_empty() {
+        return Ok(Default::default());
+    }
+    let for_update_clause = if for_update { "FOR UPDATE" } else { "" };
+
+    query(&format!(
+        r#"SELECT y, state FROM proof WHERE y IN (:ys) {}"#,
+        for_update_clause
+    ))?
+    .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
+    .fetch_all(conn)
+    .await?
+    .into_iter()
+    .map(|row| {
+        Ok((
+            column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice),
+            column_as_string!(&row[1], State::from_str),
+        ))
+    })
+    .collect::<Result<HashMap<_, _>, _>>()
+}
+
+#[inline(always)]
 async fn get_mint_quote_payments<C>(
     conn: &C,
     quote_id: &QuoteId,
@@ -688,6 +718,7 @@ where
                 self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
             )
             .await?,
+            locked_records: Default::default(),
         };
 
         Ok(Box::new(tx))
@@ -973,10 +1004,11 @@ where
     #[instrument(skip(self))]
     async fn increment_mint_quote_amount_paid(
         &mut self,
-        quote_id: &QuoteId,
+        mut quote: mint::MintQuote,
         amount_paid: Amount,
         payment_id: String,
-    ) -> Result<Amount, Self::Err> {
+    ) -> Result<mint::MintQuote, Self::Err> {
+        self.locked_records.is_locked(&quote.id)?;
         if amount_paid == Amount::ZERO {
             tracing::warn!("Amount payments of zero amount should not be recorded.");
             return Err(Error::Duplicate);
@@ -1000,37 +1032,14 @@ where
             return Err(database::Error::Duplicate);
         }
 
-        // Get current amount_paid from quote
-        let current_amount = query(
-            r#"
-            SELECT amount_paid
-            FROM mint_quote
-            WHERE id = :quote_id
-            FOR UPDATE
-            "#,
-        )?
-        .bind("quote_id", quote_id.to_string())
-        .fetch_one(&self.inner)
-        .await
-        .inspect_err(|err| {
-            tracing::error!("SQLite could not get mint quote amount_paid: {}", err);
-        })?;
-
-        let current_amount_paid = if let Some(current_amount) = current_amount {
-            let amount: u64 = column_as_number!(current_amount[0].clone());
-            Amount::from(amount)
-        } else {
-            Amount::ZERO
-        };
-
-        // Calculate new amount_paid with overflow check
-        let new_amount_paid = current_amount_paid
-            .checked_add(amount_paid)
-            .ok_or_else(|| database::Error::AmountOverflow)?;
+        let current_amount_paid = quote.amount_paid();
+        let new_amount_paid = quote
+            .increment_amount_paid(amount_paid)
+            .map_err(|_| Error::AmountOverflow)?;
 
         tracing::debug!(
             "Mint quote {} amount paid was {} is now {}.",
-            quote_id,
+            quote.id,
             current_amount_paid,
             new_amount_paid
         );
@@ -1044,7 +1053,7 @@ where
             "#,
         )?
         .bind("amount_paid", new_amount_paid.to_i64())
-        .bind("quote_id", quote_id.to_string())
+        .bind("quote_id", quote.id.to_string())
         .execute(&self.inner)
         .await
         .inspect_err(|err| {
@@ -1059,7 +1068,7 @@ where
             VALUES (:quote_id, :payment_id, :amount, :timestamp)
             "#,
         )?
-        .bind("quote_id", quote_id.to_string())
+        .bind("quote_id", quote.id.to_string())
         .bind("payment_id", payment_id)
         .bind("amount", amount_paid.to_i64())
         .bind("timestamp", unix_time() as i64)
@@ -1070,55 +1079,20 @@ where
             err
         })?;
 
-        Ok(new_amount_paid)
+        Ok(quote)
     }
 
     #[instrument(skip_all)]
     async fn increment_mint_quote_amount_issued(
         &mut self,
-        quote_id: &QuoteId,
+        mut quote: mint::MintQuote,
         amount_issued: Amount,
-    ) -> Result<Amount, Self::Err> {
-        // Get current amount_issued from quote
-        let current_amounts = query(
-            r#"
-            SELECT amount_issued, amount_paid
-            FROM mint_quote
-            WHERE id = :quote_id
-            FOR UPDATE
-            "#,
-        )?
-        .bind("quote_id", quote_id.to_string())
-        .fetch_one(&self.inner)
-        .await
-        .inspect_err(|err| {
-            tracing::error!("SQLite could not get mint quote amount_issued: {}", err);
-        })?
-        .ok_or(Error::QuoteNotFound)?;
+    ) -> Result<mint::MintQuote, Self::Err> {
+        self.locked_records.is_locked(&quote.id)?;
 
-        let new_amount_issued = {
-            // Make sure the db protects issuing not paid quotes
-            unpack_into!(
-                let (current_amount_issued, current_amount_paid) = current_amounts
-            );
-
-            let current_amount_issued: u64 = column_as_number!(current_amount_issued);
-            let current_amount_paid: u64 = column_as_number!(current_amount_paid);
-
-            let current_amount_issued = Amount::from(current_amount_issued);
-            let current_amount_paid = Amount::from(current_amount_paid);
-
-            // Calculate new amount_issued with overflow check
-            let new_amount_issued = current_amount_issued
-                .checked_add(amount_issued)
-                .ok_or_else(|| database::Error::AmountOverflow)?;
-
-            current_amount_paid
-                .checked_sub(new_amount_issued)
-                .ok_or(Error::Internal("Over-issued not allowed".to_owned()))?;
-
-            new_amount_issued
-        };
+        let new_amount_issued = quote
+            .increment_amount_issued(amount_issued)
+            .map_err(|_| Error::AmountOverflow)?;
 
         // Update the amount_issued
         query(
@@ -1129,7 +1103,7 @@ where
             "#,
         )?
         .bind("amount_issued", new_amount_issued.to_i64())
-        .bind("quote_id", quote_id.to_string())
+        .bind("quote_id", quote.id.to_string())
         .execute(&self.inner)
         .await
         .inspect_err(|err| {
@@ -1145,17 +1119,17 @@ INSERT INTO mint_quote_issued
 VALUES (:quote_id, :amount, :timestamp);
             "#,
         )?
-        .bind("quote_id", quote_id.to_string())
+        .bind("quote_id", quote.id.to_string())
         .bind("amount", amount_issued.to_i64())
         .bind("timestamp", current_time as i64)
         .execute(&self.inner)
         .await?;
 
-        Ok(new_amount_issued)
+        Ok(quote)
     }
 
     #[instrument(skip_all)]
-    async fn add_mint_quote(&mut self, quote: MintQuote) -> Result<(), Self::Err> {
+    async fn add_mint_quote(&mut self, quote: MintQuote) -> Result<MintQuote, Self::Err> {
         query(
             r#"
                 INSERT INTO mint_quote (
@@ -1169,7 +1143,7 @@ VALUES (:quote_id, :amount, :timestamp);
         .bind("id", quote.id.to_string())
         .bind("amount", quote.amount.map(|a| a.to_i64()))
         .bind("unit", quote.unit.to_string())
-        .bind("request", quote.request)
+        .bind("request", quote.request.clone())
         .bind("expiry", quote.expiry as i64)
         .bind(
             "request_lookup_id",
@@ -1182,7 +1156,9 @@ VALUES (:quote_id, :amount, :timestamp);
         .execute(&self.inner)
         .await?;
 
-        Ok(())
+        self.locked_records.lock(&quote.id);
+
+        Ok(quote)
     }
 
     async fn add_melt_quote(&mut self, quote: mint::MeltQuote) -> Result<(), Self::Err> {
@@ -1229,6 +1205,8 @@ VALUES (:quote_id, :amount, :timestamp);
         .execute(&self.inner)
         .await?;
 
+        self.locked_records.lock(&quote.id);
+
         Ok(())
     }
 
@@ -1361,28 +1339,52 @@ VALUES (:quote_id, :amount, :timestamp);
     }
 
     async fn get_mint_quote(&mut self, quote_id: &QuoteId) -> Result<Option<MintQuote>, Self::Err> {
-        get_mint_quote_inner(&self.inner, quote_id, true).await
+        get_mint_quote_inner(&self.inner, quote_id, true)
+            .await
+            .inspect(|quote| {
+                quote.as_ref().inspect(|mint_quote| {
+                    self.locked_records.lock(&mint_quote.id);
+                });
+            })
     }
 
     async fn get_melt_quote(
         &mut self,
         quote_id: &QuoteId,
     ) -> Result<Option<mint::MeltQuote>, Self::Err> {
-        get_melt_quote_inner(&self.inner, quote_id, true).await
+        get_melt_quote_inner(&self.inner, quote_id, true)
+            .await
+            .inspect(|quote| {
+                quote.as_ref().inspect(|melt_quote| {
+                    self.locked_records.lock(&melt_quote.id);
+                });
+            })
     }
 
     async fn get_mint_quote_by_request(
         &mut self,
         request: &str,
     ) -> Result<Option<MintQuote>, Self::Err> {
-        get_mint_quote_by_request_inner(&self.inner, request, true).await
+        get_mint_quote_by_request_inner(&self.inner, request, true)
+            .await
+            .inspect(|quote| {
+                quote.as_ref().inspect(|mint_quote| {
+                    self.locked_records.lock(&mint_quote.id);
+                });
+            })
     }
 
     async fn get_mint_quote_by_request_lookup_id(
         &mut self,
         request_lookup_id: &PaymentIdentifier,
     ) -> Result<Option<MintQuote>, Self::Err> {
-        get_mint_quote_by_request_lookup_id_inner(&self.inner, request_lookup_id, true).await
+        get_mint_quote_by_request_lookup_id_inner(&self.inner, request_lookup_id, true)
+            .await
+            .inspect(|quote| {
+                quote.as_ref().inspect(|mint_quote| {
+                    self.locked_records.lock(&mint_quote.id);
+                });
+            })
     }
 }
 
@@ -1605,7 +1607,7 @@ where
 
     async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
         let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
-        let mut current_states = get_current_states(&*conn, ys).await?;
+        let mut current_states = get_current_states(&*conn, ys, false).await?;
 
         Ok(ys.iter().map(|y| current_states.remove(y)).collect())
     }
@@ -2094,6 +2096,7 @@ where
                 self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
             )
             .await?,
+            locked_records: Default::default(),
         }))
     }
 }
@@ -2399,6 +2402,7 @@ where
                 self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
             )
             .await?,
+            locked_records: Default::default(),
         };
 
         Ok(Box::new(tx))

+ 5 - 5
crates/cdk/src/mint/issue/mod.rs

@@ -409,7 +409,7 @@ impl Mint {
                 .get_mint_quote_by_request_lookup_id(&wait_payment_response.payment_identifier)
                 .await
             {
-                self.pay_mint_quote(&mut tx, &mint_quote, wait_payment_response)
+                self.pay_mint_quote(&mut tx, mint_quote, wait_payment_response)
                     .await?;
             } else {
                 tracing::warn!(
@@ -452,7 +452,7 @@ impl Mint {
     pub async fn pay_mint_quote(
         &self,
         tx: &mut Box<dyn database::MintTransaction<database::Error> + Send + Sync>,
-        mint_quote: &MintQuote,
+        mint_quote: MintQuote,
         wait_payment_response: WaitPaymentResponse,
     ) -> Result<(), Error> {
         #[cfg(feature = "prometheus")]
@@ -674,8 +674,8 @@ impl Mint {
             .await?;
 
 
-        let total_issued = tx
-            .increment_mint_quote_amount_issued(&mint_request.quote, amount_issued)
+        let updated_quote = tx
+            .increment_mint_quote_amount_issued(mint_quote.clone(), amount_issued)
             .await?;
 
 
@@ -686,7 +686,7 @@ impl Mint {
         tx.commit().await?;
 
         self.pubsub_manager
-            .mint_quote_issue(&mint_quote, total_issued);
+            .mint_quote_issue(&mint_quote, updated_quote.amount_issued());
 
         Ok(MintResponse {
             signatures: blind_signatures,

+ 4 - 6
crates/cdk/src/mint/ln.rs

@@ -6,7 +6,6 @@ use cdk_common::common::PaymentProcessorKey;
 use cdk_common::database::DynMintDatabase;
 use cdk_common::mint::MintQuote;
 use cdk_common::payment::DynMintPayment;
-use cdk_common::util::unix_time;
 use cdk_common::{database, Amount, MintQuoteState, PaymentMethod};
 use tracing::instrument;
 
@@ -85,18 +84,17 @@ impl Mint {
 
                 match tx
                     .increment_mint_quote_amount_paid(
-                        &quote.id,
+                        quote.clone(),
                         amount_paid,
                         payment.payment_id.clone(),
                     )
                     .await
                 {
-                    Ok(total_paid) => {
-                        quote.increment_amount_paid(amount_paid)?;
-                        quote.add_payment(amount_paid, payment.payment_id.clone(), unix_time())?;
+                    Ok(updated_quote) => {
                         if let Some(pubsub_manager) = pubsub_manager.as_ref() {
-                            pubsub_manager.mint_quote_payment(quote, total_paid);
+                            pubsub_manager.mint_quote_payment(quote, updated_quote.amount_paid());
                         }
+                        *quote = updated_quote;
                     }
                     Err(database::Error::Duplicate) => {
                         tracing::debug!(

+ 16 - 3
crates/cdk/src/mint/melt/melt_saga/mod.rs

@@ -6,6 +6,7 @@ use cdk_common::database::mint::MeltRequestInfo;
 use cdk_common::database::DynMintDatabase;
 use cdk_common::mint::{MeltSagaState, Operation, Saga, SagaStateEnum};
 use cdk_common::nuts::MeltQuoteState;
+use cdk_common::state::check_state_transition;
 use cdk_common::{Amount, Error, ProofsMethods, PublicKey, QuoteId, State};
 #[cfg(feature = "prometheus")]
 use cdk_prometheus::METRICS;
@@ -236,6 +237,17 @@ impl MeltSaga<Initial> {
 
         let input_ys = melt_request.inputs().ys()?;
 
+        for current_state in tx
+            .get_proofs_states(&input_ys)
+            .await?
+            .into_iter()
+            .collect::<Option<Vec<_>>>()
+            .ok_or(Error::UnexpectedProofState)?
+        {
+            check_state_transition(current_state, State::Pending)
+                .map_err(|_| Error::UnexpectedProofState)?;
+        }
+
         // Update proof states to Pending
         let original_states = match tx.update_proofs_states(&input_ys, State::Pending).await {
             Ok(states) => states,
@@ -529,15 +541,16 @@ impl MeltSaga<SetupComplete> {
         )
         .await?;
 
-        let total_paid = tx
+        let mint_quote = tx
             .increment_mint_quote_amount_paid(
-                &mint_quote.id,
+                mint_quote,
                 amount,
                 self.state_data.quote.id.to_string(),
             )
             .await?;
 
-        self.pubsub.mint_quote_payment(&mint_quote, total_paid);
+        self.pubsub
+            .mint_quote_payment(&mint_quote, mint_quote.amount_paid());
 
         tracing::info!(
             "Melt quote {} paid Mint quote {}",

+ 12 - 0
crates/cdk/src/mint/melt/shared.rs

@@ -8,6 +8,7 @@
 
 use cdk_common::database::{self, DynMintDatabase};
 use cdk_common::nuts::{BlindSignature, BlindedMessage, MeltQuoteState, State};
+use cdk_common::state::check_state_transition;
 use cdk_common::{Amount, Error, PublicKey, QuoteId};
 use cdk_signatory::signatory::SignatoryKeySet;
 
@@ -324,6 +325,17 @@ pub async fn finalize_melt_core(
             .await?;
     }
 
+    for current_state in tx
+        .get_proofs_states(&input_ys)
+        .await?
+        .into_iter()
+        .collect::<Option<Vec<_>>>()
+        .ok_or(Error::UnexpectedProofState)?
+    {
+        check_state_transition(current_state, State::Spent)
+            .map_err(|_| Error::UnexpectedProofState)?;
+    }
+
     // Mark input proofs as spent
     match tx.update_proofs_states(input_ys, State::Spent).await {
         Ok(_) => {}

+ 6 - 5
crates/cdk/src/mint/mod.rs

@@ -690,7 +690,7 @@ impl Mint {
         {
             Self::handle_mint_quote_payment(
                 &mut tx,
-                &mint_quote,
+                mint_quote,
                 wait_payment_response,
                 pubsub_manager,
             )
@@ -710,7 +710,7 @@ impl Mint {
     #[instrument(skip_all)]
     async fn handle_mint_quote_payment(
         tx: &mut Box<dyn database::MintTransaction<database::Error> + Send + Sync>,
-        mint_quote: &MintQuote,
+        mint_quote: MintQuote,
         wait_payment_response: WaitPaymentResponse,
         pubsub_manager: &Arc<PubSubManager>,
     ) -> Result<(), Error> {
@@ -751,14 +751,15 @@ impl Mint {
 
                 match tx
                     .increment_mint_quote_amount_paid(
-                        &mint_quote.id,
+                        mint_quote,
                         payment_amount_quote_unit,
                         wait_payment_response.payment_id.clone(),
                     )
                     .await
                 {
-                    Ok(total_paid) => {
-                        pubsub_manager.mint_quote_payment(mint_quote, total_paid);
+                    Ok(updated_quote) => {
+                        pubsub_manager
+                            .mint_quote_payment(&updated_quote, updated_quote.amount_paid());
                     }
                     Err(database::Error::Duplicate) => {
                         tracing::info!(

+ 1 - 1
crates/cdk/src/mint/start_up_check.rs

@@ -224,7 +224,7 @@ impl Mint {
 
                     let mut quote_id_found = None;
                     for quote in melt_quotes {
-                        let tx = self.localstore.begin_transaction().await?;
+                        let mut tx = self.localstore.begin_transaction().await?;
                         let proof_ys = tx.get_proof_ys_by_quote_id(&quote.id).await?;
                         tx.rollback().await?;
 

+ 12 - 0
crates/cdk/src/mint/swap/swap_saga/mod.rs

@@ -4,6 +4,7 @@ use std::sync::Arc;
 use cdk_common::database::DynMintDatabase;
 use cdk_common::mint::{Operation, Saga, SwapSagaState};
 use cdk_common::nuts::BlindedMessage;
+use cdk_common::state::check_state_transition;
 use cdk_common::{database, Amount, Error, Proofs, ProofsMethods, PublicKey, QuoteId, State};
 use tokio::sync::Mutex;
 use tracing::instrument;
@@ -433,6 +434,17 @@ impl SwapSaga<'_, Signed> {
             }
         }
 
+        for current_state in tx
+            .get_proofs_states(&self.state_data.ys)
+            .await?
+            .into_iter()
+            .collect::<Option<Vec<_>>>()
+            .ok_or(Error::UnexpectedProofState)?
+        {
+            check_state_transition(current_state, State::Spent)
+                .map_err(|_| Error::UnexpectedProofState)?;
+        }
+
         match tx
             .update_proofs_states(&self.state_data.ys, State::Spent)
             .await