|
- use crate::{
- connection::{ConnectionId, LocalConnection},
- Connection, Error,
- };
- use futures_util::StreamExt;
- use nostr_rs_client::{pool::subscription::PoolSubscriptionId, Pool, Url};
- use nostr_rs_storage_base::Storage;
- use nostr_rs_types::{
- relayer::{self, ROk, ROkStatus},
- types::{Addr, Event, SubscriptionId},
- Request, Response,
- };
- use std::{
- collections::{HashMap, HashSet},
- ops::Deref,
- sync::Arc,
- time::Instant,
- };
- use tokio::{
- net::{TcpListener, TcpStream},
- sync::{
- mpsc::{
- self, {channel, Receiver, Sender},
- },
- RwLock,
- },
- task::JoinHandle,
- };
- /// Identifies a subscription inside the relayer.
- ///
- /// It pairs a `SubscriptionId` with the `ConnectionId` of the connection that
- /// requested it.
- #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
- pub struct RelayerSubscriptionId((SubscriptionId, ConnectionId));
- impl From<(SubscriptionId, ConnectionId)> for RelayerSubscriptionId {
- /// Wraps a `(SubscriptionId, ConnectionId)` pair without modifying it.
- fn from(value: (SubscriptionId, ConnectionId)) -> Self {
- RelayerSubscriptionId(value)
- }
- }
- impl Default for RelayerSubscriptionId {
- /// Builds an id made of an empty subscription id and an empty connection id.
- fn default() -> Self {
- let subscription_id = SubscriptionId::empty();
- let connection_id = ConnectionId::new_empty();
- RelayerSubscriptionId((subscription_id, connection_id))
- }
- }
- type Connections = Arc<RwLock<HashMap<ConnectionId, Connection>>>; // shared map of the active connections, keyed by connection id
- type SubscriptionManager =
- Arc<nostr_rs_subscription_manager::SubscriptionManager<RelayerSubscriptionId, ()>>; // shared subscription index keyed by RelayerSubscriptionId, with `()` as the per-subscription value
- type ClientPoolSubscriptions =
- Arc<RwLock<HashMap<PoolSubscriptionId, (SubscriptionId, ConnectionId)>>>; // maps a client-pool subscription back to the local subscription and connection that requested it
- /// Relayer: accepts client connections, stores events and forwards them to matching subscribers.
- /// It can optionally be attached to a pool of clients connected to other relayers.
- pub struct Relayer<T: Storage + Send + Sync + 'static> {
- /// Storage engine. If provided, events are persisted through it;
- /// otherwise all the messages are going to be ephemeral, making this
- /// relayer just a dumb proxy (that can be useful for privacy) but it won't
- /// be able to perform any optimization like prefetching content while offline
- storage: Arc<Option<T>>,
- /// Subscription manager
- subscription_manager: SubscriptionManager,
- /// List of all active connections
- connections: Connections,
- /// This Sender can be used to send requests from anywhere to the relayer.
- send_to_relayer: Sender<(ConnectionId, Request)>,
- /// Receiving end of `send_to_relayer`; it is taken out of the struct by `split()`.
- relayer_receiver: Option<Receiver<(ConnectionId, Request)>>,
- /// Client pool
- ///
- /// A relayer can optionally be connected to a pool of clients to get
- /// foreign events.
- client_pool: Option<Arc<Pool>>,
- client_pool_receiver: Option<Receiver<(Response, Url)>>, // responses coming from the client pool; taken out of the struct by `main()`
- client_pool_subscriptions: ClientPoolSubscriptions, // pool subscription id -> (local subscription id, connection id)
- }
- impl<T: Storage + Send + Sync + 'static> Relayer<T> {
- /// Builds a relayer that is not running yet.
- ///
- /// When `storage` is given it is used to persist events, as well as to
- /// serve past events when a new subscription is added.
- ///
- /// When `client_pool` is given it is used to talk to other relayers: events
- /// posted locally are re-posted to them and local subscriptions are
- /// mirrored on them.
- ///
- /// Returns an error if the given pool cannot be split.
- pub fn new(storage: Option<T>, client_pool: Option<Pool>) -> Result<Self, Error> {
- let (send_to_relayer, relayer_receiver) = channel(100_000);
- let (client_pool_receiver, client_pool) = match client_pool {
- Some(pool) => {
- let parts = pool.split()?;
- (parts.0, Some(Arc::new(parts.1)))
- }
- None => {
- // Without a pool keep a receiver whose sender has already been
- // dropped, so the struct has the same shape in both cases.
- let (_, closed_receiver) = mpsc::channel(1);
- (closed_receiver, None)
- }
- };
- Ok(Self {
- storage: Arc::new(storage),
- subscription_manager: Default::default(),
- connections: Default::default(),
- send_to_relayer,
- relayer_receiver: Some(relayer_receiver),
- client_pool,
- client_pool_receiver: Some(client_pool_receiver),
- client_pool_subscriptions: Default::default(),
- })
- }
- /// Asks the client pool to connect to the relayer at `url`.
- ///
- /// Returns `Error::NoClient` when this relayer has no client pool.
- pub async fn connect_to_relayer(&self, url: Url) -> Result<(), Error> {
- let pool = match self.client_pool.as_ref() {
- Some(pool) => pool,
- None => return Err(Error::NoClient),
- };
- let _ = pool.connect_to(url).await?;
- Ok(())
- }
- /// Number of subscriptions currently registered in the subscription manager.
- pub fn total_subscribers(&self) -> usize {
- let manager = &self.subscription_manager;
- manager.total_subscribers()
- }
- /// Takes the request receiver out of the relayer and returns both.
- ///
- /// Returns `Error::AlreadySplitted` if the receiver was already taken.
- pub fn split(mut self) -> Result<(Self, Receiver<(ConnectionId, Request)>), Error> {
- match self.relayer_receiver.take() {
- Some(receiver) => Ok((self, receiver)),
- None => Err(Error::AlreadySplitted),
- }
- }
- /// Runs the relayer main loop in a tokio task and returns it.
- ///
- /// This function consumes the object and takes the ownership. It returns the
- /// relayer wrapped in an `Arc` together with the `JoinHandle` of the spawned
- /// task, which can be used to stop the main loop.
- ///
- /// The loop reacts to three sources:
- /// - new TCP connections accepted from `server`,
- /// - responses coming from the client pool (foreign events and
- ///   end-of-stored-events notifications),
- /// - requests sent by connected clients.
- ///
- /// Returns `Error::AlreadySplitted` if any of the internal receivers was
- /// already taken.
- pub fn main(mut self, server: TcpListener) -> Result<(Arc<Self>, JoinHandle<()>), Error> {
- let mut client_pool_receiver = self
- .client_pool_receiver
- .take()
- .ok_or(Error::AlreadySplitted)?;
- let (this, mut receiver) = self.split()?;
- let _self = Arc::new(this);
- let this = _self.clone();
- let handle = tokio::spawn(async move {
- loop {
- tokio::select! {
- Ok((stream, _)) = server.accept() => {
- // accept new connections
- let _ = this.add_connection(None, stream).await;
- },
- Some((response, _)) = client_pool_receiver.recv() => {
- // process messages from another relayer
- match response {
- Response::Event(event) => {
- // we received an event from the client pool, store it locally
- // and re-broadcast it.
- tokio::spawn(Self::broadcast(
- this.storage.clone(),
- this.subscription_manager.clone(),
- this.connections.clone(),
- event.event
- ));
- }
- Response::EndOfStoredEvents(sub) => {
- // Forget the foreign subscription first; the write guard is
- // released at the end of the statement, so it is never held
- // together with the connections lock.
- let removed = this
- .client_pool_subscriptions
- .write()
- .await
- .remove(&(sub.deref().into()));
- if let Some((sub_id, conn_id)) = removed {
- // Forward the end-of-stored-events to the local
- // subscriber, if its connection is still alive.
- if let Some(connection) = this.connections.read().await.get(&conn_id) {
- let _ = connection.respond(Response::EndOfStoredEvents(sub_id.into()));
- }
- }
- }
- _ => {}
- }
- }
- Some((conn_id, request)) = receiver.recv() => {
- // each client request is processed in its own task
- tokio::spawn(Self::process_request(
- this.storage.clone(),
- this.client_pool.clone(),
- this.client_pool_subscriptions.clone(),
- this.subscription_manager.clone(),
- this.connections.clone(),
- conn_id,
- request
- ));
- }
- else => {
- // none of the branches matched in this round; try again
- }
- }
- }
- });
- Ok((_self, handle))
- }
- /// Gives read access to the storage engine, which is `None` when the relayer
- /// was created without one.
- pub fn get_db(&self) -> &Option<T> {
- let storage: &Option<T> = &self.storage;
- storage
- }
- /// Creates a local connection, registers it in the list of active
- /// connections and returns its handle.
- pub async fn create_new_local_connection(self: &Arc<Self>) -> LocalConnection<T> {
- let (connection, receiver) = Connection::new_local_connection();
- let id = connection.get_conn_id();
- {
- let mut active = self.connections.write().await;
- active.insert(id, connection);
- }
- let sender = self.send_to_relayer.clone();
- let relayer = self.clone();
- (id, receiver, sender, relayer).into()
- }
- /// Removes a local connection from the list of active connections.
- ///
- /// The removal happens in a spawned tokio task, so it may not be visible
- /// yet when this function returns.
- ///
- /// This function only works for local connections, normal connections can
- /// be dropped on disconnection.
- ///
- /// This function could change in the future to kick connections programmatically
- pub fn drop_connection(self: &Arc<Self>, local_connection: &LocalConnection<T>) {
- let relayer = Arc::clone(self);
- let conn_id = local_connection.conn_id;
- tokio::spawn(async move {
- let mut active = relayer.connections.write().await;
- active.remove(&conn_id);
- });
- }
- /// Wraps a `TcpStream` in a new connection and adds it to the list of
- /// active connections.
- ///
- /// The connection is created with a sender to this relayer and with the
- /// optional `disconnection_notify` sender. Returns the id of the new
- /// connection, or the error reported while creating it.
- pub async fn add_connection(
- &self,
- disconnection_notify: Option<mpsc::Sender<ConnectionId>>,
- stream: TcpStream,
- ) -> Result<ConnectionId, Error> {
- let sender = self.send_to_relayer.clone();
- let connection = Connection::new_connection(sender, disconnection_notify, stream).await?;
- let conn_id = connection.get_conn_id();
- let mut active = self.connections.write().await;
- active.insert(conn_id, connection);
- Ok(conn_id)
- }
- /// Test helper: processes `request` inline, as if it was sent by
- /// `connection`, without going through the main loop.
- #[cfg(test)]
- async fn process_request_from_client(
- &self,
- connection: &LocalConnection<T>,
- request: Request,
- ) -> Result<(), Error> {
- let conn_id = connection.conn_id;
- let pending = Self::process_request(
- self.storage.clone(),
- self.client_pool.clone(),
- self.client_pool_subscriptions.clone(),
- self.subscription_manager.clone(),
- self.connections.clone(),
- conn_id,
- request,
- );
- pending.await
- }
- /// Processes a single request sent by a connected client.
- ///
- /// - `Request::Event`: the event is broadcast (and stored, when a storage
- ///   is configured). If it is new it is also recorded as a local event and
- ///   posted to the client pool, and the sender gets an `OK` reply;
- ///   otherwise the sender gets a `Duplicate` reply.
- /// - `Request::Request`: the subscription is mirrored on the client pool
- ///   (if any), stored events matching the filters are sent, and the
- ///   subscription is registered for future events.
- /// - `Request::Close`: the subscription is removed from the connection.
- ///
- /// # Errors
- ///
- /// Returns `Error::UnknownConnection` when `connection_id` is not an active
- /// connection, and propagates errors from the storage, the client pool and
- /// from replying to the sender of an event.
- async fn process_request(
- storage: Arc<Option<T>>,
- client_pool: Option<Arc<Pool>>,
- client_pool_subscriptions: ClientPoolSubscriptions,
- subscription_manager: SubscriptionManager,
- connections: Connections,
- connection_id: ConnectionId,
- request: Request,
- ) -> Result<(), Error> {
- match request {
- Request::Event(event) => {
- // Fail early for unknown connections. The read guard is a
- // temporary that is released before broadcasting: `broadcast`
- // takes its own read lock, and holding one here while waiting
- // for another can deadlock when a writer is queued in between.
- if !connections.read().await.contains_key(&connection_id) {
- return Err(Error::UnknownConnection(connection_id));
- }
- let event_id: Addr = event.id.clone().into();
- let is_new = Self::broadcast(
- storage.clone(),
- subscription_manager.clone(),
- connections.clone(),
- event.deref().clone(),
- )
- .await?;
- let status = if is_new {
- if let Some(storage) = storage.as_ref() {
- let _ = storage.store_local_event(&event).await;
- }
- if let Some(client_pool) = client_pool.as_ref() {
- // pass the event to the pool of clients, so this relayer can relay
- // their local events to the clients in the network of relayers
- let _ = client_pool.post(event).await;
- }
- ROkStatus::Ok
- } else {
- ROkStatus::Duplicate
- };
- // Lock the connections only for the time needed to reply.
- connections
- .read()
- .await
- .get(&connection_id)
- .ok_or(Error::UnknownConnection(connection_id))?
- .respond(
- ROk {
- id: event_id,
- status,
- }
- .into(),
- )?;
- }
- Request::Request(request) => {
- let foreign_subscription = if let Some(client_pool) = client_pool.as_ref() {
- // If this relay is connected to other relays through the
- // client pool, create the same subscription in them as
- // well, with the main goal of fetching any foreign event
- // that matches the requested subscription.
- //
- // If this happens, this relay will serve any local
- // event that matches, as well as any foreign event. Foreign
- // events will be stored locally as well if there is a
- // storage setup.
- let foreign_sub_id = client_pool
- .subscribe(request.filters.clone().into())
- .await?;
- client_pool_subscriptions.write().await.insert(
- foreign_sub_id.clone(),
- (request.subscription_id.clone(), connection_id),
- );
- Some(foreign_sub_id)
- } else {
- None
- };
- let read_connections = connections.read().await;
- let connection = read_connections
- .get(&connection_id)
- .ok_or(Error::UnknownConnection(connection_id))?;
- if let Some(storage) = storage.as_ref() {
- // Ids already sent, so an event matching several filters is
- // delivered only once.
- let mut sent = HashSet::new();
- // Send all events that match the filter that are stored in our database
- for filter in request.filters.clone().into_iter() {
- let mut result = storage.get_by_filter(filter).await?;
- while let Some(Ok(event)) = result.next().await {
- if !sent.insert(event.id.clone()) {
- continue;
- }
- let _ = connection.respond(
- relayer::Event {
- subscription_id: request.subscription_id.clone(),
- event,
- }
- .into(),
- );
- }
- }
- }
- if foreign_subscription.is_none() {
- // If there is a foreign subscription, we shouldn't send a
- // EOS until we have got EOS from all foreign relays
- let _ = connection.respond(
- relayer::EndOfStoredEvents(request.subscription_id.clone()).into(),
- );
- }
- connection
- .subscribe(
- request.subscription_id.clone(),
- (
- foreign_subscription,
- subscription_manager
- .subscribe(
- (request.subscription_id, connection.get_conn_id()).into(),
- request.filters,
- (),
- )
- .await,
- ),
- )
- .await;
- }
- Request::Close(close) => {
- connections
- .read()
- .await
- .get(&connection_id)
- .ok_or(Error::UnknownConnection(connection_id))?
- .unsubscribe(&close)
- .await;
- }
- };
- Ok(())
- }
- /// Stores `event` (when a storage is configured) and forwards it to every
- /// subscription that matches it.
- ///
- /// Returns `Ok(false)` without forwarding anything when the storage reports
- /// that the event was not stored, and `Ok(true)` otherwise. Errors from the
- /// storage are propagated; failures while replying to a subscriber are
- /// ignored.
- #[inline]
- pub async fn broadcast(
- storage: Arc<Option<T>>,
- subscription_manager: SubscriptionManager,
- connections: Connections,
- event: Event,
- ) -> Result<bool, Error> {
- if let Some(db) = storage.as_ref() {
- let stored = db.store(&event).await?;
- if !stored {
- return Ok(false);
- }
- }
- let active = connections.read().await;
- let subscribers = subscription_manager.get_subscribers(&event).await;
- for subscriber in subscribers {
- let RelayerSubscriptionId((sub_id, conn_id)) = subscriber;
- // Subscriptions whose connection is already gone are skipped.
- let connection = match active.get(&conn_id) {
- Some(connection) => connection,
- None => continue,
- };
- let _ = connection.respond(
- relayer::Event {
- subscription_id: sub_id,
- event: event.clone(),
- }
- .into(),
- );
- }
- Ok(true)
- }
- }
- #[cfg(test)]
- mod test {
- use super::*;
- use futures::future::join_all;
- use nostr_rs_client::Url;
- use nostr_rs_memory::Memory;
- use nostr_rs_types::{
- account::Account,
- types::{Content, Tag},
- Request,
- };
- use serde_json::json;
- use std::time::Duration;
- use tokio::time::sleep;
- /// Starts a relayer with an in-memory storage listening on `127.0.0.1:port`
- /// and returns its websocket url and the handle of its main loop.
- async fn dummy_server(port: u16, client_pool: Option<Pool>) -> (Url, JoinHandle<()>) {
- let bind_addr = format!("127.0.0.1:{}", port);
- let listener = TcpListener::bind(bind_addr).await.unwrap();
- let local_addr = listener.local_addr().expect("addr");
- let storage = Some(Memory::default());
- let relayer = Relayer::new(storage, client_pool).expect("valid dummy server");
- let (_, stopper) = relayer.main(listener).expect("valid main loop");
- let url = Url::parse(&format!("ws://{}", local_addr)).expect("valid url");
- (url, stopper)
- }
- /// Starts a relayer with an in-memory storage on a port chosen by the OS and
- /// returns the relayer itself together with the handle of its main loop.
- async fn dummy_server_with_relayer(
- client_pool: Option<Pool>,
- ) -> (Arc<Relayer<Memory>>, JoinHandle<()>) {
- let listener = TcpListener::bind(format!("127.0.0.1:{}", 0)).await.unwrap();
- let storage = Some(Memory::default());
- let relayer = Relayer::new(storage, client_pool).expect("valid dummy server");
- relayer.main(listener).expect("valid main loop")
- }
- /// Signs an empty short text note carrying the given tags with a default
- /// account.
- fn get_note_with_custom_tags(tags: serde_json::Value) -> Event {
- let signer = Account::default();
- let empty_note = Content::ShortTextNote("".to_owned());
- let parsed_tags: Vec<Tag> = serde_json::from_value(tags).expect("valid tags");
- signer
- .sign_content(parsed_tags, empty_note, None)
- .expect("valid")
- }
- fn get_note() -> Request { // builds a fixed, already signed EVENT request (kind 1 note) used by the publishing tests
- serde_json::from_value(json!(
- [
- "EVENT",
- {
- "kind":1,
- "content":"Pong",
- "tags":[
- ["e","9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42","","root"],
- ["e","2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a","","reply"],
- ["p","39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
- ["p","ee7202ad91459e013bfef263c59e47deb0163a5e7651b026673765488bfaf102"]
- ],
- "created_at":1681938616,
- "pubkey":"a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3",
- "id":"e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9",
- "sig":"9036150a6c8a32933cffcc42aec4d2109a22e9f10d1c3860c0435a925e6386babb7df5c95fcf68c8ed6a9726a1f07225af663d0b068eb555014130aad21674fc",
- }
- ])).expect("value")
- }
- /// Creates an in-memory storage. When `prefill` is true it is loaded with
- /// the events from `../tests/events.json` (one JSON event per line) and the
- /// function waits until the storage is done flushing.
- async fn get_db(prefill: bool) -> Memory {
- let db = Memory::default();
- if !prefill {
- return db;
- }
- // Parse every fixture line before storing anything.
- let mut events: Vec<Event> = Vec::new();
- for line in include_str!("../tests/events.json").lines() {
- events.push(serde_json::from_str(line).expect("valid"));
- }
- for event in events {
- let stored = db.store(&event).await.expect("valid");
- assert!(stored);
- }
- while db.is_flushing() {
- tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
- }
- db
- }
- #[tokio::test]
- async fn serve_listener_from_local_db_custom_tag() { // a REQ filtering on the custom `#f` tag must return the note posted with that tag
- let request = serde_json::from_value(json!([
- "REQ",
- "1298169700973717",
- {
- "#f": [
- "foo",
- ],
- },
- ]))
- .expect("valid object");
- let relayer =
- Arc::new(Relayer::new(Some(get_db(true).await), None).expect("valid relayer"));
- let mut connection = relayer.create_new_local_connection().await;
- let note = get_note_with_custom_tags(json!([["f", "foo"]]));
- let _ = relayer
- .process_request_from_client(&connection, note.clone().into())
- .await;
- sleep(Duration::from_millis(10)).await;
- let _ = relayer
- .process_request_from_client(&connection, request)
- .await;
- // OK reply for the posted note
- assert_eq!(
- ROkStatus::Ok,
- connection
- .try_recv()
- .expect("valid")
- .as_ok()
- .cloned()
- .unwrap()
- .status,
- );
- // the posted note is served back by the subscription
- assert_eq!(
- note,
- connection
- .try_recv()
- .expect("valid")
- .as_event()
- .unwrap()
- .event
- );
- // end of stored events
- assert!(connection
- .try_recv()
- .expect("valid")
- .as_end_of_stored_events()
- .is_some());
- assert!(connection.try_recv().is_none());
- }
- #[tokio::test]
- async fn serve_listener_from_local_db() { // a REQ with several filters is answered from the prefilled storage, each event being sent once
- let request = serde_json::from_value(json!([
- "REQ",
- "1298169700973717",
- {
- "authors": [
- "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
- ],
- "until": 1681928304
- },
- {
- "#p": [
- "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
- ],
- "kinds": [
- 1,
- 3,
- 6,
- 7,
- 9735
- ],
- "until": 1681928304
- },
- {
- "#p": [
- "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
- ],
- "kinds": [
- 4
- ]
- },
- {
- "authors": [
- "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
- ],
- "kinds": [
- 4
- ]
- },
- {
- "#e": [
- "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
- "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1"
- ],
- "kinds": [
- 1,
- 6,
- 7,
- 9735
- ]
- }
- ]))
- .expect("valid object");
- let relayer =
- Arc::new(Relayer::new(Some(get_db(true).await), None).expect("valid relayer"));
- let mut connection = relayer.create_new_local_connection().await;
- let _ = relayer
- .process_request_from_client(&connection, request)
- .await;
- // first stored event received
- assert_eq!(
- "9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42",
- connection
- .try_recv()
- .expect("valid")
- .as_event()
- .expect("event")
- .id
- .to_string()
- );
- // second stored event received
- assert_eq!(
- "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9",
- connection
- .try_recv()
- .expect("valid")
- .as_event()
- .expect("event")
- .id
- .to_string()
- );
- // third stored event received
- assert_eq!(
- "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
- connection
- .try_recv()
- .expect("valid")
- .as_event()
- .expect("event")
- .id
- .to_string()
- );
- // end of stored events
- assert!(connection
- .try_recv()
- .expect("valid")
- .as_end_of_stored_events()
- .is_some());
- assert!(connection.try_recv().is_none());
- }
- #[tokio::test]
- async fn server_listener_real_time_single_argument() { // a live subscription (filters without `kinds`) receives a newly published note exactly once
- let request: Request = serde_json::from_value(json!(
- [
- "REQ",
- "1298169700973717",
- {
- "authors": ["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
- "since":1681939304
- },
- {
- "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
- "since":1681939304
- },
- {
- "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
- },
- {
- "authors":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
- },
- {
- "#e":[
- "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
- "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1"
- ],
- }
- ]))
- .expect("valid object");
- let (relayer, _stopper) = dummy_server_with_relayer(None).await;
- let mut receiver = relayer.create_new_local_connection().await;
- let mut publisher = relayer.create_new_local_connection().await;
- assert_eq!(relayer.total_subscribers(), 0);
- receiver.send(request).await.expect("subscribe");
- sleep(Duration::from_millis(10)).await;
- assert_eq!(relayer.total_subscribers(), 1);
- // end of stored events
- assert!(receiver
- .try_recv()
- .expect("valid")
- .as_end_of_stored_events()
- .is_some());
- // It is empty
- assert!(receiver.try_recv().is_none());
- publisher.send(get_note()).await.expect("valid send");
- sleep(Duration::from_millis(10)).await;
- // ok from posting
- let msg = publisher.try_recv();
- assert!(msg.is_some());
- assert_eq!(
- msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
- "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
- );
- // It is not empty
- let msg = receiver.try_recv();
- assert!(msg.is_some());
- assert_eq!(
- msg.expect("is ok")
- .as_event()
- .expect("valid")
- .subscription_id
- .to_string(),
- "1298169700973717".to_owned()
- );
- // it must be delivered at most once
- assert!(receiver.try_recv().is_none());
- assert_eq!(relayer.total_subscribers(), 1);
- // when client is dropped, the subscription is removed
- // automatically
- drop(receiver);
- sleep(Duration::from_millis(10)).await;
- assert_eq!(relayer.total_subscribers(), 0);
- }
- #[tokio::test]
- async fn server_listener_real_time() { // a live subscription (filters with `kinds`) receives a newly published note exactly once
- let request: Request = serde_json::from_value(json!(
- [
- "REQ",
- "1298169700973717",
- {
- "authors": ["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
- "since":1681939304
- },
- {
- "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
- "kinds":[1,3,6,7,9735],
- "since":1681939304
- },
- {
- "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
- "kinds":[4]
- },
- {
- "authors":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
- "kinds":[4]
- },
- {
- "#e":[
- "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
- "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1"
- ],
- "kinds":[1,6,7,9735]
- }
- ]))
- .expect("valid object");
- let (relayer, _stopper) = dummy_server_with_relayer(None).await;
- let mut receiver = relayer.create_new_local_connection().await;
- let mut publisher = relayer.create_new_local_connection().await;
- assert_eq!(relayer.total_subscribers(), 0);
- receiver.send(request).await.expect("subscribe");
- sleep(Duration::from_millis(10)).await;
- assert_eq!(relayer.total_subscribers(), 1);
- // end of stored events
- assert!(receiver
- .try_recv()
- .expect("valid")
- .as_end_of_stored_events()
- .is_some());
- // It is empty
- assert!(receiver.try_recv().is_none());
- publisher.send(get_note()).await.expect("valid send");
- sleep(Duration::from_millis(100)).await;
- // ok from posting
- let msg = publisher.try_recv();
- assert!(msg.is_some());
- assert_eq!(
- msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
- "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
- );
- // It is not empty
- let msg = receiver.try_recv();
- assert!(msg.is_some());
- assert_eq!(
- msg.expect("is ok")
- .as_event()
- .expect("valid")
- .subscription_id
- .to_string(),
- "1298169700973717".to_owned()
- );
- // it must be delivered at most once
- assert!(receiver.try_recv().is_none());
- assert_eq!(relayer.total_subscribers(), 1);
- // when client is dropped, the subscription is removed
- // automatically
- drop(receiver);
- sleep(Duration::from_millis(10)).await;
- assert_eq!(relayer.total_subscribers(), 0);
- }
- #[tokio::test]
- async fn multiple_subcribers() { // 1000 subscribers to another author get nothing, 100 subscribers to the note's author get it exactly once
- let req1: Request = serde_json::from_value(json!(["REQ", "1298169700973717", {
- "authors":["a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb4"],
- }]))
- .expect("valid object");
- let req2: Request = serde_json::from_value(json!(["REQ", "1298169700973717", {
- "authors":["a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3"]
- }]))
- .expect("valid object");
- let (relayer, _stopper) = dummy_server_with_relayer(None).await;
- let mut publisher = relayer.create_new_local_connection().await;
- let mut set1 = join_all(
- (0..1000)
- .map(|_| relayer.create_new_local_connection())
- .collect::<Vec<_>>(),
- )
- .await;
- let mut set2 = join_all(
- (0..100)
- .map(|_| relayer.create_new_local_connection())
- .collect::<Vec<_>>(),
- )
- .await;
- assert_eq!(relayer.total_subscribers(), 0);
- join_all(
- set1.iter()
- .map(|connection| connection.send(req1.clone()))
- .collect::<Vec<_>>(),
- )
- .await
- .into_iter()
- .collect::<Result<Vec<_>, _>>()
- .expect("subscribe all");
- join_all(
- set2.iter()
- .map(|connection| connection.send(req2.clone()))
- .collect::<Vec<_>>(),
- )
- .await
- .into_iter()
- .collect::<Result<Vec<_>, _>>()
- .expect("subscribe all");
- sleep(Duration::from_millis(10)).await;
- for connection in set1.iter_mut() {
- assert!(connection
- .try_recv()
- .expect("end of stored events")
- .as_end_of_stored_events()
- .is_some());
- }
- for connection in set2.iter_mut() {
- assert!(connection
- .try_recv()
- .expect("end of stored events")
- .as_end_of_stored_events()
- .is_some());
- }
- assert_eq!(relayer.total_subscribers(), 1100);
- publisher.send(get_note()).await.expect("valid send");
- sleep(Duration::from_millis(10)).await;
- let msg = publisher.try_recv();
- assert!(msg.is_some());
- assert_eq!(
- msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
- "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
- );
- for connection in set1.iter_mut() { // set1 subscribed to a different author: nothing is delivered
- assert!(connection.try_recv().is_none());
- }
- for connection in set2.iter_mut() { // set2 subscribed to the author of the note: delivered exactly once
- let msg = connection.try_recv();
- assert!(msg.is_some());
- let msg = msg.expect("msg");
- assert_eq!(
- msg.as_event().expect("valid").subscription_id.to_string(),
- "1298169700973717".to_owned()
- );
- assert!(connection.try_recv().is_none());
- }
- drop(set1);
- sleep(Duration::from_millis(10)).await;
- assert_eq!(relayer.total_subscribers(), 100);
- drop(set2);
- sleep(Duration::from_millis(10)).await;
- assert_eq!(relayer.total_subscribers(), 0);
- drop(relayer);
- }
- #[tokio::test]
- async fn posting_event_replies_ok() { // posting a new event to an empty storage is answered with an OK carrying the event id
- let relayer =
- Arc::new(Relayer::new(Some(get_db(false).await), None).expect("valid relayer"));
- let mut connection = relayer.create_new_local_connection().await;
- let note = get_note();
- let note_id = note.as_event().map(|x| x.id.clone()).unwrap();
- relayer
- .process_request_from_client(&connection, note)
- .await
- .expect("process event");
- sleep(Duration::from_millis(10)).await;
- assert_eq!(
- Some(
- ROk {
- id: note_id.into(),
- status: ROkStatus::Ok,
- }
- .into()
- ),
- connection.try_recv()
- );
- }
- #[tokio::test]
- async fn subscribe_to_all() { // a subscription with an empty filter receives a note published by another connection
- let request: Request =
- serde_json::from_value(json!(["REQ", "1298169700973717", {}])).expect("valid object");
- let (relayer, _stopper) = dummy_server_with_relayer(None).await;
- let mut local_connection_0 = relayer.create_new_local_connection().await;
- let mut local_connection_1 = relayer.create_new_local_connection().await;
- assert_eq!(relayer.total_subscribers(), 0);
- local_connection_1.send(request).await.expect("valid send");
- sleep(Duration::from_millis(10)).await;
- assert_eq!(relayer.total_subscribers(), 1);
- // end of stored events
- assert!(local_connection_1
- .try_recv()
- .expect("valid")
- .as_end_of_stored_events()
- .is_some());
- // It is empty
- assert!(local_connection_1.try_recv().is_none());
- local_connection_0
- .send(get_note())
- .await
- .expect("valid send");
- sleep(Duration::from_millis(10)).await;
- // ok from posting
- let msg = local_connection_0.try_recv();
- assert!(msg.is_some());
- assert_eq!(
- msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
- "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
- );
- // It is not empty
- let msg = local_connection_1.try_recv();
- assert!(msg.is_some());
- assert_eq!(
- msg.expect("is ok")
- .as_event()
- .expect("valid")
- .subscription_id
- .to_string(),
- "1298169700973717".to_owned()
- );
- // it must be delivered at most once
- assert!(local_connection_1.try_recv().is_none());
- assert_eq!(relayer.total_subscribers(), 1);
- // when client is dropped, the subscription is removed
- // automatically
- drop(local_connection_1);
- sleep(Duration::from_millis(10)).await;
- assert_eq!(relayer.total_subscribers(), 0);
- }
- #[tokio::test]
- async fn relayer_posts_to_custom_posts_to_all_clients() { // an event posted to the main relayer must reach every relayer in its client pool
- let (relayer1, _) = dummy_server(0, None).await;
- let (relayer2, _) = dummy_server(0, None).await;
- let (relayer3, _) = dummy_server(0, None).await;
- let (pool, _in_scope) =
- Pool::new_with_clients(vec![relayer1.clone(), relayer2.clone(), relayer3.clone()])
- .expect("valid pool");
- let (main_relayer, _) = dummy_server(0, Some(pool)).await;
- let (mut reader_client, _reader_client_inscope) =
- Pool::new_with_clients(vec![relayer1.clone(), relayer2.clone(), relayer3.clone()])
- .expect("valid pool");
- let (main_client, _main_client_inscope) =
- Pool::new_with_clients(vec![main_relayer]).expect("valid pool");
- let _sub = reader_client
- .subscribe(Default::default())
- .await
- .expect("v");
- sleep(Duration::from_millis(20)).await;
- assert!(reader_client
- .try_recv()
- .map(|(r, _)| r)
- .expect("valid message: step")
- .as_end_of_stored_events()
- .is_some());
- assert!(reader_client.try_recv().is_none());
- let account1 = Account::default();
- let signed_content = account1
- .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
- .expect("valid signed content");
- // main_client posts a new note signed by account1 to the main relayer.
- // The main relayer is connected to relayer1, relayer2 and relayer3 through
- // its client pool, so each of them should receive a copy of the note.
- main_client.post(signed_content.clone().into()).await;
- sleep(Duration::from_millis(10)).await;
- let responses = (0..3) // one event per relayer, indexed by the port of the relayer that delivered it
- .map(|_| reader_client.try_recv().expect("valid message"))
- .filter_map(|(r, url)| {
- r.as_event()
- .map(|r| (url.port().expect("port"), r.to_owned()))
- })
- .collect::<HashMap<_, _>>();
- assert!(reader_client.try_recv().is_none());
- assert_eq!(responses.len(), 3);
- assert_eq!(
- responses
- .get(&relayer1.port().expect("port"))
- .map(|x| x.id.clone()),
- Some(signed_content.id.clone())
- );
- assert_eq!(
- responses
- .get(&relayer2.port().expect("port"))
- .map(|x| x.id.clone()),
- Some(signed_content.id.clone())
- );
- assert_eq!(
- responses
- .get(&relayer3.port().expect("port"))
- .map(|x| x.id.clone()),
- Some(signed_content.id)
- );
- }
- #[tokio::test]
- async fn relayer_with_client_pool() { // an event posted to a pooled relayer must reach a subscriber of the main relayer
- let (relayer1, _) = dummy_server(0, None).await;
- let (relayer2, _) = dummy_server(0, None).await;
- let (pool, _in_scope) =
- Pool::new_with_clients(vec![relayer1.clone(), relayer2]).expect("valid pool");
- let (main_relayer, _) = dummy_server(0, Some(pool)).await;
- let (secondary_client, _sc) = Pool::new_with_clients(vec![relayer1]).expect("valid client");
- // Create a subscription in the main relayer, main_client is only
- // connected to the main relayer
- let (mut main_client, _in_scope) =
- Pool::new_with_clients(vec![main_relayer]).expect("valid client");
- let _sub = main_client.subscribe(Default::default()).await.expect("v");
- sleep(Duration::from_millis(10)).await;
- assert!(main_client
- .try_recv()
- .map(|(r, _)| r)
- .expect("valid message")
- .as_end_of_stored_events()
- .is_some());
- assert!(main_client.try_recv().is_none());
- let account1 = Account::default();
- let signed_content = account1
- .sign_content(vec![], Content::ShortTextNote("test 01".to_owned()), None)
- .expect("valid signed content");
- // secondary_client posts a new note signed by account1 to relayer1. The
- // main relayer is connected to relayer1 and relayer2 through its client
- // pool, so main_client should get a copy of the note from the main relayer.
- secondary_client.post(signed_content.clone().into()).await;
- // wait for the note to be delivered
- sleep(Duration::from_millis(10)).await;
- assert_eq!(
- Some((signed_content.id, signed_content.signature)),
- main_client
- .try_recv()
- .and_then(|(r, _)| r.as_event().cloned())
- .map(|x| (x.id.clone(), x.signature.clone()))
- );
- assert!(main_client.try_recv().is_none());
- }
- }
|