|
@@ -1,24 +1,18 @@
|
|
|
-use crate::{Connection, Error};
|
|
|
+use crate::{Connection, Error, Subscription};
|
|
|
use nostr_rs_storage::RocksDb;
|
|
|
use nostr_rs_types::{
|
|
|
relayer,
|
|
|
- types::{Event, Filter, SubscriptionId},
|
|
|
+ types::{Event, SubscriptionId},
|
|
|
Request, Response,
|
|
|
};
|
|
|
use parking_lot::{RwLock, RwLockReadGuard};
|
|
|
use std::{collections::HashMap, ops::Deref, sync::Arc};
|
|
|
+#[allow(unused_imports)]
|
|
|
use tokio::{
|
|
|
net::TcpStream,
|
|
|
sync::mpsc::{channel, Receiver, Sender},
|
|
|
};
|
|
|
|
|
|
-#[derive(Clone, Debug, Default, Eq, PartialEq, Hash)]
|
|
|
-pub struct SubscriptionType {
|
|
|
- public_key: Option<Vec<u8>>,
|
|
|
- id: Option<Vec<u8>>,
|
|
|
- kind: Option<u32>,
|
|
|
-}
|
|
|
-
|
|
|
type SubId = u128;
|
|
|
|
|
|
type Subscriptions = HashMap<SubId, (SubscriptionId, Sender<Response>)>;
|
|
@@ -31,14 +25,15 @@ pub struct Relayer {
|
|
|
/// Each connection keeps a list of the subscription ID provided by the user
|
|
|
/// (String) and the internal, globally recognized subscription ID which is
|
|
|
/// internal (SubId)
|
|
|
- subscriptions_ids_index: RwLock<HashMap<SubId, Vec<SubscriptionType>>>,
|
|
|
+ subscriptions_ids_index: RwLock<HashMap<SubId, Vec<Subscription>>>,
|
|
|
/// Each subscription type that is active has a list of subscriptions.
|
|
|
///
|
|
|
/// A single REQ can be subscribed to multiple subscription types, specially
|
|
|
/// when it is translated in OR filters. It is designed this way to allow a
|
|
|
/// fast iteration and match quickly filters.
|
|
|
- subscriptions: RwLock<HashMap<SubscriptionType, RwLock<Subscriptions>>>,
|
|
|
+ subscriptions: RwLock<HashMap<Subscription, RwLock<Subscriptions>>>,
|
|
|
clients: RwLock<HashMap<u128, Connection>>,
|
|
|
+ #[allow(dead_code)]
|
|
|
sender: Sender<(u128, Request)>,
|
|
|
}
|
|
|
|
|
@@ -62,6 +57,7 @@ impl Relayer {
|
|
|
&self.storage
|
|
|
}
|
|
|
|
|
|
+ #[cfg(not(test))]
|
|
|
pub async fn add_connection(&self, stream: TcpStream) -> Result<(), Error> {
|
|
|
let client = Connection::new(self.sender.clone(), stream).await?;
|
|
|
let mut clients = self.clients.write();
|
|
@@ -70,62 +66,52 @@ impl Relayer {
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
|
- pub async fn recv(
|
|
|
+ fn recv_request_from_client(
|
|
|
&self,
|
|
|
- receiver: &mut Receiver<(u128, Request)>,
|
|
|
+ connection: &Connection,
|
|
|
+ request: Request,
|
|
|
) -> Result<Option<Request>, Error> {
|
|
|
- let (conn_id, request) = if let Some(request) = receiver.recv().await {
|
|
|
- request
|
|
|
- } else {
|
|
|
- return Ok(None);
|
|
|
- };
|
|
|
-
|
|
|
- let connections = self.clients.read();
|
|
|
- let connection = connections
|
|
|
- .get(&conn_id)
|
|
|
- .ok_or(Error::UnknownConnection(conn_id))?;
|
|
|
-
|
|
|
match &request {
|
|
|
Request::Event(event) => {
|
|
|
self.store_and_broadcast_local_event(event.deref());
|
|
|
}
|
|
|
Request::Request(request) => {
|
|
|
- for filter in request.filters.clone().into_iter() {
|
|
|
- // Create subscription
|
|
|
- let (sub_id, receiver) =
|
|
|
- connection.create_subscription(request.subscription_id.deref().to_owned());
|
|
|
- let mut sub_index = self.subscriptions_ids_index.write();
|
|
|
- let mut subscriptions = self.subscriptions.write();
|
|
|
- if let Some(prev_subs) = sub_index.remove(&sub_id) {
|
|
|
- // remove any previous subscriptions
|
|
|
- prev_subs.iter().for_each(|index| {
|
|
|
- if let Some(subscriptions) = subscriptions.get_mut(&index) {
|
|
|
- subscriptions.write().remove(&sub_id);
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
- sub_index.insert(
|
|
|
- sub_id,
|
|
|
- Self::get_indexes_from_filter(&filter)
|
|
|
- .into_iter()
|
|
|
- .map(|index| {
|
|
|
- subscriptions
|
|
|
- .entry(index.clone())
|
|
|
- .or_insert_with(|| RwLock::new(HashMap::new()))
|
|
|
- .write()
|
|
|
- .insert(
|
|
|
- sub_id,
|
|
|
- (request.subscription_id.clone(), receiver.clone()),
|
|
|
- );
|
|
|
- index
|
|
|
- })
|
|
|
- .collect::<Vec<_>>(),
|
|
|
- );
|
|
|
+ // Create subscription
|
|
|
+ let (sub_id, receiver) =
|
|
|
+ connection.create_subscription(request.subscription_id.deref().to_owned());
|
|
|
+ let mut sub_index = self.subscriptions_ids_index.write();
|
|
|
+ let mut subscriptions = self.subscriptions.write();
|
|
|
+ if let Some(prev_subs) = sub_index.remove(&sub_id) {
|
|
|
+ // remove any previous subscriptions
|
|
|
+ prev_subs.iter().for_each(|index| {
|
|
|
+ if let Some(subscriptions) = subscriptions.get_mut(&index) {
|
|
|
+ subscriptions.write().remove(&sub_id);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ sub_index.insert(
|
|
|
+ sub_id,
|
|
|
+ Subscription::from_filters(&request.filters)
|
|
|
+ .into_iter()
|
|
|
+ .map(|index| {
|
|
|
+ subscriptions
|
|
|
+ .entry(index.clone())
|
|
|
+ .or_insert_with(|| RwLock::new(HashMap::new()))
|
|
|
+ .write()
|
|
|
+ .insert(
|
|
|
+ sub_id,
|
|
|
+ (request.subscription_id.clone(), receiver.clone()),
|
|
|
+ );
|
|
|
+ index
|
|
|
+ })
|
|
|
+ .collect::<Vec<_>>(),
|
|
|
+ );
|
|
|
|
|
|
- drop(subscriptions);
|
|
|
- drop(sub_index);
|
|
|
+ drop(subscriptions);
|
|
|
+ drop(sub_index);
|
|
|
|
|
|
- // Sent all events that match the filter that are stored in our database
|
|
|
+ // Sent all events that match the filter that are stored in our database
|
|
|
+ for filter in request.filters.clone().into_iter() {
|
|
|
self.storage
|
|
|
.get_by_filter(filter)?
|
|
|
.into_iter()
|
|
@@ -139,6 +125,7 @@ impl Relayer {
|
|
|
);
|
|
|
});
|
|
|
}
|
|
|
+
|
|
|
let _ = connection
|
|
|
.send(relayer::EndOfStoredEvents(request.subscription_id.clone()).into());
|
|
|
}
|
|
@@ -160,73 +147,21 @@ impl Relayer {
|
|
|
Ok(Some(request))
|
|
|
}
|
|
|
|
|
|
- #[inline]
|
|
|
- fn get_indexes_from_filter(filter: &Filter) -> Vec<SubscriptionType> {
|
|
|
- let public_keys = if filter.references_to_public_key.is_empty() {
|
|
|
- vec![None]
|
|
|
- } else {
|
|
|
- filter
|
|
|
- .references_to_public_key
|
|
|
- .iter()
|
|
|
- .map(|public_key| Some((*public_key).to_vec()))
|
|
|
- .collect()
|
|
|
- };
|
|
|
- let id = if filter.references_to_event.is_empty() {
|
|
|
- vec![None]
|
|
|
- } else {
|
|
|
- filter
|
|
|
- .references_to_event
|
|
|
- .iter()
|
|
|
- .map(|id| Some((*id).to_vec()))
|
|
|
- .collect()
|
|
|
- };
|
|
|
- let kind = if filter.kinds.is_empty() {
|
|
|
- vec![None]
|
|
|
+ pub async fn recv(
|
|
|
+ &self,
|
|
|
+ receiver: &mut Receiver<(u128, Request)>,
|
|
|
+ ) -> Result<Option<Request>, Error> {
|
|
|
+ let (conn_id, request) = if let Some(request) = receiver.recv().await {
|
|
|
+ request
|
|
|
} else {
|
|
|
- filter
|
|
|
- .kinds
|
|
|
- .iter()
|
|
|
- .map(|kind| Some((*kind).into()))
|
|
|
- .collect()
|
|
|
+ return Ok(None);
|
|
|
};
|
|
|
- let mut subs = vec![];
|
|
|
-
|
|
|
- for public_key in public_keys.iter() {
|
|
|
- for id in id.iter() {
|
|
|
- for kind in kind.iter() {
|
|
|
- subs.push(SubscriptionType {
|
|
|
- public_key: public_key.clone(),
|
|
|
- id: id.clone(),
|
|
|
- kind: *kind,
|
|
|
- });
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- subs
|
|
|
- }
|
|
|
-
|
|
|
- #[inline]
|
|
|
- fn get_possible_listeners_from_event(event: &Event) -> Vec<SubscriptionType> {
|
|
|
- let kind = event.kind().into();
|
|
|
- let public_keys = [None, Some(event.author().as_ref().to_vec())];
|
|
|
- let id = [None, Some(event.id.as_ref().to_vec())];
|
|
|
- let kind = [None, Some(kind)];
|
|
|
- let mut subs = vec![];
|
|
|
-
|
|
|
- for public_key in public_keys.iter() {
|
|
|
- for id in id.iter() {
|
|
|
- for kind in kind.iter() {
|
|
|
- subs.push(SubscriptionType {
|
|
|
- public_key: public_key.clone(),
|
|
|
- id: id.clone(),
|
|
|
- kind: *kind,
|
|
|
- });
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ let connections = self.clients.read();
|
|
|
+ let connection = connections
|
|
|
+ .get(&conn_id)
|
|
|
+ .ok_or(Error::UnknownConnection(conn_id))?;
|
|
|
|
|
|
- subs
|
|
|
+ self.recv_request_from_client(connection, request)
|
|
|
}
|
|
|
|
|
|
#[inline]
|
|
@@ -247,7 +182,7 @@ impl Relayer {
|
|
|
let _ = self.storage.store_local_event(event);
|
|
|
let subscriptions = self.subscriptions.read();
|
|
|
|
|
|
- for subscription_type in Self::get_possible_listeners_from_event(event) {
|
|
|
+ for subscription_type in Subscription::from_event(event) {
|
|
|
if let Some(subscribers) = subscriptions.get(&subscription_type) {
|
|
|
Self::broadcast_to_subscribers(subscribers.read(), event);
|
|
|
}
|
|
@@ -259,10 +194,113 @@ impl Relayer {
|
|
|
let _ = self.storage.store(event);
|
|
|
let subscriptions = self.subscriptions.read();
|
|
|
|
|
|
- for subscription_type in Self::get_possible_listeners_from_event(event) {
|
|
|
+ for subscription_type in Subscription::from_event(event) {
|
|
|
if let Some(subscribers) = subscriptions.get(&subscription_type) {
|
|
|
Self::broadcast_to_subscribers(subscribers.read(), event);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+#[cfg(test)]
|
|
|
+mod test {
|
|
|
+ use super::*;
|
|
|
+ use crate::get_id;
|
|
|
+ use nostr_rs_storage::RocksDb;
|
|
|
+ use nostr_rs_types::Request;
|
|
|
+ use std::{
|
|
|
+ fs::File,
|
|
|
+ io::{BufRead, BufReader},
|
|
|
+ };
|
|
|
+
|
|
|
+ fn get_db(prefill: bool) -> RocksDb {
|
|
|
+ let db = RocksDb::new(format!("tests/db/{}", get_id())).expect("db");
|
|
|
+ if prefill {
|
|
|
+ let file = File::open("./tests/events.json").expect("file");
|
|
|
+ let events = BufReader::new(file)
|
|
|
+ .lines()
|
|
|
+ .map(|line| serde_json::from_str(&line.expect("line")).expect("valid"))
|
|
|
+ .collect::<Vec<Event>>();
|
|
|
+
|
|
|
+ for event in events {
|
|
|
+ assert!(db.store(&event).expect("valid"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ db
|
|
|
+ }
|
|
|
+
|
|
|
+ #[tokio::test]
|
|
|
+ async fn serve_listener_from_local_db() {
|
|
|
+ let request: Request = serde_json::from_str("[\"REQ\",\"1298169700973717\",{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"since\":1681939304},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[1,3,6,7,9735],\"since\":1681939304},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"#e\":[\"2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a\",\"a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1\"],\"kinds\":[1,6,7,9735]}]").expect("valid object");
|
|
|
+ let (relayer, _) = Relayer::new(get_db(true));
|
|
|
+ let (connection, mut recv) = Connection::new();
|
|
|
+ let _ = relayer.recv_request_from_client(&connection, request);
|
|
|
+ // ev1
|
|
|
+ assert_eq!(
|
|
|
+ "9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42",
|
|
|
+ recv.try_recv()
|
|
|
+ .expect("valid")
|
|
|
+ .as_event()
|
|
|
+ .expect("event")
|
|
|
+ .event
|
|
|
+ .id
|
|
|
+ .to_string()
|
|
|
+ );
|
|
|
+ // ev2
|
|
|
+ assert_eq!(
|
|
|
+ "2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a",
|
|
|
+ recv.try_recv()
|
|
|
+ .expect("valid")
|
|
|
+ .as_event()
|
|
|
+ .expect("event")
|
|
|
+ .event
|
|
|
+ .id
|
|
|
+ .to_string()
|
|
|
+ );
|
|
|
+ // ev3
|
|
|
+ assert_eq!(
|
|
|
+ "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9",
|
|
|
+ recv.try_recv()
|
|
|
+ .expect("valid")
|
|
|
+ .as_event()
|
|
|
+ .expect("event")
|
|
|
+ .event
|
|
|
+ .id
|
|
|
+ .to_string()
|
|
|
+ );
|
|
|
+ // eod
|
|
|
+ assert!(recv
|
|
|
+ .try_recv()
|
|
|
+ .expect("valid")
|
|
|
+ .as_end_of_stored_events()
|
|
|
+ .is_some());
|
|
|
+
|
|
|
+ assert!(recv.try_recv().is_err());
|
|
|
+ }
|
|
|
+
|
|
|
+ #[tokio::test]
|
|
|
+ async fn server_listener_real_time() {
|
|
|
+ let request: Request = serde_json::from_str("[\"REQ\",\"1298169700973717\",{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"since\":1681939304},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[1,3,6,7,9735],\"since\":1681939304},{\"#p\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"authors\":[\"39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb\"],\"kinds\":[4]},{\"#e\":[\"2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a\",\"a5e3369c43daf2675ecbce18831e5f4e07db0d4dde0ef4f5698e645e4c46eed1\"],\"kinds\":[1,6,7,9735]}]").expect("valid object");
|
|
|
+ let (relayer, _) = Relayer::new(get_db(false));
|
|
|
+ let (connection, mut recv) = Connection::new();
|
|
|
+ let _ = relayer.recv_request_from_client(&connection, request);
|
|
|
+ // eod
|
|
|
+ assert!(recv
|
|
|
+ .try_recv()
|
|
|
+ .expect("valid")
|
|
|
+ .as_end_of_stored_events()
|
|
|
+ .is_some());
|
|
|
+
|
|
|
+ // It is empty
|
|
|
+ assert!(recv.try_recv().is_err());
|
|
|
+
|
|
|
+ let new_event: Request = serde_json::from_str(r#"["EVENT", {"kind":1,"content":"Pong","tags":[["e","9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42","","root"],["e","2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a","","reply"],["p","39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],["p","ee7202ad91459e013bfef263c59e47deb0163a5e7651b026673765488bfaf102"]],"created_at":1681938616,"pubkey":"a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3","id":"e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9","sig":"9036150a6c8a32933cffcc42aec4d2109a22e9f10d1c3860c0435a925e6386babb7df5c95fcf68c8ed6a9726a1f07225af663d0b068eb555014130aad21674fc","meta":{"revision":0,"created":1681939266488,"version":0},"$loki":108}]"#).expect("value");
|
|
|
+
|
|
|
+ relayer
|
|
|
+ .recv_request_from_client(&connection, new_event)
|
|
|
+ .expect("process event");
|
|
|
+
|
|
|
+ // It is not empty
|
|
|
+ assert!(recv.try_recv().is_ok());
|
|
|
+ }
|
|
|
+}
|