浏览代码

Working on a better database abstraction

This is WIP commit

After [this question in the chat](https://matrix.to/#/!oJFtttFHGfnTGrIjvD:matrix.cashu.space/$oJFtttFHGfnTGrIjvD:matrix.cashu.space/$I5ZtjJtBM0ctltThDYpoCwClZFlM6PHzf8q2Rjqmso8)
regarding a database transaction within the same function, I realized a few
design flaws in our SQL database abstraction, particularly regarding
transactions.

1. Our upper abstraction got it right, where a transaction is bound with `&mut
   self`, so Rust knows how to handle its lifetime with' async/await'.
2. The raw database does not; instead, it returns &self, and beginning a
   transaction takes &self as well, which is problematic for Rust, but that's not
   all. It is fundamentally wrong. A transaction should take &mut self when
   beginning a transaction, as that connection is bound to a transaction and
   should not be returned to the pool. Currently, that responsibility lies with
   the implementor. If a mistake is made, a transaction could be executed in two
   or more connections.
3. The way a database is bound to our store layer is through a single struct,
   which may or may not internally utilize our connection pool. This is also
   another design flow, in this PR, a connection pool is owned, and to use a
   connection, it should be requested, and that connection is reference with
   mutable when beginning a transaction
Cesar Rodas 3 月之前
父节点
当前提交
b6a40c21f0
共有 3 个文件被更改,包括 329 次插入91 次删除
  1. 1 1
      crates/cdk-sql-common/src/database.rs
  2. 46 10
      crates/cdk-sql-common/src/mint/auth/mod.rs
  3. 282 80
      crates/cdk-sql-common/src/mint/mod.rs

+ 1 - 1
crates/cdk-sql-common/src/database.rs

@@ -42,7 +42,7 @@ pub trait DatabaseTransaction<'a>: Debug + DatabaseExecutor + Send + Sync {
 
 /// Database connector
 #[async_trait::async_trait]
-pub trait DatabaseConnector: Debug + DatabaseExecutor + Send + Sync {
+pub trait DatabaseConnector: Debug + DatabaseExecutor + Send + Sync + 'static {
     /// Transaction type for this database connection
     type Transaction<'a>: DatabaseTransaction<'a>
     where

+ 46 - 10
crates/cdk-sql-common/src/mint/auth/mod.rs

@@ -1,6 +1,7 @@
 //! SQL Mint Auth
 
 use std::collections::HashMap;
+use std::fmt::Debug;
 use std::marker::PhantomData;
 use std::str::FromStr;
 
@@ -17,6 +18,7 @@ use crate::column_as_string;
 use crate::common::migrate;
 use crate::database::{DatabaseConnector, DatabaseTransaction};
 use crate::mint::Error;
+use crate::pool::ResourceManager;
 use crate::stmt::query;
 
 /// Mint SQL Database
@@ -56,9 +58,11 @@ mod migrations;
 
 
 #[async_trait]
-impl<'a, T> MintAuthTransaction<database::Error> for SQLTransaction<'a, T>
+impl<'a, DB, RM, C> MintAuthTransaction<database::Error> for SQLTransaction<'a, DB, RM, C>
 where
-    T: DatabaseTransaction<'a>,
+    DB: DatabaseConnector,
+    RM: ResourceManager<Resource = DB, Config = C, Error = database::Error>,
+    C: Debug + Clone + Send + Sync,
 {
     #[instrument(skip(self))]
     async fn set_active_keyset(&mut self, id: Id) -> Result<(), database::Error> {
@@ -73,7 +77,11 @@ where
             "#,
         )?
         .bind("id", id.to_string())
-        .execute(&self.inner)
+        .execute(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await?;
 
         Ok(())
@@ -109,7 +117,11 @@ where
         .bind("derivation_path", keyset.derivation_path.to_string())
         .bind("max_order", keyset.max_order)
         .bind("derivation_path_index", keyset.derivation_path_index)
-        .execute(&self.inner)
+        .execute(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await?;
 
         Ok(())
@@ -129,7 +141,11 @@ where
         .bind("secret", proof.secret.to_string())
         .bind("c", proof.c.to_bytes().to_vec())
         .bind("state", "UNSPENT".to_string())
-        .execute(&self.inner)
+        .execute(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await
         {
             tracing::debug!("Attempting to add known proof. Skipping.... {:?}", err);
@@ -144,7 +160,11 @@ where
     ) -> Result<Option<State>, Self::Err> {
         let current_state = query(r#"SELECT state FROM proof WHERE y = :y"#)?
             .bind("y", y.to_bytes().to_vec())
-            .pluck(&self.inner)
+            .pluck(
+                self.inner
+                    .as_ref()
+                    .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+            )
             .await?
             .map(|state| Ok::<_, Error>(column_as_string!(state, State::from_str)))
             .transpose()?;
@@ -156,7 +176,11 @@ where
                 current_state.as_ref().map(|state| state.to_string()),
             )
             .bind("new_state", proofs_state.to_string())
-            .execute(&self.inner)
+            .execute(
+                self.inner
+                    .as_ref()
+                    .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+            )
             .await?;
 
         Ok(current_state)
@@ -181,7 +205,11 @@ where
             .bind("amount", u64::from(signature.amount) as i64)
             .bind("keyset_id", signature.keyset_id.to_string())
             .bind("c", signature.c.to_bytes().to_vec())
-            .execute(&self.inner)
+            .execute(
+                self.inner
+                    .as_ref()
+                    .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+            )
             .await?;
         }
 
@@ -202,7 +230,11 @@ where
             )?
             .bind("endpoint", serde_json::to_string(endpoint)?)
             .bind("auth", serde_json::to_string(auth)?)
-            .execute(&self.inner)
+            .execute(
+                self.inner
+                    .as_ref()
+                    .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+            )
             .await
             {
                 tracing::debug!(
@@ -226,7 +258,11 @@ where
                     .map(serde_json::to_string)
                     .collect::<Result<_, _>>()?,
             )
-            .execute(&self.inner)
+            .execute(
+                self.inner
+                    .as_ref()
+                    .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+            )
             .await?;
         Ok(())
     }

+ 282 - 80
crates/cdk-sql-common/src/mint/mod.rs

@@ -9,6 +9,7 @@
 //! clients in a pool and expose them to an asynchronous environment, making them compatible with
 //! Mint.
 use std::collections::HashMap;
+use std::fmt::Debug;
 use std::marker::PhantomData;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -40,7 +41,7 @@ use uuid::Uuid;
 
 use crate::common::migrate;
 use crate::database::{DatabaseConnector, DatabaseExecutor, DatabaseTransaction};
-use crate::pool::{Pool, ResourceManager};
+use crate::pool::{Pool, PooledResource, ResourceManager};
 use crate::stmt::{query, Column};
 use crate::{
     column_as_nullable_number, column_as_nullable_string, column_as_number, column_as_string,
@@ -59,21 +60,37 @@ pub use auth::SQLMintAuthDatabase;
 
 /// Mint SQL Database
 #[derive(Debug, Clone)]
-pub struct SQLMintDatabase<T, DB>
+pub struct SQLMintDatabase<RM, DB, C>
 where
     DB: DatabaseConnector,
-    T: ResourceManager<Resource = DB>,
+    RM: ResourceManager<Resource = DB, Config = C, Error = Error>,
+    C: Debug + Clone + Send + Sync,
 {
-    db: Arc<Pool<T>>,
+    db: Arc<Pool<RM>>,
 }
 
 /// SQL Transaction Writer
-pub struct SQLTransaction<'a, T>
+pub struct SQLTransaction<'a, DB, RM, C>
 where
-    T: DatabaseTransaction<'a>,
+    DB: DatabaseConnector,
+    RM: ResourceManager<Resource = DB, Config = C, Error = Error>,
+    C: Debug + Clone + Send + Sync,
 {
-    inner: T,
-    _phantom: PhantomData<&'a ()>,
+    conn: PooledResource<RM>,
+    inner: Option<DB::Transaction<'a>>,
+    _phantom: PhantomData<&'a mut DB>,
+}
+
+impl<'a, DB, RM, C> SQLTransaction<'a, DB, RM, C>
+where
+    DB: DatabaseConnector,
+    RM: ResourceManager<Resource = DB, Config = C, Error = Error>,
+    C: Debug + Clone + Send + Sync,
+{
+    pub async fn begin(&'a mut self) -> Result<(), Error> {
+        self.inner = Some(self.conn.begin().await?);
+        Ok(())
+    }
 }
 
 #[inline(always)]
@@ -118,18 +135,19 @@ where
     Ok(())
 }
 
-impl<T, DB> SQLMintDatabase<T, DB>
+impl<RM, DB, C> SQLMintDatabase<RM, DB, C>
 where
     DB: DatabaseConnector,
-    T: ResourceManager<Resource = DB>,
+    RM: ResourceManager<Resource = DB, Config = C, Error = Error>,
+    C: Debug + Clone + Send + Sync,
 {
     /// Creates a new instance
     pub async fn new<X>(db: X) -> Result<Self, Error>
     where
         X: Into<DB>,
     {
-        let db = db.into();
-        Self::migrate(&db).await?;
+        let db = Pool::new(db.into());
+        Self::migrate(db.get()).await?;
         Ok(Self { db })
     }
 
@@ -158,9 +176,11 @@ where
 }
 
 #[async_trait]
-impl<'a, T> database::MintProofsTransaction<'a> for SQLTransaction<'a, T>
+impl<'a, DB, RM, C> database::MintProofsTransaction<'a> for SQLTransaction<'a, DB, RM, C>
 where
-    T: DatabaseTransaction<'a>,
+    DB: DatabaseConnector,
+    RM: ResourceManager<Resource = DB, Config = C, Error = Error>,
+    C: Debug + Clone + Send + Sync,
 {
     type Err = Error;
 
@@ -181,7 +201,11 @@ where
                     .map(|y| y.y().map(|y| y.to_bytes().to_vec()))
                     .collect::<Result<_, _>>()?,
             )
-            .pluck(&self.inner)
+            .pluck(
+                self.inner
+                    .as_ref()
+                    .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+            )
             .await?
             .map(|state| Ok::<_, Error>(column_as_string!(&state, State::from_str)))
             .transpose()?
@@ -212,7 +236,11 @@ where
             .bind("state", "UNSPENT".to_string())
             .bind("quote_id", quote_id.map(|q| q.hyphenated().to_string()))
             .bind("created_time", current_time as i64)
-            .execute(&self.inner)
+            .execute(
+                self.inner
+                    .as_ref()
+                    .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+            )
             .await?;
         }
 
@@ -224,7 +252,13 @@ where
         ys: &[PublicKey],
         new_state: State,
     ) -> Result<Vec<Option<State>>, Self::Err> {
-        let mut current_states = get_current_states(&self.inner, ys).await?;
+        let mut current_states = get_current_states(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+            ys,
+        )
+        .await?;
 
         if current_states.len() != ys.len() {
             tracing::warn!(
@@ -242,7 +276,11 @@ where
         query(r#"UPDATE proof SET state = :new_state WHERE y IN (:ys)"#)?
             .bind("new_state", new_state.to_string())
             .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
-            .execute(&self.inner)
+            .execute(
+                self.inner
+                    .as_ref()
+                    .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+            )
             .await?;
 
         Ok(ys.iter().map(|y| current_states.remove(y)).collect())
@@ -260,7 +298,11 @@ where
         )?
         .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
         .bind_vec("exclude_state", vec![State::Spent.to_string()])
-        .execute(&self.inner)
+        .execute(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await?;
 
         if total_deleted != ys.len() {
@@ -272,33 +314,57 @@ where
 }
 
 #[async_trait]
-impl<'a, T> database::MintTransaction<'a, Error> for SQLTransaction<'a, T>
+impl<'a, DB, RM, C> database::MintTransaction<'a, Error> for SQLTransaction<'a, DB, RM, C>
 where
-    T: DatabaseTransaction<'a>,
+    DB: DatabaseConnector,
+    RM: ResourceManager<Resource = DB, Config = C, Error = Error>,
+    C: Debug + Clone + Send + Sync,
 {
     async fn set_mint_info(&mut self, mint_info: MintInfo) -> Result<(), Error> {
-        Ok(set_to_config(&self.inner, "mint_info", &mint_info).await?)
+        Ok(set_to_config(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+            "mint_info",
+            &mint_info,
+        )
+        .await?)
     }
 
     async fn set_quote_ttl(&mut self, quote_ttl: QuoteTTL) -> Result<(), Error> {
-        Ok(set_to_config(&self.inner, "quote_ttl", &quote_ttl).await?)
+        Ok(set_to_config(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+            "quote_ttl",
+            &quote_ttl,
+        )
+        .await?)
     }
 }
 
 #[async_trait]
-impl<'a, T> MintDbWriterFinalizer for SQLTransaction<'a, T>
+impl<'a, DB, RM, C> MintDbWriterFinalizer for SQLTransaction<'a, DB, RM, C>
 where
-    T: DatabaseTransaction<'a>,
+    DB: DatabaseConnector,
+    RM: ResourceManager<Resource = DB, Config = C, Error = Error>,
+    C: Debug + Clone + Send + Sync,
 {
     type Err = Error;
 
     async fn commit(self: Box<Self>) -> Result<(), Error> {
-        self.inner.commit().await?;
+        self.inner
+            .ok_or(Error::Internal("Missing active transaction".to_owned()))?
+            .commit()
+            .await?;
         Ok(())
     }
 
     async fn rollback(self: Box<Self>) -> Result<(), Error> {
-        self.inner.rollback().await?;
+        self.inner
+            .ok_or(Error::Internal("Missing active transaction".to_owned()))?
+            .rollback()
+            .await?;
         Ok(())
     }
 }
@@ -364,9 +430,11 @@ WHERE quote_id=:quote_id
 }
 
 #[async_trait]
-impl<'a, T> MintKeyDatabaseTransaction<'a, Error> for SQLTransaction<'a, T>
+impl<'a, DB, RM, C> MintKeyDatabaseTransaction<'a, Error> for SQLTransaction<'a, DB, RM, C>
 where
-    T: DatabaseTransaction<'a>,
+    DB: DatabaseConnector,
+    RM: ResourceManager<Resource = DB, Config = C, Error = Error>,
+    C: Debug + Clone + Send + Sync,
 {
     async fn add_keyset_info(&mut self, keyset: MintKeySetInfo) -> Result<(), Error> {
         query(
@@ -400,7 +468,11 @@ where
         .bind("max_order", keyset.max_order)
         .bind("input_fee_ppk", keyset.input_fee_ppk as i64)
         .bind("derivation_path_index", keyset.derivation_path_index)
-        .execute(&self.inner)
+        .execute(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await?;
 
         Ok(())
@@ -409,13 +481,21 @@ where
     async fn set_active_keyset(&mut self, unit: CurrencyUnit, id: Id) -> Result<(), Error> {
         query(r#"UPDATE keyset SET active=FALSE WHERE unit IS :unit"#)?
             .bind("unit", unit.to_string())
-            .execute(&self.inner)
+            .execute(
+                self.inner
+                    .as_ref()
+                    .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+            )
             .await?;
 
         query(r#"UPDATE keyset SET active=TRUE WHERE unit IS :unit AND id IS :id"#)?
             .bind("unit", unit.to_string())
             .bind("id", id.to_string())
-            .execute(&self.inner)
+            .execute(
+                self.inner
+                    .as_ref()
+                    .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+            )
             .await?;
 
         Ok(())
@@ -423,19 +503,21 @@ where
 }
 
 #[async_trait]
-impl<T, DB> MintKeysDatabase for SQLMintDatabase<T, DB>
+impl<RM, DB, C> MintKeysDatabase for SQLMintDatabase<RM, DB, C>
 where
     DB: DatabaseConnector,
-    T: ResourceManager<Resource = DB>,
+    RM: ResourceManager<Resource = DB, Config = C, Error = Error>,
+    C: Debug + Clone + Send + Sync,
 {
     type Err = Error;
 
     async fn begin_transaction<'a>(
         &'a self,
     ) -> Result<Box<dyn MintKeyDatabaseTransaction<'a, Error> + Send + Sync + 'a>, Error> {
-        let conn = self.db.get().map_err(|e| Error::Database(Box::new(e)))?;
+        let mut conn = self.db.get().map_err(|e| Error::Database(Box::new(e)))?;
         Ok(Box::new(SQLTransaction {
-            inner: conn.begin().await?,
+            inner: None,
+            conn,
             _phantom: PhantomData,
         }))
     }
@@ -521,9 +603,11 @@ where
 }
 
 #[async_trait]
-impl<'a, T> MintQuotesTransaction<'a> for SQLTransaction<'a, T>
+impl<'a, DB, RM, C> MintQuotesTransaction<'a> for SQLTransaction<'a, DB, RM, C>
 where
-    T: DatabaseTransaction<'a>,
+    DB: DatabaseConnector,
+    RM: ResourceManager<Resource = DB, Config = C, Error = Error>,
+    C: Debug + Clone + Send + Sync,
 {
     type Err = Error;
 
@@ -544,7 +628,11 @@ where
             "#,
         )?
         .bind("payment_id", payment_id.clone())
-        .fetch_one(&self.inner)
+        .fetch_one(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await?;
 
         if exists.is_some() {
@@ -562,7 +650,11 @@ where
             "#,
         )?
         .bind("quote_id", quote_id.as_hyphenated().to_string())
-        .fetch_one(&self.inner)
+        .fetch_one(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await
         .map_err(|err| {
             tracing::error!("SQLite could not get mint quote amount_paid");
@@ -591,7 +683,11 @@ where
         )?
         .bind("amount_paid", new_amount_paid.to_i64())
         .bind("quote_id", quote_id.as_hyphenated().to_string())
-        .execute(&self.inner)
+        .execute(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await
         .map_err(|err| {
             tracing::error!("SQLite could not update mint quote amount_paid");
@@ -610,7 +706,11 @@ where
         .bind("payment_id", payment_id)
         .bind("amount", amount_paid.to_i64())
         .bind("timestamp", unix_time() as i64)
-        .execute(&self.inner)
+        .execute(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await
         .map_err(|err| {
             tracing::error!("SQLite could not insert payment ID: {}", err);
@@ -636,7 +736,11 @@ where
             "#,
         )?
         .bind("quote_id", quote_id.as_hyphenated().to_string())
-        .fetch_one(&self.inner)
+        .fetch_one(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await
         .map_err(|err| {
             tracing::error!("SQLite could not get mint quote amount_issued");
@@ -666,7 +770,11 @@ where
         )?
         .bind("amount_issued", new_amount_issued.to_i64())
         .bind("quote_id", quote_id.as_hyphenated().to_string())
-        .execute(&self.inner)
+        .execute(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await
         .map_err(|err| {
             tracing::error!("SQLite could not update mint quote amount_issued");
@@ -685,7 +793,11 @@ VALUES (:quote_id, :amount, :timestamp);
         .bind("quote_id", quote_id.as_hyphenated().to_string())
         .bind("amount", amount_issued.to_i64())
         .bind("timestamp", current_time as i64)
-        .execute(&self.inner)
+        .execute(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await?;
 
         Ok(new_amount_issued)
@@ -718,7 +830,7 @@ VALUES (:quote_id, :amount, :timestamp);
         .bind("created_time", quote.created_time as i64)
         .bind("payment_method", quote.payment_method.to_string())
         .bind("request_lookup_id_kind", quote.request_lookup_id.kind())
-        .execute(&self.inner)
+        .execute(self.inner.as_ref().ok_or(Error::Internal("Missing active transaction".to_owned()))?)
         .await?;
 
         Ok(())
@@ -727,7 +839,11 @@ VALUES (:quote_id, :amount, :timestamp);
     async fn remove_mint_quote(&mut self, quote_id: &Uuid) -> Result<(), Self::Err> {
         query(r#"DELETE FROM mint_quote WHERE id=:id"#)?
             .bind("id", quote_id.as_hyphenated().to_string())
-            .execute(&self.inner)
+            .execute(
+                self.inner
+                    .as_ref()
+                    .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+            )
             .await?;
         Ok(())
     }
@@ -746,7 +862,11 @@ VALUES (:quote_id, :amount, :timestamp);
         .bind("request_lookup_id", quote.request_lookup_id.to_string())
         .bind("state", MeltQuoteState::Unpaid.to_string())
         .bind("current_time", current_time as i64)
-        .execute(&self.inner)
+        .execute(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await?;
 
         if row_affected > 0 {
@@ -786,7 +906,11 @@ VALUES (:quote_id, :amount, :timestamp);
             quote.options.map(|o| serde_json::to_string(&o).ok()),
         )
         .bind("request_lookup_id_kind", quote.request_lookup_id.kind())
-        .execute(&self.inner)
+        .execute(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await?;
 
         Ok(())
@@ -801,7 +925,7 @@ VALUES (:quote_id, :amount, :timestamp);
             .bind("new_req_id", new_request_lookup_id.to_string())
             .bind("new_kind",new_request_lookup_id.kind() )
             .bind("id", quote_id.as_hyphenated().to_string())
-            .execute(&self.inner)
+            .execute(self.inner.as_ref().ok_or(Error::Internal("Missing active transaction".to_owned()))?)
             .await?;
         Ok(())
     }
@@ -838,7 +962,11 @@ VALUES (:quote_id, :amount, :timestamp);
         )?
         .bind("id", quote_id.as_hyphenated().to_string())
         .bind("state", state.to_string())
-        .fetch_one(&self.inner)
+        .fetch_one(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await?
         .map(sql_row_to_melt_quote)
         .transpose()?
@@ -851,13 +979,17 @@ VALUES (:quote_id, :amount, :timestamp);
                 .bind("paid_time", current_time as i64)
                 .bind("payment_preimage", payment_proof)
                 .bind("id", quote_id.as_hyphenated().to_string())
-                .execute(&self.inner)
+                .execute(self.inner.as_ref().ok_or(Error::Internal("Missing active transaction".to_owned()))?)
                 .await
         } else {
             query(r#"UPDATE melt_quote SET state = :state WHERE id = :id"#)?
                 .bind("state", state.to_string())
                 .bind("id", quote_id.as_hyphenated().to_string())
-                .execute(&self.inner)
+                .execute(
+                    self.inner
+                        .as_ref()
+                        .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+                )
                 .await
         };
 
@@ -883,15 +1015,31 @@ VALUES (:quote_id, :amount, :timestamp);
             "#,
         )?
         .bind("id", quote_id.as_hyphenated().to_string())
-        .execute(&self.inner)
+        .execute(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await?;
 
         Ok(())
     }
 
     async fn get_mint_quote(&mut self, quote_id: &Uuid) -> Result<Option<MintQuote>, Self::Err> {
-        let payments = get_mint_quote_payments(&self.inner, quote_id).await?;
-        let issuance = get_mint_quote_issuance(&self.inner, quote_id).await?;
+        let payments = get_mint_quote_payments(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+            quote_id,
+        )
+        .await?;
+        let issuance = get_mint_quote_issuance(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+            quote_id,
+        )
+        .await?;
 
         Ok(query(
             r#"
@@ -915,7 +1063,11 @@ VALUES (:quote_id, :amount, :timestamp);
             "#,
         )?
         .bind("id", quote_id.as_hyphenated().to_string())
-        .fetch_one(&self.inner)
+        .fetch_one(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await?
         .map(|row| sql_row_to_mint_quote(row, payments, issuance))
         .transpose()?)
@@ -949,7 +1101,11 @@ VALUES (:quote_id, :amount, :timestamp);
             "#,
         )?
         .bind("id", quote_id.as_hyphenated().to_string())
-        .fetch_one(&self.inner)
+        .fetch_one(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await?
         .map(sql_row_to_melt_quote)
         .transpose()?)
@@ -981,14 +1137,30 @@ VALUES (:quote_id, :amount, :timestamp);
             "#,
         )?
         .bind("request", request.to_string())
-        .fetch_one(&self.inner)
+        .fetch_one(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await?
         .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
         .transpose()?;
 
         if let Some(quote) = mint_quote.as_mut() {
-            let payments = get_mint_quote_payments(&self.inner, &quote.id).await?;
-            let issuance = get_mint_quote_issuance(&self.inner, &quote.id).await?;
+            let payments = get_mint_quote_payments(
+                self.inner
+                    .as_ref()
+                    .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+                &quote.id,
+            )
+            .await?;
+            let issuance = get_mint_quote_issuance(
+                self.inner
+                    .as_ref()
+                    .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+                &quote.id,
+            )
+            .await?;
             quote.issuance = issuance;
             quote.payments = payments;
         }
@@ -1024,14 +1196,30 @@ VALUES (:quote_id, :amount, :timestamp);
         )?
         .bind("request_lookup_id", request_lookup_id.to_string())
         .bind("request_lookup_id_kind", request_lookup_id.kind())
-        .fetch_one(&self.inner)
+        .fetch_one(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await?
         .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
         .transpose()?;
 
         if let Some(quote) = mint_quote.as_mut() {
-            let payments = get_mint_quote_payments(&self.inner, &quote.id).await?;
-            let issuance = get_mint_quote_issuance(&self.inner, &quote.id).await?;
+            let payments = get_mint_quote_payments(
+                self.inner
+                    .as_ref()
+                    .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+                &quote.id,
+            )
+            .await?;
+            let issuance = get_mint_quote_issuance(
+                self.inner
+                    .as_ref()
+                    .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+                &quote.id,
+            )
+            .await?;
             quote.issuance = issuance;
             quote.payments = payments;
         }
@@ -1041,10 +1229,11 @@ VALUES (:quote_id, :amount, :timestamp);
 }
 
 #[async_trait]
-impl<T, DB> MintQuotesDatabase for SQLMintDatabase<T, DB>
+impl<RM, DB, C> MintQuotesDatabase for SQLMintDatabase<RM, DB, C>
 where
     DB: DatabaseConnector,
-    T: ResourceManager<Resource = DB>,
+    RM: ResourceManager<Resource = DB, Config = C, Error = Error>,
+    C: Debug + Clone + Send + Sync,
 {
     type Err = Error;
 
@@ -1265,10 +1454,11 @@ where
 }
 
 #[async_trait]
-impl<T, DB> MintProofsDatabase for SQLMintDatabase<T, DB>
+impl<RM, DB, C> MintProofsDatabase for SQLMintDatabase<RM, DB, C>
 where
     DB: DatabaseConnector,
-    T: ResourceManager<Resource = DB>,
+    RM: ResourceManager<Resource = DB, Config = C, Error = Error>,
+    C: Debug + Clone + Send + Sync,
 {
     type Err = Error;
 
@@ -1372,9 +1562,11 @@ where
 }
 
 #[async_trait]
-impl<'a, T> MintSignatureTransaction<'a> for SQLTransaction<'a, T>
+impl<'a, DB, RM, C> MintSignatureTransaction<'a> for SQLTransaction<'a, DB, RM, C>
 where
-    T: DatabaseTransaction<'a>,
+    DB: DatabaseConnector,
+    RM: ResourceManager<Resource = DB, Config = C, Error = Error>,
+    C: Debug + Clone + Send + Sync,
 {
     type Err = Error;
 
@@ -1409,7 +1601,7 @@ where
                 signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
             )
             .bind("created_time", current_time as i64)
-            .execute(&self.inner)
+            .execute(self.inner.as_ref().ok_or(Error::Internal("Missing active transaction".to_owned()))?)
             .await?;
         }
 
@@ -1440,7 +1632,11 @@ where
                 .map(|y| y.to_bytes().to_vec())
                 .collect(),
         )
-        .fetch_all(&self.inner)
+        .fetch_all(
+            self.inner
+                .as_ref()
+                .ok_or(Error::Internal("Missing active transaction".to_owned()))?,
+        )
         .await?
         .into_iter()
         .map(|mut row| {
@@ -1462,10 +1658,11 @@ where
 }
 
 #[async_trait]
-impl<T, DB> MintSignaturesDatabase for SQLMintDatabase<T, DB>
+impl<RM, DB, C> MintSignaturesDatabase for SQLMintDatabase<RM, DB, C>
 where
     DB: DatabaseConnector,
-    T: ResourceManager<Resource = DB>,
+    RM: ResourceManager<Resource = DB, Config = C, Error = Error>,
+    C: Debug + Clone + Send + Sync,
 {
     type Err = Error;
 
@@ -1571,19 +1768,24 @@ where
 }
 
 #[async_trait]
-impl<T, DB> MintDatabase<Error> for SQLMintDatabase<T, DB>
+impl<RM, DB, C> MintDatabase<Error> for SQLMintDatabase<RM, DB, C>
 where
     DB: DatabaseConnector,
-    T: ResourceManager<Resource = DB>,
+    RM: ResourceManager<Resource = DB, Config = C, Error = Error>,
+    C: Debug + Clone + Send + Sync,
 {
     async fn begin_transaction<'a>(
         &'a self,
     ) -> Result<Box<dyn database::MintTransaction<'a, Error> + Send + Sync + 'a>, Error> {
-        let mut conn = self.db.get().map_err(|e| Error::Database(Box::new(e)))?;
-        Ok(Box::new(SQLTransaction {
-            inner: conn.begin().await?,
+        let mut tx = SQLTransaction::<_, _, C> {
+            conn: self.db.get().map_err(|e| Error::Database(Box::new(e)))?,
+            inner: None,
             _phantom: PhantomData,
-        }))
+        };
+
+        tx.begin().await?;
+
+        Ok(Box::new(tx))
     }
 
     async fn get_mint_info(&self) -> Result<MintInfo, Error> {