relayer.rs 41 KB


  1. use crate::{
  2. connection::{ConnectionId, LocalConnection},
  3. Connection, Error,
  4. };
  5. use futures_util::StreamExt;
  6. use nostr_rs_client::{pool::subscription::PoolSubscriptionId, Pool, Url};
  7. use nostr_rs_storage_base::Storage;
  8. use nostr_rs_types::{
  9. relayer::{self, ROk, ROkStatus},
  10. types::{Addr, Event, SubscriptionId},
  11. Request, Response,
  12. };
  13. use std::{
  14. collections::{HashMap, HashSet},
  15. ops::Deref,
  16. sync::Arc,
  17. time::Instant,
  18. };
  19. use tokio::{
  20. net::{TcpListener, TcpStream},
  21. sync::{
  22. mpsc::{
  23. self, {channel, Receiver, Sender},
  24. },
  25. RwLock,
  26. },
  27. task::JoinHandle,
  28. };
  29. #[derive(Debug, Hash, Ord, PartialEq, PartialOrd, Eq, Clone)]
  30. pub struct RelayerSubscriptionId((SubscriptionId, ConnectionId));
  31. impl From<(SubscriptionId, ConnectionId)> for RelayerSubscriptionId {
  32. fn from(value: (SubscriptionId, ConnectionId)) -> Self {
  33. Self(value)
  34. }
  35. }
  36. impl Default for RelayerSubscriptionId {
  37. fn default() -> Self {
  38. Self((SubscriptionId::empty(), ConnectionId::new_empty()))
  39. }
  40. }
  41. type Connections = Arc<RwLock<HashMap<ConnectionId, Connection>>>;
  42. type SubscriptionManager =
  43. Arc<nostr_rs_subscription_manager::SubscriptionManager<RelayerSubscriptionId, ()>>;
  44. type ClientPoolSubscriptions =
  45. Arc<RwLock<HashMap<PoolSubscriptionId, (SubscriptionId, ConnectionId)>>>;
  46. /// Relayer struct
  47. ///
  48. pub struct Relayer<T: Storage + Send + Sync + 'static> {
  49. /// Storage engine, if provided the services are going to persisted in disk,
  50. /// otherwise all the messages are going to be ephemeral, making this
  51. /// relayer just a dumb proxy (that can be useful for privacy) but it won't
  52. /// be able to perform any optimization like prefetching content while offline
  53. storage: Arc<Option<T>>,
  54. /// Subscription manager
  55. subscription_manager: SubscriptionManager,
  56. /// List of all active connections
  57. connections: Connections,
  58. /// This Sender can be used to send requests from anywhere to the relayer.
  59. send_to_relayer: Sender<(ConnectionId, Request)>,
  60. /// This Receiver is the relayer the way the relayer receives messages
  61. relayer_receiver: Option<Receiver<(ConnectionId, Request)>>,
  62. /// Client pool
  63. ///
  64. /// A relayer can optionally be connected to a pool of clients to get
  65. /// foreign events.
  66. client_pool: Option<Arc<Pool>>,
  67. client_pool_receiver: Option<Receiver<(Response, Url)>>,
  68. client_pool_subscriptions: ClientPoolSubscriptions,
  69. }
  70. impl<T: Storage + Send + Sync + 'static> Relayer<T> {
  71. /// Creates a new relayer instance
  72. ///
  73. /// If the storage is given, it will be used to persist events, as well to
  74. /// server past events when a new subscription is added.
  75. ///
  76. /// If the client_pool is given it will be used to connect to those relayers
  77. /// and create a network of relayers, reposting events to them and
  78. /// subscribing to their events.`gqq`
  79. pub fn new(storage: Option<T>, client_pool: Option<Pool>) -> Result<Self, Error> {
  80. let (relayer_sender, relayer_receiver) = channel(100_000);
  81. let (client_pool_receiver, client_pool) = if let Some(client_pool) = client_pool {
  82. let result = client_pool.split()?;
  83. (result.0, Some(Arc::new(result.1)))
  84. } else {
  85. let (_, receiver) = mpsc::channel(1);
  86. (receiver, None)
  87. };
  88. Ok(Self {
  89. storage: Arc::new(storage),
  90. subscription_manager: Default::default(),
  91. send_to_relayer: relayer_sender,
  92. relayer_receiver: Some(relayer_receiver),
  93. connections: Default::default(),
  94. client_pool_receiver: Some(client_pool_receiver),
  95. client_pool,
  96. client_pool_subscriptions: Default::default(),
  97. })
  98. }
  99. /// Connects to the relayer pool
  100. pub async fn connect_to_relayer(&self, url: Url) -> Result<(), Error> {
  101. let _ = self
  102. .client_pool
  103. .as_ref()
  104. .ok_or(Error::NoClient)?
  105. .connect_to(url)
  106. .await?;
  107. Ok(())
  108. }
  109. /// Total number of subscribers requests that actively listening for new events
  110. pub fn total_subscribers(&self) -> usize {
  111. self.subscription_manager.total_subscribers()
  112. }
  113. /// Splits the relayer object and extract their receiver.
  114. pub fn split(mut self) -> Result<(Self, Receiver<(ConnectionId, Request)>), Error> {
  115. let receiver = self.relayer_receiver.take().ok_or(Error::AlreadySplitted)?;
  116. Ok((self, receiver))
  117. }
  118. /// Runs the relayer main loop in a tokio task and returns it.
  119. ///
  120. /// This function consumes the object and takes the ownership. The returned
  121. /// JoinHandle() can be used to stop the main loop
  122. pub fn main(mut self, server: TcpListener) -> Result<(Arc<Self>, JoinHandle<()>), Error> {
  123. let mut client_pool_receiver = self
  124. .client_pool_receiver
  125. .take()
  126. .ok_or(Error::AlreadySplitted)?;
  127. let (this, mut receiver) = self.split()?;
  128. let _self = Arc::new(this);
  129. let this = _self.clone();
  130. let handle = tokio::spawn(async move {
  131. loop {
  132. let start = Instant::now();
  133. println!("{}", client_pool_receiver.len());
  134. tokio::select! {
  135. Ok((stream, _)) = server.accept() => {
  136. // accept new connections
  137. let _ = this.add_connection(None, stream).await;
  138. },
  139. Some((response, _)) = client_pool_receiver.recv() => {
  140. // process messages from anothe relayer, broadcast it and store it
  141. match response {
  142. Response::Event(event) => {
  143. // we received a message from the client pool, store it locally
  144. // and re-broadcast it.
  145. tokio::spawn(Self::broadcast(
  146. this.storage.clone(),
  147. this.subscription_manager.clone(),
  148. this.connections.clone(),
  149. event.event
  150. ));
  151. }
  152. Response::EndOfStoredEvents(sub) => {
  153. let connections = this.connections.read().await;
  154. let (sub_id, connection) = if let Some((sub_id, conn_id)) = this.client_pool_subscriptions.write().await.remove(&(sub.deref().into())) {
  155. if let Some(connection) = connections.get(&conn_id) {
  156. (sub_id, connection)
  157. } else {
  158. continue;
  159. }
  160. } else {
  161. continue
  162. };
  163. let _ = connection.respond(Response::EndOfStoredEvents(sub_id.into()));
  164. let duration = start.elapsed();
  165. println!("xTime elapsed: {} ms", duration.as_millis());
  166. }
  167. _ => {}
  168. }
  169. }
  170. Some((conn_id, request)) = receiver.recv() => {
  171. tokio::spawn(Self::process_request(
  172. this.storage.clone(),
  173. this.client_pool.clone(),
  174. this.client_pool_subscriptions.clone(),
  175. this.subscription_manager.clone(),
  176. this.connections.clone(),
  177. conn_id,
  178. request.clone()
  179. ));
  180. }
  181. else => {
  182. }
  183. }
  184. }
  185. });
  186. Ok((_self, handle))
  187. }
  188. /// Returns a reference to the internal database
  189. pub fn get_db(&self) -> &Option<T> {
  190. &self.storage
  191. }
  192. /// Adds a new local connection to the list of active connections.
  193. pub async fn create_new_local_connection(self: &Arc<Self>) -> LocalConnection<T> {
  194. let (conn, receiver) = Connection::new_local_connection();
  195. let conn_id = conn.get_conn_id();
  196. self.connections.write().await.insert(conn_id, conn);
  197. (
  198. conn_id,
  199. receiver,
  200. self.send_to_relayer.clone(),
  201. self.clone(),
  202. )
  203. .into()
  204. }
  205. /// Drops a connection from the list of active connections
  206. ///
  207. /// This function only works for local connections, normal connections can
  208. /// be dropped on disconnection.
  209. ///
  210. /// This function could change in the future tu kick connections programmatically
  211. pub fn drop_connection(self: &Arc<Self>, local_connection: &LocalConnection<T>) {
  212. let id = local_connection.conn_id;
  213. let this = self.clone();
  214. tokio::spawn(async move {
  215. this.connections.write().await.remove(&id);
  216. });
  217. }
  218. /// Adds a new TpStream and adds it to the list of active connections.
  219. ///
  220. /// This function will spawn the client's loop to receive incoming messages and send those messages
  221. pub async fn add_connection(
  222. &self,
  223. disconnection_notify: Option<mpsc::Sender<ConnectionId>>,
  224. stream: TcpStream,
  225. ) -> Result<ConnectionId, Error> {
  226. let conn =
  227. Connection::new_connection(self.send_to_relayer.clone(), disconnection_notify, stream)
  228. .await?;
  229. let id = conn.get_conn_id();
  230. self.connections.write().await.insert(id, conn);
  231. Ok(id)
  232. }
  233. #[cfg(test)]
  234. async fn process_request_from_client(
  235. &self,
  236. connection: &LocalConnection<T>,
  237. request: Request,
  238. ) -> Result<(), Error> {
  239. Self::process_request(
  240. self.storage.clone(),
  241. self.client_pool.clone(),
  242. self.client_pool_subscriptions.clone(),
  243. self.subscription_manager.clone(),
  244. self.connections.clone(),
  245. connection.conn_id,
  246. request,
  247. )
  248. .await
  249. }
  250. /// Process a request from a connected client
  251. async fn process_request(
  252. storage: Arc<Option<T>>,
  253. client_pool: Option<Arc<Pool>>,
  254. client_pool_subscriptions: ClientPoolSubscriptions,
  255. subscription_manager: SubscriptionManager,
  256. connections: Connections,
  257. connection_id: ConnectionId,
  258. request: Request,
  259. ) -> Result<(), Error> {
  260. match request {
  261. Request::Event(event) => {
  262. let read_connections = connections.read().await;
  263. let connection = read_connections
  264. .get(&connection_id)
  265. .ok_or(Error::UnknownConnection(connection_id))?;
  266. let event_id: Addr = event.id.clone().into();
  267. if !Self::broadcast(
  268. storage.clone(),
  269. subscription_manager.clone(),
  270. connections.clone(),
  271. event.deref().clone(),
  272. )
  273. .await?
  274. {
  275. connection.respond(
  276. ROk {
  277. id: event_id,
  278. status: ROkStatus::Duplicate,
  279. }
  280. .into(),
  281. )?;
  282. return Ok(());
  283. }
  284. if let Some(storage) = storage.as_ref() {
  285. let _ = storage.store_local_event(&event).await;
  286. }
  287. if let Some(client_pool) = client_pool.as_ref() {
  288. // pass the event to the pool of clients, so this relayer can relay
  289. // their local events to the clients in the network of relayers
  290. let _ = client_pool.post(event).await;
  291. }
  292. connection.respond(
  293. ROk {
  294. id: event_id,
  295. status: ROkStatus::Ok,
  296. }
  297. .into(),
  298. )?;
  299. }
  300. Request::Request(request) => {
  301. let foreign_subscription = if let Some(client_pool) = client_pool.as_ref() {
  302. // If this relay is connected to other relays through the
  303. // client pool, create the same subscription in them as
  304. // well, with the main goal of fetching any foreign event
  305. // that matches the requested subscription.
  306. //
  307. // If the this happens, this relay will serve any local
  308. // event that matches, as well any foreign event. Foreign
  309. // events will be stored locally as well if there is a
  310. // storage setup.
  311. let foreign_sub_id = client_pool
  312. .subscribe(request.filters.clone().into())
  313. .await?;
  314. client_pool_subscriptions.write().await.insert(
  315. foreign_sub_id.clone(),
  316. (request.subscription_id.clone(), connection_id),
  317. );
  318. Some(foreign_sub_id)
  319. } else {
  320. None
  321. };
  322. let read_connections = connections.read().await;
  323. let connection = read_connections
  324. .get(&connection_id)
  325. .ok_or(Error::UnknownConnection(connection_id))?;
  326. if let Some(storage) = storage.as_ref() {
  327. let mut sent = HashSet::new();
  328. // Sent all events that match the filter that are stored in our database
  329. for filter in request.filters.clone().into_iter() {
  330. let mut result = storage.get_by_filter(filter).await?;
  331. while let Some(Ok(event)) = result.next().await {
  332. if sent.contains(&event.id) {
  333. continue;
  334. }
  335. sent.insert(event.id.clone());
  336. let _ = connection.respond(
  337. relayer::Event {
  338. subscription_id: request.subscription_id.clone(),
  339. event,
  340. }
  341. .into(),
  342. );
  343. }
  344. }
  345. }
  346. if foreign_subscription.is_none() {
  347. // If there is a foreign subscription, we shouldn't send a
  348. // EOS until we have got EOS from all foreign relays
  349. let _ = connection.respond(
  350. relayer::EndOfStoredEvents(request.subscription_id.clone()).into(),
  351. );
  352. }
  353. connection
  354. .subscribe(
  355. request.subscription_id.clone(),
  356. (
  357. foreign_subscription,
  358. subscription_manager
  359. .subscribe(
  360. (request.subscription_id, connection.get_conn_id()).into(),
  361. request.filters,
  362. (),
  363. )
  364. .await,
  365. ),
  366. )
  367. .await;
  368. }
  369. Request::Close(close) => {
  370. connections
  371. .read()
  372. .await
  373. .get(&connection_id)
  374. .ok_or(Error::UnknownConnection(connection_id))?
  375. .unsubscribe(&close)
  376. .await;
  377. }
  378. };
  379. Ok(())
  380. }
  381. #[inline]
  382. /// A non-blocking version of broadcast
  383. pub async fn broadcast(
  384. storage: Arc<Option<T>>,
  385. subscription_manager: SubscriptionManager,
  386. connections: Connections,
  387. event: Event,
  388. ) -> Result<bool, Error> {
  389. if let Some(storage) = storage.as_ref() {
  390. if !storage.store(&event).await? {
  391. return Ok(false);
  392. }
  393. }
  394. let connections = connections.read().await;
  395. let mut sent = HashSet::new();
  396. for RelayerSubscriptionId((sub_id, conn_id)) in
  397. subscription_manager.get_subscribers(&event).await
  398. {
  399. if sent.contains(&conn_id) {
  400. continue;
  401. }
  402. sent.insert(conn_id);
  403. if let Some(connection) = connections.get(&conn_id) {
  404. let _ = connection.respond(
  405. relayer::Event {
  406. subscription_id: sub_id,
  407. event: event.clone(),
  408. }
  409. .into(),
  410. );
  411. }
  412. }
  413. Ok(true)
  414. }
  415. }
  416. #[cfg(test)]
  417. mod test {
  418. use super::*;
  419. use futures::future::join_all;
  420. use nostr_rs_client::Url;
  421. use nostr_rs_memory::Memory;
  422. use nostr_rs_types::{
  423. account::Account,
  424. types::{Content, Tag},
  425. Request,
  426. };
  427. use serde_json::json;
  428. use std::time::Duration;
  429. use tokio::time::sleep;
  430. async fn dummy_server(port: u16, client_pool: Option<Pool>) -> (Url, JoinHandle<()>) {
  431. let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
  432. .await
  433. .unwrap();
  434. let local_addr = listener.local_addr().expect("addr");
  435. let relayer =
  436. Relayer::new(Some(Memory::default()), client_pool).expect("valid dummy server");
  437. let (_, stopper) = relayer.main(listener).expect("valid main loop");
  438. (
  439. Url::parse(&format!("ws://{}", local_addr)).expect("valid url"),
  440. stopper,
  441. )
  442. }
  443. async fn dummy_server_with_relayer(
  444. client_pool: Option<Pool>,
  445. ) -> (Arc<Relayer<Memory>>, JoinHandle<()>) {
  446. let listener = TcpListener::bind(format!("127.0.0.1:{}", 0)).await.unwrap();
  447. let relayer =
  448. Relayer::new(Some(Memory::default()), client_pool).expect("valid dummy server");
  449. let (relayer, stopper) = relayer.main(listener).expect("valid main loop");
  450. (relayer, stopper)
  451. }
  452. fn get_note_with_custom_tags(tags: serde_json::Value) -> Event {
  453. let account = Account::default();
  454. let content = Content::ShortTextNote("".to_owned());
  455. let tags: Vec<Tag> = serde_json::from_value(tags).expect("valid tags");
  456. account.sign_content(tags, content, None).expect("valid")
  457. }
  458. fn get_note() -> Request {
  459. serde_json::from_value(json!(
  460. [
  461. "EVENT",
  462. {
  463. "kind":1,
  464. "content":"Pong",
  465. "tags":[
  466. ["e","9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42","","root"],
  467. ["e","2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a","","reply"],
  468. ["p","39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  469. ["p","ee7202ad91459e013bfef263c59e47deb0163a5e7651b026673765488bfaf102"]
  470. ],
  471. "created_at":1681938616,
  472. "pubkey":"a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3",
  473. "id":"e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9",
  474. "sig":"9036150a6c8a32933cffcc42aec4d2109a22e9f10d1c3860c0435a925e6386babb7df5c95fcf68c8ed6a9726a1f07225af663d0b068eb555014130aad21674fc",
  475. }
  476. ])).expect("value")
  477. }
  478. async fn get_db(prefill: bool) -> Memory {
  479. let db = Memory::default();
  480. if prefill {
  481. let events = include_str!("../tests/events.json")
  482. .lines()
  483. .map(|line| serde_json::from_str(line).expect("valid"))
  484. .collect::<Vec<Event>>();
  485. for event in events {
  486. assert!(db.store(&event).await.expect("valid"));
  487. }
  488. while db.is_flushing() {
  489. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
  490. }
  491. }
  492. db
  493. }
  494. #[tokio::test]
  495. async fn serve_listener_from_local_db_custom_tag() {
  496. let request = serde_json::from_value(json!([
  497. "REQ",
  498. "1298169700973717",
  499. {
  500. "#f": [
  501. "foo",
  502. ],
  503. },
  504. ]))
  505. .expect("valid object");
  506. let relayer =
  507. Arc::new(Relayer::new(Some(get_db(true).await), None).expect("valid relayer"));
  508. let mut connection = relayer.create_new_local_connection().await;
  509. let note = get_note_with_custom_tags(json!([["f", "foo"]]));
  510. let _ = relayer
  511. .process_request_from_client(&connection, note.clone().into())
  512. .await;
  513. sleep(Duration::from_millis(10)).await;
  514. let _ = relayer
  515. .process_request_from_client(&connection, request)
  516. .await;
  517. // ev1
  518. assert_eq!(
  519. ROkStatus::Ok,
  520. connection
  521. .try_recv()
  522. .expect("valid")
  523. .as_ok()
  524. .cloned()
  525. .unwrap()
  526. .status,
  527. );
  528. // ev1
  529. assert_eq!(
  530. note,
  531. connection
  532. .try_recv()
  533. .expect("valid")
  534. .as_event()
  535. .unwrap()
  536. .event
  537. );
  538. // eod
  539. assert!(connection
  540. .try_recv()
  541. .expect("valid")
  542. .as_end_of_stored_events()
  543. .is_some());
  544. assert!(connection.try_recv().is_none());
  545. }
  546. #[tokio::test]
  547. async fn serve_listener_from_local_db() {
  548. let request = serde_json::from_value(json!([
  549. "REQ",
  550. "1298169700973717",
  551. {
  552. "authors": [
  553. "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
  554. ],
  555. "until": 1681928304
  556. },
  557. {
  558. "#p": [
  559. "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
  560. ],
  561. "kinds": [
  562. 1,
  563. 3,
  564. 6,
  565. 7,
  566. 9735
  567. ],
  568. "until": 1681928304
  569. },
  570. {
  571. "#p": [
  572. "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
  573. ],
  574. "kinds": [
  575. 4
  576. ]
  577. },
  578. {
  579. "authors": [
  580. "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
  581. ],
  582. "kinds": [
  583. 4
  584. ]
  585. },
  586. {
  587. "#e": [
  588. "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
  589. "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1"
  590. ],
  591. "kinds": [
  592. 1,
  593. 6,
  594. 7,
  595. 9735
  596. ]
  597. }
  598. ]))
  599. .expect("valid object");
  600. let relayer =
  601. Arc::new(Relayer::new(Some(get_db(true).await), None).expect("valid relayer"));
  602. let mut connection = relayer.create_new_local_connection().await;
  603. let _ = relayer
  604. .process_request_from_client(&connection, request)
  605. .await;
  606. // ev1
  607. assert_eq!(
  608. "9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42",
  609. connection
  610. .try_recv()
  611. .expect("valid")
  612. .as_event()
  613. .expect("event")
  614. .id
  615. .to_string()
  616. );
  617. // ev3
  618. assert_eq!(
  619. "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9",
  620. connection
  621. .try_recv()
  622. .expect("valid")
  623. .as_event()
  624. .expect("event")
  625. .id
  626. .to_string()
  627. );
  628. // ev2
  629. assert_eq!(
  630. "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
  631. connection
  632. .try_recv()
  633. .expect("valid")
  634. .as_event()
  635. .expect("event")
  636. .id
  637. .to_string()
  638. );
  639. // eod
  640. assert!(connection
  641. .try_recv()
  642. .expect("valid")
  643. .as_end_of_stored_events()
  644. .is_some());
  645. assert!(connection.try_recv().is_none());
  646. }
  647. #[tokio::test]
  648. async fn server_listener_real_time_single_argument() {
  649. let request: Request = serde_json::from_value(json!(
  650. [
  651. "REQ",
  652. "1298169700973717",
  653. {
  654. "authors": ["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  655. "since":1681939304
  656. },
  657. {
  658. "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  659. "since":1681939304
  660. },
  661. {
  662. "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  663. },
  664. {
  665. "authors":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  666. },
  667. {
  668. "#e":[
  669. "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
  670. "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1"
  671. ],
  672. }
  673. ]))
  674. .expect("valid object");
  675. let (relayer, _stopper) = dummy_server_with_relayer(None).await;
  676. let mut receiver = relayer.create_new_local_connection().await;
  677. let mut publisher = relayer.create_new_local_connection().await;
  678. assert_eq!(relayer.total_subscribers(), 0);
  679. receiver.send(request).await.expect("subscribe");
  680. sleep(Duration::from_millis(10)).await;
  681. assert_eq!(relayer.total_subscribers(), 1);
  682. // eod
  683. assert!(receiver
  684. .try_recv()
  685. .expect("valid")
  686. .as_end_of_stored_events()
  687. .is_some());
  688. // It is empty
  689. assert!(receiver.try_recv().is_none());
  690. publisher.send(get_note()).await.expect("valid send");
  691. sleep(Duration::from_millis(10)).await;
  692. // ok from posting
  693. let msg = publisher.try_recv();
  694. assert!(msg.is_some());
  695. assert_eq!(
  696. msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
  697. "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
  698. );
  699. // It is not empty
  700. let msg = receiver.try_recv();
  701. assert!(msg.is_some());
  702. assert_eq!(
  703. msg.expect("is ok")
  704. .as_event()
  705. .expect("valid")
  706. .subscription_id
  707. .to_string(),
  708. "1298169700973717".to_owned()
  709. );
  710. // it must be deliverd at most once
  711. assert!(receiver.try_recv().is_none());
  712. assert_eq!(relayer.total_subscribers(), 1);
  713. // when client is dropped, the subscription is removed
  714. // automatically
  715. drop(receiver);
  716. sleep(Duration::from_millis(10)).await;
  717. assert_eq!(relayer.total_subscribers(), 0);
  718. }
  719. #[tokio::test]
  720. async fn server_listener_real_time() {
  721. let request: Request = serde_json::from_value(json!(
  722. [
  723. "REQ",
  724. "1298169700973717",
  725. {
  726. "authors": ["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  727. "since":1681939304
  728. },
  729. {
  730. "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  731. "kinds":[1,3,6,7,9735],
  732. "since":1681939304
  733. },
  734. {
  735. "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  736. "kinds":[4]
  737. },
  738. {
  739. "authors":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  740. "kinds":[4]
  741. },
  742. {
  743. "#e":[
  744. "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
  745. "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1"
  746. ],
  747. "kinds":[1,6,7,9735]
  748. }
  749. ]))
  750. .expect("valid object");
  751. let (relayer, _stopper) = dummy_server_with_relayer(None).await;
  752. let mut receiver = relayer.create_new_local_connection().await;
  753. let mut publisher = relayer.create_new_local_connection().await;
  754. assert_eq!(relayer.total_subscribers(), 0);
  755. receiver.send(request).await.expect("subscribe");
  756. sleep(Duration::from_millis(10)).await;
  757. assert_eq!(relayer.total_subscribers(), 1);
  758. // eod
  759. assert!(receiver
  760. .try_recv()
  761. .expect("valid")
  762. .as_end_of_stored_events()
  763. .is_some());
  764. // It is empty
  765. assert!(receiver.try_recv().is_none());
  766. publisher.send(get_note()).await.expect("valid send");
  767. sleep(Duration::from_millis(100)).await;
  768. // ok from posting
  769. let msg = publisher.try_recv();
  770. assert!(msg.is_some());
  771. assert_eq!(
  772. msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
  773. "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
  774. );
  775. // It is not empty
  776. let msg = receiver.try_recv();
  777. assert!(msg.is_some());
  778. assert_eq!(
  779. msg.expect("is ok")
  780. .as_event()
  781. .expect("valid")
  782. .subscription_id
  783. .to_string(),
  784. "1298169700973717".to_owned()
  785. );
  786. // it must be deliverd at most once
  787. assert!(receiver.try_recv().is_none());
  788. assert_eq!(relayer.total_subscribers(), 1);
  789. // when client is dropped, the subscription is removed
  790. // automatically
  791. drop(receiver);
  792. sleep(Duration::from_millis(10)).await;
  793. assert_eq!(relayer.total_subscribers(), 0);
  794. }
  795. #[tokio::test]
  796. async fn multiple_subcribers() {
  797. let req1: Request = serde_json::from_value(json!(["REQ", "1298169700973717", {
  798. "authors":["a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb4"],
  799. }]))
  800. .expect("valid object");
  801. let req2: Request = serde_json::from_value(json!(["REQ", "1298169700973717", {
  802. "authors":["a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3"]
  803. }]))
  804. .expect("valid object");
  805. let (relayer, _stopper) = dummy_server_with_relayer(None).await;
  806. let mut publisher = relayer.create_new_local_connection().await;
  807. let mut set1 = join_all(
  808. (0..1000)
  809. .map(|_| relayer.create_new_local_connection())
  810. .collect::<Vec<_>>(),
  811. )
  812. .await;
  813. let mut set2 = join_all(
  814. (0..100)
  815. .map(|_| relayer.create_new_local_connection())
  816. .collect::<Vec<_>>(),
  817. )
  818. .await;
  819. assert_eq!(relayer.total_subscribers(), 0);
  820. join_all(
  821. set1.iter()
  822. .map(|connection| connection.send(req1.clone()))
  823. .collect::<Vec<_>>(),
  824. )
  825. .await
  826. .into_iter()
  827. .collect::<Result<Vec<_>, _>>()
  828. .expect("subscribe all");
  829. join_all(
  830. set2.iter()
  831. .map(|connection| connection.send(req2.clone()))
  832. .collect::<Vec<_>>(),
  833. )
  834. .await
  835. .into_iter()
  836. .collect::<Result<Vec<_>, _>>()
  837. .expect("subscribe all");
  838. sleep(Duration::from_millis(10)).await;
  839. for connection in set1.iter_mut() {
  840. assert!(connection
  841. .try_recv()
  842. .expect("end of stored events")
  843. .as_end_of_stored_events()
  844. .is_some());
  845. }
  846. for connection in set2.iter_mut() {
  847. assert!(connection
  848. .try_recv()
  849. .expect("end of stored events")
  850. .as_end_of_stored_events()
  851. .is_some());
  852. }
  853. assert_eq!(relayer.total_subscribers(), 1100);
  854. publisher.send(get_note()).await.expect("valid send");
  855. sleep(Duration::from_millis(10)).await;
  856. let msg = publisher.try_recv();
  857. assert!(msg.is_some());
  858. assert_eq!(
  859. msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
  860. "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
  861. );
  862. for connection in set1.iter_mut() {
  863. assert!(connection.try_recv().is_none());
  864. }
  865. for connection in set2.iter_mut() {
  866. let msg = connection.try_recv();
  867. assert!(msg.is_some());
  868. let msg = msg.expect("msg");
  869. assert_eq!(
  870. msg.as_event().expect("valid").subscription_id.to_string(),
  871. "1298169700973717".to_owned()
  872. );
  873. assert!(connection.try_recv().is_none());
  874. }
  875. drop(set1);
  876. sleep(Duration::from_millis(10)).await;
  877. assert_eq!(relayer.total_subscribers(), 100);
  878. drop(set2);
  879. sleep(Duration::from_millis(10)).await;
  880. assert_eq!(relayer.total_subscribers(), 0);
  881. drop(relayer);
  882. }
  883. #[tokio::test]
  884. async fn posting_event_replies_ok() {
  885. let relayer =
  886. Arc::new(Relayer::new(Some(get_db(false).await), None).expect("valid relayer"));
  887. let mut connection = relayer.create_new_local_connection().await;
  888. let note = get_note();
  889. let note_id = note.as_event().map(|x| x.id.clone()).unwrap();
  890. relayer
  891. .process_request_from_client(&connection, note)
  892. .await
  893. .expect("process event");
  894. sleep(Duration::from_millis(10)).await;
  895. assert_eq!(
  896. Some(
  897. ROk {
  898. id: note_id.into(),
  899. status: ROkStatus::Ok,
  900. }
  901. .into()
  902. ),
  903. connection.try_recv()
  904. );
  905. }
  906. #[tokio::test]
  907. async fn subscribe_to_all() {
  908. let request: Request =
  909. serde_json::from_value(json!(["REQ", "1298169700973717", {}])).expect("valid object");
  910. let (relayer, _stopper) = dummy_server_with_relayer(None).await;
  911. let mut local_connection_0 = relayer.create_new_local_connection().await;
  912. let mut local_connection_1 = relayer.create_new_local_connection().await;
  913. assert_eq!(relayer.total_subscribers(), 0);
  914. local_connection_1.send(request).await.expect("valid send");
  915. sleep(Duration::from_millis(10)).await;
  916. assert_eq!(relayer.total_subscribers(), 1);
  917. // eod
  918. assert!(local_connection_1
  919. .try_recv()
  920. .expect("valid")
  921. .as_end_of_stored_events()
  922. .is_some());
  923. // It is empty
  924. assert!(local_connection_1.try_recv().is_none());
  925. local_connection_0
  926. .send(get_note())
  927. .await
  928. .expect("valid send");
  929. sleep(Duration::from_millis(10)).await;
  930. // ok from posting
  931. let msg = local_connection_0.try_recv();
  932. assert!(msg.is_some());
  933. assert_eq!(
  934. msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
  935. "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
  936. );
  937. // It is not empty
  938. let msg = local_connection_1.try_recv();
  939. assert!(msg.is_some());
  940. assert_eq!(
  941. msg.expect("is ok")
  942. .as_event()
  943. .expect("valid")
  944. .subscription_id
  945. .to_string(),
  946. "1298169700973717".to_owned()
  947. );
  948. // it must be deliverd at most once
  949. assert!(local_connection_1.try_recv().is_none());
  950. assert_eq!(relayer.total_subscribers(), 1);
  951. // when client is dropped, the subscription is removed
  952. // automatically
  953. drop(local_connection_1);
  954. sleep(Duration::from_millis(10)).await;
  955. assert_eq!(relayer.total_subscribers(), 0);
  956. }
  957. #[tokio::test]
  958. async fn relayer_posts_to_custom_posts_to_all_clients() {
  959. let (relayer1, _) = dummy_server(0, None).await;
  960. let (relayer2, _) = dummy_server(0, None).await;
  961. let (relayer3, _) = dummy_server(0, None).await;
  962. let (pool, _in_scope) =
  963. Pool::new_with_clients(vec![relayer1.clone(), relayer2.clone(), relayer3.clone()])
  964. .expect("valid pool");
  965. let (main_relayer, _) = dummy_server(0, Some(pool)).await;
  966. let (mut reader_client, _reader_client_inscope) =
  967. Pool::new_with_clients(vec![relayer1.clone(), relayer2.clone(), relayer3.clone()])
  968. .expect("valid pool");
  969. let (main_client, _main_client_inscope) =
  970. Pool::new_with_clients(vec![main_relayer]).expect("valid pool");
  971. let _sub = reader_client
  972. .subscribe(Default::default())
  973. .await
  974. .expect("v");
  975. sleep(Duration::from_millis(20)).await;
  976. assert!(reader_client
  977. .try_recv()
  978. .map(|(r, _)| r)
  979. .expect("valid message: step")
  980. .as_end_of_stored_events()
  981. .is_some());
  982. assert!(reader_client.try_recv().is_none());
  983. let account1 = Account::default();
  984. let signed_content = account1
  985. .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
  986. .expect("valid signed content");
  987. // account1 posts a new note into the relayer1, and the main relayer
  988. // should get a copy of it, as well as it is connected to relayer2 and
  989. // relayer1.
  990. main_client.post(signed_content.clone().into()).await;
  991. sleep(Duration::from_millis(10)).await;
  992. let responses = (0..3)
  993. .map(|_| reader_client.try_recv().expect("valid message"))
  994. .filter_map(|(r, url)| {
  995. r.as_event()
  996. .map(|r| (url.port().expect("port"), r.to_owned()))
  997. })
  998. .collect::<HashMap<_, _>>();
  999. assert!(reader_client.try_recv().is_none());
  1000. assert_eq!(responses.len(), 3);
  1001. assert_eq!(
  1002. responses
  1003. .get(&relayer1.port().expect("port"))
  1004. .map(|x| x.id.clone()),
  1005. Some(signed_content.id.clone())
  1006. );
  1007. assert_eq!(
  1008. responses
  1009. .get(&relayer2.port().expect("port"))
  1010. .map(|x| x.id.clone()),
  1011. Some(signed_content.id.clone())
  1012. );
  1013. assert_eq!(
  1014. responses
  1015. .get(&relayer3.port().expect("port"))
  1016. .map(|x| x.id.clone()),
  1017. Some(signed_content.id)
  1018. );
  1019. }
  1020. #[tokio::test]
  1021. async fn relayer_with_client_pool() {
  1022. let (relayer1, _) = dummy_server(0, None).await;
  1023. let (relayer2, _) = dummy_server(0, None).await;
  1024. let (pool, _in_scope) =
  1025. Pool::new_with_clients(vec![relayer1.clone(), relayer2]).expect("valid pool");
  1026. let (main_relayer, _) = dummy_server(0, Some(pool)).await;
  1027. let (secondary_client, _sc) = Pool::new_with_clients(vec![relayer1]).expect("valid client");
  1028. // Create a subscription in the main relayer, main_client is only
  1029. // connected to the main relayer
  1030. let (mut main_client, _in_scope) =
  1031. Pool::new_with_clients(vec![main_relayer]).expect("valid client");
  1032. let _sub = main_client.subscribe(Default::default()).await.expect("v");
  1033. sleep(Duration::from_millis(10)).await;
  1034. assert!(main_client
  1035. .try_recv()
  1036. .map(|(r, _)| r)
  1037. .expect("valid message")
  1038. .as_end_of_stored_events()
  1039. .is_some());
  1040. assert!(main_client.try_recv().is_none());
  1041. let account1 = Account::default();
  1042. let signed_content = account1
  1043. .sign_content(vec![], Content::ShortTextNote("test 01".to_owned()), None)
  1044. .expect("valid signed content");
  1045. // account1 posts a new note into the relayer1, and the main relayer
  1046. // should get a copy of it, as well as it is connected to relayer2 and
  1047. // relayer1.
  1048. secondary_client.post(signed_content.clone().into()).await;
  1049. // wait for the note to be delivered
  1050. sleep(Duration::from_millis(10)).await;
  1051. assert_eq!(
  1052. Some((signed_content.id, signed_content.signature)),
  1053. main_client
  1054. .try_recv()
  1055. .and_then(|(r, _)| r.as_event().cloned())
  1056. .map(|x| (x.id.clone(), x.signature.clone()))
  1057. );
  1058. assert!(main_client.try_recv().is_none());
  1059. }
  1060. }