浏览代码

Fix websocket issues and mint quotes

Fixes #1209

When checking with HTTP pulling, mint quotes would ping the ln backend if
needed to get any payment that may not be included, so the mint uses this
chance to correct its database.

This check was missing for database subscriptions
Cesar Rodas 3 周之前
父节点
当前提交
0b9f536b80
共有 3 个文件被更改,包括 85 次插入19 次删除
  1. 31 6
      crates/cdk/src/mint/ln.rs
  2. 6 4
      crates/cdk/src/mint/mod.rs
  3. 48 9
      crates/cdk/src/mint/subscription.rs

+ 31 - 6
crates/cdk/src/mint/ln.rs

@@ -1,17 +1,28 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
 use cdk_common::amount::to_unit;
 use cdk_common::amount::to_unit;
 use cdk_common::common::PaymentProcessorKey;
 use cdk_common::common::PaymentProcessorKey;
+use cdk_common::database::DynMintDatabase;
 use cdk_common::mint::MintQuote;
 use cdk_common::mint::MintQuote;
+use cdk_common::payment::DynMintPayment;
 use cdk_common::util::unix_time;
 use cdk_common::util::unix_time;
 use cdk_common::{Amount, MintQuoteState, PaymentMethod};
 use cdk_common::{Amount, MintQuoteState, PaymentMethod};
 use tracing::instrument;
 use tracing::instrument;
 
 
+use super::subscription::PubSubManager;
 use super::Mint;
 use super::Mint;
 use crate::Error;
 use crate::Error;
 
 
 impl Mint {
 impl Mint {
-    /// Check the status of an ln payment for a quote
-    #[instrument(skip_all)]
-    pub async fn check_mint_quote_paid(&self, quote: &mut MintQuote) -> Result<(), Error> {
+    /// Static implementation of check_mint_quote_paid to avoid circular dependency to the Mint
+    #[inline(always)]
+    pub(crate) async fn check_mint_quote_payments(
+        localstore: DynMintDatabase,
+        payment_processors: Arc<HashMap<PaymentProcessorKey, DynMintPayment>>,
+        pubsub_manager: Option<Arc<PubSubManager>>,
+        quote: &mut MintQuote,
+    ) -> Result<(), Error> {
         let state = quote.state();
         let state = quote.state();
 
 
         // We can just return here and do not need to check with ln node.
         // We can just return here and do not need to check with ln node.
@@ -23,7 +34,7 @@ impl Mint {
             return Ok(());
             return Ok(());
         }
         }
 
 
-        let ln = match self.payment_processors.get(&PaymentProcessorKey::new(
+        let ln = match payment_processors.get(&PaymentProcessorKey::new(
             quote.unit.clone(),
             quote.unit.clone(),
             quote.payment_method.clone(),
             quote.payment_method.clone(),
         )) {
         )) {
@@ -43,7 +54,7 @@ impl Mint {
             return Ok(());
             return Ok(());
         }
         }
 
 
-        let mut tx = self.localstore.begin_transaction().await?;
+        let mut tx = localstore.begin_transaction().await?;
 
 
         // reload the quote, as it state may have changed
         // reload the quote, as it state may have changed
         *quote = tx
         *quote = tx
@@ -79,7 +90,9 @@ impl Mint {
                     .increment_mint_quote_amount_paid(&quote.id, amount_paid, payment.payment_id)
                     .increment_mint_quote_amount_paid(&quote.id, amount_paid, payment.payment_id)
                     .await?;
                     .await?;
 
 
-                self.pubsub_manager.mint_quote_payment(quote, total_paid);
+                if let Some(pubsub_manager) = pubsub_manager.as_ref() {
+                    pubsub_manager.mint_quote_payment(quote, total_paid);
+                }
             }
             }
         }
         }
 
 
@@ -87,4 +100,16 @@ impl Mint {
 
 
         Ok(())
         Ok(())
     }
     }
+
+    /// Check the status of an ln payment for a quote
+    #[instrument(skip_all)]
+    pub async fn check_mint_quote_paid(&self, quote: &mut MintQuote) -> Result<(), Error> {
+        Self::check_mint_quote_payments(
+            self.localstore.clone(),
+            self.payment_processors.clone(),
+            Some(self.pubsub_manager.clone()),
+            quote,
+        )
+        .await
+    }
 }
 }

+ 6 - 4
crates/cdk/src/mint/mod.rs

