Bläddra i källkod

Improvements to the relayer

Things done:

* [x] Improved client to have droppable subscription.
	- When a pool subscription is dropped it should be desuscribed at the socket level
* [x] Remove `block_on` from Drop and use `tokio::spawn` instead to avoid deadlocks
* [x] Test close subscription
* [x] Added ability to subscribe to all updates
* [x] Test all removed scenarios
* [x] Improve `SortedFilter::keys()` to produce a better list of keys instead of multiple
	- Added multiple indexes
	- SubscriptionId is notified once
Cesar Rodas 3 månader sedan
förälder
incheckning
37beec8e3b

+ 1 - 0
Cargo.lock

@@ -899,6 +899,7 @@ dependencies = [
  "futures",
  "futures-util",
  "log",
+ "nostr-rs-memory",
  "nostr-rs-relayer",
  "nostr-rs-types",
  "serde_json",

+ 1 - 0
crates/client/Cargo.toml

@@ -20,3 +20,4 @@ futures = "0.3.28"
 
 [dev-dependencies]
 nostr-rs-relayer = { path = "../relayer" }
+nostr-rs-memory = { path = "../storage/memory" }

+ 7 - 6
crates/client/src/client.rs

@@ -1,5 +1,4 @@
 use crate::Error;
-use futures::executor::block_on;
 use futures_util::{SinkExt, StreamExt};
 use nostr_rs_types::{
     client::{self, subscribe},
@@ -32,11 +31,13 @@ pub struct ActiveSubscription {
 
 impl Drop for ActiveSubscription {
     fn drop(&mut self) {
-        block_on(async move {
-            self.subscriptions.write().await.remove(&self.id);
-            let _ = self
-                .send_to_socket
-                .send(nostr_rs_types::client::Close(self.id.clone()).into())
+        let subscriptions = self.subscriptions.clone();
+        let id = self.id.clone();
+        let send_to_socket = self.send_to_socket.clone();
+        tokio::spawn(async move {
+            subscriptions.write().await.remove(&id);
+            let _ = send_to_socket
+                .send(nostr_rs_types::client::Close(id).into())
                 .await;
         });
     }

+ 2 - 0
crates/client/src/lib.rs

@@ -12,4 +12,6 @@ mod client;
 mod error;
 mod pool;
 
+pub use url::Url;
+
 pub use self::{client::Client, error::Error, pool::Pool};

+ 93 - 14
crates/client/src/pool.rs

@@ -8,10 +8,12 @@ use nostr_rs_types::{
     types::SubscriptionId,
     Response,
 };
-use std::collections::HashMap;
+use std::{collections::HashMap, sync::Arc};
 use tokio::sync::{mpsc, RwLock};
 use url::Url;
 
+type Subscriptions =
+    Arc<RwLock<HashMap<SubscriptionId, (subscribe::Subscribe, Vec<ActiveSubscription>)>>>;
 /// Clients
 ///
 /// This is a set of outgoing connections to relayers. This struct can connect
@@ -22,20 +24,14 @@ pub struct Pool {
     clients: RwLock<HashMap<Url, Client>>,
     sender: mpsc::Sender<(Response, Url)>,
     receiver: Option<mpsc::Receiver<(Response, Url)>>,
-    subscriptions: RwLock<HashMap<SubscriptionId, (subscribe::Subscribe, Vec<ActiveSubscription>)>>,
+    subscriptions: Subscriptions,
 }
 
+/// Default channel buffer size for the pool
+pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1_000;
+
 impl Default for Pool {
     fn default() -> Self {
-        Self::new()
-    }
-}
-
-const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 10_000;
-
-impl Pool {
-    /// Creates a new Relayers object
-    pub fn new() -> Self {
         let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);
         Self {
             clients: Default::default(),
@@ -44,7 +40,25 @@ impl Pool {
             sender,
         }
     }
+}
 
+/// Return a subscription that will be removed when dropped
+pub struct PoolSubscription {
+    subscription_id: SubscriptionId,
+    subscriptions: Subscriptions,
+}
+
+impl Drop for PoolSubscription {
+    fn drop(&mut self) {
+        let subscriptions = self.subscriptions.clone();
+        let subscription_id = self.subscription_id.clone();
+        tokio::spawn(async move {
+            subscriptions.write().await.remove(&subscription_id);
+        });
+    }
+}
+
+impl Pool {
     /// Creates a new instance with a list of urls
     pub fn new_with_clients(clients: Vec<Url>) -> Self {
         let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);
@@ -76,8 +90,16 @@ impl Pool {
         self.receiver.as_mut()?.recv().await
     }
 
+    /// Returns the number of active subscriptions
+    pub async fn active_subscriptions(&self) -> usize {
+        self.subscriptions.read().await.keys().len()
+    }
+
     /// Subscribes to all the connected relayers
-    pub async fn subscribe(&self, subscription: subscribe::Subscribe) -> Result<(), Error> {
+    pub async fn subscribe(
+        &self,
+        subscription: subscribe::Subscribe,
+    ) -> Result<PoolSubscription, Error> {
         let clients = self.clients.read().await;
 
         let wait_all = clients
@@ -85,8 +107,10 @@ impl Pool {
             .map(|sender| sender.subscribe(subscription.clone()))
             .collect::<Vec<_>>();
 
+        let subscription_id = subscription.subscription_id.clone();
+
         self.subscriptions.write().await.insert(
-            subscription.subscription_id.clone(),
+            subscription_id.clone(),
             (
                 subscription,
                 join_all(wait_all)
@@ -96,7 +120,10 @@ impl Pool {
             ),
         );
 
-        Ok(())
+        Ok(PoolSubscription {
+            subscription_id,
+            subscriptions: self.subscriptions.clone(),
+        })
     }
 
     /// Sends a request to all the connected relayers
@@ -144,3 +171,55 @@ impl Pool {
         }
     }
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use nostr_rs_memory::Memory;
+    use nostr_rs_relayer::Relayer;
+    use std::time::Duration;
+    use tokio::{net::TcpListener, task::JoinHandle, time::sleep};
+
+    async fn dummy_server() -> (Url, JoinHandle<()>) {
+        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+        let local_addr = listener.local_addr().expect("addr");
+
+        let relayer = Relayer::new(Some(Memory::default()), None).expect("valid dummy server");
+        let stopper = relayer.main(listener).expect("valid main loop");
+        (
+            Url::parse(&format!("ws://{}", local_addr.to_string())).expect("valid url"),
+            stopper,
+        )
+    }
+
+    #[tokio::test]
+    async fn droppable_subscription() {
+        let pool = Pool::default();
+        let subscription = pool
+            .subscribe(Default::default())
+            .await
+            .expect("valid subscription");
+
+        assert_eq!(pool.active_subscriptions().await, 1);
+        drop(subscription);
+        sleep(Duration::from_millis(10)).await;
+        assert_eq!(pool.active_subscriptions().await, 0);
+    }
+
+    #[tokio::test]
+    async fn connect_to_dummy_server() {
+        let (addr, stopper) = dummy_server().await;
+        let pool = Pool::new_with_clients(vec![addr]);
+
+        assert_eq!(0, pool.check_active_connections().await);
+
+        sleep(Duration::from_millis(1000)).await;
+        assert_eq!(1, pool.check_active_connections().await);
+
+        // stop dummy server
+        stopper.abort();
+
+        sleep(Duration::from_millis(100)).await;
+        assert_eq!(0, pool.check_active_connections().await);
+    }
+}

+ 9 - 13
crates/dump/src/main.rs

@@ -1,4 +1,4 @@
-use nostr_rs_client::{Error as ClientError, Pool};
+use nostr_rs_client::{Error as ClientError, Pool, Url};
 use nostr_rs_types::{client::Subscribe, Response};
 
 #[derive(Debug, thiserror::Error)]
@@ -13,18 +13,14 @@ pub enum Error {
 #[tokio::main]
 async fn main() {
     env_logger::init();
-    let mut clients = vec![
-        "wss://relay.damus.io/",
-        "wss://brb.io",
-        "wss://nos.lol",
-        "wss://relay.current.fyi",
-        "wss://eden.nostr.land",
-        "wss://relay.snort.social",
-    ]
-    .into_iter()
-    .fold(Pool::new(), |clients, host| {
-        clients.connect_to(host.parse().expect("valid url"))
-    });
+    let mut clients = Pool::new_with_clients(vec![
+        Url::parse("wss://relay.damus.io/").expect("valid url"),
+        Url::parse("wss://brb.io").expect("valid url"),
+        Url::parse("wss://nos.lol").expect("valid url"),
+        Url::parse("wss://relay.current.fyi").expect("valid url"),
+        Url::parse("wss://eden.nostr.land").expect("valid url"),
+        Url::parse("wss://relay.snort.social").expect("valid url"),
+    ]);
 
     let _ = clients.subscribe(Subscribe::default().into()).await;
 

+ 19 - 16
crates/relayer/src/connection.rs

@@ -1,8 +1,8 @@
-use crate::Error;
+use crate::{subscription::ActiveSubscription, Error};
 use futures_util::{SinkExt, StreamExt};
 use nostr_rs_types::{
     relayer::{Auth, ROk},
-    types::Addr,
+    types::{Addr, SubscriptionId},
     Request, Response,
 };
 use std::{
@@ -21,7 +21,7 @@ use tokio::{
 use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
-pub struct ConnectionId(pub(crate) usize);
+pub struct ConnectionId(usize);
 
 impl Default for ConnectionId {
     fn default() -> Self {
@@ -52,10 +52,9 @@ impl ConnectionId {
 /// The new connection struct. This struct spawn's a new worker that handles
 /// upcoming messages form the client
 pub struct Connection {
-    #[allow(unused)]
-    pub(crate) conn_id: ConnectionId,
+    conn_id: ConnectionId,
     sender: Sender<Response>,
-    subscriptions: RwLock<HashMap<String, ConnectionId>>,
+    subscriptions: RwLock<HashMap<SubscriptionId, Vec<ActiveSubscription>>>,
     handler: Option<JoinHandle<()>>,
 }
 
@@ -178,18 +177,22 @@ impl Connection {
             .map_err(|e| Error::TrySendError(Box::new(e)))
     }
 
-    /// Get subscription id for a given subscription id
-    pub async fn get_subscription_id(&self, id: &str) -> Option<ConnectionId> {
-        let subscriptions = self.subscriptions.read().await;
-        subscriptions.get(id).copied()
+    /// Get the sender for this connection
+    pub fn get_sender(&self) -> Sender<Response> {
+        self.sender.clone()
+    }
+
+    /// Get the connection id for this connection
+    pub fn get_conn_id(&self) -> ConnectionId {
+        self.conn_id
     }
 
     /// Create a subscription for this connection
-    pub async fn create_subscription(&self, id: String) -> (ConnectionId, Sender<Response>) {
-        let mut subscriptions = self.subscriptions.write().await;
-        let internal_id = subscriptions
-            .entry(id)
-            .or_insert_with(ConnectionId::default);
-        (*internal_id, self.sender.clone())
+    pub async fn keep_track_subscription(
+        &self,
+        id: SubscriptionId,
+        subscriptions: Vec<ActiveSubscription>,
+    ) {
+        self.subscriptions.write().await.insert(id, subscriptions);
     }
 }

+ 1 - 3
crates/relayer/src/lib.rs

@@ -15,6 +15,4 @@ mod error;
 mod relayer;
 mod subscription;
 
-pub use self::{
-    connection::Connection, error::Error, relayer::Relayer, subscription::Subscription,
-};
+pub use self::{connection::Connection, error::Error, relayer::Relayer};

+ 333 - 99
crates/relayer/src/relayer.rs

@@ -1,28 +1,18 @@
-use crate::{connection::ConnectionId, Connection, Error, Subscription};
+use crate::{connection::ConnectionId, subscription::SubscriptionManager, Connection, Error};
 use futures_util::StreamExt;
 use nostr_rs_client::{Error as ClientError, Pool};
 use nostr_rs_storage_base::Storage;
-use nostr_rs_types::{
-    relayer,
-    types::{Event, SubscriptionId},
-    Request, Response,
-};
-use std::{collections::HashMap, ops::Deref};
+use nostr_rs_types::{relayer, types::Event, Request, Response};
+use std::{collections::HashMap, ops::Deref, sync::Arc};
 use tokio::{
     net::{TcpListener, TcpStream},
-    sync::{
-        mpsc::{channel, Receiver, Sender},
-        RwLockReadGuard,
-    },
+    sync::mpsc::{channel, Receiver, Sender},
 };
 use tokio::{
     sync::{mpsc, RwLock},
     task::JoinHandle,
 };
 
-type SubId = ConnectionId;
-type Subscriptions = HashMap<SubId, (SubscriptionId, Sender<Response>)>;
-
 /// Relayer struct
 ///
 pub struct Relayer<T: Storage + Send + Sync + 'static> {
@@ -31,19 +21,8 @@ pub struct Relayer<T: Storage + Send + Sync + 'static> {
     /// relayer just a dumb proxy (that can be useful for privacy) but it won't
     /// be able to perform any optimization like prefetching content while offline
     storage: Option<T>,
-    /// Keeps a map between the internal subscription ID and the subscription
-    /// type. One subscription ID may have multiple subscription types.
-    ///
-    /// Each connection keeps a list of the subscription ID provided by the user
-    /// (String) and the internal, globally recognized subscription ID which is
-    /// internal (SubId)
-    subscriptions_ids_index: RwLock<HashMap<SubId, Vec<Subscription>>>,
-    /// Each subscription type that is active has a list of subscriptions.
-    ///
-    /// A single REQ can be subscribed to multiple subscription types, specially
-    /// when it is translated in OR filters. It is designed this way to allow a
-    /// fast iteration and match quickly filters.
-    subscriptions: RwLock<HashMap<Subscription, RwLock<Subscriptions>>>,
+    /// x
+    subscriptions: Arc<SubscriptionManager>,
     clients: RwLock<HashMap<ConnectionId, Connection>>,
     /// This Sender can be used to send requests from anywhere to the relayer.
     send_to_relayer: Sender<(ConnectionId, Request)>,
@@ -76,10 +55,9 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
         let (sender, receiver) = channel(100_000);
         Ok(Self {
             storage,
+            subscriptions: Default::default(),
             send_to_relayer: sender.clone(),
             relayer_receiver: Some(receiver),
-            subscriptions: Default::default(),
-            subscriptions_ids_index: Default::default(),
             clients: Default::default(),
             client_pool: if let Some(client_pool) = client_pool {
                 Some(Self::handle_client_pool(client_pool, sender)?)
@@ -89,6 +67,11 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
         })
     }
 
+    /// Total number of subscribers requests that actively listening for new events
+    pub fn total_subscribers(&self) -> usize {
+        self.subscriptions.total_subscribers()
+    }
+
     /// Splits the relayer object and extract their receiver.
     pub fn split(mut self) -> Result<(Self, Receiver<(ConnectionId, Request)>), Error> {
         let receiver = self.relayer_receiver.take().ok_or(Error::AlreadySplitted)?;
@@ -184,12 +167,13 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
         let client =
             Connection::new_connection(self.send_to_relayer.clone(), disconnection_notify, stream)
                 .await?;
-        let id = client.conn_id;
+        let id = client.get_conn_id();
         self.clients.write().await.insert(id, client);
 
         Ok(id)
     }
 
+    /// Process a request from a connected client
     async fn process_request_from_client(
         &self,
         connection: &Connection,
@@ -217,37 +201,6 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                     let _ = client_pool.subscribe(request.filters.clone().into()).await;
                 }
 
-                // Create subscription
-                let (sub_id, receiver) = connection
-                    .create_subscription(request.subscription_id.deref().to_owned())
-                    .await;
-                let mut sub_index = self.subscriptions_ids_index.write().await;
-                let mut subscriptions = self.subscriptions.write().await;
-                if let Some(prev_subs) = sub_index.remove(&sub_id) {
-                    // remove any previous subscriptions
-                    for index in prev_subs.iter() {
-                        if let Some(subscriptions) = subscriptions.get_mut(index) {
-                            subscriptions.write().await.remove(&sub_id);
-                        }
-                    }
-                }
-
-                let mut sub_index_values = vec![];
-                for index in Subscription::from_filters(&request.filters).into_iter() {
-                    subscriptions
-                        .entry(index.clone())
-                        .or_insert_with(|| RwLock::new(HashMap::new()))
-                        .write()
-                        .await
-                        .insert(sub_id, (request.subscription_id.clone(), receiver.clone()));
-                    sub_index_values.push(index);
-                }
-
-                sub_index.insert(sub_id, sub_index_values);
-
-                drop(subscriptions);
-                drop(sub_index);
-
                 if let Some(storage) = self.storage.as_ref() {
                     // Sent all events that match the filter that are stored in our database
                     for filter in request.filters.clone().into_iter() {
@@ -267,19 +220,22 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
 
                 let _ = connection
                     .send(relayer::EndOfStoredEvents(request.subscription_id.clone()).into());
+
+                connection
+                    .keep_track_subscription(
+                        request.subscription_id.clone(),
+                        self.subscriptions
+                            .subscribe(
+                                connection.get_conn_id(),
+                                connection.get_sender(),
+                                request.clone(),
+                            )
+                            .await,
+                    )
+                    .await;
             }
-            Request::Close(close) => {
-                if let Some(id) = connection.get_subscription_id(&close.0).await {
-                    let mut subscriptions = self.subscriptions_ids_index.write().await;
-                    if let Some(indexes) = subscriptions.remove(&id) {
-                        let mut subscriptions = self.subscriptions.write().await;
-                        for index in indexes {
-                            if let Some(subscriptions) = subscriptions.get_mut(&index) {
-                                subscriptions.write().await.remove(&id);
-                            }
-                        }
-                    }
-                }
+            Request::Close(_close) => {
+                todo!()
             }
         };
 
@@ -287,43 +243,47 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     }
 
     #[inline]
-    fn broadcast_to_subscribers<'a>(
-        subscriptions: RwLockReadGuard<'a, Subscriptions>,
-        event: &Event,
-    ) {
-        for (_, receiver) in subscriptions.iter() {
-            let _ = receiver.1.try_send(
-                relayer::Event {
-                    subscription_id: receiver.0.clone(),
-                    event: event.clone(),
-                }
-                .into(),
-            );
-        }
-    }
-
-    #[inline]
     /// Broadcast a given event to all local subscribers
     pub async fn broadcast(&self, event: &Event) {
         if let Some(storage) = self.storage.as_ref() {
             let _ = storage.store(event).await;
         }
-        let subscriptions = self.subscriptions.read().await;
 
-        for subscription_type in Subscription::from_event(event) {
-            if let Some(subscribers) = subscriptions.get(&subscription_type) {
-                Self::broadcast_to_subscribers(subscribers.read().await, event);
-            }
-        }
+        self.subscriptions.broadcast(event.clone());
     }
 }
 
 #[cfg(test)]
 mod test {
+    use std::time::Duration;
+
     use super::*;
+    use futures::future::join_all;
     use nostr_rs_memory::Memory;
     use nostr_rs_types::Request;
     use serde_json::json;
+    use tokio::time::sleep;
+
+    fn get_note() -> Request {
+        serde_json::from_value(json!(
+            [
+                "EVENT",
+                {
+                    "kind":1,
+                    "content":"Pong",
+                    "tags":[
+                        ["e","9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42","","root"],
+                        ["e","2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a","","reply"],
+                        ["p","39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
+                        ["p","ee7202ad91459e013bfef263c59e47deb0163a5e7651b026673765488bfaf102"]
+                    ],
+                    "created_at":1681938616,
+                    "pubkey":"a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3",
+                    "id":"e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9",
+                    "sig":"9036150a6c8a32933cffcc42aec4d2109a22e9f10d1c3860c0435a925e6386babb7df5c95fcf68c8ed6a9726a1f07225af663d0b068eb555014130aad21674fc",
+                }
+        ])).expect("value")
+    }
 
     async fn get_db(prefill: bool) -> Memory {
         let db = Memory::default();
@@ -472,12 +432,110 @@ mod test {
 
     #[tokio::test]
     async fn server_listener_real_time() {
-        let request: Request = serde_json::from_str("[\"REQ\",\"1298169700973717\",{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"since\":1681939304},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[1,3,6,7,9735],\"since\":1681939304},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"#e\":[\"2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a\",\"a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1\"],\"kinds\":[1,6,7,9735]}]").expect("valid object");
+        let request: Request = serde_json::from_value(json!(
+            [
+                "REQ",
+                "1298169700973717",
+                {
+                    "authors": ["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
+                    "since":1681939304
+                },
+                {
+                    "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
+                    "kinds":[1,3,6,7,9735],
+                    "since":1681939304
+                },
+                {
+                    "#p":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
+                    "kinds":[4]
+                },
+                {
+                    "authors":["39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],
+                    "kinds":[4]
+                },
+                {
+                    "#e":[
+                        "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
+                        "a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1"
+                    ],
+                    "kinds":[1,6,7,9735]
+                }
+        ]))
+        .expect("valid object");
+        let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
+        let (connection, mut recv) = Connection::new_for_test();
+
+        assert_eq!(relayer.total_subscribers(), 0);
+        let _ = relayer
+            .process_request_from_client(&connection, request)
+            .await;
+
+        assert_eq!(relayer.total_subscribers(), 5);
+
+        // eod
+        assert!(recv
+            .try_recv()
+            .expect("valid")
+            .as_end_of_stored_events()
+            .is_some());
+
+        // It is empty
+        assert!(recv.try_recv().is_err());
+
+        relayer
+            .process_request_from_client(&connection, get_note())
+            .await
+            .expect("process event");
+
+        sleep(Duration::from_millis(100)).await;
+
+        // It is not empty
+        let msg = recv.try_recv();
+        assert!(msg.is_ok());
+        assert_eq!(
+            msg.expect("is ok")
+                .as_event()
+                .expect("valid")
+                .subscription_id
+                .to_string(),
+            "1298169700973717".to_owned()
+        );
+
+        // it must be deliverd at most once
+        assert!(recv.try_recv().is_err());
+        assert_eq!(relayer.total_subscribers(), 5);
+
+        // when client is dropped, the subscription is removed
+        // automatically
+        drop(connection);
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(relayer.total_subscribers(), 0);
+    }
+
+    #[tokio::test]
+    async fn subscribe_partial_key() {
+        let request: Request = serde_json::from_value(json!([
+            "REQ",
+            "1298169700973717",
+            {
+                "authors":["a42007e33c"],
+                "since":1681939304
+            }
+        ]))
+        .expect("valid object");
+
         let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
         let (connection, mut recv) = Connection::new_for_test();
+
+        assert_eq!(relayer.total_subscribers(), 0);
         let _ = relayer
             .process_request_from_client(&connection, request)
             .await;
+
+        assert_eq!(relayer.total_subscribers(), 1);
+
         // eod
         assert!(recv
             .try_recv()
@@ -488,14 +546,190 @@ mod test {
         // It is empty
         assert!(recv.try_recv().is_err());
 
-        let new_event: Request = serde_json::from_str(r#"["EVENT", {"kind":1,"content":"Pong","tags":[["e","9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42","","root"],["e","2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a","","reply"],["p","39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],["p","ee7202ad91459e013bfef263c59e47deb0163a5e7651b026673765488bfaf102"]],"created_at":1681938616,"pubkey":"a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3","id":"e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9","sig":"9036150a6c8a32933cffcc42aec4d2109a22e9f10d1c3860c0435a925e6386babb7df5c95fcf68c8ed6a9726a1f07225af663d0b068eb555014130aad21674fc","meta":{"revision":0,"created":1681939266488,"version":0},"$loki":108}]"#).expect("value");
+        relayer
+            .process_request_from_client(&connection, get_note())
+            .await
+            .expect("process event");
+
+        sleep(Duration::from_millis(100)).await;
+
+        // It is not empty
+        let msg = recv.try_recv();
+        assert!(msg.is_ok());
+        assert_eq!(
+            msg.expect("is ok")
+                .as_event()
+                .expect("valid")
+                .subscription_id
+                .to_string(),
+            "1298169700973717".to_owned()
+        );
+
+        // it must be deliverd at most once
+        assert!(recv.try_recv().is_err());
+        assert_eq!(relayer.total_subscribers(), 1);
+
+        // when client is dropped, the subscription is removed
+        // automatically
+        drop(connection);
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(relayer.total_subscribers(), 0);
+    }
+
+    #[tokio::test]
+    async fn multiple_subcribers() {
+        let req1: Request = serde_json::from_value(json!(["REQ", "1298169700973717", {
+            "authors":["c42007e33c"],
+        }]))
+        .expect("valid object");
+        let req2: Request = serde_json::from_value(json!(["REQ", "1298169700973717", {
+           "authors":["a42007e33c"]
+        }]))
+        .expect("valid object");
+
+        let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
+        let (publisher, _) = Connection::new_for_test();
+
+        let mut set1 = (0..1000)
+            .map(|_| Connection::new_for_test())
+            .collect::<Vec<_>>();
+
+        let mut set2 = (0..100)
+            .map(|_| Connection::new_for_test())
+            .collect::<Vec<_>>();
+
+        let subscribe1 = set1
+            .iter()
+            .map(|(connection, _)| relayer.process_request_from_client(connection, req1.clone()))
+            .collect::<Vec<_>>();
+
+        let subscribe2 = set2
+            .iter()
+            .map(|(connection, _)| relayer.process_request_from_client(connection, req2.clone()))
+            .collect::<Vec<_>>();
+
+        assert_eq!(relayer.total_subscribers(), 0);
+
+        join_all(subscribe1)
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid calls");
+        join_all(subscribe2)
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid calls");
+
+        for (_, recv) in set1.iter_mut() {
+            assert!(recv
+                .try_recv()
+                .expect("end of stored events")
+                .as_end_of_stored_events()
+                .is_some());
+        }
+
+        for (_, recv) in set2.iter_mut() {
+            assert!(recv
+                .try_recv()
+                .expect("end of stored events")
+                .as_end_of_stored_events()
+                .is_some());
+        }
+
+        assert_eq!(relayer.total_subscribers(), 1100);
+
+        relayer
+            .process_request_from_client(&publisher, get_note())
+            .await
+            .expect("process event");
+
+        sleep(Duration::from_millis(10)).await;
+
+        for (_, recv) in set1.iter_mut() {
+            assert!(recv.try_recv().is_err());
+        }
+
+        for (_, recv) in set2.iter_mut() {
+            let msg = recv.try_recv();
+            assert!(msg.is_ok());
+            let msg = msg.expect("msg");
+
+            assert_eq!(
+                msg.as_event().expect("valid").subscription_id.to_string(),
+                "1298169700973717".to_owned()
+            );
+
+            assert!(recv.try_recv().is_err());
+        }
+
+        drop(set1);
+        sleep(Duration::from_millis(10)).await;
+        assert_eq!(relayer.total_subscribers(), 100);
+
+        drop(set2);
+        sleep(Duration::from_millis(10)).await;
+        assert_eq!(relayer.total_subscribers(), 0);
+
+        drop(relayer);
+    }
+
+    #[tokio::test]
+    async fn subscribe_to_all() {
+        let request: Request =
+            serde_json::from_value(json!(["REQ", "1298169700973717", {}])).expect("valid object");
+
+        let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
+        let (connection, mut recv) = Connection::new_for_test();
+
+        assert_eq!(relayer.total_subscribers(), 0);
+        let _ = relayer
+            .process_request_from_client(&connection, request)
+            .await;
+
+        assert_eq!(relayer.total_subscribers(), 1);
+
+        // eod
+        assert!(recv
+            .try_recv()
+            .expect("valid")
+            .as_end_of_stored_events()
+            .is_some());
+
+        // It is empty
+        assert!(recv.try_recv().is_err());
 
         relayer
-            .process_request_from_client(&connection, new_event)
+            .process_request_from_client(&connection, get_note())
             .await
             .expect("process event");
 
+        sleep(Duration::from_millis(10)).await;
+
         // It is not empty
-        assert!(recv.try_recv().is_ok());
+        let msg = recv.try_recv();
+        assert!(msg.is_ok());
+        assert_eq!(
+            msg.expect("is ok")
+                .as_event()
+                .expect("valid")
+                .subscription_id
+                .to_string(),
+            "1298169700973717".to_owned()
+        );
+
+        // it must be deliverd at most once
+        assert!(recv.try_recv().is_err());
+        assert_eq!(relayer.total_subscribers(), 1);
+
+        // when client is dropped, the subscription is removed
+        // automatically
+        drop(connection);
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(relayer.total_subscribers(), 0);
     }
 }

+ 98 - 0
crates/relayer/src/subscription/filter.rs

@@ -1,6 +1,24 @@
 use nostr_rs_types::types::{Event, Filter, Kind, Tag};
 use std::collections::BTreeSet;
 
+/// The subscription keys are used to quickly identify the subscriptions
+/// by one or more fields. Think of it like a database index
+#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Hash)]
+pub(crate) enum Key {
+    /// Key for the author field
+    Author(Vec<u8>, Option<Kind>),
+    /// Key for the reference to an event
+    RefId(Vec<u8>, Option<Kind>),
+    /// Key for the reference to a public key
+    RefPublicKey(Vec<u8>, Option<Kind>),
+    /// Key for the kind field
+    Id(Vec<u8>),
+    /// Key for the kind field
+    Kind(Kind),
+    /// Any value, for a catch all
+    AllUpdates,
+}
+
 type SortedSet<T> = Option<BTreeSet<T>>;
 
 /// Sorted filter
@@ -55,6 +73,86 @@ impl From<Filter> for SortedFilter {
 }
 
 impl SortedFilter {
+    /// Get the keys for the filter
+    ///
+    /// Get the keys or indexes for the filter. This is used to quickly identify potential matches
+    /// from a set of event listeners.
+    pub fn keys(&self) -> Vec<Key> {
+        let authors = self
+            .authors
+            .as_ref()
+            .map_or_else(|| vec![], |x| x.iter().map(|x| x.clone()).collect());
+
+        let ids = self
+            .ids
+            .as_ref()
+            .map_or_else(|| vec![], |x| x.iter().map(|x| x.clone()).collect());
+
+        let references_to_event = self
+            .references_to_event
+            .as_ref()
+            .map_or_else(|| vec![], |x| x.iter().map(|x| x.clone()).collect());
+
+        let references_to_public_key = self
+            .references_to_public_key
+            .as_ref()
+            .map_or_else(|| vec![], |x| x.iter().map(|x| x.clone()).collect());
+
+        let kinds = self
+            .kinds
+            .as_ref()
+            .map_or_else(|| vec![], |x| x.iter().map(|x| *x).collect());
+
+        let kind_option = if kinds.is_empty() {
+            vec![None]
+        } else {
+            kinds.clone().into_iter().map(Some).collect()
+        };
+
+        let keys = vec![
+            authors
+                .into_iter()
+                .map(|author| {
+                    kind_option
+                        .iter()
+                        .map(|kind| Key::Author(author.clone(), *kind))
+                        .collect::<Vec<_>>()
+                })
+                .collect::<Vec<_>>(),
+            references_to_event
+                .into_iter()
+                .map(|event| {
+                    kind_option
+                        .iter()
+                        .map(|kind| Key::RefId(event.clone(), *kind))
+                        .collect::<Vec<_>>()
+                })
+                .collect::<Vec<_>>(),
+            references_to_public_key
+                .into_iter()
+                .map(|pub_key| {
+                    kind_option
+                        .iter()
+                        .map(|kind| Key::RefPublicKey(pub_key.clone(), *kind))
+                        .collect::<Vec<_>>()
+                })
+                .collect::<Vec<_>>(),
+            vec![kinds
+                .into_iter()
+                .map(|kind| Key::Kind(kind))
+                .collect::<Vec<_>>()],
+            vec![ids.into_iter().map(|id| Key::Id(id)).collect::<Vec<_>>()],
+        ]
+        .concat()
+        .concat();
+
+        if keys.is_empty() {
+            vec![Key::AllUpdates]
+        } else {
+            keys
+        }
+    }
+
     /// Checks if a given key exists in the sorted set, either as a whole or as a partial match
     #[inline]
     fn has_key_or_partial_match<T: AsRef<[u8]>>(id: T, ids: &SortedSet<Vec<u8>>) -> bool {

+ 158 - 36
crates/relayer/src/subscription/manager.rs

@@ -1,32 +1,53 @@
-use crate::{
-    connection::ConnectionId,
-    subscription::{Key, Subscription},
-};
-use futures::executor::block_on;
+use super::filter::{Key, SortedFilter};
+use crate::connection::ConnectionId;
 use nostr_rs_types::{
-    relayer,
+    client::Subscribe,
     types::{Event, SubscriptionId},
     Response,
 };
-use std::{collections::BTreeMap, sync::Arc};
+use std::{
+    collections::{BTreeMap, BTreeSet},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+};
 use tokio::sync::{mpsc::Sender, RwLock};
 
 type SubIdx = (Key, ConnectionId, SubscriptionId);
 
+pub const MIN_PREFIX_MATCH_LEN: usize = 4;
+
 /// Subscription for a connection
 ///
 /// This object is responsible for keeping track of a subscription for a connection
 ///
 /// When dropped their listener will be removed from the subscription manager automatically
-pub struct SubscriptionForConnection {
+#[derive(Clone, Debug)]
+pub struct ActiveSubscription {
     conn_id: ConnectionId,
     name: SubscriptionId,
     keys: Vec<Key>,
-    subscribe: Subscription,
     manager: Arc<SubscriptionManager>,
 }
 
-impl Drop for SubscriptionForConnection {
+impl ActiveSubscription {
+    fn new(
+        conn_id: ConnectionId,
+        name: SubscriptionId,
+        keys: Vec<Key>,
+        manager: Arc<SubscriptionManager>,
+    ) -> Self {
+        Self {
+            conn_id,
+            name,
+            keys,
+            manager,
+        }
+    }
+}
+
+impl Drop for ActiveSubscription {
     /// When the subscription is dropped, it will remove the listener from the
     /// subscription manager
     fn drop(&mut self) {
@@ -36,8 +57,10 @@ impl Drop for SubscriptionForConnection {
             .map(|key| (key, self.conn_id, self.name.clone()))
             .collect::<Vec<_>>();
 
-        block_on(async {
-            self.manager.clone().unsubscribe(keys).await;
+        let manager = self.manager.clone();
+
+        tokio::spawn(async move {
+            manager.unsubscribe(keys).await;
         });
     }
 }
@@ -46,59 +69,158 @@ impl Drop for SubscriptionForConnection {
 ///
 /// This object is responsible for letting clients and processes subscribe to
 /// events,
-#[derive(Default)]
+#[derive(Debug)]
 pub struct SubscriptionManager {
-    subscriptions: Arc<RwLock<BTreeMap<SubIdx, Sender<Response>>>>,
+    /// List of subscriptions with their filters and their index.
+    ///
+    /// A single request may be converted to multiple subscriptions entry as
+    /// they are sorted by their index/key
+    subscriptions: RwLock<BTreeMap<SubIdx, (Arc<SortedFilter>, Sender<Response>)>>,
+    /// Total number of subscribers
+    /// A single REQ may have multiple subscriptions
+    total_subscribers: AtomicUsize,
+    /// Minimum prefix match length
+    min_prefix_match_len: usize,
+}
+
+impl Default for SubscriptionManager {
+    fn default() -> Self {
+        Self {
+            subscriptions: Default::default(),
+            total_subscribers: Default::default(),
+            min_prefix_match_len: MIN_PREFIX_MATCH_LEN,
+        }
+    }
 }
 
 impl SubscriptionManager {
-    pub async fn unsubscribe(&self, keys: Vec<SubIdx>) {
+    async fn unsubscribe(self: Arc<Self>, keys: Vec<SubIdx>) {
+        println!("block");
         let mut subscriptions = self.subscriptions.write().await;
+        println!("\tblocked");
         for sub in keys {
             subscriptions.remove(&sub);
         }
+        self.total_subscribers.fetch_sub(1, Ordering::Relaxed);
+        println!("released");
     }
 
-    fn get_keys_from_event(event: &Event) -> Vec<Key> {
-        let mut subscriptions = vec![Key::Kind(event.kind().into())];
+    fn get_keys_from_event(event: &Event, min_prefix_match_len: usize) -> Vec<Key> {
+        let mut subscriptions = vec![];
 
         let author = event.author().as_ref().to_vec();
         let id = event.id.as_ref().to_vec();
 
-        for i in 4..author.len() {
-            subscriptions.push(Key::Author(author[..author.len() - i].to_vec()));
+        let len = author.len();
+        for i in min_prefix_match_len..len - min_prefix_match_len {
+            subscriptions.push(Key::Author(author[..len - i].to_vec(), None));
+            subscriptions.push(Key::Author(author[..len - i].to_vec(), Some(event.kind())));
         }
-        for i in 4..id.len() {
-            subscriptions.push(Key::Id(author[..author.len() - i].to_vec()));
+
+        for t in event.tags() {
+            match t {
+                nostr_rs_types::types::Tag::Event(ref_event) => {
+                    let len = ref_event.id.len();
+                    for i in min_prefix_match_len..ref_event.id.len() - min_prefix_match_len {
+                        subscriptions.push(Key::RefId(ref_event.id[..len - i].to_vec(), None));
+                        subscriptions.push(Key::RefId(
+                            ref_event.id[..len - i].to_vec(),
+                            Some(event.kind()),
+                        ));
+                    }
+                }
+                nostr_rs_types::types::Tag::PubKey(ref_pub_key) => {
+                    let len = ref_pub_key.id.len();
+                    for i in min_prefix_match_len..len - min_prefix_match_len {
+                        subscriptions.push(Key::RefId(ref_pub_key.id[..len - i].to_vec(), None));
+                        subscriptions.push(Key::RefId(
+                            ref_pub_key.id[..len - i].to_vec(),
+                            Some(event.kind()),
+                        ));
+                    }
+                }
+                _ => {}
+            }
+        }
+
+        let len = id.len();
+        for i in min_prefix_match_len..len - min_prefix_match_len {
+            subscriptions.push(Key::Id(id[..len - i].to_vec()));
         }
 
+        subscriptions.push(Key::Kind(event.kind().into()));
+        subscriptions.push(Key::AllUpdates);
+
+        subscriptions
+    }
+
+    /// Get the number of subscribers
+    pub fn total_subscribers(self: &Arc<Self>) -> usize {
+        self.total_subscribers.load(Ordering::Relaxed)
+    }
+
+    /// Subscribe to a future events
+    ///
+    /// This will add a new subscription to the subscription manager with a
+    /// given conn_id, sender and a vector of filters.
+    pub async fn subscribe(
+        self: &Arc<Self>,
+        conn_id: ConnectionId,
+        sender: Sender<Response>,
+        request: Subscribe,
+    ) -> Vec<ActiveSubscription> {
+        let name = request.subscription_id;
+        let mut subscriptions = self.subscriptions.write().await;
+        let subscriptions = request
+            .filters
+            .into_iter()
+            .map(|filter| {
+                let filter = Arc::new(SortedFilter::from(filter));
+                let subscription =
+                    ActiveSubscription::new(conn_id, name.clone(), filter.keys(), self.clone());
+                for key in subscription.keys.iter() {
+                    subscriptions.insert(
+                        (key.clone(), conn_id, name.clone()),
+                        (filter.clone(), sender.clone()),
+                    );
+                }
+                subscription
+            })
+            .collect::<Vec<_>>();
+        self.total_subscribers
+            .fetch_add(subscriptions.len(), Ordering::Relaxed);
         subscriptions
     }
 
-    pub fn publish(&self, event: Event) {
-        let subscriptions = self.subscriptions.clone();
+    /// Publish an event to all subscribers
+    pub fn broadcast(self: &Arc<Self>, event: Event) {
+        let this = self.clone();
         tokio::spawn(async move {
-            let subscriptions = subscriptions.read().await;
-            let subs = Self::get_keys_from_event(&event);
+            let subscriptions = this.subscriptions.read().await;
+            let subs = Self::get_keys_from_event(&event, this.min_prefix_match_len);
+            let mut deliverded = BTreeSet::new();
 
             for sub in subs {
-                for ((sub_type, _, sub_id), sender) in
-                    subscriptions.range(&(sub.clone(), ConnectionId(0), SubscriptionId::empty())..)
-                {
+                for ((sub_type, client, name), (filter, sender)) in subscriptions.range(
+                    &(
+                        sub.clone(),
+                        ConnectionId::new_empty(),
+                        SubscriptionId::empty(),
+                    )..,
+                ) {
                     if sub_type != &sub {
                         break;
                     }
 
-                    let _ = sender
-                        .send(Response::Event(relayer::Event {
-                            subscription_id: sub_id.clone(),
-                            event: event.clone(),
-                        }))
-                        .await;
+                    if deliverded.contains(client) || !filter.check(&event) {
+                        continue;
+                    }
+                    println!("send");
+
+                    let _ = sender.try_send(Response::Event((name, &event).into()));
+                    deliverded.insert(client.clone());
                 }
             }
-
-            todo!()
         });
     }
 }

+ 1 - 236
crates/relayer/src/subscription/mod.rs

@@ -1,239 +1,4 @@
-use nostr_rs_types::types::{Event, Filter, Tag};
-
 mod filter;
 mod manager;
 
-/// The subscription keys are used to quickly identify the subscriptions
-/// by one or more fields. Think of it like a database index
-#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Hash)]
-pub enum Key {
-    Author(Vec<u8>),
-    RefPublicKey(Vec<u8>),
-    RefId(Vec<u8>),
-    Id(Vec<u8>),
-    Kind(u32),
-}
-
-#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Ord, Hash)]
-/// Client subscription
-pub struct Subscription {
-    author: Option<Vec<u8>>,
-    ref_public_key: Option<Vec<u8>>,
-    ref_id: Option<Vec<u8>>,
-    id: Option<Vec<u8>>,
-    kind: Option<u32>,
-}
-
-impl Subscription {
-    #[inline]
-    /// Creates a vector of subscriptions from a given slice of filters
-    pub fn from_filters(filters: &[Filter]) -> Vec<Subscription> {
-        let mut subs = vec![];
-        filters.iter().for_each(|filter| {
-            let authors: Vec<Option<Vec<u8>>> = if filter.authors.is_empty() {
-                vec![None]
-            } else {
-                filter
-                    .authors
-                    .iter()
-                    .map(|author| Some(author.to_vec()))
-                    .collect()
-            };
-            let ref_public_keys = if filter.references_to_public_key.is_empty() {
-                vec![None]
-            } else {
-                filter
-                    .references_to_public_key
-                    .iter()
-                    .map(|public_key| Some((*public_key).to_vec()))
-                    .collect()
-            };
-            let ref_ids = if filter.references_to_event.is_empty() {
-                vec![None]
-            } else {
-                filter
-                    .references_to_event
-                    .iter()
-                    .map(|id| Some((*id).to_vec()))
-                    .collect()
-            };
-            let kind = if filter.kinds.is_empty() {
-                vec![None]
-            } else {
-                filter
-                    .kinds
-                    .iter()
-                    .map(|kind| Some((*kind).into()))
-                    .collect()
-            };
-
-            let ids = if filter.ids.is_empty() {
-                vec![None]
-            } else {
-                filter.ids.iter().map(|id| Some((*id).to_vec())).collect()
-            };
-
-            for author in authors.iter() {
-                for id in ids.iter() {
-                    for ref_public_key in ref_public_keys.iter() {
-                        for ref_id in ref_ids.iter() {
-                            for kind in kind.iter() {
-                                subs.push(Subscription {
-                                    id: id.clone(),
-                                    ref_public_key: ref_public_key.clone(),
-                                    author: author.clone(),
-                                    ref_id: ref_id.clone(),
-                                    kind: *kind,
-                                });
-                            }
-                        }
-                    }
-                }
-            }
-        });
-
-        subs
-    }
-
-    #[inline]
-    /// Returns the a list of subscriptions that matches the given event
-    pub fn from_event(event: &Event) -> Vec<Subscription> {
-        let kind = event.kind().into();
-        let public_keys = vec![None, Some(event.author().as_ref().to_vec())];
-        let id = vec![None, Some(event.id.as_ref().to_vec())];
-        let kind = [None, Some(kind)];
-        let mut ref_public_keys = vec![None];
-        let mut ref_ids = vec![None];
-
-        event.tags().iter().for_each(|tag| match tag {
-            Tag::Event(x) => {
-                ref_ids.push(Some(x.id.as_ref().to_vec()));
-            }
-            Tag::PubKey(x) => {
-                ref_public_keys.push(Some(x.id.as_ref().to_vec()));
-            }
-            _ => {}
-        });
-
-        let mut subs = vec![];
-
-        for ref_public_key in ref_public_keys.iter() {
-            for ref_id in ref_ids.iter() {
-                for public_key in public_keys.iter() {
-                    for id in id.iter() {
-                        for kind in kind.iter() {
-                            subs.push(Subscription {
-                                ref_id: ref_id.clone(),
-                                ref_public_key: ref_public_key.clone(),
-                                author: public_key.clone(),
-                                id: id.clone(),
-                                kind: *kind,
-                            });
-                        }
-                    }
-                }
-            }
-        }
-
-        subs
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use super::*;
-    use nostr_rs_types::{types::Addr, Request};
-
-    #[test]
-    fn test_no_listen_to_all() {
-        let request: Request = serde_json::from_str("[\"REQ\",\"6440d5279e350\",{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"since\":1681967101},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[1,3,6,7,9735],\"since\":1681967101},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"#e\":[\"2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a\",\"a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1\"],\"kinds\":[1,6,7,9735]}]").expect("valid");
-        let mut subscriptions =
-            Subscription::from_filters(&request.as_request().expect("req").filters);
-        subscriptions.sort();
-
-        assert!(subscriptions
-            .binary_search(&Subscription::default())
-            .is_err())
-    }
-
-    #[test]
-    fn from_filters() {
-        let request: Request = serde_json::from_str("[\"REQ\",\"1298169700973717\",{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"since\":1681939304},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[1,3,6,7,9735],\"since\":1681939304},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"#e\":[\"2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a\",\"a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1\"],\"kinds\":[1,6,7,9735]}]").expect("valid object");
-        let mut subscriptions =
-            Subscription::from_filters(&request.as_request().expect("req").filters);
-        subscriptions.sort();
-
-        assert!(subscriptions
-            .binary_search(&Subscription::default())
-            .is_err())
-    }
-
-    #[test]
-    fn from_event() {
-        let new_event: Request = serde_json::from_str(r#"["EVENT", {"kind":1,"content":"Pong","tags":[["e","9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42","","root"],["e","2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a","","reply"],["p","39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],["p","ee7202ad91459e013bfef263c59e47deb0163a5e7651b026673765488bfaf102"]],"created_at":1681938616,"pubkey":"a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3","id":"e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9","sig":"9036150a6c8a32933cffcc42aec4d2109a22e9f10d1c3860c0435a925e6386babb7df5c95fcf68c8ed6a9726a1f07225af663d0b068eb555014130aad21674fc","meta":{"revision":0,"created":1681939266488,"version":0},"$loki":108}]"#).expect("value");
-        let mut subscriptions = Subscription::from_event(&new_event.as_event().expect("event"));
-        subscriptions.sort();
-
-        let pk: Addr = "39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"
-            .try_into()
-            .expect("id");
-
-        let id: Addr = "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9"
-            .try_into()
-            .expect("id");
-
-        let ref_id: Addr = "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a"
-            .try_into()
-            .expect("id");
-
-        let author: Addr = "a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3"
-            .try_into()
-            .expect("id");
-
-        let expected = vec![
-            Subscription {
-                ref_public_key: Some(pk.as_ref().to_vec()),
-                ..Subscription::default()
-            },
-            Subscription {
-                id: Some(id.as_ref().to_vec()),
-                ref_public_key: Some(pk.as_ref().to_vec()),
-                ..Subscription::default()
-            },
-            Subscription {
-                id: Some(id.as_ref().to_vec()),
-                ref_id: Some(ref_id.as_ref().to_vec()),
-                ref_public_key: Some(pk.as_ref().to_vec()),
-                ..Subscription::default()
-            },
-            Subscription {
-                author: Some(author.as_ref().to_vec()),
-                kind: Some(1),
-                ..Subscription::default()
-            },
-            Subscription {
-                author: Some(author.as_ref().to_vec()),
-                ..Subscription::default()
-            },
-            Subscription {
-                id: Some(id.as_ref().to_vec()),
-                ref_id: Some(ref_id.as_ref().to_vec()),
-                author: Some(author.as_ref().to_vec()),
-                ..Subscription::default()
-            },
-        ];
-
-        assert_eq!(subscriptions.len(), 72);
-        expected.iter().enumerate().for_each(|(i, sub)| {
-            assert!(
-                subscriptions.binary_search(sub).is_ok(),
-                "{} -> {:?}",
-                i,
-                sub
-            );
-        });
-        assert!(subscriptions
-            .binary_search(&Subscription::default())
-            .is_ok());
-    }
-}
+pub use self::manager::{ActiveSubscription, SubscriptionManager};

+ 9 - 0
crates/types/src/relayer/event.rs

@@ -15,6 +15,15 @@ pub struct Event {
     pub event: types::Event,
 }
 
+impl From<(&SubscriptionId, &types::Event)> for Event {
+    fn from((subscription_id, event): (&SubscriptionId, &types::Event)) -> Self {
+        Self {
+            subscription_id: subscription_id.clone(),
+            event: event.clone(),
+        }
+    }
+}
+
 impl SerializeDeserialize for Event {
     fn get_tag() -> &'static str {
         "EVENT"

+ 1 - 1
crates/types/src/types/kind.rs

@@ -13,7 +13,7 @@ use serde::{
 /// Any unsupported Kind will be wrapped under the Unknown type
 ///
 /// The Kind is represented as a u32 on the wire
-#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
 pub enum Kind {
     /// Metadata
     ///