Cesar Rodas пре 1 месец
родитељ
комит
bc758e52f4
3 измењених фајлова са 52 додато и 148 уклоњено
  1. 42 146
      crates/personal-relayer/src/lib.rs
  2. 1 1
      crates/relayer/src/connection/local.rs
  3. 9 1
      crates/relayer/src/relayer.rs

+ 42 - 146
crates/personal-relayer/src/lib.rs

@@ -1,18 +1,15 @@
 use account::GraphManager;
 use futures::future::join_all;
 use nostr_rs_client::Pool;
-use nostr_rs_relayer::{ConnectionId, LocalConnection, Relayer};
+use nostr_rs_relayer::Relayer;
 use nostr_rs_storage_base::Storage;
 use nostr_rs_types::{
-    client::{self, Subscribe},
     types::{
-        content::profile::Profile, filter::TagValue, tag::TagType, Content, Event, Filter, Id,
-        Kind, SubscriptionId,
-    },
-    Request, Response,
+        tag::TagType, Content, Id,
+    }, Response,
 };
-use std::{collections::HashSet, ops::Deref, time::Duration};
-use tokio::{net::TcpListener, sync::mpsc::Sender, task::JoinHandle};
+use std::{collections::HashSet, ops::Deref};
+use tokio::{net::TcpListener, task::JoinHandle};
 use url::Url;
 
 mod account;
