Bläddra i källkod

Split the database trait into read and transactions. (#826)

* Split the database trait into read and transactions.

The transaction traits will encapsulate all database changes and also expect
READ-and-lock operations to read and lock records from the database for
exclusive access, thereby avoiding race conditions.

The Transaction trait expects a `rollback` operation on Drop unless the
transaction has been committed.

* fix: melt quote duplicate error

This change stops a second melt quote from being created
if there is an existing valid melt quote for an invoice already.
If the first melt quote has expired then we allow for a new melt quote to be created.

---------

Co-authored-by: thesimplekid <tsk@thesimplekid.com>
C 1 månad sedan
förälder
incheckning
238b09d56a

+ 1 - 1
crates/cdk-cli/src/sub_commands/check_pending.rs

@@ -26,7 +26,7 @@ pub async fn check_pending(multi_mint_wallet: &MultiMintWallet) -> Result<()> {
         // Try to reclaim any proofs that are no longer pending
         match wallet.reclaim_unspent(pending_proofs).await {
             Ok(()) => println!("Successfully reclaimed pending proofs"),
-            Err(e) => println!("Error reclaimed pending proofs: {}", e),
+            Err(e) => println!("Error reclaimed pending proofs: {e}"),
         }
     }
     Ok(())

+ 46 - 28
crates/cdk-common/src/database/mint/auth/mod.rs

@@ -5,61 +5,79 @@ use std::collections::HashMap;
 use async_trait::async_trait;
 use cashu::{AuthRequired, ProtectedEndpoint};
 
+use super::DbTransactionFinalizer;
 use crate::database::Error;
 use crate::mint::MintKeySetInfo;
 use crate::nuts::nut07::State;
 use crate::nuts::{AuthProof, BlindSignature, Id, PublicKey};
 
-/// Mint Database trait
+/// Mint Database transaction
 #[async_trait]
-pub trait MintAuthDatabase {
-    /// Mint Database Error
-    type Err: Into<Error> + From<Error>;
+pub trait MintAuthTransaction<Error>: DbTransactionFinalizer<Err = Error> {
     /// Add Active Keyset
-    async fn set_active_keyset(&self, id: Id) -> Result<(), Self::Err>;
-    /// Get Active Keyset
-    async fn get_active_keyset_id(&self) -> Result<Option<Id>, Self::Err>;
+    async fn set_active_keyset(&mut self, id: Id) -> Result<(), Error>;
 
     /// Add [`MintKeySetInfo`]
-    async fn add_keyset_info(&self, keyset: MintKeySetInfo) -> Result<(), Self::Err>;
-    /// Get [`MintKeySetInfo`]
-    async fn get_keyset_info(&self, id: &Id) -> Result<Option<MintKeySetInfo>, Self::Err>;
-    /// Get [`MintKeySetInfo`]s
-    async fn get_keyset_infos(&self) -> Result<Vec<MintKeySetInfo>, Self::Err>;
+    async fn add_keyset_info(&mut self, keyset: MintKeySetInfo) -> Result<(), Error>;
 
     /// Add spent [`AuthProof`]
-    async fn add_proof(&self, proof: AuthProof) -> Result<(), Self::Err>;
-    /// Get [`AuthProof`] state
-    async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err>;
+    async fn add_proof(&mut self, proof: AuthProof) -> Result<(), Error>;
+
     /// Update [`AuthProof`]s state
     async fn update_proof_state(
-        &self,
+        &mut self,
         y: &PublicKey,
         proofs_state: State,
-    ) -> Result<Option<State>, Self::Err>;
+    ) -> Result<Option<State>, Error>;
 
     /// Add [`BlindSignature`]
     async fn add_blind_signatures(
-        &self,
+        &mut self,
         blinded_messages: &[PublicKey],
         blind_signatures: &[BlindSignature],
-    ) -> Result<(), Self::Err>;
-    /// Get [`BlindSignature`]s
-    async fn get_blind_signatures(
-        &self,
-        blinded_messages: &[PublicKey],
-    ) -> Result<Vec<Option<BlindSignature>>, Self::Err>;
+    ) -> Result<(), Error>;
 
     /// Add protected endpoints
     async fn add_protected_endpoints(
-        &self,
+        &mut self,
         protected_endpoints: HashMap<ProtectedEndpoint, AuthRequired>,
-    ) -> Result<(), Self::Err>;
+    ) -> Result<(), Error>;
+
     /// Removed Protected endpoints
     async fn remove_protected_endpoints(
-        &self,
+        &mut self,
         protected_endpoints: Vec<ProtectedEndpoint>,
-    ) -> Result<(), Self::Err>;
+    ) -> Result<(), Error>;
+}
+
+/// Mint Database trait
+#[async_trait]
+pub trait MintAuthDatabase {
+    /// Mint Database Error
+    type Err: Into<Error> + From<Error>;
+
+    /// Begins a transaction
+    async fn begin_transaction<'a>(
+        &'a self,
+    ) -> Result<Box<dyn MintAuthTransaction<Self::Err> + Send + Sync + 'a>, Self::Err>;
+
+    /// Get Active Keyset
+    async fn get_active_keyset_id(&self) -> Result<Option<Id>, Self::Err>;
+
+    /// Get [`MintKeySetInfo`]
+    async fn get_keyset_info(&self, id: &Id) -> Result<Option<MintKeySetInfo>, Self::Err>;
+    /// Get [`MintKeySetInfo`]s
+    async fn get_keyset_infos(&self) -> Result<Vec<MintKeySetInfo>, Self::Err>;
+
+    /// Get [`AuthProof`] state
+    async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err>;
+
+    /// Get [`BlindSignature`]s
+    async fn get_blind_signatures(
+        &self,
+        blinded_messages: &[PublicKey],
+    ) -> Result<Vec<Option<BlindSignature>>, Self::Err>;
+
     /// Get auth for protected_endpoint
     async fn get_auth_for_endpoint(
         &self,

+ 138 - 43
crates/cdk-common/src/database/mint/mod.rs

@@ -21,7 +21,17 @@ mod auth;
 pub mod test;
 
 #[cfg(feature = "auth")]
-pub use auth::MintAuthDatabase;
+pub use auth::{MintAuthDatabase, MintAuthTransaction};
+
+/// KeysDatabaseWriter
+#[async_trait]
+pub trait KeysDatabaseTransaction<'a, Error>: DbTransactionFinalizer<Err = Error> {
+    /// Add Active Keyset
+    async fn set_active_keyset(&mut self, unit: CurrencyUnit, id: Id) -> Result<(), Error>;
+
+    /// Add [`MintKeySetInfo`]
+    async fn add_keyset_info(&mut self, keyset: MintKeySetInfo) -> Result<(), Error>;
+}
 
 /// Mint Keys Database trait
 #[async_trait]
@@ -29,36 +39,84 @@ pub trait KeysDatabase {
     /// Mint Keys Database Error
     type Err: Into<Error> + From<Error>;
 
-    /// Add Active Keyset
-    async fn set_active_keyset(&self, unit: CurrencyUnit, id: Id) -> Result<(), Self::Err>;
+    /// Beings a transaction
+    async fn begin_transaction<'a>(
+        &'a self,
+    ) -> Result<Box<dyn KeysDatabaseTransaction<'a, Self::Err> + Send + Sync + 'a>, Error>;
+
     /// Get Active Keyset
     async fn get_active_keyset_id(&self, unit: &CurrencyUnit) -> Result<Option<Id>, Self::Err>;
+
     /// Get all Active Keyset
     async fn get_active_keysets(&self) -> Result<HashMap<CurrencyUnit, Id>, Self::Err>;
-    /// Add [`MintKeySetInfo`]
-    async fn add_keyset_info(&self, keyset: MintKeySetInfo) -> Result<(), Self::Err>;
+
     /// Get [`MintKeySetInfo`]
     async fn get_keyset_info(&self, id: &Id) -> Result<Option<MintKeySetInfo>, Self::Err>;
+
     /// Get [`MintKeySetInfo`]s
     async fn get_keyset_infos(&self) -> Result<Vec<MintKeySetInfo>, Self::Err>;
 }
 
-/// Mint Quote Database trait
+/// Mint Quote Database writer trait
 #[async_trait]
