Sfoglia il codice sorgente

Improve reconnecting time

Cesar Rodas 1 anno fa
parent
commit
c6b59559ad
3 ha cambiato i file con 46 aggiunte e 18 eliminazioni
  1. 22 12
      crates/client/src/client.rs
  2. 3 0
      crates/client/src/error.rs
  3. 21 6
      crates/client/src/lib.rs

+ 22 - 12
crates/client/src/client.rs

@@ -5,7 +5,7 @@ use nostr_rs_types::{Request, Response};
 use std::pin::Pin;
 use tokio::{
     sync::{broadcast, mpsc, oneshot},
-    time::{sleep, Duration},
+    time::{sleep, timeout, Duration},
 };
 use tokio_tungstenite::{connect_async, tungstenite::Message};
 use url::Url;
@@ -20,6 +20,9 @@ pub struct Client {
     stop_service: oneshot::Sender<()>,
 }
 
+const NO_ACTIVITY_TIMEOUT_SECS: u64 = 30;
+const MAX_RECONNECT_ATTEMPTS: u64 = 15;
+
 impl Client {
     pub fn new<F>(url: &str, on_connection: Option<F>) -> Result<Self, Error>
     where
@@ -60,8 +63,8 @@ impl Client {
 
         tokio::spawn(async move {
             let mut reconnect = true;
-            let mut retries: usize = 0;
-            while reconnect && retries <= 10 {
+            let mut retries = 0;
+            while reconnect && retries <= MAX_RECONNECT_ATTEMPTS {
                 log::warn!("{}: Connect attempt {}", url, retries);
                 retries += 1;
                 let mut socket = if let Ok(x) = connect_async(url_parsed.clone()).await {
@@ -85,29 +88,31 @@ impl Client {
                             reconnect = false;
                             break;
                         },
+
                         Some(msg) = receiver.recv() => {
                             if let Ok(json) = serde_json::to_string(&msg) {
                                 log::info!("{}: Sending {}", url, json);
                                 if let Err(x) = socket.send(Message::Text(json)).await {
-                                    log::error!("{} :Reconnecting due {}", url, x);
+                                    log::error!("{} : Reconnecting due {}", url, x);
                                     break;
-
                                 }
                             }
                         }
-                        Some(Ok(msg)) = socket.next() => {
-                            let msg =if let Ok(msg) = msg.into_text() {
+                        msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
+                            let msg =if let Ok(Some(Ok(Message::Text(msg)))) = msg {
                                     msg
                                 } else {
-                                    continue;
+                                    log::error!("{} Reconnecting client due of empty recv", url);
+                                    break;
                                 };
 
+                            log::info!("New message: {}", msg);
+                            retries = 0;
+
                             if msg.is_empty() {
                                 continue;
                             }
 
-                            log::info!("New message: {}", msg);
-
                             let msg: Result<Response, _> = serde_json::from_str(&msg);
 
                             if let Ok(msg) = msg {
@@ -119,6 +124,7 @@ impl Client {
                         }
                         else => {
                             log::warn!("{}: else", url);
+                            break;
                         }
                     }
                 }
@@ -134,8 +140,12 @@ impl Client {
         !self.stop_service.is_closed()
     }
 
-    pub fn subscribe(&self) -> broadcast::Receiver<(Response, String)> {
-        self.recv_from_socket.resubscribe()
+    pub fn subscribe(&self) -> Option<broadcast::Receiver<(Response, String)>> {
+        if self.stop_service.is_closed() {
+            None
+        } else {
+            Some(self.recv_from_socket.resubscribe())
+        }
     }
 
     pub async fn send(&self, request: Request) -> Result<(), Error> {

+ 3 - 0
crates/client/src/error.rs

@@ -12,4 +12,7 @@ pub enum Error {
 
     #[error("Sync: {0}")]
     Sync(#[from] SendError<Request>),
+
+    #[error("There is no connection")]
+    Disconnected,
 }

+ 21 - 6
crates/client/src/lib.rs

@@ -24,14 +24,21 @@ impl Default for Clients {
 }
 
 impl Clients {
-    pub async fn recv(&self) -> Option<(Response, String)> {
+    pub async fn recv(&self) -> Result<Option<(Response, String)>, Error> {
         let mut subscriptions = self
             .clients
             .read()
             .iter()
             .map(|(_, c)| c.subscribe())
+            .collect::<Vec<Option<_>>>()
+            .into_iter()
+            .filter_map(|x| x)
             .collect::<Vec<_>>();
 
+        if subscriptions.is_empty() {
+            return Err(Error::Disconnected);
+        }
+
         let mut futures = FuturesUnordered::new();
 
         for sub in subscriptions.iter_mut() {
@@ -39,25 +46,33 @@ impl Clients {
         }
 
         if let Some(Ok(response)) = futures.next().await {
-            Some(response)
+            Ok(Some(response))
         } else {
-            None
+            Ok(None)
         }
     }
 
-    pub fn try_recv(&self) -> Option<(Response, String)> {
+    pub fn try_recv(&self) -> Result<Option<(Response, String)>, Error> {
         let mut subscriptions = self
             .clients
             .read()
             .iter()
             .map(|(_, c)| c.subscribe())
+            .collect::<Vec<Option<_>>>()
+            .into_iter()
+            .filter_map(|x| x)
             .collect::<Vec<_>>();
+
+        if subscriptions.is_empty() {
+            return Err(Error::Disconnected);
+        }
+
         for sub in subscriptions.iter_mut() {
             if let Ok(msg) = sub.try_recv() {
-                return Some(msg);
+                return Ok(Some(msg));
             }
         }
-        None
+        Ok(None)
     }
 
     pub fn check_active_connections(&self) -> usize {