|
@@ -24,6 +24,8 @@ type SubId = u128;
|
|
|
|
|
|
type Subscriptions = HashMap<SubId, (SubscriptionId, Sender<Response>)>;
|
|
|
|
|
|
+/// Relayer struct
|
|
|
+///
|
|
|
pub struct Relayer<T: Storage + Send + Sync + 'static> {
|
|
|
/// Storage engine, if provided the services are going to persisted in disk,
|
|
|
/// otherwise all the messages are going to be ephemeral, making this
|
|
@@ -63,6 +65,14 @@ impl<T: Storage + Send + Sync + 'static> Drop for Relayer<T> {
|
|
|
}
|
|
|
|
|
|
impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
+ /// Creates a new relayer instance
|
|
|
+ ///
|
|
|
+ /// If the storage is given, it will be used to persist events, as well to
|
|
|
+ /// server past events when a new subscription is added.
|
|
|
+ ///
|
|
|
+ /// If the client_pool is given it will be used to connect to those relayers
|
|
|
+ /// and create a network of relayers, reposting events to them and
|
|
|
+ /// subscribing to their events.`gqq`
|
|
|
pub fn new(storage: Option<T>, client_pool: Option<Pool>) -> Result<Self, Error> {
|
|
|
let (sender, receiver) = channel(100_000);
|
|
|
Ok(Self {
|
|
@@ -80,11 +90,16 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
+ /// Splits the relayer object and extract their receiver.
|
|
|
pub fn split(mut self) -> Result<(Self, Receiver<(u128, Request)>), Error> {
|
|
|
let receiver = self.relayer_receiver.take().ok_or(Error::AlreadySplitted)?;
|
|
|
Ok((self, receiver))
|
|
|
}
|
|
|
|
|
|
+ /// Runs the relayer main loop in a tokio task and returns it.
|
|
|
+ ///
|
|
|
+ /// This function consumes the object and takes the ownership. The returned
|
|
|
+ /// JoinHandle() can be used to stop the main loop
|
|
|
pub fn main(self, server: TcpListener) -> Result<JoinHandle<()>, Error> {
|
|
|
let (this, mut receiver) = self.split()?;
|
|
|
Ok(tokio::spawn(async move {
|
|
@@ -99,7 +114,10 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
if conn_id == 0 {
|
|
|
// connection pool
|
|
|
if let Request::Event(event) = request {
|
|
|
- this.store_and_broadcast(&event.deref()).await;
|
|
|
+ if let Some(storage) = this.storage.as_ref() {
|
|
|
+ let _ = storage.store_local_event(&event).await;
|
|
|
+ }
|
|
|
+ this.broadcast(&event.deref()).await;
|
|
|
}
|
|
|
continue;
|
|
|
}
|
|
@@ -112,7 +130,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
};
|
|
|
|
|
|
// receive messages from clients
|
|
|
- let _ = this.progress_request_from_client(connection, request).await;
|
|
|
+ let _ = this.process_request_from_client(connection, request).await;
|
|
|
drop(connections);
|
|
|
}
|
|
|
else => {
|
|
@@ -135,7 +153,9 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
Response::Event(event) => {
|
|
|
let _ = sender.send((0, Request::Event(event.event.into()))).await;
|
|
|
}
|
|
|
- _ => {}
|
|
|
+ x => {
|
|
|
+ println!("x => {:?}", x);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -149,6 +169,9 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
&self.storage
|
|
|
}
|
|
|
|
|
|
+ /// Adds a new TpStream and adds it to the list of active connections.
|
|
|
+ ///
|
|
|
+ /// This function will spawn the client's loop to receive incoming messages and send those messages
|
|
|
pub async fn add_connection(
|
|
|
&self,
|
|
|
disconnection_notify: Option<mpsc::Sender<u128>>,
|
|
@@ -163,14 +186,25 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
Ok(id)
|
|
|
}
|
|
|
|
|
|
- async fn progress_request_from_client(
|
|
|
+ async fn process_request_from_client(
|
|
|
&self,
|
|
|
connection: &Connection,
|
|
|
request: Request,
|
|
|
) -> Result<Option<Request>, Error> {
|
|
|
match &request {
|
|
|
Request::Event(event) => {
|
|
|
- self.store_and_broadcast_local_event(event.deref()).await;
|
|
|
+ if let Some(storage) = self.storage.as_ref() {
|
|
|
+ let _ = storage.store(event).await;
|
|
|
+ let _ = storage.store_local_event(event).await;
|
|
|
+ }
|
|
|
+
|
|
|
+ self.broadcast(event).await;
|
|
|
+
|
|
|
+ if let Some((client_pool, _)) = self.client_pool.as_ref() {
|
|
|
+ // pass the event to the pool of clients, so this relayer can relay
|
|
|
+ // their local events to the clients in the network of relayers
|
|
|
+ let _ = client_pool.write().await.post(event.clone().into()).await;
|
|
|
+ }
|
|
|
}
|
|
|
Request::Request(request) => {
|
|
|
if let Some((client_pool, _)) = self.client_pool.as_ref() {
|
|
@@ -252,15 +286,6 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
Ok(Some(request))
|
|
|
}
|
|
|
|
|
|
- pub async fn send_to_conn(&self, conn_id: u128, response: Response) -> Result<(), Error> {
|
|
|
- let connections = self.clients.read().await;
|
|
|
- let connection = connections
|
|
|
- .get(&conn_id)
|
|
|
- .ok_or(Error::UnknownConnection(conn_id))?;
|
|
|
-
|
|
|
- connection.send(response)
|
|
|
- }
|
|
|
-
|
|
|
#[inline]
|
|
|
fn broadcast_to_subscribers<'a>(
|
|
|
subscriptions: RwLockReadGuard<'a, Subscriptions>,
|
|
@@ -278,28 +303,10 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
}
|
|
|
|
|
|
#[inline]
|
|
|
- pub async fn store_and_broadcast_local_event(&self, event: &Event) {
|
|
|
- if let Some(storage) = self.storage.as_ref() {
|
|
|
- let _ = storage.store_local_event(event).await;
|
|
|
- }
|
|
|
- let subscriptions = self.subscriptions.read().await;
|
|
|
-
|
|
|
- for subscription_type in Subscription::from_event(event) {
|
|
|
- if let Some(subscribers) = subscriptions.get(&subscription_type) {
|
|
|
- Self::broadcast_to_subscribers(subscribers.read().await, event);
|
|
|
- }
|
|
|
- }
|
|
|
- if let Some((client_pool, _)) = self.client_pool.as_ref() {
|
|
|
- // pass the event to the pool of clients, so this relayer can relay
|
|
|
- // their local events to the clients in the network of relayers
|
|
|
- let _ = client_pool.write().await.post(event.clone().into()).await;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- #[inline]
|
|
|
- pub async fn store_and_broadcast(&self, event: &Event) {
|
|
|
+ /// Broadcast a given event to all local subscribers
|
|
|
+ pub async fn broadcast(&self, event: &Event) {
|
|
|
if let Some(storage) = self.storage.as_ref() {
|
|
|
- let _ = storage.store(event);
|
|
|
+ let _ = storage.store(event).await;
|
|
|
}
|
|
|
let subscriptions = self.subscriptions.read().await;
|
|
|
|
|
@@ -394,7 +401,7 @@ mod test {
|
|
|
let relayer = Relayer::new(Some(get_db(true).await), None).expect("valid relayer");
|
|
|
let (connection, mut recv) = Connection::new_for_test();
|
|
|
let _ = relayer
|
|
|
- .progress_request_from_client(&connection, request)
|
|
|
+ .process_request_from_client(&connection, request)
|
|
|
.await;
|
|
|
// ev1
|
|
|
assert_eq!(
|
|
@@ -469,7 +476,7 @@ mod test {
|
|
|
let relayer = Relayer::new(Some(get_db(false).await), None).expect("valid relayer");
|
|
|
let (connection, mut recv) = Connection::new_for_test();
|
|
|
let _ = relayer
|
|
|
- .progress_request_from_client(&connection, request)
|
|
|
+ .process_request_from_client(&connection, request)
|
|
|
.await;
|
|
|
// eod
|
|
|
assert!(recv
|
|
@@ -484,7 +491,7 @@ mod test {
|
|
|
let new_event: Request = serde_json::from_str(r#"["EVENT", {"kind":1,"content":"Pong","tags":[["e","9508850d7ddc8ef58c8b392236c49d472dc23fa11f4e73eb5475dfb099ddff42","","root"],["e","2e72250d80e9b3fd30230b3db3ed7d22f15d266ed345c36700b01ec153c9e28a","","reply"],["p","39a7d06e824c0c2523bedb93f0cef84245e4401fee03b6257a1c6dfd18b57efb"],["p","ee7202ad91459e013bfef263c59e47deb0163a5e7651b026673765488bfaf102"]],"created_at":1681938616,"pubkey":"a42007e33cfa25673b26f46f39df039aa6003258a68dc88f1f1e0447607aedb3","id":"e862fe23daf52ab09b36a37fa91ca3743e0c323e630e8627891212ca147c2da9","sig":"9036150a6c8a32933cffcc42aec4d2109a22e9f10d1c3860c0435a925e6386babb7df5c95fcf68c8ed6a9726a1f07225af663d0b068eb555014130aad21674fc","meta":{"revision":0,"created":1681939266488,"version":0},"$loki":108}]"#).expect("value");
|
|
|
|
|
|
relayer
|
|
|
- .progress_request_from_client(&connection, new_event)
|
|
|
+ .process_request_from_client(&connection, new_event)
|
|
|
.await
|
|
|
.expect("process event");
|
|
|
|