Ver Fonte

Add `new balance` event for the wallet

Cesar Rodas há 2 semanas atrás
pai
commit
50b9f61b8e

+ 6 - 0
crates/cdk/src/pub_sub.rs

@@ -29,6 +29,7 @@ pub const DEFAULT_REMOVE_SIZE: usize = 10_000;
 /// Default channel size for subscription buffering
 pub const DEFAULT_CHANNEL_SIZE: usize = 10;
 
+#[derive(Debug)]
 /// Subscription manager
 ///
 /// This object keep track of all subscription listener and it is also
@@ -113,6 +114,11 @@ where
         }
     }
 
+    /// Returns the on_subscription_struct
+    pub fn on_new_subscription(&self) -> &Option<F> {
+        &self.on_new_subscription
+    }
+
     /// Broadcasts an event to all listeners
     ///
     /// This public method will not block the caller, it will spawn a new task

+ 2 - 0
crates/cdk/src/wallet/builder.rs

@@ -10,6 +10,7 @@ use cdk_common::AuthToken;
 #[cfg(feature = "auth")]
 use tokio::sync::RwLock;
 
+use super::events::EventStore;
 use crate::cdk_database::WalletDatabase;
 use crate::error::Error;
 use crate::mint_url::MintUrl;
@@ -156,6 +157,7 @@ impl WalletBuilder {
             xpriv,
             client: client.clone(),
             subscription: SubscriptionManager::new(client),
+            event_manager: Arc::new(EventStore::default().into()),
         })
     }
 }

+ 200 - 0
crates/cdk/src/wallet/events.rs

@@ -0,0 +1,200 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use cdk_common::mint_url::MintUrl;
+use cdk_common::pub_sub::index::{self, Indexable};
+use cdk_common::pub_sub::{OnNewSubscription, SubId};
+use cdk_common::{Amount, CurrencyUnit, State};
+use tokio::sync::RwLock;
+
+use super::{ProofsMethods, Wallet};
+use crate::pub_sub::{self, ActiveSubscription};
+
+/// The internal event is `()` because the events() will accept no filter, all events all sent to
+/// all subscriber with no option to filter.
+///
+/// To change this, change this type to an enum or something
+type EventFilter = ();
+
+/// Event types
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum Event {
+    Balance(MintUrl, CurrencyUnit, State, Amount),
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, Hash)]
+pub enum EventId {
+    Balance(MintUrl, CurrencyUnit, State),
+}
+
+impl Event {
+    pub fn get_id(&self) -> EventId {
+        match self {
+            Event::Balance(url, currency_unit, state, _) => {
+                EventId::Balance(url.to_owned(), currency_unit.to_owned(), state.to_owned())
+            }
+        }
+    }
+}
+
+impl Indexable for Event {
+    type Index = EventFilter;
+
+    fn to_indexes(&self) -> Vec<index::Index<Self::Index>> {
+        vec![index::Index::from(())]
+    }
+}
+
+/// Keep in memory the latest events and send it over back to new subscribers
+#[derive(Debug, Default)]
+pub struct EventStore {
+    last_events: RwLock<HashMap<EventId, Event>>,
+}
+
+#[async_trait::async_trait]
+impl OnNewSubscription for EventStore {
+    type Event = Event;
+    type Index = EventFilter;
+
+    async fn on_new_subscription(
+        &self,
+        _request: &[&Self::Index],
+    ) -> Result<Vec<Self::Event>, String> {
+        Ok(self
+            .last_events
+            .read()
+            .await
+            .values()
+            .map(|x| x.to_owned())
+            .collect())
+    }
+}
+
+/// The event manager is an alias manager
+pub type EventManager = pub_sub::Manager<Event, EventStore>;
+
+/// An type to subscribe, meaningless until EventFilter is an enum
+#[derive(Debug, Default)]
+struct SubscribeToAllEvents {
+    inner: SubId,
+}
+
+impl AsRef<SubId> for SubscribeToAllEvents {
+    fn as_ref(&self) -> &SubId {
+        &self.inner
+    }
+}
+
+impl From<SubscribeToAllEvents> for Vec<index::Index<EventFilter>> {
+    fn from(_val: SubscribeToAllEvents) -> Self {
+        vec![().into()]
+    }
+}
+
+impl Wallet {
+    /// Internal function to trigger an event. This function is private and must be called from
+    /// within itself.
+    #[inline(always)]
+    async fn trigger_events(event_manager: Arc<EventManager>, events: Vec<Event>) {
+        let events = if let Some(event_store) = event_manager.on_new_subscription() {
+            let mut last_events = event_store.last_events.write().await;
+
+            events
+                .into_iter()
+                .filter_map(|event| {
+                    if let Some(previous) = last_events.insert(event.get_id(), event.clone()) {
+                        if previous == event {
+                            // do nothing
+                            return None;
+                        }
+                    }
+
+                    Some(event)
+                })
+                .collect()
+        } else {
+            events
+        };
+
+        events
+            .into_iter()
+            .for_each(|event| event_manager.broadcast(event));
+    }
+
+    /// Notify all balances, because it is likely it has changed
+    pub(crate) fn notify_update_balance(&self) {
+        let db = self.localstore.clone();
+        let event_manager = self.event_manager.clone();
+        let mint_url = self.mint_url.clone();
+        let unit = self.unit.clone();
+
+        if event_manager.active_subscriptions() == 0 {
+            // Do not query the db is there are no listeners
+            return;
+        }
+
+        tokio::spawn(async move {
+            let unspent = db.get_proofs(
+                Some(mint_url.clone()),
+                Some(unit.clone()),
+                Some(vec![State::Unspent]),
+                None,
+            );
+            let reserved = db.get_proofs(
+                Some(mint_url.clone()),
+                Some(unit.clone()),
+                Some(vec![State::Reserved]),
+                None,
+            );
+            let pending = db.get_proofs(
+                Some(mint_url.clone()),
+                Some(unit.clone()),
+                Some(vec![State::Pending]),
+                None,
+            );
+            let (unspent, reserved, pending) = tokio::join!(unspent, reserved, pending);
+
+            let events = vec![
+                unspent.map(|x| {
+                    x.into_iter()
+                        .map(|x| x.proof)
+                        .collect::<Vec<_>>()
+                        .total_amount()
+                        .map(|total| {
+                            Event::Balance(mint_url.clone(), unit.clone(), State::Unspent, total)
+                        })
+                }),
+                reserved.map(|x| {
+                    x.into_iter()
+                        .map(|x| x.proof)
+                        .collect::<Vec<_>>()
+                        .total_amount()
+                        .map(|total| {
+                            Event::Balance(mint_url.clone(), unit.clone(), State::Reserved, total)
+                        })
+                }),
+                pending.map(|x| {
+                    x.into_iter()
+                        .map(|x| x.proof)
+                        .collect::<Vec<_>>()
+                        .total_amount()
+                        .map(|total| {
+                            Event::Balance(mint_url.clone(), unit.clone(), State::Pending, total)
+                        })
+                }),
+            ]
+            .into_iter()
+            .filter_map(|event| event.ok()?.ok())
+            .collect();
+
+            Self::trigger_events(event_manager.clone(), events).await;
+        });
+    }
+
+    /// Subscribe to wallet events
+    pub async fn events(&self) -> ActiveSubscription<Event, EventFilter> {
+        self.event_manager
+            .subscribe(SubscribeToAllEvents::default())
+            .await
+    }
+}

