Kaynağa Gözat

Add comments and improved the API

Cesar Rodas 5 ay önce
ebeveyn
işleme
bd738ebd78

+ 7 - 4
crates/cdk-sqlite/src/common.rs

@@ -1,17 +1,20 @@
 use std::sync::Arc;
+use std::time::Duration;
 
 use rusqlite::{params, Connection};
 
 use crate::pool::{Pool, ResourceManager};
 
+/// The config need to create a new SQLite connection
 #[derive(Debug)]
-pub(crate) struct Config {
+pub struct Config {
     path: Option<String>,
     password: Option<String>,
 }
 
+/// Sqlite connection manager
 #[derive(Debug)]
-pub(crate) struct SqliteConnectionManager;
+pub struct SqliteConnectionManager;
 
 impl ResourceManager for SqliteConnectionManager {
     type Config = Config;
@@ -30,7 +33,7 @@ impl ResourceManager for SqliteConnectionManager {
         };
 
         if let Some(password) = config.password.as_ref() {
-            conn.execute_batch(&format!("pragma key = {password};"))?;
+            conn.execute_batch(&format!("pragma key = '{password}';"))?;
         }
 
         conn.execute_batch(
@@ -77,7 +80,7 @@ pub fn create_sqlite_pool(
         )
     };
 
-    Pool::new(config, max_size)
+    Pool::new(config, max_size, Duration::from_secs(5))
 }
 
 /// Migrates the migration generated by `build.rs`

+ 2 - 0
crates/cdk-sqlite/src/macros.rs

@@ -1,3 +1,5 @@
+//! Collection of macros to generate code to digest data from SQLite
+
 /// Unpacks a vector of Column, and consumes it, parsing into individual variables, checking the
 /// vector is big enough.
 #[macro_export]

+ 1 - 0
crates/cdk-sqlite/src/mint/async_rusqlite.rs

@@ -88,6 +88,7 @@ impl Statement {
     }
 }
 
+/// Process a query
 #[inline(always)]
 fn process_query(conn: &Connection, sql: InnerStatement) -> Result<DbResponse, Error> {
     let mut stmt = conn.prepare_cached(&sql.sql)?;

+ 46 - 13
crates/cdk-sqlite/src/pool.rs

@@ -6,24 +6,33 @@ use std::fmt::Debug;
 use std::ops::{Deref, DerefMut};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Condvar, Mutex};
+use std::time::Duration;
 
+/// Pool error
 #[derive(thiserror::Error, Debug)]
 pub enum Error<E> {
     /// Mutex Poison Error
     #[error("Internal: PoisonError")]
-    PoisonError,
+    Poison,
+
+    /// Timeout error
+    #[error("Timed out waiting for a resource")]
+    Timeout,
 
     /// Internal database error
     #[error(transparent)]
-    ResourceError(#[from] E),
+    Resource(#[from] E),
 }
 
 /// Trait to manage resources
 pub trait ResourceManager: Debug {
+    /// The resource to be pooled
     type Resource: Debug;
 
+    /// The configuration that is needed in order to create the resource
     type Config: Debug;
 
+    /// The error the resource may return when creating a new instance
     type Error: Debug;
 
     /// Creates a new resource with a given config
@@ -43,10 +52,12 @@ where
     queue: Mutex<Vec<RM::Resource>>,
     in_use: AtomicUsize,
     max_size: usize,
+    default_timeout: Duration,
     waiter: Condvar,
 }
 
-pub struct WrappedResource<RM>
+/// The pooled resource
+pub struct PooledResource<RM>
 where
     RM: ResourceManager,
 {
@@ -54,7 +65,7 @@ where
     pool: Arc<Pool<RM>>,
 }
 
-impl<RM> Drop for WrappedResource<RM>
+impl<RM> Drop for PooledResource<RM>
 where
     RM: ResourceManager,
 {
@@ -70,7 +81,7 @@ where
     }
 }
 
-impl<RM> Deref for WrappedResource<RM>
+impl<RM> Deref for PooledResource<RM>
 where
     RM: ResourceManager,
 {
@@ -81,7 +92,7 @@ where
     }
 }
 
