Procházet zdrojové kódy

Working on test suite

* [x] Test connection pool
* [x] Reconnection and resubscriptions
* [x] Connect to multiple relayers
Cesar Rodas před 3 měsíci
rodič
revize
5334e2a8a5

+ 2 - 0
Cargo.lock

@@ -1305,7 +1305,9 @@ version = "0.26.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4124a35fe33ae14259c490fd70fa199a32b9ce9502f2ee6bc4f81ec06fa65894"
 dependencies = [
+ "rand",
  "secp256k1-sys",
+ "serde",
 ]
 
 [[package]]

+ 26 - 12
crates/client/src/client.rs

@@ -119,7 +119,7 @@ impl Client {
                     Ok(x) => x.0,
                     Err(err) => {
                         log::warn!("{}: Failed to connect: {}", url, err);
-                        sleep(Duration::from_secs(5)).await;
+                        sleep(Duration::from_secs(1)).await;
                         continue;
                     }
                 };
@@ -127,23 +127,39 @@ impl Client {
                 log::info!("Connected to {}", url);
                 connection_attempts = 0;
 
-                let subscriptions = send_on_connection
+                let mut subscriptions = send_on_connection
                     .read()
                     .await
                     .iter()
-                    .filter_map(|x| serde_json::to_string(&Request::Request(x.1.clone())).ok())
-                    .map(Message::Text)
-                    .collect::<Vec<_>>();
-
-                for msg in subscriptions {
-                    if let Err(x) = socket.send(msg).await {
-                        log::error!("{}: Reconnecting due error at sending: {:?}", url, x);
+                    .map(|(sub_id, msg)| {
+                        (
+                            sub_id.to_owned(),
+                            serde_json::to_string(&Request::Request(msg.clone()))
+                                .ok()
+                                .map(Message::Text),
+                        )
+                    })
+                    .collect::<HashMap<_, _>>();
+
+                for msg in subscriptions.values_mut() {
+                    if let Some(msg) = msg.take() {
+                        if let Err(x) = socket.send(msg).await {
+                            log::error!("{}: Reconnecting due error at sending: {:?}", url, x);
+                        }
                     }
                 }
 
+                is_connected.store(true, Relaxed);
+
                 loop {
                     tokio::select! {
                         Some(msg) = send_to_socket.recv() => {
+                            if let Request::Request(sub) = &msg {
+                                if subscriptions.get(&sub.subscription_id).is_some() {
+                                    log::warn!("{}: Already subscribed to {}", url, sub.subscription_id);
+                                    continue;
+                                }
+                            }
                             if let Ok(json) = serde_json::to_string(&msg) {
                                 log::info!("{}: Sending {}", url, json);
                                 if let Err(x) = socket.send(Message::Text(json)).await {
@@ -154,7 +170,6 @@ impl Client {
                         }
                         msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
                             let msg = if let Ok(Some(Ok(msg))) = msg {
-                                is_connected.store(true, Relaxed);
                                     match msg {
                                         Message::Text(text) => text,
                                         Message::Ping(msg) => {
@@ -180,11 +195,10 @@ impl Client {
 
                             log::info!("New message: {}", msg);
 
-
                             let msg: Result<Response, _> = serde_json::from_str(&msg);
 
                             if let Ok(msg) = msg {
-                                if let Err(error) = send_message_to_listener.try_send((msg.into(), url.clone())) {
+                                if let Err(error) = send_message_to_listener.try_send((msg, url.clone())) {
                                     log::error!("{}: Reconnecting client because of {}", url, error);
                                     break;
                                 }

+ 185 - 13
crates/client/src/pool.rs

@@ -177,49 +177,221 @@ mod test {
     use super::*;
     use nostr_rs_memory::Memory;
     use nostr_rs_relayer::Relayer;
+    use nostr_rs_types::{account::Account, types::Content};
     use std::time::Duration;
     use tokio::{net::TcpListener, task::JoinHandle, time::sleep};
 
-    async fn dummy_server() -> (Url, JoinHandle<()>) {
-        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+    async fn dummy_server(port: u16) -> (Url, JoinHandle<()>) {
+        let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
+            .await
+            .unwrap();
         let local_addr = listener.local_addr().expect("addr");
 
         let relayer = Relayer::new(Some(Memory::default()), None).expect("valid dummy server");
         let stopper = relayer.main(listener).expect("valid main loop");
         (
-            Url::parse(&format!("ws://{}", local_addr.to_string())).expect("valid url"),
+            Url::parse(&format!("ws://{}", local_addr)).expect("valid url"),
             stopper,
         )
     }
 
     #[tokio::test]
     async fn droppable_subscription() {
-        let pool = Pool::default();
-        let subscription = pool
+        let client_pool = Pool::default();
+        let subscription = client_pool
             .subscribe(Default::default())
             .await
             .expect("valid subscription");
 
-        assert_eq!(pool.active_subscriptions().await, 1);
+        assert_eq!(client_pool.active_subscriptions().await, 1);
         drop(subscription);
         sleep(Duration::from_millis(10)).await;
-        assert_eq!(pool.active_subscriptions().await, 0);
+        assert_eq!(client_pool.active_subscriptions().await, 0);
     }
 
     #[tokio::test]
     async fn connect_to_dummy_server() {
-        let (addr, stopper) = dummy_server().await;
-        let pool = Pool::new_with_clients(vec![addr]);
+        let (addr, stopper) = dummy_server(0).await;
+        let client_pool = Pool::new_with_clients(vec![addr]);
 
-        assert_eq!(0, pool.check_active_connections().await);
+        assert_eq!(0, client_pool.check_active_connections().await);
 
-        sleep(Duration::from_millis(1000)).await;
-        assert_eq!(1, pool.check_active_connections().await);
+        sleep(Duration::from_millis(10)).await;
+        assert_eq!(1, client_pool.check_active_connections().await);
 
         // stop dummy server
         stopper.abort();
 
         sleep(Duration::from_millis(100)).await;
-        assert_eq!(0, pool.check_active_connections().await);
+        assert_eq!(0, client_pool.check_active_connections().await);
+    }
+
+    #[tokio::test]
+    async fn two_clients_communication() {
+        let (addr, _) = dummy_server(0).await;
+        let mut client_pool1 = Pool::new_with_clients(vec![addr.clone()]);
+        let client_pool2 = Pool::new_with_clients(vec![addr]);
+
+        let _sub1 = client_pool1
+            .subscribe(Default::default())
+            .await
+            .expect("valid subscription");
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert!(client_pool1
+            .try_recv()
+            .map(|(r, _)| r)
+            .expect("valid message")
+            .as_end_of_stored_events()
+            .is_some());
+        assert!(client_pool1.try_recv().is_none());
+
+        let account1 = Account::default();
+        let signed_content = account1
+            .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+            .expect("valid signed content");
+
+        client_pool2.post(signed_content.clone().into()).await;
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(
+            Some((signed_content.id, signed_content.signature)),
+            client_pool1
+                .try_recv()
+                .and_then(|(r, _)| r.as_event().cloned().map(|x| x.event))
+                .map(|x| (x.id, x.signature))
+        );
+        assert!(client_pool1.try_recv().is_none());
+    }
+
+    #[tokio::test]
+    async fn reconnect_and_resubscribe() {
+        let (addr, stopper) = dummy_server(0).await;
+        let mut client_pool1 = Pool::new_with_clients(vec![addr.clone()]);
+        let client_pool2 = Pool::new_with_clients(vec![addr.clone()]);
+
+        let _sub1 = client_pool1
+            .subscribe(Default::default())
+            .await
+            .expect("valid subscription");
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert!(client_pool1
+            .try_recv()
+            .map(|(r, _)| r)
+            .expect("valid message")
+            .as_end_of_stored_events()
+            .is_some());
+        assert!(client_pool1.try_recv().is_none());
+
+        let account1 = Account::default();
+        let signed_content = account1
+            .sign_content(vec![], Content::ShortTextNote("test 1".to_owned()), None)
+            .expect("valid signed content");
+
+        client_pool2.post(signed_content.clone().into()).await;
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(
+            Some((signed_content.id, signed_content.signature)),
+            client_pool1
+                .try_recv()
+                .and_then(|(r, _)| r.as_event().cloned().map(|x| x.event))
+                .map(|x| (x.id, x.signature))
+        );
+        assert!(client_pool1.try_recv().is_none());
+
+        assert_eq!(1, client_pool1.check_active_connections().await);
+        assert_eq!(1, client_pool2.check_active_connections().await);
+
+        stopper.abort();
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(0, client_pool1.check_active_connections().await);
+        assert_eq!(0, client_pool2.check_active_connections().await);
+
+        let (_, stopper) = dummy_server(addr.port().expect("port")).await;
+
+        sleep(Duration::from_millis(2_000)).await;
+
+        assert_eq!(1, client_pool1.check_active_connections().await);
+        assert_eq!(1, client_pool2.check_active_connections().await);
+
+        let signed_content = account1
+            .sign_content(vec![], Content::ShortTextNote("test 1".to_owned()), None)
+            .expect("valid signed content");
+
+        client_pool2.post(signed_content.clone().into()).await;
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert!(client_pool1
+            .try_recv()
+            .map(|(r, _)| r)
+            .expect("valid message")
+            .as_end_of_stored_events()
+            .is_some());
+        assert_eq!(
+            Some((signed_content.id, signed_content.signature)),
+            client_pool1
+                .try_recv()
+                .and_then(|(r, _)| r.as_event().cloned().map(|x| x.event))
+                .map(|x| (x.id, x.signature))
+        );
+        assert!(client_pool1.try_recv().is_none());
+
+        stopper.abort();
+    }
+
+    #[tokio::test]
+    async fn connect_multiple_servers() {
+        let (addr1, _) = dummy_server(0).await;
+        let (addr2, _) = dummy_server(0).await;
+        let mut client_pool1 = Pool::new_with_clients(vec![addr1.clone(), addr2]);
+        let client_pool2 = Pool::new_with_clients(vec![addr1]);
+
+        let _sub1 = client_pool1
+            .subscribe(Default::default())
+            .await
+            .expect("valid subscription");
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert!(client_pool1
+            .try_recv()
+            .map(|(r, _)| r)
+            .expect("valid message")
+            .as_end_of_stored_events()
+            .is_some());
+
+        assert!(client_pool1
+            .try_recv()
+            .map(|(r, _)| r)
+            .expect("valid message")
+            .as_end_of_stored_events()
+            .is_some());
+
+        let account1 = Account::default();
+        let signed_content = account1
+            .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+            .expect("valid signed content");
+
+        client_pool2.post(signed_content.clone().into()).await;
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(
+            Some((signed_content.id, signed_content.signature)),
+            client_pool1
+                .try_recv()
+                .and_then(|(r, _)| r.as_event().cloned().map(|x| x.event))
+                .map(|x| (x.id, x.signature))
+        );
+        assert!(client_pool1.try_recv().is_none());
     }
 }

+ 6 - 7
crates/relayer/src/connection.rs

@@ -1,7 +1,7 @@
 use crate::{subscription::ActiveSubscription, Error};
 use futures_util::{SinkExt, StreamExt};
 use nostr_rs_types::{
-    relayer::{Auth, ROk},
+    relayer::ROk,
     types::{Addr, SubscriptionId},
     Request, Response,
 };
@@ -92,7 +92,6 @@ impl Connection {
         let websocket = accept_async(stream).await?;
         let conn_id = Default::default();
         let (sender, receiver) = channel(MAX_SUBSCRIPTIONS_BUFFER);
-        let _ = sender.send(Auth::default().into()).await;
         Ok(Self {
             conn_id,
             sender,
@@ -107,6 +106,7 @@ impl Connection {
         })
     }
 
+    /// Spawn a new worker for this connection
     fn spawn(
         send_message_to_relayer: Sender<(ConnectionId, Request)>,
         websocket: WebSocketStream<TcpStream>,
@@ -115,8 +115,7 @@ impl Connection {
         conn_id: ConnectionId,
     ) -> JoinHandle<()> {
         tokio::spawn(async move {
-            let mut _subscriptions: HashMap<String, (u128, Receiver<Response>)> = HashMap::new();
-            let (mut writer, mut reader) = websocket.split();
+            let (mut ws_sender, mut ws_receiver) = websocket.split();
             loop {
                 tokio::select! {
                     Some(msg) = receiver.recv() => {
@@ -125,12 +124,12 @@ impl Connection {
                         } else {
                             continue;
                         };
-                        if let Err(err) =  writer.send(Message::Text(msg)).await {
+                        if let Err(err) =  ws_sender.send(Message::Text(msg)).await {
                             log::error!("Error sending message to client: {}", err);
                             break;
                         }
                     }
-                    Some(msg) = reader.next() => {
+                    Some(msg) = ws_receiver.next() => {
                         if let Ok(Message::Text(msg)) = msg {
                             let msg: Result<Request, _> = serde_json::from_str(&msg);
                             match msg {
@@ -149,7 +148,7 @@ impl Connection {
                                     } else {
                                         continue;
                                     };
-                                    if let Err(err) =  writer.send(Message::Text(reply)).await {
+                                    if let Err(err) =  ws_sender.send(Message::Text(reply)).await {
                                         log::error!("Error sending message to client: {}", err);
                                         break;
                                     }

+ 0 - 4
crates/relayer/src/subscription/manager.rs

@@ -95,14 +95,11 @@ impl Default for SubscriptionManager {
 
 impl SubscriptionManager {
     async fn unsubscribe(self: Arc<Self>, keys: Vec<SubIdx>) {
-        println!("block");
         let mut subscriptions = self.subscriptions.write().await;
-        println!("\tblocked");
         for sub in keys {
             subscriptions.remove(&sub);
         }
         self.total_subscribers.fetch_sub(1, Ordering::Relaxed);
-        println!("released");
     }
 
     fn get_keys_from_event(event: &Event, min_prefix_match_len: usize) -> Vec<Key> {
@@ -215,7 +212,6 @@ impl SubscriptionManager {
                     if deliverded.contains(client) || !filter.check(&event) {
                         continue;
                     }
-                    println!("send");
 
                     let _ = sender.try_send(Response::Event((name, &event).into()));
                     deliverded.insert(client.clone());

+ 5 - 1
crates/types/Cargo.toml

@@ -13,7 +13,11 @@ custom_derive = "0.1.7"
 enum_derive = "0.1.7"
 hex = "0.4"
 rand = "0.8.5"
-secp256k1 = { version = "0.26.0", features = ["global-context"] }
+secp256k1 = { version = "0.26.0", features = [
+    "global-context",
+    "rand",
+    "serde",
+] }
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 sha2 = "0.10.6"

+ 59 - 0
crates/types/src/account.rs

@@ -0,0 +1,59 @@
+//!  Account related types
+use crate::types::{event::Error, Content, Event, Tag, UnsignedEvent};
+use chrono::{DateTime, Utc};
+use rand::rngs::OsRng;
+use secp256k1::{KeyPair, Message, Secp256k1, SecretKey, XOnlyPublicKey};
+
+/// Nostr User account
+#[derive(Debug, Clone)]
+pub struct Account(KeyPair);
+
+impl Account {
+    /// Get the public key from a user
+    pub fn public_key(&self) -> XOnlyPublicKey {
+        self.0.x_only_public_key().0
+    }
+
+    /// Creates a new event with the given content
+    pub fn sign_content(
+        &self,
+        tags: Vec<Tag>,
+        content: Content,
+        created_at: Option<DateTime<Utc>>,
+    ) -> Result<Event, Error> {
+        let unsigned_event =
+            UnsignedEvent::new(self.public_key().into(), tags, content, created_at)?;
+        let id = unsigned_event.id()?;
+        let to_sign = Message::from_slice(&*id)?;
+        let secp = Secp256k1::new();
+        let signature = secp.sign_schnorr_no_aux_rand(&to_sign, &self.0);
+
+        Event::new(unsigned_event, signature.into())
+    }
+}
+
+impl From<KeyPair> for Account {
+    fn from(keypair: KeyPair) -> Self {
+        Self(keypair)
+    }
+}
+
+impl From<&SecretKey> for Account {
+    fn from(secret_key: &SecretKey) -> Self {
+        Self(KeyPair::from_secret_key(&Secp256k1::new(), secret_key))
+    }
+}
+
+impl From<SecretKey> for Account {
+    fn from(secret_key: SecretKey) -> Self {
+        Self(KeyPair::from_secret_key(&Secp256k1::new(), &secret_key))
+    }
+}
+
+impl Default for Account {
+    fn default() -> Self {
+        let secp = Secp256k1::new();
+        let (secret_key, _) = secp.generate_keypair(&mut OsRng);
+        secret_key.into()
+    }
+}

+ 1 - 0
crates/types/src/lib.rs

@@ -3,6 +3,7 @@
 //! The types needed to interact with a Nostr relayer, or to be become one.
 #![deny(missing_docs, warnings)]
 
+pub mod account;
 pub mod client;
 pub mod common;
 pub mod relayer;

+ 7 - 0
crates/types/src/types/id.rs

@@ -1,4 +1,5 @@
 //! This mod wraps the event Ids
+use secp256k1::XOnlyPublicKey;
 use serde::{
     de::{self, Deserializer},
     ser::{self, Serializer},
@@ -46,6 +47,12 @@ impl TryFrom<String> for Id {
     }
 }
 
+impl From<XOnlyPublicKey> for Id {
+    fn from(public_key: XOnlyPublicKey) -> Self {
+        Self(public_key.serialize())
+    }
+}
+
 impl<'de> Deserialize<'de> for Id {
     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     where

+ 6 - 0
crates/types/src/types/signature.rs

@@ -27,6 +27,12 @@ pub enum Error {
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Signature(pub [u8; 64]);
 
+impl From<secp256k1::schnorr::Signature> for Signature {
+    fn from(signature: secp256k1::schnorr::Signature) -> Self {
+        Self(signature.as_ref().clone())
+    }
+}
+
 impl AsRef<[u8]> for Signature {
     fn as_ref(&self) -> &[u8] {
         &self.0