1
0

2 Коммиты 8d2fce02b9 ... 439d07ad29

Автор SHA1 Сообщение Дата
  Cesar Rodas 439d07ad29 Working on personal relayer 3 месяцев назад
  Cesar Rodas 8d2fce02b9 Working on personal relayer 3 месяцев назад

+ 116 - 83
crates/personal-relayer/src/lib.rs

@@ -1,19 +1,22 @@
+use account::GraphManager;
 use futures::future::join_all;
 use nostr_rs_client::Pool;
-use nostr_rs_relayer::Relayer;
+use nostr_rs_relayer::{ConnectionId, LocalConnection, Relayer};
 use nostr_rs_storage_base::Storage;
 use nostr_rs_types::{
-    client,
+    client::{self, Subscribe},
     types::{
-        content::profile::Profile, filter::TagValue, tag::TagType, Content, Event, Filter, Id, Kind,
+        content::profile::Profile, filter::TagValue, tag::TagType, Content, Event, Filter, Id,
+        Kind, SubscriptionId,
     },
     Request, Response,
 };
-use serde::Serialize;
 use std::{collections::HashSet, ops::Deref, time::Duration};
-use tokio::{net::TcpListener, task::JoinHandle};
+use tokio::{net::TcpListener, sync::mpsc::Sender, task::JoinHandle};
 use url::Url;
 
