pool.rs 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. //! Relayers
  2. //!
  3. //! This is the main entry point to the client library.
  4. use crate::{client::ActiveSubscription, Client, Error};
  5. use futures::future::join_all;
  6. use nostr_rs_types::{
  7. client::{self, subscribe},
  8. types::SubscriptionId,
  9. Response,
  10. };
  11. use std::collections::HashMap;
  12. use tokio::sync::{mpsc, RwLock};
  13. use url::Url;
  14. /// Clients
  15. ///
  16. /// This is a set of outgoing connections to relayers. This struct can connect
  17. /// async to N relayers offering a simple API to talk to all of them at the same
  18. /// time, and to receive messages
  19. #[derive(Debug)]
  20. pub struct Pool {
  21. clients: RwLock<HashMap<Url, Client>>,
  22. sender: mpsc::Sender<(Response, Url)>,
  23. receiver: Option<mpsc::Receiver<(Response, Url)>>,
  24. subscriptions: RwLock<HashMap<SubscriptionId, Vec<ActiveSubscription>>>,
  25. }
  26. impl Default for Pool {
  27. fn default() -> Self {
  28. Self::new()
  29. }
  30. }
  31. const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 10_000;
  32. impl Pool {
  33. /// Creates a new Relayers object
  34. pub fn new() -> Self {
  35. let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);
  36. Self {
  37. clients: Default::default(),
  38. receiver: Some(receiver),
  39. subscriptions: Default::default(),
  40. sender,
  41. }
  42. }
  43. /// Creates a new instance with a list of urls
  44. pub fn new_with_clients(clients: Vec<Url>) -> Self {
  45. let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);
  46. let clients = clients
  47. .into_iter()
  48. .map(|url| (url.clone(), Client::new(sender.clone(), url)))
  49. .collect::<HashMap<_, _>>();
  50. Self {
  51. clients: RwLock::new(clients),
  52. subscriptions: Default::default(),
  53. receiver: Some(receiver),
  54. sender,
  55. }
  56. }
  57. /// Splits the pool removing the receiver to be used in a different context
  58. pub fn split(mut self) -> Result<(mpsc::Receiver<(Response, Url)>, Self), Error> {
  59. Ok((self.receiver.take().ok_or(Error::AlreadySplitted)?, self))
  60. }
  61. /// Tries to receive a message from any of the connected relayers
  62. pub fn try_recv(&mut self) -> Option<(Response, Url)> {
  63. self.receiver.as_mut()?.try_recv().ok()
  64. }
  65. /// Receives a message from any of the connected relayers
  66. pub async fn recv(&mut self) -> Option<(Response, Url)> {
  67. self.receiver.as_mut()?.recv().await
  68. }
  69. /// Subscribes to all the connected relayers
  70. pub async fn subscribe(&self, subscription: subscribe::Subscribe) -> Result<(), Error> {
  71. let clients = self.clients.read().await;
  72. let wait_all = clients
  73. .values()
  74. .map(|sender| sender.subscribe(subscription.clone()))
  75. .collect::<Vec<_>>();
  76. self.subscriptions.write().await.insert(
  77. subscription.subscription_id,
  78. join_all(wait_all)
  79. .await
  80. .into_iter()
  81. .collect::<Result<Vec<_>, _>>()?,
  82. );
  83. Ok(())
  84. }
  85. /// Sends a request to all the connected relayers
  86. pub async fn post(&self, request: client::Event) {
  87. let clients = self.clients.read().await;
  88. join_all(
  89. clients
  90. .values()
  91. .map(|sender| sender.post(request.clone()))
  92. .collect::<Vec<_>>(),
  93. )
  94. .await;
  95. }
  96. /// Returns the number of active connections.
  97. pub async fn check_active_connections(&self) -> usize {
  98. self.clients
  99. .read()
  100. .await
  101. .iter()
  102. .filter(|(_, client)| client.is_connected())
  103. .collect::<Vec<_>>()
  104. .len()
  105. }
  106. /// Creates a connection to a new relayer.
  107. ///
  108. /// This function will open a connection at most once, if a connection
  109. /// already exists false will be returned
  110. pub async fn connect_to(&self, url: Url) {
  111. let mut clients = self.clients.write().await;
  112. if !clients.contains_key(&url) {
  113. log::warn!("Connecting to {}", url);
  114. clients.insert(url.clone(), Client::new(self.sender.clone(), url));
  115. }
  116. }
  117. }