Browse Source

Client pool and relayer improvements

The main improvements are around client subscription manager, EndOfStore events
notification (once regardless of how many clients are subscribed)

If the relay is connected to a pool of clients, it will subscribe to the other
relays with the same filters as requested by its clients, with the primary goal
of being a proxy for connecting to other relays.

These improvements paved the way for a Web-of-Trust relay so people could
connect to it and have this relay connect to the outside world.

This PR also adds unit tests to cover all cases.
Cesar Rodas 1 month ago
parent
commit
94836bda5c

+ 65 - 14
crates/client/src/client.rs

@@ -29,6 +29,8 @@ use url::Url;
 
 
 type Subscriptions = Arc<RwLock<HashMap<SubscriptionId, subscribe::Subscribe>>>;
 type Subscriptions = Arc<RwLock<HashMap<SubscriptionId, subscribe::Subscribe>>>;
 
 
+const MAX_ACTIVE_SUBSCRIPTIONS: usize = 10;
+
 #[derive(Debug)]
 #[derive(Debug)]
 /// Active subscription
 /// Active subscription
 ///
 ///
@@ -45,10 +47,10 @@ impl Drop for ActiveSubscription {
         let id = self.id.clone();
         let id = self.id.clone();
         let send_to_socket = self.send_to_socket.clone();
         let send_to_socket = self.send_to_socket.clone();
         tokio::spawn(async move {
         tokio::spawn(async move {
-            subscriptions.write().await.remove(&id);
             let _ = send_to_socket
             let _ = send_to_socket
-                .send(nostr_rs_types::client::Close(id).into())
+                .send(nostr_rs_types::client::Close(id.clone()).into())
                 .await;
                 .await;
+            subscriptions.write().await.remove(&id);
         });
         });
     }
     }
 }
 }
@@ -126,7 +128,7 @@ impl Client {
         mut send_to_socket: mpsc::Receiver<Request>,
         mut send_to_socket: mpsc::Receiver<Request>,
         url: Url,
         url: Url,
         is_connected: Arc<AtomicBool>,
         is_connected: Arc<AtomicBool>,
-        send_on_connection: Subscriptions,
+        to_resubscribe: Subscriptions,
         filter: F,
         filter: F,
     ) -> JoinHandle<()>
     ) -> JoinHandle<()>
     where
     where
@@ -159,16 +161,29 @@ impl Client {
                 log::info!("Connected to {}", url);
                 log::info!("Connected to {}", url);
                 connection_attempts = 0;
                 connection_attempts = 0;
 
 
-                let mut subscriptions = send_on_connection
+                // Convert all sent subscriptions to a local vector
+                let mut subscriptions = to_resubscribe
                     .read()
                     .read()
                     .await
                     .await
-                    .iter()
-                    .map(|(sub_id, msg)| {
+                    .values()
+                    .map(|msg| Request::Request(msg.clone()))
+                    .collect::<Vec<_>>();
+
+                // Only keep the ones to be subscribed, moved the rest of the subscriptions to the queue
+                let mut to_subscribe_queue = if subscriptions.len() > MAX_ACTIVE_SUBSCRIPTIONS {
+                    subscriptions.split_off(MAX_ACTIVE_SUBSCRIPTIONS)
+                } else {
+                    vec![]
+                };
+
+                let mut subscriptions = subscriptions
+                    .into_iter()
+                    .map(|msg| {
                         (
                         (
-                            sub_id.to_owned(),
-                            serde_json::to_string(&Request::Request(msg.clone()))
-                                .ok()
-                                .map(Message::Text),
+                            msg.as_request()
+                                .map(|x| x.subscription_id.clone())
+                                .unwrap_or_default(),
+                            serde_json::to_string(&msg).ok().map(Message::Text),
                         )
                         )
                     })
                     })
                     .collect::<HashMap<_, _>>();
                     .collect::<HashMap<_, _>>();
@@ -177,6 +192,7 @@ impl Client {
                     if let Some(msg) = msg.take() {
                     if let Some(msg) = msg.take() {
                         if let Err(x) = socket.send(msg).await {
                         if let Err(x) = socket.send(msg).await {
                             log::error!("{}: Reconnecting due error at sending: {:?}", url, x);
                             log::error!("{}: Reconnecting due error at sending: {:?}", url, x);
+                            break;
                         }
                         }
                     }
                     }
                 }
                 }
@@ -191,9 +207,42 @@ impl Client {
                                     log::warn!("{}: Already subscribed to {}", url, sub.subscription_id);
                                     log::warn!("{}: Already subscribed to {}", url, sub.subscription_id);
                                     continue;
                                     continue;
                                 }
                                 }
+                                if subscriptions.len() > MAX_ACTIVE_SUBSCRIPTIONS {
+                                    log::warn!("{}: Queueing subscription to {} for later", url, sub.subscription_id);
+                                    to_subscribe_queue.push(msg.clone());
+                                    continue;
+                                }
+                                subscriptions.insert(sub.subscription_id.clone(), None);
+                            }
+
+
+                            let json = if let Ok(json) =  serde_json::to_string(&msg) {
+                                json
+                            } else {
+                                continue;
+                            };
+
+                            if let Err(x) = socket.send(Message::Text(json)).await {
+                                log::error!("{} : Reconnecting due {}", url, x);
+                                break;
                             }
                             }
