lib.rs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533
  1. //! CDK lightning backend for CLN
  2. #![warn(missing_docs)]
  3. #![warn(rustdoc::bare_urls)]
  4. use std::path::PathBuf;
  5. use std::pin::Pin;
  6. use std::str::FromStr;
  7. use std::sync::atomic::{AtomicBool, Ordering};
  8. use std::sync::Arc;
  9. use std::time::Duration;
  10. use async_trait::async_trait;
  11. use cdk::amount::{to_unit, Amount, MSAT_IN_SAT};
  12. use cdk::cdk_lightning::{
  13. self, CreateInvoiceResponse, MintLightning, PayInvoiceResponse, PaymentQuoteResponse, Settings,
  14. };
  15. use cdk::mint::FeeReserve;
  16. use cdk::nuts::{CurrencyUnit, MeltQuoteBolt11Request, MeltQuoteState, MintQuoteState};
  17. use cdk::util::{hex, unix_time};
  18. use cdk::{mint, Bolt11Invoice};
  19. use cln_rpc::model::requests::{
  20. InvoiceRequest, ListinvoicesRequest, ListpaysRequest, PayRequest, WaitanyinvoiceRequest,
  21. };
  22. use cln_rpc::model::responses::{
  23. ListinvoicesInvoices, ListinvoicesInvoicesStatus, ListpaysPaysStatus, PayStatus,
  24. WaitanyinvoiceResponse, WaitanyinvoiceStatus,
  25. };
  26. use cln_rpc::model::Request;
  27. use cln_rpc::primitives::{Amount as CLN_Amount, AmountOrAny};
  28. use error::Error;
  29. use futures::{Stream, StreamExt};
  30. use tokio::sync::Mutex;
  31. use tokio_util::sync::CancellationToken;
  32. use uuid::Uuid;
  33. pub mod error;
  34. /// CLN mint backend
  35. #[derive(Clone)]
  36. pub struct Cln {
  37. rpc_socket: PathBuf,
  38. cln_client: Arc<Mutex<cln_rpc::ClnRpc>>,
  39. fee_reserve: FeeReserve,
  40. wait_invoice_cancel_token: CancellationToken,
  41. wait_invoice_is_active: Arc<AtomicBool>,
  42. }
  43. impl Cln {
  44. /// Create new [`Cln`]
  45. pub async fn new(rpc_socket: PathBuf, fee_reserve: FeeReserve) -> Result<Self, Error> {
  46. let cln_client = cln_rpc::ClnRpc::new(&rpc_socket).await?;
  47. Ok(Self {
  48. rpc_socket,
  49. cln_client: Arc::new(Mutex::new(cln_client)),
  50. fee_reserve,
  51. wait_invoice_cancel_token: CancellationToken::new(),
  52. wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
  53. })
  54. }
  55. }
  56. #[async_trait]
  57. impl MintLightning for Cln {
  58. type Err = cdk_lightning::Error;
  59. fn get_settings(&self) -> Settings {
  60. Settings {
  61. mpp: true,
  62. unit: CurrencyUnit::Msat,
  63. invoice_description: true,
  64. }
  65. }
  66. /// Is wait invoice active
  67. fn is_wait_invoice_active(&self) -> bool {
  68. self.wait_invoice_is_active.load(Ordering::SeqCst)
  69. }
  70. /// Cancel wait invoice
  71. fn cancel_wait_invoice(&self) {
  72. self.wait_invoice_cancel_token.cancel()
  73. }
  74. #[allow(clippy::incompatible_msrv)]
  75. // Clippy thinks select is not stable but it compiles fine on MSRV (1.63.0)
  76. async fn wait_any_invoice(
  77. &self,
  78. ) -> Result<Pin<Box<dyn Stream<Item = String> + Send>>, Self::Err> {
  79. let last_pay_index = self.get_last_pay_index().await?;
  80. let cln_client = cln_rpc::ClnRpc::new(&self.rpc_socket).await?;
  81. let stream = futures::stream::unfold(
  82. (
  83. cln_client,
  84. last_pay_index,
  85. self.wait_invoice_cancel_token.clone(),
  86. Arc::clone(&self.wait_invoice_is_active),
  87. ),
  88. |(mut cln_client, mut last_pay_idx, cancel_token, is_active)| async move {
  89. // Set the stream as active
  90. is_active.store(true, Ordering::SeqCst);
  91. loop {
  92. tokio::select! {
  93. _ = cancel_token.cancelled() => {
  94. // Set the stream as inactive
  95. is_active.store(false, Ordering::SeqCst);
  96. // End the stream
  97. return None;
  98. }
  99. result = cln_client.call(cln_rpc::Request::WaitAnyInvoice(WaitanyinvoiceRequest {
  100. timeout: None,
  101. lastpay_index: last_pay_idx,
  102. })) => {
  103. match result {
  104. Ok(invoice) => {
  105. // Try to convert the invoice to WaitanyinvoiceResponse
  106. let wait_any_response_result: Result<WaitanyinvoiceResponse, _> =
  107. invoice.try_into();
  108. let wait_any_response = match wait_any_response_result {
  109. Ok(response) => response,
  110. Err(e) => {
  111. tracing::warn!(
  112. "Failed to parse WaitAnyInvoice response: {:?}",
  113. e
  114. );
  115. // Continue to the next iteration without panicking
  116. continue;
  117. }
  118. };
  119. // Check the status of the invoice
  120. // We only want to yield invoices that have been paid
  121. match wait_any_response.status {
  122. WaitanyinvoiceStatus::PAID => (),
  123. WaitanyinvoiceStatus::EXPIRED => continue,
  124. }
  125. last_pay_idx = wait_any_response.pay_index;
  126. let payment_hash = wait_any_response.payment_hash.to_string();
  127. let request_look_up = match wait_any_response.bolt12 {
  128. // If it is a bolt12 payment we need to get the offer_id as this is what we use as the request look up.
  129. // Since this is not returned in the wait any response,
  130. // we need to do a second query for it.
  131. Some(_) => {
  132. match fetch_invoice_by_payment_hash(
  133. &mut cln_client,
  134. &payment_hash,
  135. )
  136. .await
  137. {
  138. Ok(Some(invoice)) => {
  139. if let Some(local_offer_id) = invoice.local_offer_id {
  140. local_offer_id.to_string()
  141. } else {
  142. continue;
  143. }
  144. }
  145. Ok(None) => continue,
  146. Err(e) => {
  147. tracing::warn!(
  148. "Error fetching invoice by payment hash: {e}"
  149. );
  150. continue;
  151. }
  152. }
  153. }
  154. None => payment_hash,
  155. };
  156. return Some((request_look_up, (cln_client, last_pay_idx, cancel_token, is_active)));
  157. }
  158. Err(e) => {
  159. tracing::warn!("Error fetching invoice: {e}");
  160. tokio::time::sleep(Duration::from_secs(1)).await;
  161. continue;
  162. }
  163. }
  164. }
  165. }
  166. }
  167. },
  168. )
  169. .boxed();
  170. Ok(stream)
  171. }
  172. async fn get_payment_quote(
  173. &self,
  174. melt_quote_request: &MeltQuoteBolt11Request,
  175. ) -> Result<PaymentQuoteResponse, Self::Err> {
  176. let amount = melt_quote_request.amount_msat()?;
  177. let amount = amount / MSAT_IN_SAT.into();
  178. let relative_fee_reserve =
  179. (self.fee_reserve.percent_fee_reserve * u64::from(amount) as f32) as u64;
  180. let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
  181. let fee = match relative_fee_reserve > absolute_fee_reserve {
  182. true => relative_fee_reserve,
  183. false => absolute_fee_reserve,
  184. };
  185. Ok(PaymentQuoteResponse {
  186. request_lookup_id: melt_quote_request.request.payment_hash().to_string(),
  187. amount,
  188. fee: fee.into(),
  189. state: MeltQuoteState::Unpaid,
  190. })
  191. }
  192. async fn pay_invoice(
  193. &self,
  194. melt_quote: mint::MeltQuote,
  195. partial_amount: Option<Amount>,
  196. max_fee: Option<Amount>,
  197. ) -> Result<PayInvoiceResponse, Self::Err> {
  198. let bolt11 = Bolt11Invoice::from_str(&melt_quote.request)?;
  199. let pay_state = self
  200. .check_outgoing_payment(&bolt11.payment_hash().to_string())
  201. .await?;
  202. match pay_state.status {
  203. MeltQuoteState::Unpaid | MeltQuoteState::Unknown | MeltQuoteState::Failed => (),
  204. MeltQuoteState::Paid => {
  205. tracing::debug!("Melt attempted on invoice already paid");
  206. return Err(Self::Err::InvoiceAlreadyPaid);
  207. }
  208. MeltQuoteState::Pending => {
  209. tracing::debug!("Melt attempted on invoice already pending");
  210. return Err(Self::Err::InvoicePaymentPending);
  211. }
  212. }
  213. let amount_msat = melt_quote
  214. .msat_to_pay
  215. .map(|a| CLN_Amount::from_msat(a.into()));
  216. let mut cln_client = self.cln_client.lock().await;
  217. let cln_response = cln_client
  218. .call(Request::Pay(PayRequest {
  219. bolt11: melt_quote.request.to_string(),
  220. amount_msat,
  221. label: None,
  222. riskfactor: None,
  223. maxfeepercent: None,
  224. retry_for: None,
  225. maxdelay: None,
  226. exemptfee: None,
  227. localinvreqid: None,
  228. exclude: None,
  229. maxfee: max_fee
  230. .map(|a| {
  231. let msat = to_unit(a, &melt_quote.unit, &CurrencyUnit::Msat)?;
  232. Ok::<CLN_Amount, Self::Err>(CLN_Amount::from_msat(msat.into()))
  233. })
  234. .transpose()?,
  235. description: None,
  236. partial_msat: partial_amount
  237. .map(|a| {
  238. let msat = to_unit(a, &melt_quote.unit, &CurrencyUnit::Msat)?;
  239. Ok::<cln_rpc::primitives::Amount, Self::Err>(CLN_Amount::from_msat(
  240. msat.into(),
  241. ))
  242. })
  243. .transpose()?,
  244. }))
  245. .await;
  246. let response = match cln_response {
  247. Ok(cln_rpc::Response::Pay(pay_response)) => {
  248. let status = match pay_response.status {
  249. PayStatus::COMPLETE => MeltQuoteState::Paid,
  250. PayStatus::PENDING => MeltQuoteState::Pending,
  251. PayStatus::FAILED => MeltQuoteState::Failed,
  252. };
  253. PayInvoiceResponse {
  254. payment_preimage: Some(hex::encode(pay_response.payment_preimage.to_vec())),
  255. payment_lookup_id: pay_response.payment_hash.to_string(),
  256. status,
  257. total_spent: to_unit(
  258. pay_response.amount_sent_msat.msat(),
  259. &CurrencyUnit::Msat,
  260. &melt_quote.unit,
  261. )?,
  262. unit: melt_quote.unit,
  263. }
  264. }
  265. Err(err) => {
  266. tracing::error!("Could not pay invoice: {}", err);
  267. return Err(Error::ClnRpc(err).into());
  268. }
  269. _ => {
  270. tracing::error!(
  271. "Error attempting to pay invoice: {}",
  272. bolt11.payment_hash().to_string()
  273. );
  274. return Err(Error::WrongClnResponse.into());
  275. }
  276. };
  277. Ok(response)
  278. }
  279. async fn create_invoice(
  280. &self,
  281. amount: Amount,
  282. unit: &CurrencyUnit,
  283. description: String,
  284. unix_expiry: u64,
  285. ) -> Result<CreateInvoiceResponse, Self::Err> {
  286. let time_now = unix_time();
  287. assert!(unix_expiry > time_now);
  288. let mut cln_client = self.cln_client.lock().await;
  289. let label = Uuid::new_v4().to_string();
  290. let amount = to_unit(amount, unit, &CurrencyUnit::Msat)?;
  291. let amount_msat = AmountOrAny::Amount(CLN_Amount::from_msat(amount.into()));
  292. let cln_response = cln_client
  293. .call(cln_rpc::Request::Invoice(InvoiceRequest {
  294. amount_msat,
  295. description,
  296. label: label.clone(),
  297. expiry: Some(unix_expiry - time_now),
  298. fallbacks: None,
  299. preimage: None,
  300. cltv: None,
  301. deschashonly: None,
  302. exposeprivatechannels: None,
  303. }))
  304. .await
  305. .map_err(Error::from)?;
  306. match cln_response {
  307. cln_rpc::Response::Invoice(invoice_res) => {
  308. let request = Bolt11Invoice::from_str(&invoice_res.bolt11)?;
  309. let expiry = request.expires_at().map(|t| t.as_secs());
  310. let payment_hash = request.payment_hash();
  311. Ok(CreateInvoiceResponse {
  312. request_lookup_id: payment_hash.to_string(),
  313. request,
  314. expiry,
  315. })
  316. }
  317. _ => {
  318. tracing::warn!("CLN returned wrong response kind");
  319. Err(Error::WrongClnResponse.into())
  320. }
  321. }
  322. }
  323. async fn check_incoming_invoice_status(
  324. &self,
  325. payment_hash: &str,
  326. ) -> Result<MintQuoteState, Self::Err> {
  327. let mut cln_client = self.cln_client.lock().await;
  328. let cln_response = cln_client
  329. .call(Request::ListInvoices(ListinvoicesRequest {
  330. payment_hash: Some(payment_hash.to_string()),
  331. label: None,
  332. invstring: None,
  333. offer_id: None,
  334. index: None,
  335. limit: None,
  336. start: None,
  337. }))
  338. .await
  339. .map_err(Error::from)?;
  340. let status = match cln_response {
  341. cln_rpc::Response::ListInvoices(invoice_response) => {
  342. match invoice_response.invoices.first() {
  343. Some(invoice_response) => {
  344. cln_invoice_status_to_mint_state(invoice_response.status)
  345. }
  346. None => {
  347. tracing::info!(
  348. "Check invoice called on unknown look up id: {}",
  349. payment_hash
  350. );
  351. return Err(Error::WrongClnResponse.into());
  352. }
  353. }
  354. }
  355. _ => {
  356. tracing::warn!("CLN returned wrong response kind");
  357. return Err(Error::WrongClnResponse.into());
  358. }
  359. };
  360. Ok(status)
  361. }
  362. async fn check_outgoing_payment(
  363. &self,
  364. payment_hash: &str,
  365. ) -> Result<PayInvoiceResponse, Self::Err> {
  366. let mut cln_client = self.cln_client.lock().await;
  367. let cln_response = cln_client
  368. .call(Request::ListPays(ListpaysRequest {
  369. payment_hash: Some(payment_hash.parse().map_err(|_| Error::InvalidHash)?),
  370. bolt11: None,
  371. status: None,
  372. start: None,
  373. index: None,
  374. limit: None,
  375. }))
  376. .await
  377. .map_err(Error::from)?;
  378. match cln_response {
  379. cln_rpc::Response::ListPays(pays_response) => match pays_response.pays.first() {
  380. Some(pays_response) => {
  381. let status = cln_pays_status_to_mint_state(pays_response.status);
  382. Ok(PayInvoiceResponse {
  383. payment_lookup_id: pays_response.payment_hash.to_string(),
  384. payment_preimage: pays_response.preimage.map(|p| hex::encode(p.to_vec())),
  385. status,
  386. total_spent: pays_response
  387. .amount_sent_msat
  388. .map_or(Amount::ZERO, |a| a.msat().into()),
  389. unit: CurrencyUnit::Msat,
  390. })
  391. }
  392. None => Ok(PayInvoiceResponse {
  393. payment_lookup_id: payment_hash.to_string(),
  394. payment_preimage: None,
  395. status: MeltQuoteState::Unknown,
  396. total_spent: Amount::ZERO,
  397. unit: CurrencyUnit::Msat,
  398. }),
  399. },
  400. _ => {
  401. tracing::warn!("CLN returned wrong response kind");
  402. Err(Error::WrongClnResponse.into())
  403. }
  404. }
  405. }
  406. }
  407. impl Cln {
  408. /// Get last pay index for cln
  409. async fn get_last_pay_index(&self) -> Result<Option<u64>, Error> {
  410. let mut cln_client = self.cln_client.lock().await;
  411. let cln_response = cln_client
  412. .call(cln_rpc::Request::ListInvoices(ListinvoicesRequest {
  413. index: None,
  414. invstring: None,
  415. label: None,
  416. limit: None,
  417. offer_id: None,
  418. payment_hash: None,
  419. start: None,
  420. }))
  421. .await
  422. .map_err(Error::from)?;
  423. match cln_response {
  424. cln_rpc::Response::ListInvoices(invoice_res) => match invoice_res.invoices.last() {
  425. Some(last_invoice) => Ok(last_invoice.pay_index),
  426. None => Ok(None),
  427. },
  428. _ => {
  429. tracing::warn!("CLN returned wrong response kind");
  430. Err(Error::WrongClnResponse)
  431. }
  432. }
  433. }
  434. }
  435. fn cln_invoice_status_to_mint_state(status: ListinvoicesInvoicesStatus) -> MintQuoteState {
  436. match status {
  437. ListinvoicesInvoicesStatus::UNPAID => MintQuoteState::Unpaid,
  438. ListinvoicesInvoicesStatus::PAID => MintQuoteState::Paid,
  439. ListinvoicesInvoicesStatus::EXPIRED => MintQuoteState::Unpaid,
  440. }
  441. }
  442. fn cln_pays_status_to_mint_state(status: ListpaysPaysStatus) -> MeltQuoteState {
  443. match status {
  444. ListpaysPaysStatus::PENDING => MeltQuoteState::Pending,
  445. ListpaysPaysStatus::COMPLETE => MeltQuoteState::Paid,
  446. ListpaysPaysStatus::FAILED => MeltQuoteState::Failed,
  447. }
  448. }
  449. async fn fetch_invoice_by_payment_hash(
  450. cln_client: &mut cln_rpc::ClnRpc,
  451. payment_hash: &str,
  452. ) -> Result<Option<ListinvoicesInvoices>, Error> {
  453. match cln_client
  454. .call(cln_rpc::Request::ListInvoices(ListinvoicesRequest {
  455. payment_hash: Some(payment_hash.to_string()),
  456. index: None,
  457. invstring: None,
  458. label: None,
  459. limit: None,
  460. offer_id: None,
  461. start: None,
  462. }))
  463. .await
  464. {
  465. Ok(cln_rpc::Response::ListInvoices(invoice_response)) => {
  466. Ok(invoice_response.invoices.first().cloned())
  467. }
  468. Ok(_) => {
  469. tracing::warn!("CLN returned an unexpected response type");
  470. Err(Error::WrongClnResponse)
  471. }
  472. Err(e) => {
  473. tracing::warn!("Error fetching invoice: {e}");
  474. Err(Error::from(e))
  475. }
  476. }
  477. }