//! `server.rs` (11 KB): Prometheus metrics HTTP server.
  1. use std::net::SocketAddr;
  2. use std::sync::Arc;
  3. use std::time::Duration;
  4. use prometheus::{Registry, TextEncoder};
  5. use crate::metrics::METRICS;
  6. #[cfg(feature = "system-metrics")]
  7. use crate::process::SystemMetrics;
  8. /// Configuration for the Prometheus server
  9. #[derive(Debug, Clone)]
  10. pub struct PrometheusConfig {
  11. /// Address to bind the server to (default: "127.0.0.1:9090")
  12. pub bind_address: SocketAddr,
  13. /// Path to serve metrics on (default: "/metrics")
  14. pub metrics_path: String,
  15. /// Whether to include system metrics (default: true if feature enabled)
  16. #[cfg(feature = "system-metrics")]
  17. pub include_system_metrics: bool,
  18. /// How often to update system metrics in seconds (default: 15)
  19. #[cfg(feature = "system-metrics")]
  20. pub system_metrics_interval: u64,
  21. }
  22. impl Default for PrometheusConfig {
  23. fn default() -> Self {
  24. Self {
  25. bind_address: "127.0.0.1:9090".parse().expect("Invalid default address"),
  26. metrics_path: "/metrics".to_string(),
  27. #[cfg(feature = "system-metrics")]
  28. include_system_metrics: true,
  29. #[cfg(feature = "system-metrics")]
  30. system_metrics_interval: 15,
  31. }
  32. }
  33. }
  34. /// Prometheus metrics server
  35. #[derive(Debug)]
  36. pub struct PrometheusServer {
  37. config: PrometheusConfig,
  38. registry: Arc<Registry>,
  39. #[cfg(feature = "system-metrics")]
  40. system_metrics: Option<SystemMetrics>,
  41. }
  42. impl PrometheusServer {
  43. /// Create a new Prometheus server with CDK metrics
  44. ///
  45. /// # Errors
  46. /// Returns an error if system metrics cannot be created (when enabled)
  47. pub fn new(config: PrometheusConfig) -> crate::Result<Self> {
  48. let registry = METRICS.registry();
  49. #[cfg(feature = "system-metrics")]
  50. let system_metrics = if config.include_system_metrics {
  51. let sys_metrics = SystemMetrics::new()?;
  52. Some(sys_metrics)
  53. } else {
  54. None
  55. };
  56. Ok(Self {
  57. config,
  58. registry,
  59. #[cfg(feature = "system-metrics")]
  60. system_metrics,
  61. })
  62. }
  63. /// Create a new Prometheus server with custom registry
  64. #[must_use]
  65. pub const fn with_registry(config: PrometheusConfig, registry: Arc<Registry>) -> Self {
  66. Self {
  67. config,
  68. registry,
  69. #[cfg(feature = "system-metrics")]
  70. system_metrics: None,
  71. }
  72. }
  73. /// Create a metrics handler function that gathers and encodes metrics
  74. fn create_metrics_handler(
  75. registry: Arc<Registry>,
  76. #[cfg(feature = "system-metrics")] system_metrics: Option<SystemMetrics>,
  77. ) -> impl Fn() -> String {
  78. move || {
  79. let encoder = TextEncoder::new();
  80. // Collect metrics from our registry
  81. #[cfg(feature = "system-metrics")]
  82. let mut metric_families = registry.gather();
  83. #[cfg(not(feature = "system-metrics"))]
  84. let metric_families = registry.gather();
  85. // Add system metrics if available
  86. #[cfg(feature = "system-metrics")]
  87. if let Some(ref sys_metrics) = system_metrics {
  88. // Update system metrics before collection
  89. if let Err(e) = sys_metrics.update_metrics() {
  90. tracing::warn!("Failed to update system metrics: {e}");
  91. }
  92. let sys_registry = sys_metrics.registry();
  93. let mut sys_families = sys_registry.gather();
  94. metric_families.append(&mut sys_families);
  95. }
  96. // Encode metrics to string
  97. encoder
  98. .encode_to_string(&metric_families)
  99. .unwrap_or_else(|e| {
  100. tracing::error!("Failed to encode metrics: {e}");
  101. format!("Failed to encode metrics: {e}")
  102. })
  103. }
  104. }
  105. /// Start the Prometheus HTTP server
  106. ///
  107. /// # Errors
  108. /// This function always returns Ok as errors are handled internally
  109. pub async fn start(
  110. self,
  111. shutdown_signal: impl std::future::Future<Output = ()> + Send + 'static,
  112. ) -> crate::Result<()> {
  113. // Create and start the exporter
  114. let binding = self.config.bind_address;
  115. let registry_clone = Arc::<Registry>::clone(&self.registry);
  116. // Create a handler that exposes our registry
  117. #[cfg(feature = "system-metrics")]
  118. let metrics_handler =
  119. Self::create_metrics_handler(registry_clone, self.system_metrics.clone());
  120. #[cfg(not(feature = "system-metrics"))]
  121. let metrics_handler = Self::create_metrics_handler(registry_clone);
  122. // Start the exporter in a background task
  123. let path = self.config.metrics_path.clone();
  124. // Create a channel for signaling the server task to shutdown
  125. let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
  126. // Spawn the server task
  127. let server_handle = tokio::spawn(async move {
  128. // We're using a simple HTTP server to expose our metrics
  129. use std::io::{Read, Write};
  130. use std::net::TcpListener;
  131. // Create a TCP listener
  132. let listener = match TcpListener::bind(binding) {
  133. Ok(listener) => {
  134. // Set non-blocking mode to allow for shutdown checking
  135. if let Err(e) = listener.set_nonblocking(true) {
  136. tracing::error!("Failed to set non-blocking mode: {e}");
  137. return;
  138. }
  139. listener
  140. }
  141. Err(e) => {
  142. tracing::error!("Failed to bind TCP listener: {e}");
  143. return;
  144. }
  145. };
  146. tracing::info!("Started Prometheus server on {} at path {}", binding, path);
  147. // Accept connections with shutdown signal handling
  148. loop {
  149. // Check for shutdown signal
  150. if shutdown_rx.try_recv().is_ok() {
  151. tracing::info!("Shutdown signal received, stopping Prometheus server");
  152. break;
  153. }
  154. // Try to accept a connection (non-blocking)
  155. match listener.accept() {
  156. Ok((mut stream, _)) => {
  157. // Handle the connection
  158. let mut buffer = [0; 1024];
  159. match stream.read(&mut buffer) {
  160. Ok(0) => {}
  161. Ok(bytes_read) => {
  162. // Convert the buffer to a string
  163. let request = String::from_utf8_lossy(&buffer[..bytes_read]);
  164. // Check if the request is for our metrics path
  165. if request.contains(&format!("GET {path} HTTP")) {
  166. // Get the metrics
  167. let metrics = metrics_handler();
  168. // Write the response
  169. let response = format!(
  170. "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: {}\r\n\r\n{}",
  171. metrics.len(),
  172. metrics
  173. );
  174. if let Err(e) = stream.write_all(response.as_bytes()) {
  175. tracing::error!("Failed to write response: {e}");
  176. }
  177. } else {
  178. // Write a 404 response
  179. let response = "HTTP/1.1 404 Not Found\r\nContent-Type: text/plain\r\nContent-Length: 9\r\n\r\nNot Found";
  180. if let Err(e) = stream.write_all(response.as_bytes()) {
  181. tracing::error!("Failed to write response: {e}");
  182. }
  183. }
  184. }
  185. Err(e) => {
  186. tracing::error!("Failed to read from stream: {e}");
  187. }
  188. }
  189. }
  190. Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
  191. // No connection available, continue the loop
  192. tokio::time::sleep(Duration::from_millis(10)).await;
  193. }
  194. Err(e) => {
  195. tracing::error!("Failed to accept connection: {e}");
  196. // Add a small delay to prevent busy looping on persistent errors
  197. tokio::time::sleep(Duration::from_millis(100)).await;
  198. }
  199. }
  200. }
  201. tracing::info!("Prometheus server stopped");
  202. });
  203. // Wait for the shutdown signal
  204. shutdown_signal.await;
  205. // Signal the server to shutdown
  206. let _ = shutdown_tx.send(());
  207. // Wait for the server task to complete (with a timeout)
  208. match tokio::time::timeout(Duration::from_secs(5), server_handle).await {
  209. Ok(result) => {
  210. if let Err(e) = result {
  211. tracing::error!("Server task failed: {e}");
  212. }
  213. }
  214. Err(_) => {
  215. tracing::warn!("Server shutdown timed out after 5 seconds");
  216. }
  217. }
  218. Ok(())
  219. }
  220. }
  221. /// Builder for easy Prometheus server setup
  222. #[derive(Debug)]
  223. pub struct PrometheusBuilder {
  224. config: PrometheusConfig,
  225. }
  226. impl PrometheusBuilder {
  227. /// Create a new builder with default configuration
  228. #[must_use]
  229. pub fn new() -> Self {
  230. Self {
  231. config: PrometheusConfig::default(),
  232. }
  233. }
  234. /// Set the bind address
  235. #[must_use]
  236. pub const fn bind_address(mut self, addr: SocketAddr) -> Self {
  237. self.config.bind_address = addr;
  238. self
  239. }
  240. /// Set the metrics path
  241. #[must_use]
  242. pub fn metrics_path<S: Into<String>>(mut self, path: S) -> Self {
  243. self.config.metrics_path = path.into();
  244. self
  245. }
  246. /// Enable or disable system metrics
  247. #[cfg(feature = "system-metrics")]
  248. #[must_use]
  249. pub const fn system_metrics(mut self, enabled: bool) -> Self {
  250. self.config.include_system_metrics = enabled;
  251. self
  252. }
  253. /// Set system metrics update interval
  254. #[cfg(feature = "system-metrics")]
  255. #[must_use]
  256. pub const fn system_metrics_interval(mut self, seconds: u64) -> Self {
  257. self.config.system_metrics_interval = seconds;
  258. self
  259. }
  260. /// Build the server with specific CDK metrics instance
  261. ///
  262. /// # Errors
  263. /// Returns an error if system metrics cannot be created (when enabled)
  264. pub fn build_with_cdk_metrics(self) -> crate::Result<PrometheusServer> {
  265. PrometheusServer::new(self.config)
  266. }
  267. /// Build the server with custom registry
  268. #[must_use]
  269. pub fn build_with_registry(self, registry: Arc<Registry>) -> PrometheusServer {
  270. PrometheusServer::with_registry(self.config, registry)
  271. }
  272. }
  273. impl Default for PrometheusBuilder {
  274. fn default() -> Self {
  275. Self::new()
  276. }
  277. }