db.rs 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. use cdk_common::database::Error;
  2. use cdk_sql_common::run_db_operation;
  3. use cdk_sql_common::stmt::{Column, Statement};
  4. use futures_util::{pin_mut, TryStreamExt};
  5. use tokio_postgres::error::SqlState;
  6. use tokio_postgres::{Client, Error as PgError};
  7. use crate::value::PgValue;
  8. #[inline(always)]
  9. fn to_pgsql_error(err: PgError) -> Error {
  10. if let Some(err) = err.as_db_error() {
  11. let code = err.code().to_owned();
  12. if code == SqlState::INTEGRITY_CONSTRAINT_VIOLATION || code == SqlState::UNIQUE_VIOLATION {
  13. return Error::Duplicate;
  14. }
  15. }
  16. Error::Database(Box::new(err))
  17. }
  18. #[inline(always)]
  19. pub async fn pg_batch(conn: &Client, statement: Statement) -> Result<(), Error> {
  20. let (sql, _placeholder_values) = statement.to_sql()?;
  21. run_db_operation(&sql, conn.batch_execute(&sql), to_pgsql_error).await
  22. }
  23. #[inline(always)]
  24. pub async fn pg_execute(conn: &Client, statement: Statement) -> Result<usize, Error> {
  25. let (sql, placeholder_values) = statement.to_sql()?;
  26. let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
  27. run_db_operation(
  28. &sql,
  29. async {
  30. conn.execute_raw(
  31. &prepared_statement,
  32. placeholder_values
  33. .iter()
  34. .map(|x| x.into())
  35. .collect::<Vec<PgValue>>(),
  36. )
  37. .await
  38. .map(|x| x as usize)
  39. },
  40. to_pgsql_error,
  41. )
  42. .await
  43. }
  44. #[inline(always)]
  45. pub async fn pg_fetch_one(
  46. conn: &Client,
  47. statement: Statement,
  48. ) -> Result<Option<Vec<Column>>, Error> {
  49. let (sql, placeholder_values) = statement.to_sql()?;
  50. let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
  51. run_db_operation(
  52. &sql,
  53. async {
  54. let stream = conn
  55. .query_raw(
  56. &prepared_statement,
  57. placeholder_values
  58. .iter()
  59. .map(|x| x.into())
  60. .collect::<Vec<PgValue>>(),
  61. )
  62. .await?;
  63. pin_mut!(stream);
  64. stream
  65. .try_next()
  66. .await?
  67. .map(|row| {
  68. (0..row.len())
  69. .map(|i| row.try_get::<_, PgValue>(i).map(|value| value.into()))
  70. .collect::<Result<Vec<_>, _>>()
  71. })
  72. .transpose()
  73. },
  74. to_pgsql_error,
  75. )
  76. .await
  77. }
  78. #[inline(always)]
  79. pub async fn pg_fetch_all(conn: &Client, statement: Statement) -> Result<Vec<Vec<Column>>, Error> {
  80. let (sql, placeholder_values) = statement.to_sql()?;
  81. let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
  82. run_db_operation(
  83. &sql,
  84. async {
  85. let stream = conn
  86. .query_raw(
  87. &prepared_statement,
  88. placeholder_values
  89. .iter()
  90. .map(|x| x.into())
  91. .collect::<Vec<PgValue>>(),
  92. )
  93. .await?;
  94. pin_mut!(stream);
  95. let mut rows = vec![];
  96. while let Some(row) = stream.try_next().await? {
  97. rows.push(
  98. (0..row.len())
  99. .map(|i| row.try_get::<_, PgValue>(i).map(|value| value.into()))
  100. .collect::<Result<Vec<_>, _>>()?,
  101. );
  102. }
  103. Ok(rows)
  104. },
  105. to_pgsql_error,
  106. )
  107. .await
  108. }
  109. #[inline(always)]
  110. pub async fn pg_pluck(conn: &Client, statement: Statement) -> Result<Option<Column>, Error> {
  111. let (sql, placeholder_values) = statement.to_sql()?;
  112. let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
  113. run_db_operation(
  114. &sql,
  115. async {
  116. let stream = conn
  117. .query_raw(
  118. &prepared_statement,
  119. placeholder_values
  120. .iter()
  121. .map(|x| x.into())
  122. .collect::<Vec<PgValue>>(),
  123. )
  124. .await?;
  125. pin_mut!(stream);
  126. stream
  127. .try_next()
  128. .await?
  129. .map(|row| row.try_get::<_, PgValue>(0).map(|value| value.into()))
  130. .transpose()
  131. },
  132. to_pgsql_error,
  133. )
  134. .await
  135. }