pool.rs 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. //! Very simple connection pool, to avoid an external dependency on r2d2 and other crates. If this
  2. //! endup work it can be re-used in other parts of the project and may be promoted to its own
  3. //! generic crate
  4. use std::fmt::Debug;
  5. use std::ops::{Deref, DerefMut};
  6. use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
  7. use std::sync::{Arc, Condvar, Mutex};
  8. use std::time::Duration;
  9. use tokio::time::Instant;
  10. use crate::database::DatabaseConnector;
  11. /// Pool error
  12. #[derive(Debug, thiserror::Error)]
  13. pub enum Error<E>
  14. where
  15. E: std::error::Error + Send + Sync + 'static,
  16. {
  17. /// Mutex Poison Error
  18. #[error("Internal: PoisonError")]
  19. Poison,
  20. /// Timeout error
  21. #[error("Timed out waiting for a resource")]
  22. Timeout,
  23. /// Internal database error
  24. #[error(transparent)]
  25. Resource(#[from] E),
  26. }
  27. /// Configuration
  28. pub trait DatabaseConfig: Clone + Debug + Send + Sync {
  29. /// Max resource sizes
  30. fn max_size(&self) -> usize;
  31. /// Default timeout
  32. fn default_timeout(&self) -> Duration;
  33. }
  34. /// Trait to manage resources
  35. pub trait DatabasePool: Debug {
  36. /// The resource to be pooled
  37. type Connection: DatabaseConnector;
  38. /// The configuration that is needed in order to create the resource
  39. type Config: DatabaseConfig;
  40. /// The error the resource may return when creating a new instance
  41. type Error: Debug + std::error::Error + Send + Sync + 'static;
  42. /// Creates a new resource with a given config.
  43. ///
  44. /// If `stale` is ever set to TRUE it is assumed the resource is no longer valid and it will be
  45. /// dropped.
  46. fn new_resource(
  47. config: &Self::Config,
  48. stale: Arc<AtomicBool>,
  49. timeout: Duration,
  50. ) -> Result<Self::Connection, Error<Self::Error>>;
  51. /// The object is dropped
  52. fn drop(_resource: Self::Connection) {}
  53. }
  54. /// Generic connection pool of resources R
  55. #[derive(Debug)]
  56. pub struct Pool<RM>
  57. where
  58. RM: DatabasePool,
  59. {
  60. config: RM::Config,
  61. queue: Mutex<Vec<(Arc<AtomicBool>, RM::Connection)>>,
  62. in_use: AtomicUsize,
  63. max_size: usize,
  64. default_timeout: Duration,
  65. waiter: Condvar,
  66. }
  67. /// The pooled resource
  68. pub struct PooledResource<RM>
  69. where
  70. RM: DatabasePool,
  71. {
  72. resource: Option<(Arc<AtomicBool>, RM::Connection)>,
  73. pool: Arc<Pool<RM>>,
  74. }
  75. impl<RM> Debug for PooledResource<RM>
  76. where
  77. RM: DatabasePool,
  78. {
  79. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  80. write!(f, "Resource: {:?}", self.resource)
  81. }
  82. }
  83. impl<RM> Drop for PooledResource<RM>
  84. where
  85. RM: DatabasePool,
  86. {
  87. fn drop(&mut self) {
  88. if let Some(resource) = self.resource.take() {
  89. let mut active_resource = self.pool.queue.lock().expect("active_resource");
  90. active_resource.push(resource);
  91. self.pool.in_use.fetch_sub(1, Ordering::AcqRel);
  92. // Notify a waiting thread
  93. self.pool.waiter.notify_one();
  94. }
  95. }
  96. }
  97. impl<RM> Deref for PooledResource<RM>
  98. where
  99. RM: DatabasePool,
  100. {
  101. type Target = RM::Connection;
  102. fn deref(&self) -> &Self::Target {
  103. &self.resource.as_ref().expect("resource already dropped").1
  104. }
  105. }
  106. impl<RM> DerefMut for PooledResource<RM>
  107. where
  108. RM: DatabasePool,
  109. {
  110. fn deref_mut(&mut self) -> &mut Self::Target {
  111. &mut self.resource.as_mut().expect("resource already dropped").1
  112. }
  113. }
  114. impl<RM> Pool<RM>
  115. where
  116. RM: DatabasePool,
  117. {
  118. /// Creates a new pool
  119. pub fn new(config: RM::Config) -> Arc<Self> {
  120. Arc::new(Self {
  121. default_timeout: config.default_timeout(),
  122. max_size: config.max_size(),
  123. config,
  124. queue: Default::default(),
  125. in_use: Default::default(),
  126. waiter: Default::default(),
  127. })
  128. }
  129. /// Similar to get_timeout but uses the default timeout value.
  130. #[inline(always)]
  131. pub fn get(self: &Arc<Self>) -> Result<PooledResource<RM>, Error<RM::Error>> {
  132. self.get_timeout(self.default_timeout)
  133. }
  134. /// Get a new resource or fail after timeout is reached.
  135. ///
  136. /// This function will return a free resource or create a new one if there is still room for it;
  137. /// otherwise, it will wait for a resource to be released for reuse.
  138. #[inline(always)]
  139. pub fn get_timeout(
  140. self: &Arc<Self>,
  141. timeout: Duration,
  142. ) -> Result<PooledResource<RM>, Error<RM::Error>> {
  143. let mut resources = self.queue.lock().map_err(|_| Error::Poison)?;
  144. let time = Instant::now();
  145. loop {
  146. if let Some((stale, resource)) = resources.pop() {
  147. if !stale.load(Ordering::SeqCst) {
  148. drop(resources);
  149. self.in_use.fetch_add(1, Ordering::AcqRel);
  150. return Ok(PooledResource {
  151. resource: Some((stale, resource)),
  152. pool: self.clone(),
  153. });
  154. }
  155. }
  156. if self.in_use.load(Ordering::Relaxed) < self.max_size {
  157. drop(resources);
  158. self.in_use.fetch_add(1, Ordering::AcqRel);
  159. let stale: Arc<AtomicBool> = Arc::new(false.into());
  160. return Ok(PooledResource {
  161. resource: Some((
  162. stale.clone(),
  163. RM::new_resource(&self.config, stale, timeout)?,
  164. )),
  165. pool: self.clone(),
  166. });
  167. }
  168. resources = self
  169. .waiter
  170. .wait_timeout(resources, timeout)
  171. .map_err(|_| Error::Poison)
  172. .and_then(|(lock, timeout_result)| {
  173. if timeout_result.timed_out() {
  174. tracing::warn!(
  175. "Timeout waiting for the resource (pool size: {}). Waited {} ms",
  176. self.max_size,
  177. time.elapsed().as_millis()
  178. );
  179. Err(Error::Timeout)
  180. } else {
  181. Ok(lock)
  182. }
  183. })?;
  184. }
  185. }
  186. }
  187. impl<RM> Drop for Pool<RM>
  188. where
  189. RM: DatabasePool,
  190. {
  191. fn drop(&mut self) {
  192. if let Ok(mut resources) = self.queue.lock() {
  193. loop {
  194. while let Some(resource) = resources.pop() {
  195. RM::drop(resource.1);
  196. }
  197. if self.in_use.load(Ordering::Relaxed) == 0 {
  198. break;
  199. }
  200. resources = if let Ok(resources) = self.waiter.wait(resources) {
  201. resources
  202. } else {
  203. break;
  204. };
  205. }
  206. }
  207. }
  208. }