| 
					
				 | 
			
			
				@@ -2,6 +2,7 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use std::collections::HashMap; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use std::sync::Arc; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+use std::time::Duration; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use arc_swap::ArcSwap; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use cdk_common::common::{PaymentProcessorKey, QuoteTTL}; 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -9,14 +10,15 @@ use cdk_common::common::{PaymentProcessorKey, QuoteTTL}; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use cdk_common::database::MintAuthDatabase; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use cdk_common::database::{self, MintDatabase, MintTransaction}; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use cdk_common::nuts::{self, BlindSignature, BlindedMessage, CurrencyUnit, Id, Kind}; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+use cdk_common::payment::WaitPaymentResponse; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use cdk_common::secret; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use cdk_signatory::signatory::{Signatory, SignatoryKeySet}; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use futures::StreamExt; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 #[cfg(feature = "auth")] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use nut21::ProtectedEndpoint; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use subscription::PubSubManager; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-use tokio::sync::Notify; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-use tokio::task::JoinSet; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+use tokio::sync::{Mutex, Notify}; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+use tokio::task::{JoinHandle, JoinSet}; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use tracing::instrument; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use uuid::Uuid; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -68,6 +70,17 @@ pub struct Mint { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     oidc_client: Option<OidcClient>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     /// In-memory keyset 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     keysets: Arc<ArcSwap<Vec<SignatoryKeySet>>>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// Background task management 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    task_state: Arc<Mutex<TaskState>>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+} 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+/// State for managing background tasks 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+#[derive(Default)] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+struct TaskState { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// Shutdown signal for all background tasks 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    shutdown_notify: Option<Arc<Notify>>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// Handle to the main supervisor task 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    supervisor_handle: Option<JoinHandle<Result<(), Error>>>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 impl Mint { 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -168,9 +181,112 @@ impl Mint { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             #[cfg(feature = "auth")] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             auth_localstore, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             keysets: Arc::new(ArcSwap::new(keysets.keysets.into())), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            task_state: Arc::new(Mutex::new(TaskState::default())), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         }) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// Start the mint's background services and operations 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// This function immediately starts background services and returns. The background 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// tasks will continue running until `stop()` is called. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// # Returns 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// Returns `Ok(())` if background services started successfully, or an `Error` 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// if startup failed. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// # Background Services 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// Currently manages: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// - Invoice payment monitoring across all configured payment processors 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// Future services may include: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// - Quote cleanup and expiration management   
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// - Periodic database maintenance 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// - Health check monitoring 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// - Metrics collection 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    pub async fn start(&self) -> Result<(), Error> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let mut task_state = self.task_state.lock().await; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Prevent starting if already running 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if task_state.shutdown_notify.is_some() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return Err(Error::Internal); // Already started 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Create shutdown signal 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let shutdown_notify = Arc::new(Notify::new()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Clone required components for the background task 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let payment_processors = self.payment_processors.clone(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let localstore = Arc::clone(&self.localstore); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let pubsub_manager = Arc::clone(&self.pubsub_manager); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let shutdown_clone = shutdown_notify.clone(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Spawn the supervisor task 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let supervisor_handle = tokio::spawn(async move { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Self::wait_for_paid_invoices( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                &payment_processors, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                localstore, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                pubsub_manager, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                shutdown_clone, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .await 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Store the handles 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        task_state.shutdown_notify = Some(shutdown_notify); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        task_state.supervisor_handle = Some(supervisor_handle); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        tracing::info!("Mint background services started"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        Ok(()) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// Stop all background services and wait for graceful shutdown 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// This function signals all background tasks to shut down and waits for them 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// to complete gracefully. It's safe to call multiple times. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// # Returns 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// Returns `Ok(())` when all background services have shut down cleanly, or an 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// `Error` if there was an issue during shutdown. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    pub async fn stop(&self) -> Result<(), Error> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let mut task_state = self.task_state.lock().await; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Take the handles out of the state 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let shutdown_notify = task_state.shutdown_notify.take(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let supervisor_handle = task_state.supervisor_handle.take(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // If nothing to stop, return early 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let (shutdown_notify, supervisor_handle) = match (shutdown_notify, supervisor_handle) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            (Some(notify), Some(handle)) => (notify, handle), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            _ => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                tracing::debug!("Stop called but no background services were running"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                return Ok(()); // Nothing to stop 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        }; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Drop the lock before waiting 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        drop(task_state); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        tracing::info!("Stopping mint background services..."); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Signal shutdown 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        shutdown_notify.notify_waiters(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Wait for supervisor to complete 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        match supervisor_handle.await { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Ok(result) => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                tracing::info!("Mint background services stopped"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                result 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Err(join_error) => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                tracing::error!("Background service task panicked: {:?}", join_error); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                Err(Error::Internal) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     /// Get the payment processor for the given unit and payment method 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     pub fn get_payment_processor( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         &self, 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -262,80 +378,189 @@ impl Mint { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         Ok(tx.commit().await?) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    /// Wait for any invoice to be paid 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     /// For each backend starts a task that waits for any invoice to be paid 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     /// Once invoice is paid mint quote status is updated 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     #[instrument(skip_all)] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    pub async fn wait_for_paid_invoices(&self, shutdown: Arc<Notify>) -> Result<(), Error> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        let mint_arc = Arc::new(self.clone()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    async fn wait_for_paid_invoices( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        payment_processors: &HashMap< 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            PaymentProcessorKey, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Arc<dyn MintPayment<Err = cdk_payment::Error> + Send + Sync>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        >, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        localstore: Arc<dyn MintDatabase<database::Error> + Send + Sync>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        pubsub_manager: Arc<PubSubManager>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        shutdown: Arc<Notify>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    ) -> Result<(), Error> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         let mut join_set = JoinSet::new(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        let mut processor_groups: Vec<( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            Arc<dyn MintPayment<Err = cdk_payment::Error> + Send + Sync>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            Vec<PaymentProcessorKey>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        )> = Vec::new(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        for (key, ln) in self.payment_processors.iter() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            // Check if we already have this processor 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            let found = processor_groups.iter_mut().find(|(proc_ref, _)| { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                // Compare Arc pointer equality using ptr_eq 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                Arc::ptr_eq(proc_ref, ln) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Group processors by unique instance (using Arc pointer equality) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let mut seen_processors = Vec::new(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        for (key, processor) in payment_processors { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            // Skip if processor is already active 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if processor.is_wait_invoice_active() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                continue; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            if let Some((_, keys)) = found { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                // We found this processor, add the key to its group 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                keys.push(key.clone()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                // New processor, create a new group 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                processor_groups.push((Arc::clone(ln), vec![key.clone()])); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            // Skip if we've already spawned a task for this processor instance 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if seen_processors.iter().any(|p| Arc::ptr_eq(p, processor)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                continue; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            seen_processors.push(Arc::clone(processor)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            tracing::info!("Starting payment wait task for {:?}", key); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            // Clone for the spawned task 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            let processor = Arc::clone(processor); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            let localstore = Arc::clone(&localstore); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            let pubsub_manager = Arc::clone(&pubsub_manager); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            let shutdown = Arc::clone(&shutdown); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            join_set.spawn(async move { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                let result = Self::wait_for_processor_payments( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    processor, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    localstore, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    pubsub_manager, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    shutdown, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                .await; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                if let Err(e) = result { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    tracing::error!("Payment processor task failed: {:?}", e); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        for (ln, key) in processor_groups { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            if !ln.is_wait_invoice_active() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                tracing::info!("Wait payment for {:?} inactive starting.", key); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                let mint = Arc::clone(&mint_arc); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                let ln = Arc::clone(&ln); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                let shutdown = Arc::clone(&shutdown); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                let key = key.clone(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                join_set.spawn(async move { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            loop { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                tracing::info!("Restarting wait for: {:?}", key); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                tokio::select! { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    _ = shutdown.notified() => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        tracing::info!("Shutdown signal received, stopping task for {:?}", key); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        ln.cancel_wait_invoice(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        break; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Wait for shutdown or all tasks to complete 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        loop { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            tokio::select! { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                _ = shutdown.notified() => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    tracing::info!("Shutting down payment processors"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    break; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                Some(result) = join_set.join_next() => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    if let Err(e) = result { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        tracing::warn!("Task panicked: {:?}", e); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    result = ln.wait_any_incoming_payment() => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        match result { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                            Ok(mut stream) => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                while let Some(request_lookup_id) = stream.next().await { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                    if let Err(err) = mint.pay_mint_quote_for_request_id(request_lookup_id).await { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                        tracing::warn!("{:?}", err); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                            Err(err) => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                tracing::warn!("Could not get incoming payment stream for {:?}: {}",key, err); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                else => break, // All tasks completed 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        join_set.shutdown().await; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        Ok(()) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                tokio::time::sleep(std::time::Duration::from_secs(5)).await; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// Handles payment waiting for a single processor 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    async fn wait_for_processor_payments( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        processor: Arc<dyn MintPayment<Err = cdk_payment::Error> + Send + Sync>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        localstore: Arc<dyn MintDatabase<database::Error> + Send + Sync>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        pubsub_manager: Arc<PubSubManager>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        shutdown: Arc<Notify>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    ) -> Result<(), Error> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        loop { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            tokio::select! { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                _ = shutdown.notified() => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    processor.cancel_wait_invoice(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    break; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                result = processor.wait_any_incoming_payment() => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    match result { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        Ok(mut stream) => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            while let Some(request_lookup_id) = stream.next().await { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                if let Err(e) = Self::handle_payment_notification( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    &localstore, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    &pubsub_manager, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    request_lookup_id, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                ).await { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    tracing::warn!("Payment notification error: {:?}", e); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                             } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        Err(e) => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            tracing::warn!("Failed to get payment stream: {}", e); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            tokio::time::sleep(Duration::from_secs(5)).await; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        Ok(()) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// Handle payment notification without needing full Mint instance 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// This is a helper function that can be called with just the required components 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    async fn handle_payment_notification( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        localstore: &Arc<dyn MintDatabase<database::Error> + Send + Sync>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        pubsub_manager: &Arc<PubSubManager>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        wait_payment_response: WaitPaymentResponse, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    ) -> Result<(), Error> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if wait_payment_response.payment_amount == Amount::ZERO { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            tracing::warn!( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                "Received payment response with 0 amount with payment id {}.", 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                wait_payment_response.payment_id 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return Err(Error::AmountUndefined); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let mut tx = localstore.begin_transaction().await?; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if let Ok(Some(mint_quote)) = tx 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .get_mint_quote_by_request_lookup_id(&wait_payment_response.payment_identifier) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .await 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Self::handle_mint_quote_payment( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                &mut tx, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                &mint_quote, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                wait_payment_response, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                pubsub_manager, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .await?; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            tracing::warn!( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                "Could not get request for request lookup id {:?}.", 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                wait_payment_response.payment_identifier 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        tx.commit().await?; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        Ok(()) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// Handle payment for a specific mint quote (extracted from pay_mint_quote) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    async fn handle_mint_quote_payment( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        tx: &mut Box<dyn database::MintTransaction<'_, database::Error> + Send + Sync + '_>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        mint_quote: &MintQuote, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        wait_payment_response: WaitPaymentResponse, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        pubsub_manager: &Arc<PubSubManager>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    ) -> Result<(), Error> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        tracing::debug!( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            "Received payment notification of {} for mint quote {} with payment id {}", 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            wait_payment_response.payment_amount, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            mint_quote.id, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            wait_payment_response.payment_id 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let quote_state = mint_quote.state(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if !mint_quote 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .payment_ids() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .contains(&&wait_payment_response.payment_id) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if mint_quote.payment_method == PaymentMethod::Bolt11 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                && (quote_state == MintQuoteState::Issued || quote_state == MintQuoteState::Paid) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                tracing::info!("Received payment notification for already issued quote."); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                tx.increment_mint_quote_amount_paid( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    &mint_quote.id, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    wait_payment_response.payment_amount, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    wait_payment_response.payment_id, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                .await?; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        // Spawn a task to manage the JoinSet 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        while let Some(result) = join_set.join_next().await { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            match result { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                Ok(_) => tracing::info!("A task completed successfully."), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                Err(err) => tracing::warn!("A task failed: {:?}", err), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                pubsub_manager.mint_quote_bolt11_status(mint_quote.clone(), MintQuoteState::Paid); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            tracing::info!("Received payment notification for already seen payment."); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         Ok(()) 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -713,4 +938,31 @@ mod tests { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         assert_eq!(expected_keys, serde_json::to_string(&keys.clone()).unwrap()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    #[tokio::test] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    async fn test_start_stop_lifecycle() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let mut supported_units = HashMap::new(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        supported_units.insert(CurrencyUnit::default(), (0, 32)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let config = MintConfig::<'_> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            supported_units, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            ..Default::default() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        }; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let mint = create_mint(config).await; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Start should succeed (async) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        mint.start().await.expect("Failed to start mint"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Starting again should fail (already running) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        assert!(mint.start().await.is_err()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Stop should succeed (still async) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        mint.stop().await.expect("Failed to stop mint"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Stopping again should succeed (idempotent) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        mint.stop().await.expect("Second stop should be fine"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Should be able to start again after stopping 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        mint.start().await.expect("Should be able to restart"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        mint.stop().await.expect("Final stop should work"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 |