|
@@ -1,120 +0,0 @@
|
|
|
-use crate::{Error, Storage};
|
|
|
-use futures::Stream;
|
|
|
-use nostr_rs_types::types::{Addr, Event, Filter, Kind};
|
|
|
-use std::{
|
|
|
- collections::HashMap,
|
|
|
- pin::Pin,
|
|
|
- sync::atomic::AtomicUsize,
|
|
|
- task::{Context, Poll},
|
|
|
-};
|
|
|
-use tokio::sync::{mpsc::Sender, RwLock};
|
|
|
-
|
|
|
-#[allow(dead_code)]
|
|
|
-struct SubscriptionEntry {
|
|
|
- pub id: usize,
|
|
|
- pub filter: Filter,
|
|
|
- pub sender: Sender<(usize, Event)>,
|
|
|
-}
|
|
|
-
|
|
|
-#[allow(dead_code)]
|
|
|
-enum SubscriptionListenerType {
|
|
|
- Id(Addr),
|
|
|
- Author(Addr),
|
|
|
- Kind(Kind),
|
|
|
- ReferenceToEvent(Addr),
|
|
|
- ReferenceToPublicKey(Addr),
|
|
|
-}
|
|
|
-
|
|
|
-/// Subscription
|
|
|
-pub struct Subscription<T>
|
|
|
-where
|
|
|
- T: Storage,
|
|
|
-{
|
|
|
- db: T,
|
|
|
- subscriptions: RwLock<HashMap<usize, SubscriptionEntry>>,
|
|
|
- subscription_listener: RwLock<HashMap<SubscriptionListenerType, Vec<usize>>>,
|
|
|
- last_id: AtomicUsize,
|
|
|
-}
|
|
|
-
|
|
|
-pub struct SubscriptionResultFromDb<I>
|
|
|
-where
|
|
|
- I: Stream<Item = Result<Event, Error>>,
|
|
|
-{
|
|
|
- iterator: I,
|
|
|
-}
|
|
|
-
|
|
|
-impl<I> Stream for SubscriptionResultFromDb<I>
|
|
|
-where
|
|
|
- I: Stream<Item = Result<Event, Error>>,
|
|
|
-{
|
|
|
- type Item = Result<Event, Error>;
|
|
|
-
|
|
|
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
|
- // Safety: it's safe to use Pin::map_unchecked_mut because the iterator field
|
|
|
- // is pinned as part of the SubscriptionResultFromDb struct
|
|
|
- let iterator = unsafe { self.map_unchecked_mut(|s| &mut s.iterator) };
|
|
|
- iterator.poll_next(cx)
|
|
|
- }
|
|
|
-
|
|
|
- fn size_hint(&self) -> (usize, Option<usize>) {
|
|
|
- self.iterator.size_hint()
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl<T> Subscription<T>
|
|
|
-where
|
|
|
- T: Storage,
|
|
|
-{
|
|
|
- /// Wraps the storage layer to by pass the subscription/notification wrapper
|
|
|
- pub fn new(db: T) -> Self {
|
|
|
- Self {
|
|
|
- db,
|
|
|
- subscriptions: RwLock::new(HashMap::new()),
|
|
|
- subscription_listener: RwLock::new(HashMap::new()),
|
|
|
- last_id: AtomicUsize::new(0),
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /// Gets an event from the wrapped storage
|
|
|
- pub async fn get_event<T1: AsRef<[u8]> + Send + Sync>(
|
|
|
- &self,
|
|
|
- id: T1,
|
|
|
- ) -> Result<Option<Event>, Error> {
|
|
|
- self.db.get_event(id).await
|
|
|
- }
|
|
|
-
|
|
|
- /// Removes a subscription from the listener
|
|
|
- pub async fn unsubscribe(self, subscription_id: usize) -> Result<(), Error> {
|
|
|
- let mut subscribers = self.subscriptions.write().await;
|
|
|
- let _ = subscribers.remove(&subscription_id);
|
|
|
- Ok(())
|
|
|
- }
|
|
|
-
|
|
|
- /// Subscribes to a filter. The first streamed bytes will be reads from the
|
|
|
- /// database.
|
|
|
- pub async fn subscribe(
|
|
|
- &self,
|
|
|
- filter: Filter,
|
|
|
- sender: Sender<(usize, Event)>,
|
|
|
- ) -> Result<(usize, SubscriptionResultFromDb<T::Cursor<'_>>), Error> {
|
|
|
- let mut subscribers = self.subscriptions.write().await;
|
|
|
- let mut _subscription_listener = self.subscription_listener.write().await;
|
|
|
- let id = self
|
|
|
- .last_id
|
|
|
- .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
|
|
- subscribers.insert(
|
|
|
- id,
|
|
|
- SubscriptionEntry {
|
|
|
- id,
|
|
|
- filter: filter.clone(),
|
|
|
- sender,
|
|
|
- },
|
|
|
- );
|
|
|
- Ok((
|
|
|
- id,
|
|
|
- SubscriptionResultFromDb {
|
|
|
- iterator: self.db.get_by_filter(filter).await?,
|
|
|
- },
|
|
|
- ))
|
|
|
- }
|
|
|
-}
|