|
@@ -0,0 +1,102 @@
|
|
|
+use crate::{Error, Storage};
|
|
|
+use nostr_rs_types::types::{Addr, Event, Filter, Kind};
|
|
|
+use parking_lot::RwLock;
|
|
|
+use std::{collections::HashMap, sync::atomic::AtomicUsize};
|
|
|
+use tokio::sync::mpsc::Sender;
|
|
|
+
|
|
|
+#[allow(dead_code)]
|
|
|
+struct SubscriptionEntry {
|
|
|
+ pub id: usize,
|
|
|
+ pub filter: Filter,
|
|
|
+ pub sender: Sender<(usize, Event)>,
|
|
|
+}
|
|
|
+
|
|
|
+#[allow(dead_code)]
|
|
|
+enum SubscriptionListenerType {
|
|
|
+ Id(Addr),
|
|
|
+ Author(Addr),
|
|
|
+ Kind(Kind),
|
|
|
+ ReferenceToEvent(Addr),
|
|
|
+ ReferenceToPublicKey(Addr),
|
|
|
+}
|
|
|
+
|
|
|
+/// Subscription
|
|
|
+pub struct Subscription<T>
|
|
|
+where
|
|
|
+ T: Storage,
|
|
|
+{
|
|
|
+ db: T,
|
|
|
+ subscriptions: RwLock<HashMap<usize, SubscriptionEntry>>,
|
|
|
+ subscription_listener: RwLock<HashMap<SubscriptionListenerType, Vec<usize>>>,
|
|
|
+ last_id: AtomicUsize,
|
|
|
+}
|
|
|
+
|
|
|
+pub struct SubscriptionResultFromDb<I: Iterator> {
|
|
|
+ iterator: I,
|
|
|
+}
|
|
|
+
|
|
|
+impl<I> Iterator for SubscriptionResultFromDb<I>
|
|
|
+where
|
|
|
+ I: Iterator<Item = Result<Event, Error>>,
|
|
|
+{
|
|
|
+ type Item = Result<Event, Error>;
|
|
|
+
|
|
|
+ fn next(&mut self) -> Option<Self::Item> {
|
|
|
+ self.iterator.next()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl<T> Subscription<T>
|
|
|
+where
|
|
|
+ T: Storage,
|
|
|
+{
|
|
|
+ /// Wraps the storage layer to by pass the subscription/notification wrapper
|
|
|
+ pub fn new(db: T) -> Self {
|
|
|
+ Self {
|
|
|
+ db,
|
|
|
+ subscriptions: RwLock::new(HashMap::new()),
|
|
|
+ subscription_listener: RwLock::new(HashMap::new()),
|
|
|
+ last_id: AtomicUsize::new(0),
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Gets an event from the wrapped storage
|
|
|
+ pub fn get_event<T1: AsRef<[u8]>>(&self, id: T1) -> Result<Option<Event>, Error> {
|
|
|
+ self.db.get_event(id)
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Removes a subscription from the listener
|
|
|
+ pub fn unsubscribe(self, subscription_id: usize) -> Result<(), Error> {
|
|
|
+ let mut subscribers = self.subscriptions.write();
|
|
|
+ let _ = subscribers.remove(&subscription_id);
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Subscribes to a filter. The first streamed bytes will be reads from the
|
|
|
+ /// database.
|
|
|
+ pub fn subscribe<'a>(
|
|
|
+ &'a self,
|
|
|
+ filter: Filter,
|
|
|
+ sender: Sender<(usize, Event)>,
|
|
|
+ ) -> Result<(usize, SubscriptionResultFromDb<T::Iterator<'a>>), Error> {
|
|
|
+ let mut subscribers = self.subscriptions.write();
|
|
|
+ let mut _subscription_listener = self.subscription_listener.write();
|
|
|
+ let id = self
|
|
|
+ .last_id
|
|
|
+ .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
|
|
+ subscribers.insert(
|
|
|
+ id,
|
|
|
+ SubscriptionEntry {
|
|
|
+ id,
|
|
|
+ filter: filter.clone(),
|
|
|
+ sender,
|
|
|
+ },
|
|
|
+ );
|
|
|
+ Ok((
|
|
|
+ id,
|
|
|
+ SubscriptionResultFromDb {
|
|
|
+ iterator: self.db.get_by_filter(filter)?,
|
|
|
+ },
|
|
|
+ ))
|
|
|
+ }
|
|
|
+}
|