lib.rs 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994
  1. //! CDK lightning backend for ldk-node
  2. #![doc = include_str!("../README.md")]
  3. #![warn(missing_docs)]
  4. #![warn(rustdoc::bare_urls)]
  5. use std::net::SocketAddr;
  6. use std::pin::Pin;
  7. use std::sync::atomic::{AtomicBool, Ordering};
  8. use std::sync::Arc;
  9. use async_trait::async_trait;
  10. use cdk_common::amount::to_unit;
  11. use cdk_common::common::FeeReserve;
  12. use cdk_common::payment::{self, *};
  13. use cdk_common::util::{hex, unix_time};
  14. use cdk_common::{Amount, CurrencyUnit, MeltOptions, MeltQuoteState};
  15. use futures::{Stream, StreamExt};
  16. use ldk_node::bitcoin::hashes::Hash;
  17. use ldk_node::bitcoin::Network;
  18. use ldk_node::lightning::ln::channelmanager::PaymentId;
  19. use ldk_node::lightning::ln::msgs::SocketAddress;
  20. use ldk_node::lightning_invoice::{Bolt11InvoiceDescription, Description};
  21. use ldk_node::lightning_types::payment::PaymentHash;
  22. use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus, SendingParameters};
  23. use ldk_node::{Builder, Event, Node};
  24. use tokio::runtime::Runtime;
  25. use tokio_stream::wrappers::BroadcastStream;
  26. use tokio_util::sync::CancellationToken;
  27. use tracing::instrument;
  28. use crate::error::Error;
  29. mod error;
  30. mod web;
  31. /// CDK Lightning backend using LDK Node
  32. ///
  33. /// Provides Lightning Network functionality for CDK with support for Cashu operations.
  34. /// Handles payment creation, processing, and event management using the Lightning Development Kit.
  35. #[derive(Clone)]
  36. pub struct CdkLdkNode {
  37. inner: Arc<Node>,
  38. fee_reserve: FeeReserve,
  39. wait_invoice_cancel_token: CancellationToken,
  40. wait_invoice_is_active: Arc<AtomicBool>,
  41. sender: tokio::sync::broadcast::Sender<WaitPaymentResponse>,
  42. receiver: Arc<tokio::sync::broadcast::Receiver<WaitPaymentResponse>>,
  43. events_cancel_token: CancellationToken,
  44. runtime: Option<Arc<Runtime>>,
  45. web_addr: Option<SocketAddr>,
  46. }
  /// Connection parameters for a Bitcoin Core RPC endpoint.
  ///
  /// `Debug` is implemented by hand rather than derived so that the RPC password
  /// is never written out when this configuration (or a value that embeds it) is
  /// formatted with `{:?}`, e.g. in a log line.
  #[derive(Clone)]
  pub struct BitcoinRpcConfig {
      /// Bitcoin RPC server hostname or IP address
      pub host: String,
      /// Bitcoin RPC server port number
      pub port: u16,
      /// Username for Bitcoin RPC authentication
      pub user: String,
      /// Password for Bitcoin RPC authentication
      pub password: String,
  }

  impl std::fmt::Debug for BitcoinRpcConfig {
      fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
          f.debug_struct("BitcoinRpcConfig")
              .field("host", &self.host)
              .field("port", &self.port)
              .field("user", &self.user)
              // Secret: show that the field exists without revealing its value.
              .field("password", &"<redacted>")
              .finish()
      }
  }
  61. /// Source of blockchain data for the Lightning node
  62. ///
  63. /// Specifies how the node should connect to the Bitcoin network to retrieve
  64. /// blockchain information and broadcast transactions.
  65. #[derive(Debug, Clone)]
  66. pub enum ChainSource {
  67. /// Use an Esplora server for blockchain data
  68. ///
  69. /// Contains the URL of the Esplora server endpoint
  70. Esplora(String),
  71. /// Use Bitcoin Core RPC for blockchain data
  72. ///
  73. /// Contains the configuration for connecting to Bitcoin Core
  74. BitcoinRpc(BitcoinRpcConfig),
  75. }
  /// Where the Lightning node obtains network gossip.
  ///
  /// [`CdkLdkNode::new`] translates the chosen variant into the matching
  /// gossip-source setting on the LDK node builder.
  #[derive(Debug, Clone)]
  pub enum GossipSource {
      /// Exchange gossip directly with connected peers.
      P2P,
      /// Fetch gossip from a Rapid Gossip Sync server.
      ///
      /// The payload is the URL of the RGS server.
      RapidGossipSync(String),
  }
  91. impl CdkLdkNode {
  92. /// Create a new CDK LDK Node instance
  93. ///
  94. /// # Arguments
  95. /// * `network` - Bitcoin network (mainnet, testnet, regtest, signet)
  96. /// * `chain_source` - Source of blockchain data (Esplora or Bitcoin RPC)
  97. /// * `gossip_source` - Source of Lightning network gossip data
  98. /// * `storage_dir_path` - Directory path for node data storage
  99. /// * `fee_reserve` - Fee reserve configuration for payments
  100. /// * `listening_address` - Socket addresses for peer connections
  101. /// * `runtime` - Optional Tokio runtime to use for starting the node
  102. ///
  103. /// # Returns
  104. /// A new `CdkLdkNode` instance ready to be started
  105. ///
  106. /// # Errors
  107. /// Returns an error if the LDK node builder fails to create the node
  108. pub fn new(
  109. network: Network,
  110. chain_source: ChainSource,
  111. gossip_source: GossipSource,
  112. storage_dir_path: String,
  113. fee_reserve: FeeReserve,
  114. listening_address: Vec<SocketAddress>,
  115. runtime: Option<Arc<Runtime>>,
  116. ) -> Result<Self, Error> {
  117. let mut builder = Builder::new();
  118. builder.set_network(network);
  119. tracing::info!("Storage dir of node is {}", storage_dir_path);
  120. builder.set_storage_dir_path(storage_dir_path);
  121. match chain_source {
  122. ChainSource::Esplora(esplora_url) => {
  123. builder.set_chain_source_esplora(esplora_url, None);
  124. }
  125. ChainSource::BitcoinRpc(BitcoinRpcConfig {
  126. host,
  127. port,
  128. user,
  129. password,
  130. }) => {
  131. builder.set_chain_source_bitcoind_rpc(host, port, user, password);
  132. }
  133. }
  134. match gossip_source {
  135. GossipSource::P2P => {
  136. builder.set_gossip_source_p2p();
  137. }
  138. GossipSource::RapidGossipSync(rgs_url) => {
  139. builder.set_gossip_source_rgs(rgs_url);
  140. }
  141. }
  142. builder.set_listening_addresses(listening_address)?;
  143. builder.set_node_alias("cdk-ldk-node".to_string())?;
  144. let node = builder.build()?;
  145. tracing::info!("Creating tokio channel for payment notifications");
  146. let (sender, receiver) = tokio::sync::broadcast::channel(8);
  147. let id = node.node_id();
  148. let adr = node.announcement_addresses();
  149. tracing::info!(
  150. "Created node {} with address {:?} on network {}",
  151. id,
  152. adr,
  153. network
  154. );
  155. Ok(Self {
  156. inner: node.into(),
  157. fee_reserve,
  158. wait_invoice_cancel_token: CancellationToken::new(),
  159. wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
  160. sender,
  161. receiver: Arc::new(receiver),
  162. events_cancel_token: CancellationToken::new(),
  163. runtime,
  164. web_addr: None,
  165. })
  166. }
  167. /// Set the web server address for the LDK node management interface
  168. ///
  169. /// # Arguments
  170. /// * `addr` - Socket address for the web server. If None, no web server will be started.
  171. pub fn set_web_addr(&mut self, addr: Option<SocketAddr>) {
  172. self.web_addr = addr;
  173. }
  /// Default address for the management web interface: `127.0.0.1:8091`.
  ///
  /// The address is loopback-only and uses a fixed port. It does not ask the
  /// operating system for a free port.
  pub fn default_web_addr() -> SocketAddr {
      /// Fixed port of the default web interface address.
      const DEFAULT_WEB_PORT: u16 = 8091;

      SocketAddr::from(([127, 0, 0, 1], DEFAULT_WEB_PORT))
  }
  181. /// Start the CDK LDK Node
  182. ///
  183. /// Starts the underlying LDK node and begins event processing.
  184. /// Sets up event handlers to listen for Lightning events like payment received.
  185. ///
  186. /// # Returns
  187. /// Returns `Ok(())` on successful start, error otherwise
  188. ///
  189. /// # Errors
  190. /// Returns an error if the LDK node fails to start or event handling setup fails
  191. pub fn start_ldk_node(&self) -> Result<(), Error> {
  192. match &self.runtime {
  193. Some(runtime) => {
  194. tracing::info!("Starting cdk-ldk node with existing runtime");
  195. self.inner.start_with_runtime(Arc::clone(runtime))?
  196. }
  197. None => {
  198. tracing::info!("Starting cdk-ldk-node with new runtime");
  199. self.inner.start()?
  200. }
  201. };
  202. let node_config = self.inner.config();
  203. tracing::info!("Starting node with network {}", node_config.network);
  204. tracing::info!("Node status: {:?}", self.inner.status());
  205. self.handle_events()?;
  206. Ok(())
  207. }
  208. /// Start the web server for the LDK node management interface
  209. ///
  210. /// Starts a web server that provides a user interface for managing the LDK node.
  211. /// The web interface allows users to view balances, manage channels, create invoices,
  212. /// and send payments.
  213. ///
  214. /// # Arguments
  215. /// * `web_addr` - The socket address to bind the web server to
  216. ///
  217. /// # Returns
  218. /// Returns `Ok(())` on successful start, error otherwise
  219. ///
  220. /// # Errors
  221. /// Returns an error if the web server fails to start
  222. pub fn start_web_server(&self, web_addr: SocketAddr) -> Result<(), Error> {
  223. let web_server = crate::web::WebServer::new(Arc::new(self.clone()));
  224. tokio::spawn(async move {
  225. if let Err(e) = web_server.serve(web_addr).await {
  226. tracing::error!("Web server error: {}", e);
  227. }
  228. });
  229. Ok(())
  230. }
  231. /// Stop the CDK LDK Node
  232. ///
  233. /// Gracefully stops the node by cancelling all active tasks and event handlers.
  234. /// This includes:
  235. /// - Cancelling the event handler task
  236. /// - Cancelling any active wait_invoice streams
  237. /// - Stopping the underlying LDK node
  238. ///
  239. /// # Returns
  240. /// Returns `Ok(())` on successful shutdown, error otherwise
  241. ///
  242. /// # Errors
  243. /// Returns an error if the underlying LDK node fails to stop
  244. pub fn stop_ldk_node(&self) -> Result<(), Error> {
  245. tracing::info!("Stopping CdkLdkNode");
  246. // Cancel all tokio tasks
  247. tracing::info!("Cancelling event handler");
  248. self.events_cancel_token.cancel();
  249. // Cancel any wait_invoice streams
  250. if self.is_wait_invoice_active() {
  251. tracing::info!("Cancelling wait_invoice stream");
  252. self.wait_invoice_cancel_token.cancel();
  253. }
  254. // Stop the LDK node
  255. tracing::info!("Stopping LDK node");
  256. self.inner.stop()?;
  257. tracing::info!("CdkLdkNode stopped successfully");
  258. Ok(())
  259. }
  260. /// Handle payment received event
  261. async fn handle_payment_received(
  262. node: &Arc<Node>,
  263. sender: &tokio::sync::broadcast::Sender<WaitPaymentResponse>,
  264. payment_id: Option<PaymentId>,
  265. payment_hash: PaymentHash,
  266. amount_msat: u64,
  267. ) {
  268. tracing::info!(
  269. "Received payment for hash={} of amount={} msat",
  270. payment_hash,
  271. amount_msat
  272. );
  273. let payment_id = match payment_id {
  274. Some(id) => id,
  275. None => {
  276. tracing::warn!("Received payment without payment_id");
  277. return;
  278. }
  279. };
  280. let payment_id_hex = hex::encode(payment_id.0);
  281. if amount_msat == 0 {
  282. tracing::warn!("Payment of no amount");
  283. return;
  284. }
  285. tracing::info!(
  286. "Processing payment notification: id={}, amount={} msats",
  287. payment_id_hex,
  288. amount_msat
  289. );
  290. let payment_details = match node.payment(&payment_id) {
  291. Some(details) => details,
  292. None => {
  293. tracing::error!("Could not find payment details for id={}", payment_id_hex);
  294. return;
  295. }
  296. };
  297. let (payment_identifier, payment_id) = match payment_details.kind {
  298. PaymentKind::Bolt11 { hash, .. } => {
  299. (PaymentIdentifier::PaymentHash(hash.0), hash.to_string())
  300. }
  301. PaymentKind::Bolt12Offer { hash, offer_id, .. } => match hash {
  302. Some(h) => (
  303. PaymentIdentifier::OfferId(offer_id.to_string()),
  304. h.to_string(),
  305. ),
  306. None => {
  307. tracing::error!("Bolt12 payment missing hash");
  308. return;
  309. }
  310. },
  311. k => {
  312. tracing::warn!("Received payment of kind {:?} which is not supported", k);
  313. return;
  314. }
  315. };
  316. let wait_payment_response = WaitPaymentResponse {
  317. payment_identifier,
  318. payment_amount: amount_msat.into(),
  319. unit: CurrencyUnit::Msat,
  320. payment_id,
  321. };
  322. match sender.send(wait_payment_response) {
  323. Ok(_) => tracing::info!("Successfully sent payment notification to stream"),
  324. Err(err) => tracing::error!(
  325. "Could not send payment received notification on channel: {}",
  326. err
  327. ),
  328. }
  329. }
  330. /// Set up event handling for the node
  331. pub fn handle_events(&self) -> Result<(), Error> {
  332. let node = self.inner.clone();
  333. let sender = self.sender.clone();
  334. let cancel_token = self.events_cancel_token.clone();
  335. tracing::info!("Starting event handler task");
  336. tokio::spawn(async move {
  337. tracing::info!("Event handler loop started");
  338. loop {
  339. tokio::select! {
  340. _ = cancel_token.cancelled() => {
  341. tracing::info!("Event handler cancelled");
  342. break;
  343. }
  344. event = node.next_event_async() => {
  345. match event {
  346. Event::PaymentReceived {
  347. payment_id,
  348. payment_hash,
  349. amount_msat,
  350. custom_records: _
  351. } => {
  352. Self::handle_payment_received(
  353. &node,
  354. &sender,
  355. payment_id,
  356. payment_hash,
  357. amount_msat
  358. ).await;
  359. }
  360. event => {
  361. tracing::debug!("Received other ldk node event: {:?}", event);
  362. }
  363. }
  364. if let Err(err) = node.event_handled() {
  365. tracing::error!("Error handling node event: {}", err);
  366. } else {
  367. tracing::debug!("Successfully handled node event");
  368. }
  369. }
  370. }
  371. }
  372. tracing::info!("Event handler loop terminated");
  373. });
  374. tracing::info!("Event handler task spawned");
  375. Ok(())
  376. }
  377. /// Get Node used
  378. pub fn node(&self) -> Arc<Node> {
  379. Arc::clone(&self.inner)
  380. }
  381. }
  382. /// Mint payment trait
  383. #[async_trait]
  384. impl MintPayment for CdkLdkNode {
  385. type Err = payment::Error;
  386. /// Start the payment processor
  387. /// Starts the LDK node and begins event processing
  388. async fn start(&self) -> Result<(), Self::Err> {
  389. self.start_ldk_node().map_err(|e| {
  390. tracing::error!("Failed to start CdkLdkNode: {}", e);
  391. e
  392. })?;
  393. tracing::info!("CdkLdkNode payment processor started successfully");
  394. // Start web server if configured
  395. if let Some(web_addr) = self.web_addr {
  396. tracing::info!("Starting LDK Node web interface on {}", web_addr);
  397. self.start_web_server(web_addr).map_err(|e| {
  398. tracing::error!("Failed to start web server: {}", e);
  399. e
  400. })?;
  401. } else {
  402. tracing::info!("No web server address configured, skipping web interface");
  403. }
  404. Ok(())
  405. }
  406. /// Stop the payment processor
  407. /// Gracefully stops the LDK node and cancels all background tasks
  408. async fn stop(&self) -> Result<(), Self::Err> {
  409. self.stop_ldk_node().map_err(|e| {
  410. tracing::error!("Failed to stop CdkLdkNode: {}", e);
  411. e.into()
  412. })
  413. }
  414. /// Base Settings
  415. async fn get_settings(&self) -> Result<serde_json::Value, Self::Err> {
  416. let settings = Bolt11Settings {
  417. mpp: false,
  418. unit: CurrencyUnit::Msat,
  419. invoice_description: true,
  420. amountless: true,
  421. bolt12: true,
  422. };
  423. Ok(serde_json::to_value(settings)?)
  424. }
  425. /// Create a new invoice
  426. #[instrument(skip(self))]
  427. async fn create_incoming_payment_request(
  428. &self,
  429. unit: &CurrencyUnit,
  430. options: IncomingPaymentOptions,
  431. ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
  432. match options {
  433. IncomingPaymentOptions::Bolt11(bolt11_options) => {
  434. let amount_msat = to_unit(bolt11_options.amount, unit, &CurrencyUnit::Msat)?;
  435. let description = bolt11_options.description.unwrap_or_default();
  436. let time = bolt11_options
  437. .unix_expiry
  438. .map(|t| t - unix_time())
  439. .unwrap_or(36000);
  440. let description = Bolt11InvoiceDescription::Direct(
  441. Description::new(description).map_err(|_| Error::InvalidDescription)?,
  442. );
  443. let payment = self
  444. .inner
  445. .bolt11_payment()
  446. .receive(amount_msat.into(), &description, time as u32)
  447. .map_err(Error::LdkNode)?;
  448. let payment_hash = payment.payment_hash().to_string();
  449. let payment_identifier = PaymentIdentifier::PaymentHash(
  450. hex::decode(&payment_hash)?
  451. .try_into()
  452. .map_err(|_| Error::InvalidPaymentHashLength)?,
  453. );
  454. Ok(CreateIncomingPaymentResponse {
  455. request_lookup_id: payment_identifier,
  456. request: payment.to_string(),
  457. expiry: Some(unix_time() + time),
  458. })
  459. }
  460. IncomingPaymentOptions::Bolt12(bolt12_options) => {
  461. let Bolt12IncomingPaymentOptions {
  462. description,
  463. amount,
  464. unix_expiry,
  465. } = *bolt12_options;
  466. let time = unix_expiry.map(|t| (t - unix_time()) as u32);
  467. let offer = match amount {
  468. Some(amount) => {
  469. let amount_msat = to_unit(amount, unit, &CurrencyUnit::Msat)?;
  470. self.inner
  471. .bolt12_payment()
  472. .receive(
  473. amount_msat.into(),
  474. &description.unwrap_or("".to_string()),
  475. time,
  476. None,
  477. )
  478. .map_err(Error::LdkNode)?
  479. }
  480. None => self
  481. .inner
  482. .bolt12_payment()
  483. .receive_variable_amount(&description.unwrap_or("".to_string()), time)
  484. .map_err(Error::LdkNode)?,
  485. };
  486. let payment_identifier = PaymentIdentifier::OfferId(offer.id().to_string());
  487. Ok(CreateIncomingPaymentResponse {
  488. request_lookup_id: payment_identifier,
  489. request: offer.to_string(),
  490. expiry: time.map(|a| a as u64),
  491. })
  492. }
  493. }
  494. }
  495. /// Get payment quote
  496. /// Used to get fee and amount required for a payment request
  497. #[instrument(skip_all)]
  498. async fn get_payment_quote(
  499. &self,
  500. unit: &CurrencyUnit,
  501. options: OutgoingPaymentOptions,
  502. ) -> Result<PaymentQuoteResponse, Self::Err> {
  503. match options {
  504. OutgoingPaymentOptions::Bolt11(bolt11_options) => {
  505. let bolt11 = bolt11_options.bolt11;
  506. let amount_msat = match bolt11_options.melt_options {
  507. Some(melt_options) => melt_options.amount_msat(),
  508. None => bolt11
  509. .amount_milli_satoshis()
  510. .ok_or(Error::UnknownInvoiceAmount)?
  511. .into(),
  512. };
  513. let amount = to_unit(amount_msat, &CurrencyUnit::Msat, unit)?;
  514. let relative_fee_reserve =
  515. (self.fee_reserve.percent_fee_reserve * u64::from(amount) as f32) as u64;
  516. let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
  517. let fee = match relative_fee_reserve > absolute_fee_reserve {
  518. true => relative_fee_reserve,
  519. false => absolute_fee_reserve,
  520. };
  521. let payment_hash = bolt11.payment_hash().to_string();
  522. let payment_hash_bytes = hex::decode(&payment_hash)?
  523. .try_into()
  524. .map_err(|_| Error::InvalidPaymentHashLength)?;
  525. Ok(PaymentQuoteResponse {
  526. request_lookup_id: Some(PaymentIdentifier::PaymentHash(payment_hash_bytes)),
  527. amount,
  528. fee: fee.into(),
  529. state: MeltQuoteState::Unpaid,
  530. unit: unit.clone(),
  531. })
  532. }
  533. OutgoingPaymentOptions::Bolt12(bolt12_options) => {
  534. let offer = bolt12_options.offer;
  535. let amount_msat = match bolt12_options.melt_options {
  536. Some(melt_options) => melt_options.amount_msat(),
  537. None => {
  538. let amount = offer.amount().ok_or(payment::Error::AmountMismatch)?;
  539. match amount {
  540. ldk_node::lightning::offers::offer::Amount::Bitcoin {
  541. amount_msats,
  542. } => amount_msats.into(),
  543. _ => return Err(payment::Error::AmountMismatch),
  544. }
  545. }
  546. };
  547. let amount = to_unit(amount_msat, &CurrencyUnit::Msat, unit)?;
  548. let relative_fee_reserve =
  549. (self.fee_reserve.percent_fee_reserve * u64::from(amount) as f32) as u64;
  550. let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
  551. let fee = match relative_fee_reserve > absolute_fee_reserve {
  552. true => relative_fee_reserve,
  553. false => absolute_fee_reserve,
  554. };
  555. Ok(PaymentQuoteResponse {
  556. request_lookup_id: None,
  557. amount,
  558. fee: fee.into(),
  559. state: MeltQuoteState::Unpaid,
  560. unit: unit.clone(),
  561. })
  562. }
  563. }
  564. }
  565. /// Pay request
  566. #[instrument(skip(self, options))]
  567. async fn make_payment(
  568. &self,
  569. unit: &CurrencyUnit,
  570. options: OutgoingPaymentOptions,
  571. ) -> Result<MakePaymentResponse, Self::Err> {
  572. match options {
  573. OutgoingPaymentOptions::Bolt11(bolt11_options) => {
  574. let bolt11 = bolt11_options.bolt11;
  575. let send_params = match bolt11_options
  576. .max_fee_amount
  577. .map(|f| {
  578. to_unit(f, unit, &CurrencyUnit::Msat).map(|amount_msat| SendingParameters {
  579. max_total_routing_fee_msat: Some(Some(amount_msat.into())),
  580. max_channel_saturation_power_of_half: None,
  581. max_total_cltv_expiry_delta: None,
  582. max_path_count: None,
  583. })
  584. })
  585. .transpose()
  586. {
  587. Ok(params) => params,
  588. Err(err) => {
  589. tracing::error!("Failed to convert fee amount: {}", err);
  590. return Err(payment::Error::Custom(format!("Invalid fee amount: {err}")));
  591. }
  592. };
  593. let payment_id = match bolt11_options.melt_options {
  594. Some(MeltOptions::Amountless { amountless }) => self
  595. .inner
  596. .bolt11_payment()
  597. .send_using_amount(&bolt11, amountless.amount_msat.into(), send_params)
  598. .map_err(|err| {
  599. tracing::error!("Could not send send amountless bolt11: {}", err);
  600. Error::CouldNotSendBolt11WithoutAmount
  601. })?,
  602. None => self
  603. .inner
  604. .bolt11_payment()
  605. .send(&bolt11, send_params)
  606. .map_err(|err| {
  607. tracing::error!("Could not send bolt11 {}", err);
  608. Error::CouldNotSendBolt11
  609. })?,
  610. _ => return Err(payment::Error::UnsupportedPaymentOption),
  611. };
  612. // Check payment status for up to 10 seconds
  613. let start = std::time::Instant::now();
  614. let timeout = std::time::Duration::from_secs(10);
  615. let (status, payment_details) = loop {
  616. let details = self
  617. .inner
  618. .payment(&payment_id)
  619. .ok_or(Error::PaymentNotFound)?;
  620. match details.status {
  621. PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
  622. PaymentStatus::Failed => {
  623. tracing::error!("Failed to pay bolt11 payment.");
  624. break (MeltQuoteState::Failed, details);
  625. }
  626. PaymentStatus::Pending => {
  627. if start.elapsed() > timeout {
  628. tracing::warn!(
  629. "Paying bolt11 exceeded timeout 10 seconds no longer waitning."
  630. );
  631. break (MeltQuoteState::Pending, details);
  632. }
  633. tokio::time::sleep(std::time::Duration::from_millis(100)).await;
  634. continue;
  635. }
  636. }
  637. };
  638. let payment_proof = match payment_details.kind {
  639. PaymentKind::Bolt11 {
  640. hash: _,
  641. preimage,
  642. secret: _,
  643. } => preimage.map(|p| p.to_string()),
  644. _ => return Err(Error::UnexpectedPaymentKind.into()),
  645. };
  646. let total_spent = payment_details
  647. .amount_msat
  648. .ok_or(Error::CouldNotGetAmountSpent)?;
  649. let total_spent = to_unit(total_spent, &CurrencyUnit::Msat, unit)?;
  650. Ok(MakePaymentResponse {
  651. payment_lookup_id: PaymentIdentifier::PaymentHash(
  652. bolt11.payment_hash().to_byte_array(),
  653. ),
  654. payment_proof,
  655. status,
  656. total_spent,
  657. unit: unit.clone(),
  658. })
  659. }
  660. OutgoingPaymentOptions::Bolt12(bolt12_options) => {
  661. let offer = bolt12_options.offer;
  662. let payment_id = match bolt12_options.melt_options {
  663. Some(MeltOptions::Amountless { amountless }) => self
  664. .inner
  665. .bolt12_payment()
  666. .send_using_amount(&offer, amountless.amount_msat.into(), None, None)
  667. .map_err(Error::LdkNode)?,
  668. None => self
  669. .inner
  670. .bolt12_payment()
  671. .send(&offer, None, None)
  672. .map_err(Error::LdkNode)?,
  673. _ => return Err(payment::Error::UnsupportedPaymentOption),
  674. };
  675. // Check payment status for up to 10 seconds
  676. let start = std::time::Instant::now();
  677. let timeout = std::time::Duration::from_secs(10);
  678. let (status, payment_details) = loop {
  679. let details = self
  680. .inner
  681. .payment(&payment_id)
  682. .ok_or(Error::PaymentNotFound)?;
  683. match details.status {
  684. PaymentStatus::Succeeded => break (MeltQuoteState::Paid, details),
  685. PaymentStatus::Failed => {
  686. tracing::error!("Payment with id {} failed.", payment_id);
  687. break (MeltQuoteState::Failed, details);
  688. }
  689. PaymentStatus::Pending => {
  690. if start.elapsed() > timeout {
  691. tracing::warn!(
  692. "Payment has been being for 10 seconds. No longer waiting"
  693. );
  694. break (MeltQuoteState::Pending, details);
  695. }
  696. tokio::time::sleep(std::time::Duration::from_millis(100)).await;
  697. continue;
  698. }
  699. }
  700. };
  701. let payment_proof = match payment_details.kind {
  702. PaymentKind::Bolt12Offer {
  703. hash: _,
  704. preimage,
  705. secret: _,
  706. offer_id: _,
  707. payer_note: _,
  708. quantity: _,
  709. } => preimage.map(|p| p.to_string()),
  710. _ => return Err(Error::UnexpectedPaymentKind.into()),
  711. };
  712. let total_spent = payment_details
  713. .amount_msat
  714. .ok_or(Error::CouldNotGetAmountSpent)?;
  715. let total_spent = to_unit(total_spent, &CurrencyUnit::Msat, unit)?;
  716. Ok(MakePaymentResponse {
  717. payment_lookup_id: PaymentIdentifier::PaymentId(payment_id.0),
  718. payment_proof,
  719. status,
  720. total_spent,
  721. unit: unit.clone(),
  722. })
  723. }
  724. }
  725. }
  726. /// Listen for invoices to be paid to the mint
  727. /// Returns a stream of request_lookup_id once invoices are paid
  728. #[instrument(skip(self))]
  729. async fn wait_payment_event(
  730. &self,
  731. ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
  732. tracing::info!("Starting stream for invoices - wait_any_incoming_payment called");
  733. // Set active flag to indicate stream is active
  734. self.wait_invoice_is_active.store(true, Ordering::SeqCst);
  735. tracing::debug!("wait_invoice_is_active set to true");
  736. let receiver = self.receiver.clone();
  737. tracing::info!("Receiver obtained successfully, creating response stream");
  738. // Transform the String stream into a WaitPaymentResponse stream
  739. let response_stream = BroadcastStream::new(receiver.resubscribe());
  740. // Map the stream to handle BroadcastStreamRecvError and wrap in Event
  741. let response_stream = response_stream.filter_map(|result| async move {
  742. match result {
  743. Ok(payment) => Some(cdk_common::payment::Event::PaymentReceived(payment)),
  744. Err(err) => {
  745. tracing::warn!("Error in broadcast stream: {}", err);
  746. None
  747. }
  748. }
  749. });
  750. // Create a combined stream that also handles cancellation
  751. let cancel_token = self.wait_invoice_cancel_token.clone();
  752. let is_active = self.wait_invoice_is_active.clone();
  753. let stream = Box::pin(response_stream);
  754. // Set up a task to clean up when the stream is dropped
  755. tokio::spawn(async move {
  756. cancel_token.cancelled().await;
  757. tracing::info!("wait_invoice stream cancelled");
  758. is_active.store(false, Ordering::SeqCst);
  759. });
  760. tracing::info!("wait_any_incoming_payment returning stream");
  761. Ok(stream)
  762. }
  763. /// Is wait invoice active
  764. fn is_wait_invoice_active(&self) -> bool {
  765. self.wait_invoice_is_active.load(Ordering::SeqCst)
  766. }
  767. /// Cancel wait invoice
  768. fn cancel_wait_invoice(&self) {
  769. self.wait_invoice_cancel_token.cancel()
  770. }
  771. /// Check the status of an incoming payment
  772. async fn check_incoming_payment_status(
  773. &self,
  774. payment_identifier: &PaymentIdentifier,
  775. ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
  776. let payment_id_str = match payment_identifier {
  777. PaymentIdentifier::PaymentHash(hash) => hex::encode(hash),
  778. PaymentIdentifier::CustomId(id) => id.clone(),
  779. _ => return Err(Error::UnsupportedPaymentIdentifierType.into()),
  780. };
  781. let payment_id = PaymentId(
  782. hex::decode(&payment_id_str)?
  783. .try_into()
  784. .map_err(|_| Error::InvalidPaymentIdLength)?,
  785. );
  786. let payment_details = self
  787. .inner
  788. .payment(&payment_id)
  789. .ok_or(Error::PaymentNotFound)?;
  790. if payment_details.direction == PaymentDirection::Outbound {
  791. return Err(Error::InvalidPaymentDirection.into());
  792. }
  793. let amount = if payment_details.status == PaymentStatus::Succeeded {
  794. payment_details
  795. .amount_msat
  796. .ok_or(Error::CouldNotGetPaymentAmount)?
  797. } else {
  798. return Ok(vec![]);
  799. };
  800. let response = WaitPaymentResponse {
  801. payment_identifier: payment_identifier.clone(),
  802. payment_amount: amount.into(),
  803. unit: CurrencyUnit::Msat,
  804. payment_id: payment_id_str,
  805. };
  806. Ok(vec![response])
  807. }
  808. /// Check the status of an outgoing payment
  809. async fn check_outgoing_payment(
  810. &self,
  811. request_lookup_id: &PaymentIdentifier,
  812. ) -> Result<MakePaymentResponse, Self::Err> {
  813. let payment_details = match request_lookup_id {
  814. PaymentIdentifier::PaymentHash(id_hash) => self
  815. .inner
  816. .list_payments_with_filter(
  817. |p| matches!(&p.kind, PaymentKind::Bolt11 { hash, .. } if &hash.0 == id_hash),
  818. )
  819. .first()
  820. .cloned(),
  821. PaymentIdentifier::PaymentId(id) => self.inner.payment(&PaymentId(
  822. hex::decode(id)?
  823. .try_into()
  824. .map_err(|_| payment::Error::Custom("Invalid hex".to_string()))?,
  825. )),
  826. _ => {
  827. return Ok(MakePaymentResponse {
  828. payment_lookup_id: request_lookup_id.clone(),
  829. status: MeltQuoteState::Unknown,
  830. payment_proof: None,
  831. total_spent: Amount::ZERO,
  832. unit: CurrencyUnit::Msat,
  833. });
  834. }
  835. }
  836. .ok_or(Error::PaymentNotFound)?;
  837. // This check seems reversed in the original code, so I'm fixing it here
  838. if payment_details.direction != PaymentDirection::Outbound {
  839. return Err(Error::InvalidPaymentDirection.into());
  840. }
  841. let status = match payment_details.status {
  842. PaymentStatus::Pending => MeltQuoteState::Pending,
  843. PaymentStatus::Succeeded => MeltQuoteState::Paid,
  844. PaymentStatus::Failed => MeltQuoteState::Failed,
  845. };
  846. let payment_proof = match payment_details.kind {
  847. PaymentKind::Bolt11 {
  848. hash: _,
  849. preimage,
  850. secret: _,
  851. } => preimage.map(|p| p.to_string()),
  852. _ => return Err(Error::UnexpectedPaymentKind.into()),
  853. };
  854. let total_spent = payment_details
  855. .amount_msat
  856. .ok_or(Error::CouldNotGetAmountSpent)?;
  857. Ok(MakePaymentResponse {
  858. payment_lookup_id: request_lookup_id.clone(),
  859. payment_proof,
  860. status,
  861. total_spent: total_spent.into(),
  862. unit: CurrencyUnit::Msat,
  863. })
  864. }
  865. }
  866. impl Drop for CdkLdkNode {
  867. fn drop(&mut self) {
  868. tracing::info!("Drop called on CdkLdkNode");
  869. self.wait_invoice_cancel_token.cancel();
  870. tracing::debug!("Cancelled wait_invoice token in drop");
  871. }
  872. }