|
@@ -1,19 +1,22 @@
|
|
|
+use account::GraphManager;
|
|
|
use futures::future::join_all;
|
|
|
use nostr_rs_client::Pool;
|
|
|
-use nostr_rs_relayer::Relayer;
|
|
|
+use nostr_rs_relayer::{ConnectionId, LocalConnection, Relayer};
|
|
|
use nostr_rs_storage_base::Storage;
|
|
|
use nostr_rs_types::{
|
|
|
- client,
|
|
|
+ client::{self, Subscribe},
|
|
|
types::{
|
|
|
- content::profile::Profile, filter::TagValue, tag::TagType, Content, Event, Filter, Id, Kind,
|
|
|
+ content::profile::Profile, filter::TagValue, tag::TagType, Content, Event, Filter, Id,
|
|
|
+ Kind, SubscriptionId,
|
|
|
},
|
|
|
Request, Response,
|
|
|
};
|
|
|
-use serde::Serialize;
|
|
|
use std::{collections::HashSet, ops::Deref, time::Duration};
|
|
|
-use tokio::{net::TcpListener, task::JoinHandle};
|
|
|
+use tokio::{net::TcpListener, sync::mpsc::Sender, task::JoinHandle};
|
|
|
use url::Url;
|
|
|
|
|
|
+mod account;
|
|
|
+
|
|
|
pub struct Stoppable(Option<Vec<JoinHandle<()>>>);
|
|
|
|
|
|
impl From<Vec<JoinHandle<()>>> for Stoppable {
|
|
@@ -49,32 +52,104 @@ pub enum Error {
|
|
|
Client(#[from] nostr_rs_client::Error),
|
|
|
}
|
|
|
|
|
|
-#[derive(Debug, Clone, Serialize)]
|
|
|
-pub struct Contact {
|
|
|
+#[derive(Debug, Clone, Default)]
|
|
|
+pub struct Account {
|
|
|
pub pub_key: Id,
|
|
|
+ pub degree: usize,
|
|
|
pub added_by: Option<Id>,
|
|
|
pub profile: Option<Profile>,
|
|
|
pub followed_by: HashSet<Id>,
|
|
|
pub following: HashSet<Id>,
|
|
|
pub content: Vec<Event>,
|
|
|
+ subscribed: Option<(
|
|
|
+ Sender<(ConnectionId, Request)>,
|
|
|
+ ConnectionId,
|
|
|
+ SubscriptionId,
|
|
|
+ )>,
|
|
|
}
|
|
|
|
|
|
-impl Contact {
|
|
|
- pub fn new(pub_key: Id, added_by: Option<Id>) -> Self {
|
|
|
+impl Account {
|
|
|
+ pub fn new(pub_key: Id) -> Self {
|
|
|
+ Self {
|
|
|
+ pub_key,
|
|
|
+ degree: 0,
|
|
|
+ added_by: Default::default(),
|
|
|
+ profile: Default::default(),
|
|
|
+ followed_by: Default::default(),
|
|
|
+ following: Default::default(),
|
|
|
+ content: Default::default(),
|
|
|
+ subscribed: Default::default(),
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ pub async fn subscribe<T>(&mut self, conn: &LocalConnection<T>) -> Result<(), Error>
|
|
|
+ where
|
|
|
+ T: Storage + Send + Sync + 'static,
|
|
|
+ {
|
|
|
+ let request = Subscribe {
|
|
|
+ subscription_id: self.pub_key.to_string().try_into().expect("valid id"),
|
|
|
+ filters: vec![
|
|
|
+ Filter {
|
|
|
+ authors: vec![self.pub_key.clone()],
|
|
|
+ kinds: vec![
|
|
|
+ Kind::Contacts,
|
|
|
+ Kind::Metadata,
|
|
|
+ Kind::MuteList,
|
|
|
+ Kind::Followset,
|
|
|
+ ],
|
|
|
+ ..Default::default()
|
|
|
+ },
|
|
|
+ Filter {
|
|
|
+ authors: vec![self.pub_key.clone()],
|
|
|
+ ..Default::default()
|
|
|
+ },
|
|
|
+ Filter {
|
|
|
+ tags: vec![(
|
|
|
+ "p".to_owned(),
|
|
|
+ HashSet::from([TagValue::Id(self.pub_key.clone())]),
|
|
|
+ )]
|
|
|
+ .into_iter()
|
|
|
+ .collect(),
|
|
|
+ ..Default::default()
|
|
|
+ },
|
|
|
+ ],
|
|
|
+ };
|
|
|
+
|
|
|
+ self.subscribed = Some((
|
|
|
+ conn.sender(),
|
|
|
+ conn.conn_id.clone(),
|
|
|
+ request.subscription_id.clone(),
|
|
|
+ ));
|
|
|
+ conn.send(Request::Request(request)).await?;
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+
|
|
|
+ pub fn add_contact(&self, pub_key: Id) -> Self {
|
|
|
Self {
|
|
|
pub_key,
|
|
|
- profile: None,
|
|
|
- added_by,
|
|
|
- followed_by: HashSet::new(),
|
|
|
- following: HashSet::new(),
|
|
|
- content: Vec::new(),
|
|
|
+ degree: self.degree + 1,
|
|
|
+ added_by: Some(self.pub_key.clone()),
|
|
|
+ profile: Default::default(),
|
|
|
+ followed_by: Default::default(),
|
|
|
+ following: Default::default(),
|
|
|
+ content: Default::default(),
|
|
|
+ subscribed: Default::default(),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl Drop for Account {
|
|
|
+ fn drop(&mut self) {
|
|
|
+ if let Some((conn, conn_id, sub_id)) = self.subscribed.take() {
|
|
|
+ conn.try_send((conn_id, Request::Close(sub_id.into())))
|
|
|
+ .expect("unsubscription");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
|
|
|
relayer: Relayer<T>,
|
|
|
- accounts: Vec<Id>,
|
|
|
+ accounts: GraphManager,
|
|
|
}
|
|
|
|
|
|
impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
|
|
@@ -82,10 +157,13 @@ impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
|
|
|
let (pool, _active_clients) = Pool::new_with_clients(client_urls)?;
|
|
|
let relayer = Relayer::new(Some(storage), Some(pool))?;
|
|
|
|
|
|
- Ok(Self { relayer, accounts })
|
|
|
+ Ok(Self {
|
|
|
+ relayer,
|
|
|
+ accounts: GraphManager::new(2, accounts),
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
- pub async fn main(self, server: TcpListener) -> Result<Stoppable, Error> {
|
|
|
+ pub async fn main(mut self, server: TcpListener) -> Result<Stoppable, Error> {
|
|
|
let (relayer, relayer_handler) = self.relayer.main(server)?;
|
|
|
let kinds = vec![
|
|
|
Kind::Contacts,
|
|
@@ -98,48 +176,22 @@ impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
|
|
|
relayer_handler,
|
|
|
tokio::spawn(async move {
|
|
|
let mut local_connection = relayer.create_new_local_connection().await;
|
|
|
- local_connection
|
|
|
- .send(Request::Request(
|
|
|
- vec![
|
|
|
- Filter {
|
|
|
- authors: self.accounts.clone(),
|
|
|
- kinds: kinds.clone(),
|
|
|
- ..Default::default()
|
|
|
- },
|
|
|
- Filter {
|
|
|
- kinds: kinds.clone(),
|
|
|
- tags: vec![(
|
|
|
- "p".to_owned(),
|
|
|
- self.accounts
|
|
|
- .iter()
|
|
|
- .map(|id| TagValue::Id(id.clone()))
|
|
|
- .collect::<HashSet<TagValue>>(),
|
|
|
- )]
|
|
|
- .into_iter()
|
|
|
- .collect(),
|
|
|
- ..Default::default()
|
|
|
- },
|
|
|
- ]
|
|
|
- .into(),
|
|
|
- ))
|
|
|
- .await
|
|
|
- .expect("Failed to send request");
|
|
|
+
|
|
|
+ let _ = self.accounts.create_subscriptions(&local_connection).await;
|
|
|
|
|
|
let mut already_subscribed = HashSet::new();
|
|
|
- let mut to_remove = HashSet::new();
|
|
|
|
|
|
loop {
|
|
|
while let Some(res) = local_connection.recv().await {
|
|
|
match res {
|
|
|
- Response::EndOfStoredEvents(id) => {
|
|
|
- if to_remove.contains(&id.0) {
|
|
|
- let _ = local_connection.future_send(
|
|
|
- Request::Close(id.0.into()),
|
|
|
- Duration::from_secs(10),
|
|
|
- );
|
|
|
- }
|
|
|
- }
|
|
|
Response::Event(event) => {
|
|
|
+ let current_user: Id =
|
|
|
+ if let Ok(id) = event.subscription_id.deref().try_into() {
|
|
|
+ id
|
|
|
+ } else {
|
|
|
+ continue;
|
|
|
+ };
|
|
|
+
|
|
|
match event.content() {
|
|
|
Content::Metadata(_profile) => {}
|
|
|
Content::Contacts(_) => {
|
|
@@ -162,40 +214,21 @@ impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if ids.len() > 0 {
|
|
|
- log::info!("found {} authors", ids.len());
|
|
|
+ if ids.is_empty() {
|
|
|
+ continue;
|
|
|
}
|
|
|
|
|
|
- for authors in
|
|
|
- ids.chunks(20).collect::<Vec<_>>().into_iter()
|
|
|
- {
|
|
|
- let subscribe: client::Subscribe = vec![
|
|
|
- Filter {
|
|
|
- kinds: kinds.clone(),
|
|
|
- authors: authors.to_vec(),
|
|
|
- ..Default::default()
|
|
|
- },
|
|
|
- Filter {
|
|
|
- kinds: kinds.clone(),
|
|
|
- tags: vec![(
|
|
|
- "p".to_owned(),
|
|
|
- authors
|
|
|
- .iter()
|
|
|
- .map(|id| TagValue::Id(id.clone()))
|
|
|
- .collect::<HashSet<TagValue>>(),
|
|
|
- )]
|
|
|
- .into_iter()
|
|
|
- .collect(),
|
|
|
- ..Default::default()
|
|
|
- },
|
|
|
- ]
|
|
|
- .into();
|
|
|
- to_remove.insert(subscribe.subscription_id.clone());
|
|
|
-
|
|
|
- let _ = local_connection
|
|
|
- .send(Request::Request(subscribe))
|
|
|
- .await;
|
|
|
+ log::info!("found {} authors", ids.len());
|
|
|
+
|
|
|
+ for pub_key in ids {
|
|
|
+ let _ =
|
|
|
+ self.accounts.follows_to(¤t_user, pub_key);
|
|
|
}
|
|
|
+
|
|
|
+ let _ = self
|
|
|
+ .accounts
|
|
|
+ .create_subscriptions(&local_connection)
|
|
|
+ .await;
|
|
|
}
|
|
|
Content::ShortTextNote(_) => {}
|
|
|
_ => {}
|