async_sqlite.rs 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. //! Simple SQLite
  2. use cdk_common::database::Error;
  3. use cdk_sql_common::database::{DatabaseConnector, DatabaseExecutor, DatabaseTransaction};
  4. use cdk_sql_common::run_db_operation_sync;
  5. use cdk_sql_common::stmt::{query, Column, SqlPart, Statement};
  6. use rusqlite::{ffi, CachedStatement, Connection, Error as SqliteError, ErrorCode};
  7. use tokio::sync::Mutex;
  8. use crate::common::{from_sqlite, to_sqlite};
  9. /// Async Sqlite wrapper
  10. #[derive(Debug)]
  11. pub struct AsyncSqlite {
  12. inner: Mutex<Connection>,
  13. }
  14. impl AsyncSqlite {
  15. pub fn new(inner: Connection) -> Self {
  16. Self {
  17. inner: inner.into(),
  18. }
  19. }
  20. }
  21. impl AsyncSqlite {
  22. fn get_stmt<'a>(
  23. &self,
  24. conn: &'a Connection,
  25. statement: Statement,
  26. ) -> Result<(String, CachedStatement<'a>), Error> {
  27. let (sql, placeholder_values) = statement.to_sql()?;
  28. let new_sql = sql.trim().trim_end_matches("FOR UPDATE");
  29. let mut stmt = conn
  30. .prepare_cached(new_sql)
  31. .map_err(|e| Error::Database(Box::new(e)))?;
  32. for (i, value) in placeholder_values.into_iter().enumerate() {
  33. stmt.raw_bind_parameter(i + 1, to_sqlite(value))
  34. .map_err(|e| Error::Database(Box::new(e)))?;
  35. }
  36. Ok((sql, stmt))
  37. }
  38. }
  39. #[inline(always)]
  40. fn to_sqlite_error(err: SqliteError) -> Error {
  41. tracing::error!("Failed query with error {:?}", err);
  42. if let rusqlite::Error::SqliteFailure(
  43. ffi::Error {
  44. code,
  45. extended_code,
  46. },
  47. _,
  48. ) = err
  49. {
  50. if code == ErrorCode::ConstraintViolation
  51. && (extended_code == ffi::SQLITE_CONSTRAINT_PRIMARYKEY
  52. || extended_code == ffi::SQLITE_CONSTRAINT_UNIQUE)
  53. {
  54. Error::Duplicate
  55. } else {
  56. Error::Database(Box::new(err))
  57. }
  58. } else {
  59. Error::Database(Box::new(err))
  60. }
  61. }
  62. /// SQLite trasanction handler
  63. #[allow(missing_debug_implementations)]
  64. pub struct SQLiteTransactionHandler;
  65. #[async_trait::async_trait]
  66. impl DatabaseTransaction<AsyncSqlite> for SQLiteTransactionHandler {
  67. /// Consumes the current transaction committing the changes
  68. async fn commit(conn: &mut AsyncSqlite) -> Result<(), Error> {
  69. query("COMMIT")?.execute(conn).await?;
  70. Ok(())
  71. }
  72. /// Begin a transaction
  73. async fn begin(conn: &mut AsyncSqlite) -> Result<(), Error> {
  74. query("BEGIN IMMEDIATE")?.execute(conn).await?;
  75. Ok(())
  76. }
  77. /// Consumes the transaction rolling back all changes
  78. async fn rollback(conn: &mut AsyncSqlite) -> Result<(), Error> {
  79. query("ROLLBACK")?.execute(conn).await?;
  80. Ok(())
  81. }
  82. }
  // Declares `SQLiteTransactionHandler` as the transaction type associated
  // with `AsyncSqlite` connections.
  impl DatabaseConnector for AsyncSqlite {
      // The handler's `DatabaseTransaction<AsyncSqlite>` impl supplies
      // begin/commit/rollback.
      type Transaction = SQLiteTransactionHandler;
  }
  86. #[async_trait::async_trait]
  87. impl DatabaseExecutor for AsyncSqlite {
  88. fn name() -> &'static str {
  89. "sqlite"
  90. }
  91. async fn execute(&self, statement: Statement) -> Result<usize, Error> {
  92. let conn = self.inner.lock().await;
  93. let (sql, mut stmt) = self
  94. .get_stmt(&conn, statement)
  95. .map_err(|e| Error::Database(Box::new(e)))?;
  96. run_db_operation_sync(&sql, || stmt.raw_execute(), to_sqlite_error)
  97. }
  98. async fn fetch_one(&self, statement: Statement) -> Result<Option<Vec<Column>>, Error> {
  99. let conn = self.inner.lock().await;
  100. let (sql, mut stmt) = self
  101. .get_stmt(&conn, statement)
  102. .map_err(|e| Error::Database(Box::new(e)))?;
  103. run_db_operation_sync(
  104. &sql,
  105. || {
  106. let columns = stmt.column_count();
  107. let mut rows = stmt.raw_query();
  108. rows.next()?
  109. .map(|row| {
  110. (0..columns)
  111. .map(|i| row.get(i).map(from_sqlite))
  112. .collect::<Result<Vec<_>, _>>()
  113. })
  114. .transpose()
  115. },
  116. to_sqlite_error,
  117. )
  118. }
  119. async fn fetch_all(&self, statement: Statement) -> Result<Vec<Vec<Column>>, Error> {
  120. let conn = self.inner.lock().await;
  121. let (sql, mut stmt) = self
  122. .get_stmt(&conn, statement)
  123. .map_err(|e| Error::Database(Box::new(e)))?;
  124. let columns = stmt.column_count();
  125. run_db_operation_sync(
  126. &sql,
  127. || {
  128. let mut rows = stmt.raw_query();
  129. let mut results = vec![];
  130. while let Some(row) = rows.next()? {
  131. results.push(
  132. (0..columns)
  133. .map(|i| row.get(i).map(from_sqlite))
  134. .collect::<Result<Vec<_>, _>>()?,
  135. )
  136. }
  137. Ok(results)
  138. },
  139. to_sqlite_error,
  140. )
  141. }
  142. async fn pluck(&self, statement: Statement) -> Result<Option<Column>, Error> {
  143. let conn = self.inner.lock().await;
  144. let (sql, mut stmt) = self
  145. .get_stmt(&conn, statement)
  146. .map_err(|e| Error::Database(Box::new(e)))?;
  147. run_db_operation_sync(
  148. &sql,
  149. || {
  150. let mut rows = stmt.raw_query();
  151. rows.next()?
  152. .map(|row| row.get(0usize).map(from_sqlite))
  153. .transpose()
  154. },
  155. to_sqlite_error,
  156. )
  157. }
  158. async fn batch(&self, mut statement: Statement) -> Result<(), Error> {
  159. let sql = {
  160. let part = statement
  161. .parts
  162. .pop()
  163. .ok_or(Error::Internal("Empty SQL".to_owned()))?;
  164. if !statement.parts.is_empty() || matches!(part, SqlPart::Placeholder(_, _)) {
  165. return Err(Error::Internal(
  166. "Invalid usage, batch does not support placeholders".to_owned(),
  167. ));
  168. }
  169. if let SqlPart::Raw(sql) = part {
  170. sql
  171. } else {
  172. unreachable!()
  173. }
  174. };
  175. let conn = self.inner.lock().await;
  176. run_db_operation_sync(&sql, || conn.execute_batch(&sql), to_sqlite_error)
  177. }
  178. }