Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

controller: export wallclock lag metrics also for storage collections #30568

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/cluster-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use anyhow::bail;
use serde::{Deserialize, Serialize};

pub mod client;
pub mod metrics;

/// A function that computes the lag between the given time and wallclock time.
pub type WallclockLagFn<T> = Arc<dyn Fn(&T) -> Duration + Send + Sync>;
Expand Down
134 changes: 134 additions & 0 deletions src/cluster-client/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Metrics shared by both compute and storage.

use std::time::Duration;

use mz_ore::metric;
use mz_ore::metrics::{
CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, MetricsRegistry,
};
use mz_ore::stats::SlidingMinMax;
use prometheus::core::{AtomicF64, AtomicU64};

/// Controller metrics.
#[derive(Debug, Clone)]
pub struct ControllerMetrics {
dataflow_wallclock_lag_seconds: GaugeVec,
dataflow_wallclock_lag_seconds_sum: CounterVec,
dataflow_wallclock_lag_seconds_count: IntCounterVec,
}

impl ControllerMetrics {
/// Create a metrics instance registered into the given registry.
pub fn new(metrics_registry: &MetricsRegistry) -> Self {
Self {
// The next three metrics immitate a summary metric type. The `prometheus` crate lacks
// support for summaries, so we roll our own. Note that we also only expose the 0- and
// the 1-quantile, i.e., minimum and maximum lag values.
dataflow_wallclock_lag_seconds: metrics_registry.register(metric!(
name: "mz_dataflow_wallclock_lag_seconds",
help: "A summary of the second-by-second lag of the dataflow frontier relative \
to wallclock time, aggregated over the last minute.",
var_labels: ["instance_id", "replica_id", "collection_id", "quantile"],
)),
dataflow_wallclock_lag_seconds_sum: metrics_registry.register(metric!(
name: "mz_dataflow_wallclock_lag_seconds_sum",
help: "The total sum of dataflow wallclock lag measurements.",
var_labels: ["instance_id", "replica_id", "collection_id"],
)),
dataflow_wallclock_lag_seconds_count: metrics_registry.register(metric!(
name: "mz_dataflow_wallclock_lag_seconds_count",
help: "The total count of dataflow wallclock lag measurements.",
var_labels: ["instance_id", "replica_id", "collection_id"],
)),
}
}

/// Return an object that tracks wallclock lag metrics for the given collection on the given
/// cluster and replica.
pub fn wallclock_lag_metrics(
&self,
collection_id: String,
instance_id: Option<String>,
replica_id: Option<String>,
) -> WallclockLagMetrics {
let labels = vec![
instance_id.unwrap_or_default(),
replica_id.unwrap_or_default(),
collection_id.to_string(),
];

let labels_with_quantile = |quantile: &str| {
labels
.iter()
.cloned()
.chain([quantile.to_string()])
.collect()
};

let wallclock_lag_seconds_min = self
.dataflow_wallclock_lag_seconds
.get_delete_on_drop_metric(labels_with_quantile("0"));
let wallclock_lag_seconds_max = self
.dataflow_wallclock_lag_seconds
.get_delete_on_drop_metric(labels_with_quantile("1"));
let wallclock_lag_seconds_sum = self
.dataflow_wallclock_lag_seconds_sum
.get_delete_on_drop_metric(labels.clone());
let wallclock_lag_seconds_count = self
.dataflow_wallclock_lag_seconds_count
.get_delete_on_drop_metric(labels);
let wallclock_lag_minmax = SlidingMinMax::new(60);

WallclockLagMetrics {
wallclock_lag_seconds_min,
wallclock_lag_seconds_max,
wallclock_lag_seconds_sum,
wallclock_lag_seconds_count,
wallclock_lag_minmax,
}
}
}

/// Metrics tracking frontier wallclock lag for a collection.
#[derive(Debug)]
pub struct WallclockLagMetrics {
/// Gauge tracking minimum dataflow wallclock lag.
wallclock_lag_seconds_min: DeleteOnDropGauge<'static, AtomicF64, Vec<String>>,
/// Gauge tracking maximum dataflow wallclock lag.
wallclock_lag_seconds_max: DeleteOnDropGauge<'static, AtomicF64, Vec<String>>,
/// Counter tracking the total sum of dataflow wallclock lag.
wallclock_lag_seconds_sum: DeleteOnDropCounter<'static, AtomicF64, Vec<String>>,
/// Counter tracking the total count of dataflow wallclock lag measurements.
wallclock_lag_seconds_count: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,

/// State maintaining minimum and maximum wallclock lag.
wallclock_lag_minmax: SlidingMinMax<f32>,
}

