Browse Source

Improve fetching related content

Cesar Rodas 1 year ago
parent
commit
d8c638b243
1 changed files with 144 additions and 52 deletions
  1. 144 52
      src/main.rs

+ 144 - 52
src/main.rs

@@ -10,8 +10,9 @@ use sqlx::{query, FromRow, Pool, Sqlite, SqlitePool};
 use tokio::time::{sleep, Duration};
 
 #[derive(Clone, FromRow, Debug)]
-struct PublicKeys {
-    pub public_key: String,
+struct ToFetch {
+    pub id: String,
+    pub typ: i64,
 }
 
 #[derive(Clone, FromRow, Debug)]
@@ -171,58 +172,148 @@ async fn process_events(conn: Pool<Sqlite>) -> Result<(), Error> {
     Ok(())
 }
 
-async fn request_profiles_from_db(
-    clients: Clients,
-    conn: Pool<Sqlite>,
-    skip: usize,
-) -> Result<usize, Error> {
-    let public_keys = sqlx::query_as::<_, PublicKeys>(
+async fn fetch_related_content(clients: Clients, conn: Pool<Sqlite>) -> Result<(), Error> {
+    let mut tx = conn.begin().await?;
+    sqlx::query(r#"DELETE FROM to_fetch"#)
+        .execute(&mut tx)
+        .await?;
+
+    println!("Building to_fetch table...");
+
+    sqlx::query(
         r#"
+    INSERT INTO to_fetch SELECT * FROM (
         SELECT
-            distinct public_key
+            relates_to,
+            count(*) total,
+            1
         FROM
-            events
-        LIMIT ?, 50
+            relationships
+        WHERE type = 1 AND relates_to NOT IN (SELECT id FROM events)
+        GROUP BY relates_to
+    ) s ORDER by total DESC
+    "#,
+    )
+    .execute(&mut tx)
+    .await?;
+
+    sqlx::query(
+        r#"
+        INSERT INTO to_fetch SELECT * FROM (
+            SELECT
+                relates_to,
+                count(*) total,
+                2
+            FROM
+                relationships
+            WHERE
+                type = 2
+                AND relates_to NOT IN (SELECT public_key FROM events WHERE kind = 0)
+                AND relates_to NOT IN (SELECT id FROM to_fetch)
+            GROUP BY relates_to
+        ) s ORDER by total DESC
         "#,
     )
-    .bind(skip.to_string())
-    .fetch_all(&conn)
-    .await?
-    .iter()
-    .map(|p| p.public_key.as_str().try_into())
-    .collect::<Result<Vec<Addr>, _>>()?;
-
-    let subscription_id: SubscriptionId = "fetch_profiles".try_into().unwrap();
-    let len = public_keys.len();
-
-    println!("Fetching {} profiles (skip = {})", len, skip);
-    clients
-        .send(
-            Subscribe {
-                subscription_id: subscription_id.clone(),
-                filters: vec![Filter {
-                    authors: public_keys,
-                    kinds: vec![
-                        Kind::Metadata,
-                        Kind::ShortTextNote,
-                        Kind::Contacts,
-                        Kind::Repost,
-                        Kind::Reaction,
-                        Kind::ZapRequest,
-                        Kind::Zap,
-                    ],
-                    ..Filter::default()
-                }],
-            }
-            .into(),
+    .execute(&mut tx)
+    .await?;
+
+    tx.commit().await?;
+
+    loop {
+        let data_to_fetch = sqlx::query_as::<_, ToFetch>(
+            r#"
+        SELECT
+            id,
+            type as typ
+        FROM
+            to_fetch
+        ORDER BY refs DESC
+        LIMIT 40
+        "#,
         )
-        .await;
-    sleep(Duration::from_millis(15_000)).await;
+        .fetch_all(&conn)
+        .await?;
 
-    let _ = clients.send(Close(subscription_id).into()).await;
-    println!("Remove listener");
+        let mut public_keys = vec![];
+        let mut ids = vec![];
 
-    Ok(len)
+        for to_fetch in data_to_fetch.into_iter() {
+            if to_fetch.id.len() < 64 {
+                continue;
+            }
+
+            let id: Addr = if let Ok(x) = to_fetch.id.try_into() {
+                x
+            } else {
+                continue;
+            };
+
+            match to_fetch.typ {
+                1 => {
+                    ids.push(id);
+                }
+                2 => {
+                    public_keys.push(id);
+                }
+                _ => {}
+            }
+        }
+
+        let mut filters = vec![];
+
+        if !ids.is_empty() {
+            filters.push(Filter {
+                ids: ids,
+                ..Default::default()
+            });
+        }
+
+        if !public_keys.is_empty() {
+            filters.push(Filter {
+                authors: public_keys,
+                kinds: vec![
+                    Kind::Metadata,
+                    Kind::ShortTextNote,
+                    Kind::Contacts,
+                    Kind::Repost,
+                    Kind::Reaction,
+                    Kind::ZapRequest,
+                    Kind::Zap,
+                ],
+                ..Default::default()
+            });
+        }
+
+        if filters.is_empty() {
+            break;
+        }
+
+        let subscription_id = SubscriptionId::default();
+        println!("{} Fetching from related content", &*subscription_id,);
+
+        clients
+            .send(
+                Subscribe {
+                    subscription_id: subscription_id.clone(),
+                    filters,
+                    ..Default::default()
+                }
+                .into(),
+            )
+            .await;
+
+        sleep(Duration::from_millis(15_000)).await;
+
+        println!("{} Unsubscribing from related content", &*subscription_id);
+
+        let _ = clients.send(Close(subscription_id).into()).await;
+
+        sqlx::query("DELETE FROM to_fetch WHERE id IN (SELECT id FROM to_fetch ORDER BY refs DESC LIMIT 40)")
+            .execute(&conn)
+            .await?;
+    }
+
+    Ok(())
 }
 
 #[tokio::main]
@@ -244,7 +335,7 @@ async fn main() {
         processed INT DEFAULT 0
     );
     CREATE INDEX events_processed_index ON events (processed);
-    CREATE INDEX events_public_key_index ON events (public_key);
+    CREATE INDEX events_public_key_index ON events (public_key, kind);
     CREATE INDEX events_kind_index ON events (kind);
     CREATE TABLE relationships (
         id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
@@ -260,6 +351,8 @@ async fn main() {
     );
     CREATE UNIQUE INDEX url ON relayers (url);
     CREATE INDEX relayers_weight_index ON relayers (weight);
+    CREATE TABLE to_fetch(id varchar(64) not null primary key, refs int not null default 0, type int not null default 1);
+    CREATE INDEX refs ON to_fetch (refs);
     "#,
     )
     .execute(&conn)
@@ -268,12 +361,11 @@ async fn main() {
     let clients_for_worker = clients.clone();
     let conn_for_worker = conn.clone();
     tokio::spawn(async move {
-        let mut i = 0;
         loop {
-            let _ =
-                request_profiles_from_db(clients_for_worker.clone(), conn_for_worker.clone(), i)
-                    .await
-                    .map(|count| i += count);
+            let r1 =
+                fetch_related_content(clients_for_worker.clone(), conn_for_worker.clone()).await;
+            println!("Fetch related content {:?}", r1);
+            sleep(Duration::from_millis(5_000)).await;
         }
     });