mod.rs 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155
  1. //! Cashu Mint
  2. use std::collections::HashMap;
  3. use std::sync::Arc;
  4. use std::time::Duration;
  5. use arc_swap::ArcSwap;
  6. use cdk_common::amount::to_unit;
  7. use cdk_common::common::{PaymentProcessorKey, QuoteTTL};
  8. #[cfg(feature = "auth")]
  9. use cdk_common::database::DynMintAuthDatabase;
  10. use cdk_common::database::{self, DynMintDatabase};
  11. use cdk_common::nuts::{self, BlindSignature, BlindedMessage, CurrencyUnit, Id, Kind};
  12. use cdk_common::payment::{DynMintPayment, WaitPaymentResponse};
  13. pub use cdk_common::quote_id::QuoteId;
  14. use cdk_common::secret;
  15. #[cfg(feature = "prometheus")]
  16. use cdk_prometheus::global;
  17. use cdk_signatory::signatory::{Signatory, SignatoryKeySet};
  18. use futures::StreamExt;
  19. #[cfg(feature = "auth")]
  20. use nut21::ProtectedEndpoint;
  21. use subscription::PubSubManager;
  22. use tokio::sync::{Mutex, Notify};
  23. use tokio::task::{JoinHandle, JoinSet};
  24. use tracing::instrument;
  25. use crate::error::Error;
  26. use crate::fees::calculate_fee;
  27. use crate::nuts::*;
  28. use crate::Amount;
  29. #[cfg(feature = "auth")]
  30. use crate::OidcClient;
  31. #[cfg(feature = "auth")]
  32. pub(crate) mod auth;
  33. mod builder;
  34. mod check_spendable;
  35. mod issue;
  36. mod keysets;
  37. mod ln;
  38. mod melt;
  39. mod start_up_check;
  40. mod subscription;
  41. mod swap;
  42. mod verification;
  43. pub use builder::{MintBuilder, MintMeltLimits};
  44. pub use cdk_common::mint::{MeltQuote, MintKeySetInfo, MintQuote};
  45. pub use verification::Verification;
  46. const CDK_MINT_PRIMARY_NAMESPACE: &str = "cdk_mint";
  47. const CDK_MINT_CONFIG_SECONDARY_NAMESPACE: &str = "config";
  48. const CDK_MINT_CONFIG_KV_KEY: &str = "mint_info";
  49. const CDK_MINT_QUOTE_TTL_KV_KEY: &str = "quote_ttl";
  50. /// Cashu Mint
  51. #[derive(Clone)]
  52. pub struct Mint {
  53. /// Signatory backend.
  54. ///
  55. /// It is implemented in the cdk-signatory crate, and it can be embedded in the mint or it can
  56. /// be a gRPC client to a remote signatory server.
  57. signatory: Arc<dyn Signatory + Send + Sync>,
  58. /// Mint Storage backend
  59. localstore: DynMintDatabase,
  60. /// Auth Storage backend (only available with auth feature)
  61. #[cfg(feature = "auth")]
  62. auth_localstore: Option<DynMintAuthDatabase>,
  63. /// Payment processors for mint
  64. payment_processors: Arc<HashMap<PaymentProcessorKey, DynMintPayment>>,
  65. /// Subscription manager
  66. pubsub_manager: Arc<PubSubManager>,
  67. #[cfg(feature = "auth")]
  68. oidc_client: Option<OidcClient>,
  69. /// In-memory keyset
  70. keysets: Arc<ArcSwap<Vec<SignatoryKeySet>>>,
  71. /// Background task management
  72. task_state: Arc<Mutex<TaskState>>,
  73. }
  74. /// State for managing background tasks
  75. #[derive(Default)]
  76. struct TaskState {
  77. /// Shutdown signal for all background tasks
  78. shutdown_notify: Option<Arc<Notify>>,
  79. /// Handle to the main supervisor task
  80. supervisor_handle: Option<JoinHandle<Result<(), Error>>>,
  81. }
  82. impl Mint {
  83. /// Create new [`Mint`] without authentication
  84. pub async fn new(
  85. mint_info: MintInfo,
  86. signatory: Arc<dyn Signatory + Send + Sync>,
  87. localstore: DynMintDatabase,
  88. payment_processors: HashMap<PaymentProcessorKey, DynMintPayment>,
  89. ) -> Result<Self, Error> {
  90. Self::new_internal(
  91. mint_info,
  92. signatory,
  93. localstore,
  94. #[cfg(feature = "auth")]
  95. None,
  96. payment_processors,
  97. )
  98. .await
  99. }
  /// Builds a [`Mint`] with an auth database attached.
  ///
  /// Identical to [`Mint::new`] except that `auth_localstore` is handed to
  /// `new_internal` as `Some(..)`.
  ///
  /// # Errors
  ///
  /// Returns whatever error `new_internal` reports.
  #[cfg(feature = "auth")]
  pub async fn new_with_auth(
      mint_info: MintInfo,
      signatory: Arc<dyn Signatory + Send + Sync>,
      localstore: DynMintDatabase,
      auth_localstore: DynMintAuthDatabase,
      payment_processors: HashMap<PaymentProcessorKey, DynMintPayment>,
  ) -> Result<Self, Error> {
      let auth_store = Some(auth_localstore);
      let mint = Self::new_internal(
          mint_info,
          signatory,
          localstore,
          auth_store,
          payment_processors,
      )
      .await?;

      Ok(mint)
  }
  118. /// Internal function to create a new [`Mint`] with shared logic
  119. #[inline]
  120. async fn new_internal(
  121. mint_info: MintInfo,
  122. signatory: Arc<dyn Signatory + Send + Sync>,
  123. localstore: DynMintDatabase,
  124. #[cfg(feature = "auth")] auth_localstore: Option<DynMintAuthDatabase>,
  125. payment_processors: HashMap<PaymentProcessorKey, DynMintPayment>,
  126. ) -> Result<Self, Error> {
  127. let keysets = signatory.keysets().await?;
  128. if !keysets
  129. .keysets
  130. .iter()
  131. .any(|keyset| keyset.active && keyset.unit != CurrencyUnit::Auth)
  132. {
  133. return Err(Error::NoActiveKeyset);
  134. }
  135. tracing::info!(
  136. "Using Signatory {} with {} active keys",
  137. signatory.name(),
  138. keysets
  139. .keysets
  140. .iter()
  141. .filter(|keyset| keyset.active && keyset.unit != CurrencyUnit::Auth)
  142. .count()
  143. );
  144. // Persist missing pubkey early to avoid losing it on next boot and ensure stable identity across restarts
  145. let mut computed_info = mint_info;
  146. if computed_info.pubkey.is_none() {
  147. computed_info.pubkey = Some(keysets.pubkey);
  148. }
  149. match localstore
  150. .kv_read(
  151. CDK_MINT_PRIMARY_NAMESPACE,
  152. CDK_MINT_CONFIG_SECONDARY_NAMESPACE,
  153. CDK_MINT_CONFIG_KV_KEY,
  154. )
  155. .await?
  156. {
  157. Some(bytes) => {
  158. let mut stored: MintInfo = serde_json::from_slice(&bytes)?;
  159. let mut mutated = false;
  160. if stored.pubkey.is_none() && computed_info.pubkey.is_some() {
  161. stored.pubkey = computed_info.pubkey;
  162. mutated = true;
  163. }
  164. if mutated {
  165. let updated = serde_json::to_vec(&stored)?;
  166. let mut tx = localstore.begin_transaction().await?;
  167. tx.kv_write(
  168. CDK_MINT_PRIMARY_NAMESPACE,
  169. CDK_MINT_CONFIG_SECONDARY_NAMESPACE,
  170. CDK_MINT_CONFIG_KV_KEY,
  171. &updated,
  172. )
  173. .await?;
  174. tx.commit().await?;
  175. }
  176. }
  177. None => {
  178. let bytes = serde_json::to_vec(&computed_info)?;
  179. let mut tx = localstore.begin_transaction().await?;
  180. tx.kv_write(
  181. CDK_MINT_PRIMARY_NAMESPACE,
  182. CDK_MINT_CONFIG_SECONDARY_NAMESPACE,
  183. CDK_MINT_CONFIG_KV_KEY,
  184. &bytes,
  185. )
  186. .await?;
  187. tx.commit().await?;
  188. }
  189. }
  190. let payment_processors = Arc::new(payment_processors);
  191. Ok(Self {
  192. signatory,
  193. pubsub_manager: PubSubManager::new((localstore.clone(), payment_processors.clone())),
  194. localstore,
  195. #[cfg(feature = "auth")]
  196. oidc_client: computed_info.nuts.nut21.as_ref().map(|nut21| {
  197. OidcClient::new(
  198. nut21.openid_discovery.clone(),
  199. Some(nut21.client_id.clone()),
  200. )
  201. }),
  202. payment_processors,
  203. #[cfg(feature = "auth")]
  204. auth_localstore,
  205. keysets: Arc::new(ArcSwap::new(keysets.keysets.into())),
  206. task_state: Arc::new(Mutex::new(TaskState::default())),
  207. })
  208. }
  209. /// Start the mint's background services and operations
  210. ///
  211. /// This function immediately starts background services and returns. The background
  212. /// tasks will continue running until `stop()` is called.
  213. ///
  214. /// # Returns
  215. ///
  216. /// Returns `Ok(())` if background services started successfully, or an `Error`
  217. /// if startup failed.
  218. ///
  219. /// # Background Services
  220. ///
  221. /// Currently manages:
  222. /// - Payment processor initialization and startup
  223. /// - Invoice payment monitoring across all configured payment processors
  224. pub async fn start(&self) -> Result<(), Error> {
  225. // Recover from incomplete swap sagas
  226. // This cleans up incomplete swap operations using persisted saga state
  227. if let Err(e) = self.recover_from_incomplete_sagas().await {
  228. tracing::error!("Failed to recover incomplete swap sagas: {}", e);
  229. // Don't fail startup
  230. }
  231. // Recover from incomplete melt sagas
  232. // This cleans up incomplete melt operations using persisted saga state
  233. // Now includes checking payment status with LN backend to determine
  234. // whether to finalize (if paid) or compensate (if failed/unpaid)
  235. if let Err(e) = self.recover_from_incomplete_melt_sagas().await {
  236. tracing::error!("Failed to recover incomplete melt sagas: {}", e);
  237. // Don't fail startup
  238. }
  239. let mut task_state = self.task_state.lock().await;
  240. // Prevent starting if already running
  241. if task_state.shutdown_notify.is_some() {
  242. return Err(Error::Internal); // Already started
  243. }
  244. // Start all payment processors first
  245. tracing::info!("Starting payment processors...");
  246. let mut seen_processors = Vec::new();
  247. for (key, processor) in self.payment_processors.iter() {
  248. // Skip if we've already spawned a task for this processor instance
  249. if seen_processors.iter().any(|p| Arc::ptr_eq(p, processor)) {
  250. continue;
  251. }
  252. seen_processors.push(Arc::clone(processor));
  253. tracing::info!("Starting payment wait task for {:?}", key);
  254. match processor.start().await {
  255. Ok(()) => {
  256. tracing::debug!("Successfully started payment processor for {:?}", key);
  257. }
  258. Err(e) => {
  259. // Log the error but continue with other processors
  260. tracing::error!("Failed to start payment processor for {:?}: {}", key, e);
  261. return Err(e.into());
  262. }
  263. }
  264. }
  265. tracing::info!("Payment processor startup completed");
  266. // Create shutdown signal
  267. let shutdown_notify = Arc::new(Notify::new());
  268. // Clone required components for the background task
  269. let payment_processors = self.payment_processors.clone();
  270. let localstore = Arc::clone(&self.localstore);
  271. let pubsub_manager = Arc::clone(&self.pubsub_manager);
  272. let shutdown_clone = shutdown_notify.clone();
  273. // Spawn the supervisor task
  274. let supervisor_handle = tokio::spawn(async move {
  275. Self::wait_for_paid_invoices(
  276. &payment_processors,
  277. localstore,
  278. pubsub_manager,
  279. shutdown_clone,
  280. )
  281. .await
  282. });
  283. // Store the handles
  284. task_state.shutdown_notify = Some(shutdown_notify);
  285. task_state.supervisor_handle = Some(supervisor_handle);
  286. // Give the background task a tiny bit of time to start waiting
  287. tokio::time::sleep(std::time::Duration::from_millis(10)).await;
  288. tracing::info!("Mint background services started");
  289. Ok(())
  290. }
  291. /// Stop all background services and wait for graceful shutdown
  292. ///
  293. /// This function signals all background tasks to shut down and waits for them
  294. /// to complete gracefully. It's safe to call multiple times.
  295. ///
  296. /// # Returns
  297. ///
  298. /// Returns `Ok(())` when all background services have shut down cleanly, or an
  299. /// `Error` if there was an issue during shutdown.
  300. pub async fn stop(&self) -> Result<(), Error> {
  301. let mut task_state = self.task_state.lock().await;
  302. // Take the handles out of the state
  303. let shutdown_notify = task_state.shutdown_notify.take();
  304. let supervisor_handle = task_state.supervisor_handle.take();
  305. // If nothing to stop, return early
  306. let (shutdown_notify, supervisor_handle) = match (shutdown_notify, supervisor_handle) {
  307. (Some(notify), Some(handle)) => (notify, handle),
  308. _ => {
  309. tracing::debug!("Stop called but no background services were running");
  310. // Still try to stop payment processors
  311. return self.stop_payment_processors().await;
  312. }
  313. };
  314. // Drop the lock before waiting
  315. drop(task_state);
  316. tracing::info!("Stopping mint background services...");
  317. // Signal shutdown
  318. shutdown_notify.notify_waiters();
  319. // Wait for supervisor to complete
  320. let result = match supervisor_handle.await {
  321. Ok(result) => {
  322. tracing::info!("Mint background services stopped");
  323. result
  324. }
  325. Err(join_error) => {
  326. tracing::error!("Background service task panicked: {:?}", join_error);
  327. Err(Error::Internal)
  328. }
  329. };
  330. // Stop all payment processors
  331. self.stop_payment_processors().await?;
  332. result
  333. }
  334. /// Stop all payment processors
  335. async fn stop_payment_processors(&self) -> Result<(), Error> {
  336. tracing::info!("Stopping payment processors...");
  337. let mut seen_processors = Vec::new();
  338. for (key, processor) in self.payment_processors.iter() {
  339. // Skip if we've already spawned a task for this processor instance
  340. if seen_processors.iter().any(|p| Arc::ptr_eq(p, processor)) {
  341. continue;
  342. }
  343. seen_processors.push(Arc::clone(processor));
  344. match processor.stop().await {
  345. Ok(()) => {
  346. tracing::debug!("Successfully stopped payment processor for {:?}", key);
  347. }
  348. Err(e) => {
  349. // Log the error but continue with other processors
  350. tracing::error!("Failed to stop payment processor for {:?}: {}", key, e);
  351. }
  352. }
  353. }
  354. tracing::info!("Payment processor shutdown completed");
  355. Ok(())
  356. }
  357. /// Get the payment processor for the given unit and payment method
  358. pub fn get_payment_processor(
  359. &self,
  360. unit: CurrencyUnit,
  361. payment_method: PaymentMethod,
  362. ) -> Result<DynMintPayment, Error> {
  363. let key = PaymentProcessorKey::new(unit.clone(), payment_method.clone());
  364. self.payment_processors.get(&key).cloned().ok_or_else(|| {
  365. tracing::info!(
  366. "No payment processor set for pair {}, {}",
  367. unit,
  368. payment_method
  369. );
  370. Error::UnsupportedUnit
  371. })
  372. }
  373. /// Localstore
  374. pub fn localstore(&self) -> DynMintDatabase {
  375. Arc::clone(&self.localstore)
  376. }
  377. /// Pub Sub manager
  378. pub fn pubsub_manager(&self) -> Arc<PubSubManager> {
  379. Arc::clone(&self.pubsub_manager)
  380. }
  381. /// Get mint info
  382. #[instrument(skip_all)]
  383. pub async fn mint_info(&self) -> Result<MintInfo, Error> {
  384. let mint_info = self
  385. .localstore
  386. .kv_read(
  387. CDK_MINT_PRIMARY_NAMESPACE,
  388. CDK_MINT_CONFIG_SECONDARY_NAMESPACE,
  389. CDK_MINT_CONFIG_KV_KEY,
  390. )
  391. .await?
  392. .ok_or(Error::CouldNotGetMintInfo)?;
  393. let mint_info: MintInfo = serde_json::from_slice(&mint_info)?;
  394. #[cfg(feature = "auth")]
  395. let mint_info = if let Some(auth_db) = self.auth_localstore.as_ref() {
  396. let mut mint_info = mint_info;
  397. let auth_endpoints = auth_db.get_auth_for_endpoints().await?;
  398. let mut clear_auth_endpoints: Vec<ProtectedEndpoint> = vec![];
  399. let mut blind_auth_endpoints: Vec<ProtectedEndpoint> = vec![];
  400. for (endpoint, auth) in auth_endpoints {
  401. match auth {
  402. Some(AuthRequired::Clear) => {
  403. clear_auth_endpoints.push(endpoint);
  404. }
  405. Some(AuthRequired::Blind) => {
  406. blind_auth_endpoints.push(endpoint);
  407. }
  408. None => (),
  409. }
  410. }
  411. mint_info.nuts.nut21 = mint_info.nuts.nut21.map(|mut a| {
  412. a.protected_endpoints = clear_auth_endpoints;
  413. a
  414. });
  415. mint_info.nuts.nut22 = mint_info.nuts.nut22.map(|mut a| {
  416. a.protected_endpoints = blind_auth_endpoints;
  417. a
  418. });
  419. mint_info
  420. } else {
  421. mint_info
  422. };
  423. Ok(mint_info)
  424. }
  425. /// Set mint info
  426. #[instrument(skip_all)]
  427. pub async fn set_mint_info(&self, mint_info: MintInfo) -> Result<(), Error> {
  428. tracing::info!("Updating mint info");
  429. let mint_info_bytes = serde_json::to_vec(&mint_info)?;
  430. let mut tx = self.localstore.begin_transaction().await?;
  431. tx.kv_write(
  432. CDK_MINT_PRIMARY_NAMESPACE,
  433. CDK_MINT_CONFIG_SECONDARY_NAMESPACE,
  434. CDK_MINT_CONFIG_KV_KEY,
  435. &mint_info_bytes,
  436. )
  437. .await?;
  438. tx.commit().await?;
  439. Ok(())
  440. }
  441. /// Get quote ttl
  442. #[instrument(skip_all)]
  443. pub async fn quote_ttl(&self) -> Result<QuoteTTL, Error> {
  444. let quote_ttl_bytes = self
  445. .localstore
  446. .kv_read(
  447. CDK_MINT_PRIMARY_NAMESPACE,
  448. CDK_MINT_CONFIG_SECONDARY_NAMESPACE,
  449. CDK_MINT_QUOTE_TTL_KV_KEY,
  450. )
  451. .await?;
  452. match quote_ttl_bytes {
  453. Some(bytes) => {
  454. let quote_ttl: QuoteTTL = serde_json::from_slice(&bytes)?;
  455. Ok(quote_ttl)
  456. }
  457. None => {
  458. // Return default if not found
  459. Ok(QuoteTTL::default())
  460. }
  461. }
  462. }
  463. /// Set quote ttl
  464. #[instrument(skip_all)]
  465. pub async fn set_quote_ttl(&self, quote_ttl: QuoteTTL) -> Result<(), Error> {
  466. let quote_ttl_bytes = serde_json::to_vec(&quote_ttl)?;
  467. let mut tx = self.localstore.begin_transaction().await?;
  468. tx.kv_write(
  469. CDK_MINT_PRIMARY_NAMESPACE,
  470. CDK_MINT_CONFIG_SECONDARY_NAMESPACE,
  471. CDK_MINT_QUOTE_TTL_KV_KEY,
  472. &quote_ttl_bytes,
  473. )
  474. .await?;
  475. tx.commit().await?;
  476. Ok(())
  477. }
  478. /// For each backend starts a task that waits for any invoice to be paid
  479. /// Once invoice is paid mint quote status is updated
  480. /// Returns true if a QuoteTTL is persisted in the database. This is used to avoid overwriting
  481. /// explicit configuration with defaults when the TTL has already been set by an operator.
  482. #[instrument(skip_all)]
  483. pub async fn quote_ttl_is_persisted(&self) -> Result<bool, Error> {
  484. let quote_ttl_bytes = self
  485. .localstore
  486. .kv_read(
  487. CDK_MINT_PRIMARY_NAMESPACE,
  488. CDK_MINT_CONFIG_SECONDARY_NAMESPACE,
  489. CDK_MINT_QUOTE_TTL_KV_KEY,
  490. )
  491. .await?;
  492. Ok(quote_ttl_bytes.is_some())
  493. }
  494. #[instrument(skip_all)]
  495. async fn wait_for_paid_invoices(
  496. payment_processors: &HashMap<PaymentProcessorKey, DynMintPayment>,
  497. localstore: DynMintDatabase,
  498. pubsub_manager: Arc<PubSubManager>,
  499. shutdown: Arc<Notify>,
  500. ) -> Result<(), Error> {
  501. let mut join_set = JoinSet::new();
  502. // Group processors by unique instance (using Arc pointer equality)
  503. let mut seen_processors = Vec::new();
  504. for (key, processor) in payment_processors {
  505. // Skip if processor is already active
  506. if processor.is_wait_invoice_active() {
  507. continue;
  508. }
  509. // Skip if we've already spawned a task for this processor instance
  510. if seen_processors.iter().any(|p| Arc::ptr_eq(p, processor)) {
  511. continue;
  512. }
  513. seen_processors.push(Arc::clone(processor));
  514. tracing::info!("Starting payment wait task for {:?}", key);
  515. // Clone for the spawned task
  516. let processor = Arc::clone(processor);
  517. let localstore = Arc::clone(&localstore);
  518. let pubsub_manager = Arc::clone(&pubsub_manager);
  519. let shutdown = Arc::clone(&shutdown);
  520. join_set.spawn(async move {
  521. let result = Self::wait_for_processor_payments(
  522. processor,
  523. localstore,
  524. pubsub_manager,
  525. shutdown,
  526. )
  527. .await;
  528. if let Err(e) = result {
  529. tracing::error!("Payment processor task failed: {:?}", e);
  530. }
  531. });
  532. }
  533. // If no payment processors, just wait for shutdown
  534. if join_set.is_empty() {
  535. shutdown.notified().await;
  536. } else {
  537. // Wait for shutdown or all tasks to complete
  538. loop {
  539. tokio::select! {
  540. _ = shutdown.notified() => {
  541. tracing::info!("Shutting down payment processors");
  542. break;
  543. }
  544. Some(result) = join_set.join_next() => {
  545. if let Err(e) = result {
  546. tracing::warn!("Task panicked: {:?}", e);
  547. }
  548. }
  549. else => break, // All tasks completed
  550. }
  551. }
  552. }
  553. join_set.shutdown().await;
  554. Ok(())
  555. }
  556. /// Handles payment waiting for a single processor
  557. #[instrument(skip_all)]
  558. async fn wait_for_processor_payments(
  559. processor: DynMintPayment,
  560. localstore: DynMintDatabase,
  561. pubsub_manager: Arc<PubSubManager>,
  562. shutdown: Arc<Notify>,
  563. ) -> Result<(), Error> {
  564. loop {
  565. tokio::select! {
  566. _ = shutdown.notified() => {
  567. processor.cancel_wait_invoice();
  568. break;
  569. }
  570. result = processor.wait_payment_event() => {
  571. match result {
  572. Ok(mut stream) => {
  573. while let Some(event) = stream.next().await {
  574. match event {
  575. cdk_common::payment::Event::PaymentReceived(wait_payment_response) => {
  576. if let Err(e) = Self::handle_payment_notification(
  577. &localstore,
  578. &pubsub_manager,
  579. wait_payment_response,
  580. ).await {
  581. tracing::warn!("Payment notification error: {:?}", e);
  582. }
  583. }
  584. }
  585. }
  586. }
  587. Err(e) => {
  588. tracing::warn!("Failed to get payment stream: {}", e);
  589. tokio::time::sleep(Duration::from_secs(5)).await;
  590. }
  591. }
  592. }
  593. }
  594. }
  595. Ok(())
  596. }
  597. /// Handle payment notification without needing full Mint instance
  598. /// This is a helper function that can be called with just the required components
  599. #[instrument(skip_all)]
  600. async fn handle_payment_notification(
  601. localstore: &DynMintDatabase,
  602. pubsub_manager: &Arc<PubSubManager>,
  603. wait_payment_response: WaitPaymentResponse,
  604. ) -> Result<(), Error> {
  605. if wait_payment_response.payment_amount == Amount::ZERO {
  606. tracing::warn!(
  607. "Received payment response with 0 amount with payment id {}.",
  608. wait_payment_response.payment_id
  609. );
  610. return Err(Error::AmountUndefined);
  611. }
  612. let mut tx = localstore.begin_transaction().await?;
  613. if let Ok(Some(mint_quote)) = tx
  614. .get_mint_quote_by_request_lookup_id(&wait_payment_response.payment_identifier)
  615. .await
  616. {
  617. Self::handle_mint_quote_payment(
  618. &mut tx,
  619. &mint_quote,
  620. wait_payment_response,
  621. pubsub_manager,
  622. )
  623. .await?;
  624. } else {
  625. tracing::warn!(
  626. "Could not get request for request lookup id {:?}",
  627. wait_payment_response.payment_identifier
  628. );
  629. }
  630. tx.commit().await?;
  631. Ok(())
  632. }
  633. /// Handle payment for a specific mint quote (extracted from pay_mint_quote)
  634. #[instrument(skip_all)]
  635. async fn handle_mint_quote_payment(
  636. tx: &mut Box<dyn database::MintTransaction<'_, database::Error> + Send + Sync + '_>,
  637. mint_quote: &MintQuote,
  638. wait_payment_response: WaitPaymentResponse,
  639. pubsub_manager: &Arc<PubSubManager>,
  640. ) -> Result<(), Error> {
  641. tracing::debug!(
  642. "Received payment notification of {} {} for mint quote {} with payment id {}",
  643. wait_payment_response.payment_amount,
  644. wait_payment_response.unit,
  645. mint_quote.id,
  646. wait_payment_response.payment_id.to_string()
  647. );
  648. let quote_state = mint_quote.state();
  649. if !mint_quote
  650. .payment_ids()
  651. .contains(&&wait_payment_response.payment_id)
  652. {
  653. if mint_quote.payment_method == PaymentMethod::Bolt11
  654. && (quote_state == MintQuoteState::Issued || quote_state == MintQuoteState::Paid)
  655. {
  656. tracing::info!("Received payment notification for already issued quote.");
  657. } else {
  658. let payment_amount_quote_unit = to_unit(
  659. wait_payment_response.payment_amount,
  660. &wait_payment_response.unit,
  661. &mint_quote.unit,
  662. )?;
  663. if payment_amount_quote_unit == Amount::ZERO {
  664. tracing::error!("Zero amount payments should not be recorded.");
  665. return Err(Error::AmountUndefined);
  666. }
  667. tracing::debug!(
  668. "Payment received amount in quote unit {} {}",
  669. mint_quote.unit,
  670. payment_amount_quote_unit
  671. );
  672. let total_paid = tx
  673. .increment_mint_quote_amount_paid(
  674. &mint_quote.id,
  675. payment_amount_quote_unit,
  676. wait_payment_response.payment_id,
  677. )
  678. .await?;
  679. pubsub_manager.mint_quote_payment(mint_quote, total_paid);
  680. }
  681. } else {
  682. tracing::info!("Received payment notification for already seen payment.");
  683. }
  684. Ok(())
  685. }
  686. /// Fee required for proof set
  687. #[instrument(skip_all)]
  688. pub async fn get_proofs_fee(&self, proofs: &Proofs) -> Result<Amount, Error> {
  689. let mut proofs_per_keyset = HashMap::new();
  690. let mut fee_per_keyset = HashMap::new();
  691. for proof in proofs {
  692. if let std::collections::hash_map::Entry::Vacant(e) =
  693. fee_per_keyset.entry(proof.keyset_id)
  694. {
  695. let mint_keyset_info = self
  696. .get_keyset_info(&proof.keyset_id)
  697. .ok_or(Error::UnknownKeySet)?;
  698. e.insert(mint_keyset_info.input_fee_ppk);
  699. }
  700. proofs_per_keyset
  701. .entry(proof.keyset_id)
  702. .and_modify(|count| *count += 1)
  703. .or_insert(1);
  704. }
  705. let fee = calculate_fee(&proofs_per_keyset, &fee_per_keyset)?;
  706. Ok(fee)
  707. }
  708. /// Get active keysets
  709. pub fn get_active_keysets(&self) -> HashMap<CurrencyUnit, Id> {
  710. self.keysets
  711. .load()
  712. .iter()
  713. .filter_map(|keyset| {
  714. if keyset.active {
  715. Some((keyset.unit.clone(), keyset.id))
  716. } else {
  717. None
  718. }
  719. })
  720. .collect()
  721. }
  722. /// Get keyset info
  723. pub fn get_keyset_info(&self, id: &Id) -> Option<MintKeySetInfo> {
  724. self.keysets
  725. .load()
  726. .iter()
  727. .filter_map(|keyset| {
  728. if keyset.id == *id {
  729. Some(keyset.into())
  730. } else {
  731. None
  732. }
  733. })
  734. .next()
  735. }
  736. /// Blind Sign
  737. #[tracing::instrument(skip_all)]
  738. pub async fn blind_sign(
  739. &self,
  740. blinded_message: Vec<BlindedMessage>,
  741. ) -> Result<Vec<BlindSignature>, Error> {
  742. #[cfg(test)]
  743. {
  744. if crate::test_helpers::mint::should_fail_in_test() {
  745. return Err(Error::SignatureMissingOrInvalid);
  746. }
  747. }
  748. #[cfg(feature = "prometheus")]
  749. global::inc_in_flight_requests("blind_sign");
  750. let result = self.signatory.blind_sign(blinded_message).await;
  751. #[cfg(feature = "prometheus")]
  752. {
  753. global::dec_in_flight_requests("blind_sign");
  754. global::record_mint_operation("blind_sign", result.is_ok());
  755. }
  756. result
  757. }
  758. /// Verify [`Proof`] meets conditions and is signed
  759. #[tracing::instrument(skip_all)]
  760. pub async fn verify_proofs(&self, proofs: Proofs) -> Result<(), Error> {
  761. #[cfg(feature = "prometheus")]
  762. global::inc_in_flight_requests("verify_proofs");
  763. let result = async {
  764. proofs
  765. .iter()
  766. .map(|proof| {
  767. // Check if secret is a nut10 secret with conditions
  768. if let Ok(secret) =
  769. <&secret::Secret as TryInto<nuts::nut10::Secret>>::try_into(&proof.secret)
  770. {
  771. // Checks and verifies known secret kinds.
  772. // If it is an unknown secret kind it will be treated as a normal secret.
  773. // Spending conditions will **not** be check. It is up to the wallet to ensure
  774. // only supported secret kinds are used as there is no way for the mint to
  775. // enforce only signing supported secrets as they are blinded at
  776. // that point.
  777. match secret.kind() {
  778. Kind::P2PK => {
  779. proof.verify_p2pk()?;
  780. }
  781. Kind::HTLC => {
  782. proof.verify_htlc()?;
  783. }
  784. }
  785. }
  786. Ok(())
  787. })
  788. .collect::<Result<Vec<()>, Error>>()?;
  789. self.signatory.verify_proofs(proofs).await
  790. }
  791. .await;
  792. #[cfg(feature = "prometheus")]
  793. {
  794. global::dec_in_flight_requests("verify_proofs");
  795. global::record_mint_operation("verify_proofs", result.is_ok());
  796. }
  797. result
  798. }
  799. /// Restore
  800. #[instrument(skip_all)]
  801. pub async fn restore(&self, request: RestoreRequest) -> Result<RestoreResponse, Error> {
  802. #[cfg(feature = "prometheus")]
  803. global::inc_in_flight_requests("restore");
  804. let result = async {
  805. let output_len = request.outputs.len();
  806. let mut outputs = Vec::with_capacity(output_len);
  807. let mut signatures = Vec::with_capacity(output_len);
  808. let blinded_message: Vec<PublicKey> =
  809. request.outputs.iter().map(|b| b.blinded_secret).collect();
  810. let blinded_signatures = self
  811. .localstore
  812. .get_blind_signatures(&blinded_message)
  813. .await?;
  814. assert_eq!(blinded_signatures.len(), output_len);
  815. for (blinded_message, blinded_signature) in
  816. request.outputs.into_iter().zip(blinded_signatures)
  817. {
  818. if let Some(blinded_signature) = blinded_signature {
  819. outputs.push(blinded_message);
  820. signatures.push(blinded_signature);
  821. }
  822. }
  823. Ok(RestoreResponse {
  824. outputs,
  825. signatures: signatures.clone(),
  826. promises: Some(signatures),
  827. })
  828. }
  829. .await;
  830. #[cfg(feature = "prometheus")]
  831. {
  832. global::dec_in_flight_requests("restore");
  833. global::record_mint_operation("restore", result.is_ok());
  834. }
  835. result
  836. }
  837. /// Get the total amount issed by keyset
  838. #[instrument(skip_all)]
  839. pub async fn total_issued(&self) -> Result<HashMap<Id, Amount>, Error> {
  840. #[cfg(feature = "prometheus")]
  841. global::inc_in_flight_requests("total_issued");
  842. let result = async {
  843. let mut total_issued = self.localstore.get_total_issued().await?;
  844. for keyset in self.keysets().keysets {
  845. total_issued.entry(keyset.id).or_default();
  846. }
  847. Ok(total_issued)
  848. }
  849. .await;
  850. #[cfg(feature = "prometheus")]
  851. {
  852. global::dec_in_flight_requests("total_issued");
  853. global::record_mint_operation("total_issued", result.is_ok());
  854. }
  855. result
  856. }
  857. /// Total redeemed for keyset
  858. #[instrument(skip_all)]
  859. pub async fn total_redeemed(&self) -> Result<HashMap<Id, Amount>, Error> {
  860. #[cfg(feature = "prometheus")]
  861. global::inc_in_flight_requests("total_redeemed");
  862. let total_redeemed = async {
  863. let mut total_redeemed = self.localstore.get_total_redeemed().await?;
  864. for keyset in self.keysets().keysets {
  865. total_redeemed.entry(keyset.id).or_default();
  866. }
  867. Ok(total_redeemed)
  868. }
  869. .await;
  870. #[cfg(feature = "prometheus")]
  871. global::dec_in_flight_requests("total_redeemed");
  872. total_redeemed
  873. }
  874. }
  875. #[cfg(test)]
  876. mod tests {
  877. use std::str::FromStr;
  878. use cdk_sqlite::mint::memory::new_with_state;
  879. use super::*;
  /// Initial state from which `create_mint` builds a [`Mint`] for the tests in this module.
  ///
  /// All fields default to empty values, so a test only sets what it cares about.
  #[derive(Default)]
  struct MintConfig<'a> {
      /// Active keyset per currency unit; handed to `new_with_state`.
      active_keysets: HashMap<CurrencyUnit, Id>,
      /// Keyset metadata handed to `new_with_state`.
      keysets: Vec<MintKeySetInfo>,
      /// Mint quotes handed to `new_with_state`.
      mint_quotes: Vec<MintQuote>,
      /// Melt quotes handed to `new_with_state`.
      melt_quotes: Vec<MeltQuote>,
      /// Pending proofs handed to `new_with_state`.
      pending_proofs: Proofs,
      /// Spent proofs handed to `new_with_state`.
      spent_proofs: Proofs,
      /// Seed bytes handed to the `DbSignatory` constructor.
      seed: &'a [u8],
      /// Mint info handed to `new_with_state`. `create_mint` passes `MintInfo::default()`
      /// (not this value) to `Mint::new`.
      mint_info: MintInfo,
      /// Per-unit settings handed to the `DbSignatory` constructor.
      // NOTE(review): the meaning of the `(u64, u8)` pair is not visible here; confirm
      // against `DbSignatory::new`.
      supported_units: HashMap<CurrencyUnit, (u64, u8)>,
  }
  892. async fn create_mint(config: MintConfig<'_>) -> Mint {
  893. let localstore = Arc::new(
  894. new_with_state(
  895. config.active_keysets,
  896. config.keysets,
  897. config.mint_quotes,
  898. config.melt_quotes,
  899. config.pending_proofs,
  900. config.spent_proofs,
  901. config.mint_info,
  902. )
  903. .await
  904. .unwrap(),
  905. );
  906. let signatory = Arc::new(
  907. cdk_signatory::db_signatory::DbSignatory::new(
  908. localstore.clone(),
  909. config.seed,
  910. config.supported_units,
  911. HashMap::new(),
  912. )
  913. .await
  914. .expect("Failed to create signatory"),
  915. );
  916. Mint::new(MintInfo::default(), signatory, localstore, HashMap::new())
  917. .await
  918. .unwrap()
  919. }
  920. #[tokio::test]
  921. async fn mint_mod_new_mint() {
  922. let mut supported_units = HashMap::new();
  923. supported_units.insert(CurrencyUnit::default(), (0, 32));
  924. let config = MintConfig::<'_> {
  925. supported_units,
  926. ..Default::default()
  927. };
  928. let mint = create_mint(config).await;
  929. assert_eq!(
  930. mint.total_issued()
  931. .await
  932. .unwrap()
  933. .into_values()
  934. .collect::<Vec<_>>(),
  935. vec![Amount::default()]
  936. );
  937. assert_eq!(
  938. mint.total_issued()
  939. .await
  940. .unwrap()
  941. .into_values()
  942. .collect::<Vec<_>>(),
  943. vec![Amount::default()]
  944. );
  945. }
  946. #[tokio::test]
  947. async fn mint_mod_rotate_keyset() {
  948. let mut supported_units = HashMap::new();
  949. supported_units.insert(CurrencyUnit::default(), (0, 32));
  950. let config = MintConfig::<'_> {
  951. supported_units,
  952. ..Default::default()
  953. };
  954. let mint = create_mint(config).await;
  955. let keysets = mint.keysets();
  956. let first_keyset_id = keysets.keysets[0].id;
  957. // set the first keyset to inactive and generate a new keyset
  958. mint.rotate_keyset(CurrencyUnit::default(), 1, 1)
  959. .await
  960. .expect("test");
  961. let keysets = mint.keysets();
  962. assert_eq!(2, keysets.keysets.len());
  963. for keyset in &keysets.keysets {
  964. if keyset.id == first_keyset_id {
  965. assert!(!keyset.active);
  966. } else {
  967. assert!(keyset.active);
  968. }
  969. }
  970. }
  971. #[tokio::test]
  972. async fn test_mint_keyset_gen() {
  973. let seed = bip39::Mnemonic::from_str(
  974. "dismiss price public alone audit gallery ignore process swap dance crane furnace",
  975. )
  976. .unwrap();
  977. let mut supported_units = HashMap::new();
  978. supported_units.insert(CurrencyUnit::default(), (0, 32));
  979. let config = MintConfig::<'_> {
  980. seed: &seed.to_seed_normalized(""),
  981. supported_units,
  982. ..Default::default()
  983. };
  984. let mint = create_mint(config).await;
  985. let keys = mint.pubkeys();
  986. let expected_keys = r#"{"keysets":[{"id":"005f6e8c540c9e61","unit":"sat","keys":{"1":"03e8aded7525acee36e3394e28f2dcbc012533ef2a2b085a55fc291d311afee3ef","1024":"0351a68a667c5fc21d66c187baecefa1d65529d06b7ae13112d432b6bca16b0e8c","1048576":"02b016346e5a322d371c6e6164b28b31b4d93a51572351ca2f26cdc12e916d9ac3","1073741824":"03f12e6a0903ed0db87485a296b1dca9d953a8a6919ff88732238fbc672d6bd125","128":"0351e33a076f415c2cadc945bc9bcb75bf4a774b28df8a0605dea1557e5897fed8","131072":"027cdf7be8b20a49ac7f2f065f7c53764c8926799877858c6b00b888a8aa6741a5","134217728":"0380658e5163fcf274e1ace6c696d1feef4c6068e0d03083d676dc5ef21804f22d","16":"031dbab0e4f7fb4fb0030f0e1a1dc80668eadd0b1046df3337bb13a7b9c982d392","16384":"028e9c6ce70f34cd29aad48656bf8345bb5ba2cb4f31fdd978686c37c93f0ab411","16777216":"02f2508e7df981c32f7b0008a273e2a1f19c23bb60a1561dba6b2a95ed1251eb90","2":"02628c0919e5cb8ce9aed1f81ce313f40e1ab0b33439d5be2abc69d9bb574902e0","2048":"0376166d8dcf97d8b0e9f11867ff0dafd439c90255b36a25be01e37e14741b9c6a","2097152":"028f25283e36a11df7713934a5287267381f8304aca3c1eb1b89fddce973ef1436","2147483648":"02cece3fb38a54581e0646db4b29242b6d78e49313dda46764094f9d128c1059c1","256":"0314b9f4300367c7e64fa85770da90839d2fc2f57d63660f08bb3ebbf90ed76840","262144":"026939b8f766c3ebaf26408e7e54fc833805563e2ef14c8ee4d0435808b005ec4c","268435456":"031526f03de945c638acccb879de837ac3fabff8590057cfb8552ebcf51215f3aa","32":"037241f7ad421374eb764a48e7769b5e2473582316844fda000d6eef28eea8ffb8","32768":"0253e34bab4eec93e235c33994e01bf851d5caca4559f07d37b5a5c266de7cf840","33554432":"0381883a1517f8c9979a84fcd5f18437b1a2b0020376ecdd2e515dc8d5a157a318","4":"039e7c7f274e1e8a90c61669e961c944944e6154c0794fccf8084af90252d2848f","4096":"03d40f47b4e5c4d72f2a977fab5c66b54d945b2836eb888049b1dd9334d1d70304","4194304":"03e5841d310819a49ec42dfb24839c61f68bbfc93ac68f6dad37fd5b2d204cc535","512":"030d95abc7e881d173f4207a3349f4ee442b9e51cc461602d3eb9665b9237e8db3","524288":"03772542057493a46eed6513b40386e766eedada16560ffde
