Преглед изворни кода

Add a generic function to execute db operations

This function would log slow operations and log errors
Cesar Rodas пре 5 месеци
родитељ
комит
29079a3f7e

+ 2 - 4
crates/cdk-common/src/database/mint/test.rs

@@ -79,7 +79,7 @@ where
 }
 
 /// Test the basic storing and retrieving proofs from the database. Probably the database would use
-/// binary/Vec<u8> to store data, that's why this test would quickly identify issues before running
+/// binary/`Vec<u8>` to store data, that's why this test would quickly identify issues before running
 /// other tests
 pub async fn add_and_find_proofs<DB>(db: DB)
 where
@@ -110,9 +110,7 @@ where
 
     // Add proofs to database
     let mut tx = Database::begin_transaction(&db).await.unwrap();
-    tx.add_proofs(proofs.clone(), Some(quote_id))
-        .await
-        .unwrap();
+    tx.add_proofs(proofs.clone(), Some(quote_id)).await.unwrap();
     assert!(tx.commit().await.is_ok());
 
     let proofs_from_db = db.get_proofs_by_ys(&[proofs[0].c, proofs[1].c]).await;

+ 104 - 78
crates/cdk-postgres/src/db.rs

@@ -1,4 +1,5 @@
 use cdk_common::database::Error;
+use cdk_sql_common::run_db_operation;
 use cdk_sql_common::stmt::{Column, Statement};
 use futures_util::{pin_mut, TryStreamExt};
 use tokio_postgres::error::SqlState;
