relayer.rs 35 KB


  1. use crate::{
  2. connection::{ConnectionId, LocalConnection},
  3. subscription::SubscriptionManager,
  4. Connection, Error,
  5. };
  6. use futures_util::StreamExt;
  7. use nostr_rs_client::{Error as ClientError, Pool, Url};
  8. use nostr_rs_storage_base::Storage;
  9. use nostr_rs_types::{
  10. relayer::{self, ROk, ROkStatus},
  11. types::{Addr, Event},
  12. Request, Response,
  13. };
  14. use std::{
  15. collections::{HashMap, HashSet},
  16. ops::Deref,
  17. sync::Arc,
  18. };
  19. use tokio::{
  20. net::{TcpListener, TcpStream},
  21. sync::mpsc::{channel, Receiver, Sender},
  22. };
  23. use tokio::{
  24. sync::{mpsc, RwLock},
  25. task::JoinHandle,
  26. };
  27. /// Relayer struct
  28. ///
  29. pub struct Relayer<T: Storage + Send + Sync + 'static> {
  30. /// Storage engine, if provided the services are going to persisted in disk,
  31. /// otherwise all the messages are going to be ephemeral, making this
  32. /// relayer just a dumb proxy (that can be useful for privacy) but it won't
  33. /// be able to perform any optimization like prefetching content while offline
  34. storage: Option<T>,
  35. /// Subscription manager
  36. subscriptions: Arc<SubscriptionManager>,
  37. /// List of all active connections
  38. connections: RwLock<HashMap<ConnectionId, Connection>>,
  39. /// This Sender can be used to send requests from anywhere to the relayer.
  40. send_to_relayer: Sender<(ConnectionId, Request)>,
  41. /// This Receiver is the relayer the way the relayer receives messages
  42. relayer_receiver: Option<Receiver<(ConnectionId, Request)>>,
  43. /// Client pool
  44. ///
  45. /// A relayer can optionally be connected to a pool of clients to get foreign events.
  46. client_pool: Option<(Pool, JoinHandle<()>)>,
  47. }
  48. impl<T: Storage + Send + Sync + 'static> Drop for Relayer<T> {
  49. fn drop(&mut self) {
  50. if let Some((_, handle)) = self.client_pool.take() {
  51. handle.abort();
  52. }
  53. }
  54. }
  55. impl<T: Storage + Send + Sync + 'static> Relayer<T> {
  56. /// Creates a new relayer instance
  57. ///
  58. /// If the storage is given, it will be used to persist events, as well to
  59. /// server past events when a new subscription is added.
  60. ///
  61. /// If the client_pool is given it will be used to connect to those relayers
  62. /// and create a network of relayers, reposting events to them and
  63. /// subscribing to their events.`gqq`
  64. pub fn new(storage: Option<T>, client_pool: Option<Pool>) -> Result<Self, Error> {
  65. let (sender, receiver) = channel(100_000);
  66. Ok(Self {
  67. storage,
  68. subscriptions: Default::default(),
  69. send_to_relayer: sender.clone(),
  70. relayer_receiver: Some(receiver),
  71. connections: Default::default(),
  72. client_pool: if let Some(client_pool) = client_pool {
  73. Some(Self::handle_client_pool(client_pool, sender)?)
  74. } else {
  75. None
  76. },
  77. })
  78. }
  79. /// Connects to the relayer pool
  80. pub async fn connect_to_relayer(&self, url: Url) -> Result<(), Error> {
  81. let (client_pool, _) = self.client_pool.as_ref().ok_or(Error::NoClient)?;
  82. client_pool.connect_to(url).await;
  83. Ok(())
  84. }
  85. /// Total number of subscribers requests that actively listening for new events
  86. pub fn total_subscribers(&self) -> usize {
  87. self.subscriptions.total_subscribers()
  88. }
  89. /// Splits the relayer object and extract their receiver.
  90. pub fn split(mut self) -> Result<(Self, Receiver<(ConnectionId, Request)>), Error> {
  91. let receiver = self.relayer_receiver.take().ok_or(Error::AlreadySplitted)?;
  92. Ok((self, receiver))
  93. }
  94. /// Runs the relayer main loop in a tokio task and returns it.
  95. ///
  96. /// This function consumes the object and takes the ownership. The returned
  97. /// JoinHandle() can be used to stop the main loop
  98. pub fn main(self, server: TcpListener) -> Result<(Arc<Self>, JoinHandle<()>), Error> {
  99. let (this, mut receiver) = self.split()?;
  100. let _self = Arc::new(this);
  101. let this = _self.clone();
  102. let handle = tokio::spawn(async move {
  103. loop {
  104. tokio::select! {
  105. Ok((stream, _)) = server.accept() => {
  106. // accept new external connections
  107. let _ = this.add_connection(None, stream).await;
  108. },
  109. Some((conn_id, request)) = receiver.recv() => {
  110. // receive messages from the connection pool
  111. if conn_id.is_empty() {
  112. // message received from client pool
  113. if let Request::Event(event) = request {
  114. let _ = this.broadcast(event.deref()).await;
  115. if let Some(storage) = this.storage.as_ref() {
  116. let _ = storage.store_local_event(&event).await;
  117. }
  118. }
  119. continue;
  120. }
  121. let connections = this.connections.read().await;
  122. let connection = if let Some(connection) = connections.get(&conn_id) {
  123. connection
  124. } else {
  125. continue;
  126. };
  127. // receive messages from clients
  128. let _ = this.process_request_from_client(connection, request).await;
  129. drop(connections);
  130. }
  131. else => {
  132. }
  133. }
  134. }
  135. });
  136. Ok((_self, handle))
  137. }
  138. /// Handle the client pool
  139. ///
  140. /// Main loop to consume messages from the client pool and broadcast them to the local subscribers
  141. fn handle_client_pool(
  142. client_pool: Pool,
  143. send_message_to_relayer: Sender<(ConnectionId, Request)>,
  144. ) -> Result<(Pool, JoinHandle<()>), ClientError> {
  145. let (mut receiver, client_pool) = client_pool.split()?;
  146. let handle = tokio::spawn(async move {
  147. loop {
  148. if receiver.len() > 500 {
  149. println!("{}", receiver.len());
  150. }
  151. if let Some((response, _)) = receiver.recv().await {
  152. match response {
  153. Response::Event(event) => {
  154. let _ = send_message_to_relayer.try_send((
  155. ConnectionId::new_empty(),
  156. Request::Event(event.event.into()),
  157. ));
  158. }
  159. Response::EndOfStoredEvents(_) => {}
  160. x => {
  161. println!("x => {:?}", x);
  162. }
  163. }
  164. }
  165. }
  166. });
  167. Ok((client_pool, handle))
  168. }
  169. /// Returns a reference to the internal database
  170. pub fn get_db(&self) -> &Option<T> {
  171. &self.storage
  172. }
  173. /// Adds a new local connection to the list of active connections.
  174. pub async fn create_new_local_connection(&self) -> LocalConnection {
  175. let (conn, receiver) = Connection::new_local_connection();
  176. let conn_id = conn.get_conn_id();
  177. self.connections.write().await.insert(conn_id, conn);
  178. (conn_id, receiver, self.send_to_relayer.clone()).into()
  179. }
  180. /// Adds a new TpStream and adds it to the list of active connections.
  181. ///
  182. /// This function will spawn the client's loop to receive incoming messages and send those messages
  183. pub async fn add_connection(
  184. &self,
  185. disconnection_notify: Option<mpsc::Sender<ConnectionId>>,
  186. stream: TcpStream,
  187. ) -> Result<ConnectionId, Error> {
  188. let conn =
  189. Connection::new_connection(self.send_to_relayer.clone(), disconnection_notify, stream)
  190. .await?;
  191. let id = conn.get_conn_id();
  192. self.connections.write().await.insert(id, conn);
  193. Ok(id)
  194. }
  195. /// Process a request from a connected client
  196. async fn process_request_from_client(
  197. &self,
  198. connection: &Connection,
  199. request: Request,
  200. ) -> Result<(), Error> {
  201. match request {
  202. Request::Event(event) => {
  203. let event_id: Addr = event.id.clone().into();
  204. if !self.broadcast(&event).await? {
  205. connection.send(
  206. ROk {
  207. id: event_id,
  208. status: ROkStatus::Duplicate,
  209. }
  210. .into(),
  211. )?;
  212. return Ok(());
  213. }
  214. if let Some(storage) = self.storage.as_ref() {
  215. let _ = storage.store_local_event(&event).await;
  216. }
  217. if let Some((client_pool, _)) = self.client_pool.as_ref() {
  218. // pass the event to the pool of clients, so this relayer can relay
  219. // their local events to the clients in the network of relayers
  220. let _ = client_pool.post(event).await;
  221. }
  222. connection.send(
  223. ROk {
  224. id: event_id,
  225. status: ROkStatus::Ok,
  226. }
  227. .into(),
  228. )?;
  229. }
  230. Request::Request(request) => {
  231. let foreign_subscription = if let Some((client_pool, _)) = self.client_pool.as_ref()
  232. {
  233. // pass the subscription request to the pool of clients, so this relayer
  234. // can relay any unknown event to the clients through their subscriptions
  235. Some(client_pool.subscribe(request.clone()).await?)
  236. } else {
  237. None
  238. };
  239. if let Some(storage) = self.storage.as_ref() {
  240. let mut sent = HashSet::new();
  241. // Sent all events that match the filter that are stored in our database
  242. for filter in request.filters.clone().into_iter() {
  243. let mut result = storage.get_by_filter(filter).await?;
  244. while let Some(Ok(event)) = result.next().await {
  245. if sent.contains(&event.id) {
  246. continue;
  247. }
  248. sent.insert(event.id.clone());
  249. let _ = connection.send(
  250. relayer::Event {
  251. subscription_id: request.subscription_id.clone(),
  252. event,
  253. }
  254. .into(),
  255. );
  256. }
  257. }
  258. }
  259. let _ = connection
  260. .send(relayer::EndOfStoredEvents(request.subscription_id.clone()).into());
  261. connection
  262. .subscribe(
  263. request.subscription_id.clone(),
  264. (
  265. foreign_subscription,
  266. self.subscriptions
  267. .subscribe(
  268. connection.get_conn_id(),
  269. connection.get_sender(),
  270. request.clone(),
  271. )
  272. .await,
  273. ),
  274. )
  275. .await;
  276. }
  277. Request::Close(close) => {
  278. connection.unsubscribe(&close).await;
  279. }
  280. };
  281. Ok(())
  282. }
  283. #[inline]
  284. /// Broadcast a given event to all local subscribers
  285. pub async fn broadcast(&self, event: &Event) -> Result<bool, Error> {
  286. if let Some(storage) = self.storage.as_ref() {
  287. if !storage.store(event).await? {
  288. return Ok(false);
  289. }
  290. }
  291. self.subscriptions.broadcast(event.clone());
  292. Ok(true)
  293. }
  294. }
  295. #[cfg(test)]
  296. mod test {
  297. use std::time::Duration;
  298. use super::*;
  299. use futures::future::join_all;
  300. use nostr_rs_client::Url;
  301. use nostr_rs_memory::Memory;
  302. use nostr_rs_types::{
  303. account::Account,
  304. types::{Content, Tag},
  305. Request,
  306. };
  307. use serde_json::json;
  308. use tokio::time::sleep;
  309. async fn dummy_server(port: u16, client_pool: Option<Pool>) -> (Url, JoinHandle<()>) {
  310. let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
  311. .await
  312. .unwrap();
  313. let local_addr = listener.local_addr().expect("addr");
  314. let relayer =
  315. Relayer::new(Some(Memory::default()), client_pool).expect("valid dummy server");
  316. let (_, stopper) = relayer.main(listener).expect("valid main loop");
  317. (
  318. Url::parse(&format!("ws://{}", local_addr)).expect("valid url"),
  319. stopper,
  320. )
  321. }
  322. fn get_note_with_custom_tags(tags: Vec<Tag>) -> Event {
  323. let account = Account::default();
  324. let content = Content::ShortTextNote("".to_owned());
  325. account.sign_content(tags, content, None).expect("valid")
  326. }
  327. fn get_note() -> Request {
  328. serde_json::from_value(json!(
  329. [
  330. "EVENT",
  331. {
  332. "kind":1,
  333. "content":"Pong",
  334. "tags":[
  335. ["e","9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42","","root"],
  336. ["e","2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a","","reply"],
  337. ["p","39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  338. ["p","ee7202ad91459e013bfef263c59e47deb0163a5e7651b026673765488bfaf102"]
  339. ],
  340. "created_at":1681938616,
  341. "pubkey":"a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3",
  342. "id":"e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9",
  343. "sig":"9036150a6c8a32933cffcc42aec4d2109a22e9f10d1c3860c0435a925e6386babb7df5c95fcf68c8ed6a9726a1f07225af663d0b068eb555014130aad21674fc",
  344. }
  345. ])).expect("value")
  346. }
  347. async fn get_db(prefill: bool) -> Memory {
  348. let db = Memory::default();
  349. if prefill {
  350. let events = include_str!("../tests/events.json")
  351. .lines()
  352. .map(|line| serde_json::from_str(line).expect("valid"))
  353. .collect::<Vec<Event>>();
  354. for event in events {
  355. assert!(db.store(&event).await.expect("valid"));
  356. }
  357. while db.is_flushing() {
  358. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
  359. }
  360. }
  361. db
  362. }
  363. #[tokio::test]
  364. async fn serve_listener_from_local_db_custom_tag() {
  365. let request = serde_json::from_value(json!([
  366. "REQ",
  367. "1298169700973717",
  368. {
  369. "#f": [
  370. "foo",
  371. ],
  372. },
  373. ]))
  374. .expect("valid object");
  375. let relayer = Relayer::new(Some(get_db(true).await), None).expect("valid relayer");
  376. let (connection, mut recv) = Connection::new_local_connection();
  377. let note =
  378. get_note_with_custom_tags(vec![Tag::Unknown("f".to_owned(), vec!["foo".to_owned()])]);
  379. let _ = relayer
  380. .process_request_from_client(&connection, note.clone().into())
  381. .await;
  382. sleep(Duration::from_millis(10)).await;
  383. let _ = relayer
  384. .process_request_from_client(&connection, request)
  385. .await;
  386. // ev1
  387. assert_eq!(
  388. ROkStatus::Ok,
  389. recv.try_recv()
  390. .expect("valid")
  391. .as_ok()
  392. .cloned()
  393. .unwrap()
  394. .status,
  395. );
  396. // ev1
  397. assert_eq!(
  398. note,
  399. recv.try_recv().expect("valid").as_event().unwrap().event
  400. );
  401. // eod
  402. assert!(recv
  403. .try_recv()
  404. .expect("valid")
  405. .as_end_of_stored_events()
  406. .is_some());
  407. assert!(recv.try_recv().is_err());
  408. }
  409. #[tokio::test]
  410. async fn serve_listener_from_local_db() {
  411. let request = serde_json::from_value(json!([
  412. "REQ",
  413. "1298169700973717",
  414. {
  415. "authors": [
  416. "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
  417. ],
  418. "since": 1681928304
  419. },
  420. {
  421. "#p": [
  422. "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
  423. ],
  424. "kinds": [
  425. 1,
  426. 3,
  427. 6,
  428. 7,
  429. 9735
  430. ],
  431. "since": 1681928304
  432. },
  433. {
  434. "#p": [
  435. "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
  436. ],
  437. "kinds": [
  438. 4
  439. ]
  440. },
  441. {
  442. "authors": [
  443. "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
  444. ],
  445. "kinds": [
  446. 4
  447. ]
  448. },
  449. {
  450. "#e": [
  451. "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
  452. "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1"
  453. ],
  454. "kinds": [
  455. 1,
  456. 6,
  457. 7,
  458. 9735
  459. ]
  460. }
  461. ]))
  462. .expect("valid object");
  463. let relayer = Relayer::new(Some(get_db(true).await), None).expect("valid relayer");
  464. let (connection, mut recv) = Connection::new_local_connection();
  465. let _ = relayer
  466. .process_request_from_client(&connection, request)
  467. .await;
  468. // ev1
  469. assert_eq!(
  470. "9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42",
  471. recv.try_recv()
  472. .expect("valid")
  473. .as_event()
  474. .expect("event")
  475. .id
  476. .to_string()
  477. );
  478. // ev3
  479. assert_eq!(
  480. "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9",
  481. recv.try_recv()
  482. .expect("valid")
  483. .as_event()
  484. .expect("event")
  485. .id
  486. .to_string()
  487. );
  488. // ev2
  489. assert_eq!(
  490. "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
  491. recv.try_recv()
  492. .expect("valid")
  493. .as_event()
  494. .expect("event")
  495. .id
  496. .to_string()
  497. );
  498. // eod
  499. assert!(recv
  500. .try_recv()
  501. .expect("valid")
  502. .as_end_of_stored_events()
  503. .is_some());
  504. assert!(recv.try_recv().is_err());
  505. }
  506. #[tokio::test]
  507. async fn server_listener_real_time_single_argument() {
  508. let request: Request = serde_json::from_value(json!(
  509. [
  510. "REQ",
  511. "1298169700973717",
  512. {
  513. "authors": ["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  514. "since":1681939304
  515. },
  516. {
  517. "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  518. "since":1681939304
  519. },
  520. {
  521. "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  522. },
  523. {
  524. "authors":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  525. },
  526. {
  527. "#e":[
  528. "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
  529. "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1"
  530. ],
  531. }
  532. ]))
  533. .expect("valid object");
  534. let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
  535. let (connection, mut recv) = Connection::new_local_connection();
  536. assert_eq!(relayer.total_subscribers(), 0);
  537. let _ = relayer
  538. .process_request_from_client(&connection, request)
  539. .await;
  540. assert_eq!(relayer.total_subscribers(), 5);
  541. // eod
  542. assert!(recv
  543. .try_recv()
  544. .expect("valid")
  545. .as_end_of_stored_events()
  546. .is_some());
  547. // It is empty
  548. assert!(recv.try_recv().is_err());
  549. relayer
  550. .process_request_from_client(&connection, get_note())
  551. .await
  552. .expect("process event");
  553. sleep(Duration::from_millis(100)).await;
  554. // ok from posting
  555. let msg = recv.try_recv();
  556. assert!(msg.is_ok());
  557. assert_eq!(
  558. msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
  559. "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
  560. );
  561. // It is not empty
  562. let msg = recv.try_recv();
  563. assert!(msg.is_ok());
  564. assert_eq!(
  565. msg.expect("is ok")
  566. .as_event()
  567. .expect("valid")
  568. .subscription_id
  569. .to_string(),
  570. "1298169700973717".to_owned()
  571. );
  572. // it must be deliverd at most once
  573. assert!(recv.try_recv().is_err());
  574. assert_eq!(relayer.total_subscribers(), 5);
  575. // when client is dropped, the subscription is removed
  576. // automatically
  577. drop(connection);
  578. sleep(Duration::from_millis(10)).await;
  579. assert_eq!(relayer.total_subscribers(), 0);
  580. }
  581. #[tokio::test]
  582. async fn server_listener_real_time() {
  583. let request: Request = serde_json::from_value(json!(
  584. [
  585. "REQ",
  586. "1298169700973717",
  587. {
  588. "authors": ["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  589. "since":1681939304
  590. },
  591. {
  592. "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  593. "kinds":[1,3,6,7,9735],
  594. "since":1681939304
  595. },
  596. {
  597. "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  598. "kinds":[4]
  599. },
  600. {
  601. "authors":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  602. "kinds":[4]
  603. },
  604. {
  605. "#e":[
  606. "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
  607. "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1"
  608. ],
  609. "kinds":[1,6,7,9735]
  610. }
  611. ]))
  612. .expect("valid object");
  613. let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
  614. let (connection, mut recv) = Connection::new_local_connection();
  615. assert_eq!(relayer.total_subscribers(), 0);
  616. let _ = relayer
  617. .process_request_from_client(&connection, request)
  618. .await;
  619. assert_eq!(relayer.total_subscribers(), 5);
  620. // eod
  621. assert!(recv
  622. .try_recv()
  623. .expect("valid")
  624. .as_end_of_stored_events()
  625. .is_some());
  626. // It is empty
  627. assert!(recv.try_recv().is_err());
  628. relayer
  629. .process_request_from_client(&connection, get_note())
  630. .await
  631. .expect("process event");
  632. sleep(Duration::from_millis(100)).await;
  633. // ok from posting
  634. let msg = recv.try_recv();
  635. assert!(msg.is_ok());
  636. assert_eq!(
  637. msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
  638. "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
  639. );
  640. // It is not empty
  641. let msg = recv.try_recv();
  642. assert!(msg.is_ok());
  643. assert_eq!(
  644. msg.expect("is ok")
  645. .as_event()
  646. .expect("valid")
  647. .subscription_id
  648. .to_string(),
  649. "1298169700973717".to_owned()
  650. );
  651. // it must be deliverd at most once
  652. assert!(recv.try_recv().is_err());
  653. assert_eq!(relayer.total_subscribers(), 5);
  654. // when client is dropped, the subscription is removed
  655. // automatically
  656. drop(connection);
  657. sleep(Duration::from_millis(10)).await;
  658. assert_eq!(relayer.total_subscribers(), 0);
  659. }
  660. #[tokio::test]
  661. async fn multiple_subcribers() {
  662. let req1: Request = serde_json::from_value(json!(["REQ", "1298169700973717", {
  663. "authors":["a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb4"],
  664. }]))
  665. .expect("valid object");
  666. let req2: Request = serde_json::from_value(json!(["REQ", "1298169700973717", {
  667. "authors":["a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3"]
  668. }]))
  669. .expect("valid object");
  670. let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
  671. let (publisher, mut recv) = Connection::new_local_connection();
  672. let mut set1 = (0..1000)
  673. .map(|_| Connection::new_local_connection())
  674. .collect::<Vec<_>>();
  675. let mut set2 = (0..100)
  676. .map(|_| Connection::new_local_connection())
  677. .collect::<Vec<_>>();
  678. let subscribe1 = set1
  679. .iter()
  680. .map(|(connection, _)| relayer.process_request_from_client(connection, req1.clone()))
  681. .collect::<Vec<_>>();
  682. let subscribe2 = set2
  683. .iter()
  684. .map(|(connection, _)| relayer.process_request_from_client(connection, req2.clone()))
  685. .collect::<Vec<_>>();
  686. assert_eq!(relayer.total_subscribers(), 0);
  687. join_all(subscribe1)
  688. .await
  689. .into_iter()
  690. .collect::<Result<Vec<_>, _>>()
  691. .expect("valid calls");
  692. join_all(subscribe2)
  693. .await
  694. .into_iter()
  695. .collect::<Result<Vec<_>, _>>()
  696. .expect("valid calls");
  697. for (_, recv) in set1.iter_mut() {
  698. assert!(recv
  699. .try_recv()
  700. .expect("end of stored events")
  701. .as_end_of_stored_events()
  702. .is_some());
  703. }
  704. for (_, recv) in set2.iter_mut() {
  705. assert!(recv
  706. .try_recv()
  707. .expect("end of stored events")
  708. .as_end_of_stored_events()
  709. .is_some());
  710. }
  711. assert_eq!(relayer.total_subscribers(), 1100);
  712. relayer
  713. .process_request_from_client(&publisher, get_note())
  714. .await
  715. .expect("process event");
  716. sleep(Duration::from_millis(10)).await;
  717. let msg = recv.try_recv();
  718. assert!(msg.is_ok());
  719. assert_eq!(
  720. msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
  721. "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
  722. );
  723. for (_, recv) in set1.iter_mut() {
  724. assert!(recv.try_recv().is_err());
  725. }
  726. for (_, recv) in set2.iter_mut() {
  727. let msg = recv.try_recv();
  728. println!("{:?}", msg);
  729. assert!(msg.is_ok());
  730. let msg = msg.expect("msg");
  731. assert_eq!(
  732. msg.as_event().expect("valid").subscription_id.to_string(),
  733. "1298169700973717".to_owned()
  734. );
  735. assert!(recv.try_recv().is_err());
  736. }
  737. drop(set1);
  738. sleep(Duration::from_millis(10)).await;
  739. assert_eq!(relayer.total_subscribers(), 100);
  740. drop(set2);
  741. sleep(Duration::from_millis(10)).await;
  742. assert_eq!(relayer.total_subscribers(), 0);
  743. drop(relayer);
  744. }
  745. #[tokio::test]
  746. async fn posting_event_replies_ok() {
  747. let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
  748. let (connection, mut recv) = Connection::new_local_connection();
  749. let note = get_note();
  750. let note_id = note.as_event().map(|x| x.id.clone()).unwrap();
  751. relayer
  752. .process_request_from_client(&connection, note)
  753. .await
  754. .expect("process event");
  755. sleep(Duration::from_millis(10)).await;
  756. assert_eq!(
  757. Some(
  758. ROk {
  759. id: note_id.into(),
  760. status: ROkStatus::Ok,
  761. }
  762. .into()
  763. ),
  764. recv.try_recv().ok()
  765. );
  766. }
  767. #[tokio::test]
  768. async fn subscribe_to_all() {
  769. let request: Request =
  770. serde_json::from_value(json!(["REQ", "1298169700973717", {}])).expect("valid object");
  771. let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
  772. let (connection, mut recv) = Connection::new_local_connection();
  773. assert_eq!(relayer.total_subscribers(), 0);
  774. let _ = relayer
  775. .process_request_from_client(&connection, request)
  776. .await;
  777. assert_eq!(relayer.total_subscribers(), 1);
  778. // eod
  779. assert!(recv
  780. .try_recv()
  781. .expect("valid")
  782. .as_end_of_stored_events()
  783. .is_some());
  784. // It is empty
  785. assert!(recv.try_recv().is_err());
  786. relayer
  787. .process_request_from_client(&connection, get_note())
  788. .await
  789. .expect("process event");
  790. sleep(Duration::from_millis(10)).await;
  791. // ok from posting
  792. let msg = recv.try_recv();
  793. assert!(msg.is_ok());
  794. assert_eq!(
  795. msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
  796. "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
  797. );
  798. // It is not empty
  799. let msg = recv.try_recv();
  800. assert!(msg.is_ok());
  801. assert_eq!(
  802. msg.expect("is ok")
  803. .as_event()
  804. .expect("valid")
  805. .subscription_id
  806. .to_string(),
  807. "1298169700973717".to_owned()
  808. );
  809. // it must be deliverd at most once
  810. assert!(recv.try_recv().is_err());
  811. assert_eq!(relayer.total_subscribers(), 1);
  812. // when client is dropped, the subscription is removed
  813. // automatically
  814. drop(connection);
  815. sleep(Duration::from_millis(10)).await;
  816. assert_eq!(relayer.total_subscribers(), 0);
  817. }
  818. #[tokio::test]
  819. async fn relayer_posts_to_custom_posts_to_all_clients() {
  820. let (relayer1, _) = dummy_server(0, None).await;
  821. let (relayer2, _) = dummy_server(0, None).await;
  822. let (relayer3, _) = dummy_server(0, None).await;
  823. let (main_relayer, _) = dummy_server(
  824. 0,
  825. Some(Pool::new_with_clients(vec![
  826. relayer1.clone(),
  827. relayer2.clone(),
  828. relayer3.clone(),
  829. ])),
  830. )
  831. .await;
  832. let mut reader_client =
  833. Pool::new_with_clients(vec![relayer1.clone(), relayer2.clone(), relayer3.clone()]);
  834. let main_client = Pool::new_with_clients(vec![main_relayer]);
  835. let _sub = reader_client
  836. .subscribe(Default::default())
  837. .await
  838. .expect("valid subscription");
  839. sleep(Duration::from_millis(20)).await;
  840. for _ in 0..3 {
  841. assert!(reader_client
  842. .try_recv()
  843. .map(|(r, _)| r)
  844. .expect("valid message")
  845. .as_end_of_stored_events()
  846. .is_some());
  847. }
  848. assert!(reader_client.try_recv().is_none());
  849. let account1 = Account::default();
  850. let signed_content = account1
  851. .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
  852. .expect("valid signed content");
  853. // account1 posts a new note into the relayer1, and the main relayer
  854. // should get a copy of it, as well as it is connected to relayer2 and
  855. // relayer1.
  856. main_client.post(signed_content.clone().into()).await;
  857. sleep(Duration::from_millis(10)).await;
  858. let responses = (0..3)
  859. .map(|_| reader_client.try_recv().expect("valid message"))
  860. .filter_map(|(r, url)| {
  861. r.as_event()
  862. .map(|r| (url.port().expect("port"), r.to_owned()))
  863. })
  864. .collect::<HashMap<_, _>>();
  865. assert!(reader_client.try_recv().is_none());
  866. assert_eq!(responses.len(), 3);
  867. assert_eq!(
  868. responses
  869. .get(&relayer1.port().expect("port"))
  870. .map(|x| x.id.clone()),
  871. Some(signed_content.id.clone())
  872. );
  873. assert_eq!(
  874. responses
  875. .get(&relayer2.port().expect("port"))
  876. .map(|x| x.id.clone()),
  877. Some(signed_content.id.clone())
  878. );
  879. assert_eq!(
  880. responses
  881. .get(&relayer3.port().expect("port"))
  882. .map(|x| x.id.clone()),
  883. Some(signed_content.id)
  884. );
  885. }
  886. #[tokio::test]
  887. async fn relayer_with_client_pool() {
  888. let (relayer1, _) = dummy_server(0, None).await;
  889. let (relayer2, _) = dummy_server(0, None).await;
  890. let (main_relayer, _) = dummy_server(
  891. 0,
  892. Some(Pool::new_with_clients(vec![relayer1.clone(), relayer2])),
  893. )
  894. .await;
  895. let secondary_client = Pool::new_with_clients(vec![relayer1]);
  896. // Create a subscription in the main relayer, main_client is only
  897. // connected to the main relayer
  898. let mut main_client = Pool::new_with_clients(vec![main_relayer]);
  899. let _sub = main_client
  900. .subscribe(Default::default())
  901. .await
  902. .expect("valid subscription");
  903. sleep(Duration::from_millis(10)).await;
  904. assert!(main_client
  905. .try_recv()
  906. .map(|(r, _)| r)
  907. .expect("valid message")
  908. .as_end_of_stored_events()
  909. .is_some());
  910. assert!(main_client.try_recv().is_none());
  911. let account1 = Account::default();
  912. let signed_content = account1
  913. .sign_content(vec![], Content::ShortTextNote("test 01".to_owned()), None)
  914. .expect("valid signed content");
  915. // account1 posts a new note into the relayer1, and the main relayer
  916. // should get a copy of it, as well as it is connected to relayer2 and
  917. // relayer1.
  918. secondary_client.post(signed_content.clone().into()).await;
  919. // wait for the note to be delivered
  920. sleep(Duration::from_millis(10)).await;
  921. assert_eq!(
  922. Some((signed_content.id, signed_content.signature)),
  923. main_client
  924. .try_recv()
  925. .and_then(|(r, _)| r.as_event().cloned())
  926. .map(|x| (x.id.clone(), x.signature.clone()))
  927. );
  928. assert!(main_client.try_recv().is_none());
  929. }
  930. }