+mod account;
+
 pub struct Stoppable(Option<Vec<JoinHandle<()>>>);
 
 impl From<Vec<JoinHandle<()>>> for Stoppable {
@@ -49,32 +52,104 @@ pub enum Error {
     Client(#[from] nostr_rs_client::Error),
 }
 
-#[derive(Debug, Clone, Serialize)]
-pub struct Contact {
+#[derive(Debug, Clone, Default)]
+pub struct Account {
     pub pub_key: Id,
+    pub degree: usize,
     pub added_by: Option<Id>,
     pub profile: Option<Profile>,
     pub followed_by: HashSet<Id>,
     pub following: HashSet<Id>,
     pub content: Vec<Event>,
+    subscribed: Option<(
+        Sender<(ConnectionId, Request)>,
+        ConnectionId,
+        SubscriptionId,
+    )>,
 }
 
-impl Contact {
-    pub fn new(pub_key: Id, added_by: Option<Id>) -> Self {
+impl Account {
+    pub fn new(pub_key: Id) -> Self {
+        Self {
+            pub_key,
+            degree: 0,
+            added_by: Default::default(),
+            profile: Default::default(),
+            followed_by: Default::default(),
+            following: Default::default(),
+            content: Default::default(),
+            subscribed: Default::default(),
+        }
+    }
+
+    pub async fn subscribe<T>(&mut self, conn: &LocalConnection<T>) -> Result<(), Error>
+    where
+        T: Storage + Send + Sync + 'static,
+    {
+        let request = Subscribe {
+            subscription_id: self.pub_key.to_string().try_into().expect("valid id"),
+            filters: vec![
+                Filter {
+                    authors: vec![self.pub_key.clone()],
+                    kinds: vec![
+                        Kind::Contacts,
+                        Kind::Metadata,
+                        Kind::MuteList,
+                        Kind::Followset,
+                    ],
+                    ..Default::default()
+                },
+                Filter {
+                    authors: vec![self.pub_key.clone()],
+                    ..Default::default()
+                },
+                Filter {
+                    tags: vec![(
+                        "p".to_owned(),
+                        HashSet::from([TagValue::Id(self.pub_key.clone())]),
+                    )]
+                    .into_iter()
+                    .collect(),
+                    ..Default::default()
+                },
+            ],
+        };
+
+        self.subscribed = Some((
+            conn.sender(),
+            conn.conn_id.clone(),
+            request.subscription_id.clone(),
+        ));
+        conn.send(Request::Request(request)).await?;
+        Ok(())
+    }
+
+    pub fn add_contact(&self, pub_key: Id) -> Self {
         Self {
             pub_key,
-            profile: None,
-            added_by,
-            followed_by: HashSet::new(),
-            following: HashSet::new(),
-            content: Vec::new(),
+            degree: self.degree + 1,
+            added_by: Some(self.pub_key.clone()),
+            profile: Default::default(),
+            followed_by: Default::default(),
+            following: Default::default(),
+            content: Default::default(),
+            subscribed: Default::default(),
+        }
+    }
+}
+
+impl Drop for Account {
+    fn drop(&mut self) {
+        if let Some((conn, conn_id, sub_id)) = self.subscribed.take() {
+            conn.try_send((conn_id, Request::Close(sub_id.into())))
+                .expect("unsubscription");
         }
     }
 }
 
 pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
     relayer: Relayer<T>,
-    accounts: Vec<Id>,
+    accounts: GraphManager,
 }
 
 impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
@@ -82,10 +157,13 @@ impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
         let (pool, _active_clients) = Pool::new_with_clients(client_urls)?;
         let relayer = Relayer::new(Some(storage), Some(pool))?;
 
-        Ok(Self { relayer, accounts })
+        Ok(Self {
+            relayer,
+            accounts: GraphManager::new(2, accounts),
+        })
     }
 
-    pub async fn main(self, server: TcpListener) -> Result<Stoppable, Error> {
+    pub async fn main(mut self, server: TcpListener) -> Result<Stoppable, Error> {
         let (relayer, relayer_handler) = self.relayer.main(server)?;
         let kinds = vec![
             Kind::Contacts,
@@ -98,48 +176,22 @@ impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
             relayer_handler,
             tokio::spawn(async move {
                 let mut local_connection = relayer.create_new_local_connection().await;
-                local_connection
-                    .send(Request::Request(
-                        vec![
-                            Filter {
-                                authors: self.accounts.clone(),
-                                kinds: kinds.clone(),
-                                ..Default::default()
-                            },
-                            Filter {
-                                kinds: kinds.clone(),
-                                tags: vec![(
-                                    "p".to_owned(),
-                                    self.accounts
-                                        .iter()
-                                        .map(|id| TagValue::Id(id.clone()))
-                                        .collect::<HashSet<TagValue>>(),
-                                )]
-                                .into_iter()
-                                .collect(),
-                                ..Default::default()
-                            },
-                        ]
-                        .into(),
-                    ))
-                    .await
-                    .expect("Failed to send request");
+
+                let _ = self.accounts.create_subscriptions(&local_connection).await;
 
                 let mut already_subscribed = HashSet::new();
-                let mut to_remove = HashSet::new();
 
                 loop {
                     while let Some(res) = local_connection.recv().await {
                         match res {
-                            Response::EndOfStoredEvents(id) => {
-                                if to_remove.contains(&id.0) {
-                                    let _ = local_connection.future_send(
-                                        Request::Close(id.0.into()),
-                                        Duration::from_secs(10),
-                                    );
-                                }
-                            }
                             Response::Event(event) => {
+                                let current_user: Id =
+                                    if let Ok(id) = event.subscription_id.deref().try_into() {
+                                        id
+                                    } else {
+                                        continue;
+                                    };
+
                                 match event.content() {
                                     Content::Metadata(_profile) => {}
                                     Content::Contacts(_) => {
@@ -162,40 +214,21 @@ impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
                                             }
                                         }
 
-                                        if ids.len() > 0 {
-                                            log::info!("found {} authors", ids.len());
+                                        if ids.is_empty() {
+                                            continue;
                                         }
 
-                                        for authors in
-                                            ids.chunks(20).collect::<Vec<_>>().into_iter()
-                                        {
-                                            let subscribe: client::Subscribe = vec![
-                                                Filter {
-                                                    kinds: kinds.clone(),
-                                                    authors: authors.to_vec(),
-                                                    ..Default::default()
-                                                },
-                                                Filter {
-                                                    kinds: kinds.clone(),
-                                                    tags: vec![(
-                                                        "p".to_owned(),
-                                                        authors
-                                                            .iter()
-                                                            .map(|id| TagValue::Id(id.clone()))
-                                                            .collect::<HashSet<TagValue>>(),
-                                                    )]
-                                                    .into_iter()
-                                                    .collect(),
-                                                    ..Default::default()
-                                                },
-                                            ]
-                                            .into();
-                                            to_remove.insert(subscribe.subscription_id.clone());
-
-                                            let _ = local_connection
-                                                .send(Request::Request(subscribe))
-                                                .await;
+                                        log::info!("found {} authors", ids.len());
+
+                                        for pub_key in ids {
+                                            let _ =
+                                                self.accounts.follows_to(&current_user, pub_key);
                                         }
+
+                                        let _ = self
+                                            .accounts
+                                            .create_subscriptions(&local_connection)
+                                            .await;
                                     }
                                     Content::ShortTextNote(_) => {}
                                     _ => {}

+ 5 - 0
crates/relayer/src/connection/local.rs

@@ -58,6 +58,11 @@ where
             .await
             .map_err(|e| Error::LocalSendError(Box::new(e)))
     }
+
+    /// Get the sender
+    pub fn sender(&self) -> Sender<(ConnectionId, Request)> {
+        self.sender.clone()
+    }
 }
 
 impl<T> Drop for LocalConnection<T>

+ 1 - 0
crates/relayer/src/connection/mod.rs

@@ -23,6 +23,7 @@ use tokio::{
 use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
+/// Connection Id type
 pub struct ConnectionId(usize);
 
 mod local;

+ 1 - 1
crates/relayer/src/lib.rs

@@ -15,7 +15,7 @@ mod error;
 mod relayer;
 
 pub use self::{
-    connection::{Connection, LocalConnection},
+    connection::{Connection, ConnectionId, LocalConnection},
     error::Error,
     relayer::Relayer,
 };

+ 1 - 1
crates/types/src/types/id.rs

@@ -16,7 +16,7 @@ use std::{
 /// Event Id
 ///
 /// Event Id are raw 32 bytes and 64-character length hex encoded to JSON
-#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
+#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd, Default)]
 pub struct Id(pub [u8; 32]);
 
 impl TryFrom<&str> for Id {