Bläddra i källkod

Add support for closing a request

Cesar Rodas 1 år sedan
förälder
incheckning
b47d52d898
2 ändrade filer med 67 tillägg och 23 borttagningar
  1. 9 9
      crates/relayer/src/connection.rs
  2. 58 14
      crates/relayer/src/relayer.rs

+ 9 - 9
crates/relayer/src/connection.rs

@@ -12,7 +12,7 @@ use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
 pub struct Connection {
     pub(crate) conn_id: u128,
     sender: Sender<Response>,
-    subscriptions: RwLock<HashMap<String, Vec<u128>>>,
+    subscriptions: RwLock<HashMap<String, u128>>,
 }
 
 const MAX_SUBSCRIPTIONS_BUFFER: usize = 100;
@@ -69,14 +69,14 @@ impl Connection {
         Ok(self.sender.try_send(response)?)
     }
 
-    pub fn create_subscription(&self, id: String) -> (u128, u128, Sender<Response>) {
+    pub fn get_subscription_id(&self, id: &str) -> Option<u128> {
+        let subscriptions = self.subscriptions.read();
+        subscriptions.get(id).copied()
+    }
+
+    pub fn create_subscription(&self, id: String) -> (u128, Sender<Response>) {
         let mut subscriptions = self.subscriptions.write();
-        let internal_id = get_id();
-        if let Some(subscriptions) = subscriptions.get_mut(&id) {
-            subscriptions.push(internal_id);
-        } else {
-            subscriptions.insert(id, vec![internal_id]);
-        }
-        (self.conn_id, internal_id, self.sender.clone())
+        let internal_id = subscriptions.entry(id).or_insert_with(get_id);
+        (*internal_id, self.sender.clone())
     }
 }

+ 58 - 14
crates/relayer/src/relayer.rs

@@ -19,10 +19,24 @@ pub struct SubscriptionType {
     kind: Option<u32>,
 }
 
-type Subscriptions = HashMap<(u128, u128), (SubscriptionId, Sender<Response>)>;
+type SubId = u128;
+
+type Subscriptions = HashMap<SubId, (SubscriptionId, Sender<Response>)>;
 
 pub struct Relayer {
     storage: RocksDb,
+    /// Keeps a map between the internal subscription ID and the subscription
+    /// type. One subscription ID may have multiple subscription types.
+    ///
+    /// Each connection keeps a list of the subscription ID provided by the user
+    /// (String) and the internal, globally recognized subscription ID which is
+    /// internal (SubId)
+    subscriptions_ids_index: RwLock<HashMap<SubId, Vec<SubscriptionType>>>,
+    /// Each subscription type that is active has a list of subscriptions.
+    ///
+    /// A single REQ can be subscribed to multiple subscription types, specially
+    /// when it is translated in OR filters. It is designed this way to allow a
+    /// fast iteration and match quickly filters.
     subscriptions: RwLock<HashMap<SubscriptionType, RwLock<Subscriptions>>>,
     clients: RwLock<HashMap<u128, Connection>>,
     sender: Sender<(u128, Request)>,
@@ -35,6 +49,7 @@ impl Relayer {
             Arc::new(Self {
                 storage,
                 subscriptions: RwLock::new(HashMap::new()),
+                subscriptions_ids_index: RwLock::new(HashMap::new()),
                 clients: RwLock::new(HashMap::new()),
                 sender,
             }),
@@ -72,21 +87,38 @@ impl Relayer {
             Request::Request(request) => {
                 for filter in request.filters.clone().into_iter() {
                     // Create subscription
-                    let (conn_id, sub_id, receiver) =
+                    let (sub_id, receiver) =
                         connection.create_subscription(request.subscription_id.deref().to_owned());
+                    let mut sub_index = self.subscriptions_ids_index.write();
                     let mut subscriptions = self.subscriptions.write();
-                    Self::get_indexes_from_filter(&filter)
-                        .into_iter()
-                        .for_each(|index| {
-                            subscriptions
-                                .entry(index)
-                                .or_insert_with(|| RwLock::new(HashMap::new()))
-                                .write()
-                                .insert(
-                                    (conn_id, sub_id),
-                                    (request.subscription_id.clone(), receiver.clone()),
-                                );
+                    if let Some(prev_subs) = sub_index.remove(&sub_id) {
+                        // remove any previous subscriptions
+                        prev_subs.iter().for_each(|index| {
+                            if let Some(subscriptions) = subscriptions.get_mut(&index) {
+                                subscriptions.write().remove(&sub_id);
+                            }
                         });
+                    }
+                    sub_index.insert(
+                        sub_id,
+                        Self::get_indexes_from_filter(&filter)
+                            .into_iter()
+                            .map(|index| {
+                                subscriptions
+                                    .entry(index.clone())
+                                    .or_insert_with(|| RwLock::new(HashMap::new()))
+                                    .write()
+                                    .insert(
+                                        sub_id,
+                                        (request.subscription_id.clone(), receiver.clone()),
+                                    );
+                                index
+                            })
+                            .collect::<Vec<_>>(),
+                    );
+
+                    drop(subscriptions);
+                    drop(sub_index);
 
                     // Sent all events that match the filter that are stored in our database
                     self.storage
@@ -105,7 +137,19 @@ impl Relayer {
                 let _ = connection
                     .send(relayer::EndOfStoredEvents(request.subscription_id.clone()).into());
             }
-            Request::Close(_close) => {}
+            Request::Close(close) => {
+                connection.get_subscription_id(&*close.0).map(|id| {
+                    let mut subscriptions = self.subscriptions_ids_index.write();
+                    subscriptions.remove(&id).map(|indexes| {
+                        let mut subscriptions = self.subscriptions.write();
+                        indexes.iter().for_each(|index| {
+                            if let Some(subscriptions) = subscriptions.get_mut(&index) {
+                                subscriptions.write().remove(&id);
+                            }
+                        });
+                    });
+                });
+            }
         };
 
         Ok(Some(request))