lib.rs 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003
  1. //! CDK lightning backend for ldk-node
  2. #![doc = include_str!("../README.md")]
  3. use std::net::SocketAddr;
  4. use std::pin::Pin;
  5. use std::sync::atomic::{AtomicBool, Ordering};
  6. use std::sync::Arc;
  7. use async_trait::async_trait;
  8. use cdk_common::common::FeeReserve;
  9. use cdk_common::payment::{self, *};
  10. use cdk_common::util::{hex, unix_time};
  11. use cdk_common::{Amount, CurrencyUnit, MeltOptions, MeltQuoteState};
  12. use futures::{Stream, StreamExt};
  13. use ldk_node::bitcoin::hashes::Hash;
  14. use ldk_node::bitcoin::Network;
  15. use ldk_node::lightning::ln::channelmanager::PaymentId;
  16. use ldk_node::lightning::ln::msgs::SocketAddress;
  17. use ldk_node::lightning::routing::router::RouteParametersConfig;
  18. use ldk_node::lightning_invoice::{Bolt11InvoiceDescription, Description};
  19. use ldk_node::lightning_types::payment::PaymentHash;
  20. use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus};
  21. use ldk_node::{Builder, Event, Node};
  22. use tokio_stream::wrappers::BroadcastStream;
  23. use tokio_util::sync::CancellationToken;
  24. use tracing::instrument;
  25. use crate::error::Error;
  26. mod error;
  27. mod web;
  /// CDK Lightning backend using LDK Node
  ///
  /// Provides Lightning Network functionality for CDK with support for Cashu operations.
  /// Handles payment creation, processing, and event management using the Lightning Development Kit.
  ///
  /// The node handle, the active flag and the receiver are held behind `Arc`, so clones
  /// refer to the same underlying instances.
  #[derive(Clone)]
  pub struct CdkLdkNode {
      /// Handle to the underlying LDK node.
      inner: Arc<Node>,
      /// Fee reserve settings used when quoting outgoing payments.
      fee_reserve: FeeReserve,
      /// Token cancelled to signal that waiting for incoming payments should stop.
      wait_invoice_cancel_token: CancellationToken,
      /// Set to `true` once an incoming-payment stream has been handed out.
      wait_invoice_is_active: Arc<AtomicBool>,
      /// Sending half of the broadcast channel carrying received-payment notifications.
      sender: tokio::sync::broadcast::Sender<WaitPaymentResponse>,
      /// Receiving half kept alongside the sender; payment streams resubscribe from it.
      receiver: Arc<tokio::sync::broadcast::Receiver<WaitPaymentResponse>>,
      /// Token cancelled to stop the background event-handler task.
      events_cancel_token: CancellationToken,
      /// Address for the management web interface; `None` means no web server is started.
      web_addr: Option<SocketAddr>,
  }
  43. impl std::fmt::Debug for CdkLdkNode {
  44. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  45. f.debug_struct("CdkLdkNode")
  46. .field("fee_reserve", &self.fee_reserve)
  47. .field("web_addr", &self.web_addr)
  48. .finish_non_exhaustive()
  49. }
  50. }
  /// Configuration for connecting to Bitcoin RPC
  ///
  /// Contains the necessary connection parameters for Bitcoin Core RPC interface.
  ///
  /// `Debug` is implemented by hand so that the password never ends up in logs or
  /// panic messages when this value (or a type containing it) is formatted.
  #[derive(Clone)]
  pub struct BitcoinRpcConfig {
      /// Bitcoin RPC server hostname or IP address
      pub host: String,
      /// Bitcoin RPC server port number
      pub port: u16,
      /// Username for Bitcoin RPC authentication
      pub user: String,
      /// Password for Bitcoin RPC authentication
      pub password: String,
  }

  impl std::fmt::Debug for BitcoinRpcConfig {
      /// Formats every field except `password`, which is replaced by a placeholder.
      fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
          f.debug_struct("BitcoinRpcConfig")
              .field("host", &self.host)
              .field("port", &self.port)
              .field("user", &self.user)
              .field("password", &"<redacted>")
              .finish()
      }
  }
  /// Source of blockchain data for the Lightning node
  ///
  /// Specifies how the node should connect to the Bitcoin network to retrieve
  /// blockchain information and broadcast transactions. The chosen variant is
  /// applied to the LDK node builder in [`CdkLdkNode::new`].
  #[derive(Debug, Clone)]
  pub enum ChainSource {
      /// Use an Esplora server for blockchain data
      ///
      /// Contains the URL of the Esplora server endpoint
      Esplora(String),
      /// Use Bitcoin Core RPC for blockchain data
      ///
      /// Contains the configuration (host, port, user, password) for connecting to Bitcoin Core
      BitcoinRpc(BitcoinRpcConfig),
  }
  /// Source of Lightning network gossip data
  ///
  /// Specifies how the node should learn about the Lightning Network topology
  /// and routing information. The chosen variant is applied to the LDK node
  /// builder in [`CdkLdkNode::new`].
  #[derive(Debug, Clone)]
  pub enum GossipSource {
      /// Learn gossip through peer-to-peer connections
      ///
      /// The node will connect to other Lightning nodes and exchange gossip data directly
      P2P,
      /// Use Rapid Gossip Sync for efficient gossip updates
      ///
      /// Contains the URL of the RGS server for compressed gossip data
      RapidGossipSync(String),
  }
  95. impl CdkLdkNode {
  96. /// Create a new CDK LDK Node instance
  97. ///
  98. /// # Arguments
  99. /// * `network` - Bitcoin network (mainnet, testnet, regtest, signet)
  100. /// * `chain_source` - Source of blockchain data (Esplora or Bitcoin RPC)
  101. /// * `gossip_source` - Source of Lightning network gossip data
  102. /// * `storage_dir_path` - Directory path for node data storage
  103. /// * `fee_reserve` - Fee reserve configuration for payments
  104. /// * `listening_address` - Socket addresses for peer connections
  105. /// * `runtime` - Optional Tokio runtime to use for starting the node
  106. ///
  107. /// # Returns
  108. /// A new `CdkLdkNode` instance ready to be started
  109. ///
  110. /// # Errors
  111. /// Returns an error if the LDK node builder fails to create the node
  112. pub fn new(
  113. network: Network,
  114. chain_source: ChainSource,
  115. gossip_source: GossipSource,
  116. storage_dir_path: String,
  117. fee_reserve: FeeReserve,
  118. listening_address: Vec<SocketAddress>,
  119. ) -> Result<Self, Error> {
  120. let mut builder = Builder::new();
  121. builder.set_network(network);
  122. tracing::info!("Storage dir of node is {}", storage_dir_path);
  123. builder.set_storage_dir_path(storage_dir_path);
  124. match chain_source {
  125. ChainSource::Esplora(esplora_url) => {
  126. builder.set_chain_source_esplora(esplora_url, None);
  127. }
  128. ChainSource::BitcoinRpc(BitcoinRpcConfig {
  129. host,
  130. port,
  131. user,
  132. password,
  133. }) => {
  134. builder.set_chain_source_bitcoind_rpc(host, port, user, password);
  135. }
  136. }
  137. match gossip_source {
  138. GossipSource::P2P => {
  139. builder.set_gossip_source_p2p();
  140. }
  141. GossipSource::RapidGossipSync(rgs_url) => {
  142. builder.set_gossip_source_rgs(rgs_url);
  143. }
  144. }
  145. builder.set_listening_addresses(listening_address)?;
  146. builder.set_node_alias("cdk-ldk-node".to_string())?;
  147. let node = builder.build()?;
  148. tracing::info!("Creating tokio channel for payment notifications");
  149. let (sender, receiver) = tokio::sync::broadcast::channel(8);
  150. let id = node.node_id();
  151. let adr = node.announcement_addresses();
  152. tracing::info!(
  153. "Created node {} with address {:?} on network {}",
  154. id,
  155. adr,
  156. network
  157. );
  158. Ok(Self {
  159. inner: node.into(),
  160. fee_reserve,
  161. wait_invoice_cancel_token: CancellationToken::new(),
  162. wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
  163. sender,
  164. receiver: Arc::new(receiver),
  165. events_cancel_token: CancellationToken::new(),
  166. web_addr: None,
  167. })
  168. }
  /// Set the web server address for the LDK node management interface
  ///
  /// The value is only stored here; it is read when the payment processor is started.
  ///
  /// # Arguments
  /// * `addr` - Socket address for the web server. If None, no web server will be started.
  pub fn set_web_addr(&mut self, addr: Option<SocketAddr>) {
      self.web_addr = addr;
  }
  /// Get the default web server address
  ///
  /// Returns a `SocketAddr` for localhost (`127.0.0.1`) on the fixed port `8091`.
  /// The port is not chosen dynamically, so binding fails if it is already in use.
  pub fn default_web_addr() -> SocketAddr {
      SocketAddr::from(([127, 0, 0, 1], 8091))
  }
  183. /// Start the CDK LDK Node
  184. ///
  185. /// Starts the underlying LDK node and begins event processing.
  186. /// Sets up event handlers to listen for Lightning events like payment received.
  187. ///
  188. /// # Returns
  189. /// Returns `Ok(())` on successful start, error otherwise
  190. ///
  191. /// # Errors
  192. /// Returns an error if the LDK node fails to start or event handling setup fails
  193. pub fn start_ldk_node(&self) -> Result<(), Error> {
  194. tracing::info!("Starting cdk-ldk node");
  195. self.inner.start()?;
  196. let node_config = self.inner.config();
  197. tracing::info!("Starting node with network {}", node_config.network);
  198. tracing::info!("Node status: {:?}", self.inner.status());
  199. self.handle_events()?;
  200. Ok(())
  201. }
  202. /// Start the web server for the LDK node management interface
  203. ///
  204. /// Starts a web server that provides a user interface for managing the LDK node.
  205. /// The web interface allows users to view balances, manage channels, create invoices,
  206. /// and send payments.
  207. ///
  208. /// # Arguments
  209. /// * `web_addr` - The socket address to bind the web server to
  210. ///
  211. /// # Returns
  212. /// Returns `Ok(())` on successful start, error otherwise
  213. ///
  214. /// # Errors
  215. /// Returns an error if the web server fails to start
  216. pub fn start_web_server(&self, web_addr: SocketAddr) -> Result<(), Error> {
  217. let web_server = crate::web::WebServer::new(Arc::new(self.clone()));
  218. tokio::spawn(async move {
  219. if let Err(e) = web_server.serve(web_addr).await {
  220. tracing::error!("Web server error: {}", e);
  221. }
  222. });
  223. Ok(())
  224. }
  225. /// Stop the CDK LDK Node
  226. ///
  227. /// Gracefully stops the node by cancelling all active tasks and event handlers.
  228. /// This includes:
  229. /// - Cancelling the event handler task
  230. /// - Cancelling any active wait_invoice streams
  231. /// - Stopping the underlying LDK node
  232. ///
  233. /// # Returns
  234. /// Returns `Ok(())` on successful shutdown, error otherwise
  235. ///
  236. /// # Errors
  237. /// Returns an error if the underlying LDK node fails to stop
  238. pub fn stop_ldk_node(&self) -> Result<(), Error> {
  239. tracing::info!("Stopping CdkLdkNode");
  240. // Cancel all tokio tasks
  241. tracing::info!("Cancelling event handler");
  242. self.events_cancel_token.cancel();
  243. // Cancel any wait_invoice streams
  244. if self.is_wait_invoice_active() {
  245. tracing::info!("Cancelling wait_invoice stream");
  246. self.wait_invoice_cancel_token.cancel();
  247. }
  248. // Stop the LDK node
  249. tracing::info!("Stopping LDK node");
  250. self.inner.stop()?;
  251. tracing::info!("CdkLdkNode stopped successfully");
  252. Ok(())
  253. }
  254. /// Handle payment received event
  255. async fn handle_payment_received(
  256. node: &Arc<Node>,
  257. sender: &tokio::sync::broadcast::Sender<WaitPaymentResponse>,
  258. payment_id: Option<PaymentId>,
  259. payment_hash: PaymentHash,
  260. amount_msat: u64,
  261. ) {
  262. tracing::info!(
  263. "Received payment for hash={} of amount={} msat",
  264. payment_hash,
  265. amount_msat
  266. );
  267. let payment_id = match payment_id {
  268. Some(id) => id,
  269. None => {
  270. tracing::warn!("Received payment without payment_id");
  271. return;
  272. }
  273. };
  274. let payment_id_hex = hex::encode(payment_id.0);
  275. if amount_msat == 0 {
  276. tracing::warn!("Payment of no amount");
  277. return;
  278. }
  279. tracing::info!(
  280. "Processing payment notification: id={}, amount={} msats",
  281. payment_id_hex,
  282. amount_msat
  283. );
  284. let payment_details = match node.payment(&payment_id) {
  285. Some(details) => details,
  286. None => {
  287. tracing::error!("Could not find payment details for id={}", payment_id_hex);
  288. return;
  289. }
  290. };
  291. let (payment_identifier, payment_id) = match payment_details.kind {
  292. PaymentKind::Bolt11 { hash, .. } => {
  293. (PaymentIdentifier::PaymentHash(hash.0), hash.to_string())
  294. }
  295. PaymentKind::Bolt12Offer { hash, offer_id, .. } => match hash {
  296. Some(h) => (
  297. PaymentIdentifier::OfferId(offer_id.to_string()),
  298. h.to_string(),
  299. ),
  300. None => {
  301. tracing::error!("Bolt12 payment missing hash");
  302. return;
  303. }
  304. },
  305. k => {
  306. tracing::warn!("Received payment of kind {:?} which is not supported", k);
  307. return;
  308. }
  309. };
  310. let wait_payment_response = WaitPaymentResponse {
  311. payment_identifier,
  312. payment_amount: Amount::new(amount_msat, CurrencyUnit::Msat),
  313. payment_id,
  314. };
  315. match sender.send(wait_payment_response) {
  316. Ok(_) => tracing::info!("Successfully sent payment notification to stream"),
  317. Err(err) => tracing::error!(
  318. "Could not send payment received notification on channel: {}",
  319. err
  320. ),
  321. }
  322. }
  323. /// Set up event handling for the node
  324. pub fn handle_events(&self) -> Result<(), Error> {
  325. let node = self.inner.clone();
  326. let sender = self.sender.clone();
  327. let cancel_token = self.events_cancel_token.clone();
  328. tracing::info!("Starting event handler task");
  329. tokio::spawn(async move {
  330. tracing::info!("Event handler loop started");
  331. loop {
  332. tokio::select! {
  333. _ = cancel_token.cancelled() => {
  334. tracing::info!("Event handler cancelled");
  335. break;
  336. }
  337. event = node.next_event_async() => {
  338. match event {
  339. Event::PaymentReceived {
  340. payment_id,
  341. payment_hash,
  342. amount_msat,
  343. custom_records: _
  344. } => {
  345. Self::handle_payment_received(
  346. &node,
  347. &sender,
  348. payment_id,
  349. payment_hash,
  350. amount_msat
  351. ).await;
  352. }
  353. event => {
  354. tracing::debug!("Received other ldk node event: {:?}", event);
  355. }
  356. }
  357. if let Err(err) = node.event_handled() {
  358. tracing::error!("Error handling node event: {}", err);
  359. } else {
  360. tracing::debug!("Successfully handled node event");
  361. }
  362. }
  363. }
  364. }
  365. tracing::info!("Event handler loop terminated");
  366. });
  367. tracing::info!("Event handler task spawned");
  368. Ok(())
  369. }
  370. /// Get Node used
  371. pub fn node(&self) -> Arc<Node> {
  372. Arc::clone(&self.inner)
  373. }
  374. }
  375. /// Mint payment trait
  376. #[async_trait]
  377. impl MintPayment for CdkLdkNode {
  378. type Err = payment::Error;
  379. /// Start the payment processor
  380. /// Starts the LDK node and begins event processing
  381. async fn start(&self) -> Result<(), Self::Err> {
  382. self.start_ldk_node().map_err(|e| {
  383. tracing::error!("Failed to start CdkLdkNode: {}", e);
  384. e
  385. })?;
  386. tracing::info!("CdkLdkNode payment processor started successfully");
  387. // Start web server if configured
  388. if let Some(web_addr) = self.web_addr {
  389. tracing::info!("Starting LDK Node web interface on {}", web_addr);
  390. self.start_web_server(web_addr).map_err(|e| {
  391. tracing::error!("Failed to start web server: {}", e);
  392. e
  393. })?;
  394. } else {
  395. tracing::info!("No web server address configured, skipping web interface");
  396. }
  397. Ok(())
  398. }
  399. /// Stop the payment processor
  400. /// Gracefully stops the LDK node and cancels all background tasks
  401. async fn stop(&self) -> Result<(), Self::Err> {
  402. self.stop_ldk_node().map_err(|e| {
  403. tracing::error!("Failed to stop CdkLdkNode: {}", e);
  404. e.into()
  405. })
  406. }
  407. /// Base Settings
  408. async fn get_settings(&self) -> Result<SettingsResponse, Self::Err> {
  409. let settings = SettingsResponse {
  410. unit: CurrencyUnit::Msat.to_string(),
  411. bolt11: Some(payment::Bolt11Settings {
  412. mpp: false,
  413. amountless: true,
  414. invoice_description: true,
  415. }),
  416. bolt12: Some(payment::Bolt12Settings { amountless: true }),
  417. custom: std::collections::HashMap::new(),
  418. };
  419. Ok(settings)
  420. }
  421. /// Create a new invoice
  422. #[instrument(skip(self))]
  423. async fn create_incoming_payment_request(
  424. &self,
  425. unit: &CurrencyUnit,
  426. options: IncomingPaymentOptions,
  427. ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
  428. match options {
  429. IncomingPaymentOptions::Bolt11(bolt11_options) => {
  430. let amount_msat: Amount = Amount::new(bolt11_options.amount.into(), unit.clone())
  431. .convert_to(&CurrencyUnit::Msat)?
  432. .into();
  433. let description = bolt11_options.description.unwrap_or_default();
  434. let time = bolt11_options
  435. .unix_expiry
  436. .map(|t| t - unix_time())
  437. .unwrap_or(36000);
  438. let description = Bolt11InvoiceDescription::Direct(
  439. Description::new(description).map_err(|_| Error::InvalidDescription)?,
  440. );
  441. let payment = self
  442. .inner
  443. .bolt11_payment()
  444. .receive(amount_msat.into(), &description, time as u32)
  445. .map_err(Error::LdkNode)?;
  446. let payment_hash = payment.payment_hash().to_string();
  447. let payment_identifier = PaymentIdentifier::PaymentHash(
  448. hex::decode(&payment_hash)?
  449. .try_into()
  450. .map_err(|_| Error::InvalidPaymentHashLength)?,
  451. );
  452. Ok(CreateIncomingPaymentResponse {
  453. request_lookup_id: payment_identifier,
  454. request: payment.to_string(),
  455. expiry: Some(unix_time() + time),
  456. extra_json: None,
  457. })
  458. }
  459. IncomingPaymentOptions::Bolt12(bolt12_options) => {
  460. let Bolt12IncomingPaymentOptions {
  461. description,
  462. amount,
  463. unix_expiry,
  464. } = *bolt12_options;
  465. let time = unix_expiry.map(|t| (t - unix_time()) as u32);
  466. let offer = match amount {
  467. Some(amount) => {
  468. let amount_msat: Amount = Amount::new(amount.into(), unit.clone())
  469. .convert_to(&CurrencyUnit::Msat)?
  470. .into();
  471. self.inner
  472. .bolt12_payment()
  473. .receive(
  474. amount_msat.into(),
  475. &description.unwrap_or("".to_string()),
  476. time,
  477. None,
  478. )
  479. .map_err(Error::LdkNode)?
  480. }
  481. None => self
  482. .inner
  483. .bolt12_payment()
  484. .receive_variable_amount(&description.unwrap_or("".to_string()), time)
  485. .map_err(Error::LdkNode)?,
  486. };
  487. let payment_identifier = PaymentIdentifier::OfferId(offer.id().to_string());
  488. Ok(CreateIncomingPaymentResponse {
  489. request_lookup_id: payment_identifier,
  490. request: offer.to_string(),
  491. expiry: time.map(|a| a as u64),
  492. extra_json: None,
  493. })
  494. }
  495. cdk_common::payment::IncomingPaymentOptions::Custom(_) => {
  496. Err(cdk_common::payment::Error::UnsupportedPaymentOption)
  497. }
  498. }
  499. }
  500. /// Get payment quote
  501. /// Used to get fee and amount required for a payment request
  502. #[instrument(skip_all)]
  503. async fn get_payment_quote(
  504. &self,
  505. unit: &CurrencyUnit,
  506. options: OutgoingPaymentOptions,
  507. ) -> Result<PaymentQuoteResponse, Self::Err> {
  508. match options {
  509. cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
  510. Err(cdk_common::payment::Error::UnsupportedPaymentOption)
  511. }
  512. OutgoingPaymentOptions::Bolt11(bolt11_options) => {
  513. let bolt11 = bolt11_options.bolt11;
  514. let amount_msat = match bolt11_options.melt_options {
  515. Some(melt_options) => melt_options.amount_msat(),
  516. None => bolt11
  517. .amount_milli_satoshis()
  518. .ok_or(Error::UnknownInvoiceAmount)?
  519. .into(),
  520. };
  521. let amount =
  522. Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
  523. let relative_fee_reserve =
  524. (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
  525. let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
  526. let fee = match relative_fee_reserve > absolute_fee_reserve {
  527. true => relative_fee_reserve,
  528. false => absolute_fee_reserve,
  529. };
  530. let payment_hash = bolt11.payment_hash().to_string();
  531. let payment_hash_bytes = hex::decode(&payment_hash)?
  532. .try_into()
  533. .map_err(|_| Error::InvalidPaymentHashLength)?;
  534. Ok(PaymentQuoteResponse {
  535. request_lookup_id: Some(PaymentIdentifier::PaymentHash(payment_hash_bytes)),
  536. amount,
  537. fee: Amount::new(fee, unit.clone()),
  538. state: MeltQuoteState::Unpaid,
  539. })
  540. }
  541. OutgoingPaymentOptions::Bolt12(bolt12_options) => {
  542. let offer = bolt12_options.offer;
  543. let amount_msat = match bolt12_options.melt_options {
  544. Some(melt_options) => melt_options.amount_msat(),
  545. None => {
  546. let amount = offer.amount().ok_or(payment::Error::AmountMismatch)?;
  547. match amount {
  548. ldk_node::lightning::offers::offer::Amount::Bitcoin {
  549. amount_msats,
  550. } => amount_msats.into(),
  551. _ => return Err(payment::Error::AmountMismatch),
  552. }
  553. }
  554. };
  555. let amount =
  556. Amount::new(amount_msat.into(), CurrencyUnit::Msat).convert_to(unit)?;
  557. let relative_fee_reserve =
  558. (self.fee_reserve.percent_fee_reserve * amount.value() as f32) as u64;
  559. let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
  560. let fee = match relative_fee_reserve > absolute_fee_reserve {
  561. true => relative_fee_reserve,
  562. false => absolute_fee_reserve,
  563. };
  564. Ok(PaymentQuoteResponse {
  565. request_lookup_id: None,
  566. amount,
  567. fee: Amount::new(fee, unit.clone()),
  568. state: MeltQuoteState::Unpaid,
  569. })
  570. }
  571. }
  572. }
  573. /// Pay request
  574. #[instrument(skip(self, options))]
  575. async fn make_payment(
  576. &self,
  577. unit: &CurrencyUnit,
  578. options: OutgoingPaymentOptions,
  579. ) -> Result<MakePaymentResponse, Self::Err> {
  580. match options {
  581. cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
  582. Err(cdk_common::payment::Error::UnsupportedPaymentOption)
  583. }
  584. OutgoingPaymentOptions::Bolt11(bolt11_options) => {
  585. let bolt11 = bolt11_options.bolt11;
  586. let send_params = match bolt11_options
  587. .max_fee_amount
  588. .map(|f| {
  589. Amount::new(f.into(), unit.clone())
  590. .convert_to(&CurrencyUnit::Msat)
  591. .map(|amount_msat| RouteParametersConfig {
  592. max_total_routing_fee_msat: Some(amount_msat.value()),
  593. ..Default::default()
  594. })
  595. })
  596. .transpose()
  597. {
  598. Ok(params) => params,
  599. Err(err) => {
  600. tracing::error!("Failed to convert fee amount: {}", err);
  601. return Err(payment::Error::Custom(format!("Invalid fee amount: {err}")));
  602. }
  603. };
  604. let payment_id = match bolt11_options.melt_options {
  605. Some(MeltOptions::Amountless { amountless }) => self
  606. .inner
  607. .bolt11_payment()
  608. .send_using_amount(&bolt11, amountless.amount_msat.into(), send_params)
  609. .map_err(|err| {
  610. tracing::error!("Could not send send amountless bolt11: {}", err);
  611. Error::CouldNotSendBolt11WithoutAmount
  612. })?,
  613. None => self
  614. .inner
  615. .bolt11_payment()
  616. .send(&bolt11, send_params)
  617. .map_err(|err| {
  618. tracing::error!("Could not send bolt11 {}", err);
  619. Error::CouldNotSendBolt11
  620. })?,
  621. _ => return Err(payment::Error::UnsupportedPaymentOption),
  622. };
  623. // Check payment status for up to 10 seconds
  624. let start = std::time::Instant::now();
  625. let timeout = std::time::Duration::from_secs(10);
  626. let (status, payment_details) = loop {
  627. let details = self
  628. .inner
  629. .payment(&payment_id)
  630. .ok_or(Error::PaymentNotFound)?;
  631. match details.status {
  632. PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
  633. PaymentStatus::Failed => {
  634. tracing::error!("Failed to pay bolt11 payment.");
  635. break (MeltQuoteState::Failed, details);
  636. }
  637. PaymentStatus::Pending => {
  638. if start.elapsed() > timeout {
  639. tracing::warn!(
  640. "Paying bolt11 exceeded timeout 10 seconds no longer waitning."
  641. );
  642. break (MeltQuoteState::Pending, details);
  643. }
  644. tokio::time::sleep(std::time::Duration::from_millis(100)).await;
  645. continue;
  646. }
  647. }
  648. };
  649. let payment_proof = match payment_details.kind {
  650. PaymentKind::Bolt11 {
  651. hash: _,
  652. preimage,
  653. secret: _,
  654. } => preimage.map(|p| p.to_string()),
  655. _ => return Err(Error::UnexpectedPaymentKind.into()),
  656. };
  657. let total_spent = payment_details
  658. .amount_msat
  659. .ok_or(Error::CouldNotGetAmountSpent)?
  660. + payment_details.fee_paid_msat.unwrap_or_default();
  661. let total_spent = Amount::new(total_spent, CurrencyUnit::Msat).convert_to(unit)?;
  662. Ok(MakePaymentResponse {
  663. payment_lookup_id: PaymentIdentifier::PaymentHash(
  664. bolt11.payment_hash().to_byte_array(),
  665. ),
  666. payment_proof,
  667. status,
  668. total_spent,
  669. })
  670. }
  671. OutgoingPaymentOptions::Bolt12(bolt12_options) => {
  672. let offer = bolt12_options.offer;
  673. let payment_id = match bolt12_options.melt_options {
  674. Some(MeltOptions::Amountless { amountless }) => self
  675. .inner
  676. .bolt12_payment()
  677. .send_using_amount(&offer, amountless.amount_msat.into(), None, None, None)
  678. .map_err(Error::LdkNode)?,
  679. None => self
  680. .inner
  681. .bolt12_payment()
  682. .send(&offer, None, None, None)
  683. .map_err(Error::LdkNode)?,
  684. _ => return Err(payment::Error::UnsupportedPaymentOption),
  685. };
  686. // Check payment status for up to 10 seconds
  687. let start = std::time::Instant::now();
  688. let timeout = std::time::Duration::from_secs(10);
  689. let (status, payment_details) = loop {
  690. let details = self
  691. .inner
  692. .payment(&payment_id)
  693. .ok_or(Error::PaymentNotFound)?;
  694. match details.status {
  695. PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
  696. PaymentStatus::Failed => {
  697. tracing::error!("Payment with id {} failed.", payment_id);
  698. break (MeltQuoteState::Failed, details);
  699. }
  700. PaymentStatus::Pending => {
  701. if start.elapsed() > timeout {
  702. tracing::warn!(
  703. "Payment has been being for 10 seconds. No longer waiting"
  704. );
  705. break (MeltQuoteState::Pending, details);
  706. }
  707. tokio::time::sleep(std::time::Duration::from_millis(100)).await;
  708. continue;
  709. }
  710. }
  711. };
  712. let payment_proof = match payment_details.kind {
  713. PaymentKind::Bolt12Offer {
  714. hash: _,
  715. preimage,
  716. secret: _,
  717. offer_id: _,
  718. payer_note: _,
  719. quantity: _,
  720. } => preimage.map(|p| p.to_string()),
  721. _ => return Err(Error::UnexpectedPaymentKind.into()),
  722. };
  723. let total_spent = payment_details
  724. .amount_msat
  725. .ok_or(Error::CouldNotGetAmountSpent)?
  726. + payment_details.fee_paid_msat.unwrap_or_default();
  727. let total_spent = Amount::new(total_spent, CurrencyUnit::Msat).convert_to(unit)?;
  728. Ok(MakePaymentResponse {
  729. payment_lookup_id: PaymentIdentifier::PaymentId(payment_id.0),
  730. payment_proof,
  731. status,
  732. total_spent,
  733. })
  734. }
  735. }
  736. }
  737. /// Listen for invoices to be paid to the mint
  738. /// Returns a stream of request_lookup_id once invoices are paid
  739. #[instrument(skip(self))]
  740. async fn wait_payment_event(
  741. &self,
  742. ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
  743. tracing::info!("Starting stream for invoices - wait_any_incoming_payment called");
  744. // Set active flag to indicate stream is active
  745. self.wait_invoice_is_active.store(true, Ordering::SeqCst);
  746. tracing::debug!("wait_invoice_is_active set to true");
  747. let receiver = self.receiver.clone();
  748. tracing::info!("Receiver obtained successfully, creating response stream");
  749. // Transform the String stream into a WaitPaymentResponse stream
  750. let response_stream = BroadcastStream::new(receiver.resubscribe());
  751. // Map the stream to handle BroadcastStreamRecvError and wrap in Event
  752. let response_stream = response_stream.filter_map(|result| async move {
  753. match result {
  754. Ok(payment) => Some(cdk_common::payment::Event::PaymentReceived(payment)),
  755. Err(err) => {
  756. tracing::warn!("Error in broadcast stream: {}", err);
  757. None
  758. }
  759. }
  760. });
  761. // Create a combined stream that also handles cancellation
  762. let cancel_token = self.wait_invoice_cancel_token.clone();
  763. let is_active = self.wait_invoice_is_active.clone();
  764. let stream = Box::pin(response_stream);
  765. // Set up a task to clean up when the stream is dropped
  766. tokio::spawn(async move {
  767. cancel_token.cancelled().await;
  768. tracing::info!("wait_invoice stream cancelled");
  769. is_active.store(false, Ordering::SeqCst);
  770. });
  771. tracing::info!("wait_any_incoming_payment returning stream");
  772. Ok(stream)
  773. }
  /// Is wait invoice active
  ///
  /// Reads the flag that `wait_payment_event` sets to `true` and that is
  /// cleared again once the wait-invoice token has been cancelled.
  fn is_wait_invoice_active(&self) -> bool {
      self.wait_invoice_is_active.load(Ordering::SeqCst)
  }
  /// Cancel wait invoice
  ///
  /// Cancels the wait-invoice token; the task spawned by `wait_payment_event`
  /// reacts to this by clearing the active flag.
  fn cancel_wait_invoice(&self) {
      self.wait_invoice_cancel_token.cancel()
  }
  782. /// Check the status of an incoming payment
  783. async fn check_incoming_payment_status(
  784. &self,
  785. payment_identifier: &PaymentIdentifier,
  786. ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
  787. let payment_id_str = match payment_identifier {
  788. PaymentIdentifier::PaymentHash(hash) => hex::encode(hash),
  789. PaymentIdentifier::CustomId(id) => id.clone(),
  790. _ => return Err(Error::UnsupportedPaymentIdentifierType.into()),
  791. };
  792. let payment_id = PaymentId(
  793. hex::decode(&payment_id_str)?
  794. .try_into()
  795. .map_err(|_| Error::InvalidPaymentIdLength)?,
  796. );
  797. let payment_details = self
  798. .inner
  799. .payment(&payment_id)
  800. .ok_or(Error::PaymentNotFound)?;
  801. if payment_details.direction == PaymentDirection::Outbound {
  802. return Err(Error::InvalidPaymentDirection.into());
  803. }
  804. let amount = if payment_details.status == PaymentStatus::Succeeded {
  805. payment_details
  806. .amount_msat
  807. .ok_or(Error::CouldNotGetPaymentAmount)?
  808. } else {
  809. return Ok(vec![]);
  810. };
  811. let response = WaitPaymentResponse {
  812. payment_identifier: payment_identifier.clone(),
  813. payment_amount: Amount::new(amount, CurrencyUnit::Msat),
  814. payment_id: payment_id_str,
  815. };
  816. Ok(vec![response])
  817. }
  818. /// Check the status of an outgoing payment
  819. async fn check_outgoing_payment(
  820. &self,
  821. request_lookup_id: &PaymentIdentifier,
  822. ) -> Result<MakePaymentResponse, Self::Err> {
  823. let payment_details = match request_lookup_id {
  824. PaymentIdentifier::PaymentHash(id_hash) => self
  825. .inner
  826. .list_payments_with_filter(
  827. |p| matches!(&p.kind, PaymentKind::Bolt11 { hash, .. } if &hash.0 == id_hash),
  828. )
  829. .first()
  830. .cloned(),
  831. PaymentIdentifier::PaymentId(id) => self.inner.payment(&PaymentId(
  832. hex::decode(id)?
  833. .try_into()
  834. .map_err(|_| payment::Error::Custom("Invalid hex".to_string()))?,
  835. )),
  836. _ => {
  837. return Ok(MakePaymentResponse {
  838. payment_lookup_id: request_lookup_id.clone(),
  839. payment_proof: None,
  840. status: MeltQuoteState::Unknown,
  841. total_spent: Amount::new(0, CurrencyUnit::Msat),
  842. });
  843. }
  844. }
  845. .ok_or(Error::PaymentNotFound)?;
  846. // This check seems reversed in the original code, so I'm fixing it here
  847. if payment_details.direction != PaymentDirection::Outbound {
  848. return Err(Error::InvalidPaymentDirection.into());
  849. }
  850. let status = match payment_details.status {
  851. PaymentStatus::Pending => MeltQuoteState::Pending,
  852. PaymentStatus::Succeeded => MeltQuoteState::Paid,
  853. PaymentStatus::Failed => MeltQuoteState::Failed,
  854. };
  855. let payment_proof = match payment_details.kind {
  856. PaymentKind::Bolt11 {
  857. hash: _,
  858. preimage,
  859. secret: _,
  860. } => preimage.map(|p| p.to_string()),
  861. _ => return Err(Error::UnexpectedPaymentKind.into()),
  862. };
  863. let total_spent = payment_details
  864. .amount_msat
  865. .ok_or(Error::CouldNotGetAmountSpent)?;
  866. Ok(MakePaymentResponse {
  867. payment_lookup_id: request_lookup_id.clone(),
  868. payment_proof,
  869. status,
  870. total_spent: Amount::new(total_spent, CurrencyUnit::Msat),
  871. })
  872. }
  873. }
  /// Cancels the wait-invoice token when a `CdkLdkNode` value is dropped.
  impl Drop for CdkLdkNode {
      // NOTE(review): `CdkLdkNode` derives `Clone` and `start_web_server` stores a
      // clone. This runs for every dropped clone, not only the last one. The token
      // state is presumably shared between clones, so dropping any clone would
      // cancel it for all of them - TODO confirm and guard if that is unintended.
      fn drop(&mut self) {
          tracing::info!("Drop called on CdkLdkNode");
          self.wait_invoice_cancel_token.cancel();
          tracing::debug!("Cancelled wait_invoice token in drop");
      }
  }