Răsfoiți Sursa

Working on personal relayer

Cesar Rodas 3 luni în urmă
părinte
comite
55dc3d880a

Fișier diff suprimat deoarece este prea mare
+ 193 - 267
Cargo.lock


+ 9 - 1
Cargo.toml

@@ -9,7 +9,12 @@ members = [
     "crates/client",
     "crates/relayer",
     "crates/storage/base",
-    "crates/storage/rocksdb", "crates/dump", "crates/storage/memory", "crates/personal-relayer", "crates/subscription-manager", "crates/storage/sqlite",
+    "crates/storage/rocksdb",
+    "crates/dump",
+    "crates/storage/memory",
+    "crates/personal-relayer",
+    "crates/subscription-manager",
+    "crates/storage/sqlite",
 ]
 
 [dependencies]
@@ -29,3 +34,6 @@ serde = { version = "1.0.183", features = ["derive"] }
 toml = "0.7.6"
 serde_json = "1.0.105"
 url = { version = "2.5.2", features = ["serde"] }
+sqlformat = "=0.2.6"
+
+

+ 4 - 1
crates/dump/src/main.rs

@@ -22,7 +22,10 @@ async fn main() {
         Url::parse("wss://relay.snort.social").expect("valid url"),
     ]);
 
-    let _ = clients.subscribe(Subscribe::default().into()).await;
+    let _ = clients
+        .subscribe(Subscribe::default().into())
+        .await
+        .expect("v");
 
     loop {
         if let Some((msg, relayed_by)) = clients.recv().await {

+ 3 - 1
crates/personal-relayer/Cargo.toml

@@ -5,13 +5,15 @@ edition = "2021"
 
 [dependencies]
 nostr-rs-types = { path = "../types" }
-nostr-rs-memory = { path = "../storage/memory" }
 nostr-rs-storage-base = { path = "../storage/base" }
+nostr-rs-sqlite = { path = "../storage/sqlite" }
 nostr-rs-client = { path = "../client" }
 nostr-rs-relayer = { path = "../relayer" }
 thiserror = "1.0.39"
 url = { version = "2.5.2", features = ["serde"] }
 futures = "0.3.30"
 tokio = { version = "1.39.2", features = ["full"] }
+serde = "1.0.208"
+serde_json = "1.0.125"
 log = "0.4.22"
 env_logger = "0.11.5"

+ 85 - 26
crates/personal-relayer/src/lib.rs

@@ -1,11 +1,18 @@
+use account::GraphManager;
 use futures::future::join_all;
 use nostr_rs_client::Pool;
 use nostr_rs_relayer::Relayer;
 use nostr_rs_storage_base::Storage;
-use nostr_rs_types::types::{Addr, Filter, Id};
+use nostr_rs_types::{
+    types::{tag::TagType, Content, Id},
+    Response,
+};
+use std::{collections::HashSet, ops::Deref};
 use tokio::{net::TcpListener, task::JoinHandle};
 use url::Url;
 
+mod account;
+
 pub struct Stoppable(Option<Vec<JoinHandle<()>>>);
 
 impl From<Vec<JoinHandle<()>>> for Stoppable {
@@ -24,6 +31,14 @@ impl Drop for Stoppable {
     }
 }
 
+impl Stoppable {
+    pub async fn wait(mut self) {
+        if let Some(tasks) = self.0.take() {
+            join_all(tasks).await;
+        }
+    }
+}
+
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
     #[error("Relayer: {0}")]
@@ -35,40 +50,84 @@ pub enum Error {
 
 pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
     relayer: Relayer<T>,
-    accounts: Vec<Id>,
+    accounts: GraphManager,
 }
 
 impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
     pub async fn new(storage: T, accounts: Vec<Id>, client_urls: Vec<Url>) -> Result<Self, Error> {
-        let (pool, _) = Pool::new_with_clients(client_urls)?;
-
-        join_all(
-            accounts
-                .iter()
-                .map(|account| {
-                    pool.subscribe(
-                        Filter {
-                            authors: vec![account.clone()],
-                            ..Default::default()
-                        }
-                        .into(),
-                    )
-                })
-                .collect::<Vec<_>>(),
-        )
-        .await
-        .into_iter()
-        .collect::<Result<Vec<_>, _>>()?;
+        let (pool, _active_clients) = Pool::new_with_clients(client_urls)?;
+        let relayer = Relayer::new(Some(storage), Some(pool))?;
 
         Ok(Self {
-            relayer: Relayer::new(Some(storage), Some(pool))?,
-            accounts,
+            relayer,
+            accounts: GraphManager::new(1, accounts),
         })
     }
 
-    pub fn main(self, server: TcpListener) -> Result<Stoppable, Error> {
-        let (relayer, handle) = self.relayer.main(server)?;
-        let tasks = vec![handle, tokio::spawn(async move {})];
+    pub async fn main(mut self, server: TcpListener) -> Result<Stoppable, Error> {
+        let (relayer, relayer_handler) = self.relayer.main(server)?;
+        let tasks = vec![
+            relayer_handler,
+            tokio::spawn(async move {
+                let mut local_connection = relayer.create_new_local_connection().await;
+
+                let _ = self.accounts.create_subscriptions(&local_connection).await;
+
+                let mut already_subscribed = HashSet::new();
+
+                loop {
+                    while let Some(res) = local_connection.recv().await {
+                        if let Response::Event(event) = res {
+                            let current_user: Id =
+                                if let Ok(id) = event.subscription_id.deref().try_into() {
+                                    id
+                                } else {
+                                    continue;
+                                };
+
+                            match event.content() {
+                                Content::Metadata(_profile) => {}
+                                Content::Contacts(_) => {
+                                    let mut ids = vec![];
+
+                                    for tag in event.tags() {
+                                        if let TagType::PubKey(pub_key, relayer_url, _) =
+                                            tag.deref()
+                                        {
+                                            if let Some(_relayer_url) = relayer_url {
+                                                //let _ = relayer
+                                                //    .connect_to_relayer(relayer_url.clone())
+                                                //    .await;
+                                            }
+
+                                            if !already_subscribed.contains(pub_key) {
+                                                ids.push(pub_key.clone());
+                                                already_subscribed.insert(pub_key.clone());
+                                            }
+                                        }
+                                    }
+
+                                    if ids.is_empty() {
+                                        continue;
+                                    }
+
+                                    log::info!("found {} authors", ids.len());
+
+                                    for pub_key in ids {
+                                        let _ = self.accounts.follows_to(&current_user, pub_key);
+                                    }
+
+                                    let _ =
+                                        self.accounts.create_subscriptions(&local_connection).await;
+                                }
+                                Content::ShortTextNote(_) => {}
+                                _ => {}
+                            }
+                        }
+                    }
+                }
+            }),
+        ];
         Ok(tasks.into())
     }
 }

+ 6 - 1
crates/relayer/src/connection/local.rs

@@ -44,7 +44,7 @@ where
     /// Queues sending a message to the relayer in the future time
     pub fn future_send(&self, request: Request, in_the_future: Duration) -> JoinHandle<()> {
         let sender = self.sender.clone();
-        let conn_id = self.conn_id.clone();
+        let conn_id = self.conn_id;
         tokio::spawn(async move {
             sleep(in_the_future).await;
             let _ = sender.send((conn_id, request)).await;
@@ -58,6 +58,11 @@ where
             .await
             .map_err(|e| Error::LocalSendError(Box::new(e)))
     }
+
+    /// Get the sender
+    pub fn sender(&self) -> Sender<(ConnectionId, Request)> {
+        self.sender.clone()
+    }
 }
 
 impl<T> Drop for LocalConnection<T>

+ 1 - 0
crates/relayer/src/connection/mod.rs

@@ -23,6 +23,7 @@ use tokio::{
 use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
+/// Connection Id type
 pub struct ConnectionId(usize);
 
 mod local;

+ 1 - 1
crates/relayer/src/lib.rs

@@ -15,7 +15,7 @@ mod error;
 mod relayer;
 
 pub use self::{
-    connection::{Connection, LocalConnection},
+    connection::{Connection, ConnectionId, LocalConnection},
     error::Error,
     relayer::Relayer,
 };

+ 8 - 5
crates/relayer/src/relayer.rs

@@ -14,7 +14,6 @@ use std::{
     collections::{HashMap, HashSet},
     ops::Deref,
     sync::Arc,
-    time::Instant,
 };
 use tokio::{
     net::{TcpListener, TcpStream},
@@ -144,8 +143,6 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
 
         let handle = tokio::spawn(async move {
             loop {
-                let start = Instant::now();
-                println!("{}", client_pool_receiver.len());
                 tokio::select! {
                     Ok((stream, _)) = server.accept() => {
                         // accept new connections
@@ -177,8 +174,6 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                                 };
 
                                 let _ = connection.respond(Response::EndOfStoredEvents(sub_id.into()));
-                                let duration = start.elapsed();
-                                println!("xTime elapsed: {} ms", duration.as_millis());
                             }
                             _ => {}
                         }
@@ -431,9 +426,17 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
         }
 
         let connections = connections.read().await;
+        let mut sent = HashSet::new();
+
         for RelayerSubscriptionId((sub_id, conn_id)) in
             subscription_manager.get_subscribers(&event).await
         {
+            if sent.contains(&conn_id) {
+                continue;
+            }
+
+            sent.insert(conn_id);
+
             if let Some(connection) = connections.get(&conn_id) {
                 let _ = connection.respond(
                     relayer::Event {

+ 1 - 1
crates/storage/rocksdb/Cargo.toml

@@ -9,7 +9,7 @@ edition = "2021"
 nostr-rs-storage-base = { path = "../base" }
 nostr-rs-subscription-manager = { path = "../../subscription-manager" }
 nostr-rs-types = { path = "../../types" }
-rocksdb = { version = "0.20.1", features = [
+rocksdb = { version = "0.22.0", features = [
     "multi-threaded-cf",
     "serde",
     "snappy",

+ 1 - 1
crates/storage/sqlite/Cargo.toml

@@ -1,5 +1,5 @@
 [package]
-name = "sqlite"
+name = "nostr-rs-sqlite"
 version = "0.1.0"
 edition = "2021"
 

+ 2 - 3
crates/storage/sqlite/src/lib.rs

@@ -119,7 +119,7 @@ impl SQLite {
             CREATE INDEX IF NOT EXISTS by_id ON event_index (event_id, created_at DESC);
             CREATE INDEX IF NOT EXISTS by_author_id ON event_index (author_id, kind, created_at DESC);
             CREATE INDEX IF NOT EXISTS by_tag ON event_index (tag_name, tag_value, created_at DESC);
-            CREATE INDEX IF NOT EXISTS sorted ON event_index (tag_name, tag_value, created_at DESC);
+            CREATE INDEX IF NOT EXISTS sorted ON event_index (created_at DESC);
             ",
         )
         .execute(index_db)
@@ -155,7 +155,6 @@ impl SQLite {
         if !is_retry {
             indexing.fetch_add(1, Ordering::Relaxed);
         }
-
         tokio::spawn(async move {
             let mut indexes = vec![];
 
@@ -251,7 +250,7 @@ impl SQLite {
                 if let Ok(event) = &event {
                     if filter
                         .as_ref()
-                        .map(|f| !f.check_event(&event))
+                        .map(|f| !f.check_event(event))
                         .unwrap_or_default()
                     {
                         continue;

+ 2 - 1
crates/subscription-manager/src/lib.rs

@@ -76,7 +76,8 @@ where
     I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
     T: Sync + Send + 'static,
 {
-    id: I,
+    /// Subscription ID
+    pub id: I,
     manager: Option<Arc<SubscriptionManager<I, T>>>,
 }
 

+ 6 - 0
crates/types/src/client/close.rs

@@ -14,6 +14,12 @@ use serde_json::Value;
 #[derive(Clone, Debug)]
 pub struct Close(pub SubscriptionId);
 
+impl From<SubscriptionId> for Close {
+    fn from(value: SubscriptionId) -> Self {
+        Self(value)
+    }
+}
+
 impl Deref for Close {
     type Target = SubscriptionId;
 

+ 1 - 1
crates/types/src/types/content/mod.rs

@@ -104,7 +104,7 @@ impl Content {
                                     iv,
                                 })
                         })
-                        .map(|data| Self::EncryptedDirectMessage(data))
+                        .map(Self::EncryptedDirectMessage)
                         .unwrap_or_else(|_| Self::Unparsed(Kind::Unknown(4), content.to_owned())))
                 } else {
                     Ok(Self::Unparsed(Kind::Unknown(4), content.to_owned()))

+ 1 - 0
crates/types/src/types/filter.rs

@@ -44,6 +44,7 @@ impl TagValue {
     }
 
     /// Convert the value into a string
+    #[allow(clippy::inherent_to_string)]
     pub fn to_string(&self) -> String {
         match self {
             TagValue::Id(id) => id.to_string(),

+ 1 - 1
crates/types/src/types/id.rs

@@ -16,7 +16,7 @@ use std::{
 /// Event Id
 ///
 /// Event Id are raw 32 bytes and 64-character length hex encoded to JSON
-#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
+#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd, Default)]
 pub struct Id(pub [u8; 32]);
 
 impl TryFrom<&str> for Id {

+ 11 - 2
crates/types/src/types/kind.rs

@@ -48,6 +48,11 @@ pub enum Kind {
     Zap,
     /// Relay List Metadata - NIP-65
     RelayListMetadata,
+    /// things the user doesn't want to see in their feeds
+    MuteList,
+    /// categorized groups of users a client may choose to check out in
+    /// different circumstances
+    Followset,
     /// Unknown Kind
     Unknown(u32),
 }
@@ -99,7 +104,9 @@ impl From<Kind> for u32 {
             Kind::Reaction => 7,
             Kind::ZapRequest => 9734,
             Kind::Zap => 9735,
-            Kind::RelayListMetadata => 10002,
+            Kind::MuteList => 10_000,
+            Kind::RelayListMetadata => 10_002,
+            Kind::Followset => 30_000,
             Kind::Unknown(t) => t,
         }
     }
@@ -118,7 +125,9 @@ impl From<u32> for Kind {
             7 => Kind::Reaction,
             9734 => Kind::ZapRequest,
             9735 => Kind::Zap,
-            10002 => Kind::RelayListMetadata,
+            10_000 => Kind::MuteList,
+            10_002 => Kind::RelayListMetadata,
+            30_000 => Kind::Followset,
             any => Kind::Unknown(any),
         }
     }

Unele fișiere nu au fost afișate deoarece prea multe fișiere au fost modificate în acest diff