|
@@ -8,6 +8,7 @@
|
|
use std::path::PathBuf;
|
|
use std::path::PathBuf;
|
|
use std::pin::Pin;
|
|
use std::pin::Pin;
|
|
use std::str::FromStr;
|
|
use std::str::FromStr;
|
|
|
|
+use std::sync::atomic::{AtomicBool, Ordering};
|
|
use std::sync::Arc;
|
|
use std::sync::Arc;
|
|
|
|
|
|
use anyhow::anyhow;
|
|
use anyhow::anyhow;
|
|
@@ -30,6 +31,7 @@ use fedimint_tonic_lnd::lnrpc::FeeLimit;
|
|
use fedimint_tonic_lnd::Client;
|
|
use fedimint_tonic_lnd::Client;
|
|
use futures::{Stream, StreamExt};
|
|
use futures::{Stream, StreamExt};
|
|
use tokio::sync::Mutex;
|
|
use tokio::sync::Mutex;
|
|
|
|
+use tokio_util::sync::CancellationToken;
|
|
|
|
|
|
pub mod error;
|
|
pub mod error;
|
|
|
|
|
|
@@ -43,6 +45,8 @@ pub struct Lnd {
|
|
fee_reserve: FeeReserve,
|
|
fee_reserve: FeeReserve,
|
|
mint_settings: MintMethodSettings,
|
|
mint_settings: MintMethodSettings,
|
|
melt_settings: MeltMethodSettings,
|
|
melt_settings: MeltMethodSettings,
|
|
|
|
+ wait_invoice_cancel_token: CancellationToken,
|
|
|
|
+ wait_invoice_is_active: Arc<AtomicBool>,
|
|
}
|
|
}
|
|
|
|
|
|
impl Lnd {
|
|
impl Lnd {
|
|
@@ -70,6 +74,8 @@ impl Lnd {
|
|
fee_reserve,
|
|
fee_reserve,
|
|
mint_settings,
|
|
mint_settings,
|
|
melt_settings,
|
|
melt_settings,
|
|
|
|
+ wait_invoice_cancel_token: CancellationToken::new(),
|
|
|
|
+ wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
|
|
})
|
|
})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -89,11 +95,11 @@ impl MintLightning for Lnd {
|
|
}
|
|
}
|
|
|
|
|
|
fn is_wait_invoice_active(&self) -> bool {
|
|
fn is_wait_invoice_active(&self) -> bool {
|
|
- todo!()
|
|
|
|
|
|
+ self.wait_invoice_is_active.load(Ordering::SeqCst)
|
|
}
|
|
}
|
|
|
|
|
|
fn cancel_wait_invoice(&self) {
|
|
fn cancel_wait_invoice(&self) {
|
|
- todo!()
|
|
|
|
|
|
+ self.wait_invoice_cancel_token.cancel()
|
|
}
|
|
}
|
|
|
|
|
|
async fn wait_any_invoice(
|
|
async fn wait_any_invoice(
|
|
@@ -116,19 +122,52 @@ impl MintLightning for Lnd {
|
|
.unwrap()
|
|
.unwrap()
|
|
.into_inner();
|
|
.into_inner();
|
|
|
|
|
|
- Ok(futures::stream::unfold(stream, |mut stream| async move {
|
|
|
|
- match stream.message().await {
|
|
|
|
- Ok(Some(msg)) => {
|
|
|
|
- if msg.state == 1 {
|
|
|
|
- Some((hex::encode(msg.r_hash), stream))
|
|
|
|
- } else {
|
|
|
|
|
|
+ let cancel_token = self.wait_invoice_cancel_token.clone();
|
|
|
|
+
|
|
|
|
+ Ok(futures::stream::unfold(
|
|
|
|
+ (
|
|
|
|
+ stream,
|
|
|
|
+ cancel_token,
|
|
|
|
+ Arc::clone(&self.wait_invoice_is_active),
|
|
|
|
+ ),
|
|
|
|
+ |(mut stream, cancel_token, is_active)| async move {
|
|
|
|
+ is_active.store(true, Ordering::SeqCst);
|
|
|
|
+
|
|
|
|
+ tokio::select! {
|
|
|
|
+ _ = cancel_token.cancelled() => {
|
|
|
|
+ // Stream is cancelled
|
|
|
|
+ is_active.store(false, Ordering::SeqCst);
|
|
|
|
+ tracing::info!("Waiting for lnd invoice ending");
|
|
|
|
+ return None;
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ msg = stream.message() => {
|
|
|
|
+
|
|
|
|
+ match msg {
|
|
|
|
+ Ok(Some(msg)) => {
|
|
|
|
+ if msg.state == 1 {
|
|
|
|
+ Some((hex::encode(msg.r_hash), (stream, cancel_token, is_active)))
|
|
|
|
+ } else {
|
|
|
|
+ None
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ Ok(None) => {
|
|
|
|
+ is_active.store(false, Ordering::SeqCst);
|
|
|
|
+ tracing::info!("LND invoice stream ended.");
|
|
None
|
|
None
|
|
|
|
+ }, // End of stream
|
|
|
|
+ Err(err) => {
|
|
|
|
+ is_active.store(false, Ordering::SeqCst);
|
|
|
|
+ tracing::warn!("Encounrdered error in LND invoice stream. Stream ending");
|
|
|
|
+ tracing::error!("{:?}", err);
|
|
|
|
+ None
|
|
|
|
+
|
|
|
|
+ }, // Handle errors gracefully, ends the stream on error
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- Ok(None) => None, // End of stream
|
|
|
|
- Err(_) => None, // Handle errors gracefully, ends the stream on error
|
|
|
|
- }
|
|
|
|
- })
|
|
|
|
|
|
+ },
|
|
|
|
+ )
|
|
.boxed())
|
|
.boxed())
|
|
}
|
|
}
|
|
|
|
|