Browse Source

feat: lnbits and strike kill

thesimplekid 5 months ago
parent
commit
c82e6bdcab

+ 1 - 0
crates/cdk-lnbits/Cargo.toml

@@ -17,6 +17,7 @@ bitcoin = { version = "0.32.2", default-features = false }
 cdk = { path = "../cdk", version = "0.4.0", default-features = false, features = ["mint"] }
 futures = { version = "0.3.28", default-features = false }
 tokio = { version = "1", default-features = false }
+tokio-util = { version = "0.7.11", default-features = false }
 tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] }
 thiserror = "1"
 # lnbits-rs = "0.2.0"

+ 46 - 16
crates/cdk-lnbits/src/lib.rs

@@ -4,6 +4,7 @@
 #![warn(rustdoc::bare_urls)]
 
 use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 
 use anyhow::anyhow;
@@ -26,6 +27,7 @@ use futures::Stream;
 use lnbits_rs::api::invoice::CreateInvoiceRequest;
 use lnbits_rs::LNBitsClient;
 use tokio::sync::Mutex;
+use tokio_util::sync::CancellationToken;
 
 pub mod error;
 
@@ -38,6 +40,8 @@ pub struct LNbits {
     fee_reserve: FeeReserve,
     receiver: Arc<Mutex<Option<tokio::sync::mpsc::Receiver<String>>>>,
     webhook_url: String,
+    wait_invoice_cancel_token: CancellationToken,
+    wait_invoice_is_active: Arc<AtomicBool>,
 }
 
 impl LNbits {
@@ -62,6 +66,8 @@ impl LNbits {
             receiver,
             fee_reserve,
             webhook_url,
+            wait_invoice_cancel_token: CancellationToken::new(),
+            wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
         })
     }
 }
@@ -81,13 +87,14 @@ impl MintLightning for LNbits {
     }
 
     fn is_wait_invoice_active(&self) -> bool {
-        todo!()
+        self.wait_invoice_is_active.load(Ordering::SeqCst)
     }
 
     fn cancel_wait_invoice(&self) {
-        todo!()
+        self.wait_invoice_cancel_token.cancel()
     }
 
