Browse Source

Fixed subscription and active subscriptions

Cesar Rodas 1 năm trước cách đây
mục cha
commit
39279508ba

+ 65 - 11
crates/client/src/client.rs

@@ -29,6 +29,8 @@ use url::Url;
 
 type Subscriptions = Arc<RwLock<HashMap<SubscriptionId, subscribe::Subscribe>>>;
 
+const MAX_ACTIVE_SUBSCRIPTIONS: usize = 10;
+
 #[derive(Debug)]
 /// Active subscription
 ///
@@ -45,10 +47,10 @@ impl Drop for ActiveSubscription {
         let id = self.id.clone();
         let send_to_socket = self.send_to_socket.clone();
         tokio::spawn(async move {
-            subscriptions.write().await.remove(&id);
             let _ = send_to_socket
-                .send(nostr_rs_types::client::Close(id).into())
+                .send(nostr_rs_types::client::Close(id.clone()).into())
                 .await;
+            subscriptions.write().await.remove(&id);
         });
     }
 }
@@ -126,7 +128,7 @@ impl Client {
         mut send_to_socket: mpsc::Receiver<Request>,
         url: Url,
         is_connected: Arc<AtomicBool>,
-        send_on_connection: Subscriptions,
+        to_resubscribe: Subscriptions,
         filter: F,
     ) -> JoinHandle<()>
     where
