| 
					
				 | 
			
			
				@@ -18,6 +18,7 @@ use async_trait::async_trait; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use cdk_common::amount::{to_unit, Amount, MSAT_IN_SAT}; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use cdk_common::bitcoin::hashes::Hash; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use cdk_common::common::FeeReserve; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+use cdk_common::database::mint::DynMintKVStore; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use cdk_common::nuts::{CurrencyUnit, MeltOptions, MeltQuoteState}; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use cdk_common::payment::{ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     self, Bolt11Settings, CreateIncomingPaymentResponse, Event, IncomingPaymentOptions, 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -42,6 +43,12 @@ pub(crate) use proto::{lnrpc, routerrpc}; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 use crate::lnrpc::invoice::InvoiceState; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+/// LND KV Store constants 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+const LND_KV_PRIMARY_NAMESPACE: &str = "cdk_lnd_lightning_backend"; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+const LND_KV_SECONDARY_NAMESPACE: &str = "payment_indices"; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+const LAST_ADD_INDEX_KV_KEY: &str = "last_add_index"; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+const LAST_SETTLE_INDEX_KV_KEY: &str = "last_settle_index"; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 /// Lnd mint backend 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 #[derive(Clone)] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 pub struct Lnd { 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -50,6 +57,7 @@ pub struct Lnd { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     _macaroon_file: PathBuf, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     lnd_client: client::Client, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     fee_reserve: FeeReserve, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    kv_store: DynMintKVStore, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     wait_invoice_cancel_token: CancellationToken, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     wait_invoice_is_active: Arc<AtomicBool>, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     settings: Bolt11Settings, 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -65,6 +73,7 @@ impl Lnd { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         cert_file: PathBuf, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         macaroon_file: PathBuf, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         fee_reserve: FeeReserve, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        kv_store: DynMintKVStore, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     ) -> Result<Self, Error> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         // Validate address is not empty 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         if address.is_empty() { 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -104,6 +113,7 @@ impl Lnd { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             _macaroon_file: macaroon_file, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             lnd_client, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             fee_reserve, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            kv_store, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             wait_invoice_cancel_token: CancellationToken::new(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             wait_invoice_is_active: Arc::new(AtomicBool::new(false)), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             settings: Bolt11Settings { 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -115,6 +125,55 @@ impl Lnd { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             }, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         }) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    /// Get last add and settle indices from KV store 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    #[instrument(skip_all)] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    async fn get_last_indices(&self) -> Result<(Option<u64>, Option<u64>), Error> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let add_index = if let Some(stored_index) = self 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .kv_store 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .kv_read( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                LND_KV_PRIMARY_NAMESPACE, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                LND_KV_SECONDARY_NAMESPACE, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                LAST_ADD_INDEX_KV_KEY, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .await 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .map_err(|e| Error::Database(e.to_string()))? 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if let Ok(index_str) = std::str::from_utf8(stored_index.as_slice()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                index_str.parse::<u64>().ok() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                None 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            None 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        }; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let settle_index = if let Some(stored_index) = self 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .kv_store 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .kv_read( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                LND_KV_PRIMARY_NAMESPACE, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                LND_KV_SECONDARY_NAMESPACE, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                LAST_SETTLE_INDEX_KV_KEY, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .await 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            .map_err(|e| Error::Database(e.to_string()))? 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if let Ok(index_str) = std::str::from_utf8(stored_index.as_slice()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                index_str.parse::<u64>().ok() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                None 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            None 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        }; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        tracing::debug!( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            "LND: Retrieved last indices from KV store - add_index: {:?}, settle_index: {:?}", 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            add_index, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            settle_index 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        Ok((add_index, settle_index)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 #[async_trait] 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -142,11 +201,21 @@ impl MintPayment for Lnd { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     ) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Self::Err> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         let mut lnd_client = self.lnd_client.clone(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // Get last indices from KV store 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let (last_add_index, last_settle_index) = 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            self.get_last_indices().await.unwrap_or((None, None)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         let stream_req = lnrpc::InvoiceSubscription { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            add_index: 0, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            settle_index: 0, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            add_index: last_add_index.unwrap_or(0), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            settle_index: last_settle_index.unwrap_or(0), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         }; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        tracing::debug!( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            "LND: Starting invoice subscription with add_index: {}, settle_index: {}", 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            stream_req.add_index, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            stream_req.settle_index 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         let stream = lnd_client 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             .lightning() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             .subscribe_invoices(stream_req) 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -158,68 +227,119 @@ impl MintPayment for Lnd { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             .into_inner(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         let cancel_token = self.wait_invoice_cancel_token.clone(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let kv_store = self.kv_store.clone(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        Ok(futures::stream::unfold( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let event_stream = futures::stream::unfold( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             ( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 stream, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 cancel_token, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 Arc::clone(&self.wait_invoice_is_active), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                kv_store, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                last_add_index.unwrap_or(0), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                last_settle_index.unwrap_or(0), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             ), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            |(mut stream, cancel_token, is_active)| async move { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            |( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                mut stream, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                cancel_token, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                is_active, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                kv_store, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                mut current_add_index, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                mut current_settle_index, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            )| async move { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 is_active.store(true, Ordering::SeqCst); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                tokio::select! { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    _ = cancel_token.cancelled() => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    // Stream is cancelled 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    is_active.store(false, Ordering::SeqCst); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    tracing::info!("Waiting for lnd invoice ending"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    None 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    msg = stream.message() => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                loop { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    tokio::select! { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        _ = cancel_token.cancelled() => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            // Stream is cancelled 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            is_active.store(false, Ordering::SeqCst); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            tracing::info!("Waiting for lnd invoice ending"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            return None; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        msg = stream.message() => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            match msg { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                Ok(Some(msg)) => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    // Update indices based on the message 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    current_add_index = current_add_index.max(msg.add_index); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    current_settle_index = current_settle_index.max(msg.settle_index); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    // Store the updated indices in KV store regardless of settlement status 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    let add_index_str = current_add_index.to_string(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    let settle_index_str = current_settle_index.to_string(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    if let Ok(mut tx) = kv_store.begin_transaction().await { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                        let mut has_error = false; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                        if let Err(e) = tx.kv_write(LND_KV_PRIMARY_NAMESPACE, LND_KV_SECONDARY_NAMESPACE, LAST_ADD_INDEX_KV_KEY, add_index_str.as_bytes()).await { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                            tracing::warn!("LND: Failed to write add_index {} to KV store: {}", current_add_index, e); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                            has_error = true; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                        if let Err(e) = tx.kv_write(LND_KV_PRIMARY_NAMESPACE, LND_KV_SECONDARY_NAMESPACE, LAST_SETTLE_INDEX_KV_KEY, settle_index_str.as_bytes()).await { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                            tracing::warn!("LND: Failed to write settle_index {} to KV store: {}", current_settle_index, e); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                            has_error = true; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                        if !has_error { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                            if let Err(e) = tx.commit().await { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                                tracing::warn!("LND: Failed to commit indices to KV store: {}", e); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                            } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                                tracing::debug!("LND: Stored updated indices - add_index: {}, settle_index: {}", current_add_index, current_settle_index); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                        tracing::warn!("LND: Failed to begin KV transaction for storing indices"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                match msg { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    Ok(Some(msg)) => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        if msg.state() == InvoiceState::Settled { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    // Only emit event for settled invoices 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    if msg.state() == InvoiceState::Settled { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                        let hash_slice: Result<[u8;32], _> = msg.r_hash.try_into(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                            let hash_slice: Result<[u8;32], _> = msg.r_hash.try_into(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                        if let Ok(hash_slice) = hash_slice { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                            let hash = hex::encode(hash_slice); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                            if let Ok(hash_slice) = hash_slice { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                            let hash = hex::encode(hash_slice); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                            tracing::info!("LND: Payment for {} with amount {} msat", hash,  msg.amt_paid_msat); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                              tracing::info!("LND: Processing payment with hash: {}", hash); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                             let wait_response = WaitPaymentResponse { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                                payment_identifier: PaymentIdentifier::PaymentHash(hash_slice), payment_amount: Amount::from(msg.amt_paid_msat as u64), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                                payment_identifier: PaymentIdentifier::PaymentHash(hash_slice), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                                payment_amount: Amount::from(msg.amt_paid_msat as u64), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                                 unit: CurrencyUnit::Msat, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                                 payment_id: hash, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                             }; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                            tracing::info!("LND: Created WaitPaymentResponse with amount {} msat",  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                                         msg.amt_paid_msat); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                             let event = Event::PaymentReceived(wait_response); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                            Some((event, (stream, cancel_token, is_active))) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                            }  else { None } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                            None 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                            return Some((event, (stream, cancel_token, is_active, kv_store, current_add_index, current_settle_index))); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                        } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                            // Invalid hash, skip this message but continue streaming 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                            tracing::error!("LND returned invalid payment hash"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                            // Continue the loop without yielding 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                            continue; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                        // Not a settled invoice, continue but don't emit event 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                        tracing::debug!("LND: Received non-settled invoice, continuing to wait for settled invoices"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                        // Continue the loop without yielding 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                        continue; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                Ok(None) => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    is_active.store(false, Ordering::SeqCst); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    tracing::info!("LND invoice stream ended."); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    return None; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                Err(err) => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    is_active.store(false, Ordering::SeqCst); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    tracing::warn!("Encountered error in LND invoice stream. Stream ending"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    tracing::error!("{:?}", err); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                    return None; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    Ok(None) => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    is_active.store(false, Ordering::SeqCst); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    tracing::info!("LND invoice stream ended."); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        None 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    }, // End of stream 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    Err(err) => { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    is_active.store(false, Ordering::SeqCst); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    tracing::warn!("Encountered error in LND invoice stream. Stream ending"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    tracing::error!("{:?}", err); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    None 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    },   // Handle errors gracefully, ends the stream on error 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             }, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        .boxed()) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        Ok(Box::pin(event_stream)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     #[instrument(skip_all)] 
			 |