|
@@ -1,5 +1,6 @@
|
|
|
//! Background worker for fetching and refreshing mint keys
|
|
//! Background worker for fetching and refreshing mint keys
|
|
|
|
|
|
|
|
|
|
+use std::fmt::Debug;
|
|
|
use std::sync::Arc;
|
|
use std::sync::Arc;
|
|
|
use std::time::Duration;
|
|
use std::time::Duration;
|
|
|
|
|
|
|
@@ -10,10 +11,11 @@ use cdk_common::parking_lot::RwLock as ParkingLotRwLock;
|
|
|
use cdk_common::task::spawn;
|
|
use cdk_common::task::spawn;
|
|
|
use cdk_common::util::unix_time;
|
|
use cdk_common::util::unix_time;
|
|
|
use cdk_common::KeySet;
|
|
use cdk_common::KeySet;
|
|
|
|
|
+use tokio::sync::mpsc;
|
|
|
|
|
+use tokio::task::JoinHandle;
|
|
|
use tokio::time::sleep;
|
|
use tokio::time::sleep;
|
|
|
|
|
|
|
|
-use super::scheduler::RefreshScheduler;
|
|
|
|
|
-use super::{KeyManager, MintKeyCache};
|
|
|
|
|
|
|
+use super::MintKeyCache;
|
|
|
use crate::nuts::Id;
|
|
use crate::nuts::Id;
|
|
|
use crate::wallet::MintConnector;
|
|
use crate::wallet::MintConnector;
|
|
|
use crate::Error;
|
|
use crate::Error;
|
|
@@ -22,7 +24,7 @@ use crate::Error;
|
|
|
use crate::wallet::AuthMintConnector;
|
|
use crate::wallet::AuthMintConnector;
|
|
|
|
|
|
|
|
/// Type alias for storage list to improve readability
|
|
/// Type alias for storage list to improve readability
|
|
|
-type StorageList =
|
|
|
|
|
|
|
+pub(super) type StorageList =
|
|
|
Arc<ParkingLotRwLock<Vec<Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>>>>;
|
|
Arc<ParkingLotRwLock<Vec<Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>>>>;
|
|
|
|
|
|
|
|
/// Messages for the background refresh task
|
|
/// Messages for the background refresh task
|
|
@@ -38,494 +40,507 @@ pub(super) enum MessageToWorker {
|
|
|
FetchMint,
|
|
FetchMint,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-impl KeyManager {
|
|
|
|
|
- /// Load a specific keyset from database or HTTP
|
|
|
|
|
- ///
|
|
|
|
|
- /// First checks all databases for the keyset. If not found,
|
|
|
|
|
- /// fetches from the mint server via HTTP and persists to all databases.
|
|
|
|
|
- async fn load_keyset_from_db_or_http(
|
|
|
|
|
- mint_url: &MintUrl,
|
|
|
|
|
- storages: &StorageList,
|
|
|
|
|
- client: &Arc<dyn MintConnector + Send + Sync>,
|
|
|
|
|
- keyset_id: &Id,
|
|
|
|
|
- ) -> Result<KeySet, Error> {
|
|
|
|
|
- let storages_list = storages.read().clone();
|
|
|
|
|
-
|
|
|
|
|
- // Try all databases first
|
|
|
|
|
- for storage in &storages_list {
|
|
|
|
|
- if let Some(keys) = storage.get_keys(keyset_id).await? {
|
|
|
|
|
- tracing::debug!("Loaded keyset {} from database for {}", keyset_id, mint_url);
|
|
|
|
|
-
|
|
|
|
|
- // Get keyset info to construct KeySet
|
|
|
|
|
- if let Some(keyset_info) = storage.get_keyset_by_id(keyset_id).await? {
|
|
|
|
|
- return Ok(KeySet {
|
|
|
|
|
- id: *keyset_id,
|
|
|
|
|
- unit: keyset_info.unit,
|
|
|
|
|
- final_expiry: keyset_info.final_expiry,
|
|
|
|
|
- keys,
|
|
|
|
|
- });
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
|
|
+/// Shared worker state for a mint
|
|
|
|
|
+///
|
|
|
|
|
+/// One worker per mint_url, shared across all KeyManager instances for that mint.
|
|
|
|
|
+pub(super) struct SharedWorker {
|
|
|
|
|
+ /// Mint URL
|
|
|
|
|
+ pub(super) mint_url: MintUrl,
|
|
|
|
|
|
|
|
- // Not in any database, fetch from HTTP
|
|
|
|
|
- tracing::debug!(
|
|
|
|
|
- "Keyset {} not in database, fetching from mint server for {}",
|
|
|
|
|
- keyset_id,
|
|
|
|
|
- mint_url
|
|
|
|
|
- );
|
|
|
|
|
-
|
|
|
|
|
- let keyset = client.get_mint_keyset(*keyset_id).await?;
|
|
|
|
|
- keyset.verify_id()?;
|
|
|
|
|
-
|
|
|
|
|
- // Persist to all databases
|
|
|
|
|
- for storage in &storages_list {
|
|
|
|
|
- let _ = storage.add_keys(keyset.clone()).await.inspect_err(|e| {
|
|
|
|
|
- tracing::warn!(
|
|
|
|
|
- "Failed to persist keyset {} for {}: {}",
|
|
|
|
|
- keyset_id,
|
|
|
|
|
- mint_url,
|
|
|
|
|
- e
|
|
|
|
|
- )
|
|
|
|
|
- });
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ /// Client for HTTP requests (shared across all instances)
|
|
|
|
|
+ pub(super) client: Arc<dyn MintConnector + Send + Sync>,
|
|
|
|
|
|
|
|
- tracing::debug!("Loaded keyset {} from HTTP for {}", keyset_id, mint_url);
|
|
|
|
|
|
|
+ #[cfg(feature = "auth")]
|
|
|
|
|
+ pub(super) auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
|
|
|
|
|
|
|
|
- Ok(keyset)
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ /// All storages registered for this mint
|
|
|
|
|
+ pub(super) storages: StorageList,
|
|
|
|
|
|
|
|
- /// Load cached mint data from all registered storage backends
|
|
|
|
|
- ///
|
|
|
|
|
- /// Iterates through all storages, collecting unique keysets and keys.
|
|
|
|
|
- /// Marks cache as ready only if mint_info was found.
|
|
|
|
|
- ///
|
|
|
|
|
- /// Returns a MintKeyCache that may or may not be ready depending on what was found.
|
|
|
|
|
- async fn load_cache_from_storages(
|
|
|
|
|
- mint_url: &MintUrl,
|
|
|
|
|
- storages: &StorageList,
|
|
|
|
|
- ) -> Result<MintKeyCache, Error> {
|
|
|
|
|
- tracing::debug!("Loading cache from storage for {}", mint_url);
|
|
|
|
|
-
|
|
|
|
|
- let mut cache = MintKeyCache::empty();
|
|
|
|
|
- let storages_list = storages.read().clone();
|
|
|
|
|
-
|
|
|
|
|
- for storage in storages_list {
|
|
|
|
|
- // Load mint info from first storage that has it
|
|
|
|
|
- if cache.mint_info.is_none() {
|
|
|
|
|
- cache.mint_info = storage.get_mint(mint_url.clone()).await?;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ /// Shared cache
|
|
|
|
|
+ pub(super) cache: Arc<ArcSwap<MintKeyCache>>,
|
|
|
|
|
|
|
|
- // Collect unique keysets from all storages
|
|
|
|
|
- for keyset in storage
|
|
|
|
|
- .get_mint_keysets(mint_url.clone())
|
|
|
|
|
- .await?
|
|
|
|
|
- .unwrap_or_default()
|
|
|
|
|
- {
|
|
|
|
|
- if cache.keysets_by_id.contains_key(&keyset.id) {
|
|
|
|
|
- continue;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ /// Message sender to worker (bounded to 1000 messages)
|
|
|
|
|
+ pub(super) tx: mpsc::Sender<MessageToWorker>,
|
|
|
|
|
|
|
|
- let arc_keyset = Arc::new(keyset.clone());
|
|
|
|
|
- cache.keysets_by_id.insert(keyset.id, arc_keyset.clone());
|
|
|
|
|
|
|
+ /// Worker task handle
|
|
|
|
|
+ pub(super) task: Option<JoinHandle<()>>,
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- if keyset.active {
|
|
|
|
|
- cache.active_keysets.push(arc_keyset);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
|
|
+impl Debug for SharedWorker {
|
|
|
|
|
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
|
|
+ f.debug_struct("SharedWorker")
|
|
|
|
|
+ .field("mint_url", &self.mint_url)
|
|
|
|
|
+ .field("storages", &self.storages.read().len().to_string())
|
|
|
|
|
+ .field("tx", &self.tx.is_closed())
|
|
|
|
|
+ .field("cache", &self.cache)
|
|
|
|
|
+ .finish()
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- // Collect unique keys from all storages
|
|
|
|
|
- for keyset_id in cache.keysets_by_id.keys() {
|
|
|
|
|
- if cache.keys_by_id.contains_key(keyset_id) {
|
|
|
|
|
- continue;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+impl Drop for SharedWorker {
|
|
|
|
|
+ fn drop(&mut self) {
|
|
|
|
|
+ tracing::debug!("Dropping SharedWorker for {}", self.mint_url);
|
|
|
|
|
+ let _ = self.tx.try_send(MessageToWorker::Stop).inspect_err(|e| {
|
|
|
|
|
+ tracing::error!("Failed to send Stop message for {}: {}", self.mint_url, e)
|
|
|
|
|
+ });
|
|
|
|
|
+ if let Some(task) = self.task.take() {
|
|
|
|
|
+ task.abort();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- if let Some(keys) = storage.get_keys(keyset_id).await? {
|
|
|
|
|
- cache.keys_by_id.insert(*keyset_id, Arc::new(keys));
|
|
|
|
|
- }
|
|
|
|
|
|
|
+/// Load a specific keyset from database or HTTP
|
|
|
|
|
+///
|
|
|
|
|
+/// First checks all databases for the keyset. If not found,
|
|
|
|
|
+/// fetches from the mint server via HTTP and persists to all databases.
|
|
|
|
|
+async fn load_keyset_from_db_or_http(
|
|
|
|
|
+ mint_url: &MintUrl,
|
|
|
|
|
+ client: &Arc<dyn MintConnector + Send + Sync>,
|
|
|
|
|
+ storages: &StorageList,
|
|
|
|
|
+ keyset_id: &Id,
|
|
|
|
|
+) -> Result<KeySet, Error> {
|
|
|
|
|
+ let storages_list = storages.read().clone();
|
|
|
|
|
+
|
|
|
|
|
+ // Try all databases first
|
|
|
|
|
+ for storage in &storages_list {
|
|
|
|
|
+ if let Some(keys) = storage.get_keys(keyset_id).await? {
|
|
|
|
|
+ tracing::debug!("Loaded keyset {} from database for {}", keyset_id, mint_url);
|
|
|
|
|
+
|
|
|
|
|
+ // Get keyset info to construct KeySet
|
|
|
|
|
+ if let Some(keyset_info) = storage.get_keyset_by_id(keyset_id).await? {
|
|
|
|
|
+ return Ok(KeySet {
|
|
|
|
|
+ id: *keyset_id,
|
|
|
|
|
+ unit: keyset_info.unit,
|
|
|
|
|
+ final_expiry: keyset_info.final_expiry,
|
|
|
|
|
+ keys,
|
|
|
|
|
+ });
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- Self::finalize_cache(&mut cache, 0);
|
|
|
|
|
|
|
+ // Not in any database, fetch from HTTP
|
|
|
|
|
+ tracing::debug!(
|
|
|
|
|
+ "Keyset {} not in database, fetching from mint server for {}",
|
|
|
|
|
+ keyset_id,
|
|
|
|
|
+ mint_url
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ let keyset = client.get_mint_keyset(*keyset_id).await?;
|
|
|
|
|
+ keyset.verify_id()?;
|
|
|
|
|
+
|
|
|
|
|
+ // Persist to all databases
|
|
|
|
|
+ for storage in &storages_list {
|
|
|
|
|
+ let _ = storage.add_keys(keyset.clone()).await.inspect_err(|e| {
|
|
|
|
|
+ tracing::warn!(
|
|
|
|
|
+ "Failed to persist keyset {} for {}: {}",
|
|
|
|
|
+ keyset_id,
|
|
|
|
|
+ mint_url,
|
|
|
|
|
+ e
|
|
|
|
|
+ )
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- tracing::debug!(
|
|
|
|
|
- "Loaded {} keys from storage for {}",
|
|
|
|
|
- cache.keys_by_id.len(),
|
|
|
|
|
- mint_url
|
|
|
|
|
- );
|
|
|
|
|
|
|
+ tracing::debug!("Loaded keyset {} from HTTP for {}", keyset_id, mint_url);
|
|
|
|
|
|
|
|
- Ok(cache)
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ Ok(keyset)
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- /// Finalize cache state with proper versioning and timestamp
|
|
|
|
|
- ///
|
|
|
|
|
- /// Sets the cache as ready and updates metadata.
|
|
|
|
|
- fn finalize_cache(cache: &mut MintKeyCache, previous_version: u64) {
|
|
|
|
|
- cache.refresh_version = previous_version + 1;
|
|
|
|
|
- cache.is_ready = cache.mint_info.is_some();
|
|
|
|
|
- cache.last_refresh = std::time::Instant::now();
|
|
|
|
|
- }
|
|
|
|
|
|
|
+/// Load cached mint data from all registered storage backends
|
|
|
|
|
+///
|
|
|
|
|
+/// Iterates through all storages, collecting unique keysets and keys.
|
|
|
|
|
+/// Marks cache as ready only if mint_info was found.
|
|
|
|
|
+///
|
|
|
|
|
+/// Returns a MintKeyCache that may or may not be ready depending on what was found.
|
|
|
|
|
+async fn load_cache_from_storages(
|
|
|
|
|
+ mint_url: &MintUrl,
|
|
|
|
|
+ storages: &StorageList,
|
|
|
|
|
+) -> Result<MintKeyCache, Error> {
|
|
|
|
|
+ tracing::debug!("Loading cache from storage for {}", mint_url);
|
|
|
|
|
+
|
|
|
|
|
+ let mut cache = MintKeyCache::empty();
|
|
|
|
|
+ let storages_list = storages.read().clone();
|
|
|
|
|
+
|
|
|
|
|
+ for storage in storages_list {
|
|
|
|
|
+ // Load mint info from first storage that has it
|
|
|
|
|
+ if cache.mint_info.is_none() {
|
|
|
|
|
+ cache.mint_info = storage.get_mint(mint_url.clone()).await?;
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- /// Write keys that don't already exist in storage
|
|
|
|
|
- ///
|
|
|
|
|
- /// Checks each key in cache and only writes if not already in storage.
|
|
|
|
|
- async fn write_missing_keys_to_storage(
|
|
|
|
|
- storage: &Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
|
|
|
|
|
- mint_url: &MintUrl,
|
|
|
|
|
- cache: &MintKeyCache,
|
|
|
|
|
- ) {
|
|
|
|
|
- for (keyset_id, keys) in cache.keys_by_id.iter() {
|
|
|
|
|
- // Skip if already exists
|
|
|
|
|
- if storage.get_keys(keyset_id).await.ok().flatten().is_some() {
|
|
|
|
|
- continue;
|
|
|
|
|
|
|
+ // Collect unique keysets from all storages
|
|
|
|
|
+ for keyset in storage
|
|
|
|
|
+ .get_mint_keysets(mint_url.clone())
|
|
|
|
|
+ .await?
|
|
|
|
|
+ .unwrap_or_default()
|
|
|
|
|
+ {
|
|
|
|
|
+ let arc_keyset = Arc::new(keyset.clone());
|
|
|
|
|
+ cache.keysets_by_id.insert(keyset.id, arc_keyset.clone());
|
|
|
|
|
+
|
|
|
|
|
+ if keyset.active {
|
|
|
|
|
+ cache.active_keysets.push(arc_keyset);
|
|
|
}
|
|
}
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- // Get keyset info
|
|
|
|
|
- let keyset_info = if let Some(keyset_info) = cache.keysets_by_id.get(keyset_id) {
|
|
|
|
|
- keyset_info
|
|
|
|
|
- } else {
|
|
|
|
|
- tracing::warn!("Missing keyset info for {}", keyset_id);
|
|
|
|
|
- continue;
|
|
|
|
|
- };
|
|
|
|
|
-
|
|
|
|
|
- // Write keyset
|
|
|
|
|
- let keyset = KeySet {
|
|
|
|
|
- id: *keyset_id,
|
|
|
|
|
- unit: keyset_info.unit.clone(),
|
|
|
|
|
- final_expiry: keyset_info.final_expiry,
|
|
|
|
|
- keys: (**keys).clone(),
|
|
|
|
|
- };
|
|
|
|
|
-
|
|
|
|
|
- let _ = storage.add_keys(keyset).await.inspect_err(|e| {
|
|
|
|
|
- tracing::warn!(
|
|
|
|
|
- "Failed to write keys for {} to {}: {}",
|
|
|
|
|
- keyset_id,
|
|
|
|
|
- mint_url,
|
|
|
|
|
- e
|
|
|
|
|
- )
|
|
|
|
|
- });
|
|
|
|
|
|
|
+ // Collect unique keys from all storages
|
|
|
|
|
+ for (id, _keyset_info) in &cache.keysets_by_id {
|
|
|
|
|
+ if !cache.keys_by_id.contains_key(id) {
|
|
|
|
|
+ if let Some(keys) = storage.get_keys(id).await? {
|
|
|
|
|
+ cache.keys_by_id.insert(*id, Arc::new(keys));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- /// Write cached data to a single storage backend
|
|
|
|
|
- ///
|
|
|
|
|
- /// Persists mint_info, keysets, and keys. Only writes keys that don't already exist.
|
|
|
|
|
- /// All errors are logged but don't stop the operation.
|
|
|
|
|
- async fn write_cache_to_storage(
|
|
|
|
|
- storage: Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
|
|
|
|
|
- mint_url: MintUrl,
|
|
|
|
|
- cache: Arc<MintKeyCache>,
|
|
|
|
|
- ) {
|
|
|
|
|
- if !cache.is_ready {
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // Set ready status based on whether we got mint_info
|
|
|
|
|
+ cache.is_ready = cache.mint_info.is_some();
|
|
|
|
|
|
|
|
- // Write mint info
|
|
|
|
|
- if cache.mint_info.is_some() {
|
|
|
|
|
- let _ = storage
|
|
|
|
|
- .add_mint(mint_url.clone(), cache.mint_info.clone())
|
|
|
|
|
- .await
|
|
|
|
|
- .inspect_err(|e| {
|
|
|
|
|
- tracing::warn!("Failed to persist mint_info for {}: {}", mint_url, e);
|
|
|
|
|
|
|
+ tracing::debug!(
|
|
|
|
|
+ "Loaded {} keysets and {} keys from storage for {}",
|
|
|
|
|
+ cache.keysets_by_id.len(),
|
|
|
|
|
+ cache.keys_by_id.len(),
|
|
|
|
|
+ mint_url
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ Ok(cache)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+/// Finalize cache after successful update
|
|
|
|
|
+///
|
|
|
|
|
+/// Increments generation, marks as ready, and updates timestamp.
|
|
|
|
|
+fn finalize_cache(cache: &mut MintKeyCache, previous_version: u64) {
|
|
|
|
|
+ cache.refresh_version = previous_version + 1;
|
|
|
|
|
+ cache.is_ready = true;
|
|
|
|
|
+ cache.last_refresh = std::time::Instant::now();
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+/// Write keys that are missing from storage
|
|
|
|
|
+///
|
|
|
|
|
+/// For each keyset in cache, checks if storage has the keys. If not, persists them.
|
|
|
|
|
+async fn write_missing_keys_to_storage(
|
|
|
|
|
+ storage: &Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
|
|
|
|
|
+ mint_url: &MintUrl,
|
|
|
|
|
+ cache: &MintKeyCache,
|
|
|
|
|
+) {
|
|
|
|
|
+ for (keyset_id, keys) in &cache.keys_by_id {
|
|
|
|
|
+ // Check if keys already exist in storage
|
|
|
|
|
+ if storage.get_keys(keyset_id).await.ok().flatten().is_none() {
|
|
|
|
|
+ // Keys don't exist, need to persist
|
|
|
|
|
+ if let Some(keyset_info) = cache.keysets_by_id.get(keyset_id) {
|
|
|
|
|
+ let keyset = KeySet {
|
|
|
|
|
+ id: *keyset_id,
|
|
|
|
|
+ unit: keyset_info.unit.clone(),
|
|
|
|
|
+ final_expiry: keyset_info.final_expiry,
|
|
|
|
|
+ keys: (**keys).clone(),
|
|
|
|
|
+ };
|
|
|
|
|
+
|
|
|
|
|
+ let _ = storage.add_keys(keyset).await.inspect_err(|e| {
|
|
|
|
|
+ tracing::warn!(
|
|
|
|
|
+ "Failed to persist keys for {} keyset {}: {}",
|
|
|
|
|
+ mint_url,
|
|
|
|
|
+ keyset_id,
|
|
|
|
|
+ e
|
|
|
|
|
+ )
|
|
|
});
|
|
});
|
|
|
|
|
+ } else {
|
|
|
|
|
+ tracing::warn!("Missing keyset info for {}", keyset_id);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- // Write keysets
|
|
|
|
|
|
|
+/// Write complete cache to a single storage backend
|
|
|
|
|
+///
|
|
|
|
|
+/// Persists mint_info, keysets, and missing keys to the specified storage.
|
|
|
|
|
+async fn write_cache_to_storage(
|
|
|
|
|
+ storage: Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
|
|
|
|
|
+ mint_url: MintUrl,
|
|
|
|
|
+ cache: Arc<MintKeyCache>,
|
|
|
|
|
+) {
|
|
|
|
|
+ // Write mint_info
|
|
|
|
|
+ if let Some(ref mint_info) = cache.mint_info {
|
|
|
let _ = storage
|
|
let _ = storage
|
|
|
- .add_mint_keysets(
|
|
|
|
|
- mint_url.clone(),
|
|
|
|
|
- cache
|
|
|
|
|
- .keysets_by_id
|
|
|
|
|
- .values()
|
|
|
|
|
- .map(|ks| (**ks).clone())
|
|
|
|
|
- .collect(),
|
|
|
|
|
- )
|
|
|
|
|
|
|
+ .add_mint(mint_url.clone(), Some(mint_info.clone()))
|
|
|
.await
|
|
.await
|
|
|
- .inspect_err(|e| tracing::warn!("Failed to persist keysets for {}: {}", mint_url, e));
|
|
|
|
|
|
|
+ .inspect_err(|e| {
|
|
|
|
|
+ tracing::warn!("Failed to persist mint_info for {}: {}", mint_url, e)
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Write keysets
|
|
|
|
|
+ let keysets: Vec<_> = cache.keysets_by_id.values().map(|ks| (**ks).clone()).collect();
|
|
|
|
|
+ let _ = storage
|
|
|
|
|
+ .add_mint_keysets(mint_url.clone(), keysets)
|
|
|
|
|
+ .await
|
|
|
|
|
+ .inspect_err(|e| tracing::warn!("Failed to persist keysets for {}: {}", mint_url, e));
|
|
|
|
|
|
|
|
- // Write missing keys
|
|
|
|
|
- Self::write_missing_keys_to_storage(&storage, &mint_url, &cache).await;
|
|
|
|
|
|
|
+ // Write any missing keys
|
|
|
|
|
+ write_missing_keys_to_storage(&storage, &mint_url, &cache).await;
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+/// Write cached data to all registered storage backends concurrently
|
|
|
|
|
+///
|
|
|
|
|
+/// Spawns a task for each storage to write in parallel.
|
|
|
|
|
+async fn write_cache_to_all_storages(
|
|
|
|
|
+ mint_url: &MintUrl,
|
|
|
|
|
+ storages: &StorageList,
|
|
|
|
|
+ cache: Arc<MintKeyCache>,
|
|
|
|
|
+) {
|
|
|
|
|
+ let storages_list = storages.read().clone();
|
|
|
|
|
+
|
|
|
|
|
+ for storage in storages_list {
|
|
|
|
|
+ spawn(write_cache_to_storage(
|
|
|
|
|
+ storage,
|
|
|
|
|
+ mint_url.clone(),
|
|
|
|
|
+ cache.clone(),
|
|
|
|
|
+ ));
|
|
|
}
|
|
}
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- /// Write cached data to all registered storage backends concurrently
|
|
|
|
|
- ///
|
|
|
|
|
- /// Spawns a task for each storage to write in parallel.
|
|
|
|
|
- async fn write_cache_to_all_storages(
|
|
|
|
|
- storages: &StorageList,
|
|
|
|
|
- mint_url: MintUrl,
|
|
|
|
|
- cache: Arc<MintKeyCache>,
|
|
|
|
|
- ) {
|
|
|
|
|
- let storages_list = storages.read().clone();
|
|
|
|
|
-
|
|
|
|
|
- for storage in storages_list {
|
|
|
|
|
- spawn(Self::write_cache_to_storage(
|
|
|
|
|
- storage,
|
|
|
|
|
- mint_url.clone(),
|
|
|
|
|
- cache.clone(),
|
|
|
|
|
- ));
|
|
|
|
|
|
|
+/// Try to load cache from storage and update shared cache if successful
|
|
|
|
|
+///
|
|
|
|
|
+/// Returns true if cache was successfully loaded and is ready, false otherwise.
|
|
|
|
|
+async fn try_load_cache_from_storages(
|
|
|
|
|
+ mint_url: &MintUrl,
|
|
|
|
|
+ storages: &StorageList,
|
|
|
|
|
+ cache: &Arc<ArcSwap<MintKeyCache>>,
|
|
|
|
|
+) -> bool {
|
|
|
|
|
+ match load_cache_from_storages(mint_url, storages).await {
|
|
|
|
|
+ Ok(mut new_cache) => {
|
|
|
|
|
+ if new_cache.is_ready {
|
|
|
|
|
+ let old_generation = cache.load().refresh_version;
|
|
|
|
|
+ finalize_cache(&mut new_cache, old_generation);
|
|
|
|
|
+ cache.store(Arc::new(new_cache));
|
|
|
|
|
+ true
|
|
|
|
|
+ } else {
|
|
|
|
|
+ false
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ Err(e) => {
|
|
|
|
|
+ tracing::warn!("Failed to load cache from storage for {}: {}", mint_url, e);
|
|
|
|
|
+ false
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- /// Attempt to load cache from storages if not ready, and persist if successful
|
|
|
|
|
- ///
|
|
|
|
|
- /// Returns true if cache is ready after this operation.
|
|
|
|
|
- async fn try_load_cache_from_storages(
|
|
|
|
|
- mint_url: &MintUrl,
|
|
|
|
|
- storages: &StorageList,
|
|
|
|
|
- cache: &Arc<ArcSwap<MintKeyCache>>,
|
|
|
|
|
- ) -> bool {
|
|
|
|
|
- // Already ready, nothing to do
|
|
|
|
|
- if cache.load().is_ready {
|
|
|
|
|
- return true;
|
|
|
|
|
|
|
+/// Fetch complete mint data from HTTP and update cache
|
|
|
|
|
+///
|
|
|
|
|
+/// Multi-step process:
|
|
|
|
|
+/// 1. Fetches and validates mint info (checks time synchronization)
|
|
|
|
|
+/// 2. Fetches keyset list (including auth keysets if enabled)
|
|
|
|
|
+/// 3. Loads actual keyset keys (trying storage first, HTTP fallback)
|
|
|
|
|
+/// 4. Updates shared cache
|
|
|
|
|
+/// 5. Persists all data to all registered storages
|
|
|
|
|
+pub(super) async fn fetch_mint_data_from_http(
|
|
|
|
|
+ mint_url: &MintUrl,
|
|
|
|
|
+ client: &Arc<dyn MintConnector + Send + Sync>,
|
|
|
|
|
+ #[cfg(feature = "auth")] auth_client: &Arc<dyn AuthMintConnector + Send + Sync>,
|
|
|
|
|
+ storages: &StorageList,
|
|
|
|
|
+ cache: &Arc<ArcSwap<MintKeyCache>>,
|
|
|
|
|
+) -> Result<(), Error> {
|
|
|
|
|
+ tracing::debug!("Fetching keys from mint server for {}", mint_url);
|
|
|
|
|
+
|
|
|
|
|
+ // Fetch and validate mint info
|
|
|
|
|
+ let mint_info = client.get_mint_info().await?;
|
|
|
|
|
+
|
|
|
|
|
+ if let Some(mint_unix_time) = mint_info.time {
|
|
|
|
|
+ let current_unix_time = unix_time();
|
|
|
|
|
+ if current_unix_time.abs_diff(mint_unix_time) > 30 {
|
|
|
|
|
+ tracing::warn!(
|
|
|
|
|
+ "Mint time does not match wallet time. Mint: {}, Wallet: {}",
|
|
|
|
|
+ mint_unix_time,
|
|
|
|
|
+ current_unix_time
|
|
|
|
|
+ );
|
|
|
|
|
+ return Err(Error::MintTimeExceedsTolerance);
|
|
|
}
|
|
}
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- // Try loading from storages
|
|
|
|
|
- let loaded_cache = match Self::load_cache_from_storages(mint_url, storages).await {
|
|
|
|
|
- Ok(c) => c,
|
|
|
|
|
- Err(e) => {
|
|
|
|
|
- tracing::warn!("Failed to load cache from storage for {}: {}", mint_url, e);
|
|
|
|
|
- return false;
|
|
|
|
|
- }
|
|
|
|
|
- };
|
|
|
|
|
-
|
|
|
|
|
- // If we got valid data, persist and store
|
|
|
|
|
- if loaded_cache.is_ready {
|
|
|
|
|
- let loaded_cache = Arc::new(loaded_cache);
|
|
|
|
|
- Self::write_cache_to_all_storages(storages, mint_url.clone(), loaded_cache.clone())
|
|
|
|
|
- .await;
|
|
|
|
|
- cache.store(loaded_cache);
|
|
|
|
|
- return true;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // Fetch keysets
|
|
|
|
|
+ let keysets_response = client.get_mint_keysets().await?;
|
|
|
|
|
+ let mut keysets = keysets_response.keysets;
|
|
|
|
|
|
|
|
- false
|
|
|
|
|
|
|
+ // Include auth keysets if enabled
|
|
|
|
|
+ #[cfg(feature = "auth")]
|
|
|
|
|
+ if let Ok(auth_keysets_response) = auth_client.get_mint_blind_auth_keysets().await {
|
|
|
|
|
+ keysets.extend_from_slice(&auth_keysets_response.keysets);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- /// Fetch fresh mint data from HTTP endpoint
|
|
|
|
|
- ///
|
|
|
|
|
- /// 1. Fetches mint info and validates time sync
|
|
|
|
|
- /// 2. Fetches keyset list (including auth keysets if enabled)
|
|
|
|
|
- /// 3. Loads actual keyset keys (trying storage first, HTTP fallback)
|
|
|
|
|
- /// 4. Updates shared cache and schedules next refresh
|
|
|
|
|
- /// 5. Persists all data to all registered storages
|
|
|
|
|
- pub(super) async fn fetch_mint_data_from_http(
|
|
|
|
|
- mint_url: MintUrl,
|
|
|
|
|
- client: Arc<dyn MintConnector + Send + Sync>,
|
|
|
|
|
- #[cfg(feature = "auth")] auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
|
|
|
|
|
- storages: StorageList,
|
|
|
|
|
- cache: Arc<ArcSwap<MintKeyCache>>,
|
|
|
|
|
- refresh_scheduler: RefreshScheduler,
|
|
|
|
|
- ) -> Result<(), Error> {
|
|
|
|
|
- tracing::debug!("Fetching keys from mint server for {}", mint_url);
|
|
|
|
|
-
|
|
|
|
|
- // Fetch and validate mint info
|
|
|
|
|
- let mint_info = client.get_mint_info().await?;
|
|
|
|
|
-
|
|
|
|
|
- if let Some(mint_unix_time) = mint_info.time {
|
|
|
|
|
- let current_unix_time = unix_time();
|
|
|
|
|
- if current_unix_time.abs_diff(mint_unix_time) > 30 {
|
|
|
|
|
- tracing::warn!(
|
|
|
|
|
- "Mint time does not match wallet time. Mint: {}, Wallet: {}",
|
|
|
|
|
- mint_unix_time,
|
|
|
|
|
- current_unix_time
|
|
|
|
|
- );
|
|
|
|
|
- return Err(Error::MintTimeExceedsTolerance);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // Build new cache
|
|
|
|
|
+ let mut new_cache = MintKeyCache::empty();
|
|
|
|
|
|
|
|
- // Fetch keysets
|
|
|
|
|
- let keysets_response = client.get_mint_keysets().await?;
|
|
|
|
|
- let mut keysets = keysets_response.keysets;
|
|
|
|
|
|
|
+ for keyset_info in keysets {
|
|
|
|
|
+ let arc_keyset = Arc::new(keyset_info.clone());
|
|
|
|
|
+ new_cache
|
|
|
|
|
+ .keysets_by_id
|
|
|
|
|
+ .insert(keyset_info.id, arc_keyset.clone());
|
|
|
|
|
|
|
|
- // Include auth keysets if enabled
|
|
|
|
|
- #[cfg(feature = "auth")]
|
|
|
|
|
- if let Ok(auth_keysets_response) = auth_client.get_mint_blind_auth_keysets().await {
|
|
|
|
|
- keysets.extend_from_slice(&auth_keysets_response.keysets);
|
|
|
|
|
|
|
+ if keyset_info.active {
|
|
|
|
|
+ new_cache.active_keysets.push(arc_keyset);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Build new cache
|
|
|
|
|
- let mut new_cache = MintKeyCache::empty();
|
|
|
|
|
-
|
|
|
|
|
- for keyset_info in keysets {
|
|
|
|
|
- let arc_keyset = Arc::new(keyset_info.clone());
|
|
|
|
|
|
|
+ // Load keyset keys (try database first, then HTTP)
|
|
|
|
|
+ if let Ok(keyset) = load_keyset_from_db_or_http(mint_url, client, storages, &keyset_info.id)
|
|
|
|
|
+ .await
|
|
|
|
|
+ .inspect_err(|e| {
|
|
|
|
|
+ tracing::warn!(
|
|
|
|
|
+ "Failed to load keyset {} for {}: {}",
|
|
|
|
|
+ keyset_info.id,
|
|
|
|
|
+ mint_url,
|
|
|
|
|
+ e
|
|
|
|
|
+ )
|
|
|
|
|
+ })
|
|
|
|
|
+ {
|
|
|
new_cache
|
|
new_cache
|
|
|
- .keysets_by_id
|
|
|
|
|
- .insert(keyset_info.id, arc_keyset.clone());
|
|
|
|
|
-
|
|
|
|
|
- if keyset_info.active {
|
|
|
|
|
- new_cache.active_keysets.push(arc_keyset);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // Load keyset keys (try database first, then HTTP)
|
|
|
|
|
- if let Ok(keyset) =
|
|
|
|
|
- Self::load_keyset_from_db_or_http(&mint_url, &storages, &client, &keyset_info.id)
|
|
|
|
|
- .await
|
|
|
|
|
- .inspect_err(|e| {
|
|
|
|
|
- tracing::warn!(
|
|
|
|
|
- "Failed to load keyset {} for {}: {}",
|
|
|
|
|
- keyset_info.id,
|
|
|
|
|
- mint_url,
|
|
|
|
|
- e
|
|
|
|
|
- )
|
|
|
|
|
- })
|
|
|
|
|
- {
|
|
|
|
|
- new_cache
|
|
|
|
|
- .keys_by_id
|
|
|
|
|
- .insert(keyset_info.id, Arc::new(keyset.keys));
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ .keys_by_id
|
|
|
|
|
+ .insert(keyset_info.id, Arc::new(keyset.keys));
|
|
|
}
|
|
}
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- // Schedule next refresh
|
|
|
|
|
- refresh_scheduler.schedule_refresh(mint_url.clone());
|
|
|
|
|
|
|
+ // Finalize cache
|
|
|
|
|
+ let old_generation = cache.load().refresh_version;
|
|
|
|
|
+ new_cache.mint_info = Some(mint_info);
|
|
|
|
|
+ finalize_cache(&mut new_cache, old_generation);
|
|
|
|
|
|
|
|
- // Finalize cache
|
|
|
|
|
- let old_generation = cache.load().refresh_version;
|
|
|
|
|
- new_cache.mint_info = Some(mint_info);
|
|
|
|
|
- Self::finalize_cache(&mut new_cache, old_generation);
|
|
|
|
|
|
|
+ tracing::debug!(
|
|
|
|
|
+ "Refreshed {} keysets and {} keys for {} (generation {})",
|
|
|
|
|
+ new_cache.keysets_by_id.len(),
|
|
|
|
|
+ new_cache.keys_by_id.len(),
|
|
|
|
|
+ mint_url,
|
|
|
|
|
+ new_cache.refresh_version
|
|
|
|
|
+ );
|
|
|
|
|
|
|
|
- tracing::debug!(
|
|
|
|
|
- "Refreshed {} keysets and {} keys for {} (generation {})",
|
|
|
|
|
- new_cache.keysets_by_id.len(),
|
|
|
|
|
- new_cache.keys_by_id.len(),
|
|
|
|
|
- mint_url,
|
|
|
|
|
- new_cache.refresh_version
|
|
|
|
|
- );
|
|
|
|
|
|
|
+ // Store updated cache
|
|
|
|
|
+ let cache_arc = Arc::new(new_cache);
|
|
|
|
|
+ cache.store(cache_arc.clone());
|
|
|
|
|
|
|
|
- // Persist and update cache
|
|
|
|
|
- let new_cache = Arc::new(new_cache);
|
|
|
|
|
- Self::write_cache_to_all_storages(&storages, mint_url, new_cache.clone()).await;
|
|
|
|
|
- cache.store(new_cache);
|
|
|
|
|
|
|
+ // Persist to all storages
|
|
|
|
|
+ write_cache_to_all_storages(mint_url, storages, cache_arc).await;
|
|
|
|
|
|
|
|
- Ok(())
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ Ok(())
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- /// Spawn task to refresh mint data (database-first, then HTTP)
|
|
|
|
|
- ///
|
|
|
|
|
- /// First attempts to load from storage. If that fails or cache is still not ready,
|
|
|
|
|
- /// fetches from HTTP with a 60s timeout. All operations run in spawned task.
|
|
|
|
|
- pub(super) fn refresh_mint_task(
|
|
|
|
|
- mint_url: MintUrl,
|
|
|
|
|
- client: Arc<dyn MintConnector + Send + Sync>,
|
|
|
|
|
- #[cfg(feature = "auth")] auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
|
|
|
|
|
- storages: StorageList,
|
|
|
|
|
- cache: Arc<ArcSwap<MintKeyCache>>,
|
|
|
|
|
- refresh_scheduler: RefreshScheduler,
|
|
|
|
|
- ) {
|
|
|
|
|
- spawn(async move {
|
|
|
|
|
- // Try loading from storage first
|
|
|
|
|
- Self::try_load_cache_from_storages(&mint_url, &storages, &cache).await;
|
|
|
|
|
-
|
|
|
|
|
- // If still not ready, fetch from HTTP
|
|
|
|
|
- let result = tokio::time::timeout(
|
|
|
|
|
- Duration::from_secs(60),
|
|
|
|
|
- Self::fetch_mint_data_from_http(
|
|
|
|
|
- mint_url.clone(),
|
|
|
|
|
- client,
|
|
|
|
|
- #[cfg(feature = "auth")]
|
|
|
|
|
- auth_client,
|
|
|
|
|
- storages,
|
|
|
|
|
- cache,
|
|
|
|
|
- refresh_scheduler,
|
|
|
|
|
- ),
|
|
|
|
|
- )
|
|
|
|
|
- .await;
|
|
|
|
|
-
|
|
|
|
|
- // Log result
|
|
|
|
|
- let _ = result
|
|
|
|
|
- .map_err(|_| Error::Timeout)
|
|
|
|
|
- .and_then(|r| r)
|
|
|
|
|
- .inspect(|_| {
|
|
|
|
|
- tracing::debug!("Successfully refreshed keys for {}", mint_url);
|
|
|
|
|
- })
|
|
|
|
|
- .inspect_err(|e| match e {
|
|
|
|
|
- Error::Timeout => {
|
|
|
|
|
- tracing::error!("Timeout fetching keys from mint server for {}", mint_url)
|
|
|
|
|
- }
|
|
|
|
|
- _ => {
|
|
|
|
|
- tracing::error!("Failed to refresh keys for {}: {}", mint_url, e)
|
|
|
|
|
- }
|
|
|
|
|
- });
|
|
|
|
|
- });
|
|
|
|
|
- }
|
|
|
|
|
|
|
+/// Spawn task to refresh mint data (database-first, then HTTP)
|
|
|
|
|
+///
|
|
|
|
|
+/// First attempts to load from storage. If that fails or cache is still not ready,
|
|
|
|
|
+/// fetches from HTTP with a 60s timeout. All operations run in spawned task.
|
|
|
|
|
+pub(super) fn refresh_mint_task(
|
|
|
|
|
+ mint_url: MintUrl,
|
|
|
|
|
+ client: Arc<dyn MintConnector + Send + Sync>,
|
|
|
|
|
+ #[cfg(feature = "auth")] auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
|
|
|
|
|
+ storages: StorageList,
|
|
|
|
|
+ cache: Arc<ArcSwap<MintKeyCache>>,
|
|
|
|
|
+) {
|
|
|
|
|
+ spawn(async move {
|
|
|
|
|
+ // Try loading from storage first
|
|
|
|
|
+ try_load_cache_from_storages(&mint_url, &storages, &cache).await;
|
|
|
|
|
+
|
|
|
|
|
+ // If still not ready, fetch from HTTP
|
|
|
|
|
+ let result = tokio::time::timeout(
|
|
|
|
|
+ Duration::from_secs(60),
|
|
|
|
|
+ fetch_mint_data_from_http(
|
|
|
|
|
+ &mint_url,
|
|
|
|
|
+ &client,
|
|
|
|
|
+ #[cfg(feature = "auth")]
|
|
|
|
|
+ &auth_client,
|
|
|
|
|
+ &storages,
|
|
|
|
|
+ &cache,
|
|
|
|
|
+ ),
|
|
|
|
|
+ )
|
|
|
|
|
+ .await;
|
|
|
|
|
+
|
|
|
|
|
+ // Log result
|
|
|
|
|
+ let _ = result
|
|
|
|
|
+ .map_err(|_| Error::Timeout)
|
|
|
|
|
+ .and_then(|r| r)
|
|
|
|
|
+ .inspect(|_| {
|
|
|
|
|
+ tracing::debug!("Successfully refreshed keys for {}", mint_url);
|
|
|
|
|
+ })
|
|
|
|
|
+ .inspect_err(|e| match e {
|
|
|
|
|
+ Error::Timeout => {
|
|
|
|
|
+ tracing::error!("Timeout fetching keys from mint server for {}", mint_url)
|
|
|
|
|
+ }
|
|
|
|
|
+ _ => {
|
|
|
|
|
+ tracing::error!("Failed to refresh keys for {}: {}", mint_url, e)
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ });
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- /// Background refresh loop with message handling
|
|
|
|
|
- ///
|
|
|
|
|
- /// Runs independently and handles refresh messages and periodic updates.
|
|
|
|
|
- /// All refresh operations are spawned as separate tasks with timeouts.
|
|
|
|
|
- pub(super) async fn refresh_loop(
|
|
|
|
|
- mut rx: tokio::sync::mpsc::UnboundedReceiver<MessageToWorker>,
|
|
|
|
|
- mint_url: MintUrl,
|
|
|
|
|
- client: Arc<dyn MintConnector + Send + Sync>,
|
|
|
|
|
- #[cfg(feature = "auth")] auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
|
|
|
|
|
- storages: StorageList,
|
|
|
|
|
- cache: Arc<ArcSwap<MintKeyCache>>,
|
|
|
|
|
- refresh_interval: Duration,
|
|
|
|
|
- ) {
|
|
|
|
|
- let mut interval = tokio::time::interval(Duration::from_secs(1));
|
|
|
|
|
- let refresh_scheduler = RefreshScheduler::new(refresh_interval);
|
|
|
|
|
- let mut is_scheduled = false;
|
|
|
|
|
-
|
|
|
|
|
- loop {
|
|
|
|
|
- tokio::select! {
|
|
|
|
|
- Some(msg) = rx.recv() => {
|
|
|
|
|
- match msg {
|
|
|
|
|
- MessageToWorker::Stop => {
|
|
|
|
|
- tracing::debug!("Stopping loop {}", mint_url);
|
|
|
|
|
- break;
|
|
|
|
|
- }
|
|
|
|
|
- MessageToWorker::SyncDb(db) => {
|
|
|
|
|
- // Wait for cache to be ready, then sync this storage
|
|
|
|
|
- let cache_clone = cache.clone();
|
|
|
|
|
- let mint_clone = mint_url.clone();
|
|
|
|
|
- spawn(async move {
|
|
|
|
|
- loop {
|
|
|
|
|
- let current_cache = cache_clone.load();
|
|
|
|
|
- if current_cache.is_ready {
|
|
|
|
|
- Self::write_cache_to_storage(
|
|
|
|
|
- db,
|
|
|
|
|
- mint_clone,
|
|
|
|
|
- current_cache.clone()
|
|
|
|
|
- ).await;
|
|
|
|
|
- break;
|
|
|
|
|
- }
|
|
|
|
|
- sleep(Duration::from_millis(100)).await;
|
|
|
|
|
|
|
+/// Background refresh loop with message handling
|
|
|
|
|
+///
|
|
|
|
|
+/// Runs independently and handles refresh messages and periodic updates.
|
|
|
|
|
+/// Uses a simple sleep-based refresh interval since each worker handles only one URL.
|
|
|
|
|
+pub(super) async fn refresh_loop(
|
|
|
|
|
+ mint_url: MintUrl,
|
|
|
|
|
+ client: Arc<dyn MintConnector + Send + Sync>,
|
|
|
|
|
+ #[cfg(feature = "auth")] auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
|
|
|
|
|
+ storages: StorageList,
|
|
|
|
|
+ cache: Arc<ArcSwap<MintKeyCache>>,
|
|
|
|
|
+ mut rx: tokio::sync::mpsc::Receiver<MessageToWorker>,
|
|
|
|
|
+ refresh_interval: Duration,
|
|
|
|
|
+) {
|
|
|
|
|
+ // Perform initial refresh
|
|
|
|
|
+ refresh_mint_task(
|
|
|
|
|
+ mint_url.clone(),
|
|
|
|
|
+ client.clone(),
|
|
|
|
|
+ #[cfg(feature = "auth")]
|
|
|
|
|
+ auth_client.clone(),
|
|
|
|
|
+ storages.clone(),
|
|
|
|
|
+ cache.clone(),
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ loop {
|
|
|
|
|
+ tokio::select! {
|
|
|
|
|
+ Some(msg) = rx.recv() => {
|
|
|
|
|
+ match msg {
|
|
|
|
|
+ MessageToWorker::Stop => {
|
|
|
|
|
+ tracing::debug!("Stopping refresh loop for {}", mint_url);
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+ MessageToWorker::SyncDb(db) => {
|
|
|
|
|
+ // Wait for cache to be ready, then sync this storage
|
|
|
|
|
+ let cache_clone = cache.clone();
|
|
|
|
|
+ let mint_clone = mint_url.clone();
|
|
|
|
|
+ spawn(async move {
|
|
|
|
|
+ loop {
|
|
|
|
|
+ let current_cache = cache_clone.load();
|
|
|
|
|
+ if current_cache.is_ready {
|
|
|
|
|
+ write_cache_to_storage(
|
|
|
|
|
+ db,
|
|
|
|
|
+ mint_clone,
|
|
|
|
|
+ current_cache.clone()
|
|
|
|
|
+ ).await;
|
|
|
|
|
+ break;
|
|
|
}
|
|
}
|
|
|
- });
|
|
|
|
|
- }
|
|
|
|
|
- MessageToWorker::FetchMint => {
|
|
|
|
|
- tracing::debug!("FetchMint message received for {}", mint_url);
|
|
|
|
|
- Self::refresh_mint_task(
|
|
|
|
|
- mint_url.clone(),
|
|
|
|
|
- client.clone(),
|
|
|
|
|
- #[cfg(feature = "auth")]
|
|
|
|
|
- auth_client.clone(),
|
|
|
|
|
- storages.clone(),
|
|
|
|
|
- cache.clone(),
|
|
|
|
|
- refresh_scheduler.clone(),
|
|
|
|
|
- );
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ sleep(Duration::from_millis(100)).await;
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
}
|
|
}
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- _ = interval.tick() => {
|
|
|
|
|
- let due_mints = refresh_scheduler.get_due_refreshes();
|
|
|
|
|
-
|
|
|
|
|
- if !due_mints.is_empty() || !is_scheduled {
|
|
|
|
|
- tracing::debug!("Time to refresh mint: {}", mint_url);
|
|
|
|
|
- Self::refresh_mint_task(
|
|
|
|
|
|
|
+ MessageToWorker::FetchMint => {
|
|
|
|
|
+ tracing::debug!("FetchMint message received for {}", mint_url);
|
|
|
|
|
+ refresh_mint_task(
|
|
|
mint_url.clone(),
|
|
mint_url.clone(),
|
|
|
client.clone(),
|
|
client.clone(),
|
|
|
#[cfg(feature = "auth")]
|
|
#[cfg(feature = "auth")]
|
|
|
auth_client.clone(),
|
|
auth_client.clone(),
|
|
|
storages.clone(),
|
|
storages.clone(),
|
|
|
cache.clone(),
|
|
cache.clone(),
|
|
|
- refresh_scheduler.clone(),
|
|
|
|
|
);
|
|
);
|
|
|
- is_scheduled = true;
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- }
|
|
|
|
|
|
|
|
|
|
- tracing::debug!("Refresh loop stopped for {}", mint_url);
|
|
|
|
|
|
|
+ _ = sleep(refresh_interval) => {
|
|
|
|
|
+ tracing::debug!("Time to refresh mint: {}", mint_url);
|
|
|
|
|
+ refresh_mint_task(
|
|
|
|
|
+ mint_url.clone(),
|
|
|
|
|
+ client.clone(),
|
|
|
|
|
+ #[cfg(feature = "auth")]
|
|
|
|
|
+ auth_client.clone(),
|
|
|
|
|
+ storages.clone(),
|
|
|
|
|
+ cache.clone(),
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ tracing::debug!("Refresh loop stopped for {}", mint_url);
|
|
|
}
|
|
}
|