Cesar Rodas vor 3 Monaten
Ursprung
Commit
c525701bfd

+ 10 - 9
crates/relayer/src/relayer.rs

@@ -83,7 +83,7 @@ impl<T: Storage> Relayer<T> {
     ) -> Result<Option<Request>, Error> {
         match &request {
             Request::Event(event) => {
-                self.store_and_broadcast_local_event(event.deref());
+                self.store_and_broadcast_local_event(event.deref()).await;
             }
             Request::Request(request) => {
                 // Create subscription
@@ -198,9 +198,9 @@ impl<T: Storage> Relayer<T> {
     }
 
     #[inline]
-    pub fn store_and_broadcast_local_event(&self, event: &Event) {
+    pub async fn store_and_broadcast_local_event(&self, event: &Event) {
         if let Some(storage) = self.storage.as_ref() {
-            let _ = storage.store_local_event(event);
+            let _ = storage.store_local_event(event).await;
         }
         let subscriptions = self.subscriptions.read();
 
@@ -233,7 +233,7 @@ mod test {
     use nostr_rs_rocksdb::RocksDb;
     use nostr_rs_types::Request;
 
-    fn get_db(prefill: bool) -> RocksDb {
+    async fn get_db(prefill: bool) -> RocksDb {
         let db = RocksDb::new(format!("/tmp/db/{}", get_id())).expect("db");
         if prefill {
             let events = include_str!("../tests/events.json")
@@ -242,7 +242,7 @@ mod test {
                 .collect::<Vec<Event>>();
 
             for event in events {
-                assert!(db.store(&event).expect("valid"));
+                assert!(db.store(&event).await.expect("valid"));
             }
         }
         db
@@ -258,9 +258,9 @@ mod test {
                 {\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},
                 {\"#e\":[\"2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a\",\"a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1\"],\"kinds\":[1,6,7,9735]}
             ]").expect("valid object");
-        let (relayer, _) = Relayer::new(Some(get_db(true)));
+        let (relayer, _) = Relayer::new(Some(get_db(true).await));
         let (connection, mut recv) = Connection::new_for_test();
-        let _ = relayer.recv_request_from_client(&connection, request);
+        let _ = relayer.recv_request_from_client(&connection, request).await;
         // ev1
         assert_eq!(
             "9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42",
@@ -307,9 +307,9 @@ mod test {
     #[tokio::test]
     async fn server_listener_real_time() {
         let request: Request = serde_json::from_str("[\"REQ\",\"1298169700973717\",{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"since\":1681939304},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[1,3,6,7,9735],\"since\":1681939304},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"#e\":[\"2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a\",\"a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1\"],\"kinds\":[1,6,7,9735]}]").expect("valid object");
-        let (relayer, _) = Relayer::new(Some(get_db(false)));
+        let (relayer, _) = Relayer::new(Some(get_db(false).await));
         let (connection, mut recv) = Connection::new_for_test();
-        let _ = relayer.recv_request_from_client(&connection, request);
+        let _ = relayer.recv_request_from_client(&connection, request).await;
         // eod
         assert!(recv
             .try_recv()
@@ -324,6 +324,7 @@ mod test {
 
         relayer
             .recv_request_from_client(&connection, new_event)
+            .await
             .expect("process event");
 
         // It is not empty

+ 77 - 49
crates/storage/rocksdb/src/iterator.rs

@@ -45,7 +45,7 @@ pub struct WrapperIterator<'a> {
     pub limit: Option<usize>,
     pub returned: usize,
 
-    pub current_event_by_prefix: Option<CurrentEventByPrefixFuture<'a>>,
+    pub future_event: Option<CurrentEventByPrefixFuture<'a>>,
 }
 
 impl<'a> WrapperIterator<'a> {
@@ -53,6 +53,7 @@ impl<'a> WrapperIterator<'a> {
     /// secondary index. If no prefix is available from prefixes the functions
     /// return None, signalling upstream the are no more results
     fn select_next_prefix_using_secondary_index(&mut self) -> Option<()> {
+        self.secondary_index_iterator = None;
         let prefix = self.prefixes.pop_front()?;
         self.secondary_index_iterator = Some(
             self.db
@@ -62,6 +63,37 @@ impl<'a> WrapperIterator<'a> {
         self.current_prefix = prefix;
         Some(())
     }
+
+    fn handle_future_call(&mut self, cx: &mut Context<'_>) -> FutureStatus {
+        if let Some(mut future_event) = self.future_event.take() {
+            match future_event.poll_unpin(cx) {
+                Poll::Ready(Ok(None)) => FutureStatus::FoundNotMatch,
+                Poll::Ready(Ok(Some(event))) => {
+                    // event is ready, apply the neccesary filters
+                    if let Some(filter) = &self.filter {
+                        if filter.check_event(&event) {
+                            FutureStatus::Found(Ok(event))
+                        } else {
+                            FutureStatus::FoundNotMatch
+                        }
+                    } else {
+                        FutureStatus::Found(Ok(event))
+                    }
+                }
+                Poll::Ready(Err(error)) => return FutureStatus::Found(Err(error)),
+                Poll::Pending => FutureStatus::Pending,
+            }
+        } else {
+            FutureStatus::Ended
+        }
+    }
+}
+
+enum FutureStatus {
+    Found(Result<Event, Error>),
+    Pending,
+    Ended,
+    FoundNotMatch,
 }
 
 impl<'a> Stream for WrapperIterator<'a> {
@@ -79,64 +111,60 @@ impl<'a> Stream for WrapperIterator<'a> {
         let this = Pin::into_inner(self);
         let db = this.db;
 
-        if let Some(mut current_event_filter) = this.current_event_by_prefix.take() {
-            match current_event_filter.poll_unpin(cx) {
-                Poll::Ready(Ok(Some(event))) => {
-                    // event is ready, apply the neccesary filters
-                    if let Some(filter) = &this.filter {
-                        if filter.check_event(&event) {
-                            this.returned += 1;
-                            return Poll::Ready(Some(Ok(event)));
-                        }
-                    } else {
-                        this.returned += 1;
-                        return Poll::Ready(Some(Ok(event)));
-                    }
-                }
-                Poll::Ready(Err(x)) => return Poll::Ready(Some(Err(x))),
-                Poll::Pending => {
-                    // add it back
-                    this.current_event_by_prefix = Some(current_event_filter);
-                    return Poll::Pending;
-                }
-                _ => {}
-            }
+        match this.handle_future_call(cx) {
+            FutureStatus::Found(event) => return Poll::Ready(Some(event)),
+            FutureStatus::Pending => return Poll::Pending,
+            FutureStatus::FoundNotMatch | FutureStatus::Ended => {}
         }
-        let secondary_index = if let Some(iterator) = this.secondary_index_iterator.as_mut() {
-            iterator
-        } else {
-            return Poll::Ready(None);
-        };
-
-        match secondary_index.next() {
-            Some(Ok((key, value))) => {
-                if !key.starts_with(&this.current_prefix) {
-                    if this.select_next_prefix_using_secondary_index().is_none() {
-                        return Poll::Ready(None);
-                    }
-                } else {
-                    // query the database to get the record
-                    this.current_event_by_prefix = Some(db.get_event(value));
-                }
 
-                Poll::Pending
-            }
-            Some(Err(err)) => Poll::Ready(Some(Err(Error::Internal(err.to_string())))),
-            None => {
+        loop {
+            let secondary_index = if let Some(iterator) = this.secondary_index_iterator.as_mut() {
+                iterator
+            } else {
                 if this.namespace.is_some() {
-                    if this.select_next_prefix_using_secondary_index().is_none() {
+                    let _ = this.select_next_prefix_using_secondary_index();
+                    if let Some(iterator) = this.secondary_index_iterator.as_mut() {
+                        iterator
+                    } else {
                         return Poll::Ready(None);
                     }
                 } else {
                     // No secondary index is used to query, this means the query is
                     // using the ID filter, so it is more efficient to use the
                     // primary index to prefetch events that may satisfy the query
-                    let current_event_by_prefix =
-                        this.prefixes.pop_front().map(|prefix| db.get_event(prefix));
-                    this.current_event_by_prefix = current_event_by_prefix;
+                    return if let Some(prefix) = this.prefixes.pop_front() {
+                        this.future_event = Some(db.get_event(prefix));
+                        match this.handle_future_call(cx) {
+                            FutureStatus::Found(event) => Poll::Ready(Some(event)),
+                            FutureStatus::Pending => Poll::Pending,
+                            FutureStatus::FoundNotMatch | FutureStatus::Ended => continue,
+                        }
+                    } else {
+                        Poll::Ready(None)
+                    };
                 }
-                Poll::Pending
-            }
+            };
+
+            return match secondary_index.next() {
+                Some(Ok((key, value))) => {
+                    if !key.starts_with(&this.current_prefix) {
+                        let _ = this.select_next_prefix_using_secondary_index();
+                        continue;
+                    }
+
+                    this.future_event = Some(db.get_event(value));
+                    match this.handle_future_call(cx) {
+                        FutureStatus::Found(event) => Poll::Ready(Some(event)),
+                        FutureStatus::Pending => Poll::Pending,
+                        FutureStatus::FoundNotMatch | FutureStatus::Ended => continue,
+                    }
+                }
+                Some(Err(err)) => Poll::Ready(Some(Err(Error::Internal(err.to_string())))),
+                None => {
+                    let _ = this.select_next_prefix_using_secondary_index();
+                    continue;
+                }
+            };
         }
     }
 }

+ 2 - 2
crates/storage/rocksdb/src/lib.rs

@@ -104,7 +104,7 @@ impl Storage for RocksDb {
             prefixes: VecDeque::new(),
             limit,
             returned: 0,
-            current_event_by_prefix: None,
+            future_event: None,
         })
     }
 
@@ -278,7 +278,7 @@ impl Storage for RocksDb {
             prefixes,
             returned: 0,
             limit,
-            current_event_by_prefix: None,
+            future_event: None,
         })
 
         //load_events_and_filter(self, query, event_ids, for_each)