Browse Source

Add iterator for local events

* Added comments
* Added iterator for local events
* Added stream iterator, where all events will be emited when the filter
  is empty
Cesar Rodas 1 year ago
parent
commit
ef3796b9a2

+ 33 - 0
crates/storage/src/lib.rs

@@ -292,4 +292,37 @@ mod test {
             .map(|x| x.kind())
             .for_each(|x| assert_eq!(x, Kind::ShortTextNote));
     }
+
+    pub fn get_local_events<'a, T, I>(db: &'a T)
+    where
+        T: Storage<'a, I>,
+        I: Iterator<Item = Result<Event, Error>>,
+    {
+        setup_db(db);
+
+        let ids = db
+            .get_by_filter(Filter {
+                limit: 10,
+                ..Default::default()
+            })
+            .expect("valid")
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid");
+
+        let x = ids
+            .iter()
+            .map(|event| db.set_local_event(event))
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid");
+
+        assert_eq!(10, ids.len());
+        assert_eq!(10, x.len());
+
+        let records = db
+            .get_local_events(None)
+            .expect("valid iterator")
+            .collect::<Result<Vec<_>, _>>()
+            .expect("valid");
+        assert_eq!(x.len(), records.len())
+    }
 }

+ 35 - 10
crates/storage/src/rocksdb/resultset.rs → crates/storage/src/rocksdb/iterators.rs

@@ -4,24 +4,39 @@ use nostr_rs_types::types::Event;
 use rocksdb::{BoundColumnFamily, DBIteratorWithThreadMode, DB};
 use std::{collections::VecDeque, sync::Arc};
 
-pub struct ResultSet<'a> {
+pub struct WrapperIterator<'a> {
+    /// Reference to the rocks db database. This is useful to load the event
+    /// data, because in the secondary indexes we only store the event ID. It
+    /// could be possible to avoid a reference and speed up things even more to
+    /// duplicate the event, but that would mean it would use way more disk
+    /// space.
     pub db: &'a RocksDb,
-    pub filter: EventFilter,
+    /// List of filters to apply to the events, before returning. If no filter
+    /// is given each events from the secondary index will be returned,
+    /// otherwise the events will be filtered by the given filter, and only
+    /// those events that comply will be returned
+    pub filter: Option<EventFilter>,
+    /// Reference to the namespace to use to query the secondary index. If none
+    /// is given the secondary_index_iterator must be constructed outside this
+    /// wrapper.
     pub namespace: Option<Arc<BoundColumnFamily<'a>>>,
-    pub secondary_index_lookup: Option<DBIteratorWithThreadMode<'a, DB>>,
+    /// The current secondary index iterator. If none is given the iterator will
+    /// try to create one using the namespace property and the first prefix from
+    /// prefixes (it will also be copied to current_prefix)
+    pub secondary_index_iterator: Option<DBIteratorWithThreadMode<'a, DB>>,
     pub current_prefix: Vec<u8>,
     pub prefixes: VecDeque<Vec<u8>>,
     pub limit: Option<usize>,
     pub returned: usize,
 }
 
