From cbcfa61c84a9d48c37d272cdfd673ab2ad71c551 Mon Sep 17 00:00:00 2001 From: Conor Date: Mon, 30 Aug 2021 13:39:24 +1000 Subject: [PATCH] Update metrics and pktparse (#151) json metrics support is removed as it was decided we dont need it anymore --- shotover-proxy/Cargo.toml | 8 ++-- shotover-proxy/src/admin/httpserver.rs | 38 +++++++------------ shotover-proxy/src/runner.rs | 14 +++---- shotover-proxy/src/server.rs | 2 +- shotover-proxy/src/transforms/chain.rs | 5 +-- shotover-proxy/src/transforms/mod.rs | 6 +-- shotover-proxy/tests/admin/mod.rs | 1 + .../tests/admin/observability_int_tests.rs | 36 ++++++++++++++++++ shotover-proxy/tests/helpers/mod.rs | 9 ++++- shotover-proxy/tests/lib.rs | 1 + 10 files changed, 74 insertions(+), 46 deletions(-) create mode 100644 shotover-proxy/tests/admin/mod.rs create mode 100644 shotover-proxy/tests/admin/observability_int_tests.rs diff --git a/shotover-proxy/Cargo.toml b/shotover-proxy/Cargo.toml index 6a7872dd2..4041fc3f5 100644 --- a/shotover-proxy/Cargo.toml +++ b/shotover-proxy/Cargo.toml @@ -38,9 +38,8 @@ serde_yaml = "0.8" bincode = "1.3.1" #Observability -metrics = "0.12.1" -metrics-core = "0.5.2" -metrics-runtime = "0.13.1" +metrics = "0.17.0" +metrics-exporter-prometheus = "0.6.0" tracing = { version = "0.1.15", features = ["release_max_level_info"]} tracing-subscriber = { version = "0.2.11", features = ["env-filter"]} tracing-futures = "0.2.4" @@ -72,7 +71,7 @@ rusoto_signature = "0.47.0" criterion = { version = "0.3", features = ["async_tokio", "html_reports"] } redis = { version = "0.21.0", features = ["tokio-comp", "cluster"] } pcap = "0.8.1" -pktparse = {version = "0.4.0", features = ["derive"]} +pktparse = {version = "0.6.1", features = ["serde"]} dns-parser = "0.8" tls-parser = "0.10.0" threadpool = "1.0" @@ -80,6 +79,7 @@ num_cpus = "1.0" serial_test = "0.5.1" test-helpers = { path = "../test-helpers" } hex-literal = "0.3.3" +reqwest = { version = "0.11.4", features = ["blocking"] } [[bench]] name = "redis_benches" diff --git a/shotover-proxy/src/admin/httpserver.rs b/shotover-proxy/src/admin/httpserver.rs index e7082ef22..f25bde5a4 100644 --- a/shotover-proxy/src/admin/httpserver.rs +++ b/shotover-proxy/src/admin/httpserver.rs @@ -4,8 +4,7 @@ use hyper::{ }; use bytes::Bytes; -use metrics_core::{Builder, Drain, Observe}; -use metrics_runtime::observers::{JsonBuilder, PrometheusBuilder}; +use metrics_exporter_prometheus::PrometheusHandle; use std::convert::Infallible; use std::{net::SocketAddr, sync::Arc}; use tracing::{error, trace}; @@ -13,8 +12,8 @@ use tracing_subscriber::reload::Handle; use tracing_subscriber::EnvFilter; /// Exports metrics over HTTP. -pub struct LogFilterHttpExporter { - controller: C, +pub struct LogFilterHttpExporter { + recorder_handle: PrometheusHandle, address: SocketAddr, handle: Handle, } @@ -24,7 +23,7 @@ where S: tracing::Subscriber + 'static, { use std::str; - let body = str::from_utf8(&bytes.as_ref()).map_err(|e| format!("{}", e))?; + let body = str::from_utf8(bytes.as_ref()).map_err(|e| format!("{}", e))?; trace!(request.body = ?body); let new_filter = body .parse::() @@ -39,17 +38,20 @@ fn rsp(status: StatusCode, body: impl Into) -> Response { .expect("builder with known status code must not fail") } -impl LogFilterHttpExporter +impl LogFilterHttpExporter where - C: Observe + Send + Sync + 'static, S: tracing::Subscriber + 'static, { /// Creates a new [`HttpExporter`] that listens on the given `address`. /// /// Observers expose their output by being converted into strings. - pub fn new(controller: C, address: SocketAddr, handle: Handle) -> Self { + pub fn new( + recorder_handle: PrometheusHandle, + address: SocketAddr, + handle: Handle, + ) -> Self { LogFilterHttpExporter { - controller, + recorder_handle, address, handle, } @@ -58,7 +60,7 @@ where /// Starts an HTTP server on the `address` the exporter was originally configured with, /// responding to any request with the output of the configured observer. pub async fn async_run(self) -> hyper::Result<()> { - let controller = Arc::new(self.controller); + let controller = Arc::new(self.recorder_handle); let handle = Arc::new(self.handle); let make_svc = make_service_fn(move |_| { @@ -73,21 +75,7 @@ where async move { let response = match (req.method(), req.uri().path()) { (&Method::GET, "/metrics") => { - let output = match req.uri().query() { - Some("x-accept=application/json") => { - let builder = JsonBuilder::new(); - let mut observer = builder.build(); - controller.observe(&mut observer); - observer.drain() - } - _ => { - let builder = PrometheusBuilder::new(); - let mut observer = builder.build(); - controller.observe(&mut observer); - observer.drain() - } - }; - Response::new(Body::from(output)) + Response::new(Body::from(controller.as_ref().render())) } (&Method::PUT, "/filter") => { trace!("setting filter"); diff --git a/shotover-proxy/src/runner.rs b/shotover-proxy/src/runner.rs index ef6079042..fb3032826 100644 --- a/shotover-proxy/src/runner.rs +++ b/shotover-proxy/src/runner.rs @@ -3,7 +3,7 @@ use std::net::SocketAddr; use anyhow::{anyhow, Result}; use clap::{crate_version, Clap}; -use metrics_runtime::Receiver; +use metrics_exporter_prometheus::PrometheusBuilder; use tokio::runtime::{self, Runtime}; use tokio::signal; use tokio::sync::broadcast; @@ -81,14 +81,12 @@ impl Runner { } pub fn with_observability_interface(self) -> Result { - let receiver = Receiver::builder() - .build() - .expect("failed to create receiver"); - let socket: SocketAddr = self.config.observability_interface.parse()?; - let exporter = - LogFilterHttpExporter::new(receiver.controller(), socket, self.tracing.handle.clone()); + let recorder = PrometheusBuilder::new().build(); + let handle = recorder.handle(); + metrics::set_boxed_recorder(Box::new(recorder))?; - receiver.install(); + let socket: SocketAddr = self.config.observability_interface.parse()?; + let exporter = LogFilterHttpExporter::new(handle, socket, self.tracing.handle.clone()); self.runtime.spawn(exporter.async_run()); Ok(self) diff --git a/shotover-proxy/src/server.rs b/shotover-proxy/src/server.rs index 110d9bca3..a275f5577 100644 --- a/shotover-proxy/src/server.rs +++ b/shotover-proxy/src/server.rs @@ -142,7 +142,7 @@ impl TcpCodecListener { // error here is non-recoverable. let socket = self.accept().await?; - gauge!("shotover_available_connections", self.limit_connections.available_permits() as i64 ,"source" => self.source_name.clone()); + gauge!("shotover_available_connections", self.limit_connections.available_permits() as f64,"source" => self.source_name.clone()); let peer = socket .peer_addr() diff --git a/shotover-proxy/src/transforms/chain.rs b/shotover-proxy/src/transforms/chain.rs index f47a3018f..aed6ce2b6 100644 --- a/shotover-proxy/src/transforms/chain.rs +++ b/shotover-proxy/src/transforms/chain.rs @@ -5,7 +5,7 @@ use anyhow::{anyhow, Result}; use futures::TryFutureExt; use itertools::Itertools; -use metrics::{counter, timing}; +use metrics::{counter, histogram}; use std::sync::Arc; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot::Receiver as OneReceiver; @@ -191,12 +191,11 @@ impl TransformChain { wrapper.reset(iter); let result = wrapper.call_next_transform().await; - let end = Instant::now(); counter!("shotover_chain_total", 1, "chain" => self.name.clone()); if result.is_err() { counter!("shotover_chain_failures", 1, "chain" => self.name.clone()) } - timing!("shotover_chain_latency", start, end, "chain" => self.name.clone(), "client_details" => client_details); + histogram!("shotover_chain_latency", start.elapsed(), "chain" => self.name.clone(), "client_details" => client_details); result } } diff --git a/shotover-proxy/src/transforms/mod.rs b/shotover-proxy/src/transforms/mod.rs index 73e82517c..de6bc638e 100644 --- a/shotover-proxy/src/transforms/mod.rs +++ b/shotover-proxy/src/transforms/mod.rs @@ -14,7 +14,7 @@ use crate::message::{Message, Messages}; use crate::transforms::cassandra::cassandra_codec_destination::{ CodecConfiguration, CodecDestination, }; -use metrics::{counter, timing}; +use metrics::{counter, histogram}; use crate::transforms::chain::TransformChain; use crate::transforms::coalesce::{Coalesce, CoalesceConfig}; @@ -294,13 +294,11 @@ impl<'a> Wrapper<'a> { let result = CONTEXT_CHAIN_NAME .scope(chain_name, transform.transform(self)) .await; - let end = Instant::now(); - counter!("shotover_transform_total", 1, "transform" => transform_name); if result.is_err() { counter!("shotover_transform_failures", 1, "transform" => transform_name) } - timing!("shotover_transform_latency", start, end, "transform" => transform_name); + histogram!("shotover_transform_latency", start.elapsed(), "transform" => transform_name); result } diff --git a/shotover-proxy/tests/admin/mod.rs b/shotover-proxy/tests/admin/mod.rs new file mode 100644 index 000000000..cdc007157 --- /dev/null +++ b/shotover-proxy/tests/admin/mod.rs @@ -0,0 +1 @@ +mod observability_int_tests; diff --git a/shotover-proxy/tests/admin/observability_int_tests.rs b/shotover-proxy/tests/admin/observability_int_tests.rs new file mode 100644 index 000000000..424279b1e --- /dev/null +++ b/shotover-proxy/tests/admin/observability_int_tests.rs @@ -0,0 +1,36 @@ +use crate::helpers::ShotoverManager; +use serial_test::serial; +use test_helpers::docker_compose::DockerCompose; + +#[test] +#[serial] +fn test_metrics() { + let _compose = DockerCompose::new("examples/redis-passthrough/docker-compose.yml"); + + let shotover_manager = + ShotoverManager::from_topology_file("examples/redis-passthrough/topology.yaml"); + + let mut connection = shotover_manager.redis_connection(6379); + + redis::cmd("SET") + .arg("the_key") + .arg(42) + .execute(&mut connection); + + redis::cmd("SET") + .arg("the_key") + .arg(43) + .execute(&mut connection); + + let body = reqwest::blocking::get("http://localhost:9001/metrics") + .unwrap() + .text() + .unwrap(); + + // If the body contains these substrings, we can assume metrics are working + assert!(body.contains("# TYPE shotover_transform_total counter")); + assert!(body.contains("# TYPE shotover_chain_total counter")); + assert!(body.contains("# TYPE shotover_available_connections gauge")); + assert!(body.contains("# TYPE shotover_transform_latency summary")); + assert!(body.contains("# TYPE shotover_chain_latency summary")); +} diff --git a/shotover-proxy/tests/helpers/mod.rs b/shotover-proxy/tests/helpers/mod.rs index 07dded6f8..06373cd46 100644 --- a/shotover-proxy/tests/helpers/mod.rs +++ b/shotover-proxy/tests/helpers/mod.rs @@ -21,7 +21,11 @@ impl ShotoverManager { config_file: "config/config.yaml".into(), ..ConfigOpts::default() }; - let spawn = Runner::new(opts).unwrap().run_spawn(); + let spawn = Runner::new(opts) + .unwrap() + .with_observability_interface() + .unwrap() + .run_spawn(); // If we allow the tracing_guard to be dropped then the following tests in the same file will not get tracing so we mem::forget it. // This is because tracing can only be initialized once in the same execution, secondary attempts to initalize tracing will silently fail. @@ -56,6 +60,9 @@ impl ShotoverManager { impl Drop for ShotoverManager { fn drop(&mut self) { + // Must clear the recorder before skipping a shutdown on panic; if one test panics and the recorder is not cleared, + // the following tests will panic because they will try to set another recorder + metrics::clear_recorder(); if std::thread::panicking() { // If already panicking do nothing in order to avoid a double panic. // We only shutdown shotover to test the shutdown process not because we need to clean up any resources. diff --git a/shotover-proxy/tests/lib.rs b/shotover-proxy/tests/lib.rs index 60a1407b8..564a1c72b 100644 --- a/shotover-proxy/tests/lib.rs +++ b/shotover-proxy/tests/lib.rs @@ -1,3 +1,4 @@ +pub mod admin; pub mod codec; mod helpers; pub mod redis_int_tests;