123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- //! Relayers
- //!
- //! This is the main entry point to the client library.
- use crate::{client::ActiveSubscription, Client, Error};
- use futures::future::join_all;
- use nostr_rs_types::{
- client::{self, subscribe},
- types::SubscriptionId,
- Response,
- };
- use std::collections::HashMap;
- use tokio::sync::{mpsc, RwLock};
- use url::Url;
- /// Clients
- ///
- /// This is a set of outgoing connections to relayers. This struct can connect
- /// async to N relayers offering a simple API to talk to all of them at the same
- /// time, and to receive messages
- #[derive(Debug)]
- pub struct Pool {
- clients: RwLock<HashMap<Url, Client>>,
- sender: mpsc::Sender<(Response, Url)>,
- receiver: Option<mpsc::Receiver<(Response, Url)>>,
- subscriptions: RwLock<HashMap<SubscriptionId, Vec<ActiveSubscription>>>,
- }
- impl Default for Pool {
- fn default() -> Self {
- Self::new()
- }
- }
- const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 10_000;
- impl Pool {
- /// Creates a new Relayers object
- pub fn new() -> Self {
- let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);
- Self {
- clients: Default::default(),
- receiver: Some(receiver),
- subscriptions: Default::default(),
- sender,
- }
- }
- /// Creates a new instance with a list of urls
- pub fn new_with_clients(clients: Vec<Url>) -> Self {
- let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);
- let clients = clients
- .into_iter()
- .map(|url| (url.clone(), Client::new(sender.clone(), url)))
- .collect::<HashMap<_, _>>();
- Self {
- clients: RwLock::new(clients),
- subscriptions: Default::default(),
- receiver: Some(receiver),
- sender,
- }
- }
- /// Splits the pool removing the receiver to be used in a different context
- pub fn split(mut self) -> Result<(mpsc::Receiver<(Response, Url)>, Self), Error> {
- Ok((self.receiver.take().ok_or(Error::AlreadySplitted)?, self))
- }
- /// Tries to receive a message from any of the connected relayers
- pub fn try_recv(&mut self) -> Option<(Response, Url)> {
- self.receiver.as_mut()?.try_recv().ok()
- }
- /// Receives a message from any of the connected relayers
- pub async fn recv(&mut self) -> Option<(Response, Url)> {
- self.receiver.as_mut()?.recv().await
- }
- /// Subscribes to all the connected relayers
- pub async fn subscribe(&self, subscription: subscribe::Subscribe) -> Result<(), Error> {
- let clients = self.clients.read().await;
- let wait_all = clients
- .values()
- .map(|sender| sender.subscribe(subscription.clone()))
- .collect::<Vec<_>>();
- self.subscriptions.write().await.insert(
- subscription.subscription_id,
- join_all(wait_all)
- .await
- .into_iter()
- .collect::<Result<Vec<_>, _>>()?,
- );
- Ok(())
- }
- /// Sends a request to all the connected relayers
- pub async fn post(&self, request: client::Event) {
- let clients = self.clients.read().await;
- join_all(
- clients
- .values()
- .map(|sender| sender.post(request.clone()))
- .collect::<Vec<_>>(),
- )
- .await;
- }
- /// Returns the number of active connections.
- pub async fn check_active_connections(&self) -> usize {
- self.clients
- .read()
- .await
- .iter()
- .filter(|(_, client)| client.is_connected())
- .collect::<Vec<_>>()
- .len()
- }
- /// Creates a connection to a new relayer.
- ///
- /// This function will open a connection at most once, if a connection
- /// already exists false will be returned
- pub async fn connect_to(&self, url: Url) {
- let mut clients = self.clients.write().await;
- if !clients.contains_key(&url) {
- log::warn!("Connecting to {}", url);
- clients.insert(url.clone(), Client::new(self.sender.clone(), url));
- }
- }
- }
|