@@ -69,7 +69,7 @@ pub struct Mint {
     #[cfg(feature = "auth")]
     #[cfg(feature = "auth")]
     auth_localstore: Option<DynMintAuthDatabase>,
     auth_localstore: Option<DynMintAuthDatabase>,
     /// Payment processors for mint
     /// Payment processors for mint
-    payment_processors: HashMap<PaymentProcessorKey, DynMintPayment>,
+    payment_processors: Arc<HashMap<PaymentProcessorKey, DynMintPayment>>,
     /// Subscription manager
     /// Subscription manager
     pubsub_manager: Arc<PubSubManager>,
     pubsub_manager: Arc<PubSubManager>,
     #[cfg(feature = "auth")]
     #[cfg(feature = "auth")]
@@ -203,9 +203,11 @@ impl Mint {
             }
             }
         }
         }
 
 
+        let payment_processors = Arc::new(payment_processors);
+
         Ok(Self {
         Ok(Self {
             signatory,
             signatory,
-            pubsub_manager: PubSubManager::new(localstore.clone()),
+            pubsub_manager: PubSubManager::new((localstore.clone(), payment_processors.clone())),
             localstore,
             localstore,
             #[cfg(feature = "auth")]
             #[cfg(feature = "auth")]
             oidc_client: computed_info.nuts.nut21.as_ref().map(|nut21| {
             oidc_client: computed_info.nuts.nut21.as_ref().map(|nut21| {
@@ -257,7 +259,7 @@ impl Mint {
         // Start all payment processors first
         // Start all payment processors first
         tracing::info!("Starting payment processors...");
         tracing::info!("Starting payment processors...");
         let mut seen_processors = Vec::new();
         let mut seen_processors = Vec::new();
-        for (key, processor) in &self.payment_processors {
+        for (key, processor) in self.payment_processors.iter() {
             // Skip if we've already spawned a task for this processor instance
             // Skip if we've already spawned a task for this processor instance
             if seen_processors.iter().any(|p| Arc::ptr_eq(p, processor)) {
             if seen_processors.iter().any(|p| Arc::ptr_eq(p, processor)) {
                 continue;
                 continue;
@@ -369,7 +371,7 @@ impl Mint {
         tracing::info!("Stopping payment processors...");
         tracing::info!("Stopping payment processors...");
         let mut seen_processors = Vec::new();
         let mut seen_processors = Vec::new();
 
 
-        for (key, processor) in &self.payment_processors {
+        for (key, processor) in self.payment_processors.iter() {
             // Skip if we've already spawned a task for this processor instance
             // Skip if we've already spawned a task for this processor instance
             if seen_processors.iter().any(|p| Arc::ptr_eq(p, processor)) {
             if seen_processors.iter().any(|p| Arc::ptr_eq(p, processor)) {
                 continue;
                 continue;

+ 48 - 9
crates/cdk/src/mint/subscription.rs

@@ -1,11 +1,14 @@
 //! Specific Subscription for the cdk crate
 //! Specific Subscription for the cdk crate
 
 
+use std::collections::HashMap;
 use std::ops::Deref;
 use std::ops::Deref;
 use std::sync::Arc;
 use std::sync::Arc;
 
 
+use cdk_common::common::PaymentProcessorKey;
 use cdk_common::database::DynMintDatabase;
 use cdk_common::database::DynMintDatabase;
 use cdk_common::mint::MintQuote;
 use cdk_common::mint::MintQuote;
 use cdk_common::nut17::NotificationId;
 use cdk_common::nut17::NotificationId;
+use cdk_common::payment::DynMintPayment;
 use cdk_common::pub_sub::{Pubsub, Spec, Subscriber};
 use cdk_common::pub_sub::{Pubsub, Spec, Subscriber};
 use cdk_common::subscription::SubId;
 use cdk_common::subscription::SubId;
 use cdk_common::{
 use cdk_common::{
@@ -13,15 +16,39 @@ use cdk_common::{
     MintQuoteBolt12Response, MintQuoteState, PaymentMethod, ProofState, PublicKey, QuoteId,
     MintQuoteBolt12Response, MintQuoteState, PaymentMethod, ProofState, PublicKey, QuoteId,
 };
 };
 
 
+use super::Mint;
 use crate::event::MintEvent;
 use crate::event::MintEvent;
 
 
 /// Mint subtopics
 /// Mint subtopics
 #[derive(Clone)]
 #[derive(Clone)]
 pub struct MintPubSubSpec {
 pub struct MintPubSubSpec {
     db: DynMintDatabase,
     db: DynMintDatabase,
+    payment_processors: Arc<HashMap<PaymentProcessorKey, DynMintPayment>>,
 }
 }
 
 
 impl MintPubSubSpec {
 impl MintPubSubSpec {
+    /// Similar to Mint::check_mint_quote_paid, but copied to avoid
+    async fn get_mint_quote(
+        &self,
+        quote_id: &QuoteId,
+    ) -> Result<Option<MintQuote>, cdk_common::Error> {
+        let mut quote = if let Some(quote) = self.db.get_mint_quote(quote_id).await? {
+            quote
+        } else {
+            return Ok(None);
+        };
+
+        Mint::check_mint_quote_payments(
+            self.db.clone(),
+            self.payment_processors.clone(),
+            None,
+            &mut quote,
+        )
+        .await?;
+
+        Ok(Some(quote))
+    }
+
     async fn get_events_from_db(
     async fn get_events_from_db(
         &self,
         &self,
         request: &[NotificationId<QuoteId>],
         request: &[NotificationId<QuoteId>],
@@ -38,10 +65,10 @@ impl MintPubSubSpec {
                     melt_queries.push(self.db.get_melt_quote(uuid))
                     melt_queries.push(self.db.get_melt_quote(uuid))
                 }
                 }
                 NotificationId::MintQuoteBolt11(uuid) => {
                 NotificationId::MintQuoteBolt11(uuid) => {
-                    mint_queries.push(self.db.get_mint_quote(uuid))
+                    mint_queries.push(self.get_mint_quote(uuid))
                 }
                 }
                 NotificationId::MintQuoteBolt12(uuid) => {
                 NotificationId::MintQuoteBolt12(uuid) => {
-                    mint_queries.push(self.db.get_mint_quote(uuid))
+                    mint_queries.push(self.get_mint_quote(uuid))
                 }
                 }
                 NotificationId::MeltQuoteBolt12(uuid) => {
                 NotificationId::MeltQuoteBolt12(uuid) => {
                     melt_queries.push(self.db.get_melt_quote(uuid))
                     melt_queries.push(self.db.get_melt_quote(uuid))
@@ -72,12 +99,13 @@ impl MintPubSubSpec {
                         quotes
                         quotes
                             .into_iter()
                             .into_iter()
                             .filter_map(|quote| {
                             .filter_map(|quote| {
-                                quote.and_then(|x| match x.payment_method {
+                                quote.and_then(|mint_quotes| match mint_quotes.payment_method {
                                     PaymentMethod::Bolt11 => {
                                     PaymentMethod::Bolt11 => {
-                                        let response: MintQuoteBolt11Response<QuoteId> = x.into();
+                                        let response: MintQuoteBolt11Response<QuoteId> =
+                                            mint_quotes.into();
                                         Some(response.into())
                                         Some(response.into())
                                     }
                                     }
-                                    PaymentMethod::Bolt12 => match x.try_into() {
+                                    PaymentMethod::Bolt12 => match mint_quotes.try_into() {
                                         Ok(response) => {
                                         Ok(response) => {
                                             let response: MintQuoteBolt12Response<QuoteId> =
                                             let response: MintQuoteBolt12Response<QuoteId> =
                                                 response;
                                                 response;
@@ -119,10 +147,16 @@ impl Spec for MintPubSubSpec {
 
 
     type Event = MintEvent<QuoteId>;
     type Event = MintEvent<QuoteId>;
 
 
-    type Context = DynMintDatabase;
+    type Context = (
+        DynMintDatabase,
+        Arc<HashMap<PaymentProcessorKey, DynMintPayment>>,
+    );
 
 
     fn new_instance(context: Self::Context) -> Arc<Self> {
     fn new_instance(context: Self::Context) -> Arc<Self> {
-        Arc::new(Self { db: context })
+        Arc::new(Self {
+            db: context.0,
+            payment_processors: context.1,
+        })
     }
     }
 
 
     async fn fetch_events(self: &Arc<Self>, topics: Vec<Self::Topic>, reply_to: Subscriber<Self>) {
     async fn fetch_events(self: &Arc<Self>, topics: Vec<Self::Topic>, reply_to: Subscriber<Self>) {
@@ -142,8 +176,13 @@ pub struct PubSubManager(Pubsub<MintPubSubSpec>);
 
 
 impl PubSubManager {
 impl PubSubManager {
     /// Create a new instance
     /// Create a new instance
-    pub fn new(db: DynMintDatabase) -> Arc<Self> {
-        Arc::new(Self(Pubsub::new(MintPubSubSpec::new_instance(db))))
+    pub fn new(
+        context: (
+            DynMintDatabase,
+            Arc<HashMap<PaymentProcessorKey, DynMintPayment>>,
+        ),
+    ) -> Arc<Self> {
+        Arc::new(Self(Pubsub::new(MintPubSubSpec::new_instance(context))))
     }
     }
 
 
     /// Helper function to emit a ProofState status
     /// Helper function to emit a ProofState status