|
@@ -192,9 +192,12 @@ where
|
|
|
let time = Instant::now();
|
|
let time = Instant::now();
|
|
|
|
|
|
|
|
loop {
|
|
loop {
|
|
|
- if let Some((stale, resource)) = resources.pop() {
|
|
|
|
|
|
|
+ while let Some((stale, resource)) = resources.pop() {
|
|
|
if !stale.load(Ordering::SeqCst) {
|
|
if !stale.load(Ordering::SeqCst) {
|
|
|
- drop(resources);
|
|
|
|
|
|
|
+ // Increment counter BEFORE releasing the mutex to prevent race condition
|
|
|
|
|
+ // where another thread sees in_use < max_size and creates a duplicate connection.
|
|
|
|
|
+ // For in-memory SQLite, each connection is a separate database, so this race
|
|
|
|
|
+ // would cause some connections to miss migrations.
|
|
|
self.increment_connection_counter();
|
|
self.increment_connection_counter();
|
|
|
|
|
|
|
|
return Ok(PooledResource {
|
|
return Ok(PooledResource {
|
|
@@ -207,17 +210,26 @@ where
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if self.in_use.load(Ordering::Relaxed) < self.max_size {
|
|
if self.in_use.load(Ordering::Relaxed) < self.max_size {
|
|
|
- drop(resources);
|
|
|
|
|
- let stale: Arc<AtomicBool> = Arc::new(false.into());
|
|
|
|
|
- let new_resource = RM::new_resource(&self.config, stale.clone(), timeout)?;
|
|
|
|
|
|
|
+ // Increment counter BEFORE releasing the mutex to prevent race condition.
|
|
|
|
|
+ // This ensures other threads see the updated count and wait instead of
|
|
|
|
|
+ // creating additional connections beyond max_size.
|
|
|
self.increment_connection_counter();
|
|
self.increment_connection_counter();
|
|
|
-
|
|
|
|
|
- return Ok(PooledResource {
|
|
|
|
|
- resource: Some((stale, new_resource)),
|
|
|
|
|
- pool: self.clone(),
|
|
|
|
|
- #[cfg(feature = "prometheus")]
|
|
|
|
|
- start_time: Instant::now(),
|
|
|
|
|
- });
|
|
|
|
|
|
|
+ let stale: Arc<AtomicBool> = Arc::new(false.into());
|
|
|
|
|
+ match RM::new_resource(&self.config, stale.clone(), timeout) {
|
|
|
|
|
+ Ok(new_resource) => {
|
|
|
|
|
+ return Ok(PooledResource {
|
|
|
|
|
+ resource: Some((stale, new_resource)),
|
|
|
|
|
+ pool: self.clone(),
|
|
|
|
|
+ #[cfg(feature = "prometheus")]
|
|
|
|
|
+ start_time: Instant::now(),
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
+ Err(e) => {
|
|
|
|
|
+ // If resource creation fails, decrement the counter we just incremented
|
|
|
|
|
+ self.in_use.fetch_sub(1, Ordering::AcqRel);
|
|
|
|
|
+ return Err(e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
resources = self
|
|
resources = self
|