Browse Source

Clippy and local connection

Cesar Rodas 3 months ago
parent
commit
620127fbdc

+ 50 - 0
crates/relayer/src/connection/local.rs

@@ -0,0 +1,50 @@
+//! Local connection
+//!
+//! Add types for adding local connections
+use crate::{connection::ConnectionId, Error};
+use nostr_rs_types::{Request, Response};
+use tokio::sync::mpsc::{Receiver, Sender};
+
+/// Local connection
+pub struct LocalConnection {
+    sender: Sender<(ConnectionId, Request)>,
+    receiver: Receiver<Response>,
+    conn_id: ConnectionId,
+}
+
+impl LocalConnection {
+    /// Receive a message from the relayer
+    pub async fn recv(&mut self) -> Option<Response> {
+        self.receiver.recv().await
+    }
+
+    /// Sends a request to the relayer
+    pub async fn send(&self, request: Request) -> Result<(), Error> {
+        self.sender
+            .send((self.conn_id, request))
+            .await
+            .map_err(|e| Error::LocalSendError(Box::new(e)))
+    }
+}
+
+impl
+    From<(
+        ConnectionId,
+        Receiver<Response>,
+        Sender<(ConnectionId, Request)>,
+    )> for LocalConnection
+{
+    fn from(
+        value: (
+            ConnectionId,
+            Receiver<Response>,
+            Sender<(ConnectionId, Request)>,
+        ),
+    ) -> Self {
+        LocalConnection {
+            conn_id: value.0,
+            receiver: value.1,
+            sender: value.2,
+        }
+    }
+}

+ 13 - 6
crates/relayer/src/connection.rs → crates/relayer/src/connection/mod.rs

@@ -24,6 +24,10 @@ use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
 pub struct ConnectionId(usize);
 pub struct ConnectionId(usize);
 
 
