Bladeren bron

Add WASM-compatible WebSocket client abstraction

Move WebSocket dependency (tokio-tungstenite) from cdk to cdk-http-client and
introduce a platform-agnostic ws module with native (tokio-tungstenite) and
WASM (ws_stream_wasm) backends. This enables WebSocket subscriptions on WASM
targets, which were previously gated behind cfg(not(wasm32)).

Flatten wallet subscription module by merging ws.rs into subscription.rs now
that the stream client is unconditionally compiled.
Cesar Rodas 1 week geleden
bovenliggende
commit
0e24b8c50c

+ 50 - 1
Cargo.lock

@@ -486,6 +486,17 @@ dependencies = [
 ]
 
 [[package]]
+name = "async_io_stream"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c"
+dependencies = [
+ "futures",
+ "pharos",
+ "rustc_version",
+]
+
+[[package]]
 name = "asynchronous-codec"
 version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1220,7 +1231,6 @@ dependencies = [
  "tls-api",
  "tls-api-native-tls",
  "tokio",
- "tokio-tungstenite 0.26.2",
  "tokio-util",
  "tor-rtcompat",
  "tracing",
@@ -1386,6 +1396,7 @@ dependencies = [
 name = "cdk-http-client"
 version = "0.15.0-rc.0"
 dependencies = [
+ "futures",
  "mockito",
  "regex",
  "reqwest",
@@ -1393,7 +1404,10 @@ dependencies = [
  "serde_json",
  "thiserror 2.0.18",
  "tokio",
+ "tokio-tungstenite 0.26.2",
+ "tracing",
  "url",
+ "ws_stream_wasm",
 ]
 
 [[package]]
@@ -5124,6 +5138,16 @@ dependencies = [
 ]
 
 [[package]]
+name = "pharos"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414"
+dependencies = [
+ "futures",
+ "rustc_version",
+]
+
+[[package]]
 name = "phf"
 version = "0.11.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -6519,6 +6543,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "send_wrapper"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73"
+
+[[package]]
 name = "separator"
 version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -9677,6 +9707,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9"
 
 [[package]]
+name = "ws_stream_wasm"
+version = "0.7.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c173014acad22e83f16403ee360115b38846fe754e735c5d9d3803fe70c6abc"
+dependencies = [
+ "async_io_stream",
+ "futures",
+ "js-sys",
+ "log",
+ "pharos",
+ "rustc_version",
+ "send_wrapper",
+ "thiserror 2.0.18",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+]
+
+[[package]]
 name = "wyz"
 version = "0.5.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"

+ 60 - 10
Cargo.lock.msrv

@@ -189,7 +189,7 @@ version = "1.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc"
 dependencies = [
- "windows-sys 0.60.2",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
@@ -200,7 +200,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d"
 dependencies = [
  "anstyle",
  "once_cell_polyfill",
- "windows-sys 0.60.2",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
@@ -486,6 +486,17 @@ dependencies = [
 ]
 
 [[package]]
+name = "async_io_stream"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c"
+dependencies = [
+ "futures",
+ "pharos",
+ "rustc_version",
+]
+
+[[package]]
 name = "asynchronous-codec"
 version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1221,7 +1232,6 @@ dependencies = [
  "tls-api",
  "tls-api-native-tls",
  "tokio",
- "tokio-tungstenite 0.26.2",
  "tokio-util",
  "tor-rtcompat",
  "tracing",
@@ -1386,6 +1396,7 @@ dependencies = [
 name = "cdk-http-client"
 version = "0.14.0"
 dependencies = [
+ "futures",
  "mockito",
  "regex",
  "reqwest",
@@ -1393,7 +1404,10 @@ dependencies = [
  "serde_json",
  "thiserror 2.0.18",
  "tokio",
+ "tokio-tungstenite 0.26.2",
+ "tracing",
  "url",
+ "ws_stream_wasm",
 ]
 
 [[package]]
@@ -1445,6 +1459,7 @@ version = "0.14.0"
 dependencies = [
  "async-trait",
  "axum 0.8.8",
+ "bip39",
  "cdk-common",
  "futures",
  "ldk-node",
@@ -1939,7 +1954,7 @@ version = "3.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "faf9468729b8cbcea668e36183cb69d317348c2e08e994829fb56ebfdfbaac34"
 dependencies = [
- "windows-sys 0.48.0",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
@@ -2534,7 +2549,7 @@ dependencies = [
  "libc",
  "option-ext",
  "redox_users 0.5.2",
- "windows-sys 0.59.0",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
@@ -2765,7 +2780,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
 dependencies = [
  "libc",
- "windows-sys 0.52.0",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
@@ -4706,7 +4721,7 @@ version = "0.50.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
 dependencies = [
- "windows-sys 0.59.0",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
@@ -5123,6 +5138,16 @@ dependencies = [
 ]
 
 [[package]]
+name = "pharos"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414"
+dependencies = [
+ "futures",
+ "rustc_version",
+]
+
+[[package]]
 name = "phf"
 version = "0.11.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -6189,7 +6214,7 @@ dependencies = [
  "errno",
  "libc",
  "linux-raw-sys 0.11.0",
- "windows-sys 0.52.0",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
@@ -6518,6 +6543,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "send_wrapper"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73"
+
+[[package]]
 name = "separator"
 version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -7085,7 +7116,7 @@ dependencies = [
  "getrandom 0.3.4",
  "once_cell",
  "rustix 1.1.3",
- "windows-sys 0.52.0",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
@@ -9283,7 +9314,7 @@ version = "0.1.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
 dependencies = [
- "windows-sys 0.48.0",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
@@ -9676,6 +9707,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9"
 
 [[package]]
+name = "ws_stream_wasm"
+version = "0.7.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c173014acad22e83f16403ee360115b38846fe754e735c5d9d3803fe70c6abc"
+dependencies = [
+ "async_io_stream",
+ "futures",
+ "js-sys",
+ "log",
+ "pharos",
+ "rustc_version",
+ "send_wrapper",
+ "thiserror 2.0.18",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+]
+
+[[package]]
 name = "wyz"
 version = "0.5.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"

+ 3 - 0
crates/cdk-common/src/lib.rs

@@ -45,6 +45,9 @@ pub use cashu::nuts::{self, *};
 #[cfg(feature = "mint")]
 pub use cashu::quote_id::{self, *};
 pub use cashu::{dhke, ensure_cdk, mint_url, secret, util, SECP256K1};
+/// Re-export cdk-http-client WebSocket client
+#[cfg(feature = "http")]
+pub use cdk_http_client::ws as ws_client;
 /// Re-export cdk-http-client types
 #[cfg(feature = "http")]
 pub use cdk_http_client::{

+ 6 - 0
crates/cdk-http-client/Cargo.toml

@@ -11,17 +11,23 @@ license.workspace = true
 readme = "README.md"
 
 [dependencies]
+futures.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 thiserror.workspace = true
+tracing.workspace = true
 url.workspace = true
 
 [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
 reqwest = { workspace = true }
 regex = { workspace = true }
+tokio-tungstenite = { workspace = true, features = [
+    "rustls", "rustls-tls-native-roots", "connect"
+] }
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 reqwest = { version = "0.12", default-features = false, features = ["json"] }
+ws_stream_wasm = "0.7"
 
 [dev-dependencies]
 tokio = { workspace = true, features = ["rt", "macros"] }

+ 1 - 0
crates/cdk-http-client/src/lib.rs

@@ -24,6 +24,7 @@ mod client;
 mod error;
 mod request;
 mod response;
+pub mod ws;
 
 pub use client::{fetch, HttpClient, HttpClientBuilder};
 pub use error::HttpError;

+ 15 - 0
crates/cdk-http-client/src/ws/error.rs

@@ -0,0 +1,15 @@
+//! WebSocket error types
+
+/// Errors that can occur during WebSocket operations
+#[derive(Debug, thiserror::Error)]
+pub enum WsError {
+    /// Failed to establish a WebSocket connection
+    #[error("WebSocket connection error: {0}")]
+    Connection(String),
+    /// Failed to send a WebSocket message
+    #[error("WebSocket send error: {0}")]
+    Send(String),
+    /// Failed to receive a WebSocket message
+    #[error("WebSocket receive error: {0}")]
+    Receive(String),
+}

+ 16 - 0
crates/cdk-http-client/src/ws/mod.rs

@@ -0,0 +1,16 @@
+//! WebSocket client abstraction for CDK
+//!
+//! Provides a platform-agnostic WebSocket client. On native targets this uses
+//! `tokio-tungstenite`; on WASM it uses `ws_stream_wasm`.
+
+mod error;
+#[cfg(not(target_arch = "wasm32"))]
+mod native;
+#[cfg(target_arch = "wasm32")]
+mod wasm;
+
+pub use error::WsError;
+#[cfg(not(target_arch = "wasm32"))]
+pub use native::{connect, WsReceiver, WsSender};
+#[cfg(target_arch = "wasm32")]
+pub use wasm::{connect, WsReceiver, WsSender};

+ 104 - 0
crates/cdk-http-client/src/ws/native.rs

@@ -0,0 +1,104 @@
+//! Native WebSocket implementation using tokio-tungstenite
+
+use futures::{SinkExt, StreamExt};
+use tokio_tungstenite::tungstenite::client::IntoClientRequest;
+use tokio_tungstenite::tungstenite::Message;
+
+use super::WsError;
+
+/// WebSocket sender half
+pub struct WsSender {
+    inner: Box<
+        dyn futures::Sink<Message, Error = tokio_tungstenite::tungstenite::Error> + Unpin + Send,
+    >,
+}
+
+impl std::fmt::Debug for WsSender {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("WsSender").finish_non_exhaustive()
+    }
+}
+
+/// WebSocket receiver half
+pub struct WsReceiver {
+    inner: Box<
+        dyn futures::Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
+            + Unpin
+            + Send,
+    >,
+}
+
+impl std::fmt::Debug for WsReceiver {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("WsReceiver").finish_non_exhaustive()
+    }
+}
+
+impl WsSender {
+    /// Send a text message over the WebSocket
+    pub async fn send(&mut self, text: String) -> Result<(), WsError> {
+        self.inner
+            .send(Message::Text(text.into()))
+            .await
+            .map_err(|e| WsError::Send(e.to_string()))
+    }
+
+    /// Send a close frame
+    pub async fn close(&mut self) -> Result<(), WsError> {
+        self.inner
+            .send(Message::Close(None))
+            .await
+            .map_err(|e| WsError::Send(e.to_string()))
+    }
+}
+
+impl WsReceiver {
+    /// Receive the next text message. Returns `None` when the connection is closed.
+    /// Non-text messages are silently skipped.
+    pub async fn recv(&mut self) -> Option<Result<String, WsError>> {
+        loop {
+            match self.inner.next().await {
+                Some(Ok(Message::Text(text))) => return Some(Ok(text.to_string())),
+                Some(Ok(Message::Close(_))) | None => return None,
+                Some(Ok(_)) => continue, // skip binary, ping, pong
+                Some(Err(e)) => return Some(Err(WsError::Receive(e.to_string()))),
+            }
+        }
+    }
+}
+
+/// Connect to a WebSocket endpoint with optional headers.
+///
+/// `headers` is a slice of `(name, value)` pairs to include in the upgrade request.
+pub async fn connect(
+    url: &str,
+    headers: &[(&str, &str)],
+) -> Result<(WsSender, WsReceiver), WsError> {
+    let mut request = url
+        .into_client_request()
+        .map_err(|e| WsError::Connection(e.to_string()))?;
+
+    for &(name, value) in headers {
+        if let (Ok(header_name), Ok(header_value)) = (
+            name.parse::<tokio_tungstenite::tungstenite::http::header::HeaderName>(),
+            value.parse::<tokio_tungstenite::tungstenite::http::header::HeaderValue>(),
+        ) {
+            request.headers_mut().insert(header_name, header_value);
+        }
+    }
+
+    let (ws_stream, _) = tokio_tungstenite::connect_async(request)
+        .await
+        .map_err(|e| WsError::Connection(e.to_string()))?;
+
+    let (sink, stream) = ws_stream.split();
+
+    Ok((
+        WsSender {
+            inner: Box::new(sink),
+        },
+        WsReceiver {
+            inner: Box::new(stream),
+        },
+    ))
+}

+ 85 - 0
crates/cdk-http-client/src/ws/wasm.rs

@@ -0,0 +1,85 @@
+//! WASM WebSocket implementation using ws_stream_wasm
+
+use futures::{SinkExt, StreamExt};
+use ws_stream_wasm::{WsMessage, WsMeta};
+
+use super::WsError;
+
+/// WebSocket sender half
+pub struct WsSender {
+    inner: futures::stream::SplitSink<ws_stream_wasm::WsStream, WsMessage>,
+}
+
+impl std::fmt::Debug for WsSender {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("WsSender").finish_non_exhaustive()
+    }
+}
+
+/// WebSocket receiver half
+pub struct WsReceiver {
+    inner: futures::stream::SplitStream<ws_stream_wasm::WsStream>,
+}
+
+impl std::fmt::Debug for WsReceiver {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("WsReceiver").finish_non_exhaustive()
+    }
+}
+
+impl WsSender {
+    /// Send a text message over the WebSocket
+    pub async fn send(&mut self, text: String) -> Result<(), WsError> {
+        self.inner
+            .send(WsMessage::Text(text))
+            .await
+            .map_err(|e| WsError::Send(e.to_string()))
+    }
+
+    /// Send a close frame
+    pub async fn close(&mut self) -> Result<(), WsError> {
+        self.inner
+            .close()
+            .await
+            .map_err(|e| WsError::Send(e.to_string()))
+    }
+}
+
+impl WsReceiver {
+    /// Receive the next text message. Returns `None` when the connection is closed.
+    /// Non-text messages are silently skipped.
+    pub async fn recv(&mut self) -> Option<Result<String, WsError>> {
+        loop {
+            match self.inner.next().await {
+                Some(WsMessage::Text(text)) => return Some(Ok(text)),
+                Some(WsMessage::Binary(_)) => continue,
+                None => return None,
+            }
+        }
+    }
+}
+
+/// Connect to a WebSocket endpoint.
+///
+/// On WASM, custom headers are not supported by the browser WebSocket API.
+/// If `headers` is non-empty, a warning is logged and headers are ignored.
+pub async fn connect(
+    url: &str,
+    headers: &[(&str, &str)],
+) -> Result<(WsSender, WsReceiver), WsError> {
+    if !headers.is_empty() {
+        tracing::warn!(
+            "WebSocket headers are not supported on WASM (browser limitation). \
+             {} header(s) will be ignored.",
+            headers.len()
+        );
+    }
+
+    let (_meta, ws_stream) = WsMeta::connect(url, None)
+        .await
+        .map_err(|e| WsError::Connection(e.to_string()))?;
+
+    let (sink, stream) = ws_stream.split();
+
+    Ok((WsSender { inner: sink }, WsReceiver { inner: stream }))
+}

+ 0 - 5
crates/cdk/Cargo.toml

@@ -73,11 +73,6 @@ tokio = { workspace = true, features = [
 ] }
 getrandom = { version = "0.2" }
 cdk-signatory = { workspace = true, features = ["grpc"], optional = true }
-tokio-tungstenite = { workspace = true, features = [
-    "rustls",
-    "rustls-tls-native-roots",
-    "connect"
-] }
 # Tor dependencies (optional; enabled by feature "tor")
 hyper = { version = "0.14", optional = true, features = ["client", "http1", "http2"] }
 http = { version = "0.2", optional = true }

+ 3 - 1
crates/cdk/src/wallet/mint_connector/transport.rs

@@ -1,7 +1,9 @@
 //! HTTP Transport trait with a default implementation
 use std::fmt::Debug;
 
-use cdk_common::{AuthToken, HttpClient, HttpClientBuilder};
+#[cfg(not(target_arch = "wasm32"))]
+use cdk_common::HttpClientBuilder;
+use cdk_common::{AuthToken, HttpClient};
 #[cfg(all(feature = "bip353", not(target_arch = "wasm32")))]
 use hickory_resolver::config::ResolverConfig;
 #[cfg(all(feature = "bip353", not(target_arch = "wasm32")))]

+ 146 - 15
crates/cdk/src/wallet/subscription/mod.rs → crates/cdk/src/wallet/subscription.rs

@@ -10,7 +10,9 @@ use std::fmt::Debug;
 use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
 
-use cdk_common::nut17::ws::{WsMethodRequest, WsRequest, WsUnsubscribeRequest};
+use cdk_common::nut17::ws::{
+    WsMessageOrResponse, WsMethodRequest, WsRequest, WsUnsubscribeRequest,
+};
 use cdk_common::nut17::{Kind, NotificationId};
 use cdk_common::parking_lot::RwLock;
 use cdk_common::pub_sub::remote_consumer::{
@@ -18,7 +20,8 @@ use cdk_common::pub_sub::remote_consumer::{
 };
 use cdk_common::pub_sub::{Error as PubsubError, Spec, Subscriber};
 use cdk_common::subscription::WalletParams;
-use cdk_common::CheckStateRequest;
+use cdk_common::ws_client::{connect as ws_connect, WsError};
+use cdk_common::{CheckStateRequest, Method, RoutePath};
 use tokio::sync::mpsc;
 use uuid::Uuid;
 
@@ -26,9 +29,6 @@ use crate::event::MintEvent;
 use crate::mint_url::MintUrl;
 use crate::wallet::MintConnector;
 
-#[cfg(not(target_arch = "wasm32"))]
-mod ws;
-
 /// Notification Payload
 pub type NotificationPayload = crate::nuts::NotificationPayload<String>;
 
@@ -215,17 +215,11 @@ impl Transport for SubscriptionClient {
 
     async fn stream(
         &self,
-        _ctrls: mpsc::Receiver<StreamCtrl<Self::Spec>>,
-        _topics: Vec<SubscribeMessage<Self::Spec>>,
-        _reply_to: InternalRelay<Self::Spec>,
+        ctrls: mpsc::Receiver<StreamCtrl<Self::Spec>>,
+        topics: Vec<SubscribeMessage<Self::Spec>>,
+        reply_to: InternalRelay<Self::Spec>,
     ) -> Result<(), PubsubError> {
-        #[cfg(not(target_arch = "wasm32"))]
-        let r = ws::stream_client(self, _ctrls, _topics, _reply_to).await;
-
-        #[cfg(target_arch = "wasm32")]
-        let r = Err(PubsubError::NotSupported);
-
-        r
+        stream_client(self, ctrls, topics, reply_to).await
     }
 
     /// Poll on demand
@@ -353,3 +347,140 @@ impl Transport for SubscriptionClient {
         Ok(())
     }
 }
+
+async fn stream_client(
+    client: &SubscriptionClient,
+    mut ctrl: mpsc::Receiver<StreamCtrl<MintSubTopics>>,
+    topics: Vec<SubscribeMessage<MintSubTopics>>,
+    reply_to: InternalRelay<MintSubTopics>,
+) -> Result<(), PubsubError> {
+    let mut url = client
+        .mint_url
+        .join_paths(&["v1", "ws"])
+        .expect("Could not join paths");
+
+    if url.scheme() == "https" {
+        url.set_scheme("wss").expect("Could not set scheme");
+    } else {
+        url.set_scheme("ws").expect("Could not set scheme");
+    }
+
+    let mut headers: Vec<(&str, String)> = Vec::new();
+
+    {
+        let auth_wallet = client.http_client.get_auth_wallet().await;
+        let token = match auth_wallet.as_ref() {
+            Some(auth_wallet) => {
+                let endpoint = cdk_common::ProtectedEndpoint::new(Method::Get, RoutePath::Ws);
+                match auth_wallet.get_auth_for_request(&endpoint).await {
+                    Ok(token) => token,
+                    Err(err) => {
+                        tracing::warn!("Failed to get auth token: {:?}", err);
+                        None
+                    }
+                }
+            }
+            None => None,
+        };
+
+        if let Some(auth_token) = token {
+            let header_key = match &auth_token {
+                cdk_common::AuthToken::ClearAuth(_) => "Clear-auth",
+                cdk_common::AuthToken::BlindAuth(_) => "Blind-auth",
+            };
+
+            let header_value = auth_token.to_string();
+            headers.push((header_key, header_value));
+        }
+    }
+
+    let url_str = url.to_string();
+    let header_refs: Vec<(&str, &str)> = headers.iter().map(|(k, v)| (*k, v.as_str())).collect();
+
+    tracing::debug!("Connecting to {}", url);
+    let (mut sender, mut receiver) = ws_connect(&url_str, &header_refs).await.map_err(|err| {
+        tracing::error!("Error connecting: {err:?}");
+        map_ws_error(err)
+    })?;
+
+    tracing::debug!("Connected to {}", url);
+
+    for (name, index) in topics {
+        let (_, req) = if let Some(req) = client.get_sub_request(name, index) {
+            req
+        } else {
+            continue;
+        };
+
+        let _ = sender.send(req).await;
+    }
+
+    loop {
+        tokio::select! {
+            Some(msg) = ctrl.recv() => {
+                match msg {
+                    StreamCtrl::Subscribe(msg) => {
+                        let (_, req) = if let Some(req) = client.get_sub_request(msg.0, msg.1) {
+                            req
+                        } else {
+                            continue;
+                        };
+                        let _ = sender.send(req).await;
+                    }
+                    StreamCtrl::Unsubscribe(msg) => {
+                        let req = if let Some(req) = client.get_unsub_request(msg) {
+                            req
+                        } else {
+                            continue;
+                        };
+                        let _ = sender.send(req).await;
+                    }
+                    StreamCtrl::Stop => {
+                        if let Err(err) = sender.close().await {
+                            tracing::error!("Closing error {err:?}");
+                        }
+                        break;
+                    }
+                };
+            }
+            msg = receiver.recv() => {
+                let msg = match msg {
+                    Some(Ok(msg)) => msg,
+                    Some(Err(_)) => {
+                        if let Err(err) = sender.close().await {
+                            tracing::error!("Closing error {err:?}");
+                        }
+                        break;
+                    }
+                    None => break,
+                };
+                let msg = match serde_json::from_str::<WsMessageOrResponse<String>>(&msg) {
+                    Ok(msg) => msg,
+                    Err(_) => continue,
+                };
+
+                match msg {
+                    WsMessageOrResponse::Notification(ref payload) => {
+                        reply_to.send(payload.params.payload.clone());
+                    }
+                    WsMessageOrResponse::Response(response) => {
+                        tracing::debug!("Received response from server: {:?}", response);
+                    }
+                    WsMessageOrResponse::ErrorResponse(error) => {
+                        tracing::debug!("Received an error from server: {:?}", error);
+                        return Err(PubsubError::InternalStr(error.error.message));
+                    }
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
+
+fn map_ws_error(err: WsError) -> PubsubError {
+    match err {
+        WsError::Connection(_) => PubsubError::NotSupported,
+        other => PubsubError::InternalStr(other.to_string()),
+    }
+}

+ 0 - 158
crates/cdk/src/wallet/subscription/ws.rs

@@ -1,158 +0,0 @@
-use cdk_common::nut17::ws::WsMessageOrResponse;
-use cdk_common::pub_sub::remote_consumer::{InternalRelay, StreamCtrl, SubscribeMessage};
-use cdk_common::pub_sub::Error as PubsubError;
-use cdk_common::{Method, RoutePath};
-use futures::{SinkExt, StreamExt};
-use tokio::sync::mpsc;
-use tokio_tungstenite::connect_async;
-use tokio_tungstenite::tungstenite::client::IntoClientRequest;
-use tokio_tungstenite::tungstenite::Message;
-
-use super::{MintSubTopics, SubscriptionClient};
-
-#[inline(always)]
-pub(crate) async fn stream_client(
-    client: &SubscriptionClient,
-    mut ctrl: mpsc::Receiver<StreamCtrl<MintSubTopics>>,
-    topics: Vec<SubscribeMessage<MintSubTopics>>,
-    reply_to: InternalRelay<MintSubTopics>,
-) -> Result<(), PubsubError> {
-    let mut url = client
-        .mint_url
-        .join_paths(&["v1", "ws"])
-        .expect("Could not join paths");
-
-    if url.scheme() == "https" {
-        url.set_scheme("wss").expect("Could not set scheme");
-    } else {
-        url.set_scheme("ws").expect("Could not set scheme");
-    }
-
-    let mut request = url.to_string().into_client_request().map_err(|err| {
-        tracing::error!("Failed to create client request: {:?}", err);
-        // Fallback to HTTP client if we can't create the WebSocket request
-        cdk_common::pub_sub::Error::NotSupported
-    })?;
-
-    {
-        let auth_wallet = client.http_client.get_auth_wallet().await;
-        let token = match auth_wallet.as_ref() {
-            Some(auth_wallet) => {
-                let endpoint = cdk_common::ProtectedEndpoint::new(Method::Get, RoutePath::Ws);
-                match auth_wallet.get_auth_for_request(&endpoint).await {
-                    Ok(token) => token,
-                    Err(err) => {
-                        tracing::warn!("Failed to get auth token: {:?}", err);
-                        None
-                    }
-                }
-            }
-            None => None,
-        };
-
-        if let Some(auth_token) = token {
-            let header_key = match &auth_token {
-                cdk_common::AuthToken::ClearAuth(_) => "Clear-auth",
-                cdk_common::AuthToken::BlindAuth(_) => "Blind-auth",
-            };
-
-            match auth_token.to_string().parse() {
-                Ok(header_value) => {
-                    request.headers_mut().insert(header_key, header_value);
-                }
-                Err(err) => {
-                    tracing::warn!("Failed to parse auth token as header value: {:?}", err);
-                }
-            }
-        }
-    }
-
-    tracing::debug!("Connecting to {}", url);
-    let ws_stream = connect_async(request)
-        .await
-        .map(|(ws_stream, _)| ws_stream)
-        .map_err(|err| {
-            tracing::error!("Error connecting: {err:?}");
-
-            cdk_common::pub_sub::Error::Internal(Box::new(err))
-        })?;
-
-    tracing::debug!("Connected to {}", url);
-    let (mut write, mut read) = ws_stream.split();
-
-    for (name, index) in topics {
-        let (_, req) = if let Some(req) = client.get_sub_request(name, index) {
-            req
-        } else {
-            continue;
-        };
-
-        let _ = write.send(Message::Text(req.into())).await;
-    }
-
-    loop {
-        tokio::select! {
-            Some(msg) = ctrl.recv() => {
-                match msg {
-                    StreamCtrl::Subscribe(msg) => {
-                        let (_, req) = if let Some(req) = client.get_sub_request(msg.0, msg.1) {
-                            req
-                        } else {
-                            continue;
-                        };
-                        let _ = write.send(Message::Text(req.into())).await;
-                    }
-                    StreamCtrl::Unsubscribe(msg) => {
-                        let req = if let Some(req) = client.get_unsub_request(msg) {
-                            req
-                        } else {
-                            continue;
-                        };
-                        let _ = write.send(Message::Text(req.into())).await;
-                    }
-                    StreamCtrl::Stop => {
-                        if let Err(err) = write.send(Message::Close(None)).await {
-                            tracing::error!("Closing error {err:?}");
-                        }
-                        break;
-                    }
-                };
-            }
-            Some(msg) = read.next() => {
-                let msg = match msg {
-                    Ok(msg) => msg,
-                    Err(_) => {
-                        if let Err(err) = write.send(Message::Close(None)).await {
-                            tracing::error!("Closing error {err:?}");
-                        }
-                        break;
-                    }
-                };
-                let msg = match msg {
-                    Message::Text(msg) => msg,
-                    _ => continue,
-                };
-                let msg = match serde_json::from_str::<WsMessageOrResponse<String>>(&msg) {
-                    Ok(msg) => msg,
-                    Err(_) => continue,
-                };
-
-                match msg {
-                    WsMessageOrResponse::Notification(ref payload) => {
-                        reply_to.send(payload.params.payload.clone());
-                    }
-                    WsMessageOrResponse::Response(response) => {
-                        tracing::debug!("Received response from server: {:?}", response);
-                    }
-                    WsMessageOrResponse::ErrorResponse(error) => {
-                        tracing::debug!("Received an error from server: {:?}", error);
-                        return Err(PubsubError::InternalStr(error.error.message));
-                    }
-                }
-
-            }
-        }
-    }
-
-    Ok(())
-}