|
@@ -199,20 +199,25 @@ mod test {
|
|
use super::*;
|
|
use super::*;
|
|
use nostr_rs_memory::Memory;
|
|
use nostr_rs_memory::Memory;
|
|
use nostr_rs_relayer::Relayer;
|
|
use nostr_rs_relayer::Relayer;
|
|
- use nostr_rs_types::{account::Account, types::Content};
|
|
|
|
|
|
+ use nostr_rs_types::{
|
|
|
|
+ account::Account,
|
|
|
|
+ types::{Content, Filter},
|
|
|
|
+ };
|
|
use std::time::Duration;
|
|
use std::time::Duration;
|
|
|
|
+ use subscription::MAX_ACTIVE_SUBSCRIPTIONS;
|
|
use tokio::{net::TcpListener, task::JoinHandle, time::sleep};
|
|
use tokio::{net::TcpListener, task::JoinHandle, time::sleep};
|
|
|
|
|
|
- async fn dummy_server(port: u16) -> (Url, JoinHandle<()>) {
|
|
|
|
|
|
+ async fn dummy_server(port: u16) -> (Url, Arc<Relayer<Memory>>, JoinHandle<()>) {
|
|
let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
|
|
let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
|
|
.await
|
|
.await
|
|
.unwrap();
|
|
.unwrap();
|
|
let local_addr = listener.local_addr().expect("addr");
|
|
let local_addr = listener.local_addr().expect("addr");
|
|
|
|
|
|
let relayer = Relayer::new(Some(Memory::default()), None).expect("valid dummy server");
|
|
let relayer = Relayer::new(Some(Memory::default()), None).expect("valid dummy server");
|
|
- let (_, stopper) = relayer.main(listener).expect("valid main loop");
|
|
|
|
|
|
+ let (relayer, stopper) = relayer.main(listener).expect("valid main loop");
|
|
(
|
|
(
|
|
Url::parse(&format!("ws://{}", local_addr)).expect("valid url"),
|
|
Url::parse(&format!("ws://{}", local_addr)).expect("valid url"),
|
|
|
|
+ relayer,
|
|
stopper,
|
|
stopper,
|
|
)
|
|
)
|
|
}
|
|
}
|
|
@@ -233,7 +238,7 @@ mod test {
|
|
|
|
|
|
#[tokio::test]
|
|
#[tokio::test]
|
|
async fn connect_to_dummy_server() {
|
|
async fn connect_to_dummy_server() {
|
|
- let (addr, stopper) = dummy_server(0).await;
|
|
|
|
|
|
+ let (addr, _, stopper) = dummy_server(0).await;
|
|
let (client_pool, _connections) = Pool::new_with_clients(vec![addr]).expect("valid pool");
|
|
let (client_pool, _connections) = Pool::new_with_clients(vec![addr]).expect("valid pool");
|
|
|
|
|
|
assert_eq!(0, client_pool.check_active_connections().await);
|
|
assert_eq!(0, client_pool.check_active_connections().await);
|
|
@@ -250,7 +255,7 @@ mod test {
|
|
|
|
|
|
#[tokio::test]
|
|
#[tokio::test]
|
|
async fn two_clients_communication() {
|
|
async fn two_clients_communication() {
|
|
- let (addr, _) = dummy_server(0).await;
|
|
|
|
|
|
+ let (addr, _, _) = dummy_server(0).await;
|
|
let (mut client_pool1, _c1) =
|
|
let (mut client_pool1, _c1) =
|
|
Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
|
|
Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
|
|
let (client_pool2, _c2) = Pool::new_with_clients(vec![addr]).expect("valid pool");
|
|
let (client_pool2, _c2) = Pool::new_with_clients(vec![addr]).expect("valid pool");
|
|
@@ -288,7 +293,7 @@ mod test {
|
|
|
|
|
|
#[tokio::test]
|
|
#[tokio::test]
|
|
async fn reconnect_and_resubscribe() {
|
|
async fn reconnect_and_resubscribe() {
|
|
- let (addr, stopper) = dummy_server(0).await;
|
|
|
|
|
|
+ let (addr, _, stopper) = dummy_server(0).await;
|
|
let (mut client_pool1, _c1) =
|
|
let (mut client_pool1, _c1) =
|
|
Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
|
|
Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
|
|
let (client_pool2, _c2) = Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
|
|
let (client_pool2, _c2) = Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
|
|
@@ -333,7 +338,7 @@ mod test {
|
|
assert_eq!(0, client_pool1.check_active_connections().await);
|
|
assert_eq!(0, client_pool1.check_active_connections().await);
|
|
assert_eq!(0, client_pool2.check_active_connections().await);
|
|
assert_eq!(0, client_pool2.check_active_connections().await);
|
|
|
|
|
|
- let (_, stopper) = dummy_server(addr.port().expect("port")).await;
|
|
|
|
|
|
+ let (_, _, stopper) = dummy_server(addr.port().expect("port")).await;
|
|
|
|
|
|
sleep(Duration::from_millis(2_000)).await;
|
|
sleep(Duration::from_millis(2_000)).await;
|
|
|
|
|
|
@@ -362,8 +367,8 @@ mod test {
|
|
|
|
|
|
#[tokio::test]
|
|
#[tokio::test]
|
|
async fn connect_multiple_servers() {
|
|
async fn connect_multiple_servers() {
|
|
- let (addr1, _) = dummy_server(0).await;
|
|
|
|
- let (addr2, _) = dummy_server(0).await;
|
|
|
|
|
|
+ let (addr1, _, _) = dummy_server(0).await;
|
|
|
|
+ let (addr2, _, _) = dummy_server(0).await;
|
|
let (mut client_pool1, _c1) =
|
|
let (mut client_pool1, _c1) =
|
|
Pool::new_with_clients(vec![addr1.clone(), addr2]).expect("valid pool");
|
|
Pool::new_with_clients(vec![addr1.clone(), addr2]).expect("valid pool");
|
|
let (client_pool2, _c2) = Pool::new_with_clients(vec![addr1]).expect("valid pool");
|
|
let (client_pool2, _c2) = Pool::new_with_clients(vec![addr1]).expect("valid pool");
|
|
@@ -397,4 +402,416 @@ mod test {
|
|
);
|
|
);
|
|
assert!(client_pool1.try_recv().is_none());
|
|
assert!(client_pool1.try_recv().is_none());
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /// Client pool user creates 101 subscription (that is not allowed by many
|
|
|
|
+ /// relays), so the pool will do a round-robin subscriptions keeping 5
|
|
|
|
+ /// actives at a time with an internal scheduler.
|
|
|
|
+ ///
|
|
|
|
+ /// The scheduler will pause active subscriptions when the EOS is received.
|
|
|
|
+ /// On the next round `since` will be used to receive only newer events
|
|
|
|
+ mod scheduler {
|
|
|
|
+ use super::*;
|
|
|
|
+
|
|
|
|
+ #[tokio::test]
|
|
|
|
+ async fn stored_first() {
|
|
|
|
+ let (addr1, relayer, _) = dummy_server(0).await;
|
|
|
|
+
|
|
|
|
+ let (mut client_pool1, _c1) =
|
|
|
|
+ Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
|
|
|
|
+ let (client_pool2, _c2) =
|
|
|
|
+ Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
|
|
|
|
+
|
|
|
|
+ let account1 = Account::default();
|
|
|
|
+ let signed_content = account1
|
|
|
|
+ .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
|
|
|
|
+ .expect("valid signed content");
|
|
|
|
+ client_pool2.post(signed_content.clone().into()).await;
|
|
|
|
+
|
|
|
|
+ let first_sub = client_pool1
|
|
|
|
+ .subscribe(
|
|
|
|
+ Filter {
|
|
|
|
+ ids: vec![signed_content.id.clone()],
|
|
|
|
+ ..Default::default()
|
|
|
|
+ }
|
|
|
|
+ .into(),
|
|
|
|
+ )
|
|
|
|
+ .await
|
|
|
|
+ .expect("valid subs, last");
|
|
|
|
+
|
|
|
|
+ let subs = join_all(
|
|
|
|
+ (0..100)
|
|
|
|
+ .into_iter()
|
|
|
|
+ .map(|_| {
|
|
|
|
+ client_pool1.subscribe(
|
|
|
|
+ Filter {
|
|
|
|
+ authors: vec![
|
|
|
|
+ "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
|
|
|
|
+ .parse()
|
|
|
|
+ .unwrap(),
|
|
|
|
+ ],
|
|
|
|
+ ..Default::default()
|
|
|
|
+ }
|
|
|
|
+ .into(),
|
|
|
|
+ )
|
|
|
|
+ })
|
|
|
|
+ .collect::<Vec<_>>(),
|
|
|
|
+ )
|
|
|
|
+ .await
|
|
|
|
+ .into_iter()
|
|
|
|
+ .collect::<Result<Vec<_>, _>>()
|
|
|
|
+ .expect("valid 100 dummy subs");
|
|
|
|
+
|
|
|
|
+ for _ in 1..10 {
|
|
|
|
+ sleep(Duration::from_millis(10)).await;
|
|
|
|
+ assert!(
|
|
|
|
+ MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
|
|
|
|
+ "total subs {}",
|
|
|
|
+ relayer.total_subscribers()
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ sleep(Duration::from_secs(1)).await;
|
|
|
|
+
|
|
|
|
+ let mut has_receive_event = false;
|
|
|
|
+ for _ in 0..102 {
|
|
|
|
+ let event = client_pool1
|
|
|
|
+ .try_recv()
|
|
|
|
+ .map(|(r, _)| r)
|
|
|
|
+ .expect("valid event");
|
|
|
|
+
|
|
|
|
+ if has_receive_event {
|
|
|
|
+ assert!(event.as_end_of_stored_events().is_some());
|
|
|
|
+ } else {
|
|
|
|
+ if let Some(ev) = event.as_event() {
|
|
|
|
+ assert_eq!(ev.id, signed_content.id);
|
|
|
|
+ has_receive_event = true;
|
|
|
|
+ } else {
|
|
|
|
+ assert!(event.as_end_of_stored_events().is_some());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ drop(subs);
|
|
|
|
+ sleep(Duration::from_secs(1)).await;
|
|
|
|
+ assert_eq!(
|
|
|
|
+ 1,
|
|
|
|
+ relayer.total_subscribers(),
|
|
|
|
+ "total subs {}",
|
|
|
|
+ relayer.total_subscribers()
|
|
|
|
+ );
|
|
|
|
+
|
|
|
|
+ drop(first_sub);
|
|
|
|
+ sleep(Duration::from_secs(1)).await;
|
|
|
|
+ assert_eq!(
|
|
|
|
+ 0,
|
|
|
|
+ relayer.total_subscribers(),
|
|
|
|
+ "total subs {}",
|
|
|
|
+ relayer.total_subscribers()
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ #[tokio::test]
|
|
|
|
+ async fn stored_last() {
|
|
|
|
+ let (addr1, relayer, _) = dummy_server(0).await;
|
|
|
|
+
|
|
|
|
+ let (mut client_pool1, _c1) =
|
|
|
|
+ Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
|
|
|
|
+ let (client_pool2, _c2) =
|
|
|
|
+ Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
|
|
|
|
+
|
|
|
|
+ let account1 = Account::default();
|
|
|
|
+ let signed_content = account1
|
|
|
|
+ .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
|
|
|
|
+ .expect("valid signed content");
|
|
|
|
+ client_pool2.post(signed_content.clone().into()).await;
|
|
|
|
+
|
|
|
|
+ let mut subs = join_all(
|
|
|
|
+ (0..100)
|
|
|
|
+ .into_iter()
|
|
|
|
+ .map(|_| {
|
|
|
|
+ client_pool1.subscribe(
|
|
|
|
+ Filter {
|
|
|
|
+ authors: vec![
|
|
|
|
+ "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
|
|
|
|
+ .parse()
|
|
|
|
+ .unwrap(),
|
|
|
|
+ ],
|
|
|
|
+ ..Default::default()
|
|
|
|
+ }
|
|
|
|
+ .into(),
|
|
|
|
+ )
|
|
|
|
+ })
|
|
|
|
+ .collect::<Vec<_>>(),
|
|
|
|
+ )
|
|
|
|
+ .await
|
|
|
|
+ .into_iter()
|
|
|
|
+ .collect::<Result<Vec<_>, _>>()
|
|
|
|
+ .expect("valid 100 dummy subs");
|
|
|
|
+
|
|
|
|
+ for _ in 1..10 {
|
|
|
|
+ sleep(Duration::from_millis(10)).await;
|
|
|
|
+ assert!(
|
|
|
|
+ MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
|
|
|
|
+ "total subs {}",
|
|
|
|
+ relayer.total_subscribers()
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for _ in 0..100 {
|
|
|
|
+ assert!(client_pool1
|
|
|
|
+ .try_recv()
|
|
|
|
+ .map(|(r, _)| r)
|
|
|
|
+ .expect("valid message")
|
|
|
|
+ .as_end_of_stored_events()
|
|
|
|
+ .is_some());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ assert!(client_pool1.try_recv().is_none());
|
|
|
|
+
|
|
|
|
+ subs.push(
|
|
|
|
+ client_pool1
|
|
|
|
+ .subscribe(
|
|
|
|
+ Filter {
|
|
|
|
+ ids: vec![signed_content.id.clone()],
|
|
|
|
+ ..Default::default()
|
|
|
|
+ }
|
|
|
|
+ .into(),
|
|
|
|
+ )
|
|
|
|
+ .await
|
|
|
|
+ .expect("valid subs, last"),
|
|
|
|
+ );
|
|
|
|
+
|
|
|
|
+ sleep(Duration::from_secs(1)).await;
|
|
|
|
+
|
|
|
|
+ assert_eq!(
|
|
|
|
+ client_pool1
|
|
|
|
+ .try_recv()
|
|
|
|
+ .map(|(r, _)| r)
|
|
|
|
+ .expect("valid message")
|
|
|
|
+ .as_event()
|
|
|
|
+ .expect("valid event")
|
|
|
|
+ .id,
|
|
|
|
+ signed_content.id
|
|
|
|
+ );
|
|
|
|
+
|
|
|
|
+ drop(subs);
|
|
|
|
+
|
|
|
|
+ sleep(Duration::from_secs(1)).await;
|
|
|
|
+ assert_eq!(
|
|
|
|
+ 0,
|
|
|
|
+ relayer.total_subscribers(),
|
|
|
|
+ "total subs {}",
|
|
|
|
+ relayer.total_subscribers()
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ #[tokio::test]
|
|
|
|
+ async fn realtime_first() {
|
|
|
|
+ let (addr1, relayer, _) = dummy_server(0).await;
|
|
|
|
+
|
|
|
|
+ let (mut client_pool1, _c1) =
|
|
|
|
+ Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
|
|
|
|
+ let (client_pool2, _c2) =
|
|
|
|
+ Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
|
|
|
|
+
|
|
|
|
+ let account1 = Account::default();
|
|
|
|
+ let signed_content = account1
|
|
|
|
+ .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
|
|
|
|
+ .expect("valid signed content");
|
|
|
|
+
|
|
|
|
+ let first_sub = client_pool1
|
|
|
|
+ .subscribe(
|
|
|
|
+ Filter {
|
|
|
|
+ ids: vec![signed_content.id.clone()],
|
|
|
|
+ ..Default::default()
|
|
|
|
+ }
|
|
|
|
+ .into(),
|
|
|
|
+ )
|
|
|
|
+ .await
|
|
|
|
+ .expect("valid subs, first");
|
|
|
|
+
|
|
|
|
+ let subs = join_all(
|
|
|
|
+ (0..100)
|
|
|
|
+ .into_iter()
|
|
|
|
+ .map(|_| {
|
|
|
|
+ client_pool1.subscribe(
|
|
|
|
+ Filter {
|
|
|
|
+ authors: vec![
|
|
|
|
+ "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
|
|
|
|
+ .parse()
|
|
|
|
+ .unwrap(),
|
|
|
|
+ ],
|
|
|
|
+ ..Default::default()
|
|
|
|
+ }
|
|
|
|
+ .into(),
|
|
|
|
+ )
|
|
|
|
+ })
|
|
|
|
+ .collect::<Vec<_>>(),
|
|
|
|
+ )
|
|
|
|
+ .await
|
|
|
|
+ .into_iter()
|
|
|
|
+ .collect::<Result<Vec<_>, _>>()
|
|
|
|
+ .expect("valid 100 dummy subs");
|
|
|
|
+
|
|
|
|
+ for _ in 1..10 {
|
|
|
|
+ sleep(Duration::from_millis(10)).await;
|
|
|
|
+ assert!(
|
|
|
|
+ MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
|
|
|
|
+ "total subs {}",
|
|
|
|
+ relayer.total_subscribers()
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for _ in 0..101 {
|
|
|
|
+ assert!(client_pool1
|
|
|
|
+ .try_recv()
|
|
|
|
+ .map(|(r, _)| r)
|
|
|
|
+ .expect("valid message")
|
|
|
|
+ .as_end_of_stored_events()
|
|
|
|
+ .is_some());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ assert!(client_pool1.try_recv().is_none());
|
|
|
|
+
|
|
|
|
+ client_pool2.post(signed_content.clone().into()).await;
|
|
|
|
+
|
|
|
|
+ sleep(Duration::from_secs(1)).await;
|
|
|
|
+
|
|
|
|
+ assert_eq!(
|
|
|
|
+ client_pool1
|
|
|
|
+ .try_recv()
|
|
|
|
+ .map(|(r, _)| r)
|
|
|
|
+ .expect("valid message")
|
|
|
|
+ .as_event()
|
|
|
|
+ .expect("valid event")
|
|
|
|
+ .id,
|
|
|
|
+ signed_content.id
|
|
|
|
+ );
|
|
|
|
+
|
|
|
|
+ drop(subs);
|
|
|
|
+
|
|
|
|
+ sleep(Duration::from_secs(1)).await;
|
|
|
|
+ assert_eq!(
|
|
|
|
+ 1,
|
|
|
|
+ relayer.total_subscribers(),
|
|
|
|
+ "total subs {}",
|
|
|
|
+ relayer.total_subscribers()
|
|
|
|
+ );
|
|
|
|
+
|
|
|
|
+ drop(first_sub);
|
|
|
|
+
|
|
|
|
+ sleep(Duration::from_secs(1)).await;
|
|
|
|
+ assert_eq!(
|
|
|
|
+ 0,
|
|
|
|
+ relayer.total_subscribers(),
|
|
|
|
+ "total subs {}",
|
|
|
|
+ relayer.total_subscribers()
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ #[tokio::test]
|
|
|
|
+ async fn realtime_last() {
|
|
|
|
+ let (addr1, relayer, _) = dummy_server(0).await;
|
|
|
|
+
|
|
|
|
+ let (mut client_pool1, _c1) =
|
|
|
|
+ Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
|
|
|
|
+ let (client_pool2, _c2) =
|
|
|
|
+ Pool::new_with_clients(vec![addr1.clone()]).expect("valid pool");
|
|
|
|
+
|
|
|
|
+ let account1 = Account::default();
|
|
|
|
+ let signed_content = account1
|
|
|
|
+ .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
|
|
|
|
+ .expect("valid signed content");
|
|
|
|
+
|
|
|
|
+ let mut subs = join_all(
|
|
|
|
+ (0..100)
|
|
|
|
+ .into_iter()
|
|
|
|
+ .map(|_| {
|
|
|
|
+ client_pool1.subscribe(
|
|
|
|
+ Filter {
|
|
|
|
+ authors: vec![
|
|
|
|
+ "npub1k2q4dqk0eqlu6tp6m5zhsh852u7a8zz9wp5ewnxxmrx2q6eu8duq3ydzzr"
|
|
|
|
+ .parse()
|
|
|
|
+ .unwrap(),
|
|
|
|
+ ],
|
|
|
|
+ ..Default::default()
|
|
|
|
+ }
|
|
|
|
+ .into(),
|
|
|
|
+ )
|
|
|
|
+ })
|
|
|
|
+ .collect::<Vec<_>>(),
|
|
|
|
+ )
|
|
|
|
+ .await
|
|
|
|
+ .into_iter()
|
|
|
|
+ .collect::<Result<Vec<_>, _>>()
|
|
|
|
+ .expect("valid 100 dummy subs");
|
|
|
|
+
|
|
|
|
+ for _ in 1..10 {
|
|
|
|
+ sleep(Duration::from_millis(10)).await;
|
|
|
|
+ assert!(
|
|
|
|
+ MAX_ACTIVE_SUBSCRIPTIONS * 2 > relayer.total_subscribers(),
|
|
|
|
+ "total subs {}",
|
|
|
|
+ relayer.total_subscribers()
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for _ in 0..100 {
|
|
|
|
+ assert!(client_pool1
|
|
|
|
+ .try_recv()
|
|
|
|
+ .map(|(r, _)| r)
|
|
|
|
+ .expect("valid message")
|
|
|
|
+ .as_end_of_stored_events()
|
|
|
|
+ .is_some());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ assert!(client_pool1.try_recv().is_none());
|
|
|
|
+
|
|
|
|
+ subs.push(
|
|
|
|
+ client_pool1
|
|
|
|
+ .subscribe(
|
|
|
|
+ Filter {
|
|
|
|
+ ids: vec![signed_content.id.clone()],
|
|
|
|
+ ..Default::default()
|
|
|
|
+ }
|
|
|
|
+ .into(),
|
|
|
|
+ )
|
|
|
|
+ .await
|
|
|
|
+ .expect("valid subs, last"),
|
|
|
|
+ );
|
|
|
|
+
|
|
|
|
+ sleep(Duration::from_secs(1)).await;
|
|
|
|
+
|
|
|
|
+ assert!(client_pool1
|
|
|
|
+ .try_recv()
|
|
|
|
+ .map(|(r, _)| r)
|
|
|
|
+ .expect("valid message")
|
|
|
|
+ .as_end_of_stored_events()
|
|
|
|
+ .is_some());
|
|
|
|
+
|
|
|
|
+ client_pool2.post(signed_content.clone().into()).await;
|
|
|
|
+
|
|
|
|
+ sleep(Duration::from_secs(1)).await;
|
|
|
|
+
|
|
|
|
+ assert_eq!(
|
|
|
|
+ client_pool1
|
|
|
|
+ .try_recv()
|
|
|
|
+ .map(|(r, _)| r)
|
|
|
|
+ .expect("valid message")
|
|
|
|
+ .as_event()
|
|
|
|
+ .expect("valid event")
|
|
|
|
+ .id,
|
|
|
|
+ signed_content.id
|
|
|
|
+ );
|
|
|
|
+
|
|
|
|
+ drop(subs);
|
|
|
|
+
|
|
|
|
+ sleep(Duration::from_secs(1)).await;
|
|
|
|
+ assert_eq!(
|
|
|
|
+ 0,
|
|
|
|
+ relayer.total_subscribers(),
|
|
|
|
+ "total subs {}",
|
|
|
|
+ relayer.total_subscribers()
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|