Kaynağa Gözat

Improvements

1. fixed tests in base, they should be asyn
2. moved things that are common out of rocksdb
Cesar Rodas 3 ay önce
ebeveyn
işleme
4db0505622

+ 6 - 24
Cargo.lock

@@ -181,17 +181,16 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
 [[package]]
 name = "chrono"
-version = "0.4.26"
+version = "0.4.38"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5"
+checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401"
 dependencies = [
  "android-tzdata",
  "iana-time-zone",
  "js-sys",
  "num-traits",
- "time",
  "wasm-bindgen",
- "winapi",
+ "windows-targets 0.52.6",
 ]
 
 [[package]]
@@ -478,7 +477,7 @@ checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31"
 dependencies = [
  "cfg-if",
  "libc",
- "wasi 0.11.0+wasi-snapshot-preview1",
+ "wasi",
 ]
 
 [[package]]
@@ -857,7 +856,7 @@ checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4"
 dependencies = [
  "hermit-abi",
  "libc",
- "wasi 0.11.0+wasi-snapshot-preview1",
+ "wasi",
  "windows-sys 0.52.0",
 ]
 
@@ -974,9 +973,9 @@ name = "nostr-rs-storage-base"
 version = "0.1.0"
 dependencies = [
  "async-trait",
+ "chrono",
  "futures",
  "nostr-rs-types",
- "parking_lot",
  "rand",
  "serde_json",
  "thiserror",
@@ -1497,17 +1496,6 @@ dependencies = [
 ]
 
 [[package]]
-name = "time"
-version = "0.1.45"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a"
-dependencies = [
- "libc",
- "wasi 0.10.0+wasi-snapshot-preview1",
- "winapi",
-]
-
-[[package]]
 name = "tinyvec"
 version = "1.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1760,12 +1748,6 @@ dependencies = [
 
 [[package]]
 name = "wasi"
-version = "0.10.0+wasi-snapshot-preview1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
-
-[[package]]
-name = "wasi"
 version = "0.11.0+wasi-snapshot-preview1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"

+ 3 - 3
crates/storage/base/Cargo.toml

@@ -7,12 +7,12 @@ edition = "2021"
 nostr-rs-types = { path = "../../types" }
 thiserror = "1.0.40"
 rand = "0.8.5"
-tokio = { version = "1.32.0", features = ["sync"] }
-parking_lot = "0.12.1"
+tokio = { version = "1.32.0", features = ["full"] }
 serde_json = "1.0"
 async-trait = "0.1.81"
 futures = "0.3.30"
+chrono = "0.4.38"
 
 [features]
-default = []
 test = []
+default = []

+ 48 - 0
crates/storage/base/src/cursor.rs

@@ -0,0 +1,48 @@
+use crate::{event_filter::EventFilter, Error};
+use futures::FutureExt;
+use nostr_rs_types::types::{Event, Filter};
+use std::{
+    future::Future,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+pub enum FutureValue {
+    Found(Result<Event, Error>),
+    Pending,
+    Ended,
+    FoundNotMatch,
+}
+
+pub type FutureResult<'a> = Pin<Box<dyn Future<Output = Result<Option<Event>, Error>> + Send + 'a>>;
+
+pub fn check_future_call(
+    future_event: &mut Option<FutureResult<'_>>,
+    filter: &Option<EventFilter>,
+    cx: &mut Context<'_>,
+) -> FutureValue {
+    if let Some(mut inner_future) = future_event.take() {
+        match inner_future.poll_unpin(cx) {
+            Poll::Ready(Ok(None)) => FutureValue::FoundNotMatch,
+            Poll::Ready(Ok(Some(event))) => {
+                // event is ready, apply the neccesary filters
+                if let Some(filter) = filter {
+                    if filter.check_event(&event) {
+                        FutureValue::Found(Ok(event))
+                    } else {
+                        FutureValue::FoundNotMatch
+                    }
+                } else {
+                    FutureValue::Found(Ok(event))
+                }
+            }
+            Poll::Ready(Err(error)) => FutureValue::Found(Err(error)),
+            Poll::Pending => {
+                *future_event = Some(inner_future);
+                FutureValue::Pending
+            }
+        }
+    } else {
+        FutureValue::Ended
+    }
+}

+ 0 - 0
crates/storage/rocksdb/src/event_filter.rs → crates/storage/base/src/event_filter.rs


+ 30 - 19
crates/storage/base/src/lib.rs

@@ -4,13 +4,18 @@
 //! find events by their tags, kind and references.
 #![allow(missing_docs, warnings)]
 #![allow(dead_code)]
+pub mod cursor;
 mod error;
+mod event_filter;
 mod notification;
 mod storage;
 #[cfg(feature = "test")]
 pub mod test;
 
-pub use crate::{error::Error, notification::Subscription, storage::Storage};
+#[cfg(feature = "test")]
+pub use tokio;
+
+pub use crate::{error::Error, event_filter::*, notification::Subscription, storage::Storage};
 
 #[macro_export]
 /// This macro creates the
@@ -19,6 +24,7 @@ macro_rules! storage_test {
         #[cfg(test)]
         mod test {
             use super::*;
+            use futures::executor::block_on;
             use std::sync::atomic::{AtomicUsize, Ordering};
 
             static LAST_VERSION: AtomicUsize = AtomicUsize::new(0);
@@ -29,11 +35,11 @@ macro_rules! storage_test {
             }
 
             impl DisposableStorage {
-                pub fn new() -> Self {
+                pub async fn new() -> Self {
                     let id = LAST_VERSION.fetch_add(1, Ordering::Relaxed);
                     let path = format!("/tmp/nostr/test-{}", id);
                     Self {
-                        storage: $init(&path),
+                        storage: $init(&path).await,
                         path,
                     }
                 }
@@ -41,21 +47,26 @@ macro_rules! storage_test {
 
             impl std::ops::Drop for DisposableStorage {
                 fn drop(&mut self) {
-                    $destroy(&self.path)
+                    block_on(async move {
+                        $destroy(&self.path).await;
+                    });
                 }
             }
 
-            nostr_rs_storage_base::storage_test_name! { store_and_get }
-            nostr_rs_storage_base::storage_test_name! { records_are_sorted_by_date_desc }
-            nostr_rs_storage_base::storage_test_name! { filter_by_references }
-            nostr_rs_storage_base::storage_test_name! { filter_by_references_zero_match }
-            nostr_rs_storage_base::storage_test_name! { filter_by_references_and_kind }
-            nostr_rs_storage_base::storage_test_name! { filter_by_authors }
-            nostr_rs_storage_base::storage_test_name! { filter_by_author }
-            nostr_rs_storage_base::storage_test_name! { filter_by_author_and_kinds }
-            nostr_rs_storage_base::storage_test_name! { filter_by_kind }
-            nostr_rs_storage_base::storage_test_name! { get_event_and_related_events }
-            nostr_rs_storage_base::storage_test_name! { get_local_events }
+            use nostr_rs_storage_base::tokio;
+
+            nostr_rs_storage_base::storage_test_name!(store_and_get_by_partial_key);
+            nostr_rs_storage_base::storage_test_name!(store_and_get);
+            nostr_rs_storage_base::storage_test_name!(records_are_sorted_by_date_desc);
+            nostr_rs_storage_base::storage_test_name!(filter_by_references);
+            nostr_rs_storage_base::storage_test_name!(filter_by_references_zero_match);
+            nostr_rs_storage_base::storage_test_name!(filter_by_references_and_kind);
+            nostr_rs_storage_base::storage_test_name!(filter_by_authors);
+            nostr_rs_storage_base::storage_test_name!(filter_by_author);
+            nostr_rs_storage_base::storage_test_name!(filter_by_author_and_kinds);
+            nostr_rs_storage_base::storage_test_name!(filter_by_kind);
+            nostr_rs_storage_base::storage_test_name!(get_event_and_related_events);
+            nostr_rs_storage_base::storage_test_name!(get_local_events);
         }
     };
 }
@@ -64,10 +75,10 @@ macro_rules! storage_test {
 /// Creates a
 macro_rules! storage_test_name {
     ($name:tt) => {
-        #[test]
-        fn $name() {
-            let x = DisposableStorage::new();
-            nostr_rs_storage_base::test::$name(&x.storage);
+        #[tokio::test]
+        async fn $name() {
+            let x = DisposableStorage::new().await;
+            nostr_rs_storage_base::test::$name(&x.storage).await;
         }
     };
 }

+ 4 - 5
crates/storage/base/src/notification.rs

@@ -1,14 +1,13 @@
 use crate::{Error, Storage};
 use futures::Stream;
 use nostr_rs_types::types::{Addr, Event, Filter, Kind};
-use parking_lot::RwLock;
 use std::{
     collections::HashMap,
     pin::Pin,
     sync::atomic::AtomicUsize,
     task::{Context, Poll},
 };
-use tokio::sync::mpsc::Sender;
+use tokio::sync::{mpsc::Sender, RwLock};
 
 #[allow(dead_code)]
 struct SubscriptionEntry {
@@ -86,7 +85,7 @@ where
 
     /// Removes a subscription from the listener
     pub async fn unsubscribe(self, subscription_id: usize) -> Result<(), Error> {
-        let mut subscribers = self.subscriptions.write();
+        let mut subscribers = self.subscriptions.write().await;
         let _ = subscribers.remove(&subscription_id);
         Ok(())
     }
@@ -98,8 +97,8 @@ where
         filter: Filter,
         sender: Sender<(usize, Event)>,
     ) -> Result<(usize, SubscriptionResultFromDb<T::Cursor<'_>>), Error> {
-        let mut subscribers = self.subscriptions.write();
-        let mut _subscription_listener = self.subscription_listener.write();
+        let mut subscribers = self.subscriptions.write().await;
+        let mut _subscription_listener = self.subscription_listener.write().await;
         let id = self
             .last_id
             .fetch_add(1, std::sync::atomic::Ordering::SeqCst);

+ 14 - 0
crates/storage/base/src/test.rs

@@ -38,6 +38,20 @@ where
     assert_eq!(event1, Some(event));
 }
 
+pub async fn store_and_get_by_partial_key<T>(db: &T)
+where
+    T: Storage,
+{
+    let json = "{\"content\":\"{\\\"lud06\\\":\\\"lnbc1p3a4wxvpp5x0pa6gr55fq5s9d3dxs0vz77mqxgdw63hhtgtlfz5zvm65847vnqdqqcqpjsp5402c8rtqxd4j97rnvuejuwl4sg473g6wg08d67fvn7qc4gtpkfks9q7sqqqqqqqqqqqqqqqqqqqsqqqqqysgqmqz9gxqyjw5qrzjqwryaup9lh50kkranzgcdnn2fgvx390wgj5jd07rwr3vxeje0glclleasn65surjcsqqqqlgqqqqqeqqjqyxj968tem9ps6ttm9ukv6ag4yc6qmgj2svrccfgp4n83fpktr3dsx6fq7grfzlqt982aaemahg9q29vzl9f627kh4j8h8xc2z2mtpdqqjlekah\\\",\\\"website\\\":\\\"\\\",\\\"nip05\\\":\\\"cesar@cesar.com.py\\\",\\\"picture\\\":\\\"https://pbs.twimg.com/profile_images/1175432935337537536/_Peu9vuJ_400x400.jpg\\\",\\\"display_name\\\":\\\"C\\\",\\\"about\\\":\\\"Rust and PHP\\\",\\\"name\\\":\\\"c\\\"}\",\"created_at\":1678476588,\"id\":\"3800c787a23288641c0b96cbcc87c26cbd3ea7bee53b7748422fdb100fb7b9f0\",\"kind\":0,\"pubkey\":\"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78\",\"sig\":\"c8a12ce96833e4cd67bce0e9e50f831262ef0f0c0cff5e56c38a0c90867ed1a6621e9692948ef5e85a7ca3726c3f0f43fa7e1992536bc457317123bca8784f5f\",\"tags\":[]}";
+
+    let event: Event = serde_json::from_str(json).expect("valid");
+    assert_eq!(true, db.store(&event).await.expect("valid"));
+    assert_eq!(false, db.store(&event).await.expect("valid"));
+
+    let event1 = db.get_event(&event.id[0..2]).await.expect("something");
+    assert_eq!(event1, Some(event));
+}
+
 pub async fn records_are_sorted_by_date_desc<T>(db: &T)
 where
     T: Storage,

+ 18 - 46
crates/storage/rocksdb/src/cursor.rs

@@ -1,7 +1,10 @@
 //! Rocks DB implementation of the storage layer
-use crate::{event_filter::EventFilter, RocksDb};
-use futures::{Future, FutureExt, Stream};
-use nostr_rs_storage_base::{Error, Storage};
+use crate::RocksDb;
+use futures::{Future, Stream};
+use nostr_rs_storage_base::{
+    cursor::{check_future_call, FutureValue},
+    Error, EventFilter, Storage,
+};
 use nostr_rs_types::types::Event;
 use rocksdb::{BoundColumnFamily, DBIteratorWithThreadMode, DB};
 use std::{
@@ -63,37 +66,6 @@ impl<'a> Cursor<'a> {
         self.current_prefix = prefix;
         Some(())
     }
-
-    fn handle_future_call(&mut self, cx: &mut Context<'_>) -> FutureStatus {
-        if let Some(mut future_event) = self.future_event.take() {
-            match future_event.poll_unpin(cx) {
-                Poll::Ready(Ok(None)) => FutureStatus::FoundNotMatch,
-                Poll::Ready(Ok(Some(event))) => {
-                    // event is ready, apply the neccesary filters
-                    if let Some(filter) = &self.filter {
-                        if filter.check_event(&event) {
-                            FutureStatus::Found(Ok(event))
-                        } else {
-                            FutureStatus::FoundNotMatch
-                        }
-                    } else {
-                        FutureStatus::Found(Ok(event))
-                    }
-                }
-                Poll::Ready(Err(error)) => return FutureStatus::Found(Err(error)),
-                Poll::Pending => FutureStatus::Pending,
-            }
-        } else {
-            FutureStatus::Ended
-        }
-    }
-}
-
-enum FutureStatus {
-    Found(Result<Event, Error>),
-    Pending,
-    Ended,
-    FoundNotMatch,
 }
 
 impl<'a> Stream for Cursor<'a> {
@@ -111,10 +83,10 @@ impl<'a> Stream for Cursor<'a> {
         let this = Pin::into_inner(self);
         let db = this.db;
 
-        match this.handle_future_call(cx) {
-            FutureStatus::Found(event) => return Poll::Ready(Some(event)),
-            FutureStatus::Pending => return Poll::Pending,
-            FutureStatus::FoundNotMatch | FutureStatus::Ended => {}
+        match check_future_call(&mut this.future_event, &this.filter, cx) {
+            FutureValue::Found(event) => return Poll::Ready(Some(event)),
+            FutureValue::Pending => return Poll::Pending,
+            FutureValue::FoundNotMatch | FutureValue::Ended => {}
         }
 
         loop {
@@ -134,10 +106,10 @@ impl<'a> Stream for Cursor<'a> {
                     // primary index to prefetch events that may satisfy the query
                     return if let Some(prefix) = this.prefixes.pop_front() {
                         this.future_event = Some(db.get_event(prefix));
-                        match this.handle_future_call(cx) {
-                            FutureStatus::Found(event) => Poll::Ready(Some(event)),
-                            FutureStatus::Pending => Poll::Pending,
-                            FutureStatus::FoundNotMatch | FutureStatus::Ended => continue,
+                        match check_future_call(&mut this.future_event, &this.filter, cx) {
+                            FutureValue::Found(event) => Poll::Ready(Some(event)),
+                            FutureValue::Pending => Poll::Pending,
+                            FutureValue::FoundNotMatch | FutureValue::Ended => continue,
                         }
                     } else {
                         Poll::Ready(None)
@@ -153,10 +125,10 @@ impl<'a> Stream for Cursor<'a> {
                     }
 
                     this.future_event = Some(db.get_event(value));
-                    match this.handle_future_call(cx) {
-                        FutureStatus::Found(event) => Poll::Ready(Some(event)),
-                        FutureStatus::Pending => Poll::Pending,
-                        FutureStatus::FoundNotMatch | FutureStatus::Ended => continue,
+                    match check_future_call(&mut this.future_event, &this.filter, cx) {
+                        FutureValue::Found(event) => Poll::Ready(Some(event)),
+                        FutureValue::Pending => Poll::Pending,
+                        FutureValue::FoundNotMatch | FutureValue::Ended => continue,
                     }
                 }
                 Some(Err(err)) => Poll::Ready(Some(Err(Error::Internal(err.to_string())))),

+ 2 - 3
crates/storage/rocksdb/src/lib.rs

@@ -9,7 +9,6 @@ use rocksdb::{
 use std::{collections::VecDeque, ops::Deref, path::Path, sync::Arc};
 
 mod cursor;
-mod event_filter;
 mod secondary_index;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
@@ -286,12 +285,12 @@ impl Storage for RocksDb {
 }
 
 #[cfg(test)]
-fn new_instance(path: &str) -> RocksDb {
+async fn new_instance(path: &str) -> RocksDb {
     RocksDb::new(&path).expect("valid db")
 }
 
 #[cfg(test)]
-fn destroy_instance(path: &str) {
+async fn destroy_instance(path: &str) {
     std::fs::remove_dir_all(path).expect("deleted file");
 }