فهرست منبع

Feat: implement database transaction pattern across all database backends

Separate read and write operations across the codebase by introducing explicit
database transactions. Write methods are moved from the Database trait to a new
DatabaseTransaction trait, requiring all write operations to be performed
within explicit transactions.

Database Layer Changes:
- Create WalletDatabaseTransaction trait with all write methods
- Add DbTransactionFinalizer trait with commit/rollback methods
- Update WalletDatabase trait to only contain read methods and
  begin_db_transaction()
- Add required read methods to WalletDatabaseTransaction for operations needed
  during transactions (get_keyset_by_id, get_mint_quote, get_melt_quote,
get_proofs)

This change improves transaction safety, enables proper rollback on errors, and
provides a foundation for more complex multi-operation transactions.  Maintains
backward compatibility with the existing FFI interface while adapting to a
transaction-based architecture.
Cesar Rodas 1 هفته پیش
والد
کامیت
a49466357a

+ 1 - 14
crates/cdk-common/src/database/mint/mod.rs

@@ -6,7 +6,7 @@ use async_trait::async_trait;
 use cashu::quote_id::QuoteId;
 use cashu::Amount;
 
-use super::Error;
+use super::{DbTransactionFinalizer, Error};
 use crate::mint::{self, MintKeySetInfo, MintQuote as MintMintQuote, Operation};
 use crate::nuts::{
     BlindSignature, BlindedMessage, CurrencyUnit, Id, MeltQuoteState, Proof, Proofs, PublicKey,
@@ -403,19 +403,6 @@ pub trait SagaDatabase {
     ) -> Result<Vec<mint::Saga>, Self::Err>;
 }
 
-#[async_trait]
-/// Commit and Rollback
-pub trait DbTransactionFinalizer {
-    /// Mint Signature Database Error
-    type Err: Into<Error> + From<Error>;
-
-    /// Commits all the changes into the database
-    async fn commit(self: Box<Self>) -> Result<(), Self::Err>;
-
-    /// Rollbacks the write transaction
-    async fn rollback(self: Box<Self>) -> Result<(), Self::Err>;
-}
-
 /// Key-Value Store Transaction trait
 #[async_trait]
 pub trait KVStoreTransaction<'a, Error>: DbTransactionFinalizer<Err = Error> {

+ 23 - 7
crates/cdk-common/src/database/mod.rs

@@ -7,18 +7,21 @@ mod wallet;
 
 #[cfg(feature = "mint")]
 pub use mint::{
-    Database as MintDatabase, DbTransactionFinalizer as MintDbWriterFinalizer, DynMintDatabase,
-    KVStore as MintKVStore, KVStoreDatabase as MintKVStoreDatabase,
-    KVStoreTransaction as MintKVStoreTransaction, KeysDatabase as MintKeysDatabase,
-    KeysDatabaseTransaction as MintKeyDatabaseTransaction, ProofsDatabase as MintProofsDatabase,
-    ProofsTransaction as MintProofsTransaction, QuotesDatabase as MintQuotesDatabase,
-    QuotesTransaction as MintQuotesTransaction, SignaturesDatabase as MintSignaturesDatabase,
+    Database as MintDatabase, DynMintDatabase, KVStore as MintKVStore,
+    KVStoreDatabase as MintKVStoreDatabase, KVStoreTransaction as MintKVStoreTransaction,
+    KeysDatabase as MintKeysDatabase, KeysDatabaseTransaction as MintKeyDatabaseTransaction,
+    ProofsDatabase as MintProofsDatabase, ProofsTransaction as MintProofsTransaction,
+    QuotesDatabase as MintQuotesDatabase, QuotesTransaction as MintQuotesTransaction,
+    SignaturesDatabase as MintSignaturesDatabase,
     SignaturesTransaction as MintSignatureTransaction, Transaction as MintTransaction,
 };
 #[cfg(all(feature = "mint", feature = "auth"))]
 pub use mint::{DynMintAuthDatabase, MintAuthDatabase, MintAuthTransaction};
 #[cfg(feature = "wallet")]
-pub use wallet::Database as WalletDatabase;
+pub use wallet::{
+    Database as WalletDatabase, DatabaseTransaction as WalletDatabaseTransaction,
+    DynWalletDatabaseTransaction,
+};
 
 /// Data conversion error
 #[derive(thiserror::Error, Debug)]
@@ -203,3 +206,16 @@ impl From<crate::state::Error> for Error {
         }
     }
 }
+
+#[async_trait::async_trait]
+/// Commit and Rollback
+pub trait DbTransactionFinalizer {
+    /// Mint Signature Database Error
+    type Err: Into<Error> + From<Error>;
+
+    /// Commits all the changes into the database
+    async fn commit(self: Box<Self>) -> Result<(), Self::Err>;
+
+    /// Rollbacks the write transaction
+    async fn rollback(self: Box<Self>) -> Result<(), Self::Err>;
+}

+ 102 - 45
crates/cdk-common/src/database/wallet.rs

@@ -6,7 +6,7 @@ use std::fmt::Debug;
 use async_trait::async_trait;
 use cashu::KeySet;
 
-use super::Error;
+use super::{DbTransactionFinalizer, Error};
 use crate::common::ProofInfo;
 use crate::mint_url::MintUrl;
 use crate::nuts::{
@@ -16,78 +16,142 @@ use crate::wallet::{
     self, MintQuote as WalletMintQuote, Transaction, TransactionDirection, TransactionId,
 };
 
-/// Wallet Database trait
+/// Easy to use Dynamic Database type alias
+pub type DynWalletDatabaseTransaction<'a> =
+    Box<dyn DatabaseTransaction<'a, super::Error> + Sync + Send + 'a>;
+
+/// Database transaction
+///
+/// This trait encapsulates all the changes to be done in the wallet
 #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
-pub trait Database: Debug {
-    /// Wallet Database Error
-    type Err: Into<Error> + From<Error>;
-
+pub trait DatabaseTransaction<'a, Error>: DbTransactionFinalizer<Err = Error> {
     /// Add Mint to storage
     async fn add_mint(
-        &self,
+        &mut self,
         mint_url: MintUrl,
         mint_info: Option<MintInfo>,
-    ) -> Result<(), Self::Err>;
+    ) -> Result<(), Error>;
+
     /// Remove Mint from storage
-    async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), Self::Err>;
-    /// Get mint from storage
-    async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, Self::Err>;
-    /// Get all mints from storage
-    async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, Self::Err>;
+    async fn remove_mint(&mut self, mint_url: MintUrl) -> Result<(), Error>;
+
     /// Update mint url
     async fn update_mint_url(
-        &self,
+        &mut self,
         old_mint_url: MintUrl,
         new_mint_url: MintUrl,
-    ) -> Result<(), Self::Err>;
+    ) -> Result<(), Error>;
+
+    /// Get mint keyset by id
+    async fn get_keyset_by_id(&mut self, keyset_id: &Id) -> Result<Option<KeySetInfo>, Error>;
 
     /// Add mint keyset to storage
     async fn add_mint_keysets(
-        &self,
+        &mut self,
         mint_url: MintUrl,
         keysets: Vec<KeySetInfo>,
-    ) -> Result<(), Self::Err>;
+    ) -> Result<(), Error>;
+
+    /// Get mint quote from storage. This function locks the returned minted quote for update
+    async fn get_mint_quote(
+        &mut self,
+        quote_id: &str,
+    ) -> Result<Option<WalletMintQuote>, Self::Err>;
+
+    /// Add mint quote to storage
+    async fn add_mint_quote(&mut self, quote: WalletMintQuote) -> Result<(), Error>;
+
+    /// Remove mint quote from storage
+    async fn remove_mint_quote(&mut self, quote_id: &str) -> Result<(), Error>;
+
+    /// Get melt quote from storage
+    async fn get_melt_quote(&mut self, quote_id: &str) -> Result<Option<wallet::MeltQuote>, Error>;
+
+    /// Add melt quote to storage
+    async fn add_melt_quote(&mut self, quote: wallet::MeltQuote) -> Result<(), Error>;
+
+    /// Remove melt quote from storage
+    async fn remove_melt_quote(&mut self, quote_id: &str) -> Result<(), Error>;
+
+    /// Add [`Keys`] to storage
+    async fn add_keys(&mut self, keyset: KeySet) -> Result<(), Error>;
+
+    /// Remove [`Keys`] from storage
+    async fn remove_keys(&mut self, id: &Id) -> Result<(), Error>;
+
+    /// Get proofs from storage and lock them for update
+    async fn get_proofs(
+        &mut self,
+        mint_url: Option<MintUrl>,
+        unit: Option<CurrencyUnit>,
+        state: Option<Vec<State>>,
+        spending_conditions: Option<Vec<SpendingConditions>>,
+    ) -> Result<Vec<ProofInfo>, Error>;
+
+    /// Update the proofs in storage by adding new proofs or removing proofs by
+    /// their Y value.
+    async fn update_proofs(
+        &mut self,
+        added: Vec<ProofInfo>,
+        removed_ys: Vec<PublicKey>,
+    ) -> Result<(), Error>;
+
+    /// Update proofs state in storage
+    async fn update_proofs_state(&mut self, ys: Vec<PublicKey>, state: State) -> Result<(), Error>;
+
+    /// Atomically increment Keyset counter and return new value
+    async fn increment_keyset_counter(&mut self, keyset_id: &Id, count: u32) -> Result<u32, Error>;
+
+    /// Add transaction to storage
+    async fn add_transaction(&mut self, transaction: Transaction) -> Result<(), Error>;
+
+    /// Remove transaction from storage
+    async fn remove_transaction(&mut self, transaction_id: TransactionId) -> Result<(), Error>;
+}
+
+/// Wallet Database trait
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+pub trait Database: Debug {
+    /// Wallet Database Error
+    type Err: Into<Error> + From<Error>;
+
+    /// Begins a DB transaction
+    async fn begin_db_transaction<'a>(
+        &'a self,
+    ) -> Result<Box<dyn DatabaseTransaction<'a, Self::Err> + Send + Sync + 'a>, Self::Err>;
+
+    /// Get mint from storage
+    async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, Self::Err>;
+
+    /// Get all mints from storage
+    async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, Self::Err>;
+
     /// Get mint keysets for mint url
     async fn get_mint_keysets(
         &self,
         mint_url: MintUrl,
     ) -> Result<Option<Vec<KeySetInfo>>, Self::Err>;
+
     /// Get mint keyset by id
     async fn get_keyset_by_id(&self, keyset_id: &Id) -> Result<Option<KeySetInfo>, Self::Err>;
 
-    /// Add mint quote to storage
-    async fn add_mint_quote(&self, quote: WalletMintQuote) -> Result<(), Self::Err>;
     /// Get mint quote from storage
     async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<WalletMintQuote>, Self::Err>;
+
     /// Get mint quotes from storage
     async fn get_mint_quotes(&self) -> Result<Vec<WalletMintQuote>, Self::Err>;
-    /// Remove mint quote from storage
-    async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), Self::Err>;
 
-    /// Add melt quote to storage
-    async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), Self::Err>;
     /// Get melt quote from storage
     async fn get_melt_quote(&self, quote_id: &str) -> Result<Option<wallet::MeltQuote>, Self::Err>;
+
     /// Get melt quotes from storage
     async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, Self::Err>;
-    /// Remove melt quote from storage
-    async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), Self::Err>;
 
-    /// Add [`Keys`] to storage
-    async fn add_keys(&self, keyset: KeySet) -> Result<(), Self::Err>;
     /// Get [`Keys`] from storage
     async fn get_keys(&self, id: &Id) -> Result<Option<Keys>, Self::Err>;
-    /// Remove [`Keys`] from storage
-    async fn remove_keys(&self, id: &Id) -> Result<(), Self::Err>;
 
