async_sqlite.rs 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. //! Simple SQLite
  2. use cdk_common::database::Error;
  3. use cdk_sql_common::database::{DatabaseConnector, DatabaseExecutor, DatabaseTransaction};
  4. use cdk_sql_common::run_db_operation_sync;
  5. use cdk_sql_common::stmt::{query, Column, SqlPart, Statement};
  6. use rusqlite::{ffi, CachedStatement, Connection, Error as SqliteError, ErrorCode};
  7. use tokio::sync::Mutex;
  8. use crate::common::{from_sqlite, to_sqlite};
/// Async Sqlite wrapper
///
/// Holds a single `rusqlite` [`Connection`] behind a [`tokio::sync::Mutex`].
/// Every executor method implemented for this type in this file awaits that
/// mutex before touching the connection.
#[derive(Debug)]
pub struct AsyncSqlite {
    /// The mutex-guarded SQLite connection.
    inner: Mutex<Connection>,
}
  14. impl AsyncSqlite {
  15. pub fn new(inner: Connection) -> Self {
  16. Self {
  17. inner: inner.into(),
  18. }
  19. }
  20. }
  21. impl AsyncSqlite {
  22. fn get_stmt<'a>(
  23. &self,
  24. conn: &'a Connection,
  25. statement: Statement,
  26. ) -> Result<(String, CachedStatement<'a>), Error> {
  27. let (sql, placeholder_values) = statement.to_sql()?;
  28. let new_sql = sql.trim().trim_end_matches("FOR UPDATE");
  29. let mut stmt = conn
  30. .prepare_cached(new_sql)
  31. .map_err(|e| Error::Database(Box::new(e)))?;
  32. for (i, value) in placeholder_values.into_iter().enumerate() {
  33. stmt.raw_bind_parameter(i + 1, to_sqlite(value))
  34. .map_err(|e| Error::Database(Box::new(e)))?;
  35. }
  36. Ok((sql, stmt))
  37. }
  38. }
  39. #[inline(always)]
  40. fn to_sqlite_error(err: SqliteError) -> Error {
  41. tracing::error!("Failed query with error {:?}", err);
  42. if let rusqlite::Error::SqliteFailure(
  43. ffi::Error {
  44. code,
  45. extended_code,
  46. },
  47. _,
  48. ) = err
  49. {
  50. if code == ErrorCode::ConstraintViolation
  51. && (extended_code == ffi::SQLITE_CONSTRAINT_PRIMARYKEY
  52. || extended_code == ffi::SQLITE_CONSTRAINT_UNIQUE)
  53. {
  54. Error::Duplicate
  55. } else {
  56. Error::Database(Box::new(err))
  57. }
  58. } else {
  59. Error::Database(Box::new(err))
  60. }
  61. }
/// SQLite transaction handler
///
/// Stateless marker type: its `DatabaseTransaction<AsyncSqlite>` implementation
/// in this file drives transactions by executing `BEGIN IMMEDIATE`, `COMMIT`
/// and `ROLLBACK` on the given connection.
pub struct SQLiteTransactionHandler;
  64. #[async_trait::async_trait]
  65. impl DatabaseTransaction<AsyncSqlite> for SQLiteTransactionHandler {
  66. /// Consumes the current transaction committing the changes
  67. async fn commit(conn: &mut AsyncSqlite) -> Result<(), Error> {
  68. query("COMMIT")?.execute(conn).await?;
  69. Ok(())
  70. }
  71. /// Begin a transaction
  72. async fn begin(conn: &mut AsyncSqlite) -> Result<(), Error> {
  73. query("BEGIN IMMEDIATE")?.execute(conn).await?;
  74. Ok(())
  75. }
  76. /// Consumes the transaction rolling back all changes
  77. async fn rollback(conn: &mut AsyncSqlite) -> Result<(), Error> {
  78. query("ROLLBACK")?.execute(conn).await?;
  79. Ok(())
  80. }
  81. }
impl DatabaseConnector for AsyncSqlite {
    /// Transactions on this connector are driven by [`SQLiteTransactionHandler`].
    type Transaction = SQLiteTransactionHandler;
}
  85. #[async_trait::async_trait]
  86. impl DatabaseExecutor for AsyncSqlite {
  87. fn name() -> &'static str {
  88. "sqlite"
  89. }
  90. async fn execute(&self, statement: Statement) -> Result<usize, Error> {
  91. let conn = self.inner.lock().await;
  92. let (sql, mut stmt) = self
  93. .get_stmt(&conn, statement)
  94. .map_err(|e| Error::Database(Box::new(e)))?;
  95. run_db_operation_sync(&sql, || stmt.raw_execute(), to_sqlite_error)
  96. }
  97. async fn fetch_one(&self, statement: Statement) -> Result<Option<Vec<Column>>, Error> {
  98. let conn = self.inner.lock().await;
  99. let (sql, mut stmt) = self
  100. .get_stmt(&conn, statement)
  101. .map_err(|e| Error::Database(Box::new(e)))?;
  102. run_db_operation_sync(
  103. &sql,
  104. || {
  105. let columns = stmt.column_count();
  106. let mut rows = stmt.raw_query();
  107. rows.next()?
  108. .map(|row| {
  109. (0..columns)
  110. .map(|i| row.get(i).map(from_sqlite))
  111. .collect::<Result<Vec<_>, _>>()
  112. })
  113. .transpose()
  114. },
  115. to_sqlite_error,
  116. )
  117. }
  118. async fn fetch_all(&self, statement: Statement) -> Result<Vec<Vec<Column>>, Error> {
  119. let conn = self.inner.lock().await;
  120. let (sql, mut stmt) = self
  121. .get_stmt(&conn, statement)
  122. .map_err(|e| Error::Database(Box::new(e)))?;
  123. let columns = stmt.column_count();
  124. run_db_operation_sync(
  125. &sql,
  126. || {
  127. let mut rows = stmt.raw_query();
  128. let mut results = vec![];
  129. while let Some(row) = rows.next()? {
  130. results.push(
  131. (0..columns)
  132. .map(|i| row.get(i).map(from_sqlite))
  133. .collect::<Result<Vec<_>, _>>()?,
  134. )
  135. }
  136. Ok(results)
  137. },
  138. to_sqlite_error,
  139. )
  140. }
  141. async fn pluck(&self, statement: Statement) -> Result<Option<Column>, Error> {
  142. let conn = self.inner.lock().await;
  143. let (sql, mut stmt) = self
  144. .get_stmt(&conn, statement)
  145. .map_err(|e| Error::Database(Box::new(e)))?;
  146. run_db_operation_sync(
  147. &sql,
  148. || {
  149. let mut rows = stmt.raw_query();
  150. rows.next()?
  151. .map(|row| row.get(0usize).map(from_sqlite))
  152. .transpose()
  153. },
  154. to_sqlite_error,
  155. )
  156. }
  157. async fn batch(&self, mut statement: Statement) -> Result<(), Error> {
  158. let sql = {
  159. let part = statement
  160. .parts
  161. .pop()
  162. .ok_or(Error::Internal("Empty SQL".to_owned()))?;
  163. if !statement.parts.is_empty() || matches!(part, SqlPart::Placeholder(_, _)) {
  164. return Err(Error::Internal(
  165. "Invalid usage, batch does not support placeholders".to_owned(),
  166. ));
  167. }
  168. if let SqlPart::Raw(sql) = part {
  169. sql
  170. } else {
  171. unreachable!()
  172. }
  173. };
  174. let conn = self.inner.lock().await;
  175. run_db_operation_sync(&sql, || conn.execute_batch(&sql), to_sqlite_error)
  176. }
  177. }