123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- use crate::Error;
- use futures::Future;
- use futures_util::{SinkExt, StreamExt};
- use nostr_rs_types::{Request, Response};
- use std::pin::Pin;
- use tokio::{
- sync::{broadcast, mpsc, oneshot},
- time::{sleep, timeout, Duration},
- };
- use tokio_tungstenite::{connect_async, tungstenite::Message};
- use url::Url;
- #[derive(Debug)]
- pub struct Client {
- pub url: String,
- pub send_to_socket: mpsc::Sender<Request>,
- recv_from_socket: broadcast::Receiver<(Response, String)>,
- stop_service: oneshot::Sender<()>,
- }
/// Seconds the background task waits for the next frame from the socket
/// before it gives up on the current connection and reconnects.
const NO_ACTIVITY_TIMEOUT_SECS: u64 = 30;

/// Upper bound for the retry counter of the background task. The counter is
/// bumped on every connection attempt and reset whenever a text message is
/// received; once it exceeds this value the task stops reconnecting.
const MAX_RECONNECT_ATTEMPTS: u64 = 15;
- impl Client {
- pub fn new<F>(url: &str, on_connection: Option<F>) -> Result<Self, Error>
- where
- F: (Fn(mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
- + Send
- + Sync
- + 'static,
- {
- let (send_to_socket, receiver) = mpsc::channel(100_000);
- let (recv_from_socket, stop_service) =
- Self::spawn_background_client(send_to_socket.clone(), receiver, url, on_connection)?;
- Ok(Self {
- url: url.to_owned(),
- send_to_socket,
- stop_service,
- recv_from_socket,
- })
- }
- fn spawn_background_client<F>(
- send_to_socket: mpsc::Sender<Request>,
- mut receiver: mpsc::Receiver<Request>,
- url_str: &str,
- on_connection: Option<F>,
- ) -> Result<(broadcast::Receiver<(Response, String)>, oneshot::Sender<()>), Error>
- where
- F: (Fn(mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
- + Send
- + Sync
- + 'static,
- {
- let (publish_to_listener, recv_from_socket) = broadcast::channel(10_000);
- let (stop_service, mut stopper_recv) = oneshot::channel();
- let url = url_str.to_owned();
- let url_parsed = Url::parse(&url)?;
- tokio::spawn(async move {
- let mut reconnect = true;
- let mut retries = 0;
- while reconnect && retries <= MAX_RECONNECT_ATTEMPTS {
- log::warn!("{}: Connect attempt {}", url, retries);
- retries += 1;
- let mut socket = if let Ok(x) = connect_async(url_parsed.clone()).await {
- x.0
- } else {
- log::warn!("{}: Failed to connect", url);
- sleep(Duration::from_secs(5)).await;
- continue;
- };
- log::info!("Connected to {}", url);
- if let Some(on_connection) = &on_connection {
- on_connection(send_to_socket.clone()).await;
- }
- loop {
- tokio::select! {
- Ok(()) = &mut stopper_recv => {
- log::warn!("{}: Breaking client due external signal", url);
- reconnect = false;
- break;
- },
- Some(msg) = receiver.recv() => {
- if let Ok(json) = serde_json::to_string(&msg) {
- log::info!("{}: Sending {}", url, json);
- if let Err(x) = socket.send(Message::Text(json)).await {
- log::error!("{} : Reconnecting due {}", url, x);
- break;
- }
- }
- }
- msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
- let msg =if let Ok(Some(Ok(Message::Text(msg)))) = msg {
- msg
- } else {
- log::error!("{} Reconnecting client due of empty recv", url);
- break;
- };
- log::info!("New message: {}", msg);
- retries = 0;
- if msg.is_empty() {
- continue;
- }
- let msg: Result<Response, _> = serde_json::from_str(&msg);
- if let Ok(msg) = msg {
- if let Err(error) = publish_to_listener.send((msg, url.to_owned())) {
- log::error!("{}: Reconnecting client because of {}", url, error);
- break;
- }
- }
- }
- else => {
- log::warn!("{}: else", url);
- break;
- }
- }
- }
- }
- log::warn!("{}: Disconnected", url);
- });
- Ok((recv_from_socket, stop_service))
- }
- pub fn is_running(&self) -> bool {
- !self.stop_service.is_closed()
- }
- pub fn subscribe(&self) -> Option<broadcast::Receiver<(Response, String)>> {
- if self.stop_service.is_closed() {
- None
- } else {
- Some(self.recv_from_socket.resubscribe())
- }
- }
- pub async fn send(&self, request: Request) -> Result<(), Error> {
- Ok(self.send_to_socket.send(request).await?)
- }
- pub async fn stop(self) {
- let _ = self.stop_service.send(());
- }
- }
|