-impl<RM> DerefMut for WrappedResource<RM>
+impl<RM> DerefMut for PooledResource<RM>
 where
     RM: ResourceManager,
 {
@@ -95,25 +106,40 @@ where
     RM: ResourceManager,
 {
     /// Creates a new pool
-    pub fn new(config: RM::Config, max_size: usize) -> Arc<Self> {
+    pub fn new(config: RM::Config, max_size: usize, default_timeout: Duration) -> Arc<Self> {
         Arc::new(Self {
             config,
             queue: Default::default(),
             in_use: Default::default(),
             waiter: Default::default(),
+            default_timeout,
             max_size,
         })
     }
 
-    pub fn get(self: &Arc<Self>) -> Result<WrappedResource<RM>, Error<RM::Error>> {
-        let mut resources = self.queue.lock().map_err(|_| Error::PoisonError)?;
+    /// Similar to get_timeout but uses the default timeout value.
+    #[inline(always)]
+    pub fn get(self: &Arc<Self>) -> Result<PooledResource<RM>, Error<RM::Error>> {
+        self.get_timeout(self.default_timeout)
+    }
+
+    /// Get a new resource or fail after timeout is reached.
+    ///
+    /// This function will return a free resource or create a new one if there is still room for it;
+    /// otherwise, it will wait for a resource to be released for reuse.
+    #[inline(always)]
+    pub fn get_timeout(
+        self: &Arc<Self>,
+        timeout: Duration,
+    ) -> Result<PooledResource<RM>, Error<RM::Error>> {
+        let mut resources = self.queue.lock().map_err(|_| Error::Poison)?;
 
         loop {
             if let Some(resource) = resources.pop() {
                 drop(resources);
                 self.in_use.fetch_add(1, Ordering::AcqRel);
 
-                return Ok(WrappedResource {
+                return Ok(PooledResource {
                     resource: Some(resource),
                     pool: self.clone(),
                 });
@@ -123,7 +149,7 @@ where
                 drop(resources);
                 self.in_use.fetch_add(1, Ordering::AcqRel);
 
-                return Ok(WrappedResource {
+                return Ok(PooledResource {
                     resource: Some(RM::new_resource(&self.config)?),
                     pool: self.clone(),
                 });
@@ -131,8 +157,15 @@ where
 
             resources = self
                 .waiter
-                .wait(resources)
-                .map_err(|_| Error::PoisonError)?;
+                .wait_timeout(resources, timeout)
+                .map_err(|_| Error::Poison)
+                .and_then(|(lock, timeout_result)| {
+                    if timeout_result.timed_out() {
+                        Err(Error::Timeout)
+                    } else {
+                        Ok(lock)
+                    }
+                })?;
         }
     }
 }

+ 20 - 9
crates/cdk-sqlite/src/stmt.rs

@@ -1,32 +1,42 @@
 use rusqlite::{self, CachedStatement};
 
 use crate::common::SqliteConnectionManager;
-use crate::pool::WrappedResource;
+use crate::pool::PooledResource;
 
+/// The Value coming from SQLite
 pub type Value = rusqlite::types::Value;
 
 /// The Column type
-pub type Column = rusqlite::types::Value;
+pub type Column = Value;
 
-/// Expected Sql response
+/// Expected response type for a given SQL statement
 #[derive(Debug, Clone, Copy, Default)]
 pub enum ExpectedSqlResponse {
+    /// A single row
     SingleRow,
+    /// All the rows that matches a query
     #[default]
     ManyRows,
+    /// How many rows were affected by the query
     AffectedRows,
+    /// Return the first column of the first row
     Pluck,
 }
 
 /// Sql message
 #[derive(Default, Debug)]
 pub struct Statement {
+    /// The SQL statement
     pub sql: String,
+    /// The list of arguments for the placeholders. It only supports named arguments for simplicity
+    /// sake
     pub args: Vec<(String, Value)>,
+    /// The expected response type
     pub expected_response: ExpectedSqlResponse,
 }
 
 impl Statement {
+    /// Creates a new statement
     pub fn new<T: ToString>(sql: T) -> Self {
         Self {
             sql: sql.to_string(),
@@ -34,6 +44,7 @@ impl Statement {
         }
     }
 
+    /// Binds a given placeholder to a value.
     #[inline]
     pub fn bind<C: ToString, V: Into<Value>>(mut self, name: C, value: V) -> Self {
         self.args.push((name.to_string(), value.into()));
@@ -87,7 +98,7 @@ impl Statement {
 
     fn get_stmt(
         self,
-        conn: &WrappedResource<SqliteConnectionManager>,
+        conn: &PooledResource<SqliteConnectionManager>,
     ) -> rusqlite::Result<CachedStatement<'_>> {
         let mut stmt = conn.prepare_cached(&self.sql)?;
         for (name, value) in self.args {
@@ -101,11 +112,11 @@ impl Statement {
 
         Ok(stmt)
     }
-    ///
+
     /// Executes a query and returns the affected rows
     pub fn plunk(
         self,
-        conn: &WrappedResource<SqliteConnectionManager>,
+        conn: &PooledResource<SqliteConnectionManager>,
     ) -> rusqlite::Result<Option<Value>> {
         let mut stmt = self.get_stmt(conn)?;
         let mut rows = stmt.raw_query();
@@ -115,7 +126,7 @@ impl Statement {
     /// Executes a query and returns the affected rows
     pub fn execute(
         self,
-        conn: &WrappedResource<SqliteConnectionManager>,
+        conn: &PooledResource<SqliteConnectionManager>,
     ) -> rusqlite::Result<usize> {
         self.get_stmt(conn)?.raw_execute()
     }
@@ -123,7 +134,7 @@ impl Statement {
     /// Runs the query and returns the first row or None
     pub fn fetch_one(
         self,
-        conn: &WrappedResource<SqliteConnectionManager>,
+        conn: &PooledResource<SqliteConnectionManager>,
     ) -> rusqlite::Result<Option<Vec<Column>>> {
         let mut stmt = self.get_stmt(conn)?;
         let columns = stmt.column_count();
@@ -140,7 +151,7 @@ impl Statement {
     /// Runs the query and returns the first row or None
     pub fn fetch_all(
         self,
-        conn: &WrappedResource<SqliteConnectionManager>,
+        conn: &PooledResource<SqliteConnectionManager>,
     ) -> rusqlite::Result<Vec<Vec<Column>>> {
         let mut stmt = self.get_stmt(conn)?;
         let columns = stmt.column_count();