relayer.rs 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. use crate::{pool::Event, Error};
  2. use futures::Future;
  3. use futures_util::{SinkExt, StreamExt};
  4. use nostr_rs_types::{Request, Response};
  5. use std::{
  6. pin::Pin,
  7. sync::{
  8. atomic::{AtomicBool, Ordering::Relaxed},
  9. Arc,
  10. },
  11. };
  12. use tokio::{
  13. sync::{mpsc, oneshot},
  14. time::{sleep, timeout, Duration},
  15. };
  16. use tokio_tungstenite::{connect_async, tungstenite::Message};
  17. use url::Url;
/// Relayer object
///
/// A handle to one relayer. The websocket connection itself is driven by a
/// background task; this handle only holds the channels and the flag used to
/// communicate with that task.
#[derive(Debug)]
pub struct Relayer {
    /// URL of the relayer
    pub url: String,
    /// Sender to the relayer. This can be used to send a request to this relayer
    pub send_to_socket: mpsc::Sender<Request>,
    /// Connection flag shared with the background task; read through
    /// `is_connected()`
    is_connected: Arc<AtomicBool>,
    /// This sender signals the background connection to stop
    stop_service: oneshot::Sender<()>,
}
/// Maximum time, in seconds, to wait for the next incoming websocket frame.
/// When the wait times out the current connection is abandoned and the
/// background task reconnects.
const NO_ACTIVITY_TIMEOUT_SECS: u64 = 120;
  30. impl Relayer {
  31. /// Creates a new relayer
  32. pub fn new<F>(
  33. broadcast_to_listeners: mpsc::Sender<(Event, String)>,
  34. max_connections_attempts: u16,
  35. url: &str,
  36. on_connection: Option<F>,
  37. ) -> Result<Self, Error>
  38. where
  39. F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
  40. + Send
  41. + Sync
  42. + 'static,
  43. {
  44. let (sender_to_socket, send_to_socket) = mpsc::channel(100_000);
  45. let is_connected = Arc::new(AtomicBool::new(false));
  46. let stop_service = Self::spawn_background_client(
  47. broadcast_to_listeners,
  48. sender_to_socket.clone(),
  49. send_to_socket,
  50. url,
  51. max_connections_attempts,
  52. is_connected.clone(),
  53. on_connection,
  54. )?;
  55. Ok(Self {
  56. url: url.to_owned(),
  57. is_connected,
  58. send_to_socket: sender_to_socket,
  59. stop_service,
  60. })
  61. }
  62. fn spawn_background_client<F>(
  63. broadcast_to_listeners: mpsc::Sender<(Event, String)>,
  64. sender_to_socket: mpsc::Sender<Request>,
  65. mut send_to_socket: mpsc::Receiver<Request>,
  66. url_str: &str,
  67. max_connections_attempts: u16,
  68. is_connected: Arc<AtomicBool>,
  69. on_connection: Option<F>,
  70. ) -> Result<oneshot::Sender<()>, Error>
  71. where
  72. F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
  73. + Send
  74. + Sync
  75. + 'static,
  76. {
  77. let (stop_service, mut stopper_recv) = oneshot::channel();
  78. let url = url_str.to_owned();
  79. let url_parsed = Url::parse(&url)?;
  80. tokio::spawn(async move {
  81. let mut reconnect = true;
  82. let mut connection_attempts = 0;
  83. while reconnect && connection_attempts <= max_connections_attempts {
  84. log::warn!("{}: Connect attempt {}", url, connection_attempts);
  85. connection_attempts += 1;
  86. let mut socket = match connect_async(url_parsed.clone()).await {
  87. Ok(x) => x.0,
  88. Err(err) => {
  89. log::warn!("{}: Failed to connect: {}", url, err);
  90. sleep(Duration::from_secs(5)).await;
  91. continue;
  92. }
  93. };
  94. log::info!("Connected to {}", url);
  95. if let Some(on_connection) = &on_connection {
  96. on_connection(&url, sender_to_socket.clone()).await;
  97. }
  98. loop {
  99. tokio::select! {
  100. Ok(()) = &mut stopper_recv => {
  101. log::warn!("{}: Breaking client due external signal", url);
  102. reconnect = false;
  103. break;
  104. },
  105. Some(msg) = send_to_socket.recv() => {
  106. if let Ok(json) = serde_json::to_string(&msg) {
  107. log::info!("{}: Sending {}", url, json);
  108. if let Err(x) = socket.send(Message::Text(json)).await {
  109. log::error!("{} : Reconnecting due {}", url, x);
  110. break;
  111. }
  112. }
  113. }
  114. msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
  115. is_connected.store(true, Relaxed);
  116. let msg = if let Ok(Some(Ok(msg))) = msg {
  117. match msg {
  118. Message::Text(text) => text,
  119. Message::Ping(msg) => {
  120. if let Err(x) = socket.send(Message::Pong(msg)).await {
  121. log::error!("{} : Reconnecting due error at sending Pong: {:?}", url, x);
  122. break;
  123. }
  124. continue;
  125. },
  126. msg => {
  127. log::error!("Unexpected {:?}", msg);
  128. continue;
  129. }
  130. }
  131. } else {
  132. log::error!("{} Reconnecting client due of empty recv: {:?}", url, msg);
  133. break;
  134. };
  135. if msg.is_empty() {
  136. continue;
  137. }
  138. log::info!("New message: {}", msg);
  139. connection_attempts = 0;
  140. let msg: Result<Response, _> = serde_json::from_str(&msg);
  141. if let Ok(msg) = msg {
  142. if let Err(error) = broadcast_to_listeners.try_send((Event::Response(msg.into()), url.to_owned())) {
  143. log::error!("{}: Reconnecting client because of {}", url, error);
  144. break;
  145. }
  146. }
  147. }
  148. else => {
  149. log::warn!("{}: else", url);
  150. break;
  151. }
  152. }
  153. }
  154. is_connected.store(false, Relaxed);
  155. // Throttle down to not spam the server with reconnections
  156. sleep(Duration::from_millis(500)).await;
  157. }
  158. let _ = broadcast_to_listeners.try_send((Event::Disconnected, "".to_owned()));
  159. log::warn!("{}: Disconnected", url);
  160. });
  161. Ok(stop_service)
  162. }
  163. /// Checks if the relayer background connection is running. It is not
  164. /// guaranteed there is an active connection, it may be in the process of
  165. /// reconnecting.
  166. pub fn is_running(&self) -> bool {
  167. !self.stop_service.is_closed()
  168. }
  169. /// Checks if the relayer is connected. It is guaranteed that the relayer is
  170. /// connected if this method returns true.
  171. pub fn is_connected(&self) -> bool {
  172. self.is_connected.load(Relaxed)
  173. }
  174. /// Sends a requests to this relayer
  175. pub async fn send(&self, request: Request) -> Result<(), Error> {
  176. self.send_to_socket
  177. .send(request)
  178. .await
  179. .map_err(|e| Error::Sync(Box::new(e)))
  180. }
  181. /// Stops the background thread that has the connection to this relayer
  182. pub async fn disconnect(self) {
  183. let _ = self.stop_service.send(());
  184. }
  185. }