use crate::{
    connection::{ConnectionId, LocalConnection},
    subscription::SubscriptionManager,
    Connection, Error,
};
use futures_util::StreamExt;
use nostr_rs_client::{Error as ClientError, Pool, Url};
use nostr_rs_storage_base::Storage;
use nostr_rs_types::{
    relayer::{self, ROk, ROkStatus},
    types::{Addr, Event},
    Request, Response,
};
use std::{
    collections::{HashMap, HashSet},
    ops::Deref,
    sync::Arc,
};
use tokio::{
    net::{TcpListener, TcpStream},
    sync::mpsc::{channel, Receiver, Sender},
};
use tokio::{
    sync::{mpsc, RwLock},
    task::JoinHandle,
};

/// Relayer struct
// NOTE(review): the generic parameters of this file were stripped by a
// markup-mangling pass (`Option,`, `RwLock>`, `-> Result {`, ...). They have
// been reconstructed below as `<T: Storage + Send + Sync + 'static>` based on
// how the fields are used (`T` must be a `Storage` usable from a spawned
// tokio task) — confirm against the original crate.
pub struct Relayer<T: Storage + Send + Sync + 'static> {
    /// Storage engine. If provided the events are going to be persisted on
    /// disk, otherwise all the messages are going to be ephemeral, making this
    /// relayer just a dumb proxy (that can be useful for privacy) but it won't
    /// be able to perform any optimization like prefetching content while
    /// offline.
    storage: Option<T>,
    /// Subscription manager
    subscriptions: Arc<SubscriptionManager>,
    /// List of all active connections, keyed by their connection id.
    connections: RwLock<HashMap<ConnectionId, Connection>>,
    /// This Sender can be used to send requests from anywhere to the relayer.
    send_to_relayer: Sender<(ConnectionId, Request)>,
    /// This Receiver is the way the relayer receives messages. It is taken
    /// out (see `split`) before the main loop starts.
    relayer_receiver: Option<Receiver<(ConnectionId, Request)>>,
    /// Client pool
    ///
    /// A relayer can optionally be connected to a pool of clients to get
    /// foreign events.
    client_pool: Option<(Pool, JoinHandle<()>)>,
}

impl<T: Storage + Send + Sync + 'static> Drop for Relayer<T> {
    fn drop(&mut self) {
        // Abort the client-pool background task; otherwise it would keep
        // running (and holding the pool) after the relayer is gone.
        if let Some((_, handle)) = self.client_pool.take() {
            handle.abort();
        }
    }
}

