Cesar Rodas 1 month ago
parent
commit
071374dc1b

+ 95 - 35
crates/cdk-common/src/database/mint.rs

@@ -14,29 +14,113 @@ use crate::nuts::{
     Proofs, PublicKey, State,
 };
 
+/// Database Writer
+///
+/// This trait is the only way to update the database, in a atomic way, from the Rust side, making
+/// sure that on commit all changes happen or none.
+///
+/// Every record read or updated by this Writer should be locked exclusively until the Writer is
+/// consumed, either by commit or rollback.
+///
+/// On Drop, if unless commit() was called explicitly, the changes are expected to be rolled back.
+#[async_trait]
+pub trait Transaction: Send + Sync {
+    /// Add [`MintMintQuote`]
+    async fn add_mint_quote(&mut self, quote: MintMintQuote) -> Result<(), Error>;
+
+    /// Get [`MintMintQuote`]
+    ///
+    /// While this Writer object is in scope the quote should be locked exclusively
+    async fn get_mint_quote(&mut self, quote_id: &Uuid) -> Result<Option<MintMintQuote>, Error>;
+
+    /// Get all [`MintMintQuote`]s
+    async fn get_mint_quote_by_request(
+        &mut self,
+        request: &str,
+    ) -> Result<Option<MintMintQuote>, Error>;
+
+    /// Get all [`MintMintQuote`]s
+    async fn get_mint_quote_by_request_lookup_id(
+        &mut self,
+        request_lookup_id: &str,
+    ) -> Result<Option<MintMintQuote>, Error>;
+
+    /// Update state of [`MintMintQuote`]
+    async fn update_mint_quote_state(
+        &mut self,
+        quote_id: &Uuid,
+        state: MintQuoteState,
+    ) -> Result<MintQuoteState, Error>;
+
+    /// Add  [`Proofs`]
+    async fn add_proofs(&mut self, proof: Proofs, quote_id: Option<Uuid>) -> Result<(), Error>;
+
+    /// Get [`Proofs`] state
+    async fn update_proofs_states(
+        &mut self,
+        ys: &[PublicKey],
+        proofs_state: State,
+    ) -> Result<Vec<Option<State>>, Error>;
+
+    /// Get [`BlindSignature`]s and lock them exclusively until the Writer is dropped
+    async fn get_blind_signatures(
+        &mut self,
+        blinded_messages: &[PublicKey],
+    ) -> Result<Vec<Option<BlindSignature>>, Error>;
+
+    /// Add [`BlindSignature`]
+    async fn add_blind_signatures(
+        &mut self,
+        blinded_messages: &[PublicKey],
+        blind_signatures: &[BlindSignature],
+        quote_id: Option<Uuid>,
+    ) -> Result<(), Error>;
+
+    /// Get melt request
+    async fn get_melt_request(
+        &mut self,
+        quote_id: &Uuid,
+    ) -> Result<Option<(MeltBolt11Request<Uuid>, LnKey)>, Error>;
+
+    /// Get [`mint::MeltQuote`]
+    ///
+    /// While this Writer object is in scope the quote should be locked exclusively
+    async fn get_melt_quote(&mut self, quote_id: &Uuid) -> Result<Option<mint::MeltQuote>, Error>;
+
+    /// Update [`mint::MeltQuote`] state
+    async fn update_melt_quote_state(
+        &mut self,
+        quote_id: &Uuid,
+        state: MeltQuoteState,
+    ) -> Result<MeltQuoteState, Error>;
+
+    /// Consumes the Writer and commit the changes
+    async fn commit(self: Box<Self>) -> Result<(), Error>;
+
+    /// Consumes the Writer and rollback the changes
+    async fn rollback(self: Box<Self>) -> Result<(), Error>;
+}
+
 /// Mint Database trait
 #[async_trait]
 pub trait Database {
     /// Mint Database Error
     type Err: Into<Error> + From<Error>;
 
+    /// Get a Database Writer
+    async fn begin_transaction(&self) -> Result<Box<dyn Transaction>, Self::Err>;
+
     /// Add Active Keyset
     async fn set_active_keyset(&self, unit: CurrencyUnit, id: Id) -> Result<(), Self::Err>;
     /// Get Active Keyset
+    ///
+    /// TODO: Refactor code to use `SignatoryManager` instead of the database
     async fn get_active_keyset_id(&self, unit: &CurrencyUnit) -> Result<Option<Id>, Self::Err>;
     /// Get all Active Keyset
     async fn get_active_keysets(&self) -> Result<HashMap<CurrencyUnit, Id>, Self::Err>;
 
-    /// Add [`MintMintQuote`]
-    async fn add_mint_quote(&self, quote: MintMintQuote) -> Result<(), Self::Err>;
     /// Get [`MintMintQuote`]
     async fn get_mint_quote(&self, quote_id: &Uuid) -> Result<Option<MintMintQuote>, Self::Err>;
-    /// Update state of [`MintMintQuote`]
-    async fn update_mint_quote_state(
-        &self,
-        quote_id: &Uuid,
-        state: MintQuoteState,
-    ) -> Result<MintQuoteState, Self::Err>;
     /// Get all [`MintMintQuote`]s
     async fn get_mint_quote_by_request(
         &self,
@@ -49,6 +133,7 @@ pub trait Database {
     ) -> Result<Option<MintMintQuote>, Self::Err>;
     /// Get Mint Quotes
     async fn get_mint_quotes(&self) -> Result<Vec<MintMintQuote>, Self::Err>;
+
     /// Remove [`MintMintQuote`]
     async fn remove_mint_quote(&self, quote_id: &Uuid) -> Result<(), Self::Err>;
 
@@ -56,12 +141,6 @@ pub trait Database {
     async fn add_melt_quote(&self, quote: mint::MeltQuote) -> Result<(), Self::Err>;
     /// Get [`mint::MeltQuote`]
     async fn get_melt_quote(&self, quote_id: &Uuid) -> Result<Option<mint::MeltQuote>, Self::Err>;
-    /// Update [`mint::MeltQuote`] state
-    async fn update_melt_quote_state(
-        &self,
-        quote_id: &Uuid,
-        state: MeltQuoteState,
-    ) -> Result<MeltQuoteState, Self::Err>;
     /// Get all [`mint::MeltQuote`]s
     async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err>;
     /// Remove [`mint::MeltQuote`]
@@ -82,43 +161,24 @@ pub trait Database {
     /// Add [`MintKeySetInfo`]
     async fn add_keyset_info(&self, keyset: MintKeySetInfo) -> Result<(), Self::Err>;
     /// Get [`MintKeySetInfo`]
+    /// TODO: Refactor code to use `SignatoryManager` instead of the database
     async fn get_keyset_info(&self, id: &Id) -> Result<Option<MintKeySetInfo>, Self::Err>;
     /// Get [`MintKeySetInfo`]s
+    /// TODO: Refactor code to use `SignatoryManager` instead of the database
     async fn get_keyset_infos(&self) -> Result<Vec<MintKeySetInfo>, Self::Err>;
 
-    /// Add  [`Proofs`]
-    async fn add_proofs(&self, proof: Proofs, quote_id: Option<Uuid>) -> Result<(), Self::Err>;
-    /// Remove [`Proofs`]
-    async fn remove_proofs(
-        &self,
-        ys: &[PublicKey],
-        quote_id: Option<Uuid>,
-    ) -> Result<(), Self::Err>;
     /// Get [`Proofs`] by ys
     async fn get_proofs_by_ys(&self, ys: &[PublicKey]) -> Result<Vec<Option<Proof>>, Self::Err>;
     /// Get ys by quote id
     async fn get_proof_ys_by_quote_id(&self, quote_id: &Uuid) -> Result<Vec<PublicKey>, Self::Err>;
     /// Get [`Proofs`] state
     async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err>;
-    /// Get [`Proofs`] state
-    async fn update_proofs_states(
-        &self,
-        ys: &[PublicKey],
-        proofs_state: State,
-    ) -> Result<Vec<Option<State>>, Self::Err>;
     /// Get [`Proofs`] by state
     async fn get_proofs_by_keyset_id(
         &self,
         keyset_id: &Id,
     ) -> Result<(Proofs, Vec<Option<State>>), Self::Err>;
 
-    /// Add [`BlindSignature`]
-    async fn add_blind_signatures(
-        &self,
-        blinded_messages: &[PublicKey],
-        blind_signatures: &[BlindSignature],
-        quote_id: Option<Uuid>,
-    ) -> Result<(), Self::Err>;
     /// Get [`BlindSignature`]s
     async fn get_blind_signatures(
         &self,

+ 1 - 1
crates/cdk-common/src/database/mod.rs

@@ -6,7 +6,7 @@ mod mint;
 mod wallet;
 
 #[cfg(feature = "mint")]
-pub use mint::Database as MintDatabase;
+pub use mint::{Database as MintDatabase, Transaction as MintTransaction};
 #[cfg(feature = "wallet")]
 pub use wallet::Database as WalletDatabase;
 

+ 7 - 20
crates/cdk-mint-rpc/src/proto/server.rs

@@ -516,9 +516,9 @@ impl CdkMint for MintRPCServer {
         let state = MintQuoteState::from_str(&request.state)
             .map_err(|_| Status::invalid_argument("Invalid quote state".to_string()))?;
 
-        let mint_quote = self
-            .mint
-            .localstore
+        let mut tx = self.mint.localstore.begin_transaction().await?;
+
+        let mint_quote = db
             .get_mint_quote(&quote_id)
             .await
             .map_err(|_| Status::invalid_argument("Could not find quote".to_string()))?
@@ -527,32 +527,19 @@ impl CdkMint for MintRPCServer {
         match state {
             MintQuoteState::Paid => {
                 self.mint
-                    .pay_mint_quote(&mint_quote)
+                    .pay_mint_quote(&mut tx, &mint_quote)
                     .await
                     .map_err(|_| Status::internal("Could not find quote".to_string()))?;
             }
             _ => {
-                let mut mint_quote = mint_quote;
-
-                mint_quote.state = state;
-
-                self.mint
-                    .update_mint_quote(mint_quote)
-                    .await
-                    .map_err(|_| Status::internal("Could not update quote".to_string()))?;
+                tx.update_mint_quote_state(&mint_quote.id, state).await?;
             }
         }
 
-        let mint_quote = self
-            .mint
-            .localstore
-            .get_mint_quote(&quote_id)
-            .await
-            .map_err(|_| Status::invalid_argument("Could not find quote".to_string()))?
-            .ok_or(Status::invalid_argument("Could not find quote".to_string()))?;
+        tx.commit().await?;
 
         Ok(Response::new(UpdateNut04QuoteRequest {
-            state: mint_quote.state.to_string(),
+            state: state.to_string(),
             quote_id: mint_quote.id.to_string(),
         }))
     }

+ 421 - 195
crates/cdk/src/cdk_database/mint_memory.rs

@@ -1,15 +1,18 @@
 //! Mint in memory database
 
 use std::collections::HashMap;
+use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
+use std::time::Duration;
 
 use async_trait::async_trait;
 use cdk_common::common::QuoteTTL;
-use cdk_common::database::{Error, MintDatabase};
+use cdk_common::database::{Error, MintDatabase, MintTransaction};
 use cdk_common::mint::MintKeySetInfo;
 use cdk_common::nut00::ProofsMethods;
 use cdk_common::MintInfo;
-use tokio::sync::{Mutex, RwLock};
+use tokio::sync::RwLock;
+use tokio::time::sleep;
 use uuid::Uuid;
 
 use crate::dhke::hash_to_curve;
@@ -21,22 +24,351 @@ use crate::nuts::{
 };
 use crate::types::LnKey;
 
+/// Macro to merge two `Arc<RwLock<HashMap<K, V>>>` where `map2` is drained into `map1`
+macro_rules! merge {
+    ($map1:expr, $map2:expr) => {{
+        let mut map1_lock = $map1.write().await;
+        let mut map2_lock = $map2.write().await;
+
+        for (k, v) in map2_lock.drain() {
+            map1_lock.insert(k, v);
+        }
+    }};
+}
+
+#[derive(Debug, Default)]
+#[allow(clippy::type_complexity)]
+struct MemoryStorage {
+    active_keysets: RwLock<HashMap<CurrencyUnit, Id>>,
+    keysets: RwLock<HashMap<Id, MintKeySetInfo>>,
+    mint_quotes: RwLock<HashMap<Uuid, MintQuote>>,
+    melt_quotes: RwLock<HashMap<Uuid, mint::MeltQuote>>,
+    proofs: RwLock<HashMap<[u8; 33], Proof>>,
+    proof_state: RwLock<HashMap<[u8; 33], nut07::State>>,
+    quote_proofs: RwLock<HashMap<Uuid, Vec<PublicKey>>>,
+    blinded_signatures: RwLock<HashMap<[u8; 33], BlindSignature>>,
+    quote_signatures: RwLock<HashMap<Uuid, Vec<BlindSignature>>>,
+    melt_requests: RwLock<HashMap<Uuid, (MeltBolt11Request<Uuid>, LnKey)>>,
+    mint_info: RwLock<MintInfo>,
+    quote_ttl: RwLock<QuoteTTL>,
+}
+
+#[derive(Debug, Clone, Eq, PartialEq, Hash)]
+enum AnyId {
+    MintQuote(Uuid),
+    MeltQuote(Uuid),
+    BlindSignature(PublicKey),
+}
+
+/// Poor man's concurrent access manager
+#[derive(Debug, Default)]
+struct AccessManager(RwLock<HashMap<AnyId, u64>>);
+
+impl AccessManager {
+    /// Lock a resource for exclusive access
+    ///
+    /// If the resource is already locked, it will wait until it is unlocked. Since this
+    /// implementation is mainly for testing, it is not optimized for performance. In a real-world
+    /// scenario, a more sophisticated releasing mechanism should be used to avoid CPU overhead.
+    pub async fn lock(&self, resource_id: AnyId, writer_id: u64) {
+        loop {
+            let mut write = self.0.write().await;
+            match write.get(&resource_id) {
+                Some(lock_writer_id) if *lock_writer_id == writer_id => break,
+                None => {
+                    write.insert(resource_id.clone(), writer_id);
+                    break;
+                }
+                _ => {}
+            }
+            drop(write);
+            sleep(Duration::from_nanos(10)).await;
+        }
+    }
+
+    /// Access a resource for reading, if it is locked, it will wait until it is unlocked.
+    ///
+    /// Since this implementation is mainly for testing, it will not add a read-lock to the
+    /// resource. In a real-world scenario an Read-Write lock should be used.
+    pub async fn access(&self, resource_id: AnyId) {
+        loop {
+            let read = self.0.read().await;
+            let lock_reader_id = read.get(&resource_id).cloned();
+            if lock_reader_id.is_none() {
+                break;
+            }
+            drop(read);
+            sleep(Duration::from_nanos(10)).await;
+        }
+    }
+
+    pub async fn release(&self, writer_id: u64) {
+        let mut write = self.0.write().await;
+        write.retain(|_, v| *v != writer_id);
+    }
+}
+
 /// Mint Memory Database
 #[derive(Debug, Clone, Default)]
-#[allow(clippy::type_complexity)]
 pub struct MintMemoryDatabase {
-    active_keysets: Arc<RwLock<HashMap<CurrencyUnit, Id>>>,
-    keysets: Arc<RwLock<HashMap<Id, MintKeySetInfo>>>,
-    mint_quotes: Arc<RwLock<HashMap<Uuid, MintQuote>>>,
-    melt_quotes: Arc<RwLock<HashMap<Uuid, mint::MeltQuote>>>,
-    proofs: Arc<RwLock<HashMap<[u8; 33], Proof>>>,
-    proof_state: Arc<Mutex<HashMap<[u8; 33], nut07::State>>>,
-    quote_proofs: Arc<Mutex<HashMap<Uuid, Vec<PublicKey>>>>,
-    blinded_signatures: Arc<RwLock<HashMap<[u8; 33], BlindSignature>>>,
-    quote_signatures: Arc<RwLock<HashMap<Uuid, Vec<BlindSignature>>>>,
-    melt_requests: Arc<RwLock<HashMap<Uuid, (MeltBolt11Request<Uuid>, LnKey)>>>,
-    mint_info: Arc<RwLock<MintInfo>>,
-    quote_ttl: Arc<RwLock<QuoteTTL>>,
+    /// Storage
+    inner: Arc<MemoryStorage>,
+    /// Exclusive access list, where transaction can lock Ids for exclusive access
+    /// until they either commit or rollback
+    exclusive_access_manager: Arc<AccessManager>,
+    writer_index: Arc<AtomicU64>,
+}
+
+/// Writer for the [`MintMemoryDatabase`]
+pub struct MintMemoryWriter {
+    exclusive_access_manager: Arc<AccessManager>,
+    inner: Arc<MemoryStorage>,
+    changes: MemoryStorage,
+    id: u64,
+}
+
+#[async_trait]
+impl MintTransaction for MintMemoryWriter {
+    async fn get_mint_quote(&mut self, quote_id: &Uuid) -> Result<Option<MintQuote>, Error> {
+        self.exclusive_access_manager
+            .lock(AnyId::MintQuote(quote_id.to_owned()), self.id)
+            .await;
+
+        if let Some(quote) = self.changes.mint_quotes.read().await.get(quote_id) {
+            return Ok(Some(quote.clone()));
+        }
+
+        Ok(self.inner.mint_quotes.read().await.get(quote_id).cloned())
+    }
+
+    async fn add_mint_quote(&mut self, quote: MintQuote) -> Result<(), Error> {
+        self.exclusive_access_manager
+            .lock(AnyId::MintQuote(quote.id.clone()), self.id)
+            .await;
+        self.changes
+            .mint_quotes
+            .write()
+            .await
+            .insert(quote.id, quote);
+        Ok(())
+    }
+
+    async fn get_melt_request(
+        &mut self,
+        quote_id: &Uuid,
+    ) -> Result<Option<(MeltBolt11Request<Uuid>, LnKey)>, Error> {
+        let melt_requests = self.inner.melt_requests.read().await;
+        let melt_request = melt_requests.get(quote_id).cloned();
+
+        if let Some((request, _)) = &melt_request {
+            self.exclusive_access_manager
+                .lock(AnyId::MeltQuote(request.quote), self.id)
+                .await;
+        }
+
+        Ok(melt_request)
+    }
+
+    async fn get_blind_signatures(
+        &mut self,
+        blinded_messages: &[PublicKey],
+    ) -> Result<Vec<Option<BlindSignature>>, Error> {
+        let mut signatures = Vec::with_capacity(blinded_messages.len());
+
+        let blinded_signatures = self.inner.blinded_signatures.read().await;
+
+        for blinded_message in blinded_messages {
+            let signature = blinded_signatures.get(&blinded_message.to_bytes()).cloned();
+
+            self.exclusive_access_manager
+                .lock(AnyId::BlindSignature(*blinded_message), self.id)
+                .await;
+
+            signatures.push(signature)
+        }
+
+        Ok(signatures)
+    }
+
+    async fn get_mint_quote_by_request_lookup_id(
+        &mut self,
+        request: &str,
+    ) -> Result<Option<MintQuote>, Error> {
+        let result = self
+            .inner
+            .mint_quotes
+            .read()
+            .await
+            .values()
+            .filter(|q| q.request_lookup_id.eq(request))
+            .next()
+            .cloned();
+
+        if let Some(quote) = &result {
+            self.exclusive_access_manager
+                .lock(AnyId::MintQuote(quote.id), self.id)
+                .await;
+        }
+
+        Ok(result)
+    }
+
+    async fn get_mint_quote_by_request(&self, request: &str) -> Result<Option<MintQuote>, Error> {
+        let result = self
+            .inner
+            .mint_quotes
+            .read()
+            .await
+            .values()
+            .filter(|q| q.request.eq(request))
+            .next()
+            .cloned();
+
+        if let Some(quote) = &result {
+            self.exclusive_access_manager
+                .lock(AnyId::MintQuote(quote.id), self.id)
+                .await;
+        }
+
+        Ok(result)
+    }
+
+    async fn update_mint_quote_state(
+        &mut self,
+        quote_id: &Uuid,
+        state: MintQuoteState,
+    ) -> Result<MintQuoteState, Error> {
+        let mut quote = self
+            .get_mint_quote(quote_id)
+            .await?
+            .ok_or(Error::UnknownQuote)?;
+
+        let current_state = quote.state;
+        quote.state = state;
+
+        self.changes
+            .mint_quotes
+            .write()
+            .await
+            .insert(*quote_id, quote.clone());
+
+        Ok(current_state)
+    }
+
+    async fn add_blind_signatures(
+        &mut self,
+        blinded_message: &[PublicKey],
+        blind_signatures: &[BlindSignature],
+        quote_id: Option<Uuid>,
+    ) -> Result<(), Error> {
+        let mut current_blinded_signatures = self.changes.blinded_signatures.write().await;
+
+        for (blinded_message, blind_signature) in blinded_message.iter().zip(blind_signatures) {
+            current_blinded_signatures.insert(blinded_message.to_bytes(), blind_signature.clone());
+        }
+
+        if let Some(quote_id) = quote_id {
+            let mut current_quote_signatures = self.inner.quote_signatures.write().await;
+            current_quote_signatures.insert(quote_id, blind_signatures.to_vec());
+        }
+
+        Ok(())
+    }
+
+    async fn add_proofs(&mut self, proofs: Proofs, quote_id: Option<Uuid>) -> Result<(), Error> {
+        let mut db_proofs = self.inner.proofs.write().await;
+
+        let mut ys = Vec::with_capacity(proofs.capacity());
+
+        for proof in proofs {
+            let y = hash_to_curve(&proof.secret.to_bytes())?;
+            ys.push(y);
+
+            let y = y.to_bytes();
+
+            db_proofs.insert(y, proof);
+        }
+
+        if let Some(quote_id) = quote_id {
+            let mut db_quote_proofs = self.inner.quote_proofs.write().await;
+
+            db_quote_proofs.insert(quote_id, ys);
+        }
+
+        Ok(())
+    }
+
+    async fn update_melt_quote_state(
+        &mut self,
+        quote_id: &Uuid,
+        state: MeltQuoteState,
+    ) -> Result<MeltQuoteState, Error> {
+        let mut melt_quotes = self.inner.melt_quotes.write().await;
+
+        let mut quote = melt_quotes
+            .get(quote_id)
+            .cloned()
+            .ok_or(Error::UnknownQuote)?;
+
+        let current_state = quote.state;
+
+        quote.state = state;
+
+        melt_quotes.insert(*quote_id, quote.clone());
+
+        Ok(current_state)
+    }
+
+    async fn get_melt_quote(&mut self, quote_id: &Uuid) -> Result<Option<mint::MeltQuote>, Error> {
+        let melt_quote = self.inner.melt_quotes.read().await.get(quote_id).cloned();
+        if let Some(quote) = &melt_quote {
+            self.exclusive_access_manager
+                .lock(AnyId::MeltQuote(quote.id), self.id)
+                .await;
+        }
+        Ok(melt_quote)
+    }
+
+    async fn update_proofs_states(
+        &mut self,
+        ys: &[PublicKey],
+        proof_state: State,
+    ) -> Result<Vec<Option<State>>, Error> {
+        let mut proofs_states = self.inner.proof_state.write().await;
+
+        let mut states = Vec::new();
+
+        for y in ys {
+            let state = proofs_states.insert(y.to_bytes(), proof_state);
+            states.push(state);
+        }
+
+        Ok(states)
+    }
+
+    /// Consumes the Writer and commit the changes
+    async fn commit(mut self: Box<Self>) -> Result<(), Error> {
+        merge!(self.inner.keysets, self.changes.keysets);
+        merge!(self.inner.mint_quotes, self.changes.mint_quotes);
+        merge!(self.inner.melt_quotes, self.changes.melt_quotes);
+        merge!(self.inner.proofs, self.changes.proofs);
+        merge!(
+            self.inner.blinded_signatures,
+            self.changes.blinded_signatures
+        );
+        merge!(self.inner.quote_proofs, self.changes.quote_proofs);
+        merge!(self.inner.quote_signatures, self.changes.quote_signatures);
+        merge!(self.inner.melt_requests, self.changes.melt_requests);
+
+        self.exclusive_access_manager.release(self.id).await;
+        todo!()
+    }
+
+    /// Consumes the Writer and rollback the changes
+    async fn rollback(self: Box<Self>) -> Result<(), Error> {
+        self.exclusive_access_manager.release(self.id).await;
+        Ok(())
+    }
 }
 
 impl MintMemoryDatabase {
@@ -77,24 +409,22 @@ impl MintMemoryDatabase {
             .collect();
 
         Ok(Self {
-            active_keysets: Arc::new(RwLock::new(active_keysets)),
-            keysets: Arc::new(RwLock::new(
-                keysets.into_iter().map(|k| (k.id, k)).collect(),
-            )),
-            mint_quotes: Arc::new(RwLock::new(
-                mint_quotes.into_iter().map(|q| (q.id, q)).collect(),
-            )),
-            melt_quotes: Arc::new(RwLock::new(
-                melt_quotes.into_iter().map(|q| (q.id, q)).collect(),
-            )),
-            proofs: Arc::new(RwLock::new(proofs)),
-            proof_state: Arc::new(Mutex::new(proof_states)),
-            blinded_signatures: Arc::new(RwLock::new(blinded_signatures)),
-            quote_proofs: Arc::new(Mutex::new(quote_proofs)),
-            quote_signatures: Arc::new(RwLock::new(quote_signatures)),
-            melt_requests: Arc::new(RwLock::new(melt_requests)),
-            mint_info: Arc::new(RwLock::new(mint_info)),
-            quote_ttl: Arc::new(RwLock::new(quote_ttl)),
+            writer_index: Arc::new(0.into()),
+            exclusive_access_manager: Arc::new(AccessManager::default()),
+            inner: Arc::new(MemoryStorage {
+                active_keysets: RwLock::new(active_keysets),
+                keysets: RwLock::new(keysets.into_iter().map(|k| (k.id, k)).collect()),
+                mint_quotes: RwLock::new(mint_quotes.into_iter().map(|q| (q.id, q)).collect()),
+                melt_quotes: RwLock::new(melt_quotes.into_iter().map(|q| (q.id, q)).collect()),
+                proofs: RwLock::new(proofs),
+                proof_state: RwLock::new(proof_states),
+                blinded_signatures: RwLock::new(blinded_signatures),
+                quote_proofs: RwLock::new(quote_proofs),
+                quote_signatures: RwLock::new(quote_signatures),
+                melt_requests: RwLock::new(melt_requests),
+                mint_info: RwLock::new(mint_info),
+                quote_ttl: RwLock::new(quote_ttl),
+            }),
         })
     }
 }
@@ -103,60 +433,48 @@ impl MintMemoryDatabase {
 impl MintDatabase for MintMemoryDatabase {
     type Err = Error;
 
+    async fn begin_transaction(&self) -> Result<Box<dyn MintTransaction>, Self::Err> {
+        Ok(Box::new(MintMemoryWriter {
+            inner: self.inner.clone(),
+            exclusive_access_manager: self.exclusive_access_manager.clone(),
+            changes: MemoryStorage::default(),
+            id: self
+                .writer_index
+                .fetch_add(1, std::sync::atomic::Ordering::SeqCst),
+        }))
+    }
+
     async fn set_active_keyset(&self, unit: CurrencyUnit, id: Id) -> Result<(), Self::Err> {
-        self.active_keysets.write().await.insert(unit, id);
+        self.inner.active_keysets.write().await.insert(unit, id);
         Ok(())
     }
 
     async fn get_active_keyset_id(&self, unit: &CurrencyUnit) -> Result<Option<Id>, Self::Err> {
-        Ok(self.active_keysets.read().await.get(unit).cloned())
+        Ok(self.inner.active_keysets.read().await.get(unit).cloned())
     }
 
     async fn get_active_keysets(&self) -> Result<HashMap<CurrencyUnit, Id>, Self::Err> {
-        Ok(self.active_keysets.read().await.clone())
+        Ok(self.inner.active_keysets.read().await.clone())
     }
 
     async fn add_keyset_info(&self, keyset: MintKeySetInfo) -> Result<(), Self::Err> {
-        self.keysets.write().await.insert(keyset.id, keyset);
+        self.inner.keysets.write().await.insert(keyset.id, keyset);
         Ok(())
     }
 
     async fn get_keyset_info(&self, keyset_id: &Id) -> Result<Option<MintKeySetInfo>, Self::Err> {
-        Ok(self.keysets.read().await.get(keyset_id).cloned())
+        Ok(self.inner.keysets.read().await.get(keyset_id).cloned())
     }
 
     async fn get_keyset_infos(&self) -> Result<Vec<MintKeySetInfo>, Self::Err> {
-        Ok(self.keysets.read().await.values().cloned().collect())
-    }
-
-    async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), Self::Err> {
-        self.mint_quotes.write().await.insert(quote.id, quote);
-        Ok(())
+        Ok(self.inner.keysets.read().await.values().cloned().collect())
     }
 
     async fn get_mint_quote(&self, quote_id: &Uuid) -> Result<Option<MintQuote>, Self::Err> {
-        Ok(self.mint_quotes.read().await.get(quote_id).cloned())
-    }
-
-    async fn update_mint_quote_state(
-        &self,
-        quote_id: &Uuid,
-        state: MintQuoteState,
-    ) -> Result<MintQuoteState, Self::Err> {
-        let mut mint_quotes = self.mint_quotes.write().await;
-
-        let mut quote = mint_quotes
-            .get(quote_id)
-            .cloned()
-            .ok_or(Error::UnknownQuote)?;
-
-        let current_state = quote.state;
-
-        quote.state = state;
-
-        mint_quotes.insert(*quote_id, quote.clone());
-
-        Ok(current_state)
+        self.exclusive_access_manager
+            .access(AnyId::MintQuote(quote_id.to_owned()))
+            .await;
+        Ok(self.inner.mint_quotes.read().await.get(quote_id).cloned())
     }
 
     async fn get_mint_quote_by_request_lookup_id(
@@ -191,51 +509,44 @@ impl MintDatabase for MintMemoryDatabase {
     }
 
     async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, Self::Err> {
-        Ok(self.mint_quotes.read().await.values().cloned().collect())
+        Ok(self
+            .inner
+            .mint_quotes
+            .read()
+            .await
+            .values()
+            .cloned()
+            .collect())
     }
 
     async fn remove_mint_quote(&self, quote_id: &Uuid) -> Result<(), Self::Err> {
-        self.mint_quotes.write().await.remove(quote_id);
+        self.inner.mint_quotes.write().await.remove(quote_id);
 
         Ok(())
     }
 
     async fn add_melt_quote(&self, quote: mint::MeltQuote) -> Result<(), Self::Err> {
-        self.melt_quotes.write().await.insert(quote.id, quote);
+        self.inner.melt_quotes.write().await.insert(quote.id, quote);
         Ok(())
     }
 
     async fn get_melt_quote(&self, quote_id: &Uuid) -> Result<Option<mint::MeltQuote>, Self::Err> {
-        Ok(self.melt_quotes.read().await.get(quote_id).cloned())
-    }
-
-    async fn update_melt_quote_state(
-        &self,
-        quote_id: &Uuid,
-        state: MeltQuoteState,
-    ) -> Result<MeltQuoteState, Self::Err> {
-        let mut melt_quotes = self.melt_quotes.write().await;
-
-        let mut quote = melt_quotes
-            .get(quote_id)
-            .cloned()
-            .ok_or(Error::UnknownQuote)?;
-
-        let current_state = quote.state;
-
-        quote.state = state;
-
-        melt_quotes.insert(*quote_id, quote.clone());
-
-        Ok(current_state)
+        Ok(self.inner.melt_quotes.read().await.get(quote_id).cloned())
     }
 
     async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err> {
-        Ok(self.melt_quotes.read().await.values().cloned().collect())
+        Ok(self
+            .inner
+            .melt_quotes
+            .read()
+            .await
+            .values()
+            .cloned()
+            .collect())
     }
 
     async fn remove_melt_quote(&self, quote_id: &Uuid) -> Result<(), Self::Err> {
-        self.melt_quotes.write().await.remove(quote_id);
+        self.inner.melt_quotes.write().await.remove(quote_id);
 
         Ok(())
     }
@@ -245,7 +556,7 @@ impl MintDatabase for MintMemoryDatabase {
         melt_request: MeltBolt11Request<Uuid>,
         ln_key: LnKey,
     ) -> Result<(), Self::Err> {
-        let mut melt_requests = self.melt_requests.write().await;
+        let mut melt_requests = self.inner.melt_requests.write().await;
         melt_requests.insert(melt_request.quote, (melt_request, ln_key));
         Ok(())
     }
@@ -254,67 +565,15 @@ impl MintDatabase for MintMemoryDatabase {
         &self,
         quote_id: &Uuid,
     ) -> Result<Option<(MeltBolt11Request<Uuid>, LnKey)>, Self::Err> {
-        let melt_requests = self.melt_requests.read().await;
+        let melt_requests = self.inner.melt_requests.read().await;
 
         let melt_request = melt_requests.get(quote_id);
 
         Ok(melt_request.cloned())
     }
 
-    async fn add_proofs(&self, proofs: Proofs, quote_id: Option<Uuid>) -> Result<(), Self::Err> {
-        let mut db_proofs = self.proofs.write().await;
-
-        let mut ys = Vec::with_capacity(proofs.capacity());
-
-        for proof in proofs {
-            let y = hash_to_curve(&proof.secret.to_bytes())?;
-            ys.push(y);
-
-            let y = y.to_bytes();
-
-            db_proofs.insert(y, proof);
-        }
-
-        if let Some(quote_id) = quote_id {
-            let mut db_quote_proofs = self.quote_proofs.lock().await;
-
-            db_quote_proofs.insert(quote_id, ys);
-        }
-
-        Ok(())
-    }
-
-    async fn remove_proofs(
-        &self,
-        ys: &[PublicKey],
-        quote_id: Option<Uuid>,
-    ) -> Result<(), Self::Err> {
-        {
-            let mut db_proofs = self.proofs.write().await;
-
-            ys.iter().for_each(|y| {
-                db_proofs.remove(&y.to_bytes());
-            });
-        }
-
-        {
-            let mut db_proofs_state = self.proof_state.lock().await;
-
-            ys.iter().for_each(|y| {
-                db_proofs_state.remove(&y.to_bytes());
-            });
-        }
-
-        if let Some(quote_id) = quote_id {
-            let mut quote_proofs = self.quote_proofs.lock().await;
-            quote_proofs.remove(&quote_id);
-        }
-
-        Ok(())
-    }
-
     async fn get_proofs_by_ys(&self, ys: &[PublicKey]) -> Result<Vec<Option<Proof>>, Self::Err> {
-        let spent_proofs = self.proofs.read().await;
+        let spent_proofs = self.inner.proofs.read().await;
 
         let mut proofs = Vec::with_capacity(ys.len());
 
@@ -328,7 +587,7 @@ impl MintDatabase for MintMemoryDatabase {
     }
 
     async fn get_proof_ys_by_quote_id(&self, quote_id: &Uuid) -> Result<Vec<PublicKey>, Self::Err> {
-        let quote_proofs = &__self.quote_proofs.lock().await;
+        let quote_proofs = &self.inner.quote_proofs.write().await;
 
         match quote_proofs.get(quote_id) {
             Some(ys) => Ok(ys.clone()),
@@ -336,25 +595,8 @@ impl MintDatabase for MintMemoryDatabase {
         }
     }
 
-    async fn update_proofs_states(
-        &self,
-        ys: &[PublicKey],
-        proof_state: State,
-    ) -> Result<Vec<Option<State>>, Self::Err> {
-        let mut proofs_states = self.proof_state.lock().await;
-
-        let mut states = Vec::new();
-
-        for y in ys {
-            let state = proofs_states.insert(y.to_bytes(), proof_state);
-            states.push(state);
-        }
-
-        Ok(states)
-    }
-
     async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
-        let proofs_states = self.proof_state.lock().await;
+        let proofs_states = self.inner.proof_state.write().await;
 
         let mut states = Vec::new();
 
@@ -370,7 +612,7 @@ impl MintDatabase for MintMemoryDatabase {
         &self,
         keyset_id: &Id,
     ) -> Result<(Proofs, Vec<Option<State>>), Self::Err> {
-        let proofs = self.proofs.read().await;
+        let proofs = self.inner.proofs.read().await;
 
         let proofs_for_id: Proofs = proofs
             .iter()
@@ -390,37 +632,21 @@ impl MintDatabase for MintMemoryDatabase {
         Ok((proofs_for_id, states))
     }
 
-    async fn add_blind_signatures(
-        &self,
-        blinded_message: &[PublicKey],
-        blind_signatures: &[BlindSignature],
-        quote_id: Option<Uuid>,
-    ) -> Result<(), Self::Err> {
-        let mut current_blinded_signatures = self.blinded_signatures.write().await;
-
-        for (blinded_message, blind_signature) in blinded_message.iter().zip(blind_signatures) {
-            current_blinded_signatures.insert(blinded_message.to_bytes(), blind_signature.clone());
-        }
-
-        if let Some(quote_id) = quote_id {
-            let mut current_quote_signatures = self.quote_signatures.write().await;
-            current_quote_signatures.insert(quote_id, blind_signatures.to_vec());
-        }
-
-        Ok(())
-    }
-
     async fn get_blind_signatures(
         &self,
         blinded_messages: &[PublicKey],
     ) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
         let mut signatures = Vec::with_capacity(blinded_messages.len());
 
-        let blinded_signatures = self.blinded_signatures.read().await;
+        let blinded_signatures = self.inner.blinded_signatures.read().await;
 
         for blinded_message in blinded_messages {
             let signature = blinded_signatures.get(&blinded_message.to_bytes()).cloned();
 
+            self.exclusive_access_manager
+                .access(AnyId::BlindSignature(*blinded_message))
+                .await;
+
             signatures.push(signature)
         }
 
@@ -431,7 +657,7 @@ impl MintDatabase for MintMemoryDatabase {
         &self,
         keyset_id: &Id,
     ) -> Result<Vec<BlindSignature>, Self::Err> {
-        let blinded_signatures = self.blinded_signatures.read().await;
+        let blinded_signatures = self.inner.blinded_signatures.read().await;
 
         Ok(blinded_signatures
             .values()
@@ -445,33 +671,33 @@ impl MintDatabase for MintMemoryDatabase {
         &self,
         quote_id: &Uuid,
     ) -> Result<Vec<BlindSignature>, Self::Err> {
-        let ys = self.quote_signatures.read().await;
+        let ys = self.inner.quote_signatures.read().await;
 
         Ok(ys.get(quote_id).cloned().unwrap_or_default())
     }
 
     async fn set_mint_info(&self, mint_info: MintInfo) -> Result<(), Self::Err> {
-        let mut current_mint_info = self.mint_info.write().await;
+        let mut current_mint_info = self.inner.mint_info.write().await;
 
         *current_mint_info = mint_info;
 
         Ok(())
     }
     async fn get_mint_info(&self) -> Result<MintInfo, Self::Err> {
-        let mint_info = self.mint_info.read().await;
+        let mint_info = self.inner.mint_info.read().await;
 
         Ok(mint_info.clone())
     }
 
     async fn set_quote_ttl(&self, quote_ttl: QuoteTTL) -> Result<(), Self::Err> {
-        let mut current_quote_ttl = self.quote_ttl.write().await;
+        let mut current_quote_ttl = self.inner.quote_ttl.write().await;
 
         *current_quote_ttl = quote_ttl;
 
         Ok(())
     }
     async fn get_quote_ttl(&self) -> Result<QuoteTTL, Self::Err> {
-        let quote_ttl = self.quote_ttl.read().await;
+        let quote_ttl = self.inner.quote_ttl.read().await;
 
         Ok(*quote_ttl)
     }

+ 3 - 4
crates/cdk/src/mint/check_spendable.rs

@@ -1,5 +1,6 @@
 use std::collections::HashSet;
 
+use cdk_common::database::MintTransaction;
 use tracing::instrument;
 
 use super::{CheckStateRequest, CheckStateResponse, Mint, ProofState, PublicKey, State};
@@ -38,13 +39,11 @@ impl Mint {
     #[instrument(skip_all)]
     pub async fn check_ys_spendable(
         &self,
+        tx: &mut Box<dyn MintTransaction>,
         ys: &[PublicKey],
         proof_state: State,
     ) -> Result<(), Error> {
-        let proofs_state = self
-            .localstore
-            .update_proofs_states(ys, proof_state)
-            .await?;
+        let proofs_state = tx.update_proofs_states(ys, proof_state).await?;
 
         let proofs_state = proofs_state.iter().flatten().collect::<HashSet<&State>>();
 

+ 42 - 119
crates/cdk/src/mint/melt.rs

@@ -2,6 +2,7 @@ use std::collections::HashSet;
 use std::str::FromStr;
 
 use anyhow::bail;
+use cdk_common::database::MintTransaction;
 use cdk_common::nut00::ProofsMethods;
 use cdk_common::MeltOptions;
 use lightning_invoice::Bolt11Invoice;
@@ -284,14 +285,15 @@ impl Mint {
     #[instrument(skip_all)]
     pub async fn verify_melt_request(
         &self,
+        tx: &mut Box<dyn MintTransaction>,
         melt_request: &MeltBolt11Request<Uuid>,
     ) -> Result<MeltQuote, Error> {
-        let state = self
-            .localstore
-            .update_melt_quote_state(&melt_request.quote, MeltQuoteState::Pending)
-            .await?;
+        let quote = tx
+            .get_melt_quote(&melt_request.quote)
+            .await?
+            .ok_or(Error::UnknownQuote)?;
 
-        match state {
+        match quote.state {
             MeltQuoteState::Unpaid | MeltQuoteState::Failed => Ok(()),
             MeltQuoteState::Pending => Err(Error::PendingQuote),
             MeltQuoteState::Paid => Err(Error::PaidQuote),
@@ -305,21 +307,15 @@ impl Mint {
             return Err(Error::DuplicateProofs);
         }
 
-        self.localstore
-            .add_proofs(melt_request.inputs.clone(), Some(melt_request.quote))
+        tx.add_proofs(melt_request.inputs.clone(), Some(melt_request.quote))
             .await?;
-        self.check_ys_spendable(&ys, State::Pending).await?;
+
+        self.check_ys_spendable(tx, &ys, State::Pending).await?;
 
         for proof in &melt_request.inputs {
             self.verify_proof(proof).await?;
         }
 
-        let quote = self
-            .localstore
-            .get_melt_quote(&melt_request.quote)
-            .await?
-            .ok_or(Error::UnknownQuote)?;
-
         let proofs_total = melt_request.proofs_amount()?;
 
         let fee = self.get_proofs_fee(&melt_request.inputs).await?;
@@ -395,38 +391,6 @@ impl Mint {
         Ok(quote)
     }
 
-    /// Process unpaid melt request
-    /// In the event that a melt request fails and the lighthing payment is not
-    /// made The [`Proofs`] should be returned to an unspent state and the
-    /// quote should be unpaid
-    #[instrument(skip_all)]
-    pub async fn process_unpaid_melt(
-        &self,
-        melt_request: &MeltBolt11Request<Uuid>,
-    ) -> Result<(), Error> {
-        let input_ys = melt_request.inputs.ys()?;
-
-        self.localstore
-            .remove_proofs(&input_ys, Some(melt_request.quote))
-            .await?;
-
-        self.localstore
-            .update_melt_quote_state(&melt_request.quote, MeltQuoteState::Unpaid)
-            .await?;
-
-        if let Ok(Some(quote)) = self.localstore.get_melt_quote(&melt_request.quote).await {
-            self.pubsub_manager
-                .melt_quote_status(&quote, None, None, MeltQuoteState::Unpaid);
-        }
-
-        for public_key in input_ys {
-            self.pubsub_manager
-                .proof_state((public_key, State::Unspent));
-        }
-
-        Ok(())
-    }
-
     /// Melt Bolt11
     #[instrument(skip_all)]
     pub async fn melt_bolt11(
@@ -455,37 +419,12 @@ impl Mint {
             }
         }
 
-        let quote = match self.verify_melt_request(melt_request).await {
-            Ok(quote) => quote,
-            Err(err) => {
-                tracing::debug!("Error attempting to verify melt quote: {}", err);
-
-                if let Err(err) = self.process_unpaid_melt(melt_request).await {
-                    tracing::error!(
-                        "Could not reset melt quote {} state: {}",
-                        melt_request.quote,
-                        err
-                    );
-                }
-                return Err(err);
-            }
-        };
+        let mut tx = self.localstore.begin_transaction().await?;
 
-        let settled_internally_amount =
-            match self.handle_internal_melt_mint(&quote, melt_request).await {
-                Ok(amount) => amount,
-                Err(err) => {
-                    tracing::error!("Attempting to settle internally failed");
-                    if let Err(err) = self.process_unpaid_melt(melt_request).await {
-                        tracing::error!(
-                            "Could not reset melt quote {} state: {}",
-                            melt_request.quote,
-                            err
-                        );
-                    }
-                    return Err(err);
-                }
-            };
+        let quote = self.verify_melt_request(&mut tx, melt_request).await?;
+        let settled_internally_amount = self
+            .handle_internal_melt_mint(&mut tx, &quote, melt_request)
+            .await?;
 
         let (preimage, amount_spent_quote_unit) = match settled_internally_amount {
             Some(amount_spent) => (None, amount_spent),
@@ -504,9 +443,6 @@ impl Mint {
                             Ok(amount) => amount,
                             Err(err) => {
                                 tracing::error!("Fee is not expected: {}", err);
-                                if let Err(err) = self.process_unpaid_melt(melt_request).await {
-                                    tracing::error!("Could not reset melt quote state: {}", err);
-                                }
                                 return Err(Error::Internal);
                             }
                         }
@@ -521,10 +457,6 @@ impl Mint {
                     Some(ln) => ln,
                     None => {
                         tracing::info!("Could not get ln backend for {}, bolt11 ", quote.unit);
-                        if let Err(err) = self.process_unpaid_melt(melt_request).await {
-                            tracing::error!("Could not reset melt quote state: {}", err);
-                        }
-
                         return Err(Error::UnsupportedUnit);
                     }
                 };
@@ -555,9 +487,6 @@ impl Mint {
                         // hold the proofs as pending to we reset them  and return an error.
                         if matches!(err, cdk_lightning::Error::InvoiceAlreadyPaid) {
                             tracing::debug!("Invoice already paid, resetting melt quote");
-                            if let Err(err) = self.process_unpaid_melt(melt_request).await {
-                                tracing::error!("Could not reset melt quote state: {}", err);
-                            }
                             return Err(Error::RequestAlreadyPaid);
                         }
 
@@ -583,9 +512,6 @@ impl Mint {
                             "Lightning payment for quote {} failed.",
                             melt_request.quote
                         );
-                        if let Err(err) = self.process_unpaid_melt(melt_request).await {
-                            tracing::error!("Could not reset melt quote state: {}", err);
-                        }
                         return Err(Error::PaymentFailed);
                     }
                     MeltQuoteState::Pending => {
@@ -627,13 +553,15 @@ impl Mint {
         // If we made it here the payment has been made.
         // We process the melt burning the inputs and returning change
         let res = self
-            .process_melt_request(melt_request, preimage, amount_spent_quote_unit)
+            .process_melt_request(&mut tx, melt_request, preimage, amount_spent_quote_unit)
             .await
             .map_err(|err| {
                 tracing::error!("Could not process melt request: {}", err);
                 err
             })?;
 
+        tx.commit().await?;
+
         Ok(res)
     }
     /// Process melt request marking [`Proofs`] as spent
@@ -642,39 +570,24 @@ impl Mint {
     #[instrument(skip_all)]
     pub async fn process_melt_request(
         &self,
+        tx: &mut Box<dyn MintTransaction>,
         melt_request: &MeltBolt11Request<Uuid>,
         payment_preimage: Option<String>,
         total_spent: Amount,
     ) -> Result<MeltQuoteBolt11Response<Uuid>, Error> {
         tracing::debug!("Processing melt quote: {}", melt_request.quote);
 
-        let quote = self
-            .localstore
+        let quote = tx
             .get_melt_quote(&melt_request.quote)
             .await?
             .ok_or(Error::UnknownQuote)?;
 
         let input_ys = melt_request.inputs.ys()?;
 
-        self.localstore
-            .update_proofs_states(&input_ys, State::Spent)
+        tx.update_proofs_states(&input_ys, State::Spent).await?;
+        tx.update_melt_quote_state(&melt_request.quote, MeltQuoteState::Paid)
             .await?;
 
-        self.localstore
-            .update_melt_quote_state(&melt_request.quote, MeltQuoteState::Paid)
-            .await?;
-
-        self.pubsub_manager.melt_quote_status(
-            &quote,
-            payment_preimage.clone(),
-            None,
-            MeltQuoteState::Paid,
-        );
-
-        for public_key in input_ys {
-            self.pubsub_manager.proof_state((public_key, State::Spent));
-        }
-
         let mut change = None;
 
         // Check if there is change to return
@@ -724,21 +637,31 @@ impl Mint {
                     change_sigs.push(blinded_signature)
                 }
 
-                self.localstore
-                    .add_blind_signatures(
-                        &outputs[0..change_sigs.len()]
-                            .iter()
-                            .map(|o| o.blinded_secret)
-                            .collect::<Vec<PublicKey>>(),
-                        &change_sigs,
-                        Some(quote.id),
-                    )
-                    .await?;
+                tx.add_blind_signatures(
+                    &outputs[0..change_sigs.len()]
+                        .iter()
+                        .map(|o| o.blinded_secret)
+                        .collect::<Vec<PublicKey>>(),
+                    &change_sigs,
+                    Some(quote.id),
+                )
+                .await?;
 
                 change = Some(change_sigs);
             }
         }
 
+        self.pubsub_manager.melt_quote_status(
+            &quote,
+            payment_preimage.clone(),
+            None,
+            MeltQuoteState::Paid,
+        );
+
+        for public_key in input_ys {
+            self.pubsub_manager.proof_state((public_key, State::Spent));
+        }
+
         Ok(MeltQuoteBolt11Response {
             amount: quote.amount,
             paid: Some(true),

+ 35 - 51
crates/cdk/src/mint/mint_nut04.rs

@@ -1,5 +1,6 @@
 use std::collections::HashSet;
 
+use cdk_common::database::MintTransaction;
 use cdk_common::Id;
 use tracing::instrument;
 use uuid::Uuid;
@@ -122,7 +123,9 @@ impl Mint {
             create_invoice_response.request_lookup_id,
         );
 
-        self.localstore.add_mint_quote(quote.clone()).await?;
+        let mut tx = self.localstore.begin_transaction().await?;
+        tx.add_mint_quote(quote.clone()).await?;
+        tx.commit().await?;
 
         let quote: MintQuoteBolt11Response<Uuid> = quote.into();
 
@@ -161,13 +164,6 @@ impl Mint {
         })
     }
 
-    /// Update mint quote
-    #[instrument(skip_all)]
-    pub async fn update_mint_quote(&self, quote: MintQuote) -> Result<(), Error> {
-        self.localstore.add_mint_quote(quote).await?;
-        Ok(())
-    }
-
     /// Get mint quotes
     #[instrument(skip_all)]
     pub async fn mint_quotes(&self) -> Result<Vec<MintQuote>, Error> {
@@ -211,19 +207,24 @@ impl Mint {
         &self,
         request_lookup_id: &str,
     ) -> Result<(), Error> {
-        if let Ok(Some(mint_quote)) = self
-            .localstore
+        let mut tx = self.localstore.begin_transaction().await?;
+        if let Ok(Some(mint_quote)) = tx
             .get_mint_quote_by_request_lookup_id(request_lookup_id)
             .await
         {
-            self.pay_mint_quote(&mint_quote).await?;
+            self.pay_mint_quote(&mut tx, &mint_quote).await?;
+            tx.commit().await?;
         }
         Ok(())
     }
 
     /// Mark mint quote as paid
     #[instrument(skip_all)]
-    pub async fn pay_mint_quote(&self, mint_quote: &MintQuote) -> Result<(), Error> {
+    pub async fn pay_mint_quote(
+        &self,
+        tx: &mut Box<dyn MintTransaction>,
+        mint_quote: &MintQuote,
+    ) -> Result<(), Error> {
         tracing::debug!(
             "Received payment notification for mint quote {}",
             mint_quote.id
@@ -241,8 +242,7 @@ impl Mint {
                 return Err(Error::ExpiredQuote(mint_quote.expiry, unix_time));
             }
 
-            self.localstore
-                .update_mint_quote_state(&mint_quote.id, MintQuoteState::Paid)
+            tx.update_mint_quote_state(&mint_quote.id, MintQuoteState::Paid)
                 .await?;
         } else {
             tracing::debug!(
@@ -264,34 +264,22 @@ impl Mint {
         &self,
         mint_request: nut04::MintBolt11Request<Uuid>,
     ) -> Result<nut04::MintBolt11Response, Error> {
-        let mint_quote =
-            if let Some(mint_quote) = self.localstore.get_mint_quote(&mint_request.quote).await? {
-                mint_quote
-            } else {
-                return Err(Error::UnknownQuote);
-            };
-
-        let state = self
-            .localstore
-            .update_mint_quote_state(&mint_request.quote, MintQuoteState::Pending)
-            .await?;
+        let mut tx = self.localstore.begin_transaction().await?;
+
+        let mint_quote = if let Some(mint_quote) = tx.get_mint_quote(&mint_request.quote).await? {
+            mint_quote
+        } else {
+            return Err(Error::UnknownQuote);
+        };
 
-        match state {
+        match mint_quote.state {
             MintQuoteState::Unpaid => {
-                let _state = self
-                    .localstore
-                    .update_mint_quote_state(&mint_request.quote, MintQuoteState::Unpaid)
-                    .await?;
                 return Err(Error::UnpaidQuote);
             }
             MintQuoteState::Pending => {
                 return Err(Error::PendingQuote);
             }
             MintQuoteState::Issued => {
-                let _state = self
-                    .localstore
-                    .update_mint_quote_state(&mint_request.quote, MintQuoteState::Issued)
-                    .await?;
                 return Err(Error::IssuedQuote);
             }
             MintQuoteState::Paid => (),
@@ -353,10 +341,6 @@ impl Mint {
                 mint_request.quote
             );
 
-            self.localstore
-                .update_mint_quote_state(&mint_request.quote, MintQuoteState::Paid)
-                .await?;
-
             return Err(Error::BlindedMessageAlreadySigned);
         }
 
@@ -367,21 +351,21 @@ impl Mint {
             blind_signatures.push(blind_signature);
         }
 
-        self.localstore
-            .add_blind_signatures(
-                &mint_request
-                    .outputs
-                    .iter()
-                    .map(|p| p.blinded_secret)
-                    .collect::<Vec<PublicKey>>(),
-                &blind_signatures,
-                Some(mint_request.quote),
-            )
+        tx.add_blind_signatures(
+            &mint_request
+                .outputs
+                .iter()
+                .map(|p| p.blinded_secret)
+                .collect::<Vec<PublicKey>>(),
+            &blind_signatures,
+            Some(mint_request.quote),
+        )
+        .await?;
+
+        tx.update_mint_quote_state(&mint_request.quote, MintQuoteState::Issued)
             .await?;
 
-        self.localstore
-            .update_mint_quote_state(&mint_request.quote, MintQuoteState::Issued)
-            .await?;
+        tx.commit().await?;
 
         self.pubsub_manager
             .mint_quote_bolt11_status(mint_quote, MintQuoteState::Issued);

+ 5 - 11
crates/cdk/src/mint/mod.rs

@@ -6,7 +6,7 @@ use std::sync::Arc;
 use bitcoin::bip32::{ChildNumber, DerivationPath, Xpriv};
 use bitcoin::secp256k1::{self, Secp256k1};
 use cdk_common::common::{LnKey, QuoteTTL};
-use cdk_common::database::{self, MintDatabase};
+use cdk_common::database::{self, MintDatabase, MintTransaction};
 use cdk_common::mint::MintKeySetInfo;
 use futures::StreamExt;
 use serde::{Deserialize, Serialize};
@@ -392,14 +392,11 @@ impl Mint {
     #[instrument(skip_all)]
     pub async fn handle_internal_melt_mint(
         &self,
+        tx: &mut Box<dyn MintTransaction>,
         melt_quote: &MeltQuote,
         melt_request: &MeltBolt11Request<Uuid>,
     ) -> Result<Option<Amount>, Error> {
-        let mint_quote = match self
-            .localstore
-            .get_mint_quote_by_request(&melt_quote.request)
-            .await
-        {
+        let mint_quote = match tx.get_mint_quote_by_request(&melt_quote.request).await {
             Ok(Some(mint_quote)) => mint_quote,
             // Not an internal melt -> mint
             Ok(None) => return Ok(None),
@@ -419,8 +416,6 @@ impl Mint {
             Error::AmountOverflow
         })?;
 
-        let mut mint_quote = mint_quote;
-
         if mint_quote.amount > inputs_amount_quote_unit {
             tracing::debug!(
                 "Not enough inuts provided: {} needed {}",
@@ -430,12 +425,11 @@ impl Mint {
             return Err(Error::InsufficientFunds);
         }
 
-        mint_quote.state = MintQuoteState::Paid;
+        tx.update_mint_quote_state(&mint_quote.id, MintQuoteState::Paid)
+            .await?;
 
         let amount = melt_quote.amount;
 
-        self.update_mint_quote(mint_quote).await?;
-
         Ok(Some(amount))
     }
 

+ 13 - 9
crates/cdk/src/mint/start_up_check.rs

@@ -19,6 +19,8 @@ impl Mint {
 
         unpaid_quotes.append(&mut pending_quotes);
 
+        let mut tx = self.localstore.begin_transaction().await?;
+
         for ln in self.ln.values() {
             for quote in unpaid_quotes.iter() {
                 tracing::debug!("Checking status of mint quote: {}", quote.id);
@@ -27,9 +29,7 @@ impl Mint {
                     Ok(state) => {
                         if state != quote.state {
                             tracing::trace!("Mint quote status changed: {}", quote.id);
-                            self.localstore
-                                .update_mint_quote_state(&quote.id, state)
-                                .await?;
+                            tx.update_mint_quote_state(&quote.id, state).await?;
                         }
                     }
 
@@ -40,6 +40,9 @@ impl Mint {
                 }
             }
         }
+
+        tx.commit().await?;
+
         Ok(())
     }
 
@@ -54,7 +57,8 @@ impl Mint {
 
         for pending_quote in pending_quotes {
             tracing::debug!("Checking status for melt quote {}.", pending_quote.id);
-            let melt_request_ln_key = self.localstore.get_melt_request(&pending_quote.id).await?;
+            let mut tx = self.localstore.begin_transaction().await?;
+            let melt_request_ln_key = tx.get_melt_request(&pending_quote.id).await?;
 
             let (melt_request, ln_key) = match melt_request_ln_key {
                 None => {
@@ -86,6 +90,7 @@ impl Mint {
                         MeltQuoteState::Paid => {
                             if let Err(err) = self
                                 .process_melt_request(
+                                    &mut tx,
                                     &melt_request,
                                     pay_invoice_response.payment_preimage,
                                     pay_invoice_response.total_spent,
@@ -107,9 +112,8 @@ impl Mint {
                                 "Lightning payment for quote {} failed.",
                                 pending_quote.id
                             );
-                            if let Err(err) = self.process_unpaid_melt(&melt_request).await {
-                                tracing::error!("Could not reset melt quote state: {}", err);
-                            }
+
+                            tx.rollback().await?;
                         }
                         MeltQuoteState::Pending => {
                             tracing::warn!(
@@ -127,9 +131,9 @@ impl Mint {
                         pending_quote.id
                     );
 
-                    self.localstore
-                        .update_melt_quote_state(&pending_quote.id, pay_invoice_response.status)
+                    tx.update_melt_quote_state(&pending_quote.id, pay_invoice_response.status)
                         .await?;
+                    tx.commit().await?;
                 }
             };
         }

+ 22 - 26
crates/cdk/src/mint/swap.rs

@@ -20,8 +20,9 @@ impl Mint {
             .map(|b| b.blinded_secret)
             .collect();
 
-        if self
-            .localstore
+        let mut tx = self.localstore.begin_transaction().await?;
+
+        if tx
             .get_blind_signatures(&blinded_messages)
             .await?
             .iter()
@@ -60,10 +61,10 @@ impl Mint {
 
         let input_ys = swap_request.inputs.ys()?;
 
-        self.localstore
-            .add_proofs(swap_request.inputs.clone(), None)
+        tx.add_proofs(swap_request.inputs.clone(), None).await?;
+
+        self.check_ys_spendable(&mut tx, &input_ys, State::Pending)
             .await?;
-        self.check_ys_spendable(&input_ys, State::Pending).await?;
 
         // Check that there are no duplicate proofs in request
         if input_ys
@@ -72,14 +73,12 @@ impl Mint {
             .len()
             .ne(&proof_count)
         {
-            self.localstore.remove_proofs(&input_ys, None).await?;
             return Err(Error::DuplicateProofs);
         }
 
         for proof in &swap_request.inputs {
             if let Err(err) = self.verify_proof(proof).await {
                 tracing::info!("Error verifying proof in swap");
-                self.localstore.remove_proofs(&input_ys, None).await?;
                 return Err(err);
             }
         }
@@ -96,7 +95,7 @@ impl Mint {
                 }
                 None => {
                     tracing::info!("Swap request with unknown keyset in inputs");
-                    self.localstore.remove_proofs(&input_ys, None).await?;
+                    return Err(Error::UnknownKeySet);
                 }
             }
         }
@@ -111,7 +110,7 @@ impl Mint {
                 }
                 None => {
                     tracing::info!("Swap request with unknown keyset in outputs");
-                    self.localstore.remove_proofs(&input_ys, None).await?;
+                    return Err(Error::UnknownKeySet);
                 }
             }
         }
@@ -121,7 +120,6 @@ impl Mint {
         // now
         if keyset_units.len().gt(&1) {
             tracing::error!("Only one unit is allowed in request: {:?}", keyset_units);
-            self.localstore.remove_proofs(&input_ys, None).await?;
             return Err(Error::UnsupportedUnit);
         }
 
@@ -136,7 +134,6 @@ impl Mint {
             for blinded_message in &swap_request.outputs {
                 if let Err(err) = blinded_message.verify_p2pk(&pubkeys, sigs_required) {
                     tracing::info!("Could not verify p2pk in swap request");
-                    self.localstore.remove_proofs(&input_ys, None).await?;
                     return Err(err.into());
                 }
             }
@@ -149,26 +146,25 @@ impl Mint {
             promises.push(blinded_signature);
         }
 
-        self.localstore
-            .update_proofs_states(&input_ys, State::Spent)
-            .await?;
+        tx.update_proofs_states(&input_ys, State::Spent).await?;
+
+        tx.add_blind_signatures(
+            &swap_request
+                .outputs
+                .iter()
+                .map(|o| o.blinded_secret)
+                .collect::<Vec<PublicKey>>(),
+            &promises,
+            None,
+        )
+        .await?;
+
+        tx.commit().await?;
 
         for pub_key in input_ys {
             self.pubsub_manager.proof_state((pub_key, State::Spent));
         }
 
-        self.localstore
-            .add_blind_signatures(
-                &swap_request
-                    .outputs
-                    .iter()
-                    .map(|o| o.blinded_secret)
-                    .collect::<Vec<PublicKey>>(),
-                &promises,
-                None,
-            )
-            .await?;
-
         Ok(SwapResponse::new(promises))
     }
 }