Browse Source

First version of the personal relayer

Cesar Rodas 1 year ago
parent
commit
f0fecc5655

+ 2 - 0
.gitignore

@@ -0,0 +1,2 @@
+/target
+crates/storage/tests/db/

+ 2 - 0
Cargo.lock

@@ -806,6 +806,7 @@ dependencies = [
  "instant-acme",
  "log",
  "nostr-rs-client",
+ "nostr-rs-relayer",
  "nostr-rs-storage",
  "nostr-rs-types",
  "serde_json",
@@ -837,6 +838,7 @@ dependencies = [
  "nostr-rs-storage",
  "nostr-rs-types",
  "parking_lot",
+ "rand",
  "serde_json",
  "thiserror",
  "tokio",

+ 1 - 0
Cargo.toml

@@ -15,6 +15,7 @@ members = [
 nostr-rs-types = { path = "crates/types" }
 nostr-rs-client = { path = "crates/client" }
 nostr-rs-storage = { path = "crates/storage" }
+nostr-rs-relayer = { path = "crates/relayer" }
 tokio = { version = "1.26.0", features = ["full"] }
 env_logger = "0.10.0"
 serde_json = "1.0.94"

+ 25 - 2
crates/client/src/relayer.rs

@@ -3,7 +3,13 @@ use futures::Future;
 use futures_util::{SinkExt, StreamExt};
 use nostr_rs_types::{Request, Response};
 use parking_lot::RwLock;
-use std::{pin::Pin, sync::Arc};
+use std::{
+    pin::Pin,
+    sync::{
+        atomic::{AtomicBool, Ordering::Relaxed},
+        Arc,
+    },
+};
 use tokio::{
     sync::{mpsc, oneshot},
     time::{sleep, timeout, Duration},
@@ -18,6 +24,7 @@ pub struct Relayer {
     pub url: String,
     /// Sender to the relayer. This can be used to send a Requests to this relayer
     pub send_to_socket: mpsc::Sender<Request>,
+    is_connected: Arc<AtomicBool>,
     /// This sender signals to background connection to stop
     stop_service: oneshot::Sender<()>,
 }
@@ -40,6 +47,7 @@ impl Relayer {
             + 'static,
     {
         let (send_to_socket, receiver) = mpsc::channel(100_000);
+        let is_connected = Arc::new(AtomicBool::new(false));
         let stop_service = Self::spawn_background_client(
             broadcast_to_listeners,
             sent_messages,
@@ -47,11 +55,13 @@ impl Relayer {
             receiver,
             url,
             max_connections_attempts,
+            is_connected.clone(),
             on_connection,
         )?;
 
         Ok(Self {
             url: url.to_owned(),
+            is_connected,
             send_to_socket,
             stop_service,
         })
@@ -64,6 +74,7 @@ impl Relayer {
         mut receiver: mpsc::Receiver<Request>,
         url_str: &str,
         max_connections_attempts: u16,
+        is_connected: Arc<AtomicBool>,
         on_connection: Option<F>,
     ) -> Result<oneshot::Sender<()>, Error>
     where
@@ -117,6 +128,7 @@ impl Relayer {
                             }
                         }
                         msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
+                            is_connected.store(true, Relaxed);
                             let msg = if let Ok(Some(Ok(msg))) = msg {
                                     match msg {
                                         Message::Text(text) => text,
@@ -157,6 +169,9 @@ impl Relayer {
                         }
                     }
                 }
+
+                is_connected.store(false, Relaxed);
+
                 // Throttle down to not spam the server with reconnections
                 sleep(Duration::from_millis(500)).await;
             }
@@ -167,11 +182,19 @@ impl Relayer {
         Ok(stop_service)
     }
 
-    /// Checks if the relayer background connection is running
+    /// Checks if the relayer background connection is running. It is not
+    /// guaranteed there is an active connection, it may be in the process of
+    /// reconnecting.
     pub fn is_running(&self) -> bool {
         !self.stop_service.is_closed()
     }
 
+    /// Checks if the relayer is connected. It is guaranteed that the relayer is
+    /// connected if this method returns true.
+    pub fn is_connected(&self) -> bool {
+        self.is_connected.load(Relaxed)
+    }
+
     /// Sends a requests to this relayer
     pub async fn send(&self, request: Request) -> Result<(), Error> {
         Ok(self.send_to_socket.send(request).await?)

+ 1 - 0
crates/relayer/Cargo.toml

@@ -14,3 +14,4 @@ tokio = { version = "1.26.0", features = ["sync", "macros", "rt", "time"] }
 tokio-tungstenite = { version = "0.18.0", features = ["rustls", "rustls-native-certs", "rustls-tls-native-roots"] }
 thiserror = "1.0.39"
 serde_json = "1.0.94"
+rand = "0.8.5"

+ 10 - 10
crates/relayer/src/connection.rs

@@ -1,4 +1,4 @@
-use crate::Error;
+use crate::{get_id, Error};
 use futures_util::{SinkExt, StreamExt};
 use nostr_rs_types::{Request, Response};
 use parking_lot::RwLock;
@@ -12,7 +12,7 @@ use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
 pub struct Connection {
     pub(crate) conn_id: u128,
     sender: Sender<Response>,
-    subscriptions: RwLock<HashMap<String, u128>>,
+    subscriptions: RwLock<HashMap<String, Vec<u128>>>,
 }
 
 const MAX_SUBSCRIPTIONS_BUFFER: usize = 100;
@@ -23,7 +23,7 @@ impl Connection {
         stream: TcpStream,
     ) -> Result<Self, Error> {
         let websocket = accept_async(stream).await?;
-        let conn_id = 0;
+        let conn_id = get_id();
         let (sender, receiver) = channel(MAX_SUBSCRIPTIONS_BUFFER);
         Self::spawn(broadcast_request, websocket, receiver, conn_id);
         Ok(Self {
@@ -69,14 +69,14 @@ impl Connection {
         Ok(self.sender.try_send(response)?)
     }
 
-    pub fn create_connection(&self, id: String) -> Result<(u128, u128, Sender<Response>), Error> {
+    pub fn create_subscription(&self, id: String) -> (u128, u128, Sender<Response>) {
         let mut subscriptions = self.subscriptions.write();
-        if subscriptions.contains_key(&id) {
-            return Err(Error::IdentifierAlreadyUsed(id));
+        let internal_id = get_id();
+        if let Some(subscriptions) = subscriptions.get_mut(&id) {
+            subscriptions.push(internal_id);
+        } else {
+            subscriptions.insert(id, vec![internal_id]);
         }
-
-        let internal_id = 0;
-        subscriptions.insert(id, internal_id);
-        Ok((self.conn_id, internal_id, self.sender.clone()))
+        (self.conn_id, internal_id, self.sender.clone())
     }
 }

+ 16 - 0
crates/relayer/src/lib.rs

@@ -1,5 +1,21 @@
+use rand::Rng;
+
 mod connection;
 mod error;
 mod relayer;
 
 pub use self::{connection::Connection, error::Error, relayer::Relayer};
+
+// Get current nanoseconds and use the last 3 digits as a random number (because
+// sometimes it comes as 0)
+pub(crate) fn get_id() -> u128 {
+    let mut rng = rand::thread_rng();
+    let random_number = rng.gen_range(0..999);
+
+    let ts = std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .unwrap()
+        .as_nanos();
+
+    ts.checked_add(random_number).unwrap_or(ts)
+}

+ 16 - 12
crates/relayer/src/relayer.rs

@@ -6,7 +6,7 @@ use nostr_rs_types::{
     Request, Response,
 };
 use parking_lot::{RwLock, RwLockReadGuard};
-use std::{collections::HashMap, ops::Deref};
+use std::{collections::HashMap, ops::Deref, sync::Arc};
 use tokio::{
     net::TcpStream,
     sync::mpsc::{channel, Receiver, Sender},
@@ -25,20 +25,21 @@ pub struct Relayer {
     storage: RocksDb,
     subscriptions: RwLock<HashMap<SubscriptionType, RwLock<Subscriptions>>>,
     clients: RwLock<HashMap<u128, Connection>>,
-    receiver: Receiver<(u128, Request)>,
     sender: Sender<(u128, Request)>,
 }
 
 impl Relayer {
-    pub fn new(storage: RocksDb) -> Self {
+    pub fn new(storage: RocksDb) -> (Arc<Self>, Receiver<(u128, Request)>) {
         let (sender, receiver) = channel(100_000);
-        Self {
-            storage,
-            subscriptions: RwLock::new(HashMap::new()),
-            clients: RwLock::new(HashMap::new()),
+        (
+            Arc::new(Self {
+                storage,
+                subscriptions: RwLock::new(HashMap::new()),
+                clients: RwLock::new(HashMap::new()),
+                sender,
+            }),
             receiver,
-            sender,
-        }
+        )
     }
 
     pub async fn add_connection(&self, stream: TcpStream) -> Result<(), Error> {
@@ -49,8 +50,11 @@ impl Relayer {
         Ok(())
     }
 
-    pub async fn recv(&mut self) -> Result<Option<Request>, Error> {
-        let (conn_id, request) = if let Some(request) = self.receiver.recv().await {
+    pub async fn recv(
+        &self,
+        receiver: &mut Receiver<(u128, Request)>,
+    ) -> Result<Option<Request>, Error> {
+        let (conn_id, request) = if let Some(request) = receiver.recv().await {
             request
         } else {
             return Ok(None);
@@ -69,7 +73,7 @@ impl Relayer {
                 for filter in request.filters.clone().into_iter() {
                     // Create subscription
                     let (conn_id, sub_id, receiver) =
-                        connection.create_connection(request.subscription_id.deref().to_owned())?;
+                        connection.create_subscription(request.subscription_id.deref().to_owned());
                     let mut subscriptions = self.subscriptions.write();
                     Self::get_indexes_from_filter(&filter)
                         .into_iter()

+ 19 - 21
src/bin/dump.rs

@@ -36,43 +36,41 @@ async fn main() {
         .connect_to("wss://relay.damus.io/", u16::MAX, Some(on_connection))
         .await;
     let _ = clients
-        .connect_to(
-            "wss://nostramsterdam.vpx.moe/",
-            u16::MAX,
-            Some(on_connection),
-        )
+        .connect_to("wss://brb.io", u16::MAX, Some(on_connection))
+        .await;
+    let _ = clients
+        .connect_to("wss://nos.lol", u16::MAX, Some(on_connection))
+        .await;
+    let _ = clients
+        .connect_to("wss://relay.current.fyi", u16::MAX, Some(on_connection))
+        .await;
+    let _ = clients
+        .connect_to("wss://eden.nostr.land", u16::MAX, Some(on_connection))
+        .await;
+    let _ = clients
+        .connect_to("wss://relay.snort.social", u16::MAX, Some(on_connection))
         .await;
     let db = RocksDb::new("./db").expect("db");
 
-    let mut debug = false;
-    let mut done: u128 = 0;
-
     tokio::spawn(async move {
         loop {
             clients.get_connections().iter().for_each(|relayer| {
-                if relayer.is_running() {
-                    println!("Connected to {}", relayer.url);
+                if relayer.is_connected() {
+                    log::warn!("Connected to {}", relayer.url);
                 }
             });
-            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+            tokio::time::sleep(std::time::Duration::from_secs(10)).await;
         }
     });
 
     loop {
-        if let Some((msg, relayed_by)) = receiver.recv().await {
+        if let Some((msg, _relayed_by)) = receiver.recv().await {
             match msg {
                 Response::Event(x) => {
                     let event = x.event;
-                    done += 1;
-
-                    let x = db.store(&event).expect("valid");
-                    if debug && x {
-                        println!("Realtime {} {} {:?} ", relayed_by, done, event);
-                    }
-                }
-                Response::EndOfStoredEvents(_) => {
-                    debug = true;
+                    db.store(&event).expect("valid");
                 }
+                Response::EndOfStoredEvents(_) => {}
                 msg => {
                     panic!("{:?}", msg);
                 }

+ 67 - 5
src/main.rs

@@ -1,7 +1,69 @@
-use instant_acme::{
-    Account, AuthorizationStatus, ChallengeType, Identifier, KeyAuthorization, LetsEncrypt,
-    NewAccount, NewOrder, OrderStatus,
-};
+use std::pin::Pin;
+
+use futures::Future;
+use nostr_rs_client::Relayers;
+use nostr_rs_relayer::Relayer;
+use nostr_rs_storage::RocksDb;
+use nostr_rs_types::{Request, Response};
+use tokio::{net::TcpListener, sync::mpsc};
+
+fn on_connection(
+    sent_messages: Vec<Request>,
+    socket: mpsc::Sender<Request>,
+) -> Pin<Box<dyn Future<Output = ()> + Send>> {
+    println!("Reconnecting with {:?}", sent_messages);
+    Box::pin(async move {
+        for m in sent_messages {
+            let _ = socket.send(m).await;
+        }
+    })
+}
 
 #[tokio::main]
-async fn main() {}
+async fn main() {
+    let db = RocksDb::new("./relayer-db").expect("db");
+    let (relayer, mut server_receiver) = Relayer::new(db);
+    let (mut clients, mut client_receiver) = Relayers::new();
+    let _ = clients
+        .connect_to("wss://relay.damus.io/", u16::MAX, Some(on_connection))
+        .await
+        .expect("valid connection");
+
+    let addr = "127.0.0.1:3000";
+    let listener = TcpListener::bind(&addr).await.unwrap();
+
+    let relayer_for_recv = relayer.clone();
+    let relayer_for_client = relayer.clone();
+
+    tokio::spawn(async move {
+        loop {
+            let (event, _) = client_receiver.recv().await.unwrap();
+            match &event {
+                Response::Event(event) => {
+                    println!("Sending event: {:?}", event.event);
+                    let _ = relayer_for_client.store_and_broadcast(&event.event);
+                }
+                msg => println!("recv: {:?}", msg),
+            }
+        }
+    });
+
+    tokio::spawn(async move {
+        loop {
+            let x = relayer_for_recv.recv(&mut server_receiver).await.unwrap();
+            println!("Received: {:?}", x);
+            if let Some(x) = x {
+                clients.send(x).await;
+            }
+        }
+    });
+
+    loop {
+        let (stream, _) = listener.accept().await.unwrap();
+        let addr = stream.peer_addr().unwrap();
+
+        println!("Client {} connected", addr);
+
+        let _ = relayer.add_connection(stream).await;
+    }
+}