relayer.rs 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. use crate::{pool::Event, Error};
  2. use futures::Future;
  3. use futures_util::{SinkExt, StreamExt};
  4. use nostr_rs_types::{Request, Response};
  5. use std::{
  6. pin::Pin,
  7. sync::{
  8. atomic::{AtomicBool, Ordering::Relaxed},
  9. Arc,
  10. },
  11. };
  12. use tokio::{
  13. sync::mpsc,
  14. task::JoinHandle,
  15. time::{sleep, timeout, Duration},
  16. };
  17. use tokio_tungstenite::{connect_async, tungstenite::Message};
  18. use url::Url;
  19. /// Relayer object
  20. #[derive(Debug)]
  21. pub struct Relayer {
  22. /// URL of the relayer
  23. pub url: String,
  24. /// Sender to the relayer. This can be used to send a Requests to this
  25. /// relayer
  26. pub send_to_socket: mpsc::Sender<Request>,
  27. worker: JoinHandle<()>,
  28. is_connected: Arc<AtomicBool>,
  29. }
  30. const NO_ACTIVITY_TIMEOUT_SECS: u64 = 120;
  31. impl Drop for Relayer {
  32. fn drop(&mut self) {
  33. self.worker.abort()
  34. }
  35. }
  36. impl Relayer {
  37. /// Creates a new relayer
  38. pub fn new<F>(
  39. broadcast_to_listeners: mpsc::Sender<(Event, String)>,
  40. url: &str,
  41. on_connection: Option<F>,
  42. ) -> Result<Self, Error>
  43. where
  44. F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
  45. + Send
  46. + Sync
  47. + 'static,
  48. {
  49. let (sender_to_socket, send_to_socket) = mpsc::channel(100_000);
  50. let is_connected = Arc::new(AtomicBool::new(false));
  51. let worker = Self::spawn_background_client(
  52. broadcast_to_listeners,
  53. sender_to_socket.clone(),
  54. send_to_socket,
  55. url,
  56. is_connected.clone(),
  57. on_connection,
  58. )?;
  59. Ok(Self {
  60. url: url.to_owned(),
  61. is_connected,
  62. send_to_socket: sender_to_socket,
  63. worker,
  64. })
  65. }
  66. fn spawn_background_client<F>(
  67. broadcast_to_listeners: mpsc::Sender<(Event, String)>,
  68. sender_to_socket: mpsc::Sender<Request>,
  69. mut send_to_socket: mpsc::Receiver<Request>,
  70. url_str: &str,
  71. is_connected: Arc<AtomicBool>,
  72. on_connection: Option<F>,
  73. ) -> Result<JoinHandle<()>, Error>
  74. where
  75. F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
  76. + Send
  77. + Sync
  78. + 'static,
  79. {
  80. let url = url_str.to_owned();
  81. let url_parsed = Url::parse(&url)?;
  82. is_connected.store(false, Relaxed);
  83. Ok(tokio::spawn(async move {
  84. let mut connection_attempts = 0;
  85. loop {
  86. log::warn!("{}: Connect attempt {}", url, connection_attempts);
  87. connection_attempts += 1;
  88. let mut socket = match connect_async(url_parsed.clone()).await {
  89. Ok(x) => x.0,
  90. Err(err) => {
  91. log::warn!("{}: Failed to connect: {}", url, err);
  92. sleep(Duration::from_secs(5)).await;
  93. continue;
  94. }
  95. };
  96. log::info!("Connected to {}", url);
  97. connection_attempts = 0;
  98. if let Some(on_connection) = &on_connection {
  99. on_connection(&url, sender_to_socket.clone()).await;
  100. }
  101. loop {
  102. tokio::select! {
  103. Some(msg) = send_to_socket.recv() => {
  104. if let Ok(json) = serde_json::to_string(&msg) {
  105. log::info!("{}: Sending {}", url, json);
  106. if let Err(x) = socket.send(Message::Text(json)).await {
  107. log::error!("{} : Reconnecting due {}", url, x);
  108. break;
  109. }
  110. }
  111. }
  112. msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
  113. let msg = if let Ok(Some(Ok(msg))) = msg {
  114. is_connected.store(true, Relaxed);
  115. match msg {
  116. Message::Text(text) => text,
  117. Message::Ping(msg) => {
  118. if let Err(x) = socket.send(Message::Pong(msg)).await {
  119. log::error!("{} : Reconnecting due error at sending Pong: {:?}", url, x);
  120. break;
  121. }
  122. continue;
  123. },
  124. msg => {
  125. log::error!("Unexpected {:?}", msg);
  126. continue;
  127. }
  128. }
  129. } else {
  130. log::error!("{} Reconnecting client due of empty recv: {:?}", url, msg);
  131. break;
  132. };
  133. if msg.is_empty() {
  134. continue;
  135. }
  136. log::info!("New message: {}", msg);
  137. let msg: Result<Response, _> = serde_json::from_str(&msg);
  138. if let Ok(msg) = msg {
  139. if let Err(error) = broadcast_to_listeners.try_send((Event::Response(msg.into()), url.to_owned())) {
  140. log::error!("{}: Reconnecting client because of {}", url, error);
  141. break;
  142. }
  143. }
  144. }
  145. else => {
  146. log::warn!("{}: else", url);
  147. break;
  148. }
  149. }
  150. }
  151. is_connected.store(false, Relaxed);
  152. // Throttle down to not spam the server with reconnections
  153. sleep(Duration::from_millis(500)).await;
  154. }
  155. }))
  156. }
  157. /// Checks if the relayer is connected. It is guaranteed that the relayer is
  158. /// connected if this method returns true.
  159. pub fn is_connected(&self) -> bool {
  160. self.is_connected.load(Relaxed)
  161. }
  162. /// Sends a requests to this relayer
  163. pub async fn send(&self, request: Request) -> Result<(), Error> {
  164. self.send_to_socket
  165. .send(request)
  166. .await
  167. .map_err(|e| Error::Sync(Box::new(e)))
  168. }
  169. }