Przeglądaj źródła

feat(cdk-lnbits): add websocket reconnection with exponential backoff (#1237)

Implement automatic reconnection logic when LNbits websocket connection is lost, using exponential backoff strategy (1s to 30s max) with automatic resubscription
tsk 1 tydzień temu
rodzic
commit
ad4df779e3
2 zmienionych plików z 50 dodań i 16 usunięć
  1. 1 1
      crates/cdk-lnbits/Cargo.toml
  2. 49 15
      crates/cdk-lnbits/src/lib.rs

+ 1 - 1
crates/cdk-lnbits/Cargo.toml

@@ -20,6 +20,6 @@ tokio.workspace = true
 tokio-util.workspace = true
 tracing.workspace = true
 thiserror.workspace = true
-lnbits-rs = "0.8.0"
+lnbits-rs = "0.9.1"
 serde_json.workspace = true
 rustls.workspace = true

+ 49 - 15
crates/cdk-lnbits/src/lib.rs

@@ -163,23 +163,57 @@ impl MintPayment for LNbits {
         let is_active = Arc::clone(&self.wait_invoice_is_active);
 
         Ok(Box::pin(futures::stream::unfold(
-            (api, cancel_token, is_active),
-            |(api, cancel_token, is_active)| async move {
+            (api, cancel_token, is_active, 0u32),
+            |(api, cancel_token, is_active, mut retry_count)| async move {
                 is_active.store(true, Ordering::SeqCst);
 
-                let receiver = api.receiver();
-                let mut receiver = receiver.lock().await;
-
-                tokio::select! {
-                    _ = cancel_token.cancelled() => {
-                        is_active.store(false, Ordering::SeqCst);
-                        tracing::info!("Waiting for lnbits invoice ending");
-                        None
-                    }
-                    msg_option = receiver.recv() => {
-                        Self::process_message(msg_option, &api, &is_active)
-                            .await
-                            .map(|response| (Event::PaymentReceived(response), (api, cancel_token, is_active)))
+                loop {
+                    tracing::debug!("LNbits: Starting wait loop, attempting to get receiver");
+                    let receiver = api.receiver();
+                    let mut receiver = receiver.lock().await;
+                    tracing::debug!("LNbits: Got receiver lock, waiting for messages");
+
+                    tokio::select! {
+                        _ = cancel_token.cancelled() => {
+                            is_active.store(false, Ordering::SeqCst);
+                            tracing::info!("Waiting for lnbits invoice ending");
+                            return None;
+                        }
+                        msg_option = receiver.recv() => {
+                            tracing::debug!("LNbits: Received message from websocket: {:?}", msg_option.as_ref().map(|_| "Some(message)"));
+                            match msg_option {
+                                Some(_) => {
+                                    // Successfully received a message, reset retry count
+                                    retry_count = 0;
+                                    let result = Self::process_message(msg_option, &api, &is_active).await;
+                                    return result.map(|response| {
+                                        (Event::PaymentReceived(response), (api, cancel_token, is_active, retry_count))
+                                    });
+                                }
+                                None => {
+                                    // Connection lost, need to reconnect
+                                    drop(receiver); // Drop the lock before reconnecting
+
+                                    tracing::warn!("LNbits websocket connection lost (receiver returned None), attempting to reconnect...");
+
+                                    // Exponential backoff: 1s, 2s, 4s, 8s, max 10s
+                                    let backoff_secs = std::cmp::min(2u64.pow(retry_count), 10);
+                                    tracing::info!("Retrying in {} seconds (attempt {})", backoff_secs, retry_count + 1);
+                                    tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
+
+                                    // Attempt to resubscribe
+                                    if let Err(err) = api.subscribe_to_websocket().await {
+                                        tracing::error!("Failed to resubscribe to LNbits websocket: {:?}", err);
+                                    } else {
+                                        tracing::info!("Successfully reconnected to LNbits websocket");
+                                    }
+
+                                    retry_count += 1;
+                                    // Continue the loop to try again
+                                    continue;
+                                }
+                            }
+                        }
                     }
                 }
             },