Bladeren bron

Add PostgreSQL support

Cesar Rodas 3 maanden geleden
bovenliggende
commit
5327b9ae30

+ 35 - 0
crates/cdk-postgres/Cargo.toml

@@ -0,0 +1,35 @@
+[package]
+name = "cdk-postgres"
+version.workspace = true
+edition.workspace = true
+authors = ["CDK Developers"]
+description = "PostgreSQL storage backend for CDK"
+license.workspace = true
+homepage = "https://github.com/cashubtc/cdk"
+repository = "https://github.com/cashubtc/cdk.git"
+rust-version.workspace = true                            # MSRV
+readme = "README.md"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+[features]
+default = ["mint", "wallet", "auth"]
+mint = ["cdk-common/mint", "cdk-sql-common/mint"]
+wallet = ["cdk-common/wallet", "cdk-sql-common/wallet"]
+auth = ["cdk-common/auth", "cdk-sql-common/auth"]
+
+[dependencies]
+async-trait.workspace = true
+cdk-common = { workspace = true, features = ["test"] }
+bitcoin.workspace = true
+cdk-sql-common = { workspace = true }
+thiserror.workspace = true
+tokio = { workspace = true, features = ["rt-multi-thread"] }
+tracing.workspace = true
+serde.workspace = true
+serde_json.workspace = true
+lightning-invoice.workspace = true
+uuid.workspace = true
+tokio-postgres = "0.7.13"
+futures-util = "0.3.31"
+postgres-native-tls = "0.5.1"
+once_cell.workspace = true

+ 135 - 0
crates/cdk-postgres/src/db.rs

@@ -0,0 +1,135 @@
+use cdk_common::database::Error;
+use cdk_sql_common::stmt::{Column, Statement};
+use futures_util::{pin_mut, TryStreamExt};
+use tokio_postgres::Client;
+
+use crate::value::PgValue;
+
+#[inline(always)]
+pub async fn pg_batch(conn: &Client, statement: Statement) -> Result<(), Error> {
+    let (sql, _placeholder_values) = statement.to_sql()?;
+
+    conn.batch_execute(&sql)
+        .await
+        .map_err(|e| Error::Database(Box::new(e)))
+}
+
+#[inline(always)]
+pub async fn pg_execute(conn: &Client, statement: Statement) -> Result<usize, Error> {
+    let (sql, placeholder_values) = statement.to_sql()?;
+    let prepared_statement = conn
+        .prepare(&sql)
+        .await
+        .map_err(|e| Error::Database(Box::new(e)))?;
+
+    conn.execute_raw(
+        &prepared_statement,
+        placeholder_values
+            .iter()
+            .map(|x| x.into())
+            .collect::<Vec<PgValue>>(),
+    )
+    .await
+    .map_err(|e| Error::Database(Box::new(e)))
+    .map(|x| x as usize)
+}
+
+#[inline(always)]
+pub async fn pg_fetch_one(
+    conn: &Client,
+    statement: Statement,
+) -> Result<Option<Vec<Column>>, Error> {
+    let (sql, placeholder_values) = statement.to_sql()?;
+    let prepared_statement = conn
+        .prepare(&sql)
+        .await
+        .map_err(|e| Error::Database(Box::new(e)))?;
+
+    let stream = conn
+        .query_raw(
+            &prepared_statement,
+            placeholder_values
+                .iter()
+                .map(|x| x.into())
+                .collect::<Vec<PgValue>>(),
+        )
+        .await
+        .map_err(|e| Error::Database(Box::new(e)))?;
+
+    pin_mut!(stream);
+
+    Ok(stream
+        .try_next()
+        .await
+        .map_err(|e| Error::Database(Box::new(e)))?
+        .map(|row| {
+            (0..row.len())
+                .map(|i| row.get::<_, PgValue>(i).into())
+                .collect::<Vec<_>>()
+        }))
+}
+
+#[inline(always)]
+pub async fn pg_fetch_all(conn: &Client, statement: Statement) -> Result<Vec<Vec<Column>>, Error> {
+    let (sql, placeholder_values) = statement.to_sql()?;
+    let prepared_statement = conn
+        .prepare(&sql)
+        .await
+        .map_err(|e| Error::Database(Box::new(e)))?;
+
+    let stream = conn
+        .query_raw(
+            &prepared_statement,
+            placeholder_values
+                .iter()
+                .map(|x| x.into())
+                .collect::<Vec<PgValue>>(),
+        )
+        .await
+        .map_err(|e| Error::Database(Box::new(e)))?;
+
+    pin_mut!(stream);
+
+    let mut rows = vec![];
+    while let Some(row) = stream
+        .try_next()
+        .await
+        .map_err(|e| Error::Database(Box::new(e)))?
+    {
+        rows.push(
+            (0..row.len())
+                .map(|i| row.get::<_, PgValue>(i).into())
+                .collect::<Vec<_>>(),
+        );
+    }
+
+    Ok(rows)
+}
+
+#[inline(always)]
+pub async fn gn_pluck(conn: &Client, statement: Statement) -> Result<Option<Column>, Error> {
+    let (sql, placeholder_values) = statement.to_sql()?;
+    let prepared_statement = conn
+        .prepare(&sql)
+        .await
+        .map_err(|e| Error::Database(Box::new(e)))?;
+
+    let stream = conn
+        .query_raw(
+            &prepared_statement,
+            placeholder_values
+                .iter()
+                .map(|x| x.into())
+                .collect::<Vec<PgValue>>(),
+        )
+        .await
+        .map_err(|e| Error::Database(Box::new(e)))?;
+
+    pin_mut!(stream);
+
+    Ok(stream
+        .try_next()
+        .await
+        .map_err(|e| Error::Database(Box::new(e)))?
+        .map(|row| row.get::<_, PgValue>(0).into()))
+}

