relayer.rs 41 KB


  1. use crate::{
  2. connection::{ConnectionId, LocalConnection},
  3. Connection, Error,
  4. };
  5. use futures_util::StreamExt;
  6. use nostr_rs_client::{pool::subscription::PoolSubscriptionId, Pool, Url};
  7. use nostr_rs_storage_base::Storage;
  8. use nostr_rs_types::{
  9. relayer::{self, ROk, ROkStatus},
  10. types::{Addr, Event, SubscriptionId},
  11. Request, Response,
  12. };
  13. use std::{
  14. collections::{HashMap, HashSet},
  15. ops::Deref,
  16. sync::Arc,
  17. time::Instant,
  18. };
  19. use tokio::{
  20. net::{TcpListener, TcpStream},
  21. sync::{
  22. mpsc::{
  23. self, {channel, Receiver, Sender},
  24. },
  25. RwLock,
  26. },
  27. task::JoinHandle,
  28. };
  29. #[derive(Debug, Hash, Ord, PartialEq, PartialOrd, Eq, Clone)]
  30. pub struct RelayerSubscriptionId((SubscriptionId, ConnectionId));
  31. impl From<(SubscriptionId, ConnectionId)> for RelayerSubscriptionId {
  32. fn from(value: (SubscriptionId, ConnectionId)) -> Self {
  33. Self(value)
  34. }
  35. }
  36. impl Default for RelayerSubscriptionId {
  37. fn default() -> Self {
  38. Self((SubscriptionId::empty(), ConnectionId::new_empty()))
  39. }
  40. }
  41. type Connections = Arc<RwLock<HashMap<ConnectionId, Connection>>>;
  42. type SubscriptionManager =
  43. Arc<nostr_rs_subscription_manager::SubscriptionManager<RelayerSubscriptionId, ()>>;
  44. type ClientPoolSubscriptions =
  45. Arc<RwLock<HashMap<PoolSubscriptionId, (SubscriptionId, ConnectionId)>>>;
  46. /// Relayer struct
  47. ///
  48. pub struct Relayer<T: Storage + Send + Sync + 'static> {
  49. /// Storage engine, if provided the services are going to persisted in disk,
  50. /// otherwise all the messages are going to be ephemeral, making this
  51. /// relayer just a dumb proxy (that can be useful for privacy) but it won't
  52. /// be able to perform any optimization like prefetching content while offline
  53. storage: Arc<Option<T>>,
  54. /// Subscription manager
  55. subscription_manager: SubscriptionManager,
  56. /// List of all active connections
  57. connections: Connections,
  58. /// This Sender can be used to send requests from anywhere to the relayer.
  59. send_to_relayer: Sender<(ConnectionId, Request)>,
  60. /// This Receiver is the relayer the way the relayer receives messages
  61. relayer_receiver: Option<Receiver<(ConnectionId, Request)>>,
  62. /// Client pool
  63. ///
  64. /// A relayer can optionally be connected to a pool of clients to get
  65. /// foreign events.
  66. client_pool: Option<Arc<Pool>>,
  67. client_pool_receiver: Option<Receiver<(Response, Url)>>,
  68. client_pool_subscriptions: ClientPoolSubscriptions,
  69. }
  70. impl<T: Storage + Send + Sync + 'static> Relayer<T> {
  71. /// Creates a new relayer instance
  72. ///
  73. /// If the storage is given, it will be used to persist events, as well to
  74. /// server past events when a new subscription is added.
  75. ///
  76. /// If the client_pool is given it will be used to connect to those relayers
  77. /// and create a network of relayers, reposting events to them and
  78. /// subscribing to their events.`gqq`
  79. pub fn new(storage: Option<T>, client_pool: Option<Pool>) -> Result<Self, Error> {
  80. let (relayer_sender, relayer_receiver) = channel(100_000);
  81. let (client_pool_receiver, client_pool) = if let Some(client_pool) = client_pool {
  82. let result = client_pool.split()?;
  83. (result.0, Some(Arc::new(result.1)))
  84. } else {
  85. let (_, receiver) = mpsc::channel(1);
  86. (receiver, None)
  87. };
  88. Ok(Self {
  89. storage: Arc::new(storage),
  90. subscription_manager: Default::default(),
  91. send_to_relayer: relayer_sender,
  92. relayer_receiver: Some(relayer_receiver),
  93. connections: Default::default(),
  94. client_pool_receiver: Some(client_pool_receiver),
  95. client_pool,
  96. client_pool_subscriptions: Default::default(),
  97. })
  98. }
  99. /// Connects to the relayer pool
  100. pub async fn connect_to_relayer(&self, url: Url) -> Result<(), Error> {
  101. let _ = self
  102. .client_pool
  103. .as_ref()
  104. .ok_or(Error::NoClient)?
  105. .connect_to(url)
  106. .await?;
  107. Ok(())
  108. }
  109. /// Total number of subscribers requests that actively listening for new events
  110. pub fn total_subscribers(&self) -> usize {
  111. self.subscription_manager.total_subscribers()
  112. }
  113. /// Splits the relayer object and extract their receiver.
  114. pub fn split(mut self) -> Result<(Self, Receiver<(ConnectionId, Request)>), Error> {
  115. let receiver = self.relayer_receiver.take().ok_or(Error::AlreadySplitted)?;
  116. Ok((self, receiver))
  117. }
  118. /// Runs the relayer main loop in a tokio task and returns it.
  119. ///
  120. /// This function consumes the object and takes the ownership. The returned
  121. /// JoinHandle() can be used to stop the main loop
  122. pub fn main(mut self, server: TcpListener) -> Result<(Arc<Self>, JoinHandle<()>), Error> {
  123. let mut client_pool_receiver = self
  124. .client_pool_receiver
  125. .take()
  126. .ok_or(Error::AlreadySplitted)?;
  127. let (this, mut receiver) = self.split()?;
  128. let _self = Arc::new(this);
  129. let this = _self.clone();
  130. let handle = tokio::spawn(async move {
  131. loop {
  132. let start = Instant::now();
  133. println!("{}", client_pool_receiver.len());
  134. tokio::select! {
  135. Ok((stream, _)) = server.accept() => {
  136. // accept new connections
  137. let _ = this.add_connection(None, stream).await;
  138. },
  139. Some((response, _)) = client_pool_receiver.recv() => {
  140. // process messages from anothe relayer, broadcast it and store it
  141. match response {
  142. Response::Event(event) => {
  143. // we received a message from the client pool, store it locally
  144. // and re-broadcast it.
  145. tokio::spawn(Self::broadcast(
  146. this.storage.clone(),
  147. this.subscription_manager.clone(),
  148. this.connections.clone(),
  149. event.event
  150. ));
  151. }
  152. Response::EndOfStoredEvents(sub) => {
  153. let connections = this.connections.read().await;
  154. let (sub_id, connection) = if let Some((sub_id, conn_id)) = this.client_pool_subscriptions.write().await.remove(&(sub.deref().into())) {
  155. if let Some(connection) = connections.get(&conn_id) {
  156. (sub_id, connection)
  157. } else {
  158. continue;
  159. }
  160. } else {
  161. continue
  162. };
  163. let _ = connection.respond(Response::EndOfStoredEvents(sub_id.into()));
  164. let duration = start.elapsed();
  165. println!("xTime elapsed: {} ms", duration.as_millis());
  166. }
  167. _ => {}
  168. }
  169. }
  170. Some((conn_id, request)) = receiver.recv() => {
  171. tokio::spawn(Self::process_request(
  172. this.storage.clone(),
  173. this.client_pool.clone(),
  174. this.client_pool_subscriptions.clone(),
  175. this.subscription_manager.clone(),
  176. this.connections.clone(),
  177. conn_id,
  178. request.clone()
  179. ));
  180. }
  181. else => {
  182. }
  183. }
  184. }
  185. });
  186. Ok((_self, handle))
  187. }
  188. /// Returns a reference to the internal database
  189. pub fn get_db(&self) -> &Option<T> {
  190. &self.storage
  191. }
  192. /// Adds a new local connection to the list of active connections.
  193. pub async fn create_new_local_connection(self: &Arc<Self>) -> LocalConnection<T> {
  194. let (conn, receiver) = Connection::new_local_connection();
  195. let conn_id = conn.get_conn_id();
  196. self.connections.write().await.insert(conn_id, conn);
  197. (
  198. conn_id,
  199. receiver,
  200. self.send_to_relayer.clone(),
  201. self.clone(),
  202. )
  203. .into()
  204. }
  205. /// Drops a connection from the list of active connections
  206. ///
  207. /// This function only works for local connections, normal connections can
  208. /// be dropped on disconnection.
  209. ///
  210. /// This function could change in the future tu kick connections programmatically
  211. pub fn drop_connection(self: &Arc<Self>, local_connection: &LocalConnection<T>) {
  212. let id = local_connection.conn_id;
  213. let this = self.clone();
  214. tokio::spawn(async move {
  215. this.connections.write().await.remove(&id);
  216. });
  217. }
  218. /// Adds a new TpStream and adds it to the list of active connections.
  219. ///
  220. /// This function will spawn the client's loop to receive incoming messages and send those messages
  221. pub async fn add_connection(
  222. &self,
  223. disconnection_notify: Option<mpsc::Sender<ConnectionId>>,
  224. stream: TcpStream,
  225. ) -> Result<ConnectionId, Error> {
  226. let conn =
  227. Connection::new_connection(self.send_to_relayer.clone(), disconnection_notify, stream)
  228. .await?;
  229. let id = conn.get_conn_id();
  230. self.connections.write().await.insert(id, conn);
  231. Ok(id)
  232. }
  233. #[cfg(test)]
  234. async fn process_request_from_client(
  235. &self,
  236. connection: &LocalConnection<T>,
  237. request: Request,
  238. ) -> Result<(), Error> {
  239. Self::process_request(
  240. self.storage.clone(),
  241. self.client_pool.clone(),
  242. self.client_pool_subscriptions.clone(),
  243. self.subscription_manager.clone(),
  244. self.connections.clone(),
  245. connection.conn_id,
  246. request,
  247. )
  248. .await
  249. }
  250. /// Process a request from a connected client
  251. async fn process_request(
  252. storage: Arc<Option<T>>,
  253. client_pool: Option<Arc<Pool>>,
  254. client_pool_subscriptions: ClientPoolSubscriptions,
  255. subscription_manager: SubscriptionManager,
  256. connections: Connections,
  257. connection_id: ConnectionId,
  258. request: Request,
  259. ) -> Result<(), Error> {
  260. match request {
  261. Request::Event(event) => {
  262. let read_connections = connections.read().await;
  263. let connection = read_connections
  264. .get(&connection_id)
  265. .ok_or(Error::UnknownConnection(connection_id))?;
  266. let event_id: Addr = event.id.clone().into();
  267. if !Self::broadcast(
  268. storage.clone(),
  269. subscription_manager.clone(),
  270. connections.clone(),
  271. event.deref().clone(),
  272. )
  273. .await?
  274. {
  275. connection.respond(
  276. ROk {
  277. id: event_id,
  278. status: ROkStatus::Duplicate,
  279. }
  280. .into(),
  281. )?;
  282. return Ok(());
  283. }
  284. if let Some(storage) = storage.as_ref() {
  285. let _ = storage.store_local_event(&event).await;
  286. }
  287. if let Some(client_pool) = client_pool.as_ref() {
  288. // pass the event to the pool of clients, so this relayer can relay
  289. // their local events to the clients in the network of relayers
  290. let _ = client_pool.post(event).await;
  291. }
  292. connection.respond(
  293. ROk {
  294. id: event_id,
  295. status: ROkStatus::Ok,
  296. }
  297. .into(),
  298. )?;
  299. }
  300. Request::Request(request) => {
  301. let foreign_subscription = if let Some(client_pool) = client_pool.as_ref() {
  302. // If this relay is connected to other relays through the
  303. // client pool, create the same subscription in them as
  304. // well, with the main goal of fetching any foreign event
  305. // that matches the requested subscription.
  306. //
  307. // If the this happens, this relay will serve any local
  308. // event that matches, as well any foreign event. Foreign
  309. // events will be stored locally as well if there is a
  310. // storage setup.
  311. let foreign_sub_id = client_pool
  312. .subscribe(request.filters.clone().into())
  313. .await?;
  314. client_pool_subscriptions.write().await.insert(
  315. foreign_sub_id.clone(),
  316. (request.subscription_id.clone(), connection_id),
  317. );
  318. Some(foreign_sub_id)
  319. } else {
  320. None
  321. };
  322. let read_connections = connections.read().await;
  323. let connection = read_connections
  324. .get(&connection_id)
  325. .ok_or(Error::UnknownConnection(connection_id))?;
  326. if let Some(storage) = storage.as_ref() {
  327. let mut sent = HashSet::new();
  328. // Sent all events that match the filter that are stored in our database
  329. for filter in request.filters.clone().into_iter() {
  330. let mut result = storage.get_by_filter(filter).await?;
  331. while let Some(Ok(event)) = result.next().await {
  332. if sent.contains(&event.id) {
  333. continue;
  334. }
  335. sent.insert(event.id.clone());
  336. let _ = connection.respond(
  337. relayer::Event {
  338. subscription_id: request.subscription_id.clone(),
  339. event,
  340. }
  341. .into(),
  342. );
  343. }
  344. }
  345. }
  346. if foreign_subscription.is_none() {
  347. // If there is a foreign subscription, we shouldn't send a
  348. // EOS until we have got EOS from all foreign relays
  349. let _ = connection.respond(
  350. relayer::EndOfStoredEvents(request.subscription_id.clone()).into(),
  351. );
  352. }
  353. connection
  354. .subscribe(
  355. request.subscription_id.clone(),
  356. (
  357. foreign_subscription,
  358. subscription_manager
  359. .subscribe(
  360. (request.subscription_id, connection.get_conn_id()).into(),
  361. request.filters,
  362. (),
  363. )
  364. .await,
  365. ),
  366. )
  367. .await;
  368. }
  369. Request::Close(close) => {
  370. connections
  371. .read()
  372. .await
  373. .get(&connection_id)
  374. .ok_or(Error::UnknownConnection(connection_id))?
  375. .unsubscribe(&close)
  376. .await;
  377. }
  378. };
  379. Ok(())
  380. }
  381. #[inline]
  382. /// A non-blocking version of broadcast
  383. pub async fn broadcast(
  384. storage: Arc<Option<T>>,
  385. subscription_manager: SubscriptionManager,
  386. connections: Connections,
  387. event: Event,
  388. ) -> Result<bool, Error> {
  389. if let Some(storage) = storage.as_ref() {
  390. if !storage.store(&event).await? {
  391. return Ok(false);
  392. }
  393. }
  394. let connections = connections.read().await;
  395. for RelayerSubscriptionId((sub_id, conn_id)) in
  396. subscription_manager.get_subscribers(&event).await
  397. {
  398. if let Some(connection) = connections.get(&conn_id) {
  399. let _ = connection.respond(
  400. relayer::Event {
  401. subscription_id: sub_id,
  402. event: event.clone(),
  403. }
  404. .into(),
  405. );
  406. }
  407. }
  408. Ok(true)
  409. }
  410. }
  411. #[cfg(test)]
  412. mod test {
  413. use super::*;
  414. use futures::future::join_all;
  415. use nostr_rs_client::Url;
  416. use nostr_rs_memory::Memory;
  417. use nostr_rs_types::{
  418. account::Account,
  419. types::{Content, Tag},
  420. Request,
  421. };
  422. use serde_json::json;
  423. use std::time::Duration;
  424. use tokio::time::sleep;
  425. async fn dummy_server(port: u16, client_pool: Option<Pool>) -> (Url, JoinHandle<()>) {
  426. let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
  427. .await
  428. .unwrap();
  429. let local_addr = listener.local_addr().expect("addr");
  430. let relayer =
  431. Relayer::new(Some(Memory::default()), client_pool).expect("valid dummy server");
  432. let (_, stopper) = relayer.main(listener).expect("valid main loop");
  433. (
  434. Url::parse(&format!("ws://{}", local_addr)).expect("valid url"),
  435. stopper,
  436. )
  437. }
  438. async fn dummy_server_with_relayer(
  439. client_pool: Option<Pool>,
  440. ) -> (Arc<Relayer<Memory>>, JoinHandle<()>) {
  441. let listener = TcpListener::bind(format!("127.0.0.1:{}", 0)).await.unwrap();
  442. let relayer =
  443. Relayer::new(Some(Memory::default()), client_pool).expect("valid dummy server");
  444. let (relayer, stopper) = relayer.main(listener).expect("valid main loop");
  445. (relayer, stopper)
  446. }
  447. fn get_note_with_custom_tags(tags: serde_json::Value) -> Event {
  448. let account = Account::default();
  449. let content = Content::ShortTextNote("".to_owned());
  450. let tags: Vec<Tag> = serde_json::from_value(tags).expect("valid tags");
  451. account.sign_content(tags, content, None).expect("valid")
  452. }
  453. fn get_note() -> Request {
  454. serde_json::from_value(json!(
  455. [
  456. "EVENT",
  457. {
  458. "kind":1,
  459. "content":"Pong",
  460. "tags":[
  461. ["e","9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42","","root"],
  462. ["e","2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a","","reply"],
  463. ["p","39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  464. ["p","ee7202ad91459e013bfef263c59e47deb0163a5e7651b026673765488bfaf102"]
  465. ],
  466. "created_at":1681938616,
  467. "pubkey":"a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3",
  468. "id":"e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9",
  469. "sig":"9036150a6c8a32933cffcc42aec4d2109a22e9f10d1c3860c0435a925e6386babb7df5c95fcf68c8ed6a9726a1f07225af663d0b068eb555014130aad21674fc",
  470. }
  471. ])).expect("value")
  472. }
  473. async fn get_db(prefill: bool) -> Memory {
  474. let db = Memory::default();
  475. if prefill {
  476. let events = include_str!("../tests/events.json")
  477. .lines()
  478. .map(|line| serde_json::from_str(line).expect("valid"))
  479. .collect::<Vec<Event>>();
  480. for event in events {
  481. assert!(db.store(&event).await.expect("valid"));
  482. }
  483. while db.is_flushing() {
  484. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
  485. }
  486. }
  487. db
  488. }
  489. #[tokio::test]
  490. async fn serve_listener_from_local_db_custom_tag() {
  491. let request = serde_json::from_value(json!([
  492. "REQ",
  493. "1298169700973717",
  494. {
  495. "#f": [
  496. "foo",
  497. ],
  498. },
  499. ]))
  500. .expect("valid object");
  501. let relayer =
  502. Arc::new(Relayer::new(Some(get_db(true).await), None).expect("valid relayer"));
  503. let mut connection = relayer.create_new_local_connection().await;
  504. let note = get_note_with_custom_tags(json!([["f", "foo"]]));
  505. let _ = relayer
  506. .process_request_from_client(&connection, note.clone().into())
  507. .await;
  508. sleep(Duration::from_millis(10)).await;
  509. let _ = relayer
  510. .process_request_from_client(&connection, request)
  511. .await;
  512. // ev1
  513. assert_eq!(
  514. ROkStatus::Ok,
  515. connection
  516. .try_recv()
  517. .expect("valid")
  518. .as_ok()
  519. .cloned()
  520. .unwrap()
  521. .status,
  522. );
  523. // ev1
  524. assert_eq!(
  525. note,
  526. connection
  527. .try_recv()
  528. .expect("valid")
  529. .as_event()
  530. .unwrap()
  531. .event
  532. );
  533. // eod
  534. assert!(connection
  535. .try_recv()
  536. .expect("valid")
  537. .as_end_of_stored_events()
  538. .is_some());
  539. assert!(connection.try_recv().is_none());
  540. }
  541. #[tokio::test]
  542. async fn serve_listener_from_local_db() {
  543. let request = serde_json::from_value(json!([
  544. "REQ",
  545. "1298169700973717",
  546. {
  547. "authors": [
  548. "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
  549. ],
  550. "until": 1681928304
  551. },
  552. {
  553. "#p": [
  554. "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
  555. ],
  556. "kinds": [
  557. 1,
  558. 3,
  559. 6,
  560. 7,
  561. 9735
  562. ],
  563. "until": 1681928304
  564. },
  565. {
  566. "#p": [
  567. "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
  568. ],
  569. "kinds": [
  570. 4
  571. ]
  572. },
  573. {
  574. "authors": [
  575. "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
  576. ],
  577. "kinds": [
  578. 4
  579. ]
  580. },
  581. {
  582. "#e": [
  583. "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
  584. "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1"
  585. ],
  586. "kinds": [
  587. 1,
  588. 6,
  589. 7,
  590. 9735
  591. ]
  592. }
  593. ]))
  594. .expect("valid object");
  595. let relayer =
  596. Arc::new(Relayer::new(Some(get_db(true).await), None).expect("valid relayer"));
  597. let mut connection = relayer.create_new_local_connection().await;
  598. let _ = relayer
  599. .process_request_from_client(&connection, request)
  600. .await;
  601. // ev1
  602. assert_eq!(
  603. "9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42",
  604. connection
  605. .try_recv()
  606. .expect("valid")
  607. .as_event()
  608. .expect("event")
  609. .id
  610. .to_string()
  611. );
  612. // ev3
  613. assert_eq!(
  614. "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9",
  615. connection
  616. .try_recv()
  617. .expect("valid")
  618. .as_event()
  619. .expect("event")
  620. .id
  621. .to_string()
  622. );
  623. // ev2
  624. assert_eq!(
  625. "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
  626. connection
  627. .try_recv()
  628. .expect("valid")
  629. .as_event()
  630. .expect("event")
  631. .id
  632. .to_string()
  633. );
  634. // eod
  635. assert!(connection
  636. .try_recv()
  637. .expect("valid")
  638. .as_end_of_stored_events()
  639. .is_some());
  640. assert!(connection.try_recv().is_none());
  641. }
  642. #[tokio::test]
  643. async fn server_listener_real_time_single_argument() {
  644. let request: Request = serde_json::from_value(json!(
  645. [
  646. "REQ",
  647. "1298169700973717",
  648. {
  649. "authors": ["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  650. "since":1681939304
  651. },
  652. {
  653. "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  654. "since":1681939304
  655. },
  656. {
  657. "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  658. },
  659. {
  660. "authors":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  661. },
  662. {
  663. "#e":[
  664. "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
  665. "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1"
  666. ],
  667. }
  668. ]))
  669. .expect("valid object");
  670. let (relayer, _stopper) = dummy_server_with_relayer(None).await;
  671. let mut receiver = relayer.create_new_local_connection().await;
  672. let mut publisher = relayer.create_new_local_connection().await;
  673. assert_eq!(relayer.total_subscribers(), 0);
  674. receiver.send(request).await.expect("subscribe");
  675. sleep(Duration::from_millis(10)).await;
  676. assert_eq!(relayer.total_subscribers(), 1);
  677. // eod
  678. assert!(receiver
  679. .try_recv()
  680. .expect("valid")
  681. .as_end_of_stored_events()
  682. .is_some());
  683. // It is empty
  684. assert!(receiver.try_recv().is_none());
  685. publisher.send(get_note()).await.expect("valid send");
  686. sleep(Duration::from_millis(10)).await;
  687. // ok from posting
  688. let msg = publisher.try_recv();
  689. assert!(msg.is_some());
  690. assert_eq!(
  691. msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
  692. "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
  693. );
  694. // It is not empty
  695. let msg = receiver.try_recv();
  696. assert!(msg.is_some());
  697. assert_eq!(
  698. msg.expect("is ok")
  699. .as_event()
  700. .expect("valid")
  701. .subscription_id
  702. .to_string(),
  703. "1298169700973717".to_owned()
  704. );
  705. // it must be deliverd at most once
  706. assert!(receiver.try_recv().is_none());
  707. assert_eq!(relayer.total_subscribers(), 1);
  708. // when client is dropped, the subscription is removed
  709. // automatically
  710. drop(receiver);
  711. sleep(Duration::from_millis(10)).await;
  712. assert_eq!(relayer.total_subscribers(), 0);
  713. }
  714. #[tokio::test]
  715. async fn server_listener_real_time() {
  716. let request: Request = serde_json::from_value(json!(
  717. [
  718. "REQ",
  719. "1298169700973717",
  720. {
  721. "authors": ["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  722. "since":1681939304
  723. },
  724. {
  725. "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  726. "kinds":[1,3,6,7,9735],
  727. "since":1681939304
  728. },
  729. {
  730. "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  731. "kinds":[4]
  732. },
  733. {
  734. "authors":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
  735. "kinds":[4]
  736. },
  737. {
  738. "#e":[
  739. "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
  740. "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1"
  741. ],
  742. "kinds":[1,6,7,9735]
  743. }
  744. ]))
  745. .expect("valid object");
  746. let (relayer, _stopper) = dummy_server_with_relayer(None).await;
  747. let mut receiver = relayer.create_new_local_connection().await;
  748. let mut publisher = relayer.create_new_local_connection().await;
  749. assert_eq!(relayer.total_subscribers(), 0);
  750. receiver.send(request).await.expect("subscribe");
  751. sleep(Duration::from_millis(10)).await;
  752. assert_eq!(relayer.total_subscribers(), 1);
  753. // eod
  754. assert!(receiver
  755. .try_recv()
  756. .expect("valid")
  757. .as_end_of_stored_events()
  758. .is_some());
  759. // It is empty
  760. assert!(receiver.try_recv().is_none());
  761. publisher.send(get_note()).await.expect("valid send");
  762. sleep(Duration::from_millis(100)).await;
  763. // ok from posting
  764. let msg = publisher.try_recv();
  765. assert!(msg.is_some());
  766. assert_eq!(
  767. msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
  768. "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
  769. );
  770. // It is not empty
  771. let msg = receiver.try_recv();
  772. assert!(msg.is_some());
  773. assert_eq!(
  774. msg.expect("is ok")
  775. .as_event()
  776. .expect("valid")
  777. .subscription_id
  778. .to_string(),
  779. "1298169700973717".to_owned()
  780. );
  781. // it must be deliverd at most once
  782. assert!(receiver.try_recv().is_none());
  783. assert_eq!(relayer.total_subscribers(), 1);
  784. // when client is dropped, the subscription is removed
  785. // automatically
  786. drop(receiver);
  787. sleep(Duration::from_millis(10)).await;
  788. assert_eq!(relayer.total_subscribers(), 0);
  789. }
  790. #[tokio::test]
  791. async fn multiple_subcribers() {
  792. let req1: Request = serde_json::from_value(json!(["REQ", "1298169700973717", {
  793. "authors":["a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb4"],
  794. }]))
  795. .expect("valid object");
  796. let req2: Request = serde_json::from_value(json!(["REQ", "1298169700973717", {
  797. "authors":["a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3"]
  798. }]))
  799. .expect("valid object");
  800. let (relayer, _stopper) = dummy_server_with_relayer(None).await;
  801. let mut publisher = relayer.create_new_local_connection().await;
  802. let mut set1 = join_all(
  803. (0..1000)
  804. .map(|_| relayer.create_new_local_connection())
  805. .collect::<Vec<_>>(),
  806. )
  807. .await;
  808. let mut set2 = join_all(
  809. (0..100)
  810. .map(|_| relayer.create_new_local_connection())
  811. .collect::<Vec<_>>(),
  812. )
  813. .await;
  814. assert_eq!(relayer.total_subscribers(), 0);
  815. join_all(
  816. set1.iter()
  817. .map(|connection| connection.send(req1.clone()))
  818. .collect::<Vec<_>>(),
  819. )
  820. .await
  821. .into_iter()
  822. .collect::<Result<Vec<_>, _>>()
  823. .expect("subscribe all");
  824. join_all(
  825. set2.iter()
  826. .map(|connection| connection.send(req2.clone()))
  827. .collect::<Vec<_>>(),
  828. )
  829. .await
  830. .into_iter()
  831. .collect::<Result<Vec<_>, _>>()
  832. .expect("subscribe all");
  833. sleep(Duration::from_millis(10)).await;
  834. for connection in set1.iter_mut() {
  835. assert!(connection
  836. .try_recv()
  837. .expect("end of stored events")
  838. .as_end_of_stored_events()
  839. .is_some());
  840. }
  841. for connection in set2.iter_mut() {
  842. assert!(connection
  843. .try_recv()
  844. .expect("end of stored events")
  845. .as_end_of_stored_events()
  846. .is_some());
  847. }
  848. assert_eq!(relayer.total_subscribers(), 1100);
  849. publisher.send(get_note()).await.expect("valid send");
  850. sleep(Duration::from_millis(10)).await;
  851. let msg = publisher.try_recv();
  852. assert!(msg.is_some());
  853. assert_eq!(
  854. msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
  855. "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
  856. );
  857. for connection in set1.iter_mut() {
  858. assert!(connection.try_recv().is_none());
  859. }
  860. for connection in set2.iter_mut() {
  861. let msg = connection.try_recv();
  862. assert!(msg.is_some());
  863. let msg = msg.expect("msg");
  864. assert_eq!(
  865. msg.as_event().expect("valid").subscription_id.to_string(),
  866. "1298169700973717".to_owned()
  867. );
  868. assert!(connection.try_recv().is_none());
  869. }
  870. drop(set1);
  871. sleep(Duration::from_millis(10)).await;
  872. assert_eq!(relayer.total_subscribers(), 100);
  873. drop(set2);
  874. sleep(Duration::from_millis(10)).await;
  875. assert_eq!(relayer.total_subscribers(), 0);
  876. drop(relayer);
  877. }
  878. #[tokio::test]
  879. async fn posting_event_replies_ok() {
  880. let relayer =
  881. Arc::new(Relayer::new(Some(get_db(false).await), None).expect("valid relayer"));
  882. let mut connection = relayer.create_new_local_connection().await;
  883. let note = get_note();
  884. let note_id = note.as_event().map(|x| x.id.clone()).unwrap();
  885. relayer
  886. .process_request_from_client(&connection, note)
  887. .await
  888. .expect("process event");
  889. sleep(Duration::from_millis(10)).await;
  890. assert_eq!(
  891. Some(
  892. ROk {
  893. id: note_id.into(),
  894. status: ROkStatus::Ok,
  895. }
  896. .into()
  897. ),
  898. connection.try_recv()
  899. );
  900. }
  901. #[tokio::test]
  902. async fn subscribe_to_all() {
  903. let request: Request =
  904. serde_json::from_value(json!(["REQ", "1298169700973717", {}])).expect("valid object");
  905. let (relayer, _stopper) = dummy_server_with_relayer(None).await;
  906. let mut local_connection_0 = relayer.create_new_local_connection().await;
  907. let mut local_connection_1 = relayer.create_new_local_connection().await;
  908. assert_eq!(relayer.total_subscribers(), 0);
  909. local_connection_1.send(request).await.expect("valid send");
  910. sleep(Duration::from_millis(10)).await;
  911. assert_eq!(relayer.total_subscribers(), 1);
  912. // eod
  913. assert!(local_connection_1
  914. .try_recv()
  915. .expect("valid")
  916. .as_end_of_stored_events()
  917. .is_some());
  918. // It is empty
  919. assert!(local_connection_1.try_recv().is_none());
  920. local_connection_0
  921. .send(get_note())
  922. .await
  923. .expect("valid send");
  924. sleep(Duration::from_millis(10)).await;
  925. // ok from posting
  926. let msg = local_connection_0.try_recv();
  927. assert!(msg.is_some());
  928. assert_eq!(
  929. msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
  930. "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
  931. );
  932. // It is not empty
  933. let msg = local_connection_1.try_recv();
  934. assert!(msg.is_some());
  935. assert_eq!(
  936. msg.expect("is ok")
  937. .as_event()
  938. .expect("valid")
  939. .subscription_id
  940. .to_string(),
  941. "1298169700973717".to_owned()
  942. );
  943. // it must be deliverd at most once
  944. assert!(local_connection_1.try_recv().is_none());
  945. assert_eq!(relayer.total_subscribers(), 1);
  946. // when client is dropped, the subscription is removed
  947. // automatically
  948. drop(local_connection_1);
  949. sleep(Duration::from_millis(10)).await;
  950. assert_eq!(relayer.total_subscribers(), 0);
  951. }
  952. #[tokio::test]
  953. async fn relayer_posts_to_custom_posts_to_all_clients() {
  954. let (relayer1, _) = dummy_server(0, None).await;
  955. let (relayer2, _) = dummy_server(0, None).await;
  956. let (relayer3, _) = dummy_server(0, None).await;
  957. let (pool, _in_scope) =
  958. Pool::new_with_clients(vec![relayer1.clone(), relayer2.clone(), relayer3.clone()])
  959. .expect("valid pool");
  960. let (main_relayer, _) = dummy_server(0, Some(pool)).await;
  961. let (mut reader_client, _reader_client_inscope) =
  962. Pool::new_with_clients(vec![relayer1.clone(), relayer2.clone(), relayer3.clone()])
  963. .expect("valid pool");
  964. let (main_client, _main_client_inscope) =
  965. Pool::new_with_clients(vec![main_relayer]).expect("valid pool");
  966. let _sub = reader_client
  967. .subscribe(Default::default())
  968. .await
  969. .expect("v");
  970. sleep(Duration::from_millis(20)).await;
  971. assert!(reader_client
  972. .try_recv()
  973. .map(|(r, _)| r)
  974. .expect("valid message: step")
  975. .as_end_of_stored_events()
  976. .is_some());
  977. assert!(reader_client.try_recv().is_none());
  978. let account1 = Account::default();
  979. let signed_content = account1
  980. .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
  981. .expect("valid signed content");
  982. // account1 posts a new note into the relayer1, and the main relayer
  983. // should get a copy of it, as well as it is connected to relayer2 and
  984. // relayer1.
  985. main_client.post(signed_content.clone().into()).await;
  986. sleep(Duration::from_millis(10)).await;
  987. let responses = (0..3)
  988. .map(|_| reader_client.try_recv().expect("valid message"))
  989. .filter_map(|(r, url)| {
  990. r.as_event()
  991. .map(|r| (url.port().expect("port"), r.to_owned()))
  992. })
  993. .collect::<HashMap<_, _>>();
  994. assert!(reader_client.try_recv().is_none());
  995. assert_eq!(responses.len(), 3);
  996. assert_eq!(
  997. responses
  998. .get(&relayer1.port().expect("port"))
  999. .map(|x| x.id.clone()),
  1000. Some(signed_content.id.clone())
  1001. );
  1002. assert_eq!(
  1003. responses
  1004. .get(&relayer2.port().expect("port"))
  1005. .map(|x| x.id.clone()),
  1006. Some(signed_content.id.clone())
  1007. );
  1008. assert_eq!(
  1009. responses
  1010. .get(&relayer3.port().expect("port"))
  1011. .map(|x| x.id.clone()),
  1012. Some(signed_content.id)
  1013. );
  1014. }
  1015. #[tokio::test]
  1016. async fn relayer_with_client_pool() {
  1017. let (relayer1, _) = dummy_server(0, None).await;
  1018. let (relayer2, _) = dummy_server(0, None).await;
  1019. let (pool, _in_scope) =
  1020. Pool::new_with_clients(vec![relayer1.clone(), relayer2]).expect("valid pool");
  1021. let (main_relayer, _) = dummy_server(0, Some(pool)).await;
  1022. let (secondary_client, _sc) = Pool::new_with_clients(vec![relayer1]).expect("valid client");
  1023. // Create a subscription in the main relayer, main_client is only
  1024. // connected to the main relayer
  1025. let (mut main_client, _in_scope) =
  1026. Pool::new_with_clients(vec![main_relayer]).expect("valid client");
  1027. let _sub = main_client.subscribe(Default::default()).await.expect("v");
  1028. sleep(Duration::from_millis(10)).await;
  1029. assert!(main_client
  1030. .try_recv()
  1031. .map(|(r, _)| r)
  1032. .expect("valid message")
  1033. .as_end_of_stored_events()
  1034. .is_some());
  1035. assert!(main_client.try_recv().is_none());
  1036. let account1 = Account::default();
  1037. let signed_content = account1
  1038. .sign_content(vec![], Content::ShortTextNote("test 01".to_owned()), None)
  1039. .expect("valid signed content");
  1040. // account1 posts a new note into the relayer1, and the main relayer
  1041. // should get a copy of it, as well as it is connected to relayer2 and
  1042. // relayer1.
  1043. secondary_client.post(signed_content.clone().into()).await;
  1044. // wait for the note to be delivered
  1045. sleep(Duration::from_millis(10)).await;
  1046. assert_eq!(
  1047. Some((signed_content.id, signed_content.signature)),
  1048. main_client
  1049. .try_recv()
  1050. .and_then(|(r, _)| r.as_event().cloned())
  1051. .map(|x| (x.id.clone(), x.signature.clone()))
  1052. );
  1053. assert!(main_client.try_recv().is_none());
  1054. }
  1055. }