Cesar Rodas 3 周之前
父节点
当前提交
1076984073
共有 2 个文件被更改,包括 35 次插入35 次删除
  1. 22 19
      crates/cdk/src/wallet/key_manager/mod.rs
  2. 13 16
      crates/cdk/src/wallet/key_manager/worker.rs

+ 22 - 19
crates/cdk/src/wallet/key_manager/mod.rs

@@ -20,6 +20,7 @@
 //! ```
 
 use std::collections::HashMap;
+use std::fmt::Debug;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
@@ -38,8 +39,6 @@ use worker::MessageToWorker;
 mod scheduler;
 mod worker;
 
-#[cfg(feature = "auth")]
-use super::AuthMintConnector;
 use crate::nuts::Id;
 #[cfg(feature = "auth")]
 use crate::wallet::AuthHttpClient;
@@ -97,17 +96,10 @@ impl MintKeyCache {
 /// Shared worker state for a mint
 ///
 /// One worker per mint_url, shared across all KeyManager instances for that mint.
-#[allow(dead_code)]
 struct SharedWorker {
     /// Mint URL
     mint_url: MintUrl,
 
-    /// Client for HTTP requests (shared across all instances)
-    client: Arc<dyn MintConnector + Send + Sync>,
-
-    #[cfg(feature = "auth")]
-    auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
-
     /// All storages registered for this mint
     storages:
         Arc<ParkingLotRwLock<Vec<Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>>>>,
@@ -122,6 +114,17 @@ struct SharedWorker {
     task: Option<JoinHandle<()>>,
 }
 
+impl Debug for SharedWorker {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SharedWorker")
+            .field("mint_url", &self.mint_url)
+            .field("storages", &self.storages.read().len().to_string())
+            .field("tx", &self.tx.is_closed())
+            .field("cache", &self.cache)
+            .finish()
+    }
+}
+
 impl Drop for SharedWorker {
     fn drop(&mut self) {
         tracing::debug!("Dropping SharedWorker for {}", self.mint_url);
@@ -187,7 +190,10 @@ impl KeyManager {
                 // Reuse existing worker
                 tracing::debug!("Reusing existing worker for {}", mint_url);
                 existing_worker.storages.write().push(storage.clone());
-                let _ = existing_worker.tx.send(MessageToWorker::SyncDb(storage));
+                let _ = existing_worker
+                    .tx
+                    .send(MessageToWorker::SyncDb(storage))
+                    .expect("sss");
                 existing_worker.clone()
             } else {
                 let mut registry = WORKER_REGISTRY.write();
@@ -225,21 +231,18 @@ impl KeyManager {
                                 refresh_interval,
                             ))
                         };
+                        //
+                        // Trigger initial sync
+                        let _ = tx.send(MessageToWorker::FetchMint).expect("send");
 
                         let worker = Arc::new(SharedWorker {
                             mint_url: mint_url.clone(),
-                            client,
-                            #[cfg(feature = "auth")]
-                            auth_client,
                             storages,
                             cache,
-                            tx: tx.clone(),
+                            tx,
                             task: Some(task),
                         });
 
-                        // Trigger initial sync
-                        let _ = tx.send(MessageToWorker::FetchMint);
-
                         worker
                     })
                     .clone()
@@ -259,7 +262,8 @@ impl KeyManager {
             .worker
             .tx
             .send(msg)
-            .inspect_err(|e| tracing::error!("Failed to send message to refresh task: {}", e));
+            .inspect_err(|e| tracing::error!("Failed to send message to refresh task: {}", e))
+            .expect(&format!("send {}", self.worker.tx.is_closed()));
     }
 
     /// Get keys for a keyset (cache-first with automatic refresh)
@@ -351,7 +355,6 @@ impl KeyManager {
         for _ in 0..MAX_RETRY {
             if let Some(keysets) = {
                 let cache = shared_cache.load();
-                println!("{:?}", cache);
                 if last_version > 0 || cache.refresh_version > 0 {
                     Some(
                         cache

+ 13 - 16
crates/cdk/src/wallet/key_manager/worker.rs

@@ -146,11 +146,7 @@ impl KeyManager {
             }
         }
 
-        // Mark as ready only if we found mint info
-        if cache.mint_info.is_some() {
-            cache.refresh_version += 1;
-            cache.is_ready = true;
-        }
+        Self::finalize_cache(&mut cache, 0);
 
         tracing::debug!(
             "Loaded {} keys from storage for {}",
@@ -166,7 +162,7 @@ impl KeyManager {
     /// Sets the cache as ready and updates metadata.
     fn finalize_cache(cache: &mut MintKeyCache, previous_version: u64) {
         cache.refresh_version = previous_version + 1;
-        cache.is_ready = true;
+        cache.is_ready = cache.mint_info.is_some();
         cache.last_refresh = std::time::Instant::now();
     }
 
@@ -185,7 +181,9 @@ impl KeyManager {
             }
 
             // Get keyset info
-            let Some(keyset_info) = cache.keysets_by_id.get(keyset_id) else {
+            let keyset_info = if let Some(keyset_info) = cache.keysets_by_id.get(keyset_id) {
+                keyset_info
+            } else {
                 tracing::warn!("Missing keyset info for {}", keyset_id);
                 continue;
             };
@@ -198,14 +196,14 @@ impl KeyManager {
                 keys: (**keys).clone(),
             };
 
-            if let Err(e) = storage.add_keys(keyset).await {
+            let _ = storage.add_keys(keyset).await.inspect_err(|e| {
                 tracing::warn!(
                     "Failed to write keys for {} to {}: {}",
                     keyset_id,
                     mint_url,
                     e
-                );
-            }
+                )
+            });
         }
     }
 
@@ -370,8 +368,9 @@ impl KeyManager {
                         )
                     })
             {
-                let keys = Arc::new(keyset.keys.clone());
-                new_cache.keys_by_id.insert(keyset_info.id, keys);
+                new_cache
+                    .keys_by_id
+                    .insert(keyset_info.id, Arc::new(keyset.keys));
             }
         }
 
@@ -413,9 +412,7 @@ impl KeyManager {
     ) {
         spawn(async move {
             // Try loading from storage first
-            if Self::try_load_cache_from_storages(&mint_url, &storages, &cache).await {
-                return;
-            }
+            Self::try_load_cache_from_storages(&mint_url, &storages, &cache).await;
 
             // If still not ready, fetch from HTTP
             let result = tokio::time::timeout(
@@ -472,7 +469,7 @@ impl KeyManager {
                 Some(msg) = rx.recv() => {
                     match msg {
                         MessageToWorker::Stop => {
-                            tracing::debug!("Stopping refresh task for {}", mint_url);
+                            tracing::debug!("Stopping loop {}", mint_url);
                             break;
                         }
                         MessageToWorker::SyncDb(db) => {