+ 432 - 0
crates/cdk-postgres/src/lib.rs

@@ -0,0 +1,432 @@
+use std::fmt::Debug;
+use std::marker::PhantomData;
+use std::sync::atomic::AtomicBool;
+use std::sync::{Arc, OnceLock};
+use std::time::Duration;
+
+use cdk_common::database::Error;
+use cdk_sql_common::database::{DatabaseConnector, DatabaseExecutor, DatabaseTransaction};
+use cdk_sql_common::mint::SQLMintAuthDatabase;
+use cdk_sql_common::pool::{Pool, PooledResource, ResourceManager};
+use cdk_sql_common::stmt::{Column, Statement};
+use cdk_sql_common::{SQLMintDatabase, SQLWalletDatabase};
+use db::{gn_pluck, pg_batch, pg_execute, pg_fetch_all, pg_fetch_one};
+use tokio::sync::{Mutex, Notify};
+use tokio::time::timeout;
+use tokio_postgres::{connect, Client, Error as PgError, NoTls};
+
+mod db;
+mod value;
+
+#[derive(Debug)]
+pub struct PgConnectionPool;
+
+#[derive(Clone)]
+pub enum SslMode {
+    NoTls(NoTls),
+    NativeTls(postgres_native_tls::MakeTlsConnector),
+}
+
+impl Default for SslMode {
+    fn default() -> Self {
+        SslMode::NoTls(NoTls {})
+    }
+}
+
+impl Debug for SslMode {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let debug_text = match self {
+            Self::NoTls(_) => "NoTls",
+            Self::NativeTls(_) => "NativeTls",
+        };
+
+        write!(f, "SslMode::{debug_text}")
+    }
+}
+
+/// Postgres configuration
+#[derive(Clone, Debug)]
+pub struct PgConfig {
+    url: String,
+    tls: SslMode,
+}
+
+/// A simple wrapper for the async connect, this would trigger the `connect` in another tokio task
+/// that would eventually resolve
+#[derive(Debug)]
+pub struct FutureConnect {
+    timeout: Duration,
+    error: Arc<Mutex<Option<cdk_common::database::Error>>>,
+    result: Arc<OnceLock<Client>>,
+    notify: Arc<Notify>,
+}
+
+impl FutureConnect {
+    /// Creates a new instance
+    pub fn new(config: PgConfig, timeout: Duration, still_valid: Arc<AtomicBool>) -> Self {
+        let failed = Arc::new(Mutex::new(None));
+        let result = Arc::new(OnceLock::new());
+        let notify = Arc::new(Notify::new());
+        let error_clone = failed.clone();
+        let result_clone = result.clone();
+        let notify_clone = notify.clone();
+
+        tokio::spawn(async move {
+            match config.tls {
+                SslMode::NoTls(tls) => {
+                    let (client, connection) = match connect(&config.url, tls).await {
+                        Ok((client, connection)) => (client, connection),
+                        Err(err) => {
+                            *error_clone.lock().await =
+                                Some(cdk_common::database::Error::Database(Box::new(err)));
+                            still_valid.store(false, std::sync::atomic::Ordering::Release);
+                            notify_clone.notify_waiters();
+                            return;
+                        }
+                    };
+
+                    tokio::spawn(async move {
+                        let _ = connection.await;
+                        still_valid.store(false, std::sync::atomic::Ordering::Release);
+                    });
+
+                    let _ = result_clone.set(client);
+                    notify_clone.notify_waiters();
+                }
+                SslMode::NativeTls(tls) => {
+                    let (client, connection) = match connect(&config.url, tls).await {
+                        Ok((client, connection)) => (client, connection),
+                        Err(err) => {
+                            *error_clone.lock().await =
+                                Some(cdk_common::database::Error::Database(Box::new(err)));
+                            still_valid.store(false, std::sync::atomic::Ordering::Release);
+                            notify_clone.notify_waiters();
+                            return;
+                        }
+                    };
+
+                    tokio::spawn(async move {
+                        let _ = connection.await;
+                        still_valid.store(false, std::sync::atomic::Ordering::Release);
+                    });
+
+                    let _ = result_clone.set(client);
+                    notify_clone.notify_waiters();
+                }
+            }
+        });
+
+        Self {
+            error: failed,
+            timeout,
+            result,
+            notify,
+        }
+    }
+
+    /// Gets the wrapped instance or the connection error. The connection is returned as reference,
+    /// and the actual error is returned once, next times a generic error would be returned
+    pub async fn client(&self) -> Result<&Client, cdk_common::database::Error> {
+        if let Some(client) = self.result.get() {
+            return Ok(client);
+        }
+
+        if let Some(error) = self.error.lock().await.take() {
+            return Err(error);
+        }
+
+        if timeout(self.timeout, self.notify.notified()).await.is_err() {
+            return Err(cdk_common::database::Error::Internal("Timeout".to_owned()));
+        }
+
+        // Check result again
+        if let Some(client) = self.result.get() {
+            Ok(client)
+        } else if let Some(error) = self.error.lock().await.take() {
+            Err(error)
+        } else {
+            Err(cdk_common::database::Error::Internal(
+                "Failed connection".to_owned(),
+            ))
+        }
+    }
+}
+
+impl ResourceManager for PgConnectionPool {
+    type Config = PgConfig;
+
+    type Resource = FutureConnect;
+
+    type Error = PgError;
+
+    fn new_resource(
+        config: &Self::Config,
+        still_valid: Arc<AtomicBool>,
+        timeout: Duration,
+    ) -> Result<Self::Resource, cdk_sql_common::pool::Error<Self::Error>> {
+        Ok(FutureConnect::new(config.to_owned(), timeout, still_valid))
+    }
+}
+
+#[derive(Debug)]
+pub struct CdkPostgres {
+    pool: Arc<Pool<PgConnectionPool>>,
+}
+
+impl From<&str> for CdkPostgres {
+    fn from(value: &str) -> Self {
+        let config = PgConfig {
+            url: value.to_owned(),
+            tls: Default::default(),
+        };
+        let pool = Pool::<PgConnectionPool>::new(config, 10, Duration::from_secs(10));
+        CdkPostgres { pool }
+    }
+}
+
+pub struct CdkPostgresTx<'a> {
+    conn: Option<PooledResource<PgConnectionPool>>,
+    done: bool,
+    _phantom: PhantomData<&'a ()>,
+}
+
+impl Drop for CdkPostgresTx<'_> {
+    fn drop(&mut self) {
+        if let Some(conn) = self.conn.take() {
+            if !self.done {
+                tokio::spawn(async move {
+                    let _ = conn
+                        .client()
+                        .await
+                        .expect("client")
+                        .batch_execute("ROLLBACK")
+                        .await;
+                });
+            }
+        }
+    }
+}
+
+impl Debug for CdkPostgresTx<'_> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "PgTx")
+    }
+}
+
+#[async_trait::async_trait]
+impl DatabaseConnector for CdkPostgres {
+    type Transaction<'a> = CdkPostgresTx<'a>;
+
+    async fn begin(&self) -> Result<Self::Transaction<'_>, Error> {
+        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
+
+        conn.client()
+            .await?
+            .batch_execute("BEGIN TRANSACTION")
+            .await
+            .map_err(|e| Error::Database(Box::new(e)))?;
+
+        Ok(Self::Transaction {
+            conn: Some(conn),
+            done: false,
+            _phantom: PhantomData,
+        })
+    }
+}
+
+#[async_trait::async_trait]
+impl<'a> DatabaseTransaction<'a> for CdkPostgresTx<'a> {
+    async fn commit(mut self) -> Result<(), Error> {
+        self.conn
+            .as_ref()
+            .ok_or(Error::Internal("Missing connection".to_owned()))?
+            .client()
+            .await?
+            .batch_execute("COMMIT")
+            .await
+            .map_err(|e| Error::Database(Box::new(e)))?;
+        self.done = true;
+        Ok(())
+    }
+
+    async fn rollback(mut self) -> Result<(), Error> {
+        self.conn
+            .as_ref()
+            .ok_or(Error::Internal("Missing connection".to_owned()))?
+            .client()
+            .await?
+            .batch_execute("ROLLBACK")
+            .await
+            .map_err(|e| Error::Database(Box::new(e)))?;
+        self.done = true;
+        Ok(())
+    }
+}
+
+#[async_trait::async_trait]
+impl DatabaseExecutor for CdkPostgresTx<'_> {
+    fn name() -> &'static str {
+        "postgres"
+    }
+
+    async fn execute(&self, statement: Statement) -> Result<usize, Error> {
+        pg_execute(
+            self.conn
+                .as_ref()
+                .ok_or(Error::Internal("Missing connection".to_owned()))?
+                .client()
+                .await?,
+            statement,
+        )
+        .await
+    }
+
+    async fn fetch_one(&self, statement: Statement) -> Result<Option<Vec<Column>>, Error> {
+        pg_fetch_one(
+            self.conn
+                .as_ref()
+                .ok_or(Error::Internal("Missing connection".to_owned()))?
+                .client()
+                .await?,
+            statement,
+        )
+        .await
+    }
+
+    async fn fetch_all(&self, statement: Statement) -> Result<Vec<Vec<Column>>, Error> {
+        pg_fetch_all(
+            self.conn
+                .as_ref()
+                .ok_or(Error::Internal("Missing connection".to_owned()))?
+                .client()
+                .await?,
+            statement,
+        )
+        .await
+    }
+
+    async fn pluck(&self, statement: Statement) -> Result<Option<Column>, Error> {
+        gn_pluck(
+            self.conn
+                .as_ref()
+                .ok_or(Error::Internal("Missing connection".to_owned()))?
+                .client()
+                .await?,
+            statement,
+        )
+        .await
+    }
+
+    async fn batch(&self, statement: Statement) -> Result<(), Error> {
+        pg_batch(
+            self.conn
+                .as_ref()
+                .ok_or(Error::Internal("Missing connection".to_owned()))?
+                .client()
+                .await?,
+            statement,
+        )
+        .await
+    }
+}
+
+#[async_trait::async_trait]
+impl DatabaseExecutor for CdkPostgres {
+    fn name() -> &'static str {
+        "postgres"
+    }
+
+    async fn execute(&self, statement: Statement) -> Result<usize, Error> {
+        pg_execute(
+            self.pool
+                .get()
+                .map_err(|e| Error::Database(Box::new(e)))?
+                .client()
+                .await?,
+            statement,
+        )
+        .await
+    }
+
+    async fn fetch_one(&self, statement: Statement) -> Result<Option<Vec<Column>>, Error> {
+        pg_fetch_one(
+            self.pool
+                .get()
+                .map_err(|e| Error::Database(Box::new(e)))?
+                .client()
+                .await?,
+            statement,
+        )
+        .await
+    }
+
+    async fn fetch_all(&self, statement: Statement) -> Result<Vec<Vec<Column>>, Error> {
+        pg_fetch_all(
+            self.pool
+                .get()
+                .map_err(|e| Error::Database(Box::new(e)))?
+                .client()
+                .await?,
+            statement,
+        )
+        .await
+    }
+
+    async fn pluck(&self, statement: Statement) -> Result<Option<Column>, Error> {
+        gn_pluck(
+            self.pool
+                .get()
+                .map_err(|e| Error::Database(Box::new(e)))?
+                .client()
+                .await?,
+            statement,
+        )
+        .await
+    }
+
+    async fn batch(&self, statement: Statement) -> Result<(), Error> {
+        pg_batch(
+            self.pool
+                .get()
+                .map_err(|e| Error::Database(Box::new(e)))?
+                .client()
+                .await?,
+            statement,
+        )
+        .await
+    }
+}
+
+/// Mint DB implementation with PostgreSQL
+pub type MintPgDatabase = SQLMintDatabase<CdkPostgres>;
+
+/// Mint Auth database with Postgres
+#[cfg(feature = "auth")]
+pub type MintPgAuthDatabase = SQLMintAuthDatabase<CdkPostgres>;
+
+/// Mint DB implementation with PostgresSQL
+pub type WalletPgDatabase = SQLWalletDatabase<CdkPostgres>;
+
+#[cfg(test)]
+mod test {
+    use cdk_common::mint_db_test;
+    use once_cell::sync::Lazy;
+    use tokio::sync::Mutex;
+
+    use super::*;
+
+    static MIGRATION_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
+
+    async fn provide_db() -> MintPgDatabase {
+        let m = MIGRATION_LOCK.lock().await;
+        let db_url = std::env::var("DATABASE_URL")
+            .unwrap_or("host=localhost user=test password=test dbname=testdb port=5433".to_owned());
+        let db = MintPgDatabase::new(db_url.as_str())
+            .await
+            .expect("database");
+        drop(m);
+        db
+    }
+
+    mint_db_test!(provide_db);
+}

