#9 Reimplement filter by index

Merged
cesar merged 1 commit from cesar/tag-indexing into cesar/main 2 months ago

+ 13 - 90
crates/relayer/src/relayer.rs

@@ -7,7 +7,11 @@ use futures_util::StreamExt;
 use nostr_rs_client::{Error as ClientError, Pool};
 use nostr_rs_storage_base::Storage;
 use nostr_rs_types::{relayer, types::Event, Request, Response};
-use std::{collections::HashMap, ops::Deref, sync::Arc};
+use std::{
+    collections::{HashMap, HashSet},
+    ops::Deref,
+    sync::Arc,
+};
 use tokio::{
     net::{TcpListener, TcpStream},
     sync::mpsc::{channel, Receiver, Sender},
@@ -227,11 +231,16 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                 };
 
                 if let Some(storage) = self.storage.as_ref() {
+                    let mut sent = HashSet::new();
                     // Send all events stored in our database that match the filter
                     for filter in request.filters.clone().into_iter() {
                         let mut result = storage.get_by_filter(filter).await?;
 
                         while let Some(Ok(event)) = result.next().await {
+                            if sent.contains(&event.id) {
+                                continue;
+                            }
+                            sent.insert(event.id.clone());
                             let _ = connection.send(
                                 relayer::Event {
                                     subscription_id: request.subscription_id.clone(),
@@ -441,29 +450,6 @@ mod test {
                 .to_string()
         );
 
-        // ev3 (again)
-        assert_eq!(
-            "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9",
-            recv.try_recv()
-                .expect("valid")
-                .as_event()
-                .expect("event")
-                .event
-                .id
-                .to_string()
-        );
-        // ev2 (again)
-        assert_eq!(
-            "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
-            recv.try_recv()
-                .expect("valid")
-                .as_event()
-                .expect("event")
-                .event
-                .id
-                .to_string()
-        );
-
         // eod
         assert!(recv
             .try_recv()
@@ -639,77 +625,13 @@ mod test {
     }
 
     #[tokio::test]
-    async fn subscribe_partial_key() {
-        let request: Request = serde_json::from_value(json!([
-            "REQ",
-            "1298169700973717",
-            {
-                "authors":["a42007e33c"],
-                "since":1681939304
-            }
-        ]))
-        .expect("valid object");
-
-        let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
-        let (connection, mut recv) = Connection::new_local_connection();
-
-        assert_eq!(relayer.total_subscribers(), 0);
-        let _ = relayer
-            .process_request_from_client(&connection, request)
-            .await;
-
-        assert_eq!(relayer.total_subscribers(), 1);
-
-        // eod
-        assert!(recv
-            .try_recv()
-            .expect("valid")
-            .as_end_of_stored_events()
-            .is_some());
-
-        // It is empty
-        assert!(recv.try_recv().is_err());
-
-        relayer
-            .process_request_from_client(&connection, get_note())
-            .await
-            .expect("process event");
-
-        sleep(Duration::from_millis(100)).await;
-
-        // It is not empty
-        let msg = recv.try_recv();
-        assert!(msg.is_ok());
-        assert_eq!(
-            msg.expect("is ok")
-                .as_event()
-                .expect("valid")
-                .subscription_id
-                .to_string(),
-            "1298169700973717".to_owned()
-        );
-
-        // it must be deliverd at most once
-        assert!(recv.try_recv().is_err());
-        assert_eq!(relayer.total_subscribers(), 1);
-
-        // when client is dropped, the subscription is removed
-        // automatically
-        drop(connection);
-
-        sleep(Duration::from_millis(10)).await;
-
-        assert_eq!(relayer.total_subscribers(), 0);
-    }
-
-    #[tokio::test]
     async fn multiple_subcribers() {
         let req1: Request = serde_json::from_value(json!(["REQ", "1298169700973717", {
-            "authors":["c42007e33c"],
+            "authors":["a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb4"],
         }]))
         .expect("valid object");
         let req2: Request = serde_json::from_value(json!(["REQ", "1298169700973717", {
-           "authors":["a42007e33c"]
+           "authors":["a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3"]
         }]))
         .expect("valid object");
 
@@ -778,6 +700,7 @@ mod test {
 
         for (_, recv) in set2.iter_mut() {
             let msg = recv.try_recv();
+            println!("{:?}", msg);
             assert!(msg.is_ok());
             let msg = msg.expect("msg");
 

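The relayer change above keeps a `HashSet` of already-sent event ids so that overlapping filters in a single REQ do not deliver the same stored event twice. A minimal standalone sketch of that dedup pattern, with plain strings standing in for the event id type used in the real code:

```rust
use std::collections::HashSet;

// Deduplicate events coming from several per-filter result streams.
// `String` stands in for the event id; the real code keys on `event.id`.
fn dedup_across_filters(batches: Vec<Vec<String>>) -> Vec<String> {
    let mut sent = HashSet::new();
    let mut out = Vec::new();
    for batch in batches {
        for id in batch {
            // `insert` returns false when the id was already delivered.
            if sent.insert(id.clone()) {
                out.push(id);
            }
        }
    }
    out
}
```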
+ 0 - 215
crates/relayer/src/subscription/filter.rs

@@ -1,215 +0,0 @@
-use nostr_rs_types::types::{Event, Filter, Kind, Tag};
-use std::collections::BTreeSet;
-
-/// The subscription keys are used to quickly identify the subscriptions
-/// by one or more fields. Think of it like a database index
-#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Hash)]
-#[allow(clippy::enum_variant_names)]
-pub(crate) enum Key {
-    /// Key for the author field
-    Author(Vec<u8>, Option<Kind>),
-    /// Key for the reference to an event
-    RefId(Vec<u8>, Option<Kind>),
-    /// Key for the reference to a public key
-    RefPublicKey(Vec<u8>, Option<Kind>),
-    /// Key for the kind field
-    Id(Vec<u8>),
-    /// Key for the kind field
-    Kind(Kind),
-    /// Any value, for a catch all
-    AllUpdates,
-}
-
-type SortedSet<T> = Option<BTreeSet<T>>;
-
-/// Sorted filter
-///
-/// This is a pre-processed filter that is optimized for fast lookups.
-#[derive(Debug, Clone)]
-pub struct SortedFilter {
-    ids: SortedSet<Vec<u8>>,
-    authors: SortedSet<Vec<u8>>,
-    kinds: SortedSet<Kind>,
-    references_to_event: SortedSet<Vec<u8>>,
-    references_to_public_key: SortedSet<Vec<u8>>,
-}
-
-impl From<Filter> for SortedFilter {
-    /// Converts a Filter into a SortedFilter.
-    ///
-    /// SortedFilters have SortedSets that are optimized for fast lookups.
-    fn from(filter: Filter) -> Self {
-        Self {
-            ids: (!filter.ids.is_empty()).then(|| {
-                filter
-                    .ids
-                    .into_iter()
-                    .map(|x| x.as_ref().to_vec())
-                    .collect()
-            }),
-            authors: (!filter.authors.is_empty()).then(|| {
-                filter
-                    .authors
-                    .into_iter()
-                    .map(|x| x.as_ref().to_vec())
-                    .collect()
-            }),
-            kinds: (!filter.kinds.is_empty()).then(|| filter.kinds.into_iter().collect()),
-            references_to_event: (!filter.references_to_event.is_empty()).then(|| {
-                filter
-                    .references_to_event
-                    .into_iter()
-                    .map(|x| x.as_ref().to_vec())
-                    .collect()
-            }),
-            references_to_public_key: (!filter.references_to_public_key.is_empty()).then(|| {
-                filter
-                    .references_to_public_key
-                    .into_iter()
-                    .map(|x| x.as_ref().to_vec())
-                    .collect()
-            }),
-        }
-    }
-}
-
-impl SortedFilter {
-    /// Get the keys for the filter
-    ///
-    /// Get the keys or indexes for the filter. This is used to quickly identify potential matches
-    /// from a set of event listeners.
-    pub fn keys(&self) -> Vec<Key> {
-        let authors = self
-            .authors
-            .as_ref()
-            .map_or_else(std::vec::Vec::new, |x| x.iter().cloned().collect());
-
-        let ids = self
-            .ids
-            .as_ref()
-            .map_or_else(std::vec::Vec::new, |x| x.iter().cloned().collect());
-
-        let references_to_event = self
-            .references_to_event
-            .as_ref()
-            .map_or_else(std::vec::Vec::new, |x| x.iter().cloned().collect());
-
-        let references_to_public_key = self
-            .references_to_public_key
-            .as_ref()
-            .map_or_else(std::vec::Vec::new, |x| x.iter().cloned().collect());
-
-        let kinds = self
-            .kinds
-            .as_ref()
-            .map_or_else(std::vec::Vec::new, |x| x.iter().copied().collect());
-
-        let kind_option = if kinds.is_empty() {
-            vec![None]
-        } else {
-            kinds.clone().into_iter().map(Some).collect()
-        };
-
-        let keys = [
-            authors
-                .into_iter()
-                .map(|author| {
-                    kind_option
-                        .iter()
-                        .map(|kind| Key::Author(author.clone(), *kind))
-                        .collect::<Vec<_>>()
-                })
-                .collect::<Vec<_>>(),
-            references_to_event
-                .into_iter()
-                .map(|event| {
-                    kind_option
-                        .iter()
-                        .map(|kind| Key::RefId(event.clone(), *kind))
-                        .collect::<Vec<_>>()
-                })
-                .collect::<Vec<_>>(),
-            references_to_public_key
-                .into_iter()
-                .map(|pub_key| {
-                    kind_option
-                        .iter()
-                        .map(|kind| Key::RefPublicKey(pub_key.clone(), *kind))
-                        .collect::<Vec<_>>()
-                })
-                .collect::<Vec<_>>(),
-            vec![kinds.into_iter().map(Key::Kind).collect::<Vec<_>>()],
-            vec![ids.into_iter().map(Key::Id).collect::<Vec<_>>()],
-        ]
-        .concat()
-        .concat();
-
-        if keys.is_empty() {
-            vec![Key::AllUpdates]
-        } else {
-            keys
-        }
-    }
-
-    /// Checks if a given key exists in the sorted set, either as a whole or as a partial match
-    #[inline]
-    fn has_key_or_partial_match<T: AsRef<[u8]>>(id: T, ids: &SortedSet<Vec<u8>>) -> bool {
-        if let Some(ids) = ids.as_ref() {
-            let id = id.as_ref().to_vec();
-            if ids.contains(&id) {
-                return true;
-            }
-
-            for len in 4..=id.len() {
-                let prev_id = &id[..len].to_vec();
-                if ids.contains(prev_id) {
-                    return true;
-                }
-            }
-            false
-        } else {
-            true
-        }
-    }
-
-    /// Checks if any of key given set exists in the sorted set, either as a whole or as a partial match
-    #[inline]
-    fn has_any_key_or_partial_match<T: Iterator<Item = Vec<u8>>>(
-        id_set: T,
-        ids: &SortedSet<Vec<u8>>,
-    ) -> bool {
-        if ids.is_some() {
-            for id in id_set {
-                if Self::has_key_or_partial_match(id, ids) {
-                    return true;
-                }
-            }
-            false
-        } else {
-            true
-        }
-    }
-
-    /// Checks if the event matches the filter
-    pub fn check(&self, event: &Event) -> bool {
-        self.kinds
-            .as_ref()
-            .map_or(true, |kinds| kinds.contains(&event.kind()))
-            && Self::has_key_or_partial_match(&event.id, &self.ids)
-            && Self::has_key_or_partial_match(event.author(), &self.authors)
-            && Self::has_any_key_or_partial_match(
-                event.tags().iter().filter_map(|f| match f {
-                    Tag::Event(x) => Some(x.id.as_ref().to_vec()),
-                    _ => None,
-                }),
-                &self.references_to_event,
-            )
-            && Self::has_any_key_or_partial_match(
-                event.tags().iter().filter_map(|f| match f {
-                    Tag::PubKey(x) => Some(x.id.as_ref().to_vec()),
-                    _ => None,
-                }),
-                &self.references_to_public_key,
-            )
-    }
-}

+ 25 - 48
crates/relayer/src/subscription/manager.rs

@@ -1,12 +1,12 @@
-use super::filter::{Key, SortedFilter};
 use crate::connection::ConnectionId;
+use nostr_rs_storage_base::{EventFilter, Index, SingleIndex};
 use nostr_rs_types::{
     client::Subscribe,
     types::{Event, SubscriptionId},
     Response,
 };
 use std::{
-    collections::{BTreeMap, BTreeSet},
+    collections::{BTreeMap, HashSet},
     sync::{
         atomic::{AtomicUsize, Ordering},
         Arc,
@@ -14,7 +14,7 @@ use std::{
 };
 use tokio::sync::{mpsc::Sender, RwLock};
 
-type SubIdx = (Key, ConnectionId, SubscriptionId);
+type SubIdx = (SingleIndex, ConnectionId, SubscriptionId);
 
 pub const MIN_PREFIX_MATCH_LEN: usize = 2;
 
@@ -27,7 +27,7 @@ pub const MIN_PREFIX_MATCH_LEN: usize = 2;
 pub struct ActiveSubscription {
     conn_id: ConnectionId,
     name: SubscriptionId,
-    keys: Vec<Key>,
+    indexes: Vec<SingleIndex>,
     manager: Arc<SubscriptionManager>,
 }
 
@@ -35,13 +35,13 @@ impl ActiveSubscription {
     fn new(
         conn_id: ConnectionId,
         name: SubscriptionId,
-        keys: Vec<Key>,
+        index: Index,
         manager: Arc<SubscriptionManager>,
     ) -> Self {
         Self {
             conn_id,
             name,
-            keys,
+            indexes: index.split(),
             manager,
         }
     }
@@ -51,8 +51,8 @@ impl Drop for ActiveSubscription {
     /// When the subscription is dropped, it will remove the listener from the
     /// subscription manager
     fn drop(&mut self) {
-        let keys = self
-            .keys
+        let indexes = self
+            .indexes
             .drain(..)
             .map(|key| (key, self.conn_id, self.name.clone()))
             .collect::<Vec<_>>();
@@ -60,12 +60,12 @@ impl Drop for ActiveSubscription {
         let manager = self.manager.clone();
 
         tokio::spawn(async move {
-            manager.unsubscribe(keys).await;
+            manager.unsubscribe(indexes).await;
         });
     }
 }
 
-type SubscriptionValue = (Arc<SortedFilter>, Sender<Response>);
+type SubscriptionValue = (Arc<EventFilter>, Sender<Response>);
 
 /// Subscription manager
 ///
@@ -104,44 +104,19 @@ impl SubscriptionManager {
         self.total_subscribers.fetch_sub(1, Ordering::Relaxed);
     }
 
-    fn get_keys_from_event(event: &Event, min_prefix_match_len: usize) -> Vec<Key> {
+    fn get_keys_from_event(event: &Event, _min_prefix_match_len: usize) -> Vec<SingleIndex> {
         let mut subscriptions = vec![];
 
-        let author = event.author().as_ref().to_vec();
-        let id = event.id.as_ref().to_vec();
-
-        for i in min_prefix_match_len..=author.len() {
-            subscriptions.push(Key::Author(author[..i].to_vec(), None));
-            subscriptions.push(Key::Author(author[..i].to_vec(), Some(event.kind())));
-        }
+        subscriptions.push(SingleIndex::Author(event.author().to_owned()));
+        subscriptions.push(SingleIndex::Id(event.id.to_owned()));
 
         for t in event.tags() {
-            match t {
-                nostr_rs_types::types::Tag::Event(ref_event) => {
-                    for i in min_prefix_match_len..=ref_event.id.len() {
-                        subscriptions.push(Key::RefId(ref_event.id[..i].to_vec(), None));
-                        subscriptions
-                            .push(Key::RefId(ref_event.id[..i].to_vec(), Some(event.kind())));
-                    }
-                }
-                nostr_rs_types::types::Tag::PubKey(ref_pub_key) => {
-                    for i in min_prefix_match_len..=ref_pub_key.id.len() {
-                        subscriptions.push(Key::RefId(ref_pub_key.id[..i].to_vec(), None));
-                        subscriptions
-                            .push(Key::RefId(ref_pub_key.id[..i].to_vec(), Some(event.kind())));
-                    }
-                }
-                _ => {}
-            }
-        }
-
-        for i in min_prefix_match_len..=id.len() {
-            subscriptions.push(Key::Id(id[..i].to_vec()));
+            t.get_indexable_value()
+                .map(|v| subscriptions.push(SingleIndex::Tag(t.get_identifier().to_owned(), v)));
         }
 
-        subscriptions.push(Key::Kind(event.kind()));
-        subscriptions.push(Key::AllUpdates);
-
+        subscriptions.push(SingleIndex::Kind(event.kind()));
+        subscriptions.push(SingleIndex::AllUpdates);
         subscriptions
     }
 
@@ -165,11 +140,13 @@ impl SubscriptionManager {
         let subscriptions = request
             .filters
             .into_iter()
-            .map(|filter| {
-                let filter = Arc::new(SortedFilter::from(filter));
+            .map(|mut filter| {
+                let index: Index = (&mut filter).into();
+                let filter = Arc::new(EventFilter::from(filter));
                 let subscription =
-                    ActiveSubscription::new(conn_id, name.clone(), filter.keys(), self.clone());
-                for key in subscription.keys.iter() {
+                    ActiveSubscription::new(conn_id, name.clone(), index, self.clone());
+
+                for key in subscription.indexes.iter() {
                     subscriptions.insert(
                         (key.clone(), conn_id, name.clone()),
                         (filter.clone(), sender.clone()),
@@ -189,7 +166,7 @@ impl SubscriptionManager {
         tokio::spawn(async move {
             let subscriptions = this.subscriptions.read().await;
             let subs = Self::get_keys_from_event(&event, this.min_prefix_match_len);
-            let mut deliverded = BTreeSet::new();
+            let mut deliverded = HashSet::new();
 
             for sub in subs {
                 for ((sub_type, client, name), (filter, sender)) in subscriptions.range(
@@ -203,7 +180,7 @@ impl SubscriptionManager {
                         break;
                     }
 
-                    if deliverded.contains(client) || !filter.check(&event) {
+                    if deliverded.contains(client) || !filter.check_event(&event) {
                         continue;
                     }
 

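For context on the lookup above: subscriptions are keyed by `(SingleIndex, ConnectionId, SubscriptionId)` in a `BTreeMap`, so every subscription registered under one index value can be found with a single range scan. A sketch of that pattern with simplified key types (a `String` index and a `u64` connection id are assumptions for illustration only):

```rust
use std::collections::BTreeMap;

// Range-scan a BTreeMap keyed by (index, conn_id) to collect every
// subscription registered under a given index value.
fn subscribers_for<'a>(
    subs: &'a BTreeMap<(String, u64), String>,
    index: &str,
) -> Vec<&'a str> {
    subs.range((index.to_string(), u64::MIN)..)
        .take_while(|((key, _), _)| key == index)
        .map(|(_, name)| name.as_str())
        .collect()
}
```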
+ 0 - 1
crates/relayer/src/subscription/mod.rs

@@ -1,4 +1,3 @@
-mod filter;
 mod manager;
 
 pub use self::manager::{ActiveSubscription, SubscriptionManager};

+ 1 - 0
crates/storage/base/src/cursor.rs

@@ -7,6 +7,7 @@ use std::{
     task::{Context, Poll},
 };
 
+#[derive(Debug)]
 pub enum FutureValue {
     Found(Result<Event, Error>),
     Pending,

+ 78 - 74
crates/storage/base/src/event_filter.rs

@@ -1,59 +1,47 @@
 use chrono::{DateTime, Utc};
-use nostr_rs_types::types::{Event, Filter, Kind, Tag};
-use std::collections::HashSet;
+use nostr_rs_types::types::{filter::TagValue, Addr, Event, Filter, Kind, Tag};
+use std::{
+    collections::{HashMap, HashSet},
+    ops::Deref,
+};
 
 #[derive(Debug)]
 pub struct EventFilter {
-    ids: Vec<Vec<u8>>,
-    authors: Vec<Vec<u8>>,
-    kinds: Vec<Kind>,
-    references_to_public_key: Vec<Vec<u8>>,
-    references_to_event: Vec<Vec<u8>>,
+    ids: HashSet<[u8; 32]>,
+    authors: HashSet<[u8; 32]>,
+    tags: HashMap<String, HashSet<TagValue>>,
+    kinds: HashSet<Kind>,
     since: Option<DateTime<Utc>>,
     until: Option<DateTime<Utc>>,
 }
 
 impl From<Filter> for EventFilter {
     fn from(query: Filter) -> Self {
-        let mut authors = query
+        let authors = query
             .authors
             .into_iter()
-            .map(|id| id.bytes)
-            .collect::<Vec<_>>();
+            .map(|id| *id)
+            .collect::<HashSet<_>>();
 
-        let mut references_to_public_key = query
-            .references_to_public_key
+        let tags = query
+            .tags
             .into_iter()
-            .map(|tag| tag.bytes)
-            .collect::<HashSet<_>>()
-            .into_iter()
-            .collect::<Vec<_>>();
-
-        let mut references_to_event = query
-            .references_to_event
-            .into_iter()
-            .map(|tag| tag.bytes)
-            .collect::<HashSet<_>>()
-            .into_iter()
-            .collect::<Vec<_>>();
-
-        let mut kinds = query.kinds.into_iter().collect::<Vec<Kind>>();
+            .map(|(tag, values)| (tag, values))
+            .collect::<HashMap<String, HashSet<TagValue>>>();
 
-        let mut ids = query.ids.into_iter().map(|id| id.bytes).collect::<Vec<_>>();
+        let kinds = query.kinds.into_iter().collect::<HashSet<Kind>>();
 
-        // Sort everything for a quick binary_search instead of a linear search
-        ids.sort();
-        references_to_event.sort();
-        references_to_public_key.sort();
-        authors.sort();
-        kinds.sort();
+        let ids = query
+            .ids
+            .into_iter()
+            .map(|id| (*id))
+            .collect::<HashSet<_>>();
 
         EventFilter {
             ids,
             authors,
             kinds,
-            references_to_public_key,
-            references_to_event,
+            tags,
             since: query.since,
             until: query.until,
         }
@@ -61,6 +49,25 @@ impl From<Filter> for EventFilter {
 }
 
 impl EventFilter {
+    /// Returns true if the filter is empty, meaning that it will match any event.
+    pub fn is_empty(&self) -> bool {
+        self.ids.is_empty()
+            && self.authors.is_empty()
+            && self.tags.is_empty()
+            && self.kinds.is_empty()
+            && self.since.is_none()
+            && self.until.is_none()
+    }
+
+    /// Returns `None` if the filter is empty (it would match any event), otherwise `Some(self)`.
+    pub fn into_option(self) -> Option<Self> {
+        if self.is_empty() {
+            None
+        } else {
+            Some(self)
+        }
+    }
+
     /// Receives a list of event-IDs, and returns a list of events that match
     /// the given filter.
     ///
@@ -69,57 +76,54 @@ impl EventFilter {
     /// filter them by the given parameters.
     #[inline]
     pub fn check_event(&self, event: &Event) -> bool {
-        if !self.ids.is_empty() && self.ids.binary_search(&event.id.0.to_vec()).is_err() {
+        if self.is_empty() {
+            return true;
+        }
+
+        if !self.ids.is_empty() && !self.ids.contains(&event.id.0) {
             return false;
         }
-        if !self.references_to_public_key.is_empty()
-            || !self.references_to_event.is_empty() && self.authors.is_empty()
-        {
-            let mut found = false;
-
-            for tag in event.tags().iter() {
-                match tag {
-                    Tag::Event(event) => {
-                        if self
-                            .references_to_event
-                            .binary_search(&event.id.as_ref().to_vec())
-                            .is_ok()
-                        {
-                            found = true;
-                        }
-                    }
-                    Tag::PubKey(key) => {
-                        if self
-                            .references_to_public_key
-                            .binary_search(&key.id.as_ref().to_vec())
-                            .is_ok()
-                        {
-                            found = true;
-                        }
+
+        if !self.tags.is_empty() {
+            let event_tags = event
+                .tags()
+                .iter()
+                .filter_map(|tag| {
+                    tag.get_indexable_value()
+                        .map(|f| (tag.get_identifier().to_owned(), f))
+                })
+                .fold(HashMap::new(), |mut acc, (key, value)| {
+                    acc.entry(key).or_insert_with(HashSet::new).insert(value);
+                    acc
+                });
+
+            for (tag, possible_values) in self.tags.iter() {
+                let tag_values = if let Some(tag_values) = event_tags.get(tag) {
+                    tag_values
+                } else {
+                    // event has no tag with the given name
+                    return false;
+                };
+
+                let mut found_tag = false;
+                for tag_value in tag_values {
+                    if possible_values.contains(tag_value) {
+                        found_tag = true;
+                        break;
                     }
-                    _ => {}
                 }
 
-                if found {
-                    break;
+                if !found_tag {
+                    return false;
                 }
             }
-
-            if !found {
-                return false;
-            }
         }
 
-        if !self.authors.is_empty()
-            && self
-                .authors
-                .binary_search(&event.author().as_ref().to_vec())
-                .is_err()
-        {
+        if !self.authors.is_empty() && !self.authors.contains(event.author().deref()) {
             return false;
         }
 
-        if !self.kinds.is_empty() && self.kinds.binary_search(&event.kind()).is_err() {
+        if !self.kinds.is_empty() && !self.kinds.contains(&event.kind()) {
             return false;
         }
 

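The new tag check in `EventFilter::check_event` requires that, for every tag name listed in the filter, the event carries at least one matching value under that name (AND across tag names, OR across values). A standalone sketch of that semantics, with plain strings in place of `TagValue`:

```rust
use std::collections::{HashMap, HashSet};

// Every tag named in the filter must share at least one value with the
// event's values for that same tag name.
fn tags_match(
    filter: &HashMap<String, HashSet<String>>,
    event_tags: &HashMap<String, HashSet<String>>,
) -> bool {
    filter.iter().all(|(name, wanted)| {
        event_tags
            .get(name)
            .map_or(false, |present| !present.is_disjoint(wanted))
    })
}
```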
+ 140 - 0
crates/storage/base/src/index.rs

@@ -0,0 +1,140 @@
+//! Indexes for the storage engine.
+//!
+//! This module contains the definition of the indexes used by the storage
+//! engine and provides a convenient conversion from less useful to more useful
+//! indexes by default.
+//!
+//! Each storage engine can use this implementation or have their own
+use nostr_rs_types::types::{filter::TagValue, Addr, Filter, Id, Kind};
+use std::collections::HashSet;
+
+/// Indexes for the storage engine.
+#[derive(Debug, Clone)]
+pub enum Index {
+    /// Index by tag.
+    Tag(String, HashSet<TagValue>),
+    /// Index by id.
+    Id(Vec<Id>),
+    /// Index by author.
+    Author(Vec<Id>),
+    /// Index by kind.
+    Kind(Vec<Kind>),
+    /// Table scan.
+    TableScan,
+}
+
+impl Index {
+    /// Splits the index into a list of single indexes.
+    pub fn split(self) -> Vec<SingleIndex> {
+        match self {
+            Index::Tag(tag, tags) => tags
+                .into_iter()
+                .map(|tag_value| SingleIndex::Tag(tag.clone(), tag_value))
+                .collect(),
+            Index::Id(ids) => ids.into_iter().map(SingleIndex::Id).collect(),
+            Index::Author(authors) => authors.into_iter().map(SingleIndex::Author).collect(),
+            Index::Kind(kinds) => kinds.into_iter().map(SingleIndex::Kind).collect(),
+            Index::TableScan => vec![SingleIndex::AllUpdates],
+        }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
+pub enum SingleIndex {
+    Author(Id),
+    Id(Id),
+    Kind(Kind),
+    Tag(String, TagValue),
+    AllUpdates,
+}
+
+impl From<&mut Filter> for Index {
+    fn from(query: &mut Filter) -> Self {
+        if !query.ids.is_empty() {
+            Index::Id(std::mem::take(&mut query.ids))
+        } else if !query.authors.is_empty() {
+            Index::Author(std::mem::take(&mut query.authors))
+        } else if !query.tags.is_empty() {
+            // Find the tag with the fewest values and use it as the index.
+            // This block is slightly different from the arms above: only a
+            // single index is needed here rather than a list of indexes to
+            // iterate over, so the tag with the fewest options is selected.
+            let mut tag_with_fewer_opts = query
+                .tags
+                .iter()
+                .map(|(tag, values)| (tag.to_owned(), values.len()))
+                .collect::<Vec<_>>();
+
+            tag_with_fewer_opts.sort_by_key(|(_, len)| *len);
+            let (key, _) = tag_with_fewer_opts.remove(0);
+
+            if let Some(tags) = query.tags.remove(&key) {
+                Index::Tag(key, tags)
+            } else {
+                Index::TableScan
+            }
+        } else if !query.kinds.is_empty() {
+            Index::Kind(std::mem::take(&mut query.kinds))
+        } else {
+            Index::TableScan
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use serde_json::json;
+
+    use super::*;
+    use std::collections::HashMap;
+
+    #[test]
+    fn test_index_preference_ids_over_others() {
+        let mut filter: Filter = serde_json::from_value(json!({
+            "ids": ["42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"],
+            "authors": ["42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"],
+            "#r": ["foo"],
+            "kinds": [1]
+        }))
+        .expect("filter");
+
+        let index = Index::from(&mut filter);
+        assert!(matches!(index, Index::Id(_)));
+    }
+
+    #[test]
+    fn test_index_preference_authors_over_tags_and_kinds() {
+        let mut filter: Filter = serde_json::from_value(json!({
+            "authors": ["42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"],
+            "#r": ["foo"],
+            "kinds": [1]
+        }))
+        .expect("filter");
+
+        let index = Index::from(&mut filter);
+        assert!(matches!(index, Index::Author(_)));
+    }
+
+    #[test]
+    fn test_index_preference_tags_over_kinds() {
+        let mut filter: Filter = serde_json::from_value(json!({
+            "#r": ["foo"],
+            "kinds": [1]
+        }))
+        .expect("filter");
+
+        let index = Index::from(&mut filter);
+        assert!(matches!(index, Index::Tag(_, _)));
+    }
+
+    #[test]
+    fn test_index_preference_kinds_when_others_are_empty() {
+        let mut filter: Filter = serde_json::from_value(json!({
+            "kinds": [1]
+        }))
+        .expect("filter");
+
+        let index = Index::from(&mut filter);
+        assert!(matches!(index, Index::Kind(_)));
+    }
+}

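A short usage sketch for the new `Index` type, assuming the re-exports added later in this PR (`nostr_rs_storage_base::{Index, SingleIndex}`): one `Index` splits into one `SingleIndex` per value, which is what `ActiveSubscription` registers as its subscription keys.

```rust
use nostr_rs_storage_base::{Index, SingleIndex};
use nostr_rs_types::types::Filter;

fn example(mut filter: Filter) {
    // Pick the preferred index for the filter (ids > authors > tags > kinds),
    // consuming the chosen field from the filter.
    let index: Index = (&mut filter).into();

    // One SingleIndex per value, e.g. one per author; a TableScan splits
    // into a single AllUpdates entry, so the result is never empty.
    let singles: Vec<SingleIndex> = index.split();
    assert!(!singles.is_empty());
}
```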
+ 8 - 1
crates/storage/base/src/lib.rs

@@ -7,6 +7,7 @@
 pub mod cursor;
 mod error;
 mod event_filter;
+mod index;
 mod secondary_index;
 mod storage;
 
@@ -16,7 +17,13 @@ pub mod test;
 #[cfg(feature = "test")]
 pub use tokio;
 
-pub use crate::{error::Error, event_filter::*, secondary_index::SecondaryIndex, storage::Storage};
+pub use crate::{
+    error::Error,
+    event_filter::*,
+    index::{Index, SingleIndex},
+    secondary_index::SecondaryIndex,
+    storage::Storage,
+};
 
 #[macro_export]
 /// This macro creates the

+ 42 - 40
crates/storage/base/src/test.rs

@@ -5,7 +5,9 @@
 use super::*;
 use futures::{StreamExt, TryStreamExt};
 use nostr_rs_types::types::{Addr, Event, Filter, Kind};
+use serde_json::json;
 use std::{
+    collections::HashMap,
     fs::File,
     io::{BufRead, BufReader},
 };
@@ -118,20 +120,18 @@ where
 {
     setup_db(db).await;
 
+    let filter: Filter = serde_json::from_value(json!({
+        "#e": [
+            "f513f1422ee5dbf30f57118b6cc34e788746e589a9b07be767664a164c57b9b1"
+        ],
+        "#p": [
+            "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+        ]
+    }))
+    .expect("valid filter");
+
     let related_events: Vec<Event> = db
-        .get_by_filter(Filter {
-            references_to_event: vec![
-                "f513f1422ee5dbf30f57118b6cc34e788746e589a9b07be767664a164c57b9b1"
-                    .try_into()
-                    .expect("pk"),
-            ],
-            references_to_public_key: vec![
-                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
-                    .try_into()
-                    .expect("pk"),
-            ],
-            ..Default::default()
-        })
+        .get_by_filter(filter)
         .await
         .expect("valid")
         .try_collect()
@@ -146,20 +146,18 @@ where
 {
     setup_db(db);
 
+    let filter: Filter = serde_json::from_value(json!({
+        "#e": [
+            "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
+        ],
+        "#p": [
+            "36ce9f55828b06f4f45c7f7292ae58362f4abe746938888f82e56fe6fb7ffb2c"
+        ]
+    }))
+    .expect("valid filter");
+
     let related_events: Vec<Event> = db
-        .get_by_filter(Filter {
-            references_to_event: vec![
-                "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
-                    .try_into()
-                    .expect("pk"),
-            ],
-            references_to_public_key: vec![
-                "36ce9f55828b06f4f45c7f7292ae58362f4abe746938888f82e56fe6fb7ffb2c"
-                    .try_into()
-                    .expect("pk"),
-            ],
-            ..Default::default()
-        })
+        .get_by_filter(filter)
         .await
         .expect("valid")
         .try_collect()
@@ -174,16 +172,16 @@ where
 {
     setup_db(db).await;
 
+    let filter: Filter = serde_json::from_value(json!({
+        "kinds": [7, 1],
+        "#e": [
+            "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
+        ],
+    }))
+    .expect("valid filter");
+
     let related_events: Vec<Event> = db
-        .get_by_filter(Filter {
-            kinds: vec![Kind::Reaction, Kind::ShortTextNote],
-            references_to_event: vec![
-                "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
-                    .try_into()
-                    .expect("pk"),
-            ],
-            ..Default::default()
-        })
+        .get_by_filter(filter)
         .await
         .expect("valid")
         .try_collect()
@@ -204,7 +202,7 @@ where
 
     let events: Vec<Event> = db
         .get_by_filter(Filter {
-            ids: vec![id.clone()],
+            ids: vec![id],
             ..Default::default()
         })
         .await
@@ -215,11 +213,15 @@ where
 
     assert_eq!(events.len(), 1);
 
+    let filter: Filter = serde_json::from_value(json!({
+        "#e": [
+            "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
+        ],
+    }))
+    .expect("valid filter");
+
     let related_events: Vec<Event> = db
-        .get_by_filter(Filter {
-            references_to_event: vec![id],
-            ..Default::default()
-        })
+        .get_by_filter(filter)
         .await
         .expect("valid")
         .try_collect()

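The rewritten tests build filters from JSON; with the new `Filter` type introduced later in this PR, the `"#e"` / `"#p"` entries are flattened into the generic `tags` map rather than dedicated fields. A minimal sketch of that round trip (the hex id below is one of the existing test fixtures):

```rust
use nostr_rs_types::types::Filter;
use serde_json::json;

fn main() {
    let filter: Filter = serde_json::from_value(json!({
        "kinds": [1],
        "#e": ["42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"]
    }))
    .expect("valid filter");

    // The "#e" entry lands in the tags map under the key "e".
    assert_eq!(filter.tags.get("e").map(|values| values.len()), Some(1));
    assert_eq!(filter.kinds.len(), 1);
}
```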
+ 40 - 73
crates/storage/memory/src/lib.rs

@@ -1,5 +1,5 @@
-use nostr_rs_storage_base::{Error, SecondaryIndex, Storage};
-use nostr_rs_types::types::{Event, Filter, Tag};
+use nostr_rs_storage_base::{Error, EventFilter, Index, SecondaryIndex, Storage};
+use nostr_rs_types::types::{Event, Filter};
 use std::{
     cmp::min,
     collections::{BTreeMap, VecDeque},
@@ -16,8 +16,7 @@ mod cursor;
 pub struct Indexes {
     // Vec<u8> is used instead of Id, since references can be partial keys
     author: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
-    ref_event: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
-    ref_pub_key: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
+    tags: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
     kind: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
     ids_by_time: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
     /// silly index to store the keys of the indexes, but it helps to use the same code
@@ -94,36 +93,17 @@ impl Storage for Memory {
                 event_id.clone(),
             );
 
-            for tag in event.tags().iter() {
-                match tag {
-                    Tag::PubKey(pub_key) => {
-                        let foreign_id = secondary_index.index_by(&pub_key.id);
-                        let local_id = secondary_index.index_by(&event_id);
-
-                        indexes
-                            .ref_pub_key
-                            .write()
-                            .await
-                            .insert(foreign_id, event_id.clone());
-
-                        indexes
-                            .ref_event
-                            .write()
-                            .await
-                            .insert(local_id, pub_key.id.to_vec());
-                    }
-                    Tag::Event(foreign_event) => {
-                        let foreign_id = secondary_index.index_by(&foreign_event.id);
-                        let local_id = secondary_index.index_by(&event_id);
-
-                        let mut ref_event = indexes.ref_event.write().await;
-                        ref_event.insert(foreign_id, event_id.clone());
-                        ref_event.insert(local_id, foreign_event.id.to_vec());
-                    }
-                    _ => {}
-                }
+            for event_tags in event
+                .tags()
+                .iter()
+                .filter_map(|tag| tag.clone().into_bytes())
+            {
+                indexes
+                    .tags
+                    .write()
+                    .await
+                    .insert(secondary_index.index_by(event_tags), event_id.clone());
             }
-
             let _ = indexer_running.fetch_sub(1, Ordering::SeqCst);
         });
         Ok(true)
@@ -161,62 +141,49 @@ impl Storage for Memory {
             Some(query.limit.try_into()?)
         };
 
-        let (index, index_prefixes) = if !query.references_to_event.is_empty() {
-            (
-                self.indexes.ref_event.read().await,
-                std::mem::take(&mut query.references_to_event)
-                    .into_iter()
-                    .map(|c| c.take())
+        let query_index: Index = (&mut query).into();
+
+        let (index, index_prefixes) = match query_index {
+            Index::Tag(tag_name, tags) => (
+                self.indexes.tags.read().await,
+                tags.into_iter()
+                    .map(|tag| tag.into_bytes(&tag_name))
                     .collect::<VecDeque<_>>(),
-            )
-        } else if !query.references_to_public_key.is_empty() {
-            (
-                self.indexes.ref_pub_key.read().await,
-                std::mem::take(&mut query.references_to_public_key)
+            ),
+            Index::Author(authors) => (
+                self.indexes.author.read().await,
+                authors
                     .into_iter()
-                    .map(|c| c.take())
+                    .map(|c| c.into_bytes())
                     .collect::<VecDeque<_>>(),
-            )
-        } else if !query.ids.is_empty() {
-            (
+            ),
+            Index::Id(ids) => (
                 self.indexes.ids.read().await,
-                std::mem::take(&mut query.ids)
-                    .into_iter()
-                    .map(|c| c.take())
-                    .collect::<VecDeque<_>>(),
-            )
-        } else if !query.authors.is_empty() {
-            (
-                self.indexes.author.read().await,
-                std::mem::take(&mut query.authors)
-                    .into_iter()
-                    .map(|c| c.take())
+                ids.into_iter()
+                    .map(|c| c.into_bytes())
                     .collect::<VecDeque<_>>(),
-            )
-        } else if !query.kinds.is_empty() {
-            (
+            ),
+            Index::Kind(kind) => (
                 self.indexes.kind.read().await,
-                std::mem::take(&mut query.kinds)
-                    .into_iter()
-                    .map(|kind| {
-                        let kind: u32 = kind.into();
-                        kind.to_be_bytes().to_vec()
-                    })
+                kind.into_iter()
+                    .map(|kind| kind.into_bytes())
                     .collect::<VecDeque<_>>(),
-            )
-        } else {
-            (
+            ),
+
+            Index::TableScan => (
                 self.indexes.ids_by_time.read().await,
                 vec![Vec::new()].into(), // all keys
-            )
+            ),
         };
 
+        let filter: EventFilter = query.into();
+
         Ok(Self::Cursor {
             db: self,
             index,
             index_prefixes,
             current_index_key: Vec::new(),
-            filter: Some(query.into()),
+            filter: filter.into_option(),
             limit,
             returned: 0,
             last_prefix: None,

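Both storage backends now follow the same planning shape: convert the mutable `Filter` into one `Index` that drives the scan, then turn whatever is left of the filter into an optional `EventFilter` for the per-event check. A condensed sketch of that pattern, using the types introduced in this PR (the `plan` helper itself is illustrative, not part of the change):

```rust
use nostr_rs_storage_base::{EventFilter, Index};
use nostr_rs_types::types::Filter;

// Returns the index to iterate plus the residual in-memory check, if any.
fn plan(mut query: Filter) -> (Index, Option<EventFilter>) {
    // `From<&mut Filter>` consumes the field chosen as the index...
    let index: Index = (&mut query).into();
    // ...so the remaining fields become the per-event filter.
    let residual: EventFilter = query.into();
    (index, residual.into_option())
}
```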
+ 49 - 78
crates/storage/rocksdb/src/lib.rs

@@ -1,7 +1,7 @@
 //! Rocks DB implementation of the storage layer
 use crate::cursor::Cursor;
-use nostr_rs_storage_base::{Error, SecondaryIndex, Storage};
-use nostr_rs_types::types::{Event, Filter, Tag};
+use nostr_rs_storage_base::{Error, Index, SecondaryIndex, Storage};
+use nostr_rs_types::types::{Event, Filter};
 use rocksdb::{
     BoundColumnFamily, ColumnFamilyDescriptor, IteratorMode, Options, SliceTransform, WriteBatch,
     DB,
@@ -15,8 +15,7 @@ mod cursor;
 pub enum ReferenceType {
     Events,
     Author,
-    RefPublicKey,
-    RefEvent,
+    Tags,
     Kind,
     LocalEvents,
     Stream,
@@ -28,8 +27,7 @@ impl ReferenceType {
         match self {
             Self::Events => "events",
             Self::Author => "authors",
-            Self::RefPublicKey => "refs_by_public_key",
-            Self::RefEvent => "refs_by_ids",
+            Self::Tags => "tags",
             Self::Kind => "kinds",
             Self::LocalEvents => "local",
             Self::Stream => "stream",
@@ -51,8 +49,7 @@ impl RocksDb {
             vec![
                 ColumnFamilyDescriptor::new(ReferenceType::Events.as_str(), options.clone()),
                 ColumnFamilyDescriptor::new(ReferenceType::Author.as_str(), options.clone()),
-                ColumnFamilyDescriptor::new(ReferenceType::RefEvent.as_str(), options.clone()),
-                ColumnFamilyDescriptor::new(ReferenceType::RefPublicKey.as_str(), options.clone()),
+                ColumnFamilyDescriptor::new(ReferenceType::Tags.as_str(), options.clone()),
                 ColumnFamilyDescriptor::new(ReferenceType::Kind.as_str(), options.clone()),
                 ColumnFamilyDescriptor::new(ReferenceType::LocalEvents.as_str(), options.clone()),
                 ColumnFamilyDescriptor::new(ReferenceType::Stream.as_str(), options.clone()),
@@ -157,38 +154,13 @@ impl Storage for RocksDb {
         );
 
         for tag in event.tags().iter() {
-            match tag {
-                Tag::PubKey(p) => {
-                    let foreign_id = secondary_index.index_by(&p.id);
-                    let local_id = secondary_index.index_by(event_id);
-
-                    buffer.put_cf(
-                        &self.reference_to_cf_handle(ReferenceType::RefPublicKey)?,
-                        foreign_id,
-                        event_id.deref(),
-                    );
-                    buffer.put_cf(
-                        &self.reference_to_cf_handle(ReferenceType::RefEvent)?,
-                        local_id,
-                        &*p.id,
-                    );
-                }
-                Tag::Event(e) => {
-                    let foreign_id = secondary_index.index_by(&e.id);
-                    let local_id = secondary_index.index_by(event_id);
-
-                    buffer.put_cf(
-                        &self.reference_to_cf_handle(ReferenceType::RefEvent)?,
-                        foreign_id,
-                        event_id.deref(),
-                    );
-                    buffer.put_cf(
-                        &self.reference_to_cf_handle(ReferenceType::RefEvent)?,
-                        local_id,
-                        &*e.id,
-                    );
-                }
-                _ => {}
+            if let Some(tag) = tag.clone().into_bytes() {
+                let tag_id = secondary_index.index_by(&tag);
+                buffer.put_cf(
+                    &self.reference_to_cf_handle(ReferenceType::Tags)?,
+                    tag_id,
+                    event_id.deref(),
+                );
             }
         }
 
@@ -223,46 +195,45 @@ impl Storage for RocksDb {
             Some(query.limit.try_into()?)
         };
 
-        let (index, secondary_index_iterator, prefixes) = if !query.references_to_event.is_empty() {
-            let keys = std::mem::take(&mut query.references_to_event)
-                .into_iter()
-                .map(|c| c.take())
-                .collect();
-            (Some(ReferenceType::RefEvent), None, keys)
-        } else if !query.references_to_public_key.is_empty() {
-            let keys = std::mem::take(&mut query.references_to_public_key)
-                .into_iter()
-                .map(|c| c.take())
-                .collect();
-            (Some(ReferenceType::RefPublicKey), None, keys)
-        } else if !query.ids.is_empty() {
-            let keys = std::mem::take(&mut query.ids)
-                .into_iter()
-                .map(|c| c.take())
-                .collect();
-            (None, None, keys)
-        } else if !query.authors.is_empty() {
-            let keys = std::mem::take(&mut query.authors)
-                .into_iter()
-                .map(|c| c.take())
-                .collect();
-            (Some(ReferenceType::Author), None, keys)
-        } else if !query.kinds.is_empty() {
-            let keys = std::mem::take(&mut query.kinds)
-                .into_iter()
-                .map(|kind| {
-                    let kind: u32 = kind.into();
-                    kind.to_be_bytes().to_vec()
-                })
-                .collect();
-            (Some(ReferenceType::Kind), None, keys)
-        } else {
-            let cf_handle = self.reference_to_cf_handle(ReferenceType::Stream)?;
-            (
+        let query_index: Index = (&mut query).into();
+
+        let (index, secondary_index_iterator, prefixes) = match query_index {
+            Index::Tag(tag_name, tags) => (
+                Some(ReferenceType::Tags),
+                None,
+                tags.into_iter()
+                    .map(|tag| tag.into_bytes(&tag_name))
+                    .collect::<VecDeque<_>>(),
+            ),
+            Index::Id(ids) => (
                 None,
-                Some(self.db.iterator_cf(&cf_handle, IteratorMode::Start)),
+                None,
+                ids.into_iter().map(|id| id.bytes).collect::<VecDeque<_>>(),
+            ),
+            Index::Author(authors) => (
+                Some(ReferenceType::Author),
+                None,
+                authors
+                    .into_iter()
+                    .map(|author| author.bytes)
+                    .collect::<VecDeque<_>>(),
+            ),
+            Index::Kind(kinds) => (
+                Some(ReferenceType::Kind),
+                None,
+                kinds
+                    .into_iter()
+                    .map(|kind| kind.into_bytes())
+                    .collect::<VecDeque<_>>(),
+            ),
+            Index::TableScan => (
+                Some(ReferenceType::Stream),
+                Some(self.db.iterator_cf(
+                    &self.reference_to_cf_handle(ReferenceType::Stream)?,
+                    IteratorMode::Start,
+                )),
                 VecDeque::new(),
-            )
+            ),
         };
 
         Ok(Cursor::new(

+ 20 - 0
crates/types/src/types/addr.rs

@@ -16,6 +16,8 @@ use std::{
 };
 use thiserror::Error;
 
+use super::Id;
+
 /// Errors
 #[derive(Error, Debug)]
 pub enum Error {
@@ -75,6 +77,24 @@ pub struct Addr {
     pub hrp: Option<HumanReadablePart>,
 }
 
+impl From<&Id> for Addr {
+    fn from(id: &Id) -> Self {
+        Self {
+            bytes: id.0.to_vec(),
+            hrp: None,
+        }
+    }
+}
+
+impl From<Id> for Addr {
+    fn from(id: Id) -> Self {
+        Self {
+            bytes: id.0.to_vec(),
+            hrp: None,
+        }
+    }
+}
+
 impl AsRef<[u8]> for Addr {
     fn as_ref(&self) -> &[u8] {
         &self.bytes

+ 93 - 11
crates/types/src/types/filter.rs

@@ -4,10 +4,89 @@
 //!
 //! Each property is treated as an AND, but a vector of filters can be provided,
 //! and they will be treated as OR.
+use super::Kind;
 use chrono::{DateTime, Utc};
-use serde::{Deserialize, Serialize};
+use serde::{
+    de::{MapAccess, Visitor},
+    ser::SerializeMap,
+    Deserialize, Deserializer, Serialize, Serializer,
+};
+use std::{
+    collections::{HashMap, HashSet},
+    fmt,
+};
 
-use super::Kind;
+#[derive(Serialize, Deserialize, Debug, Hash, PartialOrd, Ord, Eq, PartialEq, Clone)]
+#[serde(untagged)]
+/// Tag filter value
+pub enum TagValue {
+    /// Tag another event
+    Address(super::Addr),
+    /// Any non standard tag
+    String(String),
+}
+
+const SEPARATOR: &[u8] = b"\x1f\xcc";
+
+impl TagValue {
+    /// Convert the value into bytes
+    pub fn into_bytes(self, id: &str) -> Vec<u8> {
+        let value = match self {
+            TagValue::Address(addr) => addr.bytes,
+            TagValue::String(s) => s.into_bytes(),
+        };
+
+        let mut bytes = Vec::with_capacity(id.len() + SEPARATOR.len() + value.len());
+        bytes.extend_from_slice(id.as_bytes());
+        bytes.extend_from_slice(SEPARATOR);
+        bytes.extend_from_slice(&value);
+        bytes
+    }
+}
+
+fn deserialize_tags<'de, D>(deserializer: D) -> Result<HashMap<String, HashSet<TagValue>>, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    struct TagVisitor;
+
+    impl<'de> Visitor<'de> for TagVisitor {
+        type Value = HashMap<String, HashSet<TagValue>>;
+
+        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+            formatter.write_str("a map with special #<one-letter> fields")
+        }
+
+        fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
+        where
+            A: MapAccess<'de>,
+        {
+            let mut tags = HashMap::new();
+            while let Some((key, value)) = map.next_entry::<String, _>()? {
+                if key.starts_with('#') {
+                    tags.insert(key[1..].to_string(), value);
+                }
+            }
+            Ok(tags)
+        }
+    }
+
+    deserializer.deserialize_map(TagVisitor)
+}
+
+fn serialize_tags<S>(
+    tags: &HashMap<String, HashSet<TagValue>>,
+    serializer: S,
+) -> Result<S::Ok, S::Error>
+where
+    S: Serializer,
+{
+    let mut map = serializer.serialize_map(Some(tags.len()))?;
+    for (key, value) in tags {
+        map.serialize_entry(&format!("#{}", key), value)?;
+    }
+    map.end()
+}
 
 /// Filter
 ///
@@ -19,25 +98,28 @@ pub struct Filter {
     ///
     /// A full Id can be provided, or a prefix.
     #[serde(default, skip_serializing_if = "Vec::is_empty")]
-    pub ids: Vec<super::Addr>,
+    pub ids: Vec<super::Id>,
     /// Fetch events by a public key, or a prefix
     #[serde(default, skip_serializing_if = "Vec::is_empty")]
-    pub authors: Vec<super::Addr>,
+    pub authors: Vec<super::Id>,
     /// Fetch events by their kinds
     #[serde(default, skip_serializing_if = "Vec::is_empty")]
     pub kinds: Vec<Kind>,
-    /// Fetch events that has tag references to a given event id
-    #[serde(default, skip_serializing_if = "Vec::is_empty", rename = "#e")]
-    pub references_to_event: Vec<super::Addr>,
-    /// Fetches events that has a tag to a public key
-    #[serde(default, skip_serializing_if = "Vec::is_empty", rename = "#p")]
-    pub references_to_public_key: Vec<super::Addr>,
-    /// Fetch events newer to this given timestamp
+    /// Fetch events by their tags
+    /// Although only single-letter tags should be indexed per NIP-01, using a
+    /// string allows the filter to be extended in the future
+    #[serde(
+        flatten,
+        serialize_with = "serialize_tags",
+        deserialize_with = "deserialize_tags"
+    )]
+    pub tags: HashMap<String, HashSet<TagValue>>,
     #[serde(
         default,
         with = "super::option_ts_seconds",
         skip_serializing_if = "Option::is_none"
     )]
+    /// Fetch events newer than this timestamp
     pub since: Option<DateTime<Utc>>,
     /// Fetch events older to this timestamp
     #[serde(

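The index key produced by `TagValue::into_bytes` is the tag identifier, the two-byte separator `0x1f 0xcc`, then the raw value bytes; this is what both the memory and RocksDB tag indexes key on. A quick sketch:

```rust
use nostr_rs_types::types::filter::TagValue;

fn main() {
    // "<tag id> <separator 0x1f 0xcc> <value bytes>"
    let key = TagValue::String("foo".to_string()).into_bytes("r");
    assert_eq!(key, b"r\x1f\xccfoo".to_vec());
}
```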
+ 8 - 1
crates/types/src/types/id.rs

@@ -13,9 +13,16 @@ use std::{
 /// Event Id
 ///
 /// Event Id are raw 32 bytes and 64-character length hex encoded to JSON
-#[derive(Debug, Clone, Hash, Eq, PartialEq)]
+#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
 pub struct Id(pub [u8; 32]);
 
+impl Id {
+    /// Convert the Id to a vector of bytes
+    pub fn into_bytes(self) -> Vec<u8> {
+        self.0.to_vec()
+    }
+}
+
 impl AsRef<[u8]> for Id {
     fn as_ref(&self) -> &[u8] {
         &self.0

+ 6 - 0
crates/types/src/types/kind.rs

@@ -76,6 +76,12 @@ impl Kind {
         let kind_id: u32 = (*self).into();
         (20_000..=29_999).contains(&kind_id)
     }
+
+    /// Convert the Kind into a byte representation
+    pub fn into_bytes(&self) -> Vec<u8> {
+        let kind_id: u32 = (*self).into();
+        kind_id.to_be_bytes().to_vec()
+    }
 }
 
 impl From<Kind> for u32 {

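`Kind::into_bytes` encodes the kind as its big-endian `u32`, keeping kind index keys fixed-width and ordered. For example, assuming `ShortTextNote` is kind 1 as in NIP-01:

```rust
use nostr_rs_types::types::Kind;

fn main() {
    assert_eq!(Kind::ShortTextNote.into_bytes(), 1u32.to_be_bytes().to_vec());
}
```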
+ 30 - 8
crates/types/src/types/tag/mod.rs

@@ -4,7 +4,7 @@
 //!
 //! It can also use the tag to add all sort of metadata, useful for their own
 //! type
-use super::Addr;
+use super::{filter::TagValue, Addr};
 use serde::{
     de::{self, Deserialize, Deserializer},
     ser::{self, SerializeSeq, Serializer},
@@ -43,6 +43,34 @@ impl Tag {
     pub fn is_event(&self) -> bool {
         matches!(self, Self::Event(_))
     }
+
+    /// Get the identifier for the tag
+    pub fn get_identifier(&self) -> &str {
+        match self {
+            Tag::Event(_) => "e",
+            Tag::PubKey(_) => "p",
+            Tag::Unknown(u, _) => u,
+        }
+    }
+
+    /// Get the indexable value of the tag
+    pub fn get_indexable_value(&self) -> Option<TagValue> {
+        match self {
+            Tag::Event(event) => Some(TagValue::Address(event.id.clone())),
+            Tag::PubKey(key) => Some(TagValue::Address(key.id.clone())),
+            Tag::Unknown(_, _) => None,
+        }
+    }
+
+    /// Converts the tag into bytes if possible.
+    ///
+    /// The bytes can be used to store the tag in a database or to find it in an in-memory index for subscribers
+    pub fn into_bytes(self) -> Option<Vec<u8>> {
+        Some(
+            self.get_indexable_value()?
+                .into_bytes(self.get_identifier()),
+        )
+    }
 }
 
 impl ser::Serialize for Tag {
@@ -50,14 +78,8 @@ impl ser::Serialize for Tag {
     where
         S: Serializer,
     {
-        let typ = match self {
-            Tag::Event(_) => "e",
-            Tag::PubKey(_) => "p",
-            Tag::Unknown(u, _) => u,
-        };
-
         let mut seq = serializer.serialize_seq(Some(2))?;
-        seq.serialize_element(typ)?;
+        seq.serialize_element(self.get_identifier())?;
 
         match self {
             Tag::Event(event) => {