keyvalue.rs 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. //! Generic KV Store implementations for SQL databases
  2. //!
  3. //! This module provides generic implementations of KVStore traits that can be
  4. //! used by both mint and wallet database implementations.
  5. use std::sync::Arc;
  6. use cdk_common::database::{validate_kvstore_params, Error};
  7. use cdk_common::util::unix_time;
  8. use crate::column_as_string;
  9. #[cfg(feature = "mint")]
  10. use crate::database::ConnectionWithTransaction;
  11. #[cfg(feature = "mint")]
  12. use crate::pool::PooledResource;
  13. use crate::pool::{DatabasePool, Pool};
  14. use crate::stmt::{query, Column};
  /// Generic implementation of kv_read for transactions
  ///
  /// Looks up the `value` column of the `kv_store` row identified by
  /// (`primary_namespace`, `secondary_namespace`, `key`) through the supplied
  /// transaction connection.
  ///
  /// Returns `Ok(Some(bytes))` when the plucked column is a blob, and `Ok(None)`
  /// when `pluck` yields nothing or yields a column of any other type.
  ///
  /// # Errors
  ///
  /// Propagates the error from `validate_kvstore_params`, from building the
  /// statement, or from running it.
  #[cfg(feature = "mint")]
  pub(crate) async fn kv_read_in_transaction<RM>(
      conn: &ConnectionWithTransaction<RM::Connection, PooledResource<RM>>,
      primary_namespace: &str,
      secondary_namespace: &str,
      key: &str,
  ) -> Result<Option<Vec<u8>>, Error>
  where
      RM: DatabasePool,
  {
      // Reject malformed namespaces/keys before touching the database.
      validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;

      let select_sql = r#"
          SELECT value
          FROM kv_store
          WHERE primary_namespace = :primary_namespace
          AND secondary_namespace = :secondary_namespace
          AND key = :key
          "#;

      let plucked = query(select_sql)?
          .bind("primary_namespace", primary_namespace.to_owned())
          .bind("secondary_namespace", secondary_namespace.to_owned())
          .bind("key", key.to_owned())
          .pluck(conn)
          .await?;

      // Only blob payloads are surfaced; anything else is reported as absent.
      Ok(match plucked {
          Some(Column::Blob(bytes)) => Some(bytes),
          _ => None,
      })
  }
  /// Generic implementation of kv_write for transactions
  ///
  /// Upserts `value` into `kv_store` under (`primary_namespace`,
  /// `secondary_namespace`, `key`) through the supplied transaction connection.
  /// A fresh row gets the current time in both `created_time` and
  /// `updated_time`; on a key conflict only `value` and `updated_time` are
  /// overwritten.
  ///
  /// # Errors
  ///
  /// Propagates the error from `validate_kvstore_params`, from building the
  /// statement, or from executing it.
  #[cfg(feature = "mint")]
  pub(crate) async fn kv_write_in_transaction<RM>(
      conn: &ConnectionWithTransaction<RM::Connection, PooledResource<RM>>,
      primary_namespace: &str,
      secondary_namespace: &str,
      key: &str,
      value: &[u8],
  ) -> Result<(), Error>
  where
      RM: DatabasePool,
  {
      // Reject malformed namespaces/keys before touching the database.
      validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;

      // A single timestamp feeds both time columns. The value is bound as i64;
      // NOTE(review): the `as` cast assumes the timestamp fits in i64 — confirm.
      let now = unix_time() as i64;

      let upsert_sql = r#"
          INSERT INTO kv_store
          (primary_namespace, secondary_namespace, key, value, created_time, updated_time)
          VALUES (:primary_namespace, :secondary_namespace, :key, :value, :created_time, :updated_time)
          ON CONFLICT(primary_namespace, secondary_namespace, key)
          DO UPDATE SET
          value = excluded.value,
          updated_time = excluded.updated_time
          "#;

      query(upsert_sql)?
          .bind("primary_namespace", primary_namespace.to_owned())
          .bind("secondary_namespace", secondary_namespace.to_owned())
          .bind("key", key.to_owned())
          .bind("value", value.to_vec())
          .bind("created_time", now)
          .bind("updated_time", now)
          .execute(conn)
          .await?;

      Ok(())
  }
  /// Generic implementation of kv_remove for transactions
  ///
  /// Issues a `DELETE` against `kv_store` for the row identified by
  /// (`primary_namespace`, `secondary_namespace`, `key`) through the supplied
  /// transaction connection. The result of `execute` is discarded, so the
  /// caller is not told whether a row was actually deleted.
  ///
  /// # Errors
  ///
  /// Propagates the error from `validate_kvstore_params`, from building the
  /// statement, or from executing it.
  #[cfg(feature = "mint")]
  pub(crate) async fn kv_remove_in_transaction<RM>(
      conn: &ConnectionWithTransaction<RM::Connection, PooledResource<RM>>,
      primary_namespace: &str,
      secondary_namespace: &str,
      key: &str,
  ) -> Result<(), Error>
  where
      RM: DatabasePool,
  {
      // Reject malformed namespaces/keys before touching the database.
      validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;

      let delete_sql = r#"
          DELETE FROM kv_store
          WHERE primary_namespace = :primary_namespace
          AND secondary_namespace = :secondary_namespace
          AND key = :key
          "#;

      query(delete_sql)?
          .bind("primary_namespace", primary_namespace.to_owned())
          .bind("secondary_namespace", secondary_namespace.to_owned())
          .bind("key", key.to_owned())
          .execute(conn)
          .await?;

      Ok(())
  }
  /// Generic implementation of kv_list for transactions
  ///
  /// Collects every `key` stored in `kv_store` under (`primary_namespace`,
  /// `secondary_namespace`) through the supplied transaction connection. The
  /// query carries `ORDER BY key`, so keys come back in the database's key order.
  ///
  /// # Errors
  ///
  /// Propagates the error from `validate_kvstore_params`, from building or
  /// running the statement, or from converting a key column to a string.
  #[cfg(feature = "mint")]
  pub(crate) async fn kv_list_in_transaction<RM>(
      conn: &ConnectionWithTransaction<RM::Connection, PooledResource<RM>>,
      primary_namespace: &str,
      secondary_namespace: &str,
  ) -> Result<Vec<String>, Error>
  where
      RM: DatabasePool,
  {
      // Only the namespaces are validated here; there is no key to check.
      validate_kvstore_params(primary_namespace, secondary_namespace, None)?;

      let list_sql = r#"
          SELECT key
          FROM kv_store
          WHERE primary_namespace = :primary_namespace
          AND secondary_namespace = :secondary_namespace
          ORDER BY key
          "#;

      let rows = query(list_sql)?
          .bind("primary_namespace", primary_namespace.to_owned())
          .bind("secondary_namespace", secondary_namespace.to_owned())
          .fetch_all(conn)
          .await?;

      // The statement selects a single column, so the key is the first entry of
      // each row. Conversion stops at the first column that cannot be converted.
      let mut keys = Vec::new();
      for row in rows {
          keys.push(column_as_string!(&row[0]));
      }
      Ok(keys)
  }
  140. /// Generic implementation of kv_read for database (non-transactional)
  141. pub(crate) async fn kv_read<RM>(
  142. pool: &Arc<Pool<RM>>,
  143. primary_namespace: &str,
  144. secondary_namespace: &str,
  145. key: &str,
  146. ) -> Result<Option<Vec<u8>>, Error>
  147. where
  148. RM: DatabasePool + 'static,
  149. {
  150. // Validate parameters according to KV store requirements
  151. validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
  152. let conn = pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  153. Ok(query(
  154. r#"
  155. SELECT value
  156. FROM kv_store
  157. WHERE primary_namespace = :primary_namespace
  158. AND secondary_namespace = :secondary_namespace
  159. AND key = :key
  160. "#,
  161. )?
  162. .bind("primary_namespace", primary_namespace.to_owned())
  163. .bind("secondary_namespace", secondary_namespace.to_owned())
  164. .bind("key", key.to_owned())
  165. .pluck(&*conn)
  166. .await?
  167. .and_then(|col| match col {
  168. Column::Blob(data) => Some(data),
  169. _ => None,
  170. }))
  171. }
  172. /// Generic implementation of kv_list for database (non-transactional)
  173. pub(crate) async fn kv_list<RM>(
  174. pool: &Arc<Pool<RM>>,
  175. primary_namespace: &str,
  176. secondary_namespace: &str,
  177. ) -> Result<Vec<String>, Error>
  178. where
  179. RM: DatabasePool + 'static,
  180. {
  181. // Validate namespace parameters according to KV store requirements
  182. validate_kvstore_params(primary_namespace, secondary_namespace, None)?;
  183. let conn = pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  184. query(
  185. r#"
  186. SELECT key
  187. FROM kv_store
  188. WHERE primary_namespace = :primary_namespace
  189. AND secondary_namespace = :secondary_namespace
  190. ORDER BY key
  191. "#,
  192. )?
  193. .bind("primary_namespace", primary_namespace.to_owned())
  194. .bind("secondary_namespace", secondary_namespace.to_owned())
  195. .fetch_all(&*conn)
  196. .await?
  197. .into_iter()
  198. .map(|row| Ok(column_as_string!(&row[0])))
  199. .collect::<Result<Vec<_>, Error>>()
  200. }
  /// Generic implementation of kv_write for database (non-transactional, standalone)
  ///
  /// Upserts `value` into `kv_store` under (`primary_namespace`,
  /// `secondary_namespace`, `key`) using any `DatabaseExecutor`. A fresh row
  /// gets the current time in both `created_time` and `updated_time`; on a key
  /// conflict only `value` and `updated_time` are overwritten.
  ///
  /// # Errors
  ///
  /// Propagates the error from `validate_kvstore_params`, from building the
  /// statement, or from executing it.
  #[cfg(feature = "wallet")]
  pub(crate) async fn kv_write_standalone<C>(
      conn: &C,
      primary_namespace: &str,
      secondary_namespace: &str,
      key: &str,
      value: &[u8],
  ) -> Result<(), Error>
  where
      C: crate::database::DatabaseExecutor,
  {
      // Reject malformed namespaces/keys before touching the database.
      validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;

      // A single timestamp feeds both time columns. The value is bound as i64;
      // NOTE(review): the `as` cast assumes the timestamp fits in i64 — confirm.
      let now = unix_time() as i64;

      let upsert_sql = r#"
          INSERT INTO kv_store
          (primary_namespace, secondary_namespace, key, value, created_time, updated_time)
          VALUES (:primary_namespace, :secondary_namespace, :key, :value, :created_time, :updated_time)
          ON CONFLICT(primary_namespace, secondary_namespace, key)
          DO UPDATE SET
          value = excluded.value,
          updated_time = excluded.updated_time
          "#;

      query(upsert_sql)?
          .bind("primary_namespace", primary_namespace.to_owned())
          .bind("secondary_namespace", secondary_namespace.to_owned())
          .bind("key", key.to_owned())
          .bind("value", value.to_vec())
          .bind("created_time", now)
          .bind("updated_time", now)
          .execute(conn)
          .await?;

      Ok(())
  }
  /// Generic implementation of kv_remove for database (non-transactional, standalone)
  ///
  /// Issues a `DELETE` against `kv_store` for the row identified by
  /// (`primary_namespace`, `secondary_namespace`, `key`) using any
  /// `DatabaseExecutor`. The result of `execute` is discarded, so the caller is
  /// not told whether a row was actually deleted.
  ///
  /// # Errors
  ///
  /// Propagates the error from `validate_kvstore_params`, from building the
  /// statement, or from executing it.
  #[cfg(feature = "wallet")]
  pub(crate) async fn kv_remove_standalone<C>(
      conn: &C,
      primary_namespace: &str,
      secondary_namespace: &str,
      key: &str,
  ) -> Result<(), Error>
  where
      C: crate::database::DatabaseExecutor,
  {
      // Reject malformed namespaces/keys before touching the database.
      validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;

      let delete_sql = r#"
          DELETE FROM kv_store
          WHERE primary_namespace = :primary_namespace
          AND secondary_namespace = :secondary_namespace
          AND key = :key
          "#;

      query(delete_sql)?
          .bind("primary_namespace", primary_namespace.to_owned())
          .bind("secondary_namespace", secondary_namespace.to_owned())
          .bind("key", key.to_owned())
          .execute(conn)
          .await?;

      Ok(())
  }