浏览代码

Better usage

Cesar Rodas 1 年之前
父节点
当前提交
9857021e12
共有 8 个文件被更改,包括 317 次插入62 次删除
  1. 162 0
      Cargo.lock
  2. 1 0
      Cargo.toml
  3. 6 0
      crates/client/src/error.rs
  4. 13 3
      crates/client/src/lib.rs
  5. 24 8
      crates/client/src/relayer.rs
  6. 53 11
      crates/client/src/relayers.rs
  7. 53 0
      src/bin/dump.rs
  8. 5 40
      src/main.rs

+ 162 - 0
Cargo.lock

@@ -435,6 +435,31 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
 
 [[package]]
+name = "h2"
+version = "0.3.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5be7b54589b581f624f566bf5d8eb2bab1db736c51528720b6bd36b96b55924d"
+dependencies = [
+ "bytes",
+ "fnv",
+ "futures-core",
+ "futures-sink",
+ "futures-util",
+ "http",
+ "indexmap",
+ "slab",
+ "tokio",
+ "tokio-util",
+ "tracing",
+]
+
+[[package]]
+name = "hashbrown"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
+
+[[package]]
 name = "hermit-abi"
 version = "0.2.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -467,18 +492,73 @@ dependencies = [
 ]
 
 [[package]]
+name = "http-body"
+version = "0.4.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
+dependencies = [
+ "bytes",
+ "http",
+ "pin-project-lite",
+]
+
+[[package]]
 name = "httparse"
 version = "1.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904"
 
 [[package]]
+name = "httpdate"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
+
+[[package]]
 name = "humantime"
 version = "2.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
 
 [[package]]
+name = "hyper"
+version = "0.14.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cc5e554ff619822309ffd57d8734d77cd5ce6238bc956f037ea06c58238c9899"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-core",
+ "futures-util",
+ "h2",
+ "http",
+ "http-body",
+ "httparse",
+ "httpdate",
+ "itoa",
+ "pin-project-lite",
+ "socket2",
+ "tokio",
+ "tower-service",
+ "tracing",
+ "want",
+]
+
+[[package]]
+name = "hyper-rustls"
+version = "0.23.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c"
+dependencies = [
+ "http",
+ "hyper",
+ "rustls",
+ "rustls-native-certs",
+ "tokio",
+ "tokio-rustls",
+]
+
+[[package]]
 name = "iana-time-zone"
 version = "0.1.53"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -513,6 +593,31 @@ dependencies = [
 ]
 
 [[package]]
+name = "indexmap"
+version = "1.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
+dependencies = [
+ "autocfg",
+ "hashbrown",
+]
+
+[[package]]
+name = "instant-acme"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fbbf90cf8ba6f21d654be86375ee5e516ea38540a842b7259d3ddb848181691c"
+dependencies = [
+ "base64 0.21.0",
+ "hyper",
+ "hyper-rustls",
+ "ring",
+ "serde",
+ "serde_json",
+ "thiserror",
+]
+
+[[package]]
 name = "io-lifetimes"
 version = "1.0.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -698,6 +803,7 @@ dependencies = [
  "env_logger",
  "futures",
  "futures-util",
+ "instant-acme",
  "log",
  "nostr-rs-client",
  "nostr-rs-storage",
@@ -1317,6 +1423,52 @@ dependencies = [
 ]
 
 [[package]]
+name = "tokio-util"
+version = "0.7.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2"
+dependencies = [
+ "bytes",
+ "futures-core",
+ "futures-sink",
+ "pin-project-lite",
+ "tokio",
+ "tracing",
+]
+
+[[package]]
+name = "tower-service"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
+
+[[package]]
+name = "tracing"
+version = "0.1.37"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+dependencies = [
+ "cfg-if",
+ "pin-project-lite",
+ "tracing-core",
+]
+
+[[package]]
+name = "tracing-core"
+version = "0.1.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a"
+dependencies = [
+ "once_cell",
+]
+
+[[package]]
+name = "try-lock"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
+
+[[package]]
 name = "tungstenite"
 version = "0.18.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1406,6 +1558,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
 
 [[package]]
+name = "want"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
+dependencies = [
+ "log",
+ "try-lock",
+]
+
+[[package]]
 name = "wasi"
 version = "0.10.0+wasi-snapshot-preview1"
 source = "registry+https://github.com/rust-lang/crates.io-index"

