|
|
@@ -3,19 +3,23 @@
|
|
|
use std::sync::Arc;
|
|
|
use std::time::Duration;
|
|
|
|
|
|
+use arc_swap::ArcSwap;
|
|
|
use cdk_common::database::WalletDatabase;
|
|
|
use cdk_common::mint_url::MintUrl;
|
|
|
+use cdk_common::parking_lot::RwLock as ParkingLotRwLock;
|
|
|
use cdk_common::task::spawn;
|
|
|
use cdk_common::util::unix_time;
|
|
|
use cdk_common::KeySet;
|
|
|
-use tokio::sync::Semaphore;
|
|
|
-use tracing::{debug, error, warn};
|
|
|
|
|
|
use super::scheduler::RefreshScheduler;
|
|
|
-use super::{KeyManager, MintKeyCache, MintRegistration, Mints};
|
|
|
+use super::{KeyManager, MintKeyCache};
|
|
|
use crate::nuts::Id;
|
|
|
+use crate::wallet::MintConnector;
|
|
|
use crate::Error;
|
|
|
|
|
|
+#[cfg(feature = "auth")]
|
|
|
+use crate::wallet::AuthMintConnector;
|
|
|
+
|
|
|
/// Messages for the background refresh task
|
|
|
#[derive(Debug, Clone)]
|
|
|
pub(super) enum MessageToWorker {
|
|
|
@@ -24,36 +28,34 @@ pub(super) enum MessageToWorker {
|
|
|
|
|
|
/// Make sure the mint is loaded, from either any localstore or remotely.
|
|
|
///
|
|
|
- /// This function also make sure all stores have a copy of the mint info and keys
|
|
|
- SyncMint(MintUrl),
|
|
|
+ /// This function also makes sure all storages have a copy of the mint info and keys
|
|
|
+ SyncMint,
|
|
|
|
|
|
- /// Fetch keys for a specific mint immediately
|
|
|
- FetchMint(MintUrl),
|
|
|
+ /// Fetch keys from the mint immediately
|
|
|
+ FetchMint,
|
|
|
}
|
|
|
|
|
|
impl KeyManager {
|
|
|
/// Load a specific keyset from database or HTTP
|
|
|
///
|
|
|
- /// First checks all registered databases for the keyset. If not found,
|
|
|
+ /// First checks all databases for the keyset. If not found,
|
|
|
/// fetches from the mint server via HTTP and persists to all databases.
|
|
|
pub(super) async fn load_keyset_from_db_or_http(
|
|
|
- registration: &MintRegistration,
|
|
|
+ mint_url: &MintUrl,
|
|
|
+ storages: &Arc<
|
|
|
+ ParkingLotRwLock<
|
|
|
+ Vec<Arc<dyn WalletDatabase<Err = cdk_common::database::Error> + Send + Sync>>,
|
|
|
+ >,
|
|
|
+ >,
|
|
|
+ client: &Arc<dyn MintConnector + Send + Sync>,
|
|
|
keyset_id: &Id,
|
|
|
) -> Result<KeySet, Error> {
|
|
|
- let storages = registration
|
|
|
- .resources
|
|
|
- .read()
|
|
|
- .values()
|
|
|
- .map(|resource| resource.storage.clone())
|
|
|
- .collect::<Vec<_>>();
|
|
|
-
|
|
|
- // Try database first
|
|
|
- for storage in &storages {
|
|
|
+ let storages_list = storages.read().clone();
|
|
|
+
|
|
|
+ // Try all databases first
|
|
|
+ for storage in &storages_list {
|
|
|
if let Some(keys) = storage.get_keys(keyset_id).await? {
|
|
|
- debug!(
|
|
|
- "Loaded keyset {} from database for {}",
|
|
|
- keyset_id, registration.mint_url
|
|
|
- );
|
|
|
+ tracing::debug!("Loaded keyset {} from database for {}", keyset_id, mint_url);
|
|
|
|
|
|
// Get keyset info to construct KeySet
|
|
|
if let Some(keyset_info) = storage.get_keyset_by_id(keyset_id).await? {
|
|
|
@@ -67,71 +69,59 @@ impl KeyManager {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Not in database, fetch from HTTP
|
|
|
- debug!(
|
|
|
+ // Not in any database, fetch from HTTP
|
|
|
+ tracing::debug!(
|
|
|
"Keyset {} not in database, fetching from mint server for {}",
|
|
|
- keyset_id, registration.mint_url
|
|
|
+ keyset_id,
|
|
|
+ mint_url
|
|
|
);
|
|
|
|
|
|
- let http_client = registration
|
|
|
- .resources
|
|
|
- .read()
|
|
|
- .values()
|
|
|
- .next()
|
|
|
- .ok_or(Error::IncorrectMint)?
|
|
|
- .client
|
|
|
- .clone();
|
|
|
-
|
|
|
- let keyset = http_client.get_mint_keyset(*keyset_id).await?;
|
|
|
+ let keyset = client.get_mint_keyset(*keyset_id).await?;
|
|
|
keyset.verify_id()?;
|
|
|
|
|
|
// Persist to all databases
|
|
|
- for storage in &storages {
|
|
|
+ for storage in &storages_list {
|
|
|
let _ = storage.add_keys(keyset.clone()).await.inspect_err(|e| {
|
|
|
- warn!(
|
|
|
+ tracing::warn!(
|
|
|
"Failed to persist keyset {} for {}: {}",
|
|
|
- keyset_id, registration.mint_url, e
|
|
|
+ keyset_id,
|
|
|
+ mint_url,
|
|
|
+ e
|
|
|
)
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- debug!(
|
|
|
- "Loaded keyset {} from HTTP for {}",
|
|
|
- keyset_id, registration.mint_url
|
|
|
- );
|
|
|
+ tracing::debug!("Loaded keyset {} from HTTP for {}", keyset_id, mint_url);
|
|
|
|
|
|
Ok(keyset)
|
|
|
}
|
|
|
|
|
|
- /// Load mint info and keys from all registered databases
|
|
|
+ /// Load mint info and keys from all databases
|
|
|
///
|
|
|
- /// Iterates through all storage backends and loads keysets and keys into cache.
|
|
|
+ /// Iterates through all storages and loads keysets and keys into cache.
|
|
|
/// This is called on first access when cache is empty.
|
|
|
pub(super) async fn fetch_mint_info_and_keys_from_db(
|
|
|
- registration: &MintRegistration,
|
|
|
- ) -> Result<(), Error> {
|
|
|
- debug!(
|
|
|
- "Cache empty, loading from storage first for {}",
|
|
|
- registration.mint_url
|
|
|
- );
|
|
|
+ mint_url: &MintUrl,
|
|
|
+ storages: &Arc<
|
|
|
+ ParkingLotRwLock<
|
|
|
+ Vec<Arc<dyn WalletDatabase<Err = cdk_common::database::Error> + Send + Sync>>,
|
|
|
+ >,
|
|
|
+ >,
|
|
|
+ ) -> Result<MintKeyCache, Error> {
|
|
|
+ tracing::debug!("Cache empty, loading from storage first for {}", mint_url);
|
|
|
|
|
|
let mut storage_cache = MintKeyCache::empty();
|
|
|
- let storages = registration
|
|
|
- .resources
|
|
|
- .read()
|
|
|
- .values()
|
|
|
- .map(|resource| resource.storage.clone())
|
|
|
- .collect::<Vec<_>>();
|
|
|
-
|
|
|
- for storage in storages {
|
|
|
+ let storages_list = storages.read().clone();
|
|
|
+
|
|
|
+ for storage in storages_list {
|
|
|
if storage_cache.mint_info.is_none() {
|
|
|
- storage_cache.mint_info = storage.get_mint(registration.mint_url.clone()).await?;
|
|
|
+ storage_cache.mint_info = storage.get_mint(mint_url.clone()).await?;
|
|
|
}
|
|
|
|
|
|
for keyset in storage
|
|
|
- .get_mint_keysets(registration.mint_url.clone())
|
|
|
+ .get_mint_keysets(mint_url.clone())
|
|
|
.await?
|
|
|
- .ok_or(Error::UnknownKeySet)?
|
|
|
+ .unwrap_or_default()
|
|
|
{
|
|
|
if storage_cache.keysets_by_id.contains_key(&keyset.id) {
|
|
|
continue;
|
|
|
@@ -158,21 +148,16 @@ impl KeyManager {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- let keys_count = storage_cache.keys_by_id.len();
|
|
|
storage_cache.refresh_version += 1;
|
|
|
storage_cache.is_ready = true;
|
|
|
|
|
|
- let storage_cache = Arc::new(storage_cache);
|
|
|
- registration.cache.store(storage_cache.clone());
|
|
|
-
|
|
|
- Self::persist_cache(registration, storage_cache).await;
|
|
|
-
|
|
|
- debug!(
|
|
|
+ tracing::debug!(
|
|
|
"Loaded {} keys from storage for {}",
|
|
|
- keys_count, registration.mint_url
|
|
|
+ storage_cache.keys_by_id.len(),
|
|
|
+ mint_url
|
|
|
);
|
|
|
|
|
|
- Ok(())
|
|
|
+ Ok(storage_cache)
|
|
|
}
|
|
|
|
|
|
/// Persist cache to a single database
|
|
|
@@ -193,9 +178,10 @@ impl KeyManager {
|
|
|
.add_mint(mint_url.clone(), new_cache.mint_info.clone())
|
|
|
.await
|
|
|
.inspect_err(|e| {
|
|
|
- warn!("Failed to persist mint_info for {}: {}", mint_url, e);
|
|
|
+ tracing::warn!("Failed to persist mint_info for {}: {}", mint_url, e);
|
|
|
});
|
|
|
}
|
|
|
+
|
|
|
let _ = storage
|
|
|
.add_mint_keysets(
|
|
|
mint_url.clone(),
|
|
|
@@ -206,20 +192,20 @@ impl KeyManager {
|
|
|
.collect(),
|
|
|
)
|
|
|
.await
|
|
|
- .inspect_err(|e| warn!("Failed to persist keysets for {}: {}", mint_url, e));
|
|
|
+ .inspect_err(|e| tracing::warn!("Failed to persist keysets for {}: {}", mint_url, e));
|
|
|
|
|
|
for (keyset_id, keys) in new_cache.keys_by_id.iter() {
|
|
|
if storage
|
|
|
.get_keys(keyset_id)
|
|
|
.await
|
|
|
- .inspect_err(|e| warn!("Failed to get_keys {e}"))
|
|
|
+ .inspect_err(|e| tracing::warn!("Failed to get_keys {e}"))
|
|
|
.unwrap_or_default()
|
|
|
.is_none()
|
|
|
{
|
|
|
let keyset = if let Some(v) = new_cache.keysets_by_id.get(keyset_id) {
|
|
|
v
|
|
|
} else {
|
|
|
- warn!("Malformed keysets, cannot find {}", keyset_id);
|
|
|
+ tracing::warn!("Malformed keysets, cannot find {}", keyset_id);
|
|
|
continue;
|
|
|
};
|
|
|
let _ = storage
|
|
|
@@ -231,7 +217,7 @@ impl KeyManager {
|
|
|
})
|
|
|
.await
|
|
|
.inspect_err(|e| {
|
|
|
- warn!("Failed to persist keys for keyset {}: {}", keyset_id, e)
|
|
|
+ tracing::warn!("Failed to persist keys for keyset {}: {}", keyset_id, e)
|
|
|
});
|
|
|
}
|
|
|
}
|
|
|
@@ -241,20 +227,20 @@ impl KeyManager {
|
|
|
///
|
|
|
/// Spawns a task for each storage backend to write cache asynchronously.
|
|
|
pub(super) async fn persist_cache(
|
|
|
- registration: &MintRegistration,
|
|
|
+ storages: &Arc<
|
|
|
+ ParkingLotRwLock<
|
|
|
+ Vec<Arc<dyn WalletDatabase<Err = cdk_common::database::Error> + Send + Sync>>,
|
|
|
+ >,
|
|
|
+ >,
|
|
|
+ mint_url: MintUrl,
|
|
|
new_cache: Arc<MintKeyCache>,
|
|
|
) {
|
|
|
- let storages = registration
|
|
|
- .resources
|
|
|
- .read()
|
|
|
- .values()
|
|
|
- .map(|x| x.storage.clone())
|
|
|
- .collect::<Vec<_>>();
|
|
|
-
|
|
|
- for storage in storages {
|
|
|
+ let storages_list = storages.read().clone();
|
|
|
+
|
|
|
+ for storage in storages_list {
|
|
|
spawn(Self::persist_cache_db(
|
|
|
storage,
|
|
|
- registration.mint_url.clone(),
|
|
|
+ mint_url.clone(),
|
|
|
new_cache.clone(),
|
|
|
));
|
|
|
}
|
|
|
@@ -265,34 +251,20 @@ impl KeyManager {
|
|
|
/// Fetches mint info, keysets, and keys from the mint server. Updates cache
|
|
|
/// and schedules next refresh. Persists new data to all databases.
|
|
|
pub(super) async fn fetch_from_http(
|
|
|
- registration: MintRegistration,
|
|
|
+ mint_url: MintUrl,
|
|
|
+ client: Arc<dyn MintConnector + Send + Sync>,
|
|
|
+ #[cfg(feature = "auth")] auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
|
|
|
+ storages: Arc<
|
|
|
+ ParkingLotRwLock<
|
|
|
+ Vec<Arc<dyn WalletDatabase<Err = cdk_common::database::Error> + Send + Sync>>,
|
|
|
+ >,
|
|
|
+ >,
|
|
|
+ cache: Arc<ArcSwap<MintKeyCache>>,
|
|
|
refresh_scheduler: RefreshScheduler,
|
|
|
) -> Result<(), Error> {
|
|
|
- debug!(
|
|
|
- "Fetching keys from mint server for {}",
|
|
|
- registration.mint_url
|
|
|
- );
|
|
|
-
|
|
|
- let http_client = registration
|
|
|
- .resources
|
|
|
- .read()
|
|
|
- .values()
|
|
|
- .next()
|
|
|
- .ok_or(Error::IncorrectMint)?
|
|
|
- .client
|
|
|
- .clone();
|
|
|
-
|
|
|
- #[cfg(feature = "auth")]
|
|
|
- let http_auth_client = registration
|
|
|
- .resources
|
|
|
- .read()
|
|
|
- .values()
|
|
|
- .next()
|
|
|
- .ok_or(Error::IncorrectMint)?
|
|
|
- .auth_client
|
|
|
- .clone();
|
|
|
+ tracing::debug!("Fetching keys from mint server for {}", mint_url);
|
|
|
|
|
|
- let mint_info = http_client.get_mint_info().await?;
|
|
|
+ let mint_info = client.get_mint_info().await?;
|
|
|
|
|
|
if let Some(mint_unix_time) = mint_info.time {
|
|
|
let current_unix_time = unix_time();
|
|
|
@@ -306,20 +278,19 @@ impl KeyManager {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- let keysets_response = http_client.get_mint_keysets().await?;
|
|
|
+ let keysets_response = client.get_mint_keysets().await?;
|
|
|
|
|
|
let keysets = keysets_response.keysets;
|
|
|
|
|
|
#[cfg(feature = "auth")]
|
|
|
- let keysets = if let Ok(auth_keysets_response) =
|
|
|
- http_auth_client.get_mint_blind_auth_keysets().await
|
|
|
- {
|
|
|
- let mut keysets = keysets;
|
|
|
- keysets.extend_from_slice(&auth_keysets_response.keysets);
|
|
|
- keysets
|
|
|
- } else {
|
|
|
- keysets
|
|
|
- };
|
|
|
+ let keysets =
|
|
|
+ if let Ok(auth_keysets_response) = auth_client.get_mint_blind_auth_keysets().await {
|
|
|
+ let mut keysets = keysets;
|
|
|
+ keysets.extend_from_slice(&auth_keysets_response.keysets);
|
|
|
+ keysets
|
|
|
+ } else {
|
|
|
+ keysets
|
|
|
+ };
|
|
|
|
|
|
let mut new_cache = MintKeyCache::empty();
|
|
|
|
|
|
@@ -334,143 +305,157 @@ impl KeyManager {
|
|
|
}
|
|
|
|
|
|
// Try to load keyset from database first, then HTTP
|
|
|
- if let Ok(keyset) = Self::load_keyset_from_db_or_http(®istration, &keyset_info.id)
|
|
|
- .await
|
|
|
- .inspect_err(|e| {
|
|
|
- warn!(
|
|
|
- "Failed to load keyset {} for {}: {}",
|
|
|
- keyset_info.id, registration.mint_url, e
|
|
|
- )
|
|
|
- })
|
|
|
+ if let Ok(keyset) =
|
|
|
+ Self::load_keyset_from_db_or_http(&mint_url, &storages, &client, &keyset_info.id)
|
|
|
+ .await
|
|
|
+ .inspect_err(|e| {
|
|
|
+ tracing::warn!(
|
|
|
+ "Failed to load keyset {} for {}: {}",
|
|
|
+ keyset_info.id,
|
|
|
+ mint_url,
|
|
|
+ e
|
|
|
+ )
|
|
|
+ })
|
|
|
{
|
|
|
let keys = Arc::new(keyset.keys.clone());
|
|
|
new_cache.keys_by_id.insert(keyset_info.id, keys);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- refresh_scheduler.schedule_refresh(registration.mint_url.clone());
|
|
|
+ refresh_scheduler.schedule_refresh(mint_url.clone());
|
|
|
|
|
|
- let old_generation = registration.cache.load().refresh_version;
|
|
|
+ let old_generation = cache.load().refresh_version;
|
|
|
new_cache.mint_info = Some(mint_info);
|
|
|
new_cache.refresh_version = old_generation + 1;
|
|
|
new_cache.is_ready = true;
|
|
|
new_cache.last_refresh = std::time::Instant::now();
|
|
|
|
|
|
- debug!(
|
|
|
+ tracing::debug!(
|
|
|
"Refreshed {} keysets and {} keys for {} (generation {})",
|
|
|
new_cache.keysets_by_id.len(),
|
|
|
new_cache.keys_by_id.len(),
|
|
|
- registration.mint_url,
|
|
|
+ mint_url,
|
|
|
new_cache.refresh_version
|
|
|
);
|
|
|
|
|
|
let new_cache = Arc::new(new_cache);
|
|
|
- Self::persist_cache(®istration, new_cache.clone()).await;
|
|
|
- registration.cache.store(new_cache);
|
|
|
+ Self::persist_cache(&storages, mint_url, new_cache.clone()).await;
|
|
|
+ cache.store(new_cache);
|
|
|
|
|
|
Ok::<(), Error>(())
|
|
|
}
|
|
|
|
|
|
pub(super) fn sync_mint_task(
|
|
|
- registration: MintRegistration,
|
|
|
+ mint_url: MintUrl,
|
|
|
+ client: Arc<dyn MintConnector + Send + Sync>,
|
|
|
+ #[cfg(feature = "auth")] auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
|
|
|
+ storages: Arc<
|
|
|
+ ParkingLotRwLock<
|
|
|
+ Vec<Arc<dyn WalletDatabase<Err = cdk_common::database::Error> + Send + Sync>>,
|
|
|
+ >,
|
|
|
+ >,
|
|
|
+ cache: Arc<ArcSwap<MintKeyCache>>,
|
|
|
refresh_scheduler: RefreshScheduler,
|
|
|
) {
|
|
|
spawn(async move {
|
|
|
- if !registration.cache.load().is_ready {
|
|
|
- let _ = Self::fetch_mint_info_and_keys_from_db(®istration)
|
|
|
+ if !cache.load().is_ready {
|
|
|
+ if let Ok(new_cache) = Self::fetch_mint_info_and_keys_from_db(&mint_url, &storages)
|
|
|
.await
|
|
|
.inspect_err(|e| {
|
|
|
- warn!(
|
|
|
- "Failed to load keys from storage for {}: {}",
|
|
|
- registration.mint_url, e
|
|
|
- )
|
|
|
- });
|
|
|
+ tracing::warn!("Failed to load keys from storage for {}: {}", mint_url, e)
|
|
|
+ })
|
|
|
+ {
|
|
|
+ let new_cache = Arc::new(new_cache);
|
|
|
+ Self::persist_cache(&storages, mint_url.clone(), new_cache.clone()).await;
|
|
|
+ cache.store(new_cache);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- if !registration.cache.load().is_ready {
|
|
|
- let mint_url = registration.mint_url.clone();
|
|
|
+ if !cache.load().is_ready {
|
|
|
let _ = tokio::time::timeout(
|
|
|
Duration::from_secs(60),
|
|
|
- Self::fetch_from_http(registration, refresh_scheduler),
|
|
|
+ Self::fetch_from_http(
|
|
|
+ mint_url.clone(),
|
|
|
+ client,
|
|
|
+ #[cfg(feature = "auth")]
|
|
|
+ auth_client,
|
|
|
+ storages,
|
|
|
+ cache,
|
|
|
+ refresh_scheduler,
|
|
|
+ ),
|
|
|
)
|
|
|
.await
|
|
|
- .inspect_err(|e| warn!("Failed to fetch keys for {} with error {}", mint_url, e));
|
|
|
+ .inspect_err(|e| {
|
|
|
+ tracing::warn!("Failed to fetch keys for {} with error {}", mint_url, e)
|
|
|
+ });
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
|
|
|
/// Refresh keys from mint server
|
|
|
///
|
|
|
- /// Spawns an async task with 60s timeout. HTTP requests are limited to
|
|
|
- /// MAX_CONCURRENT_HTTP_REQUESTS concurrent requests via semaphore.
|
|
|
+ /// Spawns an async task with 60s timeout.
|
|
|
pub(super) fn fetch_and_sync_mint_task(
|
|
|
- registration: MintRegistration,
|
|
|
- semaphore: Arc<Semaphore>,
|
|
|
+ mint_url: MintUrl,
|
|
|
+ client: Arc<dyn MintConnector + Send + Sync>,
|
|
|
+ #[cfg(feature = "auth")] auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
|
|
|
+ storages: Arc<
|
|
|
+ ParkingLotRwLock<
|
|
|
+ Vec<Arc<dyn WalletDatabase<Err = cdk_common::database::Error> + Send + Sync>>,
|
|
|
+ >,
|
|
|
+ >,
|
|
|
+ cache: Arc<ArcSwap<MintKeyCache>>,
|
|
|
refresh_scheduler: RefreshScheduler,
|
|
|
) {
|
|
|
spawn(async move {
|
|
|
- if !registration.cache.load().is_ready {
|
|
|
- let _ = Self::fetch_mint_info_and_keys_from_db(®istration)
|
|
|
+ if !cache.load().is_ready {
|
|
|
+ if let Ok(new_cache) = Self::fetch_mint_info_and_keys_from_db(&mint_url, &storages)
|
|
|
.await
|
|
|
.inspect_err(|e| {
|
|
|
- warn!(
|
|
|
- "Failed to load keys from storage for {}: {}",
|
|
|
- registration.mint_url, e
|
|
|
- )
|
|
|
- });
|
|
|
+ tracing::warn!("Failed to load keys from storage for {}: {}", mint_url, e)
|
|
|
+ })
|
|
|
+ {
|
|
|
+ let new_cache = Arc::new(new_cache);
|
|
|
+ Self::persist_cache(&storages, mint_url.clone(), new_cache.clone()).await;
|
|
|
+ cache.store(new_cache);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- let Ok(http_permit) = semaphore.acquire().await.inspect_err(|e| {
|
|
|
- error!(
|
|
|
- "Failed to acquire HTTP permit for {}: {}",
|
|
|
- registration.mint_url, e
|
|
|
- );
|
|
|
- }) else {
|
|
|
- return;
|
|
|
- };
|
|
|
-
|
|
|
- debug!(
|
|
|
- "Acquired HTTP permit for {} ({} available)",
|
|
|
- registration.mint_url,
|
|
|
- semaphore.available_permits()
|
|
|
- );
|
|
|
-
|
|
|
- let mint_url = registration.mint_url.clone();
|
|
|
-
|
|
|
let result = tokio::time::timeout(
|
|
|
Duration::from_secs(60),
|
|
|
- Self::fetch_from_http(registration, refresh_scheduler),
|
|
|
+ Self::fetch_from_http(
|
|
|
+ mint_url.clone(),
|
|
|
+ client,
|
|
|
+ #[cfg(feature = "auth")]
|
|
|
+ auth_client,
|
|
|
+ storages,
|
|
|
+ cache,
|
|
|
+ refresh_scheduler,
|
|
|
+ ),
|
|
|
)
|
|
|
.await;
|
|
|
|
|
|
- drop(http_permit);
|
|
|
-
|
|
|
let _ = result
|
|
|
.map_err(|_| Error::Timeout)
|
|
|
.and_then(|r| r)
|
|
|
.inspect(|_| {
|
|
|
- debug!(
|
|
|
+ tracing::debug!(
|
|
|
"Successfully fetched keys from mint server for {}",
|
|
|
mint_url
|
|
|
);
|
|
|
})
|
|
|
.inspect_err(|e| match e {
|
|
|
Error::Timeout => {
|
|
|
- error!("Timeout fetching keys from mint server for {}", mint_url)
|
|
|
+ tracing::error!("Timeout fetching keys from mint server for {}", mint_url)
|
|
|
}
|
|
|
_ => {
|
|
|
- error!(
|
|
|
+ tracing::error!(
|
|
|
"Failed to fetch keys from mint server for {}: {}",
|
|
|
- mint_url, e
|
|
|
+ mint_url,
|
|
|
+ e
|
|
|
)
|
|
|
}
|
|
|
});
|
|
|
-
|
|
|
- debug!(
|
|
|
- "Released HTTP permit for {} ({} available)",
|
|
|
- mint_url,
|
|
|
- semaphore.available_permits()
|
|
|
- );
|
|
|
});
|
|
|
}
|
|
|
|
|
|
@@ -480,75 +465,76 @@ impl KeyManager {
|
|
|
/// All refresh operations are spawned as separate tasks with timeouts.
|
|
|
pub(super) async fn refresh_loop(
|
|
|
mut rx: tokio::sync::mpsc::UnboundedReceiver<MessageToWorker>,
|
|
|
- mints: Mints,
|
|
|
+ mint_url: MintUrl,
|
|
|
+ client: Arc<dyn MintConnector + Send + Sync>,
|
|
|
+ #[cfg(feature = "auth")] auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
|
|
|
+ storages: Arc<
|
|
|
+ ParkingLotRwLock<
|
|
|
+ Vec<Arc<dyn WalletDatabase<Err = cdk_common::database::Error> + Send + Sync>>,
|
|
|
+ >,
|
|
|
+ >,
|
|
|
+ cache: Arc<ArcSwap<MintKeyCache>>,
|
|
|
refresh_interval: Duration,
|
|
|
- semaphore: Arc<Semaphore>,
|
|
|
) {
|
|
|
let mut interval = tokio::time::interval(Duration::from_secs(1));
|
|
|
let refresh_scheduler = RefreshScheduler::new(refresh_interval);
|
|
|
+ let mut is_scheduled = false;
|
|
|
|
|
|
loop {
|
|
|
tokio::select! {
|
|
|
Some(msg) = rx.recv() => {
|
|
|
match msg {
|
|
|
MessageToWorker::Stop => {
|
|
|
- debug!("Stopping refresh task");
|
|
|
+ tracing::debug!("Stopping refresh task for {}", mint_url);
|
|
|
break;
|
|
|
}
|
|
|
- MessageToWorker::SyncMint(mint_url) => {
|
|
|
- debug!("Sync all instances of {}", mint_url);
|
|
|
- let registration = {
|
|
|
- let mints_lock = mints.read();
|
|
|
- mints_lock.get(&mint_url).cloned()
|
|
|
- };
|
|
|
-
|
|
|
- if let Some(reg) = registration {
|
|
|
- Self::sync_mint_task(reg, refresh_scheduler.clone());
|
|
|
- } else {
|
|
|
- warn!("FetchMint: Mint not registered: {}", mint_url);
|
|
|
- }
|
|
|
+ MessageToWorker::SyncMint => {
|
|
|
+ tracing::debug!("Sync mint {}", mint_url);
|
|
|
+ Self::sync_mint_task(
|
|
|
+ mint_url.clone(),
|
|
|
+ client.clone(),
|
|
|
+ #[cfg(feature = "auth")]
|
|
|
+ auth_client.clone(),
|
|
|
+ storages.clone(),
|
|
|
+ cache.clone(),
|
|
|
+ refresh_scheduler.clone(),
|
|
|
+ );
|
|
|
}
|
|
|
- MessageToWorker::FetchMint(mint_url) => {
|
|
|
- debug!("FetchMint message received for {}", mint_url);
|
|
|
- let registration = {
|
|
|
- let mints_lock = mints.read();
|
|
|
- mints_lock.get(&mint_url).cloned()
|
|
|
- };
|
|
|
-
|
|
|
- if let Some(reg) = registration {
|
|
|
- Self::fetch_and_sync_mint_task(reg, semaphore.clone(), refresh_scheduler.clone());
|
|
|
- } else {
|
|
|
- warn!("FetchMint: Mint not registered: {}", mint_url);
|
|
|
- }
|
|
|
+ MessageToWorker::FetchMint => {
|
|
|
+ tracing::debug!("FetchMint message received for {}", mint_url);
|
|
|
+ Self::fetch_and_sync_mint_task(
|
|
|
+ mint_url.clone(),
|
|
|
+ client.clone(),
|
|
|
+ #[cfg(feature = "auth")]
|
|
|
+ auth_client.clone(),
|
|
|
+ storages.clone(),
|
|
|
+ cache.clone(),
|
|
|
+ refresh_scheduler.clone(),
|
|
|
+ );
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
_ = interval.tick() => {
|
|
|
- debug!("Checking for mints due for refresh");
|
|
|
-
|
|
|
let due_mints = refresh_scheduler.get_due_refreshes();
|
|
|
|
|
|
- if !due_mints.is_empty() {
|
|
|
- debug!("Found {} mints due for refresh", due_mints.len());
|
|
|
- }
|
|
|
-
|
|
|
- for mint_url in due_mints {
|
|
|
- let registration = {
|
|
|
- let mints_lock = mints.read();
|
|
|
- mints_lock.get(&mint_url).cloned()
|
|
|
- };
|
|
|
-
|
|
|
- if let Some(reg) = registration {
|
|
|
- Self::fetch_and_sync_mint_task(reg, semaphore.clone(), refresh_scheduler.clone());
|
|
|
- } else {
|
|
|
- warn!("Mint no longer registered: {}", mint_url);
|
|
|
- }
|
|
|
+ if !due_mints.is_empty() || !is_scheduled {
|
|
|
+ tracing::debug!("Time to refresh mint: {}", mint_url);
|
|
|
+ Self::fetch_and_sync_mint_task(
|
|
|
+ mint_url.clone(),
|
|
|
+ client.clone(),
|
|
|
+ #[cfg(feature = "auth")]
|
|
|
+ auth_client.clone(),
|
|
|
+ storages.clone(),
|
|
|
+ cache.clone(),
|
|
|
+ refresh_scheduler.clone(),
|
|
|
+ );
|
|
|
+ is_scheduled = true;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- debug!("Refresh loop stopped");
|
|
|
+ tracing::debug!("Refresh loop stopped for {}", mint_url);
|
|
|
}
|
|
|
}
|