@@ -9,7 +10,8 @@ use crate::value::PgValue;
 #[inline(always)]
 fn to_pgsql_error(err: PgError) -> Error {
     if let Some(err) = err.as_db_error() {
-        if *err.code() == SqlState::INTEGRITY_CONSTRAINT_VIOLATION {
+        let code = err.code().to_owned();
+        if code == SqlState::INTEGRITY_CONSTRAINT_VIOLATION || code == SqlState::UNIQUE_VIOLATION {
             return Error::Duplicate;
         }
     }
@@ -21,7 +23,12 @@ fn to_pgsql_error(err: PgError) -> Error {
 pub async fn pg_batch(conn: &Client, statement: Statement) -> Result<(), Error> {
     let (sql, _placeholder_values) = statement.to_sql()?;
 
-    conn.batch_execute(&sql).await.map_err(to_pgsql_error)
+    run_db_operation(
+        &sql,
+        async { conn.batch_execute(&sql).await },
+        to_pgsql_error,
+    )
+    .await
 }
 
 #[inline(always)]
@@ -29,16 +36,22 @@ pub async fn pg_execute(conn: &Client, statement: Statement) -> Result<usize, Er
     let (sql, placeholder_values) = statement.to_sql()?;
     let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
 
-    conn.execute_raw(
-        &prepared_statement,
-        placeholder_values
-            .iter()
-            .map(|x| x.into())
-            .collect::<Vec<PgValue>>(),
+    run_db_operation(
+        &sql,
+        async {
+            conn.execute_raw(
+                &prepared_statement,
+                placeholder_values
+                    .iter()
+                    .map(|x| x.into())
+                    .collect::<Vec<PgValue>>(),
+            )
+            .await
+            .map(|x| x as usize)
+        },
+        to_pgsql_error,
     )
     .await
-    .map_err(to_pgsql_error)
-    .map(|x| x as usize)
 }
 
 #[inline(always)]
@@ -49,30 +62,34 @@ pub async fn pg_fetch_one(
     let (sql, placeholder_values) = statement.to_sql()?;
     let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
 
-    let stream = conn
-        .query_raw(
-            &prepared_statement,
-            placeholder_values
-                .iter()
-                .map(|x| x.into())
-                .collect::<Vec<PgValue>>(),
-        )
-        .await
-        .map_err(to_pgsql_error)?;
-
-    pin_mut!(stream);
-
-    stream
-        .try_next()
-        .await
-        .map_err(to_pgsql_error)?
-        .map(|row| {
-            (0..row.len())
-                .map(|i| row.try_get::<_, PgValue>(i).map(|value| value.into()))
-                .collect::<Result<Vec<_>, _>>()
-        })
-        .transpose()
-        .map_err(to_pgsql_error)
+    run_db_operation(
+        &sql,
+        async {
+            let stream = conn
+                .query_raw(
+                    &prepared_statement,
+                    placeholder_values
+                        .iter()
+                        .map(|x| x.into())
+                        .collect::<Vec<PgValue>>(),
+                )
+                .await?;
+
+            pin_mut!(stream);
+
+            stream
+                .try_next()
+                .await?
+                .map(|row| {
+                    (0..row.len())
+                        .map(|i| row.try_get::<_, PgValue>(i).map(|value| value.into()))
+                        .collect::<Result<Vec<_>, _>>()
+                })
+                .transpose()
+        },
+        to_pgsql_error,
+    )
+    .await
 }
 
 #[inline(always)]
@@ -80,30 +97,35 @@ pub async fn pg_fetch_all(conn: &Client, statement: Statement) -> Result<Vec<Vec
     let (sql, placeholder_values) = statement.to_sql()?;
     let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
 
-    let stream = conn
-        .query_raw(
-            &prepared_statement,
-            placeholder_values
-                .iter()
-                .map(|x| x.into())
-                .collect::<Vec<PgValue>>(),
-        )
-        .await
-        .map_err(to_pgsql_error)?;
-
-    pin_mut!(stream);
-
-    let mut rows = vec![];
-    while let Some(row) = stream.try_next().await.map_err(to_pgsql_error)? {
-        rows.push(
-            (0..row.len())
-                .map(|i| row.try_get::<_, PgValue>(i).map(|value| value.into()))
-                .collect::<Result<Vec<_>, _>>()
-                .map_err(to_pgsql_error)?,
-        );
-    }
-
-    Ok(rows)
+    run_db_operation(
+        &sql,
+        async {
+            let stream = conn
+                .query_raw(
+                    &prepared_statement,
+                    placeholder_values
+                        .iter()
+                        .map(|x| x.into())
+                        .collect::<Vec<PgValue>>(),
+                )
+                .await?;
+
+            pin_mut!(stream);
+
+            let mut rows = vec![];
+            while let Some(row) = stream.try_next().await? {
+                rows.push(
+                    (0..row.len())
+                        .map(|i| row.try_get::<_, PgValue>(i).map(|value| value.into()))
+                        .collect::<Result<Vec<_>, _>>()?,
+                );
+            }
+
+            Ok(rows)
+        },
+        to_pgsql_error,
+    )
+    .await
 }
 
 #[inline(always)]
@@ -111,24 +133,28 @@ pub async fn gn_pluck(conn: &Client, statement: Statement) -> Result<Option<Colu
     let (sql, placeholder_values) = statement.to_sql()?;
     let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
 
-    let stream = conn
-        .query_raw(
-            &prepared_statement,
-            placeholder_values
-                .iter()
-                .map(|x| x.into())
-                .collect::<Vec<PgValue>>(),
-        )
-        .await
-        .map_err(to_pgsql_error)?;
-
-    pin_mut!(stream);
-
-    stream
-        .try_next()
-        .await
-        .map_err(to_pgsql_error)?
-        .map(|row| row.try_get::<_, PgValue>(0).map(|value| value.into()))
-        .transpose()
-        .map_err(to_pgsql_error)
+    run_db_operation(
+        &sql,
+        async {
+            let stream = conn
+                .query_raw(
+                    &prepared_statement,
+                    placeholder_values
+                        .iter()
+                        .map(|x| x.into())
+                        .collect::<Vec<PgValue>>(),
+                )
+                .await?;
+
+            pin_mut!(stream);
+
+            stream
+                .try_next()
+                .await?
+                .map(|row| row.try_get::<_, PgValue>(0).map(|value| value.into()))
+                .transpose()
+        },
+        to_pgsql_error,
+    )
+    .await
 }

+ 3 - 0
crates/cdk-sql-common/src/mint/mod.rs

@@ -255,6 +255,9 @@ where
         ys: &[PublicKey],
         _quote_id: Option<Uuid>,
     ) -> Result<(), Self::Err> {
+        if ys.is_empty() {
+            return Ok(());
+        }
         let total_deleted = query(
             r#"
             DELETE FROM proof WHERE y IN (:ys) AND state NOT IN (:exclude_state)

+ 3 - 1
crates/cdk/src/mint/proof_writer.rs

@@ -193,7 +193,9 @@ async fn reset_proofs_to_original_state(
         tx.update_proofs_states(&ys, state).await?;
     }
 
-    tx.remove_proofs(&unknown_proofs, None).await?;
+    if !unknown_proofs.is_empty() {
+        tx.remove_proofs(&unknown_proofs, None).await?;
+    }
 
     Ok(())
 }