async_rusqlite.rs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. use std::marker::PhantomData;
  2. use std::sync::Arc;
  3. //use std::sync::atomic::AtomicUsize;
  4. //use std::sync::Arc;
  5. use std::thread::spawn;
  6. use rusqlite::Connection;
  7. use tokio::sync::{mpsc, oneshot};
  8. use crate::common::SqliteConnectionManager;
  9. use crate::mint::Error;
  10. use crate::pool::Pool;
  11. use crate::stmt::{Column, ExpectedSqlResponse, Statement as InnerStatement, Value};
  12. const BUFFER_REQUEST_SIZE: usize = 10_000;
  13. #[derive(Debug, Clone)]
  14. pub struct AsyncRusqlite {
  15. sender: mpsc::Sender<DbRequest>,
  16. //inflight_requests: Arc<AtomicUsize>,
  17. }
  18. /// Internal request for the database thread
  19. #[derive(Debug)]
  20. pub enum DbRequest {
  21. Sql(InnerStatement, oneshot::Sender<DbResponse>),
  22. Begin(oneshot::Sender<DbResponse>),
  23. Commit(oneshot::Sender<DbResponse>),
  24. Rollback(oneshot::Sender<DbResponse>),
  25. }
  26. #[derive(Debug)]
  27. pub enum DbResponse {
  28. Transaction(mpsc::Sender<DbRequest>),
  29. AffectedRows(usize),
  30. Pluck(Option<Column>),
  31. Row(Option<Vec<Column>>),
  32. Rows(Vec<Vec<Column>>),
  33. Error(Error),
  34. Unexpected,
  35. Ok,
  36. }
  37. /// Statement for the async_rusqlite wrapper
  38. pub struct Statement(InnerStatement);
  39. impl Statement {
  40. /// Bind a variable
  41. pub fn bind<C: ToString, V: Into<Value>>(self, name: C, value: V) -> Self {
  42. Self(self.0.bind(name, value))
  43. }
  44. /// Bind vec
  45. pub fn bind_vec<C: ToString, V: Into<Value>>(self, name: C, value: Vec<V>) -> Self {
  46. Self(self.0.bind_vec(name, value))
  47. }
  48. /// Executes a query and return the number of affected rows
  49. pub async fn execute<C>(self, conn: &C) -> Result<usize, Error>
  50. where
  51. C: DatabaseExecutor + Send + Sync,
  52. {
  53. conn.execute(self.0).await
  54. }
  55. /// Returns the first column of the first row of the query result
  56. pub async fn pluck<C>(self, conn: &C) -> Result<Option<Column>, Error>
  57. where
  58. C: DatabaseExecutor + Send + Sync,
  59. {
  60. conn.pluck(self.0).await
  61. }
  62. /// Returns the first row of the query result
  63. pub async fn fetch_one<C>(self, conn: &C) -> Result<Option<Vec<Column>>, Error>
  64. where
  65. C: DatabaseExecutor + Send + Sync,
  66. {
  67. conn.fetch_one(self.0).await
  68. }
  69. /// Returns all rows of the query result
  70. pub async fn fetch_all<C>(self, conn: &C) -> Result<Vec<Vec<Column>>, Error>
  71. where
  72. C: DatabaseExecutor + Send + Sync,
  73. {
  74. conn.fetch_all(self.0).await
  75. }
  76. }
  77. /// Process a query
  78. #[inline(always)]
  79. fn process_query(conn: &Connection, sql: InnerStatement) -> Result<DbResponse, Error> {
  80. let mut stmt = conn.prepare_cached(&sql.sql)?;
  81. for (name, value) in sql.args {
  82. let index = stmt
  83. .parameter_index(&name)
  84. .map_err(|_| Error::MissingParameter(name.clone()))?
  85. .ok_or(Error::MissingParameter(name))?;
  86. stmt.raw_bind_parameter(index, value)?;
  87. }
  88. let columns = stmt.column_count();
  89. Ok(match sql.expected_response {
  90. ExpectedSqlResponse::AffectedRows => DbResponse::AffectedRows(stmt.raw_execute()?),
  91. ExpectedSqlResponse::ManyRows => {
  92. let mut rows = stmt.raw_query();
  93. let mut results = vec![];
  94. while let Some(row) = rows.next()? {
  95. results.push(
  96. (0..columns)
  97. .map(|i| row.get(i))
  98. .collect::<Result<Vec<_>, _>>()?,
  99. )
  100. }
  101. DbResponse::Rows(results)
  102. }
  103. ExpectedSqlResponse::Pluck => {
  104. let mut rows = stmt.raw_query();
  105. DbResponse::Pluck(rows.next()?.map(|row| row.get(0usize)).transpose()?)
  106. }
  107. ExpectedSqlResponse::SingleRow => {
  108. let mut rows = stmt.raw_query();
  109. let row = rows
  110. .next()?
  111. .map(|row| {
  112. (0..columns)
  113. .map(|i| row.get(i))
  114. .collect::<Result<Vec<_>, _>>()
  115. })
  116. .transpose()?;
  117. DbResponse::Row(row)
  118. }
  119. })
  120. }
  121. fn rusqlite_worker(
  122. mut receiver: mpsc::Receiver<DbRequest>,
  123. pool: Arc<Pool<SqliteConnectionManager>>,
  124. ) {
  125. while let Some(request) = receiver.blocking_recv() {
  126. match request {
  127. DbRequest::Sql(sql, reply_to) => {
  128. let conn = match pool.get() {
  129. Ok(conn) => conn,
  130. Err(err) => {
  131. let _ = reply_to.send(DbResponse::Error(err.into()));
  132. continue;
  133. }
  134. };
  135. let result = process_query(&conn, sql);
  136. let _ = match result {
  137. Ok(ok) => reply_to.send(ok),
  138. Err(err) => reply_to.send(DbResponse::Error(err)),
  139. };
  140. drop(conn);
  141. }
  142. DbRequest::Begin(reply_to) => {
  143. let (sender, mut receiver) = mpsc::channel(BUFFER_REQUEST_SIZE);
  144. let mut conn = match pool.get() {
  145. Ok(conn) => conn,
  146. Err(err) => {
  147. let _ = reply_to.send(DbResponse::Error(err.into()));
  148. continue;
  149. }
  150. };
  151. let tx = match conn.transaction() {
  152. Ok(tx) => tx,
  153. Err(err) => {
  154. let _ = reply_to.send(DbResponse::Error(err.into()));
  155. continue;
  156. }
  157. };
  158. // Transaction has begun successfully, send the `sender` back to the caller
  159. // and wait for statements to execute. On `Drop` the wrapper transaction
  160. // should send a `rollback`.
  161. //
  162. let _ = reply_to.send(DbResponse::Transaction(sender));
  163. // We intentionally handle the transaction hijacking the main loop, there is
  164. // no point is queueing more operations for SQLite, since transaction have
  165. // exclusive access. In other database implementation this block of code
  166. // should be sent to their own thread to allow concurrency
  167. loop {
  168. let request = if let Some(request) = receiver.blocking_recv() {
  169. request
  170. } else {
  171. // If the receiver loop is broken (i.e no more `senders` are active) and no
  172. // `Commit` statement has been sent, this will trigger a `Rollback`
  173. // automatically
  174. let _ = tx.rollback();
  175. break;
  176. };
  177. match request {
  178. DbRequest::Commit(reply_to) => {
  179. let _ = reply_to.send(match tx.commit() {
  180. Ok(()) => DbResponse::Ok,
  181. Err(err) => DbResponse::Error(err.into()),
  182. });
  183. break;
  184. }
  185. DbRequest::Rollback(reply_to) => {
  186. let _ = reply_to.send(match tx.rollback() {
  187. Ok(()) => DbResponse::Ok,
  188. Err(err) => DbResponse::Error(err.into()),
  189. });
  190. break;
  191. }
  192. DbRequest::Begin(reply_to) => {
  193. let _ = reply_to.send(DbResponse::Unexpected);
  194. }
  195. DbRequest::Sql(sql, reply_to) => {
  196. let _ = match process_query(&tx, sql) {
  197. Ok(ok) => reply_to.send(ok),
  198. Err(err) => reply_to.send(DbResponse::Error(err)),
  199. };
  200. }
  201. }
  202. }
  203. drop(conn);
  204. }
  205. DbRequest::Commit(reply_to) => {
  206. let _ = reply_to.send(DbResponse::Unexpected);
  207. }
  208. DbRequest::Rollback(reply_to) => {
  209. let _ = reply_to.send(DbResponse::Unexpected);
  210. }
  211. }
  212. }
  213. }
  214. #[async_trait::async_trait]
  215. pub trait DatabaseExecutor {
  216. fn get_queue_sender(&self) -> mpsc::Sender<DbRequest>;
  217. async fn execute(&self, mut statement: InnerStatement) -> Result<usize, Error> {
  218. let (sender, receiver) = oneshot::channel();
  219. statement.expected_response = ExpectedSqlResponse::AffectedRows;
  220. self.get_queue_sender()
  221. .send(DbRequest::Sql(statement, sender))
  222. .await
  223. .map_err(|_| Error::Communication)?;
  224. match receiver.await.map_err(|_| Error::Communication)? {
  225. DbResponse::AffectedRows(n) => Ok(n),
  226. DbResponse::Error(err) => Err(err),
  227. _ => Err(Error::InvalidDbResponse),
  228. }
  229. }
  230. async fn fetch_one(&self, mut statement: InnerStatement) -> Result<Option<Vec<Column>>, Error> {
  231. let (sender, receiver) = oneshot::channel();
  232. statement.expected_response = ExpectedSqlResponse::SingleRow;
  233. self.get_queue_sender()
  234. .send(DbRequest::Sql(statement, sender))
  235. .await
  236. .map_err(|_| Error::Communication)?;
  237. match receiver.await.map_err(|_| Error::Communication)? {
  238. DbResponse::Row(row) => Ok(row),
  239. DbResponse::Error(err) => Err(err),
  240. _ => Err(Error::InvalidDbResponse),
  241. }
  242. }
  243. async fn fetch_all(&self, mut statement: InnerStatement) -> Result<Vec<Vec<Column>>, Error> {
  244. let (sender, receiver) = oneshot::channel();
  245. statement.expected_response = ExpectedSqlResponse::ManyRows;
  246. self.get_queue_sender()
  247. .send(DbRequest::Sql(statement, sender))
  248. .await
  249. .map_err(|_| Error::Communication)?;
  250. match receiver.await.map_err(|_| Error::Communication)? {
  251. DbResponse::Rows(rows) => Ok(rows),
  252. DbResponse::Error(err) => Err(err),
  253. _ => Err(Error::InvalidDbResponse),
  254. }
  255. }
  256. async fn pluck(&self, mut statement: InnerStatement) -> Result<Option<Column>, Error> {
  257. let (sender, receiver) = oneshot::channel();
  258. statement.expected_response = ExpectedSqlResponse::Pluck;
  259. self.get_queue_sender()
  260. .send(DbRequest::Sql(statement, sender))
  261. .await
  262. .map_err(|_| Error::Communication)?;
  263. match receiver.await.map_err(|_| Error::Communication)? {
  264. DbResponse::Pluck(value) => Ok(value),
  265. DbResponse::Error(err) => Err(err),
  266. _ => Err(Error::InvalidDbResponse),
  267. }
  268. }
  269. }
  270. #[inline(always)]
  271. pub fn query<T: ToString>(sql: T) -> Statement {
  272. Statement(crate::stmt::Statement::new(sql))
  273. }
  274. impl AsyncRusqlite {
  275. pub fn new(pool: Arc<Pool<SqliteConnectionManager>>) -> Self {
  276. let (sender, receiver) = mpsc::channel(BUFFER_REQUEST_SIZE);
  277. spawn(move || {
  278. rusqlite_worker(receiver, pool);
  279. });
  280. Self {
  281. sender,
  282. //inflight_requests: Arc::new(0.into()),
  283. }
  284. }
  285. /// Begins a transaction
  286. ///
  287. /// If the transaction is Drop it will trigger a rollback operation
  288. pub async fn begin(&self) -> Result<Transaction<'_>, Error> {
  289. let (sender, receiver) = oneshot::channel();
  290. self.sender
  291. .send(DbRequest::Begin(sender))
  292. .await
  293. .map_err(|_| Error::Communication)?;
  294. match receiver.await.map_err(|_| Error::Communication)? {
  295. DbResponse::Transaction(db_sender) => Ok(Transaction {
  296. db_sender,
  297. _marker: PhantomData,
  298. }),
  299. DbResponse::Error(err) => Err(err),
  300. _ => Err(Error::InvalidDbResponse),
  301. }
  302. }
  303. }
  304. impl DatabaseExecutor for AsyncRusqlite {
  305. #[inline(always)]
  306. fn get_queue_sender(&self) -> mpsc::Sender<DbRequest> {
  307. self.sender.clone()
  308. }
  309. }
  310. pub struct Transaction<'conn> {
  311. db_sender: mpsc::Sender<DbRequest>,
  312. _marker: PhantomData<&'conn ()>,
  313. }
  314. impl Drop for Transaction<'_> {
  315. fn drop(&mut self) {
  316. let (sender, _) = oneshot::channel();
  317. let _ = self.db_sender.try_send(DbRequest::Rollback(sender));
  318. }
  319. }
  320. impl Transaction<'_> {
  321. pub async fn commit(self) -> Result<(), Error> {
  322. let (sender, receiver) = oneshot::channel();
  323. self.db_sender
  324. .send(DbRequest::Commit(sender))
  325. .await
  326. .map_err(|_| Error::Communication)?;
  327. match receiver.await.map_err(|_| Error::Communication)? {
  328. DbResponse::Ok => Ok(()),
  329. DbResponse::Error(err) => Err(err),
  330. _ => Err(Error::InvalidDbResponse),
  331. }
  332. }
  333. pub async fn rollback(self) -> Result<(), Error> {
  334. let (sender, receiver) = oneshot::channel();
  335. self.db_sender
  336. .send(DbRequest::Rollback(sender))
  337. .await
  338. .map_err(|_| Error::Communication)?;
  339. match receiver.await.map_err(|_| Error::Communication)? {
  340. DbResponse::Ok => Ok(()),
  341. DbResponse::Error(err) => Err(err),
  342. _ => Err(Error::InvalidDbResponse),
  343. }
  344. }
  345. }
  346. impl DatabaseExecutor for Transaction<'_> {
  347. /// Get the internal sender to the SQL queue
  348. #[inline(always)]
  349. fn get_queue_sender(&self) -> mpsc::Sender<DbRequest> {
  350. self.db_sender.clone()
  351. }
  352. }