Forráskód Böngészése

Merge pull request #946 from crodas/feature/run-db-operation

Introduce `run_db_operation_sync` and `run_db_operation`
thesimplekid 2 hónapja
szülő
commit
8da03b87e2

+ 69 - 1
crates/cdk-sql-common/src/common.rs

@@ -1,13 +1,81 @@
+use std::fmt::Debug;
+use std::future::Future;
+use std::time::Instant;
+
+use cdk_common::database::Error;
+
 use crate::database::DatabaseExecutor;
 use crate::stmt::query;
 
+const SLOW_QUERY_THRESHOLD_MS: u128 = 20;
+
+/// Run a database operation and log slow operations, it also converts and logs any error with a
+/// given info for more context. This function is expecting a synchronous database operation
+#[inline(always)]
+pub fn run_db_operation_sync<F, E, E1, T>(
+    info: &str,
+    operation: F,
+    error_map: E,
+) -> Result<T, Error>
+where
+    F: FnOnce() -> Result<T, E1>,
+    E1: Debug,
+    E: FnOnce(E1) -> Error,
+{
+    let start = Instant::now();
+
+    tracing::trace!("Running db operation {}", info);
+
+    let result = operation().map_err(|e| {
+        tracing::error!("Query {} failed with error {:?}", info, e);
+        error_map(e)
+    });
+
+    let duration = start.elapsed();
+    if duration.as_millis() > SLOW_QUERY_THRESHOLD_MS {
+        tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), info);
+    }
+
+    result
+}
+
+/// Run a database operation and log slow operations, it also converts and logs any error with a
+/// given info for more context
+#[inline(always)]
+pub async fn run_db_operation<Fut, E, E1, T>(
+    info: &str,
+    operation: Fut,
+    error_map: E,
+) -> Result<T, Error>
+where
+    Fut: Future<Output = Result<T, E1>>,
+    E1: Debug,
+    E: FnOnce(E1) -> Error,
+{
+    let start = Instant::now();
+
+    tracing::trace!("Running db operation {}", info);
+
+    let result = operation.await.map_err(|e| {
+        tracing::error!("Query {} failed with error {:?}", info, e);
+        error_map(e)
+    });
+
+    let duration = start.elapsed();
+    if duration.as_millis() > SLOW_QUERY_THRESHOLD_MS {
+        tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), info);
+    }
+
+    result
+}
+
 /// Migrates the migration generated by `build.rs`
 #[inline(always)]
 pub async fn migrate<C>(
     conn: &C,
     db_prefix: &str,
     migrations: &[(&str, &str, &str)],
-) -> Result<(), cdk_common::database::Error>
+) -> Result<(), Error>
 where
     C: DatabaseExecutor,
 {

+ 1 - 0
crates/cdk-sql-common/src/lib.rs

@@ -11,6 +11,7 @@ pub mod stmt;
 pub mod value;
 
 pub use cdk_common::database::ConversionError;
+pub use common::{run_db_operation, run_db_operation_sync};
 
 #[cfg(feature = "mint")]
 pub mod mint;

+ 54 - 43
crates/cdk-sqlite/src/async_sqlite.rs

@@ -1,6 +1,7 @@
 //! Simple SQLite
 use cdk_common::database::Error;
 use cdk_sql_common::database::{DatabaseConnector, DatabaseExecutor, DatabaseTransaction};
+use cdk_sql_common::run_db_operation_sync;
 use cdk_sql_common::stmt::{query, Column, SqlPart, Statement};
 use rusqlite::{ffi, CachedStatement, Connection, Error as SqliteError, ErrorCode};
 use tokio::sync::Mutex;
@@ -25,7 +26,7 @@ impl AsyncSqlite {
         &self,
         conn: &'a Connection,
         statement: Statement,
-    ) -> Result<CachedStatement<'a>, Error> {
+    ) -> Result<(String, CachedStatement<'a>), Error> {
         let (sql, placeholder_values) = statement.to_sql()?;
 
         let new_sql = sql.trim().trim_end_matches("FOR UPDATE");
@@ -39,7 +40,7 @@ impl AsyncSqlite {
                 .map_err(|e| Error::Database(Box::new(e)))?;
         }
 
-        Ok(stmt)
+        Ok((sql, stmt))
     }
 }
 