+mod local;
+
+pub use local::LocalConnection;
+
 impl Default for ConnectionId {
 impl Default for ConnectionId {
     fn default() -> Self {
     fn default() -> Self {
         static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
         static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
@@ -47,6 +51,8 @@ impl ConnectionId {
     }
     }
 }
 }
 
 
+type CompoundSubcription = (Option<PoolSubscription>, Vec<ActiveSubscription>);
+
 #[derive(Debug)]
 #[derive(Debug)]
 /// Relayer connection
 /// Relayer connection
 ///
 ///
@@ -55,8 +61,7 @@ impl ConnectionId {
 pub struct Connection {
 pub struct Connection {
     conn_id: ConnectionId,
     conn_id: ConnectionId,
     sender: Sender<Response>,
     sender: Sender<Response>,
-    subscriptions:
-        RwLock<HashMap<SubscriptionId, (Option<PoolSubscription>, Vec<ActiveSubscription>)>>,
+    subscriptions: RwLock<HashMap<SubscriptionId, CompoundSubcription>>,
     handler: Option<JoinHandle<()>>,
     handler: Option<JoinHandle<()>>,
 }
 }
 
 
@@ -65,14 +70,14 @@ const MAX_SUBSCRIPTIONS_BUFFER: usize = 100;
 impl Drop for Connection {
 impl Drop for Connection {
     fn drop(&mut self) {
     fn drop(&mut self) {
         if let Some(handler) = self.handler.take() {
         if let Some(handler) = self.handler.take() {
-            let _ = handler.abort();
+            handler.abort();
         }
         }
     }
     }
 }
 }
 
 
 impl Connection {
 impl Connection {
-    #[cfg(test)]
-    pub fn new_for_test() -> (Self, Receiver<Response>) {
+    /// Create a new local connection
+    pub fn new_local_connection() -> (Self, Receiver<Response>) {
         let (sender, receiver) = channel(MAX_SUBSCRIPTIONS_BUFFER);
         let (sender, receiver) = channel(MAX_SUBSCRIPTIONS_BUFFER);
         (
         (
             Self {
             Self {
@@ -136,7 +141,9 @@ impl Connection {
                             let msg: Result<Request, _> = serde_json::from_str(&msg);
                             let msg: Result<Request, _> = serde_json::from_str(&msg);
                             match msg {
                             match msg {
                                 Ok(msg) => {
                                 Ok(msg) => {
-                                    let _ = send_message_to_relayer.send((conn_id, msg)).await;
+                                    if let Err(err) = send_message_to_relayer.send((conn_id, msg)).await {
+                                        log::error!("Error sending message to relayer: {}", err);
+                                    }
                                 },
                                 },
                                 Err(err) => {
                                 Err(err) => {
                                     log::error!("Error parsing message from client: {}", err);
                                     log::error!("Error parsing message from client: {}", err);

+ 6 - 1
crates/relayer/src/error.rs

@@ -1,4 +1,5 @@
-use nostr_rs_types::Response;
+use crate::connection::ConnectionId;
+use nostr_rs_types::{Request, Response};
 
 
 #[derive(Debug, thiserror::Error)]
 #[derive(Debug, thiserror::Error)]
 /// Relayer error
 /// Relayer error
@@ -19,6 +20,10 @@ pub enum Error {
     #[error("TrySendError: {0}")]
     #[error("TrySendError: {0}")]
     TrySendError(#[from] Box<tokio::sync::mpsc::error::TrySendError<Response>>),
     TrySendError(#[from] Box<tokio::sync::mpsc::error::TrySendError<Response>>),
 
 
+    /// Tokio channel's error
+    #[error("LocalTrySendError: {0}")]
+    LocalSendError(#[from] Box<tokio::sync::mpsc::error::SendError<(ConnectionId, Request)>>),
+
     /// Client related errors
     /// Client related errors
     #[error("Nostr client error: {0}")]
     #[error("Nostr client error: {0}")]
     Client(#[from] nostr_rs_client::Error),
     Client(#[from] nostr_rs_client::Error),

+ 5 - 1
crates/relayer/src/lib.rs

@@ -15,4 +15,8 @@ mod error;
 mod relayer;
 mod relayer;
 mod subscription;
 mod subscription;
 
 
-pub use self::{connection::Connection, error::Error, relayer::Relayer};
+pub use self::{
+    connection::{Connection, LocalConnection},
+    error::Error,
+    relayer::Relayer,
+};

+ 34 - 20
crates/relayer/src/relayer.rs

@@ -1,4 +1,8 @@
-use crate::{connection::ConnectionId, subscription::SubscriptionManager, Connection, Error};
+use crate::{
+    connection::{ConnectionId, LocalConnection},
+    subscription::SubscriptionManager,
+    Connection, Error,
+};
 use futures_util::StreamExt;
 use futures_util::StreamExt;
 use nostr_rs_client::{Error as ClientError, Pool};
 use nostr_rs_client::{Error as ClientError, Pool};
 use nostr_rs_storage_base::Storage;
 use nostr_rs_storage_base::Storage;
@@ -21,9 +25,10 @@ pub struct Relayer<T: Storage + Send + Sync + 'static> {
     /// relayer just a dumb proxy (that can be useful for privacy) but it won't
     /// relayer just a dumb proxy (that can be useful for privacy) but it won't
     /// be able to perform any optimization like prefetching content while offline
     /// be able to perform any optimization like prefetching content while offline
     storage: Option<T>,
     storage: Option<T>,
-    /// x
+    /// Subscription manager
     subscriptions: Arc<SubscriptionManager>,
     subscriptions: Arc<SubscriptionManager>,
-    clients: RwLock<HashMap<ConnectionId, Connection>>,
+    /// List of all active connections
+    connections: RwLock<HashMap<ConnectionId, Connection>>,
     /// This Sender can be used to send requests from anywhere to the relayer.
     /// This Sender can be used to send requests from anywhere to the relayer.
     send_to_relayer: Sender<(ConnectionId, Request)>,
     send_to_relayer: Sender<(ConnectionId, Request)>,
     /// This Receiver is the relayer the way the relayer receives messages
     /// This Receiver is the relayer the way the relayer receives messages
@@ -37,7 +42,7 @@ pub struct Relayer<T: Storage + Send + Sync + 'static> {
 impl<T: Storage + Send + Sync + 'static> Drop for Relayer<T> {
 impl<T: Storage + Send + Sync + 'static> Drop for Relayer<T> {
     fn drop(&mut self) {
     fn drop(&mut self) {
         if let Some((_, handle)) = self.client_pool.take() {
         if let Some((_, handle)) = self.client_pool.take() {
-            let _ = handle.abort();
+            handle.abort();
         }
         }
     }
     }
 }
 }
@@ -58,7 +63,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
             subscriptions: Default::default(),
             subscriptions: Default::default(),
             send_to_relayer: sender.clone(),
             send_to_relayer: sender.clone(),
             relayer_receiver: Some(receiver),
             relayer_receiver: Some(receiver),
-            clients: Default::default(),
+            connections: Default::default(),
             client_pool: if let Some(client_pool) = client_pool {
             client_pool: if let Some(client_pool) = client_pool {
                 Some(Self::handle_client_pool(client_pool, sender)?)
                 Some(Self::handle_client_pool(client_pool, sender)?)
             } else {
             } else {
@@ -99,12 +104,12 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                                 if let Some(storage) = this.storage.as_ref() {
                                 if let Some(storage) = this.storage.as_ref() {
                                     let _ = storage.store_local_event(&event).await;
                                     let _ = storage.store_local_event(&event).await;
                                 }
                                 }
-                                this.broadcast(&event.deref()).await;
+                                this.broadcast(event.deref()).await;
                             }
                             }
                             continue;
                             continue;
                         }
                         }
 
 
-                        let connections = this.clients.read().await;
+                        let connections = this.connections.read().await;
                         let connection = if let Some(connection) = connections.get(&conn_id) {
                         let connection = if let Some(connection) = connections.get(&conn_id) {
                             connection
                             connection
                         } else {
                         } else {
@@ -160,6 +165,15 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
         &self.storage
         &self.storage
     }
     }
 
 
+    /// Adds a new local connection to the list of active connections.
+    pub async fn create_new_local_connection(&self) -> LocalConnection {
+        let (conn, receiver) = Connection::new_local_connection();
+        let conn_id = conn.get_conn_id();
+        self.connections.write().await.insert(conn_id, conn);
+
+        (conn_id, receiver, self.send_to_relayer.clone()).into()
+    }
+
     /// Adds a new TpStream and adds it to the list of active connections.
     /// Adds a new TpStream and adds it to the list of active connections.
     ///
     ///
     /// This function will spawn the client's loop to receive incoming messages and send those messages
     /// This function will spawn the client's loop to receive incoming messages and send those messages
@@ -168,11 +182,11 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
         disconnection_notify: Option<mpsc::Sender<ConnectionId>>,
         disconnection_notify: Option<mpsc::Sender<ConnectionId>>,
         stream: TcpStream,
         stream: TcpStream,
     ) -> Result<ConnectionId, Error> {
     ) -> Result<ConnectionId, Error> {
-        let client =
+        let conn =
             Connection::new_connection(self.send_to_relayer.clone(), disconnection_notify, stream)
             Connection::new_connection(self.send_to_relayer.clone(), disconnection_notify, stream)
                 .await?;
                 .await?;
-        let id = client.get_conn_id();
-        self.clients.write().await.insert(id, client);
+        let id = conn.get_conn_id();
+        self.connections.write().await.insert(id, conn);
 
 
         Ok(id)
         Ok(id)
     }
     }
@@ -195,7 +209,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                 if let Some((client_pool, _)) = self.client_pool.as_ref() {
                 if let Some((client_pool, _)) = self.client_pool.as_ref() {
                     // pass the event to the pool of clients, so this relayer can relay
                     // pass the event to the pool of clients, so this relayer can relay
                     // their local events to the clients in the network of relayers
                     // their local events to the clients in the network of relayers
-                    let _ = client_pool.post(event.clone().into()).await;
+                    let _ = client_pool.post(event.clone()).await;
                 }
                 }
             }
             }
             Request::Request(request) => {
             Request::Request(request) => {
@@ -249,7 +263,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                     .await;
                     .await;
             }
             }
             Request::Close(close) => {
             Request::Close(close) => {
-                connection.unsubscribe(&*close).await;
+                connection.unsubscribe(close).await;
             }
             }
         };
         };
 
 
@@ -320,7 +334,7 @@ mod test {
         if prefill {
         if prefill {
             let events = include_str!("../tests/events.json")
             let events = include_str!("../tests/events.json")
                 .lines()
                 .lines()
-                .map(|line| serde_json::from_str(&line).expect("valid"))
+                .map(|line| serde_json::from_str(line).expect("valid"))
                 .collect::<Vec<Event>>();
                 .collect::<Vec<Event>>();
 
 
             for event in events {
             for event in events {
@@ -389,7 +403,7 @@ mod test {
         ]))
         ]))
         .expect("valid object");
         .expect("valid object");
         let relayer = Relayer::new(Some(get_db(true).await), None).expect("valid relayer");
         let relayer = Relayer::new(Some(get_db(true).await), None).expect("valid relayer");
-        let (connection, mut recv) = Connection::new_for_test();
+        let (connection, mut recv) = Connection::new_local_connection();
         let _ = relayer
         let _ = relayer
             .process_request_from_client(&connection, request)
             .process_request_from_client(&connection, request)
             .await;
             .await;
@@ -493,7 +507,7 @@ mod test {
         ]))
         ]))
         .expect("valid object");
         .expect("valid object");
         let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
         let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
-        let (connection, mut recv) = Connection::new_for_test();
+        let (connection, mut recv) = Connection::new_local_connection();
 
 
         assert_eq!(relayer.total_subscribers(), 0);
         assert_eq!(relayer.total_subscribers(), 0);
         let _ = relayer
         let _ = relayer
@@ -557,7 +571,7 @@ mod test {
         .expect("valid object");
         .expect("valid object");
 
 
         let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
         let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
-        let (connection, mut recv) = Connection::new_for_test();
+        let (connection, mut recv) = Connection::new_local_connection();
 
 
         assert_eq!(relayer.total_subscribers(), 0);
         assert_eq!(relayer.total_subscribers(), 0);
         let _ = relayer
         let _ = relayer
@@ -620,14 +634,14 @@ mod test {
         .expect("valid object");
         .expect("valid object");
 
 
         let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
         let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
-        let (publisher, _) = Connection::new_for_test();
+        let (publisher, _) = Connection::new_local_connection();
 
 
         let mut set1 = (0..1000)
         let mut set1 = (0..1000)
-            .map(|_| Connection::new_for_test())
+            .map(|_| Connection::new_local_connection())
             .collect::<Vec<_>>();
             .collect::<Vec<_>>();
 
 
         let mut set2 = (0..100)
         let mut set2 = (0..100)
-            .map(|_| Connection::new_for_test())
+            .map(|_| Connection::new_local_connection())
             .collect::<Vec<_>>();
             .collect::<Vec<_>>();
 
 
         let subscribe1 = set1
         let subscribe1 = set1
@@ -712,7 +726,7 @@ mod test {
             serde_json::from_value(json!(["REQ", "1298169700973717", {}])).expect("valid object");
             serde_json::from_value(json!(["REQ", "1298169700973717", {}])).expect("valid object");
 
 
         let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
         let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
-        let (connection, mut recv) = Connection::new_for_test();
+        let (connection, mut recv) = Connection::new_local_connection();
 
 
         assert_eq!(relayer.total_subscribers(), 0);
         assert_eq!(relayer.total_subscribers(), 0);
         let _ = relayer
         let _ = relayer

+ 10 - 12
crates/relayer/src/subscription/filter.rs

@@ -4,6 +4,7 @@ use std::collections::BTreeSet;
 /// The subscription keys are used to quickly identify the subscriptions
 /// The subscription keys are used to quickly identify the subscriptions
 /// by one or more fields. Think of it like a database index
 /// by one or more fields. Think of it like a database index
 #[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Hash)]
 #[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Hash)]
+#[allow(clippy::enum_variant_names)]
 pub(crate) enum Key {
 pub(crate) enum Key {
     /// Key for the author field
     /// Key for the author field
     Author(Vec<u8>, Option<Kind>),
     Author(Vec<u8>, Option<Kind>),
@@ -81,27 +82,27 @@ impl SortedFilter {
         let authors = self
         let authors = self
             .authors
             .authors
             .as_ref()
             .as_ref()
-            .map_or_else(|| vec![], |x| x.iter().map(|x| x.clone()).collect());
+            .map_or_else(std::vec::Vec::new, |x| x.iter().cloned().collect());
 
 
         let ids = self
         let ids = self
             .ids
             .ids
             .as_ref()
             .as_ref()
-            .map_or_else(|| vec![], |x| x.iter().map(|x| x.clone()).collect());
+            .map_or_else(std::vec::Vec::new, |x| x.iter().cloned().collect());
 
 
         let references_to_event = self
         let references_to_event = self
             .references_to_event
             .references_to_event
             .as_ref()
             .as_ref()
-            .map_or_else(|| vec![], |x| x.iter().map(|x| x.clone()).collect());
+            .map_or_else(std::vec::Vec::new, |x| x.iter().cloned().collect());
 
 
         let references_to_public_key = self
         let references_to_public_key = self
             .references_to_public_key
             .references_to_public_key
             .as_ref()
             .as_ref()
-            .map_or_else(|| vec![], |x| x.iter().map(|x| x.clone()).collect());
+            .map_or_else(std::vec::Vec::new, |x| x.iter().cloned().collect());
 
 
         let kinds = self
         let kinds = self
             .kinds
             .kinds
             .as_ref()
             .as_ref()
-            .map_or_else(|| vec![], |x| x.iter().map(|x| *x).collect());
+            .map_or_else(std::vec::Vec::new, |x| x.iter().copied().collect());
 
 
         let kind_option = if kinds.is_empty() {
         let kind_option = if kinds.is_empty() {
             vec![None]
             vec![None]
@@ -109,7 +110,7 @@ impl SortedFilter {
             kinds.clone().into_iter().map(Some).collect()
             kinds.clone().into_iter().map(Some).collect()
         };
         };
 
 
-        let keys = vec![
+        let keys = [
             authors
             authors
                 .into_iter()
                 .into_iter()
                 .map(|author| {
                 .map(|author| {
@@ -137,11 +138,8 @@ impl SortedFilter {
                         .collect::<Vec<_>>()
                         .collect::<Vec<_>>()
                 })
                 })
                 .collect::<Vec<_>>(),
                 .collect::<Vec<_>>(),
-            vec![kinds
-                .into_iter()
-                .map(|kind| Key::Kind(kind))
-                .collect::<Vec<_>>()],
-            vec![ids.into_iter().map(|id| Key::Id(id)).collect::<Vec<_>>()],
+            vec![kinds.into_iter().map(Key::Kind).collect::<Vec<_>>()],
+            vec![ids.into_iter().map(Key::Id).collect::<Vec<_>>()],
         ]
         ]
         .concat()
         .concat()
         .concat();
         .concat();
@@ -198,7 +196,7 @@ impl SortedFilter {
             .as_ref()
             .as_ref()
             .map_or(true, |kinds| kinds.contains(&event.kind()))
             .map_or(true, |kinds| kinds.contains(&event.kind()))
             && Self::has_key_or_partial_match(&event.id, &self.ids)
             && Self::has_key_or_partial_match(&event.id, &self.ids)
-            && Self::has_key_or_partial_match(&event.author(), &self.authors)
+            && Self::has_key_or_partial_match(event.author(), &self.authors)
             && Self::has_any_key_or_partial_match(
             && Self::has_any_key_or_partial_match(
                 event.tags().iter().filter_map(|f| match f {
                 event.tags().iter().filter_map(|f| match f {
                     Tag::Event(x) => Some(x.id.as_ref().to_vec()),
                     Tag::Event(x) => Some(x.id.as_ref().to_vec()),

+ 5 - 3
crates/relayer/src/subscription/manager.rs

@@ -65,6 +65,8 @@ impl Drop for ActiveSubscription {
     }
     }
 }
 }
 
 
+type SubscriptionValue = (Arc<SortedFilter>, Sender<Response>);
+
 /// Subscription manager
 /// Subscription manager
 ///
 ///
 /// This object is responsible for letting clients and processes subscribe to
 /// This object is responsible for letting clients and processes subscribe to
@@ -75,7 +77,7 @@ pub struct SubscriptionManager {
     ///
     ///
     /// A single request may be converted to multiple subscriptions entry as
     /// A single request may be converted to multiple subscriptions entry as
     /// they are sorted by their index/key
     /// they are sorted by their index/key
-    subscriptions: RwLock<BTreeMap<SubIdx, (Arc<SortedFilter>, Sender<Response>)>>,
+    subscriptions: RwLock<BTreeMap<SubIdx, SubscriptionValue>>,
     /// Total number of subscribers
     /// Total number of subscribers
     /// A single REQ may have multiple subscriptions
     /// A single REQ may have multiple subscriptions
     total_subscribers: AtomicUsize,
     total_subscribers: AtomicUsize,
@@ -145,7 +147,7 @@ impl SubscriptionManager {
             subscriptions.push(Key::Id(id[..len - i].to_vec()));
             subscriptions.push(Key::Id(id[..len - i].to_vec()));
         }
         }
 
 
-        subscriptions.push(Key::Kind(event.kind().into()));
+        subscriptions.push(Key::Kind(event.kind()));
         subscriptions.push(Key::AllUpdates);
         subscriptions.push(Key::AllUpdates);
 
 
         subscriptions
         subscriptions
@@ -214,7 +216,7 @@ impl SubscriptionManager {
                     }
                     }
 
 
                     let _ = sender.try_send(Response::Event((name, &event).into()));
                     let _ = sender.try_send(Response::Event((name, &event).into()));
-                    deliverded.insert(client.clone());
+                    deliverded.insert(*client);
                 }
                 }
             }
             }
         });
         });

+ 1 - 1
crates/types/src/types/signature.rs

@@ -29,7 +29,7 @@ pub struct Signature(pub [u8; 64]);
 
 
 impl From<secp256k1::schnorr::Signature> for Signature {
 impl From<secp256k1::schnorr::Signature> for Signature {
     fn from(signature: secp256k1::schnorr::Signature) -> Self {
     fn from(signature: secp256k1::schnorr::Signature) -> Self {
-        Self(signature.as_ref().clone())
+        Self(*signature.as_ref())
     }
     }
 }
 }