client.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. //! Client for the nostr relayer
  2. //!
  3. //! This is a simple client with reconnection logic built-in but no load balancing
  4. //! nor subscription.
  5. //!
  6. //! Most likely you want to use the `Pool` client instead of this one.
  7. use crate::{pool::DEFAULT_CHANNEL_BUFFER_SIZE, Error};
  8. use futures_util::{SinkExt, StreamExt};
  9. use nostr_rs_types::{
  10. client::{self, subscribe},
  11. types::SubscriptionId,
  12. Request, Response,
  13. };
  14. use std::{
  15. collections::HashMap,
  16. pin::Pin,
  17. sync::{
  18. atomic::{AtomicBool, Ordering::Relaxed},
  19. Arc,
  20. },
  21. };
  22. use tokio::{
  23. sync::{mpsc, RwLock},
  24. task::JoinHandle,
  25. time::{sleep, timeout, Duration},
  26. };
  27. use tokio_tungstenite::{connect_async, tungstenite::Message};
  28. use url::Url;
  29. type Subscriptions = Arc<RwLock<HashMap<SubscriptionId, subscribe::Subscribe>>>;
  30. const MAX_ACTIVE_SUBSCRIPTIONS: usize = 10;
  31. #[derive(Debug)]
  32. /// Active subscription
  33. ///
  34. /// This must be kept in scope to keep the subscription active
  35. pub struct ActiveSubscription {
  36. id: SubscriptionId,
  37. subscriptions: Subscriptions,
  38. send_to_socket: mpsc::Sender<Request>,
  39. }
  40. impl Drop for ActiveSubscription {
  41. fn drop(&mut self) {
  42. let subscriptions = self.subscriptions.clone();
  43. let id = self.id.clone();
  44. let send_to_socket = self.send_to_socket.clone();
  45. tokio::spawn(async move {
  46. let _ = send_to_socket
  47. .send(nostr_rs_types::client::Close(id.clone()).into())
  48. .await;
  49. subscriptions.write().await.remove(&id);
  50. });
  51. }
  52. }
  53. /// Relayer object
  54. #[derive(Debug)]
  55. pub struct Client {
  56. /// URL of the relayer
  57. pub url: Url,
  58. /// Sender to the relayer. This can be used to send a Requests to this
  59. /// relayer
  60. pub send_to_socket: mpsc::Sender<Request>,
  61. /// List of active subscriptions for this nostr client
  62. subscriptions: Subscriptions,
  63. /// Background task / thread that is doing the actual connection
  64. worker: JoinHandle<()>,
  65. /// Wether the background worker is connected or not
  66. is_connected: Arc<AtomicBool>,
  67. }
  68. const NO_ACTIVITY_TIMEOUT_SECS: u64 = 120;
  69. impl Drop for Client {
  70. fn drop(&mut self) {
  71. self.worker.abort()
  72. }
  73. }
  74. impl Client {
  75. /// Creates a new relayer
  76. pub fn new<F>(return_to: mpsc::Sender<(Response, Url)>, url: Url, filter: F) -> Self
  77. where
  78. F: Fn(
  79. Response,
  80. Url,
  81. mpsc::Sender<(Response, Url)>,
  82. ) -> Pin<Box<dyn futures::Future<Output = Result<(), Error>> + Send>>
  83. + Send
  84. + Sync
  85. + 'static,
  86. {
  87. let (sender_to_socket, send_to_socket) = mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);
  88. let is_connected = Arc::new(AtomicBool::new(false));
  89. let subscriptions = Arc::new(RwLock::new(HashMap::new()));
  90. let worker = Self::spawn_background_client(
  91. return_to,
  92. send_to_socket,
  93. url.clone(),
  94. is_connected.clone(),
  95. subscriptions.clone(),
  96. filter,
  97. );
  98. Self {
  99. url,
  100. is_connected,
  101. send_to_socket: sender_to_socket,
  102. subscriptions,
  103. worker,
  104. }
  105. }
  106. /// Spawns a background client that connects to the relayer
  107. /// and sends messages to the listener
  108. ///
  109. /// This function will return a JoinHandle that can be used to
  110. /// wait for the background client to finish or to cancel it.
  111. fn spawn_background_client<F>(
  112. return_to: mpsc::Sender<(Response, Url)>,
  113. mut send_to_socket: mpsc::Receiver<Request>,
  114. url: Url,
  115. is_connected: Arc<AtomicBool>,
  116. to_resubscribe: Subscriptions,
  117. filter: F,
  118. ) -> JoinHandle<()>
  119. where
  120. F: Fn(
  121. Response,
  122. Url,
  123. mpsc::Sender<(Response, Url)>,
  124. ) -> Pin<Box<dyn futures::Future<Output = Result<(), Error>> + Send>>
  125. + Send
  126. + Sync
  127. + 'static,
  128. {
  129. is_connected.store(false, Relaxed);
  130. tokio::spawn(async move {
  131. let mut connection_attempts = 0;
  132. loop {
  133. log::info!("{}: Connect attempt {}", url, connection_attempts);
  134. connection_attempts += 1;
  135. let mut socket = match connect_async(url.clone()).await {
  136. Ok(x) => x.0,
  137. Err(err) => {
  138. log::warn!("{}: Failed to connect: {}", url, err);
  139. sleep(Duration::from_secs(1)).await;
  140. continue;
  141. }
  142. };
  143. log::info!("Connected to {}", url);
  144. connection_attempts = 0;
  145. // Convert all sent subscriptions to a local vector
  146. let mut subscriptions = to_resubscribe
  147. .read()
  148. .await
  149. .values()
  150. .map(|msg| Request::Request(msg.clone()))
  151. .collect::<Vec<_>>();
  152. // Only keep the ones to be subscribed, moved the rest of the subscriptions to the queue
  153. let mut to_subscribe_queue = if subscriptions.len() > MAX_ACTIVE_SUBSCRIPTIONS {
  154. subscriptions.split_off(MAX_ACTIVE_SUBSCRIPTIONS)
  155. } else {
  156. vec![]
  157. };
  158. let mut subscriptions = subscriptions
  159. .into_iter()
  160. .map(|msg| {
  161. (
  162. msg.as_request()
  163. .map(|x| x.subscription_id.clone())
  164. .unwrap_or_default(),
  165. serde_json::to_string(&msg).ok().map(Message::Text),
  166. )
  167. })
  168. .collect::<HashMap<_, _>>();
  169. for msg in subscriptions.values_mut() {
  170. if let Some(msg) = msg.take() {
  171. if let Err(x) = socket.send(msg).await {
  172. log::error!("{}: Reconnecting due error at sending: {:?}", url, x);
  173. break;
  174. }
  175. }
  176. }
  177. is_connected.store(true, Relaxed);
  178. loop {
  179. tokio::select! {
  180. Some(msg) = send_to_socket.recv() => {
  181. if let Request::Request(sub) = &msg {
  182. if subscriptions.contains_key(&sub.subscription_id) {
  183. log::warn!("{}: Already subscribed to {}", url, sub.subscription_id);
  184. continue;
  185. }
  186. if subscriptions.len() > MAX_ACTIVE_SUBSCRIPTIONS {
  187. log::warn!("{}: Queueing subscription to {} for later", url, sub.subscription_id);
  188. to_subscribe_queue.push(msg.clone());
  189. continue;
  190. }
  191. subscriptions.insert(sub.subscription_id.clone(), None);
  192. }
  193. let json = if let Ok(json) = serde_json::to_string(&msg) {
  194. json
  195. } else {
  196. continue;
  197. };
  198. if let Err(x) = socket.send(Message::Text(json)).await {
  199. log::error!("{} : Reconnecting due {}", url, x);
  200. break;
  201. }
  202. if let Request::Close(close) = &msg {
  203. subscriptions.remove(&close.0);
  204. let json = if let Some(json) = to_subscribe_queue
  205. .pop()
  206. .and_then(|msg| {
  207. subscriptions.insert(msg.as_request().map(|sub| sub.subscription_id.clone()).unwrap_or_default(), None);
  208. serde_json::to_string(&msg).ok()
  209. })
  210. {
  211. json
  212. } else {
  213. continue;
  214. };
  215. log::info!("Sending: {} (queued subscription)", json);
  216. if let Err(x) = socket.send(Message::Text(json)).await {
  217. log::error!("{} : Reconnecting due {}", url, x);
  218. break;
  219. }
  220. }
  221. }
  222. msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
  223. let msg = if let Ok(Some(Ok(msg))) = msg {
  224. match msg {
  225. Message::Text(text) => text,
  226. Message::Ping(msg) => {
  227. if let Err(x) = socket.send(Message::Pong(msg)).await {
  228. log::error!("{} : Reconnecting due error at sending Pong: {:?}", url, x);
  229. break;
  230. }
  231. continue;
  232. },
  233. msg => {
  234. log::error!("Unexpected {:?}", msg);
  235. continue;
  236. }
  237. }
  238. } else {
  239. log::error!("{} Reconnecting client due of empty recv: {:?}", url, msg);
  240. break;
  241. };
  242. if msg.is_empty() {
  243. continue;
  244. }
  245. let event: Result<Response, _> = serde_json::from_str(&msg);
  246. if let Ok(Response::Notice(err)) = &event {
  247. log::error!("{}: Active connections {}: {:?}", url, subscriptions.len(), err);
  248. }
  249. if let Ok(msg) = event {
  250. if let Err(error) = filter(msg, url.clone(), return_to.clone()).await {
  251. log::error!("{}: Reconnecting client because of {}", url, error);
  252. break;
  253. }
  254. } else {
  255. log::error!("Failed to parse message: {:?} {}", event, msg);
  256. }
  257. }
  258. else => {
  259. log::warn!("{}: else", url);
  260. break;
  261. }
  262. }
  263. }
  264. is_connected.store(false, Relaxed);
  265. // Throttle down to not spam the server with reconnections
  266. sleep(Duration::from_millis(500)).await;
  267. }
  268. })
  269. }
  270. /// Checks if the relayer is connected. It is guaranteed that the relayer is
  271. /// connected if this method returns true.
  272. pub fn is_connected(&self) -> bool {
  273. self.is_connected.load(Relaxed)
  274. }
  275. /// Creates a new subscription
  276. pub async fn subscribe(
  277. &self,
  278. subscription: subscribe::Subscribe,
  279. ) -> Result<ActiveSubscription, Error> {
  280. let id = subscription.subscription_id.clone();
  281. self.subscriptions
  282. .write()
  283. .await
  284. .insert(id.clone(), subscription.clone());
  285. self.send_to_socket
  286. .send(Request::Request(subscription))
  287. .await
  288. .map_err(|e| Error::Sync(Box::new(e)))?;
  289. Ok(ActiveSubscription {
  290. id,
  291. subscriptions: self.subscriptions.clone(),
  292. send_to_socket: self.send_to_socket.clone(),
  293. })
  294. }
  295. /// Posts an event to the relayer
  296. pub async fn post(&self, event: client::Event) -> Result<(), Error> {
  297. self.send_to_socket
  298. .send(event.into())
  299. .await
  300. .map_err(|e| Error::Sync(Box::new(e)))
  301. }
  302. }