@@ -159,16 +161,29 @@ impl Client {
                 log::info!("Connected to {}", url);
                 connection_attempts = 0;
 
-                let mut subscriptions = send_on_connection
+                // Convert all sent subscriptions to a local vector
+                let mut subscriptions = to_resubscribe
                     .read()
                     .await
-                    .iter()
-                    .map(|(sub_id, msg)| {
+                    .values()
+                    .map(|msg| Request::Request(msg.clone()))
+                    .collect::<Vec<_>>();
+
+                // Only keep the ones to be subscribed, moved the rest of the subscriptions to the queue
+                let mut to_subscribe_queue = if subscriptions.len() > MAX_ACTIVE_SUBSCRIPTIONS {
+                    subscriptions.split_off(MAX_ACTIVE_SUBSCRIPTIONS)
+                } else {
+                    vec![]
+                };
+
+                let mut subscriptions = subscriptions
+                    .into_iter()
+                    .map(|msg| {
                         (
-                            sub_id.to_owned(),
-                            serde_json::to_string(&Request::Request(msg.clone()))
-                                .ok()
-                                .map(Message::Text),
+                            msg.as_request()
+                                .map(|x| x.subscription_id.clone())
+                                .unwrap_or_default(),
+                            serde_json::to_string(&msg).ok().map(Message::Text),
                         )
                     })
                     .collect::<HashMap<_, _>>();
@@ -177,6 +192,7 @@ impl Client {
                     if let Some(msg) = msg.take() {
                         if let Err(x) = socket.send(msg).await {
                             log::error!("{}: Reconnecting due error at sending: {:?}", url, x);
+                            break;
                         }
                     }
                 }
@@ -191,8 +207,42 @@ impl Client {
                                     log::warn!("{}: Already subscribed to {}", url, sub.subscription_id);
                                     continue;
                                 }
+                                if subscriptions.len() > MAX_ACTIVE_SUBSCRIPTIONS {
+                                    log::warn!("{}: Queueing subscription to {} for later", url, sub.subscription_id);
+                                    to_subscribe_queue.push(msg.clone());
+                                    continue;
+                                }
+                                subscriptions.insert(sub.subscription_id.clone(), None);
+                            }
+
+
+                            let json = if let Ok(json) =  serde_json::to_string(&msg) {
+                                json
+                            } else {
+                                continue;
+                            };
+
+                            if let Err(x) = socket.send(Message::Text(json)).await {
+                                log::error!("{} : Reconnecting due {}", url, x);
+                                break;
                             }
-                            if let Ok(json) = serde_json::to_string(&msg) {
+
+                            if let Request::Close(close) = &msg {
+                                subscriptions.remove(&close.0);
+                                let json = if let Some(json) = to_subscribe_queue
+                                    .pop()
+                                    .and_then(|msg| {
+                                        subscriptions.insert(msg.as_request().map(|sub| sub.subscription_id.clone()).unwrap_or_default(), None);
+                                        serde_json::to_string(&msg).ok()
+                                    })
+                                    {
+                                        json
+                                    } else {
+                                        continue;
+                                    };
+
+
+                                log::info!("Sending: {} (queued subscription)", json);
                                 if let Err(x) = socket.send(Message::Text(json)).await {
                                     log::error!("{} : Reconnecting due {}", url, x);
                                     break;
@@ -226,6 +276,10 @@ impl Client {
 
                             let event: Result<Response, _> = serde_json::from_str(&msg);
 
+                            if let Ok(Response::Notice(err)) = &event {
+                                log::error!("{}: Active connections {}: {:?}", url, subscriptions.len(), err);
+                            }
+
                             if let Ok(msg) = event {
                                 if let Err(error) = filter(msg, url.clone(), return_to.clone()).await {
                                     log::error!("{}: Reconnecting client because of {}", url, error);

+ 4 - 0
crates/client/src/error.rs

@@ -10,6 +10,10 @@ pub enum Error {
     #[error("Url: {0}")]
     Url(#[from] url::ParseError),
 
+    /// Subscriptions must be unique
+    #[error("Duplicate subscription")]
+    DuplicateSubscriptionId,
+
     /// WebSocket client error
     #[error("Tungstenite: {0}")]
     Tungstenite(#[from] TungsteniteError),

+ 19 - 6
crates/client/src/pool/mod.rs

@@ -13,9 +13,13 @@ use std::{
         atomic::{AtomicUsize, Ordering},
         Arc,
     },
+    time::Duration,
 };
 use subscription::Scheduler;
-use tokio::sync::{mpsc, RwLock};
+use tokio::{
+    sync::{mpsc, RwLock},
+    time::sleep,
+};
 use url::Url;
 
 pub mod subscription;
@@ -81,6 +85,14 @@ impl Pool {
         let pool = Self::default();
         let connect = clients.into_iter().map(|url| pool.connect_to(url));
 
+        let x = pool.subscription_manager.clone();
+        tokio::spawn(async move {
+            loop {
+                log::info!("Active subscribers: {}", x.debug().await);
+                sleep(Duration::from_secs(5)).await;
+            }
+        });
+
         futures::executor::block_on(async {
             futures::future::join_all(connect)
                 .await
@@ -114,10 +126,8 @@ impl Pool {
     pub async fn subscribe(
         &self,
         subscription: subscribe::Subscribe,
-    ) -> subscription::ActiveSubscription {
-        self.subscription_manager
-            .subscribe(subscription, None)
-            .await
+    ) -> Result<subscription::ActiveSubscription, Error> {
+        self.subscription_manager.subscribe(subscription).await
     }
 
     /// Sends a request to all the connected relayers
@@ -210,7 +220,10 @@ mod test {
     #[tokio::test]
     async fn droppable_subscription() {
         let client_pool = Pool::default();
-        let subscription = client_pool.subscribe(Default::default()).await;
+        let subscription = client_pool
+            .subscribe(Default::default())
+            .await
+            .expect("valid");
 
         assert_eq!(client_pool.active_subscriptions().await, 1);
         drop(subscription);

+ 164 - 54
crates/client/src/pool/subscription.rs

@@ -10,6 +10,7 @@ use nostr_rs_types::{
 };
 use std::{
     collections::{BTreeMap, VecDeque},
+    ops::Deref,
     sync::{
         atomic::{AtomicUsize, Ordering},
         Arc,
@@ -27,23 +28,28 @@ pub enum Status {
     /// Subscribed is active and it is fetching previous records and no EOD has
     /// been received
     Fetching,
+    /// Refetching, like fetching but the EOD is ignored and nore relayed to
+    /// the listeners, since this is not the first the this subscription has
+    /// been created and it will be rotated soon
+    Refetching,
     /// Subscription is listening, an EOD has been received. This state can be
     /// Requeued is their spot is needed for other subscriptions
     Subscribed,
     /// Waiting to be subscribed again
     Requeued,
-    /// Resubscribed, like subscribed but the EOD is ignored and nore relayed to
-    /// the listeners, since this is not the first the this subscription has
-    /// been created and it will be rotated soon
-    Resubscribed,
 }
 
 #[derive(Debug, Default)]
 struct SubscriptionInner {
     /// Active subscription (in the client side), when this is Drop all clients unsubscribes
     active_subscription: Option<Vec<client::ActiveSubscription>>,
+
+    /// Keep track of the number of EOD received
+    end_of_stored_events: usize,
+
     /// Subscription status
     status: Status,
+
     /// raw request
     subscription_request: subscribe::Subscribe,
 }
@@ -55,9 +61,18 @@ struct SubscriptionInner {
 ///
 /// This must be dropped to unsubscribe from the subscription manager
 pub struct ActiveSubscription {
+    id: PoolSubscriptionId,
     unsubscriber: Option<(PoolSubscriptionId, Arc<Scheduler>)>,
 }
 
+impl Deref for ActiveSubscription {
+    type Target = PoolSubscriptionId;
+
+    fn deref(&self) -> &Self::Target {
+        &self.id
+    }
+}
+
 impl Drop for ActiveSubscription {
     fn drop(&mut self) {
         if let Some((id, scheduler)) = self.unsubscriber.take() {
@@ -68,11 +83,31 @@ impl Drop for ActiveSubscription {
 
 /// Pool subscription ID
 #[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
-pub struct PoolSubscriptionId((SubscriptionId, Option<Url>));
+pub struct PoolSubscriptionId(SubscriptionId);
+
+impl From<SubscriptionId> for PoolSubscriptionId {
+    fn from(id: SubscriptionId) -> Self {
+        Self(id)
+    }
+}
+
+impl From<&SubscriptionId> for PoolSubscriptionId {
+    fn from(id: &SubscriptionId) -> Self {
+        Self(id.clone())
+    }
+}
 
 impl Default for PoolSubscriptionId {
     fn default() -> Self {
-        Self((SubscriptionId::empty(), None))
+        Self(SubscriptionId::empty())
+    }
+}
+
+impl Deref for PoolSubscriptionId {
+    type Target = SubscriptionId;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
     }
 }
 
@@ -94,14 +129,17 @@ pub(crate) struct Scheduler {
 }
 
 /// Maximum number of subscriptions
-pub const MAX_SUBSCRIPTIONS: usize = 5;
+pub const MAX_ACTIVE_SUBSCRIPTIONS: usize = 5;
 
 impl Scheduler {
     /// Creates a new instance
     pub fn new(all_clients: AllClients) -> Self {
         Self {
             all_clients,
-            ..Default::default()
+            subscription_queue: Default::default(),
+            active_subscriptions: Default::default(),
+            subscriptions: Default::default(),
+            total_subscriptions: Default::default(),
         }
     }
 
@@ -111,7 +149,7 @@ impl Scheduler {
         for subscription in subscriptions.values_mut() {
             if matches!(
                 subscription.status,
-                Status::Fetching | Status::Resubscribed | Status::Subscribed,
+                Status::Fetching | Status::Refetching | Status::Subscribed,
             ) {
                 if let Ok(active_subscription) = client
                     .subscribe(subscription.subscription_request.clone())
@@ -137,24 +175,36 @@ impl Scheduler {
     ) -> Result<(), Error> {
         match message {
             Response::EndOfStoredEvents(subscription_id) => {
-                let subscription_id = PoolSubscriptionId((subscription_id.0, None));
-                let mut subscription = self.subscriptions.write().await;
-                if let Some(s) = subscription.get_mut(&subscription_id) {
-                    let old_status = s.status;
-                    s.status = Status::Subscribed;
-
-                    if old_status == Status::Fetching {
-                        return_to
-                            .try_send((
-                                Response::EndOfStoredEvents(subscription_id.0 .0.into()),
-                                url,
-                            ))
-                            .map_err(|e| Error::InternalChannel(e.to_string()))?;
+                let subscription_id = PoolSubscriptionId(subscription_id.0);
+                let mut subscriptions = self.subscriptions.write().await;
+
+                if let Some(s) = subscriptions.get_mut(&subscription_id) {
+                    s.end_of_stored_events += 1;
+                    if s.end_of_stored_events
+                        >= s.active_subscription
+                            .as_ref()
+                            .map(|x| x.len())
+                            .unwrap_or_default()
+                    {
+                        // all clients have received the EOD
+                        let old_status = s.status;
+                        s.status = Status::Subscribed;
+
+                        self.active_subscription_scheduler();
+
+                        if old_status == Status::Fetching {
+                            return_to
+                                .try_send((
+                                    Response::EndOfStoredEvents(
+                                        s.subscription_request.subscription_id.clone().into(),
+                                    ),
+                                    url,
+                                ))
+                                .map_err(|e| Error::InternalChannel(e.to_string()))?;
+                        }
                     }
                 }
 
-                self.active_subscription_scheduler();
-
                 Ok(())
             }
             any_message => {
@@ -182,7 +232,9 @@ impl Scheduler {
                 >,
                  subscription_queue: &mut RwLockWriteGuard<'_, VecDeque<PoolSubscriptionId>>|
                  -> bool {
-                    for subscription_id in subscription_queue.iter() {
+                    let mut active = 0;
+
+                    for (i, subscription_id) in subscription_queue.iter().enumerate() {
                         let subscription =
                             if let Some(subscription) = subscriptions.get_mut(subscription_id) {
                                 subscription
@@ -190,17 +242,36 @@ impl Scheduler {
                                 continue;
                             };
 
-                        if matches!(subscription.status, Status::Subscribed) {
-                            // unsubscribe
-                            let _ = subscription.active_subscription.take();
-                            // update counter
-                            this.active_subscriptions.fetch_sub(1, Ordering::Relaxed);
-                            // update since for next request
-                            let now = Utc::now();
-                            for filter in subscription.subscription_request.filters.iter_mut() {
-                                filter.since = Some(now);
+                        match subscription.status {
+                            Status::Subscribed => {
+                                // unsubscribe
+                                subscription.status = Status::Requeued;
+                                subscription.active_subscription.take();
+                                // update counter
+                                this.active_subscriptions.fetch_sub(1, Ordering::Relaxed);
+                                // update since for next request
+                                let now = Utc::now();
+                                log::info!(
+                                    "Deschedule subscription {}",
+                                    subscription.subscription_request.subscription_id
+                                );
+                                for filter in subscription.subscription_request.filters.iter_mut() {
+                                    filter.since = Some(now);
+                                }
+                                return true;
                             }
-                            return true;
+                            Status::Fetching | Status::Refetching => {
+                                active += 1;
+                                if active >= MAX_ACTIVE_SUBSCRIPTIONS {
+                                    log::info!(
+                                        "Breaking after {} attempts (total {})",
+                                        i,
+                                        subscription_queue.len()
+                                    );
+                                    break;
+                                }
+                            }
+                            _ => {}
                         }
                     }
 
@@ -221,13 +292,15 @@ impl Scheduler {
                         continue;
                     };
 
-                // add subscription id back to the last element, to be visited later
-                subscription_queue.push_back(subscription_id.clone());
-
-                let prev_status = subscription.status;
+                if matches!(subscription.status, Status::Fetching | Status::Refetching) {
+                    subscription_queue.push_front(subscription_id.clone());
+                } else {
+                    subscription_queue.push_back(subscription_id.clone());
+                }
 
-                if matches!(prev_status, Status::Queued | Status::Requeued) {
-                    if this.active_subscriptions.load(Ordering::SeqCst) >= MAX_SUBSCRIPTIONS
+                if matches!(subscription.status, Status::Queued | Status::Requeued) {
+                    let prev_status = subscription.status;
+                    if this.active_subscriptions.load(Ordering::SeqCst) >= MAX_ACTIVE_SUBSCRIPTIONS
                         && !deschedule(&mut subscriptions, &mut subscription_queue)
                     {
                         subscriptions.insert(subscription_id, subscription);
@@ -235,6 +308,11 @@ impl Scheduler {
                         break;
                     }
 
+                    // This connection is active now (or will be), therefore move it to the front
+                    // of the subscription queue so it can be descheduled first when the times comes
+                    let _ = subscription_queue.pop_back();
+                    subscription_queue.push_front(subscription_id.clone());
+
                     let wait_all = clients
                         .values()
                         .map(|(_, sender)| {
@@ -247,12 +325,17 @@ impl Scheduler {
                         .into_iter()
                         .collect::<Result<Vec<_>, _>>()
                     {
+                        log::info!(
+                            "Promoting subscription {} to active",
+                            subscription.subscription_request.subscription_id
+                        );
                         subscription.active_subscription = Some(active_subscriptions);
                         subscription.status = if prev_status == Status::Queued {
                             Status::Fetching
                         } else {
-                            Status::Resubscribed
+                            Status::Refetching
                         };
+                        subscription.end_of_stored_events = 0;
 
                         this.active_subscriptions.fetch_add(1, Ordering::Relaxed);
                     }
@@ -268,19 +351,19 @@ impl Scheduler {
     pub async fn subscribe(
         self: &Arc<Self>,
         subscription_request: subscribe::Subscribe,
-        specific_url: Option<Url>,
-    ) -> ActiveSubscription {
-        let subscription_id = PoolSubscriptionId((
-            subscription_request.subscription_id.clone(),
-            specific_url.clone(),
-        ));
-
-        self.subscriptions.write().await.insert(
+    ) -> Result<ActiveSubscription, Error> {
+        let mut subscriptions = self.subscriptions.write().await;
+        let subscription_id = PoolSubscriptionId(subscription_request.subscription_id.clone());
+
+        if subscriptions.get(&subscription_id).is_some() {
+            return Err(Error::DuplicateSubscriptionId);
+        }
+
+        subscriptions.insert(
             subscription_id.clone(),
             SubscriptionInner {
-                status: Status::Queued,
-                active_subscription: None,
                 subscription_request,
+                ..Default::default()
             },
         );
 
@@ -293,22 +376,49 @@ impl Scheduler {
         self.total_subscriptions.fetch_add(1, Ordering::Relaxed);
         self.active_subscription_scheduler();
 
-        ActiveSubscription {
+        Ok(ActiveSubscription {
+            id: subscription_id.clone(),
             unsubscriber: Some((subscription_id, self.clone())),
-        }
+        })
     }
 
+    /// Removes a subscription and drop it from the scheduler
     fn remove(self: Arc<Self>, subscription_id: PoolSubscriptionId) {
         let this = self;
         tokio::spawn(async move {
             let mut subscriptions = this.subscriptions.write().await;
+            this.subscription_queue
+                .write()
+                .await
+                .retain(|x| x != &subscription_id);
+
             if subscriptions.remove(&subscription_id).is_some() {
+                log::info!(
+                    "Unsubscribing and dropping from scheduler {}",
+                    subscription_id.0,
+                );
+
                 this.active_subscription_scheduler();
                 this.total_subscriptions.fetch_sub(1, Ordering::Relaxed);
             }
         });
     }
 
+    /// debug
+    pub async fn debug(&self) -> String {
+        let a = self.subscriptions.read().await;
+        format!(
+            "Active: {} - {}",
+            serde_json::to_string(
+                &a.iter()
+                    .map(|(k, v)| (k.to_string(), format!("{:?}", v.status)))
+                    .collect::<Vec<_>>(),
+            )
+            .unwrap(),
+            a.len(),
+        )
+    }
+
     /// Total number of subscribers
     pub fn total_subscribers(&self) -> usize {
         self.total_subscriptions.load(Ordering::Relaxed)

+ 4 - 1
crates/dump/src/main.rs

@@ -22,7 +22,10 @@ async fn main() {
         Url::parse("wss://relay.snort.social").expect("valid url"),
     ]);
 
-    let _ = clients.subscribe(Subscribe::default().into()).await;
+    let _ = clients
+        .subscribe(Subscribe::default().into())
+        .await
+        .expect("v");
 
     loop {
         if let Some((msg, relayed_by)) = clients.recv().await {

+ 99 - 76
crates/personal-relayer/src/lib.rs

@@ -3,14 +3,14 @@ use nostr_rs_client::Pool;
 use nostr_rs_relayer::Relayer;
 use nostr_rs_storage_base::Storage;
 use nostr_rs_types::{
-    types::{content::profile::Profile, tag::TagType, Content, Event, Filter, Id},
-    Request,
+    client,
+    types::{
+        content::profile::Profile, filter::TagValue, tag::TagType, Content, Event, Filter, Id, Kind,
+    },
+    Request, Response,
 };
 use serde::Serialize;
-use std::{
-    collections::{HashMap, HashSet},
-    ops::Deref,
-};
+use std::{collections::HashSet, ops::Deref, time::Duration};
 use tokio::{net::TcpListener, task::JoinHandle};
 use url::Url;
 
@@ -74,7 +74,7 @@ impl Contact {
 
 pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
     relayer: Relayer<T>,
-    accounts: HashMap<Id, Contact>,
+    accounts: Vec<Id>,
 }
 
 impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
@@ -82,17 +82,17 @@ impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
         let (pool, _active_clients) = Pool::new_with_clients(client_urls)?;
         let relayer = Relayer::new(Some(storage), Some(pool))?;
 
-        Ok(Self {
-            relayer,
-            accounts: accounts
-                .into_iter()
-                .map(|a| (a.clone(), Contact::new(a, None)))
-                .collect::<HashMap<_, _>>(),
-        })
+        Ok(Self { relayer, accounts })
     }
 
-    pub async fn main(mut self, server: TcpListener) -> Result<Stoppable, Error> {
+    pub async fn main(self, server: TcpListener) -> Result<Stoppable, Error> {
         let (relayer, relayer_handler) = self.relayer.main(server)?;
+        let kinds = vec![
+            Kind::Contacts,
+            Kind::Metadata,
+            Kind::MuteList,
+            Kind::Followset,
+        ];
 
         let tasks = vec![
             relayer_handler,
@@ -100,85 +100,108 @@ impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
                 let mut local_connection = relayer.create_new_local_connection().await;
                 local_connection
                     .send(Request::Request(
-                        vec![Filter {
-                            authors: self.accounts.keys().cloned().collect::<Vec<_>>(),
-                            //kinds: vec![Kind::Metadata, Kind::ShortTextNote, Kind::Contacts],
-                            ..Default::default()
-                        }]
+                        vec![
+                            Filter {
+                                authors: self.accounts.clone(),
+                                kinds: kinds.clone(),
+                                ..Default::default()
+                            },
+                            Filter {
+                                kinds: kinds.clone(),
+                                tags: vec![(
+                                    "p".to_owned(),
+                                    self.accounts
+                                        .iter()
+                                        .map(|id| TagValue::Id(id.clone()))
+                                        .collect::<HashSet<TagValue>>(),
+                                )]
+                                .into_iter()
+                                .collect(),
+                                ..Default::default()
+                            },
+                        ]
                         .into(),
                     ))
                     .await
                     .expect("Failed to send request");
 
+                let mut already_subscribed = HashSet::new();
+                let mut to_remove = HashSet::new();
+
                 loop {
                     while let Some(res) = local_connection.recv().await {
-                        if let Some(event) = res.as_event() {
-                            match event.content() {
-                                Content::Metadata(profile) => {
-                                    if let Some(account) = self.accounts.get_mut(event.author()) {
-                                        account.profile = Some(profile.clone());
-                                        account.content.push(event.deref().clone());
-                                    }
+                        match res {
+                            Response::EndOfStoredEvents(id) => {
+                                if to_remove.contains(&id.0) {
+                                    let _ = local_connection.future_send(
+                                        Request::Close(id.0.into()),
+                                        Duration::from_secs(10),
+                                    );
                                 }
-                                Content::Contacts(_) => {
-                                    let current = event.author().clone();
-                                    let mut current_user = if let Some(current_user) =
-                                        self.accounts.remove(&current)
-                                    {
-                                        current_user
-                                    } else {
-                                        continue;
-                                    };
-
-                                    let mut ids = vec![];
-
-                                    for tag in event.tags() {
-                                        if let TagType::PubKey(pub_key, relayer_url, _) =
-                                            tag.deref()
-                                        {
-                                            let followed = self
-                                                .accounts
-                                                .entry(pub_key.clone())
-                                                .or_insert(Contact::new(
-                                                    pub_key.clone(),
-                                                    Some(current.clone()),
-                                                ));
-
-                                            if let Some(relayer_url) = relayer_url {
-                                                let _ = relayer
-                                                    .connect_to_relayer(relayer_url.clone())
-                                                    .await;
-                                            }
+                            }
+                            Response::Event(event) => {
+                                match event.content() {
+                                    Content::Metadata(_profile) => {}
+                                    Content::Contacts(_) => {
+                                        let mut ids = vec![];
+
+                                        for tag in event.tags() {
+                                            if let TagType::PubKey(pub_key, relayer_url, _) =
+                                                tag.deref()
+                                            {
+                                                if let Some(_relayer_url) = relayer_url {
+                                                    //let _ = relayer
+                                                    //    .connect_to_relayer(relayer_url.clone())
+                                                    //    .await;
+                                                }
 
-                                            current_user.following.insert(pub_key.clone());
-                                            followed.followed_by.insert(current.clone());
-                                            ids.push(pub_key.clone());
+                                                if !already_subscribed.contains(pub_key) {
+                                                    ids.push(pub_key.clone());
+                                                    already_subscribed.insert(pub_key.clone());
+                                                }
+                                            }
                                         }
-                                    }
 
-                                    self.accounts.insert(current, current_user);
+                                        if ids.len() > 0 {
+                                            log::info!("found {} authors", ids.len());
+                                        }
 
-                                    for authors in ids.chunks(20).collect::<Vec<_>>().into_iter() {
-                                        let _ = local_connection
-                                            .send(Request::Request(
+                                        for authors in
+                                            ids.chunks(20).collect::<Vec<_>>().into_iter()
+                                        {
+                                            let subscribe: client::Subscribe = vec![
                                                 Filter {
+                                                    kinds: kinds.clone(),
                                                     authors: authors.to_vec(),
                                                     ..Default::default()
-                                                }
-                                                .into(),
-                                            ))
-                                            .await;
-                                    }
-                                }
-                                Content::ShortTextNote(_) => {
-                                    if let Some(account) = self.accounts.get_mut(event.author()) {
-                                        account.content.push(event.deref().clone());
+                                                },
+                                                Filter {
+                                                    kinds: kinds.clone(),
+                                                    tags: vec![(
+                                                        "p".to_owned(),
+                                                        authors
+                                                            .iter()
+                                                            .map(|id| TagValue::Id(id.clone()))
+                                                            .collect::<HashSet<TagValue>>(),
+                                                    )]
+                                                    .into_iter()
+                                                    .collect(),
+                                                    ..Default::default()
+                                                },
+                                            ]
+                                            .into();
+                                            to_remove.insert(subscribe.subscription_id.clone());
+
+                                            let _ = local_connection
+                                                .send(Request::Request(subscribe))
+                                                .await;
+                                        }
                                     }
+                                    Content::ShortTextNote(_) => {}
+                                    _ => {}
                                 }
-                                _ => {}
                             }
-                        } else {
-                            println!("Not an event: {:?}", res);
+                            _ => {}
                         }
                     }
                 }

+ 22 - 3
crates/relayer/src/connection/local.rs

@@ -4,17 +4,22 @@
 use crate::{connection::ConnectionId, Error, Relayer};
 use nostr_rs_storage_base::Storage;
 use nostr_rs_types::{Request, Response};
-use std::sync::Arc;
-use tokio::sync::mpsc::{Receiver, Sender};
+use std::{sync::Arc, time::Duration};
+use tokio::{
+    sync::mpsc::{Receiver, Sender},
+    task::JoinHandle,
+    time::sleep,
+};
 
 /// Local connection
 pub struct LocalConnection<T>
 where
     T: Storage + Send + Sync + 'static,
 {
+    /// The connection ID
+    pub conn_id: ConnectionId,
     sender: Sender<(ConnectionId, Request)>,
     receiver: Receiver<Response>,
-    pub(crate) conn_id: ConnectionId,
     relayer: Arc<Relayer<T>>,
 }
 
@@ -22,6 +27,10 @@ impl<T> LocalConnection<T>
 where
     T: Storage + Send + Sync + 'static,
 {
+    /// Number of queued messages
+    pub fn queued_messages(&self) -> usize {
+        self.receiver.len()
+    }
     /// Receive a message from the relayer
     pub async fn recv(&mut self) -> Option<Response> {
         self.receiver.recv().await
@@ -32,6 +41,16 @@ where
         self.receiver.try_recv().ok()
     }
 
+    /// Queues sending a message to the relayer in the future time
+    pub fn future_send(&self, request: Request, in_the_future: Duration) -> JoinHandle<()> {
+        let sender = self.sender.clone();
+        let conn_id = self.conn_id.clone();
+        tokio::spawn(async move {
+            sleep(in_the_future).await;
+            let _ = sender.send((conn_id, request)).await;
+        })
+    }
+
     /// Sends a request to the relayer
     pub async fn send(&self, request: Request) -> Result<(), Error> {
         self.sender

+ 6 - 9
crates/relayer/src/connection/mod.rs

@@ -42,14 +42,6 @@ impl ConnectionId {
     pub fn new_empty() -> Self {
         Self(0)
     }
-
-    /// Check if the connection id is empty
-    ///
-    /// Empty connection id is used for messages from Client pool to the relayer
-    #[inline]
-    pub fn is_empty(&self) -> bool {
-        self.0 == 0
-    }
 }
 
 type CompoundSubcription = (
@@ -68,7 +60,7 @@ pub struct Connection {
     handler: Option<JoinHandle<()>>,
 }
 
-const MAX_SUBSCRIPTIONS_BUFFER: usize = 100;
+const MAX_SUBSCRIPTIONS_BUFFER: usize = 100_000;
 
 impl Drop for Connection {
     fn drop(&mut self) {
@@ -93,6 +85,11 @@ impl Connection {
         )
     }
 
+    /// If this connection is a local connection and not a TCP-extenal connection
+    pub fn is_local_connection(&self) -> bool {
+        self.handler.is_none()
+    }
+
     /// Create new connection
     pub async fn new_connection(
         send_message_to_relayer: Sender<(ConnectionId, Request)>,

+ 127 - 87
crates/relayer/src/relayer.rs

@@ -3,7 +3,7 @@ use crate::{
     Connection, Error,
 };
 use futures_util::StreamExt;
-use nostr_rs_client::{Error as ClientError, Pool, Url};
+use nostr_rs_client::{pool::subscription::PoolSubscriptionId, Pool, Url};
 use nostr_rs_storage_base::Storage;
 use nostr_rs_subscription_manager::SubscriptionManager;
 use nostr_rs_types::{
@@ -18,10 +18,12 @@ use std::{
 };
 use tokio::{
     net::{TcpListener, TcpStream},
-    sync::mpsc::{channel, Receiver, Sender},
-};
-use tokio::{
-    sync::{mpsc, RwLock},
+    sync::{
+        mpsc::{
+            self, {channel, Receiver, Sender},
+        },
+        RwLock,
+    },
     task::JoinHandle,
 };
 
@@ -47,27 +49,23 @@ pub struct Relayer<T: Storage + Send + Sync + 'static> {
     /// otherwise all the messages are going to be ephemeral, making this
     /// relayer just a dumb proxy (that can be useful for privacy) but it won't
     /// be able to perform any optimization like prefetching content while offline
-    storage: Option<T>,
+    storage: Arc<Option<T>>,
     /// Subscription manager
     subscription_manager: Arc<SubscriptionManager<RelayerSubscriptionId, ()>>,
     /// List of all active connections
-    connections: RwLock<HashMap<ConnectionId, Connection>>,
+    connections: Arc<RwLock<HashMap<ConnectionId, Connection>>>,
     /// This Sender can be used to send requests from anywhere to the relayer.
     send_to_relayer: Sender<(ConnectionId, Request)>,
     /// This Receiver is the relayer the way the relayer receives messages
     relayer_receiver: Option<Receiver<(ConnectionId, Request)>>,
+
     /// Client pool
     ///
-    /// A relayer can optionally be connected to a pool of clients to get foreign events.
-    client_pool: Option<(Pool, JoinHandle<()>)>,
-}
-
-impl<T: Storage + Send + Sync + 'static> Drop for Relayer<T> {
-    fn drop(&mut self) {
-        if let Some((_, handle)) = self.client_pool.take() {
-            handle.abort();
-        }
-    }
+    /// A relayer can optionally be connected to a pool of clients to get
+    /// foreign events.
+    client_pool: Option<Pool>,
+    client_pool_receiver: Option<Receiver<(Response, Url)>>,
+    client_pool_subscriptions: RwLock<HashMap<PoolSubscriptionId, (SubscriptionId, ConnectionId)>>,
 }
 
 impl<T: Storage + Send + Sync + 'static> Relayer<T> {
@@ -80,25 +78,36 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     /// and create a network of relayers, reposting events to them and
     /// subscribing to their events.`gqq`
     pub fn new(storage: Option<T>, client_pool: Option<Pool>) -> Result<Self, Error> {
-        let (sender, receiver) = channel(100_000);
+        let (relayer_sender, relayer_receiver) = channel(100_000);
+
+        let (client_pool_receiver, client_pool) = if let Some(client_pool) = client_pool {
+            let result = client_pool.split()?;
+            (result.0, Some(result.1))
+        } else {
+            let (_, receiver) = mpsc::channel(1);
+            (receiver, None)
+        };
+
         Ok(Self {
-            storage,
+            storage: Arc::new(storage),
             subscription_manager: Default::default(),
-            send_to_relayer: sender.clone(),
-            relayer_receiver: Some(receiver),
+            send_to_relayer: relayer_sender,
+            relayer_receiver: Some(relayer_receiver),
             connections: Default::default(),
-            client_pool: if let Some(client_pool) = client_pool {
-                Some(Self::handle_client_pool(client_pool, sender)?)
-            } else {
-                None
-            },
+            client_pool_receiver: Some(client_pool_receiver),
+            client_pool: client_pool,
+            client_pool_subscriptions: Default::default(),
         })
     }
 
     /// Connects to the relayer pool
     pub async fn connect_to_relayer(&self, url: Url) -> Result<(), Error> {
-        let (client_pool, _) = self.client_pool.as_ref().ok_or(Error::NoClient)?;
-        let _ = client_pool.connect_to(url).await?;
+        let _ = self
+            .client_pool
+            .as_ref()
+            .ok_or(Error::NoClient)?
+            .connect_to(url)
+            .await?;
         Ok(())
     }
 
@@ -117,7 +126,12 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     ///
     /// This function consumes the object and takes the ownership. The returned
     /// JoinHandle() can be used to stop the main loop
-    pub fn main(self, server: TcpListener) -> Result<(Arc<Self>, JoinHandle<()>), Error> {
+    pub fn main(mut self, server: TcpListener) -> Result<(Arc<Self>, JoinHandle<()>), Error> {
+        let mut client_pool_receiver = self
+            .client_pool_receiver
+            .take()
+            .ok_or(Error::AlreadySplitted)?;
+
         let (this, mut receiver) = self.split()?;
         let _self = Arc::new(this);
         let this = _self.clone();
@@ -126,22 +140,36 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
             loop {
                 tokio::select! {
                     Ok((stream, _)) = server.accept() => {
-                        // accept new external connections
+                        // accept new connections
                         let _ = this.add_connection(None, stream).await;
                     },
-                    Some((conn_id, request)) = receiver.recv() => {
-                        // receive messages from the connection pool
-                        if conn_id.is_empty() {
-                            // message received from client pool
-                            if let Request::Event(event) = request {
+                    Some((response, _)) = client_pool_receiver.recv() => {
+                        // process messages from anothe relayer, broadcast it and store it
+                        match response {
+                            Response::Event(event) => {
+                                // we received a message from the client pool, store it locally
+                                // and re-broadcast it.
                                 let _ = this.broadcast(event.deref()).await;
-                                if let Some(storage) = this.storage.as_ref() {
-                                    let _ = storage.store_local_event(&event).await;
-                                }
                             }
-                            continue;
+                            Response::EndOfStoredEvents(sub) => {
+                                let connections = this.connections.read().await;
+                                let (sub_id, connection) = if let Some((sub_id, conn_id)) = this.client_pool_subscriptions.write().await.remove(&(sub.deref().into())) {
+                                     if let Some(connection) = connections.get(&conn_id) {
+                                        (sub_id, connection)
+                                    } else {
+                                        continue;
+                                    }
+                                } else {
+                                    continue
+                                };
+
+                                let _ = connection.send(Response::EndOfStoredEvents(sub_id.into()));
+                            }
+                            _ => {}
                         }
-
+                    }
+                    Some((conn_id, request)) = receiver.recv() => {
+                        // receive messages from our clients
                         let connections = this.connections.read().await;
                         let connection = if let Some(connection) = connections.get(&conn_id) {
                             connection
@@ -162,37 +190,6 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
         Ok((_self, handle))
     }
 
-    /// Handle the client pool
-    ///
-    /// Main loop to consume messages from the client pool and broadcast them to the local subscribers
-    fn handle_client_pool(
-        client_pool: Pool,
-        send_message_to_relayer: Sender<(ConnectionId, Request)>,
-    ) -> Result<(Pool, JoinHandle<()>), ClientError> {
-        let (mut receiver, client_pool) = client_pool.split()?;
-
-        let handle = tokio::spawn(async move {
-            loop {
-                if let Some((response, _)) = receiver.recv().await {
-                    match response {
-                        Response::Event(event) => {
-                            let _ = send_message_to_relayer.try_send((
-                                ConnectionId::new_empty(),
-                                Request::Event(event.event.into()),
-                            ));
-                        }
-                        Response::EndOfStoredEvents(_) => {}
-                        x => {
-                            println!("x => {:?}", x);
-                        }
-                    }
-                }
-            }
-        });
-
-        Ok((client_pool, handle))
-    }
-
     /// Returns a reference to the internal database
     pub fn get_db(&self) -> &Option<T> {
         &self.storage
@@ -269,7 +266,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                     let _ = storage.store_local_event(&event).await;
                 }
 
-                if let Some((client_pool, _)) = self.client_pool.as_ref() {
+                if let Some(client_pool) = self.client_pool.as_ref() {
                     // pass the event to the pool of clients, so this relayer can relay
                     // their local events to the clients in the network of relayers
                     let _ = client_pool.post(event).await;
@@ -284,11 +281,19 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                 )?;
             }
             Request::Request(request) => {
-                let foreign_subscription = if let Some((client_pool, _)) = self.client_pool.as_ref()
-                {
+                let foreign_subscription = if let Some(client_pool) = self.client_pool.as_ref() {
                     // pass the subscription request to the pool of clients, so this relayer
                     // can relay any unknown event to the clients through their subscriptions
-                    Some(client_pool.subscribe(request.clone()).await)
+                    let foreign_sub_id = client_pool
+                        .subscribe(request.filters.clone().into())
+                        .await?;
+
+                    self.client_pool_subscriptions.write().await.insert(
+                        foreign_sub_id.clone(),
+                        (request.subscription_id.clone(), connection.get_conn_id()),
+                    );
+
+                    Some(foreign_sub_id)
                 } else {
                     None
                 };
@@ -315,8 +320,10 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                     }
                 }
 
-                let _ = connection
-                    .send(relayer::EndOfStoredEvents(request.subscription_id.clone()).into());
+                if foreign_subscription.is_none() {
+                    let _ = connection
+                        .send(relayer::EndOfStoredEvents(request.subscription_id.clone()).into());
+                }
 
                 connection
                     .subscribe(
@@ -343,6 +350,38 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     }
 
     #[inline]
+    /// A non-blocking version of broadcast
+    #[allow(dead_code)]
+    fn broadcast_and_forget(&self, event: Event) {
+        let storage = self.storage.clone();
+        let connections = self.connections.clone();
+        let subscription_manager = self.subscription_manager.clone();
+
+        tokio::spawn(async move {
+            if let Some(storage) = storage.as_ref() {
+                if !storage.store(&event).await.unwrap_or_default() {
+                    return;
+                }
+            }
+
+            let connections = connections.read().await;
+            for RelayerSubscriptionId((sub_id, conn_id)) in
+                subscription_manager.get_subscribers(&event).await
+            {
+                if let Some(connection) = connections.get(&conn_id) {
+                    let _ = connection.send(
+                        relayer::Event {
+                            subscription_id: sub_id,
+                            event: event.clone(),
+                        }
+                        .into(),
+                    );
+                }
+            }
+        });
+    }
+
+    #[inline]
     /// Broadcast a given event to all local subscribers
     pub async fn broadcast(&self, event: &Event) -> Result<bool, Error> {
         if let Some(storage) = self.storage.as_ref() {
@@ -1012,18 +1051,19 @@ mod test {
         let (main_client, _main_client_inscope) =
             Pool::new_with_clients(vec![main_relayer]).expect("valid pool");
 
-        let _sub = reader_client.subscribe(Default::default()).await;
+        let _sub = reader_client
+            .subscribe(Default::default())
+            .await
+            .expect("v");
 
         sleep(Duration::from_millis(20)).await;
 
-        for _ in 0..3 {
-            assert!(reader_client
-                .try_recv()
-                .map(|(r, _)| r)
-                .expect("valid message")
-                .as_end_of_stored_events()
-                .is_some());
-        }
+        assert!(reader_client
+            .try_recv()
+            .map(|(r, _)| r)
+            .expect("valid message: step")
+            .as_end_of_stored_events()
+            .is_some());
         assert!(reader_client.try_recv().is_none());
 
         let account1 = Account::default();
@@ -1083,7 +1123,7 @@ mod test {
         // connected to the main relayer
         let (mut main_client, _in_scope) =
             Pool::new_with_clients(vec![main_relayer]).expect("valid client");
-        let _sub = main_client.subscribe(Default::default()).await;
+        let _sub = main_client.subscribe(Default::default()).await.expect("v");
 
         sleep(Duration::from_millis(10)).await;
         assert!(main_client

+ 1 - 5
crates/storage/sqlite/src/lib.rs

@@ -119,7 +119,7 @@ impl SQLite {
             CREATE INDEX IF NOT EXISTS by_id ON event_index (event_id, created_at DESC);
             CREATE INDEX IF NOT EXISTS by_author_id ON event_index (author_id, kind, created_at DESC);
             CREATE INDEX IF NOT EXISTS by_tag ON event_index (tag_name, tag_value, created_at DESC);
-            CREATE INDEX IF NOT EXISTS sorted ON event_index (tag_name, tag_value, created_at DESC);
+            CREATE INDEX IF NOT EXISTS sorted ON event_index (created_at DESC);
             ",
         )
         .execute(index_db)
@@ -152,10 +152,6 @@ impl<'a> Stream for Cursor<'a> {
 
 impl SQLite {
     fn build_index(indexing: Arc<AtomicUsize>, pool: Pool<Sqlite>, event: Event, is_retry: bool) {
-        if !is_retry {
-            indexing.fetch_add(1, Ordering::Relaxed);
-        }
-
         tokio::spawn(async move {
             let mut indexes = vec![];
 

+ 6 - 0
crates/types/src/client/close.rs

@@ -14,6 +14,12 @@ use serde_json::Value;
 #[derive(Clone, Debug)]
 pub struct Close(pub SubscriptionId);
 
+impl From<SubscriptionId> for Close {
+    fn from(value: SubscriptionId) -> Self {
+        Self(value)
+    }
+}
+
 impl Deref for Close {
     type Target = SubscriptionId;
 

+ 11 - 2
crates/types/src/types/kind.rs

@@ -48,6 +48,11 @@ pub enum Kind {
     Zap,
     /// Relay List Metadata - NIP-65
     RelayListMetadata,
+    /// things the user doesn't want to see in their feeds
+    MuteList,
+    /// categorized groups of users a client may choose to check out in
+    /// different circumstances
+    Followset,
     /// Unknown Kind
     Unknown(u32),
 }
@@ -99,7 +104,9 @@ impl From<Kind> for u32 {
             Kind::Reaction => 7,
             Kind::ZapRequest => 9734,
             Kind::Zap => 9735,
-            Kind::RelayListMetadata => 10002,
+            Kind::MuteList => 10_000,
+            Kind::RelayListMetadata => 10_002,
+            Kind::Followset => 30_000,
             Kind::Unknown(t) => t,
         }
     }
@@ -118,7 +125,9 @@ impl From<u32> for Kind {
             7 => Kind::Reaction,
             9734 => Kind::ZapRequest,
             9735 => Kind::Zap,
-            10002 => Kind::RelayListMetadata,
+            10_000 => Kind::MuteList,
+            10_002 => Kind::RelayListMetadata,
+            30_000 => Kind::Followset,
             any => Kind::Unknown(any),
         }
     }