Ver código fonte

Introduce a generic pubsub mod in cdk-common

Remove all previous implementations that were cumbersome and complicated, full
of generics, in favor of a simpler generic pubsub with simpler concepts.

The pubsub has a simple database optional implementation to persist the latest
messages and send them to new subscribers.

The same codebase is shared between the mint and the wallet. The wallet
implements a remote pubsub subscription, which connects to a remote pubsub via
two methods: polling or long connections, both of which are abstracted.
Internally, a local pubsub is used to distribute messages efficiently and
coalesce subscriptions

TODO: Implement the remote subscription into the wallet
Cesar Rodas 1 mês atrás
pai
commit
4ee3a3b9bc

+ 2 - 2
crates/cashu/Cargo.toml

@@ -13,13 +13,13 @@ readme = "README.md"
 [features]
 default = ["mint", "wallet", "auth"]
 swagger = ["dep:utoipa"]
-mint = ["dep:uuid"]
+mint = []
 wallet = []
 auth = ["dep:strum", "dep:strum_macros", "dep:regex"]
 bench = []
 
 [dependencies]
-uuid = { workspace = true, optional = true }
+uuid.workspace = true
 bitcoin.workspace = true
 cbor-diag.workspace = true
 ciborium.workspace = true

+ 0 - 1
crates/cashu/src/lib.rs

@@ -16,7 +16,6 @@ pub use self::mint_url::MintUrl;
 pub use self::nuts::*;
 pub use self::util::SECP256K1;
 
-#[cfg(feature = "mint")]
 pub mod quote_id;
 
 #[doc(hidden)]

+ 2 - 24
crates/cashu/src/nuts/nut17/mod.rs

@@ -2,12 +2,10 @@
 use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize};
 
-#[cfg(feature = "mint")]
 use super::PublicKey;
 use crate::nuts::{
     CurrencyUnit, MeltQuoteBolt11Response, MintQuoteBolt11Response, PaymentMethod, ProofState,
 };
-#[cfg(feature = "mint")]
 use crate::quote_id::{QuoteId, QuoteIdError};
 use crate::MintQuoteBolt12Response;
 
@@ -130,28 +128,9 @@ pub enum NotificationPayload<T> {
     MintQuoteBolt12Response(MintQuoteBolt12Response<T>),
 }
 
-impl<T> From<ProofState> for NotificationPayload<T> {
-    fn from(proof_state: ProofState) -> NotificationPayload<T> {
-        NotificationPayload::ProofState(proof_state)
-    }
-}
-
-impl<T> From<MeltQuoteBolt11Response<T>> for NotificationPayload<T> {
-    fn from(melt_quote: MeltQuoteBolt11Response<T>) -> NotificationPayload<T> {
-        NotificationPayload::MeltQuoteBolt11Response(melt_quote)
-    }
-}
-
-impl<T> From<MintQuoteBolt11Response<T>> for NotificationPayload<T> {
-    fn from(mint_quote: MintQuoteBolt11Response<T>) -> NotificationPayload<T> {
-        NotificationPayload::MintQuoteBolt11Response(mint_quote)
-    }
-}
-
-#[cfg(feature = "mint")]
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Hash, Serialize)]
 /// A parsed notification