+ 2 - 0
crates/cdk/src/wallet/melt.rs

@@ -137,6 +137,8 @@ impl Wallet {
             .update_proofs_state(ys, State::Pending)
             .await?;
 
+        self.notify_update_balance();
+
         let active_keyset_id = self.get_active_mint_keyset().await?.id;
 
         let count = self

+ 4 - 1
crates/cdk/src/wallet/mod.rs

@@ -7,6 +7,7 @@ use std::sync::Arc;
 use bitcoin::bip32::Xpriv;
 use cdk_common::database::{self, WalletDatabase};
 use cdk_common::subscription::Params;
+use events::EventManager;
 use getrandom::getrandom;
 use subscription::{ActiveSubscription, SubscriptionManager};
 #[cfg(feature = "auth")]
@@ -26,14 +27,15 @@ use crate::nuts::{
 };
 use crate::types::ProofInfo;
 use crate::util::unix_time;
+use crate::Amount;
 #[cfg(feature = "auth")]
 use crate::OidcClient;
-use crate::Amount;
 
 #[cfg(feature = "auth")]
 mod auth;
 mod balance;
 mod builder;
+mod events;
 mod keysets;
 mod melt;
 mod mint;
@@ -81,6 +83,7 @@ pub struct Wallet {
     xpriv: Xpriv,
     client: Arc<dyn MintConnector + Send + Sync>,
     subscription: SubscriptionManager,
+    event_manager: Arc<EventManager>,
 }
 
 const ALPHANUMERIC: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

+ 6 - 3
crates/cdk/src/wallet/proofs.rs

@@ -63,10 +63,13 @@ impl Wallet {
     /// Return proofs to unspent allowing them to be selected and spent
     #[instrument(skip(self))]
     pub async fn unreserve_proofs(&self, ys: Vec<PublicKey>) -> Result<(), Error> {
-        Ok(self
-            .localstore
+        self.localstore
             .update_proofs_state(ys, State::Unspent)
-            .await?)
+            .await?;
+
+        self.notify_update_balance();
+
+        Ok(())
     }
 
     /// Reclaim unspent proofs

+ 6 - 0
crates/cdk/src/wallet/send.rs

@@ -156,6 +156,8 @@ impl Wallet {
             .update_proofs_state(proofs.ys()?, State::Reserved)
             .await?;
 
+        self.notify_update_balance();
+
         // Check if proofs are exact send amount
         let proofs_exact_amount = proofs.total_amount()? == amount + send_fee;
 
@@ -273,6 +275,8 @@ impl Wallet {
             .update_proofs_state(proofs_to_send.ys()?, State::PendingSpent)
             .await?;
 
+        self.notify_update_balance();
+
         // Include token memo
         let send_memo = send.options.memo.or(memo);
         let memo = send_memo.and_then(|m| if m.include_memo { Some(m.memo) } else { None });
@@ -320,6 +324,8 @@ impl Wallet {
             .update_proofs_state(send.proofs().ys()?, State::Unspent)
             .await?;
 
+        self.notify_update_balance();
+
         Ok(())
     }
 }

+ 2 - 0
crates/cdk/src/wallet/swap.rs

@@ -213,6 +213,8 @@ impl Wallet {
             .update_proofs_state(ys, State::Reserved)
             .await?;
 
+        self.notify_update_balance();
+
         let fee = self.get_proofs_fee(&proofs).await?;
 
         let change_amount: Amount = proofs_total - amount.unwrap_or(Amount::ZERO) - fee;