impl WallclockLagMetrics {
/// Observe a new wallclock lag measurement.
pub fn observe(&mut self, lag: Duration) {
let lag_secs = lag.as_secs_f32();

self.wallclock_lag_minmax.add_sample(lag_secs);

let (&min, &max) = self
.wallclock_lag_minmax
.get()
.expect("just added a sample");

self.wallclock_lag_seconds_min.set(min.into());
self.wallclock_lag_seconds_max.set(max.into());
self.wallclock_lag_seconds_sum.inc_by(lag_secs.into());
self.wallclock_lag_seconds_count.inc();
}
}
8 changes: 6 additions & 2 deletions src/compute-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::ControllerMetrics;
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
Expand Down Expand Up @@ -239,7 +240,8 @@ impl<T: ComputeControllerTimestamp> ComputeController<T> {
storage_collections: StorageCollections<T>,
envd_epoch: NonZeroI64,
read_only: bool,
metrics_registry: MetricsRegistry,
metrics_registry: &MetricsRegistry,
controller_metrics: ControllerMetrics,
now: NowFn,
wallclock_lag: WallclockLagFn<T>,
) -> Self {
Expand Down Expand Up @@ -291,6 +293,8 @@ impl<T: ComputeControllerTimestamp> ComputeController<T> {
}
});

let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);

Self {
instances: BTreeMap::new(),
instance_workload_classes,
Expand All @@ -302,7 +306,7 @@ impl<T: ComputeControllerTimestamp> ComputeController<T> {
arrangement_exert_proportionality: 16,
stashed_response: None,
envd_epoch,
metrics: ComputeControllerMetrics::new(metrics_registry),
metrics,
now,
wallclock_lag,
dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
Expand Down
2 changes: 1 addition & 1 deletion src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
}

if let Some(metrics) = &mut collection.metrics {
metrics.observe_wallclock_lag(lag);
metrics.wallclock_lag.observe(lag);
};
}
}
Expand Down
103 changes: 17 additions & 86 deletions src/compute-client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,24 @@ use std::borrow::Borrow;
use std::sync::Arc;
use std::time::Duration;

use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
use mz_cluster_client::ReplicaId;
use mz_compute_types::ComputeInstanceId;
use mz_ore::cast::CastFrom;
use mz_ore::metric;
use mz_ore::metrics::raw::UIntGaugeVec;
use mz_ore::metrics::{
CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, GaugeVec,
HistogramVec, IntCounterVec, MetricVecExt, MetricsRegistry,
DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, GaugeVec, HistogramVec,
IntCounterVec, MetricVecExt, MetricsRegistry,
};
use mz_ore::stats::{histogram_seconds_buckets, SlidingMinMax};
use mz_ore::stats::histogram_seconds_buckets;
use mz_repr::GlobalId;
use mz_service::codec::StatsCollector;
use prometheus::core::{AtomicF64, AtomicU64};

use crate::protocol::command::{ComputeCommand, ProtoComputeCommand};
use crate::protocol::response::{PeekResponse, ProtoComputeResponse};

type Counter = DeleteOnDropCounter<'static, AtomicF64, Vec<String>>;
pub(crate) type IntCounter = DeleteOnDropCounter<'static, AtomicU64, Vec<String>>;
type Gauge = DeleteOnDropGauge<'static, AtomicF64, Vec<String>>;
/// TODO(database-issues#7533): Add documentation.
Expand Down Expand Up @@ -68,14 +68,14 @@ pub struct ComputeControllerMetrics {

// dataflows
dataflow_initial_output_duration_seconds: GaugeVec,
dataflow_wallclock_lag_seconds: GaugeVec,
dataflow_wallclock_lag_seconds_sum: CounterVec,
dataflow_wallclock_lag_seconds_count: IntCounterVec,

/// Metrics shared with the storage controller.
shared: ControllerMetrics,
}