-pub enum Notification {
+pub enum NotificationId {
     /// ProofState id is a Pubkey
     ProofState(PublicKey),
     /// MeltQuote id is an QuoteId
@@ -187,7 +166,6 @@ impl<I> AsRef<I> for Params<I> {
 /// Parsing error
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
-    #[cfg(feature = "mint")]
     #[error("Uuid Error: {0}")]
     /// Uuid Error
     QuoteId(#[from] QuoteIdError),

+ 1 - 1
crates/cdk-axum/src/ws/mod.rs

@@ -3,7 +3,7 @@ use std::collections::HashMap;
 use axum::extract::ws::{CloseFrame, Message, WebSocket};
 use cdk::mint::QuoteId;
 use cdk::nuts::nut17::NotificationPayload;
-use cdk::pub_sub::SubId;
+use cdk::subscription::SubId;
 use cdk::ws::{
     notification_to_ws_message, NotificationInner, WsErrorBody, WsMessageOrResponse,
     WsMethodRequest, WsRequest,

+ 3 - 3
crates/cdk-axum/src/ws/subscribe.rs

@@ -21,16 +21,16 @@ pub(crate) async fn handle(
         .state
         .mint
         .pubsub_manager()
-        .try_subscribe(params)
-        .await
+        .subscribe(params)
         .map_err(|_| WsError::ParseError)?;
 
     let publisher = context.publisher.clone();
+    let sub_id_for_sender = sub_id.clone();
     context.subscriptions.insert(
         sub_id.clone(),
         tokio::spawn(async move {
             while let Some(response) = subscription.recv().await {
-                let _ = publisher.send(response).await;
+                let _ = publisher.try_send((sub_id_for_sender.clone(), response.into()));
             }
         }),
     );

+ 1 - 0
crates/cdk-common/Cargo.toml

@@ -40,6 +40,7 @@ anyhow.workspace = true
 serde_json.workspace = true
 serde_with.workspace = true
 web-time.workspace = true
+tokio.workspace = true
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 uuid = { workspace = true, features = ["js"], optional = true }

+ 15 - 0
crates/cdk-common/src/pub_sub/error.rs

@@ -0,0 +1,15 @@
+#[derive(thiserror::Error, Debug)]
+/// Error
+pub enum Error {
+    /// Poison locked
+    #[error("Poisoned lock")]
+    Poison,
+
+    /// Already subscribed
+    #[error("Already subscribed")]
+    AlreadySubscribed,
+
+    /// Parsing error
+    #[error("Parsing Error {0}")]
+    ParsingError(String),
+}

+ 20 - 152
crates/cdk-common/src/pub_sub/index.rs

@@ -1,161 +1,29 @@
 //! WS Index
 
 use std::fmt::Debug;
-use std::ops::Deref;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::hash::Hash;
 
-use super::SubId;
+use serde::de::DeserializeOwned;
+use serde::Serialize;
 
 /// Indexable trait
-pub trait Indexable {
-    /// The type of the index, it is unknown and it is up to the Manager's
-    /// generic type
-    type Type: PartialOrd + Ord + Send + Sync + Debug;
+pub trait Indexable: Clone {
+    /// Generic Index
+    ///
+    /// It should be serializable/deserializable to be stored in the database layer and it should
+    /// also be sorted in a BTree for in-memory matching
+    type Index: Debug
+        + Clone
+        + Eq
+        + PartialEq
+        + Ord
+        + PartialOrd
+        + Hash
+        + Send
+        + Sync
+        + DeserializeOwned
+        + Serialize;
 
     /// To indexes
-    fn to_indexes(&self) -> Vec<Index<Self::Type>>;
-}
-
-#[derive(Debug, Ord, PartialOrd, PartialEq, Eq, Clone)]
-/// Index
-///
-/// The Index is a sorted structure that is used to quickly find matches
-///
-/// The counter is used to make sure each Index is unique, even if the prefix
-/// are the same, and also to make sure that earlier indexes matches first
-pub struct Index<T>
-where
-    T: PartialOrd + Ord + Send + Sync + Debug,
-{
-    prefix: T,
-    counter: SubscriptionGlobalId,
-    id: super::SubId,
-}
-
-impl<T> From<&Index<T>> for super::SubId
-where
-    T: PartialOrd + Ord + Send + Sync + Debug,
-{
-    fn from(val: &Index<T>) -> Self {
-        val.id.clone()
-    }
-}
-
-impl<T> Deref for Index<T>
-where
-    T: PartialOrd + Ord + Send + Sync + Debug,
-{
-    type Target = T;
-
-    fn deref(&self) -> &Self::Target {
-        &self.prefix
-    }
-}
-
-impl<T> Index<T>
-where
-    T: PartialOrd + Ord + Send + Sync + Debug,
-{
-    /// Compare the
-    pub fn cmp_prefix(&self, other: &Index<T>) -> std::cmp::Ordering {
-        self.prefix.cmp(&other.prefix)
-    }
-
-    /// Returns a globally unique id for the Index
-    pub fn unique_id(&self) -> usize {
-        self.counter.0
-    }
-}
-
-impl<T> From<(T, SubId, SubscriptionGlobalId)> for Index<T>
-where
-    T: PartialOrd + Ord + Send + Sync + Debug,
-{
-    fn from((prefix, id, counter): (T, SubId, SubscriptionGlobalId)) -> Self {
-        Self {
-            prefix,
-            id,
-            counter,
-        }
-    }
-}
-
-impl<T> From<(T, SubId)> for Index<T>
-where
-    T: PartialOrd + Ord + Send + Sync + Debug,
-{
-    fn from((prefix, id): (T, SubId)) -> Self {
-        Self {
-            prefix,
-            id,
-            counter: Default::default(),
-        }
-    }
-}
-
-impl<T> From<T> for Index<T>
-where
-    T: PartialOrd + Ord + Send + Sync + Debug,
-{
-    fn from(prefix: T) -> Self {
-        Self {
-            prefix,
-            id: Default::default(),
-            counter: SubscriptionGlobalId(0),
-        }
-    }
-}
-
-static COUNTER: AtomicUsize = AtomicUsize::new(0);
-
-/// Dummy type
-///
-/// This is only use so each Index is unique, with the same prefix.
-///
-/// The prefix is used to leverage the BTree to find things quickly, but each
-/// entry/key must be unique, so we use this dummy type to make sure each Index
-/// is unique.
-///
-/// Unique is also used to make sure that the indexes are sorted by creation order
-#[derive(Debug, Ord, PartialOrd, PartialEq, Eq, Clone, Copy)]
-pub struct SubscriptionGlobalId(usize);
-
-impl Default for SubscriptionGlobalId {
-    fn default() -> Self {
-        Self(COUNTER.fetch_add(1, Ordering::Relaxed))
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_index_from_tuple() {
-        let sub_id = SubId::from("test_sub_id");
-        let prefix = "test_prefix";
-        let index: Index<&str> = Index::from((prefix, sub_id.clone()));
-        assert_eq!(index.prefix, "test_prefix");
-        assert_eq!(index.id, sub_id);
-    }
-
-    #[test]
-    fn test_index_cmp_prefix() {
-        let sub_id = SubId::from("test_sub_id");
-        let index1: Index<&str> = Index::from(("a", sub_id.clone()));
-        let index2: Index<&str> = Index::from(("b", sub_id.clone()));
-        assert_eq!(index1.cmp_prefix(&index2), std::cmp::Ordering::Less);
-    }
-
-    #[test]
-    fn test_sub_id_from_str() {
-        let sub_id = SubId::from("test_sub_id");
-        assert_eq!(sub_id.0, "test_sub_id");
-    }
-
-    #[test]
-    fn test_sub_id_deref() {
-        let sub_id = SubId::from("test_sub_id");
-        assert_eq!(&*sub_id, "test_sub_id");
-    }
+    fn to_indexes(&self) -> Vec<Self::Index>;
 }

+ 132 - 53
crates/cdk-common/src/pub_sub/mod.rs

@@ -1,4 +1,6 @@
-//! Publish–subscribe pattern.
+//! Publish–subscribe manager.
+//!
+//! This is a event-agnostic Publish-subscriber producer and consumer.
 //!
 //! This is a generic implementation for
 //! [NUT-17](<https://github.com/cashubtc/nuts/blob/main/17.md>) with a type
@@ -8,70 +10,147 @@
 //! generic type that must be converted to a vector of indexes.
 //!
 //! Events are also generic that should implement the `Indexable` trait.
-use std::fmt::Debug;
-use std::ops::Deref;
-use std::str::FromStr;
-
-use serde::{Deserialize, Serialize};
 
+mod error;
 pub mod index;
+mod pubsub;
+pub mod remote_consumer;
+mod subscriber;
 
-/// Default size of the remove channel
-pub const DEFAULT_REMOVE_SIZE: usize = 10_000;
-
-/// Default channel size for subscription buffering
-pub const DEFAULT_CHANNEL_SIZE: usize = 10;
-
-#[async_trait::async_trait]
-/// On New Subscription trait
-///
-/// This trait is optional and it is used to notify the application when a new
-/// subscription is created. This is useful when the application needs to send
-/// the initial state to the subscriber upon subscription
-pub trait OnNewSubscription {
-    /// Index type
-    type Index;
-    /// Subscription event type
-    type Event;
-
-    /// Called when a new subscription is created
-    async fn on_new_subscription(
-        &self,
-        request: &[&Self::Index],
-    ) -> Result<Vec<Self::Event>, String>;
-}
+pub use self::error::Error;
+pub use self::index::Indexable;
+pub use self::pubsub::{Pubsub, Topic};
+pub use self::subscriber::SubscriptionRequest;
+
+#[cfg(test)]
+mod test {
+    use std::collections::HashMap;
+    use std::sync::RwLock;
+
+    use serde::{Deserialize, Serialize};
+    use tokio::sync::mpsc;
 
-/// Subscription Id wrapper
-///
-/// This is the place to add some sane default (like a max length) to the
-/// subscription ID
-#[derive(Debug, Clone, Default, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
-pub struct SubId(String);
+    use super::pubsub::Topic;
+    use super::subscriber::SubscriptionRequest;
+    use super::{Error, Indexable, Pubsub};
 
-impl From<&str> for SubId {
-    fn from(s: &str) -> Self {
-        Self(s.to_string())
+    #[derive(Clone, Debug, Serialize, Deserialize)]
+    pub struct Message {
+        foo: u64,
+        bar: u64,
     }
-}
 
-impl From<String> for SubId {
-    fn from(s: String) -> Self {
-        Self(s)
+    #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
+    pub enum IndexTest {
+        Foo(u64),
+        Bar(u64),
     }
-}
 
-impl FromStr for SubId {
-    type Err = ();
+    impl Indexable for Message {
+        type Index = IndexTest;
 
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        Ok(Self(s.to_string()))
+        fn to_indexes(&self) -> Vec<Self::Index> {
+            vec![IndexTest::Foo(self.foo), IndexTest::Bar(self.bar)]
+        }
     }
-}
 
-impl Deref for SubId {
-    type Target = String;
+    #[derive(Default)]
+    pub struct CustomTopic {
+        storage: RwLock<HashMap<IndexTest, Message>>,
+    }
+
+    #[async_trait::async_trait]
+    impl Topic for CustomTopic {
+        type SubscriptionName = String;
+
+        type Event = Message;
+
+        async fn fetch_events(
+            &self,
+            indexes: Vec<<Self::Event as Indexable>::Index>,
+            sub_name: Self::SubscriptionName,
+            reply_to: mpsc::Sender<(Self::SubscriptionName, Self::Event)>,
+        ) {
+            let storage = self.storage.read().unwrap();
+
+            for index in indexes {
+                if let Some(value) = storage.get(&index) {
+                    let _ = reply_to.try_send((sub_name.clone(), value.clone()));
+                }
+            }
+        }
+
+        /// Store events or replace them
+        async fn store_events(&self, event: Self::Event) {
+            let mut storage = self.storage.write().unwrap();
+            for index in event.to_indexes() {
+                storage.insert(index, event.clone());
+            }
+        }
+    }
+
+    #[derive(Clone)]
+    pub enum SubscriptionReq {
+        Foo(u64),
+        Bar(u64),
+    }
+
+    impl SubscriptionRequest for SubscriptionReq {
+        type Index = IndexTest;
+
+        type SubscriptionName = String;
+
+        fn try_get_indexes(&self) -> Result<Vec<Self::Index>, Error> {
+            Ok(vec![match self {
+                SubscriptionReq::Bar(n) => IndexTest::Bar(*n),
+                SubscriptionReq::Foo(n) => IndexTest::Foo(*n),
+            }])
+        }
+
+        fn subscription_name(&self) -> Self::SubscriptionName {
+            "test".to_owned()
+        }
+    }
+
+    #[tokio::test]
+    async fn delivery_twice_realtime() {
+        let pubsub = Pubsub::new(CustomTopic::default());
+
+        assert_eq!(pubsub.active_subscribers(), 0);
+
+        let mut subscriber = pubsub.subscribe(SubscriptionReq::Foo(2)).unwrap();
+
+        assert_eq!(pubsub.active_subscribers(), 1);
+
+        let _ = pubsub.publish_sync(Message { foo: 2, bar: 1 });
+        let _ = pubsub.publish_sync(Message { foo: 2, bar: 2 });
+
+        assert_eq!(subscriber.recv().await.map(|x| x.bar), Some(1));
+        assert_eq!(subscriber.recv().await.map(|x| x.bar), Some(2));
+        assert!(subscriber.try_recv().is_none());
+
+        drop(subscriber);
+
+        assert_eq!(pubsub.active_subscribers(), 0);
+    }
+
+    #[tokio::test]
+    async fn store_events_once_per_index() {
+        let pubsub = Pubsub::new(CustomTopic::default());
+        let _ = pubsub.publish_sync(Message { foo: 1, bar: 2 });
+        let _ = pubsub.publish_sync(Message { foo: 3, bar: 2 });
+
+        let mut subscriber = pubsub.subscribe(SubscriptionReq::Bar(2)).unwrap();
+
+        // Just should receive the latest
+        assert_eq!(subscriber.recv().await.map(|x| x.foo), Some(3));
+
+        // realtime delivery test
+        pubsub.publish(Message { foo: 1, bar: 2 });
+        assert_eq!(subscriber.recv().await.map(|x| x.foo), Some(1));
 
-    fn deref(&self) -> &Self::Target {
-        &self.0
+        // new subscription should only get the latest state (it is up to the Topic trait)
+        let mut y = pubsub.subscribe(SubscriptionReq::Bar(2)).unwrap();
+        assert_eq!(y.recv().await.map(|x| x.foo), Some(1));
     }
 }

+ 226 - 0
crates/cdk-common/src/pub_sub/pubsub.rs

@@ -0,0 +1,226 @@
+//! Pub-sub producer
+
+use std::cmp::Ordering;
+use std::collections::{BTreeMap, HashSet};
+use std::fmt::Debug;
+use std::hash::Hash;
+use std::sync::atomic::AtomicUsize;
+use std::sync::{Arc, RwLock};
+
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+use tokio::sync::mpsc;
+
+use super::index::Indexable;
+use super::subscriber::{ActiveSubscription, SubscriptionRequest};
+use super::Error;
+
+/// Event producer definition
+///
+/// This trait defines events to be broadcasted. Events and subscription requests are converted into
+/// a vector of indexes.
+///
+/// Matching events with subscriptions, through the indexes are broadcasted in real time.
+///
+/// When a new subscription request is created, a deferred call to `fetch_events` is executed to
+/// fetch from a persistent medium the current events to broadcast.
+#[async_trait::async_trait]
+pub trait Topic: Send + Sync {
+    /// Subscription ID
+    type SubscriptionName: Debug
+        + Clone
+        + Default
+        + Eq
+        + PartialEq
+        + Ord
+        + PartialOrd
+        + Hash
+        + Send
+        + Sync
+        + DeserializeOwned
+        + Serialize;
+
+    /// An Event should be Indexable
+    type Event: Indexable + Debug + Send + Sync + DeserializeOwned + Serialize;
+
+    /// Called when a new subscription is created. The function is responsible to not yield the same
+    async fn fetch_events(
+        &self,
+        indexes: Vec<<Self::Event as Indexable>::Index>,
+        sub_name: Self::SubscriptionName,
+        reply_to: mpsc::Sender<(Self::SubscriptionName, Self::Event)>,
+    );
+
+    /// Store events or replace them
+    async fn store_events(&self, event: Self::Event);
+}
+
+/// Default channel size for subscription buffering
+pub const DEFAULT_CHANNEL_SIZE: usize = 1_000;
+
+/// Internal Index Tree
+pub type IndexTree<P> = Arc<
+    RwLock<
+        BTreeMap<
+            // Index with a subscription unique ID
+            (<<P as Topic>::Event as Indexable>::Index, usize),
+            (
+                <P as Topic>::SubscriptionName, // Subscription ID, as given by the client, more like a name
+                mpsc::Sender<(<P as Topic>::SubscriptionName, <P as Topic>::Event)>, // Sender
+            ),
+        >,
+    >,
+>;
+
+/// Manager
+pub struct Pubsub<P>
+where
+    P: Topic + 'static,
+{
+    inner: Arc<P>,
+    listeners_index: IndexTree<P>,
+    unique_subscription_counter: AtomicUsize,
+    active_subscribers: Arc<AtomicUsize>,
+}
+
+impl<P> Pubsub<P>
+where
+    P: Topic + 'static,
+{
+    /// Create a new instance
+    pub fn new(inner: P) -> Self {
+        Self {
+            inner: Arc::new(inner),
+            listeners_index: Default::default(),
+            unique_subscription_counter: 0.into(),
+            active_subscribers: Arc::new(0.into()),
+        }
+    }
+
+    /// Total number of active subscribers, it is not the number of active indexes being subscribed
+    pub fn active_subscribers(&self) -> usize {
+        self.active_subscribers
+            .load(std::sync::atomic::Ordering::SeqCst)
+    }
+
+    /// Publish an event to all listenrs
+    fn publish_internal(
+        event: P::Event,
+        listeners_index: &IndexTree<P>,
+        inner: Arc<P>,
+    ) -> Result<(), Error> {
+        let index_storage = listeners_index.read().map_err(|_| Error::Poison)?;
+        let mut sent = HashSet::new();
+        for index in event.to_indexes() {
+            for ((subscription_index, unique_id), (subscription_id, sender)) in
+                index_storage.range((index.clone(), 0)..)
+            {
+                if subscription_index.cmp(&index) != Ordering::Equal {
+                    break;
+                }
+                if sent.contains(&unique_id) {
+                    continue;
+                }
+                sent.insert(unique_id);
+                let _ = sender.try_send((subscription_id.clone(), event.clone()));
+            }
+        }
+        drop(index_storage);
+
+        tokio::spawn(async move {
+            inner.store_events(event).await;
+        });
+
+        Ok(())
+    }
+
+    /// Broadcast an event to all listeners
+    #[inline(always)]
+    pub fn publish<E>(&self, event: E)
+    where
+        E: Into<P::Event>,
+    {
+        let indexes = self.listeners_index.clone();
+        let inner = self.inner.clone();
+        let event = event.into();
+
+        tokio::spawn(async move { Self::publish_internal(event, &indexes, inner) });
+    }
+
+    /// Broadcast an event to all listeners right away, blocking the current thread
+    ///
+    /// This function takes an Arc to the storage struct, the event_id, the kind
+    /// and the vent to broadcast
+    #[inline(always)]
+    pub fn publish_sync<E>(&self, event: E) -> Result<(), Error>
+    where
+        E: Into<P::Event>,
+    {
+        let event = event.into();
+        Self::publish_internal(event, &self.listeners_index, self.inner.clone())
+    }
+
+    /// Subscribe proving custom sender/receiver mpsc
+    #[inline(always)]
+    pub fn subscribe_with<I>(
+        &self,
+        request: I,
+        sender: mpsc::Sender<(P::SubscriptionName, P::Event)>,
+        receiver: Option<mpsc::Receiver<(P::SubscriptionName, P::Event)>>,
+    ) -> Result<ActiveSubscription<P>, Error>
+    where
+        I: SubscriptionRequest<
+            Index = <P::Event as Indexable>::Index,
+            SubscriptionName = P::SubscriptionName,
+        >,
+    {
+        let mut index_storage = self.listeners_index.write().map_err(|_| Error::Poison)?;
+        let subscription_internal_id = self
+            .unique_subscription_counter
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+
+        self.active_subscribers
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+
+        let subscription_name = request.subscription_name();
+        let subscribed_to = request.try_get_indexes()?;
+
+        for index in subscribed_to.iter() {
+            index_storage.insert(
+                (index.clone(), subscription_internal_id),
+                (subscription_name.clone(), sender.clone()),
+            );
+        }
+        drop(index_storage);
+
+        let inner = self.inner.clone();
+        let subscribed_to_for_spawn = subscribed_to.clone();
+        let sub_name = subscription_name.clone();
+        tokio::spawn(async move {
+            inner
+                .fetch_events(subscribed_to_for_spawn, sub_name, sender)
+                .await;
+        });
+
+        Ok(ActiveSubscription::new(
+            subscription_internal_id,
+            subscription_name,
+            self.active_subscribers.clone(),
+            self.listeners_index.clone(),
+            subscribed_to,
+            receiver,
+        ))
+    }
+
+    /// Subscribe
+    pub fn subscribe<I>(&self, request: I) -> Result<ActiveSubscription<P>, Error>
+    where
+        I: SubscriptionRequest<
+            Index = <P::Event as Indexable>::Index,
+            SubscriptionName = P::SubscriptionName,
+        >,
+    {
+        let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
+        self.subscribe_with(request, sender, Some(receiver))
+    }
+}

+ 172 - 0
crates/cdk-common/src/pub_sub/remote_consumer.rs

@@ -0,0 +1,172 @@
+//! Pub-sub consumer
+//!
+//! Consumers are designed to connect to a producer, through a transport, and subscribe to events.
+use std::collections::HashMap;
+use std::sync::atomic::AtomicBool;
+use std::sync::{Arc, RwLock};
+
+use tokio::sync::mpsc;
+
+use super::pubsub::Topic;
+use super::subscriber::{ActiveSubscription, SubscriptionRequest};
+use super::{Error, Indexable, Pubsub};
+
+type ActiveSubscriptions<T> = RwLock<
+    HashMap<
+        <T as Topic>::SubscriptionName,
+        (
+            Vec<<<T as Topic>::Event as Indexable>::Index>,
+            ActiveSubscription<T>,
+        ),
+    >,
+>;
+
+type InternalSender<T> = mpsc::Sender<(<T as Topic>::SubscriptionName, <T as Topic>::Event)>;
+
+/// Subscription consumer
+pub struct Consumer<T>
+where
+    T: Transport + 'static,
+{
+    transport: T,
+    pubsub_internal_sender: InternalSender<T::Topic>,
+    inner_pubsub: Pubsub<T::Topic>,
+    subscriptions: ActiveSubscriptions<T::Topic>,
+    send_to_transport_loop: RwLock<mpsc::Sender<MessageToTransportLoop<T::Topic>>>,
+    still_running: AtomicBool,
+}
+
+impl<T> Consumer<T>
+where
+    T: Transport + 'static,
+{
+    /// Creates a new instance
+    pub async fn new(transport: T) -> Arc<Self> {
+        let (sender, _) = mpsc::channel(10_000);
+        let this = Arc::new(Self {
+            transport,
+            inner_pubsub: T::new_pubsub().await,
+            pubsub_internal_sender: mpsc::channel(10_000).0,
+            subscriptions: Default::default(),
+            send_to_transport_loop: RwLock::new(sender),
+            still_running: true.into(),
+        });
+
+        tokio::spawn(Self::connection_loop(this.clone()));
+
+        this
+    }
+
+    async fn connection_loop(instance: Arc<Self>) {
+        loop {
+            let (sender, receiver) = mpsc::channel(10_000);
+
+            {
+                let mut shared_sender = instance.send_to_transport_loop.write().unwrap();
+                *shared_sender = sender;
+            }
+
+            instance.transport.long_connection(receiver).await;
+        }
+    }
+
+    /// Creates a subscription
+    ///
+    /// The subscriptions have two parts:
+    ///
+    /// 1. Will create the subscription to the remote Pubsub service, Any events will be moved to
+    ///    the internal pubsub
+    ///
+    /// 2. The internal subscription to the inner Pubsub. Because all subscriptions are going the
+    ///    transport, once events matches subscriptions, the inner_pubsub will receive the message and
+    ///    broadcasat the event.
+    ///
+    ///
+    pub fn subscribe<I>(&self, request: I) -> Result<(), Error>
+    where
+        I: SubscriptionRequest<
+            Index = <<T::Topic as Topic>::Event as Indexable>::Index,
+            SubscriptionName = <T::Topic as Topic>::SubscriptionName,
+        >,
+    {
+        let transport_loop = self
+            .send_to_transport_loop
+            .read()
+            .map_err(|_| Error::Poison)?;
+        let mut subscriptions = self.subscriptions.write().map_err(|_| Error::Poison)?;
+        let subscription_name = request.subscription_name();
+        let indexes = request.try_get_indexes()?;
+
+        if subscriptions.get(&subscription_name).is_some() {
+            return Err(Error::AlreadySubscribed);
+        }
+
+        subscriptions.insert(
+            subscription_name.clone(),
+            (
+                indexes.clone(),
+                self.inner_pubsub.subscribe_with(
+                    request,
+                    self.pubsub_internal_sender.clone(),
+                    None,
+                )?,
+            ),
+        );
+        drop(subscriptions);
+
+        let _ = transport_loop.try_send(MessageToTransportLoop::Subscribe((
+            subscription_name,
+            indexes,
+        )));
+
+        Ok(())
+    }
+}
+
+impl<T> Drop for Consumer<T>
+where
+    T: Transport + 'static,
+{
+    fn drop(&mut self) {
+        self.still_running
+            .store(false, std::sync::atomic::Ordering::Release);
+        let r = self.send_to_transport_loop.read().unwrap();
+        let _ = r.try_send(MessageToTransportLoop::Stop);
+    }
+}
+
+///Internal message to transport loop
+pub enum MessageToTransportLoop<T>
+where
+    T: Topic + 'static,
+{
+    /// Add a subscription
+    Subscribe((T::SubscriptionName, Vec<<T::Event as Indexable>::Index>)),
+    /// Desuscribe
+    Desuscribe(T::SubscriptionName),
+    /// Exit the loop
+    Stop,
+}
+
+/// Subscription transport trait
+#[async_trait::async_trait]
+pub trait Transport: Send + Sync {
+    /// Topic
+    type Topic: Topic + Clone + Sync + Send;
+
+    /// Creates a new pubsub topic producer
+    async fn new_pubsub() -> Pubsub<Self::Topic>;
+
+    /// Open a long connection
+    async fn long_connection(
+        &self,
+        subscribe_changes: mpsc::Receiver<MessageToTransportLoop<Self::Topic>>,
+    ) where
+        Self: Sized;
+
+    /// Poll on demand
+    async fn poll(
+        &self,
+        index: Vec<<<Self::Topic as Topic>::Event as Indexable>::Index>,
+    ) -> Result<Vec<Self::Topic>, Error>;
+}

+ 97 - 0
crates/cdk-common/src/pub_sub/subscriber.rs

@@ -0,0 +1,97 @@
+//! Active subscription
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
+
+use tokio::sync::mpsc;
+
+use super::pubsub::{IndexTree, Topic};
+use super::Error;
+use crate::pub_sub::index::Indexable;
+
+/// Subscription request
+pub trait SubscriptionRequest: Clone {
+    /// Indexes
+    type Index;
+
+    /// Subscription name
+    type SubscriptionName;
+
+    /// Try to get indexes from the request
+    fn try_get_indexes(&self) -> Result<Vec<Self::Index>, Error>;
+
+    /// Get the subscription name
+    fn subscription_name(&self) -> Self::SubscriptionName;
+}
+
+/// Active Subscription
+pub struct ActiveSubscription<P>
+where
+    P: Topic + 'static,
+{
+    id: usize,
+    name: P::SubscriptionName,
+    active_subscribers: Arc<AtomicUsize>,
+    indexes: IndexTree<P>,
+    subscribed_to: Vec<<P::Event as Indexable>::Index>,
+    receiver: Option<mpsc::Receiver<(P::SubscriptionName, P::Event)>>,
+}
+
+impl<P> ActiveSubscription<P>
+where
+    P: Topic + 'static,
+{
+    /// Creates a new instance
+    pub fn new(
+        id: usize,
+        name: P::SubscriptionName,
+        active_subscribers: Arc<AtomicUsize>,
+        indexes: IndexTree<P>,
+        subscribed_to: Vec<<P::Event as Indexable>::Index>,
+        receiver: Option<mpsc::Receiver<(P::SubscriptionName, P::Event)>>,
+    ) -> Self {
+        Self {
+            id,
+            name,
+            active_subscribers,
+            subscribed_to,
+            indexes,
+            receiver,
+        }
+    }
+
+    /// Receives the next event
+    pub async fn recv(&mut self) -> Option<P::Event> {
+        self.receiver.as_mut()?.recv().await.map(|(_, event)| event)
+    }
+
+    /// Try receive an event or return Noen right away
+    pub fn try_recv(&mut self) -> Option<P::Event> {
+        self.receiver
+            .as_mut()?
+            .try_recv()
+            .ok()
+            .map(|(_, event)| event)
+    }
+
+    /// Get the subscription name
+    pub fn name(&self) -> &P::SubscriptionName {
+        &self.name
+    }
+}
+
+impl<P> Drop for ActiveSubscription<P>
+where
+    P: Topic + 'static,
+{
+    fn drop(&mut self) {
+        // remove the listener
+        let mut indexes = self.indexes.write().unwrap();
+        for index in self.subscribed_to.drain(..) {
+            indexes.remove(&(index, self.id));
+        }
+
+        // decrement the number of active subscribers
+        self.active_subscribers
+            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
+    }
+}

+ 76 - 16
crates/cdk-common/src/subscription.rs

@@ -1,20 +1,13 @@
 //! Subscription types and traits
-#[cfg(feature = "mint")]
+use std::ops::Deref;
 use std::str::FromStr;
 
-use cashu::nut17::{self};
-#[cfg(feature = "mint")]
-use cashu::nut17::{Error, Kind, Notification};
-#[cfg(feature = "mint")]
+use cashu::nut17::{self, Kind, NotificationId};
 use cashu::quote_id::QuoteId;
-#[cfg(feature = "mint")]
-use cashu::{NotificationPayload, PublicKey};
-#[cfg(feature = "mint")]
+use cashu::PublicKey;
 use serde::{Deserialize, Serialize};
 
-#[cfg(feature = "mint")]
-use crate::pub_sub::index::{Index, Indexable, SubscriptionGlobalId};
-use crate::pub_sub::SubId;
+use crate::pub_sub::{Error, SubscriptionRequest};
 
 /// Subscription parameters.
 ///
@@ -22,7 +15,6 @@ use crate::pub_sub::SubId;
 pub type Params = nut17::Params<SubId>;
 
 /// Wrapper around `nut17::Params` to implement `Indexable` for `Notification`.
-#[cfg(feature = "mint")]
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct IndexableParams(Params);
 
@@ -33,11 +25,78 @@ impl From<Params> for IndexableParams {
     }
 }
 
+/// Subscription Id wrapper
+///
+/// This is the place to add some sane default (like a max length) to the
+/// subscription ID
+#[derive(Debug, Clone, Default, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
+pub struct SubId(String);
+
+impl From<&str> for SubId {
+    fn from(s: &str) -> Self {
+        Self(s.to_string())
+    }
+}
+
+impl From<String> for SubId {
+    fn from(s: String) -> Self {
+        Self(s)
+    }
+}
+
+impl FromStr for SubId {
+    type Err = ();
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(Self(s.to_string()))
+    }
+}
+
+impl Deref for SubId {
+    type Target = String;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl SubscriptionRequest for IndexableParams {
+    type Index = NotificationId;
+
+    type SubscriptionName = SubId;
+
+    fn subscription_name(&self) -> Self::SubscriptionName {
+        self.0.id.clone()
+    }
+
+    fn try_get_indexes(&self) -> Result<Vec<Self::Index>, Error> {
+        self.0
+            .filters
+            .iter()
+            .map(|filter| match self.0.kind {
+                Kind::Bolt11MeltQuote => QuoteId::from_str(filter)
+                    .map(NotificationId::MeltQuoteBolt11)
+                    .map_err(|_| Error::ParsingError(filter.to_owned())),
+                Kind::Bolt11MintQuote => QuoteId::from_str(filter)
+                    .map(NotificationId::MintQuoteBolt11)
+                    .map_err(|_| Error::ParsingError(filter.to_owned())),
+                Kind::ProofState => PublicKey::from_str(filter)
+                    .map(NotificationId::ProofState)
+                    .map_err(|_| Error::ParsingError(filter.to_owned())),
+
+                Kind::Bolt12MintQuote => QuoteId::from_str(filter)
+                    .map(NotificationId::MintQuoteBolt12)
+                    .map_err(|_| Error::ParsingError(filter.to_owned())),
+            })
+            .collect::<Result<Vec<_>, _>>()
+    }
+}
+
+/*
 #[cfg(feature = "mint")]
-impl TryFrom<IndexableParams> for Vec<Index<Notification>> {
+impl TryFrom<IndexableParams> for Vec<Notification> {
     type Error = Error;
     fn try_from(params: IndexableParams) -> Result<Self, Self::Error> {
-        let sub_id: SubscriptionGlobalId = Default::default();
         let params = params.0;
         params
             .filters
@@ -71,9 +130,9 @@ impl AsRef<SubId> for IndexableParams {
 
 #[cfg(feature = "mint")]
 impl Indexable for NotificationPayload<QuoteId> {
-    type Type = Notification;
+    type Index = Notification;
 
-    fn to_indexes(&self) -> Vec<Index<Self::Type>> {
+    fn to_indexes(&self) -> Vec<Index<Self::Index>> {
         match self {
             NotificationPayload::ProofState(proof_state) => {
                 vec![Index::from(Notification::ProofState(proof_state.y))]
@@ -96,3 +155,4 @@ impl Indexable for NotificationPayload<QuoteId> {
         }
     }
 }
+*/

+ 1 - 1
crates/cdk-common/src/ws.rs

@@ -10,7 +10,7 @@ use cashu::quote_id::QuoteId;
 #[cfg(feature = "mint")]
 use cashu::NotificationPayload;
 
-use crate::pub_sub::SubId;
+use crate::subscription::SubId;
 
 /// Request to unsubscribe from a websocket subscription
 pub type WsUnsubscribeRequest = nut17::ws::WsUnsubscribeRequest<SubId>;

+ 5 - 5
crates/cdk-ffi/src/types.rs

@@ -5,7 +5,7 @@ use std::str::FromStr;
 use std::sync::Mutex;
 
 use cdk::nuts::{CurrencyUnit as CdkCurrencyUnit, State as CdkState};
-use cdk::pub_sub::SubId;
+use cdk::subscription::SubId;
 use cdk::Amount as CdkAmount;
 use serde::{Deserialize, Serialize};
 
@@ -1897,7 +1897,7 @@ pub fn encode_nuts(nuts: Nuts) -> Result<String, FfiError> {
 pub struct MintInfo {
     /// name of the mint and should be recognizable
     pub name: Option<String>,
-    /// hex pubkey of the mint  
+    /// hex pubkey of the mint
     pub pubkey: Option<String>,
     /// implementation name and the version running
     pub version: Option<MintVersion>,
@@ -2111,7 +2111,7 @@ pub enum Witness {
         /// Signatures
         signatures: Vec<String>,
     },
-    /// HTLC Witness  
+    /// HTLC Witness
     HTLC {
         /// Preimage
         preimage: String,
@@ -2500,7 +2500,7 @@ pub struct SubscribeParams {
     pub id: Option<String>,
 }
 
-impl From<SubscribeParams> for cdk::nuts::nut17::Params<cdk::pub_sub::SubId> {
+impl From<SubscribeParams> for cdk::nuts::nut17::Params<SubId> {
     fn from(params: SubscribeParams) -> Self {
         let sub_id = params
             .id
@@ -2821,7 +2821,7 @@ pub fn encode_keys(keys: Keys) -> Result<String, FfiError> {
 pub struct KeySet {
     /// Keyset ID
     pub id: String,
-    /// Currency unit  
+    /// Currency unit
     pub unit: CurrencyUnit,
     /// The keys (map of amount to public key hex)
     pub keys: HashMap<u64, String>,

+ 2 - 1
crates/cdk-ffi/src/wallet.rs

@@ -4,6 +4,7 @@ use std::str::FromStr;
 use std::sync::Arc;
 
 use bip39::Mnemonic;
+use cdk::subscription::SubId;
 use cdk::wallet::{Wallet as CdkWallet, WalletBuilder as CdkWalletBuilder};
 
 use crate::error::FfiError;
@@ -348,7 +349,7 @@ impl Wallet {
         &self,
         params: SubscribeParams,
     ) -> Result<std::sync::Arc<ActiveSubscription>, FfiError> {
-        let cdk_params: cdk::nuts::nut17::Params<cdk::pub_sub::SubId> = params.clone().into();
+        let cdk_params: cdk::nuts::nut17::Params<SubId> = params.clone().into();
         let sub_id = cdk_params.id.to_string();
         let active_sub = self.inner.subscribe(cdk_params).await;
         Ok(std::sync::Arc::new(ActiveSubscription::new(

+ 4 - 6
crates/cdk-integration-tests/tests/integration_tests_pure.rs

@@ -445,7 +445,7 @@ pub async fn test_p2pk_swap() {
 
     let mut listener = mint_bob
         .pubsub_manager()
-        .try_subscribe::<IndexableParams>(
+        .subscribe::<IndexableParams>(
             Params {
                 kind: cdk::nuts::nut17::Kind::ProofState,
                 filters: public_keys_to_listen.clone(),
@@ -453,7 +453,6 @@ pub async fn test_p2pk_swap() {
             }
             .into(),
         )
-        .await
         .expect("valid subscription");
 
     match mint_bob.process_swap_request(swap_request).await {
@@ -480,9 +479,8 @@ pub async fn test_p2pk_swap() {
     sleep(Duration::from_secs(1)).await;
 
     let mut msgs = HashMap::new();
-    while let Ok((sub_id, msg)) = listener.try_recv() {
-        assert_eq!(sub_id, "test".into());
-        match msg {
+    while let Some(msg) = listener.try_recv() {
+        match msg.inner() {
             NotificationPayload::ProofState(ProofState { y, state, .. }) => {
                 msgs.entry(y.to_string())
                     .or_insert_with(Vec::new)
@@ -504,7 +502,7 @@ pub async fn test_p2pk_swap() {
         );
     }
 
-    assert!(listener.try_recv().is_err(), "no other event is happening");
+    assert!(listener.try_recv().is_none(), "no other event is happening");
     assert!(msgs.is_empty(), "Only expected key events are received");
 }
 

+ 7 - 8
crates/cdk/src/lib.rs

@@ -28,11 +28,9 @@ mod bip353;
 #[cfg(all(any(feature = "wallet", feature = "mint"), feature = "auth"))]
 mod oidc_client;
 
-#[cfg(all(any(feature = "wallet", feature = "mint"), feature = "auth"))]
-pub use oidc_client::OidcClient;
-
-pub mod pub_sub;
-
+#[cfg(feature = "mint")]
+#[doc(hidden)]
+pub use cdk_common::payment as cdk_payment;
 /// Re-export amount type
 #[doc(hidden)]
 pub use cdk_common::{
@@ -40,9 +38,8 @@ pub use cdk_common::{
     error::{self, Error},
     lightning_invoice, mint_url, nuts, secret, util, ws, Amount, Bolt11Invoice,
 };
-#[cfg(feature = "mint")]
-#[doc(hidden)]
-pub use cdk_common::{payment as cdk_payment, subscription};
+#[cfg(all(any(feature = "wallet", feature = "mint"), feature = "auth"))]
+pub use oidc_client::OidcClient;
 
 pub mod fees;
 
@@ -65,6 +62,8 @@ pub use self::wallet::HttpClient;
 #[doc(hidden)]
 pub type Result<T, E = Box<dyn std::error::Error>> = std::result::Result<T, E>;
 
+/// Re-export subscription
+pub use cdk_common::subscription;
 /// Re-export futures::Stream
 #[cfg(any(feature = "wallet", feature = "mint"))]
 pub use futures::{Stream, StreamExt};

+ 2 - 2
crates/cdk/src/mint/issue/mod.rs

@@ -322,12 +322,12 @@ impl Mint {
                 PaymentMethod::Bolt11 => {
                     let res: MintQuoteBolt11Response<QuoteId> = quote.clone().into();
                     self.pubsub_manager
-                        .broadcast(NotificationPayload::MintQuoteBolt11Response(res));
+                        .publish(NotificationPayload::MintQuoteBolt11Response(res));
                 }
                 PaymentMethod::Bolt12 => {
                     let res: MintQuoteBolt12Response<QuoteId> = quote.clone().try_into()?;
                     self.pubsub_manager
-                        .broadcast(NotificationPayload::MintQuoteBolt12Response(res));
+                        .publish(NotificationPayload::MintQuoteBolt12Response(res));
                 }
                 PaymentMethod::Custom(_) => {}
             }

+ 3 - 3
crates/cdk/src/mint/mod.rs

@@ -20,7 +20,7 @@ use cdk_signatory::signatory::{Signatory, SignatoryKeySet};
 use futures::StreamExt;
 #[cfg(feature = "auth")]
 use nut21::ProtectedEndpoint;
-use subscription::PubSubManager;
+use pubsub_manager::PubSubManager;
 use tokio::sync::{Mutex, Notify};
 use tokio::task::{JoinHandle, JoinSet};
 use tracing::instrument;
@@ -41,8 +41,8 @@ mod keysets;
 mod ln;
 mod melt;
 mod proof_writer;
+mod pubsub_manager;
 mod start_up_check;
-pub mod subscription;
 mod swap;
 mod verification;
 
@@ -205,7 +205,7 @@ impl Mint {
 
         Ok(Self {
             signatory,
-            pubsub_manager: Arc::new(localstore.clone().into()),
+            pubsub_manager: PubSubManager::new(localstore.clone()),
             localstore,
             #[cfg(feature = "auth")]
             oidc_client: computed_info.nuts.nut21.as_ref().map(|nut21| {

+ 1 - 1
crates/cdk/src/mint/proof_writer.rs

@@ -5,7 +5,7 @@ use std::sync::Arc;
 use cdk_common::database::{self, DynMintDatabase, MintTransaction};
 use cdk_common::{Error, Proofs, ProofsMethods, PublicKey, QuoteId, State};
 
-use super::subscription::PubSubManager;
+use super::pubsub_manager::PubSubManager;
 
 type Tx<'a, 'b> = Box<dyn MintTransaction<'a, database::Error> + Send + Sync + 'b>;
 

+ 307 - 0
crates/cdk/src/mint/pubsub_manager.rs

@@ -0,0 +1,307 @@
+//! Specific Subscription for the cdk crate
+
+use std::ops::Deref;
+use std::sync::Arc;
+
+use cdk_common::database::DynMintDatabase;
+use cdk_common::mint::MintQuote;
+use cdk_common::nut17::NotificationId;
+use cdk_common::pub_sub::{Indexable, Pubsub, Topic};
+use cdk_common::subscription::SubId;
+use cdk_common::{
+    Amount, BlindSignature, MeltQuoteBolt11Response, MeltQuoteState, MintQuoteBolt11Response,
+    MintQuoteBolt12Response, MintQuoteState, NotificationPayload, PaymentMethod, ProofState,
+    PublicKey, QuoteId,
+};
+use serde::{Deserialize, Serialize};
+use tokio::sync::mpsc;
+
+/// Simple wrapper over NotificationPayload<QuoteId> which is a foreign type
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MintEvent(NotificationPayload<QuoteId>);
+
+#[allow(clippy::from_over_into)]
+impl Into<NotificationPayload<QuoteId>> for MintEvent {
+    fn into(self) -> NotificationPayload<QuoteId> {
+        self.0
+    }
+}
+
+impl MintEvent {
+    /// Get inner
+    pub fn inner(self) -> NotificationPayload<QuoteId> {
+        self.0
+    }
+}
+
+impl From<NotificationPayload<QuoteId>> for MintEvent {
+    fn from(value: NotificationPayload<QuoteId>) -> Self {
+        Self(value)
+    }
+}
+
+impl From<ProofState> for MintEvent {
+    fn from(value: ProofState) -> Self {
+        Self(NotificationPayload::ProofState(value))
+    }
+}
+
+impl From<MintQuoteBolt11Response<QuoteId>> for MintEvent {
+    fn from(value: MintQuoteBolt11Response<QuoteId>) -> Self {
+        Self(NotificationPayload::MintQuoteBolt11Response(value))
+    }
+}
+
+impl From<MeltQuoteBolt11Response<QuoteId>> for MintEvent {
+    fn from(value: MeltQuoteBolt11Response<QuoteId>) -> Self {
+        Self(NotificationPayload::MeltQuoteBolt11Response(value))
+    }
+}
+
+impl From<MintQuoteBolt12Response<QuoteId>> for MintEvent {
+    fn from(value: MintQuoteBolt12Response<QuoteId>) -> Self {
+        Self(NotificationPayload::MintQuoteBolt12Response(value))
+    }
+}
+
+impl Indexable for MintEvent {
+    type Index = NotificationId;
+
+    fn to_indexes(&self) -> Vec<Self::Index> {
+        vec![match &self.0 {
+            NotificationPayload::MeltQuoteBolt11Response(r) => {
+                NotificationId::MeltQuoteBolt11(r.quote.to_owned())
+            }
+            NotificationPayload::MintQuoteBolt11Response(r) => {
+                NotificationId::MintQuoteBolt11(r.quote.to_owned())
+            }
+            NotificationPayload::MintQuoteBolt12Response(r) => {
+                NotificationId::MintQuoteBolt12(r.quote.to_owned())
+            }
+            NotificationPayload::ProofState(p) => NotificationId::ProofState(p.y.to_owned()),
+        }]
+    }
+}
+
+pub struct MintSubTopics {
+    db: DynMintDatabase,
+}
+
+impl MintSubTopics {
+    async fn get_events_from_db_legacy(
+        &self,
+        request: &[NotificationId],
+    ) -> Result<Vec<MintEvent>, String> {
+        let mut to_return = vec![];
+        let mut public_keys: Vec<PublicKey> = Vec::new();
+        let mut melt_queries = Vec::new();
+        let mut mint_queries = Vec::new();
+
+        for idx in request.iter() {
+            match idx {
+                NotificationId::ProofState(pk) => public_keys.push(*pk),
+                NotificationId::MeltQuoteBolt11(uuid) => {
+                    melt_queries.push(self.db.get_melt_quote(uuid))
+                }
+                NotificationId::MintQuoteBolt11(uuid) => {
+                    mint_queries.push(self.db.get_mint_quote(uuid))
+                }
+                NotificationId::MintQuoteBolt12(uuid) => {
+                    mint_queries.push(self.db.get_mint_quote(uuid))
+                }
+                NotificationId::MeltQuoteBolt12(uuid) => {
+                    melt_queries.push(self.db.get_melt_quote(uuid))
+                }
+            }
+        }
+
+        if !melt_queries.is_empty() {
+            to_return.extend(
+                futures::future::try_join_all(melt_queries)
+                    .await
+                    .map(|quotes| {
+                        quotes
+                            .into_iter()
+                            .filter_map(|quote| quote.map(|x| x.into()))
+                            .map(|x: MeltQuoteBolt11Response<QuoteId>| x.into())
+                            .collect::<Vec<_>>()
+                    })
+                    .map_err(|e| e.to_string())?,
+            );
+        }
+
+        if !mint_queries.is_empty() {
+            to_return.extend(
+                futures::future::try_join_all(mint_queries)
+                    .await
+                    .map(|quotes| {
+                        quotes
+                            .into_iter()
+                            .filter_map(|quote| {
+                                quote.and_then(|x| match x.payment_method {
+                                    PaymentMethod::Bolt11 => {
+                                        let response: MintQuoteBolt11Response<QuoteId> = x.into();
+                                        Some(response.into())
+                                    }
+                                    PaymentMethod::Bolt12 => match x.try_into() {
+                                        Ok(response) => {
+                                            let response: MintQuoteBolt12Response<QuoteId> =
+                                                response;
+                                            Some(response.into())
+                                        }
+                                        Err(_) => None,
+                                    },
+                                    PaymentMethod::Custom(_) => None,
+                                })
+                            })
+                            .collect::<Vec<_>>()
+                    })
+                    .map_err(|e| e.to_string())?,
+            );
+        }
+
+        if !public_keys.is_empty() {
+            to_return.extend(
+                self.db
+                    .get_proofs_states(public_keys.as_slice())
+                    .await
+                    .map_err(|e| e.to_string())?
+                    .into_iter()
+                    .enumerate()
+                    .filter_map(|(idx, state)| state.map(|state| (public_keys[idx], state).into()))
+                    .map(|state: ProofState| state.into()),
+            );
+        }
+
+        Ok(to_return)
+    }
+}
+
+#[async_trait::async_trait]
+impl Topic for MintSubTopics {
+    type SubscriptionName = SubId;
+
+    type Event = MintEvent;
+
+    async fn fetch_events(
+        &self,
+        indexes: Vec<<Self::Event as Indexable>::Index>,
+        sub_name: Self::SubscriptionName,
+        reply_to: mpsc::Sender<(Self::SubscriptionName, Self::Event)>,
+    ) {
+        for event in self.get_events_from_db_legacy(&indexes).await.unwrap() {
+            let _ = reply_to.try_send((sub_name.clone(), event));
+        }
+    }
+
+    /// Store events or replace them
+    async fn store_events(&self, _event: Self::Event) {
+        todo!()
+    }
+}
+
+/// PubsubManager
+pub struct PubSubManager(Pubsub<MintSubTopics>);
+
+impl PubSubManager {
+    /// Create a new instance
+    pub fn new(db: DynMintDatabase) -> Arc<Self> {
+        Arc::new(Self(Pubsub::new(MintSubTopics { db })))
+    }
+
+    /// Helper function to emit a ProofState status
+    pub fn proof_state<E: Into<ProofState>>(&self, event: E) {
+        self.publish(event.into());
+    }
+
+    /// Helper function to publish even of a mint quote being paid
+    pub fn mint_quote_issue(&self, mint_quote: &MintQuote, total_issued: Amount) {
+        match mint_quote.payment_method {
+            PaymentMethod::Bolt11 => {
+                self.mint_quote_bolt11_status(mint_quote.clone(), MintQuoteState::Issued);
+            }
+            PaymentMethod::Bolt12 => {
+                self.mint_quote_bolt12_status(
+                    mint_quote.clone(),
+                    mint_quote.amount_paid(),
+                    total_issued,
+                );
+            }
+            _ => {
+                // We don't send ws updates for unknown methods
+            }
+        }
+    }
+
+    /// Helper function to publish even of a mint quote being paid
+    pub fn mint_quote_payment(&self, mint_quote: &MintQuote, total_paid: Amount) {
+        match mint_quote.payment_method {
+            PaymentMethod::Bolt11 => {
+                self.mint_quote_bolt11_status(mint_quote.clone(), MintQuoteState::Paid);
+            }
+            PaymentMethod::Bolt12 => {
+                self.mint_quote_bolt12_status(
+                    mint_quote.clone(),
+                    total_paid,
+                    mint_quote.amount_issued(),
+                );
+            }
+            _ => {
+                // We don't send ws updates for unknown methods
+            }
+        }
+    }
+
+    /// Helper function to emit a MintQuoteBolt11Response status
+    pub fn mint_quote_bolt11_status<E: Into<MintQuoteBolt11Response<QuoteId>>>(
+        &self,
+        quote: E,
+        new_state: MintQuoteState,
+    ) {
+        let mut event = quote.into();
+        event.state = new_state;
+
+        self.publish(event);
+    }
+
+    /// Helper function to emit a MintQuoteBolt11Response status
+    pub fn mint_quote_bolt12_status<E: TryInto<MintQuoteBolt12Response<QuoteId>>>(
+        &self,
+        quote: E,
+        amount_paid: Amount,
+        amount_issued: Amount,
+    ) {
+        if let Ok(mut event) = quote.try_into() {
+            event.amount_paid = amount_paid;
+            event.amount_issued = amount_issued;
+
+            self.publish(event);
+        } else {
+            tracing::warn!("Could not convert quote to MintQuoteResponse");
+        }
+    }
+
+    /// Helper function to emit a MeltQuoteBolt11Response status
+    pub fn melt_quote_status<E: Into<MeltQuoteBolt11Response<QuoteId>>>(
+        &self,
+        quote: E,
+        payment_preimage: Option<String>,
+        change: Option<Vec<BlindSignature>>,
+        new_state: MeltQuoteState,
+    ) {
+        let mut quote = quote.into();
+        quote.state = new_state;
+        quote.paid = Some(new_state == MeltQuoteState::Paid);
+        quote.payment_preimage = payment_preimage;
+        quote.change = change;
+        self.publish(quote);
+    }
+}
+
+impl Deref for PubSubManager {
+    type Target = Pubsub<MintSubTopics>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}

+ 0 - 292
crates/cdk/src/mint/subscription/manager.rs

@@ -1,292 +0,0 @@
-//! Specific Subscription for the cdk crate
-use std::ops::Deref;
-
-use cdk_common::database::DynMintDatabase;
-use cdk_common::mint::MintQuote;
-use cdk_common::nut17::Notification;
-use cdk_common::quote_id::QuoteId;
-use cdk_common::{Amount, MintQuoteBolt12Response, NotificationPayload, PaymentMethod};
-
-use super::OnSubscription;
-use crate::nuts::{
-    BlindSignature, MeltQuoteBolt11Response, MeltQuoteState, MintQuoteBolt11Response,
-    MintQuoteState, ProofState,
-};
-use crate::pub_sub;
-
-/// Manager
-/// Publish–subscribe manager
-///
-/// Nut-17 implementation is system-wide and not only through the WebSocket, so
-/// it is possible for another part of the system to subscribe to events.
-pub struct PubSubManager(
-    pub_sub::Manager<NotificationPayload<QuoteId>, Notification, OnSubscription>,
-);
-
-#[allow(clippy::default_constructed_unit_structs)]
-impl Default for PubSubManager {
-    fn default() -> Self {
-        PubSubManager(OnSubscription::default().into())
-    }
-}
-
-impl From<DynMintDatabase> for PubSubManager {
-    fn from(val: DynMintDatabase) -> Self {
-        PubSubManager(OnSubscription(Some(val)).into())
-    }
-}
-
-impl Deref for PubSubManager {
-    type Target = pub_sub::Manager<NotificationPayload<QuoteId>, Notification, OnSubscription>;
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-impl PubSubManager {
-    /// Helper function to emit a ProofState status
-    pub fn proof_state<E: Into<ProofState>>(&self, event: E) {
-        self.broadcast(event.into().into());
-    }
-
-    /// Helper function to publish even of a mint quote being paid
-    pub fn mint_quote_issue(&self, mint_quote: &MintQuote, total_issued: Amount) {
-        match mint_quote.payment_method {
-            PaymentMethod::Bolt11 => {
-                self.mint_quote_bolt11_status(mint_quote.clone(), MintQuoteState::Issued);
-            }
-            PaymentMethod::Bolt12 => {
-                self.mint_quote_bolt12_status(
-                    mint_quote.clone(),
-                    mint_quote.amount_paid(),
-                    total_issued,
-                );
-            }
-            _ => {
-                // We don't send ws updates for unknown methods
-            }
-        }
-    }
-
-    /// Helper function to publish even of a mint quote being paid
-    pub fn mint_quote_payment(&self, mint_quote: &MintQuote, total_paid: Amount) {
-        match mint_quote.payment_method {
-            PaymentMethod::Bolt11 => {
-                self.mint_quote_bolt11_status(mint_quote.clone(), MintQuoteState::Paid);
-            }
-            PaymentMethod::Bolt12 => {
-                self.mint_quote_bolt12_status(
-                    mint_quote.clone(),
-                    total_paid,
-                    mint_quote.amount_issued(),
-                );
-            }
-            _ => {
-                // We don't send ws updates for unknown methods
-            }
-        }
-    }
-
-    /// Helper function to emit a MintQuoteBolt11Response status
-    pub fn mint_quote_bolt11_status<E: Into<MintQuoteBolt11Response<QuoteId>>>(
-        &self,
-        quote: E,
-        new_state: MintQuoteState,
-    ) {
-        let mut event = quote.into();
-        event.state = new_state;
-
-        self.broadcast(event.into());
-    }
-
-    /// Helper function to emit a MintQuoteBolt11Response status
-    pub fn mint_quote_bolt12_status<E: TryInto<MintQuoteBolt12Response<QuoteId>>>(
-        &self,
-        quote: E,
-        amount_paid: Amount,
-        amount_issued: Amount,
-    ) {
-        if let Ok(mut event) = quote.try_into() {
-            event.amount_paid = amount_paid;
-            event.amount_issued = amount_issued;
-
-            self.broadcast(event.into());
-        } else {
-            tracing::warn!("Could not convert quote to MintQuoteResponse");
-        }
-    }
-
-    /// Helper function to emit a MeltQuoteBolt11Response status
-    pub fn melt_quote_status<E: Into<MeltQuoteBolt11Response<QuoteId>>>(
-        &self,
-        quote: E,
-        payment_preimage: Option<String>,
-        change: Option<Vec<BlindSignature>>,
-        new_state: MeltQuoteState,
-    ) {
-        let mut quote = quote.into();
-        quote.state = new_state;
-        quote.paid = Some(new_state == MeltQuoteState::Paid);
-        quote.payment_preimage = payment_preimage;
-        quote.change = change;
-        self.broadcast(quote.into());
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use std::time::Duration;
-
-    use tokio::time::sleep;
-
-    use super::*;
-    use crate::nuts::nut17::Kind;
-    use crate::nuts::{PublicKey, State};
-    use crate::subscription::{IndexableParams, Params};
-
-    #[tokio::test]
-    async fn active_and_drop() {
-        let manager = PubSubManager::default();
-        let params: IndexableParams = Params {
-            kind: Kind::ProofState,
-            filters: vec![
-                "02a9acc1e48c25eeeb9289b5031cc57da9fe72f3fe2861d264bdc074209b107ba2".to_owned(),
-            ],
-            id: "uno".into(),
-        }
-        .into();
-
-        // Although the same param is used, two subscriptions are created, that
-        // is because each index is unique, thanks to `Unique`, it is the
-        // responsibility of the implementor to make sure that SubId are unique
-        // either globally or per client
-        let subscriptions = vec![
-            manager
-                .try_subscribe(params.clone())
-                .await
-                .expect("valid subscription"),
-            manager
-                .try_subscribe(params)
-                .await
-                .expect("valid subscription"),
-        ];
-        assert_eq!(2, manager.active_subscriptions());
-        drop(subscriptions);
-
-        sleep(Duration::from_millis(10)).await;
-
-        assert_eq!(0, manager.active_subscriptions());
-    }
-
-    #[tokio::test]
-    async fn broadcast() {
-        let manager = PubSubManager::default();
-        let mut subscriptions = [
-            manager
-                .try_subscribe::<IndexableParams>(
-                    Params {
-                        kind: Kind::ProofState,
-                        filters: vec![
-                            "02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104"
-                                .to_string(),
-                        ],
-                        id: "uno".into(),
-                    }
-                    .into(),
-                )
-                .await
-                .expect("valid subscription"),
-            manager
-                .try_subscribe::<IndexableParams>(
-                    Params {
-                        kind: Kind::ProofState,
-                        filters: vec![
-                            "02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104"
-                                .to_string(),
-                        ],
-                        id: "dos".into(),
-                    }
-                    .into(),
-                )
-                .await
-                .expect("valid subscription"),
-        ];
-
-        let event = ProofState {
-            y: PublicKey::from_hex(
-                "02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104",
-            )
-            .expect("valid pk"),
-            state: State::Pending,
-            witness: None,
-        };
-
-        manager.broadcast(event.into());
-
-        sleep(Duration::from_millis(10)).await;
-
-        let (sub1, _) = subscriptions[0].try_recv().expect("valid message");
-        assert_eq!("uno", *sub1);
-
-        let (sub1, _) = subscriptions[1].try_recv().expect("valid message");
-        assert_eq!("dos", *sub1);
-
-        assert!(subscriptions[0].try_recv().is_err());
-        assert!(subscriptions[1].try_recv().is_err());
-    }
-
-    #[test]
-    fn parsing_request() {
-        let json = r#"{"kind":"proof_state","filters":["x"],"subId":"uno"}"#;
-        let params: Params = serde_json::from_str(json).expect("valid json");
-        assert_eq!(params.kind, Kind::ProofState);
-        assert_eq!(params.filters, vec!["x"]);
-        assert_eq!(*params.id, "uno");
-    }
-
-    #[tokio::test]
-    async fn json_test() {
-        let manager = PubSubManager::default();
-        let mut subscription = manager
-            .try_subscribe::<IndexableParams>(
-                serde_json::from_str(r#"{"kind":"proof_state","filters":["02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104"],"subId":"uno"}"#)
-                    .expect("valid json"),
-            )
-            .await.expect("valid subscription");
-
-        manager.broadcast(
-            ProofState {
-                y: PublicKey::from_hex(
-                    "02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104",
-                )
-                .expect("valid pk"),
-                state: State::Pending,
-                witness: None,
-            }
-            .into(),
-        );
-
-        // no one is listening for this event
-        manager.broadcast(
-            ProofState {
-                y: PublicKey::from_hex(
-                    "020000000000000000000000000000000000000000000000000000000000000001",
-                )
-                .expect("valid pk"),
-                state: State::Pending,
-                witness: None,
-            }
-            .into(),
-        );
-
-        sleep(Duration::from_millis(10)).await;
-        let (sub1, msg) = subscription.try_recv().expect("valid message");
-        assert_eq!("uno", *sub1);
-        assert_eq!(
-            r#"{"Y":"02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104","state":"PENDING","witness":null}"#,
-            serde_json::to_string(&msg).expect("valid json")
-        );
-        assert!(subscription.try_recv().is_err());
-    }
-}

+ 0 - 12
crates/cdk/src/mint/subscription/mod.rs

@@ -1,12 +0,0 @@
-//! Specific Subscription for the cdk crate
-
-#[cfg(feature = "mint")]
-mod manager;
-#[cfg(feature = "mint")]
-mod on_subscription;
-#[cfg(feature = "mint")]
-pub use manager::PubSubManager;
-#[cfg(feature = "mint")]
-pub use on_subscription::OnSubscription;
-
-pub use crate::pub_sub::SubId;

+ 0 - 119
crates/cdk/src/mint/subscription/on_subscription.rs

@@ -1,119 +0,0 @@
-//! On Subscription
-//!
-//! This module contains the code that is triggered when a new subscription is created.
-
-use cdk_common::database::DynMintDatabase;
-use cdk_common::nut17::Notification;
-use cdk_common::pub_sub::OnNewSubscription;
-use cdk_common::quote_id::QuoteId;
-use cdk_common::{MintQuoteBolt12Response, NotificationPayload, PaymentMethod};
-
-use crate::nuts::{MeltQuoteBolt11Response, MintQuoteBolt11Response, ProofState, PublicKey};
-
-#[derive(Default)]
-/// Subscription Init
-///
-/// This struct triggers code when a new subscription is created.
-///
-/// It is used to send the initial state of the subscription to the client.
-pub struct OnSubscription(pub(crate) Option<DynMintDatabase>);
-
-#[async_trait::async_trait]
-impl OnNewSubscription for OnSubscription {
-    type Event = NotificationPayload<QuoteId>;
-    type Index = Notification;
-
-    async fn on_new_subscription(
-        &self,
-        request: &[&Self::Index],
-    ) -> Result<Vec<Self::Event>, String> {
-        let datastore = if let Some(localstore) = self.0.as_ref() {
-            localstore
-        } else {
-            return Ok(vec![]);
-        };
-
-        let mut to_return = vec![];
-        let mut public_keys: Vec<PublicKey> = Vec::new();
-        let mut melt_queries = Vec::new();
-        let mut mint_queries = Vec::new();
-
-        for idx in request.iter() {
-            match idx {
-                Notification::ProofState(pk) => public_keys.push(*pk),
-                Notification::MeltQuoteBolt11(uuid) => {
-                    melt_queries.push(datastore.get_melt_quote(uuid))
-                }
-                Notification::MintQuoteBolt11(uuid) => {
-                    mint_queries.push(datastore.get_mint_quote(uuid))
-                }
-                Notification::MintQuoteBolt12(uuid) => {
-                    mint_queries.push(datastore.get_mint_quote(uuid))
-                }
-                Notification::MeltQuoteBolt12(uuid) => {
-                    melt_queries.push(datastore.get_melt_quote(uuid))
-                }
-            }
-        }
-
-        if !melt_queries.is_empty() {
-            to_return.extend(
-                futures::future::try_join_all(melt_queries)
-                    .await
-                    .map(|quotes| {
-                        quotes
-                            .into_iter()
-                            .filter_map(|quote| quote.map(|x| x.into()))
-                            .map(|x: MeltQuoteBolt11Response<QuoteId>| x.into())
-                            .collect::<Vec<_>>()
-                    })
-                    .map_err(|e| e.to_string())?,
-            );
-        }
-
-        if !mint_queries.is_empty() {
-            to_return.extend(
-                futures::future::try_join_all(mint_queries)
-                    .await
-                    .map(|quotes| {
-                        quotes
-                            .into_iter()
-                            .filter_map(|quote| {
-                                quote.and_then(|x| match x.payment_method {
-                                    PaymentMethod::Bolt11 => {
-                                        let response: MintQuoteBolt11Response<QuoteId> = x.into();
-                                        Some(response.into())
-                                    }
-                                    PaymentMethod::Bolt12 => match x.try_into() {
-                                        Ok(response) => {
-                                            let response: MintQuoteBolt12Response<QuoteId> =
-                                                response;
-                                            Some(response.into())
-                                        }
-                                        Err(_) => None,
-                                    },
-                                    PaymentMethod::Custom(_) => None,
-                                })
-                            })
-                            .collect::<Vec<_>>()
-                    })
-                    .map_err(|e| e.to_string())?,
-            );
-        }
-
-        if !public_keys.is_empty() {
-            to_return.extend(
-                datastore
-                    .get_proofs_states(public_keys.as_slice())
-                    .await
-                    .map_err(|e| e.to_string())?
-                    .into_iter()
-                    .enumerate()
-                    .filter_map(|(idx, state)| state.map(|state| (public_keys[idx], state).into()))
-                    .map(|state: ProofState| state.into()),
-            );
-        }
-
-        Ok(to_return)
-    }
-}

+ 0 - 339
crates/cdk/src/pub_sub.rs

@@ -1,339 +0,0 @@
-//! Publish–subscribe pattern.
-//!
-//! This is a generic implementation for
-//! [NUT-17](<https://github.com/cashubtc/nuts/blob/main/17.md>) with a type
-//! agnostic Publish-subscribe manager.
-//!
-//! The manager has a method for subscribers to subscribe to events with a
-//! generic type that must be converted to a vector of indexes.
-//!
-//! Events are also generic that should implement the `Indexable` trait.
-use std::cmp::Ordering;
-use std::collections::{BTreeMap, HashSet};
-use std::fmt::Debug;
-use std::ops::{Deref, DerefMut};
-use std::sync::atomic::{self, AtomicUsize};
-use std::sync::Arc;
-
-pub use cdk_common::pub_sub::index::{Index, Indexable, SubscriptionGlobalId};
-use cdk_common::pub_sub::OnNewSubscription;
-pub use cdk_common::pub_sub::SubId;
-use tokio::sync::{mpsc, RwLock};
-use tokio::task::JoinHandle;
-
-type IndexTree<T, I> = Arc<RwLock<BTreeMap<Index<I>, mpsc::Sender<(SubId, T)>>>>;
-
-/// Default size of the remove channel
-pub const DEFAULT_REMOVE_SIZE: usize = 10_000;
-
-/// Default channel size for subscription buffering
-pub const DEFAULT_CHANNEL_SIZE: usize = 10;
-
-/// Subscription manager
-///
-/// This object keep track of all subscription listener and it is also
-/// responsible for broadcasting events to all listeners
-///
-/// The content of the notification is not relevant to this scope and it is up
-/// to the application, therefore the generic T is used instead of a specific
-/// type
-pub struct Manager<T, I, F>
-where
-    T: Indexable<Type = I> + Clone + Send + Sync + 'static,
-    I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
-    F: OnNewSubscription<Index = I, Event = T> + Send + Sync + 'static,
-{
-    indexes: IndexTree<T, I>,
-    on_new_subscription: Option<Arc<F>>,
-    unsubscription_sender: mpsc::Sender<(SubId, Vec<Index<I>>)>,
-    active_subscriptions: Arc<AtomicUsize>,
-    background_subscription_remover: Option<JoinHandle<()>>,
-}
-
-impl<T, I, F> Default for Manager<T, I, F>
-where
-    T: Indexable<Type = I> + Clone + Send + Sync + 'static,
-    I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
-    F: OnNewSubscription<Index = I, Event = T> + Send + Sync + 'static,
-{
-    fn default() -> Self {
-        let (sender, receiver) = mpsc::channel(DEFAULT_REMOVE_SIZE);
-        let active_subscriptions: Arc<AtomicUsize> = Default::default();
-        let storage: IndexTree<T, I> = Arc::new(Default::default());
-
-        Self {
-            background_subscription_remover: Some(tokio::spawn(Self::remove_subscription(
-                receiver,
-                storage.clone(),
-                active_subscriptions.clone(),
-            ))),
-            on_new_subscription: None,
-            unsubscription_sender: sender,
-            active_subscriptions,
-            indexes: storage,
-        }
-    }
-}
-
-impl<T, I, F> From<F> for Manager<T, I, F>
-where
-    T: Indexable<Type = I> + Clone + Send + Sync + 'static,
-    I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
-    F: OnNewSubscription<Index = I, Event = T> + Send + Sync + 'static,
-{
-    fn from(value: F) -> Self {
-        let mut manager: Self = Default::default();
-        manager.on_new_subscription = Some(Arc::new(value));
-        manager
-    }
-}
-
-impl<T, I, F> Manager<T, I, F>
-where
-    T: Indexable<Type = I> + Clone + Send + Sync + 'static,
-    I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
-    F: OnNewSubscription<Index = I, Event = T> + Send + Sync + 'static,
-{
-    #[inline]
-    /// Broadcast an event to all listeners
-    ///
-    /// This function takes an Arc to the storage struct, the event_id, the kind
-    /// and the vent to broadcast
-    async fn broadcast_impl(storage: &IndexTree<T, I>, event: T) {
-        let index_storage = storage.read().await;
-        let mut sent = HashSet::new();
-        for index in event.to_indexes() {
-            for (key, sender) in index_storage.range(index.clone()..) {
-                if index.cmp_prefix(key) != Ordering::Equal {
-                    break;
-                }
-                let sub_id = key.unique_id();
-                if sent.contains(&sub_id) {
-                    continue;
-                }
-                sent.insert(sub_id);
-                let _ = sender.try_send((key.into(), event.clone()));
-            }
-        }
-    }
-
-    /// Broadcasts an event to all listeners
-    ///
-    /// This public method will not block the caller, it will spawn a new task
-    /// instead
-    pub fn broadcast(&self, event: T) {
-        let storage = self.indexes.clone();
-        tokio::spawn(async move {
-            Self::broadcast_impl(&storage, event).await;
-        });
-    }
-
-    /// Broadcasts an event to all listeners
-    ///
-    /// This method is async and will await for the broadcast to be completed
-    pub async fn broadcast_async(&self, event: T) {
-        Self::broadcast_impl(&self.indexes, event).await;
-    }
-
-    /// Specific of the subscription, this is the abstraction between `subscribe` and `try_subscribe`
-    #[inline(always)]
-    async fn subscribe_inner(
-        &self,
-        sub_id: SubId,
-        indexes: Vec<Index<I>>,
-    ) -> ActiveSubscription<T, I> {
-        let (sender, receiver) = mpsc::channel(10);
-
-        let mut index_storage = self.indexes.write().await;
-        // Subscribe to events as soon as possible
-        for index in indexes.clone() {
-            index_storage.insert(index, sender.clone());
-        }
-        drop(index_storage);
-
-        if let Some(on_new_subscription) = self.on_new_subscription.clone() {
-            // After we're subscribed already, fetch the current status of matching events. It is
-            // down in another thread to return right away
-            let indexes_for_worker = indexes.clone();
-            let sub_id_for_worker = sub_id.clone();
-            tokio::spawn(async move {
-                match on_new_subscription
-                    .on_new_subscription(
-                        &indexes_for_worker
-                            .iter()
-                            .map(|x| x.deref())
-                            .collect::<Vec<_>>(),
-                    )
-                    .await
-                {
-                    Ok(events) => {
-                        for event in events {
-                            let _ = sender.try_send((sub_id_for_worker.clone(), event));
-                        }
-                    }
-                    Err(err) => {
-                        tracing::info!(
-                            "Failed to get initial state for subscription: {:?}, {}",
-                            sub_id_for_worker,
-                            err
-                        );
-                    }
-                }
-            });
-        }
-
-        self.active_subscriptions
-            .fetch_add(1, atomic::Ordering::Relaxed);
-
-        ActiveSubscription {
-            sub_id,
-            receiver,
-            indexes,
-            drop: self.unsubscription_sender.clone(),
-        }
-    }
-
-    /// Try to subscribe to a specific event
-    pub async fn try_subscribe<P>(&self, params: P) -> Result<ActiveSubscription<T, I>, P::Error>
-    where
-        P: AsRef<SubId> + TryInto<Vec<Index<I>>>,
-    {
-        Ok(self
-            .subscribe_inner(params.as_ref().clone(), params.try_into()?)
-            .await)
-    }
-
-    /// Subscribe to a specific event
-    pub async fn subscribe<P>(&self, params: P) -> ActiveSubscription<T, I>
-    where
-        P: AsRef<SubId> + Into<Vec<Index<I>>>,
-    {
-        self.subscribe_inner(params.as_ref().clone(), params.into())
-            .await
-    }
-
-    /// Return number of active subscriptions
-    pub fn active_subscriptions(&self) -> usize {
-        self.active_subscriptions.load(atomic::Ordering::SeqCst)
-    }
-
-    /// Task to remove dropped subscriptions from the storage struct
-    ///
-    /// This task will run in the background (and will be dropped when the [`Manager`]
-    /// is) and will remove subscriptions from the storage struct it is dropped.
-    async fn remove_subscription(
-        mut receiver: mpsc::Receiver<(SubId, Vec<Index<I>>)>,
-        storage: IndexTree<T, I>,
-        active_subscriptions: Arc<AtomicUsize>,
-    ) {
-        while let Some((sub_id, indexes)) = receiver.recv().await {
-            tracing::info!("Removing subscription: {}", *sub_id);
-
-            active_subscriptions.fetch_sub(1, atomic::Ordering::AcqRel);
-
-            let mut index_storage = storage.write().await;
-            for key in indexes {
-                index_storage.remove(&key);
-            }
-            drop(index_storage);
-        }
-    }
-}
-
-/// Manager goes out of scope, stop all background tasks
-impl<T, I, F> Drop for Manager<T, I, F>
-where
-    T: Indexable<Type = I> + Clone + Send + Sync + 'static,
-    I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
-    F: OnNewSubscription<Index = I, Event = T> + Send + Sync + 'static,
-{
-    fn drop(&mut self) {
-        if let Some(handler) = self.background_subscription_remover.take() {
-            handler.abort();
-        }
-    }
-}
-
-/// Active Subscription
-///
-/// This struct is a wrapper around the `mpsc::Receiver<Event>` and it also used
-/// to keep track of the subscription itself. When this struct goes out of
-/// scope, it will notify the Manager about it, so it can be removed from the
-/// list of active listeners
-pub struct ActiveSubscription<T, I>
-where
-    T: Send + Sync,
-    I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
-{
-    /// The subscription ID
-    pub sub_id: SubId,
-    indexes: Vec<Index<I>>,
-    receiver: mpsc::Receiver<(SubId, T)>,
-    drop: mpsc::Sender<(SubId, Vec<Index<I>>)>,
-}
-
-impl<T, I> Deref for ActiveSubscription<T, I>
-where
-    T: Send + Sync,
-    I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
-{
-    type Target = mpsc::Receiver<(SubId, T)>;
-
-    fn deref(&self) -> &Self::Target {
-        &self.receiver
-    }
-}
-
-impl<T, I> DerefMut for ActiveSubscription<T, I>
-where
-    T: Indexable + Clone + Send + Sync + 'static,
-    I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
-{
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.receiver
-    }
-}
-
-/// The ActiveSubscription is Drop out of scope, notify the Manager about it, so
-/// it can be removed from the list of active listeners
-///
-/// Having this in place, we can avoid memory leaks and also makes it super
-/// simple to implement the Unsubscribe method
-impl<T, I> Drop for ActiveSubscription<T, I>
-where
-    T: Send + Sync,
-    I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
-{
-    fn drop(&mut self) {
-        let _ = self
-            .drop
-            .try_send((self.sub_id.clone(), self.indexes.drain(..).collect()));
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use tokio::sync::mpsc;
-
-    use super::*;
-
-    #[test]
-    fn test_active_subscription_drop() {
-        let (tx, rx) = mpsc::channel::<(SubId, ())>(10);
-        let sub_id = SubId::from("test_sub_id");
-        let indexes: Vec<Index<String>> = vec![Index::from(("test".to_string(), sub_id.clone()))];
-        let (drop_tx, mut drop_rx) = mpsc::channel(10);
-
-        {
-            let _active_subscription = ActiveSubscription {
-                sub_id: sub_id.clone(),
-                indexes,
-                receiver: rx,
-                drop: drop_tx,
-            };
-            // When it goes out of scope, it should notify
-        }
-        assert_eq!(drop_rx.try_recv().unwrap().0, sub_id); // it should have notified
-        assert!(tx.try_send(("foo".into(), ())).is_err()); // subscriber is dropped
-    }
-}

+ 1 - 1
crates/cdk/src/wallet/subscription/http.rs

@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use cdk_common::subscription::SubId;
 use cdk_common::MintQuoteBolt12Response;
 use tokio::sync::{mpsc, RwLock};
 #[cfg(not(target_arch = "wasm32"))]
@@ -10,7 +11,6 @@ use web_time::Duration;
 use super::WsSubscriptionBody;
 use crate::nuts::nut17::Kind;
 use crate::nuts::{nut01, nut05, nut07, nut23, CheckStateRequest, NotificationPayload};
-use crate::pub_sub::SubId;
 use crate::wallet::MintConnector;
 use crate::Wallet;
 

+ 1 - 2
crates/cdk/src/wallet/subscription/mod.rs

@@ -9,7 +9,7 @@ use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
 
-use cdk_common::subscription::Params;
+use cdk_common::subscription::{Params, SubId};
 use tokio::sync::{mpsc, RwLock};
 use tokio::task::JoinHandle;
 use tracing::error;
@@ -18,7 +18,6 @@ use wasm_bindgen_futures;
 
 use super::Wallet;
 use crate::mint_url::MintUrl;
-use crate::pub_sub::SubId;
 use crate::wallet::MintConnector;
 
 mod http;

+ 1 - 2
crates/cdk/src/wallet/subscription/ws.rs

@@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};
 use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
 
-use cdk_common::subscription::Params;
+use cdk_common::subscription::{Params, SubId};
 use cdk_common::ws::{WsMessageOrResponse, WsMethodRequest, WsRequest, WsUnsubscribeRequest};
 use futures::{SinkExt, StreamExt};
 use tokio::sync::{mpsc, RwLock};
@@ -12,7 +12,6 @@ use tokio_tungstenite::tungstenite::Message;
 use super::http::http_main;
 use super::WsSubscriptionBody;
 use crate::mint_url::MintUrl;
-use crate::pub_sub::SubId;
 use crate::wallet::MintConnector;
 use crate::Wallet;