Преглед на файлове

Allow Signatory to be run with custom incoming stream (#776)

David Caseria преди 3 седмици
родител
ревизия
0a832ff161
променени са 4 файла, в които са добавени 33 реда и са изтрити 4 реда
  1. 1 0
      crates/cdk-signatory/Cargo.toml
  2. 2 2
      crates/cdk-signatory/src/bin/cli/mod.rs
  3. 3 1
      crates/cdk-signatory/src/lib.rs
  4. 27 1
      crates/cdk-signatory/src/proto/server.rs

+ 1 - 0
crates/cdk-signatory/Cargo.toml

@@ -33,6 +33,7 @@ home.workspace = true
 thiserror.workspace = true
 tracing-subscriber.workspace = true
 tokio = { workspace = true, features = ["full"] }
+tokio-stream.workspace = true
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 tokio = { workspace = true, features = ["rt", "macros", "sync", "time"] }

+ 2 - 2
crates/cdk-signatory/src/bin/cli/mod.rs

@@ -15,7 +15,7 @@ use cdk_common::database::MintKeysDatabase;
 use cdk_common::CurrencyUnit;
 #[cfg(feature = "redb")]
 use cdk_redb::MintRedbDatabase;
-use cdk_signatory::{db_signatory, grpc_server};
+use cdk_signatory::{db_signatory, start_grpc_server};
 #[cfg(feature = "sqlite")]
 use cdk_sqlite::MintSqliteDatabase;
 use clap::Parser;
@@ -167,7 +167,7 @@ pub async fn cli_main() -> Result<()> {
 
     let socket_addr = SocketAddr::from_str(&format!("{}:{}", args.listen_addr, args.listen_port))?;
 
-    grpc_server(signatory, socket_addr, certs).await?;
+    start_grpc_server(signatory, socket_addr, certs).await?;
 
     Ok(())
 }

+ 3 - 1
crates/cdk-signatory/src/lib.rs

@@ -12,7 +12,9 @@
 mod proto;
 
 #[cfg(feature = "grpc")]
-pub use proto::{client::SignatoryRpcClient, server::grpc_server};
+pub use proto::{
+    client::SignatoryRpcClient, server::start_grpc_server, server::start_grpc_server_with_incoming,
+};
 
 mod common;
 

+ 27 - 1
crates/cdk-signatory/src/proto/server.rs

@@ -1,12 +1,17 @@
+//! This module contains the generated gRPC server code for the Signatory service.
 use std::net::SocketAddr;
 use std::path::Path;
 
+use tokio::io::{AsyncRead, AsyncWrite};
+use tokio_stream::Stream;
+use tonic::transport::server::Connected;
 use tonic::transport::{Certificate, Identity, Server, ServerTlsConfig};
 use tonic::{Request, Response, Status};
 
 use crate::proto::{self, signatory_server};
 use crate::signatory::Signatory;
 
+/// The server implementation for the Signatory service.
 pub struct CdkSignatoryServer<T>
 where
     T: Signatory + Send + Sync + 'static,
@@ -130,6 +135,7 @@ where
     }
 }
 
+/// Error type for the gRPC server
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
     /// Transport error
@@ -141,7 +147,7 @@ pub enum Error {
 }
 
 /// Runs the signatory server
-pub async fn grpc_server<T, I: AsRef<Path>>(
+pub async fn start_grpc_server<T, I: AsRef<Path>>(
     signatory: T,
     addr: SocketAddr,
     tls_dir: Option<I>,
@@ -220,3 +226,23 @@ where
         .await?;
     Ok(())
 }
+
+/// Starts the gRPC signatory server with an incoming stream of connections.
+pub async fn start_grpc_server_with_incoming<T, I, IO, IE>(
+    signatory: T,
+    incoming: I,
+) -> Result<(), Error>
+where
+    T: Signatory + Send + Sync + 'static,
+    I: Stream<Item = Result<IO, IE>>,
+    IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
+    IE: Into<Box<dyn std::error::Error + Send + Sync>>,
+{
+    Server::builder()
+        .add_service(signatory_server::SignatoryServer::new(CdkSignatoryServer {
+            inner: signatory,
+        }))
+        .serve_with_incoming(incoming)
+        .await?;
+    Ok(())
+}