|
@@ -14,7 +14,6 @@ use std::{
|
|
collections::{HashMap, HashSet},
|
|
collections::{HashMap, HashSet},
|
|
ops::Deref,
|
|
ops::Deref,
|
|
sync::Arc,
|
|
sync::Arc,
|
|
- time::Instant,
|
|
|
|
};
|
|
};
|
|
use tokio::{
|
|
use tokio::{
|
|
net::{TcpListener, TcpStream},
|
|
net::{TcpListener, TcpStream},
|
|
@@ -144,8 +143,6 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
|
|
|
let handle = tokio::spawn(async move {
|
|
let handle = tokio::spawn(async move {
|
|
loop {
|
|
loop {
|
|
- let start = Instant::now();
|
|
|
|
- println!("{}", client_pool_receiver.len());
|
|
|
|
tokio::select! {
|
|
tokio::select! {
|
|
Ok((stream, _)) = server.accept() => {
|
|
Ok((stream, _)) = server.accept() => {
|
|
// accept new connections
|
|
// accept new connections
|
|
@@ -177,8 +174,6 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
};
|
|
};
|
|
|
|
|
|
let _ = connection.respond(Response::EndOfStoredEvents(sub_id.into()));
|
|
let _ = connection.respond(Response::EndOfStoredEvents(sub_id.into()));
|
|
- let duration = start.elapsed();
|
|
|
|
- println!("xTime elapsed: {} ms", duration.as_millis());
|
|
|
|
}
|
|
}
|
|
_ => {}
|
|
_ => {}
|
|
}
|
|
}
|
|
@@ -355,37 +350,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
let connection = read_connections
|
|
let connection = read_connections
|
|
.get(&connection_id)
|
|
.get(&connection_id)
|
|
.ok_or(Error::UnknownConnection(connection_id))?;
|
|
.ok_or(Error::UnknownConnection(connection_id))?;
|
|
-
|
|
|
|
- if let Some(storage) = storage.as_ref() {
|
|
|
|
- let mut sent = HashSet::new();
|
|
|
|
- // Sent all events that match the filter that are stored in our database
|
|
|
|
- for filter in request.filters.clone().into_iter() {
|
|
|
|
- let mut result = storage.get_by_filter(filter).await?;
|
|
|
|
-
|
|
|
|
- while let Some(Ok(event)) = result.next().await {
|
|
|
|
- if sent.contains(&event.id) {
|
|
|
|
- continue;
|
|
|
|
- }
|
|
|
|
- sent.insert(event.id.clone());
|
|
|
|
- let _ = connection.respond(
|
|
|
|
- relayer::Event {
|
|
|
|
- subscription_id: request.subscription_id.clone(),
|
|
|
|
- event,
|
|
|
|
- }
|
|
|
|
- .into(),
|
|
|
|
- );
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if foreign_subscription.is_none() {
|
|
|
|
- // If there is a foreign subscription, we shouldn't send a
|
|
|
|
- // EOS until we have got EOS from all foreign relays
|
|
|
|
- let _ = connection.respond(
|
|
|
|
- relayer::EndOfStoredEvents(request.subscription_id.clone()).into(),
|
|
|
|
- );
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ let has_foreign_subscription = foreign_subscription.is_some();
|
|
connection
|
|
connection
|
|
.subscribe(
|
|
.subscribe(
|
|
request.subscription_id.clone(),
|
|
request.subscription_id.clone(),
|
|
@@ -393,14 +358,56 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
foreign_subscription,
|
|
foreign_subscription,
|
|
subscription_manager
|
|
subscription_manager
|
|
.subscribe(
|
|
.subscribe(
|
|
- (request.subscription_id, connection.get_conn_id()).into(),
|
|
|
|
- request.filters,
|
|
|
|
|
|
+ (request.subscription_id.clone(), connection.get_conn_id())
|
|
|
|
+ .into(),
|
|
|
|
+ request.filters.clone(),
|
|
(),
|
|
(),
|
|
)
|
|
)
|
|
.await,
|
|
.await,
|
|
),
|
|
),
|
|
)
|
|
)
|
|
.await;
|
|
.await;
|
|
|
|
+
|
|
|
|
+ let responder = connection.get_responder();
|
|
|
|
+
|
|
|
|
+ drop(read_connections);
|
|
|
|
+
|
|
|
|
+ tokio::spawn(async move {
|
|
|
|
+ if let Some(storage) = storage.as_ref() {
|
|
|
|
+ let mut sent = HashSet::new();
|
|
|
|
+ // Sent all events that match the filter that are stored in our database
|
|
|
|
+ for filter in request.filters.into_iter() {
|
|
|
|
+ let mut result = if let Ok(result) = storage.get_by_filter(filter).await
|
|
|
|
+ {
|
|
|
|
+ result
|
|
|
|
+ } else {
|
|
|
|
+ return;
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ while let Some(Ok(event)) = result.next().await {
|
|
|
|
+ if sent.contains(&event.id) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ sent.insert(event.id.clone());
|
|
|
|
+ let _ = responder.try_send(
|
|
|
|
+ relayer::Event {
|
|
|
|
+ subscription_id: request.subscription_id.clone(),
|
|
|
|
+ event,
|
|
|
|
+ }
|
|
|
|
+ .into(),
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if !has_foreign_subscription {
|
|
|
|
+ // If there is a foreign subscription, we shouldn't send a
|
|
|
|
+ // EOS until we have got EOS from all foreign relays
|
|
|
|
+ let _ = responder.try_send(
|
|
|
|
+ relayer::EndOfStoredEvents(request.subscription_id.clone()).into(),
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+ });
|
|
}
|
|
}
|
|
Request::Close(close) => {
|
|
Request::Close(close) => {
|
|
connections
|
|
connections
|
|
@@ -431,9 +438,17 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
}
|
|
}
|
|
|
|
|
|
let connections = connections.read().await;
|
|
let connections = connections.read().await;
|
|
|
|
+ let mut sent = HashSet::new();
|
|
|
|
+
|
|
for RelayerSubscriptionId((sub_id, conn_id)) in
|
|
for RelayerSubscriptionId((sub_id, conn_id)) in
|
|
subscription_manager.get_subscribers(&event).await
|
|
subscription_manager.get_subscribers(&event).await
|
|
{
|
|
{
|
|
|
|
+ if sent.contains(&conn_id) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ sent.insert(conn_id);
|
|
|
|
+
|
|
if let Some(connection) = connections.get(&conn_id) {
|
|
if let Some(connection) = connections.get(&conn_id) {
|
|
let _ = connection.respond(
|
|
let _ = connection.respond(
|
|
relayer::Event {
|
|
relayer::Event {
|