瀏覽代碼

Fixed tags and base64 types bugs discovered while connecting to other relayers

Cesar Rodas 2 月之前
父節點
當前提交
65a48e22b5

+ 18 - 18
Cargo.lock

@@ -1209,8 +1209,8 @@ dependencies = [
  "futures",
  "log",
  "nostr-rs-client",
- "nostr-rs-memory",
  "nostr-rs-relayer",
+ "nostr-rs-sqlite",
  "nostr-rs-storage-base",
  "nostr-rs-types",
  "serde",
@@ -1253,6 +1253,23 @@ dependencies = [
 ]
 
 [[package]]
+name = "nostr-rs-sqlite"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "cuckoofilter",
+ "futures",
+ "hex",
+ "nostr-rs-storage-base",
+ "nostr-rs-subscription-manager",
+ "nostr-rs-types",
+ "serde",
+ "serde_json",
+ "sqlx",
+ "tokio",
+]
+
+[[package]]
 name = "nostr-rs-storage-base"
 version = "0.1.0"
 dependencies = [
@@ -1969,23 +1986,6 @@ dependencies = [
 ]
 
 [[package]]
-name = "sqlite"
-version = "0.1.0"
-dependencies = [
- "async-trait",
- "cuckoofilter",
- "futures",
- "hex",
- "nostr-rs-storage-base",
- "nostr-rs-subscription-manager",
- "nostr-rs-types",
- "serde",
- "serde_json",
- "sqlx",
- "tokio",
-]
-
-[[package]]
 name = "sqlx"
 version = "0.8.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"

+ 6 - 1
Cargo.toml

@@ -9,7 +9,12 @@ members = [
     "crates/client",
     "crates/relayer",
     "crates/storage/base",
-    "crates/storage/rocksdb", "crates/dump", "crates/storage/memory", "crates/personal-relayer", "crates/subscription-manager", "crates/storage/sqlite",
+    "crates/storage/rocksdb",
+    "crates/dump",
+    "crates/storage/memory",
+    "crates/personal-relayer",
+    "crates/subscription-manager",
+    "crates/storage/sqlite",
 ]
 
 [dependencies]

+ 0 - 4
crates/client/src/client.rs

@@ -192,9 +192,7 @@ impl Client {
                                     continue;
                                 }
                             }
