Răsfoiți Sursa

Introduce Future Streams for Payments and Minting Proofs (#985)

* Introduce Future Streams for Payments and Minting Proofs

Introduce Future Streams (`ProofStream`, `PaymentStream`) for Payments and
Proofs, an easier to use interface, async friendly, to interact for the mint
waiting for payments of mints for Bolt11 and Bolt12.

---------

Co-authored-by: thesimplekid <tsk@thesimplekid.com>
C 2 luni în urmă
părinte
comite
218b39a670

+ 10 - 20
crates/cdk-cli/src/sub_commands/mint.rs

@@ -1,5 +1,4 @@
 use std::str::FromStr;
-use std::time::Duration;
 
 use anyhow::{anyhow, Result};
 use cdk::amount::SplitTarget;
@@ -7,7 +6,7 @@ use cdk::mint_url::MintUrl;
 use cdk::nuts::nut00::ProofsMethods;
 use cdk::nuts::{CurrencyUnit, PaymentMethod};
 use cdk::wallet::MultiMintWallet;
-use cdk::Amount;
+use cdk::{Amount, StreamExt};
 use clap::Args;
 use serde::{Deserialize, Serialize};
 
@@ -96,26 +95,17 @@ pub async fn mint(
 
     let mut amount_minted = Amount::ZERO;
 
-    loop {
-        let proofs = wallet
-            .wait_and_mint_quote(
-                quote.clone(),
-                SplitTarget::default(),
-                None,
-                Duration::from_secs(sub_command_args.wait_duration),
-            )
-            .await?;
+    let mut proof_streams = wallet.proof_stream(quote, SplitTarget::default(), None);
 
+    while let Some(proofs) = proof_streams.next().await {
+        let proofs = match proofs {
+            Ok(proofs) => proofs,
+            Err(err) => {
+                tracing::error!("Proof streams ended with {:?}", err);
+                break;
+            }
+        };
         amount_minted += proofs.total_amount()?;
-
-        if sub_command_args.quote_id.is_none() || quote.payment_method == PaymentMethod::Bolt11 {
-            break;
-        } else {
-            println!(
-                "Minted {} waiting for next payment.",
-                proofs.total_amount()?
-            );
-        }
     }
 
     println!("Received {amount_minted} from mint {mint_url}");

+ 4 - 3
crates/cdk-common/src/error.rs

@@ -111,13 +111,14 @@ pub enum Error {
     #[error("Could not parse bolt12")]
     Bolt12parse,
 
+    /// BIP353 address parsing error
+    #[error("Failed to parse BIP353 address: {0}")]
+    Bip353Parse(String),
+
     /// Operation timeout
     #[error("Operation timeout")]
     Timeout,
 
-    /// BIP353 address parsing error
-    #[error("Failed to parse BIP353 address: {0}")]
-    Bip353Parse(String),
     /// BIP353 address resolution error
     #[error("Failed to resolve BIP353 address: {0}")]
     Bip353Resolve(String),

+ 5 - 8
crates/cdk-integration-tests/src/init_pure_tests.rs

@@ -3,7 +3,6 @@ use std::fmt::{Debug, Formatter};
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
-use std::time::Duration;
 use std::{env, fs};
 
 use anyhow::{anyhow, bail, Result};
@@ -23,7 +22,7 @@ use cdk::nuts::{
 use cdk::types::{FeeReserve, QuoteTTL};
 use cdk::util::unix_time;
 use cdk::wallet::{AuthWallet, MintConnector, Wallet, WalletBuilder};
-use cdk::{Amount, Error, Mint};
+use cdk::{Amount, Error, Mint, StreamExt};
 use cdk_fake_wallet::FakeWallet;
 use tokio::sync::RwLock;
 use tracing_subscriber::EnvFilter;
@@ -361,12 +360,10 @@ pub async fn fund_wallet(
     let desired_amount = Amount::from(amount);
     let quote = wallet.mint_quote(desired_amount, None).await?;
 
-    wallet
-        .wait_for_payment(&quote, Duration::from_secs(60))
-        .await?;
-
     Ok(wallet
-        .mint(&quote.id, split_target.unwrap_or_default(), None)
-        .await?
+        .proof_stream(quote, split_target.unwrap_or_default(), None)
+        .next()
+        .await
+        .expect("proofs")?
         .total_amount()?)
 }

+ 5 - 9
crates/cdk-integration-tests/src/lib.rs

@@ -24,11 +24,10 @@ use anyhow::{anyhow, bail, Result};
 use cashu::Bolt11Invoice;
 use cdk::amount::{Amount, SplitTarget};
 use cdk::nuts::State;
-use cdk::Wallet;
+use cdk::{StreamExt, Wallet};
 use cdk_fake_wallet::create_fake_invoice;
 use init_regtest::{get_lnd_dir, LND_RPC_ADDR};
 use ln_regtest_rs::ln_client::{ClnClient, LightningClient, LndClient};
-use tokio::time::Duration;
 
 use crate::init_regtest::get_cln_dir;
 
@@ -44,15 +43,12 @@ pub async fn fund_wallet(wallet: Arc<Wallet>, amount: Amount) {
         .await
         .expect("Could not get mint quote");
 
-    wallet
-        .wait_for_payment(&quote, Duration::from_secs(60))
-        .await
-        .expect("wait for mint failed");
-
     let _proofs = wallet
-        .mint(&quote.id, SplitTarget::default(), None)
+        .proof_stream(quote, SplitTarget::default(), None)
+        .next()
         .await
-        .expect("Could not mint");
+        .expect("proofs")
+        .expect("proofs with no error");
 }
 
 pub fn get_mint_url_from_env() -> String {

+ 39 - 40
crates/cdk-integration-tests/tests/bolt12.rs

@@ -1,7 +1,6 @@
 use std::env;
 use std::path::PathBuf;
 use std::sync::Arc;
-use std::time::Duration;
 
 use anyhow::{bail, Result};
 use bip39::Mnemonic;
@@ -116,16 +115,14 @@ async fn test_regtest_bolt12_mint_multiple() -> Result<()> {
         .await
         .unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await?;
-
-    wallet.mint_bolt12_quote_state(&mint_quote.id).await?;
-
     let proofs = wallet
-        .mint_bolt12(&mint_quote.id, None, SplitTarget::default(), None)
-        .await
-        .unwrap();
+        .wait_and_mint_quote(
+            mint_quote.clone(),
+            SplitTarget::default(),
+            None,
+            tokio::time::Duration::from_secs(15),
+        )
+        .await?;
 
     assert_eq!(proofs.total_amount().unwrap(), 10.into());
 
@@ -134,16 +131,14 @@ async fn test_regtest_bolt12_mint_multiple() -> Result<()> {
         .await
         .unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await?;
-
-    wallet.mint_bolt12_quote_state(&mint_quote.id).await?;
-
     let proofs = wallet
-        .mint_bolt12(&mint_quote.id, None, SplitTarget::default(), None)
-        .await
-        .unwrap();
+        .wait_and_mint_quote(
+            mint_quote.clone(),
+            SplitTarget::default(),
+            None,
+            tokio::time::Duration::from_secs(15),
+        )
+        .await?;
 
     assert_eq!(proofs.total_amount().unwrap(), 11.into());
 
@@ -187,12 +182,13 @@ async fn test_regtest_bolt12_multiple_wallets() -> Result<()> {
         .pay_bolt12_offer(None, quote_one.request.clone())
         .await?;
 
-    wallet_one
-        .wait_for_payment(&quote_one, Duration::from_secs(60))
-        .await?;
-
     let proofs_one = wallet_one
-        .mint_bolt12(&quote_one.id, None, SplitTarget::default(), None)
+        .wait_and_mint_quote(
+            quote_one.clone(),
+            SplitTarget::default(),
+            None,
+            tokio::time::Duration::from_secs(15),
+        )
         .await?;
 
     assert_eq!(proofs_one.total_amount()?, 10_000.into());
@@ -205,13 +201,15 @@ async fn test_regtest_bolt12_multiple_wallets() -> Result<()> {
         .pay_bolt12_offer(None, quote_two.request.clone())
         .await?;
 
-    wallet_two
-        .wait_for_payment(&quote_two, Duration::from_secs(60))
-        .await?;
-
     let proofs_two = wallet_two
-        .mint_bolt12(&quote_two.id, None, SplitTarget::default(), None)
+        .wait_and_mint_quote(
+            quote_two.clone(),
+            SplitTarget::default(),
+            None,
+            tokio::time::Duration::from_secs(15),
+        )
         .await?;
+
     assert_eq!(proofs_two.total_amount()?, 15_000.into());
 
     let offer = cln_client
@@ -280,20 +278,19 @@ async fn test_regtest_bolt12_melt() -> Result<()> {
         .pay_bolt12_offer(None, mint_quote.request.clone())
         .await?;
 
-    // Wait for payment to be processed
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
+    let _proofs = wallet
+        .wait_and_mint_quote(
+            mint_quote.clone(),
+            SplitTarget::default(),
+            None,
+            tokio::time::Duration::from_secs(15),
+        )
         .await?;
 
     let offer = cln_client
         .get_bolt12_offer(Some(10_000), true, "hhhhhhhh".to_string())
         .await?;
 
-    let _proofs = wallet
-        .mint_bolt12(&mint_quote.id, None, SplitTarget::default(), None)
-        .await
-        .unwrap();
-
     let quote = wallet.melt_bolt12_quote(offer.to_string(), None).await?;
 
     let melt = wallet.melt(&quote.id).await?;
@@ -340,12 +337,14 @@ async fn test_regtest_bolt12_mint_extra() -> Result<()> {
         .pay_bolt12_offer(Some(pay_amount_msats), mint_quote.request.clone())
         .await?;
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(10))
-        .await?;
+    let payment = wallet
+        .wait_for_payment(&mint_quote, tokio::time::Duration::from_secs(15))
+        .await?
+        .unwrap();
 
     let state = wallet.mint_bolt12_quote_state(&mint_quote.id).await?;
 
+    assert_eq!(payment, state.amount_paid);
     assert_eq!(state.amount_paid, (pay_amount_msats / 1_000).into());
     assert_eq!(state.amount_issued, Amount::ZERO);
 

+ 6 - 6
crates/cdk-integration-tests/tests/fake_auth.rs

@@ -1,7 +1,6 @@
 use std::env;
 use std::str::FromStr;
 use std::sync::Arc;
-use std::time::Duration;
 
 use bip39::Mnemonic;
 use cashu::{MintAuthRequest, MintInfo};
@@ -331,15 +330,16 @@ async fn test_mint_with_auth() {
     let mint_amount: Amount = 100.into();
 
     let quote = wallet.mint_quote(mint_amount, None).await.unwrap();
+
     let proofs = wallet
         .wait_and_mint_quote(
-            quote,
-            Default::default(),
-            Default::default(),
-            Duration::from_secs(10),
+            quote.clone(),
+            SplitTarget::default(),
+            None,
+            tokio::time::Duration::from_secs(15),
         )
         .await
-        .unwrap();
+        .expect("payment");
 
     assert!(proofs.total_amount().expect("Could not get proofs amount") == mint_amount);
 }

+ 120 - 139
crates/cdk-integration-tests/tests/fake_wallet.rs

@@ -15,7 +15,6 @@
 //! - Duplicate proof detection
 
 use std::sync::Arc;
-use std::time::Duration;
 
 use bip39::Mnemonic;
 use cashu::Amount;
@@ -27,6 +26,7 @@ use cdk::nuts::{
 };
 use cdk::wallet::types::TransactionDirection;
 use cdk::wallet::{HttpClient, MintConnector, Wallet};
+use cdk::StreamExt;
 use cdk_fake_wallet::{create_fake_invoice, FakeInvoiceDescription};
 use cdk_integration_tests::attempt_to_swap_pending;
 use cdk_sqlite::wallet::memory;
@@ -47,15 +47,13 @@ async fn test_fake_tokens_pending() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let fake_description = FakeInvoiceDescription {
         pay_invoice_state: MeltQuoteState::Pending,
@@ -90,15 +88,13 @@ async fn test_fake_melt_payment_fail() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let fake_description = FakeInvoiceDescription {
         pay_invoice_state: MeltQuoteState::Unknown,
@@ -156,15 +152,13 @@ async fn test_fake_melt_payment_fail_and_check() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let fake_description = FakeInvoiceDescription {
         pay_invoice_state: MeltQuoteState::Unknown,
@@ -205,15 +199,13 @@ async fn test_fake_melt_payment_return_fail_status() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let fake_description = FakeInvoiceDescription {
         pay_invoice_state: MeltQuoteState::Failed,
@@ -269,15 +261,13 @@ async fn test_fake_melt_payment_error_unknown() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let fake_description = FakeInvoiceDescription {
         pay_invoice_state: MeltQuoteState::Failed,
@@ -333,15 +323,13 @@ async fn test_fake_melt_payment_err_paid() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let fake_description = FakeInvoiceDescription {
         pay_invoice_state: MeltQuoteState::Failed,
@@ -375,15 +363,13 @@ async fn test_fake_melt_change_in_quote() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let transaction = wallet
         .list_transactions(Some(TransactionDirection::Incoming))
@@ -445,15 +431,13 @@ async fn test_fake_mint_with_witness() {
     .expect("failed to create new wallet");
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let mint_amount = proofs.total_amount().unwrap();
 
@@ -474,10 +458,13 @@ async fn test_fake_mint_without_witness() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
+    let mut payment_streams = wallet.payment_stream(&mint_quote);
+
+    payment_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let http_client = HttpClient::new(MINT_URL.parse().unwrap(), None);
 
@@ -515,10 +502,13 @@ async fn test_fake_mint_with_wrong_witness() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
+    let mut payment_streams = wallet.payment_stream(&mint_quote);
+
+    payment_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let http_client = HttpClient::new(MINT_URL.parse().unwrap(), None);
 
@@ -562,10 +552,13 @@ async fn test_fake_mint_inflated() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
+    let mut payment_streams = wallet.payment_stream(&mint_quote);
+
+    payment_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id;
 
@@ -621,10 +614,13 @@ async fn test_fake_mint_multiple_units() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
+    let mut payment_streams = wallet.payment_stream(&mint_quote);
+
+    payment_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id;
 
@@ -701,15 +697,13 @@ async fn test_fake_mint_multiple_unit_swap() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let wallet_usd = Wallet::new(
         MINT_URL,
@@ -723,15 +717,14 @@ async fn test_fake_mint_multiple_unit_swap() {
 
     let mint_quote = wallet_usd.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams =
+        wallet_usd.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let usd_proofs = wallet_usd
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let usd_proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id;
 
@@ -817,15 +810,13 @@ async fn test_fake_mint_multiple_unit_melt() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     println!("Minted sat");
 
@@ -841,15 +832,14 @@ async fn test_fake_mint_multiple_unit_melt() {
     let mint_quote = wallet_usd.mint_quote(100.into(), None).await.unwrap();
     println!("Minted quote usd");
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams =
+        wallet_usd.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let usd_proofs = wallet_usd
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let usd_proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     {
         let inputs: Proofs = vec![
@@ -937,15 +927,13 @@ async fn test_fake_mint_input_output_mismatch() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let wallet_usd = Wallet::new(
         MINT_URL,
@@ -996,15 +984,14 @@ async fn test_fake_mint_swap_inflated() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
+
     let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id;
     let pre_mint =
         PreMintSecrets::random(active_keyset_id, 101.into(), &SplitTarget::None).unwrap();
@@ -1041,15 +1028,14 @@ async fn test_fake_mint_swap_spend_after_fail() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
+
     let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id;
 
     let pre_mint =
@@ -1113,15 +1099,14 @@ async fn test_fake_mint_melt_spend_after_fail() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
+
     let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id;
 
     let pre_mint =
@@ -1186,15 +1171,13 @@ async fn test_fake_mint_duplicate_proofs_swap() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id;
 
@@ -1267,15 +1250,13 @@ async fn test_fake_mint_duplicate_proofs_melt() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let inputs = vec![proofs[0].clone(), proofs[0].clone()];
 

+ 37 - 37
crates/cdk-integration-tests/tests/happy_path_mint_wallet.rs

@@ -109,15 +109,15 @@ async fn test_happy_mint_melt_round_trip() {
         .await
         .unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
-
     let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+        .wait_and_mint_quote(
+            mint_quote.clone(),
+            SplitTarget::default(),
+            None,
+            tokio::time::Duration::from_secs(15),
+        )
         .await
-        .unwrap();
+        .expect("payment");
 
     let mint_amount = proofs.total_amount().unwrap();
 
@@ -231,15 +231,15 @@ async fn test_happy_mint() {
         .await
         .unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
-
     let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+        .wait_and_mint_quote(
+            mint_quote.clone(),
+            SplitTarget::default(),
+            None,
+            tokio::time::Duration::from_secs(15),
+        )
         .await
-        .unwrap();
+        .expect("payment");
 
     let mint_amount = proofs.total_amount().unwrap();
 
@@ -279,15 +279,15 @@ async fn test_restore() {
         .await
         .unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
-
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = wallet
+        .wait_and_mint_quote(
+            mint_quote.clone(),
+            SplitTarget::default(),
+            None,
+            tokio::time::Duration::from_secs(15),
+        )
         .await
-        .unwrap();
+        .expect("payment");
 
     assert_eq!(wallet.total_balance().await.unwrap(), 100.into());
 
@@ -359,15 +359,15 @@ async fn test_fake_melt_change_in_quote() {
 
     pay_if_regtest(&get_test_temp_dir(), &bolt11).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
-
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = wallet
+        .wait_and_mint_quote(
+            mint_quote.clone(),
+            SplitTarget::default(),
+            None,
+            tokio::time::Duration::from_secs(15),
+        )
         .await
-        .unwrap();
+        .expect("payment");
 
     let invoice = create_invoice_for_env(Some(9)).await.unwrap();
 
@@ -429,15 +429,15 @@ async fn test_pay_invoice_twice() {
         .await
         .unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
-
     let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+        .wait_and_mint_quote(
+            mint_quote.clone(),
+            SplitTarget::default(),
+            None,
+            tokio::time::Duration::from_secs(15),
+        )
         .await
-        .unwrap();
+        .expect("payment");
 
     let mint_amount = proofs.total_amount().unwrap();
 

+ 37 - 33
crates/cdk-integration-tests/tests/regtest.rs

@@ -51,15 +51,15 @@ async fn test_internal_payment() {
         .await
         .expect("failed to pay invoice");
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
+    let _proofs = wallet
+        .wait_and_mint_quote(
+            mint_quote.clone(),
+            SplitTarget::default(),
+            None,
+            tokio::time::Duration::from_secs(15),
+        )
         .await
-        .unwrap();
-
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
-        .await
-        .unwrap();
+        .expect("payment");
 
     assert!(wallet.total_balance().await.unwrap() == 100.into());
 
@@ -83,15 +83,15 @@ async fn test_internal_payment() {
 
     let _melted = wallet.melt(&melt.id).await.unwrap();
 
-    wallet_2
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
+    let _proofs = wallet_2
+        .wait_and_mint_quote(
+            mint_quote.clone(),
+            SplitTarget::default(),
+            None,
+            tokio::time::Duration::from_secs(15),
+        )
         .await
-        .unwrap();
-
-    let _wallet_2_mint = wallet_2
-        .mint(&mint_quote.id, SplitTarget::default(), None)
-        .await
-        .unwrap();
+        .expect("payment");
 
     // let check_paid = match get_mint_port("0") {
     //     8085 => {
@@ -230,28 +230,32 @@ async fn test_multimint_melt() {
         .pay_invoice(quote.request.clone())
         .await
         .expect("failed to pay invoice");
-    wallet1
-        .wait_for_payment(&quote, Duration::from_secs(60))
-        .await
-        .unwrap();
-    wallet1
-        .mint(&quote.id, SplitTarget::default(), None)
+
+    let _proofs = wallet1
+        .wait_and_mint_quote(
+            quote.clone(),
+            SplitTarget::default(),
+            None,
+            tokio::time::Duration::from_secs(15),
+        )
         .await
-        .unwrap();
+        .expect("payment");
 
     let quote = wallet2.mint_quote(mint_amount, None).await.unwrap();
     ln_client
         .pay_invoice(quote.request.clone())
         .await
         .expect("failed to pay invoice");
-    wallet2
-        .wait_for_payment(&quote, Duration::from_secs(60))
-        .await
-        .unwrap();
-    wallet2
-        .mint(&quote.id, SplitTarget::default(), None)
+
+    let _proofs = wallet2
+        .wait_and_mint_quote(
+            quote.clone(),
+            SplitTarget::default(),
+            None,
+            tokio::time::Duration::from_secs(15),
+        )
         .await
-        .unwrap();
+        .expect("payment");
 
     // Get an invoice
     let invoice = ln_client.create_invoice(Some(50)).await.unwrap();
@@ -305,10 +309,10 @@ async fn test_cached_mint() {
         .await
         .expect("failed to pay invoice");
 
-    wallet
-        .wait_for_payment(&quote, Duration::from_secs(60))
+    let _proofs = wallet
+        .wait_for_payment(&quote, tokio::time::Duration::from_secs(15))
         .await
-        .unwrap();
+        .expect("payment");
 
     let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id;
     let http_client = HttpClient::new(get_mint_url_from_env().parse().unwrap(), None);

+ 16 - 25
crates/cdk-integration-tests/tests/test_fees.rs

@@ -1,6 +1,5 @@
 use std::str::FromStr;
 use std::sync::Arc;
-use std::time::Duration;
 
 use bip39::Mnemonic;
 use cashu::{Bolt11Invoice, ProofsMethods};
@@ -28,23 +27,15 @@ async fn test_swap() {
     let invoice = Bolt11Invoice::from_str(&mint_quote.request).unwrap();
     pay_if_regtest(&get_temp_dir(), &invoice).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
-
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
-        .await
-        .unwrap();
-
-    let proofs: Vec<Amount> = wallet
-        .get_unspent_proofs()
+    let proofs = wallet
+        .wait_and_mint_quote(
+            mint_quote.clone(),
+            SplitTarget::default(),
+            None,
+            tokio::time::Duration::from_secs(15),
+        )
         .await
-        .unwrap()
-        .iter()
-        .map(|p| p.amount)
-        .collect();
+        .expect("payment");
 
     println!("{:?}", proofs);
 
@@ -96,15 +87,15 @@ async fn test_fake_melt_change_in_quote() {
 
     pay_if_regtest(&get_temp_dir(), &bolt11).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
-
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = wallet
+        .wait_and_mint_quote(
+            mint_quote.clone(),
+            SplitTarget::default(),
+            None,
+            tokio::time::Duration::from_secs(15),
+        )
         .await
-        .unwrap();
+        .expect("payment");
 
     let invoice_amount = 9;
 

+ 8 - 0
crates/cdk/Cargo.toml

@@ -50,6 +50,7 @@ sync_wrapper = "0.1.2"
 bech32 = "0.9.1"
 arc-swap = "1.7.1"
 zeroize = "1"
+tokio-util.workspace = true
 
 [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
 tokio = { workspace = true, features = [
@@ -102,6 +103,13 @@ required-features = ["wallet", "auth"]
 name = "bip353"
 required-features = ["wallet", "bip353"]
 
+[[example]]
+name = "mint-token-bolt12-with-stream"
+required-features = ["wallet"]
+[[example]]
+name = "mint-token-bolt12"
+required-features = ["wallet"]
+
 [dev-dependencies]
 rand.workspace = true
 cdk-sqlite.workspace = true

+ 72 - 0
crates/cdk/examples/mint-token-bolt12-with-stream.rs

@@ -0,0 +1,72 @@
+use std::sync::Arc;
+
+use cdk::error::Error;
+use cdk::nuts::nut00::ProofsMethods;
+use cdk::nuts::CurrencyUnit;
+use cdk::wallet::{SendOptions, Wallet};
+use cdk::{Amount, StreamExt};
+use cdk_sqlite::wallet::memory;
+use rand::random;
+use tracing_subscriber::EnvFilter;
+
+#[tokio::main]
+async fn main() -> Result<(), Error> {
+    let default_filter = "debug";
+
+    let sqlx_filter = "sqlx=warn,hyper_util=warn,reqwest=warn,rustls=warn";
+
+    let env_filter = EnvFilter::new(format!("{},{}", default_filter, sqlx_filter));
+
+    // Parse input
+    tracing_subscriber::fmt().with_env_filter(env_filter).init();
+
+    // Initialize the memory store for the wallet
+    let localstore = Arc::new(memory::empty().await?);
+
+    // Generate a random seed for the wallet
+    let seed = random::<[u8; 64]>();
+
+    // Define the mint URL and currency unit
+    let mint_url = "https://fake.thesimplekid.dev";
+    let unit = CurrencyUnit::Sat;
+    let amount = Amount::from(10);
+
+    // Create a new wallet
+    let wallet = Wallet::new(mint_url, unit, localstore, seed, None)?;
+
+    let quotes = vec![
+        wallet.mint_bolt12_quote(None, None).await?,
+        wallet.mint_bolt12_quote(None, None).await?,
+        wallet.mint_bolt12_quote(None, None).await?,
+    ];
+
+    let mut stream = wallet.mints_proof_stream(quotes, Default::default(), None);
+
+    let stop = stream.get_cancel_token();
+
+    let mut processed = 0;
+
+    while let Some(proofs) = stream.next().await {
+        let (mint_quote, proofs) = proofs?;
+
+        // Mint the received amount
+        let receive_amount = proofs.total_amount()?;
+        println!("Received {} from mint {}", receive_amount, mint_quote.id);
+
+        // Send a token with the specified amount
+        let prepared_send = wallet.prepare_send(amount, SendOptions::default()).await?;
+        let token = prepared_send.confirm(None).await?;
+        println!("Token:");
+        println!("{}", token);
+
+        processed += 1;
+
+        if processed == 3 {
+            stop.cancel()
+        }
+    }
+
+    println!("Stopped the loop after {} quotes being minted", processed);
+
+    Ok(())
+}

+ 59 - 0
crates/cdk/examples/mint-token-bolt12.rs

@@ -0,0 +1,59 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use cdk::error::Error;
+use cdk::nuts::nut00::ProofsMethods;
+use cdk::nuts::CurrencyUnit;
+use cdk::wallet::{SendOptions, Wallet};
+use cdk::Amount;
+use cdk_sqlite::wallet::memory;
+use rand::random;
+use tracing_subscriber::EnvFilter;
+
+#[tokio::main]
+async fn main() -> Result<(), Error> {
+    let default_filter = "debug";
+
+    let sqlx_filter = "sqlx=warn,hyper_util=warn,reqwest=warn,rustls=warn";
+
+    let env_filter = EnvFilter::new(format!("{},{}", default_filter, sqlx_filter));
+
+    // Parse input
+    tracing_subscriber::fmt().with_env_filter(env_filter).init();
+
+    // Initialize the memory store for the wallet
+    let localstore = Arc::new(memory::empty().await?);
+
+    // Generate a random seed for the wallet
+    let seed = random::<[u8; 64]>();
+
+    // Define the mint URL and currency unit
+    let mint_url = "https://fake.thesimplekid.dev";
+    let unit = CurrencyUnit::Sat;
+    let amount = Amount::from(10);
+
+    // Create a new wallet
+    let wallet = Wallet::new(mint_url, unit, localstore, seed, None)?;
+
+    let quote = wallet.mint_bolt12_quote(None, None).await?;
+    let proofs = wallet
+        .wait_and_mint_quote(
+            quote,
+            Default::default(),
+            Default::default(),
+            Duration::from_secs(10),
+        )
+        .await?;
+
+    // Mint the received amount
+    let receive_amount = proofs.total_amount()?;
+    println!("Received {} from mint {}", receive_amount, mint_url);
+
+    // Send a token with the specified amount
+    let prepared_send = wallet.prepare_send(amount, SendOptions::default()).await?;
+    let token = prepared_send.confirm(None).await?;
+    println!("Token:");
+    println!("{}", token);
+
+    Ok(())
+}

+ 4 - 0
crates/cdk/src/lib.rs

@@ -64,3 +64,7 @@ pub use self::wallet::HttpClient;
 /// Result
 #[doc(hidden)]
 pub type Result<T, E = Box<dyn std::error::Error>> = std::result::Result<T, E>;
+
+/// Re-export futures::Stream
+#[cfg(any(feature = "wallet", feature = "mint"))]
+pub use futures::{Stream, StreamExt};

+ 2 - 2
crates/cdk/src/mint/subscription/manager.rs

@@ -107,8 +107,8 @@ impl PubSubManager {
         amount_issued: Amount,
     ) {
         if let Ok(mut event) = quote.try_into() {
-            event.amount_paid += amount_paid;
-            event.amount_issued += amount_issued;
+            event.amount_paid = amount_paid;
+            event.amount_issued = amount_issued;
 
             self.broadcast(event.into());
         } else {

+ 2 - 1
crates/cdk/src/wallet/mod.rs

@@ -42,11 +42,12 @@ pub mod multi_mint_wallet;
 mod proofs;
 mod receive;
 mod send;
+#[cfg(not(target_arch = "wasm32"))]
+mod streams;
 pub mod subscription;
 mod swap;
 mod transactions;
 pub mod util;
-mod wait;
 
 #[cfg(feature = "auth")]
 pub use auth::{AuthMintConnector, AuthWallet};

+ 122 - 0
crates/cdk/src/wallet/streams/mod.rs

@@ -0,0 +1,122 @@
+//! Wallet waiter APIn
+use std::future::Future;
+use std::pin::Pin;
+
+use cdk_common::amount::SplitTarget;
+use cdk_common::wallet::{MeltQuote, MintQuote};
+use cdk_common::{PaymentMethod, SpendingConditions};
+use payment::PaymentStream;
+use proof::{MultipleMintQuoteProofStream, SingleMintQuoteProofStream};
+
+use super::{Wallet, WalletSubscription};
+
+pub mod payment;
+pub mod proof;
+mod wait;
+
+/// Shared type
+type RecvFuture<'a, Ret> = Pin<Box<dyn Future<Output = Ret> + Send + 'a>>;
+
+#[allow(private_bounds)]
+#[allow(clippy::enum_variant_names)]
+enum WaitableEvent {
+    MeltQuote(Vec<String>),
+    MintQuote(Vec<(String, PaymentMethod)>),
+}
+
+impl From<&[MeltQuote]> for WaitableEvent {
+    fn from(events: &[MeltQuote]) -> Self {
+        WaitableEvent::MeltQuote(events.iter().map(|event| event.id.to_owned()).collect())
+    }
+}
+
+impl From<&MeltQuote> for WaitableEvent {
+    fn from(event: &MeltQuote) -> Self {
+        WaitableEvent::MeltQuote(vec![event.id.to_owned()])
+    }
+}
+
+impl From<&[MintQuote]> for WaitableEvent {
+    fn from(events: &[MintQuote]) -> Self {
+        WaitableEvent::MintQuote(
+            events
+                .iter()
+                .map(|event| (event.id.to_owned(), event.payment_method.clone()))
+                .collect(),
+        )
+    }
+}
+
+impl From<&MintQuote> for WaitableEvent {
+    fn from(event: &MintQuote) -> Self {
+        WaitableEvent::MintQuote(vec![(event.id.to_owned(), event.payment_method.clone())])
+    }
+}
+
+impl WaitableEvent {
+    fn into_subscription(self) -> Vec<WalletSubscription> {
+        match self {
+            WaitableEvent::MeltQuote(quotes) => {
+                vec![WalletSubscription::Bolt11MeltQuoteState(quotes)]
+            }
+            WaitableEvent::MintQuote(quotes) => {
+                let (bolt11, bolt12) = quotes.into_iter().fold(
+                    (Vec::new(), Vec::new()),
+                    |mut acc, (quote_id, payment_method)| {
+                        match payment_method {
+                            PaymentMethod::Bolt11 => acc.0.push(quote_id),
+                            PaymentMethod::Bolt12 => acc.1.push(quote_id),
+                            PaymentMethod::Custom(_) => acc.0.push(quote_id),
+                        }
+                        acc
+                    },
+                );
+
+                let mut subscriptions = Vec::new();
+
+                if !bolt11.is_empty() {
+                    subscriptions.push(WalletSubscription::Bolt11MintQuoteState(bolt11));
+                }
+
+                if !bolt12.is_empty() {
+                    subscriptions.push(WalletSubscription::Bolt12MintQuoteState(bolt12));
+                }
+
+                subscriptions
+            }
+        }
+    }
+}
+
+impl Wallet {
+    /// Streams all proofs from a single mint quote
+    #[inline(always)]
+    pub fn proof_stream(
+        &self,
+        quote: MintQuote,
+        amount_split_target: SplitTarget,
+        spending_conditions: Option<SpendingConditions>,
+    ) -> SingleMintQuoteProofStream<'_> {
+        SingleMintQuoteProofStream::new(self, quote, amount_split_target, spending_conditions)
+    }
+
+    /// Streams all new proofs for a set of mints
+    #[inline(always)]
+    pub fn mints_proof_stream(
+        &self,
+        quotes: Vec<MintQuote>,
+        amount_split_target: SplitTarget,
+        spending_conditions: Option<SpendingConditions>,
+    ) -> MultipleMintQuoteProofStream<'_> {
+        MultipleMintQuoteProofStream::new(self, quotes, amount_split_target, spending_conditions)
+    }
+
+    /// Returns a BoxFuture that will wait for payment on the given event with a timeout check
+    #[allow(private_bounds)]
+    pub fn payment_stream<T>(&self, events: T) -> PaymentStream<'_>
+    where
+        T: Into<WaitableEvent>,
+    {
+        PaymentStream::new(self, events.into().into_subscription())
+    }
+}

+ 205 - 0
crates/cdk/src/wallet/streams/payment.rs

@@ -0,0 +1,205 @@
+//! Payment Stream
+//!
+//! This future Stream will wait events for a Mint Quote be paid. If it is for a Bolt12 it will not stop
+//! but it will eventually error on a Timeout.
+//!
+//! Bolt11 will emit a single event.
+use std::task::Poll;
+
+use cdk_common::{Amount, Error, MeltQuoteState, MintQuoteState, NotificationPayload};
+use futures::future::join_all;
+use futures::stream::FuturesUnordered;
+use futures::{FutureExt, Stream, StreamExt};
+use tokio_util::sync::CancellationToken;
+
+use super::RecvFuture;
+use crate::wallet::subscription::ActiveSubscription;
+use crate::{Wallet, WalletSubscription};
+
+type SubscribeReceived = (Option<NotificationPayload<String>>, Vec<ActiveSubscription>);
+type PaymentValue = (String, Option<Amount>);
+
+/// PaymentWaiter
+pub struct PaymentStream<'a> {
+    wallet: Option<(&'a Wallet, Vec<WalletSubscription>)>,
+    is_finalized: bool,
+    active_subscription: Option<Vec<ActiveSubscription>>,
+
+    cancel_token: CancellationToken,
+
+    // Future events
+    subscriber_future: Option<RecvFuture<'a, Vec<ActiveSubscription>>>,
+    subscription_receiver_future: Option<RecvFuture<'static, SubscribeReceived>>,
+    cancellation_future: Option<RecvFuture<'a, ()>>,
+}
+
+impl<'a> PaymentStream<'a> {
+    /// Creates a new instance of the
+    pub fn new(wallet: &'a Wallet, filters: Vec<WalletSubscription>) -> Self {
+        Self {
+            wallet: Some((wallet, filters)),
+            is_finalized: false,
+            active_subscription: None,
+            cancel_token: Default::default(),
+            subscriber_future: None,
+            subscription_receiver_future: None,
+            cancellation_future: None,
+        }
+    }
+
+    /// Get cancellation token
+    pub fn get_cancel_token(&self) -> CancellationToken {
+        self.cancel_token.clone()
+    }
+
+    /// Creating a wallet subscription is an async event, this may change in the future, but for now,
+    /// creating a new Subscription should be polled, as any other async event. This function will
+    /// return None if the subscription is already active, Some(()) otherwise
+    fn poll_init_subscription(&mut self, cx: &mut std::task::Context<'_>) -> Option<()> {
+        if let Some((wallet, filters)) = self.wallet.take() {
+            self.subscriber_future = Some(Box::pin(async move {
+                join_all(filters.into_iter().map(|w| wallet.subscribe(w))).await
+            }));
+        }
+
+        let mut subscriber_future = self.subscriber_future.take()?;
+
+        match subscriber_future.poll_unpin(cx) {
+            Poll::Pending => {
+                self.subscriber_future = Some(subscriber_future);
+                Some(())
+            }
+            Poll::Ready(active_subscription) => {
+                self.active_subscription = Some(active_subscription);
+                None
+            }
+        }
+    }
+
+    /// Checks if the stream has been externally cancelled
+    fn poll_cancel(&mut self, cx: &mut std::task::Context<'_>) -> bool {
+        let mut cancellation_future = self.cancellation_future.take().unwrap_or_else(|| {
+            let cancel_token = self.cancel_token.clone();
+            Box::pin(async move { cancel_token.cancelled().await })
+        });
+
+        if cancellation_future.poll_unpin(cx).is_ready() {
+            self.subscription_receiver_future = None;
+            true
+        } else {
+            self.cancellation_future = Some(cancellation_future);
+            false
+        }
+    }
+
+    /// Polls the subscription for any new event
+    fn poll_event(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Result<PaymentValue, Error>>> {
+        let (subscription_receiver_future, active_subscription) = (
+            self.subscription_receiver_future.take(),
+            self.active_subscription.take(),
+        );
+
+        if subscription_receiver_future.is_none() && active_subscription.is_none() {
+            // Unexpected state, we should have an in-flight future or the active_subscription to
+            // create the future to read an event
+            return Poll::Ready(Some(Err(Error::Internal)));
+        }
+
+        let mut receiver = subscription_receiver_future.unwrap_or_else(|| {
+            let mut subscription_receiver =
+                active_subscription.expect("active subscription object");
+
+            Box::pin(async move {
+                let mut futures: FuturesUnordered<_> = subscription_receiver
+                    .iter_mut()
+                    .map(|sub| sub.recv())
+                    .collect();
+
+                if let Some(Some(winner)) = futures.next().await {
+                    drop(futures);
+                    return (Some(winner), subscription_receiver);
+                }
+
+                drop(futures);
+                (None, subscription_receiver)
+            })
+        });
+
+        match receiver.poll_unpin(cx) {
+            Poll::Pending => {
+                self.subscription_receiver_future = Some(receiver);
+                Poll::Pending
+            }
+            Poll::Ready((notification, subscription)) => {
+                tracing::debug!("Receive payment notification {:?}", notification);
+                // This future is now fulfilled, put the active_subscription again back to object. Next time next().await is called,
+                // the future will be created in subscription_receiver_future.
+                self.active_subscription = Some(subscription);
+                self.cancellation_future = None; // resets timeout
+                match notification {
+                    None => {
+                        self.is_finalized = true;
+                        Poll::Ready(None)
+                    }
+                    Some(info) => {
+                        match info {
+                            NotificationPayload::MintQuoteBolt11Response(info) => {
+                                if info.state == MintQuoteState::Paid {
+                                    self.is_finalized = true;
+                                    return Poll::Ready(Some(Ok((info.quote, None))));
+                                }
+                            }
+                            NotificationPayload::MintQuoteBolt12Response(info) => {
+                                let to_be_issued = info.amount_paid - info.amount_issued;
+                                if to_be_issued > Amount::ZERO {
+                                    return Poll::Ready(Some(Ok((info.quote, Some(to_be_issued)))));
+                                }
+                            }
+                            NotificationPayload::MeltQuoteBolt11Response(info) => {
+                                if info.state == MeltQuoteState::Paid {
+                                    self.is_finalized = true;
+                                    return Poll::Ready(Some(Ok((info.quote, None))));
+                                }
+                            }
+                            _ => {}
+                        }
+
+                        // We got an event but it is not what was expected, we need to call `recv`
+                        // again, and to copy-paste this is a recursive call that should be resolved
+                        // to a Poll::Pending *but* will trigger the future execution
+                        self.poll_event(cx)
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl Stream for PaymentStream<'_> {
+    type Item = Result<PaymentValue, Error>;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        if this.is_finalized {
+            // end of stream
+            return Poll::Ready(None);
+        }
+
+        if this.poll_cancel(cx) {
+            return Poll::Ready(None);
+        }
+
+        if this.poll_init_subscription(cx).is_some() {
+            return Poll::Pending;
+        }
+
+        this.poll_event(cx)
+    }
+}

+ 185 - 0
crates/cdk/src/wallet/streams/proof.rs

@@ -0,0 +1,185 @@
+//! Mint Stream
+//!
+//! This will mint after a mint quote has been paid. If the quote is for a Bolt12 it will keep minting until a timeout is reached.
+//!
+//! Bolt11 will mint once
+
+use std::collections::HashMap;
+use std::task::Poll;
+
+use cdk_common::amount::SplitTarget;
+use cdk_common::wallet::MintQuote;
+use cdk_common::{Error, PaymentMethod, Proofs, SpendingConditions};
+use futures::{FutureExt, Stream, StreamExt};
+use tokio_util::sync::CancellationToken;
+
+use super::payment::PaymentStream;
+use super::{RecvFuture, WaitableEvent};
+use crate::Wallet;
+
+/// Proofs for many mint quotes, as they are minted, in streams
+pub struct MultipleMintQuoteProofStream<'a> {
+    payment_stream: PaymentStream<'a>,
+    wallet: &'a Wallet,
+    quotes: HashMap<String, MintQuote>,
+    amount_split_target: SplitTarget,
+    spending_conditions: Option<SpendingConditions>,
+    minting_future: Option<RecvFuture<'a, Result<(MintQuote, Proofs), Error>>>,
+}
+
+impl<'a> MultipleMintQuoteProofStream<'a> {
+    /// Create a new Stream
+    pub fn new(
+        wallet: &'a Wallet,
+        quotes: Vec<MintQuote>,
+        amount_split_target: SplitTarget,
+        spending_conditions: Option<SpendingConditions>,
+    ) -> Self {
+        let filter: WaitableEvent = quotes.as_slice().into();
+
+        Self {
+            payment_stream: PaymentStream::new(wallet, filter.into_subscription()),
+            wallet,
+            amount_split_target,
+            spending_conditions,
+            quotes: quotes
+                .into_iter()
+                .map(|mint_quote| (mint_quote.id.clone(), mint_quote))
+                .collect(),
+            minting_future: None,
+        }
+    }
+
+    /// Get cancellation token
+    pub fn get_cancel_token(&self) -> CancellationToken {
+        self.payment_stream.get_cancel_token()
+    }
+}
+
+impl Stream for MultipleMintQuoteProofStream<'_> {
+    type Item = Result<(MintQuote, Proofs), Error>;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        if let Some(mut minting_future) = this.minting_future.take() {
+            return match minting_future.poll_unpin(cx) {
+                Poll::Pending => {
+                    this.minting_future = Some(minting_future);
+                    Poll::Pending
+                }
+                Poll::Ready(proofs) => Poll::Ready(Some(proofs)),
+            };
+        }
+
+        match this.payment_stream.poll_next_unpin(cx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(result) => match result {
+                None => Poll::Ready(None),
+                Some(result) => {
+                    let (quote_id, amount) = match result {
+                        Err(err) => {
+                            tracing::error!(
+                                "Error while waiting for payment for {:?}",
+                                this.quotes.keys().collect::<Vec<_>>()
+                            );
+                            return Poll::Ready(Some(Err(err)));
+                        }
+                        Ok(amount) => amount,
+                    };
+
+                    let mint_quote = if let Some(quote) = this.quotes.get(&quote_id) {
+                        quote.clone()
+                    } else {
+                        tracing::error!("Cannot find mint_quote {} internally", quote_id);
+                        return Poll::Ready(Some(Err(Error::UnknownQuote)));
+                    };
+
+                    let amount_split_target = this.amount_split_target.clone();
+                    let spending_conditions = this.spending_conditions.clone();
+                    let wallet = this.wallet;
+
+                    tracing::debug!(
+                        "Received payment ({:?}) notification for {}. Minting...",
+                        amount,
+                        mint_quote.id
+                    );
+
+                    let mut minting_future = Box::pin(async move {
+                        match mint_quote.payment_method {
+                            PaymentMethod::Bolt11 => wallet
+                                .mint(&mint_quote.id, amount_split_target, spending_conditions)
+                                .await
+                                .map(|proofs| (mint_quote, proofs)),
+                            PaymentMethod::Bolt12 => wallet
+                                .mint_bolt12(
+                                    &mint_quote.id,
+                                    amount,
+                                    amount_split_target,
+                                    spending_conditions,
+                                )
+                                .await
+                                .map(|proofs| (mint_quote, proofs)),
+                            _ => Err(Error::UnsupportedPaymentMethod),
+                        }
+                    });
+
+                    match minting_future.poll_unpin(cx) {
+                        Poll::Pending => {
+                            this.minting_future = Some(minting_future);
+                            Poll::Pending
+                        }
+                        Poll::Ready(result) => Poll::Ready(Some(result)),
+                    }
+                }
+            },
+        }
+    }
+}
+
+/// Proofs for a single mint quote
+pub struct SingleMintQuoteProofStream<'a>(MultipleMintQuoteProofStream<'a>);
+
+impl<'a> SingleMintQuoteProofStream<'a> {
+    /// Create a new Stream
+    pub fn new(
+        wallet: &'a Wallet,
+        quote: MintQuote,
+        amount_split_target: SplitTarget,
+        spending_conditions: Option<SpendingConditions>,
+    ) -> Self {
+        Self(MultipleMintQuoteProofStream::new(
+            wallet,
+            vec![quote],
+            amount_split_target,
+            spending_conditions,
+        ))
+    }
+
+    /// Get cancellation token
+    pub fn get_cancel_token(&self) -> CancellationToken {
+        self.0.payment_stream.get_cancel_token()
+    }
+}
+
+impl Stream for SingleMintQuoteProofStream<'_> {
+    type Item = Result<Proofs, Error>;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+        match this.0.poll_next_unpin(cx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(result) => match result {
+                None => Poll::Ready(None),
+                Some(Err(err)) => Poll::Ready(Some(Err(err))),
+                Some(Ok((_, proofs))) => Poll::Ready(Some(Ok(proofs))),
+            },
+        }
+    }
+}

+ 50 - 0
crates/cdk/src/wallet/streams/wait.rs

@@ -0,0 +1,50 @@
+use cdk_common::amount::SplitTarget;
+use cdk_common::wallet::MintQuote;
+use cdk_common::{Amount, Error, Proofs, SpendingConditions};
+use futures::future::BoxFuture;
+use futures::StreamExt;
+use tokio::time::{timeout, Duration};
+
+use super::Wallet;
+
+impl Wallet {
+    #[inline(always)]
+    /// Mints a mint quote once it is paid
+    pub async fn wait_and_mint_quote(
+        &self,
+        quote: MintQuote,
+        amount_split_target: SplitTarget,
+        spending_conditions: Option<SpendingConditions>,
+        timeout_duration: Duration,
+    ) -> Result<Proofs, Error> {
+        let mut stream = self.proof_stream(quote, amount_split_target, spending_conditions);
+
+        timeout(timeout_duration, async move {
+            stream.next().await.ok_or(Error::Internal)?
+        })
+        .await
+        .map_err(|_| Error::Timeout)?
+    }
+
+    /// Returns a BoxFuture that will wait for payment on the given event with a timeout check
+    #[allow(private_bounds)]
+    pub fn wait_for_payment(
+        &self,
+        event: &MintQuote,
+        timeout_duration: Duration,
+    ) -> BoxFuture<'_, Result<Option<Amount>, Error>> {
+        let mut stream = self.payment_stream(event);
+
+        Box::pin(async move {
+            timeout(timeout_duration, async {
+                stream
+                    .next()
+                    .await
+                    .ok_or(Error::Internal)?
+                    .map(|(_quote, amount)| amount)
+            })
+            .await
+            .map_err(|_| Error::Timeout)?
+        })
+    }
+}

+ 0 - 119
crates/cdk/src/wallet/wait.rs

@@ -1,119 +0,0 @@
-use cdk_common::amount::SplitTarget;
-use cdk_common::wallet::{MeltQuote, MintQuote};
-use cdk_common::{
-    Amount, Error, MeltQuoteState, MintQuoteState, NotificationPayload, PaymentMethod, Proofs,
-    SpendingConditions,
-};
-use futures::future::BoxFuture;
-use tokio::time::{timeout, Duration};
-
-use super::{Wallet, WalletSubscription};
-
-#[allow(private_bounds)]
-#[allow(clippy::enum_variant_names)]
-enum WaitableEvent {
-    MeltQuote(String),
-    MintQuote(String),
-    Bolt12MintQuote(String),
-}
-
-impl From<&MeltQuote> for WaitableEvent {
-    fn from(event: &MeltQuote) -> Self {
-        WaitableEvent::MeltQuote(event.id.to_owned())
-    }
-}
-
-impl From<&MintQuote> for WaitableEvent {
-    fn from(event: &MintQuote) -> Self {
-        match event.payment_method {
-            PaymentMethod::Bolt11 => WaitableEvent::MintQuote(event.id.to_owned()),
-            PaymentMethod::Bolt12 => WaitableEvent::Bolt12MintQuote(event.id.to_owned()),
-            PaymentMethod::Custom(_) => WaitableEvent::MintQuote(event.id.to_owned()),
-        }
-    }
-}
-
-impl From<WaitableEvent> for WalletSubscription {
-    fn from(val: WaitableEvent) -> Self {
-        match val {
-            WaitableEvent::MeltQuote(quote_id) => {
-                WalletSubscription::Bolt11MeltQuoteState(vec![quote_id])
-            }
-            WaitableEvent::MintQuote(quote_id) => {
-                WalletSubscription::Bolt11MintQuoteState(vec![quote_id])
-            }
-            WaitableEvent::Bolt12MintQuote(quote_id) => {
-                WalletSubscription::Bolt12MintQuoteState(vec![quote_id])
-            }
-        }
-    }
-}
-
-impl Wallet {
-    #[inline(always)]
-    /// Mints a mint quote once it is paid
-    pub async fn wait_and_mint_quote(
-        &self,
-        quote: MintQuote,
-        amount_split_target: SplitTarget,
-        spending_conditions: Option<SpendingConditions>,
-        timeout_duration: Duration,
-    ) -> Result<Proofs, Error> {
-        let amount = self.wait_for_payment(&quote, timeout_duration).await?;
-
-        tracing::debug!("Received payment notification for {}. Minting...", quote.id);
-
-        match quote.payment_method {
-            PaymentMethod::Bolt11 => {
-                self.mint(&quote.id, amount_split_target, spending_conditions)
-                    .await
-            }
-            PaymentMethod::Bolt12 => {
-                self.mint_bolt12(&quote.id, amount, amount_split_target, spending_conditions)
-                    .await
-            }
-            _ => Err(Error::UnsupportedPaymentMethod),
-        }
-    }
-
-    /// Returns a BoxFuture that will wait for payment on the given event with a timeout check
-    #[allow(private_bounds)]
-    pub fn wait_for_payment<T>(
-        &self,
-        event: T,
-        timeout_duration: Duration,
-    ) -> BoxFuture<'_, Result<Option<Amount>, Error>>
-    where
-        T: Into<WaitableEvent>,
-    {
-        let subs = self.subscribe::<WalletSubscription>(event.into().into());
-
-        Box::pin(async move {
-            timeout(timeout_duration, async {
-                let mut subscription = subs.await;
-                loop {
-                    match subscription.recv().await.ok_or(Error::Internal)? {
-                        NotificationPayload::MintQuoteBolt11Response(info) => {
-                            if info.state == MintQuoteState::Paid {
-                                return Ok(None);
-                            }
-                        }
-                        NotificationPayload::MintQuoteBolt12Response(info) => {
-                            if info.amount_paid - info.amount_issued > Amount::ZERO {
-                                return Ok(Some(info.amount_paid - info.amount_issued));
-                            }
-                        }
-                        NotificationPayload::MeltQuoteBolt11Response(info) => {
-                            if info.state == MeltQuoteState::Paid {
-                                return Ok(None);
-                            }
-                        }
-                        _ => {}
-                    }
-                }
-            })
-            .await
-            .map_err(|_| Error::Timeout)?
-        })
-    }
-}