Browse Source

Add ability to disconnect

Cesar Rodas 1 month ago
parent
commit
5242c42c72
2 changed files with 29 additions and 16 deletions
  1. 24 15
      crates/cdk-sql-base/src/pool.rs
  2. 5 1
      crates/cdk-sqlite/src/common.rs

+ 24 - 15
crates/cdk-sql-base/src/pool.rs

@@ -4,7 +4,7 @@
 
 use std::fmt::Debug;
 use std::ops::{Deref, DerefMut};
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::{Arc, Condvar, Mutex};
 use std::time::Duration;
 
@@ -36,7 +36,10 @@ pub trait ResourceManager: Debug {
     type Error: Debug;
 
     /// Creates a new resource with a given config
-    fn new_resource(config: &Self::Config) -> Result<Self::Resource, Error<Self::Error>>;
+    fn new_resource(
+        config: &Self::Config,
+        still_valid: Arc<AtomicBool>,
+    ) -> Result<Self::Resource, Error<Self::Error>>;
 
     /// The object is dropped
     fn drop(_resource: Self::Resource) {}
@@ -49,7 +52,7 @@ where
     RM: ResourceManager,
 {
     config: RM::Config,
-    queue: Mutex<Vec<RM::Resource>>,
+    queue: Mutex<Vec<(Arc<AtomicBool>, RM::Resource)>>,
     in_use: AtomicUsize,
     max_size: usize,
     default_timeout: Duration,
@@ -61,7 +64,7 @@ pub struct PooledResource<RM>
 where
     RM: ResourceManager,
 {
-    resource: Option<RM::Resource>,
+    resource: Option<(Arc<AtomicBool>, RM::Resource)>,
     pool: Arc<Pool<RM>>,
 }
 
@@ -88,7 +91,7 @@ where
     type Target = RM::Resource;
 
     fn deref(&self) -> &Self::Target {
-        self.resource.as_ref().expect("resource already dropped")
+        &self.resource.as_ref().expect("resource already dropped").1
     }
 }
 
@@ -97,7 +100,7 @@ where
     RM: ResourceManager,
 {
     fn deref_mut(&mut self) -> &mut Self::Target {
-        self.resource.as_mut().expect("resource already dropped")
+        &mut self.resource.as_mut().expect("resource already dropped").1
     }
 }
 
@@ -136,21 +139,27 @@ where
 
         loop {
             if let Some(resource) = resources.pop() {
-                drop(resources);
-                self.in_use.fetch_add(1, Ordering::AcqRel);
-
-                return Ok(PooledResource {
-                    resource: Some(resource),
-                    pool: self.clone(),
-                });
+                if resource.0.load(Ordering::SeqCst) {
+                    drop(resources);
+                    self.in_use.fetch_add(1, Ordering::AcqRel);
+
+                    return Ok(PooledResource {
+                        resource: Some(resource),
+                        pool: self.clone(),
+                    });
+                }
             }
 
             if self.in_use.load(Ordering::Relaxed) < self.max_size {
                 drop(resources);
                 self.in_use.fetch_add(1, Ordering::AcqRel);
+                let still_valid: Arc<AtomicBool> = Arc::new(true.into());
 
                 return Ok(PooledResource {
-                    resource: Some(RM::new_resource(&self.config)?),
+                    resource: Some((
+                        still_valid.clone(),
+                        RM::new_resource(&self.config, still_valid)?,
+                    )),
                     pool: self.clone(),
                 });
             }
@@ -178,7 +187,7 @@ where
         if let Ok(mut resources) = self.queue.lock() {
             loop {
                 while let Some(resource) = resources.pop() {
-                    RM::drop(resource);
+                    RM::drop(resource.1);
                 }
 
                 if self.in_use.load(Ordering::Relaxed) == 0 {

+ 5 - 1
crates/cdk-sqlite/src/common.rs

@@ -1,3 +1,4 @@
+use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -23,7 +24,10 @@ impl ResourceManager for SqliteConnectionManager {
 
     type Error = rusqlite::Error;
 
-    fn new_resource(config: &Self::Config) -> Result<Self::Resource, pool::Error<Self::Error>> {
+    fn new_resource(
+        config: &Self::Config,
+        _still_valid: Arc<AtomicBool>,
+    ) -> Result<Self::Resource, pool::Error<Self::Error>> {
         let conn = if let Some(path) = config.path.as_ref() {
             Connection::open(path)?
         } else {