Skip to content

Commit

Permalink
Update metrics and pktparse (#151)
Browse files Browse the repository at this point in the history
json metrics support is removed as it was decided we dont need it anymore
  • Loading branch information
conorbros authored Aug 30, 2021
1 parent f7f83f9 commit cbcfa61
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 46 deletions.
8 changes: 4 additions & 4 deletions shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ serde_yaml = "0.8"
bincode = "1.3.1"

#Observability
metrics = "0.12.1"
metrics-core = "0.5.2"
metrics-runtime = "0.13.1"
metrics = "0.17.0"
metrics-exporter-prometheus = "0.6.0"
tracing = { version = "0.1.15", features = ["release_max_level_info"]}
tracing-subscriber = { version = "0.2.11", features = ["env-filter"]}
tracing-futures = "0.2.4"
Expand Down Expand Up @@ -72,14 +71,15 @@ rusoto_signature = "0.47.0"
criterion = { version = "0.3", features = ["async_tokio", "html_reports"] }
redis = { version = "0.21.0", features = ["tokio-comp", "cluster"] }
pcap = "0.8.1"
pktparse = {version = "0.4.0", features = ["derive"]}
pktparse = {version = "0.6.1", features = ["serde"]}
dns-parser = "0.8"
tls-parser = "0.10.0"
threadpool = "1.0"
num_cpus = "1.0"
serial_test = "0.5.1"
test-helpers = { path = "../test-helpers" }
hex-literal = "0.3.3"
reqwest = { version = "0.11.4", features = ["blocking"] }

[[bench]]
name = "redis_benches"
Expand Down
38 changes: 13 additions & 25 deletions shotover-proxy/src/admin/httpserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@ use hyper::{
};

use bytes::Bytes;
use metrics_core::{Builder, Drain, Observe};
use metrics_runtime::observers::{JsonBuilder, PrometheusBuilder};
use metrics_exporter_prometheus::PrometheusHandle;
use std::convert::Infallible;
use std::{net::SocketAddr, sync::Arc};
use tracing::{error, trace};
use tracing_subscriber::reload::Handle;
use tracing_subscriber::EnvFilter;

/// Exports metrics over HTTP.
pub struct LogFilterHttpExporter<C, S> {
controller: C,
pub struct LogFilterHttpExporter<S> {
recorder_handle: PrometheusHandle,
address: SocketAddr,
handle: Handle<EnvFilter, S>,
}
Expand All @@ -24,7 +23,7 @@ where
S: tracing::Subscriber + 'static,
{
use std::str;
let body = str::from_utf8(&bytes.as_ref()).map_err(|e| format!("{}", e))?;
let body = str::from_utf8(bytes.as_ref()).map_err(|e| format!("{}", e))?;
trace!(request.body = ?body);
let new_filter = body
.parse::<tracing_subscriber::filter::EnvFilter>()
Expand All @@ -39,17 +38,20 @@ fn rsp(status: StatusCode, body: impl Into<Body>) -> Response<Body> {
.expect("builder with known status code must not fail")
}

impl<C, S> LogFilterHttpExporter<C, S>
impl<S> LogFilterHttpExporter<S>
where
C: Observe + Send + Sync + 'static,
S: tracing::Subscriber + 'static,
{
/// Creates a new [`HttpExporter`] that listens on the given `address`.
///
/// Observers expose their output by being converted into strings.
pub fn new(controller: C, address: SocketAddr, handle: Handle<EnvFilter, S>) -> Self {
pub fn new(
recorder_handle: PrometheusHandle,
address: SocketAddr,
handle: Handle<EnvFilter, S>,
) -> Self {
LogFilterHttpExporter {
controller,
recorder_handle,
address,
handle,
}
Expand All @@ -58,7 +60,7 @@ where
/// Starts an HTTP server on the `address` the exporter was originally configured with,
/// responding to any request with the output of the configured observer.
pub async fn async_run(self) -> hyper::Result<()> {
let controller = Arc::new(self.controller);
let controller = Arc::new(self.recorder_handle);
let handle = Arc::new(self.handle);

let make_svc = make_service_fn(move |_| {
Expand All @@ -73,21 +75,7 @@ where
async move {
let response = match (req.method(), req.uri().path()) {
(&Method::GET, "/metrics") => {
let output = match req.uri().query() {
Some("x-accept=application/json") => {
let builder = JsonBuilder::new();
let mut observer = builder.build();
controller.observe(&mut observer);
observer.drain()
}
_ => {
let builder = PrometheusBuilder::new();
let mut observer = builder.build();
controller.observe(&mut observer);
observer.drain()
}
};
Response::new(Body::from(output))
Response::new(Body::from(controller.as_ref().render()))
}
(&Method::PUT, "/filter") => {
trace!("setting filter");
Expand Down
14 changes: 6 additions & 8 deletions shotover-proxy/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::net::SocketAddr;

use anyhow::{anyhow, Result};
use clap::{crate_version, Clap};
use metrics_runtime::Receiver;
use metrics_exporter_prometheus::PrometheusBuilder;
use tokio::runtime::{self, Runtime};
use tokio::signal;
use tokio::sync::broadcast;
Expand Down Expand Up @@ -81,14 +81,12 @@ impl Runner {
}

pub fn with_observability_interface(self) -> Result<Self> {
let receiver = Receiver::builder()
.build()
.expect("failed to create receiver");
let socket: SocketAddr = self.config.observability_interface.parse()?;
let exporter =
LogFilterHttpExporter::new(receiver.controller(), socket, self.tracing.handle.clone());
let recorder = PrometheusBuilder::new().build();
let handle = recorder.handle();
metrics::set_boxed_recorder(Box::new(recorder))?;

receiver.install();
let socket: SocketAddr = self.config.observability_interface.parse()?;
let exporter = LogFilterHttpExporter::new(handle, socket, self.tracing.handle.clone());
self.runtime.spawn(exporter.async_run());

Ok(self)
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl<C: Codec + 'static> TcpCodecListener<C> {
// error here is non-recoverable.
let socket = self.accept().await?;

gauge!("shotover_available_connections", self.limit_connections.available_permits() as i64 ,"source" => self.source_name.clone());
gauge!("shotover_available_connections", self.limit_connections.available_permits() as f64,"source" => self.source_name.clone());

let peer = socket
.peer_addr()
Expand Down
5 changes: 2 additions & 3 deletions shotover-proxy/src/transforms/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::{anyhow, Result};
use futures::TryFutureExt;

use itertools::Itertools;
use metrics::{counter, timing};
use metrics::{counter, histogram};
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot::Receiver as OneReceiver;
Expand Down Expand Up @@ -191,12 +191,11 @@ impl TransformChain {
wrapper.reset(iter);

let result = wrapper.call_next_transform().await;
let end = Instant::now();
counter!("shotover_chain_total", 1, "chain" => self.name.clone());
if result.is_err() {
counter!("shotover_chain_failures", 1, "chain" => self.name.clone())
}
timing!("shotover_chain_latency", start, end, "chain" => self.name.clone(), "client_details" => client_details);
histogram!("shotover_chain_latency", start.elapsed(), "chain" => self.name.clone(), "client_details" => client_details);
result
}
}
6 changes: 2 additions & 4 deletions shotover-proxy/src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::message::{Message, Messages};
use crate::transforms::cassandra::cassandra_codec_destination::{
CodecConfiguration, CodecDestination,
};
use metrics::{counter, timing};
use metrics::{counter, histogram};

use crate::transforms::chain::TransformChain;
use crate::transforms::coalesce::{Coalesce, CoalesceConfig};
Expand Down Expand Up @@ -294,13 +294,11 @@ impl<'a> Wrapper<'a> {
let result = CONTEXT_CHAIN_NAME
.scope(chain_name, transform.transform(self))
.await;
let end = Instant::now();

counter!("shotover_transform_total", 1, "transform" => transform_name);
if result.is_err() {
counter!("shotover_transform_failures", 1, "transform" => transform_name)
}
timing!("shotover_transform_latency", start, end, "transform" => transform_name);
histogram!("shotover_transform_latency", start.elapsed(), "transform" => transform_name);
result
}

Expand Down
1 change: 1 addition & 0 deletions shotover-proxy/tests/admin/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod observability_int_tests;
36 changes: 36 additions & 0 deletions shotover-proxy/tests/admin/observability_int_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use crate::helpers::ShotoverManager;
use serial_test::serial;
use test_helpers::docker_compose::DockerCompose;

#[test]
#[serial]
fn test_metrics() {
let _compose = DockerCompose::new("examples/redis-passthrough/docker-compose.yml");

let shotover_manager =
ShotoverManager::from_topology_file("examples/redis-passthrough/topology.yaml");

let mut connection = shotover_manager.redis_connection(6379);

redis::cmd("SET")
.arg("the_key")
.arg(42)
.execute(&mut connection);

redis::cmd("SET")
.arg("the_key")
.arg(43)
.execute(&mut connection);

let body = reqwest::blocking::get("http://localhost:9001/metrics")
.unwrap()
.text()
.unwrap();

// If the body contains these substrings, we can assume metrics are working
assert!(body.contains("# TYPE shotover_transform_total counter"));
assert!(body.contains("# TYPE shotover_chain_total counter"));
assert!(body.contains("# TYPE shotover_available_connections gauge"));
assert!(body.contains("# TYPE shotover_transform_latency summary"));
assert!(body.contains("# TYPE shotover_chain_latency summary"));
}
9 changes: 8 additions & 1 deletion shotover-proxy/tests/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ impl ShotoverManager {
config_file: "config/config.yaml".into(),
..ConfigOpts::default()
};
let spawn = Runner::new(opts).unwrap().run_spawn();
let spawn = Runner::new(opts)
.unwrap()
.with_observability_interface()
.unwrap()
.run_spawn();

// If we allow the tracing_guard to be dropped then the following tests in the same file will not get tracing so we mem::forget it.
// This is because tracing can only be initialized once in the same execution, secondary attempts to initalize tracing will silently fail.
Expand Down Expand Up @@ -56,6 +60,9 @@ impl ShotoverManager {

impl Drop for ShotoverManager {
fn drop(&mut self) {
// Must clear the recorder before skipping a shutdown on panic; if one test panics and the recorder is not cleared,
// the following tests will panic because they will try to set another recorder
metrics::clear_recorder();
if std::thread::panicking() {
// If already panicking do nothing in order to avoid a double panic.
// We only shutdown shotover to test the shutdown process not because we need to clean up any resources.
Expand Down
1 change: 1 addition & 0 deletions shotover-proxy/tests/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod admin;
pub mod codec;
mod helpers;
pub mod redis_int_tests;

0 comments on commit cbcfa61

Please sign in to comment.