Cesar Rodas пре 1 година
родитељ
комит
d9497c33ad
6 измењених фајлова са 72 додато и 47 уклоњено
  1. 5 1
      crates/client/src/lib.rs
  2. 9 13
      crates/client/src/relayer.rs
  3. 16 11
      crates/client/src/relayers.rs
  4. 4 3
      crates/relayer/src/relayer.rs
  5. 6 10
      src/bin/dump.rs
  6. 32 9
      src/main.rs

+ 5 - 1
crates/client/src/lib.rs

@@ -12,4 +12,8 @@ mod error;
 mod relayer;
 mod relayers;
 
-pub use self::{error::Error, relayer::Relayer, relayers::Relayers};
+pub use self::{
+    error::Error,
+    relayer::Relayer,
+    relayers::{Event, Relayers},
+};

+ 9 - 13
crates/client/src/relayer.rs

@@ -1,8 +1,7 @@
-use crate::Error;
+use crate::{relayers::Event, Error};
 use futures::Future;
 use futures_util::{SinkExt, StreamExt};
 use nostr_rs_types::{Request, Response};
-use parking_lot::RwLock;
 use std::{
     pin::Pin,
     sync::{
@@ -34,14 +33,13 @@ const NO_ACTIVITY_TIMEOUT_SECS: u64 = 120;
 impl Relayer {
     /// Creates a new relayer
     pub fn new<F>(
-        broadcast_to_listeners: mpsc::Sender<(Response, String)>,
-        sent_messages: Arc<RwLock<Vec<Request>>>,
+        broadcast_to_listeners: mpsc::Sender<(Event, String)>,
         max_connections_attempts: u16,
         url: &str,
         on_connection: Option<F>,
     ) -> Result<Self, Error>
     where
-        F: (Fn(Vec<Request>, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
+        F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
             + Send
             + Sync
             + 'static,
@@ -50,7 +48,6 @@ impl Relayer {
         let is_connected = Arc::new(AtomicBool::new(false));
         let stop_service = Self::spawn_background_client(
             broadcast_to_listeners,
-            sent_messages,
             send_to_socket.clone(),
             receiver,
             url,
@@ -68,8 +65,7 @@ impl Relayer {
     }
 
     fn spawn_background_client<F>(
-        broadcast_to_listeners: mpsc::Sender<(Response, String)>,
-        sent_messages: Arc<RwLock<Vec<Request>>>,
+        broadcast_to_listeners: mpsc::Sender<(Event, String)>,
         send_to_socket: mpsc::Sender<Request>,
         mut receiver: mpsc::Receiver<Request>,
         url_str: &str,
@@ -78,7 +74,7 @@ impl Relayer {
         on_connection: Option<F>,
     ) -> Result<oneshot::Sender<()>, Error>
     where
-        F: (Fn(Vec<Request>, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
+        F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
             + Send
             + Sync
             + 'static,
@@ -107,8 +103,7 @@ impl Relayer {
                 log::info!("Connected to {}", url);
 
                 if let Some(on_connection) = &on_connection {
-                    let sent_messages = sent_messages.read().to_owned();
-                    on_connection(sent_messages, send_to_socket.clone()).await;
+                    on_connection(&url, send_to_socket.clone()).await;
                 }
 
                 loop {
@@ -158,7 +153,7 @@ impl Relayer {
                             let msg: Result<Response, _> = serde_json::from_str(&msg);
 
                             if let Ok(msg) = msg {
-                                if let Err(error) = broadcast_to_listeners.try_send((msg, url.to_owned())) {
+                                if let Err(error) = broadcast_to_listeners.try_send((Event::Response(msg), url.to_owned())) {
                                     log::error!("{}: Reconnecting client because of {}", url, error);
                                     break;
                                 }
@@ -172,11 +167,12 @@ impl Relayer {
                 }
 
                 is_connected.store(false, Relaxed);
-
                 // Throttle down to not spam the server with reconnections
                 sleep(Duration::from_millis(500)).await;
             }
 
+            let _ = broadcast_to_listeners.try_send((Event::Disconnected, "".to_owned()));
+
             log::warn!("{}: Disconnected", url);
         });
 

+ 16 - 11
crates/client/src/relayers.rs

@@ -4,8 +4,7 @@
 use crate::{Error, Relayer};
 use futures::Future;
 use nostr_rs_types::{Request, Response};
-use parking_lot::RwLock;
-use std::{collections::HashMap, pin::Pin, sync::Arc};
+use std::{collections::HashMap, pin::Pin};
 use tokio::sync::mpsc;
 
 /// Clients
@@ -16,18 +15,28 @@ use tokio::sync::mpsc;
 #[derive(Debug)]
 pub struct Relayers {
     clients: HashMap<String, Relayer>,
-    messages: Arc<RwLock<Vec<Request>>>,
-    sender: mpsc::Sender<(Response, String)>,
+    sender: mpsc::Sender<(Event, String)>,
+}
+
+/// Client event
+///
+/// This type wraps a response a disconnected event. The disconnected will be
+/// the last event to be sent.
+#[derive(Debug, Clone)]
+pub enum Event {
+    /// A response
+    Response(Response),
+    /// A disconnection event
+    Disconnected,
 }
 
 impl Relayers {
     /// Creates a new Relayers object
-    pub fn new() -> (Self, mpsc::Receiver<(Response, String)>) {
+    pub fn new() -> (Self, mpsc::Receiver<(Event, String)>) {
         let (sender, receiver) = mpsc::channel(100_000);
         (
             Self {
                 clients: HashMap::new(),
-                messages: Arc::new(RwLock::new(vec![])),
                 sender,
             },
             receiver,
@@ -61,9 +70,6 @@ impl Relayers {
         for (_, sender) in self.clients.iter() {
             let _ = sender.send(request.clone()).await;
         }
-
-        // Add a copy of the request for incoming connections.
-        self.messages.write().push(request);
     }
 
     /// Creates a connection to a new relayer. If the connection is successful a
@@ -83,7 +89,7 @@ impl Relayers {
         on_connection: Option<F>,
     ) -> Result<bool, Error>
     where
-        F: (Fn(Vec<Request>, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
+        F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
             + Send
             + Sync
             + 'static,
@@ -96,7 +102,6 @@ impl Relayers {
                 url.to_owned(),
                 Relayer::new(
                     self.sender.clone(),
-                    self.messages.clone(),
                     max_connections_attempts,
                     url,
                     on_connection,

+ 4 - 3
crates/relayer/src/relayer.rs

@@ -62,12 +62,13 @@ impl Relayer {
         &self,
         disconnection_notify: Option<mpsc::Sender<u128>>,
         stream: TcpStream,
-    ) -> Result<(), Error> {
+    ) -> Result<u128, Error> {
         let client = Connection::new(self.sender.clone(), disconnection_notify, stream).await?;
+        let id = client.conn_id;
         let mut clients = self.clients.write();
-        clients.insert(client.conn_id, client);
+        clients.insert(id, client);
 
-        Ok(())
+        Ok(id)
     }
 
     fn recv_request_from_client(

+ 6 - 10
src/bin/dump.rs

@@ -1,5 +1,5 @@
 use futures::Future;
-use nostr_rs_client::{Error as ClientError, Relayers};
+use nostr_rs_client::{Error as ClientError, Event, Relayers};
 use nostr_rs_storage::RocksDb;
 use nostr_rs_types::{client::Subscribe, Request, Response};
 use std::pin::Pin;
@@ -15,15 +15,11 @@ pub enum Error {
 }
 
 fn on_connection(
-    sent_messages: Vec<Request>,
+    host: &str,
     socket: mpsc::Sender<Request>,
 ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
-    println!("Reconnecting with {:?}", sent_messages);
-    Box::pin(async move {
-        for m in sent_messages {
-            let _ = socket.send(m).await;
-        }
-    })
+    println!("Reconnecting to {}", host);
+    Box::pin(async move {})
 }
 
 #[tokio::main]
@@ -66,11 +62,11 @@ async fn main() {
     loop {
         if let Some((msg, _relayed_by)) = receiver.recv().await {
             match msg {
-                Response::Event(x) => {
+                Event::Response(Response::Event(x)) => {
                     let event = x.event;
                     db.store(&event).expect("valid");
                 }
-                Response::EndOfStoredEvents(_) => {}
+                Event::Response(Response::EndOfStoredEvents(_)) => {}
                 msg => {
                     panic!("{:?}", msg);
                 }

+ 32 - 9
src/main.rs

@@ -1,7 +1,8 @@
 use futures::Future;
+use nostr_rs_client::Event;
 use nostr_rs_storage::RocksDb;
 use nostr_rs_types::{Request, Response};
-use std::{pin::Pin, sync::Arc};
+use std::{collections::HashMap, pin::Pin, sync::Arc};
 use tokio::{
     net::TcpListener,
     sync::mpsc,
@@ -9,12 +10,36 @@ use tokio::{
 };
 
 fn on_connection(
-    _sent_messages: Vec<Request>,
+    host: &str,
     _socket: mpsc::Sender<Request>,
 ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
     Box::pin(async move {})
 }
 
+pub struct Connection {
+    connection_to_relayers: Arc<nostr_rs_client::Relayers>,
+    receive_from_relayer: mpsc::Receiver<Response>,
+}
+
+fn create_listener(relayer: Arc<nostr_rs_relayer::Relayer>) -> nostr_rs_client::Relayers {
+    let (clients, mut client_receiver) = nostr_rs_client::Relayers::new();
+
+    tokio::spawn(async move {
+        loop {
+            let (event, _url) = client_receiver.recv().await.unwrap();
+            match &event {
+                Event::Response(Response::Event(event)) => {
+                    let _ = relayer.store_and_broadcast(&event.event);
+                }
+                Event::Disconnected => break,
+                _ => {}
+            }
+        }
+    });
+
+    clients
+}
+
 #[tokio::main]
 async fn main() {
     env_logger::init();
@@ -66,13 +91,9 @@ async fn main() {
 
     tokio::spawn(async move {
         loop {
-            let (event, _) = client_receiver.recv().await.unwrap();
-            println!(
-                "received event: {}",
-                serde_json::to_string(&event).expect("valid json")
-            );
+            let (event, url) = client_receiver.recv().await.unwrap();
             match &event {
-                Response::Event(event) => {
+                Event::Response(Response::Event(event)) => {
                     let _ = relayer_for_client.store_and_broadcast(&event.event);
                 }
                 _msg => {
@@ -96,12 +117,14 @@ async fn main() {
         }
     });
 
+    let mut clients = HashMap::<u128, nostr_rs_client::Relayers>::new();
+
     loop {
         let (stream, _) = listener.accept().await.unwrap();
         let addr = stream.peer_addr().unwrap();
 
         println!("Client {} connected", addr);
 
-        let _ = relayer.add_connection(stream).await;
+        let _ = relayer.add_connection(None, stream).await;
     }
 }