Browse Source

Working on personal relayer

Cesar Rodas 3 months ago
parent
commit
97dddb7cbd

File diff suppressed because it is too large
+ 178 - 268
Cargo.lock


+ 6 - 1
Cargo.toml

@@ -9,7 +9,12 @@ members = [
     "crates/client",
     "crates/client",
     "crates/relayer",
     "crates/relayer",
     "crates/storage/base",
     "crates/storage/base",
-    "crates/storage/rocksdb", "crates/dump", "crates/storage/memory", "crates/personal-relayer", "crates/subscription-manager", "crates/storage/sqlite",
+    "crates/storage/rocksdb",
+    "crates/dump",
+    "crates/storage/memory",
+    "crates/personal-relayer",
+    "crates/subscription-manager",
+    "crates/storage/sqlite",
 ]
 ]
 
 
 [dependencies]
 [dependencies]

+ 4 - 1
crates/dump/src/main.rs

@@ -22,7 +22,10 @@ async fn main() {
         Url::parse("wss://relay.snort.social").expect("valid url"),
         Url::parse("wss://relay.snort.social").expect("valid url"),
     ]);
     ]);
 
 
-    let _ = clients.subscribe(Subscribe::default().into()).await;
+    let _ = clients
+        .subscribe(Subscribe::default().into())
+        .await
+        .expect("v");
 
 
     loop {
     loop {
         if let Some((msg, relayed_by)) = clients.recv().await {
         if let Some((msg, relayed_by)) = clients.recv().await {

+ 3 - 1
crates/personal-relayer/Cargo.toml

@@ -5,13 +5,15 @@ edition = "2021"
 
 
 [dependencies]
 [dependencies]
 nostr-rs-types = { path = "../types" }
 nostr-rs-types = { path = "../types" }
-nostr-rs-memory = { path = "../storage/memory" }
 nostr-rs-storage-base = { path = "../storage/base" }
 nostr-rs-storage-base = { path = "../storage/base" }
+nostr-rs-sqlite = { path = "../storage/sqlite" }
 nostr-rs-client = { path = "../client" }
 nostr-rs-client = { path = "../client" }
 nostr-rs-relayer = { path = "../relayer" }
 nostr-rs-relayer = { path = "../relayer" }
 thiserror = "1.0.39"
 thiserror = "1.0.39"
 url = { version = "2.5.2", features = ["serde"] }
 url = { version = "2.5.2", features = ["serde"] }
 futures = "0.3.30"
 futures = "0.3.30"
 tokio = { version = "1.39.2", features = ["full"] }
 tokio = { version = "1.39.2", features = ["full"] }
+serde = "1.0.208"
+serde_json = "1.0.125"
 log = "0.4.22"
 log = "0.4.22"
 env_logger = "0.11.5"
 env_logger = "0.11.5"

+ 166 - 28
crates/personal-relayer/src/lib.rs

@@ -2,7 +2,15 @@ use futures::future::join_all;
 use nostr_rs_client::Pool;
 use nostr_rs_client::Pool;
 use nostr_rs_relayer::Relayer;
 use nostr_rs_relayer::Relayer;
 use nostr_rs_storage_base::Storage;
 use nostr_rs_storage_base::Storage;
-use nostr_rs_types::types::{Addr, Filter, Id};
+use nostr_rs_types::{
+    client,
+    types::{
+        content::profile::Profile, filter::TagValue, tag::TagType, Content, Event, Filter, Id, Kind,
+    },
+    Request, Response,
+};
+use serde::Serialize;
+use std::{collections::HashSet, ops::Deref, time::Duration};
 use tokio::{net::TcpListener, task::JoinHandle};
 use tokio::{net::TcpListener, task::JoinHandle};
 use url::Url;
 use url::Url;
 
 
@@ -24,6 +32,14 @@ impl Drop for Stoppable {
     }
     }
 }
 }
 
 
+impl Stoppable {
+    pub async fn wait(mut self) {
+        if let Some(tasks) = self.0.take() {
+            join_all(tasks).await;
+        }
+    }
+}
+
 #[derive(thiserror::Error, Debug)]
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
 pub enum Error {
     #[error("Relayer: {0}")]
     #[error("Relayer: {0}")]
@@ -33,6 +49,29 @@ pub enum Error {
     Client(#[from] nostr_rs_client::Error),
     Client(#[from] nostr_rs_client::Error),
 }
 }
 
 
+#[derive(Debug, Clone, Serialize)]
+pub struct Contact {
+    pub pub_key: Id,
+    pub added_by: Option<Id>,
+    pub profile: Option<Profile>,
+    pub followed_by: HashSet<Id>,
+    pub following: HashSet<Id>,
+    pub content: Vec<Event>,
+}
+
+impl Contact {
+    pub fn new(pub_key: Id, added_by: Option<Id>) -> Self {
+        Self {
+            pub_key,
+            profile: None,
+            added_by,
+            followed_by: HashSet::new(),
+            following: HashSet::new(),
+            content: Vec::new(),
+        }
+    }
+}
+
 pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
 pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
     relayer: Relayer<T>,
     relayer: Relayer<T>,
     accounts: Vec<Id>,
     accounts: Vec<Id>,
@@ -40,35 +79,134 @@ pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
 
 
 impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
 impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
     pub async fn new(storage: T, accounts: Vec<Id>, client_urls: Vec<Url>) -> Result<Self, Error> {
     pub async fn new(storage: T, accounts: Vec<Id>, client_urls: Vec<Url>) -> Result<Self, Error> {
-        let pool = Pool::new_with_clients(client_urls);
-
-        join_all(
-            accounts
-                .iter()
-                .map(|account| {
-                    pool.subscribe(
-                        Filter {
-                            authors: vec![account.clone()],
-                            ..Default::default()
-                        }
-                        .into(),
-                    )
-                })
-                .collect::<Vec<_>>(),
-        )
-        .await
-        .into_iter()
-        .collect::<Result<Vec<_>, _>>()?;
-
-        Ok(Self {
-            relayer: Relayer::new(Some(storage), Some(pool))?,
-            accounts,
-        })
+        let (pool, _active_clients) = Pool::new_with_clients(client_urls)?;
+        let relayer = Relayer::new(Some(storage), Some(pool))?;
+
+        Ok(Self { relayer, accounts })
     }
     }
 
 
-    pub fn main(self, server: TcpListener) -> Result<Stoppable, Error> {
-        let (relayer, handle) = self.relayer.main(server)?;
-        let tasks = vec![handle, tokio::spawn(async move {})];
+    pub async fn main(self, server: TcpListener) -> Result<Stoppable, Error> {
+        let (relayer, relayer_handler) = self.relayer.main(server)?;
+        let kinds = vec![
+            Kind::Contacts,
+            Kind::Metadata,
+            Kind::MuteList,
+            Kind::Followset,
+        ];
+
+        let tasks = vec![
+            relayer_handler,
+            tokio::spawn(async move {
+                let mut local_connection = relayer.create_new_local_connection().await;
+                local_connection
+                    .send(Request::Request(
+                        vec![
+                            Filter {
+                                authors: self.accounts.clone(),
+                                kinds: kinds.clone(),
+                                ..Default::default()
+                            },
+                            Filter {
+                                kinds: kinds.clone(),
+                                tags: vec![(
+                                    "p".to_owned(),
+                                    self.accounts
+                                        .iter()
+                                        .map(|id| TagValue::Id(id.clone()))
+                                        .collect::<HashSet<TagValue>>(),
+                                )]
+                                .into_iter()
+                                .collect(),
+                                ..Default::default()
+                            },
+                        ]
+                        .into(),
+                    ))
+                    .await
+                    .expect("Failed to send request");
+
+                let mut already_subscribed = HashSet::new();
+                let mut to_remove = HashSet::new();
+
+                loop {
+                    while let Some(res) = local_connection.recv().await {
+                        match res {
+                            Response::EndOfStoredEvents(id) => {
+                                if to_remove.contains(&id.0) {
+                                    let _ = local_connection.future_send(
+                                        Request::Close(id.0.into()),
+                                        Duration::from_secs(10),
+                                    );
+                                }
+                            }
+                            Response::Event(event) => {
+                                match event.content() {
+                                    Content::Metadata(_profile) => {}
+                                    Content::Contacts(_) => {
+                                        let mut ids = vec![];
+
+                                        for tag in event.tags() {
+                                            if let TagType::PubKey(pub_key, relayer_url, _) =
+                                                tag.deref()
+                                            {
+                                                if let Some(_relayer_url) = relayer_url {
+                                                    //let _ = relayer
+                                                    //    .connect_to_relayer(relayer_url.clone())
+                                                    //    .await;
+                                                }
+
+                                                if !already_subscribed.contains(pub_key) {
+                                                    ids.push(pub_key.clone());
+                                                    already_subscribed.insert(pub_key.clone());
+                                                }
+                                            }
+                                        }
+
+                                        if ids.len() > 0 {
+                                            log::info!("found {} authors", ids.len());
+                                        }
+
+                                        for authors in
+                                            ids.chunks(20).collect::<Vec<_>>().into_iter()
+                                        {
+                                            let subscribe: client::Subscribe = vec![
+                                                Filter {
+                                                    kinds: kinds.clone(),
+                                                    authors: authors.to_vec(),
+                                                    ..Default::default()
+                                                },
+                                                Filter {
+                                                    kinds: kinds.clone(),
+                                                    tags: vec![(
+                                                        "p".to_owned(),
+                                                        authors
+                                                            .iter()
+                                                            .map(|id| TagValue::Id(id.clone()))
+                                                            .collect::<HashSet<TagValue>>(),
+                                                    )]
+                                                    .into_iter()
+                                                    .collect(),
+                                                    ..Default::default()
+                                                },
+                                            ]
+                                            .into();
+                                            to_remove.insert(subscribe.subscription_id.clone());
+
+                                            let _ = local_connection
+                                                .send(Request::Request(subscribe))
+                                                .await;
+                                        }
+                                    }
+                                    Content::ShortTextNote(_) => {}
+                                    _ => {}
+                                }
+                            }
+                            _ => {}
+                        }
+                    }
+                }
+            }),
+        ];
         Ok(tasks.into())
         Ok(tasks.into())
     }
     }
 }
 }

+ 1 - 1
crates/storage/rocksdb/Cargo.toml

@@ -9,7 +9,7 @@ edition = "2021"
 nostr-rs-storage-base = { path = "../base" }
 nostr-rs-storage-base = { path = "../base" }
 nostr-rs-subscription-manager = { path = "../../subscription-manager" }
 nostr-rs-subscription-manager = { path = "../../subscription-manager" }
 nostr-rs-types = { path = "../../types" }
 nostr-rs-types = { path = "../../types" }
-rocksdb = { version = "0.20.1", features = [
+rocksdb = { version = "0.22.0", features = [
     "multi-threaded-cf",
     "multi-threaded-cf",
     "serde",
     "serde",
     "snappy",
     "snappy",

+ 1 - 1
crates/storage/sqlite/Cargo.toml

@@ -1,5 +1,5 @@
 [package]
 [package]
-name = "sqlite"
+name = "nostr-rs-sqlite"
 version = "0.1.0"
 version = "0.1.0"
 edition = "2021"
 edition = "2021"
 
 

+ 2 - 6
crates/storage/sqlite/src/lib.rs

@@ -119,7 +119,7 @@ impl SQLite {
             CREATE INDEX IF NOT EXISTS by_id ON event_index (event_id, created_at DESC);
             CREATE INDEX IF NOT EXISTS by_id ON event_index (event_id, created_at DESC);
             CREATE INDEX IF NOT EXISTS by_author_id ON event_index (author_id, kind, created_at DESC);
             CREATE INDEX IF NOT EXISTS by_author_id ON event_index (author_id, kind, created_at DESC);
             CREATE INDEX IF NOT EXISTS by_tag ON event_index (tag_name, tag_value, created_at DESC);
             CREATE INDEX IF NOT EXISTS by_tag ON event_index (tag_name, tag_value, created_at DESC);
-            CREATE INDEX IF NOT EXISTS sorted ON event_index (tag_name, tag_value, created_at DESC);
+            CREATE INDEX IF NOT EXISTS sorted ON event_index (created_at DESC);
             ",
             ",
         )
         )
         .execute(index_db)
         .execute(index_db)
@@ -152,10 +152,6 @@ impl<'a> Stream for Cursor<'a> {
 
 
 impl SQLite {
 impl SQLite {
     fn build_index(indexing: Arc<AtomicUsize>, pool: Pool<Sqlite>, event: Event, is_retry: bool) {
     fn build_index(indexing: Arc<AtomicUsize>, pool: Pool<Sqlite>, event: Event, is_retry: bool) {
-        if !is_retry {
-            indexing.fetch_add(1, Ordering::Relaxed);
-        }
-
         tokio::spawn(async move {
         tokio::spawn(async move {
             let mut indexes = vec![];
             let mut indexes = vec![];
 
 
@@ -251,7 +247,7 @@ impl SQLite {
                 if let Ok(event) = &event {
                 if let Ok(event) = &event {
                     if filter
                     if filter
                         .as_ref()
                         .as_ref()
-                        .map(|f| !f.check_event(&event))
+                        .map(|f| !f.check_event(event))
                         .unwrap_or_default()
                         .unwrap_or_default()
                     {
                     {
                         continue;
                         continue;

+ 2 - 1
crates/subscription-manager/src/lib.rs

@@ -76,7 +76,8 @@ where
     I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
     I: Default + Debug + Hash + Ord + Clone + Sync + Send + 'static,
     T: Sync + Send + 'static,
     T: Sync + Send + 'static,
 {
 {
-    id: I,
+    /// Subscription ID
+    pub id: I,
     manager: Option<Arc<SubscriptionManager<I, T>>>,
     manager: Option<Arc<SubscriptionManager<I, T>>>,
 }
 }
 
 

+ 6 - 0
crates/types/src/client/close.rs

@@ -14,6 +14,12 @@ use serde_json::Value;
 #[derive(Clone, Debug)]
 #[derive(Clone, Debug)]
 pub struct Close(pub SubscriptionId);
 pub struct Close(pub SubscriptionId);
 
 
+impl From<SubscriptionId> for Close {
+    fn from(value: SubscriptionId) -> Self {
+        Self(value)
+    }
+}
+
 impl Deref for Close {
 impl Deref for Close {
     type Target = SubscriptionId;
     type Target = SubscriptionId;
 
 

+ 1 - 1
crates/types/src/types/content/mod.rs

@@ -104,7 +104,7 @@ impl Content {
                                     iv,
                                     iv,
                                 })
                                 })
                         })
                         })
-                        .map(|data| Self::EncryptedDirectMessage(data))
+                        .map(Self::EncryptedDirectMessage)
                         .unwrap_or_else(|_| Self::Unparsed(Kind::Unknown(4), content.to_owned())))
                         .unwrap_or_else(|_| Self::Unparsed(Kind::Unknown(4), content.to_owned())))
                 } else {
                 } else {
                     Ok(Self::Unparsed(Kind::Unknown(4), content.to_owned()))
                     Ok(Self::Unparsed(Kind::Unknown(4), content.to_owned()))

+ 1 - 0
crates/types/src/types/filter.rs

@@ -44,6 +44,7 @@ impl TagValue {
     }
     }
 
 
     /// Convert the value into a string
     /// Convert the value into a string
+    #[allow(clippy::inherent_to_string)]
     pub fn to_string(&self) -> String {
     pub fn to_string(&self) -> String {
         match self {
         match self {
             TagValue::Id(id) => id.to_string(),
             TagValue::Id(id) => id.to_string(),

+ 11 - 2
crates/types/src/types/kind.rs

@@ -48,6 +48,11 @@ pub enum Kind {
     Zap,
     Zap,
     /// Relay List Metadata - NIP-65
     /// Relay List Metadata - NIP-65
     RelayListMetadata,
     RelayListMetadata,
+    /// things the user doesn't want to see in their feeds
+    MuteList,
+    /// categorized groups of users a client may choose to check out in
+    /// different circumstances
+    Followset,
     /// Unknown Kind
     /// Unknown Kind
     Unknown(u32),
     Unknown(u32),
 }
 }
@@ -99,7 +104,9 @@ impl From<Kind> for u32 {
             Kind::Reaction => 7,
             Kind::Reaction => 7,
             Kind::ZapRequest => 9734,
             Kind::ZapRequest => 9734,
             Kind::Zap => 9735,
             Kind::Zap => 9735,
-            Kind::RelayListMetadata => 10002,
+            Kind::MuteList => 10_000,
+            Kind::RelayListMetadata => 10_002,
+            Kind::Followset => 30_000,
             Kind::Unknown(t) => t,
             Kind::Unknown(t) => t,
         }
         }
     }
     }
@@ -118,7 +125,9 @@ impl From<u32> for Kind {
             7 => Kind::Reaction,
             7 => Kind::Reaction,
             9734 => Kind::ZapRequest,
             9734 => Kind::ZapRequest,
             9735 => Kind::Zap,
             9735 => Kind::Zap,
-            10002 => Kind::RelayListMetadata,
+            10_000 => Kind::MuteList,
+            10_002 => Kind::RelayListMetadata,
+            30_000 => Kind::Followset,
             any => Kind::Unknown(any),
             any => Kind::Unknown(any),
         }
         }
     }
     }

Some files were not shown because too many files changed in this diff