2 Commits 8d2fce02b9 ... 439d07ad29

Tác giả SHA1 Thông báo Ngày
  Cesar Rodas 439d07ad29 Working on personal relayer 10 tháng trước cách đây
  Cesar Rodas 8d2fce02b9 Working on personal relayer 10 tháng trước cách đây

+ 116 - 83
crates/personal-relayer/src/lib.rs

@@ -1,19 +1,22 @@
+use account::GraphManager;
 use futures::future::join_all;
 use nostr_rs_client::Pool;
-use nostr_rs_relayer::Relayer;
+use nostr_rs_relayer::{ConnectionId, LocalConnection, Relayer};
 use nostr_rs_storage_base::Storage;
 use nostr_rs_types::{
-    client,
+    client::{self, Subscribe},
     types::{
-        content::profile::Profile, filter::TagValue, tag::TagType, Content, Event, Filter, Id, Kind,
+        content::profile::Profile, filter::TagValue, tag::TagType, Content, Event, Filter, Id,
+        Kind, SubscriptionId,
     },
     Request, Response,
 };
-use serde::Serialize;
 use std::{collections::HashSet, ops::Deref, time::Duration};
-use tokio::{net::TcpListener, task::JoinHandle};
+use tokio::{net::TcpListener, sync::mpsc::Sender, task::JoinHandle};
 use url::Url;
 