impl<T: Storage + Send + Sync + 'static> Relayer<T> {
    /// Creates a new relayer instance
    ///
    /// If the storage is given, it will be used to persist events, as well as
    /// to serve past events when a new subscription is added.
    ///
    /// If the client_pool is given it will be used to connect to those
    /// relayers and create a network of relayers, reposting events to them
    /// and subscribing to their events.
    pub fn new(storage: Option<T>, client_pool: Option<Pool>) -> Result<Self, Error> {
        let (sender, receiver) = channel(100_000);
        Ok(Self {
            storage,
            subscriptions: Default::default(),
            send_to_relayer: sender.clone(),
            relayer_receiver: Some(receiver),
            connections: Default::default(),
            client_pool: if let Some(client_pool) = client_pool {
                Some(Self::handle_client_pool(client_pool, sender)?)
            } else {
                None
            },
        })
    }

    /// Connects to the relayer pool
    pub async fn connect_to_relayer(&self, url: Url) -> Result<(), Error> {
        let (client_pool, _) = self.client_pool.as_ref().ok_or(Error::NoClient)?;
        client_pool.connect_to(url).await;
        Ok(())
    }

    /// Total number of subscriber requests that are actively listening for
    /// new events.
    pub fn total_subscribers(&self) -> usize {
        self.subscriptions.total_subscribers()
    }

    /// Splits the relayer object and extracts its receiver.
    ///
    /// Returns `Error::AlreadySplitted` if the receiver was already taken.
    pub fn split(mut self) -> Result<(Self, Receiver<(ConnectionId, Request)>), Error> {
        let receiver = self.relayer_receiver.take().ok_or(Error::AlreadySplitted)?;
        Ok((self, receiver))
    }

    /// Runs the relayer main loop in a tokio task and returns it.
    ///
    /// This function consumes the object and takes the ownership. The
    /// returned JoinHandle() can be used to stop the main loop.
    pub fn main(self, server: TcpListener) -> Result<(Arc<Self>, JoinHandle<()>), Error> {
        let (this, mut receiver) = self.split()?;
        let _self = Arc::new(this);
        let this = _self.clone();
        let handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    Ok((stream, _)) = server.accept() => {
                        // accept new external connections
                        let _ = this.add_connection(None, stream).await;
                    },
                    Some((conn_id, request)) = receiver.recv() => {
                        // receive messages from the connection pool
                        if conn_id.is_empty() {
                            // message received from the client pool: broadcast
                            // locally and persist, there is no local sender to
                            // reply to.
                            if let Request::Event(event) = request {
                                let _ = this.broadcast(event.deref()).await;
                                if let Some(storage) = this.storage.as_ref() {
                                    let _ = storage.store_local_event(&event).await;
                                }
                            }
                            continue;
                        }
                        let connections = this.connections.read().await;
                        let connection = if let Some(connection) = connections.get(&conn_id) {
                            connection
                        } else {
                            // connection went away before its message was processed
                            continue;
                        };
                        // receive messages from clients
                        let _ = this.process_request_from_client(connection, request).await;
                        drop(connections);
                    }
                    else => {
                        // both branches disabled; keep looping
                    }
                }
            }
        });
        Ok((_self, handle))
    }

    /// Handle the client pool
    ///
    /// Main loop to consume messages from the client pool and broadcast them
    /// to the local subscribers.
    fn handle_client_pool(
        client_pool: Pool,
        send_message_to_relayer: Sender<(ConnectionId, Request)>,
    ) -> Result<(Pool, JoinHandle<()>), ClientError> {
        let (mut receiver, client_pool) = client_pool.split()?;
        let handle = tokio::spawn(async move {
            loop {
                // NOTE(review): debug logging left in place to preserve
                // behavior; consider replacing with a proper logger.
                if receiver.len() > 500 {
                    println!("{}", receiver.len());
                }
                if let Some((response, _)) = receiver.recv().await {
                    match response {
                        Response::Event(event) => {
                            // Forward foreign events to the relayer loop with
                            // an empty connection id (meaning: from the pool).
                            // try_send: drop the event rather than block if
                            // the relayer queue is full.
                            let _ = send_message_to_relayer.try_send((
                                ConnectionId::new_empty(),
                                Request::Event(event.event.into()),
                            ));
                        }
                        Response::EndOfStoredEvents(_) => {}
                        x => {
                            println!("x => {:?}", x);
                        }
                    }
                }
            }
        });
        Ok((client_pool, handle))
    }

    /// Returns a reference to the internal database
    pub fn get_db(&self) -> &Option<T> {
        &self.storage
    }

    /// Adds a new local connection to the list of active connections.
