mod.rs 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. //! Publish/Subscribe core
  2. //!
  3. //! This module defines the transport-agnostic pub/sub primitives used by both
  4. //! mint and wallet components. The design prioritizes:
  5. //!
  6. //! - **Request coalescing**: multiple local subscribers to the same remote topic
  7. //! result in a single upstream subscription, with local fan‑out.
  8. //! - **Latest-on-subscribe** (NUT-17): on (re)subscription, the most recent event
  9. //! is fetched and delivered before streaming new ones.
  10. //! - **Backpressure-aware delivery**: bounded channels + drop policies prevent
  11. //! a slow consumer from stalling the whole pipeline.
  12. //! - **Resilience**: automatic reconnect with exponential backoff; WebSocket
  13. //! streaming when available, HTTP long-poll fallback otherwise.
  14. //!
  15. //! Terms used throughout the module:
  16. //! - **Event**: a domain object that maps to one or more `Topic`s via `Event::get_topics`.
  17. //! - **Topic**: an index/type that defines storage and matching semantics.
  18. //! - **SubscriptionRequest**: a domain-specific filter that can be converted into
  19. //! low-level transport messages (e.g., WebSocket subscribe frames).
  20. //! - **Spec**: type bundle tying `Event`, `Topic`, `SubscriptionId`, and serialization.
  21. mod error;
  22. mod pubsub;
  23. pub mod remote_consumer;
  24. mod subscriber;
  25. mod types;
  26. pub use self::error::Error;
  27. pub use self::pubsub::Pubsub;
  28. pub use self::subscriber::{Subscriber, SubscriptionRequest};
  29. pub use self::types::*;
  30. #[cfg(test)]
  31. mod test {
  32. use std::collections::HashMap;
  33. use std::sync::{Arc, RwLock};
  34. use serde::{Deserialize, Serialize};
  35. use super::subscriber::SubscriptionRequest;
  36. use super::{Error, Event, Pubsub, Spec, Subscriber};
  37. #[derive(Clone, Debug, Serialize, Eq, PartialEq, Deserialize)]
  38. pub struct Message {
  39. pub foo: u64,
  40. pub bar: u64,
  41. }
  42. #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
  43. pub enum IndexTest {
  44. Foo(u64),
  45. Bar(u64),
  46. }
  47. impl Event for Message {
  48. type Topic = IndexTest;
  49. fn get_topics(&self) -> Vec<Self::Topic> {
  50. vec![IndexTest::Foo(self.foo), IndexTest::Bar(self.bar)]
  51. }
  52. }
  53. pub struct CustomPubSub {
  54. pub storage: Arc<RwLock<HashMap<IndexTest, Message>>>,
  55. }
  56. #[async_trait::async_trait]
  57. impl Spec for CustomPubSub {
  58. type Topic = IndexTest;
  59. type Event = Message;
  60. type SubscriptionId = String;
  61. type Context = ();
  62. fn new_instance(_context: Self::Context) -> Arc<Self>
  63. where
  64. Self: Sized,
  65. {
  66. Arc::new(Self {
  67. storage: Default::default(),
  68. })
  69. }
  70. async fn fetch_events(
  71. self: &Arc<Self>,
  72. topics: Vec<<Self::Event as Event>::Topic>,
  73. reply_to: Subscriber<Self>,
  74. ) where
  75. Self: Sized,
  76. {
  77. let storage = self.storage.read().unwrap();
  78. for index in topics {
  79. if let Some(value) = storage.get(&index) {
  80. let _ = reply_to.send(value.clone());
  81. }
  82. }
  83. }
  84. }
  85. #[derive(Debug, Clone)]
  86. pub enum SubscriptionReq {
  87. Foo(u64),
  88. Bar(u64),
  89. }
  90. impl SubscriptionRequest for SubscriptionReq {
  91. type Topic = IndexTest;
  92. type SubscriptionId = String;
  93. fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error> {
  94. Ok(vec![match self {
  95. SubscriptionReq::Bar(n) => IndexTest::Bar(*n),
  96. SubscriptionReq::Foo(n) => IndexTest::Foo(*n),
  97. }])
  98. }
  99. fn subscription_name(&self) -> Arc<Self::SubscriptionId> {
  100. Arc::new("test".to_owned())
  101. }
  102. }
  103. #[tokio::test]
  104. async fn delivery_twice_realtime() {
  105. let pubsub = Pubsub::new(CustomPubSub::new_instance(()));
  106. assert_eq!(pubsub.active_subscribers(), 0);
  107. let mut subscriber = pubsub.subscribe(SubscriptionReq::Foo(2)).unwrap();
  108. assert_eq!(pubsub.active_subscribers(), 1);
  109. let _ = pubsub.publish_now(Message { foo: 2, bar: 1 });
  110. let _ = pubsub.publish_now(Message { foo: 2, bar: 2 });
  111. assert_eq!(subscriber.recv().await.map(|x| x.bar), Some(1));
  112. assert_eq!(subscriber.recv().await.map(|x| x.bar), Some(2));
  113. assert!(subscriber.try_recv().is_none());
  114. drop(subscriber);
  115. assert_eq!(pubsub.active_subscribers(), 0);
  116. }
  117. #[tokio::test]
  118. async fn read_from_storage() {
  119. let x = CustomPubSub::new_instance(());
  120. let storage = x.storage.clone();
  121. let pubsub = Pubsub::new(x);
  122. {
  123. // set previous value
  124. let mut s = storage.write().unwrap();
  125. s.insert(IndexTest::Bar(2), Message { foo: 3, bar: 2 });
  126. }
  127. let mut subscriber = pubsub.subscribe(SubscriptionReq::Bar(2)).unwrap();
  128. // Just should receive the latest
  129. assert_eq!(subscriber.recv().await.map(|x| x.foo), Some(3));
  130. // realtime delivery test
  131. let _ = pubsub.publish_now(Message { foo: 1, bar: 2 });
  132. assert_eq!(subscriber.recv().await.map(|x| x.foo), Some(1));
  133. {
  134. // set previous value
  135. let mut s = storage.write().unwrap();
  136. s.insert(IndexTest::Bar(2), Message { foo: 1, bar: 2 });
  137. }
  138. // new subscription should only get the latest state (it is up to the Topic trait)
  139. let mut y = pubsub.subscribe(SubscriptionReq::Bar(2)).unwrap();
  140. assert_eq!(y.recv().await.map(|x| x.foo), Some(1));
  141. }
  142. }