Cesar Rodas 3 месяцев назад
Родитель
Сommit
1bfd71d96a
6 измененных файлов с 91 добавлено и 52 удалено
  1. 10 8
      Cargo.lock
  2. 2 1
      Cargo.toml
  3. 13 12
      crates/client/src/pool.rs
  4. 18 21
      crates/client/src/relayer.rs
  5. 10 6
      src/config.rs
  6. 38 4
      src/main.rs

+ 10 - 8
Cargo.lock

@@ -353,9 +353,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
 
 [[package]]
 name = "form_urlencoded"
-version = "1.1.0"
+version = "1.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8"
+checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456"
 dependencies = [
  "percent-encoding",
 ]
@@ -638,9 +638,9 @@ dependencies = [
 
 [[package]]
 name = "idna"
-version = "0.3.0"
+version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6"
+checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6"
 dependencies = [
  "unicode-bidi",
  "unicode-normalization",
@@ -888,6 +888,7 @@ dependencies = [
  "thiserror",
  "tokio",
  "toml",
+ "url",
 ]
 
 [[package]]
@@ -1053,9 +1054,9 @@ checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
 
 [[package]]
 name = "percent-encoding"
-version = "2.2.0"
+version = "2.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
+checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
 
 [[package]]
 name = "pin-project-lite"
@@ -1712,13 +1713,14 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
 
 [[package]]
 name = "url"
-version = "2.3.1"
+version = "2.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643"
+checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c"
 dependencies = [
  "form_urlencoded",
  "idna",
  "percent-encoding",
+ "serde",
 ]
 
 [[package]]

+ 2 - 1
Cargo.toml

@@ -25,6 +25,7 @@ log = "0.4.17"
 thiserror = "1.0.40"
 futures = "0.3.28"
 instant-acme = "0.2.0"
-serde = "1.0.183"
+serde = { version = "1.0.183", features = ["derive"] }
 toml = "0.7.6"
 serde_json = "1.0.105"
+url = { version = "2.5.2", features = ["serde"] }

+ 13 - 12
crates/client/src/pool.rs

@@ -1,11 +1,12 @@
 //! Relayers
 //!
 //! This is the main entry point to the client library.
-use crate::{Error, Relayer};
+use crate::Relayer;
 use futures::Future;
 use nostr_rs_types::{Request, Response};
 use std::{collections::HashMap, pin::Pin};
 use tokio::sync::mpsc;
+use url::Url;
 
 /// Clients
 ///
@@ -14,9 +15,9 @@ use tokio::sync::mpsc;
 /// time, and to receive messages
 #[derive(Debug)]
 pub struct Pool {
-    clients: HashMap<String, Relayer>,
-    sender: mpsc::Sender<(Event, String)>,
-    receiver: mpsc::Receiver<(Event, String)>,
+    clients: HashMap<Url, Relayer>,
+    sender: mpsc::Sender<(Event, Url)>,
+    receiver: mpsc::Receiver<(Event, Url)>,
 }
 
 impl Default for Pool {
@@ -51,12 +52,12 @@ impl Pool {
     }
 
     /// Tries to receive a message from any of the connected relayers
-    pub fn try_recv(&mut self) -> Option<(Event, String)> {
+    pub fn try_recv(&mut self) -> Option<(Event, Url)> {
         self.receiver.try_recv().ok()
     }
 
     /// Receives a message from any of the connected relayers
-    pub async fn recv(&mut self) -> Option<(Event, String)> {
+    pub async fn recv(&mut self) -> Option<(Event, Url)> {
         self.receiver.recv().await
     }
 
@@ -91,21 +92,21 @@ impl Pool {
     ///
     /// This function will open a connection at most once, if a connection
     /// already exists false will be returned
-    pub fn connect_to<F>(mut self, url: &str, on_connection: Option<F>) -> Result<Self, Error>
+    pub fn connect_to<F>(mut self, url: Url, on_connection: Option<F>) -> Self
     where
-        F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
+        F: (Fn(&Url, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
             + Send
             + Sync
             + 'static,
     {
-        if !self.clients.contains_key(url) {
+        if !self.clients.contains_key(&url) {
             log::warn!("Connecting to {}", url);
             self.clients.insert(
-                url.to_owned(),
-                Relayer::new(self.sender.clone(), url, on_connection)?,
+                url.clone(),
+                Relayer::new(self.sender.clone(), url, on_connection),
             );
         }
 
-        Ok(self)
+        self
     }
 }

+ 18 - 21
crates/client/src/relayer.rs

@@ -21,7 +21,7 @@ use url::Url;
 #[derive(Debug)]
 pub struct Relayer {
     /// URL of the relayer
-    pub url: String,
+    pub url: Url,
     /// Sender to the relayer. This can be used to send a Requests to this
     /// relayer
     pub send_to_socket: mpsc::Sender<Request>,
@@ -42,12 +42,12 @@ impl Drop for Relayer {
 impl Relayer {
     /// Creates a new relayer
     pub fn new<F>(
-        broadcast_to_listeners: mpsc::Sender<(Event, String)>,
-        url: &str,
+        broadcast_to_listeners: mpsc::Sender<(Event, Url)>,
+        url: Url,
         on_connection: Option<F>,
-    ) -> Result<Self, Error>
+    ) -> Self
     where
-        F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
+        F: (Fn(&Url, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
             + Send
             + Sync
             + 'static,
@@ -58,45 +58,42 @@ impl Relayer {
             broadcast_to_listeners,
             sender_to_socket.clone(),
             send_to_socket,
-            url,
+            url.clone(),
             is_connected.clone(),
             on_connection,
-        )?;
+        );
 
-        Ok(Self {
-            url: url.to_owned(),
+        Self {
+            url,
             is_connected,
             send_to_socket: sender_to_socket,
             worker,
-        })
+        }
     }
 
     fn spawn_background_client<F>(
-        broadcast_to_listeners: mpsc::Sender<(Event, String)>,
+        broadcast_to_listeners: mpsc::Sender<(Event, Url)>,
         sender_to_socket: mpsc::Sender<Request>,
         mut send_to_socket: mpsc::Receiver<Request>,
-        url_str: &str,
+        url: Url,
         is_connected: Arc<AtomicBool>,
         on_connection: Option<F>,
-    ) -> Result<JoinHandle<()>, Error>
+    ) -> JoinHandle<()>
     where
-        F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
+        F: (Fn(&Url, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
             + Send
             + Sync
             + 'static,
     {
-        let url = url_str.to_owned();
-        let url_parsed = Url::parse(&url)?;
-
         is_connected.store(false, Relaxed);
 
-        Ok(tokio::spawn(async move {
+        tokio::spawn(async move {
             let mut connection_attempts = 0;
 
             loop {
                 log::warn!("{}: Connect attempt {}", url, connection_attempts);
                 connection_attempts += 1;
-                let mut socket = match connect_async(url_parsed.clone()).await {
+                let mut socket = match connect_async(url.clone()).await {
                     Ok(x) => x.0,
                     Err(err) => {
                         log::warn!("{}: Failed to connect: {}", url, err);
@@ -155,7 +152,7 @@ impl Relayer {
                             let msg: Result<Response, _> = serde_json::from_str(&msg);
 
                             if let Ok(msg) = msg {
-                                if let Err(error) = broadcast_to_listeners.try_send((Event::Response(msg.into()), url.to_owned())) {
+                                if let Err(error) = broadcast_to_listeners.try_send((Event::Response(msg.into()), url.clone())) {
                                     log::error!("{}: Reconnecting client because of {}", url, error);
                                     break;
                                 }
@@ -172,7 +169,7 @@ impl Relayer {
                 // Throttle down to not spam the server with reconnections
                 sleep(Duration::from_millis(500)).await;
             }
-        }))
+        })
     }
 
     /// Checks if the relayer is connected. It is guaranteed that the relayer is

+ 10 - 6
src/config.rs

@@ -1,18 +1,22 @@
-use nostr_rs_types::types::Id;
+use nostr_rs_types::types::Addr;
 use serde::{Deserialize, Serialize};
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Config {
-    pub relayers: Vec<String>,
-    pub max_connections_attempts: u16,
+    /// List of external relayers to connect to
+    pub relayers: Vec<url::Url>,
+    /// Local database path
     pub db_path: String,
+    /// External port
     pub port: u16,
-    pub accounts: Vec<Accounts>,
+    /// List of accounts to have the relayer
+    pub account: Vec<Account>,
+    /// Domain to create webserver
     pub domain: Option<String>,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct Accounts {
-    pub pub_key: Id,
+pub struct Account {
+    pub id: Addr,
     pub name: Option<String>,
 }

+ 38 - 4
src/main.rs

@@ -1,20 +1,21 @@
 use futures::Future;
 use nostr_rs_client::Event;
 use nostr_rs_rocksdb::RocksDb;
-use nostr_rs_types::{Request, Response};
-use std::{collections::HashMap, pin::Pin, sync::Arc};
+use nostr_rs_types::{types::Filter, Request, Response};
+use std::{collections::HashMap, env, fs, pin::Pin, sync::Arc};
 use tokio::{
     net::TcpListener,
     sync::mpsc,
     time::{sleep, Duration},
 };
+use url::Url;
 
 mod config;
 
 use config::Config;
 
 fn on_connection(
-    host: &str,
+    host: &Url,
     _socket: mpsc::Sender<Request>,
 ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
     Box::pin(async move {})
@@ -49,6 +50,39 @@ fn create_listener(relayer: Arc<nostr_rs_relayer::Relayer>) -> nostr_rs_client::
 #[tokio::main]
 async fn main() {
     env_logger::init();
+
+    let args: Vec<String> = env::args().collect();
+    let config_path = if args.len() > 1 {
+        &args[1]
+    } else {
+        "config.toml"
+    };
+
+    let config_content = fs::read_to_string(config_path).expect("Failed to read the config file");
+
+    let config: Config = toml::from_str(&config_content).expect("Failed to parse the config file");
+    println!("{:#?}", config);
+
+    let db = RocksDb::new(&config.db_path).expect("db");
+    let client_pool = config
+        .relayers
+        .iter()
+        .fold(nostr_rs_client::Pool::new(), |clients, relayer_url| {
+            clients.connect_to(relayer_url.clone(), Some(on_connection))
+        });
+
+    let initial_subscription = Filter {
+        authors: config
+            .account
+            .iter()
+            .map(|x| x.id.clone())
+            .collect::<Vec<_>>(),
+        ..Default::default()
+    };
+
+    println!("Initial subscription: {:?}", initial_subscription);
+
+    /*
     let db = RocksDb::new("./relayer-db").expect("db");
     let (relayer, mut server_receiver) = nostr_rs_relayer::Relayer::new(Some(db));
     let mut clients = nostr_rs_client::Pool::new()
@@ -69,6 +103,7 @@ async fn main() {
     let listener: TcpListener = TcpListener::bind(&addr).await.unwrap();
 
     clients.send(Request::Request(Default::default())).await;
+
     loop {
         tokio::select! {
             Some((event, url)) = clients.recv() => {
@@ -76,7 +111,6 @@ async fn main() {
             }
         }
     }
-    /*
     let relayer_for_recv = relayer.clone();
     let relayer_for_client = relayer.clone();
     let relayer_for_republisher = relayer.clone();