Jelajahi Sumber

Merge branch 'client-test-suite' of cesar/nostr-prototype into main

Cesar Rodas 3 bulan lalu
induk
melakukan
e1382b02cc

+ 2 - 0
Cargo.lock

@@ -1305,7 +1305,9 @@ version = "0.26.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4124a35fe33ae14259c490fd70fa199a32b9ce9502f2ee6bc4f81ec06fa65894"
 dependencies = [
+ "rand",
  "secp256k1-sys",
+ "serde",
 ]
 
 [[package]]

+ 26 - 12
crates/client/src/client.rs

@@ -119,7 +119,7 @@ impl Client {
                     Ok(x) => x.0,
                     Err(err) => {
                         log::warn!("{}: Failed to connect: {}", url, err);
-                        sleep(Duration::from_secs(5)).await;
+                        sleep(Duration::from_secs(1)).await;
                         continue;
                     }
                 };
@@ -127,23 +127,39 @@ impl Client {
                 log::info!("Connected to {}", url);
                 connection_attempts = 0;
 
-                let subscriptions = send_on_connection
+                let mut subscriptions = send_on_connection
                     .read()
                     .await
                     .iter()
-                    .filter_map(|x| serde_json::to_string(&Request::Request(x.1.clone())).ok())
-                    .map(Message::Text)
-                    .collect::<Vec<_>>();
-
-                for msg in subscriptions {
-                    if let Err(x) = socket.send(msg).await {
-                        log::error!("{}: Reconnecting due error at sending: {:?}", url, x);
+                    .map(|(sub_id, msg)| {
+                        (
+                            sub_id.to_owned(),
+                            serde_json::to_string(&Request::Request(msg.clone()))
+                                .ok()
+                                .map(Message::Text),
+                        )
+                    })
+                    .collect::<HashMap<_, _>>();
+
+                for msg in subscriptions.values_mut() {
+                    if let Some(msg) = msg.take() {
+                        if let Err(x) = socket.send(msg).await {
+                            log::error!("{}: Reconnecting due error at sending: {:?}", url, x);
+                        }
                     }
                 }
 
+                is_connected.store(true, Relaxed);
+
                 loop {
                     tokio::select! {
                         Some(msg) = send_to_socket.recv() => {
+                            if let Request::Request(sub) = &msg {
+                                if subscriptions.get(&sub.subscription_id).is_some() {
+                                    log::warn!("{}: Already subscribed to {}", url, sub.subscription_id);
+                                    continue;
+                                }
+                            }
                             if let Ok(json) = serde_json::to_string(&msg) {
                                 log::info!("{}: Sending {}", url, json);
                                 if let Err(x) = socket.send(Message::Text(json)).await {
@@ -154,7 +170,6 @@ impl Client {
                         }
                         msg = timeout(Duration::from_secs(NO_ACTIVITY_TIMEOUT_SECS), socket.next()) => {
                             let msg = if let Ok(Some(Ok(msg))) = msg {
-                                is_connected.store(true, Relaxed);
                                     match msg {
                                         Message::Text(text) => text,
                                         Message::Ping(msg) => {
@@ -180,11 +195,10 @@ impl Client {
 
                             log::info!("New message: {}", msg);
 
-
                             let msg: Result<Response, _> = serde_json::from_str(&msg);
 
                             if let Ok(msg) = msg {
-                                if let Err(error) = send_message_to_listener.try_send((msg.into(), url.clone())) {
+                                if let Err(error) = send_message_to_listener.try_send((msg, url.clone())) {
                                     log::error!("{}: Reconnecting client because of {}", url, error);
                                     break;
                                 }

+ 5 - 1
crates/client/src/lib.rs

@@ -14,4 +14,8 @@ mod pool;
 
 pub use url::Url;
 
-pub use self::{client::Client, error::Error, pool::Pool};
+pub use self::{
+    client::Client,
+    error::Error,
+    pool::{Pool, PoolSubscription},
+};

+ 186 - 13
crates/client/src/pool.rs

@@ -43,6 +43,7 @@ impl Default for Pool {
 }
 
 /// Return a subscription that will be removed when dropped
+#[derive(Debug)]
 pub struct PoolSubscription {
     subscription_id: SubscriptionId,
     subscriptions: Subscriptions,
@@ -177,49 +178,221 @@ mod test {
     use super::*;
     use nostr_rs_memory::Memory;
     use nostr_rs_relayer::Relayer;
+    use nostr_rs_types::{account::Account, types::Content};
     use std::time::Duration;
     use tokio::{net::TcpListener, task::JoinHandle, time::sleep};
 
-    async fn dummy_server() -> (Url, JoinHandle<()>) {
-        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+    async fn dummy_server(port: u16) -> (Url, JoinHandle<()>) {
+        let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
+            .await
+            .unwrap();
         let local_addr = listener.local_addr().expect("addr");
 
         let relayer = Relayer::new(Some(Memory::default()), None).expect("valid dummy server");
         let stopper = relayer.main(listener).expect("valid main loop");
         (
-            Url::parse(&format!("ws://{}", local_addr.to_string())).expect("valid url"),
+            Url::parse(&format!("ws://{}", local_addr)).expect("valid url"),
             stopper,
         )
     }
 
     #[tokio::test]
     async fn droppable_subscription() {
-        let pool = Pool::default();
-        let subscription = pool
+        let client_pool = Pool::default();
+        let subscription = client_pool
             .subscribe(Default::default())
             .await
             .expect("valid subscription");
 
-        assert_eq!(pool.active_subscriptions().await, 1);
+        assert_eq!(client_pool.active_subscriptions().await, 1);
         drop(subscription);
         sleep(Duration::from_millis(10)).await;
-        assert_eq!(pool.active_subscriptions().await, 0);
+        assert_eq!(client_pool.active_subscriptions().await, 0);
     }
 
     #[tokio::test]
     async fn connect_to_dummy_server() {
-        let (addr, stopper) = dummy_server().await;
-        let pool = Pool::new_with_clients(vec![addr]);
+        let (addr, stopper) = dummy_server(0).await;
+        let client_pool = Pool::new_with_clients(vec![addr]);
 
-        assert_eq!(0, pool.check_active_connections().await);
+        assert_eq!(0, client_pool.check_active_connections().await);
 
-        sleep(Duration::from_millis(1000)).await;
-        assert_eq!(1, pool.check_active_connections().await);
+        sleep(Duration::from_millis(10)).await;
+        assert_eq!(1, client_pool.check_active_connections().await);
 
         // stop dummy server
         stopper.abort();
 
         sleep(Duration::from_millis(100)).await;
-        assert_eq!(0, pool.check_active_connections().await);
+        assert_eq!(0, client_pool.check_active_connections().await);
+    }
+
+    #[tokio::test]
+    async fn two_clients_communication() {
+        let (addr, _) = dummy_server(0).await;
+        let mut client_pool1 = Pool::new_with_clients(vec![addr.clone()]);
+        let client_pool2 = Pool::new_with_clients(vec![addr]);
+
+        let _sub1 = client_pool1
+            .subscribe(Default::default())
+            .await
+            .expect("valid subscription");
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert!(client_pool1
+            .try_recv()
+            .map(|(r, _)| r)
+            .expect("valid message")
+            .as_end_of_stored_events()
+            .is_some());
+        assert!(client_pool1.try_recv().is_none());
+
+        let account1 = Account::default();
+        let signed_content = account1
+            .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+            .expect("valid signed content");
+
+        client_pool2.post(signed_content.clone().into()).await;
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(
+            Some((signed_content.id, signed_content.signature)),
+            client_pool1
+                .try_recv()
+                .and_then(|(r, _)| r.as_event().cloned().map(|x| x.event))
+                .map(|x| (x.id, x.signature))
+        );
+        assert!(client_pool1.try_recv().is_none());
+    }
+
+    #[tokio::test]
+    async fn reconnect_and_resubscribe() {
+        let (addr, stopper) = dummy_server(0).await;
+        let mut client_pool1 = Pool::new_with_clients(vec![addr.clone()]);
+        let client_pool2 = Pool::new_with_clients(vec![addr.clone()]);
+
+        let _sub1 = client_pool1
+            .subscribe(Default::default())
+            .await
+            .expect("valid subscription");
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert!(client_pool1
+            .try_recv()
+            .map(|(r, _)| r)
+            .expect("valid message")
+            .as_end_of_stored_events()
+            .is_some());
+        assert!(client_pool1.try_recv().is_none());
+
+        let account1 = Account::default();
+        let signed_content = account1
+            .sign_content(vec![], Content::ShortTextNote("test 1".to_owned()), None)
+            .expect("valid signed content");
+
+        client_pool2.post(signed_content.clone().into()).await;
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(
+            Some((signed_content.id, signed_content.signature)),
+            client_pool1
+                .try_recv()
+                .and_then(|(r, _)| r.as_event().cloned().map(|x| x.event))
+                .map(|x| (x.id, x.signature))
+        );
+        assert!(client_pool1.try_recv().is_none());
+
+        assert_eq!(1, client_pool1.check_active_connections().await);
+        assert_eq!(1, client_pool2.check_active_connections().await);
+
+        stopper.abort();
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(0, client_pool1.check_active_connections().await);
+        assert_eq!(0, client_pool2.check_active_connections().await);
+
+        let (_, stopper) = dummy_server(addr.port().expect("port")).await;
+
+        sleep(Duration::from_millis(2_000)).await;
+
+        assert_eq!(1, client_pool1.check_active_connections().await);
+        assert_eq!(1, client_pool2.check_active_connections().await);
+
+        let signed_content = account1
+            .sign_content(vec![], Content::ShortTextNote("test 1".to_owned()), None)
+            .expect("valid signed content");
+
+        client_pool2.post(signed_content.clone().into()).await;
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert!(client_pool1
+            .try_recv()
+            .map(|(r, _)| r)
+            .expect("valid message")
+            .as_end_of_stored_events()
+            .is_some());
+        assert_eq!(
+            Some((signed_content.id, signed_content.signature)),
+            client_pool1
+                .try_recv()
+                .and_then(|(r, _)| r.as_event().cloned().map(|x| x.event))
+                .map(|x| (x.id, x.signature))
+        );
+        assert!(client_pool1.try_recv().is_none());
+
+        stopper.abort();
+    }
+
+    #[tokio::test]
+    async fn connect_multiple_servers() {
+        let (addr1, _) = dummy_server(0).await;
+        let (addr2, _) = dummy_server(0).await;
+        let mut client_pool1 = Pool::new_with_clients(vec![addr1.clone(), addr2]);
+        let client_pool2 = Pool::new_with_clients(vec![addr1]);
+
+        let _sub1 = client_pool1
+            .subscribe(Default::default())
+            .await
+            .expect("valid subscription");
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert!(client_pool1
+            .try_recv()
+            .map(|(r, _)| r)
+            .expect("valid message")
+            .as_end_of_stored_events()
+            .is_some());
+
+        assert!(client_pool1
+            .try_recv()
+            .map(|(r, _)| r)
+            .expect("valid message")
+            .as_end_of_stored_events()
+            .is_some());
+
+        let account1 = Account::default();
+        let signed_content = account1
+            .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+            .expect("valid signed content");
+
+        client_pool2.post(signed_content.clone().into()).await;
+
+        sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(
+            Some((signed_content.id, signed_content.signature)),
+            client_pool1
+                .try_recv()
+                .and_then(|(r, _)| r.as_event().cloned().map(|x| x.event))
+                .map(|x| (x.id, x.signature))
+        );
+        assert!(client_pool1.try_recv().is_none());
     }
 }

+ 18 - 12
crates/relayer/src/connection.rs

@@ -1,7 +1,8 @@
 use crate::{subscription::ActiveSubscription, Error};
 use futures_util::{SinkExt, StreamExt};
+use nostr_rs_client::PoolSubscription;
 use nostr_rs_types::{
-    relayer::{Auth, ROk},
+    relayer::ROk,
     types::{Addr, SubscriptionId},
     Request, Response,
 };
@@ -54,7 +55,8 @@ impl ConnectionId {
 pub struct Connection {
     conn_id: ConnectionId,
     sender: Sender<Response>,
-    subscriptions: RwLock<HashMap<SubscriptionId, Vec<ActiveSubscription>>>,
+    subscriptions:
+        RwLock<HashMap<SubscriptionId, (Option<PoolSubscription>, Vec<ActiveSubscription>)>>,
     handler: Option<JoinHandle<()>>,
 }
 
@@ -76,7 +78,7 @@ impl Connection {
             Self {
                 conn_id: ConnectionId::default(),
                 sender,
-                subscriptions: RwLock::new(HashMap::new()),
+                subscriptions: Default::default(),
                 handler: None,
             },
             receiver,
@@ -92,11 +94,10 @@ impl Connection {
         let websocket = accept_async(stream).await?;
         let conn_id = Default::default();
         let (sender, receiver) = channel(MAX_SUBSCRIPTIONS_BUFFER);
-        let _ = sender.send(Auth::default().into()).await;
         Ok(Self {
             conn_id,
             sender,
-            subscriptions: RwLock::new(HashMap::new()),
+            subscriptions: Default::default(),
             handler: Some(Self::spawn(
                 send_message_to_relayer,
                 websocket,
@@ -107,6 +108,7 @@ impl Connection {
         })
     }
 
+    /// Spawn a new worker for this connection
     fn spawn(
         send_message_to_relayer: Sender<(ConnectionId, Request)>,
         websocket: WebSocketStream<TcpStream>,
@@ -115,8 +117,7 @@ impl Connection {
         conn_id: ConnectionId,
     ) -> JoinHandle<()> {
         tokio::spawn(async move {
-            let mut _subscriptions: HashMap<String, (u128, Receiver<Response>)> = HashMap::new();
-            let (mut writer, mut reader) = websocket.split();
+            let (mut ws_sender, mut ws_receiver) = websocket.split();
             loop {
                 tokio::select! {
                     Some(msg) = receiver.recv() => {
@@ -125,12 +126,12 @@ impl Connection {
                         } else {
                             continue;
                         };
-                        if let Err(err) =  writer.send(Message::Text(msg)).await {
+                        if let Err(err) =  ws_sender.send(Message::Text(msg)).await {
                             log::error!("Error sending message to client: {}", err);
                             break;
                         }
                     }
-                    Some(msg) = reader.next() => {
+                    Some(msg) = ws_receiver.next() => {
                         if let Ok(Message::Text(msg)) = msg {
                             let msg: Result<Request, _> = serde_json::from_str(&msg);
                             match msg {
@@ -149,7 +150,7 @@ impl Connection {
                                     } else {
                                         continue;
                                     };
-                                    if let Err(err) =  writer.send(Message::Text(reply)).await {
+                                    if let Err(err) =  ws_sender.send(Message::Text(reply)).await {
                                         log::error!("Error sending message to client: {}", err);
                                         break;
                                     }
@@ -188,11 +189,16 @@ impl Connection {
     }
 
     /// Create a subscription for this connection
-    pub async fn keep_track_subscription(
+    pub async fn subscribe(
         &self,
         id: SubscriptionId,
-        subscriptions: Vec<ActiveSubscription>,
+        subscriptions: (Option<PoolSubscription>, Vec<ActiveSubscription>),
     ) {
         self.subscriptions.write().await.insert(id, subscriptions);
     }
+
+    /// Remove a subscription for this connection
+    pub async fn unsubscribe(&self, id: &SubscriptionId) {
+        self.subscriptions.write().await.remove(id);
+    }
 }

+ 174 - 14
crates/relayer/src/relayer.rs

@@ -122,6 +122,9 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
         }))
     }
 
+    /// Handle the client pool
+    ///
+    /// Main loop to consume messages from the client pool and broadcast them to the local subscribers
     fn handle_client_pool(
         client_pool: Pool,
         sender: Sender<(ConnectionId, Request)>,
@@ -140,6 +143,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                                 ))
                                 .await;
                         }
+                        Response::EndOfStoredEvents(_) => {}
                         x => {
                             println!("x => {:?}", x);
                         }
@@ -195,11 +199,18 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                 }
             }
             Request::Request(request) => {
-                if let Some((client_pool, _)) = self.client_pool.as_ref() {
+                let foreign_subscription = if let Some((client_pool, _)) = self.client_pool.as_ref()
+                {
                     // pass the subscription request to the pool of clients, so this relayer
                     // can relay any unknown event to the clients through their subscriptions
-                    let _ = client_pool.subscribe(request.filters.clone().into()).await;
-                }
+                    Some(
+                        client_pool
+                            .subscribe(request.filters.clone().into())
+                            .await?,
+                    )
+                } else {
+                    None
+                };
 
                 if let Some(storage) = self.storage.as_ref() {
                     // Sent all events that match the filter that are stored in our database
@@ -222,20 +233,23 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                     .send(relayer::EndOfStoredEvents(request.subscription_id.clone()).into());
 
                 connection
-                    .keep_track_subscription(
+                    .subscribe(
                         request.subscription_id.clone(),
-                        self.subscriptions
-                            .subscribe(
-                                connection.get_conn_id(),
-                                connection.get_sender(),
-                                request.clone(),
-                            )
-                            .await,
+                        (
+                            foreign_subscription,
+                            self.subscriptions
+                                .subscribe(
+                                    connection.get_conn_id(),
+                                    connection.get_sender(),
+                                    request.clone(),
+                                )
+                                .await,
+                        ),
                     )
                     .await;
             }
-            Request::Close(_close) => {
-                todo!()
+            Request::Close(close) => {
+                connection.unsubscribe(&*close).await;
             }
         };
 
@@ -259,11 +273,27 @@ mod test {
 
     use super::*;
     use futures::future::join_all;
+    use nostr_rs_client::Url;
     use nostr_rs_memory::Memory;
-    use nostr_rs_types::Request;
+    use nostr_rs_types::{account::Account, types::Content, Request};
     use serde_json::json;
     use tokio::time::sleep;
 
+    async fn dummy_server(port: u16, client_pool: Option<Pool>) -> (Url, JoinHandle<()>) {
+        let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
+            .await
+            .unwrap();
+        let local_addr = listener.local_addr().expect("addr");
+
+        let relayer =
+            Relayer::new(Some(Memory::default()), client_pool).expect("valid dummy server");
+        let stopper = relayer.main(listener).expect("valid main loop");
+        (
+            Url::parse(&format!("ws://{}", local_addr)).expect("valid url"),
+            stopper,
+        )
+    }
+
     fn get_note() -> Request {
         serde_json::from_value(json!(
             [
@@ -732,4 +762,134 @@ mod test {
 
         assert_eq!(relayer.total_subscribers(), 0);
     }
+
+    #[tokio::test]
+    async fn relayer_posts_to_custom_posts_to_all_clients() {
+        let (relayer1, _) = dummy_server(0, None).await;
+        let (relayer2, _) = dummy_server(0, None).await;
+        let (relayer3, _) = dummy_server(0, None).await;
+        let (main_relayer, _) = dummy_server(
+            0,
+            Some(Pool::new_with_clients(vec![
+                relayer1.clone(),
+                relayer2.clone(),
+                relayer3.clone(),
+            ])),
+        )
+        .await;
+
+        let mut reader_client =
+            Pool::new_with_clients(vec![relayer1.clone(), relayer2.clone(), relayer3.clone()]);
+        let main_client = Pool::new_with_clients(vec![main_relayer]);
+
+        let _sub = reader_client
+            .subscribe(Default::default())
+            .await
+            .expect("valid subscription");
+
+        sleep(Duration::from_millis(20)).await;
+
+        for _ in 0..3 {
+            assert!(reader_client
+                .try_recv()
+                .map(|(r, _)| r)
+                .expect("valid message")
+                .as_end_of_stored_events()
+                .is_some());
+        }
+        assert!(reader_client.try_recv().is_none());
+
+        let account1 = Account::default();
+        let signed_content = account1
+            .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+            .expect("valid signed content");
+
+        // account1 posts a new note into the relayer1, and the main relayer
+        // should get a copy of it, as well as it is connected to relayer2 and
+        // relayer1.
+        main_client.post(signed_content.clone().into()).await;
+
+        sleep(Duration::from_millis(10)).await;
+
+        let responses = (0..3)
+            .map(|_| reader_client.try_recv().expect("valid message"))
+            .filter_map(|(r, url)| {
+                r.as_event()
+                    .map(|r| (url.port().expect("port"), r.to_owned()))
+            })
+            .collect::<HashMap<_, _>>();
+
+        assert!(reader_client.try_recv().is_none());
+
+        assert_eq!(responses.len(), 3);
+        assert_eq!(
+            responses
+                .get(&relayer1.port().expect("port"))
+                .map(|x| x.event.id.clone()),
+            Some(signed_content.id.clone())
+        );
+        assert_eq!(
+            responses
+                .get(&relayer2.port().expect("port"))
+                .map(|x| x.event.id.clone()),
+            Some(signed_content.id.clone())
+        );
+        assert_eq!(
+            responses
+                .get(&relayer3.port().expect("port"))
+                .map(|x| x.event.id.clone()),
+            Some(signed_content.id)
+        );
+    }
+
+    #[tokio::test]
+    async fn relayer_with_client_pool() {
+        let (relayer1, _) = dummy_server(0, None).await;
+        let (relayer2, _) = dummy_server(0, None).await;
+        let (main_relayer, _) = dummy_server(
+            0,
+            Some(Pool::new_with_clients(vec![relayer1.clone(), relayer2])),
+        )
+        .await;
+
+        let secondary_client = Pool::new_with_clients(vec![relayer1]);
+
+        // Create a subscription in the main relayer, main_client is only
+        // connected to the main relayer
+        let mut main_client = Pool::new_with_clients(vec![main_relayer]);
+        let _sub = main_client
+            .subscribe(Default::default())
+            .await
+            .expect("valid subscription");
+
+        sleep(Duration::from_millis(10)).await;
+        assert!(main_client
+            .try_recv()
+            .map(|(r, _)| r)
+            .expect("valid message")
+            .as_end_of_stored_events()
+            .is_some());
+        assert!(main_client.try_recv().is_none());
+
+        let account1 = Account::default();
+        let signed_content = account1
+            .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
+            .expect("valid signed content");
+
+        // account1 posts a new note into the relayer1, and the main relayer
+        // should get a copy of it, as well as it is connected to relayer2 and
+        // relayer1.
+        secondary_client.post(signed_content.clone().into()).await;
+
+        // wait for the note to be delivered
+        sleep(Duration::from_millis(10)).await;
+        assert_eq!(
+            Some((signed_content.id, signed_content.signature)),
+            main_client
+                .try_recv()
+                .and_then(|(r, _)| r.as_event().cloned().map(|x| x.event))
+                .map(|x| (x.id, x.signature))
+        );
+        assert!(main_client.try_recv().is_none());
+    }
 }

+ 0 - 4
crates/relayer/src/subscription/manager.rs

@@ -95,14 +95,11 @@ impl Default for SubscriptionManager {
 
 impl SubscriptionManager {
     async fn unsubscribe(self: Arc<Self>, keys: Vec<SubIdx>) {
-        println!("block");
         let mut subscriptions = self.subscriptions.write().await;
-        println!("\tblocked");
         for sub in keys {
             subscriptions.remove(&sub);
         }
         self.total_subscribers.fetch_sub(1, Ordering::Relaxed);
-        println!("released");
     }
 
     fn get_keys_from_event(event: &Event, min_prefix_match_len: usize) -> Vec<Key> {
@@ -215,7 +212,6 @@ impl SubscriptionManager {
                     if deliverded.contains(client) || !filter.check(&event) {
                         continue;
                     }
-                    println!("send");
 
                     let _ = sender.try_send(Response::Event((name, &event).into()));
                     deliverded.insert(client.clone());

+ 5 - 1
crates/types/Cargo.toml

@@ -13,7 +13,11 @@ custom_derive = "0.1.7"
 enum_derive = "0.1.7"
 hex = "0.4"
 rand = "0.8.5"
-secp256k1 = { version = "0.26.0", features = ["global-context"] }
+secp256k1 = { version = "0.26.0", features = [
+    "global-context",
+    "rand",
+    "serde",
+] }
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 sha2 = "0.10.6"

+ 59 - 0
crates/types/src/account.rs

@@ -0,0 +1,59 @@
+//!  Account related types
+use crate::types::{event::Error, Content, Event, Tag, UnsignedEvent};
+use chrono::{DateTime, Utc};
+use rand::rngs::OsRng;
+use secp256k1::{KeyPair, Message, Secp256k1, SecretKey, XOnlyPublicKey};
+
+/// Nostr User account
+#[derive(Debug, Clone)]
+pub struct Account(KeyPair);
+
+impl Account {
+    /// Get the public key from a user
+    pub fn public_key(&self) -> XOnlyPublicKey {
+        self.0.x_only_public_key().0
+    }
+
+    /// Creates a new event with the given content
+    pub fn sign_content(
+        &self,
+        tags: Vec<Tag>,
+        content: Content,
+        created_at: Option<DateTime<Utc>>,
+    ) -> Result<Event, Error> {
+        let unsigned_event =
+            UnsignedEvent::new(self.public_key().into(), tags, content, created_at)?;
+        let id = unsigned_event.id()?;
+        let to_sign = Message::from_slice(&*id)?;
+        let secp = Secp256k1::new();
+        let signature = secp.sign_schnorr_no_aux_rand(&to_sign, &self.0);
+
+        Event::new(unsigned_event, signature.into())
+    }
+}
+
+impl From<KeyPair> for Account {
+    fn from(keypair: KeyPair) -> Self {
+        Self(keypair)
+    }
+}
+
+impl From<&SecretKey> for Account {
+    fn from(secret_key: &SecretKey) -> Self {
+        Self(KeyPair::from_secret_key(&Secp256k1::new(), secret_key))
+    }
+}
+
+impl From<SecretKey> for Account {
+    fn from(secret_key: SecretKey) -> Self {
+        Self(KeyPair::from_secret_key(&Secp256k1::new(), &secret_key))
+    }
+}
+
+impl Default for Account {
+    fn default() -> Self {
+        let secp = Secp256k1::new();
+        let (secret_key, _) = secp.generate_keypair(&mut OsRng);
+        secret_key.into()
+    }
+}

+ 1 - 0
crates/types/src/lib.rs

@@ -3,6 +3,7 @@
 //! The types needed to interact with a Nostr relayer, or to be become one.
 #![deny(missing_docs, warnings)]
 
+pub mod account;
 pub mod client;
 pub mod common;
 pub mod relayer;

+ 7 - 0
crates/types/src/types/id.rs

@@ -1,4 +1,5 @@
 //! This mod wraps the event Ids
+use secp256k1::XOnlyPublicKey;
 use serde::{
     de::{self, Deserializer},
     ser::{self, Serializer},
@@ -46,6 +47,12 @@ impl TryFrom<String> for Id {
     }
 }
 
+impl From<XOnlyPublicKey> for Id {
+    fn from(public_key: XOnlyPublicKey) -> Self {
+        Self(public_key.serialize())
+    }
+}
+
 impl<'de> Deserialize<'de> for Id {
     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     where

+ 6 - 0
crates/types/src/types/signature.rs

@@ -27,6 +27,12 @@ pub enum Error {
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Signature(pub [u8; 64]);
 
+impl From<secp256k1::schnorr::Signature> for Signature {
+    fn from(signature: secp256k1::schnorr::Signature) -> Self {
+        Self(signature.as_ref().clone())
+    }
+}
+
 impl AsRef<[u8]> for Signature {
     fn as_ref(&self) -> &[u8] {
         &self.0