//! Publish/Subscribe core //! //! This module defines the transport-agnostic pub/sub primitives used by both //! mint and wallet components. The design prioritizes: //! //! - **Request coalescing**: multiple local subscribers to the same remote topic //! result in a single upstream subscription, with local fan‑out. //! - **Latest-on-subscribe** (NUT-17): on (re)subscription, the most recent event //! is fetched and delivered before streaming new ones. //! - **Backpressure-aware delivery**: bounded channels + drop policies prevent //! a slow consumer from stalling the whole pipeline. //! - **Resilience**: automatic reconnect with exponential backoff; WebSocket //! streaming when available, HTTP long-poll fallback otherwise. //! //! Terms used throughout the module: //! - **Event**: a domain object that maps to one or more `Topic`s via `Event::get_topics`. //! - **Topic**: an index/type that defines storage and matching semantics. //! - **SubscriptionRequest**: a domain-specific filter that can be converted into //! low-level transport messages (e.g., WebSocket subscribe frames). //! - **Spec**: type bundle tying `Event`, `Topic`, `SubscriptionId`, and serialization. mod error; mod pubsub; pub mod remote_consumer; mod subscriber; mod types; pub use self::error::Error; pub use self::pubsub::Pubsub; pub use self::subscriber::{Subscriber, SubscriptionRequest}; pub use self::types::*; #[cfg(test)] mod test { use std::collections::HashMap; use std::sync::{Arc, RwLock}; use serde::{Deserialize, Serialize}; use super::subscriber::SubscriptionRequest; use super::{Error, Event, Pubsub, Spec, Subscriber}; #[derive(Clone, Debug, Serialize, Eq, PartialEq, Deserialize)] pub struct Message { pub foo: u64, pub bar: u64, } #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)] pub enum IndexTest { Foo(u64), Bar(u64), } impl Event for Message { type Topic = IndexTest; fn get_topics(&self) -> Vec { vec![IndexTest::Foo(self.foo), IndexTest::Bar(self.bar)] } } pub struct CustomPubSub { pub storage: Arc>>, } #[async_trait::async_trait] impl Spec for CustomPubSub { type Topic = IndexTest; type Event = Message; type SubscriptionId = String; type Context = (); fn new_instance(_context: Self::Context) -> Arc where Self: Sized, { Arc::new(Self { storage: Default::default(), }) } async fn fetch_events( self: &Arc, topics: Vec<::Topic>, reply_to: Subscriber, ) where Self: Sized, { let storage = self.storage.read().unwrap(); for index in topics { if let Some(value) = storage.get(&index) { let _ = reply_to.send(value.clone()); } } } } #[derive(Debug, Clone)] pub enum SubscriptionReq { Foo(u64), Bar(u64), } impl SubscriptionRequest for SubscriptionReq { type Topic = IndexTest; type SubscriptionId = String; fn try_get_topics(&self) -> Result, Error> { Ok(vec![match self { SubscriptionReq::Bar(n) => IndexTest::Bar(*n), SubscriptionReq::Foo(n) => IndexTest::Foo(*n), }]) } fn subscription_name(&self) -> Arc { Arc::new("test".to_owned()) } } #[tokio::test] async fn delivery_twice_realtime() { let pubsub = Pubsub::new(CustomPubSub::new_instance(())); assert_eq!(pubsub.active_subscribers(), 0); let mut subscriber = pubsub.subscribe(SubscriptionReq::Foo(2)).unwrap(); assert_eq!(pubsub.active_subscribers(), 1); let _ = pubsub.publish_now(Message { foo: 2, bar: 1 }); let _ = pubsub.publish_now(Message { foo: 2, bar: 2 }); assert_eq!(subscriber.recv().await.map(|x| x.bar), Some(1)); assert_eq!(subscriber.recv().await.map(|x| x.bar), Some(2)); assert!(subscriber.try_recv().is_none()); drop(subscriber); assert_eq!(pubsub.active_subscribers(), 0); } #[tokio::test] async fn read_from_storage() { let x = CustomPubSub::new_instance(()); let storage = x.storage.clone(); let pubsub = Pubsub::new(x); { // set previous value let mut s = storage.write().unwrap(); s.insert(IndexTest::Bar(2), Message { foo: 3, bar: 2 }); } let mut subscriber = pubsub.subscribe(SubscriptionReq::Bar(2)).unwrap(); // Just should receive the latest assert_eq!(subscriber.recv().await.map(|x| x.foo), Some(3)); // realtime delivery test let _ = pubsub.publish_now(Message { foo: 1, bar: 2 }); assert_eq!(subscriber.recv().await.map(|x| x.foo), Some(1)); { // set previous value let mut s = storage.write().unwrap(); s.insert(IndexTest::Bar(2), Message { foo: 1, bar: 2 }); } // new subscription should only get the latest state (it is up to the Topic trait) let mut y = pubsub.subscribe(SubscriptionReq::Bar(2)).unwrap(); assert_eq!(y.recv().await.map(|x| x.foo), Some(1)); } }