Browse Source

Keep track of local events.

Local events are special, because they need to be broadcasted often, so
these events can be broadcasted and be available to people who cannot
connect to this relayer.
Cesar Rodas 1 năm trước cách đây
mục cha
commit
eff75d3bbd

+ 1 - 0
Cargo.lock

@@ -835,6 +835,7 @@ name = "nostr-rs-relayer"
 version = "0.1.0"
 dependencies = [
  "futures-util",
+ "log",
  "nostr-rs-storage",
  "nostr-rs-types",
  "parking_lot",

+ 7 - 6
crates/client/src/relayer.rs

@@ -95,12 +95,13 @@ impl Relayer {
             while reconnect && connection_attempts <= max_connections_attempts {
                 log::warn!("{}: Connect attempt {}", url, connection_attempts);
                 connection_attempts += 1;
-                let mut socket = if let Ok(x) = connect_async(url_parsed.clone()).await {
-                    x.0
-                } else {
-                    log::warn!("{}: Failed to connect", url);
-                    sleep(Duration::from_secs(5)).await;
-                    continue;
+                let mut socket = match connect_async(url_parsed.clone()).await {
+                    Ok(x) => x.0,
+                    Err(err) => {
+                        log::warn!("{}: Failed to connect: {}", url, err);
+                        sleep(Duration::from_secs(5)).await;
+                        continue;
+                    }
                 };
 
                 log::info!("Connected to {}", url);

+ 3 - 3
crates/client/src/relayers.rs

@@ -58,12 +58,12 @@ impl Relayers {
 
     /// Sends a request to all the connected relayers
     pub async fn send(&self, request: Request) {
-        // Add a copy of the request for incoming connections.
-        self.messages.write().push(request.clone());
-
         for (_, sender) in self.clients.iter() {
             let _ = sender.send(request.clone()).await;
         }
+
+        // Add a copy of the request for incoming connections.
+        self.messages.write().push(request);
     }
 
     /// Creates a connection to a new relayer. If the connection is successful a

+ 1 - 0
crates/relayer/Cargo.toml

@@ -15,3 +15,4 @@ tokio-tungstenite = { version = "0.18.0", features = ["rustls", "rustls-native-c
 thiserror = "1.0.39"
 serde_json = "1.0.94"
 rand = "0.8.5"
+log = "0.4.17"

+ 9 - 2
crates/relayer/src/connection.rs

@@ -45,8 +45,15 @@ impl Connection {
             loop {
                 tokio::select! {
                     Some(msg) = receiver.recv() => {
-                        let msg = serde_json::to_string(&msg).unwrap();
-                        writer.send(Message::Text(msg.into())).await.unwrap();
+                        let msg = if let Ok(msg) = serde_json::to_string(&msg) {
+                            msg
+                        } else {
+                            continue;
+                        };
+                        if let Err(err) =  writer.send(Message::Text(msg.into())).await {
+                            log::error!("Error sending message to client: {}", err);
+                            break;
+                        }
                     }
                     Some(msg) = reader.next() => {
                         if let Ok(Message::Text(msg)) = msg {

+ 18 - 1
crates/relayer/src/relayer.rs

@@ -57,6 +57,11 @@ impl Relayer {
         )
     }
 
+    /// Returns a reference to the internal database
+    pub fn get_db(&self) -> &RocksDb {
+        &self.storage
+    }
+
     pub async fn add_connection(&self, stream: TcpStream) -> Result<(), Error> {
         let client = Connection::new(self.sender.clone(), stream).await?;
         let mut clients = self.clients.write();
@@ -82,7 +87,7 @@ impl Relayer {
 
         match &request {
             Request::Event(event) => {
-                self.store_and_broadcast(event.deref());
+                self.store_and_broadcast_local_event(event.deref());
             }
             Request::Request(request) => {
                 for filter in request.filters.clone().into_iter() {
@@ -238,6 +243,18 @@ impl Relayer {
     }
 
     #[inline]
+    pub fn store_and_broadcast_local_event(&self, event: &Event) {
+        let _ = self.storage.store_local_event(event);
+        let subscriptions = self.subscriptions.read();
+
+        for subscription_type in Self::get_possible_listeners_from_event(event) {
+            if let Some(subscribers) = subscriptions.get(&subscription_type) {
+                Self::broadcast_to_subscribers(subscribers.read(), event);
+            }
+        }
+    }
+
+    #[inline]
     pub fn store_and_broadcast(&self, event: &Event) {
         let _ = self.storage.store(event);
         let subscriptions = self.subscriptions.read();

+ 46 - 1
crates/storage/src/rocksdb.rs

@@ -2,7 +2,10 @@
 use crate::Error;
 use nostr_rs_types::types::{Event, Filter, Tag};
 use rand::Rng;
-use rocksdb::{BoundColumnFamily, ColumnFamilyDescriptor, Options, SliceTransform, WriteBatch, DB};
+use rocksdb::{
+    BoundColumnFamily, ColumnFamilyDescriptor, IteratorMode, Options, SliceTransform, WriteBatch,
+    DB,
+};
 use std::{collections::HashSet, path::Path, sync::Arc};
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
@@ -12,6 +15,7 @@ enum ReferenceType {
     RefPublicKey,
     RefEvent,
     Kind,
+    LocalEvents,
 }
 
 impl ReferenceType {
@@ -23,6 +27,7 @@ impl ReferenceType {
             Self::RefPublicKey => "refs_by_public_key",
             Self::RefEvent => "refs_by_ids",
             Self::Kind => "kinds",
+            Self::LocalEvents => "local",
         }
     }
 }
@@ -58,6 +63,7 @@ impl RocksDb {
                 ColumnFamilyDescriptor::new(ReferenceType::RefEvent.as_str(), options.clone()),
                 ColumnFamilyDescriptor::new(ReferenceType::RefPublicKey.as_str(), options.clone()),
                 ColumnFamilyDescriptor::new(ReferenceType::Kind.as_str(), options.clone()),
+                ColumnFamilyDescriptor::new(ReferenceType::LocalEvents.as_str(), options.clone()),
             ],
         )?;
         Ok(Self { db })
@@ -110,6 +116,45 @@ impl RocksDb {
             .ok_or(Error::InvalidColumnFamily)
     }
 
+    /// Stores an event, similar to store(Event), but keeps track of this event in a
+    /// local index. This is useful to keep track of the events that are created by
+    /// this node, to be broadcasted to new nodes in the future
+    pub fn store_local_event(&self, event: &Event) -> Result<bool, Error> {
+        let ret = self.store(event)?;
+
+        self.db.put_cf(
+            &self.reference_to_cf_handle(ReferenceType::LocalEvents)?,
+            *(event.id),
+            &[],
+        )?;
+
+        Ok(ret)
+    }
+
+    /// Return a vector of all local events
+    pub fn get_local_events(&self) -> Result<Vec<Event>, Error> {
+        let cf_handle = self.reference_to_cf_handle(ReferenceType::LocalEvents)?;
+        Ok(self
+            .db
+            .iterator_cf(&cf_handle, IteratorMode::Start)
+            .into_iter()
+            .map(|res| {
+                if let Ok((key, _)) = res {
+                    if let Ok(Some(event)) = self.get_event(&key) {
+                        Some(event)
+                    } else {
+                        None
+                    }
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<Option<Event>>>()
+            .into_iter()
+            .filter_map(|x| x)
+            .collect())
+    }
+
     /// Stores a new event into the database. This action will also creates all
     /// the needed indexes to find this event later by reference, author, kind or tag.
     pub fn store(&self, event: &Event) -> Result<bool, Error> {

+ 6 - 0
crates/types/src/client/event.rs

@@ -15,6 +15,12 @@ impl Deref for Event {
     }
 }
 
+impl From<types::Event> for Event {
+    fn from(event: types::Event) -> Self {
+        Self(event)
+    }
+}
+
 impl SerializeDeserialize for Event {
     fn get_tag() -> &'static str {
         "EVENT"

+ 6 - 0
crates/types/src/request.rs

@@ -52,6 +52,12 @@ impl Request {
     }
 }
 
+impl From<types::Event> for Request {
+    fn from(event: types::Event) -> Self {
+        Self::Event(event.into())
+    }
+}
+
 impl ser::Serialize for Request {
     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     where

+ 1 - 1
crates/types/src/types/event.rs

@@ -225,7 +225,7 @@ mod test {
         let json = "{\"content\":\"{\\\"lud06\\\":\\\"lnbc1p3a4wxvpp5x0pa6gr55fq5s9d3dxs0vz77mqxgdw63hhtgtlfz5zvm65847vnqdqqcqpjsp5402c8rtqxd4j97rnvuejuwl4sg473g6wg08d67fvn7qc4gtpkfks9q7sqqqqqqqqqqqqqqqqqqqsqqqqqysgqmqz9gxqyjw5qrzjqwryaup9lh50kkranzgcdnn2fgvx390wgj5jd07rwr3vxeje0glclleasn65surjcsqqqqlgqqqqqeqqjqyxj968tem9ps6ttm9ukv6ag4yc6qmgj2svrccfgp4n83fpktr3dsx6fq7grfzlqt982aaemahg9q29vzl9f627kh4j8h8xc2z2mtpdqqjlekah\\\",\\\"website\\\":\\\"\\\",\\\"nip05\\\":\\\"cesar@cesar.com.py\\\",\\\"picture\\\":\\\"https://pbs.twimg.com/profile_images/1175432935337537536/_Peu9vuJ_400x400.jpg\\\",\\\"display_name\\\":\\\"C\\\",\\\"about\\\":\\\"Rust and PHP\\\",\\\"name\\\":\\\"c\\\"}\",\"created_at\":1678476588,\"id\":\"3800c787a23288641c0b96cbcc87c26cbd3ea7bee53b7748422fdb100fb7b9f0\",\"kind\":0,\"pubkey\":\"b2815682cfc83fcd2c3add05785cf4573dd388457069974cc6d8cca06b3c3b78\",\"sig\":\"c8a12ce96833e4cd67bce0e9e50f831262ef0f0c0cff5e56c38a0c90867ed1a6621e9692948ef5e85a7ca3726c3f0f43fa7e1992536bc457317123bca8784f5f\",\"tags\":[]}";
 
         let obj: Event = serde_json::from_str(json).expect("valid");
-        obj.is_valid().unwrap();
+        obj.is_valid().expect("is valid");
         assert!(obj.is_valid().is_ok());
     }
 }

+ 50 - 18
src/main.rs

@@ -1,49 +1,81 @@
-use std::pin::Pin;
-
 use futures::Future;
-use nostr_rs_client::Relayers;
-use nostr_rs_relayer::Relayer;
 use nostr_rs_storage::RocksDb;
 use nostr_rs_types::{Request, Response};
-use tokio::{net::TcpListener, sync::mpsc};
+use std::{pin::Pin, sync::Arc};
+use tokio::{
+    net::TcpListener,
+    sync::mpsc,
+    time::{sleep, Duration},
+};
 
 fn on_connection(
-    sent_messages: Vec<Request>,
-    socket: mpsc::Sender<Request>,
+    _sent_messages: Vec<Request>,
+    _socket: mpsc::Sender<Request>,
 ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
-    println!("Reconnecting with {:?}", sent_messages);
-    Box::pin(async move {
-        for m in sent_messages {
-            let _ = socket.send(m).await;
-        }
-    })
+    Box::pin(async move {})
 }
 
 #[tokio::main]
 async fn main() {
+    env_logger::init();
     let db = RocksDb::new("./relayer-db").expect("db");
-    let (relayer, mut server_receiver) = Relayer::new(db);
-    let (mut clients, mut client_receiver) = Relayers::new();
+    let (relayer, mut server_receiver) = nostr_rs_relayer::Relayer::new(db);
+    let (mut clients, mut client_receiver) = nostr_rs_client::Relayers::new();
     let _ = clients
         .connect_to("wss://relay.damus.io/", u16::MAX, Some(on_connection))
         .await
         .expect("valid connection");
+    let _ = clients
+        .connect_to("wss://brb.io", u16::MAX, Some(on_connection))
+        .await;
+    let _ = clients
+        .connect_to("wss://nos.lol", u16::MAX, Some(on_connection))
+        .await;
+    let _ = clients
+        .connect_to("wss://relay.current.fyi", u16::MAX, Some(on_connection))
+        .await;
+    let _ = clients
+        .connect_to("wss://eden.nostr.land", u16::MAX, Some(on_connection))
+        .await;
+    let _ = clients
+        .connect_to("wss://relay.snort.social", u16::MAX, Some(on_connection))
+        .await;
 
     let addr = "127.0.0.1:3000";
     let listener = TcpListener::bind(&addr).await.unwrap();
 
     let relayer_for_recv = relayer.clone();
     let relayer_for_client = relayer.clone();
+    let relayer_for_republisher = relayer.clone();
+
+    let clients = Arc::new(clients);
+    let clients_for_republisher = clients.clone();
+
+    tokio::spawn(async move {
+        loop {
+            if let Ok(events) = relayer_for_republisher.get_db().get_local_events() {
+                println!("Rebroadcast: {:?} ", events);
+                for event in events.into_iter() {
+                    println!("Rebroadcasting: {}", serde_json::to_string(&event).unwrap());
+                    let _ = clients_for_republisher.send(event.into()).await;
+                }
+            }
+            sleep(Duration::from_secs(360)).await;
+        }
+    });
 
     tokio::spawn(async move {
         loop {
             let (event, _) = client_receiver.recv().await.unwrap();
             match &event {
                 Response::Event(event) => {
-                    println!("Sending event: {:?}", event.event);
+                    //println!("Sending event: {:?}", event.event);
                     let _ = relayer_for_client.store_and_broadcast(&event.event);
                 }
-                msg => println!("recv: {:?}", msg),
+                _msg => {
+                    //println!("Sending message: {:?}", msg);
+                    //let _ = relayer_for_client.send(msg).await;
+                }
             }
         }
     });
@@ -51,7 +83,7 @@ async fn main() {
     tokio::spawn(async move {
         loop {
             let x = relayer_for_recv.recv(&mut server_receiver).await.unwrap();
-            println!("Received: {:?}", x);
+            //println!("Received: {:?}", x);
             if let Some(x) = x {
                 clients.send(x).await;
             }