Quellcode durchsuchen

Introduce proof_stream and mints_proof_stream

They both stream a proofs, from either a single mint, or a set of mints. For
the last function the mint object is also streamed
Cesar Rodas vor 1 Monat
Ursprung
Commit
f308c6001e

+ 2 - 2
crates/cdk/src/mint/subscription/manager.rs

@@ -107,8 +107,8 @@ impl PubSubManager {
         amount_issued: Amount,
     ) {
         if let Ok(mut event) = quote.try_into() {
-            event.amount_paid += amount_paid;
-            event.amount_issued += amount_issued;
+            event.amount_paid = amount_paid;
+            event.amount_issued = amount_issued;
 
             self.broadcast(event.into());
         } else {

+ 68 - 26
crates/cdk/src/wallet/streams/mod.rs

@@ -1,4 +1,4 @@
-//! Wallet waiter APIs
+//! Wallet waiter APIn
 use std::future::Future;
 use std::pin::Pin;
 
@@ -6,7 +6,7 @@ use cdk_common::amount::SplitTarget;
 use cdk_common::wallet::{MeltQuote, MintQuote};
 use cdk_common::{PaymentMethod, SpendingConditions};
 use payment::PaymentStream;
-use proof::ProofStream;
+use proof::{MultipleMintQuoteProofStream, SingleMintQuoteProofStream};
 
 use super::{Wallet, WalletSubscription};
 
@@ -23,61 +23,103 @@ type RecvFuture<'a, Ret> = Pin<Box<dyn Future<Output = Ret> + 'a>>;
 #[allow(private_bounds)]
 #[allow(clippy::enum_variant_names)]
 enum WaitableEvent {
-    MeltQuote(String),
-    MintQuote(String),
-    Bolt12MintQuote(String),
+    MeltQuote(Vec<String>),
+    MintQuote(Vec<(String, PaymentMethod)>),
+}
+
+impl From<&[MeltQuote]> for WaitableEvent {
+    fn from(events: &[MeltQuote]) -> Self {
+        WaitableEvent::MeltQuote(events.iter().map(|event| event.id.to_owned()).collect())
+    }
 }
 
 impl From<&MeltQuote> for WaitableEvent {
     fn from(event: &MeltQuote) -> Self {
-        WaitableEvent::MeltQuote(event.id.to_owned())
+        WaitableEvent::MeltQuote(vec![event.id.to_owned()])
+    }
+}
+
+impl From<&[MintQuote]> for WaitableEvent {
+    fn from(events: &[MintQuote]) -> Self {
+        WaitableEvent::MintQuote(
+            events
+                .iter()
+                .map(|event| (event.id.to_owned(), event.payment_method.clone()))
+                .collect(),
+        )
     }
 }
 
 impl From<&MintQuote> for WaitableEvent {
     fn from(event: &MintQuote) -> Self {
-        match event.payment_method {
-            PaymentMethod::Bolt11 => WaitableEvent::MintQuote(event.id.to_owned()),
-            PaymentMethod::Bolt12 => WaitableEvent::Bolt12MintQuote(event.id.to_owned()),
-            PaymentMethod::Custom(_) => WaitableEvent::MintQuote(event.id.to_owned()),
-        }
+        WaitableEvent::MintQuote(vec![(event.id.to_owned(), event.payment_method.clone())])
     }
 }
 
-impl From<WaitableEvent> for WalletSubscription {
-    fn from(val: WaitableEvent) -> Self {
-        match val {
-            WaitableEvent::MeltQuote(quote_id) => {
-                WalletSubscription::Bolt11MeltQuoteState(vec![quote_id])
+impl WaitableEvent {
+    fn into_subscription(self) -> Vec<WalletSubscription> {
+        match self {
+            WaitableEvent::MeltQuote(quotes) => {
+                vec![WalletSubscription::Bolt11MeltQuoteState(quotes)]
             }
-            WaitableEvent::MintQuote(quote_id) => {
-                WalletSubscription::Bolt11MintQuoteState(vec![quote_id])
-            }
-            WaitableEvent::Bolt12MintQuote(quote_id) => {
-                WalletSubscription::Bolt12MintQuoteState(vec![quote_id])
+            WaitableEvent::MintQuote(quotes) => {
+                let (bolt11, bolt12) = quotes.into_iter().fold(
+                    (Vec::new(), Vec::new()),
+                    |mut acc, (quote_id, payment_method)| {
+                        match payment_method {
+                            PaymentMethod::Bolt11 => acc.0.push(quote_id),
+                            PaymentMethod::Bolt12 => acc.1.push(quote_id),
+                            PaymentMethod::Custom(_) => acc.0.push(quote_id),
+                        }
+                        acc
+                    },
+                );
+
+                let mut subscriptions = Vec::new();
+
+                if !bolt11.is_empty() {
+                    subscriptions.push(WalletSubscription::Bolt11MintQuoteState(bolt11));
+                }
+
+                if !bolt12.is_empty() {
+                    subscriptions.push(WalletSubscription::Bolt12MintQuoteState(bolt12));
+                }
+
+                subscriptions
             }
         }
     }
 }
 
 impl Wallet {
+    /// Streams all proofs from a single mint quote
     #[inline(always)]
-    /// Mints a mint quote once it is paid
     pub fn proof_stream(
         &self,
         quote: MintQuote,
         amount_split_target: SplitTarget,
         spending_conditions: Option<SpendingConditions>,
-    ) -> ProofStream<'_> {
-        ProofStream::new(self, quote, amount_split_target, spending_conditions)
+    ) -> SingleMintQuoteProofStream<'_> {
+        SingleMintQuoteProofStream::new(self, quote, amount_split_target, spending_conditions)
+    }
+
+    /// Streams all new proofs for a set of mints
+    #[inline(always)]
+    pub fn mints_proof_stream(
+        &self,
+        quotes: Vec<MintQuote>,
+        amount_split_target: SplitTarget,
+        spending_conditions: Option<SpendingConditions>,
+    ) -> MultipleMintQuoteProofStream<'_> {
+        MultipleMintQuoteProofStream::new(self, quotes, amount_split_target, spending_conditions)
     }
 
     /// Returns a BoxFuture that will wait for payment on the given event with a timeout check
     #[allow(private_bounds)]
-    pub fn payment_stream<T>(&self, event: T) -> PaymentStream<'_>
+    pub fn payment_stream<T>(&self, events: T) -> PaymentStream<'_>
     where
         T: Into<WaitableEvent>,
     {
-        PaymentStream::new(self, event.into().into())
+        PaymentStream::new(self, events.into().into_subscription())
     }
 }

+ 35 - 16
crates/cdk/src/wallet/streams/payment.rs

@@ -7,33 +7,37 @@
 use std::task::Poll;
 
 use cdk_common::{Amount, Error, MeltQuoteState, MintQuoteState, NotificationPayload};
-use futures::{FutureExt, Stream};
+use futures::future::join_all;
+use futures::stream::FuturesUnordered;
+use futures::{FutureExt, Stream, StreamExt};
 use tokio_util::sync::CancellationToken;
 
 use super::RecvFuture;
 use crate::wallet::subscription::ActiveSubscription;
 use crate::{Wallet, WalletSubscription};
 
+type SubscribeReceived = (Option<NotificationPayload<String>>, Vec<ActiveSubscription>);
+type PaymentValue = (String, Option<Amount>);
+
 /// PaymentWaiter
 pub struct PaymentStream<'a> {
-    wallet: Option<(&'a Wallet, WalletSubscription)>,
+    wallet: Option<(&'a Wallet, Vec<WalletSubscription>)>,
     is_finalized: bool,
-    active_subscription: Option<ActiveSubscription>,
+    active_subscription: Option<Vec<ActiveSubscription>>,
 
     cancel_token: CancellationToken,
 
     // Future events
-    subscriber_future: Option<RecvFuture<'a, ActiveSubscription>>,
-    subscription_receiver_future:
-        Option<RecvFuture<'static, (Option<NotificationPayload<String>>, ActiveSubscription)>>,
+    subscriber_future: Option<RecvFuture<'a, Vec<ActiveSubscription>>>,
+    subscription_receiver_future: Option<RecvFuture<'static, SubscribeReceived>>,
     cancellation_future: Option<RecvFuture<'a, ()>>,
 }
 
 impl<'a> PaymentStream<'a> {
     /// Creates a new instance of the
-    pub fn new(wallet: &'a Wallet, filter: WalletSubscription) -> Self {
+    pub fn new(wallet: &'a Wallet, filters: Vec<WalletSubscription>) -> Self {
         Self {
-            wallet: Some((wallet, filter)),
+            wallet: Some((wallet, filters)),
             is_finalized: false,
             active_subscription: None,
             cancel_token: Default::default(),
@@ -52,8 +56,10 @@ impl<'a> PaymentStream<'a> {
     /// creating a new Subscription should be polled, as any other async event. This function will
     /// return None if the subscription is already active, Some(()) otherwise
     fn poll_init_subscription(&mut self, cx: &mut std::task::Context<'_>) -> Option<()> {
-        if let Some((wallet, filter)) = self.wallet.take() {
-            self.subscriber_future = Some(Box::pin(async move { wallet.subscribe(filter).await }));
+        if let Some((wallet, filters)) = self.wallet.take() {
+            self.subscriber_future = Some(Box::pin(async move {
+                join_all(filters.into_iter().map(|w| wallet.subscribe(w))).await
+            }));
         }
 
         let mut subscriber_future = self.subscriber_future.take()?;
@@ -90,7 +96,7 @@ impl<'a> PaymentStream<'a> {
     fn poll_event(
         &mut self,
         cx: &mut std::task::Context<'_>,
-    ) -> Poll<Option<Result<Option<Amount>, Error>>> {
+    ) -> Poll<Option<Result<PaymentValue, Error>>> {
         let (subscription_receiver_future, active_subscription) = (
             self.subscription_receiver_future.take(),
             self.active_subscription.take(),
@@ -106,7 +112,20 @@ impl<'a> PaymentStream<'a> {
             let mut subscription_receiver =
                 active_subscription.expect("active subscription object");
 
-            Box::pin(async move { (subscription_receiver.recv().await, subscription_receiver) })
+            Box::pin(async move {
+                let mut futures: FuturesUnordered<_> = subscription_receiver
+                    .iter_mut()
+                    .map(|sub| sub.recv())
+                    .collect();
+
+                if let Some(Some(winner)) = futures.next().await {
+                    drop(futures);
+                    return (Some(winner), subscription_receiver);
+                }
+
+                drop(futures);
+                (None, subscription_receiver)
+            })
         });
 
         match receiver.poll_unpin(cx) {
@@ -130,19 +149,19 @@ impl<'a> PaymentStream<'a> {
                             NotificationPayload::MintQuoteBolt11Response(info) => {
                                 if info.state == MintQuoteState::Paid {
                                     self.is_finalized = true;
-                                    return Poll::Ready(Some(Ok(None)));
+                                    return Poll::Ready(Some(Ok((info.quote, None))));
                                 }
                             }
                             NotificationPayload::MintQuoteBolt12Response(info) => {
                                 let to_be_issued = info.amount_paid - info.amount_issued;
                                 if to_be_issued > Amount::ZERO {
-                                    return Poll::Ready(Some(Ok(Some(to_be_issued))));
+                                    return Poll::Ready(Some(Ok((info.quote, Some(to_be_issued)))));
                                 }
                             }
                             NotificationPayload::MeltQuoteBolt11Response(info) => {
                                 if info.state == MeltQuoteState::Paid {
                                     self.is_finalized = true;
-                                    return Poll::Ready(Some(Ok(None)));
+                                    return Poll::Ready(Some(Ok((info.quote, None))));
                                 }
                             }
                             _ => {}
@@ -160,7 +179,7 @@ impl<'a> PaymentStream<'a> {
 }
 
 impl Stream for PaymentStream<'_> {
-    type Item = Result<Option<Amount>, Error>;
+    type Item = Result<PaymentValue, Error>;
 
     fn poll_next(
         self: std::pin::Pin<&mut Self>,

+ 84 - 31
crates/cdk/src/wallet/streams/proof.rs

@@ -4,6 +4,7 @@
 //!
 //! Bolt11 will mint once
 
+use std::collections::HashMap;
 use std::task::Poll;
 
 use cdk_common::amount::SplitTarget;
@@ -16,31 +17,35 @@ use super::payment::PaymentStream;
 use super::{RecvFuture, WaitableEvent};
 use crate::Wallet;
 
-/// Mint waiter
-pub struct ProofStream<'a> {
+/// Proofs for many mint quotes, as they are minted, in streams
+pub struct MultipleMintQuoteProofStream<'a> {
     payment_stream: PaymentStream<'a>,
     wallet: &'a Wallet,
-    mint_quote: MintQuote,
+    quotes: HashMap<String, MintQuote>,
     amount_split_target: SplitTarget,
     spending_conditions: Option<SpendingConditions>,
-    minting_future: Option<RecvFuture<'a, Result<Proofs, Error>>>,
+    minting_future: Option<RecvFuture<'a, Result<(MintQuote, Proofs), Error>>>,
 }
 
-impl<'a> ProofStream<'a> {
+impl<'a> MultipleMintQuoteProofStream<'a> {
     /// Create a new Stream
     pub fn new(
         wallet: &'a Wallet,
-        mint_quote: MintQuote,
+        quotes: Vec<MintQuote>,
         amount_split_target: SplitTarget,
         spending_conditions: Option<SpendingConditions>,
     ) -> Self {
-        let filter: WaitableEvent = (&mint_quote).into();
+        let filter: WaitableEvent = quotes.as_slice().into();
+
         Self {
-            payment_stream: PaymentStream::new(wallet, filter.into()),
+            payment_stream: PaymentStream::new(wallet, filter.into_subscription()),
             wallet,
             amount_split_target,
             spending_conditions,
-            mint_quote,
+            quotes: quotes
+                .into_iter()
+                .map(|mint_quote| (mint_quote.id.clone(), mint_quote))
+                .collect(),
             minting_future: None,
         }
     }
@@ -51,8 +56,8 @@ impl<'a> ProofStream<'a> {
     }
 }
 
-impl Stream for ProofStream<'_> {
-    type Item = Result<Proofs, Error>;
+impl Stream for MultipleMintQuoteProofStream<'_> {
+    type Item = Result<(MintQuote, Proofs), Error>;
 
     fn poll_next(
         self: std::pin::Pin<&mut Self>,
@@ -75,18 +80,24 @@ impl Stream for ProofStream<'_> {
             Poll::Ready(result) => match result {
                 None => Poll::Ready(None),
                 Some(result) => {
-                    let amount = match result {
+                    let (quote_id, amount) = match result {
                         Err(err) => {
                             tracing::error!(
-                                "Error while waiting for payment for {}",
-                                this.mint_quote.id
+                                "Error while waiting for payment for {:?}",
+                                this.quotes.keys().collect::<Vec<_>>()
                             );
                             return Poll::Ready(Some(Err(err)));
                         }
                         Ok(amount) => amount,
                     };
 
-                    let mint_quote = this.mint_quote.clone();
+                    let mint_quote = if let Some(quote) = this.quotes.get(&quote_id) {
+                        quote.clone()
+                    } else {
+                        tracing::error!("Cannot find mint_quote {} internally", quote_id);
+                        return Poll::Ready(Some(Err(Error::UnknownQuote)));
+                    };
+
                     let amount_split_target = this.amount_split_target.clone();
                     let spending_conditions = this.spending_conditions.clone();
                     let wallet = this.wallet;
@@ -99,21 +110,19 @@ impl Stream for ProofStream<'_> {
 
                     let mut minting_future = Box::pin(async move {
                         match mint_quote.payment_method {
-                            PaymentMethod::Bolt11 => {
-                                wallet
-                                    .mint(&mint_quote.id, amount_split_target, spending_conditions)
-                                    .await
-                            }
-                            PaymentMethod::Bolt12 => {
-                                wallet
-                                    .mint_bolt12(
-                                        &mint_quote.id,
-                                        amount,
-                                        amount_split_target,
-                                        spending_conditions,
-                                    )
-                                    .await
-                            }
+                            PaymentMethod::Bolt11 => wallet
+                                .mint(&mint_quote.id, amount_split_target, spending_conditions)
+                                .await
+                                .map(|proofs| (mint_quote, proofs)),
+                            PaymentMethod::Bolt12 => wallet
+                                .mint_bolt12(
+                                    &mint_quote.id,
+                                    amount,
+                                    amount_split_target,
+                                    spending_conditions,
+                                )
+                                .await
+                                .map(|proofs| (mint_quote, proofs)),
                             _ => Err(Error::UnsupportedPaymentMethod),
                         }
                     });
@@ -123,10 +132,54 @@ impl Stream for ProofStream<'_> {
                             this.minting_future = Some(minting_future);
                             Poll::Pending
                         }
-                        Poll::Ready(proofs) => Poll::Ready(Some(proofs)),
+                        Poll::Ready(result) => Poll::Ready(Some(result)),
                     }
                 }
             },
         }
     }
 }
+
+/// Proofs for a single mint quote
+pub struct SingleMintQuoteProofStream<'a>(MultipleMintQuoteProofStream<'a>);
+
+impl<'a> SingleMintQuoteProofStream<'a> {
+    /// Create a new Stream
+    pub fn new(
+        wallet: &'a Wallet,
+        quote: MintQuote,
+        amount_split_target: SplitTarget,
+        spending_conditions: Option<SpendingConditions>,
+    ) -> Self {
+        Self(MultipleMintQuoteProofStream::new(
+            wallet,
+            vec![quote],
+            amount_split_target,
+            spending_conditions,
+        ))
+    }
+
+    /// Get cancellation token
+    pub fn get_cancel_token(&self) -> CancellationToken {
+        self.0.payment_stream.get_cancel_token()
+    }
+}
+
+impl Stream for SingleMintQuoteProofStream<'_> {
+    type Item = Result<Proofs, Error>;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+        match this.0.poll_next_unpin(cx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(result) => match result {
+                None => Poll::Ready(None),
+                Some(Err(err)) => Poll::Ready(Some(Err(err))),
+                Some(Ok((_, proofs))) => Poll::Ready(Some(Ok(proofs))),
+            },
+        }
+    }
+}