lib.rs 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983
  1. //! CDK lightning backend for ldk-node
  2. #![doc = include_str!("../README.md")]
  3. use std::net::SocketAddr;
  4. use std::pin::Pin;
  5. use std::sync::atomic::{AtomicBool, Ordering};
  6. use std::sync::Arc;
  7. use async_trait::async_trait;
  8. use cdk_common::amount::to_unit;
  9. use cdk_common::common::FeeReserve;
  10. use cdk_common::payment::{self, *};
  11. use cdk_common::util::{hex, unix_time};
  12. use cdk_common::{Amount, CurrencyUnit, MeltOptions, MeltQuoteState};
  13. use futures::{Stream, StreamExt};
  14. use ldk_node::bitcoin::hashes::Hash;
  15. use ldk_node::bitcoin::Network;
  16. use ldk_node::lightning::ln::channelmanager::PaymentId;
  17. use ldk_node::lightning::ln::msgs::SocketAddress;
  18. use ldk_node::lightning::routing::router::RouteParametersConfig;
  19. use ldk_node::lightning_invoice::{Bolt11InvoiceDescription, Description};
  20. use ldk_node::lightning_types::payment::PaymentHash;
  21. use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus};
  22. use ldk_node::{Builder, Event, Node};
  23. use tokio_stream::wrappers::BroadcastStream;
  24. use tokio_util::sync::CancellationToken;
  25. use tracing::instrument;
  26. use crate::error::Error;
  27. mod error;
  28. mod web;
  29. /// CDK Lightning backend using LDK Node
  30. ///
  31. /// Provides Lightning Network functionality for CDK with support for Cashu operations.
  32. /// Handles payment creation, processing, and event management using the Lightning Development Kit.
  33. #[derive(Clone)]
  34. pub struct CdkLdkNode {
  35. inner: Arc<Node>,
  36. fee_reserve: FeeReserve,
  37. wait_invoice_cancel_token: CancellationToken,
  38. wait_invoice_is_active: Arc<AtomicBool>,
  39. sender: tokio::sync::broadcast::Sender<WaitPaymentResponse>,
  40. receiver: Arc<tokio::sync::broadcast::Receiver<WaitPaymentResponse>>,
  41. events_cancel_token: CancellationToken,
  42. web_addr: Option<SocketAddr>,
  43. }
  44. /// Configuration for connecting to Bitcoin RPC
  45. ///
  46. /// Contains the necessary connection parameters for Bitcoin Core RPC interface.
  47. #[derive(Debug, Clone)]
  48. pub struct BitcoinRpcConfig {
  49. /// Bitcoin RPC server hostname or IP address
  50. pub host: String,
  51. /// Bitcoin RPC server port number
  52. pub port: u16,
  53. /// Username for Bitcoin RPC authentication
  54. pub user: String,
  55. /// Password for Bitcoin RPC authentication
  56. pub password: String,
  57. }
  58. /// Source of blockchain data for the Lightning node
  59. ///
  60. /// Specifies how the node should connect to the Bitcoin network to retrieve
  61. /// blockchain information and broadcast transactions.
  62. #[derive(Debug, Clone)]
  63. pub enum ChainSource {
  64. /// Use an Esplora server for blockchain data
  65. ///
  66. /// Contains the URL of the Esplora server endpoint
  67. Esplora(String),
  68. /// Use Bitcoin Core RPC for blockchain data
  69. ///
  70. /// Contains the configuration for connecting to Bitcoin Core
  71. BitcoinRpc(BitcoinRpcConfig),
  72. }
  73. /// Source of Lightning network gossip data
  74. ///
  75. /// Specifies how the node should learn about the Lightning Network topology
  76. /// and routing information.
  77. #[derive(Debug, Clone)]
  78. pub enum GossipSource {
  79. /// Learn gossip through peer-to-peer connections
  80. ///
  81. /// The node will connect to other Lightning nodes and exchange gossip data directly
  82. P2P,
  83. /// Use Rapid Gossip Sync for efficient gossip updates
  84. ///
  85. /// Contains the URL of the RGS server for compressed gossip data
  86. RapidGossipSync(String),
  87. }
  88. impl CdkLdkNode {
  89. /// Create a new CDK LDK Node instance
  90. ///
  91. /// # Arguments
  92. /// * `network` - Bitcoin network (mainnet, testnet, regtest, signet)
  93. /// * `chain_source` - Source of blockchain data (Esplora or Bitcoin RPC)
  94. /// * `gossip_source` - Source of Lightning network gossip data
  95. /// * `storage_dir_path` - Directory path for node data storage
  96. /// * `fee_reserve` - Fee reserve configuration for payments
  97. /// * `listening_address` - Socket addresses for peer connections
  98. /// * `runtime` - Optional Tokio runtime to use for starting the node
  99. ///
  100. /// # Returns
  101. /// A new `CdkLdkNode` instance ready to be started
  102. ///
  103. /// # Errors
  104. /// Returns an error if the LDK node builder fails to create the node
  105. pub fn new(
  106. network: Network,
  107. chain_source: ChainSource,
  108. gossip_source: GossipSource,
  109. storage_dir_path: String,
  110. fee_reserve: FeeReserve,
  111. listening_address: Vec<SocketAddress>,
  112. ) -> Result<Self, Error> {
  113. let mut builder = Builder::new();
  114. builder.set_network(network);
  115. tracing::info!("Storage dir of node is {}", storage_dir_path);
  116. builder.set_storage_dir_path(storage_dir_path);
  117. match chain_source {
  118. ChainSource::Esplora(esplora_url) => {
  119. builder.set_chain_source_esplora(esplora_url, None);
  120. }
  121. ChainSource::BitcoinRpc(BitcoinRpcConfig {
  122. host,
  123. port,
  124. user,
  125. password,
  126. }) => {
  127. builder.set_chain_source_bitcoind_rpc(host, port, user, password);
  128. }
  129. }
  130. match gossip_source {
  131. GossipSource::P2P => {
  132. builder.set_gossip_source_p2p();
  133. }
  134. GossipSource::RapidGossipSync(rgs_url) => {
  135. builder.set_gossip_source_rgs(rgs_url);
  136. }
  137. }
  138. builder.set_listening_addresses(listening_address)?;
  139. builder.set_node_alias("cdk-ldk-node".to_string())?;
  140. let node = builder.build()?;
  141. tracing::info!("Creating tokio channel for payment notifications");
  142. let (sender, receiver) = tokio::sync::broadcast::channel(8);
  143. let id = node.node_id();
  144. let adr = node.announcement_addresses();
  145. tracing::info!(
  146. "Created node {} with address {:?} on network {}",
  147. id,
  148. adr,
  149. network
  150. );
  151. Ok(Self {
  152. inner: node.into(),
  153. fee_reserve,
  154. wait_invoice_cancel_token: CancellationToken::new(),
  155. wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
  156. sender,
  157. receiver: Arc::new(receiver),
  158. events_cancel_token: CancellationToken::new(),
  159. web_addr: None,
  160. })
  161. }
  162. /// Set the web server address for the LDK node management interface
  163. ///
  164. /// # Arguments
  165. /// * `addr` - Socket address for the web server. If None, no web server will be started.
  166. pub fn set_web_addr(&mut self, addr: Option<SocketAddr>) {
  167. self.web_addr = addr;
  168. }
  169. /// Get a default web server address using an unused port
  170. ///
  171. /// Returns a SocketAddr with localhost and port 0, which will cause
  172. /// the system to automatically assign an available port
  173. pub fn default_web_addr() -> SocketAddr {
  174. SocketAddr::from(([127, 0, 0, 1], 8091))
  175. }
  176. /// Start the CDK LDK Node
  177. ///
  178. /// Starts the underlying LDK node and begins event processing.
  179. /// Sets up event handlers to listen for Lightning events like payment received.
  180. ///
  181. /// # Returns
  182. /// Returns `Ok(())` on successful start, error otherwise
  183. ///
  184. /// # Errors
  185. /// Returns an error if the LDK node fails to start or event handling setup fails
  186. pub fn start_ldk_node(&self) -> Result<(), Error> {
  187. tracing::info!("Starting cdk-ldk node");
  188. self.inner.start()?;
  189. let node_config = self.inner.config();
  190. tracing::info!("Starting node with network {}", node_config.network);
  191. tracing::info!("Node status: {:?}", self.inner.status());
  192. self.handle_events()?;
  193. Ok(())
  194. }
  195. /// Start the web server for the LDK node management interface
  196. ///
  197. /// Starts a web server that provides a user interface for managing the LDK node.
  198. /// The web interface allows users to view balances, manage channels, create invoices,
  199. /// and send payments.
  200. ///
  201. /// # Arguments
  202. /// * `web_addr` - The socket address to bind the web server to
  203. ///
  204. /// # Returns
  205. /// Returns `Ok(())` on successful start, error otherwise
  206. ///
  207. /// # Errors
  208. /// Returns an error if the web server fails to start
  209. pub fn start_web_server(&self, web_addr: SocketAddr) -> Result<(), Error> {
  210. let web_server = crate::web::WebServer::new(Arc::new(self.clone()));
  211. tokio::spawn(async move {
  212. if let Err(e) = web_server.serve(web_addr).await {
  213. tracing::error!("Web server error: {}", e);
  214. }
  215. });
  216. Ok(())
  217. }
  218. /// Stop the CDK LDK Node
  219. ///
  220. /// Gracefully stops the node by cancelling all active tasks and event handlers.
  221. /// This includes:
  222. /// - Cancelling the event handler task
  223. /// - Cancelling any active wait_invoice streams
  224. /// - Stopping the underlying LDK node
  225. ///
  226. /// # Returns
  227. /// Returns `Ok(())` on successful shutdown, error otherwise
  228. ///
  229. /// # Errors
  230. /// Returns an error if the underlying LDK node fails to stop
  231. pub fn stop_ldk_node(&self) -> Result<(), Error> {
  232. tracing::info!("Stopping CdkLdkNode");
  233. // Cancel all tokio tasks
  234. tracing::info!("Cancelling event handler");
  235. self.events_cancel_token.cancel();
  236. // Cancel any wait_invoice streams
  237. if self.is_wait_invoice_active() {
  238. tracing::info!("Cancelling wait_invoice stream");
  239. self.wait_invoice_cancel_token.cancel();
  240. }
  241. // Stop the LDK node
  242. tracing::info!("Stopping LDK node");
  243. self.inner.stop()?;
  244. tracing::info!("CdkLdkNode stopped successfully");
  245. Ok(())
  246. }
  247. /// Handle payment received event
  248. async fn handle_payment_received(
  249. node: &Arc<Node>,
  250. sender: &tokio::sync::broadcast::Sender<WaitPaymentResponse>,
  251. payment_id: Option<PaymentId>,
  252. payment_hash: PaymentHash,
  253. amount_msat: u64,
  254. ) {
  255. tracing::info!(
  256. "Received payment for hash={} of amount={} msat",
  257. payment_hash,
  258. amount_msat
  259. );
  260. let payment_id = match payment_id {
  261. Some(id) => id,
  262. None => {
  263. tracing::warn!("Received payment without payment_id");
  264. return;
  265. }
  266. };
  267. let payment_id_hex = hex::encode(payment_id.0);
  268. if amount_msat == 0 {
  269. tracing::warn!("Payment of no amount");
  270. return;
  271. }
  272. tracing::info!(
  273. "Processing payment notification: id={}, amount={} msats",
  274. payment_id_hex,
  275. amount_msat
  276. );
  277. let payment_details = match node.payment(&payment_id) {
  278. Some(details) => details,
  279. None => {
  280. tracing::error!("Could not find payment details for id={}", payment_id_hex);
  281. return;
  282. }
  283. };
  284. let (payment_identifier, payment_id) = match payment_details.kind {
  285. PaymentKind::Bolt11 { hash, .. } => {
  286. (PaymentIdentifier::PaymentHash(hash.0), hash.to_string())
  287. }
  288. PaymentKind::Bolt12Offer { hash, offer_id, .. } => match hash {
  289. Some(h) => (
  290. PaymentIdentifier::OfferId(offer_id.to_string()),
  291. h.to_string(),
  292. ),
  293. None => {
  294. tracing::error!("Bolt12 payment missing hash");
  295. return;
  296. }
  297. },
  298. k => {
  299. tracing::warn!("Received payment of kind {:?} which is not supported", k);
  300. return;
  301. }
  302. };
  303. let wait_payment_response = WaitPaymentResponse {
  304. payment_identifier,
  305. payment_amount: amount_msat.into(),
  306. unit: CurrencyUnit::Msat,
  307. payment_id,
  308. };
  309. match sender.send(wait_payment_response) {
  310. Ok(_) => tracing::info!("Successfully sent payment notification to stream"),
  311. Err(err) => tracing::error!(
  312. "Could not send payment received notification on channel: {}",
  313. err
  314. ),
  315. }
  316. }
  317. /// Set up event handling for the node
  318. pub fn handle_events(&self) -> Result<(), Error> {
  319. let node = self.inner.clone();
  320. let sender = self.sender.clone();
  321. let cancel_token = self.events_cancel_token.clone();
  322. tracing::info!("Starting event handler task");
  323. tokio::spawn(async move {
  324. tracing::info!("Event handler loop started");
  325. loop {
  326. tokio::select! {
  327. _ = cancel_token.cancelled() => {
  328. tracing::info!("Event handler cancelled");
  329. break;
  330. }
  331. event = node.next_event_async() => {
  332. match event {
  333. Event::PaymentReceived {
  334. payment_id,
  335. payment_hash,
  336. amount_msat,
  337. custom_records: _
  338. } => {
  339. Self::handle_payment_received(
  340. &node,
  341. &sender,
  342. payment_id,
  343. payment_hash,
  344. amount_msat
  345. ).await;
  346. }
  347. event => {
  348. tracing::debug!("Received other ldk node event: {:?}", event);
  349. }
  350. }
  351. if let Err(err) = node.event_handled() {
  352. tracing::error!("Error handling node event: {}", err);
  353. } else {
  354. tracing::debug!("Successfully handled node event");
  355. }
  356. }
  357. }
  358. }
  359. tracing::info!("Event handler loop terminated");
  360. });
  361. tracing::info!("Event handler task spawned");
  362. Ok(())
  363. }
  364. /// Get Node used
  365. pub fn node(&self) -> Arc<Node> {
  366. Arc::clone(&self.inner)
  367. }
  368. }
  369. /// Mint payment trait
  370. #[async_trait]
  371. impl MintPayment for CdkLdkNode {
  372. type Err = payment::Error;
  373. /// Start the payment processor
  374. /// Starts the LDK node and begins event processing
  375. async fn start(&self) -> Result<(), Self::Err> {
  376. self.start_ldk_node().map_err(|e| {
  377. tracing::error!("Failed to start CdkLdkNode: {}", e);
  378. e
  379. })?;
  380. tracing::info!("CdkLdkNode payment processor started successfully");
  381. // Start web server if configured
  382. if let Some(web_addr) = self.web_addr {
  383. tracing::info!("Starting LDK Node web interface on {}", web_addr);
  384. self.start_web_server(web_addr).map_err(|e| {
  385. tracing::error!("Failed to start web server: {}", e);
  386. e
  387. })?;
  388. } else {
  389. tracing::info!("No web server address configured, skipping web interface");
  390. }
  391. Ok(())
  392. }
  393. /// Stop the payment processor
  394. /// Gracefully stops the LDK node and cancels all background tasks
  395. async fn stop(&self) -> Result<(), Self::Err> {
  396. self.stop_ldk_node().map_err(|e| {
  397. tracing::error!("Failed to stop CdkLdkNode: {}", e);
  398. e.into()
  399. })
  400. }
  401. /// Base Settings
  402. async fn get_settings(&self) -> Result<serde_json::Value, Self::Err> {
  403. let settings = Bolt11Settings {
  404. mpp: false,
  405. unit: CurrencyUnit::Msat,
  406. invoice_description: true,
  407. amountless: true,
  408. bolt12: true,
  409. };
  410. Ok(serde_json::to_value(settings)?)
  411. }
  412. /// Create a new invoice
  413. #[instrument(skip(self))]
  414. async fn create_incoming_payment_request(
  415. &self,
  416. unit: &CurrencyUnit,
  417. options: IncomingPaymentOptions,
  418. ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
  419. match options {
  420. IncomingPaymentOptions::Bolt11(bolt11_options) => {
  421. let amount_msat = to_unit(bolt11_options.amount, unit, &CurrencyUnit::Msat)?;
  422. let description = bolt11_options.description.unwrap_or_default();
  423. let time = bolt11_options
  424. .unix_expiry
  425. .map(|t| t - unix_time())
  426. .unwrap_or(36000);
  427. let description = Bolt11InvoiceDescription::Direct(
  428. Description::new(description).map_err(|_| Error::InvalidDescription)?,
  429. );
  430. let payment = self
  431. .inner
  432. .bolt11_payment()
  433. .receive(amount_msat.into(), &description, time as u32)
  434. .map_err(Error::LdkNode)?;
  435. let payment_hash = payment.payment_hash().to_string();
  436. let payment_identifier = PaymentIdentifier::PaymentHash(
  437. hex::decode(&payment_hash)?
  438. .try_into()
  439. .map_err(|_| Error::InvalidPaymentHashLength)?,
  440. );
  441. Ok(CreateIncomingPaymentResponse {
  442. request_lookup_id: payment_identifier,
  443. request: payment.to_string(),
  444. expiry: Some(unix_time() + time),
  445. })
  446. }
  447. IncomingPaymentOptions::Bolt12(bolt12_options) => {
  448. let Bolt12IncomingPaymentOptions {
  449. description,
  450. amount,
  451. unix_expiry,
  452. } = *bolt12_options;
  453. let time = unix_expiry.map(|t| (t - unix_time()) as u32);
  454. let offer = match amount {
  455. Some(amount) => {
  456. let amount_msat = to_unit(amount, unit, &CurrencyUnit::Msat)?;
  457. self.inner
  458. .bolt12_payment()
  459. .receive(
  460. amount_msat.into(),
  461. &description.unwrap_or("".to_string()),
  462. time,
  463. None,
  464. )
  465. .map_err(Error::LdkNode)?
  466. }
  467. None => self
  468. .inner
  469. .bolt12_payment()
  470. .receive_variable_amount(&description.unwrap_or("".to_string()), time)
  471. .map_err(Error::LdkNode)?,
  472. };
  473. let payment_identifier = PaymentIdentifier::OfferId(offer.id().to_string());
  474. Ok(CreateIncomingPaymentResponse {
  475. request_lookup_id: payment_identifier,
  476. request: offer.to_string(),
  477. expiry: time.map(|a| a as u64),
  478. })
  479. }
  480. }
  481. }
  482. /// Get payment quote
  483. /// Used to get fee and amount required for a payment request
  484. #[instrument(skip_all)]
  485. async fn get_payment_quote(
  486. &self,
  487. unit: &CurrencyUnit,
  488. options: OutgoingPaymentOptions,
  489. ) -> Result<PaymentQuoteResponse, Self::Err> {
  490. match options {
  491. OutgoingPaymentOptions::Bolt11(bolt11_options) => {
  492. let bolt11 = bolt11_options.bolt11;
  493. let amount_msat = match bolt11_options.melt_options {
  494. Some(melt_options) => melt_options.amount_msat(),
  495. None => bolt11
  496. .amount_milli_satoshis()
  497. .ok_or(Error::UnknownInvoiceAmount)?
  498. .into(),
  499. };
  500. let amount = to_unit(amount_msat, &CurrencyUnit::Msat, unit)?;
  501. let relative_fee_reserve =
  502. (self.fee_reserve.percent_fee_reserve * u64::from(amount) as f32) as u64;
  503. let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
  504. let fee = match relative_fee_reserve > absolute_fee_reserve {
  505. true => relative_fee_reserve,
  506. false => absolute_fee_reserve,
  507. };
  508. let payment_hash = bolt11.payment_hash().to_string();
  509. let payment_hash_bytes = hex::decode(&payment_hash)?
  510. .try_into()
  511. .map_err(|_| Error::InvalidPaymentHashLength)?;
  512. Ok(PaymentQuoteResponse {
  513. request_lookup_id: Some(PaymentIdentifier::PaymentHash(payment_hash_bytes)),
  514. amount,
  515. fee: fee.into(),
  516. state: MeltQuoteState::Unpaid,
  517. unit: unit.clone(),
  518. })
  519. }
  520. OutgoingPaymentOptions::Bolt12(bolt12_options) => {
  521. let offer = bolt12_options.offer;
  522. let amount_msat = match bolt12_options.melt_options {
  523. Some(melt_options) => melt_options.amount_msat(),
  524. None => {
  525. let amount = offer.amount().ok_or(payment::Error::AmountMismatch)?;
  526. match amount {
  527. ldk_node::lightning::offers::offer::Amount::Bitcoin {
  528. amount_msats,
  529. } => amount_msats.into(),
  530. _ => return Err(payment::Error::AmountMismatch),
  531. }
  532. }
  533. };
  534. let amount = to_unit(amount_msat, &CurrencyUnit::Msat, unit)?;
  535. let relative_fee_reserve =
  536. (self.fee_reserve.percent_fee_reserve * u64::from(amount) as f32) as u64;
  537. let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
  538. let fee = match relative_fee_reserve > absolute_fee_reserve {
  539. true => relative_fee_reserve,
  540. false => absolute_fee_reserve,
  541. };
  542. Ok(PaymentQuoteResponse {
  543. request_lookup_id: None,
  544. amount,
  545. fee: fee.into(),
  546. state: MeltQuoteState::Unpaid,
  547. unit: unit.clone(),
  548. })
  549. }
  550. }
  551. }
  552. /// Pay request
  553. #[instrument(skip(self, options))]
  554. async fn make_payment(
  555. &self,
  556. unit: &CurrencyUnit,
  557. options: OutgoingPaymentOptions,
  558. ) -> Result<MakePaymentResponse, Self::Err> {
  559. match options {
  560. OutgoingPaymentOptions::Bolt11(bolt11_options) => {
  561. let bolt11 = bolt11_options.bolt11;
  562. let send_params = match bolt11_options
  563. .max_fee_amount
  564. .map(|f| {
  565. to_unit(f, unit, &CurrencyUnit::Msat).map(|amount_msat| {
  566. RouteParametersConfig {
  567. max_total_routing_fee_msat: Some(amount_msat.into()),
  568. ..Default::default()
  569. }
  570. })
  571. })
  572. .transpose()
  573. {
  574. Ok(params) => params,
  575. Err(err) => {
  576. tracing::error!("Failed to convert fee amount: {}", err);
  577. return Err(payment::Error::Custom(format!("Invalid fee amount: {err}")));
  578. }
  579. };
  580. let payment_id = match bolt11_options.melt_options {
  581. Some(MeltOptions::Amountless { amountless }) => self
  582. .inner
  583. .bolt11_payment()
  584. .send_using_amount(&bolt11, amountless.amount_msat.into(), send_params)
  585. .map_err(|err| {
  586. tracing::error!("Could not send send amountless bolt11: {}", err);
  587. Error::CouldNotSendBolt11WithoutAmount
  588. })?,
  589. None => self
  590. .inner
  591. .bolt11_payment()
  592. .send(&bolt11, send_params)
  593. .map_err(|err| {
  594. tracing::error!("Could not send bolt11 {}", err);
  595. Error::CouldNotSendBolt11
  596. })?,
  597. _ => return Err(payment::Error::UnsupportedPaymentOption),
  598. };
  599. // Check payment status for up to 10 seconds
  600. let start = std::time::Instant::now();
  601. let timeout = std::time::Duration::from_secs(10);
  602. let (status, payment_details) = loop {
  603. let details = self
  604. .inner
  605. .payment(&payment_id)
  606. .ok_or(Error::PaymentNotFound)?;
  607. match details.status {
  608. PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
  609. PaymentStatus::Failed => {
  610. tracing::error!("Failed to pay bolt11 payment.");
  611. break (MeltQuoteState::Failed, details);
  612. }
  613. PaymentStatus::Pending => {
  614. if start.elapsed() > timeout {
  615. tracing::warn!(
  616. "Paying bolt11 exceeded timeout 10 seconds no longer waitning."
  617. );
  618. break (MeltQuoteState::Pending, details);
  619. }
  620. tokio::time::sleep(std::time::Duration::from_millis(100)).await;
  621. continue;
  622. }
  623. }
  624. };
  625. let payment_proof = match payment_details.kind {
  626. PaymentKind::Bolt11 {
  627. hash: _,
  628. preimage,
  629. secret: _,
  630. } => preimage.map(|p| p.to_string()),
  631. _ => return Err(Error::UnexpectedPaymentKind.into()),
  632. };
  633. let total_spent = payment_details
  634. .amount_msat
  635. .ok_or(Error::CouldNotGetAmountSpent)?
  636. + payment_details.fee_paid_msat.unwrap_or_default();
  637. let total_spent = to_unit(total_spent, &CurrencyUnit::Msat, unit)?;
  638. Ok(MakePaymentResponse {
  639. payment_lookup_id: PaymentIdentifier::PaymentHash(
  640. bolt11.payment_hash().to_byte_array(),
  641. ),
  642. payment_proof,
  643. status,
  644. total_spent,
  645. unit: unit.clone(),
  646. })
  647. }
  648. OutgoingPaymentOptions::Bolt12(bolt12_options) => {
  649. let offer = bolt12_options.offer;
  650. let payment_id = match bolt12_options.melt_options {
  651. Some(MeltOptions::Amountless { amountless }) => self
  652. .inner
  653. .bolt12_payment()
  654. .send_using_amount(&offer, amountless.amount_msat.into(), None, None, None)
  655. .map_err(Error::LdkNode)?,
  656. None => self
  657. .inner
  658. .bolt12_payment()
  659. .send(&offer, None, None, None)
  660. .map_err(Error::LdkNode)?,
  661. _ => return Err(payment::Error::UnsupportedPaymentOption),
  662. };
  663. // Check payment status for up to 10 seconds
  664. let start = std::time::Instant::now();
  665. let timeout = std::time::Duration::from_secs(10);
  666. let (status, payment_details) = loop {
  667. let details = self
  668. .inner
  669. .payment(&payment_id)
  670. .ok_or(Error::PaymentNotFound)?;
  671. match details.status {
  672. PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
  673. PaymentStatus::Failed => {
  674. tracing::error!("Payment with id {} failed.", payment_id);
  675. break (MeltQuoteState::Failed, details);
  676. }
  677. PaymentStatus::Pending => {
  678. if start.elapsed() > timeout {
  679. tracing::warn!(
  680. "Payment has been being for 10 seconds. No longer waiting"
  681. );
  682. break (MeltQuoteState::Pending, details);
  683. }
  684. tokio::time::sleep(std::time::Duration::from_millis(100)).await;
  685. continue;
  686. }
  687. }
  688. };
  689. let payment_proof = match payment_details.kind {
  690. PaymentKind::Bolt12Offer {
  691. hash: _,
  692. preimage,
  693. secret: _,
  694. offer_id: _,
  695. payer_note: _,
  696. quantity: _,
  697. } => preimage.map(|p| p.to_string()),
  698. _ => return Err(Error::UnexpectedPaymentKind.into()),
  699. };
  700. let total_spent = payment_details
  701. .amount_msat
  702. .ok_or(Error::CouldNotGetAmountSpent)?
  703. + payment_details.fee_paid_msat.unwrap_or_default();
  704. let total_spent = to_unit(total_spent, &CurrencyUnit::Msat, unit)?;
  705. Ok(MakePaymentResponse {
  706. payment_lookup_id: PaymentIdentifier::PaymentId(payment_id.0),
  707. payment_proof,
  708. status,
  709. total_spent,
  710. unit: unit.clone(),
  711. })
  712. }
  713. }
  714. }
  715. /// Listen for invoices to be paid to the mint
  716. /// Returns a stream of request_lookup_id once invoices are paid
  717. #[instrument(skip(self))]
  718. async fn wait_payment_event(
  719. &self,
  720. ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
  721. tracing::info!("Starting stream for invoices - wait_any_incoming_payment called");
  722. // Set active flag to indicate stream is active
  723. self.wait_invoice_is_active.store(true, Ordering::SeqCst);
  724. tracing::debug!("wait_invoice_is_active set to true");
  725. let receiver = self.receiver.clone();
  726. tracing::info!("Receiver obtained successfully, creating response stream");
  727. // Transform the String stream into a WaitPaymentResponse stream
  728. let response_stream = BroadcastStream::new(receiver.resubscribe());
  729. // Map the stream to handle BroadcastStreamRecvError and wrap in Event
  730. let response_stream = response_stream.filter_map(|result| async move {
  731. match result {
  732. Ok(payment) => Some(cdk_common::payment::Event::PaymentReceived(payment)),
  733. Err(err) => {
  734. tracing::warn!("Error in broadcast stream: {}", err);
  735. None
  736. }
  737. }
  738. });
  739. // Create a combined stream that also handles cancellation
  740. let cancel_token = self.wait_invoice_cancel_token.clone();
  741. let is_active = self.wait_invoice_is_active.clone();
  742. let stream = Box::pin(response_stream);
  743. // Set up a task to clean up when the stream is dropped
  744. tokio::spawn(async move {
  745. cancel_token.cancelled().await;
  746. tracing::info!("wait_invoice stream cancelled");
  747. is_active.store(false, Ordering::SeqCst);
  748. });
  749. tracing::info!("wait_any_incoming_payment returning stream");
  750. Ok(stream)
  751. }
  752. /// Is wait invoice active
  753. fn is_wait_invoice_active(&self) -> bool {
  754. self.wait_invoice_is_active.load(Ordering::SeqCst)
  755. }
  756. /// Cancel wait invoice
  757. fn cancel_wait_invoice(&self) {
  758. self.wait_invoice_cancel_token.cancel()
  759. }
  760. /// Check the status of an incoming payment
  761. async fn check_incoming_payment_status(
  762. &self,
  763. payment_identifier: &PaymentIdentifier,
  764. ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
  765. let payment_id_str = match payment_identifier {
  766. PaymentIdentifier::PaymentHash(hash) => hex::encode(hash),
  767. PaymentIdentifier::CustomId(id) => id.clone(),
  768. _ => return Err(Error::UnsupportedPaymentIdentifierType.into()),
  769. };
  770. let payment_id = PaymentId(
  771. hex::decode(&payment_id_str)?
  772. .try_into()
  773. .map_err(|_| Error::InvalidPaymentIdLength)?,
  774. );
  775. let payment_details = self
  776. .inner
  777. .payment(&payment_id)
  778. .ok_or(Error::PaymentNotFound)?;
  779. if payment_details.direction == PaymentDirection::Outbound {
  780. return Err(Error::InvalidPaymentDirection.into());
  781. }
  782. let amount = if payment_details.status == PaymentStatus::Succeeded {
  783. payment_details
  784. .amount_msat
  785. .ok_or(Error::CouldNotGetPaymentAmount)?
  786. } else {
  787. return Ok(vec![]);
  788. };
  789. let response = WaitPaymentResponse {
  790. payment_identifier: payment_identifier.clone(),
  791. payment_amount: amount.into(),
  792. unit: CurrencyUnit::Msat,
  793. payment_id: payment_id_str,
  794. };
  795. Ok(vec![response])
  796. }
  797. /// Check the status of an outgoing payment
  798. async fn check_outgoing_payment(
  799. &self,
  800. request_lookup_id: &PaymentIdentifier,
  801. ) -> Result<MakePaymentResponse, Self::Err> {
  802. let payment_details = match request_lookup_id {
  803. PaymentIdentifier::PaymentHash(id_hash) => self
  804. .inner
  805. .list_payments_with_filter(
  806. |p| matches!(&p.kind, PaymentKind::Bolt11 { hash, .. } if &hash.0 == id_hash),
  807. )
  808. .first()
  809. .cloned(),
  810. PaymentIdentifier::PaymentId(id) => self.inner.payment(&PaymentId(
  811. hex::decode(id)?
  812. .try_into()
  813. .map_err(|_| payment::Error::Custom("Invalid hex".to_string()))?,
  814. )),
  815. _ => {
  816. return Ok(MakePaymentResponse {
  817. payment_lookup_id: request_lookup_id.clone(),
  818. status: MeltQuoteState::Unknown,
  819. payment_proof: None,
  820. total_spent: Amount::ZERO,
  821. unit: CurrencyUnit::Msat,
  822. });
  823. }
  824. }
  825. .ok_or(Error::PaymentNotFound)?;
  826. // This check seems reversed in the original code, so I'm fixing it here
  827. if payment_details.direction != PaymentDirection::Outbound {
  828. return Err(Error::InvalidPaymentDirection.into());
  829. }
  830. let status = match payment_details.status {
  831. PaymentStatus::Pending => MeltQuoteState::Pending,
  832. PaymentStatus::Succeeded => MeltQuoteState::Paid,
  833. PaymentStatus::Failed => MeltQuoteState::Failed,
  834. };
  835. let payment_proof = match payment_details.kind {
  836. PaymentKind::Bolt11 {
  837. hash: _,
  838. preimage,
  839. secret: _,
  840. } => preimage.map(|p| p.to_string()),
  841. _ => return Err(Error::UnexpectedPaymentKind.into()),
  842. };
  843. let total_spent = payment_details
  844. .amount_msat
  845. .ok_or(Error::CouldNotGetAmountSpent)?;
  846. Ok(MakePaymentResponse {
  847. payment_lookup_id: request_lookup_id.clone(),
  848. payment_proof,
  849. status,
  850. total_spent: total_spent.into(),
  851. unit: CurrencyUnit::Msat,
  852. })
  853. }
  854. }
  855. impl Drop for CdkLdkNode {
  856. fn drop(&mut self) {
  857. tracing::info!("Drop called on CdkLdkNode");
  858. self.wait_invoice_cancel_token.cancel();
  859. tracing::debug!("Cancelled wait_invoice token in drop");
  860. }
  861. }