6 Angajamente 39279508ba ... 788462cbf9

Autor SHA1 Permisiunea de a trimite mesaje. Dacă este dezactivată, utilizatorul nu va putea trimite nici un fel de mesaj Data
  Cesar Rodas 788462cbf9 Working on personal relayer 10 luni în urmă
  Cesar Rodas de127a47ca Merge branch 'client-pool-improvements' of cesar/nostr-prototype into main 9 luni în urmă
  Cesar Rodas 94836bda5c Client pool and relayer improvements 9 luni în urmă
  Cesar Rodas 39279508ba Fixed subscription and active subscriptions 9 luni în urmă
  Cesar Rodas 65a48e22b5 Fixed tags and base64 types bugs discovered while connecting to other relayers 9 luni în urmă
  Cesar Rodas f8104fa512 Working on personal relayer 10 luni în urmă

+ 426 - 9
crates/client/src/pool/mod.rs

@@ -199,20 +199,25 @@ mod test {
     use super::*;
     use nostr_rs_memory::Memory;
     use nostr_rs_relayer::Relayer;
-    use nostr_rs_types::{account::Account, types::Content};
+    use nostr_rs_types::{
+        account::Account,
+        types::{Content, Filter},
+    };
     use std::time::Duration;
+    use subscription::MAX_ACTIVE_SUBSCRIPTIONS;
     use tokio::{net::TcpListener, task::JoinHandle, time::sleep};
 
-    async fn dummy_server(port: u16) -> (Url, JoinHandle<()>) {
+    async fn dummy_server(port: u16) -> (Url, Arc<Relayer<Memory>>, JoinHandle<()>) {
         let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
             .await
             .unwrap();
         let local_addr = listener.local_addr().expect("addr");
 
         let relayer = Relayer::new(Some(Memory::default()), None).expect("valid dummy server");
-        let (_, stopper) = relayer.main(listener).expect("valid main loop");
+        let (relayer, stopper) = relayer.main(listener).expect("valid main loop");
         (
             Url::parse(&format!("ws://{}", local_addr)).expect("valid url"),
+            relayer,
             stopper,
         )
     }
@@ -233,7 +238,7 @@ mod test {
 
     #[tokio::test]
     async fn connect_to_dummy_server() {
-        let (addr, stopper) = dummy_server(0).await;
+        let (addr, _, stopper) = dummy_server(0).await;
         let (client_pool, _connections) = Pool::new_with_clients(vec![addr]).expect("valid pool");
 
         assert_eq!(0, client_pool.check_active_connections().await);
@@ -250,7 +255,7 @@ mod test {
 
     #[tokio::test]
     async fn two_clients_communication() {
-        let (addr, _) = dummy_server(0).await;
+        let (addr, _, _) = dummy_server(0).await;
         let (mut client_pool1, _c1) =
             Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
         let (client_pool2, _c2) = Pool::new_with_clients(vec![addr]).expect("valid pool");
@@ -288,7 +293,7 @@ mod test {
 
     #[tokio::test]
     async fn reconnect_and_resubscribe() {
-        let (addr, stopper) = dummy_server(0).await;
+        let (addr, _, stopper) = dummy_server(0).await;
         let (mut client_pool1, _c1) =
             Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
         let (client_pool2, _c2) = Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
@@ -333,7 +338,7 @@ mod test {
         assert_eq!(0, client_pool1.check_active_connections().await);
         assert_eq!(0, client_pool2.check_active_connections().await);
 
-        let (_, stopper) = dummy_server(addr.port().expect("port")).await;
+        let (_, _, stopper) = dummy_server(addr.port().expect("port")).await;
 
         sleep(Duration::from_millis(2_000)).await;
 
@@ -362,8 +367,8 @@ mod test {
 
     #[tokio::test]
     async fn connect_multiple_servers() {
-        let (addr1, _) = dummy_server(0).await;
-        let (addr2, _) = dummy_server(0).await;
+        let (addr1, _, _) = dummy_server(0).await;
+        let (addr2, _, _) = dummy_server(0).await;
         let (mut client_pool1, _c1) =
             Pool::new_with_clients(vec![addr1.clone(), addr2]).expect("valid pool");
         let (client_pool2, _c2) = Pool::new_with_clients(vec![addr1]).expect("valid pool");
@@ -397,4 +402,416 @@ mod test {
         );
         assert!(client_pool1.try_recv().is_none());
     }
+
+    /// Client pool user creates 101 subscription (that is not allowed by many
+    /// relays), so the pool will do a round-robin subscriptions keeping 5
+    /// actives at a time with an internal scheduler.
+    ///
+    /// The scheduler will pause active subscriptions when the EOS is received.
+    /// On the next round `since` will be used to receive only newer events
+    mod scheduler {
+        use super::*;
+
+        #[tokio::test]
+        async fn stored_first() {
+            let (addr1, relayer, _) = dummy_server(0).await;
+
+            let (mut client_pool1, _c1) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+            let (client_pool2, _c2) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+
+            let account1 = Account::default();
+            let signed_content = account1
+                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+                .expect("valid signed content");
+            client_pool2.post(signed_content.clone().into()).await;
+
+            let first_sub = client_pool1
+                .subscribe(
+                    Filter {
+                        ids: vec![signed_content.id.clone()],
+                        ..Default::default()
+                    }
+                    .into(),
+                )
+                .await
+                .expect("valid subs, last");
+
+            let subs = join_all(
+                (0..100)
+                    .into_iter()
+                    .map(|_| {
+                        client_pool1.subscribe(
+                            Filter {
+                                authors: vec![
+                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
+                                    .parse()
+                                    .unwrap(),
+                            ],
+                                ..Default::default()
+                            }
+                            .into(),
+                        )
+                    })
+                    .collect::<Vec<_>>(),
+            )
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid 100 dummy subs");
+
+            for _ in 1..10 {
+                sleep(Duration::from_millis(10)).await;
+                assert!(
+                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
+                    "total subs {}",
+                    relayer.total_subscribers()
+                );
+            }
+
+            sleep(Duration::from_secs(1)).await;
+
+            let mut has_receive_event = false;
+            for _ in 0..102 {
+                let event = client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid event");
+
+                if has_receive_event {
+                    assert!(event.as_end_of_stored_events().is_some());
+                } else {
+                    if let Some(ev) = event.as_event() {
+                        assert_eq!(ev.id, signed_content.id);
+                        has_receive_event = true;
+                    } else {
+                        assert!(event.as_end_of_stored_events().is_some());
+                    }
+                }
+            }
+
+            drop(subs);
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                1,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+
+            drop(first_sub);
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                0,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+        }
+
+        #[tokio::test]
+        async fn stored_last() {
+            let (addr1, relayer, _) = dummy_server(0).await;
+
+            let (mut client_pool1, _c1) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+            let (client_pool2, _c2) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+
+            let account1 = Account::default();
+            let signed_content = account1
+                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+                .expect("valid signed content");
+            client_pool2.post(signed_content.clone().into()).await;
+
+            let mut subs = join_all(
+                (0..100)
+                    .into_iter()
+                    .map(|_| {
+                        client_pool1.subscribe(
+                            Filter {
+                                authors: vec![
+                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
+                                    .parse()
+                                    .unwrap(),
+                            ],
+                                ..Default::default()
+                            }
+                            .into(),
+                        )
+                    })
+                    .collect::<Vec<_>>(),
+            )
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid 100 dummy subs");
+
+            for _ in 1..10 {
+                sleep(Duration::from_millis(10)).await;
+                assert!(
+                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
+                    "total subs {}",
+                    relayer.total_subscribers()
+                );
+            }
+
+            for _ in 0..100 {
+                assert!(client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_end_of_stored_events()
+                    .is_some());
+            }
+
+            assert!(client_pool1.try_recv().is_none());
+
+            subs.push(
+                client_pool1
+                    .subscribe(
+                        Filter {
+                            ids: vec![signed_content.id.clone()],
+                            ..Default::default()
+                        }
+                        .into(),
+                    )
+                    .await
+                    .expect("valid subs, last"),
+            );
+
+            sleep(Duration::from_secs(1)).await;
+
+            assert_eq!(
+                client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_event()
+                    .expect("valid event")
+                    .id,
+                signed_content.id
+            );
+
+            drop(subs);
+
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                0,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+        }
+
+        #[tokio::test]
+        async fn realtime_first() {
+            let (addr1, relayer, _) = dummy_server(0).await;
+
+            let (mut client_pool1, _c1) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+            let (client_pool2, _c2) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+
+            let account1 = Account::default();
+            let signed_content = account1
+                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+                .expect("valid signed content");
+
+            let first_sub = client_pool1
+                .subscribe(
+                    Filter {
+                        ids: vec![signed_content.id.clone()],
+                        ..Default::default()
+                    }
+                    .into(),
+                )
+                .await
+                .expect("valid subs, first");
+
+            let subs = join_all(
+                (0..100)
+                    .into_iter()
+                    .map(|_| {
+                        client_pool1.subscribe(
+                            Filter {
+                                authors: vec![
+                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
+                                    .parse()
+                                    .unwrap(),
+                            ],
+                                ..Default::default()
+                            }
+                            .into(),
+                        )
+                    })
+                    .collect::<Vec<_>>(),
+            )
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid 100 dummy subs");
+
+            for _ in 1..10 {
+                sleep(Duration::from_millis(10)).await;
+                assert!(
+                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
+                    "total subs {}",
+                    relayer.total_subscribers()
+                );
+            }
+
+            for _ in 0..101 {
+                assert!(client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_end_of_stored_events()
+                    .is_some());
+            }
+
+            assert!(client_pool1.try_recv().is_none());
+
+            client_pool2.post(signed_content.clone().into()).await;
+
+            sleep(Duration::from_secs(1)).await;
+
+            assert_eq!(
+                client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_event()
+                    .expect("valid event")
+                    .id,
+                signed_content.id
+            );
+
+            drop(subs);
+
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                1,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+
+            drop(first_sub);
+
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                0,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+        }
+
+        #[tokio::test]
+        async fn realtime_last() {
+            let (addr1, relayer, _) = dummy_server(0).await;
+
+            let (mut client_pool1, _c1) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+            let (client_pool2, _c2) =
+                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
+
+            let account1 = Account::default();
+            let signed_content = account1
+                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+                .expect("valid signed content");
+
+            let mut subs = join_all(
+                (0..100)
+                    .into_iter()
+                    .map(|_| {
+                        client_pool1.subscribe(
+                            Filter {
+                                authors: vec![
+                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
+                                    .parse()
+                                    .unwrap(),
+                            ],
+                                ..Default::default()
+                            }
+                            .into(),
+                        )
+                    })
+                    .collect::<Vec<_>>(),
+            )
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid 100 dummy subs");
+
+            for _ in 1..10 {
+                sleep(Duration::from_millis(10)).await;
+                assert!(
+                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
+                    "total subs {}",
+                    relayer.total_subscribers()
+                );
+            }
+
+            for _ in 0..100 {
+                assert!(client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_end_of_stored_events()
+                    .is_some());
+            }
+
+            assert!(client_pool1.try_recv().is_none());
+
+            subs.push(
+                client_pool1
+                    .subscribe(
+                        Filter {
+                            ids: vec![signed_content.id.clone()],
+                            ..Default::default()
+                        }
+                        .into(),
+                    )
+                    .await
+                    .expect("valid subs, last"),
+            );
+
+            sleep(Duration::from_secs(1)).await;
+
+            assert!(client_pool1
+                .try_recv()
+                .map(|(r, _)| r)
+                .expect("valid message")
+                .as_end_of_stored_events()
+                .is_some());
+
+            client_pool2.post(signed_content.clone().into()).await;
+
+            sleep(Duration::from_secs(1)).await;
+
+            assert_eq!(
+                client_pool1
+                    .try_recv()
+                    .map(|(r, _)| r)
+                    .expect("valid message")
+                    .as_event()
+                    .expect("valid event")
+                    .id,
+                signed_content.id
+            );
+
+            drop(subs);
+
+            sleep(Duration::from_secs(1)).await;
+            assert_eq!(
+                0,
+                relayer.total_subscribers(),
+                "total subs {}",
+                relayer.total_subscribers()
+            );
+        }
+    }
 }

+ 5 - 1
crates/client/src/pool/subscription.rs

@@ -392,7 +392,7 @@ impl Scheduler {
                 .await
                 .retain(|x| x != &subscription_id);
 
-            if subscriptions.remove(&subscription_id).is_some() {
+            if let Some(sub) = subscriptions.remove(&subscription_id) {
                 log::info!(
                     "Unsubscribing and dropping from scheduler {}",
                     subscription_id.0,
@@ -400,6 +400,10 @@ impl Scheduler {
 
                 this.active_subscription_scheduler();
                 this.total_subscriptions.fetch_sub(1, Ordering::Relaxed);
+                if sub.active_subscription.is_some() {
+                    // it is active
+                    this.active_subscriptions.fetch_sub(1, Ordering::Relaxed);
+                }
             }
         });
     }

+ 13 - 4
crates/relayer/src/relayer.rs

@@ -282,8 +282,15 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
             }
             Request::Request(request) => {
                 let foreign_subscription = if let Some(client_pool) = self.client_pool.as_ref() {
-                    // pass the subscription request to the pool of clients, so this relayer
-                    // can relay any unknown event to the clients through their subscriptions
+                    // If this relay is connected to other relays through the
+                    // client pool, create the same subscription in them as
+                    // well, with the main goal of fetching any foreign event
+                    // that matches the requested subscription.
+                    //
+                    // If the this happens, this relay will serve any local
+                    // event that matches, as well any foreign event. Foreign
+                    // events will be stored locally as well if there is a
+                    // storage setup.
                     let foreign_sub_id = client_pool
                         .subscribe(request.filters.clone().into())
                         .await?;
@@ -321,6 +328,8 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                 }
 
                 if foreign_subscription.is_none() {
+                    // If there is a foreign subscription, we shouldn't send a
+                    // EOS until we have got EOS from all foreign relays
                     let _ = connection
                         .send(relayer::EndOfStoredEvents(request.subscription_id.clone()).into());
                 }
@@ -559,7 +568,7 @@ mod test {
             "authors": [
               "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
             ],
-            "since": 1681928304
+            "until": 1681928304
           },
           {
             "#p": [
@@ -572,7 +581,7 @@ mod test {
               7,
               9735
             ],
-            "since": 1681928304
+            "until": 1681928304
           },
           {
             "#p": [

+ 2 - 2
crates/subscription-manager/src/filter.rs

@@ -134,12 +134,12 @@ impl SortedFilter {
         }
 
         if let Some(since) = self.since {
-            if event.created_at() < since {
+            if event.created_at() >= since {
                 return false;
             }
         }
         if let Some(until) = self.until {
-            if event.created_at() > until {
+            if event.created_at() <= until {
                 return false;
             }
         }