|
|
@@ -27,8 +27,9 @@ use std::time::{Duration, Instant};
|
|
|
use arc_swap::ArcSwap;
|
|
|
use cdk_common::database::{self, WalletDatabase};
|
|
|
use cdk_common::mint_url::MintUrl;
|
|
|
-use cdk_common::nuts::{CurrencyUnit, KeySetInfo, Keys};
|
|
|
+use cdk_common::nuts::{KeySetInfo, Keys};
|
|
|
use cdk_common::parking_lot::{Mutex as ParkingLotMutex, RwLock as ParkingLotRwLock};
|
|
|
+use cdk_common::task::spawn;
|
|
|
use cdk_common::util::unix_time;
|
|
|
use cdk_common::{KeySet, MintInfo};
|
|
|
use tokio::sync::{mpsc, Semaphore};
|
|
|
@@ -49,6 +50,9 @@ const DEFAULT_REFRESH_INTERVAL: Duration = Duration::from_secs(300);
|
|
|
/// Maximum concurrent HTTP requests to mint servers
|
|
|
const MAX_CONCURRENT_HTTP_REQUESTS: usize = 5;
|
|
|
|
|
|
+const MAX_RETRY: usize = 50;
|
|
|
+const RETRY_SLEEP: Duration = Duration::from_millis(100);
|
|
|
+
|
|
|
/// Manages refresh scheduling for mints
|
|
|
///
|
|
|
/// Tracks when each mint should be refreshed next. Uses BTreeMap for efficient
|
|
|
@@ -116,6 +120,11 @@ pub enum RefreshMessage {
|
|
|
/// Stop the refresh task
|
|
|
Stop,
|
|
|
|
|
|
+ /// Make sure the mint is loaded, from either any localstore or remotely.
|
|
|
+ ///
|
|
|
+ /// This function also make sure all stores have a copy of the mint info and keys
|
|
|
+ SyncMint(MintUrl),
|
|
|
+
|
|
|
/// Fetch keys for a specific mint immediately
|
|
|
FetchMint(MintUrl),
|
|
|
}
|
|
|
@@ -126,6 +135,9 @@ pub enum RefreshMessage {
|
|
|
/// The `refresh_version` increments on each update to detect when cache has changed.
|
|
|
#[derive(Clone, Debug)]
|
|
|
struct MintKeyCache {
|
|
|
+ /// If the cache is ready
|
|
|
+ is_ready: bool,
|
|
|
+
|
|
|
/// Mint info from server
|
|
|
mint_info: Option<MintInfo>,
|
|
|
|
|
|
@@ -148,6 +160,7 @@ struct MintKeyCache {
|
|
|
impl MintKeyCache {
|
|
|
fn empty() -> Self {
|
|
|
Self {
|
|
|
+ is_ready: false,
|
|
|
mint_info: None,
|
|
|
keysets_by_id: HashMap::new(),
|
|
|
active_keysets: Vec::new(),
|
|
|
@@ -181,9 +194,6 @@ struct MintRegistration {
|
|
|
/// Mint URL
|
|
|
mint_url: MintUrl,
|
|
|
|
|
|
- /// Currency unit
|
|
|
- unit: CurrencyUnit,
|
|
|
-
|
|
|
/// External resources (storage + client)
|
|
|
resources: Arc<ParkingLotRwLock<HashMap<usize, MintResources>>>,
|
|
|
|
|
|
@@ -195,7 +205,6 @@ impl std::fmt::Debug for MintRegistration {
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
f.debug_struct("MintRegistration")
|
|
|
.field("mint_url", &self.mint_url)
|
|
|
- .field("unit", &self.unit)
|
|
|
.field("resources", &"<MintResources>")
|
|
|
.field("cache", &self.cache)
|
|
|
.finish()
|
|
|
@@ -288,7 +297,7 @@ impl KeyManager {
|
|
|
|
|
|
let mints = manager.mints.clone();
|
|
|
let refresh_interval = manager.refresh_interval;
|
|
|
- let task = tokio::spawn(async move {
|
|
|
+ let task = spawn(async move {
|
|
|
Self::refresh_loop(rx, mints, refresh_interval, refresh_semaphore).await;
|
|
|
});
|
|
|
|
|
|
@@ -315,11 +324,10 @@ impl KeyManager {
|
|
|
pub fn register_mint(
|
|
|
&self,
|
|
|
mint_url: MintUrl,
|
|
|
- unit: CurrencyUnit,
|
|
|
storage: Arc<dyn WalletDatabase<Err = database::Error> + Send + Sync>,
|
|
|
client: Arc<dyn MintConnector + Send + Sync>,
|
|
|
) -> Arc<KeySubscription> {
|
|
|
- debug!("Registering mint: {} ({})", mint_url, unit);
|
|
|
+ debug!("Registering mint: {}", mint_url);
|
|
|
let mut mints = self.mints.write();
|
|
|
|
|
|
let mint = mints.entry(mint_url.clone()).or_insert_with(|| {
|
|
|
@@ -327,7 +335,6 @@ impl KeyManager {
|
|
|
|
|
|
MintRegistration {
|
|
|
mint_url: mint_url.clone(),
|
|
|
- unit,
|
|
|
resources: Arc::new(ParkingLotRwLock::new(HashMap::new())),
|
|
|
cache,
|
|
|
}
|
|
|
@@ -337,12 +344,6 @@ impl KeyManager {
|
|
|
.counter
|
|
|
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
|
|
|
|
|
|
- tokio::spawn(Self::persist_cache_db(
|
|
|
- storage.clone(),
|
|
|
- mint_url.clone(),
|
|
|
- mint.cache.load().clone(),
|
|
|
- ));
|
|
|
-
|
|
|
#[cfg(feature = "auth")]
|
|
|
let mint_resource = MintResources {
|
|
|
storage,
|
|
|
@@ -359,7 +360,7 @@ impl KeyManager {
|
|
|
|
|
|
debug!("Mint registered: {}", mint_url);
|
|
|
|
|
|
- self.send_message(RefreshMessage::FetchMint(mint_url.clone()));
|
|
|
+ self.send_message(RefreshMessage::SyncMint(mint_url.clone()));
|
|
|
|
|
|
Arc::new(KeySubscription {
|
|
|
mints: self.mints.clone(),
|
|
|
@@ -390,124 +391,177 @@ impl KeyManager {
|
|
|
/// Returns keys from cache if available. If not cached, triggers a refresh
|
|
|
/// and waits up to 2 seconds for the keys to arrive.
|
|
|
pub async fn get_keys(&self, mint_url: &MintUrl, keyset_id: &Id) -> Result<Arc<Keys>, Error> {
|
|
|
- if let Ok(keys) = self.get_keys_sync(mint_url, keyset_id) {
|
|
|
- return Ok(keys.clone());
|
|
|
- }
|
|
|
-
|
|
|
- tracing::debug!(
|
|
|
- "Keyset {} not in cache, triggering refresh for mint {}",
|
|
|
- keyset_id,
|
|
|
- mint_url
|
|
|
- );
|
|
|
-
|
|
|
- self.refresh_now(mint_url);
|
|
|
+ let shared_cache = {
|
|
|
+ let mints = self.mints.read();
|
|
|
+ let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
|
|
|
+ registration.cache.clone()
|
|
|
+ };
|
|
|
|
|
|
- for _ in 0..20 {
|
|
|
- if let Ok(keys) = self.get_keys_sync(mint_url, keyset_id) {
|
|
|
- return Ok(keys.clone());
|
|
|
+ for _ in 0..MAX_RETRY {
|
|
|
+ let cache = shared_cache.load();
|
|
|
+ if cache.is_ready {
|
|
|
+ return cache
|
|
|
+ .keys_by_id
|
|
|
+ .get(keyset_id)
|
|
|
+ .cloned()
|
|
|
+ .ok_or(Error::UnknownKeySet);
|
|
|
}
|
|
|
- tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
|
|
+ self.send_message(RefreshMessage::SyncMint(mint_url.to_owned()));
|
|
|
+ tokio::time::sleep(RETRY_SLEEP).await;
|
|
|
}
|
|
|
|
|
|
Err(Error::UnknownKeySet)
|
|
|
}
|
|
|
|
|
|
- /// Get keys for a keyset (cache-only, no refresh)
|
|
|
- ///
|
|
|
- /// Returns keys immediately from cache. Returns error if not cached.
|
|
|
- /// Does not trigger any background fetches.
|
|
|
- pub fn get_keys_sync(&self, mint_url: &MintUrl, keyset_id: &Id) -> Result<Arc<Keys>, Error> {
|
|
|
- let mints = self.mints.read();
|
|
|
- let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
|
|
|
- let cache = registration.cache.load();
|
|
|
- cache
|
|
|
- .keys_by_id
|
|
|
- .get(keyset_id)
|
|
|
- .cloned()
|
|
|
- .ok_or(Error::UnknownKeySet)
|
|
|
- }
|
|
|
-
|
|
|
/// Get keyset info by ID (cache-first with automatic refresh)
|
|
|
pub async fn get_keyset_by_id(
|
|
|
&self,
|
|
|
mint_url: &MintUrl,
|
|
|
keyset_id: &Id,
|
|
|
) -> Result<Arc<KeySetInfo>, Error> {
|
|
|
- if let Ok(keysets) = self.get_keyset_by_id_sync(mint_url, keyset_id) {
|
|
|
- return Ok(keysets);
|
|
|
+ let shared_cache = {
|
|
|
+ let mints = self.mints.read();
|
|
|
+ let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
|
|
|
+ registration.cache.clone()
|
|
|
+ };
|
|
|
+
|
|
|
+ for _ in 0..MAX_RETRY {
|
|
|
+ let cache = shared_cache.load();
|
|
|
+ if cache.is_ready {
|
|
|
+ return cache
|
|
|
+ .keysets_by_id
|
|
|
+ .get(keyset_id)
|
|
|
+ .cloned()
|
|
|
+ .ok_or(Error::UnknownKeySet);
|
|
|
+ }
|
|
|
+ self.send_message(RefreshMessage::SyncMint(mint_url.to_owned()));
|
|
|
+ tokio::time::sleep(RETRY_SLEEP).await;
|
|
|
}
|
|
|
|
|
|
- self.refresh_now(mint_url);
|
|
|
+ Err(Error::UnknownKeySet)
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Get all keysets for a mint (cache-first with automatic refresh)
|
|
|
+ pub async fn get_keysets(&self, mint_url: &MintUrl) -> Result<Vec<KeySetInfo>, Error> {
|
|
|
+ let shared_cache = {
|
|
|
+ let mints = self.mints.read();
|
|
|
+ let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
|
|
|
+ registration.cache.clone()
|
|
|
+ };
|
|
|
|
|
|
- for _ in 0..20 {
|
|
|
- if let Ok(keysets) = self.get_keyset_by_id_sync(mint_url, keyset_id) {
|
|
|
- return Ok(keysets);
|
|
|
+ for _ in 0..MAX_RETRY {
|
|
|
+ let cache = shared_cache.load();
|
|
|
+ if cache.is_ready {
|
|
|
+ let keysets: Vec<KeySetInfo> = cache
|
|
|
+ .keysets_by_id
|
|
|
+ .values()
|
|
|
+ .map(|ks| (**ks).clone())
|
|
|
+ .collect();
|
|
|
+ return if keysets.is_empty() {
|
|
|
+ Err(Error::UnknownKeySet)
|
|
|
+ } else {
|
|
|
+ Ok(keysets)
|
|
|
+ };
|
|
|
}
|
|
|
- tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
|
|
+
|
|
|
+ self.send_message(RefreshMessage::SyncMint(mint_url.to_owned()));
|
|
|
+ tokio::time::sleep(RETRY_SLEEP).await;
|
|
|
}
|
|
|
|
|
|
Err(Error::UnknownKeySet)
|
|
|
}
|
|
|
|
|
|
- /// Get keyset info by ID (cache-only, no refresh)
|
|
|
- pub fn get_keyset_by_id_sync(
|
|
|
+ /// Get all active keysets for a mint (cache-only, no refresh)
|
|
|
+ pub async fn get_active_keysets(
|
|
|
&self,
|
|
|
mint_url: &MintUrl,
|
|
|
- keyset_id: &Id,
|
|
|
- ) -> Result<Arc<KeySetInfo>, Error> {
|
|
|
- let mints = self.mints.read();
|
|
|
- let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
|
|
|
- let cache = registration.cache.load();
|
|
|
- cache
|
|
|
- .keysets_by_id
|
|
|
- .get(keyset_id)
|
|
|
- .cloned()
|
|
|
- .ok_or(Error::UnknownKeySet)
|
|
|
- }
|
|
|
+ ) -> Result<Vec<Arc<KeySetInfo>>, Error> {
|
|
|
+ let shared_cache = {
|
|
|
+ let mints = self.mints.read();
|
|
|
+ let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
|
|
|
+ registration.cache.clone()
|
|
|
+ };
|
|
|
|
|
|
- /// Get all keysets for a mint (cache-first with automatic refresh)
|
|
|
- pub async fn get_keysets(&self, mint_url: &MintUrl) -> Result<Vec<KeySetInfo>, Error> {
|
|
|
- if let Ok(keysets) = self.get_keysets_sync(mint_url) {
|
|
|
- return Ok(keysets);
|
|
|
+ for _ in 0..MAX_RETRY {
|
|
|
+ let cache = shared_cache.load();
|
|
|
+ if cache.is_ready {
|
|
|
+ return Ok(cache.active_keysets.clone());
|
|
|
+ }
|
|
|
+ self.send_message(RefreshMessage::SyncMint(mint_url.to_owned()));
|
|
|
+ tokio::time::sleep(RETRY_SLEEP).await;
|
|
|
}
|
|
|
|
|
|
- self.refresh_now(mint_url);
|
|
|
+ Err(Error::UnknownKeySet)
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Load a specific keyset from database or HTTP
|
|
|
+ ///
|
|
|
+ /// First checks all registered databases for the keyset. If not found,
|
|
|
+ /// fetches from the mint server via HTTP and persists to all databases.
|
|
|
+ async fn load_keyset_from_db_or_http(
|
|
|
+ registration: &MintRegistration,
|
|
|
+ keyset_id: &Id,
|
|
|
+ ) -> Result<KeySet, Error> {
|
|
|
+ let storages = registration
|
|
|
+ .resources
|
|
|
+ .read()
|
|
|
+ .values()
|
|
|
+ .map(|resource| resource.storage.clone())
|
|
|
+ .collect::<Vec<_>>();
|
|
|
+
|
|
|
+ // Try database first
|
|
|
+ for storage in &storages {
|
|
|
+ if let Some(keys) = storage.get_keys(keyset_id).await? {
|
|
|
+ debug!(
|
|
|
+ "Loaded keyset {} from database for {}",
|
|
|
+ keyset_id, registration.mint_url
|
|
|
+ );
|
|
|
|
|
|
- for _ in 0..20 {
|
|
|
- if let Ok(keysets) = self.get_keysets_sync(mint_url) {
|
|
|
- return Ok(keysets);
|
|
|
+ // Get keyset info to construct KeySet
|
|
|
+ if let Some(keyset_info) = storage.get_keyset_by_id(keyset_id).await? {
|
|
|
+ return Ok(KeySet {
|
|
|
+ id: *keyset_id,
|
|
|
+ unit: keyset_info.unit,
|
|
|
+ final_expiry: keyset_info.final_expiry,
|
|
|
+ keys,
|
|
|
+ });
|
|
|
+ }
|
|
|
}
|
|
|
- tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
|
|
}
|
|
|
|
|
|
- Err(Error::UnknownKeySet)
|
|
|
- }
|
|
|
+ // Not in database, fetch from HTTP
|
|
|
+ debug!(
|
|
|
+ "Keyset {} not in database, fetching from mint server for {}",
|
|
|
+ keyset_id, registration.mint_url
|
|
|
+ );
|
|
|
|
|
|
- /// Get all keysets for a mint (cache-only, no refresh)
|
|
|
- pub fn get_keysets_sync(&self, mint_url: &MintUrl) -> Result<Vec<KeySetInfo>, Error> {
|
|
|
- let mints = self.mints.read();
|
|
|
- let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
|
|
|
- let cache = registration.cache.load();
|
|
|
- let keysets: Vec<KeySetInfo> = cache
|
|
|
- .keysets_by_id
|
|
|
+ let http_client = registration
|
|
|
+ .resources
|
|
|
+ .read()
|
|
|
.values()
|
|
|
- .map(|ks| (**ks).clone())
|
|
|
- .collect();
|
|
|
- if keysets.is_empty() {
|
|
|
- Err(Error::UnknownKeySet)
|
|
|
- } else {
|
|
|
- Ok(keysets)
|
|
|
+ .next()
|
|
|
+ .ok_or(Error::IncorrectMint)?
|
|
|
+ .client
|
|
|
+ .clone();
|
|
|
+
|
|
|
+ let keyset = http_client.get_mint_keyset(*keyset_id).await?;
|
|
|
+
|
|
|
+ // Persist to all databases
|
|
|
+ for storage in &storages {
|
|
|
+ let _ = storage.add_keys(keyset.clone()).await.inspect_err(|e| {
|
|
|
+ warn!(
|
|
|
+ "Failed to persist keyset {} for {}: {}",
|
|
|
+ keyset_id, registration.mint_url, e
|
|
|
+ )
|
|
|
+ });
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- /// Get all active keysets for a mint (cache-only, no refresh)
|
|
|
- pub fn get_active_keysets(&self, mint_url: &MintUrl) -> Vec<Arc<KeySetInfo>> {
|
|
|
- let mints = self.mints.read();
|
|
|
- let Some(registration) = mints.get(mint_url) else {
|
|
|
- return Vec::new();
|
|
|
- };
|
|
|
- let cache = registration.cache.load();
|
|
|
- cache.active_keysets.clone()
|
|
|
+ debug!(
|
|
|
+ "Loaded keyset {} from HTTP for {}",
|
|
|
+ keyset_id, registration.mint_url
|
|
|
+ );
|
|
|
+
|
|
|
+ Ok(keyset)
|
|
|
}
|
|
|
|
|
|
/// Load mint info and keys from all registered databases
|
|
|
@@ -549,7 +603,7 @@ impl KeyManager {
|
|
|
.keysets_by_id
|
|
|
.insert(keyset.id, arc_keyset.clone());
|
|
|
|
|
|
- if keyset.active && keyset.unit == registration.unit {
|
|
|
+ if keyset.active {
|
|
|
storage_cache.active_keysets.push(arc_keyset);
|
|
|
}
|
|
|
}
|
|
|
@@ -566,7 +620,8 @@ impl KeyManager {
|
|
|
}
|
|
|
|
|
|
let keys_count = storage_cache.keys_by_id.len();
|
|
|
- storage_cache.refresh_version = storage_cache.refresh_version + 1;
|
|
|
+ storage_cache.refresh_version += 1;
|
|
|
+ storage_cache.is_ready = true;
|
|
|
|
|
|
let storage_cache = Arc::new(storage_cache);
|
|
|
registration.cache.store(storage_cache.clone());
|
|
|
@@ -590,6 +645,10 @@ impl KeyManager {
|
|
|
mint_url: MintUrl,
|
|
|
new_cache: Arc<MintKeyCache>,
|
|
|
) {
|
|
|
+ if !new_cache.is_ready {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
if new_cache.mint_info.is_some() {
|
|
|
let _ = storage
|
|
|
.add_mint(mint_url.clone(), new_cache.mint_info.clone())
|
|
|
@@ -651,7 +710,7 @@ impl KeyManager {
|
|
|
.collect::<Vec<_>>();
|
|
|
|
|
|
for storage in storages {
|
|
|
- tokio::spawn(Self::persist_cache_db(
|
|
|
+ spawn(Self::persist_cache_db(
|
|
|
storage,
|
|
|
registration.mint_url.clone(),
|
|
|
new_cache.clone(),
|
|
|
@@ -728,14 +787,19 @@ impl KeyManager {
|
|
|
.keysets_by_id
|
|
|
.insert(keyset_info.id, arc_keyset.clone());
|
|
|
|
|
|
- if keyset_info.active && keyset_info.unit == registration.unit {
|
|
|
+ if keyset_info.active {
|
|
|
new_cache.active_keysets.push(arc_keyset);
|
|
|
}
|
|
|
|
|
|
- if let Ok(keyset) = http_client
|
|
|
- .get_mint_keyset(keyset_info.id)
|
|
|
+ // Try to load keyset from database first, then HTTP
|
|
|
+ if let Ok(keyset) = Self::load_keyset_from_db_or_http(®istration, &keyset_info.id)
|
|
|
.await
|
|
|
- .inspect_err(|e| warn!("Failed to fetch keys for keyset {}: {}", keyset_info.id, e))
|
|
|
+ .inspect_err(|e| {
|
|
|
+ warn!(
|
|
|
+ "Failed to load keyset {} for {}: {}",
|
|
|
+ keyset_info.id, registration.mint_url, e
|
|
|
+ )
|
|
|
+ })
|
|
|
{
|
|
|
let keys = Arc::new(keyset.keys.clone());
|
|
|
new_cache.keys_by_id.insert(keyset_info.id, keys);
|
|
|
@@ -747,6 +811,7 @@ impl KeyManager {
|
|
|
let old_generation = registration.cache.load().refresh_version;
|
|
|
new_cache.mint_info = Some(mint_info);
|
|
|
new_cache.refresh_version = old_generation + 1;
|
|
|
+ new_cache.is_ready = true;
|
|
|
new_cache.last_refresh = Instant::now();
|
|
|
|
|
|
debug!(
|
|
|
@@ -764,17 +829,42 @@ impl KeyManager {
|
|
|
Ok::<(), Error>(())
|
|
|
}
|
|
|
|
|
|
+ fn sync_mint_task(registration: MintRegistration, refresh_scheduler: RefreshScheduler) {
|
|
|
+ spawn(async move {
|
|
|
+ if !registration.cache.load().is_ready {
|
|
|
+ let _ = Self::fetch_mint_info_and_keys_from_db(®istration)
|
|
|
+ .await
|
|
|
+ .inspect_err(|e| {
|
|
|
+ warn!(
|
|
|
+ "Failed to load keys from storage for {}: {}",
|
|
|
+ registration.mint_url, e
|
|
|
+ )
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ if !registration.cache.load().is_ready {
|
|
|
+ let mint_url = registration.mint_url.clone();
|
|
|
+ let _ = tokio::time::timeout(
|
|
|
+ Duration::from_secs(60),
|
|
|
+ Self::fetch_from_http(registration, refresh_scheduler),
|
|
|
+ )
|
|
|
+ .await
|
|
|
+ .inspect_err(|e| warn!("Failed to fetch keys for {} with error {}", mint_url, e));
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
/// Refresh keys from mint server
|
|
|
///
|
|
|
/// Spawns an async task with 60s timeout. HTTP requests are limited to
|
|
|
/// MAX_CONCURRENT_HTTP_REQUESTS concurrent requests via semaphore.
|
|
|
- fn refresh_mint_task(
|
|
|
+ fn fetch_and_sync_mint_task(
|
|
|
registration: MintRegistration,
|
|
|
semaphore: Arc<Semaphore>,
|
|
|
refresh_scheduler: RefreshScheduler,
|
|
|
) {
|
|
|
- tokio::spawn(async move {
|
|
|
- if registration.cache.load().keysets_by_id.is_empty() {
|
|
|
+ spawn(async move {
|
|
|
+ if !registration.cache.load().is_ready {
|
|
|
let _ = Self::fetch_mint_info_and_keys_from_db(®istration)
|
|
|
.await
|
|
|
.inspect_err(|e| {
|
|
|
@@ -802,9 +892,8 @@ impl KeyManager {
|
|
|
|
|
|
let mint_url = registration.mint_url.clone();
|
|
|
|
|
|
- let timeout = Duration::from_secs(60);
|
|
|
let result = tokio::time::timeout(
|
|
|
- timeout,
|
|
|
+ Duration::from_secs(60),
|
|
|
Self::fetch_from_http(registration, refresh_scheduler),
|
|
|
)
|
|
|
.await;
|
|
|
@@ -861,6 +950,19 @@ impl KeyManager {
|
|
|
debug!("Stopping refresh task");
|
|
|
break;
|
|
|
}
|
|
|
+ RefreshMessage::SyncMint(mint_url) => {
|
|
|
+ debug!("Sync all instances of {}", mint_url);
|
|
|
+ let registration = {
|
|
|
+ let mints_lock = mints.read();
|
|
|
+ mints_lock.get(&mint_url).cloned()
|
|
|
+ };
|
|
|
+
|
|
|
+ if let Some(reg) = registration {
|
|
|
+ Self::sync_mint_task(reg, refresh_scheduler.clone());
|
|
|
+ } else {
|
|
|
+ warn!("FetchMint: Mint not registered: {}", mint_url);
|
|
|
+ }
|
|
|
+ }
|
|
|
RefreshMessage::FetchMint(mint_url) => {
|
|
|
debug!("FetchMint message received for {}", mint_url);
|
|
|
let registration = {
|
|
|
@@ -869,7 +971,7 @@ impl KeyManager {
|
|
|
};
|
|
|
|
|
|
if let Some(reg) = registration {
|
|
|
- Self::refresh_mint_task(reg, semaphore.clone(), refresh_scheduler.clone());
|
|
|
+ Self::fetch_and_sync_mint_task(reg, semaphore.clone(), refresh_scheduler.clone());
|
|
|
} else {
|
|
|
warn!("FetchMint: Mint not registered: {}", mint_url);
|
|
|
}
|
|
|
@@ -893,7 +995,7 @@ impl KeyManager {
|
|
|
};
|
|
|
|
|
|
if let Some(reg) = registration {
|
|
|
- Self::refresh_mint_task(reg, semaphore.clone(), refresh_scheduler.clone());
|
|
|
+ Self::fetch_and_sync_mint_task(reg, semaphore.clone(), refresh_scheduler.clone());
|
|
|
} else {
|
|
|
warn!("Mint no longer registered: {}", mint_url);
|
|
|
}
|
|
|
@@ -910,20 +1012,19 @@ impl KeyManager {
|
|
|
/// Sends a refresh message to the background task and waits up to 2 seconds
|
|
|
/// for the cache to be updated with a newer version.
|
|
|
pub async fn refresh(&self, mint_url: &MintUrl) -> Result<Vec<KeySetInfo>, Error> {
|
|
|
- let last_version = {
|
|
|
+ let shared_cache = {
|
|
|
let mints = self.mints.read();
|
|
|
let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
|
|
|
- let cache = registration.cache.load();
|
|
|
- cache.refresh_version
|
|
|
+ registration.cache.clone()
|
|
|
};
|
|
|
|
|
|
+ let last_version = shared_cache.load().refresh_version;
|
|
|
+
|
|
|
self.send_message(RefreshMessage::FetchMint(mint_url.clone()));
|
|
|
|
|
|
- for _ in 0..200 {
|
|
|
+ for _ in 0..MAX_RETRY {
|
|
|
if let Some(keysets) = {
|
|
|
- let mints = self.mints.read();
|
|
|
- let registration = mints.get(mint_url).ok_or(Error::IncorrectMint)?;
|
|
|
- let cache = registration.cache.load();
|
|
|
+ let cache = shared_cache.load();
|
|
|
if last_version > 0 || cache.refresh_version > 0 {
|
|
|
Some(
|
|
|
cache
|
|
|
@@ -939,7 +1040,7 @@ impl KeyManager {
|
|
|
return Ok(keysets);
|
|
|
}
|
|
|
|
|
|
- tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
|
|
+ tokio::time::sleep(RETRY_SLEEP).await;
|
|
|
}
|
|
|
|
|
|
Err(Error::UnknownKeySet)
|
|
|
@@ -949,38 +1050,6 @@ impl KeyManager {
|
|
|
pub fn refresh_now(&self, mint_url: &MintUrl) {
|
|
|
self.send_message(RefreshMessage::FetchMint(mint_url.clone()));
|
|
|
}
|
|
|
-
|
|
|
- /// Get number of cached keys for a mint
|
|
|
- pub fn cached_keys_count(&self, mint_url: &MintUrl) -> usize {
|
|
|
- let mints = self.mints.read();
|
|
|
- let Some(registration) = mints.get(mint_url) else {
|
|
|
- return 0;
|
|
|
- };
|
|
|
- registration.cache.load().keys_by_id.len()
|
|
|
- }
|
|
|
-
|
|
|
- /// Get number of cached keysets for a mint
|
|
|
- pub fn cached_keysets_count(&self, mint_url: &MintUrl) -> usize {
|
|
|
- let mints = self.mints.read();
|
|
|
- let Some(registration) = mints.get(mint_url) else {
|
|
|
- return 0;
|
|
|
- };
|
|
|
- registration.cache.load().keysets_by_id.len()
|
|
|
- }
|
|
|
-
|
|
|
- /// Get number of available HTTP request slots
|
|
|
- pub fn available_http_slots(&self) -> usize {
|
|
|
- self.refresh_semaphore.available_permits()
|
|
|
- }
|
|
|
-
|
|
|
- /// Invalidate cache for a specific mint
|
|
|
- pub fn invalidate(&self, mint_url: &MintUrl) {
|
|
|
- let mints = self.mints.read();
|
|
|
- if let Some(registration) = mints.get(mint_url) {
|
|
|
- registration.cache.store(Arc::new(MintKeyCache::empty()));
|
|
|
- debug!("Cache invalidated for {}", mint_url);
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
impl Drop for KeyManager {
|