Browse Source

feat: phd cancel invoice

thesimplekid 5 months ago
parent
commit
1e4a7af34f
2 changed files with 51 additions and 18 deletions
  1. 1 0
      crates/cdk-phoenixd/Cargo.toml
  2. 50 18
      crates/cdk-phoenixd/src/lib.rs

+ 1 - 0
crates/cdk-phoenixd/Cargo.toml

@@ -17,6 +17,7 @@ bitcoin = { version = "0.32.2", default-features = false }
 cdk = { path = "../cdk", version = "0.4.0", default-features = false, features = ["mint"] }
 futures = { version = "0.3.28", default-features = false }
 tokio = { version = "1", default-features = false }
+tokio-util = { version = "0.7.11", default-features = false }
 tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] }
 thiserror = "1"
 # phoenixd-rs = "0.3.0"

+ 50 - 18
crates/cdk-phoenixd/src/lib.rs

@@ -4,6 +4,7 @@
 #![warn(rustdoc::bare_urls)]
 
 use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 
 use anyhow::anyhow;
@@ -24,6 +25,7 @@ use futures::{Stream, StreamExt};
 use phoenixd_rs::webhooks::WebhookResponse;
 use phoenixd_rs::{InvoiceRequest, Phoenixd as PhoenixdApi};
 use tokio::sync::Mutex;
+use tokio_util::sync::CancellationToken;
 
 pub mod error;
 
@@ -36,6 +38,8 @@ pub struct Phoenixd {
     fee_reserve: FeeReserve,
     receiver: Arc<Mutex<Option<tokio::sync::mpsc::Receiver<WebhookResponse>>>>,
     webhook_url: String,
+    wait_invoice_cancel_token: CancellationToken,
+    wait_invoice_is_active: Arc<AtomicBool>,
 }
 
 impl Phoenixd {
@@ -57,6 +61,8 @@ impl Phoenixd {
             fee_reserve,
             receiver,
             webhook_url,
+            wait_invoice_cancel_token: CancellationToken::new(),
+            wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
         })
     }
 
@@ -87,11 +93,11 @@ impl MintLightning for Phoenixd {
     }
 
     fn is_wait_invoice_active(&self) -> bool {
-        todo!()
+        self.wait_invoice_is_active.load(Ordering::SeqCst)
     }
 
     fn cancel_wait_invoice(&self) {
-        todo!()
+        self.wait_invoice_cancel_token.cancel()
     }
 
     async fn wait_any_invoice(
@@ -106,29 +112,55 @@ impl MintLightning for Phoenixd {
 
         let phoenixd_api = self.phoenixd_api.clone();
 
+        let cancel_token = self.wait_invoice_cancel_token.clone();
+
         Ok(futures::stream::unfold(
-            (receiver, phoenixd_api),
-            |(mut receiver, phoenixd_api)| async move {
-                match receiver.recv().await {
-                    Some(msg) => {
-                        let check = phoenixd_api.get_incoming_invoice(&msg.payment_hash).await;
-
-                        match check {
-                            Ok(state) => {
-                                if state.is_paid {
-                                    Some((msg.payment_hash, (receiver, phoenixd_api)))
-                                } else {
+        (receiver, phoenixd_api, cancel_token,
+                Arc::clone(&self.wait_invoice_is_active),
+        ),
+        |(mut receiver, phoenixd_api, cancel_token, is_active)| async move {
+
+                is_active.store(true, Ordering::SeqCst);
+            tokio::select! {
+                _ = cancel_token.cancelled() => {
+                    // Stream is cancelled
+                    is_active.store(false, Ordering::SeqCst);
+                    tracing::info!("Waiting for phonixd invoice ending");
+                    return None;
+                }
+                msg_option = receiver.recv() => {
+                    match msg_option {
+                        Some(msg) => {
+                            let check = phoenixd_api.get_incoming_invoice(&msg.payment_hash).await;
+
+                            match check {
+                                Ok(state) => {
+                                    if state.is_paid {
+                                        // Yield the payment hash and continue the stream
+                                        Some((msg.payment_hash, (receiver, phoenixd_api, cancel_token, is_active)))
+                                    } else {
+                                        // Invoice not paid yet, continue waiting
+                                        // We need to continue the stream, so we return the same state
+                                        None
+                                    }
+                                }
+                                Err(e) => {
+                                    // Log the error and continue
+                                    tracing::warn!("Error checking invoice state: {:?}", e);
                                     None
                                 }
                             }
-                            _ => None,
+                        }
+                        None => {
+                            // The receiver stream has ended
+                            None
                         }
                     }
-                    None => None,
                 }
-            },
-        )
-        .boxed())
+            }
+        },
+    )
+    .boxed())
     }
 
     async fn get_payment_quote(