|  | @@ -199,20 +199,25 @@ mod test {
 | 
	
		
			
				|  |  |      use super::*;
 | 
	
		
			
				|  |  |      use nostr_rs_memory::Memory;
 | 
	
		
			
				|  |  |      use nostr_rs_relayer::Relayer;
 | 
	
		
			
				|  |  | -    use nostr_rs_types::{account::Account, types::Content};
 | 
	
		
			
				|  |  | +    use nostr_rs_types::{
 | 
	
		
			
				|  |  | +        account::Account,
 | 
	
		
			
				|  |  | +        types::{Content, Filter},
 | 
	
		
			
				|  |  | +    };
 | 
	
		
			
				|  |  |      use std::time::Duration;
 | 
	
		
			
				|  |  | +    use subscription::MAX_ACTIVE_SUBSCRIPTIONS;
 | 
	
		
			
				|  |  |      use tokio::{net::TcpListener, task::JoinHandle, time::sleep};
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    async fn dummy_server(port: u16) -> (Url, JoinHandle<()>) {
 | 
	
		
			
				|  |  | +    async fn dummy_server(port: u16) -> (Url, Arc<Relayer<Memory>>, JoinHandle<()>) {
 | 
	
		
			
				|  |  |          let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
 | 
	
		
			
				|  |  |              .await
 | 
	
		
			
				|  |  |              .unwrap();
 | 
	
		
			
				|  |  |          let local_addr = listener.local_addr().expect("addr");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          let relayer = Relayer::new(Some(Memory::default()), None).expect("valid dummy server");
 | 
	
		
			
				|  |  | -        let (_, stopper) = relayer.main(listener).expect("valid main loop");
 | 
	
		
			
				|  |  | +        let (relayer, stopper) = relayer.main(listener).expect("valid main loop");
 | 
	
		
			
				|  |  |          (
 | 
	
		
			
				|  |  |              Url::parse(&format!("ws://{}", local_addr)).expect("valid url"),
 | 
	
		
			
				|  |  | +            relayer,
 | 
	
		
			
				|  |  |              stopper,
 | 
	
		
			
				|  |  |          )
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -233,7 +238,7 @@ mod test {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      #[tokio::test]
 | 
	
		
			
				|  |  |      async fn connect_to_dummy_server() {
 | 
	
		
			
				|  |  | -        let (addr, stopper) = dummy_server(0).await;
 | 
	
		
			
				|  |  | +        let (addr, _, stopper) = dummy_server(0).await;
 | 
	
		
			
				|  |  |          let (client_pool, _connections) = Pool::new_with_clients(vec![addr]).expect("valid pool");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          assert_eq!(0, client_pool.check_active_connections().await);
 | 
	
	
		
			
				|  | @@ -250,7 +255,7 @@ mod test {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      #[tokio::test]
 | 
	
		
			
				|  |  |      async fn two_clients_communication() {
 | 
	
		
			
				|  |  | -        let (addr, _) = dummy_server(0).await;
 | 
	
		
			
				|  |  | +        let (addr, _, _) = dummy_server(0).await;
 | 
	
		
			
				|  |  |          let (mut client_pool1, _c1) =
 | 
	
		
			
				|  |  |              Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
 | 
	
		
			
				|  |  |          let (client_pool2, _c2) = Pool::new_with_clients(vec![addr]).expect("valid pool");
 | 
	
	
		
			
				|  | @@ -288,7 +293,7 @@ mod test {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      #[tokio::test]
 | 
	
		
			
				|  |  |      async fn reconnect_and_resubscribe() {
 | 
	
		
			
				|  |  | -        let (addr, stopper) = dummy_server(0).await;
 | 
	
		
			
				|  |  | +        let (addr, _, stopper) = dummy_server(0).await;
 | 
	
		
			
				|  |  |          let (mut client_pool1, _c1) =
 | 
	
		
			
				|  |  |              Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
 | 
	
		
			
				|  |  |          let (client_pool2, _c2) = Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
 | 
	
	
		
			
				|  | @@ -333,7 +338,7 @@ mod test {
 | 
	
		
			
				|  |  |          assert_eq!(0, client_pool1.check_active_connections().await);
 | 
	
		
			
				|  |  |          assert_eq!(0, client_pool2.check_active_connections().await);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        let (_, stopper) = dummy_server(addr.port().expect("port")).await;
 | 
	
		
			
				|  |  | +        let (_, _, stopper) = dummy_server(addr.port().expect("port")).await;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          sleep(Duration::from_millis(2_000)).await;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -362,8 +367,8 @@ mod test {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      #[tokio::test]
 | 
	
		
			
				|  |  |      async fn connect_multiple_servers() {
 | 
	
		
			
				|  |  | -        let (addr1, _) = dummy_server(0).await;
 | 
	
		
			
				|  |  | -        let (addr2, _) = dummy_server(0).await;
 | 
	
		
			
				|  |  | +        let (addr1, _, _) = dummy_server(0).await;
 | 
	
		
			
				|  |  | +        let (addr2, _, _) = dummy_server(0).await;
 | 
	
		
			
				|  |  |          let (mut client_pool1, _c1) =
 | 
	
		
			
				|  |  |              Pool::new_with_clients(vec![addr1.clone(), addr2]).expect("valid pool");
 | 
	
		
			
				|  |  |          let (client_pool2, _c2) = Pool::new_with_clients(vec![addr1]).expect("valid pool");
 | 
	
	
		
			
				|  | @@ -397,4 +402,416 @@ mod test {
 | 
	
		
			
				|  |  |          );
 | 
	
		
			
				|  |  |          assert!(client_pool1.try_recv().is_none());
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /// Client pool user creates 101 subscription (that is not allowed by many
 | 
	
		
			
				|  |  | +    /// relays), so the pool will do a round-robin subscriptions keeping 5
 | 
	
		
			
				|  |  | +    /// actives at a time with an internal scheduler.
 | 
	
		
			
				|  |  | +    ///
 | 
	
		
			
				|  |  | +    /// The scheduler will pause active subscriptions when the EOS is received.
 | 
	
		
			
				|  |  | +    /// On the next round `since` will be used to receive only newer events
 | 
	
		
			
				|  |  | +    mod scheduler {
 | 
	
		
			
				|  |  | +        use super::*;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        #[tokio::test]
 | 
	
		
			
				|  |  | +        async fn stored_first() {
 | 
	
		
			
				|  |  | +            let (addr1, relayer, _) = dummy_server(0).await;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            let (mut client_pool1, _c1) =
 | 
	
		
			
				|  |  | +                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
 | 
	
		
			
				|  |  | +            let (client_pool2, _c2) =
 | 
	
		
			
				|  |  | +                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            let account1 = Account::default();
 | 
	
		
			
				|  |  | +            let signed_content = account1
 | 
	
		
			
				|  |  | +                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
 | 
	
		
			
				|  |  | +                .expect("valid signed content");
 | 
	
		
			
				|  |  | +            client_pool2.post(signed_content.clone().into()).await;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            let first_sub = client_pool1
 | 
	
		
			
				|  |  | +                .subscribe(
 | 
	
		
			
				|  |  | +                    Filter {
 | 
	
		
			
				|  |  | +                        ids: vec![signed_content.id.clone()],
 | 
	
		
			
				|  |  | +                        ..Default::default()
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                    .into(),
 | 
	
		
			
				|  |  | +                )
 | 
	
		
			
				|  |  | +                .await
 | 
	
		
			
				|  |  | +                .expect("valid subs, last");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            let subs = join_all(
 | 
	
		
			
				|  |  | +                (0..100)
 | 
	
		
			
				|  |  | +                    .into_iter()
 | 
	
		
			
				|  |  | +                    .map(|_| {
 | 
	
		
			
				|  |  | +                        client_pool1.subscribe(
 | 
	
		
			
				|  |  | +                            Filter {
 | 
	
		
			
				|  |  | +                                authors: vec![
 | 
	
		
			
				|  |  | +                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
 | 
	
		
			
				|  |  | +                                    .parse()
 | 
	
		
			
				|  |  | +                                    .unwrap(),
 | 
	
		
			
				|  |  | +                            ],
 | 
	
		
			
				|  |  | +                                ..Default::default()
 | 
	
		
			
				|  |  | +                            }
 | 
	
		
			
				|  |  | +                            .into(),
 | 
	
		
			
				|  |  | +                        )
 | 
	
		
			
				|  |  | +                    })
 | 
	
		
			
				|  |  | +                    .collect::<Vec<_>>(),
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +            .await
 | 
	
		
			
				|  |  | +            .into_iter()
 | 
	
		
			
				|  |  | +            .collect::<Result<Vec<_>, _>>()
 | 
	
		
			
				|  |  | +            .expect("valid 100 dummy subs");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            for _ in 1..10 {
 | 
	
		
			
				|  |  | +                sleep(Duration::from_millis(10)).await;
 | 
	
		
			
				|  |  | +                assert!(
 | 
	
		
			
				|  |  | +                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
 | 
	
		
			
				|  |  | +                    "total subs {}",
 | 
	
		
			
				|  |  | +                    relayer.total_subscribers()
 | 
	
		
			
				|  |  | +                );
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            sleep(Duration::from_secs(1)).await;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            let mut has_receive_event = false;
 | 
	
		
			
				|  |  | +            for _ in 0..102 {
 | 
	
		
			
				|  |  | +                let event = client_pool1
 | 
	
		
			
				|  |  | +                    .try_recv()
 | 
	
		
			
				|  |  | +                    .map(|(r, _)| r)
 | 
	
		
			
				|  |  | +                    .expect("valid event");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                if has_receive_event {
 | 
	
		
			
				|  |  | +                    assert!(event.as_end_of_stored_events().is_some());
 | 
	
		
			
				|  |  | +                } else {
 | 
	
		
			
				|  |  | +                    if let Some(ev) = event.as_event() {
 | 
	
		
			
				|  |  | +                        assert_eq!(ev.id, signed_content.id);
 | 
	
		
			
				|  |  | +                        has_receive_event = true;
 | 
	
		
			
				|  |  | +                    } else {
 | 
	
		
			
				|  |  | +                        assert!(event.as_end_of_stored_events().is_some());
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            drop(subs);
 | 
	
		
			
				|  |  | +            sleep(Duration::from_secs(1)).await;
 | 
	
		
			
				|  |  | +            assert_eq!(
 | 
	
		
			
				|  |  | +                1,
 | 
	
		
			
				|  |  | +                relayer.total_subscribers(),
 | 
	
		
			
				|  |  | +                "total subs {}",
 | 
	
		
			
				|  |  | +                relayer.total_subscribers()
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            drop(first_sub);
 | 
	
		
			
				|  |  | +            sleep(Duration::from_secs(1)).await;
 | 
	
		
			
				|  |  | +            assert_eq!(
 | 
	
		
			
				|  |  | +                0,
 | 
	
		
			
				|  |  | +                relayer.total_subscribers(),
 | 
	
		
			
				|  |  | +                "total subs {}",
 | 
	
		
			
				|  |  | +                relayer.total_subscribers()
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        #[tokio::test]
 | 
	
		
			
				|  |  | +        async fn stored_last() {
 | 
	
		
			
				|  |  | +            let (addr1, relayer, _) = dummy_server(0).await;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            let (mut client_pool1, _c1) =
 | 
	
		
			
				|  |  | +                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
 | 
	
		
			
				|  |  | +            let (client_pool2, _c2) =
 | 
	
		
			
				|  |  | +                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            let account1 = Account::default();
 | 
	
		
			
				|  |  | +            let signed_content = account1
 | 
	
		
			
				|  |  | +                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
 | 
	
		
			
				|  |  | +                .expect("valid signed content");
 | 
	
		
			
				|  |  | +            client_pool2.post(signed_content.clone().into()).await;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            let mut subs = join_all(
 | 
	
		
			
				|  |  | +                (0..100)
 | 
	
		
			
				|  |  | +                    .into_iter()
 | 
	
		
			
				|  |  | +                    .map(|_| {
 | 
	
		
			
				|  |  | +                        client_pool1.subscribe(
 | 
	
		
			
				|  |  | +                            Filter {
 | 
	
		
			
				|  |  | +                                authors: vec![
 | 
	
		
			
				|  |  | +                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
 | 
	
		
			
				|  |  | +                                    .parse()
 | 
	
		
			
				|  |  | +                                    .unwrap(),
 | 
	
		
			
				|  |  | +                            ],
 | 
	
		
			
				|  |  | +                                ..Default::default()
 | 
	
		
			
				|  |  | +                            }
 | 
	
		
			
				|  |  | +                            .into(),
 | 
	
		
			
				|  |  | +                        )
 | 
	
		
			
				|  |  | +                    })
 | 
	
		
			
				|  |  | +                    .collect::<Vec<_>>(),
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +            .await
 | 
	
		
			
				|  |  | +            .into_iter()
 | 
	
		
			
				|  |  | +            .collect::<Result<Vec<_>, _>>()
 | 
	
		
			
				|  |  | +            .expect("valid 100 dummy subs");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            for _ in 1..10 {
 | 
	
		
			
				|  |  | +                sleep(Duration::from_millis(10)).await;
 | 
	
		
			
				|  |  | +                assert!(
 | 
	
		
			
				|  |  | +                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
 | 
	
		
			
				|  |  | +                    "total subs {}",
 | 
	
		
			
				|  |  | +                    relayer.total_subscribers()
 | 
	
		
			
				|  |  | +                );
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            for _ in 0..100 {
 | 
	
		
			
				|  |  | +                assert!(client_pool1
 | 
	
		
			
				|  |  | +                    .try_recv()
 | 
	
		
			
				|  |  | +                    .map(|(r, _)| r)
 | 
	
		
			
				|  |  | +                    .expect("valid message")
 | 
	
		
			
				|  |  | +                    .as_end_of_stored_events()
 | 
	
		
			
				|  |  | +                    .is_some());
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            assert!(client_pool1.try_recv().is_none());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            subs.push(
 | 
	
		
			
				|  |  | +                client_pool1
 | 
	
		
			
				|  |  | +                    .subscribe(
 | 
	
		
			
				|  |  | +                        Filter {
 | 
	
		
			
				|  |  | +                            ids: vec![signed_content.id.clone()],
 | 
	
		
			
				|  |  | +                            ..Default::default()
 | 
	
		
			
				|  |  | +                        }
 | 
	
		
			
				|  |  | +                        .into(),
 | 
	
		
			
				|  |  | +                    )
 | 
	
		
			
				|  |  | +                    .await
 | 
	
		
			
				|  |  | +                    .expect("valid subs, last"),
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            sleep(Duration::from_secs(1)).await;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            assert_eq!(
 | 
	
		
			
				|  |  | +                client_pool1
 | 
	
		
			
				|  |  | +                    .try_recv()
 | 
	
		
			
				|  |  | +                    .map(|(r, _)| r)
 | 
	
		
			
				|  |  | +                    .expect("valid message")
 | 
	
		
			
				|  |  | +                    .as_event()
 | 
	
		
			
				|  |  | +                    .expect("valid event")
 | 
	
		
			
				|  |  | +                    .id,
 | 
	
		
			
				|  |  | +                signed_content.id
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            drop(subs);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            sleep(Duration::from_secs(1)).await;
 | 
	
		
			
				|  |  | +            assert_eq!(
 | 
	
		
			
				|  |  | +                0,
 | 
	
		
			
				|  |  | +                relayer.total_subscribers(),
 | 
	
		
			
				|  |  | +                "total subs {}",
 | 
	
		
			
				|  |  | +                relayer.total_subscribers()
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        #[tokio::test]
 | 
	
		
			
				|  |  | +        async fn realtime_first() {
 | 
	
		
			
				|  |  | +            let (addr1, relayer, _) = dummy_server(0).await;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            let (mut client_pool1, _c1) =
 | 
	
		
			
				|  |  | +                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
 | 
	
		
			
				|  |  | +            let (client_pool2, _c2) =
 | 
	
		
			
				|  |  | +                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            let account1 = Account::default();
 | 
	
		
			
				|  |  | +            let signed_content = account1
 | 
	
		
			
				|  |  | +                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
 | 
	
		
			
				|  |  | +                .expect("valid signed content");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            let first_sub = client_pool1
 | 
	
		
			
				|  |  | +                .subscribe(
 | 
	
		
			
				|  |  | +                    Filter {
 | 
	
		
			
				|  |  | +                        ids: vec![signed_content.id.clone()],
 | 
	
		
			
				|  |  | +                        ..Default::default()
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                    .into(),
 | 
	
		
			
				|  |  | +                )
 | 
	
		
			
				|  |  | +                .await
 | 
	
		
			
				|  |  | +                .expect("valid subs, first");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            let subs = join_all(
 | 
	
		
			
				|  |  | +                (0..100)
 | 
	
		
			
				|  |  | +                    .into_iter()
 | 
	
		
			
				|  |  | +                    .map(|_| {
 | 
	
		
			
				|  |  | +                        client_pool1.subscribe(
 | 
	
		
			
				|  |  | +                            Filter {
 | 
	
		
			
				|  |  | +                                authors: vec![
 | 
	
		
			
				|  |  | +                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
 | 
	
		
			
				|  |  | +                                    .parse()
 | 
	
		
			
				|  |  | +                                    .unwrap(),
 | 
	
		
			
				|  |  | +                            ],
 | 
	
		
			
				|  |  | +                                ..Default::default()
 | 
	
		
			
				|  |  | +                            }
 | 
	
		
			
				|  |  | +                            .into(),
 | 
	
		
			
				|  |  | +                        )
 | 
	
		
			
				|  |  | +                    })
 | 
	
		
			
				|  |  | +                    .collect::<Vec<_>>(),
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +            .await
 | 
	
		
			
				|  |  | +            .into_iter()
 | 
	
		
			
				|  |  | +            .collect::<Result<Vec<_>, _>>()
 | 
	
		
			
				|  |  | +            .expect("valid 100 dummy subs");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            for _ in 1..10 {
 | 
	
		
			
				|  |  | +                sleep(Duration::from_millis(10)).await;
 | 
	
		
			
				|  |  | +                assert!(
 | 
	
		
			
				|  |  | +                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
 | 
	
		
			
				|  |  | +                    "total subs {}",
 | 
	
		
			
				|  |  | +                    relayer.total_subscribers()
 | 
	
		
			
				|  |  | +                );
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            for _ in 0..101 {
 | 
	
		
			
				|  |  | +                assert!(client_pool1
 | 
	
		
			
				|  |  | +                    .try_recv()
 | 
	
		
			
				|  |  | +                    .map(|(r, _)| r)
 | 
	
		
			
				|  |  | +                    .expect("valid message")
 | 
	
		
			
				|  |  | +                    .as_end_of_stored_events()
 | 
	
		
			
				|  |  | +                    .is_some());
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            assert!(client_pool1.try_recv().is_none());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            client_pool2.post(signed_content.clone().into()).await;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            sleep(Duration::from_secs(1)).await;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            assert_eq!(
 | 
	
		
			
				|  |  | +                client_pool1
 | 
	
		
			
				|  |  | +                    .try_recv()
 | 
	
		
			
				|  |  | +                    .map(|(r, _)| r)
 | 
	
		
			
				|  |  | +                    .expect("valid message")
 | 
	
		
			
				|  |  | +                    .as_event()
 | 
	
		
			
				|  |  | +                    .expect("valid event")
 | 
	
		
			
				|  |  | +                    .id,
 | 
	
		
			
				|  |  | +                signed_content.id
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            drop(subs);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            sleep(Duration::from_secs(1)).await;
 | 
	
		
			
				|  |  | +            assert_eq!(
 | 
	
		
			
				|  |  | +                1,
 | 
	
		
			
				|  |  | +                relayer.total_subscribers(),
 | 
	
		
			
				|  |  | +                "total subs {}",
 | 
	
		
			
				|  |  | +                relayer.total_subscribers()
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            drop(first_sub);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            sleep(Duration::from_secs(1)).await;
 | 
	
		
			
				|  |  | +            assert_eq!(
 | 
	
		
			
				|  |  | +                0,
 | 
	
		
			
				|  |  | +                relayer.total_subscribers(),
 | 
	
		
			
				|  |  | +                "total subs {}",
 | 
	
		
			
				|  |  | +                relayer.total_subscribers()
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        #[tokio::test]
 | 
	
		
			
				|  |  | +        async fn realtime_last() {
 | 
	
		
			
				|  |  | +            let (addr1, relayer, _) = dummy_server(0).await;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            let (mut client_pool1, _c1) =
 | 
	
		
			
				|  |  | +                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
 | 
	
		
			
				|  |  | +            let (client_pool2, _c2) =
 | 
	
		
			
				|  |  | +                Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            let account1 = Account::default();
 | 
	
		
			
				|  |  | +            let signed_content = account1
 | 
	
		
			
				|  |  | +                .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
 | 
	
		
			
				|  |  | +                .expect("valid signed content");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            let mut subs = join_all(
 | 
	
		
			
				|  |  | +                (0..100)
 | 
	
		
			
				|  |  | +                    .into_iter()
 | 
	
		
			
				|  |  | +                    .map(|_| {
 | 
	
		
			
				|  |  | +                        client_pool1.subscribe(
 | 
	
		
			
				|  |  | +                            Filter {
 | 
	
		
			
				|  |  | +                                authors: vec![
 | 
	
		
			
				|  |  | +                                "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
 | 
	
		
			
				|  |  | +                                    .parse()
 | 
	
		
			
				|  |  | +                                    .unwrap(),
 | 
	
		
			
				|  |  | +                            ],
 | 
	
		
			
				|  |  | +                                ..Default::default()
 | 
	
		
			
				|  |  | +                            }
 | 
	
		
			
				|  |  | +                            .into(),
 | 
	
		
			
				|  |  | +                        )
 | 
	
		
			
				|  |  | +                    })
 | 
	
		
			
				|  |  | +                    .collect::<Vec<_>>(),
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +            .await
 | 
	
		
			
				|  |  | +            .into_iter()
 | 
	
		
			
				|  |  | +            .collect::<Result<Vec<_>, _>>()
 | 
	
		
			
				|  |  | +            .expect("valid 100 dummy subs");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            for _ in 1..10 {
 | 
	
		
			
				|  |  | +                sleep(Duration::from_millis(10)).await;
 | 
	
		
			
				|  |  | +                assert!(
 | 
	
		
			
				|  |  | +                    MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
 | 
	
		
			
				|  |  | +                    "total subs {}",
 | 
	
		
			
				|  |  | +                    relayer.total_subscribers()
 | 
	
		
			
				|  |  | +                );
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            for _ in 0..100 {
 | 
	
		
			
				|  |  | +                assert!(client_pool1
 | 
	
		
			
				|  |  | +                    .try_recv()
 | 
	
		
			
				|  |  | +                    .map(|(r, _)| r)
 | 
	
		
			
				|  |  | +                    .expect("valid message")
 | 
	
		
			
				|  |  | +                    .as_end_of_stored_events()
 | 
	
		
			
				|  |  | +                    .is_some());
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            assert!(client_pool1.try_recv().is_none());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            subs.push(
 | 
	
		
			
				|  |  | +                client_pool1
 | 
	
		
			
				|  |  | +                    .subscribe(
 | 
	
		
			
				|  |  | +                        Filter {
 | 
	
		
			
				|  |  | +                            ids: vec![signed_content.id.clone()],
 | 
	
		
			
				|  |  | +                            ..Default::default()
 | 
	
		
			
				|  |  | +                        }
 | 
	
		
			
				|  |  | +                        .into(),
 | 
	
		
			
				|  |  | +                    )
 | 
	
		
			
				|  |  | +                    .await
 | 
	
		
			
				|  |  | +                    .expect("valid subs, last"),
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            sleep(Duration::from_secs(1)).await;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            assert!(client_pool1
 | 
	
		
			
				|  |  | +                .try_recv()
 | 
	
		
			
				|  |  | +                .map(|(r, _)| r)
 | 
	
		
			
				|  |  | +                .expect("valid message")
 | 
	
		
			
				|  |  | +                .as_end_of_stored_events()
 | 
	
		
			
				|  |  | +                .is_some());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            client_pool2.post(signed_content.clone().into()).await;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            sleep(Duration::from_secs(1)).await;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            assert_eq!(
 | 
	
		
			
				|  |  | +                client_pool1
 | 
	
		
			
				|  |  | +                    .try_recv()
 | 
	
		
			
				|  |  | +                    .map(|(r, _)| r)
 | 
	
		
			
				|  |  | +                    .expect("valid message")
 | 
	
		
			
				|  |  | +                    .as_event()
 | 
	
		
			
				|  |  | +                    .expect("valid event")
 | 
	
		
			
				|  |  | +                    .id,
 | 
	
		
			
				|  |  | +                signed_content.id
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            drop(subs);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            sleep(Duration::from_secs(1)).await;
 | 
	
		
			
				|  |  | +            assert_eq!(
 | 
	
		
			
				|  |  | +                0,
 | 
	
		
			
				|  |  | +                relayer.total_subscribers(),
 | 
	
		
			
				|  |  | +                "total subs {}",
 | 
	
		
			
				|  |  | +                relayer.total_subscribers()
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |  }
 |