소스 검색

Improve pool subscription

Instead of having the meta subscription, add a subscription scheduler. The
subscription scheduler will maintain certain subscriptions as active and others
as dormant. When dormant subscriptions are awaking, the parameter `since` is
used only to accept events that must have happened since last time.
Cesar Rodas 2 달 전
부모
커밋
77d03a3ffa
5개의 변경된 파일218개의 추가작업 그리고 183개의 파일을 삭제
  1. 1 1
      Cargo.lock
  2. 1 1
      crates/client/Cargo.toml
  3. 11 26
      crates/client/src/pool/mod.rs
  4. 204 154
      crates/client/src/pool/subscription.rs
  5. 1 1
      crates/subscription-manager/src/lib.rs

+ 1 - 1
Cargo.lock

@@ -1262,13 +1262,13 @@ dependencies = [
 name = "nostr-rs-client"
 version = "0.1.0"
 dependencies = [
+ "chrono",
  "futures",
  "futures-util",
  "log",
  "nostr-rs-memory",
  "nostr-rs-relayer",
  "nostr-rs-storage-base",
- "nostr-rs-subscription-manager",
  "nostr-rs-types",
  "serde_json",
  "thiserror",

+ 1 - 1
crates/client/Cargo.toml

@@ -7,7 +7,6 @@ edition = "2021"
 thiserror = "1.0.40"
 nostr-rs-types = { path = "../types" }
 nostr-rs-storage-base = { path = "../storage/base" }
-nostr-rs-subscription-manager = { path = "../subscription-manager" }
 tokio = { version = "1.26.0", features = ["sync", "macros", "rt", "time"] }
 tokio-tungstenite = { version = "0.18.0", features = [
     "rustls",
@@ -19,6 +18,7 @@ serde_json = "1.0.94"
 futures-util = "0.3.27"
 log = "0.4.17"
 futures = "0.3.28"
+chrono = "0.4.38"
 
 [dev-dependencies]
 nostr-rs-relayer = { path = "../relayer" }

+ 11 - 26
crates/client/src/pool/mod.rs

@@ -1,7 +1,7 @@
 //! Relayers
 //!
 //! This is the main entry point to the client library.
-use crate::{client::ActiveSubscription as ClientActiveSubscription, Client, Error};
+use crate::{Client, Error};
 use futures::future::join_all;
 use nostr_rs_types::{
     client::{self, subscribe},
@@ -14,13 +14,13 @@ use std::{
         Arc,
     },
 };
+use subscription::Scheduler;
 use tokio::sync::{mpsc, RwLock};
 use url::Url;
 
 pub mod subscription;
 
-pub(crate) type AllClients =
-    Arc<RwLock<HashMap<Url, (Arc<AtomicUsize>, (ClientActiveSubscription, Client))>>>;
+pub(crate) type AllClients = Arc<RwLock<HashMap<Url, (Arc<AtomicUsize>, Client)>>>;
 
 /// Clients
 ///
@@ -31,7 +31,7 @@ pub struct Pool {
     clients: AllClients,
     sender: mpsc::Sender<(Response, Url)>,
     receiver: Option<mpsc::Receiver<(Response, Url)>>,
-    subscription_manager: Arc<subscription::Manager>,
+    subscription_manager: Arc<subscription::Scheduler>,
 }
 
 /// Active client
@@ -65,10 +65,11 @@ pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 100_000;
 impl Default for Pool {
     fn default() -> Self {
         let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);
+        let clients: AllClients = Default::default();
         Self {
-            clients: Default::default(),
+            clients: clients.clone(),
             receiver: Some(receiver),
-            subscription_manager: Default::default(),
+            subscription_manager: Arc::new(Scheduler::new(clients.clone())),
             sender,
         }
     }
@@ -123,7 +124,7 @@ impl Pool {
         join_all(
             clients
                 .values()
-                .map(|(_, (_, sender))| sender.post(request.clone()))
+                .map(|(_, sender)| sender.post(request.clone()))
                 .collect::<Vec<_>>(),
         )
         .await;
@@ -135,7 +136,7 @@ impl Pool {
             .read()
             .await
             .iter()
-            .filter(|(_, (_, (_, client)))| client.is_connected())
+            .filter(|(_, (_, client))| client.is_connected())
             .collect::<Vec<_>>()
             .len()
     }
@@ -166,13 +167,10 @@ impl Pool {
                 },
             );
 
-            // subscribe to all events
-            let meta_subscription = client
-                .subscribe(subscribe::Subscribe::to_all_events())
-                .await?;
+            self.subscription_manager.register_client(&client).await;
 
             let ref_id: Arc<AtomicUsize> = Arc::new(1.into());
-            clients.insert(url.clone(), (ref_id.clone(), (meta_subscription, client)));
+            clients.insert(url.clone(), (ref_id.clone(), client));
             ref_id
         };
 
@@ -335,12 +333,6 @@ mod test {
 
         sleep(Duration::from_millis(10)).await;
 
-        assert!(client_pool1
-            .try_recv()
-            .map(|(r, _)| r)
-            .expect("valid message")
-            .as_end_of_stored_events()
-            .is_some());
         assert_eq!(
             Some((signed_content.id, signed_content.signature)),
             client_pool1
@@ -372,13 +364,6 @@ mod test {
             .as_end_of_stored_events()
             .is_some());
 
-        assert!(client_pool1
-            .try_recv()
-            .map(|(r, _)| r)
-            .expect("valid message")
-            .as_end_of_stored_events()
-            .is_some());
-
         let account1 = Account::default();
         let signed_content = account1
             .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)

+ 204 - 154
crates/client/src/pool/subscription.rs

@@ -1,35 +1,41 @@
 //! Subscription manager
-use crate::{client, Error};
+use super::AllClients;
+use crate::{client, Client, Error};
+use chrono::Utc;
 use futures::future::join_all;
-use nostr_rs_subscription_manager::{
-    ActiveSubscription as ActiveSubscriptionInner, SubscriptionManager,
-};
 use nostr_rs_types::{
-    client::subscribe::{self, is_all_events},
-    relayer,
+    client::subscribe::{self},
     types::SubscriptionId,
     Response,
 };
-use std::sync::{
-    atomic::{AtomicUsize, Ordering},
-    Arc,
+use std::{
+    collections::{BTreeMap, VecDeque},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
 };
-use tokio::sync::{mpsc, Mutex};
+use tokio::sync::{mpsc, RwLock, RwLockWriteGuard};
 use url::Url;
 
-use super::AllClients;
-
 #[derive(Debug, Copy, Default, Eq, PartialEq, Clone)]
 /// Subscription status
 pub enum Status {
     /// Subscription is awaiting to be subscribed
     #[default]
     Queued,
-    /// Subscribed is active
+    /// Subscribed is active and it is fetching previous records and no EOD has
+    /// been received
+    Fetching,
+    /// Subscription is listening, an EOD has been received. This state can be
+    /// Requeued is their spot is needed for other subscriptions
     Subscribed,
-    /// Technically unsubscribed, and fetching future events from the relayer
-    /// from the All-Events meta subscription
-    Stale,
+    /// Waiting to be subscribed again
+    Requeued,
+    /// Resubscribed, like subscribed but the EOD is ignored and nore relayed to
+    /// the listeners, since this is not the first the this subscription has
+    /// been created and it will be rotated soon
+    Resubscribed,
 }
 
 #[derive(Debug, Default)]
@@ -37,7 +43,7 @@ struct SubscriptionInner {
     /// Active subscription (in the client side), when this is Drop all clients unsubscribes
     active_subscription: Option<Vec<client::ActiveSubscription>>,
     /// Subscription status
-    status: Arc<Mutex<Status>>,
+    status: Status,
     /// raw request
     subscription_request: subscribe::Subscribe,
 }
@@ -49,27 +55,14 @@ struct SubscriptionInner {
 ///
 /// This must be dropped to unsubscribe from the subscription manager
 pub struct ActiveSubscription {
-    _id: ActiveSubscriptionInner<PoolSubscriptionId, SubscriptionInner>,
-    status: Arc<Mutex<Status>>,
-    active_subscriptions: Arc<AtomicUsize>,
-    queued_subscriptions: Arc<AtomicUsize>,
-    stale_subscriptions: Arc<AtomicUsize>,
+    unsubscriber: Option<(PoolSubscriptionId, Arc<Scheduler>)>,
 }
 
 impl Drop for ActiveSubscription {
     fn drop(&mut self) {
-        let active_subscriptions = self.active_subscriptions.clone();
-        let queued_subscriptions = self.queued_subscriptions.clone();
-        let stale_subscriptions = self.stale_subscriptions.clone();
-        let status = self.status.clone();
-
-        tokio::spawn(async move {
-            match *status.lock().await {
-                Status::Subscribed => active_subscriptions.fetch_sub(1, Ordering::Relaxed),
-                Status::Queued => queued_subscriptions.fetch_sub(1, Ordering::Relaxed),
-                Status::Stale => stale_subscriptions.fetch_sub(1, Ordering::Relaxed),
-            }
-        });
+        if let Some((id, scheduler)) = self.unsubscriber.take() {
+            scheduler.remove(id);
+        }
     }
 }
 
@@ -85,33 +78,58 @@ impl Default for PoolSubscriptionId {
 
 /// Subscription manager
 ///
-/// Clients who are added to the pool are automatically subscribed to all
-/// events, which is known as the All-Events subscription.
-///
-/// Subscriptions in the client pool are smarter than the raw subscriptions at
-/// the client level. These subscriptions will be active until an "End of
-/// stored" event is received; the client pool will unsubscribe at that point,
-/// and the All-Events subscriptions, filtered internally, will fetch future
-/// events. By doing so, only past events are queried, and there is a single
-/// stream of future events to be a good citizen with other relayers.
+/// Pools can have an unlimited number of active subscriptions, but this
+/// subscriptions will be scheduled internally.
 #[derive(Default)]
-pub(crate) struct Manager {
-    subscription_manager: Arc<SubscriptionManager<PoolSubscriptionId, SubscriptionInner>>,
+pub(crate) struct Scheduler {
+    subscriptions: RwLock<BTreeMap<PoolSubscriptionId, SubscriptionInner>>,
     all_clients: AllClients,
+    /// VecDec to have the list of subscritions
+    ///
+    /// This structure will pop the first elements and move them to be back
+    /// waiting their turn to be rescheduled.
+    subscription_queue: RwLock<VecDeque<PoolSubscriptionId>>,
     active_subscriptions: Arc<AtomicUsize>,
-    queued_subscriptions: Arc<AtomicUsize>,
-    stale_subscriptions: Arc<AtomicUsize>,
+    total_subscriptions: Arc<AtomicUsize>,
 }
 
 /// Maximum number of subscriptions
 pub const MAX_SUBSCRIPTIONS: usize = 50;
 
-impl Manager {
+#[allow(warnings)]
+impl Scheduler {
+    /// Creates a new instance
+    pub fn new(all_clients: AllClients) -> Self {
+        Self {
+            all_clients,
+            ..Default::default()
+        }
+    }
+
+    /// Register a new Client
+    pub async fn register_client(&self, client: &Client) {
+        let mut subscriptions = self.subscriptions.write().await;
+        for subscription in subscriptions.values_mut() {
+            if matches!(
+                subscription.status,
+                Status::Fetching | Status::Resubscribed | Status::Subscribed,
+            ) {
+                if let Ok(active_subscription) = client
+                    .subscribe(subscription.subscription_request.clone())
+                    .await
+                {
+                    if let Some(active_subscriptions) = subscription.active_subscription.as_mut() {
+                        active_subscriptions.push(active_subscription);
+                    }
+                }
+            }
+        }
+    }
+
     /// Processes all messages from the client pools
     ///
-    /// The client pool creates a subscription to the All-Events subscription,
-    /// this callback checks if there are any listener to this event, otherwise
-    /// it will not process the event.
+    /// Mainly to schedule subscriptions and pass the message over to the
+    /// listeners
     pub async fn process_message(
         self: &Arc<Self>,
         message: Response,
@@ -121,53 +139,22 @@ impl Manager {
         match message {
             Response::EndOfStoredEvents(subscription_id) => {
                 let subscription_id = PoolSubscriptionId((subscription_id.0, None));
-                let mut subscription = self.subscription_manager.subbscriptions_mut().await;
+                let mut subscription = self.subscriptions.write().await;
                 if let Some(s) = subscription.get_mut(&subscription_id) {
-                    *s.status.lock().await = Status::Stale;
-                    let _ = s.active_subscription.take();
+                    let old_status = s.status;
+                    s.status = Status::Subscribed;
 
-                    self.active_subscriptions.fetch_sub(1, Ordering::Relaxed);
-                    self.stale_subscriptions.fetch_add(1, Ordering::Relaxed);
-                }
-
-                return_to
-                    .try_send((
-                        Response::EndOfStoredEvents(subscription_id.0 .0.into()),
-                        url,
-                    ))
-                    .map_err(|e| Error::InternalChannel(e.to_string()))?;
-
-                Ok(())
-            }
-            Response::Event(relayer::Event {
-                subscription_id,
-                event,
-            }) => {
-                if !is_all_events(&subscription_id) {
-                    // This is not an All-Events subscription, it must be passed on as it is an stored event
-                    return_to
-                        .try_send((
-                            Response::Event(relayer::Event {
-                                subscription_id,
-                                event,
-                            }),
-                            url.clone(),
-                        ))
-                        .map_err(|e| Error::InternalChannel(e.to_string()))?;
-                    return Ok(());
+                    if old_status == Status::Fetching {
+                        return_to
+                            .try_send((
+                                Response::EndOfStoredEvents(subscription_id.0 .0.into()),
+                                url,
+                            ))
+                            .map_err(|e| Error::InternalChannel(e.to_string()))?;
+                    }
                 }
 
-                for id in self.subscription_manager.get_subscribers(&event).await {
-                    return_to
-                        .try_send((
-                            Response::Event(relayer::Event {
-                                subscription_id: id.0 .0.clone(),
-                                event: event.clone(),
-                            }),
-                            url.clone(),
-                        ))
-                        .map_err(|e| Error::InternalChannel(e.to_string()))?;
-                }
+                self.active_subscription_scheduler();
 
                 Ok(())
             }
@@ -180,43 +167,102 @@ impl Manager {
         }
     }
 
-    async fn update_active_subscriptions(self: &Arc<Self>) {
-        if self.active_subscriptions.load(Ordering::Relaxed) >= MAX_SUBSCRIPTIONS
-            || self.queued_subscriptions.load(Ordering::Relaxed) == 0
-        {
-            return;
-        }
+    fn active_subscription_scheduler(self: &Arc<Self>) {
+        let this = self.clone();
+        tokio::spawn(async move {
+            let clients = this.all_clients.read().await;
+            let mut subscriptions = this.subscriptions.write().await;
+            let mut subscription_queue = this.subscription_queue.write().await;
+            let items = subscription_queue.len();
 
-        let clients = self.all_clients.read().await;
-        let mut subscriptions = self.subscription_manager.subbscriptions_mut().await;
+            // A subscription must be descheduled as its place is needed.
+            let mut deschedule =
+                |subscriptions: &mut RwLockWriteGuard<
+                    '_,
+                    BTreeMap<PoolSubscriptionId, SubscriptionInner>,
+                >,
+                 subscription_queue: &mut RwLockWriteGuard<'_, VecDeque<PoolSubscriptionId>>|
+                 -> bool {
+                    for subscription_id in subscription_queue.iter() {
+                        let mut subscription =
+                            if let Some(subscription) = subscriptions.get_mut(subscription_id) {
+                                subscription
+                            } else {
+                                continue;
+                            };
 
-        for subscription in subscriptions.values_mut() {
-            if *subscription.status.lock().await == Status::Queued {
-                let wait_all = clients
-                    .values()
-                    .map(|(_, (_, sender))| {
-                        sender.subscribe(subscription.subscription_request.clone())
-                    })
-                    .collect::<Vec<_>>();
-
-                if let Ok(active_subscriptions) = join_all(wait_all)
-                    .await
-                    .into_iter()
-                    .collect::<Result<Vec<_>, _>>()
+                        if matches!(subscription.status, Status::Subscribed) {
+                            // unsubscribe
+                            let _ = subscription.active_subscription.take();
+                            // update counter
+                            this.active_subscriptions.fetch_sub(1, Ordering::Relaxed);
+                            // update since for next request
+                            let now = Utc::now();
+                            for filter in subscription.subscription_request.filters.iter_mut() {
+                                filter.since = Some(now);
+                            }
+                            return true;
+                        }
+                    }
+
+                    false
+                };
+
+            for _ in (0..items) {
+                let subscription_id = if let Some(subscription_id) = subscription_queue.pop_front()
                 {
-                    subscription.active_subscription = Some(active_subscriptions);
-                    *subscription.status.lock().await = Status::Subscribed;
-
-                    let queued_subscribed =
-                        self.queued_subscriptions.fetch_sub(1, Ordering::Relaxed);
-                    let active_subscriptions =
-                        self.active_subscriptions.fetch_add(1, Ordering::Relaxed);
-                    if queued_subscribed == 0 || active_subscriptions >= MAX_SUBSCRIPTIONS {
+                    subscription_id
+                } else {
+                    continue;
+                };
+                let mut subscription =
+                    if let Some(subscription) = subscriptions.remove(&subscription_id) {
+                        subscription
+                    } else {
+                        continue;
+                    };
+
+                // add subscription id back to the last element, to be visited later
+                subscription_queue.push_back(subscription_id.clone());
+
+                let prev_status = subscription.status;
+
+                if matches!(prev_status, Status::Queued | Status::Requeued) {
+                    if this.active_subscriptions.load(Ordering::SeqCst) >= MAX_SUBSCRIPTIONS
+                        && !deschedule(&mut subscriptions, &mut subscription_queue)
+                    {
+                        subscriptions.insert(subscription_id, subscription);
+                        // There is no more room to promote this subscription to active
                         break;
                     }
+
+                    let wait_all = clients
+                        .values()
+                        .map(|(_, sender)| {
+                            sender.subscribe(subscription.subscription_request.clone())
+                        })
+                        .collect::<Vec<_>>();
+
+                    if let Ok(active_subscriptions) = join_all(wait_all)
+                        .await
+                        .into_iter()
+                        .collect::<Result<Vec<_>, _>>()
+                    {
+                        subscription.active_subscription = Some(active_subscriptions);
+                        subscription.status = if prev_status == Status::Queued {
+                            Status::Fetching
+                        } else {
+                            Status::Resubscribed
+                        };
+
+                        this.active_subscriptions.fetch_add(1, Ordering::Relaxed);
+                    }
                 }
+
+                // add it back
+                subscriptions.insert(subscription_id, subscription);
             }
-        }
+        });
     }
 
     /// Creates a new subscription with a given filters
@@ -225,43 +271,47 @@ impl Manager {
         subscription_request: subscribe::Subscribe,
         specific_url: Option<Url>,
     ) -> ActiveSubscription {
-        let status = Arc::new(Mutex::new(Status::Queued));
-        let id = self
-            .subscription_manager
-            .subscribe(
-                PoolSubscriptionId((
-                    subscription_request.subscription_id.clone(),
-                    specific_url.clone(),
-                )),
-                subscription_request.filters.clone(),
-                SubscriptionInner {
-                    status: status.clone(),
-                    active_subscription: None,
-                    subscription_request,
-                },
-            )
-            .await;
-
-        self.queued_subscriptions.fetch_add(1, Ordering::Relaxed);
+        let subscription_id = PoolSubscriptionId((
+            subscription_request.subscription_id.clone(),
+            specific_url.clone(),
+        ));
 
-        let this = self.clone();
-        tokio::spawn(async move {
-            this.update_active_subscriptions().await;
-        });
+        self.subscriptions.write().await.insert(
+            subscription_id.clone(),
+            SubscriptionInner {
+                status: Status::Queued,
+                active_subscription: None,
+                subscription_request,
+            },
+        );
+
+        // add subscription to the queue, to be scheduled eventually
+        self.subscription_queue
+            .write()
+            .await
+            .push_back(subscription_id.clone());
+
+        self.total_subscriptions.fetch_add(1, Ordering::Relaxed);
+        self.active_subscription_scheduler();
 
         ActiveSubscription {
-            _id: id,
-            status,
-            active_subscriptions: self.active_subscriptions.clone(),
-            queued_subscriptions: self.queued_subscriptions.clone(),
-            stale_subscriptions: self.stale_subscriptions.clone(),
+            unsubscriber: Some((subscription_id, self.clone())),
         }
     }
 
+    fn remove(self: Arc<Self>, subscription_id: PoolSubscriptionId) {
+        let this = self;
+        tokio::spawn(async move {
+            let mut subscriptions = this.subscriptions.write().await;
+            if let Some(id) = subscriptions.remove(&subscription_id) {
+                this.active_subscription_scheduler();
+                this.total_subscriptions.fetch_sub(1, Ordering::Relaxed);
+            }
+        });
+    }
+
     /// Total number of subscribers
     pub fn total_subscribers(&self) -> usize {
-        self.active_subscriptions.load(Ordering::Relaxed)
-            + self.queued_subscriptions.load(Ordering::Relaxed)
-            + self.stale_subscriptions.load(Ordering::Relaxed)
+        self.total_subscriptions.load(Ordering::Relaxed)
     }
 }

+ 1 - 1
crates/subscription-manager/src/lib.rs

@@ -134,7 +134,7 @@ where
     }
 
     /// Get the subscriptions list as a mutable reference
-    pub async fn subbscriptions_mut(&self) -> RwLockWriteGuard<'_, BTreeMap<I, Subscription<T>>> {
+    pub async fn subscriptions_mut(&self) -> RwLockWriteGuard<'_, BTreeMap<I, Subscription<T>>> {
         self.subscriptions.write().await
     }