浏览代码

Merge branch 'client-pool-improvements' of cesar/nostr-prototype into main

Cesar Rodas 1 月之前
父节点
当前提交
de127a47ca

+ 65 - 14
crates/client/src/client.rs

@@ -29,6 +29,8 @@ use url::Url;
 
 type Subscriptions = Arc<RwLock<HashMap<SubscriptionId, subscribe::Subscribe>>>;
 
+const MAX_ACTIVE_SUBSCRIPTIONS: usize = 10;
+
 #[derive(Debug)]
 /// Active subscription
 ///
@@ -45,10 +47,10 @@ impl Drop for ActiveSubscription {
         let id = self.id.clone();
         let send_to_socket = self.send_to_socket.clone();
         tokio::spawn(async move {
-            subscriptions.write().await.remove(&id);
             let _ = send_to_socket
-                .send(nostr_rs_types::client::Close(id).into())
+                .send(nostr_rs_types::client::Close(id.clone()).into())
                 .await;
+            subscriptions.write().await.remove(&id);
         });
     }
 }
@@ -126,7 +128,7 @@ impl Client {
         mut send_to_socket: mpsc::Receiver<Request>,
         url: Url,
         is_connected: Arc<AtomicBool>,
-        send_on_connection: Subscriptions,
+        to_resubscribe: Subscriptions,
         filter: F,
     ) -> JoinHandle<()>
     where
