|
@@ -7,6 +7,7 @@ use std::{
|
|
|
atomic::{AtomicUsize, Ordering},
|
|
|
Arc,
|
|
|
},
|
|
|
+ u64,
|
|
|
};
|
|
|
use tokio::sync::RwLock;
|
|
|
|
|
@@ -20,12 +21,14 @@ pub struct Indexes {
|
|
|
ref_pub_key: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
|
|
|
kind: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
|
|
|
ids_by_time: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
|
|
|
+ /// silly index to store the keys of the indexes, but it helps to use the same code
|
|
|
+ ids: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
|
|
|
+ local_events: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
|
|
|
}
|
|
|
|
|
|
#[derive(Default)]
|
|
|
pub struct Memory {
|
|
|
db: Arc<RwLock<BTreeMap<[u8; 32], Event>>>,
|
|
|
- local_events: Arc<RwLock<BTreeMap<(i64, [u8; 32]), ()>>>,
|
|
|
indexes: Arc<Indexes>,
|
|
|
indexers_running: Arc<AtomicUsize>,
|
|
|
}
|
|
@@ -56,11 +59,28 @@ impl Storage for Memory {
|
|
|
let secondary_index = SecondaryIndex::new(&event.id, event.created_at());
|
|
|
let event_id = event.id.clone().0.to_vec();
|
|
|
|
|
|
+ let time_desc = u64::MAX
|
|
|
+ .checked_sub(
|
|
|
+ event
|
|
|
+ .created_at()
|
|
|
+ .timestamp_micros()
|
|
|
+ .try_into()
|
|
|
+ .unwrap_or_default(),
|
|
|
+ )
|
|
|
+ .unwrap_or_default()
|
|
|
+ .to_be_bytes();
|
|
|
+
|
|
|
+ indexes
|
|
|
+ .ids
|
|
|
+ .write()
|
|
|
+ .await
|
|
|
+ .insert(event_id.clone(), event_id.clone());
|
|
|
+
|
|
|
indexes
|
|
|
.ids_by_time
|
|
|
.write()
|
|
|
.await
|
|
|
- .insert(secondary_index.index_by(&event_id), event_id.clone());
|
|
|
+ .insert(secondary_index.index_by(&time_desc), event_id.clone());
|
|
|
|
|
|
indexes
|
|
|
.author
|
|
@@ -111,9 +131,12 @@ impl Storage for Memory {
|
|
|
}
|
|
|
|
|
|
async fn set_local_event(&self, event: &Event) -> Result<(), Error> {
|
|
|
- let mut local_events = self.local_events.write().await;
|
|
|
- local_events.insert((event.created_at().timestamp_micros(), event.id.0), ());
|
|
|
- todo!()
|
|
|
+ let mut local_events = self.indexes.local_events.write().await;
|
|
|
+ local_events.insert(
|
|
|
+ SecondaryIndex::new(&event.id, event.created_at()).index_by(&[]),
|
|
|
+ event.id.0.to_vec(),
|
|
|
+ );
|
|
|
+ Ok(())
|
|
|
}
|
|
|
|
|
|
async fn get_event<T: AsRef<[u8]> + Send + Sync>(&self, id: T) -> Result<Option<Event>, Error> {
|
|
@@ -157,14 +180,36 @@ impl Storage for Memory {
|
|
|
)
|
|
|
} else if !query.ids.is_empty() {
|
|
|
(
|
|
|
- self.indexes.ids_by_time.read().await,
|
|
|
+ self.indexes.ids.read().await,
|
|
|
std::mem::take(&mut query.ids)
|
|
|
.into_iter()
|
|
|
.map(|c| c.take())
|
|
|
.collect::<VecDeque<_>>(),
|
|
|
)
|
|
|
+ } else if !query.authors.is_empty() {
|
|
|
+ (
|
|
|
+ self.indexes.author.read().await,
|
|
|
+ std::mem::take(&mut query.authors)
|
|
|
+ .into_iter()
|
|
|
+ .map(|c| c.take())
|
|
|
+ .collect::<VecDeque<_>>(),
|
|
|
+ )
|
|
|
+ } else if !query.kinds.is_empty() {
|
|
|
+ (
|
|
|
+ self.indexes.kind.read().await,
|
|
|
+ std::mem::take(&mut query.kinds)
|
|
|
+ .into_iter()
|
|
|
+ .map(|kind| {
|
|
|
+ let kind: u32 = kind.into();
|
|
|
+ kind.to_be_bytes().to_vec()
|
|
|
+ })
|
|
|
+ .collect::<VecDeque<_>>(),
|
|
|
+ )
|
|
|
} else {
|
|
|
- todo!()
|
|
|
+ (
|
|
|
+ self.indexes.ids_by_time.read().await,
|
|
|
+ vec![Vec::new()].into(), // all keys
|
|
|
+ )
|
|
|
};
|
|
|
|
|
|
Ok(Self::Cursor {
|
|
@@ -180,8 +225,18 @@ impl Storage for Memory {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
- async fn get_local_events(&self, _limit: Option<usize>) -> Result<Self::Cursor<'_>, Error> {
|
|
|
- todo!()
|
|
|
+ async fn get_local_events(&self, limit: Option<usize>) -> Result<Self::Cursor<'_>, Error> {
|
|
|
+ Ok(Self::Cursor {
|
|
|
+ db: self,
|
|
|
+ index: self.indexes.local_events.read().await,
|
|
|
+ index_prefixes: vec![Vec::new()].into(), // all keys
|
|
|
+ current_index_key: Vec::new(),
|
|
|
+ filter: None,
|
|
|
+ limit,
|
|
|
+ returned: 0,
|
|
|
+ last_prefix: None,
|
|
|
+ future_event: None,
|
|
|
+ })
|
|
|
}
|
|
|
}
|
|
|
|