Cesar Rodas 3 săptămâni în urmă
părinte
comite
dff6217c1d

+ 4 - 6
crates/cdk/src/wallet/key_manager/mod.rs

@@ -27,7 +27,7 @@ use arc_swap::ArcSwap;
 use cdk_common::database::{self, WalletDatabase};
 use cdk_common::mint_url::MintUrl;
 use cdk_common::nuts::{KeySetInfo, Keys};
-use cdk_common::parking_lot::{Mutex as ParkingLotMutex, RwLock as ParkingLotRwLock};
+use cdk_common::parking_lot::RwLock as ParkingLotRwLock;
 use cdk_common::task::spawn;
 use cdk_common::MintInfo;
 use once_cell::sync::Lazy;
@@ -184,6 +184,7 @@ impl KeyManager {
                 // Reuse existing worker
                 tracing::debug!("Reusing existing worker for {}", mint_url);
                 existing_worker.storages.write().push(storage.clone());
+                let _ = existing_worker.tx.send(MessageToWorker::SyncDb(storage));
                 existing_worker.clone()
             } else {
                 let mut registry = WORKER_REGISTRY.write();
@@ -234,7 +235,7 @@ impl KeyManager {
                         });
 
                         // Trigger initial sync
-                        let _ = tx.send(MessageToWorker::SyncMint);
+                        let _ = tx.send(MessageToWorker::FetchMint);
 
                         worker
                     })
@@ -272,7 +273,6 @@ impl KeyManager {
                     .cloned()
                     .ok_or(Error::UnknownKeySet);
             }
-            self.send_message(MessageToWorker::SyncMint);
             tokio::time::sleep(RETRY_SLEEP).await;
         }
 
@@ -290,7 +290,6 @@ impl KeyManager {
                     .cloned()
                     .ok_or(Error::UnknownKeySet);
             }
-            self.send_message(MessageToWorker::SyncMint);
             tokio::time::sleep(RETRY_SLEEP).await;
         }
 
@@ -316,7 +315,6 @@ impl KeyManager {
                 };
             }
 
-            self.send_message(MessageToWorker::SyncMint);
             tokio::time::sleep(RETRY_SLEEP).await;
         }
 
@@ -331,7 +329,6 @@ impl KeyManager {
             if cache.is_ready {
                 return Ok(cache.active_keysets.clone());
             }
-            self.send_message(MessageToWorker::SyncMint);
             tokio::time::sleep(RETRY_SLEEP).await;
         }
 
@@ -351,6 +348,7 @@ impl KeyManager {
         for _ in 0..MAX_RETRY {
             if let Some(keysets) = {
                 let cache = shared_cache.load();
+                println!("{:?}", cache);
                 if last_version > 0 || cache.refresh_version > 0 {
                     Some(
                         cache

+ 20 - 64
crates/cdk/src/wallet/key_manager/worker.rs

@@ -4,12 +4,13 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use arc_swap::ArcSwap;
-use cdk_common::database::WalletDatabase;
+use cdk_common::database::{self, WalletDatabase};
 use cdk_common::mint_url::MintUrl;
 use cdk_common::parking_lot::RwLock as ParkingLotRwLock;
 use cdk_common::task::spawn;
 use cdk_common::util::unix_time;
 use cdk_common::KeySet;
+use tokio::time::sleep;
 
 use super::scheduler::RefreshScheduler;
 use super::{KeyManager, MintKeyCache};
@@ -26,10 +27,8 @@ pub(super) enum MessageToWorker {
     /// Stop the refresh task
     Stop,
 
-    /// Make sure the mint is loaded, from either any localstore or remotely.
     ///
-    /// This function also makes sure all storages have a copy of the mint info and keys
-    SyncMint,
+    SyncDb(Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>),
 
     /// Fetch keys from the mint immediately
     FetchMint,
@@ -148,8 +147,10 @@ impl KeyManager {
             }
         }
 
-        storage_cache.refresh_version += 1;
-        storage_cache.is_ready = true;
+        if storage_cache.mint_info.is_some() {
+            storage_cache.refresh_version += 1;
+            storage_cache.is_ready = true;
+        }
 
         tracing::debug!(
             "Loaded {} keys from storage for {}",
@@ -345,53 +346,6 @@ impl KeyManager {
         Ok::<(), Error>(())
     }
 
-    pub(super) fn sync_mint_task(
-        mint_url: MintUrl,
-        client: Arc<dyn MintConnector + Send + Sync>,
-        #[cfg(feature = "auth")] auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
-        storages: Arc<
-            ParkingLotRwLock<
-                Vec<Arc<dyn WalletDatabase<Err = cdk_common::database::Error> + Send + Sync>>,
-            >,
-        >,
-        cache: Arc<ArcSwap<MintKeyCache>>,
-        refresh_scheduler: RefreshScheduler,
-    ) {
-        spawn(async move {
-            if !cache.load().is_ready {
-                if let Ok(new_cache) = Self::fetch_mint_info_and_keys_from_db(&mint_url, &storages)
-                    .await
-                    .inspect_err(|e| {
-                        tracing::warn!("Failed to load keys from storage for {}: {}", mint_url, e)
-                    })
-                {
-                    let new_cache = Arc::new(new_cache);
-                    Self::persist_cache(&storages, mint_url.clone(), new_cache.clone()).await;
-                    cache.store(new_cache);
-                }
-            }
-
-            if !cache.load().is_ready {
-                let _ = tokio::time::timeout(
-                    Duration::from_secs(60),
-                    Self::fetch_from_http(
-                        mint_url.clone(),
-                        client,
-                        #[cfg(feature = "auth")]
-                        auth_client,
-                        storages,
-                        cache,
-                        refresh_scheduler,
-                    ),
-                )
-                .await
-                .inspect_err(|e| {
-                    tracing::warn!("Failed to fetch keys for {} with error {}", mint_url, e)
-                });
-            }
-        });
-    }
-
     /// Refresh keys from mint server
     ///
     /// Spawns an async task with 60s timeout.
@@ -488,17 +442,19 @@ impl KeyManager {
                             tracing::debug!("Stopping refresh task for {}", mint_url);
                             break;
                         }
-                        MessageToWorker::SyncMint => {
-                            tracing::debug!("Sync mint {}", mint_url);
-                            Self::sync_mint_task(
-                                mint_url.clone(),
-                                client.clone(),
-                                #[cfg(feature = "auth")]
-                                auth_client.clone(),
-                                storages.clone(),
-                                cache.clone(),
-                                refresh_scheduler.clone(),
-                            );
+                        MessageToWorker::SyncDb(db) => {
+                            let cache_for_spawn = cache.clone();
+                            let mint_for_spawn = mint_url.clone();
+                            tokio::spawn(async move {
+                                loop {
+                                    let cache = cache_for_spawn.load();
+                                    if cache.is_ready {
+                                        Self::persist_cache_db(db, mint_for_spawn, cache.clone()).await;
+                                        break;
+                                    }
+                                    sleep(Duration::from_millis(100)).await;
+                                }
+                            });
                         }
                         MessageToWorker::FetchMint => {
                             tracing::debug!("FetchMint message received for {}", mint_url);