
Build event relationships in the background

Cesar Rodas · 2 years ago · commit f36e3b246b
3 changed files with 229 additions and 49 deletions
  1. crates/client/src/lib.rs (+32 -9)
  2. crates/types/src/types/event.rs (+5 -0)
  3. src/main.rs (+192 -40)
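
The diffs below wire up a small post-processing pipeline around the events table: stored events start at processed = 0, process_events copies pubkey and event tag references into the new relationships table and bumps them to processed = 1, and discover_relayers then harvests relay URLs from contact lists and tag hints, bumping them to processed = 2. A minimal sketch of those states as named constants (the names are hypothetical; the commit itself uses bare integer literals):

    // Hypothetical labels for the `processed` column; the commit uses bare literals.
    const PROCESSED_NEW: i64 = 0;           // just stored by the websocket read loop
    const PROCESSED_RELATIONSHIPS: i64 = 1; // tag references copied into `relationships`
    const PROCESSED_RELAYERS: i64 = 2;      // relay URLs harvested into `relayers`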

crates/client/src/lib.rs (+32 -9)

@@ -1,4 +1,4 @@
-use futures_util::{SinkExt, StreamExt};
+use futures_util::{stream::FuturesUnordered, SinkExt, StreamExt};
 use nostr_rs_types::{Request, Response};
 use parking_lot::RwLock;
 use std::{collections::HashMap, sync::Arc};
@@ -30,7 +30,7 @@ pub struct Client {
 
     pub sender: mpsc::Sender<Request>,
 
-    receiver: broadcast::Receiver<Response>,
+    receiver: broadcast::Receiver<(String, Response)>,
     stopper: oneshot::Sender<()>,
 }
 
@@ -49,12 +49,14 @@ impl Client {
 
     fn spawn(
         mut receiver: mpsc::Receiver<Request>,
-        url: &str,
-    ) -> Result<(broadcast::Receiver<Response>, oneshot::Sender<()>), Error> {
-        let url = Url::parse(url)?;
+        url_str: &str,
+    ) -> Result<(broadcast::Receiver<(String, Response)>, oneshot::Sender<()>), Error> {
+        let url = Url::parse(url_str)?;
         let (response_sender, response_receiver) = broadcast::channel(10_000);
         let (stopper_sender, mut stopper_recv) = oneshot::channel();
 
+        let url_str = url_str.to_owned();
+
         tokio::spawn(async move {
             let (mut socket, _) = connect_async(url).await.expect("valid connection");
             loop {
@@ -82,7 +84,7 @@ impl Client {
                         let msg: Result<Response, _> = serde_json::from_str(&msg);
 
                         if let Ok(msg) = msg {
-                            if let Err(error) = response_sender.send(msg) {
+                            if let Err(error) = response_sender.send((url_str.to_owned(), msg)) {
                                 println!("Disconnecting client because of {}", error);
                                 break;
                             }
@@ -95,7 +97,7 @@ impl Client {
         Ok((response_receiver, stopper_sender))
     }
 
-    pub fn subscribe(&self) -> broadcast::Receiver<Response> {
+    pub fn subscribe(&self) -> broadcast::Receiver<(String, Response)> {
         self.receiver.resubscribe()
     }
 
@@ -111,7 +113,7 @@ impl Client {
 #[derive(Debug, Clone)]
 pub struct Clients {
     clients: Arc<RwLock<HashMap<String, Client>>>,
-    subscriptions: Arc<RwLock<Vec<broadcast::Receiver<Response>>>>,
+    subscriptions: Arc<RwLock<Vec<broadcast::Receiver<(String, Response)>>>>,
 }
 
 impl Default for Clients {
@@ -124,7 +126,27 @@ impl Default for Clients {
 }
 
 impl Clients {
-    pub fn try_recv(&self) -> Option<Response> {
+    pub async fn recv(&self) -> Option<(String, Response)> {
+        let mut subscriptions = self
+            .subscriptions
+            .read()
+            .iter()
+            .map(|s| s.resubscribe())
+            .collect::<Vec<_>>();
+
+        let mut futures = FuturesUnordered::new();
+
+        for sub in subscriptions.iter_mut() {
+            futures.push(sub.recv());
+        }
+
+        if let Some(Ok(response)) = futures.next().await {
+            Some(response)
+        } else {
+            None
+        }
+    }
+    pub fn try_recv(&self) -> Option<(String, Response)> {
         let mut subscriptions = self.subscriptions.write();
         for sub in subscriptions.iter_mut() {
             if let Ok(msg) = sub.try_recv() {
@@ -152,6 +174,7 @@ impl Clients {
         Ok(if clients.get(url).is_some() {
             false
         } else {
+            println!("Connecting to {}", url);
             let client = Client::new(url)?;
             let mut subscriptions = self.subscriptions.write();
             subscriptions.push(client.subscribe());
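
With this change every relayed message is tagged with the URL of the relay it came from, and the new Clients::recv awaits all per-relay broadcast receivers at once through FuturesUnordered. A minimal consumer sketch, assuming a reachable relay URL (hypothetical):

    // Sketch only: connect to one relay and attribute each incoming message to it.
    let clients = Clients::default();
    let _ = clients.connect_to("wss://relay.example.com").await; // hypothetical URL
    while let Some((relay_url, response)) = clients.recv().await {
        if let Response::Notice(notice) = response {
            println!("notice from {}: {}", relay_url, &*notice);
        }
    }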

crates/types/src/types/event.rs (+5 -0)

@@ -179,6 +179,11 @@ impl Event {
         Self::verify_signature(&self.inner.public_key, &self.signature, &self.id)
     }
 
+    /// Returns a reference to the inner content (the parsed content)
+    pub fn content(&self) -> &Content {
+        &self.inner.content
+    }
+
     /// Verifies if the Id, Public Key and Signature are correct
     fn verify_signature(
         public_key: &Id,
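
The new content() accessor exposes the parsed payload, which discover_relayers in src/main.rs relies on to pull relay URLs out of contact lists. A minimal usage sketch, assuming an already-parsed event is in scope:

    // Sketch only: list the relay URLs advertised in a kind-3 contact list.
    if let Content::Contacts(contacts) = event.content() {
        for (relay_url, _settings) in contacts.iter() {
            println!("advertised relay: {}", relay_url);
        }
    }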

src/main.rs (+192 -40)

@@ -1,7 +1,9 @@
+use std::collections::HashMap;
+
 use nostr_rs_client::{Clients, Error as ClientError};
 use nostr_rs_types::{
     client::{Close, Subscribe},
-    types::{Addr, Filter, Kind},
+    types::{Addr, Content, Event, Filter, Kind, SubscriptionId, Tag},
     Request, Response,
 };
 use sqlx::{query, FromRow, Pool, Sqlite, SqlitePool};
@@ -12,6 +14,17 @@ struct PublicKeys {
     pub public_key: String,
 }
 
+#[derive(Clone, FromRow, Debug)]
+struct Relayer {
+    pub url: String,
+}
+
+#[derive(Clone, FromRow, Debug)]
+struct RawValue {
+    pub id: String,
+    pub event: String,
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
     #[error("Sql: {0}")]
@@ -24,6 +37,112 @@ pub enum Error {
     Client(#[from] ClientError),
 }
 
+async fn discover_relayers(conn: Pool<Sqlite>) -> Result<(), Error> {
+    let mut tx = conn.begin().await?;
+    let values = sqlx::query_as::<_, RawValue>(
+        r#"SELECT id, event FROM events WHERE processed = 1 LIMIT 10000"#,
+    )
+    .fetch_all(&mut tx)
+    .await?;
+
+    let mut relayers = HashMap::new();
+
+    for value in values {
+        let event: Result<Event, _> = serde_json::from_str(&value.event);
+        match event {
+            Ok(event) => {
+                match event.content() {
+                    Content::Contacts(x) => x
+                        .iter()
+                        .map(|(relayer, _)| relayers.insert(relayer.to_ascii_lowercase(), 1))
+                        .for_each(drop),
+                    _ => {}
+                }
+                for tag in event.inner.tags.iter() {
+                    match tag {
+                        Tag::PubKey(pubkey) => {
+                            if let Some(relayer) = pubkey.relayer_url.as_ref() {
+                                if !relayer.is_empty() {
+                                    relayers.insert(relayer.to_ascii_lowercase(), 1);
+                                }
+                            }
+                        }
+                        Tag::Event(tag) => {
+                            if let Some(relayer) = tag.relayer_url.as_ref() {
+                                if !relayer.is_empty() {
+                                    relayers.insert(relayer.to_ascii_lowercase(), 1);
+                                }
+                            }
+                        }
+                        _ => {}
+                    }
+                }
+            }
+            _ => {}
+        }
+
+        sqlx::query("UPDATE events SET processed = 2 WHERE id = ?")
+            .bind(&value.id)
+            .execute(&mut tx)
+            .await?;
+    }
+
+    for relayer in relayers.keys() {
+        let _ = sqlx::query("INSERT INTO relayers (url) values(?)")
+            .bind(relayer)
+            .execute(&mut tx)
+            .await;
+    }
+
+    tx.commit().await?;
+    Ok(())
+}
+
+async fn process_events(conn: Pool<Sqlite>) -> Result<(), Error> {
+    let mut tx = conn.begin().await?;
+    let values = sqlx::query_as::<_, RawValue>(
+        r#"SELECT id, event FROM events WHERE processed = 0 LIMIT 10000"#,
+    )
+    .fetch_all(&mut tx)
+    .await?;
+
+    for value in values {
+        let event: Result<Event, _> = serde_json::from_str(&value.event);
+        match event {
+            Ok(event) => {
+                for tag in event.inner.tags.iter() {
+                    match tag {
+                        Tag::PubKey(pubkey) => {
+                            let _ = sqlx::query("INSERT INTO relationships (source_id, relates_to, type) values(?, ?, 2)")
+                                .bind(&value.id)
+                                .bind(pubkey.id.to_hex())
+                                .execute(&mut tx)
+                                .await;
+                        }
+                        Tag::Event(tag) => {
+                            let _ = sqlx::query("INSERT INTO relationships (source_id, relates_to, type) values(?, ?, 1)")
+                                .bind(&value.id)
+                                .bind(tag.id.to_hex())
+                                .execute(&mut tx)
+                                .await;
+                        }
+                        _ => {}
+                    }
+                }
+            }
+            _ => {}
+        }
+
+        sqlx::query("UPDATE events SET processed = 1 WHERE id = ?")
+            .bind(&value.id)
+            .execute(&mut tx)
+            .await?;
+    }
+
+    tx.commit().await?;
+    Ok(())
+}
+
 async fn request_profiles_from_db(clients: Clients, conn: Pool<Sqlite>) -> Result<(), Error> {
     let public_keys = sqlx::query_as::<_, PublicKeys>(
         r#"
@@ -36,6 +155,7 @@ async fn request_profiles_from_db(clients: Clients, conn: Pool<Sqlite>) -> Resul
             FROM events
             WHERE kind = 0
         )
+        LIMIT 50
         "#,
     )
     .fetch_all(&conn)
@@ -44,32 +164,34 @@ async fn request_profiles_from_db(clients: Clients, conn: Pool<Sqlite>) -> Resul
     .map(|p| p.public_key.as_str().try_into())
     .collect::<Result<Vec<Addr>, _>>()?;
 
-    let keys = public_keys.chunks(50).collect::<Vec<&[Addr]>>();
-    for (i, keys) in keys.iter().enumerate() {
-        println!("Fetching {} profiles", keys.len());
-        clients
-            .send(
-                Subscribe {
-                    subscription_id: format!("temp:{}", i).try_into().unwrap(),
-                    filters: vec![Filter {
-                        authors: keys.to_vec(),
-                        kinds: vec![
-                            Kind::Metadata,
-                            Kind::ShortTextNote,
-                            Kind::Contacts,
-                            Kind::Repost,
-                            Kind::Reaction,
-                            Kind::ZapRequest,
-                            Kind::Zap,
-                        ],
-                        ..Filter::default()
-                    }],
-                }
-                .into(),
-            )
-            .await;
-        sleep(Duration::from_millis(5_000)).await;
-    }
+    let subscription_id: SubscriptionId = "fetch_profiles".try_into().unwrap();
+
+    println!("Fetching {} profiles", public_keys.len());
+    clients
+        .send(
+            Subscribe {
+                subscription_id: subscription_id.clone(),
+                filters: vec![Filter {
+                    authors: public_keys,
+                    kinds: vec![
+                        Kind::Metadata,
+                        Kind::ShortTextNote,
+                        Kind::Contacts,
+                        Kind::Repost,
+                        Kind::Reaction,
+                        Kind::ZapRequest,
+                        Kind::Zap,
+                    ],
+                    ..Filter::default()
+                }],
+            }
+            .into(),
+        )
+        .await;
+    sleep(Duration::from_millis(15_000)).await;
+
+    let _ = clients.send(Close(subscription_id).into()).await;
+    println!("Remove listener");
 
     Ok(())
 }
@@ -86,12 +208,28 @@ async fn main() {
         .expect("register");
 
     let _ = query(
-        r#"CREATE TABLE events(
+        r#"
+    CREATE TABLE events(
         id varchar(64) not null primary key,
         public_key varchar(64) not null,
         kind int,
-        event text
-    )"#,
+        event text,
+        processed INT DEFAULT 0
+    );
+    CREATE INDEX events_processed_index ON events (processed);
+    CREATE TABLE relationships (
+        id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+        source_id VARCHAR(64) NOT NULL,
+        relates_to VARCHAR(64) NOT NULL,
+        type INT
+    );
+    CREATE INDEX relationships_source_id_index ON relationships (source_id);
+    CREATE TABLE relayers (
+        id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+        url VARCHAR(64) NOT NULL
+    );
+    CREATE UNIQUE INDEX url ON relayers (url);
+    "#,
     )
     .execute(&conn)
     .await;
@@ -105,15 +243,35 @@ async fn main() {
         }
     });
 
+    let conn_for_worker = conn.clone();
+    tokio::spawn(async move {
+        loop {
+            let _ = process_events(conn_for_worker.clone()).await;
+            let _ = discover_relayers(conn_for_worker.clone()).await;
+            sleep(Duration::from_millis(5_000)).await;
+        }
+    });
+
+    let relayers = sqlx::query_as::<_, Relayer>(
+        r#"select url from relayers where url like 'wss://%.%/' limit 20"#,
+    )
+    .fetch_all(&conn)
+    .await
+    .expect("query");
+
+    for relayer in relayers {
+        let _ = clients.connect_to(&relayer.url).await;
+    }
+
     let request: Request = Subscribe::default().into();
 
     clients.send(request).await;
 
     loop {
-        if let Some(msg) = clients.try_recv() {
+        if let Some((hostname, msg)) = clients.recv().await {
             match msg {
                 Response::Notice(n) => {
-                    println!("Error: {}", &*n);
+                    panic!("Error: {}", &*n);
                 }
                 Response::EndOfStoredEvents(x) => {
                     let subscription_id = &*x;
@@ -131,11 +289,7 @@ async fn main() {
                         .fetch_optional(&conn)
                         .await
                     {
-                        /*println!(
-                            "Skip storing: {} -> {:?}",
-                            event.id.to_string(),
-                            event.inner
-                        );*/
+                        //println!("Skip storing: {}", event.id.to_string(),);
                         continue;
                     }
                     let _ = query(
@@ -148,12 +302,10 @@ async fn main() {
                     .execute(&conn)
                     .await;
 
-                    //println!("Stored: {}", event.id.to_string());
+                    println!("Stored: {} (from {})", event.id.to_string(), hostname);
                 }
                 _ => {}
             };
         }
-
-        sleep(Duration::from_millis(10)).await;
     }
 }
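
Once the worker has run, the relationships rows can be read back to answer "what does this event reference". A minimal read-side sketch against the schema created above (the struct, the variable source_event_id, and the query are illustrative, not part of the commit):

    // Sketch only: fetch the ids an event points at via its pubkey/event tags.
    #[derive(sqlx::FromRow)]
    struct Related {
        relates_to: String,
    }

    // source_event_id: hex id of the event of interest (assumed to be in scope)
    let related = sqlx::query_as::<_, Related>(
        "SELECT relates_to FROM relationships WHERE source_id = ?",
    )
    .bind(source_event_id)
    .fetch_all(&conn)
    .await?;
    println!("{} related ids", related.len());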