Quellcode durchsuchen

Working on a better storage

Cesar Rodas vor 1 Jahr
Ursprung
Commit
c001ff499c
3 geänderte Dateien mit 577 neuen und 322 gelöschten Zeilen
  1. 26 0
      crates/storage/src/error.rs
  2. 13 322
      crates/storage/src/lib.rs
  3. 538 0
      crates/storage/src/rocksdb.rs

+ 26 - 0
crates/storage/src/error.rs

@@ -0,0 +1,26 @@
+//! Errors for this crate
+use std::num::TryFromIntError;
+
+/// Error
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    /// Internal database error
+    #[error("RocksDB: {0}")]
+    RocksDb(#[from] rocksdb::Error),
+
+    /// Serialization error
+    #[error("Serde: {0}")]
+    Serde(#[from] serde_json::Error),
+
+    /// Internal error while converting types to integer
+    #[error("Internal error: {0}")]
+    Internal(#[from] TryFromIntError),
+
+    /// Transaction error
+    #[error("Tx: {0}")]
+    Tx(String),
+
+    /// Internal error
+    #[error("Unknown family column")]
+    InvalidColumnFamily,
+}

+ 13 - 322
crates/storage/src/lib.rs

@@ -1,322 +1,13 @@
-use nostr_rs_types::types::{Event, Tag};
-use rocksdb::{BoundColumnFamily, ColumnFamilyDescriptor, Options, SliceTransform, WriteBatch, DB};
-use std::{num::TryFromIntError, path::Path, sync::Arc};
-
-#[derive(Debug, thiserror::Error)]
-pub enum Error {
-    #[error("DB: {0}")]
-    DB(#[from] rocksdb::Error),
-
-    #[error("Serde: {0}")]
-    Serde(#[from] serde_json::Error),
-
-    #[error("Internal error: {0}")]
-    Internal(#[from] TryFromIntError),
-
-    #[error("Tx: {0}")]
-    Tx(String),
-
-    #[error("Unknown family column")]
-    InvalidColumnFamily,
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
-pub enum Namespace {
-    Events,
-    Author,
-    RefPublicKey,
-    RefEvent,
-}
-
-impl Namespace {
-    #[inline]
-    pub fn as_str(&self) -> &'static str {
-        match self {
-            Self::Events => "events",
-            Self::Author => "authors",
-            Self::RefPublicKey => "refs_by_public_key",
-            Self::RefEvent => "refs_by_ids",
-        }
-    }
-}
-
-pub struct Db {
-    db: DB,
-}
-
-fn nanoseconds() -> u128 {
-    std::time::SystemTime::now()
-        .duration_since(std::time::UNIX_EPOCH)
-        .unwrap()
-        .as_nanos()
-}
-
-impl Db {
-    pub fn with_custom_db<T: AsRef<Path>>(options: Options, path: T) -> Result<Self, Error> {
-        let db = DB::open_cf_descriptors(
-            &options,
-            path,
-            vec![
-                ColumnFamilyDescriptor::new(Namespace::Events.as_str(), options.clone()),
-                ColumnFamilyDescriptor::new(Namespace::Author.as_str(), options.clone()),
-                ColumnFamilyDescriptor::new(Namespace::RefEvent.as_str(), options.clone()),
-                ColumnFamilyDescriptor::new(Namespace::RefPublicKey.as_str(), options.clone()),
-            ],
-        )?;
-        Ok(Self { db })
-    }
-    pub fn new<T: AsRef<Path>>(path: T) -> Result<Self, Error> {
-        let mut options = Options::default();
-        options.create_if_missing(true);
-        options.create_missing_column_families(true);
-        options.set_compression_type(rocksdb::DBCompressionType::Snappy);
-        options.optimize_for_point_lookup(256 * 1024 * 1024);
-
-        let prefix_extractor = SliceTransform::create_fixed_prefix(32);
-        options.set_prefix_extractor(prefix_extractor);
-
-        Self::with_custom_db(options, path)
-    }
-
-    /// Takes a vector of bytes and appends the current time in nanoseconds to
-    /// it, to make the ID unique enough and to make it sortable by time.
-    #[inline]
-    fn generate_unique_id<T: AsRef<[u8]>>(id: T, order_by: u64) -> Vec<u8> {
-        let mut new_id = id.as_ref().to_vec();
-
-        // We need to reverse the number, to sort it. Otherwise, the first is
-        // going to be the oldest entry, instead of the newest.
-        let reversed_order_by = u64::MAX
-            .checked_sub(order_by)
-            .unwrap_or_default()
-            .to_be_bytes();
-        new_id.extend_from_slice(&reversed_order_by);
-
-        let now = u128::MAX
-            .checked_sub(nanoseconds())
-            .unwrap_or_default()
-            .to_be_bytes();
-
-        new_id.extend_from_slice(&now);
-        new_id
-    }
-
-    fn ns(&self, namespace: Namespace) -> Result<Arc<BoundColumnFamily>, Error> {
-        self.db
-            .cf_handle(namespace.as_str())
-            .ok_or(Error::InvalidColumnFamily)
-    }
-
-    pub fn store(&self, event: &Event) -> Result<bool, Error> {
-        let event_id = event.id.clone();
-
-        if let Ok(Some(_)) = self.db.get_cf(&self.ns(Namespace::Events)?, *event_id) {
-            return Ok(false);
-        }
-        let created_at = event.created_at().timestamp().try_into()?;
-        let author_id = Self::generate_unique_id(event.author(), created_at);
-        let json = serde_json::to_vec(event)?;
-
-        let mut buffer = WriteBatch::default();
-
-        buffer.put_cf(&self.ns(Namespace::Events)?, *event_id, json);
-        buffer.put_cf(&self.ns(Namespace::Author)?, author_id, *event_id);
-
-        for tag in event.tags().iter() {
-            match tag {
-                Tag::PubKey(p) => {
-                    let foreign_id = Self::generate_unique_id(&p.id, created_at);
-                    let local_id = Self::generate_unique_id(&event_id, created_at);
-
-                    buffer.put_cf(&self.ns(Namespace::RefPublicKey)?, foreign_id, *event_id);
-                    buffer.put_cf(&self.ns(Namespace::RefEvent)?, local_id, &*p.id);
-                }
-                Tag::Event(e) => {
-                    let foreign_id = Self::generate_unique_id(&e.id, created_at);
-                    let local_id = Self::generate_unique_id(&event_id, created_at);
-
-                    buffer.put_cf(&self.ns(Namespace::RefEvent)?, foreign_id, *event_id);
-                    buffer.put_cf(&self.ns(Namespace::RefEvent)?, local_id, &*e.id);
-                }
-                _ => {}
-            }
-        }
-
-        self.db.write(buffer).map_err(Error::DB)?;
-
-        Ok(true)
-    }
-
-    pub fn get_events_by_author<T: AsRef<[u8]>>(
-        &self,
-        public_key: T,
-        limit: Option<usize>,
-    ) -> Result<Vec<Event>, Error> {
-        let mut items = vec![];
-        let limit = limit.unwrap_or(usize::MAX);
-        let prefix = public_key.as_ref();
-
-        for item in self
-            .db
-            .prefix_iterator_cf(&self.ns(Namespace::Author)?, prefix)
-        {
-            let (key, value) = match item {
-                Ok((k, v)) => (k, v),
-                Err(e) => return Err(Error::DB(e)),
-            };
-
-            if !key.starts_with(prefix) {
-                break;
-            }
-
-            if let Some(value) = self.get_event(value)? {
-                items.push(value);
-                if limit == items.len() {
-                    break;
-                }
-            }
-        }
-
-        Ok(items)
-    }
-
-    pub fn get_events_that_reference_to<T: AsRef<[u8]>>(
-        &self,
-        ns: Namespace,
-        id: T,
-        limit: Option<usize>,
-    ) -> Result<Vec<Event>, Error> {
-        let mut items = vec![];
-        let limit = limit.unwrap_or(usize::MAX);
-        let prefix = id.as_ref();
-
-        for item in self.db.prefix_iterator_cf(&self.ns(ns)?, prefix) {
-            let (key, value) = match item {
-                Ok((k, v)) => (k, v),
-                Err(e) => return Err(Error::DB(e)),
-            };
-
-            if !key.starts_with(prefix) {
-                break;
-            }
-
-            if let Some(value) = self.get_event(value)? {
-                items.push(value);
-                if limit == items.len() {
-                    break;
-                }
-            }
-        }
-
-        Ok(items)
-    }
-
-    pub fn get_event_and_related_events<T: AsRef<[u8]>>(
-        &self,
-        id: T,
-        limit: Option<usize>,
-    ) -> Result<Option<(Event, Vec<Event>)>, Error> {
-        if let Some(r) = self.get_event(&id)? {
-            Ok(Some((
-                r,
-                self.get_events_that_reference_to(Namespace::RefEvent, &id, limit)?,
-            )))
-        } else {
-            Ok(None)
-        }
-    }
-
-    /*
-        pub fn subscribe_to<T: AsRef<[u8]>>(&self, prefix: Namespace, id: T) -> Subscriber {
-            self.db.watch_prefix(Self::get_key_with_prefix(prefix, id))
-        }
-    */
-
-    pub fn get_event<T: AsRef<[u8]>>(&self, id: T) -> Result<Option<Event>, Error> {
-        Ok(self
-            .db
-            .get_cf(&self.ns(Namespace::Events)?, id)?
-            .map(|event| serde_json::from_slice(&event))
-            .transpose()?)
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use super::*;
-    use nostr_rs_types::types::{Addr, Kind};
-    use std::{
-        fs::File,
-        io::{BufRead, BufReader},
-    };
-
-    fn get_db() -> Db {
-        let db = Db::new(format!("tests/db/{}", nanoseconds())).expect("db");
-
-        let file = File::open("./tests/events.json").expect("file");
-        let events = BufReader::new(file)
-            .lines()
-            .map(|line| serde_json::from_str(&line.expect("line")).expect("valid"))
-            .collect::<Vec<Event>>();
-
-        for event in events {
-            assert!(db.store(&event).expect("valid"));
-        }
-        db
-    }
-
-    #[test]
-    fn store_and_get() {
-        let db = Db::new(format!("tests/db/{}", nanoseconds())).expect("db");
-        let json = "{\"content\":\"{\\\"lud06\\\":\\\"lnbc1p3a4wxvpp5x0pa6gr55fq5s9d3dxs0vz77mqxgdw63hhtgtlfz5zvm65847vnqdqqcqpjsp5402c8rtqxd4j97rnvuejuwl4sg473g6wg08d67fvn7qc4gtpkfks9q7sqqqqqqqqqqqqqqqqqqqsqqqqqysgqmqz9gxqyjw5qrzjqwryaup9lh50kkranzgcdnn2fgvx390wgj5jd07rwr3vxeje0glclleasn65surjcsqqqqlgqqqqqeqqjqyxj968tem9ps6ttm9ukv6ag4yc6qmgj2svrccfgp4n83fpktr3dsx6fq7grfzlqt982aaemahg9q29vzl9f627kh4j8h8xc2z2mtpdqqjlekah\\\",\\\"website\\\":\\\"\\\",\\\"nip05\\\":\\\"cesar@cesar.com.py\\\",\\\"picture\\\":\\\"https://pbs.twimg.com/profile_images/1175432935337537536/_Peu9vuJ_400x400.jpg\\\",\\\"display_name\\\":\\\"C\\\",\\\"about\\\":\\\"Rust and PHP\\\",\\\"name\\\":\\\"c\\\"}\",\"created_at\":1678476588,\"id\":\"3800c787a23288641c0b96cbcc87c26cbd3ea7bee53b7748422fdb100fb7b9f0\",\"kind\":0,\"pubkey\":\"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78\",\"sig\":\"c8a12ce96833e4cd67bce0e9e50f831262ef0f0c0cff5e56c38a0c90867ed1a6621e9692948ef5e85a7ca3726c3f0f43fa7e1992536bc457317123bca8784f5f\",\"tags\":[]}";
-
-        let event: Event = serde_json::from_str(json).expect("valid");
-        assert_eq!(true, db.store(&event).expect("valid"));
-        assert_eq!(false, db.store(&event).expect("valid"));
-
-        let event1 = db.get_event(&event.id).expect("something");
-        assert_eq!(event1, Some(event));
-    }
-
-    #[test]
-    fn records_are_sorted_by_date_desc() {
-        let db = get_db();
-
-        let pk: Addr = "7bdef7be22dd8e59f4600e044aa53a1cf975a9dc7d27df5833bc77db784a5805"
-            .try_into()
-            .expect("pk");
-
-        let vec = db.get_events_by_author(pk, Some(10)).expect("records");
-        let dates = vec.iter().map(|e| e.created_at()).collect::<Vec<_>>();
-        let mut sorted_dates = dates.clone();
-        sorted_dates.sort_by(|a, b| b.cmp(a));
-
-        assert_eq!(vec.len(), 10);
-        assert_eq!(dates, sorted_dates);
-    }
-
-    #[test]
-    fn get_event_and_related_events() {
-        let db = get_db();
-
-        let pk: Addr = "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
-            .try_into()
-            .expect("pk");
-
-        let events = db.get_event_and_related_events(pk, None).expect("events");
-        assert!(events.is_some());
-        let events = events.expect("events");
-        assert_eq!(
-            events.0.id.to_string(),
-            "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
-        );
-        assert_eq!(events.1.len(), 2_538);
-
-        let mut kinds = events.1.iter().map(|x| x.kind()).collect::<Vec<_>>();
-        kinds.sort();
-        kinds.dedup();
-
-        assert_eq!(Kind::Reaction, kinds[0]);
-        assert_eq!(Kind::Unknown(42), kinds[1]);
-    }
-}
+//! # Nostr Storage
+//!
+//! This crate will storage events into a database. It will also build index to
+//! find events by their tags, kind and references.
+//!
+//! Although the trait is generic this implementation uses rocksDB internal,
+//! which is a C++ database open source by Facebook. It is high level enough to
+//! let this crate build the needed indexes to find events quickly.
+#![deny(missing_docs, warnings)]
+mod error;
+mod rocksdb;
+
+pub use crate::{error::Error, rocksdb::RocksDb};

+ 538 - 0
crates/storage/src/rocksdb.rs

@@ -0,0 +1,538 @@
+//! Rocks DB implementation of the storage layer
+use crate::Error;
+use nostr_rs_types::types::{Event, Filter, Tag};
+use rocksdb::{BoundColumnFamily, ColumnFamilyDescriptor, Options, SliceTransform, WriteBatch, DB};
+use std::{collections::HashSet, path::Path, sync::Arc};
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+enum ReferenceType {
+    Events,
+    Author,
+    RefPublicKey,
+    RefEvent,
+    Kind,
+}
+
+impl ReferenceType {
+    #[inline]
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            Self::Events => "events",
+            Self::Author => "authors",
+            Self::RefPublicKey => "refs_by_public_key",
+            Self::RefEvent => "refs_by_ids",
+            Self::Kind => "kinds",
+        }
+    }
+}
+
+/// Storage implementation using RocksDB
+pub struct RocksDb {
+    db: DB,
+}
+
+fn nanoseconds() -> u128 {
+    std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .unwrap()
+        .as_nanos()
+}
+
+impl RocksDb {
+    /// Creates a new instance passing rocksDb options and a path
+    pub fn with_custom_db<T: AsRef<Path>>(options: Options, path: T) -> Result<Self, Error> {
+        let db = DB::open_cf_descriptors(
+            &options,
+            path,
+            vec![
+                ColumnFamilyDescriptor::new(ReferenceType::Events.as_str(), options.clone()),
+                ColumnFamilyDescriptor::new(ReferenceType::Author.as_str(), options.clone()),
+                ColumnFamilyDescriptor::new(ReferenceType::RefEvent.as_str(), options.clone()),
+                ColumnFamilyDescriptor::new(ReferenceType::RefPublicKey.as_str(), options.clone()),
+                ColumnFamilyDescriptor::new(ReferenceType::Kind.as_str(), options.clone()),
+            ],
+        )?;
+        Ok(Self { db })
+    }
+
+    /// Creates a new instance, passing just the path, the default options are passed
+    pub fn new<T: AsRef<Path>>(path: T) -> Result<Self, Error> {
+        let mut options = Options::default();
+        options.create_if_missing(true);
+        options.create_missing_column_families(true);
+        options.set_compression_type(rocksdb::DBCompressionType::Snappy);
+        options.optimize_for_point_lookup(256 * 1024 * 1024);
+
+        let prefix_extractor = SliceTransform::create_fixed_prefix(32);
+        options.set_prefix_extractor(prefix_extractor);
+
+        Self::with_custom_db(options, path)
+    }
+
+    /// Takes a vector of bytes and appends the current time in nanoseconds to
+    /// it, to make the ID unique enough and to make it sortable by time.
+    #[inline]
+    fn generate_unique_id<T: AsRef<[u8]>>(id: T, order_by: u64) -> Vec<u8> {
+        let mut new_id = id.as_ref().to_vec();
+
+        // We need to reverse the number, to sort it. Otherwise, the first is
+        // going to be the oldest entry, instead of the newest.
+        let reversed_order_by = u64::MAX
+            .checked_sub(order_by)
+            .unwrap_or_default()
+            .to_be_bytes();
+        new_id.extend_from_slice(&reversed_order_by);
+
+        let now = u128::MAX
+            .checked_sub(nanoseconds())
+            .unwrap_or_default()
+            .to_be_bytes();
+
+        new_id.extend_from_slice(&now);
+        new_id
+    }
+
+    #[inline]
+    fn reference_to_cf_handle(
+        &self,
+        namespace: ReferenceType,
+    ) -> Result<Arc<BoundColumnFamily>, Error> {
+        self.db
+            .cf_handle(namespace.as_str())
+            .ok_or(Error::InvalidColumnFamily)
+    }
+
+    /// Stores a new event into the database. This action will also creates all
+    /// the needed indexes to find this event later by reference, author, kind or tag.
+    pub fn store(&self, event: &Event) -> Result<bool, Error> {
+        let event_id = event.id.clone();
+
+        if let Ok(Some(_)) = self.db.get_cf(
+            &self.reference_to_cf_handle(ReferenceType::Events)?,
+            *event_id,
+        ) {
+            return Ok(false);
+        }
+        let created_at = event.created_at().timestamp().try_into()?;
+        let author_id = Self::generate_unique_id(event.author(), created_at);
+        let json = serde_json::to_vec(event)?;
+        let kind: u32 = event.kind().into();
+        let kind_id = Self::generate_unique_id(kind.to_be_bytes(), created_at);
+
+        let mut buffer = WriteBatch::default();
+
+        buffer.put_cf(
+            &self.reference_to_cf_handle(ReferenceType::Events)?,
+            *event_id,
+            json,
+        );
+        buffer.put_cf(
+            &self.reference_to_cf_handle(ReferenceType::Author)?,
+            author_id,
+            *event_id,
+        );
+        buffer.put_cf(
+            &self.reference_to_cf_handle(ReferenceType::Kind)?,
+            kind_id,
+            *event_id,
+        );
+
+        for tag in event.tags().iter() {
+            match tag {
+                Tag::PubKey(p) => {
+                    let foreign_id = Self::generate_unique_id(&p.id, created_at);
+                    let local_id = Self::generate_unique_id(&event_id, created_at);
+
+                    buffer.put_cf(
+                        &self.reference_to_cf_handle(ReferenceType::RefPublicKey)?,
+                        foreign_id,
+                        *event_id,
+                    );
+                    buffer.put_cf(
+                        &self.reference_to_cf_handle(ReferenceType::RefEvent)?,
+                        local_id,
+                        &*p.id,
+                    );
+                }
+                Tag::Event(e) => {
+                    let foreign_id = Self::generate_unique_id(&e.id, created_at);
+                    let local_id = Self::generate_unique_id(&event_id, created_at);
+
+                    buffer.put_cf(
+                        &self.reference_to_cf_handle(ReferenceType::RefEvent)?,
+                        foreign_id,
+                        *event_id,
+                    );
+                    buffer.put_cf(
+                        &self.reference_to_cf_handle(ReferenceType::RefEvent)?,
+                        local_id,
+                        &*e.id,
+                    );
+                }
+                _ => {}
+            }
+        }
+
+        self.db.write(buffer).map_err(Error::RocksDb)?;
+
+        Ok(true)
+    }
+
+    /// Receives a list of event-IDs, and returns a list of events that match
+    /// the given filter.
+    ///
+    /// The list of event-IDs should come from an index from any namespace, and
+    /// the filter will load the events from the `Events` namespace and will
+    /// filter them by the given parameters.
+    fn load_ids_and_filter(
+        &self,
+        mut query: Filter,
+        mut event_ids: HashSet<Vec<u8>>,
+    ) -> Result<Vec<Event>, Error> {
+        let mut events = vec![];
+
+        if !query.ids.is_empty() {
+            let mut ids = query.ids.iter().map(|id| id.as_ref()).collect::<Vec<_>>();
+            ids.sort();
+            event_ids.retain(|id| ids.binary_search(&id.as_slice()).is_ok());
+        }
+
+        let mut authors = query
+            .authors
+            .iter()
+            .map(|id| id.as_ref())
+            .collect::<Vec<_>>();
+
+        authors.sort();
+        query.kinds.sort();
+
+        for id in event_ids.iter() {
+            if let Some(event) = self.get_event(id)? {
+                if !authors.is_empty() && authors.binary_search(&event.author().as_ref()).is_err() {
+                    continue;
+                }
+                if !query.kinds.is_empty() && query.kinds.binary_search(&event.kind()).is_err() {
+                    continue;
+                }
+                if let Some(since) = query.since {
+                    if event.created_at() < since {
+                        continue;
+                    }
+                }
+                if let Some(until) = query.until {
+                    if event.created_at() > until {
+                        continue;
+                    }
+                }
+                events.push(event);
+                if events.len() as u64 == query.limit {
+                    break;
+                }
+            }
+        }
+
+        Ok(events)
+    }
+
+    /// Get events from the database with a given filter
+    pub fn get_events_by_filter(&self, mut query: Filter) -> Result<Vec<Event>, Error> {
+        let mut event_ids = HashSet::new();
+        let limit = if query.limit == 0 {
+            None
+        } else {
+            Some(query.limit.try_into()?)
+        };
+
+        if !query.ids.is_empty() {
+            query
+                .ids
+                .iter()
+                .map(|id| event_ids.insert(id.as_ref().to_vec()))
+                .for_each(drop);
+            query.ids.clear();
+        } else if !query.authors.is_empty() {
+            let ns = self.reference_to_cf_handle(ReferenceType::Author)?;
+            for public_key in query.authors.iter() {
+                event_ids.extend(self.get_event_ids_by_ns_and_prefix(
+                    &ns,
+                    public_key.as_ref(),
+                    limit,
+                )?);
+            }
+            query.authors.clear();
+        } else if !query.kinds.is_empty() {
+            let ns = self.reference_to_cf_handle(ReferenceType::Kind)?;
+            for kind in query.kinds.iter() {
+                let kind: u32 = (*kind).into();
+                event_ids.extend(self.get_event_ids_by_ns_and_prefix(
+                    &ns,
+                    &kind.to_be_bytes(),
+                    limit,
+                )?);
+            }
+            println!("{:?} {}", event_ids, event_ids.len());
+            query.kinds.clear();
+        }
+
+        self.load_ids_and_filter(query, event_ids)
+    }
+
+    /// Similar function to get_events_by_author but instead of loading the
+    /// whole event, only the event id is loaded. This is done for performance,
+    /// to avoid loading the same event multiple times, also to only load from
+    /// the database the events that are needed (according to limits and other
+    /// constraints)
+    fn get_event_ids_by_ns_and_prefix(
+        &self,
+        cf_handle: &Arc<BoundColumnFamily>,
+        prefix: &[u8],
+        limit: Option<usize>,
+    ) -> Result<HashSet<Vec<u8>>, Error> {
+        let mut items = HashSet::new();
+        let limit = limit.unwrap_or(usize::MAX);
+
+        println!("F: {:?}", prefix);
+        for item in self.db.prefix_iterator_cf(cf_handle, prefix) {
+            println!("fix");
+            let (key, value) = match item {
+                Ok((k, v)) => (k, v),
+                Err(e) => return Err(Error::RocksDb(e)),
+            };
+
+            if !key.starts_with(prefix) {
+                break;
+            }
+
+            items.insert(value.to_vec());
+            if items.len() >= limit {
+                break;
+            }
+        }
+
+        Ok(items)
+    }
+
+    /// Get events by author
+    pub fn get_events_by_author<T: AsRef<[u8]>>(
+        &self,
+        public_key: T,
+        limit: Option<usize>,
+    ) -> Result<Vec<Event>, Error> {
+        let mut events: Vec<Event> = self
+            .get_event_ids_by_ns_and_prefix(
+                &self.reference_to_cf_handle(ReferenceType::Author)?,
+                public_key.as_ref(),
+                limit,
+            )?
+            .iter()
+            .map(|id| self.get_event(id))
+            .collect::<Result<Vec<Option<_>>, _>>()?
+            .into_iter()
+            .flatten()
+            .collect();
+
+        events.sort_by_key(|b| std::cmp::Reverse(b.created_at()));
+        Ok(events)
+    }
+
+    /// Loads all events that references a given event. The reference-type is provided and it can be any value of `ReferenceType`.
+    fn get_event_referenced_as<T: AsRef<[u8]>>(
+        &self,
+        ref_type: ReferenceType,
+        id: T,
+        limit: Option<usize>,
+    ) -> Result<Vec<Event>, Error> {
+        let mut items = vec![];
+        let limit = limit.unwrap_or(usize::MAX);
+        let prefix = id.as_ref();
+
+        for item in self
+            .db
+            .prefix_iterator_cf(&self.reference_to_cf_handle(ref_type)?, prefix)
+        {
+            let (key, value) = match item {
+                Ok((k, v)) => (k, v),
+                Err(e) => return Err(Error::RocksDb(e)),
+            };
+
+            if !key.starts_with(prefix) {
+                break;
+            }
+
+            if let Some(value) = self.get_event(value)? {
+                items.push(value);
+                if limit == items.len() {
+                    break;
+                }
+            }
+        }
+
+        Ok(items)
+    }
+
+    /// Loads a given event and all their related events (likes, reposts, etc)
+    pub fn get_event_and_related_events<T: AsRef<[u8]>>(
+        &self,
+        id: T,
+        limit: Option<usize>,
+    ) -> Result<Option<(Event, Vec<Event>)>, Error> {
+        if let Some(r) = self.get_event(&id)? {
+            Ok(Some((
+                r,
+                self.get_event_referenced_as(ReferenceType::RefEvent, &id, limit)?,
+            )))
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// Returns an event by its ID
+    pub fn get_event<T: AsRef<[u8]>>(&self, id: T) -> Result<Option<Event>, Error> {
+        Ok(self
+            .db
+            .get_cf(&self.reference_to_cf_handle(ReferenceType::Events)?, id)?
+            .map(|event| serde_json::from_slice(&event))
+            .transpose()?)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use nostr_rs_types::types::{Addr, Kind};
+    use std::{
+        fs::File,
+        io::{BufRead, BufReader},
+    };
+
+    fn get_db() -> RocksDb {
+        let db = RocksDb::new(format!("tests/db/{}", nanoseconds())).expect("db");
+
+        let file = File::open("./tests/events.json").expect("file");
+        let events = BufReader::new(file)
+            .lines()
+            .map(|line| serde_json::from_str(&line.expect("line")).expect("valid"))
+            .collect::<Vec<Event>>();
+
+        for event in events {
+            assert!(db.store(&event).expect("valid"));
+        }
+        db
+    }
+
+    #[test]
+    fn store_and_get() {
+        let db = RocksDb::new(format!("tests/db/{}", nanoseconds())).expect("db");
+        let json = "{\"content\":\"{\\\"lud06\\\":\\\"lnbc1p3a4wxvpp5x0pa6gr55fq5s9d3dxs0vz77mqxgdw63hhtgtlfz5zvm65847vnqdqqcqpjsp5402c8rtqxd4j97rnvuejuwl4sg473g6wg08d67fvn7qc4gtpkfks9q7sqqqqqqqqqqqqqqqqqqqsqqqqqysgqmqz9gxqyjw5qrzjqwryaup9lh50kkranzgcdnn2fgvx390wgj5jd07rwr3vxeje0glclleasn65surjcsqqqqlgqqqqqeqqjqyxj968tem9ps6ttm9ukv6ag4yc6qmgj2svrccfgp4n83fpktr3dsx6fq7grfzlqt982aaemahg9q29vzl9f627kh4j8h8xc2z2mtpdqqjlekah\\\",\\\"website\\\":\\\"\\\",\\\"nip05\\\":\\\"cesar@cesar.com.py\\\",\\\"picture\\\":\\\"https://pbs.twimg.com/profile_images/1175432935337537536/_Peu9vuJ_400x400.jpg\\\",\\\"display_name\\\":\\\"C\\\",\\\"about\\\":\\\"Rust and PHP\\\",\\\"name\\\":\\\"c\\\"}\",\"created_at\":1678476588,\"id\":\"3800c787a23288641c0b96cbcc87c26cbd3ea7bee53b7748422fdb100fb7b9f0\",\"kind\":0,\"pubkey\":\"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78\",\"sig\":\"c8a12ce96833e4cd67bce0e9e50f831262ef0f0c0cff5e56c38a0c90867ed1a6621e9692948ef5e85a7ca3726c3f0f43fa7e1992536bc457317123bca8784f5f\",\"tags\":[]}";
+
+        let event: Event = serde_json::from_str(json).expect("valid");
+        assert_eq!(true, db.store(&event).expect("valid"));
+        assert_eq!(false, db.store(&event).expect("valid"));
+
+        let event1 = db.get_event(&event.id).expect("something");
+        assert_eq!(event1, Some(event));
+    }
+
+    #[test]
+    fn records_are_sorted_by_date_desc() {
+        let db = get_db();
+
+        let pk: Addr = "7bdef7be22dd8e59f4600e044aa53a1cf975a9dc7d27df5833bc77db784a5805"
+            .try_into()
+            .expect("pk");
+
+        let vec = db.get_events_by_author(pk, Some(10)).expect("records");
+        let dates = vec.iter().map(|e| e.created_at()).collect::<Vec<_>>();
+        let mut sorted_dates = dates.clone();
+        sorted_dates.sort_by(|a, b| b.cmp(a));
+
+        assert_eq!(vec.len(), 10);
+        assert_eq!(dates, sorted_dates);
+    }
+
+    #[test]
+    fn get_event_and_related_events() {
+        let db = get_db();
+
+        let id: Addr = "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
+            .try_into()
+            .expect("pk");
+
+        let events = db.get_event_and_related_events(id, None).expect("events");
+        assert!(events.is_some());
+        let events = events.expect("events");
+        assert_eq!(
+            events.0.id.to_string(),
+            "42224859763652914db53052103f0b744df79dfc4efef7e950fc0802fc3df3c5"
+        );
+        assert_eq!(events.1.len(), 2_538);
+
+        let mut kinds = events.1.iter().map(|x| x.kind()).collect::<Vec<_>>();
+        kinds.sort();
+        kinds.dedup();
+
+        assert_eq!(Kind::Reaction, kinds[0]);
+        assert_eq!(Kind::Unknown(42), kinds[1]);
+    }
+
+    #[test]
+    fn filter_by_authors() {
+        let db = get_db();
+        let query = Filter {
+            authors: vec![
+                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+                    .try_into()
+                    .expect("pk"),
+                "38fb689f2fb92d932d457b1ea56715292bdf2140b2c7d282e8b8e8d644483ad6"
+                    .try_into()
+                    .expect("pk"),
+            ],
+            ..Default::default()
+        };
+        let records = db.get_events_by_filter(query).expect("records");
+        assert_eq!(records.len(), 27);
+    }
+
+    #[test]
+    fn filter_by_author() {
+        let db = get_db();
+        let query = Filter {
+            authors: vec![
+                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+                    .try_into()
+                    .expect("pk"),
+            ],
+            ..Default::default()
+        };
+        let records = db.get_events_by_filter(query).expect("records");
+        assert_eq!(records.len(), 3);
+    }
+
+    #[test]
+    fn filter_by_author_and_kinds() {
+        let db = get_db();
+        let query = Filter {
+            authors: vec![
+                "460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"
+                    .try_into()
+                    .expect("pk"),
+            ],
+            kinds: vec![Kind::ShortTextNote, Kind::Reaction],
+            ..Default::default()
+        };
+        let records = db.get_events_by_filter(query).expect("records");
+        assert_eq!(records.len(), 2);
+    }
+
+    #[test]
+    fn filter_kind() {
+        let db = get_db();
+        let query = Filter {
+            kinds: vec![Kind::ShortTextNote],
+            ..Default::default()
+        };
+        let records = db.get_events_by_filter(query).expect("records");
+        assert_eq!(records.len(), 2_538);
+        println!("{:?}", records);
+    }
+}