Kaynağa Gözat

Added clients.rs

Cesar Rodas 1 yıl önce
ebeveyn
işleme
56ac159acc
3 değiştirilmiş dosya ile 127 ekleme ve 122 silme
  1. 6 2
      crates/client/src/client.rs
  2. 119 0
      crates/client/src/clients.rs
  3. 2 120
      crates/client/src/lib.rs

+ 6 - 2
crates/client/src/client.rs

@@ -99,8 +99,12 @@ impl Client {
                             }
                         }
                         msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
-                            let msg =if let Ok(Some(Ok(Message::Text(msg)))) = msg {
-                                    msg
+                            let msg = if let Ok(Some(Ok(msg))) = msg {
+                                    if let Ok(msg) = msg.into_text() {
+                                        msg
+                                    } else {
+                                        continue;
+                                    }
                                 } else {
                                     log::error!("{} Reconnecting client due of empty recv", url);
                                     break;

+ 119 - 0
crates/client/src/clients.rs

@@ -0,0 +1,119 @@
+use crate::{Client, Error};
+use futures::Future;
+use futures_util::{stream::FuturesUnordered, StreamExt};
+use nostr_rs_types::{Request, Response};
+use parking_lot::RwLock;
+use std::{collections::HashMap, pin::Pin, sync::Arc};
+use tokio::sync::mpsc;
+
+#[derive(Debug, Clone)]
+pub struct Clients {
+    clients: Arc<RwLock<HashMap<String, Client>>>,
+}
+
+impl Default for Clients {
+    fn default() -> Self {
+        Self {
+            clients: Arc::new(RwLock::new(HashMap::new())),
+        }
+    }
+}
+
+impl Clients {
+    pub async fn recv(&self) -> Result<Option<(Response, String)>, Error> {
+        let mut subscriptions = self
+            .clients
+            .read()
+            .iter()
+            .map(|(_, c)| c.subscribe())
+            .collect::<Vec<Option<_>>>()
+            .into_iter()
+            .filter_map(|x| x)
+            .collect::<Vec<_>>();
+
+        if subscriptions.is_empty() {
+            return Err(Error::Disconnected);
+        }
+
+        let mut futures = FuturesUnordered::new();
+
+        for sub in subscriptions.iter_mut() {
+            futures.push(sub.recv());
+        }
+
+        if let Some(Ok(response)) = futures.next().await {
+            Ok(Some(response))
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn try_recv(&self) -> Result<Option<(Response, String)>, Error> {
+        let mut subscriptions = self
+            .clients
+            .read()
+            .iter()
+            .map(|(_, c)| c.subscribe())
+            .collect::<Vec<Option<_>>>()
+            .into_iter()
+            .filter_map(|x| x)
+            .collect::<Vec<_>>();
+
+        if subscriptions.is_empty() {
+            return Err(Error::Disconnected);
+        }
+
+        for sub in subscriptions.iter_mut() {
+            if let Ok(msg) = sub.try_recv() {
+                return Ok(Some(msg));
+            }
+        }
+        Ok(None)
+    }
+
+    pub fn check_active_connections(&self) -> usize {
+        let mut clients = self.clients.write();
+        let mut to_remove = vec![];
+        for (url, client) in clients.iter() {
+            if !client.is_running() {
+                to_remove.push(url.to_owned());
+            }
+        }
+
+        for url in to_remove.iter() {
+            clients.remove(url);
+        }
+
+        clients.len()
+    }
+
+    pub async fn send(&self, request: Request) {
+        let senders = self
+            .clients
+            .read()
+            .iter()
+            .map(|(_, c)| c.send_to_socket.clone())
+            .collect::<Vec<mpsc::Sender<_>>>();
+
+        for sender in senders.iter() {
+            let _ = sender.send(request.clone()).await;
+        }
+    }
+
+    pub async fn connect_to<F>(&self, url: &str, on_connection: Option<F>) -> Result<bool, Error>
+    where
+        F: (Fn(mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
+            + Send
+            + Sync
+            + 'static,
+    {
+        let mut clients = self.clients.write();
+        Ok(if clients.get(url).is_some() {
+            false
+        } else {
+            log::warn!("Connecting to {}", url);
+            clients.insert(url.to_owned(), Client::new(url, on_connection)?);
+            true
+        })
+    }
+}

+ 2 - 120
crates/client/src/lib.rs

@@ -1,123 +1,5 @@
-use futures::Future;
-use futures_util::{stream::FuturesUnordered, StreamExt};
-use nostr_rs_types::{Request, Response};
-use parking_lot::RwLock;
-use std::{collections::HashMap, pin::Pin, sync::Arc};
-use tokio::sync::mpsc;
-
 mod client;
+mod clients;
 mod error;
 
-pub use self::{client::Client, error::Error};
-
-#[derive(Debug, Clone)]
-pub struct Clients {
-    clients: Arc<RwLock<HashMap<String, Client>>>,
-}
-
-impl Default for Clients {
-    fn default() -> Self {
-        Self {
-            clients: Arc::new(RwLock::new(HashMap::new())),
-        }
-    }
-}
-
-impl Clients {
-    pub async fn recv(&self) -> Result<Option<(Response, String)>, Error> {
-        let mut subscriptions = self
-            .clients
-            .read()
-            .iter()
-            .map(|(_, c)| c.subscribe())
-            .collect::<Vec<Option<_>>>()
-            .into_iter()
-            .filter_map(|x| x)
-            .collect::<Vec<_>>();
-
-        if subscriptions.is_empty() {
-            return Err(Error::Disconnected);
-        }
-
-        let mut futures = FuturesUnordered::new();
-
-        for sub in subscriptions.iter_mut() {
-            futures.push(sub.recv());
-        }
-
-        if let Some(Ok(response)) = futures.next().await {
-            Ok(Some(response))
-        } else {
-            Ok(None)
-        }
-    }
-
-    pub fn try_recv(&self) -> Result<Option<(Response, String)>, Error> {
-        let mut subscriptions = self
-            .clients
-            .read()
-            .iter()
-            .map(|(_, c)| c.subscribe())
-            .collect::<Vec<Option<_>>>()
-            .into_iter()
-            .filter_map(|x| x)
-            .collect::<Vec<_>>();
-
-        if subscriptions.is_empty() {
-            return Err(Error::Disconnected);
-        }
-
-        for sub in subscriptions.iter_mut() {
-            if let Ok(msg) = sub.try_recv() {
-                return Ok(Some(msg));
-            }
-        }
-        Ok(None)
-    }
-
-    pub fn check_active_connections(&self) -> usize {
-        let mut clients = self.clients.write();
-        let mut to_remove = vec![];
-        for (url, client) in clients.iter() {
-            if !client.is_running() {
-                to_remove.push(url.to_owned());
-            }
-        }
-
-        for url in to_remove.iter() {
-            clients.remove(url);
-        }
-
-        clients.len()
-    }
-
-    pub async fn send(&self, request: Request) {
-        let senders = self
-            .clients
-            .read()
-            .iter()
-            .map(|(_, c)| c.send_to_socket.clone())
-            .collect::<Vec<mpsc::Sender<_>>>();
-
-        for sender in senders.iter() {
-            let _ = sender.send(request.clone()).await;
-        }
-    }
-
-    pub async fn connect_to<F>(&self, url: &str, on_connection: Option<F>) -> Result<bool, Error>
-    where
-        F: (Fn(mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
-            + Send
-            + Sync
-            + 'static,
-    {
-        let mut clients = self.clients.write();
-        Ok(if clients.get(url).is_some() {
-            false
-        } else {
-            log::warn!("Connecting to {}", url);
-            clients.insert(url.to_owned(), Client::new(url, on_connection)?);
-            true
-        })
-    }
-}
+pub use self::{client::Client, clients::Clients, error::Error};