//! Very simple connection pool, to avoid an external dependency on r2d2 and other crates. If this //! endup work it can be re-used in other parts of the project and may be promoted to its own //! generic crate use std::fmt::Debug; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; /// Pool error #[derive(thiserror::Error, Debug)] pub enum Error { /// Mutex Poison Error #[error("Internal: PoisonError")] Poison, /// Timeout error #[error("Timed out waiting for a resource")] Timeout, /// Internal database error #[error(transparent)] Resource(#[from] E), } /// Trait to manage resources pub trait ResourceManager: Debug { /// The resource to be pooled type Resource: Debug; /// The configuration that is needed in order to create the resource type Config: Clone + Debug; /// The error the resource may return when creating a new instance type Error: Debug; /// Creates a new resource with a given config fn new_resource( config: &Self::Config, still_valid: Arc, timeout: Duration, ) -> Result>; /// The object is dropped fn drop(_resource: Self::Resource) {} } /// Generic connection pool of resources R #[derive(Debug)] pub struct Pool where RM: ResourceManager, { config: RM::Config, queue: Mutex, RM::Resource)>>, in_use: AtomicUsize, max_size: usize, default_timeout: Duration, waiter: Condvar, } /// The pooled resource pub struct PooledResource where RM: ResourceManager, { resource: Option<(Arc, RM::Resource)>, pool: Arc>, } impl Drop for PooledResource where RM: ResourceManager, { fn drop(&mut self) { if let Some(resource) = self.resource.take() { let mut active_resource = self.pool.queue.lock().expect("active_resource"); active_resource.push(resource); self.pool.in_use.fetch_sub(1, Ordering::AcqRel); // Notify a waiting thread self.pool.waiter.notify_one(); } } } impl Deref for PooledResource where RM: ResourceManager, { type Target = RM::Resource; fn deref(&self) -> &Self::Target { &self.resource.as_ref().expect("resource already dropped").1 } } impl DerefMut for PooledResource where RM: ResourceManager, { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.resource.as_mut().expect("resource already dropped").1 } } impl Pool where RM: ResourceManager, { /// Creates a new pool pub fn new(config: RM::Config, max_size: usize, default_timeout: Duration) -> Arc { Arc::new(Self { config, queue: Default::default(), in_use: Default::default(), waiter: Default::default(), default_timeout, max_size, }) } /// Similar to get_timeout but uses the default timeout value. #[inline(always)] pub fn get(self: &Arc) -> Result, Error> { self.get_timeout(self.default_timeout) } /// Get a new resource or fail after timeout is reached. /// /// This function will return a free resource or create a new one if there is still room for it; /// otherwise, it will wait for a resource to be released for reuse. #[inline(always)] pub fn get_timeout( self: &Arc, timeout: Duration, ) -> Result, Error> { let mut resources = self.queue.lock().map_err(|_| Error::Poison)?; loop { if let Some(resource) = resources.pop() { if resource.0.load(Ordering::SeqCst) { drop(resources); self.in_use.fetch_add(1, Ordering::AcqRel); return Ok(PooledResource { resource: Some(resource), pool: self.clone(), }); } } if self.in_use.load(Ordering::Relaxed) < self.max_size { drop(resources); self.in_use.fetch_add(1, Ordering::AcqRel); let still_valid: Arc = Arc::new(true.into()); return Ok(PooledResource { resource: Some(( still_valid.clone(), RM::new_resource(&self.config, still_valid, timeout)?, )), pool: self.clone(), }); } resources = self .waiter .wait_timeout(resources, timeout) .map_err(|_| Error::Poison) .and_then(|(lock, timeout_result)| { if timeout_result.timed_out() { Err(Error::Timeout) } else { Ok(lock) } })?; } } } impl Drop for Pool where RM: ResourceManager, { fn drop(&mut self) { if let Ok(mut resources) = self.queue.lock() { loop { while let Some(resource) = resources.pop() { RM::drop(resource.1); } if self.in_use.load(Ordering::Relaxed) == 0 { break; } resources = if let Ok(resources) = self.waiter.wait(resources) { resources } else { break; }; } } } }