pool.rs 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
//! Very simple connection pool, to avoid an external dependency on r2d2 and other crates. If this
//! ends up working well, it can be reused in other parts of the project and may be promoted to
//! its own generic crate.
  4. use std::fmt::Debug;
  5. use std::ops::{Deref, DerefMut};
  6. use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
  7. use std::sync::{Arc, Condvar, Mutex};
  8. use std::time::Duration;
  9. /// Pool error
  10. #[derive(thiserror::Error, Debug)]
  11. pub enum Error<E> {
  12. /// Mutex Poison Error
  13. #[error("Internal: PoisonError")]
  14. Poison,
  15. /// Timeout error
  16. #[error("Timed out waiting for a resource")]
  17. Timeout,
  18. /// Internal database error
  19. #[error(transparent)]
  20. Resource(#[from] E),
  21. }
  22. /// Trait to manage resources
  23. pub trait ResourceManager: Debug {
  24. /// The resource to be pooled
  25. type Resource: Debug;
  26. /// The configuration that is needed in order to create the resource
  27. type Config: Clone + Debug;
  28. /// The error the resource may return when creating a new instance
  29. type Error: Debug;
  30. /// Creates a new resource with a given config
  31. fn new_resource(
  32. config: &Self::Config,
  33. still_valid: Arc<AtomicBool>,
  34. timeout: Duration,
  35. ) -> Result<Self::Resource, Error<Self::Error>>;
  36. /// The object is dropped
  37. fn drop(_resource: Self::Resource) {}
  38. }
  39. /// Generic connection pool of resources R
  40. #[derive(Debug)]
  41. pub struct Pool<RM>
  42. where
  43. RM: ResourceManager,
  44. {
  45. config: RM::Config,
  46. queue: Mutex<Vec<(Arc<AtomicBool>, RM::Resource)>>,
  47. in_use: AtomicUsize,
  48. max_size: usize,
  49. default_timeout: Duration,
  50. waiter: Condvar,
  51. }
  52. /// The pooled resource
  53. pub struct PooledResource<RM>
  54. where
  55. RM: ResourceManager,
  56. {
  57. resource: Option<(Arc<AtomicBool>, RM::Resource)>,
  58. pool: Arc<Pool<RM>>,
  59. }
  60. impl<RM> Drop for PooledResource<RM>
  61. where
  62. RM: ResourceManager,
  63. {
  64. fn drop(&mut self) {
  65. if let Some(resource) = self.resource.take() {
  66. let mut active_resource = self.pool.queue.lock().expect("active_resource");
  67. active_resource.push(resource);
  68. self.pool.in_use.fetch_sub(1, Ordering::AcqRel);
  69. // Notify a waiting thread
  70. self.pool.waiter.notify_one();
  71. }
  72. }
  73. }
  74. impl<RM> Deref for PooledResource<RM>
  75. where
  76. RM: ResourceManager,
  77. {
  78. type Target = RM::Resource;
  79. fn deref(&self) -> &Self::Target {
  80. &self.resource.as_ref().expect("resource already dropped").1
  81. }
  82. }
  83. impl<RM> DerefMut for PooledResource<RM>
  84. where
  85. RM: ResourceManager,
  86. {
  87. fn deref_mut(&mut self) -> &mut Self::Target {
  88. &mut self.resource.as_mut().expect("resource already dropped").1
  89. }
  90. }
  91. impl<RM> Pool<RM>
  92. where
  93. RM: ResourceManager,
  94. {
  95. /// Creates a new pool
  96. pub fn new(config: RM::Config, max_size: usize, default_timeout: Duration) -> Arc<Self> {
  97. Arc::new(Self {
  98. config,
  99. queue: Default::default(),
  100. in_use: Default::default(),
  101. waiter: Default::default(),
  102. default_timeout,
  103. max_size,
  104. })
  105. }
  106. /// Similar to get_timeout but uses the default timeout value.
  107. #[inline(always)]
  108. pub fn get(self: &Arc<Self>) -> Result<PooledResource<RM>, Error<RM::Error>> {
  109. self.get_timeout(self.default_timeout)
  110. }
  111. /// Get a new resource or fail after timeout is reached.
  112. ///
  113. /// This function will return a free resource or create a new one if there is still room for it;
  114. /// otherwise, it will wait for a resource to be released for reuse.
  115. #[inline(always)]
  116. pub fn get_timeout(
  117. self: &Arc<Self>,
  118. timeout: Duration,
  119. ) -> Result<PooledResource<RM>, Error<RM::Error>> {
  120. let mut resources = self.queue.lock().map_err(|_| Error::Poison)?;
  121. loop {
  122. if let Some(resource) = resources.pop() {
  123. if resource.0.load(Ordering::SeqCst) {
  124. drop(resources);
  125. self.in_use.fetch_add(1, Ordering::AcqRel);
  126. return Ok(PooledResource {
  127. resource: Some(resource),
  128. pool: self.clone(),
  129. });
  130. }
  131. }
  132. if self.in_use.load(Ordering::Relaxed) < self.max_size {
  133. drop(resources);
  134. self.in_use.fetch_add(1, Ordering::AcqRel);
  135. let still_valid: Arc<AtomicBool> = Arc::new(true.into());
  136. return Ok(PooledResource {
  137. resource: Some((
  138. still_valid.clone(),
  139. RM::new_resource(&self.config, still_valid, timeout)?,
  140. )),
  141. pool: self.clone(),
  142. });
  143. }
  144. resources = self
  145. .waiter
  146. .wait_timeout(resources, timeout)
  147. .map_err(|_| Error::Poison)
  148. .and_then(|(lock, timeout_result)| {
  149. if timeout_result.timed_out() {
  150. Err(Error::Timeout)
  151. } else {
  152. Ok(lock)
  153. }
  154. })?;
  155. }
  156. }
  157. }
  158. impl<RM> Drop for Pool<RM>
  159. where
  160. RM: ResourceManager,
  161. {
  162. fn drop(&mut self) {
  163. if let Ok(mut resources) = self.queue.lock() {
  164. loop {
  165. while let Some(resource) = resources.pop() {
  166. RM::drop(resource.1);
  167. }
  168. if self.in_use.load(Ordering::Relaxed) == 0 {
  169. break;
  170. }
  171. resources = if let Ok(resources) = self.waiter.wait(resources) {
  172. resources
  173. } else {
  174. break;
  175. };
  176. }
  177. }
  178. }
  179. }