@@ -104,68 +105,81 @@ impl DatabaseExecutor for AsyncSqlite {
     async fn execute(&self, statement: Statement) -> Result<usize, Error> {
         let conn = self.inner.lock().await;
 
-        let mut stmt = self
+        let (sql, mut stmt) = self
             .get_stmt(&conn, statement)
             .map_err(|e| Error::Database(Box::new(e)))?;
 
-        Ok(stmt.raw_execute().map_err(to_sqlite_error)?)
+        run_db_operation_sync(&sql, || stmt.raw_execute(), to_sqlite_error)
     }
 
     async fn fetch_one(&self, statement: Statement) -> Result<Option<Vec<Column>>, Error> {
         let conn = self.inner.lock().await;
-        let mut stmt = self
+        let (sql, mut stmt) = self
             .get_stmt(&conn, statement)
             .map_err(|e| Error::Database(Box::new(e)))?;
 
-        let columns = stmt.column_count();
-
-        let mut rows = stmt.raw_query();
-        rows.next()
-            .map_err(to_sqlite_error)?
-            .map(|row| {
-                (0..columns)
-                    .map(|i| row.get(i).map(from_sqlite))
-                    .collect::<Result<Vec<_>, _>>()
-            })
-            .transpose()
-            .map_err(to_sqlite_error)
+        run_db_operation_sync(
+            &sql,
+            || {
+                let columns = stmt.column_count();
+
+                let mut rows = stmt.raw_query();
+                rows.next()?
+                    .map(|row| {
+                        (0..columns)
+                            .map(|i| row.get(i).map(from_sqlite))
+                            .collect::<Result<Vec<_>, _>>()
+                    })
+                    .transpose()
+            },
+            to_sqlite_error,
+        )
     }
 
     async fn fetch_all(&self, statement: Statement) -> Result<Vec<Vec<Column>>, Error> {
         let conn = self.inner.lock().await;
-        let mut stmt = self
+        let (sql, mut stmt) = self
             .get_stmt(&conn, statement)
             .map_err(|e| Error::Database(Box::new(e)))?;
 
         let columns = stmt.column_count();
 
-        let mut rows = stmt.raw_query();
-        let mut results = vec![];
-
-        while let Some(row) = rows.next().map_err(to_sqlite_error)? {
-            results.push(
-                (0..columns)
-                    .map(|i| row.get(i).map(from_sqlite))
-                    .collect::<Result<Vec<_>, _>>()
-                    .map_err(to_sqlite_error)?,
-            )
-        }
-
-        Ok(results)
+        run_db_operation_sync(
+            &sql,
+            || {
+                let mut rows = stmt.raw_query();
+                let mut results = vec![];
+
+                while let Some(row) = rows.next()? {
+                    results.push(
+                        (0..columns)
+                            .map(|i| row.get(i).map(from_sqlite))
+                            .collect::<Result<Vec<_>, _>>()?,
+                    )
+                }
+
+                Ok(results)
+            },
+            to_sqlite_error,
+        )
     }
 
     async fn pluck(&self, statement: Statement) -> Result<Option<Column>, Error> {
         let conn = self.inner.lock().await;
-        let mut stmt = self
+        let (sql, mut stmt) = self
             .get_stmt(&conn, statement)
             .map_err(|e| Error::Database(Box::new(e)))?;
 
-        let mut rows = stmt.raw_query();
-        rows.next()
-            .map_err(to_sqlite_error)?
-            .map(|row| row.get(0usize).map(from_sqlite))
-            .transpose()
-            .map_err(to_sqlite_error)
+        run_db_operation_sync(
+            &sql,
+            || {
+                let mut rows = stmt.raw_query();
+                rows.next()?
+                    .map(|row| row.get(0usize).map(from_sqlite))
+                    .transpose()
+            },
+            to_sqlite_error,
+        )
     }
 
     async fn batch(&self, mut statement: Statement) -> Result<(), Error> {
@@ -187,11 +201,8 @@ impl DatabaseExecutor for AsyncSqlite {
                 unreachable!()
             }
         };
+        let conn = self.inner.lock().await;
 
-        self.inner
-            .lock()
-            .await
-            .execute_batch(&sql)
-            .map_err(to_sqlite_error)
+        run_db_operation_sync(&sql, || conn.execute_batch(&sql), to_sqlite_error)
     }
 }