Ver Fonte

Refactor KeyManager for WASM compatibility and improved caching

This commit refactors the wallet's `KeyManager` to be WASM-compatible by
introducing a platform-agnostic task spawning abstraction and fundamentally
redesigning the key fetching and caching strategy. The changes improve
reliability, reduce redundant HTTP requests, and enable proper WASM support.

- **New `is_ready` field**: Added to `MintKeyCache` to track whether the cache
  has been initialized
- **Purpose**: Prevents returning stale/empty data and enables proper
  wait-for-ready semantics

- **Removed `unit` field** from `MintRegistration`
- **Impact**: KeyManager now manages all keysets for a mint regardless of currency unit

- **New `load_keyset_from_db_or_http` method**: Tries all registered databases
  before falling back to HTTP
- **Changed behavior**: When fetching keys from HTTP, the KeyManager now checks
  databases first across all registered stores
- **Benefit**: Reduces redundant HTTP requests when multiple components share
  the same mint but have different storage backends

- **Removed synchronous methods**: `get_keys_sync`, `get_keyset_by_id_sync`,
  `get_keysets_sync`
- **Unified async-only API**: All getters now have consistent async behavior
  with automatic waiting for cache readiness
- **Retry logic**: All getters use `MAX_RETRY` (50) with `RETRY_SLEEP` (100ms)
  to wait for cache initialization
- **Cleaner semantics**: No more "maybe cached, maybe not" confusion

- **New `sync_mint_task`**: Handles database-first loading without HTTP
  semaphore
- **Renamed `refresh_mint_task` → `fetch_and_sync_mint_task`**: Clarifies that
  it handles both HTTP fetch and database sync
- **All spawns use `task::spawn`**: WASM-compatible throughout
Cesar Rodas há 3 semanas atrás
pai
commit
d3da875139

+ 28 - 0
crates/cdk-common/src/lib.rs

@@ -8,6 +8,33 @@
 #![warn(missing_docs)]
 #![warn(rustdoc::bare_urls)]
 
+pub mod task {
+    //! Thin wrapper for spawn and spawn_local for native and wasm.
+    use std::future::Future;
+
+    use tokio::task::JoinHandle;
+
+    /// Spawns a new asynchronous task returning nothing
+    #[cfg(not(target_arch = "wasm32"))]
+    pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
+    where
+        F: Future + Send + 'static,
+        F::Output: Send + 'static,
+    {
+        tokio::spawn(future)
+    }
+
+    /// Spawns a new asynchronous task returning nothing
+    #[cfg(target_arch = "wasm32")]
+    pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
+    where
+        F: Future + 'static,
+        F::Output: 'static,
+    {
+        tokio::task::spawn_local(future)
+    }
+}
+
 pub mod common;
 pub mod database;
 pub mod error;
@@ -24,6 +51,7 @@ pub mod subscription;
 #[cfg(feature = "wallet")]
 pub mod wallet;
 pub mod ws;
+
 // re-exporting external crates
 pub use bitcoin;
 pub use cashu::amount::{self, Amount};

+ 3 - 14
crates/cdk-common/src/pub_sub/pubsub.rs

@@ -10,6 +10,7 @@ use tokio::sync::mpsc;
 
 use super::subscriber::{ActiveSubscription, SubscriptionRequest};
 use super::{Error, Event, Spec, Subscriber};
+use crate::task::spawn;
 
 /// Default channel size for subscription buffering
 pub const DEFAULT_CHANNEL_SIZE: usize = 10_000;
@@ -92,13 +93,7 @@ where
         let topics = self.listeners_topics.clone();
         let event = event.into();
 
