lib.rs 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. use futures::future::join_all;
  2. use nostr_rs_client::Pool;
  3. use nostr_rs_relayer::Relayer;
  4. use nostr_rs_storage_base::Storage;
  5. use nostr_rs_types::{
  6. client,
  7. types::{
  8. content::profile::Profile, filter::TagValue, tag::TagType, Content, Event, Filter, Id, Kind,
  9. },
  10. Request, Response,
  11. };
  12. use serde::Serialize;
  13. use std::{collections::HashSet, ops::Deref, time::Duration};
  14. use tokio::{net::TcpListener, task::JoinHandle};
  15. use url::Url;
  16. pub struct Stoppable(Option<Vec<JoinHandle<()>>>);
  17. impl From<Vec<JoinHandle<()>>> for Stoppable {
  18. fn from(value: Vec<JoinHandle<()>>) -> Self {
  19. Self(Some(value))
  20. }
  21. }
  22. impl Drop for Stoppable {
  23. fn drop(&mut self) {
  24. if let Some(tasks) = self.0.take() {
  25. for join_handle in tasks.into_iter() {
  26. join_handle.abort();
  27. }
  28. }
  29. }
  30. }
  31. impl Stoppable {
  32. pub async fn wait(mut self) {
  33. if let Some(tasks) = self.0.take() {
  34. join_all(tasks).await;
  35. }
  36. }
  37. }
  38. #[derive(thiserror::Error, Debug)]
  39. pub enum Error {
  40. #[error("Relayer: {0}")]
  41. Relayer(#[from] nostr_rs_relayer::Error),
  42. #[error("Client error: {0}")]
  43. Client(#[from] nostr_rs_client::Error),
  44. }
  45. #[derive(Debug, Clone, Serialize)]
  46. pub struct Contact {
  47. pub pub_key: Id,
  48. pub added_by: Option<Id>,
  49. pub profile: Option<Profile>,
  50. pub followed_by: HashSet<Id>,
  51. pub following: HashSet<Id>,
  52. pub content: Vec<Event>,
  53. }
  54. impl Contact {
  55. pub fn new(pub_key: Id, added_by: Option<Id>) -> Self {
  56. Self {
  57. pub_key,
  58. profile: None,
  59. added_by,
  60. followed_by: HashSet::new(),
  61. following: HashSet::new(),
  62. content: Vec::new(),
  63. }
  64. }
  65. }
  66. pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
  67. relayer: Relayer<T>,
  68. accounts: Vec<Id>,
  69. }
  70. impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
  71. pub async fn new(storage: T, accounts: Vec<Id>, client_urls: Vec<Url>) -> Result<Self, Error> {
  72. let (pool, _active_clients) = Pool::new_with_clients(client_urls)?;
  73. let relayer = Relayer::new(Some(storage), Some(pool))?;
  74. Ok(Self { relayer, accounts })
  75. }
  76. pub async fn main(self, server: TcpListener) -> Result<Stoppable, Error> {
  77. let (relayer, relayer_handler) = self.relayer.main(server)?;
  78. let kinds = vec![
  79. Kind::Contacts,
  80. Kind::Metadata,
  81. Kind::MuteList,
  82. Kind::Followset,
  83. ];
  84. let tasks = vec![
  85. relayer_handler,
  86. tokio::spawn(async move {
  87. let mut local_connection = relayer.create_new_local_connection().await;
  88. local_connection
  89. .send(Request::Request(
  90. vec![
  91. Filter {
  92. authors: self.accounts.clone(),
  93. kinds: kinds.clone(),
  94. ..Default::default()
  95. },
  96. Filter {
  97. kinds: kinds.clone(),
  98. tags: vec![(
  99. "p".to_owned(),
  100. self.accounts
  101. .iter()
  102. .map(|id| TagValue::Id(id.clone()))
  103. .collect::<HashSet<TagValue>>(),
  104. )]
  105. .into_iter()
  106. .collect(),
  107. ..Default::default()
  108. },
  109. ]
  110. .into(),
  111. ))
  112. .await
  113. .expect("Failed to send request");
  114. let mut already_subscribed = HashSet::new();
  115. let mut to_remove = HashSet::new();
  116. loop {
  117. while let Some(res) = local_connection.recv().await {
  118. match res {
  119. Response::EndOfStoredEvents(id) => {
  120. if to_remove.contains(&id.0) {
  121. let _ = local_connection.future_send(
  122. Request::Close(id.0.into()),
  123. Duration::from_secs(10),
  124. );
  125. }
  126. }
  127. Response::Event(event) => {
  128. match event.content() {
  129. Content::Metadata(_profile) => {}
  130. Content::Contacts(_) => {
  131. let mut ids = vec![];
  132. for tag in event.tags() {
  133. if let TagType::PubKey(pub_key, relayer_url, _) =
  134. tag.deref()
  135. {
  136. if let Some(_relayer_url) = relayer_url {
  137. //let _ = relayer
  138. // .connect_to_relayer(relayer_url.clone())
  139. // .await;
  140. }
  141. if !already_subscribed.contains(pub_key) {
  142. ids.push(pub_key.clone());
  143. already_subscribed.insert(pub_key.clone());
  144. }
  145. }
  146. }
  147. if ids.len() > 0 {
  148. log::info!("found {} authors", ids.len());
  149. }
  150. for authors in
  151. ids.chunks(20).collect::<Vec<_>>().into_iter()
  152. {
  153. let subscribe: client::Subscribe = vec![
  154. Filter {
  155. kinds: kinds.clone(),
  156. authors: authors.to_vec(),
  157. ..Default::default()
  158. },
  159. Filter {
  160. kinds: kinds.clone(),
  161. tags: vec![(
  162. "p".to_owned(),
  163. authors
  164. .iter()
  165. .map(|id| TagValue::Id(id.clone()))
  166. .collect::<HashSet<TagValue>>(),
  167. )]
  168. .into_iter()
  169. .collect(),
  170. ..Default::default()
  171. },
  172. ]
  173. .into();
  174. to_remove.insert(subscribe.subscription_id.clone());
  175. let _ = local_connection
  176. .send(Request::Request(subscribe))
  177. .await;
  178. }
  179. }
  180. Content::ShortTextNote(_) => {}
  181. _ => {}
  182. }
  183. }
  184. _ => {}
  185. }
  186. }
  187. }
  188. }),
  189. ];
  190. Ok(tasks.into())
  191. }
  192. }