Parcourir la source

Working on relayer

Cesar Rodas il y a 1 an
Parent
commit
b592f6e750

+ 3 - 0
Cargo.lock

@@ -833,11 +833,14 @@ dependencies = [
 name = "nostr-rs-relayer"
 version = "0.1.0"
 dependencies = [
+ "futures-util",
  "nostr-rs-storage",
  "nostr-rs-types",
  "parking_lot",
+ "serde_json",
  "thiserror",
  "tokio",
+ "tokio-tungstenite",
 ]
 
 [[package]]

+ 4 - 1
crates/relayer/Cargo.toml

@@ -6,8 +6,11 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-thiserror = "1.0.40"
 nostr-rs-types = { path = "../types" }
 nostr-rs-storage = { path = "../storage" }
+futures-util = "0.3.27"
 parking_lot = "0.12.1"
 tokio = { version = "1.26.0", features = ["sync", "macros", "rt", "time"] }
+tokio-tungstenite = { version = "0.18.0", features = ["rustls", "rustls-native-certs", "rustls-tls-native-roots"] }
+thiserror = "1.0.39"
+serde_json = "1.0.94"

+ 78 - 0
crates/relayer/src/connection.rs

@@ -0,0 +1,78 @@
+use crate::Error;
+use futures_util::{SinkExt, StreamExt};
+use nostr_rs_types::{Request, Response};
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use tokio::{
+    net::TcpStream,
+    sync::mpsc::{channel, Receiver, Sender},
+};
+use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
+
+pub struct Connection {
+    pub(crate) conn_id: u128,
+    sender: Sender<Response>,
+    subscriptions: RwLock<HashMap<String, u128>>,
+}
+
+const MAX_SUBSCRIPTIONS_BUFFER: usize = 100;
+
+impl Connection {
+    pub async fn new(stream: TcpStream) -> Result<Self, Error> {
+        let websocket = accept_async(stream).await?;
+        let conn_id = 0;
+        let (sender, receiver) = channel(MAX_SUBSCRIPTIONS_BUFFER);
+        Self::spawn(websocket, receiver, conn_id);
+        Ok(Self {
+            conn_id,
+            sender,
+            subscriptions: RwLock::new(HashMap::new()),
+        })
+    }
+
+    pub fn spawn(
+        websocket: WebSocketStream<TcpStream>,
+        mut receiver: Receiver<Response>,
+        _conn_id: u128,
+    ) {
+        tokio::spawn(async move {
+            let mut _subscriptions: HashMap<String, (u128, Receiver<Response>)> = HashMap::new();
+            let (mut _writer, mut reader) = websocket.split();
+            loop {
+                tokio::select! {
+                    Some(msg) = receiver.recv() => {
+                        let msg = serde_json::to_string(&msg).unwrap();
+                        _writer.send(Message::Text(msg.into())).await.unwrap();
+                    }
+                    Some(msg) = reader.next() => {
+                        if let Ok(msg) = msg {
+                        let msg: Result<Request, _> = serde_json::from_str(&msg.to_string());
+                        if let Ok(_msg) = msg {
+                            todo!();
+                        }
+
+                        }
+                    }
+                    else => {
+                        break;
+                    }
+                }
+            }
+        });
+    }
+
+    pub async fn send(&self, _response: Response) -> Result<(), Error> {
+        Ok(())
+    }
+
+    pub fn create_connection(&self, id: String) -> Result<(u128, u128, Sender<Response>), Error> {
+        let mut subscriptions = self.subscriptions.write();
+        if subscriptions.contains_key(&id) {
+            return Err(Error::IdentifierAlreadyUsed(id));
+        }
+
+        let internal_id = 0;
+        subscriptions.insert(id, internal_id);
+        Ok((self.conn_id, internal_id, self.sender.clone()))
+    }
+}

+ 17 - 0
crates/relayer/src/error.rs

@@ -0,0 +1,17 @@
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("The identifier {0} is already in use")]
+    IdentifierAlreadyUsed(String),
+
+    #[error("Internal/DB: {0}")]
+    Db(#[from] nostr_rs_storage::Error),
+
+    #[error("WebSocket error: {0}")]
+    WebSocket(#[from] tokio_tungstenite::tungstenite::Error),
+
+    #[error("Serialization: {0}")]
+    Serde(#[from] serde_json::Error),
+
+    #[error("Unknown connection: {0}")]
+    UnknownConnection(u128),
+}

+ 4 - 79
crates/relayer/src/lib.rs

@@ -1,80 +1,5 @@
-use nostr_rs_storage::RocksDb;
-use nostr_rs_types::{client::Event, Request};
-use parking_lot::{RwLock, RwLockReadGuard};
-use std::collections::HashMap;
-use tokio::sync::mpsc::Sender;
+mod connection;
+mod error;
+mod relayer;
 
-#[derive(Clone, Debug, Default, Eq, PartialEq, Hash)]
-pub struct SubscriptionType {
-    public_key: Option<Vec<u8>>,
-    id: Option<Vec<u8>>,
-    kind: Option<u32>,
-}
-
-type Subscriptions = HashMap<u128, Sender<Event>>;
-
-pub struct Relayer {
-    storage: RocksDb,
-    subscriptions: RwLock<HashMap<SubscriptionType, RwLock<Subscriptions>>>,
-}
-
-impl Relayer {
-    pub fn new(storage: RocksDb) -> Self {
-        Self {
-            storage,
-            subscriptions: RwLock::new(HashMap::new()),
-        }
-    }
-
-    pub async fn recv(&self, request: Request) {
-        match request {
-            Request::Event(event) => {
-                let _ = self.storage.store(&event);
-                self.broadcast(event).await;
-            }
-            Request::Request(subscribe) => {}
-            Request::Close(close) => {}
-            _ => {}
-        }
-    }
-
-    #[inline]
-    fn get_possible_listeners_from_event(event: &Event) -> Vec<SubscriptionType> {
-        let kind = event.kind().into();
-        let public_keys = [None, Some(event.author().as_ref().to_vec())];
-        let id = [None, Some(event.id.as_ref().to_vec())];
-        let kind = [None, Some(kind)];
-        let mut subs = vec![];
-
-        for public_key in public_keys.iter() {
-            for id in id.iter() {
-                for kind in kind.iter() {
-                    subs.push(SubscriptionType {
-                        public_key: public_key.clone(),
-                        id: id.clone(),
-                        kind: *kind,
-                    });
-                }
-            }
-        }
-
-        subs
-    }
-
-    #[inline]
-    fn broadcast_to_subscribers(subscriptions: RwLockReadGuard<Subscriptions>, event: &Event) {
-        for (_, receiver) in subscriptions.iter() {
-            let _ = receiver.try_send(event.clone());
-        }
-    }
-
-    async fn broadcast(&self, event: Event) {
-        let subscriptions = self.subscriptions.read();
-
-        for subscription_type in Self::get_possible_listeners_from_event(&event) {
-            if let Some(subscribers) = subscriptions.get(&subscription_type) {
-                Self::broadcast_to_subscribers(subscribers.read(), &event);
-            }
-        }
-    }
-}
+pub use self::{connection::Connection, error::Error, relayer::Relayer};

+ 187 - 0
crates/relayer/src/relayer.rs

@@ -0,0 +1,187 @@
+use crate::{Connection, Error};
+use nostr_rs_storage::RocksDb;
+use nostr_rs_types::{
+    relayer,
+    types::{Event, Filter, SubscriptionId},
+    Request, Response,
+};
+use parking_lot::{RwLock, RwLockReadGuard};
+use std::{collections::HashMap, ops::Deref};
+use tokio::{net::TcpStream, sync::mpsc::Sender};
+
+#[derive(Clone, Debug, Default, Eq, PartialEq, Hash)]
+pub struct SubscriptionType {
+    public_key: Option<Vec<u8>>,
+    id: Option<Vec<u8>>,
+    kind: Option<u32>,
+}
+
+type Subscriptions = HashMap<(u128, u128), (SubscriptionId, Sender<Response>)>;
+
+pub struct Relayer {
+    storage: RocksDb,
+    subscriptions: RwLock<HashMap<SubscriptionType, RwLock<Subscriptions>>>,
+    clients: RwLock<HashMap<u128, Connection>>,
+}
+
+impl Relayer {
+    pub fn new(storage: RocksDb) -> Self {
+        Self {
+            storage,
+            subscriptions: RwLock::new(HashMap::new()),
+            clients: RwLock::new(HashMap::new()),
+        }
+    }
+
+    pub async fn add_connection(&self, stream: TcpStream) -> Result<(), Error> {
+        let client = Connection::new(stream).await?;
+        let mut clients = self.clients.write();
+        clients.insert(client.conn_id, client);
+
+        Ok(())
+    }
+
+    pub async fn recv(&self, conn_id: u128, request: Request) -> Result<(), Error> {
+        let connections = self.clients.read();
+
+        let connection = connections
+            .get(&conn_id)
+            .ok_or(Error::UnknownConnection(conn_id))?;
+
+        match request {
+            Request::Event(event) => {
+                self.store_and_broadcast(event.deref());
+            }
+            Request::Request(request) => {
+                for filter in request.filters.into_iter() {
+                    // Create subscription
+                    let (conn_id, sub_id, receiver) =
+                        connection.create_connection(request.subscription_id.deref().to_owned())?;
+                    let mut subscriptions = self.subscriptions.write();
+                    Self::get_indexes_from_filter(&filter)
+                        .into_iter()
+                        .for_each(|index| {
+                            subscriptions
+                                .entry(index)
+                                .or_insert_with(|| RwLock::new(HashMap::new()))
+                                .write()
+                                .insert(
+                                    (conn_id, sub_id),
+                                    (request.subscription_id.clone(), receiver.clone()),
+                                );
+                        });
+
+                    // Sent all events that match the filter that are stored in our database
+                    self.storage
+                        .get_by_filter(filter)?
+                        .into_iter()
+                        .for_each(|event| {
+                            let _ = connection.send(
+                                relayer::Event {
+                                    subscription_id: request.subscription_id.clone(),
+                                    event,
+                                }
+                                .into(),
+                            );
+                        });
+                }
+            }
+            Request::Close(_close) => {}
+        };
+        Ok(())
+    }
+
+    #[inline]
+    fn get_indexes_from_filter(filter: &Filter) -> Vec<SubscriptionType> {
+        let public_keys = if filter.references_to_public_key.is_empty() {
+            vec![None]
+        } else {
+            filter
+                .references_to_public_key
+                .iter()
+                .map(|public_key| Some((*public_key).to_vec()))
+                .collect()
+        };
+        let id = if filter.references_to_event.is_empty() {
+            vec![None]
+        } else {
+            filter
+                .references_to_event
+                .iter()
+                .map(|id| Some((*id).to_vec()))
+                .collect()
+        };
+        let kind = if filter.kinds.is_empty() {
+            vec![None]
+        } else {
+            filter
+                .kinds
+                .iter()
+                .map(|kind| Some((*kind).into()))
+                .collect()
+        };
+        let mut subs = vec![];
+
+        for public_key in public_keys.iter() {
+            for id in id.iter() {
+                for kind in kind.iter() {
+                    subs.push(SubscriptionType {
+                        public_key: public_key.clone(),
+                        id: id.clone(),
+                        kind: *kind,
+                    });
+                }
+            }
+        }
+
+        subs
+    }
+
+    #[inline]
+    fn get_possible_listeners_from_event(event: &Event) -> Vec<SubscriptionType> {
+        let kind = event.kind().into();
+        let public_keys = [None, Some(event.author().as_ref().to_vec())];
+        let id = [None, Some(event.id.as_ref().to_vec())];
+        let kind = [None, Some(kind)];
+        let mut subs = vec![];
+
+        for public_key in public_keys.iter() {
+            for id in id.iter() {
+                for kind in kind.iter() {
+                    subs.push(SubscriptionType {
+                        public_key: public_key.clone(),
+                        id: id.clone(),
+                        kind: *kind,
+                    });
+                }
+            }
+        }
+
+        subs
+    }
+
+    #[inline]
+    fn broadcast_to_subscribers(subscriptions: RwLockReadGuard<Subscriptions>, event: &Event) {
+        for (_, receiver) in subscriptions.iter() {
+            let _ = receiver.1.try_send(
+                relayer::Event {
+                    subscription_id: receiver.0.clone(),
+                    event: event.clone(),
+                }
+                .into(),
+            );
+        }
+    }
+
+    #[inline]
+    pub fn store_and_broadcast(&self, event: &Event) {
+        let _ = self.storage.store(event);
+        let subscriptions = self.subscriptions.read();
+
+        for subscription_type in Self::get_possible_listeners_from_event(event) {
+            if let Some(subscribers) = subscriptions.get(&subscription_type) {
+                Self::broadcast_to_subscribers(subscribers.read(), event);
+            }
+        }
+    }
+}