Parcourir la source

Working on an inmemory storage

Cesar Rodas il y a 3 mois
Parent
commit
60af3694c7

+ 14 - 0
crates/storage/memory/Cargo.toml

@@ -0,0 +1,14 @@
+[package]
+name = "nostr-rs-memory"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+async-trait = "0.1.81"
+futures = "0.3.30"
+nostr-rs-storage-base = { path = "../base" }
+nostr-rs-types = { path = "../../types" }
+tokio = { version = "1.39.2", features = ["full"] }
+
+[dev-dependencies]
+nostr-rs-storage-base = { path = "../base", features = ["test"] }

+ 111 - 0
crates/storage/memory/src/cursor.rs

@@ -0,0 +1,111 @@
+use crate::Memory;
+use futures::Stream;
+use nostr_rs_storage_base::{
+    cursor::{check_future_call, FutureResult, FutureValue},
+    Error, EventFilter, Storage,
+};
+use nostr_rs_types::types::Event;
+use std::{
+    collections::{BTreeMap, VecDeque},
+    pin::Pin,
+    task::{Context, Poll},
+};
+use tokio::sync::RwLockReadGuard;
+
+pub struct Cursor<'a> {
+    pub db: &'a Memory,
+    pub filter: Option<EventFilter>,
+    pub limit: Option<usize>,
+    pub returned: usize,
+    pub index: RwLockReadGuard<'a, BTreeMap<Vec<u8>, Vec<u8>>>,
+    pub index_prefixes: VecDeque<Vec<u8>>,
+    pub last_prefix: Option<Vec<u8>>,
+    pub current_index_key: Vec<u8>,
+    pub future_event: Option<FutureResult<'a>>,
+}
+
+impl<'a> Cursor<'a> {
+    fn get_id_from_index(&mut self) -> Option<Vec<u8>> {
+        if let Some(Some(Some((last_prefix, id)))) = self.last_prefix.take().map(|last_prefix| {
+            // We are iterating over an index and we are half way through, we
+            // know the last key, we start iterating from them and skip 1,
+            // because we already have that key If the key starts with the
+            // current index key, we return it otherwise we go to the else
+            // brancha and the next key in the index_prefixes is used
+            //
+            // This approach is needed since holding a reference to the index Range has been problematic
+            self.index
+                .range(last_prefix..)
+                .nth(1)
+                .map(|(idx_id, value)| {
+                    if idx_id.starts_with(&self.current_index_key) {
+                        Some((idx_id.clone(), value.clone()))
+                    } else {
+                        None
+                    }
+                })
+        }) {
+            self.last_prefix = Some(last_prefix);
+            Some(id)
+        } else {
+            loop {
+                self.current_index_key = self.index_prefixes.pop_front()?;
+                if let Some(Some((last_prefix, id))) = self
+                    .index
+                    .range(self.current_index_key.clone()..)
+                    .next()
+                    .map(|(idx_id, id)| {
+                        if idx_id.starts_with(&self.current_index_key) {
+                            Some((idx_id.clone(), id.clone()))
+                        } else {
+                            None
+                        }
+                    })
+                {
+                    self.last_prefix = Some(last_prefix);
+                    return Some(id);
+                }
+            }
+        }
+    }
+}
+
+impl<'a> Stream for Cursor<'a> {
+    type Item = Result<Event, Error>;
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (0, None)
+    }
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        if Some(self.returned) == self.limit {
+            return Poll::Ready(None);
+        }
+
+        let this = Pin::into_inner(self);
+
+        match check_future_call(&mut this.future_event, &this.filter, cx) {
+            FutureValue::Found(event) => {
+                this.returned += 1;
+                return Poll::Ready(Some(event));
+            }
+            FutureValue::Pending => return Poll::Pending,
+            _ => {}
+        }
+
+        while let Some(id) = this.get_id_from_index() {
+            this.future_event = Some(this.db.get_event(id));
+
+            match check_future_call(&mut this.future_event, &this.filter, cx) {
+                FutureValue::Found(event) => {
+                    this.returned += 1;
+                    return Poll::Ready(Some(event));
+                }
+                FutureValue::Pending => return Poll::Pending,
+                _ => {}
+            }
+        }
+
+        Poll::Ready(None)
+    }
+}

+ 201 - 0
crates/storage/memory/src/lib.rs

