Browse Source

Migrate from `sqlx` to rusqlite

1. Add rusqlite with rusqlite with a working thread
2. Add wallet without a thread (synchronous)
3. Add custom migration
Cesar Rodas 5 tháng trước cách đây
mục cha
commit
76d402a417
47 tập tin đã thay đổi với 1889 bổ sung1481 xóa
  1. 0 2
      crates/cdk-mintd/src/main.rs
  2. 5 9
      crates/cdk-sqlite/Cargo.toml
  3. 26 38
      crates/cdk-sqlite/src/common.rs
  4. 2 0
      crates/cdk-sqlite/src/lib.rs
  5. 169 0
      crates/cdk-sqlite/src/macros.rs
  6. 397 0
      crates/cdk-sqlite/src/mint/async_rusqlite.rs
  7. 0 0
      crates/cdk-sqlite/src/mint/auth/migrations/V1_init.sql
  8. 243 395
      crates/cdk-sqlite/src/mint/auth/mod.rs
  9. 39 1
      crates/cdk-sqlite/src/mint/error.rs
  10. 0 0
      crates/cdk-sqlite/src/mint/migrations/V10__melt_requests.sql
  11. 0 0
      crates/cdk-sqlite/src/mint/migrations/V11__dleq_for_sigs.sql
  12. 0 0
      crates/cdk-sqlite/src/mint/migrations/V12__mint_mint_quote_pubkey.sql
  13. 0 0
      crates/cdk-sqlite/src/mint/migrations/V13__amount_to_pay_msats.sql
  14. 0 0
      crates/cdk-sqlite/src/mint/migrations/V14__remove_mint_url.sql
  15. 0 0
      crates/cdk-sqlite/src/mint/migrations/V15__add_config_table.sql
  16. 0 0
      crates/cdk-sqlite/src/mint/migrations/V16__keyset_id_as_foreign_key.sql
  17. 0 0
      crates/cdk-sqlite/src/mint/migrations/V17__mint_time_of_quotes.sql
  18. 0 0
      crates/cdk-sqlite/src/mint/migrations/V18__mint_created_time_signature.sql
  19. 0 0
      crates/cdk-sqlite/src/mint/migrations/V19__drop_keystore_foreign.sql
  20. 0 0
      crates/cdk-sqlite/src/mint/migrations/V1__init.sql
  21. 0 0
      crates/cdk-sqlite/src/mint/migrations/V2__quote_state.sql
  22. 0 0
      crates/cdk-sqlite/src/mint/migrations/V3__nut04_state.sql
  23. 0 0
      crates/cdk-sqlite/src/mint/migrations/V4__request_lookup_id.sql
  24. 0 0
      crates/cdk-sqlite/src/mint/migrations/V5__input_fee.sql
  25. 0 0
      crates/cdk-sqlite/src/mint/migrations/V6__derivation_path_index.sql
  26. 0 0
      crates/cdk-sqlite/src/mint/migrations/V7__allow_unspent.sql
  27. 0 0
      crates/cdk-sqlite/src/mint/migrations/V8__update_mint_url.sql
  28. 0 0
      crates/cdk-sqlite/src/mint/migrations/V9__proofs_quote_id.sql
  29. 469 637
      crates/cdk-sqlite/src/mint/mod.rs
  30. 159 0
      crates/cdk-sqlite/src/stmt.rs
  31. 20 1
      crates/cdk-sqlite/src/wallet/error.rs
  32. 0 0
      crates/cdk-sqlite/src/wallet/migrations/V10__wallet_mint_quote_secretkey.sql
  33. 0 0
      crates/cdk-sqlite/src/wallet/migrations/V11__mint_tos.sql
  34. 0 0
      crates/cdk-sqlite/src/wallet/migrations/V12__drop_nostr_last_checked.sql
  35. 0 0
      crates/cdk-sqlite/src/wallet/migrations/V13__allow_pending_spent.sql
  36. 0 0
      crates/cdk-sqlite/src/wallet/migrations/V14__wallet_dleq_proofs.sql
  37. 0 0
      crates/cdk-sqlite/src/wallet/migrations/V15__add_transactions_table.sql
  38. 0 0
      crates/cdk-sqlite/src/wallet/migrations/V1__init.sql
  39. 0 0
      crates/cdk-sqlite/src/wallet/migrations/V2__quote_state.sql
  40. 0 0
      crates/cdk-sqlite/src/wallet/migrations/V3__nut04_state.sql
  41. 0 0
      crates/cdk-sqlite/src/wallet/migrations/V4__input_fee.sql
  42. 0 0
      crates/cdk-sqlite/src/wallet/migrations/V5__mint_icon_url.sql
  43. 0 0
      crates/cdk-sqlite/src/wallet/migrations/V6__update_mint_url.sql
  44. 0 0
      crates/cdk-sqlite/src/wallet/migrations/V7__icon_url.sql
  45. 0 0
      crates/cdk-sqlite/src/wallet/migrations/V8__mint_time.sql
  46. 0 0
      crates/cdk-sqlite/src/wallet/migrations/V9__mint_urls.sql
  47. 360 398
      crates/cdk-sqlite/src/wallet/mod.rs

+ 0 - 2
crates/cdk-mintd/src/main.rs

@@ -407,8 +407,6 @@ async fn main() -> anyhow::Result<()> {
                     let sql_db_path = work_dir.join("cdk-mintd-auth.sqlite");
                     let sqlite_db = MintSqliteAuthDatabase::new(&sql_db_path).await?;
 
-                    sqlite_db.migrate().await;
-
                     Arc::new(sqlite_db)
                 }
                 #[cfg(feature = "redb")]

+ 5 - 9
crates/cdk-sqlite/Cargo.toml

@@ -16,23 +16,19 @@ default = ["mint", "wallet", "auth"]
 mint = ["cdk-common/mint"]
 wallet = ["cdk-common/wallet"]
 auth = ["cdk-common/auth"]
-sqlcipher = ["libsqlite3-sys"]
+sqlcipher = ["r2d2_sqlite/bundled-sqlcipher"]
 
 [dependencies]
 async-trait.workspace = true
 cdk-common = { workspace = true, features = ["test"] }
 bitcoin.workspace = true
-sqlx = { version = "0.7.4", default-features = false, features = [
-    "runtime-tokio-rustls",
-    "sqlite",
-    "macros",
-    "migrate",
-    "uuid",
-] }
-libsqlite3-sys = { version = "0.27.0", features = ["bundled-sqlcipher"], optional = true }
+r2d2_sqlite = { version = "0.19.0", features = ["bundled"] }
+r2d2 = { version = "0.8" }
 thiserror.workspace = true
 tokio.workspace = true
 tracing.workspace = true
+serde.workspace = true
 serde_json.workspace = true
 lightning-invoice.workspace = true
 uuid.workspace = true
+refinery = { version = "0.8.16", features = ["rusqlite", "rusqlite-bundled"] }

+ 26 - 38
crates/cdk-sqlite/src/common.rs

@@ -1,46 +1,34 @@
-use std::str::FromStr;
-use std::time::Duration;
+use r2d2::Pool;
+use r2d2_sqlite::SqliteConnectionManager;
 
