use crate::{pool::Event, Error}; use futures::Future; use futures_util::{SinkExt, StreamExt}; use nostr_rs_types::{Request, Response}; use std::{ pin::Pin, sync::{ atomic::{AtomicBool, Ordering::Relaxed}, Arc, }, }; use tokio::{ sync::mpsc, task::JoinHandle, time::{sleep, timeout, Duration}, }; use tokio_tungstenite::{connect_async, tungstenite::Message}; use url::Url; /// Relayer object #[derive(Debug)] pub struct Relayer { /// URL of the relayer pub url: String, /// Sender to the relayer. This can be used to send a Requests to this /// relayer pub send_to_socket: mpsc::Sender, worker: JoinHandle<()>, is_connected: Arc, } const NO_ACTIVITY_TIMEOUT_SECS: u64 = 120; impl Drop for Relayer { fn drop(&mut self) { self.worker.abort() } } impl Relayer { /// Creates a new relayer pub fn new( broadcast_to_listeners: mpsc::Sender<(Event, String)>, url: &str, on_connection: Option, ) -> Result where F: (Fn(&str, mpsc::Sender) -> Pin + Send>>) + Send + Sync + 'static, { let (sender_to_socket, send_to_socket) = mpsc::channel(100_000); let is_connected = Arc::new(AtomicBool::new(false)); let worker = Self::spawn_background_client( broadcast_to_listeners, sender_to_socket.clone(), send_to_socket, url, is_connected.clone(), on_connection, )?; Ok(Self { url: url.to_owned(), is_connected, send_to_socket: sender_to_socket, worker, }) } fn spawn_background_client( broadcast_to_listeners: mpsc::Sender<(Event, String)>, sender_to_socket: mpsc::Sender, mut send_to_socket: mpsc::Receiver, url_str: &str, is_connected: Arc, on_connection: Option, ) -> Result, Error> where F: (Fn(&str, mpsc::Sender) -> Pin + Send>>) + Send + Sync + 'static, { let url = url_str.to_owned(); let url_parsed = Url::parse(&url)?; is_connected.store(false, Relaxed); Ok(tokio::spawn(async move { let mut connection_attempts = 0; loop { log::warn!("{}: Connect attempt {}", url, connection_attempts); connection_attempts += 1; let mut socket = match connect_async(url_parsed.clone()).await { Ok(x) => x.0, Err(err) => { log::warn!("{}: Failed to connect: {}", url, err); sleep(Duration::from_secs(5)).await; continue; } }; log::info!("Connected to {}", url); connection_attempts = 0; if let Some(on_connection) = &on_connection { on_connection(&url, sender_to_socket.clone()).await; } loop { tokio::select! { Some(msg) = send_to_socket.recv() => { if let Ok(json) = serde_json::to_string(&msg) { log::info!("{}: Sending {}", url, json); if let Err(x) = socket.send(Message::Text(json)).await { log::error!("{} : Reconnecting due {}", url, x); break; } } } msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => { let msg = if let Ok(Some(Ok(msg))) = msg { is_connected.store(true, Relaxed); match msg { Message::Text(text) => text, Message::Ping(msg) => { if let Err(x) = socket.send(Message::Pong(msg)).await { log::error!("{} : Reconnecting due error at sending Pong: {:?}", url, x); break; } continue; }, msg => { log::error!("Unexpected {:?}", msg); continue; } } } else { log::error!("{} Reconnecting client due of empty recv: {:?}", url, msg); break; }; if msg.is_empty() { continue; } log::info!("New message: {}", msg); let msg: Result = serde_json::from_str(&msg); if let Ok(msg) = msg { if let Err(error) = broadcast_to_listeners.try_send((Event::Response(msg.into()), url.to_owned())) { log::error!("{}: Reconnecting client because of {}", url, error); break; } } } else => { log::warn!("{}: else", url); break; } } } is_connected.store(false, Relaxed); // Throttle down to not spam the server with reconnections sleep(Duration::from_millis(500)).await; } })) } /// Checks if the relayer is connected. It is guaranteed that the relayer is /// connected if this method returns true. pub fn is_connected(&self) -> bool { self.is_connected.load(Relaxed) } /// Sends a requests to this relayer pub async fn send(&self, request: Request) -> Result<(), Error> { self.send_to_socket .send(request) .await .map_err(|e| Error::Sync(Box::new(e))) } }