Преглед изворни кода

Introduce KeyManager for the wallet

This commit introduces a new **KeyManager** subsystem that centralizes mint key
management across all wallets with in-memory caching, automatic background
refresh, and lock-free read access. This architectural change eliminates
redundant database queries, reduces mint server load, and improves wallet
performance through optimized key access patterns.

```
┌─────────┐     ┌──────────┐     ┌──────────┐
│ Wallet  │────>│ Database │<────│ Wallet   │
└─────────┘     └──────────┘     └──────────┘
      │                                │
      └────────> Mint Server <─────────┘
         (redundant requests)
```

**Problems:**
- Each wallet independently queries database for keys
- Duplicate HTTP requests to mint servers
- No coordination between wallets
- Blocking I/O on every key access

```
┌─────────────────────────────────────────────────────────┐
│                      KeyManager                         │
│  ┌────────────────────────────────────────────────────┐ │
│  │   Per-Mint Cache (ArcSwap<MintKeyCache>)           │ │
│  │   • MintInfo                                       │ │
│  │   • Keysets by ID (HashMap<Id, KeySetInfo>)        │ │
│  │   • Active keysets (Vec<KeySetInfo>)               │ │
│  │   • Keys by keyset ID (HashMap<Id, Keys>)          │ │
│  │   • Refresh timestamp & version (u64)              │ │
│  └────────────────────────────────────────────────────┘ │
│                           ↓                             │
│  ┌──────────────────┐  ┌──────────────────────────┐     │
│  │ Refresh Scheduler│  │ HTTP Semaphore           │     │
│  │ • BTreeMap queue │  │ • Max 5 concurrent       │     │
│  │ • 5-min interval │  │ • Fair permit allocation │     │
│  └──────────────────┘  └──────────────────────────┘     │
└─────────────────────────────────────────────────────────┘
        ↑                           ↑
        │                           │
  ┌─────────┐                ┌──────────┐
  │ Wallet  │                │ Database │
  │ Wallet  │                │  & HTTP  │
  └─────────┘                └──────────┘
   (shared access)         (coordinated)
```

**Benefits:**
- Single source of truth for all mint keys
- Lock-free reads via `ArcSwap` (zero contention)
- Automatic background refresh every 5 minutes
- HTTP throttling (max 5 concurrent requests)
- Database fallback on cache miss
- Memory-efficient shared state
Cesar Rodas пре 3 недеља
родитељ
комит
c81df88cd4

+ 3 - 3
crates/cdk-common/src/mint.rs

@@ -49,7 +49,7 @@ impl FromStr for OperationKind {
             "swap" => Ok(OperationKind::Swap),
             "mint" => Ok(OperationKind::Mint),
             "melt" => Ok(OperationKind::Melt),
-            _ => Err(Error::Custom(format!("Invalid operation kind: {}", value))),
+            _ => Err(Error::Custom(format!("Invalid operation kind: {value}"))),
         }
     }
 }
@@ -80,7 +80,7 @@ impl FromStr for SwapSagaState {
         match value.as_str() {
             "setup_complete" => Ok(SwapSagaState::SetupComplete),
             "signed" => Ok(SwapSagaState::Signed),
-            _ => Err(Error::Custom(format!("Invalid swap saga state: {}", value))),
+            _ => Err(Error::Custom(format!("Invalid swap saga state: {value}"))),
         }
     }
 }
@@ -279,7 +279,7 @@ impl Operation {
             "mint" => Ok(Self::Mint(uuid)),
             "melt" => Ok(Self::Melt(uuid)),
             "swap" => Ok(Self::Swap(uuid)),
-            _ => Err(Error::Custom(format!("Invalid operation kind: {}", kind))),
+            _ => Err(Error::Custom(format!("Invalid operation kind: {kind}"))),
         }
     }
 }

+ 2 - 2
crates/cdk-redb/src/wallet/mod.rs

@@ -63,7 +63,7 @@ impl WalletRedbDatabase {
                 if !parent.exists() {
                     return Err(Error::Io(std::io::Error::new(
                         std::io::ErrorKind::NotFound,
-                        format!("Parent directory does not exist: {:?}", parent),
+                        format!("Parent directory does not exist: {parent:?}"),
                     )));
                 }
             }
@@ -171,7 +171,7 @@ impl WalletRedbDatabase {
             if !parent.exists() {
                 return Err(Error::Io(std::io::Error::new(
                     std::io::ErrorKind::NotFound,
-                    format!("Parent directory does not exist: {:?}", parent),
+                    format!("Parent directory does not exist: {parent:?}"),
                 )));
             }
         }

+ 2 - 3
crates/cdk-sql-common/build.rs

