client.rs 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. use crate::Error;
  2. use futures::executor::block_on;
  3. use futures_util::{SinkExt, StreamExt};
  4. use nostr_rs_types::{
  5. client::{self, subscribe},
  6. types::SubscriptionId,
  7. Request, Response,
  8. };
  9. use std::{
  10. collections::HashMap,
  11. sync::{
  12. atomic::{AtomicBool, Ordering::Relaxed},
  13. Arc,
  14. },
  15. };
  16. use tokio::{
  17. sync::{mpsc, RwLock},
  18. task::JoinHandle,
  19. time::{sleep, timeout, Duration},
  20. };
  21. use tokio_tungstenite::{connect_async, tungstenite::Message};
  22. use url::Url;
  23. type Subscriptions = Arc<RwLock<HashMap<SubscriptionId, subscribe::Subscribe>>>;
  24. #[derive(Debug)]
  25. pub struct ActiveSubscription {
  26. id: SubscriptionId,
  27. subscriptions: Subscriptions,
  28. send_to_socket: mpsc::Sender<Request>,
  29. }
  30. impl Drop for ActiveSubscription {
  31. fn drop(&mut self) {
  32. block_on(async move {
  33. self.subscriptions.write().await.remove(&self.id);
  34. let _ = self
  35. .send_to_socket
  36. .send(nostr_rs_types::client::Close(self.id.clone()).into())
  37. .await;
  38. });
  39. }
  40. }
  41. /// Relayer object
  42. #[derive(Debug)]
  43. pub struct Client {
  44. /// URL of the relayer
  45. pub url: Url,
  46. /// Sender to the relayer. This can be used to send a Requests to this
  47. /// relayer
  48. pub send_to_socket: mpsc::Sender<Request>,
  49. /// List of active subscriptions for this nostr client
  50. subscriptions: Subscriptions,
  51. /// Background task / thread that is doing the actual connection
  52. worker: JoinHandle<()>,
  53. /// Wether the background worker is connected or not
  54. is_connected: Arc<AtomicBool>,
  55. }
  56. const NO_ACTIVITY_TIMEOUT_SECS: u64 = 120;
  57. impl Drop for Client {
  58. fn drop(&mut self) {
  59. self.worker.abort()
  60. }
  61. }
  62. impl Client {
  63. /// Creates a new relayer
  64. pub fn new(send_message_to_listener: mpsc::Sender<(Response, Url)>, url: Url) -> Self {
  65. let (sender_to_socket, send_to_socket) = mpsc::channel(100_000);
  66. let is_connected = Arc::new(AtomicBool::new(false));
  67. let subscriptions = Arc::new(RwLock::new(HashMap::new()));
  68. let worker = Self::spawn_background_client(
  69. send_message_to_listener,
  70. send_to_socket,
  71. url.clone(),
  72. is_connected.clone(),
  73. subscriptions.clone(),
  74. );
  75. Self {
  76. url,
  77. is_connected,
  78. send_to_socket: sender_to_socket,
  79. subscriptions,
  80. worker,
  81. }
  82. }
  83. /// Spawns a background client that connects to the relayer
  84. /// and sends messages to the listener
  85. ///
  86. /// This function will return a JoinHandle that can be used to
  87. /// wait for the background client to finish or to cancel it.
  88. fn spawn_background_client(
  89. send_message_to_listener: mpsc::Sender<(Response, Url)>,
  90. mut send_to_socket: mpsc::Receiver<Request>,
  91. url: Url,
  92. is_connected: Arc<AtomicBool>,
  93. send_on_connection: Subscriptions,
  94. ) -> JoinHandle<()> {
  95. is_connected.store(false, Relaxed);
  96. tokio::spawn(async move {
  97. let mut connection_attempts = 0;
  98. loop {
  99. log::warn!("{}: Connect attempt {}", url, connection_attempts);
  100. connection_attempts += 1;
  101. let mut socket = match connect_async(url.clone()).await {
  102. Ok(x) => x.0,
  103. Err(err) => {
  104. log::warn!("{}: Failed to connect: {}", url, err);
  105. sleep(Duration::from_secs(5)).await;
  106. continue;
  107. }
  108. };
  109. log::info!("Connected to {}", url);
  110. connection_attempts = 0;
  111. let subscriptions = send_on_connection
  112. .read()
  113. .await
  114. .iter()
  115. .filter_map(|x| serde_json::to_string(&Request::Request(x.1.clone())).ok())
  116. .map(Message::Text)
  117. .collect::<Vec<_>>();
  118. for msg in subscriptions {
  119. if let Err(x) = socket.send(msg).await {
  120. log::error!("{}: Reconnecting due error at sending: {:?}", url, x);
  121. }
  122. }
  123. loop {
  124. tokio::select! {
  125. Some(msg) = send_to_socket.recv() => {
  126. if let Ok(json) = serde_json::to_string(&msg) {
  127. log::info!("{}: Sending {}", url, json);
  128. if let Err(x) = socket.send(Message::Text(json)).await {
  129. log::error!("{} : Reconnecting due {}", url, x);
  130. break;
  131. }
  132. }
  133. }
  134. msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
  135. let msg = if let Ok(Some(Ok(msg))) = msg {
  136. is_connected.store(true, Relaxed);
  137. match msg {
  138. Message::Text(text) => text,
  139. Message::Ping(msg) => {
  140. if let Err(x) = socket.send(Message::Pong(msg)).await {
  141. log::error!("{} : Reconnecting due error at sending Pong: {:?}", url, x);
  142. break;
  143. }
  144. continue;
  145. },
  146. msg => {
  147. log::error!("Unexpected {:?}", msg);
  148. continue;
  149. }
  150. }
  151. } else {
  152. log::error!("{} Reconnecting client due of empty recv: {:?}", url, msg);
  153. break;
  154. };
  155. if msg.is_empty() {
  156. continue;
  157. }
  158. log::info!("New message: {}", msg);
  159. let msg: Result<Response, _> = serde_json::from_str(&msg);
  160. if let Ok(msg) = msg {
  161. if let Err(error) = send_message_to_listener.try_send((msg.into(), url.clone())) {
  162. log::error!("{}: Reconnecting client because of {}", url, error);
  163. break;
  164. }
  165. }
  166. }
  167. else => {
  168. log::warn!("{}: else", url);
  169. break;
  170. }
  171. }
  172. }
  173. is_connected.store(false, Relaxed);
  174. // Throttle down to not spam the server with reconnections
  175. sleep(Duration::from_millis(500)).await;
  176. }
  177. })
  178. }
  179. /// Checks if the relayer is connected. It is guaranteed that the relayer is
  180. /// connected if this method returns true.
  181. pub fn is_connected(&self) -> bool {
  182. self.is_connected.load(Relaxed)
  183. }
  184. /// Creates a new subscription
  185. pub async fn subscribe(
  186. &self,
  187. subscription: subscribe::Subscribe,
  188. ) -> Result<ActiveSubscription, Error> {
  189. let id = subscription.subscription_id.clone();
  190. self.subscriptions
  191. .write()
  192. .await
  193. .insert(id.clone(), subscription.clone());
  194. self.send_to_socket
  195. .send(Request::Request(subscription))
  196. .await
  197. .map_err(|e| Error::Sync(Box::new(e)))?;
  198. Ok(ActiveSubscription {
  199. id,
  200. subscriptions: self.subscriptions.clone(),
  201. send_to_socket: self.send_to_socket.clone(),
  202. })
  203. }
  204. /// Posts an event to the relayer
  205. pub async fn post(&self, event: client::Event) -> Result<(), Error> {
  206. self.send_to_socket
  207. .send(event.into())
  208. .await
  209. .map_err(|e| Error::Sync(Box::new(e)))
  210. }
  211. }