浏览代码

Merge pull request #903 from thesimplekid/mint_start

feat(cdk): add mint lifecycle management with start/stop methods
thesimplekid 3 月之前
父节点
当前提交
7b2e31a3df
共有 4 个文件被更改,包括 326 次插入72 次删除
  1. 5 0
      CHANGELOG.md
  2. 2 7
      crates/cdk-integration-tests/src/init_pure_tests.rs
  3. 2 9
      crates/cdk-mintd/src/main.rs
  4. 317 56
      crates/cdk/src/mint/mod.rs

+ 5 - 0
CHANGELOG.md

@@ -8,6 +8,8 @@
 
 ### Added
 - cashu: `KeySetInfos` type alias and `KeySetInfosMethods` trait for filtering keysets ([thesimplekid]).
+- cdk: Mint lifecycle management with `start()` and `stop()` methods for graceful background service control ([thesimplekid]).
+- cdk: Background task management for invoice payment monitoring with proper shutdown handling ([thesimplekid]).
 
 ### Changed
 - cdk: Refactored wallet keyset management methods for better clarity and separation of concerns ([thesimplekid]).
@@ -17,6 +19,9 @@
 - cdk: Improved `load_mint_keysets` method to be the primary method for getting keysets for token operations ([thesimplekid]).
 - cdk: Enhanced keyset management with better offline/online operation separation ([thesimplekid]).
 - cdk: Updated method documentation to clarify storage vs network operations ([thesimplekid]).
+- cdk: Refactored invoice payment monitoring to use centralized lifecycle management instead of manual task spawning ([thesimplekid]).
+- cdk-mintd: Updated to use new mint lifecycle methods for improved service management ([thesimplekid]).
+- cdk-integration-tests: Updated test utilities to use new mint lifecycle management ([thesimplekid]).
 
 
 

+ 2 - 7
crates/cdk-integration-tests/src/init_pure_tests.rs

@@ -24,7 +24,7 @@ use cdk::util::unix_time;
 use cdk::wallet::{AuthWallet, MintConnector, Wallet, WalletBuilder};
 use cdk::{Amount, Error, Mint};
 use cdk_fake_wallet::FakeWallet;
-use tokio::sync::{Notify, RwLock};
+use tokio::sync::RwLock;
 use tracing_subscriber::EnvFilter;
 use uuid::Uuid;
 
@@ -282,12 +282,7 @@ pub async fn create_and_start_test_mint() -> Result<Mint> {
         .build_with_seed(localstore.clone(), &mnemonic.to_seed_normalized(""))
         .await?;
 
-    let mint_clone = mint.clone();
-    let shutdown = Arc::new(Notify::new());
-    tokio::spawn({
-        let shutdown = Arc::clone(&shutdown);
-        async move { mint_clone.wait_for_paid_invoices(shutdown).await }
-    });
+    mint.start().await?;
 
     Ok(mint)
 }

+ 2 - 9
crates/cdk-mintd/src/main.rs

@@ -49,7 +49,6 @@ use cdk_mintd::setup::LnBackendSetup;
 use cdk_sqlite::mint::MintSqliteAuthDatabase;
 use cdk_sqlite::MintSqliteDatabase;
 use clap::Parser;
-use tokio::sync::Notify;
 use tower::ServiceBuilder;
 use tower_http::compression::CompressionLayer;
 use tower_http::decompression::RequestDecompressionLayer;
@@ -757,12 +756,7 @@ async fn start_services(
         mint_service = mint_service.merge(router);
     }
 
-    let shutdown = Arc::new(Notify::new());
-    let mint_clone = Arc::clone(&mint);
-    tokio::spawn({
-        let shutdown = Arc::clone(&shutdown);
-        async move { mint_clone.wait_for_paid_invoices(shutdown).await }
-    });
+    mint.start().await?;
 
     let socket_addr = SocketAddr::from_str(&format!("{listen_addr}:{listen_port}"))?;
 
@@ -784,8 +778,7 @@ async fn start_services(
         }
     }
 
