Quellcode durchsuchen

Merge branch 'client-improvements' of cesar/nostr-prototype into main

Cesar Rodas vor 2 Monaten
Ursprung
Commit
12912c13cb
49 geänderte Dateien mit 1524 neuen und 696 gelöschten Zeilen
  1. 123 15
      Cargo.lock
  2. 1 1
      Cargo.toml
  3. 2 0
      crates/client/Cargo.toml
  4. 45 11
      crates/client/src/client.rs
  5. 4 0
      crates/client/src/error.rs
  6. 3 7
      crates/client/src/lib.rs
  7. 107 105
      crates/client/src/pool/mod.rs
  8. 267 0
      crates/client/src/pool/subscription.rs
  9. 2 0
      crates/personal-relayer/Cargo.toml
  10. 2 1
      crates/personal-relayer/src/lib.rs
  11. 1 0
      crates/relayer/Cargo.toml
  12. 34 6
      crates/relayer/src/connection/local.rs
  13. 10 12
      crates/relayer/src/connection/mod.rs
  14. 4 0
      crates/relayer/src/error.rs
  15. 0 1
      crates/relayer/src/lib.rs
  16. 324 170
      crates/relayer/src/relayer.rs
  17. 0 193
      crates/relayer/src/subscription/manager.rs
  18. 0 3
      crates/relayer/src/subscription/mod.rs
  19. 1 0
      crates/storage/base/Cargo.toml
  20. 3 2
      crates/storage/base/src/cursor.rs
  21. 1 9
      crates/storage/base/src/lib.rs
  22. 1 0
      crates/storage/memory/Cargo.toml
  23. 3 2
      crates/storage/memory/src/cursor.rs
  24. 9 8
      crates/storage/memory/src/lib.rs
  25. 1 0
      crates/storage/rocksdb/Cargo.toml
  26. 4 3
      crates/storage/rocksdb/src/cursor.rs
  27. 8 7
      crates/storage/rocksdb/src/lib.rs
  28. 13 0
      crates/subscription-manager/Cargo.toml
  29. 15 9
      crates/subscription-manager/src/filter.rs
  30. 67 43
      crates/subscription-manager/src/index.rs
  31. 213 0
      crates/subscription-manager/src/lib.rs
  32. 1 0
      crates/types/Cargo.toml
  33. 32 0
      crates/types/src/client/subscribe.rs
  34. 15 0
      crates/types/src/lib.rs
  35. 1 1
      crates/types/src/relayer/auth.rs
  36. 7 1
      crates/types/src/relayer/eose.rs
  37. 10 2
      crates/types/src/relayer/event.rs
  38. 7 1
      crates/types/src/relayer/mod.rs
  39. 1 1
      crates/types/src/relayer/notice.rs
  40. 93 12
      crates/types/src/relayer/ok.rs
  41. 4 3
      crates/types/src/response.rs
  42. 14 0
      crates/types/src/types/addr.rs
  43. 5 0
      crates/types/src/types/event.rs
  44. 2 2
      crates/types/src/types/filter.rs
  45. 1 1
      crates/types/src/types/id.rs
  46. 15 0
      crates/types/src/types/subscription_id.rs
  47. 45 63
      crates/types/src/types/tag.rs
  48. 2 0
      crates/types/tests/regression_parsing.json
  49. 1 1
      src/main.rs

+ 123 - 15
Cargo.lock

@@ -42,6 +42,55 @@ dependencies = [
 ]
 
 [[package]]
