Explorar el Código

Better signature

Cesar Rodas hace 9 meses
padre
commit
daf45dab88
Se han modificado 5 ficheros con 156 adiciones y 31 borrados
  1. 1 1
      crates/client/src/lib.rs
  2. 136 0
      crates/client/src/pool.rs
  3. 2 1
      crates/client/src/relayer.rs
  4. 10 15
      src/bin/dump.rs
  5. 7 14
      src/main.rs

+ 1 - 1
crates/client/src/lib.rs

@@ -4,7 +4,7 @@
 //! events or perform any operation that can be done in the protocol.
 //!
 //! The client will let you connect to a pool of relayers oferring a simple
-//! protocol to talk to all of them at the same time.
+//! and unified interface to interact with a pool of relayers at the same time.
 //!
 //! It will also have reconnection logic built-in internally.
 #![deny(missing_docs, warnings)]

+ 136 - 0
crates/client/src/pool.rs

@@ -0,0 +1,136 @@
+//! Relayers
+//!
+//! This is the main entry point to the client library.
+use crate::{Error, Relayer};
+use futures::Future;
+use nostr_rs_types::{Request, Response};
+use std::{collections::HashMap, pin::Pin};
+use tokio::sync::mpsc;
+
+/// Clients
+///
+/// This is a set of outgoing connections to relayers. This struct can connect
+/// async to N relayers offering a simple API to talk to all of them at the same
+/// time, and to receive messages
+#[derive(Debug)]
+pub struct Pool {
+    clients: HashMap<String, Relayer>,
+    sender: mpsc::Sender<(Event, String)>,
+    receiver: mpsc::Receiver<(Event, String)>,
+}
+
+impl Default for Pool {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Client event
+///
+/// This type wraps a response a disconnected event. The disconnected will be
+/// the last event to be sent.
+#[derive(Debug, Clone)]
+pub enum Event {
+    /// A response
+    Response(Box<Response>),
+    /// A disconnection event
+    Disconnected,
+}
+
+const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 10_000;
+
+impl Pool {
+    /// Creates a new Relayers object
+    pub fn new() -> Self {
+        let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);
+        Self {
+            clients: HashMap::new(),
+            receiver,
+            sender,
+        }
+    }
+
+    /// Tries to receive a message from any of the connected relayers
+    pub fn try_recv(&mut self) -> Option<(Event, String)> {
+        self.receiver.try_recv().ok()
+    }
+
+    /// Receives a message from any of the connected relayers
+    pub async fn recv(&mut self) -> Option<(Event, String)> {
+        self.receiver.recv().await
+    }
+
+    /// Sends a request to all the connected relayers
+    pub async fn send(&self, request: Request) {
+        for (_, sender) in self.clients.iter() {
+            let _ = sender.send(request.clone()).await;
+        }
+    }
+
+    /// Returns a vector to all outgoing connections
+    pub fn get_connections(&self) -> Vec<&Relayer> {
+        self.clients.values().collect::<Vec<&Relayer>>()
+    }
+
+    /// Returns the number of active connections. If a connection to a relayer
+    /// is not active it will be removed from the list
+    pub fn check_active_connections(&mut self) -> usize {
+        let mut to_remove = vec![];
+        for (url, client) in self.clients.iter() {
+            if !client.is_running() {
+                to_remove.push(url.to_owned());
+            }
+        }
+
+        for url in to_remove.iter() {
+            self.clients.remove(url);
+        }
+
+        self.clients.len()
+    }
+
+    /// Creates a connection to a new relayer. If the connection is successful a
+    /// Callback will be called, with a list of previously sent requests, and a
+    /// Sender to send new requests to this relayer alone.
+    ///
+    /// The same callback will be called for every reconnection to the same
+    /// relayer, also the callback will be called, giving the chance to re-send
+    /// sent requests to the new connections
+    ///
+    /// This function will open a connection at most once, if a connection
+    /// already exists false will be returned
+    pub fn connect_to<F>(
+        mut self,
+        url: &str,
+        max_connections_attempts: u16,
+        on_connection: Option<F>,
+    ) -> Result<Self, Error>
+    where
+        F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
+            + Send
+            + Sync
+            + 'static,
+    {
+        if self.clients.get(url).is_none() {
+            log::warn!("Connecting to {}", url);
+            self.clients.insert(
+                url.to_owned(),
+                Relayer::new(
+                    self.sender.clone(),
+                    max_connections_attempts,
+                    url,
+                    on_connection,
+                )?,
+            );
+        }
+
+        Ok(self)
+    }
+
+    /// Disconnects from all relayers
+    pub async fn stop(&mut self) {
+        for (_, client) in self.clients.drain() {
+            client.disconnect().await;
+        }
+    }
+}

