Kaynağa Gözat

Working on a improved pool subscriptions

Improvements:
 * Introduce subscription manager
   * Subscriptions are kept locally and mainly used to fetch stored events
   * Add a single subscription to fetch all updates from the relayer, and server
     local subscriptions from that feed.
   * Added SuscriptionId with prefixes.
     * Added also a random unique per run prefix for special subscriptions
  * Re-use most of the subscription-matching logic from the relayer
    * Added EventIndex in the types crate
    * Moved function to extract Vec<EventIndex> from Event to Event struct
Cesar Rodas 2 ay önce
ebeveyn
işleme
76ab9b9958

+ 2 - 0
Cargo.lock

@@ -982,6 +982,7 @@ dependencies = [
  "log",
  "nostr-rs-memory",
  "nostr-rs-relayer",
+ "nostr-rs-storage-base",
  "nostr-rs-types",
  "serde_json",
  "thiserror",
@@ -1090,6 +1091,7 @@ dependencies = [
  "custom_derive",
  "enum_derive",
  "hex",
+ "once_cell",
  "rand",
  "secp256k1",
  "serde",

+ 1 - 0
crates/client/Cargo.toml

@@ -6,6 +6,7 @@ edition = "2021"
 [dependencies]
 thiserror = "1.0.40"
 nostr-rs-types = { path = "../types" }
+nostr-rs-storage-base = { path = "../storage/base" }
 tokio = { version = "1.26.0", features = ["sync", "macros", "rt", "time"] }
 tokio-tungstenite = { version = "0.18.0", features = [
     "rustls",

+ 38 - 6
crates/client/src/client.rs

@@ -1,3 +1,9 @@
+//! Client for the nostr relayer
+//!
+//! This is a simple client with reconnection logic built-in but no load balancing
+//! nor subscription.
+//!
+//! Most likely you want to use the `Pool` client instead of this one.
 use crate::{pool::DEFAULT_CHANNEL_BUFFER_SIZE, Error};
 use futures_util::{SinkExt, StreamExt};
 use nostr_rs_types::{
@@ -7,6 +13,7 @@ use nostr_rs_types::{
 };
 use std::{
     collections::HashMap,
+    pin::Pin,
     sync::{
         atomic::{AtomicBool, Ordering::Relaxed},
         Arc,
@@ -23,6 +30,9 @@ use url::Url;
 type Subscriptions = Arc<RwLock<HashMap<SubscriptionId, subscribe::Subscribe>>>;
 
 #[derive(Debug)]
+/// Active subscription
+///
+/// This must be kept in scope to keep the subscription active
 pub struct ActiveSubscription {
     id: SubscriptionId,
     subscriptions: Subscriptions,
@@ -72,18 +82,29 @@ impl Drop for Client {
 
 impl Client {
     /// Creates a new relayer
-    pub fn new(send_message_to_listener: mpsc::Sender<(Response, Url)>, url: Url) -> Self {
+    pub fn new<F>(return_to: mpsc::Sender<(Response, Url)>, url: Url, filter: F) -> Self
+    where
+        F: Fn(
+                Response,
+                Url,
+                mpsc::Sender<(Response, Url)>,
+            ) -> Pin<Box<dyn futures::Future<Output = Result<(), Error>> + Send>>
+            + Send
+            + Sync
+            + 'static,
+    {
         let (sender_to_socket, send_to_socket) = mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);
         let is_connected = Arc::new(AtomicBool::new(false));
 
         let subscriptions = Arc::new(RwLock::new(HashMap::new()));
 
         let worker = Self::spawn_background_client(
-            send_message_to_listener,
+            return_to,
             send_to_socket,
             url.clone(),
             is_connected.clone(),
             subscriptions.clone(),
+            filter,
         );
 
         Self {
@@ -100,13 +121,24 @@ impl Client {
     ///
     /// This function will return a JoinHandle that can be used to
     /// wait for the background client to finish or to cancel it.
-    fn spawn_background_client(
-        send_message_to_listener: mpsc::Sender<(Response, Url)>,
+    fn spawn_background_client<F>(
+        return_to: mpsc::Sender<(Response, Url)>,
         mut send_to_socket: mpsc::Receiver<Request>,
         url: Url,
         is_connected: Arc<AtomicBool>,
         send_on_connection: Subscriptions,
-    ) -> JoinHandle<()> {
+        filter: F,
+    ) -> JoinHandle<()>
+    where
+        F: Fn(
+                Response,
+                Url,
+                mpsc::Sender<(Response, Url)>,
+            ) -> Pin<Box<dyn futures::Future<Output = Result<(), Error>> + Send>>
+            + Send
+            + Sync
+            + 'static,
+    {
         is_connected.store(false, Relaxed);
 
         tokio::spawn(async move {
@@ -198,7 +230,7 @@ impl Client {
                             let event: Result<Response, _> = serde_json::from_str(&msg);
 
                             if let Ok(msg) = event {
-                                if let Err(error) = send_message_to_listener.try_send((msg, url.clone())) {
+                                if let Err(error) = filter(msg, url.clone(), return_to.clone()).await {
                                     log::error!("{}: Reconnecting client because of {}", url, error);
                                     break;
                                 }

+ 4 - 0
crates/client/src/error.rs

@@ -22,6 +22,10 @@ pub enum Error {
     #[error("There is no connection")]
     Disconnected,
 
+    /// Error sending message with the internal channel
+    #[error("Error sending message: {0}")]
+    InternalChannel(String),
+
     /// The pool was already splitted
     #[error("The pool was already splitted")]
     AlreadySplitted,

+ 3 - 7
crates/client/src/lib.rs

@@ -8,14 +8,10 @@
 //!
 //! It will also have reconnection logic built-in internally.
 #![deny(missing_docs, warnings)]
-mod client;
+pub mod client;
 mod error;
-mod pool;
+pub mod pool;
 
 pub use url::Url;
 
-pub use self::{
-    client::Client,
-    error::Error,
-    pool::{Pool, PoolSubscription},
-};
+pub use self::{client::Client, error::Error, pool::Pool};

+ 105 - 103
crates/client/src/pool.rs → crates/client/src/pool/mod.rs

@@ -1,30 +1,62 @@
 //! Relayers
 //!
 //! This is the main entry point to the client library.
-use crate::{client::ActiveSubscription, Client, Error};
+use crate::{client::ActiveSubscription as ClientActiveSubscription, Client, Error};
 use futures::future::join_all;
 use nostr_rs_types::{
     client::{self, subscribe},
-    types::SubscriptionId,
     Response,
 };
-use std::{collections::HashMap, sync::Arc};
+use std::{
+    collections::HashMap,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+};
 use tokio::sync::{mpsc, RwLock};
 use url::Url;
 
-type Subscriptions =
-    Arc<RwLock<HashMap<SubscriptionId, (subscribe::Subscribe, Vec<ActiveSubscription>)>>>;
+pub mod subscription;
+
+pub(crate) type AllClients =
+    Arc<RwLock<HashMap<Url, (Arc<AtomicUsize>, (ClientActiveSubscription, Client))>>>;
+
 /// Clients
 ///
 /// This is a set of outgoing connections to relayers. This struct can connect
 /// async to N relayers offering a simple API to talk to all of them at the same
 /// time, and to receive messages
-#[derive(Debug)]
 pub struct Pool {
-    clients: RwLock<HashMap<Url, Client>>,
+    clients: AllClients,
     sender: mpsc::Sender<(Response, Url)>,
     receiver: Option<mpsc::Receiver<(Response, Url)>>,
-    subscriptions: Subscriptions,
+    subscription_manager: Arc<subscription::Manager>,
+}
+
+/// Active client
+///
+/// For each connection on the pool this object will be returned. When dropped,
+/// that connection is also dropped from the connection pool.
+pub struct ActiveClient {
+    client_id: Url,
+    counter: Arc<AtomicUsize>,
+    all_clients: AllClients,
+}
+
+impl Drop for ActiveClient {
+    fn drop(&mut self) {
+        let counter = self.counter.fetch_sub(1, Ordering::SeqCst);
+        if counter == 0 {
+            let all_clients = self.all_clients.clone();
+            let client_id = self.client_id.clone();
+            tokio::spawn(async move {
+                // remove the client from the pool, when it goes out of scope
+                // it will be disconnected
+                all_clients.write().await.remove(&client_id);
+            });
+        }
+    }
 }
 
 /// Default channel buffer size for the pool
@@ -36,44 +68,25 @@ impl Default for Pool {
         Self {
             clients: Default::default(),
             receiver: Some(receiver),
-            subscriptions: Default::default(),
+            subscription_manager: Default::default(),
             sender,
         }
     }
 }
 
-/// Return a subscription that will be removed when dropped
-#[derive(Debug)]
-pub struct PoolSubscription {
-    subscription_id: SubscriptionId,
-    subscriptions: Subscriptions,
-}
-
-impl Drop for PoolSubscription {
-    fn drop(&mut self) {
-        let subscriptions = self.subscriptions.clone();
-        let subscription_id = self.subscription_id.clone();
-        tokio::spawn(async move {
-            subscriptions.write().await.remove(&subscription_id);
-        });
-    }
-}
-
 impl Pool {
     /// Creates a new instance with a list of urls
-    pub fn new_with_clients(clients: Vec<Url>) -> Self {
-        let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);
-        let clients = clients
-            .into_iter()
-            .map(|url| (url.clone(), Client::new(sender.clone(), url)))
-            .collect::<HashMap<_, _>>();
-
-        Self {
-            clients: RwLock::new(clients),
-            subscriptions: Default::default(),
-            receiver: Some(receiver),
-            sender,
-        }
+    pub fn new_with_clients(clients: Vec<Url>) -> Result<(Self, Vec<ActiveClient>), Error> {
+        let pool = Self::default();
+        let connect = clients.into_iter().map(|url| pool.connect_to(url));
+
+        futures::executor::block_on(async {
+            futures::future::join_all(connect)
+                .await
+                .into_iter()
+                .collect::<Result<Vec<_>, _>>()
+        })
+        .map(|clients| (pool, clients))
     }
 
     /// Splits the pool removing the receiver to be used in a different context
@@ -93,38 +106,15 @@ impl Pool {
 
     /// Returns the number of active subscriptions
     pub async fn active_subscriptions(&self) -> usize {
-        self.subscriptions.read().await.keys().len()
+        self.subscription_manager.total_subscribers()
     }
 
     /// Subscribes to all the connected relayers
     pub async fn subscribe(
         &self,
         subscription: subscribe::Subscribe,
-    ) -> Result<PoolSubscription, Error> {
-        let clients = self.clients.read().await;
-
-        let wait_all = clients
-            .values()
-            .map(|sender| sender.subscribe(subscription.clone()))
-            .collect::<Vec<_>>();
-
-        let subscription_id = subscription.subscription_id.clone();
-
-        self.subscriptions.write().await.insert(
-            subscription_id.clone(),
-            (
-                subscription,
-                join_all(wait_all)
-                    .await
-                    .into_iter()
-                    .collect::<Result<Vec<_>, _>>()?,
-            ),
-        );
-
-        Ok(PoolSubscription {
-            subscription_id,
-            subscriptions: self.subscriptions.clone(),
-        })
+    ) -> subscription::ActiveSubscription {
+        self.subscription_manager.subcribe(subscription, None).await
     }
 
     /// Sends a request to all the connected relayers
@@ -133,7 +123,7 @@ impl Pool {
         join_all(
             clients
                 .values()
-                .map(|sender| sender.post(request.clone()))
+                .map(|(_, (_, sender))| sender.post(request.clone()))
                 .collect::<Vec<_>>(),
         )
         .await;
@@ -145,7 +135,7 @@ impl Pool {
             .read()
             .await
             .iter()
-            .filter(|(_, client)| client.is_connected())
+            .filter(|(_, (_, (_, client)))| client.is_connected())
             .collect::<Vec<_>>()
             .len()
     }
@@ -154,22 +144,43 @@ impl Pool {
     ///
     /// This function will open a connection at most once, if a connection
     /// already exists false will be returned
-    pub async fn connect_to(&self, url: Url) {
+    pub async fn connect_to(&self, url: Url) -> Result<ActiveClient, Error> {
         let mut clients = self.clients.write().await;
-        let mut subscriptions = self.subscriptions.write().await;
 
-        if !clients.contains_key(&url) {
+        let ref_id = if let Some((id, _)) = clients.get(&url) {
+            id.fetch_add(1, Ordering::SeqCst);
+            id.clone()
+        } else {
             log::warn!("Connecting to {}", url);
-            let client = Client::new(self.sender.clone(), url.clone());
-
-            for (filter, sub) in subscriptions.values_mut() {
-                let _ = client.subscribe(filter.clone()).await.map(|subscription| {
-                    sub.push(subscription);
-                });
-            }
-
-            clients.insert(url, client);
-        }
+            let subscription_manager = self.subscription_manager.clone();
+            let client = Client::new(
+                self.sender.clone(),
+                url.clone(),
+                move |response, url, return_to| {
+                    let subscription_manager = subscription_manager.clone();
+                    Box::pin(async move {
+                        subscription_manager
+                            .process_message(response, url, return_to)
+                            .await
+                    })
+                },
+            );
+
+            // subscribe to all events
+            let meta_subscription = client
+                .subscribe(subscribe::Subscribe::to_all_events())
+                .await?;
+
+            let ref_id: Arc<AtomicUsize> = Arc::new(1.into());
+            clients.insert(url.clone(), (ref_id.clone(), (meta_subscription, client)));
+            ref_id
+        };
+
+        Ok(ActiveClient {
+            client_id: url,
+            counter: ref_id,
+            all_clients: self.clients.clone(),
+        })
     }
 }
 
@@ -199,10 +210,7 @@ mod test {
     #[tokio::test]
     async fn droppable_subscription() {
         let client_pool = Pool::default();
-        let subscription = client_pool
-            .subscribe(Default::default())
-            .await
-            .expect("valid subscription");
+        let subscription = client_pool.subscribe(Default::default()).await;
 
         assert_eq!(client_pool.active_subscriptions().await, 1);
         drop(subscription);
@@ -213,7 +221,7 @@ mod test {
     #[tokio::test]
     async fn connect_to_dummy_server() {
         let (addr, stopper) = dummy_server(0).await;
-        let client_pool = Pool::new_with_clients(vec![addr]);
+        let (client_pool, _connections) = Pool::new_with_clients(vec![addr]).expect("valid pool");
 
         assert_eq!(0, client_pool.check_active_connections().await);
 
@@ -230,13 +238,11 @@ mod test {
     #[tokio::test]
     async fn two_clients_communication() {
         let (addr, _) = dummy_server(0).await;
-        let mut client_pool1 = Pool::new_with_clients(vec![addr.clone()]);
-        let client_pool2 = Pool::new_with_clients(vec![addr]);
+        let (mut client_pool1, _c1) =
+            Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
+        let (client_pool2, _c2) = Pool::new_with_clients(vec![addr]).expect("valid pool");
 
-        let _sub1 = client_pool1
-            .subscribe(Default::default())
-            .await
-            .expect("valid subscription");
+        let _sub1 = client_pool1.subscribe(Default::default()).await;
 
         sleep(Duration::from_millis(10)).await;
 
@@ -270,13 +276,11 @@ mod test {
     #[tokio::test]
     async fn reconnect_and_resubscribe() {
         let (addr, stopper) = dummy_server(0).await;
-        let mut client_pool1 = Pool::new_with_clients(vec![addr.clone()]);
-        let client_pool2 = Pool::new_with_clients(vec![addr.clone()]);
+        let (mut client_pool1, _c1) =
+            Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
+        let (client_pool2, _c2) = Pool::new_with_clients(vec![addr.clone()]).expect("valid pool");
 
-        let _sub1 = client_pool1
-            .subscribe(Default::default())
-            .await
-            .expect("valid subscription");
+        let _sub1 = client_pool1.subscribe(Default::default()).await;
 
         sleep(Duration::from_millis(10)).await;
 
@@ -353,13 +357,11 @@ mod test {
     async fn connect_multiple_servers() {
         let (addr1, _) = dummy_server(0).await;
         let (addr2, _) = dummy_server(0).await;
-        let mut client_pool1 = Pool::new_with_clients(vec![addr1.clone(), addr2]);
-        let client_pool2 = Pool::new_with_clients(vec![addr1]);
+        let (mut client_pool1, _c1) =
+            Pool::new_with_clients(vec![addr1.clone(), addr2]).expect("valid pool");
+        let (client_pool2, _c2) = Pool::new_with_clients(vec![addr1]).expect("valid pool");
 
-        let _sub1 = client_pool1
-            .subscribe(Default::default())
-            .await
-            .expect("valid subscription");
+        let _sub1 = client_pool1.subscribe(Default::default()).await;
 
         sleep(Duration::from_millis(10)).await;
 

+ 298 - 0
crates/client/src/pool/subscription.rs

@@ -0,0 +1,298 @@
+//! Subscription manager
+use super::AllClients;
+use crate::{client, Error};
+use futures::future::join_all;
+use nostr_rs_storage_base::Index;
+use nostr_rs_types::{
+    client::subscribe::{self, is_all_events},
+    relayer,
+    types::{event::SortedFilter, Filter, Index as EventIndex, SubscriptionId},
+    Response,
+};
+use std::{
+    collections::{BTreeMap, HashSet},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+};
+use tokio::sync::{mpsc, RwLock};
+use url::Url;
+
+#[derive(Debug, Copy, Eq, PartialEq, Clone)]
+/// Subscription status
+pub enum Status {
+    /// Subscription is awaiting to be subscribed
+    Queued,
+    /// Subscribed is active
+    Subscribed,
+    /// Technically unsubscribed, and fetching future events from the relayer
+    /// from the All-Events meta subscription
+    Stale,
+}
+
+#[allow(dead_code)]
+struct Subscription {
+    /// Request to subscribe
+    subscription_request: subscribe::Subscribe,
+    /// Active subscription (in the client side), when this is Drop all clients unsubscribes
+    active_subscription: Option<Vec<client::ActiveSubscription>>,
+    /// Subscription status
+    status: Status,
+    /// Reverse index
+    ///
+    /// This is a reverse index of the filters, it is only used to update the
+    /// main shared index when this subscription is dropped.
+    reverse_index: Vec<(Filter, Vec<EventIndex>)>,
+}
+
+/// Active subscription
+///
+/// This object is responsible for keeping track of a subscription for a
+/// connection
+///
+/// This must be dropped to unsubscribe from the subscription manager
+pub struct ActiveSubscription {
+    id: (SubscriptionId, Option<Url>),
+    manager: Arc<Manager>,
+    active_subscriptions: Arc<AtomicUsize>,
+    queued_subscriptions: Arc<AtomicUsize>,
+    stale_subscriptions: Arc<AtomicUsize>,
+}
+
+impl Drop for ActiveSubscription {
+    fn drop(&mut self) {
+        let manager = self.manager.clone();
+        let id_to_remove = self.id.clone();
+        let active_subscriptions = self.active_subscriptions.clone();
+        let queued_subscriptions = self.queued_subscriptions.clone();
+        let stale_subscriptions = self.stale_subscriptions.clone();
+
+        tokio::spawn(async move {
+            let mut subscriptions = manager.subscriptions.write().await;
+            let mut indexes = manager.index.write().await;
+
+            if let Some(subscription) = subscriptions.remove(&id_to_remove) {
+                match subscription.status {
+                    Status::Subscribed => active_subscriptions.fetch_sub(1, Ordering::Relaxed),
+                    Status::Queued => queued_subscriptions.fetch_sub(1, Ordering::Relaxed),
+                    Status::Stale => stale_subscriptions.fetch_sub(1, Ordering::Relaxed),
+                };
+
+                for (_, single_indexes) in subscription.reverse_index.iter() {
+                    for index in single_indexes.iter() {
+                        indexes.remove(&(index.clone(), id_to_remove.0.clone()));
+                    }
+                }
+            }
+        });
+    }
+}
+
+/// Subscription manager
+///
+/// Clients who are added to the pool are automatically subscribed to all
+/// events, which is known as the All-Events subscription.
+///
+/// Subscriptions in the client pool are smarter than the raw subscriptions at
+/// the client level. These subscriptions will be active until an "End of
+/// stored" event is received; the client pool will unsubscribe at that point,
+/// and the All-Events subscriptions, filtered internally, will fetch future
+/// events. By doing so, only past events are queried, and there is a single
+/// stream of future events to be a good citizen with other relayers.
+#[derive(Default)]
+pub(crate) struct Manager {
+    subscriptions: RwLock<BTreeMap<(SubscriptionId, Option<Url>), Subscription>>,
+    index: RwLock<BTreeMap<(EventIndex, SubscriptionId), SortedFilter>>,
+    all_clients: AllClients,
+    active_subscriptions: Arc<AtomicUsize>,
+    queued_subscriptions: Arc<AtomicUsize>,
+    stale_subscriptions: Arc<AtomicUsize>,
+}
+
+/// Maximum number of subscriptions
+pub const MAX_SUBSCRIPTIONS: usize = 50;
+
+impl Manager {
+    /// Processes all messages from the client pools
+    ///
+    /// The client pool creates a subscription to the All-Events subscription,
+    /// this callback checks if there are any listener to this event, otherwise
+    /// it will not process the event.
+    pub async fn process_message(
+        self: &Arc<Self>,
+        message: Response,
+        url: Url,
+        return_to: mpsc::Sender<(Response, Url)>,
+    ) -> Result<(), Error> {
+        match message {
+            Response::EndOfStoredEvents(subscription_id) => {
+                let subscription_id = (subscription_id.0, None);
+                let mut subscription = self.subscriptions.write().await;
+                subscription.get_mut(&subscription_id).map(|s| {
+                    s.status = Status::Stale;
+                    let _ = s.active_subscription.take();
+
+                    self.active_subscriptions.fetch_sub(1, Ordering::Relaxed);
+                    self.stale_subscriptions.fetch_add(1, Ordering::Relaxed);
+                });
+
+                return_to
+                    .try_send((Response::EndOfStoredEvents(subscription_id.0.into()), url))
+                    .map_err(|e| Error::InternalChannel(e.to_string()))?;
+
+                return Ok(());
+            }
+            Response::Event(relayer::Event {
+                subscription_id,
+                event,
+            }) => {
+                if !is_all_events(&subscription_id) {
+                    // This is not an All-Events subscription, it must be passed on as it is an stored event
+                    return_to
+                        .try_send((
+                            Response::Event(relayer::Event {
+                                subscription_id: subscription_id,
+                                event,
+                            }),
+                            url.clone(),
+                        ))
+                        .map_err(|e| Error::InternalChannel(e.to_string()))?;
+                    return Ok(());
+                }
+
+                let index = self.index.read().await;
+                let mut matched = HashSet::new();
+                let event_index = event.get_indexes(4);
+
+                for idx in event_index {
+                    let mut start = index.range(&(idx.clone(), SubscriptionId::empty())..);
+
+                    while let Some(((current_idx, subscription_id), filter)) = start.next() {
+                        if current_idx != &idx {
+                            break;
+                        }
+
+                        if !matched.contains(subscription_id) && filter.check_event(&event) {
+                            return_to
+                                .try_send((
+                                    Response::Event(relayer::Event {
+                                        subscription_id: subscription_id.clone(),
+                                        event: event.clone(),
+                                    }),
+                                    url.clone(),
+                                ))
+                                .map_err(|e| Error::InternalChannel(e.to_string()))?;
+                            matched.insert(subscription_id.clone());
+                        }
+                    }
+                }
+
+                Ok(())
+            }
+            any_message => {
+                return_to
+                    .try_send((any_message, url))
+                    .map_err(|e| Error::InternalChannel(e.to_string()))?;
+                Ok(())
+            }
+        }
+    }
+
+    async fn update_active_subscriptions(self: &Arc<Self>) {
+        if self.active_subscriptions.load(Ordering::Relaxed) >= MAX_SUBSCRIPTIONS
+            || self.queued_subscriptions.load(Ordering::Relaxed) == 0
+        {
+            return;
+        }
+
+        let clients = self.all_clients.read().await;
+        let mut subscriptions = self.subscriptions.write().await;
+
+        for subscription in subscriptions.values_mut() {
+            if subscription.status == Status::Queued {
+                let wait_all = clients
+                    .values()
+                    .map(|(_, (_, sender))| {
+                        sender.subscribe(subscription.subscription_request.clone())
+                    })
+                    .collect::<Vec<_>>();
+
+                if let Ok(active_subscriptions) = join_all(wait_all)
+                    .await
+                    .into_iter()
+                    .collect::<Result<Vec<_>, _>>()
+                {
+                    subscription.active_subscription = Some(active_subscriptions);
+                    subscription.status = Status::Subscribed;
+
+                    let queued_subscribed =
+                        self.queued_subscriptions.fetch_sub(1, Ordering::Relaxed);
+                    let active_subscriptions =
+                        self.active_subscriptions.fetch_add(1, Ordering::Relaxed);
+                    if queued_subscribed == 0 || active_subscriptions >= MAX_SUBSCRIPTIONS {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /// Creates a new subscription with a given filters
+    pub async fn subcribe(
+        self: &Arc<Self>,
+        mut subscription_request: subscribe::Subscribe,
+        specific_url: Option<Url>,
+    ) -> ActiveSubscription {
+        let mut subscriptions = self.subscriptions.write().await;
+        let mut indexes = self.index.write().await;
+
+        let id = (subscription_request.subscription_id.clone(), specific_url);
+        let reverse_index: Vec<_> = subscription_request
+            .filters
+            .iter_mut()
+            .map(|f| {
+                let index = <&mut Filter as Into<Index>>::into(f).split();
+                (f.clone(), index)
+            })
+            .collect();
+
+        for (filter, single_indexes) in reverse_index.iter() {
+            for index in single_indexes.iter() {
+                indexes.insert((index.clone(), id.0.clone()), filter.clone().into());
+            }
+        }
+
+        subscriptions.insert(
+            id.clone(),
+            Subscription {
+                reverse_index,
+                subscription_request,
+                status: Status::Queued,
+                active_subscription: None,
+            },
+        );
+
+        self.queued_subscriptions.fetch_add(1, Ordering::Relaxed);
+
+        let this = self.clone();
+        tokio::spawn(async move {
+            this.update_active_subscriptions().await;
+        });
+
+        ActiveSubscription {
+            id,
+            manager: self.clone(),
+            active_subscriptions: self.active_subscriptions.clone(),
+            queued_subscriptions: self.queued_subscriptions.clone(),
+            stale_subscriptions: self.stale_subscriptions.clone(),
+        }
+    }
+
+    /// Total number of subscribers
+    pub fn total_subscribers(&self) -> usize {
+        self.active_subscriptions.load(Ordering::Relaxed)
+            + self.queued_subscriptions.load(Ordering::Relaxed)
+            + self.stale_subscriptions.load(Ordering::Relaxed)
+    }
+}

+ 6 - 8
crates/relayer/src/connection/mod.rs

@@ -1,6 +1,6 @@
 use crate::{subscription::ActiveSubscription, Error};
 use futures_util::{SinkExt, StreamExt};
-use nostr_rs_client::PoolSubscription;
+use nostr_rs_client::pool;
 use nostr_rs_types::{
     relayer::{ok::ROkStatus, ROk},
     types::{Addr, SubscriptionId},
@@ -51,9 +51,11 @@ impl ConnectionId {
     }
 }
 
-type CompoundSubcription = (Option<PoolSubscription>, Vec<ActiveSubscription>);
+type CompoundSubcription = (
+    Option<pool::subscription::ActiveSubscription>,
+    Vec<ActiveSubscription>,
+);
 
-#[derive(Debug)]
 /// Relayer connection
 ///
 /// The new connection struct. This struct spawn's a new worker that handles
@@ -195,11 +197,7 @@ impl Connection {
     }
 
     /// Create a subscription for this connection
-    pub async fn subscribe(
-        &self,
-        id: SubscriptionId,
-        subscriptions: (Option<PoolSubscription>, Vec<ActiveSubscription>),
-    ) {
+    pub async fn subscribe(&self, id: SubscriptionId, subscriptions: CompoundSubcription) {
         self.subscriptions.write().await.insert(id, subscriptions);
     }
 

+ 26 - 34
crates/relayer/src/relayer.rs

@@ -34,7 +34,7 @@ pub struct Relayer<T: Storage + Send + Sync + 'static> {
     /// be able to perform any optimization like prefetching content while offline
     storage: Option<T>,
     /// Subscription manager
-    subscriptions: Arc<SubscriptionManager>,
+    subscription_manager: Arc<SubscriptionManager>,
     /// List of all active connections
     connections: RwLock<HashMap<ConnectionId, Connection>>,
     /// This Sender can be used to send requests from anywhere to the relayer.
@@ -68,7 +68,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
         let (sender, receiver) = channel(100_000);
         Ok(Self {
             storage,
-            subscriptions: Default::default(),
+            subscription_manager: Default::default(),
             send_to_relayer: sender.clone(),
             relayer_receiver: Some(receiver),
             connections: Default::default(),
@@ -83,13 +83,13 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     /// Connects to the relayer pool
     pub async fn connect_to_relayer(&self, url: Url) -> Result<(), Error> {
         let (client_pool, _) = self.client_pool.as_ref().ok_or(Error::NoClient)?;
-        client_pool.connect_to(url).await;
+        let _ = client_pool.connect_to(url).await;
         Ok(())
     }
 
     /// Total number of subscribers requests that actively listening for new events
     pub fn total_subscribers(&self) -> usize {
-        self.subscriptions.total_subscribers()
+        self.subscription_manager.total_subscribers()
     }
 
     /// Splits the relayer object and extract their receiver.
@@ -255,7 +255,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                 {
                     // pass the subscription request to the pool of clients, so this relayer
                     // can relay any unknown event to the clients through their subscriptions
-                    Some(client_pool.subscribe(request.clone()).await?)
+                    Some(client_pool.subscribe(request.clone()).await)
                 } else {
                     None
                 };
@@ -290,7 +290,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                         request.subscription_id.clone(),
                         (
                             foreign_subscription,
-                            self.subscriptions
+                            self.subscription_manager
                                 .subscribe(
                                     connection.get_conn_id(),
                                     connection.get_sender(),
@@ -318,7 +318,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
             }
         }
 
-        self.subscriptions.broadcast(event.clone());
+        self.subscription_manager.broadcast(event.clone());
         Ok(true)
     }
 }
@@ -937,24 +937,20 @@ mod test {
         let (relayer1, _) = dummy_server(0, None).await;
         let (relayer2, _) = dummy_server(0, None).await;
         let (relayer3, _) = dummy_server(0, None).await;
-        let (main_relayer, _) = dummy_server(
-            0,
-            Some(Pool::new_with_clients(vec![
-                relayer1.clone(),
-                relayer2.clone(),
-                relayer3.clone(),
-            ])),
-        )
-        .await;
 
-        let mut reader_client =
-            Pool::new_with_clients(vec![relayer1.clone(), relayer2.clone(), relayer3.clone()]);
-        let main_client = Pool::new_with_clients(vec![main_relayer]);
+        let (pool, _in_scope) =
+            Pool::new_with_clients(vec![relayer1.clone(), relayer2.clone(), relayer3.clone()])
+                .expect("valid pool");
 
-        let _sub = reader_client
-            .subscribe(Default::default())
-            .await
-            .expect("valid subscription");
+        let (main_relayer, _) = dummy_server(0, Some(pool)).await;
+
+        let (mut reader_client, _reader_client_inscope) =
+            Pool::new_with_clients(vec![relayer1.clone(), relayer2.clone(), relayer3.clone()])
+                .expect("valid pool");
+        let (main_client, _main_client_inscope) =
+            Pool::new_with_clients(vec![main_relayer]).expect("valid pool");
+
+        let _sub = reader_client.subscribe(Default::default()).await;
 
         sleep(Duration::from_millis(20)).await;
 
@@ -1015,21 +1011,17 @@ mod test {
     async fn relayer_with_client_pool() {
         let (relayer1, _) = dummy_server(0, None).await;
         let (relayer2, _) = dummy_server(0, None).await;
-        let (main_relayer, _) = dummy_server(
-            0,
-            Some(Pool::new_with_clients(vec![relayer1.clone(), relayer2])),
-        )
-        .await;
+        let (pool, _in_scope) =
+            Pool::new_with_clients(vec![relayer1.clone(), relayer2]).expect("valid pool");
+        let (main_relayer, _) = dummy_server(0, Some(pool)).await;
 
-        let secondary_client = Pool::new_with_clients(vec![relayer1]);
+        let (secondary_client, _sc) = Pool::new_with_clients(vec![relayer1]).expect("valid client");
 
         // Create a subscription in the main relayer, main_client is only
         // connected to the main relayer
-        let mut main_client = Pool::new_with_clients(vec![main_relayer]);
-        let _sub = main_client
-            .subscribe(Default::default())
-            .await
-            .expect("valid subscription");
+        let (mut main_client, _in_scope) =
+            Pool::new_with_clients(vec![main_relayer]).expect("valid client");
+        let _sub = main_client.subscribe(Default::default()).await;
 
         sleep(Duration::from_millis(10)).await;
         assert!(main_client

+ 13 - 26
crates/relayer/src/subscription/manager.rs

@@ -1,8 +1,11 @@
 use crate::connection::ConnectionId;
-use nostr_rs_storage_base::{EventFilter, Index, SingleIndex};
+use nostr_rs_storage_base::Index;
 use nostr_rs_types::{
     client::Subscribe,
-    types::{Event, SubscriptionId},
+    types::{
+        event::{Event, Index as EventIndex, SortedFilter},
+        SubscriptionId,
+    },
     Response,
 };
 use std::{
@@ -14,7 +17,7 @@ use std::{
 };
 use tokio::sync::{mpsc::Sender, RwLock};
 
-type SubIdx = (SingleIndex, ConnectionId, SubscriptionId);
+type SubIdx = (EventIndex, ConnectionId, SubscriptionId);
 
 pub const MIN_PREFIX_MATCH_LEN: usize = 2;
 
@@ -27,7 +30,7 @@ pub const MIN_PREFIX_MATCH_LEN: usize = 2;
 pub struct ActiveSubscription {
     conn_id: ConnectionId,
     name: SubscriptionId,
-    indexes: Vec<SingleIndex>,
+    indexes: Vec<EventIndex>,
     manager: Arc<SubscriptionManager>,
 }
 
@@ -65,7 +68,7 @@ impl Drop for ActiveSubscription {
     }
 }
 
-type SubscriptionValue = (Arc<EventFilter>, Sender<Response>);
+type SubscriptionValue = (Arc<SortedFilter>, Sender<Response>);
 
 /// Subscription manager
 ///
@@ -104,22 +107,6 @@ impl SubscriptionManager {
         self.total_subscribers.fetch_sub(1, Ordering::Relaxed);
     }
 
-    fn get_keys_from_event(event: &Event, _min_prefix_match_len: usize) -> Vec<SingleIndex> {
-        let mut subscriptions = vec![];
-
-        subscriptions.push(SingleIndex::Author(event.author().to_owned()));
-        subscriptions.push(SingleIndex::Id(event.id.to_owned()));
-
-        for t in event.tags() {
-            t.get_indexable_value()
-                .map(|v| subscriptions.push(SingleIndex::Tag(t.get_identifier().to_owned(), v)));
-        }
-
-        subscriptions.push(SingleIndex::Kind(event.kind()));
-        subscriptions.push(SingleIndex::AllUpdates);
-        subscriptions
-    }
-
     /// Get the number of subscribers
     pub fn total_subscribers(self: &Arc<Self>) -> usize {
         self.total_subscribers.load(Ordering::Relaxed)
@@ -142,7 +129,7 @@ impl SubscriptionManager {
             .into_iter()
             .map(|mut filter| {
                 let index: Index = (&mut filter).into();
-                let filter = Arc::new(EventFilter::from(filter));
+                let filter = Arc::new(SortedFilter::from(filter));
                 let subscription =
                     ActiveSubscription::new(conn_id, name.clone(), index, self.clone());
 
@@ -165,18 +152,18 @@ impl SubscriptionManager {
         let this = self.clone();
         tokio::spawn(async move {
             let subscriptions = this.subscriptions.read().await;
-            let subs = Self::get_keys_from_event(&event, this.min_prefix_match_len);
+            let indexes = event.get_indexes(this.min_prefix_match_len);
             let mut deliverded = HashSet::new();
 
-            for sub in subs {
+            for index in indexes {
                 for ((sub_type, client, name), (filter, sender)) in subscriptions.range(
                     &(
-                        sub.clone(),
+                        index.clone(),
                         ConnectionId::new_empty(),
                         SubscriptionId::empty(),
                     )..,
                 ) {
-                    if sub_type != &sub {
+                    if sub_type != &index {
                         break;
                     }
 

+ 3 - 3
crates/storage/base/src/cursor.rs

@@ -1,6 +1,6 @@
-use crate::{event_filter::EventFilter, Error};
+use crate::Error;
 use futures::FutureExt;
-use nostr_rs_types::types::{Event, Filter};
+use nostr_rs_types::types::{event::SortedFilter, Event, Filter};
 use std::{
     future::Future,
     pin::Pin,
@@ -20,7 +20,7 @@ pub type FutureResult<'a> = Pin<Box<dyn Future<Output = Result<Option<Event>, Er
 
 pub fn check_future_call(
     future_event: &mut Option<FutureResult<'_>>,
-    filter: &Option<EventFilter>,
+    filter: &Option<SortedFilter>,
     cx: &mut Context<'_>,
 ) -> FutureValue {
     if let Some(mut inner_future) = future_event.take() {

+ 7 - 16
crates/storage/base/src/index.rs

@@ -5,7 +5,7 @@
 //! indexes by default.
 //!
 //! Each storage engine can use this implementation or have their own
-use nostr_rs_types::types::{filter::TagValue, Addr, Filter, Id, Kind};
+use nostr_rs_types::types::{filter::TagValue, Addr, Filter, Id, Index as EventIndex, Kind};
 use std::collections::HashSet;
 
 /// Indexes for the storage engine.
@@ -25,29 +25,20 @@ pub enum Index {
 
 impl Index {
     /// Splits the index into a list of single indexes.
-    pub fn split(self) -> Vec<SingleIndex> {
+    pub fn split(self) -> Vec<EventIndex> {
         match self {
             Index::Tag(tag, tags) => tags
                 .into_iter()
-                .map(|tag_value| SingleIndex::Tag(tag.clone(), tag_value))
+                .map(|tag_value| EventIndex::Tag(tag.clone(), tag_value))
                 .collect(),
-            Index::Id(ids) => ids.into_iter().map(SingleIndex::Id).collect(),
-            Index::Author(authors) => authors.into_iter().map(SingleIndex::Author).collect(),
-            Index::Kind(kinds) => kinds.into_iter().map(SingleIndex::Kind).collect(),
-            Index::TableScan => vec![SingleIndex::AllUpdates],
+            Index::Id(ids) => ids.into_iter().map(EventIndex::Id).collect(),
+            Index::Author(authors) => authors.into_iter().map(EventIndex::Author).collect(),
+            Index::Kind(kinds) => kinds.into_iter().map(EventIndex::Kind).collect(),
+            Index::TableScan => vec![EventIndex::Anything],
         }
     }
 }
 
-#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
-pub enum SingleIndex {
-    Author(Id),
-    Id(Id),
-    Kind(Kind),
-    Tag(String, TagValue),
-    AllUpdates,
-}
-
 impl From<&mut Filter> for Index {
     fn from(query: &mut Filter) -> Self {
         if !query.ids.is_empty() {

+ 1 - 8
crates/storage/base/src/lib.rs

@@ -6,7 +6,6 @@
 #![allow(dead_code)]
 pub mod cursor;
 mod error;
-mod event_filter;
 mod index;
 mod secondary_index;
 mod storage;
@@ -17,13 +16,7 @@ pub mod test;
 #[cfg(feature = "test")]
 pub use tokio;
 
-pub use crate::{
-    error::Error,
-    event_filter::*,
-    index::{Index, SingleIndex},
-    secondary_index::SecondaryIndex,
-    storage::Storage,
-};
+pub use crate::{error::Error, index::Index, secondary_index::SecondaryIndex, storage::Storage};
 
 #[macro_export]
 /// This macro creates the

+ 3 - 3
crates/storage/memory/src/cursor.rs

@@ -2,9 +2,9 @@ use crate::Memory;
 use futures::Stream;
 use nostr_rs_storage_base::{
     cursor::{check_future_call, FutureResult, FutureValue},
-    Error, EventFilter, Storage,
+    Error, Storage,
 };
-use nostr_rs_types::types::Event;
+use nostr_rs_types::types::{event::SortedFilter, Event};
 use std::{
     collections::{BTreeMap, VecDeque},
     pin::Pin,
@@ -14,7 +14,7 @@ use tokio::sync::RwLockReadGuard;
 
 pub struct Cursor<'a> {
     pub db: &'a Memory,
-    pub filter: Option<EventFilter>,
+    pub filter: Option<SortedFilter>,
     pub limit: Option<usize>,
     pub returned: usize,
     pub index: RwLockReadGuard<'a, BTreeMap<Vec<u8>, Vec<u8>>>,

+ 3 - 3
crates/storage/memory/src/lib.rs

@@ -1,5 +1,5 @@
-use nostr_rs_storage_base::{Error, EventFilter, Index, SecondaryIndex, Storage};
-use nostr_rs_types::types::{Event, Filter};
+use nostr_rs_storage_base::{Error, Index, SecondaryIndex, Storage};
+use nostr_rs_types::types::{event::SortedFilter, Event, Filter};
 use std::{
     cmp::min,
     collections::{BTreeMap, VecDeque},
@@ -176,7 +176,7 @@ impl Storage for Memory {
             ),
         };
 
-        let filter: EventFilter = query.into();
+        let filter: SortedFilter = query.into();
 
         Ok(Self::Cursor {
             db: self,

+ 4 - 4
crates/storage/rocksdb/src/cursor.rs

@@ -3,9 +3,9 @@ use crate::{ReferenceType, RocksDb};
 use futures::Stream;
 use nostr_rs_storage_base::{
     cursor::{check_future_call, FutureResult, FutureValue},
-    Error, EventFilter, Storage,
+    Error, Storage,
 };
-use nostr_rs_types::types::Event;
+use nostr_rs_types::types::{event::SortedFilter, Event};
 use rocksdb::{DBIteratorWithThreadMode, DB};
 use std::{
     collections::VecDeque,
@@ -24,7 +24,7 @@ pub struct Cursor<'a> {
     /// is given each events from the secondary index will be returned,
     /// otherwise the events will be filtered by the given filter, and only
     /// those events that comply will be returned
-    filter: Option<EventFilter>,
+    filter: Option<SortedFilter>,
     /// Reference to the namespace to use to query the secondary index. If none
     /// is given the secondary_index_iterator must be constructed outside this
     /// wrapper.
@@ -50,7 +50,7 @@ impl<'a> Cursor<'a> {
         db: &'a RocksDb,
         index: Option<ReferenceType>,
         prefixes: Vec<Vec<u8>>,
-        filter: Option<EventFilter>,
+        filter: Option<SortedFilter>,
         secondary_index_iterator: Option<DBIteratorWithThreadMode<'a, DB>>,
         limit: Option<usize>,
     ) -> Self {

+ 1 - 0
crates/types/Cargo.toml

@@ -12,6 +12,7 @@ chrono = "0.4.23"
 custom_derive = "0.1.7"
 enum_derive = "0.1.7"
 hex = "0.4"
+once_cell = "1.19.0"
 rand = "0.8.5"
 secp256k1 = { version = "0.26.0", features = [
     "global-context",

+ 32 - 0
crates/types/src/client/subscribe.rs

@@ -2,9 +2,18 @@
 //!
 //! Used to request events and subscribe to new updates.
 use crate::{common::SerializeDeserialize, types};
+use hex::ToHex;
+use once_cell::sync::Lazy;
+use rand::RngCore;
 use serde_json::Value;
 use std::collections::VecDeque;
 
+static ALL_EVENTS_PREFIX: Lazy<String> = Lazy::new(|| {
+    let mut prefix = [0u8; 4];
+    rand::thread_rng().fill_bytes(&mut prefix);
+    prefix.encode_hex::<String>()
+});
+
 /// Request: used to request events and subscribe to new updates.
 ///
 /// More details at https://github.com/nostr-protocol/nips/blob/master/01.md#communication-between-clients-and-relays
@@ -18,6 +27,22 @@ pub struct Subscribe {
     pub filters: Vec<types::Filter>,
 }
 
+/// Checks if the subscription ID is for all events
+pub fn is_all_events(subscription_id: &types::SubscriptionId) -> bool {
+    subscription_id.starts_with(&*ALL_EVENTS_PREFIX)
+}
+
+impl Subscribe {
+    /// Creates a new subscription with a random ID to subscribe to all events
+    pub fn to_all_events() -> Self {
+        Self {
+            subscription_id: types::SubscriptionId::with_prefix(&ALL_EVENTS_PREFIX)
+                .expect("valid subscription id"),
+            filters: vec![Default::default()],
+        }
+    }
+}
+
 impl From<types::Filter> for Subscribe {
     fn from(filter: types::Filter) -> Self {
         Self {
@@ -127,4 +152,11 @@ mod test {
         assert_eq!(r.subscription_id, obj.subscription_id);
         assert_eq!(r.filters.len(), obj.filters.len());
     }
+
+    #[test]
+    fn test_subscribe_all() {
+        let x = Subscribe::to_all_events();
+        assert!(is_all_events(&x.subscription_id));
+        assert!(!is_all_events(&Default::default()));
+    }
 }

+ 6 - 0
crates/types/src/relayer/eose.rs

@@ -16,6 +16,12 @@ use std::{collections::VecDeque, ops::Deref};
 #[derive(Clone, PartialEq, Eq, Debug)]
 pub struct EndOfStoredEvents(pub SubscriptionId);
 
+impl From<SubscriptionId> for EndOfStoredEvents {
+    fn from(value: SubscriptionId) -> Self {
+        Self(value)
+    }
+}
+
 impl Deref for EndOfStoredEvents {
     type Target = SubscriptionId;
 

+ 9 - 6
crates/storage/base/src/event_filter.rs → crates/types/src/types/event/filter.rs

@@ -1,12 +1,15 @@
+use crate::types::{filter::TagValue, Event, Filter as FilterT, Kind};
 use chrono::{DateTime, Utc};
-use nostr_rs_types::types::{filter::TagValue, Addr, Event, Filter, Kind, Tag};
 use std::{
     collections::{HashMap, HashSet},
     ops::Deref,
 };
 
 #[derive(Debug)]
-pub struct EventFilter {
+/// Event Sorted Filter
+///
+/// It is an internal representation of the filter, used for quick memory matching
+pub struct SortedFilter {
     ids: HashSet<[u8; 32]>,
     authors: HashSet<[u8; 32]>,
     tags: HashMap<String, HashSet<TagValue>>,
@@ -15,8 +18,8 @@ pub struct EventFilter {
     until: Option<DateTime<Utc>>,
 }
 
-impl From<Filter> for EventFilter {
-    fn from(query: Filter) -> Self {
+impl From<FilterT> for SortedFilter {
+    fn from(query: FilterT) -> Self {
         let authors = query
             .authors
             .into_iter()
@@ -37,7 +40,7 @@ impl From<Filter> for EventFilter {
             .map(|id| (*id))
             .collect::<HashSet<_>>();
 
-        EventFilter {
+        SortedFilter {
             ids,
             authors,
             kinds,
@@ -48,7 +51,7 @@ impl From<Filter> for EventFilter {
     }
 }
 
-impl EventFilter {
+impl SortedFilter {
     /// Returns true if the filter is empty, meaning that it will match any event.
     pub fn is_empty(&self) -> bool {
         self.ids.is_empty()

+ 18 - 0
crates/types/src/types/event/index.rs

@@ -0,0 +1,18 @@
+use crate::types::{filter::TagValue, Id, Kind};
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
+/// Events can be indexed by different fields.
+///
+/// This index is a multi-value index, meaning that it can be indexed by multiple fields
+pub enum Index {
+    /// Author ID
+    Author(Id),
+    /// Note ID
+    Id(Id),
+    /// Note Kind
+    Kind(Kind),
+    /// Tag
+    Tag(String, TagValue),
+    /// A catch-all index
+    Anything,
+}

+ 22 - 0
crates/types/src/types/event.rs → crates/types/src/types/event/mod.rs

@@ -13,6 +13,11 @@ use serde_json::json;
 use sha2::{Digest, Sha256};
 use thiserror::Error;
 
+mod filter;
+mod index;
+
+pub use self::{filter::SortedFilter, index::Index};
+
 /// Errors
 #[derive(Error, Debug)]
 pub enum Error {
@@ -168,6 +173,23 @@ impl Event {
         })
     }
 
+    /// Return indexes from the event
+    pub fn get_indexes(&self, _min_prefix_match_len: usize) -> Vec<Index> {
+        let mut subscriptions = vec![];
+
+        subscriptions.push(Index::Author(self.author().to_owned()));
+        subscriptions.push(Index::Id(self.id.to_owned()));
+
+        for t in self.tags() {
+            t.get_indexable_value()
+                .map(|v| subscriptions.push(Index::Tag(t.get_identifier().to_owned(), v)));
+        }
+
+        subscriptions.push(Index::Kind(self.kind()));
+        subscriptions.push(Index::Anything);
+        subscriptions
+    }
+
     /// Returns the kind of this event
     pub fn kind(&self) -> Kind {
         self.inner.kind

+ 1 - 1
crates/types/src/types/mod.rs

@@ -76,7 +76,7 @@ pub(crate) mod ts_seconds {
 pub use self::{
     addr::Addr,
     content::Content,
-    event::{Event, UnsignedEvent},
+    event::{Event, Index, UnsignedEvent},
     filter::Filter,
     id::Id,
     kind::Kind,

+ 15 - 0
crates/types/src/types/subscription_id.rs

@@ -31,6 +31,21 @@ impl SubscriptionId {
     pub fn empty() -> Self {
         Self("".to_owned())
     }
+
+    /// Creates a new subscription ID with a prefix
+    ///
+    /// Prefixes are meaningless in this context but they may be meaningful in
+    /// other contexts, such as the clients or client pools.
+    pub fn with_prefix(prefix: &str) -> Result<Self, Error> {
+        if prefix.as_bytes().len() > 30 {
+            return Err(Error::TooLong);
+        }
+        let mut data = [0u8; 32];
+        rand::thread_rng().fill_bytes(&mut data);
+        let suffix = data.encode_hex::<String>();
+
+        Ok(Self(format!("{}{}", prefix, &suffix[prefix.len()..])))
+    }
 }
 
 impl Deref for SubscriptionId {