Bladeren bron

Merge branch 'memory-storage' of cesar/nostr-prototype into main

Cesar Rodas 3 maanden geleden
bovenliggende
commit
471477f74c

+ 60 - 88
Cargo.lock

@@ -181,17 +181,16 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
 [[package]]
 name = "chrono"
-version = "0.4.26"
+version = "0.4.38"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5"
+checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401"
 dependencies = [
  "android-tzdata",
  "iana-time-zone",
  "js-sys",
  "num-traits",
- "time",
  "wasm-bindgen",
- "winapi",
+ "windows-targets 0.52.6",
 ]
 
 [[package]]
@@ -478,7 +477,7 @@ checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31"
 dependencies = [
  "cfg-if",
  "libc",
- "wasi 0.11.0+wasi-snapshot-preview1",
+ "wasi",
 ]
 
 [[package]]
@@ -526,18 +525,9 @@ checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
 
 [[package]]
 name = "hermit-abi"
-version = "0.2.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7"
-dependencies = [
- "libc",
-]
-
-[[package]]
-name = "hermit-abi"
-version = "0.3.1"
+version = "0.3.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
+checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
 
 [[package]]
 name = "hex"
@@ -708,7 +698,7 @@ version = "0.4.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "21b6b32576413a8e69b90e952e4a026476040d81017b80445deda5f2d3921857"
 dependencies = [
- "hermit-abi 0.3.1",
+ "hermit-abi",
  "io-lifetimes",
  "rustix",
  "windows-sys 0.45.0",
@@ -752,9 +742,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
 
 [[package]]
 name = "libc"
-version = "0.2.147"
+version = "0.2.155"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
+checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
 
 [[package]]
 name = "libloading"
@@ -860,14 +850,14 @@ dependencies = [
 
 [[package]]
 name = "mio"
-version = "0.8.6"
+version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9"
+checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4"
 dependencies = [
+ "hermit-abi",
  "libc",
- "log",
- "wasi 0.11.0+wasi-snapshot-preview1",
- "windows-sys 0.45.0",
+ "wasi",
+ "windows-sys 0.52.0",
 ]
 
 [[package]]
@@ -941,8 +931,11 @@ dependencies = [
 name = "nostr-rs-memory"
 version = "0.1.0"
 dependencies = [
+ "async-trait",
+ "futures",
  "nostr-rs-storage-base",
  "nostr-rs-types",
+ "tokio",
 ]
 
 [[package]]
@@ -980,9 +973,9 @@ name = "nostr-rs-storage-base"
 version = "0.1.0"
 dependencies = [
  "async-trait",
+ "chrono",
  "futures",
  "nostr-rs-types",
- "parking_lot",
  "rand",
  "serde_json",
  "thiserror",
@@ -1017,16 +1010,6 @@ dependencies = [
 ]
 
 [[package]]
-name = "num_cpus"
-version = "1.15.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b"
-dependencies = [
- "hermit-abi 0.2.6",
- "libc",
-]
-
-[[package]]
 name = "object"
 version = "0.32.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1447,12 +1430,12 @@ dependencies = [
 
 [[package]]
 name = "socket2"
-version = "0.5.3"
+version = "0.5.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877"
+checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c"
 dependencies = [
  "libc",
- "windows-sys 0.48.0",
+ "windows-sys 0.52.0",
 ]
 
 [[package]]
@@ -1513,17 +1496,6 @@ dependencies = [
 ]
 
 [[package]]
-name = "time"
-version = "0.1.45"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a"
-dependencies = [
- "libc",
- "wasi 0.10.0+wasi-snapshot-preview1",
- "winapi",
-]
-
-[[package]]
 name = "tinyvec"
 version = "1.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1540,28 +1512,27 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.32.0"
+version = "1.39.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9"
+checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1"
 dependencies = [
  "backtrace",
  "bytes",
  "libc",
  "mio",
- "num_cpus",
  "parking_lot",
  "pin-project-lite",
  "signal-hook-registry",
- "socket2 0.5.3",
+ "socket2 0.5.7",
  "tokio-macros",
- "windows-sys 0.48.0",
+ "windows-sys 0.52.0",
 ]
 
 [[package]]
 name = "tokio-macros"
-version = "2.1.0"
+version = "2.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
+checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -1777,12 +1748,6 @@ dependencies = [
 
 [[package]]
 name = "wasi"
-version = "0.10.0+wasi-snapshot-preview1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
-
-[[package]]
-name = "wasi"
 version = "0.11.0+wasi-snapshot-preview1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
@@ -1918,11 +1883,11 @@ dependencies = [
 
 [[package]]
 name = "windows-sys"
-version = "0.48.0"
+version = "0.52.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
+checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
 dependencies = [
- "windows-targets 0.48.5",
+ "windows-targets 0.52.6",
 ]
 
 [[package]]
@@ -1942,17 +1907,18 @@ dependencies = [
 
 [[package]]
 name = "windows-targets"
-version = "0.48.5"
+version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c"
+checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
 dependencies = [
- "windows_aarch64_gnullvm 0.48.5",
- "windows_aarch64_msvc 0.48.5",
- "windows_i686_gnu 0.48.5",
- "windows_i686_msvc 0.48.5",
- "windows_x86_64_gnu 0.48.5",
- "windows_x86_64_gnullvm 0.48.5",
- "windows_x86_64_msvc 0.48.5",
+ "windows_aarch64_gnullvm 0.52.6",
+ "windows_aarch64_msvc 0.52.6",
+ "windows_i686_gnu 0.52.6",
+ "windows_i686_gnullvm",
+ "windows_i686_msvc 0.52.6",
+ "windows_x86_64_gnu 0.52.6",
+ "windows_x86_64_gnullvm 0.52.6",
+ "windows_x86_64_msvc 0.52.6",
 ]
 
 [[package]]
@@ -1963,9 +1929,9 @@ checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608"
 
 [[package]]
 name = "windows_aarch64_gnullvm"
-version = "0.48.5"
+version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
+checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
 
 [[package]]
 name = "windows_aarch64_msvc"
@@ -1975,9 +1941,9 @@ checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7"
 
 [[package]]
 name = "windows_aarch64_msvc"
-version = "0.48.5"
+version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
+checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
 
 [[package]]
 name = "windows_i686_gnu"
@@ -1987,9 +1953,15 @@ checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640"
 
 [[package]]
 name = "windows_i686_gnu"
-version = "0.48.5"
+version = "0.52.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
+
+[[package]]
+name = "windows_i686_gnullvm"
+version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
+checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
 
 [[package]]
 name = "windows_i686_msvc"
@@ -1999,9 +1971,9 @@ checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605"
 
 [[package]]
 name = "windows_i686_msvc"
-version = "0.48.5"
+version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
+checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
 
 [[package]]
 name = "windows_x86_64_gnu"
@@ -2011,9 +1983,9 @@ checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45"
 
 [[package]]
 name = "windows_x86_64_gnu"
-version = "0.48.5"
+version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
+checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
 
 [[package]]
 name = "windows_x86_64_gnullvm"
@@ -2023,9 +1995,9 @@ checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463"
 
 [[package]]
 name = "windows_x86_64_gnullvm"
-version = "0.48.5"
+version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
+checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
 
 [[package]]
 name = "windows_x86_64_msvc"
@@ -2035,9 +2007,9 @@ checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd"
 
 [[package]]
 name = "windows_x86_64_msvc"
-version = "0.48.5"
+version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
+checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
 
 [[package]]
 name = "winnow"

+ 3 - 3
crates/storage/base/Cargo.toml

@@ -7,12 +7,12 @@ edition = "2021"
 nostr-rs-types = { path = "../../types" }
 thiserror = "1.0.40"
 rand = "0.8.5"
-tokio = { version = "1.32.0", features = ["sync"] }
-parking_lot = "0.12.1"
+tokio = { version = "1.32.0", features = ["full"] }
 serde_json = "1.0"
 async-trait = "0.1.81"
 futures = "0.3.30"
+chrono = "0.4.38"
 
 [features]
-default = []
 test = []
+default = []

+ 49 - 0
crates/storage/base/src/cursor.rs

@@ -0,0 +1,49 @@
+use crate::{event_filter::EventFilter, Error};
+use futures::FutureExt;
+use nostr_rs_types::types::{Event, Filter};
+use std::{
+    future::Future,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+pub enum FutureValue {
+    Found(Result<Event, Error>),
+    Pending,
+    Ended,
+    NotFound,
+    FoundNotMatch,
+}
+
+pub type FutureResult<'a> = Pin<Box<dyn Future<Output = Result<Option<Event>, Error>> + Send + 'a>>;
+
+pub fn check_future_call(
+    future_event: &mut Option<FutureResult<'_>>,
+    filter: &Option<EventFilter>,
+    cx: &mut Context<'_>,
+) -> FutureValue {
+    if let Some(mut inner_future) = future_event.take() {
+        match inner_future.poll_unpin(cx) {
+            Poll::Ready(Ok(None)) => FutureValue::NotFound,
+            Poll::Ready(Ok(Some(event))) => {
+                // event is ready, apply the neccesary filters
+                if let Some(filter) = filter {
+                    if filter.check_event(&event) {
+                        FutureValue::Found(Ok(event))
+                    } else {
+                        FutureValue::FoundNotMatch
+                    }
+                } else {
+                    FutureValue::Found(Ok(event))
+                }
+            }
+            Poll::Ready(Err(error)) => FutureValue::Found(Err(error)),
+            Poll::Pending => {
+                *future_event = Some(inner_future);
+                FutureValue::Pending
+            }
+        }
+    } else {
+        FutureValue::Ended
+    }
+}

+ 0 - 0
crates/storage/rocksdb/src/event_filter.rs → crates/storage/base/src/event_filter.rs


+ 36 - 19
crates/storage/base/src/lib.rs

@@ -4,13 +4,23 @@
 //! find events by their tags, kind and references.
 #![allow(missing_docs, warnings)]
 #![allow(dead_code)]
+pub mod cursor;
 mod error;
+mod event_filter;
 mod notification;
+mod secondary_index;
 mod storage;
+
 #[cfg(feature = "test")]
 pub mod test;
 
-pub use crate::{error::Error, notification::Subscription, storage::Storage};
+#[cfg(feature = "test")]
+pub use tokio;
+
+pub use crate::{
+    error::Error, event_filter::*, notification::Subscription, secondary_index::SecondaryIndex,
+    storage::Storage,
+};
 
 #[macro_export]
 /// This macro creates the
@@ -19,6 +29,7 @@ macro_rules! storage_test {
         #[cfg(test)]
         mod test {
             use super::*;
+            use futures::executor::block_on;
             use std::sync::atomic::{AtomicUsize, Ordering};
 
             static LAST_VERSION: AtomicUsize = AtomicUsize::new(0);
@@ -29,11 +40,11 @@ macro_rules! storage_test {
             }
 
             impl DisposableStorage {
-                pub fn new() -> Self {
+                pub async fn new() -> Self {
                     let id = LAST_VERSION.fetch_add(1, Ordering::Relaxed);
                     let path = format!("/tmp/nostr/test-{}", id);
                     Self {
-                        storage: $init(&path),
+                        storage: $init(&path).await,
                         path,
                     }
                 }
@@ -41,21 +52,27 @@ macro_rules! storage_test {
 
             impl std::ops::Drop for DisposableStorage {
                 fn drop(&mut self) {
-                    $destroy(&self.path)
+                    block_on(async move {
+                        $destroy(&self.path).await;
+                    });
                 }
             }
 
-            nostr_rs_storage_base::storage_test_name! { store_and_get }
-            nostr_rs_storage_base::storage_test_name! { records_are_sorted_by_date_desc }
-            nostr_rs_storage_base::storage_test_name! { filter_by_references }
-            nostr_rs_storage_base::storage_test_name! { filter_by_references_zero_match }
-            nostr_rs_storage_base::storage_test_name! { filter_by_references_and_kind }
-            nostr_rs_storage_base::storage_test_name! { filter_by_authors }
-            nostr_rs_storage_base::storage_test_name! { filter_by_author }
-            nostr_rs_storage_base::storage_test_name! { filter_by_author_and_kinds }
-            nostr_rs_storage_base::storage_test_name! { filter_by_kind }
-            nostr_rs_storage_base::storage_test_name! { get_event_and_related_events }
-            nostr_rs_storage_base::storage_test_name! { get_local_events }
+            use nostr_rs_storage_base::tokio;
+
+            nostr_rs_storage_base::storage_test_name!(store_and_get_by_partial_key);
+            nostr_rs_storage_base::storage_test_name!(store_and_get);
+            nostr_rs_storage_base::storage_test_name!(records_are_sorted_by_date_desc_by_default);
+            nostr_rs_storage_base::storage_test_name!(records_are_sorted_by_date_desc);
+            nostr_rs_storage_base::storage_test_name!(filter_by_references);
+            nostr_rs_storage_base::storage_test_name!(filter_by_references_zero_match);
+            nostr_rs_storage_base::storage_test_name!(filter_by_references_and_kind);
+            nostr_rs_storage_base::storage_test_name!(filter_by_authors);
+            nostr_rs_storage_base::storage_test_name!(filter_by_author);
+            nostr_rs_storage_base::storage_test_name!(filter_by_author_and_kinds);
+            nostr_rs_storage_base::storage_test_name!(filter_by_kind);
+            nostr_rs_storage_base::storage_test_name!(get_event_and_related_events);
+            nostr_rs_storage_base::storage_test_name!(get_local_events);
         }
     };
 }
@@ -64,10 +81,10 @@ macro_rules! storage_test {
 /// Creates a
 macro_rules! storage_test_name {
     ($name:tt) => {
-        #[test]
-        fn $name() {
-            let x = DisposableStorage::new();
-            nostr_rs_storage_base::test::$name(&x.storage);
+        #[tokio::test]
+        async fn $name() {
+            let x = DisposableStorage::new().await;
+            nostr_rs_storage_base::test::$name(&x.storage).await;
         }
     };
 }

+ 5 - 6
crates/storage/base/src/notification.rs

@@ -1,14 +1,13 @@
 use crate::{Error, Storage};
 use futures::Stream;
 use nostr_rs_types::types::{Addr, Event, Filter, Kind};
-use parking_lot::RwLock;
 use std::{
     collections::HashMap,
     pin::Pin,
     sync::atomic::AtomicUsize,
     task::{Context, Poll},
 };
-use tokio::sync::mpsc::Sender;
+use tokio::sync::{mpsc::Sender, RwLock};
 
 #[allow(dead_code)]
 struct SubscriptionEntry {
@@ -86,7 +85,7 @@ where
 
     /// Removes a subscription from the listener
     pub async fn unsubscribe(self, subscription_id: usize) -> Result<(), Error> {
-        let mut subscribers = self.subscriptions.write();
+        let mut subscribers = self.subscriptions.write().await;
         let _ = subscribers.remove(&subscription_id);
         Ok(())
     }
@@ -97,9 +96,9 @@ where
         &self,
         filter: Filter,
         sender: Sender<(usize, Event)>,
-    ) -> Result<(usize, SubscriptionResultFromDb<T::Stream<'_>>), Error> {
-        let mut subscribers = self.subscriptions.write();
-        let mut _subscription_listener = self.subscription_listener.write();
+    ) -> Result<(usize, SubscriptionResultFromDb<T::Cursor<'_>>), Error> {
+        let mut subscribers = self.subscriptions.write().await;
+        let mut _subscription_listener = self.subscription_listener.write().await;
         let id = self
             .last_id
             .fetch_add(1, std::sync::atomic::Ordering::SeqCst);

+ 0 - 0
crates/storage/rocksdb/src/secondary_index.rs → crates/storage/base/src/secondary_index.rs


+ 8 - 3
crates/storage/base/src/storage.rs

@@ -6,10 +6,15 @@ use nostr_rs_types::types::{Event, Filter};
 #[async_trait::async_trait]
 pub trait Storage: Send + Sync {
     /// Result iterators
-    type Stream<'a>: Stream<Item = Result<Event, Error>> + Unpin
+    type Cursor<'a>: Stream<Item = Result<Event, Error>> + Unpin
     where
         Self: 'a;
 
+    /// Returns true if the storage is flushing data to disk
+    fn is_flushing(&self) -> bool {
+        false
+    }
+
     /// Stores a new event into the database. This action will also creates all
     /// the needed indexes to find this event later by reference, author, kind or tag.
     async fn store(&self, event: &Event) -> Result<bool, Error>;
@@ -26,10 +31,10 @@ pub trait Storage: Send + Sync {
     /// The first step is to use one available index to get a list of event-IDs,
     /// then call `load_and_filter_events` that will load the events from the
     /// `Events` namespace and will filter them by the given parameters.
-    async fn get_by_filter(&self, query: Filter) -> Result<Self::Stream<'_>, Error>;
+    async fn get_by_filter(&self, query: Filter) -> Result<Self::Cursor<'_>, Error>;
 
     /// Return a vector of all local events
-    async fn get_local_events(&self, limit: Option<usize>) -> Result<Self::Stream<'_>, Error>;
+    async fn get_local_events(&self, limit: Option<usize>) -> Result<Self::Cursor<'_>, Error>;
 
     /// Stores an event, similar to store(Event), but keeps track of this event in a
     /// local index. This is useful to keep track of the events that are created by

+ 45 - 1
crates/storage/base/src/test.rs

@@ -22,6 +22,10 @@ where
     for event in events {
         assert!(db.store(&event).await.expect("valid"));
     }
+
+    while db.is_flushing() {
+        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    }
 }
 
 pub async fn store_and_get<T>(db: &T)
@@ -38,6 +42,46 @@ where
     assert_eq!(event1, Some(event));
 }
 
+pub async fn store_and_get_by_partial_key<T>(db: &T)
+where
+    T: Storage,
+{
+    let json = "{\"content\":\"{\\\"lud06\\\":\\\"lnbc1p3a4wxvpp5x0pa6gr55fq5s9d3dxs0vz77mqxgdw63hhtgtlfz5zvm65847vnqdqqcqpjsp5402c8rtqxd4j97rnvuejuwl4sg473g6wg08d67fvn7qc4gtpkfks9q7sqqqqqqqqqqqqqqqqqqqsqqqqqysgqmqz9gxqyjw5qrzjqwryaup9lh50kkranzgcdnn2fgvx390wgj5jd07rwr3vxeje0glclleasn65surjcsqqqqlgqqqqqeqqjqyxj968tem9ps6ttm9ukv6ag4yc6qmgj2svrccfgp4n83fpktr3dsx6fq7grfzlqt982aaemahg9q29vzl9f627kh4j8h8xc2z2mtpdqqjlekah\\\",\\\"website\\\":\\\"\\\",\\\"nip05\\\":\\\"cesar@cesar.com.py\\\",\\\"picture\\\":\\\"https://pbs.twimg.com/profile_images/1175432935337537536/_Peu9vuJ_400x400.jpg\\\",\\\"display_name\\\":\\\"C\\\",\\\"about\\\":\\\"Rust and PHP\\\",\\\"name\\\":\\\"c\\\"}\",\"created_at\":1678476588,\"id\":\"3800c787a23288641c0b96cbcc87c26cbd3ea7bee53b7748422fdb100fb7b9f0\",\"kind\":0,\"pubkey\":\"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78\",\"sig\":\"c8a12ce96833e4cd67bce0e9e50f831262ef0f0c0cff5e56c38a0c90867ed1a6621e9692948ef5e85a7ca3726c3f0f43fa7e1992536bc457317123bca8784f5f\",\"tags\":[]}";
+
+    let event: Event = serde_json::from_str(json).expect("valid");
+    assert_eq!(true, db.store(&event).await.expect("valid"));
+    assert_eq!(false, db.store(&event).await.expect("valid"));
+
+    let event1 = db.get_event(&event.id[0..2]).await.expect("something");
+    assert_eq!(event1, Some(event));
+}
+
+pub async fn records_are_sorted_by_date_desc_by_default<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+
+    let pk: Addr = "7bdef7be22dd8e59f4600e044aa53a1cf975a9dc7d27df5833bc77db784a5805"
+        .try_into()
+        .expect("pk");
+
+    let vec: Vec<Event> = db
+        .get_by_filter(Filter::default())
+        .await
+        .expect("set of results")
+        .try_collect()
+        .await
+        .expect("valid");
+
+    let dates = vec.iter().map(|e| e.created_at()).collect::<Vec<_>>();
+    let mut sorted_dates = dates.clone();
+    sorted_dates.sort_by(|a, b| b.cmp(a));
+
+    assert_eq!(vec.len(), 10002);
+    assert_eq!(dates, sorted_dates);
+}
+
 pub async fn records_are_sorted_by_date_desc<T>(db: &T)
 where
     T: Storage,
@@ -152,7 +196,7 @@ pub async fn get_event_and_related_events<T>(db: &T)
 where
     T: Storage,
 {
-    setup_db(db);
+    setup_db(db).await;
 
     let id: Addr = "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
         .try_into()

+ 14 - 0
crates/storage/memory/Cargo.toml

@@ -0,0 +1,14 @@
+[package]
+name = "nostr-rs-memory"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+async-trait = "0.1.81"
+futures = "0.3.30"
+nostr-rs-storage-base = { path = "../base" }
+nostr-rs-types = { path = "../../types" }
+tokio = { version = "1.39.2", features = ["full"] }
+
+[dev-dependencies]
+nostr-rs-storage-base = { path = "../base", features = ["test"] }

+ 111 - 0
crates/storage/memory/src/cursor.rs

@@ -0,0 +1,111 @@
+use crate::Memory;
+use futures::Stream;
+use nostr_rs_storage_base::{
+    cursor::{check_future_call, FutureResult, FutureValue},
+    Error, EventFilter, Storage,
+};
+use nostr_rs_types::types::Event;
+use std::{
+    collections::{BTreeMap, VecDeque},
+    pin::Pin,
+    task::{Context, Poll},
+};
+use tokio::sync::RwLockReadGuard;
+
+pub struct Cursor<'a> {
+    pub db: &'a Memory,
+    pub filter: Option<EventFilter>,
+    pub limit: Option<usize>,
+    pub returned: usize,
+    pub index: RwLockReadGuard<'a, BTreeMap<Vec<u8>, Vec<u8>>>,
+    pub index_prefixes: VecDeque<Vec<u8>>,
+    pub last_prefix: Option<Vec<u8>>,
+    pub current_index_key: Vec<u8>,
+    pub future_event: Option<FutureResult<'a>>,
+}
+
+impl<'a> Cursor<'a> {
+    fn get_id_from_index(&mut self) -> Option<Vec<u8>> {
+        if let Some(Some(Some((last_prefix, id)))) = self.last_prefix.take().map(|last_prefix| {
+            // We are iterating over an index and we are half way through, we
+            // know the last key, we start iterating from them and skip 1,
+            // because we already have that key If the key starts with the
+            // current index key, we return it otherwise we go to the else
+            // brancha and the next key in the index_prefixes is used
+            //
+            // This approach is needed since holding a reference to the index Range has been problematic
+            self.index
+                .range(last_prefix..)
+                .nth(1)
+                .map(|(idx_id, value)| {
+                    if idx_id.starts_with(&self.current_index_key) {
+                        Some((idx_id.clone(), value.clone()))
+                    } else {
+                        None
+                    }
+                })
+        }) {
+            self.last_prefix = Some(last_prefix);
+            Some(id)
+        } else {
+            loop {
+                self.current_index_key = self.index_prefixes.pop_front()?;
+                if let Some(Some((last_prefix, id))) = self
+                    .index
+                    .range(self.current_index_key.clone()..)
+                    .next()
+                    .map(|(idx_id, id)| {
+                        if idx_id.starts_with(&self.current_index_key) {
+                            Some((idx_id.clone(), id.clone()))
+                        } else {
+                            None
+                        }
+                    })
+                {
+                    self.last_prefix = Some(last_prefix);
+                    return Some(id);
+                }
+            }
+        }
+    }
+}
+
+impl<'a> Stream for Cursor<'a> {
+    type Item = Result<Event, Error>;
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (0, None)
+    }
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        if Some(self.returned) == self.limit {
+            return Poll::Ready(None);
+        }
+
+        let this = Pin::into_inner(self);
+
+        match check_future_call(&mut this.future_event, &this.filter, cx) {
+            FutureValue::Found(event) => {
+                this.returned += 1;
+                return Poll::Ready(Some(event));
+            }
+            FutureValue::Pending => return Poll::Pending,
+            _ => {}
+        }
+
+        while let Some(id) = this.get_id_from_index() {
+            this.future_event = Some(this.db.get_event(id));
+
+            match check_future_call(&mut this.future_event, &this.filter, cx) {
+                FutureValue::Found(event) => {
+                    this.returned += 1;
+                    return Poll::Ready(Some(event));
+                }
+                FutureValue::Pending => return Poll::Pending,
+                _ => {}
+            }
+        }
+
+        Poll::Ready(None)
+    }
+}

+ 253 - 0
crates/storage/memory/src/lib.rs

@@ -0,0 +1,253 @@
+use nostr_rs_storage_base::{Error, SecondaryIndex, Storage};
+use nostr_rs_types::types::{Event, Filter, Tag};
+use std::{
+    cmp::min,
+    collections::{BTreeMap, VecDeque},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+    u64,
+};
+use tokio::sync::RwLock;
+
+mod cursor;
+
+#[derive(Default)]
+pub struct Indexes {
+    // Vec<u8> is used instead of Id, since references can be partial keys
+    author: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
+    ref_event: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
+    ref_pub_key: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
+    kind: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
+    ids_by_time: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
+    /// silly index to store the keys of the indexes, but it helps to use the same code
+    ids: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
+    local_events: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
+}
+
+#[derive(Default)]
+pub struct Memory {
+    db: Arc<RwLock<BTreeMap<[u8; 32], Event>>>,
+    indexes: Arc<Indexes>,
+    indexers_running: Arc<AtomicUsize>,
+}
+
+#[async_trait::async_trait]
+impl Storage for Memory {
+    type Cursor<'a> = cursor::Cursor<'a>;
+
+    fn is_flushing(&self) -> bool {
+        self.indexers_running.load(Ordering::SeqCst) > 0
+    }
+
+    async fn store(&self, event: &Event) -> Result<bool, Error> {
+        let mut db = self.db.write().await;
+        if db.get(&event.id.0).is_some() {
+            return Ok(false);
+        }
+
+        db.insert(event.id.0, event.clone());
+
+        let event = event.clone();
+        let indexes = self.indexes.clone();
+        let indexer_running = self.indexers_running.clone();
+        let _ = indexer_running.fetch_add(1, Ordering::SeqCst);
+        tokio::spawn(async move {
+            // build indexes asynchronously to avoid locking contention and make
+            // `storing` slower, as the stream will hold a read lock while iterating
+            let secondary_index = SecondaryIndex::new(&event.id, event.created_at());
+            let event_id = event.id.clone().0.to_vec();
+
+            let time_desc = u64::MAX
+                .checked_sub(
+                    event
+                        .created_at()
+                        .timestamp_micros()
+                        .try_into()
+                        .unwrap_or_default(),
+                )
+                .unwrap_or_default()
+                .to_be_bytes();
+
+            indexes
+                .ids
+                .write()
+                .await
+                .insert(event_id.clone(), event_id.clone());
+
+            indexes
+                .ids_by_time
+                .write()
+                .await
+                .insert(secondary_index.index_by(&time_desc), event_id.clone());
+
+            indexes
+                .author
+                .write()
+                .await
+                .insert(secondary_index.index_by(event.author()), event_id.clone());
+
+            let kind: u32 = event.kind().into();
+
+            indexes.kind.write().await.insert(
+                secondary_index.index_by(kind.to_be_bytes()),
+                event_id.clone(),
+            );
+
+            for tag in event.tags().iter() {
+                match tag {
+                    Tag::PubKey(pub_key) => {
+                        let foreign_id = secondary_index.index_by(&pub_key.id);
+                        let local_id = secondary_index.index_by(&event_id);
+
+                        indexes
+                            .ref_pub_key
+                            .write()
+                            .await
+                            .insert(foreign_id, event_id.clone());
+
+                        indexes
+                            .ref_event
+                            .write()
+                            .await
+                            .insert(local_id, pub_key.id.to_vec());
+                    }
+                    Tag::Event(foreign_event) => {
+                        let foreign_id = secondary_index.index_by(&foreign_event.id);
+                        let local_id = secondary_index.index_by(&event_id);
+
+                        let mut ref_event = indexes.ref_event.write().await;
+                        ref_event.insert(foreign_id, event_id.clone());
+                        ref_event.insert(local_id, foreign_event.id.to_vec());
+                    }
+                    _ => {}
+                }
+            }
+
+            let _ = indexer_running.fetch_sub(1, Ordering::SeqCst);
+        });
+        Ok(true)
+    }
+
+    async fn set_local_event(&self, event: &Event) -> Result<(), Error> {
+        let mut local_events = self.indexes.local_events.write().await;
+        local_events.insert(
+            SecondaryIndex::new(&event.id, event.created_at()).index_by(&[]),
+            event.id.0.to_vec(),
+        );
+        Ok(())
+    }
+
+    async fn get_event<T: AsRef<[u8]> + Send + Sync>(&self, id: T) -> Result<Option<Event>, Error> {
+        let events = self.db.read().await;
+        let mut id_filter = [0u8; 32];
+        let id = id.as_ref();
+        let slice = 0..min(id.len(), 32);
+        id_filter[slice.clone()].copy_from_slice(&id[slice]);
+
+        if let Some((db_id, value)) = events.range(id_filter..).next() {
+            if db_id.starts_with(id) {
+                return Ok(Some(value.clone()));
+            }
+        }
+
+        return Ok(None);
+    }
+
+    async fn get_by_filter(&self, mut query: Filter) -> Result<Self::Cursor<'_>, Error> {
+        let limit = if query.limit == 0 {
+            None
+        } else {
+            Some(query.limit.try_into()?)
+        };
+
+        let (index, index_prefixes) = if !query.references_to_event.is_empty() {
+            (
+                self.indexes.ref_event.read().await,
+                std::mem::take(&mut query.references_to_event)
+                    .into_iter()
+                    .map(|c| c.take())
+                    .collect::<VecDeque<_>>(),
+            )
+        } else if !query.references_to_public_key.is_empty() {
+            (
+                self.indexes.ref_pub_key.read().await,
+                std::mem::take(&mut query.references_to_public_key)
+                    .into_iter()
+                    .map(|c| c.take())
+                    .collect::<VecDeque<_>>(),
+            )
+        } else if !query.ids.is_empty() {
+            (
+                self.indexes.ids.read().await,
+                std::mem::take(&mut query.ids)
+                    .into_iter()
+                    .map(|c| c.take())
+                    .collect::<VecDeque<_>>(),
+            )
+        } else if !query.authors.is_empty() {
+            (
+                self.indexes.author.read().await,
+                std::mem::take(&mut query.authors)
+                    .into_iter()
+                    .map(|c| c.take())
+                    .collect::<VecDeque<_>>(),
+            )
+        } else if !query.kinds.is_empty() {
+            (
+                self.indexes.kind.read().await,
+                std::mem::take(&mut query.kinds)
+                    .into_iter()
+                    .map(|kind| {
+                        let kind: u32 = kind.into();
+                        kind.to_be_bytes().to_vec()
+                    })
+                    .collect::<VecDeque<_>>(),
+            )
+        } else {
+            (
+                self.indexes.ids_by_time.read().await,
+                vec![Vec::new()].into(), // all keys
+            )
+        };
+
+        Ok(Self::Cursor {
+            db: self,
+            index,
+            index_prefixes,
+            current_index_key: Vec::new(),
+            filter: Some(query.into()),
+            limit,
+            returned: 0,
+            last_prefix: None,
+            future_event: None,
+        })
+    }
+
+    async fn get_local_events(&self, limit: Option<usize>) -> Result<Self::Cursor<'_>, Error> {
+        Ok(Self::Cursor {
+            db: self,
+            index: self.indexes.local_events.read().await,
+            index_prefixes: vec![Vec::new()].into(), // all keys
+            current_index_key: Vec::new(),
+            filter: None,
+            limit,
+            returned: 0,
+            last_prefix: None,
+            future_event: None,
+        })
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    async fn new_instance(_path: &str) -> Memory {
+        Memory::default()
+    }
+
+    async fn destroy_instance(_path: &str) {}
+
+    nostr_rs_storage_base::storage_test!(Memory, new_instance, destroy_instance);
+}

+ 167 - 0
crates/storage/rocksdb/src/cursor.rs

@@ -0,0 +1,167 @@
+//! Rocks DB implementation of the storage layer
+use crate::RocksDb;
+use futures::Stream;
+use nostr_rs_storage_base::{
+    cursor::{check_future_call, FutureResult, FutureValue},
+    Error, EventFilter, Storage,
+};
+use nostr_rs_types::types::Event;
+use rocksdb::{BoundColumnFamily, DBIteratorWithThreadMode, DB};
+use std::{
+    collections::VecDeque,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+pub struct Cursor<'a> {
+    /// Reference to the rocks db database. This is useful to load the event
+    /// data, because in the secondary indexes we only store the event ID. It
+    /// could be possible to avoid a reference and speed up things even more to
+    /// duplicate the event, but that would mean it would use way more disk
+    /// space.
+    db: &'a RocksDb,
+    /// List of filters to apply to the events, before returning. If no filter
+    /// is given each events from the secondary index will be returned,
+    /// otherwise the events will be filtered by the given filter, and only
+    /// those events that comply will be returned
+    filter: Option<EventFilter>,
+    /// Reference to the namespace to use to query the secondary index. If none
+    /// is given the secondary_index_iterator must be constructed outside this
+    /// wrapper.
+    index: Option<Arc<BoundColumnFamily<'a>>>,
+    /// The current secondary index iterator. If none is given the iterator will
+    /// try to create one using the namespace property and the first prefix from
+    /// prefixes (it will also be copied to current_prefix)
+    index_iterator: Option<DBIteratorWithThreadMode<'a, DB>>,
+    /// Index key or prefix to iterate over in the index
+    index_keys: VecDeque<Vec<u8>>,
+    /// The current prefix to load
+    current_index_key: Vec<u8>,
+    /// Limit of events to return, None means nothing
+    limit: Option<usize>,
+    /// Amount of events returned
+    returned: usize,
+    /// Future event to return
+    future_event: Option<FutureResult<'a>>,
+}
+
+impl<'a> Cursor<'a> {
+    pub fn new(
+        db: &'a RocksDb,
+        index: Option<Arc<BoundColumnFamily<'a>>>,
+        prefixes: Vec<Vec<u8>>,
+        filter: Option<EventFilter>,
+        secondary_index_iterator: Option<DBIteratorWithThreadMode<'a, DB>>,
+        limit: Option<usize>,
+    ) -> Self {
+        Self {
+            db,
+            index,
+            index_iterator: secondary_index_iterator,
+            current_index_key: Vec::new(),
+            index_keys: prefixes.into(),
+            filter,
+            limit,
+            returned: 0,
+            future_event: None,
+        }
+    }
+    /// Selects the next prefix available and starts an iterator using the
+    /// secondary index. If no prefix is available from prefixes the functions
+    /// return None, signalling upstream the are no more results
+    fn select_next_prefix_using_secondary_index(&mut self) -> Option<()> {
+        self.index_iterator = None;
+        let prefix = self.index_keys.pop_front()?;
+        self.index_iterator = Some(
+            self.db
+                .db
+                .prefix_iterator_cf(self.index.as_ref()?, prefix.clone()),
+        );
+        self.current_index_key = prefix;
+        Some(())
+    }
+}
+
+impl<'a> Stream for Cursor<'a> {
+    type Item = Result<Event, Error>;
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (0, None)
+    }
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        if Some(self.returned) == self.limit {
+            return Poll::Ready(None);
+        }
+
+        let this = Pin::into_inner(self);
+        let db = this.db;
+
+        match check_future_call(&mut this.future_event, &this.filter, cx) {
+            FutureValue::Found(event) => {
+                this.returned += 1;
+                return Poll::Ready(Some(event));
+            }
+            FutureValue::Pending => return Poll::Pending,
+            _ => {}
+        }
+
+        loop {
+            let secondary_index = if let Some(iterator) = this.index_iterator.as_mut() {
+                iterator
+            } else if this.index.is_some() {
+                let _ = this.select_next_prefix_using_secondary_index();
+                if let Some(iterator) = this.index_iterator.as_mut() {
+                    iterator
+                } else {
+                    return Poll::Ready(None);
+                }
+            } else {
+                // No secondary index is used to query, this means the query is
+                // using the ID filter, so it is more efficient to use the
+                // primary index to prefetch events that may satisfy the query
+                return if let Some(prefix) = this.index_keys.pop_front() {
+                    this.future_event = Some(db.get_event(prefix));
+                    match check_future_call(&mut this.future_event, &this.filter, cx) {
+                        FutureValue::Found(event) => {
+                            this.returned += 1;
+                            Poll::Ready(Some(event))
+                        }
+                        FutureValue::Pending => Poll::Pending,
+                        _ => continue,
+                    }
+                } else {
+                    Poll::Ready(None)
+                };
+            };
+
+            return match secondary_index.next() {
+                Some(Ok((key, value))) => {
+                    if !key.starts_with(&this.current_index_key) {
+                        let _ = this.select_next_prefix_using_secondary_index();
+                        continue;
+                    }
+
+                    this.future_event = Some(db.get_event(value));
+                    match check_future_call(&mut this.future_event, &this.filter, cx) {
+                        FutureValue::Found(event) => {
+                            this.returned += 1;
+                            Poll::Ready(Some(event))
+                        }
+                        FutureValue::Pending => Poll::Pending,
+                        _ => continue,
+                    }
+                }
+                Some(Err(err)) => {
+                    this.returned += 1;
+                    Poll::Ready(Some(Err(Error::Internal(err.to_string()))))
+                }
+                None => {
+                    let _ = this.select_next_prefix_using_secondary_index();
+                    continue;
+                }
+            };
+        }
+    }
+}

+ 0 - 170
crates/storage/rocksdb/src/iterator.rs

@@ -1,170 +0,0 @@
-//! Rocks DB implementation of the storage layer
-use crate::{event_filter::EventFilter, RocksDb};
-use futures::{Future, FutureExt, Stream};
-use nostr_rs_storage_base::{Error, Storage};
-use nostr_rs_types::types::Event;
-use rocksdb::{BoundColumnFamily, DBIteratorWithThreadMode, DB};
-use std::{
-    collections::VecDeque,
-    pin::Pin,
-    sync::Arc,
-    task::{Context, Poll},
-};
-
-type CurrentEventByPrefixFuture<'a> = Pin<
-    Box<
-        dyn Future<
-                Output = Result<Option<nostr_rs_types::types::Event>, nostr_rs_storage_base::Error>,
-            > + Send
-            + 'a,
-    >,
->;
-
-pub struct WrapperIterator<'a> {
-    /// Reference to the rocks db database. This is useful to load the event
-    /// data, because in the secondary indexes we only store the event ID. It
-    /// could be possible to avoid a reference and speed up things even more to
-    /// duplicate the event, but that would mean it would use way more disk
-    /// space.
-    pub db: &'a RocksDb,
-    /// List of filters to apply to the events, before returning. If no filter
-    /// is given each events from the secondary index will be returned,
-    /// otherwise the events will be filtered by the given filter, and only
-    /// those events that comply will be returned
-    pub filter: Option<EventFilter>,
-    /// Reference to the namespace to use to query the secondary index. If none
-    /// is given the secondary_index_iterator must be constructed outside this
-    /// wrapper.
-    pub namespace: Option<Arc<BoundColumnFamily<'a>>>,
-    /// The current secondary index iterator. If none is given the iterator will
-    /// try to create one using the namespace property and the first prefix from
-    /// prefixes (it will also be copied to current_prefix)
-    pub secondary_index_iterator: Option<DBIteratorWithThreadMode<'a, DB>>,
-    pub current_prefix: Vec<u8>,
-    pub prefixes: VecDeque<Vec<u8>>,
-    pub limit: Option<usize>,
-    pub returned: usize,
-
-    pub future_event: Option<CurrentEventByPrefixFuture<'a>>,
-}
-
-impl<'a> WrapperIterator<'a> {
-    /// Selects the next prefix available and starts an iterator using the
-    /// secondary index. If no prefix is available from prefixes the functions
-    /// return None, signalling upstream the are no more results
-    fn select_next_prefix_using_secondary_index(&mut self) -> Option<()> {
-        self.secondary_index_iterator = None;
-        let prefix = self.prefixes.pop_front()?;
-        self.secondary_index_iterator = Some(
-            self.db
-                .db
-                .prefix_iterator_cf(self.namespace.as_ref()?, prefix.clone()),
-        );
-        self.current_prefix = prefix;
-        Some(())
-    }
-
-    fn handle_future_call(&mut self, cx: &mut Context<'_>) -> FutureStatus {
-        if let Some(mut future_event) = self.future_event.take() {
-            match future_event.poll_unpin(cx) {
-                Poll::Ready(Ok(None)) => FutureStatus::FoundNotMatch,
-                Poll::Ready(Ok(Some(event))) => {
-                    // event is ready, apply the neccesary filters
-                    if let Some(filter) = &self.filter {
-                        if filter.check_event(&event) {
-                            FutureStatus::Found(Ok(event))
-                        } else {
-                            FutureStatus::FoundNotMatch
-                        }
-                    } else {
-                        FutureStatus::Found(Ok(event))
-                    }
-                }
-                Poll::Ready(Err(error)) => return FutureStatus::Found(Err(error)),
-                Poll::Pending => FutureStatus::Pending,
-            }
-        } else {
-            FutureStatus::Ended
-        }
-    }
-}
-
-enum FutureStatus {
-    Found(Result<Event, Error>),
-    Pending,
-    Ended,
-    FoundNotMatch,
-}
-
-impl<'a> Stream for WrapperIterator<'a> {
-    type Item = Result<Event, Error>;
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        (0, None)
-    }
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        if Some(self.returned) == self.limit {
-            return Poll::Ready(None);
-        }
-
-        let this = Pin::into_inner(self);
-        let db = this.db;
-
-        match this.handle_future_call(cx) {
-            FutureStatus::Found(event) => return Poll::Ready(Some(event)),
-            FutureStatus::Pending => return Poll::Pending,
-            FutureStatus::FoundNotMatch | FutureStatus::Ended => {}
-        }
-
-        loop {
-            let secondary_index = if let Some(iterator) = this.secondary_index_iterator.as_mut() {
-                iterator
-            } else {
-                if this.namespace.is_some() {
-                    let _ = this.select_next_prefix_using_secondary_index();
-                    if let Some(iterator) = this.secondary_index_iterator.as_mut() {
-                        iterator
-                    } else {
-                        return Poll::Ready(None);
-                    }
-                } else {
-                    // No secondary index is used to query, this means the query is
-                    // using the ID filter, so it is more efficient to use the
-                    // primary index to prefetch events that may satisfy the query
-                    return if let Some(prefix) = this.prefixes.pop_front() {
-                        this.future_event = Some(db.get_event(prefix));
-                        match this.handle_future_call(cx) {
-                            FutureStatus::Found(event) => Poll::Ready(Some(event)),
-                            FutureStatus::Pending => Poll::Pending,
-                            FutureStatus::FoundNotMatch | FutureStatus::Ended => continue,
-                        }
-                    } else {
-                        Poll::Ready(None)
-                    };
-                }
-            };
-
-            return match secondary_index.next() {
-                Some(Ok((key, value))) => {
-                    if !key.starts_with(&this.current_prefix) {
-                        let _ = this.select_next_prefix_using_secondary_index();
-                        continue;
-                    }
-
-                    this.future_event = Some(db.get_event(value));
-                    match this.handle_future_call(cx) {
-                        FutureStatus::Found(event) => Poll::Ready(Some(event)),
-                        FutureStatus::Pending => Poll::Pending,
-                        FutureStatus::FoundNotMatch | FutureStatus::Ended => continue,
-                    }
-                }
-                Some(Err(err)) => Poll::Ready(Some(Err(Error::Internal(err.to_string())))),
-                None => {
-                    let _ = this.select_next_prefix_using_secondary_index();
-                    continue;
-                }
-            };
-        }
-    }
-}

+ 91 - 92
crates/storage/rocksdb/src/lib.rs

@@ -1,6 +1,6 @@
 //! Rocks DB implementation of the storage layer
-use crate::{iterator::WrapperIterator, secondary_index::SecondaryIndex};
-use nostr_rs_storage_base::{Error, Storage};
+use crate::cursor::Cursor;
+use nostr_rs_storage_base::{Error, SecondaryIndex, Storage};
 use nostr_rs_types::types::{Event, Filter, Tag};
 use rocksdb::{
     BoundColumnFamily, ColumnFamilyDescriptor, IteratorMode, Options, SliceTransform, WriteBatch,
@@ -8,9 +8,7 @@ use rocksdb::{
 };
 use std::{collections::VecDeque, ops::Deref, path::Path, sync::Arc};
 
-mod event_filter;
-mod iterator;
-mod secondary_index;
+mod cursor;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
 enum ReferenceType {
@@ -91,21 +89,18 @@ impl RocksDb {
 
 #[async_trait::async_trait]
 impl Storage for RocksDb {
-    type Stream<'a> = WrapperIterator<'a>;
+    type Cursor<'a> = Cursor<'a>;
 
-    async fn get_local_events(&self, limit: Option<usize>) -> Result<WrapperIterator<'_>, Error> {
+    async fn get_local_events(&self, limit: Option<usize>) -> Result<Cursor<'_>, Error> {
         let cf_handle = self.reference_to_cf_handle(ReferenceType::LocalEvents)?;
-        Ok(WrapperIterator {
-            db: self,
-            filter: None,
-            namespace: None,
-            secondary_index_iterator: Some(self.db.iterator_cf(&cf_handle, IteratorMode::Start)),
-            current_prefix: vec![],
-            prefixes: VecDeque::new(),
+        Ok(Cursor::new(
+            self,
+            None,
+            vec![],
+            None,
+            Some(self.db.iterator_cf(&cf_handle, IteratorMode::Start)),
             limit,
-            returned: 0,
-            future_event: None,
-        })
+        ))
     }
 
     async fn set_local_event(&self, event: &Event) -> Result<(), Error> {
@@ -204,95 +199,99 @@ impl Storage for RocksDb {
     }
 
     async fn get_event<T: AsRef<[u8]> + Send + Sync>(&self, id: T) -> Result<Option<Event>, Error> {
-        Ok(self
+        let id = id.as_ref();
+        if let Some(value) = self
             .db
-            .get_cf(&self.reference_to_cf_handle(ReferenceType::Events)?, id)
-            .map_err(|e| Error::Internal(e.to_string()))?
-            .map(|event| serde_json::from_slice(&event))
-            .transpose()?)
+            .prefix_iterator_cf(&self.reference_to_cf_handle(ReferenceType::Events)?, id)
+            .next()
+        {
+            let (key, value) = value.map_err(|e| Error::Internal(e.to_string()))?;
+
+            if key.starts_with(id) {
+                return Ok(Some(serde_json::from_slice(&value)?));
+            }
+        }
+
+        Ok(None)
     }
 
-    async fn get_by_filter(&self, mut query: Filter) -> Result<WrapperIterator<'_>, Error> {
+    async fn get_by_filter(&self, mut query: Filter) -> Result<Cursor<'_>, Error> {
         let limit = if query.limit == 0 {
             None
         } else {
             Some(query.limit.try_into()?)
         };
 
-        let (namespace, secondary_index_iterator, prefixes) =
-            if !query.references_to_event.is_empty() {
-                let ns: Arc<BoundColumnFamily<'_>> =
-                    self.reference_to_cf_handle(ReferenceType::RefEvent)?;
-                let keys = query
-                    .references_to_event
-                    .iter()
-                    .map(|c| c.as_ref().to_vec())
-                    .collect();
-                query.references_to_event.clear();
-                (Some(ns), None, keys)
-            } else if !query.references_to_public_key.is_empty() {
-                let ns = self.reference_to_cf_handle(ReferenceType::RefEvent)?;
-                let keys = query
-                    .references_to_public_key
-                    .iter()
-                    .map(|c| c.as_ref().to_vec())
-                    .collect();
-                query.references_to_public_key.clear();
-                (Some(ns), None, keys)
-            } else if !query.ids.is_empty() {
-                let keys = query.ids.iter().map(|c| c.as_ref().to_vec()).collect();
-                query.ids.clear();
-                (None, None, keys)
-            } else if !query.authors.is_empty() {
-                let ns = self.reference_to_cf_handle(ReferenceType::Author)?;
-                let keys = query.authors.iter().map(|c| c.as_ref().to_vec()).collect();
-                query.authors.clear();
-                (Some(ns), None, keys)
-            } else if !query.kinds.is_empty() {
-                let ns = self.reference_to_cf_handle(ReferenceType::Kind)?;
-                let keys = query
-                    .kinds
-                    .iter()
-                    .map(|kind| {
-                        let kind: u32 = (*kind).into();
-                        kind.to_be_bytes().to_vec()
-                    })
-                    .collect();
-                query.kinds.clear();
-                (Some(ns), None, keys)
-            } else {
-                let cf_handle = self.reference_to_cf_handle(ReferenceType::Stream)?;
-                (
-                    None,
-                    Some(self.db.iterator_cf(&cf_handle, IteratorMode::Start)),
-                    VecDeque::new(),
-                )
-            };
-
-        Ok(WrapperIterator {
-            db: self,
-            filter: Some(query.into()),
-            namespace,
+        let (index, secondary_index_iterator, prefixes) = if !query.references_to_event.is_empty() {
+            let ns: Arc<BoundColumnFamily<'_>> =
+                self.reference_to_cf_handle(ReferenceType::RefEvent)?;
+
+            let keys = std::mem::take(&mut query.references_to_event)
+                .into_iter()
+                .map(|c| c.take())
+                .collect();
+            (Some(ns), None, keys)
+        } else if !query.references_to_public_key.is_empty() {
+            let ns = self.reference_to_cf_handle(ReferenceType::RefEvent)?;
+            let keys = std::mem::take(&mut query.references_to_public_key)
+                .into_iter()
+                .map(|c| c.take())
+                .collect();
+            (Some(ns), None, keys)
+        } else if !query.ids.is_empty() {
+            let keys = std::mem::take(&mut query.ids)
+                .into_iter()
+                .map(|c| c.take())
+                .collect();
+            (None, None, keys)
+        } else if !query.authors.is_empty() {
+            let ns = self.reference_to_cf_handle(ReferenceType::Author)?;
+            let keys = std::mem::take(&mut query.authors)
+                .into_iter()
+                .map(|c| c.take())
+                .collect();
+            (Some(ns), None, keys)
+        } else if !query.kinds.is_empty() {
+            let ns = self.reference_to_cf_handle(ReferenceType::Kind)?;
+            let keys = std::mem::take(&mut query.kinds)
+                .into_iter()
+                .map(|kind| {
+                    let kind: u32 = kind.into();
+                    kind.to_be_bytes().to_vec()
+                })
+                .collect();
+            (Some(ns), None, keys)
+        } else {
+            let cf_handle = self.reference_to_cf_handle(ReferenceType::Stream)?;
+            (
+                None,
+                Some(self.db.iterator_cf(&cf_handle, IteratorMode::Start)),
+                VecDeque::new(),
+            )
+        };
+
+        Ok(Cursor::new(
+            self,
+            index,
+            prefixes.into(),
+            Some(query.into()),
             secondary_index_iterator,
-            current_prefix: vec![],
-            prefixes,
-            returned: 0,
             limit,
-            future_event: None,
-        })
-
-        //load_events_and_filter(self, query, event_ids, for_each)
+        ))
     }
 }
 
 #[cfg(test)]
-fn new_instance(path: &str) -> RocksDb {
-    RocksDb::new(&path).expect("valid db")
-}
+mod test {
+    use super::*;
 
-#[cfg(test)]
-fn destroy_instance(path: &str) {
-    std::fs::remove_dir_all(path).expect("deleted file");
-}
+    async fn new_instance(path: &str) -> RocksDb {
+        RocksDb::new(path).expect("valid db")
+    }
+
+    async fn destroy_instance(path: &str) {
+        std::fs::remove_dir_all(path).expect("deleted file");
+    }
 
-nostr_rs_storage_base::storage_test!(RocksDb, new_instance, destroy_instance);
+    nostr_rs_storage_base::storage_test!(RocksDb, new_instance, destroy_instance);
+}

+ 5 - 0
crates/types/src/types/addr.rs

@@ -110,6 +110,11 @@ impl Addr {
         Addr::try_from_bytes(hex::decode(pk)?, hrp)
     }
 
+    /// Converts the Addr into a vector of bytes
+    pub fn take(self) -> Vec<u8> {
+        self.bytes
+    }
+
     /// Instantiates a new Addr from a vector of bytes with an optional human
     /// readable part
     pub fn try_from_bytes(bytes: Vec<u8>, hrp: Option<HumanReadablePart>) -> Result<Self, Error> {