-    /// Update the proofs in storage by adding new proofs or removing proofs by
-    /// their Y value.
-    async fn update_proofs(
-        &self,
-        added: Vec<ProofInfo>,
-        removed_ys: Vec<PublicKey>,
-    ) -> Result<(), Self::Err>;
     /// Get proofs from storage
     async fn get_proofs(
         &self,
@@ -96,6 +160,7 @@ pub trait Database: Debug {
         state: Option<Vec<State>>,
         spending_conditions: Option<Vec<SpendingConditions>>,
     ) -> Result<Vec<ProofInfo>, Self::Err>;
+
     /// Get balance
     async fn get_balance(
         &self,
@@ -103,19 +168,13 @@ pub trait Database: Debug {
         unit: Option<CurrencyUnit>,
         state: Option<Vec<State>>,
     ) -> Result<u64, Self::Err>;
-    /// Update proofs state in storage
-    async fn update_proofs_state(&self, ys: Vec<PublicKey>, state: State) -> Result<(), Self::Err>;
 
-    /// Atomically increment Keyset counter and return new value
-    async fn increment_keyset_counter(&self, keyset_id: &Id, count: u32) -> Result<u32, Self::Err>;
-
-    /// Add transaction to storage
-    async fn add_transaction(&self, transaction: Transaction) -> Result<(), Self::Err>;
     /// Get transaction from storage
     async fn get_transaction(
         &self,
         transaction_id: TransactionId,
     ) -> Result<Option<Transaction>, Self::Err>;
+
     /// List transactions from storage
     async fn list_transactions(
         &self,
@@ -123,6 +182,4 @@ pub trait Database: Debug {
         direction: Option<TransactionDirection>,
         unit: Option<CurrencyUnit>,
     ) -> Result<Vec<Transaction>, Self::Err>;
-    /// Remove transaction from storage
-    async fn remove_transaction(&self, transaction_id: TransactionId) -> Result<(), Self::Err>;
 }

+ 1 - 0
crates/cdk-ffi/Cargo.toml

@@ -28,6 +28,7 @@ tokio = { workspace = true, features = ["sync", "rt", "rt-multi-thread"] }
 uniffi = { version = "0.29", features = ["cli", "tokio"] }
 url = { workspace = true }
 uuid = { workspace = true, features = ["v4"] }
+cdk-common.workspace = true
 
 
 [features]

+ 284 - 144
crates/cdk-ffi/src/database.rs

@@ -4,6 +4,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use cdk::cdk_database::WalletDatabase as CdkWalletDatabase;
+use cdk_common::database::{DbTransactionFinalizer, WalletDatabaseTransaction};
 
 use crate::error::FfiError;
 use crate::postgres::WalletPostgresDatabase;
@@ -173,27 +174,6 @@ impl CdkWalletDatabase for WalletDatabaseBridge {
     type Err = cdk::cdk_database::Error;
 
     // Mint Management
-    async fn add_mint(
-        &self,
-        mint_url: cdk::mint_url::MintUrl,
-        mint_info: Option<cdk::nuts::MintInfo>,
-    ) -> Result<(), Self::Err> {
-        let ffi_mint_url = mint_url.into();
-        let ffi_mint_info = mint_info.map(Into::into);
-        self.ffi_db
-            .add_mint(ffi_mint_url, ffi_mint_info)
-            .await
-            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
-    }
-
-    async fn remove_mint(&self, mint_url: cdk::mint_url::MintUrl) -> Result<(), Self::Err> {
-        let ffi_mint_url = mint_url.into();
-        self.ffi_db
-            .remove_mint(ffi_mint_url)
-            .await
-            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
-    }
-
     async fn get_mint(
         &self,
         mint_url: cdk::mint_url::MintUrl,
@@ -226,34 +206,7 @@ impl CdkWalletDatabase for WalletDatabaseBridge {
         Ok(cdk_result)
     }
 
-    async fn update_mint_url(
-        &self,
-        old_mint_url: cdk::mint_url::MintUrl,
-        new_mint_url: cdk::mint_url::MintUrl,
-    ) -> Result<(), Self::Err> {
-        let ffi_old_mint_url = old_mint_url.into();
-        let ffi_new_mint_url = new_mint_url.into();
-        self.ffi_db
-            .update_mint_url(ffi_old_mint_url, ffi_new_mint_url)
-            .await
-            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
-    }
-
     // Keyset Management
-    async fn add_mint_keysets(
-        &self,
-        mint_url: cdk::mint_url::MintUrl,
-        keysets: Vec<cdk::nuts::KeySetInfo>,
-    ) -> Result<(), Self::Err> {
-        let ffi_mint_url = mint_url.into();
-        let ffi_keysets: Vec<KeySetInfo> = keysets.into_iter().map(Into::into).collect();
-
-        self.ffi_db
-            .add_mint_keysets(ffi_mint_url, ffi_keysets)
-            .await
-            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
-    }
-
     async fn get_mint_keysets(
         &self,
         mint_url: cdk::mint_url::MintUrl,
@@ -281,14 +234,6 @@ impl CdkWalletDatabase for WalletDatabaseBridge {
     }
 
     // Mint Quote Management
-    async fn add_mint_quote(&self, quote: cdk::wallet::MintQuote) -> Result<(), Self::Err> {
-        let ffi_quote = quote.into();
-        self.ffi_db
-            .add_mint_quote(ffi_quote)
-            .await
-            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
-    }
-
     async fn get_mint_quote(
         &self,
         quote_id: &str,
@@ -321,22 +266,7 @@ impl CdkWalletDatabase for WalletDatabaseBridge {
             .collect::<Result<Vec<_>, _>>()?)
     }
 
-    async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), Self::Err> {
-        self.ffi_db
-            .remove_mint_quote(quote_id.to_string())
-            .await
-            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
-    }
-
     // Melt Quote Management
-    async fn add_melt_quote(&self, quote: cdk::wallet::MeltQuote) -> Result<(), Self::Err> {
-        let ffi_quote = quote.into();
-        self.ffi_db
-            .add_melt_quote(ffi_quote)
-            .await
-            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
-    }
-
     async fn get_melt_quote(
         &self,
         quote_id: &str,
@@ -369,22 +299,7 @@ impl CdkWalletDatabase for WalletDatabaseBridge {
             .collect::<Result<Vec<_>, _>>()?)
     }
 
-    async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), Self::Err> {
-        self.ffi_db
-            .remove_melt_quote(quote_id.to_string())
-            .await
-            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
-    }
-
     // Keys Management
-    async fn add_keys(&self, keyset: cdk::nuts::KeySet) -> Result<(), Self::Err> {
-        let ffi_keyset: KeySet = keyset.into();
-        self.ffi_db
-            .add_keys(ffi_keyset)
-            .await
-            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
-    }
-
     async fn get_keys(&self, id: &cdk::nuts::Id) -> Result<Option<cdk::nuts::Keys>, Self::Err> {
         let ffi_id: Id = (*id).into();
         let result = self
@@ -403,29 +318,7 @@ impl CdkWalletDatabase for WalletDatabaseBridge {
             .transpose()
     }
 
-    async fn remove_keys(&self, id: &cdk::nuts::Id) -> Result<(), Self::Err> {
-        let ffi_id = (*id).into();
-        self.ffi_db
-            .remove_keys(ffi_id)
-            .await
-            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
-    }
-
     // Proof Management
-    async fn update_proofs(
-        &self,
-        added: Vec<cdk::types::ProofInfo>,
-        removed_ys: Vec<cdk::nuts::PublicKey>,
-    ) -> Result<(), Self::Err> {
-        let ffi_added: Vec<ProofInfo> = added.into_iter().map(Into::into).collect();
-        let ffi_removed_ys: Vec<PublicKey> = removed_ys.into_iter().map(Into::into).collect();
-
-        self.ffi_db
-            .update_proofs(ffi_added, ffi_removed_ys)
-            .await
-            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
-    }
-
     async fn get_proofs(
         &self,
         mint_url: Option<cdk::mint_url::MintUrl>,
@@ -491,26 +384,202 @@ impl CdkWalletDatabase for WalletDatabaseBridge {
             .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
     }
 
-    async fn update_proofs_state(
+    // Transaction Management
+    async fn get_transaction(
+        &self,
+        transaction_id: cdk::wallet::types::TransactionId,
+    ) -> Result<Option<cdk::wallet::types::Transaction>, Self::Err> {
+        let ffi_id = transaction_id.into();
+        let result = self
+            .ffi_db
+            .get_transaction(ffi_id)
+            .await
+            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))?;
+
+        result
+            .map(|tx| tx.try_into())
+            .transpose()
+            .map_err(|e: FfiError| cdk::cdk_database::Error::Database(e.to_string().into()))
+    }
+
+    async fn list_transactions(
         &self,
+        mint_url: Option<cdk::mint_url::MintUrl>,
+        direction: Option<cdk::wallet::types::TransactionDirection>,
+        unit: Option<cdk::nuts::CurrencyUnit>,
+    ) -> Result<Vec<cdk::wallet::types::Transaction>, Self::Err> {
+        let ffi_mint_url = mint_url.map(Into::into);
+        let ffi_direction = direction.map(Into::into);
+        let ffi_unit = unit.map(Into::into);
+
+        let result = self
+            .ffi_db
+            .list_transactions(ffi_mint_url, ffi_direction, ffi_unit)
+            .await
+            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))?;
+
+        result
+            .into_iter()
+            .map(|tx| tx.try_into())
+            .collect::<Result<Vec<_>, FfiError>>()
+            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
+    }
+
+    async fn begin_db_transaction<'a>(
+        &'a self,
+    ) -> Result<Box<dyn WalletDatabaseTransaction<'a, Self::Err> + Send + Sync + 'a>, Self::Err>
+    {
+        Ok(Box::new(WalletDatabaseTransactionBridge {
+            ffi_db: Arc::clone(&self.ffi_db),
+        }))
+    }
+}
+
+/// Transaction bridge for FFI wallet database
+struct WalletDatabaseTransactionBridge {
+    ffi_db: Arc<dyn WalletDatabase>,
+}
+
+#[async_trait::async_trait]
+impl<'a> WalletDatabaseTransaction<'a, cdk::cdk_database::Error>
+    for WalletDatabaseTransactionBridge
+{
+    async fn add_mint(
+        &mut self,
+        mint_url: cdk::mint_url::MintUrl,
+        mint_info: Option<cdk::nuts::MintInfo>,
+    ) -> Result<(), cdk::cdk_database::Error> {
+        let ffi_mint_url = mint_url.into();
+        let ffi_mint_info = mint_info.map(Into::into);
+        self.ffi_db
+            .add_mint(ffi_mint_url, ffi_mint_info)
+            .await
+            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
+    }
+
+    async fn remove_mint(
+        &mut self,
+        mint_url: cdk::mint_url::MintUrl,
+    ) -> Result<(), cdk::cdk_database::Error> {
+        let ffi_mint_url = mint_url.into();
+        self.ffi_db
+            .remove_mint(ffi_mint_url)
+            .await
+            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
+    }
+
+    async fn update_mint_url(
+        &mut self,
+        old_mint_url: cdk::mint_url::MintUrl,
+        new_mint_url: cdk::mint_url::MintUrl,
+    ) -> Result<(), cdk::cdk_database::Error> {
+        let ffi_old_mint_url = old_mint_url.into();
+        let ffi_new_mint_url = new_mint_url.into();
+        self.ffi_db
+            .update_mint_url(ffi_old_mint_url, ffi_new_mint_url)
+            .await
+            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
+    }
+
+    async fn add_mint_keysets(
+        &mut self,
+        mint_url: cdk::mint_url::MintUrl,
+        keysets: Vec<cdk::nuts::KeySetInfo>,
+    ) -> Result<(), cdk::cdk_database::Error> {
+        let ffi_mint_url = mint_url.into();
+        let ffi_keysets: Vec<KeySetInfo> = keysets.into_iter().map(Into::into).collect();
+        self.ffi_db
+            .add_mint_keysets(ffi_mint_url, ffi_keysets)
+            .await
+            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
+    }
+
+    async fn add_mint_quote(
+        &mut self,
+        quote: cdk::wallet::MintQuote,
+    ) -> Result<(), cdk::cdk_database::Error> {
+        let ffi_quote = quote.into();
+        self.ffi_db
+            .add_mint_quote(ffi_quote)
+            .await
+            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
+    }
+
+    async fn remove_mint_quote(&mut self, quote_id: &str) -> Result<(), cdk::cdk_database::Error> {
+        self.ffi_db
+            .remove_mint_quote(quote_id.to_string())
+            .await
+            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
+    }
+
+    async fn add_melt_quote(
+        &mut self,
+        quote: cdk::wallet::MeltQuote,
+    ) -> Result<(), cdk::cdk_database::Error> {
+        let ffi_quote = quote.into();
+        self.ffi_db
+            .add_melt_quote(ffi_quote)
+            .await
+            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
+    }
+
+    async fn remove_melt_quote(&mut self, quote_id: &str) -> Result<(), cdk::cdk_database::Error> {
+        self.ffi_db
+            .remove_melt_quote(quote_id.to_string())
+            .await
+            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
+    }
+
+    async fn add_keys(
+        &mut self,
+        keyset: cdk::nuts::KeySet,
+    ) -> Result<(), cdk::cdk_database::Error> {
+        let ffi_keyset: KeySet = keyset.into();
+        self.ffi_db
+            .add_keys(ffi_keyset)
+            .await
+            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
+    }
+
+    async fn remove_keys(&mut self, id: &cdk::nuts::Id) -> Result<(), cdk::cdk_database::Error> {
+        let ffi_id = (*id).into();
+        self.ffi_db
+            .remove_keys(ffi_id)
+            .await
+            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
+    }
+
+    async fn update_proofs(
+        &mut self,
+        added: Vec<cdk::types::ProofInfo>,
+        removed_ys: Vec<cdk::nuts::PublicKey>,
+    ) -> Result<(), cdk::cdk_database::Error> {
+        let ffi_added: Vec<ProofInfo> = added.into_iter().map(Into::into).collect();
+        let ffi_removed_ys: Vec<PublicKey> = removed_ys.into_iter().map(Into::into).collect();
+        self.ffi_db
+            .update_proofs(ffi_added, ffi_removed_ys)
+            .await
+            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
+    }
+
+    async fn update_proofs_state(
+        &mut self,
         ys: Vec<cdk::nuts::PublicKey>,
         state: cdk::nuts::State,
-    ) -> Result<(), Self::Err> {
+    ) -> Result<(), cdk::cdk_database::Error> {
         let ffi_ys: Vec<PublicKey> = ys.into_iter().map(Into::into).collect();
         let ffi_state = state.into();
-
         self.ffi_db
             .update_proofs_state(ffi_ys, ffi_state)
             .await
             .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
     }
 
-    // Keyset Counter Management
     async fn increment_keyset_counter(
-        &self,
+        &mut self,
         keyset_id: &cdk::nuts::Id,
         count: u32,
-    ) -> Result<u32, Self::Err> {
+    ) -> Result<u32, cdk::cdk_database::Error> {
         let ffi_id = (*keyset_id).into();
         self.ffi_db
             .increment_keyset_counter(ffi_id, count)
@@ -518,11 +587,10 @@ impl CdkWalletDatabase for WalletDatabaseBridge {
             .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
     }
 
-    // Transaction Management
     async fn add_transaction(
-        &self,
+        &mut self,
         transaction: cdk::wallet::types::Transaction,
-    ) -> Result<(), Self::Err> {
+    ) -> Result<(), cdk::cdk_database::Error> {
         let ffi_transaction = transaction.into();
         self.ffi_db
             .add_transaction(ffi_transaction)
@@ -530,55 +598,127 @@ impl CdkWalletDatabase for WalletDatabaseBridge {
             .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
     }
 
-    async fn get_transaction(
-        &self,
+    async fn remove_transaction(
+        &mut self,
         transaction_id: cdk::wallet::types::TransactionId,
-    ) -> Result<Option<cdk::wallet::types::Transaction>, Self::Err> {
+    ) -> Result<(), cdk::cdk_database::Error> {
         let ffi_id = transaction_id.into();
+        self.ffi_db
+            .remove_transaction(ffi_id)
+            .await
+            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
+    }
+
+    // Read methods needed during transactions
+    async fn get_keyset_by_id(
+        &mut self,
+        keyset_id: &cdk::nuts::Id,
+    ) -> Result<Option<cdk::nuts::KeySetInfo>, cdk::cdk_database::Error> {
+        let ffi_id = (*keyset_id).into();
         let result = self
             .ffi_db
-            .get_transaction(ffi_id)
+            .get_keyset_by_id(ffi_id)
             .await
             .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))?;
+        Ok(result.map(Into::into))
+    }
 
-        result
-            .map(|tx| tx.try_into())
-            .transpose()
-            .map_err(|e: FfiError| cdk::cdk_database::Error::Database(e.to_string().into()))
+    async fn get_mint_quote(
+        &mut self,
+        quote_id: &str,
+    ) -> Result<Option<cdk::wallet::MintQuote>, cdk::cdk_database::Error> {
+        let result = self
+            .ffi_db
+            .get_mint_quote(quote_id.to_string())
+            .await
+            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))?;
+        Ok(result
+            .map(|q| {
+                q.try_into()
+                    .map_err(|e: FfiError| cdk::cdk_database::Error::Database(e.to_string().into()))
+            })
+            .transpose()?)
     }
 
-    async fn list_transactions(
-        &self,
+    async fn get_melt_quote(
+        &mut self,
+        quote_id: &str,
+    ) -> Result<Option<cdk::wallet::MeltQuote>, cdk::cdk_database::Error> {
+        let result = self
+            .ffi_db
+            .get_melt_quote(quote_id.to_string())
+            .await
+            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))?;
+        Ok(result
+            .map(|q| {
+                q.try_into()
+                    .map_err(|e: FfiError| cdk::cdk_database::Error::Database(e.to_string().into()))
+            })
+            .transpose()?)
+    }
+
+    async fn get_proofs(
+        &mut self,
         mint_url: Option<cdk::mint_url::MintUrl>,
-        direction: Option<cdk::wallet::types::TransactionDirection>,
         unit: Option<cdk::nuts::CurrencyUnit>,
-    ) -> Result<Vec<cdk::wallet::types::Transaction>, Self::Err> {
+        state: Option<Vec<cdk::nuts::State>>,
+        spending_conditions: Option<Vec<cdk::nuts::SpendingConditions>>,
+    ) -> Result<Vec<cdk::types::ProofInfo>, cdk::cdk_database::Error> {
         let ffi_mint_url = mint_url.map(Into::into);
-        let ffi_direction = direction.map(Into::into);
         let ffi_unit = unit.map(Into::into);
+        let ffi_state = state.map(|s| s.into_iter().map(Into::into).collect());
+        let ffi_spending_conditions =
+            spending_conditions.map(|sc| sc.into_iter().map(Into::into).collect());
 
         let result = self
             .ffi_db
-            .list_transactions(ffi_mint_url, ffi_direction, ffi_unit)
+            .get_proofs(ffi_mint_url, ffi_unit, ffi_state, ffi_spending_conditions)
             .await
             .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))?;
 
