123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- use account::GraphManager;
- use futures::future::join_all;
- use nostr_rs_client::Pool;
- use nostr_rs_relayer::{ConnectionId, LocalConnection, Relayer};
- use nostr_rs_storage_base::Storage;
- use nostr_rs_types::{
- client::{self, Subscribe},
- types::{
- content::profile::Profile, filter::TagValue, tag::TagType, Content, Event, Filter, Id,
- Kind, SubscriptionId,
- },
- Request, Response,
- };
- use std::{collections::HashSet, ops::Deref, time::Duration};
- use tokio::{net::TcpListener, sync::mpsc::Sender, task::JoinHandle};
- use url::Url;
- mod account;
- pub struct Stoppable(Option<Vec<JoinHandle<()>>>);
- impl From<Vec<JoinHandle<()>>> for Stoppable {
- fn from(value: Vec<JoinHandle<()>>) -> Self {
- Self(Some(value))
- }
- }
- impl Drop for Stoppable {
- fn drop(&mut self) {
- if let Some(tasks) = self.0.take() {
- for join_handle in tasks.into_iter() {
- join_handle.abort();
- }
- }
- }
- }
- impl Stoppable {
- pub async fn wait(mut self) {
- if let Some(tasks) = self.0.take() {
- join_all(tasks).await;
- }
- }
- }
- #[derive(thiserror::Error, Debug)]
- pub enum Error {
- #[error("Relayer: {0}")]
- Relayer(#[from] nostr_rs_relayer::Error),
- #[error("Client error: {0}")]
- Client(#[from] nostr_rs_client::Error),
- }
- #[derive(Debug, Clone, Default)]
- pub struct Account {
- pub pub_key: Id,
- pub degree: usize,
- pub added_by: Option<Id>,
- pub profile: Option<Profile>,
- pub followed_by: HashSet<Id>,
- pub following: HashSet<Id>,
- pub content: Vec<Event>,
- subscribed: Option<(
- Sender<(ConnectionId, Request)>,
- ConnectionId,
- SubscriptionId,
- )>,
- }
- impl Account {
- pub fn new(pub_key: Id) -> Self {
- Self {
- pub_key,
- degree: 0,
- added_by: Default::default(),
- profile: Default::default(),
- followed_by: Default::default(),
- following: Default::default(),
- content: Default::default(),
- subscribed: Default::default(),
- }
- }
- pub async fn subscribe<T>(&mut self, conn: &LocalConnection<T>) -> Result<(), Error>
- where
- T: Storage + Send + Sync + 'static,
- {
- let request = Subscribe {
- subscription_id: self.pub_key.to_string().try_into().expect("valid id"),
- filters: vec![
- Filter {
- authors: vec![self.pub_key.clone()],
- kinds: vec![
- Kind::Contacts,
- Kind::Metadata,
- Kind::MuteList,
- Kind::Followset,
- ],
- ..Default::default()
- },
- Filter {
- authors: vec![self.pub_key.clone()],
- ..Default::default()
- },
- Filter {
- tags: vec![(
- "p".to_owned(),
- HashSet::from([TagValue::Id(self.pub_key.clone())]),
- )]
- .into_iter()
- .collect(),
- ..Default::default()
- },
- ],
- };
- self.subscribed = Some((
- conn.sender(),
- conn.conn_id.clone(),
- request.subscription_id.clone(),
- ));
- conn.send(Request::Request(request)).await?;
- Ok(())
- }
- pub fn add_contact(&self, pub_key: Id) -> Self {
- Self {
- pub_key,
- degree: self.degree + 1,
- added_by: Some(self.pub_key.clone()),
- profile: Default::default(),
- followed_by: Default::default(),
- following: Default::default(),
- content: Default::default(),
- subscribed: Default::default(),
- }
- }
- }
- impl Drop for Account {
- fn drop(&mut self) {
- if let Some((conn, conn_id, sub_id)) = self.subscribed.take() {
- conn.try_send((conn_id, Request::Close(sub_id.into())))
- .expect("unsubscription");
- }
- }
- }
- pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
- relayer: Relayer<T>,
- accounts: GraphManager,
- }
- impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
- pub async fn new(storage: T, accounts: Vec<Id>, client_urls: Vec<Url>) -> Result<Self, Error> {
- let (pool, _active_clients) = Pool::new_with_clients(client_urls)?;
- let relayer = Relayer::new(Some(storage), Some(pool))?;
- Ok(Self {
- relayer,
- accounts: GraphManager::new(2, accounts),
- })
- }
- pub async fn main(mut self, server: TcpListener) -> Result<Stoppable, Error> {
- let (relayer, relayer_handler) = self.relayer.main(server)?;
- let tasks = vec![
- relayer_handler,
- tokio::spawn(async move {
- let mut local_connection = relayer.create_new_local_connection().await;
- let _ = self.accounts.create_subscriptions(&local_connection).await;
- let mut already_subscribed = HashSet::new();
- loop {
- while let Some(res) = local_connection.recv().await {
- match res {
- Response::Event(event) => {
- let current_user: Id =
- if let Ok(id) = event.subscription_id.deref().try_into() {
- id
- } else {
- continue;
- };
- match event.content() {
- Content::Metadata(_profile) => {}
- Content::Contacts(_) => {
- let mut ids = vec![];
- for tag in event.tags() {
- if let TagType::PubKey(pub_key, relayer_url, _) =
- tag.deref()
- {
- if let Some(_relayer_url) = relayer_url {
- //let _ = relayer
- // .connect_to_relayer(relayer_url.clone())
- // .await;
- }
- if !already_subscribed.contains(pub_key) {
- ids.push(pub_key.clone());
- already_subscribed.insert(pub_key.clone());
- }
- }
- }
- if ids.is_empty() {
- continue;
- }
- log::info!("found {} authors", ids.len());
- for pub_key in ids {
- let _ =
- self.accounts.follows_to(¤t_user, pub_key);
- }
- let _ = self
- .accounts
- .create_subscriptions(&local_connection)
- .await;
- }
- Content::ShortTextNote(_) => {}
- _ => {}
- }
- }
- _ => {}
- }
- }
- }
- }),
- ];
- Ok(tasks.into())
- }
- }
|