-        #[cfg(not(target_arch = "wasm32"))]
-        tokio::spawn(async move {
-            let _ = Self::publish_internal(event, &topics);
-        });
-
-        #[cfg(target_arch = "wasm32")]
-        wasm_bindgen_futures::spawn_local(async move {
+        spawn(async move {
             let _ = Self::publish_internal(event, &topics);
         });
     }
@@ -150,17 +145,11 @@ where
         let inner = self.inner.clone();
         let subscribed_to_for_spawn = subscribed_to.clone();
 
-        #[cfg(not(target_arch = "wasm32"))]
-        tokio::spawn(async move {
+        spawn(async move {
             // TODO: Ignore topics broadcasted from fetch_events _if_ any real time has been broadcasted already.
             inner.fetch_events(subscribed_to_for_spawn, sender).await;
         });
 
-        #[cfg(target_arch = "wasm32")]
-        wasm_bindgen_futures::spawn_local(async move {
-            inner.fetch_events(subscribed_to_for_spawn, sender).await;
-        });
-
         Ok(ActiveSubscription::new(
             subscription_internal_id,
             subscription_name,

+ 2 - 8
crates/cdk-common/src/pub_sub/remote_consumer.rs

@@ -12,6 +12,7 @@ use tokio::time::{sleep, Instant};
 
 use super::subscriber::{ActiveSubscription, SubscriptionRequest};
 use super::{Error, Event, Pubsub, Spec};
+use crate::task::spawn;
 
 const STREAM_CONNECTION_BACKOFF: Duration = Duration::from_millis(2_000);
 
@@ -21,9 +22,6 @@ const INTERNAL_POLL_SIZE: usize = 1_000;
 
 const POLL_SLEEP: Duration = Duration::from_millis(2_000);
 
-#[cfg(target_arch = "wasm32")]
-use wasm_bindgen_futures;
-
 struct UniqueSubscription<S>
 where
     S: Spec,
@@ -157,11 +155,7 @@ where
             still_running: true.into(),
         });
 
-        #[cfg(not(target_arch = "wasm32"))]
-        tokio::spawn(Self::stream(this.clone()));
-
-        #[cfg(target_arch = "wasm32")]
-        wasm_bindgen_futures::spawn_local(Self::stream(this.clone()));
+        spawn(Self::stream(this.clone()));
 
         this
     }

+ 13 - 1
crates/cdk/src/wallet/auth/auth_wallet.rs

@@ -216,7 +216,19 @@ impl AuthWallet {
     pub async fn refresh_keysets(&self) -> Result<Vec<KeySetInfo>, Error> {
         tracing::debug!("Refreshing auth keysets via KeyManager");
 
-        self.key_manager.refresh(&self.mint_url).await
+        let auth_keysets = self
+            .key_manager
+            .refresh(&self.mint_url)
+            .await?
+            .into_iter()
+            .filter(|k| k.unit == CurrencyUnit::Auth && k.active)
+            .collect::<Vec<_>>();
+
+        if !auth_keysets.is_empty() {
+            Ok(auth_keysets)
+        } else {
+            Err(Error::UnknownKeySet)
+        }
     }
 
     /// Get the first active blind auth keyset - always goes online

+ 2 - 6
crates/cdk/src/wallet/builder.rs

@@ -178,12 +178,8 @@ impl WalletBuilder {
         };
 
         let key_manager = self.key_manager.unwrap_or_else(KeyManager::new);
-        let key_sub_id = key_manager.register_mint(
-            mint_url.clone(),
-            unit.clone(),
-            localstore.clone(),
-            client.clone(),
-        );
+        let key_sub_id =
+            key_manager.register_mint(mint_url.clone(), localstore.clone(), client.clone());
 
         Ok(Wallet {
             mint_url,

+ 222 - 153
crates/cdk/src/wallet/key_manager.rs

@@ -27,8 +27,9 @@ use std::time::{Duration, Instant};
 use arc_swap::ArcSwap;
 use cdk_common::database::{self, WalletDatabase};
 use cdk_common::mint_url::MintUrl;
-use cdk_common::nuts::{CurrencyUnit, KeySetInfo, Keys};
+use cdk_common::nuts::{KeySetInfo, Keys};
 use cdk_common::parking_lot::{Mutex as ParkingLotMutex, RwLock as ParkingLotRwLock};
+use cdk_common::task::spawn;
 use cdk_common::util::unix_time;
 use cdk_common::{KeySet, MintInfo};
 use tokio::sync::{mpsc, Semaphore};
@@ -49,6 +50,9 @@ const DEFAULT_REFRESH_INTERVAL: Duration = Duration::from_secs(300);
 /// Maximum concurrent HTTP requests to mint servers
 const MAX_CONCURRENT_HTTP_REQUESTS: usize = 5;
 
+const MAX_RETRY: usize = 50;
+const RETRY_SLEEP: Duration = Duration::from_millis(100);
+
 /// Manages refresh scheduling for mints
 ///
 /// Tracks when each mint should be refreshed next. Uses BTreeMap for efficient
@@ -116,6 +120,11 @@ pub enum RefreshMessage {
     /// Stop the refresh task
     Stop,
 
+    /// Make sure the mint is loaded, from either any localstore or remotely.
+    ///
+    /// This function also make sure all stores have a copy of the mint info and keys
+    SyncMint(MintUrl),
+
     /// Fetch keys for a specific mint immediately
     FetchMint(MintUrl),
 }
@@ -126,6 +135,9 @@ pub enum RefreshMessage {
 /// The `refresh_version` increments on each update to detect when cache has changed.
 #[derive(Clone, Debug)]
 struct MintKeyCache {
+    /// If the cache is ready
+    is_ready: bool,
+
     /// Mint info from server
     mint_info: Option<MintInfo>,
 
@@ -148,6 +160,7 @@ struct MintKeyCache {
 impl MintKeyCache {
     fn empty() -> Self {
         Self {
+            is_ready: false,
             mint_info: None,
             keysets_by_id: HashMap::new(),
             active_keysets: Vec::new(),
@@ -181,9 +194,6 @@ struct MintRegistration {
     /// Mint URL
     mint_url: MintUrl,
 
-    /// Currency unit
-    unit: CurrencyUnit,
-
     /// External resources (storage + client)
     resources: Arc<ParkingLotRwLock<HashMap<usize, MintResources>>>,
 
@@ -195,7 +205,6 @@ impl std::fmt::Debug for MintRegistration {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("MintRegistration")
             .field("mint_url", &self.mint_url)
-            .field("unit", &self.unit)
             .field("resources", &"<MintResources>")
             .field("cache", &self.cache)
             .finish()
@@ -288,7 +297,7 @@ impl KeyManager {
 
         let mints = manager.mints.clone();
         let refresh_interval = manager.refresh_interval;
-        let task = tokio::spawn(async move {
+        let task = spawn(async move {
             Self::refresh_loop(rx, mints, refresh_interval, refresh_semaphore).await;
         });
 
@@ -315,11 +324,10 @@ impl KeyManager {
     pub fn register_mint(
         &self,
         mint_url: MintUrl,
-        unit: CurrencyUnit,
         storage: Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
         client: Arc<dyn MintConnector + Send + Sync>,
     ) -> Arc<KeySubscription> {
-        debug!("Registering mint: {} ({})", mint_url, unit);
+        debug!("Registering mint: {}", mint_url);
         let mut mints = self.mints.write();
 
         let mint = mints.entry(mint_url.clone()).or_insert_with(|| {
@@ -327,7 +335,6 @@ impl KeyManager {
 
             MintRegistration {
                 mint_url: mint_url.clone(),
-                unit,
                 resources: Arc::new(ParkingLotRwLock::new(HashMap::new())),
                 cache,
             }
@@ -337,12 +344,6 @@ impl KeyManager {
             .counter
             .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
 
-        tokio::spawn(Self::persist_cache_db(
-            storage.clone(),
-            mint_url.clone(),
-            mint.cache.load().clone(),
-        ));
-
         #[cfg(feature = "auth")]
         let mint_resource = MintResources {
             storage,
@@ -359,7 +360,7 @@ impl KeyManager {
 
         debug!("Mint registered: {}", mint_url);
 
-        self.send_message(RefreshMessage::FetchMint(mint_url.clone()));
+        self.send_message(RefreshMessage::SyncMint(mint_url.clone()));
 
         Arc::new(KeySubscription {
             mints: self.mints.clone(),
@@ -390,124 +391,177 @@ impl KeyManager {
     /// Returns keys from cache if available. If not cached, triggers a refresh
     /// and waits up to 2 seconds for the keys to arrive.
     pub async fn get_keys(&self, mint_url: &MintUrl, keyset_id: &Id) -> Result<Arc<Keys>, Error> {
-        if let Ok(keys) = self.get_keys_sync(mint_url, keyset_id) {
-            return Ok(keys.clone());
-        }
-
-        tracing::debug!(
-            "Keyset {} not in cache, triggering refresh for mint {}",
-            keyset_id,
-            mint_url
-        );
-
-        self.refresh_now(mint_url);
+        let shared_cache = {
+            let mints = self.mints.read();
+            let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
+            registration.cache.clone()
+        };
 
-        for _ in 0..20 {
-            if let Ok(keys) = self.get_keys_sync(mint_url, keyset_id) {
-                return Ok(keys.clone());
+        for _ in 0..MAX_RETRY {
+            let cache = shared_cache.load();
+            if cache.is_ready {
+                return cache
+                    .keys_by_id
+                    .get(keyset_id)
+                    .cloned()
+                    .ok_or(Error::UnknownKeySet);
             }
-            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+            self.send_message(RefreshMessage::SyncMint(mint_url.to_owned()));
+            tokio::time::sleep(RETRY_SLEEP).await;
         }
 
         Err(Error::UnknownKeySet)
     }
 
-    /// Get keys for a keyset (cache-only, no refresh)
-    ///
-    /// Returns keys immediately from cache. Returns error if not cached.
-    /// Does not trigger any background fetches.
-    pub fn get_keys_sync(&self, mint_url: &MintUrl, keyset_id: &Id) -> Result<Arc<Keys>, Error> {
-        let mints = self.mints.read();
-        let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
-        let cache = registration.cache.load();
-        cache
-            .keys_by_id
-            .get(keyset_id)
-            .cloned()
-            .ok_or(Error::UnknownKeySet)
-    }
-
     /// Get keyset info by ID (cache-first with automatic refresh)
     pub async fn get_keyset_by_id(
         &self,
         mint_url: &MintUrl,
         keyset_id: &Id,
     ) -> Result<Arc<KeySetInfo>, Error> {
-        if let Ok(keysets) = self.get_keyset_by_id_sync(mint_url, keyset_id) {
-            return Ok(keysets);
+        let shared_cache = {
+            let mints = self.mints.read();
+            let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
+            registration.cache.clone()
+        };
+
+        for _ in 0..MAX_RETRY {
+            let cache = shared_cache.load();
+            if cache.is_ready {
+                return cache
+                    .keysets_by_id
+                    .get(keyset_id)
+                    .cloned()
+                    .ok_or(Error::UnknownKeySet);
+            }
+            self.send_message(RefreshMessage::SyncMint(mint_url.to_owned()));
+            tokio::time::sleep(RETRY_SLEEP).await;
         }
 
-        self.refresh_now(mint_url);
+        Err(Error::UnknownKeySet)
+    }
+
+    /// Get all keysets for a mint (cache-first with automatic refresh)
+    pub async fn get_keysets(&self, mint_url: &MintUrl) -> Result<Vec<KeySetInfo>, Error> {
+        let shared_cache = {
+            let mints = self.mints.read();
+            let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
+            registration.cache.clone()
+        };
 
-        for _ in 0..20 {
-            if let Ok(keysets) = self.get_keyset_by_id_sync(mint_url, keyset_id) {
-                return Ok(keysets);
+        for _ in 0..MAX_RETRY {
+            let cache = shared_cache.load();
+            if cache.is_ready {
+                let keysets: Vec<KeySetInfo> = cache
+                    .keysets_by_id
+                    .values()
+                    .map(|ks| (**ks).clone())
+                    .collect();
+                return if keysets.is_empty() {
+                    Err(Error::UnknownKeySet)
+                } else {
+                    Ok(keysets)
+                };
             }
-            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+            self.send_message(RefreshMessage::SyncMint(mint_url.to_owned()));
+            tokio::time::sleep(RETRY_SLEEP).await;
         }
 
         Err(Error::UnknownKeySet)
     }
 
-    /// Get keyset info by ID (cache-only, no refresh)
-    pub fn get_keyset_by_id_sync(
+    /// Get all active keysets for a mint (cache-only, no refresh)
+    pub async fn get_active_keysets(
         &self,
         mint_url: &MintUrl,
-        keyset_id: &Id,
-    ) -> Result<Arc<KeySetInfo>, Error> {
-        let mints = self.mints.read();
-        let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
-        let cache = registration.cache.load();
-        cache
-            .keysets_by_id
-            .get(keyset_id)
-            .cloned()
-            .ok_or(Error::UnknownKeySet)
-    }
+    ) -> Result<Vec<Arc<KeySetInfo>>, Error> {
+        let shared_cache = {
+            let mints = self.mints.read();
+            let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
+            registration.cache.clone()
+        };
 
-    /// Get all keysets for a mint (cache-first with automatic refresh)
-    pub async fn get_keysets(&self, mint_url: &MintUrl) -> Result<Vec<KeySetInfo>, Error> {
-        if let Ok(keysets) = self.get_keysets_sync(mint_url) {
-            return Ok(keysets);
+        for _ in 0..MAX_RETRY {
+            let cache = shared_cache.load();
+            if cache.is_ready {
+                return Ok(cache.active_keysets.clone());
+            }
+            self.send_message(RefreshMessage::SyncMint(mint_url.to_owned()));
+            tokio::time::sleep(RETRY_SLEEP).await;
         }
 
-        self.refresh_now(mint_url);
+        Err(Error::UnknownKeySet)
+    }
+
+    /// Load a specific keyset from database or HTTP
+    ///
+    /// First checks all registered databases for the keyset. If not found,
+    /// fetches from the mint server via HTTP and persists to all databases.
+    async fn load_keyset_from_db_or_http(
+        registration: &MintRegistration,
+        keyset_id: &Id,
+    ) -> Result<KeySet, Error> {
+        let storages = registration
+            .resources
+            .read()
+            .values()
+            .map(|resource| resource.storage.clone())
+            .collect::<Vec<_>>();
+
+        // Try database first
+        for storage in &storages {
+            if let Some(keys) = storage.get_keys(keyset_id).await? {
+                debug!(
+                    "Loaded keyset {} from database for {}",
+                    keyset_id, registration.mint_url
+                );
 
-        for _ in 0..20 {
-            if let Ok(keysets) = self.get_keysets_sync(mint_url) {
-                return Ok(keysets);
+                // Get keyset info to construct KeySet
+                if let Some(keyset_info) = storage.get_keyset_by_id(keyset_id).await? {
+                    return Ok(KeySet {
+                        id: *keyset_id,
+                        unit: keyset_info.unit,
+                        final_expiry: keyset_info.final_expiry,
+                        keys,
+                    });
+                }
             }
-            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
         }
 
-        Err(Error::UnknownKeySet)
-    }
+        // Not in database, fetch from HTTP
+        debug!(
+            "Keyset {} not in database, fetching from mint server for {}",
+            keyset_id, registration.mint_url
+        );
 
-    /// Get all keysets for a mint (cache-only, no refresh)
-    pub fn get_keysets_sync(&self, mint_url: &MintUrl) -> Result<Vec<KeySetInfo>, Error> {
-        let mints = self.mints.read();
-        let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
-        let cache = registration.cache.load();
-        let keysets: Vec<KeySetInfo> = cache
-            .keysets_by_id
+        let http_client = registration
+            .resources
+            .read()
             .values()
-            .map(|ks| (**ks).clone())
-            .collect();
-        if keysets.is_empty() {
-            Err(Error::UnknownKeySet)
-        } else {
-            Ok(keysets)
+            .next()
+            .ok_or(Error::IncorrectMint)?
+            .client
+            .clone();
+
+        let keyset = http_client.get_mint_keyset(*keyset_id).await?;
+
+        // Persist to all databases
+        for storage in &storages {
+            let _ = storage.add_keys(keyset.clone()).await.inspect_err(|e| {
+                warn!(
+                    "Failed to persist keyset {} for {}: {}",
+                    keyset_id, registration.mint_url, e
+                )
+            });
         }
-    }
 
-    /// Get all active keysets for a mint (cache-only, no refresh)
-    pub fn get_active_keysets(&self, mint_url: &MintUrl) -> Vec<Arc<KeySetInfo>> {
-        let mints = self.mints.read();
-        let Some(registration) = mints.get(mint_url) else {
-            return Vec::new();
-        };
-        let cache = registration.cache.load();
-        cache.active_keysets.clone()
+        debug!(
+            "Loaded keyset {} from HTTP for {}",
+            keyset_id, registration.mint_url
+        );
+
+        Ok(keyset)
     }
 
     /// Load mint info and keys from all registered databases
@@ -549,7 +603,7 @@ impl KeyManager {
                     .keysets_by_id
                     .insert(keyset.id, arc_keyset.clone());
 
-                if keyset.active && keyset.unit == registration.unit {
+                if keyset.active {
                     storage_cache.active_keysets.push(arc_keyset);
                 }
             }
@@ -566,7 +620,8 @@ impl KeyManager {
         }
 
         let keys_count = storage_cache.keys_by_id.len();
-        storage_cache.refresh_version = storage_cache.refresh_version + 1;
+        storage_cache.refresh_version += 1;
+        storage_cache.is_ready = true;
 
         let storage_cache = Arc::new(storage_cache);
         registration.cache.store(storage_cache.clone());
@@ -590,6 +645,10 @@ impl KeyManager {
         mint_url: MintUrl,
         new_cache: Arc<MintKeyCache>,
     ) {
+        if !new_cache.is_ready {
+            return;
+        }
+
         if new_cache.mint_info.is_some() {
             let _ = storage
                 .add_mint(mint_url.clone(), new_cache.mint_info.clone())
@@ -651,7 +710,7 @@ impl KeyManager {
             .collect::<Vec<_>>();
 
         for storage in storages {
-            tokio::spawn(Self::persist_cache_db(
+            spawn(Self::persist_cache_db(
                 storage,
                 registration.mint_url.clone(),
                 new_cache.clone(),
@@ -728,14 +787,19 @@ impl KeyManager {
                 .keysets_by_id
                 .insert(keyset_info.id, arc_keyset.clone());
 
-            if keyset_info.active && keyset_info.unit == registration.unit {
+            if keyset_info.active {
                 new_cache.active_keysets.push(arc_keyset);
             }
 
-            if let Ok(keyset) = http_client
-                .get_mint_keyset(keyset_info.id)
+            // Try to load keyset from database first, then HTTP
+            if let Ok(keyset) = Self::load_keyset_from_db_or_http(&registration, &keyset_info.id)
                 .await
-                .inspect_err(|e| warn!("Failed to fetch keys for keyset {}: {}", keyset_info.id, e))
+                .inspect_err(|e| {
+                    warn!(
+                        "Failed to load keyset {} for {}: {}",
+                        keyset_info.id, registration.mint_url, e
+                    )
+                })
             {
                 let keys = Arc::new(keyset.keys.clone());
                 new_cache.keys_by_id.insert(keyset_info.id, keys);
@@ -747,6 +811,7 @@ impl KeyManager {
         let old_generation = registration.cache.load().refresh_version;
         new_cache.mint_info = Some(mint_info);
         new_cache.refresh_version = old_generation + 1;
+        new_cache.is_ready = true;
         new_cache.last_refresh = Instant::now();
 
         debug!(
@@ -764,17 +829,42 @@ impl KeyManager {
         Ok::<(), Error>(())
     }
 
+    fn sync_mint_task(registration: MintRegistration, refresh_scheduler: RefreshScheduler) {
+        spawn(async move {
+            if !registration.cache.load().is_ready {
+                let _ = Self::fetch_mint_info_and_keys_from_db(&registration)
+                    .await
+                    .inspect_err(|e| {
+                        warn!(
+                            "Failed to load keys from storage for {}: {}",
+                            registration.mint_url, e
+                        )
+                    });
+            }
+
+            if !registration.cache.load().is_ready {
+                let mint_url = registration.mint_url.clone();
+                let _ = tokio::time::timeout(
+                    Duration::from_secs(60),
+                    Self::fetch_from_http(registration, refresh_scheduler),
+                )
+                .await
+                .inspect_err(|e| warn!("Failed to fetch keys for {} with error {}", mint_url, e));
+            }
+        });
+    }
+
     /// Refresh keys from mint server
     ///
     /// Spawns an async task with 60s timeout. HTTP requests are limited to
     /// MAX_CONCURRENT_HTTP_REQUESTS concurrent requests via semaphore.
-    fn refresh_mint_task(
+    fn fetch_and_sync_mint_task(
         registration: MintRegistration,
         semaphore: Arc<Semaphore>,
         refresh_scheduler: RefreshScheduler,
     ) {
-        tokio::spawn(async move {
-            if registration.cache.load().keysets_by_id.is_empty() {
+        spawn(async move {
+            if !registration.cache.load().is_ready {
                 let _ = Self::fetch_mint_info_and_keys_from_db(&registration)
                     .await
                     .inspect_err(|e| {
@@ -802,9 +892,8 @@ impl KeyManager {
 
             let mint_url = registration.mint_url.clone();
 
-            let timeout = Duration::from_secs(60);
             let result = tokio::time::timeout(
-                timeout,
+                Duration::from_secs(60),
                 Self::fetch_from_http(registration, refresh_scheduler),
             )
             .await;
@@ -861,6 +950,19 @@ impl KeyManager {
                             debug!("Stopping refresh task");
                             break;
                         }
+                        RefreshMessage::SyncMint(mint_url) => {
+                            debug!("Sync all instances of {}", mint_url);
+                            let registration = {
+                                let mints_lock = mints.read();
+                                mints_lock.get(&mint_url).cloned()
+                            };
+
+                            if let Some(reg) = registration {
+                                Self::sync_mint_task(reg, refresh_scheduler.clone());
+                            } else {
+                                warn!("FetchMint: Mint not registered: {}", mint_url);
+                            }
+                        }
                         RefreshMessage::FetchMint(mint_url) => {
                             debug!("FetchMint message received for {}", mint_url);
                             let registration = {
@@ -869,7 +971,7 @@ impl KeyManager {
                             };
 
                             if let Some(reg) = registration {
-                                Self::refresh_mint_task(reg, semaphore.clone(), refresh_scheduler.clone());
+                                Self::fetch_and_sync_mint_task(reg, semaphore.clone(), refresh_scheduler.clone());
                             } else {
                                 warn!("FetchMint: Mint not registered: {}", mint_url);
                             }
@@ -893,7 +995,7 @@ impl KeyManager {
                         };
 
                         if let Some(reg) = registration {
-                            Self::refresh_mint_task(reg, semaphore.clone(), refresh_scheduler.clone());
+                            Self::fetch_and_sync_mint_task(reg, semaphore.clone(), refresh_scheduler.clone());
                         } else {
                             warn!("Mint no longer registered: {}", mint_url);
                         }
@@ -910,20 +1012,19 @@ impl KeyManager {
     /// Sends a refresh message to the background task and waits up to 2 seconds
     /// for the cache to be updated with a newer version.
     pub async fn refresh(&self, mint_url: &MintUrl) -> Result<Vec<KeySetInfo>, Error> {
-        let last_version = {
+        let shared_cache = {
             let mints = self.mints.read();
             let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
-            let cache = registration.cache.load();
-            cache.refresh_version
+            registration.cache.clone()
         };
 
+        let last_version = shared_cache.load().refresh_version;
+
         self.send_message(RefreshMessage::FetchMint(mint_url.clone()));
 
-        for _ in 0..200 {
+        for _ in 0..MAX_RETRY {
             if let Some(keysets) = {
-                let mints = self.mints.read();
-                let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
-                let cache = registration.cache.load();
+                let cache = shared_cache.load();
                 if last_version > 0 || cache.refresh_version > 0 {
                     Some(
                         cache
@@ -939,7 +1040,7 @@ impl KeyManager {
                 return Ok(keysets);
             }
 
-            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+            tokio::time::sleep(RETRY_SLEEP).await;
         }
 
         Err(Error::UnknownKeySet)
@@ -949,38 +1050,6 @@ impl KeyManager {
     pub fn refresh_now(&self, mint_url: &MintUrl) {
         self.send_message(RefreshMessage::FetchMint(mint_url.clone()));
     }
-
-    /// Get number of cached keys for a mint
-    pub fn cached_keys_count(&self, mint_url: &MintUrl) -> usize {
-        let mints = self.mints.read();
-        let Some(registration) = mints.get(mint_url) else {
-            return 0;
-        };
-        registration.cache.load().keys_by_id.len()
-    }
-
-    /// Get number of cached keysets for a mint
-    pub fn cached_keysets_count(&self, mint_url: &MintUrl) -> usize {
-        let mints = self.mints.read();
-        let Some(registration) = mints.get(mint_url) else {
-            return 0;
-        };
-        registration.cache.load().keysets_by_id.len()
-    }
-
-    /// Get number of available HTTP request slots
-    pub fn available_http_slots(&self) -> usize {
-        self.refresh_semaphore.available_permits()
-    }
-
-    /// Invalidate cache for a specific mint
-    pub fn invalidate(&self, mint_url: &MintUrl) {
-        let mints = self.mints.read();
-        if let Some(registration) = mints.get(mint_url) {
-            registration.cache.store(Arc::new(MintKeyCache::empty()));
-            debug!("Cache invalidated for {}", mint_url);
-        }
-    }
 }
 
 impl Drop for KeyManager {

+ 28 - 3
crates/cdk/src/wallet/keysets.rs

@@ -2,6 +2,7 @@ use std::collections::HashMap;
 
 use cdk_common::amount::{FeeAndAmounts, KeysetFeeAndAmounts};
 use cdk_common::nut02::{KeySetInfos, KeySetInfosMethods};
+use cdk_common::CurrencyUnit;
 use tracing::instrument;
 
 use crate::nuts::{Id, KeySetInfo, Keys};
@@ -39,7 +40,19 @@ impl Wallet {
     /// offline and rely on previously cached keyset data.
     #[instrument(skip(self))]
     pub async fn get_mint_keysets(&self) -> Result<Vec<KeySetInfo>, Error> {
-        self.key_manager.get_keysets(&self.mint_url).await
+        let keysets = self
+            .key_manager
+            .get_keysets(&self.mint_url)
+            .await?
+            .into_iter()
+            .filter(|k| k.unit != CurrencyUnit::Auth)
+            .collect::<Vec<_>>();
+
+        if !keysets.is_empty() {
+            Ok(keysets)
+        } else {
+            Err(Error::UnknownKeySet)
+        }
     }
 
     /// Refresh keysets by fetching the latest from mint - always goes online
@@ -51,7 +64,19 @@ impl Wallet {
     pub async fn refresh_keysets(&self) -> Result<KeySetInfos, Error> {
         tracing::debug!("Refreshing keysets via KeyManager");
 
-        self.key_manager.refresh(&self.mint_url).await
+        let keysets = self
+            .key_manager
+            .refresh(&self.mint_url)
+            .await?
+            .into_iter()
+            .filter(|k| self.unit == k.unit && k.active)
+            .collect::<Vec<_>>();
+
+        if !keysets.is_empty() {
+            Ok(keysets)
+        } else {
+            Err(Error::UnknownKeySet)
+        }
     }
 
     /// Get the active keyset with the lowest fees - always goes online
@@ -76,7 +101,7 @@ impl Wallet {
     /// returns an error. Use this for offline operations or when you want to avoid network calls.
     #[instrument(skip(self))]
     pub async fn get_active_keyset(&self) -> Result<KeySetInfo, Error> {
-        let active_keysets = self.key_manager.get_active_keysets(&self.mint_url);
+        let active_keysets = self.key_manager.get_active_keysets(&self.mint_url).await?;
 
         active_keysets
             .into_iter()

+ 4 - 30
crates/cdk/src/wallet/multi_mint_wallet.rs

@@ -11,6 +11,7 @@ use std::sync::Arc;
 use anyhow::Result;
 use cdk_common::database;
 use cdk_common::database::WalletDatabase;
+use cdk_common::task::spawn;
 use cdk_common::wallet::{Transaction, TransactionDirection};
 use tokio::sync::RwLock;
 use tracing::instrument;
@@ -914,20 +915,7 @@ impl MultiMintWallet {
             let target_mint_url = target_mint_url.clone();
 
             // Spawn parallel transfer task
-            #[cfg(not(target_arch = "wasm32"))]
-            let task = tokio::spawn(async move {
-                self_clone
-                    .transfer(
-                        &source_mint_url,
-                        &target_mint_url,
-                        TransferMode::ExactReceive(transfer_amount),
-                    )
-                    .await
-                    .map(|result| result.amount_received)
-            });
-
-            #[cfg(target_arch = "wasm32")]
-            let task = tokio::task::spawn_local(async move {
+            let task = spawn(async move {
                 self_clone
                     .transfer(
                         &source_mint_url,
@@ -1342,14 +1330,7 @@ impl MultiMintWallet {
             let amount_msat = u64::from(amount) * 1000;
             let options = Some(MeltOptions::new_mpp(amount_msat));
 
-            #[cfg(not(target_arch = "wasm32"))]
-            let task = tokio::spawn(async move {
-                let quote = wallet.melt_quote(bolt11_clone, options).await;
-                (mint_url_clone, quote)
-            });
-
-            #[cfg(target_arch = "wasm32")]
-            let task = tokio::task::spawn_local(async move {
+            let task = spawn(async move {
                 let quote = wallet.melt_quote(bolt11_clone, options).await;
                 (mint_url_clone, quote)
             });
@@ -1398,14 +1379,7 @@ impl MultiMintWallet {
 
             let mint_url_clone = mint_url.clone();
 
-            #[cfg(not(target_arch = "wasm32"))]
-            let task = tokio::spawn(async move {
-                let melted = wallet.melt(&quote_id).await;
-                (mint_url_clone, melted)
-            });
-
-            #[cfg(target_arch = "wasm32")]
-            let task = tokio::task::spawn_local(async move {
+            let task = spawn(async move {
                 let melted = wallet.melt(&quote_id).await;
                 (mint_url_clone, melted)
             });