Преглед на файлове

Reduce client complexity

Use a single mpsc channel to exchange message from relayer to relayers
Cesar Rodas преди 1 година
родител
ревизия
64a7c35325
променени са 3 файла, в които са добавени 36 реда и са изтрити 98 реда
  1. 9 20
      crates/client/src/relayer.rs
  2. 22 76
      crates/client/src/relayers.rs
  3. 5 2
      src/bin/dump.rs

+ 9 - 20
crates/client/src/relayer.rs

@@ -5,7 +5,7 @@ use nostr_rs_types::{Request, Response};
 use parking_lot::RwLock;
 use std::{pin::Pin, sync::Arc};
 use tokio::{
-    sync::{broadcast, mpsc, oneshot},
+    sync::{mpsc, oneshot},
     time::{sleep, timeout, Duration},
 };
 use tokio_tungstenite::{connect_async, tungstenite::Message};
@@ -18,17 +18,16 @@ pub struct Relayer {
     pub url: String,
     /// Sender to the relayer. This can be used to send a Requests to this relayer
     pub send_to_socket: mpsc::Sender<Request>,
-    /// Internal receiver. This is used to receive messages from the relayer
-    recv_from_socket: broadcast::Receiver<(Response, String)>,
     /// This sender signals to background connection to stop
     stop_service: oneshot::Sender<()>,
 }
 
