浏览代码

Fix websocket issues and mint quotes (#1246)

* Fix websocket issues and mint quotes

Fixes #1209

When checking with HTTP pulling, mint quotes would ping the ln backend if
needed to get any payment that may not be included, so the mint uses this
chance to correct its database.

This check was missing for database subscriptions
C 1 周之前
父节点
当前提交
c6434b88d1
共有 3 个文件被更改,包括 85 次插入19 次删除
  1. 31 6
      crates/cdk/src/mint/ln.rs
  2. 6 4
      crates/cdk/src/mint/mod.rs
  3. 48 9
      crates/cdk/src/mint/subscription.rs

+ 31 - 6
crates/cdk/src/mint/ln.rs

@@ -1,17 +1,28 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
 use cdk_common::amount::to_unit;
 use cdk_common::common::PaymentProcessorKey;
+use cdk_common::database::DynMintDatabase;
 use cdk_common::mint::MintQuote;
+use cdk_common::payment::DynMintPayment;
 use cdk_common::util::unix_time;
 use cdk_common::{Amount, MintQuoteState, PaymentMethod};
 use tracing::instrument;
 
+use super::subscription::PubSubManager;
 use super::Mint;
 use crate::Error;
 
 impl Mint {
-    /// Check the status of an ln payment for a quote
-    #[instrument(skip_all)]
-    pub async fn check_mint_quote_paid(&self, quote: &mut MintQuote) -> Result<(), Error> {
+    /// Static implementation of check_mint_quote_paid to avoid circular dependency to the Mint
+    #[inline(always)]
+    pub(crate) async fn check_mint_quote_payments(
+        localstore: DynMintDatabase,
+        payment_processors: Arc<HashMap<PaymentProcessorKey, DynMintPayment>>,
+        pubsub_manager: Option<Arc<PubSubManager>>,
+        quote: &mut MintQuote,
+    ) -> Result<(), Error> {
         let state = quote.state();
 
         // We can just return here and do not need to check with ln node.
@@ -23,7 +34,7 @@ impl Mint {
             return Ok(());
         }
 
-        let ln = match self.payment_processors.get(&PaymentProcessorKey::new(
+        let ln = match payment_processors.get(&PaymentProcessorKey::new(
             quote.unit.clone(),
             quote.payment_method.clone(),
         )) {
@@ -43,7 +54,7 @@ impl Mint {
             return Ok(());
         }
 
-        let mut tx = self.localstore.begin_transaction().await?;
+        let mut tx = localstore.begin_transaction().await?;
 
         // reload the quote, as it state may have changed
         *quote = tx
@@ -79,7 +90,9 @@ impl Mint {
                     .increment_mint_quote_amount_paid(&quote.id, amount_paid, payment.payment_id)
                     .await?;
 
-                self.pubsub_manager.mint_quote_payment(quote, total_paid);
+                if let Some(pubsub_manager) = pubsub_manager.as_ref() {
+                    pubsub_manager.mint_quote_payment(quote, total_paid);
+                }
             }
         }
 
@@ -87,4 +100,16 @@ impl Mint {
 
         Ok(())
     }
+
+    /// Check the status of an ln payment for a quote
+    #[instrument(skip_all)]
+    pub async fn check_mint_quote_paid(&self, quote: &mut MintQuote) -> Result<(), Error> {
+        Self::check_mint_quote_payments(
+            self.localstore.clone(),
+            self.payment_processors.clone(),
+            Some(self.pubsub_manager.clone()),
+            quote,
+        )
+        .await
+    }
 }

+ 6 - 4
crates/cdk/src/mint/mod.rs

@@ -69,7 +69,7 @@ pub struct Mint {
     #[cfg(feature = "auth")]
     auth_localstore: Option<DynMintAuthDatabase>,
     /// Payment processors for mint
-    payment_processors: HashMap<PaymentProcessorKey, DynMintPayment>,
+    payment_processors: Arc<HashMap<PaymentProcessorKey, DynMintPayment>>,
     /// Subscription manager
     pubsub_manager: Arc<PubSubManager>,
     #[cfg(feature = "auth")]
@@ -203,9 +203,11 @@ impl Mint {
             }
         }
 
+        let payment_processors = Arc::new(payment_processors);
+
         Ok(Self {
             signatory,
-            pubsub_manager: PubSubManager::new(localstore.clone()),
+            pubsub_manager: PubSubManager::new((localstore.clone(), payment_processors.clone())),
             localstore,
             #[cfg(feature = "auth")]
             oidc_client: computed_info.nuts.nut21.as_ref().map(|nut21| {
@@ -257,7 +259,7 @@ impl Mint {
         // Start all payment processors first
         tracing::info!("Starting payment processors...");
         let mut seen_processors = Vec::new();
-        for (key, processor) in &self.payment_processors {
+        for (key, processor) in self.payment_processors.iter() {
             // Skip if we've already spawned a task for this processor instance
             if seen_processors.iter().any(|p| Arc::ptr_eq(p, processor)) {
                 continue;
@@ -369,7 +371,7 @@ impl Mint {
         tracing::info!("Stopping payment processors...");
         let mut seen_processors = Vec::new();
 
-        for (key, processor) in &self.payment_processors {
+        for (key, processor) in self.payment_processors.iter() {
             // Skip if we've already spawned a task for this processor instance
             if seen_processors.iter().any(|p| Arc::ptr_eq(p, processor)) {
                 continue;

+ 48 - 9
crates/cdk/src/mint/subscription.rs

@@ -1,11 +1,14 @@
 //! Specific Subscription for the cdk crate
 
+use std::collections::HashMap;
 use std::ops::Deref;
 use std::sync::Arc;
 
+use cdk_common::common::PaymentProcessorKey;
 use cdk_common::database::DynMintDatabase;
 use cdk_common::mint::MintQuote;
 use cdk_common::nut17::NotificationId;
+use cdk_common::payment::DynMintPayment;
 use cdk_common::pub_sub::{Pubsub, Spec, Subscriber};
 use cdk_common::subscription::SubId;
 use cdk_common::{
@@ -13,15 +16,39 @@ use cdk_common::{
     MintQuoteBolt12Response, MintQuoteState, PaymentMethod, ProofState, PublicKey, QuoteId,
 };
 
+use super::Mint;
 use crate::event::MintEvent;
 
 /// Mint subtopics
 #[derive(Clone)]
 pub struct MintPubSubSpec {
     db: DynMintDatabase,
+    payment_processors: Arc<HashMap<PaymentProcessorKey, DynMintPayment>>,
 }
 
 impl MintPubSubSpec {
+    /// Call Mint::check_mint_quote_payments to update the quote pinging the payment backend
+    async fn get_mint_quote(
+        &self,
+        quote_id: &QuoteId,
+    ) -> Result<Option<MintQuote>, cdk_common::Error> {
+        let mut quote = if let Some(quote) = self.db.get_mint_quote(quote_id).await? {
+            quote
+        } else {
+            return Ok(None);
+        };
+
+        Mint::check_mint_quote_payments(
+            self.db.clone(),
+            self.payment_processors.clone(),
+            None,
+            &mut quote,
+        )
+        .await?;
+
+        Ok(Some(quote))
+    }
+
     async fn get_events_from_db(
         &self,
         request: &[NotificationId<QuoteId>],
@@ -38,10 +65,10 @@ impl MintPubSubSpec {
                     melt_queries.push(self.db.get_melt_quote(uuid))
                 }
                 NotificationId::MintQuoteBolt11(uuid) => {
-                    mint_queries.push(self.db.get_mint_quote(uuid))
+                    mint_queries.push(self.get_mint_quote(uuid))
                 }
                 NotificationId::MintQuoteBolt12(uuid) => {
-                    mint_queries.push(self.db.get_mint_quote(uuid))
+                    mint_queries.push(self.get_mint_quote(uuid))
                 }
                 NotificationId::MeltQuoteBolt12(uuid) => {
                     melt_queries.push(self.db.get_melt_quote(uuid))
@@ -72,12 +99,13 @@ impl MintPubSubSpec {
                         quotes
                             .into_iter()
                             .filter_map(|quote| {
-                                quote.and_then(|x| match x.payment_method {
+                                quote.and_then(|mint_quotes| match mint_quotes.payment_method {
                                     PaymentMethod::Bolt11 => {
-                                        let response: MintQuoteBolt11Response<QuoteId> = x.into();
+                                        let response: MintQuoteBolt11Response<QuoteId> =
+                                            mint_quotes.into();
                                         Some(response.into())
                                     }
-                                    PaymentMethod::Bolt12 => match x.try_into() {
+                                    PaymentMethod::Bolt12 => match mint_quotes.try_into() {
                                         Ok(response) => {
                                             let response: MintQuoteBolt12Response<QuoteId> =
                                                 response;
@@ -119,10 +147,16 @@ impl Spec for MintPubSubSpec {
 
     type Event = MintEvent<QuoteId>;
 
-    type Context = DynMintDatabase;
+    type Context = (
+        DynMintDatabase,
+        Arc<HashMap<PaymentProcessorKey, DynMintPayment>>,
+    );
 
     fn new_instance(context: Self::Context) -> Arc<Self> {
-        Arc::new(Self { db: context })
+        Arc::new(Self {
+            db: context.0,
+            payment_processors: context.1,
+        })
     }
 
     async fn fetch_events(self: &Arc<Self>, topics: Vec<Self::Topic>, reply_to: Subscriber<Self>) {
@@ -142,8 +176,13 @@ pub struct PubSubManager(Pubsub<MintPubSubSpec>);
 
 impl PubSubManager {
     /// Create a new instance
-    pub fn new(db: DynMintDatabase) -> Arc<Self> {
-        Arc::new(Self(Pubsub::new(MintPubSubSpec::new_instance(db))))
+    pub fn new(
+        context: (
+            DynMintDatabase,
+            Arc<HashMap<PaymentProcessorKey, DynMintPayment>>,
+        ),
+    ) -> Arc<Self> {
+        Arc::new(Self(Pubsub::new(MintPubSubSpec::new_instance(context))))
     }
 
     /// Helper function to emit a ProofState status