+ 1 - 0
Cargo.toml

@@ -21,3 +21,4 @@ futures-util = "0.3.27"
 log = "0.4.17"
 thiserror = "1.0.40"
 futures = "0.3.28"
+instant-acme = "0.2.0"

+ 6 - 0
crates/client/src/error.rs

@@ -1,18 +1,24 @@
+//! Errors for this crate
 use nostr_rs_types::Request;
 use tokio::sync::mpsc::error::SendError;
 use tokio_tungstenite::tungstenite::error::Error as TungsteniteError;
 
+/// Crate errors
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
+    /// Error parsing the Url
     #[error("Url: {0}")]
     Url(#[from] url::ParseError),
 
+    /// WebSocket client error
     #[error("Tungstenite: {0}")]
     Tungstenite(#[from] TungsteniteError),
 
+    /// Internal error using the Sync channels
     #[error("Sync: {0}")]
     Sync(#[from] SendError<Request>),
 
+    /// The client has no connection to any relayer
     #[error("There is no connection")]
     Disconnected,
 }

+ 13 - 3
crates/client/src/lib.rs

@@ -1,5 +1,15 @@
-mod client;
-mod clients;
+//! # Nostr Client
+//!
+//! This crate will connect to other relayer or relayers and will subscribe to
+//! events or perform any operation that can be done in the protocol.
+//!
+//! The client will let you connect to a pool of relayers oferring a simple
+//! protocol to talk to all of them at the same time.
+//!
+//! It will also have reconnection logic built-in internally.
+#![deny(missing_docs, warnings)]
 mod error;
+mod relayer;
+mod relayers;
 
-pub use self::{client::Client, clients::Clients, error::Error};
+pub use self::{error::Error, relayer::Relayer, relayers::Relayers};

+ 24 - 8
crates/client/src/client.rs → crates/client/src/relayer.rs

@@ -11,20 +11,26 @@ use tokio::{
 use tokio_tungstenite::{connect_async, tungstenite::Message};
 use url::Url;
 
+/// Relayer object
 #[derive(Debug)]
-pub struct Client {
+pub struct Relayer {
+    /// URL of the relayer
     pub url: String,
+    /// Sender to the relayer. This can be used to send a Requests to this relayer
     pub send_to_socket: mpsc::Sender<Request>,
+    /// Internal receiver. This is used to receive messages from the relayer
     recv_from_socket: broadcast::Receiver<(Response, String)>,
+    /// This sender signals to background connection to stop
     stop_service: oneshot::Sender<()>,
 }
 
 const NO_ACTIVITY_TIMEOUT_SECS: u64 = 30;
-const MAX_RECONNECT_ATTEMPTS: u64 = 15;
 
-impl Client {
+impl Relayer {
+    /// Creates a new relayer
     pub fn new<F>(
         sent_messages: Arc<RwLock<Vec<Request>>>,
+        connection_retries: u16,
         url: &str,
         on_connection: Option<F>,
     ) -> Result<Self, Error>
@@ -40,6 +46,7 @@ impl Client {
             send_to_socket.clone(),
             receiver,
             url,
+            connection_retries,
             on_connection,
         )?;
 
@@ -56,6 +63,7 @@ impl Client {
         send_to_socket: mpsc::Sender<Request>,
         mut receiver: mpsc::Receiver<Request>,
         url_str: &str,
+        connection_retries: u16,
         on_connection: Option<F>,
     ) -> Result<(broadcast::Receiver<(Response, String)>, oneshot::Sender<()>), Error>
     where
@@ -74,7 +82,7 @@ impl Client {
             let mut reconnect = true;
             let mut retries = 0;
 
-            while reconnect && retries <= MAX_RECONNECT_ATTEMPTS {
+            while reconnect && retries <= connection_retries {
                 log::warn!("{}: Connect attempt {}", url, retries);
                 retries += 1;
                 let mut socket = if let Ok(x) = connect_async(url_parsed.clone()).await {
@@ -117,17 +125,18 @@ impl Client {
                                         continue;
                                     }
                                 } else {
-                                    log::error!("{} Reconnecting client due of empty recv", url);
+                                    log::error!("{} Reconnecting client due of empty recv: {:?}", url, msg);
                                     break;
                                 };
 
-                            log::info!("New message: {}", msg);
-                            retries = 0;
-
                             if msg.is_empty() {
                                 continue;
                             }
 
+                            log::info!("New message: {}", msg);
+                            retries = 0;
+
+
                             let msg: Result<Response, _> = serde_json::from_str(&msg);
 
                             if let Ok(msg) = msg {
@@ -142,6 +151,8 @@ impl Client {
                             break;
                         }
                     }
+                    // Throttle down to not spam the server with reconnections
+                    sleep(Duration::from_millis(500)).await;
                 }
             }
 
@@ -151,10 +162,13 @@ impl Client {
         Ok((recv_from_socket, stop_service))
     }
 
+    /// Checks if the relayer background connection is running
     pub fn is_running(&self) -> bool {
         !self.stop_service.is_closed()
     }
 
+    /// Subscribe to responses sent through this relayer. If the relayer is not
+    /// running, this will return None
     pub fn subscribe(&self) -> Option<broadcast::Receiver<(Response, String)>> {
         if self.stop_service.is_closed() {
             None
@@ -163,10 +177,12 @@ impl Client {
         }
     }
 
+    /// Sends a requests to this relayer
     pub async fn send(&self, request: Request) -> Result<(), Error> {
         Ok(self.send_to_socket.send(request).await?)
     }
 
+    /// Stops the background thread that has the connection to this relayer
     pub async fn stop(self) {
         let _ = self.stop_service.send(());
     }

+ 53 - 11
crates/client/src/clients.rs → crates/client/src/relayers.rs

@@ -1,4 +1,7 @@
-use crate::{Client, Error};
+//! Relayers
+//!
+//! This is the main entry point to the client library.
+use crate::{Error, Relayer};
 use futures::Future;
 use futures_util::{stream::FuturesUnordered, StreamExt};
 use nostr_rs_types::{Request, Response};
@@ -6,13 +9,18 @@ use parking_lot::RwLock;
 use std::{collections::HashMap, pin::Pin, sync::Arc};
 use tokio::sync::mpsc;
 
+/// Clients
+///
+/// This is a set of outgoing connections to relayers. This struct can connect
+/// async to N relayers offering a simple API to talk to all of them at the same
+/// time, and to receive messages
 #[derive(Debug, Clone)]
-pub struct Clients {
-    clients: Arc<RwLock<HashMap<String, Client>>>,
+pub struct Relayers {
+    clients: Arc<RwLock<HashMap<String, Relayer>>>,
     messages: Arc<RwLock<Vec<Request>>>,
 }
 
-impl Default for Clients {
+impl Default for Relayers {
     fn default() -> Self {
         Self {
             clients: Arc::new(RwLock::new(HashMap::new())),
@@ -21,7 +29,8 @@ impl Default for Clients {
     }
 }
 
-impl Clients {
+impl Relayers {
+    /// Receives a Response from any of the connected relayers
     pub async fn recv(&self) -> Result<Option<(Response, String)>, Error> {
         let mut subscriptions = self
             .clients
@@ -50,6 +59,8 @@ impl Clients {
         }
     }
 
+    /// Attempts to receive a Response from any of the relayers but won't wait if none
+    /// is available. It will return None if None is available
     pub fn try_recv(&self) -> Result<Option<(Response, String)>, Error> {
         let mut subscriptions = self
             .clients
@@ -73,6 +84,8 @@ impl Clients {
         Ok(None)
     }
 
+    /// Returns the number of active connections. If a connection to a relayer
+    /// is not active it will be removed from the list
     pub fn check_active_connections(&self) -> usize {
         let mut clients = self.clients.write();
         let mut to_remove = vec![];
@@ -89,6 +102,7 @@ impl Clients {
         clients.len()
     }
 
+    /// Sends a request to all the connected relayers
     pub async fn send(&self, request: Request) {
         let senders = self
             .clients
@@ -105,11 +119,22 @@ impl Clients {
         }
     }
 
-    pub fn get_sent_messages(&self) -> Vec<Request> {
-        self.messages.read().to_owned()
-    }
-
-    pub async fn connect_to<F>(&self, url: &str, on_connection: Option<F>) -> Result<bool, Error>
+    /// Creates a connection to a new relayer. If the connection is successful a
+    /// Callback will be called, with a list of previously sent requests, and a
+    /// Sender to send new requests to this relayer alone.
+    ///
+    /// The same callback will be called for every reconnection to the same
+    /// relayer, also the callback will be called, giving the chance to re-send
+    /// sent requests to the new connections
+    ///
+    /// This function will open a connection at most once, if a connection
+    /// already exists false will be returned
+    pub async fn connect_to<F>(
+        &self,
+        url: &str,
+        connection_retries: u16,
+        on_connection: Option<F>,
+    ) -> Result<bool, Error>
     where
         F: (Fn(Vec<Request>, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
             + Send
@@ -123,9 +148,26 @@ impl Clients {
             log::warn!("Connecting to {}", url);
             clients.insert(
                 url.to_owned(),
-                Client::new(self.messages.clone(), url, on_connection)?,
+                Relayer::new(
+                    self.messages.clone(),
+                    connection_retries,
+                    url,
+                    on_connection,
+                )?,
             );
             true
         })
     }
+
+    /// Disconnects from all relayers
+    pub async fn stop(&self) {
+        let mut clients = self.clients.write();
+        let keys = clients.keys().cloned().collect::<Vec<_>>();
+
+        for key in keys.iter() {
+            if let Some(client) = clients.remove(key) {
+                client.stop().await;
+            }
+        }
+    }
 }

+ 53 - 0
src/bin/dump.rs

@@ -0,0 +1,53 @@
+use futures::Future;
+use nostr_rs_client::{Clients, Error as ClientError};
+use nostr_rs_storage::Db;
+use nostr_rs_types::{client::Subscribe, Request, Response};
+use std::pin::Pin;
+use tokio::sync::mpsc;
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("Nostr: {0}")]
+    Addr(#[from] nostr_rs_types::types::addr::Error),
+
+    #[error("client: {0}")]
+    Client(#[from] ClientError),
+}
+
+fn on_connection(
+    sent_messages: Vec<Request>,
+    socket: mpsc::Sender<Request>,
+) -> Pin<Box<dyn Future<Output = ()> + Send>> {
+    println!("REconnecting with {:?}", sent_messages);
+    Box::pin(async move {
+        for m in sent_messages {
+            let _ = socket.send(m).await;
+        }
+    })
+}
+
+#[tokio::main]
+async fn main() {
+    env_logger::init();
+    let clients = Clients::default();
+    clients.send(Subscribe::default().into()).await;
+
+    let _ = clients
+        .connect_to("wss://relay.damus.io/", Some(on_connection))
+        .await;
+    let db = Db::new("./db").expect("db");
+
+    loop {
+        println!("going into loop");
+        if let Some((msg, _relayed_by)) = clients.recv().await.expect("valid connection") {
+            match msg {
+                Response::Event(x) => {
+                    let event = x.event;
+
+                    let _ = db.store(&event);
+                }
+                _ => {}
+            }
+        }
+    }
+}

+ 5 - 40
src/main.rs

@@ -1,42 +1,7 @@
-use futures::Future;
-use nostr_rs_client::{Clients, Error as ClientError};
-use nostr_rs_storage::Db;
-use nostr_rs_types::{client::Subscribe, Request, Response};
-use std::pin::Pin;
-use tokio::sync::mpsc;
-
-#[derive(Debug, thiserror::Error)]
-pub enum Error {
-    #[error("Nostr: {0}")]
-    Addr(#[from] nostr_rs_types::types::addr::Error),
-
-    #[error("client: {0}")]
-    Client(#[from] ClientError),
-}
-
-fn on_connection(socket: mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
-    Box::pin(async move {
-        log::info!("Connected to relay, subscribing to all events");
-        let _ = socket.send(Subscribe::default().into()).await;
-    })
-}
+use instant_acme::{
+    Account, AuthorizationStatus, ChallengeType, Identifier, KeyAuthorization, LetsEncrypt,
+    NewAccount, NewOrder, OrderStatus,
+};
 
 #[tokio::main]
-async fn main() {
-    env_logger::init();
-    let clients = Clients::default();
-
-    let _ = clients
-        .connect_to("wss://relay.damus.io/", Some(on_connection))
-        .await;
-    let db = Db::new("./db").expect("db");
-    loop {
-        if let Some((msg, _relayed_by)) = clients.recv().await {
-            if let Response::Event(x) = msg {
-                let event = x.event;
-
-                let _ = db.store(&event);
-            }
-        }
-    }
-}
+async fn main() {}