Cesar Rodas hace 3 meses
padre
commit
a7b91d6af5
Se han modificado 2 ficheros con 48 adiciones y 41 borrados
  1. 36 21
      crates/storage/rocksdb/src/cursor.rs
  2. 12 20
      crates/storage/rocksdb/src/lib.rs

+ 36 - 21
crates/storage/rocksdb/src/cursor.rs

@@ -1,8 +1,8 @@
 //! Rocks DB implementation of the storage layer
 use crate::RocksDb;
-use futures::{Future, Stream};
+use futures::Stream;
 use nostr_rs_storage_base::{
-    cursor::{check_future_call, FutureValue},
+    cursor::{check_future_call, FutureResult, FutureValue},
     Error, EventFilter, Storage,
 };
 use nostr_rs_types::types::Event;
@@ -14,44 +14,59 @@ use std::{
     task::{Context, Poll},
 };
 
-type CurrentEventByPrefixFuture<'a> = Pin<
-    Box<
-        dyn Future<
-                Output = Result<Option<nostr_rs_types::types::Event>, nostr_rs_storage_base::Error>,
-            > + Send
-            + 'a,
-    >,
->;
-
 pub struct Cursor<'a> {
     /// Reference to the rocks db database. This is useful to load the event
     /// data, because in the secondary indexes we only store the event ID. It
     /// could be possible to avoid a reference and speed up things even more to
     /// duplicate the event, but that would mean it would use way more disk
     /// space.
-    pub db: &'a RocksDb,
+    db: &'a RocksDb,
     /// List of filters to apply to the events, before returning. If no filter
     /// is given each events from the secondary index will be returned,
     /// otherwise the events will be filtered by the given filter, and only
     /// those events that comply will be returned
-    pub filter: Option<EventFilter>,
+    filter: Option<EventFilter>,
     /// Reference to the namespace to use to query the secondary index. If none
     /// is given the secondary_index_iterator must be constructed outside this
     /// wrapper.
-    pub namespace: Option<Arc<BoundColumnFamily<'a>>>,
+    namespace: Option<Arc<BoundColumnFamily<'a>>>,
     /// The current secondary index iterator. If none is given the iterator will
     /// try to create one using the namespace property and the first prefix from
     /// prefixes (it will also be copied to current_prefix)
-    pub secondary_index_iterator: Option<DBIteratorWithThreadMode<'a, DB>>,
-    pub current_prefix: Vec<u8>,
-    pub prefixes: VecDeque<Vec<u8>>,
-    pub limit: Option<usize>,
-    pub returned: usize,
-
-    pub future_event: Option<CurrentEventByPrefixFuture<'a>>,
+    secondary_index_iterator: Option<DBIteratorWithThreadMode<'a, DB>>,
+    /// The current prefix to load
+    current_prefix: Vec<u8>,
+    /// Prefixes
+    prefixes: VecDeque<Vec<u8>>,
+    /// Limit of events to return, None means nothing
+    limit: Option<usize>,
+    /// Amount of events returned
+    returned: usize,
+    /// Future event to return
+    future_event: Option<FutureResult<'a>>,
 }
 
 impl<'a> Cursor<'a> {
+    pub fn new(
+        db: &'a RocksDb,
+        namespace: Option<Arc<BoundColumnFamily<'a>>>,
+        prefixes: Vec<Vec<u8>>,
+        filter: Option<EventFilter>,
+        secondary_index_iterator: Option<DBIteratorWithThreadMode<'a, DB>>,
+        limit: Option<usize>,
+    ) -> Self {
+        Self {
+            db,
+            namespace,
+            secondary_index_iterator,
+            current_prefix: Vec::new(),
+            prefixes: prefixes.into(),
+            filter,
+            limit,
+            returned: 0,
+            future_event: None,
+        }
+    }
     /// Selects the next prefix available and starts an iterator using the
     /// secondary index. If no prefix is available from prefixes the functions
     /// return None, signalling upstream the are no more results

+ 12 - 20
crates/storage/rocksdb/src/lib.rs

@@ -94,17 +94,14 @@ impl Storage for RocksDb {
 
     async fn get_local_events(&self, limit: Option<usize>) -> Result<Cursor<'_>, Error> {
         let cf_handle = self.reference_to_cf_handle(ReferenceType::LocalEvents)?;
-        Ok(Cursor {
-            db: self,
-            filter: None,
-            namespace: None,
-            secondary_index_iterator: Some(self.db.iterator_cf(&cf_handle, IteratorMode::Start)),
-            current_prefix: vec![],
-            prefixes: VecDeque::new(),
+        Ok(Cursor::new(
+            self,
+            None,
+            vec![],
+            None,
+            Some(self.db.iterator_cf(&cf_handle, IteratorMode::Start)),
             limit,
-            returned: 0,
-            future_event: None,
-        })
+        ))
     }
 
     async fn set_local_event(&self, event: &Event) -> Result<(), Error> {
@@ -276,19 +273,14 @@ impl Storage for RocksDb {
                 )
             };
 
-        Ok(Cursor {
-            db: self,
-            filter: Some(query.into()),
+        Ok(Cursor::new(
+            self,
             namespace,
+            prefixes.into(),
+            Some(query.into()),
             secondary_index_iterator,
-            current_prefix: vec![],
-            prefixes,
-            returned: 0,
             limit,
-            future_event: None,
-        })
-
-        //load_events_and_filter(self, query, event_ids, for_each)
+        ))
     }
 }