2f776b65794e9f004","536870912":"035eb3e7262e126c5503e1b402db05f87de6556773ae709cb7aa1c3b0986b87566","64":"02bc9767b4abf88becdac47a59e67ee9a9a80b9864ef57d16084575273ac63c0e7","65536":"02684ede207f9ace309b796b5259fc81ef0d4492b4fb5d66cf866b0b4a6f27bec9","67108864":"02aa648d39c9a725ef5927db15af6895f0d43c17f0a31faff4406314fc80180086","8":"02ca0e563ae941700aefcb16a7fb820afbb3258ae924ab520210cb730227a76ca3","8192":"03be18afaf35a29d7bcd5dfd1936d82c1c14691a63f8aa6ece258e16b0c043049b","8388608":"0307ebfeb87b7bca9baa03fad00499e5cc999fa5179ef0b7ad4f555568bcb946f5"}}]}"#;
  987. assert_eq!(expected_keys, serde_json::to_string(&keys.clone()).unwrap());
  988. }
  989. #[tokio::test]
  990. async fn test_start_stop_lifecycle() {
  991. let mut supported_units = HashMap::new();
  992. supported_units.insert(CurrencyUnit::default(), (0, 32));
  993. let config = MintConfig::<'_> {
  994. supported_units,
  995. ..Default::default()
  996. };
  997. let mint = create_mint(config).await;
  998. // Start should succeed (async)
  999. mint.start().await.expect("Failed to start mint");
  1000. // Starting again should fail (already running)
  1001. assert!(mint.start().await.is_err());
  1002. // Stop should succeed (still async)
  1003. mint.stop().await.expect("Failed to stop mint");
  1004. // Stopping again should succeed (idempotent)
  1005. mint.stop().await.expect("Second stop should be fine");
  1006. // Should be able to start again after stopping
  1007. mint.start().await.expect("Should be able to restart");
  1008. mint.stop().await.expect("Final stop should work");
  1009. }
  1010. }