Переглянути джерело

Extended interface to allow better support for background tasks

Added `is_flushing` to signal upstream the storage is still busy persisting stuff or building indexes
Cesar Rodas 3 місяців тому
батько
коміт
8d73358733

+ 2 - 1
crates/storage/base/src/cursor.rs

@@ -11,6 +11,7 @@ pub enum FutureValue {
     Found(Result<Event, Error>),
     Pending,
     Ended,
+    NotFound,
     FoundNotMatch,
 }
 
@@ -23,7 +24,7 @@ pub fn check_future_call(
 ) -> FutureValue {
     if let Some(mut inner_future) = future_event.take() {
         match inner_future.poll_unpin(cx) {
-            Poll::Ready(Ok(None)) => FutureValue::FoundNotMatch,
+            Poll::Ready(Ok(None)) => FutureValue::NotFound,
             Poll::Ready(Ok(Some(event))) => {
                 // event is ready, apply the neccesary filters
                 if let Some(filter) = filter {

+ 6 - 1
crates/storage/base/src/lib.rs

@@ -8,14 +8,19 @@ pub mod cursor;
 mod error;
 mod event_filter;
 mod notification;
+mod secondary_index;
 mod storage;
+
 #[cfg(feature = "test")]
 pub mod test;
 
 #[cfg(feature = "test")]
 pub use tokio;
 
-pub use crate::{error::Error, event_filter::*, notification::Subscription, storage::Storage};
+pub use crate::{
+    error::Error, event_filter::*, notification::Subscription, secondary_index::SecondaryIndex,
+    storage::Storage,
+};
 
 #[macro_export]
 /// This macro creates the

+ 0 - 0
crates/storage/rocksdb/src/secondary_index.rs → crates/storage/base/src/secondary_index.rs


+ 5 - 0
crates/storage/base/src/storage.rs

@@ -10,6 +10,11 @@ pub trait Storage: Send + Sync {
     where
         Self: 'a;
 
+    /// Returns true if the storage is flushing data to disk
+    fn is_flushing(&self) -> bool {
+        false
+    }
+
     /// Stores a new event into the database. This action will also creates all
     /// the needed indexes to find this event later by reference, author, kind or tag.
     async fn store(&self, event: &Event) -> Result<bool, Error>;

+ 4 - 0
crates/storage/base/src/test.rs

@@ -22,6 +22,10 @@ where
     for event in events {
         assert!(db.store(&event).await.expect("valid"));
     }
+
+    while db.is_flushing() {
+        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    }
 }
 
 pub async fn store_and_get<T>(db: &T)

+ 23 - 23
crates/storage/rocksdb/src/cursor.rs

@@ -29,15 +29,15 @@ pub struct Cursor<'a> {
     /// Reference to the namespace to use to query the secondary index. If none
     /// is given the secondary_index_iterator must be constructed outside this
     /// wrapper.
-    namespace: Option<Arc<BoundColumnFamily<'a>>>,
+    index: Option<Arc<BoundColumnFamily<'a>>>,
     /// The current secondary index iterator. If none is given the iterator will
     /// try to create one using the namespace property and the first prefix from
     /// prefixes (it will also be copied to current_prefix)
-    secondary_index_iterator: Option<DBIteratorWithThreadMode<'a, DB>>,
+    index_iterator: Option<DBIteratorWithThreadMode<'a, DB>>,
+    /// Index key or prefix to iterate over in the index
+    index_keys: VecDeque<Vec<u8>>,
     /// The current prefix to load
-    current_prefix: Vec<u8>,
-    /// Prefixes
-    prefixes: VecDeque<Vec<u8>>,
+    current_index_key: Vec<u8>,
     /// Limit of events to return, None means nothing
     limit: Option<usize>,
     /// Amount of events returned
@@ -49,7 +49,7 @@ pub struct Cursor<'a> {
 impl<'a> Cursor<'a> {
     pub fn new(
         db: &'a RocksDb,
-        namespace: Option<Arc<BoundColumnFamily<'a>>>,
+        index: Option<Arc<BoundColumnFamily<'a>>>,
         prefixes: Vec<Vec<u8>>,
         filter: Option<EventFilter>,
         secondary_index_iterator: Option<DBIteratorWithThreadMode<'a, DB>>,
@@ -57,10 +57,10 @@ impl<'a> Cursor<'a> {
     ) -> Self {
         Self {
             db,
-            namespace,
-            secondary_index_iterator,
-            current_prefix: Vec::new(),
-            prefixes: prefixes.into(),
+            index,
+            index_iterator: secondary_index_iterator,
+            current_index_key: Vec::new(),
+            index_keys: prefixes.into(),
             filter,
             limit,
             returned: 0,
@@ -71,14 +71,14 @@ impl<'a> Cursor<'a> {
     /// secondary index. If no prefix is available from prefixes the functions
     /// return None, signalling upstream the are no more results
     fn select_next_prefix_using_secondary_index(&mut self) -> Option<()> {
-        self.secondary_index_iterator = None;
-        let prefix = self.prefixes.pop_front()?;
-        self.secondary_index_iterator = Some(
+        self.index_iterator = None;
+        let prefix = self.index_keys.pop_front()?;
+        self.index_iterator = Some(
             self.db
                 .db
-                .prefix_iterator_cf(self.namespace.as_ref()?, prefix.clone()),
+                .prefix_iterator_cf(self.index.as_ref()?, prefix.clone()),
         );
-        self.current_prefix = prefix;
+        self.current_index_key = prefix;
         Some(())
     }
 }
@@ -104,15 +104,15 @@ impl<'a> Stream for Cursor<'a> {
                 return Poll::Ready(Some(event));
             }
             FutureValue::Pending => return Poll::Pending,
-            FutureValue::FoundNotMatch | FutureValue::Ended => {}
+            _ => {}
         }
 
         loop {
-            let secondary_index = if let Some(iterator) = this.secondary_index_iterator.as_mut() {
+            let secondary_index = if let Some(iterator) = this.index_iterator.as_mut() {
                 iterator
-            } else if this.namespace.is_some() {
+            } else if this.index.is_some() {
                 let _ = this.select_next_prefix_using_secondary_index();
-                if let Some(iterator) = this.secondary_index_iterator.as_mut() {
+                if let Some(iterator) = this.index_iterator.as_mut() {
                     iterator
                 } else {
                     return Poll::Ready(None);
@@ -121,7 +121,7 @@ impl<'a> Stream for Cursor<'a> {
                 // No secondary index is used to query, this means the query is
                 // using the ID filter, so it is more efficient to use the
                 // primary index to prefetch events that may satisfy the query
-                return if let Some(prefix) = this.prefixes.pop_front() {
+                return if let Some(prefix) = this.index_keys.pop_front() {
                     this.future_event = Some(db.get_event(prefix));
                     match check_future_call(&mut this.future_event, &this.filter, cx) {
                         FutureValue::Found(event) => {
@@ -129,7 +129,7 @@ impl<'a> Stream for Cursor<'a> {
                             Poll::Ready(Some(event))
                         }
                         FutureValue::Pending => Poll::Pending,
-                        FutureValue::FoundNotMatch | FutureValue::Ended => continue,
+                        _ => continue,
                     }
                 } else {
                     Poll::Ready(None)
@@ -138,7 +138,7 @@ impl<'a> Stream for Cursor<'a> {
 
             return match secondary_index.next() {
                 Some(Ok((key, value))) => {
-                    if !key.starts_with(&this.current_prefix) {
+                    if !key.starts_with(&this.current_index_key) {
                         let _ = this.select_next_prefix_using_secondary_index();
                         continue;
                     }
@@ -150,7 +150,7 @@ impl<'a> Stream for Cursor<'a> {
                             Poll::Ready(Some(event))
                         }
                         FutureValue::Pending => Poll::Pending,
-                        FutureValue::FoundNotMatch | FutureValue::Ended => continue,
+                        _ => continue,
                     }
                 }
                 Some(Err(err)) => {

+ 52 - 53
crates/storage/rocksdb/src/lib.rs

@@ -1,6 +1,6 @@
 //! Rocks DB implementation of the storage layer
-use crate::{cursor::Cursor, secondary_index::SecondaryIndex};
-use nostr_rs_storage_base::{Error, Storage};
+use crate::cursor::Cursor;
+use nostr_rs_storage_base::{Error, SecondaryIndex, Storage};
 use nostr_rs_types::types::{Event, Filter, Tag};
 use rocksdb::{
     BoundColumnFamily, ColumnFamilyDescriptor, IteratorMode, Options, SliceTransform, WriteBatch,
@@ -9,7 +9,6 @@ use rocksdb::{
 use std::{collections::VecDeque, ops::Deref, path::Path, sync::Arc};
 
 mod cursor;
-mod secondary_index;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
 enum ReferenceType {
@@ -223,59 +222,59 @@ impl Storage for RocksDb {
             Some(query.limit.try_into()?)
         };
 
-        let (namespace, secondary_index_iterator, prefixes) =
-            if !query.references_to_event.is_empty() {
-                let ns: Arc<BoundColumnFamily<'_>> =
-                    self.reference_to_cf_handle(ReferenceType::RefEvent)?;
-                let keys = query
-                    .references_to_event
-                    .iter()
-                    .map(|c| c.as_ref().to_vec())
-                    .collect();
-                query.references_to_event.clear();
-                (Some(ns), None, keys)
-            } else if !query.references_to_public_key.is_empty() {
-                let ns = self.reference_to_cf_handle(ReferenceType::RefEvent)?;
-                let keys = query
-                    .references_to_public_key
-                    .iter()
-                    .map(|c| c.as_ref().to_vec())
-                    .collect();
-                query.references_to_public_key.clear();
-                (Some(ns), None, keys)
-            } else if !query.ids.is_empty() {
-                let keys = query.ids.iter().map(|c| c.as_ref().to_vec()).collect();
-                query.ids.clear();
-                (None, None, keys)
-            } else if !query.authors.is_empty() {
-                let ns = self.reference_to_cf_handle(ReferenceType::Author)?;
-                let keys = query.authors.iter().map(|c| c.as_ref().to_vec()).collect();
-                query.authors.clear();
-                (Some(ns), None, keys)
-            } else if !query.kinds.is_empty() {
-                let ns = self.reference_to_cf_handle(ReferenceType::Kind)?;
-                let keys = query
-                    .kinds
-                    .iter()
-                    .map(|kind| {
-                        let kind: u32 = (*kind).into();
-                        kind.to_be_bytes().to_vec()
-                    })
-                    .collect();
-                query.kinds.clear();
-                (Some(ns), None, keys)
-            } else {
-                let cf_handle = self.reference_to_cf_handle(ReferenceType::Stream)?;
-                (
-                    None,
-                    Some(self.db.iterator_cf(&cf_handle, IteratorMode::Start)),
-                    VecDeque::new(),
-                )
-            };
+        let (index, secondary_index_iterator, prefixes) = if !query.references_to_event.is_empty() {
+            let ns: Arc<BoundColumnFamily<'_>> =
+                self.reference_to_cf_handle(ReferenceType::RefEvent)?;
+
+            let keys = query
+                .references_to_event
+                .iter()
+                .map(|c| c.as_ref().to_vec())
+                .collect();
+            query.references_to_event.clear();
+            (Some(ns), None, keys)
+        } else if !query.references_to_public_key.is_empty() {
+            let ns = self.reference_to_cf_handle(ReferenceType::RefEvent)?;
+            let keys = query
+                .references_to_public_key
+                .iter()
+                .map(|c| c.as_ref().to_vec())
+                .collect();
+            query.references_to_public_key.clear();
+            (Some(ns), None, keys)
+        } else if !query.ids.is_empty() {
+            let keys = query.ids.iter().map(|c| c.as_ref().to_vec()).collect();
+            query.ids.clear();
+            (None, None, keys)
+        } else if !query.authors.is_empty() {
+            let ns = self.reference_to_cf_handle(ReferenceType::Author)?;
+            let keys = query.authors.iter().map(|c| c.as_ref().to_vec()).collect();
+            query.authors.clear();
+            (Some(ns), None, keys)
+        } else if !query.kinds.is_empty() {
+            let ns = self.reference_to_cf_handle(ReferenceType::Kind)?;
+            let keys = query
+                .kinds
+                .iter()
+                .map(|kind| {
+                    let kind: u32 = (*kind).into();
+                    kind.to_be_bytes().to_vec()
+                })
+                .collect();
+            query.kinds.clear();
+            (Some(ns), None, keys)
+        } else {
+            let cf_handle = self.reference_to_cf_handle(ReferenceType::Stream)?;
+            (
+                None,
+                Some(self.db.iterator_cf(&cf_handle, IteratorMode::Start)),
+                VecDeque::new(),
+            )
+        };
 
         Ok(Cursor::new(
             self,
-            namespace,
+            index,
             prefixes.into(),
             Some(query.into()),
             secondary_index_iterator,