|
@@ -4,25 +4,26 @@
|
|
|
use std::collections::HashMap;
|
|
use std::collections::HashMap;
|
|
|
use std::sync::atomic::AtomicBool;
|
|
use std::sync::atomic::AtomicBool;
|
|
|
use std::sync::{Arc, RwLock};
|
|
use std::sync::{Arc, RwLock};
|
|
|
|
|
+use std::time::Duration;
|
|
|
|
|
|
|
|
use tokio::sync::mpsc;
|
|
use tokio::sync::mpsc;
|
|
|
|
|
+use tokio::time::sleep;
|
|
|
|
|
|
|
|
use super::pubsub::Topic;
|
|
use super::pubsub::Topic;
|
|
|
use super::subscriber::{ActiveSubscription, SubscriptionRequest};
|
|
use super::subscriber::{ActiveSubscription, SubscriptionRequest};
|
|
|
-use super::{Error, Indexable, Pubsub};
|
|
|
|
|
-
|
|
|
|
|
-type ActiveSubscriptions<T> = RwLock<
|
|
|
|
|
- HashMap<
|
|
|
|
|
- <T as Topic>::SubscriptionName,
|
|
|
|
|
- (
|
|
|
|
|
- Vec<<<T as Topic>::Event as Indexable>::Index>,
|
|
|
|
|
- ActiveSubscription<T>,
|
|
|
|
|
- ),
|
|
|
|
|
- >,
|
|
|
|
|
->;
|
|
|
|
|
|
|
+use super::{Error, Event, Pubsub};
|
|
|
|
|
+
|
|
|
|
|
+type UniqueSubscriptions<T> =
|
|
|
|
|
+ RwLock<HashMap<<<T as Topic>::Event as Event>::Topic, (usize, ActiveSubscription<T>)>>;
|
|
|
|
|
+
|
|
|
|
|
+type ActiveSubscriptions<T> =
|
|
|
|
|
+ RwLock<HashMap<<T as Topic>::SubscriptionName, Vec<<<T as Topic>::Event as Event>::Topic>>>;
|
|
|
|
|
|
|
|
type InternalSender<T> = mpsc::Sender<(<T as Topic>::SubscriptionName, <T as Topic>::Event)>;
|
|
type InternalSender<T> = mpsc::Sender<(<T as Topic>::SubscriptionName, <T as Topic>::Event)>;
|
|
|
|
|
|
|
|
|
|
+const LONG_CONNECTION_SLEEP_MS: u64 = 10;
|
|
|
|
|
+const POLL_SLEEP_SECS: u64 = 2;
|
|
|
|
|
+
|
|
|
/// Subscription consumer
|
|
/// Subscription consumer
|
|
|
pub struct Consumer<T>
|
|
pub struct Consumer<T>
|
|
|
where
|
|
where
|
|
@@ -31,24 +32,67 @@ where
|
|
|
transport: T,
|
|
transport: T,
|
|
|
pubsub_internal_sender: InternalSender<T::Topic>,
|
|
pubsub_internal_sender: InternalSender<T::Topic>,
|
|
|
inner_pubsub: Pubsub<T::Topic>,
|
|
inner_pubsub: Pubsub<T::Topic>,
|
|
|
|
|
+ unique_subscriptions: UniqueSubscriptions<T::Topic>,
|
|
|
subscriptions: ActiveSubscriptions<T::Topic>,
|
|
subscriptions: ActiveSubscriptions<T::Topic>,
|
|
|
send_to_transport_loop: RwLock<mpsc::Sender<MessageToTransportLoop<T::Topic>>>,
|
|
send_to_transport_loop: RwLock<mpsc::Sender<MessageToTransportLoop<T::Topic>>>,
|
|
|
still_running: AtomicBool,
|
|
still_running: AtomicBool,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+/// Remote consumer
|
|
|
|
|
+pub struct RemoteActiveConsumer<T>
|
|
|
|
|
+where
|
|
|
|
|
+ T: Transport + 'static, {}
|
|
|
|
|
+
|
|
|
|
|
+struct InternalConversion<T>
|
|
|
|
|
+where
|
|
|
|
|
+ T: Transport + 'static,
|
|
|
|
|
+{
|
|
|
|
|
+ name: <T::Topic as Topic>::SubscriptionName,
|
|
|
|
|
+ index: <<T::Topic as Topic>::Event as Event>::Topic,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+impl<T> Clone for InternalConversion<T>
|
|
|
|
|
+where
|
|
|
|
|
+ T: Transport + 'static,
|
|
|
|
|
+{
|
|
|
|
|
+ fn clone(&self) -> Self {
|
|
|
|
|
+ Self {
|
|
|
|
|
+ name: self.name.clone(),
|
|
|
|
|
+ index: self.index.clone(),
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+impl<T> SubscriptionRequest for InternalConversion<T>
|
|
|
|
|
+where
|
|
|
|
|
+ T: Transport + 'static,
|
|
|
|
|
+{
|
|
|
|
|
+ type Topic = <<T::Topic as Topic>::Event as Event>::Topic;
|
|
|
|
|
+
|
|
|
|
|
+ type SubscriptionName = <T::Topic as Topic>::SubscriptionName;
|
|
|
|
|
+
|
|
|
|
|
+ fn subscription_name(&self) -> Self::SubscriptionName {
|
|
|
|
|
+ self.name.to_owned()
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error> {
|
|
|
|
|
+ Ok(vec![self.index.clone()])
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
impl<T> Consumer<T>
|
|
impl<T> Consumer<T>
|
|
|
where
|
|
where
|
|
|
T: Transport + 'static,
|
|
T: Transport + 'static,
|
|
|
{
|
|
{
|
|
|
/// Creates a new instance
|
|
/// Creates a new instance
|
|
|
- pub async fn new(transport: T) -> Arc<Self> {
|
|
|
|
|
- let (sender, _) = mpsc::channel(10_000);
|
|
|
|
|
|
|
+ pub fn new<F>(transport: T, inner_pubsub: Pubsub<T::Topic>) -> Arc<Self> {
|
|
|
let this = Arc::new(Self {
|
|
let this = Arc::new(Self {
|
|
|
transport,
|
|
transport,
|
|
|
- inner_pubsub: T::new_pubsub().await,
|
|
|
|
|
|
|
+ inner_pubsub,
|
|
|
pubsub_internal_sender: mpsc::channel(10_000).0,
|
|
pubsub_internal_sender: mpsc::channel(10_000).0,
|
|
|
subscriptions: Default::default(),
|
|
subscriptions: Default::default(),
|
|
|
- send_to_transport_loop: RwLock::new(sender),
|
|
|
|
|
|
|
+ unique_subscriptions: Default::default(),
|
|
|
|
|
+ send_to_transport_loop: RwLock::new(mpsc::channel(10_000).0),
|
|
|
still_running: true.into(),
|
|
still_running: true.into(),
|
|
|
});
|
|
});
|
|
|
|
|
|
|
@@ -58,15 +102,72 @@ where
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
async fn connection_loop(instance: Arc<Self>) {
|
|
async fn connection_loop(instance: Arc<Self>) {
|
|
|
- loop {
|
|
|
|
|
- let (sender, receiver) = mpsc::channel(10_000);
|
|
|
|
|
|
|
+ let mut long_connection_supported = true;
|
|
|
|
|
+ let mut poll_supported = true;
|
|
|
|
|
|
|
|
|
|
+ loop {
|
|
|
|
|
+ if (!long_connection_supported && !poll_supported)
|
|
|
|
|
+ || !instance
|
|
|
|
|
+ .still_running
|
|
|
|
|
+ .load(std::sync::atomic::Ordering::Relaxed)
|
|
|
{
|
|
{
|
|
|
- let mut shared_sender = instance.send_to_transport_loop.write().unwrap();
|
|
|
|
|
- *shared_sender = sender;
|
|
|
|
|
|
|
+ break;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- instance.transport.long_connection(receiver).await;
|
|
|
|
|
|
|
+ if long_connection_supported {
|
|
|
|
|
+ let (sender, receiver) = mpsc::channel(10_000);
|
|
|
|
|
+
|
|
|
|
|
+ {
|
|
|
|
|
+ let mut shared_sender = instance.send_to_transport_loop.write().unwrap();
|
|
|
|
|
+ *shared_sender = sender;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ let current_subscriptions = instance
|
|
|
|
|
+ .unique_subscriptions
|
|
|
|
|
+ .read()
|
|
|
|
|
+ .expect("xxx")
|
|
|
|
|
+ .keys()
|
|
|
|
|
+ .map(|x| x.clone())
|
|
|
|
|
+ .collect::<Vec<_>>();
|
|
|
|
|
+
|
|
|
|
|
+ if let Err(err) = instance
|
|
|
|
|
+ .transport
|
|
|
|
|
+ .long_connection(receiver, current_subscriptions)
|
|
|
|
|
+ .await
|
|
|
|
|
+ {
|
|
|
|
|
+ if matches!(err, Error::NotSupported) {
|
|
|
|
|
+ long_connection_supported = false;
|
|
|
|
|
+ }
|
|
|
|
|
+ tracing::error!("Long connection failed with error {:?}", err);
|
|
|
|
|
+ }
|
|
|
|
|
+ sleep(Duration::from_millis(LONG_CONNECTION_SLEEP_MS)).await;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if poll_supported {
|
|
|
|
|
+ let current_subscriptions = instance
|
|
|
|
|
+ .unique_subscriptions
|
|
|
|
|
+ .read()
|
|
|
|
|
+ .expect("xxx")
|
|
|
|
|
+ .iter()
|
|
|
|
|
+ .map(|(key, value)| (value.1.name().clone(), key.clone()))
|
|
|
|
|
+ .collect::<Vec<_>>();
|
|
|
|
|
+
|
|
|
|
|
+ if let Err(err) = instance
|
|
|
|
|
+ .transport
|
|
|
|
|
+ .poll(
|
|
|
|
|
+ current_subscriptions,
|
|
|
|
|
+ instance.pubsub_internal_sender.clone(),
|
|
|
|
|
+ )
|
|
|
|
|
+ .await
|
|
|
|
|
+ {
|
|
|
|
|
+ if matches!(err, Error::NotSupported) {
|
|
|
|
|
+ poll_supported = false;
|
|
|
|
|
+ }
|
|
|
|
|
+ tracing::error!("Polling failed with error {:?}", err);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ sleep(Duration::from_secs(POLL_SLEEP_SECS)).await;
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -82,44 +183,61 @@ where
|
|
|
/// broadcasat the event.
|
|
/// broadcasat the event.
|
|
|
///
|
|
///
|
|
|
///
|
|
///
|
|
|
- pub fn subscribe<I>(&self, request: I) -> Result<(), Error>
|
|
|
|
|
|
|
+ pub fn subscribe<I>(&self, request: I) -> Result<RemoteActiveConsumer<T>, Error>
|
|
|
where
|
|
where
|
|
|
I: SubscriptionRequest<
|
|
I: SubscriptionRequest<
|
|
|
- Index = <<T::Topic as Topic>::Event as Indexable>::Index,
|
|
|
|
|
|
|
+ Topic = <<T::Topic as Topic>::Event as Event>::Topic,
|
|
|
SubscriptionName = <T::Topic as Topic>::SubscriptionName,
|
|
SubscriptionName = <T::Topic as Topic>::SubscriptionName,
|
|
|
>,
|
|
>,
|
|
|
{
|
|
{
|
|
|
- let transport_loop = self
|
|
|
|
|
- .send_to_transport_loop
|
|
|
|
|
- .read()
|
|
|
|
|
|
|
+ let subscription_name = request.subscription_name();
|
|
|
|
|
+ let indexes = request.try_get_topics()?;
|
|
|
|
|
+
|
|
|
|
|
+ let mut unique_subscriptions = self
|
|
|
|
|
+ .unique_subscriptions
|
|
|
|
|
+ .write()
|
|
|
.map_err(|_| Error::Poison)?;
|
|
.map_err(|_| Error::Poison)?;
|
|
|
let mut subscriptions = self.subscriptions.write().map_err(|_| Error::Poison)?;
|
|
let mut subscriptions = self.subscriptions.write().map_err(|_| Error::Poison)?;
|
|
|
- let subscription_name = request.subscription_name();
|
|
|
|
|
- let indexes = request.try_get_indexes()?;
|
|
|
|
|
|
|
|
|
|
if subscriptions.get(&subscription_name).is_some() {
|
|
if subscriptions.get(&subscription_name).is_some() {
|
|
|
return Err(Error::AlreadySubscribed);
|
|
return Err(Error::AlreadySubscribed);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- subscriptions.insert(
|
|
|
|
|
- subscription_name.clone(),
|
|
|
|
|
- (
|
|
|
|
|
- indexes.clone(),
|
|
|
|
|
- self.inner_pubsub.subscribe_with(
|
|
|
|
|
- request,
|
|
|
|
|
- self.pubsub_internal_sender.clone(),
|
|
|
|
|
- None,
|
|
|
|
|
- )?,
|
|
|
|
|
- ),
|
|
|
|
|
- );
|
|
|
|
|
- drop(subscriptions);
|
|
|
|
|
|
|
+ for index in indexes.iter() {
|
|
|
|
|
+ if let Some(subscription) = unique_subscriptions.get_mut(&index) {
|
|
|
|
|
+ subscription.0 = subscription.0 + 1;
|
|
|
|
|
+ } else {
|
|
|
|
|
+ unique_subscriptions.insert(
|
|
|
|
|
+ index.clone(),
|
|
|
|
|
+ (
|
|
|
|
|
+ 0,
|
|
|
|
|
+ self.inner_pubsub.subscribe_with(
|
|
|
|
|
+ InternalConversion::<T> {
|
|
|
|
|
+ name: self.transport.new_name(),
|
|
|
|
|
+ index: index.clone(),
|
|
|
|
|
+ },
|
|
|
|
|
+ self.pubsub_internal_sender.clone(),
|
|
|
|
|
+ None,
|
|
|
|
|
+ )?,
|
|
|
|
|
+ ),
|
|
|
|
|
+ );
|
|
|
|
|
|
|
|
- let _ = transport_loop.try_send(MessageToTransportLoop::Subscribe((
|
|
|
|
|
- subscription_name,
|
|
|
|
|
- indexes,
|
|
|
|
|
- )));
|
|
|
|
|
|
|
+ // new subscription is created, so the connection worker should be notified
|
|
|
|
|
+ let _ = self
|
|
|
|
|
+ .send_to_transport_loop
|
|
|
|
|
+ .read()
|
|
|
|
|
+ .map_err(|_| Error::Poison)?
|
|
|
|
|
+ .try_send(MessageToTransportLoop::Subscribe((
|
|
|
|
|
+ subscription_name.clone(),
|
|
|
|
|
+ indexes.clone(),
|
|
|
|
|
+ )));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- Ok(())
|
|
|
|
|
|
|
+ subscriptions.insert(subscription_name, indexes);
|
|
|
|
|
+ drop(subscriptions);
|
|
|
|
|
+
|
|
|
|
|
+ todo!()
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -141,7 +259,7 @@ where
|
|
|
T: Topic + 'static,
|
|
T: Topic + 'static,
|
|
|
{
|
|
{
|
|
|
/// Add a subscription
|
|
/// Add a subscription
|
|
|
- Subscribe((T::SubscriptionName, Vec<<T::Event as Indexable>::Index>)),
|
|
|
|
|
|
|
+ Subscribe((T::SubscriptionName, Vec<<T::Event as Event>::Topic>)),
|
|
|
/// Desuscribe
|
|
/// Desuscribe
|
|
|
Desuscribe(T::SubscriptionName),
|
|
Desuscribe(T::SubscriptionName),
|
|
|
/// Exit the loop
|
|
/// Exit the loop
|
|
@@ -154,19 +272,28 @@ pub trait Transport: Send + Sync {
|
|
|
/// Topic
|
|
/// Topic
|
|
|
type Topic: Topic + Clone + Sync + Send;
|
|
type Topic: Topic + Clone + Sync + Send;
|
|
|
|
|
|
|
|
- /// Creates a new pubsub topic producer
|
|
|
|
|
- async fn new_pubsub() -> Pubsub<Self::Topic>;
|
|
|
|
|
|
|
+ /// Create a new subscription name
|
|
|
|
|
+ fn new_name(&self) -> <Self::Topic as Topic>::SubscriptionName;
|
|
|
|
|
|
|
|
/// Open a long connection
|
|
/// Open a long connection
|
|
|
async fn long_connection(
|
|
async fn long_connection(
|
|
|
&self,
|
|
&self,
|
|
|
subscribe_changes: mpsc::Receiver<MessageToTransportLoop<Self::Topic>>,
|
|
subscribe_changes: mpsc::Receiver<MessageToTransportLoop<Self::Topic>>,
|
|
|
- ) where
|
|
|
|
|
|
|
+ topics: Vec<<<Self::Topic as Topic>::Event as Event>::Topic>,
|
|
|
|
|
+ ) -> Result<(), Error>
|
|
|
|
|
+ where
|
|
|
Self: Sized;
|
|
Self: Sized;
|
|
|
|
|
|
|
|
/// Poll on demand
|
|
/// Poll on demand
|
|
|
async fn poll(
|
|
async fn poll(
|
|
|
&self,
|
|
&self,
|
|
|
- index: Vec<<<Self::Topic as Topic>::Event as Indexable>::Index>,
|
|
|
|
|
- ) -> Result<Vec<Self::Topic>, Error>;
|
|
|
|
|
|
|
+ topics: Vec<(
|
|
|
|
|
+ <Self::Topic as Topic>::SubscriptionName,
|
|
|
|
|
+ <<Self::Topic as Topic>::Event as Event>::Topic,
|
|
|
|
|
+ )>,
|
|
|
|
|
+ reply_to: mpsc::Sender<(
|
|
|
|
|
+ <Self::Topic as Topic>::SubscriptionName,
|
|
|
|
|
+ <Self::Topic as Topic>::Event,
|
|
|
|
|
+ )>,
|
|
|
|
|
+ ) -> Result<(), Error>;
|
|
|
}
|
|
}
|