|  | @@ -1,11 +1,22 @@
 | 
											
												
													
														|  | 
 |  | +use account::GraphManager;
 | 
											
												
													
														|  |  use futures::future::join_all;
 |  |  use futures::future::join_all;
 | 
											
												
													
														|  |  use nostr_rs_client::Pool;
 |  |  use nostr_rs_client::Pool;
 | 
											
												
													
														|  | -use nostr_rs_relayer::Relayer;
 |  | 
 | 
											
												
													
														|  | 
 |  | +use nostr_rs_relayer::{ConnectionId, LocalConnection, Relayer};
 | 
											
												
													
														|  |  use nostr_rs_storage_base::Storage;
 |  |  use nostr_rs_storage_base::Storage;
 | 
											
												
													
														|  | -use nostr_rs_types::types::{Addr, Filter, Id};
 |  | 
 | 
											
												
													
														|  | -use tokio::{net::TcpListener, task::JoinHandle};
 |  | 
 | 
											
												
													
														|  | 
 |  | +use nostr_rs_types::{
 | 
											
												
													
														|  | 
 |  | +    client::{self, Subscribe},
 | 
											
												
													
														|  | 
 |  | +    types::{
 | 
											
												
													
														|  | 
 |  | +        content::profile::Profile, filter::TagValue, tag::TagType, Content, Event, Filter, Id,
 | 
											
												
													
														|  | 
 |  | +        Kind, SubscriptionId,
 | 
											
												
													
														|  | 
 |  | +    },
 | 
											
												
													
														|  | 
 |  | +    Request, Response,
 | 
											
												
													
														|  | 
 |  | +};
 | 
											
												
													
														|  | 
 |  | +use std::{collections::HashSet, ops::Deref, time::Duration};
 | 
											
												
													
														|  | 
 |  | +use tokio::{net::TcpListener, sync::mpsc::Sender, task::JoinHandle};
 | 
											
												
													
														|  |  use url::Url;
 |  |  use url::Url;
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +mod account;
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  |  pub struct Stoppable(Option<Vec<JoinHandle<()>>>);
 |  |  pub struct Stoppable(Option<Vec<JoinHandle<()>>>);
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  impl From<Vec<JoinHandle<()>>> for Stoppable {
 |  |  impl From<Vec<JoinHandle<()>>> for Stoppable {
 | 
											
										
											
												
													
														|  | @@ -24,6 +35,14 @@ impl Drop for Stoppable {
 | 
											
												
													
														|  |      }
 |  |      }
 | 
											
												
													
														|  |  }
 |  |  }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +impl Stoppable {
 | 
											
												
													
														|  | 
 |  | +    pub async fn wait(mut self) {
 | 
											
												
													
														|  | 
 |  | +        if let Some(tasks) = self.0.take() {
 | 
											
												
													
														|  | 
 |  | +            join_all(tasks).await;
 | 
											
												
													
														|  | 
 |  | +        }
 | 
											
												
													
														|  | 
 |  | +    }
 | 
											
												
													
														|  | 
 |  | +}
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  |  #[derive(thiserror::Error, Debug)]
 |  |  #[derive(thiserror::Error, Debug)]
 | 
											
												
													
														|  |  pub enum Error {
 |  |  pub enum Error {
 | 
											
												
													
														|  |      #[error("Relayer: {0}")]
 |  |      #[error("Relayer: {0}")]
 | 
											
										
											
												
													
														|  | @@ -33,42 +52,194 @@ pub enum Error {
 | 
											
												
													
														|  |      Client(#[from] nostr_rs_client::Error),
 |  |      Client(#[from] nostr_rs_client::Error),
 | 
											
												
													
														|  |  }
 |  |  }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +#[derive(Debug, Clone, Default)]
 | 
											
												
													
														|  | 
 |  | +pub struct Account {
 | 
											
												
													
														|  | 
 |  | +    pub pub_key: Id,
 | 
											
												
													
														|  | 
 |  | +    pub degree: usize,
 | 
											
												
													
														|  | 
 |  | +    pub added_by: Option<Id>,
 | 
											
												
													
														|  | 
 |  | +    pub profile: Option<Profile>,
 | 
											
												
													
														|  | 
 |  | +    pub followed_by: HashSet<Id>,
 | 
											
												
													
														|  | 
 |  | +    pub following: HashSet<Id>,
 | 
											
												
													
														|  | 
 |  | +    pub content: Vec<Event>,
 | 
											
												
													
														|  | 
 |  | +    subscribed: Option<(
 | 
											
												
													
														|  | 
 |  | +        Sender<(ConnectionId, Request)>,
 | 
											
												
													
														|  | 
 |  | +        ConnectionId,
 | 
											
												
													
														|  | 
 |  | +        SubscriptionId,
 | 
											
												
													
														|  | 
 |  | +    )>,
 | 
											
												
													
														|  | 
 |  | +}
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +impl Account {
 | 
											
												
													
														|  | 
 |  | +    pub fn new(pub_key: Id) -> Self {
 | 
											
												
													
														|  | 
 |  | +        Self {
 | 
											
												
													
														|  | 
 |  | +            pub_key,
 | 
											
												
													
														|  | 
 |  | +            degree: 0,
 | 
											
												
													
														|  | 
 |  | +            added_by: Default::default(),
 | 
											
												
													
														|  | 
 |  | +            profile: Default::default(),
 | 
											
												
													
														|  | 
 |  | +            followed_by: Default::default(),
 | 
											
												
													
														|  | 
 |  | +            following: Default::default(),
 | 
											
												
													
														|  | 
 |  | +            content: Default::default(),
 | 
											
												
													
														|  | 
 |  | +            subscribed: Default::default(),
 | 
											
												
													
														|  | 
 |  | +        }
 | 
											
												
													
														|  | 
 |  | +    }
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +    pub async fn subscribe<T>(&mut self, conn: &LocalConnection<T>) -> Result<(), Error>
 | 
											
												
													
														|  | 
 |  | +    where
 | 
											
												
													
														|  | 
 |  | +        T: Storage + Send + Sync + 'static,
 | 
											
												
													
														|  | 
 |  | +    {
 | 
											
												
													
														|  | 
 |  | +        let request = Subscribe {
 | 
											
												
													
														|  | 
 |  | +            subscription_id: self.pub_key.to_string().try_into().expect("valid id"),
 | 
											
												
													
														|  | 
 |  | +            filters: vec![
 | 
											
												
													
														|  | 
 |  | +                Filter {
 | 
											
												
													
														|  | 
 |  | +                    authors: vec![self.pub_key.clone()],
 | 
											
												
													
														|  | 
 |  | +                    kinds: vec![
 | 
											
												
													
														|  | 
 |  | +                        Kind::Contacts,
 | 
											
												
													
														|  | 
 |  | +                        Kind::Metadata,
 | 
											
												
													
														|  | 
 |  | +                        Kind::MuteList,
 | 
											
												
													
														|  | 
 |  | +                        Kind::Followset,
 | 
											
												
													
														|  | 
 |  | +                    ],
 | 
											
												
													
														|  | 
 |  | +                    ..Default::default()
 | 
											
												
													
														|  | 
 |  | +                },
 | 
											
												
													
														|  | 
 |  | +                Filter {
 | 
											
												
													
														|  | 
 |  | +                    authors: vec![self.pub_key.clone()],
 | 
											
												
													
														|  | 
 |  | +                    ..Default::default()
 | 
											
												
													
														|  | 
 |  | +                },
 | 
											
												
													
														|  | 
 |  | +                Filter {
 | 
											
												
													
														|  | 
 |  | +                    tags: vec![(
 | 
											
												
													
														|  | 
 |  | +                        "p".to_owned(),
 | 
											
												
													
														|  | 
 |  | +                        HashSet::from([TagValue::Id(self.pub_key.clone())]),
 | 
											
												
													
														|  | 
 |  | +                    )]
 | 
											
												
													
														|  | 
 |  | +                    .into_iter()
 | 
											
												
													
														|  | 
 |  | +                    .collect(),
 | 
											
												
													
														|  | 
 |  | +                    ..Default::default()
 | 
											
												
													
														|  | 
 |  | +                },
 | 
											
												
													
														|  | 
 |  | +            ],
 | 
											
												
													
														|  | 
 |  | +        };
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +        self.subscribed = Some((
 | 
											
												
													
														|  | 
 |  | +            conn.sender(),
 | 
											
												
													
														|  | 
 |  | +            conn.conn_id.clone(),
 | 
											
												
													
														|  | 
 |  | +            request.subscription_id.clone(),
 | 
											
												
													
														|  | 
 |  | +        ));
 | 
											
												
													
														|  | 
 |  | +        conn.send(Request::Request(request)).await?;
 | 
											
												
													
														|  | 
 |  | +        Ok(())
 | 
											
												
													
														|  | 
 |  | +    }
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +    pub fn add_contact(&self, pub_key: Id) -> Self {
 | 
											
												
													
														|  | 
 |  | +        Self {
 | 
											
												
													
														|  | 
 |  | +            pub_key,
 | 
											
												
													
														|  | 
 |  | +            degree: self.degree + 1,
 | 
											
												
													
														|  | 
 |  | +            added_by: Some(self.pub_key.clone()),
 | 
											
												
													
														|  | 
 |  | +            profile: Default::default(),
 | 
											
												
													
														|  | 
 |  | +            followed_by: Default::default(),
 | 
											
												
													
														|  | 
 |  | +            following: Default::default(),
 | 
											
												
													
														|  | 
 |  | +            content: Default::default(),
 | 
											
												
													
														|  | 
 |  | +            subscribed: Default::default(),
 | 
											
												
													
														|  | 
 |  | +        }
 | 
											
												
													
														|  | 
 |  | +    }
 | 
											
												
													
														|  | 
 |  | +}
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +impl Drop for Account {
 | 
											
												
													
														|  | 
 |  | +    fn drop(&mut self) {
 | 
											
												
													
														|  | 
 |  | +        if let Some((conn, conn_id, sub_id)) = self.subscribed.take() {
 | 
											
												
													
														|  | 
 |  | +            conn.try_send((conn_id, Request::Close(sub_id.into())))
 | 
											
												
													
														|  | 
 |  | +                .expect("unsubscription");
 | 
											
												
													
														|  | 
 |  | +        }
 | 
											
												
													
														|  | 
 |  | +    }
 | 
											
												
													
														|  | 
 |  | +}
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  |  pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
 |  |  pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
 | 
											
												
													
														|  |      relayer: Relayer<T>,
 |  |      relayer: Relayer<T>,
 | 
											
												
													
														|  | -    accounts: Vec<Id>,
 |  | 
 | 
											
												
													
														|  | 
 |  | +    accounts: GraphManager,
 | 
											
												
													
														|  |  }
 |  |  }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
 |  |  impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
 | 
											
												
													
														|  |      pub async fn new(storage: T, accounts: Vec<Id>, client_urls: Vec<Url>) -> Result<Self, Error> {
 |  |      pub async fn new(storage: T, accounts: Vec<Id>, client_urls: Vec<Url>) -> Result<Self, Error> {
 | 
											
												
													
														|  | -        let pool = Pool::new_with_clients(client_urls);
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -        join_all(
 |  | 
 | 
											
												
													
														|  | -            accounts
 |  | 
 | 
											
												
													
														|  | -                .iter()
 |  | 
 | 
											
												
													
														|  | -                .map(|account| {
 |  | 
 | 
											
												
													
														|  | -                    pool.subscribe(
 |  | 
 | 
											
												
													
														|  | -                        Filter {
 |  | 
 | 
											
												
													
														|  | -                            authors: vec![account.clone()],
 |  | 
 | 
											
												
													
														|  | -                            ..Default::default()
 |  | 
 | 
											
												
													
														|  | -                        }
 |  | 
 | 
											
												
													
														|  | -                        .into(),
 |  | 
 | 
											
												
													
														|  | -                    )
 |  | 
 | 
											
												
													
														|  | -                })
 |  | 
 | 
											
												
													
														|  | -                .collect::<Vec<_>>(),
 |  | 
 | 
											
												
													
														|  | -        )
 |  | 
 | 
											
												
													
														|  | -        .await
 |  | 
 | 
											
												
													
														|  | -        .into_iter()
 |  | 
 | 
											
												
													
														|  | -        .collect::<Result<Vec<_>, _>>()?;
 |  | 
 | 
											
												
													
														|  | 
 |  | +        let (pool, _active_clients) = Pool::new_with_clients(client_urls)?;
 | 
											
												
													
														|  | 
 |  | +        let relayer = Relayer::new(Some(storage), Some(pool))?;
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |          Ok(Self {
 |  |          Ok(Self {
 | 
											
												
													
														|  | -            relayer: Relayer::new(Some(storage), Some(pool))?,
 |  | 
 | 
											
												
													
														|  | -            accounts,
 |  | 
 | 
											
												
													
														|  | 
 |  | +            relayer,
 | 
											
												
													
														|  | 
 |  | +            accounts: GraphManager::new(2, accounts),
 | 
											
												
													
														|  |          })
 |  |          })
 | 
											
												
													
														|  |      }
 |  |      }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | -    pub fn main(self, server: TcpListener) -> Result<Stoppable, Error> {
 |  | 
 | 
											
												
													
														|  | -        let (relayer, handle) = self.relayer.main(server)?;
 |  | 
 | 
											
												
													
														|  | -        let tasks = vec![handle, tokio::spawn(async move {})];
 |  | 
 | 
											
												
													
														|  | 
 |  | +    pub async fn main(mut self, server: TcpListener) -> Result<Stoppable, Error> {
 | 
											
												
													
														|  | 
 |  | +        let (relayer, relayer_handler) = self.relayer.main(server)?;
 | 
											
												
													
														|  | 
 |  | +        let kinds = vec![
 | 
											
												
													
														|  | 
 |  | +            Kind::Contacts,
 | 
											
												
													
														|  | 
 |  | +            Kind::Metadata,
 | 
											
												
													
														|  | 
 |  | +            Kind::MuteList,
 | 
											
												
													
														|  | 
 |  | +            Kind::Followset,
 | 
											
												
													
														|  | 
 |  | +        ];
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +        let tasks = vec![
 | 
											
												
													
														|  | 
 |  | +            relayer_handler,
 | 
											
												
													
														|  | 
 |  | +            tokio::spawn(async move {
 | 
											
												
													
														|  | 
 |  | +                let mut local_connection = relayer.create_new_local_connection().await;
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +                let _ = self.accounts.create_subscriptions(&local_connection).await;
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +                let mut already_subscribed = HashSet::new();
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +                loop {
 | 
											
												
													
														|  | 
 |  | +                    while let Some(res) = local_connection.recv().await {
 | 
											
												
													
														|  | 
 |  | +                        match res {
 | 
											
												
													
														|  | 
 |  | +                            Response::Event(event) => {
 | 
											
												
													
														|  | 
 |  | +                                let current_user: Id =
 | 
											
												
													
														|  | 
 |  | +                                    if let Ok(id) = event.subscription_id.deref().try_into() {
 | 
											
												
													
														|  | 
 |  | +                                        id
 | 
											
												
													
														|  | 
 |  | +                                    } else {
 | 
											
												
													
														|  | 
 |  | +                                        continue;
 | 
											
												
													
														|  | 
 |  | +                                    };
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +                                match event.content() {
 | 
											
												
													
														|  | 
 |  | +                                    Content::Metadata(_profile) => {}
 | 
											
												
													
														|  | 
 |  | +                                    Content::Contacts(_) => {
 | 
											
												
													
														|  | 
 |  | +                                        let mut ids = vec![];
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +                                        for tag in event.tags() {
 | 
											
												
													
														|  | 
 |  | +                                            if let TagType::PubKey(pub_key, relayer_url, _) =
 | 
											
												
													
														|  | 
 |  | +                                                tag.deref()
 | 
											
												
													
														|  | 
 |  | +                                            {
 | 
											
												
													
														|  | 
 |  | +                                                if let Some(_relayer_url) = relayer_url {
 | 
											
												
													
														|  | 
 |  | +                                                    //let _ = relayer
 | 
											
												
													
														|  | 
 |  | +                                                    //    .connect_to_relayer(relayer_url.clone())
 | 
											
												
													
														|  | 
 |  | +                                                    //    .await;
 | 
											
												
													
														|  | 
 |  | +                                                }
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +                                                if !already_subscribed.contains(pub_key) {
 | 
											
												
													
														|  | 
 |  | +                                                    ids.push(pub_key.clone());
 | 
											
												
													
														|  | 
 |  | +                                                    already_subscribed.insert(pub_key.clone());
 | 
											
												
													
														|  | 
 |  | +                                                }
 | 
											
												
													
														|  | 
 |  | +                                            }
 | 
											
												
													
														|  | 
 |  | +                                        }
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +                                        if ids.is_empty() {
 | 
											
												
													
														|  | 
 |  | +                                            continue;
 | 
											
												
													
														|  | 
 |  | +                                        }
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +                                        log::info!("found {} authors", ids.len());
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +                                        for pub_key in ids {
 | 
											
												
													
														|  | 
 |  | +                                            let _ =
 | 
											
												
													
														|  | 
 |  | +                                                self.accounts.follows_to(¤t_user, pub_key);
 | 
											
												
													
														|  | 
 |  | +                                        }
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +                                        let _ = self
 | 
											
												
													
														|  | 
 |  | +                                            .accounts
 | 
											
												
													
														|  | 
 |  | +                                            .create_subscriptions(&local_connection)
 | 
											
												
													
														|  | 
 |  | +                                            .await;
 | 
											
												
													
														|  | 
 |  | +                                    }
 | 
											
												
													
														|  | 
 |  | +                                    Content::ShortTextNote(_) => {}
 | 
											
												
													
														|  | 
 |  | +                                    _ => {}
 | 
											
												
													
														|  | 
 |  | +                                }
 | 
											
												
													
														|  | 
 |  | +                            }
 | 
											
												
													
														|  | 
 |  | +                            _ => {}
 | 
											
												
													
														|  | 
 |  | +                        }
 | 
											
												
													
														|  | 
 |  | +                    }
 | 
											
												
													
														|  | 
 |  | +                }
 | 
											
												
													
														|  | 
 |  | +            }),
 | 
											
												
													
														|  | 
 |  | +        ];
 | 
											
												
													
														|  |          Ok(tasks.into())
 |  |          Ok(tasks.into())
 | 
											
												
													
														|  |      }
 |  |      }
 | 
											
												
													
														|  |  }
 |  |  }
 |