@@ -23,7 +23,7 @@ fn main() {
             .unwrap_or("default")
             .replace("/", "_")
             .replace("\\", "_");
-        let dest_path = out_dir.join(format!("migrations_{}.rs", migration_name));
+        let dest_path = out_dir.join(format!("migrations_{migration_name}.rs"));
         let mut out_file = File::create(&dest_path).expect("Failed to create migrations.rs");
 
         let skip_name = migration_path.to_str().unwrap_or_default().len();
@@ -115,8 +115,7 @@ fn main() {
             let relative_to_out_dir = relative_path.to_str().unwrap().replace("\\", "/");
             writeln!(
                 out_file,
-                "    (\"{prefix}\", \"{rel_name}\", include_str!(r#\"{}\"#)),",
-                relative_to_out_dir
+                "    (\"{prefix}\", \"{rel_name}\", include_str!(r#\"{relative_to_out_dir}\"#)),"
             )
             .unwrap();
             println!("cargo:rerun-if-changed={}", path.display());

+ 7 - 7
crates/cdk-sql-common/src/mint/mod.rs

@@ -2248,10 +2248,10 @@ where
         let current_time = unix_time();
 
         let blinded_secrets_json = serde_json::to_string(&saga.blinded_secrets)
-            .map_err(|e| Error::Internal(format!("Failed to serialize blinded_secrets: {}", e)))?;
+            .map_err(|e| Error::Internal(format!("Failed to serialize blinded_secrets: {e}")))?;
 
         let input_ys_json = serde_json::to_string(&saga.input_ys)
-            .map_err(|e| Error::Internal(format!("Failed to serialize input_ys: {}", e)))?;
+            .map_err(|e| Error::Internal(format!("Failed to serialize input_ys: {e}")))?;
 
         query(
             r#"
@@ -2645,23 +2645,23 @@ fn sql_row_to_saga(row: Vec<Column>) -> Result<mint::Saga, Error> {
 
     let operation_id_str = column_as_string!(&operation_id);
     let operation_id = uuid::Uuid::parse_str(&operation_id_str)
-        .map_err(|e| Error::Internal(format!("Invalid operation_id UUID: {}", e)))?;
+        .map_err(|e| Error::Internal(format!("Invalid operation_id UUID: {e}")))?;
 
     let operation_kind_str = column_as_string!(&operation_kind);
     let operation_kind = mint::OperationKind::from_str(&operation_kind_str)
-        .map_err(|e| Error::Internal(format!("Invalid operation kind: {}", e)))?;
+        .map_err(|e| Error::Internal(format!("Invalid operation kind: {e}")))?;
 
     let state_str = column_as_string!(&state);
     let state = mint::SagaStateEnum::new(operation_kind, &state_str)
-        .map_err(|e| Error::Internal(format!("Invalid saga state: {}", e)))?;
+        .map_err(|e| Error::Internal(format!("Invalid saga state: {e}")))?;
 
     let blinded_secrets_str = column_as_string!(&blinded_secrets);
     let blinded_secrets: Vec<PublicKey> = serde_json::from_str(&blinded_secrets_str)
-        .map_err(|e| Error::Internal(format!("Failed to deserialize blinded_secrets: {}", e)))?;
+        .map_err(|e| Error::Internal(format!("Failed to deserialize blinded_secrets: {e}")))?;
 
     let input_ys_str = column_as_string!(&input_ys);
     let input_ys: Vec<PublicKey> = serde_json::from_str(&input_ys_str)
-        .map_err(|e| Error::Internal(format!("Failed to deserialize input_ys: {}", e)))?;
+        .map_err(|e| Error::Internal(format!("Failed to deserialize input_ys: {e}")))?;
 
     let quote_id = match &quote_id {
         Column::Text(s) => {

+ 30 - 66
crates/cdk/src/wallet/auth/auth_wallet.rs

@@ -3,7 +3,6 @@ use std::sync::Arc;
 
 use cdk_common::database::{self, WalletDatabase};
 use cdk_common::mint_url::MintUrl;
-use cdk_common::nut02::KeySetInfosMethods;
 use cdk_common::{AuthProof, Id, Keys, MintInfo};
 use serde::{Deserialize, Serialize};
 use tokio::sync::RwLock;
@@ -18,6 +17,7 @@ use crate::nuts::{
     Proofs, ProtectedEndpoint, State,
 };
 use crate::types::ProofInfo;
+use crate::wallet::key_manager::KeyManager;
 use crate::wallet::mint_connector::AuthHttpClient;
 use crate::{Amount, Error, OidcClient};
 
@@ -40,6 +40,8 @@ pub struct AuthWallet {
     pub mint_url: MintUrl,
     /// Storage backend
     pub localstore: Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
+    /// Centralized key manager (lock-free cached key access)
+    pub key_manager: Arc<KeyManager>,
     /// Protected methods
     pub protected_endpoints: Arc<RwLock<HashMap<ProtectedEndpoint, AuthRequired>>>,
     /// Refresh token for auth
@@ -55,6 +57,7 @@ impl AuthWallet {
         mint_url: MintUrl,
         cat: Option<AuthToken>,
         localstore: Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
+        key_manager: Arc<KeyManager>,
         protected_endpoints: HashMap<ProtectedEndpoint, AuthRequired>,
         oidc_client: Option<OidcClient>,
     ) -> Self {
@@ -62,6 +65,7 @@ impl AuthWallet {
         Self {
             mint_url,
             localstore,
+            key_manager,
             protected_endpoints: Arc::new(RwLock::new(protected_endpoints)),
             refresh_token: Arc::new(RwLock::new(None)),
             client: http_client,
@@ -170,89 +174,49 @@ impl AuthWallet {
 
     /// Fetch keys for mint keyset
     ///
-    /// Returns keys from local database if they are already stored.
-    /// If keys are not found locally, goes online to query the mint for the keyset and stores the [`Keys`] in local database.
+    /// Returns keys from KeyManager cache if available.
+    /// If keys are not cached, triggers a refresh and waits briefly before checking again.
     #[instrument(skip(self))]
     pub async fn load_keyset_keys(&self, keyset_id: Id) -> Result<Keys, Error> {
-        let keys = if let Some(keys) = self.localstore.get_keys(&keyset_id).await? {
-            keys
-        } else {
-            let keys = self.client.get_mint_blind_auth_keyset(keyset_id).await?;
-
-            keys.verify_id()?;
-
-            self.localstore.add_keys(keys.clone()).await?;
-
-            keys.keys
-        };
-
-        Ok(keys)
+        Ok((*self
+            .key_manager
+            .get_keys(&self.mint_url, &keyset_id)
+            .await?)
+            .clone())
     }
 
-    /// Get blind auth keysets from local database or go online if missing
+    /// Get blind auth keysets from KeyManager cache or trigger refresh if missing
     ///
-    /// First checks the local database for cached blind auth keysets. If keysets are not found locally,
-    /// goes online to refresh keysets from the mint and updates the local database.
+    /// First checks the KeyManager cache for keysets. If keysets are not cached,
+    /// triggers a refresh from the mint and waits briefly before checking again.
     /// This is the main method for getting auth keysets in operations that can work offline
     /// but will fall back to online if needed.
     #[instrument(skip(self))]
     pub async fn load_mint_keysets(&self) -> Result<Vec<KeySetInfo>, Error> {
-        match self
-            .localstore
-            .get_mint_keysets(self.mint_url.clone())
-            .await?
-        {
-            Some(keysets_info) => {
-                let auth_keysets: Vec<KeySetInfo> =
-                    keysets_info.unit(CurrencyUnit::Sat).cloned().collect();
-                if auth_keysets.is_empty() {
-                    // If we don't have any auth keysets, fetch them from the mint
-                    let keysets = self.refresh_keysets().await?;
-                    Ok(keysets)
-                } else {
-                    Ok(auth_keysets)
-                }
-            }
-            None => {
-                // If we don't have any keysets, fetch them from the mint
-                let keysets = self.refresh_keysets().await?;
-                Ok(keysets)
+        if let Ok(keysets) = self.key_manager.get_keysets(&self.mint_url).await {
+            let auth_keysets: Vec<KeySetInfo> = keysets
+                .into_iter()
+                .filter(|k| k.unit == CurrencyUnit::Auth)
+                .collect();
+            if !auth_keysets.is_empty() {
+                return Ok(auth_keysets);
             }
         }
+
+        Err(Error::UnknownKeySet)
     }
 
     /// Refresh blind auth keysets by fetching the latest from mint - always goes online
     ///
-    /// This method always goes online to fetch the latest blind auth keyset information from the mint.
-    /// It updates the local database with the fetched keysets and ensures we have keys for all keysets.
-    /// Returns only the keysets with Auth currency unit. This is used when operations need the most
-    /// up-to-date keyset information and are willing to go online.
+    /// This method triggers a KeyManager refresh which fetches the latest blind auth keyset
+    /// information from the mint. The KeyManager handles updating the cache and database.
+    /// Returns only the keysets with Auth currency unit. This is used when operations need
+    /// the most up-to-date keyset information.
     #[instrument(skip(self))]
     pub async fn refresh_keysets(&self) -> Result<Vec<KeySetInfo>, Error> {
-        let keysets_response = self.client.get_mint_blind_auth_keysets().await?;
-        let keysets = keysets_response.keysets;
-
-        // Update local store with keysets
-        self.localstore
-            .add_mint_keysets(self.mint_url.clone(), keysets.clone())
-            .await?;
-
-        // Filter for auth keysets
-        let auth_keysets = keysets
-            .clone()
-            .into_iter()
-            .filter(|k| k.unit == CurrencyUnit::Auth)
-            .collect::<Vec<KeySetInfo>>();
-
-        // Ensure we have keys for all auth keysets
-        for keyset in &auth_keysets {
-            if self.localstore.get_keys(&keyset.id).await?.is_none() {
-                tracing::debug!("Fetching missing keys for auth keyset {}", keyset.id);
-                self.load_keyset_keys(keyset.id).await?;
-            }
-        }
+        tracing::debug!("Refreshing auth keysets via KeyManager");
 
-        Ok(auth_keysets)
+        self.key_manager.refresh(&self.mint_url).await
     }
 
     /// Get the first active blind auth keyset - always goes online

+ 25 - 0
crates/cdk/src/wallet/builder.rs

@@ -14,6 +14,7 @@ use crate::mint_url::MintUrl;
 use crate::nuts::CurrencyUnit;
 #[cfg(feature = "auth")]
 use crate::wallet::auth::AuthWallet;
+use crate::wallet::key_manager::KeyManager;
 use crate::wallet::{HttpClient, MintConnector, SubscriptionManager, Wallet};
 
 /// Builder for creating a new [`Wallet`]
@@ -28,6 +29,7 @@ pub struct WalletBuilder {
     seed: Option<[u8; 64]>,
     use_http_subscription: bool,
     client: Option<Arc<dyn MintConnector + Send + Sync>>,
+    key_manager: Option<Arc<KeyManager>>,
 }
 
 impl Default for WalletBuilder {
@@ -42,6 +44,7 @@ impl Default for WalletBuilder {
             seed: None,
             client: None,
             use_http_subscription: false,
+            key_manager: None,
         }
     }
 }
@@ -117,13 +120,25 @@ impl WalletBuilder {
         self
     }
 
+    /// Set a shared global KeyManager
+    ///
+    /// This allows multiple wallets to share the same KeyManager instance for
+    /// optimal performance and memory usage. If not provided, a new KeyManager
+    /// will be created for each wallet.
+    pub fn key_manager(mut self, key_manager: Arc<KeyManager>) -> Self {
+        self.key_manager = Some(key_manager);
+        self
+    }
+
     /// Set auth CAT (Clear Auth Token)
     #[cfg(feature = "auth")]
     pub fn set_auth_cat(mut self, cat: String) -> Self {
+        let key_manager = self.key_manager.clone().unwrap_or_else(KeyManager::new);
         self.auth_wallet = Some(AuthWallet::new(
             self.mint_url.clone().expect("Mint URL required"),
             Some(AuthToken::ClearAuth(cat)),
             self.localstore.clone().expect("Localstore required"),
+            key_manager,
             HashMap::new(),
             None,
         ));
@@ -162,10 +177,20 @@ impl WalletBuilder {
             }
         };
 
+        let key_manager = self.key_manager.unwrap_or_else(KeyManager::new);
+        let key_sub_id = key_manager.register_mint(
+            mint_url.clone(),
+            unit.clone(),
+            localstore.clone(),
+            client.clone(),
+        );
+
         Ok(Wallet {
             mint_url,
             unit,
             localstore,
+            key_manager,
+            _key_sub_id: key_sub_id,
             target_proof_count: self.target_proof_count.unwrap_or(3),
             #[cfg(feature = "auth")]
             auth_wallet: Arc::new(RwLock::new(self.auth_wallet)),

+ 995 - 0
crates/cdk/src/wallet/key_manager.rs

@@ -0,0 +1,995 @@
+//! Centralized key management with in-memory caching
+//!
+//! Provides global key management for all wallets with automatic background refresh
+//! and lock-free cache access. Keys are fetched from mint servers, cached in memory,
+//! and periodically updated without blocking wallet operations.
+//!
+//! # Architecture
+//!
+//! - **Per-mint cache**: Stores keysets and keys indexed by ID with atomic updates
+//! - **Background refresh**: Periodic 5-minute updates keep keys fresh
+//! - **HTTP throttling**: Max 5 concurrent requests to prevent overwhelming servers
+//! - **Database fallback**: Loads from storage when cache misses or HTTP fails
+//!
+//! # Usage
+//!
+//! ```ignore
+//! let key_manager = KeyManager::new();
+//! key_manager.register_mint(mint_url, unit, storage, client);
+//! let keys = key_manager.get_keys(&mint_url, &keyset_id).await?;
+//! ```
+
+use std::collections::{BTreeMap, HashMap};
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use arc_swap::ArcSwap;
+use cdk_common::database::{self, WalletDatabase};
+use cdk_common::mint_url::MintUrl;
+use cdk_common::nuts::{CurrencyUnit, KeySetInfo, Keys};
+use cdk_common::parking_lot::{Mutex as ParkingLotMutex, RwLock as ParkingLotRwLock};
+use cdk_common::util::unix_time;
+use cdk_common::{KeySet, MintInfo};
+use tokio::sync::{mpsc, Semaphore};
+use tokio::task::JoinHandle;
+use tracing::{debug, error, warn};
+
+#[cfg(feature = "auth")]
+use super::AuthMintConnector;
+use crate::nuts::Id;
+#[cfg(feature = "auth")]
+use crate::wallet::AuthHttpClient;
+use crate::wallet::MintConnector;
+use crate::Error;
+
+/// Refresh interval for background key refresh (5 minutes)
+const DEFAULT_REFRESH_INTERVAL: Duration = Duration::from_secs(300);
+
+/// Maximum concurrent HTTP requests to mint servers
+const MAX_CONCURRENT_HTTP_REQUESTS: usize = 5;
+
+/// Manages refresh scheduling for mints
+///
+/// Tracks when each mint should be refreshed next. Uses BTreeMap for efficient
+/// range queries to find all mints due for refresh. Counter ensures unique keys
+/// when multiple mints are scheduled for the same instant.
+#[derive(Clone)]
+struct RefreshScheduler {
+    /// Maps refresh time to mint URL
+    schedule: Arc<ParkingLotMutex<BTreeMap<(Instant, usize), MintUrl>>>,
+
+    /// Counter to ensure unique BTreeMap keys
+    counter: Arc<AtomicUsize>,
+
+    /// Interval between refreshes
+    interval: Duration,
+}
+
+impl RefreshScheduler {
+    /// Create a new refresh scheduler with the given interval
+    fn new(interval: Duration) -> Self {
+        Self {
+            schedule: Arc::new(ParkingLotMutex::new(BTreeMap::new())),
+            interval,
+            counter: Arc::new(0.into()),
+        }
+    }
+
+    /// Schedule a mint for refresh after the configured interval
+    fn schedule_refresh(&self, mint_url: MintUrl) {
+        let next_refresh = Instant::now() + self.interval;
+        let mut schedule = self.schedule.lock();
+        schedule.insert(
+            (
+                next_refresh,
+                self.counter
+                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed),
+            ),
+            mint_url,
+        );
+    }
+
+    /// Get all mints that are due for refresh
+    ///
+    /// Returns and removes all mints whose scheduled refresh time has passed.
+    fn get_due_refreshes(&self) -> Vec<MintUrl> {
+        let mut schedule = self.schedule.lock();
+        let now = Instant::now();
+
+        let (keys, due_mints): (Vec<_>, Vec<_>) = schedule
+            .range(..=(now, usize::MAX))
+            .map(|(key, mint_url)| (*key, mint_url.clone()))
+            .unzip();
+
+        for key in keys {
+            let _ = schedule.remove(&key);
+        }
+
+        due_mints
+    }
+}
+
+/// Messages for the background refresh task
+#[derive(Debug, Clone)]
+pub enum RefreshMessage {
+    /// Stop the refresh task
+    Stop,
+
+    /// Fetch keys for a specific mint immediately
+    FetchMint(MintUrl),
+}
+
+/// Per-mint key cache
+///
+/// Stores all keyset and key data for a single mint. Updated atomically via ArcSwap.
+/// The `refresh_version` increments on each update to detect when cache has changed.
+#[derive(Clone, Debug)]
+struct MintKeyCache {
+    /// Mint info from server
+    mint_info: Option<MintInfo>,
+
+    /// All keysets by ID
+    keysets_by_id: HashMap<Id, Arc<KeySetInfo>>,
+
+    /// Active keysets for quick access
+    active_keysets: Vec<Arc<KeySetInfo>>,
+
+    /// All keys by keyset ID
+    keys_by_id: HashMap<Id, Arc<Keys>>,
+
+    /// Last refresh timestamp
+    last_refresh: Instant,
+
+    /// Cache generation (increments on each refresh)
+    refresh_version: u64,
+}
+
+impl MintKeyCache {
+    fn empty() -> Self {
+        Self {
+            mint_info: None,
+            keysets_by_id: HashMap::new(),
+            active_keysets: Vec::new(),
+            keys_by_id: HashMap::new(),
+            last_refresh: Instant::now(),
+            refresh_version: 0,
+        }
+    }
+}
+
+/// External resources needed to manage keys for a mint
+///
+/// Combines storage and client into a single struct to keep them paired together.
+#[derive(Clone)]
+struct MintResources {
+    /// Storage backend for persistence
+    storage: Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
+
+    #[cfg(feature = "auth")]
+    auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
+
+    /// Client for fetching keys from mint
+    client: Arc<dyn MintConnector + Send + Sync>,
+}
+
+/// Per-mint registration info
+///
+/// Contains all resources and cached data for managing keys for a single mint.
+#[derive(Clone)]
+struct MintRegistration {
+    /// Mint URL
+    mint_url: MintUrl,
+
+    /// Currency unit
+    unit: CurrencyUnit,
+
+    /// External resources (storage + client)
+    resources: Arc<ParkingLotRwLock<HashMap<usize, MintResources>>>,
+
+    /// Cached data
+    cache: Arc<ArcSwap<MintKeyCache>>,
+}
+
+impl std::fmt::Debug for MintRegistration {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("MintRegistration")
+            .field("mint_url", &self.mint_url)
+            .field("unit", &self.unit)
+            .field("resources", &"<MintResources>")
+            .field("cache", &self.cache)
+            .finish()
+    }
+}
+
+type Mints = Arc<ParkingLotRwLock<HashMap<MintUrl, MintRegistration>>>;
+
+/// Global key manager shared across all wallets
+///
+/// Centralizes key management with in-memory caching and background refresh.
+/// All wallets share the same cache for each mint, avoiding duplicate fetches.
+///
+/// Spawns a background task on creation that refreshes keys every 5 minutes.
+/// Dropping the KeyManager stops the background task.
+pub struct KeyManager {
+    /// Registered mints by URL (using parking_lot for sync access)
+    mints: Mints,
+
+    /// Message sender to refresh task
+    tx: mpsc::UnboundedSender<RefreshMessage>,
+
+    /// Background refresh task handle
+    refresh_task: Arc<ParkingLotMutex<Option<JoinHandle<()>>>>,
+
+    /// Refresh interval
+    refresh_interval: Duration,
+
+    /// Internal counter for each registered/mint wallet
+    counter: AtomicUsize,
+
+    /// Semaphore to limit concurrent HTTP requests to mint servers
+    /// This is stored to keep it alive and is passed to the refresh loop
+    #[allow(dead_code)]
+    refresh_semaphore: Arc<Semaphore>,
+}
+
+impl std::fmt::Debug for KeyManager {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("KeyManager")
+            .field("mints", &self.mints)
+            .field("refresh_interval", &self.refresh_interval)
+            .finish()
+    }
+}
+
+/// KeySubscription
+pub struct KeySubscription {
+    /// Registered mints by URL (using parking_lot for sync access)
+    mints: Mints,
+    mint_url: MintUrl,
+    id: usize,
+}
+
+impl std::fmt::Debug for KeySubscription {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("KeyManager")
+            .field("mint_url", &self.mint_url)
+            .finish()
+    }
+}
+
+impl Drop for KeySubscription {
+    fn drop(&mut self) {
+        KeyManager::deregister_mint(self.mints.clone(), self.mint_url.clone(), self.id);
+    }
+}
+
+impl KeyManager {
+    /// Create a new KeyManager with default 5-minute refresh interval
+    ///
+    /// Spawns a background task that refreshes all registered mints periodically.
+    pub fn new() -> Arc<Self> {
+        Self::with_refresh_interval(DEFAULT_REFRESH_INTERVAL)
+    }
+
+    /// Create a new KeyManager with custom refresh interval
+    pub fn with_refresh_interval(refresh_interval: Duration) -> Arc<Self> {
+        let (tx, rx) = mpsc::unbounded_channel();
+        let refresh_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_HTTP_REQUESTS));
+
+        let manager = Self {
+            mints: Arc::new(ParkingLotRwLock::new(HashMap::new())),
+            tx,
+            refresh_task: Arc::new(ParkingLotMutex::new(None)),
+            refresh_interval,
+            refresh_semaphore: refresh_semaphore.clone(),
+            counter: 0.into(),
+        };
+
+        let mints = manager.mints.clone();
+        let refresh_interval = manager.refresh_interval;
+        let task = tokio::spawn(async move {
+            Self::refresh_loop(rx, mints, refresh_interval, refresh_semaphore).await;
+        });
+
+        {
+            let mut refresh_task = manager.refresh_task.lock();
+            *refresh_task = Some(task);
+        }
+
+        Arc::new(manager)
+    }
+
+    /// Send a message to the background refresh task
+    fn send_message(&self, msg: RefreshMessage) {
+        let _ = self
+            .tx
+            .send(msg)
+            .inspect_err(|e| error!("Failed to send message to refresh task: {}", e));
+    }
+
+    /// Register a mint for key management
+    ///
+    /// Registers the mint immediately and triggers an initial key fetch in the background.
+    /// The mint will be automatically refreshed every `refresh_interval`.
+    pub fn register_mint(
+        &self,
+        mint_url: MintUrl,
+        unit: CurrencyUnit,
+        storage: Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
+        client: Arc<dyn MintConnector + Send + Sync>,
+    ) -> Arc<KeySubscription> {
+        debug!("Registering mint: {} ({})", mint_url, unit);
+        let mut mints = self.mints.write();
+
+        let mint = mints.entry(mint_url.clone()).or_insert_with(|| {
+            let cache = Arc::new(ArcSwap::from_pointee(MintKeyCache::empty()));
+
+            MintRegistration {
+                mint_url: mint_url.clone(),
+                unit,
+                resources: Arc::new(ParkingLotRwLock::new(HashMap::new())),
+                cache,
+            }
+        });
+
+        let id = self
+            .counter
+            .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
+
+        tokio::spawn(Self::persist_cache_db(
+            storage.clone(),
+            mint_url.clone(),
+            mint.cache.load().clone(),
+        ));
+
+        #[cfg(feature = "auth")]
+        let mint_resource = MintResources {
+            storage,
+            client,
+            auth_client: Arc::new(AuthHttpClient::new(mint_url.clone(), None)),
+        };
+
+        #[cfg(not(feature = "auth"))]
+        let mint_resource = MintResources { storage, client };
+
+        mint.resources.write().insert(id, mint_resource);
+
+        drop(mints);
+
+        debug!("Mint registered: {}", mint_url);
+
+        self.send_message(RefreshMessage::FetchMint(mint_url.clone()));
+
+        Arc::new(KeySubscription {
+            mints: self.mints.clone(),
+            mint_url,
+            id,
+        })
+    }
+
+    fn deregister_mint(locked_mints: Mints, mint_url: MintUrl, internal_id: usize) {
+        let mut mints = locked_mints.write();
+        let mint = if let Some(r) = mints.remove(&mint_url) {
+            r
+        } else {
+            return;
+        };
+        let mut r = mint.resources.write();
+        r.remove(&internal_id);
+
+        if !r.is_empty() {
+            // add mint back, as there are other wallets to the same mint_url active
+            drop(r);
+            mints.insert(mint_url, mint);
+        }
+    }
+
+    /// Get keys for a keyset (cache-first with automatic refresh)
+    ///
+    /// Returns keys from cache if available. If not cached, triggers a refresh
+    /// and waits up to 2 seconds for the keys to arrive.
+    pub async fn get_keys(&self, mint_url: &MintUrl, keyset_id: &Id) -> Result<Arc<Keys>, Error> {
+        if let Ok(keys) = self.get_keys_sync(mint_url, keyset_id) {
+            return Ok(keys.clone());
+        }
+
+        tracing::debug!(
+            "Keyset {} not in cache, triggering refresh for mint {}",
+            keyset_id,
+            mint_url
+        );
+
+        self.refresh_now(mint_url);
+
+        for _ in 0..20 {
+            if let Ok(keys) = self.get_keys_sync(mint_url, keyset_id) {
+                return Ok(keys.clone());
+            }
+            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+        }
+
+        Err(Error::UnknownKeySet)
+    }
+
+    /// Get keys for a keyset (cache-only, no refresh)
+    ///
+    /// Returns keys immediately from cache. Returns error if not cached.
+    /// Does not trigger any background fetches.
+    pub fn get_keys_sync(&self, mint_url: &MintUrl, keyset_id: &Id) -> Result<Arc<Keys>, Error> {
+        let mints = self.mints.read();
+        let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
+        let cache = registration.cache.load();
+        cache
+            .keys_by_id
+            .get(keyset_id)
+            .cloned()
+            .ok_or(Error::UnknownKeySet)
+    }
+
+    /// Get keyset info by ID (cache-first with automatic refresh)
+    pub async fn get_keyset_by_id(
+        &self,
+        mint_url: &MintUrl,
+        keyset_id: &Id,
+    ) -> Result<Arc<KeySetInfo>, Error> {
+        if let Ok(keysets) = self.get_keyset_by_id_sync(mint_url, keyset_id) {
+            return Ok(keysets);
+        }
+
+        self.refresh_now(mint_url);
+
+        for _ in 0..20 {
+            if let Ok(keysets) = self.get_keyset_by_id_sync(mint_url, keyset_id) {
+                return Ok(keysets);
+            }
+            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+        }
+
+        Err(Error::UnknownKeySet)
+    }
+
+    /// Get keyset info by ID (cache-only, no refresh)
+    pub fn get_keyset_by_id_sync(
+        &self,
+        mint_url: &MintUrl,
+        keyset_id: &Id,
+    ) -> Result<Arc<KeySetInfo>, Error> {
+        let mints = self.mints.read();
+        let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
+        let cache = registration.cache.load();
+        cache
+            .keysets_by_id
+            .get(keyset_id)
+            .cloned()
+            .ok_or(Error::UnknownKeySet)
+    }
+
+    /// Get all keysets for a mint (cache-first with automatic refresh)
+    pub async fn get_keysets(&self, mint_url: &MintUrl) -> Result<Vec<KeySetInfo>, Error> {
+        if let Ok(keysets) = self.get_keysets_sync(mint_url) {
+            return Ok(keysets);
+        }
+
+        self.refresh_now(mint_url);
+
+        for _ in 0..20 {
+            if let Ok(keysets) = self.get_keysets_sync(mint_url) {
+                return Ok(keysets);
+            }
+            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+        }
+
+        Err(Error::UnknownKeySet)
+    }
+
+    /// Get all keysets for a mint (cache-only, no refresh)
+    pub fn get_keysets_sync(&self, mint_url: &MintUrl) -> Result<Vec<KeySetInfo>, Error> {
+        let mints = self.mints.read();
+        let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
+        let cache = registration.cache.load();
+        let keysets: Vec<KeySetInfo> = cache
+            .keysets_by_id
+            .values()
+            .map(|ks| (**ks).clone())
+            .collect();
+        if keysets.is_empty() {
+            Err(Error::UnknownKeySet)
+        } else {
+            Ok(keysets)
+        }
+    }
+
+    /// Get all active keysets for a mint (cache-only, no refresh)
+    pub fn get_active_keysets(&self, mint_url: &MintUrl) -> Vec<Arc<KeySetInfo>> {
+        let mints = self.mints.read();
+        let Some(registration) = mints.get(mint_url) else {
+            return Vec::new();
+        };
+        let cache = registration.cache.load();
+        cache.active_keysets.clone()
+    }
+
+    /// Load mint info and keys from all registered databases
+    ///
+    /// Iterates through all storage backends and loads keysets and keys into cache.
+    /// This is called on first access when cache is empty.
+    async fn fetch_mint_info_and_keys_from_db(
+        registration: &MintRegistration,
+    ) -> Result<(), Error> {
+        debug!(
+            "Cache empty, loading from storage first for {}",
+            registration.mint_url
+        );
+
+        let mut storage_cache = MintKeyCache::empty();
+        let storages = registration
+            .resources
+            .read()
+            .values()
+            .map(|resource| resource.storage.clone())
+            .collect::<Vec<_>>();
+
+        for storage in storages {
+            if storage_cache.mint_info.is_none() {
+                storage_cache.mint_info = storage.get_mint(registration.mint_url.clone()).await?;
+            }
+
+            for keyset in storage
+                .get_mint_keysets(registration.mint_url.clone())
+                .await?
+                .ok_or(Error::UnknownKeySet)?
+            {
+                if storage_cache.keysets_by_id.contains_key(&keyset.id) {
+                    continue;
+                }
+
+                let arc_keyset = Arc::new(keyset.clone());
+                storage_cache
+                    .keysets_by_id
+                    .insert(keyset.id, arc_keyset.clone());
+
+                if keyset.active && keyset.unit == registration.unit {
+                    storage_cache.active_keysets.push(arc_keyset);
+                }
+            }
+
+            for keyset_id in storage_cache.keysets_by_id.keys() {
+                if storage_cache.keys_by_id.contains_key(keyset_id) {
+                    continue;
+                }
+
+                if let Some(keys) = storage.get_keys(keyset_id).await? {
+                    storage_cache.keys_by_id.insert(*keyset_id, Arc::new(keys));
+                }
+            }
+        }
+
+        let keys_count = storage_cache.keys_by_id.len();
+        storage_cache.refresh_version = storage_cache.refresh_version + 1;
+
+        let storage_cache = Arc::new(storage_cache);
+        registration.cache.store(storage_cache.clone());
+
+        Self::persist_cache(registration, storage_cache).await;
+
+        debug!(
+            "Loaded {} keys from storage for {}",
+            keys_count, registration.mint_url
+        );
+
+        Ok(())
+    }
+
+    /// Persist cache to a single database
+    ///
+    /// Writes mint info, keysets, and keys to the given storage backend.
+    /// Errors are logged but don't fail the operation.
+    async fn persist_cache_db(
+        storage: Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
+        mint_url: MintUrl,
+        new_cache: Arc<MintKeyCache>,
+    ) {
+        if new_cache.mint_info.is_some() {
+            let _ = storage
+                .add_mint(mint_url.clone(), new_cache.mint_info.clone())
+                .await
+                .inspect_err(|e| {
+                    warn!("Failed to persist mint_info for {}: {}", mint_url, e);
+                });
+        }
+        let _ = storage
+            .add_mint_keysets(
+                mint_url.clone(),
+                new_cache
+                    .keysets_by_id
+                    .values()
+                    .map(|ks| (**ks).clone())
+                    .collect(),
+            )
+            .await
+            .inspect_err(|e| warn!("Failed to persist keysets for {}: {}", mint_url, e));
+
+        for (keyset_id, keys) in new_cache.keys_by_id.iter() {
+            if storage
+                .get_keys(keyset_id)
+                .await
+                .inspect_err(|e| warn!("Failed to get_keys {e}"))
+                .unwrap_or_default()
+                .is_none()
+            {
+                let keyset = if let Some(v) = new_cache.keysets_by_id.get(keyset_id) {
+                    v
+                } else {
+                    warn!("Malformed keysets, cannot find {}", keyset_id);
+                    continue;
+                };
+                let _ = storage
+                    .add_keys(KeySet {
+                        id: *keyset_id,
+                        unit: keyset.unit.clone(),
+                        final_expiry: keyset.final_expiry,
+                        keys: (**keys).clone(),
+                    })
+                    .await
+                    .inspect_err(|e| {
+                        warn!("Failed to persist keys for keyset {}: {}", keyset_id, e)
+                    });
+            }
+        }
+    }
+
+    /// Persist cache to all registered databases
+    ///
+    /// Spawns a task for each storage backend to write cache asynchronously.
+    async fn persist_cache(registration: &MintRegistration, new_cache: Arc<MintKeyCache>) {
+        let storages = registration
+            .resources
+            .read()
+            .values()
+            .map(|x| x.storage.clone())
+            .collect::<Vec<_>>();
+
+        for storage in storages {
+            tokio::spawn(Self::persist_cache_db(
+                storage,
+                registration.mint_url.clone(),
+                new_cache.clone(),
+            ));
+        }
+    }
+
+    /// Fetch keys from mint server via HTTP
+    ///
+    /// Fetches mint info, keysets, and keys from the mint server. Updates cache
+    /// and schedules next refresh. Persists new data to all databases.
+    async fn fetch_from_http(
+        registration: MintRegistration,
+        refresh_scheduler: RefreshScheduler,
+    ) -> Result<(), Error> {
+        debug!(
+            "Fetching keys from mint server for {}",
+            registration.mint_url
+        );
+
+        let http_client = registration
+            .resources
+            .read()
+            .values()
+            .next()
+            .ok_or(Error::IncorrectMint)?
+            .client
+            .clone();
+
+        #[cfg(feature = "auth")]
+        let http_auth_client = registration
+            .resources
+            .read()
+            .values()
+            .next()
+            .ok_or(Error::IncorrectMint)?
+            .auth_client
+            .clone();
+
+        let mint_info = http_client.get_mint_info().await?;
+
+        if let Some(mint_unix_time) = mint_info.time {
+            let current_unix_time = unix_time();
+            if current_unix_time.abs_diff(mint_unix_time) > 30 {
+                tracing::warn!(
+                    "Mint time does match wallet time. Mint: {}, Wallet: {}",
+                    mint_unix_time,
+                    current_unix_time
+                );
+                return Err(Error::MintTimeExceedsTolerance);
+            }
+        }
+
+        let keysets_response = http_client.get_mint_keysets().await?;
+
+        let keysets = keysets_response.keysets;
+
+        #[cfg(feature = "auth")]
+        let keysets = if let Ok(auth_keysets_response) =
+            http_auth_client.get_mint_blind_auth_keysets().await
+        {
+            let mut keysets = keysets;
+            keysets.extend_from_slice(&auth_keysets_response.keysets);
+            keysets
+        } else {
+            keysets
+        };
+
+        let mut new_cache = MintKeyCache::empty();
+
+        for keyset_info in keysets {
+            let arc_keyset = Arc::new(keyset_info.clone());
+            new_cache
+                .keysets_by_id
+                .insert(keyset_info.id, arc_keyset.clone());
+
+            if keyset_info.active && keyset_info.unit == registration.unit {
+                new_cache.active_keysets.push(arc_keyset);
+            }
+
+            if let Ok(keyset) = http_client
+                .get_mint_keyset(keyset_info.id)
+                .await
+                .inspect_err(|e| warn!("Failed to fetch keys for keyset {}: {}", keyset_info.id, e))
+            {
+                let keys = Arc::new(keyset.keys.clone());
+                new_cache.keys_by_id.insert(keyset_info.id, keys);
+            }
+        }
+
+        refresh_scheduler.schedule_refresh(registration.mint_url.clone());
+
+        let old_generation = registration.cache.load().refresh_version;
+        new_cache.mint_info = Some(mint_info);
+        new_cache.refresh_version = old_generation + 1;
+        new_cache.last_refresh = Instant::now();
+
+        debug!(
+            "Refreshed {} keysets and {} keys for {} (generation {})",
+            new_cache.keysets_by_id.len(),
+            new_cache.keys_by_id.len(),
+            registration.mint_url,
+            new_cache.refresh_version
+        );
+
+        let new_cache = Arc::new(new_cache);
+        Self::persist_cache(&registration, new_cache.clone()).await;
+        registration.cache.store(new_cache);
+
+        Ok::<(), Error>(())
+    }
+
+    /// Refresh keys from mint server
+    ///
+    /// Spawns an async task with 60s timeout. HTTP requests are limited to
+    /// MAX_CONCURRENT_HTTP_REQUESTS concurrent requests via semaphore.
+    fn refresh_mint_task(
+        registration: MintRegistration,
+        semaphore: Arc<Semaphore>,
+        refresh_scheduler: RefreshScheduler,
+    ) {
+        tokio::spawn(async move {
+            if registration.cache.load().keysets_by_id.is_empty() {
+                let _ = Self::fetch_mint_info_and_keys_from_db(&registration)
+                    .await
+                    .inspect_err(|e| {
+                        warn!(
+                            "Failed to load keys from storage for {}: {}",
+                            registration.mint_url, e
+                        )
+                    });
+            }
+
+            let Ok(http_permit) = semaphore.acquire().await.inspect_err(|e| {
+                error!(
+                    "Failed to acquire HTTP permit for {}: {}",
+                    registration.mint_url, e
+                );
+            }) else {
+                return;
+            };
+
+            debug!(
+                "Acquired HTTP permit for {} ({} available)",
+                registration.mint_url,
+                semaphore.available_permits()
+            );
+
+            let mint_url = registration.mint_url.clone();
+
+            let timeout = Duration::from_secs(60);
+            let result = tokio::time::timeout(
+                timeout,
+                Self::fetch_from_http(registration, refresh_scheduler),
+            )
+            .await;
+
+            drop(http_permit);
+
+            let _ = result
+                .map_err(|_| Error::Timeout)
+                .and_then(|r| r)
+                .inspect(|_| {
+                    debug!(
+                        "Successfully fetched keys from mint server for {}",
+                        mint_url
+                    );
+                })
+                .inspect_err(|e| match e {
+                    Error::Timeout => {
+                        error!("Timeout fetching keys from mint server for {}", mint_url)
+                    }
+                    _ => {
+                        error!(
+                            "Failed to fetch keys from mint server for {}: {}",
+                            mint_url, e
+                        )
+                    }
+                });
+
+            debug!(
+                "Released HTTP permit for {} ({} available)",
+                mint_url,
+                semaphore.available_permits()
+            );
+        });
+    }
+
+    /// Background refresh loop with message handling
+    ///
+    /// Runs independently and handles refresh messages and periodic updates.
+    /// All refresh operations are spawned as separate tasks with timeouts.
+    async fn refresh_loop(
+        mut rx: mpsc::UnboundedReceiver<RefreshMessage>,
+        mints: Arc<ParkingLotRwLock<HashMap<MintUrl, MintRegistration>>>,
+        refresh_interval: Duration,
+        semaphore: Arc<Semaphore>,
+    ) {
+        let mut interval = tokio::time::interval(Duration::from_secs(1));
+        let refresh_scheduler = RefreshScheduler::new(refresh_interval);
+
+        loop {
+            tokio::select! {
+                Some(msg) = rx.recv() => {
+                    match msg {
+                        RefreshMessage::Stop => {
+                            debug!("Stopping refresh task");
+                            break;
+                        }
+                        RefreshMessage::FetchMint(mint_url) => {
+                            debug!("FetchMint message received for {}", mint_url);
+                            let registration = {
+                                let mints_lock = mints.read();
+                                mints_lock.get(&mint_url).cloned()
+                            };
+
+                            if let Some(reg) = registration {
+                                Self::refresh_mint_task(reg, semaphore.clone(), refresh_scheduler.clone());
+                            } else {
+                                warn!("FetchMint: Mint not registered: {}", mint_url);
+                            }
+                        }
+                    }
+                }
+
+                _ = interval.tick() => {
+                    debug!("Checking for mints due for refresh");
+
+                    let due_mints = refresh_scheduler.get_due_refreshes();
+
+                    if !due_mints.is_empty() {
+                        debug!("Found {} mints due for refresh", due_mints.len());
+                    }
+
+                    for mint_url in due_mints {
+                        let registration = {
+                            let mints_lock = mints.read();
+                            mints_lock.get(&mint_url).cloned()
+                        };
+
+                        if let Some(reg) = registration {
+                            Self::refresh_mint_task(reg, semaphore.clone(), refresh_scheduler.clone());
+                        } else {
+                            warn!("Mint no longer registered: {}", mint_url);
+                        }
+                    }
+                }
+            }
+        }
+
+        debug!("Refresh loop stopped");
+    }
+
+    /// Trigger a refresh and wait for it to complete
+    ///
+    /// Sends a refresh message to the background task and waits up to 2 seconds
+    /// for the cache to be updated with a newer version.
+    pub async fn refresh(&self, mint_url: &MintUrl) -> Result<Vec<KeySetInfo>, Error> {
+        let last_version = {
+            let mints = self.mints.read();
+            let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
+            let cache = registration.cache.load();
+            cache.refresh_version
+        };
+
+        self.send_message(RefreshMessage::FetchMint(mint_url.clone()));
+
+        for _ in 0..200 {
+            if let Some(keysets) = {
+                let mints = self.mints.read();
+                let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
+                let cache = registration.cache.load();
+                if last_version > 0 || cache.refresh_version > 0 {
+                    Some(
+                        cache
+                            .keysets_by_id
+                            .values()
+                            .map(|ks| (**ks).clone())
+                            .collect::<Vec<_>>(),
+                    )
+                } else {
+                    None
+                }
+            } {
+                return Ok(keysets);
+            }
+
+            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+        }
+
+        Err(Error::UnknownKeySet)
+    }
+
+    /// Trigger a refresh for a specific mint (non-blocking)
+    pub fn refresh_now(&self, mint_url: &MintUrl) {
+        self.send_message(RefreshMessage::FetchMint(mint_url.clone()));
+    }
+
+    /// Get number of cached keys for a mint
+    pub fn cached_keys_count(&self, mint_url: &MintUrl) -> usize {
+        let mints = self.mints.read();
+        let Some(registration) = mints.get(mint_url) else {
+            return 0;
+        };
+        registration.cache.load().keys_by_id.len()
+    }
+
+    /// Get number of cached keysets for a mint
+    pub fn cached_keysets_count(&self, mint_url: &MintUrl) -> usize {
+        let mints = self.mints.read();
+        let Some(registration) = mints.get(mint_url) else {
+            return 0;
+        };
+        registration.cache.load().keysets_by_id.len()
+    }
+
+    /// Get number of available HTTP request slots
+    pub fn available_http_slots(&self) -> usize {
+        self.refresh_semaphore.available_permits()
+    }
+
+    /// Invalidate cache for a specific mint
+    pub fn invalidate(&self, mint_url: &MintUrl) {
+        let mints = self.mints.read();
+        if let Some(registration) = mints.get(mint_url) {
+            registration.cache.store(Arc::new(MintKeyCache::empty()));
+            debug!("Cache invalidated for {}", mint_url);
+        }
+    }
+}
+
+impl Drop for KeyManager {
+    fn drop(&mut self) {
+        self.send_message(RefreshMessage::Stop);
+        if let Some(mut task) = self.refresh_task.try_lock() {
+            if let Some(handle) = task.take() {
+                handle.abort();
+            }
+        }
+    }
+}

+ 35 - 95
crates/cdk/src/wallet/keysets.rs

@@ -10,99 +10,48 @@ use crate::{Error, Wallet};
 impl Wallet {
     /// Load keys for mint keyset
     ///
-    /// Returns keys from local database if they are already stored.
-    /// If keys are not found locally, goes online to query the mint for the keyset and stores the [`Keys`] in local database.
+    /// Returns keys from KeyManager cache if available.
+    /// If keys are not cached, triggers a refresh and waits briefly before checking again.
     #[instrument(skip(self))]
     pub async fn load_keyset_keys(&self, keyset_id: Id) -> Result<Keys, Error> {
-        let keys = if let Some(keys) = self.localstore.get_keys(&keyset_id).await? {
-            keys
-        } else {
-            tracing::debug!(
-                "Keyset {} not in db fetching from mint {}",
-                keyset_id,
-                self.mint_url
-            );
-
-            let keys = self.client.get_mint_keyset(keyset_id).await?;
-
-            keys.verify_id()?;
-
-            self.localstore.add_keys(keys.clone()).await?;
-
-            keys.keys
-        };
-
-        Ok(keys)
+        Ok((*self
+            .key_manager
+            .get_keys(&self.mint_url, &keyset_id)
+            .await?)
+            .clone())
     }
 
-    /// Get keysets from local database or go online if missing
+    /// Get keysets from KeyManager cache or trigger refresh if missing
     ///
-    /// First checks the local database for cached keysets. If keysets are not found locally,
-    /// goes online to refresh keysets from the mint and updates the local database.
+    /// First checks the KeyManager cache for keysets. If keysets are not cached,
+    /// triggers a refresh from the mint and waits briefly before checking again.
     /// This is the main method for getting keysets in token operations that can work offline
     /// but will fall back to online if needed.
     #[instrument(skip(self))]
     pub async fn load_mint_keysets(&self) -> Result<Vec<KeySetInfo>, Error> {
-        match self
-            .localstore
-            .get_mint_keysets(self.mint_url.clone())
-            .await?
-        {
-            Some(keysets_info) => Ok(keysets_info),
-            None => {
-                // If we don't have any keysets, fetch them from the mint
-                let keysets = self.refresh_keysets().await?;
-                Ok(keysets)
-            }
-        }
+        self.key_manager.get_keysets(&self.mint_url).await
     }
 
-    /// Get keysets from local database only - pure offline operation
+    /// Get keysets from KeyManager cache only - pure offline operation
     ///
-    /// Only checks the local database for cached keysets. If keysets are not found locally,
+    /// Only checks the KeyManager cache for keysets. If keysets are not cached,
     /// returns an error without going online. This is used for operations that must remain
     /// offline and rely on previously cached keyset data.
     #[instrument(skip(self))]
     pub async fn get_mint_keysets(&self) -> Result<Vec<KeySetInfo>, Error> {
-        match self
-            .localstore
-            .get_mint_keysets(self.mint_url.clone())
-            .await?
-        {
-            Some(keysets_info) => Ok(keysets_info),
-            None => Err(Error::UnknownKeySet),
-        }
+        self.key_manager.get_keysets(&self.mint_url).await
     }
 
     /// Refresh keysets by fetching the latest from mint - always goes online
     ///
-    /// This method always goes online to fetch the latest keyset information from the mint.
-    /// It updates the local database with the fetched keysets and ensures we have keys
-    /// for all active keysets. This is used when operations need the most up-to-date
-    /// keyset information and are willing to go online.
+    /// This method triggers a KeyManager refresh which fetches the latest keyset
+    /// information from the mint. The KeyManager handles updating the cache and database.
+    /// This is used when operations need the most up-to-date keyset information.
     #[instrument(skip(self))]
     pub async fn refresh_keysets(&self) -> Result<KeySetInfos, Error> {
-        tracing::debug!("Refreshing keysets and ensuring we have keys");
-        let _ = self.fetch_mint_info().await?;
-
-        // Fetch all current keysets from mint
-        let keysets_response = self.client.get_mint_keysets().await?;
-        let all_keysets = keysets_response.keysets;
+        tracing::debug!("Refreshing keysets via KeyManager");
 
-        // Update local storage with keyset info
-        self.localstore
-            .add_mint_keysets(self.mint_url.clone(), all_keysets.clone())
-            .await?;
-
-        // Filter for active keysets matching our unit
-        let keysets: KeySetInfos = all_keysets.unit(self.unit.clone()).cloned().collect();
-
-        // Ensure we have keys for all active keysets
-        for keyset in &keysets {
-            self.load_keyset_keys(keyset.id).await?;
-        }
-
-        Ok(keysets)
+        self.key_manager.refresh(&self.mint_url).await
     }
 
     /// Get the active keyset with the lowest fees - always goes online
@@ -120,47 +69,38 @@ impl Wallet {
             .ok_or(Error::NoActiveKeyset)
     }
 
-    /// Get the active keyset with the lowest fees from local database only - offline operation
+    /// Get the active keyset with the lowest fees from KeyManager cache - offline operation
     ///
-    /// Returns the active keyset with minimum input fees from cached keysets in the local database.
-    /// This is an offline operation that does not contact the mint. If no keysets are found locally,
+    /// Returns the active keyset with minimum input fees from the KeyManager cache.
+    /// This is an offline operation that does not contact the mint. If no keysets are cached,
     /// returns an error. Use this for offline operations or when you want to avoid network calls.
     #[instrument(skip(self))]
     pub async fn get_active_keyset(&self) -> Result<KeySetInfo, Error> {
-        match self
-            .localstore
-            .get_mint_keysets(self.mint_url.clone())
-            .await?
-        {
-            Some(keysets_info) => keysets_info
-                .into_iter()
-                .min_by_key(|k| k.input_fee_ppk)
-                .ok_or(Error::NoActiveKeyset),
-            None => Err(Error::UnknownKeySet),
-        }
+        let active_keysets = self.key_manager.get_active_keysets(&self.mint_url);
+
+        active_keysets
+            .into_iter()
+            .min_by_key(|k| k.input_fee_ppk)
+            .map(|ks| (*ks).clone())
+            .ok_or(Error::NoActiveKeyset)
     }
 
-    /// Get keyset fees and amounts for mint from local database only - offline operation
+    /// Get keyset fees and amounts for mint from KeyManager cache - offline operation
     ///
     /// Returns a HashMap of keyset IDs to their input fee rates (per-proof-per-thousand)
-    /// from cached keysets in the local database. This is an offline operation that does
-    /// not contact the mint. If no keysets are found locally, returns an error.
+    /// from the KeyManager cache. This is an offline operation that does not contact the mint.
+    /// If no keysets are cached, returns an error.
     pub async fn get_keyset_fees_and_amounts(&self) -> Result<KeysetFeeAndAmounts, Error> {
-        let keysets = self
-            .localstore
-            .get_mint_keysets(self.mint_url.clone())
-            .await?
-            .ok_or(Error::UnknownKeySet)?;
+        let keysets = self.key_manager.get_keysets(&self.mint_url).await?;
 
         let mut fees = HashMap::new();
         for keyset in keysets {
+            let keys = self.load_keyset_keys(keyset.id).await?;
             fees.insert(
                 keyset.id,
                 (
                     keyset.input_fee_ppk,
-                    self.load_keyset_keys(keyset.id)
-                        .await?
-                        .iter()
+                    keys.iter()
                         .map(|(amount, _)| amount.to_u64())
                         .collect::<Vec<_>>(),
                 )

+ 11 - 7
crates/cdk/src/wallet/mod.rs

@@ -9,6 +9,7 @@ use cdk_common::amount::FeeAndAmounts;
 use cdk_common::database::{self, WalletDatabase};
 use cdk_common::subscription::WalletParams;
 use getrandom::getrandom;
+use key_manager::KeySubscription;
 use subscription::{ActiveSubscription, SubscriptionManager};
 #[cfg(feature = "auth")]
 use tokio::sync::RwLock;
@@ -39,6 +40,7 @@ pub use mint_connector::TorHttpClient;
 mod balance;
 mod builder;
 mod issue;
+mod key_manager;
 mod keysets;
 mod melt;
 mod mint_connector;
@@ -86,8 +88,11 @@ pub struct Wallet {
     pub unit: CurrencyUnit,
     /// Storage backend
     pub localstore: Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
+    /// Centralized key manager (lock-free cached key access)
+    pub key_manager: Arc<key_manager::KeyManager>,
     /// The targeted amount of proofs to have at each size
     pub target_proof_count: usize,
+    _key_sub_id: Arc<KeySubscription>,
     #[cfg(feature = "auth")]
     auth_wallet: Arc<RwLock<Option<AuthWallet>>>,
     seed: [u8; 64],
@@ -220,10 +225,9 @@ impl Wallet {
 
         for keyset_id in proofs_per_keyset.keys() {
             let mint_keyset_info = self
-                .localstore
-                .get_keyset_by_id(keyset_id)
-                .await?
-                .ok_or(Error::UnknownKeySet)?;
+                .key_manager
+                .get_keyset_by_id(&self.mint_url, keyset_id)
+                .await?;
             fee_per_keyset.insert(*keyset_id, mint_keyset_info.input_fee_ppk);
         }
 
@@ -236,10 +240,9 @@ impl Wallet {
     #[instrument(skip_all)]
     pub async fn get_keyset_count_fee(&self, keyset_id: &Id, count: u64) -> Result<Amount, Error> {
         let input_fee_ppk = self
-            .localstore
-            .get_keyset_by_id(keyset_id)
+            .key_manager
+            .get_keyset_by_id(&self.mint_url, keyset_id)
             .await?
-            .ok_or(Error::UnknownKeySet)?
             .input_fee_ppk;
 
         let fee = (input_fee_ppk * count).div_ceil(1000);
@@ -307,6 +310,7 @@ impl Wallet {
                                 self.mint_url.clone(),
                                 None,
                                 self.localstore.clone(),
+                                self.key_manager.clone(),
                                 mint_info.protected_endpoints(),
                                 oidc_client,
                             );

+ 3 - 4
crates/cdk/src/wallet/swap.rs

@@ -50,10 +50,9 @@ impl Wallet {
             .await?;
 
         let active_keys = self
-            .localstore
-            .get_keys(&active_keyset_id)
-            .await?
-            .ok_or(Error::NoActiveKeyset)?;
+            .key_manager
+            .get_keys(&self.mint_url, &active_keyset_id)
+            .await?;
 
         let post_swap_proofs = construct_proofs(
             swap_response.signatures,