client.rs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. use std::path::PathBuf;
  2. use std::pin::Pin;
  3. use std::sync::atomic::{AtomicBool, Ordering};
  4. use std::sync::Arc;
  5. use anyhow::anyhow;
  6. use cdk_common::payment::{
  7. CreateIncomingPaymentResponse, IncomingPaymentOptions as CdkIncomingPaymentOptions,
  8. MakePaymentResponse as CdkMakePaymentResponse, MintPayment,
  9. PaymentQuoteResponse as CdkPaymentQuoteResponse, WaitPaymentResponse,
  10. };
  11. use futures::{Stream, StreamExt};
  12. use serde_json::Value;
  13. use tokio_util::sync::CancellationToken;
  14. use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
  15. use tonic::{async_trait, Request};
  16. use tracing::instrument;
  17. use crate::proto::cdk_payment_processor_client::CdkPaymentProcessorClient;
  18. use crate::proto::{
  19. CheckIncomingPaymentRequest, CheckOutgoingPaymentRequest, CreatePaymentRequest, EmptyRequest,
  20. IncomingPaymentOptions, MakePaymentRequest, OutgoingPaymentRequestType, PaymentQuoteRequest,
  21. };
  22. /// Payment Processor
  23. #[derive(Clone)]
  24. pub struct PaymentProcessorClient {
  25. inner: CdkPaymentProcessorClient<Channel>,
  26. wait_incoming_payment_stream_is_active: Arc<AtomicBool>,
  27. cancel_incoming_payment_listener: CancellationToken,
  28. }
  29. impl PaymentProcessorClient {
  30. /// Payment Processor
  31. pub async fn new(addr: &str, port: u16, tls_dir: Option<PathBuf>) -> anyhow::Result<Self> {
  32. let addr = format!("{addr}:{port}");
  33. let channel = if let Some(tls_dir) = tls_dir {
  34. // TLS directory exists, configure TLS
  35. // Check for ca.pem
  36. let ca_pem_path = tls_dir.join("ca.pem");
  37. if !ca_pem_path.exists() {
  38. let err_msg = format!("CA certificate file not found: {}", ca_pem_path.display());
  39. tracing::error!("{}", err_msg);
  40. return Err(anyhow!(err_msg));
  41. }
  42. // Check for client.pem
  43. let client_pem_path = tls_dir.join("client.pem");
  44. // Check for client.key
  45. let client_key_path = tls_dir.join("client.key");
  46. // check for ca cert
  47. let server_root_ca_cert = std::fs::read_to_string(&ca_pem_path)?;
  48. let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
  49. let tls: ClientTlsConfig = match client_pem_path.exists() && client_key_path.exists() {
  50. true => {
  51. let client_cert = std::fs::read_to_string(&client_pem_path)?;
  52. let client_key = std::fs::read_to_string(&client_key_path)?;
  53. let client_identity = Identity::from_pem(client_cert, client_key);
  54. ClientTlsConfig::new()
  55. .ca_certificate(server_root_ca_cert)
  56. .identity(client_identity)
  57. }
  58. false => ClientTlsConfig::new().ca_certificate(server_root_ca_cert),
  59. };
  60. Channel::from_shared(addr)?
  61. .tls_config(tls)?
  62. .connect()
  63. .await?
  64. } else {
  65. // No TLS directory, skip TLS configuration
  66. Channel::from_shared(addr)?.connect().await?
  67. };
  68. let client = CdkPaymentProcessorClient::new(channel);
  69. Ok(Self {
  70. inner: client,
  71. wait_incoming_payment_stream_is_active: Arc::new(AtomicBool::new(false)),
  72. cancel_incoming_payment_listener: CancellationToken::new(),
  73. })
  74. }
  75. }
  76. #[async_trait]
  77. impl MintPayment for PaymentProcessorClient {
  78. type Err = cdk_common::payment::Error;
  79. async fn get_settings(&self) -> Result<Value, Self::Err> {
  80. let mut inner = self.inner.clone();
  81. let response = inner
  82. .get_settings(Request::new(EmptyRequest {}))
  83. .await
  84. .map_err(|err| {
  85. tracing::error!("Could not get settings: {}", err);
  86. cdk_common::payment::Error::Custom(err.to_string())
  87. })?;
  88. let settings = response.into_inner();
  89. Ok(serde_json::from_str(&settings.inner)?)
  90. }
  91. /// Create a new invoice
  92. async fn create_incoming_payment_request(
  93. &self,
  94. unit: &cdk_common::CurrencyUnit,
  95. options: CdkIncomingPaymentOptions,
  96. ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
  97. let mut inner = self.inner.clone();
  98. let proto_options = match options {
  99. CdkIncomingPaymentOptions::Bolt11(opts) => IncomingPaymentOptions {
  100. options: Some(super::incoming_payment_options::Options::Bolt11(
  101. super::Bolt11IncomingPaymentOptions {
  102. description: opts.description,
  103. amount: opts.amount.into(),
  104. unix_expiry: opts.unix_expiry,
  105. },
  106. )),
  107. },
  108. CdkIncomingPaymentOptions::Bolt12(opts) => IncomingPaymentOptions {
  109. options: Some(super::incoming_payment_options::Options::Bolt12(
  110. super::Bolt12IncomingPaymentOptions {
  111. description: opts.description,
  112. amount: opts.amount.map(Into::into),
  113. unix_expiry: opts.unix_expiry,
  114. },
  115. )),
  116. },
  117. };
  118. let response = inner
  119. .create_payment(Request::new(CreatePaymentRequest {
  120. unit: unit.to_string(),
  121. options: Some(proto_options),
  122. }))
  123. .await
  124. .map_err(|err| {
  125. tracing::error!("Could not create payment request: {}", err);
  126. cdk_common::payment::Error::Custom(err.to_string())
  127. })?;
  128. let response = response.into_inner();
  129. Ok(response.try_into().map_err(|_| {
  130. cdk_common::payment::Error::Anyhow(anyhow!("Could not create create payment response"))
  131. })?)
  132. }
  133. async fn get_payment_quote(
  134. &self,
  135. unit: &cdk_common::CurrencyUnit,
  136. options: cdk_common::payment::OutgoingPaymentOptions,
  137. ) -> Result<CdkPaymentQuoteResponse, Self::Err> {
  138. let mut inner = self.inner.clone();
  139. let request_type = match &options {
  140. cdk_common::payment::OutgoingPaymentOptions::Bolt11(_) => {
  141. OutgoingPaymentRequestType::Bolt11Invoice
  142. }
  143. cdk_common::payment::OutgoingPaymentOptions::Bolt12(_) => {
  144. OutgoingPaymentRequestType::Bolt12Offer
  145. }
  146. };
  147. let proto_request = match &options {
  148. cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.bolt11.to_string(),
  149. cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.offer.to_string(),
  150. };
  151. let proto_options = match &options {
  152. cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.melt_options,
  153. cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.melt_options,
  154. };
  155. let response = inner
  156. .get_payment_quote(Request::new(PaymentQuoteRequest {
  157. request: proto_request,
  158. unit: unit.to_string(),
  159. options: proto_options.map(Into::into),
  160. request_type: request_type.into(),
  161. }))
  162. .await
  163. .map_err(|err| {
  164. tracing::error!("Could not get payment quote: {}", err);
  165. cdk_common::payment::Error::Custom(err.to_string())
  166. })?;
  167. let response = response.into_inner();
  168. Ok(response.into())
  169. }
  170. async fn make_payment(
  171. &self,
  172. _unit: &cdk_common::CurrencyUnit,
  173. options: cdk_common::payment::OutgoingPaymentOptions,
  174. ) -> Result<CdkMakePaymentResponse, Self::Err> {
  175. let mut inner = self.inner.clone();
  176. let payment_options = match options {
  177. cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => {
  178. super::OutgoingPaymentVariant {
  179. options: Some(super::outgoing_payment_variant::Options::Bolt11(
  180. super::Bolt11OutgoingPaymentOptions {
  181. bolt11: opts.bolt11.to_string(),
  182. max_fee_amount: opts.max_fee_amount.map(Into::into),
  183. timeout_secs: opts.timeout_secs,
  184. melt_options: opts.melt_options.map(Into::into),
  185. },
  186. )),
  187. }
  188. }
  189. cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => {
  190. super::OutgoingPaymentVariant {
  191. options: Some(super::outgoing_payment_variant::Options::Bolt12(
  192. super::Bolt12OutgoingPaymentOptions {
  193. offer: opts.offer.to_string(),
  194. max_fee_amount: opts.max_fee_amount.map(Into::into),
  195. timeout_secs: opts.timeout_secs,
  196. melt_options: opts.melt_options.map(Into::into),
  197. },
  198. )),
  199. }
  200. }
  201. };
  202. let response = inner
  203. .make_payment(Request::new(MakePaymentRequest {
  204. payment_options: Some(payment_options),
  205. partial_amount: None,
  206. max_fee_amount: None,
  207. }))
  208. .await
  209. .map_err(|err| {
  210. tracing::error!("Could not pay payment request: {}", err);
  211. if err.message().contains("already paid") {
  212. cdk_common::payment::Error::InvoiceAlreadyPaid
  213. } else if err.message().contains("pending") {
  214. cdk_common::payment::Error::InvoicePaymentPending
  215. } else {
  216. cdk_common::payment::Error::Custom(err.to_string())
  217. }
  218. })?;
  219. let response = response.into_inner();
  220. Ok(response.try_into().map_err(|_err| {
  221. cdk_common::payment::Error::Anyhow(anyhow!("could not make payment"))
  222. })?)
  223. }
  224. #[instrument(skip_all)]
  225. async fn wait_payment_event(
  226. &self,
  227. ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
  228. self.wait_incoming_payment_stream_is_active
  229. .store(true, Ordering::SeqCst);
  230. tracing::debug!("Client waiting for payment");
  231. let mut inner = self.inner.clone();
  232. let stream = inner
  233. .wait_incoming_payment(EmptyRequest {})
  234. .await
  235. .map_err(|err| {
  236. tracing::error!("Could not check incoming payment stream: {}", err);
  237. cdk_common::payment::Error::Custom(err.to_string())
  238. })?
  239. .into_inner();
  240. let cancel_token = self.cancel_incoming_payment_listener.clone();
  241. let cancel_fut = cancel_token.cancelled_owned();
  242. let active_flag = self.wait_incoming_payment_stream_is_active.clone();
  243. let transformed_stream = stream
  244. .take_until(cancel_fut)
  245. .filter_map(|item| async {
  246. match item {
  247. Ok(value) => match value.try_into() {
  248. Ok(payment_response) => Some(cdk_common::payment::Event::PaymentReceived(
  249. payment_response,
  250. )),
  251. Err(e) => {
  252. tracing::error!("Error converting payment response: {}", e);
  253. None
  254. }
  255. },
  256. Err(e) => {
  257. tracing::error!("Error in payment stream: {}", e);
  258. None
  259. }
  260. }
  261. })
  262. .inspect(move |_| {
  263. active_flag.store(false, Ordering::SeqCst);
  264. tracing::info!("Payment stream inactive");
  265. });
  266. Ok(Box::pin(transformed_stream))
  267. }
  268. /// Is wait invoice active
  269. fn is_wait_invoice_active(&self) -> bool {
  270. self.wait_incoming_payment_stream_is_active
  271. .load(Ordering::SeqCst)
  272. }
  273. /// Cancel wait invoice
  274. fn cancel_wait_invoice(&self) {
  275. self.cancel_incoming_payment_listener.cancel();
  276. }
  277. async fn check_incoming_payment_status(
  278. &self,
  279. payment_identifier: &cdk_common::payment::PaymentIdentifier,
  280. ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
  281. let mut inner = self.inner.clone();
  282. let response = inner
  283. .check_incoming_payment(Request::new(CheckIncomingPaymentRequest {
  284. request_identifier: Some(payment_identifier.clone().into()),
  285. }))
  286. .await
  287. .map_err(|err| {
  288. tracing::error!("Could not check incoming payment: {}", err);
  289. cdk_common::payment::Error::Custom(err.to_string())
  290. })?;
  291. let check_incoming = response.into_inner();
  292. check_incoming
  293. .payments
  294. .into_iter()
  295. .map(|resp| resp.try_into().map_err(Self::Err::from))
  296. .collect()
  297. }
  298. async fn check_outgoing_payment(
  299. &self,
  300. payment_identifier: &cdk_common::payment::PaymentIdentifier,
  301. ) -> Result<CdkMakePaymentResponse, Self::Err> {
  302. let mut inner = self.inner.clone();
  303. let response = inner
  304. .check_outgoing_payment(Request::new(CheckOutgoingPaymentRequest {
  305. request_identifier: Some(payment_identifier.clone().into()),
  306. }))
  307. .await
  308. .map_err(|err| {
  309. tracing::error!("Could not check outgoing payment: {}", err);
  310. cdk_common::payment::Error::Custom(err.to_string())
  311. })?;
  312. let check_outgoing = response.into_inner();
  313. Ok(check_outgoing
  314. .try_into()
  315. .map_err(|_| cdk_common::payment::Error::UnknownPaymentState)?)
  316. }
  317. }