lib.rs 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. use account::GraphManager;
  2. use futures::future::join_all;
  3. use nostr_rs_client::Pool;
  4. use nostr_rs_relayer::{ConnectionId, LocalConnection, Relayer};
  5. use nostr_rs_storage_base::Storage;
  6. use nostr_rs_types::{
  7. client::{self, Subscribe},
  8. types::{
  9. content::profile::Profile, filter::TagValue, tag::TagType, Content, Event, Filter, Id,
  10. Kind, SubscriptionId,
  11. },
  12. Request, Response,
  13. };
  14. use std::{collections::HashSet, ops::Deref, time::Duration};
  15. use tokio::{net::TcpListener, sync::mpsc::Sender, task::JoinHandle};
  16. use url::Url;
  17. mod account;
  18. pub struct Stoppable(Option<Vec<JoinHandle<()>>>);
  19. impl From<Vec<JoinHandle<()>>> for Stoppable {
  20. fn from(value: Vec<JoinHandle<()>>) -> Self {
  21. Self(Some(value))
  22. }
  23. }
  24. impl Drop for Stoppable {
  25. fn drop(&mut self) {
  26. if let Some(tasks) = self.0.take() {
  27. for join_handle in tasks.into_iter() {
  28. join_handle.abort();
  29. }
  30. }
  31. }
  32. }
  33. impl Stoppable {
  34. pub async fn wait(mut self) {
  35. if let Some(tasks) = self.0.take() {
  36. join_all(tasks).await;
  37. }
  38. }
  39. }
  40. #[derive(thiserror::Error, Debug)]
  41. pub enum Error {
  42. #[error("Relayer: {0}")]
  43. Relayer(#[from] nostr_rs_relayer::Error),
  44. #[error("Client error: {0}")]
  45. Client(#[from] nostr_rs_client::Error),
  46. }
  47. #[derive(Debug, Clone, Default)]
  48. pub struct Account {
  49. pub pub_key: Id,
  50. pub degree: usize,
  51. pub added_by: Option<Id>,
  52. pub profile: Option<Profile>,
  53. pub followed_by: HashSet<Id>,
  54. pub following: HashSet<Id>,
  55. pub content: Vec<Event>,
  56. subscribed: Option<(
  57. Sender<(ConnectionId, Request)>,
  58. ConnectionId,
  59. SubscriptionId,
  60. )>,
  61. }
  62. impl Account {
  63. pub fn new(pub_key: Id) -> Self {
  64. Self {
  65. pub_key,
  66. degree: 0,
  67. added_by: Default::default(),
  68. profile: Default::default(),
  69. followed_by: Default::default(),
  70. following: Default::default(),
  71. content: Default::default(),
  72. subscribed: Default::default(),
  73. }
  74. }
  75. pub async fn subscribe<T>(&mut self, conn: &LocalConnection<T>) -> Result<(), Error>
  76. where
  77. T: Storage + Send + Sync + 'static,
  78. {
  79. let request = Subscribe {
  80. subscription_id: self.pub_key.to_string().try_into().expect("valid id"),
  81. filters: vec![
  82. Filter {
  83. authors: vec![self.pub_key.clone()],
  84. kinds: vec![
  85. Kind::Contacts,
  86. Kind::Metadata,
  87. Kind::MuteList,
  88. Kind::Followset,
  89. ],
  90. ..Default::default()
  91. },
  92. Filter {
  93. authors: vec![self.pub_key.clone()],
  94. ..Default::default()
  95. },
  96. Filter {
  97. tags: vec![(
  98. "p".to_owned(),
  99. HashSet::from([TagValue::Id(self.pub_key.clone())]),
  100. )]
  101. .into_iter()
  102. .collect(),
  103. ..Default::default()
  104. },
  105. ],
  106. };
  107. self.subscribed = Some((
  108. conn.sender(),
  109. conn.conn_id.clone(),
  110. request.subscription_id.clone(),
  111. ));
  112. conn.send(Request::Request(request)).await?;
  113. Ok(())
  114. }
  115. pub fn add_contact(&self, pub_key: Id) -> Self {
  116. Self {
  117. pub_key,
  118. degree: self.degree + 1,
  119. added_by: Some(self.pub_key.clone()),
  120. profile: Default::default(),
  121. followed_by: Default::default(),
  122. following: Default::default(),
  123. content: Default::default(),
  124. subscribed: Default::default(),
  125. }
  126. }
  127. }
  128. impl Drop for Account {
  129. fn drop(&mut self) {
  130. if let Some((conn, conn_id, sub_id)) = self.subscribed.take() {
  131. conn.try_send((conn_id, Request::Close(sub_id.into())))
  132. .expect("unsubscription");
  133. }
  134. }
  135. }
  136. pub struct PersonalRelayer<T: Storage + Send + Sync + 'static> {
  137. relayer: Relayer<T>,
  138. accounts: GraphManager,
  139. }
  140. impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
  141. pub async fn new(storage: T, accounts: Vec<Id>, client_urls: Vec<Url>) -> Result<Self, Error> {
  142. let (pool, _active_clients) = Pool::new_with_clients(client_urls)?;
  143. let relayer = Relayer::new(Some(storage), Some(pool))?;
  144. Ok(Self {
  145. relayer,
  146. accounts: GraphManager::new(2, accounts),
  147. })
  148. }
  149. pub async fn main(mut self, server: TcpListener) -> Result<Stoppable, Error> {
  150. let (relayer, relayer_handler) = self.relayer.main(server)?;
  151. let tasks = vec![
  152. relayer_handler,
  153. tokio::spawn(async move {
  154. let mut local_connection = relayer.create_new_local_connection().await;
  155. let _ = self.accounts.create_subscriptions(&local_connection).await;
  156. let mut already_subscribed = HashSet::new();
  157. loop {
  158. while let Some(res) = local_connection.recv().await {
  159. match res {
  160. Response::Event(event) => {
  161. let current_user: Id =
  162. if let Ok(id) = event.subscription_id.deref().try_into() {
  163. id
  164. } else {
  165. continue;
  166. };
  167. match event.content() {
  168. Content::Metadata(_profile) => {}
  169. Content::Contacts(_) => {
  170. let mut ids = vec![];
  171. for tag in event.tags() {
  172. if let TagType::PubKey(pub_key, relayer_url, _) =
  173. tag.deref()
  174. {
  175. if let Some(_relayer_url) = relayer_url {
  176. //let _ = relayer
  177. // .connect_to_relayer(relayer_url.clone())
  178. // .await;
  179. }
  180. if !already_subscribed.contains(pub_key) {
  181. ids.push(pub_key.clone());
  182. already_subscribed.insert(pub_key.clone());
  183. }
  184. }
  185. }
  186. if ids.is_empty() {
  187. continue;
  188. }
  189. log::info!("found {} authors", ids.len());
  190. for pub_key in ids {
  191. let _ =
  192. self.accounts.follows_to(&current_user, pub_key);
  193. }
  194. let _ = self
  195. .accounts
  196. .create_subscriptions(&local_connection)
  197. .await;
  198. }
  199. Content::ShortTextNote(_) => {}
  200. _ => {}
  201. }
  202. }
  203. _ => {}
  204. }
  205. }
  206. }
  207. }),
  208. ];
  209. Ok(tasks.into())
  210. }
  211. }