5 Commits bc758e52f4 ... d31daa10b7

Author SHA1 Message Date
  Cesar Rodas d31daa10b7 Working on personal relayer 3 months ago
  Cesar Rodas 2e74b919e4 Merge branch 'async-incoming-request-processing' of cesar/nostr-prototype into main 1 month ago
  Cesar Rodas 6e33a10364 Enable client pool to process requests and responses in a non-blocking way 1 month ago
  Cesar Rodas bc758e52f4 Improvements 1 month ago
  Cesar Rodas ab1715fe46 Working on personal relayer 3 months ago

+ 3 - 4
crates/personal-relayer/src/lib.rs

@@ -4,9 +4,8 @@ use nostr_rs_client::Pool;
 use nostr_rs_relayer::Relayer;
 use nostr_rs_storage_base::Storage;
 use nostr_rs_types::{
-    types::{
-        tag::TagType, Content, Id,
-    }, Response,
+    types::{tag::TagType, Content, Id},
+    Response,
 };
 use std::{collections::HashSet, ops::Deref};
 use tokio::{net::TcpListener, task::JoinHandle};
@@ -61,7 +60,7 @@ impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
 
         Ok(Self {
             relayer,
-            accounts: GraphManager::new(2, accounts),
+            accounts: GraphManager::new(1, accounts),
         })
     }
 

+ 1 - 1
crates/relayer/src/connection/mod.rs