-impl<'a> ResultSet<'a> {
+impl<'a> WrapperIterator<'a> {
     /// Selects the next prefix available and starts an iterator using the
     /// secondary index. If no prefix is available from prefixes the functions
     /// return None, signalling upstream the are no more results
     fn select_next_prefix_using_secondary_index(&mut self) -> Option<()> {
         let prefix = self.prefixes.pop_front()?;
-        self.secondary_index_lookup = Some(
+        self.secondary_index_iterator = Some(
             self.db
                 .db
                 .prefix_iterator_cf(self.namespace.as_ref()?, prefix.clone()),
@@ -31,14 +46,14 @@ impl<'a> ResultSet<'a> {
     }
 }
 
-impl<'a> Iterator for ResultSet<'a> {
+impl<'a> Iterator for WrapperIterator<'a> {
     type Item = Result<Event, Error>;
 
     fn next(&mut self) -> Option<Self::Item> {
         if Some(self.returned) == self.limit {
             return None;
         }
-        if self.secondary_index_lookup.is_none() {
+        if self.secondary_index_iterator.is_none() {
             if self.namespace.is_some() {
                 self.select_next_prefix_using_secondary_index()?;
             } else {
@@ -48,7 +63,12 @@ impl<'a> Iterator for ResultSet<'a> {
                 loop {
                     let prefix = self.prefixes.pop_front()?;
                     if let Ok(Some(event)) = self.db.get_event(prefix) {
-                        if self.filter.check_event(&event) {
+                        if let Some(filter) = &self.filter {
+                            if filter.check_event(&event) {
+                                self.returned += 1;
+                                return Some(Ok(event));
+                            }
+                        } else {
                             self.returned += 1;
                             return Some(Ok(event));
                         }
@@ -59,7 +79,7 @@ impl<'a> Iterator for ResultSet<'a> {
 
         loop {
             loop {
-                let secondary_index = self.secondary_index_lookup.as_mut()?;
+                let secondary_index = self.secondary_index_iterator.as_mut()?;
                 let (key, value) = match secondary_index.next() {
                     Some(Ok((k, v))) => (k, v),
                     _ => {
@@ -71,7 +91,12 @@ impl<'a> Iterator for ResultSet<'a> {
                     break;
                 }
                 if let Ok(Some(event)) = self.db.get_event(value) {
-                    if self.filter.check_event(&event) {
+                    if let Some(filter) = &self.filter {
+                        if filter.check_event(&event) {
+                            self.returned += 1;
+                            return Some(Ok(event));
+                        }
+                    } else {
                         self.returned += 1;
                         return Some(Ok(event));
                     }

+ 102 - 88
crates/storage/src/rocksdb/mod.rs

@@ -1,14 +1,14 @@
 //! Rocks DB implementation of the storage layer
-use self::resultset::ResultSet;
+use self::iterators::WrapperIterator;
 use crate::{secondary_index::SecondaryIndex, Error, Storage};
 use nostr_rs_types::types::{Event, Filter, Tag};
 use rocksdb::{
     BoundColumnFamily, ColumnFamilyDescriptor, IteratorMode, Options, SliceTransform, WriteBatch,
     DB,
 };
-use std::{collections::VecDeque, path::Path, sync::Arc};
+use std::{collections::VecDeque, ops::Deref, path::Path, sync::Arc};
 
-mod resultset;
+mod iterators;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
 enum ReferenceType {
@@ -18,6 +18,7 @@ enum ReferenceType {
     RefEvent,
     Kind,
     LocalEvents,
+    Stream,
 }
 
 impl ReferenceType {
@@ -30,6 +31,7 @@ impl ReferenceType {
             Self::RefEvent => "refs_by_ids",
             Self::Kind => "kinds",
             Self::LocalEvents => "local",
+            Self::Stream => "stream",
         }
     }
 }
@@ -52,6 +54,7 @@ impl RocksDb {
                 ColumnFamilyDescriptor::new(ReferenceType::RefPublicKey.as_str(), options.clone()),
                 ColumnFamilyDescriptor::new(ReferenceType::Kind.as_str(), options.clone()),
                 ColumnFamilyDescriptor::new(ReferenceType::LocalEvents.as_str(), options.clone()),
+                ColumnFamilyDescriptor::new(ReferenceType::Stream.as_str(), options.clone()),
             ],
         )?;
         Ok(Self { db })
@@ -82,82 +85,81 @@ impl RocksDb {
     }
 }
 
-impl<'a> Storage<'a, ResultSet<'a>> for RocksDb {
-    fn get_local_events<C>(&self, for_each: C) -> Result<(), Error>
-    where
-        C: Fn(Event) -> Result<(), Error>,
-    {
+impl<'a> Storage<'a, WrapperIterator<'a>> for RocksDb {
+    fn get_local_events(&'a self, limit: Option<usize>) -> Result<WrapperIterator<'a>, Error> {
         let cf_handle = self.reference_to_cf_handle(ReferenceType::LocalEvents)?;
-        self.db
-            .iterator_cf(&cf_handle, IteratorMode::Start)
-            .into_iter()
-            .map(|res| {
-                if let Ok((key, _)) = res {
-                    if let Ok(Some(event)) = self.get_event(&key) {
-                        for_each(event)
-                    } else {
-                        Ok(())
-                    }
-                } else {
-                    Ok(())
-                }
-            })
-            .collect::<Result<Vec<_>, Error>>()?;
-        Ok(())
+        Ok(WrapperIterator {
+            db: self,
+            filter: None,
+            namespace: None,
+            secondary_index_iterator: Some(self.db.iterator_cf(&cf_handle, IteratorMode::Start)),
+            current_prefix: vec![],
+            prefixes: VecDeque::new(),
+            limit,
+            returned: 0,
+        })
     }
 
     fn set_local_event(&self, event: &Event) -> Result<(), Error> {
+        let event_id = &event.id;
+        let secondary_index = SecondaryIndex::new(event_id, event.created_at());
         self.db.put_cf(
             &self.reference_to_cf_handle(ReferenceType::LocalEvents)?,
-            *(event.id),
-            &[],
+            secondary_index.index_by(&[]),
+            &event_id.deref(),
         )?;
         Ok(())
     }
 
     fn store(&self, event: &Event) -> Result<bool, Error> {
-        let event_id = event.id.clone();
+        let event_id = &event.id;
 
         if let Ok(Some(_)) = self.db.get_cf(
             &self.reference_to_cf_handle(ReferenceType::Events)?,
-            *event_id,
+            event_id.deref(),
         ) {
             return Ok(false);
         }
-        let secondary_index = SecondaryIndex::new(&event_id, event.created_at());
-        let author_id = secondary_index.new_id(event.author());
+        let secondary_index = SecondaryIndex::new(event_id, event.created_at());
+        let author_id = secondary_index.index_by(event.author());
         let json = serde_json::to_vec(event)?;
         let kind: u32 = event.kind().into();
-        let kind_id = secondary_index.new_id(&kind.to_be_bytes());
+        let kind_id = secondary_index.index_by(&kind.to_be_bytes());
 
         let mut buffer = WriteBatch::default();
 
         buffer.put_cf(
             &self.reference_to_cf_handle(ReferenceType::Events)?,
-            *event_id,
+            event_id.deref(),
             json,
         );
         buffer.put_cf(
             &self.reference_to_cf_handle(ReferenceType::Author)?,
             author_id,
-            *event_id,
+            event_id.deref(),
         );
         buffer.put_cf(
             &self.reference_to_cf_handle(ReferenceType::Kind)?,
             kind_id,
-            *event_id,
+            event_id.deref(),
+        );
+
+        buffer.put_cf(
+            &self.reference_to_cf_handle(ReferenceType::Stream)?,
+            secondary_index.index_by(&[]),
+            event_id.deref(),
         );
 
         for tag in event.tags().iter() {
             match tag {
                 Tag::PubKey(p) => {
-                    let foreign_id = secondary_index.new_id(&p.id);
-                    let local_id = secondary_index.new_id(&event_id);
+                    let foreign_id = secondary_index.index_by(&p.id);
+                    let local_id = secondary_index.index_by(&event_id);
 
                     buffer.put_cf(
                         &self.reference_to_cf_handle(ReferenceType::RefPublicKey)?,
                         foreign_id,
-                        *event_id,
+                        event_id.deref(),
                     );
                     buffer.put_cf(
                         &self.reference_to_cf_handle(ReferenceType::RefEvent)?,
@@ -166,13 +168,13 @@ impl<'a> Storage<'a, ResultSet<'a>> for RocksDb {
                     );
                 }
                 Tag::Event(e) => {
-                    let foreign_id = secondary_index.new_id(&e.id);
-                    let local_id = secondary_index.new_id(&event_id);
+                    let foreign_id = secondary_index.index_by(&e.id);
+                    let local_id = secondary_index.index_by(&event_id);
 
                     buffer.put_cf(
                         &self.reference_to_cf_handle(ReferenceType::RefEvent)?,
                         foreign_id,
-                        *event_id,
+                        event_id.deref(),
                     );
                     buffer.put_cf(
                         &self.reference_to_cf_handle(ReferenceType::RefEvent)?,
@@ -197,62 +199,68 @@ impl<'a> Storage<'a, ResultSet<'a>> for RocksDb {
             .transpose()?)
     }
 
-    fn get_by_filter(&'a self, mut query: Filter) -> Result<ResultSet<'a>, Error> {
+    fn get_by_filter(&'a self, mut query: Filter) -> Result<WrapperIterator<'a>, Error> {
         let limit = if query.limit == 0 {
             None
         } else {
             Some(query.limit.try_into()?)
         };
 
-        let (namespace, prefixes) = if !query.references_to_event.is_empty() {
-            let ns: Arc<BoundColumnFamily<'_>> =
-                self.reference_to_cf_handle(ReferenceType::RefEvent)?;
-            let keys = query
-                .references_to_event
-                .iter()
-                .map(|c| c.as_ref().to_vec())
-                .collect();
-            query.references_to_event.clear();
-            (Some(ns), keys)
-        } else if !query.references_to_public_key.is_empty() {
-            let ns = self.reference_to_cf_handle(ReferenceType::RefEvent)?;
-            let keys = query
-                .references_to_public_key
-                .iter()
-                .map(|c| c.as_ref().to_vec())
-                .collect();
-            query.references_to_public_key.clear();
-            (Some(ns), keys)
-        } else if !query.ids.is_empty() {
-            let keys = query.ids.iter().map(|c| c.as_ref().to_vec()).collect();
-            query.ids.clear();
-            (None, keys)
-        } else if !query.authors.is_empty() {
-            let ns = self.reference_to_cf_handle(ReferenceType::Author)?;
-            let keys = query.authors.iter().map(|c| c.as_ref().to_vec()).collect();
-            query.authors.clear();
-            (Some(ns), keys)
-        } else if !query.kinds.is_empty() {
-            let ns = self.reference_to_cf_handle(ReferenceType::Kind)?;
-            let keys = query
-                .kinds
-                .iter()
-                .map(|kind| {
-                    let kind: u32 = (*kind).into();
-                    kind.to_be_bytes().to_vec()
-                })
-                .collect();
-            query.kinds.clear();
-            (Some(ns), keys)
-        } else {
-            (None, VecDeque::new())
-        };
-
-        Ok(ResultSet {
+        let (namespace, secondary_index_iterator, prefixes) =
+            if !query.references_to_event.is_empty() {
+                let ns: Arc<BoundColumnFamily<'_>> =
+                    self.reference_to_cf_handle(ReferenceType::RefEvent)?;
+                let keys = query
+                    .references_to_event
+                    .iter()
+                    .map(|c| c.as_ref().to_vec())
+                    .collect();
+                query.references_to_event.clear();
+                (Some(ns), None, keys)
+            } else if !query.references_to_public_key.is_empty() {
+                let ns = self.reference_to_cf_handle(ReferenceType::RefEvent)?;
+                let keys = query
+                    .references_to_public_key
+                    .iter()
+                    .map(|c| c.as_ref().to_vec())
+                    .collect();
+                query.references_to_public_key.clear();
+                (Some(ns), None, keys)
+            } else if !query.ids.is_empty() {
+                let keys = query.ids.iter().map(|c| c.as_ref().to_vec()).collect();
+                query.ids.clear();
+                (None, None, keys)
+            } else if !query.authors.is_empty() {
+                let ns = self.reference_to_cf_handle(ReferenceType::Author)?;
+                let keys = query.authors.iter().map(|c| c.as_ref().to_vec()).collect();
+                query.authors.clear();
+                (Some(ns), None, keys)
+            } else if !query.kinds.is_empty() {
+                let ns = self.reference_to_cf_handle(ReferenceType::Kind)?;
+                let keys = query
+                    .kinds
+                    .iter()
+                    .map(|kind| {
+                        let kind: u32 = (*kind).into();
+                        kind.to_be_bytes().to_vec()
+                    })
+                    .collect();
+                query.kinds.clear();
+                (Some(ns), None, keys)
+            } else {
+                let cf_handle = self.reference_to_cf_handle(ReferenceType::Stream)?;
+                (
+                    None,
+                    Some(self.db.iterator_cf(&cf_handle, IteratorMode::Start)),
+                    VecDeque::new(),
+                )
+            };
+
+        Ok(WrapperIterator {
             db: self,
-            filter: query.into(),
+            filter: Some(query.into()),
             namespace,
-            secondary_index_lookup: None,
+            secondary_index_iterator,
             current_prefix: vec![],
             prefixes,
             returned: 0,
@@ -342,4 +350,10 @@ mod test {
         let db = RocksDb::new(format!("tests/db/{}", unique_nanoseconds())).expect("db");
         test::filter_by_author_and_kinds(&db)
     }
+
+    #[test]
+    fn get_local_events() {
+        let db = RocksDb::new(format!("tests/db/{}", unique_nanoseconds())).expect("db");
+        test::get_local_events(&db)
+    }
 }

+ 1 - 1
crates/storage/src/secondary_index.rs

@@ -27,7 +27,7 @@ impl<'a> SecondaryIndex<'a> {
         }
     }
 
-    pub fn new_id<T: AsRef<[u8]>>(&self, index_by: T) -> Vec<u8> {
+    pub fn index_by<T: AsRef<[u8]>>(&self, index_by: T) -> Vec<u8> {
         let mut new_id = index_by.as_ref().to_vec();
         new_id.extend_from_slice(&self.sort_by);
         new_id.extend_from_slice(self.entry_id.as_ref());

+ 1 - 3
crates/storage/src/storage.rs

@@ -25,9 +25,7 @@ where
     fn get_by_filter(&'a self, query: Filter) -> Result<I, Error>;
 
     /// Return a vector of all local events
-    fn get_local_events<C>(&self, for_each: C) -> Result<(), Error>
-    where
-        C: Fn(Event) -> Result<(), Error>;
+    fn get_local_events(&'a self, limit: Option<usize>) -> Result<I, Error>;
 
     /// Stores an event, similar to store(Event), but keeps track of this event in a
     /// local index. This is useful to keep track of the events that are created by