| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- use std::fmt::Debug;
- use std::future::Future;
- use std::time::Instant;
- use cdk_common::database::Error;
- use crate::database::DatabaseExecutor;
- use crate::stmt::query;
- const SLOW_QUERY_THRESHOLD_MS: u128 = 20;
- /// Run a database operation and log slow operations, it also converts and logs any error with a
- /// given info for more context. This function is expecting a synchronous database operation
- #[inline(always)]
- pub fn run_db_operation_sync<F, E, E1, T>(
- info: &str,
- operation: F,
- error_map: E,
- ) -> Result<T, Error>
- where
- F: FnOnce() -> Result<T, E1>,
- E1: Debug,
- E: FnOnce(E1) -> Error,
- {
- let start = Instant::now();
- tracing::trace!("Running db operation {}", info);
- let result = operation().map_err(|e| {
- tracing::error!("Query {} failed with error {:?}", info, e);
- error_map(e)
- });
- let duration = start.elapsed();
- if duration.as_millis() > SLOW_QUERY_THRESHOLD_MS {
- tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), info);
- }
- result
- }
- /// Run a database operation and log slow operations, it also converts and logs any error with a
- /// given info for more context
- #[inline(always)]
- pub async fn run_db_operation<Fut, E, E1, T>(
- info: &str,
- operation: Fut,
- error_map: E,
- ) -> Result<T, Error>
- where
- Fut: Future<Output = Result<T, E1>>,
- E1: Debug,
- E: FnOnce(E1) -> Error,
- {
- let start = Instant::now();
- tracing::trace!("Running db operation {}", info);
- let result = operation.await.map_err(|e| {
- tracing::error!("Query {} failed with error {:?}", info, e);
- error_map(e)
- });
- let duration = start.elapsed();
- if duration.as_millis() > SLOW_QUERY_THRESHOLD_MS {
- tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), info);
- }
- result
- }
- /// Migrates the migration generated by `build.rs`
- #[inline(always)]
- pub async fn migrate<C>(
- conn: &C,
- db_prefix: &str,
- migrations: &[(&str, &str, &str)],
- ) -> Result<(), Error>
- where
- C: DatabaseExecutor,
- {
- query(
- r#"
- CREATE TABLE IF NOT EXISTS migrations (
- name TEXT PRIMARY KEY,
- applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
- )
- "#,
- )?
- .execute(conn)
- .await?;
- // Apply each migration if it hasn’t been applied yet
- for (prefix, name, sql) in migrations {
- if !prefix.is_empty() && *prefix != db_prefix {
- continue;
- }
- let is_missing = query("SELECT name FROM migrations WHERE name = :name")?
- .bind("name", name)
- .pluck(conn)
- .await?
- .is_none();
- if is_missing {
- query(sql)?.batch(conn).await?;
- query(r#"INSERT INTO migrations (name) VALUES (:name)"#)?
- .bind("name", name)
- .execute(conn)
- .await?;
- }
- }
- Ok(())
- }
|