Pārlūkot izejas kodu

WIP Working on iterator resultset

Cesar Rodas 1 gadu atpakaļ
vecāks
revīzija
db9e891a3d

+ 1 - 0
crates/storage/Cargo.toml

@@ -9,3 +9,4 @@ nostr-rs-types = { path = "../types" }
 thiserror = "1.0.40"
 rocksdb = { version = "0.20.1", features = ["multi-threaded-cf", "serde", "snappy"] }
 rand = "0.8.5"
+chrono = "0.4.26"

+ 139 - 0
crates/storage/src/event_filter.rs

@@ -0,0 +1,139 @@
+use chrono::{DateTime, Utc};
+use nostr_rs_types::types::{Event, Filter, Kind, Tag};
+use std::collections::HashSet;
+
+/// Pre-processed form of [`Filter`] optimized for repeated per-event checks.
+///
+/// Every byte-vector is sorted at construction time (see the `From<Filter>`
+/// impl below) so each criterion can be tested with `binary_search`.
+#[derive(Debug)]
+pub struct EventFilter {
+    /// Raw event ids to match; empty means "any id".
+    ids: Vec<Vec<u8>>,
+    /// Raw author public keys to match; empty means "any author".
+    authors: Vec<Vec<u8>>,
+    /// Event kinds to match; empty means "any kind".
+    kinds: Vec<Kind>,
+    /// Public keys referenced via `Tag::PubKey` (sorted, deduplicated).
+    references_to_public_key: Vec<Vec<u8>>,
+    /// Event ids referenced via `Tag::Event` (sorted, deduplicated).
+    references_to_event: Vec<Vec<u8>>,
+    /// Only events created at or after this instant pass.
+    since: Option<DateTime<Utc>>,
+    /// Only events created at or before this instant pass.
+    until: Option<DateTime<Utc>>,
+}
+
+impl From<Filter> for EventFilter {
+    /// Builds an [`EventFilter`] from a wire-level [`Filter`]: extracts the
+    /// raw bytes of every criterion, deduplicates the tag references, and
+    /// sorts everything so that `check_event` can use binary searches.
+    fn from(query: Filter) -> Self {
+        let mut authors = query
+            .authors
+            .into_iter()
+            .map(|id| id.bytes)
+            .collect::<Vec<_>>();
+
+        // Round-trip through a HashSet to drop duplicate references.
+        let mut references_to_public_key = query
+            .references_to_public_key
+            .into_iter()
+            .map(|tag| tag.bytes)
+            .collect::<HashSet<_>>()
+            .into_iter()
+            .collect::<Vec<_>>();
+
+        let mut references_to_event = query
+            .references_to_event
+            .into_iter()
+            .map(|tag| tag.bytes)
+            .collect::<HashSet<_>>()
+            .into_iter()
+            .collect::<Vec<_>>();
+
+        let mut kinds = query.kinds.into_iter().collect::<Vec<Kind>>();
+
+        let mut ids = query.ids.into_iter().map(|id| id.bytes).collect::<Vec<_>>();
+
+        // Sort everything for a quick binary_search instead of a linear search
+        ids.sort();
+        references_to_event.sort();
+        references_to_public_key.sort();
+        authors.sort();
+        kinds.sort();
+
+        EventFilter {
+            ids,
+            authors,
+            kinds,
+            references_to_public_key,
+            references_to_event,
+            since: query.since,
+            until: query.until,
+        }
+    }
+}
+
+impl EventFilter {
+    /// Returns `true` when `event` satisfies every non-empty criterion of
+    /// this filter (ids, tag references, authors, kinds) as well as the
+    /// optional `since`/`until` window. Empty criteria are skipped.
+    ///
+    /// All vectors were sorted when the filter was built, so each membership
+    /// test is a `binary_search`.
+    #[inline]
+    pub fn check_event(&self, event: &Event) -> bool {
+        // The event id must be one of the requested ids, when any were given.
+        if !self.ids.is_empty() && self.ids.binary_search(&event.id.0.to_vec()).is_err() {
+            return false;
+        }
+        // NOTE(review): `&&` binds tighter than `||`, so this parses as
+        // `!refs_pk.is_empty() || (!refs_event.is_empty() && authors.is_empty())`.
+        // It mirrors the expression in the removed util.rs verbatim — confirm
+        // the grouping is intentional.
+        if !self.references_to_public_key.is_empty()
+            || !self.references_to_event.is_empty() && self.authors.is_empty()
+        {
+            let mut found = false;
+
+            // Pass if any tag of the event matches either reference list.
+            for tag in event.tags().iter() {
+                match tag {
+                    Tag::Event(event) => {
+                        if self
+                            .references_to_event
+                            .binary_search(&event.id.as_ref().to_vec())
+                            .is_ok()
+                        {
+                            found = true;
+                        }
+                    }
+                    Tag::PubKey(key) => {
+                        if self
+                            .references_to_public_key
+                            .binary_search(&key.id.as_ref().to_vec())
+                            .is_ok()
+                        {
+                            found = true;
+                        }
+                    }
+                    _ => {}
+                }
+
+                if found {
+                    break;
+                }
+            }
+
+            if !found {
+                return false;
+            }
+        }
+
+        // Author must be one of the requested authors, when any were given.
+        if !self.authors.is_empty()
+            && self
+                .authors
+                .binary_search(&event.author().as_ref().to_vec())
+                .is_err()
+        {
+            return false;
+        }
+
+        if !self.kinds.is_empty() && self.kinds.binary_search(&event.kind()).is_err() {
+            return false;
+        }
+
+        // Creation time must fall inside the optional [since, until] window.
+        if let Some(since) = self.since {
+            if event.created_at() < since {
+                return false;
+            }
+        }
+        if let Some(until) = self.until {
+            if event.created_at() > until {
+                return false;
+            }
+        }
+
+        // Every requested criterion matched.
+        return true;
+    }
+}