+ 2 - 1
crates/client/src/relayer.rs

@@ -21,7 +21,8 @@ use url::Url;
 pub struct Relayer {
     /// URL of the relayer
     pub url: String,
-    /// Sender to the relayer. This can be used to send a Requests to this relayer
+    /// Sender to the relayer. This can be used to send a Requests to this
+    /// relayer
     pub send_to_socket: mpsc::Sender<Request>,
     is_connected: Arc<AtomicBool>,
     /// This sender signals to background connection to stop

+ 10 - 15
src/bin/dump.rs

@@ -27,27 +27,21 @@ fn on_connection(
 #[tokio::main]
 async fn main() {
     env_logger::init();
-    let mut clients = Pool::new();
-    clients.send(Subscribe::default().into()).await;
-
-    let _ = clients
+    let mut clients = Pool::new()
         .connect_to("wss://relay.damus.io/", u16::MAX, Some(on_connection))
-        .await;
-    let _ = clients
+        .expect("valid url")
         .connect_to("wss://brb.io", u16::MAX, Some(on_connection))
-        .await;
-    let _ = clients
+        .expect("valid url")
         .connect_to("wss://nos.lol", u16::MAX, Some(on_connection))
-        .await;
-    let _ = clients
+        .expect("valid url")
         .connect_to("wss://relay.current.fyi", u16::MAX, Some(on_connection))
-        .await;
-    let _ = clients
+        .expect("valid url")
         .connect_to("wss://eden.nostr.land", u16::MAX, Some(on_connection))
-        .await;
-    let _ = clients
+        .expect("valid url")
         .connect_to("wss://relay.snort.social", u16::MAX, Some(on_connection))
-        .await;
+        .expect("valid url");
+
+    clients.send(Subscribe::default().into()).await;
     let db = RocksDb::new("./db").expect("db");
 
     loop {
@@ -62,6 +56,7 @@ async fn main() {
                     Response::EndOfStoredEvents(_) => {}
                     msg => {
                         println!("{} {:?}", relayed_by, msg);
+                        clients.stop().await;
                     }
                 },
                 msg => {

+ 7 - 14
src/main.rs

@@ -51,26 +51,19 @@ async fn main() {
     env_logger::init();
     let db = RocksDb::new("./relayer-db").expect("db");
     let (relayer, mut server_receiver) = nostr_rs_relayer::Relayer::new(Some(db));
-    let mut clients = nostr_rs_client::Pool::new();
-    let _ = clients
+    let mut clients = nostr_rs_client::Pool::new()
         .connect_to("wss://relay.damus.io/", u16::MAX, Some(on_connection))
-        .await
-        .expect("valid connection");
-    let _ = clients
+        .expect("valid url")
         .connect_to("wss://brb.io", u16::MAX, Some(on_connection))
-        .await;
-    let _ = clients
+        .expect("valid url")
         .connect_to("wss://nos.lol", u16::MAX, Some(on_connection))
-        .await;
-    let _ = clients
+        .expect("valid url")
         .connect_to("wss://relay.current.fyi", u16::MAX, Some(on_connection))
-        .await;
-    let _ = clients
+        .expect("valid url")
         .connect_to("wss://eden.nostr.land", u16::MAX, Some(on_connection))
-        .await;
-    let _ = clients
+        .expect("valid url")
         .connect_to("wss://relay.snort.social", u16::MAX, Some(on_connection))
-        .await;
+        .expect("valid url");
 
     let addr = "127.0.0.1:3000";
     let listener = TcpListener::bind(&addr).await.unwrap();