Cesar Rodas 1 рік тому
батько
коміт
ad30e40402
3 змінених файлів з 247 додано та 137 видалено
  1. 1 0
      Cargo.lock
  2. 1 0
      crates/storage/Cargo.toml
  3. 245 137
      crates/storage/src/rocksdb.rs

+ 1 - 0
Cargo.lock

@@ -843,6 +843,7 @@ name = "nostr-rs-storage"
 version = "0.1.0"
 dependencies = [
  "nostr-rs-types",
+ "rand",
  "rocksdb",
  "serde_json",
  "thiserror",

+ 1 - 0
crates/storage/Cargo.toml

@@ -8,3 +8,4 @@ serde_json = "1.0"
 nostr-rs-types = { path = "../types" }
 thiserror = "1.0.40"
 rocksdb = { version = "0.20.1", features = ["multi-threaded-cf", "serde", "snappy"] }
+rand = "0.8.5"

+ 245 - 137
crates/storage/src/rocksdb.rs

@@ -1,6 +1,7 @@
 //! Rocks DB implementation of the storage layer
 use crate::Error;
 use nostr_rs_types::types::{Event, Filter, Tag};
+use rand::Rng;
 use rocksdb::{BoundColumnFamily, ColumnFamilyDescriptor, Options, SliceTransform, WriteBatch, DB};
 use std::{collections::HashSet, path::Path, sync::Arc};
 
@@ -31,11 +32,18 @@ pub struct RocksDb {
     db: DB,
 }
 
-fn nanoseconds() -> u128 {
-    std::time::SystemTime::now()
+// Get current nanoseconds and use the last 3 digits as a random number (because
+// sometimes it comes as 0)
+fn nanoseconds_with_random() -> u128 {
+    let mut rng = rand::thread_rng();
+    let random_number = rng.gen_range(0..999);
+
+    let ts = std::time::SystemTime::now()
         .duration_since(std::time::UNIX_EPOCH)
         .unwrap()
-        .as_nanos()
+        .as_nanos();
+
+    ts.checked_add(random_number).unwrap_or(ts)
 }
 
 impl RocksDb {
@@ -63,7 +71,7 @@ impl RocksDb {
         options.set_compression_type(rocksdb::DBCompressionType::Snappy);
         options.optimize_for_point_lookup(256 * 1024 * 1024);
 
-        let prefix_extractor = SliceTransform::create_fixed_prefix(32);
+        let prefix_extractor = SliceTransform::create_fixed_prefix(1);
         options.set_prefix_extractor(prefix_extractor);
 
         Self::with_custom_db(options, path)
@@ -80,13 +88,13 @@ impl RocksDb {
         let reversed_order_by = u64::MAX
             .checked_sub(order_by)
             .unwrap_or_default()
-            .to_be_bytes();
+            .to_le_bytes();
         new_id.extend_from_slice(&reversed_order_by);
 
         let now = u128::MAX
-            .checked_sub(nanoseconds())
+            .checked_sub(nanoseconds_with_random())
             .unwrap_or_default()
-            .to_be_bytes();
+            .to_le_bytes();
 
         new_id.extend_from_slice(&now);
         new_id
@@ -117,7 +125,7 @@ impl RocksDb {
         let author_id = Self::generate_unique_id(event.author(), created_at);
         let json = serde_json::to_vec(event)?;
         let kind: u32 = event.kind().into();
-        let kind_id = Self::generate_unique_id(kind.to_be_bytes(), created_at);
+        let kind_id = Self::generate_unique_id(kind.to_le_bytes(), created_at);
 
         let mut buffer = WriteBatch::default();
 
@@ -178,13 +186,80 @@ impl RocksDb {
         Ok(true)
     }
 
+    /// Returns an event by its ID
+    pub fn get_event<T: AsRef<[u8]>>(&self, id: T) -> Result<Option<Event>, Error> {
+        Ok(self
+            .db
+            .get_cf(&self.reference_to_cf_handle(ReferenceType::Events)?, id)?
+            .map(|event| serde_json::from_slice(&event))
+            .transpose()?)
+    }
+
+    /// Get events from the database with a given filter
+    ///
+    /// The first step is to use one available index to get a list of event-IDs,
+    /// then call `load_and_filter_events` that will load the events from the
+    /// `Events` namespace and will filter them by the given parameters.
+    pub fn get_by_filter(&self, mut query: Filter) -> Result<Vec<Event>, Error> {
+        let mut event_ids = HashSet::new();
+        let limit = if query.limit == 0 {
+            None
+        } else {
+            Some(query.limit.try_into()?)
+        };
+
+        if !query.references_to_event.is_empty() {
+            let ns = self.reference_to_cf_handle(ReferenceType::RefEvent)?;
+            for public_key in query.references_to_event.iter() {
+                event_ids.extend(self.get_event_referenced_as(&ns, public_key.as_ref(), limit)?);
+            }
+            query.references_to_event.clear();
+        } else if !query.references_to_public_key.is_empty() {
+            let ns = self.reference_to_cf_handle(ReferenceType::RefEvent)?;
+            for public_key in query.references_to_public_key.iter() {
+                event_ids.extend(self.get_event_referenced_as(&ns, public_key.as_ref(), limit)?);
+            }
+            query.references_to_public_key.clear();
+        } else if !query.ids.is_empty() {
+            query
+                .ids
+                .iter()
+                .map(|id| event_ids.insert(id.as_ref().to_vec()))
+                .for_each(drop);
+            query.ids.clear();
+        } else if !query.authors.is_empty() {
+            let ns = self.reference_to_cf_handle(ReferenceType::Author)?;
+            for public_key in query.authors.iter() {
+                event_ids.extend(self.get_event_ids_by_ns_and_prefix(
+                    &ns,
+                    public_key.as_ref(),
+                    limit,
+                )?);
+            }
+            query.authors.clear();
+        } else if !query.kinds.is_empty() {
+            let ns = self.reference_to_cf_handle(ReferenceType::Kind)?;
+            for kind in query.kinds.iter() {
+                let kind: u32 = (*kind).into();
+                event_ids.extend(self.get_event_ids_by_ns_and_prefix(
+                    &ns,
+                    &kind.to_le_bytes(),
+                    limit,
+                )?);
+            }
+            query.kinds.clear();
+        }
+
+        self.load_and_filter_events(query, event_ids)
+    }
+
     /// Receives a list of event-IDs, and returns a list of events that match
     /// the given filter.
     ///
     /// The list of event-IDs should come from an index from any namespace, and
     /// the filter will load the events from the `Events` namespace and will
     /// filter them by the given parameters.
-    fn load_ids_and_filter(
+    fn load_and_filter_events(
         &self,
         mut query: Filter,
         mut event_ids: HashSet<Vec<u8>>,
@@ -203,11 +278,58 @@ impl RocksDb {
             .map(|id| id.as_ref())
             .collect::<Vec<_>>();
 
+        let mut tag_primary_keys = query
+            .references_to_public_key
+            .iter()
+            .map(|tag| tag.as_ref())
+            .collect::<HashSet<_>>()
+            .into_iter()
+            .collect::<Vec<_>>();
+
+        let mut tag_events = query
+            .references_to_event
+            .iter()
+            .map(|tag| tag.as_ref())
+            .collect::<HashSet<_>>()
+            .into_iter()
+            .collect::<Vec<_>>();
+
+        // Sort everything for a quick binary_search instead of a linear search
+        tag_events.sort();
+        tag_primary_keys.sort();
         authors.sort();
         query.kinds.sort();
 
         for id in event_ids.iter() {
             if let Some(event) = self.get_event(id)? {
+                if !tag_primary_keys.is_empty() || !tag_events.is_empty() && authors.is_empty() {
+                    let mut found = false;
+
+                    for tag in event.tags().iter() {
+                        match tag {
+                            Tag::Event(event) => {
+                                if tag_events.binary_search(&event.id.as_ref()).is_ok() {
+                                    found = true;
+                                }
+                            }
+                            Tag::PubKey(key) => {
+                                if tag_primary_keys.binary_search(&key.id.as_ref()).is_ok() {
+                                    found = true;
+                                }
+                            }
+                            _ => {}
+                        }
+
+                        if found {
+                            break;
+                        }
+                    }
+
+                    if !found {
+                        continue;
+                    }
+                }
+
                 if !authors.is_empty() && authors.binary_search(&event.author().as_ref()).is_err() {
                     continue;
                 }
@@ -231,57 +353,18 @@ impl RocksDb {
             }
         }
 
-        Ok(events)
-    }
-
-    /// Get events from the database with a given filter
-    pub fn get_events_by_filter(&self, mut query: Filter) -> Result<Vec<Event>, Error> {
-        let mut event_ids = HashSet::new();
-        let limit = if query.limit == 0 {
-            None
-        } else {
-            Some(query.limit.try_into()?)
-        };
+        // It should be sorted until an internal HashSet can be created, one that
+        // respects the inserts time, because the database is sorted already by time
+        // desc
+        events.sort_by(|a, b| b.created_at().cmp(&a.created_at()));
 
-        if !query.ids.is_empty() {
-            query
-                .ids
-                .iter()
-                .map(|id| event_ids.insert(id.as_ref().to_vec()))
-                .for_each(drop);
-            query.ids.clear();
-        } else if !query.authors.is_empty() {
-            let ns = self.reference_to_cf_handle(ReferenceType::Author)?;
-            for public_key in query.authors.iter() {
-                event_ids.extend(self.get_event_ids_by_ns_and_prefix(
-                    &ns,
-                    public_key.as_ref(),
-                    limit,
-                )?);
-            }
-            query.authors.clear();
-        } else if !query.kinds.is_empty() {
-            let ns = self.reference_to_cf_handle(ReferenceType::Kind)?;
-            for kind in query.kinds.iter() {
-                let kind: u32 = (*kind).into();
-                event_ids.extend(self.get_event_ids_by_ns_and_prefix(
-                    &ns,
-                    &kind.to_be_bytes(),
-                    limit,
-                )?);
-            }
-            println!("{:?} {}", event_ids, event_ids.len());
-            query.kinds.clear();
-        }
-
-        self.load_ids_and_filter(query, event_ids)
+        Ok(events)
     }
 
-    /// Similar function to get_events_by_author but instead of loading the
-    /// whole event, only the event id is loaded. This is done for performance,
-    /// to avoid loading the same event multiple times, also to only load from
-    /// the database the events that are needed (according to limits and other
-    /// constraints)
+    /// Instead of loading the whole event, only the event id is loaded. This is
+    /// done for performance, to avoid loading the same event multiple times,
+    /// also to only load from the database the events that are needed
+    /// (according to limits and other constraints)
     fn get_event_ids_by_ns_and_prefix(
         &self,
         cf_handle: &Arc<BoundColumnFamily>,
@@ -291,9 +374,7 @@ impl RocksDb {
         let mut items = HashSet::new();
         let limit = limit.unwrap_or(usize::MAX);
 
-        println!("F: {:?}", prefix);
         for item in self.db.prefix_iterator_cf(cf_handle, prefix) {
-            println!("fix");
             let (key, value) = match item {
                 Ok((k, v)) => (k, v),
                 Err(e) => return Err(Error::RocksDb(e)),
@@ -312,44 +393,18 @@ impl RocksDb {
         Ok(items)
     }
 
-    /// Get events by author
-    pub fn get_events_by_author<T: AsRef<[u8]>>(
-        &self,
-        public_key: T,
-        limit: Option<usize>,
-    ) -> Result<Vec<Event>, Error> {
-        let mut events: Vec<Event> = self
-            .get_event_ids_by_ns_and_prefix(
-                &self.reference_to_cf_handle(ReferenceType::Author)?,
-                public_key.as_ref(),
-                limit,
-            )?
-            .iter()
-            .map(|id| self.get_event(id))
-            .collect::<Result<Vec<Option<_>>, _>>()?
-            .into_iter()
-            .flatten()
-            .collect();
-
-        events.sort_by_key(|b| std::cmp::Reverse(b.created_at()));
-        Ok(events)
-    }
-
     /// Loads all events that references a given event. The reference-type is provided and it can be any value of `ReferenceType`.
     fn get_event_referenced_as<T: AsRef<[u8]>>(
         &self,
-        ref_type: ReferenceType,
+        cf_handle: &Arc<BoundColumnFamily>,
         id: T,
         limit: Option<usize>,
-    ) -> Result<Vec<Event>, Error> {
-        let mut items = vec![];
+    ) -> Result<HashSet<Vec<u8>>, Error> {
+        let mut items = HashSet::new();
         let limit = limit.unwrap_or(usize::MAX);
         let prefix = id.as_ref();
 
-        for item in self
-            .db
-            .prefix_iterator_cf(&self.reference_to_cf_handle(ref_type)?, prefix)
-        {
+        for item in self.db.prefix_iterator_cf(cf_handle, prefix) {
             let (key, value) = match item {
                 Ok((k, v)) => (k, v),
                 Err(e) => return Err(Error::RocksDb(e)),
@@ -359,41 +414,14 @@ impl RocksDb {
                 break;
             }
 
-            if let Some(value) = self.get_event(value)? {
-                items.push(value);
-                if limit == items.len() {
-                    break;
-                }
+            items.insert(value.to_vec());
+            if limit == items.len() {
+                break;
             }
         }
 
         Ok(items)
     }
-
-    /// Loads a given event and all their related events (likes, reposts, etc)
-    pub fn get_event_and_related_events<T: AsRef<[u8]>>(
-        &self,
-        id: T,
-        limit: Option<usize>,
-    ) -> Result<Option<(Event, Vec<Event>)>, Error> {
-        if let Some(r) = self.get_event(&id)? {
-            Ok(Some((
-                r,
-                self.get_event_referenced_as(ReferenceType::RefEvent, &id, limit)?,
-            )))
-        } else {
-            Ok(None)
-        }
-    }
-
-    /// Returns an event by its ID
-    pub fn get_event<T: AsRef<[u8]>>(&self, id: T) -> Result<Option<Event>, Error> {
-        Ok(self
-            .db
-            .get_cf(&self.reference_to_cf_handle(ReferenceType::Events)?, id)?
-            .map(|event| serde_json::from_slice(&event))
-            .transpose()?)
-    }
 }
 
 #[cfg(test)]
@@ -406,7 +434,7 @@ mod test {
     };
 
     fn get_db() -> RocksDb {
-        let db = RocksDb::new(format!("tests/db/{}", nanoseconds())).expect("db");
+        let db = RocksDb::new(format!("tests/db/{}", nanoseconds_with_random())).expect("db");
 
         let file = File::open("./tests/events.json").expect("file");
         let events = BufReader::new(file)
@@ -422,7 +450,7 @@ mod test {
 
     #[test]
     fn store_and_get() {
-        let db = RocksDb::new(format!("tests/db/{}", nanoseconds())).expect("db");
+        let db = RocksDb::new(format!("tests/db/{}", nanoseconds_with_random())).expect("db");
         let json = "{\"content\":\"{\\\"lud06\\\":\\\"lnbc1p3a4wxvpp5x0pa6gr55fq5s9d3dxs0vz77mqxgdw63hhtgtlfz5zvm65847vnqdqqcqpjsp5402c8rtqxd4j97rnvuejuwl4sg473g6wg08d67fvn7qc4gtpkfks9q7sqqqqqqqqqqqqqqqqqqqsqqqqqysgqmqz9gxqyjw5qrzjqwryaup9lh50kkranzgcdnn2fgvx390wgj5jd07rwr3vxeje0glclleasn65surjcsqqqqlgqqqqqeqqjqyxj968tem9ps6ttm9ukv6ag4yc6qmgj2svrccfgp4n83fpktr3dsx6fq7grfzlqt982aaemahg9q29vzl9f627kh4j8h8xc2z2mtpdqqjlekah\\\",\\\"website\\\":\\\"\\\",\\\"nip05\\\":\\\"cesar@cesar.com.py\\\",\\\"picture\\\":\\\"https://pbs.twimg.com/profile_images/1175432935337537536/_Peu9vuJ_400x400.jpg\\\",\\\"display_name\\\":\\\"C\\\",\\\"about\\\":\\\"Rust and PHP\\\",\\\"name\\\":\\\"c\\\"}\",\"created_at\":1678476588,\"id\":\"3800c787a23288641c0b96cbcc87c26cbd3ea7bee53b7748422fdb100fb7b9f0\",\"kind\":0,\"pubkey\":\"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78\",\"sig\":\"c8a12ce96833e4cd67bce0e9e50f831262ef0f0c0cff5e56c38a0c90867ed1a6621e9692948ef5e85a7ca3726c3f0f43fa7e1992536bc457317123bca8784f5f\",\"tags\":[]}";
 
         let event: Event = serde_json::from_str(json).expect("valid");
@@ -441,7 +469,14 @@ mod test {
             .try_into()
             .expect("pk");
 
-        let vec = db.get_events_by_author(pk, Some(10)).expect("records");
+        let vec = db
+            .get_by_filter(Filter {
+                authors: vec![pk],
+                limit: 10,
+                ..Default::default()
+            })
+            .expect("set of results");
+
         let dates = vec.iter().map(|e| e.created_at()).collect::<Vec<_>>();
         let mut sorted_dates = dates.clone();
         sorted_dates.sort_by(|a, b| b.cmp(a));
@@ -451,6 +486,68 @@ mod test {
     }
 
     #[test]
+    fn filter_by_references() {
+        let db = get_db();
+
+        let related_events = db
+            .get_by_filter(Filter {
+                references_to_event: vec![
+                    "f513f1422ee5dbf30f57118b6cc34e788746e589a9b07be767664a164c57b9b1"
+                        .try_into()
+                        .expect("pk"),
+                ],
+                references_to_public_key: vec![
+                    "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+                        .try_into()
+                        .expect("pk"),
+                ],
+                ..Default::default()
+            })
+            .expect("events");
+        assert_eq!(related_events.len(), 1);
+    }
+
+    #[test]
+    fn filter_by_references_zero_match() {
+        let db = get_db();
+
+        let related_events = db
+            .get_by_filter(Filter {
+                references_to_event: vec![
+                    "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
+                        .try_into()
+                        .expect("pk"),
+                ],
+                references_to_public_key: vec![
+                    "36ce9f55828b06f4f45c7f7292ae58362f4abe746938888f82e56fe6fb7ffb2c"
+                        .try_into()
+                        .expect("pk"),
+                ],
+                ..Default::default()
+            })
+            .expect("events");
+        assert_eq!(related_events.len(), 0);
+    }
+
+    #[test]
+    fn filter_by_references_and_kind() {
+        let db = get_db();
+
+        let related_events = db
+            .get_by_filter(Filter {
+                kinds: vec![Kind::Reaction, Kind::ShortTextNote],
+                references_to_event: vec![
+                    "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
+                        .try_into()
+                        .expect("pk"),
+                ],
+                ..Default::default()
+            })
+            .expect("events");
+        assert_eq!(related_events.len(), 3);
+    }
+
+    #[test]
     fn get_event_and_related_events() {
         let db = get_db();
 
@@ -458,16 +555,24 @@ mod test {
             .try_into()
             .expect("pk");
 
-        let events = db.get_event_and_related_events(id, None).expect("events");
-        assert!(events.is_some());
-        let events = events.expect("events");
-        assert_eq!(
-            events.0.id.to_string(),
-            "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
-        );
-        assert_eq!(events.1.len(), 2_538);
+        let event = db
+            .get_by_filter(Filter {
+                ids: vec![id.clone()],
+                ..Default::default()
+            })
+            .expect("events");
+
+        assert_eq!(event.len(), 1);
 
-        let mut kinds = events.1.iter().map(|x| x.kind()).collect::<Vec<_>>();
+        let related_events = db
+            .get_by_filter(Filter {
+                references_to_event: vec![id],
+                ..Default::default()
+            })
+            .expect("events");
+        assert_eq!(related_events.len(), 2_538);
+
+        let mut kinds = related_events.iter().map(|x| x.kind()).collect::<Vec<_>>();
         kinds.sort();
         kinds.dedup();
 
@@ -489,7 +594,7 @@ mod test {
             ],
             ..Default::default()
         };
-        let records = db.get_events_by_filter(query).expect("records");
+        let records = db.get_by_filter(query).expect("records");
         assert_eq!(records.len(), 27);
     }
 
@@ -504,7 +609,7 @@ mod test {
             ],
             ..Default::default()
         };
-        let records = db.get_events_by_filter(query).expect("records");
+        let records = db.get_by_filter(query).expect("records");
         assert_eq!(records.len(), 3);
     }
 
@@ -520,7 +625,7 @@ mod test {
             kinds: vec![Kind::ShortTextNote, Kind::Reaction],
             ..Default::default()
         };
-        let records = db.get_events_by_filter(query).expect("records");
+        let records = db.get_by_filter(query).expect("records");
         assert_eq!(records.len(), 2);
     }
 
@@ -531,8 +636,11 @@ mod test {
             kinds: vec![Kind::ShortTextNote],
             ..Default::default()
         };
-        let records = db.get_events_by_filter(query).expect("records");
-        assert_eq!(records.len(), 2_538);
-        println!("{:?}", records);
+        let records = db.get_by_filter(query).expect("records");
+        assert_eq!(records.len(), 1_511);
+        records
+            .iter()
+            .map(|x| x.kind())
+            .for_each(|x| assert_eq!(x, Kind::ShortTextNote));
     }
 }