-    // Notify all waiting tasks to shutdown
-    shutdown.notify_waiters();
+    mint.stop().await?;
 
     #[cfg(feature = "management-rpc")]
     {

+ 317 - 56
crates/cdk/src/mint/mod.rs

@@ -2,6 +2,7 @@
 
 use std::collections::HashMap;
 use std::sync::Arc;
+use std::time::Duration;
 
 use arc_swap::ArcSwap;
 use cdk_common::common::{PaymentProcessorKey, QuoteTTL};
@@ -9,14 +10,15 @@ use cdk_common::common::{PaymentProcessorKey, QuoteTTL};
 use cdk_common::database::MintAuthDatabase;
 use cdk_common::database::{self, MintDatabase, MintTransaction};
 use cdk_common::nuts::{self, BlindSignature, BlindedMessage, CurrencyUnit, Id, Kind};
+use cdk_common::payment::WaitPaymentResponse;
 use cdk_common::secret;
 use cdk_signatory::signatory::{Signatory, SignatoryKeySet};
 use futures::StreamExt;
 #[cfg(feature = "auth")]
 use nut21::ProtectedEndpoint;
 use subscription::PubSubManager;
-use tokio::sync::Notify;
-use tokio::task::JoinSet;
+use tokio::sync::{Mutex, Notify};
+use tokio::task::{JoinHandle, JoinSet};
 use tracing::instrument;
 use uuid::Uuid;
 
@@ -68,6 +70,17 @@ pub struct Mint {
     oidc_client: Option<OidcClient>,
     /// In-memory keyset
     keysets: Arc<ArcSwap<Vec<SignatoryKeySet>>>,
+    /// Background task management
+    task_state: Arc<Mutex<TaskState>>,
+}
+
+/// State for managing background tasks
+#[derive(Default)]
+struct TaskState {
+    /// Shutdown signal for all background tasks
+    shutdown_notify: Option<Arc<Notify>>,
+    /// Handle to the main supervisor task
+    supervisor_handle: Option<JoinHandle<Result<(), Error>>>,
 }
 
 impl Mint {
@@ -168,9 +181,115 @@ impl Mint {
             #[cfg(feature = "auth")]
             auth_localstore,
             keysets: Arc::new(ArcSwap::new(keysets.keysets.into())),
+            task_state: Arc::new(Mutex::new(TaskState::default())),
         })
     }
 