@@ -52,101 +49,6 @@ pub enum Error {
     Client(#[from] nostr_rs_client::Error),
 }
 
-#[derive(Debug, Clone, Default)]
-pub struct Account {
-    pub pub_key: Id,
-    pub degree: usize,
-    pub added_by: Option<Id>,
-    pub profile: Option<Profile>,
-    pub followed_by: HashSet<Id>,
-    pub following: HashSet<Id>,
-    pub content: Vec<Event>,
-    subscribed: Option<(
-        Sender<(ConnectionId, Request)>,
-        ConnectionId,
-        SubscriptionId,
-    )>,
-}
-
-impl Account {
-    pub fn new(pub_key: Id) -> Self {
-        Self {
-            pub_key,
-            degree: 0,
-            added_by: Default::default(),
-            profile: Default::default(),
-            followed_by: Default::default(),
-            following: Default::default(),
-            content: Default::default(),
-            subscribed: Default::default(),
-        }
-    }
-
-    pub async fn subscribe<T>(&mut self, conn: &LocalConnection<T>) -> Result<(), Error>
-    where
-        T: Storage + Send + Sync + 'static,
-    {
-        let request = Subscribe {
-            subscription_id: self.pub_key.to_string().try_into().expect("valid id"),
-            filters: vec![
-                Filter {
-                    authors: vec![self.pub_key.clone()],
-                    kinds: vec![
-                        Kind::Contacts,
-                        Kind::Metadata,
-                        Kind::MuteList,
-                        Kind::Followset,
-                    ],
-                    ..Default::default()
-                },
-                Filter {
-                    authors: vec![self.pub_key.clone()],
-                    ..Default::default()
-                },
-                Filter {
-                    tags: vec![(
-                        "p".to_owned(),
-                        HashSet::from([TagValue::Id(self.pub_key.clone())]),
-                    )]
-                    .into_iter()
-                    .collect(),
-                    ..Default::default()
-                },
-            ],
-        };
-
-        self.subscribed = Some((
-            conn.sender(),
-            conn.conn_id.clone(),
-            request.subscription_id.clone(),
-        ));
-        conn.send(Request::Request(request)).await?;
-        Ok(())
-    }
-
-    pub fn add_contact(&self, pub_key: Id) -> Self {
-        Self {
-            pub_key,
-            degree: self.degree + 1,
-            added_by: Some(self.pub_key.clone()),
-            profile: Default::default(),
-            followed_by: Default::default(),
-            following: Default::default(),
-            content: Default::default(),
-            subscribed: Default::default(),
-        }
-    }
-}
-
-impl Drop for Account {
-    fn drop(&mut self) {
-        if let Some((conn, conn_id, sub_id)) = self.subscribed.take() {
-            conn.try_send((conn_id, Request::Close(sub_id.into())))
-                .expect("unsubscription");
-        }
-    }
-}
-
 pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
     relayer: Relayer<T>,
     accounts: GraphManager,
@@ -176,58 +78,52 @@ impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
 
                 loop {
                     while let Some(res) = local_connection.recv().await {
-                        match res {
-                            Response::Event(event) => {
-                                let current_user: Id =
-                                    if let Ok(id) = event.subscription_id.deref().try_into() {
-                                        id
-                                    } else {
-                                        continue;
-                                    };
-
-                                match event.content() {
-                                    Content::Metadata(_profile) => {}
-                                    Content::Contacts(_) => {
-                                        let mut ids = vec![];
-
-                                        for tag in event.tags() {
-                                            if let TagType::PubKey(pub_key, relayer_url, _) =
-                                                tag.deref()
-                                            {
-                                                if let Some(_relayer_url) = relayer_url {
-                                                    //let _ = relayer
-                                                    //    .connect_to_relayer(relayer_url.clone())
-                                                    //    .await;
-                                                }
-
-                                                if !already_subscribed.contains(pub_key) {
-                                                    ids.push(pub_key.clone());
-                                                    already_subscribed.insert(pub_key.clone());
-                                                }
+                        if let Response::Event(event) = res {
+                            let current_user: Id =
+                                if let Ok(id) = event.subscription_id.deref().try_into() {
+                                    id
+                                } else {
+                                    continue;
+                                };
+
+                            match event.content() {
+                                Content::Metadata(_profile) => {}
+                                Content::Contacts(_) => {
+                                    let mut ids = vec![];
+
+                                    for tag in event.tags() {
+                                        if let TagType::PubKey(pub_key, relayer_url, _) =
+                                            tag.deref()
+                                        {
+                                            if let Some(_relayer_url) = relayer_url {
+                                                //let _ = relayer
+                                                //    .connect_to_relayer(relayer_url.clone())
+                                                //    .await;
                                             }
-                                        }
 
-                                        if ids.is_empty() {
-                                            continue;
+                                            if !already_subscribed.contains(pub_key) {
+                                                ids.push(pub_key.clone());
+                                                already_subscribed.insert(pub_key.clone());
+                                            }
                                         }
+                                    }
 
-                                        log::info!("found {} authors", ids.len());
+                                    if ids.is_empty() {
+                                        continue;
+                                    }
 
-                                        for pub_key in ids {
-                                            let _ =
-                                                self.accounts.follows_to(&current_user, pub_key);
-                                        }
+                                    log::info!("found {} authors", ids.len());
 
-                                        let _ = self
-                                            .accounts
-                                            .create_subscriptions(&local_connection)
-                                            .await;
+                                    for pub_key in ids {
+                                        let _ = self.accounts.follows_to(&current_user, pub_key);
                                     }
-                                    Content::ShortTextNote(_) => {}
-                                    _ => {}
+
+                                    let _ =
+                                        self.accounts.create_subscriptions(&local_connection).await;
                                 }
+                                Content::ShortTextNote(_) => {}
+                                _ => {}
                             }
-                            _ => {}
                         }
                     }
                 }

+ 1 - 1
crates/relayer/src/connection/local.rs

@@ -44,7 +44,7 @@ where
     /// Queues sending a message to the relayer in the future time
     pub fn future_send(&self, request: Request, in_the_future: Duration) -> JoinHandle<()> {
         let sender = self.sender.clone();
-        let conn_id = self.conn_id.clone();
+        let conn_id = self.conn_id;
         tokio::spawn(async move {
             sleep(in_the_future).await;
             let _ = sender.send((conn_id, request)).await;

+ 9 - 1
crates/relayer/src/relayer.rs

@@ -95,7 +95,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
             relayer_receiver: Some(relayer_receiver),
             connections: Default::default(),
             client_pool_receiver: Some(client_pool_receiver),
-            client_pool: client_pool,
+            client_pool,
             client_pool_subscriptions: Default::default(),
         })
     }
@@ -399,10 +399,18 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
             }
         }
 
+        let mut sent = HashSet::new();
+
         let connections = self.connections.read().await;
         for RelayerSubscriptionId((sub_id, conn_id)) in
             self.subscription_manager.get_subscribers(event).await
         {
+            if sent.contains(&conn_id) {
+                continue;
+            }
+
+            sent.insert(conn_id);
+
             if let Some(connection) = connections.get(&conn_id) {
                 let _ = connection.send(
                     relayer::Event {