mod.rs 86 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799
  1. //! SQL database implementation of the Mint
  2. //!
  3. //! This is a generic SQL implementation for the mint storage layer. Any database can be plugged in
  4. //! as long as standard ANSI SQL is used, as Postgres and SQLite would understand it.
  5. //!
  6. //! This implementation also has a rudimentary but standard migration and versioning system.
  7. //!
  8. //! The trait expects an asynchronous interaction, but it also provides tools to spawn blocking
  9. //! clients in a pool and expose them to an asynchronous environment, making them compatible with
  10. //! Mint.
  11. use std::collections::HashMap;
  12. use std::fmt::Debug;
  13. use std::str::FromStr;
  14. use std::sync::Arc;
  15. use async_trait::async_trait;
  16. use bitcoin::bip32::DerivationPath;
  17. use cdk_common::database::mint::{
  18. CompletedOperationsDatabase, CompletedOperationsTransaction, SagaDatabase, SagaTransaction,
  19. };
  20. use cdk_common::database::{
  21. self, ConversionError, DbTransactionFinalizer, Error, MintDatabase, MintKeyDatabaseTransaction,
  22. MintKeysDatabase, MintProofsDatabase, MintQuotesDatabase, MintQuotesTransaction,
  23. MintSignatureTransaction, MintSignaturesDatabase,
  24. };
  25. use cdk_common::mint::{
  26. self, IncomingPayment, Issuance, MeltPaymentRequest, MeltQuote, MintKeySetInfo, MintQuote,
  27. Operation,
  28. };
  29. use cdk_common::nut00::ProofsMethods;
  30. use cdk_common::payment::PaymentIdentifier;
  31. use cdk_common::quote_id::QuoteId;
  32. use cdk_common::secret::Secret;
  33. use cdk_common::state::{check_melt_quote_state_transition, check_state_transition};
  34. use cdk_common::util::unix_time;
  35. use cdk_common::{
  36. Amount, BlindSignature, BlindSignatureDleq, BlindedMessage, CurrencyUnit, Id, MeltQuoteState,
  37. PaymentMethod, Proof, Proofs, PublicKey, SecretKey, State,
  38. };
  39. use lightning_invoice::Bolt11Invoice;
  40. use migrations::MIGRATIONS;
  41. use tracing::instrument;
  42. use crate::common::migrate;
  43. use crate::database::{ConnectionWithTransaction, DatabaseExecutor};
  44. use crate::pool::{DatabasePool, Pool, PooledResource};
  45. use crate::stmt::{query, Column};
  46. use crate::{
  47. column_as_nullable_number, column_as_nullable_string, column_as_number, column_as_string,
  48. unpack_into,
  49. };
  50. #[cfg(feature = "auth")]
  51. mod auth;
  52. #[rustfmt::skip]
  53. mod migrations {
  54. include!(concat!(env!("OUT_DIR"), "/migrations_mint.rs"));
  55. }
  56. #[cfg(feature = "auth")]
  57. pub use auth::SQLMintAuthDatabase;
  58. #[cfg(feature = "prometheus")]
  59. use cdk_prometheus::METRICS;
  60. /// Mint SQL Database
  61. #[derive(Debug, Clone)]
  62. pub struct SQLMintDatabase<RM>
  63. where
  64. RM: DatabasePool + 'static,
  65. {
  66. pool: Arc<Pool<RM>>,
  67. }
  68. /// SQL Transaction Writer
  69. pub struct SQLTransaction<RM>
  70. where
  71. RM: DatabasePool + 'static,
  72. {
  73. inner: ConnectionWithTransaction<RM::Connection, PooledResource<RM>>,
  74. }
  75. #[inline(always)]
  76. async fn get_current_states<C>(
  77. conn: &C,
  78. ys: &[PublicKey],
  79. ) -> Result<HashMap<PublicKey, State>, Error>
  80. where
  81. C: DatabaseExecutor + Send + Sync,
  82. {
  83. if ys.is_empty() {
  84. return Ok(Default::default());
  85. }
  86. query(r#"SELECT y, state FROM proof WHERE y IN (:ys)"#)?
  87. .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
  88. .fetch_all(conn)
  89. .await?
  90. .into_iter()
  91. .map(|row| {
  92. Ok((
  93. column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice),
  94. column_as_string!(&row[1], State::from_str),
  95. ))
  96. })
  97. .collect::<Result<HashMap<_, _>, _>>()
  98. }
  99. impl<RM> SQLMintDatabase<RM>
  100. where
  101. RM: DatabasePool + 'static,
  102. {
  103. /// Creates a new instance
  104. pub async fn new<X>(db: X) -> Result<Self, Error>
  105. where
  106. X: Into<RM::Config>,
  107. {
  108. let pool = Pool::new(db.into());
  109. Self::migrate(pool.get().map_err(|e| Error::Database(Box::new(e)))?).await?;
  110. Ok(Self { pool })
  111. }
  112. /// Migrate
  113. async fn migrate(conn: PooledResource<RM>) -> Result<(), Error> {
  114. let tx = ConnectionWithTransaction::new(conn).await?;
  115. migrate(&tx, RM::Connection::name(), MIGRATIONS).await?;
  116. tx.commit().await?;
  117. Ok(())
  118. }
  119. }
  120. #[async_trait]
  121. impl<RM> database::MintProofsTransaction<'_> for SQLTransaction<RM>
  122. where
  123. RM: DatabasePool + 'static,
  124. {
  125. type Err = Error;
  126. async fn add_proofs(
  127. &mut self,
  128. proofs: Proofs,
  129. quote_id: Option<QuoteId>,
  130. operation: &Operation,
  131. ) -> Result<(), Self::Err> {
  132. let current_time = unix_time();
  133. // Check any previous proof, this query should return None in order to proceed storing
  134. // Any result here would error
  135. match query(r#"SELECT state FROM proof WHERE y IN (:ys) LIMIT 1 FOR UPDATE"#)?
  136. .bind_vec(
  137. "ys",
  138. proofs
  139. .iter()
  140. .map(|y| y.y().map(|y| y.to_bytes().to_vec()))
  141. .collect::<Result<_, _>>()?,
  142. )
  143. .pluck(&self.inner)
  144. .await?
  145. .map(|state| Ok::<_, Error>(column_as_string!(&state, State::from_str)))
  146. .transpose()?
  147. {
  148. Some(State::Spent) => Err(database::Error::AttemptUpdateSpentProof),
  149. Some(_) => Err(database::Error::Duplicate),
  150. None => Ok(()), // no previous record
  151. }?;
  152. for proof in proofs {
  153. query(
  154. r#"
  155. INSERT INTO proof
  156. (y, amount, keyset_id, secret, c, witness, state, quote_id, created_time, operation_kind, operation_id)
  157. VALUES
  158. (:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time, :operation_kind, :operation_id)
  159. "#,
  160. )?
  161. .bind("y", proof.y()?.to_bytes().to_vec())
  162. .bind("amount", proof.amount.to_i64())
  163. .bind("keyset_id", proof.keyset_id.to_string())
  164. .bind("secret", proof.secret.to_string())
  165. .bind("c", proof.c.to_bytes().to_vec())
  166. .bind(
  167. "witness",
  168. proof.witness.and_then(|w| serde_json::to_string(&w).inspect_err(|e| tracing::error!("Failed to serialize witness: {:?}", e)).ok()),
  169. )
  170. .bind("state", "UNSPENT".to_string())
  171. .bind("quote_id", quote_id.clone().map(|q| q.to_string()))
  172. .bind("created_time", current_time as i64)
  173. .bind("operation_kind", operation.kind().to_string())
  174. .bind("operation_id", operation.id().to_string())
  175. .execute(&self.inner)
  176. .await?;
  177. }
  178. Ok(())
  179. }
  180. async fn update_proofs_states(
  181. &mut self,
  182. ys: &[PublicKey],
  183. new_state: State,
  184. ) -> Result<Vec<Option<State>>, Self::Err> {
  185. let mut current_states = get_current_states(&self.inner, ys).await?;
  186. if current_states.len() != ys.len() {
  187. tracing::warn!(
  188. "Attempted to update state of non-existent proof {} {}",
  189. current_states.len(),
  190. ys.len()
  191. );
  192. return Err(database::Error::ProofNotFound);
  193. }
  194. for state in current_states.values() {
  195. check_state_transition(*state, new_state)?;
  196. }
  197. query(r#"UPDATE proof SET state = :new_state WHERE y IN (:ys)"#)?
  198. .bind("new_state", new_state.to_string())
  199. .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
  200. .execute(&self.inner)
  201. .await?;
  202. if new_state == State::Spent {
  203. query(
  204. r#"
  205. INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
  206. SELECT keyset_id, 0, COALESCE(SUM(amount), 0)
  207. FROM proof
  208. WHERE y IN (:ys)
  209. GROUP BY keyset_id
  210. ON CONFLICT (keyset_id)
  211. DO UPDATE SET total_redeemed = keyset_amounts.total_redeemed + EXCLUDED.total_redeemed
  212. "#,
  213. )?
  214. .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
  215. .execute(&self.inner)
  216. .await?;
  217. }
  218. Ok(ys.iter().map(|y| current_states.remove(y)).collect())
  219. }
  220. async fn remove_proofs(
  221. &mut self,
  222. ys: &[PublicKey],
  223. _quote_id: Option<QuoteId>,
  224. ) -> Result<(), Self::Err> {
  225. if ys.is_empty() {
  226. return Ok(());
  227. }
  228. let total_deleted = query(
  229. r#"
  230. DELETE FROM proof WHERE y IN (:ys) AND state NOT IN (:exclude_state)
  231. "#,
  232. )?
  233. .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
  234. .bind_vec("exclude_state", vec![State::Spent.to_string()])
  235. .execute(&self.inner)
  236. .await?;
  237. if total_deleted != ys.len() {
  238. // Query current states to provide detailed logging
  239. let current_states = get_current_states(&self.inner, ys).await?;
  240. let missing_count = ys.len() - current_states.len();
  241. let spent_count = current_states
  242. .values()
  243. .filter(|s| **s == State::Spent)
  244. .count();
  245. if missing_count > 0 {
  246. tracing::warn!(
  247. "remove_proofs: {} of {} proofs do not exist in database (already removed?)",
  248. missing_count,
  249. ys.len()
  250. );
  251. }
  252. if spent_count > 0 {
  253. tracing::warn!(
  254. "remove_proofs: {} of {} proofs are in Spent state and cannot be removed",
  255. spent_count,
  256. ys.len()
  257. );
  258. }
  259. tracing::debug!(
  260. "remove_proofs details: requested={}, deleted={}, missing={}, spent={}",
  261. ys.len(),
  262. total_deleted,
  263. missing_count,
  264. spent_count
  265. );
  266. return Err(Self::Err::AttemptRemoveSpentProof);
  267. }
  268. Ok(())
  269. }
  270. async fn get_proof_ys_by_quote_id(
  271. &self,
  272. quote_id: &QuoteId,
  273. ) -> Result<Vec<PublicKey>, Self::Err> {
  274. Ok(query(
  275. r#"
  276. SELECT
  277. amount,
  278. keyset_id,
  279. secret,
  280. c,
  281. witness
  282. FROM
  283. proof
  284. WHERE
  285. quote_id = :quote_id
  286. "#,
  287. )?
  288. .bind("quote_id", quote_id.to_string())
  289. .fetch_all(&self.inner)
  290. .await?
  291. .into_iter()
  292. .map(sql_row_to_proof)
  293. .collect::<Result<Vec<Proof>, _>>()?
  294. .ys()?)
  295. }
  296. }
  297. #[async_trait]
  298. impl<RM> database::MintTransaction<'_, Error> for SQLTransaction<RM> where RM: DatabasePool + 'static
  299. {}
  300. #[async_trait]
  301. impl<RM> DbTransactionFinalizer for SQLTransaction<RM>
  302. where
  303. RM: DatabasePool + 'static,
  304. {
  305. type Err = Error;
  306. async fn commit(self: Box<Self>) -> Result<(), Error> {
  307. let result = self.inner.commit().await;
  308. #[cfg(feature = "prometheus")]
  309. {
  310. let success = result.is_ok();
  311. METRICS.record_mint_operation("transaction_commit", success);
  312. METRICS.record_mint_operation_histogram("transaction_commit", success, 1.0);
  313. }
  314. Ok(result?)
  315. }
  316. async fn rollback(self: Box<Self>) -> Result<(), Error> {
  317. let result = self.inner.rollback().await;
  318. #[cfg(feature = "prometheus")]
  319. {
  320. let success = result.is_ok();
  321. METRICS.record_mint_operation("transaction_rollback", success);
  322. METRICS.record_mint_operation_histogram("transaction_rollback", success, 1.0);
  323. }
  324. Ok(result?)
  325. }
  326. }
  327. #[inline(always)]
  328. async fn get_mint_quote_payments<C>(
  329. conn: &C,
  330. quote_id: &QuoteId,
  331. ) -> Result<Vec<IncomingPayment>, Error>
  332. where
  333. C: DatabaseExecutor + Send + Sync,
  334. {
  335. // Get payment IDs and timestamps from the mint_quote_payments table
  336. query(
  337. r#"
  338. SELECT
  339. payment_id,
  340. timestamp,
  341. amount
  342. FROM
  343. mint_quote_payments
  344. WHERE
  345. quote_id=:quote_id
  346. "#,
  347. )?
  348. .bind("quote_id", quote_id.to_string())
  349. .fetch_all(conn)
  350. .await?
  351. .into_iter()
  352. .map(|row| {
  353. let amount: u64 = column_as_number!(row[2].clone());
  354. let time: u64 = column_as_number!(row[1].clone());
  355. Ok(IncomingPayment::new(
  356. amount.into(),
  357. column_as_string!(&row[0]),
  358. time,
  359. ))
  360. })
  361. .collect()
  362. }
  363. #[inline(always)]
  364. async fn get_mint_quote_issuance<C>(conn: &C, quote_id: &QuoteId) -> Result<Vec<Issuance>, Error>
  365. where
  366. C: DatabaseExecutor + Send + Sync,
  367. {
  368. // Get payment IDs and timestamps from the mint_quote_payments table
  369. query(
  370. r#"
  371. SELECT amount, timestamp
  372. FROM mint_quote_issued
  373. WHERE quote_id=:quote_id
  374. "#,
  375. )?
  376. .bind("quote_id", quote_id.to_string())
  377. .fetch_all(conn)
  378. .await?
  379. .into_iter()
  380. .map(|row| {
  381. let time: u64 = column_as_number!(row[1].clone());
  382. Ok(Issuance::new(
  383. Amount::from_i64(column_as_number!(row[0].clone()))
  384. .expect("Is amount when put into db"),
  385. time,
  386. ))
  387. })
  388. .collect()
  389. }
  390. // Inline helper functions that work with both connections and transactions
  391. #[inline]
  392. async fn get_mint_quote_inner<T>(
  393. executor: &T,
  394. quote_id: &QuoteId,
  395. for_update: bool,
  396. ) -> Result<Option<MintQuote>, Error>
  397. where
  398. T: DatabaseExecutor,
  399. {
  400. let payments = get_mint_quote_payments(executor, quote_id).await?;
  401. let issuance = get_mint_quote_issuance(executor, quote_id).await?;
  402. let for_update_clause = if for_update { "FOR UPDATE" } else { "" };
  403. let query_str = format!(
  404. r#"
  405. SELECT
  406. id,
  407. amount,
  408. unit,
  409. request,
  410. expiry,
  411. request_lookup_id,
  412. pubkey,
  413. created_time,
  414. amount_paid,
  415. amount_issued,
  416. payment_method,
  417. request_lookup_id_kind
  418. FROM
  419. mint_quote
  420. WHERE id = :id
  421. {for_update_clause}
  422. "#
  423. );
  424. query(&query_str)?
  425. .bind("id", quote_id.to_string())
  426. .fetch_one(executor)
  427. .await?
  428. .map(|row| sql_row_to_mint_quote(row, payments, issuance))
  429. .transpose()
  430. }
  431. #[inline]
  432. async fn get_mint_quote_by_request_inner<T>(
  433. executor: &T,
  434. request: &str,
  435. for_update: bool,
  436. ) -> Result<Option<MintQuote>, Error>
  437. where
  438. T: DatabaseExecutor,
  439. {
  440. let for_update_clause = if for_update { "FOR UPDATE" } else { "" };
  441. let query_str = format!(
  442. r#"
  443. SELECT
  444. id,
  445. amount,
  446. unit,
  447. request,
  448. expiry,
  449. request_lookup_id,
  450. pubkey,
  451. created_time,
  452. amount_paid,
  453. amount_issued,
  454. payment_method,
  455. request_lookup_id_kind
  456. FROM
  457. mint_quote
  458. WHERE request = :request
  459. {for_update_clause}
  460. "#
  461. );
  462. let mut mint_quote = query(&query_str)?
  463. .bind("request", request.to_string())
  464. .fetch_one(executor)
  465. .await?
  466. .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
  467. .transpose()?;
  468. if let Some(quote) = mint_quote.as_mut() {
  469. let payments = get_mint_quote_payments(executor, &quote.id).await?;
  470. let issuance = get_mint_quote_issuance(executor, &quote.id).await?;
  471. quote.issuance = issuance;
  472. quote.payments = payments;
  473. }
  474. Ok(mint_quote)
  475. }
  476. #[inline]
  477. async fn get_mint_quote_by_request_lookup_id_inner<T>(
  478. executor: &T,
  479. request_lookup_id: &PaymentIdentifier,
  480. for_update: bool,
  481. ) -> Result<Option<MintQuote>, Error>
  482. where
  483. T: DatabaseExecutor,
  484. {
  485. let for_update_clause = if for_update { "FOR UPDATE" } else { "" };
  486. let query_str = format!(
  487. r#"
  488. SELECT
  489. id,
  490. amount,
  491. unit,
  492. request,
  493. expiry,
  494. request_lookup_id,
  495. pubkey,
  496. created_time,
  497. amount_paid,
  498. amount_issued,
  499. payment_method,
  500. request_lookup_id_kind
  501. FROM
  502. mint_quote
  503. WHERE request_lookup_id = :request_lookup_id
  504. AND request_lookup_id_kind = :request_lookup_id_kind
  505. {for_update_clause}
  506. "#
  507. );
  508. let mut mint_quote = query(&query_str)?
  509. .bind("request_lookup_id", request_lookup_id.to_string())
  510. .bind("request_lookup_id_kind", request_lookup_id.kind())
  511. .fetch_one(executor)
  512. .await?
  513. .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
  514. .transpose()?;
  515. if let Some(quote) = mint_quote.as_mut() {
  516. let payments = get_mint_quote_payments(executor, &quote.id).await?;
  517. let issuance = get_mint_quote_issuance(executor, &quote.id).await?;
  518. quote.issuance = issuance;
  519. quote.payments = payments;
  520. }
  521. Ok(mint_quote)
  522. }
  523. #[inline]
  524. async fn get_melt_quote_inner<T>(
  525. executor: &T,
  526. quote_id: &QuoteId,
  527. for_update: bool,
  528. ) -> Result<Option<mint::MeltQuote>, Error>
  529. where
  530. T: DatabaseExecutor,
  531. {
  532. let for_update_clause = if for_update { "FOR UPDATE" } else { "" };
  533. let query_str = format!(
  534. r#"
  535. SELECT
  536. id,
  537. unit,
  538. amount,
  539. request,
  540. fee_reserve,
  541. expiry,
  542. state,
  543. payment_preimage,
  544. request_lookup_id,
  545. created_time,
  546. paid_time,
  547. payment_method,
  548. options,
  549. request_lookup_id_kind
  550. FROM
  551. melt_quote
  552. WHERE
  553. id=:id
  554. {for_update_clause}
  555. "#
  556. );
  557. query(&query_str)?
  558. .bind("id", quote_id.to_string())
  559. .fetch_one(executor)
  560. .await?
  561. .map(sql_row_to_melt_quote)
  562. .transpose()
  563. }
  564. #[async_trait]
  565. impl<RM> MintKeyDatabaseTransaction<'_, Error> for SQLTransaction<RM>
  566. where
  567. RM: DatabasePool + 'static,
  568. {
  569. async fn add_keyset_info(&mut self, keyset: MintKeySetInfo) -> Result<(), Error> {
  570. query(
  571. r#"
  572. INSERT INTO
  573. keyset (
  574. id, unit, active, valid_from, valid_to, derivation_path,
  575. amounts, input_fee_ppk, derivation_path_index
  576. )
  577. VALUES (
  578. :id, :unit, :active, :valid_from, :valid_to, :derivation_path,
  579. :amounts, :input_fee_ppk, :derivation_path_index
  580. )
  581. ON CONFLICT(id) DO UPDATE SET
  582. unit = excluded.unit,
  583. active = excluded.active,
  584. valid_from = excluded.valid_from,
  585. valid_to = excluded.valid_to,
  586. derivation_path = excluded.derivation_path,
  587. amounts = excluded.amounts,
  588. input_fee_ppk = excluded.input_fee_ppk,
  589. derivation_path_index = excluded.derivation_path_index
  590. "#,
  591. )?
  592. .bind("id", keyset.id.to_string())
  593. .bind("unit", keyset.unit.to_string())
  594. .bind("active", keyset.active)
  595. .bind("valid_from", keyset.valid_from as i64)
  596. .bind("valid_to", keyset.final_expiry.map(|v| v as i64))
  597. .bind("derivation_path", keyset.derivation_path.to_string())
  598. .bind("amounts", serde_json::to_string(&keyset.amounts).ok())
  599. .bind("input_fee_ppk", keyset.input_fee_ppk as i64)
  600. .bind("derivation_path_index", keyset.derivation_path_index)
  601. .execute(&self.inner)
  602. .await?;
  603. Ok(())
  604. }
  605. async fn set_active_keyset(&mut self, unit: CurrencyUnit, id: Id) -> Result<(), Error> {
  606. query(r#"UPDATE keyset SET active=FALSE WHERE unit = :unit"#)?
  607. .bind("unit", unit.to_string())
  608. .execute(&self.inner)
  609. .await?;
  610. query(r#"UPDATE keyset SET active=TRUE WHERE unit = :unit AND id = :id"#)?
  611. .bind("unit", unit.to_string())
  612. .bind("id", id.to_string())
  613. .execute(&self.inner)
  614. .await?;
  615. Ok(())
  616. }
  617. }
  618. #[async_trait]
  619. impl<RM> MintKeysDatabase for SQLMintDatabase<RM>
  620. where
  621. RM: DatabasePool + 'static,
  622. {
  623. type Err = Error;
  624. async fn begin_transaction<'a>(
  625. &'a self,
  626. ) -> Result<Box<dyn MintKeyDatabaseTransaction<'a, Error> + Send + Sync + 'a>, Error> {
  627. let tx = SQLTransaction {
  628. inner: ConnectionWithTransaction::new(
  629. self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
  630. )
  631. .await?,
  632. };
  633. Ok(Box::new(tx))
  634. }
  635. async fn get_active_keyset_id(&self, unit: &CurrencyUnit) -> Result<Option<Id>, Self::Err> {
  636. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  637. Ok(
  638. query(r#" SELECT id FROM keyset WHERE active = :active AND unit = :unit"#)?
  639. .bind("active", true)
  640. .bind("unit", unit.to_string())
  641. .pluck(&*conn)
  642. .await?
  643. .map(|id| match id {
  644. Column::Text(text) => Ok(Id::from_str(&text)?),
  645. Column::Blob(id) => Ok(Id::from_bytes(&id)?),
  646. _ => Err(Error::InvalidKeysetId),
  647. })
  648. .transpose()?,
  649. )
  650. }
  651. async fn get_active_keysets(&self) -> Result<HashMap<CurrencyUnit, Id>, Self::Err> {
  652. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  653. Ok(
  654. query(r#"SELECT id, unit FROM keyset WHERE active = :active"#)?
  655. .bind("active", true)
  656. .fetch_all(&*conn)
  657. .await?
  658. .into_iter()
  659. .map(|row| {
  660. Ok((
  661. column_as_string!(&row[1], CurrencyUnit::from_str),
  662. column_as_string!(&row[0], Id::from_str, Id::from_bytes),
  663. ))
  664. })
  665. .collect::<Result<HashMap<_, _>, Error>>()?,
  666. )
  667. }
  668. async fn get_keyset_info(&self, id: &Id) -> Result<Option<MintKeySetInfo>, Self::Err> {
  669. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  670. Ok(query(
  671. r#"SELECT
  672. id,
  673. unit,
  674. active,
  675. valid_from,
  676. valid_to,
  677. derivation_path,
  678. derivation_path_index,
  679. amounts,
  680. input_fee_ppk
  681. FROM
  682. keyset
  683. WHERE id=:id"#,
  684. )?
  685. .bind("id", id.to_string())
  686. .fetch_one(&*conn)
  687. .await?
  688. .map(sql_row_to_keyset_info)
  689. .transpose()?)
  690. }
  691. async fn get_keyset_infos(&self) -> Result<Vec<MintKeySetInfo>, Self::Err> {
  692. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  693. Ok(query(
  694. r#"SELECT
  695. id,
  696. unit,
  697. active,
  698. valid_from,
  699. valid_to,
  700. derivation_path,
  701. derivation_path_index,
  702. amounts,
  703. input_fee_ppk
  704. FROM
  705. keyset
  706. "#,
  707. )?
  708. .fetch_all(&*conn)
  709. .await?
  710. .into_iter()
  711. .map(sql_row_to_keyset_info)
  712. .collect::<Result<Vec<_>, _>>()?)
  713. }
  714. }
  715. #[async_trait]
  716. impl<RM> MintQuotesTransaction<'_> for SQLTransaction<RM>
  717. where
  718. RM: DatabasePool + 'static,
  719. {
  720. type Err = Error;
  721. async fn add_melt_request(
  722. &mut self,
  723. quote_id: &QuoteId,
  724. inputs_amount: Amount,
  725. inputs_fee: Amount,
  726. ) -> Result<(), Self::Err> {
  727. // Insert melt_request
  728. query(
  729. r#"
  730. INSERT INTO melt_request
  731. (quote_id, inputs_amount, inputs_fee)
  732. VALUES
  733. (:quote_id, :inputs_amount, :inputs_fee)
  734. "#,
  735. )?
  736. .bind("quote_id", quote_id.to_string())
  737. .bind("inputs_amount", inputs_amount.to_i64())
  738. .bind("inputs_fee", inputs_fee.to_i64())
  739. .execute(&self.inner)
  740. .await?;
  741. Ok(())
  742. }
  743. async fn add_blinded_messages(
  744. &mut self,
  745. quote_id: Option<&QuoteId>,
  746. blinded_messages: &[BlindedMessage],
  747. operation: &Operation,
  748. ) -> Result<(), Self::Err> {
  749. let current_time = unix_time();
  750. // Insert blinded_messages directly into blind_signature with c = NULL
  751. // Let the database constraint handle duplicate detection
  752. for message in blinded_messages {
  753. match query(
  754. r#"
  755. INSERT INTO blind_signature
  756. (blinded_message, amount, keyset_id, c, quote_id, created_time, operation_kind, operation_id)
  757. VALUES
  758. (:blinded_message, :amount, :keyset_id, NULL, :quote_id, :created_time, :operation_kind, :operation_id)
  759. "#,
  760. )?
  761. .bind(
  762. "blinded_message",
  763. message.blinded_secret.to_bytes().to_vec(),
  764. )
  765. .bind("amount", message.amount.to_i64())
  766. .bind("keyset_id", message.keyset_id.to_string())
  767. .bind("quote_id", quote_id.map(|q| q.to_string()))
  768. .bind("created_time", current_time as i64)
  769. .bind("operation_kind", operation.kind().to_string())
  770. .bind("operation_id", operation.id().to_string())
  771. .execute(&self.inner)
  772. .await
  773. {
  774. Ok(_) => continue,
  775. Err(database::Error::Duplicate) => {
  776. // Primary key constraint violation - blinded message already exists
  777. // This could be either:
  778. // 1. Already signed (c IS NOT NULL) - definitely an error
  779. // 2. Already pending (c IS NULL) - also an error
  780. return Err(database::Error::Duplicate);
  781. }
  782. Err(err) => return Err(err),
  783. }
  784. }
  785. Ok(())
  786. }
  787. async fn delete_blinded_messages(
  788. &mut self,
  789. blinded_secrets: &[PublicKey],
  790. ) -> Result<(), Self::Err> {
  791. if blinded_secrets.is_empty() {
  792. return Ok(());
  793. }
  794. // Delete blinded messages from blind_signature table where c IS NULL
  795. // (only delete unsigned blinded messages)
  796. query(
  797. r#"
  798. DELETE FROM blind_signature
  799. WHERE blinded_message IN (:blinded_secrets) AND c IS NULL
  800. "#,
  801. )?
  802. .bind_vec(
  803. "blinded_secrets",
  804. blinded_secrets
  805. .iter()
  806. .map(|secret| secret.to_bytes().to_vec())
  807. .collect(),
  808. )
  809. .execute(&self.inner)
  810. .await?;
  811. Ok(())
  812. }
  813. async fn get_melt_request_and_blinded_messages(
  814. &mut self,
  815. quote_id: &QuoteId,
  816. ) -> Result<Option<database::mint::MeltRequestInfo>, Self::Err> {
  817. let melt_request_row = query(
  818. r#"
  819. SELECT inputs_amount, inputs_fee
  820. FROM melt_request
  821. WHERE quote_id = :quote_id
  822. FOR UPDATE
  823. "#,
  824. )?
  825. .bind("quote_id", quote_id.to_string())
  826. .fetch_one(&self.inner)
  827. .await?;
  828. if let Some(row) = melt_request_row {
  829. let inputs_amount: u64 = column_as_number!(row[0].clone());
  830. let inputs_fee: u64 = column_as_number!(row[1].clone());
  831. // Get blinded messages from blind_signature table where c IS NULL
  832. let blinded_messages_rows = query(
  833. r#"
  834. SELECT blinded_message, keyset_id, amount
  835. FROM blind_signature
  836. WHERE quote_id = :quote_id AND c IS NULL
  837. "#,
  838. )?
  839. .bind("quote_id", quote_id.to_string())
  840. .fetch_all(&self.inner)
  841. .await?;
  842. let blinded_messages: Result<Vec<BlindedMessage>, Error> = blinded_messages_rows
  843. .into_iter()
  844. .map(|row| -> Result<BlindedMessage, Error> {
  845. let blinded_message_key =
  846. column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice);
  847. let keyset_id = column_as_string!(&row[1], Id::from_str, Id::from_bytes);
  848. let amount: u64 = column_as_number!(row[2].clone());
  849. Ok(BlindedMessage {
  850. blinded_secret: blinded_message_key,
  851. keyset_id,
  852. amount: Amount::from(amount),
  853. witness: None, // Not storing witness in database currently
  854. })
  855. })
  856. .collect();
  857. let blinded_messages = blinded_messages?;
  858. Ok(Some(database::mint::MeltRequestInfo {
  859. inputs_amount: Amount::from(inputs_amount),
  860. inputs_fee: Amount::from(inputs_fee),
  861. change_outputs: blinded_messages,
  862. }))
  863. } else {
  864. Ok(None)
  865. }
  866. }
  867. async fn delete_melt_request(&mut self, quote_id: &QuoteId) -> Result<(), Self::Err> {
  868. // Delete from melt_request table
  869. query(
  870. r#"
  871. DELETE FROM melt_request
  872. WHERE quote_id = :quote_id
  873. "#,
  874. )?
  875. .bind("quote_id", quote_id.to_string())
  876. .execute(&self.inner)
  877. .await?;
  878. // Also delete blinded messages (where c IS NULL) from blind_signature table
  879. query(
  880. r#"
  881. DELETE FROM blind_signature
  882. WHERE quote_id = :quote_id AND c IS NULL
  883. "#,
  884. )?
  885. .bind("quote_id", quote_id.to_string())
  886. .execute(&self.inner)
  887. .await?;
  888. Ok(())
  889. }
  890. #[instrument(skip(self))]
  891. async fn increment_mint_quote_amount_paid(
  892. &mut self,
  893. quote_id: &QuoteId,
  894. amount_paid: Amount,
  895. payment_id: String,
  896. ) -> Result<Amount, Self::Err> {
  897. if amount_paid == Amount::ZERO {
  898. tracing::warn!("Amount payments of zero amount should not be recorded.");
  899. return Err(Error::Duplicate);
  900. }
  901. // Check if payment_id already exists in mint_quote_payments
  902. let exists = query(
  903. r#"
  904. SELECT payment_id
  905. FROM mint_quote_payments
  906. WHERE payment_id = :payment_id
  907. FOR UPDATE
  908. "#,
  909. )?
  910. .bind("payment_id", payment_id.clone())
  911. .fetch_one(&self.inner)
  912. .await?;
  913. if exists.is_some() {
  914. tracing::error!("Payment ID already exists: {}", payment_id);
  915. return Err(database::Error::Duplicate);
  916. }
  917. // Get current amount_paid from quote
  918. let current_amount = query(
  919. r#"
  920. SELECT amount_paid
  921. FROM mint_quote
  922. WHERE id = :quote_id
  923. FOR UPDATE
  924. "#,
  925. )?
  926. .bind("quote_id", quote_id.to_string())
  927. .fetch_one(&self.inner)
  928. .await
  929. .inspect_err(|err| {
  930. tracing::error!("SQLite could not get mint quote amount_paid: {}", err);
  931. })?;
  932. let current_amount_paid = if let Some(current_amount) = current_amount {
  933. let amount: u64 = column_as_number!(current_amount[0].clone());
  934. Amount::from(amount)
  935. } else {
  936. Amount::ZERO
  937. };
  938. // Calculate new amount_paid with overflow check
  939. let new_amount_paid = current_amount_paid
  940. .checked_add(amount_paid)
  941. .ok_or_else(|| database::Error::AmountOverflow)?;
  942. tracing::debug!(
  943. "Mint quote {} amount paid was {} is now {}.",
  944. quote_id,
  945. current_amount_paid,
  946. new_amount_paid
  947. );
  948. // Update the amount_paid
  949. query(
  950. r#"
  951. UPDATE mint_quote
  952. SET amount_paid = :amount_paid
  953. WHERE id = :quote_id
  954. "#,
  955. )?
  956. .bind("amount_paid", new_amount_paid.to_i64())
  957. .bind("quote_id", quote_id.to_string())
  958. .execute(&self.inner)
  959. .await
  960. .inspect_err(|err| {
  961. tracing::error!("SQLite could not update mint quote amount_paid: {}", err);
  962. })?;
  963. // Add payment_id to mint_quote_payments table
  964. query(
  965. r#"
  966. INSERT INTO mint_quote_payments
  967. (quote_id, payment_id, amount, timestamp)
  968. VALUES (:quote_id, :payment_id, :amount, :timestamp)
  969. "#,
  970. )?
  971. .bind("quote_id", quote_id.to_string())
  972. .bind("payment_id", payment_id)
  973. .bind("amount", amount_paid.to_i64())
  974. .bind("timestamp", unix_time() as i64)
  975. .execute(&self.inner)
  976. .await
  977. .map_err(|err| {
  978. tracing::error!("SQLite could not insert payment ID: {}", err);
  979. err
  980. })?;
  981. Ok(new_amount_paid)
  982. }
  983. #[instrument(skip_all)]
  984. async fn increment_mint_quote_amount_issued(
  985. &mut self,
  986. quote_id: &QuoteId,
  987. amount_issued: Amount,
  988. ) -> Result<Amount, Self::Err> {
  989. // Get current amount_issued from quote
  990. let current_amounts = query(
  991. r#"
  992. SELECT amount_issued, amount_paid
  993. FROM mint_quote
  994. WHERE id = :quote_id
  995. FOR UPDATE
  996. "#,
  997. )?
  998. .bind("quote_id", quote_id.to_string())
  999. .fetch_one(&self.inner)
  1000. .await
  1001. .inspect_err(|err| {
  1002. tracing::error!("SQLite could not get mint quote amount_issued: {}", err);
  1003. })?
  1004. .ok_or(Error::QuoteNotFound)?;
  1005. let new_amount_issued = {
  1006. // Make sure the db protects issuing not paid quotes
  1007. unpack_into!(
  1008. let (current_amount_issued, current_amount_paid) = current_amounts
  1009. );
  1010. let current_amount_issued: u64 = column_as_number!(current_amount_issued);
  1011. let current_amount_paid: u64 = column_as_number!(current_amount_paid);
  1012. let current_amount_issued = Amount::from(current_amount_issued);
  1013. let current_amount_paid = Amount::from(current_amount_paid);
  1014. // Calculate new amount_issued with overflow check
  1015. let new_amount_issued = current_amount_issued
  1016. .checked_add(amount_issued)
  1017. .ok_or_else(|| database::Error::AmountOverflow)?;
  1018. current_amount_paid
  1019. .checked_sub(new_amount_issued)
  1020. .ok_or(Error::Internal("Over-issued not allowed".to_owned()))?;
  1021. new_amount_issued
  1022. };
  1023. // Update the amount_issued
  1024. query(
  1025. r#"
  1026. UPDATE mint_quote
  1027. SET amount_issued = :amount_issued
  1028. WHERE id = :quote_id
  1029. "#,
  1030. )?
  1031. .bind("amount_issued", new_amount_issued.to_i64())
  1032. .bind("quote_id", quote_id.to_string())
  1033. .execute(&self.inner)
  1034. .await
  1035. .inspect_err(|err| {
  1036. tracing::error!("SQLite could not update mint quote amount_issued: {}", err);
  1037. })?;
  1038. let current_time = unix_time();
  1039. query(
  1040. r#"
  1041. INSERT INTO mint_quote_issued
  1042. (quote_id, amount, timestamp)
  1043. VALUES (:quote_id, :amount, :timestamp);
  1044. "#,
  1045. )?
  1046. .bind("quote_id", quote_id.to_string())
  1047. .bind("amount", amount_issued.to_i64())
  1048. .bind("timestamp", current_time as i64)
  1049. .execute(&self.inner)
  1050. .await?;
  1051. Ok(new_amount_issued)
  1052. }
  1053. #[instrument(skip_all)]
  1054. async fn add_mint_quote(&mut self, quote: MintQuote) -> Result<(), Self::Err> {
  1055. query(
  1056. r#"
  1057. INSERT INTO mint_quote (
  1058. id, amount, unit, request, expiry, request_lookup_id, pubkey, created_time, payment_method, request_lookup_id_kind
  1059. )
  1060. VALUES (
  1061. :id, :amount, :unit, :request, :expiry, :request_lookup_id, :pubkey, :created_time, :payment_method, :request_lookup_id_kind
  1062. )
  1063. "#,
  1064. )?
  1065. .bind("id", quote.id.to_string())
  1066. .bind("amount", quote.amount.map(|a| a.to_i64()))
  1067. .bind("unit", quote.unit.to_string())
  1068. .bind("request", quote.request)
  1069. .bind("expiry", quote.expiry as i64)
  1070. .bind(
  1071. "request_lookup_id",
  1072. quote.request_lookup_id.to_string(),
  1073. )
  1074. .bind("pubkey", quote.pubkey.map(|p| p.to_string()))
  1075. .bind("created_time", quote.created_time as i64)
  1076. .bind("payment_method", quote.payment_method.to_string())
  1077. .bind("request_lookup_id_kind", quote.request_lookup_id.kind())
  1078. .execute(&self.inner)
  1079. .await?;
  1080. Ok(())
  1081. }
  1082. async fn add_melt_quote(&mut self, quote: mint::MeltQuote) -> Result<(), Self::Err> {
  1083. // Now insert the new quote
  1084. query(
  1085. r#"
  1086. INSERT INTO melt_quote
  1087. (
  1088. id, unit, amount, request, fee_reserve, state,
  1089. expiry, payment_preimage, request_lookup_id,
  1090. created_time, paid_time, options, request_lookup_id_kind, payment_method
  1091. )
  1092. VALUES
  1093. (
  1094. :id, :unit, :amount, :request, :fee_reserve, :state,
  1095. :expiry, :payment_preimage, :request_lookup_id,
  1096. :created_time, :paid_time, :options, :request_lookup_id_kind, :payment_method
  1097. )
  1098. "#,
  1099. )?
  1100. .bind("id", quote.id.to_string())
  1101. .bind("unit", quote.unit.to_string())
  1102. .bind("amount", quote.amount.to_i64())
  1103. .bind("request", serde_json::to_string(&quote.request)?)
  1104. .bind("fee_reserve", quote.fee_reserve.to_i64())
  1105. .bind("state", quote.state.to_string())
  1106. .bind("expiry", quote.expiry as i64)
  1107. .bind("payment_preimage", quote.payment_preimage)
  1108. .bind(
  1109. "request_lookup_id",
  1110. quote.request_lookup_id.as_ref().map(|id| id.to_string()),
  1111. )
  1112. .bind("created_time", quote.created_time as i64)
  1113. .bind("paid_time", quote.paid_time.map(|t| t as i64))
  1114. .bind(
  1115. "options",
  1116. quote.options.map(|o| serde_json::to_string(&o).ok()),
  1117. )
  1118. .bind(
  1119. "request_lookup_id_kind",
  1120. quote.request_lookup_id.map(|id| id.kind()),
  1121. )
  1122. .bind("payment_method", quote.payment_method.to_string())
  1123. .execute(&self.inner)
  1124. .await?;
  1125. Ok(())
  1126. }
  1127. async fn update_melt_quote_request_lookup_id(
  1128. &mut self,
  1129. quote_id: &QuoteId,
  1130. new_request_lookup_id: &PaymentIdentifier,
  1131. ) -> Result<(), Self::Err> {
  1132. query(r#"UPDATE melt_quote SET request_lookup_id = :new_req_id, request_lookup_id_kind = :new_kind WHERE id = :id"#)?
  1133. .bind("new_req_id", new_request_lookup_id.to_string())
  1134. .bind("new_kind",new_request_lookup_id.kind() )
  1135. .bind("id", quote_id.to_string())
  1136. .execute(&self.inner)
  1137. .await?;
  1138. Ok(())
  1139. }
  1140. async fn update_melt_quote_state(
  1141. &mut self,
  1142. quote_id: &QuoteId,
  1143. state: MeltQuoteState,
  1144. payment_proof: Option<String>,
  1145. ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err> {
  1146. let mut quote = query(
  1147. r#"
  1148. SELECT
  1149. id,
  1150. unit,
  1151. amount,
  1152. request,
  1153. fee_reserve,
  1154. expiry,
  1155. state,
  1156. payment_preimage,
  1157. request_lookup_id,
  1158. created_time,
  1159. paid_time,
  1160. payment_method,
  1161. options,
  1162. request_lookup_id_kind
  1163. FROM
  1164. melt_quote
  1165. WHERE
  1166. id=:id
  1167. "#,
  1168. )?
  1169. .bind("id", quote_id.to_string())
  1170. .fetch_one(&self.inner)
  1171. .await?
  1172. .map(sql_row_to_melt_quote)
  1173. .transpose()?
  1174. .ok_or(Error::QuoteNotFound)?;
  1175. check_melt_quote_state_transition(quote.state, state)?;
  1176. // When transitioning to Pending, lock all quotes with the same lookup_id
  1177. // and check if any are already pending or paid
  1178. if state == MeltQuoteState::Pending {
  1179. if let Some(ref lookup_id) = quote.request_lookup_id {
  1180. // Lock all quotes with the same lookup_id to prevent race conditions
  1181. let locked_quotes: Vec<(String, String)> = query(
  1182. r#"
  1183. SELECT id, state
  1184. FROM melt_quote
  1185. WHERE request_lookup_id = :lookup_id
  1186. FOR UPDATE
  1187. "#,
  1188. )?
  1189. .bind("lookup_id", lookup_id.to_string())
  1190. .fetch_all(&self.inner)
  1191. .await?
  1192. .into_iter()
  1193. .map(|row| {
  1194. unpack_into!(let (id, state) = row);
  1195. Ok((column_as_string!(id), column_as_string!(state)))
  1196. })
  1197. .collect::<Result<Vec<_>, Error>>()?;
  1198. // Check if any other quote with the same lookup_id is pending or paid
  1199. let has_conflict = locked_quotes.iter().any(|(id, state)| {
  1200. id != &quote_id.to_string()
  1201. && (state == &MeltQuoteState::Pending.to_string()
  1202. || state == &MeltQuoteState::Paid.to_string())
  1203. });
  1204. if has_conflict {
  1205. tracing::warn!(
  1206. "Cannot transition quote {} to Pending: another quote with lookup_id {} is already pending or paid",
  1207. quote_id,
  1208. lookup_id
  1209. );
  1210. return Err(Error::Duplicate);
  1211. }
  1212. }
  1213. }
  1214. let rec = if state == MeltQuoteState::Paid {
  1215. let current_time = unix_time();
  1216. query(r#"UPDATE melt_quote SET state = :state, paid_time = :paid_time, payment_preimage = :payment_preimage WHERE id = :id"#)?
  1217. .bind("state", state.to_string())
  1218. .bind("paid_time", current_time as i64)
  1219. .bind("payment_preimage", payment_proof)
  1220. .bind("id", quote_id.to_string())
  1221. .execute(&self.inner)
  1222. .await
  1223. } else {
  1224. query(r#"UPDATE melt_quote SET state = :state WHERE id = :id"#)?
  1225. .bind("state", state.to_string())
  1226. .bind("id", quote_id.to_string())
  1227. .execute(&self.inner)
  1228. .await
  1229. };
  1230. match rec {
  1231. Ok(_) => {}
  1232. Err(err) => {
  1233. tracing::error!("SQLite Could not update melt quote");
  1234. return Err(err);
  1235. }
  1236. };
  1237. let old_state = quote.state;
  1238. quote.state = state;
  1239. if state == MeltQuoteState::Unpaid || state == MeltQuoteState::Failed {
  1240. self.delete_melt_request(quote_id).await?;
  1241. }
  1242. Ok((old_state, quote))
  1243. }
  1244. async fn get_mint_quote(&mut self, quote_id: &QuoteId) -> Result<Option<MintQuote>, Self::Err> {
  1245. get_mint_quote_inner(&self.inner, quote_id, true).await
  1246. }
  1247. async fn get_melt_quote(
  1248. &mut self,
  1249. quote_id: &QuoteId,
  1250. ) -> Result<Option<mint::MeltQuote>, Self::Err> {
  1251. get_melt_quote_inner(&self.inner, quote_id, true).await
  1252. }
  1253. async fn get_mint_quote_by_request(
  1254. &mut self,
  1255. request: &str,
  1256. ) -> Result<Option<MintQuote>, Self::Err> {
  1257. get_mint_quote_by_request_inner(&self.inner, request, true).await
  1258. }
  1259. async fn get_mint_quote_by_request_lookup_id(
  1260. &mut self,
  1261. request_lookup_id: &PaymentIdentifier,
  1262. ) -> Result<Option<MintQuote>, Self::Err> {
  1263. get_mint_quote_by_request_lookup_id_inner(&self.inner, request_lookup_id, true).await
  1264. }
  1265. }
  1266. #[async_trait]
  1267. impl<RM> MintQuotesDatabase for SQLMintDatabase<RM>
  1268. where
  1269. RM: DatabasePool + 'static,
  1270. {
  1271. type Err = Error;
  1272. async fn get_mint_quote(&self, quote_id: &QuoteId) -> Result<Option<MintQuote>, Self::Err> {
  1273. #[cfg(feature = "prometheus")]
  1274. METRICS.inc_in_flight_requests("get_mint_quote");
  1275. #[cfg(feature = "prometheus")]
  1276. let start_time = std::time::Instant::now();
  1277. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1278. let result = get_mint_quote_inner(&*conn, quote_id, false).await;
  1279. #[cfg(feature = "prometheus")]
  1280. {
  1281. let success = result.is_ok();
  1282. METRICS.record_mint_operation("get_mint_quote", success);
  1283. METRICS.record_mint_operation_histogram(
  1284. "get_mint_quote",
  1285. success,
  1286. start_time.elapsed().as_secs_f64(),
  1287. );
  1288. METRICS.dec_in_flight_requests("get_mint_quote");
  1289. }
  1290. result
  1291. }
  1292. async fn get_mint_quote_by_request(
  1293. &self,
  1294. request: &str,
  1295. ) -> Result<Option<MintQuote>, Self::Err> {
  1296. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1297. get_mint_quote_by_request_inner(&*conn, request, false).await
  1298. }
  1299. async fn get_mint_quote_by_request_lookup_id(
  1300. &self,
  1301. request_lookup_id: &PaymentIdentifier,
  1302. ) -> Result<Option<MintQuote>, Self::Err> {
  1303. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1304. get_mint_quote_by_request_lookup_id_inner(&*conn, request_lookup_id, false).await
  1305. }
  1306. async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, Self::Err> {
  1307. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1308. let mut mint_quotes = query(
  1309. r#"
  1310. SELECT
  1311. id,
  1312. amount,
  1313. unit,
  1314. request,
  1315. expiry,
  1316. request_lookup_id,
  1317. pubkey,
  1318. created_time,
  1319. amount_paid,
  1320. amount_issued,
  1321. payment_method,
  1322. request_lookup_id_kind
  1323. FROM
  1324. mint_quote
  1325. "#,
  1326. )?
  1327. .fetch_all(&*conn)
  1328. .await?
  1329. .into_iter()
  1330. .map(|row| sql_row_to_mint_quote(row, vec![], vec![]))
  1331. .collect::<Result<Vec<_>, _>>()?;
  1332. for quote in mint_quotes.as_mut_slice() {
  1333. let payments = get_mint_quote_payments(&*conn, &quote.id).await?;
  1334. let issuance = get_mint_quote_issuance(&*conn, &quote.id).await?;
  1335. quote.issuance = issuance;
  1336. quote.payments = payments;
  1337. }
  1338. Ok(mint_quotes)
  1339. }
  1340. async fn get_melt_quote(
  1341. &self,
  1342. quote_id: &QuoteId,
  1343. ) -> Result<Option<mint::MeltQuote>, Self::Err> {
  1344. #[cfg(feature = "prometheus")]
  1345. METRICS.inc_in_flight_requests("get_melt_quote");
  1346. #[cfg(feature = "prometheus")]
  1347. let start_time = std::time::Instant::now();
  1348. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1349. let result = get_melt_quote_inner(&*conn, quote_id, false).await;
  1350. #[cfg(feature = "prometheus")]
  1351. {
  1352. let success = result.is_ok();
  1353. METRICS.record_mint_operation("get_melt_quote", success);
  1354. METRICS.record_mint_operation_histogram(
  1355. "get_melt_quote",
  1356. success,
  1357. start_time.elapsed().as_secs_f64(),
  1358. );
  1359. METRICS.dec_in_flight_requests("get_melt_quote");
  1360. }
  1361. result
  1362. }
  1363. async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err> {
  1364. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1365. Ok(query(
  1366. r#"
  1367. SELECT
  1368. id,
  1369. unit,
  1370. amount,
  1371. request,
  1372. fee_reserve,
  1373. expiry,
  1374. state,
  1375. payment_preimage,
  1376. request_lookup_id,
  1377. created_time,
  1378. paid_time,
  1379. payment_method,
  1380. options,
  1381. request_lookup_id_kind
  1382. FROM
  1383. melt_quote
  1384. "#,
  1385. )?
  1386. .fetch_all(&*conn)
  1387. .await?
  1388. .into_iter()
  1389. .map(sql_row_to_melt_quote)
  1390. .collect::<Result<Vec<_>, _>>()?)
  1391. }
  1392. }
  1393. #[async_trait]
  1394. impl<RM> MintProofsDatabase for SQLMintDatabase<RM>
  1395. where
  1396. RM: DatabasePool + 'static,
  1397. {
  1398. type Err = Error;
  1399. async fn get_proofs_by_ys(&self, ys: &[PublicKey]) -> Result<Vec<Option<Proof>>, Self::Err> {
  1400. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1401. let mut proofs = query(
  1402. r#"
  1403. SELECT
  1404. amount,
  1405. keyset_id,
  1406. secret,
  1407. c,
  1408. witness,
  1409. y
  1410. FROM
  1411. proof
  1412. WHERE
  1413. y IN (:ys)
  1414. "#,
  1415. )?
  1416. .bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
  1417. .fetch_all(&*conn)
  1418. .await?
  1419. .into_iter()
  1420. .map(|mut row| {
  1421. Ok((
  1422. column_as_string!(
  1423. row.pop().ok_or(Error::InvalidDbResponse)?,
  1424. PublicKey::from_hex,
  1425. PublicKey::from_slice
  1426. ),
  1427. sql_row_to_proof(row)?,
  1428. ))
  1429. })
  1430. .collect::<Result<HashMap<_, _>, Error>>()?;
  1431. Ok(ys.iter().map(|y| proofs.remove(y)).collect())
  1432. }
  1433. async fn get_proof_ys_by_quote_id(
  1434. &self,
  1435. quote_id: &QuoteId,
  1436. ) -> Result<Vec<PublicKey>, Self::Err> {
  1437. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1438. Ok(query(
  1439. r#"
  1440. SELECT
  1441. amount,
  1442. keyset_id,
  1443. secret,
  1444. c,
  1445. witness
  1446. FROM
  1447. proof
  1448. WHERE
  1449. quote_id = :quote_id
  1450. "#,
  1451. )?
  1452. .bind("quote_id", quote_id.to_string())
  1453. .fetch_all(&*conn)
  1454. .await?
  1455. .into_iter()
  1456. .map(sql_row_to_proof)
  1457. .collect::<Result<Vec<Proof>, _>>()?
  1458. .ys()?)
  1459. }
  1460. async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
  1461. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1462. let mut current_states = get_current_states(&*conn, ys).await?;
  1463. Ok(ys.iter().map(|y| current_states.remove(y)).collect())
  1464. }
  1465. async fn get_proofs_by_keyset_id(
  1466. &self,
  1467. keyset_id: &Id,
  1468. ) -> Result<(Proofs, Vec<Option<State>>), Self::Err> {
  1469. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1470. Ok(query(
  1471. r#"
  1472. SELECT
  1473. keyset_id,
  1474. amount,
  1475. secret,
  1476. c,
  1477. witness,
  1478. state
  1479. FROM
  1480. proof
  1481. WHERE
  1482. keyset_id=:keyset_id
  1483. "#,
  1484. )?
  1485. .bind("keyset_id", keyset_id.to_string())
  1486. .fetch_all(&*conn)
  1487. .await?
  1488. .into_iter()
  1489. .map(sql_row_to_proof_with_state)
  1490. .collect::<Result<Vec<_>, _>>()?
  1491. .into_iter()
  1492. .unzip())
  1493. }
  1494. /// Get total proofs redeemed by keyset id
  1495. async fn get_total_redeemed(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
  1496. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1497. query(
  1498. r#"
  1499. SELECT
  1500. keyset_id,
  1501. total_redeemed as amount
  1502. FROM
  1503. keyset_amounts
  1504. "#,
  1505. )?
  1506. .fetch_all(&*conn)
  1507. .await?
  1508. .into_iter()
  1509. .map(sql_row_to_hashmap_amount)
  1510. .collect()
  1511. }
  1512. }
  1513. #[async_trait]
  1514. impl<RM> MintSignatureTransaction<'_> for SQLTransaction<RM>
  1515. where
  1516. RM: DatabasePool + 'static,
  1517. {
  1518. type Err = Error;
  1519. async fn add_blind_signatures(
  1520. &mut self,
  1521. blinded_messages: &[PublicKey],
  1522. blind_signatures: &[BlindSignature],
  1523. quote_id: Option<QuoteId>,
  1524. ) -> Result<(), Self::Err> {
  1525. let current_time = unix_time();
  1526. if blinded_messages.len() != blind_signatures.len() {
  1527. return Err(database::Error::Internal(
  1528. "Mismatched array lengths for blinded messages and blind signatures".to_string(),
  1529. ));
  1530. }
  1531. // Select all existing rows for the given blinded messages at once
  1532. let mut existing_rows = query(
  1533. r#"
  1534. SELECT blinded_message, c, dleq_e, dleq_s
  1535. FROM blind_signature
  1536. WHERE blinded_message IN (:blinded_messages)
  1537. FOR UPDATE
  1538. "#,
  1539. )?
  1540. .bind_vec(
  1541. "blinded_messages",
  1542. blinded_messages
  1543. .iter()
  1544. .map(|message| message.to_bytes().to_vec())
  1545. .collect(),
  1546. )
  1547. .fetch_all(&self.inner)
  1548. .await?
  1549. .into_iter()
  1550. .map(|mut row| {
  1551. Ok((
  1552. column_as_string!(&row.remove(0), PublicKey::from_hex, PublicKey::from_slice),
  1553. (row[0].clone(), row[1].clone(), row[2].clone()),
  1554. ))
  1555. })
  1556. .collect::<Result<HashMap<_, _>, Error>>()?;
  1557. // Iterate over the provided blinded messages and signatures
  1558. for (message, signature) in blinded_messages.iter().zip(blind_signatures) {
  1559. match existing_rows.remove(message) {
  1560. None => {
  1561. // Unknown blind message: Insert new row with all columns
  1562. query(
  1563. r#"
  1564. INSERT INTO blind_signature
  1565. (blinded_message, amount, keyset_id, c, quote_id, dleq_e, dleq_s, created_time, signed_time)
  1566. VALUES
  1567. (:blinded_message, :amount, :keyset_id, :c, :quote_id, :dleq_e, :dleq_s, :created_time, :signed_time)
  1568. "#,
  1569. )?
  1570. .bind("blinded_message", message.to_bytes().to_vec())
  1571. .bind("amount", u64::from(signature.amount) as i64)
  1572. .bind("keyset_id", signature.keyset_id.to_string())
  1573. .bind("c", signature.c.to_bytes().to_vec())
  1574. .bind("quote_id", quote_id.as_ref().map(|q| q.to_string()))
  1575. .bind(
  1576. "dleq_e",
  1577. signature.dleq.as_ref().map(|dleq| dleq.e.to_secret_hex()),
  1578. )
  1579. .bind(
  1580. "dleq_s",
  1581. signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
  1582. )
  1583. .bind("created_time", current_time as i64)
  1584. .bind("signed_time", current_time as i64)
  1585. .execute(&self.inner)
  1586. .await?;
  1587. query(
  1588. r#"
  1589. INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
  1590. VALUES (:keyset_id, :amount, 0)
  1591. ON CONFLICT (keyset_id)
  1592. DO UPDATE SET total_issued = keyset_amounts.total_issued + EXCLUDED.total_issued
  1593. "#,
  1594. )?
  1595. .bind("amount", u64::from(signature.amount) as i64)
  1596. .bind("keyset_id", signature.keyset_id.to_string())
  1597. .execute(&self.inner)
  1598. .await?;
  1599. }
  1600. Some((c, _dleq_e, _dleq_s)) => {
  1601. // Blind message exists: check if c is NULL
  1602. match c {
  1603. Column::Null => {
  1604. // Blind message with no c: Update with missing columns c, dleq_e, dleq_s
  1605. query(
  1606. r#"
  1607. UPDATE blind_signature
  1608. SET c = :c, dleq_e = :dleq_e, dleq_s = :dleq_s, signed_time = :signed_time, amount = :amount
  1609. WHERE blinded_message = :blinded_message
  1610. "#,
  1611. )?
  1612. .bind("c", signature.c.to_bytes().to_vec())
  1613. .bind(
  1614. "dleq_e",
  1615. signature.dleq.as_ref().map(|dleq| dleq.e.to_secret_hex()),
  1616. )
  1617. .bind(
  1618. "dleq_s",
  1619. signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
  1620. )
  1621. .bind("blinded_message", message.to_bytes().to_vec())
  1622. .bind("signed_time", current_time as i64)
  1623. .bind("amount", u64::from(signature.amount) as i64)
  1624. .execute(&self.inner)
  1625. .await?;
  1626. query(
  1627. r#"
  1628. INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
  1629. VALUES (:keyset_id, :amount, 0)
  1630. ON CONFLICT (keyset_id)
  1631. DO UPDATE SET total_issued = keyset_amounts.total_issued + EXCLUDED.total_issued
  1632. "#,
  1633. )?
  1634. .bind("amount", u64::from(signature.amount) as i64)
  1635. .bind("keyset_id", signature.keyset_id.to_string())
  1636. .execute(&self.inner)
  1637. .await?;
  1638. }
  1639. _ => {
  1640. // Blind message already has c: Error
  1641. tracing::error!(
  1642. "Attempting to add signature to message already signed {}",
  1643. message
  1644. );
  1645. return Err(database::Error::Duplicate);
  1646. }
  1647. }
  1648. }
  1649. }
  1650. }
  1651. debug_assert!(
  1652. existing_rows.is_empty(),
  1653. "Unexpected existing rows remain: {:?}",
  1654. existing_rows.keys().collect::<Vec<_>>()
  1655. );
  1656. if !existing_rows.is_empty() {
  1657. tracing::error!("Did not check all existing rows");
  1658. return Err(Error::Internal(
  1659. "Did not check all existing rows".to_string(),
  1660. ));
  1661. }
  1662. Ok(())
  1663. }
  1664. async fn get_blind_signatures(
  1665. &mut self,
  1666. blinded_messages: &[PublicKey],
  1667. ) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
  1668. let mut blinded_signatures = query(
  1669. r#"SELECT
  1670. keyset_id,
  1671. amount,
  1672. c,
  1673. dleq_e,
  1674. dleq_s,
  1675. blinded_message
  1676. FROM
  1677. blind_signature
  1678. WHERE blinded_message IN (:b) AND c IS NOT NULL
  1679. "#,
  1680. )?
  1681. .bind_vec(
  1682. "b",
  1683. blinded_messages
  1684. .iter()
  1685. .map(|b| b.to_bytes().to_vec())
  1686. .collect(),
  1687. )
  1688. .fetch_all(&self.inner)
  1689. .await?
  1690. .into_iter()
  1691. .map(|mut row| {
  1692. Ok((
  1693. column_as_string!(
  1694. &row.pop().ok_or(Error::InvalidDbResponse)?,
  1695. PublicKey::from_hex,
  1696. PublicKey::from_slice
  1697. ),
  1698. sql_row_to_blind_signature(row)?,
  1699. ))
  1700. })
  1701. .collect::<Result<HashMap<_, _>, Error>>()?;
  1702. Ok(blinded_messages
  1703. .iter()
  1704. .map(|y| blinded_signatures.remove(y))
  1705. .collect())
  1706. }
  1707. }
  1708. #[async_trait]
  1709. impl<RM> MintSignaturesDatabase for SQLMintDatabase<RM>
  1710. where
  1711. RM: DatabasePool + 'static,
  1712. {
  1713. type Err = Error;
  1714. async fn get_blind_signatures(
  1715. &self,
  1716. blinded_messages: &[PublicKey],
  1717. ) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
  1718. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1719. let mut blinded_signatures = query(
  1720. r#"SELECT
  1721. keyset_id,
  1722. amount,
  1723. c,
  1724. dleq_e,
  1725. dleq_s,
  1726. blinded_message
  1727. FROM
  1728. blind_signature
  1729. WHERE blinded_message IN (:b) AND c IS NOT NULL
  1730. "#,
  1731. )?
  1732. .bind_vec(
  1733. "b",
  1734. blinded_messages
  1735. .iter()
  1736. .map(|b_| b_.to_bytes().to_vec())
  1737. .collect(),
  1738. )
  1739. .fetch_all(&*conn)
  1740. .await?
  1741. .into_iter()
  1742. .map(|mut row| {
  1743. Ok((
  1744. column_as_string!(
  1745. &row.pop().ok_or(Error::InvalidDbResponse)?,
  1746. PublicKey::from_hex,
  1747. PublicKey::from_slice
  1748. ),
  1749. sql_row_to_blind_signature(row)?,
  1750. ))
  1751. })
  1752. .collect::<Result<HashMap<_, _>, Error>>()?;
  1753. Ok(blinded_messages
  1754. .iter()
  1755. .map(|y| blinded_signatures.remove(y))
  1756. .collect())
  1757. }
  1758. async fn get_blind_signatures_for_keyset(
  1759. &self,
  1760. keyset_id: &Id,
  1761. ) -> Result<Vec<BlindSignature>, Self::Err> {
  1762. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1763. Ok(query(
  1764. r#"
  1765. SELECT
  1766. keyset_id,
  1767. amount,
  1768. c,
  1769. dleq_e,
  1770. dleq_s
  1771. FROM
  1772. blind_signature
  1773. WHERE
  1774. keyset_id=:keyset_id AND c IS NOT NULL
  1775. "#,
  1776. )?
  1777. .bind("keyset_id", keyset_id.to_string())
  1778. .fetch_all(&*conn)
  1779. .await?
  1780. .into_iter()
  1781. .map(sql_row_to_blind_signature)
  1782. .collect::<Result<Vec<BlindSignature>, _>>()?)
  1783. }
  1784. /// Get [`BlindSignature`]s for quote
  1785. async fn get_blind_signatures_for_quote(
  1786. &self,
  1787. quote_id: &QuoteId,
  1788. ) -> Result<Vec<BlindSignature>, Self::Err> {
  1789. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1790. Ok(query(
  1791. r#"
  1792. SELECT
  1793. keyset_id,
  1794. amount,
  1795. c,
  1796. dleq_e,
  1797. dleq_s
  1798. FROM
  1799. blind_signature
  1800. WHERE
  1801. quote_id=:quote_id AND c IS NOT NULL
  1802. "#,
  1803. )?
  1804. .bind("quote_id", quote_id.to_string())
  1805. .fetch_all(&*conn)
  1806. .await?
  1807. .into_iter()
  1808. .map(sql_row_to_blind_signature)
  1809. .collect::<Result<Vec<BlindSignature>, _>>()?)
  1810. }
  1811. /// Get total proofs redeemed by keyset id
  1812. async fn get_total_issued(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
  1813. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  1814. query(
  1815. r#"
  1816. SELECT
  1817. keyset_id,
  1818. total_issued as amount
  1819. FROM
  1820. keyset_amounts
  1821. "#,
  1822. )?
  1823. .fetch_all(&*conn)
  1824. .await?
  1825. .into_iter()
  1826. .map(sql_row_to_hashmap_amount)
  1827. .collect()
  1828. }
  1829. }
  1830. #[async_trait]
  1831. impl<RM> database::KVStoreTransaction<Error> for SQLTransaction<RM>
  1832. where
  1833. RM: DatabasePool + 'static,
  1834. {
  1835. async fn kv_read(
  1836. &mut self,
  1837. primary_namespace: &str,
  1838. secondary_namespace: &str,
  1839. key: &str,
  1840. ) -> Result<Option<Vec<u8>>, Error> {
  1841. crate::keyvalue::kv_read_in_transaction(
  1842. &self.inner,
  1843. primary_namespace,
  1844. secondary_namespace,
  1845. key,
  1846. )
  1847. .await
  1848. }
  1849. async fn kv_write(
  1850. &mut self,
  1851. primary_namespace: &str,
  1852. secondary_namespace: &str,
  1853. key: &str,
  1854. value: &[u8],
  1855. ) -> Result<(), Error> {
  1856. crate::keyvalue::kv_write_in_transaction(
  1857. &self.inner,
  1858. primary_namespace,
  1859. secondary_namespace,
  1860. key,
  1861. value,
  1862. )
  1863. .await
  1864. }
  1865. async fn kv_remove(
  1866. &mut self,
  1867. primary_namespace: &str,
  1868. secondary_namespace: &str,
  1869. key: &str,
  1870. ) -> Result<(), Error> {
  1871. crate::keyvalue::kv_remove_in_transaction(
  1872. &self.inner,
  1873. primary_namespace,
  1874. secondary_namespace,
  1875. key,
  1876. )
  1877. .await
  1878. }
  1879. async fn kv_list(
  1880. &mut self,
  1881. primary_namespace: &str,
  1882. secondary_namespace: &str,
  1883. ) -> Result<Vec<String>, Error> {
  1884. crate::keyvalue::kv_list_in_transaction(&self.inner, primary_namespace, secondary_namespace)
  1885. .await
  1886. }
  1887. }
  1888. #[async_trait]
  1889. impl<RM> database::KVStoreDatabase for SQLMintDatabase<RM>
  1890. where
  1891. RM: DatabasePool + 'static,
  1892. {
  1893. type Err = Error;
  1894. async fn kv_read(
  1895. &self,
  1896. primary_namespace: &str,
  1897. secondary_namespace: &str,
  1898. key: &str,
  1899. ) -> Result<Option<Vec<u8>>, Error> {
  1900. crate::keyvalue::kv_read(&self.pool, primary_namespace, secondary_namespace, key).await
  1901. }
  1902. async fn kv_list(
  1903. &self,
  1904. primary_namespace: &str,
  1905. secondary_namespace: &str,
  1906. ) -> Result<Vec<String>, Error> {
  1907. crate::keyvalue::kv_list(&self.pool, primary_namespace, secondary_namespace).await
  1908. }
  1909. }
  1910. #[async_trait]
  1911. impl<RM> database::KVStore for SQLMintDatabase<RM>
  1912. where
  1913. RM: DatabasePool + 'static,
  1914. {
  1915. async fn begin_transaction(
  1916. &self,
  1917. ) -> Result<Box<dyn database::KVStoreTransaction<Self::Err> + Send + Sync>, Error> {
  1918. Ok(Box::new(SQLTransaction {
  1919. inner: ConnectionWithTransaction::new(
  1920. self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
  1921. )
  1922. .await?,
  1923. }))
  1924. }
  1925. }
  1926. #[async_trait]
  1927. impl<RM> SagaTransaction<'_> for SQLTransaction<RM>
  1928. where
  1929. RM: DatabasePool + 'static,
  1930. {
  1931. type Err = Error;
  1932. async fn get_saga(
  1933. &mut self,
  1934. operation_id: &uuid::Uuid,
  1935. ) -> Result<Option<mint::Saga>, Self::Err> {
  1936. Ok(query(
  1937. r#"
  1938. SELECT
  1939. operation_id,
  1940. operation_kind,
  1941. state,
  1942. blinded_secrets,
  1943. input_ys,
  1944. quote_id,
  1945. created_at,
  1946. updated_at
  1947. FROM
  1948. saga_state
  1949. WHERE
  1950. operation_id = :operation_id
  1951. FOR UPDATE
  1952. "#,
  1953. )?
  1954. .bind("operation_id", operation_id.to_string())
  1955. .fetch_one(&self.inner)
  1956. .await?
  1957. .map(sql_row_to_saga)
  1958. .transpose()?)
  1959. }
  1960. async fn add_saga(&mut self, saga: &mint::Saga) -> Result<(), Self::Err> {
  1961. let current_time = unix_time();
  1962. let blinded_secrets_json = serde_json::to_string(&saga.blinded_secrets)
  1963. .map_err(|e| Error::Internal(format!("Failed to serialize blinded_secrets: {e}")))?;
  1964. let input_ys_json = serde_json::to_string(&saga.input_ys)
  1965. .map_err(|e| Error::Internal(format!("Failed to serialize input_ys: {e}")))?;
  1966. query(
  1967. r#"
  1968. INSERT INTO saga_state
  1969. (operation_id, operation_kind, state, blinded_secrets, input_ys, quote_id, created_at, updated_at)
  1970. VALUES
  1971. (:operation_id, :operation_kind, :state, :blinded_secrets, :input_ys, :quote_id, :created_at, :updated_at)
  1972. "#,
  1973. )?
  1974. .bind("operation_id", saga.operation_id.to_string())
  1975. .bind("operation_kind", saga.operation_kind.to_string())
  1976. .bind("state", saga.state.state())
  1977. .bind("blinded_secrets", blinded_secrets_json)
  1978. .bind("input_ys", input_ys_json)
  1979. .bind("quote_id", saga.quote_id.as_deref())
  1980. .bind("created_at", saga.created_at as i64)
  1981. .bind("updated_at", current_time as i64)
  1982. .execute(&self.inner)
  1983. .await?;
  1984. Ok(())
  1985. }
  1986. async fn update_saga(
  1987. &mut self,
  1988. operation_id: &uuid::Uuid,
  1989. new_state: mint::SagaStateEnum,
  1990. ) -> Result<(), Self::Err> {
  1991. let current_time = unix_time();
  1992. query(
  1993. r#"
  1994. UPDATE saga_state
  1995. SET state = :state, updated_at = :updated_at
  1996. WHERE operation_id = :operation_id
  1997. "#,
  1998. )?
  1999. .bind("state", new_state.state())
  2000. .bind("updated_at", current_time as i64)
  2001. .bind("operation_id", operation_id.to_string())
  2002. .execute(&self.inner)
  2003. .await?;
  2004. Ok(())
  2005. }
  2006. async fn delete_saga(&mut self, operation_id: &uuid::Uuid) -> Result<(), Self::Err> {
  2007. query(
  2008. r#"
  2009. DELETE FROM saga_state
  2010. WHERE operation_id = :operation_id
  2011. "#,
  2012. )?
  2013. .bind("operation_id", operation_id.to_string())
  2014. .execute(&self.inner)
  2015. .await?;
  2016. Ok(())
  2017. }
  2018. }
  2019. #[async_trait]
  2020. impl<RM> SagaDatabase for SQLMintDatabase<RM>
  2021. where
  2022. RM: DatabasePool + 'static,
  2023. {
  2024. type Err = Error;
  2025. async fn get_incomplete_sagas(
  2026. &self,
  2027. operation_kind: mint::OperationKind,
  2028. ) -> Result<Vec<mint::Saga>, Self::Err> {
  2029. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  2030. Ok(query(
  2031. r#"
  2032. SELECT
  2033. operation_id,
  2034. operation_kind,
  2035. state,
  2036. blinded_secrets,
  2037. input_ys,
  2038. quote_id,
  2039. created_at,
  2040. updated_at
  2041. FROM
  2042. saga_state
  2043. WHERE
  2044. operation_kind = :operation_kind
  2045. ORDER BY created_at ASC
  2046. "#,
  2047. )?
  2048. .bind("operation_kind", operation_kind.to_string())
  2049. .fetch_all(&*conn)
  2050. .await?
  2051. .into_iter()
  2052. .map(sql_row_to_saga)
  2053. .collect::<Result<Vec<_>, _>>()?)
  2054. }
  2055. }
  2056. #[async_trait]
  2057. impl<RM> CompletedOperationsTransaction<'_> for SQLTransaction<RM>
  2058. where
  2059. RM: DatabasePool + 'static,
  2060. {
  2061. type Err = Error;
  2062. async fn add_completed_operation(
  2063. &mut self,
  2064. operation: &mint::Operation,
  2065. fee_by_keyset: &std::collections::HashMap<cdk_common::nuts::Id, cdk_common::Amount>,
  2066. ) -> Result<(), Self::Err> {
  2067. query(
  2068. r#"
  2069. INSERT INTO completed_operations
  2070. (operation_id, operation_kind, completed_at, total_issued, total_redeemed, fee_collected, payment_amount, payment_fee, payment_method)
  2071. VALUES
  2072. (:operation_id, :operation_kind, :completed_at, :total_issued, :total_redeemed, :fee_collected, :payment_amount, :payment_fee, :payment_method)
  2073. "#,
  2074. )?
  2075. .bind("operation_id", operation.id().to_string())
  2076. .bind("operation_kind", operation.kind().to_string())
  2077. .bind("completed_at", operation.completed_at().unwrap_or(unix_time()) as i64)
  2078. .bind("total_issued", operation.total_issued().to_u64() as i64)
  2079. .bind("total_redeemed", operation.total_redeemed().to_u64() as i64)
  2080. .bind("fee_collected", operation.fee_collected().to_u64() as i64)
  2081. .bind("payment_amount", operation.payment_amount().map(|a| a.to_u64() as i64))
  2082. .bind("payment_fee", operation.payment_fee().map(|a| a.to_u64() as i64))
  2083. .bind("payment_method", operation.payment_method().map(|m| m.to_string()))
  2084. .execute(&self.inner)
  2085. .await?;
  2086. // Update keyset_amounts with fee_collected from the breakdown
  2087. for (keyset_id, fee) in fee_by_keyset {
  2088. if fee.to_u64() > 0 {
  2089. query(
  2090. r#"
  2091. INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed, fee_collected)
  2092. VALUES (:keyset_id, 0, 0, :fee)
  2093. ON CONFLICT (keyset_id)
  2094. DO UPDATE SET fee_collected = keyset_amounts.fee_collected + EXCLUDED.fee_collected
  2095. "#,
  2096. )?
  2097. .bind("keyset_id", keyset_id.to_string())
  2098. .bind("fee", fee.to_u64() as i64)
  2099. .execute(&self.inner)
  2100. .await?;
  2101. }
  2102. }
  2103. Ok(())
  2104. }
  2105. }
  2106. #[async_trait]
  2107. impl<RM> CompletedOperationsDatabase for SQLMintDatabase<RM>
  2108. where
  2109. RM: DatabasePool + 'static,
  2110. {
  2111. type Err = Error;
  2112. async fn get_completed_operation(
  2113. &self,
  2114. operation_id: &uuid::Uuid,
  2115. ) -> Result<Option<mint::Operation>, Self::Err> {
  2116. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  2117. Ok(query(
  2118. r#"
  2119. SELECT
  2120. operation_id,
  2121. operation_kind,
  2122. completed_at,
  2123. total_issued,
  2124. total_redeemed,
  2125. fee_collected,
  2126. payment_method
  2127. FROM
  2128. completed_operations
  2129. WHERE
  2130. operation_id = :operation_id
  2131. "#,
  2132. )?
  2133. .bind("operation_id", operation_id.to_string())
  2134. .fetch_one(&*conn)
  2135. .await?
  2136. .map(sql_row_to_completed_operation)
  2137. .transpose()?)
  2138. }
  2139. async fn get_completed_operations_by_kind(
  2140. &self,
  2141. operation_kind: mint::OperationKind,
  2142. ) -> Result<Vec<mint::Operation>, Self::Err> {
  2143. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  2144. Ok(query(
  2145. r#"
  2146. SELECT
  2147. operation_id,
  2148. operation_kind,
  2149. completed_at,
  2150. total_issued,
  2151. total_redeemed,
  2152. fee_collected,
  2153. payment_method
  2154. FROM
  2155. completed_operations
  2156. WHERE
  2157. operation_kind = :operation_kind
  2158. ORDER BY completed_at DESC
  2159. "#,
  2160. )?
  2161. .bind("operation_kind", operation_kind.to_string())
  2162. .fetch_all(&*conn)
  2163. .await?
  2164. .into_iter()
  2165. .map(sql_row_to_completed_operation)
  2166. .collect::<Result<Vec<_>, _>>()?)
  2167. }
  2168. async fn get_completed_operations(&self) -> Result<Vec<mint::Operation>, Self::Err> {
  2169. let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
  2170. Ok(query(
  2171. r#"
  2172. SELECT
  2173. operation_id,
  2174. operation_kind,
  2175. completed_at,
  2176. total_issued,
  2177. total_redeemed,
  2178. fee_collected,
  2179. payment_method
  2180. FROM
  2181. completed_operations
  2182. ORDER BY completed_at DESC
  2183. "#,
  2184. )?
  2185. .fetch_all(&*conn)
  2186. .await?
  2187. .into_iter()
  2188. .map(sql_row_to_completed_operation)
  2189. .collect::<Result<Vec<_>, _>>()?)
  2190. }
  2191. }
  2192. #[async_trait]
  2193. impl<RM> MintDatabase<Error> for SQLMintDatabase<RM>
  2194. where
  2195. RM: DatabasePool + 'static,
  2196. {
  2197. async fn begin_transaction<'a>(
  2198. &'a self,
  2199. ) -> Result<Box<dyn database::MintTransaction<'a, Error> + Send + Sync + 'a>, Error> {
  2200. let tx = SQLTransaction {
  2201. inner: ConnectionWithTransaction::new(
  2202. self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
  2203. )
  2204. .await?,
  2205. };
  2206. Ok(Box::new(tx))
  2207. }
  2208. }
  2209. fn sql_row_to_keyset_info(row: Vec<Column>) -> Result<MintKeySetInfo, Error> {
  2210. unpack_into!(
  2211. let (
  2212. id,
  2213. unit,
  2214. active,
  2215. valid_from,
  2216. valid_to,
  2217. derivation_path,
  2218. derivation_path_index,
  2219. amounts,
  2220. row_keyset_ppk
  2221. ) = row
  2222. );
  2223. let amounts = column_as_nullable_string!(amounts)
  2224. .and_then(|str| serde_json::from_str(&str).ok())
  2225. .ok_or_else(|| Error::Database("amounts field is required".to_string().into()))?;
  2226. Ok(MintKeySetInfo {
  2227. id: column_as_string!(id, Id::from_str, Id::from_bytes),
  2228. unit: column_as_string!(unit, CurrencyUnit::from_str),
  2229. active: matches!(active, Column::Integer(1)),
  2230. valid_from: column_as_number!(valid_from),
  2231. derivation_path: column_as_string!(derivation_path, DerivationPath::from_str),
  2232. derivation_path_index: column_as_nullable_number!(derivation_path_index),
  2233. amounts,
  2234. input_fee_ppk: column_as_number!(row_keyset_ppk),
  2235. final_expiry: column_as_nullable_number!(valid_to),
  2236. })
  2237. }
  2238. #[instrument(skip_all)]
  2239. fn sql_row_to_mint_quote(
  2240. row: Vec<Column>,
  2241. payments: Vec<IncomingPayment>,
  2242. issueances: Vec<Issuance>,
  2243. ) -> Result<MintQuote, Error> {
  2244. unpack_into!(
  2245. let (
  2246. id, amount, unit, request, expiry, request_lookup_id,
  2247. pubkey, created_time, amount_paid, amount_issued, payment_method, request_lookup_id_kind
  2248. ) = row
  2249. );
  2250. let request_str = column_as_string!(&request);
  2251. let request_lookup_id = column_as_nullable_string!(&request_lookup_id).unwrap_or_else(|| {
  2252. Bolt11Invoice::from_str(&request_str)
  2253. .map(|invoice| invoice.payment_hash().to_string())
  2254. .unwrap_or_else(|_| request_str.clone())
  2255. });
  2256. let request_lookup_id_kind = column_as_string!(request_lookup_id_kind);
  2257. let pubkey = column_as_nullable_string!(&pubkey)
  2258. .map(|pk| PublicKey::from_hex(&pk))
  2259. .transpose()?;
  2260. let id = column_as_string!(id);
  2261. let amount: Option<u64> = column_as_nullable_number!(amount);
  2262. let amount_paid: u64 = column_as_number!(amount_paid);
  2263. let amount_issued: u64 = column_as_number!(amount_issued);
  2264. let payment_method = column_as_string!(payment_method, PaymentMethod::from_str);
  2265. Ok(MintQuote::new(
  2266. Some(QuoteId::from_str(&id)?),
  2267. request_str,
  2268. column_as_string!(unit, CurrencyUnit::from_str),
  2269. amount.map(Amount::from),
  2270. column_as_number!(expiry),
  2271. PaymentIdentifier::new(&request_lookup_id_kind, &request_lookup_id)
  2272. .map_err(|_| ConversionError::MissingParameter("Payment id".to_string()))?,
  2273. pubkey,
  2274. amount_paid.into(),
  2275. amount_issued.into(),
  2276. payment_method,
  2277. column_as_number!(created_time),
  2278. payments,
  2279. issueances,
  2280. ))
  2281. }
  2282. // FIXME: Replace unwrap with proper error handling
  2283. fn sql_row_to_melt_quote(row: Vec<Column>) -> Result<mint::MeltQuote, Error> {
  2284. unpack_into!(
  2285. let (
  2286. id,
  2287. unit,
  2288. amount,
  2289. request,
  2290. fee_reserve,
  2291. expiry,
  2292. state,
  2293. payment_preimage,
  2294. request_lookup_id,
  2295. created_time,
  2296. paid_time,
  2297. payment_method,
  2298. options,
  2299. request_lookup_id_kind
  2300. ) = row
  2301. );
  2302. let id = column_as_string!(id);
  2303. let amount: u64 = column_as_number!(amount);
  2304. let fee_reserve: u64 = column_as_number!(fee_reserve);
  2305. let expiry = column_as_number!(expiry);
  2306. let payment_preimage = column_as_nullable_string!(payment_preimage);
  2307. let options = column_as_nullable_string!(options);
  2308. let options = options.and_then(|o| serde_json::from_str(&o).ok());
  2309. let created_time: i64 = column_as_number!(created_time);
  2310. let paid_time = column_as_nullable_number!(paid_time);
  2311. let payment_method = PaymentMethod::from_str(&column_as_string!(payment_method))?;
  2312. let state =
  2313. MeltQuoteState::from_str(&column_as_string!(&state)).map_err(ConversionError::from)?;
  2314. let unit = column_as_string!(unit);
  2315. let request = column_as_string!(request);
  2316. let request_lookup_id_kind = column_as_nullable_string!(request_lookup_id_kind);
  2317. let request_lookup_id = column_as_nullable_string!(&request_lookup_id).or_else(|| {
  2318. Bolt11Invoice::from_str(&request)
  2319. .ok()
  2320. .map(|invoice| invoice.payment_hash().to_string())
  2321. });
  2322. let request_lookup_id = if let (Some(id_kind), Some(request_lookup_id)) =
  2323. (request_lookup_id_kind, request_lookup_id)
  2324. {
  2325. Some(
  2326. PaymentIdentifier::new(&id_kind, &request_lookup_id)
  2327. .map_err(|_| ConversionError::MissingParameter("Payment id".to_string()))?,
  2328. )
  2329. } else {
  2330. None
  2331. };
  2332. let request = match serde_json::from_str(&request) {
  2333. Ok(req) => req,
  2334. Err(err) => {
  2335. tracing::debug!(
  2336. "Melt quote from pre migrations defaulting to bolt11 {}.",
  2337. err
  2338. );
  2339. let bolt11 = Bolt11Invoice::from_str(&request)
  2340. .map_err(|e| Error::Internal(format!("Could not parse invoice: {e}")))?;
  2341. MeltPaymentRequest::Bolt11 { bolt11 }
  2342. }
  2343. };
  2344. Ok(MeltQuote {
  2345. id: QuoteId::from_str(&id)?,
  2346. unit: CurrencyUnit::from_str(&unit)?,
  2347. amount: Amount::from(amount),
  2348. request,
  2349. fee_reserve: Amount::from(fee_reserve),
  2350. state,
  2351. expiry,
  2352. payment_preimage,
  2353. request_lookup_id,
  2354. options,
  2355. created_time: created_time as u64,
  2356. paid_time,
  2357. payment_method,
  2358. })
  2359. }
  2360. fn sql_row_to_proof(row: Vec<Column>) -> Result<Proof, Error> {
  2361. unpack_into!(
  2362. let (
  2363. amount,
  2364. keyset_id,
  2365. secret,
  2366. c,
  2367. witness
  2368. ) = row
  2369. );
  2370. let amount: u64 = column_as_number!(amount);
  2371. Ok(Proof {
  2372. amount: Amount::from(amount),
  2373. keyset_id: column_as_string!(keyset_id, Id::from_str),
  2374. secret: column_as_string!(secret, Secret::from_str),
  2375. c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
  2376. witness: column_as_nullable_string!(witness).and_then(|w| serde_json::from_str(&w).ok()),
  2377. dleq: None,
  2378. })
  2379. }
  2380. fn sql_row_to_hashmap_amount(row: Vec<Column>) -> Result<(Id, Amount), Error> {
  2381. unpack_into!(
  2382. let (
  2383. keyset_id, amount
  2384. ) = row
  2385. );
  2386. let amount: u64 = column_as_number!(amount);
  2387. Ok((
  2388. column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
  2389. Amount::from(amount),
  2390. ))
  2391. }
  2392. fn sql_row_to_proof_with_state(row: Vec<Column>) -> Result<(Proof, Option<State>), Error> {
  2393. unpack_into!(
  2394. let (
  2395. keyset_id, amount, secret, c, witness, state
  2396. ) = row
  2397. );
  2398. let amount: u64 = column_as_number!(amount);
  2399. let state = column_as_nullable_string!(state).and_then(|s| State::from_str(&s).ok());
  2400. Ok((
  2401. Proof {
  2402. amount: Amount::from(amount),
  2403. keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
  2404. secret: column_as_string!(secret, Secret::from_str),
  2405. c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
  2406. witness: column_as_nullable_string!(witness)
  2407. .and_then(|w| serde_json::from_str(&w).ok()),
  2408. dleq: None,
  2409. },
  2410. state,
  2411. ))
  2412. }
  2413. fn sql_row_to_blind_signature(row: Vec<Column>) -> Result<BlindSignature, Error> {
  2414. unpack_into!(
  2415. let (
  2416. keyset_id, amount, c, dleq_e, dleq_s
  2417. ) = row
  2418. );
  2419. let dleq = match (
  2420. column_as_nullable_string!(dleq_e),
  2421. column_as_nullable_string!(dleq_s),
  2422. ) {
  2423. (Some(e), Some(s)) => Some(BlindSignatureDleq {
  2424. e: SecretKey::from_hex(e)?,
  2425. s: SecretKey::from_hex(s)?,
  2426. }),
  2427. _ => None,
  2428. };
  2429. let amount: u64 = column_as_number!(amount);
  2430. Ok(BlindSignature {
  2431. amount: Amount::from(amount),
  2432. keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
  2433. c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
  2434. dleq,
  2435. })
  2436. }
  2437. fn sql_row_to_saga(row: Vec<Column>) -> Result<mint::Saga, Error> {
  2438. unpack_into!(
  2439. let (
  2440. operation_id,
  2441. operation_kind,
  2442. state,
  2443. blinded_secrets,
  2444. input_ys,
  2445. quote_id,
  2446. created_at,
  2447. updated_at
  2448. ) = row
  2449. );
  2450. let operation_id_str = column_as_string!(&operation_id);
  2451. let operation_id = uuid::Uuid::parse_str(&operation_id_str)
  2452. .map_err(|e| Error::Internal(format!("Invalid operation_id UUID: {e}")))?;
  2453. let operation_kind_str = column_as_string!(&operation_kind);
  2454. let operation_kind = mint::OperationKind::from_str(&operation_kind_str)
  2455. .map_err(|e| Error::Internal(format!("Invalid operation kind: {e}")))?;
  2456. let state_str = column_as_string!(&state);
  2457. let state = mint::SagaStateEnum::new(operation_kind, &state_str)
  2458. .map_err(|e| Error::Internal(format!("Invalid saga state: {e}")))?;
  2459. let blinded_secrets_str = column_as_string!(&blinded_secrets);
  2460. let blinded_secrets: Vec<PublicKey> = serde_json::from_str(&blinded_secrets_str)
  2461. .map_err(|e| Error::Internal(format!("Failed to deserialize blinded_secrets: {e}")))?;
  2462. let input_ys_str = column_as_string!(&input_ys);
  2463. let input_ys: Vec<PublicKey> = serde_json::from_str(&input_ys_str)
  2464. .map_err(|e| Error::Internal(format!("Failed to deserialize input_ys: {e}")))?;
  2465. let quote_id = match &quote_id {
  2466. Column::Text(s) => {
  2467. if s.is_empty() {
  2468. None
  2469. } else {
  2470. Some(s.clone())
  2471. }
  2472. }
  2473. Column::Null => None,
  2474. _ => None,
  2475. };
  2476. let created_at: u64 = column_as_number!(created_at);
  2477. let updated_at: u64 = column_as_number!(updated_at);
  2478. Ok(mint::Saga {
  2479. operation_id,
  2480. operation_kind,
  2481. state,
  2482. blinded_secrets,
  2483. input_ys,
  2484. quote_id,
  2485. created_at,
  2486. updated_at,
  2487. })
  2488. }
  2489. fn sql_row_to_completed_operation(row: Vec<Column>) -> Result<mint::Operation, Error> {
  2490. unpack_into!(
  2491. let (
  2492. operation_id,
  2493. operation_kind,
  2494. completed_at,
  2495. total_issued,
  2496. total_redeemed,
  2497. fee_collected,
  2498. payment_method
  2499. ) = row
  2500. );
  2501. let operation_id_str = column_as_string!(&operation_id);
  2502. let operation_id = uuid::Uuid::parse_str(&operation_id_str)
  2503. .map_err(|e| Error::Internal(format!("Invalid operation_id UUID: {e}")))?;
  2504. let operation_kind_str = column_as_string!(&operation_kind);
  2505. let operation_kind = mint::OperationKind::from_str(&operation_kind_str)
  2506. .map_err(|e| Error::Internal(format!("Invalid operation kind: {e}")))?;
  2507. let completed_at: u64 = column_as_number!(completed_at);
  2508. let total_issued_u64: u64 = column_as_number!(total_issued);
  2509. let total_redeemed_u64: u64 = column_as_number!(total_redeemed);
  2510. let fee_collected_u64: u64 = column_as_number!(fee_collected);
  2511. let total_issued = Amount::from(total_issued_u64);
  2512. let total_redeemed = Amount::from(total_redeemed_u64);
  2513. let fee_collected = Amount::from(fee_collected_u64);
  2514. let payment_method = column_as_nullable_string!(payment_method)
  2515. .map(|s| PaymentMethod::from_str(&s))
  2516. .transpose()
  2517. .map_err(|e| Error::Internal(format!("Invalid payment method: {e}")))?;
  2518. Ok(mint::Operation::new(
  2519. operation_id,
  2520. operation_kind,
  2521. total_issued,
  2522. total_redeemed,
  2523. fee_collected,
  2524. Some(completed_at),
  2525. payment_method,
  2526. ))
  2527. }
  2528. #[cfg(test)]
  2529. mod test {
  2530. use super::*;
  2531. mod keyset_amounts_tests {
  2532. use super::*;
  2533. #[test]
  2534. fn keyset_with_amounts() {
  2535. let amounts = (0..32).map(|x| 2u64.pow(x)).collect::<Vec<_>>();
  2536. let result = sql_row_to_keyset_info(vec![
  2537. Column::Text("0083a60439303340".to_owned()),
  2538. Column::Text("sat".to_owned()),
  2539. Column::Integer(1),
  2540. Column::Integer(1749844864),
  2541. Column::Null,
  2542. Column::Text("0'/0'/0'".to_owned()),
  2543. Column::Integer(0),
  2544. Column::Text(serde_json::to_string(&amounts).expect("valid json")),
  2545. Column::Integer(0),
  2546. ]);
  2547. assert!(result.is_ok());
  2548. let keyset = result.unwrap();
  2549. assert_eq!(keyset.amounts.len(), 32);
  2550. }
  2551. }
  2552. }