-        result
+        // Convert back to CDK ProofInfo
+        let cdk_result: Result<Vec<cdk::types::ProofInfo>, cdk::cdk_database::Error> = result
             .into_iter()
-            .map(|tx| tx.try_into())
-            .collect::<Result<Vec<_>, FfiError>>()
-            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
+            .map(|info| {
+                Ok(cdk::types::ProofInfo {
+                    proof: info.proof.try_into().map_err(|e: FfiError| {
+                        cdk::cdk_database::Error::Database(e.to_string().into())
+                    })?,
+                    y: info.y.try_into().map_err(|e: FfiError| {
+                        cdk::cdk_database::Error::Database(e.to_string().into())
+                    })?,
+                    mint_url: info.mint_url.try_into().map_err(|e: FfiError| {
+                        cdk::cdk_database::Error::Database(e.to_string().into())
+                    })?,
+                    state: info.state.into(),
+                    spending_condition: info
+                        .spending_condition
+                        .map(|sc| sc.try_into())
+                        .transpose()
+                        .map_err(|e: FfiError| {
+                            cdk::cdk_database::Error::Database(e.to_string().into())
+                        })?,
+                    unit: info.unit.into(),
+                })
+            })
+            .collect();
+
+        cdk_result
     }
+}
 
-    async fn remove_transaction(
-        &self,
-        transaction_id: cdk::wallet::types::TransactionId,
-    ) -> Result<(), Self::Err> {
-        let ffi_id = transaction_id.into();
-        self.ffi_db
-            .remove_transaction(ffi_id)
-            .await
-            .map_err(|e| cdk::cdk_database::Error::Database(e.to_string().into()))
+#[async_trait::async_trait]
+impl DbTransactionFinalizer for WalletDatabaseTransactionBridge {
+    type Err = cdk::cdk_database::Error;
+
+    async fn commit(self: Box<Self>) -> Result<(), cdk::cdk_database::Error> {
+        // FFI databases handle transactions internally, no-op here
+        Ok(())
+    }
+
+    async fn rollback(self: Box<Self>) -> Result<(), cdk::cdk_database::Error> {
+        // FFI databases handle transactions internally, no-op here
+        Ok(())
     }
 }
 

+ 138 - 35
crates/cdk-ffi/src/postgres.rs

@@ -47,19 +47,29 @@ impl WalletDatabase for WalletPostgresDatabase {
     ) -> Result<(), FfiError> {
         let cdk_mint_url = mint_url.try_into()?;
         let cdk_mint_info = mint_info.map(Into::into);
-        println!("adding new mint");
-        self.inner
-            .add_mint(cdk_mint_url, cdk_mint_info)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
             .await
-            .map_err(|e| {
-                println!("ffi error {:?}", e);
-                FfiError::Database { msg: e.to_string() }
-            })
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.add_mint(cdk_mint_url, cdk_mint_info)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
     async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), FfiError> {
         let cdk_mint_url = mint_url.try_into()?;
-        self.inner
-            .remove_mint(cdk_mint_url)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.remove_mint(cdk_mint_url)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -90,8 +100,15 @@ impl WalletDatabase for WalletPostgresDatabase {
     ) -> Result<(), FfiError> {
         let cdk_old_mint_url = old_mint_url.try_into()?;
         let cdk_new_mint_url = new_mint_url.try_into()?;
-        self.inner
-            .update_mint_url(cdk_old_mint_url, cdk_new_mint_url)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.update_mint_url(cdk_old_mint_url, cdk_new_mint_url)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -102,8 +119,15 @@ impl WalletDatabase for WalletPostgresDatabase {
     ) -> Result<(), FfiError> {
         let cdk_mint_url = mint_url.try_into()?;
         let cdk_keysets: Vec<cdk::nuts::KeySetInfo> = keysets.into_iter().map(Into::into).collect();
-        self.inner
-            .add_mint_keysets(cdk_mint_url, cdk_keysets)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.add_mint_keysets(cdk_mint_url, cdk_keysets)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -133,8 +157,15 @@ impl WalletDatabase for WalletPostgresDatabase {
     // Mint Quote Management
     async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), FfiError> {
         let cdk_quote = quote.try_into()?;
-        self.inner
-            .add_mint_quote(cdk_quote)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.add_mint_quote(cdk_quote)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -158,8 +189,15 @@ impl WalletDatabase for WalletPostgresDatabase {
     }
 
     async fn remove_mint_quote(&self, quote_id: String) -> Result<(), FfiError> {
-        self.inner
-            .remove_mint_quote(&quote_id)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.remove_mint_quote(&quote_id)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -167,8 +205,15 @@ impl WalletDatabase for WalletPostgresDatabase {
     // Melt Quote Management
     async fn add_melt_quote(&self, quote: MeltQuote) -> Result<(), FfiError> {
         let cdk_quote = quote.try_into()?;
-        self.inner
-            .add_melt_quote(cdk_quote)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.add_melt_quote(cdk_quote)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -192,8 +237,15 @@ impl WalletDatabase for WalletPostgresDatabase {
     }
 
     async fn remove_melt_quote(&self, quote_id: String) -> Result<(), FfiError> {
-        self.inner
-            .remove_melt_quote(&quote_id)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.remove_melt_quote(&quote_id)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -202,8 +254,15 @@ impl WalletDatabase for WalletPostgresDatabase {
     async fn add_keys(&self, keyset: KeySet) -> Result<(), FfiError> {
         // Convert FFI KeySet to cdk::nuts::KeySet
         let cdk_keyset: cdk::nuts::KeySet = keyset.try_into()?;
-        self.inner
-            .add_keys(cdk_keyset)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.add_keys(cdk_keyset)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -220,8 +279,15 @@ impl WalletDatabase for WalletPostgresDatabase {
 
     async fn remove_keys(&self, id: Id) -> Result<(), FfiError> {
         let cdk_id = id.into();
-        self.inner
-            .remove_keys(&cdk_id)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.remove_keys(&cdk_id)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -255,8 +321,15 @@ impl WalletDatabase for WalletPostgresDatabase {
             removed_ys.into_iter().map(|pk| pk.try_into()).collect();
         let cdk_removed_ys = cdk_removed_ys?;
 
-        self.inner
-            .update_proofs(cdk_added, cdk_removed_ys)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.update_proofs(cdk_added, cdk_removed_ys)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -315,8 +388,15 @@ impl WalletDatabase for WalletPostgresDatabase {
         let cdk_ys = cdk_ys?;
         let cdk_state = state.into();
 
-        self.inner
-            .update_proofs_state(cdk_ys, cdk_state)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.update_proofs_state(cdk_ys, cdk_state)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -324,10 +404,19 @@ impl WalletDatabase for WalletPostgresDatabase {
     // Keyset Counter Management
     async fn increment_keyset_counter(&self, keyset_id: Id, count: u32) -> Result<u32, FfiError> {
         let cdk_id = keyset_id.into();
-        self.inner
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        let result = tx
             .increment_keyset_counter(&cdk_id, count)
             .await
-            .map_err(|e| FfiError::Database { msg: e.to_string() })
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        Ok(result)
     }
 
     // Transaction Management
@@ -335,8 +424,15 @@ impl WalletDatabase for WalletPostgresDatabase {
         // Convert FFI Transaction to CDK Transaction using TryFrom
         let cdk_transaction: cdk::wallet::types::Transaction = transaction.try_into()?;
 
-        self.inner
-            .add_transaction(cdk_transaction)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.add_transaction(cdk_transaction)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -375,8 +471,15 @@ impl WalletDatabase for WalletPostgresDatabase {
 
     async fn remove_transaction(&self, transaction_id: TransactionId) -> Result<(), FfiError> {
         let cdk_id = transaction_id.try_into()?;
-        self.inner
-            .remove_transaction(cdk_id)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.remove_transaction(cdk_id)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }

+ 137 - 30
crates/cdk-ffi/src/sqlite.rs

@@ -79,16 +79,30 @@ impl WalletDatabase for WalletSqliteDatabase {
     ) -> Result<(), FfiError> {
         let cdk_mint_url = mint_url.try_into()?;
         let cdk_mint_info = mint_info.map(Into::into);
-        self.inner
-            .add_mint(cdk_mint_url, cdk_mint_info)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.add_mint(cdk_mint_url, cdk_mint_info)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
 
     async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), FfiError> {
         let cdk_mint_url = mint_url.try_into()?;
-        self.inner
-            .remove_mint(cdk_mint_url)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.remove_mint(cdk_mint_url)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -122,8 +136,15 @@ impl WalletDatabase for WalletSqliteDatabase {
     ) -> Result<(), FfiError> {
         let cdk_old_mint_url = old_mint_url.try_into()?;
         let cdk_new_mint_url = new_mint_url.try_into()?;
-        self.inner
-            .update_mint_url(cdk_old_mint_url, cdk_new_mint_url)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.update_mint_url(cdk_old_mint_url, cdk_new_mint_url)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -136,8 +157,15 @@ impl WalletDatabase for WalletSqliteDatabase {
     ) -> Result<(), FfiError> {
         let cdk_mint_url = mint_url.try_into()?;
         let cdk_keysets: Vec<cdk::nuts::KeySetInfo> = keysets.into_iter().map(Into::into).collect();
-        self.inner
-            .add_mint_keysets(cdk_mint_url, cdk_keysets)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.add_mint_keysets(cdk_mint_url, cdk_keysets)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -168,8 +196,15 @@ impl WalletDatabase for WalletSqliteDatabase {
     // Mint Quote Management
     async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), FfiError> {
         let cdk_quote = quote.try_into()?;
-        self.inner
-            .add_mint_quote(cdk_quote)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.add_mint_quote(cdk_quote)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -193,8 +228,15 @@ impl WalletDatabase for WalletSqliteDatabase {
     }
 
     async fn remove_mint_quote(&self, quote_id: String) -> Result<(), FfiError> {
-        self.inner
-            .remove_mint_quote(&quote_id)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.remove_mint_quote(&quote_id)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -202,8 +244,15 @@ impl WalletDatabase for WalletSqliteDatabase {
     // Melt Quote Management
     async fn add_melt_quote(&self, quote: MeltQuote) -> Result<(), FfiError> {
         let cdk_quote = quote.try_into()?;
-        self.inner
-            .add_melt_quote(cdk_quote)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.add_melt_quote(cdk_quote)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -227,8 +276,15 @@ impl WalletDatabase for WalletSqliteDatabase {
     }
 
     async fn remove_melt_quote(&self, quote_id: String) -> Result<(), FfiError> {
-        self.inner
-            .remove_melt_quote(&quote_id)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.remove_melt_quote(&quote_id)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -237,8 +293,15 @@ impl WalletDatabase for WalletSqliteDatabase {
     async fn add_keys(&self, keyset: KeySet) -> Result<(), FfiError> {
         // Convert FFI KeySet to cdk::nuts::KeySet
         let cdk_keyset: cdk::nuts::KeySet = keyset.try_into()?;
-        self.inner
-            .add_keys(cdk_keyset)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.add_keys(cdk_keyset)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -255,8 +318,15 @@ impl WalletDatabase for WalletSqliteDatabase {
 
     async fn remove_keys(&self, id: Id) -> Result<(), FfiError> {
         let cdk_id = id.into();
-        self.inner
-            .remove_keys(&cdk_id)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.remove_keys(&cdk_id)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -290,8 +360,15 @@ impl WalletDatabase for WalletSqliteDatabase {
             removed_ys.into_iter().map(|pk| pk.try_into()).collect();
         let cdk_removed_ys = cdk_removed_ys?;
 
-        self.inner
-            .update_proofs(cdk_added, cdk_removed_ys)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.update_proofs(cdk_added, cdk_removed_ys)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -350,8 +427,15 @@ impl WalletDatabase for WalletSqliteDatabase {
         let cdk_ys = cdk_ys?;
         let cdk_state = state.into();
 
-        self.inner
-            .update_proofs_state(cdk_ys, cdk_state)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.update_proofs_state(cdk_ys, cdk_state)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -359,10 +443,19 @@ impl WalletDatabase for WalletSqliteDatabase {
     // Keyset Counter Management
     async fn increment_keyset_counter(&self, keyset_id: Id, count: u32) -> Result<u32, FfiError> {
         let cdk_id = keyset_id.into();
-        self.inner
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        let result = tx
             .increment_keyset_counter(&cdk_id, count)
             .await
-            .map_err(|e| FfiError::Database { msg: e.to_string() })
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        Ok(result)
     }
 
     // Transaction Management
@@ -370,8 +463,15 @@ impl WalletDatabase for WalletSqliteDatabase {
         // Convert FFI Transaction to CDK Transaction using TryFrom
         let cdk_transaction: cdk::wallet::types::Transaction = transaction.try_into()?;
 
-        self.inner
-            .add_transaction(cdk_transaction)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.add_transaction(cdk_transaction)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }
@@ -410,8 +510,15 @@ impl WalletDatabase for WalletSqliteDatabase {
 
     async fn remove_transaction(&self, transaction_id: TransactionId) -> Result<(), FfiError> {
         let cdk_id = transaction_id.try_into()?;
-        self.inner
-            .remove_transaction(cdk_id)
+        let mut tx = self
+            .inner
+            .begin_db_transaction()
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.remove_transaction(cdk_id)
+            .await
+            .map_err(|e| FfiError::Database { msg: e.to_string() })?;
+        tx.commit()
             .await
             .map_err(|e| FfiError::Database { msg: e.to_string() })
     }

+ 3 - 3
crates/cdk-integration-tests/tests/fake_auth.rs

@@ -515,11 +515,11 @@ async fn test_reuse_auth_proof() {
         assert!(quote.amount == Some(10.into()));
     }
 
-    wallet
-        .localstore
-        .update_proofs(proofs, vec![])
+    let mut tx = wallet.localstore.begin_db_transaction().await.unwrap();
+    tx.update_proofs(vec![], proofs.iter().map(|p| p.y).collect())
         .await
         .unwrap();
+    tx.commit().await.unwrap();
 
     {
         let quote_res = wallet.mint_quote(10.into(), None).await;

+ 531 - 463
crates/cdk-redb/src/wallet/mod.rs

@@ -54,6 +54,29 @@ pub struct WalletRedbDatabase {
     db: Arc<Database>,
 }
 
+/// Redb Wallet Transaction
+pub struct RedbWalletTransaction {
+    write_txn: Option<redb::WriteTransaction>,
+}
+
+impl RedbWalletTransaction {
+    /// Create a new transaction
+    fn new(write_txn: redb::WriteTransaction) -> Self {
+        Self {
+            write_txn: Some(write_txn),
+        }
+    }
+
+    /// Get a mutable reference to the write transaction
+    fn txn(&mut self) -> Result<&mut redb::WriteTransaction, Error> {
+        self.write_txn.as_mut().ok_or_else(|| {
+            Error::CDKDatabase(database::Error::Internal(
+                "Transaction already consumed".to_owned(),
+            ))
+        })
+    }
+}
+
 impl WalletRedbDatabase {
     /// Create new [`WalletRedbDatabase`]
     pub fn new(path: &Path) -> Result<Self, Error> {
@@ -189,45 +212,6 @@ impl WalletDatabase for WalletRedbDatabase {
     type Err = database::Error;
 
     #[instrument(skip(self))]
-    async fn add_mint(
-        &self,
-        mint_url: MintUrl,
-        mint_info: Option<MintInfo>,
-    ) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        {
-            let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
-            table
-                .insert(
-                    mint_url.to_string().as_str(),
-                    serde_json::to_string(&mint_info)
-                        .map_err(Error::from)?
-                        .as_str(),
-                )
-                .map_err(Error::from)?;
-        }
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
-    #[instrument(skip(self))]
-    async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        {
-            let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
-            table
-                .remove(mint_url.to_string().as_str())
-                .map_err(Error::from)?;
-        }
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
-    #[instrument(skip(self))]
     async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, Self::Err> {
         let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
         let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
@@ -262,149 +246,6 @@ impl WalletDatabase for WalletRedbDatabase {
     }
 
     #[instrument(skip(self))]
-    async fn update_mint_url(
-        &self,
-        old_mint_url: MintUrl,
-        new_mint_url: MintUrl,
-    ) -> Result<(), Self::Err> {
-        // Update proofs table
-        {
-            let proofs = self
-                .get_proofs(Some(old_mint_url.clone()), None, None, None)
-                .await
-                .map_err(Error::from)?;
-
-            // Proofs with new url
-            let updated_proofs: Vec<ProofInfo> = proofs
-                .clone()
-                .into_iter()
-                .map(|mut p| {
-                    p.mint_url = new_mint_url.clone();
-                    p
-                })
-                .collect();
-
-            if !updated_proofs.is_empty() {
-                self.update_proofs(updated_proofs, vec![]).await?;
-            }
-        }
-
-        // Update mint quotes
-        {
-            let quotes = self.get_mint_quotes().await?;
-
-            let unix_time = unix_time();
-
-            let quotes: Vec<MintQuote> = quotes
-                .into_iter()
-                .filter_map(|mut q| {
-                    if q.expiry < unix_time {
-                        q.mint_url = new_mint_url.clone();
-                        Some(q)
-                    } else {
-                        None
-                    }
-                })
-                .collect();
-
-            for quote in quotes {
-                self.add_mint_quote(quote).await?;
-            }
-        }
-
-        Ok(())
-    }
-
-    #[instrument(skip(self))]
-    async fn add_mint_keysets(
-        &self,
-        mint_url: MintUrl,
-        keysets: Vec<KeySetInfo>,
-    ) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        let mut existing_u32 = false;
-
-        {
-            let mut table = write_txn
-                .open_multimap_table(MINT_KEYSETS_TABLE)
-                .map_err(Error::from)?;
-            let mut keysets_table = write_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
-            let mut u32_table = write_txn
-                .open_table(KEYSET_U32_MAPPING)
-                .map_err(Error::from)?;
-
-            for keyset in keysets {
-                // Check if keyset already exists
-                let existing_keyset = {
-                    let existing_keyset = keysets_table
-                        .get(keyset.id.to_bytes().as_slice())
-                        .map_err(Error::from)?;
-
-                    existing_keyset.map(|r| r.value().to_string())
-                };
-
-                let existing = u32_table
-                    .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
-                    .map_err(Error::from)?;
-
-                match existing {
-                    None => existing_u32 = false,
-                    Some(id) => {
-                        let id = Id::from_str(id.value())?;
-
-                        if id == keyset.id {
-                            existing_u32 = false;
-                        } else {
-                            println!("Breaking here");
-                            existing_u32 = true;
-                            break;
-                        }
-                    }
-                }
-
-                let keyset = if let Some(existing_keyset) = existing_keyset {
-                    let mut existing_keyset: KeySetInfo = serde_json::from_str(&existing_keyset)?;
-
-                    existing_keyset.active = keyset.active;
-                    existing_keyset.input_fee_ppk = keyset.input_fee_ppk;
-
-                    existing_keyset
-                } else {
-                    table
-                        .insert(
-                            mint_url.to_string().as_str(),
-                            keyset.id.to_bytes().as_slice(),
-                        )
-                        .map_err(Error::from)?;
-
-                    keyset
-                };
-
-                keysets_table
-                    .insert(
-                        keyset.id.to_bytes().as_slice(),
-                        serde_json::to_string(&keyset)
-                            .map_err(Error::from)?
-                            .as_str(),
-                    )
-                    .map_err(Error::from)?;
-            }
-        }
-
-        if existing_u32 {
-            tracing::warn!("Keyset already exists for keyset id");
-            write_txn.abort().map_err(Error::from)?;
-
-            return Err(database::Error::Duplicate);
-        }
-
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
-    #[instrument(skip(self))]
     async fn get_mint_keysets(
         &self,
         mint_url: MintUrl,
@@ -462,27 +303,6 @@ impl WalletDatabase for WalletRedbDatabase {
     }
 
     #[instrument(skip_all)]
-    async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        {
-            let mut table = write_txn
-                .open_table(MINT_QUOTES_TABLE)
-                .map_err(Error::from)?;
-            table
-                .insert(
-                    quote.id.as_str(),
-                    serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
-                )
-                .map_err(Error::from)?;
-        }
-
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
-    #[instrument(skip_all)]
     async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, Self::Err> {
         let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
         let table = read_txn
@@ -512,43 +332,6 @@ impl WalletDatabase for WalletRedbDatabase {
     }
 
     #[instrument(skip_all)]
-    async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        {
-            let mut table = write_txn
-                .open_table(MINT_QUOTES_TABLE)
-                .map_err(Error::from)?;
-            table.remove(quote_id).map_err(Error::from)?;
-        }
-
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
-    #[instrument(skip_all)]
-    async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        {
-            let mut table = write_txn
-                .open_table(MELT_QUOTES_TABLE)
-                .map_err(Error::from)?;
-            table
-                .insert(
-                    quote.id.as_str(),
-                    serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
-                )
-                .map_err(Error::from)?;
-        }
-
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
-    #[instrument(skip_all)]
     async fn get_melt_quote(&self, quote_id: &str) -> Result<Option<wallet::MeltQuote>, Self::Err> {
         let read_txn = self.db.begin_read().map_err(Error::from)?;
         let table = read_txn
@@ -577,74 +360,6 @@ impl WalletDatabase for WalletRedbDatabase {
             .collect())
     }
 
-    #[instrument(skip_all)]
-    async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        {
-            let mut table = write_txn
-                .open_table(MELT_QUOTES_TABLE)
-                .map_err(Error::from)?;
-            table.remove(quote_id).map_err(Error::from)?;
-        }
-
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
-    #[instrument(skip_all)]
-    async fn add_keys(&self, keyset: KeySet) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        keyset.verify_id()?;
-
-        let existing_keys;
-        let existing_u32;
-
-        {
-            let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
-
-            existing_keys = table
-                .insert(
-                    keyset.id.to_string().as_str(),
-                    serde_json::to_string(&keyset.keys)
-                        .map_err(Error::from)?
-                        .as_str(),
-                )
-                .map_err(Error::from)?
-                .is_some();
-
-            let mut table = write_txn
-                .open_table(KEYSET_U32_MAPPING)
-                .map_err(Error::from)?;
-
-            let existing = table
-                .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
-                .map_err(Error::from)?;
-
-            match existing {
-                None => existing_u32 = false,
-                Some(id) => {
-                    let id = Id::from_str(id.value())?;
-
-                    existing_u32 = id != keyset.id;
-                }
-            }
-        }
-
-        if existing_keys || existing_u32 {
-            tracing::warn!("Keys already exist for keyset id");
-            write_txn.abort().map_err(Error::from)?;
-
-            return Err(database::Error::Duplicate);
-        }
-
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
     #[instrument(skip(self), fields(keyset_id = %keyset_id))]
     async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, Self::Err> {
         let read_txn = self.db.begin_read().map_err(Error::from)?;
@@ -660,261 +375,614 @@ impl WalletDatabase for WalletRedbDatabase {
         Ok(None)
     }
 
-    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
-    async fn remove_keys(&self, keyset_id: &Id) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
+    #[instrument(skip_all)]
+    async fn get_proofs(
+        &self,
+        mint_url: Option<MintUrl>,
+        unit: Option<CurrencyUnit>,
+        state: Option<Vec<State>>,
+        spending_conditions: Option<Vec<SpendingConditions>>,
+    ) -> Result<Vec<ProofInfo>, Self::Err> {
+        let read_txn = self.db.begin_read().map_err(Error::from)?;
 
-        {
-            let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
+        let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
 
-            table
-                .remove(keyset_id.to_string().as_str())
-                .map_err(Error::from)?;
-        }
+        let proofs: Vec<ProofInfo> = table
+            .iter()
+            .map_err(Error::from)?
+            .flatten()
+            .filter_map(|(_k, v)| {
+                let mut proof = None;
 
-        write_txn.commit().map_err(Error::from)?;
+                if let Ok(proof_info) = serde_json::from_str::<ProofInfo>(v.value()) {
+                    if proof_info.matches_conditions(&mint_url, &unit, &state, &spending_conditions)
+                    {
+                        proof = Some(proof_info)
+                    }
+                }
 
-        Ok(())
-    }
+                proof
+            })
+            .collect();
 
-    #[instrument(skip(self, added, deleted_ys))]
-    async fn update_proofs(
-        &self,
-        added: Vec<ProofInfo>,
-        deleted_ys: Vec<PublicKey>,
-    ) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
+        Ok(proofs)
+    }
 
-        {
-            let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
+    async fn get_balance(
+        &self,
+        mint_url: Option<MintUrl>,
+        unit: Option<CurrencyUnit>,
+        state: Option<Vec<State>>,
+    ) -> Result<u64, database::Error> {
+        // For redb, we still need to fetch all proofs and sum them
+        // since redb doesn't have SQL aggregation
+        let proofs = self.get_proofs(mint_url, unit, state, None).await?;
+        Ok(proofs.iter().map(|p| u64::from(p.proof.amount)).sum())
+    }
 
-            for proof_info in added.iter() {
-                table
-                    .insert(
-                        proof_info.y.to_bytes().as_slice(),
-                        serde_json::to_string(&proof_info)
-                            .map_err(Error::from)?
-                            .as_str(),
-                    )
-                    .map_err(Error::from)?;
-            }
+    #[instrument(skip(self))]
+    async fn get_transaction(
+        &self,
+        transaction_id: TransactionId,
+    ) -> Result<Option<Transaction>, Self::Err> {
+        let read_txn = self.db.begin_read().map_err(Error::from)?;
+        let table = read_txn
+            .open_table(TRANSACTIONS_TABLE)
+            .map_err(Error::from)?;
 
-            for y in deleted_ys.iter() {
-                table.remove(y.to_bytes().as_slice()).map_err(Error::from)?;
-            }
+        if let Some(transaction) = table.get(transaction_id.as_slice()).map_err(Error::from)? {
+            return Ok(serde_json::from_str(transaction.value()).map_err(Error::from)?);
         }
-        write_txn.commit().map_err(Error::from)?;
 
-        Ok(())
+        Ok(None)
     }
 
-    #[instrument(skip_all)]
-    async fn get_proofs(
+    #[instrument(skip(self))]
+    async fn list_transactions(
         &self,
         mint_url: Option<MintUrl>,
+        direction: Option<TransactionDirection>,
         unit: Option<CurrencyUnit>,
-        state: Option<Vec<State>>,
-        spending_conditions: Option<Vec<SpendingConditions>>,
-    ) -> Result<Vec<ProofInfo>, Self::Err> {
+    ) -> Result<Vec<Transaction>, Self::Err> {
         let read_txn = self.db.begin_read().map_err(Error::from)?;
 
-        let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
+        let table = read_txn
+            .open_table(TRANSACTIONS_TABLE)
+            .map_err(Error::from)?;
 
-        let proofs: Vec<ProofInfo> = table
+        let transactions: Vec<Transaction> = table
             .iter()
             .map_err(Error::from)?
             .flatten()
             .filter_map(|(_k, v)| {
-                let mut proof = None;
+                let mut transaction = None;
 
-                if let Ok(proof_info) = serde_json::from_str::<ProofInfo>(v.value()) {
-                    if proof_info.matches_conditions(&mint_url, &unit, &state, &spending_conditions)
-                    {
-                        proof = Some(proof_info)
+                if let Ok(tx) = serde_json::from_str::<Transaction>(v.value()) {
+                    if tx.matches_conditions(&mint_url, &direction, &unit) {
+                        transaction = Some(tx)
                     }
                 }
 
-                proof
+                transaction
             })
             .collect();
 
-        Ok(proofs)
+        Ok(transactions)
     }
 
-    async fn get_balance(
-        &self,
-        mint_url: Option<MintUrl>,
-        unit: Option<CurrencyUnit>,
-        state: Option<Vec<State>>,
-    ) -> Result<u64, database::Error> {
-        // For redb, we still need to fetch all proofs and sum them
-        // since redb doesn't have SQL aggregation
-        let proofs = self.get_proofs(mint_url, unit, state, None).await?;
-        Ok(proofs.iter().map(|p| u64::from(p.proof.amount)).sum())
+    async fn begin_db_transaction<'a>(
+        &'a self,
+    ) -> Result<
+        Box<dyn cdk_common::database::WalletDatabaseTransaction<'a, Self::Err> + Send + Sync + 'a>,
+        Self::Err,
+    > {
+        let write_txn = self.db.begin_write().map_err(Error::from)?;
+        Ok(Box::new(RedbWalletTransaction::new(write_txn)))
     }
+}
 
-    async fn update_proofs_state(
-        &self,
-        ys: Vec<PublicKey>,
-        state: State,
+#[async_trait]
+impl<'a> cdk_common::database::WalletDatabaseTransaction<'a, database::Error>
+    for RedbWalletTransaction
+{
+    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
+    async fn get_keyset_by_id(
+        &mut self,
+        keyset_id: &Id,
+    ) -> Result<Option<KeySetInfo>, database::Error> {
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+        let table = txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
+
+        let result = match table
+            .get(keyset_id.to_bytes().as_slice())
+            .map_err(Error::from)?
+        {
+            Some(keyset) => {
+                let keyset: KeySetInfo =
+                    serde_json::from_str(keyset.value()).map_err(Error::from)?;
+
+                Ok(Some(keyset))
+            }
+            None => Ok(None),
+        };
+
+        result
+    }
+
+    #[instrument(skip(self))]
+    async fn add_mint(
+        &mut self,
+        mint_url: MintUrl,
+        mint_info: Option<MintInfo>,
     ) -> Result<(), database::Error> {
-        let read_txn = self.db.begin_read().map_err(Error::from)?;
-        let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
+        let txn = self.txn()?;
+        let mut table = txn.open_table(MINTS_TABLE).map_err(Error::from)?;
+        table
+            .insert(
+                mint_url.to_string().as_str(),
+                serde_json::to_string(&mint_info)
+                    .map_err(Error::from)?
+                    .as_str(),
+            )
+            .map_err(Error::from)?;
+        Ok(())
+    }
 
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
+    #[instrument(skip(self))]
+    async fn remove_mint(&mut self, mint_url: MintUrl) -> Result<(), database::Error> {
+        let txn = self.txn()?;
+        let mut table = txn.open_table(MINTS_TABLE).map_err(Error::from)?;
+        table
+            .remove(mint_url.to_string().as_str())
+            .map_err(Error::from)?;
+        Ok(())
+    }
 
-        for y in ys {
-            let y_slice = y.to_bytes();
-            let proof = table
-                .get(y_slice.as_slice())
-                .map_err(Error::from)?
-                .ok_or(Error::UnknownY)?;
+    #[instrument(skip(self))]
+    async fn update_mint_url(
+        &mut self,
+        old_mint_url: MintUrl,
+        new_mint_url: MintUrl,
+    ) -> Result<(), database::Error> {
+        // Update proofs table
+        {
+            let proofs = self
+                .get_proofs(Some(old_mint_url.clone()), None, None, None)
+                .await
+                .map_err(Error::from)?;
 
-            let mut proof_info =
-                serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
+            // Proofs with new url
+            let updated_proofs: Vec<ProofInfo> = proofs
+                .clone()
+                .into_iter()
+                .map(|mut p| {
+                    p.mint_url = new_mint_url.clone();
+                    p
+                })
+                .collect();
 
-            proof_info.state = state;
+            if !updated_proofs.is_empty() {
+                self.update_proofs(updated_proofs, vec![]).await?;
+            }
+        }
 
-            {
-                let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
+        // Update mint quotes
+        {
+            let read_txn = self.txn()?;
+            let mut table = read_txn
+                .open_table(MINT_QUOTES_TABLE)
+                .map_err(Error::from)?;
+
+            let unix_time = unix_time();
+
+            let quotes = table
+                .iter()
+                .map_err(Error::from)?
+                .flatten()
+                .filter_map(|(_, quote)| {
+                    let mut q: MintQuote = serde_json::from_str(quote.value())
+                        .inspect_err(|err| {
+                            tracing::warn!(
+                                "Failed to deserialize {}  with error {}",
+                                quote.value(),
+                                err
+                            )
+                        })
+                        .ok()?;
+                    if q.expiry < unix_time {
+                        q.mint_url = new_mint_url.clone();
+                        Some(q)
+                    } else {
+                        None
+                    }
+                })
+                .collect::<Vec<_>>();
+
+            for quote in quotes {
                 table
                     .insert(
-                        y_slice.as_slice(),
-                        serde_json::to_string(&proof_info)
-                            .map_err(Error::from)?
-                            .as_str(),
+                        quote.id.as_str(),
+                        serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
                     )
                     .map_err(Error::from)?;
             }
         }
 
-        write_txn.commit().map_err(Error::from)?;
-
         Ok(())
     }
 
-    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
-    async fn increment_keyset_counter(&self, keyset_id: &Id, count: u32) -> Result<u32, Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
+    #[instrument(skip(self))]
+    async fn add_mint_keysets(
+        &mut self,
+        mint_url: MintUrl,
+        keysets: Vec<KeySetInfo>,
+    ) -> Result<(), database::Error> {
+        let txn = self.txn()?;
+        let mut table = txn
+            .open_multimap_table(MINT_KEYSETS_TABLE)
+            .map_err(Error::from)?;
+        let mut keysets_table = txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
+        let mut u32_table = txn.open_table(KEYSET_U32_MAPPING).map_err(Error::from)?;
 
-        let current_counter;
-        let new_counter;
-        {
-            let table = write_txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
-            let counter = table
-                .get(keyset_id.to_string().as_str())
-                .map_err(Error::from)?;
+        let mut existing_u32 = false;
+
+        for keyset in keysets {
+            // Check if keyset already exists
+            let existing_keyset = {
+                let existing_keyset = keysets_table
+                    .get(keyset.id.to_bytes().as_slice())
+                    .map_err(Error::from)?;
 
-            current_counter = match counter {
-                Some(c) => c.value(),
-                None => 0,
+                existing_keyset.map(|r| r.value().to_string())
             };
 
-            new_counter = current_counter + count;
-        }
+            let existing = u32_table
+                .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
+                .map_err(Error::from)?;
 
-        {
-            let mut table = write_txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
+            match existing {
+                None => existing_u32 = false,
+                Some(id) => {
+                    let id = Id::from_str(id.value())?;
 
-            table
-                .insert(keyset_id.to_string().as_str(), new_counter)
-                .map_err(Error::from)?;
-        }
-        write_txn.commit().map_err(Error::from)?;
+                    if id == keyset.id {
+                        existing_u32 = false;
+                    } else {
+                        existing_u32 = true;
+                        break;
+                    }
+                }
+            }
 
-        Ok(new_counter)
-    }
+            let keyset = if let Some(existing_keyset) = existing_keyset {
+                let mut existing_keyset: KeySetInfo = serde_json::from_str(&existing_keyset)?;
 
-    #[instrument(skip(self))]
-    async fn add_transaction(&self, transaction: Transaction) -> Result<(), Self::Err> {
-        let id = transaction.id();
+                existing_keyset.active = keyset.active;
+                existing_keyset.input_fee_ppk = keyset.input_fee_ppk;
 
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
+                existing_keyset
+            } else {
+                table
+                    .insert(
+                        mint_url.to_string().as_str(),
+                        keyset.id.to_bytes().as_slice(),
+                    )
+                    .map_err(Error::from)?;
 
-        {
-            let mut table = write_txn
-                .open_table(TRANSACTIONS_TABLE)
-                .map_err(Error::from)?;
-            table
+                keyset
+            };
+
+            keysets_table
                 .insert(
-                    id.as_slice(),
-                    serde_json::to_string(&transaction)
+                    keyset.id.to_bytes().as_slice(),
+                    serde_json::to_string(&keyset)
                         .map_err(Error::from)?
                         .as_str(),
                 )
                 .map_err(Error::from)?;
         }
 
-        write_txn.commit().map_err(Error::from)?;
+        if existing_u32 {
+            tracing::warn!("Keyset already exists for keyset id");
+            return Err(database::Error::Duplicate);
+        }
 
         Ok(())
     }
 
-    #[instrument(skip(self))]
-    async fn get_transaction(
-        &self,
-        transaction_id: TransactionId,
-    ) -> Result<Option<Transaction>, Self::Err> {
-        let read_txn = self.db.begin_read().map_err(Error::from)?;
-        let table = read_txn
-            .open_table(TRANSACTIONS_TABLE)
+    #[instrument(skip_all)]
+    async fn get_mint_quote(
+        &mut self,
+        quote_id: &str,
+    ) -> Result<Option<MintQuote>, database::Error> {
+        let txn = self.txn()?;
+        let table = txn.open_table(MINT_QUOTES_TABLE).map_err(Error::from)?;
+
+        if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
+            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
+        }
+
+        Ok(None)
+    }
+
+    #[instrument(skip_all)]
+    async fn add_mint_quote(&mut self, quote: MintQuote) -> Result<(), database::Error> {
+        let txn = self.txn()?;
+        let mut table = txn.open_table(MINT_QUOTES_TABLE).map_err(Error::from)?;
+        table
+            .insert(
+                quote.id.as_str(),
+                serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
+            )
             .map_err(Error::from)?;
+        Ok(())
+    }
 
-        if let Some(transaction) = table.get(transaction_id.as_slice()).map_err(Error::from)? {
-            return Ok(serde_json::from_str(transaction.value()).map_err(Error::from)?);
+    #[instrument(skip_all)]
+    async fn remove_mint_quote(&mut self, quote_id: &str) -> Result<(), database::Error> {
+        let txn = self.txn()?;
+        let mut table = txn.open_table(MINT_QUOTES_TABLE).map_err(Error::from)?;
+        table.remove(quote_id).map_err(Error::from)?;
+        Ok(())
+    }
+
+    #[instrument(skip_all)]
+    async fn get_melt_quote(
+        &mut self,
+        quote_id: &str,
+    ) -> Result<Option<wallet::MeltQuote>, database::Error> {
+        let txn = self.txn()?;
+        let table = txn.open_table(MELT_QUOTES_TABLE).map_err(Error::from)?;
+
+        if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
+            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
         }
 
         Ok(None)
     }
 
-    #[instrument(skip(self))]
-    async fn list_transactions(
-        &self,
-        mint_url: Option<MintUrl>,
-        direction: Option<TransactionDirection>,
-        unit: Option<CurrencyUnit>,
-    ) -> Result<Vec<Transaction>, Self::Err> {
-        let read_txn = self.db.begin_read().map_err(Error::from)?;
+    #[instrument(skip_all)]
+    async fn add_melt_quote(&mut self, quote: wallet::MeltQuote) -> Result<(), database::Error> {
+        let txn = self.txn()?;
+        let mut table = txn.open_table(MELT_QUOTES_TABLE).map_err(Error::from)?;
+        table
+            .insert(
+                quote.id.as_str(),
+                serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
+            )
+            .map_err(Error::from)?;
+        Ok(())
+    }
 
-        let table = read_txn
-            .open_table(TRANSACTIONS_TABLE)
+    #[instrument(skip_all)]
+    async fn remove_melt_quote(&mut self, quote_id: &str) -> Result<(), database::Error> {
+        let txn = self.txn()?;
+        let mut table = txn.open_table(MELT_QUOTES_TABLE).map_err(Error::from)?;
+        table.remove(quote_id).map_err(Error::from)?;
+        Ok(())
+    }
+
+    #[instrument(skip_all)]
+    async fn add_keys(&mut self, keyset: KeySet) -> Result<(), database::Error> {
+        let txn = self.txn()?;
+
+        keyset.verify_id()?;
+
+        let mut table = txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
+
+        let existing_keys = table
+            .insert(
+                keyset.id.to_string().as_str(),
+                serde_json::to_string(&keyset.keys)
+                    .map_err(Error::from)?
+                    .as_str(),
+            )
+            .map_err(Error::from)?
+            .is_some();
+
+        let mut table = txn.open_table(KEYSET_U32_MAPPING).map_err(Error::from)?;
+
+        let existing = table
+            .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
             .map_err(Error::from)?;
 
-        let transactions: Vec<Transaction> = table
+        let existing_u32 = match existing {
+            None => false,
+            Some(id) => {
+                let id = Id::from_str(id.value())?;
+                id != keyset.id
+            }
+        };
+
+        if existing_keys || existing_u32 {
+            tracing::warn!("Keys already exist for keyset id");
+            return Err(database::Error::Duplicate);
+        }
+
+        Ok(())
+    }
+
+    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
+    async fn remove_keys(&mut self, keyset_id: &Id) -> Result<(), database::Error> {
+        let txn = self.txn()?;
+        let mut table = txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
+
+        table
+            .remove(keyset_id.to_string().as_str())
+            .map_err(Error::from)?;
+
+        Ok(())
+    }
+
+    #[instrument(skip_all)]
+    async fn get_proofs(
+        &mut self,
+        mint_url: Option<MintUrl>,
+        unit: Option<CurrencyUnit>,
+        state: Option<Vec<State>>,
+        spending_conditions: Option<Vec<SpendingConditions>>,
+    ) -> Result<Vec<ProofInfo>, database::Error> {
+        let txn = self.txn()?;
+        let table = txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
+
+        let proofs: Vec<ProofInfo> = table
             .iter()
             .map_err(Error::from)?
             .flatten()
             .filter_map(|(_k, v)| {
-                let mut transaction = None;
+                let mut proof = None;
 
-                if let Ok(tx) = serde_json::from_str::<Transaction>(v.value()) {
-                    if tx.matches_conditions(&mint_url, &direction, &unit) {
-                        transaction = Some(tx)
+                if let Ok(proof_info) = serde_json::from_str::<ProofInfo>(v.value()) {
+                    if proof_info.matches_conditions(&mint_url, &unit, &state, &spending_conditions)
+                    {
+                        proof = Some(proof_info)
                     }
                 }
 
-                transaction
+                proof
             })
             .collect();
 
-        Ok(transactions)
+        Ok(proofs)
     }
 
-    #[instrument(skip(self))]
-    async fn remove_transaction(&self, transaction_id: TransactionId) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
+    #[instrument(skip(self, added, deleted_ys))]
+    async fn update_proofs(
+        &mut self,
+        added: Vec<ProofInfo>,
+        deleted_ys: Vec<PublicKey>,
+    ) -> Result<(), database::Error> {
+        let txn = self.txn()?;
+        let mut table = txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
 
-        {
-            let mut table = write_txn
-                .open_table(TRANSACTIONS_TABLE)
+        for proof_info in added.iter() {
+            table
+                .insert(
+                    proof_info.y.to_bytes().as_slice(),
+                    serde_json::to_string(&proof_info)
+                        .map_err(Error::from)?
+                        .as_str(),
+                )
                 .map_err(Error::from)?;
+        }
+
+        for y in deleted_ys.iter() {
+            table.remove(y.to_bytes().as_slice()).map_err(Error::from)?;
+        }
+
+        Ok(())
+    }
+
+    async fn update_proofs_state(
+        &mut self,
+        ys: Vec<PublicKey>,
+        state: State,
+    ) -> Result<(), database::Error> {
+        let txn = self.txn()?;
+        let mut table = txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
+
+        for y in ys {
+            let y_slice = y.to_bytes();
+            let proof = table
+                .get(y_slice.as_slice())
+                .map_err(Error::from)?
+                .ok_or(Error::UnknownY)?;
+
+            let mut proof_info =
+                serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
+            drop(proof);
+
+            proof_info.state = state;
+
             table
-                .remove(transaction_id.as_slice())
+                .insert(
+                    y_slice.as_slice(),
+                    serde_json::to_string(&proof_info)
+                        .map_err(Error::from)?
+                        .as_str(),
+                )
                 .map_err(Error::from)?;
         }
 
-        write_txn.commit().map_err(Error::from)?;
+        Ok(())
+    }
+
+    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
+    async fn increment_keyset_counter(
+        &mut self,
+        keyset_id: &Id,
+        count: u32,
+    ) -> Result<u32, database::Error> {
+        let txn = self.txn()?;
+        let mut table = txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
+        let current_counter = table
+            .get(keyset_id.to_string().as_str())
+            .map_err(Error::from)?
+            .map(|x| x.value())
+            .unwrap_or_default();
+
+        let new_counter = current_counter + count;
+
+        table
+            .insert(keyset_id.to_string().as_str(), new_counter)
+            .map_err(Error::from)?;
+
+        Ok(new_counter)
+    }
+
+    #[instrument(skip(self))]
+    async fn add_transaction(&mut self, transaction: Transaction) -> Result<(), database::Error> {
+        let id = transaction.id();
+        let txn = self.txn()?;
+        let mut table = txn.open_table(TRANSACTIONS_TABLE).map_err(Error::from)?;
+        table
+            .insert(
+                id.as_slice(),
+                serde_json::to_string(&transaction)
+                    .map_err(Error::from)?
+                    .as_str(),
+            )
+            .map_err(Error::from)?;
+        Ok(())
+    }
+
+    #[instrument(skip(self))]
+    async fn remove_transaction(
+        &mut self,
+        transaction_id: TransactionId,
+    ) -> Result<(), database::Error> {
+        let txn = self.txn()?;
+        let mut table = txn.open_table(TRANSACTIONS_TABLE).map_err(Error::from)?;
+        table
+            .remove(transaction_id.as_slice())
+            .map_err(Error::from)?;
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl cdk_common::database::DbTransactionFinalizer for RedbWalletTransaction {
+    type Err = database::Error;
+
+    async fn commit(mut self: Box<Self>) -> Result<(), database::Error> {
+        if let Some(txn) = self.write_txn.take() {
+            txn.commit().map_err(Error::from)?;
+        }
+        Ok(())
+    }
 
+    async fn rollback(mut self: Box<Self>) -> Result<(), database::Error> {
+        if let Some(txn) = self.write_txn.take() {
+            txn.abort().map_err(Error::from)?;
+        }
         Ok(())
     }
 }
+
+impl Drop for RedbWalletTransaction {
+    fn drop(&mut self) {
+        if let Some(txn) = self.write_txn.take() {
+            let _ = txn.abort();
+        }
+    }
+}

+ 2 - 2
crates/cdk-sql-common/src/mint/mod.rs

@@ -17,7 +17,7 @@ use async_trait::async_trait;
 use bitcoin::bip32::DerivationPath;
 use cdk_common::database::mint::{validate_kvstore_params, SagaDatabase, SagaTransaction};
 use cdk_common::database::{
-    self, ConversionError, Error, MintDatabase, MintDbWriterFinalizer, MintKeyDatabaseTransaction,
+    self, ConversionError, DbTransactionFinalizer, Error, MintDatabase, MintKeyDatabaseTransaction,
     MintKeysDatabase, MintProofsDatabase, MintQuotesDatabase, MintQuotesTransaction,
     MintSignatureTransaction, MintSignaturesDatabase,
 };
@@ -297,7 +297,7 @@ impl<RM> database::MintTransaction<'_, Error> for SQLTransaction<RM> where RM: D
 {}
 
 #[async_trait]
-impl<RM> MintDbWriterFinalizer for SQLTransaction<RM>
+impl<RM> DbTransactionFinalizer for SQLTransaction<RM>
 where
     RM: DatabasePool + 'static,
 {

تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است
+ 686 - 290
crates/cdk-sql-common/src/wallet/mod.rs


+ 10 - 3
crates/cdk-sqlite/src/wallet/mod.rs

@@ -94,11 +94,15 @@ mod tests {
         let proof_info =
             ProofInfo::new(proof, mint_url.clone(), State::Unspent, CurrencyUnit::Sat).unwrap();
 
+        let mut tx = db.begin_db_transaction().await.expect("tx");
+
         // Store the proof in the database
-        db.update_proofs(vec![proof_info.clone()], vec![])
+        tx.update_proofs(vec![proof_info.clone()], vec![])
             .await
             .unwrap();
 
+        tx.commit().await.expect("commit");
+
         // Retrieve the proof from the database
         let retrieved_proofs = db
             .get_proofs(
@@ -154,6 +158,8 @@ mod tests {
             PaymentMethod::Custom("custom".to_string()),
         ];
 
+        let mut tx = db.begin_db_transaction().await.expect("begin");
+
         for (i, payment_method) in payment_methods.iter().enumerate() {
             let quote = MintQuote {
                 id: format!("test_quote_{}", i),
@@ -170,13 +176,14 @@ mod tests {
             };
 
             // Store the quote
-            db.add_mint_quote(quote.clone()).await.unwrap();
+            tx.add_mint_quote(quote.clone()).await.unwrap();
 
             // Retrieve and verify
-            let retrieved = db.get_mint_quote(&quote.id).await.unwrap().unwrap();
+            let retrieved = tx.get_mint_quote(&quote.id).await.unwrap().unwrap();
             assert_eq!(retrieved.payment_method, *payment_method);
             assert_eq!(retrieved.amount_issued, Amount::from(0));
             assert_eq!(retrieved.amount_paid, Amount::from(0));
         }
+        tx.commit().await.expect("commit");
     }
 }

+ 15 - 18
crates/cdk/src/wallet/auth/auth_wallet.rs

@@ -286,21 +286,25 @@ impl AuthWallet {
     /// Get Auth Token
     #[instrument(skip(self))]
     pub async fn get_blind_auth_token(&self) -> Result<Option<BlindAuthToken>, Error> {
-        let unspent = self.get_unspent_auth_proofs().await?;
+        let mut tx = self.localstore.begin_db_transaction().await?;
+        let unspent = tx
+            .get_proofs(
+                Some(self.mint_url.clone()),
+                Some(CurrencyUnit::Auth),
+                Some(vec![State::Unspent]),
+                None,
+            )
+            .await?;
 
         let auth_proof = match unspent.first() {
             Some(proof) => {
-                self.localstore
-                    .update_proofs(vec![], vec![proof.y()?])
-                    .await?;
-                proof
+                tx.update_proofs(vec![], vec![proof.proof.y()?]).await?;
+                proof.proof.clone().try_into()?
             }
             None => return Ok(None),
         };
 
-        Ok(Some(BlindAuthToken {
-            auth_proof: auth_proof.clone(),
-        }))
+        Ok(Some(BlindAuthToken { auth_proof }))
     }
 
     /// Auth for request
@@ -337,15 +341,6 @@ impl AuthWallet {
     #[instrument(skip(self))]
     pub async fn mint_blind_auth(&self, amount: Amount) -> Result<Proofs, Error> {
         tracing::debug!("Minting {} blind auth proofs", amount);
-        // Check that mint is in store of mints
-        if self
-            .localstore
-            .get_mint(self.mint_url.clone())
-            .await?
-            .is_none()
-        {
-            self.get_mint_info().await?;
-        }
 
         let auth_token = self.auth_client.get_auth_token().await?;
 
@@ -455,7 +450,9 @@ impl AuthWallet {
             .collect::<Result<Vec<ProofInfo>, _>>()?;
 
         // Add new proofs to store
-        self.localstore.update_proofs(proof_infos, vec![]).await?;
+        let mut tx = self.localstore.begin_db_transaction().await?;
+        tx.update_proofs(proof_infos, vec![]).await?;
+        tx.commit().await?;
 
         Ok(proofs)
     }

+ 38 - 26
crates/cdk/src/wallet/issue/issue_bolt11.rs

@@ -92,7 +92,9 @@ impl Wallet {
             Some(secret_key),
         );
 
-        self.localstore.add_mint_quote(quote.clone()).await?;
+        let mut tx = self.localstore.begin_db_transaction().await?;
+        tx.add_mint_quote(quote.clone()).await?;
+        tx.commit().await?;
 
         Ok(quote)
     }
@@ -105,18 +107,22 @@ impl Wallet {
     ) -> Result<MintQuoteBolt11Response<String>, Error> {
         let response = self.client.get_mint_quote_status(quote_id).await?;
 
-        match self.localstore.get_mint_quote(quote_id).await? {
+        let mut tx = self.localstore.begin_db_transaction().await?;
+
+        match tx.get_mint_quote(quote_id).await? {
             Some(quote) => {
                 let mut quote = quote;
 
                 quote.state = response.state;
-                self.localstore.add_mint_quote(quote).await?;
+                tx.add_mint_quote(quote).await?;
             }
             None => {
                 tracing::info!("Quote mint {} unknown", quote_id);
             }
         }
 
+        tx.commit().await?;
+
         Ok(response)
     }
 
@@ -135,7 +141,9 @@ impl Wallet {
                     .await?;
                 total_amount += proofs.total_amount()?;
             } else if mint_quote.expiry.le(&unix_time()) {
-                self.localstore.remove_mint_quote(&mint_quote.id).await?;
+                let mut tx = self.localstore.begin_db_transaction().await?;
+                tx.remove_mint_quote(&mint_quote.id).await?;
+                tx.commit().await?;
             }
         }
         Ok(total_amount)
@@ -194,8 +202,8 @@ impl Wallet {
         amount_split_target: SplitTarget,
         spending_conditions: Option<SpendingConditions>,
     ) -> Result<Proofs, Error> {
-        let quote_info = self
-            .localstore
+        let mut tx = self.localstore.begin_db_transaction().await?;
+        let quote_info = tx
             .get_mint_quote(quote_id)
             .await?
             .ok_or(Error::UnknownQuote)?;
@@ -243,8 +251,7 @@ impl Wallet {
                 );
 
                 // Atomically get the counter range we need
-                let new_counter = self
-                    .localstore
+                let new_counter = tx
                     .increment_keyset_counter(&active_keyset_id, num_secrets)
                     .await?;
 
@@ -271,6 +278,8 @@ impl Wallet {
             request.sign(secret_key)?;
         }
 
+        tx.commit().await?;
+
         let mint_res = self.client.post_mint(request).await?;
 
         let keys = self.load_keyset_keys(active_keyset_id).await?;
@@ -294,8 +303,10 @@ impl Wallet {
             &keys,
         )?;
 
+        let mut tx = self.localstore.begin_db_transaction().await?;
+
         // Remove filled quote from store
-        self.localstore.remove_mint_quote(&quote_info.id).await?;
+        tx.remove_mint_quote(&quote_info.id).await?;
 
         let proof_infos = proofs
             .iter()
@@ -310,25 +321,26 @@ impl Wallet {
             .collect::<Result<Vec<ProofInfo>, _>>()?;
 
         // Add new proofs to store
-        self.localstore.update_proofs(proof_infos, vec![]).await?;
+        tx.update_proofs(proof_infos, vec![]).await?;
 
         // Add transaction to store
-        self.localstore
-            .add_transaction(Transaction {
-                mint_url: self.mint_url.clone(),
-                direction: TransactionDirection::Incoming,
-                amount: proofs.total_amount()?,
-                fee: Amount::ZERO,
-                unit: self.unit.clone(),
-                ys: proofs.ys()?,
-                timestamp: unix_time,
-                memo: None,
-                metadata: HashMap::new(),
-                quote_id: Some(quote_id.to_string()),
-                payment_request: Some(quote_info.request),
-                payment_proof: None,
-            })
-            .await?;
+        tx.add_transaction(Transaction {
+            mint_url: self.mint_url.clone(),
+            direction: TransactionDirection::Incoming,
+            amount: proofs.total_amount()?,
+            fee: Amount::ZERO,
+            unit: self.unit.clone(),
+            ys: proofs.ys()?,
+            timestamp: unix_time,
+            memo: None,
+            metadata: HashMap::new(),
+            quote_id: Some(quote_id.to_string()),
+            payment_request: Some(quote_info.request),
+            payment_proof: None,
+        })
+        .await?;
+
+        tx.commit().await?;
 
         Ok(proofs)
     }

+ 53 - 32
crates/cdk/src/wallet/issue/issue_bolt12.rs

@@ -32,10 +32,14 @@ impl Wallet {
         // If we have a description, we check that the mint supports it.
         if description.is_some() {
             let mint_method_settings = self
-                .localstore
-                .get_mint(mint_url.clone())
+                .metadata_cache
+                .load(&self.localstore, &self.client, {
+                    let ttl = self.metadata_cache_ttl.read();
+                    *ttl
+                })
                 .await?
-                .ok_or(Error::IncorrectMint)?
+                .mint_info
+                .clone()
                 .nuts
                 .nut04
                 .get_settings(unit, &crate::nuts::PaymentMethod::Bolt12)
@@ -69,7 +73,9 @@ impl Wallet {
             Some(secret_key),
         );
 
-        self.localstore.add_mint_quote(quote.clone()).await?;
+        let mut tx = self.localstore.begin_db_transaction().await?;
+        tx.add_mint_quote(quote.clone()).await?;
+        tx.commit().await?;
 
         Ok(quote)
     }
@@ -83,7 +89,8 @@ impl Wallet {
         amount_split_target: SplitTarget,
         spending_conditions: Option<SpendingConditions>,
     ) -> Result<Proofs, Error> {
-        let quote_info = self.localstore.get_mint_quote(quote_id).await?;
+        let mut tx = self.localstore.begin_db_transaction().await?;
+        let quote_info = tx.get_mint_quote(quote_id).await?;
 
         let quote_info = if let Some(quote) = quote_info {
             if quote.expiry.le(&unix_time()) && quote.expiry.ne(&0) {
@@ -100,14 +107,21 @@ impl Wallet {
             .get_keyset_fees_and_amounts_by_id(active_keyset_id)
             .await?;
 
-        let amount = match amount {
-            Some(amount) => amount,
+        let (mut tx, quote_info, amount) = match amount {
+            Some(amount) => (tx, quote_info, amount),
             None => {
                 // If an amount it not supplied with check the status of the quote
                 // The mint will tell us how much can be minted
+                tx.commit().await?;
                 let state = self.mint_bolt12_quote_state(quote_id).await?;
 
-                state.amount_paid - state.amount_issued
+                let mut tx = self.localstore.begin_db_transaction().await?;
+                let quote_info = tx
+                    .get_mint_quote(quote_id)
+                    .await?
+                    .ok_or(Error::UnknownQuote)?;
+
+                (tx, quote_info, state.amount_paid - state.amount_issued)
             }
         };
 
@@ -136,8 +150,7 @@ impl Wallet {
                 );
 
                 // Atomically get the counter range we need
-                let new_counter = self
-                    .localstore
+                let new_counter = tx
                     .increment_keyset_counter(&active_keyset_id, num_secrets)
                     .await?;
 
@@ -167,8 +180,12 @@ impl Wallet {
             return Err(Error::SignatureMissingOrInvalid);
         }
 
+        tx.commit().await?;
+
         let mint_res = self.client.post_mint(request).await?;
 
+        let mut tx = self.localstore.begin_db_transaction().await?;
+
         let keys = self.load_keyset_keys(active_keyset_id).await?;
 
         // Verify the signature DLEQ is valid
@@ -191,14 +208,13 @@ impl Wallet {
         )?;
 
         // Remove filled quote from store
-        let mut quote_info = self
-            .localstore
+        let mut quote_info = tx
             .get_mint_quote(quote_id)
             .await?
             .ok_or(Error::UnpaidQuote)?;
         quote_info.amount_issued += proofs.total_amount()?;
 
-        self.localstore.add_mint_quote(quote_info.clone()).await?;
+        tx.add_mint_quote(quote_info.clone()).await?;
 
         let proof_infos = proofs
             .iter()
@@ -213,25 +229,26 @@ impl Wallet {
             .collect::<Result<Vec<ProofInfo>, _>>()?;
 
         // Add new proofs to store
-        self.localstore.update_proofs(proof_infos, vec![]).await?;
+        tx.update_proofs(proof_infos, vec![]).await?;
 
         // Add transaction to store
-        self.localstore
-            .add_transaction(Transaction {
-                mint_url: self.mint_url.clone(),
-                direction: TransactionDirection::Incoming,
-                amount: proofs.total_amount()?,
-                fee: Amount::ZERO,
-                unit: self.unit.clone(),
-                ys: proofs.ys()?,
-                timestamp: unix_time(),
-                memo: None,
-                metadata: HashMap::new(),
-                quote_id: Some(quote_id.to_string()),
-                payment_request: Some(quote_info.request),
-                payment_proof: None,
-            })
-            .await?;
+        tx.add_transaction(Transaction {
+            mint_url: self.mint_url.clone(),
+            direction: TransactionDirection::Incoming,
+            amount: proofs.total_amount()?,
+            fee: Amount::ZERO,
+            unit: self.unit.clone(),
+            ys: proofs.ys()?,
+            timestamp: unix_time(),
+            memo: None,
+            metadata: HashMap::new(),
+            quote_id: Some(quote_id.to_string()),
+            payment_request: Some(quote_info.request),
+            payment_proof: None,
+        })
+        .await?;
+
+        tx.commit().await?;
 
         Ok(proofs)
     }
@@ -244,19 +261,23 @@ impl Wallet {
     ) -> Result<MintQuoteBolt12Response<String>, Error> {
         let response = self.client.get_mint_quote_bolt12_status(quote_id).await?;
 
-        match self.localstore.get_mint_quote(quote_id).await? {
+        let mut tx = self.localstore.begin_db_transaction().await?;
+
+        match tx.get_mint_quote(quote_id).await? {
             Some(quote) => {
                 let mut quote = quote;
                 quote.amount_issued = response.amount_issued;
                 quote.amount_paid = response.amount_paid;
 
-                self.localstore.add_mint_quote(quote).await?;
+                tx.add_mint_quote(quote).await?;
             }
             None => {
                 tracing::info!("Quote mint {} unknown", quote_id);
             }
         }
 
+        tx.commit().await?;
+
         Ok(response)
     }
 }

+ 38 - 31
crates/cdk/src/wallet/melt/melt_bolt11.rs

@@ -89,7 +89,9 @@ impl Wallet {
             payment_method: PaymentMethod::Bolt11,
         };
 
-        self.localstore.add_melt_quote(quote.clone()).await?;
+        let mut tx = self.localstore.begin_db_transaction().await?;
+        tx.add_melt_quote(quote.clone()).await?;
+        tx.commit().await?;
 
         Ok(quote)
     }
@@ -102,25 +104,29 @@ impl Wallet {
     ) -> Result<MeltQuoteBolt11Response<String>, Error> {
         let response = self.client.get_melt_quote_status(quote_id).await?;
 
-        match self.localstore.get_melt_quote(quote_id).await? {
+        let mut tx = self.localstore.begin_db_transaction().await?;
+
+        match tx.get_melt_quote(quote_id).await? {
             Some(quote) => {
                 let mut quote = quote;
 
                 if let Err(e) = self
-                    .add_transaction_for_pending_melt(&quote, &response)
+                    .add_transaction_for_pending_melt(&mut tx, &quote, &response)
                     .await
                 {
                     tracing::error!("Failed to add transaction for pending melt: {}", e);
                 }
 
                 quote.state = response.state;
-                self.localstore.add_melt_quote(quote).await?;
+                tx.add_melt_quote(quote).await?;
             }
             None => {
                 tracing::info!("Quote melt {} unknown", quote_id);
             }
         }
 
+        tx.commit().await?;
+
         Ok(response)
     }
 
@@ -139,8 +145,8 @@ impl Wallet {
         proofs: Proofs,
         metadata: HashMap<String, String>,
     ) -> Result<Melted, Error> {
-        let quote_info = self
-            .localstore
+        let mut tx = self.localstore.begin_db_transaction().await?;
+        let quote_info = tx
             .get_melt_quote(quote_id)
             .await?
             .ok_or(Error::UnknownQuote)?;
@@ -156,9 +162,7 @@ impl Wallet {
         }
 
         let ys = proofs.ys()?;
-        self.localstore
-            .update_proofs_state(ys, State::Pending)
-            .await?;
+        tx.update_proofs_state(ys, State::Pending).await?;
 
         let active_keyset_id = self.fetch_active_keyset().await?.id;
 
@@ -179,8 +183,7 @@ impl Wallet {
             );
 
             // Atomically get the counter range we need
-            let new_counter = self
-                .localstore
+            let new_counter = tx
                 .increment_keyset_counter(&active_keyset_id, num_secrets)
                 .await?;
 
@@ -195,6 +198,8 @@ impl Wallet {
             Some(premint_secrets.blinded_messages()),
         );
 
+        tx.commit().await?;
+
         let melt_response = match quote_info.payment_method {
             cdk_common::PaymentMethod::Bolt11 => {
                 self.try_proof_operation_or_reclaim(
@@ -274,30 +279,32 @@ impl Wallet {
             None => Vec::new(),
         };
 
-        self.localstore.remove_melt_quote(&quote_info.id).await?;
+        let mut tx = self.localstore.begin_db_transaction().await?;
+
+        tx.remove_melt_quote(&quote_info.id).await?;
 
         let deleted_ys = proofs.ys()?;
-        self.localstore
-            .update_proofs(change_proof_infos, deleted_ys)
-            .await?;
+
+        tx.update_proofs(change_proof_infos, deleted_ys).await?;
 
         // Add transaction to store
-        self.localstore
-            .add_transaction(Transaction {
-                mint_url: self.mint_url.clone(),
-                direction: TransactionDirection::Outgoing,
-                amount: melted.amount,
-                fee: melted.fee_paid,
-                unit: self.unit.clone(),
-                ys: proofs.ys()?,
-                timestamp: unix_time(),
-                memo: None,
-                metadata,
-                quote_id: Some(quote_id.to_string()),
-                payment_request: Some(quote_info.request),
-                payment_proof: payment_preimage,
-            })
-            .await?;
+        tx.add_transaction(Transaction {
+            mint_url: self.mint_url.clone(),
+            direction: TransactionDirection::Outgoing,
+            amount: melted.amount,
+            fee: melted.fee_paid,
+            unit: self.unit.clone(),
+            ys: proofs.ys()?,
+            timestamp: unix_time(),
+            memo: None,
+            metadata,
+            quote_id: Some(quote_id.to_string()),
+            payment_request: Some(quote_info.request),
+            payment_proof: payment_preimage,
+        })
+        .await?;
+
+        tx.commit().await?;
 
         Ok(melted)
     }

+ 10 - 4
crates/cdk/src/wallet/melt/melt_bolt12.rs

@@ -61,7 +61,9 @@ impl Wallet {
             payment_method: PaymentMethod::Bolt12,
         };
 
-        self.localstore.add_melt_quote(quote.clone()).await?;
+        let mut tx = self.localstore.begin_db_transaction().await?;
+        tx.add_melt_quote(quote.clone()).await?;
+        tx.commit().await?;
 
         Ok(quote)
     }
@@ -74,25 +76,29 @@ impl Wallet {
     ) -> Result<MeltQuoteBolt11Response<String>, Error> {
         let response = self.client.get_melt_bolt12_quote_status(quote_id).await?;
 
-        match self.localstore.get_melt_quote(quote_id).await? {
+        let mut tx = self.localstore.begin_db_transaction().await?;
+
+        match tx.get_melt_quote(quote_id).await? {
             Some(quote) => {
                 let mut quote = quote;
 
                 if let Err(e) = self
-                    .add_transaction_for_pending_melt(&quote, &response)
+                    .add_transaction_for_pending_melt(&mut tx, &quote, &response)
                     .await
                 {
                     tracing::error!("Failed to add transaction for pending melt: {}", e);
                 }
 
                 quote.state = response.state;
-                self.localstore.add_melt_quote(quote).await?;
+                tx.add_melt_quote(quote).await?;
             }
             None => {
                 tracing::info!("Quote melt {} unknown", quote_id);
             }
         }
 
+        tx.commit().await?;
+
         Ok(response)
     }
 }

+ 24 - 21
crates/cdk/src/wallet/melt/mod.rs

@@ -1,8 +1,9 @@
 use std::collections::HashMap;
 
+use cdk_common::database::DynWalletDatabaseTransaction;
 use cdk_common::util::unix_time;
 use cdk_common::wallet::{MeltQuote, Transaction, TransactionDirection};
-use cdk_common::{Error, MeltQuoteBolt11Response, MeltQuoteState, ProofsMethods};
+use cdk_common::{Error, MeltQuoteBolt11Response, MeltQuoteState, ProofsMethods, State};
 use tracing::instrument;
 
 use crate::Wallet;
@@ -48,6 +49,7 @@ impl Wallet {
 
     pub(crate) async fn add_transaction_for_pending_melt(
         &self,
+        tx: &mut DynWalletDatabaseTransaction<'_>,
         quote: &MeltQuote,
         response: &MeltQuoteBolt11Response<String>,
     ) -> Result<(), Error> {
@@ -59,29 +61,30 @@ impl Wallet {
                 response.state
             );
             if response.state == MeltQuoteState::Paid {
-                let pending_proofs = self.get_pending_proofs().await?;
+                let pending_proofs = self
+                    .get_proofs_with(Some(tx), Some(vec![State::Pending]), None)
+                    .await?;
                 let proofs_total = pending_proofs.total_amount().unwrap_or_default();
                 let change_total = response.change_amount().unwrap_or_default();
 
-                self.localstore
-                    .add_transaction(Transaction {
-                        mint_url: self.mint_url.clone(),
-                        direction: TransactionDirection::Outgoing,
-                        amount: response.amount,
-                        fee: proofs_total
-                            .checked_sub(response.amount)
-                            .and_then(|amt| amt.checked_sub(change_total))
-                            .unwrap_or_default(),
-                        unit: quote.unit.clone(),
-                        ys: pending_proofs.ys()?,
-                        timestamp: unix_time(),
-                        memo: None,
-                        metadata: HashMap::new(),
-                        quote_id: Some(quote.id.clone()),
-                        payment_request: Some(quote.request.clone()),
-                        payment_proof: response.payment_preimage.clone(),
-                    })
-                    .await?;
+                tx.add_transaction(Transaction {
+                    mint_url: self.mint_url.clone(),
+                    direction: TransactionDirection::Outgoing,
+                    amount: response.amount,
+                    fee: proofs_total
+                        .checked_sub(response.amount)
+                        .and_then(|amt| amt.checked_sub(change_total))
+                        .unwrap_or_default(),
+                    unit: quote.unit.clone(),
+                    ys: pending_proofs.ys()?,
+                    timestamp: unix_time(),
+                    memo: None,
+                    metadata: HashMap::new(),
+                    quote_id: Some(quote.id.clone()),
+                    payment_request: Some(quote.request.clone()),
+                    payment_proof: response.payment_preimage.clone(),
+                })
+                .await?;
             }
         }
         Ok(())

+ 15 - 6
crates/cdk/src/wallet/mint_metadata_cache.rs

@@ -492,9 +492,18 @@ impl MintMetadataCache {
             versions.insert(storage_id, metadata.status.version);
         }
 
+        let mut tx = if let Ok(ok) = storage
+            .begin_db_transaction()
+            .await
+            .inspect_err(|err| tracing::warn!("Could not begin database transaction: {err}"))
+        {
+            ok
+        } else {
+            return;
+        };
+
         // Save mint info
-        storage
-            .add_mint(mint_url.clone(), Some(metadata.mint_info.clone()))
+        tx.add_mint(mint_url.clone(), Some(metadata.mint_info.clone()))
             .await
             .inspect_err(|e| tracing::warn!("Failed to save mint info for {}: {}", mint_url, e))
             .ok();
@@ -503,8 +512,7 @@ impl MintMetadataCache {
         let keysets: Vec<_> = metadata.keysets.values().map(|ks| (**ks).clone()).collect();
 
         if !keysets.is_empty() {
-            storage
-                .add_mint_keysets(mint_url.clone(), keysets)
+            tx.add_mint_keysets(mint_url.clone(), keysets)
                 .await
                 .inspect_err(|e| tracing::warn!("Failed to save keysets for {}: {}", mint_url, e))
                 .ok();
@@ -529,8 +537,7 @@ impl MintMetadataCache {
                     keys: (**keys).clone(),
                 };
 
-                storage
-                    .add_keys(keyset)
+                tx.add_keys(keyset)
                     .await
                     .inspect_err(|e| {
                         tracing::warn!(
@@ -543,6 +550,8 @@ impl MintMetadataCache {
                     .ok();
             }
         }
+
+        let _ = tx.commit().await.ok();
     }
 
     /// Fetch fresh metadata from mint HTTP API and update cache

+ 19 - 13
crates/cdk/src/wallet/mod.rs

@@ -8,7 +8,7 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use cdk_common::amount::FeeAndAmounts;
-use cdk_common::database::{self, WalletDatabase};
+use cdk_common::database::{self, DynWalletDatabaseTransaction, WalletDatabase};
 use cdk_common::parking_lot::RwLock;
 use cdk_common::subscription::WalletParams;
 use getrandom::getrandom;
@@ -271,9 +271,10 @@ impl Wallet {
     #[instrument(skip(self))]
     pub async fn update_mint_url(&mut self, new_mint_url: MintUrl) -> Result<(), Error> {
         // Update the mint URL in the wallet DB
-        self.localstore
-            .update_mint_url(self.mint_url.clone(), new_mint_url.clone())
+        let mut tx = self.localstore.begin_db_transaction().await?;
+        tx.update_mint_url(self.mint_url.clone(), new_mint_url.clone())
             .await?;
+        tx.commit().await?;
 
         // Update the mint URL in the wallet struct field
         self.mint_url = new_mint_url;
@@ -367,12 +368,15 @@ impl Wallet {
     }
 
     /// Get amounts needed to refill proof state
-    #[instrument(skip(self))]
-    pub async fn amounts_needed_for_state_target(
+    #[instrument(skip(self, tx))]
+    pub(crate) async fn amounts_needed_for_state_target(
         &self,
+        tx: &mut DynWalletDatabaseTransaction<'_>,
         fee_and_amounts: &FeeAndAmounts,
     ) -> Result<Vec<Amount>, Error> {
-        let unspent_proofs = self.get_unspent_proofs().await?;
+        let unspent_proofs = self
+            .get_proofs_with(Some(tx), Some(vec![State::Unspent]), None)
+            .await?;
 
         let amounts_count: HashMap<u64, u64> =
             unspent_proofs
@@ -402,14 +406,15 @@ impl Wallet {
     }
 
     /// Determine [`SplitTarget`] for amount based on state
-    #[instrument(skip(self))]
+    #[instrument(skip(self, tx))]
     async fn determine_split_target_values(
         &self,
+        tx: &mut DynWalletDatabaseTransaction<'_>,
         change_amount: Amount,
         fee_and_amounts: &FeeAndAmounts,
     ) -> Result<SplitTarget, Error> {
         let mut amounts_needed_refill = self
-            .amounts_needed_for_state_target(fee_and_amounts)
+            .amounts_needed_for_state_target(tx, fee_and_amounts)
             .await?;
 
         amounts_needed_refill.sort();
@@ -495,9 +500,10 @@ impl Wallet {
 
                 tracing::debug!("Restored {} proofs", proofs.len());
 
-                self.localstore
-                    .increment_keyset_counter(&keyset.id, proofs.len() as u32)
+                let mut tx = self.localstore.begin_db_transaction().await?;
+                tx.increment_keyset_counter(&keyset.id, proofs.len() as u32)
                     .await?;
+                tx.commit().await?;
 
                 let states = self.check_proofs_spent(proofs.clone()).await?;
 
@@ -523,9 +529,9 @@ impl Wallet {
                     })
                     .collect::<Result<Vec<ProofInfo>, _>>()?;
 
-                self.localstore
-                    .update_proofs(unspent_proofs, vec![])
-                    .await?;
+                let mut tx = self.localstore.begin_db_transaction().await?;
+                tx.update_proofs(unspent_proofs, vec![]).await?;
+                tx.commit().await?;
 
                 empty_batch = 0;
                 start_counter += 100;

+ 45 - 26
crates/cdk/src/wallet/proofs.rs

@@ -1,6 +1,7 @@
 use std::collections::{HashMap, HashSet};
 
 use cdk_common::amount::KeysetFeeAndAmounts;
+use cdk_common::database::DynWalletDatabaseTransaction;
 use cdk_common::wallet::TransactionId;
 use cdk_common::Id;
 use tracing::instrument;
@@ -18,38 +19,40 @@ impl Wallet {
     /// Get unspent proofs for mint
     #[instrument(skip(self))]
     pub async fn get_unspent_proofs(&self) -> Result<Proofs, Error> {
-        self.get_proofs_with(Some(vec![State::Unspent]), None).await
+        self.get_proofs_with(None, Some(vec![State::Unspent]), None)
+            .await
     }
 
     /// Get pending [`Proofs`]
     #[instrument(skip(self))]
     pub async fn get_pending_proofs(&self) -> Result<Proofs, Error> {
-        self.get_proofs_with(Some(vec![State::Pending]), None).await
+        self.get_proofs_with(None, Some(vec![State::Pending]), None)
+            .await
     }
 
     /// Get reserved [`Proofs`]
     #[instrument(skip(self))]
     pub async fn get_reserved_proofs(&self) -> Result<Proofs, Error> {
-        self.get_proofs_with(Some(vec![State::Reserved]), None)
+        self.get_proofs_with(None, Some(vec![State::Reserved]), None)
             .await
     }
 
     /// Get pending spent [`Proofs`]
     #[instrument(skip(self))]
     pub async fn get_pending_spent_proofs(&self) -> Result<Proofs, Error> {
-        self.get_proofs_with(Some(vec![State::PendingSpent]), None)
+        self.get_proofs_with(None, Some(vec![State::PendingSpent]), None)
             .await
     }
 
     /// Get this wallet's [Proofs] that match the args
     pub async fn get_proofs_with(
         &self,
+        tx: Option<&mut DynWalletDatabaseTransaction<'_>>,
         state: Option<Vec<State>>,
         spending_conditions: Option<Vec<SpendingConditions>>,
     ) -> Result<Proofs, Error> {
-        Ok(self
-            .localstore
-            .get_proofs(
+        Ok(if let Some(tx) = tx {
+            tx.get_proofs(
                 Some(self.mint_url.clone()),
                 Some(self.unit.clone()),
                 state,
@@ -58,16 +61,28 @@ impl Wallet {
             .await?
             .into_iter()
             .map(|p| p.proof)
-            .collect())
+            .collect()
+        } else {
+            self.localstore
+                .get_proofs(
+                    Some(self.mint_url.clone()),
+                    Some(self.unit.clone()),
+                    state,
+                    spending_conditions,
+                )
+                .await?
+                .into_iter()
+                .map(|p| p.proof)
+                .collect()
+        })
     }
 
     /// Return proofs to unspent allowing them to be selected and spent
     #[instrument(skip(self))]
     pub async fn unreserve_proofs(&self, ys: Vec<PublicKey>) -> Result<(), Error> {
-        Ok(self
-            .localstore
-            .update_proofs_state(ys, State::Unspent)
-            .await?)
+        let mut tx = self.localstore.begin_db_transaction().await?;
+        tx.update_proofs_state(ys, State::Unspent).await?;
+        Ok(tx.commit().await?)
     }
 
     /// Reclaim unspent proofs
@@ -93,13 +108,14 @@ impl Wallet {
 
         self.swap(None, SplitTarget::default(), unspent, None, false)
             .await?;
-
-        match self.localstore.remove_transaction(transaction_id).await {
-            Ok(_) => (),
-            Err(e) => {
-                tracing::warn!("Failed to remove transaction: {:?}", e);
-            }
-        }
+        let mut tx = self.localstore.begin_db_transaction().await?;
+        let _ = tx
+            .remove_transaction(transaction_id)
+            .await
+            .inspect_err(|err| {
+                tracing::warn!("Failed to remove transaction: {:?}", err);
+            });
+        tx.commit().await?;
 
         Ok(())
     }
@@ -121,7 +137,9 @@ impl Wallet {
             })
             .collect();
 
-        self.localstore.update_proofs(vec![], spent_ys).await?;
+        let mut tx = self.localstore.begin_db_transaction().await?;
+        tx.update_proofs(vec![], spent_ys).await?;
+        tx.commit().await?;
 
         Ok(spendable.states)
     }
@@ -165,12 +183,13 @@ impl Wallet {
 
         let amount = Amount::try_sum(pending_proofs.iter().map(|p| p.proof.amount))?;
 
-        self.localstore
-            .update_proofs(
-                vec![],
-                non_pending_proofs.into_iter().map(|p| p.y).collect(),
-            )
-            .await?;
+        let mut tx = self.localstore.begin_db_transaction().await?;
+        tx.update_proofs(
+            vec![],
+            non_pending_proofs.into_iter().map(|p| p.y).collect(),
+        )
+        .await?;
+        tx.commit().await?;
 
         balance += amount;
 

+ 29 - 28
crates/cdk/src/wallet/receive.rs

@@ -110,12 +110,12 @@ impl Wallet {
             .into_iter()
             .map(|p| ProofInfo::new(p, self.mint_url.clone(), State::Pending, self.unit.clone()))
             .collect::<Result<Vec<ProofInfo>, _>>()?;
-        self.localstore
-            .update_proofs(proofs_info.clone(), vec![])
-            .await?;
+
+        let mut tx = self.localstore.begin_db_transaction().await?;
+        tx.update_proofs(proofs_info.clone(), vec![]).await?;
 
         let mut pre_swap = self
-            .create_swap(None, opts.amount_split_target, proofs, None, false)
+            .create_swap(tx, None, opts.amount_split_target, proofs, None, false)
             .await?;
 
         if sig_flag.eq(&SigFlag::SigAll) {
@@ -141,8 +141,8 @@ impl Wallet {
             &keys,
         )?;
 
-        self.localstore
-            .increment_keyset_counter(&active_keyset_id, recv_proofs.len() as u32)
+        let mut tx = self.localstore.begin_db_transaction().await?;
+        tx.increment_keyset_counter(&active_keyset_id, recv_proofs.len() as u32)
             .await?;
 
         let total_amount = recv_proofs.total_amount()?;
@@ -151,30 +151,31 @@ impl Wallet {
             .into_iter()
             .map(|proof| ProofInfo::new(proof, mint_url.clone(), State::Unspent, self.unit.clone()))
             .collect::<Result<Vec<ProofInfo>, _>>()?;
-        self.localstore
-            .update_proofs(
-                recv_proof_infos,
-                proofs_info.into_iter().map(|p| p.y).collect(),
-            )
-            .await?;
+
+        tx.update_proofs(
+            recv_proof_infos,
+            proofs_info.into_iter().map(|p| p.y).collect(),
+        )
+        .await?;
 
         // Add transaction to store
-        self.localstore
-            .add_transaction(Transaction {
-                mint_url: self.mint_url.clone(),
-                direction: TransactionDirection::Incoming,
-                amount: total_amount,
-                fee: proofs_amount - total_amount,
-                unit: self.unit.clone(),
-                ys: proofs_ys,
-                timestamp: unix_time(),
-                memo,
-                metadata: opts.metadata,
-                quote_id: None,
-                payment_request: None,
-                payment_proof: None,
-            })
-            .await?;
+        tx.add_transaction(Transaction {
+            mint_url: self.mint_url.clone(),
+            direction: TransactionDirection::Incoming,
+            amount: total_amount,
+            fee: proofs_amount - total_amount,
+            unit: self.unit.clone(),
+            ys: proofs_ys,
+            timestamp: unix_time(),
+            memo,
+            metadata: opts.metadata,
+            quote_id: None,
+            payment_request: None,
+            payment_proof: None,
+        })
+        .await?;
+
+        tx.commit().await?;
 
         Ok(total_amount)
     }

+ 12 - 9
crates/cdk/src/wallet/reclaim.rs

@@ -45,6 +45,8 @@ impl Wallet {
             .await?
             .states;
 
+        let mut tx = self.localstore.begin_db_transaction().await?;
+
         for (state, unspent) in proofs
             .into_iter()
             .zip(statuses)
@@ -54,17 +56,18 @@ impl Wallet {
                 acc
             })
         {
-            self.localstore
-                .update_proofs_state(
-                    unspent
-                        .iter()
-                        .map(|x| x.y())
-                        .collect::<Result<Vec<_>, _>>()?,
-                    state,
-                )
-                .await?;
+            tx.update_proofs_state(
+                unspent
+                    .iter()
+                    .map(|x| x.y())
+                    .collect::<Result<Vec<_>, _>>()?,
+                state,
+            )
+            .await?;
         }
 
+        tx.commit().await?;
+
         Ok(())
     }
 

+ 39 - 26
crates/cdk/src/wallet/send.rs

@@ -44,6 +44,7 @@ impl Wallet {
         // Get available proofs matching conditions
         let mut available_proofs = self
             .get_proofs_with(
+                None,
                 Some(vec![State::Unspent]),
                 opts.conditions.clone().map(|c| vec![c]),
             )
@@ -152,9 +153,10 @@ impl Wallet {
         tracing::debug!("Send amounts: {:?}", send_amounts);
         tracing::debug!("Send fee: {:?}", send_fee);
 
+        let mut tx = self.localstore.begin_db_transaction().await?;
+
         // Reserve proofs
-        self.localstore
-            .update_proofs_state(proofs.ys()?, State::Reserved)
+        tx.update_proofs_state(proofs.ys()?, State::Reserved)
             .await?;
 
         // Check if proofs are exact send amount (and does not exceed max_proofs)
@@ -188,6 +190,8 @@ impl Wallet {
         // Calculate swap fee
         let swap_fee = self.get_proofs_fee(&proofs_to_swap).await?;
 
+        tx.commit().await?;
+
         // Return prepared send
         Ok(PreparedSend {
             wallet: self.clone(),
@@ -305,10 +309,13 @@ impl PreparedSend {
             return Err(Error::InsufficientFunds);
         }
 
+        let mut tx = self.wallet.localstore.begin_db_transaction().await?;
+
         // Check if proofs are reserved or unspent
         let sendable_proof_ys = self
             .wallet
             .get_proofs_with(
+                Some(&mut tx),
                 Some(vec![State::Reserved, State::Unspent]),
                 self.options.conditions.clone().map(|c| vec![c]),
             )
@@ -328,9 +335,8 @@ impl PreparedSend {
             "Updating proofs state to pending spent: {:?}",
             proofs_to_send.ys()?
         );
-        self.wallet
-            .localstore
-            .update_proofs_state(proofs_to_send.ys()?, State::PendingSpent)
+
+        tx.update_proofs_state(proofs_to_send.ys()?, State::PendingSpent)
             .await?;
 
         // Include token memo
@@ -338,23 +344,23 @@ impl PreparedSend {
         let memo = send_memo.and_then(|m| if m.include_memo { Some(m.memo) } else { None });
 
         // Add transaction to store
-        self.wallet
-            .localstore
-            .add_transaction(Transaction {
-                mint_url: self.wallet.mint_url.clone(),
-                direction: TransactionDirection::Outgoing,
-                amount: self.amount,
-                fee: total_send_fee,
-                unit: self.wallet.unit.clone(),
-                ys: proofs_to_send.ys()?,
-                timestamp: unix_time(),
-                memo: memo.clone(),
-                metadata: self.options.metadata,
-                quote_id: None,
-                payment_request: None,
-                payment_proof: None,
-            })
-            .await?;
+        tx.add_transaction(Transaction {
+            mint_url: self.wallet.mint_url.clone(),
+            direction: TransactionDirection::Outgoing,
+            amount: self.amount,
+            fee: total_send_fee,
+            unit: self.wallet.unit.clone(),
+            ys: proofs_to_send.ys()?,
+            timestamp: unix_time(),
+            memo: memo.clone(),
+            metadata: self.options.metadata,
+            quote_id: None,
+            payment_request: None,
+            payment_proof: None,
+        })
+        .await?;
+
+        tx.commit().await?;
 
         // Create and return token
         Ok(Token::new(
@@ -369,8 +375,15 @@ impl PreparedSend {
     pub async fn cancel(self) -> Result<(), Error> {
         tracing::info!("Cancelling prepared send");
 
+        let mut tx = self.wallet.localstore.begin_db_transaction().await?;
+
         // Double-check proofs state
-        let reserved_proofs = self.wallet.get_reserved_proofs().await?.ys()?;
+        let reserved_proofs = self
+            .wallet
+            .get_proofs_with(Some(&mut tx), Some(vec![State::Reserved]), None)
+            .await?
+            .ys()?;
+
         if !self
             .proofs()
             .ys()?
@@ -380,11 +393,11 @@ impl PreparedSend {
             return Err(Error::UnexpectedProofState);
         }
 
-        self.wallet
-            .localstore
-            .update_proofs_state(self.proofs().ys()?, State::Unspent)
+        tx.update_proofs_state(self.proofs().ys()?, State::Unspent)
             .await?;
 
+        tx.commit().await?;
+
         Ok(())
     }
 }

+ 18 - 16
crates/cdk/src/wallet/swap.rs

@@ -1,3 +1,4 @@
+use cdk_common::database::DynWalletDatabaseTransaction;
 use cdk_common::nut02::KeySetInfosMethods;
 use tracing::instrument;
 
@@ -27,6 +28,7 @@ impl Wallet {
 
         let pre_swap = self
             .create_swap(
+                self.localstore.begin_db_transaction().await?,
                 amount,
                 amount_split_target.clone(),
                 input_proofs.clone(),
@@ -133,9 +135,11 @@ impl Wallet {
             .map(|proof| proof.y())
             .collect::<Result<Vec<PublicKey>, _>>()?;
 
-        self.localstore
-            .update_proofs(added_proofs, deleted_ys)
-            .await?;
+        let mut tx = self.localstore.begin_db_transaction().await?;
+
+        tx.update_proofs(added_proofs, deleted_ys).await?;
+        tx.commit().await?;
+
         Ok(send_proofs)
     }
 
@@ -196,9 +200,10 @@ impl Wallet {
     }
 
     /// Create Swap Payload
-    #[instrument(skip(self, proofs))]
+    #[instrument(skip(self, proofs, tx))]
     pub async fn create_swap(
         &self,
+        mut tx: DynWalletDatabaseTransaction<'_>,
         amount: Option<Amount>,
         amount_split_target: SplitTarget,
         proofs: Proofs,
@@ -210,13 +215,13 @@ impl Wallet {
 
         // Desired amount is either amount passed or value of all proof
         let proofs_total = proofs.total_amount()?;
-
-        let ys: Vec<PublicKey> = proofs.ys()?;
-        self.localstore
-            .update_proofs_state(ys, State::Reserved)
+        let fee = self.get_proofs_fee(&proofs).await?;
+        let fee_and_amounts = self
+            .get_keyset_fees_and_amounts_by_id(active_keyset_id)
             .await?;
 
-        let fee = self.get_proofs_fee(&proofs).await?;
+        let ys: Vec<PublicKey> = proofs.ys()?;
+        tx.update_proofs_state(ys, State::Reserved).await?;
 
         let total_to_subtract = amount
             .unwrap_or(Amount::ZERO)
@@ -227,10 +232,6 @@ impl Wallet {
             .checked_sub(total_to_subtract)
             .ok_or(Error::InsufficientFunds)?;
 
-        let fee_and_amounts = self
-            .get_keyset_fees_and_amounts_by_id(active_keyset_id)
-            .await?;
-
         let (send_amount, change_amount) = match include_fees {
             true => {
                 let split_count = amount
@@ -259,7 +260,7 @@ impl Wallet {
         // else use state refill
         let change_split_target = match amount_split_target {
             SplitTarget::None => {
-                self.determine_split_target_values(change_amount, &fee_and_amounts)
+                self.determine_split_target_values(&mut tx, change_amount, &fee_and_amounts)
                     .await?
             }
             s => s,
@@ -296,8 +297,7 @@ impl Wallet {
                 total_secrets_needed
             );
 
-            let new_counter = self
-                .localstore
+            let new_counter = tx
                 .increment_keyset_counter(&active_keyset_id, total_secrets_needed)
                 .await?;
 
@@ -366,6 +366,8 @@ impl Wallet {
 
         let swap_request = SwapRequest::new(proofs, desired_messages.blinded_messages());
 
+        tx.commit().await?;
+
         Ok(PreSwap {
             pre_mint_secrets: desired_messages,
             swap_request,

برخی فایل ها در این مقایسه diff نمایش داده نمی شوند زیرا تعداد فایل ها بسیار زیاد است