1
0

6 Revīzijas 39279508ba ... 788462cbf9

Autors SHA1 Ziņojums Datums
  Cesar Rodas 788462cbf9 Working on personal relayer 3 mēneši atpakaļ
  Cesar Rodas de127a47ca Merge branch 'client-pool-improvements' of cesar/nostr-prototype into main 1 mēnesi atpakaļ
  Cesar Rodas 94836bda5c Client pool and relayer improvements 1 mēnesi atpakaļ
  Cesar Rodas 39279508ba Fixed subscription and active subscriptions 2 mēneši atpakaļ
  Cesar Rodas 65a48e22b5 Fixed tags and base64 types bugs discovered while connecting to other relayers 2 mēneši atpakaļ
  Cesar Rodas f8104fa512 Working on personal relayer 3 mēneši atpakaļ

+ 426 - 9
crates/client/src/pool/mod.rs

@@ -199,20 +199,25 @@ mod test {
     use super::*;
     use nostr_rs_memory::Memory;
     use nostr_rs_relayer::Relayer;
-    use nostr_rs_types::{account::Account, types::Content};
+    use nostr_rs_types::{
+        account::Account,
+        types::{Content, Filter},
+    };
     use std::time::Duration;
+    use subscription::MAX_ACTIVE_SUBSCRIPTIONS;
     use tokio::{net::TcpListener, task::JoinHandle, time::sleep};
 
-    async fn dummy_server(port: u16) -> (Url, JoinHandle<()>) {
+    async fn dummy_server(port: u16) -> (Url, Arc<Relayer<Memory>>, JoinHandle<()>) {
         let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
             .await
             .unwrap();
         let local_addr = listener.local_addr().expect("addr");
 
         let relayer = Relayer::new(Some(Memory::default()), None).expect("valid dummy server");
-        let (_, stopper) = relayer.main(listener).expect("valid main loop");
+        let (relayer, stopper) = relayer.main(listener).expect("valid main loop");
         (
             Url::parse(&format!("ws://{}", local_addr)).expect("valid url"),
+            relayer,
             stopper,
         )
     }
@@ -233,7 +238,7 @@ mod test {
 
     #[tokio::test]
     async fn connect_to_dummy_server() {
-        let (addr, stopper) = dummy_server(0).await;
+        let (addr, _, stopper) = dummy_server(0).await;
         let (client_pool, _connections) = Pool::new_with_clients(vec![addr]).expect("valid pool");
 
         assert_eq!(0, client_pool.check_active_connections().await);
@@ -250,7 +255,7 @@ mod test {
 
     #[tokio::test]
     async fn two_clients_communication() {
-        let (addr, _) = dummy_server(0).await;
+        let (addr, _, _) = dummy_server(0).await;
         let (mut client_pool1, _c1) =
             Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
         let (client_pool2, _c2) = Pool::new_with_clients(vec![addr]).expect("valid pool");
@@ -288,7 +293,7 @@ mod test {
 
     #[tokio::test]
     async fn reconnect_and_resubscribe() {
-        let (addr, stopper) = dummy_server(0).await;
+        let (addr, _, stopper) = dummy_server(0).await;
         let (mut client_pool1, _c1) =
             Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
         let (client_pool2, _c2) = Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
@@ -333,7 +338,7 @@ mod test {
         assert_eq!(0, client_pool1.check_active_connections().await);
         assert_eq!(0, client_pool2.check_active_connections().await);
 
-        let (_, stopper) = dummy_server(addr.port().expect("port")).await;
+        let (_, _, stopper) = dummy_server(addr.port().expect("port")).await;
 
         sleep(Duration::from_millis(2_000)).await;
 
@@ -362,8 +367,8 @@ mod test {
 
     #[tokio::test]
     async fn connect_multiple_servers() {
-        let (addr1, _) = dummy_server(0).await;
-        let (addr2, _) = dummy_server(0).await;
+        let (addr1, _, _) = dummy_server(0).await;
+        let (addr2, _, _) = dummy_server(0).await;
         let (mut client_pool1, _c1) =
             Pool::new_with_clients(vec![addr1.clone(), addr2]).expect("valid pool");
         let (client_pool2, _c2) = Pool::new_with_clients(vec![addr1]).expect("valid pool");
@@ -397,4 +402,416 @@ mod test {
         );
         assert!(client_pool1.try_recv().is_none());
     }
+
+    /// Client pool user creates 101 subscription (that is not allowed by many
+    /// relays), so the pool will do a round-robin subscriptions keeping 5
+    /// actives at a time with an internal scheduler.
+    ///
+    /// The scheduler will pause active subscriptions when the EOS is received.
+    /// On the next round `since` will be used to receive only newer events
+    mod scheduler {
+        use super::*;
+
+        #[tokio::test]
+        async fn stored_first() {
+            let (addr1, relayer, _) = dummy_server(0).await;
+
+            let (mut client_pool1, _c1) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+            let (client_pool2, _c2) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+
+            let account1 = Account::default();
+            let signed_content = account1
+                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+                .expect("valid signed content");
+            client_pool2.post(signed_content.clone().into()).await;
+
+            let first_sub = client_pool1
+                .subscribe(
+                    Filter {
+                        ids: vec![signed_content.id.clone()],
+                        ..Default::default()
+                    }
+                    .into(),
+                )
+                .await
+                .expect("valid subs, last");
+
+            let subs = join_all(
+                (0..100)
+                    .into_iter()
+                    .map(|_| {
+                        client_pool1.subscribe(
+                            Filter {
+                                authors: vec![
+                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
+                                    .parse()
+                                    .unwrap(),
+                            ],
+                                ..Default::default()
+                            }
+                            .into(),
+                        )
+                    })
+                    .collect::<Vec<_>>(),
+            )
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid 100 dummy subs");
+
+            for _ in 1..10 {
+                sleep(Duration::from_millis(10)).await;
+                assert!(
+                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
+                    "total subs {}",
+                    relayer.total_subscribers()
+                );
+            }
+
+            sleep(Duration::from_secs(1)).await;
+
+            let mut has_receive_event = false;
+            for _ in 0..102 {
+                let event = client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid event");
+
+                if has_receive_event {
+                    assert!(event.as_end_of_stored_events().is_some());
+                } else {
+                    if let Some(ev) = event.as_event() {
+                        assert_eq!(ev.id, signed_content.id);
+                        has_receive_event = true;
+                    } else {
+                        assert!(event.as_end_of_stored_events().is_some());
+                    }
+                }
+            }
+
+            drop(subs);
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                1,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+
+            drop(first_sub);
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                0,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+        }
+
+        #[tokio::test]
+        async fn stored_last() {
+            let (addr1, relayer, _) = dummy_server(0).await;
+
+            let (mut client_pool1, _c1) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+            let (client_pool2, _c2) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+
+            let account1 = Account::default();
+            let signed_content = account1
+                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+                .expect("valid signed content");
+            client_pool2.post(signed_content.clone().into()).await;
+
+            let mut subs = join_all(
+                (0..100)
+                    .into_iter()
+                    .map(|_| {
+                        client_pool1.subscribe(
+                            Filter {
+                                authors: vec![
+                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
+                                    .parse()
+                                    .unwrap(),
+                            ],
+                                ..Default::default()
+                            }
+                            .into(),
+                        )
+                    })
+                    .collect::<Vec<_>>(),
+            )
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid 100 dummy subs");
+
+            for _ in 1..10 {
+                sleep(Duration::from_millis(10)).await;
+                assert!(
+                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
+                    "total subs {}",
+                    relayer.total_subscribers()
+                );
+            }
+
+            for _ in 0..100 {
+                assert!(client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_end_of_stored_events()
+                    .is_some());
+            }
+
+            assert!(client_pool1.try_recv().is_none());
+
+            subs.push(
+                client_pool1
+                    .subscribe(
+                        Filter {
+                            ids: vec![signed_content.id.clone()],
+                            ..Default::default()
+                        }
+                        .into(),
+                    )
+                    .await
+                    .expect("valid subs, last"),
+            );
+
+            sleep(Duration::from_secs(1)).await;
+
+            assert_eq!(
+                client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_event()
+                    .expect("valid event")
+                    .id,
+                signed_content.id
+            );
+
+            drop(subs);
+
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                0,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+        }
+
+        #[tokio::test]
+        async fn realtime_first() {
+            let (addr1, relayer, _) = dummy_server(0).await;
+
+            let (mut client_pool1, _c1) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+            let (client_pool2, _c2) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+
+            let account1 = Account::default();
+            let signed_content = account1
+                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+                .expect("valid signed content");
+
+            let first_sub = client_pool1
+                .subscribe(
+                    Filter {
+                        ids: vec![signed_content.id.clone()],
+                        ..Default::default()
+                    }
+                    .into(),
+                )
+                .await
+                .expect("valid subs, first");
+
+            let subs = join_all(
+                (0..100)
+                    .into_iter()
+                    .map(|_| {
+                        client_pool1.subscribe(
+                            Filter {
+                                authors: vec![
+                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
+                                    .parse()
+                                    .unwrap(),
+                            ],
+                                ..Default::default()
+                            }
+                            .into(),
+                        )
+                    })
+                    .collect::<Vec<_>>(),
+            )
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid 100 dummy subs");
+
+            for _ in 1..10 {
+                sleep(Duration::from_millis(10)).await;
+                assert!(
+                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
+                    "total subs {}",
+                    relayer.total_subscribers()
+                );
+            }
+
+            for _ in 0..101 {
+                assert!(client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_end_of_stored_events()
+                    .is_some());
+            }
+
+            assert!(client_pool1.try_recv().is_none());
+
+            client_pool2.post(signed_content.clone().into()).await;
+
+            sleep(Duration::from_secs(1)).await;
+
+            assert_eq!(
+                client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_event()
+                    .expect("valid event")
+                    .id,
+                signed_content.id
+            );
+
+            drop(subs);
+
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                1,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+
+            drop(first_sub);
+
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                0,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+        }
+
+        #[tokio::test]
+        async fn realtime_last() {
+            let (addr1, relayer, _) = dummy_server(0).await;
+
+            let (mut client_pool1, _c1) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+            let (client_pool2, _c2) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+
+            let account1 = Account::default();
+            let signed_content = account1
+                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+                .expect("valid signed content");
+
+            let mut subs = join_all(
+                (0..100)
+                    .into_iter()
+                    .map(|_| {
+                        client_pool1.subscribe(
+                            Filter {
+                                authors: vec![
+                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
+                                    .parse()
+                                    .unwrap(),
+                            ],
+                                ..Default::default()
+                            }
+                            .into(),
+                        )
+                    })
+                    .collect::<Vec<_>>(),
+            )
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid 100 dummy subs");
+
+            for _ in 1..10 {
+                sleep(Duration::from_millis(10)).await;
+                assert!(
+                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
+                    "total subs {}",
+                    relayer.total_subscribers()
+                );
+            }
+
+            for _ in 0..100 {
+                assert!(client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_end_of_stored_events()
+                    .is_some());
+            }
+
+            assert!(client_pool1.try_recv().is_none());
+
+            subs.push(
+                client_pool1
+                    .subscribe(
+                        Filter {
+                            ids: vec![signed_content.id.clone()],
+                            ..Default::default()
+                        }
+                        .into(),
+                    )
+                    .await
+                    .expect("valid subs, last"),
+            );
+
+            sleep(Duration::from_secs(1)).await;
+
+            assert!(client_pool1
+                .try_recv()
+                .map(|(r, _)| r)
+                .expect("valid message")
+                .as_end_of_stored_events()
+                .is_some());
+
+            client_pool2.post(signed_content.clone().into()).await;
+
+            sleep(Duration::from_secs(1)).await;
+
+            assert_eq!(
+                client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_event()
+                    .expect("valid event")
+                    .id,
+                signed_content.id
+            );
+
+            drop(subs);
+
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                0,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+        }
+    }
 }

+ 5 - 1
crates/client/src/pool/subscription.rs

@@ -392,7 +392,7 @@ impl Scheduler {
                 .await
                 .retain(|x| x != &subscription_id);
 
-            if subscriptions.remove(&subscription_id).is_some() {
+            if let Some(sub) = subscriptions.remove(&subscription_id) {
                 log::info!(
                     "Unsubscribing and dropping from scheduler {}",
                     subscription_id.0,
@@ -400,6 +400,10 @@ impl Scheduler {
 
                 this.active_subscription_scheduler();
                 this.total_subscriptions.fetch_sub(1, Ordering::Relaxed);
+                if sub.active_subscription.is_some() {
+                    // it is active
+                    this.active_subscriptions.fetch_sub(1, Ordering::Relaxed);
+                }
             }
         });
     }

+ 13 - 4
crates/relayer/src/relayer.rs

@@ -282,8 +282,15 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
             }
             Request::Request(request) => {
                 let foreign_subscription = if let Some(client_pool) = self.client_pool.as_ref() {
-                    // pass the subscription request to the pool of clients, so this relayer
-                    // can relay any unknown event to the clients through their subscriptions
+                    // If this relay is connected to other relays through the
+                    // client pool, create the same subscription in them as
+                    // well, with the main goal of fetching any foreign event
+                    // that matches the requested subscription.
+                    //
+                    // If the this happens, this relay will serve any local
+                    // event that matches, as well any foreign event. Foreign
+                    // events will be stored locally as well if there is a
+                    // storage setup.
                     let foreign_sub_id = client_pool
                         .subscribe(request.filters.clone().into())
                         .await?;
@@ -321,6 +328,8 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                 }
 
                 if foreign_subscription.is_none() {
+                    // If there is a foreign subscription, we shouldn't send a
+                    // EOS until we have got EOS from all foreign relays
                     let _ = connection
                         .send(relayer::EndOfStoredEvents(request.subscription_id.clone()).into());
                 }
@@ -559,7 +568,7 @@ mod test {
             "authors": [
               "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
             ],
-            "since": 1681928304
+            "until": 1681928304
           },
           {
             "#p": [
@@ -572,7 +581,7 @@ mod test {
               7,
               9735
             ],
-            "since": 1681928304
+            "until": 1681928304
           },
           {
             "#p": [

+ 2 - 2
crates/subscription-manager/src/filter.rs

@@ -134,12 +134,12 @@ impl SortedFilter {
         }
 
         if let Some(since) = self.since {
-            if event.created_at() < since {
+            if event.created_at() >= since {
                 return false;
             }
         }
         if let Some(until) = self.until {
-            if event.created_at() > until {
+            if event.created_at() <= until {
                 return false;
             }
         }