Browse Source

Improve pool subscription to newer clients

Cesar Rodas 3 tháng trước cách đây
mục cha
commit
90a2b54f4b
1 tập tin đã thay đổi với 19 bổ sung7 xóa
  1. 19 7
      crates/client/src/pool.rs

+ 19 - 7
crates/client/src/pool.rs

@@ -22,7 +22,7 @@ pub struct Pool {
     clients: RwLock<HashMap<Url, Client>>,
     sender: mpsc::Sender<(Response, Url)>,
     receiver: Option<mpsc::Receiver<(Response, Url)>>,
-    subscriptions: RwLock<HashMap<SubscriptionId, Vec<ActiveSubscription>>>,
+    subscriptions: RwLock<HashMap<SubscriptionId, (subscribe::Subscribe, Vec<ActiveSubscription>)>>,
 }
 
 impl Default for Pool {
@@ -86,11 +86,14 @@ impl Pool {
             .collect::<Vec<_>>();
 
         self.subscriptions.write().await.insert(
-            subscription.subscription_id,
-            join_all(wait_all)
-                .await
-                .into_iter()
-                .collect::<Result<Vec<_>, _>>()?,
+            subscription.subscription_id.clone(),
+            (
+                subscription,
+                join_all(wait_all)
+                    .await
+                    .into_iter()
+                    .collect::<Result<Vec<_>, _>>()?,
+            ),
         );
 
         Ok(())
@@ -125,10 +128,19 @@ impl Pool {
     /// already exists false will be returned
     pub async fn connect_to(&self, url: Url) {
         let mut clients = self.clients.write().await;
+        let mut subscriptions = self.subscriptions.write().await;
 
         if !clients.contains_key(&url) {
             log::warn!("Connecting to {}", url);
-            clients.insert(url.clone(), Client::new(self.sender.clone(), url));
+            let client = Client::new(self.sender.clone(), url.clone());
+
+            for (filter, sub) in subscriptions.values_mut() {
+                let _ = client.subscribe(filter.clone()).await.map(|subscription| {
+                    sub.push(subscription);
+                });
+            }
+
+            clients.insert(url.clone(), client);
         }
     }
 }