Przeglądaj źródła

Make Cursor trait send

Cesar Rodas 3 miesięcy temu
rodzic
commit
d7f2a5347b

+ 1 - 1
crates/storage/base/src/storage.rs

@@ -6,7 +6,7 @@ use nostr_rs_types::types::{Event, Filter};
 #[async_trait::async_trait]
 pub trait Storage: Send + Sync {
     /// Result iterators
-    type Cursor<'a>: Stream<Item = Result<Event, Error>> + Unpin
+    type Cursor<'a>: Stream<Item = Result<Event, Error>> + Unpin + Send
     where
         Self: 'a;
 

+ 9 - 6
crates/storage/rocksdb/src/cursor.rs

@@ -1,16 +1,15 @@
 //! Rocks DB implementation of the storage layer
-use crate::RocksDb;
+use crate::{ReferenceType, RocksDb};
 use futures::Stream;
 use nostr_rs_storage_base::{
     cursor::{check_future_call, FutureResult, FutureValue},
     Error, EventFilter, Storage,
 };
 use nostr_rs_types::types::Event;
-use rocksdb::{BoundColumnFamily, DBIteratorWithThreadMode, DB};
+use rocksdb::{DBIteratorWithThreadMode, DB};
 use std::{
     collections::VecDeque,
     pin::Pin,
-    sync::Arc,
     task::{Context, Poll},
 };
 
@@ -29,7 +28,7 @@ pub struct Cursor<'a> {
     /// Reference to the namespace to use to query the secondary index. If none
     /// is given the secondary_index_iterator must be constructed outside this
     /// wrapper.
-    index: Option<Arc<BoundColumnFamily<'a>>>,
+    index: Option<ReferenceType>,
     /// The current secondary index iterator. If none is given the iterator will
     /// try to create one using the namespace property and the first prefix from
     /// prefixes (it will also be copied to current_prefix)
@@ -49,7 +48,7 @@ pub struct Cursor<'a> {
 impl<'a> Cursor<'a> {
     pub fn new(
         db: &'a RocksDb,
-        index: Option<Arc<BoundColumnFamily<'a>>>,
+        index: Option<ReferenceType>,
         prefixes: Vec<Vec<u8>>,
         filter: Option<EventFilter>,
         secondary_index_iterator: Option<DBIteratorWithThreadMode<'a, DB>>,
@@ -73,10 +72,14 @@ impl<'a> Cursor<'a> {
     fn select_next_prefix_using_secondary_index(&mut self) -> Option<()> {
         self.index_iterator = None;
         let prefix = self.index_keys.pop_front()?;
+        let index = self
+            .index
+            .map(|index| self.db.reference_to_cf_handle(index).ok())?;
+
         self.index_iterator = Some(
             self.db
                 .db
-                .prefix_iterator_cf(self.index.as_ref()?, prefix.clone()),
+                .prefix_iterator_cf(index.as_ref()?, prefix.clone()),
         );
         self.current_index_key = prefix;
         Some(())

+ 6 - 11
crates/storage/rocksdb/src/lib.rs

@@ -11,7 +11,8 @@ use std::{collections::VecDeque, ops::Deref, path::Path, sync::Arc};
 mod cursor;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
-enum ReferenceType {
+/// Internal index name
+pub enum ReferenceType {
     Events,
     Author,
     RefPublicKey,
@@ -223,21 +224,17 @@ impl Storage for RocksDb {
         };
 
         let (index, secondary_index_iterator, prefixes) = if !query.references_to_event.is_empty() {
-            let ns: Arc<BoundColumnFamily<'_>> =
-                self.reference_to_cf_handle(ReferenceType::RefEvent)?;
-
             let keys = std::mem::take(&mut query.references_to_event)
                 .into_iter()
                 .map(|c| c.take())
                 .collect();
-            (Some(ns), None, keys)
+            (Some(ReferenceType::RefEvent), None, keys)
         } else if !query.references_to_public_key.is_empty() {
-            let ns = self.reference_to_cf_handle(ReferenceType::RefPublicKey)?;
             let keys = std::mem::take(&mut query.references_to_public_key)
                 .into_iter()
                 .map(|c| c.take())
                 .collect();
-            (Some(ns), None, keys)
+            (Some(ReferenceType::RefPublicKey), None, keys)
         } else if !query.ids.is_empty() {
             let keys = std::mem::take(&mut query.ids)
                 .into_iter()
@@ -245,14 +242,12 @@ impl Storage for RocksDb {
                 .collect();
             (None, None, keys)
         } else if !query.authors.is_empty() {
-            let ns = self.reference_to_cf_handle(ReferenceType::Author)?;
             let keys = std::mem::take(&mut query.authors)
                 .into_iter()
                 .map(|c| c.take())
                 .collect();
-            (Some(ns), None, keys)
+            (Some(ReferenceType::Author), None, keys)
         } else if !query.kinds.is_empty() {
-            let ns = self.reference_to_cf_handle(ReferenceType::Kind)?;
             let keys = std::mem::take(&mut query.kinds)
                 .into_iter()
                 .map(|kind| {
@@ -260,7 +255,7 @@ impl Storage for RocksDb {
                     kind.to_be_bytes().to_vec()
                 })
                 .collect();
-            (Some(ns), None, keys)
+            (Some(ReferenceType::Kind), None, keys)
         } else {
             let cf_handle = self.reference_to_cf_handle(ReferenceType::Stream)?;
             (