Browse Source

Introduce Future Streams for Payments and Minting Proofs

Introduce Future Streams (`ProofStream`, `PaymentStream`) for Payments and
Proofs, an easier to use interface, async friendly, to interact for the mint
waiting for payments of mints for Bolt11 and Bolt12.
Cesar Rodas 2 months ago
parent
commit
0597c017f7

+ 16 - 20
crates/cdk-cli/src/sub_commands/mint.rs

@@ -7,7 +7,7 @@ use cdk::mint_url::MintUrl;
 use cdk::nuts::nut00::ProofsMethods;
 use cdk::nuts::{CurrencyUnit, PaymentMethod};
 use cdk::wallet::MultiMintWallet;
-use cdk::Amount;
+use cdk::{Amount, StreamExt};
 use clap::Args;
 use serde::{Deserialize, Serialize};
 
@@ -96,26 +96,22 @@ pub async fn mint(
 
     let mut amount_minted = Amount::ZERO;
 
-    loop {
-        let proofs = wallet
-            .wait_and_mint_quote(
-                quote.clone(),
-                SplitTarget::default(),
-                None,
-                Duration::from_secs(sub_command_args.wait_duration),
-            )
-            .await?;
-
+    let mut proof_streams = wallet.proof_stream(
+        quote,
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(sub_command_args.wait_duration),
+    );
+
+    while let Some(proofs) = proof_streams.next().await {
+        let proofs = match proofs {
+            Ok(proofs) => proofs,
+            Err(err) => {
+                tracing::error!("Proof streams ended with {:?}", err);
+                break;
+            }
+        };
         amount_minted += proofs.total_amount()?;
-
-        if sub_command_args.quote_id.is_none() || quote.payment_method == PaymentMethod::Bolt11 {
-            break;
-        } else {
-            println!(
-                "Minted {} waiting for next payment.",
-                proofs.total_amount()?
-            );
-        }
     }
 
     println!("Received {amount_minted} from mint {mint_url}");

+ 10 - 7
crates/cdk-integration-tests/src/init_pure_tests.rs

@@ -23,7 +23,7 @@ use cdk::nuts::{
 use cdk::types::{FeeReserve, QuoteTTL};
 use cdk::util::unix_time;
 use cdk::wallet::{AuthWallet, MintConnector, Wallet, WalletBuilder};
-use cdk::{Amount, Error, Mint};
+use cdk::{Amount, Error, Mint, StreamExt};
 use cdk_fake_wallet::FakeWallet;
 use tokio::sync::RwLock;
 use tracing_subscriber::EnvFilter;
@@ -361,12 +361,15 @@ pub async fn fund_wallet(
     let desired_amount = Amount::from(amount);
     let quote = wallet.mint_quote(desired_amount, None).await?;
 
-    wallet
-        .wait_for_payment(&quote, Duration::from_secs(60))
-        .await?;
-
     Ok(wallet
-        .mint(&quote.id, split_target.unwrap_or_default(), None)
-        .await?
+        .proof_stream(
+            quote,
+            split_target.unwrap_or_default(),
+            None,
+            Duration::from_secs(60),
+        )
+        .next()
+        .await
+        .expect("proofs")?
         .total_amount()?)
 }

+ 5 - 8
crates/cdk-integration-tests/src/lib.rs

@@ -24,7 +24,7 @@ use anyhow::{anyhow, bail, Result};
 use cashu::Bolt11Invoice;
 use cdk::amount::{Amount, SplitTarget};
 use cdk::nuts::State;
-use cdk::Wallet;
+use cdk::{StreamExt, Wallet};
 use cdk_fake_wallet::create_fake_invoice;
 use init_regtest::{get_lnd_dir, LND_RPC_ADDR};
 use ln_regtest_rs::ln_client::{LightningClient, LndClient};
@@ -42,15 +42,12 @@ pub async fn fund_wallet(wallet: Arc<Wallet>, amount: Amount) {
         .await
         .expect("Could not get mint quote");
 
-    wallet
-        .wait_for_payment(&quote, Duration::from_secs(60))
-        .await
-        .expect("wait for mint failed");
-
     let _proofs = wallet
-        .mint(&quote.id, SplitTarget::default(), None)
+        .proof_stream(quote, SplitTarget::default(), None, Duration::from_secs(60))
+        .next()
         .await
-        .expect("Could not mint");
+        .expect("proofs")
+        .expect("proofs with no error");
 }
 
 pub fn get_mint_url_from_env() -> String {

+ 34 - 41
crates/cdk-integration-tests/tests/bolt12.rs

@@ -9,6 +9,7 @@ use cashu::amount::SplitTarget;
 use cashu::nut23::Amountless;
 use cashu::{Amount, CurrencyUnit, MintRequest, PreMintSecrets, ProofsMethods};
 use cdk::wallet::{HttpClient, MintConnector, Wallet};
+use cdk::StreamExt;
 use cdk_integration_tests::get_mint_url_from_env;
 use cdk_integration_tests::init_regtest::{get_cln_dir, get_temp_dir};
 use cdk_sqlite::wallet::memory;
@@ -116,16 +117,14 @@ async fn test_regtest_bolt12_mint_multiple() -> Result<()> {
         .await
         .unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await?;
-
-    wallet.mint_bolt12_quote_state(&mint_quote.id).await?;
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let proofs = wallet
-        .mint_bolt12(&mint_quote.id, None, SplitTarget::default(), None)
-        .await
-        .unwrap();
+    let proofs = proof_streams.next().await.expect("payment")?;
 
     assert_eq!(proofs.total_amount().unwrap(), 10.into());
 
@@ -134,16 +133,7 @@ async fn test_regtest_bolt12_mint_multiple() -> Result<()> {
         .await
         .unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await?;
-
-    wallet.mint_bolt12_quote_state(&mint_quote.id).await?;
-
-    let proofs = wallet
-        .mint_bolt12(&mint_quote.id, None, SplitTarget::default(), None)
-        .await
-        .unwrap();
+    let proofs = proof_streams.next().await.expect("payment")?;
 
     assert_eq!(proofs.total_amount().unwrap(), 11.into());
 
@@ -187,13 +177,14 @@ async fn test_regtest_bolt12_multiple_wallets() -> Result<()> {
         .pay_bolt12_offer(None, quote_one.request.clone())
         .await?;
 
-    wallet_one
-        .wait_for_payment(&quote_one, Duration::from_secs(60))
-        .await?;
+    let mut proof_streams_one = wallet_one.proof_stream(
+        quote_one.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let proofs_one = wallet_one
-        .mint_bolt12(&quote_one.id, None, SplitTarget::default(), None)
-        .await?;
+    let proofs_one = proof_streams_one.next().await.expect("payment")?;
 
     assert_eq!(proofs_one.total_amount()?, 10_000.into());
 
@@ -205,13 +196,15 @@ async fn test_regtest_bolt12_multiple_wallets() -> Result<()> {
         .pay_bolt12_offer(None, quote_two.request.clone())
         .await?;
 
-    wallet_two
-        .wait_for_payment(&quote_two, Duration::from_secs(60))
-        .await?;
+    let mut proof_streams_two = wallet_two.proof_stream(
+        quote_two.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
+
+    let proofs_two = proof_streams_two.next().await.expect("payment")?;
 
-    let proofs_two = wallet_two
-        .mint_bolt12(&quote_two.id, None, SplitTarget::default(), None)
-        .await?;
     assert_eq!(proofs_two.total_amount()?, 15_000.into());
 
     let offer = cln_client
@@ -281,18 +274,18 @@ async fn test_regtest_bolt12_melt() -> Result<()> {
         .await?;
 
     // Wait for payment to be processed
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await?;
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
     let offer = cln_client
         .get_bolt12_offer(Some(10_000), true, "hhhhhhhh".to_string())
         .await?;
 
-    let _proofs = wallet
-        .mint_bolt12(&mint_quote.id, None, SplitTarget::default(), None)
-        .await
-        .unwrap();
+    let _proofs = proof_streams.next().await.expect("payment")?;
 
     let quote = wallet.melt_bolt12_quote(offer.to_string(), None).await?;
 
@@ -340,9 +333,9 @@ async fn test_regtest_bolt12_mint_extra() -> Result<()> {
         .pay_bolt12_offer(Some(pay_amount_msats), mint_quote.request.clone())
         .await?;
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(10))
-        .await?;
+    let mut payment_streams = wallet.payment_stream(&mint_quote, Duration::from_secs(60));
+
+    let _ = payment_streams.next().await;
 
     let state = wallet.mint_bolt12_quote_state(&mint_quote.id).await?;
 

+ 13 - 9
crates/cdk-integration-tests/tests/fake_auth.rs

@@ -14,7 +14,7 @@ use cdk::nuts::{
     SwapRequest,
 };
 use cdk::wallet::{AuthHttpClient, AuthMintConnector, HttpClient, MintConnector, WalletBuilder};
-use cdk::{Error, OidcClient};
+use cdk::{Error, OidcClient, StreamExt};
 use cdk_fake_wallet::create_fake_invoice;
 use cdk_integration_tests::fund_wallet;
 use cdk_sqlite::wallet::memory;
@@ -331,15 +331,19 @@ async fn test_mint_with_auth() {
     let mint_amount: Amount = 100.into();
 
     let quote = wallet.mint_quote(mint_amount, None).await.unwrap();
-    let proofs = wallet
-        .wait_and_mint_quote(
-            quote,
-            Default::default(),
-            Default::default(),
-            Duration::from_secs(10),
-        )
+
+    let mut proof_streams = wallet.proof_stream(
+        quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
+
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     assert!(proofs.total_amount().expect("Could not get proofs amount") == mint_amount);
 }

+ 208 - 138
crates/cdk-integration-tests/tests/fake_wallet.rs

@@ -27,6 +27,7 @@ use cdk::nuts::{
 };
 use cdk::wallet::types::TransactionDirection;
 use cdk::wallet::{HttpClient, MintConnector, Wallet};
+use cdk::StreamExt;
 use cdk_fake_wallet::{create_fake_invoice, FakeInvoiceDescription};
 use cdk_integration_tests::attempt_to_swap_pending;
 use cdk_sqlite::wallet::memory;
@@ -47,15 +48,18 @@ async fn test_fake_tokens_pending() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let fake_description = FakeInvoiceDescription {
         pay_invoice_state: MeltQuoteState::Pending,
@@ -90,15 +94,18 @@ async fn test_fake_melt_payment_fail() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let fake_description = FakeInvoiceDescription {
         pay_invoice_state: MeltQuoteState::Unknown,
@@ -156,15 +163,18 @@ async fn test_fake_melt_payment_fail_and_check() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let fake_description = FakeInvoiceDescription {
         pay_invoice_state: MeltQuoteState::Unknown,
@@ -205,15 +215,18 @@ async fn test_fake_melt_payment_return_fail_status() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let fake_description = FakeInvoiceDescription {
         pay_invoice_state: MeltQuoteState::Failed,
@@ -269,15 +282,18 @@ async fn test_fake_melt_payment_error_unknown() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let fake_description = FakeInvoiceDescription {
         pay_invoice_state: MeltQuoteState::Failed,
@@ -333,15 +349,18 @@ async fn test_fake_melt_payment_err_paid() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let fake_description = FakeInvoiceDescription {
         pay_invoice_state: MeltQuoteState::Failed,
@@ -375,15 +394,18 @@ async fn test_fake_melt_change_in_quote() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let transaction = wallet
         .list_transactions(Some(TransactionDirection::Incoming))
@@ -445,15 +467,18 @@ async fn test_fake_mint_with_witness() {
     .expect("failed to create new wallet");
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let mint_amount = proofs.total_amount().unwrap();
 
@@ -474,10 +499,13 @@ async fn test_fake_mint_without_witness() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
+    let mut payment_streams = wallet.payment_stream(&mint_quote, Duration::from_secs(60));
+
+    payment_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let http_client = HttpClient::new(MINT_URL.parse().unwrap(), None);
 
@@ -515,10 +543,13 @@ async fn test_fake_mint_with_wrong_witness() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
+    let mut payment_streams = wallet.payment_stream(&mint_quote, Duration::from_secs(60));
+
+    payment_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let http_client = HttpClient::new(MINT_URL.parse().unwrap(), None);
 
@@ -562,10 +593,13 @@ async fn test_fake_mint_inflated() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
+    let mut payment_streams = wallet.payment_stream(&mint_quote, Duration::from_secs(60));
+
+    payment_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id;
 
@@ -621,10 +655,13 @@ async fn test_fake_mint_multiple_units() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
+    let mut payment_streams = wallet.payment_stream(&mint_quote, Duration::from_secs(60));
+
+    payment_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id;
 
@@ -701,15 +738,18 @@ async fn test_fake_mint_multiple_unit_swap() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let wallet_usd = Wallet::new(
         MINT_URL,
@@ -723,15 +763,18 @@ async fn test_fake_mint_multiple_unit_swap() {
 
     let mint_quote = wallet_usd.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet_usd.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let usd_proofs = wallet_usd
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let usd_proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id;
 
@@ -817,15 +860,18 @@ async fn test_fake_mint_multiple_unit_melt() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     println!("Minted sat");
 
@@ -841,15 +887,18 @@ async fn test_fake_mint_multiple_unit_melt() {
     let mint_quote = wallet_usd.mint_quote(100.into(), None).await.unwrap();
     println!("Minted quote usd");
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet_usd.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let usd_proofs = wallet_usd
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let usd_proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     {
         let inputs: Proofs = vec![
@@ -937,15 +986,18 @@ async fn test_fake_mint_input_output_mismatch() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let wallet_usd = Wallet::new(
         MINT_URL,
@@ -996,15 +1048,19 @@ async fn test_fake_mint_swap_inflated() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
+
     let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id;
     let pre_mint =
         PreMintSecrets::random(active_keyset_id, 101.into(), &SplitTarget::None).unwrap();
@@ -1041,15 +1097,19 @@ async fn test_fake_mint_swap_spend_after_fail() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
+
     let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id;
 
     let pre_mint =
@@ -1113,15 +1173,19 @@ async fn test_fake_mint_melt_spend_after_fail() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
+
     let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id;
 
     let pre_mint =
@@ -1186,15 +1250,18 @@ async fn test_fake_mint_duplicate_proofs_swap() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id;
 
@@ -1267,15 +1334,18 @@ async fn test_fake_mint_duplicate_proofs_melt() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::None, None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let inputs = vec![proofs[0].clone(), proofs[0].clone()];
 

+ 50 - 35
crates/cdk-integration-tests/tests/happy_path_mint_wallet.rs

@@ -109,15 +109,18 @@ async fn test_happy_mint_melt_round_trip() {
         .await
         .unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let mint_amount = proofs.total_amount().unwrap();
 
@@ -233,15 +236,18 @@ async fn test_happy_mint() {
         .await
         .unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let mint_amount = proofs.total_amount().unwrap();
 
@@ -281,15 +287,18 @@ async fn test_restore() {
         .await
         .unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     assert_eq!(wallet.total_balance().await.unwrap(), 100.into());
 
@@ -361,15 +370,18 @@ async fn test_fake_melt_change_in_quote() {
 
     pay_if_regtest(&get_test_temp_dir(), &bolt11).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let invoice = create_invoice_for_env(&get_test_temp_dir(), Some(9))
         .await
@@ -433,15 +445,18 @@ async fn test_pay_invoice_twice() {
         .await
         .unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let proofs = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let mint_amount = proofs.total_amount().unwrap();
 

+ 48 - 32
crates/cdk-integration-tests/tests/regtest.rs

@@ -26,6 +26,7 @@ use cdk::nuts::{
     NotificationPayload, PreMintSecrets,
 };
 use cdk::wallet::{HttpClient, MintConnector, Wallet, WalletSubscription};
+use cdk::StreamExt;
 use cdk_integration_tests::init_regtest::{get_lnd_dir, LND_RPC_ADDR};
 use cdk_integration_tests::{get_mint_url_from_env, get_second_mint_url_from_env};
 use cdk_sqlite::wallet::{self, memory};
@@ -86,15 +87,18 @@ async fn test_internal_payment() {
         .await
         .expect("failed to pay invoice");
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     assert!(wallet.total_balance().await.unwrap() == 100.into());
 
@@ -118,15 +122,18 @@ async fn test_internal_payment() {
 
     let _melted = wallet.melt(&melt.id).await.unwrap();
 
-    wallet_2
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet_2.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let _wallet_2_mint = wallet_2
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     // let check_paid = match get_mint_port("0") {
     //     8085 => {
@@ -261,28 +268,38 @@ async fn test_multimint_melt() {
         .pay_invoice(quote.request.clone())
         .await
         .expect("failed to pay invoice");
-    wallet1
-        .wait_for_payment(&quote, Duration::from_secs(60))
-        .await
-        .unwrap();
-    wallet1
-        .mint(&quote.id, SplitTarget::default(), None)
+
+    let mut proof_streams = wallet1.proof_stream(
+        quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
+
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let quote = wallet2.mint_quote(mint_amount, None).await.unwrap();
     lnd_client
         .pay_invoice(quote.request.clone())
         .await
         .expect("failed to pay invoice");
-    wallet2
-        .wait_for_payment(&quote, Duration::from_secs(60))
-        .await
-        .unwrap();
-    wallet2
-        .mint(&quote.id, SplitTarget::default(), None)
+
+    let mut proof_streams = wallet2.proof_stream(
+        quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
+
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     // Get an invoice
     let invoice = lnd_client.create_invoice(Some(50)).await.unwrap();
@@ -336,10 +353,9 @@ async fn test_cached_mint() {
         .await
         .expect("failed to pay invoice");
 
-    wallet
-        .wait_for_payment(&quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut payment_streams = wallet.payment_stream(&quote, Duration::from_secs(60));
+
+    let _ = payment_streams.next().await;
 
     let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id;
     let http_client = HttpClient::new(get_mint_url_from_env().parse().unwrap(), None);

+ 21 - 14
crates/cdk-integration-tests/tests/test_fees.rs

@@ -7,6 +7,7 @@ use cashu::{Bolt11Invoice, ProofsMethods};
 use cdk::amount::{Amount, SplitTarget};
 use cdk::nuts::CurrencyUnit;
 use cdk::wallet::{ReceiveOptions, SendKind, SendOptions, Wallet};
+use cdk::StreamExt;
 use cdk_integration_tests::init_regtest::get_temp_dir;
 use cdk_integration_tests::{create_invoice_for_env, get_mint_url_from_env, pay_if_regtest};
 use cdk_sqlite::wallet::memory;
@@ -28,15 +29,18 @@ async fn test_swap() {
     let invoice = Bolt11Invoice::from_str(&mint_quote.request).unwrap();
     pay_if_regtest(&get_temp_dir(), &invoice).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let proofs: Vec<Amount> = wallet
         .get_unspent_proofs()
@@ -96,15 +100,18 @@ async fn test_fake_melt_change_in_quote() {
 
     pay_if_regtest(&get_temp_dir(), &bolt11).await.unwrap();
 
-    wallet
-        .wait_for_payment(&mint_quote, Duration::from_secs(60))
-        .await
-        .unwrap();
+    let mut proof_streams = wallet.proof_stream(
+        mint_quote.clone(),
+        SplitTarget::default(),
+        None,
+        Duration::from_secs(60),
+    );
 
-    let _mint_amount = wallet
-        .mint(&mint_quote.id, SplitTarget::default(), None)
+    let _proofs = proof_streams
+        .next()
         .await
-        .unwrap();
+        .expect("payment")
+        .expect("no error");
 
     let invoice_amount = 9;
 

+ 5 - 2
crates/cdk/examples/auth_wallet.rs

@@ -8,6 +8,7 @@ use cdk::{Amount, OidcClient};
 use cdk_common::amount::SplitTarget;
 use cdk_common::{MintInfo, ProofsMethods};
 use cdk_sqlite::wallet::memory;
+use futures::StreamExt;
 use rand::Rng;
 use tracing_subscriber::EnvFilter;
 
@@ -60,9 +61,11 @@ async fn main() -> Result<(), Error> {
 
     let quote = wallet.mint_quote(amount, None).await.unwrap();
     let proofs = wallet
-        .wait_and_mint_quote(quote, SplitTarget::default(), None, Duration::from_secs(10))
+        .proof_stream(quote, SplitTarget::default(), None, Duration::from_secs(10))
+        .next()
         .await
-        .unwrap();
+        .expect("Some payment")
+        .expect("No error");
 
     println!("Received: {}", proofs.total_amount()?);
 

+ 41 - 38
crates/cdk/examples/melt-token.rs

@@ -8,7 +8,7 @@ use cdk::error::Error;
 use cdk::nuts::nut00::ProofsMethods;
 use cdk::nuts::{CurrencyUnit, SecretKey};
 use cdk::wallet::Wallet;
-use cdk::Amount;
+use cdk::{Amount, StreamExt};
 use cdk_sqlite::wallet::memory;
 use lightning_invoice::{Currency, InvoiceBuilder, PaymentSecret};
 use rand::Rng;
@@ -30,48 +30,51 @@ async fn main() -> Result<(), Error> {
     let wallet = Wallet::new(mint_url, unit, Arc::new(localstore), seed, None)?;
 
     let quote = wallet.mint_quote(amount, None).await?;
-    let proofs = wallet
-        .wait_and_mint_quote(
-            quote,
-            Default::default(),
-            Default::default(),
-            Duration::from_secs(10),
-        )
-        .await?;
 
-    let receive_amount = proofs.total_amount()?;
-    println!("Received {} from mint {}", receive_amount, mint_url);
+    let mut proof_streams = wallet.proof_stream(
+        quote,
+        Default::default(),
+        Default::default(),
+        Duration::from_secs(10),
+    );
+
+    while let Some(proofs) = proof_streams.next().await {
+        let receive_amount = proofs?.total_amount()?;
+        println!("Received {} from mint {}", receive_amount, mint_url);
 
-    // Now melt what we have
-    // We need to prepare a lightning invoice
-    let private_key = SecretKey::from_slice(
-        &<[u8; 32]>::from_hex("e126f68f7eafcc8b74f54d269fe206be715000f94dac067d1c04a8ca3b2db734")
+        // Now melt what we have
+        // We need to prepare a lightning invoice
+        let private_key = SecretKey::from_slice(
+            &<[u8; 32]>::from_hex(
+                "e126f68f7eafcc8b74f54d269fe206be715000f94dac067d1c04a8ca3b2db734",
+            )
             .unwrap(),
-    )
-    .unwrap();
-    let random_bytes = rand::rng().random::<[u8; 32]>();
-    let payment_hash = sha256::Hash::from_slice(&random_bytes).unwrap();
-    let payment_secret = PaymentSecret([42u8; 32]);
-    let invoice_to_be_paid = InvoiceBuilder::new(Currency::Bitcoin)
-        .amount_milli_satoshis(5 * 1000)
-        .description("Pay me".into())
-        .payment_hash(payment_hash)
-        .payment_secret(payment_secret)
-        .current_timestamp()
-        .min_final_cltv_expiry_delta(144)
-        .build_signed(|hash| Secp256k1::new().sign_ecdsa_recoverable(hash, &private_key))
-        .unwrap()
-        .to_string();
-    println!("Invoice to be paid: {}", invoice_to_be_paid);
+        )
+        .unwrap();
+        let random_bytes = rand::rng().random::<[u8; 32]>();
+        let payment_hash = sha256::Hash::from_slice(&random_bytes).unwrap();
+        let payment_secret = PaymentSecret([42u8; 32]);
+        let invoice_to_be_paid = InvoiceBuilder::new(Currency::Bitcoin)
+            .amount_milli_satoshis(5 * 1000)
+            .description("Pay me".into())
+            .payment_hash(payment_hash)
+            .payment_secret(payment_secret)
+            .current_timestamp()
+            .min_final_cltv_expiry_delta(144)
+            .build_signed(|hash| Secp256k1::new().sign_ecdsa_recoverable(hash, &private_key))
+            .unwrap()
+            .to_string();
+        println!("Invoice to be paid: {}", invoice_to_be_paid);
 
-    let melt_quote = wallet.melt_quote(invoice_to_be_paid, None).await?;
-    println!(
-        "Melt quote: {} {} {:?}",
-        melt_quote.amount, melt_quote.state, melt_quote,
-    );
+        let melt_quote = wallet.melt_quote(invoice_to_be_paid, None).await?;
+        println!(
+            "Melt quote: {} {} {:?}",
+            melt_quote.amount, melt_quote.state, melt_quote,
+        );
 
-    let melted = wallet.melt(&melt_quote.id).await?;
-    println!("Melted: {:?}", melted);
+        let melted = wallet.melt(&melt_quote.id).await?;
+        println!("Melted: {:?}", melted);
+    }
 
     Ok(())
 }

+ 27 - 19
crates/cdk/examples/mint-token.rs

@@ -5,7 +5,7 @@ use cdk::error::Error;
 use cdk::nuts::nut00::ProofsMethods;
 use cdk::nuts::CurrencyUnit;
 use cdk::wallet::{SendOptions, Wallet};
-use cdk::Amount;
+use cdk::{Amount, StreamExt};
 use cdk_sqlite::wallet::memory;
 use rand::random;
 use tracing_subscriber::EnvFilter;
@@ -36,24 +36,32 @@ async fn main() -> Result<(), Error> {
     let wallet = Wallet::new(mint_url, unit, localstore, seed, None)?;
 
     let quote = wallet.mint_quote(amount, None).await?;
-    let proofs = wallet
-        .wait_and_mint_quote(
-            quote,
-            Default::default(),
-            Default::default(),
-            Duration::from_secs(10),
-        )
-        .await?;
-
-    // Mint the received amount
-    let receive_amount = proofs.total_amount()?;
-    println!("Received {} from mint {}", receive_amount, mint_url);
-
-    // Send a token with the specified amount
-    let prepared_send = wallet.prepare_send(amount, SendOptions::default()).await?;
-    let token = prepared_send.confirm(None).await?;
-    println!("Token:");
-    println!("{}", token);
+    let mut proof_streams = wallet.proof_stream(
+        quote,
+        Default::default(),
+        Default::default(),
+        Duration::from_secs(10),
+    );
+
+    while let Some(proofs) = proof_streams.next().await {
+        let proofs = match proofs {
+            Ok(proofs) => proofs,
+            Err(err) => {
+                tracing::error!("Proof stream ended with {:?}", err);
+                break;
+            }
+        };
+
+        // Mint the received amount
+        let receive_amount = proofs.total_amount()?;
+        println!("Received {} from mint {}", receive_amount, mint_url);
+
+        // Send a token with the specified amount
+        let prepared_send = wallet.prepare_send(amount, SendOptions::default()).await?;
+        let token = prepared_send.confirm(None).await?;
+        println!("Token:");
+        println!("{}", token);
+    }
 
     Ok(())
 }

+ 56 - 53
crates/cdk/examples/p2pk.rs

@@ -4,7 +4,7 @@ use std::time::Duration;
 use cdk::error::Error;
 use cdk::nuts::{CurrencyUnit, SecretKey, SpendingConditions};
 use cdk::wallet::{ReceiveOptions, SendOptions, Wallet};
-use cdk::Amount;
+use cdk::{Amount, StreamExt};
 use cdk_sqlite::wallet::memory;
 use rand::random;
 use tracing_subscriber::EnvFilter;
@@ -35,60 +35,63 @@ async fn main() -> Result<(), Error> {
     let wallet = Wallet::new(mint_url, unit, localstore, seed, None).unwrap();
 
     let quote = wallet.mint_quote(amount, None).await?;
-    let proofs = wallet
-        .wait_and_mint_quote(
-            quote,
-            Default::default(),
-            Default::default(),
-            Duration::from_secs(10),
-        )
-        .await?;
-
-    // Mint the received amount
-    println!(
-        "Minted nuts: {:?}",
-        proofs.into_iter().map(|p| p.amount).collect::<Vec<_>>()
+
+    let mut proof_streams = wallet.proof_stream(
+        quote,
+        Default::default(),
+        Default::default(),
+        Duration::from_secs(10),
     );
 
-    // Generate a secret key for spending conditions
-    let secret = SecretKey::generate();
-
-    // Create spending conditions using the generated public key
-    let spending_conditions = SpendingConditions::new_p2pk(secret.public_key(), None);
-
-    // Get the total balance of the wallet
-    let bal = wallet.total_balance().await?;
-    println!("Total balance: {}", bal);
-
-    // Send a token with the specified amount and spending conditions
-    let prepared_send = wallet
-        .prepare_send(
-            10.into(),
-            SendOptions {
-                conditions: Some(spending_conditions),
-                include_fee: true,
-                ..Default::default()
-            },
-        )
-        .await?;
-    println!("Fee: {}", prepared_send.fee());
-    let token = prepared_send.confirm(None).await?;
-
-    println!("Created token locked to pubkey: {}", secret.public_key());
-    println!("{}", token);
-
-    // Receive the token using the secret key
-    let amount = wallet
-        .receive(
-            &token.to_string(),
-            ReceiveOptions {
-                p2pk_signing_keys: vec![secret],
-                ..Default::default()
-            },
-        )
-        .await?;
-
-    println!("Redeemed locked token worth: {}", u64::from(amount));
+    while let Some(proofs) = proof_streams.next().await {
+        let proofs = proofs?;
+
+        // Mint the received amount
+        println!(
+            "Minted nuts: {:?}",
+            proofs.into_iter().map(|p| p.amount).collect::<Vec<_>>()
+        );
+
+        // Generate a secret key for spending conditions
+        let secret = SecretKey::generate();
+
+        // Create spending conditions using the generated public key
+        let spending_conditions = SpendingConditions::new_p2pk(secret.public_key(), None);
+
+        // Get the total balance of the wallet
+        let bal = wallet.total_balance().await?;
+        println!("Total balance: {}", bal);
+
+        // Send a token with the specified amount and spending conditions
+        let prepared_send = wallet
+            .prepare_send(
+                10.into(),
+                SendOptions {
+                    conditions: Some(spending_conditions),
+                    include_fee: true,
+                    ..Default::default()
+                },
+            )
+            .await?;
+        println!("Fee: {}", prepared_send.fee());
+        let token = prepared_send.confirm(None).await?;
+
+        println!("Created token locked to pubkey: {}", secret.public_key());
+        println!("{}", token);
+
+        // Receive the token using the secret key
+        let amount = wallet
+            .receive(
+                &token.to_string(),
+                ReceiveOptions {
+                    p2pk_signing_keys: vec![secret],
+                    ..Default::default()
+                },
+            )
+            .await?;
+
+        println!("Redeemed locked token worth: {}", u64::from(amount));
+    }
 
     Ok(())
 }

+ 13 - 12
crates/cdk/examples/proof-selection.rs

@@ -7,7 +7,7 @@ use std::time::Duration;
 use cdk::nuts::nut00::ProofsMethods;
 use cdk::nuts::CurrencyUnit;
 use cdk::wallet::Wallet;
-use cdk::Amount;
+use cdk::{Amount, StreamExt};
 use cdk_common::nut02::KeySetInfosMethods;
 use cdk_sqlite::wallet::memory;
 use rand::random;
@@ -32,18 +32,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         let amount = Amount::from(amount);
 
         let quote = wallet.mint_quote(amount, None).await?;
-        let proofs = wallet
-            .wait_and_mint_quote(
-                quote,
-                Default::default(),
-                Default::default(),
-                Duration::from_secs(10),
-            )
-            .await?;
 
-        // Mint the received amount
-        let receive_amount = proofs.total_amount()?;
-        println!("Minted {}", receive_amount);
+        let mut proof_streams = wallet.proof_stream(
+            quote,
+            Default::default(),
+            Default::default(),
+            Duration::from_secs(10),
+        );
+
+        while let Some(proofs) = proof_streams.next().await {
+            // Mint the received amount
+            let receive_amount = proofs?.total_amount()?;
+            println!("Minted {}", receive_amount);
+        }
     }
 
     // Get unspent proofs

+ 20 - 19
crates/cdk/examples/wallet.rs

@@ -4,7 +4,7 @@ use std::time::Duration;
 use cdk::nuts::nut00::ProofsMethods;
 use cdk::nuts::CurrencyUnit;
 use cdk::wallet::{SendOptions, Wallet};
-use cdk::Amount;
+use cdk::{Amount, StreamExt};
 use cdk_sqlite::wallet::memory;
 use rand::random;
 
@@ -25,24 +25,25 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let wallet = Wallet::new(mint_url, unit, localstore, seed, None)?;
 
     let quote = wallet.mint_quote(amount, None).await?;
-    let proofs = wallet
-        .wait_and_mint_quote(
-            quote,
-            Default::default(),
-            Default::default(),
-            Duration::from_secs(10),
-        )
-        .await?;
-
-    // Mint the received amount
-    let receive_amount = proofs.total_amount()?;
-    println!("Minted {}", receive_amount);
-
-    // Send the token
-    let prepared_send = wallet.prepare_send(amount, SendOptions::default()).await?;
-    let token = prepared_send.confirm(None).await?;
-
-    println!("{}", token);
+
+    let mut proof_streams = wallet.proof_stream(
+        quote,
+        Default::default(),
+        Default::default(),
+        Duration::from_secs(10),
+    );
+
+    while let Some(proofs) = proof_streams.next().await {
+        // Mint the received amount
+        let receive_amount = proofs?.total_amount()?;
+        println!("Minted {}", receive_amount);
+
+        // Send the token
+        let prepared_send = wallet.prepare_send(amount, SendOptions::default()).await?;
+        let token = prepared_send.confirm(None).await?;
+
+        println!("{}", token);
+    }
 
     Ok(())
 }

+ 4 - 0
crates/cdk/src/lib.rs

@@ -64,3 +64,7 @@ pub use self::wallet::HttpClient;
 /// Result
 #[doc(hidden)]
 pub type Result<T, E = Box<dyn std::error::Error>> = std::result::Result<T, E>;
+
+/// Re-export futures::Stream
+#[cfg(any(feature = "wallet", feature = "mint"))]
+pub use futures::{Stream, StreamExt};

+ 1 - 1
crates/cdk/src/wallet/mod.rs

@@ -42,11 +42,11 @@ pub mod multi_mint_wallet;
 mod proofs;
 mod receive;
 mod send;
+mod streams;
 pub mod subscription;
 mod swap;
 mod transactions;
 pub mod util;
-mod wait;
 
 #[cfg(feature = "auth")]
 pub use auth::{AuthMintConnector, AuthWallet};

+ 91 - 0
crates/cdk/src/wallet/streams/mod.rs

@@ -0,0 +1,91 @@
+//! Wallet waiter APIs
+use std::future::Future;
+use std::pin::Pin;
+
+use cdk_common::amount::SplitTarget;
+use cdk_common::wallet::{MeltQuote, MintQuote};
+use cdk_common::{PaymentMethod, SpendingConditions};
+use payment::PaymentStream;
+use proof::ProofStream;
+use tokio::time::Duration;
+
+use super::{Wallet, WalletSubscription};
+
+pub mod payment;
+pub mod proof;
+
+/// Shared type
+#[cfg(not(target_arch = "wasm32"))]
+type RecvFuture<'a, Ret> = Pin<Box<dyn Future<Output = Ret> + Send + 'a>>;
+
+#[cfg(target_arch = "wasm32")]
+type RecvFuture<'a, Ret> = Pin<Box<dyn Future<Output = Ret> + 'a>>;
+
+#[allow(private_bounds)]
+#[allow(clippy::enum_variant_names)]
+enum WaitableEvent {
+    MeltQuote(String),
+    MintQuote(String),
+    Bolt12MintQuote(String),
+}
+
+impl From<&MeltQuote> for WaitableEvent {
+    fn from(event: &MeltQuote) -> Self {
+        WaitableEvent::MeltQuote(event.id.to_owned())
+    }
+}
+
+impl From<&MintQuote> for WaitableEvent {
+    fn from(event: &MintQuote) -> Self {
+        match event.payment_method {
+            PaymentMethod::Bolt11 => WaitableEvent::MintQuote(event.id.to_owned()),
+            PaymentMethod::Bolt12 => WaitableEvent::Bolt12MintQuote(event.id.to_owned()),
+            PaymentMethod::Custom(_) => WaitableEvent::MintQuote(event.id.to_owned()),
+        }
+    }
+}
+
+impl From<WaitableEvent> for WalletSubscription {
+    fn from(val: WaitableEvent) -> Self {
+        match val {
+            WaitableEvent::MeltQuote(quote_id) => {
+                WalletSubscription::Bolt11MeltQuoteState(vec![quote_id])
+            }
+            WaitableEvent::MintQuote(quote_id) => {
+                WalletSubscription::Bolt11MintQuoteState(vec![quote_id])
+            }
+            WaitableEvent::Bolt12MintQuote(quote_id) => {
+                WalletSubscription::Bolt12MintQuoteState(vec![quote_id])
+            }
+        }
+    }
+}
+
+impl Wallet {
+    #[inline(always)]
+    /// Mints a mint quote once it is paid
+    pub fn proof_stream(
+        &self,
+        quote: MintQuote,
+        amount_split_target: SplitTarget,
+        spending_conditions: Option<SpendingConditions>,
+        timeout_duration: Duration,
+    ) -> ProofStream<'_> {
+        ProofStream::new(
+            self,
+            quote,
+            amount_split_target,
+            spending_conditions,
+            timeout_duration,
+        )
+    }
+
+    /// Returns a BoxFuture that will wait for payment on the given event with a timeout check
+    #[allow(private_bounds)]
+    pub fn payment_stream<T>(&self, event: T, timeout: Duration) -> PaymentStream<'_>
+    where
+        T: Into<WaitableEvent>,
+    {
+        PaymentStream::new(self, event.into().into(), timeout)
+    }
+}

+ 185 - 0
crates/cdk/src/wallet/streams/payment.rs

@@ -0,0 +1,185 @@
+//! Payment Stream
+//!
+//! This future Stream will wait events for a Mint Quote be paid. If it is for a Bolt12 it will not stop
+//! but it will eventually error on a Timeout.
+//!
+//! Bolt11 will emit a single event.
+use std::pin::Pin;
+use std::task::Poll;
+
+use cdk_common::{Amount, Error, MeltQuoteState, MintQuoteState, NotificationPayload};
+use futures::{FutureExt, Stream};
+use tokio::time::{sleep, Duration, Sleep};
+
+use super::RecvFuture;
+use crate::wallet::subscription::ActiveSubscription;
+use crate::{Wallet, WalletSubscription};
+
+/// PaymentWaiter
+pub struct PaymentStream<'a> {
+    wallet: Option<(&'a Wallet, WalletSubscription)>,
+    is_finalized: bool,
+    active_subscription: Option<ActiveSubscription>,
+    timeout: Duration,
+
+    // Future events
+    subscriber_future: Option<RecvFuture<'a, ActiveSubscription>>,
+    subscription_receiver_future:
+        Option<RecvFuture<'static, (Option<NotificationPayload<String>>, ActiveSubscription)>>,
+    timeout_future: Option<Pin<Box<Sleep>>>,
+}
+
+impl<'a> PaymentStream<'a> {
+    /// Creates a new instance of the
+    pub fn new(wallet: &'a Wallet, filter: WalletSubscription, timeout: Duration) -> Self {
+        Self {
+            wallet: Some((wallet, filter)),
+            is_finalized: false,
+            active_subscription: None,
+            timeout,
+            subscriber_future: None,
+            subscription_receiver_future: None,
+            timeout_future: None,
+        }
+    }
+
+    /// Creating a wallet subscription is an async event, this may change in the future, but for now,
+    /// creating a new Subscription should be polled, as any other async event. This function will
+    /// return None if the subscription is already active, Some(()) otherwise
+    fn poll_init_subscription(&mut self, cx: &mut std::task::Context<'_>) -> Option<()> {
+        if let Some((wallet, filter)) = self.wallet.take() {
+            self.subscriber_future = Some(Box::pin(async move { wallet.subscribe(filter).await }));
+        }
+
+        let mut subscriber_future = self.subscriber_future.take()?;
+
+        match subscriber_future.poll_unpin(cx) {
+            Poll::Pending => {
+                self.subscriber_future = Some(subscriber_future);
+                Some(())
+            }
+            Poll::Ready(active_subscription) => {
+                self.active_subscription = Some(active_subscription);
+                None
+            }
+        }
+    }
+
+    /// Checks if the timeout has been reached, or starts a new timer that will be executed if no
+    /// event is produced before
+    ///
+    /// When an event is produced this timeout is dropped and it starts again whenever the new event
+    /// is polled.
+    fn poll_timeout(&mut self, cx: &mut std::task::Context<'_>) -> bool {
+        let mut timeout = self
+            .timeout_future
+            .take()
+            .unwrap_or_else(|| Box::pin(sleep(self.timeout)));
+
+        if timeout.poll_unpin(cx).is_ready() {
+            self.subscription_receiver_future = None;
+            true
+        } else {
+            self.timeout_future = Some(timeout);
+            false
+        }
+    }
+
+    /// Polls the subscription for any new event
+    fn poll_event(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Result<Option<Amount>, Error>>> {
+        let (subscription_receiver_future, active_subscription) = (
+            self.subscription_receiver_future.take(),
+            self.active_subscription.take(),
+        );
+
+        if subscription_receiver_future.is_none() && active_subscription.is_none() {
+            // Unexpected state, we should have an in-flight future or the active_subscription to
+            // create the future to read an event
+            return Poll::Ready(Some(Err(Error::Internal)));
+        }
+
+        let mut receiver = subscription_receiver_future.unwrap_or_else(|| {
+            let mut subscription_receiver =
+                active_subscription.expect("active subscription object");
+
+            Box::pin(async move { (subscription_receiver.recv().await, subscription_receiver) })
+        });
+
+        match receiver.poll_unpin(cx) {
+            Poll::Pending => {
+                self.subscription_receiver_future = Some(receiver);
+                Poll::Pending
+            }
+            Poll::Ready((notification, subscription)) => {
+                tracing::debug!("Receive payment notification {:?}", notification);
+                // This future is now fulfilled, put the active_subscription again back to object. Next time next().await is called,
+                // the future will be created in subscription_receiver_future.
+                self.active_subscription = Some(subscription);
+                self.timeout_future = None; // resets timeout
+                match notification {
+                    None => {
+                        self.is_finalized = true;
+                        Poll::Ready(None)
+                    }
+                    Some(info) => {
+                        match info {
+                            NotificationPayload::MintQuoteBolt11Response(info) => {
+                                if info.state == MintQuoteState::Paid {
+                                    self.is_finalized = true;
+                                    return Poll::Ready(Some(Ok(None)));
+                                }
+                            }
+                            NotificationPayload::MintQuoteBolt12Response(info) => {
+                                let to_be_issued = info.amount_paid - info.amount_issued;
+                                if to_be_issued > Amount::ZERO {
+                                    return Poll::Ready(Some(Ok(Some(to_be_issued))));
+                                }
+                            }
+                            NotificationPayload::MeltQuoteBolt11Response(info) => {
+                                if info.state == MeltQuoteState::Paid {
+                                    self.is_finalized = true;
+                                    return Poll::Ready(Some(Ok(None)));
+                                }
+                            }
+                            _ => {}
+                        }
+
+                        // We got an event but it is not what was expected, we need to call `recv`
+                        // again, and to copy-paste this is a recursive call that should be resolved
+                        // to a Poll::Pending *but* will trigger the future execution
+                        self.poll_event(cx)
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl Stream for PaymentStream<'_> {
+    type Item = Result<Option<Amount>, Error>;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        if this.is_finalized {
+            // end of stream
+            return Poll::Ready(None);
+        }
+
+        if this.poll_timeout(cx) {
+            return Poll::Ready(Some(Err(Error::Timeout)));
+        }
+
+        if this.poll_init_subscription(cx).is_some() {
+            return Poll::Pending;
+        }
+
+        this.poll_event(cx)
+    }
+}

+ 128 - 0
crates/cdk/src/wallet/streams/proof.rs

@@ -0,0 +1,128 @@
+//! Mint Stream
+//!
+//! This will mint after a mint quote has been paid. If the quote is for a Bolt12 it will keep minting until a timeout is reached.
+//!
+//! Bolt11 will mint once
+
+use std::task::Poll;
+use std::time::Duration;
+
+use cdk_common::amount::SplitTarget;
+use cdk_common::wallet::MintQuote;
+use cdk_common::{Error, PaymentMethod, Proofs, SpendingConditions};
+use futures::{FutureExt, Stream, StreamExt};
+
+use super::payment::PaymentStream;
+use super::{RecvFuture, WaitableEvent};
+use crate::Wallet;
+
+/// Mint waiter
+pub struct ProofStream<'a> {
+    payment_stream: PaymentStream<'a>,
+    wallet: &'a Wallet,
+    mint_quote: MintQuote,
+    amount_split_target: SplitTarget,
+    spending_conditions: Option<SpendingConditions>,
+    minting_future: Option<RecvFuture<'a, Result<Proofs, Error>>>,
+}
+
+impl<'a> ProofStream<'a> {
+    /// Create a new Stream
+    pub fn new(
+        wallet: &'a Wallet,
+        mint_quote: MintQuote,
+        amount_split_target: SplitTarget,
+        spending_conditions: Option<SpendingConditions>,
+        timeout: Duration,
+    ) -> Self {
+        let filter: WaitableEvent = (&mint_quote).into();
+        Self {
+            payment_stream: PaymentStream::new(wallet, filter.into(), timeout),
+            wallet,
+            amount_split_target,
+            spending_conditions,
+            mint_quote,
+            minting_future: None,
+        }
+    }
+}
+
+impl Stream for ProofStream<'_> {
+    type Item = Result<Proofs, Error>;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        if let Some(mut minting_future) = this.minting_future.take() {
+            return match minting_future.poll_unpin(cx) {
+                Poll::Pending => {
+                    this.minting_future = Some(minting_future);
+                    Poll::Pending
+                }
+                Poll::Ready(proofs) => Poll::Ready(Some(proofs)),
+            };
+        }
+
+        match this.payment_stream.poll_next_unpin(cx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(result) => match result {
+                None => Poll::Ready(None),
+                Some(result) => {
+                    let amount = match result {
+                        Err(err) => {
+                            tracing::error!(
+                                "Error while waiting for payment for {}",
+                                this.mint_quote.id
+                            );
+                            return Poll::Ready(Some(Err(err)));
+                        }
+                        Ok(amount) => amount,
+                    };
+
+                    let mint_quote = this.mint_quote.clone();
+                    let amount_split_target = this.amount_split_target.clone();
+                    let spending_conditions = this.spending_conditions.clone();
+                    let wallet = this.wallet;
+
+                    tracing::debug!(
+                        "Received payment ({:?}) notification for {}. Minting...",
+                        amount,
+                        mint_quote.id
+                    );
+
+                    let mut minting_future = Box::pin(async move {
+                        match mint_quote.payment_method {
+                            PaymentMethod::Bolt11 => {
+                                wallet
+                                    .mint(&mint_quote.id, amount_split_target, spending_conditions)
+                                    .await
+                            }
+                            PaymentMethod::Bolt12 => {
+                                wallet
+                                    .mint_bolt12(
+                                        &mint_quote.id,
+                                        amount,
+                                        amount_split_target,
+                                        spending_conditions,
+                                    )
+                                    .await
+                            }
+                            _ => Err(Error::UnsupportedPaymentMethod),
+                        }
+                    });
+
+                    match minting_future.poll_unpin(cx) {
+                        Poll::Pending => {
+                            this.minting_future = Some(minting_future);
+                            Poll::Pending
+                        }
+                        Poll::Ready(proofs) => Poll::Ready(Some(proofs)),
+                    }
+                }
+            },
+        }
+    }
+}

+ 0 - 119
crates/cdk/src/wallet/wait.rs

@@ -1,119 +0,0 @@
-use cdk_common::amount::SplitTarget;
-use cdk_common::wallet::{MeltQuote, MintQuote};
-use cdk_common::{
-    Amount, Error, MeltQuoteState, MintQuoteState, NotificationPayload, PaymentMethod, Proofs,
-    SpendingConditions,
-};
-use futures::future::BoxFuture;
-use tokio::time::{timeout, Duration};
-
-use super::{Wallet, WalletSubscription};
-
-#[allow(private_bounds)]
-#[allow(clippy::enum_variant_names)]
-enum WaitableEvent {
-    MeltQuote(String),
-    MintQuote(String),
-    Bolt12MintQuote(String),
-}
-
-impl From<&MeltQuote> for WaitableEvent {
-    fn from(event: &MeltQuote) -> Self {
-        WaitableEvent::MeltQuote(event.id.to_owned())
-    }
-}
-
-impl From<&MintQuote> for WaitableEvent {
-    fn from(event: &MintQuote) -> Self {
-        match event.payment_method {
-            PaymentMethod::Bolt11 => WaitableEvent::MintQuote(event.id.to_owned()),
-            PaymentMethod::Bolt12 => WaitableEvent::Bolt12MintQuote(event.id.to_owned()),
-            PaymentMethod::Custom(_) => WaitableEvent::MintQuote(event.id.to_owned()),
-        }
-    }
-}
-
-impl From<WaitableEvent> for WalletSubscription {
-    fn from(val: WaitableEvent) -> Self {
-        match val {
-            WaitableEvent::MeltQuote(quote_id) => {
-                WalletSubscription::Bolt11MeltQuoteState(vec![quote_id])
-            }
-            WaitableEvent::MintQuote(quote_id) => {
-                WalletSubscription::Bolt11MintQuoteState(vec![quote_id])
-            }
-            WaitableEvent::Bolt12MintQuote(quote_id) => {
-                WalletSubscription::Bolt12MintQuoteState(vec![quote_id])
-            }
-        }
-    }
-}
-
-impl Wallet {
-    #[inline(always)]
-    /// Mints a mint quote once it is paid
-    pub async fn wait_and_mint_quote(
-        &self,
-        quote: MintQuote,
-        amount_split_target: SplitTarget,
-        spending_conditions: Option<SpendingConditions>,
-        timeout_duration: Duration,
-    ) -> Result<Proofs, Error> {
-        let amount = self.wait_for_payment(&quote, timeout_duration).await?;
-
-        tracing::debug!("Received payment notification for {}. Minting...", quote.id);
-
-        match quote.payment_method {
-            PaymentMethod::Bolt11 => {
-                self.mint(&quote.id, amount_split_target, spending_conditions)
-                    .await
-            }
-            PaymentMethod::Bolt12 => {
-                self.mint_bolt12(&quote.id, amount, amount_split_target, spending_conditions)
-                    .await
-            }
-            _ => Err(Error::UnsupportedPaymentMethod),
-        }
-    }
-
-    /// Returns a BoxFuture that will wait for payment on the given event with a timeout check
-    #[allow(private_bounds)]
-    pub fn wait_for_payment<T>(
-        &self,
-        event: T,
-        timeout_duration: Duration,
-    ) -> BoxFuture<'_, Result<Option<Amount>, Error>>
-    where
-        T: Into<WaitableEvent>,
-    {
-        let subs = self.subscribe::<WalletSubscription>(event.into().into());
-
-        Box::pin(async move {
-            timeout(timeout_duration, async {
-                let mut subscription = subs.await;
-                loop {
-                    match subscription.recv().await.ok_or(Error::Internal)? {
-                        NotificationPayload::MintQuoteBolt11Response(info) => {
-                            if info.state == MintQuoteState::Paid {
-                                return Ok(None);
-                            }
-                        }
-                        NotificationPayload::MintQuoteBolt12Response(info) => {
-                            if info.amount_paid - info.amount_issued > Amount::ZERO {
-                                return Ok(Some(info.amount_paid - info.amount_issued));
-                            }
-                        }
-                        NotificationPayload::MeltQuoteBolt11Response(info) => {
-                            if info.state == MeltQuoteState::Paid {
-                                return Ok(None);
-                            }
-                        }
-                        _ => {}
-                    }
-                }
-            })
-            .await
-            .map_err(|_| Error::Timeout)?
-        })
-    }
-}