Pārlūkot izejas kodu

Change API

* Remove disconnection event
* Keep trying to connect until Relayer client struct is Drop
* Remove oneshop and use tokio::spawn JoinHandle instead to stop working thread
Cesar Rodas 1 gadu atpakaļ
vecāks
revīzija
a98c9fba37
2 mainītis faili ar 31 papildinājumiem un 76 dzēšanām
  1. 10 35
      crates/client/src/pool.rs
  2. 21 41
      crates/client/src/relayer.rs

+ 10 - 35
crates/client/src/pool.rs

@@ -72,21 +72,13 @@ impl Pool {
         self.clients.values().collect::<Vec<&Relayer>>()
     }
 
-    /// Returns the number of active connections. If a connection to a relayer
-    /// is not active it will be removed from the list
-    pub fn check_active_connections(&mut self) -> usize {
-        let mut to_remove = vec![];
-        for (url, client) in self.clients.iter() {
-            if !client.is_running() {
-                to_remove.push(url.to_owned());
-            }
-        }
-
-        for url in to_remove.iter() {
-            self.clients.remove(url);
-        }
-
-        self.clients.len()
+    /// Returns the number of active connections.
+    pub fn check_active_connections(&self) -> usize {
+        self.clients
+            .iter()
+            .filter(|(_, relayer)| relayer.is_connected())
+            .collect::<Vec<_>>()
+            .len()
     }
 
     /// Creates a connection to a new relayer. If the connection is successful a
@@ -99,38 +91,21 @@ impl Pool {
     ///
     /// This function will open a connection at most once, if a connection
     /// already exists false will be returned
-    pub fn connect_to<F>(
-        mut self,
-        url: &str,
-        max_connections_attempts: u16,
-        on_connection: Option<F>,
-    ) -> Result<Self, Error>
+    pub fn connect_to<F>(mut self, url: &str, on_connection: Option<F>) -> Result<Self, Error>
     where
         F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
             + Send
             + Sync
             + 'static,
     {
-        if self.clients.get(url).is_none() {
+        if !self.clients.contains_key(url) {
             log::warn!("Connecting to {}", url);
             self.clients.insert(
                 url.to_owned(),
-                Relayer::new(
-                    self.sender.clone(),
-                    max_connections_attempts,
-                    url,
-                    on_connection,
-                )?,
+                Relayer::new(self.sender.clone(), url, on_connection)?,
             );
         }
 
         Ok(self)
     }
-
-    /// Disconnects from all relayers
-    pub async fn stop(&mut self) {
-        for (_, client) in self.clients.drain() {
-            client.disconnect().await;
-        }
-    }
 }

+ 21 - 41
crates/client/src/relayer.rs

@@ -10,7 +10,8 @@ use std::{
     },
 };
 use tokio::{
-    sync::{mpsc, oneshot},
+    sync::mpsc,
+    task::JoinHandle,
     time::{sleep, timeout, Duration},
 };
 use tokio_tungstenite::{connect_async, tungstenite::Message};
@@ -24,18 +25,24 @@ pub struct Relayer {
     /// Sender to the relayer. This can be used to send a Requests to this
     /// relayer
     pub send_to_socket: mpsc::Sender<Request>,
+
+    worker: JoinHandle<()>,
+
     is_connected: Arc<AtomicBool>,
-    /// This sender signals to background connection to stop
-    stop_service: oneshot::Sender<()>,
 }
 
 const NO_ACTIVITY_TIMEOUT_SECS: u64 = 120;
 
+impl Drop for Relayer {
+    fn drop(&mut self) {
+        self.worker.abort()
+    }
+}
+
 impl Relayer {
     /// Creates a new relayer
     pub fn new<F>(
         broadcast_to_listeners: mpsc::Sender<(Event, String)>,
-        max_connections_attempts: u16,
         url: &str,
         on_connection: Option<F>,
     ) -> Result<Self, Error>
@@ -47,12 +54,11 @@ impl Relayer {
     {
         let (sender_to_socket, send_to_socket) = mpsc::channel(100_000);
         let is_connected = Arc::new(AtomicBool::new(false));
-        let stop_service = Self::spawn_background_client(
+        let worker = Self::spawn_background_client(
             broadcast_to_listeners,
             sender_to_socket.clone(),
             send_to_socket,
             url,
-            max_connections_attempts,
             is_connected.clone(),
             on_connection,
         )?;
@@ -61,7 +67,7 @@ impl Relayer {
             url: url.to_owned(),
             is_connected,
             send_to_socket: sender_to_socket,
-            stop_service,
+            worker,
         })
     }
 
@@ -70,26 +76,24 @@ impl Relayer {
         sender_to_socket: mpsc::Sender<Request>,
         mut send_to_socket: mpsc::Receiver<Request>,
         url_str: &str,
-        max_connections_attempts: u16,
         is_connected: Arc<AtomicBool>,
         on_connection: Option<F>,
-    ) -> Result<oneshot::Sender<()>, Error>
+    ) -> Result<JoinHandle<()>, Error>
     where
         F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
             + Send
             + Sync
             + 'static,
     {
-        let (stop_service, mut stopper_recv) = oneshot::channel();
-
         let url = url_str.to_owned();
         let url_parsed = Url::parse(&url)?;
 
-        tokio::spawn(async move {
-            let mut reconnect = true;
+        is_connected.store(false, Relaxed);
+
+        Ok(tokio::spawn(async move {
             let mut connection_attempts = 0;
 
-            while reconnect && connection_attempts <= max_connections_attempts {
+            loop {
                 log::warn!("{}: Connect attempt {}", url, connection_attempts);
                 connection_attempts += 1;
                 let mut socket = match connect_async(url_parsed.clone()).await {
@@ -102,6 +106,7 @@ impl Relayer {
                 };
 
                 log::info!("Connected to {}", url);
+                connection_attempts = 0;
 
                 if let Some(on_connection) = &on_connection {
                     on_connection(&url, sender_to_socket.clone()).await;
@@ -109,12 +114,6 @@ impl Relayer {
 
                 loop {
                     tokio::select! {
-                        Ok(()) = &mut stopper_recv => {
-                            log::warn!("{}: Breaking client due external signal", url);
-                            reconnect = false;
-                            break;
-                        },
-
                         Some(msg) = send_to_socket.recv() => {
                             if let Ok(json) = serde_json::to_string(&msg) {
                                 log::info!("{}: Sending {}", url, json);
@@ -125,8 +124,8 @@ impl Relayer {
                             }
                         }
                         msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
-                            is_connected.store(true, Relaxed);
                             let msg = if let Ok(Some(Ok(msg))) = msg {
+                                is_connected.store(true, Relaxed);
                                     match msg {
                                         Message::Text(text) => text,
                                         Message::Ping(msg) => {
@@ -151,7 +150,6 @@ impl Relayer {
                             }
 
                             log::info!("New message: {}", msg);
-                            connection_attempts = 0;
 
 
                             let msg: Result<Response, _> = serde_json::from_str(&msg);
@@ -174,20 +172,7 @@ impl Relayer {
                 // Throttle down to not spam the server with reconnections
                 sleep(Duration::from_millis(500)).await;
             }
-
-            let _ = broadcast_to_listeners.try_send((Event::Disconnected, "".to_owned()));
-
-            log::warn!("{}: Disconnected", url);
-        });
-
-        Ok(stop_service)
-    }
-
-    /// Checks if the relayer background connection is running. It is not
-    /// guaranteed there is an active connection, it may be in the process of
-    /// reconnecting.
-    pub fn is_running(&self) -> bool {
-        !self.stop_service.is_closed()
+        }))
     }
 
     /// Checks if the relayer is connected. It is guaranteed that the relayer is
@@ -203,9 +188,4 @@ impl Relayer {
             .await
             .map_err(|e| Error::Sync(Box::new(e)))
     }
-
-    /// Stops the background thread that has the connection to this relayer
-    pub async fn disconnect(self) {
-        let _ = self.stop_service.send(());
-    }
 }