소스 검색

Fix bug with websocket close (#1144)

* Fix bug with websocket close

Fixes #1111

* Do not connect to ws if there are no active subscription
C 1 개월 전
부모
커밋
81a47d5d12
1개의 변경된 파일25개의 추가작업 그리고 1개의 파일을 삭제
  1. 25 1
      crates/cdk/src/wallet/subscription/ws.rs

+ 25 - 1
crates/cdk/src/wallet/subscription/ws.rs

@@ -1,6 +1,7 @@
 use std::collections::{HashMap, HashSet};
 use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
+use std::time::Duration;
 
 use cdk_common::subscription::Params;
 use cdk_common::ws::{WsMessageOrResponse, WsMethodRequest, WsRequest, WsUnsubscribeRequest};
@@ -8,6 +9,7 @@ use cdk_common::ws::{WsMessageOrResponse, WsMethodRequest, WsRequest, WsUnsubscr
 use cdk_common::{Method, RoutePath};
 use futures::{SinkExt, StreamExt};
 use tokio::sync::{mpsc, RwLock};
+use tokio::time::sleep;
 use tokio_tungstenite::connect_async;
 use tokio_tungstenite::tungstenite::client::IntoClientRequest;
 use tokio_tungstenite::tungstenite::Message;
@@ -61,6 +63,12 @@ pub async fn ws_main(
     let mut failure_count = 0;
 
     loop {
+        if subscriptions.read().await.is_empty() {
+            // No active subscription
+            sleep(Duration::from_millis(100)).await;
+            continue;
+        }
+
         let mut request_clone = request.clone();
         #[cfg(feature = "auth")]
         {
@@ -177,7 +185,12 @@ pub async fn ws_main(
                 Some(msg) = read.next() => {
                     let msg = match msg {
                         Ok(msg) => msg,
-                        Err(_) => break,
+                        Err(_) => {
+                            if let Err(err) = write.send(Message::Close(None)).await {
+                                tracing::error!("Closing error {err:?}");
+                            }
+                            break
+                        },
                     };
                     let msg = match msg {
                         Message::Text(msg) => msg,
@@ -222,6 +235,10 @@ pub async fn ws_main(
                                     .await;
                                 }
 
+                                if let Err(err) = write.send(Message::Close(None)).await {
+                                    tracing::error!("Closing error {err:?}");
+                                }
+
                                 break; // break connection to force a reconnection, to attempt to recover form this error
                             }
                         }
@@ -251,6 +268,13 @@ pub async fn ws_main(
                     if let Some(json) = get_unsub_request(subid) {
                         let _ = write.send(Message::Text(json.into())).await;
                     }
+
+                    if subscription.is_empty() {
+                        if let Err(err) = write.send(Message::Close(None)).await {
+                            tracing::error!("Closing error {err:?}");
+                        }
+                        break;
+                    }
                 }
             }
         }