@@ -0,0 +1,201 @@
+use nostr_rs_storage_base::{Error, SecondaryIndex, Storage};
+use nostr_rs_types::types::{Event, Filter, Tag};
+use std::{
+    cmp::min,
+    collections::{BTreeMap, VecDeque},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+};
+use tokio::sync::RwLock;
+
+mod cursor;
+
+#[derive(Default)]
+pub struct Indexes {
+    // Vec<u8> is used instead of Id, since references can be partial keys
+    author: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
+    ref_event: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
+    ref_pub_key: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
+    kind: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
+    ids_by_time: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
+}
+
+#[derive(Default)]
+pub struct Memory {
+    db: Arc<RwLock<BTreeMap<[u8; 32], Event>>>,
+    local_events: Arc<RwLock<BTreeMap<(i64, [u8; 32]), ()>>>,
+    indexes: Arc<Indexes>,
+    indexers_running: Arc<AtomicUsize>,
+}
+
+#[async_trait::async_trait]
+impl Storage for Memory {
+    type Cursor<'a> = cursor::Cursor<'a>;
+
+    fn is_flushing(&self) -> bool {
+        self.indexers_running.load(Ordering::SeqCst) > 0
+    }
+
+    async fn store(&self, event: &Event) -> Result<bool, Error> {
+        let mut db = self.db.write().await;
+        if db.get(&event.id.0).is_some() {
+            return Ok(false);
+        }
+
+        db.insert(event.id.0, event.clone());
+
+        let event = event.clone();
+        let indexes = self.indexes.clone();
+        let indexer_running = self.indexers_running.clone();
+        let _ = indexer_running.fetch_add(1, Ordering::SeqCst);
+        tokio::spawn(async move {
+            // build indexes asynchronously to avoid locking contention and make
+            // `storing` slower, as the stream will hold a read lock while iterating
+            let secondary_index = SecondaryIndex::new(&event.id, event.created_at());
+            let event_id = event.id.clone().0.to_vec();
+
+            indexes
+                .ids_by_time
+                .write()
+                .await
+                .insert(secondary_index.index_by(&event_id), event_id.clone());
+
+            indexes
+                .author
+                .write()
+                .await
+                .insert(secondary_index.index_by(event.author()), event_id.clone());
+
+            let kind: u32 = event.kind().into();
+
+            indexes.kind.write().await.insert(
+                secondary_index.index_by(kind.to_be_bytes()),
+                event_id.clone(),
+            );
+
+            for tag in event.tags().iter() {
+                match tag {
+                    Tag::PubKey(pub_key) => {
+                        let foreign_id = secondary_index.index_by(&pub_key.id);
+                        let local_id = secondary_index.index_by(&event_id);
+
+                        indexes
+                            .ref_pub_key
+                            .write()
+                            .await
+                            .insert(foreign_id, event_id.clone());
+
+                        indexes
+                            .ref_event
+                            .write()
+                            .await
+                            .insert(local_id, pub_key.id.to_vec());
+                    }
+                    Tag::Event(foreign_event) => {
+                        let foreign_id = secondary_index.index_by(&foreign_event.id);
+                        let local_id = secondary_index.index_by(&event_id);
+
+                        let mut ref_event = indexes.ref_event.write().await;
+                        ref_event.insert(foreign_id, event_id.clone());
+                        ref_event.insert(local_id, foreign_event.id.to_vec());
+                    }
+                    _ => {}
+                }
+            }
+
+            let _ = indexer_running.fetch_sub(1, Ordering::SeqCst);
+        });
+        Ok(true)
+    }
+
+    async fn set_local_event(&self, event: &Event) -> Result<(), Error> {
+        let mut local_events = self.local_events.write().await;
+        local_events.insert((event.created_at().timestamp_micros(), event.id.0), ());
+        todo!()
+    }
+
+    async fn get_event<T: AsRef<[u8]> + Send + Sync>(&self, id: T) -> Result<Option<Event>, Error> {
+        let events = self.db.read().await;
+        let mut id_filter = [0u8; 32];
+        let id = id.as_ref();
+        let slice = 0..min(id.len(), 32);
+        id_filter[slice.clone()].copy_from_slice(&id[slice]);
+
+        if let Some((db_id, value)) = events.range(id_filter..).next() {
+            if db_id.starts_with(id) {
+                return Ok(Some(value.clone()));
+            }
+        }
+
+        return Ok(None);
+    }
+
+    async fn get_by_filter(&self, query: Filter) -> Result<Self::Cursor<'_>, Error> {
+        let limit = if query.limit == 0 {
+            None
+        } else {
+            Some(query.limit.try_into()?)
+        };
+
+        let (index, index_prefixes) = if !query.references_to_event.is_empty() {
+            (
+                self.indexes.ref_event.read().await,
+                query
+                    .references_to_event
+                    .iter()
+                    .map(|c| c.as_ref().to_vec())
+                    .collect::<VecDeque<_>>(),
+            )
+        } else if !query.references_to_public_key.is_empty() {
+            (
+                self.indexes.ref_pub_key.read().await,
+                query
+                    .references_to_public_key
+                    .iter()
+                    .map(|c| c.as_ref().to_vec())
+                    .collect::<VecDeque<_>>(),
+            )
+        } else if !query.ids.is_empty() {
+            (
+                self.indexes.ids_by_time.read().await,
+                query
+                    .ids
+                    .iter()
+                    .map(|c| c.as_ref().to_vec())
+                    .collect::<VecDeque<_>>(),
+            )
+        } else {
+            todo!()
+        };
+
+        Ok(Self::Cursor {
+            db: self,
+            index,
+            index_prefixes,
+            current_index_key: Vec::new(),
+            filter: Some(query.into()),
+            limit,
+            returned: 0,
+            last_prefix: None,
+            future_event: None,
+        })
+    }
+
+    async fn get_local_events(&self, _limit: Option<usize>) -> Result<Self::Cursor<'_>, Error> {
+        todo!()
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    async fn new_instance(_path: &str) -> Memory {
+        Memory::default()
+    }
+
+    async fn destroy_instance(_path: &str) {}
+
+    nostr_rs_storage_base::storage_test!(Memory, new_instance, destroy_instance);
+}