|
@@ -54,6 +54,7 @@ impl Debug for SslMode {
|
|
|
#[derive(Clone, Debug)]
|
|
#[derive(Clone, Debug)]
|
|
|
pub struct PgConfig {
|
|
pub struct PgConfig {
|
|
|
url: String,
|
|
url: String,
|
|
|
|
|
+ schema: Option<String>,
|
|
|
tls: SslMode,
|
|
tls: SslMode,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -67,8 +68,29 @@ impl DatabaseConfig for PgConfig {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+impl PgConfig {
|
|
|
|
|
+ /// strip schema from the connection string
|
|
|
|
|
+ fn strip_schema(input: &str) -> (Option<String>, String) {
|
|
|
|
|
+ let mut schema: Option<String> = None;
|
|
|
|
|
+
|
|
|
|
|
+ // Split by whitespace
|
|
|
|
|
+ let mut parts = Vec::new();
|
|
|
|
|
+ for token in input.split_whitespace() {
|
|
|
|
|
+ if let Some(rest) = token.strip_prefix("schema=") {
|
|
|
|
|
+ schema = Some(rest.to_string());
|
|
|
|
|
+ } else {
|
|
|
|
|
+ parts.push(token);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ let cleaned = parts.join(" ");
|
|
|
|
|
+ (schema, cleaned)
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
impl From<&str> for PgConfig {
|
|
impl From<&str> for PgConfig {
|
|
|
fn from(conn_str: &str) -> Self {
|
|
fn from(conn_str: &str) -> Self {
|
|
|
|
|
+ let (schema, conn_str) = Self::strip_schema(conn_str);
|
|
|
fn build_tls(accept_invalid_certs: bool, accept_invalid_hostnames: bool) -> SslMode {
|
|
fn build_tls(accept_invalid_certs: bool, accept_invalid_hostnames: bool) -> SslMode {
|
|
|
let mut builder = TlsConnector::builder();
|
|
let mut builder = TlsConnector::builder();
|
|
|
if accept_invalid_certs {
|
|
if accept_invalid_certs {
|
|
@@ -105,6 +127,7 @@ impl From<&str> for PgConfig {
|
|
|
|
|
|
|
|
PgConfig {
|
|
PgConfig {
|
|
|
url: conn_str.to_owned(),
|
|
url: conn_str.to_owned(),
|
|
|
|
|
+ schema,
|
|
|
tls,
|
|
tls,
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -149,6 +172,17 @@ impl PostgresConnection {
|
|
|
let result_clone = result.clone();
|
|
let result_clone = result.clone();
|
|
|
let notify_clone = notify.clone();
|
|
let notify_clone = notify.clone();
|
|
|
|
|
|
|
|
|
|
+ async fn select_schema(conn: &Client, schema: &str) -> Result<(), Error> {
|
|
|
|
|
+ conn.batch_execute(&format!(
|
|
|
|
|
+ r#"
|
|
|
|
|
+ CREATE SCHEMA IF NOT EXISTS "{schema}";
|
|
|
|
|
+ SET search_path TO "{schema}"
|
|
|
|
|
+ "#
|
|
|
|
|
+ ))
|
|
|
|
|
+ .await
|
|
|
|
|
+ .map_err(|e| Error::Database(Box::new(e)))
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
tokio::spawn(async move {
|
|
tokio::spawn(async move {
|
|
|
match config.tls {
|
|
match config.tls {
|
|
|
SslMode::NoTls(tls) => {
|
|
SslMode::NoTls(tls) => {
|
|
@@ -163,11 +197,21 @@ impl PostgresConnection {
|
|
|
}
|
|
}
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
|
|
+ let still_valid_for_spawn = still_valid.clone();
|
|
|
tokio::spawn(async move {
|
|
tokio::spawn(async move {
|
|
|
let _ = connection.await;
|
|
let _ = connection.await;
|
|
|
- still_valid.store(false, std::sync::atomic::Ordering::Release);
|
|
|
|
|
|
|
+ still_valid_for_spawn.store(false, std::sync::atomic::Ordering::Release);
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
|
|
+ if let Some(schema) = config.schema.as_ref() {
|
|
|
|
|
+ if let Err(err) = select_schema(&client, schema).await {
|
|
|
|
|
+ *error_clone.lock().await = Some(err);
|
|
|
|
|
+ still_valid.store(false, std::sync::atomic::Ordering::Release);
|
|
|
|
|
+ notify_clone.notify_waiters();
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
let _ = result_clone.set(client);
|
|
let _ = result_clone.set(client);
|
|
|
notify_clone.notify_waiters();
|
|
notify_clone.notify_waiters();
|
|
|
}
|
|
}
|
|
@@ -183,11 +227,21 @@ impl PostgresConnection {
|
|
|
}
|
|
}
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
|
|
+ let still_valid_for_spawn = still_valid.clone();
|
|
|
tokio::spawn(async move {
|
|
tokio::spawn(async move {
|
|
|
let _ = connection.await;
|
|
let _ = connection.await;
|
|
|
- still_valid.store(false, std::sync::atomic::Ordering::Release);
|
|
|
|
|
|
|
+ still_valid_for_spawn.store(false, std::sync::atomic::Ordering::Release);
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
|
|
+ if let Some(schema) = config.schema.as_ref() {
|
|
|
|
|
+ if let Err(err) = select_schema(&client, schema).await {
|
|
|
|
|
+ *error_clone.lock().await = Some(err);
|
|
|
|
|
+ still_valid.store(false, std::sync::atomic::Ordering::Release);
|
|
|
|
|
+ notify_clone.notify_waiters();
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
let _ = result_clone.set(client);
|
|
let _ = result_clone.set(client);
|
|
|
notify_clone.notify_waiters();
|
|
notify_clone.notify_waiters();
|
|
|
}
|
|
}
|
|
@@ -275,22 +329,19 @@ pub type WalletPgDatabase = SQLWalletDatabase<PgConnectionPool>;
|
|
|
#[cfg(test)]
|
|
#[cfg(test)]
|
|
|
mod test {
|
|
mod test {
|
|
|
use cdk_common::mint_db_test;
|
|
use cdk_common::mint_db_test;
|
|
|
- use once_cell::sync::Lazy;
|
|
|
|
|
- use tokio::sync::Mutex;
|
|
|
|
|
|
|
|
|
|
use super::*;
|
|
use super::*;
|
|
|
|
|
|
|
|
- static MIGRATION_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
|
|
|
|
|
-
|
|
|
|
|
- async fn provide_db() -> MintPgDatabase {
|
|
|
|
|
- let m = MIGRATION_LOCK.lock().await;
|
|
|
|
|
|
|
+ async fn provide_db(test_id: String) -> MintPgDatabase {
|
|
|
let db_url = std::env::var("CDK_MINTD_DATABASE_URL")
|
|
let db_url = std::env::var("CDK_MINTD_DATABASE_URL")
|
|
|
.or_else(|_| std::env::var("PG_DB_URL")) // Fallback for compatibility
|
|
.or_else(|_| std::env::var("PG_DB_URL")) // Fallback for compatibility
|
|
|
.unwrap_or("host=localhost user=test password=test dbname=testdb port=5433".to_owned());
|
|
.unwrap_or("host=localhost user=test password=test dbname=testdb port=5433".to_owned());
|
|
|
|
|
+
|
|
|
|
|
+ let db_url = format!("{db_url} schema={test_id}");
|
|
|
|
|
+
|
|
|
let db = MintPgDatabase::new(db_url.as_str())
|
|
let db = MintPgDatabase::new(db_url.as_str())
|
|
|
.await
|
|
.await
|
|
|
.expect("database");
|
|
.expect("database");
|
|
|
- drop(m);
|
|
|
|
|
db
|
|
db
|
|
|
}
|
|
}
|
|
|
|
|
|