@@ -159,16 +161,29 @@ impl Client {
                 log::info!("Connected to {}", url);
                 connection_attempts = 0;
 
-                let mut subscriptions = send_on_connection
+                // Convert all sent subscriptions to a local vector
+                let mut subscriptions = to_resubscribe
                     .read()
                     .await
-                    .iter()
-                    .map(|(sub_id, msg)| {
+                    .values()
+                    .map(|msg| Request::Request(msg.clone()))
+                    .collect::<Vec<_>>();
+
+                // Only keep the ones to be subscribed, moved the rest of the subscriptions to the queue
+                let mut to_subscribe_queue = if subscriptions.len() > MAX_ACTIVE_SUBSCRIPTIONS {
+                    subscriptions.split_off(MAX_ACTIVE_SUBSCRIPTIONS)
+                } else {
+                    vec![]
+                };
+
+                let mut subscriptions = subscriptions
+                    .into_iter()
+                    .map(|msg| {
                         (
-                            sub_id.to_owned(),
-                            serde_json::to_string(&Request::Request(msg.clone()))
-                                .ok()
-                                .map(Message::Text),
+                            msg.as_request()
+                                .map(|x| x.subscription_id.clone())
+                                .unwrap_or_default(),
+                            serde_json::to_string(&msg).ok().map(Message::Text),
                         )
                     })
                     .collect::<HashMap<_, _>>();
@@ -177,6 +192,7 @@ impl Client {
                     if let Some(msg) = msg.take() {
                         if let Err(x) = socket.send(msg).await {
                             log::error!("{}: Reconnecting due error at sending: {:?}", url, x);
+                            break;
                         }
                     }
                 }
@@ -191,9 +207,42 @@ impl Client {
                                     log::warn!("{}: Already subscribed to {}", url, sub.subscription_id);
                                     continue;
                                 }
+                                if subscriptions.len() > MAX_ACTIVE_SUBSCRIPTIONS {
+                                    log::warn!("{}: Queueing subscription to {} for later", url, sub.subscription_id);
+                                    to_subscribe_queue.push(msg.clone());
+                                    continue;
+                                }
+                                subscriptions.insert(sub.subscription_id.clone(), None);
+                            }
+
+
+                            let json = if let Ok(json) =  serde_json::to_string(&msg) {
+                                json
+                            } else {
+                                continue;
+                            };
+
+                            if let Err(x) = socket.send(Message::Text(json)).await {
+                                log::error!("{} : Reconnecting due {}", url, x);
+                                break;
                             }
-                            if let Ok(json) = serde_json::to_string(&msg) {
-                                log::info!("{}: Sending {}", url, json);
+
+                            if let Request::Close(close) = &msg {
+                                subscriptions.remove(&close.0);
+                                let json = if let Some(json) = to_subscribe_queue
+                                    .pop()
+                                    .and_then(|msg| {
+                                        subscriptions.insert(msg.as_request().map(|sub| sub.subscription_id.clone()).unwrap_or_default(), None);
+                                        serde_json::to_string(&msg).ok()
+                                    })
+                                    {
+                                        json
+                                    } else {
+                                        continue;
+                                    };
+
+
+                                log::info!("Sending: {} (queued subscription)", json);
                                 if let Err(x) = socket.send(Message::Text(json)).await {
                                     log::error!("{} : Reconnecting due {}", url, x);
                                     break;
@@ -225,10 +274,12 @@ impl Client {
                                 continue;
                             }
 
-                            log::info!("New message: {}", msg);
-
                             let event: Result<Response, _> = serde_json::from_str(&msg);
 
+                            if let Ok(Response::Notice(err)) = &event {
+                                log::error!("{}: Active connections {}: {:?}", url, subscriptions.len(), err);
+                            }
+
                             if let Ok(msg) = event {
                                 if let Err(error) = filter(msg, url.clone(), return_to.clone()).await {
                                     log::error!("{}: Reconnecting client because of {}", url, error);

+ 4 - 0
crates/client/src/error.rs

@@ -10,6 +10,10 @@ pub enum Error {
     #[error("Url: {0}")]
     Url(#[from] url::ParseError),
 
+    /// Subscriptions must be unique
+    #[error("Duplicate subscription")]
+    DuplicateSubscriptionId,
+
     /// WebSocket client error
     #[error("Tungstenite: {0}")]
     Tungstenite(#[from] TungsteniteError),

+ 445 - 13
crates/client/src/pool/mod.rs

@@ -13,9 +13,13 @@ use std::{
         atomic::{AtomicUsize, Ordering},
         Arc,
     },
+    time::Duration,
 };
 use subscription::Scheduler;
-use tokio::sync::{mpsc, RwLock};
+use tokio::{
+    sync::{mpsc, RwLock},
+    time::sleep,
+};
 use url::Url;
 
 pub mod subscription;
@@ -81,6 +85,14 @@ impl Pool {
         let pool = Self::default();
         let connect = clients.into_iter().map(|url| pool.connect_to(url));
 
+        let x = pool.subscription_manager.clone();
+        tokio::spawn(async move {
+            loop {
+                log::info!("Active subscribers: {}", x.debug().await);
+                sleep(Duration::from_secs(5)).await;
+            }
+        });
+
         futures::executor::block_on(async {
             futures::future::join_all(connect)
                 .await
@@ -114,8 +126,8 @@ impl Pool {
     pub async fn subscribe(
         &self,
         subscription: subscribe::Subscribe,
-    ) -> subscription::ActiveSubscription {
-        self.subscription_manager.subcribe(subscription, None).await
+    ) -> Result<subscription::ActiveSubscription, Error> {
+        self.subscription_manager.subscribe(subscription).await
     }
 
     /// Sends a request to all the connected relayers
@@ -187,20 +199,25 @@ mod test {
     use super::*;
     use nostr_rs_memory::Memory;
     use nostr_rs_relayer::Relayer;
-    use nostr_rs_types::{account::Account, types::Content};
+    use nostr_rs_types::{
+        account::Account,
+        types::{Content, Filter},
+    };
     use std::time::Duration;
+    use subscription::MAX_ACTIVE_SUBSCRIPTIONS;
     use tokio::{net::TcpListener, task::JoinHandle, time::sleep};
 
-    async fn dummy_server(port: u16) -> (Url, JoinHandle<()>) {
+    async fn dummy_server(port: u16) -> (Url, Arc<Relayer<Memory>>, JoinHandle<()>) {
         let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
             .await
             .unwrap();
         let local_addr = listener.local_addr().expect("addr");
 
         let relayer = Relayer::new(Some(Memory::default()), None).expect("valid dummy server");
-        let (_, stopper) = relayer.main(listener).expect("valid main loop");
+        let (relayer, stopper) = relayer.main(listener).expect("valid main loop");
         (
             Url::parse(&format!("ws://{}", local_addr)).expect("valid url"),
+            relayer,
             stopper,
         )
     }
@@ -208,7 +225,10 @@ mod test {
     #[tokio::test]
     async fn droppable_subscription() {
         let client_pool = Pool::default();
-        let subscription = client_pool.subscribe(Default::default()).await;
+        let subscription = client_pool
+            .subscribe(Default::default())
+            .await
+            .expect("valid");
 
         assert_eq!(client_pool.active_subscriptions().await, 1);
         drop(subscription);
@@ -218,7 +238,7 @@ mod test {
 
     #[tokio::test]
     async fn connect_to_dummy_server() {
-        let (addr, stopper) = dummy_server(0).await;
+        let (addr, _, stopper) = dummy_server(0).await;
         let (client_pool, _connections) = Pool::new_with_clients(vec![addr]).expect("valid pool");
 
         assert_eq!(0, client_pool.check_active_connections().await);
@@ -235,7 +255,7 @@ mod test {
 
     #[tokio::test]
     async fn two_clients_communication() {
-        let (addr, _) = dummy_server(0).await;
+        let (addr, _, _) = dummy_server(0).await;
         let (mut client_pool1, _c1) =
             Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
         let (client_pool2, _c2) = Pool::new_with_clients(vec![addr]).expect("valid pool");
@@ -273,7 +293,7 @@ mod test {
 
     #[tokio::test]
     async fn reconnect_and_resubscribe() {
-        let (addr, stopper) = dummy_server(0).await;
+        let (addr, _, stopper) = dummy_server(0).await;
         let (mut client_pool1, _c1) =
             Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
         let (client_pool2, _c2) = Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
@@ -318,7 +338,7 @@ mod test {
         assert_eq!(0, client_pool1.check_active_connections().await);
         assert_eq!(0, client_pool2.check_active_connections().await);
 
-        let (_, stopper) = dummy_server(addr.port().expect("port")).await;
+        let (_, _, stopper) = dummy_server(addr.port().expect("port")).await;
 
         sleep(Duration::from_millis(2_000)).await;
 
@@ -347,8 +367,8 @@ mod test {
 
     #[tokio::test]
     async fn connect_multiple_servers() {
-        let (addr1, _) = dummy_server(0).await;
-        let (addr2, _) = dummy_server(0).await;
+        let (addr1, _, _) = dummy_server(0).await;
+        let (addr2, _, _) = dummy_server(0).await;
         let (mut client_pool1, _c1) =
             Pool::new_with_clients(vec![addr1.clone(), addr2]).expect("valid pool");
         let (client_pool2, _c2) = Pool::new_with_clients(vec![addr1]).expect("valid pool");
@@ -382,4 +402,416 @@ mod test {
         );
         assert!(client_pool1.try_recv().is_none());
     }
+
+    /// Client pool user creates 101 subscription (that is not allowed by many
+    /// relays), so the pool will do a round-robin subscriptions keeping 5
+    /// actives at a time with an internal scheduler.
+    ///
+    /// The scheduler will pause active subscriptions when the EOS is received.
+    /// On the next round `since` will be used to receive only newer events
+    mod scheduler {
+        use super::*;
+
+        #[tokio::test]
+        async fn stored_first() {
+            let (addr1, relayer, _) = dummy_server(0).await;
+
+            let (mut client_pool1, _c1) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+            let (client_pool2, _c2) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+
+            let account1 = Account::default();
+            let signed_content = account1
+                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+                .expect("valid signed content");
+            client_pool2.post(signed_content.clone().into()).await;
+
+            let first_sub = client_pool1
+                .subscribe(
+                    Filter {
+                        ids: vec![signed_content.id.clone()],
+                        ..Default::default()
+                    }
+                    .into(),
+                )
+                .await
+                .expect("valid subs, last");
+
+            let subs = join_all(
+                (0..100)
+                    .into_iter()
+                    .map(|_| {
+                        client_pool1.subscribe(
+                            Filter {
+                                authors: vec![
+                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
+                                    .parse()
+                                    .unwrap(),
+                            ],
+                                ..Default::default()
+                            }
+                            .into(),
+                        )
+                    })
+                    .collect::<Vec<_>>(),
+            )
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid 100 dummy subs");
+
+            for _ in 1..10 {
+                sleep(Duration::from_millis(10)).await;
+                assert!(
+                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
+                    "total subs {}",
+                    relayer.total_subscribers()
+                );
+            }
+
+            sleep(Duration::from_secs(1)).await;
+
+            let mut has_receive_event = false;
+            for _ in 0..102 {
+                let event = client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid event");
+
+                if has_receive_event {
+                    assert!(event.as_end_of_stored_events().is_some());
+                } else {
+                    if let Some(ev) = event.as_event() {
+                        assert_eq!(ev.id, signed_content.id);
+                        has_receive_event = true;
+                    } else {
+                        assert!(event.as_end_of_stored_events().is_some());
+                    }
+                }
+            }
+
+            drop(subs);
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                1,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+
+            drop(first_sub);
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                0,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+        }
+
+        #[tokio::test]
+        async fn stored_last() {
+            let (addr1, relayer, _) = dummy_server(0).await;
+
+            let (mut client_pool1, _c1) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+            let (client_pool2, _c2) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+
+            let account1 = Account::default();
+            let signed_content = account1
+                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+                .expect("valid signed content");
+            client_pool2.post(signed_content.clone().into()).await;
+
+            let mut subs = join_all(
+                (0..100)
+                    .into_iter()
+                    .map(|_| {
+                        client_pool1.subscribe(
+                            Filter {
+                                authors: vec![
+                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
+                                    .parse()
+                                    .unwrap(),
+                            ],
+                                ..Default::default()
+                            }
+                            .into(),
+                        )
+                    })
+                    .collect::<Vec<_>>(),
+            )
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid 100 dummy subs");
+
+            for _ in 1..10 {
+                sleep(Duration::from_millis(10)).await;
+                assert!(
+                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
+                    "total subs {}",
+                    relayer.total_subscribers()
+                );
+            }
+
+            for _ in 0..100 {
+                assert!(client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_end_of_stored_events()
+                    .is_some());
+            }
+
+            assert!(client_pool1.try_recv().is_none());
+
+            subs.push(
+                client_pool1
+                    .subscribe(
+                        Filter {
+                            ids: vec![signed_content.id.clone()],
+                            ..Default::default()
+                        }
+                        .into(),
+                    )
+                    .await
+                    .expect("valid subs, last"),
+            );
+
+            sleep(Duration::from_secs(1)).await;
+
+            assert_eq!(
+                client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_event()
+                    .expect("valid event")
+                    .id,
+                signed_content.id
+            );
+
+            drop(subs);
+
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                0,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+        }
+
+        #[tokio::test]
+        async fn realtime_first() {
+            let (addr1, relayer, _) = dummy_server(0).await;
+
+            let (mut client_pool1, _c1) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+            let (client_pool2, _c2) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+
+            let account1 = Account::default();
+            let signed_content = account1
+                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+                .expect("valid signed content");
+
+            let first_sub = client_pool1
+                .subscribe(
+                    Filter {
+                        ids: vec![signed_content.id.clone()],
+                        ..Default::default()
+                    }
+                    .into(),
+                )
+                .await
+                .expect("valid subs, first");
+
+            let subs = join_all(
+                (0..100)
+                    .into_iter()
+                    .map(|_| {
+                        client_pool1.subscribe(
+                            Filter {
+                                authors: vec![
+                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
+                                    .parse()
+                                    .unwrap(),
+                            ],
+                                ..Default::default()
+                            }
+                            .into(),
+                        )
+                    })
+                    .collect::<Vec<_>>(),
+            )
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid 100 dummy subs");
+
+            for _ in 1..10 {
+                sleep(Duration::from_millis(10)).await;
+                assert!(
+                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
+                    "total subs {}",
+                    relayer.total_subscribers()
+                );
+            }
+
+            for _ in 0..101 {
+                assert!(client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_end_of_stored_events()
+                    .is_some());
+            }
+
+            assert!(client_pool1.try_recv().is_none());
+
+            client_pool2.post(signed_content.clone().into()).await;
+
+            sleep(Duration::from_secs(1)).await;
+
+            assert_eq!(
+                client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_event()
+                    .expect("valid event")
+                    .id,
+                signed_content.id
+            );
+
+            drop(subs);
+
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                1,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+
+            drop(first_sub);
+
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                0,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+        }
+
+        #[tokio::test]
+        async fn realtime_last() {
+            let (addr1, relayer, _) = dummy_server(0).await;
+
+            let (mut client_pool1, _c1) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+            let (client_pool2, _c2) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+
+            let account1 = Account::default();
+            let signed_content = account1
+                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+                .expect("valid signed content");
+
+            let mut subs = join_all(
+                (0..100)
+                    .into_iter()
+                    .map(|_| {
+                        client_pool1.subscribe(
+                            Filter {
+                                authors: vec![
+                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
+                                    .parse()
+                                    .unwrap(),
+                            ],
+                                ..Default::default()
+                            }
+                            .into(),
+                        )
+                    })
+                    .collect::<Vec<_>>(),
+            )
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid 100 dummy subs");
+
+            for _ in 1..10 {
+                sleep(Duration::from_millis(10)).await;
+                assert!(
+                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
+                    "total subs {}",
+                    relayer.total_subscribers()
+                );
+            }
+
+            for _ in 0..100 {
+                assert!(client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_end_of_stored_events()
+                    .is_some());
+            }
+
+            assert!(client_pool1.try_recv().is_none());
+
+            subs.push(
+                client_pool1
+                    .subscribe(
+                        Filter {
+                            ids: vec![signed_content.id.clone()],
+                            ..Default::default()
+                        }
+                        .into(),
+                    )
+                    .await
+                    .expect("valid subs, last"),
+            );
+
+            sleep(Duration::from_secs(1)).await;
+
+            assert!(client_pool1
+                .try_recv()
+                .map(|(r, _)| r)
+                .expect("valid message")
+                .as_end_of_stored_events()
+                .is_some());
+
+            client_pool2.post(signed_content.clone().into()).await;
+
+            sleep(Duration::from_secs(1)).await;
+
+            assert_eq!(
+                client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_event()
+                    .expect("valid event")
+                    .id,
+                signed_content.id
+            );
+
+            drop(subs);
+
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                0,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+        }
+    }
 }

+ 173 - 60
crates/client/src/pool/subscription.rs

@@ -10,6 +10,7 @@ use nostr_rs_types::{
 };
 use std::{
     collections::{BTreeMap, VecDeque},
+    ops::Deref,
     sync::{
         atomic::{AtomicUsize, Ordering},
         Arc,
@@ -27,23 +28,28 @@ pub enum Status {
     /// Subscribed is active and it is fetching previous records and no EOD has
     /// been received
     Fetching,
+    /// Refetching, like fetching but the EOD is ignored and nore relayed to
+    /// the listeners, since this is not the first the this subscription has
+    /// been created and it will be rotated soon
+    Refetching,
     /// Subscription is listening, an EOD has been received. This state can be
     /// Requeued is their spot is needed for other subscriptions
     Subscribed,
     /// Waiting to be subscribed again
     Requeued,
-    /// Resubscribed, like subscribed but the EOD is ignored and nore relayed to
-    /// the listeners, since this is not the first the this subscription has
-    /// been created and it will be rotated soon
-    Resubscribed,
 }
 
 #[derive(Debug, Default)]
 struct SubscriptionInner {
     /// Active subscription (in the client side), when this is Drop all clients unsubscribes
     active_subscription: Option<Vec<client::ActiveSubscription>>,
+
+    /// Keep track of the number of EOD received
+    end_of_stored_events: usize,
+
     /// Subscription status
     status: Status,
+
     /// raw request
     subscription_request: subscribe::Subscribe,
 }
@@ -55,9 +61,18 @@ struct SubscriptionInner {
 ///
 /// This must be dropped to unsubscribe from the subscription manager
 pub struct ActiveSubscription {
+    id: PoolSubscriptionId,
     unsubscriber: Option<(PoolSubscriptionId, Arc<Scheduler>)>,
 }
 
+impl Deref for ActiveSubscription {
+    type Target = PoolSubscriptionId;
+
+    fn deref(&self) -> &Self::Target {
+        &self.id
+    }
+}
+
 impl Drop for ActiveSubscription {
     fn drop(&mut self) {
         if let Some((id, scheduler)) = self.unsubscriber.take() {
@@ -68,11 +83,31 @@ impl Drop for ActiveSubscription {
 
 /// Pool subscription ID
 #[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
-pub struct PoolSubscriptionId((SubscriptionId, Option<Url>));
+pub struct PoolSubscriptionId(SubscriptionId);
+
+impl From<SubscriptionId> for PoolSubscriptionId {
+    fn from(id: SubscriptionId) -> Self {
+        Self(id)
+    }
+}
+
+impl From<&SubscriptionId> for PoolSubscriptionId {
+    fn from(id: &SubscriptionId) -> Self {
+        Self(id.clone())
+    }
+}
 
 impl Default for PoolSubscriptionId {
     fn default() -> Self {
-        Self((SubscriptionId::empty(), None))
+        Self(SubscriptionId::empty())
+    }
+}
+
+impl Deref for PoolSubscriptionId {
+    type Target = SubscriptionId;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
     }
 }
 
@@ -94,15 +129,17 @@ pub(crate) struct Scheduler {
 }
 
 /// Maximum number of subscriptions
-pub const MAX_SUBSCRIPTIONS: usize = 50;
+pub const MAX_ACTIVE_SUBSCRIPTIONS: usize = 5;
 
-#[allow(warnings)]
 impl Scheduler {
     /// Creates a new instance
     pub fn new(all_clients: AllClients) -> Self {
         Self {
             all_clients,
-            ..Default::default()
+            subscription_queue: Default::default(),
+            active_subscriptions: Default::default(),
+            subscriptions: Default::default(),
+            total_subscriptions: Default::default(),
         }
     }
 
@@ -112,7 +149,7 @@ impl Scheduler {
         for subscription in subscriptions.values_mut() {
             if matches!(
                 subscription.status,
-                Status::Fetching | Status::Resubscribed | Status::Subscribed,
+                Status::Fetching | Status::Refetching | Status::Subscribed,
             ) {
                 if let Ok(active_subscription) = client
                     .subscribe(subscription.subscription_request.clone())
@@ -138,24 +175,36 @@ impl Scheduler {
     ) -> Result<(), Error> {
         match message {
             Response::EndOfStoredEvents(subscription_id) => {
-                let subscription_id = PoolSubscriptionId((subscription_id.0, None));
-                let mut subscription = self.subscriptions.write().await;
-                if let Some(s) = subscription.get_mut(&subscription_id) {
-                    let old_status = s.status;
-                    s.status = Status::Subscribed;
-
-                    if old_status == Status::Fetching {
-                        return_to
-                            .try_send((
-                                Response::EndOfStoredEvents(subscription_id.0 .0.into()),
-                                url,
-                            ))
-                            .map_err(|e| Error::InternalChannel(e.to_string()))?;
+                let subscription_id = PoolSubscriptionId(subscription_id.0);
+                let mut subscriptions = self.subscriptions.write().await;
+
+                if let Some(s) = subscriptions.get_mut(&subscription_id) {
+                    s.end_of_stored_events += 1;
+                    if s.end_of_stored_events
+                        >= s.active_subscription
+                            .as_ref()
+                            .map(|x| x.len())
+                            .unwrap_or_default()
+                    {
+                        // all clients have received the EOD
+                        let old_status = s.status;
+                        s.status = Status::Subscribed;
+
+                        self.active_subscription_scheduler();
+
+                        if old_status == Status::Fetching {
+                            return_to
+                                .try_send((
+                                    Response::EndOfStoredEvents(
+                                        s.subscription_request.subscription_id.clone().into(),
+                                    ),
+                                    url,
+                                ))
+                                .map_err(|e| Error::InternalChannel(e.to_string()))?;
+                        }
                     }
                 }
 
-                self.active_subscription_scheduler();
-
                 Ok(())
             }
             any_message => {
@@ -176,39 +225,60 @@ impl Scheduler {
             let items = subscription_queue.len();
 
             // A subscription must be descheduled as its place is needed.
-            let mut deschedule =
+            let deschedule =
                 |subscriptions: &mut RwLockWriteGuard<
                     '_,
                     BTreeMap<PoolSubscriptionId, SubscriptionInner>,
                 >,
                  subscription_queue: &mut RwLockWriteGuard<'_, VecDeque<PoolSubscriptionId>>|
                  -> bool {
-                    for subscription_id in subscription_queue.iter() {
-                        let mut subscription =
+                    let mut active = 0;
+
+                    for (i, subscription_id) in subscription_queue.iter().enumerate() {
+                        let subscription =
                             if let Some(subscription) = subscriptions.get_mut(subscription_id) {
                                 subscription
                             } else {
                                 continue;
                             };
 
-                        if matches!(subscription.status, Status::Subscribed) {
-                            // unsubscribe
-                            let _ = subscription.active_subscription.take();
-                            // update counter
-                            this.active_subscriptions.fetch_sub(1, Ordering::Relaxed);
-                            // update since for next request
-                            let now = Utc::now();
-                            for filter in subscription.subscription_request.filters.iter_mut() {
-                                filter.since = Some(now);
+                        match subscription.status {
+                            Status::Subscribed => {
+                                // unsubscribe
+                                subscription.status = Status::Requeued;
+                                subscription.active_subscription.take();
+                                // update counter
+                                this.active_subscriptions.fetch_sub(1, Ordering::Relaxed);
+                                // update since for next request
+                                let now = Utc::now();
+                                log::info!(
+                                    "Deschedule subscription {}",
+                                    subscription.subscription_request.subscription_id
+                                );
+                                for filter in subscription.subscription_request.filters.iter_mut() {
+                                    filter.since = Some(now);
+                                }
+                                return true;
                             }
-                            return true;
+                            Status::Fetching | Status::Refetching => {
+                                active += 1;
+                                if active >= MAX_ACTIVE_SUBSCRIPTIONS {
+                                    log::info!(
+                                        "Breaking after {} attempts (total {})",
+                                        i,
+                                        subscription_queue.len()
+                                    );
+                                    break;
+                                }
+                            }
+                            _ => {}
                         }
                     }
 
                     false
                 };
 
-            for _ in (0..items) {
+            for _ in 0..items {
                 let subscription_id = if let Some(subscription_id) = subscription_queue.pop_front()
                 {
                     subscription_id
@@ -222,13 +292,15 @@ impl Scheduler {
                         continue;
                     };
 
-                // add subscription id back to the last element, to be visited later
-                subscription_queue.push_back(subscription_id.clone());
-
-                let prev_status = subscription.status;
+                if matches!(subscription.status, Status::Fetching | Status::Refetching) {
+                    subscription_queue.push_front(subscription_id.clone());
+                } else {
+                    subscription_queue.push_back(subscription_id.clone());
+                }
 
-                if matches!(prev_status, Status::Queued | Status::Requeued) {
-                    if this.active_subscriptions.load(Ordering::SeqCst) >= MAX_SUBSCRIPTIONS
+                if matches!(subscription.status, Status::Queued | Status::Requeued) {
+                    let prev_status = subscription.status;
+                    if this.active_subscriptions.load(Ordering::SeqCst) >= MAX_ACTIVE_SUBSCRIPTIONS
                         && !deschedule(&mut subscriptions, &mut subscription_queue)
                     {
                         subscriptions.insert(subscription_id, subscription);
@@ -236,6 +308,11 @@ impl Scheduler {
                         break;
                     }
 
+                    // This connection is active now (or will be), therefore move it to the front
+                    // of the subscription queue so it can be descheduled first when the times comes
+                    let _ = subscription_queue.pop_back();
+                    subscription_queue.push_front(subscription_id.clone());
+
                     let wait_all = clients
                         .values()
                         .map(|(_, sender)| {
@@ -248,12 +325,17 @@ impl Scheduler {
                         .into_iter()
                         .collect::<Result<Vec<_>, _>>()
                     {
+                        log::info!(
+                            "Promoting subscription {} to active",
+                            subscription.subscription_request.subscription_id
+                        );
                         subscription.active_subscription = Some(active_subscriptions);
                         subscription.status = if prev_status == Status::Queued {
                             Status::Fetching
                         } else {
-                            Status::Resubscribed
+                            Status::Refetching
                         };
+                        subscription.end_of_stored_events = 0;
 
                         this.active_subscriptions.fetch_add(1, Ordering::Relaxed);
                     }
@@ -266,22 +348,22 @@ impl Scheduler {
     }
 
     /// Creates a new subscription with a given filters
-    pub async fn subcribe(
+    pub async fn subscribe(
         self: &Arc<Self>,
         subscription_request: subscribe::Subscribe,
-        specific_url: Option<Url>,
-    ) -> ActiveSubscription {
-        let subscription_id = PoolSubscriptionId((
-            subscription_request.subscription_id.clone(),
-            specific_url.clone(),
-        ));
-
-        self.subscriptions.write().await.insert(
+    ) -> Result<ActiveSubscription, Error> {
+        let mut subscriptions = self.subscriptions.write().await;
+        let subscription_id = PoolSubscriptionId(subscription_request.subscription_id.clone());
+
+        if subscriptions.get(&subscription_id).is_some() {
+            return Err(Error::DuplicateSubscriptionId);
+        }
+
+        subscriptions.insert(
             subscription_id.clone(),
             SubscriptionInner {
-                status: Status::Queued,
-                active_subscription: None,
                 subscription_request,
+                ..Default::default()
             },
         );
 
@@ -294,22 +376,53 @@ impl Scheduler {
         self.total_subscriptions.fetch_add(1, Ordering::Relaxed);
         self.active_subscription_scheduler();
 
-        ActiveSubscription {
+        Ok(ActiveSubscription {
+            id: subscription_id.clone(),
             unsubscriber: Some((subscription_id, self.clone())),
-        }
+        })
     }
 
+    /// Removes a subscription and drop it from the scheduler
     fn remove(self: Arc<Self>, subscription_id: PoolSubscriptionId) {
         let this = self;
         tokio::spawn(async move {
             let mut subscriptions = this.subscriptions.write().await;
-            if let Some(id) = subscriptions.remove(&subscription_id) {
+            this.subscription_queue
+                .write()
+                .await
+                .retain(|x| x != &subscription_id);
+
+            if let Some(sub) = subscriptions.remove(&subscription_id) {
+                log::info!(
+                    "Unsubscribing and dropping from scheduler {}",
+                    subscription_id.0,
+                );
+
                 this.active_subscription_scheduler();
                 this.total_subscriptions.fetch_sub(1, Ordering::Relaxed);
+                if sub.active_subscription.is_some() {
+                    // it is active
+                    this.active_subscriptions.fetch_sub(1, Ordering::Relaxed);
+                }
             }
         });
     }
 
+    /// debug
+    pub async fn debug(&self) -> String {
+        let a = self.subscriptions.read().await;
+        format!(
+            "Active: {} - {}",
+            serde_json::to_string(
+                &a.iter()
+                    .map(|(k, v)| (k.to_string(), format!("{:?}", v.status)))
+                    .collect::<Vec<_>>(),
+            )
+            .unwrap(),
+            a.len(),
+        )
+    }
+
     /// Total number of subscribers
     pub fn total_subscribers(&self) -> usize {
         self.total_subscriptions.load(Ordering::Relaxed)

+ 22 - 3
crates/relayer/src/connection/local.rs

@@ -4,17 +4,22 @@
 use crate::{connection::ConnectionId, Error, Relayer};
 use nostr_rs_storage_base::Storage;
 use nostr_rs_types::{Request, Response};
-use std::sync::Arc;
-use tokio::sync::mpsc::{Receiver, Sender};
+use std::{sync::Arc, time::Duration};
+use tokio::{
+    sync::mpsc::{Receiver, Sender},
+    task::JoinHandle,
+    time::sleep,
+};
 
 /// Local connection
 pub struct LocalConnection<T>
 where
     T: Storage + Send + Sync + 'static,
 {
+    /// The connection ID
+    pub conn_id: ConnectionId,
     sender: Sender<(ConnectionId, Request)>,
     receiver: Receiver<Response>,
-    pub(crate) conn_id: ConnectionId,
     relayer: Arc<Relayer<T>>,
 }
 
@@ -22,6 +27,10 @@ impl<T> LocalConnection<T>
 where
     T: Storage + Send + Sync + 'static,
 {
+    /// Number of queued messages
+    pub fn queued_messages(&self) -> usize {
+        self.receiver.len()
+    }
     /// Receive a message from the relayer
     pub async fn recv(&mut self) -> Option<Response> {
         self.receiver.recv().await
@@ -32,6 +41,16 @@ where
         self.receiver.try_recv().ok()
     }
 
+    /// Queues sending a message to the relayer in the future time
+    pub fn future_send(&self, request: Request, in_the_future: Duration) -> JoinHandle<()> {
+        let sender = self.sender.clone();
+        let conn_id = self.conn_id.clone();
+        tokio::spawn(async move {
+            sleep(in_the_future).await;
+            let _ = sender.send((conn_id, request)).await;
+        })
+    }
+
     /// Sends a request to the relayer
     pub async fn send(&self, request: Request) -> Result<(), Error> {
         self.sender

+ 6 - 9
crates/relayer/src/connection/mod.rs

@@ -42,14 +42,6 @@ impl ConnectionId {
     pub fn new_empty() -> Self {
         Self(0)
     }
-
-    /// Check if the connection id is empty
-    ///
-    /// Empty connection id is used for messages from Client pool to the relayer
-    #[inline]
-    pub fn is_empty(&self) -> bool {
-        self.0 == 0
-    }
 }
 
 type CompoundSubcription = (
@@ -68,7 +60,7 @@ pub struct Connection {
     handler: Option<JoinHandle<()>>,
 }
 
-const MAX_SUBSCRIPTIONS_BUFFER: usize = 100;
+const MAX_SUBSCRIPTIONS_BUFFER: usize = 100_000;
 
 impl Drop for Connection {
     fn drop(&mut self) {
@@ -93,6 +85,11 @@ impl Connection {
         )
     }
 
+    /// If this connection is a local connection and not a TCP-extenal connection
+    pub fn is_local_connection(&self) -> bool {
+        self.handler.is_none()
+    }
+
     /// Create new connection
     pub async fn new_connection(
         send_message_to_relayer: Sender<(ConnectionId, Request)>,

+ 140 - 91
crates/relayer/src/relayer.rs

@@ -3,7 +3,7 @@ use crate::{
     Connection, Error,
 };
 use futures_util::StreamExt;
-use nostr_rs_client::{Error as ClientError, Pool, Url};
+use nostr_rs_client::{pool::subscription::PoolSubscriptionId, Pool, Url};
 use nostr_rs_storage_base::Storage;
 use nostr_rs_subscription_manager::SubscriptionManager;
 use nostr_rs_types::{
@@ -18,10 +18,12 @@ use std::{
 };
 use tokio::{
     net::{TcpListener, TcpStream},
-    sync::mpsc::{channel, Receiver, Sender},
-};
-use tokio::{
-    sync::{mpsc, RwLock},
+    sync::{
+        mpsc::{
+            self, {channel, Receiver, Sender},
+        },
+        RwLock,
+    },
     task::JoinHandle,
 };
 
@@ -47,27 +49,23 @@ pub struct Relayer<T: Storage + Send + Sync + 'static> {
     /// otherwise all the messages are going to be ephemeral, making this
     /// relayer just a dumb proxy (that can be useful for privacy) but it won't
     /// be able to perform any optimization like prefetching content while offline
-    storage: Option<T>,
+    storage: Arc<Option<T>>,
     /// Subscription manager
     subscription_manager: Arc<SubscriptionManager<RelayerSubscriptionId, ()>>,
     /// List of all active connections
-    connections: RwLock<HashMap<ConnectionId, Connection>>,
+    connections: Arc<RwLock<HashMap<ConnectionId, Connection>>>,
     /// This Sender can be used to send requests from anywhere to the relayer.
     send_to_relayer: Sender<(ConnectionId, Request)>,
     /// This Receiver is the relayer the way the relayer receives messages
     relayer_receiver: Option<Receiver<(ConnectionId, Request)>>,
+
     /// Client pool
     ///
-    /// A relayer can optionally be connected to a pool of clients to get foreign events.
-    client_pool: Option<(Pool, JoinHandle<()>)>,
-}
-
-impl<T: Storage + Send + Sync + 'static> Drop for Relayer<T> {
-    fn drop(&mut self) {
-        if let Some((_, handle)) = self.client_pool.take() {
-            handle.abort();
-        }
-    }
+    /// A relayer can optionally be connected to a pool of clients to get
+    /// foreign events.
+    client_pool: Option<Pool>,
+    client_pool_receiver: Option<Receiver<(Response, Url)>>,
+    client_pool_subscriptions: RwLock<HashMap<PoolSubscriptionId, (SubscriptionId, ConnectionId)>>,
 }
 
 impl<T: Storage + Send + Sync + 'static> Relayer<T> {
@@ -80,25 +78,36 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     /// and create a network of relayers, reposting events to them and
     /// subscribing to their events.`gqq`
     pub fn new(storage: Option<T>, client_pool: Option<Pool>) -> Result<Self, Error> {
-        let (sender, receiver) = channel(100_000);
+        let (relayer_sender, relayer_receiver) = channel(100_000);
+
+        let (client_pool_receiver, client_pool) = if let Some(client_pool) = client_pool {
+            let result = client_pool.split()?;
+            (result.0, Some(result.1))
+        } else {
+            let (_, receiver) = mpsc::channel(1);
+            (receiver, None)
+        };
+
         Ok(Self {
-            storage,
+            storage: Arc::new(storage),
             subscription_manager: Default::default(),
-            send_to_relayer: sender.clone(),
-            relayer_receiver: Some(receiver),
+            send_to_relayer: relayer_sender,
+            relayer_receiver: Some(relayer_receiver),
             connections: Default::default(),
-            client_pool: if let Some(client_pool) = client_pool {
-                Some(Self::handle_client_pool(client_pool, sender)?)
-            } else {
-                None
-            },
+            client_pool_receiver: Some(client_pool_receiver),
+            client_pool: client_pool,
+            client_pool_subscriptions: Default::default(),
         })
     }
 
     /// Connects to the relayer pool
     pub async fn connect_to_relayer(&self, url: Url) -> Result<(), Error> {
-        let (client_pool, _) = self.client_pool.as_ref().ok_or(Error::NoClient)?;
-        let _ = client_pool.connect_to(url).await;
+        let _ = self
+            .client_pool
+            .as_ref()
+            .ok_or(Error::NoClient)?
+            .connect_to(url)
+            .await?;
         Ok(())
     }
 
@@ -117,7 +126,12 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     ///
     /// This function consumes the object and takes the ownership. The returned
     /// JoinHandle() can be used to stop the main loop
-    pub fn main(self, server: TcpListener) -> Result<(Arc<Self>, JoinHandle<()>), Error> {
+    pub fn main(mut self, server: TcpListener) -> Result<(Arc<Self>, JoinHandle<()>), Error> {
+        let mut client_pool_receiver = self
+            .client_pool_receiver
+            .take()
+            .ok_or(Error::AlreadySplitted)?;
+
         let (this, mut receiver) = self.split()?;
         let _self = Arc::new(this);
         let this = _self.clone();
@@ -126,22 +140,36 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
             loop {
                 tokio::select! {
                     Ok((stream, _)) = server.accept() => {
-                        // accept new external connections
+                        // accept new connections
                         let _ = this.add_connection(None, stream).await;
                     },
-                    Some((conn_id, request)) = receiver.recv() => {
-                        // receive messages from the connection pool
-                        if conn_id.is_empty() {
-                            // message received from client pool
-                            if let Request::Event(event) = request {
+                    Some((response, _)) = client_pool_receiver.recv() => {
+                        // process messages from anothe relayer, broadcast it and store it
+                        match response {
+                            Response::Event(event) => {
+                                // we received a message from the client pool, store it locally
+                                // and re-broadcast it.
                                 let _ = this.broadcast(event.deref()).await;
-                                if let Some(storage) = this.storage.as_ref() {
-                                    let _ = storage.store_local_event(&event).await;
-                                }
                             }
-                            continue;
+                            Response::EndOfStoredEvents(sub) => {
+                                let connections = this.connections.read().await;
+                                let (sub_id, connection) = if let Some((sub_id, conn_id)) = this.client_pool_subscriptions.write().await.remove(&(sub.deref().into())) {
+                                     if let Some(connection) = connections.get(&conn_id) {
+                                        (sub_id, connection)
+                                    } else {
+                                        continue;
+                                    }
+                                } else {
+                                    continue
+                                };
+
+                                let _ = connection.send(Response::EndOfStoredEvents(sub_id.into()));
+                            }
+                            _ => {}
                         }
-
+                    }
+                    Some((conn_id, request)) = receiver.recv() => {
+                        // receive messages from our clients
                         let connections = this.connections.read().await;
                         let connection = if let Some(connection) = connections.get(&conn_id) {
                             connection
@@ -162,37 +190,6 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
         Ok((_self, handle))
     }
 
-    /// Handle the client pool
-    ///
-    /// Main loop to consume messages from the client pool and broadcast them to the local subscribers
-    fn handle_client_pool(
-        client_pool: Pool,
-        send_message_to_relayer: Sender<(ConnectionId, Request)>,
-    ) -> Result<(Pool, JoinHandle<()>), ClientError> {
-        let (mut receiver, client_pool) = client_pool.split()?;
-
-        let handle = tokio::spawn(async move {
-            loop {
-                if let Some((response, _)) = receiver.recv().await {
-                    match response {
-                        Response::Event(event) => {
-                            let _ = send_message_to_relayer.try_send((
-                                ConnectionId::new_empty(),
-                                Request::Event(event.event.into()),
-                            ));
-                        }
-                        Response::EndOfStoredEvents(_) => {}
-                        x => {
-                            println!("x => {:?}", x);
-                        }
-                    }
-                }
-            }
-        });
-
-        Ok((client_pool, handle))
-    }
-
     /// Returns a reference to the internal database
     pub fn get_db(&self) -> &Option<T> {
         &self.storage
@@ -269,7 +266,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                     let _ = storage.store_local_event(&event).await;
                 }
 
-                if let Some((client_pool, _)) = self.client_pool.as_ref() {
+                if let Some(client_pool) = self.client_pool.as_ref() {
                     // pass the event to the pool of clients, so this relayer can relay
                     // their local events to the clients in the network of relayers
                     let _ = client_pool.post(event).await;
@@ -284,11 +281,26 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                 )?;
             }
             Request::Request(request) => {
-                let foreign_subscription = if let Some((client_pool, _)) = self.client_pool.as_ref()
-                {
-                    // pass the subscription request to the pool of clients, so this relayer
-                    // can relay any unknown event to the clients through their subscriptions
-                    Some(client_pool.subscribe(request.clone()).await)
+                let foreign_subscription = if let Some(client_pool) = self.client_pool.as_ref() {
+                    // If this relay is connected to other relays through the
+                    // client pool, create the same subscription in them as
+                    // well, with the main goal of fetching any foreign event
+                    // that matches the requested subscription.
+                    //
+                    // If the this happens, this relay will serve any local
+                    // event that matches, as well any foreign event. Foreign
+                    // events will be stored locally as well if there is a
+                    // storage setup.
+                    let foreign_sub_id = client_pool
+                        .subscribe(request.filters.clone().into())
+                        .await?;
+
+                    self.client_pool_subscriptions.write().await.insert(
+                        foreign_sub_id.clone(),
+                        (request.subscription_id.clone(), connection.get_conn_id()),
+                    );
+
+                    Some(foreign_sub_id)
                 } else {
                     None
                 };
@@ -315,8 +327,12 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                     }
                 }
 
-                let _ = connection
-                    .send(relayer::EndOfStoredEvents(request.subscription_id.clone()).into());
+                if foreign_subscription.is_none() {
+                    // If there is a foreign subscription, we shouldn't send a
+                    // EOS until we have got EOS from all foreign relays
+                    let _ = connection
+                        .send(relayer::EndOfStoredEvents(request.subscription_id.clone()).into());
+                }
 
                 connection
                     .subscribe(
@@ -343,6 +359,38 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     }
 
     #[inline]
+    /// A non-blocking version of broadcast
+    #[allow(dead_code)]
+    fn broadcast_and_forget(&self, event: Event) {
+        let storage = self.storage.clone();
+        let connections = self.connections.clone();
+        let subscription_manager = self.subscription_manager.clone();
+
+        tokio::spawn(async move {
+            if let Some(storage) = storage.as_ref() {
+                if !storage.store(&event).await.unwrap_or_default() {
+                    return;
+                }
+            }
+
+            let connections = connections.read().await;
+            for RelayerSubscriptionId((sub_id, conn_id)) in
+                subscription_manager.get_subscribers(&event).await
+            {
+                if let Some(connection) = connections.get(&conn_id) {
+                    let _ = connection.send(
+                        relayer::Event {
+                            subscription_id: sub_id,
+                            event: event.clone(),
+                        }
+                        .into(),
+                    );
+                }
+            }
+        });
+    }
+
+    #[inline]
     /// Broadcast a given event to all local subscribers
     pub async fn broadcast(&self, event: &Event) -> Result<bool, Error> {
         if let Some(storage) = self.storage.as_ref() {
@@ -520,7 +568,7 @@ mod test {
             "authors": [
               "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
             ],
-            "since": 1681928304
+            "until": 1681928304
           },
           {
             "#p": [
@@ -533,7 +581,7 @@ mod test {
               7,
               9735
             ],
-            "since": 1681928304
+            "until": 1681928304
           },
           {
             "#p": [
@@ -1012,18 +1060,19 @@ mod test {
         let (main_client, _main_client_inscope) =
             Pool::new_with_clients(vec![main_relayer]).expect("valid pool");
 
-        let _sub = reader_client.subscribe(Default::default()).await;
+        let _sub = reader_client
+            .subscribe(Default::default())
+            .await
+            .expect("v");
 
         sleep(Duration::from_millis(20)).await;
 
-        for _ in 0..3 {
-            assert!(reader_client
-                .try_recv()
-                .map(|(r, _)| r)
-                .expect("valid message")
-                .as_end_of_stored_events()
-                .is_some());
-        }
+        assert!(reader_client
+            .try_recv()
+            .map(|(r, _)| r)
+            .expect("valid message: step")
+            .as_end_of_stored_events()
+            .is_some());
         assert!(reader_client.try_recv().is_none());
 
         let account1 = Account::default();
@@ -1083,7 +1132,7 @@ mod test {
         // connected to the main relayer
         let (mut main_client, _in_scope) =
             Pool::new_with_clients(vec![main_relayer]).expect("valid client");
-        let _sub = main_client.subscribe(Default::default()).await;
+        let _sub = main_client.subscribe(Default::default()).await.expect("v");
 
         sleep(Duration::from_millis(10)).await;
         assert!(main_client

+ 2 - 2
crates/subscription-manager/src/filter.rs

@@ -134,12 +134,12 @@ impl SortedFilter {
         }
 
         if let Some(since) = self.since {
-            if event.created_at() < since {
+            if event.created_at() >= since {
                 return false;
             }
         }
         if let Some(until) = self.until {
-            if event.created_at() > until {
+            if event.created_at() <= until {
                 return false;
             }
         }