pool.rs 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. //! Very simple connection pool, to avoid an external dependency on r2d2 and other crates. If this
  2. //! ends up working well, it can be reused in other parts of the project and may be promoted to its
  3. //! own generic crate.
  4. use std::fmt::Debug;
  5. use std::ops::{Deref, DerefMut};
  6. use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
  7. use std::sync::{Arc, Condvar, Mutex};
  8. use std::time::{Duration, Instant};
  9. #[cfg(feature = "prometheus")]
  10. use cdk_prometheus::metrics::METRICS;
  11. use crate::database::DatabaseConnector;
  12. /// Pool error
  13. #[derive(Debug, thiserror::Error)]
  14. pub enum Error<E>
  15. where
  16. E: std::error::Error + Send + Sync + 'static,
  17. {
  18. /// Mutex Poison Error
  19. #[error("Internal: PoisonError")]
  20. Poison,
  21. /// Timeout error
  22. #[error("Timed out waiting for a resource")]
  23. Timeout,
  24. /// Internal database error
  25. #[error(transparent)]
  26. Resource(#[from] E),
  27. }
  /// Settings a pool reads from its configuration object.
  ///
  /// The pool reads both values once, when it is constructed.
  pub trait DatabaseConfig: Clone + Debug + Send + Sync {
      /// Upper bound on the number of resources that may be checked out at the same time
      fn max_size(&self) -> usize;

      /// Timeout used when the caller does not provide one explicitly
      fn default_timeout(&self) -> Duration;
  }
  35. /// Trait to manage resources
  36. pub trait DatabasePool: Debug {
  37. /// The resource to be pooled
  38. type Connection: DatabaseConnector;
  39. /// The configuration that is needed in order to create the resource
  40. type Config: DatabaseConfig;
  41. /// The error the resource may return when creating a new instance
  42. type Error: Debug + std::error::Error + Send + Sync + 'static;
  43. /// Creates a new resource with a given config.
  44. ///
  45. /// If `stale` is ever set to TRUE it is assumed the resource is no longer valid and it will be
  46. /// dropped.
  47. fn new_resource(
  48. config: &Self::Config,
  49. stale: Arc<AtomicBool>,
  50. timeout: Duration,
  51. ) -> Result<Self::Connection, Error<Self::Error>>;
  52. /// The object is dropped
  53. fn drop(_resource: Self::Connection) {}
  54. }
  55. /// Generic connection pool of resources R
  56. #[derive(Debug)]
  57. pub struct Pool<RM>
  58. where
  59. RM: DatabasePool,
  60. {
  61. config: RM::Config,
  62. queue: Mutex<Vec<(Arc<AtomicBool>, RM::Connection)>>,
  63. in_use: AtomicUsize,
  64. max_size: usize,
  65. default_timeout: Duration,
  66. waiter: Condvar,
  67. }
  68. /// The pooled resource
  69. pub struct PooledResource<RM>
  70. where
  71. RM: DatabasePool,
  72. {
  73. resource: Option<(Arc<AtomicBool>, RM::Connection)>,
  74. pool: Arc<Pool<RM>>,
  75. #[cfg(feature = "prometheus")]
  76. start_time: Instant,
  77. }
  78. impl<RM> Debug for PooledResource<RM>
  79. where
  80. RM: DatabasePool,
  81. {
  82. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  83. write!(f, "Resource: {:?}", self.resource)
  84. }
  85. }
  86. impl<RM> Drop for PooledResource<RM>
  87. where
  88. RM: DatabasePool,
  89. {
  90. fn drop(&mut self) {
  91. if let Some(resource) = self.resource.take() {
  92. let mut active_resource = self.pool.queue.lock().expect("active_resource");
  93. active_resource.push(resource);
  94. let _in_use = self.pool.in_use.fetch_sub(1, Ordering::AcqRel);
  95. #[cfg(feature = "prometheus")]
  96. {
  97. METRICS.set_db_connections_active(_in_use as i64);
  98. let duration = self.start_time.elapsed().as_secs_f64();
  99. METRICS.record_db_operation(duration, "drop");
  100. }
  101. // Notify a waiting thread
  102. self.pool.waiter.notify_one();
  103. }
  104. }
  105. }
  106. impl<RM> Deref for PooledResource<RM>
  107. where
  108. RM: DatabasePool,
  109. {
  110. type Target = RM::Connection;
  111. fn deref(&self) -> &Self::Target {
  112. &self.resource.as_ref().expect("resource already dropped").1
  113. }
  114. }
  115. impl<RM> DerefMut for PooledResource<RM>
  116. where
  117. RM: DatabasePool,
  118. {
  119. fn deref_mut(&mut self) -> &mut Self::Target {
  120. &mut self.resource.as_mut().expect("resource already dropped").1
  121. }
  122. }
  123. impl<RM> Pool<RM>
  124. where
  125. RM: DatabasePool,
  126. {
  127. /// Creates a new pool
  128. pub fn new(config: RM::Config) -> Arc<Self> {
  129. Arc::new(Self {
  130. default_timeout: config.default_timeout(),
  131. max_size: config.max_size(),
  132. config,
  133. queue: Default::default(),
  134. in_use: Default::default(),
  135. waiter: Default::default(),
  136. })
  137. }
  138. /// Similar to get_timeout but uses the default timeout value.
  139. #[inline(always)]
  140. pub fn get(self: &Arc<Self>) -> Result<PooledResource<RM>, Error<RM::Error>> {
  141. self.get_timeout(self.default_timeout)
  142. }
  143. /// Increments the in_use connection counter and updates the metric
  144. fn increment_connection_counter(&self) -> usize {
  145. let in_use = self.in_use.fetch_add(1, Ordering::AcqRel);
  146. #[cfg(feature = "prometheus")]
  147. {
  148. METRICS.set_db_connections_active(in_use as i64);
  149. }
  150. in_use
  151. }
  152. /// Get a new resource or fail after timeout is reached.
  153. ///
  154. /// This function will return a free resource or create a new one if there is still room for it;
  155. /// otherwise, it will wait for a resource to be released for reuse.
  156. #[inline(always)]
  157. pub fn get_timeout(
  158. self: &Arc<Self>,
  159. timeout: Duration,
  160. ) -> Result<PooledResource<RM>, Error<RM::Error>> {
  161. let mut resources = self.queue.lock().map_err(|_| Error::Poison)?;
  162. let time = Instant::now();
  163. loop {
  164. if let Some((stale, resource)) = resources.pop() {
  165. if !stale.load(Ordering::SeqCst) {
  166. drop(resources);
  167. self.increment_connection_counter();
  168. return Ok(PooledResource {
  169. resource: Some((stale, resource)),
  170. pool: self.clone(),
  171. #[cfg(feature = "prometheus")]
  172. start_time: Instant::now(),
  173. });
  174. }
  175. }
  176. if self.in_use.load(Ordering::Relaxed) < self.max_size {
  177. drop(resources);
  178. self.increment_connection_counter();
  179. let stale: Arc<AtomicBool> = Arc::new(false.into());
  180. return Ok(PooledResource {
  181. resource: Some((
  182. stale.clone(),
  183. RM::new_resource(&self.config, stale, timeout)?,
  184. )),
  185. pool: self.clone(),
  186. #[cfg(feature = "prometheus")]
  187. start_time: Instant::now(),
  188. });
  189. }
  190. resources = self
  191. .waiter
  192. .wait_timeout(resources, timeout)
  193. .map_err(|_| Error::Poison)
  194. .and_then(|(lock, timeout_result)| {
  195. if timeout_result.timed_out() {
  196. tracing::warn!(
  197. "Timeout waiting for the resource (pool size: {}). Waited {} ms",
  198. self.max_size,
  199. time.elapsed().as_millis()
  200. );
  201. Err(Error::Timeout)
  202. } else {
  203. Ok(lock)
  204. }
  205. })?;
  206. }
  207. }
  208. }
  209. impl<RM> Drop for Pool<RM>
  210. where
  211. RM: DatabasePool,
  212. {
  213. fn drop(&mut self) {
  214. if let Ok(mut resources) = self.queue.lock() {
  215. loop {
  216. while let Some(resource) = resources.pop() {
  217. RM::drop(resource.1);
  218. }
  219. if self.in_use.load(Ordering::Relaxed) == 0 {
  220. break;
  221. }
  222. resources = if let Ok(resources) = self.waiter.wait(resources) {
  223. resources
  224. } else {
  225. break;
  226. };
  227. }
  228. }
  229. }
  230. }