|
@@ -4,9 +4,13 @@ use crate::{
|
|
|
Connection, Error,
|
|
|
};
|
|
|
use futures_util::StreamExt;
|
|
|
-use nostr_rs_client::{Error as ClientError, Pool};
|
|
|
+use nostr_rs_client::{Error as ClientError, Pool, Url};
|
|
|
use nostr_rs_storage_base::Storage;
|
|
|
-use nostr_rs_types::{relayer, types::Event, Request, Response};
|
|
|
+use nostr_rs_types::{
|
|
|
+ relayer::{self, ROk, ROkStatus},
|
|
|
+ types::{Addr, Event},
|
|
|
+ Request, Response,
|
|
|
+};
|
|
|
use std::{
|
|
|
collections::{HashMap, HashSet},
|
|
|
ops::Deref,
|
|
@@ -76,6 +80,13 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
+ /// Connects to the relayer pool
|
|
|
+ pub async fn connect_to_relayer(&self, url: Url) -> Result<(), Error> {
|
|
|
+ let (client_pool, _) = self.client_pool.as_ref().ok_or(Error::NoClient)?;
|
|
|
+ client_pool.connect_to(url).await;
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+
|
|
|
/// Total number of subscribers requests that actively listening for new events
|
|
|
pub fn total_subscribers(&self) -> usize {
|
|
|
self.subscriptions.total_subscribers()
|
|
@@ -91,9 +102,12 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
///
|
|
|
/// This function consumes the object and takes the ownership. The returned
|
|
|
/// JoinHandle() can be used to stop the main loop
|
|
|
- pub fn main(self, server: TcpListener) -> Result<JoinHandle<()>, Error> {
|
|
|
+ pub fn main(self, server: TcpListener) -> Result<(Arc<Self>, JoinHandle<()>), Error> {
|
|
|
let (this, mut receiver) = self.split()?;
|
|
|
- Ok(tokio::spawn(async move {
|
|
|
+ let _self = Arc::new(this);
|
|
|
+ let this = _self.clone();
|
|
|
+
|
|
|
+ let handle = tokio::spawn(async move {
|
|
|
loop {
|
|
|
tokio::select! {
|
|
|
Ok((stream, _)) = server.accept() => {
|
|
@@ -103,12 +117,12 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
Some((conn_id, request)) = receiver.recv() => {
|
|
|
// receive messages from the connection pool
|
|
|
if conn_id.is_empty() {
|
|
|
- // connection pool
|
|
|
+ // message received from client pool
|
|
|
if let Request::Event(event) = request {
|
|
|
+ let _ = this.broadcast(event.deref()).await;
|
|
|
if let Some(storage) = this.storage.as_ref() {
|
|
|
let _ = storage.store_local_event(&event).await;
|
|
|
}
|
|
|
- this.broadcast(event.deref()).await;
|
|
|
}
|
|
|
continue;
|
|
|
}
|
|
@@ -128,7 +142,9 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- }))
|
|
|
+ });
|
|
|
+
|
|
|
+ Ok((_self, handle))
|
|
|
}
|
|
|
|
|
|
/// Handle the client pool
|
|
@@ -136,21 +152,22 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
/// Main loop to consume messages from the client pool and broadcast them to the local subscribers
|
|
|
fn handle_client_pool(
|
|
|
client_pool: Pool,
|
|
|
- sender: Sender<(ConnectionId, Request)>,
|
|
|
+ send_message_to_relayer: Sender<(ConnectionId, Request)>,
|
|
|
) -> Result<(Pool, JoinHandle<()>), ClientError> {
|
|
|
let (mut receiver, client_pool) = client_pool.split()?;
|
|
|
|
|
|
let handle = tokio::spawn(async move {
|
|
|
loop {
|
|
|
+ if receiver.len() > 500 {
|
|
|
+ println!("{}", receiver.len());
|
|
|
+ }
|
|
|
if let Some((response, _)) = receiver.recv().await {
|
|
|
match response {
|
|
|
Response::Event(event) => {
|
|
|
- let _ = sender
|
|
|
- .send((
|
|
|
- ConnectionId::new_empty(),
|
|
|
- Request::Event(event.event.into()),
|
|
|
- ))
|
|
|
- .await;
|
|
|
+ let _ = send_message_to_relayer.try_send((
|
|
|
+ ConnectionId::new_empty(),
|
|
|
+ Request::Event(event.event.into()),
|
|
|
+ ));
|
|
|
}
|
|
|
Response::EndOfStoredEvents(_) => {}
|
|
|
x => {
|
|
@@ -200,32 +217,45 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
&self,
|
|
|
connection: &Connection,
|
|
|
request: Request,
|
|
|
- ) -> Result<Option<Request>, Error> {
|
|
|
- match &request {
|
|
|
+ ) -> Result<(), Error> {
|
|
|
+ match request {
|
|
|
Request::Event(event) => {
|
|
|
- if let Some(storage) = self.storage.as_ref() {
|
|
|
- let _ = storage.store(event).await;
|
|
|
- let _ = storage.store_local_event(event).await;
|
|
|
+ let event_id: Addr = event.id.clone().into();
|
|
|
+ if !self.broadcast(&event).await? {
|
|
|
+ connection.send(
|
|
|
+ ROk {
|
|
|
+ id: event_id,
|
|
|
+ status: ROkStatus::Duplicate,
|
|
|
+ }
|
|
|
+ .into(),
|
|
|
+ )?;
|
|
|
+ return Ok(());
|
|
|
}
|
|
|
|
|
|
- self.broadcast(event).await;
|
|
|
+ if let Some(storage) = self.storage.as_ref() {
|
|
|
+ let _ = storage.store_local_event(&event).await;
|
|
|
+ }
|
|
|
|
|
|
if let Some((client_pool, _)) = self.client_pool.as_ref() {
|
|
|
// pass the event to the pool of clients, so this relayer can relay
|
|
|
// their local events to the clients in the network of relayers
|
|
|
- let _ = client_pool.post(event.clone()).await;
|
|
|
+ let _ = client_pool.post(event).await;
|
|
|
}
|
|
|
+
|
|
|
+ connection.send(
|
|
|
+ ROk {
|
|
|
+ id: event_id,
|
|
|
+ status: ROkStatus::Ok,
|
|
|
+ }
|
|
|
+ .into(),
|
|
|
+ )?;
|
|
|
}
|
|
|
Request::Request(request) => {
|
|
|
let foreign_subscription = if let Some((client_pool, _)) = self.client_pool.as_ref()
|
|
|
{
|
|
|
// pass the subscription request to the pool of clients, so this relayer
|
|
|
// can relay any unknown event to the clients through their subscriptions
|
|
|
- Some(
|
|
|
- client_pool
|
|
|
- .subscribe(request.filters.clone().into())
|
|
|
- .await?,
|
|
|
- )
|
|
|
+ Some(client_pool.subscribe(request.clone()).await?)
|
|
|
} else {
|
|
|
None
|
|
|
};
|
|
@@ -272,21 +302,24 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
.await;
|
|
|
}
|
|
|
Request::Close(close) => {
|
|
|
- connection.unsubscribe(close).await;
|
|
|
+ connection.unsubscribe(&close).await;
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- Ok(Some(request))
|
|
|
+ Ok(())
|
|
|
}
|
|
|
|
|
|
#[inline]
|
|
|
/// Broadcast a given event to all local subscribers
|
|
|
- pub async fn broadcast(&self, event: &Event) {
|
|
|
+ pub async fn broadcast(&self, event: &Event) -> Result<bool, Error> {
|
|
|
if let Some(storage) = self.storage.as_ref() {
|
|
|
- let _ = storage.store(event).await;
|
|
|
+ if !storage.store(event).await? {
|
|
|
+ return Ok(false);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
self.subscriptions.broadcast(event.clone());
|
|
|
+ Ok(true)
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -314,7 +347,7 @@ mod test {
|
|
|
|
|
|
let relayer =
|
|
|
Relayer::new(Some(Memory::default()), client_pool).expect("valid dummy server");
|
|
|
- let stopper = relayer.main(listener).expect("valid main loop");
|
|
|
+ let (_, stopper) = relayer.main(listener).expect("valid main loop");
|
|
|
(
|
|
|
Url::parse(&format!("ws://{}", local_addr)).expect("valid url"),
|
|
|
stopper,
|
|
@@ -393,6 +426,18 @@ mod test {
|
|
|
let _ = relayer
|
|
|
.process_request_from_client(&connection, request)
|
|
|
.await;
|
|
|
+
|
|
|
+ // ev1
|
|
|
+ assert_eq!(
|
|
|
+ ROkStatus::Ok,
|
|
|
+ recv.try_recv()
|
|
|
+ .expect("valid")
|
|
|
+ .as_ok()
|
|
|
+ .cloned()
|
|
|
+ .unwrap()
|
|
|
+ .status,
|
|
|
+ );
|
|
|
+
|
|
|
// ev1
|
|
|
assert_eq!(
|
|
|
note,
|
|
@@ -475,7 +520,6 @@ mod test {
|
|
|
.expect("valid")
|
|
|
.as_event()
|
|
|
.expect("event")
|
|
|
- .event
|
|
|
.id
|
|
|
.to_string()
|
|
|
);
|
|
@@ -486,7 +530,6 @@ mod test {
|
|
|
.expect("valid")
|
|
|
.as_event()
|
|
|
.expect("event")
|
|
|
- .event
|
|
|
.id
|
|
|
.to_string()
|
|
|
);
|
|
@@ -497,7 +540,6 @@ mod test {
|
|
|
.expect("valid")
|
|
|
.as_event()
|
|
|
.expect("event")
|
|
|
- .event
|
|
|
.id
|
|
|
.to_string()
|
|
|
);
|
|
@@ -567,6 +609,14 @@ mod test {
|
|
|
|
|
|
sleep(Duration::from_millis(100)).await;
|
|
|
|
|
|
+ // ok from posting
|
|
|
+ let msg = recv.try_recv();
|
|
|
+ assert!(msg.is_ok());
|
|
|
+ assert_eq!(
|
|
|
+ msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
|
|
|
+ "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
|
|
|
+ );
|
|
|
+
|
|
|
// It is not empty
|
|
|
let msg = recv.try_recv();
|
|
|
assert!(msg.is_ok());
|
|
@@ -651,6 +701,14 @@ mod test {
|
|
|
|
|
|
sleep(Duration::from_millis(100)).await;
|
|
|
|
|
|
+ // ok from posting
|
|
|
+ let msg = recv.try_recv();
|
|
|
+ assert!(msg.is_ok());
|
|
|
+ assert_eq!(
|
|
|
+ msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
|
|
|
+ "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
|
|
|
+ );
|
|
|
+
|
|
|
// It is not empty
|
|
|
let msg = recv.try_recv();
|
|
|
assert!(msg.is_ok());
|
|
@@ -688,7 +746,7 @@ mod test {
|
|
|
.expect("valid object");
|
|
|
|
|
|
let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
|
|
|
- let (publisher, _) = Connection::new_local_connection();
|
|
|
+ let (publisher, mut recv) = Connection::new_local_connection();
|
|
|
|
|
|
let mut set1 = (0..1000)
|
|
|
.map(|_| Connection::new_local_connection())
|
|
@@ -746,6 +804,13 @@ mod test {
|
|
|
|
|
|
sleep(Duration::from_millis(10)).await;
|
|
|
|
|
|
+ let msg = recv.try_recv();
|
|
|
+ assert!(msg.is_ok());
|
|
|
+ assert_eq!(
|
|
|
+ msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
|
|
|
+ "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
|
|
|
+ );
|
|
|
+
|
|
|
for (_, recv) in set1.iter_mut() {
|
|
|
assert!(recv.try_recv().is_err());
|
|
|
}
|
|
@@ -776,6 +841,33 @@ mod test {
|
|
|
}
|
|
|
|
|
|
#[tokio::test]
|
|
|
+ async fn posting_event_replies_ok() {
|
|
|
+ let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
|
|
|
+ let (connection, mut recv) = Connection::new_local_connection();
|
|
|
+
|
|
|
+ let note = get_note();
|
|
|
+ let note_id = note.as_event().map(|x| x.id.clone()).unwrap();
|
|
|
+
|
|
|
+ relayer
|
|
|
+ .process_request_from_client(&connection, note)
|
|
|
+ .await
|
|
|
+ .expect("process event");
|
|
|
+
|
|
|
+ sleep(Duration::from_millis(10)).await;
|
|
|
+
|
|
|
+ assert_eq!(
|
|
|
+ Some(
|
|
|
+ ROk {
|
|
|
+ id: note_id.into(),
|
|
|
+ status: ROkStatus::Ok,
|
|
|
+ }
|
|
|
+ .into()
|
|
|
+ ),
|
|
|
+ recv.try_recv().ok()
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ #[tokio::test]
|
|
|
async fn subscribe_to_all() {
|
|
|
let request: Request =
|
|
|
serde_json::from_value(json!(["REQ", "1298169700973717", {}])).expect("valid object");
|
|
@@ -807,6 +899,14 @@ mod test {
|
|
|
|
|
|
sleep(Duration::from_millis(10)).await;
|
|
|
|
|
|
+ // ok from posting
|
|
|
+ let msg = recv.try_recv();
|
|
|
+ assert!(msg.is_ok());
|
|
|
+ assert_eq!(
|
|
|
+ msg.expect("is ok").as_ok().expect("valid").id.to_hex(),
|
|
|
+ "e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9".to_owned(),
|
|
|
+ );
|
|
|
+
|
|
|
// It is not empty
|
|
|
let msg = recv.try_recv();
|
|
|
assert!(msg.is_ok());
|
|
@@ -894,19 +994,19 @@ mod test {
|
|
|
assert_eq!(
|
|
|
responses
|
|
|
.get(&relayer1.port().expect("port"))
|
|
|
- .map(|x| x.event.id.clone()),
|
|
|
+ .map(|x| x.id.clone()),
|
|
|
Some(signed_content.id.clone())
|
|
|
);
|
|
|
assert_eq!(
|
|
|
responses
|
|
|
.get(&relayer2.port().expect("port"))
|
|
|
- .map(|x| x.event.id.clone()),
|
|
|
+ .map(|x| x.id.clone()),
|
|
|
Some(signed_content.id.clone())
|
|
|
);
|
|
|
assert_eq!(
|
|
|
responses
|
|
|
.get(&relayer3.port().expect("port"))
|
|
|
- .map(|x| x.event.id.clone()),
|
|
|
+ .map(|x| x.id.clone()),
|
|
|
Some(signed_content.id)
|
|
|
);
|
|
|
}
|
|
@@ -942,7 +1042,7 @@ mod test {
|
|
|
|
|
|
let account1 = Account::default();
|
|
|
let signed_content = account1
|
|
|
- .sign_content(vec![], Content::ShortTextNote("test 0".to_owned()), None)
|
|
|
+ .sign_content(vec![], Content::ShortTextNote("test 01".to_owned()), None)
|
|
|
.expect("valid signed content");
|
|
|
|
|
|
// account1 posts a new note into the relayer1, and the main relayer
|
|
@@ -956,8 +1056,8 @@ mod test {
|
|
|
Some((signed_content.id, signed_content.signature)),
|
|
|
main_client
|
|
|
.try_recv()
|
|
|
- .and_then(|(r, _)| r.as_event().cloned().map(|x| x.event))
|
|
|
- .map(|x| (x.id, x.signature))
|
|
|
+ .and_then(|(r, _)| r.as_event().cloned())
|
|
|
+ .map(|x| (x.id.clone(), x.signature.clone()))
|
|
|
);
|
|
|
assert!(main_client.try_recv().is_none());
|
|
|
}
|