-use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
-use sqlx::{Error, Pool, Sqlite};
-
-#[inline(always)]
-pub async fn create_sqlite_pool(
+/// Create a configured rusqlite connection to a SQLite database.
+/// For SQLCipher support, enable the "sqlcipher" feature and pass a password.
+pub fn create_sqlite_pool(
     path: &str,
     #[cfg(feature = "sqlcipher")] password: String,
-) -> Result<Pool<Sqlite>, Error> {
-    let db_options = SqliteConnectOptions::from_str(path)?
-        .busy_timeout(Duration::from_secs(10))
-        .read_only(false)
-        .pragma("busy_timeout", "5000")
-        .pragma("journal_mode", "wal")
-        .pragma("synchronous", "normal")
-        .pragma("temp_store", "memory")
-        .pragma("mmap_size", "30000000000")
-        .shared_cache(true)
-        .create_if_missing(true);
-
-    #[cfg(feature = "sqlcipher")]
-    let db_options = db_options.pragma("key", password);
+) -> Result<Pool<SqliteConnectionManager>, r2d2::Error> {
+    let (manager, is_memory) = if path.contains(":memory:") {
+        (SqliteConnectionManager::memory(), true)
+    } else {
+        (SqliteConnectionManager::file(path), false)
+    };
 
-    let is_memory = path.contains(":memory:");
+    let manager = manager.with_init(|conn| {
+        // Apply pragmas
+        conn.pragma_update(None, "busy_timeout", 5000)?;
+        conn.pragma_update(None, "journal_mode", "wal")?;
+        conn.pragma_update(None, "synchronous", "normal")?;
+        conn.pragma_update(None, "temp_store", "memory")?;
+        conn.pragma_update(None, "mmap_size", 30000000000i64)?;
+        conn.pragma_update(None, "cache", "shared")?;
 
-    let options = SqlitePoolOptions::new()
-        .min_connections(1)
-        .max_connections(1);
+        #[cfg(feature = "sqlcipher")]
+        conn.pragma_update(None, "key", &password)?;
 
-    let pool = if is_memory {
-        // Make sure that the connection is not closed after the first query, or any query, as long
-        // as the pool is not dropped
-        options
-            .idle_timeout(None)
-            .max_lifetime(None)
-            .test_before_acquire(false)
-    } else {
-        options
-    }
-    .connect_with(db_options)
-    .await?;
+        Ok(())
+    });
 
-    Ok(pool)
+    r2d2::Pool::builder()
+        .max_size(if is_memory { 1 } else { 20 })
+        .build(manager)
 }

+ 2 - 0
crates/cdk-sqlite/src/lib.rs

@@ -4,6 +4,8 @@
 #![warn(rustdoc::bare_urls)]
 
 mod common;
+mod macros;
+mod stmt;
 
 #[cfg(feature = "mint")]
 pub mod mint;

+ 169 - 0
crates/cdk-sqlite/src/macros.rs

@@ -0,0 +1,169 @@
+/// Unpacks a row<Column>, and consumes it, parsing into individual variables, checking the
+/// Vec<Column> is big enough
+#[macro_export]
+macro_rules! unpack_into {
+    (let ($($var:ident),+) = $array:expr) => {
+        let ($($var),+) = {
+            let mut vec = $array.to_vec();
+            vec.reverse();
+            let required = 0 $(+ {let _ = stringify!($var); 1})+;
+            if vec.len() < required {
+                return Err(Error::MissingColumn(required, vec.len()));
+            }
+            Ok::<_, Error>((
+                $(
+                    vec.pop().expect(&format!("Checked length already for {}", stringify!($var)))
+                ),+
+            ))?
+        };
+    };
+}
+
+/// Parses a SQLite column as a string or NULL
+#[macro_export]
+macro_rules! column_as_nullable_string {
+    ($col:expr, $callback_str:expr, $callback_bytes:expr) => {
+        (match $col {
+            $crate::stmt::Column::Text(text) => Ok(Some(text).and_then($callback_str)),
+            $crate::stmt::Column::Blob(bytes) => Ok(Some(bytes).and_then($callback_bytes)),
+            $crate::stmt::Column::Null => Ok(None),
+            other => Err(Error::InvalidType(
+                "String".to_owned(),
+                other.data_type().to_string(),
+            )),
+        })?
+    };
+    ($col:expr, $callback_str:expr) => {
+        (match $col {
+            $crate::stmt::Column::Text(text) => Ok(Some(text).and_then($callback_str)),
+            $crate::stmt::Column::Blob(bytes) => {
+                Ok(Some(String::from_utf8_lossy(&bytes)).and_then($callback_str))
+            }
+            $crate::stmt::Column::Null => Ok(None),
+            other => Err(Error::InvalidType(
+                "String".to_owned(),
+                other.data_type().to_string(),
+            )),
+        })?
+    };
+    ($col:expr) => {
+        (match $col {
+            $crate::stmt::Column::Text(text) => Ok(Some(text.to_owned())),
+            $crate::stmt::Column::Blob(bytes) => {
+                Ok(Some(String::from_utf8_lossy(&bytes).to_string()))
+            }
+            $crate::stmt::Column::Null => Ok(None),
+            other => Err(Error::InvalidType(
+                "String".to_owned(),
+                other.data_type().to_string(),
+            )),
+        })?
+    };
+}
+
+/// Parses a column as a number or NULL
+#[macro_export]
+macro_rules! column_as_nullable_number {
+    ($col:expr) => {
+        (match $col {
+            $crate::stmt::Column::Text(text) => Ok(Some(text.parse().map_err(|_| {
+                Error::InvalidConversion(stringify!($col).to_owned(), "Number".to_owned())
+            })?)),
+            $crate::stmt::Column::Integer(n) => Ok(Some(n.try_into().map_err(|_| {
+                Error::InvalidConversion(stringify!($col).to_owned(), "Number".to_owned())
+            })?)),
+            $crate::stmt::Column::Null => Ok(None),
+            other => Err(Error::InvalidType(
+                "Number".to_owned(),
+                other.data_type().to_string(),
+            )),
+        })?
+    };
+}
+
+/// Parses a column as a number
+#[macro_export]
+macro_rules! column_as_number {
+    ($col:expr) => {
+        (match $col {
+            $crate::stmt::Column::Text(text) => text.parse().map_err(|_| {
+                Error::InvalidConversion(stringify!($col).to_owned(), "Number".to_owned())
+            }),
+            $crate::stmt::Column::Integer(n) => n.try_into().map_err(|_| {
+                Error::InvalidConversion(stringify!($col).to_owned(), "Number".to_owned())
+            }),
+            other => Err(Error::InvalidType(
+                "Number".to_owned(),
+                other.data_type().to_string(),
+            )),
+        })?
+    };
+}
+
+/// Parses a column as a NULL or Binary
+#[macro_export]
+macro_rules! column_as_nullable_binary {
+    ($col:expr) => {
+        (match $col {
+            $crate::stmt::Column::Text(text) => Ok(Some(text.as_bytes().to_vec())),
+            $crate::stmt::Column::Blob(bytes) => Ok(Some(bytes.to_owned())),
+            $crate::stmt::Column::Null => Ok(None),
+            other => Err(Error::InvalidType(
+                "String".to_owned(),
+                other.data_type().to_string(),
+            )),
+        })?
+    };
+}
+
+/// Parses a SQLite column as a binary
+#[macro_export]
+macro_rules! column_as_binary {
+    ($col:expr) => {
+        (match $col {
+            $crate::stmt::Column::Text(text) => Ok(text.as_bytes().to_vec()),
+            $crate::stmt::Column::Blob(bytes) => Ok(bytes.to_owned()),
+            other => Err(Error::InvalidType(
+                "String".to_owned(),
+                other.data_type().to_string(),
+            )),
+        })?
+    };
+}
+
+/// Parses a SQLite column as a string
+#[macro_export]
+macro_rules! column_as_string {
+    ($col:expr, $callback_str:expr, $callback_bytes:expr) => {
+        (match $col {
+            $crate::stmt::Column::Text(text) => $callback_str(&text).map_err(Error::from),
+            $crate::stmt::Column::Blob(bytes) => $callback_bytes(&bytes).map_err(Error::from),
+            other => Err(Error::InvalidType(
+                "String".to_owned(),
+                other.data_type().to_string(),
+            )),
+        })?
+    };
+    ($col:expr, $callback:expr) => {
+        (match $col {
+            $crate::stmt::Column::Text(text) => $callback(&text).map_err(Error::from),
+            $crate::stmt::Column::Blob(bytes) => {
+                $callback(&String::from_utf8_lossy(&bytes)).map_err(Error::from)
+            }
+            other => Err(Error::InvalidType(
+                "String".to_owned(),
+                other.data_type().to_string(),
+            )),
+        })?
+    };
+    ($col:expr) => {
+        (match $col {
+            $crate::stmt::Column::Text(text) => Ok(text.to_owned()),
+            $crate::stmt::Column::Blob(bytes) => Ok(String::from_utf8_lossy(&bytes).to_string()),
+            other => Err(Error::InvalidType(
+                "String".to_owned(),
+                other.data_type().to_string(),
+            )),
+        })?
+    };
+}

+ 397 - 0
crates/cdk-sqlite/src/mint/async_rusqlite.rs

@@ -0,0 +1,397 @@
+use std::marker::PhantomData;
+//use std::sync::atomic::AtomicUsize;
+//use std::sync::Arc;
+use std::thread::spawn;
+
+use r2d2_sqlite::rusqlite::Connection;
+use r2d2_sqlite::SqliteConnectionManager;
+use tokio::sync::{mpsc, oneshot};
+
+use crate::mint::Error;
+use crate::stmt::{Column, ExpectedSqlResponse, Statement as InnerStatement, Value};
+
+const BUFFER_REQUEST_SIZE: usize = 10_000;
+
+#[derive(Debug, Clone)]
+pub struct AsyncRusqlite {
+    sender: mpsc::Sender<DbRequest>,
+    //inflight_requests: Arc<AtomicUsize>,
+}
+
+/// Internal request for the database thread
+#[derive(Debug)]
+pub enum DbRequest {
+    Sql(InnerStatement, oneshot::Sender<DbResponse>),
+    Begin(oneshot::Sender<DbResponse>),
+    Commit(oneshot::Sender<DbResponse>),
+    Rollback(oneshot::Sender<DbResponse>),
+}
+
+#[derive(Debug)]
+pub enum DbResponse {
+    Transaction(mpsc::Sender<DbRequest>),
+    AffectedRows(usize),
+    Pluck(Option<Column>),
+    Row(Option<Vec<Column>>),
+    Rows(Vec<Vec<Column>>),
+    Error(Error),
+    Unexpected,
+    Ok,
+}
+
+/// Statement for the async_rusqlite wrapper
+pub struct Statement(InnerStatement);
+
+impl Statement {
+    /// Bind a variable
+    pub fn bind<C: ToString, V: Into<Value>>(self, name: C, value: V) -> Self {
+        Self(self.0.bind(name, value))
+    }
+
+    /// Bind vec
+    pub fn bind_vec<C: ToString, V: Into<Value>>(self, name: C, value: Vec<V>) -> Self {
+        Self(self.0.bind_vec(name, value))
+    }
+
+    /// Executes a query and return the number of affected rows
+    pub async fn execute<C>(self, conn: &C) -> Result<usize, Error>
+    where
+        C: DatabaseExecutor + Send + Sync,
+    {
+        conn.execute(self.0).await
+    }
+
+    /// Returns the first column of the first row of the query result
+    pub async fn pluck<C>(self, conn: &C) -> Result<Option<Column>, Error>
+    where
+        C: DatabaseExecutor + Send + Sync,
+    {
+        conn.pluck(self.0).await
+    }
+
+    /// Returns the first row of the query result
+    pub async fn fetch_one<C>(self, conn: &C) -> Result<Option<Vec<Column>>, Error>
+    where
+        C: DatabaseExecutor + Send + Sync,
+    {
+        conn.fetch_one(self.0).await
+    }
+
+    /// Returns all rows of the query result
+    pub async fn fetch_all<C>(self, conn: &C) -> Result<Vec<Vec<Column>>, Error>
+    where
+        C: DatabaseExecutor + Send + Sync,
+    {
+        conn.fetch_all(self.0).await
+    }
+}
+
+#[inline(always)]
+fn process_query(conn: &Connection, sql: InnerStatement) -> Result<DbResponse, Error> {
+    let mut stmt = conn.prepare_cached(&sql.sql)?;
+    for (name, value) in sql.args {
+        let index = stmt
+            .parameter_index(&name)
+            .map_err(|_| Error::MissingParameter(name.clone()))?
+            .ok_or(Error::MissingParameter(name))?;
+
+        stmt.raw_bind_parameter(index, value)?;
+    }
+
+    let columns = stmt.column_count();
+
+    Ok(match sql.expected_response {
+        ExpectedSqlResponse::AffectedRows => DbResponse::AffectedRows(stmt.raw_execute()?),
+        ExpectedSqlResponse::ManyRows => {
+            let mut rows = stmt.raw_query();
+            let mut results = vec![];
+
+            while let Some(row) = rows.next()? {
+                results.push(
+                    (0..columns)
+                        .map(|i| row.get(i))
+                        .collect::<Result<Vec<_>, _>>()?,
+                )
+            }
+
+            DbResponse::Rows(results)
+        }
+        ExpectedSqlResponse::Pluck => {
+            let mut rows = stmt.raw_query();
+            DbResponse::Pluck(rows.next()?.map(|row| row.get(0usize)).transpose()?)
+        }
+        ExpectedSqlResponse::SingleRow => {
+            let mut rows = stmt.raw_query();
+            let row = rows
+                .next()?
+                .map(|row| {
+                    (0..columns)
+                        .map(|i| row.get(i))
+                        .collect::<Result<Vec<_>, _>>()
+                })
+                .transpose()?;
+            DbResponse::Row(row)
+        }
+    })
+}
+
+fn rusqlite_worker(
+    mut receiver: mpsc::Receiver<DbRequest>,
+    pool: r2d2::Pool<SqliteConnectionManager>,
+) {
+    while let Some(request) = receiver.blocking_recv() {
+        match request {
+            DbRequest::Sql(sql, reply_to) => {
+                let conn = match pool.get() {
+                    Ok(conn) => conn,
+                    Err(err) => {
+                        let _ = reply_to.send(DbResponse::Error(err.into()));
+                        continue;
+                    }
+                };
+
+                let result = process_query(&conn, sql);
+                let _ = match result {
+                    Ok(ok) => reply_to.send(ok),
+                    Err(err) => reply_to.send(DbResponse::Error(err)),
+                };
+                drop(conn);
+            }
+            DbRequest::Begin(reply_to) => {
+                let (sender, mut receiver) = mpsc::channel(BUFFER_REQUEST_SIZE);
+                let mut conn = match pool.get() {
+                    Ok(conn) => conn,
+                    Err(err) => {
+                        let _ = reply_to.send(DbResponse::Error(err.into()));
+                        continue;
+                    }
+                };
+
+                let tx = match conn.transaction() {
+                    Ok(tx) => tx,
+                    Err(err) => {
+                        let _ = reply_to.send(DbResponse::Error(err.into()));
+                        continue;
+                    }
+                };
+
+                // Transaction has begun successfully, send the `sender` back to the caller
+                // and wait for statements to execute. On `Drop` the wrapper transaction
+                // should send a `rollback`.
+                //
+                let _ = reply_to.send(DbResponse::Transaction(sender));
+
+                // We intentionally handle the transaction hijacking the main loop, there is
+                // no point is queueing more operations for SQLite, since transaction have
+                // exclusive access. In other database implementation this block of code
+                // should be sent to their own thread to allow concurrency
+                loop {
+                    let request = if let Some(request) = receiver.blocking_recv() {
+                        request
+                    } else {
+                        // If the receiver loop is broken (i.e no more `senders` are active) and no
+                        // `Commit` statement has been sent, this will trigger a `Rollback`
+                        // automatically
+                        let _ = tx.rollback();
+                        break;
+                    };
+
+                    match request {
+                        DbRequest::Commit(reply_to) => {
+                            let _ = reply_to.send(match tx.commit() {
+                                Ok(()) => DbResponse::Ok,
+                                Err(err) => DbResponse::Error(err.into()),
+                            });
+                            break;
+                        }
+                        DbRequest::Rollback(reply_to) => {
+                            let _ = reply_to.send(match tx.rollback() {
+                                Ok(()) => DbResponse::Ok,
+                                Err(err) => DbResponse::Error(err.into()),
+                            });
+                            break;
+                        }
+                        DbRequest::Begin(reply_to) => {
+                            let _ = reply_to.send(DbResponse::Unexpected);
+                        }
+                        DbRequest::Sql(sql, reply_to) => {
+                            let _ = match process_query(&tx, sql) {
+                                Ok(ok) => reply_to.send(ok),
+                                Err(err) => reply_to.send(DbResponse::Error(err)),
+                            };
+                        }
+                    }
+                }
+
+                drop(conn);
+            }
+            DbRequest::Commit(reply_to) => {
+                let _ = reply_to.send(DbResponse::Unexpected);
+            }
+            DbRequest::Rollback(reply_to) => {
+                let _ = reply_to.send(DbResponse::Unexpected);
+            }
+        }
+    }
+}
+
+#[async_trait::async_trait]
+pub trait DatabaseExecutor {
+    fn get_queue_sender(&self) -> mpsc::Sender<DbRequest>;
+
+    async fn execute(&self, mut statement: InnerStatement) -> Result<usize, Error> {
+        let (sender, receiver) = oneshot::channel();
+        statement.expected_response = ExpectedSqlResponse::AffectedRows;
+        self.get_queue_sender()
+            .send(DbRequest::Sql(statement, sender))
+            .await
+            .map_err(|_| Error::Communication)?;
+
+        match receiver.await.map_err(|_| Error::Communication)? {
+            DbResponse::AffectedRows(n) => Ok(n),
+            DbResponse::Error(err) => Err(err),
+            _ => Err(Error::InvalidDbResponse),
+        }
+    }
+
+    async fn fetch_one(&self, mut statement: InnerStatement) -> Result<Option<Vec<Column>>, Error> {
+        let (sender, receiver) = oneshot::channel();
+        statement.expected_response = ExpectedSqlResponse::SingleRow;
+        self.get_queue_sender()
+            .send(DbRequest::Sql(statement, sender))
+            .await
+            .map_err(|_| Error::Communication)?;
+
+        match receiver.await.map_err(|_| Error::Communication)? {
+            DbResponse::Row(row) => Ok(row),
+            DbResponse::Error(err) => Err(err),
+            _ => Err(Error::InvalidDbResponse),
+        }
+    }
+
+    async fn fetch_all(&self, mut statement: InnerStatement) -> Result<Vec<Vec<Column>>, Error> {
+        let (sender, receiver) = oneshot::channel();
+        statement.expected_response = ExpectedSqlResponse::ManyRows;
+        self.get_queue_sender()
+            .send(DbRequest::Sql(statement, sender))
+            .await
+            .map_err(|_| Error::Communication)?;
+
+        match receiver.await.map_err(|_| Error::Communication)? {
+            DbResponse::Rows(rows) => Ok(rows),
+            DbResponse::Error(err) => Err(err),
+            _ => Err(Error::InvalidDbResponse),
+        }
+    }
+
+    async fn pluck(&self, mut statement: InnerStatement) -> Result<Option<Column>, Error> {
+        let (sender, receiver) = oneshot::channel();
+        statement.expected_response = ExpectedSqlResponse::Pluck;
+        self.get_queue_sender()
+            .send(DbRequest::Sql(statement, sender))
+            .await
+            .map_err(|_| Error::Communication)?;
+
+        match receiver.await.map_err(|_| Error::Communication)? {
+            DbResponse::Pluck(value) => Ok(value),
+            DbResponse::Error(err) => Err(err),
+            _ => Err(Error::InvalidDbResponse),
+        }
+    }
+}
+
+#[inline(always)]
+pub fn query<T: ToString>(sql: T) -> Statement {
+    Statement(crate::stmt::Statement::new(sql))
+}
+
+impl AsyncRusqlite {
+    pub fn new(pool: r2d2::Pool<SqliteConnectionManager>) -> Self {
+        let (sender, receiver) = mpsc::channel(BUFFER_REQUEST_SIZE);
+        spawn(move || {
+            rusqlite_worker(receiver, pool);
+        });
+
+        Self {
+            sender,
+            //inflight_requests: Arc::new(0.into()),
+        }
+    }
+
+    /// Begins a transaction
+    ///
+    /// If the transaction is Drop it will trigger a rollback operation
+    pub async fn begin(&self) -> Result<Transaction<'_>, Error> {
+        let (sender, receiver) = oneshot::channel();
+        self.sender
+            .send(DbRequest::Begin(sender))
+            .await
+            .map_err(|_| Error::Communication)?;
+
+        match receiver.await.map_err(|_| Error::Communication)? {
+            DbResponse::Transaction(db_sender) => Ok(Transaction {
+                db_sender,
+                _marker: PhantomData,
+            }),
+            DbResponse::Error(err) => Err(err),
+            _ => Err(Error::InvalidDbResponse),
+        }
+    }
+}
+
+impl DatabaseExecutor for AsyncRusqlite {
+    #[inline(always)]
+    fn get_queue_sender(&self) -> mpsc::Sender<DbRequest> {
+        self.sender.clone()
+    }
+}
+
+pub struct Transaction<'conn> {
+    db_sender: mpsc::Sender<DbRequest>,
+    _marker: PhantomData<&'conn ()>,
+}
+
+impl Drop for Transaction<'_> {
+    fn drop(&mut self) {
+        let (sender, _) = oneshot::channel();
+        let _ = self.db_sender.try_send(DbRequest::Rollback(sender));
+    }
+}
+
+impl Transaction<'_> {
+    pub async fn commit(self) -> Result<(), Error> {
+        let (sender, receiver) = oneshot::channel();
+        self.db_sender
+            .send(DbRequest::Commit(sender))
+            .await
+            .map_err(|_| Error::Communication)?;
+
+        match receiver.await.map_err(|_| Error::Communication)? {
+            DbResponse::Ok => Ok(()),
+            DbResponse::Error(err) => Err(err),
+            _ => Err(Error::InvalidDbResponse),
+        }
+    }
+
+    pub async fn rollback(self) -> Result<(), Error> {
+        let (sender, receiver) = oneshot::channel();
+        self.db_sender
+            .send(DbRequest::Rollback(sender))
+            .await
+            .map_err(|_| Error::Communication)?;
+
+        match receiver.await.map_err(|_| Error::Communication)? {
+            DbResponse::Ok => Ok(()),
+            DbResponse::Error(err) => Err(err),
+            _ => Err(Error::InvalidDbResponse),
+        }
+    }
+}
+
+impl DatabaseExecutor for Transaction<'_> {
+    /// Get the internal sender to the SQL queue
+    #[inline(always)]
+    fn get_queue_sender(&self) -> mpsc::Sender<DbRequest> {
+        self.db_sender.clone()
+    }
+}

