@@ -0,0 +1,159 @@
+use nostr_rs_client::{Clients, Error as ClientError};
+use nostr_rs_types::{
+ client::{Close, Subscribe},
+ types::{Addr, Filter, Kind},
+ Request, Response,
+use sqlx::{query, FromRow, Pool, Sqlite, SqlitePool};
+use tokio::time::{sleep, Duration};
+#[derive(Clone, FromRow, Debug)]
+struct PublicKeys {
+ pub public_key: String,
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+ #[error("Sql: {0}")]
+ Sql(#[from] sqlx::Error),
+ #[error("Nostr: {0}")]
+ Addr(#[from] nostr_rs_types::types::addr::Error),
+ #[error("client: {0}")]
+ Client(#[from] ClientError),
+async fn request_profiles_from_db(clients: Clients, conn: Pool<Sqlite>) -> Result<(), Error> {
+ let public_keys = sqlx::query_as::<_, PublicKeys>(
+ r#"
+ distinct public_key
+ events
+ WHERE public_key NOT IN (
+ SELECT public_key
+ FROM events
+ WHERE kind = 0
+ )
+ "#,
+ )
+ .fetch_all(&conn)
+ .await?
+ .iter()
+ .map(|p| p.public_key.as_str().try_into())
+ .collect::<Result<Vec<Addr>, _>>()?;
+ let keys = public_keys.chunks(50).collect::<Vec<&[Addr]>>();
+ for (i, keys) in keys.iter().enumerate() {
+ println!("Fetching {} profiles", keys.len());
+ clients
+ .send(
+ Subscribe {
+ subscription_id: format!("temp:{}", i).try_into().unwrap(),
+ filters: vec![Filter {
+ authors: keys.to_vec(),
+ kinds: vec![
+ Kind::Metadata,
+ Kind::ShortTextNote,
+ Kind::Contacts,
+ Kind::Repost,
+ Kind::Reaction,
+ Kind::ZapRequest,
+ Kind::Zap,
+ ],
+ ..Filter::default()
+ }],
+ }
+ .into(),
+ )
+ .await;
+ sleep(Duration::from_millis(5_000)).await;
+ }
+ Ok(())
+async fn main() {
+ env_logger::init();
+ let conn = SqlitePool::connect("sqlite://./db.sqlite").await.unwrap();
+ let clients = Clients::default();
+ clients
+ .connect_to("wss://relay.damus.io")
+ .await
+ .expect("register");
+ let _ = query(
+ r#"CREATE TABLE events(
+ id varchar(64) not null primary key,
+ public_key varchar(64) not null,
+ kind int,
+ event text
+ )"#,
+ )
+ .execute(&conn)
+ .await;
+ let clients_for_worker = clients.clone();
+ let conn_for_worker = conn.clone();
+ tokio::spawn(async move {
+ loop {
+ let _ =
+ request_profiles_from_db(clients_for_worker.clone(), conn_for_worker.clone()).await;
+ }
+ });
+ let request: Request = Subscribe::default().into();
+ clients.send(request).await;
+ loop {
+ if let Some(msg) = clients.try_recv() {
+ match msg {
+ Response::Notice(n) => {
+ println!("Error: {}", &*n);
+ }
+ Response::EndOfStoredEvents(x) => {
+ let subscription_id = &*x;
+ if &subscription_id[0..5] == "temp:" {
+ // Remove listener, to avoid having too many requests at the same time
+ let _ = clients.send(Close((*x).clone()).into()).await;
+ println!("Remove listener: {0}", (*subscription_id).to_string());
+ }
+ }
+ Response::Event(x) => {
+ let event = x.event;
+ let kind: u32 = event.inner.kind.try_into().expect("kind");
+ if let Ok(Some(_)) = query(r#"SELECT id FROM events WHERE id = ?"#)
+ .bind(event.id.to_string())
+ .fetch_optional(&conn)
+ .await
+ {
+ /*println!(
+ "Skip storing: {} -> {:?}",
+ event.id.to_string(),
+ event.inner
+ );*/
+ continue;
+ }
+ let _ = query(
+ r#"INSERT INTO events(id, public_key, kind, event) VALUES(?, ?, ?, ?)"#,
+ )
+ .bind(event.id.to_string())
+ .bind(event.inner.public_key.to_string())
+ .bind(kind.to_string())
+ .bind(serde_json::to_string(&event).unwrap())
+ .execute(&conn)
+ .await;
+ //println!("Stored: {}", event.id.to_string());
+ }
+ _ => {}
+ };
+ }
+ sleep(Duration::from_millis(10)).await;
+ }