Browse Source

Simplified API for client and pools

Expose raw response form socket instead of wrapping it in a Event. There is no
disconnection event, as that is an internal state of the client and the pool
Cesar Rodas 3 months ago
parent
commit
e9a961ecdc
5 changed files with 39 additions and 36 deletions
  1. 1 5
      crates/client/src/lib.rs
  2. 4 16
      crates/client/src/pool.rs
  3. 5 10
      crates/dump/src/main.rs
  4. 18 0
      crates/types/src/client/subscribe.rs
  5. 11 5
      src/main.rs

+ 1 - 5
crates/client/src/lib.rs

@@ -12,8 +12,4 @@ mod client;
 mod error;
 mod pool;
 
-pub use self::{
-    client::Client,
-    error::Error,
-    pool::{Event, Pool},
-};
+pub use self::{client::Client, error::Error, pool::Pool};

+ 4 - 16
crates/client/src/pool.rs

@@ -20,8 +20,8 @@ use url::Url;
 #[derive(Debug)]
 pub struct Pool {
     clients: HashMap<Url, Client>,
-    sender: mpsc::Sender<(Event, Url)>,
-    receiver: mpsc::Receiver<(Event, Url)>,
+    sender: mpsc::Sender<(Response, Url)>,
+    receiver: mpsc::Receiver<(Response, Url)>,
     subscriptions: HashMap<SubscriptionId, Vec<ActiveSubscription>>,
 }
 
@@ -31,18 +31,6 @@ impl Default for Pool {
     }
 }
 
-/// Client event
-///
-/// This type wraps a response a disconnected event. The disconnected will be
-/// the last event to be sent.
-#[derive(Debug, Clone)]
-pub enum Event {
-    /// A response
-    Response(Box<Response>),
-    /// A disconnection event
-    Disconnected,
-}
-
 const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 10_000;
 
 impl Pool {
@@ -58,12 +46,12 @@ impl Pool {
     }
 
     /// Tries to receive a message from any of the connected relayers
-    pub fn try_recv(&mut self) -> Option<(Event, Url)> {
+    pub fn try_recv(&mut self) -> Option<(Response, Url)> {
         self.receiver.try_recv().ok()
     }
 
     /// Receives a message from any of the connected relayers
-    pub async fn recv(&mut self) -> Option<(Event, Url)> {
+    pub async fn recv(&mut self) -> Option<(Response, Url)> {
         self.receiver.recv().await
     }
 

+ 5 - 10
crates/dump/src/main.rs

@@ -1,4 +1,4 @@
-use nostr_rs_client::{Error as ClientError, Event, Pool};
+use nostr_rs_client::{Error as ClientError, Pool};
 use nostr_rs_types::{client::Subscribe, Response};
 
 #[derive(Debug, thiserror::Error)]
@@ -31,15 +31,10 @@ async fn main() {
     loop {
         if let Some((msg, relayed_by)) = clients.recv().await {
             match msg {
-                Event::Response(r) => match *r {
-                    Response::Event(x) => {
-                        println!("{} => {:?}", relayed_by, x);
-                    }
-                    Response::EndOfStoredEvents(_) => {}
-                    msg => {
-                        println!("{} {:?}", relayed_by, msg);
-                    }
-                },
+                Response::Event(x) => {
+                    println!("{} => {:?}", relayed_by, x);
+                }
+                Response::EndOfStoredEvents(_) => {}
                 msg => {
                     println!("{} {:?}", relayed_by, msg);
                 }

+ 18 - 0
crates/types/src/client/subscribe.rs

@@ -18,6 +18,24 @@ pub struct Subscribe {
     pub filters: Vec<types::Filter>,
 }
 
+impl From<types::Filter> for Subscribe {
+    fn from(filter: types::Filter) -> Self {
+        Self {
+            filters: vec![filter],
+            ..Default::default()
+        }
+    }
+}
+
+impl From<Vec<types::Filter>> for Subscribe {
+    fn from(filters: Vec<types::Filter>) -> Self {
+        Self {
+            filters,
+            ..Default::default()
+        }
+    }
+}
+
 impl Default for Subscribe {
     fn default() -> Self {
         Self {

+ 11 - 5
src/main.rs

@@ -1,5 +1,4 @@
 use futures::Future;
-use nostr_rs_client::Event;
 use nostr_rs_rocksdb::RocksDb;
 use nostr_rs_types::{types::Filter, Request, Response};
 use std::{collections::HashMap, env, fs, pin::Pin, sync::Arc};
@@ -15,7 +14,7 @@ mod config;
 use config::Config;
 
 fn on_connection(
-    host: &Url,
+    host: Url,
     _socket: mpsc::Sender<Request>,
 ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
     Box::pin(async move {})
@@ -64,11 +63,11 @@ async fn main() {
     println!("{:#?}", config);
 
     let db = RocksDb::new(&config.db_path).expect("db");
-    let client_pool = config
+    let mut client_pool = config
         .relayers
         .iter()
         .fold(nostr_rs_client::Pool::new(), |clients, relayer_url| {
-            clients.connect_to(relayer_url.clone(), Some(on_connection))
+            clients.connect_to(relayer_url.clone())
         });
 
     let initial_subscription = Filter {
@@ -80,7 +79,14 @@ async fn main() {
         ..Default::default()
     };
 
-    println!("Initial subscription: {:?}", initial_subscription);
+    let _ = client_pool.subscribe(initial_subscription.into()).await;
+    loop {
+        tokio::select! {
+            Some((event, url)) = client_pool.recv() => {
+                println!("Recv: {} from {}", serde_json::to_string(&event).expect("valid json"), url);
+            }
+        }
+    }
 
     /*
     let db = RocksDb::new("./relayer-db").expect("db");