impl ComputeControllerMetrics {
/// Create a metrics instance registered into the given registry.
pub fn new(metrics_registry: MetricsRegistry) -> Self {
pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
ComputeControllerMetrics {
commands_total: metrics_registry.register(metric!(
name: "mz_compute_commands_total",
Expand Down Expand Up @@ -174,25 +174,7 @@ impl ComputeControllerMetrics {
var_labels: ["instance_id", "replica_id", "collection_id"],
)),

// The next three metrics immitate a summary metric type. The `prometheus` crate lacks
// support for summaries, so we roll our own. Note that we also only expose the 0- and
// the 1-quantile, i.e., minimum and maximum lag values.
dataflow_wallclock_lag_seconds: metrics_registry.register(metric!(
name: "mz_dataflow_wallclock_lag_seconds",
help: "A summary of the second-by-second lag of the dataflow frontier relative \
to wallclock time, aggregated over the last minute.",
var_labels: ["instance_id", "replica_id", "collection_id", "quantile"],
)),
dataflow_wallclock_lag_seconds_sum: metrics_registry.register(metric!(
name: "mz_dataflow_wallclock_lag_seconds_sum",
help: "The total sum of dataflow wallclock lag measurements.",
var_labels: ["instance_id", "replica_id", "collection_id"],
)),
dataflow_wallclock_lag_seconds_count: metrics_registry.register(metric!(
name: "mz_dataflow_wallclock_lag_seconds_count",
help: "The total count of dataflow wallclock lag measurements.",
var_labels: ["instance_id", "replica_id", "collection_id"],
)),
shared,
}
}

Expand Down Expand Up @@ -418,44 +400,20 @@ impl ReplicaMetrics {
collection_id.to_string(),
];

let labels_with_quantile = |quantile: &str| {
labels
.iter()
.cloned()
.chain([quantile.to_string()])
.collect()
};

let initial_output_duration_seconds = self
.metrics
.dataflow_initial_output_duration_seconds
.get_delete_on_drop_metric(labels.clone());

let wallclock_lag_seconds_min = self
.metrics
.dataflow_wallclock_lag_seconds
.get_delete_on_drop_metric(labels_with_quantile("0"));
let wallclock_lag_seconds_max = self
.metrics
.dataflow_wallclock_lag_seconds
.get_delete_on_drop_metric(labels_with_quantile("1"));
let wallclock_lag_seconds_sum = self
.metrics
.dataflow_wallclock_lag_seconds_sum
.get_delete_on_drop_metric(labels.clone());
let wallclock_lag_seconds_count = self
.metrics
.dataflow_wallclock_lag_seconds_count
.get_delete_on_drop_metric(labels);
let wallclock_lag_minmax = SlidingMinMax::new(60);
let wallclock_lag = self.metrics.shared.wallclock_lag_metrics(
collection_id.to_string(),
Some(self.instance_id.to_string()),
Some(self.replica_id.to_string()),
);

Some(ReplicaCollectionMetrics {
initial_output_duration_seconds,
wallclock_lag_seconds_min,
wallclock_lag_seconds_max,
wallclock_lag_seconds_sum,
wallclock_lag_seconds_count,
wallclock_lag_minmax,
wallclock_lag,
})
}
}
Expand Down Expand Up @@ -484,35 +442,8 @@ impl StatsCollector<ProtoComputeCommand, ProtoComputeResponse> for ReplicaMetric
pub(crate) struct ReplicaCollectionMetrics {
/// Gauge tracking dataflow hydration time.
pub initial_output_duration_seconds: Gauge,
/// Gauge tracking minimum dataflow wallclock lag.
wallclock_lag_seconds_min: Gauge,
/// Gauge tracking maximum dataflow wallclock lag.
wallclock_lag_seconds_max: Gauge,
/// Counter tracking the total sum of dataflow wallclock lag.
wallclock_lag_seconds_sum: Counter,
/// Counter tracking the total count of dataflow wallclock lag measurements.
wallclock_lag_seconds_count: IntCounter,

/// State maintaining minimum and maximum wallclock lag.
wallclock_lag_minmax: SlidingMinMax<f32>,
}

impl ReplicaCollectionMetrics {
pub fn observe_wallclock_lag(&mut self, lag: Duration) {
let lag_secs = lag.as_secs_f32();

self.wallclock_lag_minmax.add_sample(lag_secs);

let (&min, &max) = self
.wallclock_lag_minmax
.get()
.expect("just added a sample");

self.wallclock_lag_seconds_min.set(min.into());
self.wallclock_lag_seconds_max.set(max.into());
self.wallclock_lag_seconds_sum.inc_by(lag_secs.into());
self.wallclock_lag_seconds_count.inc();
}
/// Metrics tracking dataflow wallclock lag.
pub wallclock_lag: WallclockLagMetrics,
}

/// Metrics keyed by `ComputeCommand` type.
Expand Down
9 changes: 7 additions & 2 deletions src/controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::time::Duration;

use futures::future::BoxFuture;
use mz_build_info::BuildInfo;
use mz_cluster_client::metrics::ControllerMetrics;
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_compute_client::controller::{
ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification,
Expand Down Expand Up @@ -649,6 +650,8 @@ where
Duration::from(lag_ts)
});

let controller_metrics = ControllerMetrics::new(&config.metrics_registry);

let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry));
let collections_ctl = storage_collections::StorageCollectionsImpl::new(
config.persist_location.clone(),
Expand All @@ -675,7 +678,8 @@ where
Arc::clone(&txns_metrics),
envd_epoch,
read_only,
config.metrics_registry.clone(),
&config.metrics_registry,
controller_metrics.clone(),
config.connection_context,
storage_txn,
Arc::clone(&collections_ctl),
Expand All @@ -688,7 +692,8 @@ where
storage_collections,
envd_epoch,
read_only,
config.metrics_registry.clone(),
&config.metrics_registry,
controller_metrics,
config.now.clone(),
wallclock_lag,
);
Expand Down
Loading