+    #[allow(clippy::incompatible_msrv)]
     async fn wait_any_invoice(
         &self,
     ) -> Result<Pin<Box<dyn Stream<Item = String> + Send>>, Self::Err> {
@@ -100,25 +107,48 @@ impl MintLightning for LNbits {
 
         let lnbits_api = self.lnbits_api.clone();
 
+        let cancel_token = self.wait_invoice_cancel_token.clone();
+
         Ok(futures::stream::unfold(
-            (receiver, lnbits_api),
-            |(mut receiver, lnbits_api)| async move {
-                match receiver.recv().await {
-                    Some(msg) => {
-                        let check = lnbits_api.is_invoice_paid(&msg).await;
-
-                        match check {
-                            Ok(state) => {
-                                if state {
-                                    Some((msg, (receiver, lnbits_api)))
-                                } else {
-                                    None
+            (
+                receiver,
+                lnbits_api,
+                cancel_token,
+                Arc::clone(&self.wait_invoice_is_active),
+            ),
+            |(mut receiver, lnbits_api, cancel_token, is_active)| async move {
+                is_active.store(true, Ordering::SeqCst);
+
+                tokio::select! {
+                    _ = cancel_token.cancelled() => {
+                        // Stream is cancelled
+                        is_active.store(false, Ordering::SeqCst);
+                        tracing::info!("Waiting for phonixd invoice ending");
+                        None
+                    }
+                    msg_option = receiver.recv() => {
+                    match msg_option {
+                        Some(msg) => {
+                            let check = lnbits_api.is_invoice_paid(&msg).await;
+
+                            match check {
+                                Ok(state) => {
+                                    if state {
+                                        Some((msg, (receiver, lnbits_api, cancel_token, is_active)))
+                                    } else {
+                                        None
+                                    }
                                 }
+                                _ => None,
                             }
-                            _ => None,
                         }
+                        None => {
+                            is_active.store(true, Ordering::SeqCst);
+                            None
+                        },
+                    }
+
                     }
-                    None => None,
                 }
             },
         )

+ 1 - 0
crates/cdk-strike/Cargo.toml

@@ -17,6 +17,7 @@ bitcoin = { version = "0.32.2", default-features = false }
 cdk = { path = "../cdk", version = "0.4.0", default-features = false, features = ["mint"] }
 futures = { version = "0.3.28", default-features = false }
 tokio = { version = "1", default-features = false }
+tokio-util = { version = "0.7.11", default-features = false }
 tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] }
 thiserror = "1"
 uuid = { version = "1", features = ["v4"] }

+ 32 - 6
crates/cdk-strike/src/lib.rs

@@ -4,6 +4,7 @@
 #![warn(rustdoc::bare_urls)]
 
 use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 
 use anyhow::{anyhow, bail};
@@ -27,6 +28,7 @@ use strike_rs::{
     PayInvoiceQuoteRequest, Strike as StrikeApi,
 };
 use tokio::sync::Mutex;
+use tokio_util::sync::CancellationToken;
 use uuid::Uuid;
 
 pub mod error;
@@ -40,6 +42,8 @@ pub struct Strike {
     unit: CurrencyUnit,
     receiver: Arc<Mutex<Option<tokio::sync::mpsc::Receiver<String>>>>,
     webhook_url: String,
+    wait_invoice_cancel_token: CancellationToken,
+    wait_invoice_is_active: Arc<AtomicBool>,
 }
 
 impl Strike {
@@ -60,6 +64,8 @@ impl Strike {
             receiver,
             unit,
             webhook_url,
+            wait_invoice_cancel_token: CancellationToken::new(),
+            wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
         })
     }
 }
@@ -79,13 +85,14 @@ impl MintLightning for Strike {
     }
 
     fn is_wait_invoice_active(&self) -> bool {
-        todo!()
+        self.wait_invoice_is_active.load(Ordering::SeqCst)
     }
 
     fn cancel_wait_invoice(&self) {
-        todo!()
+        self.wait_invoice_cancel_token.cancel()
     }
 
+    #[allow(clippy::incompatible_msrv)]
     async fn wait_any_invoice(
         &self,
     ) -> Result<Pin<Box<dyn Stream<Item = String> + Send>>, Self::Err> {
@@ -101,18 +108,34 @@ impl MintLightning for Strike {
             .ok_or(anyhow!("No receiver"))?;
 
         let strike_api = self.strike_api.clone();
+        let cancel_token = self.wait_invoice_cancel_token.clone();
 
         Ok(futures::stream::unfold(
-            (receiver, strike_api),
-            |(mut receiver, strike_api)| async move {
-                match receiver.recv().await {
+            (
+                receiver,
+                strike_api,
+                cancel_token,
+                Arc::clone(&self.wait_invoice_is_active),
+            ),
+            |(mut receiver, strike_api, cancel_token, is_active)| async move {
+                tokio::select! {
+
+                    _ = cancel_token.cancelled() => {
+                        // Stream is cancelled
+                        is_active.store(false, Ordering::SeqCst);
+                        tracing::info!("Waiting for phonixd invoice ending");
+                        None
+                    }
+
+                    msg_option = receiver.recv() => {
+                match msg_option {
                     Some(msg) => {
                         let check = strike_api.get_incoming_invoice(&msg).await;
 
                         match check {
                             Ok(state) => {
                                 if state.state == InvoiceState::Paid {
-                                    Some((msg, (receiver, strike_api)))
+                                    Some((msg, (receiver, strike_api, cancel_token, is_active)))
                                 } else {
                                     None
                                 }
@@ -122,6 +145,9 @@ impl MintLightning for Strike {
                     }
                     None => None,
                 }
+
+                    }
+                }
             },
         )
         .boxed())