Просмотр исходного кода

Working on a generic subscription manager crate

Cesar Rodas 2 месяцев назад
Родитель
Сommit
ce1ac6f167

+ 25 - 8
Cargo.lock

@@ -983,6 +983,7 @@ dependencies = [
  "nostr-rs-memory",
  "nostr-rs-relayer",
  "nostr-rs-storage-base",
+ "nostr-rs-subscription-manager",
  "nostr-rs-types",
  "serde_json",
  "thiserror",
@@ -1016,6 +1017,7 @@ dependencies = [
  "async-trait",
  "futures",
  "nostr-rs-storage-base",
+ "nostr-rs-subscription-manager",
  "nostr-rs-types",
  "tokio",
 ]
@@ -1047,6 +1049,7 @@ dependencies = [
  "nostr-rs-client",
  "nostr-rs-memory",
  "nostr-rs-storage-base",
+ "nostr-rs-subscription-manager",
  "nostr-rs-types",
  "serde_json",
  "thiserror",
@@ -1062,6 +1065,7 @@ dependencies = [
  "chrono",
  "futures",
  "nostr-rs-storage-base",
+ "nostr-rs-subscription-manager",
  "nostr-rs-types",
  "rocksdb",
  "serde_json",
@@ -1074,6 +1078,7 @@ dependencies = [
  "async-trait",
  "chrono",
  "futures",
+ "nostr-rs-subscription-manager",
  "nostr-rs-types",
  "rand",
  "serde_json",
@@ -1082,6 +1087,17 @@ dependencies = [
 ]
 
 [[package]]
+name = "nostr-rs-subscription-manager"
+version = "0.1.0"
+dependencies = [
+ "chrono",
+ "nostr-rs-types",
+ "serde",
+ "serde_json",
+ "tokio",
+]
+
+[[package]]
 name = "nostr-rs-types"
 version = "0.1.0"
 dependencies = [
@@ -1431,18 +1447,18 @@ dependencies = [
 
 [[package]]
 name = "serde"
-version = "1.0.183"
+version = "1.0.209"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c"
+checksum = "99fce0ffe7310761ca6bf9faf5115afbc19688edd00171d81b1bb1b116c63e09"
 dependencies = [
  "serde_derive",
 ]
 
 [[package]]
 name = "serde_derive"
-version = "1.0.183"
+version = "1.0.209"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816"
+checksum = "a5831b979fd7b5439637af1752d535ff49f4860c0f341d1baeb6faf0f4242170"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -1451,11 +1467,12 @@ dependencies = [
 
 [[package]]
 name = "serde_json"
-version = "1.0.105"
+version = "1.0.127"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360"
+checksum = "8043c06d9f82bd7271361ed64f415fe5e12a77fdb52e573e7f06a516dea329ad"
 dependencies = [
  "itoa",
+ "memchr",
  "ryu",
  "serde",
 ]
@@ -1615,9 +1632,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.39.2"
+version = "1.40.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1"
+checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998"
 dependencies = [
  "backtrace",
  "bytes",

+ 1 - 1
Cargo.toml

@@ -9,7 +9,7 @@ members = [
     "crates/client",
     "crates/relayer",
     "crates/storage/base",
-    "crates/storage/rocksdb", "crates/dump", "crates/storage/memory", "crates/personal-relayer",
+    "crates/storage/rocksdb", "crates/dump", "crates/storage/memory", "crates/personal-relayer", "crates/subscription-manager",
 ]
 
 [dependencies]

+ 1 - 0
crates/client/Cargo.toml

@@ -7,6 +7,7 @@ edition = "2021"
 thiserror = "1.0.40"
 nostr-rs-types = { path = "../types" }
 nostr-rs-storage-base = { path = "../storage/base" }
+nostr-rs-subscription-manager = { path = "../subscription-manager" }
 tokio = { version = "1.26.0", features = ["sync", "macros", "rt", "time"] }
 tokio-tungstenite = { version = "0.18.0", features = [
     "rustls",

+ 78 - 109
crates/client/src/pool/subscription.rs

@@ -1,28 +1,29 @@
 //! Subscription manager
-use super::AllClients;
 use crate::{client, Error};
 use futures::future::join_all;
-use nostr_rs_storage_base::Index;
+use nostr_rs_subscription_manager::{
+    ActiveSubscription as ActiveSubscriptionInner, SubscriptionManager,
+};
 use nostr_rs_types::{
     client::subscribe::{self, is_all_events},
     relayer,
-    types::{event::SortedFilter, Filter, Index as EventIndex, SubscriptionId},
+    types::SubscriptionId,
     Response,
 };
-use std::{
-    collections::{BTreeMap, HashSet},
-    sync::{
-        atomic::{AtomicUsize, Ordering},
-        Arc,
-    },
+use std::sync::{
+    atomic::{AtomicUsize, Ordering},
+    Arc,
 };
-use tokio::sync::{mpsc, RwLock};
+use tokio::sync::{mpsc, Mutex};
 use url::Url;
 
-#[derive(Debug, Copy, Eq, PartialEq, Clone)]
+use super::AllClients;
+
+#[derive(Debug, Copy, Default, Eq, PartialEq, Clone)]
 /// Subscription status
 pub enum Status {
     /// Subscription is awaiting to be subscribed
+    #[default]
     Queued,
     /// Subscribed is active
     Subscribed,
@@ -31,19 +32,14 @@ pub enum Status {
     Stale,
 }
 
-#[allow(dead_code)]
-struct Subscription {
-    /// Request to subscribe
-    subscription_request: subscribe::Subscribe,
+#[derive(Debug, Default)]
+struct SubscriptionInner {
     /// Active subscription (in the client side), when this is Drop all clients unsubscribes
     active_subscription: Option<Vec<client::ActiveSubscription>>,
     /// Subscription status
-    status: Status,
-    /// Reverse index
-    ///
-    /// This is a reverse index of the filters, it is only used to update the
-    /// main shared index when this subscription is dropped.
-    reverse_index: Vec<(Filter, Vec<EventIndex>)>,
+    status: Arc<Mutex<Status>>,
+    /// raw request
+    subscription_request: subscribe::Subscribe,
 }
 
 /// Active subscription
@@ -53,8 +49,8 @@ struct Subscription {
 ///
 /// This must be dropped to unsubscribe from the subscription manager
 pub struct ActiveSubscription {
-    id: (SubscriptionId, Option<Url>),
-    manager: Arc<Manager>,
+    _id: ActiveSubscriptionInner<PoolSubscriptionId, SubscriptionInner>,
+    status: Arc<Mutex<Status>>,
     active_subscriptions: Arc<AtomicUsize>,
     queued_subscriptions: Arc<AtomicUsize>,
     stale_subscriptions: Arc<AtomicUsize>,
@@ -62,33 +58,31 @@ pub struct ActiveSubscription {
 
 impl Drop for ActiveSubscription {
     fn drop(&mut self) {
-        let manager = self.manager.clone();
-        let id_to_remove = self.id.clone();
         let active_subscriptions = self.active_subscriptions.clone();
         let queued_subscriptions = self.queued_subscriptions.clone();
         let stale_subscriptions = self.stale_subscriptions.clone();
+        let status = self.status.clone();
 
         tokio::spawn(async move {
-            let mut subscriptions = manager.subscriptions.write().await;
-            let mut indexes = manager.index.write().await;
-
-            if let Some(subscription) = subscriptions.remove(&id_to_remove) {
-                match subscription.status {
-                    Status::Subscribed => active_subscriptions.fetch_sub(1, Ordering::Relaxed),
-                    Status::Queued => queued_subscriptions.fetch_sub(1, Ordering::Relaxed),
-                    Status::Stale => stale_subscriptions.fetch_sub(1, Ordering::Relaxed),
-                };
-
-                for (_, single_indexes) in subscription.reverse_index.iter() {
-                    for index in single_indexes.iter() {
-                        indexes.remove(&(index.clone(), id_to_remove.0.clone()));
-                    }
-                }
+            match *status.lock().await {
+                Status::Subscribed => active_subscriptions.fetch_sub(1, Ordering::Relaxed),
+                Status::Queued => queued_subscriptions.fetch_sub(1, Ordering::Relaxed),
+                Status::Stale => stale_subscriptions.fetch_sub(1, Ordering::Relaxed),
             }
         });
     }
 }
 
+/// Pool subscription ID
+#[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
+pub struct PoolSubscriptionId((SubscriptionId, Option<Url>));
+
+impl Default for PoolSubscriptionId {
+    fn default() -> Self {
+        Self((SubscriptionId::empty(), None))
+    }
+}
+
 /// Subscription manager
 ///
 /// Clients who are added to the pool are automatically subscribed to all
@@ -102,8 +96,7 @@ impl Drop for ActiveSubscription {
 /// stream of future events to be a good citizen with other relayers.
 #[derive(Default)]
 pub(crate) struct Manager {
-    subscriptions: RwLock<BTreeMap<(SubscriptionId, Option<Url>), Subscription>>,
-    index: RwLock<BTreeMap<(EventIndex, SubscriptionId), SortedFilter>>,
+    subscription_manager: Arc<SubscriptionManager<PoolSubscriptionId, SubscriptionInner>>,
     all_clients: AllClients,
     active_subscriptions: Arc<AtomicUsize>,
     queued_subscriptions: Arc<AtomicUsize>,
@@ -127,21 +120,24 @@ impl Manager {
     ) -> Result<(), Error> {
         match message {
             Response::EndOfStoredEvents(subscription_id) => {
-                let subscription_id = (subscription_id.0, None);
-                let mut subscription = self.subscriptions.write().await;
-                subscription.get_mut(&subscription_id).map(|s| {
-                    s.status = Status::Stale;
+                let subscription_id = PoolSubscriptionId((subscription_id.0, None));
+                let mut subscription = self.subscription_manager.subbscriptions_mut().await;
+                if let Some(s) = subscription.get_mut(&subscription_id) {
+                    *s.status.lock().await = Status::Stale;
                     let _ = s.active_subscription.take();
 
                     self.active_subscriptions.fetch_sub(1, Ordering::Relaxed);
                     self.stale_subscriptions.fetch_add(1, Ordering::Relaxed);
-                });
+                }
 
                 return_to
-                    .try_send((Response::EndOfStoredEvents(subscription_id.0.into()), url))
+                    .try_send((
+                        Response::EndOfStoredEvents(subscription_id.0 .0.into()),
+                        url,
+                    ))
                     .map_err(|e| Error::InternalChannel(e.to_string()))?;
 
-                return Ok(());
+                Ok(())
             }
             Response::Event(relayer::Event {
                 subscription_id,
@@ -152,7 +148,7 @@ impl Manager {
                     return_to
                         .try_send((
                             Response::Event(relayer::Event {
-                                subscription_id: subscription_id,
+                                subscription_id,
                                 event,
                             }),
                             url.clone(),
@@ -161,31 +157,16 @@ impl Manager {
                     return Ok(());
                 }
 
-                let index = self.index.read().await;
-                let mut matched = HashSet::new();
-                let event_index = event.get_indexes(4);
-
-                for idx in event_index {
-                    let mut start = index.range(&(idx.clone(), SubscriptionId::empty())..);
-
-                    while let Some(((current_idx, subscription_id), filter)) = start.next() {
-                        if current_idx != &idx {
-                            break;
-                        }
-
-                        if !matched.contains(subscription_id) && filter.check_event(&event) {
-                            return_to
-                                .try_send((
-                                    Response::Event(relayer::Event {
-                                        subscription_id: subscription_id.clone(),
-                                        event: event.clone(),
-                                    }),
-                                    url.clone(),
-                                ))
-                                .map_err(|e| Error::InternalChannel(e.to_string()))?;
-                            matched.insert(subscription_id.clone());
-                        }
-                    }
+                for id in self.subscription_manager.get_subscribers(&event).await {
+                    return_to
+                        .try_send((
+                            Response::Event(relayer::Event {
+                                subscription_id: id.0 .0.clone(),
+                                event: event.clone(),
+                            }),
+                            url.clone(),
+                        ))
+                        .map_err(|e| Error::InternalChannel(e.to_string()))?;
                 }
 
                 Ok(())
@@ -207,10 +188,10 @@ impl Manager {
         }
 
         let clients = self.all_clients.read().await;
-        let mut subscriptions = self.subscriptions.write().await;
+        let mut subscriptions = self.subscription_manager.subbscriptions_mut().await;
 
         for subscription in subscriptions.values_mut() {
-            if subscription.status == Status::Queued {
+            if *subscription.status.lock().await == Status::Queued {
                 let wait_all = clients
                     .values()
                     .map(|(_, (_, sender))| {
@@ -224,7 +205,7 @@ impl Manager {
                     .collect::<Result<Vec<_>, _>>()
                 {
                     subscription.active_subscription = Some(active_subscriptions);
-                    subscription.status = Status::Subscribed;
+                    *subscription.status.lock().await = Status::Subscribed;
 
                     let queued_subscribed =
                         self.queued_subscriptions.fetch_sub(1, Ordering::Relaxed);
@@ -241,37 +222,25 @@ impl Manager {
     /// Creates a new subscription with a given filters
     pub async fn subcribe(
         self: &Arc<Self>,
-        mut subscription_request: subscribe::Subscribe,
+        subscription_request: subscribe::Subscribe,
         specific_url: Option<Url>,
     ) -> ActiveSubscription {
-        let mut subscriptions = self.subscriptions.write().await;
-        let mut indexes = self.index.write().await;
-
-        let id = (subscription_request.subscription_id.clone(), specific_url);
-        let reverse_index: Vec<_> = subscription_request
-            .filters
-            .iter_mut()
-            .map(|f| {
-                let index = <&mut Filter as Into<Index>>::into(f).split();
-                (f.clone(), index)
-            })
-            .collect();
-
-        for (filter, single_indexes) in reverse_index.iter() {
-            for index in single_indexes.iter() {
-                indexes.insert((index.clone(), id.0.clone()), filter.clone().into());
-            }
-        }
-
-        subscriptions.insert(
-            id.clone(),
-            Subscription {
-                reverse_index,
-                subscription_request,
-                status: Status::Queued,
-                active_subscription: None,
-            },
-        );
+        let status = Arc::new(Mutex::new(Status::Queued));
+        let id = self
+            .subscription_manager
+            .subscribe(
+                PoolSubscriptionId((
+                    subscription_request.subscription_id.clone(),
+                    specific_url.clone(),
+                )),
+                subscription_request.filters.clone(),
+                SubscriptionInner {
+                    status: status.clone(),
+                    active_subscription: None,
+                    subscription_request,
+                },
+            )
+            .await;
 
         self.queued_subscriptions.fetch_add(1, Ordering::Relaxed);
 
@@ -281,8 +250,8 @@ impl Manager {
         });
 
         ActiveSubscription {
-            id,
-            manager: self.clone(),
+            _id: id,
+            status,
             active_subscriptions: self.active_subscriptions.clone(),
             queued_subscriptions: self.queued_subscriptions.clone(),
             stale_subscriptions: self.stale_subscriptions.clone(),

+ 1 - 0
crates/relayer/Cargo.toml

@@ -9,6 +9,7 @@ edition = "2021"
 nostr-rs-types = { path = "../types" }
 nostr-rs-storage-base = { path = "../storage/base" }
 nostr-rs-client = { path = "../client" }
+nostr-rs-subscription-manager = { path = "../subscription-manager" }
 futures-util = "0.3.27"
 tokio = { version = "1.26.0", features = ["sync", "macros", "rt", "time"] }
 tokio-tungstenite = { version = "0.18.0", features = [

+ 34 - 6
crates/relayer/src/connection/local.rs

@@ -1,23 +1,37 @@
 //! Local connection
 //!
 //! Add types for adding local connections
-use crate::{connection::ConnectionId, Error};
+use crate::{connection::ConnectionId, Error, Relayer};
+use nostr_rs_storage_base::Storage;
 use nostr_rs_types::{Request, Response};
+use std::sync::Arc;
 use tokio::sync::mpsc::{Receiver, Sender};
 
 /// Local connection
-pub struct LocalConnection {
+pub struct LocalConnection<T>
+where
+    T: Storage + Send + Sync + 'static,
+{
     sender: Sender<(ConnectionId, Request)>,
     receiver: Receiver<Response>,
-    conn_id: ConnectionId,
+    pub(crate) conn_id: ConnectionId,
+    relayer: Arc<Relayer<T>>,
 }
 
-impl LocalConnection {
+impl<T> LocalConnection<T>
+where
+    T: Storage + Send + Sync + 'static,
+{
     /// Receive a message from the relayer
     pub async fn recv(&mut self) -> Option<Response> {
         self.receiver.recv().await
     }
 
+    /// Try to receive a message from the relayer
+    pub fn try_recv(&mut self) -> Option<Response> {
+        self.receiver.try_recv().ok()
+    }
+
     /// Sends a request to the relayer
     pub async fn send(&self, request: Request) -> Result<(), Error> {
         self.sender
@@ -27,24 +41,38 @@ impl LocalConnection {
     }
 }
 
-impl
+impl<T> Drop for LocalConnection<T>
+where
+    T: Storage + Send + Sync + 'static,
+{
+    fn drop(&mut self) {
+        self.relayer.drop_connection(self);
+    }
+}
+
+impl<T>
     From<(
         ConnectionId,
         Receiver<Response>,
         Sender<(ConnectionId, Request)>,
-    )> for LocalConnection
+        Arc<Relayer<T>>,
+    )> for LocalConnection<T>
+where
+    T: Storage + Send + Sync + 'static,
 {
     fn from(
         value: (
             ConnectionId,
             Receiver<Response>,
             Sender<(ConnectionId, Request)>,
+            Arc<Relayer<T>>,
         ),
     ) -> Self {
         LocalConnection {
             conn_id: value.0,
             receiver: value.1,
             sender: value.2,
+            relayer: value.3,
         }
     }
 }

+ 3 - 2
crates/relayer/src/connection/mod.rs

@@ -1,6 +1,7 @@
-use crate::{subscription::ActiveSubscription, Error};
+use crate::{relayer::RelayerSubscriptionId, Error};
 use futures_util::{SinkExt, StreamExt};
 use nostr_rs_client::pool;
+use nostr_rs_subscription_manager::ActiveSubscription;
 use nostr_rs_types::{
     relayer::{ok::ROkStatus, ROk},
     types::{Addr, SubscriptionId},
@@ -53,7 +54,7 @@ impl ConnectionId {
 
 type CompoundSubcription = (
     Option<pool::subscription::ActiveSubscription>,
-    Vec<ActiveSubscription>,
+    ActiveSubscription<RelayerSubscriptionId, ()>,
 );
 
 /// Relayer connection

+ 0 - 1
crates/relayer/src/lib.rs

@@ -13,7 +13,6 @@
 mod connection;
 mod error;
 mod relayer;
-mod subscription;
 
 pub use self::{
     connection::{Connection, LocalConnection},

+ 174 - 112
crates/relayer/src/relayer.rs

@@ -1,14 +1,14 @@
 use crate::{
     connection::{ConnectionId, LocalConnection},
-    subscription::SubscriptionManager,
     Connection, Error,
 };
 use futures_util::StreamExt;
 use nostr_rs_client::{Error as ClientError, Pool, Url};
 use nostr_rs_storage_base::Storage;
+use nostr_rs_subscription_manager::SubscriptionManager;
 use nostr_rs_types::{
     relayer::{self, ROk, ROkStatus},
-    types::{Addr, Event},
+    types::{Addr, Event, SubscriptionId},
     Request, Response,
 };
 use std::{
@@ -25,6 +25,21 @@ use tokio::{
     task::JoinHandle,
 };
 
+#[derive(Debug, Hash, Ord, PartialEq, PartialOrd, Eq, Clone)]
+pub struct RelayerSubscriptionId((SubscriptionId, ConnectionId));
+
+impl From<(SubscriptionId, ConnectionId)> for RelayerSubscriptionId {
+    fn from(value: (SubscriptionId, ConnectionId)) -> Self {
+        Self(value)
+    }
+}
+
+impl Default for RelayerSubscriptionId {
+    fn default() -> Self {
+        Self((SubscriptionId::empty(), ConnectionId::new_empty()))
+    }
+}
+
 /// Relayer struct
 ///
 pub struct Relayer<T: Storage + Send + Sync + 'static> {
@@ -34,7 +49,7 @@ pub struct Relayer<T: Storage + Send + Sync + 'static> {
     /// be able to perform any optimization like prefetching content while offline
     storage: Option<T>,
     /// Subscription manager
-    subscription_manager: Arc<SubscriptionManager>,
+    subscription_manager: Arc<SubscriptionManager<RelayerSubscriptionId, ()>>,
     /// List of all active connections
     connections: RwLock<HashMap<ConnectionId, Connection>>,
     /// This Sender can be used to send requests from anywhere to the relayer.
@@ -158,9 +173,6 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
 
         let handle = tokio::spawn(async move {
             loop {
-                if receiver.len() > 500 {
-                    println!("{}", receiver.len());
-                }
                 if let Some((response, _)) = receiver.recv().await {
                     match response {
                         Response::Event(event) => {
@@ -187,12 +199,33 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     }
 
     /// Adds a new local connection to the list of active connections.
-    pub async fn create_new_local_connection(&self) -> LocalConnection {
+    pub async fn create_new_local_connection(self: &Arc<Self>) -> LocalConnection<T> {
         let (conn, receiver) = Connection::new_local_connection();
         let conn_id = conn.get_conn_id();
         self.connections.write().await.insert(conn_id, conn);
 
-        (conn_id, receiver, self.send_to_relayer.clone()).into()
+        (
+            conn_id,
+            receiver,
+            self.send_to_relayer.clone(),
+            self.clone(),
+        )
+            .into()
+    }
+
+    /// Drops a connection from the list of active connections
+    ///
+    /// This function only works for local connections, normal connections can
+    /// be dropped on disconnection.
+    ///
+    /// This function could change in the future tu kick connections programmatically
+    pub fn drop_connection(self: &Arc<Self>, local_connection: &LocalConnection<T>) {
+        let id = local_connection.conn_id.clone();
+        let this = self.clone();
+
+        tokio::spawn(async move {
+            this.connections.write().await.remove(&id);
+        });
     }
 
     /// Adds a new TpStream and adds it to the list of active connections.
@@ -292,9 +325,9 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                             foreign_subscription,
                             self.subscription_manager
                                 .subscribe(
-                                    connection.get_conn_id(),
-                                    connection.get_sender(),
-                                    request.clone(),
+                                    (request.subscription_id, connection.get_conn_id()).into(),
+                                    request.filters,
+                                    (),
                                 )
                                 .await,
                         ),
@@ -318,7 +351,20 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
             }
         }
 
-        self.subscription_manager.broadcast(event.clone());
+        let connections = self.connections.read().await;
+        for RelayerSubscriptionId((sub_id, conn_id)) in
+            self.subscription_manager.get_subscribers(event).await
+        {
+            if let Some(connection) = connections.get(&conn_id) {
+                let _ = connection.send(
+                    relayer::Event {
+                        subscription_id: sub_id,
+                        event: event.clone(),
+                    }
+                    .into(),
+                );
+            }
+        }
         Ok(true)
     }
 }
@@ -354,6 +400,17 @@ mod test {
         )
     }
 
+    async fn dummy_server_with_relayer(
+        client_pool: Option<Pool>,
+    ) -> (Arc<Relayer<Memory>>, JoinHandle<()>) {
+        let listener = TcpListener::bind(format!("127.0.0.1:{}", 0)).await.unwrap();
+
+        let relayer =
+            Relayer::new(Some(Memory::default()), client_pool).expect("valid dummy server");
+        let (relayer, stopper) = relayer.main(listener).expect("valid main loop");
+        (relayer, stopper)
+    }
+
     fn get_note_with_custom_tags(tags: Vec<Tag>) -> Event {
         let account = Account::default();
         let content = Content::ShortTextNote("".to_owned());
@@ -582,44 +639,43 @@ mod test {
                 }
         ]))
         .expect("valid object");
-        let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
-        let (connection, mut recv) = Connection::new_local_connection();
+        let (relayer, _stopper) = dummy_server_with_relayer(None).await;
+        let mut receiver = relayer.create_new_local_connection().await;
+        let mut publisher = relayer.create_new_local_connection().await;
 
         assert_eq!(relayer.total_subscribers(), 0);
-        let _ = relayer
-            .process_request_from_client(&connection, request)
-            .await;
 
-        assert_eq!(relayer.total_subscribers(), 5);
+        receiver.send(request).await.expect("subscribe");
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(relayer.total_subscribers(), 1);
 
         // eod
-        assert!(recv
+        assert!(receiver
             .try_recv()
             .expect("valid")
             .as_end_of_stored_events()
             .is_some());
 
         // It is empty
-        assert!(recv.try_recv().is_err());
+        assert!(receiver.try_recv().is_none());
 
-        relayer
-            .process_request_from_client(&connection, get_note())
-            .await
-            .expect("process event");
+        publisher.send(get_note()).await.expect("valid send");
 
-        sleep(Duration::from_millis(100)).await;
+        sleep(Duration::from_millis(10)).await;
 
         // ok from posting
-        let msg = recv.try_recv();
-        assert!(msg.is_ok());
+        let msg = publisher.try_recv();
+        assert!(msg.is_some());
         assert_eq!(
             msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
             "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
         );
 
         // It is not empty
-        let msg = recv.try_recv();
-        assert!(msg.is_ok());
+        let msg = receiver.try_recv();
+        assert!(msg.is_some());
         assert_eq!(
             msg.expect("is ok")
                 .as_event()
@@ -630,12 +686,12 @@ mod test {
         );
 
         // it must be deliverd at most once
-        assert!(recv.try_recv().is_err());
-        assert_eq!(relayer.total_subscribers(), 5);
+        assert!(receiver.try_recv().is_none());
+        assert_eq!(relayer.total_subscribers(), 1);
 
         // when client is dropped, the subscription is removed
         // automatically
-        drop(connection);
+        drop(receiver);
 
         sleep(Duration::from_millis(10)).await;
 
@@ -674,44 +730,44 @@ mod test {
                 }
         ]))
         .expect("valid object");
-        let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
-        let (connection, mut recv) = Connection::new_local_connection();
+
+        let (relayer, _stopper) = dummy_server_with_relayer(None).await;
+
+        let mut receiver = relayer.create_new_local_connection().await;
+        let mut publisher = relayer.create_new_local_connection().await;
 
         assert_eq!(relayer.total_subscribers(), 0);
-        let _ = relayer
-            .process_request_from_client(&connection, request)
-            .await;
 
-        assert_eq!(relayer.total_subscribers(), 5);
+        receiver.send(request).await.expect("subscribe");
+        sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(relayer.total_subscribers(), 1);
 
         // eod
-        assert!(recv
+        assert!(receiver
             .try_recv()
             .expect("valid")
             .as_end_of_stored_events()
             .is_some());
 
         // It is empty
-        assert!(recv.try_recv().is_err());
+        assert!(receiver.try_recv().is_none());
 
-        relayer
-            .process_request_from_client(&connection, get_note())
-            .await
-            .expect("process event");
+        publisher.send(get_note()).await.expect("valid send");
 
         sleep(Duration::from_millis(100)).await;
 
         // ok from posting
-        let msg = recv.try_recv();
-        assert!(msg.is_ok());
+        let msg = publisher.try_recv();
+        assert!(msg.is_some());
         assert_eq!(
             msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
             "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
         );
 
         // It is not empty
-        let msg = recv.try_recv();
-        assert!(msg.is_ok());
+        let msg = receiver.try_recv();
+        assert!(msg.is_some());
         assert_eq!(
             msg.expect("is ok")
                 .as_event()
@@ -722,12 +778,12 @@ mod test {
         );
 
         // it must be deliverd at most once
-        assert!(recv.try_recv().is_err());
-        assert_eq!(relayer.total_subscribers(), 5);
+        assert!(receiver.try_recv().is_none());
+        assert_eq!(relayer.total_subscribers(), 1);
 
         // when client is dropped, the subscription is removed
         // automatically
-        drop(connection);
+        drop(receiver);
 
         sleep(Duration::from_millis(10)).await;
 
@@ -745,50 +801,57 @@ mod test {
         }]))
         .expect("valid object");
 
-        let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
-        let (publisher, mut recv) = Connection::new_local_connection();
+        let (relayer, _stopper) = dummy_server_with_relayer(None).await;
+        let mut publisher = relayer.create_new_local_connection().await;
 
-        let mut set1 = (0..1000)
-            .map(|_| Connection::new_local_connection())
-            .collect::<Vec<_>>();
-
-        let mut set2 = (0..100)
-            .map(|_| Connection::new_local_connection())
-            .collect::<Vec<_>>();
-
-        let subscribe1 = set1
-            .iter()
-            .map(|(connection, _)| relayer.process_request_from_client(connection, req1.clone()))
-            .collect::<Vec<_>>();
+        let mut set1 = join_all(
+            (0..1000)
+                .map(|_| relayer.create_new_local_connection())
+                .collect::<Vec<_>>(),
+        )
+        .await;
 
-        let subscribe2 = set2
-            .iter()
-            .map(|(connection, _)| relayer.process_request_from_client(connection, req2.clone()))
-            .collect::<Vec<_>>();
+        let mut set2 = join_all(
+            (0..100)
+                .map(|_| relayer.create_new_local_connection())
+                .collect::<Vec<_>>(),
+        )
+        .await;
 
         assert_eq!(relayer.total_subscribers(), 0);
 
-        join_all(subscribe1)
-            .await
-            .into_iter()
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid calls");
-        join_all(subscribe2)
-            .await
-            .into_iter()
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid calls");
+        join_all(
+            set1.iter()
+                .map(|connection| connection.send(req1.clone()))
+                .collect::<Vec<_>>(),
+        )
+        .await
+        .into_iter()
+        .collect::<Result<Vec<_>, _>>()
+        .expect("subscribe all");
+
+        join_all(
+            set2.iter()
+                .map(|connection| connection.send(req2.clone()))
+                .collect::<Vec<_>>(),
+        )
+        .await
+        .into_iter()
+        .collect::<Result<Vec<_>, _>>()
+        .expect("subscribe all");
 
-        for (_, recv) in set1.iter_mut() {
-            assert!(recv
+        sleep(Duration::from_millis(10)).await;
+
+        for connection in set1.iter_mut() {
+            assert!(connection
                 .try_recv()
                 .expect("end of stored events")
                 .as_end_of_stored_events()
                 .is_some());
         }
 
-        for (_, recv) in set2.iter_mut() {
-            assert!(recv
+        for connection in set2.iter_mut() {
+            assert!(connection
                 .try_recv()
                 .expect("end of stored events")
                 .as_end_of_stored_events()
@@ -797,28 +860,24 @@ mod test {
 
         assert_eq!(relayer.total_subscribers(), 1100);
 
-        relayer
-            .process_request_from_client(&publisher, get_note())
-            .await
-            .expect("process event");
+        publisher.send(get_note().into()).await.expect("valid send");
 
         sleep(Duration::from_millis(10)).await;
 
-        let msg = recv.try_recv();
-        assert!(msg.is_ok());
+        let msg = publisher.try_recv();
+        assert!(msg.is_some());
         assert_eq!(
             msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
             "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
         );
 
-        for (_, recv) in set1.iter_mut() {
-            assert!(recv.try_recv().is_err());
+        for connection in set1.iter_mut() {
+            assert!(connection.try_recv().is_none());
         }
 
-        for (_, recv) in set2.iter_mut() {
-            let msg = recv.try_recv();
-            println!("{:?}", msg);
-            assert!(msg.is_ok());
+        for connection in set2.iter_mut() {
+            let msg = connection.try_recv();
+            assert!(msg.is_some());
             let msg = msg.expect("msg");
 
             assert_eq!(
@@ -826,7 +885,7 @@ mod test {
                 "1298169700973717".to_owned()
             );
 
-            assert!(recv.try_recv().is_err());
+            assert!(connection.try_recv().is_none());
         }
 
         drop(set1);
@@ -872,44 +931,47 @@ mod test {
         let request: Request =
             serde_json::from_value(json!(["REQ", "1298169700973717", {}])).expect("valid object");
 
-        let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
-        let (connection, mut recv) = Connection::new_local_connection();
+        let (relayer, _stopper) = dummy_server_with_relayer(None).await;
+
+        let mut local_connection_0 = relayer.create_new_local_connection().await;
+        let mut local_connection_1 = relayer.create_new_local_connection().await;
 
         assert_eq!(relayer.total_subscribers(), 0);
-        let _ = relayer
-            .process_request_from_client(&connection, request)
-            .await;
+
+        local_connection_1.send(request).await.expect("valid send");
+
+        sleep(Duration::from_millis(10)).await;
 
         assert_eq!(relayer.total_subscribers(), 1);
 
         // eod
-        assert!(recv
+        assert!(local_connection_1
             .try_recv()
             .expect("valid")
             .as_end_of_stored_events()
             .is_some());
 
         // It is empty
-        assert!(recv.try_recv().is_err());
+        assert!(local_connection_1.try_recv().is_none());
 
-        relayer
-            .process_request_from_client(&connection, get_note())
+        local_connection_0
+            .send(get_note().into())
             .await
-            .expect("process event");
+            .expect("valid send");
 
         sleep(Duration::from_millis(10)).await;
 
         // ok from posting
-        let msg = recv.try_recv();
-        assert!(msg.is_ok());
+        let msg = local_connection_0.try_recv();
+        assert!(msg.is_some());
         assert_eq!(
             msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
             "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
         );
 
         // It is not empty
-        let msg = recv.try_recv();
-        assert!(msg.is_ok());
+        let msg = local_connection_1.try_recv();
+        assert!(msg.is_some());
         assert_eq!(
             msg.expect("is ok")
                 .as_event()
@@ -920,12 +982,12 @@ mod test {
         );
 
         // it must be deliverd at most once
-        assert!(recv.try_recv().is_err());
+        assert!(local_connection_1.try_recv().is_none());
         assert_eq!(relayer.total_subscribers(), 1);
 
         // when client is dropped, the subscription is removed
         // automatically
-        drop(connection);
+        drop(local_connection_1);
 
         sleep(Duration::from_millis(10)).await;
 

+ 0 - 180
crates/relayer/src/subscription/manager.rs

@@ -1,180 +0,0 @@
-use crate::connection::ConnectionId;
-use nostr_rs_storage_base::Index;
-use nostr_rs_types::{
-    client::Subscribe,
-    types::{
-        event::{Event, Index as EventIndex, SortedFilter},
-        SubscriptionId,
-    },
-    Response,
-};
-use std::{
-    collections::{BTreeMap, HashSet},
-    sync::{
-        atomic::{AtomicUsize, Ordering},
-        Arc,
-    },
-};
-use tokio::sync::{mpsc::Sender, RwLock};
-
-type SubIdx = (EventIndex, ConnectionId, SubscriptionId);
-
-pub const MIN_PREFIX_MATCH_LEN: usize = 2;
-
-/// Subscription for a connection
-///
-/// This object is responsible for keeping track of a subscription for a connection
-///
-/// When dropped their listener will be removed from the subscription manager automatically
-#[derive(Clone, Debug)]
-pub struct ActiveSubscription {
-    conn_id: ConnectionId,
-    name: SubscriptionId,
-    indexes: Vec<EventIndex>,
-    manager: Arc<SubscriptionManager>,
-}
-
-impl ActiveSubscription {
-    fn new(
-        conn_id: ConnectionId,
-        name: SubscriptionId,
-        index: Index,
-        manager: Arc<SubscriptionManager>,
-    ) -> Self {
-        Self {
-            conn_id,
-            name,
-            indexes: index.split(),
-            manager,
-        }
-    }
-}
-
-impl Drop for ActiveSubscription {
-    /// When the subscription is dropped, it will remove the listener from the
-    /// subscription manager
-    fn drop(&mut self) {
-        let indexes = self
-            .indexes
-            .drain(..)
-            .map(|key| (key, self.conn_id, self.name.clone()))
-            .collect::<Vec<_>>();
-
-        let manager = self.manager.clone();
-
-        tokio::spawn(async move {
-            manager.unsubscribe(indexes).await;
-        });
-    }
-}
-
-type SubscriptionValue = (Arc<SortedFilter>, Sender<Response>);
-
-/// Subscription manager
-///
-/// This object is responsible for letting clients and processes subscribe to
-/// events,
-#[derive(Debug)]
-pub struct SubscriptionManager {
-    /// List of subscriptions with their filters and their index.
-    ///
-    /// A single request may be converted to multiple subscriptions entry as
-    /// they are sorted by their index/key
-    subscriptions: RwLock<BTreeMap<SubIdx, SubscriptionValue>>,
-    /// Total number of subscribers
-    /// A single REQ may have multiple subscriptions
-    total_subscribers: AtomicUsize,
-    /// Minimum prefix match length
-    min_prefix_match_len: usize,
-}
-
-impl Default for SubscriptionManager {
-    fn default() -> Self {
-        Self {
-            subscriptions: Default::default(),
-            total_subscribers: Default::default(),
-            min_prefix_match_len: MIN_PREFIX_MATCH_LEN,
-        }
-    }
-}
-
-impl SubscriptionManager {
-    async fn unsubscribe(self: Arc<Self>, keys: Vec<SubIdx>) {
-        let mut subscriptions = self.subscriptions.write().await;
-        for sub in keys {
-            subscriptions.remove(&sub);
-        }
-        self.total_subscribers.fetch_sub(1, Ordering::Relaxed);
-    }
-
-    /// Get the number of subscribers
-    pub fn total_subscribers(self: &Arc<Self>) -> usize {
-        self.total_subscribers.load(Ordering::Relaxed)
-    }
-
-    /// Subscribe to a future events
-    ///
-    /// This will add a new subscription to the subscription manager with a
-    /// given conn_id, sender and a vector of filters.
-    pub async fn subscribe(
-        self: &Arc<Self>,
-        conn_id: ConnectionId,
-        sender: Sender<Response>,
-        request: Subscribe,
-    ) -> Vec<ActiveSubscription> {
-        let name = request.subscription_id;
-        let mut subscriptions = self.subscriptions.write().await;
-        let subscriptions = request
-            .filters
-            .into_iter()
-            .map(|mut filter| {
-                let index: Index = (&mut filter).into();
-                let filter = Arc::new(SortedFilter::from(filter));
-                let subscription =
-                    ActiveSubscription::new(conn_id, name.clone(), index, self.clone());
-
-                for key in subscription.indexes.iter() {
-                    subscriptions.insert(
-                        (key.clone(), conn_id, name.clone()),
-                        (filter.clone(), sender.clone()),
-                    );
-                }
-                subscription
-            })
-            .collect::<Vec<_>>();
-        self.total_subscribers
-            .fetch_add(subscriptions.len(), Ordering::Relaxed);
-        subscriptions
-    }
-
-    /// Publish an event to all subscribers
-    pub fn broadcast(self: &Arc<Self>, event: Event) {
-        let this = self.clone();
-        tokio::spawn(async move {
-            let subscriptions = this.subscriptions.read().await;
-            let indexes = event.get_indexes(this.min_prefix_match_len);
-            let mut deliverded = HashSet::new();
-
-            for index in indexes {
-                for ((sub_type, client, name), (filter, sender)) in subscriptions.range(
-                    &(
-                        index.clone(),
-                        ConnectionId::new_empty(),
-                        SubscriptionId::empty(),
-                    )..,
-                ) {
-                    if sub_type != &index {
-                        break;
-                    }
-
-                    if deliverded.contains(client) || !filter.check_event(&event) {
-                        continue;
-                    }
-
-                    let _ = sender.try_send(Response::Event((name, &event).into()));
-                    deliverded.insert(*client);
-                }
-            }
-        });
-    }
-}

+ 0 - 3
crates/relayer/src/subscription/mod.rs

@@ -1,3 +0,0 @@
-mod manager;
-
-pub use self::manager::{ActiveSubscription, SubscriptionManager};

+ 1 - 0
crates/storage/base/Cargo.toml

@@ -5,6 +5,7 @@ edition = "2021"
 
 [dependencies]
 nostr-rs-types = { path = "../../types" }
+nostr-rs-subscription-manager = { path = "../../subscription-manager" }
 thiserror = "1.0.40"
 rand = "0.8.5"
 tokio = { version = "1.32.0", features = ["full"] }

+ 2 - 1
crates/storage/base/src/cursor.rs

@@ -1,6 +1,7 @@
 use crate::Error;
 use futures::FutureExt;
-use nostr_rs_types::types::{event::SortedFilter, Event, Filter};
+use nostr_rs_subscription_manager::SortedFilter;
+use nostr_rs_types::types::{Event, Filter};
 use std::{
     future::Future,
     pin::Pin,

+ 1 - 2
crates/storage/base/src/lib.rs

@@ -6,7 +6,6 @@
 #![allow(dead_code)]
 pub mod cursor;
 mod error;
-mod index;
 mod secondary_index;
 mod storage;
 
@@ -16,7 +15,7 @@ pub mod test;
 #[cfg(feature = "test")]
 pub use tokio;
 
-pub use crate::{error::Error, index::Index, secondary_index::SecondaryIndex, storage::Storage};
+pub use crate::{error::Error, secondary_index::SecondaryIndex, storage::Storage};
 
 #[macro_export]
 /// This macro creates the

+ 1 - 0
crates/storage/memory/Cargo.toml

@@ -7,6 +7,7 @@ edition = "2021"
 async-trait = "0.1.81"
 futures = "0.3.30"
 nostr-rs-storage-base = { path = "../base" }
+nostr-rs-subscription-manager = { path = "../../subscription-manager" }
 nostr-rs-types = { path = "../../types" }
 tokio = { version = "1.39.2", features = ["full"] }
 

+ 2 - 1
crates/storage/memory/src/cursor.rs

@@ -4,7 +4,8 @@ use nostr_rs_storage_base::{
     cursor::{check_future_call, FutureResult, FutureValue},
     Error, Storage,
 };
-use nostr_rs_types::types::{event::SortedFilter, Event};
+use nostr_rs_subscription_manager::SortedFilter;
+use nostr_rs_types::types::Event;
 use std::{
     collections::{BTreeMap, VecDeque},
     pin::Pin,

+ 9 - 8
crates/storage/memory/src/lib.rs

@@ -1,5 +1,6 @@
-use nostr_rs_storage_base::{Error, Index, SecondaryIndex, Storage};
-use nostr_rs_types::types::{event::SortedFilter, Event, Filter};
+use nostr_rs_storage_base::{Error, SecondaryIndex, Storage};
+use nostr_rs_subscription_manager::{CompoundIndex, SortedFilter};
+use nostr_rs_types::types::{Event, Filter};
 use std::{
     cmp::min,
     collections::{BTreeMap, VecDeque},
@@ -141,36 +142,36 @@ impl Storage for Memory {
             Some(query.limit.try_into()?)
         };
 
-        let query_index: Index = (&mut query).into();
+        let query_index: CompoundIndex = (&mut query).into();
 
         let (index, index_prefixes) = match query_index {
-            Index::Tag(tag_name, tags) => (
+            CompoundIndex::Tag(tag_name, tags) => (
                 self.indexes.tags.read().await,
                 tags.into_iter()
                     .map(|tag| tag.into_bytes(&tag_name))
                     .collect::<VecDeque<_>>(),
             ),
-            Index::Author(authors) => (
+            CompoundIndex::Author(authors) => (
                 self.indexes.author.read().await,
                 authors
                     .into_iter()
                     .map(|c| c.into_bytes())
                     .collect::<VecDeque<_>>(),
             ),
-            Index::Id(ids) => (
+            CompoundIndex::Id(ids) => (
                 self.indexes.ids.read().await,
                 ids.into_iter()
                     .map(|c| c.into_bytes())
                     .collect::<VecDeque<_>>(),
             ),
-            Index::Kind(kind) => (
+            CompoundIndex::Kind(kind) => (
                 self.indexes.kind.read().await,
                 kind.into_iter()
                     .map(|kind| kind.into_bytes())
                     .collect::<VecDeque<_>>(),
             ),
 
-            Index::TableScan => (
+            CompoundIndex::TableScan => (
                 self.indexes.ids_by_time.read().await,
                 vec![Vec::new()].into(), // all keys
             ),

+ 1 - 0
crates/storage/rocksdb/Cargo.toml

@@ -7,6 +7,7 @@ edition = "2021"
 
 [dependencies]
 nostr-rs-storage-base = { path = "../base" }
+nostr-rs-subscription-manager = { path = "../../subscription-manager" }
 nostr-rs-types = { path = "../../types" }
 rocksdb = { version = "0.20.1", features = [
     "multi-threaded-cf",

+ 2 - 1
crates/storage/rocksdb/src/cursor.rs

@@ -5,7 +5,8 @@ use nostr_rs_storage_base::{
     cursor::{check_future_call, FutureResult, FutureValue},
     Error, Storage,
 };
-use nostr_rs_types::types::{event::SortedFilter, Event};
+use nostr_rs_subscription_manager::SortedFilter;
+use nostr_rs_types::types::Event;
 use rocksdb::{DBIteratorWithThreadMode, DB};
 use std::{
     collections::VecDeque,

+ 8 - 7
crates/storage/rocksdb/src/lib.rs

@@ -1,6 +1,7 @@
 //! Rocks DB implementation of the storage layer
 use crate::cursor::Cursor;
-use nostr_rs_storage_base::{Error, Index, SecondaryIndex, Storage};
+use nostr_rs_storage_base::{Error, SecondaryIndex, Storage};
+use nostr_rs_subscription_manager::CompoundIndex;
 use nostr_rs_types::types::{Event, Filter};
 use rocksdb::{
     BoundColumnFamily, ColumnFamilyDescriptor, IteratorMode, Options, SliceTransform, WriteBatch,
@@ -195,24 +196,24 @@ impl Storage for RocksDb {
             Some(query.limit.try_into()?)
         };
 
-        let query_index: Index = (&mut query).into();
+        let query_index: CompoundIndex = (&mut query).into();
 
         let (index, secondary_index_iterator, prefixes) = match query_index {
-            Index::Tag(tag_name, tags) => (
+            CompoundIndex::Tag(tag_name, tags) => (
                 Some(ReferenceType::Tags),
                 None,
                 tags.into_iter()
                     .map(|tag| tag.into_bytes(&tag_name))
                     .collect::<VecDeque<_>>(),
             ),
-            Index::Id(ids) => (
+            CompoundIndex::Id(ids) => (
                 None,
                 None,
                 ids.into_iter()
                     .map(|id| id.to_vec())
                     .collect::<VecDeque<_>>(),
             ),
-            Index::Author(authors) => (
+            CompoundIndex::Author(authors) => (
                 Some(ReferenceType::Author),
                 None,
                 authors
@@ -220,7 +221,7 @@ impl Storage for RocksDb {
                     .map(|author| author.to_vec())
                     .collect::<VecDeque<_>>(),
             ),
-            Index::Kind(kinds) => (
+            CompoundIndex::Kind(kinds) => (
                 Some(ReferenceType::Kind),
                 None,
                 kinds
@@ -228,7 +229,7 @@ impl Storage for RocksDb {
                     .map(|kind| kind.into_bytes())
                     .collect::<VecDeque<_>>(),
             ),
-            Index::TableScan => (
+            CompoundIndex::TableScan => (
                 Some(ReferenceType::Stream),
                 Some(self.db.iterator_cf(
                     &self.reference_to_cf_handle(ReferenceType::Stream)?,

+ 13 - 0
crates/subscription-manager/Cargo.toml

@@ -0,0 +1,13 @@
+[package]
+name = "nostr-rs-subscription-manager"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+chrono = "0.4.38"
+nostr-rs-types = { path = "../types" }
+tokio = { version = "1.40.0", features = ["full"] }
+
+[dev-dependencies]
+serde = "1.0.209"
+serde_json = "1.0.127"

+ 5 - 1
crates/types/src/types/event/filter.rs → crates/subscription-manager/src/filter.rs

@@ -1,5 +1,9 @@
-use crate::types::{filter::TagValue, Event, Filter as FilterT, Kind};
+//! Filter crate
+//!
+//! This mod provides a way to transform a filter into a sorted filter, that can
+//! match events much efficiently.
 use chrono::{DateTime, Utc};
+use nostr_rs_types::types::{filter::TagValue, Event, Filter as FilterT, Kind};
 use std::{
     collections::{HashMap, HashSet},
     ops::Deref,

+ 66 - 34
crates/storage/base/src/index.rs → crates/subscription-manager/src/index.rs

@@ -1,16 +1,50 @@
-//! Indexes for the storage engine.
+//! Indexes for the subscription manager
 //!
-//! This module contains the definition of the indexes used by the storage
-//! engine and provides a convenient conversion from less useful to more useful
-//! indexes by default.
-//!
-//! Each storage engine can use this implementation or have their own
-use nostr_rs_types::types::{filter::TagValue, Addr, Filter, Id, Index as EventIndex, Kind};
+//! This module contains the definition of the indexes used by the subscription
+//! manager to provide a convenient way to choose indexes to reduce the number
+//! of checks before sending the events to the subscribers.
+use nostr_rs_types::types::{filter::TagValue, Event, Filter, Id, Kind};
 use std::collections::HashSet;
 
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
+/// Events can be indexed by different fields.
+///
+/// This index is a multi-value index, meaning that it can be indexed by multiple fields
+pub enum Index {
+    /// Author ID
+    Author(Id),
+    /// Note ID
+    Id(Id),
+    /// Note Kind
+    Kind(Kind),
+    /// Tag
+    Tag(String, TagValue),
+    /// A catch-all index
+    Anything,
+}
+
+impl Index {
+    /// Converts an event into a list of indexes
+    pub fn from(event: &Event) -> Vec<Self> {
+        let mut subscriptions = vec![];
+
+        subscriptions.push(Index::Author(event.author().to_owned()));
+        subscriptions.push(Index::Id(event.id.to_owned()));
+
+        for t in event.tags() {
+            t.get_indexable_value()
+                .map(|v| subscriptions.push(Index::Tag(t.get_identifier().to_owned(), v)));
+        }
+
+        subscriptions.push(Index::Kind(event.kind()));
+        subscriptions.push(Index::Anything);
+        subscriptions
+    }
+}
+
 /// Indexes for the storage engine.
 #[derive(Debug, Clone)]
-pub enum Index {
+pub enum CompoundIndex {
     /// Index by tag.
     Tag(String, HashSet<TagValue>),
     /// Index by id.
@@ -23,28 +57,28 @@ pub enum Index {
     TableScan,
 }
 
-impl Index {
+impl CompoundIndex {
     /// Splits the index into a list of single indexes.
-    pub fn split(self) -> Vec<EventIndex> {
+    pub fn split(self) -> Vec<Index> {
         match self {
-            Index::Tag(tag, tags) => tags
+            CompoundIndex::Tag(tag, tags) => tags
                 .into_iter()
-                .map(|tag_value| EventIndex::Tag(tag.clone(), tag_value))
+                .map(|tag_value| Index::Tag(tag.clone(), tag_value))
                 .collect(),
-            Index::Id(ids) => ids.into_iter().map(EventIndex::Id).collect(),
-            Index::Author(authors) => authors.into_iter().map(EventIndex::Author).collect(),
-            Index::Kind(kinds) => kinds.into_iter().map(EventIndex::Kind).collect(),
-            Index::TableScan => vec![EventIndex::Anything],
+            CompoundIndex::Id(ids) => ids.into_iter().map(Index::Id).collect(),
+            CompoundIndex::Author(authors) => authors.into_iter().map(Index::Author).collect(),
+            CompoundIndex::Kind(kinds) => kinds.into_iter().map(Index::Kind).collect(),
+            CompoundIndex::TableScan => vec![Index::Anything],
         }
     }
 }
 
-impl From<&mut Filter> for Index {
+impl From<&mut Filter> for CompoundIndex {
     fn from(query: &mut Filter) -> Self {
         if !query.ids.is_empty() {
-            Index::Id(std::mem::take(&mut query.ids))
+            CompoundIndex::Id(std::mem::take(&mut query.ids))
         } else if !query.authors.is_empty() {
-            Index::Author(std::mem::take(&mut query.authors))
+            CompoundIndex::Author(std::mem::take(&mut query.authors))
         } else if !query.tags.is_empty() {
             // find the tag with fewer options to iterate over
             // and convert that tag as the index.
@@ -60,24 +94,22 @@ impl From<&mut Filter> for Index {
             let (key, _) = tag_with_fewer_opts.remove(0);
 
             if let Some(tags) = query.tags.remove(&key) {
-                Index::Tag(key, tags)
+                CompoundIndex::Tag(key, tags)
             } else {
-                Index::TableScan
+                CompoundIndex::TableScan
             }
         } else if !query.kinds.is_empty() {
-            Index::Kind(std::mem::take(&mut query.kinds))
+            CompoundIndex::Kind(std::mem::take(&mut query.kinds))
         } else {
-            Index::TableScan
+            CompoundIndex::TableScan
         }
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use serde_json::json;
-
     use super::*;
-    use std::collections::HashMap;
+    use serde_json::json;
 
     #[test]
     fn test_index_preference_ids_over_others() {
@@ -89,8 +121,8 @@ mod tests {
         }))
         .expect("filter");
 
-        let index = Index::from(&mut filter);
-        assert!(matches!(index, Index::Id(_)));
+        let index = CompoundIndex::from(&mut filter);
+        assert!(matches!(index, CompoundIndex::Id(_)));
     }
 
     #[test]
@@ -102,8 +134,8 @@ mod tests {
         }))
         .expect("filter");
 
-        let index = Index::from(&mut filter);
-        assert!(matches!(index, Index::Author(_)));
+        let index = CompoundIndex::from(&mut filter);
+        assert!(matches!(index, CompoundIndex::Author(_)));
     }
 
     #[test]
@@ -114,8 +146,8 @@ mod tests {
         }))
         .expect("filter");
 
-        let index = Index::from(&mut filter);
-        assert!(matches!(index, Index::Tag(_, _)));
+        let index = CompoundIndex::from(&mut filter);
+        assert!(matches!(index, CompoundIndex::Tag(_, _)));
     }
 
     #[test]
@@ -125,7 +157,7 @@ mod tests {
         }))
         .expect("filter");
 
-        let index = Index::from(&mut filter);
-        assert!(matches!(index, Index::Kind(_)));
+        let index = CompoundIndex::from(&mut filter);
+        assert!(matches!(index, CompoundIndex::Kind(_)));
     }
 }

+ 214 - 0
crates/subscription-manager/src/lib.rs

@@ -0,0 +1,214 @@
+//! Subscription manager
+//!
+//! This crate provides a subscription manager or matching engine for
+//! subscriptions and events.
+//!
+//! This crate provides a generic efficient way of keeping track of
+//! subscriptions and check an event to get their listeners
+//!
+//! Each subscription has a droppable struct that will remove the subscription
+//! on Drop.
+//!
+//! Any delivery mechanism or any other form of communication is not part of
+//! this crate
+#![deny(missing_docs, warnings)]
+
+use nostr_rs_types::types::{Event, Filter};
+use std::{
+    collections::{BTreeMap, HashSet},
+    fmt::Debug,
+    hash::Hash,
+    ops::{Deref, DerefMut},
+    sync::{atomic::AtomicUsize, Arc},
+};
+use tokio::sync::{RwLock, RwLockWriteGuard};
+
+mod filter;
+mod index;
+
+pub use self::{
+    filter::SortedFilter,
+    index::{CompoundIndex, Index},
+};
+
+/// Subscription value
+pub struct Subscription<T>
+where
+    T: Sync + Send,
+{
+    /// inner object
+    inner: T,
+    /// Reverse index
+    ///
+    /// This is a reverse index of the filters, it is only used to update the
+    /// main shared index when this subscription is dropped.
+    reverse_index: Vec<Vec<Index>>,
+}
+
+impl<T> Deref for Subscription<T>
+where
+    T: Sync + Send,
+{
+    type Target = T;
+
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+impl<T> DerefMut for Subscription<T>
+where
+    T: Sync + Send,
+{
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.inner
+    }
+}
+
+/// Active subscription
+///
+/// This is a droppable struct that will remove the subscription from the
+/// manager on Drop.
+///
+/// The callee must keep this struct alive to keep the subscription alive.
+pub struct ActiveSubscription<I, T>
+where
+    I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
+    T: Sync + Send + 'static,
+{
+    id: I,
+    manager: Option<Arc<SubscriptionManager<I, T>>>,
+}
+
+impl<I, T> Drop for ActiveSubscription<I, T>
+where
+    I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
+    T: Sync + Send + 'static,
+{
+    fn drop(&mut self) {
+        if let Some(manager) = self.manager.take() {
+            manager.unsubscribe(self);
+        }
+    }
+}
+
+/// Subscription manager
+///
+/// This is the main struct that keeps track of all the subscriptions
+///
+/// The generic type `I` is the type of the subscription ID (which is outside of
+/// the scope of this crate) and the T which is space to keep aditional data
+/// associate with a subscription
+#[derive(Default)]
+pub struct SubscriptionManager<I, T>
+where
+    I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
+    T: Sync + Send + 'static,
+{
+    subscriptions: RwLock<BTreeMap<I, Subscription<T>>>,
+    index: RwLock<BTreeMap<(Index, I), SortedFilter>>,
+    total_subscribers: AtomicUsize,
+}
+
+impl<I, T> SubscriptionManager<I, T>
+where
+    I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
+    T: Sync + Send + 'static,
+{
+    fn unsubscribe(self: Arc<Self>, subscription: &mut ActiveSubscription<I, T>) {
+        let id_to_remove = subscription.id.clone();
+        self.total_subscribers
+            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
+        tokio::spawn(async move {
+            let mut subscriptions = self.subscriptions.write().await;
+            let mut indexes = self.index.write().await;
+
+            if let Some(subscription) = subscriptions.remove(&id_to_remove) {
+                for single_indexes in subscription.reverse_index.iter() {
+                    for index in single_indexes.iter() {
+                        indexes.remove(&(index.clone(), id_to_remove.clone()));
+                    }
+                }
+            }
+        });
+    }
+
+    /// Get the subscriptions list as a mutable reference
+    pub async fn subbscriptions_mut(&self) -> RwLockWriteGuard<'_, BTreeMap<I, Subscription<T>>> {
+        self.subscriptions.write().await
+    }
+
+    /// Get active listeners for this event
+    pub async fn get_subscribers(self: &Arc<Self>, event: &Event) -> Vec<I> {
+        let indexes = self.index.read().await;
+
+        let event_index = Index::from(event);
+        let mut matched = HashSet::new();
+
+        for idx in event_index {
+            let start_index = (idx.clone(), I::default());
+            let mut start = indexes.range(&start_index..);
+
+            while let Some(((current_idx, subscription_id), filter)) = start.next() {
+                if current_idx != &idx {
+                    break;
+                }
+
+                if !matched.contains(subscription_id) && filter.check_event(&event) {
+                    matched.insert(subscription_id.clone());
+                }
+            }
+        }
+
+        matched.into_iter().collect()
+    }
+
+    /// Returns the total number of subscribers
+    pub fn total_subscribers(&self) -> usize {
+        self.total_subscribers
+            .load(std::sync::atomic::Ordering::Relaxed)
+    }
+
+    /// Creates a subscription and returns an active subscription struct
+    ///
+    /// The return object must be kept alive to keep the subscription alive
+    pub async fn subscribe(
+        self: &Arc<Self>,
+        id: I,
+        mut filters: Vec<Filter>,
+        inner: T,
+    ) -> ActiveSubscription<I, T> {
+        let mut subscriptions = self.subscriptions.write().await;
+        let mut indexes = self.index.write().await;
+
+        let reverse_index: Vec<_> = filters
+            .iter_mut()
+            .map(|f| {
+                let event_index = <&mut Filter as Into<CompoundIndex>>::into(f).split();
+                (f.clone(), event_index)
+            })
+            .collect();
+
+        for (filter, single_indexes) in reverse_index.iter() {
+            for index in single_indexes.iter() {
+                indexes.insert((index.clone(), id.clone()), filter.clone().into());
+            }
+        }
+
+        self.total_subscribers
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+
+        subscriptions.insert(
+            id.clone(),
+            Subscription {
+                reverse_index: reverse_index.into_iter().map(|(_, index)| index).collect(),
+                inner,
+            },
+        );
+
+        ActiveSubscription {
+            id,
+            manager: Some(self.clone()),
+        }
+    }
+}

+ 0 - 22
crates/types/src/types/event/mod.rs → crates/types/src/types/event.rs

@@ -13,11 +13,6 @@ use serde_json::json;
 use sha2::{Digest, Sha256};
 use thiserror::Error;
 
-mod filter;
-mod index;
-
-pub use self::{filter::SortedFilter, index::Index};
-
 /// Errors
 #[derive(Error, Debug)]
 pub enum Error {
@@ -173,23 +168,6 @@ impl Event {
         })
     }
 
-    /// Return indexes from the event
-    pub fn get_indexes(&self, _min_prefix_match_len: usize) -> Vec<Index> {
-        let mut subscriptions = vec![];
-
-        subscriptions.push(Index::Author(self.author().to_owned()));
-        subscriptions.push(Index::Id(self.id.to_owned()));
-
-        for t in self.tags() {
-            t.get_indexable_value()
-                .map(|v| subscriptions.push(Index::Tag(t.get_identifier().to_owned(), v)));
-        }
-
-        subscriptions.push(Index::Kind(self.kind()));
-        subscriptions.push(Index::Anything);
-        subscriptions
-    }
-
     /// Returns the kind of this event
     pub fn kind(&self) -> Kind {
         self.inner.kind

+ 0 - 18
crates/types/src/types/event/index.rs

@@ -1,18 +0,0 @@
-use crate::types::{filter::TagValue, Id, Kind};
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
-/// Events can be indexed by different fields.
-///
-/// This index is a multi-value index, meaning that it can be indexed by multiple fields
-pub enum Index {
-    /// Author ID
-    Author(Id),
-    /// Note ID
-    Id(Id),
-    /// Note Kind
-    Kind(Kind),
-    /// Tag
-    Tag(String, TagValue),
-    /// A catch-all index
-    Anything,
-}

+ 1 - 1
crates/types/src/types/mod.rs

@@ -76,7 +76,7 @@ pub(crate) mod ts_seconds {
 pub use self::{
     addr::Addr,
     content::Content,
-    event::{Event, Index, UnsignedEvent},
+    event::{Event, UnsignedEvent},
     filter::Filter,
     id::Id,
     kind::Kind,