Cesar Rodas 3 weeks ago
parent
commit
2053872a09
2 changed files with 50 additions and 36 deletions
  1. 29 16
      crates/cdk/src/wallet/key_manager/mod.rs
  2. 21 20
      crates/cdk/src/wallet/key_manager/worker.rs

+ 29 - 16
crates/cdk/src/wallet/key_manager/mod.rs

@@ -33,7 +33,6 @@ use cdk_common::task::spawn;
 use cdk_common::MintInfo;
 use once_cell::sync::Lazy;
 use tokio::sync::mpsc;
-use tokio::task::JoinHandle;
 use worker::{MessageToWorker, SharedWorker};
 
 mod worker;
@@ -147,10 +146,12 @@ impl KeyManager {
                 // Reuse existing worker
                 tracing::debug!("Reusing existing worker for {}", mint_url);
                 existing_worker.storages.write().push(storage.clone());
-                let _ = existing_worker
+                existing_worker
                     .tx
                     .try_send(MessageToWorker::SyncDb(storage))
-                    .inspect_err(|e| tracing::error!("Failed to send SyncDb message for {}: {}", mint_url, e))
+                    .inspect_err(|e| {
+                        tracing::error!("Failed to send SyncDb message for {}: {}", mint_url, e)
+                    })
                     .expect("send SyncDb");
                 existing_worker.clone()
             } else {
@@ -191,22 +192,23 @@ impl KeyManager {
                         };
                         //
                         // Trigger initial sync
-                        let _ = tx.try_send(MessageToWorker::FetchMint)
-                            .inspect_err(|e| tracing::error!("Failed to send initial FetchMint message for {}: {}", mint_url, e))
+                        tx.try_send(MessageToWorker::FetchMint)
+                            .inspect_err(|e| {
+                                tracing::error!(
+                                    "Failed to send initial FetchMint message for {}: {}",
+                                    mint_url,
+                                    e
+                                )
+                            })
                             .expect("send initial FetchMint");
 
-                        let worker = Arc::new(SharedWorker {
+                        Arc::new(SharedWorker {
                             mint_url: mint_url.clone(),
-                            client: client.clone(),
-                            #[cfg(feature = "auth")]
-                            auth_client: auth_client.clone(),
                             storages,
                             cache,
                             tx,
                             task: Some(task),
-                        });
-
-                        worker
+                        })
                     })
                     .clone()
             }
@@ -221,12 +223,23 @@ impl KeyManager {
 
     /// Send a message to the background refresh task
     fn send_message(&self, msg: MessageToWorker) {
-        let _ = self
-            .worker
+        self.worker
             .tx
             .try_send(msg)
-            .inspect_err(|e| tracing::error!("Failed to send message to refresh task for {}: {}", self.mint_url, e))
-            .expect(&format!("send {} (closed: {})", self.mint_url, self.worker.tx.is_closed()));
+            .inspect_err(|e| {
+                tracing::error!(
+                    "Failed to send message to refresh task for {}: {}",
+                    self.mint_url,
+                    e
+                )
+            })
+            .unwrap_or_else(|_| {
+                panic!(
+                    "send {} (closed: {})",
+                    self.mint_url,
+                    self.worker.tx.is_closed()
+                )
+            });
     }
 
     /// Get keys for a keyset (cache-first with automatic refresh)

+ 21 - 20
crates/cdk/src/wallet/key_manager/worker.rs

@@ -47,12 +47,6 @@ pub(super) struct SharedWorker {
     /// Mint URL
     pub(super) mint_url: MintUrl,
 
-    /// Client for HTTP requests (shared across all instances)
-    pub(super) client: Arc<dyn MintConnector + Send + Sync>,
-
-    #[cfg(feature = "auth")]
-    pub(super) auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
-
     /// All storages registered for this mint
     pub(super) storages: StorageList,
 
@@ -181,7 +175,7 @@ async fn load_cache_from_storages(
         }
 
         // Collect unique keys from all storages
-        for (id, _keyset_info) in &cache.keysets_by_id {
+        for id in cache.keysets_by_id.keys() {
             if !cache.keys_by_id.contains_key(id) {
                 if let Some(keys) = storage.get_keys(id).await? {
                     cache.keys_by_id.insert(*id, Arc::new(keys));
@@ -260,13 +254,15 @@ async fn write_cache_to_storage(
         let _ = storage
             .add_mint(mint_url.clone(), Some(mint_info.clone()))
             .await
-            .inspect_err(|e| {
-                tracing::warn!("Failed to persist mint_info for {}: {}", mint_url, e)
-            });
+            .inspect_err(|e| tracing::warn!("Failed to persist mint_info for {}: {}", mint_url, e));
     }
 
     // Write keysets
-    let keysets: Vec<_> = cache.keysets_by_id.values().map(|ks| (**ks).clone()).collect();
+    let keysets: Vec<_> = cache
+        .keysets_by_id
+        .values()
+        .map(|ks| (**ks).clone())
+        .collect();
     let _ = storage
         .add_mint_keysets(mint_url.clone(), keysets)
         .await
@@ -378,15 +374,15 @@ pub(super) async fn fetch_mint_data_from_http(
 
         // Load keyset keys (try database first, then HTTP)
         if let Ok(keyset) = load_keyset_from_db_or_http(mint_url, client, storages, &keyset_info.id)
-                .await
-                .inspect_err(|e| {
-                    tracing::warn!(
-                        "Failed to load keyset {} for {}: {}",
-                        keyset_info.id,
-                        mint_url,
-                        e
-                    )
-                })
+            .await
+            .inspect_err(|e| {
+                tracing::warn!(
+                    "Failed to load keyset {} for {}: {}",
+                    keyset_info.id,
+                    mint_url,
+                    e
+                )
+            })
         {
             new_cache
                 .keys_by_id
@@ -487,6 +483,8 @@ pub(super) async fn refresh_loop(
         cache.clone(),
     );
 
+    println!("begin refresh_loop");
+
     loop {
         tokio::select! {
             Some(msg) = rx.recv() => {
@@ -538,9 +536,12 @@ pub(super) async fn refresh_loop(
                     storages.clone(),
                     cache.clone(),
                 );
+            } else => {
+                break;
             }
         }
     }
 
     tracing::debug!("Refresh loop stopped for {}", mint_url);
+    println!("end refresh_loop");
 }