+    /// Start the mint's background services and operations
+    ///
+    /// This function immediately starts background services and returns. The background
+    /// tasks will continue running until `stop()` is called.
+    ///
+    /// # Returns
+    ///
+    /// Returns `Ok(())` if background services started successfully, or an `Error`
+    /// if startup failed.
+    ///
+    /// # Background Services
+    ///
+    /// Currently manages:
+    /// - Invoice payment monitoring across all configured payment processors
+    ///
+    /// Future services may include:
+    /// - Quote cleanup and expiration management  
+    /// - Periodic database maintenance
+    /// - Health check monitoring
+    /// - Metrics collection
+    pub async fn start(&self) -> Result<(), Error> {
+        let mut task_state = self.task_state.lock().await;
+
+        // Prevent starting if already running
+        if task_state.shutdown_notify.is_some() {
+            return Err(Error::Internal); // Already started
+        }
+
+        // Create shutdown signal
+        let shutdown_notify = Arc::new(Notify::new());
+
+        // Clone required components for the background task
+        let payment_processors = self.payment_processors.clone();
+        let localstore = Arc::clone(&self.localstore);
+        let pubsub_manager = Arc::clone(&self.pubsub_manager);
+        let shutdown_clone = shutdown_notify.clone();
+
+        // Spawn the supervisor task
+        let supervisor_handle = tokio::spawn(async move {
+            Self::wait_for_paid_invoices(
+                &payment_processors,
+                localstore,
+                pubsub_manager,
+                shutdown_clone,
+            )
+            .await
+        });
+
+        // Store the handles
+        task_state.shutdown_notify = Some(shutdown_notify);
+        task_state.supervisor_handle = Some(supervisor_handle);
+
+        // Give the background task a tiny bit of time to start waiting
+        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
+
+        tracing::info!("Mint background services started");
+        Ok(())
+    }
+
+    /// Stop all background services and wait for graceful shutdown
+    ///
+    /// This function signals all background tasks to shut down and waits for them
+    /// to complete gracefully. It's safe to call multiple times.
+    ///
+    /// # Returns
+    ///
+    /// Returns `Ok(())` when all background services have shut down cleanly, or an
+    /// `Error` if there was an issue during shutdown.
+    pub async fn stop(&self) -> Result<(), Error> {
+        let mut task_state = self.task_state.lock().await;
+
+        // Take the handles out of the state
+        let shutdown_notify = task_state.shutdown_notify.take();
+        let supervisor_handle = task_state.supervisor_handle.take();
+
+        // If nothing to stop, return early
+        let (shutdown_notify, supervisor_handle) = match (shutdown_notify, supervisor_handle) {
+            (Some(notify), Some(handle)) => (notify, handle),
+            _ => {
+                tracing::debug!("Stop called but no background services were running");
+                return Ok(()); // Nothing to stop
+            }
+        };
+
+        // Drop the lock before waiting
+        drop(task_state);
+
+        tracing::info!("Stopping mint background services...");
+
+        // Signal shutdown
+        shutdown_notify.notify_waiters();
+
+        // Wait for supervisor to complete
+        match supervisor_handle.await {
+            Ok(result) => {
+                tracing::info!("Mint background services stopped");
+                result
+            }
+            Err(join_error) => {
+                tracing::error!("Background service task panicked: {:?}", join_error);
+                Err(Error::Internal)
+            }
+        }
+    }
+
     /// Get the payment processor for the given unit and payment method
     pub fn get_payment_processor(
         &self,
@@ -262,80 +381,194 @@ impl Mint {
         Ok(tx.commit().await?)
     }
 
-    /// Wait for any invoice to be paid
     /// For each backend starts a task that waits for any invoice to be paid
     /// Once invoice is paid mint quote status is updated
     #[instrument(skip_all)]
-    pub async fn wait_for_paid_invoices(&self, shutdown: Arc<Notify>) -> Result<(), Error> {
-        let mint_arc = Arc::new(self.clone());
-
+    async fn wait_for_paid_invoices(
+        payment_processors: &HashMap<
+            PaymentProcessorKey,
+            Arc<dyn MintPayment<Err = cdk_payment::Error> + Send + Sync>,
+        >,
+        localstore: Arc<dyn MintDatabase<database::Error> + Send + Sync>,
+        pubsub_manager: Arc<PubSubManager>,
+        shutdown: Arc<Notify>,
+    ) -> Result<(), Error> {
         let mut join_set = JoinSet::new();
 
-        let mut processor_groups: Vec<(
-            Arc<dyn MintPayment<Err = cdk_payment::Error> + Send + Sync>,
-            Vec<PaymentProcessorKey>,
-        )> = Vec::new();
-
-        for (key, ln) in self.payment_processors.iter() {
-            // Check if we already have this processor
-            let found = processor_groups.iter_mut().find(|(proc_ref, _)| {
-                // Compare Arc pointer equality using ptr_eq
-                Arc::ptr_eq(proc_ref, ln)
-            });
+        // Group processors by unique instance (using Arc pointer equality)
+        let mut seen_processors = Vec::new();
+        for (key, processor) in payment_processors {
+            // Skip if processor is already active
+            if processor.is_wait_invoice_active() {
+                continue;
+            }
 
-            if let Some((_, keys)) = found {
-                // We found this processor, add the key to its group
-                keys.push(key.clone());
-            } else {
-                // New processor, create a new group
-                processor_groups.push((Arc::clone(ln), vec![key.clone()]));
+            // Skip if we've already spawned a task for this processor instance
+            if seen_processors.iter().any(|p| Arc::ptr_eq(p, processor)) {
+                continue;
             }
+
+            seen_processors.push(Arc::clone(processor));
+
+            tracing::info!("Starting payment wait task for {:?}", key);
+
+            // Clone for the spawned task
+            let processor = Arc::clone(processor);
+            let localstore = Arc::clone(&localstore);
+            let pubsub_manager = Arc::clone(&pubsub_manager);
+            let shutdown = Arc::clone(&shutdown);
+
+            join_set.spawn(async move {
+                let result = Self::wait_for_processor_payments(
+                    processor,
+                    localstore,
+                    pubsub_manager,
+                    shutdown,
+                )
+                .await;
+
+                if let Err(e) = result {
+                    tracing::error!("Payment processor task failed: {:?}", e);
+                }
+            });
         }
 
-        for (ln, key) in processor_groups {
-            if !ln.is_wait_invoice_active() {
-                tracing::info!("Wait payment for {:?} inactive starting.", key);
-                let mint = Arc::clone(&mint_arc);
-                let ln = Arc::clone(&ln);
-                let shutdown = Arc::clone(&shutdown);
-                let key = key.clone();
-                join_set.spawn(async move {
+        // If no payment processors, just wait for shutdown
+        if join_set.is_empty() {
+            shutdown.notified().await;
+        } else {
+            // Wait for shutdown or all tasks to complete
             loop {
-                tracing::info!("Restarting wait for: {:?}", key);
                 tokio::select! {
                     _ = shutdown.notified() => {
-                        tracing::info!("Shutdown signal received, stopping task for {:?}", key);
-                        ln.cancel_wait_invoice();
+                        println!("Shutting down payment processors");
                         break;
                     }
-                    result = ln.wait_any_incoming_payment() => {
-                        match result {
-                            Ok(mut stream) => {
-                                while let Some(request_lookup_id) = stream.next().await {
-                                    if let Err(err) = mint.pay_mint_quote_for_request_id(request_lookup_id).await {
-                                        tracing::warn!("{:?}", err);
-                                    }
-                                }
-                            }
-                            Err(err) => {
-                                tracing::warn!("Could not get incoming payment stream for {:?}: {}",key, err);
+                    Some(result) = join_set.join_next() => {
+                        if let Err(e) = result {
+                            tracing::warn!("Task panicked: {:?}", e);
+                        }
+                    }
+                    else => break, // All tasks completed
+                }
+            }
+        }
+
+        join_set.shutdown().await;
+        Ok(())
+    }
 
-                                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+    /// Handles payment waiting for a single processor
+    async fn wait_for_processor_payments(
+        processor: Arc<dyn MintPayment<Err = cdk_payment::Error> + Send + Sync>,
+        localstore: Arc<dyn MintDatabase<database::Error> + Send + Sync>,
+        pubsub_manager: Arc<PubSubManager>,
+        shutdown: Arc<Notify>,
+    ) -> Result<(), Error> {
+        loop {
+            tokio::select! {
+                _ = shutdown.notified() => {
+                    processor.cancel_wait_invoice();
+                    break;
+                }
+                result = processor.wait_any_incoming_payment() => {
+                    match result {
+                        Ok(mut stream) => {
+                            while let Some(request_lookup_id) = stream.next().await {
+                                if let Err(e) = Self::handle_payment_notification(
+                                    &localstore,
+                                    &pubsub_manager,
+                                    request_lookup_id,
+                                ).await {
+                                    tracing::warn!("Payment notification error: {:?}", e);
+                                }
                             }
                         }
+                        Err(e) => {
+                            tracing::warn!("Failed to get payment stream: {}", e);
+                            tokio::time::sleep(Duration::from_secs(5)).await;
+                        }
                     }
-                    }
-            }
-        });
+                }
             }
         }
+        Ok(())
+    }
 
-        // Spawn a task to manage the JoinSet
-        while let Some(result) = join_set.join_next().await {
-            match result {
-                Ok(_) => tracing::info!("A task completed successfully."),
-                Err(err) => tracing::warn!("A task failed: {:?}", err),
+    /// Handle payment notification without needing full Mint instance
+    /// This is a helper function that can be called with just the required components
+    async fn handle_payment_notification(
+        localstore: &Arc<dyn MintDatabase<database::Error> + Send + Sync>,
+        pubsub_manager: &Arc<PubSubManager>,
+        wait_payment_response: WaitPaymentResponse,
+    ) -> Result<(), Error> {
+        if wait_payment_response.payment_amount == Amount::ZERO {
+            tracing::warn!(
+                "Received payment response with 0 amount with payment id {}.",
+                wait_payment_response.payment_id
+            );
+            return Err(Error::AmountUndefined);
+        }
+
+        let mut tx = localstore.begin_transaction().await?;
+
+        if let Ok(Some(mint_quote)) = tx
+            .get_mint_quote_by_request_lookup_id(&wait_payment_response.payment_identifier)
+            .await
+        {
+            Self::handle_mint_quote_payment(
+                &mut tx,
+                &mint_quote,
+                wait_payment_response,
+                pubsub_manager,
+            )
+            .await?;
+        } else {
+            tracing::warn!(
+                "Could not get request for request lookup id {:?}.",
+                wait_payment_response.payment_identifier
+            );
+        }
+
+        tx.commit().await?;
+        Ok(())
+    }
+
+    /// Handle payment for a specific mint quote (extracted from pay_mint_quote)
+    async fn handle_mint_quote_payment(
+        tx: &mut Box<dyn database::MintTransaction<'_, database::Error> + Send + Sync + '_>,
+        mint_quote: &MintQuote,
+        wait_payment_response: WaitPaymentResponse,
+        pubsub_manager: &Arc<PubSubManager>,
+    ) -> Result<(), Error> {
+        tracing::debug!(
+            "Received payment notification of {} for mint quote {} with payment id {}",
+            wait_payment_response.payment_amount,
+            mint_quote.id,
+            wait_payment_response.payment_id
+        );
+
+        let quote_state = mint_quote.state();
+        if !mint_quote
+            .payment_ids()
+            .contains(&&wait_payment_response.payment_id)
+        {
+            if mint_quote.payment_method == PaymentMethod::Bolt11
+                && (quote_state == MintQuoteState::Issued || quote_state == MintQuoteState::Paid)
+            {
+                tracing::info!("Received payment notification for already issued quote.");
+            } else {
+                tx.increment_mint_quote_amount_paid(
+                    &mint_quote.id,
+                    wait_payment_response.payment_amount,
+                    wait_payment_response.payment_id,
+                )
+                .await?;
+
+                pubsub_manager.mint_quote_bolt11_status(mint_quote.clone(), MintQuoteState::Paid);
             }
+        } else {
+            tracing::info!("Received payment notification for already seen payment.");
         }
 
         Ok(())
@@ -557,11 +790,11 @@ impl Mint {
     /// Total redeemed for keyset
     #[instrument(skip_all)]
     pub async fn total_redeemed(&self) -> Result<HashMap<Id, Amount>, Error> {
-        let keysets = self.keysets().keysets;
+        let keysets = self.signatory.keysets().await?;
 
         let mut total_redeemed = HashMap::new();
 
-        for keyset in keysets {
+        for keyset in keysets.keysets {
             let (proofs, state) = self.localstore.get_proofs_by_keyset_id(&keyset.id).await?;
 
             let total_spent =
@@ -581,6 +814,7 @@ impl Mint {
 
 #[cfg(test)]
 mod tests {
+
     use std::str::FromStr;
 
     use cdk_sqlite::mint::memory::new_with_state;
@@ -713,4 +947,31 @@ mod tests {
 
         assert_eq!(expected_keys, serde_json::to_string(&keys.clone()).unwrap());
     }
+
+    #[tokio::test]
+    async fn test_start_stop_lifecycle() {
+        let mut supported_units = HashMap::new();
+        supported_units.insert(CurrencyUnit::default(), (0, 32));
+        let config = MintConfig::<'_> {
+            supported_units,
+            ..Default::default()
+        };
+        let mint = create_mint(config).await;
+
+        // Start should succeed (async)
+        mint.start().await.expect("Failed to start mint");
+
+        // Starting again should fail (already running)
+        assert!(mint.start().await.is_err());
+
+        // Stop should succeed (still async)
+        mint.stop().await.expect("Failed to stop mint");
+
+        // Stopping again should succeed (idempotent)
+        mint.stop().await.expect("Second stop should be fine");
+
+        // Should be able to start again after stopping
+        mint.start().await.expect("Should be able to restart");
+        mint.stop().await.expect("Final stop should work");
+    }
 }