1
0
Эх сурвалжийг харах

Add metrics to all commands (#16)

* Added metrics to all commands.

The metrics gathering is using metered-rs library.

* Added a simple webserver to dump the prometheus metrics

TODO: Make this server configurable
César D. Rodas 3 жил өмнө
parent
commit
d95e8486fc
8 өөрчлөгдсөн 168 нэмэгдсэн , 14 устгасан
  1. 4 0
      Cargo.toml
  2. 8 4
      src/cmd/key.rs
  3. 28 0
      src/cmd/metrics.rs
  4. 1 0
      src/cmd/mod.rs
  5. 11 0
      src/dispatcher.rs
  6. 2 0
      src/error.rs
  7. 70 10
      src/macros.rs
  8. 44 0
      src/server.rs

+ 4 - 0
Cargo.toml

@@ -18,6 +18,10 @@ tokio-stream="0.1"
 seahash = "4"
 log="0.4"
 glob="^0.2"
+metered="^0.4"
+serde="^1.0"
+serde_json = "^1.0"
+serde_prometheus="^0.1"
 env_logger = "0.8.4"
 bytes = "1"
 rand = "0.8.0"

+ 8 - 4
src/cmd/key.rs

@@ -143,10 +143,14 @@ mod test {
             Ok(Value::Integer(1)),
             run_command(&c, &["pexpire", "foo", "6000"]).await
         );
-        assert_eq!(
-            Ok(Value::Integer(5999)),
-            run_command(&c, &["pttl", "foo"]).await
-        );
+
+        match run_command(&c, &["pttl", "foo"]).await {
+            Ok(Value::Integer(n)) => {
+                assert!(n < 6000 && n > 5900);
+            }
+            _ => unreachable!(),
+        };
+
         assert_eq!(
             Ok(Value::Integer(1)),
             run_command(&c, &["persist", "foo"]).await

+ 28 - 0
src/cmd/metrics.rs

@@ -0,0 +1,28 @@
+use crate::{connection::Connection, error::Error, value::Value};
+use bytes::Bytes;
+
+pub async fn metrics(conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+    let dispatcher = conn.all_connections().get_dispatcher();
+    let mut result: Vec<Value> = vec![];
+    let commands = if args.len() == 1 {
+        dispatcher.get_all_commands()
+    } else {
+        let mut commands = vec![];
+        for command in &(args[1..]) {
+            let command = String::from_utf8_lossy(command);
+            commands.push(dispatcher.get_handler_for_command(&command)?);
+        }
+        commands
+    };
+
+    for command in commands.iter() {
+        result.push(command.name().into());
+        result.push(
+            serde_json::to_string(command.metrics())
+                .map_err(|_| Error::Internal)?
+                .into(),
+        );
+    }
+
+    Ok(result.into())
+}

+ 1 - 0
src/cmd/mod.rs

@@ -4,6 +4,7 @@ pub mod key;
 pub mod list;
 pub mod pubsub;
 pub mod set;
+pub mod metrics;
 pub mod string;
 pub mod transaction;
 

+ 11 - 0
src/dispatcher.rs

@@ -166,6 +166,17 @@ dispatcher! {
             true,
         },
     },
+    metrics {
+        metrics {
+            cmd::metrics::metrics,
+            [""],
+            -1,
+            0,
+            0,
+            0,
+            false,
+        },
+    },
     list {
         blpop {
             cmd::list::blpop,

+ 2 - 0
src/error.rs

@@ -5,6 +5,7 @@ pub enum Error {
     CommandNotFound(String),
     InvalidArgsCount(String),
     InvalidPattern(String),
+    Internal,
     Protocol(String, String),
     WrongArgument(String, String),
     NotFound,
@@ -30,6 +31,7 @@ impl From<Error> for Value {
             Error::CommandNotFound(x) => format!("unknown command `{}`", x),
             Error::InvalidArgsCount(x) => format!("wrong number of arguments for '{}' command", x),
             Error::InvalidPattern(x) => format!("'{}' is not a valid pattern", x),
+            Error::Internal => "internal error".to_owned(),
             Error::Protocol(x, y) => format!("Protocol error: expected '{}', got '{}'", x, y),
             Error::NotInTx => " without MULTI".to_owned(),
             Error::NotANumber => "value is not an integer or out of range".to_owned(),

+ 70 - 10
src/macros.rs

@@ -18,6 +18,7 @@ macro_rules! dispatcher {
             pub mod $command {
                 use super::*;
                 use async_trait::async_trait;
+                use metered::measure;
 
                 #[derive(Debug)]
                 pub struct Command {
@@ -26,6 +27,7 @@ macro_rules! dispatcher {
                     pub key_start: i32,
                     pub key_stop: i32,
                     pub key_step: usize,
+                    pub metrics: Metrics,
                 }
 
                 impl Command {
@@ -36,6 +38,7 @@ macro_rules! dispatcher {
                             key_start: $key_start,
                             key_stop: $key_stop,
                             key_step: $key_step,
+                            metrics: Metrics::default(),
                         }
                     }
                 }
@@ -43,16 +46,35 @@ macro_rules! dispatcher {
                 #[async_trait]
                 impl ExecutableCommand for Command {
                     async fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error> {
+                        let metrics = self.metrics();
+                        let hit_count = &metrics.hit_count;
+                        let error_count = &metrics.error_count;
+                        let in_flight = &metrics.in_flight;
+                        let response_time = &metrics.response_time;
+                        let throughput = &metrics.throughput;
+
                         let status = conn.status();
                         if status == ConnectionStatus::Multi && self.is_queueable() {
                             conn.queue_command(args);
                             conn.tx_keys(self.get_keys(args));
-                            Ok(Value::Queued)
+                            return Ok(Value::Queued);
                         } else if status == ConnectionStatus::Pubsub && ! self.is_pubsub_executable() {
-                            Err(Error::PubsubOnly(stringify!($command).to_owned()))
-                        } else {
-                            $handler(conn, args).await
+                            return Err(Error::PubsubOnly(stringify!($command).to_owned()));
                         }
+
+                        measure!(hit_count, {
+                            measure!(response_time, {
+                                measure!(throughput, {
+                                    measure!(in_flight, {
+                                        measure!(error_count, $handler(conn, args).await)
+                                    })
+                                })
+                            })
+                        })
+                    }
+
+                    fn metrics(&self) -> &Metrics {
+                        &self.metrics
                     }
 
                     fn is_pubsub_executable(&self) -> bool {
@@ -105,11 +127,14 @@ macro_rules! dispatcher {
         )+)+
 
         use async_trait::async_trait;
+        use metered::{Throughput, HitCount, ErrorCount, InFlight, ResponseTime};
 
         #[async_trait]
         pub trait ExecutableCommand {
             async fn execute(&self, conn: &Connection, args: &[Bytes]) -> Result<Value, Error>;
 
+            fn metrics(&self) -> &Metrics;
+
             fn is_queueable(&self) -> bool;
 
             fn is_pubsub_executable(&self) -> bool;
@@ -123,6 +148,22 @@ macro_rules! dispatcher {
             fn name(&self) -> &'static str;
         }
 
+        #[derive(Debug, Default, serde::Serialize)]
+        pub struct Metrics {
+            hit_count: HitCount,
+            error_count: ErrorCount,
+            in_flight: InFlight,
+            response_time: ResponseTime,
+            throughput: Throughput,
+        }
+
+        #[derive(serde::Serialize)]
+        pub struct ServiceMetricRegistry<'a> {
+            $($(
+            $command: &'a Metrics,
+            )+)+
+        }
+
         #[allow(non_snake_case, non_camel_case_types)]
         #[derive(Debug)]
         pub struct Dispatcher {
@@ -139,16 +180,35 @@ macro_rules! dispatcher {
                     )+)+
                 }
             }
-            pub fn get_handler(&self, args: &[Bytes]) -> Result<&(dyn ExecutableCommand + Send + Sync + 'static), Error> {
-                let command = String::from_utf8_lossy(&args[0]).to_lowercase();
 
-                let command: &(dyn ExecutableCommand + Send + Sync + 'static) = match command.as_str() {
+            pub fn get_service_metric_registry(&self) -> ServiceMetricRegistry {
+                ServiceMetricRegistry {
+                    $($(
+                        $command: self.$command.metrics(),
+                    )+)+
+                }
+            }
+
+            pub fn get_all_commands(&self) -> Vec<&(dyn ExecutableCommand + Send + Sync + 'static)> {
+                vec![
                 $($(
-                    stringify!($command) => &self.$command,
+                    &self.$command,
                 )+)+
-                    _ => return Err(Error::CommandNotFound(command.into())),
-                };
+                ]
+            }
 
+            pub fn get_handler_for_command(&self, command: &str) -> Result<&(dyn ExecutableCommand + Send + Sync + 'static), Error> {
+                match command {
+                $($(
+                    stringify!($command) => Ok(&self.$command),
+                )+)+
+                    _ => Err(Error::CommandNotFound(command.into())),
+                }
+            }
+
+            pub fn get_handler(&self, args: &[Bytes]) -> Result<&(dyn ExecutableCommand + Send + Sync + 'static), Error> {
+                let command = String::from_utf8_lossy(&args[0]).to_lowercase();
+                let command = self.get_handler_for_command(&command)?;
                 if ! command.check_number_args(args.len()) {
                     Err(Error::InvalidArgsCount(command.name().into()))
                 } else {

+ 44 - 0
src/server.rs

@@ -9,6 +9,7 @@ use log::{info, trace, warn};
 use redis_zero_protocol_parser::{parse_server, Error as RedisError};
 use std::{error::Error, io, sync::Arc};
 use tokio::{
+    io::{AsyncReadExt, AsyncWriteExt},
     net::TcpListener,
     time::{sleep, Duration},
 };
@@ -50,12 +51,51 @@ impl Decoder for RedisParser {
     }
 }
 
+pub async fn server_metrics(all_connections: Arc<Connections>) {
+    info!("Listening on 127.0.0.1:7878 for metrics");
+    let listener = tokio::net::TcpListener::bind("127.0.0.1:7878")
+        .await
+        .expect("Failed to start metrics server");
+
+    let mut globals = std::collections::HashMap::new();
+    globals.insert("service", "microredis");
+
+    loop {
+        let (mut stream, _) = listener.accept().await.expect("accept client");
+        let mut buf = vec![0; 1024];
+
+        let _ = match stream.read(&mut buf).await {
+            Ok(n) => n,
+            Err(_) => continue,
+        };
+
+        let serialized = serde_prometheus::to_string(
+            &all_connections
+                .get_dispatcher()
+                .get_service_metric_registry(),
+            Some("redis"),
+            globals.clone(),
+        )
+        .unwrap_or("".to_owned());
+
+        let response = format!(
+            "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
+            serialized.len(),
+            serialized
+        );
+
+        let _ = stream.write_all(&response.as_bytes()).await;
+        let _ = stream.flush().await;
+    }
+}
+
 pub async fn serve(addr: String) -> Result<(), Box<dyn Error>> {
     let listener = TcpListener::bind(&addr).await?;
     info!("Listening on: {}", addr);
 
     let db = Arc::new(Db::new(1000));
     let all_connections = Arc::new(Connections::new(db.clone()));
+    let all_connections_for_metrics = all_connections.clone();
 
     let db_for_purging = db.clone();
     tokio::spawn(async move {
@@ -65,6 +105,10 @@ pub async fn serve(addr: String) -> Result<(), Box<dyn Error>> {
         }
     });
 
+    tokio::spawn(async move {
+        server_metrics(all_connections_for_metrics.clone()).await;
+    });
+
     loop {
         match listener.accept().await {
             Ok((socket, addr)) => {