|
@@ -1,11 +1,18 @@
|
|
|
|
+use account::GraphManager;
|
|
use futures::future::join_all;
|
|
use futures::future::join_all;
|
|
use nostr_rs_client::Pool;
|
|
use nostr_rs_client::Pool;
|
|
use nostr_rs_relayer::Relayer;
|
|
use nostr_rs_relayer::Relayer;
|
|
use nostr_rs_storage_base::Storage;
|
|
use nostr_rs_storage_base::Storage;
|
|
-use nostr_rs_types::types::{Addr, Filter, Id};
|
|
|
|
|
|
+use nostr_rs_types::{
|
|
|
|
+ types::{tag::TagType, Content, Id},
|
|
|
|
+ Response,
|
|
|
|
+};
|
|
|
|
+use std::{collections::HashSet, ops::Deref};
|
|
use tokio::{net::TcpListener, task::JoinHandle};
|
|
use tokio::{net::TcpListener, task::JoinHandle};
|
|
use url::Url;
|
|
use url::Url;
|
|
|
|
|
|
|
|
+mod account;
|
|
|
|
+
|
|
pub struct Stoppable(Option<Vec<JoinHandle<()>>>);
|
|
pub struct Stoppable(Option<Vec<JoinHandle<()>>>);
|
|
|
|
|
|
impl From<Vec<JoinHandle<()>>> for Stoppable {
|
|
impl From<Vec<JoinHandle<()>>> for Stoppable {
|
|
@@ -24,6 +31,14 @@ impl Drop for Stoppable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+impl Stoppable {
|
|
|
|
+ pub async fn wait(mut self) {
|
|
|
|
+ if let Some(tasks) = self.0.take() {
|
|
|
|
+ join_all(tasks).await;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
#[derive(thiserror::Error, Debug)]
|
|
#[derive(thiserror::Error, Debug)]
|
|
pub enum Error {
|
|
pub enum Error {
|
|
#[error("Relayer: {0}")]
|
|
#[error("Relayer: {0}")]
|
|
@@ -35,40 +50,84 @@ pub enum Error {
|
|
|
|
|
|
pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
|
|
pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
|
|
relayer: Relayer<T>,
|
|
relayer: Relayer<T>,
|
|
- accounts: Vec<Id>,
|
|
|
|
|
|
+ accounts: GraphManager,
|
|
}
|
|
}
|
|
|
|
|
|
impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
|
|
impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
|
|
pub async fn new(storage: T, accounts: Vec<Id>, client_urls: Vec<Url>) -> Result<Self, Error> {
|
|
pub async fn new(storage: T, accounts: Vec<Id>, client_urls: Vec<Url>) -> Result<Self, Error> {
|
|
- let (pool, _) = Pool::new_with_clients(client_urls)?;
|
|
|
|
-
|
|
|
|
- join_all(
|
|
|
|
- accounts
|
|
|
|
- .iter()
|
|
|
|
- .map(|account| {
|
|
|
|
- pool.subscribe(
|
|
|
|
- Filter {
|
|
|
|
- authors: vec![account.clone()],
|
|
|
|
- ..Default::default()
|
|
|
|
- }
|
|
|
|
- .into(),
|
|
|
|
- )
|
|
|
|
- })
|
|
|
|
- .collect::<Vec<_>>(),
|
|
|
|
- )
|
|
|
|
- .await
|
|
|
|
- .into_iter()
|
|
|
|
- .collect::<Result<Vec<_>, _>>()?;
|
|
|
|
|
|
+ let (pool, _active_clients) = Pool::new_with_clients(client_urls)?;
|
|
|
|
+ let relayer = Relayer::new(Some(storage), Some(pool))?;
|
|
|
|
|
|
Ok(Self {
|
|
Ok(Self {
|
|
- relayer: Relayer::new(Some(storage), Some(pool))?,
|
|
|
|
- accounts,
|
|
|
|
|
|
+ relayer,
|
|
|
|
+ accounts: GraphManager::new(1, accounts),
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
|
|
- pub fn main(self, server: TcpListener) -> Result<Stoppable, Error> {
|
|
|
|
- let (relayer, handle) = self.relayer.main(server)?;
|
|
|
|
- let tasks = vec![handle, tokio::spawn(async move {})];
|
|
|
|
|
|
+ pub async fn main(mut self, server: TcpListener) -> Result<Stoppable, Error> {
|
|
|
|
+ let (relayer, relayer_handler) = self.relayer.main(server)?;
|
|
|
|
+ let tasks = vec![
|
|
|
|
+ relayer_handler,
|
|
|
|
+ tokio::spawn(async move {
|
|
|
|
+ let mut local_connection = relayer.create_new_local_connection().await;
|
|
|
|
+
|
|
|
|
+ let _ = self.accounts.create_subscriptions(&local_connection).await;
|
|
|
|
+
|
|
|
|
+ let mut already_subscribed = HashSet::new();
|
|
|
|
+
|
|
|
|
+ loop {
|
|
|
|
+ while let Some(res) = local_connection.recv().await {
|
|
|
|
+ if let Response::Event(event) = res {
|
|
|
|
+ let current_user: Id =
|
|
|
|
+ if let Ok(id) = event.subscription_id.deref().try_into() {
|
|
|
|
+ id
|
|
|
|
+ } else {
|
|
|
|
+ continue;
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ match event.content() {
|
|
|
|
+ Content::Metadata(_profile) => {}
|
|
|
|
+ Content::Contacts(_) => {
|
|
|
|
+ let mut ids = vec![];
|
|
|
|
+
|
|
|
|
+ for tag in event.tags() {
|
|
|
|
+ if let TagType::PubKey(pub_key, relayer_url, _) =
|
|
|
|
+ tag.deref()
|
|
|
|
+ {
|
|
|
|
+ if let Some(_relayer_url) = relayer_url {
|
|
|
|
+ //let _ = relayer
|
|
|
|
+ // .connect_to_relayer(relayer_url.clone())
|
|
|
|
+ // .await;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if !already_subscribed.contains(pub_key) {
|
|
|
|
+ ids.push(pub_key.clone());
|
|
|
|
+ already_subscribed.insert(pub_key.clone());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if ids.is_empty() {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ log::info!("found {} authors", ids.len());
|
|
|
|
+
|
|
|
|
+ for pub_key in ids {
|
|
|
|
+ let _ = self.accounts.follows_to(¤t_user, pub_key);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ let _ =
|
|
|
|
+ self.accounts.create_subscriptions(&local_connection).await;
|
|
|
|
+ }
|
|
|
|
+ Content::ShortTextNote(_) => {}
|
|
|
|
+ _ => {}
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }),
|
|
|
|
+ ];
|
|
Ok(tasks.into())
|
|
Ok(tasks.into())
|
|
}
|
|
}
|
|
}
|
|
}
|