client.rs 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. use crate::Error;
  2. use futures::Future;
  3. use futures_util::{SinkExt, StreamExt};
  4. use nostr_rs_types::{Request, Response};
  5. use std::pin::Pin;
  6. use tokio::{
  7. sync::{broadcast, mpsc, oneshot},
  8. time::{sleep, timeout, Duration},
  9. };
  10. use tokio_tungstenite::{connect_async, tungstenite::Message};
  11. use url::Url;
  12. #[derive(Debug)]
  13. pub struct Client {
  14. pub url: String,
  15. pub send_to_socket: mpsc::Sender<Request>,
  16. recv_from_socket: broadcast::Receiver<(Response, String)>,
  17. stop_service: oneshot::Sender<()>,
  18. }
  /// Seconds the background task waits for the next socket message before it
  /// treats the connection as dead and reconnects.
  const NO_ACTIVITY_TIMEOUT_SECS: u64 = 30;

  /// Bound compared against the retry counter of the connect loop. The counter
  /// starts at zero, is compared with `<=`, and is reset whenever a text
  /// message arrives, so up to `MAX_RECONNECT_ATTEMPTS + 1` consecutive
  /// connection attempts are made.
  const MAX_RECONNECT_ATTEMPTS: u64 = 15;
  21. impl Client {
  22. pub fn new<F>(url: &str, on_connection: Option<F>) -> Result<Self, Error>
  23. where
  24. F: (Fn(mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
  25. + Send
  26. + Sync
  27. + 'static,
  28. {
  29. let (send_to_socket, receiver) = mpsc::channel(100_000);
  30. let (recv_from_socket, stop_service) =
  31. Self::spawn_background_client(send_to_socket.clone(), receiver, url, on_connection)?;
  32. Ok(Self {
  33. url: url.to_owned(),
  34. send_to_socket,
  35. stop_service,
  36. recv_from_socket,
  37. })
  38. }
  39. fn spawn_background_client<F>(
  40. send_to_socket: mpsc::Sender<Request>,
  41. mut receiver: mpsc::Receiver<Request>,
  42. url_str: &str,
  43. on_connection: Option<F>,
  44. ) -> Result<(broadcast::Receiver<(Response, String)>, oneshot::Sender<()>), Error>
  45. where
  46. F: (Fn(mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
  47. + Send
  48. + Sync
  49. + 'static,
  50. {
  51. let (publish_to_listener, recv_from_socket) = broadcast::channel(10_000);
  52. let (stop_service, mut stopper_recv) = oneshot::channel();
  53. let url = url_str.to_owned();
  54. let url_parsed = Url::parse(&url)?;
  55. tokio::spawn(async move {
  56. let mut reconnect = true;
  57. let mut retries = 0;
  58. while reconnect && retries <= MAX_RECONNECT_ATTEMPTS {
  59. log::warn!("{}: Connect attempt {}", url, retries);
  60. retries += 1;
  61. let mut socket = if let Ok(x) = connect_async(url_parsed.clone()).await {
  62. x.0
  63. } else {
  64. log::warn!("{}: Failed to connect", url);
  65. sleep(Duration::from_secs(5)).await;
  66. continue;
  67. };
  68. log::info!("Connected to {}", url);
  69. if let Some(on_connection) = &on_connection {
  70. on_connection(send_to_socket.clone()).await;
  71. }
  72. loop {
  73. tokio::select! {
  74. Ok(()) = &mut stopper_recv => {
  75. log::warn!("{}: Breaking client due external signal", url);
  76. reconnect = false;
  77. break;
  78. },
  79. Some(msg) = receiver.recv() => {
  80. if let Ok(json) = serde_json::to_string(&msg) {
  81. log::info!("{}: Sending {}", url, json);
  82. if let Err(x) = socket.send(Message::Text(json)).await {
  83. log::error!("{} : Reconnecting due {}", url, x);
  84. break;
  85. }
  86. }
  87. }
  88. msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
  89. let msg =if let Ok(Some(Ok(Message::Text(msg)))) = msg {
  90. msg
  91. } else {
  92. log::error!("{} Reconnecting client due of empty recv", url);
  93. break;
  94. };
  95. log::info!("New message: {}", msg);
  96. retries = 0;
  97. if msg.is_empty() {
  98. continue;
  99. }
  100. let msg: Result<Response, _> = serde_json::from_str(&msg);
  101. if let Ok(msg) = msg {
  102. if let Err(error) = publish_to_listener.send((msg, url.to_owned())) {
  103. log::error!("{}: Reconnecting client because of {}", url, error);
  104. break;
  105. }
  106. }
  107. }
  108. else => {
  109. log::warn!("{}: else", url);
  110. break;
  111. }
  112. }
  113. }
  114. }
  115. log::warn!("{}: Disconnected", url);
  116. });
  117. Ok((recv_from_socket, stop_service))
  118. }
  119. pub fn is_running(&self) -> bool {
  120. !self.stop_service.is_closed()
  121. }
  122. pub fn subscribe(&self) -> Option<broadcast::Receiver<(Response, String)>> {
  123. if self.stop_service.is_closed() {
  124. None
  125. } else {
  126. Some(self.recv_from_socket.resubscribe())
  127. }
  128. }
  129. pub async fn send(&self, request: Request) -> Result<(), Error> {
  130. Ok(self.send_to_socket.send(request).await?)
  131. }
  132. pub async fn stop(self) {
  133. let _ = self.stop_service.send(());
  134. }
  135. }