use futures::future::join_all; use nostr_rs_client::Pool; use nostr_rs_relayer::Relayer; use nostr_rs_storage_base::Storage; use nostr_rs_types::{ client, types::{ content::profile::Profile, filter::TagValue, tag::TagType, Content, Event, Filter, Id, Kind, }, Request, Response, }; use serde::Serialize; use std::{collections::HashSet, ops::Deref, time::Duration}; use tokio::{net::TcpListener, task::JoinHandle}; use url::Url; pub struct Stoppable(Option>>); impl From>> for Stoppable { fn from(value: Vec>) -> Self { Self(Some(value)) } } impl Drop for Stoppable { fn drop(&mut self) { if let Some(tasks) = self.0.take() { for join_handle in tasks.into_iter() { join_handle.abort(); } } } } impl Stoppable { pub async fn wait(mut self) { if let Some(tasks) = self.0.take() { join_all(tasks).await; } } } #[derive(thiserror::Error, Debug)] pub enum Error { #[error("Relayer: {0}")] Relayer(#[from] nostr_rs_relayer::Error), #[error("Client error: {0}")] Client(#[from] nostr_rs_client::Error), } #[derive(Debug, Clone, Serialize)] pub struct Contact { pub pub_key: Id, pub added_by: Option, pub profile: Option, pub followed_by: HashSet, pub following: HashSet, pub content: Vec, } impl Contact { pub fn new(pub_key: Id, added_by: Option) -> Self { Self { pub_key, profile: None, added_by, followed_by: HashSet::new(), following: HashSet::new(), content: Vec::new(), } } } pub struct PersonalRelayer { relayer: Relayer, accounts: Vec, } impl PersonalRelayer { pub async fn new(storage: T, accounts: Vec, client_urls: Vec) -> Result { let (pool, _active_clients) = Pool::new_with_clients(client_urls)?; let relayer = Relayer::new(Some(storage), Some(pool))?; Ok(Self { relayer, accounts }) } pub async fn main(self, server: TcpListener) -> Result { let (relayer, relayer_handler) = self.relayer.main(server)?; let kinds = vec![ Kind::Contacts, Kind::Metadata, Kind::MuteList, Kind::Followset, ]; let tasks = vec![ relayer_handler, tokio::spawn(async move { let mut local_connection = relayer.create_new_local_connection().await; local_connection .send(Request::Request( vec![ Filter { authors: self.accounts.clone(), kinds: kinds.clone(), ..Default::default() }, Filter { kinds: kinds.clone(), tags: vec![( "p".to_owned(), self.accounts .iter() .map(|id| TagValue::Id(id.clone())) .collect::>(), )] .into_iter() .collect(), ..Default::default() }, ] .into(), )) .await .expect("Failed to send request"); let mut already_subscribed = HashSet::new(); let mut to_remove = HashSet::new(); loop { while let Some(res) = local_connection.recv().await { match res { Response::EndOfStoredEvents(id) => { if to_remove.contains(&id.0) { let _ = local_connection.future_send( Request::Close(id.0.into()), Duration::from_secs(10), ); } } Response::Event(event) => { match event.content() { Content::Metadata(_profile) => {} Content::Contacts(_) => { let mut ids = vec![]; for tag in event.tags() { if let TagType::PubKey(pub_key, relayer_url, _) = tag.deref() { if let Some(_relayer_url) = relayer_url { //let _ = relayer // .connect_to_relayer(relayer_url.clone()) // .await; } if !already_subscribed.contains(pub_key) { ids.push(pub_key.clone()); already_subscribed.insert(pub_key.clone()); } } } if ids.len() > 0 { log::info!("found {} authors", ids.len()); } for authors in ids.chunks(20).collect::>().into_iter() { let subscribe: client::Subscribe = vec![ Filter { kinds: kinds.clone(), authors: authors.to_vec(), ..Default::default() }, Filter { kinds: kinds.clone(), tags: vec![( "p".to_owned(), authors .iter() .map(|id| TagValue::Id(id.clone())) .collect::>(), )] .into_iter() .collect(), ..Default::default() }, ] .into(); to_remove.insert(subscribe.subscription_id.clone()); let _ = local_connection .send(Request::Request(subscribe)) .await; } } Content::ShortTextNote(_) => {} _ => {} } } _ => {} } } } }), ]; Ok(tasks.into()) } }