use crate::{ connection::{ConnectionId, LocalConnection}, Connection, Error, }; use futures_util::StreamExt; use nostr_rs_client::{pool::subscription::PoolSubscriptionId, Pool, Url}; use nostr_rs_storage_base::Storage; use nostr_rs_types::{ relayer::{self, ROk, ROkStatus}, types::{Addr, Event, SubscriptionId}, Request, Response, }; use std::{ collections::{HashMap, HashSet}, ops::Deref, sync::Arc, time::Instant, }; use tokio::{ net::{TcpListener, TcpStream}, sync::{ mpsc::{ self, {channel, Receiver, Sender}, }, RwLock, }, task::JoinHandle, }; #[derive(Debug, Hash, Ord, PartialEq, PartialOrd, Eq, Clone)] pub struct RelayerSubscriptionId((SubscriptionId, ConnectionId)); impl From<(SubscriptionId, ConnectionId)> for RelayerSubscriptionId { fn from(value: (SubscriptionId, ConnectionId)) -> Self { Self(value) } } impl Default for RelayerSubscriptionId { fn default() -> Self { Self((SubscriptionId::empty(), ConnectionId::new_empty())) } } type Connections = Arc>>; type SubscriptionManager = Arc>; type ClientPoolSubscriptions = Arc>>; /// Relayer struct /// pub struct Relayer { /// Storage engine, if provided the services are going to persisted in disk, /// otherwise all the messages are going to be ephemeral, making this /// relayer just a dumb proxy (that can be useful for privacy) but it won't /// be able to perform any optimization like prefetching content while offline storage: Arc>, /// Subscription manager subscription_manager: SubscriptionManager, /// List of all active connections connections: Connections, /// This Sender can be used to send requests from anywhere to the relayer. send_to_relayer: Sender<(ConnectionId, Request)>, /// This Receiver is the relayer the way the relayer receives messages relayer_receiver: Option>, /// Client pool /// /// A relayer can optionally be connected to a pool of clients to get /// foreign events. client_pool: Option>, client_pool_receiver: Option>, client_pool_subscriptions: ClientPoolSubscriptions, } impl Relayer { /// Creates a new relayer instance /// /// If the storage is given, it will be used to persist events, as well to /// server past events when a new subscription is added. /// /// If the client_pool is given it will be used to connect to those relayers /// and create a network of relayers, reposting events to them and /// subscribing to their events.`gqq` pub fn new(storage: Option, client_pool: Option) -> Result { let (relayer_sender, relayer_receiver) = channel(100_000); let (client_pool_receiver, client_pool) = if let Some(client_pool) = client_pool { let result = client_pool.split()?; (result.0, Some(Arc::new(result.1))) } else { let (_, receiver) = mpsc::channel(1); (receiver, None) }; Ok(Self { storage: Arc::new(storage), subscription_manager: Default::default(), send_to_relayer: relayer_sender, relayer_receiver: Some(relayer_receiver), connections: Default::default(), client_pool_receiver: Some(client_pool_receiver), client_pool, client_pool_subscriptions: Default::default(), }) } /// Connects to the relayer pool pub async fn connect_to_relayer(&self, url: Url) -> Result<(), Error> { let _ = self .client_pool .as_ref() .ok_or(Error::NoClient)? .connect_to(url) .await?; Ok(()) } /// Total number of subscribers requests that actively listening for new events pub fn total_subscribers(&self) -> usize { self.subscription_manager.total_subscribers() } /// Splits the relayer object and extract their receiver. pub fn split(mut self) -> Result<(Self, Receiver<(ConnectionId, Request)>), Error> { let receiver = self.relayer_receiver.take().ok_or(Error::AlreadySplitted)?; Ok((self, receiver)) } /// Runs the relayer main loop in a tokio task and returns it. /// /// This function consumes the object and takes the ownership. The returned /// JoinHandle() can be used to stop the main loop pub fn main(mut self, server: TcpListener) -> Result<(Arc, JoinHandle<()>), Error> { let mut client_pool_receiver = self .client_pool_receiver .take() .ok_or(Error::AlreadySplitted)?; let (this, mut receiver) = self.split()?; let _self = Arc::new(this); let this = _self.clone(); let handle = tokio::spawn(async move { loop { let start = Instant::now(); println!("{}", client_pool_receiver.len()); tokio::select! { Ok((stream, _)) = server.accept() => { // accept new connections let _ = this.add_connection(None, stream).await; }, Some((response, _)) = client_pool_receiver.recv() => { // process messages from anothe relayer, broadcast it and store it match response { Response::Event(event) => { // we received a message from the client pool, store it locally // and re-broadcast it. tokio::spawn(Self::broadcast( this.storage.clone(), this.subscription_manager.clone(), this.connections.clone(), event.event )); } Response::EndOfStoredEvents(sub) => { let connections = this.connections.read().await; let (sub_id, connection) = if let Some((sub_id, conn_id)) = this.client_pool_subscriptions.write().await.remove(&(sub.deref().into())) { if let Some(connection) = connections.get(&conn_id) { (sub_id, connection) } else { continue; } } else { continue }; let _ = connection.respond(Response::EndOfStoredEvents(sub_id.into())); let duration = start.elapsed(); println!("xTime elapsed: {} ms", duration.as_millis()); } _ => {} } } Some((conn_id, request)) = receiver.recv() => { tokio::spawn(Self::process_request( this.storage.clone(), this.client_pool.clone(), this.client_pool_subscriptions.clone(), this.subscription_manager.clone(), this.connections.clone(), conn_id, request.clone() )); } else => { } } } }); Ok((_self, handle)) } /// Returns a reference to the internal database pub fn get_db(&self) -> &Option { &self.storage } /// Adds a new local connection to the list of active connections. pub async fn create_new_local_connection(self: &Arc) -> LocalConnection { let (conn, receiver) = Connection::new_local_connection(); let conn_id = conn.get_conn_id(); self.connections.write().await.insert(conn_id, conn); ( conn_id, receiver, self.send_to_relayer.clone(), self.clone(), ) .into() } /// Drops a connection from the list of active connections /// /// This function only works for local connections, normal connections can /// be dropped on disconnection. /// /// This function could change in the future tu kick connections programmatically pub fn drop_connection(self: &Arc, local_connection: &LocalConnection) { let id = local_connection.conn_id; let this = self.clone(); tokio::spawn(async move { this.connections.write().await.remove(&id); }); } /// Adds a new TpStream and adds it to the list of active connections. /// /// This function will spawn the client's loop to receive incoming messages and send those messages pub async fn add_connection( &self, disconnection_notify: Option>, stream: TcpStream, ) -> Result { let conn = Connection::new_connection(self.send_to_relayer.clone(), disconnection_notify, stream) .await?; let id = conn.get_conn_id(); self.connections.write().await.insert(id, conn); Ok(id) } #[cfg(test)] async fn process_request_from_client( &self, connection: &LocalConnection, request: Request, ) -> Result<(), Error> { Self::process_request( self.storage.clone(), self.client_pool.clone(), self.client_pool_subscriptions.clone(), self.subscription_manager.clone(), self.connections.clone(), connection.conn_id, request, ) .await } /// Process a request from a connected client async fn process_request( storage: Arc>, client_pool: Option>, client_pool_subscriptions: ClientPoolSubscriptions, subscription_manager: SubscriptionManager, connections: Connections, connection_id: ConnectionId, request: Request, ) -> Result<(), Error> { match request { Request::Event(event) => { let read_connections = connections.read().await; let connection = read_connections .get(&connection_id) .ok_or(Error::UnknownConnection(connection_id))?; let event_id: Addr = event.id.clone().into(); if !Self::broadcast( storage.clone(), subscription_manager.clone(), connections.clone(), event.deref().clone(), ) .await? { connection.respond( ROk { id: event_id, status: ROkStatus::Duplicate, } .into(), )?; return Ok(()); } if let Some(storage) = storage.as_ref() { let _ = storage.store_local_event(&event).await; } if let Some(client_pool) = client_pool.as_ref() { // pass the event to the pool of clients, so this relayer can relay // their local events to the clients in the network of relayers let _ = client_pool.post(event).await; } connection.respond( ROk { id: event_id, status: ROkStatus::Ok, } .into(), )?; } Request::Request(request) => { let foreign_subscription = if let Some(client_pool) = client_pool.as_ref() { // If this relay is connected to other relays through the // client pool, create the same subscription in them as // well, with the main goal of fetching any foreign event // that matches the requested subscription. // // If the this happens, this relay will serve any local // event that matches, as well any foreign event. Foreign // events will be stored locally as well if there is a // storage setup. let foreign_sub_id = client_pool .subscribe(request.filters.clone().into()) .await?; client_pool_subscriptions.write().await.insert( foreign_sub_id.clone(), (request.subscription_id.clone(), connection_id), ); Some(foreign_sub_id) } else { None }; let read_connections = connections.read().await; let connection = read_connections .get(&connection_id) .ok_or(Error::UnknownConnection(connection_id))?; if let Some(storage) = storage.as_ref() { let mut sent = HashSet::new(); // Sent all events that match the filter that are stored in our database for filter in request.filters.clone().into_iter() { let mut result = storage.get_by_filter(filter).await?; while let Some(Ok(event)) = result.next().await { if sent.contains(&event.id) { continue; } sent.insert(event.id.clone()); let _ = connection.respond( relayer::Event { subscription_id: request.subscription_id.clone(), event, } .into(), ); } } } if foreign_subscription.is_none() { // If there is a foreign subscription, we shouldn't send a // EOS until we have got EOS from all foreign relays let _ = connection.respond( relayer::EndOfStoredEvents(request.subscription_id.clone()).into(), ); } connection .subscribe( request.subscription_id.clone(), ( foreign_subscription, subscription_manager .subscribe( (request.subscription_id, connection.get_conn_id()).into(), request.filters, (), ) .await, ), ) .await; } Request::Close(close) => { connections .read() .await .get(&connection_id) .ok_or(Error::UnknownConnection(connection_id))? .unsubscribe(&close) .await; } }; Ok(()) } #[inline] /// A non-blocking version of broadcast pub async fn broadcast( storage: Arc>, subscription_manager: SubscriptionManager, connections: Connections, event: Event, ) -> Result { if let Some(storage) = storage.as_ref() { if !storage.store(&event).await? { return Ok(false); } } let connections = connections.read().await; for RelayerSubscriptionId((sub_id, conn_id)) in subscription_manager.get_subscribers(&event).await { if let Some(connection) = connections.get(&conn_id) { let _ = connection.respond( relayer::Event { subscription_id: sub_id, event: event.clone(), } .into(), ); } } Ok(true) } } #[cfg(test)] mod test { use super::*; use futures::future::join_all; use nostr_rs_client::Url; use nostr_rs_memory::Memory; use nostr_rs_types::{ account::Account, types::{Content, Tag}, Request, }; use serde_json::json; use std::time::Duration; use tokio::time::sleep; async fn dummy_server(port: u16, client_pool: Option) -> (Url, JoinHandle<()>) { let listener = TcpListener::bind(format!("127.0.0.1:{}", port)) .await .unwrap(); let local_addr = listener.local_addr().expect("addr"); let relayer = Relayer::new(Some(Memory::default()), client_pool).expect("valid dummy server"); let (_, stopper) = relayer.main(listener).expect("valid main loop"); ( Url::parse(&format!("ws://{}", local_addr)).expect("valid url"), stopper, ) } async fn dummy_server_with_relayer( client_pool: Option, ) -> (Arc>, JoinHandle<()>) { let listener = TcpListener::bind(format!("127.0.0.1:{}", 0)).await.unwrap(); let relayer = Relayer::new(Some(Memory::default()), client_pool).expect("valid dummy server"); let (relayer, stopper) = relayer.main(listener).expect("valid main loop"); (relayer, stopper) } fn get_note_with_custom_tags(tags: serde_json::Value) -> Event { let account = Account::default(); let content = Content::ShortTextNote("".to_owned()); let tags: Vec = serde_json::from_value(tags).expect("valid tags"); account.sign_content(tags, content, None).expect("valid") } fn get_note() -> Request { serde_json::from_value(json!( [ "EVENT", { "kind":1, "content":"Pong", "tags":[ ["e","9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42","","root"], ["e","2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a","","reply"], ["p","39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], ["p","ee7202ad91459e013bfef263c59e47deb0163a5e7651b026673765488bfaf102"] ], "created_at":1681938616, "pubkey":"a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3", "id":"e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9", "sig":"9036150a6c8a32933cffcc42aec4d2109a22e9f10d1c3860c0435a925e6386babb7df5c95fcf68c8ed6a9726a1f07225af663d0b068eb555014130aad21674fc", } ])).expect("value") } async fn get_db(prefill: bool) -> Memory { let db = Memory::default(); if prefill { let events = include_str!("../tests/events.json") .lines() .map(|line| serde_json::from_str(line).expect("valid")) .collect::>(); for event in events { assert!(db.store(&event).await.expect("valid")); } while db.is_flushing() { tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; } } db } #[tokio::test] async fn serve_listener_from_local_db_custom_tag() { let request = serde_json::from_value(json!([ "REQ", "1298169700973717", { "#f": [ "foo", ], }, ])) .expect("valid object"); let relayer = Arc::new(Relayer::new(Some(get_db(true).await), None).expect("valid relayer")); let mut connection = relayer.create_new_local_connection().await; let note = get_note_with_custom_tags(json!([["f", "foo"]])); let _ = relayer .process_request_from_client(&connection, note.clone().into()) .await; sleep(Duration::from_millis(10)).await; let _ = relayer .process_request_from_client(&connection, request) .await; // ev1 assert_eq!( ROkStatus::Ok, connection .try_recv() .expect("valid") .as_ok() .cloned() .unwrap() .status, ); // ev1 assert_eq!( note, connection .try_recv() .expect("valid") .as_event() .unwrap() .event ); // eod assert!(connection .try_recv() .expect("valid") .as_end_of_stored_events() .is_some()); assert!(connection.try_recv().is_none()); } #[tokio::test] async fn serve_listener_from_local_db() { let request = serde_json::from_value(json!([ "REQ", "1298169700973717", { "authors": [ "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb" ], "until": 1681928304 }, { "#p": [ "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb" ], "kinds": [ 1, 3, 6, 7, 9735 ], "until": 1681928304 }, { "#p": [ "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb" ], "kinds": [ 4 ] }, { "authors": [ "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb" ], "kinds": [ 4 ] }, { "#e": [ "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a", "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1" ], "kinds": [ 1, 6, 7, 9735 ] } ])) .expect("valid object"); let relayer = Arc::new(Relayer::new(Some(get_db(true).await), None).expect("valid relayer")); let mut connection = relayer.create_new_local_connection().await; let _ = relayer .process_request_from_client(&connection, request) .await; // ev1 assert_eq!( "9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42", connection .try_recv() .expect("valid") .as_event() .expect("event") .id .to_string() ); // ev3 assert_eq!( "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9", connection .try_recv() .expect("valid") .as_event() .expect("event") .id .to_string() ); // ev2 assert_eq!( "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a", connection .try_recv() .expect("valid") .as_event() .expect("event") .id .to_string() ); // eod assert!(connection .try_recv() .expect("valid") .as_end_of_stored_events() .is_some()); assert!(connection.try_recv().is_none()); } #[tokio::test] async fn server_listener_real_time_single_argument() { let request: Request = serde_json::from_value(json!( [ "REQ", "1298169700973717", { "authors": ["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], "since":1681939304 }, { "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], "since":1681939304 }, { "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], }, { "authors":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], }, { "#e":[ "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a", "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1" ], } ])) .expect("valid object"); let (relayer, _stopper) = dummy_server_with_relayer(None).await; let mut receiver = relayer.create_new_local_connection().await; let mut publisher = relayer.create_new_local_connection().await; assert_eq!(relayer.total_subscribers(), 0); receiver.send(request).await.expect("subscribe"); sleep(Duration::from_millis(10)).await; assert_eq!(relayer.total_subscribers(), 1); // eod assert!(receiver .try_recv() .expect("valid") .as_end_of_stored_events() .is_some()); // It is empty assert!(receiver.try_recv().is_none()); publisher.send(get_note()).await.expect("valid send"); sleep(Duration::from_millis(10)).await; // ok from posting let msg = publisher.try_recv(); assert!(msg.is_some()); assert_eq!( msg.expect("is ok").as_ok().expect("valid").id.to_hex(), "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(), ); // It is not empty let msg = receiver.try_recv(); assert!(msg.is_some()); assert_eq!( msg.expect("is ok") .as_event() .expect("valid") .subscription_id .to_string(), "1298169700973717".to_owned() ); // it must be deliverd at most once assert!(receiver.try_recv().is_none()); assert_eq!(relayer.total_subscribers(), 1); // when client is dropped, the subscription is removed // automatically drop(receiver); sleep(Duration::from_millis(10)).await; assert_eq!(relayer.total_subscribers(), 0); } #[tokio::test] async fn server_listener_real_time() { let request: Request = serde_json::from_value(json!( [ "REQ", "1298169700973717", { "authors": ["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], "since":1681939304 }, { "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], "kinds":[1,3,6,7,9735], "since":1681939304 }, { "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], "kinds":[4] }, { "authors":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"], "kinds":[4] }, { "#e":[ "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a", "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1" ], "kinds":[1,6,7,9735] } ])) .expect("valid object"); let (relayer, _stopper) = dummy_server_with_relayer(None).await; let mut receiver = relayer.create_new_local_connection().await; let mut publisher = relayer.create_new_local_connection().await; assert_eq!(relayer.total_subscribers(), 0); receiver.send(request).await.expect("subscribe"); sleep(Duration::from_millis(10)).await; assert_eq!(relayer.total_subscribers(), 1); // eod assert!(receiver .try_recv() .expect("valid") .as_end_of_stored_events() .is_some()); // It is empty assert!(receiver.try_recv().is_none()); publisher.send(get_note()).await.expect("valid send"); sleep(Duration::from_millis(100)).await; // ok from posting let msg = publisher.try_recv(); assert!(msg.is_some()); assert_eq!( msg.expect("is ok").as_ok().expect("valid").id.to_hex(), "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(), ); // It is not empty let msg = receiver.try_recv(); assert!(msg.is_some()); assert_eq!( msg.expect("is ok") .as_event() .expect("valid") .subscription_id .to_string(), "1298169700973717".to_owned() ); // it must be deliverd at most once assert!(receiver.try_recv().is_none()); assert_eq!(relayer.total_subscribers(), 1); // when client is dropped, the subscription is removed // automatically drop(receiver); sleep(Duration::from_millis(10)).await; assert_eq!(relayer.total_subscribers(), 0); } #[tokio::test] async fn multiple_subcribers() { let req1: Request = serde_json::from_value(json!(["REQ", "1298169700973717", { "authors":["a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb4"], }])) .expect("valid object"); let req2: Request = serde_json::from_value(json!(["REQ", "1298169700973717", { "authors":["a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3"] }])) .expect("valid object"); let (relayer, _stopper) = dummy_server_with_relayer(None).await; let mut publisher = relayer.create_new_local_connection().await; let mut set1 = join_all( (0..1000) .map(|_| relayer.create_new_local_connection()) .collect::>(), ) .await; let mut set2 = join_all( (0..100) .map(|_| relayer.create_new_local_connection()) .collect::>(), ) .await; assert_eq!(relayer.total_subscribers(), 0); join_all( set1.iter() .map(|connection| connection.send(req1.clone())) .collect::>(), ) .await .into_iter() .collect::, _>>() .expect("subscribe all"); join_all( set2.iter() .map(|connection| connection.send(req2.clone())) .collect::>(), ) .await .into_iter() .collect::, _>>() .expect("subscribe all"); sleep(Duration::from_millis(10)).await; for connection in set1.iter_mut() { assert!(connection .try_recv() .expect("end of stored events") .as_end_of_stored_events() .is_some()); } for connection in set2.iter_mut() { assert!(connection .try_recv() .expect("end of stored events") .as_end_of_stored_events() .is_some()); } assert_eq!(relayer.total_subscribers(), 1100); publisher.send(get_note()).await.expect("valid send"); sleep(Duration::from_millis(10)).await; let msg = publisher.try_recv(); assert!(msg.is_some()); assert_eq!( msg.expect("is ok").as_ok().expect("valid").id.to_hex(), "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(), ); for connection in set1.iter_mut() { assert!(connection.try_recv().is_none()); } for connection in set2.iter_mut() { let msg = connection.try_recv(); assert!(msg.is_some()); let msg = msg.expect("msg"); assert_eq!( msg.as_event().expect("valid").subscription_id.to_string(), "1298169700973717".to_owned() ); assert!(connection.try_recv().is_none()); } drop(set1); sleep(Duration::from_millis(10)).await; assert_eq!(relayer.total_subscribers(), 100); drop(set2); sleep(Duration::from_millis(10)).await; assert_eq!(relayer.total_subscribers(), 0); drop(relayer); } #[tokio::test] async fn posting_event_replies_ok() { let relayer = Arc::new(Relayer::new(Some(get_db(false).await), None).expect("valid relayer")); let mut connection = relayer.create_new_local_connection().await; let note = get_note(); let note_id = note.as_event().map(|x| x.id.clone()).unwrap(); relayer .process_request_from_client(&connection, note) .await .expect("process event"); sleep(Duration::from_millis(10)).await; assert_eq!( Some( ROk { id: note_id.into(), status: ROkStatus::Ok, } .into() ), connection.try_recv() ); } #[tokio::test] async fn subscribe_to_all() { let request: Request = serde_json::from_value(json!(["REQ", "1298169700973717", {}])).expect("valid object"); let (relayer, _stopper) = dummy_server_with_relayer(None).await; let mut local_connection_0 = relayer.create_new_local_connection().await; let mut local_connection_1 = relayer.create_new_local_connection().await; assert_eq!(relayer.total_subscribers(), 0); local_connection_1.send(request).await.expect("valid send"); sleep(Duration::from_millis(10)).await; assert_eq!(relayer.total_subscribers(), 1); // eod assert!(local_connection_1 .try_recv() .expect("valid") .as_end_of_stored_events() .is_some()); // It is empty assert!(local_connection_1.try_recv().is_none()); local_connection_0 .send(get_note()) .await .expect("valid send"); sleep(Duration::from_millis(10)).await; // ok from posting let msg = local_connection_0.try_recv(); assert!(msg.is_some()); assert_eq!( msg.expect("is ok").as_ok().expect("valid").id.to_hex(), "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(), ); // It is not empty let msg = local_connection_1.try_recv(); assert!(msg.is_some()); assert_eq!( msg.expect("is ok") .as_event() .expect("valid") .subscription_id .to_string(), "1298169700973717".to_owned() ); // it must be deliverd at most once assert!(local_connection_1.try_recv().is_none()); assert_eq!(relayer.total_subscribers(), 1); // when client is dropped, the subscription is removed // automatically drop(local_connection_1); sleep(Duration::from_millis(10)).await; assert_eq!(relayer.total_subscribers(), 0); } #[tokio::test] async fn relayer_posts_to_custom_posts_to_all_clients() { let (relayer1, _) = dummy_server(0, None).await; let (relayer2, _) = dummy_server(0, None).await; let (relayer3, _) = dummy_server(0, None).await; let (pool, _in_scope) = Pool::new_with_clients(vec![relayer1.clone(), relayer2.clone(), relayer3.clone()]) .expect("valid pool"); let (main_relayer, _) = dummy_server(0, Some(pool)).await; let (mut reader_client, _reader_client_inscope) = Pool::new_with_clients(vec![relayer1.clone(), relayer2.clone(), relayer3.clone()]) .expect("valid pool"); let (main_client, _main_client_inscope) = Pool::new_with_clients(vec![main_relayer]).expect("valid pool"); let _sub = reader_client .subscribe(Default::default()) .await .expect("v"); sleep(Duration::from_millis(20)).await; assert!(reader_client .try_recv() .map(|(r, _)| r) .expect("valid message: step") .as_end_of_stored_events() .is_some()); assert!(reader_client.try_recv().is_none()); let account1 = Account::default(); let signed_content = account1 .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None) .expect("valid signed content"); // account1 posts a new note into the relayer1, and the main relayer // should get a copy of it, as well as it is connected to relayer2 and // relayer1. main_client.post(signed_content.clone().into()).await; sleep(Duration::from_millis(10)).await; let responses = (0..3) .map(|_| reader_client.try_recv().expect("valid message")) .filter_map(|(r, url)| { r.as_event() .map(|r| (url.port().expect("port"), r.to_owned())) }) .collect::>(); assert!(reader_client.try_recv().is_none()); assert_eq!(responses.len(), 3); assert_eq!( responses .get(&relayer1.port().expect("port")) .map(|x| x.id.clone()), Some(signed_content.id.clone()) ); assert_eq!( responses .get(&relayer2.port().expect("port")) .map(|x| x.id.clone()), Some(signed_content.id.clone()) ); assert_eq!( responses .get(&relayer3.port().expect("port")) .map(|x| x.id.clone()), Some(signed_content.id) ); } #[tokio::test] async fn relayer_with_client_pool() { let (relayer1, _) = dummy_server(0, None).await; let (relayer2, _) = dummy_server(0, None).await; let (pool, _in_scope) = Pool::new_with_clients(vec![relayer1.clone(), relayer2]).expect("valid pool"); let (main_relayer, _) = dummy_server(0, Some(pool)).await; let (secondary_client, _sc) = Pool::new_with_clients(vec![relayer1]).expect("valid client"); // Create a subscription in the main relayer, main_client is only // connected to the main relayer let (mut main_client, _in_scope) = Pool::new_with_clients(vec![main_relayer]).expect("valid client"); let _sub = main_client.subscribe(Default::default()).await.expect("v"); sleep(Duration::from_millis(10)).await; assert!(main_client .try_recv() .map(|(r, _)| r) .expect("valid message") .as_end_of_stored_events() .is_some()); assert!(main_client.try_recv().is_none()); let account1 = Account::default(); let signed_content = account1 .sign_content(vec![], Content::ShortTextNote("test 01".to_owned()), None) .expect("valid signed content"); // account1 posts a new note into the relayer1, and the main relayer // should get a copy of it, as well as it is connected to relayer2 and // relayer1. secondary_client.post(signed_content.clone().into()).await; // wait for the note to be delivered sleep(Duration::from_millis(10)).await; assert_eq!( Some((signed_content.id, signed_content.signature)), main_client .try_recv() .and_then(|(r, _)| r.as_event().cloned()) .map(|x| (x.id.clone(), x.signature.clone())) ); assert!(main_client.try_recv().is_none()); } }