| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879 |
- //! Pub-sub consumer
- //!
- //! Consumers are designed to connect to a producer, through a transport, and subscribe to events.
- use std::collections::{HashMap, VecDeque};
- use std::sync::atomic::AtomicBool;
- use std::sync::Arc;
- use std::time::Duration;
- use parking_lot::RwLock;
- use tokio::sync::mpsc;
- use tokio::time::{sleep, Instant};
- use super::subscriber::{ActiveSubscription, SubscriptionRequest};
- use super::{Error, Event, Pubsub, Spec};
- use crate::task::spawn;
- const STREAM_CONNECTION_BACKOFF: Duration = Duration::from_millis(2_000);
- const STREAM_CONNECTION_MAX_BACKOFF: Duration = Duration::from_millis(30_000);
- const INTERNAL_POLL_SIZE: usize = 1_000;
- const POLL_SLEEP: Duration = Duration::from_millis(2_000);
- struct UniqueSubscription<S>
- where
- S: Spec,
- {
- name: S::SubscriptionId,
- total_subscribers: usize,
- }
- type UniqueSubscriptions<S> = RwLock<HashMap<<S as Spec>::Topic, UniqueSubscription<S>>>;
- type ActiveSubscriptions<S> =
- RwLock<HashMap<Arc<<S as Spec>::SubscriptionId>, Vec<<S as Spec>::Topic>>>;
- type CacheEvent<S> = HashMap<<<S as Spec>::Event as Event>::Topic, <S as Spec>::Event>;
- /// Subscription consumer
- pub struct Consumer<T>
- where
- T: Transport + 'static,
- {
- transport: T,
- inner_pubsub: Arc<Pubsub<T::Spec>>,
- remote_subscriptions: UniqueSubscriptions<T::Spec>,
- subscriptions: ActiveSubscriptions<T::Spec>,
- stream_ctrl: RwLock<Option<mpsc::Sender<StreamCtrl<T::Spec>>>>,
- still_running: AtomicBool,
- prefer_polling: bool,
- /// Cached events
- ///
- /// The cached events are useful to share events. The cache is automatically evicted it is
- /// disconnected from the remote source, meaning the cache is only active while there is an
- /// active subscription to the remote source, and it remembers the latest event.
- cached_events: Arc<RwLock<CacheEvent<T::Spec>>>,
- }
- /// Remote consumer
- pub struct RemoteActiveConsumer<T>
- where
- T: Transport + 'static,
- {
- inner: ActiveSubscription<T::Spec>,
- previous_messages: VecDeque<<T::Spec as Spec>::Event>,
- consumer: Arc<Consumer<T>>,
- }
- impl<T> RemoteActiveConsumer<T>
- where
- T: Transport + 'static,
- {
- /// Receives the next event
- pub async fn recv(&mut self) -> Option<<T::Spec as Spec>::Event> {
- if let Some(event) = self.previous_messages.pop_front() {
- Some(event)
- } else {
- self.inner.recv().await
- }
- }
- /// Try receive an event or return Noen right away
- pub fn try_recv(&mut self) -> Option<<T::Spec as Spec>::Event> {
- if let Some(event) = self.previous_messages.pop_front() {
- Some(event)
- } else {
- self.inner.try_recv()
- }
- }
- /// Get the subscription name
- pub fn name(&self) -> &<T::Spec as Spec>::SubscriptionId {
- self.inner.name()
- }
- }
- impl<T> Drop for RemoteActiveConsumer<T>
- where
- T: Transport + 'static,
- {
- fn drop(&mut self) {
- let _ = self.consumer.unsubscribe(self.name().clone());
- }
- }
- /// Struct to relay events from Poll and Streams from the external subscription to the local
- /// subscribers
- pub struct InternalRelay<S>
- where
- S: Spec + 'static,
- {
- inner: Arc<Pubsub<S>>,
- cached_events: Arc<RwLock<CacheEvent<S>>>,
- }
- impl<S> InternalRelay<S>
- where
- S: Spec + 'static,
- {
- /// Relay a remote event locally
- pub fn send<X>(&self, event: X)
- where
- X: Into<S::Event>,
- {
- let event = event.into();
- let mut cached_events = self.cached_events.write();
- for topic in event.get_topics() {
- cached_events.insert(topic, event.clone());
- }
- self.inner.publish(event);
- }
- }
- impl<T> Consumer<T>
- where
- T: Transport + 'static,
- {
- /// Creates a new instance
- pub fn new(
- transport: T,
- prefer_polling: bool,
- context: <T::Spec as Spec>::Context,
- ) -> Arc<Self> {
- let this = Arc::new(Self {
- transport,
- prefer_polling,
- inner_pubsub: Arc::new(Pubsub::new(T::Spec::new_instance(context))),
- subscriptions: Default::default(),
- remote_subscriptions: Default::default(),
- stream_ctrl: RwLock::new(None),
- cached_events: Default::default(),
- still_running: true.into(),
- });
- spawn(Self::stream(this.clone()));
- this
- }
- async fn stream(instance: Arc<Self>) {
- let mut stream_supported = true;
- let mut poll_supported = true;
- let mut backoff = STREAM_CONNECTION_BACKOFF;
- let mut retry_at = None;
- loop {
- if (!stream_supported && !poll_supported)
- || !instance
- .still_running
- .load(std::sync::atomic::Ordering::Relaxed)
- {
- break;
- }
- if instance.remote_subscriptions.read().is_empty() {
- sleep(Duration::from_millis(100)).await;
- continue;
- }
- if stream_supported
- && !instance.prefer_polling
- && retry_at
- .map(|retry_at| retry_at < Instant::now())
- .unwrap_or(true)
- {
- let (sender, receiver) = mpsc::channel(INTERNAL_POLL_SIZE);
- {
- *instance.stream_ctrl.write() = Some(sender);
- }
- let current_subscriptions = {
- instance
- .remote_subscriptions
- .read()
- .iter()
- .map(|(key, name)| (name.name.clone(), key.clone()))
- .collect::<Vec<_>>()
- };
- if let Err(err) = instance
- .transport
- .stream(
- receiver,
- current_subscriptions,
- InternalRelay {
- inner: instance.inner_pubsub.clone(),
- cached_events: instance.cached_events.clone(),
- },
- )
- .await
- {
- retry_at = Some(Instant::now() + backoff);
- backoff =
- (backoff + STREAM_CONNECTION_BACKOFF).min(STREAM_CONNECTION_MAX_BACKOFF);
- if matches!(err, Error::NotSupported) {
- stream_supported = false;
- }
- tracing::error!("Long connection failed with error {:?}", err);
- } else {
- backoff = STREAM_CONNECTION_BACKOFF;
- }
- // remove sender to stream, as there is no stream
- let _ = instance.stream_ctrl.write().take();
- }
- if poll_supported {
- let current_subscriptions = {
- instance
- .remote_subscriptions
- .read()
- .iter()
- .map(|(key, name)| (name.name.clone(), key.clone()))
- .collect::<Vec<_>>()
- };
- if let Err(err) = instance
- .transport
- .poll(
- current_subscriptions,
- InternalRelay {
- inner: instance.inner_pubsub.clone(),
- cached_events: instance.cached_events.clone(),
- },
- )
- .await
- {
- if matches!(err, Error::NotSupported) {
- poll_supported = false;
- }
- tracing::error!("Polling failed with error {:?}", err);
- }
- sleep(POLL_SLEEP).await;
- }
- }
- }
- /// Unsubscribe from a topic, this is called automatically when RemoteActiveSubscription<T> goes
- /// out of scope
- fn unsubscribe(
- self: &Arc<Self>,
- subscription_name: <T::Spec as Spec>::SubscriptionId,
- ) -> Result<(), Error> {
- let topics = self
- .subscriptions
- .write()
- .remove(&subscription_name)
- .ok_or(Error::NoSubscription)?;
- let mut remote_subscriptions = self.remote_subscriptions.write();
- for topic in topics {
- let mut remote_subscription =
- if let Some(remote_subscription) = remote_subscriptions.remove(&topic) {
- remote_subscription
- } else {
- continue;
- };
- remote_subscription.total_subscribers = remote_subscription
- .total_subscribers
- .checked_sub(1)
- .unwrap_or_default();
- if remote_subscription.total_subscribers == 0 {
- let mut cached_events = self.cached_events.write();
- cached_events.remove(&topic);
- self.message_to_stream(StreamCtrl::Unsubscribe(remote_subscription.name.clone()))?;
- } else {
- remote_subscriptions.insert(topic, remote_subscription);
- }
- }
- if remote_subscriptions.is_empty() {
- self.message_to_stream(StreamCtrl::Stop)?;
- }
- Ok(())
- }
- #[inline(always)]
- fn message_to_stream(&self, message: StreamCtrl<T::Spec>) -> Result<(), Error> {
- let to_stream = self.stream_ctrl.read();
- if let Some(to_stream) = to_stream.as_ref() {
- Ok(to_stream.try_send(message)?)
- } else {
- Ok(())
- }
- }
- /// Creates a subscription
- ///
- /// The subscriptions have two parts:
- ///
- /// 1. Will create the subscription to the remote Pubsub service, Any events will be moved to
- /// the internal pubsub
- ///
- /// 2. The internal subscription to the inner Pubsub. Because all subscriptions are going the
- /// transport, once events matches subscriptions, the inner_pubsub will receive the message and
- /// broadcasat the event.
- pub fn subscribe<I>(self: &Arc<Self>, request: I) -> Result<RemoteActiveConsumer<T>, Error>
- where
- I: SubscriptionRequest<
- Topic = <T::Spec as Spec>::Topic,
- SubscriptionId = <T::Spec as Spec>::SubscriptionId,
- >,
- {
- let subscription_name = request.subscription_name();
- let topics = request.try_get_topics()?;
- let mut remote_subscriptions = self.remote_subscriptions.write();
- let mut subscriptions = self.subscriptions.write();
- if subscriptions.get(&subscription_name).is_some() {
- return Err(Error::NoSubscription);
- }
- let mut previous_messages = Vec::new();
- let cached_events = self.cached_events.read();
- for topic in topics.iter() {
- if let Some(subscription) = remote_subscriptions.get_mut(topic) {
- subscription.total_subscribers += 1;
- if let Some(v) = cached_events.get(topic).cloned() {
- previous_messages.push(v);
- }
- } else {
- let internal_sub_name = self.transport.new_name();
- remote_subscriptions.insert(
- topic.clone(),
- UniqueSubscription {
- total_subscribers: 1,
- name: internal_sub_name.clone(),
- },
- );
- // new subscription is created, so the connection worker should be notified
- self.message_to_stream(StreamCtrl::Subscribe((internal_sub_name, topic.clone())))?;
- }
- }
- subscriptions.insert(subscription_name, topics);
- drop(subscriptions);
- Ok(RemoteActiveConsumer {
- inner: self.inner_pubsub.subscribe(request)?,
- previous_messages: previous_messages.into(),
- consumer: self.clone(),
- })
- }
- }
- impl<T> Drop for Consumer<T>
- where
- T: Transport + 'static,
- {
- fn drop(&mut self) {
- self.still_running
- .store(false, std::sync::atomic::Ordering::Release);
- if let Some(to_stream) = self.stream_ctrl.read().as_ref() {
- let _ = to_stream.try_send(StreamCtrl::Stop).inspect_err(|err| {
- tracing::error!("Failed to send message LongPoll::Stop due to {err:?}")
- });
- }
- }
- }
- /// Subscribe Message
- pub type SubscribeMessage<S> = (<S as Spec>::SubscriptionId, <S as Spec>::Topic);
- /// Messages sent from the [`Consumer`] to the [`Transport`] background loop.
- pub enum StreamCtrl<S>
- where
- S: Spec + 'static,
- {
- /// Add a subscription
- Subscribe(SubscribeMessage<S>),
- /// Desuscribe
- Unsubscribe(S::SubscriptionId),
- /// Exit the loop
- Stop,
- }
- impl<S> Clone for StreamCtrl<S>
- where
- S: Spec + 'static,
- {
- fn clone(&self) -> Self {
- match self {
- Self::Subscribe(s) => Self::Subscribe(s.clone()),
- Self::Unsubscribe(u) => Self::Unsubscribe(u.clone()),
- Self::Stop => Self::Stop,
- }
- }
- }
- /// Transport abstracts how the consumer talks to the remote pubsub.
- ///
- /// Implement this on your HTTP/WebSocket client. The transport is responsible for:
- /// - creating unique subscription names,
- /// - keeping a long connection via `stream` **or** performing on-demand `poll`,
- /// - forwarding remote events to `InternalRelay`.
- ///
- /// ```ignore
- /// struct WsTransport { /* ... */ }
- /// #[async_trait::async_trait]
- /// impl Transport for WsTransport {
- /// type Topic = MyTopic;
- /// fn new_name(&self) -> <Self::Topic as Topic>::SubscriptionName { 0 }
- /// async fn stream(/* ... */) -> Result<(), Error> { Ok(()) }
- /// async fn poll(/* ... */) -> Result<(), Error> { Ok(()) }
- /// }
- /// ```
- #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
- #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
- pub trait Transport: Send + Sync {
- /// Spec
- type Spec: Spec;
- /// Create a new subscription name
- fn new_name(&self) -> <Self::Spec as Spec>::SubscriptionId;
- /// Opens a persistent connection and continuously streams events.
- /// For protocols that support server push (e.g. WebSocket, SSE).
- async fn stream(
- &self,
- subscribe_changes: mpsc::Receiver<StreamCtrl<Self::Spec>>,
- topics: Vec<SubscribeMessage<Self::Spec>>,
- reply_to: InternalRelay<Self::Spec>,
- ) -> Result<(), Error>;
- /// Performs a one-shot fetch of any currently available events.
- /// Called repeatedly by the consumer when streaming is not available.
- async fn poll(
- &self,
- topics: Vec<SubscribeMessage<Self::Spec>>,
- reply_to: InternalRelay<Self::Spec>,
- ) -> Result<(), Error>;
- }
- #[cfg(test)]
- mod tests {
- use std::sync::atomic::{AtomicUsize, Ordering};
- use std::sync::Arc;
- use tokio::sync::{mpsc, Mutex};
- use tokio::time::{timeout, Duration};
- use super::{
- InternalRelay, RemoteActiveConsumer, StreamCtrl, SubscribeMessage, Transport,
- INTERNAL_POLL_SIZE,
- };
- use crate::pub_sub::remote_consumer::Consumer;
- use crate::pub_sub::test::{CustomPubSub, IndexTest, Message};
- use crate::pub_sub::{Error, Spec, SubscriptionRequest};
- // ===== Test Event/Topic types =====
- #[derive(Clone, Debug)]
- enum SubscriptionReq {
- Foo(String, u64),
- Bar(String, u64),
- }
- impl SubscriptionRequest for SubscriptionReq {
- type Topic = IndexTest;
- type SubscriptionId = String;
- fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error> {
- Ok(vec![match self {
- SubscriptionReq::Foo(_, n) => IndexTest::Foo(*n),
- SubscriptionReq::Bar(_, n) => IndexTest::Bar(*n),
- }])
- }
- fn subscription_name(&self) -> Arc<Self::SubscriptionId> {
- Arc::new(match self {
- SubscriptionReq::Foo(n, _) => n.to_string(),
- SubscriptionReq::Bar(n, _) => n.to_string(),
- })
- }
- }
- // ===== A controllable in-memory Transport used by tests =====
- /// TestTransport relays messages from a broadcast channel to the Consumer via `InternalRelay`.
- /// It also forwards Subscribe/Unsubscribe/Stop signals to an observer channel so tests can assert them.
- struct TestTransport {
- name_ctr: AtomicUsize,
- // We forward all transport-loop control messages here so tests can observe them.
- observe_ctrl_tx: mpsc::Sender<StreamCtrl<CustomPubSub>>,
- // Whether stream / poll are supported.
- support_long: bool,
- support_poll: bool,
- rx: Mutex<mpsc::Receiver<Message>>,
- }
- impl TestTransport {
- fn new(
- support_long: bool,
- support_poll: bool,
- ) -> (
- Self,
- mpsc::Sender<Message>,
- mpsc::Receiver<StreamCtrl<CustomPubSub>>,
- ) {
- let (events_tx, rx) = mpsc::channel::<Message>(INTERNAL_POLL_SIZE);
- let (observe_ctrl_tx, observe_ctrl_rx) =
- mpsc::channel::<StreamCtrl<_>>(INTERNAL_POLL_SIZE);
- let t = TestTransport {
- name_ctr: AtomicUsize::new(1),
- rx: Mutex::new(rx),
- observe_ctrl_tx,
- support_long,
- support_poll,
- };
- (t, events_tx, observe_ctrl_rx)
- }
- }
- #[async_trait::async_trait]
- impl Transport for TestTransport {
- type Spec = CustomPubSub;
- fn new_name(&self) -> <Self::Spec as Spec>::SubscriptionId {
- format!("sub-{}", self.name_ctr.fetch_add(1, Ordering::Relaxed))
- }
- async fn stream(
- &self,
- mut subscribe_changes: mpsc::Receiver<StreamCtrl<Self::Spec>>,
- topics: Vec<SubscribeMessage<Self::Spec>>,
- reply_to: InternalRelay<Self::Spec>,
- ) -> Result<(), Error> {
- if !self.support_long {
- return Err(Error::NotSupported);
- }
- // Each invocation creates a fresh broadcast receiver
- let mut rx = self.rx.lock().await;
- let observe = self.observe_ctrl_tx.clone();
- for topic in topics {
- observe.try_send(StreamCtrl::Subscribe(topic)).unwrap();
- }
- loop {
- tokio::select! {
- // Forward any control (Subscribe/Unsubscribe/Stop) messages so the test can assert them.
- Some(ctrl) = subscribe_changes.recv() => {
- observe.try_send(ctrl.clone()).unwrap();
- if matches!(ctrl, StreamCtrl::Stop) {
- break;
- }
- }
- // Relay external events into the inner pubsub
- Some(msg) = rx.recv() => {
- reply_to.send(msg);
- }
- }
- }
- Ok(())
- }
- async fn poll(
- &self,
- _topics: Vec<SubscribeMessage<Self::Spec>>,
- reply_to: InternalRelay<Self::Spec>,
- ) -> Result<(), Error> {
- if !self.support_poll {
- return Err(Error::NotSupported);
- }
- // On each poll call, drain anything currently pending and return.
- // (The Consumer calls this repeatedly; first call happens immediately.)
- let mut rx = self.rx.lock().await;
- // Non-blocking drain pass: try a few times without sleeping to keep tests snappy
- for _ in 0..32 {
- match rx.try_recv() {
- Ok(msg) => reply_to.send(msg),
- Err(mpsc::error::TryRecvError::Empty) => continue,
- Err(mpsc::error::TryRecvError::Disconnected) => break,
- }
- }
- Ok(())
- }
- }
- // ===== Helpers =====
- async fn recv_next<T: Transport>(
- sub: &mut RemoteActiveConsumer<T>,
- dur_ms: u64,
- ) -> Option<<T::Spec as Spec>::Event> {
- timeout(Duration::from_millis(dur_ms), sub.recv())
- .await
- .ok()
- .flatten()
- }
- async fn expect_ctrl(
- rx: &mut mpsc::Receiver<StreamCtrl<CustomPubSub>>,
- dur_ms: u64,
- pred: impl Fn(&StreamCtrl<CustomPubSub>) -> bool,
- ) -> StreamCtrl<CustomPubSub> {
- timeout(Duration::from_millis(dur_ms), async {
- loop {
- if let Some(msg) = rx.recv().await {
- if pred(&msg) {
- break msg;
- }
- }
- }
- })
- .await
- .expect("timed out waiting for control message")
- }
- // ===== Tests =====
- #[tokio::test]
- async fn stream_delivery_and_unsubscribe_on_drop() {
- // stream supported, poll supported (doesn't matter; prefer long)
- let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
- // prefer_polling = false so connection loop will try stream first.
- let consumer = Consumer::new(transport, false, ());
- // Subscribe to Foo(7)
- let mut sub = consumer
- .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
- .expect("subscribe ok");
- // We should see a Subscribe(name, topic) forwarded to transport
- let ctrl = expect_ctrl(
- &mut ctrl_rx,
- 1000,
- |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(7)),
- )
- .await;
- match ctrl {
- StreamCtrl::Subscribe((name, idx)) => {
- assert_ne!(name, "t".to_owned());
- assert_eq!(idx, IndexTest::Foo(7));
- }
- _ => unreachable!(),
- }
- // Send an event that matches Foo(7)
- events_tx.send(Message { foo: 7, bar: 1 }).await.unwrap();
- let got = recv_next::<TestTransport>(&mut sub, 1000)
- .await
- .expect("got event");
- assert_eq!(got, Message { foo: 7, bar: 1 });
- // Dropping the RemoteActiveConsumer should trigger an Unsubscribe(name)
- drop(sub);
- let _ctrl = expect_ctrl(&mut ctrl_rx, 1000, |m| {
- matches!(m, StreamCtrl::Unsubscribe(_))
- })
- .await;
- // Drop the Consumer -> Stop is sent so the transport loop exits cleanly
- drop(consumer);
- let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| matches!(m, StreamCtrl::Stop)).await;
- }
- #[tokio::test]
- async fn test_cache_and_invalation() {
- // stream supported, poll supported (doesn't matter; prefer long)
- let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
- // prefer_polling = false so connection loop will try stream first.
- let consumer = Consumer::new(transport, false, ());
- // Subscribe to Foo(7)
- let mut sub_1 = consumer
- .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
- .expect("subscribe ok");
- // We should see a Subscribe(name, topic) forwarded to transport
- let ctrl = expect_ctrl(
- &mut ctrl_rx,
- 1000,
- |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(7)),
- )
- .await;
- match ctrl {
- StreamCtrl::Subscribe((name, idx)) => {
- assert_ne!(name, "t1".to_owned());
- assert_eq!(idx, IndexTest::Foo(7));
- }
- _ => unreachable!(),
- }
- // Send an event that matches Foo(7)
- events_tx.send(Message { foo: 7, bar: 1 }).await.unwrap();
- let got = recv_next::<TestTransport>(&mut sub_1, 1000)
- .await
- .expect("got event");
- assert_eq!(got, Message { foo: 7, bar: 1 });
- // Subscribe to Foo(7), should receive the latest message and future messages
- let mut sub_2 = consumer
- .subscribe(SubscriptionReq::Foo("t2".to_owned(), 7))
- .expect("subscribe ok");
- let got = recv_next::<TestTransport>(&mut sub_2, 1000)
- .await
- .expect("got event");
- assert_eq!(got, Message { foo: 7, bar: 1 });
- // Dropping the RemoteActiveConsumer but not unsubscribe, since sub_2 is still active
- drop(sub_1);
- // Subscribe to Foo(7), should receive the latest message and future messages
- let mut sub_3 = consumer
- .subscribe(SubscriptionReq::Foo("t3".to_owned(), 7))
- .expect("subscribe ok");
- // receive cache message
- let got = recv_next::<TestTransport>(&mut sub_3, 1000)
- .await
- .expect("got event");
- assert_eq!(got, Message { foo: 7, bar: 1 });
- // Send an event that matches Foo(7)
- events_tx.send(Message { foo: 7, bar: 2 }).await.unwrap();
- // receive new message
- let got = recv_next::<TestTransport>(&mut sub_2, 1000)
- .await
- .expect("got event");
- assert_eq!(got, Message { foo: 7, bar: 2 });
- let got = recv_next::<TestTransport>(&mut sub_3, 1000)
- .await
- .expect("got event");
- assert_eq!(got, Message { foo: 7, bar: 2 });
- drop(sub_2);
- drop(sub_3);
- let _ctrl = expect_ctrl(&mut ctrl_rx, 1000, |m| {
- matches!(m, StreamCtrl::Unsubscribe(_))
- })
- .await;
- // The cache should be dropped, so no new messages
- let mut sub_4 = consumer
- .subscribe(SubscriptionReq::Foo("t4".to_owned(), 7))
- .expect("subscribe ok");
- assert!(
- recv_next::<TestTransport>(&mut sub_4, 1000).await.is_none(),
- "Should have not receive any update"
- );
- drop(sub_4);
- // Drop the Consumer -> Stop is sent so the transport loop exits cleanly
- let _ = expect_ctrl(&mut ctrl_rx, 2000, |m| matches!(m, StreamCtrl::Stop)).await;
- }
- #[tokio::test]
- async fn falls_back_to_poll_when_stream_not_supported() {
- // stream NOT supported, poll supported
- let (transport, events_tx, _) = TestTransport::new(false, true);
- // prefer_polling = true nudges the connection loop to poll first, but even if it
- // tried stream, our transport returns NotSupported and the loop will use poll.
- let consumer = Consumer::new(transport, true, ());
- // Subscribe to Bar(5)
- let mut sub = consumer
- .subscribe(SubscriptionReq::Bar("t".to_owned(), 5))
- .expect("subscribe ok");
- // Inject an event; the poll path should relay it on the first poll iteration
- events_tx.send(Message { foo: 9, bar: 5 }).await.unwrap();
- let got = recv_next::<TestTransport>(&mut sub, 1500)
- .await
- .expect("event relayed via polling");
- assert_eq!(got, Message { foo: 9, bar: 5 });
- }
- #[tokio::test]
- async fn multiple_subscribers_share_single_remote_subscription() {
- // This validates the "coalescing" behavior in Consumer::subscribe where multiple local
- // subscribers to the same Topic should only create one remote subscription.
- let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
- let consumer = Consumer::new(transport, false, ());
- // Two local subscriptions to the SAME topic/name pair (different names)
- let mut a = consumer
- .subscribe(SubscriptionReq::Foo("t".to_owned(), 1))
- .expect("subscribe A");
- let _ = expect_ctrl(
- &mut ctrl_rx,
- 1000,
- |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(1)),
- )
- .await;
- let mut b = consumer
- .subscribe(SubscriptionReq::Foo("b".to_owned(), 1))
- .expect("subscribe B");
- // No second Subscribe should be forwarded for the same topic (coalesced).
- // Give a little time; if one appears, we'll fail explicitly.
- if let Ok(Some(StreamCtrl::Subscribe((_, idx)))) =
- timeout(Duration::from_millis(400), ctrl_rx.recv()).await
- {
- assert_ne!(idx, IndexTest::Foo(1), "should not resubscribe same topic");
- }
- // Send one event and ensure BOTH local subscribers receive it.
- events_tx.send(Message { foo: 1, bar: 42 }).await.unwrap();
- let got_a = recv_next::<TestTransport>(&mut a, 1000)
- .await
- .expect("A got");
- let got_b = recv_next::<TestTransport>(&mut b, 1000)
- .await
- .expect("B got");
- assert_eq!(got_a, Message { foo: 1, bar: 42 });
- assert_eq!(got_b, Message { foo: 1, bar: 42 });
- // Drop B: no Unsubscribe should be sent yet (still one local subscriber).
- drop(b);
- if let Ok(Some(StreamCtrl::Unsubscribe(_))) =
- timeout(Duration::from_millis(400), ctrl_rx.recv()).await
- {
- panic!("Should NOT unsubscribe while another local subscriber exists");
- }
- // Drop A: now remote unsubscribe should occur.
- drop(a);
- let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| {
- matches!(m, StreamCtrl::Unsubscribe(_))
- })
- .await;
- let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| matches!(m, StreamCtrl::Stop)).await;
- }
- }
|