Browse Source

Make client receive all previous messages

Cesar Rodas 1 year ago
parent
commit
22e0f3f7c4
2 changed files with 34 additions and 11 deletions
  1. 20 9
      crates/client/src/client.rs
  2. 14 2
      crates/client/src/clients.rs

+ 20 - 9
crates/client/src/client.rs

@@ -2,7 +2,8 @@ use crate::Error;
 use futures::Future;
 use futures_util::{SinkExt, StreamExt};
 use nostr_rs_types::{Request, Response};
-use std::pin::Pin;
+use parking_lot::RwLock;
+use std::{pin::Pin, sync::Arc};
 use tokio::{
     sync::{broadcast, mpsc, oneshot},
     time::{sleep, timeout, Duration},
@@ -13,9 +14,7 @@ use url::Url;
 #[derive(Debug)]
 pub struct Client {
     pub url: String,
-
     pub send_to_socket: mpsc::Sender<Request>,
-
     recv_from_socket: broadcast::Receiver<(Response, String)>,
     stop_service: oneshot::Sender<()>,
 }
@@ -24,16 +23,25 @@ const NO_ACTIVITY_TIMEOUT_SECS: u64 = 30;
 const MAX_RECONNECT_ATTEMPTS: u64 = 15;
 
 impl Client {
-    pub fn new<F>(url: &str, on_connection: Option<F>) -> Result<Self, Error>
+    pub fn new<F>(
+        sent_messages: Arc<RwLock<Vec<Request>>>,
+        url: &str,
+        on_connection: Option<F>,
+    ) -> Result<Self, Error>
     where
-        F: (Fn(mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
+        F: (Fn(Vec<Request>, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
             + Send
             + Sync
             + 'static,
     {
         let (send_to_socket, receiver) = mpsc::channel(100_000);
-        let (recv_from_socket, stop_service) =
-            Self::spawn_background_client(send_to_socket.clone(), receiver, url, on_connection)?;
+        let (recv_from_socket, stop_service) = Self::spawn_background_client(
+            sent_messages,
+            send_to_socket.clone(),
+            receiver,
+            url,
+            on_connection,
+        )?;
 
         Ok(Self {
             url: url.to_owned(),
@@ -44,13 +52,14 @@ impl Client {
     }
 
     fn spawn_background_client<F>(
+        sent_messages: Arc<RwLock<Vec<Request>>>,
         send_to_socket: mpsc::Sender<Request>,
         mut receiver: mpsc::Receiver<Request>,
         url_str: &str,
         on_connection: Option<F>,
     ) -> Result<(broadcast::Receiver<(Response, String)>, oneshot::Sender<()>), Error>
     where
-        F: (Fn(mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
+        F: (Fn(Vec<Request>, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
             + Send
             + Sync
             + 'static,
@@ -64,6 +73,7 @@ impl Client {
         tokio::spawn(async move {
             let mut reconnect = true;
             let mut retries = 0;
+
             while reconnect && retries <= MAX_RECONNECT_ATTEMPTS {
                 log::warn!("{}: Connect attempt {}", url, retries);
                 retries += 1;
@@ -78,7 +88,8 @@ impl Client {
                 log::info!("Connected to {}", url);
 
                 if let Some(on_connection) = &on_connection {
-                    on_connection(send_to_socket.clone()).await;
+                    let sent_messages = sent_messages.read().to_owned();
+                    on_connection(sent_messages, send_to_socket.clone()).await;
                 }
 
                 loop {

+ 14 - 2
crates/client/src/clients.rs

@@ -9,12 +9,14 @@ use tokio::sync::mpsc;
 #[derive(Debug, Clone)]
 pub struct Clients {
     clients: Arc<RwLock<HashMap<String, Client>>>,
+    messages: Arc<RwLock<Vec<Request>>>,
 }
 
 impl Default for Clients {
     fn default() -> Self {
         Self {
             clients: Arc::new(RwLock::new(HashMap::new())),
+            messages: Arc::new(RwLock::new(vec![])),
         }
     }
 }
@@ -95,14 +97,21 @@ impl Clients {
             .map(|(_, c)| c.send_to_socket.clone())
             .collect::<Vec<mpsc::Sender<_>>>();
 
+        // Add a copy of the request for incoming connections.
+        self.messages.write().push(request.clone());
+
         for sender in senders.iter() {
             let _ = sender.send(request.clone()).await;
         }
     }
 
+    pub fn get_sent_messages(&self) -> Vec<Request> {
+        self.messages.read().to_owned()
+    }
+
     pub async fn connect_to<F>(&self, url: &str, on_connection: Option<F>) -> Result<bool, Error>
     where
-        F: (Fn(mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
+        F: (Fn(Vec<Request>, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
             + Send
             + Sync
             + 'static,
@@ -112,7 +121,10 @@ impl Clients {
             false
         } else {
             log::warn!("Connecting to {}", url);
-            clients.insert(url.to_owned(), Client::new(url, on_connection)?);
+            clients.insert(
+                url.to_owned(),
+                Client::new(self.messages.clone(), url, on_connection)?,
+            );
             true
         })
     }