Sfoglia il codice sorgente

Use rocksdb instead

Cesar Rodas 1 anno fa
parent
commit
be0551cde4
2 ha cambiato i file con 150 aggiunte e 127 eliminazioni
  1. 1 1
      crates/storage/Cargo.toml
  2. 149 126
      crates/storage/src/lib.rs

+ 1 - 1
crates/storage/Cargo.toml

@@ -7,4 +7,4 @@ edition = "2021"
 serde_json = "1.0"
 nostr-rs-types = { path = "../types" }
 thiserror = "1.0.40"
-sled = { version = "0.34.7", features = ["compression"] }
+rocksdb = { version = "0.20.1", features = ["multi-threaded-cf", "serde", "snappy"] }

+ 149 - 126
crates/storage/src/lib.rs

@@ -1,14 +1,12 @@
 use nostr_rs_types::types::{Event, Tag};
-use sled::{
-    transaction::{ConflictableTransactionError, TransactionError},
-    Subscriber,
-};
-use std::num::TryFromIntError;
+use rocksdb::{BoundColumnFamily, ColumnFamilyDescriptor, Options, SliceTransform, WriteBatch, DB};
+use std::{num::TryFromIntError, path::Path, sync::Arc};
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
-    #[error("Sled: {0}")]
-    Sled(#[from] sled::Error),
+    #[error("DB: {0}")]
+    DB(#[from] rocksdb::Error),
+
     #[error("Serde: {0}")]
     Serde(#[from] serde_json::Error),
 
@@ -17,57 +15,74 @@ pub enum Error {
 
     #[error("Tx: {0}")]
     Tx(String),
+
+    #[error("Unknown family column")]
+    InvalidColumnFamily,
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
-pub enum Prefix {
+pub enum Namespace {
+    Events,
     Author,
     RefPublicKey,
     RefEvent,
 }
 
-impl Into<&'static [u8]> for Prefix {
-    fn into(self) -> &'static [u8] {
+impl Namespace {
+    #[inline]
+    pub fn as_str(&self) -> &'static str {
         match self {
-            Self::Author => b"a:",
-            Self::RefPublicKey => b"p:",
-            Self::RefEvent => b"t:",
+            Self::Events => "events",
+            Self::Author => "authors",
+            Self::RefPublicKey => "refs_by_public_key",
+            Self::RefEvent => "refs_by_ids",
         }
     }
 }
 
 pub struct Db {
-    pub db: sled::Db,
+    db: DB,
+}
+
+fn nanoseconds() -> u128 {
+    std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .unwrap()
+        .as_nanos()
 }
 
 impl Db {
-    pub fn with_custom_db(db: sled::Db) -> Result<Self, Error> {
+    pub fn with_custom_db<T: AsRef<Path>>(options: Options, path: T) -> Result<Self, Error> {
+        let db = DB::open_cf_descriptors(
+            &options,
+            path,
+            vec![
+                ColumnFamilyDescriptor::new(Namespace::Events.as_str(), options.clone()),
+                ColumnFamilyDescriptor::new(Namespace::Author.as_str(), options.clone()),
+                ColumnFamilyDescriptor::new(Namespace::RefEvent.as_str(), options.clone()),
+                ColumnFamilyDescriptor::new(Namespace::RefPublicKey.as_str(), options.clone()),
+            ],
+        )?;
         Ok(Self { db })
     }
-    pub fn new(path: &str) -> Result<Self, Error> {
-        Self::with_custom_db(
-            sled::Config::default()
-                .cache_capacity(10_000_000_000)
-                .mode(sled::Mode::HighThroughput)
-                .flush_every_ms(Some(30_000))
-                .path(path)
-                .open()?,
-        )
-    }
+    pub fn new<T: AsRef<Path>>(path: T) -> Result<Self, Error> {
+        let mut options = Options::default();
+        options.create_if_missing(true);
+        options.create_missing_column_families(true);
+        options.set_compression_type(rocksdb::DBCompressionType::Snappy);
+        options.optimize_for_point_lookup(256 * 1024 * 1024);
 
-    #[inline]
-    fn get_key_with_prefix<T: AsRef<[u8]>>(prefix: Prefix, id: T) -> Vec<u8> {
-        let mut id_with_prefix: Vec<u8> = <Prefix as Into<&'static [u8]>>::into(prefix).to_vec();
+        let prefix_extractor = SliceTransform::create_fixed_prefix(32);
+        options.set_prefix_extractor(prefix_extractor);
 
-        id_with_prefix.extend_from_slice(id.as_ref());
-        id_with_prefix.push(b':');
-        id_with_prefix
+        Self::with_custom_db(options, path)
     }
 
     /// Takes a vector of bytes and appends the current time in nanoseconds to
     /// it, to make the ID unique enough and to make it sortable by time.
-    fn generate_unique_id<T: AsRef<[u8]>>(prefix: Prefix, id: T, order_by: u64) -> Vec<u8> {
-        let mut new_id = Self::get_key_with_prefix(prefix, id);
+    #[inline]
+    fn generate_unique_id<T: AsRef<[u8]>>(id: T, order_by: u64) -> Vec<u8> {
+        let mut new_id = id.as_ref().to_vec();
 
         // We need to reverse the number, to sort it. Otherwise, the first is
         // going to be the oldest entry, instead of the newest.
@@ -78,12 +93,7 @@ impl Db {
         new_id.extend_from_slice(&reversed_order_by);
 
         let now = u128::MAX
-            .checked_sub(
-                std::time::SystemTime::now()
-                    .duration_since(std::time::UNIX_EPOCH)
-                    .unwrap()
-                    .as_nanos(),
-            )
+            .checked_sub(nanoseconds())
             .unwrap_or_default()
             .to_be_bytes();
 
@@ -91,52 +101,49 @@ impl Db {
         new_id
     }
 
+    fn ns(&self, namespace: Namespace) -> Result<Arc<BoundColumnFamily>, Error> {
+        self.db
+            .cf_handle(namespace.as_str())
+            .ok_or(Error::InvalidColumnFamily)
+    }
+
     pub fn store(&self, event: &Event) -> Result<bool, Error> {
         let event_id = event.id.clone();
 
-        if let Ok(Some(_)) = self.db.get(&*event_id) {
+        if let Ok(Some(_)) = self.db.get_cf(&self.ns(Namespace::Events)?, *event_id) {
             return Ok(false);
         }
         let created_at = event.created_at().timestamp().try_into()?;
-        let author_id = Self::generate_unique_id(Prefix::Author, event.author(), created_at);
+        let author_id = Self::generate_unique_id(event.author(), created_at);
         let json = serde_json::to_vec(event)?;
 
-        let mut buffer = sled::Batch::default();
-        buffer.insert(&*event_id, json);
-        buffer.insert(author_id, &*event_id);
+        let mut buffer = WriteBatch::default();
+
+        buffer.put_cf(&self.ns(Namespace::Events)?, *event_id, json);
+        buffer.put_cf(&self.ns(Namespace::Author)?, author_id, *event_id);
 
         for tag in event.tags().iter() {
             match tag {
                 Tag::PubKey(p) => {
-                    let foreign_id =
-                        Self::generate_unique_id(Prefix::RefPublicKey, &p.id, created_at);
-                    let local_id =
-                        Self::generate_unique_id(Prefix::RefEvent, &event_id, created_at);
+                    let foreign_id = Self::generate_unique_id(&p.id, created_at);
+                    let local_id = Self::generate_unique_id(&event_id, created_at);
 
-                    buffer.insert(foreign_id, &*event_id);
-                    buffer.insert(local_id, &*p.id);
+                    buffer.put_cf(&self.ns(Namespace::RefPublicKey)?, foreign_id, *event_id);
+                    buffer.put_cf(&self.ns(Namespace::RefEvent)?, local_id, &*p.id);
                 }
                 Tag::Event(e) => {
-                    let foreign_id = Self::generate_unique_id(Prefix::RefEvent, &e.id, created_at);
-                    let local_id =
-                        Self::generate_unique_id(Prefix::RefEvent, &event_id, created_at);
+                    let foreign_id = Self::generate_unique_id(&e.id, created_at);
+                    let local_id = Self::generate_unique_id(&event_id, created_at);
 
-                    buffer.insert(foreign_id, &*event_id);
-                    buffer.insert(local_id, &*e.id);
+                    buffer.put_cf(&self.ns(Namespace::RefEvent)?, foreign_id, *event_id);
+                    buffer.put_cf(&self.ns(Namespace::RefEvent)?, local_id, &*e.id);
                 }
                 _ => {}
             }
         }
 
-        self.db
-            .transaction(|tx| {
-                tx.apply_batch(&buffer)?;
-                Ok::<(), ConflictableTransactionError<()>>(())
-            })
-            .map_err(|e: TransactionError<_>| match e {
-                TransactionError::Abort(e) => Error::Tx(format!("{:?}", e)),
-                TransactionError::Storage(e) => Error::Sled(e),
-            })?;
+        self.db.write(buffer).map_err(Error::DB)?;
+
         Ok(true)
     }
 
@@ -144,68 +151,93 @@ impl Db {
         &self,
         public_key: T,
         limit: Option<usize>,
-    ) -> Vec<Event> {
-        let prefix = Self::get_key_with_prefix(Prefix::Author, public_key);
+    ) -> Result<Vec<Event>, Error> {
         let mut items = vec![];
         let limit = limit.unwrap_or(usize::MAX);
+        let prefix = public_key.as_ref();
+
+        for item in self
+            .db
+            .prefix_iterator_cf(&self.ns(Namespace::Author)?, prefix)
+        {
+            let (key, value) = match item {
+                Ok((k, v)) => (k, v),
+                Err(e) => return Err(Error::DB(e)),
+            };
+
+            if !key.starts_with(prefix) {
+                break;
+            }
 
-        for item in self.db.scan_prefix(prefix) {
-            if let Ok((_, value)) = item {
-                if let Some(value) = self.get_event(value) {
-                    items.push(value);
-                    if limit == items.len() {
-                        break;
-                    }
+            if let Some(value) = self.get_event(value)? {
+                items.push(value);
+                if limit == items.len() {
+                    break;
                 }
             }
         }
 
-        items
+        Ok(items)
     }
 
     pub fn get_events_that_reference_to<T: AsRef<[u8]>>(
         &self,
-        prefix: Prefix,
+        ns: Namespace,
         id: T,
-    ) -> Vec<Event> {
-        let prefix = Self::get_key_with_prefix(prefix, id);
+        limit: Option<usize>,
+    ) -> Result<Vec<Event>, Error> {
         let mut items = vec![];
+        let limit = limit.unwrap_or(usize::MAX);
+        let prefix = id.as_ref();
+
+        for item in self.db.prefix_iterator_cf(&self.ns(ns)?, prefix) {
+            let (key, value) = match item {
+                Ok((k, v)) => (k, v),
+                Err(e) => return Err(Error::DB(e)),
+            };
+
+            if !key.starts_with(prefix) {
+                break;
+            }
 
-        for item in self.db.scan_prefix(prefix) {
-            if let Ok((_, value)) = item {
-                if let Some(value) = self.get_event(value) {
-                    items.push(value);
+            if let Some(value) = self.get_event(value)? {
+                items.push(value);
+                if limit == items.len() {
+                    break;
                 }
             }
         }
 
-        items
+        Ok(items)
     }
 
     pub fn get_event_and_related_events<T: AsRef<[u8]>>(
         &self,
         id: T,
-    ) -> Option<(Event, Vec<Event>)> {
-        Some((
-            self.get_event(&id)?,
-            self.get_events_that_reference_to(Prefix::RefEvent, &id),
-        ))
-    }
-
-    pub fn subscribe_to<T: AsRef<[u8]>>(&self, prefix: Prefix, id: T) -> Subscriber {
-        self.db.watch_prefix(Self::get_key_with_prefix(prefix, id))
-    }
-
-    pub fn get_event<T: AsRef<[u8]>>(&self, id: T) -> Option<Event> {
-        self.db
-            .get(id)
-            .ok()?
-            .map(|event| serde_json::from_slice(&event))?
-            .ok()
+        limit: Option<usize>,
+    ) -> Result<Option<(Event, Vec<Event>)>, Error> {
+        if let Some(r) = self.get_event(&id)? {
+            Ok(Some((
+                r,
+                self.get_events_that_reference_to(Namespace::RefEvent, &id, limit)?,
+            )))
+        } else {
+            Ok(None)
+        }
     }
 
-    pub fn total_events(&self) -> usize {
-        self.db.len()
+    /*
+        pub fn subscribe_to<T: AsRef<[u8]>>(&self, prefix: Namespace, id: T) -> Subscriber {
+            self.db.watch_prefix(Self::get_key_with_prefix(prefix, id))
+        }
+    */
+
+    pub fn get_event<T: AsRef<[u8]>>(&self, id: T) -> Result<Option<Event>, Error> {
+        Ok(self
+            .db
+            .get_cf(&self.ns(Namespace::Events)?, id)?
+            .map(|event| serde_json::from_slice(&event))
+            .transpose()?)
     }
 }
 
@@ -218,30 +250,8 @@ mod test {
         io::{BufRead, BufReader},
     };
 
-    #[test]
-    fn store_and_get() {
-        let db = Db::with_custom_db(sled::Config::default().temporary(true).open().expect("db"))
-            .expect("db");
-        let json = "{\"content\":\"{\\\"lud06\\\":\\\"lnbc1p3a4wxvpp5x0pa6gr55fq5s9d3dxs0vz77mqxgdw63hhtgtlfz5zvm65847vnqdqqcqpjsp5402c8rtqxd4j97rnvuejuwl4sg473g6wg08d67fvn7qc4gtpkfks9q7sqqqqqqqqqqqqqqqqqqqsqqqqqysgqmqz9gxqyjw5qrzjqwryaup9lh50kkranzgcdnn2fgvx390wgj5jd07rwr3vxeje0glclleasn65surjcsqqqqlgqqqqqeqqjqyxj968tem9ps6ttm9ukv6ag4yc6qmgj2svrccfgp4n83fpktr3dsx6fq7grfzlqt982aaemahg9q29vzl9f627kh4j8h8xc2z2mtpdqqjlekah\\\",\\\"website\\\":\\\"\\\",\\\"nip05\\\":\\\"cesar@cesar.com.py\\\",\\\"picture\\\":\\\"https://pbs.twimg.com/profile_images/1175432935337537536/_Peu9vuJ_400x400.jpg\\\",\\\"display_name\\\":\\\"C\\\",\\\"about\\\":\\\"Rust and PHP\\\",\\\"name\\\":\\\"c\\\"}\",\"created_at\":1678476588,\"id\":\"3800c787a23288641c0b96cbcc87c26cbd3ea7bee53b7748422fdb100fb7b9f0\",\"kind\":0,\"pubkey\":\"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78\",\"sig\":\"c8a12ce96833e4cd67bce0e9e50f831262ef0f0c0cff5e56c38a0c90867ed1a6621e9692948ef5e85a7ca3726c3f0f43fa7e1992536bc457317123bca8784f5f\",\"tags\":[]}";
-
-        let event: Event = serde_json::from_str(json).expect("valid");
-        assert_eq!(true, db.store(&event).expect("valid"));
-        assert_eq!(false, db.store(&event).expect("valid"));
-
-        let event1 = db.get_event(&event.id);
-        assert_eq!(event1, Some(event));
-    }
-
     fn get_db() -> Db {
-        let db = Db::with_custom_db(
-            sled::Config::default()
-                .temporary(true)
-                .cache_capacity(10_000_000_000)
-                .flush_every_ms(Some(10_000))
-                .open()
-                .expect("db"),
-        )
-        .expect("db");
+        let db = Db::new(format!("tests/db/{}", nanoseconds())).expect("db");
 
         let file = File::open("./tests/events.json").expect("file");
         let events = BufReader::new(file)
@@ -256,6 +266,19 @@ mod test {
     }
 
     #[test]
+    fn store_and_get() {
+        let db = Db::new(format!("tests/db/{}", nanoseconds())).expect("db");
+        let json = "{\"content\":\"{\\\"lud06\\\":\\\"lnbc1p3a4wxvpp5x0pa6gr55fq5s9d3dxs0vz77mqxgdw63hhtgtlfz5zvm65847vnqdqqcqpjsp5402c8rtqxd4j97rnvuejuwl4sg473g6wg08d67fvn7qc4gtpkfks9q7sqqqqqqqqqqqqqqqqqqqsqqqqqysgqmqz9gxqyjw5qrzjqwryaup9lh50kkranzgcdnn2fgvx390wgj5jd07rwr3vxeje0glclleasn65surjcsqqqqlgqqqqqeqqjqyxj968tem9ps6ttm9ukv6ag4yc6qmgj2svrccfgp4n83fpktr3dsx6fq7grfzlqt982aaemahg9q29vzl9f627kh4j8h8xc2z2mtpdqqjlekah\\\",\\\"website\\\":\\\"\\\",\\\"nip05\\\":\\\"cesar@cesar.com.py\\\",\\\"picture\\\":\\\"https://pbs.twimg.com/profile_images/1175432935337537536/_Peu9vuJ_400x400.jpg\\\",\\\"display_name\\\":\\\"C\\\",\\\"about\\\":\\\"Rust and PHP\\\",\\\"name\\\":\\\"c\\\"}\",\"created_at\":1678476588,\"id\":\"3800c787a23288641c0b96cbcc87c26cbd3ea7bee53b7748422fdb100fb7b9f0\",\"kind\":0,\"pubkey\":\"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78\",\"sig\":\"c8a12ce96833e4cd67bce0e9e50f831262ef0f0c0cff5e56c38a0c90867ed1a6621e9692948ef5e85a7ca3726c3f0f43fa7e1992536bc457317123bca8784f5f\",\"tags\":[]}";
+
+        let event: Event = serde_json::from_str(json).expect("valid");
+        assert_eq!(true, db.store(&event).expect("valid"));
+        assert_eq!(false, db.store(&event).expect("valid"));
+
+        let event1 = db.get_event(&event.id).expect("something");
+        assert_eq!(event1, Some(event));
+    }
+
+    #[test]
     fn records_are_sorted_by_date_desc() {
         let db = get_db();
 
@@ -263,7 +286,7 @@ mod test {
             .try_into()
             .expect("pk");
 
-        let vec = db.get_events_by_author(pk, Some(10));
+        let vec = db.get_events_by_author(pk, Some(10)).expect("records");
         let dates = vec.iter().map(|e| e.created_at()).collect::<Vec<_>>();
         let mut sorted_dates = dates.clone();
         sorted_dates.sort_by(|a, b| b.cmp(a));
@@ -280,7 +303,7 @@ mod test {
             .try_into()
             .expect("pk");
 
-        let events = db.get_event_and_related_events(pk);
+        let events = db.get_event_and_related_events(pk, None).expect("events");
         assert!(events.is_some());
         let events = events.expect("events");
         assert_eq!(