db.rs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. use cdk_common::database::Error;
  2. use cdk_sql_common::run_db_operation;
  3. use cdk_sql_common::stmt::{Column, Statement};
  4. use futures_util::{pin_mut, TryStreamExt};
  5. use tokio_postgres::error::SqlState;
  6. use tokio_postgres::{Client, Error as PgError};
  7. use crate::value::PgValue;
  8. #[inline(always)]
  9. fn to_pgsql_error(err: PgError) -> Error {
  10. if let Some(err) = err.as_db_error() {
  11. let code = err.code().to_owned();
  12. if code == SqlState::INTEGRITY_CONSTRAINT_VIOLATION || code == SqlState::UNIQUE_VIOLATION {
  13. return Error::Duplicate;
  14. }
  15. if code == SqlState::T_R_DEADLOCK_DETECTED {
  16. return Error::Locked;
  17. }
  18. }
  19. Error::Database(Box::new(err))
  20. }
  21. #[inline(always)]
  22. pub async fn pg_batch(conn: &Client, statement: Statement) -> Result<(), Error> {
  23. let (sql, _placeholder_values) = statement.to_sql()?;
  24. run_db_operation(&sql, conn.batch_execute(&sql), to_pgsql_error).await
  25. }
  26. #[inline(always)]
  27. pub async fn pg_execute(conn: &Client, statement: Statement) -> Result<usize, Error> {
  28. let (sql, placeholder_values) = statement.to_sql()?;
  29. let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
  30. run_db_operation(
  31. &sql,
  32. async {
  33. conn.execute_raw(
  34. &prepared_statement,
  35. placeholder_values
  36. .iter()
  37. .map(|x| x.into())
  38. .collect::<Vec<PgValue>>(),
  39. )
  40. .await
  41. .map(|x| x as usize)
  42. },
  43. to_pgsql_error,
  44. )
  45. .await
  46. }
  47. #[inline(always)]
  48. pub async fn pg_fetch_one(
  49. conn: &Client,
  50. statement: Statement,
  51. ) -> Result<Option<Vec<Column>>, Error> {
  52. let (sql, placeholder_values) = statement.to_sql()?;
  53. let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
  54. run_db_operation(
  55. &sql,
  56. async {
  57. let stream = conn
  58. .query_raw(
  59. &prepared_statement,
  60. placeholder_values
  61. .iter()
  62. .map(|x| x.into())
  63. .collect::<Vec<PgValue>>(),
  64. )
  65. .await?;
  66. pin_mut!(stream);
  67. stream
  68. .try_next()
  69. .await?
  70. .map(|row| {
  71. (0..row.len())
  72. .map(|i| row.try_get::<_, PgValue>(i).map(|value| value.into()))
  73. .collect::<Result<Vec<_>, _>>()
  74. })
  75. .transpose()
  76. },
  77. to_pgsql_error,
  78. )
  79. .await
  80. }
  81. #[inline(always)]
  82. pub async fn pg_fetch_all(conn: &Client, statement: Statement) -> Result<Vec<Vec<Column>>, Error> {
  83. let (sql, placeholder_values) = statement.to_sql()?;
  84. let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
  85. run_db_operation(
  86. &sql,
  87. async {
  88. let stream = conn
  89. .query_raw(
  90. &prepared_statement,
  91. placeholder_values
  92. .iter()
  93. .map(|x| x.into())
  94. .collect::<Vec<PgValue>>(),
  95. )
  96. .await?;
  97. pin_mut!(stream);
  98. let mut rows = vec![];
  99. while let Some(row) = stream.try_next().await? {
  100. rows.push(
  101. (0..row.len())
  102. .map(|i| row.try_get::<_, PgValue>(i).map(|value| value.into()))
  103. .collect::<Result<Vec<_>, _>>()?,
  104. );
  105. }
  106. Ok(rows)
  107. },
  108. to_pgsql_error,
  109. )
  110. .await
  111. }
  112. #[inline(always)]
  113. pub async fn pg_pluck(conn: &Client, statement: Statement) -> Result<Option<Column>, Error> {
  114. let (sql, placeholder_values) = statement.to_sql()?;
  115. let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
  116. run_db_operation(
  117. &sql,
  118. async {
  119. let stream = conn
  120. .query_raw(
  121. &prepared_statement,
  122. placeholder_values
  123. .iter()
  124. .map(|x| x.into())
  125. .collect::<Vec<PgValue>>(),
  126. )
  127. .await?;
  128. pin_mut!(stream);
  129. stream
  130. .try_next()
  131. .await?
  132. .map(|row| row.try_get::<_, PgValue>(0).map(|value| value.into()))
  133. .transpose()
  134. },
  135. to_pgsql_error,
  136. )
  137. .await
  138. }