Browse Source

Working on personal relayer

Cesar Rodas 3 months ago
parent
commit
f8104fa512

File diff suppressed because it is too large
+ 174 - 278
Cargo.lock


+ 1 - 0
crates/client/src/client.rs

@@ -192,6 +192,7 @@ impl Client {
                                     continue;
                                 }
                             }
+                            println!("{}: Sending {:?}", url, msg);
                             if let Ok(json) = serde_json::to_string(&msg) {
                                 log::info!("{}: Sending {}", url, json);
                                 if let Err(x) = socket.send(Message::Text(json)).await {

+ 3 - 1
crates/client/src/pool/mod.rs

@@ -115,7 +115,9 @@ impl Pool {
         &self,
         subscription: subscribe::Subscribe,
     ) -> subscription::ActiveSubscription {
-        self.subscription_manager.subcribe(subscription, None).await
+        self.subscription_manager
+            .subscribe(subscription, None)
+            .await
     }
 
     /// Sends a request to all the connected relayers

+ 6 - 7
crates/client/src/pool/subscription.rs

@@ -94,9 +94,8 @@ pub(crate) struct Scheduler {
 }
 
 /// Maximum number of subscriptions
-pub const MAX_SUBSCRIPTIONS: usize = 50;
+pub const MAX_SUBSCRIPTIONS: usize = 5;
 
-#[allow(warnings)]
 impl Scheduler {
     /// Creates a new instance
     pub fn new(all_clients: AllClients) -> Self {
@@ -176,7 +175,7 @@ impl Scheduler {
             let items = subscription_queue.len();
 
             // A subscription must be descheduled as its place is needed.
-            let mut deschedule =
+            let deschedule =
                 |subscriptions: &mut RwLockWriteGuard<
                     '_,
                     BTreeMap<PoolSubscriptionId, SubscriptionInner>,
@@ -184,7 +183,7 @@ impl Scheduler {
                  subscription_queue: &mut RwLockWriteGuard<'_, VecDeque<PoolSubscriptionId>>|
                  -> bool {
                     for subscription_id in subscription_queue.iter() {
-                        let mut subscription =
+                        let subscription =
                             if let Some(subscription) = subscriptions.get_mut(subscription_id) {
                                 subscription
                             } else {
@@ -208,7 +207,7 @@ impl Scheduler {
                     false
                 };
 
-            for _ in (0..items) {
+            for _ in 0..items {
                 let subscription_id = if let Some(subscription_id) = subscription_queue.pop_front()
                 {
                     subscription_id
@@ -266,7 +265,7 @@ impl Scheduler {
     }
 
     /// Creates a new subscription with a given filters
-    pub async fn subcribe(
+    pub async fn subscribe(
         self: &Arc<Self>,
         subscription_request: subscribe::Subscribe,
         specific_url: Option<Url>,
@@ -303,7 +302,7 @@ impl Scheduler {
         let this = self;
         tokio::spawn(async move {
             let mut subscriptions = this.subscriptions.write().await;
-            if let Some(id) = subscriptions.remove(&subscription_id) {
+            if subscriptions.remove(&subscription_id).is_some() {
                 this.active_subscription_scheduler();
                 this.total_subscriptions.fetch_sub(1, Ordering::Relaxed);
             }

+ 3 - 1
crates/personal-relayer/Cargo.toml

@@ -5,7 +5,6 @@ edition = "2021"
 
 [dependencies]
 nostr-rs-types = { path = "../types" }
-nostr-rs-memory = { path = "../storage/memory" }
 nostr-rs-storage-base = { path = "../storage/base" }
 nostr-rs-client = { path = "../client" }
 nostr-rs-relayer = { path = "../relayer" }
@@ -13,5 +12,8 @@ thiserror = "1.0.39"
 url = { version = "2.5.2", features = ["serde"] }
 futures = "0.3.30"
 tokio = { version = "1.39.2", features = ["full"] }
+nostr-rs-memory = { path = "../storage/memory" }
+serde = "1.0.208"
+serde_json = "1.0.125"
 log = "0.4.22"
 env_logger = "0.11.5"

+ 164 - 26
crates/personal-relayer/src/lib.rs

@@ -1,11 +1,25 @@
+use std::{
+    collections::{HashMap, HashSet},
+    ops::Deref,
+};
+
 use futures::future::join_all;
 use nostr_rs_client::Pool;
 use nostr_rs_relayer::Relayer;
 use nostr_rs_storage_base::Storage;
-use nostr_rs_types::types::{Addr, Filter, Id};
+use nostr_rs_types::{
+    types::{content::profile::Profile, Content, Event, Filter, Id, Kind, Tag},
+    Request,
+};
+use serde::Serialize;
 use tokio::{net::TcpListener, task::JoinHandle};
 use url::Url;
 
+//
+// g
+use tokio::fs::OpenOptions;
+use tokio::io::AsyncWriteExt;
+
 pub struct Stoppable(Option<Vec<JoinHandle<()>>>);
 
 impl From<Vec<JoinHandle<()>>> for Stoppable {
@@ -24,6 +38,14 @@ impl Drop for Stoppable {
     }
 }
 
+impl Stoppable {
+    pub async fn wait(mut self) {
+        if let Some(tasks) = self.0.take() {
+            join_all(tasks).await;
+        }
+    }
+}
+
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
     #[error("Relayer: {0}")]
@@ -33,42 +55,158 @@ pub enum Error {
     Client(#[from] nostr_rs_client::Error),
 }
 
+#[derive(Debug, Clone, Serialize)]
+pub struct Contact {
+    pub pub_key: Id,
+    pub added_by: Option<Id>,
+    pub profile: Option<Profile>,
+    pub followed_by: HashSet<Id>,
+    pub following: HashSet<Id>,
+    pub content: Vec<Event>,
+}
+
+impl Contact {
+    pub fn new(pub_key: Id, added_by: Option<Id>) -> Self {
+        Self {
+            pub_key,
+            profile: None,
+            added_by,
+            followed_by: HashSet::new(),
+            following: HashSet::new(),
+            content: Vec::new(),
+        }
+    }
+}
+
 pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
     relayer: Relayer<T>,
-    accounts: Vec<Id>,
+    accounts: HashMap<Id, Contact>,
 }
 
 impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
     pub async fn new(storage: T, accounts: Vec<Id>, client_urls: Vec<Url>) -> Result<Self, Error> {
-        let pool = Pool::new_with_clients(client_urls);
-
-        join_all(
-            accounts
-                .iter()
-                .map(|account| {
-                    pool.subscribe(
-                        Filter {
-                            authors: vec![account.clone()],
-                            ..Default::default()
-                        }
-                        .into(),
-                    )
-                })
-                .collect::<Vec<_>>(),
-        )
-        .await
-        .into_iter()
-        .collect::<Result<Vec<_>, _>>()?;
+        let (pool, _active_clients) = Pool::new_with_clients(client_urls)?;
+        let relayer = Relayer::new(Some(storage), Some(pool))?;
 
         Ok(Self {
-            relayer: Relayer::new(Some(storage), Some(pool))?,
-            accounts,
+            relayer,
+            accounts: accounts
+                .into_iter()
+                .map(|a| (a.clone(), Contact::new(a, None)))
+                .collect::<HashMap<_, _>>(),
         })
     }
 
-    pub fn main(self, server: TcpListener) -> Result<Stoppable, Error> {
-        let (relayer, handle) = self.relayer.main(server)?;
-        let tasks = vec![handle, tokio::spawn(async move {})];
+    pub async fn main(mut self, server: TcpListener) -> Result<Stoppable, Error> {
+        let (relayer, relayer_handler) = self.relayer.main(server)?;
+
+        let tasks = vec![
+            relayer_handler,
+            tokio::spawn(async move {
+                let mut local_connection = relayer.create_new_local_connection().await;
+                local_connection
+                    .send(Request::Request(
+                        vec![Filter {
+                            authors: self.accounts.keys().map(|x| x.clone()).collect::<Vec<_>>(),
+                            //kinds: vec![Kind::Metadata, Kind::ShortTextNote, Kind::Contacts],
+                            ..Default::default()
+                        }]
+                        .into(),
+                    ))
+                    .await
+                    .expect("Failed to send request");
+
+                loop {
+                    while let Some(res) = local_connection.recv().await {
+                        if let Some(event) = res.as_event() {
+                            match event.content() {
+                                Content::Metadata(profile) => {
+                                    self.accounts.get_mut(&event.author()).map(|account| {
+                                        account.profile = Some(profile.clone());
+                                        account.content.push(event.deref().clone());
+                                    });
+                                }
+                                Content::Contacts(_) => {
+                                    let current = event.author().clone();
+                                    let mut current_user = if let Some(current_user) =
+                                        self.accounts.remove(&current)
+                                    {
+                                        current_user
+                                    } else {
+                                        continue;
+                                    };
+
+                                    let mut ids = vec![];
+
+                                    for tag in event.tags() {
+                                        match tag {
+                                            Tag::PubKey(pub_key, relayer_url, _) => {
+                                                let followed = self
+                                                    .accounts
+                                                    .entry(pub_key.clone())
+                                                    .or_insert(Contact::new(
+                                                        pub_key.clone(),
+                                                        Some(current.clone()),
+                                                    ));
+
+                                                if let Some((Some(relayer_url), _)) = relayer_url {
+                                                    let _ = relayer
+                                                        .connect_to_relayer(relayer_url.clone())
+                                                        .await;
+                                                }
+
+                                                current_user.following.insert(pub_key.clone());
+                                                followed.followed_by.insert(current.clone());
+                                                ids.push(pub_key.clone());
+                                            }
+                                            _ => {}
+                                        }
+                                    }
+
+                                    self.accounts.insert(current, current_user);
+
+                                    let _ = local_connection
+                                        .send(Request::Request(
+                                            Filter {
+                                                authors: ids,
+                                                kinds: vec![
+                                                    Kind::Metadata,
+                                                    Kind::ShortTextNote,
+                                                    Kind::Contacts,
+                                                ],
+                                                ..Default::default()
+                                            }
+                                            .into(),
+                                        ))
+                                        .await;
+                                }
+                                Content::ShortTextNote(_) => {
+                                    self.accounts.get_mut(&event.author()).map(|account| {
+                                        account.content.push(event.deref().clone());
+                                    });
+                                }
+                                _ => {}
+                            }
+                        } else {
+                            println!("Not an event: {:?}", res);
+                        }
+
+                        let mut file = OpenOptions::new()
+                            .write(true)
+                            .create(true)
+                            .open("example.txt")
+                            .await
+                            .expect("Failed to open file");
+
+                        file.write_all(&serde_json::to_vec_pretty(&self.accounts).unwrap())
+                            .await
+                            .expect("Failed to write to file");
+
+                        file.flush().await.expect("Failed to flush file");
+                    }
+                }
+            }),
+        ];
         Ok(tasks.into())
     }
 }

+ 4 - 1
crates/relayer/src/relayer.rs

@@ -98,7 +98,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     /// Connects to the relayer pool
     pub async fn connect_to_relayer(&self, url: Url) -> Result<(), Error> {
         let (client_pool, _) = self.client_pool.as_ref().ok_or(Error::NoClient)?;
-        let _ = client_pool.connect_to(url).await;
+        let _ = client_pool.connect_to(url).await?;
         Ok(())
     }
 
@@ -173,6 +173,9 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
 
         let handle = tokio::spawn(async move {
             loop {
+                if receiver.len() > 500 {
+                    println!("{}", receiver.len());
+                }
                 if let Some((response, _)) = receiver.recv().await {
                     match response {
                         Response::Event(event) => {

+ 1 - 1
crates/storage/rocksdb/Cargo.toml

@@ -9,7 +9,7 @@ edition = "2021"
 nostr-rs-storage-base = { path = "../base" }
 nostr-rs-subscription-manager = { path = "../../subscription-manager" }
 nostr-rs-types = { path = "../../types" }
-rocksdb = { version = "0.20.1", features = [
+rocksdb = { version = "0.22.0", features = [
     "multi-threaded-cf",
     "serde",
     "snappy",

+ 2 - 1
crates/subscription-manager/src/lib.rs

@@ -76,7 +76,8 @@ where
     I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
     T: Sync + Send + 'static,
 {
-    id: I,
+    /// Subscription ID
+    pub id: I,
     manager: Option<Arc<SubscriptionManager<I, T>>>,
 }
 

Some files were not shown because too many files changed in this diff