Преглед изворни кода

Working on a better storage layer

Cesar Rodas пре 1 година
родитељ
комит
b90bdd6c5e
5 измењених фајлова са 631 додато и 461 уклоњено
  1. 34 24
      crates/relayer/src/relayer.rs
  2. 266 1
      crates/storage/src/lib.rs
  3. 128 436
      crates/storage/src/rocksdb.rs
  4. 39 0
      crates/storage/src/storage.rs
  5. 164 0
      crates/storage/src/util.rs

+ 34 - 24
crates/relayer/src/relayer.rs

@@ -1,5 +1,5 @@
 use crate::{Connection, Error, Subscription};
-use nostr_rs_storage::RocksDb;
+use nostr_rs_storage::Storage;
 use nostr_rs_types::{
     relayer,
     types::{Event, SubscriptionId},
@@ -18,8 +18,12 @@ type SubId = u128;
 
 type Subscriptions = HashMap<SubId, (SubscriptionId, Sender<Response>)>;
 
-pub struct Relayer {
-    storage: RocksDb,
+pub struct Relayer<T: Storage> {
+    /// Storage engine, if provided the services are going to persisted in disk,
+    /// otherwise all the messages are going to be ephimeral, making this
+    /// relayer just a dumb proxy (that can be useful for privacy) but it won't
+    /// be able to perform any optimization like prefetching content while offline
+    storage: Option<T>,
     /// Keeps a map between the internal subscription ID and the subscription
     /// type. One subscription ID may have multiple subscription types.
     ///
@@ -38,8 +42,8 @@ pub struct Relayer {
     sender: Sender<(u128, Request)>,
 }
 
-impl Relayer {
-    pub fn new(storage: RocksDb) -> (Arc<Self>, Receiver<(u128, Request)>) {
+impl<T: Storage> Relayer<T> {
+    pub fn new(storage: Option<T>) -> (Arc<Self>, Receiver<(u128, Request)>) {
         let (sender, receiver) = channel(100_000);
         (
             Arc::new(Self {
@@ -54,7 +58,7 @@ impl Relayer {
     }
 
     /// Returns a reference to the internal database
-    pub fn get_db(&self) -> &RocksDb {
+    pub fn get_db(&self) -> &Option<T> {
         &self.storage
     }
 
@@ -115,20 +119,22 @@ impl Relayer {
                 drop(subscriptions);
                 drop(sub_index);
 
-                // Sent all events that match the filter that are stored in our database
-                for filter in request.filters.clone().into_iter() {
-                    self.storage
-                        .get_by_filter(filter)?
-                        .into_iter()
-                        .for_each(|event| {
-                            let _ = connection.send(
-                                relayer::Event {
-                                    subscription_id: request.subscription_id.clone(),
-                                    event,
-                                }
-                                .into(),
-                            );
-                        });
+                if let Some(storage) = self.storage.as_ref() {
+                    // Sent all events that match the filter that are stored in our database
+                    for filter in request.filters.clone().into_iter() {
+                        storage
+                            .get_by_filter(filter)?
+                            .into_iter()
+                            .for_each(|event| {
+                                let _ = connection.send(
+                                    relayer::Event {
+                                        subscription_id: request.subscription_id.clone(),
+                                        event,
+                                    }
+                                    .into(),
+                                );
+                            });
+                    }
                 }
 
                 let _ = connection
@@ -193,7 +199,9 @@ impl Relayer {
 
     #[inline]
     pub fn store_and_broadcast_local_event(&self, event: &Event) {
-        let _ = self.storage.store_local_event(event);
+        if let Some(storage) = self.storage.as_ref() {
+            let _ = storage.store_local_event(event);
+        }
         let subscriptions = self.subscriptions.read();
 
         for subscription_type in Subscription::from_event(event) {
@@ -205,7 +213,9 @@ impl Relayer {
 
     #[inline]
     pub fn store_and_broadcast(&self, event: &Event) {
-        let _ = self.storage.store(event);
+        if let Some(storage) = self.storage.as_ref() {
+            let _ = storage.store(event);
+        }
         let subscriptions = self.subscriptions.read();
 
         for subscription_type in Subscription::from_event(event) {
@@ -246,7 +256,7 @@ mod test {
     #[tokio::test]
     async fn serve_listener_from_local_db() {
         let request: Request = serde_json::from_str("[\"REQ\",\"1298169700973717\",{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"since\":1681939304},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[1,3,6,7,9735],\"since\":1681939304},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"#e\":[\"2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a\",\"a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1\"],\"kinds\":[1,6,7,9735]}]").expect("valid object");
-        let (relayer, _) = Relayer::new(get_db(true));
+        let (relayer, _) = Relayer::new(Some(get_db(true)));
         let (connection, mut recv) = Connection::new_for_test();
         let _ = relayer.recv_request_from_client(&connection, request);
         // ev1
@@ -295,7 +305,7 @@ mod test {
     #[tokio::test]
     async fn server_listener_real_time() {
         let request: Request = serde_json::from_str("[\"REQ\",\"1298169700973717\",{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"since\":1681939304},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[1,3,6,7,9735],\"since\":1681939304},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"#e\":[\"2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a\",\"a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1\"],\"kinds\":[1,6,7,9735]}]").expect("valid object");
-        let (relayer, _) = Relayer::new(get_db(false));
+        let (relayer, _) = Relayer::new(Some(get_db(false)));
         let (connection, mut recv) = Connection::new_for_test();
         let _ = relayer.recv_request_from_client(&connection, request);
         // eod

+ 266 - 1
crates/storage/src/lib.rs

@@ -9,5 +9,270 @@
 #![deny(missing_docs, warnings)]
 mod error;
 mod rocksdb;
+mod storage;
+mod util;
 
-pub use crate::{error::Error, rocksdb::RocksDb};
+pub use crate::{error::Error, rocksdb::RocksDb, storage::Storage};
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use nostr_rs_types::types::{Addr, Event, Filter, Kind};
+    use std::{
+        fs::File,
+        io::{BufRead, BufReader},
+    };
+
+    fn setup_db<T: Storage>(db: &T) {
+        let file = File::open("./tests/events.json").expect("file");
+        let events = BufReader::new(file)
+            .lines()
+            .map(|line| serde_json::from_str(&line.expect("line")).expect("valid"))
+            .collect::<Vec<Event>>();
+
+        for event in events {
+            assert!(db.store(&event).expect("valid"));
+        }
+    }
+
+    pub fn store_and_get<T: Storage>(db: &T) {
+        let json = "{\"content\":\"{\\\"lud06\\\":\\\"lnbc1p3a4wxvpp5x0pa6gr55fq5s9d3dxs0vz77mqxgdw63hhtgtlfz5zvm65847vnqdqqcqpjsp5402c8rtqxd4j97rnvuejuwl4sg473g6wg08d67fvn7qc4gtpkfks9q7sqqqqqqqqqqqqqqqqqqqsqqqqqysgqmqz9gxqyjw5qrzjqwryaup9lh50kkranzgcdnn2fgvx390wgj5jd07rwr3vxeje0glclleasn65surjcsqqqqlgqqqqqeqqjqyxj968tem9ps6ttm9ukv6ag4yc6qmgj2svrccfgp4n83fpktr3dsx6fq7grfzlqt982aaemahg9q29vzl9f627kh4j8h8xc2z2mtpdqqjlekah\\\",\\\"website\\\":\\\"\\\",\\\"nip05\\\":\\\"cesar@cesar.com.py\\\",\\\"picture\\\":\\\"https://pbs.twimg.com/profile_images/1175432935337537536/_Peu9vuJ_400x400.jpg\\\",\\\"display_name\\\":\\\"C\\\",\\\"about\\\":\\\"Rust and PHP\\\",\\\"name\\\":\\\"c\\\"}\",\"created_at\":1678476588,\"id\":\"3800c787a23288641c0b96cbcc87c26cbd3ea7bee53b7748422fdb100fb7b9f0\",\"kind\":0,\"pubkey\":\"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78\",\"sig\":\"c8a12ce96833e4cd67bce0e9e50f831262ef0f0c0cff5e56c38a0c90867ed1a6621e9692948ef5e85a7ca3726c3f0f43fa7e1992536bc457317123bca8784f5f\",\"tags\":[]}";
+
+        let event: Event = serde_json::from_str(json).expect("valid");
+        assert_eq!(true, db.store(&event).expect("valid"));
+        assert_eq!(false, db.store(&event).expect("valid"));
+
+        let event1 = db.get_event(&event.id).expect("something");
+        assert_eq!(event1, Some(event));
+    }
+
+    pub fn records_are_sorted_by_date_desc<T: Storage>(db: &T) {
+        setup_db(db);
+
+        let pk: Addr = "7bdef7be22dd8e59f4600e044aa53a1cf975a9dc7d27df5833bc77db784a5805"
+            .try_into()
+            .expect("pk");
+
+        let mut vec = vec![];
+        db.get_by_filter(
+            Filter {
+                authors: vec![pk],
+                limit: 10,
+                ..Default::default()
+            },
+            |event| {
+                vec.push(event);
+                Ok(())
+            },
+        )
+        .expect("set of results");
+
+        let dates = vec.iter().map(|e| e.created_at()).collect::<Vec<_>>();
+        let mut sorted_dates = dates.clone();
+        sorted_dates.sort_by(|a, b| b.cmp(a));
+
+        assert_eq!(vec.len(), 10);
+        assert_eq!(dates, sorted_dates);
+    }
+
+    pub fn filter_by_references<T: Storage>(db: &T) {
+        setup_db(db);
+
+        let mut related_events = vec![];
+        db.get_by_filter(
+            Filter {
+                references_to_event: vec![
+                    "f513f1422ee5dbf30f57118b6cc34e788746e589a9b07be767664a164c57b9b1"
+                        .try_into()
+                        .expect("pk"),
+                ],
+                references_to_public_key: vec![
+                    "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+                        .try_into()
+                        .expect("pk"),
+                ],
+                ..Default::default()
+            },
+            |event| {
+                related_events.push(event);
+                Ok(())
+            },
+        )
+        .expect("events");
+        assert_eq!(related_events.len(), 1);
+    }
+
+    pub fn filter_by_references_zero_match<T: Storage>(db: &T) {
+        setup_db(db);
+
+        let mut related_events = vec![];
+        db.get_by_filter(
+            Filter {
+                references_to_event: vec![
+                    "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
+                        .try_into()
+                        .expect("pk"),
+                ],
+                references_to_public_key: vec![
+                    "36ce9f55828b06f4f45c7f7292ae58362f4abe746938888f82e56fe6fb7ffb2c"
+                        .try_into()
+                        .expect("pk"),
+                ],
+                ..Default::default()
+            },
+            |event| {
+                related_events.push(event);
+                Ok(())
+            },
+        )
+        .expect("events");
+        assert_eq!(related_events.len(), 0);
+    }
+
+    pub fn filter_by_references_and_kind<T: Storage>(db: &T) {
+        setup_db(db);
+
+        let mut related_events = vec![];
+        db.get_by_filter(
+            Filter {
+                kinds: vec![Kind::Reaction, Kind::ShortTextNote],
+                references_to_event: vec![
+                    "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
+                        .try_into()
+                        .expect("pk"),
+                ],
+                ..Default::default()
+            },
+            |event| {
+                related_events.push(event);
+                Ok(())
+            },
+        )
+        .expect("events");
+        assert_eq!(related_events.len(), 3);
+    }
+
+    pub fn get_event_and_related_events<T: Storage>(db: &T) {
+        setup_db(db);
+
+        let id: Addr = "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
+            .try_into()
+            .expect("pk");
+
+        let mut events = vec![];
+        db.get_by_filter(
+            Filter {
+                ids: vec![id.clone()],
+                ..Default::default()
+            },
+            |event| {
+                events.push(event);
+                Ok(())
+            },
+        )
+        .expect("events");
+
+        assert_eq!(events.len(), 1);
+
+        let mut related_events = vec![];
+        db.get_by_filter(
+            Filter {
+                references_to_event: vec![id],
+                ..Default::default()
+            },
+            |event| {
+                related_events.push(event);
+                Ok(())
+            },
+        )
+        .expect("events");
+        assert_eq!(related_events.len(), 2_538);
+
+        let mut kinds = related_events.iter().map(|x| x.kind()).collect::<Vec<_>>();
+        kinds.sort();
+        kinds.dedup();
+
+        assert_eq!(Kind::Reaction, kinds[0]);
+        assert_eq!(Kind::Unknown(42), kinds[1]);
+    }
+
+    pub fn filter_by_authors<T: Storage>(db: &T) {
+        setup_db(db);
+        let query = Filter {
+            authors: vec![
+                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+                    .try_into()
+                    .expect("pk"),
+                "38fb689f2fb92d932d457b1ea56715292bdf2140b2c7d282e8b8e8d644483ad6"
+                    .try_into()
+                    .expect("pk"),
+            ],
+            ..Default::default()
+        };
+        let mut records = vec![];
+        db.get_by_filter(query, |event| {
+            records.push(event);
+            Ok(())
+        })
+        .expect("records");
+        assert_eq!(records.len(), 27);
+    }
+
+    pub fn filter_by_author<T: Storage>(db: &T) {
+        setup_db(db);
+        let query = Filter {
+            authors: vec![
+                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+                    .try_into()
+                    .expect("pk"),
+            ],
+            ..Default::default()
+        };
+        let mut records = vec![];
+        db.get_by_filter(query, |event| {
+            records.push(event);
+            Ok(())
+        })
+        .expect("records");
+        assert_eq!(records.len(), 3);
+    }
+
+    pub fn filter_by_author_and_kinds<T: Storage>(db: &T) {
+        setup_db(db);
+        let query = Filter {
+            authors: vec![
+                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+                    .try_into()
+                    .expect("pk"),
+            ],
+            kinds: vec![Kind::ShortTextNote, Kind::Reaction],
+            ..Default::default()
+        };
+        let mut records = vec![];
+        db.get_by_filter(query, |event| {
+            records.push(event);
+            Ok(())
+        })
+        .expect("records");
+        assert_eq!(records.len(), 2);
+    }
+
+    pub fn filter_kind<T: Storage>(db: &T) {
+        setup_db(db);
+        let query = Filter {
+            kinds: vec![Kind::ShortTextNote],
+            ..Default::default()
+        };
+        let mut records = vec![];
+        db.get_by_filter(query, |event| {
+            records.push(event);
+            Ok(())
+        })
+        .expect("records");
+        assert_eq!(records.len(), 1_511);
+        records
+            .iter()
+            .map(|x| x.kind())
+            .for_each(|x| assert_eq!(x, Kind::ShortTextNote));
+    }
+}

+ 128 - 436
crates/storage/src/rocksdb.rs

@@ -1,7 +1,9 @@
 //! Rocks DB implementation of the storage layer
-use crate::Error;
+use crate::{
+    util::{load_events_and_filter, SecondaryIndex},
+    Error, Storage,
+};
 use nostr_rs_types::types::{Event, Filter, Tag};
-use rand::Rng;
 use rocksdb::{
     BoundColumnFamily, ColumnFamilyDescriptor, IteratorMode, Options, SliceTransform, WriteBatch,
     DB,
@@ -37,20 +39,6 @@ pub struct RocksDb {
     db: DB,
 }
 
-// Get current nanoseconds and use the last 3 digits as a random number (because
-// sometimes it comes as 0)
-fn nanoseconds_with_random() -> u128 {
-    let mut rng = rand::thread_rng();
-    let random_number = rng.gen_range(0..999);
-
-    let ts = std::time::SystemTime::now()
-        .duration_since(std::time::UNIX_EPOCH)
-        .expect("time")
-        .as_nanos();
-
-    ts.checked_add(random_number).unwrap_or(ts)
-}
-
 impl RocksDb {
     /// Creates a new instance passing rocksDb options and a path
     pub fn with_custom_db<T: AsRef<Path>>(options: Options, path: T) -> Result<Self, Error> {
@@ -83,29 +71,6 @@ impl RocksDb {
         Self::with_custom_db(options, path)
     }
 
-    /// Takes a vector of bytes and appends the current time in nanoseconds to
-    /// it, to make the ID unique enough and to make it sortable by time.
-    #[inline]
-    fn generate_unique_id<T: AsRef<[u8]>>(id: T, order_by: u64) -> Vec<u8> {
-        let mut new_id = id.as_ref().to_vec();
-
-        // We need to reverse the number, to sort it. Otherwise, the first is
-        // going to be the oldest entry, instead of the newest.
-        let reversed_order_by = u64::MAX
-            .checked_sub(order_by)
-            .unwrap_or_default()
-            .to_le_bytes();
-        new_id.extend_from_slice(&reversed_order_by);
-
-        let now = u128::MAX
-            .checked_sub(nanoseconds_with_random())
-            .unwrap_or_default()
-            .to_le_bytes();
-
-        new_id.extend_from_slice(&now);
-        new_id
-    }
-
     #[inline]
     fn reference_to_cf_handle(
         &self,
@@ -116,48 +81,103 @@ impl RocksDb {
             .ok_or(Error::InvalidColumnFamily)
     }
 
-    /// Stores an event, similar to store(Event), but keeps track of this event in a
-    /// local index. This is useful to keep track of the events that are created by
-    /// this node, to be broadcasted to new nodes in the future
-    pub fn store_local_event(&self, event: &Event) -> Result<bool, Error> {
-        let ret = self.store(event)?;
+    /// Instead of loading the whole event, only the event id is loaded. This is
+    /// done for performance, to avoid loading the same event multiple times,
+    /// also to only load from the database the events that are needed
+    /// (according to limits and other constraints)
+    fn get_event_ids_by_ns_and_prefix(
+        &self,
+        cf_handle: &Arc<BoundColumnFamily>,
+        prefix: &[u8],
+        limit: Option<usize>,
+    ) -> Result<Vec<Vec<u8>>, Error> {
+        let mut items = vec![];
+        let limit = limit.unwrap_or(usize::MAX);
 
-        self.db.put_cf(
-            &self.reference_to_cf_handle(ReferenceType::LocalEvents)?,
-            *(event.id),
-            &[],
-        )?;
+        for item in self.db.prefix_iterator_cf(cf_handle, prefix) {
+            let (key, value) = match item {
+                Ok((k, v)) => (k, v),
+                Err(e) => return Err(Error::RocksDb(e)),
+            };
+
+            if !key.starts_with(prefix) {
+                break;
+            }
+
+            items.push(value.to_vec());
+            if items.len() >= limit {
+                break;
+            }
+        }
+
+        Ok(items)
+    }
+
+    /// Loads all events that references a given event. The reference-type is provided and it can be any value of `ReferenceType`.
+    fn get_event_referenced_as<T: AsRef<[u8]>>(
+        &self,
+        cf_handle: &Arc<BoundColumnFamily>,
+        id: T,
+        limit: Option<usize>,
+    ) -> Result<HashSet<Vec<u8>>, Error> {
+        let mut items = HashSet::new();
+        let limit = limit.unwrap_or(usize::MAX);
+        let prefix = id.as_ref();
 
-        Ok(ret)
+        for item in self.db.prefix_iterator_cf(cf_handle, prefix) {
+            let (key, value) = match item {
+                Ok((k, v)) => (k, v),
+                Err(e) => return Err(Error::RocksDb(e)),
+            };
+
+            if !key.starts_with(prefix) {
+                break;
+            }
+
+            items.insert(value.to_vec());
+            if limit == items.len() {
+                break;
+            }
+        }
+
+        Ok(items)
     }
+}
 
-    /// Return a vector of all local events
-    pub fn get_local_events(&self) -> Result<Vec<Event>, Error> {
+impl Storage for RocksDb {
+    fn get_local_events<C>(&self, for_each: C) -> Result<(), Error>
+    where
+        C: Fn(Event) -> Result<(), Error>,
+    {
         let cf_handle = self.reference_to_cf_handle(ReferenceType::LocalEvents)?;
-        Ok(self
-            .db
+        self.db
             .iterator_cf(&cf_handle, IteratorMode::Start)
             .into_iter()
             .map(|res| {
                 if let Ok((key, _)) = res {
                     if let Ok(Some(event)) = self.get_event(&key) {
-                        Some(event)
+                        for_each(event)
                     } else {
-                        None
+                        Ok(())
                     }
                 } else {
-                    None
+                    Ok(())
                 }
             })
-            .collect::<Vec<Option<Event>>>()
-            .into_iter()
-            .filter_map(|x| x)
-            .collect())
+            .collect::<Result<Vec<_>, Error>>()?;
+        Ok(())
     }
 
-    /// Stores a new event into the database. This action will also creates all
-    /// the needed indexes to find this event later by reference, author, kind or tag.
-    pub fn store(&self, event: &Event) -> Result<bool, Error> {
+    fn set_local_event(&self, event: &Event) -> Result<(), Error> {
+        self.db.put_cf(
+            &self.reference_to_cf_handle(ReferenceType::LocalEvents)?,
+            *(event.id),
+            &[],
+        )?;
+        Ok(())
+    }
+
+    fn store(&self, event: &Event) -> Result<bool, Error> {
         let event_id = event.id.clone();
 
         if let Ok(Some(_)) = self.db.get_cf(
@@ -166,11 +186,11 @@ impl RocksDb {
         ) {
             return Ok(false);
         }
-        let created_at = event.created_at().timestamp().try_into()?;
-        let author_id = Self::generate_unique_id(event.author(), created_at);
+        let secondary_index = SecondaryIndex::new(&event_id, event.created_at());
+        let author_id = secondary_index.new_id(event.author());
         let json = serde_json::to_vec(event)?;
         let kind: u32 = event.kind().into();
-        let kind_id = Self::generate_unique_id(kind.to_le_bytes(), created_at);
+        let kind_id = secondary_index.new_id(&kind.to_be_bytes());
 
         let mut buffer = WriteBatch::default();
 
@@ -193,8 +213,8 @@ impl RocksDb {
         for tag in event.tags().iter() {
             match tag {
                 Tag::PubKey(p) => {
-                    let foreign_id = Self::generate_unique_id(&p.id, created_at);
-                    let local_id = Self::generate_unique_id(&event_id, created_at);
+                    let foreign_id = secondary_index.new_id(&p.id);
+                    let local_id = secondary_index.new_id(&event_id);
 
                     buffer.put_cf(
                         &self.reference_to_cf_handle(ReferenceType::RefPublicKey)?,
@@ -208,8 +228,8 @@ impl RocksDb {
                     );
                 }
                 Tag::Event(e) => {
-                    let foreign_id = Self::generate_unique_id(&e.id, created_at);
-                    let local_id = Self::generate_unique_id(&event_id, created_at);
+                    let foreign_id = secondary_index.new_id(&e.id);
+                    let local_id = secondary_index.new_id(&event_id);
 
                     buffer.put_cf(
                         &self.reference_to_cf_handle(ReferenceType::RefEvent)?,
@@ -231,8 +251,7 @@ impl RocksDb {
         Ok(true)
     }
 
-    /// Returns an event by its ID
-    pub fn get_event<T: AsRef<[u8]>>(&self, id: T) -> Result<Option<Event>, Error> {
+    fn get_event<T: AsRef<[u8]>>(&self, id: T) -> Result<Option<Event>, Error> {
         Ok(self
             .db
             .get_cf(&self.reference_to_cf_handle(ReferenceType::Events)?, id)?
@@ -240,13 +259,11 @@ impl RocksDb {
             .transpose()?)
     }
 
-    /// Get events from the database with a given filter
-    ///
-    /// The first step is to use one available index to get a list of event-IDs,
-    /// then call `load_and_filter_events` that will load the events from the
-    /// `Events` namespace and will filter them by the given parameters.
-    pub fn get_by_filter(&self, mut query: Filter) -> Result<Vec<Event>, Error> {
-        let mut event_ids = HashSet::new();
+    fn get_by_filter<C>(&self, mut query: Filter, for_each: C) -> Result<(), Error>
+    where
+        C: Fn(Event) -> Result<(), Error>,
+    {
+        let mut event_ids = vec![];
         let limit = if query.limit == 0 {
             None
         } else {
@@ -269,7 +286,7 @@ impl RocksDb {
             query
                 .ids
                 .iter()
-                .map(|id| event_ids.insert(id.as_ref().to_vec()))
+                .map(|id| event_ids.push(id.as_ref().to_vec()))
                 .for_each(drop);
             query.ids.clear();
         } else if !query.authors.is_empty() {
@@ -288,404 +305,79 @@ impl RocksDb {
                 let kind: u32 = (*kind).into();
                 event_ids.extend(self.get_event_ids_by_ns_and_prefix(
                     &ns,
-                    &kind.to_le_bytes(),
+                    &kind.to_be_bytes(),
                     limit,
                 )?);
             }
             query.kinds.clear();
         }
 
-        self.load_and_filter_events(query, event_ids)
-    }
-
-    /// Receives a list of event-IDs, and returns a list of events that match
-    /// the given filter.
-    ///
-    /// The list of event-IDs should come from an index from any namespace, and
-    /// the filter will load the events from the `Events` namespace and will
-    /// filter them by the given parameters.
-    fn load_and_filter_events(
-        &self,
-        mut query: Filter,
-        mut event_ids: HashSet<Vec<u8>>,
-    ) -> Result<Vec<Event>, Error> {
-        let mut events = vec![];
-
-        if !query.ids.is_empty() {
-            let mut ids = query.ids.iter().map(|id| id.as_ref()).collect::<Vec<_>>();
-            ids.sort();
-            event_ids.retain(|id| ids.binary_search(&id.as_slice()).is_ok());
-        }
-
-        let mut authors = query
-            .authors
-            .iter()
-            .map(|id| id.as_ref())
-            .collect::<Vec<_>>();
-
-        let mut tag_primary_keys = query
-            .references_to_public_key
-            .iter()
-            .map(|tag| tag.as_ref())
-            .collect::<HashSet<_>>()
-            .into_iter()
-            .collect::<Vec<_>>();
-
-        let mut tag_events = query
-            .references_to_event
-            .iter()
-            .map(|tag| tag.as_ref())
-            .collect::<HashSet<_>>()
-            .into_iter()
-            .collect::<Vec<_>>();
-
-        // Sort everything for a quick binary_search instead of a linear search
-        tag_events.sort();
-        tag_primary_keys.sort();
-        authors.sort();
-        query.kinds.sort();
-
-        for id in event_ids.iter() {
-            if let Some(event) = self.get_event(id)? {
-                if !tag_primary_keys.is_empty() || !tag_events.is_empty() && authors.is_empty() {
-                    let mut found = false;
-
-                    for tag in event.tags().iter() {
-                        match tag {
-                            Tag::Event(event) => {
-                                if tag_events.binary_search(&event.id.as_ref()).is_ok() {
-                                    found = true;
-                                }
-                            }
-                            Tag::PubKey(key) => {
-                                if tag_primary_keys.binary_search(&key.id.as_ref()).is_ok() {
-                                    found = true;
-                                }
-                            }
-                            _ => {}
-                        }
-
-                        if found {
-                            break;
-                        }
-                    }
-
-                    if !found {
-                        continue;
-                    }
-                }
-
-                if !authors.is_empty() && authors.binary_search(&event.author().as_ref()).is_err() {
-                    continue;
-                }
-                if !query.kinds.is_empty() && query.kinds.binary_search(&event.kind()).is_err() {
-                    continue;
-                }
-                if let Some(since) = query.since {
-                    if event.created_at() < since {
-                        continue;
-                    }
-                }
-                if let Some(until) = query.until {
-                    if event.created_at() > until {
-                        continue;
-                    }
-                }
-                events.push(event);
-                if events.len() as u64 == query.limit {
-                    break;
-                }
-            }
-        }
-
-        // It should be sorted until an internal HashSet can be created, one that
-        // respects the inserts time, because the database is sorted already by time
-        // desc
-        events.sort_by_key(|b| b.created_at());
-
-        Ok(events)
-    }
-
-    /// Instead of loading the whole event, only the event id is loaded. This is
-    /// done for performance, to avoid loading the same event multiple times,
-    /// also to only load from the database the events that are needed
-    /// (according to limits and other constraints)
-    fn get_event_ids_by_ns_and_prefix(
-        &self,
-        cf_handle: &Arc<BoundColumnFamily>,
-        prefix: &[u8],
-        limit: Option<usize>,
-    ) -> Result<HashSet<Vec<u8>>, Error> {
-        let mut items = HashSet::new();
-        let limit = limit.unwrap_or(usize::MAX);
-
-        for item in self.db.prefix_iterator_cf(cf_handle, prefix) {
-            let (key, value) = match item {
-                Ok((k, v)) => (k, v),
-                Err(e) => return Err(Error::RocksDb(e)),
-            };
-
-            if !key.starts_with(prefix) {
-                break;
-            }
-
-            items.insert(value.to_vec());
-            if items.len() >= limit {
-                break;
-            }
-        }
-
-        Ok(items)
-    }
-
-    /// Loads all events that references a given event. The reference-type is provided and it can be any value of `ReferenceType`.
-    fn get_event_referenced_as<T: AsRef<[u8]>>(
-        &self,
-        cf_handle: &Arc<BoundColumnFamily>,
-        id: T,
-        limit: Option<usize>,
-    ) -> Result<HashSet<Vec<u8>>, Error> {
-        let mut items = HashSet::new();
-        let limit = limit.unwrap_or(usize::MAX);
-        let prefix = id.as_ref();
-
-        for item in self.db.prefix_iterator_cf(cf_handle, prefix) {
-            let (key, value) = match item {
-                Ok((k, v)) => (k, v),
-                Err(e) => return Err(Error::RocksDb(e)),
-            };
-
-            if !key.starts_with(prefix) {
-                break;
-            }
-
-            items.insert(value.to_vec());
-            if limit == items.len() {
-                break;
-            }
-        }
-
-        Ok(items)
+        load_events_and_filter(self, query, event_ids, for_each)
     }
 }
 
 #[cfg(test)]
 mod test {
     use super::*;
-    use nostr_rs_types::types::{Addr, Kind};
-    use std::{
-        fs::File,
-        io::{BufRead, BufReader},
-    };
-
-    fn get_db() -> RocksDb {
-        let db = RocksDb::new(format!("tests/db/{}", nanoseconds_with_random())).expect("db");
-
-        let file = File::open("./tests/events.json").expect("file");
-        let events = BufReader::new(file)
-            .lines()
-            .map(|line| serde_json::from_str(&line.expect("line")).expect("valid"))
-            .collect::<Vec<Event>>();
-
-        for event in events {
-            assert!(db.store(&event).expect("valid"));
-        }
-        db
-    }
+    use crate::{test, util::unique_nanoseconds};
 
     #[test]
     fn store_and_get() {
-        let db = RocksDb::new(format!("tests/db/{}", nanoseconds_with_random())).expect("db");
-        let json = "{\"content\":\"{\\\"lud06\\\":\\\"lnbc1p3a4wxvpp5x0pa6gr55fq5s9d3dxs0vz77mqxgdw63hhtgtlfz5zvm65847vnqdqqcqpjsp5402c8rtqxd4j97rnvuejuwl4sg473g6wg08d67fvn7qc4gtpkfks9q7sqqqqqqqqqqqqqqqqqqqsqqqqqysgqmqz9gxqyjw5qrzjqwryaup9lh50kkranzgcdnn2fgvx390wgj5jd07rwr3vxeje0glclleasn65surjcsqqqqlgqqqqqeqqjqyxj968tem9ps6ttm9ukv6ag4yc6qmgj2svrccfgp4n83fpktr3dsx6fq7grfzlqt982aaemahg9q29vzl9f627kh4j8h8xc2z2mtpdqqjlekah\\\",\\\"website\\\":\\\"\\\",\\\"nip05\\\":\\\"cesar@cesar.com.py\\\",\\\"picture\\\":\\\"https://pbs.twimg.com/profile_images/1175432935337537536/_Peu9vuJ_400x400.jpg\\\",\\\"display_name\\\":\\\"C\\\",\\\"about\\\":\\\"Rust and PHP\\\",\\\"name\\\":\\\"c\\\"}\",\"created_at\":1678476588,\"id\":\"3800c787a23288641c0b96cbcc87c26cbd3ea7bee53b7748422fdb100fb7b9f0\",\"kind\":0,\"pubkey\":\"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78\",\"sig\":\"c8a12ce96833e4cd67bce0e9e50f831262ef0f0c0cff5e56c38a0c90867ed1a6621e9692948ef5e85a7ca3726c3f0f43fa7e1992536bc457317123bca8784f5f\",\"tags\":[]}";
-
-        let event: Event = serde_json::from_str(json).expect("valid");
-        assert_eq!(true, db.store(&event).expect("valid"));
-        assert_eq!(false, db.store(&event).expect("valid"));
-
-        let event1 = db.get_event(&event.id).expect("something");
-        assert_eq!(event1, Some(event));
-    }
-
-    #[test]
-    fn records_are_sorted_by_date_asc() {
-        let db = get_db();
-
-        let pk: Addr = "7bdef7be22dd8e59f4600e044aa53a1cf975a9dc7d27df5833bc77db784a5805"
-            .try_into()
-            .expect("pk");
-
-        let vec = db
-            .get_by_filter(Filter {
-                authors: vec![pk],
-                limit: 10,
-                ..Default::default()
-            })
-            .expect("set of results");
-
-        let dates = vec.iter().map(|e| e.created_at()).collect::<Vec<_>>();
-        let mut sorted_dates = dates.clone();
-        sorted_dates.sort_by(|a, b| a.cmp(b));
-
-        assert_eq!(vec.len(), 10);
-        assert_eq!(dates, sorted_dates);
+        let db = RocksDb::new(format!("tests/db/{}", unique_nanoseconds())).expect("db");
+        test::store_and_get(&db)
     }
 
     #[test]
-    fn filter_by_references() {
-        let db = get_db();
-
-        let related_events = db
-            .get_by_filter(Filter {
-                references_to_event: vec![
-                    "f513f1422ee5dbf30f57118b6cc34e788746e589a9b07be767664a164c57b9b1"
-                        .try_into()
-                        .expect("pk"),
-                ],
-                references_to_public_key: vec![
-                    "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
-                        .try_into()
-                        .expect("pk"),
-                ],
-                ..Default::default()
-            })
-            .expect("events");
-        assert_eq!(related_events.len(), 1);
+    fn records_are_sorted_by_date_desc() {
+        let db = RocksDb::new(format!("tests/db/{}", unique_nanoseconds())).expect("db");
+        test::records_are_sorted_by_date_desc(&db)
     }
 
     #[test]
     fn filter_by_references_zero_match() {
-        let db = get_db();
-
-        let related_events = db
-            .get_by_filter(Filter {
-                references_to_event: vec![
-                    "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
-                        .try_into()
-                        .expect("pk"),
-                ],
-                references_to_public_key: vec![
-                    "36ce9f55828b06f4f45c7f7292ae58362f4abe746938888f82e56fe6fb7ffb2c"
-                        .try_into()
-                        .expect("pk"),
-                ],
-                ..Default::default()
-            })
-            .expect("events");
-        assert_eq!(related_events.len(), 0);
+        let db = RocksDb::new(format!("tests/db/{}", unique_nanoseconds())).expect("db");
+        test::filter_by_references_zero_match(&db)
     }
 
     #[test]
     fn filter_by_references_and_kind() {
-        let db = get_db();
-
-        let related_events = db
-            .get_by_filter(Filter {
-                kinds: vec![Kind::Reaction, Kind::ShortTextNote],
-                references_to_event: vec![
-                    "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
-                        .try_into()
-                        .expect("pk"),
-                ],
-                ..Default::default()
-            })
-            .expect("events");
-        assert_eq!(related_events.len(), 3);
+        let db = RocksDb::new(format!("tests/db/{}", unique_nanoseconds())).expect("db");
+        test::filter_by_references_and_kind(&db)
     }
 
     #[test]
     fn get_event_and_related_events() {
-        let db = get_db();
-
-        let id: Addr = "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
-            .try_into()
-            .expect("pk");
-
-        let event = db
-            .get_by_filter(Filter {
-                ids: vec![id.clone()],
-                ..Default::default()
-            })
-            .expect("events");
-
-        assert_eq!(event.len(), 1);
-
-        let related_events = db
-            .get_by_filter(Filter {
-                references_to_event: vec![id],
-                ..Default::default()
-            })
-            .expect("events");
-        assert_eq!(related_events.len(), 2_538);
-
-        let mut kinds = related_events.iter().map(|x| x.kind()).collect::<Vec<_>>();
-        kinds.sort();
-        kinds.dedup();
+        let db = RocksDb::new(format!("tests/db/{}", unique_nanoseconds())).expect("db");
+        test::get_event_and_related_events(&db)
+    }
 
-        assert_eq!(Kind::Reaction, kinds[0]);
-        assert_eq!(Kind::Unknown(42), kinds[1]);
+    #[test]
+    fn filter_by_author() {
+        let db = RocksDb::new(format!("tests/db/{}", unique_nanoseconds())).expect("db");
+        test::filter_by_author(&db)
     }
 
     #[test]
     fn filter_by_authors() {
-        let db = get_db();
-        let query = Filter {
-            authors: vec![
-                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
-                    .try_into()
-                    .expect("pk"),
-                "38fb689f2fb92d932d457b1ea56715292bdf2140b2c7d282e8b8e8d644483ad6"
-                    .try_into()
-                    .expect("pk"),
-            ],
-            ..Default::default()
-        };
-        let records = db.get_by_filter(query).expect("records");
-        assert_eq!(records.len(), 27);
+        let db = RocksDb::new(format!("tests/db/{}", unique_nanoseconds())).expect("db");
+        test::filter_by_authors(&db)
     }
 
     #[test]
-    fn filter_by_author() {
-        let db = get_db();
-        let query = Filter {
-            authors: vec![
-                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
-                    .try_into()
-                    .expect("pk"),
-            ],
-            ..Default::default()
-        };
-        let records = db.get_by_filter(query).expect("records");
-        assert_eq!(records.len(), 3);
+    fn filter_kind() {
+        let db = RocksDb::new(format!("tests/db/{}", unique_nanoseconds())).expect("db");
+        test::filter_kind(&db)
     }
 
     #[test]
-    fn filter_by_author_and_kinds() {
-        let db = get_db();
-        let query = Filter {
-            authors: vec![
-                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
-                    .try_into()
-                    .expect("pk"),
-            ],
-            kinds: vec![Kind::ShortTextNote, Kind::Reaction],
-            ..Default::default()
-        };
-        let records = db.get_by_filter(query).expect("records");
-        assert_eq!(records.len(), 2);
+    fn filter_by_references() {
+        let db = RocksDb::new(format!("tests/db/{}", unique_nanoseconds())).expect("db");
+        test::filter_by_references(&db)
     }
 
     #[test]
-    fn filter_kind() {
-        let db = get_db();
-        let query = Filter {
-            kinds: vec![Kind::ShortTextNote],
-            ..Default::default()
-        };
-        let records = db.get_by_filter(query).expect("records");
-        assert_eq!(records.len(), 1_511);
-        records
-            .iter()
-            .map(|x| x.kind())
-            .for_each(|x| assert_eq!(x, Kind::ShortTextNote));
+    fn filter_by_author_and_kinds() {
+        let db = RocksDb::new(format!("tests/db/{}", unique_nanoseconds())).expect("db");
+        test::filter_by_author_and_kinds(&db)
     }
 }

+ 39 - 0
crates/storage/src/storage.rs

@@ -0,0 +1,39 @@
+use crate::Error;
+use nostr_rs_types::types::{Event, Filter};
+
+/// Trait to store/query events
+pub trait Storage {
+    /// Stores a new event into the database. This action will also creates all
+    /// the needed indexes to find this event later by reference, author, kind or tag.
+    fn store(&self, event: &Event) -> Result<bool, Error>;
+
+    /// Flags the current event as initiated locally. This useful to retrieve
+    /// events using `get_local_events`.
+    fn set_local_event(&self, event: &Event) -> Result<(), Error>;
+
+    /// Returns an event by its ID
+    fn get_event<T: AsRef<[u8]>>(&self, id: T) -> Result<Option<Event>, Error>;
+
+    /// Get events from the database with a given filter
+    ///
+    /// The first step is to use one available index to get a list of event-IDs,
+    /// then call `load_and_filter_events` that will load the events from the
+    /// `Events` namespace and will filter them by the given parameters.
+    fn get_by_filter<C>(&self, query: Filter, for_each: C) -> Result<(), Error>
+    where
+        C: Fn(Event) -> Result<(), Error>;
+
+    /// Return a vector of all local events
+    fn get_local_events<C>(&self, for_each: C) -> Result<(), Error>
+    where
+        C: Fn(Event) -> Result<(), Error>;
+
+    /// Stores an event, similar to store(Event), but keeps track of this event in a
+    /// local index. This is useful to keep track of the events that are created by
+    /// this node, to be broadcasted to new nodes in the future
+    fn store_local_event(&self, event: &Event) -> Result<bool, Error> {
+        let ret = self.store(event)?;
+        self.set_local_event(event)?;
+        Ok(ret)
+    }
+}

+ 164 - 0
crates/storage/src/util.rs

@@ -0,0 +1,164 @@
+use crate::{Error, Storage};
+use chrono::{DateTime, Utc};
+use nostr_rs_types::types::{Event, Filter, Id, Tag};
+use rand::Rng;
+use std::collections::HashSet;
+
+/// Creates a secondary index for a given event ID. The secondary index is
+/// sorted by created time in a descendent order, meaning that newer events will
+/// be first.
+///
+/// This is useful to leverage natural sorting of the database, and to be able
+/// to create secondary indexes to find by author, kind, tag, etc, and have that
+/// result sorted by date naturally
+pub struct SecondaryIndex<'a> {
+    entry_id: &'a Id,
+    sort_by: [u8; 8],
+}
+
+impl<'a> SecondaryIndex<'a> {
+    pub fn new(entry_id: &'a Id, date_time: DateTime<Utc>) -> Self {
+        // We need to reverse the number, to sort it. Otherwise, the first is
+        // going to be the oldest entry, instead of the newest.
+        let reversed_order_by = u64::MAX
+            .checked_sub(date_time.timestamp() as u64)
+            .unwrap_or_default()
+            .to_be_bytes();
+        Self {
+            entry_id,
+            sort_by: reversed_order_by,
+        }
+    }
+
+    pub fn new_id<T: AsRef<[u8]>>(&self, index_by: T) -> Vec<u8> {
+        let mut new_id = index_by.as_ref().to_vec();
+        new_id.extend_from_slice(&self.sort_by);
+        new_id.extend_from_slice(self.entry_id.as_ref());
+        new_id
+    }
+}
+
+// Get current nanoseconds and use the last 3 digits as a random number (because
+// sometimes it comes as 0)
+#[inline]
+#[allow(dead_code)]
+pub fn unique_nanoseconds() -> u128 {
+    let mut rng = rand::thread_rng();
+    let random_number = rng.gen_range(0..999);
+
+    let ts = std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .expect("time")
+        .as_nanos();
+
+    ts.checked_add(random_number).unwrap_or(ts)
+}
+
+/// Receives a list of event-IDs, and returns a list of events that match
+/// the given filter.
+///
+/// The list of event-IDs should come from an index from any namespace, and
+/// the filter will load the events from the `Events` namespace and will
+/// filter them by the given parameters.
+#[inline]
+pub fn load_events_and_filter<T: Storage, C>(
+    db: &T,
+    mut query: Filter,
+    mut event_ids: Vec<Vec<u8>>,
+    for_each: C,
+) -> Result<(), Error>
+where
+    C: Fn(Event) -> Result<(), Error>,
+{
+    if !query.ids.is_empty() {
+        let mut ids = query.ids.iter().map(|id| id.as_ref()).collect::<Vec<_>>();
+        ids.sort();
+        event_ids.retain(|id| ids.binary_search(&id.as_slice()).is_ok());
+    }
+
+    let mut authors = query
+        .authors
+        .iter()
+        .map(|id| id.as_ref())
+        .collect::<Vec<_>>();
+
+    let mut tag_primary_keys = query
+        .references_to_public_key
+        .iter()
+        .map(|tag| tag.as_ref())
+        .collect::<HashSet<_>>()
+        .into_iter()
+        .collect::<Vec<_>>();
+
+    let mut tag_events = query
+        .references_to_event
+        .iter()
+        .map(|tag| tag.as_ref())
+        .collect::<HashSet<_>>()
+        .into_iter()
+        .collect::<Vec<_>>();
+
+    // Sort everything for a quick binary_search instead of a linear search
+    tag_events.sort();
+    tag_primary_keys.sort();
+    authors.sort();
+    query.kinds.sort();
+
+    let mut found = 0;
+
+    for id in event_ids.iter() {
+        if let Some(event) = db.get_event(id)? {
+            if !tag_primary_keys.is_empty() || !tag_events.is_empty() && authors.is_empty() {
+                let mut found = false;
+
+                for tag in event.tags().iter() {
+                    match tag {
+                        Tag::Event(event) => {
+                            if tag_events.binary_search(&event.id.as_ref()).is_ok() {
+                                found = true;
+                            }
+                        }
+                        Tag::PubKey(key) => {
+                            if tag_primary_keys.binary_search(&key.id.as_ref()).is_ok() {
+                                found = true;
+                            }
+                        }
+                        _ => {}
+                    }
+
+                    if found {
+                        break;
+                    }
+                }
+
+                if !found {
+                    continue;
+                }
+            }
+
+            if !authors.is_empty() && authors.binary_search(&event.author().as_ref()).is_err() {
+                continue;
+            }
+            if !query.kinds.is_empty() && query.kinds.binary_search(&event.kind()).is_err() {
+                continue;
+            }
+            if let Some(since) = query.since {
+                if event.created_at() < since {
+                    continue;
+                }
+            }
+            if let Some(until) = query.until {
+                if event.created_at() > until {
+                    continue;
+                }
+            }
+            for_each(event)?;
+            found += 1;
+            if found == query.limit {
+                break;
+            }
+        }
+    }
+
+    Ok(())
+}