mod.rs 84 KB


  1. //! SQL database implementation of the Mint
  2. //!
  3. //! This is a generic SQL implementation for the mint storage layer. Any database can be plugged in
  4. //! as long as standard ANSI SQL is used, as Postgres and SQLite would understand it.
  5. //!
  6. //! This implementation also has a rudimentary but standard migration and versioning system.
  7. //!
  8. //! The trait expects an asynchronous interaction, but it also provides tools to spawn blocking
  9. //! clients in a pool and expose them to an asynchronous environment, making them compatible with
  10. //! Mint.
  11. use std::collections::HashMap;
  12. use std::fmt::Debug;
  13. use std::str::FromStr;
  14. use std::sync::Arc;
  15. use async_trait::async_trait;
  16. use bitcoin::bip32::DerivationPath;
  17. use cdk_common::database::mint::{validate_kvstore_params, SagaDatabase, SagaTransaction};
  18. use cdk_common::database::{
  19. self, ConversionError, DbTransactionFinalizer, Error, MintDatabase, MintKeyDatabaseTransaction,
  20. MintKeysDatabase, MintProofsDatabase, MintQuotesDatabase, MintQuotesTransaction,
  21. MintSignatureTransaction, MintSignaturesDatabase,
  22. };
  23. use cdk_common::mint::{
  24. self, IncomingPayment, Issuance, MeltPaymentRequest, MeltQuote, MintKeySetInfo, MintQuote,
  25. Operation,
  26. };
  27. use cdk_common::nut00::ProofsMethods;
  28. use cdk_common::payment::PaymentIdentifier;
  29. use cdk_common::quote_id::QuoteId;
  30. use cdk_common::secret::Secret;
  31. use cdk_common::state::{check_melt_quote_state_transition, check_state_transition};
  32. use cdk_common::util::unix_time;
  33. use cdk_common::{
  34. Amount, BlindSignature, BlindSignatureDleq, BlindedMessage, CurrencyUnit, Id, MeltQuoteState,
  35. PaymentMethod, Proof, Proofs, PublicKey, SecretKey, State,
  36. };
  37. use lightning_invoice::Bolt11Invoice;
  38. use migrations::MIGRATIONS;
  39. use tracing::instrument;
  40. use crate::common::migrate;
  41. use crate::database::{ConnectionWithTransaction, DatabaseExecutor};
  42. use crate::pool::{DatabasePool, Pool, PooledResource};
  43. use crate::stmt::{query, Column};
  44. use crate::{
  45. column_as_nullable_number, column_as_nullable_string, column_as_number, column_as_string,
  46. unpack_into,
  47. };
  48. #[cfg(feature = "auth")]
  49. mod auth;
  50. #[rustfmt::skip]
  51. mod migrations {
  52. include!(concat!(env!("OUT_DIR"), "/migrations_mint.rs"));
  53. }
  54. #[cfg(feature = "auth")]
  55. pub use auth::SQLMintAuthDatabase;
  56. #[cfg(feature = "prometheus")]
  57. use cdk_prometheus::METRICS;
  58. /// Mint SQL Database
  59. #[derive(Debug, Clone)]
  60. pub struct SQLMintDatabase<RM>
  61. where
  62. RM: DatabasePool + 'static,
  63. {
  64. pool: Arc<Pool<RM>>,
  65. }
  66. /// SQL Transaction Writer
  67. pub struct SQLTransaction<RM>
  68. where
  69. RM: DatabasePool + 'static,
  70. {
  71. inner: ConnectionWithTransaction<RM::Connection, PooledResource<RM>>,
  72. }
  73. #[inline(always)]
  74. async fn get_current_states<C>(
  75. conn: &C,
  76. ys: &[PublicKey],
  77. ) -> Result<HashMap<PublicKey, State>, Error>
  78. where
  79. C: DatabaseExecutor + Send + Sync,
  80. {
  81. if ys.is_empty() {
  82. return Ok(Default::default());
  83. }
  84. query(r#"SELECT y, state FROM proof WHERE y IN (:ys)"#)?
  85. .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
  86. .fetch_all(conn)
  87. .await?
  88. .into_iter()
  89. .map(|row| {
  90. Ok((
  91. column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice),
  92. column_as_string!(&row[1], State::from_str),
  93. ))
  94. })
  95. .collect::<Result<HashMap<_, _>, _>>()
  96. }
  97. impl<RM> SQLMintDatabase<RM>
  98. where
  99. RM: DatabasePool + 'static,
  100. {
  101. /// Creates a new instance
  102. pub async fn new<X>(db: X) -> Result<Self, Error>
  103. where
  104. X: Into<RM::Config>,
  105. {
  106. let pool = Pool::new(db.into());
  107. Self::migrate(pool.get().map_err(|e| Error::Database(Box::new(e)))?).await?;
  108. Ok(Self { pool })
  109. }
  110. /// Migrate
  111. async fn migrate(conn: PooledResource<RM>) -> Result<(), Error> {
  112. let tx = ConnectionWithTransaction::new(conn).await?;
  113. migrate(&tx, RM::Connection::name(), MIGRATIONS).await?;
  114. tx.commit().await?;
  115. Ok(())
  116. }
  117. }
  118. #[async_trait]
  119. impl<RM> database::MintProofsTransaction<'_> for SQLTransaction<RM>
  120. where
  121. RM: DatabasePool + 'static,
  122. {
  123. type Err = Error;
  124. async fn add_proofs(
  125. &mut self,
  126. proofs: Proofs,
  127. quote_id: Option<QuoteId>,
  128. operation: &Operation,
  129. ) -> Result<(), Self::Err> {
  130. let current_time = unix_time();
  131. // Check any previous proof, this query should return None in order to proceed storing
  132. // Any result here would error
  133. match query(r#"SELECT state FROM proof WHERE y IN (:ys) LIMIT 1 FOR UPDATE"#)?
  134. .bind_vec(
  135. "ys",
  136. proofs
  137. .iter()
  138. .map(|y| y.y().map(|y| y.to_bytes().to_vec()))
  139. .collect::<Result<_, _>>()?,
  140. )
  141. .pluck(&self.inner)
  142. .await?
  143. .map(|state| Ok::<_, Error>(column_as_string!(&state, State::from_str)))
  144. .transpose()?
  145. {
  146. Some(State::Spent) => Err(database::Error::AttemptUpdateSpentProof),
  147. Some(_) => Err(database::Error::Duplicate),
  148. None => Ok(()), // no previous record
  149. }?;
  150. for proof in proofs {
  151. query(
  152. r#"
  153. INSERT INTO proof
  154. (y, amount, keyset_id, secret, c, witness, state, quote_id, created_time, operation_kind, operation_id)
  155. VALUES
  156. (:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time, :operation_kind, :operation_id)
  157. "#,
  158. )?
  159. .bind("y", proof.y()?.to_bytes().to_vec())
  160. .bind("amount", proof.amount.to_i64())
  161. .bind("keyset_id", proof.keyset_id.to_string())
  162. .bind("secret", proof.secret.to_string())
  163. .bind("c", proof.c.to_bytes().to_vec())
  164. .bind(
  165. "witness",
  166. proof.witness.map(|w| serde_json::to_string(&w).unwrap()),
  167. )
  168. .bind("state", "UNSPENT".to_string())
  169. .bind("quote_id", quote_id.clone().map(|q| q.to_string()))
  170. .bind("created_time", current_time as i64)
  171. .bind("operation_kind", operation.kind())
  172. .bind("operation_id", operation.id().to_string())
  173. .execute(&self.inner)
  174. .await?;
  175. }
  176. Ok(())
  177. }
  178. async fn update_proofs_states(
  179. &mut self,
  180. ys: &[PublicKey],
  181. new_state: State,
  182. ) -> Result<Vec<Option<State>>, Self::Err> {
  183. let mut current_states = get_current_states(&self.inner, ys).await?;
  184. if current_states.len() != ys.len() {
  185. tracing::warn!(
  186. "Attempted to update state of non-existent proof {} {}",
  187. current_states.len(),
  188. ys.len()
  189. );
  190. return Err(database::Error::ProofNotFound);
  191. }
  192. for state in current_states.values() {
  193. check_state_transition(*state, new_state)?;
  194. }
  195. query(r#"UPDATE proof SET state = :new_state WHERE y IN (:ys)"#)?
  196. .bind("new_state", new_state.to_string())
  197. .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
  198. .execute(&self.inner)
  199. .await?;
  200. if new_state == State::Spent {
  201. query(
  202. r#"
  203. INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
  204. SELECT keyset_id, 0, COALESCE(SUM(amount), 0)
  205. FROM proof
  206. WHERE y IN (:ys)
  207. GROUP BY keyset_id
  208. ON CONFLICT (keyset_id)
  209. DO UPDATE SET total_redeemed = keyset_amounts.total_redeemed + EXCLUDED.total_redeemed
  210. "#,
  211. )?
  212. .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
  213. .execute(&self.inner)
  214. .await?;
  215. }
  216. Ok(ys.iter().map(|y| current_states.remove(y)).collect())
  217. }
  218. async fn remove_proofs(
  219. &mut self,
  220. ys: &[PublicKey],
  221. _quote_id: Option<QuoteId>,
  222. ) -> Result<(), Self::Err> {
  223. if ys.is_empty() {
  224. return Ok(());
  225. }
  226. let total_deleted = query(
  227. r#"
  228. DELETE FROM proof WHERE y IN (:ys) AND state NOT IN (:exclude_state)
  229. "#,
  230. )?
  231. .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
  232. .bind_vec("exclude_state", vec![State::Spent.to_string()])
  233. .execute(&self.inner)
  234. .await?;
  235. if total_deleted != ys.len() {
  236. // Query current states to provide detailed logging
  237. let current_states = get_current_states(&self.inner, ys).await?;
  238. let missing_count = ys.len() - current_states.len();
  239. let spent_count = current_states
  240. .values()
  241. .filter(|s| **s == State::Spent)
  242. .count();
  243. if missing_count > 0 {
  244. tracing::warn!(
  245. "remove_proofs: {} of {} proofs do not exist in database (already removed?)",
  246. missing_count,
  247. ys.len()
  248. );
  249. }
  250. if spent_count > 0 {
  251. tracing::warn!(
  252. "remove_proofs: {} of {} proofs are in Spent state and cannot be removed",
  253. spent_count,
  254. ys.len()
  255. );
  256. }
  257. tracing::debug!(
  258. "remove_proofs details: requested={}, deleted={}, missing={}, spent={}",
  259. ys.len(),
  260. total_deleted,
  261. missing_count,
  262. spent_count
  263. );
  264. return Err(Self::Err::AttemptRemoveSpentProof);
  265. }
  266. Ok(())
  267. }
  268. async fn get_proof_ys_by_quote_id(
  269. &self,
  270. quote_id: &QuoteId,
  271. ) -> Result<Vec<PublicKey>, Self::Err> {
  272. Ok(query(
  273. r#"
  274. SELECT
  275. amount,
  276. keyset_id,
  277. secret,
  278. c,
  279. witness
  280. FROM
  281. proof
  282. WHERE
  283. quote_id = :quote_id
  284. "#,
  285. )?
  286. .bind("quote_id", quote_id.to_string())
  287. .fetch_all(&self.inner)
  288. .await?
  289. .into_iter()
  290. .map(sql_row_to_proof)
  291. .collect::<Result<Vec<Proof>, _>>()?
  292. .ys()?)
  293. }
  294. }
  295. #[async_trait]
  296. impl<RM> database::MintTransaction<'_, Error> for SQLTransaction<RM> where RM: DatabasePool + 'static
  297. {}
  298. #[async_trait]
  299. impl<RM> DbTransactionFinalizer for SQLTransaction<RM>
  300. where
  301. RM: DatabasePool + 'static,
  302. {
  303. type Err = Error;
  304. async fn commit(self: Box<Self>) -> Result<(), Error> {
  305. let result = self.inner.commit().await;
  306. #[cfg(feature = "prometheus")]
  307. {
  308. let success = result.is_ok();
  309. METRICS.record_mint_operation("transaction_commit", success);
  310. METRICS.record_mint_operation_histogram("transaction_commit", success, 1.0);
  311. }
  312. Ok(result?)
  313. }
  314. async fn rollback(self: Box<Self>) -> Result<(), Error> {
  315. let result = self.inner.rollback().await;
  316. #[cfg(feature = "prometheus")]
  317. {
  318. let success = result.is_ok();
  319. METRICS.record_mint_operation("transaction_rollback", success);
  320. METRICS.record_mint_operation_histogram("transaction_rollback", success, 1.0);
  321. }
  322. Ok(result?)
  323. }
  324. }
  325. #[inline(always)]
  326. async fn get_mint_quote_payments<C>(
  327. conn: &C,
  328. quote_id: &QuoteId,
  329. ) -> Result<Vec<IncomingPayment>, Error>
  330. where
  331. C: DatabaseExecutor + Send + Sync,
  332. {
  333. // Get payment IDs and timestamps from the mint_quote_payments table
  334. query(
  335. r#"
  336. SELECT
  337. payment_id,
  338. timestamp,
  339. amount
  340. FROM
  341. mint_quote_payments
  342. WHERE
  343. quote_id=:quote_id
  344. "#,
  345. )?
  346. .bind("quote_id", quote_id.to_string())
  347. .fetch_all(conn)
  348. .await?
  349. .into_iter()
  350. .map(|row| {
  351. let amount: u64 = column_as_number!(row[2].clone());
  352. let time: u64 = column_as_number!(row[1].clone());
  353. Ok(IncomingPayment::new(
  354. amount.into(),
  355. column_as_string!(&row[0]),
  356. time,
  357. ))
  358. })
  359. .collect()
  360. }
  361. #[inline(always)]
  362. async fn get_mint_quote_issuance<C>(conn: &C, quote_id: &QuoteId) -> Result<Vec<Issuance>, Error>
  363. where
  364. C: DatabaseExecutor + Send + Sync,
  365. {
  366. // Get payment IDs and timestamps from the mint_quote_payments table
  367. query(
  368. r#"
  369. SELECT amount, timestamp
  370. FROM mint_quote_issued
  371. WHERE quote_id=:quote_id
  372. "#,
  373. )?
  374. .bind("quote_id", quote_id.to_string())
  375. .fetch_all(conn)
  376. .await?
  377. .into_iter()
  378. .map(|row| {
  379. let time: u64 = column_as_number!(row[1].clone());
  380. Ok(Issuance::new(
  381. Amount::from_i64(column_as_number!(row[0].clone()))
  382. .expect("Is amount when put into db"),
  383. time,
  384. ))
  385. })
  386. .collect()
  387. }
  388. // Inline helper functions that work with both connections and transactions
  389. #[inline]
  390. async fn get_mint_quote_inner<T>(
  391. executor: &T,
  392. quote_id: &QuoteId,
  393. for_update: bool,
  394. ) -> Result<Option<MintQuote>, Error>
  395. where
  396. T: DatabaseExecutor,
  397. {
  398. let payments = get_mint_quote_payments(executor, quote_id).await?;
  399. let issuance = get_mint_quote_issuance(executor, quote_id).await?;
  400. let for_update_clause = if for_update { "FOR UPDATE" } else { "" };
  401. let query_str = format!(
  402. r#"
  403. SELECT
  404. id,
  405. amount,
  406. unit,
  407. request,
  408. expiry,
  409. request_lookup_id,
  410. pubkey,
  411. created_time,
  412. amount_paid,
  413. amount_issued,
  414. payment_method,
  415. request_lookup_id_kind
  416. FROM
  417. mint_quote
  418. WHERE id = :id
  419. {for_update_clause}
  420. "#
  421. );
  422. query(&query_str)?
  423. .bind("id", quote_id.to_string())
  424. .fetch_one(executor)
  425. .await?
  426. .map(|row| sql_row_to_mint_quote(row, payments, issuance))
  427. .transpose()
  428. }
  429. #[inline]
  430. async fn get_mint_quote_by_request_inner<T>(
  431. executor: &T,
  432. request: &str,
  433. for_update: bool,
  434. ) -> Result<Option<MintQuote>, Error>
  435. where
  436. T: DatabaseExecutor,
  437. {
  438. let for_update_clause = if for_update { "FOR UPDATE" } else { "" };
  439. let query_str = format!(
  440. r#"
  441. SELECT
  442. id,
  443. amount,
  444. unit,
  445. request,
  446. expiry,
  447. request_lookup_id,
  448. pubkey,
  449. created_time,
  450. amount_paid,
  451. amount_issued,
  452. payment_method,
  453. request_lookup_id_kind
  454. FROM
  455. mint_quote
  456. WHERE request = :request
  457. {for_update_clause}
  458. "#
  459. );
  460. let mut mint_quote = query(&query_str)?
  461. .bind("request", request.to_string())
  462. .fetch_one(executor)
  463. .await?
  464. .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
  465. .transpose()?;
  466. if let Some(quote) = mint_quote.as_mut() {
  467. let payments = get_mint_quote_payments(executor, &quote.id).await?;
  468. let issuance = get_mint_quote_issuance(executor, &quote.id).await?;
  469. quote.issuance = issuance;
  470. quote.payments = payments;
  471. }
  472. Ok(mint_quote)
  473. }
  474. #[inline]
  475. async fn get_mint_quote_by_request_lookup_id_inner<T>(
  476. executor: &T,
  477. request_lookup_id: &PaymentIdentifier,
  478. for_update: bool,
  479. ) -> Result<Option<MintQuote>, Error>
  480. where
  481. T: DatabaseExecutor,
  482. {
  483. let for_update_clause = if for_update { "FOR UPDATE" } else { "" };
  484. let query_str = format!(
  485. r#"
  486. SELECT
  487. id,
  488. amount,
  489. unit,
  490. request,
  491. expiry,
  492. request_lookup_id,
  493. pubkey,
  494. created_time,
  495. amount_paid,
  496. amount_issued,
  497. payment_method,
  498. request_lookup_id_kind
  499. FROM
  500. mint_quote
  501. WHERE request_lookup_id = :request_lookup_id
  502. AND request_lookup_id_kind = :request_lookup_id_kind
  503. {for_update_clause}
  504. "#
  505. );
  506. let mut mint_quote = query(&query_str)?
  507. .bind("request_lookup_id", request_lookup_id.to_string())
  508. .bind("request_lookup_id_kind", request_lookup_id.kind())
  509. .fetch_one(executor)
  510. .await?
  511. .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
  512. .transpose()?;
  513. if let Some(quote) = mint_quote.as_mut() {
  514. let payments = get_mint_quote_payments(executor, &quote.id).await?;
  515. let issuance = get_mint_quote_issuance(executor, &quote.id).await?;
  516. quote.issuance = issuance;
  517. quote.payments = payments;
  518. }
  519. Ok(mint_quote)
  520. }
  521. #[inline]
  522. async fn get_melt_quote_inner<T>(
  523. executor: &T,
  524. quote_id: &QuoteId,
  525. for_update: bool,
  526. ) -> Result<Option<mint::MeltQuote>, Error>
  527. where
  528. T: DatabaseExecutor,
  529. {
  530. let for_update_clause = if for_update { "FOR UPDATE" } else { "" };
  531. let query_str = format!(
  532. r#"
  533. SELECT
  534. id,
  535. unit,
  536. amount,
  537. request,
  538. fee_reserve,
  539. expiry,
  540. state,
  541. payment_preimage,
  542. request_lookup_id,
  543. created_time,
  544. paid_time,
  545. payment_method,
  546. options,
  547. request_lookup_id_kind
  548. FROM
  549. melt_quote
  550. WHERE
  551. id=:id
  552. {for_update_clause}
  553. "#
  554. );
  555. query(&query_str)?
  556. .bind("id", quote_id.to_string())
  557. .fetch_one(executor)
  558. .await?
  559. .map(sql_row_to_melt_quote)
  560. .transpose()
  561. }
  562. #[async_trait]
  563. impl<RM> MintKeyDatabaseTransaction<'_, Error> for SQLTransaction<RM>
  564. where
  565. RM: DatabasePool + 'static,
  566. {
  567. async fn add_keyset_info(&mut self, keyset: MintKeySetInfo) -> Result<(), Error> {
  568. query(
  569. r#"
  570. INSERT INTO
  571. keyset (
  572. id, unit, active, valid_from, valid_to, derivation_path,
  573. amounts, input_fee_ppk, derivation_path_index
  574. )
  575. VALUES (
  576. :id, :unit, :active, :valid_from, :valid_to, :derivation_path,
  577. :amounts, :input_fee_ppk, :derivation_path_index
  578. )
  579. ON CONFLICT(id) DO UPDATE SET
  580. unit = excluded.unit,
  581. active = excluded.active,
  582. valid_from = excluded.valid_from,
  583. valid_to = excluded.valid_to,
  584. derivation_path = excluded.derivation_path,
  585. amounts = excluded.amounts,
  586. input_fee_ppk = excluded.input_fee_ppk,
  587. derivation_path_index = excluded.derivation_path_index
  588. "#,
  589. )?
  590. .bind("id", keyset.id.to_string())
  591. .bind("unit", keyset.unit.to_string())
  592. .bind("active", keyset.active)
  593. .bind("valid_from", keyset.valid_from as i64)
  594. .bind("valid_to", keyset.final_expiry.map(|v| v as i64))
  595. .bind("derivation_path", keyset.derivation_path.to_string())
  596. .bind("amounts", serde_json::to_string(&keyset.amounts).ok())
  597. .bind("input_fee_ppk", keyset.input_fee_ppk as i64)
  598. .bind("derivation_path_index", keyset.derivation_path_index)
  599. .execute(&self.inner)
  600. .await?;
  601. Ok(())
  602. }
  603. async fn set_active_keyset(&mut self, unit: CurrencyUnit, id: Id) -> Result<(), Error> {
  604. query(r#"UPDATE keyset SET active=FALSE WHERE unit = :unit"#)?
  605. .bind("unit", unit.to_string())
  606. .execute(&self.inner)
  607. .await?;
  608. query(r#"UPDATE keyset SET active=TRUE WHERE unit = :unit AND id = :id"#)?
  609. .bind("unit", unit.to_string())
  610. .bind("id", id.to_string())
  611. .execute(&self.inner)
  612. .await?;
  613. Ok(())
  614. }
  615. }
  616. #[async_trait]
  617. impl<RM> MintKeysDatabase for SQLMintDatabase<RM>
  618. where
  619. RM: DatabasePool + 'static,
  620. {
  621. type Err = Error;
  622. async fn begin_transaction<'a>(
  623. &'a self,
  624. ) -> Result<Box<dyn MintKeyDatabaseTransaction<'a, Error> + Send + Sync + 'a>, Error> {
  625. let tx = SQLTransaction {
  626. inner: ConnectionWithTransaction::new(
  627. self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
  628. )
  629. .await?,
  630. };
  631. Ok(Box::new(tx))
  632. }
  633. async fn get_active_keyset_id(&self, unit: &CurrencyUnit) -> Result<Option<Id>, Self::Err> {
  634. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  635. Ok(
  636. query(r#" SELECT id FROM keyset WHERE active = :active AND unit = :unit"#)?
  637. .bind("active", true)
  638. .bind("unit", unit.to_string())
  639. .pluck(&*conn)
  640. .await?
  641. .map(|id| match id {
  642. Column::Text(text) => Ok(Id::from_str(&text)?),
  643. Column::Blob(id) => Ok(Id::from_bytes(&id)?),
  644. _ => Err(Error::InvalidKeysetId),
  645. })
  646. .transpose()?,
  647. )
  648. }
  649. async fn get_active_keysets(&self) -> Result<HashMap<CurrencyUnit, Id>, Self::Err> {
  650. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  651. Ok(
  652. query(r#"SELECT id, unit FROM keyset WHERE active = :active"#)?
  653. .bind("active", true)
  654. .fetch_all(&*conn)
  655. .await?
  656. .into_iter()
  657. .map(|row| {
  658. Ok((
  659. column_as_string!(&row[1], CurrencyUnit::from_str),
  660. column_as_string!(&row[0], Id::from_str, Id::from_bytes),
  661. ))
  662. })
  663. .collect::<Result<HashMap<_, _>, Error>>()?,
  664. )
  665. }
  666. async fn get_keyset_info(&self, id: &Id) -> Result<Option<MintKeySetInfo>, Self::Err> {
  667. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  668. Ok(query(
  669. r#"SELECT
  670. id,
  671. unit,
  672. active,
  673. valid_from,
  674. valid_to,
  675. derivation_path,
  676. derivation_path_index,
  677. amounts,
  678. input_fee_ppk
  679. FROM
  680. keyset
  681. WHERE id=:id"#,
  682. )?
  683. .bind("id", id.to_string())
  684. .fetch_one(&*conn)
  685. .await?
  686. .map(sql_row_to_keyset_info)
  687. .transpose()?)
  688. }
  689. async fn get_keyset_infos(&self) -> Result<Vec<MintKeySetInfo>, Self::Err> {
  690. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  691. Ok(query(
  692. r#"SELECT
  693. id,
  694. unit,
  695. active,
  696. valid_from,
  697. valid_to,
  698. derivation_path,
  699. derivation_path_index,
  700. amounts,
  701. input_fee_ppk
  702. FROM
  703. keyset
  704. "#,
  705. )?
  706. .fetch_all(&*conn)
  707. .await?
  708. .into_iter()
  709. .map(sql_row_to_keyset_info)
  710. .collect::<Result<Vec<_>, _>>()?)
  711. }
  712. }
  713. #[async_trait]
  714. impl<RM> MintQuotesTransaction<'_> for SQLTransaction<RM>
  715. where
  716. RM: DatabasePool + 'static,
  717. {
  718. type Err = Error;
  719. async fn add_melt_request(
  720. &mut self,
  721. quote_id: &QuoteId,
  722. inputs_amount: Amount,
  723. inputs_fee: Amount,
  724. ) -> Result<(), Self::Err> {
  725. // Insert melt_request
  726. query(
  727. r#"
  728. INSERT INTO melt_request
  729. (quote_id, inputs_amount, inputs_fee)
  730. VALUES
  731. (:quote_id, :inputs_amount, :inputs_fee)
  732. "#,
  733. )?
  734. .bind("quote_id", quote_id.to_string())
  735. .bind("inputs_amount", inputs_amount.to_i64())
  736. .bind("inputs_fee", inputs_fee.to_i64())
  737. .execute(&self.inner)
  738. .await?;
  739. Ok(())
  740. }
  741. async fn add_blinded_messages(
  742. &mut self,
  743. quote_id: Option<&QuoteId>,
  744. blinded_messages: &[BlindedMessage],
  745. operation: &Operation,
  746. ) -> Result<(), Self::Err> {
  747. let current_time = unix_time();
  748. // Insert blinded_messages directly into blind_signature with c = NULL
  749. // Let the database constraint handle duplicate detection
  750. for message in blinded_messages {
  751. match query(
  752. r#"
  753. INSERT INTO blind_signature
  754. (blinded_message, amount, keyset_id, c, quote_id, created_time, operation_kind, operation_id)
  755. VALUES
  756. (:blinded_message, :amount, :keyset_id, NULL, :quote_id, :created_time, :operation_kind, :operation_id)
  757. "#,
  758. )?
  759. .bind(
  760. "blinded_message",
  761. message.blinded_secret.to_bytes().to_vec(),
  762. )
  763. .bind("amount", message.amount.to_i64())
  764. .bind("keyset_id", message.keyset_id.to_string())
  765. .bind("quote_id", quote_id.map(|q| q.to_string()))
  766. .bind("created_time", current_time as i64)
  767. .bind("operation_kind", operation.kind())
  768. .bind("operation_id", operation.id().to_string())
  769. .execute(&self.inner)
  770. .await
  771. {
  772. Ok(_) => continue,
  773. Err(database::Error::Duplicate) => {
  774. // Primary key constraint violation - blinded message already exists
  775. // This could be either:
  776. // 1. Already signed (c IS NOT NULL) - definitely an error
  777. // 2. Already pending (c IS NULL) - also an error
  778. return Err(database::Error::Duplicate);
  779. }
  780. Err(err) => return Err(err),
  781. }
  782. }
  783. Ok(())
  784. }
  785. async fn delete_blinded_messages(
  786. &mut self,
  787. blinded_secrets: &[PublicKey],
  788. ) -> Result<(), Self::Err> {
  789. if blinded_secrets.is_empty() {
  790. return Ok(());
  791. }
  792. // Delete blinded messages from blind_signature table where c IS NULL
  793. // (only delete unsigned blinded messages)
  794. query(
  795. r#"
  796. DELETE FROM blind_signature
  797. WHERE blinded_message IN (:blinded_secrets) AND c IS NULL
  798. "#,
  799. )?
  800. .bind_vec(
  801. "blinded_secrets",
  802. blinded_secrets
  803. .iter()
  804. .map(|secret| secret.to_bytes().to_vec())
  805. .collect(),
  806. )
  807. .execute(&self.inner)
  808. .await?;
  809. Ok(())
  810. }
  811. async fn get_melt_request_and_blinded_messages(
  812. &mut self,
  813. quote_id: &QuoteId,
  814. ) -> Result<Option<database::mint::MeltRequestInfo>, Self::Err> {
  815. let melt_request_row = query(
  816. r#"
  817. SELECT inputs_amount, inputs_fee
  818. FROM melt_request
  819. WHERE quote_id = :quote_id
  820. FOR UPDATE
  821. "#,
  822. )?
  823. .bind("quote_id", quote_id.to_string())
  824. .fetch_one(&self.inner)
  825. .await?;
  826. if let Some(row) = melt_request_row {
  827. let inputs_amount: u64 = column_as_number!(row[0].clone());
  828. let inputs_fee: u64 = column_as_number!(row[1].clone());
  829. // Get blinded messages from blind_signature table where c IS NULL
  830. let blinded_messages_rows = query(
  831. r#"
  832. SELECT blinded_message, keyset_id, amount
  833. FROM blind_signature
  834. WHERE quote_id = :quote_id AND c IS NULL
  835. "#,
  836. )?
  837. .bind("quote_id", quote_id.to_string())
  838. .fetch_all(&self.inner)
  839. .await?;
  840. let blinded_messages: Result<Vec<BlindedMessage>, Error> = blinded_messages_rows
  841. .into_iter()
  842. .map(|row| -> Result<BlindedMessage, Error> {
  843. let blinded_message_key =
  844. column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice);
  845. let keyset_id = column_as_string!(&row[1], Id::from_str, Id::from_bytes);
  846. let amount: u64 = column_as_number!(row[2].clone());
  847. Ok(BlindedMessage {
  848. blinded_secret: blinded_message_key,
  849. keyset_id,
  850. amount: Amount::from(amount),
  851. witness: None, // Not storing witness in database currently
  852. })
  853. })
  854. .collect();
  855. let blinded_messages = blinded_messages?;
  856. Ok(Some(database::mint::MeltRequestInfo {
  857. inputs_amount: Amount::from(inputs_amount),
  858. inputs_fee: Amount::from(inputs_fee),
  859. change_outputs: blinded_messages,
  860. }))
  861. } else {
  862. Ok(None)
  863. }
  864. }
  865. async fn delete_melt_request(&mut self, quote_id: &QuoteId) -> Result<(), Self::Err> {
  866. // Delete from melt_request table
  867. query(
  868. r#"
  869. DELETE FROM melt_request
  870. WHERE quote_id = :quote_id
  871. "#,
  872. )?
  873. .bind("quote_id", quote_id.to_string())
  874. .execute(&self.inner)
  875. .await?;
  876. // Also delete blinded messages (where c IS NULL) from blind_signature table
  877. query(
  878. r#"
  879. DELETE FROM blind_signature
  880. WHERE quote_id = :quote_id AND c IS NULL
  881. "#,
  882. )?
  883. .bind("quote_id", quote_id.to_string())
  884. .execute(&self.inner)
  885. .await?;
  886. Ok(())
  887. }
  888. #[instrument(skip(self))]
  889. async fn increment_mint_quote_amount_paid(
  890. &mut self,
  891. quote_id: &QuoteId,
  892. amount_paid: Amount,
  893. payment_id: String,
  894. ) -> Result<Amount, Self::Err> {
  895. if amount_paid == Amount::ZERO {
  896. tracing::warn!("Amount payments of zero amount should not be recorded.");
  897. return Err(Error::Duplicate);
  898. }
  899. // Check if payment_id already exists in mint_quote_payments
  900. let exists = query(
  901. r#"
  902. SELECT payment_id
  903. FROM mint_quote_payments
  904. WHERE payment_id = :payment_id
  905. FOR UPDATE
  906. "#,
  907. )?
  908. .bind("payment_id", payment_id.clone())
  909. .fetch_one(&self.inner)
  910. .await?;
  911. if exists.is_some() {
  912. tracing::error!("Payment ID already exists: {}", payment_id);
  913. return Err(database::Error::Duplicate);
  914. }
  915. // Get current amount_paid from quote
  916. let current_amount = query(
  917. r#"
  918. SELECT amount_paid
  919. FROM mint_quote
  920. WHERE id = :quote_id
  921. FOR UPDATE
  922. "#,
  923. )?
  924. .bind("quote_id", quote_id.to_string())
  925. .fetch_one(&self.inner)
  926. .await
  927. .inspect_err(|err| {
  928. tracing::error!("SQLite could not get mint quote amount_paid: {}", err);
  929. })?;
  930. let current_amount_paid = if let Some(current_amount) = current_amount {
  931. let amount: u64 = column_as_number!(current_amount[0].clone());
  932. Amount::from(amount)
  933. } else {
  934. Amount::ZERO
  935. };
  936. // Calculate new amount_paid with overflow check
  937. let new_amount_paid = current_amount_paid
  938. .checked_add(amount_paid)
  939. .ok_or_else(|| database::Error::AmountOverflow)?;
  940. tracing::debug!(
  941. "Mint quote {} amount paid was {} is now {}.",
  942. quote_id,
  943. current_amount_paid,
  944. new_amount_paid
  945. );
  946. // Update the amount_paid
  947. query(
  948. r#"
  949. UPDATE mint_quote
  950. SET amount_paid = :amount_paid
  951. WHERE id = :quote_id
  952. "#,
  953. )?
  954. .bind("amount_paid", new_amount_paid.to_i64())
  955. .bind("quote_id", quote_id.to_string())
  956. .execute(&self.inner)
  957. .await
  958. .inspect_err(|err| {
  959. tracing::error!("SQLite could not update mint quote amount_paid: {}", err);
  960. })?;
  961. // Add payment_id to mint_quote_payments table
  962. query(
  963. r#"
  964. INSERT INTO mint_quote_payments
  965. (quote_id, payment_id, amount, timestamp)
  966. VALUES (:quote_id, :payment_id, :amount, :timestamp)
  967. "#,
  968. )?
  969. .bind("quote_id", quote_id.to_string())
  970. .bind("payment_id", payment_id)
  971. .bind("amount", amount_paid.to_i64())
  972. .bind("timestamp", unix_time() as i64)
  973. .execute(&self.inner)
  974. .await
  975. .map_err(|err| {
  976. tracing::error!("SQLite could not insert payment ID: {}", err);
  977. err
  978. })?;
  979. Ok(new_amount_paid)
  980. }
  981. #[instrument(skip_all)]
  982. async fn increment_mint_quote_amount_issued(
  983. &mut self,
  984. quote_id: &QuoteId,
  985. amount_issued: Amount,
  986. ) -> Result<Amount, Self::Err> {
  987. // Get current amount_issued from quote
  988. let current_amounts = query(
  989. r#"
  990. SELECT amount_issued, amount_paid
  991. FROM mint_quote
  992. WHERE id = :quote_id
  993. FOR UPDATE
  994. "#,
  995. )?
  996. .bind("quote_id", quote_id.to_string())
  997. .fetch_one(&self.inner)
  998. .await
  999. .inspect_err(|err| {
  1000. tracing::error!("SQLite could not get mint quote amount_issued: {}", err);
  1001. })?
  1002. .ok_or(Error::QuoteNotFound)?;
  1003. let new_amount_issued = {
  1004. // Make sure the db protects issuing not paid quotes
  1005. unpack_into!(
  1006. let (current_amount_issued, current_amount_paid) = current_amounts
  1007. );
  1008. let current_amount_issued: u64 = column_as_number!(current_amount_issued);
  1009. let current_amount_paid: u64 = column_as_number!(current_amount_paid);
  1010. let current_amount_issued = Amount::from(current_amount_issued);
  1011. let current_amount_paid = Amount::from(current_amount_paid);
  1012. // Calculate new amount_issued with overflow check
  1013. let new_amount_issued = current_amount_issued
  1014. .checked_add(amount_issued)
  1015. .ok_or_else(|| database::Error::AmountOverflow)?;
  1016. current_amount_paid
  1017. .checked_sub(new_amount_issued)
  1018. .ok_or(Error::Internal("Over-issued not allowed".to_owned()))?;
  1019. new_amount_issued
  1020. };
  1021. // Update the amount_issued
  1022. query(
  1023. r#"
  1024. UPDATE mint_quote
  1025. SET amount_issued = :amount_issued
  1026. WHERE id = :quote_id
  1027. "#,
  1028. )?
  1029. .bind("amount_issued", new_amount_issued.to_i64())
  1030. .bind("quote_id", quote_id.to_string())
  1031. .execute(&self.inner)
  1032. .await
  1033. .inspect_err(|err| {
  1034. tracing::error!("SQLite could not update mint quote amount_issued: {}", err);
  1035. })?;
  1036. let current_time = unix_time();
  1037. query(
  1038. r#"
  1039. INSERT INTO mint_quote_issued
  1040. (quote_id, amount, timestamp)
  1041. VALUES (:quote_id, :amount, :timestamp);
  1042. "#,
  1043. )?
  1044. .bind("quote_id", quote_id.to_string())
  1045. .bind("amount", amount_issued.to_i64())
  1046. .bind("timestamp", current_time as i64)
  1047. .execute(&self.inner)
  1048. .await?;
  1049. Ok(new_amount_issued)
  1050. }
  1051. #[instrument(skip_all)]
  1052. async fn add_mint_quote(&mut self, quote: MintQuote) -> Result<(), Self::Err> {
  1053. query(
  1054. r#"
  1055. INSERT INTO mint_quote (
  1056. id, amount, unit, request, expiry, request_lookup_id, pubkey, created_time, payment_method, request_lookup_id_kind
  1057. )
  1058. VALUES (
  1059. :id, :amount, :unit, :request, :expiry, :request_lookup_id, :pubkey, :created_time, :payment_method, :request_lookup_id_kind
  1060. )
  1061. "#,
  1062. )?
  1063. .bind("id", quote.id.to_string())
  1064. .bind("amount", quote.amount.map(|a| a.to_i64()))
  1065. .bind("unit", quote.unit.to_string())
  1066. .bind("request", quote.request)
  1067. .bind("expiry", quote.expiry as i64)
  1068. .bind(
  1069. "request_lookup_id",
  1070. quote.request_lookup_id.to_string(),
  1071. )
  1072. .bind("pubkey", quote.pubkey.map(|p| p.to_string()))
  1073. .bind("created_time", quote.created_time as i64)
  1074. .bind("payment_method", quote.payment_method.to_string())
  1075. .bind("request_lookup_id_kind", quote.request_lookup_id.kind())
  1076. .execute(&self.inner)
  1077. .await?;
  1078. Ok(())
  1079. }
  1080. async fn add_melt_quote(&mut self, quote: mint::MeltQuote) -> Result<(), Self::Err> {
  1081. // Now insert the new quote
  1082. query(
  1083. r#"
  1084. INSERT INTO melt_quote
  1085. (
  1086. id, unit, amount, request, fee_reserve, state,
  1087. expiry, payment_preimage, request_lookup_id,
  1088. created_time, paid_time, options, request_lookup_id_kind, payment_method
  1089. )
  1090. VALUES
  1091. (
  1092. :id, :unit, :amount, :request, :fee_reserve, :state,
  1093. :expiry, :payment_preimage, :request_lookup_id,
  1094. :created_time, :paid_time, :options, :request_lookup_id_kind, :payment_method
  1095. )
  1096. "#,
  1097. )?
  1098. .bind("id", quote.id.to_string())
  1099. .bind("unit", quote.unit.to_string())
  1100. .bind("amount", quote.amount.to_i64())
  1101. .bind("request", serde_json::to_string(&quote.request)?)
  1102. .bind("fee_reserve", quote.fee_reserve.to_i64())
  1103. .bind("state", quote.state.to_string())
  1104. .bind("expiry", quote.expiry as i64)
  1105. .bind("payment_preimage", quote.payment_preimage)
  1106. .bind(
  1107. "request_lookup_id",
  1108. quote.request_lookup_id.as_ref().map(|id| id.to_string()),
  1109. )
  1110. .bind("created_time", quote.created_time as i64)
  1111. .bind("paid_time", quote.paid_time.map(|t| t as i64))
  1112. .bind(
  1113. "options",
  1114. quote.options.map(|o| serde_json::to_string(&o).ok()),
  1115. )
  1116. .bind(
  1117. "request_lookup_id_kind",
  1118. quote.request_lookup_id.map(|id| id.kind()),
  1119. )
  1120. .bind("payment_method", quote.payment_method.to_string())
  1121. .execute(&self.inner)
  1122. .await?;
  1123. Ok(())
  1124. }
  1125. async fn update_melt_quote_request_lookup_id(
  1126. &mut self,
  1127. quote_id: &QuoteId,
  1128. new_request_lookup_id: &PaymentIdentifier,
  1129. ) -> Result<(), Self::Err> {
  1130. query(r#"UPDATE melt_quote SET request_lookup_id = :new_req_id, request_lookup_id_kind = :new_kind WHERE id = :id"#)?
  1131. .bind("new_req_id", new_request_lookup_id.to_string())
  1132. .bind("new_kind",new_request_lookup_id.kind() )
  1133. .bind("id", quote_id.to_string())
  1134. .execute(&self.inner)
  1135. .await?;
  1136. Ok(())
  1137. }
  1138. async fn update_melt_quote_state(
  1139. &mut self,
  1140. quote_id: &QuoteId,
  1141. state: MeltQuoteState,
  1142. payment_proof: Option<String>,
  1143. ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err> {
  1144. let mut quote = query(
  1145. r#"
  1146. SELECT
  1147. id,
  1148. unit,
  1149. amount,
  1150. request,
  1151. fee_reserve,
  1152. expiry,
  1153. state,
  1154. payment_preimage,
  1155. request_lookup_id,
  1156. created_time,
  1157. paid_time,
  1158. payment_method,
  1159. options,
  1160. request_lookup_id_kind
  1161. FROM
  1162. melt_quote
  1163. WHERE
  1164. id=:id
  1165. "#,
  1166. )?
  1167. .bind("id", quote_id.to_string())
  1168. .fetch_one(&self.inner)
  1169. .await?
  1170. .map(sql_row_to_melt_quote)
  1171. .transpose()?
  1172. .ok_or(Error::QuoteNotFound)?;
  1173. check_melt_quote_state_transition(quote.state, state)?;
  1174. // When transitioning to Pending, lock all quotes with the same lookup_id
  1175. // and check if any are already pending or paid
  1176. if state == MeltQuoteState::Pending {
  1177. if let Some(ref lookup_id) = quote.request_lookup_id {
  1178. // Lock all quotes with the same lookup_id to prevent race conditions
  1179. let locked_quotes: Vec<(String, String)> = query(
  1180. r#"
  1181. SELECT id, state
  1182. FROM melt_quote
  1183. WHERE request_lookup_id = :lookup_id
  1184. FOR UPDATE
  1185. "#,
  1186. )?
  1187. .bind("lookup_id", lookup_id.to_string())
  1188. .fetch_all(&self.inner)
  1189. .await?
  1190. .into_iter()
  1191. .map(|row| {
  1192. unpack_into!(let (id, state) = row);
  1193. Ok((column_as_string!(id), column_as_string!(state)))
  1194. })
  1195. .collect::<Result<Vec<_>, Error>>()?;
  1196. // Check if any other quote with the same lookup_id is pending or paid
  1197. let has_conflict = locked_quotes.iter().any(|(id, state)| {
  1198. id != &quote_id.to_string()
  1199. && (state == &MeltQuoteState::Pending.to_string()
  1200. || state == &MeltQuoteState::Paid.to_string())
  1201. });
  1202. if has_conflict {
  1203. tracing::warn!(
  1204. "Cannot transition quote {} to Pending: another quote with lookup_id {} is already pending or paid",
  1205. quote_id,
  1206. lookup_id
  1207. );
  1208. return Err(Error::Duplicate);
  1209. }
  1210. }
  1211. }
  1212. let rec = if state == MeltQuoteState::Paid {
  1213. let current_time = unix_time();
  1214. query(r#"UPDATE melt_quote SET state = :state, paid_time = :paid_time, payment_preimage = :payment_preimage WHERE id = :id"#)?
  1215. .bind("state", state.to_string())
  1216. .bind("paid_time", current_time as i64)
  1217. .bind("payment_preimage", payment_proof)
  1218. .bind("id", quote_id.to_string())
  1219. .execute(&self.inner)
  1220. .await
  1221. } else {
  1222. query(r#"UPDATE melt_quote SET state = :state WHERE id = :id"#)?
  1223. .bind("state", state.to_string())
  1224. .bind("id", quote_id.to_string())
  1225. .execute(&self.inner)
  1226. .await
  1227. };
  1228. match rec {
  1229. Ok(_) => {}
  1230. Err(err) => {
  1231. tracing::error!("SQLite Could not update melt quote");
  1232. return Err(err);
  1233. }
  1234. };
  1235. let old_state = quote.state;
  1236. quote.state = state;
  1237. if state == MeltQuoteState::Unpaid || state == MeltQuoteState::Failed {
  1238. self.delete_melt_request(quote_id).await?;
  1239. }
  1240. Ok((old_state, quote))
  1241. }
  1242. async fn get_mint_quote(&mut self, quote_id: &QuoteId) -> Result<Option<MintQuote>, Self::Err> {
  1243. get_mint_quote_inner(&self.inner, quote_id, true).await
  1244. }
  1245. async fn get_melt_quote(
  1246. &mut self,
  1247. quote_id: &QuoteId,
  1248. ) -> Result<Option<mint::MeltQuote>, Self::Err> {
  1249. get_melt_quote_inner(&self.inner, quote_id, true).await
  1250. }
  1251. async fn get_mint_quote_by_request(
  1252. &mut self,
  1253. request: &str,
  1254. ) -> Result<Option<MintQuote>, Self::Err> {
  1255. get_mint_quote_by_request_inner(&self.inner, request, true).await
  1256. }
  1257. async fn get_mint_quote_by_request_lookup_id(
  1258. &mut self,
  1259. request_lookup_id: &PaymentIdentifier,
  1260. ) -> Result<Option<MintQuote>, Self::Err> {
  1261. get_mint_quote_by_request_lookup_id_inner(&self.inner, request_lookup_id, true).await
  1262. }
  1263. }
  1264. #[async_trait]
  1265. impl<RM> MintQuotesDatabase for SQLMintDatabase<RM>
  1266. where
  1267. RM: DatabasePool + 'static,
  1268. {
  1269. type Err = Error;
  1270. async fn get_mint_quote(&self, quote_id: &QuoteId) -> Result<Option<MintQuote>, Self::Err> {
  1271. #[cfg(feature = "prometheus")]
  1272. METRICS.inc_in_flight_requests("get_mint_quote");
  1273. #[cfg(feature = "prometheus")]
  1274. let start_time = std::time::Instant::now();
  1275. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1276. let result = get_mint_quote_inner(&*conn, quote_id, false).await;
  1277. #[cfg(feature = "prometheus")]
  1278. {
  1279. let success = result.is_ok();
  1280. METRICS.record_mint_operation("get_mint_quote", success);
  1281. METRICS.record_mint_operation_histogram(
  1282. "get_mint_quote",
  1283. success,
  1284. start_time.elapsed().as_secs_f64(),
  1285. );
  1286. METRICS.dec_in_flight_requests("get_mint_quote");
  1287. }
  1288. result
  1289. }
  1290. async fn get_mint_quote_by_request(
  1291. &self,
  1292. request: &str,
  1293. ) -> Result<Option<MintQuote>, Self::Err> {
  1294. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1295. get_mint_quote_by_request_inner(&*conn, request, false).await
  1296. }
  1297. async fn get_mint_quote_by_request_lookup_id(
  1298. &self,
  1299. request_lookup_id: &PaymentIdentifier,
  1300. ) -> Result<Option<MintQuote>, Self::Err> {
  1301. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1302. get_mint_quote_by_request_lookup_id_inner(&*conn, request_lookup_id, false).await
  1303. }
  1304. async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, Self::Err> {
  1305. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1306. let mut mint_quotes = query(
  1307. r#"
  1308. SELECT
  1309. id,
  1310. amount,
  1311. unit,
  1312. request,
  1313. expiry,
  1314. request_lookup_id,
  1315. pubkey,
  1316. created_time,
  1317. amount_paid,
  1318. amount_issued,
  1319. payment_method,
  1320. request_lookup_id_kind
  1321. FROM
  1322. mint_quote
  1323. "#,
  1324. )?
  1325. .fetch_all(&*conn)
  1326. .await?
  1327. .into_iter()
  1328. .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
  1329. .collect::<Result<Vec<_>, _>>()?;
  1330. for quote in mint_quotes.as_mut_slice() {
  1331. let payments = get_mint_quote_payments(&*conn, &quote.id).await?;
  1332. let issuance = get_mint_quote_issuance(&*conn, &quote.id).await?;
  1333. quote.issuance = issuance;
  1334. quote.payments = payments;
  1335. }
  1336. Ok(mint_quotes)
  1337. }
  1338. async fn get_melt_quote(
  1339. &self,
  1340. quote_id: &QuoteId,
  1341. ) -> Result<Option<mint::MeltQuote>, Self::Err> {
  1342. #[cfg(feature = "prometheus")]
  1343. METRICS.inc_in_flight_requests("get_melt_quote");
  1344. #[cfg(feature = "prometheus")]
  1345. let start_time = std::time::Instant::now();
  1346. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1347. let result = get_melt_quote_inner(&*conn, quote_id, false).await;
  1348. #[cfg(feature = "prometheus")]
  1349. {
  1350. let success = result.is_ok();
  1351. METRICS.record_mint_operation("get_melt_quote", success);
  1352. METRICS.record_mint_operation_histogram(
  1353. "get_melt_quote",
  1354. success,
  1355. start_time.elapsed().as_secs_f64(),
  1356. );
  1357. METRICS.dec_in_flight_requests("get_melt_quote");
  1358. }
  1359. result
  1360. }
  1361. async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err> {
  1362. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1363. Ok(query(
  1364. r#"
  1365. SELECT
  1366. id,
  1367. unit,
  1368. amount,
  1369. request,
  1370. fee_reserve,
  1371. expiry,
  1372. state,
  1373. payment_preimage,
  1374. request_lookup_id,
  1375. created_time,
  1376. paid_time,
  1377. payment_method,
  1378. options,
  1379. request_lookup_id_kind
  1380. FROM
  1381. melt_quote
  1382. "#,
  1383. )?
  1384. .fetch_all(&*conn)
  1385. .await?
  1386. .into_iter()
  1387. .map(sql_row_to_melt_quote)
  1388. .collect::<Result<Vec<_>, _>>()?)
  1389. }
  1390. }
  1391. #[async_trait]
  1392. impl<RM> MintProofsDatabase for SQLMintDatabase<RM>
  1393. where
  1394. RM: DatabasePool + 'static,
  1395. {
  1396. type Err = Error;
  1397. async fn get_proofs_by_ys(&self, ys: &[PublicKey]) -> Result<Vec<Option<Proof>>, Self::Err> {
  1398. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1399. let mut proofs = query(
  1400. r#"
  1401. SELECT
  1402. amount,
  1403. keyset_id,
  1404. secret,
  1405. c,
  1406. witness,
  1407. y
  1408. FROM
  1409. proof
  1410. WHERE
  1411. y IN (:ys)
  1412. "#,
  1413. )?
  1414. .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
  1415. .fetch_all(&*conn)
  1416. .await?
  1417. .into_iter()
  1418. .map(|mut row| {
  1419. Ok((
  1420. column_as_string!(
  1421. row.pop().ok_or(Error::InvalidDbResponse)?,
  1422. PublicKey::from_hex,
  1423. PublicKey::from_slice
  1424. ),
  1425. sql_row_to_proof(row)?,
  1426. ))
  1427. })
  1428. .collect::<Result<HashMap<_, _>, Error>>()?;
  1429. Ok(ys.iter().map(|y| proofs.remove(y)).collect())
  1430. }
  1431. async fn get_proof_ys_by_quote_id(
  1432. &self,
  1433. quote_id: &QuoteId,
  1434. ) -> Result<Vec<PublicKey>, Self::Err> {
  1435. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1436. Ok(query(
  1437. r#"
  1438. SELECT
  1439. amount,
  1440. keyset_id,
  1441. secret,
  1442. c,
  1443. witness
  1444. FROM
  1445. proof
  1446. WHERE
  1447. quote_id = :quote_id
  1448. "#,
  1449. )?
  1450. .bind("quote_id", quote_id.to_string())
  1451. .fetch_all(&*conn)
  1452. .await?
  1453. .into_iter()
  1454. .map(sql_row_to_proof)
  1455. .collect::<Result<Vec<Proof>, _>>()?
  1456. .ys()?)
  1457. }
  1458. async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
  1459. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1460. let mut current_states = get_current_states(&*conn, ys).await?;
  1461. Ok(ys.iter().map(|y| current_states.remove(y)).collect())
  1462. }
  1463. async fn get_proofs_by_keyset_id(
  1464. &self,
  1465. keyset_id: &Id,
  1466. ) -> Result<(Proofs, Vec<Option<State>>), Self::Err> {
  1467. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1468. Ok(query(
  1469. r#"
  1470. SELECT
  1471. keyset_id,
  1472. amount,
  1473. secret,
  1474. c,
  1475. witness,
  1476. state
  1477. FROM
  1478. proof
  1479. WHERE
  1480. keyset_id=:keyset_id
  1481. "#,
  1482. )?
  1483. .bind("keyset_id", keyset_id.to_string())
  1484. .fetch_all(&*conn)
  1485. .await?
  1486. .into_iter()
  1487. .map(sql_row_to_proof_with_state)
  1488. .collect::<Result<Vec<_>, _>>()?
  1489. .into_iter()
  1490. .unzip())
  1491. }
  1492. /// Get total proofs redeemed by keyset id
  1493. async fn get_total_redeemed(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
  1494. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1495. query(
  1496. r#"
  1497. SELECT
  1498. keyset_id,
  1499. total_redeemed as amount
  1500. FROM
  1501. keyset_amounts
  1502. "#,
  1503. )?
  1504. .fetch_all(&*conn)
  1505. .await?
  1506. .into_iter()
  1507. .map(sql_row_to_hashmap_amount)
  1508. .collect()
  1509. }
  1510. }
  1511. #[async_trait]
  1512. impl<RM> MintSignatureTransaction<'_> for SQLTransaction<RM>
  1513. where
  1514. RM: DatabasePool + 'static,
  1515. {
  1516. type Err = Error;
  1517. async fn add_blind_signatures(
  1518. &mut self,
  1519. blinded_messages: &[PublicKey],
  1520. blind_signatures: &[BlindSignature],
  1521. quote_id: Option<QuoteId>,
  1522. ) -> Result<(), Self::Err> {
  1523. let current_time = unix_time();
  1524. if blinded_messages.len() != blind_signatures.len() {
  1525. return Err(database::Error::Internal(
  1526. "Mismatched array lengths for blinded messages and blind signatures".to_string(),
  1527. ));
  1528. }
  1529. // Select all existing rows for the given blinded messages at once
  1530. let mut existing_rows = query(
  1531. r#"
  1532. SELECT blinded_message, c, dleq_e, dleq_s
  1533. FROM blind_signature
  1534. WHERE blinded_message IN (:blinded_messages)
  1535. FOR UPDATE
  1536. "#,
  1537. )?
  1538. .bind_vec(
  1539. "blinded_messages",
  1540. blinded_messages
  1541. .iter()
  1542. .map(|message| message.to_bytes().to_vec())
  1543. .collect(),
  1544. )
  1545. .fetch_all(&self.inner)
  1546. .await?
  1547. .into_iter()
  1548. .map(|mut row| {
  1549. Ok((
  1550. column_as_string!(&row.remove(0), PublicKey::from_hex, PublicKey::from_slice),
  1551. (row[0].clone(), row[1].clone(), row[2].clone()),
  1552. ))
  1553. })
  1554. .collect::<Result<HashMap<_, _>, Error>>()?;
  1555. // Iterate over the provided blinded messages and signatures
  1556. for (message, signature) in blinded_messages.iter().zip(blind_signatures) {
  1557. match existing_rows.remove(message) {
  1558. None => {
  1559. // Unknown blind message: Insert new row with all columns
  1560. query(
  1561. r#"
  1562. INSERT INTO blind_signature
  1563. (blinded_message, amount, keyset_id, c, quote_id, dleq_e, dleq_s, created_time, signed_time)
  1564. VALUES
  1565. (:blinded_message, :amount, :keyset_id, :c, :quote_id, :dleq_e, :dleq_s, :created_time, :signed_time)
  1566. "#,
  1567. )?
  1568. .bind("blinded_message", message.to_bytes().to_vec())
  1569. .bind("amount", u64::from(signature.amount) as i64)
  1570. .bind("keyset_id", signature.keyset_id.to_string())
  1571. .bind("c", signature.c.to_bytes().to_vec())
  1572. .bind("quote_id", quote_id.as_ref().map(|q| q.to_string()))
  1573. .bind(
  1574. "dleq_e",
  1575. signature.dleq.as_ref().map(|dleq| dleq.e.to_secret_hex()),
  1576. )
  1577. .bind(
  1578. "dleq_s",
  1579. signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
  1580. )
  1581. .bind("created_time", current_time as i64)
  1582. .bind("signed_time", current_time as i64)
  1583. .execute(&self.inner)
  1584. .await?;
  1585. query(
  1586. r#"
  1587. INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
  1588. VALUES (:keyset_id, :amount, 0)
  1589. ON CONFLICT (keyset_id)
  1590. DO UPDATE SET total_issued = keyset_amounts.total_issued + EXCLUDED.total_issued
  1591. "#,
  1592. )?
  1593. .bind("amount", u64::from(signature.amount) as i64)
  1594. .bind("keyset_id", signature.keyset_id.to_string())
  1595. .execute(&self.inner)
  1596. .await?;
  1597. }
  1598. Some((c, _dleq_e, _dleq_s)) => {
  1599. // Blind message exists: check if c is NULL
  1600. match c {
  1601. Column::Null => {
  1602. // Blind message with no c: Update with missing columns c, dleq_e, dleq_s
  1603. query(
  1604. r#"
  1605. UPDATE blind_signature
  1606. SET c = :c, dleq_e = :dleq_e, dleq_s = :dleq_s, signed_time = :signed_time, amount = :amount
  1607. WHERE blinded_message = :blinded_message
  1608. "#,
  1609. )?
  1610. .bind("c", signature.c.to_bytes().to_vec())
  1611. .bind(
  1612. "dleq_e",
  1613. signature.dleq.as_ref().map(|dleq| dleq.e.to_secret_hex()),
  1614. )
  1615. .bind(
  1616. "dleq_s",
  1617. signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
  1618. )
  1619. .bind("blinded_message", message.to_bytes().to_vec())
  1620. .bind("signed_time", current_time as i64)
  1621. .bind("amount", u64::from(signature.amount) as i64)
  1622. .execute(&self.inner)
  1623. .await?;
  1624. query(
  1625. r#"
  1626. INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
  1627. VALUES (:keyset_id, :amount, 0)
  1628. ON CONFLICT (keyset_id)
  1629. DO UPDATE SET total_issued = keyset_amounts.total_issued + EXCLUDED.total_issued
  1630. "#,
  1631. )?
  1632. .bind("amount", u64::from(signature.amount) as i64)
  1633. .bind("keyset_id", signature.keyset_id.to_string())
  1634. .execute(&self.inner)
  1635. .await?;
  1636. }
  1637. _ => {
  1638. // Blind message already has c: Error
  1639. tracing::error!(
  1640. "Attempting to add signature to message already signed {}",
  1641. message
  1642. );
  1643. return Err(database::Error::Duplicate);
  1644. }
  1645. }
  1646. }
  1647. }
  1648. }
  1649. debug_assert!(
  1650. existing_rows.is_empty(),
  1651. "Unexpected existing rows remain: {:?}",
  1652. existing_rows.keys().collect::<Vec<_>>()
  1653. );
  1654. if !existing_rows.is_empty() {
  1655. tracing::error!("Did not check all existing rows");
  1656. return Err(Error::Internal(
  1657. "Did not check all existing rows".to_string(),
  1658. ));
  1659. }
  1660. Ok(())
  1661. }
  1662. async fn get_blind_signatures(
  1663. &mut self,
  1664. blinded_messages: &[PublicKey],
  1665. ) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
  1666. let mut blinded_signatures = query(
  1667. r#"SELECT
  1668. keyset_id,
  1669. amount,
  1670. c,
  1671. dleq_e,
  1672. dleq_s,
  1673. blinded_message
  1674. FROM
  1675. blind_signature
  1676. WHERE blinded_message IN (:b) AND c IS NOT NULL
  1677. "#,
  1678. )?
  1679. .bind_vec(
  1680. "b",
  1681. blinded_messages
  1682. .iter()
  1683. .map(|b| b.to_bytes().to_vec())
  1684. .collect(),
  1685. )
  1686. .fetch_all(&self.inner)
  1687. .await?
  1688. .into_iter()
  1689. .map(|mut row| {
  1690. Ok((
  1691. column_as_string!(
  1692. &row.pop().ok_or(Error::InvalidDbResponse)?,
  1693. PublicKey::from_hex,
  1694. PublicKey::from_slice
  1695. ),
  1696. sql_row_to_blind_signature(row)?,
  1697. ))
  1698. })
  1699. .collect::<Result<HashMap<_, _>, Error>>()?;
  1700. Ok(blinded_messages
  1701. .iter()
  1702. .map(|y| blinded_signatures.remove(y))
  1703. .collect())
  1704. }
  1705. }
  1706. #[async_trait]
  1707. impl<RM> MintSignaturesDatabase for SQLMintDatabase<RM>
  1708. where
  1709. RM: DatabasePool + 'static,
  1710. {
  1711. type Err = Error;
  1712. async fn get_blind_signatures(
  1713. &self,
  1714. blinded_messages: &[PublicKey],
  1715. ) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
  1716. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1717. let mut blinded_signatures = query(
  1718. r#"SELECT
  1719. keyset_id,
  1720. amount,
  1721. c,
  1722. dleq_e,
  1723. dleq_s,
  1724. blinded_message
  1725. FROM
  1726. blind_signature
  1727. WHERE blinded_message IN (:b) AND c IS NOT NULL
  1728. "#,
  1729. )?
  1730. .bind_vec(
  1731. "b",
  1732. blinded_messages
  1733. .iter()
  1734. .map(|b_| b_.to_bytes().to_vec())
  1735. .collect(),
  1736. )
  1737. .fetch_all(&*conn)
  1738. .await?
  1739. .into_iter()
  1740. .map(|mut row| {
  1741. Ok((
  1742. column_as_string!(
  1743. &row.pop().ok_or(Error::InvalidDbResponse)?,
  1744. PublicKey::from_hex,
  1745. PublicKey::from_slice
  1746. ),
  1747. sql_row_to_blind_signature(row)?,
  1748. ))
  1749. })
  1750. .collect::<Result<HashMap<_, _>, Error>>()?;
  1751. Ok(blinded_messages
  1752. .iter()
  1753. .map(|y| blinded_signatures.remove(y))
  1754. .collect())
  1755. }
  1756. async fn get_blind_signatures_for_keyset(
  1757. &self,
  1758. keyset_id: &Id,
  1759. ) -> Result<Vec<BlindSignature>, Self::Err> {
  1760. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1761. Ok(query(
  1762. r#"
  1763. SELECT
  1764. keyset_id,
  1765. amount,
  1766. c,
  1767. dleq_e,
  1768. dleq_s
  1769. FROM
  1770. blind_signature
  1771. WHERE
  1772. keyset_id=:keyset_id AND c IS NOT NULL
  1773. "#,
  1774. )?
  1775. .bind("keyset_id", keyset_id.to_string())
  1776. .fetch_all(&*conn)
  1777. .await?
  1778. .into_iter()
  1779. .map(sql_row_to_blind_signature)
  1780. .collect::<Result<Vec<BlindSignature>, _>>()?)
  1781. }
  1782. /// Get [`BlindSignature`]s for quote
  1783. async fn get_blind_signatures_for_quote(
  1784. &self,
  1785. quote_id: &QuoteId,
  1786. ) -> Result<Vec<BlindSignature>, Self::Err> {
  1787. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1788. Ok(query(
  1789. r#"
  1790. SELECT
  1791. keyset_id,
  1792. amount,
  1793. c,
  1794. dleq_e,
  1795. dleq_s
  1796. FROM
  1797. blind_signature
  1798. WHERE
  1799. quote_id=:quote_id AND c IS NOT NULL
  1800. "#,
  1801. )?
  1802. .bind("quote_id", quote_id.to_string())
  1803. .fetch_all(&*conn)
  1804. .await?
  1805. .into_iter()
  1806. .map(sql_row_to_blind_signature)
  1807. .collect::<Result<Vec<BlindSignature>, _>>()?)
  1808. }
  1809. /// Get total proofs redeemed by keyset id
  1810. async fn get_total_issued(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
  1811. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1812. query(
  1813. r#"
  1814. SELECT
  1815. keyset_id,
  1816. total_issued as amount
  1817. FROM
  1818. keyset_amounts
  1819. "#,
  1820. )?
  1821. .fetch_all(&*conn)
  1822. .await?
  1823. .into_iter()
  1824. .map(sql_row_to_hashmap_amount)
  1825. .collect()
  1826. }
  1827. }
  1828. #[async_trait]
  1829. impl<RM> database::MintKVStoreTransaction<'_, Error> for SQLTransaction<RM>
  1830. where
  1831. RM: DatabasePool + 'static,
  1832. {
  1833. async fn kv_read(
  1834. &mut self,
  1835. primary_namespace: &str,
  1836. secondary_namespace: &str,
  1837. key: &str,
  1838. ) -> Result<Option<Vec<u8>>, Error> {
  1839. // Validate parameters according to KV store requirements
  1840. validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
  1841. Ok(query(
  1842. r#"
  1843. SELECT value
  1844. FROM kv_store
  1845. WHERE primary_namespace = :primary_namespace
  1846. AND secondary_namespace = :secondary_namespace
  1847. AND key = :key
  1848. "#,
  1849. )?
  1850. .bind("primary_namespace", primary_namespace.to_owned())
  1851. .bind("secondary_namespace", secondary_namespace.to_owned())
  1852. .bind("key", key.to_owned())
  1853. .pluck(&self.inner)
  1854. .await?
  1855. .and_then(|col| match col {
  1856. Column::Blob(data) => Some(data),
  1857. _ => None,
  1858. }))
  1859. }
  1860. async fn kv_write(
  1861. &mut self,
  1862. primary_namespace: &str,
  1863. secondary_namespace: &str,
  1864. key: &str,
  1865. value: &[u8],
  1866. ) -> Result<(), Error> {
  1867. // Validate parameters according to KV store requirements
  1868. validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
  1869. let current_time = unix_time();
  1870. query(
  1871. r#"
  1872. INSERT INTO kv_store
  1873. (primary_namespace, secondary_namespace, key, value, created_time, updated_time)
  1874. VALUES (:primary_namespace, :secondary_namespace, :key, :value, :created_time, :updated_time)
  1875. ON CONFLICT(primary_namespace, secondary_namespace, key)
  1876. DO UPDATE SET
  1877. value = excluded.value,
  1878. updated_time = excluded.updated_time
  1879. "#,
  1880. )?
  1881. .bind("primary_namespace", primary_namespace.to_owned())
  1882. .bind("secondary_namespace", secondary_namespace.to_owned())
  1883. .bind("key", key.to_owned())
  1884. .bind("value", value.to_vec())
  1885. .bind("created_time", current_time as i64)
  1886. .bind("updated_time", current_time as i64)
  1887. .execute(&self.inner)
  1888. .await?;
  1889. Ok(())
  1890. }
  1891. async fn kv_remove(
  1892. &mut self,
  1893. primary_namespace: &str,
  1894. secondary_namespace: &str,
  1895. key: &str,
  1896. ) -> Result<(), Error> {
  1897. // Validate parameters according to KV store requirements
  1898. validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
  1899. query(
  1900. r#"
  1901. DELETE FROM kv_store
  1902. WHERE primary_namespace = :primary_namespace
  1903. AND secondary_namespace = :secondary_namespace
  1904. AND key = :key
  1905. "#,
  1906. )?
  1907. .bind("primary_namespace", primary_namespace.to_owned())
  1908. .bind("secondary_namespace", secondary_namespace.to_owned())
  1909. .bind("key", key.to_owned())
  1910. .execute(&self.inner)
  1911. .await?;
  1912. Ok(())
  1913. }
  1914. async fn kv_list(
  1915. &mut self,
  1916. primary_namespace: &str,
  1917. secondary_namespace: &str,
  1918. ) -> Result<Vec<String>, Error> {
  1919. // Validate namespace parameters according to KV store requirements
  1920. cdk_common::database::mint::validate_kvstore_string(primary_namespace)?;
  1921. cdk_common::database::mint::validate_kvstore_string(secondary_namespace)?;
  1922. // Check empty namespace rules
  1923. if primary_namespace.is_empty() && !secondary_namespace.is_empty() {
  1924. return Err(Error::KVStoreInvalidKey(
  1925. "If primary_namespace is empty, secondary_namespace must also be empty".to_string(),
  1926. ));
  1927. }
  1928. Ok(query(
  1929. r#"
  1930. SELECT key
  1931. FROM kv_store
  1932. WHERE primary_namespace = :primary_namespace
  1933. AND secondary_namespace = :secondary_namespace
  1934. ORDER BY key
  1935. "#,
  1936. )?
  1937. .bind("primary_namespace", primary_namespace.to_owned())
  1938. .bind("secondary_namespace", secondary_namespace.to_owned())
  1939. .fetch_all(&self.inner)
  1940. .await?
  1941. .into_iter()
  1942. .map(|row| Ok(column_as_string!(&row[0])))
  1943. .collect::<Result<Vec<_>, Error>>()?)
  1944. }
  1945. }
  1946. #[async_trait]
  1947. impl<RM> database::MintKVStoreDatabase for SQLMintDatabase<RM>
  1948. where
  1949. RM: DatabasePool + 'static,
  1950. {
  1951. type Err = Error;
  1952. async fn kv_read(
  1953. &self,
  1954. primary_namespace: &str,
  1955. secondary_namespace: &str,
  1956. key: &str,
  1957. ) -> Result<Option<Vec<u8>>, Error> {
  1958. // Validate parameters according to KV store requirements
  1959. validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
  1960. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1961. Ok(query(
  1962. r#"
  1963. SELECT value
  1964. FROM kv_store
  1965. WHERE primary_namespace = :primary_namespace
  1966. AND secondary_namespace = :secondary_namespace
  1967. AND key = :key
  1968. "#,
  1969. )?
  1970. .bind("primary_namespace", primary_namespace.to_owned())
  1971. .bind("secondary_namespace", secondary_namespace.to_owned())
  1972. .bind("key", key.to_owned())
  1973. .pluck(&*conn)
  1974. .await?
  1975. .and_then(|col| match col {
  1976. Column::Blob(data) => Some(data),
  1977. _ => None,
  1978. }))
  1979. }
  1980. async fn kv_list(
  1981. &self,
  1982. primary_namespace: &str,
  1983. secondary_namespace: &str,
  1984. ) -> Result<Vec<String>, Error> {
  1985. // Validate namespace parameters according to KV store requirements
  1986. cdk_common::database::mint::validate_kvstore_string(primary_namespace)?;
  1987. cdk_common::database::mint::validate_kvstore_string(secondary_namespace)?;
  1988. // Check empty namespace rules
  1989. if primary_namespace.is_empty() && !secondary_namespace.is_empty() {
  1990. return Err(Error::KVStoreInvalidKey(
  1991. "If primary_namespace is empty, secondary_namespace must also be empty".to_string(),
  1992. ));
  1993. }
  1994. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1995. Ok(query(
  1996. r#"
  1997. SELECT key
  1998. FROM kv_store
  1999. WHERE primary_namespace = :primary_namespace
  2000. AND secondary_namespace = :secondary_namespace
  2001. ORDER BY key
  2002. "#,
  2003. )?
  2004. .bind("primary_namespace", primary_namespace.to_owned())
  2005. .bind("secondary_namespace", secondary_namespace.to_owned())
  2006. .fetch_all(&*conn)
  2007. .await?
  2008. .into_iter()
  2009. .map(|row| Ok(column_as_string!(&row[0])))
  2010. .collect::<Result<Vec<_>, Error>>()?)
  2011. }
  2012. }
  2013. #[async_trait]
  2014. impl<RM> database::MintKVStore for SQLMintDatabase<RM>
  2015. where
  2016. RM: DatabasePool + 'static,
  2017. {
  2018. async fn begin_transaction<'a>(
  2019. &'a self,
  2020. ) -> Result<Box<dyn database::MintKVStoreTransaction<'a, Self::Err> + Send + Sync + 'a>, Error>
  2021. {
  2022. Ok(Box::new(SQLTransaction {
  2023. inner: ConnectionWithTransaction::new(
  2024. self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
  2025. )
  2026. .await?,
  2027. }))
  2028. }
  2029. }
  2030. #[async_trait]
  2031. impl<RM> SagaTransaction<'_> for SQLTransaction<RM>
  2032. where
  2033. RM: DatabasePool + 'static,
  2034. {
  2035. type Err = Error;
  2036. async fn get_saga(
  2037. &mut self,
  2038. operation_id: &uuid::Uuid,
  2039. ) -> Result<Option<mint::Saga>, Self::Err> {
  2040. Ok(query(
  2041. r#"
  2042. SELECT
  2043. operation_id,
  2044. operation_kind,
  2045. state,
  2046. blinded_secrets,
  2047. input_ys,
  2048. quote_id,
  2049. created_at,
  2050. updated_at
  2051. FROM
  2052. saga_state
  2053. WHERE
  2054. operation_id = :operation_id
  2055. FOR UPDATE
  2056. "#,
  2057. )?
  2058. .bind("operation_id", operation_id.to_string())
  2059. .fetch_one(&self.inner)
  2060. .await?
  2061. .map(sql_row_to_saga)
  2062. .transpose()?)
  2063. }
  2064. async fn add_saga(&mut self, saga: &mint::Saga) -> Result<(), Self::Err> {
  2065. let current_time = unix_time();
  2066. let blinded_secrets_json = serde_json::to_string(&saga.blinded_secrets)
  2067. .map_err(|e| Error::Internal(format!("Failed to serialize blinded_secrets: {e}")))?;
  2068. let input_ys_json = serde_json::to_string(&saga.input_ys)
  2069. .map_err(|e| Error::Internal(format!("Failed to serialize input_ys: {e}")))?;
  2070. query(
  2071. r#"
  2072. INSERT INTO saga_state
  2073. (operation_id, operation_kind, state, blinded_secrets, input_ys, quote_id, created_at, updated_at)
  2074. VALUES
  2075. (:operation_id, :operation_kind, :state, :blinded_secrets, :input_ys, :quote_id, :created_at, :updated_at)
  2076. "#,
  2077. )?
  2078. .bind("operation_id", saga.operation_id.to_string())
  2079. .bind("operation_kind", saga.operation_kind.to_string())
  2080. .bind("state", saga.state.state())
  2081. .bind("blinded_secrets", blinded_secrets_json)
  2082. .bind("input_ys", input_ys_json)
  2083. .bind("quote_id", saga.quote_id.as_deref())
  2084. .bind("created_at", saga.created_at as i64)
  2085. .bind("updated_at", current_time as i64)
  2086. .execute(&self.inner)
  2087. .await?;
  2088. Ok(())
  2089. }
  2090. async fn update_saga(
  2091. &mut self,
  2092. operation_id: &uuid::Uuid,
  2093. new_state: mint::SagaStateEnum,
  2094. ) -> Result<(), Self::Err> {
  2095. let current_time = unix_time();
  2096. query(
  2097. r#"
  2098. UPDATE saga_state
  2099. SET state = :state, updated_at = :updated_at
  2100. WHERE operation_id = :operation_id
  2101. "#,
  2102. )?
  2103. .bind("state", new_state.state())
  2104. .bind("updated_at", current_time as i64)
  2105. .bind("operation_id", operation_id.to_string())
  2106. .execute(&self.inner)
  2107. .await?;
  2108. Ok(())
  2109. }
  2110. async fn delete_saga(&mut self, operation_id: &uuid::Uuid) -> Result<(), Self::Err> {
  2111. query(
  2112. r#"
  2113. DELETE FROM saga_state
  2114. WHERE operation_id = :operation_id
  2115. "#,
  2116. )?
  2117. .bind("operation_id", operation_id.to_string())
  2118. .execute(&self.inner)
  2119. .await?;
  2120. Ok(())
  2121. }
  2122. }
  2123. #[async_trait]
  2124. impl<RM> SagaDatabase for SQLMintDatabase<RM>
  2125. where
  2126. RM: DatabasePool + 'static,
  2127. {
  2128. type Err = Error;
  2129. async fn get_incomplete_sagas(
  2130. &self,
  2131. operation_kind: mint::OperationKind,
  2132. ) -> Result<Vec<mint::Saga>, Self::Err> {
  2133. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  2134. Ok(query(
  2135. r#"
  2136. SELECT
  2137. operation_id,
  2138. operation_kind,
  2139. state,
  2140. blinded_secrets,
  2141. input_ys,
  2142. quote_id,
  2143. created_at,
  2144. updated_at
  2145. FROM
  2146. saga_state
  2147. WHERE
  2148. operation_kind = :operation_kind
  2149. ORDER BY created_at ASC
  2150. "#,
  2151. )?
  2152. .bind("operation_kind", operation_kind.to_string())
  2153. .fetch_all(&*conn)
  2154. .await?
  2155. .into_iter()
  2156. .map(sql_row_to_saga)
  2157. .collect::<Result<Vec<_>, _>>()?)
  2158. }
  2159. }
  2160. #[async_trait]
  2161. impl<RM> MintDatabase<Error> for SQLMintDatabase<RM>
  2162. where
  2163. RM: DatabasePool + 'static,
  2164. {
  2165. async fn begin_transaction<'a>(
  2166. &'a self,
  2167. ) -> Result<Box<dyn database::MintTransaction<'a, Error> + Send + Sync + 'a>, Error> {
  2168. let tx = SQLTransaction {
  2169. inner: ConnectionWithTransaction::new(
  2170. self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
  2171. )
  2172. .await?,
  2173. };
  2174. Ok(Box::new(tx))
  2175. }
  2176. }
  2177. fn sql_row_to_keyset_info(row: Vec<Column>) -> Result<MintKeySetInfo, Error> {
  2178. unpack_into!(
  2179. let (
  2180. id,
  2181. unit,
  2182. active,
  2183. valid_from,
  2184. valid_to,
  2185. derivation_path,
  2186. derivation_path_index,
  2187. amounts,
  2188. row_keyset_ppk
  2189. ) = row
  2190. );
  2191. let amounts = column_as_nullable_string!(amounts)
  2192. .and_then(|str| serde_json::from_str(&str).ok())
  2193. .ok_or_else(|| Error::Database("amounts field is required".to_string().into()))?;
  2194. Ok(MintKeySetInfo {
  2195. id: column_as_string!(id, Id::from_str, Id::from_bytes),
  2196. unit: column_as_string!(unit, CurrencyUnit::from_str),
  2197. active: matches!(active, Column::Integer(1)),
  2198. valid_from: column_as_number!(valid_from),
  2199. derivation_path: column_as_string!(derivation_path, DerivationPath::from_str),
  2200. derivation_path_index: column_as_nullable_number!(derivation_path_index),
  2201. amounts,
  2202. input_fee_ppk: column_as_number!(row_keyset_ppk),
  2203. final_expiry: column_as_nullable_number!(valid_to),
  2204. })
  2205. }
  2206. #[instrument(skip_all)]
  2207. fn sql_row_to_mint_quote(
  2208. row: Vec<Column>,
  2209. payments: Vec<IncomingPayment>,
  2210. issueances: Vec<Issuance>,
  2211. ) -> Result<MintQuote, Error> {
  2212. unpack_into!(
  2213. let (
  2214. id, amount, unit, request, expiry, request_lookup_id,
  2215. pubkey, created_time, amount_paid, amount_issued, payment_method, request_lookup_id_kind
  2216. ) = row
  2217. );
  2218. let request_str = column_as_string!(&request);
  2219. let request_lookup_id = column_as_nullable_string!(&request_lookup_id).unwrap_or_else(|| {
  2220. Bolt11Invoice::from_str(&request_str)
  2221. .map(|invoice| invoice.payment_hash().to_string())
  2222. .unwrap_or_else(|_| request_str.clone())
  2223. });
  2224. let request_lookup_id_kind = column_as_string!(request_lookup_id_kind);
  2225. let pubkey = column_as_nullable_string!(&pubkey)
  2226. .map(|pk| PublicKey::from_hex(&pk))
  2227. .transpose()?;
  2228. let id = column_as_string!(id);
  2229. let amount: Option<u64> = column_as_nullable_number!(amount);
  2230. let amount_paid: u64 = column_as_number!(amount_paid);
  2231. let amount_issued: u64 = column_as_number!(amount_issued);
  2232. let payment_method = column_as_string!(payment_method, PaymentMethod::from_str);
  2233. Ok(MintQuote::new(
  2234. Some(QuoteId::from_str(&id)?),
  2235. request_str,
  2236. column_as_string!(unit, CurrencyUnit::from_str),
  2237. amount.map(Amount::from),
  2238. column_as_number!(expiry),
  2239. PaymentIdentifier::new(&request_lookup_id_kind, &request_lookup_id)
  2240. .map_err(|_| ConversionError::MissingParameter("Payment id".to_string()))?,
  2241. pubkey,
  2242. amount_paid.into(),
  2243. amount_issued.into(),
  2244. payment_method,
  2245. column_as_number!(created_time),
  2246. payments,
  2247. issueances,
  2248. ))
  2249. }
  2250. fn sql_row_to_melt_quote(row: Vec<Column>) -> Result<mint::MeltQuote, Error> {
  2251. unpack_into!(
  2252. let (
  2253. id,
  2254. unit,
  2255. amount,
  2256. request,
  2257. fee_reserve,
  2258. expiry,
  2259. state,
  2260. payment_preimage,
  2261. request_lookup_id,
  2262. created_time,
  2263. paid_time,
  2264. payment_method,
  2265. options,
  2266. request_lookup_id_kind
  2267. ) = row
  2268. );
  2269. let id = column_as_string!(id);
  2270. let amount: u64 = column_as_number!(amount);
  2271. let fee_reserve: u64 = column_as_number!(fee_reserve);
  2272. let expiry = column_as_number!(expiry);
  2273. let payment_preimage = column_as_nullable_string!(payment_preimage);
  2274. let options = column_as_nullable_string!(options);
  2275. let options = options.and_then(|o| serde_json::from_str(&o).ok());
  2276. let created_time: i64 = column_as_number!(created_time);
  2277. let paid_time = column_as_nullable_number!(paid_time);
  2278. let payment_method = PaymentMethod::from_str(&column_as_string!(payment_method))?;
  2279. let state =
  2280. MeltQuoteState::from_str(&column_as_string!(&state)).map_err(ConversionError::from)?;
  2281. let unit = column_as_string!(unit);
  2282. let request = column_as_string!(request);
  2283. let request_lookup_id_kind = column_as_nullable_string!(request_lookup_id_kind);
  2284. let request_lookup_id = column_as_nullable_string!(&request_lookup_id).or_else(|| {
  2285. Bolt11Invoice::from_str(&request)
  2286. .ok()
  2287. .map(|invoice| invoice.payment_hash().to_string())
  2288. });
  2289. let request_lookup_id = if let (Some(id_kind), Some(request_lookup_id)) =
  2290. (request_lookup_id_kind, request_lookup_id)
  2291. {
  2292. Some(
  2293. PaymentIdentifier::new(&id_kind, &request_lookup_id)
  2294. .map_err(|_| ConversionError::MissingParameter("Payment id".to_string()))?,
  2295. )
  2296. } else {
  2297. None
  2298. };
  2299. let request = match serde_json::from_str(&request) {
  2300. Ok(req) => req,
  2301. Err(err) => {
  2302. tracing::debug!(
  2303. "Melt quote from pre migrations defaulting to bolt11 {}.",
  2304. err
  2305. );
  2306. let bolt11 = Bolt11Invoice::from_str(&request).unwrap();
  2307. MeltPaymentRequest::Bolt11 { bolt11 }
  2308. }
  2309. };
  2310. Ok(MeltQuote {
  2311. id: QuoteId::from_str(&id)?,
  2312. unit: CurrencyUnit::from_str(&unit)?,
  2313. amount: Amount::from(amount),
  2314. request,
  2315. fee_reserve: Amount::from(fee_reserve),
  2316. state,
  2317. expiry,
  2318. payment_preimage,
  2319. request_lookup_id,
  2320. options,
  2321. created_time: created_time as u64,
  2322. paid_time,
  2323. payment_method,
  2324. })
  2325. }
  2326. fn sql_row_to_proof(row: Vec<Column>) -> Result<Proof, Error> {
  2327. unpack_into!(
  2328. let (
  2329. amount,
  2330. keyset_id,
  2331. secret,
  2332. c,
  2333. witness
  2334. ) = row
  2335. );
  2336. let amount: u64 = column_as_number!(amount);
  2337. Ok(Proof {
  2338. amount: Amount::from(amount),
  2339. keyset_id: column_as_string!(keyset_id, Id::from_str),
  2340. secret: column_as_string!(secret, Secret::from_str),
  2341. c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
  2342. witness: column_as_nullable_string!(witness).and_then(|w| serde_json::from_str(&w).ok()),
  2343. dleq: None,
  2344. })
  2345. }
  2346. fn sql_row_to_hashmap_amount(row: Vec<Column>) -> Result<(Id, Amount), Error> {
  2347. unpack_into!(
  2348. let (
  2349. keyset_id, amount
  2350. ) = row
  2351. );
  2352. let amount: u64 = column_as_number!(amount);
  2353. Ok((
  2354. column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
  2355. Amount::from(amount),
  2356. ))
  2357. }
  2358. fn sql_row_to_proof_with_state(row: Vec<Column>) -> Result<(Proof, Option<State>), Error> {
  2359. unpack_into!(
  2360. let (
  2361. keyset_id, amount, secret, c, witness, state
  2362. ) = row
  2363. );
  2364. let amount: u64 = column_as_number!(amount);
  2365. let state = column_as_nullable_string!(state).and_then(|s| State::from_str(&s).ok());
  2366. Ok((
  2367. Proof {
  2368. amount: Amount::from(amount),
  2369. keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
  2370. secret: column_as_string!(secret, Secret::from_str),
  2371. c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
  2372. witness: column_as_nullable_string!(witness)
  2373. .and_then(|w| serde_json::from_str(&w).ok()),
  2374. dleq: None,
  2375. },
  2376. state,
  2377. ))
  2378. }
  2379. fn sql_row_to_blind_signature(row: Vec<Column>) -> Result<BlindSignature, Error> {
  2380. unpack_into!(
  2381. let (
  2382. keyset_id, amount, c, dleq_e, dleq_s
  2383. ) = row
  2384. );
  2385. let dleq = match (
  2386. column_as_nullable_string!(dleq_e),
  2387. column_as_nullable_string!(dleq_s),
  2388. ) {
  2389. (Some(e), Some(s)) => Some(BlindSignatureDleq {
  2390. e: SecretKey::from_hex(e)?,
  2391. s: SecretKey::from_hex(s)?,
  2392. }),
  2393. _ => None,
  2394. };
  2395. let amount: u64 = column_as_number!(amount);
  2396. Ok(BlindSignature {
  2397. amount: Amount::from(amount),
  2398. keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
  2399. c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
  2400. dleq,
  2401. })
  2402. }
  2403. fn sql_row_to_saga(row: Vec<Column>) -> Result<mint::Saga, Error> {
  2404. unpack_into!(
  2405. let (
  2406. operation_id,
  2407. operation_kind,
  2408. state,
  2409. blinded_secrets,
  2410. input_ys,
  2411. quote_id,
  2412. created_at,
  2413. updated_at
  2414. ) = row
  2415. );
  2416. let operation_id_str = column_as_string!(&operation_id);
  2417. let operation_id = uuid::Uuid::parse_str(&operation_id_str)
  2418. .map_err(|e| Error::Internal(format!("Invalid operation_id UUID: {e}")))?;
  2419. let operation_kind_str = column_as_string!(&operation_kind);
  2420. let operation_kind = mint::OperationKind::from_str(&operation_kind_str)
  2421. .map_err(|e| Error::Internal(format!("Invalid operation kind: {e}")))?;
  2422. let state_str = column_as_string!(&state);
  2423. let state = mint::SagaStateEnum::new(operation_kind, &state_str)
  2424. .map_err(|e| Error::Internal(format!("Invalid saga state: {e}")))?;
  2425. let blinded_secrets_str = column_as_string!(&blinded_secrets);
  2426. let blinded_secrets: Vec<PublicKey> = serde_json::from_str(&blinded_secrets_str)
  2427. .map_err(|e| Error::Internal(format!("Failed to deserialize blinded_secrets: {e}")))?;
  2428. let input_ys_str = column_as_string!(&input_ys);
  2429. let input_ys: Vec<PublicKey> = serde_json::from_str(&input_ys_str)
  2430. .map_err(|e| Error::Internal(format!("Failed to deserialize input_ys: {e}")))?;
  2431. let quote_id = match &quote_id {
  2432. Column::Text(s) => {
  2433. if s.is_empty() {
  2434. None
  2435. } else {
  2436. Some(s.clone())
  2437. }
  2438. }
  2439. Column::Null => None,
  2440. _ => None,
  2441. };
  2442. let created_at: u64 = column_as_number!(created_at);
  2443. let updated_at: u64 = column_as_number!(updated_at);
  2444. Ok(mint::Saga {
  2445. operation_id,
  2446. operation_kind,
  2447. state,
  2448. blinded_secrets,
  2449. input_ys,
  2450. quote_id,
  2451. created_at,
  2452. updated_at,
  2453. })
  2454. }
  2455. #[cfg(test)]
  2456. mod test {
  2457. use super::*;
  2458. mod keyset_amounts_tests {
  2459. use super::*;
  2460. #[test]
  2461. fn keyset_with_amounts() {
  2462. let amounts = (0..32).map(|x| 2u64.pow(x)).collect::<Vec<_>>();
  2463. let result = sql_row_to_keyset_info(vec![
  2464. Column::Text("0083a60439303340".to_owned()),
  2465. Column::Text("sat".to_owned()),
  2466. Column::Integer(1),
  2467. Column::Integer(1749844864),
  2468. Column::Null,
  2469. Column::Text("0'/0'/0'".to_owned()),
  2470. Column::Integer(0),
  2471. Column::Text(serde_json::to_string(&amounts).expect("valid json")),
  2472. Column::Integer(0),
  2473. ]);
  2474. assert!(result.is_ok());
  2475. let keyset = result.unwrap();
  2476. assert_eq!(keyset.amounts.len(), 32);
  2477. }
  2478. }
  2479. }