|
@@ -0,0 +1,162 @@
|
|
|
+use futures_util::{SinkExt, StreamExt};
|
|
|
+use nostr_rs_types::{Request, Response};
|
|
|
+use parking_lot::RwLock;
|
|
|
+use std::{collections::HashMap, sync::Arc};
|
|
|
+use tokio::sync::{
|
|
|
+ broadcast,
|
|
|
+ mpsc::{self, error::SendError},
|
|
|
+ oneshot,
|
|
|
+};
|
|
|
+use tokio_tungstenite::{
|
|
|
+ connect_async, tungstenite::error::Error as TungsteniteError, tungstenite::Message,
|
|
|
+};
|
|
|
+use url::Url;
|
|
|
+
|
|
|
+#[derive(thiserror::Error, Debug)]
|
|
|
+pub enum Error {
|
|
|
+ #[error("Url: {0}")]
|
|
|
+ Url(#[from] url::ParseError),
|
|
|
+
|
|
|
+ #[error("Tungstenite: {0}")]
|
|
|
+ Tungstenite(#[from] TungsteniteError),
|
|
|
+
|
|
|
+ #[error("Sync: {0}")]
|
|
|
+ Sync(#[from] SendError<Request>),
|
|
|
+}
|
|
|
+
|
|
|
+#[derive(Debug)]
|
|
|
+pub struct Client {
|
|
|
+ pub url: String,
|
|
|
+
|
|
|
+ pub sender: mpsc::Sender<Request>,
|
|
|
+
|
|
|
+ receiver: broadcast::Receiver<Response>,
|
|
|
+ stopper: oneshot::Sender<()>,
|
|
|
+}
|
|
|
+
|
|
|
+impl Client {
|
|
|
+ pub fn new(url: &str) -> Result<Self, Error> {
|
|
|
+ let (sender, receiver) = mpsc::channel(10_000);
|
|
|
+ let (receiver, stopper) = Self::spawn(receiver, url)?;
|
|
|
+
|
|
|
+ Ok(Self {
|
|
|
+ url: url.to_owned(),
|
|
|
+ sender,
|
|
|
+ stopper,
|
|
|
+ receiver,
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ fn spawn(
|
|
|
+ mut receiver: mpsc::Receiver<Request>,
|
|
|
+ url: &str,
|
|
|
+ ) -> Result<(broadcast::Receiver<Response>, oneshot::Sender<()>), Error> {
|
|
|
+ let url = Url::parse(url)?;
|
|
|
+ let (response_sender, response_receiver) = broadcast::channel(10_000);
|
|
|
+ let (stopper_sender, mut stopper_recv) = oneshot::channel();
|
|
|
+
|
|
|
+ tokio::spawn(async move {
|
|
|
+ let (mut socket, _) = connect_async(url).await.expect("valid connection");
|
|
|
+ loop {
|
|
|
+ tokio::select! {
|
|
|
+ Ok(()) = &mut stopper_recv => {
|
|
|
+ println!("Breaking client");
|
|
|
+ break;
|
|
|
+ },
|
|
|
+ Some(msg) = receiver.recv() => {
|
|
|
+ if let Ok(json) = serde_json::to_string(&msg) {
|
|
|
+ socket.send(Message::Text(json)).await.unwrap();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Some(Ok(msg)) = socket.next() => {
|
|
|
+ let msg =if let Ok(msg) = msg.into_text() {
|
|
|
+ msg
|
|
|
+ } else {
|
|
|
+ continue;
|
|
|
+ };
|
|
|
+
|
|
|
+ if msg.is_empty() {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ let msg: Result<Response, _> = serde_json::from_str(&msg);
|
|
|
+
|
|
|
+ if let Ok(msg) = msg {
|
|
|
+ if let Err(error) = response_sender.send(msg) {
|
|
|
+ println!("Disconnecting client because of {}", error);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ Ok((response_receiver, stopper_sender))
|
|
|
+ }
|
|
|
+
|
|
|
+ pub fn subscribe(&self) -> broadcast::Receiver<Response> {
|
|
|
+ self.receiver.resubscribe()
|
|
|
+ }
|
|
|
+
|
|
|
+ pub async fn send(&self, request: Request) -> Result<(), Error> {
|
|
|
+ Ok(self.sender.send(request).await?)
|
|
|
+ }
|
|
|
+
|
|
|
+ pub async fn stop(self) {
|
|
|
+ let _ = self.stopper.send(());
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+#[derive(Debug, Clone)]
|
|
|
+pub struct Clients {
|
|
|
+ clients: Arc<RwLock<HashMap<String, Client>>>,
|
|
|
+ subscriptions: Arc<RwLock<Vec<broadcast::Receiver<Response>>>>,
|
|
|
+}
|
|
|
+
|
|
|
+impl Default for Clients {
|
|
|
+ fn default() -> Self {
|
|
|
+ Self {
|
|
|
+ clients: Arc::new(RwLock::new(HashMap::new())),
|
|
|
+ subscriptions: Arc::new(RwLock::new(vec![])),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl Clients {
|
|
|
+ pub fn try_recv(&self) -> Option<Response> {
|
|
|
+ let mut subscriptions = self.subscriptions.write();
|
|
|
+ for sub in subscriptions.iter_mut() {
|
|
|
+ if let Ok(msg) = sub.try_recv() {
|
|
|
+ return Some(msg);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ None
|
|
|
+ }
|
|
|
+
|
|
|
+ pub async fn send(&self, request: Request) {
|
|
|
+ let senders = self
|
|
|
+ .clients
|
|
|
+ .read()
|
|
|
+ .iter()
|
|
|
+ .map(|(_, c)| c.sender.clone())
|
|
|
+ .collect::<Vec<mpsc::Sender<_>>>();
|
|
|
+
|
|
|
+ for sender in senders.iter() {
|
|
|
+ let _ = sender.send(request.clone()).await;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ pub async fn connect_to(&self, url: &str) -> Result<bool, Error> {
|
|
|
+ let mut clients = self.clients.write();
|
|
|
+ Ok(if clients.get(url).is_some() {
|
|
|
+ false
|
|
|
+ } else {
|
|
|
+ let client = Client::new(url)?;
|
|
|
+ let mut subscriptions = self.subscriptions.write();
|
|
|
+ subscriptions.push(client.subscribe());
|
|
|
+ clients.insert(url.to_owned(), client);
|
|
|
+ true
|
|
|
+ })
|
|
|
+ }
|
|
|
+}
|