|
@@ -350,37 +350,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
let connection = read_connections
|
|
|
.get(&connection_id)
|
|
|
.ok_or(Error::UnknownConnection(connection_id))?;
|
|
|
-
|
|
|
- if let Some(storage) = storage.as_ref() {
|
|
|
- let mut sent = HashSet::new();
|
|
|
- // Sent all events that match the filter that are stored in our database
|
|
|
- for filter in request.filters.clone().into_iter() {
|
|
|
- let mut result = storage.get_by_filter(filter).await?;
|
|
|
-
|
|
|
- while let Some(Ok(event)) = result.next().await {
|
|
|
- if sent.contains(&event.id) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- sent.insert(event.id.clone());
|
|
|
- let _ = connection.respond(
|
|
|
- relayer::Event {
|
|
|
- subscription_id: request.subscription_id.clone(),
|
|
|
- event,
|
|
|
- }
|
|
|
- .into(),
|
|
|
- );
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if foreign_subscription.is_none() {
|
|
|
- // If there is a foreign subscription, we shouldn't send a
|
|
|
- // EOS until we have got EOS from all foreign relays
|
|
|
- let _ = connection.respond(
|
|
|
- relayer::EndOfStoredEvents(request.subscription_id.clone()).into(),
|
|
|
- );
|
|
|
- }
|
|
|
-
|
|
|
+ let has_foreign_subscription = foreign_subscription.is_some();
|
|
|
connection
|
|
|
.subscribe(
|
|
|
request.subscription_id.clone(),
|
|
@@ -388,14 +358,56 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
|
|
|
foreign_subscription,
|
|
|
subscription_manager
|
|
|
.subscribe(
|
|
|
- (request.subscription_id, connection.get_conn_id()).into(),
|
|
|
- request.filters,
|
|
|
+ (request.subscription_id.clone(), connection.get_conn_id())
|
|
|
+ .into(),
|
|
|
+ request.filters.clone(),
|
|
|
(),
|
|
|
)
|
|
|
.await,
|
|
|
),
|
|
|
)
|
|
|
.await;
|
|
|
+
|
|
|
+ let responder = connection.get_responder();
|
|
|
+
|
|
|
+ drop(read_connections);
|
|
|
+
|
|
|
+ tokio::spawn(async move {
|
|
|
+ if let Some(storage) = storage.as_ref() {
|
|
|
+ let mut sent = HashSet::new();
|
|
|
+ // Sent all events that match the filter that are stored in our database
|
|
|
+ for filter in request.filters.into_iter() {
|
|
|
+ let mut result = if let Ok(result) = storage.get_by_filter(filter).await
|
|
|
+ {
|
|
|
+ result
|
|
|
+ } else {
|
|
|
+ return;
|
|
|
+ };
|
|
|
+
|
|
|
+ while let Some(Ok(event)) = result.next().await {
|
|
|
+ if sent.contains(&event.id) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ sent.insert(event.id.clone());
|
|
|
+ let _ = responder.try_send(
|
|
|
+ relayer::Event {
|
|
|
+ subscription_id: request.subscription_id.clone(),
|
|
|
+ event,
|
|
|
+ }
|
|
|
+ .into(),
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if !has_foreign_subscription {
|
|
|
+ // If there is a foreign subscription, we shouldn't send a
|
|
|
+ // EOS until we have got EOS from all foreign relays
|
|
|
+ let _ = responder.try_send(
|
|
|
+ relayer::EndOfStoredEvents(request.subscription_id.clone()).into(),
|
|
|
+ );
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
Request::Close(close) => {
|
|
|
connections
|