Cesar Rodas 10 ماه پیش
والد
کامیت
19015367a7
3فایلهای تغییر یافته به همراه16 افزوده شده و 19 حذف شده
  1. 5 3
      crates/relayer/src/connection.rs
  2. 1 1
      crates/relayer/src/error.rs
  3. 10 15
      crates/relayer/src/relayer.rs

+ 5 - 3
crates/relayer/src/connection.rs

@@ -80,7 +80,7 @@ impl Connection {
                         } else {
                             continue;
                         };
-                        if let Err(err) =  writer.send(Message::Text(msg.into())).await {
+                        if let Err(err) =  writer.send(Message::Text(msg)).await {
                             log::error!("Error sending message to client: {}", err);
                             break;
                         }
@@ -104,7 +104,7 @@ impl Connection {
                                     } else {
                                         continue;
                                     };
-                                    if let Err(err) =  writer.send(Message::Text(reply.into())).await {
+                                    if let Err(err) =  writer.send(Message::Text(reply)).await {
                                         log::error!("Error sending message to client: {}", err);
                                         break;
                                     }
@@ -126,7 +126,9 @@ impl Connection {
 
     #[inline]
     pub fn send(&self, response: Response) -> Result<(), Error> {
-        Ok(self.sender.try_send(response)?)
+        self.sender
+            .try_send(response)
+            .map_err(|e| Error::TrySendError(Box::new(e)))
     }
 
     #[inline]

+ 1 - 1
crates/relayer/src/error.rs

@@ -18,5 +18,5 @@ pub enum Error {
     UnknownConnection(u128),
 
     #[error("TrySendError: {0}")]
-    TrySendError(#[from] tokio::sync::mpsc::error::TrySendError<Response>),
+    TrySendError(#[from] Box<tokio::sync::mpsc::error::TrySendError<Response>>),
 }

+ 10 - 15
crates/relayer/src/relayer.rs

@@ -6,7 +6,7 @@ use nostr_rs_types::{
     Request, Response,
 };
 use parking_lot::{RwLock, RwLockReadGuard};
-use std::{collections::HashMap, marker::PhantomData, ops::Deref, sync::Arc};
+use std::{collections::HashMap, ops::Deref, sync::Arc};
 use tokio::sync::mpsc;
 #[allow(unused_imports)]
 use tokio::{
@@ -40,13 +40,9 @@ pub struct Relayer<T: Storage> {
     clients: RwLock<HashMap<u128, Connection>>,
     #[allow(dead_code)]
     sender: Sender<(u128, Request)>,
-    _phantom: std::marker::PhantomData<&'a I>,
 }
 
-impl<T: Storage> Relayer<T>
-where
-    I: Iterator<Item = Result<Event, nostr_rs_storage::Error>>,
-{
+impl<T: Storage> Relayer<T> {
     pub fn new(storage: Option<T>) -> (Arc<Self>, Receiver<(u128, Request)>) {
         let (sender, receiver) = channel(100_000);
         (
@@ -56,7 +52,6 @@ where
                 subscriptions_ids_index: RwLock::new(HashMap::new()),
                 clients: RwLock::new(HashMap::new()),
                 sender,
-                _phantom: PhantomData,
             }),
             receiver,
         )
@@ -81,7 +76,7 @@ where
     }
 
     fn recv_request_from_client(
-        &'a self,
+        &self,
         connection: &Connection,
         request: Request,
     ) -> Result<Option<Request>, Error> {
@@ -98,7 +93,7 @@ where
                 if let Some(prev_subs) = sub_index.remove(&sub_id) {
                     // remove any previous subscriptions
                     prev_subs.iter().for_each(|index| {
-                        if let Some(subscriptions) = subscriptions.get_mut(&index) {
+                        if let Some(subscriptions) = subscriptions.get_mut(index) {
                             subscriptions.write().remove(&sub_id);
                         }
                     });
@@ -145,17 +140,17 @@ where
                     .send(relayer::EndOfStoredEvents(request.subscription_id.clone()).into());
             }
             Request::Close(close) => {
-                connection.get_subscription_id(&*close.0).map(|id| {
+                if let Some(id) = connection.get_subscription_id(&close.0) {
                     let mut subscriptions = self.subscriptions_ids_index.write();
-                    subscriptions.remove(&id).map(|indexes| {
+                    if let Some(indexes) = subscriptions.remove(&id) {
                         let mut subscriptions = self.subscriptions.write();
                         indexes.iter().for_each(|index| {
-                            if let Some(subscriptions) = subscriptions.get_mut(&index) {
+                            if let Some(subscriptions) = subscriptions.get_mut(index) {
                                 subscriptions.write().remove(&id);
                             }
                         });
-                    });
-                });
+                    }
+                }
             }
         };
 
@@ -163,7 +158,7 @@ where
     }
 
     pub async fn recv(
-        &'a self,
+        &self,
         receiver: &mut Receiver<(u128, Request)>,
     ) -> Result<Option<Request>, Error> {
         let (conn_id, request) = if let Some(request) = receiver.recv().await {