pool.rs 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
//! Very simple connection pool, to avoid an external dependency on r2d2 and other crates. If this
//! ends up working well, it can be reused in other parts of the project and may be promoted to its
//! own generic crate.
  4. use std::fmt::Debug;
  5. use std::ops::{Deref, DerefMut};
  6. use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
  7. use std::sync::{Arc, Condvar, Mutex};
  8. use std::time::Duration;
  9. use crate::database::DatabaseConnector;
  10. /// Pool error
  11. #[derive(Debug, thiserror::Error)]
  12. pub enum Error<E>
  13. where
  14. E: std::error::Error + Send + Sync + 'static,
  15. {
  16. /// Mutex Poison Error
  17. #[error("Internal: PoisonError")]
  18. Poison,
  19. /// Timeout error
  20. #[error("Timed out waiting for a resource")]
  21. Timeout,
  22. /// Internal database error
  23. #[error(transparent)]
  24. Resource(#[from] E),
  25. }
  26. /// Configuration
  27. pub trait DatabaseConfig: Clone + Debug + Send + Sync {
  28. /// Max resource sizes
  29. fn max_size(&self) -> usize;
  30. /// Default timeout
  31. fn default_timeout(&self) -> Duration;
  32. }
  33. /// Trait to manage resources
  34. pub trait DatabasePool: Debug {
  35. /// The resource to be pooled
  36. type Connection: DatabaseConnector;
  37. /// The configuration that is needed in order to create the resource
  38. type Config: DatabaseConfig;
  39. /// The error the resource may return when creating a new instance
  40. type Error: Debug + std::error::Error + Send + Sync + 'static;
  41. /// Creates a new resource with a given config.
  42. ///
  43. /// If `stale` is ever set to TRUE it is assumed the resource is no longer valid and it will be
  44. /// dropped.
  45. fn new_resource(
  46. config: &Self::Config,
  47. stale: Arc<AtomicBool>,
  48. timeout: Duration,
  49. ) -> Result<Self::Connection, Error<Self::Error>>;
  50. /// The object is dropped
  51. fn drop(_resource: Self::Connection) {}
  52. }
  53. /// Generic connection pool of resources R
  54. #[derive(Debug)]
  55. pub struct Pool<RM>
  56. where
  57. RM: DatabasePool,
  58. {
  59. config: RM::Config,
  60. queue: Mutex<Vec<(Arc<AtomicBool>, RM::Connection)>>,
  61. in_use: AtomicUsize,
  62. max_size: usize,
  63. default_timeout: Duration,
  64. waiter: Condvar,
  65. }
  66. /// The pooled resource
  67. pub struct PooledResource<RM>
  68. where
  69. RM: DatabasePool,
  70. {
  71. resource: Option<(Arc<AtomicBool>, RM::Connection)>,
  72. pool: Arc<Pool<RM>>,
  73. }
  74. impl<RM> Debug for PooledResource<RM>
  75. where
  76. RM: DatabasePool,
  77. {
  78. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  79. write!(f, "Resource: {:?}", self.resource)
  80. }
  81. }
  82. impl<RM> Drop for PooledResource<RM>
  83. where
  84. RM: DatabasePool,
  85. {
  86. fn drop(&mut self) {
  87. if let Some(resource) = self.resource.take() {
  88. let mut active_resource = self.pool.queue.lock().expect("active_resource");
  89. active_resource.push(resource);
  90. self.pool.in_use.fetch_sub(1, Ordering::AcqRel);
  91. // Notify a waiting thread
  92. self.pool.waiter.notify_one();
  93. }
  94. }
  95. }
  96. impl<RM> Deref for PooledResource<RM>
  97. where
  98. RM: DatabasePool,
  99. {
  100. type Target = RM::Connection;
  101. fn deref(&self) -> &Self::Target {
  102. &self.resource.as_ref().expect("resource already dropped").1
  103. }
  104. }
  105. impl<RM> DerefMut for PooledResource<RM>
  106. where
  107. RM: DatabasePool,
  108. {
  109. fn deref_mut(&mut self) -> &mut Self::Target {
  110. &mut self.resource.as_mut().expect("resource already dropped").1
  111. }
  112. }
  113. impl<RM> Pool<RM>
  114. where
  115. RM: DatabasePool,
  116. {
  117. /// Creates a new pool
  118. pub fn new(config: RM::Config) -> Arc<Self> {
  119. Arc::new(Self {
  120. default_timeout: config.default_timeout(),
  121. max_size: config.max_size(),
  122. config,
  123. queue: Default::default(),
  124. in_use: Default::default(),
  125. waiter: Default::default(),
  126. })
  127. }
  128. /// Similar to get_timeout but uses the default timeout value.
  129. #[inline(always)]
  130. pub fn get(self: &Arc<Self>) -> Result<PooledResource<RM>, Error<RM::Error>> {
  131. self.get_timeout(self.default_timeout)
  132. }
  133. /// Get a new resource or fail after timeout is reached.
  134. ///
  135. /// This function will return a free resource or create a new one if there is still room for it;
  136. /// otherwise, it will wait for a resource to be released for reuse.
  137. #[inline(always)]
  138. pub fn get_timeout(
  139. self: &Arc<Self>,
  140. timeout: Duration,
  141. ) -> Result<PooledResource<RM>, Error<RM::Error>> {
  142. let mut resources = self.queue.lock().map_err(|_| Error::Poison)?;
  143. loop {
  144. if let Some((stale, resource)) = resources.pop() {
  145. if !stale.load(Ordering::SeqCst) {
  146. drop(resources);
  147. self.in_use.fetch_add(1, Ordering::AcqRel);
  148. return Ok(PooledResource {
  149. resource: Some((stale, resource)),
  150. pool: self.clone(),
  151. });
  152. }
  153. }
  154. if self.in_use.load(Ordering::Relaxed) < self.max_size {
  155. drop(resources);
  156. self.in_use.fetch_add(1, Ordering::AcqRel);
  157. let stale: Arc<AtomicBool> = Arc::new(false.into());
  158. return Ok(PooledResource {
  159. resource: Some((
  160. stale.clone(),
  161. RM::new_resource(&self.config, stale, timeout)?,
  162. )),
  163. pool: self.clone(),
  164. });
  165. }
  166. resources = self
  167. .waiter
  168. .wait_timeout(resources, timeout)
  169. .map_err(|_| Error::Poison)
  170. .and_then(|(lock, timeout_result)| {
  171. if timeout_result.timed_out() {
  172. Err(Error::Timeout)
  173. } else {
  174. Ok(lock)
  175. }
  176. })?;
  177. }
  178. }
  179. }
  180. impl<RM> Drop for Pool<RM>
  181. where
  182. RM: DatabasePool,
  183. {
  184. fn drop(&mut self) {
  185. if let Ok(mut resources) = self.queue.lock() {
  186. loop {
  187. while let Some(resource) = resources.pop() {
  188. RM::drop(resource.1);
  189. }
  190. if self.in_use.load(Ordering::Relaxed) == 0 {
  191. break;
  192. }
  193. resources = if let Ok(resources) = self.waiter.wait(resources) {
  194. resources
  195. } else {
  196. break;
  197. };
  198. }
  199. }
  200. }
  201. }