+ 0 - 0
crates/cdk-sqlite/src/mint/auth/migrations/20250109143347_init.sql → crates/cdk-sqlite/src/mint/auth/migrations/V1_init.sql


+ 243 - 395
crates/cdk-sqlite/src/mint/auth/mod.rs

@@ -3,50 +3,41 @@
 use std::collections::HashMap;
 use std::path::Path;
 use std::str::FromStr;
-use std::time::Duration;
 
 use async_trait::async_trait;
 use cdk_common::database::{self, MintAuthDatabase};
 use cdk_common::mint::MintKeySetInfo;
 use cdk_common::nuts::{AuthProof, BlindSignature, Id, PublicKey, State};
 use cdk_common::{AuthRequired, ProtectedEndpoint};
-use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
-use sqlx::Row;
 use tracing::instrument;
 
+use super::async_rusqlite::AsyncRusqlite;
 use super::{sqlite_row_to_blind_signature, sqlite_row_to_keyset_info};
+use crate::column_as_string;
+use crate::common::create_sqlite_pool;
+use crate::mint::async_rusqlite::query;
 use crate::mint::Error;
 
 /// Mint SQLite Database
 #[derive(Debug, Clone)]
 pub struct MintSqliteAuthDatabase {
-    pool: SqlitePool,
+    pool: AsyncRusqlite,
 }
 