-pub trait QuotesDatabase {
+pub trait QuotesTransaction<'a> {
     /// Mint Quotes Database Error
     type Err: Into<Error> + From<Error>;
 
+    /// Get [`MintMintQuote`] and lock it for update in this transaction
+    async fn get_mint_quote(&mut self, quote_id: &Uuid)
+        -> Result<Option<MintMintQuote>, Self::Err>;
     /// Add [`MintMintQuote`]
-    async fn add_mint_quote(&self, quote: MintMintQuote) -> Result<(), Self::Err>;
-    /// Get [`MintMintQuote`]
-    async fn get_mint_quote(&self, quote_id: &Uuid) -> Result<Option<MintMintQuote>, Self::Err>;
+    async fn add_or_replace_mint_quote(&mut self, quote: MintMintQuote) -> Result<(), Self::Err>;
     /// Update state of [`MintMintQuote`]
     async fn update_mint_quote_state(
-        &self,
+        &mut self,
         quote_id: &Uuid,
         state: MintQuoteState,
     ) -> Result<MintQuoteState, Self::Err>;
+    /// Remove [`MintMintQuote`]
+    async fn remove_mint_quote(&mut self, quote_id: &Uuid) -> Result<(), Self::Err>;
+    /// Get [`mint::MeltQuote`] and lock it for update in this transaction
+    async fn get_melt_quote(
+        &mut self,
+        quote_id: &Uuid,
+    ) -> Result<Option<mint::MeltQuote>, Self::Err>;
+    /// Add [`mint::MeltQuote`]
+    async fn add_melt_quote(&mut self, quote: mint::MeltQuote) -> Result<(), Self::Err>;
+
+    /// Updates the request lookup id for a melt quote
+    async fn update_melt_quote_request_lookup_id(
+        &mut self,
+        quote_id: &Uuid,
+        new_request_lookup_id: &str,
+    ) -> Result<(), Self::Err>;
+
+    /// Update [`mint::MeltQuote`] state
+    ///
+    /// It is expected for this function to fail if the state is already set to the new state
+    async fn update_melt_quote_state(
+        &mut self,
+        quote_id: &Uuid,
+        new_state: MeltQuoteState,
+    ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err>;
+    /// Remove [`mint::MeltQuote`]
+    async fn remove_melt_quote(&mut self, quote_id: &Uuid) -> Result<(), Self::Err>;
+    /// Get all [`MintMintQuote`]s and lock it for update in this transaction
+    async fn get_mint_quote_by_request(
+        &mut self,
+        request: &str,
+    ) -> Result<Option<MintMintQuote>, Self::Err>;
+}
+
+/// Mint Quote Database trait
+#[async_trait]
+pub trait QuotesDatabase {
+    /// Mint Quotes Database Error
+    type Err: Into<Error> + From<Error>;
+
+    /// Get [`MintMintQuote`]
+    async fn get_mint_quote(&self, quote_id: &Uuid) -> Result<Option<MintMintQuote>, Self::Err>;
+
     /// Get all [`MintMintQuote`]s
     async fn get_mint_quote_by_request(
         &self,
@@ -76,30 +134,15 @@ pub trait QuotesDatabase {
         &self,
         state: MintQuoteState,
     ) -> Result<Vec<MintMintQuote>, Self::Err>;
-    /// Remove [`MintMintQuote`]
-    async fn remove_mint_quote(&self, quote_id: &Uuid) -> Result<(), Self::Err>;
-
-    /// Add [`mint::MeltQuote`]
-    async fn add_melt_quote(&self, quote: mint::MeltQuote) -> Result<(), Self::Err>;
     /// Get [`mint::MeltQuote`]
     async fn get_melt_quote(&self, quote_id: &Uuid) -> Result<Option<mint::MeltQuote>, Self::Err>;
-    /// Update [`mint::MeltQuote`] state
-    ///
-    /// It is expected for this function to fail if the state is already set to the new state
-    async fn update_melt_quote_state(
-        &self,
-        quote_id: &Uuid,
-        new_state: MeltQuoteState,
-    ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err>;
     /// Get all [`mint::MeltQuote`]s
     async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err>;
-    /// Remove [`mint::MeltQuote`]
-    async fn remove_melt_quote(&self, quote_id: &Uuid) -> Result<(), Self::Err>;
 }
 
-/// Mint Proof Database trait
+/// Mint Proof Transaction trait
 #[async_trait]
-pub trait ProofsDatabase {
+pub trait ProofsTransaction<'a> {
     /// Mint Proof Database Error
     type Err: Into<Error> + From<Error>;
 
@@ -107,25 +150,34 @@ pub trait ProofsDatabase {
     ///
     /// Adds proofs to the database. The database should error if the proof already exits, with a
     /// `AttemptUpdateSpentProof` if the proof is already spent or a `Duplicate` error otherwise.
-    async fn add_proofs(&self, proof: Proofs, quote_id: Option<Uuid>) -> Result<(), Self::Err>;
+    async fn add_proofs(&mut self, proof: Proofs, quote_id: Option<Uuid>) -> Result<(), Self::Err>;
+    /// Updates the proofs to a given states and return the previous states
+    async fn update_proofs_states(
+        &mut self,
+        ys: &[PublicKey],
+        proofs_state: State,
+    ) -> Result<Vec<Option<State>>, Self::Err>;
+
     /// Remove [`Proofs`]
     async fn remove_proofs(
-        &self,
+        &mut self,
         ys: &[PublicKey],
         quote_id: Option<Uuid>,
     ) -> Result<(), Self::Err>;
+}
+
+/// Mint Proof Database trait
+#[async_trait]
+pub trait ProofsDatabase {
+    /// Mint Proof Database Error
+    type Err: Into<Error> + From<Error>;
+
     /// Get [`Proofs`] by ys
     async fn get_proofs_by_ys(&self, ys: &[PublicKey]) -> Result<Vec<Option<Proof>>, Self::Err>;
     /// Get ys by quote id
     async fn get_proof_ys_by_quote_id(&self, quote_id: &Uuid) -> Result<Vec<PublicKey>, Self::Err>;
     /// Get [`Proofs`] state
     async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err>;
-    /// Get [`Proofs`] state
-    async fn update_proofs_states(
-        &self,
-        ys: &[PublicKey],
-        proofs_state: State,
-    ) -> Result<Vec<Option<State>>, Self::Err>;
     /// Get [`Proofs`] by state
     async fn get_proofs_by_keyset_id(
         &self,
@@ -134,18 +186,32 @@ pub trait ProofsDatabase {
 }
 
 #[async_trait]
-/// Mint Signatures Database trait
-pub trait SignaturesDatabase {
+/// Mint Signatures Transaction trait
+pub trait SignaturesTransaction<'a> {
     /// Mint Signature Database Error
     type Err: Into<Error> + From<Error>;
 
     /// Add [`BlindSignature`]
     async fn add_blind_signatures(
-        &self,
+        &mut self,
         blinded_messages: &[PublicKey],
         blind_signatures: &[BlindSignature],
         quote_id: Option<Uuid>,
     ) -> Result<(), Self::Err>;
+
+    /// Get [`BlindSignature`]s
+    async fn get_blind_signatures(
+        &mut self,
+        blinded_messages: &[PublicKey],
+    ) -> Result<Vec<Option<BlindSignature>>, Self::Err>;
+}
+
+#[async_trait]
+/// Mint Signatures Database trait
+pub trait SignaturesDatabase {
+    /// Mint Signature Database Error
+    type Err: Into<Error> + From<Error>;
+
     /// Get [`BlindSignature`]s
     async fn get_blind_signatures(
         &self,
@@ -163,18 +229,47 @@ pub trait SignaturesDatabase {
     ) -> Result<Vec<BlindSignature>, Self::Err>;
 }
 
+#[async_trait]
+/// Commit and Rollback
+pub trait DbTransactionFinalizer {
+    /// Mint Signature Database Error
+    type Err: Into<Error> + From<Error>;
+
+    /// Commits all the changes into the database
+    async fn commit(self: Box<Self>) -> Result<(), Self::Err>;
+
+    /// Rollbacks the write transaction
+    async fn rollback(self: Box<Self>) -> Result<(), Self::Err>;
+}
+
+/// Base database writer
+#[async_trait]
+pub trait Transaction<'a, Error>:
+    DbTransactionFinalizer<Err = Error>
+    + QuotesTransaction<'a, Err = Error>
+    + SignaturesTransaction<'a, Err = Error>
+    + ProofsTransaction<'a, Err = Error>
+{
+    /// Set [`QuoteTTL`]
+    async fn set_quote_ttl(&mut self, quote_ttl: QuoteTTL) -> Result<(), Error>;
+
+    /// Set [`MintInfo`]
+    async fn set_mint_info(&mut self, mint_info: MintInfo) -> Result<(), Error>;
+}
+
 /// Mint Database trait
 #[async_trait]
 pub trait Database<Error>:
     QuotesDatabase<Err = Error> + ProofsDatabase<Err = Error> + SignaturesDatabase<Err = Error>
 {
-    /// Set [`MintInfo`]
-    async fn set_mint_info(&self, mint_info: MintInfo) -> Result<(), Error>;
+    /// Beings a transaction
+    async fn begin_transaction<'a>(
+        &'a self,
+    ) -> Result<Box<dyn Transaction<'a, Error> + Send + Sync + 'a>, Error>;
+
     /// Get [`MintInfo`]
     async fn get_mint_info(&self) -> Result<MintInfo, Error>;
 
-    /// Set [`QuoteTTL`]
-    async fn set_quote_ttl(&self, quote_ttl: QuoteTTL) -> Result<(), Error>;
     /// Get [`QuoteTTL`]
     async fn get_quote_ttl(&self) -> Result<QuoteTTL, Error>;
 }

+ 17 - 7
crates/cdk-common/src/database/mint/test.rs

@@ -2,17 +2,20 @@
 //!
 //! This set is generic and checks the default and expected behaviour for a mint database
 //! implementation
-use std::fmt::Debug;
 use std::str::FromStr;
 
 use cashu::secret::Secret;
 use cashu::{Amount, CurrencyUnit, SecretKey};
 
 use super::*;
+use crate::database;
 use crate::mint::MintKeySetInfo;
 
 #[inline]
-async fn setup_keyset<E: Debug, DB: Database<E> + KeysDatabase<Err = E>>(db: &DB) -> Id {
+async fn setup_keyset<DB>(db: &DB) -> Id
+where
+    DB: KeysDatabase<Err = database::Error>,
+{
     let keyset_id = Id::from_str("00916bbf7ef91a36").unwrap();
     let keyset_info = MintKeySetInfo {
         id: keyset_id,
@@ -25,12 +28,17 @@ async fn setup_keyset<E: Debug, DB: Database<E> + KeysDatabase<Err = E>>(db: &DB
         max_order: 32,
         input_fee_ppk: 0,
     };
-    db.add_keyset_info(keyset_info).await.unwrap();
+    let mut writer = db.begin_transaction().await.expect("db.begin()");
+    writer.add_keyset_info(keyset_info).await.unwrap();
+    writer.commit().await.expect("commit()");
     keyset_id
 }
 
 /// State transition test
-pub async fn state_transition<E: Debug, DB: Database<E> + KeysDatabase<Err = E>>(db: DB) {
+pub async fn state_transition<DB>(db: DB)
+where
+    DB: Database<database::Error> + KeysDatabase<Err = database::Error>,
+{
     let keyset_id = setup_keyset(&db).await;
 
     let proofs = vec![
@@ -53,19 +61,21 @@ pub async fn state_transition<E: Debug, DB: Database<E> + KeysDatabase<Err = E>>
     ];
 
     // Add proofs to database
-    db.add_proofs(proofs.clone(), None).await.unwrap();
+    let mut tx = Database::begin_transaction(&db).await.unwrap();
+    tx.add_proofs(proofs.clone(), None).await.unwrap();
 
     // Mark one proof as `pending`
-    assert!(db
+    assert!(tx
         .update_proofs_states(&[proofs[0].y().unwrap()], State::Pending)
         .await
         .is_ok());
 
     // Attempt to select the `pending` proof, as `pending` again (which should fail)
-    assert!(db
+    assert!(tx
         .update_proofs_states(&[proofs[0].y().unwrap()], State::Pending)
         .await
         .is_err());
+    tx.commit().await.unwrap();
 }
 
 /// Unit test that is expected to be passed for a correct database implementation

+ 7 - 4
crates/cdk-common/src/database/mod.rs

@@ -5,14 +5,17 @@ pub mod mint;
 #[cfg(feature = "wallet")]
 mod wallet;
 
-#[cfg(all(feature = "mint", feature = "auth"))]
-pub use mint::MintAuthDatabase;
 #[cfg(feature = "mint")]
 pub use mint::{
-    Database as MintDatabase, KeysDatabase as MintKeysDatabase,
-    ProofsDatabase as MintProofsDatabase, QuotesDatabase as MintQuotesDatabase,
+    Database as MintDatabase, DbTransactionFinalizer as MintDbWriterFinalizer,
+    KeysDatabase as MintKeysDatabase, KeysDatabaseTransaction as MintKeyDatabaseTransaction,
+    ProofsDatabase as MintProofsDatabase, ProofsTransaction as MintProofsTransaction,
+    QuotesDatabase as MintQuotesDatabase, QuotesTransaction as MintQuotesTransaction,
     SignaturesDatabase as MintSignaturesDatabase,
+    SignaturesTransaction as MintSignatureTransaction, Transaction as MintTransaction,
 };
+#[cfg(all(feature = "mint", feature = "auth"))]
+pub use mint::{MintAuthDatabase, MintAuthTransaction};
 #[cfg(feature = "wallet")]
 pub use wallet::Database as WalletDatabase;
 

+ 6 - 6
crates/cdk-integration-tests/src/init_auth_mint.rs

@@ -71,9 +71,9 @@ where
                 acc
             });
 
-    auth_database
-        .add_protected_endpoints(blind_auth_endpoints)
-        .await?;
+    let mut tx = auth_database.begin_transaction().await?;
+
+    tx.add_protected_endpoints(blind_auth_endpoints).await?;
 
     let mut clear_auth_endpoint = HashMap::new();
     clear_auth_endpoint.insert(
@@ -81,9 +81,9 @@ where
         AuthRequired::Clear,
     );
 
-    auth_database
-        .add_protected_endpoints(clear_auth_endpoint)
-        .await?;
+    tx.add_protected_endpoints(clear_auth_endpoint).await?;
+
+    tx.commit().await?;
 
     mint_builder = mint_builder.with_auth_localstore(Arc::new(auth_database));
 

+ 5 - 4
crates/cdk-integration-tests/src/init_pure_tests.rs

@@ -227,11 +227,12 @@ pub async fn create_and_start_test_mint() -> Result<Mint> {
         .map(|x| x.clone())
         .expect("localstore");
 
-    localstore
-        .set_mint_info(mint_builder.mint_info.clone())
-        .await?;
+    let mut tx = localstore.begin_transaction().await?;
+    tx.set_mint_info(mint_builder.mint_info.clone()).await?;
+
     let quote_ttl = QuoteTTL::new(10000, 10000);
-    localstore.set_quote_ttl(quote_ttl).await?;
+    tx.set_quote_ttl(quote_ttl).await?;
+    tx.commit().await?;
 
     let mint = mint_builder.build().await?;
 

+ 1 - 3
crates/cdk-integration-tests/tests/happy_path_mint_wallet.rs

@@ -425,9 +425,7 @@ async fn test_pay_invoice_twice() {
 
     let melt = wallet.melt(&melt_quote.id).await.unwrap();
 
-    let melt_two = wallet.melt_quote(invoice, None).await.unwrap();
-
-    let melt_two = wallet.melt(&melt_two.id).await;
+    let melt_two = wallet.melt_quote(invoice, None).await;
 
     match melt_two {
         Err(err) => match err {

+ 5 - 3
crates/cdk-integration-tests/tests/mint.rs

@@ -51,13 +51,15 @@ async fn test_correct_keyset() {
         .with_seed(mnemonic.to_seed_normalized("").to_vec());
 
     let mint = mint_builder.build().await.unwrap();
+    let mut tx = localstore.begin_transaction().await.unwrap();
 
-    localstore
-        .set_mint_info(mint_builder.mint_info.clone())
+    tx.set_mint_info(mint_builder.mint_info.clone())
         .await
         .unwrap();
     let quote_ttl = QuoteTTL::new(10000, 10000);
-    localstore.set_quote_ttl(quote_ttl).await.unwrap();
+    tx.set_quote_ttl(quote_ttl).await.unwrap();
+
+    tx.commit().await.unwrap();
 
     let active = mint.get_active_keysets();
 

+ 10 - 2
crates/cdk-mint-rpc/src/proto/server.rs

@@ -655,8 +655,16 @@ impl CdkMint for MintRPCServer {
 
                 mint_quote.state = state;
 
-                self.mint
-                    .update_mint_quote(mint_quote)
+                let mut tx = self
+                    .mint
+                    .localstore
+                    .begin_transaction()
+                    .await
+                    .map_err(|_| Status::internal("Could not update quote".to_string()))?;
+                tx.add_or_replace_mint_quote(mint_quote)
+                    .await
+                    .map_err(|_| Status::internal("Could not update quote".to_string()))?;
+                tx.commit()
                     .await
                     .map_err(|_| Status::internal("Could not update quote".to_string()))?;
             }

+ 5 - 6
crates/cdk-mintd/src/main.rs

@@ -526,12 +526,11 @@ async fn main() -> anyhow::Result<()> {
 
         mint_builder = mint_builder.set_blind_auth_settings(auth_settings.mint_max_bat);
 
-        auth_localstore
-            .remove_protected_endpoints(unprotected_endpoints)
-            .await?;
-        auth_localstore
-            .add_protected_endpoints(protected_endpoints)
-            .await?;
+        let mut tx = auth_localstore.begin_transaction().await?;
+
+        tx.remove_protected_endpoints(unprotected_endpoints).await?;
+        tx.add_protected_endpoints(protected_endpoints).await?;
+        tx.commit().await?;
     }
 
     let mint = mint_builder.build().await?;

+ 8 - 5
crates/cdk-signatory/src/common.rs

@@ -25,13 +25,14 @@ pub async fn init_keysets(
     // Get keysets info from DB
     let keysets_infos = localstore.get_keyset_infos().await?;
 
+    let mut tx = localstore.begin_transaction().await?;
     if !keysets_infos.is_empty() {
         tracing::debug!("Setting all saved keysets to inactive");
         for keyset in keysets_infos.clone() {
             // Set all to in active
             let mut keyset = keyset;
             keyset.active = false;
-            localstore.add_keyset_info(keyset).await?;
+            tx.add_keyset_info(keyset).await?;
         }
 
         let keysets_by_unit: HashMap<CurrencyUnit, Vec<MintKeySetInfo>> =
@@ -74,9 +75,9 @@ pub async fn init_keysets(
                     active_keysets.insert(id, keyset);
                     let mut keyset_info = highest_index_keyset;
                     keyset_info.active = true;
-                    localstore.add_keyset_info(keyset_info).await?;
+                    tx.add_keyset_info(keyset_info).await?;
                     active_keyset_units.push(unit.clone());
-                    localstore.set_active_keyset(unit, id).await?;
+                    tx.set_active_keyset(unit, id).await?;
                 } else {
                     // Check to see if there are not keysets by this unit
                     let derivation_path_index = if keysets.is_empty() {
@@ -104,8 +105,8 @@ pub async fn init_keysets(
                     );
 
                     let id = keyset_info.id;
-                    localstore.add_keyset_info(keyset_info).await?;
-                    localstore.set_active_keyset(unit.clone(), id).await?;
+                    tx.add_keyset_info(keyset_info).await?;
+                    tx.set_active_keyset(unit.clone(), id).await?;
                     active_keysets.insert(id, keyset);
                     active_keyset_units.push(unit.clone());
                 };
@@ -113,6 +114,8 @@ pub async fn init_keysets(
         }
     }
 
+    tx.commit().await?;
+
     Ok((active_keysets, active_keyset_units))
 }
 

+ 9 - 4
crates/cdk-signatory/src/db_signatory.rs

@@ -53,6 +53,7 @@ impl DbSignatory {
         .await?;
 
         supported_units.entry(CurrencyUnit::Auth).or_insert((0, 1));
+        let mut tx = localstore.begin_transaction().await?;
 
         // Create new keysets for supported units that aren't covered by the current keysets
         for (unit, (fee, max_order)) in supported_units {
@@ -77,12 +78,14 @@ impl DbSignatory {
                 );
 
                 let id = keyset_info.id;
-                localstore.add_keyset_info(keyset_info).await?;
-                localstore.set_active_keyset(unit, id).await?;
+                tx.add_keyset_info(keyset_info).await?;
+                tx.set_active_keyset(unit, id).await?;
                 active_keysets.insert(id, keyset);
             }
         }
 
+        tx.commit().await?;
+
         let keys = Self {
             keysets: Default::default(),
             active_keysets: Default::default(),
@@ -244,8 +247,10 @@ impl Signatory for DbSignatory {
             None,
         );
         let id = info.id;
-        self.localstore.add_keyset_info(info.clone()).await?;
-        self.localstore.set_active_keyset(args.unit, id).await?;
+        let mut tx = self.localstore.begin_transaction().await?;
+        tx.add_keyset_info(info.clone()).await?;
+        tx.set_active_keyset(args.unit, id).await?;
+        tx.commit().await?;
 
         self.reload_keys_from_db().await?;
 

+ 154 - 154
crates/cdk-sqlite/src/mint/auth/mod.rs

@@ -6,14 +6,14 @@ use std::path::Path;
 use std::str::FromStr;
 
 use async_trait::async_trait;
-use cdk_common::database::{self, MintAuthDatabase};
+use cdk_common::database::{self, MintAuthDatabase, MintAuthTransaction};
 use cdk_common::mint::MintKeySetInfo;
 use cdk_common::nuts::{AuthProof, BlindSignature, Id, PublicKey, State};
 use cdk_common::{AuthRequired, ProtectedEndpoint};
 use tracing::instrument;
 
 use super::async_rusqlite::AsyncRusqlite;
-use super::{sqlite_row_to_blind_signature, sqlite_row_to_keyset_info};
+use super::{sqlite_row_to_blind_signature, sqlite_row_to_keyset_info, SqliteTransaction};
 use crate::column_as_string;
 use crate::common::{create_sqlite_pool, migrate};
 use crate::mint::async_rusqlite::query;
@@ -56,11 +56,9 @@ impl MintSqliteAuthDatabase {
 }
 
 #[async_trait]
-impl MintAuthDatabase for MintSqliteAuthDatabase {
-    type Err = database::Error;
-
+impl MintAuthTransaction<database::Error> for SqliteTransaction<'_> {
     #[instrument(skip(self))]
-    async fn set_active_keyset(&self, id: Id) -> Result<(), Self::Err> {
+    async fn set_active_keyset(&mut self, id: Id) -> Result<(), database::Error> {
         tracing::info!("Setting auth keyset {id} active");
         query(
             r#"
@@ -72,30 +70,13 @@ impl MintAuthDatabase for MintSqliteAuthDatabase {
             "#,
         )
         .bind(":id", id.to_string())
-        .execute(&self.pool)
+        .execute(&self.inner)
         .await?;
 
         Ok(())
     }
 
-    async fn get_active_keyset_id(&self) -> Result<Option<Id>, Self::Err> {
-        Ok(query(
-            r#"
-            SELECT
-                id
-            FROM
-                keyset
-            WHERE
-                active = 1;
-            "#,
-        )
-        .pluck(&self.pool)
-        .await?
-        .map(|id| Ok::<_, Error>(column_as_string!(id, Id::from_str, Id::from_bytes)))
-        .transpose()?)
-    }
-
-    async fn add_keyset_info(&self, keyset: MintKeySetInfo) -> Result<(), Self::Err> {
+    async fn add_keyset_info(&mut self, keyset: MintKeySetInfo) -> Result<(), database::Error> {
         query(
             r#"
         INSERT INTO
@@ -125,12 +106,159 @@ impl MintAuthDatabase for MintSqliteAuthDatabase {
         .bind(":derivation_path", keyset.derivation_path.to_string())
         .bind(":max_order", keyset.max_order)
         .bind(":derivation_path_index", keyset.derivation_path_index)
-        .execute(&self.pool)
+        .execute(&self.inner)
         .await?;
 
         Ok(())
     }
 
+    async fn add_proof(&mut self, proof: AuthProof) -> Result<(), database::Error> {
+        if let Err(err) = query(
+            r#"
+                INSERT INTO proof
+                (y, keyset_id, secret, c, state)
+                VALUES
+                (:y, :keyset_id, :secret, :c, :state)
+                "#,
+        )
+        .bind(":y", proof.y()?.to_bytes().to_vec())
+        .bind(":keyset_id", proof.keyset_id.to_string())
+        .bind(":secret", proof.secret.to_string())
+        .bind(":c", proof.c.to_bytes().to_vec())
+        .bind(":state", "UNSPENT".to_string())
+        .execute(&self.inner)
+        .await
+        {
+            tracing::debug!("Attempting to add known proof. Skipping.... {:?}", err);
+        }
+        Ok(())
+    }
+
+    async fn update_proof_state(
+        &mut self,
+        y: &PublicKey,
+        proofs_state: State,
+    ) -> Result<Option<State>, Self::Err> {
+        let current_state = query(r#"SELECT state FROM proof WHERE y = :y"#)
+            .bind(":y", y.to_bytes().to_vec())
+            .pluck(&self.inner)
+            .await?
+            .map(|state| Ok::<_, Error>(column_as_string!(state, State::from_str)))
+            .transpose()?;
+
+        query(r#"UPDATE proof SET state = :new_state WHERE state = :state AND y = :y"#)
+            .bind(":y", y.to_bytes().to_vec())
+            .bind(
+                ":state",
+                current_state.as_ref().map(|state| state.to_string()),
+            )
+            .bind(":new_state", proofs_state.to_string())
+            .execute(&self.inner)
+            .await?;
+
+        Ok(current_state)
+    }
+
+    async fn add_blind_signatures(
+        &mut self,
+        blinded_messages: &[PublicKey],
+        blind_signatures: &[BlindSignature],
+    ) -> Result<(), database::Error> {
+        for (message, signature) in blinded_messages.iter().zip(blind_signatures) {
+            query(
+                r#"
+                       INSERT
+                       INTO blind_signature
+                       (y, amount, keyset_id, c)
+                       VALUES
+                       (:y, :amount, :keyset_id, :c)
+                   "#,
+            )
+            .bind(":y", message.to_bytes().to_vec())
+            .bind(":amount", u64::from(signature.amount) as i64)
+            .bind(":keyset_id", signature.keyset_id.to_string())
+            .bind(":c", signature.c.to_bytes().to_vec())
+            .execute(&self.inner)
+            .await?;
+        }
+
+        Ok(())
+    }
+
+    async fn add_protected_endpoints(
+        &mut self,
+        protected_endpoints: HashMap<ProtectedEndpoint, AuthRequired>,
+    ) -> Result<(), database::Error> {
+        for (endpoint, auth) in protected_endpoints.iter() {
+            if let Err(err) = query(
+                r#"
+                 INSERT OR REPLACE INTO protected_endpoints
+                 (endpoint, auth)
+                 VALUES (:endpoint, :auth);
+                 "#,
+            )
+            .bind(":endpoint", serde_json::to_string(endpoint)?)
+            .bind(":auth", serde_json::to_string(auth)?)
+            .execute(&self.inner)
+            .await
+            {
+                tracing::debug!(
+                    "Attempting to add protected endpoint. Skipping.... {:?}",
+                    err
+                );
+            }
+        }
+
+        Ok(())
+    }
+    async fn remove_protected_endpoints(
+        &mut self,
+        protected_endpoints: Vec<ProtectedEndpoint>,
+    ) -> Result<(), database::Error> {
+        query(r#"DELETE FROM protected_endpoints WHERE endpoint IN (:endpoints)"#)
+            .bind_vec(
+                ":endpoints",
+                protected_endpoints
+                    .iter()
+                    .map(serde_json::to_string)
+                    .collect::<Result<_, _>>()?,
+            )
+            .execute(&self.inner)
+            .await?;
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl MintAuthDatabase for MintSqliteAuthDatabase {
+    type Err = database::Error;
+
+    async fn begin_transaction<'a>(
+        &'a self,
+    ) -> Result<Box<dyn MintAuthTransaction<database::Error> + Send + Sync + 'a>, database::Error>
+    {
+        Ok(Box::new(SqliteTransaction {
+            inner: self.pool.begin().await?,
+        }))
+    }
+
+    async fn get_active_keyset_id(&self) -> Result<Option<Id>, Self::Err> {
+        Ok(query(
+            r#"
+            SELECT
+                id
+            FROM
+                keyset
+            WHERE
+                active = 1;
+            "#,
+        )
+        .pluck(&self.pool)
+        .await?
+        .map(|id| Ok::<_, Error>(column_as_string!(id, Id::from_str, Id::from_bytes)))
+        .transpose()?)
+    }
+
     async fn get_keyset_info(&self, id: &Id) -> Result<Option<MintKeySetInfo>, Self::Err> {
         Ok(query(
             r#"SELECT
@@ -177,28 +305,6 @@ impl MintAuthDatabase for MintSqliteAuthDatabase {
         .collect::<Result<Vec<_>, _>>()?)
     }
 
-    async fn add_proof(&self, proof: AuthProof) -> Result<(), Self::Err> {
-        if let Err(err) = query(
-            r#"
-            INSERT INTO proof
-            (y, keyset_id, secret, c, state)
-            VALUES
-            (:y, :keyset_id, :secret, :c, :state)
-            "#,
-        )
-        .bind(":y", proof.y()?.to_bytes().to_vec())
-        .bind(":keyset_id", proof.keyset_id.to_string())
-        .bind(":secret", proof.secret.to_string())
-        .bind(":c", proof.c.to_bytes().to_vec())
-        .bind(":state", "UNSPENT".to_string())
-        .execute(&self.pool)
-        .await
-        {
-            tracing::debug!("Attempting to add known proof. Skipping.... {:?}", err);
-        }
-        Ok(())
-    }
-
     async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
         let mut current_states = query(r#"SELECT y, state FROM proof WHERE y IN (:ys)"#)
             .bind_vec(":ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
@@ -216,65 +322,6 @@ impl MintAuthDatabase for MintSqliteAuthDatabase {
         Ok(ys.iter().map(|y| current_states.remove(y)).collect())
     }
 
-    async fn update_proof_state(
-        &self,
-        y: &PublicKey,
-        proofs_state: State,
-    ) -> Result<Option<State>, Self::Err> {
-        let transaction = self.pool.begin().await?;
-
-        let current_state = query(r#"SELECT state FROM proof WHERE y = :y"#)
-            .bind(":y", y.to_bytes().to_vec())
-            .pluck(&transaction)
-            .await?
-            .map(|state| Ok::<_, Error>(column_as_string!(state, State::from_str)))
-            .transpose()?;
-
-        query(r#"UPDATE proof SET state = :new_state WHERE state = :state AND y = :y"#)
-            .bind(":y", y.to_bytes().to_vec())
-            .bind(
-                ":state",
-                current_state.as_ref().map(|state| state.to_string()),
-            )
-            .bind(":new_state", proofs_state.to_string())
-            .execute(&transaction)
-            .await?;
-
-        transaction.commit().await?;
-
-        Ok(current_state)
-    }
-
-    async fn add_blind_signatures(
-        &self,
-        blinded_messages: &[PublicKey],
-        blind_signatures: &[BlindSignature],
-    ) -> Result<(), Self::Err> {
-        let transaction = self.pool.begin().await?;
-
-        for (message, signature) in blinded_messages.iter().zip(blind_signatures) {
-            query(
-                r#"
-                    INSERT
-                    INTO blind_signature
-                    (y, amount, keyset_id, c)
-                    VALUES
-                    (:y, :amount, :keyset_id, :c)
-                "#,
-            )
-            .bind(":y", message.to_bytes().to_vec())
-            .bind(":amount", u64::from(signature.amount) as i64)
-            .bind(":keyset_id", signature.keyset_id.to_string())
-            .bind(":c", signature.c.to_bytes().to_vec())
-            .execute(&transaction)
-            .await?;
-        }
-
-        transaction.commit().await?;
-
-        Ok(())
-    }
-
     async fn get_blind_signatures(
         &self,
         blinded_messages: &[PublicKey],
@@ -319,53 +366,6 @@ impl MintAuthDatabase for MintSqliteAuthDatabase {
             .collect())
     }
 
-    async fn add_protected_endpoints(
-        &self,
-        protected_endpoints: HashMap<ProtectedEndpoint, AuthRequired>,
-    ) -> Result<(), Self::Err> {
-        let transaction = self.pool.begin().await?;
-
-        for (endpoint, auth) in protected_endpoints.iter() {
-            if let Err(err) = query(
-                r#"
-                INSERT OR REPLACE INTO protected_endpoints
-                (endpoint, auth)
-                VALUES (:endpoint, :auth);
-                "#,
-            )
-            .bind(":endpoint", serde_json::to_string(endpoint)?)
-            .bind(":auth", serde_json::to_string(auth)?)
-            .execute(&transaction)
-            .await
-            {
-                tracing::debug!(
-                    "Attempting to add protected endpoint. Skipping.... {:?}",
-                    err
-                );
-            }
-        }
-
-        transaction.commit().await?;
-
-        Ok(())
-    }
-    async fn remove_protected_endpoints(
-        &self,
-        protected_endpoints: Vec<ProtectedEndpoint>,
-    ) -> Result<(), Self::Err> {
-        query(r#"DELETE FROM protected_endpoints WHERE endpoint IN (:endpoints)"#)
-            .bind_vec(
-                ":endpoints",
-                protected_endpoints
-                    .iter()
-                    .map(serde_json::to_string)
-                    .collect::<Result<_, _>>()?,
-            )
-            .execute(&self.pool)
-            .await?;
-        Ok(())
-    }
-
     async fn get_auth_for_endpoint(
         &self,
         protected_endpoint: ProtectedEndpoint,

+ 13 - 11
crates/cdk-sqlite/src/mint/memory.rs

@@ -1,9 +1,7 @@
 //! In-memory database that is provided by the `cdk-sqlite` crate, mainly for testing purposes.
 use std::collections::HashMap;
 
-use cdk_common::database::{
-    self, MintDatabase, MintKeysDatabase, MintProofsDatabase, MintQuotesDatabase,
-};
+use cdk_common::database::{self, MintDatabase, MintKeysDatabase};
 use cdk_common::mint::{self, MintKeySetInfo, MintQuote};
 use cdk_common::nuts::{CurrencyUnit, Id, Proofs};
 use cdk_common::MintInfo;
@@ -31,28 +29,32 @@ pub async fn new_with_state(
     mint_info: MintInfo,
 ) -> Result<MintSqliteDatabase, database::Error> {
     let db = empty().await?;
+    let mut tx = MintKeysDatabase::begin_transaction(&db).await?;
 
     for active_keyset in active_keysets {
-        db.set_active_keyset(active_keyset.0, active_keyset.1)
+        tx.set_active_keyset(active_keyset.0, active_keyset.1)
             .await?;
     }
 
     for keyset in keysets {
-        db.add_keyset_info(keyset).await?;
+        tx.add_keyset_info(keyset).await?;
     }
+    tx.commit().await?;
+
+    let mut tx = MintDatabase::begin_transaction(&db).await?;
 
     for quote in mint_quotes {
-        db.add_mint_quote(quote).await?;
+        tx.add_or_replace_mint_quote(quote).await?;
     }
 
     for quote in melt_quotes {
-        db.add_melt_quote(quote).await?;
+        tx.add_melt_quote(quote).await?;
     }
 
-    db.add_proofs(pending_proofs, None).await?;
-    db.add_proofs(spent_proofs, None).await?;
-
-    db.set_mint_info(mint_info).await?;
+    tx.add_proofs(pending_proofs, None).await?;
+    tx.add_proofs(spent_proofs, None).await?;
+    tx.set_mint_info(mint_info).await?;
+    tx.commit().await?;
 
     Ok(db)
 }

Filskillnaden har hållts tillbaka eftersom den är för stor
+ 490 - 331
crates/cdk-sqlite/src/mint/mod.rs


+ 1 - 1
crates/cdk/src/lib.rs

@@ -13,7 +13,7 @@ pub mod cdk_database {
     #[cfg(feature = "mint")]
     pub use cdk_common::database::{
         MintDatabase, MintKeysDatabase, MintProofsDatabase, MintQuotesDatabase,
-        MintSignaturesDatabase,
+        MintSignaturesDatabase, MintTransaction,
     };
 }
 

+ 9 - 8
crates/cdk/src/mint/auth/mod.rs

@@ -163,17 +163,16 @@ impl Mint {
             err
         })?;
 
+        let mut tx = auth_localstore.begin_transaction().await?;
+
         // Add proof to the database
-        auth_localstore
-            .add_proof(proof.clone())
-            .await
-            .map_err(|err| {
-                tracing::error!("Failed to add proof to database: {:?}", err);
-                err
-            })?;
+        tx.add_proof(proof.clone()).await.map_err(|err| {
+            tracing::error!("Failed to add proof to database: {:?}", err);
+            err
+        })?;
 
         // Update proof state to spent
-        let state = match auth_localstore.update_proof_state(&y, State::Spent).await {
+        let state = match tx.update_proof_state(&y, State::Spent).await {
             Ok(state) => {
                 tracing::debug!(
                     "Successfully updated proof state to SPENT, previous state: {:?}",
@@ -205,6 +204,8 @@ impl Mint {
             }
         };
 
+        tx.commit().await?;
+
         Ok(())
     }
 

+ 2 - 78
crates/cdk/src/mint/check_spendable.rs

@@ -1,40 +1,10 @@
-use std::collections::{HashMap, HashSet};
-
 use futures::future::try_join_all;
 use tracing::instrument;
 
-use super::{CheckStateRequest, CheckStateResponse, Mint, ProofState, PublicKey, State};
-use crate::{cdk_database, Error};
+use super::{CheckStateRequest, CheckStateResponse, Mint, ProofState, State};
+use crate::Error;
 
 impl Mint {
-    /// Helper function to reset proofs to their original state, skipping spent proofs
-    async fn reset_proofs_to_original_state(
-        &self,
-        ys: &[PublicKey],
-        original_states: Vec<Option<State>>,
-    ) -> Result<(), Error> {
-        let mut ys_by_state = HashMap::new();
-        let mut unknown_proofs = Vec::new();
-        for (y, state) in ys.iter().zip(original_states) {
-            if let Some(state) = state {
-                // Skip attempting to update proofs that were originally spent
-                if state != State::Spent {
-                    ys_by_state.entry(state).or_insert_with(Vec::new).push(*y);
-                }
-            } else {
-                unknown_proofs.push(*y);
-            }
-        }
-
-        for (state, ys) in ys_by_state {
-            self.localstore.update_proofs_states(&ys, state).await?;
-        }
-
-        self.localstore.remove_proofs(&unknown_proofs, None).await?;
-
-        Ok(())
-    }
-
     /// Check state
     #[instrument(skip_all)]
     pub async fn check_state(
@@ -70,50 +40,4 @@ impl Mint {
             states: proof_states,
         })
     }
-
-    /// Check Tokens are not spent or pending
-    #[instrument(skip_all)]
-    pub async fn check_ys_spendable(
-        &self,
-        ys: &[PublicKey],
-        proof_state: State,
-    ) -> Result<(), Error> {
-        let original_proofs_state =
-            match self.localstore.update_proofs_states(ys, proof_state).await {
-                Ok(states) => states,
-                Err(cdk_database::Error::AttemptUpdateSpentProof)
-                | Err(cdk_database::Error::AttemptRemoveSpentProof) => {
-                    return Err(Error::TokenAlreadySpent)
-                }
-                Err(err) => return Err(err.into()),
-            };
-
-        assert!(ys.len() == original_proofs_state.len());
-
-        let proofs_state = original_proofs_state
-            .iter()
-            .flatten()
-            .collect::<HashSet<&State>>();
-
-        if proofs_state.contains(&State::Pending) {
-            // Reset states before returning error
-            self.reset_proofs_to_original_state(ys, original_proofs_state)
-                .await?;
-            return Err(Error::TokenPending);
-        }
-
-        if proofs_state.contains(&State::Spent) {
-            // Reset states before returning error
-            self.reset_proofs_to_original_state(ys, original_proofs_state)
-                .await?;
-            return Err(Error::TokenAlreadySpent);
-        }
-
-        for public_key in ys {
-            tracing::trace!("proof: {} set to {}", public_key.to_hex(), proof_state);
-            self.pubsub_manager.proof_state((*public_key, proof_state));
-        }
-
-        Ok(())
-    }
 }

+ 52 - 68
crates/cdk/src/mint/issue/issue_nut04.rs

@@ -105,7 +105,9 @@ impl Mint {
             create_invoice_response.request_lookup_id,
         );
 
-        self.localstore.add_mint_quote(quote.clone()).await?;
+        let mut tx = self.localstore.begin_transaction().await?;
+        tx.add_or_replace_mint_quote(quote.clone()).await?;
+        tx.commit().await?;
 
         let quote: MintQuoteBolt11Response<Uuid> = quote.into();
 
@@ -121,8 +123,8 @@ impl Mint {
         &self,
         quote_id: &Uuid,
     ) -> Result<MintQuoteBolt11Response<Uuid>, Error> {
-        let quote = self
-            .localstore
+        let mut tx = self.localstore.begin_transaction().await?;
+        let mut mint_quote = tx
             .get_mint_quote(quote_id)
             .await?
             .ok_or(Error::UnknownQuote)?;
@@ -130,30 +132,24 @@ impl Mint {
         // Since the pending state is not part of the NUT it should not be part of the
         // response. In practice the wallet should not be checking the state of
         // a quote while waiting for the mint response.
-        let state = match quote.state {
-            MintQuoteState::Pending => MintQuoteState::Paid,
-            MintQuoteState::Unpaid => self.check_mint_quote_paid(quote_id).await?,
-            s => s,
-        };
+        if mint_quote.state == MintQuoteState::Unpaid {
+            self.check_mint_quote_paid(tx, &mut mint_quote)
+                .await?
+                .commit()
+                .await?;
+        }
 
         Ok(MintQuoteBolt11Response {
-            quote: quote.id,
-            request: quote.request,
-            state,
-            expiry: Some(quote.expiry),
-            pubkey: quote.pubkey,
-            amount: Some(quote.amount),
-            unit: Some(quote.unit.clone()),
+            quote: mint_quote.id,
+            request: mint_quote.request,
+            state: mint_quote.state,
+            expiry: Some(mint_quote.expiry),
+            pubkey: mint_quote.pubkey,
+            amount: Some(mint_quote.amount),
+            unit: Some(mint_quote.unit.clone()),
         })
     }
 
-    /// Update mint quote
-    #[instrument(skip_all)]
-    pub async fn update_mint_quote(&self, quote: MintQuote) -> Result<(), Error> {
-        self.localstore.add_mint_quote(quote).await?;
-        Ok(())
-    }
-
     /// Get mint quotes
     #[instrument(skip_all)]
     pub async fn mint_quotes(&self) -> Result<Vec<MintQuote>, Error> {
@@ -186,7 +182,9 @@ impl Mint {
     /// Remove mint quote
     #[instrument(skip_all)]
     pub async fn remove_mint_quote(&self, quote_id: &Uuid) -> Result<(), Error> {
-        self.localstore.remove_mint_quote(quote_id).await?;
+        let mut tx = self.localstore.begin_transaction().await?;
+        tx.remove_mint_quote(quote_id).await?;
+        tx.commit().await?;
 
         Ok(())
     }
@@ -215,9 +213,10 @@ impl Mint {
             mint_quote.id
         );
         if mint_quote.state != MintQuoteState::Issued && mint_quote.state != MintQuoteState::Paid {
-            self.localstore
-                .update_mint_quote_state(&mint_quote.id, MintQuoteState::Paid)
+            let mut tx = self.localstore.begin_transaction().await?;
+            tx.update_mint_quote_state(&mint_quote.id, MintQuoteState::Paid)
                 .await?;
+            tx.commit().await?;
         } else {
             tracing::debug!(
                 "{} Quote already {} continuing",
@@ -238,39 +237,27 @@ impl Mint {
         &self,
         mint_request: MintRequest<Uuid>,
     ) -> Result<MintResponse, Error> {
-        let mint_quote = self
-            .localstore
+        let mut tx = self.localstore.begin_transaction().await?;
+
+        let mut mint_quote = tx
             .get_mint_quote(&mint_request.quote)
             .await?
             .ok_or(Error::UnknownQuote)?;
 
-        let state = self
-            .localstore
-            .update_mint_quote_state(&mint_request.quote, MintQuoteState::Pending)
-            .await?;
-
-        let state = if state == MintQuoteState::Unpaid {
-            self.check_mint_quote_paid(&mint_quote.id).await?
+        let mut tx = if mint_quote.state == MintQuoteState::Unpaid {
+            self.check_mint_quote_paid(tx, &mut mint_quote).await?
         } else {
-            state
+            tx
         };
 
-        match state {
+        match mint_quote.state {
             MintQuoteState::Unpaid => {
-                let _state = self
-                    .localstore
-                    .update_mint_quote_state(&mint_request.quote, MintQuoteState::Unpaid)
-                    .await?;
                 return Err(Error::UnpaidQuote);
             }
             MintQuoteState::Pending => {
                 return Err(Error::PendingQuote);
             }
             MintQuoteState::Issued => {
-                let _state = self
-                    .localstore
-                    .update_mint_quote_state(&mint_request.quote, MintQuoteState::Issued)
-                    .await?;
                 return Err(Error::IssuedQuote);
             }
             MintQuoteState::Paid => (),
@@ -282,17 +269,14 @@ impl Mint {
             mint_request.verify_signature(pubkey)?;
         }
 
-        let Verification { amount, unit } = match self.verify_outputs(&mint_request.outputs).await {
-            Ok(verification) => verification,
-            Err(err) => {
-                tracing::debug!("Could not verify mint outputs");
-                self.localstore
-                    .update_mint_quote_state(&mint_request.quote, MintQuoteState::Paid)
-                    .await?;
-
-                return Err(err);
-            }
-        };
+        let Verification { amount, unit } =
+            match self.verify_outputs(&mut tx, &mint_request.outputs).await {
+                Ok(verification) => verification,
+                Err(err) => {
+                    tracing::debug!("Could not verify mint outputs");
+                    return Err(err);
+                }
+            };
 
         // We check the total value of blinded messages == mint quote
         if amount != mint_quote.amount {
@@ -313,21 +297,21 @@ impl Mint {
             blind_signatures.push(blind_signature);
         }
 
-        self.localstore
-            .add_blind_signatures(
-                &mint_request
-                    .outputs
-                    .iter()
-                    .map(|p| p.blinded_secret)
-                    .collect::<Vec<PublicKey>>(),
-                &blind_signatures,
-                Some(mint_request.quote),
-            )
+        tx.add_blind_signatures(
+            &mint_request
+                .outputs
+                .iter()
+                .map(|p| p.blinded_secret)
+                .collect::<Vec<PublicKey>>(),
+            &blind_signatures,
+            Some(mint_request.quote),
+        )
+        .await?;
+
+        tx.update_mint_quote_state(&mint_request.quote, MintQuoteState::Issued)
             .await?;
 
-        self.localstore
-            .update_mint_quote_state(&mint_request.quote, MintQuoteState::Issued)
-            .await?;
+        tx.commit().await?;
 
         self.pubsub_manager
             .mint_quote_bolt11_status(mint_quote, MintQuoteState::Issued);

+ 13 - 12
crates/cdk/src/mint/ln.rs

@@ -1,19 +1,18 @@
 use cdk_common::common::PaymentProcessorKey;
+use cdk_common::database::{self, MintTransaction};
+use cdk_common::mint::MintQuote;
 use cdk_common::MintQuoteState;
 
 use super::Mint;
-use crate::mint::Uuid;
 use crate::Error;
 
 impl Mint {
     /// Check the status of an ln payment for a quote
-    pub async fn check_mint_quote_paid(&self, quote_id: &Uuid) -> Result<MintQuoteState, Error> {
-        let mut quote = self
-            .localstore
-            .get_mint_quote(quote_id)
-            .await?
-            .ok_or(Error::UnknownQuote)?;
-
+    pub async fn check_mint_quote_paid(
+        &self,
+        tx: Box<dyn MintTransaction<'_, database::Error> + Send + Sync + '_>,
+        quote: &mut MintQuote,
+    ) -> Result<Box<dyn MintTransaction<'_, database::Error> + Send + Sync + '_>, Error> {
         let ln = match self.ln.get(&PaymentProcessorKey::new(
             quote.unit.clone(),
             cdk_common::PaymentMethod::Bolt11,
@@ -26,14 +25,16 @@ impl Mint {
             }
         };
 
+        tx.commit().await?;
+
         let ln_status = ln
             .check_incoming_payment_status(&quote.request_lookup_id)
             .await?;
 
+        let mut tx = self.localstore.begin_transaction().await?;
+
         if ln_status != quote.state && quote.state != MintQuoteState::Issued {
-            self.localstore
-                .update_mint_quote_state(quote_id, ln_status)
-                .await?;
+            tx.update_mint_quote_state(&quote.id, ln_status).await?;
 
             quote.state = ln_status;
 
@@ -41,6 +42,6 @@ impl Mint {
                 .mint_quote_bolt11_status(quote.clone(), ln_status);
         }
 
-        Ok(quote.state)
+        Ok(tx)
     }
 }

+ 110 - 152
crates/cdk/src/mint/melt.rs

@@ -1,6 +1,7 @@
 use std::str::FromStr;
 
 use anyhow::bail;
+use cdk_common::database::{self, MintTransaction};
 use cdk_common::nut00::ProofsMethods;
 use cdk_common::nut05::MeltMethodOptions;
 use cdk_common::MeltOptions;
@@ -14,6 +15,7 @@ use super::{
 };
 use crate::amount::to_unit;
 use crate::cdk_payment::{MakePaymentResponse, MintPayment};
+use crate::mint::proof_writer::ProofWriter;
 use crate::mint::verification::Verification;
 use crate::mint::SigFlag;
 use crate::nuts::nut11::{enforce_sig_flag, EnforceSigFlag};
@@ -170,7 +172,30 @@ impl Mint {
             payment_quote.request_lookup_id
         );
 
-        self.localstore.add_melt_quote(quote.clone()).await?;
+        let mut tx = self.localstore.begin_transaction().await?;
+        if let Some(mut from_db_quote) = tx.get_melt_quote(&quote.id).await? {
+            if from_db_quote.state != quote.state {
+                tx.update_melt_quote_state(&quote.id, from_db_quote.state)
+                    .await?;
+                from_db_quote.state = quote.state;
+            }
+            if from_db_quote.request_lookup_id != quote.request_lookup_id {
+                tx.update_melt_quote_request_lookup_id(&quote.id, &quote.request_lookup_id)
+                    .await?;
+                from_db_quote.request_lookup_id = quote.request_lookup_id.clone();
+            }
+            if from_db_quote != quote {
+                return Err(Error::Internal);
+            }
+        } else if let Err(err) = tx.add_melt_quote(quote.clone()).await {
+            match err {
+                database::Error::Duplicate => {
+                    return Err(Error::RequestAlreadyPaid);
+                }
+                _ => return Err(Error::from(err)),
+            }
+        }
+        tx.commit().await?;
 
         Ok(quote.into())
     }
@@ -208,13 +233,6 @@ impl Mint {
         })
     }
 
-    /// Update melt quote
-    #[instrument(skip_all)]
-    pub async fn update_melt_quote(&self, quote: MeltQuote) -> Result<(), Error> {
-        self.localstore.add_melt_quote(quote).await?;
-        Ok(())
-    }
-
     /// Get melt quotes
     #[instrument(skip_all)]
     pub async fn melt_quotes(&self) -> Result<Vec<MeltQuote>, Error> {
@@ -222,14 +240,6 @@ impl Mint {
         Ok(quotes)
     }
 
-    /// Remove melt quote
-    #[instrument(skip(self))]
-    pub async fn remove_melt_quote(&self, quote_id: &Uuid) -> Result<(), Error> {
-        self.localstore.remove_melt_quote(quote_id).await?;
-
-        Ok(())
-    }
-
     /// Check melt has expected fees
     #[instrument(skip_all)]
     pub async fn check_melt_expected_ln_fees(
@@ -291,10 +301,10 @@ impl Mint {
     #[instrument(skip_all)]
     pub async fn verify_melt_request(
         &self,
+        tx: &mut Box<dyn MintTransaction<'_, database::Error> + Send + Sync + '_>,
         melt_request: &MeltRequest<Uuid>,
-    ) -> Result<MeltQuote, Error> {
-        let (state, quote) = self
-            .localstore
+    ) -> Result<(ProofWriter, MeltQuote), Error> {
+        let (state, quote) = tx
             .update_melt_quote_state(melt_request.quote(), MeltQuoteState::Pending)
             .await?;
 
@@ -315,8 +325,6 @@ impl Mint {
 
         ensure_cdk!(input_unit.is_some(), Error::UnsupportedUnit);
 
-        let input_ys = melt_request.inputs().ys()?;
-
         let fee = self.get_proofs_fee(melt_request.inputs()).await?;
 
         let required_total = quote.amount + quote.fee_reserve + fee;
@@ -337,27 +345,10 @@ impl Mint {
             ));
         }
 
-        if let Some(err) = self
-            .localstore
-            .add_proofs(melt_request.inputs().clone(), None)
-            .await
-            .err()
-        {
-            return match err {
-                cdk_common::database::Error::Duplicate => Err(Error::TokenPending),
-                cdk_common::database::Error::AttemptUpdateSpentProof => {
-                    Err(Error::TokenAlreadySpent)
-                }
-                err => Err(Error::Database(err)),
-            };
-        }
-
-        self.check_ys_spendable(&input_ys, State::Pending).await?;
+        let mut proof_writer =
+            ProofWriter::new(self.localstore.clone(), self.pubsub_manager.clone());
 
-        for proof in melt_request.inputs() {
-            self.pubsub_manager
-                .proof_state((proof.y()?, State::Pending));
-        }
+        proof_writer.add_proofs(tx, melt_request.inputs()).await?;
 
         let EnforceSigFlag { sig_flag, .. } = enforce_sig_flag(melt_request.inputs().clone());
 
@@ -368,43 +359,14 @@ impl Mint {
                 let Verification {
                     amount: _,
                     unit: output_unit,
-                } = self.verify_outputs(outputs).await?;
+                } = self.verify_outputs(tx, outputs).await?;
 
                 ensure_cdk!(input_unit == output_unit, Error::UnsupportedUnit);
             }
         }
 
         tracing::debug!("Verified melt quote: {}", melt_request.quote());
-        Ok(quote)
-    }
-
-    /// Process unpaid melt request
-    /// In the event that a melt request fails and the lighthing payment is not
-    /// made The proofs should be returned to an unspent state and the
-    /// quote should be unpaid
-    #[instrument(skip_all)]
-    pub async fn process_unpaid_melt(&self, melt_request: &MeltRequest<Uuid>) -> Result<(), Error> {
-        let input_ys = melt_request.inputs().ys()?;
-
-        self.localstore
-            .remove_proofs(&input_ys, Some(*melt_request.quote()))
-            .await?;
-
-        self.localstore
-            .update_melt_quote_state(melt_request.quote(), MeltQuoteState::Unpaid)
-            .await?;
-
-        if let Ok(Some(quote)) = self.localstore.get_melt_quote(melt_request.quote()).await {
-            self.pubsub_manager
-                .melt_quote_status(quote, None, None, MeltQuoteState::Unpaid);
-        }
-
-        for public_key in input_ys {
-            self.pubsub_manager
-                .proof_state((public_key, State::Unspent));
-        }
-
-        Ok(())
+        Ok((proof_writer, quote))
     }
 
     /// Melt Bolt11
@@ -435,40 +397,27 @@ impl Mint {
             }
         }
 
-        let quote = match self.verify_melt_request(melt_request).await {
-            Ok(quote) => quote,
-            Err(err) => {
+        let mut tx = self.localstore.begin_transaction().await?;
+
+        let (proof_writer, quote) = self
+            .verify_melt_request(&mut tx, melt_request)
+            .await
+            .map_err(|err| {
                 tracing::debug!("Error attempting to verify melt quote: {}", err);
+                err
+            })?;
 
-                if let Err(err) = self.process_unpaid_melt(melt_request).await {
-                    tracing::error!(
-                        "Could not reset melt quote {} state: {}",
-                        melt_request.quote(),
-                        err
-                    );
-                }
-                return Err(err);
-            }
-        };
+        let settled_internally_amount = self
+            .handle_internal_melt_mint(&mut tx, &quote, melt_request)
+            .await
+            .map_err(|err| {
+                tracing::error!("Attempting to settle internally failed: {}", err);
+                err
+            })?;
 
-        let settled_internally_amount =
-            match self.handle_internal_melt_mint(&quote, melt_request).await {
-                Ok(amount) => amount,
-                Err(err) => {
-                    tracing::error!("Attempting to settle internally failed");
-                    if let Err(err) = self.process_unpaid_melt(melt_request).await {
-                        tracing::error!(
-                            "Could not reset melt quote {} state: {}",
-                            melt_request.quote(),
-                            err
-                        );
-                    }
-                    return Err(err);
-                }
-            };
+        let (tx, preimage, amount_spent_quote_unit, quote) = match settled_internally_amount {
+            Some(amount_spent) => (tx, None, amount_spent, quote),
 
-        let (preimage, amount_spent_quote_unit) = match settled_internally_amount {
-            Some(amount_spent) => (None, amount_spent),
             None => {
                 // If the quote unit is SAT or MSAT we can check that the expected fees are
                 // provided. We also check if the quote is less then the invoice
@@ -484,9 +433,6 @@ impl Mint {
                             Ok(amount) => amount,
                             Err(err) => {
                                 tracing::error!("Fee is not expected: {}", err);
-                                if let Err(err) = self.process_unpaid_melt(melt_request).await {
-                                    tracing::error!("Could not reset melt quote state: {}", err);
-                                }
                                 return Err(Error::Internal);
                             }
                         }
@@ -501,14 +447,13 @@ impl Mint {
                     Some(ln) => ln,
                     None => {
                         tracing::info!("Could not get ln backend for {}, bolt11 ", quote.unit);
-                        if let Err(err) = self.process_unpaid_melt(melt_request).await {
-                            tracing::error!("Could not reset melt quote state: {}", err);
-                        }
-
                         return Err(Error::UnsupportedUnit);
                     }
                 };
 
+                // Commit before talking to the external call
+                tx.commit().await?;
+
                 let pre = match ln
                     .make_payment(quote.clone(), partial_amount, Some(quote.fee_reserve))
                     .await
@@ -517,13 +462,18 @@ impl Mint {
                         if pay.status == MeltQuoteState::Unknown
                             || pay.status == MeltQuoteState::Failed =>
                     {
-                        let check_response = check_payment_state(Arc::clone(ln), &quote)
-                            .await
-                            .map_err(|_| Error::Internal)?;
+                        let check_response =
+                            if let Ok(ok) = check_payment_state(Arc::clone(ln), &quote).await {
+                                ok
+                            } else {
+                                return Err(Error::Internal);
+                            };
 
                         if check_response.status == MeltQuoteState::Paid {
                             tracing::warn!("Pay invoice returned {} but check returned {}. Proofs stuck as pending", pay.status.to_string(), check_response.status.to_string());
 
+                            proof_writer.commit();
+
                             return Err(Error::Internal);
                         }
 
@@ -535,21 +485,22 @@ impl Mint {
                         // hold the proofs as pending to we reset them  and return an error.
                         if matches!(err, cdk_payment::Error::InvoiceAlreadyPaid) {
                             tracing::debug!("Invoice already paid, resetting melt quote");
-                            if let Err(err) = self.process_unpaid_melt(melt_request).await {
-                                tracing::error!("Could not reset melt quote state: {}", err);
-                            }
                             return Err(Error::RequestAlreadyPaid);
                         }
 
                         tracing::error!("Error returned attempting to pay: {} {}", quote.id, err);
 
-                        let check_response = check_payment_state(Arc::clone(ln), &quote)
-                            .await
-                            .map_err(|_| Error::Internal)?;
+                        let check_response =
+                            if let Ok(ok) = check_payment_state(Arc::clone(ln), &quote).await {
+                                ok
+                            } else {
+                                proof_writer.commit();
+                                return Err(Error::Internal);
+                            };
                         // If there error is something else we want to check the status of the payment ensure it is not pending or has been made.
                         if check_response.status == MeltQuoteState::Paid {
                             tracing::warn!("Pay invoice returned an error but check returned {}. Proofs stuck as pending", check_response.status.to_string());
-
+                            proof_writer.commit();
                             return Err(Error::Internal);
                         }
                         check_response
@@ -563,9 +514,6 @@ impl Mint {
                             "Lightning payment for quote {} failed.",
                             melt_request.quote()
                         );
-                        if let Err(err) = self.process_unpaid_melt(melt_request).await {
-                            tracing::error!("Could not reset melt quote state: {}", err);
-                        }
                         return Err(Error::PaymentFailed);
                     }
                     MeltQuoteState::Pending => {
@@ -573,6 +521,7 @@ impl Mint {
                             "LN payment pending, proofs are stuck as pending for quote: {}",
                             melt_request.quote()
                         );
+                        proof_writer.commit();
                         return Err(Error::PendingQuote);
                     }
                 }
@@ -584,6 +533,7 @@ impl Mint {
                     to_unit(pre.total_spent, &pre.unit, &quote.unit).unwrap_or_default();
 
                 let payment_lookup_id = pre.payment_lookup_id;
+                let mut tx = self.localstore.begin_transaction().await?;
 
                 if payment_lookup_id != quote.request_lookup_id {
                     tracing::info!(
@@ -595,19 +545,34 @@ impl Mint {
                     let mut melt_quote = quote;
                     melt_quote.request_lookup_id = payment_lookup_id;
 
-                    if let Err(err) = self.localstore.add_melt_quote(melt_quote).await {
+                    if let Err(err) = tx
+                        .update_melt_quote_request_lookup_id(
+                            &melt_quote.id,
+                            &melt_quote.request_lookup_id,
+                        )
+                        .await
+                    {
                         tracing::warn!("Could not update payment lookup id: {}", err);
                     }
-                }
 
-                (pre.payment_proof, amount_spent)
+                    (tx, pre.payment_proof, amount_spent, melt_quote)
+                } else {
+                    (tx, pre.payment_proof, amount_spent, quote)
+                }
             }
         };
 
         // If we made it here the payment has been made.
         // We process the melt burning the inputs and returning change
         let res = self
-            .process_melt_request(melt_request, preimage, amount_spent_quote_unit)
+            .process_melt_request(
+                tx,
+                proof_writer,
+                quote,
+                melt_request,
+                preimage,
+                amount_spent_quote_unit,
+            )
             .await
             .map_err(|err| {
                 tracing::error!("Could not process melt request: {}", err);
@@ -622,26 +587,22 @@ impl Mint {
     #[instrument(skip_all)]
     pub async fn process_melt_request(
         &self,
+        mut tx: Box<dyn MintTransaction<'_, database::Error> + Send + Sync + '_>,
+        mut proof_writer: ProofWriter,
+        quote: MeltQuote,
         melt_request: &MeltRequest<Uuid>,
         payment_preimage: Option<String>,
         total_spent: Amount,
     ) -> Result<MeltQuoteBolt11Response<Uuid>, Error> {
         tracing::debug!("Processing melt quote: {}", melt_request.quote());
 
-        let quote = self
-            .localstore
-            .get_melt_quote(melt_request.quote())
-            .await?
-            .ok_or(Error::UnknownQuote)?;
-
         let input_ys = melt_request.inputs().ys()?;
 
-        self.localstore
-            .update_proofs_states(&input_ys, State::Spent)
+        proof_writer
+            .update_proofs_states(&mut tx, &input_ys, State::Spent)
             .await?;
 
-        self.localstore
-            .update_melt_quote_state(melt_request.quote(), MeltQuoteState::Paid)
+        tx.update_melt_quote_state(melt_request.quote(), MeltQuoteState::Paid)
             .await?;
 
         self.pubsub_manager.melt_quote_status(
@@ -651,10 +612,6 @@ impl Mint {
             MeltQuoteState::Paid,
         );
 
-        for public_key in input_ys {
-            self.pubsub_manager.proof_state((public_key, State::Spent));
-        }
-
         let mut change = None;
 
         // Check if there is change to return
@@ -664,8 +621,7 @@ impl Mint {
                 let blinded_messages: Vec<PublicKey> =
                     outputs.iter().map(|b| b.blinded_secret).collect();
 
-                if self
-                    .localstore
+                if tx
                     .get_blind_signatures(&blinded_messages)
                     .await?
                     .iter()
@@ -707,21 +663,23 @@ impl Mint {
                     change_sigs.push(blinded_signature)
                 }
 
-                self.localstore
-                    .add_blind_signatures(
-                        &outputs[0..change_sigs.len()]
-                            .iter()
-                            .map(|o| o.blinded_secret)
-                            .collect::<Vec<PublicKey>>(),
-                        &change_sigs,
-                        Some(quote.id),
-                    )
-                    .await?;
+                tx.add_blind_signatures(
+                    &outputs[0..change_sigs.len()]
+                        .iter()
+                        .map(|o| o.blinded_secret)
+                        .collect::<Vec<PublicKey>>(),
+                    &change_sigs,
+                    Some(quote.id),
+                )
+                .await?;
 
                 change = Some(change_sigs);
             }
         }
 
+        proof_writer.commit();
+        tx.commit().await?;
+
         Ok(MeltQuoteBolt11Response {
             amount: quote.amount,
             paid: Some(true),

+ 13 - 10
crates/cdk/src/mint/mod.rs

@@ -7,7 +7,7 @@ use arc_swap::ArcSwap;
 use cdk_common::common::{PaymentProcessorKey, QuoteTTL};
 #[cfg(feature = "auth")]
 use cdk_common::database::MintAuthDatabase;
-use cdk_common::database::{self, MintDatabase};
+use cdk_common::database::{self, MintDatabase, MintTransaction};
 use cdk_common::nuts::{self, BlindSignature, BlindedMessage, CurrencyUnit, Id, Kind};
 use cdk_common::secret;
 use cdk_signatory::signatory::{Signatory, SignatoryKeySet};
@@ -24,9 +24,9 @@ use crate::cdk_payment::{self, MintPayment};
 use crate::error::Error;
 use crate::fees::calculate_fee;
 use crate::nuts::*;
-use crate::Amount;
 #[cfg(feature = "auth")]
 use crate::OidcClient;
+use crate::{cdk_database, Amount};
 
 #[cfg(feature = "auth")]
 pub(crate) mod auth;
@@ -36,6 +36,7 @@ mod issue;
 mod keysets;
 mod ln;
 mod melt;
+mod proof_writer;
 mod start_up_check;
 pub mod subscription;
 mod swap;
@@ -225,7 +226,9 @@ impl Mint {
     /// Set mint info
     #[instrument(skip_all)]
     pub async fn set_mint_info(&self, mint_info: MintInfo) -> Result<(), Error> {
-        Ok(self.localstore.set_mint_info(mint_info).await?)
+        let mut tx = self.localstore.begin_transaction().await?;
+        tx.set_mint_info(mint_info).await?;
+        Ok(tx.commit().await?)
     }
 
     /// Get quote ttl
@@ -237,7 +240,9 @@ impl Mint {
     /// Set quote ttl
     #[instrument(skip_all)]
     pub async fn set_quote_ttl(&self, quote_ttl: QuoteTTL) -> Result<(), Error> {
-        Ok(self.localstore.set_quote_ttl(quote_ttl).await?)
+        let mut tx = self.localstore.begin_transaction().await?;
+        tx.set_quote_ttl(quote_ttl).await?;
+        Ok(tx.commit().await?)
     }
 
     /// Wait for any invoice to be paid
@@ -407,14 +412,11 @@ impl Mint {
     #[instrument(skip_all)]
     pub async fn handle_internal_melt_mint(
         &self,
+        tx: &mut Box<dyn MintTransaction<'_, cdk_database::Error> + Send + Sync + '_>,
         melt_quote: &MeltQuote,
         melt_request: &MeltRequest<Uuid>,
     ) -> Result<Option<Amount>, Error> {
-        let mint_quote = match self
-            .localstore
-            .get_mint_quote_by_request(&melt_quote.request)
-            .await
-        {
+        let mint_quote = match tx.get_mint_quote_by_request(&melt_quote.request).await {
             Ok(Some(mint_quote)) => mint_quote,
             // Not an internal melt -> mint
             Ok(None) => return Ok(None),
@@ -423,6 +425,7 @@ impl Mint {
                 return Err(Error::Internal);
             }
         };
+        tracing::error!("internal stuff");
 
         // Mint quote has already been settled, proofs should not be burned or held.
         if mint_quote.state == MintQuoteState::Issued || mint_quote.state == MintQuoteState::Paid {
@@ -449,7 +452,7 @@ impl Mint {
 
         let amount = melt_quote.amount;
 
-        self.update_mint_quote(mint_quote).await?;
+        tx.add_or_replace_mint_quote(mint_quote).await?;
 
         Ok(Some(amount))
     }

+ 214 - 0
crates/cdk/src/mint/proof_writer.rs

@@ -0,0 +1,214 @@
+//! Proof writer
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use cdk_common::database::{self, MintDatabase, MintTransaction};
+use cdk_common::{Error, Proofs, ProofsMethods, PublicKey, State};
+
+use super::subscription::PubSubManager;
+
+type Db = Arc<dyn MintDatabase<database::Error> + Send + Sync>;
+type Tx<'a, 'b> = Box<dyn MintTransaction<'a, database::Error> + Send + Sync + 'b>;
+
+/// Proof writer
+///
+/// This is a proof writer that emulates a database transaction but without holding the transaction
+/// alive while waiting for external events to be fully committed to the database; instead, it
+/// maintains a `pending` state.
+///
+/// This struct allows for premature exit on error, enabling it to remove proofs or reset their
+/// status.
+///
+/// This struct is not fully ACID. If the process exits due to a panic, and the `Drop` function
+/// cannot be run, the reset process should reset the state.
+pub struct ProofWriter {
+    db: Option<Db>,
+    pubsub_manager: Arc<PubSubManager>,
+    proof_original_states: Option<HashMap<PublicKey, Option<State>>>,
+}
+
+impl ProofWriter {
+    /// Creates a new ProofWriter on top of the database
+    pub fn new(db: Db, pubsub_manager: Arc<PubSubManager>) -> Self {
+        Self {
+            db: Some(db),
+            pubsub_manager,
+            proof_original_states: Some(Default::default()),
+        }
+    }
+
+    /// The changes are permanent, consume the struct removing the database, so the Drop does
+    /// nothing
+    pub fn commit(mut self) {
+        self.db.take();
+        self.proof_original_states.take();
+    }
+
+    /// Add proofs
+    pub async fn add_proofs(
+        &mut self,
+        tx: &mut Tx<'_, '_>,
+        proofs: &Proofs,
+    ) -> Result<Vec<PublicKey>, Error> {
+        let proof_states = if let Some(proofs) = self.proof_original_states.as_mut() {
+            proofs
+        } else {
+            return Err(Error::Internal);
+        };
+
+        if let Some(err) = tx.add_proofs(proofs.clone(), None).await.err() {
+            return match err {
+                cdk_common::database::Error::Duplicate => Err(Error::TokenPending),
+                cdk_common::database::Error::AttemptUpdateSpentProof => {
+                    Err(Error::TokenAlreadySpent)
+                }
+                err => Err(Error::Database(err)),
+            };
+        }
+
+        let ys = proofs.ys()?;
+
+        for pk in ys.iter() {
+            proof_states.insert(*pk, None);
+        }
+
+        self.update_proofs_states(tx, &ys, State::Pending).await?;
+
+        Ok(ys)
+    }
+
+    /// Update proof status
+    pub async fn update_proofs_states(
+        &mut self,
+        tx: &mut Tx<'_, '_>,
+        ys: &[PublicKey],
+        new_proof_state: State,
+    ) -> Result<(), Error> {
+        let proof_states = if let Some(proofs) = self.proof_original_states.as_mut() {
+            proofs
+        } else {
+            return Err(Error::Internal);
+        };
+
+        let original_proofs_state = match tx.update_proofs_states(ys, new_proof_state).await {
+            Ok(states) => states,
+            Err(database::Error::AttemptUpdateSpentProof)
+            | Err(database::Error::AttemptRemoveSpentProof) => {
+                return Err(Error::TokenAlreadySpent)
+            }
+            Err(err) => return Err(err.into()),
+        };
+
+        if ys.len() != original_proofs_state.len() {
+            return Err(Error::Internal);
+        }
+
+        let proofs_state = original_proofs_state
+            .iter()
+            .flatten()
+            .map(|x| x.to_owned())
+            .collect::<HashSet<State>>();
+
+        let forbidden_states = if new_proof_state == State::Pending {
+            // If the new state is `State::Pending` it cannot be pending already
+            vec![State::Pending, State::Spent]
+        } else {
+            // For other state it cannot be spent
+            vec![State::Spent]
+        };
+
+        for forbidden_state in forbidden_states.iter() {
+            if proofs_state.contains(forbidden_state) {
+                reset_proofs_to_original_state(tx, ys, original_proofs_state).await?;
+
+                return Err(if proofs_state.contains(&State::Pending) {
+                    Error::TokenPending
+                } else {
+                    Error::TokenAlreadySpent
+                });
+            }
+        }
+
+        for (idx, ys) in ys.iter().enumerate() {
+            proof_states
+                .entry(*ys)
+                .or_insert(original_proofs_state[idx]);
+        }
+
+        for pk in ys {
+            self.pubsub_manager.proof_state((*pk, new_proof_state));
+        }
+
+        Ok(())
+    }
+
+    /// Rollback all changes in this ProofWriter consuming it.
+    pub async fn rollback(mut self, tx: &mut Tx<'_, '_>) -> Result<(), Error> {
+        let (ys, original_states) = if let Some(proofs) = self.proof_original_states.take() {
+            proofs.into_iter().unzip::<_, _, Vec<_>, Vec<_>>()
+        } else {
+            return Ok(());
+        };
+        reset_proofs_to_original_state(tx, &ys, original_states).await?;
+        Ok(())
+    }
+}
+
+/// Resets proofs to their original states or removes them
+#[inline(always)]
+async fn reset_proofs_to_original_state(
+    tx: &mut Tx<'_, '_>,
+    ys: &[PublicKey],
+    original_states: Vec<Option<State>>,
+) -> Result<(), Error> {
+    let mut ys_by_state = HashMap::new();
+    let mut unknown_proofs = Vec::new();
+    for (y, state) in ys.iter().zip(original_states) {
+        if let Some(state) = state {
+            // Skip attempting to update proofs that were originally spent
+            if state != State::Spent {
+                ys_by_state.entry(state).or_insert_with(Vec::new).push(*y);
+            }
+        } else {
+            unknown_proofs.push(*y);
+        }
+    }
+
+    for (state, ys) in ys_by_state {
+        tx.update_proofs_states(&ys, state).await?;
+    }
+
+    tx.remove_proofs(&unknown_proofs, None).await?;
+
+    Ok(())
+}
+
+#[inline(always)]
+async fn rollback(
+    db: Arc<dyn MintDatabase<database::Error> + Send + Sync>,
+    ys: Vec<PublicKey>,
+    original_states: Vec<Option<State>>,
+) -> Result<(), Error> {
+    let mut tx = db.begin_transaction().await?;
+    reset_proofs_to_original_state(&mut tx, &ys, original_states).await?;
+    tx.commit().await?;
+
+    Ok(())
+}
+
+impl Drop for ProofWriter {
+    fn drop(&mut self) {
+        let db = if let Some(db) = self.db.take() {
+            db
+        } else {
+            return;
+        };
+        let (ys, states) = if let Some(proofs) = self.proof_original_states.take() {
+            proofs.into_iter().unzip()
+        } else {
+            return;
+        };
+
+        tokio::spawn(rollback(db, ys, states));
+    }
+}

+ 13 - 5
crates/cdk/src/mint/start_up_check.rs

@@ -21,10 +21,14 @@ impl Mint {
             "There are {} pending and unpaid mint quotes.",
             all_quotes.len()
         );
-        for quote in all_quotes.iter() {
+        for mut quote in all_quotes.into_iter() {
             tracing::debug!("Checking status of mint quote: {}", quote.id);
-            if let Err(err) = self.check_mint_quote_paid(&quote.id).await {
-                tracing::error!("Could not check status of {}, {}", quote.id, err);
+            match self
+                .check_mint_quote_paid(self.localstore.begin_transaction().await?, &mut quote)
+                .await
+            {
+                Ok(tx) => tx.commit().await?,
+                Err(err) => tracing::error!("Could not check status of {}, {}", quote.id, err),
             }
         }
         Ok(())
@@ -39,6 +43,8 @@ impl Mint {
             .collect();
         tracing::info!("There are {} pending melt quotes.", pending_quotes.len());
 
+        let mut tx = self.localstore.begin_transaction().await?;
+
         for pending_quote in pending_quotes {
             tracing::debug!("Checking status for melt quote {}.", pending_quote.id);
 
@@ -72,8 +78,7 @@ impl Mint {
                 MeltQuoteState::Unknown => MeltQuoteState::Unpaid,
             };
 
-            if let Err(err) = self
-                .localstore
+            if let Err(err) = tx
                 .update_melt_quote_state(&pending_quote.id, melt_quote_state)
                 .await
             {
@@ -86,6 +91,9 @@ impl Mint {
                 );
             };
         }
+
+        tx.commit().await?;
+
         Ok(())
     }
 }

+ 25 - 47
crates/cdk/src/mint/swap.rs

@@ -1,9 +1,9 @@
 use tracing::instrument;
 
 use super::nut11::{enforce_sig_flag, EnforceSigFlag};
+use super::proof_writer::ProofWriter;
 use super::{Mint, PublicKey, SigFlag, State, SwapRequest, SwapResponse};
-use crate::nuts::nut00::ProofsMethods;
-use crate::{cdk_database, Error};
+use crate::Error;
 
 impl Mint {
     /// Process Swap
@@ -12,8 +12,10 @@ impl Mint {
         &self,
         swap_request: SwapRequest,
     ) -> Result<SwapResponse, Error> {
+        let mut tx = self.localstore.begin_transaction().await?;
+
         if let Err(err) = self
-            .verify_transaction_balanced(swap_request.inputs(), swap_request.outputs())
+            .verify_transaction_balanced(&mut tx, swap_request.inputs(), swap_request.outputs())
             .await
         {
             tracing::debug!("Attempt to swap unbalanced transaction, aborting: {err}");
@@ -22,23 +24,11 @@ impl Mint {
 
         self.validate_sig_flag(&swap_request).await?;
 
-        // After swap request is fully validated, add the new proofs to DB
-        let input_ys = swap_request.inputs().ys()?;
-        if let Some(err) = self
-            .localstore
-            .add_proofs(swap_request.inputs().clone(), None)
-            .await
-            .err()
-        {
-            return match err {
-                cdk_common::database::Error::Duplicate => Err(Error::TokenPending),
-                cdk_common::database::Error::AttemptUpdateSpentProof => {
-                    Err(Error::TokenAlreadySpent)
-                }
-                err => Err(Error::Database(err)),
-            };
-        }
-        self.check_ys_spendable(&input_ys, State::Pending).await?;
+        let mut proof_writer =
+            ProofWriter::new(self.localstore.clone(), self.pubsub_manager.clone());
+        let input_ys = proof_writer
+            .add_proofs(&mut tx, swap_request.inputs())
+            .await?;
 
         let mut promises = Vec::with_capacity(swap_request.outputs().len());
 
@@ -47,35 +37,23 @@ impl Mint {
             promises.push(blinded_signature);
         }
 
-        // TODO: It may be possible to have a race condition, that's why an error when changing the
-        // state can be converted to a TokenAlreadySpent error.
-        //
-        // A concept of transaction/writer for the Database trait would eliminate this problem and
-        // will remove all the "reset" codebase, resulting in fewer lines of code, and less
-        // error-prone database updates
-        self.localstore
-            .update_proofs_states(&input_ys, State::Spent)
-            .await
-            .map_err(|e| match e {
-                cdk_database::Error::AttemptUpdateSpentProof => Error::TokenAlreadySpent,
-                e => e.into(),
-            })?;
+        proof_writer
+            .update_proofs_states(&mut tx, &input_ys, State::Spent)
+            .await?;
 
-        for pub_key in input_ys {
-            self.pubsub_manager.proof_state((pub_key, State::Spent));
-        }
+        tx.add_blind_signatures(
+            &swap_request
+                .outputs()
+                .iter()
+                .map(|o| o.blinded_secret)
+                .collect::<Vec<PublicKey>>(),
+            &promises,
+            None,
+        )
+        .await?;
 
-        self.localstore
-            .add_blind_signatures(
-                &swap_request
-                    .outputs()
-                    .iter()
-                    .map(|o| o.blinded_secret)
-                    .collect::<Vec<PublicKey>>(),
-                &promises,
-                None,
-            )
-            .await?;
+        proof_writer.commit();
+        tx.commit().await?;
 
         Ok(SwapResponse::new(promises))
     }

+ 11 - 5
crates/cdk/src/mint/verification.rs

@@ -4,6 +4,7 @@ use cdk_common::{Amount, BlindedMessage, CurrencyUnit, Id, Proofs, ProofsMethods
 use tracing::instrument;
 
 use super::{Error, Mint};
+use crate::cdk_database;
 
 /// Verification result
 #[derive(Debug, Clone, Hash, PartialEq, Eq)]
@@ -149,12 +150,12 @@ impl Mint {
     #[instrument(skip_all)]
     pub async fn check_output_already_signed(
         &self,
+        tx: &mut Box<dyn cdk_database::MintTransaction<'_, cdk_database::Error> + Send + Sync + '_>,
         outputs: &[BlindedMessage],
     ) -> Result<(), Error> {
         let blinded_messages: Vec<PublicKey> = outputs.iter().map(|o| o.blinded_secret).collect();
 
-        if self
-            .localstore
+        if tx
             .get_blind_signatures(&blinded_messages)
             .await?
             .iter()
@@ -173,7 +174,11 @@ impl Mint {
     /// Verifies outputs
     /// Checks outputs are unique, of the same unit and not signed before
     #[instrument(skip_all)]
-    pub async fn verify_outputs(&self, outputs: &[BlindedMessage]) -> Result<Verification, Error> {
+    pub async fn verify_outputs(
+        &self,
+        tx: &mut Box<dyn cdk_database::MintTransaction<'_, cdk_database::Error> + Send + Sync + '_>,
+        outputs: &[BlindedMessage],
+    ) -> Result<Verification, Error> {
         if outputs.is_empty() {
             return Ok(Verification {
                 amount: Amount::ZERO,
@@ -182,7 +187,7 @@ impl Mint {
         }
 
         Mint::check_outputs_unique(outputs)?;
-        self.check_output_already_signed(outputs).await?;
+        self.check_output_already_signed(tx, outputs).await?;
 
         let unit = self.verify_outputs_keyset(outputs).await?;
 
@@ -215,10 +220,11 @@ impl Mint {
     #[instrument(skip_all)]
     pub async fn verify_transaction_balanced(
         &self,
+        tx: &mut Box<dyn cdk_database::MintTransaction<'_, cdk_database::Error> + Send + Sync + '_>,
         inputs: &Proofs,
         outputs: &[BlindedMessage],
     ) -> Result<(), Error> {
-        let output_verification = self.verify_outputs(outputs).await.map_err(|err| {
+        let output_verification = self.verify_outputs(tx, outputs).await.map_err(|err| {
             tracing::debug!("Output verification failed: {:?}", err);
             err
         })?;

+ 1 - 1
misc/itests.sh

@@ -210,7 +210,7 @@ if [ $? -ne 0 ]; then
 fi
 
 echo "Running happy_path_mint_wallet test with CLN mint"
-cargo test -p cdk-integration-tests --test happy_path_mint_wallet test_happy_mint_melt_round_trip
+cargo test -p cdk-integration-tests --test happy_path_mint_wallet
 if [ $? -ne 0 ]; then
     echo "happy_path_mint_wallet test failed, exiting"
     exit 1

Vissa filer visades inte eftersom för många filer har ändrats