浏览代码

Implement transactions for redb

Cesar Rodas 1 月之前
父节点
当前提交
5ea135842e
共有 1 个文件被更改,包括 512 次插入447 次删除
  1. 512 447
      crates/cdk-redb/src/wallet/mod.rs

+ 512 - 447
crates/cdk-redb/src/wallet/mod.rs

@@ -54,6 +54,29 @@ pub struct WalletRedbDatabase {
     db: Arc<Database>,
 }
 
+/// Redb Wallet Transaction
+pub struct RedbWalletTransaction {
+    write_txn: Option<redb::WriteTransaction>,
+}
+
+impl RedbWalletTransaction {
+    /// Create a new transaction
+    fn new(write_txn: redb::WriteTransaction) -> Self {
+        Self {
+            write_txn: Some(write_txn),
+        }
+    }
+
+    /// Get a mutable reference to the write transaction
+    fn txn(&mut self) -> Result<&mut redb::WriteTransaction, Error> {
+        self.write_txn.as_mut().ok_or_else(|| {
+            Error::CDKDatabase(database::Error::Internal(
+                "Transaction already consumed".to_owned(),
+            ))
+        })
+    }
+}
+
 impl WalletRedbDatabase {
     /// Create new [`WalletRedbDatabase`]
     pub fn new(path: &Path) -> Result<Self, Error> {
@@ -189,45 +212,6 @@ impl WalletDatabase for WalletRedbDatabase {
     type Err = database::Error;
 
     #[instrument(skip(self))]
-    async fn add_mint(
-        &self,
-        mint_url: MintUrl,
-        mint_info: Option<MintInfo>,
-    ) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        {
-            let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
-            table
-                .insert(
-                    mint_url.to_string().as_str(),
-                    serde_json::to_string(&mint_info)
-                        .map_err(Error::from)?
-                        .as_str(),
-                )
-                .map_err(Error::from)?;
-        }
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
-    #[instrument(skip(self))]
-    async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        {
-            let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
-            table
-                .remove(mint_url.to_string().as_str())
-                .map_err(Error::from)?;
-        }
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
-    #[instrument(skip(self))]
     async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, Self::Err> {
         let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
         let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
@@ -262,149 +246,6 @@ impl WalletDatabase for WalletRedbDatabase {
     }
 
     #[instrument(skip(self))]
-    async fn update_mint_url(
-        &self,
-        old_mint_url: MintUrl,
-        new_mint_url: MintUrl,
-    ) -> Result<(), Self::Err> {
-        // Update proofs table
-        {
-            let proofs = self
-                .get_proofs(Some(old_mint_url.clone()), None, None, None)
-                .await
-                .map_err(Error::from)?;
-
-            // Proofs with new url
-            let updated_proofs: Vec<ProofInfo> = proofs
-                .clone()
-                .into_iter()
-                .map(|mut p| {
-                    p.mint_url = new_mint_url.clone();
-                    p
-                })
-                .collect();
-
-            if !updated_proofs.is_empty() {
-                self.update_proofs(updated_proofs, vec![]).await?;
-            }
-        }
-
-        // Update mint quotes
-        {
-            let quotes = self.get_mint_quotes().await?;
-
-            let unix_time = unix_time();
-
-            let quotes: Vec<MintQuote> = quotes
-                .into_iter()
-                .filter_map(|mut q| {
-                    if q.expiry < unix_time {
-                        q.mint_url = new_mint_url.clone();
-                        Some(q)
-                    } else {
-                        None
-                    }
-                })
-                .collect();
-
-            for quote in quotes {
-                self.add_mint_quote(quote).await?;
-            }
-        }
-
-        Ok(())
-    }
-
-    #[instrument(skip(self))]
-    async fn add_mint_keysets(
-        &self,
-        mint_url: MintUrl,
-        keysets: Vec<KeySetInfo>,
-    ) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        let mut existing_u32 = false;
-
-        {
-            let mut table = write_txn
-                .open_multimap_table(MINT_KEYSETS_TABLE)
-                .map_err(Error::from)?;
-            let mut keysets_table = write_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
-            let mut u32_table = write_txn
-                .open_table(KEYSET_U32_MAPPING)
-                .map_err(Error::from)?;
-
-            for keyset in keysets {
-                // Check if keyset already exists
-                let existing_keyset = {
-                    let existing_keyset = keysets_table
-                        .get(keyset.id.to_bytes().as_slice())
-                        .map_err(Error::from)?;
-
-                    existing_keyset.map(|r| r.value().to_string())
-                };
-
-                let existing = u32_table
-                    .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
-                    .map_err(Error::from)?;
-
-                match existing {
-                    None => existing_u32 = false,
-                    Some(id) => {
-                        let id = Id::from_str(id.value())?;
-
-                        if id == keyset.id {
-                            existing_u32 = false;
-                        } else {
-                            println!("Breaking here");
-                            existing_u32 = true;
-                            break;
-                        }
-                    }
-                }
-
-                let keyset = if let Some(existing_keyset) = existing_keyset {
-                    let mut existing_keyset: KeySetInfo = serde_json::from_str(&existing_keyset)?;
-
-                    existing_keyset.active = keyset.active;
-                    existing_keyset.input_fee_ppk = keyset.input_fee_ppk;
-
-                    existing_keyset
-                } else {
-                    table
-                        .insert(
-                            mint_url.to_string().as_str(),
-                            keyset.id.to_bytes().as_slice(),
-                        )
-                        .map_err(Error::from)?;
-
-                    keyset
-                };
-
-                keysets_table
-                    .insert(
-                        keyset.id.to_bytes().as_slice(),
-                        serde_json::to_string(&keyset)
-                            .map_err(Error::from)?
-                            .as_str(),
-                    )
-                    .map_err(Error::from)?;
-            }
-        }
-
-        if existing_u32 {
-            tracing::warn!("Keyset already exists for keyset id");
-            write_txn.abort().map_err(Error::from)?;
-
-            return Err(database::Error::Duplicate);
-        }
-
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
-    #[instrument(skip(self))]
     async fn get_mint_keysets(
         &self,
         mint_url: MintUrl,
@@ -462,27 +303,6 @@ impl WalletDatabase for WalletRedbDatabase {
     }
 
     #[instrument(skip_all)]
-    async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        {
-            let mut table = write_txn
-                .open_table(MINT_QUOTES_TABLE)
-                .map_err(Error::from)?;
-            table
-                .insert(
-                    quote.id.as_str(),
-                    serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
-                )
-                .map_err(Error::from)?;
-        }
-
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
-    #[instrument(skip_all)]
     async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, Self::Err> {
         let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
         let table = read_txn
@@ -512,43 +332,6 @@ impl WalletDatabase for WalletRedbDatabase {
     }
 
     #[instrument(skip_all)]
-    async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        {
-            let mut table = write_txn
-                .open_table(MINT_QUOTES_TABLE)
-                .map_err(Error::from)?;
-            table.remove(quote_id).map_err(Error::from)?;
-        }
-
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
-    #[instrument(skip_all)]
-    async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        {
-            let mut table = write_txn
-                .open_table(MELT_QUOTES_TABLE)
-                .map_err(Error::from)?;
-            table
-                .insert(
-                    quote.id.as_str(),
-                    serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
-                )
-                .map_err(Error::from)?;
-        }
-
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
-    #[instrument(skip_all)]
     async fn get_melt_quote(&self, quote_id: &str) -> Result<Option<wallet::MeltQuote>, Self::Err> {
         let read_txn = self.db.begin_read().map_err(Error::from)?;
         let table = read_txn
@@ -577,74 +360,6 @@ impl WalletDatabase for WalletRedbDatabase {
             .collect())
     }
 
-    #[instrument(skip_all)]
-    async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        {
-            let mut table = write_txn
-                .open_table(MELT_QUOTES_TABLE)
-                .map_err(Error::from)?;
-            table.remove(quote_id).map_err(Error::from)?;
-        }
-
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
-    #[instrument(skip_all)]
-    async fn add_keys(&self, keyset: KeySet) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        keyset.verify_id()?;
-
-        let existing_keys;
-        let existing_u32;
-
-        {
-            let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
-
-            existing_keys = table
-                .insert(
-                    keyset.id.to_string().as_str(),
-                    serde_json::to_string(&keyset.keys)
-                        .map_err(Error::from)?
-                        .as_str(),
-                )
-                .map_err(Error::from)?
-                .is_some();
-
-            let mut table = write_txn
-                .open_table(KEYSET_U32_MAPPING)
-                .map_err(Error::from)?;
-
-            let existing = table
-                .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
-                .map_err(Error::from)?;
-
-            match existing {
-                None => existing_u32 = false,
-                Some(id) => {
-                    let id = Id::from_str(id.value())?;
-
-                    existing_u32 = id != keyset.id;
-                }
-            }
-        }
-
-        if existing_keys || existing_u32 {
-            tracing::warn!("Keys already exist for keyset id");
-            write_txn.abort().map_err(Error::from)?;
-
-            return Err(database::Error::Duplicate);
-        }
-
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
     #[instrument(skip(self), fields(keyset_id = %keyset_id))]
     async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, Self::Err> {
         let read_txn = self.db.begin_read().map_err(Error::from)?;
@@ -660,54 +375,6 @@ impl WalletDatabase for WalletRedbDatabase {
         Ok(None)
     }
 
-    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
-    async fn remove_keys(&self, keyset_id: &Id) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        {
-            let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
-
-            table
-                .remove(keyset_id.to_string().as_str())
-                .map_err(Error::from)?;
-        }
-
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
-    #[instrument(skip(self, added, deleted_ys))]
-    async fn update_proofs(
-        &self,
-        added: Vec<ProofInfo>,
-        deleted_ys: Vec<PublicKey>,
-    ) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
-
-        {
-            let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
-
-            for proof_info in added.iter() {
-                table
-                    .insert(
-                        proof_info.y.to_bytes().as_slice(),
-                        serde_json::to_string(&proof_info)
-                            .map_err(Error::from)?
-                            .as_str(),
-                    )
-                    .map_err(Error::from)?;
-            }
-
-            for y in deleted_ys.iter() {
-                table.remove(y.to_bytes().as_slice()).map_err(Error::from)?;
-            }
-        }
-        write_txn.commit().map_err(Error::from)?;
-
-        Ok(())
-    }
-
     #[instrument(skip_all)]
     async fn get_proofs(
         &self,
@@ -753,168 +420,566 @@ impl WalletDatabase for WalletRedbDatabase {
         Ok(proofs.iter().map(|p| u64::from(p.proof.amount)).sum())
     }
 
-    async fn update_proofs_state(
+    #[instrument(skip(self))]
+    async fn get_transaction(
         &self,
-        ys: Vec<PublicKey>,
-        state: State,
-    ) -> Result<(), database::Error> {
+        transaction_id: TransactionId,
+    ) -> Result<Option<Transaction>, Self::Err> {
         let read_txn = self.db.begin_read().map_err(Error::from)?;
-        let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
+        let table = read_txn
+            .open_table(TRANSACTIONS_TABLE)
+            .map_err(Error::from)?;
 
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
+        if let Some(transaction) = table.get(transaction_id.as_slice()).map_err(Error::from)? {
+            return Ok(serde_json::from_str(transaction.value()).map_err(Error::from)?);
+        }
 
-        for y in ys {
-            let y_slice = y.to_bytes();
-            let proof = table
-                .get(y_slice.as_slice())
-                .map_err(Error::from)?
-                .ok_or(Error::UnknownY)?;
+        Ok(None)
+    }
 
-            let mut proof_info =
-                serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
+    #[instrument(skip(self))]
+    async fn list_transactions(
+        &self,
+        mint_url: Option<MintUrl>,
+        direction: Option<TransactionDirection>,
+        unit: Option<CurrencyUnit>,
+    ) -> Result<Vec<Transaction>, Self::Err> {
+        let read_txn = self.db.begin_read().map_err(Error::from)?;
 
-            proof_info.state = state;
+        let table = read_txn
+            .open_table(TRANSACTIONS_TABLE)
+            .map_err(Error::from)?;
 
-            {
-                let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
-                table
-                    .insert(
-                        y_slice.as_slice(),
-                        serde_json::to_string(&proof_info)
-                            .map_err(Error::from)?
-                            .as_str(),
-                    )
-                    .map_err(Error::from)?;
-            }
-        }
+        let transactions: Vec<Transaction> = table
+            .iter()
+            .map_err(Error::from)?
+            .flatten()
+            .filter_map(|(_k, v)| {
+                let mut transaction = None;
+
+                if let Ok(tx) = serde_json::from_str::<Transaction>(v.value()) {
+                    if tx.matches_conditions(&mint_url, &direction, &unit) {
+                        transaction = Some(tx)
+                    }
+                }
 
-        write_txn.commit().map_err(Error::from)?;
+                transaction
+            })
+            .collect();
 
-        Ok(())
+        Ok(transactions)
     }
 
-    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
-    async fn increment_keyset_counter(&self, keyset_id: &Id, count: u32) -> Result<u32, Self::Err> {
+    async fn begin_db_transaction<'a>(
+        &'a self,
+    ) -> Result<
+        Box<dyn cdk_common::database::WalletDatabaseTransaction<'a, Self::Err> + Send + Sync + 'a>,
+        Self::Err,
+    > {
         let write_txn = self.db.begin_write().map_err(Error::from)?;
+        Ok(Box::new(RedbWalletTransaction::new(write_txn)))
+    }
+}
 
-        let current_counter;
-        let new_counter;
-        {
-            let table = write_txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
-            let counter = table
-                .get(keyset_id.to_string().as_str())
-                .map_err(Error::from)?;
+#[async_trait]
+impl<'a> cdk_common::database::WalletDatabaseTransaction<'a, database::Error>
+    for RedbWalletTransaction
+{
+    #[instrument(skip(self, mint_info))]
+    async fn add_mint(
+        &mut self,
+        mint_url: MintUrl,
+        mint_info: Option<MintInfo>,
+    ) -> Result<(), database::Error> {
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+        let mut table = txn.open_table(MINTS_TABLE).map_err(Error::from)?;
+        table
+            .insert(
+                mint_url.to_string().as_str(),
+                serde_json::to_string(&mint_info)
+                    .map_err(Error::from)?
+                    .as_str(),
+            )
+            .map_err(Error::from)?;
+        Ok(())
+    }
 
-            current_counter = match counter {
-                Some(c) => c.value(),
-                None => 0,
-            };
+    #[instrument(skip(self))]
+    async fn remove_mint(&mut self, mint_url: MintUrl) -> Result<(), database::Error> {
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+        let mut table = txn.open_table(MINTS_TABLE).map_err(Error::from)?;
+        table
+            .remove(mint_url.to_string().as_str())
+            .map_err(Error::from)?;
+        Ok(())
+    }
 
-            new_counter = current_counter + count;
+    #[instrument(skip(self))]
+    async fn update_mint_url(
+        &mut self,
+        old_mint_url: MintUrl,
+        new_mint_url: MintUrl,
+    ) -> Result<(), database::Error> {
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+
+        // Get all proofs with old mint URL
+        let proofs_table = txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
+        let proofs: Vec<ProofInfo> = proofs_table
+            .iter()
+            .map_err(Error::from)?
+            .flatten()
+            .filter_map(|(_k, v)| {
+                if let Ok(proof_info) = serde_json::from_str::<ProofInfo>(v.value()) {
+                    if proof_info.matches_conditions(
+                        &Some(old_mint_url.clone()),
+                        &None,
+                        &None,
+                        &None,
+                    ) {
+                        return Some(proof_info);
+                    }
+                }
+                None
+            })
+            .collect();
+
+        // Update proofs
+        drop(proofs_table);
+        let mut proofs_table = txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
+        for mut proof_info in proofs {
+            proof_info.mint_url = new_mint_url.clone();
+            proofs_table
+                .insert(
+                    proof_info.y.to_bytes().as_slice(),
+                    serde_json::to_string(&proof_info)
+                        .map_err(Error::from)?
+                        .as_str(),
+                )
+                .map_err(Error::from)?;
         }
 
-        {
-            let mut table = write_txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
+        // Update mint quotes
+        let quotes_table = txn.open_table(MINT_QUOTES_TABLE).map_err(Error::from)?;
+        let unix_time = unix_time();
+        let quotes: Vec<MintQuote> = quotes_table
+            .iter()
+            .map_err(Error::from)?
+            .flatten()
+            .filter_map(|(_id, quote)| {
+                if let Ok(mut q) = serde_json::from_str::<MintQuote>(quote.value()) {
+                    if q.mint_url == old_mint_url && q.expiry >= unix_time {
+                        q.mint_url = new_mint_url.clone();
+                        return Some(q);
+                    }
+                }
+                None
+            })
+            .collect();
 
-            table
-                .insert(keyset_id.to_string().as_str(), new_counter)
+        drop(quotes_table);
+        let mut quotes_table = txn.open_table(MINT_QUOTES_TABLE).map_err(Error::from)?;
+        for quote in quotes {
+            quotes_table
+                .insert(
+                    quote.id.as_str(),
+                    serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
+                )
                 .map_err(Error::from)?;
         }
-        write_txn.commit().map_err(Error::from)?;
 
-        Ok(new_counter)
+        Ok(())
     }
 
     #[instrument(skip(self))]
-    async fn add_transaction(&self, transaction: Transaction) -> Result<(), Self::Err> {
-        let id = transaction.id();
+    async fn add_mint_keysets(
+        &mut self,
+        mint_url: MintUrl,
+        keysets: Vec<KeySetInfo>,
+    ) -> Result<(), database::Error> {
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
 
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
+        let mut existing_u32 = false;
+        let mut table = txn
+            .open_multimap_table(MINT_KEYSETS_TABLE)
+            .map_err(Error::from)?;
+        let mut keysets_table = txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
+        let mut u32_table = txn.open_table(KEYSET_U32_MAPPING).map_err(Error::from)?;
+
+        for keyset in keysets {
+            // Check if keyset already exists
+            let existing_keyset = {
+                let existing_keyset = keysets_table
+                    .get(keyset.id.to_bytes().as_slice())
+                    .map_err(Error::from)?;
 
-        {
-            let mut table = write_txn
-                .open_table(TRANSACTIONS_TABLE)
+                existing_keyset.map(|r| r.value().to_string())
+            };
+
+            let existing = u32_table
+                .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
                 .map_err(Error::from)?;
-            table
+
+            match existing {
+                None => existing_u32 = false,
+                Some(id) => {
+                    let id = Id::from_str(id.value())?;
+
+                    if id == keyset.id {
+                        existing_u32 = false;
+                    } else {
+                        existing_u32 = true;
+                        break;
+                    }
+                }
+            }
+
+            let keyset = if let Some(existing_keyset) = existing_keyset {
+                let mut existing_keyset: KeySetInfo = serde_json::from_str(&existing_keyset)?;
+
+                existing_keyset.active = keyset.active;
+                existing_keyset.input_fee_ppk = keyset.input_fee_ppk;
+
+                existing_keyset
+            } else {
+                table
+                    .insert(
+                        mint_url.to_string().as_str(),
+                        keyset.id.to_bytes().as_slice(),
+                    )
+                    .map_err(Error::from)?;
+
+                keyset
+            };
+
+            keysets_table
                 .insert(
-                    id.as_slice(),
-                    serde_json::to_string(&transaction)
+                    keyset.id.to_bytes().as_slice(),
+                    serde_json::to_string(&keyset)
                         .map_err(Error::from)?
                         .as_str(),
                 )
                 .map_err(Error::from)?;
         }
 
-        write_txn.commit().map_err(Error::from)?;
+        if existing_u32 {
+            return Err(database::Error::Duplicate);
+        }
 
         Ok(())
     }
 
     #[instrument(skip(self))]
-    async fn get_transaction(
-        &self,
-        transaction_id: TransactionId,
-    ) -> Result<Option<Transaction>, Self::Err> {
-        let read_txn = self.db.begin_read().map_err(Error::from)?;
-        let table = read_txn
-            .open_table(TRANSACTIONS_TABLE)
+    async fn get_mint_quote(
+        &mut self,
+        quote_id: &str,
+    ) -> Result<Option<MintQuote>, database::Error> {
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+        let table = txn.open_table(MINT_QUOTES_TABLE).map_err(Error::from)?;
+
+        if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
+            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
+        }
+
+        Ok(None)
+    }
+
+    #[instrument(skip(self))]
+    async fn add_mint_quote(&mut self, quote: MintQuote) -> Result<(), database::Error> {
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+        let mut table = txn.open_table(MINT_QUOTES_TABLE).map_err(Error::from)?;
+        table
+            .insert(
+                quote.id.as_str(),
+                serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
+            )
             .map_err(Error::from)?;
+        Ok(())
+    }
 
-        if let Some(transaction) = table.get(transaction_id.as_slice()).map_err(Error::from)? {
-            return Ok(serde_json::from_str(transaction.value()).map_err(Error::from)?);
+    #[instrument(skip(self))]
+    async fn remove_mint_quote(&mut self, quote_id: &str) -> Result<(), database::Error> {
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+        let mut table = txn.open_table(MINT_QUOTES_TABLE).map_err(Error::from)?;
+        table.remove(quote_id).map_err(Error::from)?;
+        Ok(())
+    }
+
+    #[instrument(skip(self))]
+    async fn get_melt_quote(
+        &mut self,
+        quote_id: &str,
+    ) -> Result<Option<wallet::MeltQuote>, database::Error> {
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+        let table = txn.open_table(MELT_QUOTES_TABLE).map_err(Error::from)?;
+
+        if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
+            return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
         }
 
         Ok(None)
     }
 
     #[instrument(skip(self))]
-    async fn list_transactions(
-        &self,
-        mint_url: Option<MintUrl>,
-        direction: Option<TransactionDirection>,
-        unit: Option<CurrencyUnit>,
-    ) -> Result<Vec<Transaction>, Self::Err> {
-        let read_txn = self.db.begin_read().map_err(Error::from)?;
+    async fn add_melt_quote(&mut self, quote: wallet::MeltQuote) -> Result<(), database::Error> {
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+        let mut table = txn.open_table(MELT_QUOTES_TABLE).map_err(Error::from)?;
+        table
+            .insert(
+                quote.id.as_str(),
+                serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
+            )
+            .map_err(Error::from)?;
+        Ok(())
+    }
 
-        let table = read_txn
-            .open_table(TRANSACTIONS_TABLE)
+    #[instrument(skip(self))]
+    async fn remove_melt_quote(&mut self, quote_id: &str) -> Result<(), database::Error> {
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+        let mut table = txn.open_table(MELT_QUOTES_TABLE).map_err(Error::from)?;
+        table.remove(quote_id).map_err(Error::from)?;
+        Ok(())
+    }
+
+    #[instrument(skip_all)]
+    async fn add_keys(&mut self, keyset: KeySet) -> Result<(), database::Error> {
+        keyset.verify_id()?;
+
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+        let mut table = txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
+
+        let existing_keys = table
+            .insert(
+                keyset.id.to_string().as_str(),
+                serde_json::to_string(&keyset.keys)
+                    .map_err(Error::from)?
+                    .as_str(),
+            )
+            .map_err(Error::from)?
+            .is_some();
+
+        let mut table = txn.open_table(KEYSET_U32_MAPPING).map_err(Error::from)?;
+
+        let existing = table
+            .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
             .map_err(Error::from)?;
 
-        let transactions: Vec<Transaction> = table
+        let existing_u32 = match existing {
+            None => false,
+            Some(id) => {
+                let id = Id::from_str(id.value())?;
+                id != keyset.id
+            }
+        };
+
+        if existing_keys || existing_u32 {
+            return Err(database::Error::Duplicate);
+        }
+
+        Ok(())
+    }
+
+    #[instrument(skip(self), fields(keyset_id = %id))]
+    async fn remove_keys(&mut self, id: &Id) -> Result<(), database::Error> {
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+        let mut table = txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
+
+        table.remove(id.to_string().as_str()).map_err(Error::from)?;
+
+        Ok(())
+    }
+
+    #[instrument(skip(self))]
+    async fn get_proofs(
+        &mut self,
+        mint_url: Option<MintUrl>,
+        unit: Option<CurrencyUnit>,
+        state: Option<Vec<State>>,
+        spending_conditions: Option<Vec<SpendingConditions>>,
+    ) -> Result<Vec<ProofInfo>, database::Error> {
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+        let table = txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
+
+        let proofs: Vec<ProofInfo> = table
             .iter()
             .map_err(Error::from)?
             .flatten()
             .filter_map(|(_k, v)| {
-                let mut transaction = None;
+                let mut proof = None;
 
-                if let Ok(tx) = serde_json::from_str::<Transaction>(v.value()) {
-                    if tx.matches_conditions(&mint_url, &direction, &unit) {
-                        transaction = Some(tx)
+                if let Ok(proof_info) = serde_json::from_str::<ProofInfo>(v.value()) {
+                    if proof_info.matches_conditions(&mint_url, &unit, &state, &spending_conditions)
+                    {
+                        proof = Some(proof_info)
                     }
                 }
 
-                transaction
+                proof
             })
             .collect();
 
-        Ok(transactions)
+        Ok(proofs)
     }
 
-    #[instrument(skip(self))]
-    async fn remove_transaction(&self, transaction_id: TransactionId) -> Result<(), Self::Err> {
-        let write_txn = self.db.begin_write().map_err(Error::from)?;
+    #[instrument(skip(self, added, removed_ys))]
+    async fn update_proofs(
+        &mut self,
+        added: Vec<ProofInfo>,
+        removed_ys: Vec<PublicKey>,
+    ) -> Result<(), database::Error> {
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+        let mut table = txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
+
+        for proof_info in added.iter() {
+            table
+                .insert(
+                    proof_info.y.to_bytes().as_slice(),
+                    serde_json::to_string(&proof_info)
+                        .map_err(Error::from)?
+                        .as_str(),
+                )
+                .map_err(Error::from)?;
+        }
+
+        for y in removed_ys.iter() {
+            table.remove(y.to_bytes().as_slice()).map_err(Error::from)?;
+        }
 
+        Ok(())
+    }
+
+    #[instrument(skip(self, ys))]
+    async fn update_proofs_state(
+        &mut self,
+        ys: Vec<PublicKey>,
+        state: State,
+    ) -> Result<(), database::Error> {
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+
+        // First read all proofs
+        let table = txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
+        let mut proofs_to_update = Vec::new();
+
+        for y in ys {
+            let y_slice = y.to_bytes();
+            let proof = table
+                .get(y_slice.as_slice())
+                .map_err(Error::from)?
+                .ok_or(Error::UnknownY)?;
+
+            let mut proof_info =
+                serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
+
+            proof_info.state = state;
+            proofs_to_update.push((y_slice, proof_info));
+        }
+
+        // Now update them
+        drop(table);
+        let mut table = txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
+        for (y_slice, proof_info) in proofs_to_update {
+            table
+                .insert(
+                    y_slice.as_slice(),
+                    serde_json::to_string(&proof_info)
+                        .map_err(Error::from)?
+                        .as_str(),
+                )
+                .map_err(Error::from)?;
+        }
+
+        Ok(())
+    }
+
+    #[instrument(skip(self), fields(keyset_id = %keyset_id))]
+    async fn increment_keyset_counter(
+        &mut self,
+        keyset_id: &Id,
+        count: u32,
+    ) -> Result<u32, database::Error> {
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+
+        let current_counter;
         {
-            let mut table = write_txn
-                .open_table(TRANSACTIONS_TABLE)
+            let table = txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
+            let counter = table
+                .get(keyset_id.to_string().as_str())
                 .map_err(Error::from)?;
+
+            current_counter = match counter {
+                Some(c) => c.value(),
+                None => 0,
+            };
+        }
+
+        let new_counter = current_counter + count;
+
+        {
+            let mut table = txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
+
             table
-                .remove(transaction_id.as_slice())
+                .insert(keyset_id.to_string().as_str(), new_counter)
                 .map_err(Error::from)?;
         }
 
-        write_txn.commit().map_err(Error::from)?;
+        Ok(new_counter)
+    }
+
+    #[instrument(skip(self))]
+    async fn add_transaction(&mut self, transaction: Transaction) -> Result<(), database::Error> {
+        let id = transaction.id();
+
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+        let mut table = txn.open_table(TRANSACTIONS_TABLE).map_err(Error::from)?;
+        table
+            .insert(
+                id.as_slice(),
+                serde_json::to_string(&transaction)
+                    .map_err(Error::from)?
+                    .as_str(),
+            )
+            .map_err(Error::from)?;
+
+        Ok(())
+    }
+
+    #[instrument(skip(self))]
+    async fn remove_transaction(
+        &mut self,
+        transaction_id: TransactionId,
+    ) -> Result<(), database::Error> {
+        let txn = self.txn().map_err(Into::<database::Error>::into)?;
+        let mut table = txn.open_table(TRANSACTIONS_TABLE).map_err(Error::from)?;
+        table
+            .remove(transaction_id.as_slice())
+            .map_err(Error::from)?;
+
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl cdk_common::database::DbTransactionFinalizer for RedbWalletTransaction {
+    type Err = database::Error;
+
+    async fn commit(mut self: Box<Self>) -> Result<(), database::Error> {
+        if let Some(txn) = self.write_txn.take() {
+            txn.commit().map_err(Error::from)?;
+        }
+        Ok(())
+    }
 
+    async fn rollback(mut self: Box<Self>) -> Result<(), database::Error> {
+        if let Some(txn) = self.write_txn.take() {
+            txn.abort().map_err(Error::from)?;
+        }
         Ok(())
     }
 }
+
+impl Drop for RedbWalletTransaction {
+    fn drop(&mut self) {
+        if let Some(txn) = self.write_txn.take() {
+            let _ = txn.abort();
+        }
+    }
+}