+mod account;
+
 pub struct Stoppable(Option<Vec<JoinHandle<()>>>);
 
 impl From<Vec<JoinHandle<()>>> for Stoppable {
@@ -49,32 +52,104 @@ pub enum Error {
     Client(#[from] nostr_rs_client::Error),
 }
 
-#[derive(Debug, Clone, Serialize)]
-pub struct Contact {
+#[derive(Debug, Clone, Default)]
+pub struct Account {
     pub pub_key: Id,
+    pub degree: usize,
     pub added_by: Option<Id>,
     pub profile: Option<Profile>,
     pub followed_by: HashSet<Id>,
     pub following: HashSet<Id>,
     pub content: Vec<Event>,
+    subscribed: Option<(
+        Sender<(ConnectionId, Request)>,
+        ConnectionId,
+        SubscriptionId,
+    )>,
 }
 
-impl Contact {
-    pub fn new(pub_key: Id, added_by: Option<Id>) -> Self {
+impl Account {
+    pub fn new(pub_key: Id) -> Self {
+        Self {
+            pub_key,
+            degree: 0,
+            added_by: Default::default(),
+            profile: Default::default(),
+            followed_by: Default::default(),
+            following: Default::default(),
+            content: Default::default(),
+            subscribed: Default::default(),
+        }
+    }
+
+    pub async fn subscribe<T>(&mut self, conn: &LocalConnection<T>) -> Result<(), Error>
+    where
+        T: Storage + Send + Sync + 'static,
+    {
+        let request = Subscribe {
+            subscription_id: self.pub_key.to_string().try_into().expect("valid id"),
+            filters: vec![
+                Filter {
+                    authors: vec![self.pub_key.clone()],
+                    kinds: vec![
+                        Kind::Contacts,
+                        Kind::Metadata,
+                        Kind::MuteList,
+                        Kind::Followset,
+                    ],
+                    ..Default::default()
+                },
+                Filter {
+                    authors: vec![self.pub_key.clone()],
+                    ..Default::default()
+                },
+                Filter {
+                    tags: vec![(
+                        "p".to_owned(),
+                        HashSet::from([TagValue::Id(self.pub_key.clone())]),
+                    )]
+                    .into_iter()
+                    .collect(),
+                    ..Default::default()
+                },
+            ],
+        };
+
+        self.subscribed = Some((
+            conn.sender(),
+            conn.conn_id.clone(),
+            request.subscription_id.clone(),
+        ));
+        conn.send(Request::Request(request)).await?;
+        Ok(())
+    }
+
+    pub fn add_contact(&self, pub_key: Id) -> Self {
         Self {
             pub_key,
-            profile: None,
-            added_by,
-            followed_by: HashSet::new(),
-            following: HashSet::new(),
-            content: Vec::new(),
+            degree: self.degree + 1,
+            added_by: Some(self.pub_key.clone()),
+            profile: Default::default(),
+            followed_by: Default::default(),
+            following: Default::default(),
+            content: Default::default(),
+            subscribed: Default::default(),
+        }
+    }
+}
+
+impl Drop for Account {
+    fn drop(&mut self) {
+        if let Some((conn, conn_id, sub_id)) = self.subscribed.take() {
+            conn.try_send((conn_id, Request::Close(sub_id.into())))
+                .expect("unsubscription");
         }
     }
 }
 
 pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
     relayer: Relayer<T>,
-    accounts: Vec<Id>,
+    accounts: GraphManager,
 }
 
 impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
@@ -82,10 +157,13 @@ impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
         let (pool, _active_clients) = Pool::new_with_clients(client_urls)?;
         let relayer = Relayer::new(Some(storage), Some(pool))?;
 
-        Ok(Self { relayer, accounts })
+        Ok(Self {
+            relayer,
+            accounts: GraphManager::new(2, accounts),
+        })
     }
 
-    pub async fn main(self, server: TcpListener) -> Result<Stoppable, Error> {
+    pub async fn main(mut self, server: TcpListener) -> Result<Stoppable, Error> {
         let (relayer, relayer_handler) = self.relayer.main(server)?;
         let kinds = vec![
             Kind::Contacts,
@@ -98,48 +176,22 @@ impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
             relayer_handler,
             tokio::spawn(async move {
                 let mut local_connection = relayer.create_new_local_connection().await;
-                local_connection
-                    .send(Request::Request(
-                        vec![
-                            Filter {
-                                authors: self.accounts.clone(),
-                                kinds: kinds.clone(),
-                                ..Default::default()
-                            },
-                            Filter {
-                                kinds: kinds.clone(),
-                                tags: vec![(
-                                    "p".to_owned(),
-                                    self.accounts
-                                        .iter()
-                                        .map(|id| TagValue::Id(id.clone()))
-                                        .collect::<HashSet<TagValue>>(),
-                                )]
-                                .into_iter()
-                                .collect(),
-                                ..Default::default()
-                            },
-                        ]
-                        .into(),
-                    ))
-                    .await
-                    .expect("Failed to send request");
+
+                let _ = self.accounts.create_subscriptions(&local_connection).await;
 
                 let mut already_subscribed = HashSet::new();
-                let mut to_remove = HashSet::new();
 
                 loop {
                     while let Some(res) = local_connection.recv().await {
                         match res {
-                            Response::EndOfStoredEvents(id) => {
-                                if to_remove.contains(&id.0) {
-                                    let _ = local_connection.future_send(
-                                        Request::Close(id.0.into()),
-                                        Duration::from_secs(10),
-                                    );
-                                }
-                            }
                             Response::Event(event) => {
+                                let current_user: Id =
+                                    if let Ok(id) = event.subscription_id.deref().try_into() {
+                                        id
+                                    } else {
+                                        continue;
+                                    };
+
                                 match event.content() {
                                     Content::Metadata(_profile) => {}
                                     Content::Contacts(_) => {
@@ -162,40 +214,21 @@ impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
                                             }
                                         }
 
-                                        if ids.len() > 0 {
-                                            log::info!("found {} authors", ids.len());
+                                        if ids.is_empty() {
+                                            continue;
                                         }
 
-                                        for authors in
-                                            ids.chunks(20).collect::<Vec<_>>().into_iter()
-                                        {
-                                            let subscribe: client::Subscribe = vec![
-                                                Filter {
-                                                    kinds: kinds.clone(),
-                                                    authors: authors.to_vec(),
-                                                    ..Default::default()
-                                                },
-                                                Filter {
-                                                    kinds: kinds.clone(),
-                                                    tags: vec![(
-                                                        "p".to_owned(),
-                                                        authors
-                                                            .iter()
-                                                            .map(|id| TagValue::Id(id.clone()))
-                                                            .collect::<HashSet<TagValue>>(),
-                                                    )]
-                                                    .into_iter()
-                                                    .collect(),
-                                                    ..Default::default()
-                                                },
-                                            ]
-                                            .into();
-                                            to_remove.insert(subscribe.subscription_id.clone());
-
-                                            let _ = local_connection
-                                                .send(Request::Request(subscribe))
-                                                .await;
+                                        log::info!("found {} authors", ids.len());
+
+                                        for pub_key in ids {
+                                            let _ =
+                                                self.accounts.follows_to(&current_user, pub_key);
                                         }
+
+                                        let _ = self
+                                            .accounts
+                                            .create_subscriptions(&local_connection)
+                                            .await;
                                     }
                                     Content::ShortTextNote(_) => {}
                                     _ => {}

+ 5 - 0
crates/relayer/src/connection/local.rs

@@ -58,6 +58,11 @@ where
             .await
             .map_err(|e| Error::LocalSendError(Box::new(e)))
     }
+
+    /// Get the sender
+    pub fn sender(&self) -> Sender<(ConnectionId, Request)> {
+        self.sender.clone()
+    }
 }
 
 impl<T> Drop for LocalConnection<T>

+ 1 - 0
crates/relayer/src/connection/mod.rs

@@ -23,6 +23,7 @@ use tokio::{
 use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
+/// Connection Id type
 pub struct ConnectionId(usize);
 
 mod local;

+ 1 - 1
crates/relayer/src/lib.rs

@@ -15,7 +15,7 @@ mod error;
 mod relayer;
 
 pub use self::{
-    connection::{Connection, LocalConnection},
+    connection::{Connection, ConnectionId, LocalConnection},
     error::Error,
     relayer::Relayer,
 };

+ 1 - 1
crates/types/src/types/id.rs

@@ -16,7 +16,7 @@ use std::{
 /// Event Id
 ///
 /// Event Id are raw 32 bytes and 64-character length hex encoded to JSON
-#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
+#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd, Default)]
 pub struct Id(pub [u8; 32]);
 
 impl TryFrom<&str> for Id {