@@ -179,7 +179,7 @@ impl Connection {
 
     #[inline]
     /// Sends a message to this connection's websocket
-    pub fn send(&self, response: Response) -> Result<(), Error> {
+    pub fn respond(&self, response: Response) -> Result<(), Error> {
         self.sender
             .try_send(response)
             .map_err(|e| Error::TrySendError(Box::new(e)))

+ 2 - 2
crates/relayer/src/error.rs

@@ -33,8 +33,8 @@ pub enum Error {
     NoClient,
 
     /// Unknown connections
-    #[error("Unknown connection: {0}")]
-    UnknownConnection(u128),
+    #[error("Unknown connection: {0:?}")]
+    UnknownConnection(ConnectionId),
 
     /// The relayer is already splitten
     #[error("Relayer already splitten")]

+ 135 - 91
crates/relayer/src/relayer.rs

@@ -5,7 +5,6 @@ use crate::{
 use futures_util::StreamExt;
 use nostr_rs_client::{pool::subscription::PoolSubscriptionId, Pool, Url};
 use nostr_rs_storage_base::Storage;
-use nostr_rs_subscription_manager::SubscriptionManager;
 use nostr_rs_types::{
     relayer::{self, ROk, ROkStatus},
     types::{Addr, Event, SubscriptionId},
@@ -15,6 +14,7 @@ use std::{
     collections::{HashMap, HashSet},
     ops::Deref,
     sync::Arc,
+    time::Instant,
 };
 use tokio::{
     net::{TcpListener, TcpStream},
@@ -42,6 +42,12 @@ impl Default for RelayerSubscriptionId {
     }
 }
 
+type Connections = Arc<RwLock<HashMap<ConnectionId, Connection>>>;
+type SubscriptionManager =
+    Arc<nostr_rs_subscription_manager::SubscriptionManager<RelayerSubscriptionId, ()>>;
+type ClientPoolSubscriptions =
+    Arc<RwLock<HashMap<PoolSubscriptionId, (SubscriptionId, ConnectionId)>>>;
+
 /// Relayer struct
 ///
 pub struct Relayer<T: Storage + Send + Sync + 'static> {
@@ -51,9 +57,9 @@ pub struct Relayer<T: Storage + Send + Sync + 'static> {
     /// be able to perform any optimization like prefetching content while offline
     storage: Arc<Option<T>>,
     /// Subscription manager
-    subscription_manager: Arc<SubscriptionManager<RelayerSubscriptionId, ()>>,
+    subscription_manager: SubscriptionManager,
     /// List of all active connections
-    connections: Arc<RwLock<HashMap<ConnectionId, Connection>>>,
+    connections: Connections,
     /// This Sender can be used to send requests from anywhere to the relayer.
     send_to_relayer: Sender<(ConnectionId, Request)>,
     /// This Receiver is the relayer the way the relayer receives messages
@@ -63,9 +69,9 @@ pub struct Relayer<T: Storage + Send + Sync + 'static> {
     ///
     /// A relayer can optionally be connected to a pool of clients to get
     /// foreign events.
-    client_pool: Option<Pool>,
+    client_pool: Option<Arc<Pool>>,
     client_pool_receiver: Option<Receiver<(Response, Url)>>,
-    client_pool_subscriptions: RwLock<HashMap<PoolSubscriptionId, (SubscriptionId, ConnectionId)>>,
+    client_pool_subscriptions: ClientPoolSubscriptions,
 }
 
 impl<T: Storage + Send + Sync + 'static> Relayer<T> {
@@ -82,7 +88,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
 
         let (client_pool_receiver, client_pool) = if let Some(client_pool) = client_pool {
             let result = client_pool.split()?;
-            (result.0, Some(result.1))
+            (result.0, Some(Arc::new(result.1)))
         } else {
             let (_, receiver) = mpsc::channel(1);
             (receiver, None)
@@ -138,6 +144,8 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
 
         let handle = tokio::spawn(async move {
             loop {
+                let start = Instant::now();
+                println!("{}", client_pool_receiver.len());
                 tokio::select! {
                     Ok((stream, _)) = server.accept() => {
                         // accept new connections
@@ -149,7 +157,12 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                             Response::Event(event) => {
                                 // we received a message from the client pool, store it locally
                                 // and re-broadcast it.
-                                let _ = this.broadcast(event.deref()).await;
+                                tokio::spawn(Self::broadcast(
+                                    this.storage.clone(),
+                                    this.subscription_manager.clone(),
+                                    this.connections.clone(),
+                                    event.event
+                                ));
                             }
                             Response::EndOfStoredEvents(sub) => {
                                 let connections = this.connections.read().await;
@@ -163,23 +176,23 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                                     continue
                                 };
 
-                                let _ = connection.send(Response::EndOfStoredEvents(sub_id.into()));
+                                let _ = connection.respond(Response::EndOfStoredEvents(sub_id.into()));
+                                let duration = start.elapsed();
+                                println!("xTime elapsed: {} ms", duration.as_millis());
                             }
                             _ => {}
                         }
                     }
                     Some((conn_id, request)) = receiver.recv() => {
-                        // receive messages from our clients
-                        let connections = this.connections.read().await;
-                        let connection = if let Some(connection) = connections.get(&conn_id) {
-                            connection
-                        } else {
-                            continue;
-                        };
-
-                        // receive messages from clients
-                        let _ = this.process_request_from_client(connection, request).await;
-                        drop(connections);
+                        tokio::spawn(Self::process_request(
+                            this.storage.clone(),
+                            this.client_pool.clone(),
+                            this.client_pool_subscriptions.clone(),
+                            this.subscription_manager.clone(),
+                            this.connections.clone(),
+                            conn_id,
+                            request.clone()
+                        ));
                     }
                     else => {
                     }
@@ -242,17 +255,50 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
         Ok(id)
     }
 
-    /// Process a request from a connected client
+    #[cfg(test)]
     async fn process_request_from_client(
         &self,
-        connection: &Connection,
+        connection: &LocalConnection<T>,
+        request: Request,
+    ) -> Result<(), Error> {
+        Self::process_request(
+            self.storage.clone(),
+            self.client_pool.clone(),
+            self.client_pool_subscriptions.clone(),
+            self.subscription_manager.clone(),
+            self.connections.clone(),
+            connection.conn_id,
+            request,
+        )
+        .await
+    }
+
+    /// Process a request from a connected client
+    async fn process_request(
+        storage: Arc<Option<T>>,
+        client_pool: Option<Arc<Pool>>,
+        client_pool_subscriptions: ClientPoolSubscriptions,
+        subscription_manager: SubscriptionManager,
+        connections: Connections,
+        connection_id: ConnectionId,
         request: Request,
     ) -> Result<(), Error> {
         match request {
             Request::Event(event) => {
+                let read_connections = connections.read().await;
+                let connection = read_connections
+                    .get(&connection_id)
+                    .ok_or(Error::UnknownConnection(connection_id))?;
                 let event_id: Addr = event.id.clone().into();
-                if !self.broadcast(&event).await? {
-                    connection.send(
+                if !Self::broadcast(
+                    storage.clone(),
+                    subscription_manager.clone(),
+                    connections.clone(),
+                    event.deref().clone(),
+                )
+                .await?
+                {
+                    connection.respond(
                         ROk {
                             id: event_id,
                             status: ROkStatus::Duplicate,
@@ -262,17 +308,17 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                     return Ok(());
                 }
 
-                if let Some(storage) = self.storage.as_ref() {
+                if let Some(storage) = storage.as_ref() {
                     let _ = storage.store_local_event(&event).await;
                 }
 
-                if let Some(client_pool) = self.client_pool.as_ref() {
+                if let Some(client_pool) = client_pool.as_ref() {
                     // pass the event to the pool of clients, so this relayer can relay
                     // their local events to the clients in the network of relayers
                     let _ = client_pool.post(event).await;
                 }
 
-                connection.send(
+                connection.respond(
                     ROk {
                         id: event_id,
                         status: ROkStatus::Ok,
@@ -281,7 +327,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                 )?;
             }
             Request::Request(request) => {
-                let foreign_subscription = if let Some(client_pool) = self.client_pool.as_ref() {
+                let foreign_subscription = if let Some(client_pool) = client_pool.as_ref() {
                     // If this relay is connected to other relays through the
                     // client pool, create the same subscription in them as
                     // well, with the main goal of fetching any foreign event
@@ -295,9 +341,9 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                         .subscribe(request.filters.clone().into())
                         .await?;
 
-                    self.client_pool_subscriptions.write().await.insert(
+                    client_pool_subscriptions.write().await.insert(
                         foreign_sub_id.clone(),
-                        (request.subscription_id.clone(), connection.get_conn_id()),
+                        (request.subscription_id.clone(), connection_id),
                     );
 
                     Some(foreign_sub_id)
@@ -305,7 +351,12 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                     None
                 };
 
-                if let Some(storage) = self.storage.as_ref() {
+                let read_connections = connections.read().await;
+                let connection = read_connections
+                    .get(&connection_id)
+                    .ok_or(Error::UnknownConnection(connection_id))?;
+
+                if let Some(storage) = storage.as_ref() {
                     let mut sent = HashSet::new();
                     // Sent all events that match the filter that are stored in our database
                     for filter in request.filters.clone().into_iter() {
@@ -316,7 +367,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                                 continue;
                             }
                             sent.insert(event.id.clone());
-                            let _ = connection.send(
+                            let _ = connection.respond(
                                 relayer::Event {
                                     subscription_id: request.subscription_id.clone(),
                                     event,
@@ -330,8 +381,9 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                 if foreign_subscription.is_none() {
                     // If there is a foreign subscription, we shouldn't send a
                     // EOS until we have got EOS from all foreign relays
-                    let _ = connection
-                        .send(relayer::EndOfStoredEvents(request.subscription_id.clone()).into());
+                    let _ = connection.respond(
+                        relayer::EndOfStoredEvents(request.subscription_id.clone()).into(),
+                    );
                 }
 
                 connection
@@ -339,7 +391,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                         request.subscription_id.clone(),
                         (
                             foreign_subscription,
-                            self.subscription_manager
+                            subscription_manager
                                 .subscribe(
                                     (request.subscription_id, connection.get_conn_id()).into(),
                                     request.filters,
@@ -351,7 +403,13 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                     .await;
             }
             Request::Close(close) => {
-                connection.unsubscribe(&close).await;
+                connections
+                    .read()
+                    .await
+                    .get(&connection_id)
+                    .ok_or(Error::UnknownConnection(connection_id))?
+                    .unsubscribe(&close)
+                    .await;
             }
         };
 
@@ -360,50 +418,23 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
 
     #[inline]
     /// A non-blocking version of broadcast
-    #[allow(dead_code)]
-    fn broadcast_and_forget(&self, event: Event) {
-        let storage = self.storage.clone();
-        let connections = self.connections.clone();
-        let subscription_manager = self.subscription_manager.clone();
-
-        tokio::spawn(async move {
-            if let Some(storage) = storage.as_ref() {
-                if !storage.store(&event).await.unwrap_or_default() {
-                    return;
-                }
-            }
-
-            let connections = connections.read().await;
-            for RelayerSubscriptionId((sub_id, conn_id)) in
-                subscription_manager.get_subscribers(&event).await
-            {
-                if let Some(connection) = connections.get(&conn_id) {
-                    let _ = connection.send(
-                        relayer::Event {
-                            subscription_id: sub_id,
-                            event: event.clone(),
-                        }
-                        .into(),
-                    );
-                }
-            }
-        });
-    }
-
-    #[inline]
-    /// Broadcast a given event to all local subscribers
-    pub async fn broadcast(&self, event: &Event) -> Result<bool, Error> {
-        if let Some(storage) = self.storage.as_ref() {
-            if !storage.store(event).await? {
+    pub async fn broadcast(
+        storage: Arc<Option<T>>,
+        subscription_manager: SubscriptionManager,
+        connections: Connections,
+        event: Event,
+    ) -> Result<bool, Error> {
+        if let Some(storage) = storage.as_ref() {
+            if !storage.store(&event).await? {
                 return Ok(false);
             }
         }
 
+        let connections = connections.read().await;
         let mut sent = HashSet::new();
 
-        let connections = self.connections.read().await;
         for RelayerSubscriptionId((sub_id, conn_id)) in
-            self.subscription_manager.get_subscribers(event).await
+            subscription_manager.get_subscribers(&event).await
         {
             if sent.contains(&conn_id) {
                 continue;
@@ -412,7 +443,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
             sent.insert(conn_id);
 
             if let Some(connection) = connections.get(&conn_id) {
-                let _ = connection.send(
+                let _ = connection.respond(
                     relayer::Event {
                         subscription_id: sub_id,
                         event: event.clone(),
@@ -421,14 +452,13 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                 );
             }
         }
+
         Ok(true)
     }
 }
 
 #[cfg(test)]
 mod test {
-    use std::time::Duration;
-
     use super::*;
     use futures::future::join_all;
     use nostr_rs_client::Url;
@@ -439,6 +469,7 @@ mod test {
         Request,
     };
     use serde_json::json;
+    use std::time::Duration;
     use tokio::time::sleep;
 
     async fn dummy_server(port: u16, client_pool: Option<Pool>) -> (Url, JoinHandle<()>) {
@@ -525,8 +556,10 @@ mod test {
           },
         ]))
         .expect("valid object");
-        let relayer = Relayer::new(Some(get_db(true).await), None).expect("valid relayer");
-        let (connection, mut recv) = Connection::new_local_connection();
+        let relayer =
+            Arc::new(Relayer::new(Some(get_db(true).await), None).expect("valid relayer"));
+
+        let mut connection = relayer.create_new_local_connection().await;
 
         let note = get_note_with_custom_tags(json!([["f", "foo"]]));
 
@@ -543,7 +576,8 @@ mod test {
         // ev1
         assert_eq!(
             ROkStatus::Ok,
-            recv.try_recv()
+            connection
+                .try_recv()
                 .expect("valid")
                 .as_ok()
                 .cloned()
@@ -554,17 +588,22 @@ mod test {
         // ev1
         assert_eq!(
             note,
-            recv.try_recv().expect("valid").as_event().unwrap().event
+            connection
+                .try_recv()
+                .expect("valid")
+                .as_event()
+                .unwrap()
+                .event
         );
 
         // eod
-        assert!(recv
+        assert!(connection
             .try_recv()
             .expect("valid")
             .as_end_of_stored_events()
             .is_some());
 
-        assert!(recv.try_recv().is_err());
+        assert!(connection.try_recv().is_none());
     }
 
     #[tokio::test]
@@ -621,15 +660,17 @@ mod test {
           }
         ]))
         .expect("valid object");
-        let relayer = Relayer::new(Some(get_db(true).await), None).expect("valid relayer");
-        let (connection, mut recv) = Connection::new_local_connection();
+        let relayer =
+            Arc::new(Relayer::new(Some(get_db(true).await), None).expect("valid relayer"));
+        let mut connection = relayer.create_new_local_connection().await;
         let _ = relayer
             .process_request_from_client(&connection, request)
             .await;
         // ev1
         assert_eq!(
             "9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42",
-            recv.try_recv()
+            connection
+                .try_recv()
                 .expect("valid")
                 .as_event()
                 .expect("event")
@@ -639,7 +680,8 @@ mod test {
         // ev3
         assert_eq!(
             "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9",
-            recv.try_recv()
+            connection
+                .try_recv()
                 .expect("valid")
                 .as_event()
                 .expect("event")
@@ -649,7 +691,8 @@ mod test {
         // ev2
         assert_eq!(
             "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
-            recv.try_recv()
+            connection
+                .try_recv()
                 .expect("valid")
                 .as_event()
                 .expect("event")
@@ -658,13 +701,13 @@ mod test {
         );
 
         // eod
-        assert!(recv
+        assert!(connection
             .try_recv()
             .expect("valid")
             .as_end_of_stored_events()
             .is_some());
 
-        assert!(recv.try_recv().is_err());
+        assert!(connection.try_recv().is_none());
     }
 
     #[tokio::test]
@@ -957,8 +1000,9 @@ mod test {
 
     #[tokio::test]
     async fn posting_event_replies_ok() {
-        let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
-        let (connection, mut recv) = Connection::new_local_connection();
+        let relayer =
+            Arc::new(Relayer::new(Some(get_db(false).await), None).expect("valid relayer"));
+        let mut connection = relayer.create_new_local_connection().await;
 
         let note = get_note();
         let note_id = note.as_event().map(|x| x.id.clone()).unwrap();
@@ -978,7 +1022,7 @@ mod test {
                 }
                 .into()
             ),
-            recv.try_recv().ok()
+            connection.try_recv()
         );
     }