| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- //! Simple SQLite
- use cdk_common::database::Error;
- use cdk_sql_common::database::{DatabaseConnector, DatabaseExecutor, DatabaseTransaction};
- use cdk_sql_common::run_db_operation_sync;
- use cdk_sql_common::stmt::{query, Column, SqlPart, Statement};
- use rusqlite::{ffi, CachedStatement, Connection, Error as SqliteError, ErrorCode};
- use tokio::sync::Mutex;
- use crate::common::{from_sqlite, to_sqlite};
- /// Async Sqlite wrapper
- #[derive(Debug)]
- pub struct AsyncSqlite {
- inner: Mutex<Connection>,
- }
- impl AsyncSqlite {
- pub fn new(inner: Connection) -> Self {
- Self {
- inner: inner.into(),
- }
- }
- }
- impl AsyncSqlite {
- fn get_stmt<'a>(
- &self,
- conn: &'a Connection,
- statement: Statement,
- ) -> Result<(String, CachedStatement<'a>), Error> {
- let (sql, placeholder_values) = statement.to_sql()?;
- let new_sql = sql.trim().trim_end_matches("FOR UPDATE");
- let mut stmt = conn
- .prepare_cached(new_sql)
- .map_err(|e| Error::Database(Box::new(e)))?;
- for (i, value) in placeholder_values.into_iter().enumerate() {
- stmt.raw_bind_parameter(i + 1, to_sqlite(value))
- .map_err(|e| Error::Database(Box::new(e)))?;
- }
- Ok((sql, stmt))
- }
- }
- #[inline(always)]
- fn to_sqlite_error(err: SqliteError) -> Error {
- tracing::error!("Failed query with error {:?}", err);
- if let rusqlite::Error::SqliteFailure(
- ffi::Error {
- code,
- extended_code,
- },
- _,
- ) = err
- {
- if code == ErrorCode::ConstraintViolation
- && (extended_code == ffi::SQLITE_CONSTRAINT_PRIMARYKEY
- || extended_code == ffi::SQLITE_CONSTRAINT_UNIQUE)
- {
- Error::Duplicate
- } else {
- Error::Database(Box::new(err))
- }
- } else {
- Error::Database(Box::new(err))
- }
- }
/// SQLite transaction handler.
///
/// Marker type used to attach transaction control to [`AsyncSqlite`].
pub struct SQLiteTransactionHandler;
- #[async_trait::async_trait]
- impl DatabaseTransaction<AsyncSqlite> for SQLiteTransactionHandler {
- /// Consumes the current transaction committing the changes
- async fn commit(conn: &mut AsyncSqlite) -> Result<(), Error> {
- query("COMMIT")?.execute(conn).await?;
- Ok(())
- }
- /// Begin a transaction
- async fn begin(conn: &mut AsyncSqlite) -> Result<(), Error> {
- query("BEGIN IMMEDIATE")?.execute(conn).await?;
- Ok(())
- }
- /// Consumes the transaction rolling back all changes
- async fn rollback(conn: &mut AsyncSqlite) -> Result<(), Error> {
- query("ROLLBACK")?.execute(conn).await?;
- Ok(())
- }
- }
- impl DatabaseConnector for AsyncSqlite {
- type Transaction = SQLiteTransactionHandler;
- }
- #[async_trait::async_trait]
- impl DatabaseExecutor for AsyncSqlite {
- fn name() -> &'static str {
- "sqlite"
- }
- async fn execute(&self, statement: Statement) -> Result<usize, Error> {
- let conn = self.inner.lock().await;
- let (sql, mut stmt) = self
- .get_stmt(&conn, statement)
- .map_err(|e| Error::Database(Box::new(e)))?;
- run_db_operation_sync(&sql, || stmt.raw_execute(), to_sqlite_error)
- }
- async fn fetch_one(&self, statement: Statement) -> Result<Option<Vec<Column>>, Error> {
- let conn = self.inner.lock().await;
- let (sql, mut stmt) = self
- .get_stmt(&conn, statement)
- .map_err(|e| Error::Database(Box::new(e)))?;
- run_db_operation_sync(
- &sql,
- || {
- let columns = stmt.column_count();
- let mut rows = stmt.raw_query();
- rows.next()?
- .map(|row| {
- (0..columns)
- .map(|i| row.get(i).map(from_sqlite))
- .collect::<Result<Vec<_>, _>>()
- })
- .transpose()
- },
- to_sqlite_error,
- )
- }
- async fn fetch_all(&self, statement: Statement) -> Result<Vec<Vec<Column>>, Error> {
- let conn = self.inner.lock().await;
- let (sql, mut stmt) = self
- .get_stmt(&conn, statement)
- .map_err(|e| Error::Database(Box::new(e)))?;
- let columns = stmt.column_count();
- run_db_operation_sync(
- &sql,
- || {
- let mut rows = stmt.raw_query();
- let mut results = vec![];
- while let Some(row) = rows.next()? {
- results.push(
- (0..columns)
- .map(|i| row.get(i).map(from_sqlite))
- .collect::<Result<Vec<_>, _>>()?,
- )
- }
- Ok(results)
- },
- to_sqlite_error,
- )
- }
- async fn pluck(&self, statement: Statement) -> Result<Option<Column>, Error> {
- let conn = self.inner.lock().await;
- let (sql, mut stmt) = self
- .get_stmt(&conn, statement)
- .map_err(|e| Error::Database(Box::new(e)))?;
- run_db_operation_sync(
- &sql,
- || {
- let mut rows = stmt.raw_query();
- rows.next()?
- .map(|row| row.get(0usize).map(from_sqlite))
- .transpose()
- },
- to_sqlite_error,
- )
- }
- async fn batch(&self, mut statement: Statement) -> Result<(), Error> {
- let sql = {
- let part = statement
- .parts
- .pop()
- .ok_or(Error::Internal("Empty SQL".to_owned()))?;
- if !statement.parts.is_empty() || matches!(part, SqlPart::Placeholder(_, _)) {
- return Err(Error::Internal(
- "Invalid usage, batch does not support placeholders".to_owned(),
- ));
- }
- if let SqlPart::Raw(sql) = part {
- sql
- } else {
- unreachable!()
- }
- };
- let conn = self.inner.lock().await;
- run_db_operation_sync(&sql, || conn.execute_batch(&sql), to_sqlite_error)
- }
- }
|