소스 검색

Improved relayer

Cesar Rodas 1 년 전
부모
커밋
0a702b5d26
3개의 변경된 파일41개의 추가작업 그리고 22개의 파일을 삭제
  1. 4 4
      crates/client/src/relayer.rs
  2. 14 11
      crates/relayer/src/connection.rs
  3. 23 7
      crates/relayer/src/relayer.rs

+ 4 - 4
crates/client/src/relayer.rs

@@ -27,7 +27,7 @@ const NO_ACTIVITY_TIMEOUT_SECS: u64 = 120;
 impl Relayer {
     /// Creates a new relayer
     pub fn new<F>(
-        publish_to_listeners: mpsc::Sender<(Response, String)>,
+        broadcast_to_listeners: mpsc::Sender<(Response, String)>,
         sent_messages: Arc<RwLock<Vec<Request>>>,
         connection_retries: u16,
         url: &str,
@@ -41,7 +41,7 @@ impl Relayer {
     {
         let (send_to_socket, receiver) = mpsc::channel(100_000);
         let stop_service = Self::spawn_background_client(
-            publish_to_listeners,
+            broadcast_to_listeners,
             sent_messages,
             send_to_socket.clone(),
             receiver,
@@ -58,7 +58,7 @@ impl Relayer {
     }
 
     fn spawn_background_client<F>(
-        publish_to_listeners: mpsc::Sender<(Response, String)>,
+        broadcast_to_listeners: mpsc::Sender<(Response, String)>,
         sent_messages: Arc<RwLock<Vec<Request>>>,
         send_to_socket: mpsc::Sender<Request>,
         mut receiver: mpsc::Receiver<Request>,
@@ -139,7 +139,7 @@ impl Relayer {
                             let msg: Result<Response, _> = serde_json::from_str(&msg);
 
                             if let Ok(msg) = msg {
-                                if let Err(error) = publish_to_listeners.send((msg, url.to_owned())).await {
+                                if let Err(error) = broadcast_to_listeners.send((msg, url.to_owned())).await {
                                     log::error!("{}: Reconnecting client because of {}", url, error);
                                     break;
                                 }

+ 14 - 11
crates/relayer/src/connection.rs

@@ -18,11 +18,14 @@ pub struct Connection {
 const MAX_SUBSCRIPTIONS_BUFFER: usize = 100;
 
 impl Connection {
-    pub async fn new(stream: TcpStream) -> Result<Self, Error> {
+    pub async fn new(
+        broadcast_request: Sender<(u128, Request)>,
+        stream: TcpStream,
+    ) -> Result<Self, Error> {
         let websocket = accept_async(stream).await?;
         let conn_id = 0;
         let (sender, receiver) = channel(MAX_SUBSCRIPTIONS_BUFFER);
-        Self::spawn(websocket, receiver, conn_id);
+        Self::spawn(broadcast_request, websocket, receiver, conn_id);
         Ok(Self {
             conn_id,
             sender,
@@ -31,26 +34,26 @@ impl Connection {
     }
 
     pub fn spawn(
+        broadcast_request: Sender<(u128, Request)>,
         websocket: WebSocketStream<TcpStream>,
         mut receiver: Receiver<Response>,
-        _conn_id: u128,
+        conn_id: u128,
     ) {
         tokio::spawn(async move {
             let mut _subscriptions: HashMap<String, (u128, Receiver<Response>)> = HashMap::new();
-            let (mut _writer, mut reader) = websocket.split();
+            let (mut writer, mut reader) = websocket.split();
             loop {
                 tokio::select! {
                     Some(msg) = receiver.recv() => {
                         let msg = serde_json::to_string(&msg).unwrap();
-                        _writer.send(Message::Text(msg.into())).await.unwrap();
+                        writer.send(Message::Text(msg.into())).await.unwrap();
                     }
                     Some(msg) = reader.next() => {
-                        if let Ok(msg) = msg {
-                        let msg: Result<Request, _> = serde_json::from_str(&msg.to_string());
-                        if let Ok(_msg) = msg {
-                            todo!();
-                        }
-
+                        if let Ok(Message::Text(msg)) = msg {
+                            let msg: Result<Request, _> = serde_json::from_str(&msg);
+                            if let Ok(msg) = msg {
+                                let _ = broadcast_request.send((conn_id, msg)).await;
+                            }
                         }
                     }
                     else => {

+ 23 - 7
crates/relayer/src/relayer.rs

@@ -7,7 +7,10 @@ use nostr_rs_types::{
 };
 use parking_lot::{RwLock, RwLockReadGuard};
 use std::{collections::HashMap, ops::Deref};
-use tokio::{net::TcpStream, sync::mpsc::Sender};
+use tokio::{
+    net::TcpStream,
+    sync::mpsc::{channel, Receiver, Sender},
+};
 
 #[derive(Clone, Debug, Default, Eq, PartialEq, Hash)]
 pub struct SubscriptionType {
@@ -22,38 +25,48 @@ pub struct Relayer {
     storage: RocksDb,
     subscriptions: RwLock<HashMap<SubscriptionType, RwLock<Subscriptions>>>,
     clients: RwLock<HashMap<u128, Connection>>,
+    receiver: Receiver<(u128, Request)>,
+    sender: Sender<(u128, Request)>,
 }
 
 impl Relayer {
     pub fn new(storage: RocksDb) -> Self {
+        let (sender, receiver) = channel(100_000);
         Self {
             storage,
             subscriptions: RwLock::new(HashMap::new()),
             clients: RwLock::new(HashMap::new()),
+            receiver,
+            sender,
         }
     }
 
     pub async fn add_connection(&self, stream: TcpStream) -> Result<(), Error> {
-        let client = Connection::new(stream).await?;
+        let client = Connection::new(self.sender.clone(), stream).await?;
         let mut clients = self.clients.write();
         clients.insert(client.conn_id, client);
 
         Ok(())
     }
 
-    pub async fn recv(&self, conn_id: u128, request: Request) -> Result<(), Error> {
-        let connections = self.clients.read();
+    pub async fn recv(&mut self) -> Result<Option<Request>, Error> {
+        let (conn_id, request) = if let Some(request) = self.receiver.recv().await {
+            request
+        } else {
+            return Ok(None);
+        };
 
+        let connections = self.clients.read();
         let connection = connections
             .get(&conn_id)
             .ok_or(Error::UnknownConnection(conn_id))?;
 
-        match request {
+        match &request {
             Request::Event(event) => {
                 self.store_and_broadcast(event.deref());
             }
             Request::Request(request) => {
-                for filter in request.filters.into_iter() {
+                for filter in request.filters.clone().into_iter() {
                     // Create subscription
                     let (conn_id, sub_id, receiver) =
                         connection.create_connection(request.subscription_id.deref().to_owned())?;
@@ -85,10 +98,13 @@ impl Relayer {
                             );
                         });
                 }
+                let _ = connection
+                    .send(relayer::EndOfStoredEvents(request.subscription_id.clone()).into());
             }
             Request::Close(_close) => {}
         };
-        Ok(())
+
+        Ok(Some(request))
     }
 
     #[inline]