123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
//! Client for the nostr relayer
//!
//! This is a simple client with built-in reconnection logic and subscription
//! tracking (subscriptions are replayed after a reconnection), but no load
//! balancing.
//!
//! Most likely you want to use the `Pool` client instead of this one.
- use crate::{pool::DEFAULT_CHANNEL_BUFFER_SIZE, Error};
- use futures_util::{SinkExt, StreamExt};
- use nostr_rs_types::{
- client::{self, subscribe},
- types::SubscriptionId,
- Request, Response,
- };
- use std::{
- collections::HashMap,
- pin::Pin,
- sync::{
- atomic::{AtomicBool, Ordering::Relaxed},
- Arc,
- },
- };
- use tokio::{
- sync::{mpsc, RwLock},
- task::JoinHandle,
- time::{sleep, timeout, Duration},
- };
- use tokio_tungstenite::{connect_async, tungstenite::Message};
- use url::Url;
- type Subscriptions = Arc<RwLock<HashMap<SubscriptionId, subscribe::Subscribe>>>;
- const MAX_ACTIVE_SUBSCRIPTIONS: usize = 10;
- #[derive(Debug)]
- /// Active subscription
- ///
- /// This must be kept in scope to keep the subscription active
- pub struct ActiveSubscription {
- id: SubscriptionId,
- subscriptions: Subscriptions,
- send_to_socket: mpsc::Sender<Request>,
- }
- impl Drop for ActiveSubscription {
- fn drop(&mut self) {
- let subscriptions = self.subscriptions.clone();
- let id = self.id.clone();
- let send_to_socket = self.send_to_socket.clone();
- tokio::spawn(async move {
- let _ = send_to_socket
- .send(nostr_rs_types::client::Close(id.clone()).into())
- .await;
- subscriptions.write().await.remove(&id);
- });
- }
- }
- /// Relayer object
- #[derive(Debug)]
- pub struct Client {
- /// URL of the relayer
- pub url: Url,
- /// Sender to the relayer. This can be used to send a Requests to this
- /// relayer
- pub send_to_socket: mpsc::Sender<Request>,
- /// List of active subscriptions for this nostr client
- subscriptions: Subscriptions,
- /// Background task / thread that is doing the actual connection
- worker: JoinHandle<()>,
- /// Wether the background worker is connected or not
- is_connected: Arc<AtomicBool>,
- }
/// Seconds the background worker waits for an incoming websocket frame before
/// it gives up on the current connection and reconnects.
const NO_ACTIVITY_TIMEOUT_SECS: u64 = 120;
- impl Drop for Client {
- fn drop(&mut self) {
- self.worker.abort()
- }
- }
- impl Client {
- /// Creates a new relayer
- pub fn new<F>(return_to: mpsc::Sender<(Response, Url)>, url: Url, filter: F) -> Self
- where
- F: Fn(
- Response,
- Url,
- mpsc::Sender<(Response, Url)>,
- ) -> Pin<Box<dyn futures::Future<Output = Result<(), Error>> + Send>>
- + Send
- + Sync
- + 'static,
- {
- let (sender_to_socket, send_to_socket) = mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);
- let is_connected = Arc::new(AtomicBool::new(false));
- let subscriptions = Arc::new(RwLock::new(HashMap::new()));
- let worker = Self::spawn_background_client(
- return_to,
- send_to_socket,
- url.clone(),
- is_connected.clone(),
- subscriptions.clone(),
- filter,
- );
- Self {
- url,
- is_connected,
- send_to_socket: sender_to_socket,
- subscriptions,
- worker,
- }
- }
- /// Spawns a background client that connects to the relayer
- /// and sends messages to the listener
- ///
- /// This function will return a JoinHandle that can be used to
- /// wait for the background client to finish or to cancel it.
- fn spawn_background_client<F>(
- return_to: mpsc::Sender<(Response, Url)>,
- mut send_to_socket: mpsc::Receiver<Request>,
- url: Url,
- is_connected: Arc<AtomicBool>,
- to_resubscribe: Subscriptions,
- filter: F,
- ) -> JoinHandle<()>
- where
- F: Fn(
- Response,
- Url,
- mpsc::Sender<(Response, Url)>,
- ) -> Pin<Box<dyn futures::Future<Output = Result<(), Error>> + Send>>
- + Send
- + Sync
- + 'static,
- {
- is_connected.store(false, Relaxed);
- tokio::spawn(async move {
- let mut connection_attempts = 0;
- loop {
- log::info!("{}: Connect attempt {}", url, connection_attempts);
- connection_attempts += 1;
- let mut socket = match connect_async(url.clone()).await {
- Ok(x) => x.0,
- Err(err) => {
- log::warn!("{}: Failed to connect: {}", url, err);
- sleep(Duration::from_secs(1)).await;
- continue;
- }
- };
- log::info!("Connected to {}", url);
- connection_attempts = 0;
- // Convert all sent subscriptions to a local vector
- let mut subscriptions = to_resubscribe
- .read()
- .await
- .values()
- .map(|msg| Request::Request(msg.clone()))
- .collect::<Vec<_>>();
- // Only keep the ones to be subscribed, moved the rest of the subscriptions to the queue
- let mut to_subscribe_queue = if subscriptions.len() > MAX_ACTIVE_SUBSCRIPTIONS {
- subscriptions.split_off(MAX_ACTIVE_SUBSCRIPTIONS)
- } else {
- vec![]
- };
- let mut subscriptions = subscriptions
- .into_iter()
- .map(|msg| {
- (
- msg.as_request()
- .map(|x| x.subscription_id.clone())
- .unwrap_or_default(),
- serde_json::to_string(&msg).ok().map(Message::Text),
- )
- })
- .collect::<HashMap<_, _>>();
- for msg in subscriptions.values_mut() {
- if let Some(msg) = msg.take() {
- if let Err(x) = socket.send(msg).await {
- log::error!("{}: Reconnecting due error at sending: {:?}", url, x);
- break;
- }
- }
- }
- is_connected.store(true, Relaxed);
- loop {
- tokio::select! {
- Some(msg) = send_to_socket.recv() => {
- if let Request::Request(sub) = &msg {
- if subscriptions.contains_key(&sub.subscription_id) {
- log::warn!("{}: Already subscribed to {}", url, sub.subscription_id);
- continue;
- }
- if subscriptions.len() > MAX_ACTIVE_SUBSCRIPTIONS {
- log::warn!("{}: Queueing subscription to {} for later", url, sub.subscription_id);
- to_subscribe_queue.push(msg.clone());
- continue;
- }
- subscriptions.insert(sub.subscription_id.clone(), None);
- }
- let json = if let Ok(json) = serde_json::to_string(&msg) {
- json
- } else {
- continue;
- };
- if let Err(x) = socket.send(Message::Text(json)).await {
- log::error!("{} : Reconnecting due {}", url, x);
- break;
- }
- if let Request::Close(close) = &msg {
- subscriptions.remove(&close.0);
- let json = if let Some(json) = to_subscribe_queue
- .pop()
- .and_then(|msg| {
- subscriptions.insert(msg.as_request().map(|sub| sub.subscription_id.clone()).unwrap_or_default(), None);
- serde_json::to_string(&msg).ok()
- })
- {
- json
- } else {
- continue;
- };
- log::info!("Sending: {} (queued subscription)", json);
- if let Err(x) = socket.send(Message::Text(json)).await {
- log::error!("{} : Reconnecting due {}", url, x);
- break;
- }
- }
- }
- msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
- let msg = if let Ok(Some(Ok(msg))) = msg {
- match msg {
- Message::Text(text) => text,
- Message::Ping(msg) => {
- if let Err(x) = socket.send(Message::Pong(msg)).await {
- log::error!("{} : Reconnecting due error at sending Pong: {:?}", url, x);
- break;
- }
- continue;
- },
- msg => {
- log::error!("Unexpected {:?}", msg);
- continue;
- }
- }
- } else {
- log::error!("{} Reconnecting client due of empty recv: {:?}", url, msg);
- break;
- };
- if msg.is_empty() {
- continue;
- }
- let event: Result<Response, _> = serde_json::from_str(&msg);
- if let Ok(Response::Notice(err)) = &event {
- log::error!("{}: Active connections {}: {:?}", url, subscriptions.len(), err);
- }
- if let Ok(msg) = event {
- if let Err(error) = filter(msg, url.clone(), return_to.clone()).await {
- log::error!("{}: Reconnecting client because of {}", url, error);
- break;
- }
- } else {
- log::error!("Failed to parse message: {:?} {}", event, msg);
- }
- }
- else => {
- log::warn!("{}: else", url);
- break;
- }
- }
- }
- is_connected.store(false, Relaxed);
- // Throttle down to not spam the server with reconnections
- sleep(Duration::from_millis(500)).await;
- }
- })
- }
- /// Checks if the relayer is connected. It is guaranteed that the relayer is
- /// connected if this method returns true.
- pub fn is_connected(&self) -> bool {
- self.is_connected.load(Relaxed)
- }
- /// Creates a new subscription
- pub async fn subscribe(
- &self,
- subscription: subscribe::Subscribe,
- ) -> Result<ActiveSubscription, Error> {
- let id = subscription.subscription_id.clone();
- self.subscriptions
- .write()
- .await
- .insert(id.clone(), subscription.clone());
- self.send_to_socket
- .send(Request::Request(subscription))
- .await
- .map_err(|e| Error::Sync(Box::new(e)))?;
- Ok(ActiveSubscription {
- id,
- subscriptions: self.subscriptions.clone(),
- send_to_socket: self.send_to_socket.clone(),
- })
- }
- /// Posts an event to the relayer
- pub async fn post(&self, event: client::Event) -> Result<(), Error> {
- self.send_to_socket
- .send(event.into())
- .await
- .map_err(|e| Error::Sync(Box::new(e)))
- }
- }
|