Prechádzať zdrojové kódy

Improved the client pool and relayer integration

Cesar Rodas 3 mesiacov pred
rodič
commit
c8efb90f37
1 zmenil súbory, kde vykonal 19 pridanie a 3 odobranie
  1. 19 3
      crates/relayer/src/relayer.rs

+ 19 - 3
crates/relayer/src/relayer.rs

@@ -45,7 +45,7 @@ pub struct Relayer<T: Storage> {
     subscriptions: RwLock<HashMap<Subscription, RwLock<Subscriptions>>>,
     clients: RwLock<HashMap<u128, Connection>>,
     sender: Sender<(u128, Request)>,
-    client_pool: Option<(Pool, JoinHandle<()>)>,
+    client_pool: Option<(RwLock<Pool>, JoinHandle<()>)>,
 }
 
 impl<T: Storage> Drop for Relayer<T> {
@@ -82,7 +82,7 @@ impl<T: Storage> Relayer<T> {
     fn handle_client_pool(
         client_pool: Pool,
         sender: Sender<(u128, Request)>,
-    ) -> Result<(Pool, JoinHandle<()>), ClientError> {
+    ) -> Result<(RwLock<Pool>, JoinHandle<()>), ClientError> {
         let (mut receiver, client_pool) = client_pool.split()?;
 
         let handle = tokio::spawn(async move {
@@ -98,7 +98,7 @@ impl<T: Storage> Relayer<T> {
             }
         });
 
-        Ok((client_pool, handle))
+        Ok((RwLock::new(client_pool), handle))
     }
 
     /// Returns a reference to the internal database
@@ -129,6 +129,16 @@ impl<T: Storage> Relayer<T> {
                 self.store_and_broadcast_local_event(event.deref()).await;
             }
             Request::Request(request) => {
+                if let Some((client_pool, _)) = self.client_pool.as_ref() {
+                    // pass the subscription request to the pool of clients, so this relayer
+                    // can relay any unknown event to the clients through their subscriptions
+                    let _ = client_pool
+                        .write()
+                        .await
+                        .subscribe(request.filters.clone().into())
+                        .await;
+                }
+
                 // Create subscription
                 let (sub_id, receiver) = connection
                     .create_subscription(request.subscription_id.deref().to_owned())
@@ -266,6 +276,12 @@ impl<T: Storage> Relayer<T> {
                 Self::broadcast_to_subscribers(subscribers.read().await, event);
             }
         }
+
+        if let Some((client_pool, _)) = self.client_pool.as_ref() {
+            // pass the event to the pool of clients, so this relayer can relay
+            // their local events to the clients in the network of relayers
+            let _ = client_pool.write().await.post(event.clone().into()).await;
+        }
     }
 }