client.rs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. use std::path::PathBuf;
  2. use std::pin::Pin;
  3. use std::sync::atomic::{AtomicBool, Ordering};
  4. use std::sync::Arc;
  5. use anyhow::anyhow;
  6. use cdk_common::payment::{
  7. CreateIncomingPaymentResponse, IncomingPaymentOptions as CdkIncomingPaymentOptions,
  8. MakePaymentResponse as CdkMakePaymentResponse, MintPayment,
  9. PaymentQuoteResponse as CdkPaymentQuoteResponse, WaitPaymentResponse,
  10. };
  11. use futures::{Stream, StreamExt};
  12. use serde_json::Value;
  13. use tokio_util::sync::CancellationToken;
  14. use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
  15. use tonic::{async_trait, Request};
  16. use tracing::instrument;
  17. use crate::proto::cdk_payment_processor_client::CdkPaymentProcessorClient;
  18. use crate::proto::{
  19. CheckIncomingPaymentRequest, CheckOutgoingPaymentRequest, CreatePaymentRequest, EmptyRequest,
  20. IncomingPaymentOptions, MakePaymentRequest, OutgoingPaymentRequestType, PaymentQuoteRequest,
  21. };
  22. /// Payment Processor
  23. #[derive(Clone)]
  24. pub struct PaymentProcessorClient {
  25. inner: CdkPaymentProcessorClient<Channel>,
  26. wait_incoming_payment_stream_is_active: Arc<AtomicBool>,
  27. cancel_incoming_payment_listener: CancellationToken,
  28. }
  29. impl PaymentProcessorClient {
  30. /// Payment Processor
  31. pub async fn new(addr: &str, port: u16, tls_dir: Option<PathBuf>) -> anyhow::Result<Self> {
  32. let addr = format!("{addr}:{port}");
  33. let channel = if let Some(tls_dir) = tls_dir {
  34. // TLS directory exists, configure TLS
  35. // Check for ca.pem
  36. let ca_pem_path = tls_dir.join("ca.pem");
  37. if !ca_pem_path.exists() {
  38. let err_msg = format!("CA certificate file not found: {}", ca_pem_path.display());
  39. tracing::error!("{}", err_msg);
  40. return Err(anyhow!(err_msg));
  41. }
  42. // Check for client.pem
  43. let client_pem_path = tls_dir.join("client.pem");
  44. if !client_pem_path.exists() {
  45. let err_msg = format!(
  46. "Client certificate file not found: {}",
  47. client_pem_path.display()
  48. );
  49. tracing::error!("{}", err_msg);
  50. return Err(anyhow!(err_msg));
  51. }
  52. // Check for client.key
  53. let client_key_path = tls_dir.join("client.key");
  54. if !client_key_path.exists() {
  55. let err_msg = format!("Client key file not found: {}", client_key_path.display());
  56. tracing::error!("{}", err_msg);
  57. return Err(anyhow!(err_msg));
  58. }
  59. let server_root_ca_cert = std::fs::read_to_string(&ca_pem_path)?;
  60. let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
  61. let client_cert = std::fs::read_to_string(&client_pem_path)?;
  62. let client_key = std::fs::read_to_string(&client_key_path)?;
  63. let client_identity = Identity::from_pem(client_cert, client_key);
  64. let tls = ClientTlsConfig::new()
  65. .ca_certificate(server_root_ca_cert)
  66. .identity(client_identity);
  67. Channel::from_shared(addr)?
  68. .tls_config(tls)?
  69. .connect()
  70. .await?
  71. } else {
  72. // No TLS directory, skip TLS configuration
  73. Channel::from_shared(addr)?.connect().await?
  74. };
  75. let client = CdkPaymentProcessorClient::new(channel);
  76. Ok(Self {
  77. inner: client,
  78. wait_incoming_payment_stream_is_active: Arc::new(AtomicBool::new(false)),
  79. cancel_incoming_payment_listener: CancellationToken::new(),
  80. })
  81. }
  82. }
  83. #[async_trait]
  84. impl MintPayment for PaymentProcessorClient {
  85. type Err = cdk_common::payment::Error;
  86. async fn get_settings(&self) -> Result<Value, Self::Err> {
  87. let mut inner = self.inner.clone();
  88. let response = inner
  89. .get_settings(Request::new(EmptyRequest {}))
  90. .await
  91. .map_err(|err| {
  92. tracing::error!("Could not get settings: {}", err);
  93. cdk_common::payment::Error::Custom(err.to_string())
  94. })?;
  95. let settings = response.into_inner();
  96. Ok(serde_json::from_str(&settings.inner)?)
  97. }
  98. /// Create a new invoice
  99. async fn create_incoming_payment_request(
  100. &self,
  101. unit: &cdk_common::CurrencyUnit,
  102. options: CdkIncomingPaymentOptions,
  103. ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
  104. let mut inner = self.inner.clone();
  105. let proto_options = match options {
  106. CdkIncomingPaymentOptions::Bolt11(opts) => IncomingPaymentOptions {
  107. options: Some(super::incoming_payment_options::Options::Bolt11(
  108. super::Bolt11IncomingPaymentOptions {
  109. description: opts.description,
  110. amount: opts.amount.into(),
  111. unix_expiry: opts.unix_expiry,
  112. },
  113. )),
  114. },
  115. CdkIncomingPaymentOptions::Bolt12(opts) => IncomingPaymentOptions {
  116. options: Some(super::incoming_payment_options::Options::Bolt12(
  117. super::Bolt12IncomingPaymentOptions {
  118. description: opts.description,
  119. amount: opts.amount.map(Into::into),
  120. unix_expiry: opts.unix_expiry,
  121. },
  122. )),
  123. },
  124. };
  125. let response = inner
  126. .create_payment(Request::new(CreatePaymentRequest {
  127. unit: unit.to_string(),
  128. options: Some(proto_options),
  129. }))
  130. .await
  131. .map_err(|err| {
  132. tracing::error!("Could not create payment request: {}", err);
  133. cdk_common::payment::Error::Custom(err.to_string())
  134. })?;
  135. let response = response.into_inner();
  136. Ok(response.try_into().map_err(|_| {
  137. cdk_common::payment::Error::Anyhow(anyhow!("Could not create create payment response"))
  138. })?)
  139. }
  140. async fn get_payment_quote(
  141. &self,
  142. unit: &cdk_common::CurrencyUnit,
  143. options: cdk_common::payment::OutgoingPaymentOptions,
  144. ) -> Result<CdkPaymentQuoteResponse, Self::Err> {
  145. let mut inner = self.inner.clone();
  146. let request_type = match &options {
  147. cdk_common::payment::OutgoingPaymentOptions::Bolt11(_) => {
  148. OutgoingPaymentRequestType::Bolt11Invoice
  149. }
  150. cdk_common::payment::OutgoingPaymentOptions::Bolt12(_) => {
  151. OutgoingPaymentRequestType::Bolt12Offer
  152. }
  153. };
  154. let proto_request = match &options {
  155. cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.bolt11.to_string(),
  156. cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.offer.to_string(),
  157. };
  158. let proto_options = match &options {
  159. cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.melt_options,
  160. cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.melt_options,
  161. };
  162. let response = inner
  163. .get_payment_quote(Request::new(PaymentQuoteRequest {
  164. request: proto_request,
  165. unit: unit.to_string(),
  166. options: proto_options.map(Into::into),
  167. request_type: request_type.into(),
  168. }))
  169. .await
  170. .map_err(|err| {
  171. tracing::error!("Could not get payment quote: {}", err);
  172. cdk_common::payment::Error::Custom(err.to_string())
  173. })?;
  174. let response = response.into_inner();
  175. Ok(response.into())
  176. }
  177. async fn make_payment(
  178. &self,
  179. _unit: &cdk_common::CurrencyUnit,
  180. options: cdk_common::payment::OutgoingPaymentOptions,
  181. ) -> Result<CdkMakePaymentResponse, Self::Err> {
  182. let mut inner = self.inner.clone();
  183. let payment_options = match options {
  184. cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => {
  185. super::OutgoingPaymentVariant {
  186. options: Some(super::outgoing_payment_variant::Options::Bolt11(
  187. super::Bolt11OutgoingPaymentOptions {
  188. bolt11: opts.bolt11.to_string(),
  189. max_fee_amount: opts.max_fee_amount.map(Into::into),
  190. timeout_secs: opts.timeout_secs,
  191. melt_options: opts.melt_options.map(Into::into),
  192. },
  193. )),
  194. }
  195. }
  196. cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => {
  197. super::OutgoingPaymentVariant {
  198. options: Some(super::outgoing_payment_variant::Options::Bolt12(
  199. super::Bolt12OutgoingPaymentOptions {
  200. offer: opts.offer.to_string(),
  201. max_fee_amount: opts.max_fee_amount.map(Into::into),
  202. timeout_secs: opts.timeout_secs,
  203. invoice: opts.invoice,
  204. melt_options: opts.melt_options.map(Into::into),
  205. },
  206. )),
  207. }
  208. }
  209. };
  210. let response = inner
  211. .make_payment(Request::new(MakePaymentRequest {
  212. payment_options: Some(payment_options),
  213. partial_amount: None,
  214. max_fee_amount: None,
  215. }))
  216. .await
  217. .map_err(|err| {
  218. tracing::error!("Could not pay payment request: {}", err);
  219. if err.message().contains("already paid") {
  220. cdk_common::payment::Error::InvoiceAlreadyPaid
  221. } else if err.message().contains("pending") {
  222. cdk_common::payment::Error::InvoicePaymentPending
  223. } else {
  224. cdk_common::payment::Error::Custom(err.to_string())
  225. }
  226. })?;
  227. let response = response.into_inner();
  228. Ok(response.try_into().map_err(|_err| {
  229. cdk_common::payment::Error::Anyhow(anyhow!("could not make payment"))
  230. })?)
  231. }
  232. #[instrument(skip_all)]
  233. async fn wait_any_incoming_payment(
  234. &self,
  235. ) -> Result<Pin<Box<dyn Stream<Item = WaitPaymentResponse> + Send>>, Self::Err> {
  236. self.wait_incoming_payment_stream_is_active
  237. .store(true, Ordering::SeqCst);
  238. tracing::debug!("Client waiting for payment");
  239. let mut inner = self.inner.clone();
  240. let stream = inner
  241. .wait_incoming_payment(EmptyRequest {})
  242. .await
  243. .map_err(|err| {
  244. tracing::error!("Could not check incoming payment stream: {}", err);
  245. cdk_common::payment::Error::Custom(err.to_string())
  246. })?
  247. .into_inner();
  248. let cancel_token = self.cancel_incoming_payment_listener.clone();
  249. let cancel_fut = cancel_token.cancelled_owned();
  250. let active_flag = self.wait_incoming_payment_stream_is_active.clone();
  251. let transformed_stream = stream
  252. .take_until(cancel_fut)
  253. .filter_map(|item| async {
  254. match item {
  255. Ok(value) => match value.try_into() {
  256. Ok(payment_response) => Some(payment_response),
  257. Err(e) => {
  258. tracing::error!("Error converting payment response: {}", e);
  259. None
  260. }
  261. },
  262. Err(e) => {
  263. tracing::error!("Error in payment stream: {}", e);
  264. None
  265. }
  266. }
  267. })
  268. .inspect(move |_| {
  269. active_flag.store(false, Ordering::SeqCst);
  270. tracing::info!("Payment stream inactive");
  271. });
  272. Ok(Box::pin(transformed_stream))
  273. }
  274. /// Is wait invoice active
  275. fn is_wait_invoice_active(&self) -> bool {
  276. self.wait_incoming_payment_stream_is_active
  277. .load(Ordering::SeqCst)
  278. }
  279. /// Cancel wait invoice
  280. fn cancel_wait_invoice(&self) {
  281. self.cancel_incoming_payment_listener.cancel();
  282. }
  283. async fn check_incoming_payment_status(
  284. &self,
  285. payment_identifier: &cdk_common::payment::PaymentIdentifier,
  286. ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
  287. let mut inner = self.inner.clone();
  288. let response = inner
  289. .check_incoming_payment(Request::new(CheckIncomingPaymentRequest {
  290. request_identifier: Some(payment_identifier.clone().into()),
  291. }))
  292. .await
  293. .map_err(|err| {
  294. tracing::error!("Could not check incoming payment: {}", err);
  295. cdk_common::payment::Error::Custom(err.to_string())
  296. })?;
  297. let check_incoming = response.into_inner();
  298. check_incoming
  299. .payments
  300. .into_iter()
  301. .map(|resp| resp.try_into().map_err(Self::Err::from))
  302. .collect()
  303. }
  304. async fn check_outgoing_payment(
  305. &self,
  306. payment_identifier: &cdk_common::payment::PaymentIdentifier,
  307. ) -> Result<CdkMakePaymentResponse, Self::Err> {
  308. let mut inner = self.inner.clone();
  309. let response = inner
  310. .check_outgoing_payment(Request::new(CheckOutgoingPaymentRequest {
  311. request_identifier: Some(payment_identifier.clone().into()),
  312. }))
  313. .await
  314. .map_err(|err| {
  315. tracing::error!("Could not check outgoing payment: {}", err);
  316. cdk_common::payment::Error::Custom(err.to_string())
  317. })?;
  318. let check_outgoing = response.into_inner();
  319. Ok(check_outgoing
  320. .try_into()
  321. .map_err(|_| cdk_common::payment::Error::UnknownPaymentState)?)
  322. }
  323. }