pub async fn create_new_local_connection(&self) -> LocalConnection { let (conn, receiver) = Connection::new_local_connection(); let conn_id = conn.get_conn_id(); self.connections.write().await.insert(conn_id, conn); (conn_id, receiver, self.send_to_relayer.clone()).into() } /// Adds a new TpStream and adds it to the list of active connections. /// /// This function will spawn the client's loop to receive incoming messages and send those messages pub async fn add_connection( &self, disconnection_notify: Option>, stream: TcpStream, ) -> Result { let conn = Connection::new_connection(self.send_to_relayer.clone(), disconnection_notify, stream) .await?; let id = conn.get_conn_id(); self.connections.write().await.insert(id, conn); Ok(id) } /// Process a request from a connected client async fn process_request_from_client( &self, connection: &Connection, request: Request, ) -> Result<(), Error> { match request { Request::Event(event) => { let event_id: Addr = event.id.clone().into(); if !self.broadcast(&event).await? { connection.send( ROk { id: event_id, status: ROkStatus::Duplicate, } .into(), )?; return Ok(()); } if let Some(storage) = self.storage.as_ref() { let _ = storage.store_local_event(&event).await; } if let Some((client_pool, _)) = self.client_pool.as_ref() { // pass the event to the pool of clients, so this relayer can relay // their local events to the clients in the network of relayers let _ = client_pool.post(event).await; } connection.send( ROk { id: event_id, status: ROkStatus::Ok, } .into(), )?; } Request::Request(request) => { let foreign_subscription = if let Some((client_pool, _)) = self.client_pool.as_ref() { // pass the subscription request to the pool of clients, so this relayer // can relay any unknown event to the clients through their subscriptions Some(client_pool.subscribe(request.clone()).await?) 
} else { None }; if let Some(storage) = self.storage.as_ref() { let mut sent = HashSet::new(); // Sent all events that match the filter that are stored in our database for filter in request.filters.clone().into_iter() { let mut result = storage.get_by_filter(filter).await?; while let Some(Ok(event)) = result.next().await { if sent.contains(&event.id) { continue; } sent.insert(event.id.clone()); let _ = connection.send( relayer::Event { subscription_id: request.subscription_id.clone(), event, } .into(), ); } } } let _ = connection .send(relayer::EndOfStoredEvents(request.subscription_id.clone()).into()); connection .subscribe( request.subscription_id.clone(), ( foreign_subscription, self.subscriptions .subscribe( connection.get_conn_id(), connection.get_sender(), request.clone(), ) .await, ), ) .await; } Request::Close(close) => { connection.unsubscribe(&close).await; } }; Ok(()) } #[inline] /// Broadcast a given event to all local subscribers pub async fn broadcast(&self, event: &Event) -> Result { if let Some(storage) = self.storage.as_ref() { if !storage.store(event).await? 
{ return Ok(false); } } self.subscriptions.broadcast(event.clone()); Ok(true) } } #[cfg(test)] mod test { use std::time::Duration; use super::*; use futures::future::join_all; use nostr_rs_client::Url; use nostr_rs_memory::Memory; use nostr_rs_types::{ account::Account, types::{Content, Tag}, Request, }; use serde_json::json; use tokio::time::sleep; async fn dummy_server(port: u16, client_pool: Option) -> (Url, JoinHandle<()>) { let listener = TcpListener::bind(format!("127.0.0.1:{}", port)) .await .unwrap(); let local_addr = listener.local_addr().expect("addr"); let relayer = Relayer::new(Some(Memory::default()), client_pool).expect("valid dummy server"); let (_, stopper) = relayer.main(listener).expect("valid main loop"); ( Url::parse(&format!("ws://{}", local_addr)).expect("valid url"), stopper, ) } fn get_note_with_custom_tags(tags: Vec) -> Event { let account = Account::default(); let content = Content::ShortTextNote("".to_owned()); account.sign_content(tags, content, None).expect("valid") } fn get_note() -> Request { serde_json::from_value(json!( [ "EVENT", { "kind":1, "content":"Pong", "tags":[ ["e","9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42","","root"], ["e","2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a","","reply"], ["p","39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], ["p","ee7202ad91459e013bfef263c59e47deb0163a5e7651b026673765488bfaf102"] ], "created_at":1681938616, "pubkey":"a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3", "id":"e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9", "sig":"9036150a6c8a32933cffcc42aec4d2109a22e9f10d1c3860c0435a925e6386babb7df5c95fcf68c8ed6a9726a1f07225af663d0b068eb555014130aad21674fc", } ])).expect("value") } async fn get_db(prefill: bool) -> Memory { let db = Memory::default(); if prefill { let events = include_str!("../tests/events.json") .lines() .map(|line| serde_json::from_str(line).expect("valid")) .collect::>(); for event 
in events { assert!(db.store(&event).await.expect("valid")); } while db.is_flushing() { tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; } } db } #[tokio::test] async fn serve_listener_from_local_db_custom_tag() { let request = serde_json::from_value(json!([ "REQ", "1298169700973717", { "#f": [ "foo", ], }, ])) .expect("valid object"); let relayer = Relayer::new(Some(get_db(true).await), None).expect("valid relayer"); let (connection, mut recv) = Connection::new_local_connection(); let note = get_note_with_custom_tags(vec![Tag::Unknown("f".to_owned(), vec!["foo".to_owned()])]); let _ = relayer .process_request_from_client(&connection, note.clone().into()) .await; sleep(Duration::from_millis(10)).await; let _ = relayer .process_request_from_client(&connection, request) .await; // ev1 assert_eq!( ROkStatus::Ok, recv.try_recv() .expect("valid") .as_ok() .cloned() .unwrap() .status, ); // ev1 assert_eq!( note, recv.try_recv().expect("valid").as_event().unwrap().event ); // eod assert!(recv .try_recv() .expect("valid") .as_end_of_stored_events() .is_some()); assert!(recv.try_recv().is_err()); } #[tokio::test] async fn serve_listener_from_local_db() { let request = serde_json::from_value(json!([ "REQ", "1298169700973717", { "authors": [ "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb" ], "since": 1681928304 }, { "#p": [ "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb" ], "kinds": [ 1, 3, 6, 7, 9735 ], "since": 1681928304 }, { "#p": [ "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb" ], "kinds": [ 4 ] }, { "authors": [ "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb" ], "kinds": [ 4 ] }, { "#e": [ "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a", "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1" ], "kinds": [ 1, 6, 7, 9735 ] } ])) .expect("valid object"); let relayer = Relayer::new(Some(get_db(true).await), None).expect("valid relayer"); let 
(connection, mut recv) = Connection::new_local_connection(); let _ = relayer .process_request_from_client(&connection, request) .await; // ev1 assert_eq!( "9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42", recv.try_recv() .expect("valid") .as_event() .expect("event") .id .to_string() ); // ev3 assert_eq!( "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9", recv.try_recv() .expect("valid") .as_event() .expect("event") .id .to_string() ); // ev2 assert_eq!( "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a", recv.try_recv() .expect("valid") .as_event() .expect("event") .id .to_string() ); // eod assert!(recv .try_recv() .expect("valid") .as_end_of_stored_events() .is_some()); assert!(recv.try_recv().is_err()); } #[tokio::test] async fn server_listener_real_time_single_argument() { let request: Request = serde_json::from_value(json!( [ "REQ", "1298169700973717", { "authors": ["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], "since":1681939304 }, { "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], "since":1681939304 }, { "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], }, { "authors":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], }, { "#e":[ "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a", "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1" ], } ])) .expect("valid object"); let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer"); let (connection, mut recv) = Connection::new_local_connection(); assert_eq!(relayer.total_subscribers(), 0); let _ = relayer .process_request_from_client(&connection, request) .await; assert_eq!(relayer.total_subscribers(), 5); // eod assert!(recv .try_recv() .expect("valid") .as_end_of_stored_events() .is_some()); // It is empty assert!(recv.try_recv().is_err()); relayer .process_request_from_client(&connection, get_note()) .await 
.expect("process event"); sleep(Duration::from_millis(100)).await; // ok from posting let msg = recv.try_recv(); assert!(msg.is_ok()); assert_eq!( msg.expect("is ok").as_ok().expect("valid").id.to_hex(), "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(), ); // It is not empty let msg = recv.try_recv(); assert!(msg.is_ok()); assert_eq!( msg.expect("is ok") .as_event() .expect("valid") .subscription_id .to_string(), "1298169700973717".to_owned() ); // it must be deliverd at most once assert!(recv.try_recv().is_err()); assert_eq!(relayer.total_subscribers(), 5); // when client is dropped, the subscription is removed // automatically drop(connection); sleep(Duration::from_millis(10)).await; assert_eq!(relayer.total_subscribers(), 0); } #[tokio::test] async fn server_listener_real_time() { let request: Request = serde_json::from_value(json!( [ "REQ", "1298169700973717", { "authors": ["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], "since":1681939304 }, { "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], "kinds":[1,3,6,7,9735], "since":1681939304 }, { "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], "kinds":[4] }, { "authors":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], "kinds":[4] }, { "#e":[ "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a", "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1" ], "kinds":[1,6,7,9735] } ])) .expect("valid object"); let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer"); let (connection, mut recv) = Connection::new_local_connection(); assert_eq!(relayer.total_subscribers(), 0); let _ = relayer .process_request_from_client(&connection, request) .await; assert_eq!(relayer.total_subscribers(), 5); // eod assert!(recv .try_recv() .expect("valid") .as_end_of_stored_events() .is_some()); // It is empty assert!(recv.try_recv().is_err()); relayer 
.process_request_from_client(&connection, get_note()) .await .expect("process event"); sleep(Duration::from_millis(100)).await; // ok from posting let msg = recv.try_recv(); assert!(msg.is_ok()); assert_eq!( msg.expect("is ok").as_ok().expect("valid").id.to_hex(), "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(), ); // It is not empty let msg = recv.try_recv(); assert!(msg.is_ok()); assert_eq!( msg.expect("is ok") .as_event() .expect("valid") .subscription_id .to_string(), "1298169700973717".to_owned() ); // it must be deliverd at most once assert!(recv.try_recv().is_err()); assert_eq!(relayer.total_subscribers(), 5); // when client is dropped, the subscription is removed // automatically drop(connection); sleep(Duration::from_millis(10)).await; assert_eq!(relayer.total_subscribers(), 0); } #[tokio::test] async fn multiple_subcribers() { let req1: Request = serde_json::from_value(json!(["REQ", "1298169700973717", { "authors":["a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb4"], }])) .expect("valid object"); let req2: Request = serde_json::from_value(json!(["REQ", "1298169700973717", { "authors":["a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3"] }])) .expect("valid object"); let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer"); let (publisher, mut recv) = Connection::new_local_connection(); let mut set1 = (0..1000) .map(|_| Connection::new_local_connection()) .collect::>(); let mut set2 = (0..100) .map(|_| Connection::new_local_connection()) .collect::>(); let subscribe1 = set1 .iter() .map(|(connection, _)| relayer.process_request_from_client(connection, req1.clone())) .collect::>(); let subscribe2 = set2 .iter() .map(|(connection, _)| relayer.process_request_from_client(connection, req2.clone())) .collect::>(); assert_eq!(relayer.total_subscribers(), 0); join_all(subscribe1) .await .into_iter() .collect::, _>>() .expect("valid calls"); join_all(subscribe2) .await 
.into_iter() .collect::, _>>() .expect("valid calls"); for (_, recv) in set1.iter_mut() { assert!(recv .try_recv() .expect("end of stored events") .as_end_of_stored_events() .is_some()); } for (_, recv) in set2.iter_mut() { assert!(recv .try_recv() .expect("end of stored events") .as_end_of_stored_events() .is_some()); } assert_eq!(relayer.total_subscribers(), 1100); relayer .process_request_from_client(&publisher, get_note()) .await .expect("process event"); sleep(Duration::from_millis(10)).await; let msg = recv.try_recv(); assert!(msg.is_ok()); assert_eq!( msg.expect("is ok").as_ok().expect("valid").id.to_hex(), "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(), ); for (_, recv) in set1.iter_mut() { assert!(recv.try_recv().is_err()); } for (_, recv) in set2.iter_mut() { let msg = recv.try_recv(); println!("{:?}", msg); assert!(msg.is_ok()); let msg = msg.expect("msg"); assert_eq!( msg.as_event().expect("valid").subscription_id.to_string(), "1298169700973717".to_owned() ); assert!(recv.try_recv().is_err()); } drop(set1); sleep(Duration::from_millis(10)).await; assert_eq!(relayer.total_subscribers(), 100); drop(set2); sleep(Duration::from_millis(10)).await; assert_eq!(relayer.total_subscribers(), 0); drop(relayer); } #[tokio::test] async fn posting_event_replies_ok() { let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer"); let (connection, mut recv) = Connection::new_local_connection(); let note = get_note(); let note_id = note.as_event().map(|x| x.id.clone()).unwrap(); relayer .process_request_from_client(&connection, note) .await .expect("process event"); sleep(Duration::from_millis(10)).await; assert_eq!( Some( ROk { id: note_id.into(), status: ROkStatus::Ok, } .into() ), recv.try_recv().ok() ); } #[tokio::test] async fn subscribe_to_all() { let request: Request = serde_json::from_value(json!(["REQ", "1298169700973717", {}])).expect("valid object"); let relayer = Relayer::new(Some(get_db(false).await), 
None).expect("valid relayer"); let (connection, mut recv) = Connection::new_local_connection(); assert_eq!(relayer.total_subscribers(), 0); let _ = relayer .process_request_from_client(&connection, request) .await; assert_eq!(relayer.total_subscribers(), 1); // eod assert!(recv .try_recv() .expect("valid") .as_end_of_stored_events() .is_some()); // It is empty assert!(recv.try_recv().is_err()); relayer .process_request_from_client(&connection, get_note()) .await .expect("process event"); sleep(Duration::from_millis(10)).await; // ok from posting let msg = recv.try_recv(); assert!(msg.is_ok()); assert_eq!( msg.expect("is ok").as_ok().expect("valid").id.to_hex(), "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(), ); // It is not empty let msg = recv.try_recv(); assert!(msg.is_ok()); assert_eq!( msg.expect("is ok") .as_event() .expect("valid") .subscription_id .to_string(), "1298169700973717".to_owned() ); // it must be deliverd at most once assert!(recv.try_recv().is_err()); assert_eq!(relayer.total_subscribers(), 1); // when client is dropped, the subscription is removed // automatically drop(connection); sleep(Duration::from_millis(10)).await; assert_eq!(relayer.total_subscribers(), 0); } #[tokio::test] async fn relayer_posts_to_custom_posts_to_all_clients() { let (relayer1, _) = dummy_server(0, None).await; let (relayer2, _) = dummy_server(0, None).await; let (relayer3, _) = dummy_server(0, None).await; let (main_relayer, _) = dummy_server( 0, Some(Pool::new_with_clients(vec![ relayer1.clone(), relayer2.clone(), relayer3.clone(), ])), ) .await; let mut reader_client = Pool::new_with_clients(vec![relayer1.clone(), relayer2.clone(), relayer3.clone()]); let main_client = Pool::new_with_clients(vec![main_relayer]); let _sub = reader_client .subscribe(Default::default()) .await .expect("valid subscription"); sleep(Duration::from_millis(20)).await; for _ in 0..3 { assert!(reader_client .try_recv() .map(|(r, _)| r) .expect("valid message") 
.as_end_of_stored_events() .is_some()); } assert!(reader_client.try_recv().is_none()); let account1 = Account::default(); let signed_content = account1 .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None) .expect("valid signed content"); // account1 posts a new note into the relayer1, and the main relayer // should get a copy of it, as well as it is connected to relayer2 and // relayer1. main_client.post(signed_content.clone().into()).await; sleep(Duration::from_millis(10)).await; let responses = (0..3) .map(|_| reader_client.try_recv().expect("valid message")) .filter_map(|(r, url)| { r.as_event() .map(|r| (url.port().expect("port"), r.to_owned())) }) .collect::>(); assert!(reader_client.try_recv().is_none()); assert_eq!(responses.len(), 3); assert_eq!( responses .get(&relayer1.port().expect("port")) .map(|x| x.id.clone()), Some(signed_content.id.clone()) ); assert_eq!( responses .get(&relayer2.port().expect("port")) .map(|x| x.id.clone()), Some(signed_content.id.clone()) ); assert_eq!( responses .get(&relayer3.port().expect("port")) .map(|x| x.id.clone()), Some(signed_content.id) ); } #[tokio::test] async fn relayer_with_client_pool() { let (relayer1, _) = dummy_server(0, None).await; let (relayer2, _) = dummy_server(0, None).await; let (main_relayer, _) = dummy_server( 0, Some(Pool::new_with_clients(vec![relayer1.clone(), relayer2])), ) .await; let secondary_client = Pool::new_with_clients(vec![relayer1]); // Create a subscription in the main relayer, main_client is only // connected to the main relayer let mut main_client = Pool::new_with_clients(vec![main_relayer]); let _sub = main_client .subscribe(Default::default()) .await .expect("valid subscription"); sleep(Duration::from_millis(10)).await; assert!(main_client .try_recv() .map(|(r, _)| r) .expect("valid message") .as_end_of_stored_events() .is_some()); assert!(main_client.try_recv().is_none()); let account1 = Account::default(); let signed_content = account1 .sign_content(vec![], 
Content::ShortTextNote("test 01".to_owned()), None) .expect("valid signed content"); // account1 posts a new note into the relayer1, and the main relayer // should get a copy of it, as well as it is connected to relayer2 and // relayer1. secondary_client.post(signed_content.clone().into()).await; // wait for the note to be delivered sleep(Duration::from_millis(10)).await; assert_eq!( Some((signed_content.id, signed_content.signature)), main_client .try_recv() .and_then(|(r, _)| r.as_event().cloned()) .map(|x| (x.id.clone(), x.signature.clone())) ); assert!(main_client.try_recv().is_none()); } }