瀏覽代碼

Improvement to the client

Cesar Rodas 1 年之前
父節點
當前提交
8ee2a408ef
共有 4 個文件被更改,包括 70 次插入35 次删除
  1. 19 13
      crates/client/src/relayer.rs
  2. 15 15
      crates/client/src/relayers.rs
  3. 2 0
      crates/types/src/relayer/event.rs
  4. 34 7
      src/bin/dump.rs

+ 19 - 13
crates/client/src/relayer.rs

@@ -29,7 +29,7 @@ impl Relayer {
     pub fn new<F>(
         broadcast_to_listeners: mpsc::Sender<(Response, String)>,
         sent_messages: Arc<RwLock<Vec<Request>>>,
-        connection_retries: u16,
+        max_connections_attempts: u16,
         url: &str,
         on_connection: Option<F>,
     ) -> Result<Self, Error>
@@ -46,7 +46,7 @@ impl Relayer {
             send_to_socket.clone(),
             receiver,
             url,
-            connection_retries,
+            max_connections_attempts,
             on_connection,
         )?;
 
@@ -63,7 +63,7 @@ impl Relayer {
         send_to_socket: mpsc::Sender<Request>,
         mut receiver: mpsc::Receiver<Request>,
         url_str: &str,
-        connection_retries: u16,
+        max_connections_attempts: u16,
         on_connection: Option<F>,
     ) -> Result<oneshot::Sender<()>, Error>
     where
@@ -79,11 +79,11 @@ impl Relayer {
 
         tokio::spawn(async move {
             let mut reconnect = true;
-            let mut retries = 0;
+            let mut connection_attempts = 0;
 
-            while reconnect && retries <= connection_retries {
-                log::warn!("{}: Connect attempt {}", url, retries);
-                retries += 1;
+            while reconnect && connection_attempts <= max_connections_attempts {
+                log::warn!("{}: Connect attempt {}", url, connection_attempts);
+                connection_attempts += 1;
                 let mut socket = if let Ok(x) = connect_async(url_parsed.clone()).await {
                     x.0
                 } else {
@@ -118,10 +118,16 @@ impl Relayer {
                         }
                         msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
                             let msg = if let Ok(Some(Ok(msg))) = msg {
-                                    if let Ok(msg) = msg.into_text() {
-                                        msg
-                                    } else {
-                                        continue;
+                                    match msg {
+                                        Message::Text(text) => text,
+                                        Message::Ping(msg) => {
+                                            if let Err(x) = socket.send(Message::Pong(msg.clone())).await {
+                                                log::error!("{} : Reconnecting due error at sending Pong({:?}): {}", url, msg, x);
+                                                break;
+                                            }
+                                            continue;
+                                        },
+                                        msg => panic!("Unexpected {:?}", msg),
                                     }
                                 } else {
                                     log::error!("{} Reconnecting client due of empty recv: {:?}", url, msg);
@@ -133,13 +139,13 @@ impl Relayer {
                             }
 
                             log::info!("New message: {}", msg);
-                            retries = 0;
+                            connection_attempts = 0;
 
 
                             let msg: Result<Response, _> = serde_json::from_str(&msg);
 
                             if let Ok(msg) = msg {
-                                if let Err(error) = broadcast_to_listeners.send((msg, url.to_owned())).await {
+                                if let Err(error) = broadcast_to_listeners.try_send((msg, url.to_owned())) {
                                     log::error!("{}: Reconnecting client because of {}", url, error);
                                     break;
                                 }

+ 15 - 15
crates/client/src/relayers.rs

@@ -17,26 +17,26 @@ use tokio::sync::mpsc;
 pub struct Relayers {
     clients: HashMap<String, Relayer>,
     messages: Arc<RwLock<Vec<Request>>>,
-    receiver: mpsc::Receiver<(Response, String)>,
     sender: mpsc::Sender<(Response, String)>,
 }
 
-impl Default for Relayers {
-    fn default() -> Self {
+impl Relayers {
+    /// Creates a new Relayers object
+    pub fn new() -> (Self, mpsc::Receiver<(Response, String)>) {
         let (sender, receiver) = mpsc::channel(100_000);
-        Self {
-            clients: HashMap::new(),
-            messages: Arc::new(RwLock::new(vec![])),
-            sender,
+        (
+            Self {
+                clients: HashMap::new(),
+                messages: Arc::new(RwLock::new(vec![])),
+                sender,
+            },
             receiver,
-        }
+        )
     }
-}
 
-impl Relayers {
-    /// Receives a Response from any of the connected relayers
-    pub async fn recv(&mut self) -> Result<Option<(Response, String)>, Error> {
-        Ok(self.receiver.recv().await)
+    /// Returns a vector to all outgoing connections
+    pub fn get_connections<'a>(&'a self) -> Vec<&'a Relayer> {
+        self.clients.values().collect::<Vec<&Relayer>>()
     }
 
     /// Returns the number of active connections. If a connection to a relayer
@@ -79,7 +79,7 @@ impl Relayers {
     pub async fn connect_to<F>(
         &mut self,
         url: &str,
-        connection_retries: u16,
+        max_connections_attempts: u16,
         on_connection: Option<F>,
     ) -> Result<bool, Error>
     where
@@ -97,7 +97,7 @@ impl Relayers {
                 Relayer::new(
                     self.sender.clone(),
                     self.messages.clone(),
-                    connection_retries,
+                    max_connections_attempts,
                     url,
                     on_connection,
                 )?,

+ 2 - 0
crates/types/src/relayer/event.rs

@@ -44,6 +44,8 @@ impl SerializeDeserialize for Event {
             serde_json::from_value(args.pop_front().ok_or("Invalid array length")?)
                 .map_err(|e| e.to_string())?;
 
+        event.is_valid().map_err(|e| e.to_string())?;
+
         Ok(Self {
             subscription_id,
             event,

+ 34 - 7
src/bin/dump.rs

@@ -18,7 +18,7 @@ fn on_connection(
     sent_messages: Vec<Request>,
     socket: mpsc::Sender<Request>,
 ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
-    println!("REconnecting with {:?}", sent_messages);
+    println!("Reconnecting with {:?}", sent_messages);
     Box::pin(async move {
         for m in sent_messages {
             let _ = socket.send(m).await;
@@ -29,26 +29,53 @@ fn on_connection(
 #[tokio::main]
 async fn main() {
     env_logger::init();
-    let mut clients = Relayers::default();
+    let (mut clients, mut receiver) = Relayers::new();
     clients.send(Subscribe::default().into()).await;
 
     let _ = clients
-        .connect_to("wss://relay.damus.io/", 30, Some(on_connection))
+        .connect_to("wss://relay.damus.io/", u16::MAX, Some(on_connection))
         .await;
     let _ = clients
-        .connect_to("wss://nostramsterdam.vpx.moe/", 30, Some(on_connection))
+        .connect_to(
+            "wss://nostramsterdam.vpx.moe/",
+            u16::MAX,
+            Some(on_connection),
+        )
         .await;
     let db = RocksDb::new("./db").expect("db");
 
+    let mut debug = false;
+    let mut done: u128 = 0;
+
+    tokio::spawn(async move {
+        loop {
+            clients.get_connections().iter().for_each(|relayer| {
+                if relayer.is_running() {
+                    println!("Connected to {}", relayer.url);
+                }
+            });
+            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+        }
+    });
+
     loop {
-        if let Some((msg, _relayed_by)) = clients.recv().await.expect("valid connection") {
+        if let Some((msg, relayed_by)) = receiver.recv().await {
             match msg {
                 Response::Event(x) => {
                     let event = x.event;
+                    done += 1;
 
-                    let _ = db.store(&event).expect("valid");
+                    let x = db.store(&event).expect("valid");
+                    if debug && x {
+                        println!("Realtime {} {} {:?} ", relayed_by, done, event);
+                    }
+                }
+                Response::EndOfStoredEvents(_) => {
+                    debug = true;
+                }
+                msg => {
+                    panic!("{:?}", msg);
                 }
-                _ => {}
             }
         }
     }