+ 130 - 0
crates/cdk-postgres/src/value.rs

@@ -0,0 +1,130 @@
+use std::fmt::Debug;
+
+use cdk_sql_common::value::Value;
+use tokio_postgres::types::{self, FromSql, ToSql};
+
+#[derive(Debug)]
+pub enum PgValue<'a> {
+    Null,
+    Integer(i64),
+    Real(f64),
+    Text(&'a str),
+    Blob(&'a [u8]),
+}
+
+impl<'a> From<&'a Value> for PgValue<'a> {
+    fn from(value: &'a Value) -> Self {
+        match value {
+            Value::Blob(b) => PgValue::Blob(b),
+            Value::Text(text) => PgValue::Text(text.as_str()),
+            Value::Null => PgValue::Null,
+            Value::Integer(i) => PgValue::Integer(*i),
+            Value::Real(r) => PgValue::Real(*r),
+        }
+    }
+}
+
+impl<'a> From<PgValue<'a>> for Value {
+    fn from(val: PgValue<'a>) -> Self {
+        match val {
+            PgValue::Blob(value) => Value::Blob(value.to_owned()),
+            PgValue::Text(value) => Value::Text(value.to_owned()),
+            PgValue::Null => Value::Null,
+            PgValue::Integer(n) => Value::Integer(n),
+            PgValue::Real(r) => Value::Real(r),
+        }
+    }
+}
+
+impl<'a> FromSql<'a> for PgValue<'a> {
+    fn accepts(_ty: &types::Type) -> bool {
+        true
+    }
+
+    fn from_sql(
+        ty: &types::Type,
+        raw: &'a [u8],
+    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
+        Ok(match *ty {
+            types::Type::VARCHAR | types::Type::TEXT | types::Type::BPCHAR | types::Type::NAME => {
+                PgValue::Text(<&str as FromSql>::from_sql(ty, raw)?)
+            }
+            types::Type::BOOL => PgValue::Integer(if <bool as FromSql>::from_sql(ty, raw)? {
+                1
+            } else {
+                0
+            }),
+            types::Type::INT2 | types::Type::INT4 | types::Type::INT8 => {
+                PgValue::Integer(<i64 as FromSql>::from_sql(ty, raw)?)
+            }
+            types::Type::BIT_ARRAY | types::Type::BYTEA | types::Type::UNKNOWN => {
+                PgValue::Blob(<&[u8] as FromSql>::from_sql(ty, raw)?)
+            }
+            _ => panic!("Unsupported type {ty:?}"),
+        })
+    }
+
+    fn from_sql_null(_ty: &types::Type) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
+        Ok(PgValue::Null)
+    }
+}
+
+impl ToSql for PgValue<'_> {
+    fn to_sql(
+        &self,
+        ty: &types::Type,
+        out: &mut types::private::BytesMut,
+    ) -> Result<types::IsNull, Box<dyn std::error::Error + Sync + Send>>
+    where
+        Self: Sized,
+    {
+        match self {
+            PgValue::Blob(blob) => blob.to_sql(ty, out),
+            PgValue::Text(text) => text.to_sql(ty, out),
+            PgValue::Null => Ok(types::IsNull::Yes),
+            PgValue::Real(r) => r.to_sql(ty, out),
+            PgValue::Integer(i) => match *ty {
+                types::Type::BOOL => (*i != 0).to_sql(ty, out),
+                types::Type::INT2 => (*i as i16).to_sql(ty, out),
+                types::Type::INT4 => (*i as i32).to_sql(ty, out),
+                _ => i.to_sql_checked(ty, out),
+            },
+        }
+    }
+
+    fn accepts(_ty: &types::Type) -> bool
+    where
+        Self: Sized,
+    {
+        true
+    }
+
+    fn encode_format(&self, ty: &types::Type) -> types::Format {
+        match self {
+            PgValue::Blob(blob) => blob.encode_format(ty),
+            PgValue::Text(text) => text.encode_format(ty),
+            PgValue::Null => types::Format::Text,
+            PgValue::Real(r) => r.encode_format(ty),
+            PgValue::Integer(i) => i.encode_format(ty),
+        }
+    }
+
+    fn to_sql_checked(
+        &self,
+        ty: &types::Type,
+        out: &mut types::private::BytesMut,
+    ) -> Result<types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
+        match self {
+            PgValue::Blob(blob) => blob.to_sql_checked(ty, out),
+            PgValue::Text(text) => text.to_sql_checked(ty, out),
+            PgValue::Null => Ok(types::IsNull::Yes),
+            PgValue::Real(r) => r.to_sql_checked(ty, out),
+            PgValue::Integer(i) => match *ty {
+                types::Type::BOOL => (*i != 0).to_sql_checked(ty, out),
+                types::Type::INT2 => (*i as i16).to_sql_checked(ty, out),
+                types::Type::INT4 => (*i as i32).to_sql_checked(ty, out),
+                _ => i.to_sql_checked(ty, out),
+            },
+        }
+    }
+}

+ 26 - 0
crates/cdk-postgres/start_db_for_test.sh

@@ -0,0 +1,26 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+CONTAINER_NAME="rust-test-pg"
+DB_USER="test"
+DB_PASS="test"
+DB_NAME="testdb"
+DB_PORT="5433"
+
+echo "Starting fresh PostgreSQL container..."
+docker run -d --rm \
+  --name "${CONTAINER_NAME}" \
+  -e POSTGRES_USER="${DB_USER}" \
+  -e POSTGRES_PASSWORD="${DB_PASS}" \
+  -e POSTGRES_DB="${DB_NAME}" \
+  -p ${DB_PORT}:5432 \
+  postgres:16
+
+echo "Waiting for PostgreSQL to be ready and database '${DB_NAME}' to exist..."
+until docker exec -e PGPASSWORD="${DB_PASS}" "${CONTAINER_NAME}" \
+    psql -U "${DB_USER}" -d "${DB_NAME}" -c "SELECT 1;" >/dev/null 2>&1; do
+  sleep 0.5
+done
+
+export DATABASE_URL="host=localhost user=${DB_USER} password=${DB_PASS} dbname=${DB_NAME} port=${DB_PORT}"
+