diff --git a/libafl/src/monitors/prometheus.rs b/libafl/src/monitors/prometheus.rs
index 1817b33cfa..8dcf8d769d 100644
--- a/libafl/src/monitors/prometheus.rs
+++ b/libafl/src/monitors/prometheus.rs
@@ -28,8 +28,9 @@
 //! When using docker, you may need to point `prometheus.yml` to the `docker0` interface or `host.docker.internal`
 
 use alloc::{borrow::Cow, fmt::Debug, string::String, vec::Vec};
-use core::{fmt, time::Duration};
+use core::{fmt, fmt::Write, time::Duration};
 use std::{
+    string::ToString,
     sync::{atomic::AtomicU64, Arc},
     thread,
 };
@@ -46,8 +47,21 @@ use prometheus_client::{
 // using tide for the HTTP server library (fast, async, simple)
 use tide::Request;
 
+use super::Aggregator;
 use crate::monitors::{ClientStats, Monitor, UserStatsValue};
 
+/// Prometheus metrics for global and each client.
+#[derive(Clone, Debug, Default)]
+pub struct PrometheusStats {
+    corpus_count: Family<Labels, Gauge>,
+    objective_count: Family<Labels, Gauge>,
+    executions: Family<Labels, Gauge>,
+    exec_rate: Family<Labels, Gauge<f64, AtomicU64>>,
+    runtime: Family<Labels, Gauge>,
+    clients_count: Family<Labels, Gauge>,
+    custom_stat: Family<Labels, Gauge<f64, AtomicU64>>,
+}
+
 /// Tracking monitor during fuzzing.
 #[derive(Clone)]
 pub struct PrometheusMonitor<F>
@@ -56,14 +70,10 @@ where
 {
     print_fn: F,
     start_time: Duration,
-    client_stats: Vec<ClientStats>,
-    corpus_count: Family<Labels, Gauge>,
-    objective_count: Family<Labels, Gauge>,
-    executions: Family<Labels, Gauge>,
-    exec_rate: Family<Labels, Gauge<f64, AtomicU64>>,
-    runtime: Family<Labels, Gauge>,
-    clients_count: Family<Labels, Gauge>,
-    custom_stat: Family<Labels, Gauge<f64, AtomicU64>>,
+    prometheus_global_stats: PrometheusStats, // global prometheus metrics
+    prometheus_client_stats: PrometheusStats, // per-client prometheus metrics
+    client_stats: Vec<ClientStats>,           // per-client statistics
+    aggregator: Aggregator,                   // aggregator for global custom statistics
 }
 
 impl<F> Debug for PrometheusMonitor<F>
@@ -102,64 +112,80 @@ where
         self.start_time = time;
     }
 
+    /// Aggregate client stats
+    fn aggregate(&mut self, name: &str) {
+        self.aggregator.aggregate(name, &self.client_stats);
+    }
+
     #[allow(clippy::cast_sign_loss)]
     fn display(&mut self, event_msg: &str, sender_id: ClientId) {
         // Update the prometheus metrics
-        // Label each metric with the sender / client_id
         // The gauges must take signed i64's, with max value of 2^63-1 so it is
        // probably fair to error out at a count of nine quintillion across any
        // of these counts.
        // realistically many of these metrics should be counters but would
        // require a fair bit of logic to handle "amount to increment given
        // time since last observation"
+
+        // Global (aggregated) metrics
         let corpus_size = self.corpus_size();
-        self.corpus_count
+        self.prometheus_global_stats
+            .corpus_count
             .get_or_create(&Labels {
-                client: sender_id.0,
+                client: Cow::from("global"),
                 stat: Cow::from(""),
             })
             .set(corpus_size.try_into().unwrap());
+
         let objective_size = self.objective_size();
-        self.objective_count
+        self.prometheus_global_stats
+            .objective_count
             .get_or_create(&Labels {
-                client: sender_id.0,
+                client: Cow::from("global"),
                 stat: Cow::from(""),
             })
             .set(objective_size.try_into().unwrap());
+
         let total_execs = self.total_execs();
-        self.executions
+        self.prometheus_global_stats
+            .executions
             .get_or_create(&Labels {
-                client: sender_id.0,
+                client: Cow::from("global"),
                 stat: Cow::from(""),
             })
             .set(total_execs.try_into().unwrap());
+
         let execs_per_sec = self.execs_per_sec();
-        self.exec_rate
+        self.prometheus_global_stats
+            .exec_rate
             .get_or_create(&Labels {
-                client: sender_id.0,
+                client: Cow::from("global"),
                 stat: Cow::from(""),
             })
             .set(execs_per_sec);
+
         let run_time = (current_time() - self.start_time).as_secs();
-        self.runtime
+        self.prometheus_global_stats
+            .runtime
             .get_or_create(&Labels {
-                client: sender_id.0,
+                client: Cow::from("global"),
                 stat: Cow::from(""),
             })
             .set(run_time.try_into().unwrap()); // run time in seconds, which can be converted to a time format by Grafana or similar
+
         let total_clients = self.client_stats_count().try_into().unwrap(); // convert usize to u64 (unlikely that # of clients will be > 2^64 - 1...)
-        self.clients_count
+        self.prometheus_global_stats
+            .clients_count
             .get_or_create(&Labels {
-                client: sender_id.0,
+                client: Cow::from("global"),
                 stat: Cow::from(""),
             })
             .set(total_clients);
 
         // display stats in a SimpleMonitor format
-        let fmt = format!(
-            "[Prometheus] [{} #{}] run time: {}, clients: {}, corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
+        let mut global_fmt = format!(
+            "[Prometheus] [{} #GLOBAL] run time: {}, clients: {}, corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
             event_msg,
-            sender_id.0,
             format_duration_hms(&(current_time() - self.start_time)),
             self.client_stats_count(),
             self.corpus_size(),
@@ -167,31 +193,151 @@ where
             self.objective_size(),
             self.total_execs(),
             self.execs_per_sec_pretty()
         );
-        (self.print_fn)(&fmt);
+        for (key, val) in &self.aggregator.aggregated {
+            // print global aggregated custom stats
+            write!(global_fmt, ", {key}: {val}").unwrap();
+            #[allow(clippy::cast_precision_loss)]
+            let value: f64 = match val {
+                UserStatsValue::Number(n) => *n as f64,
+                UserStatsValue::Float(f) => *f,
+                UserStatsValue::String(_s) => 0.0,
+                UserStatsValue::Ratio(a, b) => {
+                    if key == "edges" {
+                        self.prometheus_global_stats
+                            .custom_stat
+                            .get_or_create(&Labels {
+                                client: Cow::from("global"),
+                                stat: Cow::from("edges_total"),
+                            })
+                            .set(*b as f64);
+                        self.prometheus_global_stats
+                            .custom_stat
+                            .get_or_create(&Labels {
+                                client: Cow::from("global"),
+                                stat: Cow::from("edges_hit"),
+                            })
+                            .set(*a as f64);
+                    }
+                    (*a as f64 / *b as f64) * 100.0
+                }
+                UserStatsValue::Percent(p) => *p * 100.0,
+            };
+            self.prometheus_global_stats
+                .custom_stat
+                .get_or_create(&Labels {
+                    client: Cow::from("global"),
+                    stat: Cow::from(key.clone()),
+                })
+                .set(value);
+        }
+
+        (self.print_fn)(&global_fmt);
+
+        // Client-specific metrics
         self.client_stats_insert(sender_id);
-        let cur_client = self.client_stats_mut_for(sender_id);
-        let cur_client_clone = cur_client.clone();
+        let client = self.client_stats_for(sender_id);
+        let mut cur_client_clone = client.clone();
+
+        self.prometheus_client_stats
+            .corpus_count
+            .get_or_create(&Labels {
+                client: Cow::from(sender_id.0.to_string()),
+                stat: Cow::from(""),
+            })
+            .set(cur_client_clone.corpus_size.try_into().unwrap());
+
+        self.prometheus_client_stats
+            .objective_count
+            .get_or_create(&Labels {
+                client: Cow::from(sender_id.0.to_string()),
+                stat: Cow::from(""),
+            })
+            .set(cur_client_clone.objective_size.try_into().unwrap());
+
+        self.prometheus_client_stats
+            .executions
+            .get_or_create(&Labels {
+                client: Cow::from(sender_id.0.to_string()),
+                stat: Cow::from(""),
+            })
+            .set(cur_client_clone.executions.try_into().unwrap());
+
+        self.prometheus_client_stats
+            .exec_rate
+            .get_or_create(&Labels {
+                client: Cow::from(sender_id.0.to_string()),
+                stat: Cow::from(""),
+            })
+            .set(cur_client_clone.execs_per_sec(current_time()));
+
+        let client_run_time = (current_time() - cur_client_clone.start_time).as_secs();
+        self.prometheus_client_stats
+            .runtime
+            .get_or_create(&Labels {
+                client: Cow::from(sender_id.0.to_string()),
+                stat: Cow::from(""),
+            })
+            .set(client_run_time.try_into().unwrap()); // run time in seconds per-client, which can be converted to a time format by Grafana or similar
+
+        self.prometheus_client_stats
+            .clients_count
+            .get_or_create(&Labels {
+                client: Cow::from(sender_id.0.to_string()),
+                stat: Cow::from(""),
+            })
+            .set(total_clients);
+
+        let mut fmt = format!(
+            "[Prometheus] [{} #{}] corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
+            event_msg,
+            sender_id.0,
+            client.corpus_size,
+            client.objective_size,
+            client.executions,
+            cur_client_clone.execs_per_sec_pretty(current_time())
+        );
 
         for (key, val) in cur_client_clone.user_monitor {
+            // print the custom stats for each client
+            write!(fmt, ", {key}: {val}").unwrap();
             // Update metrics added to the user_stats hashmap by feedback event-fires
             // You can filter for each custom stat in promQL via labels of both the stat name and client id
-            log::info!("{key}: {val}");
             #[allow(clippy::cast_precision_loss)]
             let value: f64 = match val.value() {
                 UserStatsValue::Number(n) => *n as f64,
                 UserStatsValue::Float(f) => *f,
                 UserStatsValue::String(_s) => 0.0,
-                UserStatsValue::Ratio(a, b) => (*a as f64 / *b as f64) * 100.0,
+                UserStatsValue::Ratio(a, b) => {
+                    if key == "edges" {
+                        self.prometheus_client_stats
+                            .custom_stat
+                            .get_or_create(&Labels {
+                                client: Cow::from(sender_id.0.to_string()),
+                                stat: Cow::from("edges_total"),
+                            })
+                            .set(*b as f64);
+                        self.prometheus_client_stats
+                            .custom_stat
+                            .get_or_create(&Labels {
+                                client: Cow::from(sender_id.0.to_string()),
+                                stat: Cow::from("edges_hit"),
+                            })
+                            .set(*a as f64);
+                    }
+                    (*a as f64 / *b as f64) * 100.0
+                }
                 UserStatsValue::Percent(p) => *p * 100.0,
             };
-            self.custom_stat
+            self.prometheus_client_stats
+                .custom_stat
                 .get_or_create(&Labels {
-                    client: sender_id.0,
+                    client: Cow::from(sender_id.0.to_string()),
                     stat: key.clone(),
                 })
                 .set(value);
         }
+        (self.print_fn)(&fmt);
     }
 }
 
@@ -203,33 +349,18 @@ where
     /// The `listener` is the address to send logs to.
     /// The `print_fn` is the printing function that can output the logs otherwise.
     pub fn new(listener: String, print_fn: F) -> Self {
-        // Gauge's implementation of clone uses Arc
-        let corpus_count = Family::<Labels, Gauge>::default();
-        let corpus_count_clone = corpus_count.clone();
-        let objective_count = Family::<Labels, Gauge>::default();
-        let objective_count_clone = objective_count.clone();
-        let executions = Family::<Labels, Gauge>::default();
-        let executions_clone = executions.clone();
-        let exec_rate = Family::<Labels, Gauge<f64, AtomicU64>>::default();
-        let exec_rate_clone = exec_rate.clone();
-        let runtime = Family::<Labels, Gauge>::default();
-        let runtime_clone = runtime.clone();
-        let clients_count = Family::<Labels, Gauge>::default();
-        let clients_count_clone = clients_count.clone();
-        let custom_stat = Family::<Labels, Gauge<f64, AtomicU64>>::default();
-        let custom_stat_clone = custom_stat.clone();
+        let prometheus_global_stats = PrometheusStats::default();
+        let prometheus_global_stats_clone = prometheus_global_stats.clone();
+        let prometheus_client_stats = PrometheusStats::default();
+        let prometheus_client_stats_clone = prometheus_client_stats.clone();
+        let client_stats = Vec::<ClientStats>::default();
 
         // Need to run the metrics server in a different thread to avoid blocking
         thread::spawn(move || {
             block_on(serve_metrics(
                 listener,
-                corpus_count_clone,
-                objective_count_clone,
-                executions_clone,
-                exec_rate_clone,
-                runtime_clone,
-                clients_count_clone,
-                custom_stat_clone,
+                prometheus_global_stats_clone,
+                prometheus_client_stats_clone,
             ))
             .map_err(|err| log::error!("{err:?}"))
             .ok();
@@ -237,43 +368,25 @@ where
         Self {
             print_fn,
             start_time: current_time(),
-            client_stats: vec![],
-            corpus_count,
-            objective_count,
-            executions,
-            exec_rate,
-            runtime,
-            clients_count,
-            custom_stat,
+            prometheus_global_stats,
+            prometheus_client_stats,
+            client_stats,
+            aggregator: Aggregator::new(),
         }
     }
 
     /// Creates the monitor with a given `start_time`.
     pub fn with_time(listener: String, print_fn: F, start_time: Duration) -> Self {
-        let corpus_count = Family::<Labels, Gauge>::default();
-        let corpus_count_clone = corpus_count.clone();
-        let objective_count = Family::<Labels, Gauge>::default();
-        let objective_count_clone = objective_count.clone();
-        let executions = Family::<Labels, Gauge>::default();
-        let executions_clone = executions.clone();
-        let exec_rate = Family::<Labels, Gauge<f64, AtomicU64>>::default();
-        let exec_rate_clone = exec_rate.clone();
-        let runtime = Family::<Labels, Gauge>::default();
-        let runtime_clone = runtime.clone();
-        let clients_count = Family::<Labels, Gauge>::default();
-        let clients_count_clone = clients_count.clone();
-        let custom_stat = Family::<Labels, Gauge<f64, AtomicU64>>::default();
-        let custom_stat_clone = custom_stat.clone();
+        let prometheus_global_stats = PrometheusStats::default();
+        let prometheus_global_stats_clone = prometheus_global_stats.clone();
+        let prometheus_client_stats = PrometheusStats::default();
+        let prometheus_client_stats_clone = prometheus_client_stats.clone();
+        let client_stats = Vec::<ClientStats>::default();
         thread::spawn(move || {
             block_on(serve_metrics(
                 listener,
-                corpus_count_clone,
-                objective_count_clone,
-                executions_clone,
-                exec_rate_clone,
-                runtime_clone,
-                clients_count_clone,
-                custom_stat_clone,
+                prometheus_global_stats_clone,
+                prometheus_client_stats_clone,
            ))
             .map_err(|err| log::error!("{err:?}"))
             .ok();
@@ -281,58 +394,94 @@ where
         Self {
             print_fn,
             start_time,
-            client_stats: vec![],
-            corpus_count,
-            objective_count,
-            executions,
-            exec_rate,
-            runtime,
-            clients_count,
-            custom_stat,
+            prometheus_global_stats,
+            prometheus_client_stats,
+            client_stats,
+            aggregator: Aggregator::new(),
         }
     }
 }
 
 /// Set up an HTTP endpoint /metrics
-#[allow(clippy::too_many_arguments)]
 pub(crate) async fn serve_metrics(
     listener: String,
-    corpus: Family<Labels, Gauge>,
-    objectives: Family<Labels, Gauge>,
-    executions: Family<Labels, Gauge>,
-    exec_rate: Family<Labels, Gauge<f64, AtomicU64>>,
-    runtime: Family<Labels, Gauge>,
-    clients_count: Family<Labels, Gauge>,
-    custom_stat: Family<Labels, Gauge<f64, AtomicU64>>,
+    global_stats: PrometheusStats,
+    client_stats: PrometheusStats,
 ) -> Result<(), std::io::Error> {
     let mut registry = Registry::default();
 
-    registry.register("corpus_count", "Number of test cases in the corpus", corpus);
+    // Register the global stats
     registry.register(
-        "objective_count",
+        "global_corpus_count",
+        "Number of test cases in the corpus",
+        global_stats.corpus_count,
+    );
+    registry.register(
+        "global_objective_count",
         "Number of times the objective has been achieved (e.g., crashes)",
-        objectives,
+        global_stats.objective_count,
+    );
+    registry.register(
+        "global_executions_total",
+        "Total number of executions",
+        global_stats.executions,
+    );
+    registry.register(
+        "global_execution_rate",
+        "Rate of executions per second",
+        global_stats.exec_rate,
+    );
+    registry.register(
+        "global_runtime",
+        "How long the fuzzer has been running for (seconds)",
+        global_stats.runtime,
+    );
+    registry.register(
+        "global_clients_count",
+        "How many clients have been spawned for the fuzzing job",
+        global_stats.clients_count,
+    );
+    registry.register(
+        "global_custom_stat",
+        "A metric to contain custom stats returned by feedbacks, filterable by label (aggregated)",
+        global_stats.custom_stat,
+    );
+
+    // Register the client stats
+    registry.register(
+        "corpus_count",
+        "Number of test cases in the client's corpus",
+        client_stats.corpus_count,
+    );
+    registry.register(
+        "objective_count",
+        "Number of client's objectives (e.g., crashes)",
+        client_stats.objective_count,
     );
     registry.register(
         "executions_total",
-        "Number of executions the fuzzer has done",
-        executions,
+        "Total number of client executions",
+        client_stats.executions,
+    );
+    registry.register(
+        "execution_rate",
+        "Rate of executions per second",
+        client_stats.exec_rate,
     );
-    registry.register("execution_rate", "Rate of executions per second", exec_rate);
     registry.register(
         "runtime",
-        "How long the fuzzer has been running for (seconds)",
-        runtime,
+        "How long the client has been running for (seconds)",
+        client_stats.runtime,
     );
     registry.register(
         "clients_count",
         "How many clients have been spawned for the fuzzing job",
-        clients_count,
+        client_stats.clients_count,
    );
     registry.register(
         "custom_stat",
         "A metric to contain custom stats returned by feedbacks, filterable by label",
-        custom_stat,
+        client_stats.custom_stat,
     );
 
     let mut app = tide::with_state(State {
@@ -359,7 +508,7 @@ pub(crate) async fn serve_metrics(
 #[derive(Clone, Hash, PartialEq, Eq, EncodeLabelSet, Debug)]
 pub struct Labels {
     /// The `sender_id` helps to differentiate between clients when multiple are spawned.
-    client: u32,
+    client: Cow<'static, str>,
     /// Used for `custom_stat` filtering.
     stat: Cow<'static, str>,
 }
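
Usage note (not part of the diff): a minimal sketch of driving the changed constructor, assuming libafl's `prometheus_monitor` feature is enabled and `PrometheusMonitor` is re-exported from `libafl::monitors`; the bind address and the printer closure are illustrative placeholders.

    use libafl::monitors::PrometheusMonitor;

    // Serve the /metrics endpoint on localhost:8080 and mirror each stats line to stdout.
    // `new` spawns the tide server on a background thread, so this call does not block.
    let monitor = PrometheusMonitor::new("127.0.0.1:8080".to_string(), |s| println!("{s}"));

    // The monitor is then handed to an event manager as usual, e.g. (hypothetical wiring):
    // let mut mgr = SimpleEventManager::new(monitor);

Since `Labels::client` is now a string rather than a `u32`, the aggregated series can be selected in PromQL via `client="global"` (e.g. `global_custom_stat{client="global", stat="edges"}`), while per-client series keep the numeric sender id as the label value.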