-                            if let Ok(json) = serde_json::to_string(&msg) {
-                                log::info!("{}: Sending {}", url, json);
+
+                            if let Request::Close(close) = &msg {
+                                subscriptions.remove(&close.0);
+                                let json = if let Some(json) = to_subscribe_queue
+                                    .pop()
+                                    .and_then(|msg| {
+                                        subscriptions.insert(msg.as_request().map(|sub| sub.subscription_id.clone()).unwrap_or_default(), None);
+                                        serde_json::to_string(&msg).ok()
+                                    })
+                                    {
+                                        json
+                                    } else {
+                                        continue;
+                                    };
+
+
+                                log::info!("Sending: {} (queued subscription)", json);
                                 if let Err(x) = socket.send(Message::Text(json)).await {
                                 if let Err(x) = socket.send(Message::Text(json)).await {
                                     log::error!("{} : Reconnecting due {}", url, x);
                                     log::error!("{} : Reconnecting due {}", url, x);
                                     break;
                                     break;
@@ -225,10 +274,12 @@ impl Client {
                                 continue;
                                 continue;
                             }
                             }
 
 
-                            log::info!("New message: {}", msg);
-
                             let event: Result<Response, _> = serde_json::from_str(&msg);
                             let event: Result<Response, _> = serde_json::from_str(&msg);
 
 
+                            if let Ok(Response::Notice(err)) = &event {
+                                log::error!("{}: Active connections {}: {:?}", url, subscriptions.len(), err);
+                            }
+
                             if let Ok(msg) = event {
                             if let Ok(msg) = event {
                                 if let Err(error) = filter(msg, url.clone(), return_to.clone()).await {
                                 if let Err(error) = filter(msg, url.clone(), return_to.clone()).await {
                                     log::error!("{}: Reconnecting client because of {}", url, error);
                                     log::error!("{}: Reconnecting client because of {}", url, error);

+ 4 - 0
crates/client/src/error.rs

@@ -10,6 +10,10 @@ pub enum Error {
     #[error("Url: {0}")]
     #[error("Url: {0}")]
     Url(#[from] url::ParseError),
     Url(#[from] url::ParseError),
 
 
+    /// Subscriptions must be unique
+    #[error("Duplicate subscription")]
+    DuplicateSubscriptionId,
+
     /// WebSocket client error
     /// WebSocket client error
     #[error("Tungstenite: {0}")]
     #[error("Tungstenite: {0}")]
     Tungstenite(#[from] TungsteniteError),
     Tungstenite(#[from] TungsteniteError),

+ 445 - 13
crates/client/src/pool/mod.rs

@@ -13,9 +13,13 @@ use std::{
         atomic::{AtomicUsize, Ordering},
         atomic::{AtomicUsize, Ordering},
         Arc,
         Arc,
     },
     },
+    time::Duration,
 };
 };
 use subscription::Scheduler;
 use subscription::Scheduler;
-use tokio::sync::{mpsc, RwLock};
+use tokio::{
+    sync::{mpsc, RwLock},
+    time::sleep,
+};
 use url::Url;
 use url::Url;
 
 
 pub mod subscription;
 pub mod subscription;
@@ -81,6 +85,14 @@ impl Pool {
         let pool = Self::default();
         let pool = Self::default();
         let connect = clients.into_iter().map(|url| pool.connect_to(url));
         let connect = clients.into_iter().map(|url| pool.connect_to(url));
 
 
+        let x = pool.subscription_manager.clone();
+        tokio::spawn(async move {
+            loop {
+                log::info!("Active subscribers: {}", x.debug().await);
+                sleep(Duration::from_secs(5)).await;
+            }
+        });
+
         futures::executor::block_on(async {
         futures::executor::block_on(async {
             futures::future::join_all(connect)
             futures::future::join_all(connect)
                 .await
                 .await
@@ -114,8 +126,8 @@ impl Pool {
     pub async fn subscribe(
     pub async fn subscribe(
         &self,
         &self,
         subscription: subscribe::Subscribe,
         subscription: subscribe::Subscribe,
-    ) -> subscription::ActiveSubscription {
-        self.subscription_manager.subcribe(subscription, None).await
+    ) -> Result<subscription::ActiveSubscription, Error> {
+        self.subscription_manager.subscribe(subscription).await
     }
     }
 
 
     /// Sends a request to all the connected relayers
     /// Sends a request to all the connected relayers
@@ -187,20 +199,25 @@ mod test {
     use super::*;
     use super::*;
     use nostr_rs_memory::Memory;
     use nostr_rs_memory::Memory;
     use nostr_rs_relayer::Relayer;
     use nostr_rs_relayer::Relayer;
-    use nostr_rs_types::{account::Account, types::Content};
+    use nostr_rs_types::{
+        account::Account,
+        types::{Content, Filter},
+    };
     use std::time::Duration;
     use std::time::Duration;
+    use subscription::MAX_ACTIVE_SUBSCRIPTIONS;
     use tokio::{net::TcpListener, task::JoinHandle, time::sleep};
     use tokio::{net::TcpListener, task::JoinHandle, time::sleep};
 
 
-    async fn dummy_server(port: u16) -> (Url, JoinHandle<()>) {
+    async fn dummy_server(port: u16) -> (Url, Arc<Relayer<Memory>>, JoinHandle<()>) {
         let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
         let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
             .await
             .await
             .unwrap();
             .unwrap();
         let local_addr = listener.local_addr().expect("addr");
         let local_addr = listener.local_addr().expect("addr");
 
 
         let relayer = Relayer::new(Some(Memory::default()), None).expect("valid dummy server");
         let relayer = Relayer::new(Some(Memory::default()), None).expect("valid dummy server");
-        let (_, stopper) = relayer.main(listener).expect("valid main loop");
+        let (relayer, stopper) = relayer.main(listener).expect("valid main loop");
         (
         (
             Url::parse(&format!("ws://{}", local_addr)).expect("valid url"),
             Url::parse(&format!("ws://{}", local_addr)).expect("valid url"),
+            relayer,
             stopper,
             stopper,
         )
         )
     }
     }
@@ -208,7 +225,10 @@ mod test {
     #[tokio::test]
     #[tokio::test]
     async fn droppable_subscription() {
     async fn droppable_subscription() {
         let client_pool = Pool::default();
         let client_pool = Pool::default();
-        let subscription = client_pool.subscribe(Default::default()).await;
+        let subscription = client_pool
+            .subscribe(Default::default())
+            .await
+            .expect("valid");
 
 
         assert_eq!(client_pool.active_subscriptions().await, 1);
         assert_eq!(client_pool.active_subscriptions().await, 1);
         drop(subscription);
         drop(subscription);
@@ -218,7 +238,7 @@ mod test {
 
 
     #[tokio::test]
     #[tokio::test]
     async fn connect_to_dummy_server() {
     async fn connect_to_dummy_server() {
-        let (addr, stopper) = dummy_server(0).await;
+        let (addr, _, stopper) = dummy_server(0).await;
         let (client_pool, _connections) = Pool::new_with_clients(vec![addr]).expect("valid pool");
         let (client_pool, _connections) = Pool::new_with_clients(vec![addr]).expect("valid pool");
 
 
         assert_eq!(0, client_pool.check_active_connections().await);
         assert_eq!(0, client_pool.check_active_connections().await);
@@ -235,7 +255,7 @@ mod test {
 
 
     #[tokio::test]
     #[tokio::test]
     async fn two_clients_communication() {
     async fn two_clients_communication() {
-        let (addr, _) = dummy_server(0).await;
+        let (addr, _, _) = dummy_server(0).await;
         let (mut client_pool1, _c1) =
         let (mut client_pool1, _c1) =
             Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
             Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
         let (client_pool2, _c2) = Pool::new_with_clients(vec![addr]).expect("valid pool");
         let (client_pool2, _c2) = Pool::new_with_clients(vec![addr]).expect("valid pool");
@@ -273,7 +293,7 @@ mod test {
 
 
     #[tokio::test]
     #[tokio::test]
     async fn reconnect_and_resubscribe() {
     async fn reconnect_and_resubscribe() {
-        let (addr, stopper) = dummy_server(0).await;
+        let (addr, _, stopper) = dummy_server(0).await;
         let (mut client_pool1, _c1) =
         let (mut client_pool1, _c1) =
             Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
             Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
         let (client_pool2, _c2) = Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
         let (client_pool2, _c2) = Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
@@ -318,7 +338,7 @@ mod test {
         assert_eq!(0, client_pool1.check_active_connections().await);
         assert_eq!(0, client_pool1.check_active_connections().await);
         assert_eq!(0, client_pool2.check_active_connections().await);
         assert_eq!(0, client_pool2.check_active_connections().await);
 
 
-        let (_, stopper) = dummy_server(addr.port().expect("port")).await;
+        let (_, _, stopper) = dummy_server(addr.port().expect("port")).await;
 
 
         sleep(Duration::from_millis(2_000)).await;
         sleep(Duration::from_millis(2_000)).await;
 
 
@@ -347,8 +367,8 @@ mod test {
 
 
     #[tokio::test]
     #[tokio::test]
     async fn connect_multiple_servers() {
     async fn connect_multiple_servers() {
-        let (addr1, _) = dummy_server(0).await;
-        let (addr2, _) = dummy_server(0).await;
+        let (addr1, _, _) = dummy_server(0).await;
+        let (addr2, _, _) = dummy_server(0).await;
         let (mut client_pool1, _c1) =
         let (mut client_pool1, _c1) =
             Pool::new_with_clients(vec![addr1.clone(), addr2]).expect("valid pool");
             Pool::new_with_clients(vec![addr1.clone(), addr2]).expect("valid pool");
         let (client_pool2, _c2) = Pool::new_with_clients(vec![addr1]).expect("valid pool");
         let (client_pool2, _c2) = Pool::new_with_clients(vec![addr1]).expect("valid pool");
@@ -382,4 +402,416 @@ mod test {
         );
         );
         assert!(client_pool1.try_recv().is_none());
         assert!(client_pool1.try_recv().is_none());
     }
     }
+
+    /// Client pool user creates 101 subscription (that is not allowed by many
+    /// relays), so the pool will do a round-robin subscriptions keeping 5
+    /// actives at a time with an internal scheduler.
+    ///
+    /// The scheduler will pause active subscriptions when the EOS is received.
+    /// On the next round `since` will be used to receive only newer events
+    mod scheduler {
+        use super::*;
+
+        #[tokio::test]
+        async fn stored_first() {
+            let (addr1, relayer, _) = dummy_server(0).await;
+
+            let (mut client_pool1, _c1) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+            let (client_pool2, _c2) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+
+            let account1 = Account::default();
+            let signed_content = account1
+                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+                .expect("valid signed content");
+            client_pool2.post(signed_content.clone().into()).await;
+
+            let first_sub = client_pool1
+                .subscribe(
+                    Filter {
+                        ids: vec![signed_content.id.clone()],
+                        ..Default::default()
+                    }
+                    .into(),
+                )
+                .await
+                .expect("valid subs, last");
+
+            let subs = join_all(
+                (0..100)
+                    .into_iter()
+                    .map(|_| {
+                        client_pool1.subscribe(
+                            Filter {
+                                authors: vec![
+                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
+                                    .parse()
+                                    .unwrap(),
+                            ],
+                                ..Default::default()
+                            }
+                            .into(),
+                        )
+                    })
+                    .collect::<Vec<_>>(),
+            )
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid 100 dummy subs");
+
+            for _ in 1..10 {
+                sleep(Duration::from_millis(10)).await;
+                assert!(
+                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
+                    "total subs {}",
+                    relayer.total_subscribers()
+                );
+            }
+
+            sleep(Duration::from_secs(1)).await;
+
+            let mut has_receive_event = false;
+            for _ in 0..102 {
+                let event = client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid event");
+
+                if has_receive_event {
+                    assert!(event.as_end_of_stored_events().is_some());
+                } else {
+                    if let Some(ev) = event.as_event() {
+                        assert_eq!(ev.id, signed_content.id);
+                        has_receive_event = true;
+                    } else {
+                        assert!(event.as_end_of_stored_events().is_some());
+                    }
+                }
+            }
+
+            drop(subs);
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                1,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+
+            drop(first_sub);
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                0,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+        }
+
+        #[tokio::test]
+        async fn stored_last() {
+            let (addr1, relayer, _) = dummy_server(0).await;
+
+            let (mut client_pool1, _c1) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+            let (client_pool2, _c2) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+
+            let account1 = Account::default();
+            let signed_content = account1
+                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+                .expect("valid signed content");
+            client_pool2.post(signed_content.clone().into()).await;
+
+            let mut subs = join_all(
+                (0..100)
+                    .into_iter()
+                    .map(|_| {
+                        client_pool1.subscribe(
+                            Filter {
+                                authors: vec![
+                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
+                                    .parse()
+                                    .unwrap(),
+                            ],
+                                ..Default::default()
+                            }
+                            .into(),
+                        )
+                    })
+                    .collect::<Vec<_>>(),
+            )
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid 100 dummy subs");
+
+            for _ in 1..10 {
+                sleep(Duration::from_millis(10)).await;
+                assert!(
+                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
+                    "total subs {}",
+                    relayer.total_subscribers()
+                );
+            }
+
+            for _ in 0..100 {
+                assert!(client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_end_of_stored_events()
+                    .is_some());
+            }
+
+            assert!(client_pool1.try_recv().is_none());
+
+            subs.push(
+                client_pool1
+                    .subscribe(
+                        Filter {
+                            ids: vec![signed_content.id.clone()],
+                            ..Default::default()
+                        }
+                        .into(),
+                    )
+                    .await
+                    .expect("valid subs, last"),
+            );
+
+            sleep(Duration::from_secs(1)).await;
+
+            assert_eq!(
+                client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_event()
+                    .expect("valid event")
+                    .id,
+                signed_content.id
+            );
+
+            drop(subs);
+
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                0,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+        }
+
+        #[tokio::test]
+        async fn realtime_first() {
+            let (addr1, relayer, _) = dummy_server(0).await;
+
+            let (mut client_pool1, _c1) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+            let (client_pool2, _c2) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+
+            let account1 = Account::default();
+            let signed_content = account1
+                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+                .expect("valid signed content");
+
+            let first_sub = client_pool1
+                .subscribe(
+                    Filter {
+                        ids: vec![signed_content.id.clone()],
+                        ..Default::default()
+                    }
+                    .into(),
+                )
+                .await
+                .expect("valid subs, first");
+
+            let subs = join_all(
+                (0..100)
+                    .into_iter()
+                    .map(|_| {
+                        client_pool1.subscribe(
+                            Filter {
+                                authors: vec![
+                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
+                                    .parse()
+                                    .unwrap(),
+                            ],
+                                ..Default::default()
+                            }
+                            .into(),
+                        )
+                    })
+                    .collect::<Vec<_>>(),
+            )
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid 100 dummy subs");
+
+            for _ in 1..10 {
+                sleep(Duration::from_millis(10)).await;
+                assert!(
+                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
+                    "total subs {}",
+                    relayer.total_subscribers()
+                );
+            }
+
+            for _ in 0..101 {
+                assert!(client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_end_of_stored_events()
+                    .is_some());
+            }
+
+            assert!(client_pool1.try_recv().is_none());
+
+            client_pool2.post(signed_content.clone().into()).await;
+
+            sleep(Duration::from_secs(1)).await;
+
+            assert_eq!(
+                client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_event()
+                    .expect("valid event")
+                    .id,
+                signed_content.id
+            );
+
+            drop(subs);
+
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                1,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+
+            drop(first_sub);
+
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                0,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+        }
+
+        #[tokio::test]
+        async fn realtime_last() {
+            let (addr1, relayer, _) = dummy_server(0).await;
+
+            let (mut client_pool1, _c1) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+            let (client_pool2, _c2) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+
+            let account1 = Account::default();
+            let signed_content = account1
+                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+                .expect("valid signed content");
+
+            let mut subs = join_all(
+                (0..100)
+                    .into_iter()
+                    .map(|_| {
+                        client_pool1.subscribe(
+                            Filter {
+                                authors: vec![
+                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
+                                    .parse()
+                                    .unwrap(),
+                            ],
+                                ..Default::default()
+                            }
+                            .into(),
+                        )
+                    })
+                    .collect::<Vec<_>>(),
+            )
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid 100 dummy subs");
+
+            for _ in 1..10 {
+                sleep(Duration::from_millis(10)).await;
+                assert!(
+                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
+                    "total subs {}",
+                    relayer.total_subscribers()
+                );
+            }
+
+            for _ in 0..100 {
+                assert!(client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_end_of_stored_events()
+                    .is_some());
+            }
+
+            assert!(client_pool1.try_recv().is_none());
+
+            subs.push(
+                client_pool1
+                    .subscribe(
+                        Filter {
+                            ids: vec![signed_content.id.clone()],
+                            ..Default::default()
+                        }
+                        .into(),
+                    )
+                    .await
+                    .expect("valid subs, last"),
+            );
+
+            sleep(Duration::from_secs(1)).await;
+
+            assert!(client_pool1
+                .try_recv()
+                .map(|(r, _)| r)
+                .expect("valid message")
+                .as_end_of_stored_events()
+                .is_some());
+
+            client_pool2.post(signed_content.clone().into()).await;
+
+            sleep(Duration::from_secs(1)).await;
+
+            assert_eq!(
+                client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_event()
+                    .expect("valid event")
+                    .id,
+                signed_content.id
+            );
+
+            drop(subs);
+
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                0,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+        }
+    }
 }
 }

+ 173 - 60
crates/client/src/pool/subscription.rs

@@ -10,6 +10,7 @@ use nostr_rs_types::{
 };
 };
 use std::{
 use std::{
     collections::{BTreeMap, VecDeque},
     collections::{BTreeMap, VecDeque},
+    ops::Deref,
     sync::{
     sync::{
         atomic::{AtomicUsize, Ordering},
         atomic::{AtomicUsize, Ordering},
         Arc,
         Arc,
@@ -27,23 +28,28 @@ pub enum Status {
     /// Subscribed is active and it is fetching previous records and no EOD has
     /// Subscribed is active and it is fetching previous records and no EOD has
     /// been received
     /// been received
     Fetching,
     Fetching,
+    /// Refetching, like fetching but the EOD is ignored and nore relayed to
+    /// the listeners, since this is not the first the this subscription has
+    /// been created and it will be rotated soon
+    Refetching,
     /// Subscription is listening, an EOD has been received. This state can be
     /// Subscription is listening, an EOD has been received. This state can be
     /// Requeued is their spot is needed for other subscriptions
     /// Requeued is their spot is needed for other subscriptions
     Subscribed,
     Subscribed,
     /// Waiting to be subscribed again
     /// Waiting to be subscribed again
     Requeued,
     Requeued,
-    /// Resubscribed, like subscribed but the EOD is ignored and nore relayed to
-    /// the listeners, since this is not the first the this subscription has
-    /// been created and it will be rotated soon
-    Resubscribed,
 }
 }
 
 
 #[derive(Debug, Default)]
 #[derive(Debug, Default)]
 struct SubscriptionInner {
 struct SubscriptionInner {
     /// Active subscription (in the client side), when this is Drop all clients unsubscribes
     /// Active subscription (in the client side), when this is Drop all clients unsubscribes
     active_subscription: Option<Vec<client::ActiveSubscription>>,
     active_subscription: Option<Vec<client::ActiveSubscription>>,
+
+    /// Keep track of the number of EOD received
+    end_of_stored_events: usize,
+
     /// Subscription status
     /// Subscription status
     status: Status,
     status: Status,
+
     /// raw request
     /// raw request
     subscription_request: subscribe::Subscribe,
     subscription_request: subscribe::Subscribe,
 }
 }
@@ -55,9 +61,18 @@ struct SubscriptionInner {
 ///
 ///
 /// This must be dropped to unsubscribe from the subscription manager
 /// This must be dropped to unsubscribe from the subscription manager
 pub struct ActiveSubscription {
 pub struct ActiveSubscription {
+    id: PoolSubscriptionId,
     unsubscriber: Option<(PoolSubscriptionId, Arc<Scheduler>)>,
     unsubscriber: Option<(PoolSubscriptionId, Arc<Scheduler>)>,
 }
 }
 
 
+impl Deref for ActiveSubscription {
+    type Target = PoolSubscriptionId;
+
+    fn deref(&self) -> &Self::Target {
+        &self.id
+    }
+}
+
 impl Drop for ActiveSubscription {
 impl Drop for ActiveSubscription {
     fn drop(&mut self) {
     fn drop(&mut self) {
         if let Some((id, scheduler)) = self.unsubscriber.take() {
         if let Some((id, scheduler)) = self.unsubscriber.take() {
@@ -68,11 +83,31 @@ impl Drop for ActiveSubscription {
 
 
 /// Pool subscription ID
 /// Pool subscription ID
 #[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
 #[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
-pub struct PoolSubscriptionId((SubscriptionId, Option<Url>));
+pub struct PoolSubscriptionId(SubscriptionId);
+
+impl From<SubscriptionId> for PoolSubscriptionId {
+    fn from(id: SubscriptionId) -> Self {
+        Self(id)
+    }
+}
+
+impl From<&SubscriptionId> for PoolSubscriptionId {
+    fn from(id: &SubscriptionId) -> Self {
+        Self(id.clone())
+    }
+}
 
 
 impl Default for PoolSubscriptionId {
 impl Default for PoolSubscriptionId {
     fn default() -> Self {
     fn default() -> Self {
-        Self((SubscriptionId::empty(), None))
+        Self(SubscriptionId::empty())
+    }
+}
+
+impl Deref for PoolSubscriptionId {
+    type Target = SubscriptionId;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
     }
     }
 }
 }
 
 
@@ -94,15 +129,17 @@ pub(crate) struct Scheduler {
 }
 }
 
 
 /// Maximum number of subscriptions
 /// Maximum number of subscriptions
-pub const MAX_SUBSCRIPTIONS: usize = 50;
+pub const MAX_ACTIVE_SUBSCRIPTIONS: usize = 5;
 
 
-#[allow(warnings)]
 impl Scheduler {
 impl Scheduler {
     /// Creates a new instance
     /// Creates a new instance
     pub fn new(all_clients: AllClients) -> Self {
     pub fn new(all_clients: AllClients) -> Self {
         Self {
         Self {
             all_clients,
             all_clients,
-            ..Default::default()
+            subscription_queue: Default::default(),
+            active_subscriptions: Default::default(),
+            subscriptions: Default::default(),
+            total_subscriptions: Default::default(),
         }
         }
     }
     }
 
 
@@ -112,7 +149,7 @@ impl Scheduler {
         for subscription in subscriptions.values_mut() {
         for subscription in subscriptions.values_mut() {
             if matches!(
             if matches!(
                 subscription.status,
                 subscription.status,
-                Status::Fetching | Status::Resubscribed | Status::Subscribed,
+                Status::Fetching | Status::Refetching | Status::Subscribed,
             ) {
             ) {
                 if let Ok(active_subscription) = client
                 if let Ok(active_subscription) = client
                     .subscribe(subscription.subscription_request.clone())
                     .subscribe(subscription.subscription_request.clone())
@@ -138,24 +175,36 @@ impl Scheduler {
     ) -> Result<(), Error> {
     ) -> Result<(), Error> {
         match message {
         match message {
             Response::EndOfStoredEvents(subscription_id) => {
             Response::EndOfStoredEvents(subscription_id) => {
-                let subscription_id = PoolSubscriptionId((subscription_id.0, None));
-                let mut subscription = self.subscriptions.write().await;
-                if let Some(s) = subscription.get_mut(&subscription_id) {
-                    let old_status = s.status;
-                    s.status = Status::Subscribed;
-
-                    if old_status == Status::Fetching {
-                        return_to
-                            .try_send((
-                                Response::EndOfStoredEvents(subscription_id.0 .0.into()),
-                                url,
-                            ))
-                            .map_err(|e| Error::InternalChannel(e.to_string()))?;
+                let subscription_id = PoolSubscriptionId(subscription_id.0);
+                let mut subscriptions = self.subscriptions.write().await;
+
+                if let Some(s) = subscriptions.get_mut(&subscription_id) {
+                    s.end_of_stored_events += 1;
+                    if s.end_of_stored_events
+                        >= s.active_subscription
+                            .as_ref()
+                            .map(|x| x.len())
+                            .unwrap_or_default()
+                    {
+                        // all clients have received the EOD
+                        let old_status = s.status;
+                        s.status = Status::Subscribed;
+
+                        self.active_subscription_scheduler();
+
+                        if old_status == Status::Fetching {
+                            return_to
+                                .try_send((
+                                    Response::EndOfStoredEvents(
+                                        s.subscription_request.subscription_id.clone().into(),
+                                    ),
+                                    url,
+                                ))
+                                .map_err(|e| Error::InternalChannel(e.to_string()))?;
+                        }
                     }
                     }
                 }
                 }
 
 
-                self.active_subscription_scheduler();
-
                 Ok(())
                 Ok(())
             }
             }
             any_message => {
             any_message => {
@@ -176,39 +225,60 @@ impl Scheduler {
             let items = subscription_queue.len();
             let items = subscription_queue.len();
 
 
             // A subscription must be descheduled as its place is needed.
             // A subscription must be descheduled as its place is needed.
-            let mut deschedule =
+            let deschedule =
                 |subscriptions: &mut RwLockWriteGuard<
                 |subscriptions: &mut RwLockWriteGuard<
                     '_,
                     '_,
                     BTreeMap<PoolSubscriptionId, SubscriptionInner>,
                     BTreeMap<PoolSubscriptionId, SubscriptionInner>,
                 >,
                 >,
                  subscription_queue: &mut RwLockWriteGuard<'_, VecDeque<PoolSubscriptionId>>|
                  subscription_queue: &mut RwLockWriteGuard<'_, VecDeque<PoolSubscriptionId>>|
                  -> bool {
                  -> bool {
-                    for subscription_id in subscription_queue.iter() {
-                        let mut subscription =
+                    let mut active = 0;
+
+                    for (i, subscription_id) in subscription_queue.iter().enumerate() {
+                        let subscription =
                             if let Some(subscription) = subscriptions.get_mut(subscription_id) {
                             if let Some(subscription) = subscriptions.get_mut(subscription_id) {
                                 subscription
                                 subscription
                             } else {
                             } else {
                                 continue;
                                 continue;
                             };
                             };
 
 
-                        if matches!(subscription.status, Status::Subscribed) {
-                            // unsubscribe
-                            let _ = subscription.active_subscription.take();
-                            // update counter
-                            this.active_subscriptions.fetch_sub(1, Ordering::Relaxed);
-                            // update since for next request
-                            let now = Utc::now();
-                            for filter in subscription.subscription_request.filters.iter_mut() {
-                                filter.since = Some(now);
+                        match subscription.status {
+                            Status::Subscribed => {
+                                // unsubscribe
+                                subscription.status = Status::Requeued;
+                                subscription.active_subscription.take();
+                                // update counter
+                                this.active_subscriptions.fetch_sub(1, Ordering::Relaxed);
+                                // update since for next request
+                                let now = Utc::now();
+                                log::info!(
+                                    "Deschedule subscription {}",
+                                    subscription.subscription_request.subscription_id
+                                );
+                                for filter in subscription.subscription_request.filters.iter_mut() {
+                                    filter.since = Some(now);
+                                }
+                                return true;
                             }
                             }
-                            return true;
+                            Status::Fetching | Status::Refetching => {
+                                active += 1;
+                                if active >= MAX_ACTIVE_SUBSCRIPTIONS {
+                                    log::info!(
+                                        "Breaking after {} attempts (total {})",
+                                        i,
+                                        subscription_queue.len()
+                                    );
+                                    break;
+                                }
+                            }
+                            _ => {}
                         }
                         }
                     }
                     }
 
 
                     false
                     false
                 };
                 };
 
 
-            for _ in (0..items) {
+            for _ in 0..items {
                 let subscription_id = if let Some(subscription_id) = subscription_queue.pop_front()
                 let subscription_id = if let Some(subscription_id) = subscription_queue.pop_front()
                 {
                 {
                     subscription_id
                     subscription_id
@@ -222,13 +292,15 @@ impl Scheduler {
                         continue;
                         continue;
                     };
                     };
 
 
-                // add subscription id back to the last element, to be visited later
-                subscription_queue.push_back(subscription_id.clone());
-
-                let prev_status = subscription.status;
+                if matches!(subscription.status, Status::Fetching | Status::Refetching) {
+                    subscription_queue.push_front(subscription_id.clone());
+                } else {
+                    subscription_queue.push_back(subscription_id.clone());
+                }
 
 
-                if matches!(prev_status, Status::Queued | Status::Requeued) {
-                    if this.active_subscriptions.load(Ordering::SeqCst) >= MAX_SUBSCRIPTIONS
+                if matches!(subscription.status, Status::Queued | Status::Requeued) {
+                    let prev_status = subscription.status;
+                    if this.active_subscriptions.load(Ordering::SeqCst) >= MAX_ACTIVE_SUBSCRIPTIONS
                         && !deschedule(&mut subscriptions, &mut subscription_queue)
                         && !deschedule(&mut subscriptions, &mut subscription_queue)
                     {
                     {
                         subscriptions.insert(subscription_id, subscription);
                         subscriptions.insert(subscription_id, subscription);
@@ -236,6 +308,11 @@ impl Scheduler {
                         break;
                         break;
                     }
                     }
 
 
+                    // This connection is active now (or will be), therefore move it to the front
+                    // of the subscription queue so it can be descheduled first when the times comes
+                    let _ = subscription_queue.pop_back();
+                    subscription_queue.push_front(subscription_id.clone());
+
                     let wait_all = clients
                     let wait_all = clients
                         .values()
                         .values()
                         .map(|(_, sender)| {
                         .map(|(_, sender)| {
@@ -248,12 +325,17 @@ impl Scheduler {
                         .into_iter()
                         .into_iter()
                         .collect::<Result<Vec<_>, _>>()
                         .collect::<Result<Vec<_>, _>>()
                     {
                     {
+                        log::info!(
+                            "Promoting subscription {} to active",
+                            subscription.subscription_request.subscription_id
+                        );
                         subscription.active_subscription = Some(active_subscriptions);
                         subscription.active_subscription = Some(active_subscriptions);
                         subscription.status = if prev_status == Status::Queued {
                         subscription.status = if prev_status == Status::Queued {
                             Status::Fetching
                             Status::Fetching
                         } else {
                         } else {
-                            Status::Resubscribed
+                            Status::Refetching
                         };
                         };
+                        subscription.end_of_stored_events = 0;
 
 
                         this.active_subscriptions.fetch_add(1, Ordering::Relaxed);
                         this.active_subscriptions.fetch_add(1, Ordering::Relaxed);
                     }
                     }
@@ -266,22 +348,22 @@ impl Scheduler {
     }
     }
 
 
     /// Creates a new subscription with a given filters
     /// Creates a new subscription with a given filters
-    pub async fn subcribe(
+    pub async fn subscribe(
         self: &Arc<Self>,
         self: &Arc<Self>,
         subscription_request: subscribe::Subscribe,
         subscription_request: subscribe::Subscribe,
-        specific_url: Option<Url>,
-    ) -> ActiveSubscription {
-        let subscription_id = PoolSubscriptionId((
-            subscription_request.subscription_id.clone(),
-            specific_url.clone(),
-        ));
-
-        self.subscriptions.write().await.insert(
+    ) -> Result<ActiveSubscription, Error> {
+        let mut subscriptions = self.subscriptions.write().await;
+        let subscription_id = PoolSubscriptionId(subscription_request.subscription_id.clone());
+
+        if subscriptions.get(&subscription_id).is_some() {
+            return Err(Error::DuplicateSubscriptionId);
+        }
+
+        subscriptions.insert(
             subscription_id.clone(),
             subscription_id.clone(),
             SubscriptionInner {
             SubscriptionInner {
-                status: Status::Queued,
-                active_subscription: None,
                 subscription_request,
                 subscription_request,
+                ..Default::default()
             },
             },
         );
         );
 
 
@@ -294,22 +376,53 @@ impl Scheduler {
         self.total_subscriptions.fetch_add(1, Ordering::Relaxed);
         self.total_subscriptions.fetch_add(1, Ordering::Relaxed);
         self.active_subscription_scheduler();
         self.active_subscription_scheduler();
 
 
-        ActiveSubscription {
+        Ok(ActiveSubscription {
+            id: subscription_id.clone(),
             unsubscriber: Some((subscription_id, self.clone())),
             unsubscriber: Some((subscription_id, self.clone())),
-        }
+        })
     }
     }
 
 
+    /// Removes a subscription and drop it from the scheduler
     fn remove(self: Arc<Self>, subscription_id: PoolSubscriptionId) {
     fn remove(self: Arc<Self>, subscription_id: PoolSubscriptionId) {
         let this = self;
         let this = self;
         tokio::spawn(async move {
         tokio::spawn(async move {
             let mut subscriptions = this.subscriptions.write().await;
             let mut subscriptions = this.subscriptions.write().await;
-            if let Some(id) = subscriptions.remove(&subscription_id) {
+            this.subscription_queue
+                .write()
+                .await
+                .retain(|x| x != &subscription_id);
+
+            if let Some(sub) = subscriptions.remove(&subscription_id) {
+                log::info!(
+                    "Unsubscribing and dropping from scheduler {}",
+                    subscription_id.0,
+                );
+
                 this.active_subscription_scheduler();
                 this.active_subscription_scheduler();
                 this.total_subscriptions.fetch_sub(1, Ordering::Relaxed);
                 this.total_subscriptions.fetch_sub(1, Ordering::Relaxed);
+                if sub.active_subscription.is_some() {
+                    // it is active
+                    this.active_subscriptions.fetch_sub(1, Ordering::Relaxed);
+                }
             }
             }
         });
         });
     }
     }
 
 
+    /// debug
+    pub async fn debug(&self) -> String {
+        let a = self.subscriptions.read().await;
+        format!(
+            "Active: {} - {}",
+            serde_json::to_string(
+                &a.iter()
+                    .map(|(k, v)| (k.to_string(), format!("{:?}", v.status)))
+                    .collect::<Vec<_>>(),
+            )
+            .unwrap(),
+            a.len(),
+        )
+    }
+
     /// Total number of subscribers
     /// Total number of subscribers
     pub fn total_subscribers(&self) -> usize {
     pub fn total_subscribers(&self) -> usize {
         self.total_subscriptions.load(Ordering::Relaxed)
         self.total_subscriptions.load(Ordering::Relaxed)

+ 22 - 3
crates/relayer/src/connection/local.rs

@@ -4,17 +4,22 @@
 use crate::{connection::ConnectionId, Error, Relayer};
 use crate::{connection::ConnectionId, Error, Relayer};
 use nostr_rs_storage_base::Storage;
 use nostr_rs_storage_base::Storage;
 use nostr_rs_types::{Request, Response};
 use nostr_rs_types::{Request, Response};
-use std::sync::Arc;
-use tokio::sync::mpsc::{Receiver, Sender};
+use std::{sync::Arc, time::Duration};
+use tokio::{
+    sync::mpsc::{Receiver, Sender},
+    task::JoinHandle,
+    time::sleep,
+};
 
 
 /// Local connection
 /// Local connection
 pub struct LocalConnection<T>
 pub struct LocalConnection<T>
 where
 where
     T: Storage + Send + Sync + 'static,
     T: Storage + Send + Sync + 'static,
 {
 {
+    /// The connection ID
+    pub conn_id: ConnectionId,
     sender: Sender<(ConnectionId, Request)>,
     sender: Sender<(ConnectionId, Request)>,
     receiver: Receiver<Response>,
     receiver: Receiver<Response>,
-    pub(crate) conn_id: ConnectionId,
     relayer: Arc<Relayer<T>>,
     relayer: Arc<Relayer<T>>,
 }
 }
 
 
@@ -22,6 +27,10 @@ impl<T> LocalConnection<T>
 where
 where
     T: Storage + Send + Sync + 'static,
     T: Storage + Send + Sync + 'static,
 {
 {
+    /// Number of queued messages
+    pub fn queued_messages(&self) -> usize {
+        self.receiver.len()
+    }
     /// Receive a message from the relayer
     /// Receive a message from the relayer
     pub async fn recv(&mut self) -> Option<Response> {
     pub async fn recv(&mut self) -> Option<Response> {
         self.receiver.recv().await
         self.receiver.recv().await
@@ -32,6 +41,16 @@ where
         self.receiver.try_recv().ok()
         self.receiver.try_recv().ok()
     }
     }
 
 
+    /// Queues sending a message to the relayer in the future time
+    pub fn future_send(&self, request: Request, in_the_future: Duration) -> JoinHandle<()> {
+        let sender = self.sender.clone();
+        let conn_id = self.conn_id.clone();
+        tokio::spawn(async move {
+            sleep(in_the_future).await;
+            let _ = sender.send((conn_id, request)).await;
+        })
+    }
+
     /// Sends a request to the relayer
     /// Sends a request to the relayer
     pub async fn send(&self, request: Request) -> Result<(), Error> {
     pub async fn send(&self, request: Request) -> Result<(), Error> {
         self.sender
         self.sender

+ 6 - 9
crates/relayer/src/connection/mod.rs

@@ -42,14 +42,6 @@ impl ConnectionId {
     pub fn new_empty() -> Self {
     pub fn new_empty() -> Self {
         Self(0)
         Self(0)
     }
     }
-
-    /// Check if the connection id is empty
-    ///
-    /// Empty connection id is used for messages from Client pool to the relayer
-    #[inline]
-    pub fn is_empty(&self) -> bool {
-        self.0 == 0
-    }
 }
 }
 
 
 type CompoundSubcription = (
 type CompoundSubcription = (
@@ -68,7 +60,7 @@ pub struct Connection {
     handler: Option<JoinHandle<()>>,
     handler: Option<JoinHandle<()>>,
 }
 }
 
 
-const MAX_SUBSCRIPTIONS_BUFFER: usize = 100;
+const MAX_SUBSCRIPTIONS_BUFFER: usize = 100_000;
 
 
 impl Drop for Connection {
 impl Drop for Connection {
     fn drop(&mut self) {
     fn drop(&mut self) {
@@ -93,6 +85,11 @@ impl Connection {
         )
         )
     }
     }
 
 
+    /// If this connection is a local connection and not a TCP-extenal connection
+    pub fn is_local_connection(&self) -> bool {
+        self.handler.is_none()
+    }
+
     /// Create new connection
     /// Create new connection
     pub async fn new_connection(
     pub async fn new_connection(
         send_message_to_relayer: Sender<(ConnectionId, Request)>,
         send_message_to_relayer: Sender<(ConnectionId, Request)>,

+ 140 - 91
crates/relayer/src/relayer.rs

@@ -3,7 +3,7 @@ use crate::{
     Connection, Error,
     Connection, Error,
 };
 };
 use futures_util::StreamExt;
 use futures_util::StreamExt;
-use nostr_rs_client::{Error as ClientError, Pool, Url};
+use nostr_rs_client::{pool::subscription::PoolSubscriptionId, Pool, Url};
 use nostr_rs_storage_base::Storage;
 use nostr_rs_storage_base::Storage;
 use nostr_rs_subscription_manager::SubscriptionManager;
 use nostr_rs_subscription_manager::SubscriptionManager;
 use nostr_rs_types::{
 use nostr_rs_types::{
@@ -18,10 +18,12 @@ use std::{
 };
 };
 use tokio::{
 use tokio::{
     net::{TcpListener, TcpStream},
     net::{TcpListener, TcpStream},
-    sync::mpsc::{channel, Receiver, Sender},
-};
-use tokio::{
-    sync::{mpsc, RwLock},
+    sync::{
+        mpsc::{
+            self, {channel, Receiver, Sender},
+        },
+        RwLock,
+    },
     task::JoinHandle,
     task::JoinHandle,
 };
 };
 
 
@@ -47,27 +49,23 @@ pub struct Relayer<T: Storage + Send + Sync + 'static> {
     /// otherwise all the messages are going to be ephemeral, making this
     /// otherwise all the messages are going to be ephemeral, making this
     /// relayer just a dumb proxy (that can be useful for privacy) but it won't
     /// relayer just a dumb proxy (that can be useful for privacy) but it won't
     /// be able to perform any optimization like prefetching content while offline
     /// be able to perform any optimization like prefetching content while offline
-    storage: Option<T>,
+    storage: Arc<Option<T>>,
     /// Subscription manager
     /// Subscription manager
     subscription_manager: Arc<SubscriptionManager<RelayerSubscriptionId, ()>>,
     subscription_manager: Arc<SubscriptionManager<RelayerSubscriptionId, ()>>,
     /// List of all active connections
     /// List of all active connections
-    connections: RwLock<HashMap<ConnectionId, Connection>>,
+    connections: Arc<RwLock<HashMap<ConnectionId, Connection>>>,
     /// This Sender can be used to send requests from anywhere to the relayer.
     /// This Sender can be used to send requests from anywhere to the relayer.
     send_to_relayer: Sender<(ConnectionId, Request)>,
     send_to_relayer: Sender<(ConnectionId, Request)>,
     /// This Receiver is the relayer the way the relayer receives messages
     /// This Receiver is the relayer the way the relayer receives messages
     relayer_receiver: Option<Receiver<(ConnectionId, Request)>>,
     relayer_receiver: Option<Receiver<(ConnectionId, Request)>>,
+
     /// Client pool
     /// Client pool
     ///
     ///
-    /// A relayer can optionally be connected to a pool of clients to get foreign events.
-    client_pool: Option<(Pool, JoinHandle<()>)>,
-}
-
-impl<T: Storage + Send + Sync + 'static> Drop for Relayer<T> {
-    fn drop(&mut self) {
-        if let Some((_, handle)) = self.client_pool.take() {
-            handle.abort();
-        }
-    }
+    /// A relayer can optionally be connected to a pool of clients to get
+    /// foreign events.
+    client_pool: Option<Pool>,
+    client_pool_receiver: Option<Receiver<(Response, Url)>>,
+    client_pool_subscriptions: RwLock<HashMap<PoolSubscriptionId, (SubscriptionId, ConnectionId)>>,
 }
 }
 
 
 impl<T: Storage + Send + Sync + 'static> Relayer<T> {
 impl<T: Storage + Send + Sync + 'static> Relayer<T> {
@@ -80,25 +78,36 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     /// and create a network of relayers, reposting events to them and
     /// and create a network of relayers, reposting events to them and
     /// subscribing to their events.`gqq`
     /// subscribing to their events.`gqq`
     pub fn new(storage: Option<T>, client_pool: Option<Pool>) -> Result<Self, Error> {
     pub fn new(storage: Option<T>, client_pool: Option<Pool>) -> Result<Self, Error> {
-        let (sender, receiver) = channel(100_000);
+        let (relayer_sender, relayer_receiver) = channel(100_000);
+
+        let (client_pool_receiver, client_pool) = if let Some(client_pool) = client_pool {
+            let result = client_pool.split()?;
+            (result.0, Some(result.1))
+        } else {
+            let (_, receiver) = mpsc::channel(1);
+            (receiver, None)
+        };
+
         Ok(Self {
         Ok(Self {
-            storage,
+            storage: Arc::new(storage),
             subscription_manager: Default::default(),
             subscription_manager: Default::default(),
-            send_to_relayer: sender.clone(),
-            relayer_receiver: Some(receiver),
+            send_to_relayer: relayer_sender,
+            relayer_receiver: Some(relayer_receiver),
             connections: Default::default(),
             connections: Default::default(),
-            client_pool: if let Some(client_pool) = client_pool {
-                Some(Self::handle_client_pool(client_pool, sender)?)
-            } else {
-                None
-            },
+            client_pool_receiver: Some(client_pool_receiver),
+            client_pool: client_pool,
+            client_pool_subscriptions: Default::default(),
         })
         })
     }
     }
 
 
     /// Connects to the relayer pool
     /// Connects to the relayer pool
     pub async fn connect_to_relayer(&self, url: Url) -> Result<(), Error> {
     pub async fn connect_to_relayer(&self, url: Url) -> Result<(), Error> {
-        let (client_pool, _) = self.client_pool.as_ref().ok_or(Error::NoClient)?;
-        let _ = client_pool.connect_to(url).await;
+        let _ = self
+            .client_pool
+            .as_ref()
+            .ok_or(Error::NoClient)?
+            .connect_to(url)
+            .await?;
         Ok(())
         Ok(())
     }
     }
 
 
@@ -117,7 +126,12 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     ///
     ///
     /// This function consumes the object and takes the ownership. The returned
     /// This function consumes the object and takes the ownership. The returned
     /// JoinHandle() can be used to stop the main loop
     /// JoinHandle() can be used to stop the main loop
-    pub fn main(self, server: TcpListener) -> Result<(Arc<Self>, JoinHandle<()>), Error> {
+    pub fn main(mut self, server: TcpListener) -> Result<(Arc<Self>, JoinHandle<()>), Error> {
+        let mut client_pool_receiver = self
+            .client_pool_receiver
+            .take()
+            .ok_or(Error::AlreadySplitted)?;
+
         let (this, mut receiver) = self.split()?;
         let (this, mut receiver) = self.split()?;
         let _self = Arc::new(this);
         let _self = Arc::new(this);
         let this = _self.clone();
         let this = _self.clone();
@@ -126,22 +140,36 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
             loop {
             loop {
                 tokio::select! {
                 tokio::select! {
                     Ok((stream, _)) = server.accept() => {
                     Ok((stream, _)) = server.accept() => {
-                        // accept new external connections
+                        // accept new connections
                         let _ = this.add_connection(None, stream).await;
                         let _ = this.add_connection(None, stream).await;
                     },
                     },
-                    Some((conn_id, request)) = receiver.recv() => {
-                        // receive messages from the connection pool
-                        if conn_id.is_empty() {
-                            // message received from client pool
-                            if let Request::Event(event) = request {
+                    Some((response, _)) = client_pool_receiver.recv() => {
+                        // process messages from anothe relayer, broadcast it and store it
+                        match response {
+                            Response::Event(event) => {
+                                // we received a message from the client pool, store it locally
+                                // and re-broadcast it.
                                 let _ = this.broadcast(event.deref()).await;
                                 let _ = this.broadcast(event.deref()).await;
-                                if let Some(storage) = this.storage.as_ref() {
-                                    let _ = storage.store_local_event(&event).await;
-                                }
                             }
                             }
-                            continue;
+                            Response::EndOfStoredEvents(sub) => {
+                                let connections = this.connections.read().await;
+                                let (sub_id, connection) = if let Some((sub_id, conn_id)) = this.client_pool_subscriptions.write().await.remove(&(sub.deref().into())) {
+                                     if let Some(connection) = connections.get(&conn_id) {
+                                        (sub_id, connection)
+                                    } else {
+                                        continue;
+                                    }
+                                } else {
+                                    continue
+                                };
+
+                                let _ = connection.send(Response::EndOfStoredEvents(sub_id.into()));
+                            }
+                            _ => {}
                         }
                         }
-
+                    }
+                    Some((conn_id, request)) = receiver.recv() => {
+                        // receive messages from our clients
                         let connections = this.connections.read().await;
                         let connections = this.connections.read().await;
                         let connection = if let Some(connection) = connections.get(&conn_id) {
                         let connection = if let Some(connection) = connections.get(&conn_id) {
                             connection
                             connection
@@ -162,37 +190,6 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
         Ok((_self, handle))
         Ok((_self, handle))
     }
     }
 
 
-    /// Handle the client pool
-    ///
-    /// Main loop to consume messages from the client pool and broadcast them to the local subscribers
-    fn handle_client_pool(
-        client_pool: Pool,
-        send_message_to_relayer: Sender<(ConnectionId, Request)>,
-    ) -> Result<(Pool, JoinHandle<()>), ClientError> {
-        let (mut receiver, client_pool) = client_pool.split()?;
-
-        let handle = tokio::spawn(async move {
-            loop {
-                if let Some((response, _)) = receiver.recv().await {
-                    match response {
-                        Response::Event(event) => {
-                            let _ = send_message_to_relayer.try_send((
-                                ConnectionId::new_empty(),
-                                Request::Event(event.event.into()),
-                            ));
-                        }
-                        Response::EndOfStoredEvents(_) => {}
-                        x => {
-                            println!("x => {:?}", x);
-                        }
-                    }
-                }
-            }
-        });
-
-        Ok((client_pool, handle))
-    }
-
     /// Returns a reference to the internal database
     /// Returns a reference to the internal database
     pub fn get_db(&self) -> &Option<T> {
     pub fn get_db(&self) -> &Option<T> {
         &self.storage
         &self.storage
@@ -269,7 +266,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                     let _ = storage.store_local_event(&event).await;
                     let _ = storage.store_local_event(&event).await;
                 }
                 }
 
 
-                if let Some((client_pool, _)) = self.client_pool.as_ref() {
+                if let Some(client_pool) = self.client_pool.as_ref() {
                     // pass the event to the pool of clients, so this relayer can relay
                     // pass the event to the pool of clients, so this relayer can relay
                     // their local events to the clients in the network of relayers
                     // their local events to the clients in the network of relayers
                     let _ = client_pool.post(event).await;
                     let _ = client_pool.post(event).await;
@@ -284,11 +281,26 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                 )?;
                 )?;
             }
             }
             Request::Request(request) => {
             Request::Request(request) => {
-                let foreign_subscription = if let Some((client_pool, _)) = self.client_pool.as_ref()
-                {
-                    // pass the subscription request to the pool of clients, so this relayer
-                    // can relay any unknown event to the clients through their subscriptions
-                    Some(client_pool.subscribe(request.clone()).await)
+                let foreign_subscription = if let Some(client_pool) = self.client_pool.as_ref() {
+                    // If this relay is connected to other relays through the
+                    // client pool, create the same subscription in them as
+                    // well, with the main goal of fetching any foreign event
+                    // that matches the requested subscription.
+                    //
+                    // If the this happens, this relay will serve any local
+                    // event that matches, as well any foreign event. Foreign
+                    // events will be stored locally as well if there is a
+                    // storage setup.
+                    let foreign_sub_id = client_pool
+                        .subscribe(request.filters.clone().into())
+                        .await?;
+
+                    self.client_pool_subscriptions.write().await.insert(
+                        foreign_sub_id.clone(),
+                        (request.subscription_id.clone(), connection.get_conn_id()),
+                    );
+
+                    Some(foreign_sub_id)
                 } else {
                 } else {
                     None
                     None
                 };
                 };
@@ -315,8 +327,12 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                     }
                     }
                 }
                 }
 
 
-                let _ = connection
-                    .send(relayer::EndOfStoredEvents(request.subscription_id.clone()).into());
+                if foreign_subscription.is_none() {
+                    // If there is a foreign subscription, we shouldn't send a
+                    // EOS until we have got EOS from all foreign relays
+                    let _ = connection
+                        .send(relayer::EndOfStoredEvents(request.subscription_id.clone()).into());
+                }
 
 
                 connection
                 connection
                     .subscribe(
                     .subscribe(
@@ -343,6 +359,38 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     }
     }
 
 
     #[inline]
     #[inline]
+    /// A non-blocking version of broadcast
+    #[allow(dead_code)]
+    fn broadcast_and_forget(&self, event: Event) {
+        let storage = self.storage.clone();
+        let connections = self.connections.clone();
+        let subscription_manager = self.subscription_manager.clone();
+
+        tokio::spawn(async move {
+            if let Some(storage) = storage.as_ref() {
+                if !storage.store(&event).await.unwrap_or_default() {
+                    return;
+                }
+            }
+
+            let connections = connections.read().await;
+            for RelayerSubscriptionId((sub_id, conn_id)) in
+                subscription_manager.get_subscribers(&event).await
+            {
+                if let Some(connection) = connections.get(&conn_id) {
+                    let _ = connection.send(
+                        relayer::Event {
+                            subscription_id: sub_id,
+                            event: event.clone(),
+                        }
+                        .into(),
+                    );
+                }
+            }
+        });
+    }
+
+    #[inline]
     /// Broadcast a given event to all local subscribers
     /// Broadcast a given event to all local subscribers
     pub async fn broadcast(&self, event: &Event) -> Result<bool, Error> {
     pub async fn broadcast(&self, event: &Event) -> Result<bool, Error> {
         if let Some(storage) = self.storage.as_ref() {
         if let Some(storage) = self.storage.as_ref() {
@@ -520,7 +568,7 @@ mod test {
             "authors": [
             "authors": [
               "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
               "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
             ],
             ],
-            "since": 1681928304
+            "until": 1681928304
           },
           },
           {
           {
             "#p": [
             "#p": [
@@ -533,7 +581,7 @@ mod test {
               7,
               7,
               9735
               9735
             ],
             ],
-            "since": 1681928304
+            "until": 1681928304
           },
           },
           {
           {
             "#p": [
             "#p": [
@@ -1012,18 +1060,19 @@ mod test {
         let (main_client, _main_client_inscope) =
         let (main_client, _main_client_inscope) =
             Pool::new_with_clients(vec![main_relayer]).expect("valid pool");
             Pool::new_with_clients(vec![main_relayer]).expect("valid pool");
 
 
-        let _sub = reader_client.subscribe(Default::default()).await;
+        let _sub = reader_client
+            .subscribe(Default::default())
+            .await
+            .expect("v");
 
 
         sleep(Duration::from_millis(20)).await;
         sleep(Duration::from_millis(20)).await;
 
 
-        for _ in 0..3 {
-            assert!(reader_client
-                .try_recv()
-                .map(|(r, _)| r)
-                .expect("valid message")
-                .as_end_of_stored_events()
-                .is_some());
-        }
+        assert!(reader_client
+            .try_recv()
+            .map(|(r, _)| r)
+            .expect("valid message: step")
+            .as_end_of_stored_events()
+            .is_some());
         assert!(reader_client.try_recv().is_none());
         assert!(reader_client.try_recv().is_none());
 
 
         let account1 = Account::default();
         let account1 = Account::default();
@@ -1083,7 +1132,7 @@ mod test {
         // connected to the main relayer
         // connected to the main relayer
         let (mut main_client, _in_scope) =
         let (mut main_client, _in_scope) =
             Pool::new_with_clients(vec![main_relayer]).expect("valid client");
             Pool::new_with_clients(vec![main_relayer]).expect("valid client");
-        let _sub = main_client.subscribe(Default::default()).await;
+        let _sub = main_client.subscribe(Default::default()).await.expect("v");
 
 
         sleep(Duration::from_millis(10)).await;
         sleep(Duration::from_millis(10)).await;
         assert!(main_client
         assert!(main_client

+ 2 - 2
crates/subscription-manager/src/filter.rs

@@ -134,12 +134,12 @@ impl SortedFilter {
         }
         }
 
 
         if let Some(since) = self.since {
         if let Some(since) = self.since {
-            if event.created_at() < since {
+            if event.created_at() >= since {
                 return false;
                 return false;
             }
             }
         }
         }
         if let Some(until) = self.until {
         if let Some(until) = self.until {
-            if event.created_at() > until {
+            if event.created_at() <= until {
                 return false;
                 return false;
             }
             }
         }
         }