+name = "anstream"
+version = "0.6.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526"
+dependencies = [
+ "anstyle",
+ "anstyle-parse",
+ "anstyle-query",
+ "anstyle-wincon",
+ "colorchoice",
+ "is_terminal_polyfill",
+ "utf8parse",
+]
+
+[[package]]
+name = "anstyle"
+version = "1.0.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1"
+
+[[package]]
+name = "anstyle-parse"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eb47de1e80c2b463c735db5b217a0ddc39d612e7ac9e2e96a5aed1f57616c1cb"
+dependencies = [
+ "utf8parse",
+]
+
+[[package]]
+name = "anstyle-query"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a"
+dependencies = [
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "anstyle-wincon"
+version = "3.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8"
+dependencies = [
+ "anstyle",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
 name = "async-trait"
 version = "0.1.81"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -215,6 +264,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "colorchoice"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0"
+
+[[package]]
 name = "core-foundation"
 version = "0.9.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -316,6 +371,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "406ac2a8c9eedf8af9ee1489bee9e50029278a6456c740f7454cf8a158abc816"
 
 [[package]]
+name = "env_filter"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab"
+dependencies = [
+ "log",
+ "regex",
+]
+
+[[package]]
 name = "env_logger"
 version = "0.10.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -329,6 +394,19 @@ dependencies = [
 ]
 
 [[package]]
+name = "env_logger"
+version = "0.11.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e13fa619b91fb2381732789fc5de83b45675e882f66623b7d8cb4f643017018d"
+dependencies = [
+ "anstream",
+ "anstyle",
+ "env_filter",
+ "humantime",
+ "log",
+]
+
+[[package]]
 name = "equivalent"
 version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -705,6 +783,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "is_terminal_polyfill"
+version = "1.70.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
+
+[[package]]
 name = "itoa"
 version = "1.0.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -810,12 +894,9 @@ dependencies = [
 
 [[package]]
 name = "log"
-version = "0.4.17"
+version = "0.4.22"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
-dependencies = [
- "cfg-if",
-]
+checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
 
 [[package]]
 name = "lz4-sys"
@@ -874,7 +955,7 @@ dependencies = [
 name = "nostr"
 version = "0.1.0"
 dependencies = [
- "env_logger",
+ "env_logger 0.10.0",
  "futures",
  "futures-util",
  "instant-acme",
@@ -901,6 +982,8 @@ dependencies = [
  "log",
  "nostr-rs-memory",
  "nostr-rs-relayer",
+ "nostr-rs-storage-base",
+ "nostr-rs-subscription-manager",
  "nostr-rs-types",
  "serde_json",
  "thiserror",
@@ -913,7 +996,7 @@ dependencies = [
 name = "nostr-rs-dump"
 version = "0.1.0"
 dependencies = [
- "env_logger",
+ "env_logger 0.10.0",
  "futures",
  "futures-util",
  "instant-acme",
@@ -934,6 +1017,7 @@ dependencies = [
  "async-trait",
  "futures",
  "nostr-rs-storage-base",
+ "nostr-rs-subscription-manager",
  "nostr-rs-types",
  "tokio",
 ]
@@ -942,7 +1026,9 @@ dependencies = [
 name = "nostr-rs-personal-relayer"
 version = "0.1.0"
 dependencies = [
+ "env_logger 0.11.5",
  "futures",
+ "log",
  "nostr-rs-client",
  "nostr-rs-memory",
  "nostr-rs-relayer",
@@ -963,6 +1049,7 @@ dependencies = [
  "nostr-rs-client",
  "nostr-rs-memory",
  "nostr-rs-storage-base",
+ "nostr-rs-subscription-manager",
  "nostr-rs-types",
  "serde_json",
  "thiserror",
@@ -978,6 +1065,7 @@ dependencies = [
  "chrono",
  "futures",
  "nostr-rs-storage-base",
+ "nostr-rs-subscription-manager",
  "nostr-rs-types",
  "rocksdb",
  "serde_json",
@@ -990,6 +1078,7 @@ dependencies = [
  "async-trait",
  "chrono",
  "futures",
+ "nostr-rs-subscription-manager",
  "nostr-rs-types",
  "rand",
  "serde_json",
@@ -998,6 +1087,17 @@ dependencies = [
 ]
 
 [[package]]
+name = "nostr-rs-subscription-manager"
+version = "0.1.0"
+dependencies = [
+ "chrono",
+ "nostr-rs-types",
+ "serde",
+ "serde_json",
+ "tokio",
+]
+
+[[package]]
 name = "nostr-rs-types"
 version = "0.1.0"
 dependencies = [
@@ -1007,6 +1107,7 @@ dependencies = [
  "custom_derive",
  "enum_derive",
  "hex",
+ "once_cell",
  "rand",
  "secp256k1",
  "serde",
@@ -1346,18 +1447,18 @@ dependencies = [
 
 [[package]]
 name = "serde"
-version = "1.0.183"
+version = "1.0.209"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c"
+checksum = "99fce0ffe7310761ca6bf9faf5115afbc19688edd00171d81b1bb1b116c63e09"
 dependencies = [
  "serde_derive",
 ]
 
 [[package]]
 name = "serde_derive"
-version = "1.0.183"
+version = "1.0.209"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816"
+checksum = "a5831b979fd7b5439637af1752d535ff49f4860c0f341d1baeb6faf0f4242170"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -1366,11 +1467,12 @@ dependencies = [
 
 [[package]]
 name = "serde_json"
-version = "1.0.105"
+version = "1.0.127"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360"
+checksum = "8043c06d9f82bd7271361ed64f415fe5e12a77fdb52e573e7f06a516dea329ad"
 dependencies = [
  "itoa",
+ "memchr",
  "ryu",
  "serde",
 ]
@@ -1530,9 +1632,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.39.2"
+version = "1.40.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1"
+checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998"
 dependencies = [
  "backtrace",
  "bytes",
@@ -1743,6 +1845,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
 
 [[package]]
+name = "utf8parse"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
+
+[[package]]
 name = "vcpkg"
 version = "0.2.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"

+ 1 - 1
Cargo.toml

@@ -9,7 +9,7 @@ members = [
     "crates/client",
     "crates/relayer",
     "crates/storage/base",
-    "crates/storage/rocksdb", "crates/dump", "crates/storage/memory", "crates/personal-relayer",
+    "crates/storage/rocksdb", "crates/dump", "crates/storage/memory", "crates/personal-relayer", "crates/subscription-manager",
 ]
 
 [dependencies]

+ 2 - 0
crates/client/Cargo.toml

@@ -6,6 +6,8 @@ edition = "2021"
 [dependencies]
 thiserror = "1.0.40"
 nostr-rs-types = { path = "../types" }
+nostr-rs-storage-base = { path = "../storage/base" }
+nostr-rs-subscription-manager = { path = "../subscription-manager" }
 tokio = { version = "1.26.0", features = ["sync", "macros", "rt", "time"] }
 tokio-tungstenite = { version = "0.18.0", features = [
     "rustls",

+ 45 - 11
crates/client/src/client.rs

@@ -1,4 +1,10 @@
-use crate::Error;
+//! Client for the nostr relayer
+//!
+//! This is a simple client with reconnection logic built-in but no load balancing
+//! nor subscription.
+//!
+//! Most likely you want to use the `Pool` client instead of this one.
+use crate::{pool::DEFAULT_CHANNEL_BUFFER_SIZE, Error};
 use futures_util::{SinkExt, StreamExt};
 use nostr_rs_types::{
     client::{self, subscribe},
@@ -7,6 +13,7 @@ use nostr_rs_types::{
 };
 use std::{
     collections::HashMap,
+    pin::Pin,
     sync::{
         atomic::{AtomicBool, Ordering::Relaxed},
         Arc,
@@ -23,6 +30,9 @@ use url::Url;
 type Subscriptions = Arc<RwLock<HashMap<SubscriptionId, subscribe::Subscribe>>>;
 
 #[derive(Debug)]
+/// Active subscription
+///
+/// This must be kept in scope to keep the subscription active
 pub struct ActiveSubscription {
     id: SubscriptionId,
     subscriptions: Subscriptions,
@@ -72,18 +82,29 @@ impl Drop for Client {
 
 impl Client {
     /// Creates a new relayer
-    pub fn new(send_message_to_listener: mpsc::Sender<(Response, Url)>, url: Url) -> Self {
-        let (sender_to_socket, send_to_socket) = mpsc::channel(100_000);
+    pub fn new<F>(return_to: mpsc::Sender<(Response, Url)>, url: Url, filter: F) -> Self
+    where
+        F: Fn(
+                Response,
+                Url,
+                mpsc::Sender<(Response, Url)>,
+            ) -> Pin<Box<dyn futures::Future<Output = Result<(), Error>> + Send>>
+            + Send
+            + Sync
+            + 'static,
+    {
+        let (sender_to_socket, send_to_socket) = mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);
         let is_connected = Arc::new(AtomicBool::new(false));
 
         let subscriptions = Arc::new(RwLock::new(HashMap::new()));
 
         let worker = Self::spawn_background_client(
-            send_message_to_listener,
+            return_to,
             send_to_socket,
             url.clone(),
             is_connected.clone(),
             subscriptions.clone(),
+            filter,
         );
 
         Self {
@@ -100,20 +121,31 @@ impl Client {
     ///
     /// This function will return a JoinHandle that can be used to
     /// wait for the background client to finish or to cancel it.
-    fn spawn_background_client(
-        send_message_to_listener: mpsc::Sender<(Response, Url)>,
+    fn spawn_background_client<F>(
+        return_to: mpsc::Sender<(Response, Url)>,
         mut send_to_socket: mpsc::Receiver<Request>,
         url: Url,
         is_connected: Arc<AtomicBool>,
         send_on_connection: Subscriptions,
-    ) -> JoinHandle<()> {
+        filter: F,
+    ) -> JoinHandle<()>
+    where
+        F: Fn(
+                Response,
+                Url,
+                mpsc::Sender<(Response, Url)>,
+            ) -> Pin<Box<dyn futures::Future<Output = Result<(), Error>> + Send>>
+            + Send
+            + Sync
+            + 'static,
+    {
         is_connected.store(false, Relaxed);
 
         tokio::spawn(async move {
             let mut connection_attempts = 0;
 
             loop {
-                log::warn!("{}: Connect attempt {}", url, connection_attempts);
+                log::info!("{}: Connect attempt {}", url, connection_attempts);
                 connection_attempts += 1;
                 let mut socket = match connect_async(url.clone()).await {
                     Ok(x) => x.0,
@@ -195,13 +227,15 @@ impl Client {
 
                             log::info!("New message: {}", msg);
 
-                            let msg: Result<Response, _> = serde_json::from_str(&msg);
+                            let event: Result<Response, _> = serde_json::from_str(&msg);
 
-                            if let Ok(msg) = msg {
-                                if let Err(error) = send_message_to_listener.try_send((msg, url.clone())) {
+                            if let Ok(msg) = event {
+                                if let Err(error) = filter(msg, url.clone(), return_to.clone()).await {
                                     log::error!("{}: Reconnecting client because of {}", url, error);
                                     break;
                                 }
+                            } else {
+                                log::error!("Failed to parse message: {:?} {}", event, msg);
                             }
                         }
                         else => {

+ 4 - 0
crates/client/src/error.rs

@@ -22,6 +22,10 @@ pub enum Error {
     #[error("There is no connection")]
     Disconnected,
 
+    /// Error sending message with the internal channel
+    #[error("Error sending message: {0}")]
+    InternalChannel(String),
+
     /// The pool was already splitted
     #[error("The pool was already splitted")]
     AlreadySplitted,

+ 3 - 7
crates/client/src/lib.rs

@@ -8,14 +8,10 @@
 //!
 //! It will also have reconnection logic built-in internally.
 #![deny(missing_docs, warnings)]
-mod client;
+pub mod client;
 mod error;
-mod pool;
+pub mod pool;
 
 pub use url::Url;
 
-pub use self::{
-    client::Client,
-    error::Error,
-    pool::{Pool, PoolSubscription},
-};
+pub use self::{client::Client, error::Error, pool::Pool};

+ 107 - 105
crates/client/src/pool.rs → crates/client/src/pool/mod.rs

@@ -1,34 +1,66 @@
 //! Relayers
 //!
 //! This is the main entry point to the client library.
-use crate::{client::ActiveSubscription, Client, Error};
+use crate::{client::ActiveSubscription as ClientActiveSubscription, Client, Error};
 use futures::future::join_all;
 use nostr_rs_types::{
     client::{self, subscribe},
-    types::SubscriptionId,
     Response,
 };
-use std::{collections::HashMap, sync::Arc};
+use std::{
+    collections::HashMap,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+};
 use tokio::sync::{mpsc, RwLock};
 use url::Url;
 
-type Subscriptions =
-    Arc<RwLock<HashMap<SubscriptionId, (subscribe::Subscribe, Vec<ActiveSubscription>)>>>;
+pub mod subscription;
+
+pub(crate) type AllClients =
+    Arc<RwLock<HashMap<Url, (Arc<AtomicUsize>, (ClientActiveSubscription, Client))>>>;
+
 /// Clients
 ///
 /// This is a set of outgoing connections to relayers. This struct can connect
 /// async to N relayers offering a simple API to talk to all of them at the same
 /// time, and to receive messages
-#[derive(Debug)]
 pub struct Pool {
-    clients: RwLock<HashMap<Url, Client>>,
+    clients: AllClients,
     sender: mpsc::Sender<(Response, Url)>,
     receiver: Option<mpsc::Receiver<(Response, Url)>>,
-    subscriptions: Subscriptions,
+    subscription_manager: Arc<subscription::Manager>,
+}
+
+/// Active client
+///
+/// For each connection on the pool this object will be returned. When dropped,
+/// that connection is also dropped from the connection pool.
+pub struct ActiveClient {
+    client_id: Url,
+    counter: Arc<AtomicUsize>,
+    all_clients: AllClients,
+}
+
+impl Drop for ActiveClient {
+    fn drop(&mut self) {
+        let counter = self.counter.fetch_sub(1, Ordering::SeqCst);
+        if counter == 0 {
+            let all_clients = self.all_clients.clone();
+            let client_id = self.client_id.clone();
+            tokio::spawn(async move {
+                // remove the client from the pool, when it goes out of scope
+                // it will be disconnected
+                all_clients.write().await.remove(&client_id);
+            });
+        }
+    }
 }
 
 /// Default channel buffer size for the pool
-pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1_000;
+pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 100_000;
 
 impl Default for Pool {
     fn default() -> Self {
@@ -36,44 +68,25 @@ impl Default for Pool {
         Self {
             clients: Default::default(),
             receiver: Some(receiver),
-            subscriptions: Default::default(),
+            subscription_manager: Default::default(),
             sender,
         }
     }
 }
 
-/// Return a subscription that will be removed when dropped
-#[derive(Debug)]
-pub struct PoolSubscription {
-    subscription_id: SubscriptionId,
-    subscriptions: Subscriptions,
-}
-
-impl Drop for PoolSubscription {
-    fn drop(&mut self) {
-        let subscriptions = self.subscriptions.clone();
-        let subscription_id = self.subscription_id.clone();
-        tokio::spawn(async move {
-            subscriptions.write().await.remove(&subscription_id);
-        });
-    }
-}
-
 impl Pool {
     /// Creates a new instance with a list of urls
-    pub fn new_with_clients(clients: Vec<Url>) -> Self {
-        let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);
-        let clients = clients
-            .into_iter()
-            .map(|url| (url.clone(), Client::new(sender.clone(), url)))
-            .collect::<HashMap<_, _>>();
-
-        Self {
-            clients: RwLock::new(clients),
-            subscriptions: Default::default(),
-            receiver: Some(receiver),
-            sender,
-        }
+    pub fn new_with_clients(clients: Vec<Url>) -> Result<(Self, Vec<ActiveClient>), Error> {
+        let pool = Self::default();
+        let connect = clients.into_iter().map(|url| pool.connect_to(url));
+
+        futures::executor::block_on(async {
+            futures::future::join_all(connect)
+                .await
+                .into_iter()
+                .collect::<Result<Vec<_>, _>>()
+        })
+        .map(|clients| (pool, clients))
     }
 
     /// Splits the pool removing the receiver to be used in a different context
@@ -93,38 +106,15 @@ impl Pool {
 
     /// Returns the number of active subscriptions
     pub async fn active_subscriptions(&self) -> usize {
-        self.subscriptions.read().await.keys().len()
+        self.subscription_manager.total_subscribers()
     }
 
     /// Subscribes to all the connected relayers
     pub async fn subscribe(
         &self,
         subscription: subscribe::Subscribe,
-    ) -> Result<PoolSubscription, Error> {
-        let clients = self.clients.read().await;
-
-        let wait_all = clients
-            .values()
-            .map(|sender| sender.subscribe(subscription.clone()))
-            .collect::<Vec<_>>();
-
-        let subscription_id = subscription.subscription_id.clone();
-
-        self.subscriptions.write().await.insert(
-            subscription_id.clone(),
-            (
-                subscription,
-                join_all(wait_all)
-                    .await
-                    .into_iter()
-                    .collect::<Result<Vec<_>, _>>()?,
-            ),
-        );
-
-        Ok(PoolSubscription {
-            subscription_id,
-            subscriptions: self.subscriptions.clone(),
-        })
+    ) -> subscription::ActiveSubscription {
+        self.subscription_manager.subcribe(subscription, None).await
     }
 
     /// Sends a request to all the connected relayers
@@ -133,7 +123,7 @@ impl Pool {
         join_all(
             clients
                 .values()
-                .map(|sender| sender.post(request.clone()))
+                .map(|(_, (_, sender))| sender.post(request.clone()))
                 .collect::<Vec<_>>(),
         )
         .await;
@@ -145,7 +135,7 @@ impl Pool {
             .read()
             .await
             .iter()
-            .filter(|(_, client)| client.is_connected())
+            .filter(|(_, (_, (_, client)))| client.is_connected())
             .collect::<Vec<_>>()
             .len()
     }
@@ -154,22 +144,43 @@ impl Pool {
     ///
     /// This function will open a connection at most once, if a connection
     /// already exists false will be returned
-    pub async fn connect_to(&self, url: Url) {
+    pub async fn connect_to(&self, url: Url) -> Result<ActiveClient, Error> {
         let mut clients = self.clients.write().await;
-        let mut subscriptions = self.subscriptions.write().await;
 
-        if !clients.contains_key(&url) {
+        let ref_id = if let Some((id, _)) = clients.get(&url) {
+            id.fetch_add(1, Ordering::SeqCst);
+            id.clone()
+        } else {
             log::warn!("Connecting to {}", url);
-            let client = Client::new(self.sender.clone(), url.clone());
-
-            for (filter, sub) in subscriptions.values_mut() {
-                let _ = client.subscribe(filter.clone()).await.map(|subscription| {
-                    sub.push(subscription);
-                });
-            }
-
-            clients.insert(url.clone(), client);
-        }
+            let subscription_manager = self.subscription_manager.clone();
+            let client = Client::new(
+                self.sender.clone(),
+                url.clone(),
+                move |response, url, return_to| {
+                    let subscription_manager = subscription_manager.clone();
+                    Box::pin(async move {
+                        subscription_manager
+                            .process_message(response, url, return_to)
+                            .await
+                    })
+                },
+            );
+
+            // subscribe to all events
+            let meta_subscription = client
+                .subscribe(subscribe::Subscribe::to_all_events())
+                .await?;
+
+            let ref_id: Arc<AtomicUsize> = Arc::new(1.into());
+            clients.insert(url.clone(), (ref_id.clone(), (meta_subscription, client)));
+            ref_id
+        };
+
+        Ok(ActiveClient {
+            client_id: url,
+            counter: ref_id,
+            all_clients: self.clients.clone(),
+        })
     }
 }
 
@@ -189,7 +200,7 @@ mod test {
         let local_addr = listener.local_addr().expect("addr");
 
         let relayer = Relayer::new(Some(Memory::default()), None).expect("valid dummy server");
-        let stopper = relayer.main(listener).expect("valid main loop");
+        let (_, stopper) = relayer.main(listener).expect("valid main loop");
         (
             Url::parse(&format!("ws://{}", local_addr)).expect("valid url"),
             stopper,
@@ -199,10 +210,7 @@ mod test {
     #[tokio::test]
     async fn droppable_subscription() {
         let client_pool = Pool::default();
-        let subscription = client_pool
-            .subscribe(Default::default())
-            .await
-            .expect("valid subscription");
+        let subscription = client_pool.subscribe(Default::default()).await;
 
         assert_eq!(client_pool.active_subscriptions().await, 1);
         drop(subscription);
@@ -213,7 +221,7 @@ mod test {
     #[tokio::test]
     async fn connect_to_dummy_server() {
         let (addr, stopper) = dummy_server(0).await;
-        let client_pool = Pool::new_with_clients(vec![addr]);
+        let (client_pool, _connections) = Pool::new_with_clients(vec![addr]).expect("valid pool");
 
         assert_eq!(0, client_pool.check_active_connections().await);
 
@@ -230,13 +238,11 @@ mod test {
     #[tokio::test]
     async fn two_clients_communication() {
         let (addr, _) = dummy_server(0).await;
-        let mut client_pool1 = Pool::new_with_clients(vec![addr.clone()]);
-        let client_pool2 = Pool::new_with_clients(vec![addr]);
+        let (mut client_pool1, _c1) =
+            Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
+        let (client_pool2, _c2) = Pool::new_with_clients(vec![addr]).expect("valid pool");
 
-        let _sub1 = client_pool1
-            .subscribe(Default::default())
-            .await
-            .expect("valid subscription");
+        let _sub1 = client_pool1.subscribe(Default::default()).await;
 
         sleep(Duration::from_millis(10)).await;
 
@@ -270,13 +276,11 @@ mod test {
     #[tokio::test]
     async fn reconnect_and_resubscribe() {
         let (addr, stopper) = dummy_server(0).await;
-        let mut client_pool1 = Pool::new_with_clients(vec![addr.clone()]);
-        let client_pool2 = Pool::new_with_clients(vec![addr.clone()]);
+        let (mut client_pool1, _c1) =
+            Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
+        let (client_pool2, _c2) = Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
 
-        let _sub1 = client_pool1
-            .subscribe(Default::default())
-            .await
-            .expect("valid subscription");
+        let _sub1 = client_pool1.subscribe(Default::default()).await;
 
         sleep(Duration::from_millis(10)).await;
 
@@ -353,13 +357,11 @@ mod test {
     async fn connect_multiple_servers() {
         let (addr1, _) = dummy_server(0).await;
         let (addr2, _) = dummy_server(0).await;
-        let mut client_pool1 = Pool::new_with_clients(vec![addr1.clone(), addr2]);
-        let client_pool2 = Pool::new_with_clients(vec![addr1]);
+        let (mut client_pool1, _c1) =
+            Pool::new_with_clients(vec![addr1.clone(), addr2]).expect("valid pool");
+        let (client_pool2, _c2) = Pool::new_with_clients(vec![addr1]).expect("valid pool");
 
-        let _sub1 = client_pool1
-            .subscribe(Default::default())
-            .await
-            .expect("valid subscription");
+        let _sub1 = client_pool1.subscribe(Default::default()).await;
 
         sleep(Duration::from_millis(10)).await;
 

+ 267 - 0
crates/client/src/pool/subscription.rs

@@ -0,0 +1,267 @@
+//! Subscription manager
+use crate::{client, Error};
+use futures::future::join_all;
+use nostr_rs_subscription_manager::{
+    ActiveSubscription as ActiveSubscriptionInner, SubscriptionManager,
+};
+use nostr_rs_types::{
+    client::subscribe::{self, is_all_events},
+    relayer,
+    types::SubscriptionId,
+    Response,
+};
+use std::sync::{
+    atomic::{AtomicUsize, Ordering},
+    Arc,
+};
+use tokio::sync::{mpsc, Mutex};
+use url::Url;
+
+use super::AllClients;
+
+#[derive(Debug, Copy, Default, Eq, PartialEq, Clone)]
+/// Subscription status
+pub enum Status {
+    /// Subscription is waiting to be subscribed
+    #[default]
+    Queued,
+    /// Subscribed is active
+    Subscribed,
+    /// Technically unsubscribed, and fetching future events from the relayer
+    /// from the All-Events meta subscription
+    Stale,
+}
+
+#[derive(Debug, Default)]
+struct SubscriptionInner {
+    /// Active subscription (on the client side); when this is dropped, all clients unsubscribe
+    active_subscription: Option<Vec<client::ActiveSubscription>>,
+    /// Subscription status
+    status: Arc<Mutex<Status>>,
+    /// raw request
+    subscription_request: subscribe::Subscribe,
+}
+
+/// Active subscription
+///
+/// This object is responsible for keeping track of a subscription for a
+/// connection
+///
+/// This must be dropped to unsubscribe from the subscription manager
+pub struct ActiveSubscription {
+    _id: ActiveSubscriptionInner<PoolSubscriptionId, SubscriptionInner>,
+    status: Arc<Mutex<Status>>,
+    active_subscriptions: Arc<AtomicUsize>,
+    queued_subscriptions: Arc<AtomicUsize>,
+    stale_subscriptions: Arc<AtomicUsize>,
+}
+
+impl Drop for ActiveSubscription {
+    fn drop(&mut self) {
+        let active_subscriptions = self.active_subscriptions.clone();
+        let queued_subscriptions = self.queued_subscriptions.clone();
+        let stale_subscriptions = self.stale_subscriptions.clone();
+        let status = self.status.clone();
+
+        tokio::spawn(async move {
+            match *status.lock().await {
+                Status::Subscribed => active_subscriptions.fetch_sub(1, Ordering::Relaxed),
+                Status::Queued => queued_subscriptions.fetch_sub(1, Ordering::Relaxed),
+                Status::Stale => stale_subscriptions.fetch_sub(1, Ordering::Relaxed),
+            }
+        });
+    }
+}
+
+/// Pool subscription ID
+#[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
+pub struct PoolSubscriptionId((SubscriptionId, Option<Url>));
+
+impl Default for PoolSubscriptionId {
+    fn default() -> Self {
+        Self((SubscriptionId::empty(), None))
+    }
+}
+
+/// Subscription manager
+///
+/// Clients who are added to the pool are automatically subscribed to all
+/// events, which is known as the All-Events subscription.
+///
+/// Subscriptions in the client pool are smarter than the raw subscriptions at
+/// the client level. These subscriptions will be active until an "End of
+/// stored" event is received; the client pool will unsubscribe at that point,
+/// and the All-Events subscriptions, filtered internally, will fetch future
+/// events. By doing so, only past events are queried, and there is a single
+/// stream of future events to be a good citizen with other relayers.
+#[derive(Default)]
+pub(crate) struct Manager {
+    subscription_manager: Arc<SubscriptionManager<PoolSubscriptionId, SubscriptionInner>>,
+    all_clients: AllClients,
+    active_subscriptions: Arc<AtomicUsize>,
+    queued_subscriptions: Arc<AtomicUsize>,
+    stale_subscriptions: Arc<AtomicUsize>,
+}
+
+/// Maximum number of subscriptions
+pub const MAX_SUBSCRIPTIONS: usize = 50;
+
+impl Manager {
+    /// Processes all messages from the client pools
+    ///
+    /// The client pool creates a subscription to the All-Events subscription,
+    /// this callback checks if there are any listeners for this event, otherwise
+    /// it will not process the event.
+    pub async fn process_message(
+        self: &Arc<Self>,
+        message: Response,
+        url: Url,
+        return_to: mpsc::Sender<(Response, Url)>,
+    ) -> Result<(), Error> {
+        match message {
+            Response::EndOfStoredEvents(subscription_id) => {
+                let subscription_id = PoolSubscriptionId((subscription_id.0, None));
+                let mut subscription = self.subscription_manager.subbscriptions_mut().await;
+                if let Some(s) = subscription.get_mut(&subscription_id) {
+                    *s.status.lock().await = Status::Stale;
+                    let _ = s.active_subscription.take();
+
+                    self.active_subscriptions.fetch_sub(1, Ordering::Relaxed);
+                    self.stale_subscriptions.fetch_add(1, Ordering::Relaxed);
+                }
+
+                return_to
+                    .try_send((
+                        Response::EndOfStoredEvents(subscription_id.0 .0.into()),
+                        url,
+                    ))
+                    .map_err(|e| Error::InternalChannel(e.to_string()))?;
+
+                Ok(())
+            }
+            Response::Event(relayer::Event {
+                subscription_id,
+                event,
+            }) => {
+                if !is_all_events(&subscription_id) {
+                    // This is not an All-Events subscription, it must be passed on as it is a stored event
+                    return_to
+                        .try_send((
+                            Response::Event(relayer::Event {
+                                subscription_id,
+                                event,
+                            }),
+                            url.clone(),
+                        ))
+                        .map_err(|e| Error::InternalChannel(e.to_string()))?;
+                    return Ok(());
+                }
+
+                for id in self.subscription_manager.get_subscribers(&event).await {
+                    return_to
+                        .try_send((
+                            Response::Event(relayer::Event {
+                                subscription_id: id.0 .0.clone(),
+                                event: event.clone(),
+                            }),
+                            url.clone(),
+                        ))
+                        .map_err(|e| Error::InternalChannel(e.to_string()))?;
+                }
+
+                Ok(())
+            }
+            any_message => {
+                return_to
+                    .try_send((any_message, url))
+                    .map_err(|e| Error::InternalChannel(e.to_string()))?;
+                Ok(())
+            }
+        }
+    }
+
+    async fn update_active_subscriptions(self: &Arc<Self>) {
+        if self.active_subscriptions.load(Ordering::Relaxed) >= MAX_SUBSCRIPTIONS
+            || self.queued_subscriptions.load(Ordering::Relaxed) == 0
+        {
+            return;
+        }
+
+        let clients = self.all_clients.read().await;
+        let mut subscriptions = self.subscription_manager.subbscriptions_mut().await;
+
+        for subscription in subscriptions.values_mut() {
+            if *subscription.status.lock().await == Status::Queued {
+                let wait_all = clients
+                    .values()
+                    .map(|(_, (_, sender))| {
+                        sender.subscribe(subscription.subscription_request.clone())
+                    })
+                    .collect::<Vec<_>>();
+
+                if let Ok(active_subscriptions) = join_all(wait_all)
+                    .await
+                    .into_iter()
+                    .collect::<Result<Vec<_>, _>>()
+                {
+                    subscription.active_subscription = Some(active_subscriptions);
+                    *subscription.status.lock().await = Status::Subscribed;
+
+                    let queued_subscribed =
+                        self.queued_subscriptions.fetch_sub(1, Ordering::Relaxed);
+                    let active_subscriptions =
+                        self.active_subscriptions.fetch_add(1, Ordering::Relaxed);
+                    if queued_subscribed == 0 || active_subscriptions >= MAX_SUBSCRIPTIONS {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /// Creates a new subscription with a given filters
+    pub async fn subcribe(
+        self: &Arc<Self>,
+        subscription_request: subscribe::Subscribe,
+        specific_url: Option<Url>,
+    ) -> ActiveSubscription {
+        let status = Arc::new(Mutex::new(Status::Queued));
+        let id = self
+            .subscription_manager
+            .subscribe(
+                PoolSubscriptionId((
+                    subscription_request.subscription_id.clone(),
+                    specific_url.clone(),
+                )),
+                subscription_request.filters.clone(),
+                SubscriptionInner {
+                    status: status.clone(),
+                    active_subscription: None,
+                    subscription_request,
+                },
+            )
+            .await;
+
+        self.queued_subscriptions.fetch_add(1, Ordering::Relaxed);
+
+        let this = self.clone();
+        tokio::spawn(async move {
+            this.update_active_subscriptions().await;
+        });
+
+        ActiveSubscription {
+            _id: id,
+            status,
+            active_subscriptions: self.active_subscriptions.clone(),
+            queued_subscriptions: self.queued_subscriptions.clone(),
+            stale_subscriptions: self.stale_subscriptions.clone(),
+        }
+    }
+
+    /// Total number of subscribers
+    pub fn total_subscribers(&self) -> usize {
+        self.active_subscriptions.load(Ordering::Relaxed)
+            + self.queued_subscriptions.load(Ordering::Relaxed)
+            + self.stale_subscriptions.load(Ordering::Relaxed)
+    }
+}

+ 2 - 0
crates/personal-relayer/Cargo.toml

@@ -13,3 +13,5 @@ thiserror = "1.0.39"
 url = { version = "2.5.2", features = ["serde"] }
 futures = "0.3.30"
 tokio = { version = "1.39.2", features = ["full"] }
+log = "0.4.22"
+env_logger = "0.11.5"

+ 2 - 1
crates/personal-relayer/src/lib.rs

@@ -67,7 +67,8 @@ impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
     }
 
     pub fn main(self, server: TcpListener) -> Result<Stoppable, Error> {
-        let tasks = vec![self.relayer.main(server)?, tokio::spawn(async move {})];
+        let (relayer, handle) = self.relayer.main(server)?;
+        let tasks = vec![handle, tokio::spawn(async move {})];
         Ok(tasks.into())
     }
 }

+ 1 - 0
crates/relayer/Cargo.toml

@@ -9,6 +9,7 @@ edition = "2021"
 nostr-rs-types = { path = "../types" }
 nostr-rs-storage-base = { path = "../storage/base" }
 nostr-rs-client = { path = "../client" }
+nostr-rs-subscription-manager = { path = "../subscription-manager" }
 futures-util = "0.3.27"
 tokio = { version = "1.26.0", features = ["sync", "macros", "rt", "time"] }
 tokio-tungstenite = { version = "0.18.0", features = [

+ 34 - 6
crates/relayer/src/connection/local.rs

@@ -1,23 +1,37 @@
 //! Local connection
 //!
 //! Add types for adding local connections
-use crate::{connection::ConnectionId, Error};
+use crate::{connection::ConnectionId, Error, Relayer};
+use nostr_rs_storage_base::Storage;
 use nostr_rs_types::{Request, Response};
+use std::sync::Arc;
 use tokio::sync::mpsc::{Receiver, Sender};
 
 /// Local connection
-pub struct LocalConnection {
+pub struct LocalConnection<T>
+where
+    T: Storage + Send + Sync + 'static,
+{
     sender: Sender<(ConnectionId, Request)>,
     receiver: Receiver<Response>,
-    conn_id: ConnectionId,
+    pub(crate) conn_id: ConnectionId,
+    relayer: Arc<Relayer<T>>,
 }
 
-impl LocalConnection {
+impl<T> LocalConnection<T>
+where
+    T: Storage + Send + Sync + 'static,
+{
     /// Receive a message from the relayer
     pub async fn recv(&mut self) -> Option<Response> {
         self.receiver.recv().await
     }
 
+    /// Try to receive a message from the relayer
+    pub fn try_recv(&mut self) -> Option<Response> {
+        self.receiver.try_recv().ok()
+    }
+
     /// Sends a request to the relayer
     pub async fn send(&self, request: Request) -> Result<(), Error> {
         self.sender
@@ -27,24 +41,38 @@ impl LocalConnection {
     }
 }
 
-impl
+impl<T> Drop for LocalConnection<T>
+where
+    T: Storage + Send + Sync + 'static,
+{
+    fn drop(&mut self) {
+        self.relayer.drop_connection(self);
+    }
+}
+
+impl<T>
     From<(
         ConnectionId,
         Receiver<Response>,
         Sender<(ConnectionId, Request)>,
-    )> for LocalConnection
+        Arc<Relayer<T>>,
+    )> for LocalConnection<T>
+where
+    T: Storage + Send + Sync + 'static,
 {
     fn from(
         value: (
             ConnectionId,
             Receiver<Response>,
             Sender<(ConnectionId, Request)>,
+            Arc<Relayer<T>>,
         ),
     ) -> Self {
         LocalConnection {
             conn_id: value.0,
             receiver: value.1,
             sender: value.2,
+            relayer: value.3,
         }
     }
 }

+ 10 - 12
crates/relayer/src/connection/mod.rs

@@ -1,8 +1,9 @@
-use crate::{subscription::ActiveSubscription, Error};
+use crate::{relayer::RelayerSubscriptionId, Error};
 use futures_util::{SinkExt, StreamExt};
-use nostr_rs_client::PoolSubscription;
+use nostr_rs_client::pool;
+use nostr_rs_subscription_manager::ActiveSubscription;
 use nostr_rs_types::{
-    relayer::ROk,
+    relayer::{ok::ROkStatus, ROk},
     types::{Addr, SubscriptionId},
     Request, Response,
 };
@@ -51,9 +52,11 @@ impl ConnectionId {
     }
 }
 
-type CompoundSubcription = (Option<PoolSubscription>, Vec<ActiveSubscription>);
+type CompoundSubcription = (
+    Option<pool::subscription::ActiveSubscription>,
+    ActiveSubscription<RelayerSubscriptionId, ()>,
+);
 
-#[derive(Debug)]
 /// Relayer connection
 ///
 /// The new connection struct. This struct spawn's a new worker that handles
@@ -149,8 +152,7 @@ impl Connection {
                                     log::error!("Error parsing message from client: {}", err);
                                     let reply: Response = ROk {
                                         id: Addr::default(),
-                                        status: false,
-                                        message: "Error parsing message".to_owned(),
+                                        status: ROkStatus::Error("Error parsing message".to_owned()),
                                     }.into();
                                     let reply = if let Ok(reply) = serde_json::to_string(&reply) {
                                         reply
@@ -196,11 +198,7 @@ impl Connection {
     }
 
     /// Create a subscription for this connection
-    pub async fn subscribe(
-        &self,
-        id: SubscriptionId,
-        subscriptions: (Option<PoolSubscription>, Vec<ActiveSubscription>),
-    ) {
+    pub async fn subscribe(&self, id: SubscriptionId, subscriptions: CompoundSubcription) {
         self.subscriptions.write().await.insert(id, subscriptions);
     }
 

+ 4 - 0
crates/relayer/src/error.rs

@@ -28,6 +28,10 @@ pub enum Error {
     #[error("Nostr client error: {0}")]
     Client(#[from] nostr_rs_client::Error),
 
+    /// No client connected
+    #[error("Nostr relayer is not connected to any client")]
+    NoClient,
+
     /// Unknown connections
     #[error("Unknown connection: {0}")]
     UnknownConnection(u128),

+ 0 - 1
crates/relayer/src/lib.rs

@@ -13,7 +13,6 @@
 mod connection;
 mod error;
 mod relayer;
-mod subscription;
 
 pub use self::{
     connection::{Connection, LocalConnection},

+ 324 - 170
crates/relayer/src/relayer.rs

@@ -1,12 +1,16 @@
 use crate::{
     connection::{ConnectionId, LocalConnection},
-    subscription::SubscriptionManager,
     Connection, Error,
 };
 use futures_util::StreamExt;
-use nostr_rs_client::{Error as ClientError, Pool};
+use nostr_rs_client::{Error as ClientError, Pool, Url};
 use nostr_rs_storage_base::Storage;
-use nostr_rs_types::{relayer, types::Event, Request, Response};
+use nostr_rs_subscription_manager::SubscriptionManager;
+use nostr_rs_types::{
+    relayer::{self, ROk, ROkStatus},
+    types::{Addr, Event, SubscriptionId},
+    Request, Response,
+};
 use std::{
     collections::{HashMap, HashSet},
     ops::Deref,
@@ -21,6 +25,21 @@ use tokio::{
     task::JoinHandle,
 };
 
+#[derive(Debug, Hash, Ord, PartialEq, PartialOrd, Eq, Clone)]
+pub struct RelayerSubscriptionId((SubscriptionId, ConnectionId));
+
+impl From<(SubscriptionId, ConnectionId)> for RelayerSubscriptionId {
+    fn from(value: (SubscriptionId, ConnectionId)) -> Self {
+        Self(value)
+    }
+}
+
+impl Default for RelayerSubscriptionId {
+    fn default() -> Self {
+        Self((SubscriptionId::empty(), ConnectionId::new_empty()))
+    }
+}
+
 /// Relayer struct
 ///
 pub struct Relayer<T: Storage + Send + Sync + 'static> {
@@ -30,7 +49,7 @@ pub struct Relayer<T: Storage + Send + Sync + 'static> {
     /// be able to perform any optimization like prefetching content while offline
     storage: Option<T>,
     /// Subscription manager
-    subscriptions: Arc<SubscriptionManager>,
+    subscription_manager: Arc<SubscriptionManager<RelayerSubscriptionId, ()>>,
     /// List of all active connections
     connections: RwLock<HashMap<ConnectionId, Connection>>,
     /// This Sender can be used to send requests from anywhere to the relayer.
@@ -64,7 +83,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
         let (sender, receiver) = channel(100_000);
         Ok(Self {
             storage,
-            subscriptions: Default::default(),
+            subscription_manager: Default::default(),
             send_to_relayer: sender.clone(),
             relayer_receiver: Some(receiver),
             connections: Default::default(),
@@ -76,9 +95,16 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
         })
     }
 
+    /// Connects to the relayer pool
+    pub async fn connect_to_relayer(&self, url: Url) -> Result<(), Error> {
+        let (client_pool, _) = self.client_pool.as_ref().ok_or(Error::NoClient)?;
+        let _ = client_pool.connect_to(url).await;
+        Ok(())
+    }
+
     /// Total number of subscribers requests that actively listening for new events
     pub fn total_subscribers(&self) -> usize {
-        self.subscriptions.total_subscribers()
+        self.subscription_manager.total_subscribers()
     }
 
     /// Splits the relayer object and extract their receiver.
@@ -91,9 +117,12 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     ///
     /// This function consumes the object and takes the ownership. The returned
     /// JoinHandle() can be used to stop the main loop
-    pub fn main(self, server: TcpListener) -> Result<JoinHandle<()>, Error> {
+    pub fn main(self, server: TcpListener) -> Result<(Arc<Self>, JoinHandle<()>), Error> {
         let (this, mut receiver) = self.split()?;
-        Ok(tokio::spawn(async move {
+        let _self = Arc::new(this);
+        let this = _self.clone();
+
+        let handle = tokio::spawn(async move {
             loop {
                 tokio::select! {
                     Ok((stream, _)) = server.accept() => {
@@ -103,12 +132,12 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                     Some((conn_id, request)) = receiver.recv() => {
                         // receive messages from the connection pool
                         if conn_id.is_empty() {
-                            // connection pool
+                            // message received from client pool
                             if let Request::Event(event) = request {
+                                let _ = this.broadcast(event.deref()).await;
                                 if let Some(storage) = this.storage.as_ref() {
                                     let _ = storage.store_local_event(&event).await;
                                 }
-                                this.broadcast(event.deref()).await;
                             }
                             continue;
                         }
@@ -128,7 +157,9 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                     }
                 }
             }
-        }))
+        });
+
+        Ok((_self, handle))
     }
 
     /// Handle the client pool
@@ -136,7 +167,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     /// Main loop to consume messages from the client pool and broadcast them to the local subscribers
     fn handle_client_pool(
         client_pool: Pool,
-        sender: Sender<(ConnectionId, Request)>,
+        send_message_to_relayer: Sender<(ConnectionId, Request)>,
     ) -> Result<(Pool, JoinHandle<()>), ClientError> {
         let (mut receiver, client_pool) = client_pool.split()?;
 
@@ -145,12 +176,10 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                 if let Some((response, _)) = receiver.recv().await {
                     match response {
                         Response::Event(event) => {
-                            let _ = sender
-                                .send((
-                                    ConnectionId::new_empty(),
-                                    Request::Event(event.event.into()),
-                                ))
-                                .await;
+                            let _ = send_message_to_relayer.try_send((
+                                ConnectionId::new_empty(),
+                                Request::Event(event.event.into()),
+                            ));
                         }
                         Response::EndOfStoredEvents(_) => {}
                         x => {
@@ -170,12 +199,33 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     }
 
     /// Adds a new local connection to the list of active connections.
-    pub async fn create_new_local_connection(&self) -> LocalConnection {
+    pub async fn create_new_local_connection(self: &Arc<Self>) -> LocalConnection<T> {
         let (conn, receiver) = Connection::new_local_connection();
         let conn_id = conn.get_conn_id();
         self.connections.write().await.insert(conn_id, conn);
 
-        (conn_id, receiver, self.send_to_relayer.clone()).into()
+        (
+            conn_id,
+            receiver,
+            self.send_to_relayer.clone(),
+            self.clone(),
+        )
+            .into()
+    }
+
+    /// Drops a connection from the list of active connections
+    ///
+    /// This function only works for local connections, normal connections can
+    /// be dropped on disconnection.
+    ///
+    /// This function could change in the future to kick connections programmatically
+    pub fn drop_connection(self: &Arc<Self>, local_connection: &LocalConnection<T>) {
+        let id = local_connection.conn_id;
+        let this = self.clone();
+
+        tokio::spawn(async move {
+            this.connections.write().await.remove(&id);
+        });
     }
 
     /// Adds a new TpStream and adds it to the list of active connections.
@@ -200,32 +250,45 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
         &self,
         connection: &Connection,
         request: Request,
-    ) -> Result<Option<Request>, Error> {
-        match &request {
+    ) -> Result<(), Error> {
+        match request {
             Request::Event(event) => {
-                if let Some(storage) = self.storage.as_ref() {
-                    let _ = storage.store(event).await;
-                    let _ = storage.store_local_event(event).await;
+                let event_id: Addr = event.id.clone().into();
+                if !self.broadcast(&event).await? {
+                    connection.send(
+                        ROk {
+                            id: event_id,
+                            status: ROkStatus::Duplicate,
+                        }
+                        .into(),
+                    )?;
+                    return Ok(());
                 }
 
-                self.broadcast(event).await;
+                if let Some(storage) = self.storage.as_ref() {
+                    let _ = storage.store_local_event(&event).await;
+                }
 
                 if let Some((client_pool, _)) = self.client_pool.as_ref() {
                     // pass the event to the pool of clients, so this relayer can relay
                     // their local events to the clients in the network of relayers
-                    let _ = client_pool.post(event.clone()).await;
+                    let _ = client_pool.post(event).await;
                 }
+
+                connection.send(
+                    ROk {
+                        id: event_id,
+                        status: ROkStatus::Ok,
+                    }
+                    .into(),
+                )?;
             }
             Request::Request(request) => {
                 let foreign_subscription = if let Some((client_pool, _)) = self.client_pool.as_ref()
                 {
                     // pass the subscription request to the pool of clients, so this relayer
                     // can relay any unknown event to the clients through their subscriptions
-                    Some(
-                        client_pool
-                            .subscribe(request.filters.clone().into())
-                            .await?,
-                    )
+                    Some(client_pool.subscribe(request.clone()).await)
                 } else {
                     None
                 };
@@ -260,11 +323,11 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                         request.subscription_id.clone(),
                         (
                             foreign_subscription,
-                            self.subscriptions
+                            self.subscription_manager
                                 .subscribe(
-                                    connection.get_conn_id(),
-                                    connection.get_sender(),
-                                    request.clone(),
+                                    (request.subscription_id, connection.get_conn_id()).into(),
+                                    request.filters,
+                                    (),
                                 )
                                 .await,
                         ),
@@ -272,21 +335,37 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                     .await;
             }
             Request::Close(close) => {
-                connection.unsubscribe(close).await;
+                connection.unsubscribe(&close).await;
             }
         };
 
-        Ok(Some(request))
+        Ok(())
     }
 
     #[inline]
     /// Broadcast a given event to all local subscribers
-    pub async fn broadcast(&self, event: &Event) {
+    pub async fn broadcast(&self, event: &Event) -> Result<bool, Error> {
         if let Some(storage) = self.storage.as_ref() {
-            let _ = storage.store(event).await;
+            if !storage.store(event).await? {
+                return Ok(false);
+            }
         }
 
-        self.subscriptions.broadcast(event.clone());
+        let connections = self.connections.read().await;
+        for RelayerSubscriptionId((sub_id, conn_id)) in
+            self.subscription_manager.get_subscribers(event).await
+        {
+            if let Some(connection) = connections.get(&conn_id) {
+                let _ = connection.send(
+                    relayer::Event {
+                        subscription_id: sub_id,
+                        event: event.clone(),
+                    }
+                    .into(),
+                );
+            }
+        }
+        Ok(true)
     }
 }
 
@@ -314,13 +393,24 @@ mod test {
 
         let relayer =
             Relayer::new(Some(Memory::default()), client_pool).expect("valid dummy server");
-        let stopper = relayer.main(listener).expect("valid main loop");
+        let (_, stopper) = relayer.main(listener).expect("valid main loop");
         (
             Url::parse(&format!("ws://{}", local_addr)).expect("valid url"),
             stopper,
         )
     }
 
+    async fn dummy_server_with_relayer(
+        client_pool: Option<Pool>,
+    ) -> (Arc<Relayer<Memory>>, JoinHandle<()>) {
+        let listener = TcpListener::bind(format!("127.0.0.1:{}", 0)).await.unwrap();
+
+        let relayer =
+            Relayer::new(Some(Memory::default()), client_pool).expect("valid dummy server");
+        let (relayer, stopper) = relayer.main(listener).expect("valid main loop");
+        (relayer, stopper)
+    }
+
     fn get_note_with_custom_tags(tags: Vec<Tag>) -> Event {
         let account = Account::default();
         let content = Content::ShortTextNote("".to_owned());
@@ -393,6 +483,18 @@ mod test {
         let _ = relayer
             .process_request_from_client(&connection, request)
             .await;
+
+        // ev1
+        assert_eq!(
+            ROkStatus::Ok,
+            recv.try_recv()
+                .expect("valid")
+                .as_ok()
+                .cloned()
+                .unwrap()
+                .status,
+        );
+
         // ev1
         assert_eq!(
             note,
@@ -475,7 +577,6 @@ mod test {
                 .expect("valid")
                 .as_event()
                 .expect("event")
-                .event
                 .id
                 .to_string()
         );
@@ -486,7 +587,6 @@ mod test {
                 .expect("valid")
                 .as_event()
                 .expect("event")
-                .event
                 .id
                 .to_string()
         );
@@ -497,7 +597,6 @@ mod test {
                 .expect("valid")
                 .as_event()
                 .expect("event")
-                .event
                 .id
                 .to_string()
         );
@@ -540,36 +639,43 @@ mod test {
                 }
         ]))
         .expect("valid object");
-        let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
-        let (connection, mut recv) = Connection::new_local_connection();
+        let (relayer, _stopper) = dummy_server_with_relayer(None).await;
+        let mut receiver = relayer.create_new_local_connection().await;
+        let mut publisher = relayer.create_new_local_connection().await;
 
         assert_eq!(relayer.total_subscribers(), 0);
-        let _ = relayer
-            .process_request_from_client(&connection, request)
-            .await;
 
-        assert_eq!(relayer.total_subscribers(), 5);
+        receiver.send(request).await.expect("subscribe");
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(relayer.total_subscribers(), 1);
 
         // eod
-        assert!(recv
+        assert!(receiver
             .try_recv()
             .expect("valid")
             .as_end_of_stored_events()
             .is_some());
 
         // It is empty
-        assert!(recv.try_recv().is_err());
+        assert!(receiver.try_recv().is_none());
 
-        relayer
-            .process_request_from_client(&connection, get_note())
-            .await
-            .expect("process event");
+        publisher.send(get_note()).await.expect("valid send");
 
-        sleep(Duration::from_millis(100)).await;
+        sleep(Duration::from_millis(10)).await;
+
+        // ok from posting
+        let msg = publisher.try_recv();
+        assert!(msg.is_some());
+        assert_eq!(
+            msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
+            "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
+        );
 
         // It is not empty
-        let msg = recv.try_recv();
-        assert!(msg.is_ok());
+        let msg = receiver.try_recv();
+        assert!(msg.is_some());
         assert_eq!(
             msg.expect("is ok")
                 .as_event()
@@ -580,12 +686,12 @@ mod test {
         );
 
         // it must be delivered at most once
-        assert!(recv.try_recv().is_err());
-        assert_eq!(relayer.total_subscribers(), 5);
+        assert!(receiver.try_recv().is_none());
+        assert_eq!(relayer.total_subscribers(), 1);
 
         // when client is dropped, the subscription is removed
         // automatically
-        drop(connection);
+        drop(receiver);
 
         sleep(Duration::from_millis(10)).await;
 
@@ -624,36 +730,44 @@ mod test {
                 }
         ]))
         .expect("valid object");
-        let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
-        let (connection, mut recv) = Connection::new_local_connection();
+
+        let (relayer, _stopper) = dummy_server_with_relayer(None).await;
+
+        let mut receiver = relayer.create_new_local_connection().await;
+        let mut publisher = relayer.create_new_local_connection().await;
 
         assert_eq!(relayer.total_subscribers(), 0);
-        let _ = relayer
-            .process_request_from_client(&connection, request)
-            .await;
 
-        assert_eq!(relayer.total_subscribers(), 5);
+        receiver.send(request).await.expect("subscribe");
+        sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(relayer.total_subscribers(), 1);
 
         // eod
-        assert!(recv
+        assert!(receiver
             .try_recv()
             .expect("valid")
             .as_end_of_stored_events()
             .is_some());
 
         // It is empty
-        assert!(recv.try_recv().is_err());
+        assert!(receiver.try_recv().is_none());
 
-        relayer
-            .process_request_from_client(&connection, get_note())
-            .await
-            .expect("process event");
+        publisher.send(get_note()).await.expect("valid send");
 
         sleep(Duration::from_millis(100)).await;
 
+        // ok from posting
+        let msg = publisher.try_recv();
+        assert!(msg.is_some());
+        assert_eq!(
+            msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
+            "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
+        );
+
         // It is not empty
-        let msg = recv.try_recv();
-        assert!(msg.is_ok());
+        let msg = receiver.try_recv();
+        assert!(msg.is_some());
         assert_eq!(
             msg.expect("is ok")
                 .as_event()
@@ -664,12 +778,12 @@ mod test {
         );
 
         // it must be delivered at most once
-        assert!(recv.try_recv().is_err());
-        assert_eq!(relayer.total_subscribers(), 5);
+        assert!(receiver.try_recv().is_none());
+        assert_eq!(relayer.total_subscribers(), 1);
 
         // when client is dropped, the subscription is removed
         // automatically
-        drop(connection);
+        drop(receiver);
 
         sleep(Duration::from_millis(10)).await;
 
@@ -687,50 +801,57 @@ mod test {
         }]))
         .expect("valid object");
 
-        let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
-        let (publisher, _) = Connection::new_local_connection();
-
-        let mut set1 = (0..1000)
-            .map(|_| Connection::new_local_connection())
-            .collect::<Vec<_>>();
-
-        let mut set2 = (0..100)
-            .map(|_| Connection::new_local_connection())
-            .collect::<Vec<_>>();
+        let (relayer, _stopper) = dummy_server_with_relayer(None).await;
+        let mut publisher = relayer.create_new_local_connection().await;
 
-        let subscribe1 = set1
-            .iter()
-            .map(|(connection, _)| relayer.process_request_from_client(connection, req1.clone()))
-            .collect::<Vec<_>>();
+        let mut set1 = join_all(
+            (0..1000)
+                .map(|_| relayer.create_new_local_connection())
+                .collect::<Vec<_>>(),
+        )
+        .await;
 
-        let subscribe2 = set2
-            .iter()
-            .map(|(connection, _)| relayer.process_request_from_client(connection, req2.clone()))
-            .collect::<Vec<_>>();
+        let mut set2 = join_all(
+            (0..100)
+                .map(|_| relayer.create_new_local_connection())
+                .collect::<Vec<_>>(),
+        )
+        .await;
 
         assert_eq!(relayer.total_subscribers(), 0);
 
-        join_all(subscribe1)
-            .await
-            .into_iter()
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid calls");
-        join_all(subscribe2)
-            .await
-            .into_iter()
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid calls");
+        join_all(
+            set1.iter()
+                .map(|connection| connection.send(req1.clone()))
+                .collect::<Vec<_>>(),
+        )
+        .await
+        .into_iter()
+        .collect::<Result<Vec<_>, _>>()
+        .expect("subscribe all");
+
+        join_all(
+            set2.iter()
+                .map(|connection| connection.send(req2.clone()))
+                .collect::<Vec<_>>(),
+        )
+        .await
+        .into_iter()
+        .collect::<Result<Vec<_>, _>>()
+        .expect("subscribe all");
 
-        for (_, recv) in set1.iter_mut() {
-            assert!(recv
+        sleep(Duration::from_millis(10)).await;
+
+        for connection in set1.iter_mut() {
+            assert!(connection
                 .try_recv()
                 .expect("end of stored events")
                 .as_end_of_stored_events()
                 .is_some());
         }
 
-        for (_, recv) in set2.iter_mut() {
-            assert!(recv
+        for connection in set2.iter_mut() {
+            assert!(connection
                 .try_recv()
                 .expect("end of stored events")
                 .as_end_of_stored_events()
@@ -739,21 +860,24 @@ mod test {
 
         assert_eq!(relayer.total_subscribers(), 1100);
 
-        relayer
-            .process_request_from_client(&publisher, get_note())
-            .await
-            .expect("process event");
+        publisher.send(get_note()).await.expect("valid send");
 
         sleep(Duration::from_millis(10)).await;
 
-        for (_, recv) in set1.iter_mut() {
-            assert!(recv.try_recv().is_err());
+        let msg = publisher.try_recv();
+        assert!(msg.is_some());
+        assert_eq!(
+            msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
+            "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
+        );
+
+        for connection in set1.iter_mut() {
+            assert!(connection.try_recv().is_none());
         }
 
-        for (_, recv) in set2.iter_mut() {
-            let msg = recv.try_recv();
-            println!("{:?}", msg);
-            assert!(msg.is_ok());
+        for connection in set2.iter_mut() {
+            let msg = connection.try_recv();
+            assert!(msg.is_some());
             let msg = msg.expect("msg");
 
             assert_eq!(
@@ -761,7 +885,7 @@ mod test {
                 "1298169700973717".to_owned()
             );
 
-            assert!(recv.try_recv().is_err());
+            assert!(connection.try_recv().is_none());
         }
 
         drop(set1);
@@ -776,40 +900,78 @@ mod test {
     }
 
     #[tokio::test]
+    async fn posting_event_replies_ok() {
+        let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
+        let (connection, mut recv) = Connection::new_local_connection();
+
+        let note = get_note();
+        let note_id = note.as_event().map(|x| x.id.clone()).unwrap();
+
+        relayer
+            .process_request_from_client(&connection, note)
+            .await
+            .expect("process event");
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(
+            Some(
+                ROk {
+                    id: note_id.into(),
+                    status: ROkStatus::Ok,
+                }
+                .into()
+            ),
+            recv.try_recv().ok()
+        );
+    }
+
+    #[tokio::test]
     async fn subscribe_to_all() {
         let request: Request =
             serde_json::from_value(json!(["REQ", "1298169700973717", {}])).expect("valid object");
 
-        let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
-        let (connection, mut recv) = Connection::new_local_connection();
+        let (relayer, _stopper) = dummy_server_with_relayer(None).await;
+
+        let mut local_connection_0 = relayer.create_new_local_connection().await;
+        let mut local_connection_1 = relayer.create_new_local_connection().await;
 
         assert_eq!(relayer.total_subscribers(), 0);
-        let _ = relayer
-            .process_request_from_client(&connection, request)
-            .await;
+
+        local_connection_1.send(request).await.expect("valid send");
+
+        sleep(Duration::from_millis(10)).await;
 
         assert_eq!(relayer.total_subscribers(), 1);
 
         // eod
-        assert!(recv
+        assert!(local_connection_1
             .try_recv()
             .expect("valid")
             .as_end_of_stored_events()
             .is_some());
 
         // It is empty
-        assert!(recv.try_recv().is_err());
+        assert!(local_connection_1.try_recv().is_none());
 
-        relayer
-            .process_request_from_client(&connection, get_note())
+        local_connection_0
+            .send(get_note())
             .await
-            .expect("process event");
+            .expect("valid send");
 
         sleep(Duration::from_millis(10)).await;
 
+        // ok from posting
+        let msg = local_connection_0.try_recv();
+        assert!(msg.is_some());
+        assert_eq!(
+            msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
+            "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
+        );
+
         // It is not empty
-        let msg = recv.try_recv();
-        assert!(msg.is_ok());
+        let msg = local_connection_1.try_recv();
+        assert!(msg.is_some());
         assert_eq!(
             msg.expect("is ok")
                 .as_event()
@@ -820,12 +982,12 @@ mod test {
         );
 
         // it must be delivered at most once
-        assert!(recv.try_recv().is_err());
+        assert!(local_connection_1.try_recv().is_none());
         assert_eq!(relayer.total_subscribers(), 1);
 
         // when client is dropped, the subscription is removed
         // automatically
-        drop(connection);
+        drop(local_connection_1);
 
         sleep(Duration::from_millis(10)).await;
 
@@ -837,24 +999,20 @@ mod test {
         let (relayer1, _) = dummy_server(0, None).await;
         let (relayer2, _) = dummy_server(0, None).await;
         let (relayer3, _) = dummy_server(0, None).await;
-        let (main_relayer, _) = dummy_server(
-            0,
-            Some(Pool::new_with_clients(vec![
-                relayer1.clone(),
-                relayer2.clone(),
-                relayer3.clone(),
-            ])),
-        )
-        .await;
 
-        let mut reader_client =
-            Pool::new_with_clients(vec![relayer1.clone(), relayer2.clone(), relayer3.clone()]);
-        let main_client = Pool::new_with_clients(vec![main_relayer]);
+        let (pool, _in_scope) =
+            Pool::new_with_clients(vec![relayer1.clone(), relayer2.clone(), relayer3.clone()])
+                .expect("valid pool");
 
-        let _sub = reader_client
-            .subscribe(Default::default())
-            .await
-            .expect("valid subscription");
+        let (main_relayer, _) = dummy_server(0, Some(pool)).await;
+
+        let (mut reader_client, _reader_client_inscope) =
+            Pool::new_with_clients(vec![relayer1.clone(), relayer2.clone(), relayer3.clone()])
+                .expect("valid pool");
+        let (main_client, _main_client_inscope) =
+            Pool::new_with_clients(vec![main_relayer]).expect("valid pool");
+
+        let _sub = reader_client.subscribe(Default::default()).await;
 
         sleep(Duration::from_millis(20)).await;
 
@@ -894,19 +1052,19 @@ mod test {
         assert_eq!(
             responses
                 .get(&relayer1.port().expect("port"))
-                .map(|x| x.event.id.clone()),
+                .map(|x| x.id.clone()),
             Some(signed_content.id.clone())
         );
         assert_eq!(
             responses
                 .get(&relayer2.port().expect("port"))
-                .map(|x| x.event.id.clone()),
+                .map(|x| x.id.clone()),
             Some(signed_content.id.clone())
         );
         assert_eq!(
             responses
                 .get(&relayer3.port().expect("port"))
-                .map(|x| x.event.id.clone()),
+                .map(|x| x.id.clone()),
             Some(signed_content.id)
         );
     }
@@ -915,21 +1073,17 @@ mod test {
     async fn relayer_with_client_pool() {
         let (relayer1, _) = dummy_server(0, None).await;
         let (relayer2, _) = dummy_server(0, None).await;
-        let (main_relayer, _) = dummy_server(
-            0,
-            Some(Pool::new_with_clients(vec![relayer1.clone(), relayer2])),
-        )
-        .await;
+        let (pool, _in_scope) =
+            Pool::new_with_clients(vec![relayer1.clone(), relayer2]).expect("valid pool");
+        let (main_relayer, _) = dummy_server(0, Some(pool)).await;
 
-        let secondary_client = Pool::new_with_clients(vec![relayer1]);
+        let (secondary_client, _sc) = Pool::new_with_clients(vec![relayer1]).expect("valid client");
 
         // Create a subscription in the main relayer, main_client is only
         // connected to the main relayer
-        let mut main_client = Pool::new_with_clients(vec![main_relayer]);
-        let _sub = main_client
-            .subscribe(Default::default())
-            .await
-            .expect("valid subscription");
+        let (mut main_client, _in_scope) =
+            Pool::new_with_clients(vec![main_relayer]).expect("valid client");
+        let _sub = main_client.subscribe(Default::default()).await;
 
         sleep(Duration::from_millis(10)).await;
         assert!(main_client
@@ -942,7 +1096,7 @@ mod test {
 
         let account1 = Account::default();
         let signed_content = account1
-            .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+            .sign_content(vec![], Content::ShortTextNote("test 01".to_owned()), None)
             .expect("valid signed content");
 
         // account1 posts a new note into the relayer1, and the main relayer
@@ -956,8 +1110,8 @@ mod test {
             Some((signed_content.id, signed_content.signature)),
             main_client
                 .try_recv()
-                .and_then(|(r, _)| r.as_event().cloned().map(|x| x.event))
-                .map(|x| (x.id, x.signature))
+                .and_then(|(r, _)| r.as_event().cloned())
+                .map(|x| (x.id.clone(), x.signature.clone()))
         );
         assert!(main_client.try_recv().is_none());
     }

+ 0 - 193
crates/relayer/src/subscription/manager.rs

@@ -1,193 +0,0 @@
-use crate::connection::ConnectionId;
-use nostr_rs_storage_base::{EventFilter, Index, SingleIndex};
-use nostr_rs_types::{
-    client::Subscribe,
-    types::{Event, SubscriptionId},
-    Response,
-};
-use std::{
-    collections::{BTreeMap, HashSet},
-    sync::{
-        atomic::{AtomicUsize, Ordering},
-        Arc,
-    },
-};
-use tokio::sync::{mpsc::Sender, RwLock};
-
-type SubIdx = (SingleIndex, ConnectionId, SubscriptionId);
-
-pub const MIN_PREFIX_MATCH_LEN: usize = 2;
-
-/// Subscription for a connection
-///
-/// This object is responsible for keeping track of a subscription for a connection
-///
-/// When dropped their listener will be removed from the subscription manager automatically
-#[derive(Clone, Debug)]
-pub struct ActiveSubscription {
-    conn_id: ConnectionId,
-    name: SubscriptionId,
-    indexes: Vec<SingleIndex>,
-    manager: Arc<SubscriptionManager>,
-}
-
-impl ActiveSubscription {
-    fn new(
-        conn_id: ConnectionId,
-        name: SubscriptionId,
-        index: Index,
-        manager: Arc<SubscriptionManager>,
-    ) -> Self {
-        Self {
-            conn_id,
-            name,
-            indexes: index.split(),
-            manager,
-        }
-    }
-}
-
-impl Drop for ActiveSubscription {
-    /// When the subscription is dropped, it will remove the listener from the
-    /// subscription manager
-    fn drop(&mut self) {
-        let indexes = self
-            .indexes
-            .drain(..)
-            .map(|key| (key, self.conn_id, self.name.clone()))
-            .collect::<Vec<_>>();
-
-        let manager = self.manager.clone();
-
-        tokio::spawn(async move {
-            manager.unsubscribe(indexes).await;
-        });
-    }
-}
-
-type SubscriptionValue = (Arc<EventFilter>, Sender<Response>);
-
-/// Subscription manager
-///
-/// This object is responsible for letting clients and processes subscribe to
-/// events,
-#[derive(Debug)]
-pub struct SubscriptionManager {
-    /// List of subscriptions with their filters and their index.
-    ///
-    /// A single request may be converted to multiple subscriptions entry as
-    /// they are sorted by their index/key
-    subscriptions: RwLock<BTreeMap<SubIdx, SubscriptionValue>>,
-    /// Total number of subscribers
-    /// A single REQ may have multiple subscriptions
-    total_subscribers: AtomicUsize,
-    /// Minimum prefix match length
-    min_prefix_match_len: usize,
-}
-
-impl Default for SubscriptionManager {
-    fn default() -> Self {
-        Self {
-            subscriptions: Default::default(),
-            total_subscribers: Default::default(),
-            min_prefix_match_len: MIN_PREFIX_MATCH_LEN,
-        }
-    }
-}
-
-impl SubscriptionManager {
-    async fn unsubscribe(self: Arc<Self>, keys: Vec<SubIdx>) {
-        let mut subscriptions = self.subscriptions.write().await;
-        for sub in keys {
-            subscriptions.remove(&sub);
-        }
-        self.total_subscribers.fetch_sub(1, Ordering::Relaxed);
-    }
-
-    fn get_keys_from_event(event: &Event, _min_prefix_match_len: usize) -> Vec<SingleIndex> {
-        let mut subscriptions = vec![];
-
-        subscriptions.push(SingleIndex::Author(event.author().to_owned()));
-        subscriptions.push(SingleIndex::Id(event.id.to_owned()));
-
-        for t in event.tags() {
-            t.get_indexable_value()
-                .map(|v| subscriptions.push(SingleIndex::Tag(t.get_identifier().to_owned(), v)));
-        }
-
-        subscriptions.push(SingleIndex::Kind(event.kind()));
-        subscriptions.push(SingleIndex::AllUpdates);
-        subscriptions
-    }
-
-    /// Get the number of subscribers
-    pub fn total_subscribers(self: &Arc<Self>) -> usize {
-        self.total_subscribers.load(Ordering::Relaxed)
-    }
-
-    /// Subscribe to a future events
-    ///
-    /// This will add a new subscription to the subscription manager with a
-    /// given conn_id, sender and a vector of filters.
-    pub async fn subscribe(
-        self: &Arc<Self>,
-        conn_id: ConnectionId,
-        sender: Sender<Response>,
-        request: Subscribe,
-    ) -> Vec<ActiveSubscription> {
-        let name = request.subscription_id;
-        let mut subscriptions = self.subscriptions.write().await;
-        let subscriptions = request
-            .filters
-            .into_iter()
-            .map(|mut filter| {
-                let index: Index = (&mut filter).into();
-                let filter = Arc::new(EventFilter::from(filter));
-                let subscription =
-                    ActiveSubscription::new(conn_id, name.clone(), index, self.clone());
-
-                for key in subscription.indexes.iter() {
-                    subscriptions.insert(
-                        (key.clone(), conn_id, name.clone()),
-                        (filter.clone(), sender.clone()),
-                    );
-                }
-                subscription
-            })
-            .collect::<Vec<_>>();
-        self.total_subscribers
-            .fetch_add(subscriptions.len(), Ordering::Relaxed);
-        subscriptions
-    }
-
-    /// Publish an event to all subscribers
-    pub fn broadcast(self: &Arc<Self>, event: Event) {
-        let this = self.clone();
-        tokio::spawn(async move {
-            let subscriptions = this.subscriptions.read().await;
-            let subs = Self::get_keys_from_event(&event, this.min_prefix_match_len);
-            let mut deliverded = HashSet::new();
-
-            for sub in subs {
-                for ((sub_type, client, name), (filter, sender)) in subscriptions.range(
-                    &(
-                        sub.clone(),
-                        ConnectionId::new_empty(),
-                        SubscriptionId::empty(),
-                    )..,
-                ) {
-                    if sub_type != &sub {
-                        break;
-                    }
-
-                    if deliverded.contains(client) || !filter.check_event(&event) {
-                        continue;
-                    }
-
-                    let _ = sender.try_send(Response::Event((name, &event).into()));
-                    deliverded.insert(*client);
-                }
-            }
-        });
-    }
-}

+ 0 - 3
crates/relayer/src/subscription/mod.rs

@@ -1,3 +0,0 @@
-mod manager;
-
-pub use self::manager::{ActiveSubscription, SubscriptionManager};

+ 1 - 0
crates/storage/base/Cargo.toml

@@ -5,6 +5,7 @@ edition = "2021"
 
 [dependencies]
 nostr-rs-types = { path = "../../types" }
+nostr-rs-subscription-manager = { path = "../../subscription-manager" }
 thiserror = "1.0.40"
 rand = "0.8.5"
 tokio = { version = "1.32.0", features = ["full"] }

+ 3 - 2
crates/storage/base/src/cursor.rs

@@ -1,5 +1,6 @@
-use crate::{event_filter::EventFilter, Error};
+use crate::Error;
 use futures::FutureExt;
+use nostr_rs_subscription_manager::SortedFilter;
 use nostr_rs_types::types::{Event, Filter};
 use std::{
     future::Future,
@@ -20,7 +21,7 @@ pub type FutureResult<'a> = Pin<Box<dyn Future<Output = Result<Option<Event>, Er
 
 pub fn check_future_call(
     future_event: &mut Option<FutureResult<'_>>,
-    filter: &Option<EventFilter>,
+    filter: &Option<SortedFilter>,
     cx: &mut Context<'_>,
 ) -> FutureValue {
     if let Some(mut inner_future) = future_event.take() {

+ 1 - 9
crates/storage/base/src/lib.rs

@@ -6,8 +6,6 @@
 #![allow(dead_code)]
 pub mod cursor;
 mod error;
-mod event_filter;
-mod index;
 mod secondary_index;
 mod storage;
 
@@ -17,13 +15,7 @@ pub mod test;
 #[cfg(feature = "test")]
 pub use tokio;
 
-pub use crate::{
-    error::Error,
-    event_filter::*,
-    index::{Index, SingleIndex},
-    secondary_index::SecondaryIndex,
-    storage::Storage,
-};
+pub use crate::{error::Error, secondary_index::SecondaryIndex, storage::Storage};
 
 #[macro_export]
 /// This macro creates the

+ 1 - 0
crates/storage/memory/Cargo.toml

@@ -7,6 +7,7 @@ edition = "2021"
 async-trait = "0.1.81"
 futures = "0.3.30"
 nostr-rs-storage-base = { path = "../base" }
+nostr-rs-subscription-manager = { path = "../../subscription-manager" }
 nostr-rs-types = { path = "../../types" }
 tokio = { version = "1.39.2", features = ["full"] }
 

+ 3 - 2
crates/storage/memory/src/cursor.rs

@@ -2,8 +2,9 @@ use crate::Memory;
 use futures::Stream;
 use nostr_rs_storage_base::{
     cursor::{check_future_call, FutureResult, FutureValue},
-    Error, EventFilter, Storage,
+    Error, Storage,
 };
+use nostr_rs_subscription_manager::SortedFilter;
 use nostr_rs_types::types::Event;
 use std::{
     collections::{BTreeMap, VecDeque},
@@ -14,7 +15,7 @@ use tokio::sync::RwLockReadGuard;
 
 pub struct Cursor<'a> {
     pub db: &'a Memory,
-    pub filter: Option<EventFilter>,
+    pub filter: Option<SortedFilter>,
     pub limit: Option<usize>,
     pub returned: usize,
     pub index: RwLockReadGuard<'a, BTreeMap<Vec<u8>, Vec<u8>>>,

+ 9 - 8
crates/storage/memory/src/lib.rs

@@ -1,4 +1,5 @@
-use nostr_rs_storage_base::{Error, EventFilter, Index, SecondaryIndex, Storage};
+use nostr_rs_storage_base::{Error, SecondaryIndex, Storage};
+use nostr_rs_subscription_manager::{CompoundIndex, SortedFilter};
 use nostr_rs_types::types::{Event, Filter};
 use std::{
     cmp::min,
@@ -141,42 +142,42 @@ impl Storage for Memory {
             Some(query.limit.try_into()?)
         };
 
-        let query_index: Index = (&mut query).into();
+        let query_index: CompoundIndex = (&mut query).into();
 
         let (index, index_prefixes) = match query_index {
-            Index::Tag(tag_name, tags) => (
+            CompoundIndex::Tag(tag_name, tags) => (
                 self.indexes.tags.read().await,
                 tags.into_iter()
                     .map(|tag| tag.into_bytes(&tag_name))
                     .collect::<VecDeque<_>>(),
             ),
-            Index::Author(authors) => (
+            CompoundIndex::Author(authors) => (
                 self.indexes.author.read().await,
                 authors
                     .into_iter()
                     .map(|c| c.into_bytes())
                     .collect::<VecDeque<_>>(),
             ),
-            Index::Id(ids) => (
+            CompoundIndex::Id(ids) => (
                 self.indexes.ids.read().await,
                 ids.into_iter()
                     .map(|c| c.into_bytes())
                     .collect::<VecDeque<_>>(),
             ),
-            Index::Kind(kind) => (
+            CompoundIndex::Kind(kind) => (
                 self.indexes.kind.read().await,
                 kind.into_iter()
                     .map(|kind| kind.into_bytes())
                     .collect::<VecDeque<_>>(),
             ),
 
-            Index::TableScan => (
+            CompoundIndex::TableScan => (
                 self.indexes.ids_by_time.read().await,
                 vec![Vec::new()].into(), // all keys
             ),
         };
 
-        let filter: EventFilter = query.into();
+        let filter: SortedFilter = query.into();
 
         Ok(Self::Cursor {
             db: self,

+ 1 - 0
crates/storage/rocksdb/Cargo.toml

@@ -7,6 +7,7 @@ edition = "2021"
 
 [dependencies]
 nostr-rs-storage-base = { path = "../base" }
+nostr-rs-subscription-manager = { path = "../../subscription-manager" }
 nostr-rs-types = { path = "../../types" }
 rocksdb = { version = "0.20.1", features = [
     "multi-threaded-cf",

+ 4 - 3
crates/storage/rocksdb/src/cursor.rs

@@ -3,8 +3,9 @@ use crate::{ReferenceType, RocksDb};
 use futures::Stream;
 use nostr_rs_storage_base::{
     cursor::{check_future_call, FutureResult, FutureValue},
-    Error, EventFilter, Storage,
+    Error, Storage,
 };
+use nostr_rs_subscription_manager::SortedFilter;
 use nostr_rs_types::types::Event;
 use rocksdb::{DBIteratorWithThreadMode, DB};
 use std::{
@@ -24,7 +25,7 @@ pub struct Cursor<'a> {
     /// is given each events from the secondary index will be returned,
     /// otherwise the events will be filtered by the given filter, and only
     /// those events that comply will be returned
-    filter: Option<EventFilter>,
+    filter: Option<SortedFilter>,
     /// Reference to the namespace to use to query the secondary index. If none
     /// is given the secondary_index_iterator must be constructed outside this
     /// wrapper.
@@ -50,7 +51,7 @@ impl<'a> Cursor<'a> {
         db: &'a RocksDb,
         index: Option<ReferenceType>,
         prefixes: Vec<Vec<u8>>,
-        filter: Option<EventFilter>,
+        filter: Option<SortedFilter>,
         secondary_index_iterator: Option<DBIteratorWithThreadMode<'a, DB>>,
         limit: Option<usize>,
     ) -> Self {

+ 8 - 7
crates/storage/rocksdb/src/lib.rs

@@ -1,6 +1,7 @@
 //! Rocks DB implementation of the storage layer
 use crate::cursor::Cursor;
-use nostr_rs_storage_base::{Error, Index, SecondaryIndex, Storage};
+use nostr_rs_storage_base::{Error, SecondaryIndex, Storage};
+use nostr_rs_subscription_manager::CompoundIndex;
 use nostr_rs_types::types::{Event, Filter};
 use rocksdb::{
     BoundColumnFamily, ColumnFamilyDescriptor, IteratorMode, Options, SliceTransform, WriteBatch,
@@ -195,24 +196,24 @@ impl Storage for RocksDb {
             Some(query.limit.try_into()?)
         };
 
-        let query_index: Index = (&mut query).into();
+        let query_index: CompoundIndex = (&mut query).into();
 
         let (index, secondary_index_iterator, prefixes) = match query_index {
-            Index::Tag(tag_name, tags) => (
+            CompoundIndex::Tag(tag_name, tags) => (
                 Some(ReferenceType::Tags),
                 None,
                 tags.into_iter()
                     .map(|tag| tag.into_bytes(&tag_name))
                     .collect::<VecDeque<_>>(),
             ),
-            Index::Id(ids) => (
+            CompoundIndex::Id(ids) => (
                 None,
                 None,
                 ids.into_iter()
                     .map(|id| id.to_vec())
                     .collect::<VecDeque<_>>(),
             ),
-            Index::Author(authors) => (
+            CompoundIndex::Author(authors) => (
                 Some(ReferenceType::Author),
                 None,
                 authors
@@ -220,7 +221,7 @@ impl Storage for RocksDb {
                     .map(|author| author.to_vec())
                     .collect::<VecDeque<_>>(),
             ),
-            Index::Kind(kinds) => (
+            CompoundIndex::Kind(kinds) => (
                 Some(ReferenceType::Kind),
                 None,
                 kinds
@@ -228,7 +229,7 @@ impl Storage for RocksDb {
                     .map(|kind| kind.into_bytes())
                     .collect::<VecDeque<_>>(),
             ),
-            Index::TableScan => (
+            CompoundIndex::TableScan => (
                 Some(ReferenceType::Stream),
                 Some(self.db.iterator_cf(
                     &self.reference_to_cf_handle(ReferenceType::Stream)?,

+ 13 - 0
crates/subscription-manager/Cargo.toml

@@ -0,0 +1,13 @@
+[package]
+name = "nostr-rs-subscription-manager"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+chrono = "0.4.38"
+nostr-rs-types = { path = "../types" }
+tokio = { version = "1.40.0", features = ["full"] }
+
+[dev-dependencies]
+serde = "1.0.209"
+serde_json = "1.0.127"

+ 15 - 9
crates/storage/base/src/event_filter.rs → crates/subscription-manager/src/filter.rs

@@ -1,12 +1,19 @@
+//! Filter module
+//!
+//! This module provides a way to transform a filter into a sorted filter that can
+//! match events much more efficiently.
 use chrono::{DateTime, Utc};
-use nostr_rs_types::types::{filter::TagValue, Addr, Event, Filter, Kind, Tag};
+use nostr_rs_types::types::{filter::TagValue, Event, Filter as FilterT, Kind};
 use std::{
     collections::{HashMap, HashSet},
     ops::Deref,
 };
 
 #[derive(Debug)]
-pub struct EventFilter {
+/// Event Sorted Filter
+///
+/// It is an internal representation of the filter, used for quick memory matching
+pub struct SortedFilter {
     ids: HashSet<[u8; 32]>,
     authors: HashSet<[u8; 32]>,
     tags: HashMap<String, HashSet<TagValue>>,
@@ -15,8 +22,8 @@ pub struct EventFilter {
     until: Option<DateTime<Utc>>,
 }
 
-impl From<Filter> for EventFilter {
-    fn from(query: Filter) -> Self {
+impl From<FilterT> for SortedFilter {
+    fn from(query: FilterT) -> Self {
         let authors = query
             .authors
             .into_iter()
@@ -26,7 +33,6 @@ impl From<Filter> for EventFilter {
         let tags = query
             .tags
             .into_iter()
-            .map(|(tag, values)| (tag, values))
             .collect::<HashMap<String, HashSet<TagValue>>>();
 
         let kinds = query.kinds.into_iter().collect::<HashSet<Kind>>();
@@ -37,7 +43,7 @@ impl From<Filter> for EventFilter {
             .map(|id| (*id))
             .collect::<HashSet<_>>();
 
-        EventFilter {
+        SortedFilter {
             ids,
             authors,
             kinds,
@@ -48,7 +54,7 @@ impl From<Filter> for EventFilter {
     }
 }
 
-impl EventFilter {
+impl SortedFilter {
     /// Returns true if the filter is empty, meaning that it will match any event.
     pub fn is_empty(&self) -> bool {
         self.ids.is_empty()
@@ -92,8 +98,8 @@ impl EventFilter {
                     tag.get_indexable_value()
                         .map(|f| (tag.get_identifier().to_owned(), f))
                 })
-                .fold(HashMap::new(), |mut acc, (key, value)| {
-                    acc.entry(key).or_insert_with(HashSet::new).insert(value);
+                .fold(HashMap::<_, HashSet<_>>::new(), |mut acc, (key, value)| {
+                    acc.entry(key).or_default().insert(value);
                     acc
                 });
 

+ 67 - 43
crates/storage/base/src/index.rs → crates/subscription-manager/src/index.rs

@@ -1,16 +1,51 @@
-//! Indexes for the storage engine.
+//! Indexes for the subscription manager
 //!
-//! This module contains the definition of the indexes used by the storage
-//! engine and provides a convenient conversion from less useful to more useful
-//! indexes by default.
-//!
-//! Each storage engine can use this implementation or have their own
-use nostr_rs_types::types::{filter::TagValue, Addr, Filter, Id, Kind};
+//! This module contains the definition of the indexes used by the subscription
+//! manager to provide a convenient way to choose indexes to reduce the number
+//! of checks before sending the events to the subscribers.
+use nostr_rs_types::types::{filter::TagValue, Event, Filter, Id, Kind};
 use std::collections::HashSet;
 
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
+/// Events can be indexed by different fields.
+///
+/// This index is a multi-value index, meaning that an event can be indexed by multiple fields
+pub enum Index {
+    /// Author ID
+    Author(Id),
+    /// Note ID
+    Id(Id),
+    /// Note Kind
+    Kind(Kind),
+    /// Tag
+    Tag(String, TagValue),
+    /// A catch-all index
+    Anything,
+}
+
+impl Index {
+    /// Converts an event into a list of indexes
+    pub fn from(event: &Event) -> Vec<Self> {
+        let mut subscriptions = vec![];
+
+        subscriptions.push(Index::Author(event.author().to_owned()));
+        subscriptions.push(Index::Id(event.id.to_owned()));
+
+        for t in event.tags() {
+            if let Some(v) = t.get_indexable_value() {
+                subscriptions.push(Index::Tag(t.get_identifier().to_owned(), v));
+            }
+        }
+
+        subscriptions.push(Index::Kind(event.kind()));
+        subscriptions.push(Index::Anything);
+        subscriptions
+    }
+}
+
 /// Indexes for the storage engine.
 #[derive(Debug, Clone)]
-pub enum Index {
+pub enum CompoundIndex {
     /// Index by tag.
     Tag(String, HashSet<TagValue>),
     /// Index by id.
@@ -23,37 +58,28 @@ pub enum Index {
     TableScan,
 }
 
-impl Index {
+impl CompoundIndex {
     /// Splits the index into a list of single indexes.
-    pub fn split(self) -> Vec<SingleIndex> {
+    pub fn split(self) -> Vec<Index> {
         match self {
-            Index::Tag(tag, tags) => tags
+            CompoundIndex::Tag(tag, tags) => tags
                 .into_iter()
-                .map(|tag_value| SingleIndex::Tag(tag.clone(), tag_value))
+                .map(|tag_value| Index::Tag(tag.clone(), tag_value))
                 .collect(),
-            Index::Id(ids) => ids.into_iter().map(SingleIndex::Id).collect(),
-            Index::Author(authors) => authors.into_iter().map(SingleIndex::Author).collect(),
-            Index::Kind(kinds) => kinds.into_iter().map(SingleIndex::Kind).collect(),
-            Index::TableScan => vec![SingleIndex::AllUpdates],
+            CompoundIndex::Id(ids) => ids.into_iter().map(Index::Id).collect(),
+            CompoundIndex::Author(authors) => authors.into_iter().map(Index::Author).collect(),
+            CompoundIndex::Kind(kinds) => kinds.into_iter().map(Index::Kind).collect(),
+            CompoundIndex::TableScan => vec![Index::Anything],
         }
     }
 }
 
-#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
-pub enum SingleIndex {
-    Author(Id),
-    Id(Id),
-    Kind(Kind),
-    Tag(String, TagValue),
-    AllUpdates,
-}
-
-impl From<&mut Filter> for Index {
+impl From<&mut Filter> for CompoundIndex {
     fn from(query: &mut Filter) -> Self {
         if !query.ids.is_empty() {
-            Index::Id(std::mem::take(&mut query.ids))
+            CompoundIndex::Id(std::mem::take(&mut query.ids))
         } else if !query.authors.is_empty() {
-            Index::Author(std::mem::take(&mut query.authors))
+            CompoundIndex::Author(std::mem::take(&mut query.authors))
         } else if !query.tags.is_empty() {
             // find the tag with fewer options to iterate over
             // and convert that tag as the index.
@@ -69,24 +95,22 @@ impl From<&mut Filter> for Index {
             let (key, _) = tag_with_fewer_opts.remove(0);
 
             if let Some(tags) = query.tags.remove(&key) {
-                Index::Tag(key, tags)
+                CompoundIndex::Tag(key, tags)
             } else {
-                Index::TableScan
+                CompoundIndex::TableScan
             }
         } else if !query.kinds.is_empty() {
-            Index::Kind(std::mem::take(&mut query.kinds))
+            CompoundIndex::Kind(std::mem::take(&mut query.kinds))
         } else {
-            Index::TableScan
+            CompoundIndex::TableScan
         }
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use serde_json::json;
-
     use super::*;
-    use std::collections::HashMap;
+    use serde_json::json;
 
     #[test]
     fn test_index_preference_ids_over_others() {
@@ -98,8 +122,8 @@ mod tests {
         }))
         .expect("filter");
 
-        let index = Index::from(&mut filter);
-        assert!(matches!(index, Index::Id(_)));
+        let index = CompoundIndex::from(&mut filter);
+        assert!(matches!(index, CompoundIndex::Id(_)));
     }
 
     #[test]
@@ -111,8 +135,8 @@ mod tests {
         }))
         .expect("filter");
 
-        let index = Index::from(&mut filter);
-        assert!(matches!(index, Index::Author(_)));
+        let index = CompoundIndex::from(&mut filter);
+        assert!(matches!(index, CompoundIndex::Author(_)));
     }
 
     #[test]
@@ -123,8 +147,8 @@ mod tests {
         }))
         .expect("filter");
 
-        let index = Index::from(&mut filter);
-        assert!(matches!(index, Index::Tag(_, _)));
+        let index = CompoundIndex::from(&mut filter);
+        assert!(matches!(index, CompoundIndex::Tag(_, _)));
     }
 
     #[test]
@@ -134,7 +158,7 @@ mod tests {
         }))
         .expect("filter");
 
-        let index = Index::from(&mut filter);
-        assert!(matches!(index, Index::Kind(_)));
+        let index = CompoundIndex::from(&mut filter);
+        assert!(matches!(index, CompoundIndex::Kind(_)));
     }
 }

+ 213 - 0
crates/subscription-manager/src/lib.rs

@@ -0,0 +1,213 @@
+//! Subscription manager
+//!
+//! This crate provides a subscription manager or matching engine for
+//! subscriptions and events.
+//!
+//! This crate provides a generic, efficient way of keeping track of
+//! subscriptions and checking an event to find its listeners
+//!
+//! Each subscription has a droppable struct that will remove the subscription
+//! on Drop.
+//!
+//! Any delivery mechanism or any other form of communication is not part of
+//! this crate
+#![deny(missing_docs, warnings)]
+
+use nostr_rs_types::types::{Event, Filter};
+use std::{
+    collections::{BTreeMap, HashSet},
+    fmt::Debug,
+    hash::Hash,
+    ops::{Deref, DerefMut},
+    sync::{atomic::AtomicUsize, Arc},
+};
+use tokio::sync::{RwLock, RwLockWriteGuard};
+
+mod filter;
+mod index;
+
+pub use self::{
+    filter::SortedFilter,
+    index::{CompoundIndex, Index},
+};
+
+/// Subscription value
+pub struct Subscription<T>
+where
+    T: Sync + Send,
+{
+    /// inner object
+    inner: T,
+    /// Reverse index
+    ///
+    /// This is a reverse index of the filters, it is only used to update the
+    /// main shared index when this subscription is dropped.
+    reverse_index: Vec<Vec<Index>>,
+}
+
+impl<T> Deref for Subscription<T>
+where
+    T: Sync + Send,
+{
+    type Target = T;
+
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+impl<T> DerefMut for Subscription<T>
+where
+    T: Sync + Send,
+{
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.inner
+    }
+}
+
+/// Active subscription
+///
+/// This is a droppable struct that will remove the subscription from the
+/// manager on Drop.
+///
+/// The caller must keep this struct alive to keep the subscription alive.
+pub struct ActiveSubscription<I, T>
+where
+    I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
+    T: Sync + Send + 'static,
+{
+    id: I,
+    manager: Option<Arc<SubscriptionManager<I, T>>>,
+}
+
+impl<I, T> Drop for ActiveSubscription<I, T>
+where
+    I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
+    T: Sync + Send + 'static,
+{
+    fn drop(&mut self) {
+        if let Some(manager) = self.manager.take() {
+            manager.unsubscribe(self);
+        }
+    }
+}
+
+/// Subscription manager
+///
+/// This is the main struct that keeps track of all the subscriptions
+///
+//! The generic type `I` is the type of the subscription ID (which is outside of
+//! the scope of this crate) and `T` is space to keep additional data
+//! associated with a subscription
+#[derive(Default)]
+pub struct SubscriptionManager<I, T>
+where
+    I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
+    T: Sync + Send + 'static,
+{
+    subscriptions: RwLock<BTreeMap<I, Subscription<T>>>,
+    index: RwLock<BTreeMap<(Index, I), SortedFilter>>,
+    total_subscribers: AtomicUsize,
+}
+
+impl<I, T> SubscriptionManager<I, T>
+where
+    I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
+    T: Sync + Send + 'static,
+{
+    fn unsubscribe(self: Arc<Self>, subscription: &mut ActiveSubscription<I, T>) {
+        let id_to_remove = subscription.id.clone();
+        self.total_subscribers
+            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
+        tokio::spawn(async move {
+            let mut subscriptions = self.subscriptions.write().await;
+            let mut indexes = self.index.write().await;
+
+            if let Some(subscription) = subscriptions.remove(&id_to_remove) {
+                for single_indexes in subscription.reverse_index.iter() {
+                    for index in single_indexes.iter() {
+                        indexes.remove(&(index.clone(), id_to_remove.clone()));
+                    }
+                }
+            }
+        });
+    }
+
+    /// Get the subscriptions list as a mutable reference
+    pub async fn subbscriptions_mut(&self) -> RwLockWriteGuard<'_, BTreeMap<I, Subscription<T>>> {
+        self.subscriptions.write().await
+    }
+
+    /// Get active listeners for this event
+    pub async fn get_subscribers(self: &Arc<Self>, event: &Event) -> Vec<I> {
+        let indexes = self.index.read().await;
+
+        let event_index = Index::from(event);
+        let mut matched = HashSet::new();
+
+        for idx in event_index {
+            let start_index = (idx.clone(), I::default());
+
+            for ((current_idx, subscription_id), filter) in indexes.range(&start_index..) {
+                if current_idx != &idx {
+                    break;
+                }
+
+                if !matched.contains(subscription_id) && filter.check_event(event) {
+                    matched.insert(subscription_id.clone());
+                }
+            }
+        }
+
+        matched.into_iter().collect()
+    }
+
+    /// Returns the total number of subscribers
+    pub fn total_subscribers(&self) -> usize {
+        self.total_subscribers
+            .load(std::sync::atomic::Ordering::Relaxed)
+    }
+
+    /// Creates a subscription and returns an active subscription struct
+    ///
+    /// The returned object must be kept alive to keep the subscription alive
+    pub async fn subscribe(
+        self: &Arc<Self>,
+        id: I,
+        mut filters: Vec<Filter>,
+        inner: T,
+    ) -> ActiveSubscription<I, T> {
+        let mut subscriptions = self.subscriptions.write().await;
+        let mut indexes = self.index.write().await;
+
+        let reverse_index: Vec<_> = filters
+            .iter_mut()
+            .map(|f| {
+                let event_index = <&mut Filter as Into<CompoundIndex>>::into(f).split();
+                (f.clone(), event_index)
+            })
+            .collect();
+
+        for (filter, single_indexes) in reverse_index.iter() {
+            for index in single_indexes.iter() {
+                indexes.insert((index.clone(), id.clone()), filter.clone().into());
+            }
+        }
+
+        self.total_subscribers
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+
+        subscriptions.insert(
+            id.clone(),
+            Subscription {
+                reverse_index: reverse_index.into_iter().map(|(_, index)| index).collect(),
+                inner,
+            },
+        );
+
+        ActiveSubscription {
+            id,
+            manager: Some(self.clone()),
+        }
+    }
+}

+ 1 - 0
crates/types/Cargo.toml

@@ -12,6 +12,7 @@ chrono = "0.4.23"
 custom_derive = "0.1.7"
 enum_derive = "0.1.7"
 hex = "0.4"
+once_cell = "1.19.0"
 rand = "0.8.5"
 secp256k1 = { version = "0.26.0", features = [
     "global-context",

+ 32 - 0
crates/types/src/client/subscribe.rs

@@ -2,9 +2,18 @@
 //!
 //! Used to request events and subscribe to new updates.
 use crate::{common::SerializeDeserialize, types};
+use hex::ToHex;
+use once_cell::sync::Lazy;
+use rand::RngCore;
 use serde_json::Value;
 use std::collections::VecDeque;
 
+static ALL_EVENTS_PREFIX: Lazy<String> = Lazy::new(|| {
+    let mut prefix = [0u8; 4];
+    rand::thread_rng().fill_bytes(&mut prefix);
+    prefix.encode_hex::<String>()
+});
+
 /// Request: used to request events and subscribe to new updates.
 ///
 /// More details at https://github.com/nostr-protocol/nips/blob/master/01.md#communication-between-clients-and-relays
@@ -18,6 +27,22 @@ pub struct Subscribe {
     pub filters: Vec<types::Filter>,
 }
 
+/// Checks if the subscription ID is for all events
+pub fn is_all_events(subscription_id: &types::SubscriptionId) -> bool {
+    subscription_id.starts_with(&*ALL_EVENTS_PREFIX)
+}
+
+impl Subscribe {
+    /// Creates a new subscription with a random ID to subscribe to all events
+    pub fn to_all_events() -> Self {
+        Self {
+            subscription_id: types::SubscriptionId::with_prefix(&ALL_EVENTS_PREFIX)
+                .expect("valid subscription id"),
+            filters: vec![Default::default()],
+        }
+    }
+}
+
 impl From<types::Filter> for Subscribe {
     fn from(filter: types::Filter) -> Self {
         Self {
@@ -127,4 +152,11 @@ mod test {
         assert_eq!(r.subscription_id, obj.subscription_id);
         assert_eq!(r.filters.len(), obj.filters.len());
     }
+
+    #[test]
+    fn test_subscribe_all() {
+        let x = Subscribe::to_all_events();
+        assert!(is_all_events(&x.subscription_id));
+        assert!(!is_all_events(&Default::default()));
+    }
 }

+ 15 - 0
crates/types/src/lib.rs

@@ -17,3 +17,18 @@ pub use self::{request::Request, response::Response};
 extern crate custom_derive;
 #[macro_use]
 extern crate enum_derive;
+
+#[cfg(test)]
+mod regression {
+    use crate::types::Event;
+
+    #[test]
+    fn event() {
+        include_str!("../tests/regression_parsing.json")
+            .lines()
+            .for_each(|line| {
+                let event: Event = serde_json::from_str(line).unwrap();
+                assert!(event.is_valid().is_ok(), "Failed to parse: {}", line);
+            });
+    }
+}

+ 1 - 1
crates/types/src/relayer/auth.rs

@@ -10,7 +10,7 @@ use std::{collections::VecDeque, ops::Deref};
 
 /// This is how a relayer sends an authentication challenge to the client. The
 /// challenge must be returned signed by the client
-#[derive(Clone, Debug)]
+#[derive(Clone, PartialEq, Eq, Debug)]
 pub struct Auth(pub String);
 
 impl Default for Auth {

+ 7 - 1
crates/types/src/relayer/eose.rs

@@ -13,9 +13,15 @@ use std::{collections::VecDeque, ops::Deref};
 /// This is how to the relayer signals the client they gave all the stored
 /// messages with the requested filter. Future events will be sent, as they
 /// appear to this relayer
-#[derive(Clone, Debug)]
+#[derive(Clone, PartialEq, Eq, Debug)]
 pub struct EndOfStoredEvents(pub SubscriptionId);
 
+impl From<SubscriptionId> for EndOfStoredEvents {
+    fn from(value: SubscriptionId) -> Self {
+        Self(value)
+    }
+}
+
 impl Deref for EndOfStoredEvents {
     type Target = SubscriptionId;
 

+ 10 - 2
crates/types/src/relayer/event.rs

@@ -4,10 +4,10 @@ use crate::{
     types::{self, subscription_id::Error, SubscriptionId},
 };
 use serde_json::Value;
-use std::collections::VecDeque;
+use std::{collections::VecDeque, ops::Deref};
 
 /// An event sent to clients
-#[derive(Clone, Debug)]
+#[derive(Clone, PartialEq, Eq, Debug)]
 pub struct Event {
     /// The subscription ID that matched the inner event
     pub subscription_id: SubscriptionId,
@@ -15,6 +15,14 @@ pub struct Event {
     pub event: types::Event,
 }
 
+impl Deref for Event {
+    type Target = types::Event;
+
+    fn deref(&self) -> &Self::Target {
+        &self.event
+    }
+}
+
 impl From<(&SubscriptionId, &types::Event)> for Event {
     fn from((subscription_id, event): (&SubscriptionId, &types::Event)) -> Self {
         Self {

+ 7 - 1
crates/types/src/relayer/mod.rs

@@ -8,4 +8,10 @@ pub mod event;
 pub mod notice;
 pub mod ok;
 
-pub use self::{auth::Auth, eose::EndOfStoredEvents, event::Event, notice::Notice, ok::ROk};
+pub use self::{
+    auth::Auth,
+    eose::EndOfStoredEvents,
+    event::Event,
+    notice::Notice,
+    ok::{ROk, ROkStatus},
+};

+ 1 - 1
crates/types/src/relayer/notice.rs

@@ -8,7 +8,7 @@ use serde_json::Value;
 use std::{collections::VecDeque, ops::Deref};
 
 /// Notices are errors
-#[derive(Clone, Debug)]
+#[derive(Clone, PartialEq, Eq, Debug)]
 pub struct Notice(pub String);
 
 impl Deref for Notice {

+ 93 - 12
crates/types/src/relayer/ok.rs

@@ -11,15 +11,80 @@ use crate::{
 use serde_json::Value;
 use std::collections::VecDeque;
 
+#[derive(Clone, PartialEq, Eq, Debug)]
+/// ROkStatus
+pub enum ROkStatus {
+    /// Everything is ok
+    Ok,
+    /// Proof of work failed
+    Pow,
+    /// Event is known already
+    Duplicate,
+    /// User is blocked
+    Blocked,
+    /// User is blacklisted
+    Blacklisted,
+    /// Rate limit exceeded
+    RateLimit,
+    /// Invalid event
+    Invalid,
+    /// Error
+    Error(String),
+}
+
+impl From<(Addr, ROkStatus)> for ROk {
+    fn from((id, status): (Addr, ROkStatus)) -> Self {
+        Self { id, status }
+    }
+}
+
+impl ROkStatus {
+    /// Compute readable status string
+    pub fn computer_readable(&self) -> String {
+        match self {
+            ROkStatus::Ok => "",
+            ROkStatus::Pow => "pow",
+            ROkStatus::Duplicate => "duplicate",
+            ROkStatus::Blocked => "blocked",
+            ROkStatus::Blacklisted => "blocked",
+            ROkStatus::RateLimit => "rate-limit",
+            ROkStatus::Invalid => "invalid",
+            ROkStatus::Error(_) => "error",
+        }
+        .to_owned()
+    }
+
+    /// Human readable status
+    #[allow(clippy::inherent_to_string)]
+    pub fn to_string(&self) -> String {
+        let message = match self {
+            ROkStatus::Ok => "",
+            ROkStatus::Pow => "difficulty",
+            ROkStatus::Duplicate => "already have this event",
+            ROkStatus::Blocked => "try again later",
+            ROkStatus::Blacklisted => "you are banned from posting here",
+            ROkStatus::RateLimit => "slow down there chief",
+            ROkStatus::Invalid => "event creation date is too far off from the current time",
+            ROkStatus::Error(e) => e,
+        }
+        .to_owned();
+
+        match self {
+            ROkStatus::Ok => "".to_owned(),
+            _ => {
+                format!("{}:{}", self.computer_readable(), message)
+            }
+        }
+    }
+}
+
 /// OK messages
-#[derive(Clone, Debug)]
+#[derive(Clone, PartialEq, Eq, Debug)]
 pub struct ROk {
     /// Event Id
     pub id: Addr,
-    /// Whether the event has been successful or not
-    pub status: bool,
-    /// Some message
-    pub message: String,
+    /// Status
+    pub status: ROkStatus,
 }
 
 impl SerializeDeserialize for ROk {
@@ -31,8 +96,8 @@ impl SerializeDeserialize for ROk {
         Ok(vec![
             Value::String(Self::get_tag().to_owned()),
             Value::String(self.id.to_hex()),
-            Value::Bool(self.status),
-            Value::String(self.message.clone()),
+            Value::Bool(self.status == ROkStatus::Ok),
+            Value::String(self.status.computer_readable()),
         ])
     }
 
@@ -52,8 +117,25 @@ impl SerializeDeserialize for ROk {
             .to_owned();
         Ok(Self {
             id,
-            status,
-            message,
+            status: if status {
+                ROkStatus::Ok
+            } else {
+                let part = message
+                    .split(":")
+                    .next()
+                    .map(|x| x.trim().to_owned())
+                    .unwrap_or_default();
+                match part.as_str() {
+                    "pow" => ROkStatus::Pow,
+                    "duplicate" => ROkStatus::Duplicate,
+                    "blocked" => ROkStatus::Blocked,
+                    "blacklisted" => ROkStatus::Blacklisted,
+                    "rate-limit" => ROkStatus::RateLimit,
+                    "invalid" => ROkStatus::Invalid,
+                    "error" => ROkStatus::Error(message),
+                    _ => ROkStatus::Error(message),
+                }
+            },
         })
     }
 }
@@ -74,12 +156,11 @@ mod test {
     fn serialize() {
         let ok_ = ROk {
             id: "a0b0".try_into().expect("valid_id"),
-            status: true,
-            message: "Some test".into(),
+            status: ROkStatus::Ok,
         };
         let m: Response = ok_.into();
         assert_eq!(
-            r#"["OK","a0b0",true,"Some test"]"#,
+            r#"["OK","a0b0",true,""]"#,
             serde_json::to_string(&m).expect("valid json")
         );
     }

+ 4 - 3
crates/types/src/response.rs

@@ -10,7 +10,7 @@ use serde::{
 use std::collections::VecDeque;
 
 custom_derive! {
-#[derive(Debug, Clone, EnumFromInner)]
+#[derive(Debug, Clone, PartialEq, Eq, EnumFromInner)]
     /// Response Message
     ///
     /// All responses from relayers to clients are abstracted in this struct
@@ -236,8 +236,9 @@ mod test {
         jsons
             .iter()
             .map(|json| {
-                let message: Response =
-                    serde_json::from_str(json).expect(&format!("valid message: {}", json));
+                let message: Response = serde_json::from_str(json)
+                    .unwrap_or_else(|_| panic!("valid message: {}", json));
+
                 assert!(message.as_event().is_some());
             })
             .for_each(drop);

+ 14 - 0
crates/types/src/types/addr.rs

@@ -13,6 +13,7 @@ use std::{
     fmt::{self, Display},
     hash::Hash,
     ops::Deref,
+    str::FromStr,
 };
 use thiserror::Error;
 
@@ -173,6 +174,14 @@ impl Addr {
 /// The first approach is try decoding the the Addr as a hex-encoded string.
 ///
 /// The second approach is decoding with bech32
+impl FromStr for Addr {
+    type Err = Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        s.try_into()
+    }
+}
+
 impl TryFrom<&str> for Addr {
     type Error = Error;
 
@@ -195,6 +204,11 @@ impl TryFrom<&str> for Addr {
     }
 }
 
+/// Converts an Addr from a string
+///
+/// The first approach is to try decoding the Addr as a hex-encoded string.
+///
+/// The second approach is decoding with bech32
 impl TryFrom<String> for Addr {
     type Error = Error;
 

+ 5 - 0
crates/types/src/types/event.rs

@@ -202,6 +202,11 @@ impl Event {
         &self.inner.content
     }
 
+    /// Consumes the event and returns the content
+    pub fn take_content(self) -> Content {
+        self.inner.content
+    }
+
     /// Verifies if the Id, Public Key and Signature are correct
     fn verify_signature(
         public_key: &Id,

+ 2 - 2
crates/types/src/types/filter.rs

@@ -63,8 +63,8 @@ where
         {
             let mut tags = HashMap::new();
             while let Some((key, value)) = map.next_entry::<String, _>()? {
-                if key.starts_with('#') {
-                    tags.insert(key[1..].to_string(), value);
+                if let Some(stripped) = key.strip_prefix('#') {
+                    tags.insert(stripped.to_string(), value);
                 }
             }
             Ok(tags)

+ 1 - 1
crates/types/src/types/id.rs

@@ -27,7 +27,7 @@ impl TryFrom<&str> for Id {
         if hex::decode_to_slice(value, &mut slice).is_ok() {
             return Ok(Id(slice));
         }
-        let (hrp, bytes, _) = bech32::decode(&value)?;
+        let (hrp, bytes, _) = bech32::decode(value)?;
         match hrp.to_lowercase().as_str() {
             "npub" | "nsec" | "note" => {}
             _ => return Err(Error::UnexpectedHrp),

+ 15 - 0
crates/types/src/types/subscription_id.rs

@@ -31,6 +31,21 @@ impl SubscriptionId {
     pub fn empty() -> Self {
         Self("".to_owned())
     }
+
+    /// Creates a new subscription ID with a prefix
+    ///
+    /// Prefixes are meaningless in this context but they may be meaningful in
+    /// other contexts, such as the clients or client pools.
+    pub fn with_prefix(prefix: &str) -> Result<Self, Error> {
+        if prefix.as_bytes().len() > 30 {
+            return Err(Error::TooLong);
+        }
+        let mut data = [0u8; 32];
+        rand::thread_rng().fill_bytes(&mut data);
+        let suffix = data.encode_hex::<String>();
+
+        Ok(Self(format!("{}{}", prefix, &suffix[prefix.len()..])))
+    }
 }
 
 impl Deref for SubscriptionId {

+ 45 - 63
crates/types/src/types/tag.rs

@@ -30,23 +30,7 @@ pub enum Marker {
     Unknown(String),
 }
 
-#[derive(Debug, Clone, PartialEq, Eq)]
-/// Url content or empty string
-pub enum UrlOrEmpty {
-    /// Url
-    Url(Url),
-    /// Empty string
-    Empty,
-}
-
-impl Display for UrlOrEmpty {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match self {
-            Self::Url(url) => write!(f, "{}", url),
-            Self::Empty => write!(f, ""),
-        }
-    }
-}
+type ParsedUrl = (Option<Url>, String);
 
 impl Display for Marker {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -96,11 +80,11 @@ pub enum RelayAccessType {
 #[derive(Debug, PartialEq, Eq, Clone)]
 pub enum Tag {
     /// Tag another event
-    Event(Id, Option<UrlOrEmpty>, Option<Marker>),
+    Event(Id, Option<ParsedUrl>, Option<Marker>),
     /// Tag another public key
-    PubKey(Id, Option<UrlOrEmpty>, Option<String>),
+    PubKey(Id, Option<ParsedUrl>, Option<String>),
     /// Tag a relay
-    Relay(Url, RelayAccessType),
+    Relay(ParsedUrl, RelayAccessType),
     /// Tag a hashtag
     Hashtag(String),
     /// Tag with an external content id
@@ -116,7 +100,7 @@ pub enum Tag {
     /// Image - NIP-23, NIP-52, NIP-58
     Image(Url, Option<String>),
     /// Zap Goal - NIP-75
-    ZapGoal(Id, Option<UrlOrEmpty>),
+    ZapGoal(Id, Option<ParsedUrl>),
     /// Weird, supported nonetheless
     Empty,
 }
@@ -157,9 +141,9 @@ impl Tag {
             Tag::PubKey(key, _, _) => Some(TagValue::Id(key.clone())),
             Tag::ExternalContentId(id, _) => Some(TagValue::String(id.clone())),
             Tag::Hashtag(content) | Tag::Title(content) => Some(TagValue::String(content.clone())),
-            Tag::Relay(url, _) => Some(TagValue::String(url.to_string())),
+            Tag::Relay((_, url), _) => Some(TagValue::String(url.to_string())),
             Tag::Unknown(_, args) => {
-                let value = args.get(0).cloned().unwrap_or_default();
+                let value = args.first().cloned().unwrap_or_default();
                 Some(
                     value
                         .as_str()
@@ -200,8 +184,8 @@ impl ser::Serialize for Tag {
             }
             Tag::Event(event, relayer_url, marker) => {
                 seq.serialize_element(&event.to_string())?;
-                if let Some(relayer) = &relayer_url {
-                    seq.serialize_element(&relayer.to_string())?;
+                if let Some((_, relayer)) = relayer_url {
+                    seq.serialize_element(&relayer)?;
                     if let Some(marker) = &marker {
                         seq.serialize_element(&marker.to_string())?;
                     }
@@ -209,8 +193,8 @@ impl ser::Serialize for Tag {
             }
             Tag::ZapGoal(event, relayer_url) => {
                 seq.serialize_element(&event.to_string())?;
-                if let Some(relayer) = &relayer_url {
-                    seq.serialize_element(&relayer.to_string())?;
+                if let Some((_, relayer)) = relayer_url {
+                    seq.serialize_element(&relayer)?;
                 }
             }
             Tag::ExternalContentId(content, url) => {
@@ -224,14 +208,14 @@ impl ser::Serialize for Tag {
             }
             Tag::PubKey(key, relayer_url, pet_name) => {
                 seq.serialize_element(&key.to_string())?;
-                if let Some(relayer) = &relayer_url {
-                    seq.serialize_element(&relayer.to_string())?;
+                if let Some((_, relayer)) = relayer_url {
+                    seq.serialize_element(&relayer)?;
                     if let Some(pet_name) = &pet_name {
                         seq.serialize_element(pet_name)?;
                     }
                 }
             }
-            Tag::Relay(url, access) => {
+            Tag::Relay((_, url), access) => {
                 seq.serialize_element(url)?;
 
                 if let Some(access) = match access {
@@ -270,10 +254,7 @@ impl<'de> Deserialize<'de> for Tag {
             0 => return Ok(Tag::Empty),
             1 => {
                 let tag_name = parts.pop_front().unwrap_or_default();
-                match tag_name.as_str() {
-                    "encrypted" => return Ok(Tag::Encrypted),
-                    _ => {}
-                }
+                if tag_name.as_str() == "encrypted" { return Ok(Tag::Encrypted) }
                 return Ok(Tag::Unknown(tag_name, vec![]));
             }
             _ => {}
@@ -287,28 +268,16 @@ impl<'de> Deserialize<'de> for Tag {
                 .pop_front()
                 .ok_or_else::<D::Error, _>(|| de::Error::custom("missing argument"))
                 .and_then(|id| id.parse().map_err(de::Error::custom))
-                .and_then(|id| {
-                    let relayer_url = parts
-                        .pop_front()
-                        .map(|value| {
-                            if value.is_empty() {
-                                Ok(UrlOrEmpty::Empty)
-                            } else {
-                                value.parse().map(UrlOrEmpty::Url)
-                            }
-                        })
-                        .transpose()
-                        .map_err(de::Error::custom);
-
-                    relayer_url.map(|relayer_url| {
-                        let extra = parts.pop_front();
-                        match tag_type.as_str() {
-                            "e" => Tag::Event(id, relayer_url, extra.map(|x| x.as_str().into())),
-                            "goal" => Tag::ZapGoal(id, relayer_url),
-                            "p" => Tag::PubKey(id, relayer_url, extra),
-                            _ => unreachable!(),
-                        }
-                    })
+                .map(|id| {
+                    let relayer_url = parts.pop_front().map(|value| (value.parse().ok(), value));
+
+                    let extra = parts.pop_front();
+                    match tag_type.as_str() {
+                        "e" => Tag::Event(id, relayer_url, extra.map(|x| x.as_str().into())),
+                        "goal" => Tag::ZapGoal(id, relayer_url),
+                        "p" => Tag::PubKey(id, relayer_url, extra),
+                        _ => unreachable!(),
+                    }
                 }),
             "expiration" => {
                 let timestamp = parts
@@ -348,7 +317,11 @@ impl<'de> Deserialize<'de> for Tag {
                 let url = parts
                     .pop_front()
                     .ok_or_else::<D::Error, _>(|| de::Error::custom("missing url"))
-                    .and_then(|url| url.parse().map_err(de::Error::custom));
+                    .and_then(|url| {
+                        url.parse::<Url>()
+                            .map_err(de::Error::custom)
+                            .map(|parsed_url| (Some(parsed_url), url))
+                    });
 
                 let access = parts
                     .pop_front()
@@ -437,7 +410,7 @@ mod test {
                 "8fe53b37518e3dbe9bab26d912292001d8b882de9456b7b08b615f912dc8bf4a"
                     .parse()
                     .unwrap(),
-                Some(UrlOrEmpty::Empty),
+                Some((None, "".to_owned())),
                 Some("mention".to_owned()),
             ),
             Tag::Event(
@@ -461,10 +434,13 @@ mod test {
 
     #[test]
     fn test_relay() {
-        let json = json!(["r", "https://example.com", "read"]);
+        let json = json!(["r", "https://example.com/", "read"]);
         assert_eq!(
             Tag::Relay(
-                "https://example.com".parse().expect("valid url"),
+                (
+                    Some("https://example.com".parse().expect("valid url")),
+                    "https://example.com/".to_owned()
+                ),
                 RelayAccessType::Read
             ),
             serde_json::from_value(json).expect("valid json"),
@@ -473,7 +449,10 @@ mod test {
         let json = json!(["r", "https://example.com", "write"]);
         assert_eq!(
             Tag::Relay(
-                "https://example.com".parse().expect("valid url"),
+                (
+                    Some("https://example.com".parse().expect("valid url")),
+                    "https://example.com".to_owned()
+                ),
                 RelayAccessType::Write
             ),
             serde_json::from_value(json).expect("valid json"),
@@ -482,7 +461,10 @@ mod test {
         let json = json!(["r", "https://example.com"]);
         assert_eq!(
             Tag::Relay(
-                "https://example.com".parse().expect("valid url"),
+                (
+                    Some("https://example.com".parse().expect("valid url")),
+                    "https://example.com".to_owned()
+                ),
                 RelayAccessType::Both,
             ),
             serde_json::from_value(json).expect("valid json"),
@@ -611,7 +593,7 @@ mod test {
                 "d45a98f898820258a3313f5cb14f5fe8a9263437931ac6309f23ae0324833f39"
                     .parse()
                     .unwrap(),
-                None
+                None,
             ),
         );
     }

+ 2 - 0
crates/types/tests/regression_parsing.json

@@ -0,0 +1,2 @@
+{"content":"🤙","created_at":1724806560,"id":"f4b1461f37dc847f6dea3dbc1b5470c4dceb23c4046ebbcb80807f62f520f371","kind":7,"pubkey":"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78","sig":"5d6bcf31b72d8a189e0cd2095bf64bf052f0ec88ec0e5c6a669f546b14306204feffa0b453a9591639710fe6341ae53589dcbd0cda2752428bd986ccaf99bf6e","tags":[["e","078fc44df2ba89ae08e2d327d5ab43f31e60765b7517342a74e8a180b943fd71","wss://relay.primal.net","root"],["p","b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78"],["e","0612860a3a567a531bde1076a407c50f086f56246f5a0f8e99e061d23cd040b5"],["p","d7df5567015930b17c125b3a7cf29bef23aa5a68d09cd6518d291359606aab7b"]]}
+{"content":"🤣 https://image.nostr.build/f0f21864cb22642fc5d83cd96fc55d5d85a2e469e46e191be50bc71a7a1d9740.jpg ","created_at":1719979685,"id":"85616bf509f2612677a1e59f94cceab07f06fb37240597b260fee42c0295992b","kind":1,"pubkey":"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78","sig":"f3e993413b2b4f4579e535c1559e982bb460446d47c02a7ab2e884fd996da31ea999ad89f2ad707800ab4dbf6966819dfd2c93f2d0f4ce64f79430f00b16c8ed","tags":[["e","4668bb2cc0a1fdb5b328810a22b9bf2e8425ff82a228347ddf97e2d9797cf41b","","root"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"],["imeta","url https://image.nostr.build/f0f21864cb22642fc5d83cd96fc55d5d85a2e469e46e191be50bc71a7a1d9740.jpg","blurhash eEFOc0.R0hH[n#%E${%JD+Mzb=xoxZxtIq.7-:xtRjRQxZWUxWn$NH","dim 1280x720"],["r","https://image.nostr.build/f0f21864cb22642fc5d83cd96fc55d5d85a2e469e46e191be50bc71a7a1d9740.jpg"]]}

+ 1 - 1
src/main.rs

@@ -72,7 +72,7 @@ async fn main() {
     let addr = "127.0.0.1:3000";
     let listener: TcpListener = TcpListener::bind(&addr).await.unwrap();
 
-    let _ = relayer.main(listener).expect("valid main").await;
+    let _ = relayer.main(listener).expect("valid main");
 
     /*
     loop {