Forráskód Böngészése

Working on personal relayer

Cesar Rodas 3 hónapja
szülő
commit
ab1715fe46

A különbségek nem kerülnek megjelenítésre, a fájl túl nagy
+ 178 - 268
Cargo.lock


+ 6 - 1
Cargo.toml

@@ -9,7 +9,12 @@ members = [
     "crates/client",
     "crates/relayer",
     "crates/storage/base",
-    "crates/storage/rocksdb", "crates/dump", "crates/storage/memory", "crates/personal-relayer", "crates/subscription-manager", "crates/storage/sqlite",
+    "crates/storage/rocksdb",
+    "crates/dump",
+    "crates/storage/memory",
+    "crates/personal-relayer",
+    "crates/subscription-manager",
+    "crates/storage/sqlite",
 ]
 
 [dependencies]

+ 4 - 1
crates/dump/src/main.rs

@@ -22,7 +22,10 @@ async fn main() {
         Url::parse("wss://relay.snort.social").expect("valid url"),
     ]);
 
-    let _ = clients.subscribe(Subscribe::default().into()).await;
+    let _ = clients
+        .subscribe(Subscribe::default().into())
+        .await
+        .expect("v");
 
     loop {
         if let Some((msg, relayed_by)) = clients.recv().await {

+ 3 - 1
crates/personal-relayer/Cargo.toml

@@ -5,13 +5,15 @@ edition = "2021"
 
 [dependencies]
 nostr-rs-types = { path = "../types" }
-nostr-rs-memory = { path = "../storage/memory" }
 nostr-rs-storage-base = { path = "../storage/base" }
+nostr-rs-sqlite = { path = "../storage/sqlite" }
 nostr-rs-client = { path = "../client" }
 nostr-rs-relayer = { path = "../relayer" }
 thiserror = "1.0.39"
 url = { version = "2.5.2", features = ["serde"] }
 futures = "0.3.30"
 tokio = { version = "1.39.2", features = ["full"] }
+serde = "1.0.208"
+serde_json = "1.0.125"
 log = "0.4.22"
 env_logger = "0.11.5"

+ 192 - 28
crates/personal-relayer/src/lib.rs

@@ -1,11 +1,22 @@
+use account::GraphManager;
 use futures::future::join_all;
 use nostr_rs_client::Pool;
-use nostr_rs_relayer::Relayer;
+use nostr_rs_relayer::{ConnectionId, LocalConnection, Relayer};
 use nostr_rs_storage_base::Storage;
-use nostr_rs_types::types::{Addr, Filter, Id};
-use tokio::{net::TcpListener, task::JoinHandle};
+use nostr_rs_types::{
+    client::{self, Subscribe},
+    types::{
+        content::profile::Profile, filter::TagValue, tag::TagType, Content, Event, Filter, Id,
+        Kind, SubscriptionId,
+    },
+    Request, Response,
+};
+use std::{collections::HashSet, ops::Deref, time::Duration};
+use tokio::{net::TcpListener, sync::mpsc::Sender, task::JoinHandle};
 use url::Url;
 
+mod account;
+
 pub struct Stoppable(Option<Vec<JoinHandle<()>>>);
 
 impl From<Vec<JoinHandle<()>>> for Stoppable {
@@ -24,6 +35,14 @@ impl Drop for Stoppable {
     }
 }
 
+impl Stoppable {
+    pub async fn wait(mut self) {
+        if let Some(tasks) = self.0.take() {
+            join_all(tasks).await;
+        }
+    }
+}
+
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
     #[error("Relayer: {0}")]
@@ -33,42 +52,187 @@ pub enum Error {
     Client(#[from] nostr_rs_client::Error),
 }
 
+#[derive(Debug, Clone, Default)]
+pub struct Account {
+    pub pub_key: Id,
+    pub degree: usize,
+    pub added_by: Option<Id>,
+    pub profile: Option<Profile>,
+    pub followed_by: HashSet<Id>,
+    pub following: HashSet<Id>,
+    pub content: Vec<Event>,
+    subscribed: Option<(
+        Sender<(ConnectionId, Request)>,
+        ConnectionId,
+        SubscriptionId,
+    )>,
+}
+
+impl Account {
+    pub fn new(pub_key: Id) -> Self {
+        Self {
+            pub_key,
+            degree: 0,
+            added_by: Default::default(),
+            profile: Default::default(),
+            followed_by: Default::default(),
+            following: Default::default(),
+            content: Default::default(),
+            subscribed: Default::default(),
+        }
+    }
+
+    pub async fn subscribe<T>(&mut self, conn: &LocalConnection<T>) -> Result<(), Error>
+    where
+        T: Storage + Send + Sync + 'static,
+    {
+        let request = Subscribe {
+            subscription_id: self.pub_key.to_string().try_into().expect("valid id"),
+            filters: vec![
+                Filter {
+                    authors: vec![self.pub_key.clone()],
+                    kinds: vec![
+                        Kind::Contacts,
+                        Kind::Metadata,
+                        Kind::MuteList,
+                        Kind::Followset,
+                    ],
+                    ..Default::default()
+                },
+                Filter {
+                    authors: vec![self.pub_key.clone()],
+                    ..Default::default()
+                },
+                Filter {
+                    tags: vec![(
+                        "p".to_owned(),
+                        HashSet::from([TagValue::Id(self.pub_key.clone())]),
+                    )]
+                    .into_iter()
+                    .collect(),
+                    ..Default::default()
+                },
+            ],
+        };
+
+        self.subscribed = Some((
+            conn.sender(),
+            conn.conn_id.clone(),
+            request.subscription_id.clone(),
+        ));
+        conn.send(Request::Request(request)).await?;
+        Ok(())
+    }
+
+    pub fn add_contact(&self, pub_key: Id) -> Self {
+        Self {
+            pub_key,
+            degree: self.degree + 1,
+            added_by: Some(self.pub_key.clone()),
+            profile: Default::default(),
+            followed_by: Default::default(),
+            following: Default::default(),
+            content: Default::default(),
+            subscribed: Default::default(),
+        }
+    }
+}
+
+impl Drop for Account {
+    fn drop(&mut self) {
+        if let Some((conn, conn_id, sub_id)) = self.subscribed.take() {
+            conn.try_send((conn_id, Request::Close(sub_id.into())))
+                .expect("unsubscription");
+        }
+    }
+}
+
 pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
     relayer: Relayer<T>,
-    accounts: Vec<Id>,
+    accounts: GraphManager,
 }
 
 impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
     pub async fn new(storage: T, accounts: Vec<Id>, client_urls: Vec<Url>) -> Result<Self, Error> {
-        let pool = Pool::new_with_clients(client_urls);
-
-        join_all(
-            accounts
-                .iter()
-                .map(|account| {
-                    pool.subscribe(
-                        Filter {
-                            authors: vec![account.clone()],
-                            ..Default::default()
-                        }
-                        .into(),
-                    )
-                })
-                .collect::<Vec<_>>(),
-        )
-        .await
-        .into_iter()
-        .collect::<Result<Vec<_>, _>>()?;
+        let (pool, _active_clients) = Pool::new_with_clients(client_urls)?;
+        let relayer = Relayer::new(Some(storage), Some(pool))?;
 
         Ok(Self {
-            relayer: Relayer::new(Some(storage), Some(pool))?,
-            accounts,
+            relayer,
+            accounts: GraphManager::new(2, accounts),
         })
     }
 
-    pub fn main(self, server: TcpListener) -> Result<Stoppable, Error> {
-        let (relayer, handle) = self.relayer.main(server)?;
-        let tasks = vec![handle, tokio::spawn(async move {})];
+    pub async fn main(mut self, server: TcpListener) -> Result<Stoppable, Error> {
+        let (relayer, relayer_handler) = self.relayer.main(server)?;
+        let tasks = vec![
+            relayer_handler,
+            tokio::spawn(async move {
+                let mut local_connection = relayer.create_new_local_connection().await;
+
+                let _ = self.accounts.create_subscriptions(&local_connection).await;
+
+                let mut already_subscribed = HashSet::new();
+
+                loop {
+                    while let Some(res) = local_connection.recv().await {
+                        match res {
+                            Response::Event(event) => {
+                                let current_user: Id =
+                                    if let Ok(id) = event.subscription_id.deref().try_into() {
+                                        id
+                                    } else {
+                                        continue;
+                                    };
+
+                                match event.content() {
+                                    Content::Metadata(_profile) => {}
+                                    Content::Contacts(_) => {
+                                        let mut ids = vec![];
+
+                                        for tag in event.tags() {
+                                            if let TagType::PubKey(pub_key, relayer_url, _) =
+                                                tag.deref()
+                                            {
+                                                if let Some(_relayer_url) = relayer_url {
+                                                    //let _ = relayer
+                                                    //    .connect_to_relayer(relayer_url.clone())
+                                                    //    .await;
+                                                }
+
+                                                if !already_subscribed.contains(pub_key) {
+                                                    ids.push(pub_key.clone());
+                                                    already_subscribed.insert(pub_key.clone());
+                                                }
+                                            }
+                                        }
+
+                                        if ids.is_empty() {
+                                            continue;
+                                        }
+
+                                        log::info!("found {} authors", ids.len());
+
+                                        for pub_key in ids {
+                                            let _ =
+                                                self.accounts.follows_to(&current_user, pub_key);
+                                        }
+
+                                        let _ = self
+                                            .accounts
+                                            .create_subscriptions(&local_connection)
+                                            .await;
+                                    }
+                                    Content::ShortTextNote(_) => {}
+                                    _ => {}
+                                }
+                            }
+                            _ => {}
+                        }
+                    }
+                }
+            }),
+        ];
         Ok(tasks.into())
     }
 }

+ 5 - 0
crates/relayer/src/connection/local.rs

@@ -58,6 +58,11 @@ where
             .await
             .map_err(|e| Error::LocalSendError(Box::new(e)))
     }
+
+    /// Get the sender
+    pub fn sender(&self) -> Sender<(ConnectionId, Request)> {
+        self.sender.clone()
+    }
 }
 
 impl<T> Drop for LocalConnection<T>

