common.rs 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. use std::fmt::Debug;
  2. use std::future::Future;
  3. use std::time::Instant;
  4. use cdk_common::database::Error;
  5. use crate::database::DatabaseExecutor;
  6. use crate::stmt::query;
/// Duration threshold in milliseconds: database operations that take strictly longer than this
/// are reported with a `[SLOW QUERY]` warning by the helpers in this file.
const SLOW_QUERY_THRESHOLD_MS: u128 = 20;
  8. /// Run a database operation and log slow operations, it also converts and logs any error with a
  9. /// given info for more context. This function is expecting a synchronous database operation
  10. #[inline(always)]
  11. pub fn run_db_operation_sync<F, E, E1, T>(
  12. info: &str,
  13. operation: F,
  14. error_map: E,
  15. ) -> Result<T, Error>
  16. where
  17. F: FnOnce() -> Result<T, E1>,
  18. E1: Debug,
  19. E: FnOnce(E1) -> Error,
  20. {
  21. let start = Instant::now();
  22. tracing::trace!("Running db operation {}", info);
  23. let result = operation().map_err(|e| {
  24. tracing::error!("Query {} failed with error {:?}", info, e);
  25. error_map(e)
  26. });
  27. let duration = start.elapsed();
  28. if duration.as_millis() > SLOW_QUERY_THRESHOLD_MS {
  29. tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), info);
  30. }
  31. result
  32. }
  33. /// Run a database operation and log slow operations, it also converts and logs any error with a
  34. /// given info for more context
  35. #[inline(always)]
  36. pub async fn run_db_operation<Fut, E, E1, T>(
  37. info: &str,
  38. operation: Fut,
  39. error_map: E,
  40. ) -> Result<T, Error>
  41. where
  42. Fut: Future<Output = Result<T, E1>>,
  43. E1: Debug,
  44. E: FnOnce(E1) -> Error,
  45. {
  46. let start = Instant::now();
  47. tracing::trace!("Running db operation {}", info);
  48. let result = operation.await.map_err(|e| {
  49. tracing::error!("Query {} failed with error {:?}", info, e);
  50. error_map(e)
  51. });
  52. let duration = start.elapsed();
  53. if duration.as_millis() > SLOW_QUERY_THRESHOLD_MS {
  54. tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), info);
  55. }
  56. result
  57. }
  58. /// Migrates the migration generated by `build.rs`
  59. #[inline(always)]
  60. pub async fn migrate<C>(
  61. conn: &C,
  62. db_prefix: &str,
  63. migrations: &[(&str, &str, &str)],
  64. ) -> Result<(), Error>
  65. where
  66. C: DatabaseExecutor,
  67. {
  68. query(
  69. r#"
  70. CREATE TABLE IF NOT EXISTS migrations (
  71. name TEXT PRIMARY KEY,
  72. applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  73. )
  74. "#,
  75. )?
  76. .execute(conn)
  77. .await?;
  78. // Apply each migration if it hasn’t been applied yet
  79. for (prefix, name, sql) in migrations {
  80. if !prefix.is_empty() && *prefix != db_prefix {
  81. continue;
  82. }
  83. let is_missing = query("SELECT name FROM migrations WHERE name = :name")?
  84. .bind("name", name)
  85. .pluck(conn)
  86. .await?
  87. .is_none();
  88. if is_missing {
  89. query(sql)?.batch(conn).await?;
  90. query(r#"INSERT INTO migrations (name) VALUES (:name)"#)?
  91. .bind("name", name)
  92. .execute(conn)
  93. .await?;
  94. }
  95. }
  96. Ok(())
  97. }