Bläddra i källkod

Introduce Cancel Token instead of timeout

Cesar Rodas 1 månad sedan
förälder
incheckning
8e4036fe7a

+ 1 - 7
crates/cdk-cli/src/sub_commands/mint.rs

@@ -1,5 +1,4 @@
 use std::str::FromStr;
-use std::time::Duration;
 
 use anyhow::{anyhow, Result};
 use cdk::amount::SplitTarget;
@@ -96,12 +95,7 @@ pub async fn mint(
 
     let mut amount_minted = Amount::ZERO;
 
-    let mut proof_streams = wallet.proof_stream(
-        quote,
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(sub_command_args.wait_duration),
-    );
+    let mut proof_streams = wallet.proof_stream(quote, SplitTarget::default(), None);
 
     while let Some(proofs) = proof_streams.next().await {
         let proofs = match proofs {

+ 0 - 4
crates/cdk-common/src/error.rs

@@ -111,10 +111,6 @@ pub enum Error {
     #[error("Could not parse bolt12")]
     Bolt12parse,
 
-    /// Operation timeout
-    #[error("Operation timeout")]
-    Timeout,
-
     /// BIP353 address parsing error
     #[error("Failed to parse BIP353 address: {0}")]
     Bip353Parse(String),

+ 1 - 7
crates/cdk-integration-tests/src/init_pure_tests.rs

@@ -3,7 +3,6 @@ use std::fmt::{Debug, Formatter};
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
-use std::time::Duration;
 use std::{env, fs};
 
 use anyhow::{anyhow, bail, Result};
@@ -362,12 +361,7 @@ pub async fn fund_wallet(
     let quote = wallet.mint_quote(desired_amount, None).await?;
 
     Ok(wallet
-        .proof_stream(
-            quote,
-            split_target.unwrap_or_default(),
-            None,
-            Duration::from_secs(60),
-        )
+        .proof_stream(quote, split_target.unwrap_or_default(), None)
         .next()
         .await
         .expect("proofs")?

+ 1 - 2
crates/cdk-integration-tests/src/lib.rs

@@ -28,7 +28,6 @@ use cdk::{StreamExt, Wallet};
 use cdk_fake_wallet::create_fake_invoice;
 use init_regtest::{get_lnd_dir, LND_RPC_ADDR};
 use ln_regtest_rs::ln_client::{LightningClient, LndClient};
-use tokio::time::Duration;
 
 pub mod cli;
 pub mod init_auth_mint;
@@ -43,7 +42,7 @@ pub async fn fund_wallet(wallet: Arc<Wallet>, amount: Amount) {
         .expect("Could not get mint quote");
 
     let _proofs = wallet
-        .proof_stream(quote, SplitTarget::default(), None, Duration::from_secs(60))
+        .proof_stream(quote, SplitTarget::default(), None)
         .next()
         .await
         .expect("proofs")

+ 7 - 26
crates/cdk-integration-tests/tests/bolt12.rs

@@ -1,7 +1,6 @@
 use std::env;
 use std::path::PathBuf;
 use std::sync::Arc;
-use std::time::Duration;
 
 use anyhow::{bail, Result};
 use bip39::Mnemonic;
@@ -117,12 +116,7 @@ async fn test_regtest_bolt12_mint_multiple() -> Result<()> {
         .await
         .unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let proofs = proof_streams.next().await.expect("payment")?;
 
@@ -177,12 +171,8 @@ async fn test_regtest_bolt12_multiple_wallets() -> Result<()> {
         .pay_bolt12_offer(None, quote_one.request.clone())
         .await?;
 
-    let mut proof_streams_one = wallet_one.proof_stream(
-        quote_one.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams_one =
+        wallet_one.proof_stream(quote_one.clone(), SplitTarget::default(), None);
 
     let proofs_one = proof_streams_one.next().await.expect("payment")?;
 
@@ -196,12 +186,8 @@ async fn test_regtest_bolt12_multiple_wallets() -> Result<()> {
         .pay_bolt12_offer(None, quote_two.request.clone())
         .await?;
 
-    let mut proof_streams_two = wallet_two.proof_stream(
-        quote_two.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams_two =
+        wallet_two.proof_stream(quote_two.clone(), SplitTarget::default(), None);
 
     let proofs_two = proof_streams_two.next().await.expect("payment")?;
 
@@ -274,12 +260,7 @@ async fn test_regtest_bolt12_melt() -> Result<()> {
         .await?;
 
     // Wait for payment to be processed
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let offer = cln_client
         .get_bolt12_offer(Some(10_000), true, "hhhhhhhh".to_string())
@@ -333,7 +314,7 @@ async fn test_regtest_bolt12_mint_extra() -> Result<()> {
         .pay_bolt12_offer(Some(pay_amount_msats), mint_quote.request.clone())
         .await?;
 
-    let mut payment_streams = wallet.payment_stream(&mint_quote, Duration::from_secs(60));
+    let mut payment_streams = wallet.payment_stream(&mint_quote);
 
     let _ = payment_streams.next().await;
 

+ 1 - 7
crates/cdk-integration-tests/tests/fake_auth.rs

@@ -1,7 +1,6 @@
 use std::env;
 use std::str::FromStr;
 use std::sync::Arc;
-use std::time::Duration;
 
 use bip39::Mnemonic;
 use cashu::{MintAuthRequest, MintInfo};
@@ -332,12 +331,7 @@ async fn test_mint_with_auth() {
 
     let quote = wallet.mint_quote(mint_amount, None).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(quote.clone(), SplitTarget::default(), None);
 
     let proofs = proof_streams
         .next()

+ 24 - 113
crates/cdk-integration-tests/tests/fake_wallet.rs

@@ -15,7 +15,6 @@
 //! - Duplicate proof detection
 
 use std::sync::Arc;
-use std::time::Duration;
 
 use bip39::Mnemonic;
 use cashu::Amount;
@@ -48,12 +47,7 @@ async fn test_fake_tokens_pending() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let _proofs = proof_streams
         .next()
@@ -94,12 +88,7 @@ async fn test_fake_melt_payment_fail() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let _proofs = proof_streams
         .next()
@@ -163,12 +152,7 @@ async fn test_fake_melt_payment_fail_and_check() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let _proofs = proof_streams
         .next()
@@ -215,12 +199,7 @@ async fn test_fake_melt_payment_return_fail_status() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let _proofs = proof_streams
         .next()
@@ -282,12 +261,7 @@ async fn test_fake_melt_payment_error_unknown() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let _proofs = proof_streams
         .next()
@@ -349,12 +323,7 @@ async fn test_fake_melt_payment_err_paid() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let _proofs = proof_streams
         .next()
@@ -394,12 +363,7 @@ async fn test_fake_melt_change_in_quote() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let _proofs = proof_streams
         .next()
@@ -467,12 +431,7 @@ async fn test_fake_mint_with_witness() {
     .expect("failed to create new wallet");
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let proofs = proof_streams
         .next()
@@ -499,7 +458,7 @@ async fn test_fake_mint_without_witness() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut payment_streams = wallet.payment_stream(&mint_quote, Duration::from_secs(60));
+    let mut payment_streams = wallet.payment_stream(&mint_quote);
 
     payment_streams
         .next()
@@ -543,7 +502,7 @@ async fn test_fake_mint_with_wrong_witness() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut payment_streams = wallet.payment_stream(&mint_quote, Duration::from_secs(60));
+    let mut payment_streams = wallet.payment_stream(&mint_quote);
 
     payment_streams
         .next()
@@ -593,7 +552,7 @@ async fn test_fake_mint_inflated() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut payment_streams = wallet.payment_stream(&mint_quote, Duration::from_secs(60));
+    let mut payment_streams = wallet.payment_stream(&mint_quote);
 
     payment_streams
         .next()
@@ -655,7 +614,7 @@ async fn test_fake_mint_multiple_units() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut payment_streams = wallet.payment_stream(&mint_quote, Duration::from_secs(60));
+    let mut payment_streams = wallet.payment_stream(&mint_quote);
 
     payment_streams
         .next()
@@ -738,12 +697,7 @@ async fn test_fake_mint_multiple_unit_swap() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let proofs = proof_streams
         .next()
@@ -763,12 +717,8 @@ async fn test_fake_mint_multiple_unit_swap() {
 
     let mint_quote = wallet_usd.mint_quote(100.into(), None).await.unwrap();
 
-    let mut proof_streams = wallet_usd.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams =
+        wallet_usd.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let usd_proofs = proof_streams
         .next()
@@ -860,12 +810,7 @@ async fn test_fake_mint_multiple_unit_melt() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let proofs = proof_streams
         .next()
@@ -887,12 +832,8 @@ async fn test_fake_mint_multiple_unit_melt() {
     let mint_quote = wallet_usd.mint_quote(100.into(), None).await.unwrap();
     println!("Minted quote usd");
 
-    let mut proof_streams = wallet_usd.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams =
+        wallet_usd.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let usd_proofs = proof_streams
         .next()
@@ -986,12 +927,7 @@ async fn test_fake_mint_input_output_mismatch() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let proofs = proof_streams
         .next()
@@ -1048,12 +984,7 @@ async fn test_fake_mint_swap_inflated() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let proofs = proof_streams
         .next()
@@ -1097,12 +1028,7 @@ async fn test_fake_mint_swap_spend_after_fail() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let proofs = proof_streams
         .next()
@@ -1173,12 +1099,7 @@ async fn test_fake_mint_melt_spend_after_fail() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let proofs = proof_streams
         .next()
@@ -1250,12 +1171,7 @@ async fn test_fake_mint_duplicate_proofs_swap() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let proofs = proof_streams
         .next()
@@ -1334,12 +1250,7 @@ async fn test_fake_mint_duplicate_proofs_melt() {
 
     let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let proofs = proof_streams
         .next()

+ 5 - 30
crates/cdk-integration-tests/tests/happy_path_mint_wallet.rs

@@ -109,12 +109,7 @@ async fn test_happy_mint_melt_round_trip() {
         .await
         .unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let proofs = proof_streams
         .next()
@@ -236,12 +231,7 @@ async fn test_happy_mint() {
         .await
         .unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let proofs = proof_streams
         .next()
@@ -287,12 +277,7 @@ async fn test_restore() {
         .await
         .unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let _proofs = proof_streams
         .next()
@@ -370,12 +355,7 @@ async fn test_fake_melt_change_in_quote() {
 
     pay_if_regtest(&get_test_temp_dir(), &bolt11).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let _proofs = proof_streams
         .next()
@@ -445,12 +425,7 @@ async fn test_pay_invoice_twice() {
         .await
         .unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let proofs = proof_streams
         .next()

+ 5 - 25
crates/cdk-integration-tests/tests/regtest.rs

@@ -87,12 +87,7 @@ async fn test_internal_payment() {
         .await
         .expect("failed to pay invoice");
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let _proofs = proof_streams
         .next()
@@ -122,12 +117,7 @@ async fn test_internal_payment() {
 
     let _melted = wallet.melt(&melt.id).await.unwrap();
 
-    let mut proof_streams = wallet_2.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet_2.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let _proofs = proof_streams
         .next()
@@ -269,12 +259,7 @@ async fn test_multimint_melt() {
         .await
         .expect("failed to pay invoice");
 
-    let mut proof_streams = wallet1.proof_stream(
-        quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet1.proof_stream(quote.clone(), SplitTarget::default(), None);
 
     let _proofs = proof_streams
         .next()
@@ -288,12 +273,7 @@ async fn test_multimint_melt() {
         .await
         .expect("failed to pay invoice");
 
-    let mut proof_streams = wallet2.proof_stream(
-        quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet2.proof_stream(quote.clone(), SplitTarget::default(), None);
 
     let _proofs = proof_streams
         .next()
@@ -353,7 +333,7 @@ async fn test_cached_mint() {
         .await
         .expect("failed to pay invoice");
 
-    let mut payment_streams = wallet.payment_stream(&quote, Duration::from_secs(60));
+    let mut payment_streams = wallet.payment_stream(&quote);
 
     let _ = payment_streams.next().await;
 

+ 2 - 13
crates/cdk-integration-tests/tests/test_fees.rs

@@ -1,6 +1,5 @@
 use std::str::FromStr;
 use std::sync::Arc;
-use std::time::Duration;
 
 use bip39::Mnemonic;
 use cashu::{Bolt11Invoice, ProofsMethods};
@@ -29,12 +28,7 @@ async fn test_swap() {
     let invoice = Bolt11Invoice::from_str(&mint_quote.request).unwrap();
     pay_if_regtest(&get_temp_dir(), &invoice).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let _proofs = proof_streams
         .next()
@@ -100,12 +94,7 @@ async fn test_fake_melt_change_in_quote() {
 
     pay_if_regtest(&get_temp_dir(), &bolt11).await.unwrap();
 
-    let mut proof_streams = wallet.proof_stream(
-        mint_quote.clone(),
-        SplitTarget::default(),
-        None,
-        Duration::from_secs(60),
-    );
+    let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None);
 
     let _proofs = proof_streams
         .next()

+ 1 - 0
crates/cdk/Cargo.toml

@@ -50,6 +50,7 @@ sync_wrapper = "0.1.2"
 bech32 = "0.9.1"
 arc-swap = "1.7.1"
 zeroize = "1"
+tokio-util.workspace = true
 
 [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
 tokio = { workspace = true, features = [

+ 1 - 2
crates/cdk/examples/auth_wallet.rs

@@ -1,5 +1,4 @@
 use std::sync::Arc;
-use std::time::Duration;
 
 use cdk::error::Error;
 use cdk::nuts::CurrencyUnit;
@@ -61,7 +60,7 @@ async fn main() -> Result<(), Error> {
 
     let quote = wallet.mint_quote(amount, None).await.unwrap();
     let proofs = wallet
-        .proof_stream(quote, SplitTarget::default(), None, Duration::from_secs(10))
+        .proof_stream(quote, SplitTarget::default(), None)
         .next()
         .await
         .expect("Some payment")

+ 1 - 7
crates/cdk/examples/melt-token.rs

@@ -1,5 +1,4 @@
 use std::sync::Arc;
-use std::time::Duration;
 
 use bitcoin::hashes::{sha256, Hash};
 use bitcoin::hex::prelude::FromHex;
@@ -31,12 +30,7 @@ async fn main() -> Result<(), Error> {
 
     let quote = wallet.mint_quote(amount, None).await?;
 
-    let mut proof_streams = wallet.proof_stream(
-        quote,
-        Default::default(),
-        Default::default(),
-        Duration::from_secs(10),
-    );
+    let mut proof_streams = wallet.proof_stream(quote, Default::default(), Default::default());
 
     while let Some(proofs) = proof_streams.next().await {
         let receive_amount = proofs?.total_amount()?;

+ 1 - 7
crates/cdk/examples/mint-token.rs

@@ -1,5 +1,4 @@
 use std::sync::Arc;
-use std::time::Duration;
 
 use cdk::error::Error;
 use cdk::nuts::nut00::ProofsMethods;
@@ -36,12 +35,7 @@ async fn main() -> Result<(), Error> {
     let wallet = Wallet::new(mint_url, unit, localstore, seed, None)?;
 
     let quote = wallet.mint_quote(amount, None).await?;
-    let mut proof_streams = wallet.proof_stream(
-        quote,
-        Default::default(),
-        Default::default(),
-        Duration::from_secs(10),
-    );
+    let mut proof_streams = wallet.proof_stream(quote, Default::default(), Default::default());
 
     while let Some(proofs) = proof_streams.next().await {
         let proofs = match proofs {

+ 1 - 7
crates/cdk/examples/p2pk.rs

@@ -1,5 +1,4 @@
 use std::sync::Arc;
-use std::time::Duration;
 
 use cdk::error::Error;
 use cdk::nuts::{CurrencyUnit, SecretKey, SpendingConditions};
@@ -36,12 +35,7 @@ async fn main() -> Result<(), Error> {
 
     let quote = wallet.mint_quote(amount, None).await?;
 
-    let mut proof_streams = wallet.proof_stream(
-        quote,
-        Default::default(),
-        Default::default(),
-        Duration::from_secs(10),
-    );
+    let mut proof_streams = wallet.proof_stream(quote, Default::default(), Default::default());
 
     while let Some(proofs) = proof_streams.next().await {
         let proofs = proofs?;

+ 1 - 7
crates/cdk/examples/proof-selection.rs

@@ -2,7 +2,6 @@
 
 use std::collections::HashMap;
 use std::sync::Arc;
-use std::time::Duration;
 
 use cdk::nuts::nut00::ProofsMethods;
 use cdk::nuts::CurrencyUnit;
@@ -33,12 +32,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
         let quote = wallet.mint_quote(amount, None).await?;
 
-        let mut proof_streams = wallet.proof_stream(
-            quote,
-            Default::default(),
-            Default::default(),
-            Duration::from_secs(10),
-        );
+        let mut proof_streams = wallet.proof_stream(quote, Default::default(), Default::default());
 
         while let Some(proofs) = proof_streams.next().await {
             // Mint the received amount

+ 1 - 7
crates/cdk/examples/wallet.rs

@@ -1,5 +1,4 @@
 use std::sync::Arc;
-use std::time::Duration;
 
 use cdk::nuts::nut00::ProofsMethods;
 use cdk::nuts::CurrencyUnit;
@@ -26,12 +25,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     let quote = wallet.mint_quote(amount, None).await?;
 
-    let mut proof_streams = wallet.proof_stream(
-        quote,
-        Default::default(),
-        Default::default(),
-        Duration::from_secs(10),
-    );
+    let mut proof_streams = wallet.proof_stream(quote, Default::default(), Default::default());
 
     while let Some(proofs) = proof_streams.next().await {
         // Mint the received amount

+ 3 - 11
crates/cdk/src/wallet/streams/mod.rs

@@ -7,7 +7,6 @@ use cdk_common::wallet::{MeltQuote, MintQuote};
 use cdk_common::{PaymentMethod, SpendingConditions};
 use payment::PaymentStream;
 use proof::ProofStream;
-use tokio::time::Duration;
 
 use super::{Wallet, WalletSubscription};
 
@@ -69,23 +68,16 @@ impl Wallet {
         quote: MintQuote,
         amount_split_target: SplitTarget,
         spending_conditions: Option<SpendingConditions>,
-        timeout_duration: Duration,
     ) -> ProofStream<'_> {
-        ProofStream::new(
-            self,
-            quote,
-            amount_split_target,
-            spending_conditions,
-            timeout_duration,
-        )
+        ProofStream::new(self, quote, amount_split_target, spending_conditions)
     }
 
     /// Returns a BoxFuture that will wait for payment on the given event with a timeout check
     #[allow(private_bounds)]
-    pub fn payment_stream<T>(&self, event: T, timeout: Duration) -> PaymentStream<'_>
+    pub fn payment_stream<T>(&self, event: T) -> PaymentStream<'_>
     where
         T: Into<WaitableEvent>,
     {
-        PaymentStream::new(self, event.into().into(), timeout)
+        PaymentStream::new(self, event.into().into())
     }
 }

+ 24 - 23
crates/cdk/src/wallet/streams/payment.rs

@@ -4,12 +4,11 @@
 //! but it will eventually error on a Timeout.
 //!
 //! Bolt11 will emit a single event.
-use std::pin::Pin;
 use std::task::Poll;
 
 use cdk_common::{Amount, Error, MeltQuoteState, MintQuoteState, NotificationPayload};
 use futures::{FutureExt, Stream};
-use tokio::time::{sleep, Duration, Sleep};
+use tokio_util::sync::CancellationToken;
 
 use super::RecvFuture;
 use crate::wallet::subscription::ActiveSubscription;
@@ -20,29 +19,35 @@ pub struct PaymentStream<'a> {
     wallet: Option<(&'a Wallet, WalletSubscription)>,
     is_finalized: bool,
     active_subscription: Option<ActiveSubscription>,
-    timeout: Duration,
+
+    cancel_token: CancellationToken,
 
     // Future events
     subscriber_future: Option<RecvFuture<'a, ActiveSubscription>>,
     subscription_receiver_future:
         Option<RecvFuture<'static, (Option<NotificationPayload<String>>, ActiveSubscription)>>,
-    timeout_future: Option<Pin<Box<Sleep>>>,
+    cancellation_future: Option<RecvFuture<'a, ()>>,
 }
 
 impl<'a> PaymentStream<'a> {
     /// Creates a new instance of the
-    pub fn new(wallet: &'a Wallet, filter: WalletSubscription, timeout: Duration) -> Self {
+    pub fn new(wallet: &'a Wallet, filter: WalletSubscription) -> Self {
         Self {
             wallet: Some((wallet, filter)),
             is_finalized: false,
             active_subscription: None,
-            timeout,
+            cancel_token: Default::default(),
             subscriber_future: None,
             subscription_receiver_future: None,
-            timeout_future: None,
+            cancellation_future: None,
         }
     }
 
+    /// Get cancellation token
+    pub fn get_cancel_token(&self) -> CancellationToken {
+        self.cancel_token.clone()
+    }
+
     /// Creating a wallet subscription is an async event, this may change in the future, but for now,
     /// creating a new Subscription should be polled, as any other async event. This function will
     /// return None if the subscription is already active, Some(()) otherwise
@@ -65,22 +70,18 @@ impl<'a> PaymentStream<'a> {
         }
     }
 
-    /// Checks if the timeout has been reached, or starts a new timer that will be executed if no
-    /// event is produced before
-    ///
-    /// When an event is produced this timeout is dropped and it starts again whenever the new event
-    /// is polled.
-    fn poll_timeout(&mut self, cx: &mut std::task::Context<'_>) -> bool {
-        let mut timeout = self
-            .timeout_future
-            .take()
-            .unwrap_or_else(|| Box::pin(sleep(self.timeout)));
-
-        if timeout.poll_unpin(cx).is_ready() {
+    /// Checks if the stream has been externally cancelled
+    fn poll_cancel(&mut self, cx: &mut std::task::Context<'_>) -> bool {
+        let mut cancellation_future = self.cancellation_future.take().unwrap_or_else(|| {
+            let cancel_token = self.cancel_token.clone();
+            Box::pin(async move { cancel_token.cancelled().await })
+        });
+
+        if cancellation_future.poll_unpin(cx).is_ready() {
             self.subscription_receiver_future = None;
             true
         } else {
-            self.timeout_future = Some(timeout);
+            self.cancellation_future = Some(cancellation_future);
             false
         }
     }
@@ -118,7 +119,7 @@ impl<'a> PaymentStream<'a> {
                 // This future is now fulfilled, put the active_subscription again back to object. Next time next().await is called,
                 // the future will be created in subscription_receiver_future.
                 self.active_subscription = Some(subscription);
-                self.timeout_future = None; // resets timeout
+                self.cancellation_future = None; // resets timeout
                 match notification {
                     None => {
                         self.is_finalized = true;
@@ -172,8 +173,8 @@ impl Stream for PaymentStream<'_> {
             return Poll::Ready(None);
         }
 
-        if this.poll_timeout(cx) {
-            return Poll::Ready(Some(Err(Error::Timeout)));
+        if this.poll_cancel(cx) {
+            return Poll::Ready(None);
         }
 
         if this.poll_init_subscription(cx).is_some() {

+ 7 - 3
crates/cdk/src/wallet/streams/proof.rs

@@ -5,12 +5,12 @@
 //! Bolt11 will mint once
 
 use std::task::Poll;
-use std::time::Duration;
 
 use cdk_common::amount::SplitTarget;
 use cdk_common::wallet::MintQuote;
 use cdk_common::{Error, PaymentMethod, Proofs, SpendingConditions};
 use futures::{FutureExt, Stream, StreamExt};
+use tokio_util::sync::CancellationToken;
 
 use super::payment::PaymentStream;
 use super::{RecvFuture, WaitableEvent};
@@ -33,11 +33,10 @@ impl<'a> ProofStream<'a> {
         mint_quote: MintQuote,
         amount_split_target: SplitTarget,
         spending_conditions: Option<SpendingConditions>,
-        timeout: Duration,
     ) -> Self {
         let filter: WaitableEvent = (&mint_quote).into();
         Self {
-            payment_stream: PaymentStream::new(wallet, filter.into(), timeout),
+            payment_stream: PaymentStream::new(wallet, filter.into()),
             wallet,
             amount_split_target,
             spending_conditions,
@@ -45,6 +44,11 @@ impl<'a> ProofStream<'a> {
             minting_future: None,
         }
     }
+
+    /// Get cancellation token
+    pub fn get_cancel_token(&self) -> CancellationToken {
+        self.payment_stream.get_cancel_token()
+    }
 }
 
 impl Stream for ProofStream<'_> {