|
|
@@ -1,6 +1,7 @@
|
|
|
use std::collections::{HashMap, HashSet};
|
|
|
use std::sync::atomic::AtomicUsize;
|
|
|
use std::sync::Arc;
|
|
|
+use std::time::Duration;
|
|
|
|
|
|
use cdk_common::subscription::Params;
|
|
|
use cdk_common::ws::{WsMessageOrResponse, WsMethodRequest, WsRequest, WsUnsubscribeRequest};
|
|
|
@@ -8,6 +9,7 @@ use cdk_common::ws::{WsMessageOrResponse, WsMethodRequest, WsRequest, WsUnsubscr
|
|
|
use cdk_common::{Method, RoutePath};
|
|
|
use futures::{SinkExt, StreamExt};
|
|
|
use tokio::sync::{mpsc, RwLock};
|
|
|
+use tokio::time::sleep;
|
|
|
use tokio_tungstenite::connect_async;
|
|
|
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
|
|
|
use tokio_tungstenite::tungstenite::Message;
|
|
|
@@ -61,6 +63,12 @@ pub async fn ws_main(
|
|
|
let mut failure_count = 0;
|
|
|
|
|
|
loop {
|
|
|
+ if subscriptions.read().await.is_empty() {
|
|
|
+ // No active subscription
|
|
|
+ sleep(Duration::from_millis(100)).await;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
let mut request_clone = request.clone();
|
|
|
#[cfg(feature = "auth")]
|
|
|
{
|
|
|
@@ -260,6 +268,13 @@ pub async fn ws_main(
|
|
|
if let Some(json) = get_unsub_request(subid) {
|
|
|
let _ = write.send(Message::Text(json.into())).await;
|
|
|
}
|
|
|
+
|
|
|
+ if subscription.is_empty() {
|
|
|
+ if let Err(err) = write.send(Message::Close(None)).await {
|
|
|
+ tracing::error!("Closing error {err:?}");
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|