//! Client for the nostr relayer //! //! This is a simple client with reconnection logic built-in but no load balancing //! nor subscription. //! //! Most likely you want to use the `Pool` client instead of this one. use crate::{pool::DEFAULT_CHANNEL_BUFFER_SIZE, Error}; use futures_util::{SinkExt, StreamExt}; use nostr_rs_types::{ client::{self, subscribe}, types::SubscriptionId, Request, Response, }; use std::{ collections::HashMap, pin::Pin, sync::{ atomic::{AtomicBool, Ordering::Relaxed}, Arc, }, }; use tokio::{ sync::{mpsc, RwLock}, task::JoinHandle, time::{sleep, timeout, Duration}, }; use tokio_tungstenite::{connect_async, tungstenite::Message}; use url::Url; type Subscriptions = Arc>>; const MAX_ACTIVE_SUBSCRIPTIONS: usize = 10; #[derive(Debug)] /// Active subscription /// /// This must be kept in scope to keep the subscription active pub struct ActiveSubscription { id: SubscriptionId, subscriptions: Subscriptions, send_to_socket: mpsc::Sender, } impl Drop for ActiveSubscription { fn drop(&mut self) { let subscriptions = self.subscriptions.clone(); let id = self.id.clone(); let send_to_socket = self.send_to_socket.clone(); tokio::spawn(async move { let _ = send_to_socket .send(nostr_rs_types::client::Close(id.clone()).into()) .await; subscriptions.write().await.remove(&id); }); } } /// Relayer object #[derive(Debug)] pub struct Client { /// URL of the relayer pub url: Url, /// Sender to the relayer. This can be used to send a Requests to this /// relayer pub send_to_socket: mpsc::Sender, /// List of active subscriptions for this nostr client subscriptions: Subscriptions, /// Background task / thread that is doing the actual connection worker: JoinHandle<()>, /// Wether the background worker is connected or not is_connected: Arc, } const NO_ACTIVITY_TIMEOUT_SECS: u64 = 120; impl Drop for Client { fn drop(&mut self) { self.worker.abort() } } impl Client { /// Creates a new relayer pub fn new(return_to: mpsc::Sender<(Response, Url)>, url: Url, filter: F) -> Self where F: Fn( Response, Url, mpsc::Sender<(Response, Url)>, ) -> Pin> + Send>> + Send + Sync + 'static, { let (sender_to_socket, send_to_socket) = mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE); let is_connected = Arc::new(AtomicBool::new(false)); let subscriptions = Arc::new(RwLock::new(HashMap::new())); let worker = Self::spawn_background_client( return_to, send_to_socket, url.clone(), is_connected.clone(), subscriptions.clone(), filter, ); Self { url, is_connected, send_to_socket: sender_to_socket, subscriptions, worker, } } /// Spawns a background client that connects to the relayer /// and sends messages to the listener /// /// This function will return a JoinHandle that can be used to /// wait for the background client to finish or to cancel it. fn spawn_background_client( return_to: mpsc::Sender<(Response, Url)>, mut send_to_socket: mpsc::Receiver, url: Url, is_connected: Arc, to_resubscribe: Subscriptions, filter: F, ) -> JoinHandle<()> where F: Fn( Response, Url, mpsc::Sender<(Response, Url)>, ) -> Pin> + Send>> + Send + Sync + 'static, { is_connected.store(false, Relaxed); tokio::spawn(async move { let mut connection_attempts = 0; loop { log::info!("{}: Connect attempt {}", url, connection_attempts); connection_attempts += 1; let mut socket = match connect_async(url.clone()).await { Ok(x) => x.0, Err(err) => { log::warn!("{}: Failed to connect: {}", url, err); sleep(Duration::from_secs(1)).await; continue; } }; log::info!("Connected to {}", url); connection_attempts = 0; // Convert all sent subscriptions to a local vector let mut subscriptions = to_resubscribe .read() .await .values() .map(|msg| Request::Request(msg.clone())) .collect::>(); // Only keep the ones to be subscribed, moved the rest of the subscriptions to the queue let mut to_subscribe_queue = if subscriptions.len() > MAX_ACTIVE_SUBSCRIPTIONS { subscriptions.split_off(MAX_ACTIVE_SUBSCRIPTIONS) } else { vec![] }; let mut subscriptions = subscriptions .into_iter() .map(|msg| { ( msg.as_request() .map(|x| x.subscription_id.clone()) .unwrap_or_default(), serde_json::to_string(&msg).ok().map(Message::Text), ) }) .collect::>(); for msg in subscriptions.values_mut() { if let Some(msg) = msg.take() { if let Err(x) = socket.send(msg).await { log::error!("{}: Reconnecting due error at sending: {:?}", url, x); break; } } } is_connected.store(true, Relaxed); loop { tokio::select! { Some(msg) = send_to_socket.recv() => { if let Request::Request(sub) = &msg { if subscriptions.contains_key(&sub.subscription_id) { log::warn!("{}: Already subscribed to {}", url, sub.subscription_id); continue; } if subscriptions.len() > MAX_ACTIVE_SUBSCRIPTIONS { log::warn!("{}: Queueing subscription to {} for later", url, sub.subscription_id); to_subscribe_queue.push(msg.clone()); continue; } subscriptions.insert(sub.subscription_id.clone(), None); } let json = if let Ok(json) = serde_json::to_string(&msg) { json } else { continue; }; if let Err(x) = socket.send(Message::Text(json)).await { log::error!("{} : Reconnecting due {}", url, x); break; } if let Request::Close(close) = &msg { subscriptions.remove(&close.0); let json = if let Some(json) = to_subscribe_queue .pop() .and_then(|msg| { subscriptions.insert(msg.as_request().map(|sub| sub.subscription_id.clone()).unwrap_or_default(), None); serde_json::to_string(&msg).ok() }) { json } else { continue; }; log::info!("Sending: {} (queued subscription)", json); if let Err(x) = socket.send(Message::Text(json)).await { log::error!("{} : Reconnecting due {}", url, x); break; } } } msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => { let msg = if let Ok(Some(Ok(msg))) = msg { match msg { Message::Text(text) => text, Message::Ping(msg) => { if let Err(x) = socket.send(Message::Pong(msg)).await { log::error!("{} : Reconnecting due error at sending Pong: {:?}", url, x); break; } continue; }, msg => { log::error!("Unexpected {:?}", msg); continue; } } } else { log::error!("{} Reconnecting client due of empty recv: {:?}", url, msg); break; }; if msg.is_empty() { continue; } let event: Result = serde_json::from_str(&msg); if let Ok(Response::Notice(err)) = &event { log::error!("{}: Active connections {}: {:?}", url, subscriptions.len(), err); } if let Ok(msg) = event { if let Err(error) = filter(msg, url.clone(), return_to.clone()).await { log::error!("{}: Reconnecting client because of {}", url, error); break; } } else { log::error!("Failed to parse message: {:?} {}", event, msg); } } else => { log::warn!("{}: else", url); break; } } } is_connected.store(false, Relaxed); // Throttle down to not spam the server with reconnections sleep(Duration::from_millis(500)).await; } }) } /// Checks if the relayer is connected. It is guaranteed that the relayer is /// connected if this method returns true. pub fn is_connected(&self) -> bool { self.is_connected.load(Relaxed) } /// Creates a new subscription pub async fn subscribe( &self, subscription: subscribe::Subscribe, ) -> Result { let id = subscription.subscription_id.clone(); self.subscriptions .write() .await .insert(id.clone(), subscription.clone()); self.send_to_socket .send(Request::Request(subscription)) .await .map_err(|e| Error::Sync(Box::new(e)))?; Ok(ActiveSubscription { id, subscriptions: self.subscriptions.clone(), send_to_socket: self.send_to_socket.clone(), }) } /// Posts an event to the relayer pub async fn post(&self, event: client::Event) -> Result<(), Error> { self.send_to_socket .send(event.into()) .await .map_err(|e| Error::Sync(Box::new(e))) } }