embedded.rs 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. //! Run a Signatory in an embedded environment, inside a CDK instance. This wrapper makes sure to
  2. //! run the Signatory in its own Tokio task, isolated from the main CDK, communicating through messages.
  3. use std::sync::Arc;
  4. use cdk_common::{BlindSignature, BlindedMessage, Error, Proof};
  5. use tokio::sync::{mpsc, oneshot};
  6. use tokio::task::JoinHandle;
  7. use crate::signatory::{RotateKeyArguments, Signatory, SignatoryKeySet, SignatoryKeysets};
  8. enum Request {
  9. BlindSign(
  10. (
  11. Vec<BlindedMessage>,
  12. oneshot::Sender<Result<Vec<BlindSignature>, Error>>,
  13. ),
  14. ),
  15. VerifyProof((Vec<Proof>, oneshot::Sender<Result<(), Error>>)),
  16. Keysets(oneshot::Sender<Result<SignatoryKeysets, Error>>),
  17. RotateKeyset(
  18. (
  19. RotateKeyArguments,
  20. oneshot::Sender<Result<SignatoryKeySet, Error>>,
  21. ),
  22. ),
  23. }
  24. /// Creates a service-like to wrap an implementation of the Signatory
  25. ///
  26. /// This implements the actor model, ensuring the Signatory and their private key is moved from the
  27. /// main thread to their own tokio task, and communicates with the main program by passing messages,
  28. /// an extra layer of security to move the keys to another layer.
  29. pub struct Service {
  30. pipeline: mpsc::Sender<Request>,
  31. runner: Option<JoinHandle<()>>,
  32. }
  33. impl Drop for Service {
  34. fn drop(&mut self) {
  35. if let Some(runner) = self.runner.take() {
  36. runner.abort();
  37. }
  38. }
  39. }
  40. impl Service {
  41. /// Takes a signatory and spawns it into a Tokio task, isolating its implementation with the
  42. /// main thread, communicating with it through messages
  43. pub fn new(handler: Arc<dyn Signatory + Send + Sync>) -> Self {
  44. let (tx, rx) = mpsc::channel(10_000);
  45. let runner = Some(tokio::spawn(Self::runner(rx, handler)));
  46. Self {
  47. pipeline: tx,
  48. runner,
  49. }
  50. }
  51. #[tracing::instrument(skip_all)]
  52. async fn runner(
  53. mut receiver: mpsc::Receiver<Request>,
  54. handler: Arc<dyn Signatory + Send + Sync>,
  55. ) {
  56. while let Some(request) = receiver.recv().await {
  57. match request {
  58. Request::BlindSign((blinded_message, response)) => {
  59. let output = handler.blind_sign(blinded_message).await;
  60. if let Err(err) = response.send(output) {
  61. tracing::error!("Error sending response: {:?}", err);
  62. }
  63. }
  64. Request::VerifyProof((proof, response)) => {
  65. let output = handler.verify_proofs(proof).await;
  66. if let Err(err) = response.send(output) {
  67. tracing::error!("Error sending response: {:?}", err);
  68. }
  69. }
  70. Request::Keysets(response) => {
  71. let output = handler.keysets().await;
  72. if let Err(err) = response.send(output) {
  73. tracing::error!("Error sending response: {:?}", err);
  74. }
  75. }
  76. Request::RotateKeyset((args, response)) => {
  77. let output = handler.rotate_keyset(args).await;
  78. if let Err(err) = response.send(output) {
  79. tracing::error!("Error sending response: {:?}", err);
  80. }
  81. }
  82. }
  83. }
  84. }
  85. }
  86. #[async_trait::async_trait]
  87. impl Signatory for Service {
  88. fn name(&self) -> String {
  89. "Embedded".to_owned()
  90. }
  91. #[tracing::instrument(skip_all)]
  92. async fn blind_sign(
  93. &self,
  94. blinded_messages: Vec<BlindedMessage>,
  95. ) -> Result<Vec<BlindSignature>, Error> {
  96. let (tx, rx) = oneshot::channel();
  97. self.pipeline
  98. .send(Request::BlindSign((blinded_messages, tx)))
  99. .await
  100. .map_err(|e| Error::SendError(e.to_string()))?;
  101. rx.await.map_err(|e| Error::RecvError(e.to_string()))?
  102. }
  103. #[tracing::instrument(skip_all)]
  104. async fn verify_proofs(&self, proofs: Vec<Proof>) -> Result<(), Error> {
  105. let (tx, rx) = oneshot::channel();
  106. self.pipeline
  107. .send(Request::VerifyProof((proofs, tx)))
  108. .await
  109. .map_err(|e| Error::SendError(e.to_string()))?;
  110. rx.await.map_err(|e| Error::RecvError(e.to_string()))?
  111. }
  112. #[tracing::instrument(skip_all)]
  113. async fn keysets(&self) -> Result<SignatoryKeysets, Error> {
  114. let (tx, rx) = oneshot::channel();
  115. self.pipeline
  116. .send(Request::Keysets(tx))
  117. .await
  118. .map_err(|e| Error::SendError(e.to_string()))?;
  119. rx.await.map_err(|e| Error::RecvError(e.to_string()))?
  120. }
  121. #[tracing::instrument(skip(self))]
  122. async fn rotate_keyset(&self, args: RotateKeyArguments) -> Result<SignatoryKeySet, Error> {
  123. let (tx, rx) = oneshot::channel();
  124. self.pipeline
  125. .send(Request::RotateKeyset((args, tx)))
  126. .await
  127. .map_err(|e| Error::SendError(e.to_string()))?;
  128. rx.await.map_err(|e| Error::RecvError(e.to_string()))?
  129. }
  130. }