|
@@ -2,7 +2,15 @@ use futures::future::join_all;
|
|
use nostr_rs_client::Pool;
|
|
use nostr_rs_client::Pool;
|
|
use nostr_rs_relayer::Relayer;
|
|
use nostr_rs_relayer::Relayer;
|
|
use nostr_rs_storage_base::Storage;
|
|
use nostr_rs_storage_base::Storage;
|
|
-use nostr_rs_types::types::{Addr, Filter, Id};
|
|
|
|
|
|
+use nostr_rs_types::{
|
|
|
|
+ client,
|
|
|
|
+ types::{
|
|
|
|
+ content::profile::Profile, filter::TagValue, tag::TagType, Content, Event, Filter, Id, Kind,
|
|
|
|
+ },
|
|
|
|
+ Request, Response,
|
|
|
|
+};
|
|
|
|
+use serde::Serialize;
|
|
|
|
+use std::{collections::HashSet, ops::Deref, time::Duration};
|
|
use tokio::{net::TcpListener, task::JoinHandle};
|
|
use tokio::{net::TcpListener, task::JoinHandle};
|
|
use url::Url;
|
|
use url::Url;
|
|
|
|
|
|
@@ -24,6 +32,14 @@ impl Drop for Stoppable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+impl Stoppable {
|
|
|
|
+ pub async fn wait(mut self) {
|
|
|
|
+ if let Some(tasks) = self.0.take() {
|
|
|
|
+ join_all(tasks).await;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
#[derive(thiserror::Error, Debug)]
|
|
#[derive(thiserror::Error, Debug)]
|
|
pub enum Error {
|
|
pub enum Error {
|
|
#[error("Relayer: {0}")]
|
|
#[error("Relayer: {0}")]
|
|
@@ -33,6 +49,29 @@ pub enum Error {
|
|
Client(#[from] nostr_rs_client::Error),
|
|
Client(#[from] nostr_rs_client::Error),
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+#[derive(Debug, Clone, Serialize)]
|
|
|
|
+pub struct Contact {
|
|
|
|
+ pub pub_key: Id,
|
|
|
|
+ pub added_by: Option<Id>,
|
|
|
|
+ pub profile: Option<Profile>,
|
|
|
|
+ pub followed_by: HashSet<Id>,
|
|
|
|
+ pub following: HashSet<Id>,
|
|
|
|
+ pub content: Vec<Event>,
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl Contact {
|
|
|
|
+ pub fn new(pub_key: Id, added_by: Option<Id>) -> Self {
|
|
|
|
+ Self {
|
|
|
|
+ pub_key,
|
|
|
|
+ profile: None,
|
|
|
|
+ added_by,
|
|
|
|
+ followed_by: HashSet::new(),
|
|
|
|
+ following: HashSet::new(),
|
|
|
|
+ content: Vec::new(),
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
|
|
pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
|
|
relayer: Relayer<T>,
|
|
relayer: Relayer<T>,
|
|
accounts: Vec<Id>,
|
|
accounts: Vec<Id>,
|
|
@@ -40,35 +79,134 @@ pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
|
|
|
|
|
|
impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
|
|
impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
|
|
pub async fn new(storage: T, accounts: Vec<Id>, client_urls: Vec<Url>) -> Result<Self, Error> {
|
|
pub async fn new(storage: T, accounts: Vec<Id>, client_urls: Vec<Url>) -> Result<Self, Error> {
|
|
- let pool = Pool::new_with_clients(client_urls);
|
|
|
|
-
|
|
|
|
- join_all(
|
|
|
|
- accounts
|
|
|
|
- .iter()
|
|
|
|
- .map(|account| {
|
|
|
|
- pool.subscribe(
|
|
|
|
- Filter {
|
|
|
|
- authors: vec![account.clone()],
|
|
|
|
- ..Default::default()
|
|
|
|
- }
|
|
|
|
- .into(),
|
|
|
|
- )
|
|
|
|
- })
|
|
|
|
- .collect::<Vec<_>>(),
|
|
|
|
- )
|
|
|
|
- .await
|
|
|
|
- .into_iter()
|
|
|
|
- .collect::<Result<Vec<_>, _>>()?;
|
|
|
|
-
|
|
|
|
- Ok(Self {
|
|
|
|
- relayer: Relayer::new(Some(storage), Some(pool))?,
|
|
|
|
- accounts,
|
|
|
|
- })
|
|
|
|
|
|
+ let (pool, _active_clients) = Pool::new_with_clients(client_urls)?;
|
|
|
|
+ let relayer = Relayer::new(Some(storage), Some(pool))?;
|
|
|
|
+
|
|
|
|
+ Ok(Self { relayer, accounts })
|
|
}
|
|
}
|
|
|
|
|
|
- pub fn main(self, server: TcpListener) -> Result<Stoppable, Error> {
|
|
|
|
- let (relayer, handle) = self.relayer.main(server)?;
|
|
|
|
- let tasks = vec![handle, tokio::spawn(async move {})];
|
|
|
|
|
|
+ pub async fn main(self, server: TcpListener) -> Result<Stoppable, Error> {
|
|
|
|
+ let (relayer, relayer_handler) = self.relayer.main(server)?;
|
|
|
|
+ let kinds = vec![
|
|
|
|
+ Kind::Contacts,
|
|
|
|
+ Kind::Metadata,
|
|
|
|
+ Kind::MuteList,
|
|
|
|
+ Kind::Followset,
|
|
|
|
+ ];
|
|
|
|
+
|
|
|
|
+ let tasks = vec![
|
|
|
|
+ relayer_handler,
|
|
|
|
+ tokio::spawn(async move {
|
|
|
|
+ let mut local_connection = relayer.create_new_local_connection().await;
|
|
|
|
+ local_connection
|
|
|
|
+ .send(Request::Request(
|
|
|
|
+ vec![
|
|
|
|
+ Filter {
|
|
|
|
+ authors: self.accounts.clone(),
|
|
|
|
+ kinds: kinds.clone(),
|
|
|
|
+ ..Default::default()
|
|
|
|
+ },
|
|
|
|
+ Filter {
|
|
|
|
+ kinds: kinds.clone(),
|
|
|
|
+ tags: vec![(
|
|
|
|
+ "p".to_owned(),
|
|
|
|
+ self.accounts
|
|
|
|
+ .iter()
|
|
|
|
+ .map(|id| TagValue::Id(id.clone()))
|
|
|
|
+ .collect::<HashSet<TagValue>>(),
|
|
|
|
+ )]
|
|
|
|
+ .into_iter()
|
|
|
|
+ .collect(),
|
|
|
|
+ ..Default::default()
|
|
|
|
+ },
|
|
|
|
+ ]
|
|
|
|
+ .into(),
|
|
|
|
+ ))
|
|
|
|
+ .await
|
|
|
|
+ .expect("Failed to send request");
|
|
|
|
+
|
|
|
|
+ let mut already_subscribed = HashSet::new();
|
|
|
|
+ let mut to_remove = HashSet::new();
|
|
|
|
+
|
|
|
|
+ loop {
|
|
|
|
+ while let Some(res) = local_connection.recv().await {
|
|
|
|
+ match res {
|
|
|
|
+ Response::EndOfStoredEvents(id) => {
|
|
|
|
+ if to_remove.contains(&id.0) {
|
|
|
|
+ let _ = local_connection.future_send(
|
|
|
|
+ Request::Close(id.0.into()),
|
|
|
|
+ Duration::from_secs(10),
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ Response::Event(event) => {
|
|
|
|
+ match event.content() {
|
|
|
|
+ Content::Metadata(_profile) => {}
|
|
|
|
+ Content::Contacts(_) => {
|
|
|
|
+ let mut ids = vec![];
|
|
|
|
+
|
|
|
|
+ for tag in event.tags() {
|
|
|
|
+ if let TagType::PubKey(pub_key, relayer_url, _) =
|
|
|
|
+ tag.deref()
|
|
|
|
+ {
|
|
|
|
+ if let Some(_relayer_url) = relayer_url {
|
|
|
|
+ //let _ = relayer
|
|
|
|
+ // .connect_to_relayer(relayer_url.clone())
|
|
|
|
+ // .await;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if !already_subscribed.contains(pub_key) {
|
|
|
|
+ ids.push(pub_key.clone());
|
|
|
|
+ already_subscribed.insert(pub_key.clone());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if ids.len() > 0 {
|
|
|
|
+ log::info!("found {} authors", ids.len());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for authors in
|
|
|
|
+ ids.chunks(20).collect::<Vec<_>>().into_iter()
|
|
|
|
+ {
|
|
|
|
+ let subscribe: client::Subscribe = vec![
|
|
|
|
+ Filter {
|
|
|
|
+ kinds: kinds.clone(),
|
|
|
|
+ authors: authors.to_vec(),
|
|
|
|
+ ..Default::default()
|
|
|
|
+ },
|
|
|
|
+ Filter {
|
|
|
|
+ kinds: kinds.clone(),
|
|
|
|
+ tags: vec![(
|
|
|
|
+ "p".to_owned(),
|
|
|
|
+ authors
|
|
|
|
+ .iter()
|
|
|
|
+ .map(|id| TagValue::Id(id.clone()))
|
|
|
|
+ .collect::<HashSet<TagValue>>(),
|
|
|
|
+ )]
|
|
|
|
+ .into_iter()
|
|
|
|
+ .collect(),
|
|
|
|
+ ..Default::default()
|
|
|
|
+ },
|
|
|
|
+ ]
|
|
|
|
+ .into();
|
|
|
|
+ to_remove.insert(subscribe.subscription_id.clone());
|
|
|
|
+
|
|
|
|
+ let _ = local_connection
|
|
|
|
+ .send(Request::Request(subscribe))
|
|
|
|
+ .await;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ Content::ShortTextNote(_) => {}
|
|
|
|
+ _ => {}
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ _ => {}
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }),
|
|
|
|
+ ];
|
|
Ok(tasks.into())
|
|
Ok(tasks.into())
|
|
}
|
|
}
|
|
}
|
|
}
|