-                            println!("{}: Sending {:?}", url, msg);
                             if let Ok(json) = serde_json::to_string(&msg) {
-                                log::info!("{}: Sending {}", url, json);
                                 if let Err(x) = socket.send(Message::Text(json)).await {
                                     log::error!("{} : Reconnecting due {}", url, x);
                                     break;
@@ -226,8 +224,6 @@ impl Client {
                                 continue;
                             }
 
-                            log::info!("New message: {}", msg);
-
                             let event: Result<Response, _> = serde_json::from_str(&msg);
 
                             if let Ok(msg) = event {

+ 1 - 1
crates/personal-relayer/Cargo.toml

@@ -6,13 +6,13 @@ edition = "2021"
 [dependencies]
 nostr-rs-types = { path = "../types" }
 nostr-rs-storage-base = { path = "../storage/base" }
+nostr-rs-sqlite = { path = "../storage/sqlite" }
 nostr-rs-client = { path = "../client" }
 nostr-rs-relayer = { path = "../relayer" }
 thiserror = "1.0.39"
 url = { version = "2.5.2", features = ["serde"] }
 futures = "0.3.30"
 tokio = { version = "1.39.2", features = ["full"] }
-nostr-rs-memory = { path = "../storage/memory" }
 serde = "1.0.208"
 serde_json = "1.0.125"
 log = "0.4.22"

+ 40 - 63
crates/personal-relayer/src/lib.rs

@@ -1,25 +1,19 @@
-use std::{
-    collections::{HashMap, HashSet},
-    ops::Deref,
-};
-
 use futures::future::join_all;
 use nostr_rs_client::Pool;
 use nostr_rs_relayer::Relayer;
 use nostr_rs_storage_base::Storage;
 use nostr_rs_types::{
-    types::{content::profile::Profile, Content, Event, Filter, Id, Kind, Tag},
+    types::{content::profile::Profile, tag::TagType, Content, Event, Filter, Id},
     Request,
 };
 use serde::Serialize;
+use std::{
+    collections::{HashMap, HashSet},
+    ops::Deref,
+};
 use tokio::{net::TcpListener, task::JoinHandle};
 use url::Url;
 
-//
-// g
-use tokio::fs::OpenOptions;
-use tokio::io::AsyncWriteExt;
-
 pub struct Stoppable(Option<Vec<JoinHandle<()>>>);
 
 impl From<Vec<JoinHandle<()>>> for Stoppable {
@@ -107,7 +101,7 @@ impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
                 local_connection
                     .send(Request::Request(
                         vec![Filter {
-                            authors: self.accounts.keys().map(|x| x.clone()).collect::<Vec<_>>(),
+                            authors: self.accounts.keys().cloned().collect::<Vec<_>>(),
                             //kinds: vec![Kind::Metadata, Kind::ShortTextNote, Kind::Contacts],
                             ..Default::default()
                         }]
@@ -121,10 +115,10 @@ impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
                         if let Some(event) = res.as_event() {
                             match event.content() {
                                 Content::Metadata(profile) => {
-                                    self.accounts.get_mut(&event.author()).map(|account| {
+                                    if let Some(account) = self.accounts.get_mut(event.author()) {
                                         account.profile = Some(profile.clone());
                                         account.content.push(event.deref().clone());
-                                    });
+                                    }
                                 }
                                 Content::Contacts(_) => {
                                     let current = event.author().clone();
@@ -139,70 +133,53 @@ impl<T: Storage + Send + Sync + 'static> PersonalRelayer<T> {
                                     let mut ids = vec![];
 
                                     for tag in event.tags() {
-                                        match tag {
-                                            Tag::PubKey(pub_key, relayer_url, _) => {
-                                                let followed = self
-                                                    .accounts
-                                                    .entry(pub_key.clone())
-                                                    .or_insert(Contact::new(
-                                                        pub_key.clone(),
-                                                        Some(current.clone()),
-                                                    ));
-
-                                                if let Some((Some(relayer_url), _)) = relayer_url {
-                                                    let _ = relayer
-                                                        .connect_to_relayer(relayer_url.clone())
-                                                        .await;
-                                                }
-
-                                                current_user.following.insert(pub_key.clone());
-                                                followed.followed_by.insert(current.clone());
-                                                ids.push(pub_key.clone());
+                                        if let TagType::PubKey(pub_key, relayer_url, _) =
+                                            tag.deref()
+                                        {
+                                            let followed = self
+                                                .accounts
+                                                .entry(pub_key.clone())
+                                                .or_insert(Contact::new(
+                                                    pub_key.clone(),
+                                                    Some(current.clone()),
+                                                ));
+
+                                            if let Some(relayer_url) = relayer_url {
+                                                let _ = relayer
+                                                    .connect_to_relayer(relayer_url.clone())
+                                                    .await;
                                             }
-                                            _ => {}
+
+                                            current_user.following.insert(pub_key.clone());
+                                            followed.followed_by.insert(current.clone());
+                                            ids.push(pub_key.clone());
                                         }
                                     }
 
                                     self.accounts.insert(current, current_user);
 
-                                    let _ = local_connection
-                                        .send(Request::Request(
-                                            Filter {
-                                                authors: ids,
-                                                kinds: vec![
-                                                    Kind::Metadata,
-                                                    Kind::ShortTextNote,
-                                                    Kind::Contacts,
-                                                ],
-                                                ..Default::default()
-                                            }
-                                            .into(),
-                                        ))
-                                        .await;
+                                    for authors in ids.chunks(20).collect::<Vec<_>>().into_iter() {
+                                        let _ = local_connection
+                                            .send(Request::Request(
+                                                Filter {
+                                                    authors: authors.to_vec(),
+                                                    ..Default::default()
+                                                }
+                                                .into(),
+                                            ))
+                                            .await;
+                                    }
                                 }
                                 Content::ShortTextNote(_) => {
-                                    self.accounts.get_mut(&event.author()).map(|account| {
+                                    if let Some(account) = self.accounts.get_mut(event.author()) {
                                         account.content.push(event.deref().clone());
-                                    });
+                                    }
                                 }
                                 _ => {}
                             }
                         } else {
                             println!("Not an event: {:?}", res);
                         }
-
-                        let mut file = OpenOptions::new()
-                            .write(true)
-                            .create(true)
-                            .open("example.txt")
-                            .await
-                            .expect("Failed to open file");
-
-                        file.write_all(&serde_json::to_vec_pretty(&self.accounts).unwrap())
-                            .await
-                            .expect("Failed to write to file");
-
-                        file.flush().await.expect("Failed to flush file");
                     }
                 }
             }),

+ 0 - 3
crates/relayer/src/relayer.rs

@@ -173,9 +173,6 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
 
         let handle = tokio::spawn(async move {
             loop {
-                if receiver.len() > 500 {
-                    println!("{}", receiver.len());
-                }
                 if let Some((response, _)) = receiver.recv().await {
                     match response {
                         Response::Event(event) => {

+ 1 - 1
crates/storage/sqlite/Cargo.toml

@@ -1,5 +1,5 @@
 [package]
-name = "sqlite"
+name = "nostr-rs-sqlite"
 version = "0.1.0"
 edition = "2021"
 

+ 1 - 1
crates/storage/sqlite/src/lib.rs

@@ -251,7 +251,7 @@ impl SQLite {
                 if let Ok(event) = &event {
                     if filter
                         .as_ref()
-                        .map(|f| !f.check_event(&event))
+                        .map(|f| !f.check_event(event))
                         .unwrap_or_default()
                     {
                         continue;

+ 1 - 1
crates/types/src/types/content/mod.rs

@@ -104,7 +104,7 @@ impl Content {
                                     iv,
                                 })
                         })
-                        .map(|data| Self::EncryptedDirectMessage(data))
+                        .map(Self::EncryptedDirectMessage)
                         .unwrap_or_else(|_| Self::Unparsed(Kind::Unknown(4), content.to_owned())))
                 } else {
                     Ok(Self::Unparsed(Kind::Unknown(4), content.to_owned()))

+ 1 - 0
crates/types/src/types/filter.rs

@@ -44,6 +44,7 @@ impl TagValue {
     }
 
     /// Convert the value into a string
+    #[allow(clippy::inherent_to_string)]
     pub fn to_string(&self) -> String {
         match self {
             TagValue::Id(id) => id.to_string(),