+ 113 - 96
crates/storage/src/lib.rs

@@ -8,9 +8,10 @@
 //! let this crate build the needed indexes to find events quickly.
 #![deny(missing_docs, warnings)]
 mod error;
+mod event_filter;
 mod rocksdb;
+mod secondary_index;
 mod storage;
-mod util;
 
 pub use crate::{error::Error, rocksdb::RocksDb, storage::Storage};
 
@@ -23,7 +24,11 @@ mod test {
         io::{BufRead, BufReader},
     };
 
-    fn setup_db<T: Storage>(db: &T) {
+    fn setup_db<'a, T, I>(db: &'a T)
+    where
+        T: Storage<'a, I>,
+        I: Iterator<Item = Result<Event, Error>>,
+    {
         let file = File::open("./tests/events.json").expect("file");
         let events = BufReader::new(file)
             .lines()
@@ -35,7 +40,11 @@ mod test {
         }
     }
 
-    pub fn store_and_get<T: Storage>(db: &T) {
+    pub fn store_and_get<'a, T, I>(db: &'a T)
+    where
+        T: Storage<'a, I>,
+        I: Iterator<Item = Result<Event, Error>>,
+    {
         let json = "{\"content\":\"{\\\"lud06\\\":\\\"lnbc1p3a4wxvpp5x0pa6gr55fq5s9d3dxs0vz77mqxgdw63hhtgtlfz5zvm65847vnqdqqcqpjsp5402c8rtqxd4j97rnvuejuwl4sg473g6wg08d67fvn7qc4gtpkfks9q7sqqqqqqqqqqqqqqqqqqqsqqqqqysgqmqz9gxqyjw5qrzjqwryaup9lh50kkranzgcdnn2fgvx390wgj5jd07rwr3vxeje0glclleasn65surjcsqqqqlgqqqqqeqqjqyxj968tem9ps6ttm9ukv6ag4yc6qmgj2svrccfgp4n83fpktr3dsx6fq7grfzlqt982aaemahg9q29vzl9f627kh4j8h8xc2z2mtpdqqjlekah\\\",\\\"website\\\":\\\"\\\",\\\"nip05\\\":\\\"cesar@cesar.com.py\\\",\\\"picture\\\":\\\"https://pbs.twimg.com/profile_images/1175432935337537536/_Peu9vuJ_400x400.jpg\\\",\\\"display_name\\\":\\\"C\\\",\\\"about\\\":\\\"Rust and PHP\\\",\\\"name\\\":\\\"c\\\"}\",\"created_at\":1678476588,\"id\":\"3800c787a23288641c0b96cbcc87c26cbd3ea7bee53b7748422fdb100fb7b9f0\",\"kind\":0,\"pubkey\":\"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78\",\"sig\":\"c8a12ce96833e4cd67bce0e9e50f831262ef0f0c0cff5e56c38a0c90867ed1a6621e9692948ef5e85a7ca3726c3f0f43fa7e1992536bc457317123bca8784f5f\",\"tags\":[]}";
 
         let event: Event = serde_json::from_str(json).expect("valid");
@@ -46,26 +55,26 @@ mod test {
         assert_eq!(event1, Some(event));
     }
 
-    pub fn records_are_sorted_by_date_desc<T: Storage>(db: &T) {
+    pub fn records_are_sorted_by_date_desc<'a, T, I>(db: &'a T)
+    where
+        T: Storage<'a, I>,
+        I: Iterator<Item = Result<Event, Error>>,
+    {
         setup_db(db);
 
         let pk: Addr = "7bdef7be22dd8e59f4600e044aa53a1cf975a9dc7d27df5833bc77db784a5805"
             .try_into()
             .expect("pk");
 
-        let mut vec = vec![];
-        db.get_by_filter(
-            Filter {
+        let vec = db
+            .get_by_filter(Filter {
                 authors: vec![pk],
                 limit: 10,
                 ..Default::default()
-            },
-            |event| {
-                vec.push(event);
-                Ok(())
-            },
-        )
-        .expect("set of results");
+            })
+            .expect("set of results")
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid");
 
         let dates = vec.iter().map(|e| e.created_at()).collect::<Vec<_>>();
         let mut sorted_dates = dates.clone();
@@ -75,12 +84,15 @@ mod test {
         assert_eq!(dates, sorted_dates);
     }
 
-    pub fn filter_by_references<T: Storage>(db: &T) {
+    pub fn filter_by_references<'a, T, I>(db: &'a T)
+    where
+        T: Storage<'a, I>,
+        I: Iterator<Item = Result<Event, Error>>,
+    {
         setup_db(db);
 
-        let mut related_events = vec![];
-        db.get_by_filter(
-            Filter {
+        let related_events = db
+            .get_by_filter(Filter {
                 references_to_event: vec![
                     "f513f1422ee5dbf30f57118b6cc34e788746e589a9b07be767664a164c57b9b1"
                         .try_into()
@@ -92,22 +104,22 @@ mod test {
                         .expect("pk"),
                 ],
                 ..Default::default()
-            },
-            |event| {
-                related_events.push(event);
-                Ok(())
-            },
-        )
-        .expect("events");
+            })
+            .expect("valid")
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid");
         assert_eq!(related_events.len(), 1);
     }
 
-    pub fn filter_by_references_zero_match<T: Storage>(db: &T) {
+    pub fn filter_by_references_zero_match<'a, T, I>(db: &'a T)
+    where
+        T: Storage<'a, I>,
+        I: Iterator<Item = Result<Event, Error>>,
+    {
         setup_db(db);
 
-        let mut related_events = vec![];
-        db.get_by_filter(
-            Filter {
+        let related_events = db
+            .get_by_filter(Filter {
                 references_to_event: vec![
                     "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
                         .try_into()
@@ -119,22 +131,22 @@ mod test {
                         .expect("pk"),
                 ],
                 ..Default::default()
-            },
-            |event| {
-                related_events.push(event);
-                Ok(())
-            },
-        )
-        .expect("events");
+            })
+            .expect("valid")
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid");
         assert_eq!(related_events.len(), 0);
     }
 
-    pub fn filter_by_references_and_kind<T: Storage>(db: &T) {
+    pub fn filter_by_references_and_kind<'a, T, I>(db: &'a T)
+    where
+        T: Storage<'a, I>,
+        I: Iterator<Item = Result<Event, Error>>,
+    {
         setup_db(db);
 
-        let mut related_events = vec![];
-        db.get_by_filter(
-            Filter {
+        let related_events = db
+            .get_by_filter(Filter {
                 kinds: vec![Kind::Reaction, Kind::ShortTextNote],
                 references_to_event: vec![
                     "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
@@ -142,50 +154,43 @@ mod test {
                         .expect("pk"),
                 ],
                 ..Default::default()
-            },
-            |event| {
-                related_events.push(event);
-                Ok(())
-            },
-        )
-        .expect("events");
+            })
+            .expect("valid")
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid");
         assert_eq!(related_events.len(), 3);
     }
 
-    pub fn get_event_and_related_events<T: Storage>(db: &T) {
+    pub fn get_event_and_related_events<'a, T, I>(db: &'a T)
+    where
+        T: Storage<'a, I>,
+        I: Iterator<Item = Result<Event, Error>>,
+    {
         setup_db(db);
 
         let id: Addr = "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
             .try_into()
             .expect("pk");
 
-        let mut events = vec![];
-        db.get_by_filter(
-            Filter {
+        let events = db
+            .get_by_filter(Filter {
                 ids: vec![id.clone()],
                 ..Default::default()
-            },
-            |event| {
-                events.push(event);
-                Ok(())
-            },
-        )
-        .expect("events");
+            })
+            .expect("events")
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid");
 
         assert_eq!(events.len(), 1);
 
-        let mut related_events = vec![];
-        db.get_by_filter(
-            Filter {
+        let related_events = db
+            .get_by_filter(Filter {
                 references_to_event: vec![id],
                 ..Default::default()
-            },
-            |event| {
-                related_events.push(event);
-                Ok(())
-            },
-        )
-        .expect("events");
+            })
+            .expect("valid")
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid");
         assert_eq!(related_events.len(), 2_538);
 
         let mut kinds = related_events.iter().map(|x| x.kind()).collect::<Vec<_>>();
@@ -196,7 +201,11 @@ mod test {
         assert_eq!(Kind::Unknown(42), kinds[1]);
     }
 
-    pub fn filter_by_authors<T: Storage>(db: &T) {
+    pub fn filter_by_authors<'a, T, I>(db: &'a T)
+    where
+        T: Storage<'a, I>,
+        I: Iterator<Item = Result<Event, Error>>,
+    {
         setup_db(db);
         let query = Filter {
             authors: vec![
@@ -209,16 +218,19 @@ mod test {
             ],
             ..Default::default()
         };
-        let mut records = vec![];
-        db.get_by_filter(query, |event| {
-            records.push(event);
-            Ok(())
-        })
-        .expect("records");
+        let records = db
+            .get_by_filter(query)
+            .expect("valid")
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid");
         assert_eq!(records.len(), 27);
     }
 
-    pub fn filter_by_author<T: Storage>(db: &T) {
+    pub fn filter_by_author<'a, T, I>(db: &'a T)
+    where
+        T: Storage<'a, I>,
+        I: Iterator<Item = Result<Event, Error>>,
+    {
         setup_db(db);
         let query = Filter {
             authors: vec![
@@ -228,16 +240,19 @@ mod test {
             ],
             ..Default::default()
         };
-        let mut records = vec![];
-        db.get_by_filter(query, |event| {
-            records.push(event);
-            Ok(())
-        })
-        .expect("records");
+        let records = db
+            .get_by_filter(query)
+            .expect("valid")
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid");
         assert_eq!(records.len(), 3);
     }
 
-    pub fn filter_by_author_and_kinds<T: Storage>(db: &T) {
+    pub fn filter_by_author_and_kinds<'a, T, I>(db: &'a T)
+    where
+        T: Storage<'a, I>,
+        I: Iterator<Item = Result<Event, Error>>,
+    {
         setup_db(db);
         let query = Filter {
             authors: vec![
@@ -248,27 +263,29 @@ mod test {
             kinds: vec![Kind::ShortTextNote, Kind::Reaction],
             ..Default::default()
         };
-        let mut records = vec![];
-        db.get_by_filter(query, |event| {
-            records.push(event);
-            Ok(())
-        })
-        .expect("records");
+        let records = db
+            .get_by_filter(query)
+            .expect("iterator")
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid");
         assert_eq!(records.len(), 2);
     }
 
-    pub fn filter_kind<T: Storage>(db: &T) {
+    pub fn filter_kind<'a, T, I>(db: &'a T)
+    where
+        T: Storage<'a, I>,
+        I: Iterator<Item = Result<Event, Error>>,
+    {
         setup_db(db);
         let query = Filter {
             kinds: vec![Kind::ShortTextNote],
             ..Default::default()
         };
-        let mut records = vec![];
-        db.get_by_filter(query, |event| {
-            records.push(event);
-            Ok(())
-        })
-        .expect("records");
+        let records = db
+            .get_by_filter(query)
+            .expect("valid")
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid");
         assert_eq!(records.len(), 1_511);
         records
             .iter()

+ 66 - 104
crates/storage/src/rocksdb.rs → crates/storage/src/rocksdb/mod.rs

@@ -1,14 +1,14 @@
 //! Rocks DB implementation of the storage layer
-use crate::{
-    util::{load_events_and_filter, SecondaryIndex},
-    Error, Storage,
-};
+use self::resultset::ResultSet;
+use crate::{secondary_index::SecondaryIndex, Error, Storage};
 use nostr_rs_types::types::{Event, Filter, Tag};
 use rocksdb::{
     BoundColumnFamily, ColumnFamilyDescriptor, IteratorMode, Options, SliceTransform, WriteBatch,
     DB,
 };
-use std::{collections::HashSet, path::Path, sync::Arc};
+use std::{collections::VecDeque, path::Path, sync::Arc};
+
+mod resultset;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
 enum ReferenceType {
@@ -80,71 +80,9 @@ impl RocksDb {
             .cf_handle(namespace.as_str())
             .ok_or(Error::InvalidColumnFamily)
     }
-
-    /// Instead of loading the whole event, only the event id is loaded. This is
-    /// done for performance, to avoid loading the same event multiple times,
-    /// also to only load from the database the events that are needed
-    /// (according to limits and other constraints)
-    fn get_event_ids_by_ns_and_prefix(
-        &self,
-        cf_handle: &Arc<BoundColumnFamily>,
-        prefix: &[u8],
-        limit: Option<usize>,
-    ) -> Result<Vec<Vec<u8>>, Error> {
-        let mut items = vec![];
-        let limit = limit.unwrap_or(usize::MAX);
-
-        for item in self.db.prefix_iterator_cf(cf_handle, prefix) {
-            let (key, value) = match item {
-                Ok((k, v)) => (k, v),
-                Err(e) => return Err(Error::RocksDb(e)),
-            };
-
-            if !key.starts_with(prefix) {
-                break;
-            }
-
-            items.push(value.to_vec());
-            if items.len() >= limit {
-                break;
-            }
-        }
-
-        Ok(items)
-    }
-
-    /// Loads all events that references a given event. The reference-type is provided and it can be any value of `ReferenceType`.
-    fn get_event_referenced_as<T: AsRef<[u8]>>(
-        &self,
-        cf_handle: &Arc<BoundColumnFamily>,
-        id: T,
-        limit: Option<usize>,
-    ) -> Result<HashSet<Vec<u8>>, Error> {
-        let mut items = HashSet::new();
-        let limit = limit.unwrap_or(usize::MAX);
-        let prefix = id.as_ref();
-
-        for item in self.db.prefix_iterator_cf(cf_handle, prefix) {
-            let (key, value) = match item {
-                Ok((k, v)) => (k, v),
-                Err(e) => return Err(Error::RocksDb(e)),
-            };
-
-            if !key.starts_with(prefix) {
-                break;
-            }
-
-            items.insert(value.to_vec());
-            if limit == items.len() {
-                break;
-            }
-        }
-
-        Ok(items)
-    }
 }
 
-impl Storage for RocksDb {
+impl<'a> Storage<'a, ResultSet<'a>> for RocksDb {
     fn get_local_events<C>(&self, for_each: C) -> Result<(), Error>
     where
         C: Fn(Event) -> Result<(), Error>,
@@ -259,67 +197,91 @@ impl Storage for RocksDb {
             .transpose()?)
     }
 
-    fn get_by_filter<C>(&self, mut query: Filter, for_each: C) -> Result<(), Error>
-    where
-        C: Fn(Event) -> Result<(), Error>,
-    {
-        let mut event_ids = vec![];
+    /// Builds a lazy [`ResultSet`] for `query`.
+    ///
+    /// Exactly one index is chosen to drive the iteration (event references,
+    /// public-key references, ids, authors, kinds — in that priority order);
+    /// the chosen criterion is cleared from `query`, and all remaining
+    /// criteria are applied per-event by the `EventFilter` built from the
+    /// leftover query (`query.into()`).
+    fn get_by_filter(&'a self, mut query: Filter) -> Result<ResultSet<'a>, Error> {
         let limit = if query.limit == 0 {
             None
         } else {
             Some(query.limit.try_into()?)
         };
 
-        if !query.references_to_event.is_empty() {
-            let ns = self.reference_to_cf_handle(ReferenceType::RefEvent)?;
-            for public_key in query.references_to_event.iter() {
-                event_ids.extend(self.get_event_referenced_as(&ns, public_key.as_ref(), limit)?);
-            }
+        let (namespace, prefixes) = if !query.references_to_event.is_empty() {
+            let ns: Arc<BoundColumnFamily<'_>> =
+                self.reference_to_cf_handle(ReferenceType::RefEvent)?;
+            let keys = query
+                .references_to_event
+                .iter()
+                .map(|c| c.as_ref().to_vec())
+                .collect();
             query.references_to_event.clear();
+            (Some(ns), keys)
         } else if !query.references_to_public_key.is_empty() {
+            // NOTE(review): this branch also opens the RefEvent namespace.
+            // The removed helper behaved the same way, but confirm whether a
+            // dedicated public-key-reference namespace was intended here.
             let ns = self.reference_to_cf_handle(ReferenceType::RefEvent)?;
-            for public_key in query.references_to_public_key.iter() {
-                event_ids.extend(self.get_event_referenced_as(&ns, public_key.as_ref(), limit)?);
-            }
+            let keys = query
+                .references_to_public_key
+                .iter()
+                .map(|c| c.as_ref().to_vec())
+                .collect();
             query.references_to_public_key.clear();
+            (Some(ns), keys)
         } else if !query.ids.is_empty() {
+            // No namespace: ids are fetched straight from the primary index.
+            let keys = query.ids.iter().map(|c| c.as_ref().to_vec()).collect();
             query.ids.clear();
+            (None, keys)
         } else if !query.authors.is_empty() {
             let ns = self.reference_to_cf_handle(ReferenceType::Author)?;
-            for public_key in query.authors.iter() {
-                event_ids.extend(self.get_event_ids_by_ns_and_prefix(
-                    &ns,
-                    public_key.as_ref(),
-                    limit,
-                )?);
-            }
+            let keys = query.authors.iter().map(|c| c.as_ref().to_vec()).collect();
             query.authors.clear();
+            (Some(ns), keys)
         } else if !query.kinds.is_empty() {
             let ns = self.reference_to_cf_handle(ReferenceType::Kind)?;
-            for kind in query.kinds.iter() {
-                let kind: u32 = (*kind).into();
-                event_ids.extend(self.get_event_ids_by_ns_and_prefix(
-                    &ns,
-                    &kind.to_be_bytes(),
-                    limit,
-                )?);
-            }
+            let keys = query
+                .kinds
+                .iter()
+                .map(|kind| {
+                    // Kinds are indexed by their big-endian u32 encoding.
+                    let kind: u32 = (*kind).into();
+                    kind.to_be_bytes().to_vec()
+                })
+                .collect();
             query.kinds.clear();
-        }
+            (Some(ns), keys)
+        } else {
+            // No usable criterion: an empty result set.
+            (None, VecDeque::new())
+        };
 
-        load_events_and_filter(self, query, event_ids, for_each)
+        Ok(ResultSet {
+            db: self,
+            filter: query.into(),
+            namespace,
+            secondary_index_lookup: None,
+            current_prefix: vec![],
+            prefixes,
+            returned: 0,
+            limit,
+        })
+
+        // NOTE(review): leftover from the callback-based API; remove once the
+        // iterator-based path is final (commit is marked WIP).
+        //load_events_and_filter(self, query, event_ids, for_each)
     }
 }
 
 #[cfg(test)]
 mod test {
     use super::*;
-    use crate::{test, util::unique_nanoseconds};
+    use crate::test;
+    use rand::Rng;
+
+    // Get current nanoseconds and use the last 3 digits as a random number (because
+    // sometimes it comes as 0)
+    // Moved here from the removed `util.rs`; defined inside the test module.
+    pub fn unique_nanoseconds() -> u128 {
+        let mut rng = rand::thread_rng();
+        let random_number = rng.gen_range(0..999);
+
+        let ts = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .expect("time")
+            .as_nanos();
+
+        // Saturate (fall back to the raw timestamp) instead of overflowing.
+        ts.checked_add(random_number).unwrap_or(ts)
+    }
 
     #[test]
     fn store_and_get() {

+ 84 - 0
crates/storage/src/rocksdb/resultset.rs

@@ -0,0 +1,84 @@
+//! Rocks DB implementation of the storage layer
+use crate::{event_filter::EventFilter, Error, RocksDb, Storage};
+use nostr_rs_types::types::Event;
+use rocksdb::{BoundColumnFamily, DBIteratorWithThreadMode, DB};
+use std::{collections::VecDeque, sync::Arc};
+
+/// Lazy, iterator-based result set for a [`RocksDb`] query.
+pub struct ResultSet<'a> {
+    /// Handle to the backing database.
+    pub db: &'a RocksDb,
+    /// Per-event filter applied to every candidate before it is yielded.
+    pub filter: EventFilter,
+    /// Secondary-index column family driving the scan; `None` when the query
+    /// is served directly from the primary index (lookup by event id).
+    pub namespace: Option<Arc<BoundColumnFamily<'a>>>,
+    /// Active prefix iterator over `namespace`, if a scan is in progress.
+    pub secondary_index_lookup: Option<DBIteratorWithThreadMode<'a, DB>>,
+    /// Prefix currently being scanned; used to detect the end of the range.
+    pub current_prefix: Vec<u8>,
+    /// Remaining prefixes (index keys) still to be scanned.
+    pub prefixes: VecDeque<Vec<u8>>,
+    /// Maximum number of events to yield; `None` means unlimited.
+    pub limit: Option<usize>,
+    /// Number of events yielded so far.
+    pub returned: usize,
+}
+
+impl<'a> ResultSet<'a> {
+    /// Selects the next available prefix and starts an iterator over it using
+    /// the secondary index. Returns `None` when no prefix is left, signalling
+    /// upstream that there are no more results.
+    ///
+    /// NOTE(review): `self.namespace.as_ref()?` runs *after* the prefix has
+    /// been popped, so with a `None` namespace the prefix would be silently
+    /// dropped. Callers currently only reach this when `namespace` is `Some`
+    /// — confirm that invariant holds.
+    fn select_next_prefix_using_secondary_index(&mut self) -> Option<()> {
+        let prefix = self.prefixes.pop_front()?;
+        self.secondary_index_lookup = Some(
+            self.db
+                .db
+                .prefix_iterator_cf(self.namespace.as_ref()?, prefix.clone()),
+        );
+        self.current_prefix = prefix;
+        Some(())
+    }
+}
+
+impl<'a> Iterator for ResultSet<'a> {
+    type Item = Result<Event, Error>;
+
+    /// Yields the next event matching the filter, or `None` when the limit is
+    /// reached or all prefixes are exhausted.
+    fn next(&mut self) -> Option<Self::Item> {
+        // Stop once the requested limit has been served.
+        if Some(self.returned) == self.limit {
+            return None;
+        }
+        if self.secondary_index_lookup.is_none() {
+            if self.namespace.is_some() {
+                self.select_next_prefix_using_secondary_index()?;
+            } else {
+                // No secondary index is used to query, this means the query is
+                // using the ID filter, so it is more efficient to use the
+                // primary index to prefetch events that may satisfy the query
+                loop {
+                    let prefix = self.prefixes.pop_front()?;
+                    // NOTE(review): a storage error from `get_event` is
+                    // silently skipped instead of surfacing as `Some(Err(_))`
+                    // — confirm this is intended.
+                    if let Ok(Some(event)) = self.db.get_event(prefix) {
+                        if self.filter.check_event(&event) {
+                            self.returned += 1;
+                            return Some(Ok(event));
+                        }
+                    }
+                }
+            }
+        }
+
+        loop {
+            loop {
+                let secondary_index = self.secondary_index_lookup.as_mut()?;
+                let (key, value) = match secondary_index.next() {
+                    Some(Ok((k, v))) => (k, v),
+                    _ => {
+                        // Iterator exhausted — or yielded an Err, which is
+                        // swallowed here; break to select the next prefix.
+                        break;
+                    }
+                };
+                // Keys past the current prefix belong to another range.
+                if !key.starts_with(&self.current_prefix) {
+                    break;
+                }
+                if let Ok(Some(event)) = self.db.get_event(value) {
+                    if self.filter.check_event(&event) {
+                        self.returned += 1;
+                        return Some(Ok(event));
+                    }
+                }
+            }
+            // Select next prefix if available, or exit
+            self.select_next_prefix_using_secondary_index()?;
+        }
+    }
+}

+ 36 - 0
crates/storage/src/secondary_index.rs

@@ -0,0 +1,36 @@
+use chrono::{DateTime, Utc};
+use nostr_rs_types::types::Id;
+
+/// Creates a secondary index for a given event ID. The secondary index is
+/// sorted by created time in a descendent order, meaning that newer events will
+/// be first.
+///
+/// This is useful to leverage natural sorting of the database, and to be able
+/// to create secondary indexes to find by author, kind, tag, etc, and have that
+/// result sorted by date naturally
+pub struct SecondaryIndex<'a> {
+    /// Id of the event being indexed; appended to every key for uniqueness.
+    entry_id: &'a Id,
+    /// Big-endian, bit-reversed timestamp used as the sort component.
+    sort_by: [u8; 8],
+}
+
+impl<'a> SecondaryIndex<'a> {
+    /// Creates an index builder for `entry_id`, sorted by `date_time` in
+    /// descending order.
+    pub fn new(entry_id: &'a Id, date_time: DateTime<Utc>) -> Self {
+        // We need to reverse the number, to sort it. Otherwise, the first is
+        // going to be the oldest entry, instead of the newest.
+        // NOTE(review): `timestamp() as u64` wraps for pre-1970 dates, making
+        // `checked_sub` return `None` and collapsing the sort key to zero —
+        // confirm negative timestamps cannot occur here.
+        let reversed_order_by = u64::MAX
+            .checked_sub(date_time.timestamp() as u64)
+            .unwrap_or_default()
+            .to_be_bytes();
+        Self {
+            entry_id,
+            sort_by: reversed_order_by,
+        }
+    }
+
+    /// Builds the full index key: `index_by` bytes, then the reversed
+    /// timestamp, then the entry id (keeping keys unique per event).
+    pub fn new_id<T: AsRef<[u8]>>(&self, index_by: T) -> Vec<u8> {
+        let mut new_id = index_by.as_ref().to_vec();
+        new_id.extend_from_slice(&self.sort_by);
+        new_id.extend_from_slice(self.entry_id.as_ref());
+        new_id
+    }
+}

+ 5 - 4
crates/storage/src/storage.rs

@@ -2,7 +2,10 @@ use crate::Error;
 use nostr_rs_types::types::{Event, Filter};
 
 /// Trait to store/query events
-pub trait Storage {
+pub trait Storage<'a, I>
+where
+    I: Iterator<Item = Result<Event, Error>>,
+{
     /// Stores a new event into the database. This action will also creates all
     /// the needed indexes to find this event later by reference, author, kind or tag.
     fn store(&self, event: &Event) -> Result<bool, Error>;
@@ -19,9 +22,7 @@ pub trait Storage {
     /// The first step is to use one available index to get a list of event-IDs,
     /// then call `load_and_filter_events` that will load the events from the
     /// `Events` namespace and will filter them by the given parameters.
-    fn get_by_filter<C>(&self, query: Filter, for_each: C) -> Result<(), Error>
-    where
-        C: Fn(Event) -> Result<(), Error>;
+    fn get_by_filter(&'a self, query: Filter) -> Result<I, Error>;
 
     /// Return a vector of all local events
     fn get_local_events<C>(&self, for_each: C) -> Result<(), Error>

+ 0 - 164
crates/storage/src/util.rs

@@ -1,164 +0,0 @@
-use crate::{Error, Storage};
-use chrono::{DateTime, Utc};
-use nostr_rs_types::types::{Event, Filter, Id, Tag};
-use rand::Rng;
-use std::collections::HashSet;
-
-/// Creates a secondary index for a given event ID. The secondary index is
-/// sorted by created time in a descendent order, meaning that newer events will
-/// be first.
-///
-/// This is useful to leverage natural sorting of the database, and to be able
-/// to create secondary indexes to find by author, kind, tag, etc, and have that
-/// result sorted by date naturally
-pub struct SecondaryIndex<'a> {
-    entry_id: &'a Id,
-    sort_by: [u8; 8],
-}
-
-impl<'a> SecondaryIndex<'a> {
-    pub fn new(entry_id: &'a Id, date_time: DateTime<Utc>) -> Self {
-        // We need to reverse the number, to sort it. Otherwise, the first is
-        // going to be the oldest entry, instead of the newest.
-        let reversed_order_by = u64::MAX
-            .checked_sub(date_time.timestamp() as u64)
-            .unwrap_or_default()
-            .to_be_bytes();
-        Self {
-            entry_id,
-            sort_by: reversed_order_by,
-        }
-    }
-
-    pub fn new_id<T: AsRef<[u8]>>(&self, index_by: T) -> Vec<u8> {
-        let mut new_id = index_by.as_ref().to_vec();
-        new_id.extend_from_slice(&self.sort_by);
-        new_id.extend_from_slice(self.entry_id.as_ref());
-        new_id
-    }
-}
-
-// Get current nanoseconds and use the last 3 digits as a random number (because
-// sometimes it comes as 0)
-#[inline]
-#[allow(dead_code)]
-pub fn unique_nanoseconds() -> u128 {
-    let mut rng = rand::thread_rng();
-    let random_number = rng.gen_range(0..999);
-
-    let ts = std::time::SystemTime::now()
-        .duration_since(std::time::UNIX_EPOCH)
-        .expect("time")
-        .as_nanos();
-
-    ts.checked_add(random_number).unwrap_or(ts)
-}
-
-/// Receives a list of event-IDs, and returns a list of events that match
-/// the given filter.
-///
-/// The list of event-IDs should come from an index from any namespace, and
-/// the filter will load the events from the `Events` namespace and will
-/// filter them by the given parameters.
-#[inline]
-pub fn load_events_and_filter<T: Storage, C>(
-    db: &T,
-    mut query: Filter,
-    mut event_ids: Vec<Vec<u8>>,
-    for_each: C,
-) -> Result<(), Error>
-where
-    C: Fn(Event) -> Result<(), Error>,
-{
-    if !query.ids.is_empty() {
-        let mut ids = query.ids.iter().map(|id| id.as_ref()).collect::<Vec<_>>();
-        ids.sort();
-        event_ids.retain(|id| ids.binary_search(&id.as_slice()).is_ok());
-    }
-
-    let mut authors = query
-        .authors
-        .iter()
-        .map(|id| id.as_ref())
-        .collect::<Vec<_>>();
-
-    let mut tag_primary_keys = query
-        .references_to_public_key
-        .iter()
-        .map(|tag| tag.as_ref())
-        .collect::<HashSet<_>>()
-        .into_iter()
-        .collect::<Vec<_>>();
-
-    let mut tag_events = query
-        .references_to_event
-        .iter()
-        .map(|tag| tag.as_ref())
-        .collect::<HashSet<_>>()
-        .into_iter()
-        .collect::<Vec<_>>();
-
-    // Sort everything for a quick binary_search instead of a linear search
-    tag_events.sort();
-    tag_primary_keys.sort();
-    authors.sort();
-    query.kinds.sort();
-
-    let mut found = 0;
-
-    for id in event_ids.iter() {
-        if let Some(event) = db.get_event(id)? {
-            if !tag_primary_keys.is_empty() || !tag_events.is_empty() && authors.is_empty() {
-                let mut found = false;
-
-                for tag in event.tags().iter() {
-                    match tag {
-                        Tag::Event(event) => {
-                            if tag_events.binary_search(&event.id.as_ref()).is_ok() {
-                                found = true;
-                            }
-                        }
-                        Tag::PubKey(key) => {
-                            if tag_primary_keys.binary_search(&key.id.as_ref()).is_ok() {
-                                found = true;
-                            }
-                        }
-                        _ => {}
-                    }
-
-                    if found {
-                        break;
-                    }
-                }
-
-                if !found {
-                    continue;
-                }
-            }
-
-            if !authors.is_empty() && authors.binary_search(&event.author().as_ref()).is_err() {
-                continue;
-            }
-            if !query.kinds.is_empty() && query.kinds.binary_search(&event.kind()).is_err() {
-                continue;
-            }
-            if let Some(since) = query.since {
-                if event.created_at() < since {
-                    continue;
-                }
-            }
-            if let Some(until) = query.until {
-                if event.created_at() > until {
-                    continue;
-                }
-            }
-            for_each(event)?;
-            found += 1;
-            if found == query.limit {
-                break;
-            }
-        }
-    }
-
-    Ok(())
-}