+ 1 - 0
crates/relayer/src/connection/mod.rs

@@ -23,6 +23,7 @@ use tokio::{
 use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
+/// Connection Id type
 pub struct ConnectionId(usize);
 
 mod local;

+ 1 - 1
crates/relayer/src/lib.rs

@@ -15,7 +15,7 @@ mod error;
 mod relayer;
 
 pub use self::{
-    connection::{Connection, LocalConnection},
+    connection::{Connection, ConnectionId, LocalConnection},
     error::Error,
     relayer::Relayer,
 };

+ 1 - 1
crates/storage/rocksdb/Cargo.toml

@@ -9,7 +9,7 @@ edition = "2021"
 nostr-rs-storage-base = { path = "../base" }
 nostr-rs-subscription-manager = { path = "../../subscription-manager" }
 nostr-rs-types = { path = "../../types" }
-rocksdb = { version = "0.20.1", features = [
+rocksdb = { version = "0.22.0", features = [
     "multi-threaded-cf",
     "serde",
     "snappy",

+ 1 - 1
crates/storage/sqlite/Cargo.toml

@@ -1,5 +1,5 @@
 [package]
-name = "sqlite"
+name = "nostr-rs-sqlite"
 version = "0.1.0"
 edition = "2021"
 

+ 2 - 3
crates/storage/sqlite/src/lib.rs

@@ -119,7 +119,7 @@ impl SQLite {
             CREATE INDEX IF NOT EXISTS by_id ON event_index (event_id, created_at DESC);
             CREATE INDEX IF NOT EXISTS by_author_id ON event_index (author_id, kind, created_at DESC);
             CREATE INDEX IF NOT EXISTS by_tag ON event_index (tag_name, tag_value, created_at DESC);
-            CREATE INDEX IF NOT EXISTS sorted ON event_index (tag_name, tag_value, created_at DESC);
+            CREATE INDEX IF NOT EXISTS sorted ON event_index (created_at DESC);
             ",
         )
         .execute(index_db)
@@ -155,7 +155,6 @@ impl SQLite {
         if !is_retry {
             indexing.fetch_add(1, Ordering::Relaxed);
         }
-
         tokio::spawn(async move {
             let mut indexes = vec![];
 
@@ -251,7 +250,7 @@ impl SQLite {
                 if let Ok(event) = &event {
                     if filter
                         .as_ref()
-                        .map(|f| !f.check_event(&event))
+                        .map(|f| !f.check_event(event))
                         .unwrap_or_default()
                     {
                         continue;

+ 2 - 1
crates/subscription-manager/src/lib.rs

@@ -76,7 +76,8 @@ where
     I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
     T: Sync + Send + 'static,
 {
-    id: I,
+    /// Subscription ID
+    pub id: I,
     manager: Option<Arc<SubscriptionManager<I, T>>>,
 }
 

+ 6 - 0
crates/types/src/client/close.rs

@@ -14,6 +14,12 @@ use serde_json::Value;
 #[derive(Clone, Debug)]
 pub struct Close(pub SubscriptionId);
 
+impl From<SubscriptionId> for Close {
+    fn from(value: SubscriptionId) -> Self {
+        Self(value)
+    }
+}
+
 impl Deref for Close {
     type Target = SubscriptionId;
 

+ 1 - 1
crates/types/src/types/content/mod.rs

@@ -104,7 +104,7 @@ impl Content {
                                     iv,
                                 })
                         })
-                        .map(|data| Self::EncryptedDirectMessage(data))
+                        .map(Self::EncryptedDirectMessage)
                         .unwrap_or_else(|_| Self::Unparsed(Kind::Unknown(4), content.to_owned())))
                 } else {
                     Ok(Self::Unparsed(Kind::Unknown(4), content.to_owned()))

+ 1 - 0
crates/types/src/types/filter.rs

@@ -44,6 +44,7 @@ impl TagValue {
     }
 
     /// Convert the value into a string
+    #[allow(clippy::inherent_to_string)]
     pub fn to_string(&self) -> String {
         match self {
             TagValue::Id(id) => id.to_string(),

+ 1 - 1
crates/types/src/types/id.rs

@@ -16,7 +16,7 @@ use std::{
 /// Event Id
 ///
 /// Event Id are raw 32 bytes and 64-character length hex encoded to JSON
-#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
+#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd, Default)]
 pub struct Id(pub [u8; 32]);
 
 impl TryFrom<&str> for Id {

+ 11 - 2
crates/types/src/types/kind.rs

@@ -48,6 +48,11 @@ pub enum Kind {
     Zap,
     /// Relay List Metadata - NIP-65
     RelayListMetadata,
+    /// things the user doesn't want to see in their feeds
+    MuteList,
+    /// categorized groups of users a client may choose to check out in
+    /// different circumstances
+    Followset,
     /// Unknown Kind
     Unknown(u32),
 }
@@ -99,7 +104,9 @@ impl From<Kind> for u32 {
             Kind::Reaction => 7,
             Kind::ZapRequest => 9734,
             Kind::Zap => 9735,
-            Kind::RelayListMetadata => 10002,
+            Kind::MuteList => 10_000,
+            Kind::RelayListMetadata => 10_002,
+            Kind::Followset => 30_000,
             Kind::Unknown(t) => t,
         }
     }
@@ -118,7 +125,9 @@ impl From<u32> for Kind {
             7 => Kind::Reaction,
             9734 => Kind::ZapRequest,
             9735 => Kind::Zap,
-            10002 => Kind::RelayListMetadata,
+            10_000 => Kind::MuteList,
+            10_002 => Kind::RelayListMetadata,
+            30_000 => Kind::Followset,
             any => Kind::Unknown(any),
         }
     }

Nem az összes módosított fájl került megjelenítésre, mert túl sok fájl változott