use crate::{pool::Event, Error};
use futures::Future;
use futures_util::{SinkExt, StreamExt};
use nostr_rs_types::{Request, Response};
use std::{
    pin::Pin,
    sync::{
        atomic::{AtomicBool, Ordering::Relaxed},
        Arc,
    },
};
use tokio::{
    sync::{mpsc, oneshot},
    time::{sleep, timeout, Duration},
};
use tokio_tungstenite::{connect_async, tungstenite::Message};
use url::Url;

/// Relayer object
#[derive(Debug)]
pub struct Relayer {
    /// URL of the relayer
    pub url: String,
    /// Sender to the relayer. This can be used to send requests to this relayer
    pub send_to_socket: mpsc::Sender<Request>,
    is_connected: Arc<AtomicBool>,
    /// This sender signals the background connection to stop
    stop_service: oneshot::Sender<()>,
}

/// Seconds without any incoming message before the connection is considered
/// dead and a reconnection is attempted
const NO_ACTIVITY_TIMEOUT_SECS: u64 = 120;

impl Relayer {
    /// Creates a new relayer
    pub fn new<F>(
        broadcast_to_listeners: mpsc::Sender<(Event, String)>,
        max_connections_attempts: u16,
        url: &str,
        on_connection: Option<F>,
    ) -> Result<Self, Error>
    where
        F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
            + Send
            + Sync
            + 'static,
    {
        let (sender_to_socket, send_to_socket) = mpsc::channel(100_000);
        let is_connected = Arc::new(AtomicBool::new(false));
        let stop_service = Self::spawn_background_client(
            broadcast_to_listeners,
            sender_to_socket.clone(),
            send_to_socket,
            url,
            max_connections_attempts,
            is_connected.clone(),
            on_connection,
        )?;

        Ok(Self {
            url: url.to_owned(),
            is_connected,
            send_to_socket: sender_to_socket,
            stop_service,
        })
    }

    /// Spawns the background task that owns the WebSocket connection and
    /// returns the sender used to stop it
    fn spawn_background_client<F>(
        broadcast_to_listeners: mpsc::Sender<(Event, String)>,
        sender_to_socket: mpsc::Sender<Request>,
        mut send_to_socket: mpsc::Receiver<Request>,
        url_str: &str,
        max_connections_attempts: u16,
        is_connected: Arc<AtomicBool>,
        on_connection: Option<F>,
    ) -> Result<oneshot::Sender<()>, Error>
    where
        F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
            + Send
            + Sync
            + 'static,
    {
        let (stop_service, mut stopper_recv) = oneshot::channel();

        let url = url_str.to_owned();
        let url_parsed = Url::parse(&url)?;

        tokio::spawn(async move {
            let mut reconnect = true;
            let mut connection_attempts = 0;

            while reconnect && connection_attempts <= max_connections_attempts {
                log::warn!("{}: Connect attempt {}", url, connection_attempts);
                connection_attempts += 1;
                let mut socket = match connect_async(url_parsed.clone()).await {
                    Ok(x) => x.0,
                    Err(err) => {
                        log::warn!("{}: Failed to connect: {}", url, err);
                        sleep(Duration::from_secs(5)).await;
                        continue;
                    }
                };

                log::info!("Connected to {}", url);

                if let Some(on_connection) = &on_connection {
                    on_connection(&url, sender_to_socket.clone()).await;
                }

                loop {
                    tokio::select! {
                        // External stop signal: leave both loops
                        Ok(()) = &mut stopper_recv => {
                            log::warn!("{}: Breaking client due to external signal", url);
                            reconnect = false;
                            break;
                        },
                        // Outgoing request: serialize it and write it to the socket
                        Some(msg) = send_to_socket.recv() => {
                            if let Ok(json) = serde_json::to_string(&msg) {
                                log::info!("{}: Sending {}", url, json);
                                if let Err(x) = socket.send(Message::Text(json)).await {
                                    log::error!("{}: Reconnecting due to {}", url, x);
                                    break;
                                }
                            }
                        }
                        // Incoming message, or no activity within the timeout
                        msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
                            is_connected.store(true, Relaxed);
                            let msg = if let Ok(Some(Ok(msg))) = msg {
                                    match msg {
                                        Message::Text(text) => text,
                                        Message::Ping(msg) => {
                                            if let Err(x) = socket.send(Message::Pong(msg)).await {
                                                log::error!("{}: Reconnecting due to error sending Pong: {:?}", url, x);
                                                break;
                                            }
                                            continue;
                                        },
                                        msg => {
                                            log::error!("Unexpected {:?}", msg);
                                            continue;
                                        }
                                    }
                                } else {
                                    log::error!("{}: Reconnecting client because of empty recv: {:?}", url, msg);
                                    break;
                                };

                            if msg.is_empty() {
                                continue;
                            }

                            log::info!("New message: {}", msg);
                            // A non-empty text message arrived, so reset the attempt counter
                            connection_attempts = 0;

                            let msg: Result<Response, _> = serde_json::from_str(&msg);

                            if let Ok(msg) = msg {
                                if let Err(error) = broadcast_to_listeners.try_send((Event::Response(msg.into()), url.to_owned())) {
                                    log::error!("{}: Reconnecting client because of {}", url, error);
                                    break;
                                }
                            }
                        }
                        else => {
                            log::warn!("{}: else", url);
                            break;
                        }
                    }
                }

                is_connected.store(false, Relaxed);
                // Throttle down to not spam the server with reconnections
                sleep(Duration::from_millis(500)).await;
            }

            let _ = broadcast_to_listeners.try_send((Event::Disconnected, "".to_owned()));

            log::warn!("{}: Disconnected", url);
        });

        Ok(stop_service)
    }

    /// Checks if the relayer background connection is running. It is not
    /// guaranteed that there is an active connection; it may be in the
    /// process of reconnecting.
    pub fn is_running(&self) -> bool {
        !self.stop_service.is_closed()
    }

    /// Checks if the relayer is connected. It is guaranteed that the relayer is
    /// connected if this method returns true.
    pub fn is_connected(&self) -> bool {
        self.is_connected.load(Relaxed)
    }

    /// Sends a request to this relayer
    pub async fn send(&self, request: Request) -> Result<(), Error> {
        self.send_to_socket
            .send(request)
            .await
            .map_err(|e| Error::Sync(Box::new(e)))
    }

    /// Stops the background task that holds the connection to this relayer
    pub async fn disconnect(self) {
        let _ = self.stop_service.send(());
    }
}