|
@@ -0,0 +1,235 @@
|
|
|
+use crate::Error;
|
|
|
+use futures::executor::block_on;
|
|
|
+use futures_util::{SinkExt, StreamExt};
|
|
|
+use nostr_rs_types::{
|
|
|
+ client::{self, subscribe},
|
|
|
+ types::SubscriptionId,
|
|
|
+ Request, Response,
|
|
|
+};
|
|
|
+use std::{
|
|
|
+ collections::HashMap,
|
|
|
+ sync::{
|
|
|
+ atomic::{AtomicBool, Ordering::Relaxed},
|
|
|
+ Arc,
|
|
|
+ },
|
|
|
+};
|
|
|
+use tokio::{
|
|
|
+ sync::{mpsc, RwLock},
|
|
|
+ task::JoinHandle,
|
|
|
+ time::{sleep, timeout, Duration},
|
|
|
+};
|
|
|
+use tokio_tungstenite::{connect_async, tungstenite::Message};
|
|
|
+use url::Url;
|
|
|
+
|
|
|
+type Subscriptions = Arc<RwLock<HashMap<SubscriptionId, subscribe::Subscribe>>>;
|
|
|
+
|
|
|
+#[derive(Debug)]
|
|
|
+pub struct ActiveSubscription {
|
|
|
+ id: SubscriptionId,
|
|
|
+ subscriptions: Subscriptions,
|
|
|
+ send_to_socket: mpsc::Sender<Request>,
|
|
|
+}
|
|
|
+
|
|
|
+impl Drop for ActiveSubscription {
|
|
|
+ fn drop(&mut self) {
|
|
|
+ block_on(async move {
|
|
|
+ self.subscriptions.write().await.remove(&self.id);
|
|
|
+ let _ = self
|
|
|
+ .send_to_socket
|
|
|
+ .send(nostr_rs_types::client::Close(self.id.clone()).into())
|
|
|
+ .await;
|
|
|
+ });
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/// Relayer object
|
|
|
+#[derive(Debug)]
|
|
|
+pub struct Client {
|
|
|
+ /// URL of the relayer
|
|
|
+ pub url: Url,
|
|
|
+ /// Sender to the relayer. This can be used to send a Requests to this
|
|
|
+ /// relayer
|
|
|
+ pub send_to_socket: mpsc::Sender<Request>,
|
|
|
+
|
|
|
+ subscriptions: Subscriptions,
|
|
|
+
|
|
|
+ worker: JoinHandle<()>,
|
|
|
+
|
|
|
+ is_connected: Arc<AtomicBool>,
|
|
|
+}
|
|
|
+
|
|
|
+const NO_ACTIVITY_TIMEOUT_SECS: u64 = 120;
|
|
|
+
|
|
|
+impl Drop for Client {
|
|
|
+ fn drop(&mut self) {
|
|
|
+ self.worker.abort()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl Client {
|
|
|
+ /// Creates a new relayer
|
|
|
+ pub fn new(broadcast_to_listeners: mpsc::Sender<(Response, Url)>, url: Url) -> Self {
|
|
|
+ let (sender_to_socket, send_to_socket) = mpsc::channel(100_000);
|
|
|
+ let is_connected = Arc::new(AtomicBool::new(false));
|
|
|
+
|
|
|
+ let subscriptions = Arc::new(RwLock::new(HashMap::new()));
|
|
|
+
|
|
|
+ let worker = Self::spawn_background_client(
|
|
|
+ broadcast_to_listeners,
|
|
|
+ send_to_socket,
|
|
|
+ url.clone(),
|
|
|
+ is_connected.clone(),
|
|
|
+ subscriptions.clone(),
|
|
|
+ );
|
|
|
+
|
|
|
+ Self {
|
|
|
+ url,
|
|
|
+ is_connected,
|
|
|
+ send_to_socket: sender_to_socket,
|
|
|
+ subscriptions,
|
|
|
+ worker,
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ fn spawn_background_client(
|
|
|
+ broadcast_to_listeners: mpsc::Sender<(Response, Url)>,
|
|
|
+ mut send_to_socket: mpsc::Receiver<Request>,
|
|
|
+ url: Url,
|
|
|
+ is_connected: Arc<AtomicBool>,
|
|
|
+ send_on_connection: Subscriptions,
|
|
|
+ ) -> JoinHandle<()> {
|
|
|
+ is_connected.store(false, Relaxed);
|
|
|
+
|
|
|
+ tokio::spawn(async move {
|
|
|
+ let mut connection_attempts = 0;
|
|
|
+
|
|
|
+ loop {
|
|
|
+ log::warn!("{}: Connect attempt {}", url, connection_attempts);
|
|
|
+ connection_attempts += 1;
|
|
|
+ let mut socket = match connect_async(url.clone()).await {
|
|
|
+ Ok(x) => x.0,
|
|
|
+ Err(err) => {
|
|
|
+ log::warn!("{}: Failed to connect: {}", url, err);
|
|
|
+ sleep(Duration::from_secs(5)).await;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ log::info!("Connected to {}", url);
|
|
|
+ connection_attempts = 0;
|
|
|
+
|
|
|
+ let subscriptions = send_on_connection
|
|
|
+ .read()
|
|
|
+ .await
|
|
|
+ .iter()
|
|
|
+ .filter_map(|x| serde_json::to_string(&Request::Request(x.1.clone())).ok())
|
|
|
+ .map(Message::Text)
|
|
|
+ .collect::<Vec<_>>();
|
|
|
+
|
|
|
+ for msg in subscriptions {
|
|
|
+ if let Err(x) = socket.send(msg).await {
|
|
|
+ log::error!("{}: Reconnecting due error at sending: {:?}", url, x);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ loop {
|
|
|
+ tokio::select! {
|
|
|
+ Some(msg) = send_to_socket.recv() => {
|
|
|
+ if let Ok(json) = serde_json::to_string(&msg) {
|
|
|
+ log::info!("{}: Sending {}", url, json);
|
|
|
+ if let Err(x) = socket.send(Message::Text(json)).await {
|
|
|
+ log::error!("{} : Reconnecting due {}", url, x);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
|
|
|
+ let msg = if let Ok(Some(Ok(msg))) = msg {
|
|
|
+ is_connected.store(true, Relaxed);
|
|
|
+ match msg {
|
|
|
+ Message::Text(text) => text,
|
|
|
+ Message::Ping(msg) => {
|
|
|
+ if let Err(x) = socket.send(Message::Pong(msg)).await {
|
|
|
+ log::error!("{} : Reconnecting due error at sending Pong: {:?}", url, x);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ continue;
|
|
|
+ },
|
|
|
+ msg => {
|
|
|
+ log::error!("Unexpected {:?}", msg);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log::error!("{} Reconnecting client due of empty recv: {:?}", url, msg);
|
|
|
+ break;
|
|
|
+ };
|
|
|
+
|
|
|
+ if msg.is_empty() {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ log::info!("New message: {}", msg);
|
|
|
+
|
|
|
+
|
|
|
+ let msg: Result<Response, _> = serde_json::from_str(&msg);
|
|
|
+
|
|
|
+ if let Ok(msg) = msg {
|
|
|
+ if let Err(error) = broadcast_to_listeners.try_send((msg.into(), url.clone())) {
|
|
|
+ log::error!("{}: Reconnecting client because of {}", url, error);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else => {
|
|
|
+ log::warn!("{}: else", url);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ is_connected.store(false, Relaxed);
|
|
|
+ // Throttle down to not spam the server with reconnections
|
|
|
+ sleep(Duration::from_millis(500)).await;
|
|
|
+ }
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Checks if the relayer is connected. It is guaranteed that the relayer is
|
|
|
+ /// connected if this method returns true.
|
|
|
+ pub fn is_connected(&self) -> bool {
|
|
|
+ self.is_connected.load(Relaxed)
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Creates a new subscription
|
|
|
+ pub async fn subscribe(
|
|
|
+ &self,
|
|
|
+ subscription: subscribe::Subscribe,
|
|
|
+ ) -> Result<ActiveSubscription, Error> {
|
|
|
+ let id = subscription.subscription_id.clone();
|
|
|
+
|
|
|
+ self.subscriptions
|
|
|
+ .write()
|
|
|
+ .await
|
|
|
+ .insert(id.clone(), subscription.clone());
|
|
|
+
|
|
|
+ self.send_to_socket
|
|
|
+ .send(Request::Request(subscription))
|
|
|
+ .await
|
|
|
+ .map_err(|e| Error::Sync(Box::new(e)))?;
|
|
|
+
|
|
|
+ Ok(ActiveSubscription {
|
|
|
+ id,
|
|
|
+ subscriptions: self.subscriptions.clone(),
|
|
|
+ send_to_socket: self.send_to_socket.clone(),
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Posts an event to the relayer
|
|
|
+ pub async fn post(&self, event: client::Event) -> Result<(), Error> {
|
|
|
+ self.send_to_socket
|
|
|
+ .send(event.into())
|
|
|
+ .await
|
|
|
+ .map_err(|e| Error::Sync(Box::new(e)))
|
|
|
+ }
|
|
|
+}
|