remote_consumer.rs 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879
  1. //! Pub-sub consumer
  2. //!
  3. //! Consumers are designed to connect to a producer, through a transport, and subscribe to events.
  4. use std::collections::{HashMap, VecDeque};
  5. use std::sync::atomic::AtomicBool;
  6. use std::sync::Arc;
  7. use std::time::Duration;
  8. use parking_lot::RwLock;
  9. use tokio::sync::mpsc;
  10. use tokio::time::{sleep, Instant};
  11. use super::subscriber::{ActiveSubscription, SubscriptionRequest};
  12. use super::{Error, Event, Pubsub, Spec};
  13. use crate::task::spawn;
  /// Delay before a failed stream connection is retried for the first time; the delay also grows
  /// by this amount after every consecutive failure.
  const STREAM_CONNECTION_BACKOFF: Duration = Duration::from_secs(2);

  /// Upper bound for the stream reconnection delay.
  const STREAM_CONNECTION_MAX_BACKOFF: Duration = Duration::from_secs(30);

  /// Capacity of the control channel handed to the transport when a stream is opened.
  const INTERNAL_POLL_SIZE: usize = 1000;

  /// Pause inserted after every polling round.
  const POLL_SLEEP: Duration = Duration::from_secs(2);
  18. struct UniqueSubscription<S>
  19. where
  20. S: Spec,
  21. {
  22. name: S::SubscriptionId,
  23. total_subscribers: usize,
  24. }
  25. type UniqueSubscriptions<S> = RwLock<HashMap<<S as Spec>::Topic, UniqueSubscription<S>>>;
  26. type ActiveSubscriptions<S> =
  27. RwLock<HashMap<Arc<<S as Spec>::SubscriptionId>, Vec<<S as Spec>::Topic>>>;
  28. type CacheEvent<S> = HashMap<<<S as Spec>::Event as Event>::Topic, <S as Spec>::Event>;
  29. /// Subscription consumer
  30. pub struct Consumer<T>
  31. where
  32. T: Transport + 'static,
  33. {
  34. transport: T,
  35. inner_pubsub: Arc<Pubsub<T::Spec>>,
  36. remote_subscriptions: UniqueSubscriptions<T::Spec>,
  37. subscriptions: ActiveSubscriptions<T::Spec>,
  38. stream_ctrl: RwLock<Option<mpsc::Sender<StreamCtrl<T::Spec>>>>,
  39. still_running: AtomicBool,
  40. prefer_polling: bool,
  41. /// Cached events
  42. ///
  43. /// The cached events are useful to share events. The cache is automatically evicted it is
  44. /// disconnected from the remote source, meaning the cache is only active while there is an
  45. /// active subscription to the remote source, and it remembers the latest event.
  46. cached_events: Arc<RwLock<CacheEvent<T::Spec>>>,
  47. }
  48. /// Remote consumer
  49. pub struct RemoteActiveConsumer<T>
  50. where
  51. T: Transport + 'static,
  52. {
  53. inner: ActiveSubscription<T::Spec>,
  54. previous_messages: VecDeque<<T::Spec as Spec>::Event>,
  55. consumer: Arc<Consumer<T>>,
  56. }
  57. impl<T> RemoteActiveConsumer<T>
  58. where
  59. T: Transport + 'static,
  60. {
  61. /// Receives the next event
  62. pub async fn recv(&mut self) -> Option<<T::Spec as Spec>::Event> {
  63. if let Some(event) = self.previous_messages.pop_front() {
  64. Some(event)
  65. } else {
  66. self.inner.recv().await
  67. }
  68. }
  69. /// Try receive an event or return Noen right away
  70. pub fn try_recv(&mut self) -> Option<<T::Spec as Spec>::Event> {
  71. if let Some(event) = self.previous_messages.pop_front() {
  72. Some(event)
  73. } else {
  74. self.inner.try_recv()
  75. }
  76. }
  77. /// Get the subscription name
  78. pub fn name(&self) -> &<T::Spec as Spec>::SubscriptionId {
  79. self.inner.name()
  80. }
  81. }
  82. impl<T> Drop for RemoteActiveConsumer<T>
  83. where
  84. T: Transport + 'static,
  85. {
  86. fn drop(&mut self) {
  87. let _ = self.consumer.unsubscribe(self.name().clone());
  88. }
  89. }
  90. /// Struct to relay events from Poll and Streams from the external subscription to the local
  91. /// subscribers
  92. pub struct InternalRelay<S>
  93. where
  94. S: Spec + 'static,
  95. {
  96. inner: Arc<Pubsub<S>>,
  97. cached_events: Arc<RwLock<CacheEvent<S>>>,
  98. }
  99. impl<S> InternalRelay<S>
  100. where
  101. S: Spec + 'static,
  102. {
  103. /// Relay a remote event locally
  104. pub fn send<X>(&self, event: X)
  105. where
  106. X: Into<S::Event>,
  107. {
  108. let event = event.into();
  109. let mut cached_events = self.cached_events.write();
  110. for topic in event.get_topics() {
  111. cached_events.insert(topic, event.clone());
  112. }
  113. self.inner.publish(event);
  114. }
  115. }
  116. impl<T> Consumer<T>
  117. where
  118. T: Transport + 'static,
  119. {
  120. /// Creates a new instance
  121. pub fn new(
  122. transport: T,
  123. prefer_polling: bool,
  124. context: <T::Spec as Spec>::Context,
  125. ) -> Arc<Self> {
  126. let this = Arc::new(Self {
  127. transport,
  128. prefer_polling,
  129. inner_pubsub: Arc::new(Pubsub::new(T::Spec::new_instance(context))),
  130. subscriptions: Default::default(),
  131. remote_subscriptions: Default::default(),
  132. stream_ctrl: RwLock::new(None),
  133. cached_events: Default::default(),
  134. still_running: true.into(),
  135. });
  136. spawn(Self::stream(this.clone()));
  137. this
  138. }
  139. async fn stream(instance: Arc<Self>) {
  140. let mut stream_supported = true;
  141. let mut poll_supported = true;
  142. let mut backoff = STREAM_CONNECTION_BACKOFF;
  143. let mut retry_at = None;
  144. loop {
  145. if (!stream_supported && !poll_supported)
  146. || !instance
  147. .still_running
  148. .load(std::sync::atomic::Ordering::Relaxed)
  149. {
  150. break;
  151. }
  152. if instance.remote_subscriptions.read().is_empty() {
  153. sleep(Duration::from_millis(100)).await;
  154. continue;
  155. }
  156. if stream_supported
  157. && !instance.prefer_polling
  158. && retry_at
  159. .map(|retry_at| retry_at < Instant::now())
  160. .unwrap_or(true)
  161. {
  162. let (sender, receiver) = mpsc::channel(INTERNAL_POLL_SIZE);
  163. {
  164. *instance.stream_ctrl.write() = Some(sender);
  165. }
  166. let current_subscriptions = {
  167. instance
  168. .remote_subscriptions
  169. .read()
  170. .iter()
  171. .map(|(key, name)| (name.name.clone(), key.clone()))
  172. .collect::<Vec<_>>()
  173. };
  174. if let Err(err) = instance
  175. .transport
  176. .stream(
  177. receiver,
  178. current_subscriptions,
  179. InternalRelay {
  180. inner: instance.inner_pubsub.clone(),
  181. cached_events: instance.cached_events.clone(),
  182. },
  183. )
  184. .await
  185. {
  186. retry_at = Some(Instant::now() + backoff);
  187. backoff =
  188. (backoff + STREAM_CONNECTION_BACKOFF).min(STREAM_CONNECTION_MAX_BACKOFF);
  189. if matches!(err, Error::NotSupported) {
  190. stream_supported = false;
  191. }
  192. tracing::error!("Long connection failed with error {:?}", err);
  193. } else {
  194. backoff = STREAM_CONNECTION_BACKOFF;
  195. }
  196. // remove sender to stream, as there is no stream
  197. let _ = instance.stream_ctrl.write().take();
  198. }
  199. if poll_supported {
  200. let current_subscriptions = {
  201. instance
  202. .remote_subscriptions
  203. .read()
  204. .iter()
  205. .map(|(key, name)| (name.name.clone(), key.clone()))
  206. .collect::<Vec<_>>()
  207. };
  208. if let Err(err) = instance
  209. .transport
  210. .poll(
  211. current_subscriptions,
  212. InternalRelay {
  213. inner: instance.inner_pubsub.clone(),
  214. cached_events: instance.cached_events.clone(),
  215. },
  216. )
  217. .await
  218. {
  219. if matches!(err, Error::NotSupported) {
  220. poll_supported = false;
  221. }
  222. tracing::error!("Polling failed with error {:?}", err);
  223. }
  224. sleep(POLL_SLEEP).await;
  225. }
  226. }
  227. }
  228. /// Unsubscribe from a topic, this is called automatically when RemoteActiveSubscription<T> goes
  229. /// out of scope
  230. fn unsubscribe(
  231. self: &Arc<Self>,
  232. subscription_name: <T::Spec as Spec>::SubscriptionId,
  233. ) -> Result<(), Error> {
  234. let topics = self
  235. .subscriptions
  236. .write()
  237. .remove(&subscription_name)
  238. .ok_or(Error::NoSubscription)?;
  239. let mut remote_subscriptions = self.remote_subscriptions.write();
  240. for topic in topics {
  241. let mut remote_subscription =
  242. if let Some(remote_subscription) = remote_subscriptions.remove(&topic) {
  243. remote_subscription
  244. } else {
  245. continue;
  246. };
  247. remote_subscription.total_subscribers = remote_subscription
  248. .total_subscribers
  249. .checked_sub(1)
  250. .unwrap_or_default();
  251. if remote_subscription.total_subscribers == 0 {
  252. let mut cached_events = self.cached_events.write();
  253. cached_events.remove(&topic);
  254. self.message_to_stream(StreamCtrl::Unsubscribe(remote_subscription.name.clone()))?;
  255. } else {
  256. remote_subscriptions.insert(topic, remote_subscription);
  257. }
  258. }
  259. if remote_subscriptions.is_empty() {
  260. self.message_to_stream(StreamCtrl::Stop)?;
  261. }
  262. Ok(())
  263. }
  264. #[inline(always)]
  265. fn message_to_stream(&self, message: StreamCtrl<T::Spec>) -> Result<(), Error> {
  266. let to_stream = self.stream_ctrl.read();
  267. if let Some(to_stream) = to_stream.as_ref() {
  268. Ok(to_stream.try_send(message)?)
  269. } else {
  270. Ok(())
  271. }
  272. }
  273. /// Creates a subscription
  274. ///
  275. /// The subscriptions have two parts:
  276. ///
  277. /// 1. Will create the subscription to the remote Pubsub service, Any events will be moved to
  278. /// the internal pubsub
  279. ///
  280. /// 2. The internal subscription to the inner Pubsub. Because all subscriptions are going the
  281. /// transport, once events matches subscriptions, the inner_pubsub will receive the message and
  282. /// broadcasat the event.
  283. pub fn subscribe<I>(self: &Arc<Self>, request: I) -> Result<RemoteActiveConsumer<T>, Error>
  284. where
  285. I: SubscriptionRequest<
  286. Topic = <T::Spec as Spec>::Topic,
  287. SubscriptionId = <T::Spec as Spec>::SubscriptionId,
  288. >,
  289. {
  290. let subscription_name = request.subscription_name();
  291. let topics = request.try_get_topics()?;
  292. let mut remote_subscriptions = self.remote_subscriptions.write();
  293. let mut subscriptions = self.subscriptions.write();
  294. if subscriptions.get(&subscription_name).is_some() {
  295. return Err(Error::NoSubscription);
  296. }
  297. let mut previous_messages = Vec::new();
  298. let cached_events = self.cached_events.read();
  299. for topic in topics.iter() {
  300. if let Some(subscription) = remote_subscriptions.get_mut(topic) {
  301. subscription.total_subscribers += 1;
  302. if let Some(v) = cached_events.get(topic).cloned() {
  303. previous_messages.push(v);
  304. }
  305. } else {
  306. let internal_sub_name = self.transport.new_name();
  307. remote_subscriptions.insert(
  308. topic.clone(),
  309. UniqueSubscription {
  310. total_subscribers: 1,
  311. name: internal_sub_name.clone(),
  312. },
  313. );
  314. // new subscription is created, so the connection worker should be notified
  315. self.message_to_stream(StreamCtrl::Subscribe((internal_sub_name, topic.clone())))?;
  316. }
  317. }
  318. subscriptions.insert(subscription_name, topics);
  319. drop(subscriptions);
  320. Ok(RemoteActiveConsumer {
  321. inner: self.inner_pubsub.subscribe(request)?,
  322. previous_messages: previous_messages.into(),
  323. consumer: self.clone(),
  324. })
  325. }
  326. }
  327. impl<T> Drop for Consumer<T>
  328. where
  329. T: Transport + 'static,
  330. {
  331. fn drop(&mut self) {
  332. self.still_running
  333. .store(false, std::sync::atomic::Ordering::Release);
  334. if let Some(to_stream) = self.stream_ctrl.read().as_ref() {
  335. let _ = to_stream.try_send(StreamCtrl::Stop).inspect_err(|err| {
  336. tracing::error!("Failed to send message LongPoll::Stop due to {err:?}")
  337. });
  338. }
  339. }
  340. }
  341. /// Subscribe Message
  342. pub type SubscribeMessage<S> = (<S as Spec>::SubscriptionId, <S as Spec>::Topic);
  343. /// Messages sent from the [`Consumer`] to the [`Transport`] background loop.
  344. pub enum StreamCtrl<S>
  345. where
  346. S: Spec + 'static,
  347. {
  348. /// Add a subscription
  349. Subscribe(SubscribeMessage<S>),
  350. /// Desuscribe
  351. Unsubscribe(S::SubscriptionId),
  352. /// Exit the loop
  353. Stop,
  354. }
  355. impl<S> Clone for StreamCtrl<S>
  356. where
  357. S: Spec + 'static,
  358. {
  359. fn clone(&self) -> Self {
  360. match self {
  361. Self::Subscribe(s) => Self::Subscribe(s.clone()),
  362. Self::Unsubscribe(u) => Self::Unsubscribe(u.clone()),
  363. Self::Stop => Self::Stop,
  364. }
  365. }
  366. }
  367. /// Transport abstracts how the consumer talks to the remote pubsub.
  368. ///
  369. /// Implement this on your HTTP/WebSocket client. The transport is responsible for:
  370. /// - creating unique subscription names,
  371. /// - keeping a long connection via `stream` **or** performing on-demand `poll`,
  372. /// - forwarding remote events to `InternalRelay`.
  373. ///
  374. /// ```ignore
  375. /// struct WsTransport { /* ... */ }
  376. /// #[async_trait::async_trait]
  377. /// impl Transport for WsTransport {
  378. /// type Topic = MyTopic;
  379. /// fn new_name(&self) -> <Self::Topic as Topic>::SubscriptionName { 0 }
  380. /// async fn stream(/* ... */) -> Result<(), Error> { Ok(()) }
  381. /// async fn poll(/* ... */) -> Result<(), Error> { Ok(()) }
  382. /// }
  383. /// ```
  384. #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
  385. #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
  386. pub trait Transport: Send + Sync {
  387. /// Spec
  388. type Spec: Spec;
  389. /// Create a new subscription name
  390. fn new_name(&self) -> <Self::Spec as Spec>::SubscriptionId;
  391. /// Opens a persistent connection and continuously streams events.
  392. /// For protocols that support server push (e.g. WebSocket, SSE).
  393. async fn stream(
  394. &self,
  395. subscribe_changes: mpsc::Receiver<StreamCtrl<Self::Spec>>,
  396. topics: Vec<SubscribeMessage<Self::Spec>>,
  397. reply_to: InternalRelay<Self::Spec>,
  398. ) -> Result<(), Error>;
  399. /// Performs a one-shot fetch of any currently available events.
  400. /// Called repeatedly by the consumer when streaming is not available.
  401. async fn poll(
  402. &self,
  403. topics: Vec<SubscribeMessage<Self::Spec>>,
  404. reply_to: InternalRelay<Self::Spec>,
  405. ) -> Result<(), Error>;
  406. }
  407. #[cfg(test)]
  408. mod tests {
  409. use std::sync::atomic::{AtomicUsize, Ordering};
  410. use std::sync::Arc;
  411. use tokio::sync::{mpsc, Mutex};
  412. use tokio::time::{timeout, Duration};
  413. use super::{
  414. InternalRelay, RemoteActiveConsumer, StreamCtrl, SubscribeMessage, Transport,
  415. INTERNAL_POLL_SIZE,
  416. };
  417. use crate::pub_sub::remote_consumer::Consumer;
  418. use crate::pub_sub::test::{CustomPubSub, IndexTest, Message};
  419. use crate::pub_sub::{Error, Spec, SubscriptionRequest};
  420. // ===== Test Event/Topic types =====
  /// Subscription request used by the tests: a subscription name plus the index of the `Foo` or
  /// `Bar` topic to follow.
  #[derive(Clone, Debug)]
  enum SubscriptionReq {
      /// Named subscription to `IndexTest::Foo`.
      Foo(String, u64),
      /// Named subscription to `IndexTest::Bar`.
      Bar(String, u64),
  }
  426. impl SubscriptionRequest for SubscriptionReq {
  427. type Topic = IndexTest;
  428. type SubscriptionId = String;
  429. fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error> {
  430. Ok(vec![match self {
  431. SubscriptionReq::Foo(_, n) => IndexTest::Foo(*n),
  432. SubscriptionReq::Bar(_, n) => IndexTest::Bar(*n),
  433. }])
  434. }
  435. fn subscription_name(&self) -> Arc<Self::SubscriptionId> {
  436. Arc::new(match self {
  437. SubscriptionReq::Foo(n, _) => n.to_string(),
  438. SubscriptionReq::Bar(n, _) => n.to_string(),
  439. })
  440. }
  441. }
  442. // ===== A controllable in-memory Transport used by tests =====
  443. /// TestTransport relays messages from a broadcast channel to the Consumer via `InternalRelay`.
  444. /// It also forwards Subscribe/Unsubscribe/Stop signals to an observer channel so tests can assert them.
  445. struct TestTransport {
  446. name_ctr: AtomicUsize,
  447. // We forward all transport-loop control messages here so tests can observe them.
  448. observe_ctrl_tx: mpsc::Sender<StreamCtrl<CustomPubSub>>,
  449. // Whether stream / poll are supported.
  450. support_long: bool,
  451. support_poll: bool,
  452. rx: Mutex<mpsc::Receiver<Message>>,
  453. }
  454. impl TestTransport {
  455. fn new(
  456. support_long: bool,
  457. support_poll: bool,
  458. ) -> (
  459. Self,
  460. mpsc::Sender<Message>,
  461. mpsc::Receiver<StreamCtrl<CustomPubSub>>,
  462. ) {
  463. let (events_tx, rx) = mpsc::channel::<Message>(INTERNAL_POLL_SIZE);
  464. let (observe_ctrl_tx, observe_ctrl_rx) =
  465. mpsc::channel::<StreamCtrl<_>>(INTERNAL_POLL_SIZE);
  466. let t = TestTransport {
  467. name_ctr: AtomicUsize::new(1),
  468. rx: Mutex::new(rx),
  469. observe_ctrl_tx,
  470. support_long,
  471. support_poll,
  472. };
  473. (t, events_tx, observe_ctrl_rx)
  474. }
  475. }
  476. #[async_trait::async_trait]
  477. impl Transport for TestTransport {
  478. type Spec = CustomPubSub;
  479. fn new_name(&self) -> <Self::Spec as Spec>::SubscriptionId {
  480. format!("sub-{}", self.name_ctr.fetch_add(1, Ordering::Relaxed))
  481. }
  482. async fn stream(
  483. &self,
  484. mut subscribe_changes: mpsc::Receiver<StreamCtrl<Self::Spec>>,
  485. topics: Vec<SubscribeMessage<Self::Spec>>,
  486. reply_to: InternalRelay<Self::Spec>,
  487. ) -> Result<(), Error> {
  488. if !self.support_long {
  489. return Err(Error::NotSupported);
  490. }
  491. // Each invocation creates a fresh broadcast receiver
  492. let mut rx = self.rx.lock().await;
  493. let observe = self.observe_ctrl_tx.clone();
  494. for topic in topics {
  495. observe.try_send(StreamCtrl::Subscribe(topic)).unwrap();
  496. }
  497. loop {
  498. tokio::select! {
  499. // Forward any control (Subscribe/Unsubscribe/Stop) messages so the test can assert them.
  500. Some(ctrl) = subscribe_changes.recv() => {
  501. observe.try_send(ctrl.clone()).unwrap();
  502. if matches!(ctrl, StreamCtrl::Stop) {
  503. break;
  504. }
  505. }
  506. // Relay external events into the inner pubsub
  507. Some(msg) = rx.recv() => {
  508. reply_to.send(msg);
  509. }
  510. }
  511. }
  512. Ok(())
  513. }
  514. async fn poll(
  515. &self,
  516. _topics: Vec<SubscribeMessage<Self::Spec>>,
  517. reply_to: InternalRelay<Self::Spec>,
  518. ) -> Result<(), Error> {
  519. if !self.support_poll {
  520. return Err(Error::NotSupported);
  521. }
  522. // On each poll call, drain anything currently pending and return.
  523. // (The Consumer calls this repeatedly; first call happens immediately.)
  524. let mut rx = self.rx.lock().await;
  525. // Non-blocking drain pass: try a few times without sleeping to keep tests snappy
  526. for _ in 0..32 {
  527. match rx.try_recv() {
  528. Ok(msg) => reply_to.send(msg),
  529. Err(mpsc::error::TryRecvError::Empty) => continue,
  530. Err(mpsc::error::TryRecvError::Disconnected) => break,
  531. }
  532. }
  533. Ok(())
  534. }
  535. }
  536. // ===== Helpers =====
  537. async fn recv_next<T: Transport>(
  538. sub: &mut RemoteActiveConsumer<T>,
  539. dur_ms: u64,
  540. ) -> Option<<T::Spec as Spec>::Event> {
  541. timeout(Duration::from_millis(dur_ms), sub.recv())
  542. .await
  543. .ok()
  544. .flatten()
  545. }
  546. async fn expect_ctrl(
  547. rx: &mut mpsc::Receiver<StreamCtrl<CustomPubSub>>,
  548. dur_ms: u64,
  549. pred: impl Fn(&StreamCtrl<CustomPubSub>) -> bool,
  550. ) -> StreamCtrl<CustomPubSub> {
  551. timeout(Duration::from_millis(dur_ms), async {
  552. loop {
  553. if let Some(msg) = rx.recv().await {
  554. if pred(&msg) {
  555. break msg;
  556. }
  557. }
  558. }
  559. })
  560. .await
  561. .expect("timed out waiting for control message")
  562. }
  563. // ===== Tests =====
  564. #[tokio::test]
  565. async fn stream_delivery_and_unsubscribe_on_drop() {
  566. // stream supported, poll supported (doesn't matter; prefer long)
  567. let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
  568. // prefer_polling = false so connection loop will try stream first.
  569. let consumer = Consumer::new(transport, false, ());
  570. // Subscribe to Foo(7)
  571. let mut sub = consumer
  572. .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
  573. .expect("subscribe ok");
  574. // We should see a Subscribe(name, topic) forwarded to transport
  575. let ctrl = expect_ctrl(
  576. &mut ctrl_rx,
  577. 1000,
  578. |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(7)),
  579. )
  580. .await;
  581. match ctrl {
  582. StreamCtrl::Subscribe((name, idx)) => {
  583. assert_ne!(name, "t".to_owned());
  584. assert_eq!(idx, IndexTest::Foo(7));
  585. }
  586. _ => unreachable!(),
  587. }
  588. // Send an event that matches Foo(7)
  589. events_tx.send(Message { foo: 7, bar: 1 }).await.unwrap();
  590. let got = recv_next::<TestTransport>(&mut sub, 1000)
  591. .await
  592. .expect("got event");
  593. assert_eq!(got, Message { foo: 7, bar: 1 });
  594. // Dropping the RemoteActiveConsumer should trigger an Unsubscribe(name)
  595. drop(sub);
  596. let _ctrl = expect_ctrl(&mut ctrl_rx, 1000, |m| {
  597. matches!(m, StreamCtrl::Unsubscribe(_))
  598. })
  599. .await;
  600. // Drop the Consumer -> Stop is sent so the transport loop exits cleanly
  601. drop(consumer);
  602. let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| matches!(m, StreamCtrl::Stop)).await;
  603. }
  604. #[tokio::test]
  605. async fn test_cache_and_invalation() {
  606. // stream supported, poll supported (doesn't matter; prefer long)
  607. let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
  608. // prefer_polling = false so connection loop will try stream first.
  609. let consumer = Consumer::new(transport, false, ());
  610. // Subscribe to Foo(7)
  611. let mut sub_1 = consumer
  612. .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
  613. .expect("subscribe ok");
  614. // We should see a Subscribe(name, topic) forwarded to transport
  615. let ctrl = expect_ctrl(
  616. &mut ctrl_rx,
  617. 1000,
  618. |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(7)),
  619. )
  620. .await;
  621. match ctrl {
  622. StreamCtrl::Subscribe((name, idx)) => {
  623. assert_ne!(name, "t1".to_owned());
  624. assert_eq!(idx, IndexTest::Foo(7));
  625. }
  626. _ => unreachable!(),
  627. }
  628. // Send an event that matches Foo(7)
  629. events_tx.send(Message { foo: 7, bar: 1 }).await.unwrap();
  630. let got = recv_next::<TestTransport>(&mut sub_1, 1000)
  631. .await
  632. .expect("got event");
  633. assert_eq!(got, Message { foo: 7, bar: 1 });
  634. // Subscribe to Foo(7), should receive the latest message and future messages
  635. let mut sub_2 = consumer
  636. .subscribe(SubscriptionReq::Foo("t2".to_owned(), 7))
  637. .expect("subscribe ok");
  638. let got = recv_next::<TestTransport>(&mut sub_2, 1000)
  639. .await
  640. .expect("got event");
  641. assert_eq!(got, Message { foo: 7, bar: 1 });
  642. // Dropping the RemoteActiveConsumer but not unsubscribe, since sub_2 is still active
  643. drop(sub_1);
  644. // Subscribe to Foo(7), should receive the latest message and future messages
  645. let mut sub_3 = consumer
  646. .subscribe(SubscriptionReq::Foo("t3".to_owned(), 7))
  647. .expect("subscribe ok");
  648. // receive cache message
  649. let got = recv_next::<TestTransport>(&mut sub_3, 1000)
  650. .await
  651. .expect("got event");
  652. assert_eq!(got, Message { foo: 7, bar: 1 });
  653. // Send an event that matches Foo(7)
  654. events_tx.send(Message { foo: 7, bar: 2 }).await.unwrap();
  655. // receive new message
  656. let got = recv_next::<TestTransport>(&mut sub_2, 1000)
  657. .await
  658. .expect("got event");
  659. assert_eq!(got, Message { foo: 7, bar: 2 });
  660. let got = recv_next::<TestTransport>(&mut sub_3, 1000)
  661. .await
  662. .expect("got event");
  663. assert_eq!(got, Message { foo: 7, bar: 2 });
  664. drop(sub_2);
  665. drop(sub_3);
  666. let _ctrl = expect_ctrl(&mut ctrl_rx, 1000, |m| {
  667. matches!(m, StreamCtrl::Unsubscribe(_))
  668. })
  669. .await;
  670. // The cache should be dropped, so no new messages
  671. let mut sub_4 = consumer
  672. .subscribe(SubscriptionReq::Foo("t4".to_owned(), 7))
  673. .expect("subscribe ok");
  674. assert!(
  675. recv_next::<TestTransport>(&mut sub_4, 1000).await.is_none(),
  676. "Should have not receive any update"
  677. );
  678. drop(sub_4);
  679. // Drop the Consumer -> Stop is sent so the transport loop exits cleanly
  680. let _ = expect_ctrl(&mut ctrl_rx, 2000, |m| matches!(m, StreamCtrl::Stop)).await;
  681. }
  682. #[tokio::test]
  683. async fn falls_back_to_poll_when_stream_not_supported() {
  684. // stream NOT supported, poll supported
  685. let (transport, events_tx, _) = TestTransport::new(false, true);
  686. // prefer_polling = true nudges the connection loop to poll first, but even if it
  687. // tried stream, our transport returns NotSupported and the loop will use poll.
  688. let consumer = Consumer::new(transport, true, ());
  689. // Subscribe to Bar(5)
  690. let mut sub = consumer
  691. .subscribe(SubscriptionReq::Bar("t".to_owned(), 5))
  692. .expect("subscribe ok");
  693. // Inject an event; the poll path should relay it on the first poll iteration
  694. events_tx.send(Message { foo: 9, bar: 5 }).await.unwrap();
  695. let got = recv_next::<TestTransport>(&mut sub, 1500)
  696. .await
  697. .expect("event relayed via polling");
  698. assert_eq!(got, Message { foo: 9, bar: 5 });
  699. }
  700. #[tokio::test]
  701. async fn multiple_subscribers_share_single_remote_subscription() {
  702. // This validates the "coalescing" behavior in Consumer::subscribe where multiple local
  703. // subscribers to the same Topic should only create one remote subscription.
  704. let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
  705. let consumer = Consumer::new(transport, false, ());
  706. // Two local subscriptions to the SAME topic/name pair (different names)
  707. let mut a = consumer
  708. .subscribe(SubscriptionReq::Foo("t".to_owned(), 1))
  709. .expect("subscribe A");
  710. let _ = expect_ctrl(
  711. &mut ctrl_rx,
  712. 1000,
  713. |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(1)),
  714. )
  715. .await;
  716. let mut b = consumer
  717. .subscribe(SubscriptionReq::Foo("b".to_owned(), 1))
  718. .expect("subscribe B");
  719. // No second Subscribe should be forwarded for the same topic (coalesced).
  720. // Give a little time; if one appears, we'll fail explicitly.
  721. if let Ok(Some(StreamCtrl::Subscribe((_, idx)))) =
  722. timeout(Duration::from_millis(400), ctrl_rx.recv()).await
  723. {
  724. assert_ne!(idx, IndexTest::Foo(1), "should not resubscribe same topic");
  725. }
  726. // Send one event and ensure BOTH local subscribers receive it.
  727. events_tx.send(Message { foo: 1, bar: 42 }).await.unwrap();
  728. let got_a = recv_next::<TestTransport>(&mut a, 1000)
  729. .await
  730. .expect("A got");
  731. let got_b = recv_next::<TestTransport>(&mut b, 1000)
  732. .await
  733. .expect("B got");
  734. assert_eq!(got_a, Message { foo: 1, bar: 42 });
  735. assert_eq!(got_b, Message { foo: 1, bar: 42 });
  736. // Drop B: no Unsubscribe should be sent yet (still one local subscriber).
  737. drop(b);
  738. if let Ok(Some(StreamCtrl::Unsubscribe(_))) =
  739. timeout(Duration::from_millis(400), ctrl_rx.recv()).await
  740. {
  741. panic!("Should NOT unsubscribe while another local subscriber exists");
  742. }
  743. // Drop A: now remote unsubscribe should occur.
  744. drop(a);
  745. let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| {
  746. matches!(m, StreamCtrl::Unsubscribe(_))
  747. })
  748. .await;
  749. let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| matches!(m, StreamCtrl::Stop)).await;
  750. }
  751. }