|
@@ -7,12 +7,7 @@ use std::time::Duration;
|
|
|
use arc_swap::ArcSwap;
|
|
use arc_swap::ArcSwap;
|
|
|
use cdk_common::database::{self, WalletDatabase};
|
|
use cdk_common::database::{self, WalletDatabase};
|
|
|
use cdk_common::mint_url::MintUrl;
|
|
use cdk_common::mint_url::MintUrl;
|
|
|
-use cdk_common::parking_lot::RwLock as ParkingLotRwLock;
|
|
|
|
|
-use cdk_common::task::spawn;
|
|
|
|
|
-use cdk_common::util::unix_time;
|
|
|
|
|
use cdk_common::KeySet;
|
|
use cdk_common::KeySet;
|
|
|
-use tokio::sync::mpsc;
|
|
|
|
|
-use tokio::task::JoinHandle;
|
|
|
|
|
use tokio::time::sleep;
|
|
use tokio::time::sleep;
|
|
|
|
|
|
|
|
use super::MintKeyCache;
|
|
use super::MintKeyCache;
|
|
@@ -23,96 +18,42 @@ use crate::Error;
|
|
|
#[cfg(feature = "auth")]
|
|
#[cfg(feature = "auth")]
|
|
|
use crate::wallet::AuthMintConnector;
|
|
use crate::wallet::AuthMintConnector;
|
|
|
|
|
|
|
|
-/// Type alias for storage list to improve readability
|
|
|
|
|
-pub(super) type StorageList =
|
|
|
|
|
- Arc<ParkingLotRwLock<Vec<Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>>>>;
|
|
|
|
|
-
|
|
|
|
|
/// Messages for the background refresh task
|
|
/// Messages for the background refresh task
|
|
|
-#[derive(Debug, Clone)]
|
|
|
|
|
|
|
+#[derive(Debug)]
|
|
|
pub(super) enum MessageToWorker {
|
|
pub(super) enum MessageToWorker {
|
|
|
/// Stop the refresh task
|
|
/// Stop the refresh task
|
|
|
Stop,
|
|
Stop,
|
|
|
|
|
|
|
|
- /// Sync a new storage backend with the current cache
|
|
|
|
|
- SyncDb(Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>),
|
|
|
|
|
-
|
|
|
|
|
/// Fetch keys from the mint immediately
|
|
/// Fetch keys from the mint immediately
|
|
|
FetchMint,
|
|
FetchMint,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-/// Shared worker state for a mint
|
|
|
|
|
-///
|
|
|
|
|
-/// One worker per mint_url, shared across all KeyManager instances for that mint.
|
|
|
|
|
-pub(super) struct SharedWorker {
|
|
|
|
|
- /// Mint URL
|
|
|
|
|
- pub(super) mint_url: MintUrl,
|
|
|
|
|
-
|
|
|
|
|
- /// All storages registered for this mint
|
|
|
|
|
- pub(super) storages: StorageList,
|
|
|
|
|
-
|
|
|
|
|
- /// Shared cache
|
|
|
|
|
- pub(super) cache: Arc<ArcSwap<MintKeyCache>>,
|
|
|
|
|
-
|
|
|
|
|
- /// Message sender to worker (bounded to 1000 messages)
|
|
|
|
|
- pub(super) tx: mpsc::Sender<MessageToWorker>,
|
|
|
|
|
-
|
|
|
|
|
- /// Worker task handle
|
|
|
|
|
- pub(super) task: Option<JoinHandle<()>>,
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-impl Debug for SharedWorker {
|
|
|
|
|
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
|
|
- f.debug_struct("SharedWorker")
|
|
|
|
|
- .field("mint_url", &self.mint_url)
|
|
|
|
|
- .field("storages", &self.storages.read().len().to_string())
|
|
|
|
|
- .field("tx", &self.tx.is_closed())
|
|
|
|
|
- .field("cache", &self.cache)
|
|
|
|
|
- .finish()
|
|
|
|
|
- }
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-impl Drop for SharedWorker {
|
|
|
|
|
- fn drop(&mut self) {
|
|
|
|
|
- tracing::debug!("Dropping SharedWorker for {}", self.mint_url);
|
|
|
|
|
- let _ = self.tx.try_send(MessageToWorker::Stop).inspect_err(|e| {
|
|
|
|
|
- tracing::error!("Failed to send Stop message for {}: {}", self.mint_url, e)
|
|
|
|
|
- });
|
|
|
|
|
- if let Some(task) = self.task.take() {
|
|
|
|
|
- task.abort();
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
/// Load a specific keyset from database or HTTP
|
|
/// Load a specific keyset from database or HTTP
|
|
|
///
|
|
///
|
|
|
-/// First checks all databases for the keyset. If not found,
|
|
|
|
|
-/// fetches from the mint server via HTTP and persists to all databases.
|
|
|
|
|
|
|
+/// First checks the database for the keyset. If not found,
|
|
|
|
|
+/// fetches from the mint server via HTTP and persists to database.
|
|
|
async fn load_keyset_from_db_or_http(
|
|
async fn load_keyset_from_db_or_http(
|
|
|
mint_url: &MintUrl,
|
|
mint_url: &MintUrl,
|
|
|
client: &Arc<dyn MintConnector + Send + Sync>,
|
|
client: &Arc<dyn MintConnector + Send + Sync>,
|
|
|
- storages: &StorageList,
|
|
|
|
|
|
|
+ storage: &Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
|
|
|
keyset_id: &Id,
|
|
keyset_id: &Id,
|
|
|
) -> Result<KeySet, Error> {
|
|
) -> Result<KeySet, Error> {
|
|
|
- let storages_list = storages.read().clone();
|
|
|
|
|
-
|
|
|
|
|
- // Try all databases first
|
|
|
|
|
- for storage in &storages_list {
|
|
|
|
|
- if let Some(keys) = storage.get_keys(keyset_id).await? {
|
|
|
|
|
- tracing::debug!("Loaded keyset {} from database for {}", keyset_id, mint_url);
|
|
|
|
|
-
|
|
|
|
|
- // Get keyset info to construct KeySet
|
|
|
|
|
- if let Some(keyset_info) = storage.get_keyset_by_id(keyset_id).await? {
|
|
|
|
|
- return Ok(KeySet {
|
|
|
|
|
- id: *keyset_id,
|
|
|
|
|
- unit: keyset_info.unit,
|
|
|
|
|
- final_expiry: keyset_info.final_expiry,
|
|
|
|
|
- keys,
|
|
|
|
|
- });
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // Try database first
|
|
|
|
|
+ if let Some(keys) = storage.get_keys(keyset_id).await? {
|
|
|
|
|
+ tracing::debug!("Loaded keyset {} from database for {}", keyset_id, mint_url);
|
|
|
|
|
+
|
|
|
|
|
+ // Get keyset info to construct KeySet
|
|
|
|
|
+ if let Some(keyset_info) = storage.get_keyset_by_id(keyset_id).await? {
|
|
|
|
|
+ return Ok(KeySet {
|
|
|
|
|
+ id: *keyset_id,
|
|
|
|
|
+ unit: keyset_info.unit,
|
|
|
|
|
+ final_expiry: keyset_info.final_expiry,
|
|
|
|
|
+ keys,
|
|
|
|
|
+ });
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Not in any database, fetch from HTTP
|
|
|
|
|
|
|
+ // Not in database, fetch from HTTP
|
|
|
tracing::debug!(
|
|
tracing::debug!(
|
|
|
"Keyset {} not in database, fetching from mint server for {}",
|
|
"Keyset {} not in database, fetching from mint server for {}",
|
|
|
keyset_id,
|
|
keyset_id,
|
|
@@ -122,369 +63,310 @@ async fn load_keyset_from_db_or_http(
|
|
|
let keyset = client.get_mint_keyset(*keyset_id).await?;
|
|
let keyset = client.get_mint_keyset(*keyset_id).await?;
|
|
|
keyset.verify_id()?;
|
|
keyset.verify_id()?;
|
|
|
|
|
|
|
|
- // Persist to all databases
|
|
|
|
|
- for storage in &storages_list {
|
|
|
|
|
- let _ = storage.add_keys(keyset.clone()).await.inspect_err(|e| {
|
|
|
|
|
- tracing::warn!(
|
|
|
|
|
- "Failed to persist keyset {} for {}: {}",
|
|
|
|
|
- keyset_id,
|
|
|
|
|
- mint_url,
|
|
|
|
|
- e
|
|
|
|
|
- )
|
|
|
|
|
- });
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // Persist to database
|
|
|
|
|
+ storage.add_keys(keyset.clone()).await.inspect_err(|e| {
|
|
|
|
|
+ tracing::warn!(
|
|
|
|
|
+ "Failed to persist keyset {} for {}: {}",
|
|
|
|
|
+ keyset_id,
|
|
|
|
|
+ mint_url,
|
|
|
|
|
+ e
|
|
|
|
|
+ )
|
|
|
|
|
+ })?;
|
|
|
|
|
|
|
|
tracing::debug!("Loaded keyset {} from HTTP for {}", keyset_id, mint_url);
|
|
tracing::debug!("Loaded keyset {} from HTTP for {}", keyset_id, mint_url);
|
|
|
|
|
|
|
|
Ok(keyset)
|
|
Ok(keyset)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-/// Load cached mint data from all registered storage backends
|
|
|
|
|
|
|
+/// Load cached mint data from storage backend
|
|
|
///
|
|
///
|
|
|
-/// Iterates through all storages, collecting unique keysets and keys.
|
|
|
|
|
|
|
+/// Loads keysets and keys from storage.
|
|
|
/// Marks cache as ready only if mint_info was found.
|
|
/// Marks cache as ready only if mint_info was found.
|
|
|
///
|
|
///
|
|
|
/// Returns a MintKeyCache that may or may not be ready depending on what was found.
|
|
/// Returns a MintKeyCache that may or may not be ready depending on what was found.
|
|
|
-async fn load_cache_from_storages(
|
|
|
|
|
|
|
+async fn load_cache_from_storage(
|
|
|
mint_url: &MintUrl,
|
|
mint_url: &MintUrl,
|
|
|
- storages: &StorageList,
|
|
|
|
|
|
|
+ storage: &Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
|
|
|
) -> Result<MintKeyCache, Error> {
|
|
) -> Result<MintKeyCache, Error> {
|
|
|
tracing::debug!("Loading cache from storage for {}", mint_url);
|
|
tracing::debug!("Loading cache from storage for {}", mint_url);
|
|
|
|
|
|
|
|
let mut cache = MintKeyCache::empty();
|
|
let mut cache = MintKeyCache::empty();
|
|
|
- let storages_list = storages.read().clone();
|
|
|
|
|
|
|
|
|
|
- for storage in storages_list {
|
|
|
|
|
- // Load mint info from first storage that has it
|
|
|
|
|
- if cache.mint_info.is_none() {
|
|
|
|
|
- cache.mint_info = storage.get_mint(mint_url.clone()).await?;
|
|
|
|
|
|
|
+ // Load mint info
|
|
|
|
|
+ match storage.get_mint(mint_url.clone()).await {
|
|
|
|
|
+ Ok(Some(mint_info)) => {
|
|
|
|
|
+ tracing::debug!("Found mint info in storage for {}", mint_url);
|
|
|
|
|
+ cache.mint_info = Some(mint_info);
|
|
|
}
|
|
}
|
|
|
|
|
+ Ok(None) => {
|
|
|
|
|
+ tracing::debug!("No mint info in storage for {}", mint_url);
|
|
|
|
|
+ }
|
|
|
|
|
+ Err(e) => {
|
|
|
|
|
+ tracing::warn!("Error loading mint info from storage for {}: {}", mint_url, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- // Collect unique keysets from all storages
|
|
|
|
|
- for keyset in storage
|
|
|
|
|
- .get_mint_keysets(mint_url.clone())
|
|
|
|
|
- .await?
|
|
|
|
|
- .unwrap_or_default()
|
|
|
|
|
- {
|
|
|
|
|
- let arc_keyset = Arc::new(keyset.clone());
|
|
|
|
|
- cache.keysets_by_id.insert(keyset.id, arc_keyset.clone());
|
|
|
|
|
|
|
+ // Load keysets
|
|
|
|
|
+ match storage.get_mint_keysets(mint_url.clone()).await {
|
|
|
|
|
+ Ok(Some(keysets)) if !keysets.is_empty() => {
|
|
|
|
|
+ tracing::debug!("Loaded {} keysets from storage for {}", keysets.len(), mint_url);
|
|
|
|
|
|
|
|
- if keyset.active {
|
|
|
|
|
- cache.active_keysets.push(arc_keyset);
|
|
|
|
|
|
|
+ for keyset in keysets {
|
|
|
|
|
+ cache.keysets_by_id.insert(keyset.id, Arc::new(keyset.clone()));
|
|
|
|
|
+
|
|
|
|
|
+ if keyset.active {
|
|
|
|
|
+ cache.active_keysets.push(Arc::new(keyset));
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+ Ok(_) => {
|
|
|
|
|
+ tracing::debug!("No keysets in storage for {}", mint_url);
|
|
|
|
|
+ }
|
|
|
|
|
+ Err(e) => {
|
|
|
|
|
+ tracing::warn!("Error loading keysets from storage for {}: {}", mint_url, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- // Collect unique keys from all storages
|
|
|
|
|
- for id in cache.keysets_by_id.keys() {
|
|
|
|
|
- if !cache.keys_by_id.contains_key(id) {
|
|
|
|
|
- if let Some(keys) = storage.get_keys(id).await? {
|
|
|
|
|
- cache.keys_by_id.insert(*id, Arc::new(keys));
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // Load keys for each keyset
|
|
|
|
|
+ for keyset_id in cache.keysets_by_id.keys() {
|
|
|
|
|
+ match storage.get_keys(keyset_id).await {
|
|
|
|
|
+ Ok(Some(keys)) => {
|
|
|
|
|
+ cache.keys_by_id.insert(*keyset_id, Arc::new(keys));
|
|
|
|
|
+ }
|
|
|
|
|
+ Ok(None) => {
|
|
|
|
|
+ tracing::debug!("No keys for keyset {} in storage for {}", keyset_id, mint_url);
|
|
|
|
|
+ }
|
|
|
|
|
+ Err(e) => {
|
|
|
|
|
+ tracing::warn!(
|
|
|
|
|
+ "Error loading keys for keyset {} from storage for {}: {}",
|
|
|
|
|
+ keyset_id,
|
|
|
|
|
+ mint_url,
|
|
|
|
|
+ e
|
|
|
|
|
+ );
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Set ready status based on whether we got mint_info
|
|
|
|
|
|
|
+ // Only mark ready if we have mint_info
|
|
|
cache.is_ready = cache.mint_info.is_some();
|
|
cache.is_ready = cache.mint_info.is_some();
|
|
|
|
|
|
|
|
- tracing::debug!(
|
|
|
|
|
- "Loaded {} keysets and {} keys from storage for {}",
|
|
|
|
|
- cache.keysets_by_id.len(),
|
|
|
|
|
- cache.keys_by_id.len(),
|
|
|
|
|
- mint_url
|
|
|
|
|
- );
|
|
|
|
|
-
|
|
|
|
|
Ok(cache)
|
|
Ok(cache)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-/// Finalize cache after successful update
|
|
|
|
|
-///
|
|
|
|
|
-/// Increments generation, marks as ready, and updates timestamp.
|
|
|
|
|
-fn finalize_cache(cache: &mut MintKeyCache, previous_version: u64) {
|
|
|
|
|
- cache.refresh_version = previous_version + 1;
|
|
|
|
|
- cache.is_ready = true;
|
|
|
|
|
- cache.last_refresh = std::time::Instant::now();
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-/// Write keys that are missing from storage
|
|
|
|
|
-///
|
|
|
|
|
-/// For each keyset in cache, checks if storage has the keys. If not, persists them.
|
|
|
|
|
-async fn write_missing_keys_to_storage(
|
|
|
|
|
- storage: &Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
|
|
|
|
|
|
|
+/// Persist the current cache to storage
|
|
|
|
|
+async fn write_cache_to_storage(
|
|
|
mint_url: &MintUrl,
|
|
mint_url: &MintUrl,
|
|
|
- cache: &MintKeyCache,
|
|
|
|
|
|
|
+ storage: &Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
|
|
|
|
|
+ cache: &Arc<ArcSwap<MintKeyCache>>,
|
|
|
) {
|
|
) {
|
|
|
- for (keyset_id, keys) in &cache.keys_by_id {
|
|
|
|
|
- // Check if keys already exist in storage
|
|
|
|
|
- if storage.get_keys(keyset_id).await.ok().flatten().is_none() {
|
|
|
|
|
- // Keys don't exist, need to persist
|
|
|
|
|
- if let Some(keyset_info) = cache.keysets_by_id.get(keyset_id) {
|
|
|
|
|
- let keyset = KeySet {
|
|
|
|
|
- id: *keyset_id,
|
|
|
|
|
- unit: keyset_info.unit.clone(),
|
|
|
|
|
- final_expiry: keyset_info.final_expiry,
|
|
|
|
|
- keys: (**keys).clone(),
|
|
|
|
|
- };
|
|
|
|
|
-
|
|
|
|
|
- let _ = storage.add_keys(keyset).await.inspect_err(|e| {
|
|
|
|
|
- tracing::warn!(
|
|
|
|
|
- "Failed to persist keys for {} keyset {}: {}",
|
|
|
|
|
- mint_url,
|
|
|
|
|
- keyset_id,
|
|
|
|
|
- e
|
|
|
|
|
- )
|
|
|
|
|
- });
|
|
|
|
|
- } else {
|
|
|
|
|
- tracing::warn!("Missing keyset info for {}", keyset_id);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-}
|
|
|
|
|
|
|
+ let cache_snapshot = cache.load();
|
|
|
|
|
|
|
|
-/// Write complete cache to a single storage backend
|
|
|
|
|
-///
|
|
|
|
|
-/// Persists mint_info, keysets, and missing keys to the specified storage.
|
|
|
|
|
-async fn write_cache_to_storage(
|
|
|
|
|
- storage: Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
|
|
|
|
|
- mint_url: MintUrl,
|
|
|
|
|
- cache: Arc<MintKeyCache>,
|
|
|
|
|
-) {
|
|
|
|
|
- // Write mint_info
|
|
|
|
|
- if let Some(ref mint_info) = cache.mint_info {
|
|
|
|
|
- let _ = storage
|
|
|
|
|
|
|
+ // Save mint info
|
|
|
|
|
+ if let Some(mint_info) = &cache_snapshot.mint_info {
|
|
|
|
|
+ storage
|
|
|
.add_mint(mint_url.clone(), Some(mint_info.clone()))
|
|
.add_mint(mint_url.clone(), Some(mint_info.clone()))
|
|
|
.await
|
|
.await
|
|
|
- .inspect_err(|e| tracing::warn!("Failed to persist mint_info for {}: {}", mint_url, e));
|
|
|
|
|
|
|
+ .inspect_err(|e| tracing::warn!("Failed to save mint info for {}: {}", mint_url, e))
|
|
|
|
|
+ .ok();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Write keysets
|
|
|
|
|
- let keysets: Vec<_> = cache
|
|
|
|
|
|
|
+ // Save keysets (via add_mint_keysets which takes mint_url and keysets)
|
|
|
|
|
+ let keysets: Vec<_> = cache_snapshot
|
|
|
.keysets_by_id
|
|
.keysets_by_id
|
|
|
.values()
|
|
.values()
|
|
|
.map(|ks| (**ks).clone())
|
|
.map(|ks| (**ks).clone())
|
|
|
.collect();
|
|
.collect();
|
|
|
- let _ = storage
|
|
|
|
|
- .add_mint_keysets(mint_url.clone(), keysets)
|
|
|
|
|
- .await
|
|
|
|
|
- .inspect_err(|e| tracing::warn!("Failed to persist keysets for {}: {}", mint_url, e));
|
|
|
|
|
|
|
|
|
|
- // Write any missing keys
|
|
|
|
|
- write_missing_keys_to_storage(&storage, &mint_url, &cache).await;
|
|
|
|
|
-}
|
|
|
|
|
|
|
+ if !keysets.is_empty() {
|
|
|
|
|
+ storage
|
|
|
|
|
+ .add_mint_keysets(mint_url.clone(), keysets)
|
|
|
|
|
+ .await
|
|
|
|
|
+ .inspect_err(|e| tracing::warn!("Failed to save keysets for {}: {}", mint_url, e))
|
|
|
|
|
+ .ok();
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
-/// Write cached data to all registered storage backends concurrently
|
|
|
|
|
-///
|
|
|
|
|
-/// Spawns a task for each storage to write in parallel.
|
|
|
|
|
-async fn write_cache_to_all_storages(
|
|
|
|
|
- mint_url: &MintUrl,
|
|
|
|
|
- storages: &StorageList,
|
|
|
|
|
- cache: Arc<MintKeyCache>,
|
|
|
|
|
-) {
|
|
|
|
|
- let storages_list = storages.read().clone();
|
|
|
|
|
-
|
|
|
|
|
- for storage in storages_list {
|
|
|
|
|
- spawn(write_cache_to_storage(
|
|
|
|
|
- storage,
|
|
|
|
|
- mint_url.clone(),
|
|
|
|
|
- cache.clone(),
|
|
|
|
|
- ));
|
|
|
|
|
|
|
+ // Save keys
|
|
|
|
|
+ for (keyset_id, keys) in &cache_snapshot.keys_by_id {
|
|
|
|
|
+ if let Some(keyset_info) = cache_snapshot.keysets_by_id.get(keyset_id) {
|
|
|
|
|
+ let keyset = KeySet {
|
|
|
|
|
+ id: *keyset_id,
|
|
|
|
|
+ unit: keyset_info.unit.clone(),
|
|
|
|
|
+ final_expiry: keyset_info.final_expiry,
|
|
|
|
|
+ keys: (**keys).clone(),
|
|
|
|
|
+ };
|
|
|
|
|
+
|
|
|
|
|
+ storage
|
|
|
|
|
+ .add_keys(keyset)
|
|
|
|
|
+ .await
|
|
|
|
|
+ .inspect_err(|e| {
|
|
|
|
|
+ tracing::warn!("Failed to save keys for keyset {} for {}: {}", keyset_id, mint_url, e)
|
|
|
|
|
+ })
|
|
|
|
|
+ .ok();
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-/// Try to load cache from storage and update shared cache if successful
|
|
|
|
|
-///
|
|
|
|
|
-/// Returns true if cache was successfully loaded and is ready, false otherwise.
|
|
|
|
|
-async fn try_load_cache_from_storages(
|
|
|
|
|
|
|
+/// Try to load cache from storage and update if successful
|
|
|
|
|
+async fn try_load_cache_from_storage(
|
|
|
mint_url: &MintUrl,
|
|
mint_url: &MintUrl,
|
|
|
- storages: &StorageList,
|
|
|
|
|
|
|
+ storage: &Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
|
|
|
cache: &Arc<ArcSwap<MintKeyCache>>,
|
|
cache: &Arc<ArcSwap<MintKeyCache>>,
|
|
|
-) -> bool {
|
|
|
|
|
- match load_cache_from_storages(mint_url, storages).await {
|
|
|
|
|
- Ok(mut new_cache) => {
|
|
|
|
|
- if new_cache.is_ready {
|
|
|
|
|
- let old_generation = cache.load().refresh_version;
|
|
|
|
|
- finalize_cache(&mut new_cache, old_generation);
|
|
|
|
|
- cache.store(Arc::new(new_cache));
|
|
|
|
|
- true
|
|
|
|
|
- } else {
|
|
|
|
|
- false
|
|
|
|
|
- }
|
|
|
|
|
|
|
+) {
|
|
|
|
|
+ match load_cache_from_storage(mint_url, storage).await {
|
|
|
|
|
+ Ok(loaded_cache) if loaded_cache.is_ready => {
|
|
|
|
|
+ tracing::info!("Successfully loaded cache from storage for {}", mint_url);
|
|
|
|
|
+ let old_version = cache.load().refresh_version;
|
|
|
|
|
+ let mut new_cache = loaded_cache;
|
|
|
|
|
+ new_cache.refresh_version = old_version + 1;
|
|
|
|
|
+ cache.store(Arc::new(new_cache));
|
|
|
|
|
+ }
|
|
|
|
|
+ Ok(_) => {
|
|
|
|
|
+ tracing::debug!("Storage cache for {} exists but not ready", mint_url);
|
|
|
}
|
|
}
|
|
|
Err(e) => {
|
|
Err(e) => {
|
|
|
tracing::warn!("Failed to load cache from storage for {}: {}", mint_url, e);
|
|
tracing::warn!("Failed to load cache from storage for {}: {}", mint_url, e);
|
|
|
- false
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-/// Fetch complete mint data from HTTP and update cache
|
|
|
|
|
|
|
+/// Fetch fresh mint data from HTTP and update cache
|
|
|
///
|
|
///
|
|
|
-/// Multi-step process:
|
|
|
|
|
-/// 1. Fetches and validates mint info (checks time synchronization)
|
|
|
|
|
-/// 2. Fetches keyset list (including auth keysets if enabled)
|
|
|
|
|
-/// 3. Loads actual keyset keys (trying storage first, HTTP fallback)
|
|
|
|
|
-/// 4. Updates shared cache
|
|
|
|
|
-/// 5. Persists all data to all registered storages
|
|
|
|
|
-pub(super) async fn fetch_mint_data_from_http(
|
|
|
|
|
|
|
+/// Steps:
|
|
|
|
|
+/// 1. Fetches mint info from server
|
|
|
|
|
+/// 2. Fetches keyset list
|
|
|
|
|
+/// 3. Fetches keys for each keyset
|
|
|
|
|
+/// 4. Updates in-memory cache atomically
|
|
|
|
|
+/// 5. Persists all data to storage
|
|
|
|
|
+async fn fetch_mint_data_from_http(
|
|
|
mint_url: &MintUrl,
|
|
mint_url: &MintUrl,
|
|
|
client: &Arc<dyn MintConnector + Send + Sync>,
|
|
client: &Arc<dyn MintConnector + Send + Sync>,
|
|
|
- #[cfg(feature = "auth")] auth_client: &Arc<dyn AuthMintConnector + Send + Sync>,
|
|
|
|
|
- storages: &StorageList,
|
|
|
|
|
|
|
+ #[cfg(feature = "auth")] _auth_client: &Arc<dyn AuthMintConnector + Send + Sync>,
|
|
|
|
|
+ storage: &Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
|
|
|
cache: &Arc<ArcSwap<MintKeyCache>>,
|
|
cache: &Arc<ArcSwap<MintKeyCache>>,
|
|
|
-) -> Result<(), Error> {
|
|
|
|
|
- tracing::debug!("Fetching keys from mint server for {}", mint_url);
|
|
|
|
|
|
|
+) {
|
|
|
|
|
+ tracing::debug!("Fetching mint data from HTTP for {}", mint_url);
|
|
|
|
|
|
|
|
- // Fetch and validate mint info
|
|
|
|
|
- let mint_info = client.get_mint_info().await?;
|
|
|
|
|
|
|
+ let mut new_cache = MintKeyCache::empty();
|
|
|
|
|
|
|
|
- if let Some(mint_unix_time) = mint_info.time {
|
|
|
|
|
- let current_unix_time = unix_time();
|
|
|
|
|
- if current_unix_time.abs_diff(mint_unix_time) > 30 {
|
|
|
|
|
- tracing::warn!(
|
|
|
|
|
- "Mint time does not match wallet time. Mint: {}, Wallet: {}",
|
|
|
|
|
- mint_unix_time,
|
|
|
|
|
- current_unix_time
|
|
|
|
|
- );
|
|
|
|
|
- return Err(Error::MintTimeExceedsTolerance);
|
|
|
|
|
|
|
+ // Fetch mint info
|
|
|
|
|
+ match client.get_mint_info().await {
|
|
|
|
|
+ Ok(mint_info) => {
|
|
|
|
|
+ tracing::debug!("Fetched mint info for {}", mint_url);
|
|
|
|
|
+ new_cache.mint_info = Some(mint_info);
|
|
|
|
|
+ }
|
|
|
|
|
+ Err(e) => {
|
|
|
|
|
+ tracing::error!("Failed to fetch mint info for {}: {}", mint_url, e);
|
|
|
|
|
+ return;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Fetch keysets
|
|
// Fetch keysets
|
|
|
- let keysets_response = client.get_mint_keysets().await?;
|
|
|
|
|
- let mut keysets = keysets_response.keysets;
|
|
|
|
|
-
|
|
|
|
|
- // Include auth keysets if enabled
|
|
|
|
|
- #[cfg(feature = "auth")]
|
|
|
|
|
- if let Ok(auth_keysets_response) = auth_client.get_mint_blind_auth_keysets().await {
|
|
|
|
|
- keysets.extend_from_slice(&auth_keysets_response.keysets);
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ let keysets = match client.get_mint_keysets().await {
|
|
|
|
|
+ Ok(response) => response.keysets,
|
|
|
|
|
+ Err(e) => {
|
|
|
|
|
+ tracing::error!("Failed to fetch keysets for {}: {}", mint_url, e);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ };
|
|
|
|
|
|
|
|
- // Build new cache
|
|
|
|
|
- let mut new_cache = MintKeyCache::empty();
|
|
|
|
|
|
|
+ tracing::debug!("Fetched {} keysets for {}", keysets.len(), mint_url);
|
|
|
|
|
|
|
|
|
|
+ // Fetch keys for each keyset
|
|
|
for keyset_info in keysets {
|
|
for keyset_info in keysets {
|
|
|
- let arc_keyset = Arc::new(keyset_info.clone());
|
|
|
|
|
|
|
+ let keyset_arc = Arc::new(keyset_info.clone());
|
|
|
new_cache
|
|
new_cache
|
|
|
.keysets_by_id
|
|
.keysets_by_id
|
|
|
- .insert(keyset_info.id, arc_keyset.clone());
|
|
|
|
|
|
|
+ .insert(keyset_info.id, keyset_arc.clone());
|
|
|
|
|
|
|
|
if keyset_info.active {
|
|
if keyset_info.active {
|
|
|
- new_cache.active_keysets.push(arc_keyset);
|
|
|
|
|
|
|
+ new_cache.active_keysets.push(keyset_arc);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Load keyset keys (try database first, then HTTP)
|
|
|
|
|
- if let Ok(keyset) = load_keyset_from_db_or_http(mint_url, client, storages, &keyset_info.id)
|
|
|
|
|
- .await
|
|
|
|
|
- .inspect_err(|e| {
|
|
|
|
|
- tracing::warn!(
|
|
|
|
|
- "Failed to load keyset {} for {}: {}",
|
|
|
|
|
- keyset_info.id,
|
|
|
|
|
- mint_url,
|
|
|
|
|
- e
|
|
|
|
|
- )
|
|
|
|
|
- })
|
|
|
|
|
|
|
+ // Load keys (from DB or HTTP)
|
|
|
|
|
+ if let Ok(keyset) = load_keyset_from_db_or_http(mint_url, client, storage, &keyset_info.id).await
|
|
|
{
|
|
{
|
|
|
- new_cache
|
|
|
|
|
- .keys_by_id
|
|
|
|
|
- .insert(keyset_info.id, Arc::new(keyset.keys));
|
|
|
|
|
|
|
+ new_cache.keys_by_id.insert(keyset_info.id, Arc::new(keyset.keys));
|
|
|
|
|
+ } else {
|
|
|
|
|
+ tracing::warn!(
|
|
|
|
|
+ "Failed to load keys for keyset {} for {}",
|
|
|
|
|
+ keyset_info.id,
|
|
|
|
|
+ mint_url
|
|
|
|
|
+ );
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Finalize cache
|
|
|
|
|
- let old_generation = cache.load().refresh_version;
|
|
|
|
|
- new_cache.mint_info = Some(mint_info);
|
|
|
|
|
- finalize_cache(&mut new_cache, old_generation);
|
|
|
|
|
|
|
+ // Update cache atomically
|
|
|
|
|
+ let old_version = cache.load().refresh_version;
|
|
|
|
|
+ new_cache.is_ready = true;
|
|
|
|
|
+ new_cache.last_refresh = std::time::Instant::now();
|
|
|
|
|
+ new_cache.refresh_version = old_version + 1;
|
|
|
|
|
|
|
|
- tracing::debug!(
|
|
|
|
|
- "Refreshed {} keysets and {} keys for {} (generation {})",
|
|
|
|
|
- new_cache.keysets_by_id.len(),
|
|
|
|
|
- new_cache.keys_by_id.len(),
|
|
|
|
|
|
|
+ tracing::info!(
|
|
|
|
|
+ "Updating cache for {} with {} keysets (version {})",
|
|
|
mint_url,
|
|
mint_url,
|
|
|
|
|
+ new_cache.keysets_by_id.len(),
|
|
|
new_cache.refresh_version
|
|
new_cache.refresh_version
|
|
|
);
|
|
);
|
|
|
|
|
|
|
|
- // Store updated cache
|
|
|
|
|
let cache_arc = Arc::new(new_cache);
|
|
let cache_arc = Arc::new(new_cache);
|
|
|
cache.store(cache_arc.clone());
|
|
cache.store(cache_arc.clone());
|
|
|
|
|
|
|
|
- // Persist to all storages
|
|
|
|
|
- write_cache_to_all_storages(mint_url, storages, cache_arc).await;
|
|
|
|
|
-
|
|
|
|
|
- Ok(())
|
|
|
|
|
|
|
+ // Persist to storage
|
|
|
|
|
+ write_cache_to_storage(mint_url, storage, cache).await;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-/// Spawn task to refresh mint data (database-first, then HTTP)
|
|
|
|
|
|
|
+/// Execute a single refresh task
|
|
|
///
|
|
///
|
|
|
-/// First attempts to load from storage. If that fails or cache is still not ready,
|
|
|
|
|
-/// fetches from HTTP with a 60s timeout. All operations run in spawned task.
|
|
|
|
|
-pub(super) fn refresh_mint_task(
|
|
|
|
|
|
|
+/// Calls fetch_mint_data_from_http and handles any errors
|
|
|
|
|
+async fn refresh_mint_task(
|
|
|
mint_url: MintUrl,
|
|
mint_url: MintUrl,
|
|
|
client: Arc<dyn MintConnector + Send + Sync>,
|
|
client: Arc<dyn MintConnector + Send + Sync>,
|
|
|
#[cfg(feature = "auth")] auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
|
|
#[cfg(feature = "auth")] auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
|
|
|
- storages: StorageList,
|
|
|
|
|
|
|
+ storage: Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
|
|
|
cache: Arc<ArcSwap<MintKeyCache>>,
|
|
cache: Arc<ArcSwap<MintKeyCache>>,
|
|
|
) {
|
|
) {
|
|
|
- spawn(async move {
|
|
|
|
|
- // Try loading from storage first
|
|
|
|
|
- try_load_cache_from_storages(&mint_url, &storages, &cache).await;
|
|
|
|
|
-
|
|
|
|
|
- // If still not ready, fetch from HTTP
|
|
|
|
|
- let result = tokio::time::timeout(
|
|
|
|
|
- Duration::from_secs(60),
|
|
|
|
|
- fetch_mint_data_from_http(
|
|
|
|
|
- &mint_url,
|
|
|
|
|
- &client,
|
|
|
|
|
- #[cfg(feature = "auth")]
|
|
|
|
|
- &auth_client,
|
|
|
|
|
- &storages,
|
|
|
|
|
- &cache,
|
|
|
|
|
- ),
|
|
|
|
|
- )
|
|
|
|
|
- .await;
|
|
|
|
|
-
|
|
|
|
|
- // Log result
|
|
|
|
|
- let _ = result
|
|
|
|
|
- .map_err(|_| Error::Timeout)
|
|
|
|
|
- .and_then(|r| r)
|
|
|
|
|
- .inspect(|_| {
|
|
|
|
|
- tracing::debug!("Successfully refreshed keys for {}", mint_url);
|
|
|
|
|
- })
|
|
|
|
|
- .inspect_err(|e| match e {
|
|
|
|
|
- Error::Timeout => {
|
|
|
|
|
- tracing::error!("Timeout fetching keys from mint server for {}", mint_url)
|
|
|
|
|
- }
|
|
|
|
|
- _ => {
|
|
|
|
|
- tracing::error!("Failed to refresh keys for {}: {}", mint_url, e)
|
|
|
|
|
- }
|
|
|
|
|
- });
|
|
|
|
|
- });
|
|
|
|
|
|
|
+ fetch_mint_data_from_http(
|
|
|
|
|
+ &mint_url,
|
|
|
|
|
+ &client,
|
|
|
|
|
+ #[cfg(feature = "auth")]
|
|
|
|
|
+ &auth_client,
|
|
|
|
|
+ &storage,
|
|
|
|
|
+ &cache,
|
|
|
|
|
+ )
|
|
|
|
|
+ .await;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-/// Background refresh loop with message handling
|
|
|
|
|
|
|
+/// Background refresh loop for a single mint
|
|
|
///
|
|
///
|
|
|
-/// Runs independently and handles refresh messages and periodic updates.
|
|
|
|
|
-/// Uses a simple sleep-based refresh interval since each worker handles only one URL.
|
|
|
|
|
|
|
+/// Listens for messages and periodically refreshes mint data.
|
|
|
|
|
+/// Runs until a Stop message is received.
|
|
|
pub(super) async fn refresh_loop(
|
|
pub(super) async fn refresh_loop(
|
|
|
mint_url: MintUrl,
|
|
mint_url: MintUrl,
|
|
|
client: Arc<dyn MintConnector + Send + Sync>,
|
|
client: Arc<dyn MintConnector + Send + Sync>,
|
|
|
#[cfg(feature = "auth")] auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
|
|
#[cfg(feature = "auth")] auth_client: Arc<dyn AuthMintConnector + Send + Sync>,
|
|
|
- storages: StorageList,
|
|
|
|
|
|
|
+ storage: Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
|
|
|
cache: Arc<ArcSwap<MintKeyCache>>,
|
|
cache: Arc<ArcSwap<MintKeyCache>>,
|
|
|
mut rx: tokio::sync::mpsc::Receiver<MessageToWorker>,
|
|
mut rx: tokio::sync::mpsc::Receiver<MessageToWorker>,
|
|
|
refresh_interval: Duration,
|
|
refresh_interval: Duration,
|
|
|
) {
|
|
) {
|
|
|
- // Perform initial refresh
|
|
|
|
|
|
|
+ tracing::debug!("Starting refresh loop for {} (interval: {:?})", mint_url, refresh_interval);
|
|
|
|
|
+
|
|
|
|
|
+ // Try to load from storage first
|
|
|
|
|
+ try_load_cache_from_storage(&mint_url, &storage, &cache).await;
|
|
|
|
|
+
|
|
|
|
|
+ // Perform initial refresh from HTTP
|
|
|
|
|
+ tracing::debug!("Performing initial HTTP refresh for {}", mint_url);
|
|
|
refresh_mint_task(
|
|
refresh_mint_task(
|
|
|
mint_url.clone(),
|
|
mint_url.clone(),
|
|
|
client.clone(),
|
|
client.clone(),
|
|
|
#[cfg(feature = "auth")]
|
|
#[cfg(feature = "auth")]
|
|
|
auth_client.clone(),
|
|
auth_client.clone(),
|
|
|
- storages.clone(),
|
|
|
|
|
|
|
+ storage.clone(),
|
|
|
cache.clone(),
|
|
cache.clone(),
|
|
|
- );
|
|
|
|
|
-
|
|
|
|
|
- println!("begin refresh_loop");
|
|
|
|
|
|
|
+ )
|
|
|
|
|
+ .await;
|
|
|
|
|
|
|
|
|
|
+ // Main event loop
|
|
|
loop {
|
|
loop {
|
|
|
tokio::select! {
|
|
tokio::select! {
|
|
|
Some(msg) = rx.recv() => {
|
|
Some(msg) = rx.recv() => {
|
|
@@ -493,39 +375,19 @@ pub(super) async fn refresh_loop(
|
|
|
tracing::debug!("Stopping refresh loop for {}", mint_url);
|
|
tracing::debug!("Stopping refresh loop for {}", mint_url);
|
|
|
break;
|
|
break;
|
|
|
}
|
|
}
|
|
|
- MessageToWorker::SyncDb(db) => {
|
|
|
|
|
- // Wait for cache to be ready, then sync this storage
|
|
|
|
|
- let cache_clone = cache.clone();
|
|
|
|
|
- let mint_clone = mint_url.clone();
|
|
|
|
|
- spawn(async move {
|
|
|
|
|
- loop {
|
|
|
|
|
- let current_cache = cache_clone.load();
|
|
|
|
|
- if current_cache.is_ready {
|
|
|
|
|
- write_cache_to_storage(
|
|
|
|
|
- db,
|
|
|
|
|
- mint_clone,
|
|
|
|
|
- current_cache.clone()
|
|
|
|
|
- ).await;
|
|
|
|
|
- break;
|
|
|
|
|
- }
|
|
|
|
|
- sleep(Duration::from_millis(100)).await;
|
|
|
|
|
- }
|
|
|
|
|
- });
|
|
|
|
|
- }
|
|
|
|
|
MessageToWorker::FetchMint => {
|
|
MessageToWorker::FetchMint => {
|
|
|
- tracing::debug!("FetchMint message received for {}", mint_url);
|
|
|
|
|
|
|
+ tracing::debug!("Manual refresh triggered for {}", mint_url);
|
|
|
refresh_mint_task(
|
|
refresh_mint_task(
|
|
|
mint_url.clone(),
|
|
mint_url.clone(),
|
|
|
client.clone(),
|
|
client.clone(),
|
|
|
#[cfg(feature = "auth")]
|
|
#[cfg(feature = "auth")]
|
|
|
auth_client.clone(),
|
|
auth_client.clone(),
|
|
|
- storages.clone(),
|
|
|
|
|
|
|
+ storage.clone(),
|
|
|
cache.clone(),
|
|
cache.clone(),
|
|
|
- );
|
|
|
|
|
|
|
+ ).await;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
_ = sleep(refresh_interval) => {
|
|
_ = sleep(refresh_interval) => {
|
|
|
tracing::debug!("Time to refresh mint: {}", mint_url);
|
|
tracing::debug!("Time to refresh mint: {}", mint_url);
|
|
|
refresh_mint_task(
|
|
refresh_mint_task(
|
|
@@ -533,15 +395,12 @@ pub(super) async fn refresh_loop(
|
|
|
client.clone(),
|
|
client.clone(),
|
|
|
#[cfg(feature = "auth")]
|
|
#[cfg(feature = "auth")]
|
|
|
auth_client.clone(),
|
|
auth_client.clone(),
|
|
|
- storages.clone(),
|
|
|
|
|
|
|
+ storage.clone(),
|
|
|
cache.clone(),
|
|
cache.clone(),
|
|
|
- );
|
|
|
|
|
- } else => {
|
|
|
|
|
- break;
|
|
|
|
|
|
|
+ ).await;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- tracing::debug!("Refresh loop stopped for {}", mint_url);
|
|
|
|
|
- println!("end refresh_loop");
|
|
|
|
|
|
|
+ tracing::debug!("Refresh loop ended for {}", mint_url);
|
|
|
}
|
|
}
|