123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- use crate::{pool::Event, Error};
- use futures::Future;
- use futures_util::{SinkExt, StreamExt};
- use nostr_rs_types::{Request, Response};
- use std::{
- pin::Pin,
- sync::{
- atomic::{AtomicBool, Ordering::Relaxed},
- Arc,
- },
- };
- use tokio::{
- sync::mpsc,
- task::JoinHandle,
- time::{sleep, timeout, Duration},
- };
- use tokio_tungstenite::{connect_async, tungstenite::Message};
- use url::Url;
- /// Relayer object
- #[derive(Debug)]
- pub struct Relayer {
- /// URL of the relayer
- pub url: String,
- /// Sender to the relayer. This can be used to send a Requests to this
- /// relayer
- pub send_to_socket: mpsc::Sender<Request>,
- worker: JoinHandle<()>,
- is_connected: Arc<AtomicBool>,
- }
/// Seconds to wait for an incoming frame before the connection is treated as
/// dead and a reconnection is attempted (two minutes).
const NO_ACTIVITY_TIMEOUT_SECS: u64 = 2 * 60;
- impl Drop for Relayer {
- fn drop(&mut self) {
- self.worker.abort()
- }
- }
- impl Relayer {
- /// Creates a new relayer
- pub fn new<F>(
- broadcast_to_listeners: mpsc::Sender<(Event, String)>,
- url: &str,
- on_connection: Option<F>,
- ) -> Result<Self, Error>
- where
- F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
- + Send
- + Sync
- + 'static,
- {
- let (sender_to_socket, send_to_socket) = mpsc::channel(100_000);
- let is_connected = Arc::new(AtomicBool::new(false));
- let worker = Self::spawn_background_client(
- broadcast_to_listeners,
- sender_to_socket.clone(),
- send_to_socket,
- url,
- is_connected.clone(),
- on_connection,
- )?;
- Ok(Self {
- url: url.to_owned(),
- is_connected,
- send_to_socket: sender_to_socket,
- worker,
- })
- }
- fn spawn_background_client<F>(
- broadcast_to_listeners: mpsc::Sender<(Event, String)>,
- sender_to_socket: mpsc::Sender<Request>,
- mut send_to_socket: mpsc::Receiver<Request>,
- url_str: &str,
- is_connected: Arc<AtomicBool>,
- on_connection: Option<F>,
- ) -> Result<JoinHandle<()>, Error>
- where
- F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
- + Send
- + Sync
- + 'static,
- {
- let url = url_str.to_owned();
- let url_parsed = Url::parse(&url)?;
- is_connected.store(false, Relaxed);
- Ok(tokio::spawn(async move {
- let mut connection_attempts = 0;
- loop {
- log::warn!("{}: Connect attempt {}", url, connection_attempts);
- connection_attempts += 1;
- let mut socket = match connect_async(url_parsed.clone()).await {
- Ok(x) => x.0,
- Err(err) => {
- log::warn!("{}: Failed to connect: {}", url, err);
- sleep(Duration::from_secs(5)).await;
- continue;
- }
- };
- log::info!("Connected to {}", url);
- connection_attempts = 0;
- if let Some(on_connection) = &on_connection {
- on_connection(&url, sender_to_socket.clone()).await;
- }
- loop {
- tokio::select! {
- Some(msg) = send_to_socket.recv() => {
- if let Ok(json) = serde_json::to_string(&msg) {
- log::info!("{}: Sending {}", url, json);
- if let Err(x) = socket.send(Message::Text(json)).await {
- log::error!("{} : Reconnecting due {}", url, x);
- break;
- }
- }
- }
- msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
- let msg = if let Ok(Some(Ok(msg))) = msg {
- is_connected.store(true, Relaxed);
- match msg {
- Message::Text(text) => text,
- Message::Ping(msg) => {
- if let Err(x) = socket.send(Message::Pong(msg)).await {
- log::error!("{} : Reconnecting due error at sending Pong: {:?}", url, x);
- break;
- }
- continue;
- },
- msg => {
- log::error!("Unexpected {:?}", msg);
- continue;
- }
- }
- } else {
- log::error!("{} Reconnecting client due of empty recv: {:?}", url, msg);
- break;
- };
- if msg.is_empty() {
- continue;
- }
- log::info!("New message: {}", msg);
- let msg: Result<Response, _> = serde_json::from_str(&msg);
- if let Ok(msg) = msg {
- if let Err(error) = broadcast_to_listeners.try_send((Event::Response(msg.into()), url.to_owned())) {
- log::error!("{}: Reconnecting client because of {}", url, error);
- break;
- }
- }
- }
- else => {
- log::warn!("{}: else", url);
- break;
- }
- }
- }
- is_connected.store(false, Relaxed);
- // Throttle down to not spam the server with reconnections
- sleep(Duration::from_millis(500)).await;
- }
- }))
- }
- /// Checks if the relayer is connected. It is guaranteed that the relayer is
- /// connected if this method returns true.
- pub fn is_connected(&self) -> bool {
- self.is_connected.load(Relaxed)
- }
- /// Sends a requests to this relayer
- pub async fn send(&self, request: Request) -> Result<(), Error> {
- self.send_to_socket
- .send(request)
- .await
- .map_err(|e| Error::Sync(Box::new(e)))
- }
- }
|