Browse Source

Merge branch 'improvements-2024-07-27' of cesar/nostr-prototype into main

Cesar Rodas 3 months ago
parent
commit
4561fb379a

+ 82 - 38
Cargo.lock

@@ -42,6 +42,17 @@ dependencies = [
 ]
 
 [[package]]
+name = "async-trait"
+version = "0.1.81"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.72",
+]
+
+[[package]]
 name = "autocfg"
 version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -353,18 +364,18 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
 
 [[package]]
 name = "form_urlencoded"
-version = "1.1.0"
+version = "1.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8"
+checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456"
 dependencies = [
  "percent-encoding",
 ]
 
 [[package]]
 name = "futures"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
+checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
 dependencies = [
  "futures-channel",
  "futures-core",
@@ -377,9 +388,9 @@ dependencies = [
 
 [[package]]
 name = "futures-channel"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
+checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
 dependencies = [
  "futures-core",
  "futures-sink",
@@ -387,15 +398,15 @@ dependencies = [
 
 [[package]]
 name = "futures-core"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
+checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
 
 [[package]]
 name = "futures-executor"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
+checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
 dependencies = [
  "futures-core",
  "futures-task",
@@ -404,38 +415,38 @@ dependencies = [
 
 [[package]]
 name = "futures-io"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
+checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
 
 [[package]]
 name = "futures-macro"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
+checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.29",
+ "syn 2.0.72",
 ]
 
 [[package]]
 name = "futures-sink"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
+checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
 
 [[package]]
 name = "futures-task"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
+checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
 
 [[package]]
 name = "futures-util"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
+checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
 dependencies = [
  "futures-channel",
  "futures-core",
@@ -638,9 +649,9 @@ dependencies = [
 
 [[package]]
 name = "idna"
-version = "0.3.0"
+version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6"
+checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6"
 dependencies = [
  "unicode-bidi",
  "unicode-normalization",
@@ -888,6 +899,7 @@ dependencies = [
  "thiserror",
  "tokio",
  "toml",
+ "url",
 ]
 
 [[package]]
@@ -897,6 +909,7 @@ dependencies = [
  "futures",
  "futures-util",
  "log",
+ "nostr-rs-relayer",
  "nostr-rs-types",
  "parking_lot",
  "serde_json",
@@ -907,6 +920,32 @@ dependencies = [
 ]
 
 [[package]]
+name = "nostr-rs-dump"
+version = "0.1.0"
+dependencies = [
+ "env_logger",
+ "futures",
+ "futures-util",
+ "instant-acme",
+ "log",
+ "nostr-rs-client",
+ "nostr-rs-types",
+ "serde",
+ "serde_json",
+ "thiserror",
+ "tokio",
+ "toml",
+]
+
+[[package]]
+name = "nostr-rs-memory"
+version = "0.1.0"
+dependencies = [
+ "nostr-rs-storage-base",
+ "nostr-rs-types",
+]
+
+[[package]]
 name = "nostr-rs-relayer"
 version = "0.1.0"
 dependencies = [
@@ -927,7 +966,9 @@ dependencies = [
 name = "nostr-rs-rocksdb"
 version = "0.1.0"
 dependencies = [
+ "async-trait",
  "chrono",
+ "futures",
  "nostr-rs-storage-base",
  "nostr-rs-types",
  "rocksdb",
@@ -938,6 +979,8 @@ dependencies = [
 name = "nostr-rs-storage-base"
 version = "0.1.0"
 dependencies = [
+ "async-trait",
+ "futures",
  "nostr-rs-types",
  "parking_lot",
  "rand",
@@ -994,9 +1037,9 @@ dependencies = [
 
 [[package]]
 name = "once_cell"
-version = "1.17.1"
+version = "1.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
 
 [[package]]
 name = "openssl-probe"
@@ -1035,9 +1078,9 @@ checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
 
 [[package]]
 name = "percent-encoding"
-version = "2.2.0"
+version = "2.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
+checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
 
 [[package]]
 name = "pin-project-lite"
@@ -1065,18 +1108,18 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
 
 [[package]]
 name = "proc-macro2"
-version = "1.0.66"
+version = "1.0.86"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
+checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
 dependencies = [
  "unicode-ident",
 ]
 
 [[package]]
 name = "quote"
-version = "1.0.33"
+version = "1.0.36"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae"
+checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7"
 dependencies = [
  "proc-macro2",
 ]
@@ -1317,7 +1360,7 @@ checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.29",
+ "syn 2.0.72",
 ]
 
 [[package]]
@@ -1431,9 +1474,9 @@ dependencies = [
 
 [[package]]
 name = "syn"
-version = "2.0.29"
+version = "2.0.72"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a"
+checksum = "dc4b9b9bf2add8093d3f2c0204471e951b2285580335de42f9d2534f3ae7a8af"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -1466,7 +1509,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.29",
+ "syn 2.0.72",
 ]
 
 [[package]]
@@ -1522,7 +1565,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.29",
+ "syn 2.0.72",
 ]
 
 [[package]]
@@ -1694,13 +1737,14 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
 
 [[package]]
 name = "url"
-version = "2.3.1"
+version = "2.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643"
+checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c"
 dependencies = [
  "form_urlencoded",
  "idna",
  "percent-encoding",
+ "serde",
 ]
 
 [[package]]

+ 9 - 2
Cargo.toml

@@ -4,7 +4,13 @@ version = "0.1.0"
 edition = "2021"
 
 [workspace]
-members = ["crates/types", "crates/client", "crates/relayer", "crates/storage/base", "crates/storage/rocksdb"]
+members = [
+    "crates/types",
+    "crates/client",
+    "crates/relayer",
+    "crates/storage/base",
+    "crates/storage/rocksdb", "crates/dump", "crates/storage/memory",
+]
 
 [dependencies]
 nostr-rs-types = { path = "crates/types" }
@@ -19,6 +25,7 @@ log = "0.4.17"
 thiserror = "1.0.40"
 futures = "0.3.28"
 instant-acme = "0.2.0"
-serde = "1.0.183"
+serde = { version = "1.0.183", features = ["derive"] }
 toml = "0.7.6"
 serde_json = "1.0.105"
+url = { version = "2.5.2", features = ["serde"] }

+ 8 - 1
crates/client/Cargo.toml

@@ -7,10 +7,17 @@ edition = "2021"
 thiserror = "1.0.40"
 nostr-rs-types = { path = "../types" }
 tokio = { version = "1.26.0", features = ["sync", "macros", "rt", "time"] }
-tokio-tungstenite = { version = "0.18.0", features = ["rustls", "rustls-native-certs", "rustls-tls-native-roots"] }
+tokio-tungstenite = { version = "0.18.0", features = [
+    "rustls",
+    "rustls-native-certs",
+    "rustls-tls-native-roots",
+] }
 url = "2.3.1"
 serde_json = "1.0.94"
 futures-util = "0.3.27"
 parking_lot = "0.12.1"
 log = "0.4.17"
 futures = "0.3.28"
+
+[dev-dependencies]
+nostr-rs-relayer = { path = "../relayer" }

+ 109 - 85
crates/client/src/relayer.rs → crates/client/src/client.rs

@@ -1,98 +1,112 @@
-use crate::{pool::Event, Error};
-use futures::Future;
+use crate::Error;
+use futures::executor::block_on;
 use futures_util::{SinkExt, StreamExt};
-use nostr_rs_types::{Request, Response};
+use nostr_rs_types::{
+    client::{self, subscribe},
+    types::SubscriptionId,
+    Request, Response,
+};
 use std::{
-    pin::Pin,
+    collections::HashMap,
     sync::{
         atomic::{AtomicBool, Ordering::Relaxed},
         Arc,
     },
 };
 use tokio::{
-    sync::{mpsc, oneshot},
+    sync::{mpsc, RwLock},
+    task::JoinHandle,
     time::{sleep, timeout, Duration},
 };
 use tokio_tungstenite::{connect_async, tungstenite::Message};
 use url::Url;
 
+type Subscriptions = Arc<RwLock<HashMap<SubscriptionId, subscribe::Subscribe>>>;
+
+#[derive(Debug)]
+pub struct ActiveSubscription {
+    id: SubscriptionId,
+    subscriptions: Subscriptions,
+    send_to_socket: mpsc::Sender<Request>,
+}
+
+impl Drop for ActiveSubscription {
+    fn drop(&mut self) {
+        block_on(async move {
+            self.subscriptions.write().await.remove(&self.id);
+            let _ = self
+                .send_to_socket
+                .send(nostr_rs_types::client::Close(self.id.clone()).into())
+                .await;
+        });
+    }
+}
+
 /// Relayer object
 #[derive(Debug)]
-pub struct Relayer {
+pub struct Client {
     /// URL of the relayer
-    pub url: String,
+    pub url: Url,
     /// Sender to the relayer. This can be used to send a Requests to this
     /// relayer
     pub send_to_socket: mpsc::Sender<Request>,
+
+    subscriptions: Subscriptions,
+
+    worker: JoinHandle<()>,
+
     is_connected: Arc<AtomicBool>,
-    /// This sender signals to background connection to stop
-    stop_service: oneshot::Sender<()>,
 }
 
 const NO_ACTIVITY_TIMEOUT_SECS: u64 = 120;
 
-impl Relayer {
+impl Drop for Client {
+    fn drop(&mut self) {
+        self.worker.abort()
+    }
+}
+
+impl Client {
     /// Creates a new relayer
-    pub fn new<F>(
-        broadcast_to_listeners: mpsc::Sender<(Event, String)>,
-        max_connections_attempts: u16,
-        url: &str,
-        on_connection: Option<F>,
-    ) -> Result<Self, Error>
-    where
-        F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
-            + Send
-            + Sync
-            + 'static,
-    {
+    pub fn new(broadcast_to_listeners: mpsc::Sender<(Response, Url)>, url: Url) -> Self {
         let (sender_to_socket, send_to_socket) = mpsc::channel(100_000);
         let is_connected = Arc::new(AtomicBool::new(false));
-        let stop_service = Self::spawn_background_client(
+
+        let subscriptions = Arc::new(RwLock::new(HashMap::new()));
+
+        let worker = Self::spawn_background_client(
             broadcast_to_listeners,
-            sender_to_socket.clone(),
             send_to_socket,
-            url,
-            max_connections_attempts,
+            url.clone(),
             is_connected.clone(),
-            on_connection,
-        )?;
+            subscriptions.clone(),
+        );
 
-        Ok(Self {
-            url: url.to_owned(),
+        Self {
+            url,
             is_connected,
             send_to_socket: sender_to_socket,
-            stop_service,
-        })
+            subscriptions,
+            worker,
+        }
     }
 
-    fn spawn_background_client<F>(
-        broadcast_to_listeners: mpsc::Sender<(Event, String)>,
-        sender_to_socket: mpsc::Sender<Request>,
+    fn spawn_background_client(
+        broadcast_to_listeners: mpsc::Sender<(Response, Url)>,
         mut send_to_socket: mpsc::Receiver<Request>,
-        url_str: &str,
-        max_connections_attempts: u16,
+        url: Url,
         is_connected: Arc<AtomicBool>,
-        on_connection: Option<F>,
-    ) -> Result<oneshot::Sender<()>, Error>
-    where
-        F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
-            + Send
-            + Sync
-            + 'static,
-    {
-        let (stop_service, mut stopper_recv) = oneshot::channel();
-
-        let url = url_str.to_owned();
-        let url_parsed = Url::parse(&url)?;
+        send_on_connection: Subscriptions,
+    ) -> JoinHandle<()> {
+        is_connected.store(false, Relaxed);
 
         tokio::spawn(async move {
-            let mut reconnect = true;
             let mut connection_attempts = 0;
 
-            while reconnect && connection_attempts <= max_connections_attempts {
+            loop {
                 log::warn!("{}: Connect attempt {}", url, connection_attempts);
                 connection_attempts += 1;
-                let mut socket = match connect_async(url_parsed.clone()).await {
+                let mut socket = match connect_async(url.clone()).await {
                     Ok(x) => x.0,
                     Err(err) => {
                         log::warn!("{}: Failed to connect: {}", url, err);
@@ -102,19 +116,24 @@ impl Relayer {
                 };
 
                 log::info!("Connected to {}", url);
-
-                if let Some(on_connection) = &on_connection {
-                    on_connection(&url, sender_to_socket.clone()).await;
+                connection_attempts = 0;
+
+                let subscriptions = send_on_connection
+                    .read()
+                    .await
+                    .iter()
+                    .filter_map(|x| serde_json::to_string(&Request::Request(x.1.clone())).ok())
+                    .map(Message::Text)
+                    .collect::<Vec<_>>();
+
+                for msg in subscriptions {
+                    if let Err(x) = socket.send(msg).await {
+                        log::error!("{}: Reconnecting due error at sending: {:?}", url, x);
+                    }
                 }
 
                 loop {
                     tokio::select! {
-                        Ok(()) = &mut stopper_recv => {
-                            log::warn!("{}: Breaking client due external signal", url);
-                            reconnect = false;
-                            break;
-                        },
-
                         Some(msg) = send_to_socket.recv() => {
                             if let Ok(json) = serde_json::to_string(&msg) {
                                 log::info!("{}: Sending {}", url, json);
@@ -125,8 +144,8 @@ impl Relayer {
                             }
                         }
                         msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
-                            is_connected.store(true, Relaxed);
                             let msg = if let Ok(Some(Ok(msg))) = msg {
+                                is_connected.store(true, Relaxed);
                                     match msg {
                                         Message::Text(text) => text,
                                         Message::Ping(msg) => {
@@ -151,13 +170,12 @@ impl Relayer {
                             }
 
                             log::info!("New message: {}", msg);
-                            connection_attempts = 0;
 
 
                             let msg: Result<Response, _> = serde_json::from_str(&msg);
 
                             if let Ok(msg) = msg {
-                                if let Err(error) = broadcast_to_listeners.try_send((Event::Response(msg.into()), url.to_owned())) {
+                                if let Err(error) = broadcast_to_listeners.try_send((msg.into(), url.clone())) {
                                     log::error!("{}: Reconnecting client because of {}", url, error);
                                     break;
                                 }
@@ -174,20 +192,7 @@ impl Relayer {
                 // Throttle down to not spam the server with reconnections
                 sleep(Duration::from_millis(500)).await;
             }
-
-            let _ = broadcast_to_listeners.try_send((Event::Disconnected, "".to_owned()));
-
-            log::warn!("{}: Disconnected", url);
-        });
-
-        Ok(stop_service)
-    }
-
-    /// Checks if the relayer background connection is running. It is not
-    /// guaranteed there is an active connection, it may be in the process of
-    /// reconnecting.
-    pub fn is_running(&self) -> bool {
-        !self.stop_service.is_closed()
+        })
     }
 
     /// Checks if the relayer is connected. It is guaranteed that the relayer is
@@ -196,16 +201,35 @@ impl Relayer {
         self.is_connected.load(Relaxed)
     }
 
-    /// Sends a requests to this relayer
-    pub async fn send(&self, request: Request) -> Result<(), Error> {
+    /// Creates a new subscription
+    pub async fn subscribe(
+        &self,
+        subscription: subscribe::Subscribe,
+    ) -> Result<ActiveSubscription, Error> {
+        let id = subscription.subscription_id.clone();
+
+        self.subscriptions
+            .write()
+            .await
+            .insert(id.clone(), subscription.clone());
+
         self.send_to_socket
-            .send(request)
+            .send(Request::Request(subscription))
             .await
-            .map_err(|e| Error::Sync(Box::new(e)))
+            .map_err(|e| Error::Sync(Box::new(e)))?;
+
+        Ok(ActiveSubscription {
+            id,
+            subscriptions: self.subscriptions.clone(),
+            send_to_socket: self.send_to_socket.clone(),
+        })
     }
 
-    /// Stops the background thread that has the connection to this relayer
-    pub async fn disconnect(self) {
-        let _ = self.stop_service.send(());
+    /// Posts an event to the relayer
+    pub async fn post(&self, event: client::Event) -> Result<(), Error> {
+        self.send_to_socket
+            .send(event.into())
+            .await
+            .map_err(|e| Error::Sync(Box::new(e)))
     }
 }

+ 2 - 6
crates/client/src/lib.rs

@@ -8,12 +8,8 @@
 //!
 //! It will also have reconnection logic built-in internally.
 #![deny(missing_docs, warnings)]
+mod client;
 mod error;
 mod pool;
-mod relayer;
 
-pub use self::{
-    error::Error,
-    pool::{Event, Pool},
-    relayer::Relayer,
-};
+pub use self::{client::Client, error::Error, pool::Pool};

+ 58 - 79
crates/client/src/pool.rs

@@ -1,11 +1,16 @@
 //! Relayers
 //!
 //! This is the main entry point to the client library.
-use crate::{Error, Relayer};
-use futures::Future;
-use nostr_rs_types::{Request, Response};
-use std::{collections::HashMap, pin::Pin};
+use crate::{client::ActiveSubscription, Client, Error};
+use futures::future::join_all;
+use nostr_rs_types::{
+    client::{self, subscribe},
+    types::SubscriptionId,
+    Response,
+};
+use std::collections::HashMap;
 use tokio::sync::mpsc;
+use url::Url;
 
 /// Clients
 ///
@@ -14,9 +19,10 @@ use tokio::sync::mpsc;
 /// time, and to receive messages
 #[derive(Debug)]
 pub struct Pool {
-    clients: HashMap<String, Relayer>,
-    sender: mpsc::Sender<(Event, String)>,
-    receiver: mpsc::Receiver<(Event, String)>,
+    clients: HashMap<Url, Client>,
+    sender: mpsc::Sender<(Response, Url)>,
+    receiver: mpsc::Receiver<(Response, Url)>,
+    subscriptions: HashMap<SubscriptionId, Vec<ActiveSubscription>>,
 }
 
 impl Default for Pool {
@@ -25,18 +31,6 @@ impl Default for Pool {
     }
 }
 
-/// Client event
-///
-/// This type wraps a response a disconnected event. The disconnected will be
-/// the last event to be sent.
-#[derive(Debug, Clone)]
-pub enum Event {
-    /// A response
-    Response(Box<Response>),
-    /// A disconnection event
-    Disconnected,
-}
-
 const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 10_000;
 
 impl Pool {
@@ -46,91 +40,76 @@ impl Pool {
         Self {
             clients: HashMap::new(),
             receiver,
+            subscriptions: Default::default(),
             sender,
         }
     }
 
     /// Tries to receive a message from any of the connected relayers
-    pub fn try_recv(&mut self) -> Option<(Event, String)> {
+    pub fn try_recv(&mut self) -> Option<(Response, Url)> {
         self.receiver.try_recv().ok()
     }
 
     /// Receives a message from any of the connected relayers
-    pub async fn recv(&mut self) -> Option<(Event, String)> {
+    pub async fn recv(&mut self) -> Option<(Response, Url)> {
         self.receiver.recv().await
     }
 
+    /// Subscribes to all the connected relayers
+    pub async fn subscribe(&mut self, subscription: subscribe::Subscribe) -> Result<(), Error> {
+        let wait_all = self
+            .clients
+            .values()
+            .map(|sender| sender.subscribe(subscription.clone()))
+            .collect::<Vec<_>>();
+
+        self.subscriptions.insert(
+            subscription.subscription_id,
+            join_all(wait_all)
+                .await
+                .into_iter()
+                .collect::<Result<Vec<_>, _>>()?,
+        );
+
+        Ok(())
+    }
+
     /// Sends a request to all the connected relayers
-    pub async fn send(&self, request: Request) {
-        for (_, sender) in self.clients.iter() {
-            let _ = sender.send(request.clone()).await;
-        }
+    pub async fn post(&self, request: client::Event) {
+        let wait_all = self
+            .clients
+            .values()
+            .map(|sender| sender.post(request.clone()))
+            .collect::<Vec<_>>();
+
+        join_all(wait_all).await;
     }
 
     /// Returns a vector to all outgoing connections
-    pub fn get_connections(&self) -> Vec<&Relayer> {
-        self.clients.values().collect::<Vec<&Relayer>>()
+    pub fn get_connections(&self) -> Vec<&Client> {
+        self.clients.values().collect::<Vec<&Client>>()
     }
 
-    /// Returns the number of active connections. If a connection to a relayer
-    /// is not active it will be removed from the list
-    pub fn check_active_connections(&mut self) -> usize {
-        let mut to_remove = vec![];
-        for (url, client) in self.clients.iter() {
-            if !client.is_running() {
-                to_remove.push(url.to_owned());
-            }
-        }
-
-        for url in to_remove.iter() {
-            self.clients.remove(url);
-        }
-
-        self.clients.len()
+    /// Returns the number of active connections.
+    pub fn check_active_connections(&self) -> usize {
+        self.clients
+            .iter()
+            .filter(|(_, relayer)| relayer.is_connected())
+            .collect::<Vec<_>>()
+            .len()
     }
 
-    /// Creates a connection to a new relayer. If the connection is successful a
-    /// Callback will be called, with a list of previously sent requests, and a
-    /// Sender to send new requests to this relayer alone.
-    ///
-    /// The same callback will be called for every reconnection to the same
-    /// relayer, also the callback will be called, giving the chance to re-send
-    /// sent requests to the new connections
+    /// Creates a connection to a new relayer.
     ///
     /// This function will open a connection at most once, if a connection
     /// already exists false will be returned
-    pub fn connect_to<F>(
-        mut self,
-        url: &str,
-        max_connections_attempts: u16,
-        on_connection: Option<F>,
-    ) -> Result<Self, Error>
-    where
-        F: (Fn(&str, mpsc::Sender<Request>) -> Pin<Box<dyn Future<Output = ()> + Send>>)
-            + Send
-            + Sync
-            + 'static,
-    {
-        if self.clients.get(url).is_none() {
+    pub fn connect_to(mut self, url: Url) -> Self {
+        if !self.clients.contains_key(&url) {
             log::warn!("Connecting to {}", url);
-            self.clients.insert(
-                url.to_owned(),
-                Relayer::new(
-                    self.sender.clone(),
-                    max_connections_attempts,
-                    url,
-                    on_connection,
-                )?,
-            );
+            self.clients
+                .insert(url.clone(), Client::new(self.sender.clone(), url));
         }
 
-        Ok(self)
-    }
-
-    /// Disconnects from all relayers
-    pub async fn stop(&mut self) {
-        for (_, client) in self.clients.drain() {
-            client.disconnect().await;
-        }
+        self
     }
 }

+ 18 - 0
crates/dump/Cargo.toml

@@ -0,0 +1,18 @@
+[package]
+name = "nostr-rs-dump"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+nostr-rs-types = { path = "../types" }
+nostr-rs-client = { path = "../client" }
+tokio = { version = "1.26.0", features = ["full"] }
+env_logger = "0.10.0"
+futures-util = "0.3.27"
+log = "0.4.17"
+thiserror = "1.0.40"
+futures = "0.3.28"
+instant-acme = "0.2.0"
+serde = "1.0.183"
+toml = "0.7.6"
+serde_json = "1.0.105"

+ 44 - 0
crates/dump/src/main.rs

@@ -0,0 +1,44 @@
+use nostr_rs_client::{Error as ClientError, Pool};
+use nostr_rs_types::{client::Subscribe, Response};
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("Nostr: {0}")]
+    Addr(#[from] nostr_rs_types::types::addr::Error),
+
+    #[error("client: {0}")]
+    Client(#[from] ClientError),
+}
+
+#[tokio::main]
+async fn main() {
+    env_logger::init();
+    let mut clients = vec![
+        "wss://relay.damus.io/",
+        "wss://brb.io",
+        "wss://nos.lol",
+        "wss://relay.current.fyi",
+        "wss://eden.nostr.land",
+        "wss://relay.snort.social",
+    ]
+    .into_iter()
+    .fold(Pool::new(), |clients, host| {
+        clients.connect_to(host.parse().expect("valid url"))
+    });
+
+    let _ = clients.subscribe(Subscribe::default().into()).await;
+
+    loop {
+        if let Some((msg, relayed_by)) = clients.recv().await {
+            match msg {
+                Response::Event(x) => {
+                    println!("{} => {:?}", relayed_by, x);
+                }
+                Response::EndOfStoredEvents(_) => {}
+                msg => {
+                    println!("{} {:?}", relayed_by, msg);
+                }
+            }
+        }
+    }
+}

+ 5 - 1
crates/relayer/Cargo.toml

@@ -11,7 +11,11 @@ nostr-rs-storage-base = { path = "../storage/base" }
 futures-util = "0.3.27"
 parking_lot = "0.12.1"
 tokio = { version = "1.26.0", features = ["sync", "macros", "rt", "time"] }
-tokio-tungstenite = { version = "0.18.0", features = ["rustls", "rustls-native-certs", "rustls-tls-native-roots"] }
+tokio-tungstenite = { version = "0.18.0", features = [
+    "rustls",
+    "rustls-native-certs",
+    "rustls-tls-native-roots",
+] }
 thiserror = "1.0.39"
 serde_json = "1.0.94"
 rand = "0.8.5"

+ 24 - 22
crates/relayer/src/relayer.rs

@@ -1,4 +1,5 @@
 use crate::{Connection, Error, Subscription};
+use futures_util::StreamExt;
 use nostr_rs_storage_base::Storage;
 use nostr_rs_types::{
     relayer,
@@ -75,14 +76,14 @@ impl<T: Storage> Relayer<T> {
         Ok(id)
     }
 
-    fn recv_request_from_client(
+    async fn recv_request_from_client(
         &self,
         connection: &Connection,
         request: Request,
     ) -> Result<Option<Request>, Error> {
         match &request {
             Request::Event(event) => {
-                self.store_and_broadcast_local_event(event.deref());
+                self.store_and_broadcast_local_event(event.deref()).await;
             }
             Request::Request(request) => {
                 // Create subscription
@@ -122,17 +123,17 @@ impl<T: Storage> Relayer<T> {
                 if let Some(storage) = self.storage.as_ref() {
                     // Sent all events that match the filter that are stored in our database
                     for filter in request.filters.clone().into_iter() {
-                        storage.get_by_filter(filter)?.for_each(|event| {
-                            if let Ok(event) = event {
-                                let _ = connection.send(
-                                    relayer::Event {
-                                        subscription_id: request.subscription_id.clone(),
-                                        event,
-                                    }
-                                    .into(),
-                                );
-                            }
-                        });
+                        let mut result = storage.get_by_filter(filter).await?;
+
+                        while let Some(Ok(event)) = result.next().await {
+                            let _ = connection.send(
+                                relayer::Event {
+                                    subscription_id: request.subscription_id.clone(),
+                                    event,
+                                }
+                                .into(),
+                            );
+                        }
                     }
                 }
 
@@ -171,7 +172,7 @@ impl<T: Storage> Relayer<T> {
             .get(&conn_id)
             .ok_or(Error::UnknownConnection(conn_id))?;
 
-        self.recv_request_from_client(connection, request)
+        self.recv_request_from_client(connection, request).await
     }
 
     pub fn send_to_conn(&self, conn_id: u128, response: Response) -> Result<(), Error> {
@@ -197,9 +198,9 @@ impl<T: Storage> Relayer<T> {
     }
 
     #[inline]
-    pub fn store_and_broadcast_local_event(&self, event: &Event) {
+    pub async fn store_and_broadcast_local_event(&self, event: &Event) {
         if let Some(storage) = self.storage.as_ref() {
-            let _ = storage.store_local_event(event);
+            let _ = storage.store_local_event(event).await;
         }
         let subscriptions = self.subscriptions.read();
 
@@ -232,7 +233,7 @@ mod test {
     use nostr_rs_rocksdb::RocksDb;
     use nostr_rs_types::Request;
 
-    fn get_db(prefill: bool) -> RocksDb {
+    async fn get_db(prefill: bool) -> RocksDb {
         let db = RocksDb::new(format!("/tmp/db/{}", get_id())).expect("db");
         if prefill {
             let events = include_str!("../tests/events.json")
@@ -241,7 +242,7 @@ mod test {
                 .collect::<Vec<Event>>();
 
             for event in events {
-                assert!(db.store(&event).expect("valid"));
+                assert!(db.store(&event).await.expect("valid"));
             }
         }
         db
@@ -257,9 +258,9 @@ mod test {
                 {\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},
                 {\"#e\":[\"2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a\",\"a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1\"],\"kinds\":[1,6,7,9735]}
             ]").expect("valid object");
-        let (relayer, _) = Relayer::new(Some(get_db(true)));
+        let (relayer, _) = Relayer::new(Some(get_db(true).await));
         let (connection, mut recv) = Connection::new_for_test();
-        let _ = relayer.recv_request_from_client(&connection, request);
+        let _ = relayer.recv_request_from_client(&connection, request).await;
         // ev1
         assert_eq!(
             "9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42",
@@ -306,9 +307,9 @@ mod test {
     #[tokio::test]
     async fn server_listener_real_time() {
         let request: Request = serde_json::from_str("[\"REQ\",\"1298169700973717\",{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"since\":1681939304},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[1,3,6,7,9735],\"since\":1681939304},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"#e\":[\"2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a\",\"a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1\"],\"kinds\":[1,6,7,9735]}]").expect("valid object");
-        let (relayer, _) = Relayer::new(Some(get_db(false)));
+        let (relayer, _) = Relayer::new(Some(get_db(false).await));
         let (connection, mut recv) = Connection::new_for_test();
-        let _ = relayer.recv_request_from_client(&connection, request);
+        let _ = relayer.recv_request_from_client(&connection, request).await;
         // eod
         assert!(recv
             .try_recv()
@@ -323,6 +324,7 @@ mod test {
 
         relayer
             .recv_request_from_client(&connection, new_event)
+            .await
             .expect("process event");
 
         // It is not empty

+ 2 - 0
crates/storage/base/Cargo.toml

@@ -10,6 +10,8 @@ rand = "0.8.5"
 tokio = { version = "1.32.0", features = ["sync"] }
 parking_lot = "0.12.1"
 serde_json = "1.0"
+async-trait = "0.1.81"
+futures = "0.3.30"
 
 [features]
 default = []

+ 2 - 6
crates/storage/base/src/error.rs

@@ -6,7 +6,7 @@ use std::num::TryFromIntError;
 pub enum Error {
     /// Internal database error
     #[error("Unknown: {0}")]
-    Unknown(String),
+    Internal(String),
 
     /// Serialization error
     #[error("Serde: {0}")]
@@ -14,13 +14,9 @@ pub enum Error {
 
     /// Internal error while converting types to integer
     #[error("Internal error: {0}")]
-    Internal(#[from] TryFromIntError),
+    IntErr(#[from] TryFromIntError),
 
     /// Transaction error
     #[error("Tx: {0}")]
     Tx(String),
-
-    /// Internal error
-    #[error("Unknown family column")]
-    InvalidColumnFamily,
 }

+ 31 - 12
crates/storage/base/src/notification.rs

@@ -1,7 +1,13 @@
 use crate::{Error, Storage};
+use futures::Stream;
 use nostr_rs_types::types::{Addr, Event, Filter, Kind};
 use parking_lot::RwLock;
-use std::{collections::HashMap, sync::atomic::AtomicUsize};
+use std::{
+    collections::HashMap,
+    pin::Pin,
+    sync::atomic::AtomicUsize,
+    task::{Context, Poll},
+};
 use tokio::sync::mpsc::Sender;
 
 #[allow(dead_code)]
@@ -31,18 +37,28 @@ where
     last_id: AtomicUsize,
 }
 
-pub struct SubscriptionResultFromDb<I: Iterator> {
+pub struct SubscriptionResultFromDb<I>
+where
+    I: Stream<Item = Result<Event, Error>>,
+{
     iterator: I,
 }
 
-impl<I> Iterator for SubscriptionResultFromDb<I>
+impl<I> Stream for SubscriptionResultFromDb<I>
 where
-    I: Iterator<Item = Result<Event, Error>>,
+    I: Stream<Item = Result<Event, Error>>,
 {
     type Item = Result<Event, Error>;
 
-    fn next(&mut self) -> Option<Self::Item> {
-        self.iterator.next()
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        // SAFETY: `iterator` is structurally pinned: we never move it out of
+        // `SubscriptionResultFromDb` nor hand out `&mut` access to it, so
+        // projecting `Pin<&mut Self>` to `Pin<&mut I>` is sound.
+        let iterator = unsafe { self.map_unchecked_mut(|s| &mut s.iterator) };
+        iterator.poll_next(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.iterator.size_hint()
     }
 }
 
@@ -61,12 +77,15 @@ where
     }
 
     /// Gets an event from the wrapped storage
-    pub fn get_event<T1: AsRef<[u8]>>(&self, id: T1) -> Result<Option<Event>, Error> {
-        self.db.get_event(id)
+    pub async fn get_event<T1: AsRef<[u8]> + Send + Sync>(
+        &self,
+        id: T1,
+    ) -> Result<Option<Event>, Error> {
+        self.db.get_event(id).await
     }
 
     /// Removes a subscription from the listener
-    pub fn unsubscribe(self, subscription_id: usize) -> Result<(), Error> {
+    pub async fn unsubscribe(self, subscription_id: usize) -> Result<(), Error> {
         let mut subscribers = self.subscriptions.write();
         let _ = subscribers.remove(&subscription_id);
         Ok(())
@@ -74,11 +93,11 @@ where
 
     /// Subscribes to a filter. The first streamed bytes will be reads from the
     /// database.
-    pub fn subscribe(
+    pub async fn subscribe(
         &self,
         filter: Filter,
         sender: Sender<(usize, Event)>,
-    ) -> Result<(usize, SubscriptionResultFromDb<T::Iterator<'_>>), Error> {
+    ) -> Result<(usize, SubscriptionResultFromDb<T::Stream<'_>>), Error> {
         let mut subscribers = self.subscriptions.write();
         let mut _subscription_listener = self.subscription_listener.write();
         let id = self
@@ -95,7 +114,7 @@ where
         Ok((
             id,
             SubscriptionResultFromDb {
-                iterator: self.db.get_by_filter(filter)?,
+                iterator: self.db.get_by_filter(filter).await?,
             },
         ))
     }

+ 0 - 0
crates/storage/base/src/sqlite/mod.rs


+ 12 - 10
crates/storage/base/src/storage.rs

@@ -1,40 +1,42 @@
 use crate::Error;
+use futures::Stream;
 use nostr_rs_types::types::{Event, Filter};
 
 /// Trait to store/query events
-pub trait Storage {
+#[async_trait::async_trait]
+pub trait Storage: Send + Sync {
     /// Result iterators
-    type Iterator<'a>: Iterator<Item = Result<Event, Error>>
+    type Stream<'a>: Stream<Item = Result<Event, Error>> + Unpin
     where
         Self: 'a;
 
     /// Stores a new event into the database. This action will also creates all
     /// the needed indexes to find this event later by reference, author, kind or tag.
-    fn store(&self, event: &Event) -> Result<bool, Error>;
+    async fn store(&self, event: &Event) -> Result<bool, Error>;
 
     /// Flags the current event as initiated locally. This useful to retrieve
     /// events using `get_local_events`.
-    fn set_local_event(&self, event: &Event) -> Result<(), Error>;
+    async fn set_local_event(&self, event: &Event) -> Result<(), Error>;
 
     /// Returns an event by its ID
-    fn get_event<T: AsRef<[u8]>>(&self, id: T) -> Result<Option<Event>, Error>;
+    async fn get_event<T: AsRef<[u8]> + Send + Sync>(&self, id: T) -> Result<Option<Event>, Error>;
 
     /// Get events from the database with a given filter
     ///
     /// The first step is to use one available index to get a list of event-IDs,
     /// then call `load_and_filter_events` that will load the events from the
     /// `Events` namespace and will filter them by the given parameters.
-    fn get_by_filter(&self, query: Filter) -> Result<Self::Iterator<'_>, Error>;
+    async fn get_by_filter(&self, query: Filter) -> Result<Self::Stream<'_>, Error>;
 
     /// Return a vector of all local events
-    fn get_local_events(&self, limit: Option<usize>) -> Result<Self::Iterator<'_>, Error>;
+    async fn get_local_events(&self, limit: Option<usize>) -> Result<Self::Stream<'_>, Error>;
 
     /// Stores an event, similar to store(Event), but keeps track of this event in a
     /// local index. This is useful to keep track of the events that are created by
     /// this node, to be broadcasted to new nodes in the future
-    fn store_local_event(&self, event: &Event) -> Result<bool, Error> {
-        let ret = self.store(event)?;
-        self.set_local_event(event)?;
+    async fn store_local_event(&self, event: &Event) -> Result<bool, Error> {
+        let ret = self.store(event).await?;
+        self.set_local_event(event).await?;
         Ok(ret)
     }
 }

+ 301 - 276
crates/storage/base/src/test.rs

@@ -2,297 +2,322 @@
 //!
 //! This crate will storage events into a database. It will also build index to
 //! find events by their tags, kind and references.
-    use super::*;
-    use nostr_rs_types::types::{Addr, Event, Filter, Kind};
-    use std::{
-        fs::File,
-        io::{BufRead, BufReader},
-    };
-
-    fn setup_db<T>(db: &T)
-    where
-        T: Storage,
-    {
-        let events = include_str!("../tests/events.json").lines()
-            .map(|line| serde_json::from_str(&line).expect("valid"))
-            .collect::<Vec<Event>>();
-
-        for event in events {
-            assert!(db.store(&event).expect("valid"));
-        }
-    }
-
-    pub fn store_and_get<T>(db: &T)
-    where
-        T: Storage,
-    {
-        let json = "{\"content\":\"{\\\"lud06\\\":\\\"lnbc1p3a4wxvpp5x0pa6gr55fq5s9d3dxs0vz77mqxgdw63hhtgtlfz5zvm65847vnqdqqcqpjsp5402c8rtqxd4j97rnvuejuwl4sg473g6wg08d67fvn7qc4gtpkfks9q7sqqqqqqqqqqqqqqqqqqqsqqqqqysgqmqz9gxqyjw5qrzjqwryaup9lh50kkranzgcdnn2fgvx390wgj5jd07rwr3vxeje0glclleasn65surjcsqqqqlgqqqqqeqqjqyxj968tem9ps6ttm9ukv6ag4yc6qmgj2svrccfgp4n83fpktr3dsx6fq7grfzlqt982aaemahg9q29vzl9f627kh4j8h8xc2z2mtpdqqjlekah\\\",\\\"website\\\":\\\"\\\",\\\"nip05\\\":\\\"cesar@cesar.com.py\\\",\\\"picture\\\":\\\"https://pbs.twimg.com/profile_images/1175432935337537536/_Peu9vuJ_400x400.jpg\\\",\\\"display_name\\\":\\\"C\\\",\\\"about\\\":\\\"Rust and PHP\\\",\\\"name\\\":\\\"c\\\"}\",\"created_at\":1678476588,\"id\":\"3800c787a23288641c0b96cbcc87c26cbd3ea7bee53b7748422fdb100fb7b9f0\",\"kind\":0,\"pubkey\":\"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78\",\"sig\":\"c8a12ce96833e4cd67bce0e9e50f831262ef0f0c0cff5e56c38a0c90867ed1a6621e9692948ef5e85a7ca3726c3f0f43fa7e1992536bc457317123bca8784f5f\",\"tags\":[]}";
-
-        let event: Event = serde_json::from_str(json).expect("valid");
-        assert_eq!(true, db.store(&event).expect("valid"));
-        assert_eq!(false, db.store(&event).expect("valid"));
-
-        let event1 = db.get_event(&event.id).expect("something");
-        assert_eq!(event1, Some(event));
+use super::*;
+use futures::{StreamExt, TryStreamExt};
+use nostr_rs_types::types::{Addr, Event, Filter, Kind};
+use std::{
+    fs::File,
+    io::{BufRead, BufReader},
+};
+
+async fn setup_db<T>(db: &T)
+where
+    T: Storage,
+{
+    let events = include_str!("../tests/events.json")
+        .lines()
+        .map(|line| serde_json::from_str(&line).expect("valid"))
+        .collect::<Vec<Event>>();
+
+    for event in events {
+        assert!(db.store(&event).await.expect("valid"));
     }
-
-    pub fn records_are_sorted_by_date_desc<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-
-        let pk: Addr = "7bdef7be22dd8e59f4600e044aa53a1cf975a9dc7d27df5833bc77db784a5805"
-            .try_into()
-            .expect("pk");
-
-        let vec = db
-            .get_by_filter(Filter {
-                authors: vec![pk],
-                limit: 10,
-                ..Default::default()
-            })
-            .expect("set of results")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-
-        let dates = vec.iter().map(|e| e.created_at()).collect::<Vec<_>>();
-        let mut sorted_dates = dates.clone();
-        sorted_dates.sort_by(|a, b| b.cmp(a));
-
-        assert_eq!(vec.len(), 10);
-        assert_eq!(dates, sorted_dates);
-    }
-
-    pub fn filter_by_references<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-
-        let related_events = db
-            .get_by_filter(Filter {
-                references_to_event: vec![
-                    "f513f1422ee5dbf30f57118b6cc34e788746e589a9b07be767664a164c57b9b1"
-                        .try_into()
-                        .expect("pk"),
-                ],
-                references_to_public_key: vec![
-                    "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
-                        .try_into()
-                        .expect("pk"),
-                ],
-                ..Default::default()
-            })
-            .expect("valid")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(related_events.len(), 1);
-    }
-
-    pub fn filter_by_references_zero_match<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-
-        let related_events = db
-            .get_by_filter(Filter {
-                references_to_event: vec![
-                    "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
-                        .try_into()
-                        .expect("pk"),
-                ],
-                references_to_public_key: vec![
-                    "36ce9f55828b06f4f45c7f7292ae58362f4abe746938888f82e56fe6fb7ffb2c"
-                        .try_into()
-                        .expect("pk"),
-                ],
-                ..Default::default()
-            })
-            .expect("valid")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(related_events.len(), 0);
-    }
-
-    pub fn filter_by_references_and_kind<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-
-        let related_events = db
-            .get_by_filter(Filter {
-                kinds: vec![Kind::Reaction, Kind::ShortTextNote],
-                references_to_event: vec![
-                    "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
-                        .try_into()
-                        .expect("pk"),
-                ],
-                ..Default::default()
-            })
-            .expect("valid")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(related_events.len(), 3);
-    }
-
-    pub fn get_event_and_related_events<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-
-        let id: Addr = "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
-            .try_into()
-            .expect("pk");
-
-        let events = db
-            .get_by_filter(Filter {
-                ids: vec![id.clone()],
-                ..Default::default()
-            })
-            .expect("events")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-
-        assert_eq!(events.len(), 1);
-
-        let related_events = db
-            .get_by_filter(Filter {
-                references_to_event: vec![id],
-                ..Default::default()
-            })
-            .expect("valid")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(related_events.len(), 2_538);
-
-        let mut kinds = related_events.iter().map(|x| x.kind()).collect::<Vec<_>>();
-        kinds.sort();
-        kinds.dedup();
-
-        assert_eq!(Kind::Reaction, kinds[0]);
-        assert_eq!(Kind::Unknown(42), kinds[1]);
-    }
-
-    pub fn filter_by_authors<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-        let query = Filter {
-            authors: vec![
-                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+}
+
+pub async fn store_and_get<T>(db: &T)
+where
+    T: Storage,
+{
+    let json = "{\"content\":\"{\\\"lud06\\\":\\\"lnbc1p3a4wxvpp5x0pa6gr55fq5s9d3dxs0vz77mqxgdw63hhtgtlfz5zvm65847vnqdqqcqpjsp5402c8rtqxd4j97rnvuejuwl4sg473g6wg08d67fvn7qc4gtpkfks9q7sqqqqqqqqqqqqqqqqqqqsqqqqqysgqmqz9gxqyjw5qrzjqwryaup9lh50kkranzgcdnn2fgvx390wgj5jd07rwr3vxeje0glclleasn65surjcsqqqqlgqqqqqeqqjqyxj968tem9ps6ttm9ukv6ag4yc6qmgj2svrccfgp4n83fpktr3dsx6fq7grfzlqt982aaemahg9q29vzl9f627kh4j8h8xc2z2mtpdqqjlekah\\\",\\\"website\\\":\\\"\\\",\\\"nip05\\\":\\\"cesar@cesar.com.py\\\",\\\"picture\\\":\\\"https://pbs.twimg.com/profile_images/1175432935337537536/_Peu9vuJ_400x400.jpg\\\",\\\"display_name\\\":\\\"C\\\",\\\"about\\\":\\\"Rust and PHP\\\",\\\"name\\\":\\\"c\\\"}\",\"created_at\":1678476588,\"id\":\"3800c787a23288641c0b96cbcc87c26cbd3ea7bee53b7748422fdb100fb7b9f0\",\"kind\":0,\"pubkey\":\"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78\",\"sig\":\"c8a12ce96833e4cd67bce0e9e50f831262ef0f0c0cff5e56c38a0c90867ed1a6621e9692948ef5e85a7ca3726c3f0f43fa7e1992536bc457317123bca8784f5f\",\"tags\":[]}";
+
+    let event: Event = serde_json::from_str(json).expect("valid");
+    assert_eq!(true, db.store(&event).await.expect("valid"));
+    assert_eq!(false, db.store(&event).await.expect("valid"));
+
+    let event1 = db.get_event(&event.id).await.expect("something");
+    assert_eq!(event1, Some(event));
+}
+
+pub async fn records_are_sorted_by_date_desc<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+
+    let pk: Addr = "7bdef7be22dd8e59f4600e044aa53a1cf975a9dc7d27df5833bc77db784a5805"
+        .try_into()
+        .expect("pk");
+
+    let vec: Vec<Event> = db
+        .get_by_filter(Filter {
+            authors: vec![pk],
+            limit: 10,
+            ..Default::default()
+        })
+        .await
+        .expect("set of results")
+        .try_collect()
+        .await
+        .expect("valid");
+
+    let dates = vec.iter().map(|e| e.created_at()).collect::<Vec<_>>();
+    let mut sorted_dates = dates.clone();
+    sorted_dates.sort_by(|a, b| b.cmp(a));
+
+    assert_eq!(vec.len(), 10);
+    assert_eq!(dates, sorted_dates);
+}
+
+pub async fn filter_by_references<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+
+    let related_events: Vec<Event> = db
+        .get_by_filter(Filter {
+            references_to_event: vec![
+                "f513f1422ee5dbf30f57118b6cc34e788746e589a9b07be767664a164c57b9b1"
                     .try_into()
                     .expect("pk"),
-                "38fb689f2fb92d932d457b1ea56715292bdf2140b2c7d282e8b8e8d644483ad6"
+            ],
+            references_to_public_key: vec![
+                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
                     .try_into()
                     .expect("pk"),
             ],
             ..Default::default()
-        };
-        let records = db
-            .get_by_filter(query)
-            .expect("valid")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(records.len(), 27);
-    }
-
-    pub fn filter_by_author<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-        let query = Filter {
-            authors: vec![
-                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+        })
+        .await
+        .expect("valid")
+        .try_collect()
+        .await
+        .expect("valid");
+    assert_eq!(related_events.len(), 1);
+}
+
+pub async fn filter_by_references_zero_match<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+
+    let related_events: Vec<Event> = db
+        .get_by_filter(Filter {
+            references_to_event: vec![
+                "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
+                    .try_into()
+                    .expect("pk"),
+            ],
+            references_to_public_key: vec![
+                "36ce9f55828b06f4f45c7f7292ae58362f4abe746938888f82e56fe6fb7ffb2c"
                     .try_into()
                     .expect("pk"),
             ],
             ..Default::default()
-        };
-        let records = db
-            .get_by_filter(query)
-            .expect("valid")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(records.len(), 3);
-    }
-
-    pub fn filter_by_author_and_kinds<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-        let query = Filter {
-            authors: vec![
-                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+        })
+        .await
+        .expect("valid")
+        .try_collect()
+        .await
+        .expect("valid");
+    assert_eq!(related_events.len(), 0);
+}
+
+pub async fn filter_by_references_and_kind<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+
+    let related_events: Vec<Event> = db
+        .get_by_filter(Filter {
+            kinds: vec![Kind::Reaction, Kind::ShortTextNote],
+            references_to_event: vec![
+                "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
                     .try_into()
                     .expect("pk"),
             ],
-            kinds: vec![Kind::ShortTextNote, Kind::Reaction],
             ..Default::default()
-        };
-        let records = db
-            .get_by_filter(query)
-            .expect("iterator")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(records.len(), 2);
-    }
-
-    pub fn filter_by_kind<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-        let query = Filter {
-            kinds: vec![Kind::ShortTextNote],
+        })
+        .await
+        .expect("valid")
+        .try_collect()
+        .await
+        .expect("valid");
+    assert_eq!(related_events.len(), 3);
+}
+
+pub async fn get_event_and_related_events<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+
+    let id: Addr = "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
+        .try_into()
+        .expect("pk");
+
+    let events: Vec<Event> = db
+        .get_by_filter(Filter {
+            ids: vec![id.clone()],
+            ..Default::default()
+        })
+        .await
+        .expect("events")
+        .try_collect()
+        .await
+        .expect("valid");
+
+    assert_eq!(events.len(), 1);
+
+    let related_events: Vec<Event> = db
+        .get_by_filter(Filter {
+            references_to_event: vec![id],
+            ..Default::default()
+        })
+        .await
+        .expect("valid")
+        .try_collect()
+        .await
+        .expect("valid");
+    assert_eq!(related_events.len(), 2_538);
+
+    let mut kinds = related_events.iter().map(|x| x.kind()).collect::<Vec<_>>();
+    kinds.sort();
+    kinds.dedup();
+
+    assert_eq!(Kind::Reaction, kinds[0]);
+    assert_eq!(Kind::Unknown(42), kinds[1]);
+}
+
+pub async fn filter_by_authors<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+    let query = Filter {
+        authors: vec![
+            "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+                .try_into()
+                .expect("pk"),
+            "38fb689f2fb92d932d457b1ea56715292bdf2140b2c7d282e8b8e8d644483ad6"
+                .try_into()
+                .expect("pk"),
+        ],
+        ..Default::default()
+    };
+    let records: Vec<Event> = db
+        .get_by_filter(query)
+        .await
+        .expect("valid")
+        .try_collect()
+        .await
+        .expect("valid");
+    assert_eq!(records.len(), 27);
+}
+
+pub async fn filter_by_author<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+    let query = Filter {
+        authors: vec![
+            "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+                .try_into()
+                .expect("pk"),
+        ],
+        ..Default::default()
+    };
+    let records: Vec<Event> = db
+        .get_by_filter(query)
+        .await
+        .expect("valid")
+        .try_collect()
+        .await
+        .expect("valid");
+    assert_eq!(records.len(), 3);
+}
+
+pub async fn filter_by_author_and_kinds<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+    let query = Filter {
+        authors: vec![
+            "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+                .try_into()
+                .expect("pk"),
+        ],
+        kinds: vec![Kind::ShortTextNote, Kind::Reaction],
+        ..Default::default()
+    };
+    let records: Vec<Event> = db
+        .get_by_filter(query)
+        .await
+        .expect("iterator")
+        .try_collect()
+        .await
+        .expect("valid");
+    assert_eq!(records.len(), 2);
+}
+
+pub async fn filter_by_kind<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+    let query = Filter {
+        kinds: vec![Kind::ShortTextNote],
+        ..Default::default()
+    };
+    let records: Vec<Event> = db
+        .get_by_filter(query)
+        .await
+        .expect("valid")
+        .try_collect()
+        .await
+        .expect("valid");
+
+    assert_eq!(records.len(), 1_511);
+    records
+        .iter()
+        .map(|x| x.kind())
+        .for_each(|x| assert_eq!(x, Kind::ShortTextNote));
+}
+
+pub async fn get_local_events<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+
+    let events_from_filter: Vec<Event> = db
+        .get_by_filter(Filter {
+            limit: 10,
             ..Default::default()
-        };
-        let records = db
-            .get_by_filter(query)
-            .expect("valid")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(records.len(), 1_511);
-        records
-            .iter()
-            .map(|x| x.kind())
-            .for_each(|x| assert_eq!(x, Kind::ShortTextNote));
+        })
+        .await
+        .expect("valid")
+        .try_collect()
+        .await
+        .expect("valid");
+
+    for event in events_from_filter.iter() {
+        db.set_local_event(event).await.expect("valid");
     }
 
-    pub fn get_local_events<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-
-        let ids = db
-            .get_by_filter(Filter {
-                limit: 10,
-                ..Default::default()
-            })
-            .expect("valid")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-
-        let x = ids
-            .iter()
-            .map(|event| db.set_local_event(event))
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-
-        assert_eq!(10, ids.len());
-        assert_eq!(10, x.len());
-
-        let records = db
-            .get_local_events(None)
-            .expect("valid iterator")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(x.len(), records.len())
-    }
+    assert_eq!(10, events_from_filter.len());
+
+    let records: Vec<Event> = db
+        .get_local_events(None)
+        .await
+        .expect("valid iterator")
+        .try_collect()
+        .await
+        .expect("valid");
+
+    assert_eq!(10, records.len())
+}

+ 2 - 0
crates/storage/rocksdb/Cargo.toml

@@ -15,6 +15,8 @@ rocksdb = { version = "0.20.1", features = [
 ] }
 chrono = "0.4.26"
 serde_json = "1.0"
+async-trait = "0.1.81"
+futures = "0.3.30"
 
 [dev-dependencies]
 nostr-rs-storage-base = { path = "../base", features = ["test"] }

+ 107 - 47
crates/storage/rocksdb/src/iterator.rs

@@ -1,9 +1,24 @@
 //! Rocks DB implementation of the storage layer
 use crate::{event_filter::EventFilter, RocksDb};
+use futures::{Future, FutureExt, Stream};
 use nostr_rs_storage_base::{Error, Storage};
 use nostr_rs_types::types::Event;
 use rocksdb::{BoundColumnFamily, DBIteratorWithThreadMode, DB};
-use std::{collections::VecDeque, sync::Arc};
+use std::{
+    collections::VecDeque,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+type CurrentEventByPrefixFuture<'a> = Pin<
+    Box<
+        dyn Future<
+                Output = Result<Option<nostr_rs_types::types::Event>, nostr_rs_storage_base::Error>,
+            > + Send
+            + 'a,
+    >,
+>;
 
 pub struct WrapperIterator<'a> {
     /// Reference to the rocks db database. This is useful to load the event
@@ -29,6 +44,8 @@ pub struct WrapperIterator<'a> {
     pub prefixes: VecDeque<Vec<u8>>,
     pub limit: Option<usize>,
     pub returned: usize,
+
+    pub future_event: Option<CurrentEventByPrefixFuture<'a>>,
 }
 
 impl<'a> WrapperIterator<'a> {
@@ -36,6 +53,7 @@ impl<'a> WrapperIterator<'a> {
     /// secondary index. If no prefix is available from prefixes the functions
     /// return None, signalling upstream the are no more results
     fn select_next_prefix_using_secondary_index(&mut self) -> Option<()> {
+        self.secondary_index_iterator = None;
         let prefix = self.prefixes.pop_front()?;
         self.secondary_index_iterator = Some(
             self.db
@@ -45,66 +63,108 @@ impl<'a> WrapperIterator<'a> {
         self.current_prefix = prefix;
         Some(())
     }
-}
-
-impl<'a> Iterator for WrapperIterator<'a> {
-    type Item = Result<Event, Error>;
 
-    fn next(&mut self) -> Option<Self::Item> {
-        if Some(self.returned) == self.limit {
-            return None;
-        }
-        if self.secondary_index_iterator.is_none() {
-            if self.namespace.is_some() {
-                self.select_next_prefix_using_secondary_index()?;
-            } else {
-                // No secondary index is used to query, this means the query is
-                // using the ID filter, so it is more efficient to use the
-                // primary index to prefetch events that may satisfy the query
-                loop {
-                    let prefix = self.prefixes.pop_front()?;
-                    if let Ok(Some(event)) = self.db.get_event(prefix) {
-                        if let Some(filter) = &self.filter {
-                            if filter.check_event(&event) {
-                                self.returned += 1;
-                                return Some(Ok(event));
-                            }
+    fn handle_future_call(&mut self, cx: &mut Context<'_>) -> FutureStatus {
+        if let Some(mut future_event) = self.future_event.take() {
+            match future_event.poll_unpin(cx) {
+                Poll::Ready(Ok(None)) => FutureStatus::FoundNotMatch,
+                Poll::Ready(Ok(Some(event))) => {
+                    // event is ready, apply the necessary filters
+                    if let Some(filter) = &self.filter {
+                        if filter.check_event(&event) {
+                            FutureStatus::Found(Ok(event))
                         } else {
-                            self.returned += 1;
-                            return Some(Ok(event));
+                            FutureStatus::FoundNotMatch
                         }
+                    } else {
+                        FutureStatus::Found(Ok(event))
                     }
                 }
+                Poll::Ready(Err(error)) => return FutureStatus::Found(Err(error)),
+                Poll::Pending => FutureStatus::Pending,
             }
+        } else {
+            FutureStatus::Ended
+        }
+    }
+}
+
+enum FutureStatus {
+    Found(Result<Event, Error>),
+    Pending,
+    Ended,
+    FoundNotMatch,
+}
+
+impl<'a> Stream for WrapperIterator<'a> {
+    type Item = Result<Event, Error>;
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (0, None)
+    }
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        if Some(self.returned) == self.limit {
+            return Poll::Ready(None);
+        }
+
+        let this = Pin::into_inner(self);
+        let db = this.db;
+
+        match this.handle_future_call(cx) {
+            FutureStatus::Found(event) => return Poll::Ready(Some(event)),
+            FutureStatus::Pending => return Poll::Pending,
+            FutureStatus::FoundNotMatch | FutureStatus::Ended => {}
         }
 
         loop {
-            loop {
-                let secondary_index = self.secondary_index_iterator.as_mut()?;
-                let (key, value) = match secondary_index.next() {
-                    Some(Ok((k, v))) => (k, v),
-                    _ => {
-                        // break this loop to select next available prefix
-                        break;
+            let secondary_index = if let Some(iterator) = this.secondary_index_iterator.as_mut() {
+                iterator
+            } else {
+                if this.namespace.is_some() {
+                    let _ = this.select_next_prefix_using_secondary_index();
+                    if let Some(iterator) = this.secondary_index_iterator.as_mut() {
+                        iterator
+                    } else {
+                        return Poll::Ready(None);
                     }
-                };
-                if !key.starts_with(&self.current_prefix) {
-                    break;
-                }
-                if let Ok(Some(event)) = self.db.get_event(value) {
-                    if let Some(filter) = &self.filter {
-                        if filter.check_event(&event) {
-                            self.returned += 1;
-                            return Some(Ok(event));
+                } else {
+                    // No secondary index is used to query, this means the query is
+                    // using the ID filter, so it is more efficient to use the
+                    // primary index to prefetch events that may satisfy the query
+                    return if let Some(prefix) = this.prefixes.pop_front() {
+                        this.future_event = Some(db.get_event(prefix));
+                        match this.handle_future_call(cx) {
+                            FutureStatus::Found(event) => Poll::Ready(Some(event)),
+                            FutureStatus::Pending => Poll::Pending,
+                            FutureStatus::FoundNotMatch | FutureStatus::Ended => continue,
                         }
                     } else {
-                        self.returned += 1;
-                        return Some(Ok(event));
+                        Poll::Ready(None)
+                    };
+                }
+            };
+
+            return match secondary_index.next() {
+                Some(Ok((key, value))) => {
+                    if !key.starts_with(&this.current_prefix) {
+                        let _ = this.select_next_prefix_using_secondary_index();
+                        continue;
+                    }
+
+                    this.future_event = Some(db.get_event(value));
+                    match this.handle_future_call(cx) {
+                        FutureStatus::Found(event) => Poll::Ready(Some(event)),
+                        FutureStatus::Pending => Poll::Pending,
+                        FutureStatus::FoundNotMatch | FutureStatus::Ended => continue,
                     }
                 }
-            }
-            // Select next prefix if available, or exists
-            self.select_next_prefix_using_secondary_index()?;
+                Some(Err(err)) => Poll::Ready(Some(Err(Error::Internal(err.to_string())))),
+                None => {
+                    let _ = this.select_next_prefix_using_secondary_index();
+                    continue;
+                }
+            };
         }
     }
 }

+ 24 - 16
crates/storage/rocksdb/src/lib.rs

@@ -1,5 +1,5 @@
 //! Rocks DB implementation of the storage layer
-use crate::{secondary_index::SecondaryIndex, iterator::WrapperIterator};
+use crate::{iterator::WrapperIterator, secondary_index::SecondaryIndex};
 use nostr_rs_storage_base::{Error, Storage};
 use nostr_rs_types::types::{Event, Filter, Tag};
 use rocksdb::{
@@ -59,7 +59,7 @@ impl RocksDb {
                 ColumnFamilyDescriptor::new(ReferenceType::Stream.as_str(), options.clone()),
             ],
         )
-        .map_err(|e| Error::Unknown(e.to_string()))?;
+        .map_err(|e| Error::Internal(e.to_string()))?;
         Ok(Self { db })
     }
 
@@ -85,13 +85,15 @@ impl RocksDb {
     ) -> Result<Arc<BoundColumnFamily>, Error> {
         self.db
             .cf_handle(namespace.as_str())
-            .ok_or(Error::InvalidColumnFamily)
+            .ok_or(Error::Internal("Unknown db-family".to_owned()))
     }
 }
 
+#[async_trait::async_trait]
 impl Storage for RocksDb {
-    type Iterator<'a> = WrapperIterator<'a>;
-    fn get_local_events(&self, limit: Option<usize>) -> Result<WrapperIterator<'_>, Error> {
+    type Stream<'a> = WrapperIterator<'a>;
+
+    async fn get_local_events(&self, limit: Option<usize>) -> Result<WrapperIterator<'_>, Error> {
         let cf_handle = self.reference_to_cf_handle(ReferenceType::LocalEvents)?;
         Ok(WrapperIterator {
             db: self,
@@ -102,21 +104,24 @@ impl Storage for RocksDb {
             prefixes: VecDeque::new(),
             limit,
             returned: 0,
+            future_event: None,
         })
     }
 
-    fn set_local_event(&self, event: &Event) -> Result<(), Error> {
+    async fn set_local_event(&self, event: &Event) -> Result<(), Error> {
         let event_id = &event.id;
         let secondary_index = SecondaryIndex::new(event_id, event.created_at());
-        self.db.put_cf(
-            &self.reference_to_cf_handle(ReferenceType::LocalEvents)?,
-            secondary_index.index_by([]),
-            event_id.deref(),
-        ).map_err(|e| Error::Unknown(e.to_string()))?;
+        self.db
+            .put_cf(
+                &self.reference_to_cf_handle(ReferenceType::LocalEvents)?,
+                secondary_index.index_by([]),
+                event_id.deref(),
+            )
+            .map_err(|e| Error::Internal(e.to_string()))?;
         Ok(())
     }
 
-    fn store(&self, event: &Event) -> Result<bool, Error> {
+    async fn store(&self, event: &Event) -> Result<bool, Error> {
         let event_id = &event.id;
 
         if let Ok(Some(_)) = self.db.get_cf(
@@ -191,21 +196,23 @@ impl Storage for RocksDb {
             }
         }
 
-        self.db.write(buffer).map_err(|e| Error::Unknown(e.to_string()))?;
+        self.db
+            .write(buffer)
+            .map_err(|e| Error::Internal(e.to_string()))?;
 
         Ok(true)
     }
 
-    fn get_event<T: AsRef<[u8]>>(&self, id: T) -> Result<Option<Event>, Error> {
+    async fn get_event<T: AsRef<[u8]> + Send + Sync>(&self, id: T) -> Result<Option<Event>, Error> {
         Ok(self
             .db
             .get_cf(&self.reference_to_cf_handle(ReferenceType::Events)?, id)
-            .map_err(|e| Error::Unknown(e.to_string()))?
+            .map_err(|e| Error::Internal(e.to_string()))?
             .map(|event| serde_json::from_slice(&event))
             .transpose()?)
     }
 
-    fn get_by_filter(&self, mut query: Filter) -> Result<WrapperIterator<'_>, Error> {
+    async fn get_by_filter(&self, mut query: Filter) -> Result<WrapperIterator<'_>, Error> {
         let limit = if query.limit == 0 {
             None
         } else {
@@ -271,6 +278,7 @@ impl Storage for RocksDb {
             prefixes,
             returned: 0,
             limit,
+            future_event: None,
         })
 
         //load_events_and_filter(self, query, event_ids, for_each)

+ 18 - 0
crates/types/src/client/subscribe.rs

@@ -18,6 +18,24 @@ pub struct Subscribe {
     pub filters: Vec<types::Filter>,
 }
 
+impl From<types::Filter> for Subscribe {
+    fn from(filter: types::Filter) -> Self {
+        Self {
+            filters: vec![filter],
+            ..Default::default()
+        }
+    }
+}
+
+impl From<Vec<types::Filter>> for Subscribe {
+    fn from(filters: Vec<types::Filter>) -> Self {
+        Self {
+            filters,
+            ..Default::default()
+        }
+    }
+}
+
 impl Default for Subscribe {
     fn default() -> Self {
         Self {

+ 16 - 8
crates/types/src/types/addr.rs

@@ -9,7 +9,11 @@ use serde::{
     de::{self, Deserialize, Deserializer},
     ser::{self, Serializer},
 };
-use std::{hash::Hash, ops::Deref};
+use std::{
+    fmt::{self, Display},
+    hash::Hash,
+    ops::Deref,
+};
 use thiserror::Error;
 
 /// Errors
@@ -45,13 +49,17 @@ pub enum HumanReadablePart {
     Note,
 }
 
-impl ToString for HumanReadablePart {
-    fn to_string(&self) -> String {
-        match *self {
-            Self::NPub => "npub".to_owned(),
-            Self::NSec => "nsec".to_owned(),
-            Self::Note => "note".to_owned(),
-        }
+impl Display for HumanReadablePart {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(
+            f,
+            "{}",
+            match self {
+                Self::NPub => "npub",
+                Self::NSec => "nsec",
+                Self::Note => "note",
+            }
+        )
     }
 }
 

+ 3 - 0
crates/types/src/types/filter.rs

@@ -10,6 +10,9 @@ use serde::{Deserialize, Serialize};
 use super::Kind;
 
 /// Filter
+///
+/// As defined in Nostr, all of these filter options are ANDed together, meaning that an object
+/// must match all the requirements in order to be included in the result set
 #[derive(Serialize, Deserialize, Default, Debug, Clone)]
 pub struct Filter {
     /// Fetch events by their Id, or subscribe

+ 7 - 4
crates/types/src/types/id.rs

@@ -4,7 +4,10 @@ use serde::{
     ser::{self, Serializer},
     Deserialize,
 };
-use std::ops::Deref;
+use std::{
+    fmt::{self, Display},
+    ops::Deref,
+};
 
 /// Event Id
 ///
@@ -25,9 +28,9 @@ impl Deref for Id {
     }
 }
 
-impl ToString for Id {
-    fn to_string(&self) -> String {
-        hex::encode(self.0)
+impl Display for Id {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", hex::encode(self.0))
     }
 }
 

+ 7 - 4
crates/types/src/types/signature.rs

@@ -3,7 +3,10 @@ use serde::{
     de::{self, Deserialize, Deserializer},
     ser::{self, Serializer},
 };
-use std::ops::Deref;
+use std::{
+    fmt::{self, Display},
+    ops::Deref,
+};
 use thiserror::Error;
 
 /// Errors
@@ -37,9 +40,9 @@ impl Deref for Signature {
     }
 }
 
-impl ToString for Signature {
-    fn to_string(&self) -> String {
-        hex::encode(self.0)
+impl Display for Signature {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", hex::encode(self.0))
     }
 }
 

+ 9 - 5
crates/types/src/types/subscription_id.rs

@@ -3,7 +3,11 @@
 //! This mod abstract the subscription and makes sure it is valid
 use hex::ToHex;
 use rand::RngCore;
-use std::{convert::TryFrom, ops::Deref};
+use std::{
+    convert::TryFrom,
+    fmt::{self, Display},
+    ops::Deref,
+};
 use thiserror::Error;
 
 /// Error type
@@ -19,7 +23,7 @@ pub enum Error {
 /// The rules are simple, any UTF-8 valid string with fewer than 32 characters
 ///
 /// By default a random ID will be created if needed.
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 pub struct SubscriptionId(String);
 
 impl Deref for SubscriptionId {
@@ -55,9 +59,9 @@ impl Default for SubscriptionId {
     }
 }
 
-impl ToString for SubscriptionId {
-    fn to_string(&self) -> String {
-        self.0.clone()
+impl Display for SubscriptionId {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", self.0)
     }
 }
 

+ 13 - 9
crates/types/src/types/tag/event.rs

@@ -1,5 +1,6 @@
 //! Event tag
 use super::Addr;
+use std::fmt::{self, Display};
 
 /// A tag that references another event
 #[derive(Debug, PartialEq, Eq, Clone)]
@@ -26,15 +27,18 @@ pub enum Marker {
     Unknown(String),
 }
 
-impl ToString for Marker {
-    fn to_string(&self) -> String {
-        (match self {
-            Self::Root => "root",
-            Self::Reply => "reply",
-            Self::Mention => "mention",
-            Self::Unknown(x) => x,
-        })
-        .to_owned()
+impl Display for Marker {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(
+            f,
+            "{}",
+            match self {
+                Self::Root => "root",
+                Self::Reply => "reply",
+                Self::Mention => "mention",
+                Self::Unknown(x) => x,
+            }
+        )
     }
 }
 

+ 0 - 69
src/bin/dump.rs

@@ -1,69 +0,0 @@
-use futures::Future;
-use nostr_rs_client::{Error as ClientError, Event, Pool};
-use nostr_rs_rocksdb::RocksDb;
-use nostr_rs_types::{client::Subscribe, Request, Response};
-use nostr_rs_storage_base::Storage;
-use std::pin::Pin;
-use tokio::sync::mpsc;
-
-#[derive(Debug, thiserror::Error)]
-pub enum Error {
-    #[error("Nostr: {0}")]
-    Addr(#[from] nostr_rs_types::types::addr::Error),
-
-    #[error("client: {0}")]
-    Client(#[from] ClientError),
-}
-
-fn on_connection(
-    host: &str,
-    _socket: mpsc::Sender<Request>,
-) -> Pin<Box<dyn Future<Output = ()> + Send>> {
-    println!("Reconnecting to {}", host);
-    Box::pin(async move {
-        let _ = _socket.send(Subscribe::default().into()).await;
-    })
-}
-
-#[tokio::main]
-async fn main() {
-    env_logger::init();
-    let mut clients = Pool::new()
-        .connect_to("wss://relay.damus.io/", u16::MAX, Some(on_connection))
-        .expect("valid url")
-        .connect_to("wss://brb.io", u16::MAX, Some(on_connection))
-        .expect("valid url")
-        .connect_to("wss://nos.lol", u16::MAX, Some(on_connection))
-        .expect("valid url")
-        .connect_to("wss://relay.current.fyi", u16::MAX, Some(on_connection))
-        .expect("valid url")
-        .connect_to("wss://eden.nostr.land", u16::MAX, Some(on_connection))
-        .expect("valid url")
-        .connect_to("wss://relay.snort.social", u16::MAX, Some(on_connection))
-        .expect("valid url");
-
-    clients.send(Subscribe::default().into()).await;
-    let db = RocksDb::new("./db").expect("db");
-
-    loop {
-        if let Some((msg, relayed_by)) = clients.recv().await {
-            match msg {
-                Event::Response(r) => match *r {
-                    Response::Event(x) => {
-                        println!("{} => {:?}", relayed_by, x);
-                        let event = x.event;
-                        db.store(&event).expect("valid");
-                    }
-                    Response::EndOfStoredEvents(_) => {}
-                    msg => {
-                        println!("{} {:?}", relayed_by, msg);
-                        clients.stop().await;
-                    }
-                },
-                msg => {
-                    println!("{} {:?}", relayed_by, msg);
-                }
-            }
-        }
-    }
-}

+ 10 - 6
src/config.rs

@@ -1,18 +1,22 @@
-use nostr_rs_types::types::Id;
+use nostr_rs_types::types::Addr;
 use serde::{Deserialize, Serialize};
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Config {
-    pub relayers: Vec<String>,
-    pub max_connections_attempts: u16,
+    /// List of external relayers to connect to
+    pub relayers: Vec<url::Url>,
+    /// Local database path
     pub db_path: String,
+    /// External port
     pub port: u16,
-    pub accounts: Vec<Accounts>,
+    /// List of accounts served by this relayer
+    pub account: Vec<Account>,
+    /// Domain under which the webserver is exposed
     pub domain: Option<String>,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct Accounts {
-    pub pub_key: Id,
+pub struct Account {
+    pub id: Addr,
     pub name: Option<String>,
 }

+ 62 - 12
src/main.rs

@@ -1,20 +1,20 @@
 use futures::Future;
-use nostr_rs_client::Event;
 use nostr_rs_rocksdb::RocksDb;
-use nostr_rs_types::{Request, Response};
-use std::{collections::HashMap, pin::Pin, sync::Arc};
+use nostr_rs_types::{types::Filter, Request, Response};
+use std::{collections::HashMap, env, fs, pin::Pin, sync::Arc};
 use tokio::{
     net::TcpListener,
     sync::mpsc,
     time::{sleep, Duration},
 };
+use url::Url;
 
 mod config;
 
 use config::Config;
 
 fn on_connection(
-    host: &str,
+    host: Url,
     _socket: mpsc::Sender<Request>,
 ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
     Box::pin(async move {})
@@ -49,26 +49,77 @@ fn create_listener(relayer: Arc<nostr_rs_relayer::Relayer>) -> nostr_rs_client::
 #[tokio::main]
 async fn main() {
     env_logger::init();
+
+    let args: Vec<String> = env::args().collect();
+    let config_path = if args.len() > 1 {
+        &args[1]
+    } else {
+        "config.toml"
+    };
+
+    let config_content = fs::read_to_string(config_path).expect("Failed to read the config file");
+
+    let config: Config = toml::from_str(&config_content).expect("Failed to parse the config file");
+    println!("{:#?}", config);
+
+    let db = RocksDb::new(&config.db_path).expect("db");
+    let mut client_pool = config
+        .relayers
+        .iter()
+        .fold(nostr_rs_client::Pool::new(), |clients, relayer_url| {
+            clients.connect_to(relayer_url.clone())
+        });
+
+    let initial_subscription = vec![
+        Filter {
+            authors: config
+                .account
+                .iter()
+                .map(|x| x.id.clone())
+                .collect::<Vec<_>>(),
+            ..Default::default()
+        },
+        Filter {
+            references_to_public_key: config
+                .account
+                .iter()
+                .map(|x| x.id.clone())
+                .collect::<Vec<_>>(),
+            ..Default::default()
+        },
+    ];
+
+    let _ = client_pool.subscribe(initial_subscription.into()).await;
+    loop {
+        tokio::select! {
+            Some((event, url)) = client_pool.recv() => {
+                println!("Recv: {} from {}", serde_json::to_string(&event).expect("valid json"), url);
+            }
+        }
+    }
+
+    /*
     let db = RocksDb::new("./relayer-db").expect("db");
     let (relayer, mut server_receiver) = nostr_rs_relayer::Relayer::new(Some(db));
     let mut clients = nostr_rs_client::Pool::new()
-        .connect_to("wss://relay.damus.io/", u16::MAX, Some(on_connection))
+        .connect_to("wss://relay.damus.io/", Some(on_connection))
         .expect("valid url")
-        .connect_to("wss://brb.io", u16::MAX, Some(on_connection))
+        .connect_to("wss://brb.io", Some(on_connection))
         .expect("valid url")
-        .connect_to("wss://nos.lol", u16::MAX, Some(on_connection))
+        .connect_to("wss://nos.lol", Some(on_connection))
         .expect("valid url")
-        .connect_to("wss://relay.current.fyi", u16::MAX, Some(on_connection))
+        .connect_to("wss://relay.current.fyi", Some(on_connection))
         .expect("valid url")
-        .connect_to("wss://eden.nostr.land", u16::MAX, Some(on_connection))
+        .connect_to("wss://eden.nostr.land", Some(on_connection))
         .expect("valid url")
-        .connect_to("wss://relay.snort.social", u16::MAX, Some(on_connection))
+        .connect_to("wss://relay.snort.social", Some(on_connection))
         .expect("valid url");
 
     let addr = "127.0.0.1:3000";
-    let listener = TcpListener::bind(&addr).await.unwrap();
+    let listener: TcpListener = TcpListener::bind(&addr).await.unwrap();
 
     clients.send(Request::Request(Default::default())).await;
+
     loop {
         tokio::select! {
             Some((event, url)) = clients.recv() => {
@@ -76,7 +127,6 @@ async fn main() {
             }
         }
     }
-    /*
     let relayer_for_recv = relayer.clone();
     let relayer_for_client = relayer.clone();
     let relayer_for_republisher = relayer.clone();