use crate::Error; use futures::executor::block_on; use futures_util::{SinkExt, StreamExt}; use nostr_rs_types::{ client::{self, subscribe}, types::SubscriptionId, Request, Response, }; use std::{ collections::HashMap, sync::{ atomic::{AtomicBool, Ordering::Relaxed}, Arc, }, }; use tokio::{ sync::{mpsc, RwLock}, task::JoinHandle, time::{sleep, timeout, Duration}, }; use tokio_tungstenite::{connect_async, tungstenite::Message}; use url::Url; type Subscriptions = Arc>>; #[derive(Debug)] pub struct ActiveSubscription { id: SubscriptionId, subscriptions: Subscriptions, send_to_socket: mpsc::Sender, } impl Drop for ActiveSubscription { fn drop(&mut self) { block_on(async move { self.subscriptions.write().await.remove(&self.id); let _ = self .send_to_socket .send(nostr_rs_types::client::Close(self.id.clone()).into()) .await; }); } } /// Relayer object #[derive(Debug)] pub struct Client { /// URL of the relayer pub url: Url, /// Sender to the relayer. This can be used to send a Requests to this /// relayer pub send_to_socket: mpsc::Sender, /// List of active subscriptions for this nostr client subscriptions: Subscriptions, /// Background task / thread that is doing the actual connection worker: JoinHandle<()>, /// Wether the background worker is connected or not is_connected: Arc, } const NO_ACTIVITY_TIMEOUT_SECS: u64 = 120; impl Drop for Client { fn drop(&mut self) { self.worker.abort() } } impl Client { /// Creates a new relayer pub fn new(send_message_to_listener: mpsc::Sender<(Response, Url)>, url: Url) -> Self { let (sender_to_socket, send_to_socket) = mpsc::channel(100_000); let is_connected = Arc::new(AtomicBool::new(false)); let subscriptions = Arc::new(RwLock::new(HashMap::new())); let worker = Self::spawn_background_client( send_message_to_listener, send_to_socket, url.clone(), is_connected.clone(), subscriptions.clone(), ); Self { url, is_connected, send_to_socket: sender_to_socket, subscriptions, worker, } } /// Spawns a background client that connects to the relayer /// and sends messages to the listener /// /// This function will return a JoinHandle that can be used to /// wait for the background client to finish or to cancel it. fn spawn_background_client( send_message_to_listener: mpsc::Sender<(Response, Url)>, mut send_to_socket: mpsc::Receiver, url: Url, is_connected: Arc, send_on_connection: Subscriptions, ) -> JoinHandle<()> { is_connected.store(false, Relaxed); tokio::spawn(async move { let mut connection_attempts = 0; loop { log::warn!("{}: Connect attempt {}", url, connection_attempts); connection_attempts += 1; let mut socket = match connect_async(url.clone()).await { Ok(x) => x.0, Err(err) => { log::warn!("{}: Failed to connect: {}", url, err); sleep(Duration::from_secs(5)).await; continue; } }; log::info!("Connected to {}", url); connection_attempts = 0; let subscriptions = send_on_connection .read() .await .iter() .filter_map(|x| serde_json::to_string(&Request::Request(x.1.clone())).ok()) .map(Message::Text) .collect::>(); for msg in subscriptions { if let Err(x) = socket.send(msg).await { log::error!("{}: Reconnecting due error at sending: {:?}", url, x); } } loop { tokio::select! { Some(msg) = send_to_socket.recv() => { if let Ok(json) = serde_json::to_string(&msg) { log::info!("{}: Sending {}", url, json); if let Err(x) = socket.send(Message::Text(json)).await { log::error!("{} : Reconnecting due {}", url, x); break; } } } msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => { let msg = if let Ok(Some(Ok(msg))) = msg { is_connected.store(true, Relaxed); match msg { Message::Text(text) => text, Message::Ping(msg) => { if let Err(x) = socket.send(Message::Pong(msg)).await { log::error!("{} : Reconnecting due error at sending Pong: {:?}", url, x); break; } continue; }, msg => { log::error!("Unexpected {:?}", msg); continue; } } } else { log::error!("{} Reconnecting client due of empty recv: {:?}", url, msg); break; }; if msg.is_empty() { continue; } log::info!("New message: {}", msg); let msg: Result = serde_json::from_str(&msg); if let Ok(msg) = msg { if let Err(error) = send_message_to_listener.try_send((msg.into(), url.clone())) { log::error!("{}: Reconnecting client because of {}", url, error); break; } } } else => { log::warn!("{}: else", url); break; } } } is_connected.store(false, Relaxed); // Throttle down to not spam the server with reconnections sleep(Duration::from_millis(500)).await; } }) } /// Checks if the relayer is connected. It is guaranteed that the relayer is /// connected if this method returns true. pub fn is_connected(&self) -> bool { self.is_connected.load(Relaxed) } /// Creates a new subscription pub async fn subscribe( &self, subscription: subscribe::Subscribe, ) -> Result { let id = subscription.subscription_id.clone(); self.subscriptions .write() .await .insert(id.clone(), subscription.clone()); self.send_to_socket .send(Request::Request(subscription)) .await .map_err(|e| Error::Sync(Box::new(e)))?; Ok(ActiveSubscription { id, subscriptions: self.subscriptions.clone(), send_to_socket: self.send_to_socket.clone(), }) } /// Posts an event to the relayer pub async fn post(&self, event: client::Event) -> Result<(), Error> { self.send_to_socket .send(event.into()) .await .map_err(|e| Error::Sync(Box::new(e))) } }