+refinery::embed_migrations!("./src/mint/auth/migrations");
+
 impl MintSqliteAuthDatabase {
     /// Create new [`MintSqliteAuthDatabase`]
-    pub async fn new(path: &Path) -> Result<Self, Error> {
-        let path = path.to_str().ok_or(Error::InvalidDbPath)?;
-        let db_options = SqliteConnectOptions::from_str(path)?
-            .busy_timeout(Duration::from_secs(5))
-            .read_only(false)
-            .create_if_missing(true)
-            .auto_vacuum(sqlx::sqlite::SqliteAutoVacuum::Full);
-
-        let pool = SqlitePoolOptions::new()
-            .max_connections(1)
-            .connect_with(db_options)
-            .await?;
+    pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
+        let pool = create_sqlite_pool(path.as_ref().to_str().ok_or(Error::InvalidDbPath)?)?;
+        let mut conn = pool.get()?;
 
-        Ok(Self { pool })
-    }
+        migrations::runner().run(&mut *conn)?;
+        drop(conn);
 
-    /// Migrate [`MintSqliteAuthDatabase`]
-    pub async fn migrate(&self) {
-        sqlx::migrate!("./src/mint/auth/migrations")
-            .run(&self.pool)
-            .await
-            .expect("Could not run migrations");
+        Ok(Self {
+            pool: AsyncRusqlite::new(pool),
+        })
     }
 }
 
