Przeglądaj źródła

Working on more changes to make project better organized

Cesar Rodas 1 rok temu
rodzic
commit
60a6466b56
8 zmienionych plików z 211 dodań i 253 usunięć
  1. 103 30
      Cargo.lock
  2. 4 7
      Cargo.toml
  3. 2 2
      crates/client/src/lib.rs
  4. 16 13
      crates/client/src/relayer.rs
  5. 0 124
      crates/client/src/relayers.rs
  6. 18 11
      crates/relayer/src/relayer.rs
  7. 5 16
      src/bin/dump.rs
  8. 63 50
      src/main.rs

+ 103 - 30
Cargo.lock

@@ -12,6 +12,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "android-tzdata"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
+
+[[package]]
 name = "android_system_properties"
 version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -134,13 +140,13 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
 [[package]]
 name = "chrono"
-version = "0.4.23"
+version = "0.4.26"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f"
+checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5"
 dependencies = [
+ "android-tzdata",
  "iana-time-zone",
  "js-sys",
- "num-integer",
  "num-traits",
  "time",
  "wasm-bindgen",
@@ -283,6 +289,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "equivalent"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
+
+[[package]]
 name = "errno"
 version = "0.2.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -374,7 +386,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.12",
+ "syn 2.0.29",
 ]
 
 [[package]]
