use account::GraphManager; use futures::future::join_all; use nostr_rs_client::Pool; use nostr_rs_relayer::{ConnectionId, LocalConnection, Relayer}; use nostr_rs_storage_base::Storage; use nostr_rs_types::{ client::{self, Subscribe}, types::{ content::profile::Profile, filter::TagValue, tag::TagType, Content, Event, Filter, Id, Kind, SubscriptionId, }, Request, Response, }; use std::{collections::HashSet, ops::Deref, time::Duration}; use tokio::{net::TcpListener, sync::mpsc::Sender, task::JoinHandle}; use url::Url; mod account; pub struct Stoppable(Option>>); impl From>> for Stoppable { fn from(value: Vec>) -> Self { Self(Some(value)) } } impl Drop for Stoppable { fn drop(&mut self) { if let Some(tasks) = self.0.take() { for join_handle in tasks.into_iter() { join_handle.abort(); } } } } impl Stoppable { pub async fn wait(mut self) { if let Some(tasks) = self.0.take() { join_all(tasks).await; } } } #[derive(thiserror::Error, Debug)] pub enum Error { #[error("Relayer: {0}")] Relayer(#[from] nostr_rs_relayer::Error), #[error("Client error: {0}")] Client(#[from] nostr_rs_client::Error), } #[derive(Debug, Clone, Default)] pub struct Account { pub pub_key: Id, pub degree: usize, pub added_by: Option, pub profile: Option, pub followed_by: HashSet, pub following: HashSet, pub content: Vec, subscribed: Option<( Sender<(ConnectionId, Request)>, ConnectionId, SubscriptionId, )>, } impl Account { pub fn new(pub_key: Id) -> Self { Self { pub_key, degree: 0, added_by: Default::default(), profile: Default::default(), followed_by: Default::default(), following: Default::default(), content: Default::default(), subscribed: Default::default(), } } pub async fn subscribe(&mut self, conn: &LocalConnection) -> Result<(), Error> where T: Storage + Send + Sync + 'static, { let request = Subscribe { subscription_id: self.pub_key.to_string().try_into().expect("valid id"), filters: vec![ Filter { authors: vec![self.pub_key.clone()], kinds: vec![ Kind::Contacts, Kind::Metadata, Kind::MuteList, Kind::Followset, ], ..Default::default() }, Filter { authors: vec![self.pub_key.clone()], ..Default::default() }, Filter { tags: vec![( "p".to_owned(), HashSet::from([TagValue::Id(self.pub_key.clone())]), )] .into_iter() .collect(), ..Default::default() }, ], }; self.subscribed = Some(( conn.sender(), conn.conn_id.clone(), request.subscription_id.clone(), )); conn.send(Request::Request(request)).await?; Ok(()) } pub fn add_contact(&self, pub_key: Id) -> Self { Self { pub_key, degree: self.degree + 1, added_by: Some(self.pub_key.clone()), profile: Default::default(), followed_by: Default::default(), following: Default::default(), content: Default::default(), subscribed: Default::default(), } } } impl Drop for Account { fn drop(&mut self) { if let Some((conn, conn_id, sub_id)) = self.subscribed.take() { conn.try_send((conn_id, Request::Close(sub_id.into()))) .expect("unsubscription"); } } } pub struct PersonalRelayer { relayer: Relayer, accounts: GraphManager, } impl PersonalRelayer { pub async fn new(storage: T, accounts: Vec, client_urls: Vec) -> Result { let (pool, _active_clients) = Pool::new_with_clients(client_urls)?; let relayer = Relayer::new(Some(storage), Some(pool))?; Ok(Self { relayer, accounts: GraphManager::new(2, accounts), }) } pub async fn main(mut self, server: TcpListener) -> Result { let (relayer, relayer_handler) = self.relayer.main(server)?; let tasks = vec![ relayer_handler, tokio::spawn(async move { let mut local_connection = relayer.create_new_local_connection().await; let _ = self.accounts.create_subscriptions(&local_connection).await; let mut already_subscribed = HashSet::new(); loop { while let Some(res) = local_connection.recv().await { match res { Response::Event(event) => { let current_user: Id = if let Ok(id) = event.subscription_id.deref().try_into() { id } else { continue; }; match event.content() { Content::Metadata(_profile) => {} Content::Contacts(_) => { let mut ids = vec![]; for tag in event.tags() { if let TagType::PubKey(pub_key, relayer_url, _) = tag.deref() { if let Some(_relayer_url) = relayer_url { //let _ = relayer // .connect_to_relayer(relayer_url.clone()) // .await; } if !already_subscribed.contains(pub_key) { ids.push(pub_key.clone()); already_subscribed.insert(pub_key.clone()); } } } if ids.is_empty() { continue; } log::info!("found {} authors", ids.len()); for pub_key in ids { let _ = self.accounts.follows_to(¤t_user, pub_key); } let _ = self .accounts .create_subscriptions(&local_connection) .await; } Content::ShortTextNote(_) => {} _ => {} } } _ => {} } } } }), ]; Ok(tasks.into()) } }