-const NO_ACTIVITY_TIMEOUT_SECS: u64 = 30;
+const NO_ACTIVITY_TIMEOUT_SECS: u64 = 120;
 
 impl Relayer {
     /// Creates a new relayer
     pub fn new<F>(
+        publish_to_listeners: mpsc::Sender<(Response, String)>,
         sent_messages: Arc<RwLock<Vec<Request>>>,
         connection_retries: u16,
         url: &str,
@@ -41,7 +40,8 @@ impl Relayer {
             + 'static,
     {
         let (send_to_socket, receiver) = mpsc::channel(100_000);
-        let (recv_from_socket, stop_service) = Self::spawn_background_client(
+        let stop_service = Self::spawn_background_client(
+            publish_to_listeners,
             sent_messages,
             send_to_socket.clone(),
             receiver,
@@ -54,25 +54,24 @@ impl Relayer {
             url: url.to_owned(),
             send_to_socket,
             stop_service,
-            recv_from_socket,
         })
     }
 
     fn spawn_background_client<F>(
+        publish_to_listeners: mpsc::Sender<(Response, String)>,
         sent_messages: Arc<RwLock<Vec<Request>>>,
         send_to_socket: mpsc::Sender<Request>,
         mut receiver: mpsc::Receiver<Request>,
         url_str: &str,
         connection_retries: u16,
         on_connection: Option<F>,
-    ) -> Result<(broadcast::Receiver<(Response, String)>, oneshot::Sender<()>), Error>
+    ) -> Result<oneshot::Sender<()>, Error>
     where
         F: (Fn(Vec<Request>, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
             + Send
             + Sync
             + 'static,
     {
-        let (publish_to_listener, recv_from_socket) = broadcast::channel(10_000);
         let (stop_service, mut stopper_recv) = oneshot::channel();
 
         let url = url_str.to_owned();
@@ -140,7 +139,7 @@ impl Relayer {
                             let msg: Result<Response, _> = serde_json::from_str(&msg);
 
                             if let Ok(msg) = msg {
-                                if let Err(error) = publish_to_listener.send((msg, url.to_owned())) {
+                                if let Err(error) = publish_to_listeners.send((msg, url.to_owned())).await {
                                     log::error!("{}: Reconnecting client because of {}", url, error);
                                     break;
                                 }
@@ -159,7 +158,7 @@ impl Relayer {
             log::warn!("{}: Disconnected", url);
         });
 
-        Ok((recv_from_socket, stop_service))
+        Ok(stop_service)
     }
 
     /// Checks if the relayer background connection is running
@@ -167,16 +166,6 @@ impl Relayer {
         !self.stop_service.is_closed()
     }
 
-    /// Subscribe to responses sent through this relayer. If the relayer is not
-    /// running, this will return None
-    pub fn subscribe(&self) -> Option<broadcast::Receiver<(Response, String)>> {
-        if self.stop_service.is_closed() {
-            None
-        } else {
-            Some(self.recv_from_socket.resubscribe())
-        }
-    }
-
     /// Sends a requests to this relayer
     pub async fn send(&self, request: Request) -> Result<(), Error> {
         Ok(self.send_to_socket.send(request).await?)

+ 22 - 76
crates/client/src/relayers.rs

@@ -3,7 +3,6 @@
 //! This is the main entry point to the client library.
 use crate::{Error, Relayer};
 use futures::Future;
-use futures_util::{stream::FuturesUnordered, StreamExt};
 use nostr_rs_types::{Request, Response};
 use parking_lot::RwLock;
 use std::{collections::HashMap, pin::Pin, sync::Arc};
@@ -14,107 +13,55 @@ use tokio::sync::mpsc;
 /// This is a set of outgoing connections to relayers. This struct can connect
 /// async to N relayers offering a simple API to talk to all of them at the same
 /// time, and to receive messages
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 pub struct Relayers {
-    clients: Arc<RwLock<HashMap<String, Relayer>>>,
+    clients: HashMap<String, Relayer>,
     messages: Arc<RwLock<Vec<Request>>>,
+    receiver: mpsc::Receiver<(Response, String)>,
+    sender: mpsc::Sender<(Response, String)>,
 }
 
 impl Default for Relayers {
     fn default() -> Self {
+        let (sender, receiver) = mpsc::channel(100_000);
         Self {
-            clients: Arc::new(RwLock::new(HashMap::new())),
+            clients: HashMap::new(),
             messages: Arc::new(RwLock::new(vec![])),
+            sender,
+            receiver,
         }
     }
 }
 
 impl Relayers {
     /// Receives a Response from any of the connected relayers
-    pub async fn recv(&self) -> Result<Option<(Response, String)>, Error> {
-        let mut subscriptions = self
-            .clients
-            .read()
-            .iter()
-            .map(|(_, c)| c.subscribe())
-            .collect::<Vec<Option<_>>>()
-            .into_iter()
-            .filter_map(|x| x)
-            .collect::<Vec<_>>();
-
-        if subscriptions.is_empty() {
-            return Err(Error::Disconnected);
-        }
-
-        let mut futures = FuturesUnordered::new();
-
-        for sub in subscriptions.iter_mut() {
-            futures.push(sub.recv());
-        }
-
-        if let Some(Ok(response)) = futures.next().await {
-            Ok(Some(response))
-        } else {
-            Ok(None)
-        }
-    }
-
-    /// Attempts to receive a Response from any of the relayers but won't wait if none
-    /// is available. It will return None if None is available
-    pub fn try_recv(&self) -> Result<Option<(Response, String)>, Error> {
-        let mut subscriptions = self
-            .clients
-            .read()
-            .iter()
-            .map(|(_, c)| c.subscribe())
-            .collect::<Vec<Option<_>>>()
-            .into_iter()
-            .filter_map(|x| x)
-            .collect::<Vec<_>>();
-
-        if subscriptions.is_empty() {
-            return Err(Error::Disconnected);
-        }
-
-        for sub in subscriptions.iter_mut() {
-            if let Ok(msg) = sub.try_recv() {
-                return Ok(Some(msg));
-            }
-        }
-        Ok(None)
+    pub async fn recv(&mut self) -> Result<Option<(Response, String)>, Error> {
+        Ok(self.receiver.recv().await)
     }
 
     /// Returns the number of active connections. If a connection to a relayer
     /// is not active it will be removed from the list
-    pub fn check_active_connections(&self) -> usize {
-        let mut clients = self.clients.write();
+    pub fn check_active_connections(&mut self) -> usize {
         let mut to_remove = vec![];
-        for (url, client) in clients.iter() {
+        for (url, client) in self.clients.iter() {
             if !client.is_running() {
                 to_remove.push(url.to_owned());
             }
         }
 
         for url in to_remove.iter() {
-            clients.remove(url);
+            self.clients.remove(url);
         }
 
-        clients.len()
+        self.clients.len()
     }
 
     /// Sends a request to all the connected relayers
     pub async fn send(&self, request: Request) {
-        let senders = self
-            .clients
-            .read()
-            .iter()
-            .map(|(_, c)| c.send_to_socket.clone())
-            .collect::<Vec<mpsc::Sender<_>>>();
-
         // Add a copy of the request for incoming connections.
         self.messages.write().push(request.clone());
 
-        for sender in senders.iter() {
+        for (_, sender) in self.clients.iter() {
             let _ = sender.send(request.clone()).await;
         }
     }
@@ -130,7 +77,7 @@ impl Relayers {
     /// This function will open a connection at most once, if a connection
     /// already exists false will be returned
     pub async fn connect_to<F>(
-        &self,
+        &mut self,
         url: &str,
         connection_retries: u16,
         on_connection: Option<F>,
@@ -141,14 +88,14 @@ impl Relayers {
             + Sync
             + 'static,
     {
-        let mut clients = self.clients.write();
-        Ok(if clients.get(url).is_some() {
+        Ok(if self.clients.get(url).is_some() {
             false
         } else {
             log::warn!("Connecting to {}", url);
-            clients.insert(
+            self.clients.insert(
                 url.to_owned(),
                 Relayer::new(
+                    self.sender.clone(),
                     self.messages.clone(),
                     connection_retries,
                     url,
@@ -160,12 +107,11 @@ impl Relayers {
     }
 
     /// Disconnects from all relayers
-    pub async fn stop(&self) {
-        let mut clients = self.clients.write();
-        let keys = clients.keys().cloned().collect::<Vec<_>>();
+    pub async fn stop(&mut self) {
+        let keys = self.clients.keys().cloned().collect::<Vec<_>>();
 
         for key in keys.iter() {
-            if let Some(client) = clients.remove(key) {
+            if let Some(client) = self.clients.remove(key) {
                 client.stop().await;
             }
         }

+ 5 - 2
src/bin/dump.rs

@@ -29,12 +29,15 @@ fn on_connection(
 #[tokio::main]
 async fn main() {
     env_logger::init();
-    let clients = Relayers::default();
+    let mut clients = Relayers::default();
     clients.send(Subscribe::default().into()).await;
 
     let _ = clients
         .connect_to("wss://relay.damus.io/", 30, Some(on_connection))
         .await;
+    let _ = clients
+        .connect_to("wss://nostramsterdam.vpx.moe/", 30, Some(on_connection))
+        .await;
     let db = RocksDb::new("./db").expect("db");
 
     loop {
@@ -45,7 +48,7 @@ async fn main() {
                     let event = x.event;
 
                     if db.store(&event).expect("valid") {
-                        panic!("\tStored");
+                        println!("\tStored");
                     } else {
                         println!("\tSkip");
                     }