Browse Source

Working on the storage layer

Cesar Rodas 1 year ago
parent
commit
c41d533ee2

+ 10 - 0
crates/storage/Cargo.toml

@@ -0,0 +1,10 @@
+[package]
+name = "nostr-rs-storage"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+serde_json = "1.0"
+nostr-rs-types = { path = "../types" }
+thiserror = "1.0.40"
+sled = { version = "0.34.7", features = ["compression"] }

+ 299 - 0
crates/storage/src/lib.rs

@@ -0,0 +1,299 @@
+use nostr_rs_types::types::{Event, Tag};
+use sled::{
+    transaction::{ConflictableTransactionError, TransactionError},
+    Subscriber,
+};
+use std::num::TryFromIntError;
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("Sled: {0}")]
+    Sled(#[from] sled::Error),
+    #[error("Serde: {0}")]
+    Serde(#[from] serde_json::Error),
+
+    #[error("Internal error: {0}")]
+    Internal(#[from] TryFromIntError),
+
+    #[error("Tx: {0}")]
+    Tx(String),
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub enum Prefix {
+    Author,
+    RefPublicKey,
+    RefEvent,
+}
+
+impl Into<&'static [u8]> for Prefix {
+    fn into(self) -> &'static [u8] {
+        match self {
+            Self::Author => b"a:",
+            Self::RefPublicKey => b"p:",
+            Self::RefEvent => b"t:",
+        }
+    }
+}
+
+pub struct Db {
+    pub db: sled::Db,
+}
+
+impl Db {
+    pub fn with_custom_db(db: sled::Db) -> Result<Self, Error> {
+        Ok(Self { db })
+    }
+    pub fn new(path: &str) -> Result<Self, Error> {
+        Self::with_custom_db(
+            sled::Config::default()
+                .cache_capacity(10_000_000_000)
+                .mode(sled::Mode::HighThroughput)
+                .flush_every_ms(Some(30_000))
+                .path(path)
+                .open()?,
+        )
+    }
+
+    #[inline]
+    fn get_key_with_prefix<T: AsRef<[u8]>>(prefix: Prefix, id: T) -> Vec<u8> {
+        let mut id_with_prefix: Vec<u8> = <Prefix as Into<&'static [u8]>>::into(prefix).to_vec();
+
+        id_with_prefix.extend_from_slice(id.as_ref());
+        id_with_prefix.push(b':');
+        id_with_prefix
+    }
+
+    /// Takes a vector of bytes and appends the current time in nanoseconds to
+    /// it, to make the ID unique enough and to make it sortable by time.
+    fn generate_unique_id<T: AsRef<[u8]>>(prefix: Prefix, id: T, order_by: u64) -> Vec<u8> {
+        let mut new_id = Self::get_key_with_prefix(prefix, id);
+
+        // We need to reverse the number, to sort it. Otherwise, the first is
+        // going to be the oldest entry, instead of the newest.
+        let reversed_order_by = u64::MAX
+            .checked_sub(order_by)
+            .unwrap_or_default()
+            .to_be_bytes();
+        new_id.extend_from_slice(&reversed_order_by);
+
+        let now = u128::MAX
+            .checked_sub(
+                std::time::SystemTime::now()
+                    .duration_since(std::time::UNIX_EPOCH)
+                    .unwrap()
+                    .as_nanos(),
+            )
+            .unwrap_or_default()
+            .to_be_bytes();
+
+        new_id.extend_from_slice(&now);
+        new_id
+    }
+
+    pub fn store(&self, event: &Event) -> Result<bool, Error> {
+        let event_id = event.id.clone();
+
+        if let Ok(Some(_)) = self.db.get(&*event_id) {
+            return Ok(false);
+        }
+        let created_at = event.created_at().timestamp().try_into()?;
+        let author_id = Self::generate_unique_id(Prefix::Author, event.author(), created_at);
+        let json = serde_json::to_vec(event)?;
+
+        let mut buffer = sled::Batch::default();
+        buffer.insert(&*event_id, json);
+        buffer.insert(author_id, &*event_id);
+
+        for tag in event.tags().iter() {
+            match tag {
+                Tag::PubKey(p) => {
+                    let foreign_id =
+                        Self::generate_unique_id(Prefix::RefPublicKey, &p.id, created_at);
+                    let local_id =
+                        Self::generate_unique_id(Prefix::RefEvent, &event_id, created_at);
+
+                    buffer.insert(foreign_id, &*event_id);
+                    buffer.insert(local_id, &*p.id);
+                }
+                Tag::Event(e) => {
+                    let foreign_id = Self::generate_unique_id(Prefix::RefEvent, &e.id, created_at);
+                    let local_id =
+                        Self::generate_unique_id(Prefix::RefEvent, &event_id, created_at);
+
+                    buffer.insert(foreign_id, &*event_id);
+                    buffer.insert(local_id, &*e.id);
+                }
+                _ => {}
+            }
+        }
+
+        self.db
+            .transaction(|tx| {
+                tx.apply_batch(&buffer)?;
+                Ok::<(), ConflictableTransactionError<()>>(())
+            })
+            .map_err(|e: TransactionError<_>| match e {
+                TransactionError::Abort(e) => Error::Tx(format!("{:?}", e)),
+                TransactionError::Storage(e) => Error::Sled(e),
+            })?;
+        Ok(true)
+    }
+
+    pub fn get_events_by_author<T: AsRef<[u8]>>(
+        &self,
+        public_key: T,
+        limit: Option<usize>,
+    ) -> Vec<Event> {
+        let prefix = Self::get_key_with_prefix(Prefix::Author, public_key);
+        let mut items = vec![];
+        let limit = limit.unwrap_or(usize::MAX);
+
+        for item in self.db.scan_prefix(prefix) {
+            if let Ok((_, value)) = item {
+                if let Some(value) = self.get_event(value) {
+                    items.push(value);
+                    if limit == items.len() {
+                        break;
+                    }
+                }
+            }
+        }
+
+        items
+    }
+
+    pub fn get_events_that_reference_to<T: AsRef<[u8]>>(
+        &self,
+        prefix: Prefix,
+        id: T,
+    ) -> Vec<Event> {
+        let prefix = Self::get_key_with_prefix(prefix, id);
+        let mut items = vec![];
+
+        for item in self.db.scan_prefix(prefix) {
+            if let Ok((_, value)) = item {
+                if let Some(value) = self.get_event(value) {
+                    items.push(value);
+                }
+            }
+        }
+
+        items
+    }
+
+    pub fn get_event_and_related_events<T: AsRef<[u8]>>(
+        &self,
+        id: T,
+    ) -> Option<(Event, Vec<Event>)> {
+        Some((
+            self.get_event(&id)?,
+            self.get_events_that_reference_to(Prefix::RefEvent, &id),
+        ))
+    }
+
+    pub fn subscribe_to<T: AsRef<[u8]>>(&self, prefix: Prefix, id: T) -> Subscriber {
+        self.db.watch_prefix(Self::get_key_with_prefix(prefix, id))
+    }
+
+    pub fn get_event<T: AsRef<[u8]>>(&self, id: T) -> Option<Event> {
+        self.db
+            .get(id)
+            .ok()?
+            .map(|event| serde_json::from_slice(&event))?
+            .ok()
+    }
+
+    pub fn total_events(&self) -> usize {
+        self.db.len()
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use nostr_rs_types::types::{Addr, Kind};
+    use std::{
+        fs::File,
+        io::{BufRead, BufReader},
+    };
+
+    #[test]
+    fn store_and_get() {
+        let db = Db::with_custom_db(sled::Config::default().temporary(true).open().expect("db"))
+            .expect("db");
+        let json = "{\"content\":\"{\\\"lud06\\\":\\\"lnbc1p3a4wxvpp5x0pa6gr55fq5s9d3dxs0vz77mqxgdw63hhtgtlfz5zvm65847vnqdqqcqpjsp5402c8rtqxd4j97rnvuejuwl4sg473g6wg08d67fvn7qc4gtpkfks9q7sqqqqqqqqqqqqqqqqqqqsqqqqqysgqmqz9gxqyjw5qrzjqwryaup9lh50kkranzgcdnn2fgvx390wgj5jd07rwr3vxeje0glclleasn65surjcsqqqqlgqqqqqeqqjqyxj968tem9ps6ttm9ukv6ag4yc6qmgj2svrccfgp4n83fpktr3dsx6fq7grfzlqt982aaemahg9q29vzl9f627kh4j8h8xc2z2mtpdqqjlekah\\\",\\\"website\\\":\\\"\\\",\\\"nip05\\\":\\\"cesar@cesar.com.py\\\",\\\"picture\\\":\\\"https://pbs.twimg.com/profile_images/1175432935337537536/_Peu9vuJ_400x400.jpg\\\",\\\"display_name\\\":\\\"C\\\",\\\"about\\\":\\\"Rust and PHP\\\",\\\"name\\\":\\\"c\\\"}\",\"created_at\":1678476588,\"id\":\"3800c787a23288641c0b96cbcc87c26cbd3ea7bee53b7748422fdb100fb7b9f0\",\"kind\":0,\"pubkey\":\"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78\",\"sig\":\"c8a12ce96833e4cd67bce0e9e50f831262ef0f0c0cff5e56c38a0c90867ed1a6621e9692948ef5e85a7ca3726c3f0f43fa7e1992536bc457317123bca8784f5f\",\"tags\":[]}";
+
+        let event: Event = serde_json::from_str(json).expect("valid");
+        assert_eq!(true, db.store(&event).expect("valid"));
+        assert_eq!(false, db.store(&event).expect("valid"));
+
+        let event1 = db.get_event(&event.id);
+        assert_eq!(event1, Some(event));
+    }
+
+    fn get_db() -> Db {
+        let db = Db::with_custom_db(
+            sled::Config::default()
+                .temporary(true)
+                .cache_capacity(10_000_000_000)
+                .flush_every_ms(Some(10_000))
+                .open()
+                .expect("db"),
+        )
+        .expect("db");
+
+        let file = File::open("./tests/events.json").expect("file");
+        let events = BufReader::new(file)
+            .lines()
+            .map(|line| serde_json::from_str(&line.expect("line")).expect("valid"))
+            .collect::<Vec<Event>>();
+
+        for event in events {
+            assert!(db.store(&event).expect("valid"));
+        }
+        db
+    }
+
+    #[test]
+    fn records_are_sorted_by_date_desc() {
+        let db = get_db();
+
+        let pk: Addr = "7bdef7be22dd8e59f4600e044aa53a1cf975a9dc7d27df5833bc77db784a5805"
+            .try_into()
+            .expect("pk");
+
+        let vec = db.get_events_by_author(pk, Some(10));
+        let dates = vec.iter().map(|e| e.created_at()).collect::<Vec<_>>();
+        let mut sorted_dates = dates.clone();
+        sorted_dates.sort_by(|a, b| b.cmp(a));
+
+        assert_eq!(vec.len(), 10);
+        assert_eq!(dates, sorted_dates);
+    }
+
+    #[test]
+    fn get_event_and_related_events() {
+        let db = get_db();
+
+        let pk: Addr = "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
+            .try_into()
+            .expect("pk");
+
+        let events = db.get_event_and_related_events(pk);
+        assert!(events.is_some());
+        let events = events.expect("events");
+        assert_eq!(
+            events.0.id.to_string(),
+            "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
+        );
+        assert_eq!(events.1.len(), 2_538);
+
+        let mut kinds = events.1.iter().map(|x| x.kind()).collect::<Vec<_>>();
+        kinds.sort();
+        kinds.dedup();
+
+        assert_eq!(Kind::Reaction, kinds[0]);
+        assert_eq!(Kind::Unknown(42), kinds[1]);
+    }
+}

File diff suppressed because it is too large
+ 138 - 0
crates/storage/tests/events.json


+ 10 - 0
crates/types/src/types/event.rs

@@ -170,6 +170,16 @@ impl Event {
         })
     }
 
+    /// Returns the kind of this event
+    pub fn kind(&self) -> Kind {
+        self.inner.kind
+    }
+
+    /// Returns the date when this event was created
+    pub fn created_at(&self) -> DateTime<Utc> {
+        self.inner.created_at
+    }
+
     /// Returns the event author
     pub fn author(&self) -> &Id {
         &self.inner.public_key

+ 9 - 1
crates/types/src/types/kind.rs

@@ -13,7 +13,7 @@ use serde::{
 /// Any unsupported Kind will be wrapped under the Unknown type
 ///
 /// The Kind is represented as a u32 on the wire
-#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+#[derive(Debug, PartialEq, PartialOrd, Eq, Clone, Copy)]
 pub enum Kind {
     /// Metadata
     ///
@@ -50,6 +50,14 @@ pub enum Kind {
     Unknown(u32),
 }
 
+impl Ord for Kind {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        let kind_id: u32 = (*self).into();
+        let other_kind_id: u32 = (*other).into();
+        kind_id.cmp(&other_kind_id)
+    }
+}
+
 impl Kind {
     /// Is the Kind replaceable according to NIP-16
     pub fn is_replaceable(&self) -> bool {

Some files were not shown because too many files changed in this diff