@@ -446,7 +458,7 @@ dependencies = [
  "futures-sink",
  "futures-util",
  "http",
- "indexmap",
+ "indexmap 1.9.3",
  "slab",
  "tokio",
  "tokio-util",
@@ -460,6 +472,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
 
 [[package]]
+name = "hashbrown"
+version = "0.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
+
+[[package]]
 name = "hermit-abi"
 version = "0.2.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -599,7 +617,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
 dependencies = [
  "autocfg",
- "hashbrown",
+ "hashbrown 0.12.3",
+]
+
+[[package]]
+name = "indexmap"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
+dependencies = [
+ "equivalent",
+ "hashbrown 0.14.0",
 ]
 
 [[package]]
@@ -809,9 +837,11 @@ dependencies = [
  "nostr-rs-relayer",
  "nostr-rs-storage",
  "nostr-rs-types",
+ "serde",
  "serde_json",
  "thiserror",
  "tokio",
+ "toml",
 ]
 
 [[package]]
@@ -850,6 +880,7 @@ dependencies = [
 name = "nostr-rs-storage"
 version = "0.1.0"
 dependencies = [
+ "chrono",
  "nostr-rs-types",
  "rand",
  "rocksdb",
@@ -876,16 +907,6 @@ dependencies = [
 ]
 
 [[package]]
-name = "num-integer"
-version = "0.1.45"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9"
-dependencies = [
- "autocfg",
- "num-traits",
-]
-
-[[package]]
 name = "num-traits"
 version = "0.2.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -977,18 +998,18 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
 
 [[package]]
 name = "proc-macro2"
-version = "1.0.52"
+version = "1.0.66"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1d0e1ae9e836cc3beddd63db0df682593d7e2d3d891ae8c9083d2113e1744224"
+checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
 dependencies = [
  "unicode-ident",
 ]
 
 [[package]]
 name = "quote"
-version = "1.0.26"
+version = "1.0.33"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc"
+checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae"
 dependencies = [
  "proc-macro2",
 ]
@@ -1208,29 +1229,29 @@ dependencies = [
 
 [[package]]
 name = "serde"
-version = "1.0.153"
+version = "1.0.183"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3a382c72b4ba118526e187430bb4963cd6d55051ebf13d9b25574d379cc98d20"
+checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c"
 dependencies = [
  "serde_derive",
 ]
 
 [[package]]
 name = "serde_derive"
-version = "1.0.153"
+version = "1.0.183"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1ef476a5790f0f6decbc66726b6e5d63680ed518283e64c7df415989d880954f"
+checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 1.0.109",
+ "syn 2.0.29",
 ]
 
 [[package]]
 name = "serde_json"
-version = "1.0.94"
+version = "1.0.105"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1c533a59c9d8a93a09c6ab31f0fd5e5f4dd1b8fc9434804029839884765d04ea"
+checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360"
 dependencies = [
  "itoa",
  "ryu",
@@ -1238,6 +1259,15 @@ dependencies = [
 ]
 
 [[package]]
+name = "serde_spanned"
+version = "0.6.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "96426c9936fd7a0124915f9185ea1d20aa9445cc9821142f0a73bc9207a2e186"
+dependencies = [
+ "serde",
+]
+
+[[package]]
 name = "sha1"
 version = "0.10.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1318,9 +1348,9 @@ dependencies = [
 
 [[package]]
 name = "syn"
-version = "2.0.12"
+version = "2.0.29"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "79d9531f94112cfc3e4c8f5f02cb2b58f72c97b7efd85f70203cc6d8efda5927"
+checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -1353,7 +1383,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.12",
+ "syn 2.0.29",
 ]
 
 [[package]]
@@ -1455,6 +1485,40 @@ dependencies = [
 ]
 
 [[package]]
+name = "toml"
+version = "0.7.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c17e963a819c331dcacd7ab957d80bc2b9a9c1e71c804826d2f283dd65306542"
+dependencies = [
+ "serde",
+ "serde_spanned",
+ "toml_datetime",
+ "toml_edit",
+]
+
+[[package]]
+name = "toml_datetime"
+version = "0.6.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b"
+dependencies = [
+ "serde",
+]
+
+[[package]]
+name = "toml_edit"
+version = "0.19.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a"
+dependencies = [
+ "indexmap 2.0.0",
+ "serde",
+ "serde_spanned",
+ "toml_datetime",
+ "winnow",
+]
+
+[[package]]
 name = "tower-service"
 version = "0.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1784,6 +1848,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd"
 
 [[package]]
+name = "winnow"
+version = "0.5.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d09770118a7eb1ccaf4a594a221334119a44a814fcb0d31c5b85e83e97227a97"
+dependencies = [
+ "memchr",
+]
+
+[[package]]
 name = "zstd-sys"
 version = "2.0.7+zstd.1.5.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"

+ 4 - 7
Cargo.toml

@@ -4,12 +4,7 @@ version = "0.1.0"
 edition = "2021"
 
 [workspace]
-members = [
-    "crates/types",
-    "crates/client",
-    "crates/relayer",
-    "crates/storage"
-]
+members = ["crates/types", "crates/client", "crates/relayer", "crates/storage"]
 
 [dependencies]
 nostr-rs-types = { path = "crates/types" }
@@ -18,9 +13,11 @@ nostr-rs-storage = { path = "crates/storage" }
 nostr-rs-relayer = { path = "crates/relayer" }
 tokio = { version = "1.26.0", features = ["full"] }
 env_logger = "0.10.0"
-serde_json = "1.0.94"
 futures-util = "0.3.27"
 log = "0.4.17"
 thiserror = "1.0.40"
 futures = "0.3.28"
 instant-acme = "0.2.0"
+serde = "1.0.183"
+toml = "0.7.6"
+serde_json = "1.0.105"

+ 2 - 2
crates/client/src/lib.rs

@@ -9,11 +9,11 @@
 //! It will also have reconnection logic built-in internally.
 #![deny(missing_docs, warnings)]
 mod error;
+mod pool;
 mod relayer;
-mod relayers;
 
 pub use self::{
     error::Error,
+    pool::{Event, Pool},
     relayer::Relayer,
-    relayers::{Event, Relayers},
 };

+ 16 - 13
crates/client/src/relayer.rs

@@ -1,4 +1,4 @@
-use crate::{relayers::Event, Error};
+use crate::{pool::Event, Error};
 use futures::Future;
 use futures_util::{SinkExt, StreamExt};
 use nostr_rs_types::{Request, Response};
@@ -44,12 +44,12 @@ impl Relayer {
             + Sync
             + 'static,
     {
-        let (send_to_socket, receiver) = mpsc::channel(100_000);
+        let (sender_to_socket, send_to_socket) = mpsc::channel(100_000);
         let is_connected = Arc::new(AtomicBool::new(false));
         let stop_service = Self::spawn_background_client(
             broadcast_to_listeners,
-            send_to_socket.clone(),
-            receiver,
+            sender_to_socket.clone(),
+            send_to_socket,
             url,
             max_connections_attempts,
             is_connected.clone(),
@@ -59,15 +59,15 @@ impl Relayer {
         Ok(Self {
             url: url.to_owned(),
             is_connected,
-            send_to_socket,
+            send_to_socket: sender_to_socket,
             stop_service,
         })
     }
 
     fn spawn_background_client<F>(
         broadcast_to_listeners: mpsc::Sender<(Event, String)>,
-        send_to_socket: mpsc::Sender<Request>,
-        mut receiver: mpsc::Receiver<Request>,
+        sender_to_socket: mpsc::Sender<Request>,
+        mut send_to_socket: mpsc::Receiver<Request>,
         url_str: &str,
         max_connections_attempts: u16,
         is_connected: Arc<AtomicBool>,
@@ -103,7 +103,7 @@ impl Relayer {
                 log::info!("Connected to {}", url);
 
                 if let Some(on_connection) = &on_connection {
-                    on_connection(&url, send_to_socket.clone()).await;
+                    on_connection(&url, sender_to_socket.clone()).await;
                 }
 
                 loop {
@@ -114,7 +114,7 @@ impl Relayer {
                             break;
                         },
 
-                        Some(msg) = receiver.recv() => {
+                        Some(msg) = send_to_socket.recv() => {
                             if let Ok(json) = serde_json::to_string(&msg) {
                                 log::info!("{}: Sending {}", url, json);
                                 if let Err(x) = socket.send(Message::Text(json)).await {
@@ -129,13 +129,16 @@ impl Relayer {
                                     match msg {
                                         Message::Text(text) => text,
                                         Message::Ping(msg) => {
-                                            if let Err(x) = socket.send(Message::Pong(msg.clone())).await {
-                                                log::error!("{} : Reconnecting due error at sending Pong({:?}): {}", url, msg, x);
+                                            if let Err(x) = socket.send(Message::Pong(msg)).await {
+                                                log::error!("{} : Reconnecting due error at sending Pong: {:?}", url, x);
                                                 break;
                                             }
                                             continue;
                                         },
-                                        msg => panic!("Unexpected {:?}", msg),
+                                        msg => {
+                                            log::error!("Unexpected {:?}", msg);
+                                            continue;
+                                        }
                                     }
                                 } else {
                                     log::error!("{} Reconnecting client due of empty recv: {:?}", url, msg);
@@ -198,7 +201,7 @@ impl Relayer {
     }
 
     /// Stops the background thread that has the connection to this relayer
-    pub async fn stop(self) {
+    pub async fn disconnect(self) {
         let _ = self.stop_service.send(());
     }
 }

+ 0 - 124
crates/client/src/relayers.rs

@@ -1,124 +0,0 @@
-//! Relayers
-//!
-//! This is the main entry point to the client library.
-use crate::{Error, Relayer};
-use futures::Future;
-use nostr_rs_types::{Request, Response};
-use std::{collections::HashMap, pin::Pin};
-use tokio::sync::mpsc;
-
-/// Clients
-///
-/// This is a set of outgoing connections to relayers. This struct can connect
-/// async to N relayers offering a simple API to talk to all of them at the same
-/// time, and to receive messages
-#[derive(Debug)]
-pub struct Relayers {
-    clients: HashMap<String, Relayer>,
-    sender: mpsc::Sender<(Event, String)>,
-}
-
-/// Client event
-///
-/// This type wraps a response a disconnected event. The disconnected will be
-/// the last event to be sent.
-#[derive(Debug, Clone)]
-pub enum Event {
-    /// A response
-    Response(Response),
-    /// A disconnection event
-    Disconnected,
-}
-
-impl Relayers {
-    /// Creates a new Relayers object
-    pub fn new() -> (Self, mpsc::Receiver<(Event, String)>) {
-        let (sender, receiver) = mpsc::channel(100_000);
-        (
-            Self {
-                clients: HashMap::new(),
-                sender,
-            },
-            receiver,
-        )
-    }
-
-    /// Returns a vector to all outgoing connections
-    pub fn get_connections<'a>(&'a self) -> Vec<&'a Relayer> {
-        self.clients.values().collect::<Vec<&Relayer>>()
-    }
-
-    /// Returns the number of active connections. If a connection to a relayer
-    /// is not active it will be removed from the list
-    pub fn check_active_connections(&mut self) -> usize {
-        let mut to_remove = vec![];
-        for (url, client) in self.clients.iter() {
-            if !client.is_running() {
-                to_remove.push(url.to_owned());
-            }
-        }
-
-        for url in to_remove.iter() {
-            self.clients.remove(url);
-        }
-
-        self.clients.len()
-    }
-
-    /// Sends a request to all the connected relayers
-    pub async fn send(&self, request: Request) {
-        for (_, sender) in self.clients.iter() {
-            let _ = sender.send(request.clone()).await;
-        }
-    }
-
-    /// Creates a connection to a new relayer. If the connection is successful a
-    /// Callback will be called, with a list of previously sent requests, and a
-    /// Sender to send new requests to this relayer alone.
-    ///
-    /// The same callback will be called for every reconnection to the same
-    /// relayer, also the callback will be called, giving the chance to re-send
-    /// sent requests to the new connections
-    ///
-    /// This function will open a connection at most once, if a connection
-    /// already exists false will be returned
-    pub async fn connect_to<F>(
-        &mut self,
-        url: &str,
-        max_connections_attempts: u16,
-        on_connection: Option<F>,
-    ) -> Result<bool, Error>
-    where
-        F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
-            + Send
-            + Sync
-            + 'static,
-    {
-        Ok(if self.clients.get(url).is_some() {
-            false
-        } else {
-            log::warn!("Connecting to {}", url);
-            self.clients.insert(
-                url.to_owned(),
-                Relayer::new(
-                    self.sender.clone(),
-                    max_connections_attempts,
-                    url,
-                    on_connection,
-                )?,
-            );
-            true
-        })
-    }
-
-    /// Disconnects from all relayers
-    pub async fn stop(&mut self) {
-        let keys = self.clients.keys().cloned().collect::<Vec<_>>();
-
-        for key in keys.iter() {
-            if let Some(client) = self.clients.remove(key) {
-                client.stop().await;
-            }
-        }
-    }
-}

+ 18 - 11
crates/relayer/src/relayer.rs

@@ -6,7 +6,7 @@ use nostr_rs_types::{
     Request, Response,
 };
 use parking_lot::{RwLock, RwLockReadGuard};
-use std::{collections::HashMap, ops::Deref, sync::Arc};
+use std::{collections::HashMap, marker::PhantomData, ops::Deref, sync::Arc};
 use tokio::sync::mpsc;
 #[allow(unused_imports)]
 use tokio::{
@@ -18,9 +18,12 @@ type SubId = u128;
 
 type Subscriptions = HashMap<SubId, (SubscriptionId, Sender<Response>)>;
 
-pub struct Relayer<T: Storage> {
+pub struct Relayer<'a, I, T: Storage<'a, I>>
+where
+    I: Iterator<Item = Result<Event, nostr_rs_storage::Error>>,
+{
     /// Storage engine, if provided the services are going to persisted in disk,
-    /// otherwise all the messages are going to be ephimeral, making this
+    /// otherwise all the messages are going to be ephemeral, making this
     /// relayer just a dumb proxy (that can be useful for privacy) but it won't
     /// be able to perform any optimization like prefetching content while offline
     storage: Option<T>,
@@ -40,9 +43,13 @@ pub struct Relayer<T: Storage> {
     clients: RwLock<HashMap<u128, Connection>>,
     #[allow(dead_code)]
     sender: Sender<(u128, Request)>,
+    _phantom: std::marker::PhantomData<&'a I>,
 }
 
-impl<T: Storage> Relayer<T> {
+impl<'a, I, T: Storage<'a, I>> Relayer<'a, I, T>
+where
+    I: Iterator<Item = Result<Event, nostr_rs_storage::Error>>,
+{
     pub fn new(storage: Option<T>) -> (Arc<Self>, Receiver<(u128, Request)>) {
         let (sender, receiver) = channel(100_000);
         (
@@ -52,6 +59,7 @@ impl<T: Storage> Relayer<T> {
                 subscriptions_ids_index: RwLock::new(HashMap::new()),
                 clients: RwLock::new(HashMap::new()),
                 sender,
+                _phantom: PhantomData,
             }),
             receiver,
         )
@@ -76,7 +84,7 @@ impl<T: Storage> Relayer<T> {
     }
 
     fn recv_request_from_client(
-        &self,
+        &'a self,
         connection: &Connection,
         request: Request,
     ) -> Result<Option<Request>, Error> {
@@ -122,10 +130,8 @@ impl<T: Storage> Relayer<T> {
                 if let Some(storage) = self.storage.as_ref() {
                     // Sent all events that match the filter that are stored in our database
                     for filter in request.filters.clone().into_iter() {
-                        storage
-                            .get_by_filter(filter)?
-                            .into_iter()
-                            .for_each(|event| {
+                        storage.get_by_filter(filter)?.for_each(|event| {
+                            if let Ok(event) = event {
                                 let _ = connection.send(
                                     relayer::Event {
                                         subscription_id: request.subscription_id.clone(),
@@ -133,7 +139,8 @@ impl<T: Storage> Relayer<T> {
                                     }
                                     .into(),
                                 );
-                            });
+                            }
+                        });
                     }
                 }
 
@@ -159,7 +166,7 @@ impl<T: Storage> Relayer<T> {
     }
 
     pub async fn recv(
-        &self,
+        &'a self,
         receiver: &mut Receiver<(u128, Request)>,
     ) -> Result<Option<Request>, Error> {
         let (conn_id, request) = if let Some(request) = receiver.recv().await {

+ 5 - 16
src/bin/dump.rs

@@ -1,6 +1,6 @@
 use futures::Future;
-use nostr_rs_client::{Error as ClientError, Event, Relayers};
-use nostr_rs_storage::RocksDb;
+use nostr_rs_client::{Error as ClientError, Event, Pool};
+use nostr_rs_storage::{RocksDb, Storage};
 use nostr_rs_types::{client::Subscribe, Request, Response};
 use std::pin::Pin;
 use tokio::sync::mpsc;
@@ -16,7 +16,7 @@ pub enum Error {
 
 fn on_connection(
     host: &str,
-    socket: mpsc::Sender<Request>,
+    _socket: mpsc::Sender<Request>,
 ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
     println!("Reconnecting to {}", host);
     Box::pin(async move {})
@@ -25,7 +25,7 @@ fn on_connection(
 #[tokio::main]
 async fn main() {
     env_logger::init();
-    let (mut clients, mut receiver) = Relayers::new();
+    let mut clients = Pool::new();
     clients.send(Subscribe::default().into()).await;
 
     let _ = clients
@@ -48,19 +48,8 @@ async fn main() {
         .await;
     let db = RocksDb::new("./db").expect("db");
 
-    tokio::spawn(async move {
-        loop {
-            clients.get_connections().iter().for_each(|relayer| {
-                if relayer.is_connected() {
-                    log::warn!("Connected to {}", relayer.url);
-                }
-            });
-            tokio::time::sleep(std::time::Duration::from_secs(10)).await;
-        }
-    });
-
     loop {
-        if let Some((msg, _relayed_by)) = receiver.recv().await {
+        if let Some((msg, _relayed_by)) = clients.recv().await {
             match msg {
                 Event::Response(Response::Event(x)) => {
                     let event = x.event;

+ 63 - 50
src/main.rs

@@ -9,6 +9,10 @@ use tokio::{
     time::{sleep, Duration},
 };
 
+mod config;
+
+use config::Config;
+
 fn on_connection(
     host: &str,
     _socket: mpsc::Sender<Request>,
@@ -17,12 +21,13 @@ fn on_connection(
 }
 
 pub struct Connection {
-    connection_to_relayers: Arc<nostr_rs_client::Relayers>,
+    connection_to_relayers: Arc<nostr_rs_client::Pool>,
     receive_from_relayer: mpsc::Receiver<Response>,
 }
 
-fn create_listener(relayer: Arc<nostr_rs_relayer::Relayer>) -> nostr_rs_client::Relayers {
-    let (clients, mut client_receiver) = nostr_rs_client::Relayers::new();
+/*
+fn create_listener(relayer: Arc<nostr_rs_relayer::Relayer>) -> nostr_rs_client::Pool {
+    let (clients, mut client_receiver) = nostr_rs_client::Pool::new();
 
     tokio::spawn(async move {
         loop {
@@ -39,13 +44,14 @@ fn create_listener(relayer: Arc<nostr_rs_relayer::Relayer>) -> nostr_rs_client::
 
     clients
 }
+*/
 
 #[tokio::main]
 async fn main() {
     env_logger::init();
     let db = RocksDb::new("./relayer-db").expect("db");
-    let (relayer, mut server_receiver) = nostr_rs_relayer::Relayer::new(db);
-    let (mut clients, mut client_receiver) = nostr_rs_client::Relayers::new();
+    let (relayer, mut server_receiver) = nostr_rs_relayer::Relayer::new(Some(db));
+    let mut clients = nostr_rs_client::Pool::new();
     let _ = clients
         .connect_to("wss://relay.damus.io/", u16::MAX, Some(on_connection))
         .await
@@ -69,62 +75,69 @@ async fn main() {
     let addr = "127.0.0.1:3000";
     let listener = TcpListener::bind(&addr).await.unwrap();
 
+    clients.send(Request::Request(Default::default())).await;
+    loop {
+        tokio::select! {
+            Some((event, url)) = clients.recv() => {
+                println!("Recv: {:?} from {}", event, url);
+            }
+        }
+    }
+    /*
     let relayer_for_recv = relayer.clone();
     let relayer_for_client = relayer.clone();
     let relayer_for_republisher = relayer.clone();
 
-    let clients = Arc::new(clients);
-    let clients_for_republisher = clients.clone();
-
-    tokio::spawn(async move {
-        loop {
-            if let Ok(events) = relayer_for_republisher.get_db().get_local_events() {
-                println!("Rebroadcast: {:?} ", events);
-                for event in events.into_iter() {
-                    println!("Rebroadcasting: {}", serde_json::to_string(&event).unwrap());
-                    let _ = clients_for_republisher.send(event.into()).await;
+        tokio::spawn(async move {
+            loop {
+                if let Ok(events) = relayer_for_republisher.get_db().get_local_events() {
+                    println!("Rebroadcast: {:?} ", events);
+                    for event in events.into_iter() {
+                        println!("Rebroadcasting: {}", serde_json::to_string(&event).unwrap());
+                        let _ = clients_for_republisher.send(event.into()).await;
+                    }
                 }
+                sleep(Duration::from_secs(360)).await;
             }
-            sleep(Duration::from_secs(360)).await;
-        }
-    });
-
-    tokio::spawn(async move {
-        loop {
-            let (event, url) = client_receiver.recv().await.unwrap();
-            match &event {
-                Event::Response(Response::Event(event)) => {
-                    let _ = relayer_for_client.store_and_broadcast(&event.event);
-                }
-                _msg => {
-                    //println!("Sending message: {:?}", msg);
-                    //let _ = relayer_for_client.send(msg).await;
+        });
+
+        tokio::spawn(async move {
+            loop {
+                let (event, url) = client_receiver.recv().await.unwrap();
+                match &event {
+                    Event::Response(Response::Event(event)) => {
+                        let _ = relayer_for_client.store_and_broadcast(&event.event);
+                    }
+                    _msg => {
+                        //println!("Sending message: {:?}", msg);
+                        //let _ = relayer_for_client.send(msg).await;
+                    }
                 }
             }
-        }
-    });
-
-    tokio::spawn(async move {
-        loop {
-            let x = relayer_for_recv.recv(&mut server_receiver).await.unwrap();
-            if let Some(x) = x {
-                println!(
-                    "received event: {}",
-                    serde_json::to_string(&x).expect("valid json")
-                );
-                clients.send(x).await;
+        });
+
+        tokio::spawn(async move {
+            loop {
+                let x = relayer_for_recv.recv(&mut server_receiver).await.unwrap();
+                if let Some(x) = x {
+                    println!(
+                        "received event: {}",
+                        serde_json::to_string(&x).expect("valid json")
+                    );
+                    clients.send(x).await;
+                }
             }
-        }
-    });
+        });
 
-    let mut clients = HashMap::<u128, nostr_rs_client::Relayers>::new();
+        let mut clients = HashMap::<u128, nostr_rs_client::Pool>::new();
 
-    loop {
-        let (stream, _) = listener.accept().await.unwrap();
-        let addr = stream.peer_addr().unwrap();
+        loop {
+            let (stream, _) = listener.accept().await.unwrap();
+            let addr = stream.peer_addr().unwrap();
 
-        println!("Client {} connected", addr);
+            println!("Client {} connected", addr);
 
-        let _ = relayer.add_connection(None, stream).await;
-    }
+            let _ = relayer.add_connection(None, stream).await;
+        }
+        */
 }