Преглед на файлове

Merge pull request #822 from crodas/feature/sqlite-add-busy-timeout

Add 10 second busy timeout
thesimplekid преди 1 седмица
родител
ревизия
37f9d9122b

+ 4 - 1
crates/cdk-common/src/database/mint/mod.rs

@@ -88,7 +88,7 @@ pub trait QuotesDatabase {
         &self,
         quote_id: &Uuid,
         state: MeltQuoteState,
-    ) -> Result<MeltQuoteState, Self::Err>;
+    ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err>;
     /// Get all [`mint::MeltQuote`]s
     async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err>;
     /// Remove [`mint::MeltQuote`]
@@ -114,6 +114,9 @@ pub trait ProofsDatabase {
     type Err: Into<Error> + From<Error>;
 
     /// Add  [`Proofs`]
+    ///
+    /// Adds proofs to the database. The database should error if the proof already exits, with a
+    /// `AttemptUpdateSpentProof` if the proof is already spent or a `Duplicate` error otherwise.
     async fn add_proofs(&self, proof: Proofs, quote_id: Option<Uuid>) -> Result<(), Self::Err>;
     /// Remove [`Proofs`]
     async fn remove_proofs(

+ 5 - 0
crates/cdk-common/src/database/mod.rs

@@ -22,6 +22,11 @@ pub enum Error {
     /// Database Error
     #[error(transparent)]
     Database(Box<dyn std::error::Error + Send + Sync>),
+
+    /// Duplicate entry
+    #[error("Duplicate entry")]
+    Duplicate,
+
     /// DHKE error
     #[error(transparent)]
     DHKE(#[from] crate::dhke::Error),

+ 4 - 3
crates/cdk-redb/src/mint/mod.rs

@@ -470,12 +470,13 @@ impl MintQuotesDatabase for MintRedbDatabase {
         &self,
         quote_id: &Uuid,
         state: MeltQuoteState,
-    ) -> Result<MeltQuoteState, Self::Err> {
+    ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err> {
         let write_txn = self.db.begin_write().map_err(Error::from)?;
 
         let current_state;
+        let mut melt_quote: mint::MeltQuote;
+
         {
-            let mut melt_quote: mint::MeltQuote;
             let mut table = write_txn
                 .open_table(MELT_QUOTES_TABLE)
                 .map_err(Error::from)?;
@@ -506,7 +507,7 @@ impl MintQuotesDatabase for MintRedbDatabase {
         }
         write_txn.commit().map_err(Error::from)?;
 
-        Ok(current_state)
+        Ok((current_state, melt_quote))
     }
 
     async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err> {

+ 3 - 1
crates/cdk-sqlite/src/common.rs

@@ -47,6 +47,8 @@ impl ResourceManager for SqliteConnectionManager {
             "#,
         )?;
 
+        conn.busy_timeout(Duration::from_secs(10))?;
+
         Ok(conn)
     }
 }
@@ -81,7 +83,7 @@ pub fn create_sqlite_pool(
         )
     };
 
-    Pool::new(config, max_size, Duration::from_secs(5))
+    Pool::new(config, max_size, Duration::from_secs(10))
 }
 
 /// Migrates the migration generated by `build.rs`

+ 59 - 5
crates/cdk-sqlite/src/mint/async_rusqlite.rs

@@ -4,7 +4,7 @@ use std::sync::{mpsc as std_mpsc, Arc, Mutex};
 use std::thread::spawn;
 use std::time::Instant;
 
-use rusqlite::Connection;
+use rusqlite::{ffi, Connection, ErrorCode, TransactionBehavior};
 use tokio::sync::{mpsc, oneshot};
 
 use crate::common::SqliteConnectionManager;
@@ -202,6 +202,26 @@ fn rusqlite_spawn_worker_threads(
                     Ok(ok) => reply_to.send(ok),
                     Err(err) => {
                         tracing::error!("Failed query with error {:?}", err);
+                        let err = if let Error::Sqlite(rusqlite::Error::SqliteFailure(
+                            ffi::Error {
+                                code,
+                                extended_code,
+                            },
+                            _,
+                        )) = &err
+                        {
+                            if *code == ErrorCode::ConstraintViolation
+                                && (*extended_code == ffi::SQLITE_CONSTRAINT_PRIMARYKEY
+                                    || *extended_code == ffi::SQLITE_CONSTRAINT_UNIQUE)
+                            {
+                                Error::Duplicate
+                            } else {
+                                err
+                            }
+                        } else {
+                            err
+                        };
+
                         reply_to.send(DbResponse::Error(err))
                     }
                 };
@@ -262,7 +282,7 @@ fn rusqlite_worker_manager(
                     }
                 };
 
-                let tx = match conn.transaction() {
+                let tx = match conn.transaction_with_behavior(TransactionBehavior::Immediate) {
                     Ok(tx) => tx,
                     Err(err) => {
                         tracing::error!("Failed to begin a transaction: {:?}", err);
@@ -300,7 +320,10 @@ fn rusqlite_worker_manager(
                             tracing::trace!("Tx {}: Commit", tx_id);
                             let _ = reply_to.send(match tx.commit() {
                                 Ok(()) => DbResponse::Ok,
-                                Err(err) => DbResponse::Error(err.into()),
+                                Err(err) => {
+                                    tracing::error!("Failed commit {:?}", err);
+                                    DbResponse::Error(err.into())
+                                }
                             });
                             break;
                         }
@@ -308,7 +331,10 @@ fn rusqlite_worker_manager(
                             tracing::trace!("Tx {}: Rollback", tx_id);
                             let _ = reply_to.send(match tx.rollback() {
                                 Ok(()) => DbResponse::Ok,
-                                Err(err) => DbResponse::Error(err.into()),
+                                Err(err) => {
+                                    tracing::error!("Failed rollback {:?}", err);
+                                    DbResponse::Error(err.into())
+                                }
                             });
                             break;
                         }
@@ -319,7 +345,35 @@ fn rusqlite_worker_manager(
                             tracing::trace!("Tx {}: SQL {}", tx_id, sql.sql);
                             let _ = match process_query(&tx, sql) {
                                 Ok(ok) => reply_to.send(ok),
-                                Err(err) => reply_to.send(DbResponse::Error(err)),
+                                Err(err) => {
+                                    tracing::error!(
+                                        "Tx {}: Failed query with error {:?}",
+                                        tx_id,
+                                        err
+                                    );
+                                    let err = if let Error::Sqlite(
+                                        rusqlite::Error::SqliteFailure(
+                                            ffi::Error {
+                                                code,
+                                                extended_code,
+                                            },
+                                            _,
+                                        ),
+                                    ) = &err
+                                    {
+                                        if *code == ErrorCode::ConstraintViolation
+                                            && (*extended_code == ffi::SQLITE_CONSTRAINT_PRIMARYKEY
+                                                || *extended_code == ffi::SQLITE_CONSTRAINT_UNIQUE)
+                                        {
+                                            Error::Duplicate
+                                        } else {
+                                            err
+                                        }
+                                    } else {
+                                        err
+                                    };
+                                    reply_to.send(DbResponse::Error(err))
+                                }
                             };
                         }
                     }

+ 8 - 1
crates/cdk-sqlite/src/mint/error.rs

@@ -9,6 +9,10 @@ pub enum Error {
     #[error(transparent)]
     Sqlite(#[from] rusqlite::Error),
 
+    /// Duplicate entry
+    #[error("Record already exists")]
+    Duplicate,
+
     /// Pool error
     #[error(transparent)]
     Pool(#[from] crate::pool::Error<rusqlite::Error>),
@@ -98,6 +102,9 @@ pub enum Error {
 
 impl From<Error> for cdk_common::database::Error {
     fn from(e: Error) -> Self {
-        Self::Database(Box::new(e))
+        match e {
+            Error::Duplicate => Self::Duplicate,
+            e => Self::Database(Box::new(e)),
+        }
     }
 }

+ 34 - 6
crates/cdk-sqlite/src/mint/mod.rs

@@ -674,10 +674,10 @@ ON CONFLICT(request_lookup_id) DO UPDATE SET
         &self,
         quote_id: &Uuid,
         state: MeltQuoteState,
-    ) -> Result<MeltQuoteState, Self::Err> {
+    ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err> {
         let transaction = self.pool.begin().await?;
 
-        let quote = query(
+        let mut quote = query(
             r#"
             SELECT
                 id,
@@ -732,7 +732,10 @@ ON CONFLICT(request_lookup_id) DO UPDATE SET
             }
         };
 
-        Ok(quote.state)
+        let old_state = quote.state;
+        quote.state = state;
+
+        Ok((old_state, quote))
     }
 
     async fn remove_melt_quote(&self, quote_id: &Uuid) -> Result<(), Self::Err> {
@@ -813,10 +816,30 @@ impl MintProofsDatabase for MintSqliteDatabase {
 
         let current_time = unix_time();
 
+        // Check any previous proof, this query should return None in order to proceed storing
+        // Any result here would error
+        match query(r#"SELECT state FROM proof WHERE y IN (:ys) LIMIT 1"#)
+            .bind_vec(
+                ":ys",
+                proofs
+                    .iter()
+                    .map(|y| y.y().map(|y| y.to_bytes().to_vec()))
+                    .collect::<Result<_, _>>()?,
+            )
+            .pluck(&transaction)
+            .await?
+            .map(|state| Ok::<_, Error>(column_as_string!(&state, State::from_str)))
+            .transpose()?
+        {
+            Some(State::Spent) => Err(database::Error::AttemptUpdateSpentProof),
+            Some(_) => Err(database::Error::Duplicate),
+            None => Ok(()), // no previous record
+        }?;
+
         for proof in proofs {
             query(
                 r#"
-                INSERT OR IGNORE INTO proof
+                INSERT INTO proof
                 (y, amount, keyset_id, secret, c, witness, state, quote_id, created_time)
                 VALUES
                 (:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time)
@@ -852,10 +875,11 @@ impl MintProofsDatabase for MintSqliteDatabase {
 
         let total_deleted = query(
             r#"
-            DELETE FROM proof WHERE y IN (:ys) AND state != 'SPENT'
+            DELETE FROM proof WHERE y IN (:ys) AND state NOT IN (:exclude_state)
             "#,
         )
         .bind_vec(":ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
+        .bind_vec(":exclude_state", vec![State::Spent.to_string()])
         .execute(&transaction)
         .await?;
 
@@ -974,7 +998,11 @@ impl MintProofsDatabase for MintSqliteDatabase {
 
         if current_states.len() != ys.len() {
             transaction.rollback().await?;
-            tracing::warn!("Attempted to update state of non-existent proof");
+            tracing::warn!(
+                "Attempted to update state of non-existent proof {} {}",
+                current_states.len(),
+                ys.len()
+            );
             return Err(database::Error::ProofNotFound);
         }
 

+ 15 - 9
crates/cdk/src/mint/melt.rs

@@ -297,7 +297,7 @@ impl Mint {
         &self,
         melt_request: &MeltRequest<Uuid>,
     ) -> Result<MeltQuote, Error> {
-        let state = self
+        let (state, quote) = self
             .localstore
             .update_melt_quote_state(melt_request.quote(), MeltQuoteState::Pending)
             .await?;
@@ -309,12 +309,6 @@ impl Mint {
             MeltQuoteState::Unknown => Err(Error::UnknownPaymentState),
         }?;
 
-        let quote = self
-            .localstore
-            .get_melt_quote(melt_request.quote())
-            .await?
-            .ok_or(Error::UnknownQuote)?;
-
         self.pubsub_manager
             .melt_quote_status(&quote, None, None, MeltQuoteState::Pending);
 
@@ -347,11 +341,23 @@ impl Mint {
             ));
         }
 
-        self.localstore
+        if let Some(err) = self
+            .localstore
             .add_proofs(melt_request.inputs().clone(), None)
-            .await?;
+            .await
+            .err()
+        {
+            return match err {
+                cdk_common::database::Error::Duplicate => Err(Error::TokenPending),
+                cdk_common::database::Error::AttemptUpdateSpentProof => {
+                    Err(Error::TokenAlreadySpent)
+                }
+                err => Err(Error::Database(err)),
+            };
+        }
 
         self.check_ys_spendable(&input_ys, State::Pending).await?;
+
         for proof in melt_request.inputs() {
             self.pubsub_manager
                 .proof_state((proof.y()?, State::Pending));

+ 13 - 2
crates/cdk/src/mint/swap.rs

@@ -24,9 +24,20 @@ impl Mint {
 
         // After swap request is fully validated, add the new proofs to DB
         let input_ys = swap_request.inputs().ys()?;
-        self.localstore
+        if let Some(err) = self
+            .localstore
             .add_proofs(swap_request.inputs().clone(), None)
-            .await?;
+            .await
+            .err()
+        {
+            return match err {
+                cdk_common::database::Error::Duplicate => Err(Error::TokenPending),
+                cdk_common::database::Error::AttemptUpdateSpentProof => {
+                    Err(Error::TokenAlreadySpent)
+                }
+                err => Err(Error::Database(err)),
+            };
+        }
         self.check_ys_spendable(&input_ys, State::Pending).await?;
 
         let mut promises = Vec::with_capacity(swap_request.outputs().len());