Ver Fonte

Working an async storage

Cesar Rodas há 3 meses atrás
pai
commit
ffd7ff2b43

+ 32 - 10
Cargo.lock

@@ -42,6 +42,17 @@ dependencies = [
 ]
 
 [[package]]
+name = "async-trait"
+version = "0.1.81"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.72",
+]
+
+[[package]]
 name = "autocfg"
 version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -416,7 +427,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.32",
+ "syn 2.0.72",
 ]
 
 [[package]]
@@ -898,6 +909,7 @@ dependencies = [
  "futures",
  "futures-util",
  "log",
+ "nostr-rs-relayer",
  "nostr-rs-types",
  "parking_lot",
  "serde_json",
@@ -926,6 +938,14 @@ dependencies = [
 ]
 
 [[package]]
+name = "nostr-rs-memory"
+version = "0.1.0"
+dependencies = [
+ "nostr-rs-storage-base",
+ "nostr-rs-types",
+]
+
+[[package]]
 name = "nostr-rs-relayer"
 version = "0.1.0"
 dependencies = [
@@ -946,6 +966,7 @@ dependencies = [
 name = "nostr-rs-rocksdb"
 version = "0.1.0"
 dependencies = [
+ "async-trait",
  "chrono",
  "nostr-rs-storage-base",
  "nostr-rs-types",
@@ -957,6 +978,7 @@ dependencies = [
 name = "nostr-rs-storage-base"
 version = "0.1.0"
 dependencies = [
+ "async-trait",
  "nostr-rs-types",
  "parking_lot",
  "rand",
@@ -1084,18 +1106,18 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
 
 [[package]]
 name = "proc-macro2"
-version = "1.0.66"
+version = "1.0.86"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
+checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
 dependencies = [
  "unicode-ident",
 ]
 
 [[package]]
 name = "quote"
-version = "1.0.33"
+version = "1.0.36"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae"
+checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7"
 dependencies = [
  "proc-macro2",
 ]
@@ -1336,7 +1358,7 @@ checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.32",
+ "syn 2.0.72",
 ]
 
 [[package]]
@@ -1450,9 +1472,9 @@ dependencies = [
 
 [[package]]
 name = "syn"
-version = "2.0.32"
+version = "2.0.72"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "239814284fd6f1a4ffe4ca893952cdd93c224b6a1571c9a9eadd670295c0c9e2"
+checksum = "dc4b9b9bf2add8093d3f2c0204471e951b2285580335de42f9d2534f3ae7a8af"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -1485,7 +1507,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.32",
+ "syn 2.0.72",
 ]
 
 [[package]]
@@ -1541,7 +1563,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.32",
+ "syn 2.0.72",
 ]
 
 [[package]]

+ 1 - 1
Cargo.toml

@@ -9,7 +9,7 @@ members = [
     "crates/client",
     "crates/relayer",
     "crates/storage/base",
-    "crates/storage/rocksdb", "crates/dump",
+    "crates/storage/rocksdb", "crates/dump", "crates/storage/memory",
 ]
 
 [dependencies]

+ 8 - 1
crates/client/Cargo.toml

@@ -7,10 +7,17 @@ edition = "2021"
 thiserror = "1.0.40"
 nostr-rs-types = { path = "../types" }
 tokio = { version = "1.26.0", features = ["sync", "macros", "rt", "time"] }
-tokio-tungstenite = { version = "0.18.0", features = ["rustls", "rustls-native-certs", "rustls-tls-native-roots"] }
+tokio-tungstenite = { version = "0.18.0", features = [
+    "rustls",
+    "rustls-native-certs",
+    "rustls-tls-native-roots",
+] }
 url = "2.3.1"
 serde_json = "1.0.94"
 futures-util = "0.3.27"
 parking_lot = "0.12.1"
 log = "0.4.17"
 futures = "0.3.28"
+
+[dev-dependencies]
+nostr-rs-relayer = { path = "../relayer" }

+ 5 - 1
crates/relayer/Cargo.toml

@@ -11,7 +11,11 @@ nostr-rs-storage-base = { path = "../storage/base" }
 futures-util = "0.3.27"
 parking_lot = "0.12.1"
 tokio = { version = "1.26.0", features = ["sync", "macros", "rt", "time"] }
-tokio-tungstenite = { version = "0.18.0", features = ["rustls", "rustls-native-certs", "rustls-tls-native-roots"] }
+tokio-tungstenite = { version = "0.18.0", features = [
+    "rustls",
+    "rustls-native-certs",
+    "rustls-tls-native-roots",
+] }
 thiserror = "1.0.39"
 serde_json = "1.0.94"
 rand = "0.8.5"

+ 3 - 3
crates/relayer/src/relayer.rs

@@ -75,7 +75,7 @@ impl<T: Storage> Relayer<T> {
         Ok(id)
     }
 
-    fn recv_request_from_client(
+    async fn recv_request_from_client(
         &self,
         connection: &Connection,
         request: Request,
@@ -122,7 +122,7 @@ impl<T: Storage> Relayer<T> {
                 if let Some(storage) = self.storage.as_ref() {
                     // Sent all events that match the filter that are stored in our database
                     for filter in request.filters.clone().into_iter() {
-                        storage.get_by_filter(filter)?.for_each(|event| {
+                        storage.get_by_filter(filter).await?.for_each(|event| {
                             if let Ok(event) = event {
                                 let _ = connection.send(
                                     relayer::Event {
@@ -171,7 +171,7 @@ impl<T: Storage> Relayer<T> {
             .get(&conn_id)
             .ok_or(Error::UnknownConnection(conn_id))?;
 
-        self.recv_request_from_client(connection, request)
+        self.recv_request_from_client(connection, request).await
     }
 
     pub fn send_to_conn(&self, conn_id: u128, response: Response) -> Result<(), Error> {

+ 1 - 0
crates/storage/base/Cargo.toml

@@ -10,6 +10,7 @@ rand = "0.8.5"
 tokio = { version = "1.32.0", features = ["sync"] }
 parking_lot = "0.12.1"
 serde_json = "1.0"
+async-trait = "0.1.81"
 
 [features]
 default = []

+ 8 - 5
crates/storage/base/src/notification.rs

@@ -61,12 +61,15 @@ where
     }
 
     /// Gets an event from the wrapped storage
-    pub fn get_event<T1: AsRef<[u8]>>(&self, id: T1) -> Result<Option<Event>, Error> {
-        self.db.get_event(id)
+    pub async fn get_event<T1: AsRef<[u8]> + Send + Sync>(
+        &self,
+        id: T1,
+    ) -> Result<Option<Event>, Error> {
+        self.db.get_event(id).await
     }
 
     /// Removes a subscription from the listener
-    pub fn unsubscribe(self, subscription_id: usize) -> Result<(), Error> {
+    pub async fn unsubscribe(self, subscription_id: usize) -> Result<(), Error> {
         let mut subscribers = self.subscriptions.write();
         let _ = subscribers.remove(&subscription_id);
         Ok(())
@@ -74,7 +77,7 @@ where
 
     /// Subscribes to a filter. The first streamed bytes will be reads from the
     /// database.
-    pub fn subscribe(
+    pub async fn subscribe(
         &self,
         filter: Filter,
         sender: Sender<(usize, Event)>,
@@ -95,7 +98,7 @@ where
         Ok((
             id,
             SubscriptionResultFromDb {
-                iterator: self.db.get_by_filter(filter)?,
+                iterator: self.db.get_by_filter(filter).await?,
             },
         ))
     }

+ 10 - 9
crates/storage/base/src/storage.rs

@@ -2,7 +2,8 @@ use crate::Error;
 use nostr_rs_types::types::{Event, Filter};
 
 /// Trait to store/query events
-pub trait Storage {
+#[async_trait::async_trait]
+pub trait Storage: Send + Sync {
     /// Result iterators
     type Iterator<'a>: Iterator<Item = Result<Event, Error>>
     where
@@ -10,31 +11,31 @@ pub trait Storage {
 
     /// Stores a new event into the database. This action will also creates all
     /// the needed indexes to find this event later by reference, author, kind or tag.
-    fn store(&self, event: &Event) -> Result<bool, Error>;
+    async fn store(&self, event: &Event) -> Result<bool, Error>;
 
     /// Flags the current event as initiated locally. This useful to retrieve
     /// events using `get_local_events`.
-    fn set_local_event(&self, event: &Event) -> Result<(), Error>;
+    async fn set_local_event(&self, event: &Event) -> Result<(), Error>;
 
     /// Returns an event by its ID
-    fn get_event<T: AsRef<[u8]>>(&self, id: T) -> Result<Option<Event>, Error>;
+    async fn get_event<T: AsRef<[u8]> + Send + Sync>(&self, id: T) -> Result<Option<Event>, Error>;
 
     /// Get events from the database with a given filter
     ///
     /// The first step is to use one available index to get a list of event-IDs,
     /// then call `load_and_filter_events` that will load the events from the
     /// `Events` namespace and will filter them by the given parameters.
-    fn get_by_filter(&self, query: Filter) -> Result<Self::Iterator<'_>, Error>;
+    async fn get_by_filter(&self, query: Filter) -> Result<Self::Iterator<'_>, Error>;
 
     /// Return a vector of all local events
-    fn get_local_events(&self, limit: Option<usize>) -> Result<Self::Iterator<'_>, Error>;
+    async fn get_local_events(&self, limit: Option<usize>) -> Result<Self::Iterator<'_>, Error>;
 
     /// Stores an event, similar to store(Event), but keeps track of this event in a
     /// local index. This is useful to keep track of the events that are created by
     /// this node, to be broadcasted to new nodes in the future
-    fn store_local_event(&self, event: &Event) -> Result<bool, Error> {
-        let ret = self.store(event)?;
-        self.set_local_event(event)?;
+    async fn store_local_event(&self, event: &Event) -> Result<bool, Error> {
+        let ret = self.store(event).await?;
+        self.set_local_event(event).await?;
         Ok(ret)
     }
 }

+ 285 - 275
crates/storage/base/src/test.rs

@@ -2,297 +2,307 @@
 //!
 //! This crate will storage events into a database. It will also build index to
 //! find events by their tags, kind and references.
-    use super::*;
-    use nostr_rs_types::types::{Addr, Event, Filter, Kind};
-    use std::{
-        fs::File,
-        io::{BufRead, BufReader},
-    };
-
-    fn setup_db<T>(db: &T)
-    where
-        T: Storage,
-    {
-        let events = include_str!("../tests/events.json").lines()
-            .map(|line| serde_json::from_str(&line).expect("valid"))
-            .collect::<Vec<Event>>();
-
-        for event in events {
-            assert!(db.store(&event).expect("valid"));
-        }
-    }
-
-    pub fn store_and_get<T>(db: &T)
-    where
-        T: Storage,
-    {
-        let json = "{\"content\":\"{\\\"lud06\\\":\\\"lnbc1p3a4wxvpp5x0pa6gr55fq5s9d3dxs0vz77mqxgdw63hhtgtlfz5zvm65847vnqdqqcqpjsp5402c8rtqxd4j97rnvuejuwl4sg473g6wg08d67fvn7qc4gtpkfks9q7sqqqqqqqqqqqqqqqqqqqsqqqqqysgqmqz9gxqyjw5qrzjqwryaup9lh50kkranzgcdnn2fgvx390wgj5jd07rwr3vxeje0glclleasn65surjcsqqqqlgqqqqqeqqjqyxj968tem9ps6ttm9ukv6ag4yc6qmgj2svrccfgp4n83fpktr3dsx6fq7grfzlqt982aaemahg9q29vzl9f627kh4j8h8xc2z2mtpdqqjlekah\\\",\\\"website\\\":\\\"\\\",\\\"nip05\\\":\\\"cesar@cesar.com.py\\\",\\\"picture\\\":\\\"https://pbs.twimg.com/profile_images/1175432935337537536/_Peu9vuJ_400x400.jpg\\\",\\\"display_name\\\":\\\"C\\\",\\\"about\\\":\\\"Rust and PHP\\\",\\\"name\\\":\\\"c\\\"}\",\"created_at\":1678476588,\"id\":\"3800c787a23288641c0b96cbcc87c26cbd3ea7bee53b7748422fdb100fb7b9f0\",\"kind\":0,\"pubkey\":\"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78\",\"sig\":\"c8a12ce96833e4cd67bce0e9e50f831262ef0f0c0cff5e56c38a0c90867ed1a6621e9692948ef5e85a7ca3726c3f0f43fa7e1992536bc457317123bca8784f5f\",\"tags\":[]}";
-
-        let event: Event = serde_json::from_str(json).expect("valid");
-        assert_eq!(true, db.store(&event).expect("valid"));
-        assert_eq!(false, db.store(&event).expect("valid"));
-
-        let event1 = db.get_event(&event.id).expect("something");
-        assert_eq!(event1, Some(event));
-    }
-
-    pub fn records_are_sorted_by_date_desc<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-
-        let pk: Addr = "7bdef7be22dd8e59f4600e044aa53a1cf975a9dc7d27df5833bc77db784a5805"
-            .try_into()
-            .expect("pk");
-
-        let vec = db
-            .get_by_filter(Filter {
-                authors: vec![pk],
-                limit: 10,
-                ..Default::default()
-            })
-            .expect("set of results")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-
-        let dates = vec.iter().map(|e| e.created_at()).collect::<Vec<_>>();
-        let mut sorted_dates = dates.clone();
-        sorted_dates.sort_by(|a, b| b.cmp(a));
-
-        assert_eq!(vec.len(), 10);
-        assert_eq!(dates, sorted_dates);
-    }
-
-    pub fn filter_by_references<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-
-        let related_events = db
-            .get_by_filter(Filter {
-                references_to_event: vec![
-                    "f513f1422ee5dbf30f57118b6cc34e788746e589a9b07be767664a164c57b9b1"
-                        .try_into()
-                        .expect("pk"),
-                ],
-                references_to_public_key: vec![
-                    "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
-                        .try_into()
-                        .expect("pk"),
-                ],
-                ..Default::default()
-            })
-            .expect("valid")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(related_events.len(), 1);
+use super::*;
+use nostr_rs_types::types::{Addr, Event, Filter, Kind};
+use std::{
+    fs::File,
+    io::{BufRead, BufReader},
+};
+
+async fn setup_db<T>(db: &T)
+where
+    T: Storage,
+{
+    let events = include_str!("../tests/events.json")
+        .lines()
+        .map(|line| serde_json::from_str(&line).expect("valid"))
+        .collect::<Vec<Event>>();
+
+    for event in events {
+        assert!(db.store(&event).await.expect("valid"));
     }
-
-    pub fn filter_by_references_zero_match<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-
-        let related_events = db
-            .get_by_filter(Filter {
-                references_to_event: vec![
-                    "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
-                        .try_into()
-                        .expect("pk"),
-                ],
-                references_to_public_key: vec![
-                    "36ce9f55828b06f4f45c7f7292ae58362f4abe746938888f82e56fe6fb7ffb2c"
-                        .try_into()
-                        .expect("pk"),
-                ],
-                ..Default::default()
-            })
-            .expect("valid")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(related_events.len(), 0);
-    }
-
-    pub fn filter_by_references_and_kind<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-
-        let related_events = db
-            .get_by_filter(Filter {
-                kinds: vec![Kind::Reaction, Kind::ShortTextNote],
-                references_to_event: vec![
-                    "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
-                        .try_into()
-                        .expect("pk"),
-                ],
-                ..Default::default()
-            })
-            .expect("valid")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(related_events.len(), 3);
-    }
-
-    pub fn get_event_and_related_events<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-
-        let id: Addr = "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
-            .try_into()
-            .expect("pk");
-
-        let events = db
-            .get_by_filter(Filter {
-                ids: vec![id.clone()],
-                ..Default::default()
-            })
-            .expect("events")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-
-        assert_eq!(events.len(), 1);
-
-        let related_events = db
-            .get_by_filter(Filter {
-                references_to_event: vec![id],
-                ..Default::default()
-            })
-            .expect("valid")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(related_events.len(), 2_538);
-
-        let mut kinds = related_events.iter().map(|x| x.kind()).collect::<Vec<_>>();
-        kinds.sort();
-        kinds.dedup();
-
-        assert_eq!(Kind::Reaction, kinds[0]);
-        assert_eq!(Kind::Unknown(42), kinds[1]);
-    }
-
-    pub fn filter_by_authors<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-        let query = Filter {
-            authors: vec![
-                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+}
+
+pub async fn store_and_get<T>(db: &T)
+where
+    T: Storage,
+{
+    let json = "{\"content\":\"{\\\"lud06\\\":\\\"lnbc1p3a4wxvpp5x0pa6gr55fq5s9d3dxs0vz77mqxgdw63hhtgtlfz5zvm65847vnqdqqcqpjsp5402c8rtqxd4j97rnvuejuwl4sg473g6wg08d67fvn7qc4gtpkfks9q7sqqqqqqqqqqqqqqqqqqqsqqqqqysgqmqz9gxqyjw5qrzjqwryaup9lh50kkranzgcdnn2fgvx390wgj5jd07rwr3vxeje0glclleasn65surjcsqqqqlgqqqqqeqqjqyxj968tem9ps6ttm9ukv6ag4yc6qmgj2svrccfgp4n83fpktr3dsx6fq7grfzlqt982aaemahg9q29vzl9f627kh4j8h8xc2z2mtpdqqjlekah\\\",\\\"website\\\":\\\"\\\",\\\"nip05\\\":\\\"cesar@cesar.com.py\\\",\\\"picture\\\":\\\"https://pbs.twimg.com/profile_images/1175432935337537536/_Peu9vuJ_400x400.jpg\\\",\\\"display_name\\\":\\\"C\\\",\\\"about\\\":\\\"Rust and PHP\\\",\\\"name\\\":\\\"c\\\"}\",\"created_at\":1678476588,\"id\":\"3800c787a23288641c0b96cbcc87c26cbd3ea7bee53b7748422fdb100fb7b9f0\",\"kind\":0,\"pubkey\":\"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78\",\"sig\":\"c8a12ce96833e4cd67bce0e9e50f831262ef0f0c0cff5e56c38a0c90867ed1a6621e9692948ef5e85a7ca3726c3f0f43fa7e1992536bc457317123bca8784f5f\",\"tags\":[]}";
+
+    let event: Event = serde_json::from_str(json).expect("valid");
+    assert_eq!(true, db.store(&event).await.expect("valid"));
+    assert_eq!(false, db.store(&event).await.expect("valid"));
+
+    let event1 = db.get_event(&event.id).await.expect("something");
+    assert_eq!(event1, Some(event));
+}
+
+pub async fn records_are_sorted_by_date_desc<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+
+    let pk: Addr = "7bdef7be22dd8e59f4600e044aa53a1cf975a9dc7d27df5833bc77db784a5805"
+        .try_into()
+        .expect("pk");
+
+    let vec = db
+        .get_by_filter(Filter {
+            authors: vec![pk],
+            limit: 10,
+            ..Default::default()
+        })
+        .await
+        .expect("set of results")
+        .collect::<Result<Vec<_>, _>>()
+        .expect("valid");
+
+    let dates = vec.iter().map(|e| e.created_at()).collect::<Vec<_>>();
+    let mut sorted_dates = dates.clone();
+    sorted_dates.sort_by(|a, b| b.cmp(a));
+
+    assert_eq!(vec.len(), 10);
+    assert_eq!(dates, sorted_dates);
+}
+
+pub async fn filter_by_references<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+
+    let related_events = db
+        .get_by_filter(Filter {
+            references_to_event: vec![
+                "f513f1422ee5dbf30f57118b6cc34e788746e589a9b07be767664a164c57b9b1"
                     .try_into()
                     .expect("pk"),
-                "38fb689f2fb92d932d457b1ea56715292bdf2140b2c7d282e8b8e8d644483ad6"
+            ],
+            references_to_public_key: vec![
+                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
                     .try_into()
                     .expect("pk"),
             ],
             ..Default::default()
-        };
-        let records = db
-            .get_by_filter(query)
-            .expect("valid")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(records.len(), 27);
-    }
-
-    pub fn filter_by_author<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-        let query = Filter {
-            authors: vec![
-                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+        })
+        .await
+        .expect("valid")
+        .collect::<Result<Vec<_>, _>>()
+        .expect("valid");
+    assert_eq!(related_events.len(), 1);
+}
+
+pub async fn filter_by_references_zero_match<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db);
+
+    let related_events = db
+        .get_by_filter(Filter {
+            references_to_event: vec![
+                "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
+                    .try_into()
+                    .expect("pk"),
+            ],
+            references_to_public_key: vec![
+                "36ce9f55828b06f4f45c7f7292ae58362f4abe746938888f82e56fe6fb7ffb2c"
                     .try_into()
                     .expect("pk"),
             ],
             ..Default::default()
-        };
-        let records = db
-            .get_by_filter(query)
-            .expect("valid")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(records.len(), 3);
-    }
-
-    pub fn filter_by_author_and_kinds<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-        let query = Filter {
-            authors: vec![
-                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+        })
+        .await
+        .expect("valid")
+        .collect::<Result<Vec<_>, _>>()
+        .expect("valid");
+    assert_eq!(related_events.len(), 0);
+}
+
+pub async fn filter_by_references_and_kind<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+
+    let related_events = db
+        .get_by_filter(Filter {
+            kinds: vec![Kind::Reaction, Kind::ShortTextNote],
+            references_to_event: vec![
+                "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
                     .try_into()
                     .expect("pk"),
             ],
-            kinds: vec![Kind::ShortTextNote, Kind::Reaction],
             ..Default::default()
-        };
-        let records = db
-            .get_by_filter(query)
-            .expect("iterator")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(records.len(), 2);
-    }
+        })
+        .await
+        .expect("valid")
+        .collect::<Result<Vec<_>, _>>()
+        .expect("valid");
+    assert_eq!(related_events.len(), 3);
+}
+
+pub async fn get_event_and_related_events<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db);
+
+    let id: Addr = "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
+        .try_into()
+        .expect("pk");
+
+    let events = db
+        .get_by_filter(Filter {
+            ids: vec![id.clone()],
+            ..Default::default()
+        })
+        .await
+        .expect("events")
+        .collect::<Result<Vec<_>, _>>()
+        .expect("valid");
+
+    assert_eq!(events.len(), 1);
 
-    pub fn filter_by_kind<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-        let query = Filter {
-            kinds: vec![Kind::ShortTextNote],
+    let related_events = db
+        .get_by_filter(Filter {
+            references_to_event: vec![id],
+            ..Default::default()
+        })
+        .await
+        .expect("valid")
+        .collect::<Result<Vec<_>, _>>()
+        .expect("valid");
+    assert_eq!(related_events.len(), 2_538);
+
+    let mut kinds = related_events.iter().map(|x| x.kind()).collect::<Vec<_>>();
+    kinds.sort();
+    kinds.dedup();
+
+    assert_eq!(Kind::Reaction, kinds[0]);
+    assert_eq!(Kind::Unknown(42), kinds[1]);
+}
+
+pub async fn filter_by_authors<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+    let query = Filter {
+        authors: vec![
+            "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+                .try_into()
+                .expect("pk"),
+            "38fb689f2fb92d932d457b1ea56715292bdf2140b2c7d282e8b8e8d644483ad6"
+                .try_into()
+                .expect("pk"),
+        ],
+        ..Default::default()
+    };
+    let records = db
+        .get_by_filter(query)
+        .await
+        .expect("valid")
+        .collect::<Result<Vec<_>, _>>()
+        .expect("valid");
+    assert_eq!(records.len(), 27);
+}
+
+pub async fn filter_by_author<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+    let query = Filter {
+        authors: vec![
+            "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+                .try_into()
+                .expect("pk"),
+        ],
+        ..Default::default()
+    };
+    let records = db
+        .get_by_filter(query)
+        .await
+        .expect("valid")
+        .collect::<Result<Vec<_>, _>>()
+        .expect("valid");
+    assert_eq!(records.len(), 3);
+}
+
+pub async fn filter_by_author_and_kinds<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+    let query = Filter {
+        authors: vec![
+            "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+                .try_into()
+                .expect("pk"),
+        ],
+        kinds: vec![Kind::ShortTextNote, Kind::Reaction],
+        ..Default::default()
+    };
+    let records = db
+        .get_by_filter(query)
+        .await
+        .expect("iterator")
+        .collect::<Result<Vec<_>, _>>()
+        .expect("valid");
+    assert_eq!(records.len(), 2);
+}
+
+pub async fn filter_by_kind<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+    let query = Filter {
+        kinds: vec![Kind::ShortTextNote],
+        ..Default::default()
+    };
+    let records = db
+        .get_by_filter(query)
+        .await
+        .expect("valid")
+        .collect::<Result<Vec<_>, _>>()
+        .expect("valid");
+    assert_eq!(records.len(), 1_511);
+    records
+        .iter()
+        .map(|x| x.kind())
+        .for_each(|x| assert_eq!(x, Kind::ShortTextNote));
+}
+
+pub async fn get_local_events<T>(db: &T)
+where
+    T: Storage,
+{
+    setup_db(db).await;
+
+    let ids = db
+        .get_by_filter(Filter {
+            limit: 10,
             ..Default::default()
-        };
-        let records = db
-            .get_by_filter(query)
-            .expect("valid")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(records.len(), 1_511);
-        records
-            .iter()
-            .map(|x| x.kind())
-            .for_each(|x| assert_eq!(x, Kind::ShortTextNote));
+        })
+        .await
+        .expect("valid")
+        .collect::<Result<Vec<_>, _>>()
+        .expect("valid");
+
+    for event in ids.iter() {
+        db.set_local_event(event).await.expect("valid");
     }
 
-    pub fn get_local_events<T>(db: &T)
-    where
-        T: Storage,
-    {
-        setup_db(db);
-
-        let ids = db
-            .get_by_filter(Filter {
-                limit: 10,
-                ..Default::default()
-            })
-            .expect("valid")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-
-        let x = ids
-            .iter()
-            .map(|event| db.set_local_event(event))
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-
-        assert_eq!(10, ids.len());
-        assert_eq!(10, x.len());
-
-        let records = db
-            .get_local_events(None)
-            .expect("valid iterator")
-            .collect::<Result<Vec<_>, _>>()
-            .expect("valid");
-        assert_eq!(x.len(), records.len())
-    }
+    assert_eq!(10, ids.len());
+
+    let records = db
+        .get_local_events(None)
+        .await
+        .expect("valid iterator")
+        .collect::<Result<Vec<_>, _>>()
+        .expect("valid");
+    assert_eq!(10, records.len())
+}

+ 1 - 0
crates/storage/rocksdb/Cargo.toml

@@ -15,6 +15,7 @@ rocksdb = { version = "0.20.1", features = [
 ] }
 chrono = "0.4.26"
 serde_json = "1.0"
+async-trait = "0.1.81"
 
 [dev-dependencies]
 nostr-rs-storage-base = { path = "../base", features = ["test"] }

+ 1 - 1
crates/storage/rocksdb/src/iterator.rs

@@ -63,7 +63,7 @@ impl<'a> Iterator for WrapperIterator<'a> {
                 // primary index to prefetch events that may satisfy the query
                 loop {
                     let prefix = self.prefixes.pop_front()?;
-                    if let Ok(Some(event)) = self.db.get_event(prefix) {
+                    if let Ok(Some(event)) = self.db.get_event(prefix).await {
                         if let Some(filter) = &self.filter {
                             if filter.check_event(&event) {
                                 self.returned += 1;

+ 17 - 12
crates/storage/rocksdb/src/lib.rs

@@ -1,5 +1,5 @@
 //! Rocks DB implementation of the storage layer
-use crate::{secondary_index::SecondaryIndex, iterator::WrapperIterator};
+use crate::{iterator::WrapperIterator, secondary_index::SecondaryIndex};
 use nostr_rs_storage_base::{Error, Storage};
 use nostr_rs_types::types::{Event, Filter, Tag};
 use rocksdb::{
@@ -89,9 +89,10 @@ impl RocksDb {
     }
 }
 
+#[async_trait::async_trait]
 impl Storage for RocksDb {
     type Iterator<'a> = WrapperIterator<'a>;
-    fn get_local_events(&self, limit: Option<usize>) -> Result<WrapperIterator<'_>, Error> {
+    async fn get_local_events(&self, limit: Option<usize>) -> Result<WrapperIterator<'_>, Error> {
         let cf_handle = self.reference_to_cf_handle(ReferenceType::LocalEvents)?;
         Ok(WrapperIterator {
             db: self,
@@ -105,18 +106,20 @@ impl Storage for RocksDb {
         })
     }
 
-    fn set_local_event(&self, event: &Event) -> Result<(), Error> {
+    async fn set_local_event(&self, event: &Event) -> Result<(), Error> {
         let event_id = &event.id;
         let secondary_index = SecondaryIndex::new(event_id, event.created_at());
-        self.db.put_cf(
-            &self.reference_to_cf_handle(ReferenceType::LocalEvents)?,
-            secondary_index.index_by([]),
-            event_id.deref(),
-        ).map_err(|e| Error::Unknown(e.to_string()))?;
+        self.db
+            .put_cf(
+                &self.reference_to_cf_handle(ReferenceType::LocalEvents)?,
+                secondary_index.index_by([]),
+                event_id.deref(),
+            )
+            .map_err(|e| Error::Unknown(e.to_string()))?;
         Ok(())
     }
 
-    fn store(&self, event: &Event) -> Result<bool, Error> {
+    async fn store(&self, event: &Event) -> Result<bool, Error> {
         let event_id = &event.id;
 
         if let Ok(Some(_)) = self.db.get_cf(
@@ -191,12 +194,14 @@ impl Storage for RocksDb {
             }
         }
 
-        self.db.write(buffer).map_err(|e| Error::Unknown(e.to_string()))?;
+        self.db
+            .write(buffer)
+            .map_err(|e| Error::Unknown(e.to_string()))?;
 
         Ok(true)
     }
 
-    fn get_event<T: AsRef<[u8]>>(&self, id: T) -> Result<Option<Event>, Error> {
+    async fn get_event<T: AsRef<[u8]> + Send + Sync>(&self, id: T) -> Result<Option<Event>, Error> {
         Ok(self
             .db
             .get_cf(&self.reference_to_cf_handle(ReferenceType::Events)?, id)
@@ -205,7 +210,7 @@ impl Storage for RocksDb {
             .transpose()?)
     }
 
-    fn get_by_filter(&self, mut query: Filter) -> Result<WrapperIterator<'_>, Error> {
+    async fn get_by_filter(&self, mut query: Filter) -> Result<WrapperIterator<'_>, Error> {
         let limit = if query.limit == 0 {
             None
         } else {

+ 18 - 8
src/main.rs

@@ -70,14 +70,24 @@ async fn main() {
             clients.connect_to(relayer_url.clone())
         });
 
-    let initial_subscription = Filter {
-        authors: config
-            .account
-            .iter()
-            .map(|x| x.id.clone())
-            .collect::<Vec<_>>(),
-        ..Default::default()
-    };
+    let initial_subscription = vec![
+        Filter {
+            authors: config
+                .account
+                .iter()
+                .map(|x| x.id.clone())
+                .collect::<Vec<_>>(),
+            ..Default::default()
+        },
+        Filter {
+            references_to_public_key: config
+                .account
+                .iter()
+                .map(|x| x.id.clone())
+                .collect::<Vec<_>>(),
+            ..Default::default()
+        },
+    ];
 
     let _ = client_pool.subscribe(initial_subscription.into()).await;
     loop {