@@ -57,230 +48,153 @@ impl MintAuthDatabase for MintSqliteAuthDatabase {
     #[instrument(skip(self))]
     async fn set_active_keyset(&self, id: Id) -> Result<(), Self::Err> {
         tracing::info!("Setting auth keyset {id} active");
-        let mut transaction = self.pool.begin().await.map_err(Error::from)?;
-        let update_res = sqlx::query(
+        query(
             r#"
-    UPDATE keyset 
-    SET active = CASE 
-        WHEN id = ? THEN TRUE
-        ELSE FALSE
-    END;
-    "#,
+            UPDATE keyset
+            SET active = CASE
+                WHEN id = :id THEN TRUE
+                ELSE FALSE
+            END;
+            "#,
         )
-        .bind(id.to_string())
-        .execute(&mut *transaction)
-        .await;
-
-        match update_res {
-            Ok(_) => {
-                transaction.commit().await.map_err(Error::from)?;
-                Ok(())
-            }
-            Err(err) => {
-                tracing::error!("SQLite Could not update keyset");
-                if let Err(err) = transaction.rollback().await {
-                    tracing::error!("Could not rollback sql transaction: {}", err);
-                }
-                Err(Error::from(err).into())
-            }
-        }
+        .bind(":id", id.to_string())
+        .execute(&self.pool)
+        .await?;
+
+        Ok(())
     }
 
     async fn get_active_keyset_id(&self) -> Result<Option<Id>, Self::Err> {
-        let mut transaction = self.pool.begin().await.map_err(Error::from)?;
-
-        let rec = sqlx::query(
+        Ok(query(
             r#"
-SELECT id
-FROM keyset
-WHERE active = 1;
-        "#,
+            SELECT
+                id
+            FROM
+                keyset
+            WHERE
+                active = 1;
+            "#,
         )
-        .fetch_one(&mut *transaction)
-        .await;
-
-        let rec = match rec {
-            Ok(rec) => {
-                transaction.commit().await.map_err(Error::from)?;
-                rec
-            }
-            Err(err) => match err {
-                sqlx::Error::RowNotFound => {
-                    transaction.commit().await.map_err(Error::from)?;
-                    return Ok(None);
-                }
-                _ => {
-                    return {
-                        if let Err(err) = transaction.rollback().await {
-                            tracing::error!("Could not rollback sql transaction: {}", err);
-                        }
-                        Err(Error::SQLX(err).into())
-                    }
-                }
-            },
-        };
-
-        Ok(Some(
-            Id::from_str(rec.try_get("id").map_err(Error::from)?).map_err(Error::from)?,
-        ))
+        .pluck(&self.pool)
+        .await?
+        .map(|id| Ok::<_, Error>(column_as_string!(id, Id::from_str, Id::from_bytes)))
+        .transpose()?)
     }
 
     async fn add_keyset_info(&self, keyset: MintKeySetInfo) -> Result<(), Self::Err> {
-        let mut transaction = self.pool.begin().await.map_err(Error::from)?;
-        let res = sqlx::query(
+        query(
             r#"
-INSERT OR REPLACE INTO keyset
-(id, unit, active, valid_from, valid_to, derivation_path, max_order, derivation_path_index)
-VALUES (?, ?, ?, ?, ?, ?, ?, ?);
+        INSERT INTO
+            keyset (
+                id, unit, active, valid_from, valid_to, derivation_path,
+                max_order, derivation_path_index
+            )
+        VALUES (
+            :id, :unit, :active, :valid_from, :valid_to, :derivation_path,
+            :max_order, :derivation_path_index
+        )
+        ON CONFLICT(id) DO UPDATE SET
+            unit = excluded.unit,
+            active = excluded.active,
+            valid_from = excluded.valid_from,
+            valid_to = excluded.valid_to,
+            derivation_path = excluded.derivation_path,
+            max_order = excluded.max_order,
+            derivation_path_index = excluded.derivation_path_index
         "#,
         )
-        .bind(keyset.id.to_string())
-        .bind(keyset.unit.to_string())
-        .bind(keyset.active)
-        .bind(keyset.valid_from as i64)
-        .bind(keyset.valid_to.map(|v| v as i64))
-        .bind(keyset.derivation_path.to_string())
-        .bind(keyset.max_order)
-        .bind(keyset.derivation_path_index)
-        .execute(&mut *transaction)
-        .await;
-
-        match res {
-            Ok(_) => {
-                transaction.commit().await.map_err(Error::from)?;
-                Ok(())
-            }
-            Err(err) => {
-                tracing::error!("SQLite could not add keyset info");
-                if let Err(err) = transaction.rollback().await {
-                    tracing::error!("Could not rollback sql transaction: {}", err);
-                }
+        .bind(":id", keyset.id.to_string())
+        .bind(":unit", keyset.unit.to_string())
+        .bind(":active", keyset.active)
+        .bind(":valid_from", keyset.valid_from as i64)
+        .bind(":valid_to", keyset.valid_to.map(|v| v as i64))
+        .bind(":derivation_path", keyset.derivation_path.to_string())
+        .bind(":max_order", keyset.max_order)
+        .bind(":derivation_path_index", keyset.derivation_path_index)
+        .execute(&self.pool)
+        .await?;
 
-                Err(Error::from(err).into())
-            }
-        }
+        Ok(())
     }
 
     async fn get_keyset_info(&self, id: &Id) -> Result<Option<MintKeySetInfo>, Self::Err> {
-        let mut transaction = self.pool.begin().await.map_err(Error::from)?;
-        let rec = sqlx::query(
-            r#"
-SELECT *
-FROM keyset
-WHERE id=?;
-        "#,
+        Ok(query(
+            r#"SELECT
+                id,
+                unit,
+                active,
+                valid_from,
+                valid_to,
+                derivation_path,
+                derivation_path_index,
+                max_order,
+                input_fee_ppk
+            FROM
+                keyset
+                WHERE id=:id"#,
         )
-        .bind(id.to_string())
-        .fetch_one(&mut *transaction)
-        .await;
-
-        match rec {
-            Ok(rec) => {
-                transaction.commit().await.map_err(Error::from)?;
-                Ok(Some(sqlite_row_to_keyset_info(rec)?))
-            }
-            Err(err) => match err {
-                sqlx::Error::RowNotFound => {
-                    transaction.commit().await.map_err(Error::from)?;
-                    return Ok(None);
-                }
-                _ => {
-                    tracing::error!("SQLite could not get keyset info");
-                    if let Err(err) = transaction.rollback().await {
-                        tracing::error!("Could not rollback sql transaction: {}", err);
-                    }
-                    return Err(Error::SQLX(err).into());
-                }
-            },
-        }
+        .bind(":id", id.to_string())
+        .fetch_one(&self.pool)
+        .await?
+        .map(sqlite_row_to_keyset_info)
+        .transpose()?)
     }
 
     async fn get_keyset_infos(&self) -> Result<Vec<MintKeySetInfo>, Self::Err> {
-        let mut transaction = self.pool.begin().await.map_err(Error::from)?;
-        let recs = sqlx::query(
-            r#"
-SELECT *
-FROM keyset;
-        "#,
+        Ok(query(
+            r#"SELECT
+                id,
+                unit,
+                active,
+                valid_from,
+                valid_to,
+                derivation_path,
+                derivation_path_index,
+                max_order,
+                input_fee_ppk
+            FROM
+                keyset
+                WHERE id=:id"#,
         )
-        .fetch_all(&mut *transaction)
-        .await
-        .map_err(Error::from);
-
-        match recs {
-            Ok(recs) => {
-                transaction.commit().await.map_err(Error::from)?;
-                Ok(recs
-                    .into_iter()
-                    .map(sqlite_row_to_keyset_info)
-                    .collect::<Result<_, _>>()?)
-            }
-            Err(err) => {
-                tracing::error!("SQLite could not get keyset info");
-                if let Err(err) = transaction.rollback().await {
-                    tracing::error!("Could not rollback sql transaction: {}", err);
-                }
-                Err(err.into())
-            }
-        }
+        .fetch_all(&self.pool)
+        .await?
+        .into_iter()
+        .map(sqlite_row_to_keyset_info)
+        .collect::<Result<Vec<_>, _>>()?)
     }
 
     async fn add_proof(&self, proof: AuthProof) -> Result<(), Self::Err> {
-        let mut transaction = self.pool.begin().await.map_err(Error::from)?;
-        if let Err(err) = sqlx::query(
+        query(
             r#"
-INSERT INTO proof
-(y, keyset_id, secret, c, state)
-VALUES (?, ?, ?, ?, ?);
-        "#,
+            INSERT INTO proof
+            (y, keyset_id, secret, c, state)
+            VALUES
+            (:y, :keyset_id, :secret, :c, :state)
+            "#,
         )
-        .bind(proof.y()?.to_bytes().to_vec())
-        .bind(proof.keyset_id.to_string())
-        .bind(proof.secret.to_string())
-        .bind(proof.c.to_bytes().to_vec())
-        .bind("UNSPENT")
-        .execute(&mut *transaction)
-        .await
-        .map_err(Error::from)
-        {
-            tracing::debug!("Attempting to add known proof. Skipping.... {:?}", err);
-        }
-        transaction.commit().await.map_err(Error::from)?;
-
+        .bind(":y", proof.y()?.to_bytes().to_vec())
+        .bind(":keyset_id", proof.keyset_id.to_string())
+        .bind(":secret", proof.secret.to_string())
+        .bind(":c", proof.c.to_bytes().to_vec())
+        .bind(":state", "UNSPENT".to_string())
+        .execute(&self.pool)
+        .await?;
         Ok(())
     }
 
     async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
-        let mut transaction = self.pool.begin().await.map_err(Error::from)?;
-
-        let sql = format!(
-            "SELECT y, state FROM proof WHERE y IN ({})",
-            "?,".repeat(ys.len()).trim_end_matches(',')
-        );
-
-        let mut current_states = ys
-            .iter()
-            .fold(sqlx::query(&sql), |query, y| {
-                query.bind(y.to_bytes().to_vec())
-            })
-            .fetch_all(&mut *transaction)
-            .await
-            .map_err(|err| {
-                tracing::error!("SQLite could not get state of proof: {err:?}");
-                Error::SQLX(err)
-            })?
+        let mut current_states = query(r#"SELECT y, state FROM proof WHERE y IN (:ys)"#)
+            .bind_vec(":ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
+            .fetch_all(&self.pool)
+            .await?
             .into_iter()
             .map(|row| {
-                PublicKey::from_slice(row.get("y"))
-                    .map_err(Error::from)
-                    .and_then(|y| {
-                        let state: String = row.get("state");
-                        State::from_str(&state)
-                            .map_err(Error::from)
-                            .map(|state| (y, state))
-                    })
+                Ok((
+                    column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice),
+                    column_as_string!(&row[1], State::from_str),
+                ))
             })
-            .collect::<Result<HashMap<_, _>, _>>()?;
+            .collect::<Result<HashMap<_, _>, Error>>()?;
 
         Ok(ys.iter().map(|y| current_states.remove(y)).collect())
     }
@@ -290,70 +204,56 @@ VALUES (?, ?, ?, ?, ?);
         y: &PublicKey,
         proofs_state: State,
     ) -> Result<Option<State>, Self::Err> {
-        let mut transaction = self.pool.begin().await.map_err(Error::from)?;
+        let transaction = self.pool.begin().await?;
 
-        // Get current state for single y
-        let current_state = sqlx::query("SELECT state FROM proof WHERE y = ?")
-            .bind(y.to_bytes().to_vec())
-            .fetch_optional(&mut *transaction)
-            .await
-            .map_err(|err| {
-                tracing::error!("SQLite could not get state of proof: {err:?}");
-                Error::SQLX(err)
-            })?
-            .map(|row| {
-                let state: String = row.get("state");
-                State::from_str(&state).map_err(Error::from)
-            })
+        let current_state = query(r#"SELECT state FROM proof WHERE y = :y"#)
+            .bind(":y", y.to_bytes().to_vec())
+            .pluck(&transaction)
+            .await?
+            .map(|state| Ok::<_, Error>(column_as_string!(state, State::from_str)))
             .transpose()?;
 
-        // Update state for single y
-        sqlx::query("UPDATE proof SET state = ? WHERE state != ? AND y = ?")
-            .bind(proofs_state.to_string())
-            .bind(State::Spent.to_string())
-            .bind(y.to_bytes().to_vec())
-            .execute(&mut *transaction)
-            .await
-            .map_err(|err| {
-                tracing::error!("SQLite could not update proof state: {err:?}");
-                Error::SQLX(err)
-            })?;
+        query(r#"UPDATE proof SET state = :new_state WHERE state = :state AND y = :y"#)
+            .bind(":y", y.to_bytes().to_vec())
+            .bind(
+                ":state",
+                current_state.as_ref().map(|state| state.to_string()),
+            )
+            .bind(":new_state", proofs_state.to_string())
+            .execute(&transaction)
+            .await?;
+
+        transaction.commit().await?;
 
-        transaction.commit().await.map_err(Error::from)?;
         Ok(current_state)
     }
 
     async fn add_blind_signatures(
         &self,
         blinded_messages: &[PublicKey],
-        blind_signatures: &[BlindSignature],
+        blinded_signatures: &[BlindSignature],
     ) -> Result<(), Self::Err> {
-        let mut transaction = self.pool.begin().await.map_err(Error::from)?;
-        for (message, signature) in blinded_messages.iter().zip(blind_signatures) {
-            let res = sqlx::query(
+        let transaction = self.pool.begin().await?;
+
+        for (message, signature) in blinded_messages.iter().zip(blinded_signatures) {
+            query(
                 r#"
-INSERT INTO blind_signature
-(y, amount, keyset_id, c)
-VALUES (?, ?, ?, ?);
-        "#,
+                    INSERT
+                    INTO blind_signature
+                    (y, amount, keyset_id, c)
+                    VALUES
+                    (:y, :amount, :keyset_id, :c)
+                "#,
             )
-            .bind(message.to_bytes().to_vec())
-            .bind(u64::from(signature.amount) as i64)
-            .bind(signature.keyset_id.to_string())
-            .bind(signature.c.to_bytes().to_vec())
-            .execute(&mut *transaction)
-            .await;
-
-            if let Err(err) = res {
-                tracing::error!("SQLite could not add blind signature");
-                if let Err(err) = transaction.rollback().await {
-                    tracing::error!("Could not rollback sql transaction: {}", err);
-                }
-                return Err(Error::SQLX(err).into());
-            }
+            .bind(":y", message.to_bytes().to_vec())
+            .bind(":amount", u64::from(signature.amount) as i64)
+            .bind(":keyset_id", signature.keyset_id.to_string())
+            .bind(":c", signature.c.to_bytes().to_vec())
+            .execute(&transaction)
+            .await?;
         }
 
-        transaction.commit().await.map_err(Error::from)?;
+        transaction.commit().await?;
 
         Ok(())
     }
@@ -362,32 +262,40 @@ VALUES (?, ?, ?, ?);
         &self,
         blinded_messages: &[PublicKey],
     ) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
-        let mut transaction = self.pool.begin().await.map_err(Error::from)?;
-
-        let sql = format!(
-            "SELECT * FROM blind_signature WHERE y IN ({})",
-            "?,".repeat(blinded_messages.len()).trim_end_matches(',')
-        );
-
-        let mut blinded_signatures = blinded_messages
-            .iter()
-            .fold(sqlx::query(&sql), |query, y| {
-                query.bind(y.to_bytes().to_vec())
-            })
-            .fetch_all(&mut *transaction)
-            .await
-            .map_err(|err| {
-                tracing::error!("SQLite could not get state of proof: {err:?}");
-                Error::SQLX(err)
-            })?
-            .into_iter()
-            .map(|row| {
-                PublicKey::from_slice(row.get("y"))
-                    .map_err(Error::from)
-                    .and_then(|y| sqlite_row_to_blind_signature(row).map(|blinded| (y, blinded)))
-            })
-            .collect::<Result<HashMap<_, _>, _>>()?;
-
+        let mut blinded_signatures = query(
+            r#"SELECT
+                keyset_id,
+                amount,
+                c,
+                dleq_e,
+                dleq_s,
+                y
+            FROM
+                blind_signature
+            WHERE y IN (:y)
+            "#,
+        )
+        .bind_vec(
+            ":y",
+            blinded_messages
+                .iter()
+                .map(|y| y.to_bytes().to_vec())
+                .collect(),
+        )
+        .fetch_all(&self.pool)
+        .await?
+        .into_iter()
+        .map(|mut row| {
+            Ok((
+                column_as_string!(
+                    &row.pop().ok_or(Error::InvalidDbResponse)?,
+                    PublicKey::from_hex,
+                    PublicKey::from_slice
+                ),
+                sqlite_row_to_blind_signature(row)?,
+            ))
+        })
+        .collect::<Result<HashMap<_, _>, Error>>()?;
         Ok(blinded_messages
             .iter()
             .map(|y| blinded_signatures.remove(y))
@@ -398,21 +306,20 @@ VALUES (?, ?, ?, ?);
         &self,
         protected_endpoints: HashMap<ProtectedEndpoint, AuthRequired>,
     ) -> Result<(), Self::Err> {
-        let mut transaction = self.pool.begin().await.map_err(Error::from)?;
+        let transaction = self.pool.begin().await?;
 
         for (endpoint, auth) in protected_endpoints.iter() {
-            if let Err(err) = sqlx::query(
+            if let Err(err) = query(
                 r#"
-INSERT OR REPLACE INTO protected_endpoints
-(endpoint, auth)
-VALUES (?, ?);
-        "#,
+                INSERT OR REPLACE INTO protected_endpoints
+                (endpoint, auth)
+                VALUES (:endpoint, :auth);
+                "#,
             )
-            .bind(serde_json::to_string(endpoint)?)
-            .bind(serde_json::to_string(auth)?)
-            .execute(&mut *transaction)
+            .bind(":endpoint", serde_json::to_string(endpoint)?)
+            .bind(":auth", serde_json::to_string(auth)?)
+            .execute(&transaction)
             .await
-            .map_err(Error::from)
             {
                 tracing::debug!(
                     "Attempting to add protected endpoint. Skipping.... {:?}",
@@ -421,7 +328,7 @@ VALUES (?, ?);
             }
         }
 
-        transaction.commit().await.map_err(Error::from)?;
+        transaction.commit().await?;
 
         Ok(())
     }
@@ -429,111 +336,52 @@ VALUES (?, ?);
         &self,
         protected_endpoints: Vec<ProtectedEndpoint>,
     ) -> Result<(), Self::Err> {
-        let mut transaction = self.pool.begin().await.map_err(Error::from)?;
-
-        let sql = format!(
-            "DELETE FROM protected_endpoints WHERE endpoint IN ({})",
-            std::iter::repeat("?")
-                .take(protected_endpoints.len())
-                .collect::<Vec<_>>()
-                .join(",")
-        );
-
-        let endpoints = protected_endpoints
-            .iter()
-            .map(serde_json::to_string)
-            .collect::<Result<Vec<_>, _>>()?;
-
-        endpoints
-            .iter()
-            .fold(sqlx::query(&sql), |query, endpoint| query.bind(endpoint))
-            .execute(&mut *transaction)
-            .await
-            .map_err(Error::from)?;
-
-        transaction.commit().await.map_err(Error::from)?;
+        query(r#"DELETE FROM protected_endpoints WHERE endpoint IN (:endpoints)"#)
+            .bind_vec(
+                ":endpoints",
+                protected_endpoints
+                    .iter()
+                    .map(serde_json::to_string)
+                    .collect::<Result<_, _>>()?,
+            )
+            .execute(&self.pool)
+            .await?;
         Ok(())
     }
+
     async fn get_auth_for_endpoint(
         &self,
         protected_endpoint: ProtectedEndpoint,
     ) -> Result<Option<AuthRequired>, Self::Err> {
-        let mut transaction = self.pool.begin().await.map_err(Error::from)?;
-
-        let rec = sqlx::query(
-            r#"
-SELECT *
-FROM protected_endpoints
-WHERE endpoint=?;
-        "#,
+        Ok(
+            query(r#"SELECT auth FROM protected_endpoints WHERE endpoint = :endpoint"#)
+                .bind(":endpoint", serde_json::to_string(&protected_endpoint)?)
+                .pluck(&self.pool)
+                .await?
+                .map(|auth| {
+                    Ok::<_, Error>(column_as_string!(
+                        auth,
+                        serde_json::from_str,
+                        serde_json::from_slice
+                    ))
+                })
+                .transpose()?,
         )
-        .bind(serde_json::to_string(&protected_endpoint)?)
-        .fetch_one(&mut *transaction)
-        .await;
-
-        match rec {
-            Ok(rec) => {
-                transaction.commit().await.map_err(Error::from)?;
-
-                let auth: String = rec.try_get("auth").map_err(Error::from)?;
-
-                Ok(Some(serde_json::from_str(&auth)?))
-            }
-            Err(err) => match err {
-                sqlx::Error::RowNotFound => {
-                    transaction.commit().await.map_err(Error::from)?;
-                    return Ok(None);
-                }
-                _ => {
-                    return {
-                        if let Err(err) = transaction.rollback().await {
-                            tracing::error!("Could not rollback sql transaction: {}", err);
-                        }
-                        Err(Error::SQLX(err).into())
-                    }
-                }
-            },
-        }
     }
+
     async fn get_auth_for_endpoints(
         &self,
     ) -> Result<HashMap<ProtectedEndpoint, Option<AuthRequired>>, Self::Err> {
-        let mut transaction = self.pool.begin().await.map_err(Error::from)?;
-
-        let recs = sqlx::query(
-            r#"
-SELECT *
-FROM protected_endpoints
-        "#,
-        )
-        .fetch_all(&mut *transaction)
-        .await;
-
-        match recs {
-            Ok(recs) => {
-                transaction.commit().await.map_err(Error::from)?;
-
-                let mut endpoints = HashMap::new();
-
-                for rec in recs {
-                    let auth: String = rec.try_get("auth").map_err(Error::from)?;
-                    let endpoint: String = rec.try_get("endpoint").map_err(Error::from)?;
-
-                    let endpoint: ProtectedEndpoint = serde_json::from_str(&endpoint)?;
-                    let auth: AuthRequired = serde_json::from_str(&auth)?;
-
-                    endpoints.insert(endpoint, Some(auth));
-                }
-
-                Ok(endpoints)
-            }
-            Err(err) => {
-                tracing::error!("SQLite could not get protected endpoints");
-                if let Err(err) = transaction.rollback().await {
-                    tracing::error!("Could not rollback sql transaction: {}", err);
-                }
-                Err(Error::from(err).into())
-            }
-        }
+        Ok(query(r#"SELECT endpoint, auth FROM protected_endpoints"#)
+            .fetch_all(&self.pool)
+            .await?
+            .into_iter()
+            .map(|row| {
+                let endpoint =
+                    column_as_string!(&row[0], serde_json::from_str, serde_json::from_slice);
+                let auth = column_as_string!(&row[1], serde_json::from_str, serde_json::from_slice);
+                Ok((endpoint, Some(auth)))
+            })
+            .collect::<Result<HashMap<_, _>, Error>>()?)
     }
 }

+ 39 - 1
crates/cdk-sqlite/src/mint/error.rs

@@ -7,7 +7,45 @@ use thiserror::Error;
 pub enum Error {
     /// SQLX Error
     #[error(transparent)]
-    SQLX(#[from] sqlx::Error),
+    Sqlite(#[from] r2d2_sqlite::rusqlite::Error),
+
+    /// Pool error
+    #[error(transparent)]
+    Pool(#[from] r2d2::Error),
+    /// Migration
+    #[error(transparent)]
+    Migration(#[from] refinery::Error),
+    /// Invalid UUID
+    #[error("Invalid UUID: {0}")]
+    InvalidUuid(String),
+    /// QuoteNotFound
+    #[error("Quote not found")]
+    QuoteNotFound,
+
+    /// Missing named parameter
+    #[error("Missing named parameter {0}")]
+    MissingParameter(String),
+
+    /// Communication error with the database
+    #[error("Internal communication error")]
+    Communication,
+
+    /// Invalid resposne from the database thread
+    #[error("Internal communication error")]
+    InvalidDbResponse,
+
+    /// Invalid db type
+    #[error("Invalid type from db, expected {0} got {1}")]
+    InvalidType(String, String),
+
+    /// Missing columns
+    #[error("Not enough elements: expected {0}, got {1}")]
+    MissingColumn(usize, usize),
+
+    /// Invalid data conversion in column
+    #[error("Error converting {0} to {1}")]
+    InvalidConversion(String, String),
+
     /// NUT00 Error
     #[error(transparent)]
     CDKNUT00(#[from] cdk_common::nuts::nut00::Error),

+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20240923153640_melt_requests.sql → crates/cdk-sqlite/src/mint/migrations/V10__melt_requests.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20240930101140_dleq_for_sigs.sql → crates/cdk-sqlite/src/mint/migrations/V11__dleq_for_sigs.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20241108093102_mint_mint_quote_pubkey.sql → crates/cdk-sqlite/src/mint/migrations/V12__mint_mint_quote_pubkey.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20250103201327_amount_to_pay_msats.sql → crates/cdk-sqlite/src/mint/migrations/V13__amount_to_pay_msats.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20250129200912_remove_mint_url.sql → crates/cdk-sqlite/src/mint/migrations/V14__remove_mint_url.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20250129230326_add_config_table.sql → crates/cdk-sqlite/src/mint/migrations/V15__add_config_table.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20250307213652_keyset_id_as_foreign_key.sql → crates/cdk-sqlite/src/mint/migrations/V16__keyset_id_as_foreign_key.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20250406091754_mint_time_of_quotes.sql → crates/cdk-sqlite/src/mint/migrations/V17__mint_time_of_quotes.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20250406093755_mint_created_time_signature.sql → crates/cdk-sqlite/src/mint/migrations/V18__mint_created_time_signature.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20250415093121_drop_keystore_foreign.sql → crates/cdk-sqlite/src/mint/migrations/V19__drop_keystore_foreign.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20240612124932_init.sql → crates/cdk-sqlite/src/mint/migrations/V1__init.sql


+ 0 - 0
crates/cdk-sqlite/src/wallet/migrations/20240618200350_quote_state.sql → crates/cdk-sqlite/src/mint/migrations/V2__quote_state.sql


+ 0 - 0
crates/cdk-sqlite/src/wallet/migrations/20240626091921_nut04_state.sql → crates/cdk-sqlite/src/mint/migrations/V3__nut04_state.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20240703122347_request_lookup_id.sql → crates/cdk-sqlite/src/mint/migrations/V4__request_lookup_id.sql


+ 0 - 0
crates/cdk-sqlite/src/wallet/migrations/20240710144711_input_fee.sql → crates/cdk-sqlite/src/mint/migrations/V5__input_fee.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20240711183109_derivation_path_index.sql → crates/cdk-sqlite/src/mint/migrations/V6__derivation_path_index.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20240718203721_allow_unspent.sql → crates/cdk-sqlite/src/mint/migrations/V7__allow_unspent.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20240811031111_update_mint_url.sql → crates/cdk-sqlite/src/mint/migrations/V8__update_mint_url.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20240919103407_proofs_quote_id.sql → crates/cdk-sqlite/src/mint/migrations/V9__proofs_quote_id.sql


Những thai đổi đã bị hủy bỏ vì nó quá lớn
+ 469 - 637
crates/cdk-sqlite/src/mint/mod.rs


+ 159 - 0
crates/cdk-sqlite/src/stmt.rs

@@ -0,0 +1,159 @@
+use r2d2::PooledConnection;
+use r2d2_sqlite::rusqlite::{self, CachedStatement};
+use r2d2_sqlite::SqliteConnectionManager;
+
+pub type Value = r2d2_sqlite::rusqlite::types::Value;
+
+/// The Column type
+pub type Column = r2d2_sqlite::rusqlite::types::Value;
+
+/// Expected Sql response
+#[derive(Debug, Clone, Copy, Default)]
+pub enum ExpectedSqlResponse {
+    SingleRow,
+    #[default]
+    ManyRows,
+    AffectedRows,
+    Pluck,
+}
+
+/// Sql message
+#[derive(Default, Debug)]
+pub struct Statement {
+    pub sql: String,
+    pub args: Vec<(String, Value)>,
+    pub expected_response: ExpectedSqlResponse,
+}
+
+impl Statement {
+    pub fn new<T: ToString>(sql: T) -> Self {
+        Self {
+            sql: sql.to_string(),
+            ..Default::default()
+        }
+    }
+
+    #[inline]
+    pub fn bind<C: ToString, V: Into<Value>>(mut self, name: C, value: V) -> Self {
+        self.args.push((name.to_string(), value.into()));
+        self
+    }
+
+    /// Binds a single variable with a vector.
+    ///
+    /// This will rewrite the function from `:foo` (where value is vec![1, 2, 3]) to `:foo0, :foo1,
+    /// :foo2` and binds each value from the value vector accordingly.
+    #[inline]
+    pub fn bind_vec<C: ToString, V: Into<Value>>(mut self, name: C, value: Vec<V>) -> Self {
+        let mut new_sql = String::with_capacity(self.sql.len());
+        let target = name.to_string();
+        let mut i = 0;
+
+        let placeholders = value
+            .into_iter()
+            .enumerate()
+            .map(|(key, value)| {
+                let key = format!("{target}{key}");
+                self.args.push((key.clone(), value.into()));
+                key
+            })
+            .collect::<Vec<_>>()
+            .join(",");
+
+        while let Some(pos) = self.sql[i..].find(&target) {
+            let abs_pos = i + pos;
+            let after = abs_pos + target.len();
+            let is_word_boundary = self.sql[after..]
+                .chars()
+                .next()
+                .map_or(true, |c| !c.is_alphanumeric() && c != '_');
+
+            if is_word_boundary {
+                new_sql.push_str(&self.sql[i..abs_pos]);
+                new_sql.push_str(&placeholders);
+                i = after;
+            } else {
+                new_sql.push_str(&self.sql[i..=abs_pos]);
+                i = abs_pos + 1;
+            }
+        }
+
+        new_sql.push_str(&self.sql[i..]);
+
+        self.sql = new_sql;
+        self
+    }
+
+    fn get_stmt(
+        self,
+        conn: &PooledConnection<SqliteConnectionManager>,
+    ) -> rusqlite::Result<CachedStatement<'_>> {
+        let mut stmt = conn.prepare_cached(&self.sql)?;
+        for (name, value) in self.args {
+            let index = stmt
+                .parameter_index(&name)
+                .map_err(|_| rusqlite::Error::InvalidColumnName(name.clone()))?
+                .ok_or(rusqlite::Error::InvalidColumnName(name))?;
+
+            stmt.raw_bind_parameter(index, value)?;
+        }
+
+        Ok(stmt)
+    }
+    ///
+    /// Executes a query and returns the affected rows
+    pub fn plunk(
+        self,
+        conn: &PooledConnection<SqliteConnectionManager>,
+    ) -> rusqlite::Result<Option<Value>> {
+        let mut stmt = self.get_stmt(conn)?;
+        let mut rows = stmt.raw_query();
+        rows.next()?.map(|row| row.get(0)).transpose()
+    }
+
+    /// Executes a query and returns the affected rows
+    pub fn execute(
+        self,
+        conn: &PooledConnection<SqliteConnectionManager>,
+    ) -> rusqlite::Result<usize> {
+        self.get_stmt(conn)?.raw_execute()
+    }
+
+    /// Runs the query and returns the first row or None
+    pub fn fetch_one(
+        self,
+        conn: &PooledConnection<SqliteConnectionManager>,
+    ) -> rusqlite::Result<Option<Vec<Column>>> {
+        let mut stmt = self.get_stmt(conn)?;
+        let columns = stmt.column_count();
+        let mut rows = stmt.raw_query();
+        rows.next()?
+            .map(|row| {
+                (0..columns)
+                    .map(|i| row.get(i))
+                    .collect::<Result<Vec<_>, _>>()
+            })
+            .transpose()
+    }
+
+    /// Runs the query and returns the first row or None
+    pub fn fetch_all(
+        self,
+        conn: &PooledConnection<SqliteConnectionManager>,
+    ) -> rusqlite::Result<Vec<Vec<Column>>> {
+        let mut stmt = self.get_stmt(conn)?;
+        let columns = stmt.column_count();
+        let mut rows = stmt.raw_query();
+        let mut results = vec![];
+
+        while let Some(row) = rows.next()? {
+            results.push(
+                (0..columns)
+                    .map(|i| row.get(i))
+                    .collect::<Result<Vec<_>, _>>()?,
+            );
+        }
+
+        Ok(results)
+    }
+}

+ 20 - 1
crates/cdk-sqlite/src/wallet/error.rs

@@ -7,7 +7,26 @@ use thiserror::Error;
 pub enum Error {
     /// SQLX Error
     #[error(transparent)]
-    SQLX(#[from] sqlx::Error),
+    Sqlite(#[from] r2d2_sqlite::rusqlite::Error),
+    /// Pool error
+    #[error(transparent)]
+    Pool(#[from] r2d2::Error),
+
+    /// Missing columns
+    #[error("Not enough elements: expected {0}, got {1}")]
+    MissingColumn(usize, usize),
+
+    /// Invalid db type
+    #[error("Invalid type from db, expected {0} got {1}")]
+    InvalidType(String, String),
+
+    /// Invalid data conversion in column
+    #[error("Error converting {0} to {1}")]
+    InvalidConversion(String, String),
+
+    /// Migration
+    #[error(transparent)]
+    Migration(#[from] refinery::Error),
     /// Serde Error
     #[error(transparent)]
     Serde(#[from] serde_json::Error),

+ 0 - 0
crates/cdk-sqlite/src/wallet/migrations/20241108092756_wallet_mint_quote_secretkey.sql → crates/cdk-sqlite/src/wallet/migrations/V10__wallet_mint_quote_secretkey.sql


+ 0 - 0
crates/cdk-sqlite/src/wallet/migrations/20250214135017_mint_tos.sql → crates/cdk-sqlite/src/wallet/migrations/V11__mint_tos.sql


+ 0 - 0
crates/cdk-sqlite/src/wallet/migrations/20250310111513_drop_nostr_last_checked.sql → crates/cdk-sqlite/src/wallet/migrations/V12__drop_nostr_last_checked.sql


+ 0 - 0
crates/cdk-sqlite/src/wallet/migrations/20250314082116_allow_pending_spent.sql → crates/cdk-sqlite/src/wallet/migrations/V13__allow_pending_spent.sql


+ 0 - 0
crates/cdk-sqlite/src/wallet/migrations/20250323152040_wallet_dleq_proofs.sql → crates/cdk-sqlite/src/wallet/migrations/V14__wallet_dleq_proofs.sql


+ 0 - 0
crates/cdk-sqlite/src/wallet/migrations/20250401120000_add_transactions_table.sql → crates/cdk-sqlite/src/wallet/migrations/V15__add_transactions_table.sql


+ 0 - 0
crates/cdk-sqlite/src/wallet/migrations/20240612132920_init.sql → crates/cdk-sqlite/src/wallet/migrations/V1__init.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20240618195700_quote_state.sql → crates/cdk-sqlite/src/wallet/migrations/V2__quote_state.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20240626092101_nut04_state.sql → crates/cdk-sqlite/src/wallet/migrations/V3__nut04_state.sql


+ 0 - 0
crates/cdk-sqlite/src/mint/migrations/20240710145043_input_fee.sql → crates/cdk-sqlite/src/wallet/migrations/V4__input_fee.sql


+ 0 - 0
crates/cdk-sqlite/src/wallet/migrations/20240810214105_mint_icon_url.sql → crates/cdk-sqlite/src/wallet/migrations/V5__mint_icon_url.sql


+ 0 - 0
crates/cdk-sqlite/src/wallet/migrations/20240810233905_update_mint_url.sql → crates/cdk-sqlite/src/wallet/migrations/V6__update_mint_url.sql


+ 0 - 0
crates/cdk-sqlite/src/wallet/migrations/20240902151515_icon_url.sql → crates/cdk-sqlite/src/wallet/migrations/V7__icon_url.sql


+ 0 - 0
crates/cdk-sqlite/src/wallet/migrations/20240902210905_mint_time.sql → crates/cdk-sqlite/src/wallet/migrations/V8__mint_time.sql


+ 0 - 0
crates/cdk-sqlite/src/wallet/migrations/20241011125207_mint_urls.sql → crates/cdk-sqlite/src/wallet/migrations/V9__mint_urls.sql


Những thai đổi đã bị hủy bỏ vì nó quá lớn
+ 360 - 398
crates/cdk-sqlite/src/wallet/mod.rs


Một số tệp đã không được hiển thị bởi vì quá nhiều tập tin thay đổi trong này khác