|
|
@@ -1,26 +1,32 @@
|
|
|
use cdk_common::database::Error;
|
|
|
use cdk_sql_common::stmt::{Column, Statement};
|
|
|
use futures_util::{pin_mut, TryStreamExt};
|
|
|
-use tokio_postgres::Client;
|
|
|
+use tokio_postgres::{error::SqlState, Client, Error as PgError};
|
|
|
|
|
|
use crate::value::PgValue;
|
|
|
|
|
|
#[inline(always)]
|
|
|
+fn to_pgsql_error(err: PgError) -> Error {
|
|
|
+ if let Some(err) = err.as_db_error() {
|
|
|
+ if *err.code() == SqlState::INTEGRITY_CONSTRAINT_VIOLATION {
|
|
|
+ return Error::Duplicate;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Error::Database(Box::new(err))
|
|
|
+}
|
|
|
+
|
|
|
+#[inline(always)]
|
|
|
pub async fn pg_batch(conn: &Client, statement: Statement) -> Result<(), Error> {
|
|
|
let (sql, _placeholder_values) = statement.to_sql()?;
|
|
|
|
|
|
- conn.batch_execute(&sql)
|
|
|
- .await
|
|
|
- .map_err(|e| Error::Database(Box::new(e)))
|
|
|
+ conn.batch_execute(&sql).await.map_err(to_pgsql_error)
|
|
|
}
|
|
|
|
|
|
#[inline(always)]
|
|
|
pub async fn pg_execute(conn: &Client, statement: Statement) -> Result<usize, Error> {
|
|
|
let (sql, placeholder_values) = statement.to_sql()?;
|
|
|
- let prepared_statement = conn
|
|
|
- .prepare(&sql)
|
|
|
- .await
|
|
|
- .map_err(|e| Error::Database(Box::new(e)))?;
|
|
|
+ let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
|
|
|
|
|
|
conn.execute_raw(
|
|
|
&prepared_statement,
|
|
|
@@ -30,7 +36,7 @@ pub async fn pg_execute(conn: &Client, statement: Statement) -> Result<usize, Er
|
|
|
.collect::<Vec<PgValue>>(),
|
|
|
)
|
|
|
.await
|
|
|
- .map_err(|e| Error::Database(Box::new(e)))
|
|
|
+ .map_err(to_pgsql_error)
|
|
|
.map(|x| x as usize)
|
|
|
}
|
|
|
|
|
|
@@ -40,10 +46,7 @@ pub async fn pg_fetch_one(
|
|
|
statement: Statement,
|
|
|
) -> Result<Option<Vec<Column>>, Error> {
|
|
|
let (sql, placeholder_values) = statement.to_sql()?;
|
|
|
- let prepared_statement = conn
|
|
|
- .prepare(&sql)
|
|
|
- .await
|
|
|
- .map_err(|e| Error::Database(Box::new(e)))?;
|
|
|
+ let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
|
|
|
|
|
|
let stream = conn
|
|
|
.query_raw(
|
|
|
@@ -54,30 +57,27 @@ pub async fn pg_fetch_one(
|
|
|
.collect::<Vec<PgValue>>(),
|
|
|
)
|
|
|
.await
|
|
|
- .map_err(|e| Error::Database(Box::new(e)))?;
|
|
|
+ .map_err(to_pgsql_error)?;
|
|
|
|
|
|
pin_mut!(stream);
|
|
|
|
|
|
stream
|
|
|
.try_next()
|
|
|
.await
|
|
|
- .map_err(|e| Error::Database(Box::new(e)))?
|
|
|
+ .map_err(to_pgsql_error)?
|
|
|
.map(|row| {
|
|
|
(0..row.len())
|
|
|
.map(|i| row.try_get::<_, PgValue>(i).map(|value| value.into()))
|
|
|
.collect::<Result<Vec<_>, _>>()
|
|
|
})
|
|
|
.transpose()
|
|
|
- .map_err(|e| Error::Database(Box::new(e)))
|
|
|
+ .map_err(to_pgsql_error)
|
|
|
}
|
|
|
|
|
|
#[inline(always)]
|
|
|
pub async fn pg_fetch_all(conn: &Client, statement: Statement) -> Result<Vec<Vec<Column>>, Error> {
|
|
|
let (sql, placeholder_values) = statement.to_sql()?;
|
|
|
- let prepared_statement = conn
|
|
|
- .prepare(&sql)
|
|
|
- .await
|
|
|
- .map_err(|e| Error::Database(Box::new(e)))?;
|
|
|
+ let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
|
|
|
|
|
|
let stream = conn
|
|
|
.query_raw(
|
|
|
@@ -88,21 +88,17 @@ pub async fn pg_fetch_all(conn: &Client, statement: Statement) -> Result<Vec<Vec
|
|
|
.collect::<Vec<PgValue>>(),
|
|
|
)
|
|
|
.await
|
|
|
- .map_err(|e| Error::Database(Box::new(e)))?;
|
|
|
+ .map_err(to_pgsql_error)?;
|
|
|
|
|
|
pin_mut!(stream);
|
|
|
|
|
|
let mut rows = vec![];
|
|
|
- while let Some(row) = stream
|
|
|
- .try_next()
|
|
|
- .await
|
|
|
- .map_err(|e| Error::Database(Box::new(e)))?
|
|
|
- {
|
|
|
+ while let Some(row) = stream.try_next().await.map_err(to_pgsql_error)? {
|
|
|
rows.push(
|
|
|
(0..row.len())
|
|
|
.map(|i| row.try_get::<_, PgValue>(i).map(|value| value.into()))
|
|
|
.collect::<Result<Vec<_>, _>>()
|
|
|
- .map_err(|e| Error::Database(Box::new(e)))?,
|
|
|
+ .map_err(to_pgsql_error)?,
|
|
|
);
|
|
|
}
|
|
|
|
|
|
@@ -112,10 +108,7 @@ pub async fn pg_fetch_all(conn: &Client, statement: Statement) -> Result<Vec<Vec
|
|
|
#[inline(always)]
|
|
|
pub async fn gn_pluck(conn: &Client, statement: Statement) -> Result<Option<Column>, Error> {
|
|
|
let (sql, placeholder_values) = statement.to_sql()?;
|
|
|
- let prepared_statement = conn
|
|
|
- .prepare(&sql)
|
|
|
- .await
|
|
|
- .map_err(|e| Error::Database(Box::new(e)))?;
|
|
|
+ let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
|
|
|
|
|
|
let stream = conn
|
|
|
.query_raw(
|
|
|
@@ -126,15 +119,15 @@ pub async fn gn_pluck(conn: &Client, statement: Statement) -> Result<Option<Colu
|
|
|
.collect::<Vec<PgValue>>(),
|
|
|
)
|
|
|
.await
|
|
|
- .map_err(|e| Error::Database(Box::new(e)))?;
|
|
|
+ .map_err(to_pgsql_error)?;
|
|
|
|
|
|
pin_mut!(stream);
|
|
|
|
|
|
stream
|
|
|
.try_next()
|
|
|
.await
|
|
|
- .map_err(|e| Error::Database(Box::new(e)))?
|
|
|
+ .map_err(to_pgsql_error)?
|
|
|
.map(|row| row.try_get::<_, PgValue>(0).map(|value| value.into()))
|
|
|
.transpose()
|
|
|
- .map_err(|e| Error::Database(Box::new(e)))
|
|
|
+ .map_err(to_pgsql_error)
|
|
|
}
|