Sfoglia il codice sorgente

Use a Stream instead of iterator

Cesar Rodas 3 mesi fa
parent
commit
17baa423d4

+ 19 - 18
Cargo.lock

@@ -373,9 +373,9 @@ dependencies = [
 
 [[package]]
 name = "futures"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
+checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
 dependencies = [
  "futures-channel",
  "futures-core",
@@ -388,9 +388,9 @@ dependencies = [
 
 [[package]]
 name = "futures-channel"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
+checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
 dependencies = [
  "futures-core",
  "futures-sink",
@@ -398,15 +398,15 @@ dependencies = [
 
 [[package]]
 name = "futures-core"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
+checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
 
 [[package]]
 name = "futures-executor"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
+checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
 dependencies = [
  "futures-core",
  "futures-task",
@@ -415,15 +415,15 @@ dependencies = [
 
 [[package]]
 name = "futures-io"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
+checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
 
 [[package]]
 name = "futures-macro"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
+checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -432,21 +432,21 @@ dependencies = [
 
 [[package]]
 name = "futures-sink"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
+checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
 
 [[package]]
 name = "futures-task"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
+checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
 
 [[package]]
 name = "futures-util"
-version = "0.3.28"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
+checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
 dependencies = [
  "futures-channel",
  "futures-core",
@@ -979,6 +979,7 @@ name = "nostr-rs-storage-base"
 version = "0.1.0"
 dependencies = [
  "async-trait",
+ "futures",
  "nostr-rs-types",
  "parking_lot",
  "rand",

+ 1 - 0
crates/storage/base/Cargo.toml

@@ -11,6 +11,7 @@ tokio = { version = "1.32.0", features = ["sync"] }
 parking_lot = "0.12.1"
 serde_json = "1.0"
 async-trait = "0.1.81"
+futures = "0.3.30"
 
 [features]
 default = []

+ 23 - 7
crates/storage/base/src/notification.rs

@@ -1,7 +1,13 @@
 use crate::{Error, Storage};
+use futures::Stream;
 use nostr_rs_types::types::{Addr, Event, Filter, Kind};
 use parking_lot::RwLock;
-use std::{collections::HashMap, sync::atomic::AtomicUsize};
+use std::{
+    collections::HashMap,
+    pin::Pin,
+    sync::atomic::AtomicUsize,
+    task::{Context, Poll},
+};
 use tokio::sync::mpsc::Sender;
 
 #[allow(dead_code)]
@@ -31,18 +37,28 @@ where
     last_id: AtomicUsize,
 }
 
-pub struct SubscriptionResultFromDb<I: Iterator> {
+pub struct SubscriptionResultFromDb<I>
+where
+    I: Stream<Item = Result<Event, Error>>,
+{
     iterator: I,
 }
 
-impl<I> Iterator for SubscriptionResultFromDb<I>
+impl<I> Stream for SubscriptionResultFromDb<I>
 where
-    I: Iterator<Item = Result<Event, Error>>,
+    I: Stream<Item = Result<Event, Error>>,
 {
     type Item = Result<Event, Error>;
 
-    fn next(&mut self) -> Option<Self::Item> {
-        self.iterator.next()
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        // Safety: it's safe to use Pin::map_unchecked_mut because the iterator field
+        // is pinned as part of the SubscriptionResultFromDb struct
+        let iterator = unsafe { self.map_unchecked_mut(|s| &mut s.iterator) };
+        iterator.poll_next(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.iterator.size_hint()
     }
 }
 
@@ -81,7 +97,7 @@ where
         &self,
         filter: Filter,
         sender: Sender<(usize, Event)>,
-    ) -> Result<(usize, SubscriptionResultFromDb<T::Iterator<'_>>), Error> {
+    ) -> Result<(usize, SubscriptionResultFromDb<T::Stream<'_>>), Error> {
         let mut subscribers = self.subscriptions.write();
         let mut _subscription_listener = self.subscription_listener.write();
         let id = self

+ 4 - 3
crates/storage/base/src/storage.rs

@@ -1,11 +1,12 @@
 use crate::Error;
+use futures::Stream;
 use nostr_rs_types::types::{Event, Filter};
 
 /// Trait to store/query events
 #[async_trait::async_trait]
 pub trait Storage: Send + Sync {
     /// Result iterators
-    type Iterator<'a>: Iterator<Item = Result<Event, Error>>
+    type Stream<'a>: Stream<Item = Result<Event, Error>>
     where
         Self: 'a;
 
@@ -25,10 +26,10 @@ pub trait Storage: Send + Sync {
     /// The first step is to use one available index to get a list of event-IDs,
     /// then call `load_and_filter_events` that will load the events from the
     /// `Events` namespace and will filter them by the given parameters.
-    async fn get_by_filter(&self, query: Filter) -> Result<Self::Iterator<'_>, Error>;
+    async fn get_by_filter(&self, query: Filter) -> Result<Self::Stream<'_>, Error>;
 
     /// Return a vector of all local events
-    async fn get_local_events(&self, limit: Option<usize>) -> Result<Self::Iterator<'_>, Error>;
+    async fn get_local_events(&self, limit: Option<usize>) -> Result<Self::Stream<'_>, Error>;
 
     /// Stores an event, similar to store(Event), but keeps track of this event in a
     /// local index. This is useful to keep track of the events that are created by

+ 1 - 1
crates/storage/rocksdb/src/lib.rs

@@ -91,7 +91,7 @@ impl RocksDb {
 
 #[async_trait::async_trait]
 impl Storage for RocksDb {
-    type Iterator<'a> = WrapperIterator<'a>;
+    type Stream<'a> = WrapperIterator<'a>;
     async fn get_local_events(&self, limit: Option<usize>) -> Result<WrapperIterator<'_>, Error> {
         let cf_handle = self.reference_to_cf_handle(ReferenceType::LocalEvents)?;
         Ok(WrapperIterator {