From c9c2c109806166e2b053f18ca9d8f35f7f1a5494 Mon Sep 17 00:00:00 2001
From: Jan Teske
Date: Tue, 19 Nov 2024 13:54:24 +0100
Subject: [PATCH 1/3] controller: factor out wallclock lag metrics

In preparation for having the storage controller export wallclock lag
metrics too, this commit factors out the common infrastructure from
`mz-compute-client` and moves it into `mz-cluster-client`.
---
 src/cluster-client/src/lib.rs                 |   1 +
 src/cluster-client/src/metrics.rs             | 134 ++++++++++++++++++
 src/compute-client/src/controller.rs          |   8 +-
 src/compute-client/src/controller/instance.rs |   2 +-
 src/compute-client/src/metrics.rs             | 103 +++-----------
 src/controller/src/lib.rs                     |   6 +-
 6 files changed, 164 insertions(+), 90 deletions(-)
 create mode 100644 src/cluster-client/src/metrics.rs

diff --git a/src/cluster-client/src/lib.rs b/src/cluster-client/src/lib.rs
index 5c5aad384a677..e4784254339ad 100644
--- a/src/cluster-client/src/lib.rs
+++ b/src/cluster-client/src/lib.rs
@@ -20,6 +20,7 @@ use anyhow::bail;
 use serde::{Deserialize, Serialize};
 
 pub mod client;
+pub mod metrics;
 
 /// A function that computes the lag between the given time and wallclock time.
 pub type WallclockLagFn<T> = Arc<dyn Fn(&T) -> Duration + Send + Sync>;

diff --git a/src/cluster-client/src/metrics.rs b/src/cluster-client/src/metrics.rs
new file mode 100644
index 0000000000000..0249b8fa4635c
--- /dev/null
+++ b/src/cluster-client/src/metrics.rs
@@ -0,0 +1,134 @@
+// Copyright Materialize, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+//! Metrics shared by both compute and storage.
+
+use std::time::Duration;
+
+use mz_ore::metric;
+use mz_ore::metrics::{
+    CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, MetricsRegistry,
+};
+use mz_ore::stats::SlidingMinMax;
+use prometheus::core::{AtomicF64, AtomicU64};
+
+/// Controller metrics.
+#[derive(Debug, Clone)]
+pub struct ControllerMetrics {
+    dataflow_wallclock_lag_seconds: GaugeVec,
+    dataflow_wallclock_lag_seconds_sum: CounterVec,
+    dataflow_wallclock_lag_seconds_count: IntCounterVec,
+}
+
+impl ControllerMetrics {
+    /// Create a metrics instance registered into the given registry.
+    pub fn new(metrics_registry: &MetricsRegistry) -> Self {
+        Self {
+            // The next three metrics imitate a summary metric type. The `prometheus` crate lacks
+            // support for summaries, so we roll our own. Note that we also only expose the 0- and
+            // the 1-quantile, i.e., minimum and maximum lag values.
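+            //
+            // With the `_sum` and `_count` counters exported, consumers can still derive the
+            // average lag over any scrape window as `rate(..._sum) / rate(..._count)`, per the
+            // usual Prometheus summary conventions.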
+            dataflow_wallclock_lag_seconds: metrics_registry.register(metric!(
+                name: "mz_dataflow_wallclock_lag_seconds",
+                help: "A summary of the second-by-second lag of the dataflow frontier relative \
+                    to wallclock time, aggregated over the last minute.",
+                var_labels: ["instance_id", "replica_id", "collection_id", "quantile"],
+            )),
+            dataflow_wallclock_lag_seconds_sum: metrics_registry.register(metric!(
+                name: "mz_dataflow_wallclock_lag_seconds_sum",
+                help: "The total sum of dataflow wallclock lag measurements.",
+                var_labels: ["instance_id", "replica_id", "collection_id"],
+            )),
+            dataflow_wallclock_lag_seconds_count: metrics_registry.register(metric!(
+                name: "mz_dataflow_wallclock_lag_seconds_count",
+                help: "The total count of dataflow wallclock lag measurements.",
+                var_labels: ["instance_id", "replica_id", "collection_id"],
+            )),
+        }
+    }
+
+    /// Return an object that tracks wallclock lag metrics for the given collection on the given
+    /// cluster and replica.
+    pub fn wallclock_lag_metrics(
+        &self,
+        collection_id: String,
+        instance_id: Option<String>,
+        replica_id: Option<String>,
+    ) -> WallclockLagMetrics {
+        let labels = vec![
+            instance_id.unwrap_or_default(),
+            replica_id.unwrap_or_default(),
+            collection_id.to_string(),
+        ];
+
+        let labels_with_quantile = |quantile: &str| {
+            labels
+                .iter()
+                .cloned()
+                .chain([quantile.to_string()])
+                .collect()
+        };
+
+        let wallclock_lag_seconds_min = self
+            .dataflow_wallclock_lag_seconds
+            .get_delete_on_drop_metric(labels_with_quantile("0"));
+        let wallclock_lag_seconds_max = self
+            .dataflow_wallclock_lag_seconds
+            .get_delete_on_drop_metric(labels_with_quantile("1"));
+        let wallclock_lag_seconds_sum = self
+            .dataflow_wallclock_lag_seconds_sum
+            .get_delete_on_drop_metric(labels.clone());
+        let wallclock_lag_seconds_count = self
+            .dataflow_wallclock_lag_seconds_count
+            .get_delete_on_drop_metric(labels);
+        let wallclock_lag_minmax = SlidingMinMax::new(60);
+
+        WallclockLagMetrics {
+            wallclock_lag_seconds_min,
+            wallclock_lag_seconds_max,
+            wallclock_lag_seconds_sum,
+            wallclock_lag_seconds_count,
+            wallclock_lag_minmax,
+        }
+    }
+}
+
+/// Metrics tracking frontier wallclock lag for a collection.
+#[derive(Debug)]
+pub struct WallclockLagMetrics {
+    /// Gauge tracking minimum dataflow wallclock lag.
+    wallclock_lag_seconds_min: DeleteOnDropGauge<'static, AtomicF64, Vec<String>>,
+    /// Gauge tracking maximum dataflow wallclock lag.
+    wallclock_lag_seconds_max: DeleteOnDropGauge<'static, AtomicF64, Vec<String>>,
+    /// Counter tracking the total sum of dataflow wallclock lag.
+    wallclock_lag_seconds_sum: DeleteOnDropCounter<'static, AtomicF64, Vec<String>>,
+    /// Counter tracking the total count of dataflow wallclock lag measurements.
+    wallclock_lag_seconds_count: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,
+
+    /// State maintaining minimum and maximum wallclock lag.
+    wallclock_lag_minmax: SlidingMinMax<f32>,
+}
+
+impl WallclockLagMetrics {
+    /// Observe a new wallclock lag measurement.
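+    ///
+    /// Each sample feeds the one-minute sliding min/max window behind the quantile gauges and
+    /// advances the cumulative `_sum`/`_count` counters.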
+    pub fn observe(&mut self, lag: Duration) {
+        let lag_secs = lag.as_secs_f32();
+
+        self.wallclock_lag_minmax.add_sample(lag_secs);
+
+        let (&min, &max) = self
+            .wallclock_lag_minmax
+            .get()
+            .expect("just added a sample");
+
+        self.wallclock_lag_seconds_min.set(min.into());
+        self.wallclock_lag_seconds_max.set(max.into());
+        self.wallclock_lag_seconds_sum.inc_by(lag_secs.into());
+        self.wallclock_lag_seconds_count.inc();
+    }
+}

diff --git a/src/compute-client/src/controller.rs b/src/compute-client/src/controller.rs
index e649e2369d86f..538b12e64b6db 100644
--- a/src/compute-client/src/controller.rs
+++ b/src/compute-client/src/controller.rs
@@ -38,6 +38,7 @@ use futures::stream::FuturesUnordered;
 use futures::{FutureExt, StreamExt};
 use mz_build_info::BuildInfo;
 use mz_cluster_client::client::ClusterReplicaLocation;
+use mz_cluster_client::metrics::ControllerMetrics;
 use mz_cluster_client::{ReplicaId, WallclockLagFn};
 use mz_compute_types::dataflows::DataflowDescription;
 use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
@@ -239,7 +240,8 @@ impl<T: ComputeControllerTimestamp> ComputeController<T> {
         storage_collections: StorageCollections<T>,
         envd_epoch: NonZeroI64,
         read_only: bool,
-        metrics_registry: MetricsRegistry,
+        metrics_registry: &MetricsRegistry,
+        controller_metrics: ControllerMetrics,
         now: NowFn,
         wallclock_lag: WallclockLagFn<T>,
     ) -> Self {
@@ -291,6 +293,8 @@ impl<T: ComputeControllerTimestamp> ComputeController<T> {
             }
         });
 
+        let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);
+
         Self {
             instances: BTreeMap::new(),
             instance_workload_classes,
@@ -302,7 +306,7 @@
             arrangement_exert_proportionality: 16,
             stashed_response: None,
             envd_epoch,
-            metrics: ComputeControllerMetrics::new(metrics_registry),
+            metrics,
             now,
             wallclock_lag,
             dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),

diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs
index 2420cadafdc98..5a7038d097ee0 100644
--- a/src/compute-client/src/controller/instance.rs
+++ b/src/compute-client/src/controller/instance.rs
@@ -564,7 +564,7 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
         }
 
         if let Some(metrics) = &mut collection.metrics {
-            metrics.observe_wallclock_lag(lag);
+            metrics.wallclock_lag.observe(lag);
         };
     }
 }

diff --git a/src/compute-client/src/metrics.rs b/src/compute-client/src/metrics.rs
index 77ea4a20abd8d..60b8310f7ca4d 100644
--- a/src/compute-client/src/metrics.rs
+++ b/src/compute-client/src/metrics.rs
@@ -13,16 +13,17 @@ use std::borrow::Borrow;
 use std::sync::Arc;
 use std::time::Duration;
 
+use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
 use mz_cluster_client::ReplicaId;
 use mz_compute_types::ComputeInstanceId;
 use mz_ore::cast::CastFrom;
 use mz_ore::metric;
 use mz_ore::metrics::raw::UIntGaugeVec;
 use mz_ore::metrics::{
-    CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, GaugeVec,
-    HistogramVec, IntCounterVec, MetricVecExt, MetricsRegistry,
+    DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, GaugeVec, HistogramVec,
+    IntCounterVec, MetricVecExt, MetricsRegistry,
 };
-use mz_ore::stats::{histogram_seconds_buckets, SlidingMinMax};
+use mz_ore::stats::histogram_seconds_buckets;
 use mz_repr::GlobalId;
 use mz_service::codec::StatsCollector;
 use prometheus::core::{AtomicF64, AtomicU64};
@@ -30,7 +31,6 @@ use prometheus::core::{AtomicF64, AtomicU64};
 
 use crate::protocol::command::{ComputeCommand, ProtoComputeCommand};
 use crate::protocol::response::{PeekResponse, ProtoComputeResponse};
 
-type Counter = DeleteOnDropCounter<'static, AtomicF64, Vec<String>>;
 pub(crate) type IntCounter = DeleteOnDropCounter<'static, AtomicU64, Vec<String>>;
 type Gauge = DeleteOnDropGauge<'static, AtomicF64, Vec<String>>;
 
 /// TODO(database-issues#7533): Add documentation.
@@ -68,14 +68,14 @@ pub struct ComputeControllerMetrics {
 
     // dataflows
     dataflow_initial_output_duration_seconds: GaugeVec,
-    dataflow_wallclock_lag_seconds: GaugeVec,
-    dataflow_wallclock_lag_seconds_sum: CounterVec,
-    dataflow_wallclock_lag_seconds_count: IntCounterVec,
+
+    /// Metrics shared with the storage controller.
+    shared: ControllerMetrics,
 }
 
 impl ComputeControllerMetrics {
     /// Create a metrics instance registered into the given registry.
-    pub fn new(metrics_registry: MetricsRegistry) -> Self {
+    pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
         ComputeControllerMetrics {
             commands_total: metrics_registry.register(metric!(
                 name: "mz_compute_commands_total",
@@ -174,25 +174,7 @@ impl ComputeControllerMetrics {
                 var_labels: ["instance_id", "replica_id", "collection_id"],
             )),
 
-            // The next three metrics immitate a summary metric type. The `prometheus` crate lacks
-            // support for summaries, so we roll our own. Note that we also only expose the 0- and
-            // the 1-quantile, i.e., minimum and maximum lag values.
-            dataflow_wallclock_lag_seconds: metrics_registry.register(metric!(
-                name: "mz_dataflow_wallclock_lag_seconds",
-                help: "A summary of the second-by-second lag of the dataflow frontier relative \
-                    to wallclock time, aggregated over the last minute.",
-                var_labels: ["instance_id", "replica_id", "collection_id", "quantile"],
-            )),
-            dataflow_wallclock_lag_seconds_sum: metrics_registry.register(metric!(
-                name: "mz_dataflow_wallclock_lag_seconds_sum",
-                help: "The total sum of dataflow wallclock lag measurements.",
-                var_labels: ["instance_id", "replica_id", "collection_id"],
-            )),
-            dataflow_wallclock_lag_seconds_count: metrics_registry.register(metric!(
-                name: "mz_dataflow_wallclock_lag_seconds_count",
-                help: "The total count of dataflow wallclock lag measurements.",
-                var_labels: ["instance_id", "replica_id", "collection_id"],
-            )),
+            shared,
         }
     }
 
@@ -418,44 +400,20 @@ impl ReplicaMetrics {
             collection_id.to_string(),
         ];
 
-        let labels_with_quantile = |quantile: &str| {
-            labels
-                .iter()
-                .cloned()
-                .chain([quantile.to_string()])
-                .collect()
-        };
-
         let initial_output_duration_seconds = self
             .metrics
             .dataflow_initial_output_duration_seconds
             .get_delete_on_drop_metric(labels.clone());
 
-        let wallclock_lag_seconds_min = self
-            .metrics
-            .dataflow_wallclock_lag_seconds
-            .get_delete_on_drop_metric(labels_with_quantile("0"));
-        let wallclock_lag_seconds_max = self
-            .metrics
-            .dataflow_wallclock_lag_seconds
-            .get_delete_on_drop_metric(labels_with_quantile("1"));
-        let wallclock_lag_seconds_sum = self
-            .metrics
-            .dataflow_wallclock_lag_seconds_sum
-            .get_delete_on_drop_metric(labels.clone());
-        let wallclock_lag_seconds_count = self
-            .metrics
-            .dataflow_wallclock_lag_seconds_count
-            .get_delete_on_drop_metric(labels);
-        let wallclock_lag_minmax = SlidingMinMax::new(60);
+        let wallclock_lag = self.metrics.shared.wallclock_lag_metrics(
+            collection_id.to_string(),
+            Some(self.instance_id.to_string()),
+            Some(self.replica_id.to_string()),
+        );
 
         Some(ReplicaCollectionMetrics {
             initial_output_duration_seconds,
-            wallclock_lag_seconds_min,
-            wallclock_lag_seconds_max,
-            wallclock_lag_seconds_sum,
-            wallclock_lag_seconds_count,
-            wallclock_lag_minmax,
+            wallclock_lag,
         })
    }
 }
@@ -484,35 +442,8 @@ impl StatsCollector<ProtoComputeCommand, ProtoComputeResponse> for ReplicaMetrics {
 pub(crate) struct
ReplicaCollectionMetrics { /// Gauge tracking dataflow hydration time. pub initial_output_duration_seconds: Gauge, - /// Gauge tracking minimum dataflow wallclock lag. - wallclock_lag_seconds_min: Gauge, - /// Gauge tracking maximum dataflow wallclock lag. - wallclock_lag_seconds_max: Gauge, - /// Counter tracking the total sum of dataflow wallclock lag. - wallclock_lag_seconds_sum: Counter, - /// Counter tracking the total count of dataflow wallclock lag measurements. - wallclock_lag_seconds_count: IntCounter, - - /// State maintaining minimum and maximum wallclock lag. - wallclock_lag_minmax: SlidingMinMax, -} - -impl ReplicaCollectionMetrics { - pub fn observe_wallclock_lag(&mut self, lag: Duration) { - let lag_secs = lag.as_secs_f32(); - - self.wallclock_lag_minmax.add_sample(lag_secs); - - let (&min, &max) = self - .wallclock_lag_minmax - .get() - .expect("just added a sample"); - - self.wallclock_lag_seconds_min.set(min.into()); - self.wallclock_lag_seconds_max.set(max.into()); - self.wallclock_lag_seconds_sum.inc_by(lag_secs.into()); - self.wallclock_lag_seconds_count.inc(); - } + /// Metrics tracking dataflow wallclock lag. + pub wallclock_lag: WallclockLagMetrics, } /// Metrics keyed by `ComputeCommand` type. diff --git a/src/controller/src/lib.rs b/src/controller/src/lib.rs index 90afb94bd5fff..760bd0ed7f040 100644 --- a/src/controller/src/lib.rs +++ b/src/controller/src/lib.rs @@ -30,6 +30,7 @@ use std::time::Duration; use futures::future::BoxFuture; use mz_build_info::BuildInfo; +use mz_cluster_client::metrics::ControllerMetrics; use mz_cluster_client::{ReplicaId, WallclockLagFn}; use mz_compute_client::controller::{ ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification, @@ -649,6 +650,8 @@ where Duration::from(lag_ts) }); + let controller_metrics = ControllerMetrics::new(&config.metrics_registry); + let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry)); let collections_ctl = storage_collections::StorageCollectionsImpl::new( config.persist_location.clone(), @@ -688,7 +691,8 @@ where storage_collections, envd_epoch, read_only, - config.metrics_registry.clone(), + &config.metrics_registry, + controller_metrics, config.now.clone(), wallclock_lag, ); From ccadbaffef4d98d484195d564f77711c7f8472bc Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Thu, 21 Nov 2024 15:17:32 +0100 Subject: [PATCH 2/3] storage-controller: clean up CollectionState init This commit makes minor changes to the code structure around `CollectionState` initialization in the storage controller. This removes some redundancy, but more importantly makes it easier to attach `WallclockLagMetrics` to the `CollectionState` in the next commit. --- src/storage-controller/src/lib.rs | 69 +++++++++++++++++++------------ 1 file changed, 42 insertions(+), 27 deletions(-) diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index deb180b81972e..c1755cf5c0aec 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -660,7 +660,9 @@ where let mut new_source_statistic_entries = BTreeSet::new(); let mut new_webhook_statistic_entries = BTreeSet::new(); - for (id, mut description, write, metadata) in to_register { + for (id, description, write, metadata) in to_register { + let mut data_source = description.data_source; + to_execute.insert(id); new_collections.insert(id); @@ -668,7 +670,7 @@ where // This is done in an awkward spot to appease the borrow checker. 
// TODO(database-issues#8620): This will be removed once sources no longer export // to primary collections and only export to explicit SourceExports (tables). - if let DataSource::Ingestion(ingestion) = &mut description.data_source { + if let DataSource::Ingestion(ingestion) = &mut data_source { if let Some(export) = ingestion.desc.primary_source_export() { ingestion.source_exports.insert(id, export); } @@ -677,8 +679,7 @@ where let write_frontier = write.upper(); // Determine if this collection has another dependency. - let storage_dependencies = - self.determine_collection_dependencies(id, &description.data_source)?; + let storage_dependencies = self.determine_collection_dependencies(id, &data_source)?; let dependency_read_holds = self .storage_collections @@ -723,17 +724,14 @@ where ); } - let mut collection_state = CollectionState { - data_source: description.data_source.clone(), - collection_metadata: metadata.clone(), - extra_state: CollectionStateExtra::None, - wallclock_lag_max: Default::default(), - }; - - // Install the collection state in the appropriate spot. - match &collection_state.data_source { + // Perform data source-specific setup. + let mut extra_state = CollectionStateExtra::None; + match &data_source { DataSource::Introspection(typ) => { - debug!(data_source = ?collection_state.data_source, meta = ?metadata, "registering {} with persist monotonic worker", id); + debug!( + ?data_source, meta = ?metadata, + "registering {id} with persist monotonic worker", + ); // We always register the collection with the collection manager, // regardless of read-only mode. The CollectionManager itself is // aware of read-only mode and will not attempt to write before told @@ -745,11 +743,12 @@ where write, persist_client.clone(), )?; - self.collections.insert(id, collection_state); } DataSource::Webhook => { - debug!(data_source = ?collection_state.data_source, meta = ?metadata, "registering {} with persist monotonic worker", id); - self.collections.insert(id, collection_state); + debug!( + ?data_source, meta = ?metadata, + "registering {id} with persist monotonic worker", + ); new_source_statistic_entries.insert(id); // This collection of statistics is periodically aggregated into // `source_statistics`. @@ -767,7 +766,10 @@ where details, data_config, } => { - debug!(data_source = ?collection_state.data_source, meta = ?metadata, "not registering {} with a controller persist worker", id); + debug!( + ?data_source, meta = ?metadata, + "not registering {id} with a controller persist worker", + ); // Adjust the source to contain this export. 
let ingestion_state = self .collections @@ -810,22 +812,28 @@ where hydrated: false, }; - collection_state.extra_state = CollectionStateExtra::Ingestion(ingestion_state); + extra_state = CollectionStateExtra::Ingestion(ingestion_state); - self.collections.insert(id, collection_state); new_source_statistic_entries.insert(id); } DataSource::Table => { - debug!(data_source = ?collection_state.data_source, meta = ?metadata, "registering {} with persist table worker", id); - self.collections.insert(id, collection_state); + debug!( + ?data_source, meta = ?metadata, + "registering {id} with persist table worker", + ); table_registers.push((id, write)); } DataSource::Progress | DataSource::Other => { - debug!(data_source = ?collection_state.data_source, meta = ?metadata, "not registering {} with a controller persist worker", id); - self.collections.insert(id, collection_state); + debug!( + ?data_source, meta = ?metadata, + "not registering {id} with a controller persist worker", + ); } DataSource::Ingestion(ingestion_desc) => { - debug!(?ingestion_desc, meta = ?metadata, "not registering {} with a controller persist worker", id); + debug!( + ?data_source, meta = ?metadata, + "not registering {id} with a controller persist worker", + ); let mut dependency_since = Antichain::from_elem(T::minimum()); for read_hold in dependency_read_holds.iter() { @@ -842,12 +850,19 @@ where hydrated: false, }; - collection_state.extra_state = CollectionStateExtra::Ingestion(ingestion_state); + extra_state = CollectionStateExtra::Ingestion(ingestion_state); - self.collections.insert(id, collection_state); new_source_statistic_entries.insert(id); } } + + let collection_state = CollectionState { + data_source, + collection_metadata: metadata, + extra_state, + wallclock_lag_max: Default::default(), + }; + self.collections.insert(id, collection_state); } { From 6dbb2ab1804467416b0a7699785f2e05143d6ff1 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Thu, 21 Nov 2024 15:22:21 +0100 Subject: [PATCH 3/3] controller: export wallclock lag metrics for storage collections This commit wires the `ControllerMetrics` through to the storage controller, creates `WallclockLagMetrics` objects in the `CollectionState`s/`ExportState`s of all storage collections, and uses those to update the wallclock lag metrics during every maintenance call. 
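As an aside for reviewers: these metrics emulate a Prometheus summary by
pairing cumulative `_sum`/`_count` counters with gauges for the 0- and
1-quantile (min/max) over a sliding window of 60 once-per-second samples.
The self-contained sketch below illustrates that shape with std types
only. It is a simplified stand-in: the name `LagSummary` is illustrative
and does not appear in the codebase, and the real implementation uses
`mz_ore::stats::SlidingMinMax` plus `prometheus` primitives.

    use std::collections::VecDeque;
    use std::time::Duration;

    /// A fixed-size window of lag samples exposing min/max plus cumulative
    /// sum/count -- the same shape as the exported metrics.
    struct LagSummary {
        window: VecDeque<f32>,
        cap: usize,
        sum: f64,   // corresponds to `..._seconds_sum`
        count: u64, // corresponds to `..._seconds_count`
    }

    impl LagSummary {
        fn new(cap: usize) -> Self {
            Self { window: VecDeque::with_capacity(cap), cap, sum: 0.0, count: 0 }
        }

        fn observe(&mut self, lag: Duration) {
            // Evict the oldest sample once the window is full.
            if self.window.len() == self.cap {
                self.window.pop_front();
            }
            self.window.push_back(lag.as_secs_f32());
            self.sum += lag.as_secs_f64();
            self.count += 1;
        }

        /// The 0- and 1-quantile over the window, i.e., minimum and maximum.
        fn min_max(&self) -> Option<(f32, f32)> {
            let min = self.window.iter().copied().fold(f32::INFINITY, f32::min);
            let max = self.window.iter().copied().fold(f32::NEG_INFINITY, f32::max);
            (!self.window.is_empty()).then_some((min, max))
        }
    }

    fn main() {
        // One sample per second, aggregated over the last minute.
        let mut summary = LagSummary::new(60);
        for secs in [1, 3, 2] {
            summary.observe(Duration::from_secs(secs));
        }
        assert_eq!(summary.min_max(), Some((1.0, 3.0)));
        assert_eq!(summary.count, 3);
        assert!((summary.sum - 6.0).abs() < f64::EPSILON);
    }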
---
 src/controller/src/lib.rs             |  3 +-
 src/storage-client/src/controller.rs  |  5 +++
 src/storage-client/src/metrics.rs     | 18 ++++++++++-
 src/storage-controller/src/history.rs |  4 ++-
 src/storage-controller/src/lib.rs     | 45 ++++++++++++++++++++++-----
 5 files changed, 64 insertions(+), 11 deletions(-)

diff --git a/src/controller/src/lib.rs b/src/controller/src/lib.rs
index 760bd0ed7f040..2171bedd717a3 100644
--- a/src/controller/src/lib.rs
+++ b/src/controller/src/lib.rs
@@ -678,7 +678,8 @@ where
             Arc::clone(&txns_metrics),
             envd_epoch,
             read_only,
-            config.metrics_registry.clone(),
+            &config.metrics_registry,
+            controller_metrics.clone(),
             config.connection_context,
             storage_txn,
             Arc::clone(&collections_ctl),

diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs
index ad298698ab741..6458e318d6530 100644
--- a/src/storage-client/src/controller.rs
+++ b/src/storage-client/src/controller.rs
@@ -29,6 +29,7 @@ use std::time::Duration;
 use async_trait::async_trait;
 use differential_dataflow::lattice::Lattice;
 use mz_cluster_client::client::ClusterReplicaLocation;
+use mz_cluster_client::metrics::WallclockLagMetrics;
 use mz_cluster_client::ReplicaId;
 use mz_ore::collections::CollectionExt;
 use mz_persist_client::read::{Cursor, ReadHandle};
@@ -814,6 +815,8 @@ pub struct ExportState<T: TimelyTimestamp> {
     /// Maximum frontier wallclock lag since the last introspection update.
     pub wallclock_lag_max: Duration,
+    /// Frontier wallclock lag metrics tracked for this collection.
+    pub wallclock_lag_metrics: WallclockLagMetrics,
 }
 
 impl<T: TimelyTimestamp> ExportState<T> {
@@ -821,6 +824,7 @@
         description: ExportDescription<T>,
         read_hold: ReadHold<T>,
         read_policy: ReadPolicy<T>,
+        wallclock_lag_metrics: WallclockLagMetrics,
     ) -> Self {
         Self {
             description,
@@ -828,6 +832,7 @@
             read_policy,
             write_frontier: Antichain::from_elem(Timestamp::minimum()),
             wallclock_lag_max: Default::default(),
+            wallclock_lag_metrics,
         }
     }

diff --git a/src/storage-client/src/metrics.rs b/src/storage-client/src/metrics.rs
index b21b7860be3b1..cb6fbf2cfdffc 100644
--- a/src/storage-client/src/metrics.rs
+++ b/src/storage-client/src/metrics.rs
@@ -11,6 +11,7 @@
 
 use std::sync::Arc;
 
+use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
 use mz_cluster_client::ReplicaId;
 use mz_ore::cast::{CastFrom, TryCastFrom};
 use mz_ore::metric;
@@ -19,6 +20,7 @@ use mz_ore::metrics::{
     MetricsRegistry, UIntGaugeVec,
 };
 use mz_ore::stats::HISTOGRAM_BYTE_BUCKETS;
+use mz_repr::GlobalId;
 use mz_service::codec::StatsCollector;
 use mz_storage_types::instances::StorageInstanceId;
 use prometheus::core::AtomicU64;
@@ -35,10 +37,13 @@ pub struct StorageControllerMetrics {
     startup_prepared_statements_kept: prometheus::IntGauge,
     regressed_offset_known: IntCounterVec,
     history_command_count: UIntGaugeVec,
+
+    /// Metrics shared with the compute controller.
+    shared: ControllerMetrics,
 }
 
 impl StorageControllerMetrics {
-    pub fn new(metrics_registry: MetricsRegistry) -> Self {
+    pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
         Self {
             messages_sent_bytes: metrics_registry.register(metric!(
                 name: "mz_storage_messages_sent_bytes",
@@ -66,6 +71,8 @@ impl StorageControllerMetrics {
                 help: "The number of commands in the controller's command history.",
                 var_labels: ["instance_id", "command_type"],
             )),
+
+            shared,
         }
     }
 
@@ -77,6 +84,15 @@ impl StorageControllerMetrics {
             .get_delete_on_drop_metric(vec![id.to_string()])
     }
 
+    pub fn wallclock_lag_metrics(
+        &self,
+        id: GlobalId,
+        instance_id: Option<StorageInstanceId>,
+    ) -> WallclockLagMetrics {
+        self.shared
+            .wallclock_lag_metrics(id.to_string(), instance_id.map(|x| x.to_string()), None)
+    }
+
     pub fn for_instance(&self, id: StorageInstanceId) -> InstanceMetrics {
         InstanceMetrics {
             instance_id: id,

diff --git a/src/storage-controller/src/history.rs b/src/storage-controller/src/history.rs
index 9560590ba0750..79b3117b8b789 100644
--- a/src/storage-controller/src/history.rs
+++ b/src/storage-controller/src/history.rs
@@ -203,6 +203,7 @@ impl CommandHistory {
 mod tests {
     use std::str::FromStr;
 
+    use mz_cluster_client::metrics::ControllerMetrics;
     use mz_ore::metrics::MetricsRegistry;
     use mz_ore::url::SensitiveUrl;
     use mz_persist_types::PersistLocation;
@@ -231,7 +232,8 @@ mod tests {
     fn history() -> CommandHistory {
         let registry = MetricsRegistry::new();
-        let metrics = StorageControllerMetrics::new(registry)
+        let controller_metrics = ControllerMetrics::new(&registry);
+        let metrics = StorageControllerMetrics::new(&registry, controller_metrics)
             .for_instance(StorageInstanceId::system(0).expect("0 is a valid ID"))
             .for_history();

diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs
index c1755cf5c0aec..48527242f2eae 100644
--- a/src/storage-controller/src/lib.rs
+++ b/src/storage-controller/src/lib.rs
@@ -27,6 +27,7 @@ use futures::StreamExt;
 use itertools::Itertools;
 use mz_build_info::BuildInfo;
 use mz_cluster_client::client::ClusterReplicaLocation;
+use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
 use mz_cluster_client::{ReplicaId, WallclockLagFn};
 use mz_controller_types::dyncfgs::{ENABLE_0DT_DEPLOYMENT_SOURCES, WALLCLOCK_LAG_REFRESH_INTERVAL};
 use mz_ore::collections::CollectionExt;
@@ -726,6 +727,7 @@ where
 
         // Perform data source-specific setup.
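+        // Remember which instance, if any, will host this collection, so the wallclock lag
+        // metrics created below can be labeled with it.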
         let mut extra_state = CollectionStateExtra::None;
+        let mut maybe_instance_id = None;
         match &data_source {
             DataSource::Introspection(typ) => {
                 debug!(
                     ?data_source, meta = ?metadata,
                     "registering {id} with persist monotonic worker",
                 );
@@ -813,6 +815,7 @@
                 };
 
                 extra_state = CollectionStateExtra::Ingestion(ingestion_state);
+                maybe_instance_id = Some(instance_id);
 
                 new_source_statistic_entries.insert(id);
             }
@@ -851,17 +854,21 @@
                 };
 
                 extra_state = CollectionStateExtra::Ingestion(ingestion_state);
+                maybe_instance_id = Some(ingestion_desc.instance_id);
 
                 new_source_statistic_entries.insert(id);
             }
         }
 
+        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
         let collection_state = CollectionState {
             data_source,
             collection_metadata: metadata,
             extra_state,
             wallclock_lag_max: Default::default(),
+            wallclock_lag_metrics,
         };
 
         self.collections.insert(id, collection_state);
     }
 
@@ -1258,10 +1265,17 @@
             "create_exports: creating sink"
         );
 
-        self.exports.insert(
-            id,
-            ExportState::new(description.clone(), read_hold, read_policy),
+        let wallclock_lag_metrics = self
+            .metrics
+            .wallclock_lag_metrics(id, Some(description.instance_id));
+
+        let export_state = ExportState::new(
+            description.clone(),
+            read_hold,
+            read_policy,
+            wallclock_lag_metrics,
         );
+        self.exports.insert(id, export_state);
 
         // Just like with `new_source_statistic_entries`, we can probably
         // `insert` here, but in the interest of safety, never override
@@ -1309,12 +1323,17 @@
             return Err(StorageError::ReadBeforeSince(from_id));
         }
 
+        let wallclock_lag_metrics = self
+            .metrics
+            .wallclock_lag_metrics(id, Some(new_description.instance_id));
+
         let new_export = ExportState {
             description: new_description.clone(),
             read_hold,
             read_policy: cur_export.read_policy.clone(),
             write_frontier: cur_export.write_frontier.clone(),
             wallclock_lag_max: Default::default(),
+            wallclock_lag_metrics,
         };
         *cur_export = new_export;
 
@@ -2391,7 +2410,8 @@
         txns_metrics: Arc<TxnMetrics>,
         envd_epoch: NonZeroI64,
         read_only: bool,
-        metrics_registry: MetricsRegistry,
+        metrics_registry: &MetricsRegistry,
+        controller_metrics: ControllerMetrics,
         connection_context: ConnectionContext,
         txn: &dyn StorageTxn<T>,
         storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
@@ -2459,6 +2479,8 @@
         let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();
 
+        let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);
+
         Self {
             build_info,
             collections: BTreeMap::default(),
@@ -2489,7 +2511,7 @@
             internal_response_queue: rx,
             persist_location,
             persist: persist_clients,
-            metrics: StorageControllerMetrics::new(metrics_registry),
+            metrics,
             recorded_frontiers: BTreeMap::new(),
             recorded_replica_frontiers: BTreeMap::new(),
             wallclock_lag,
@@ -3418,7 +3440,8 @@
             .differential_append(id, replica_updates);
     }
 
-    /// Update introspection with the current wallclock lag values.
+    /// Refresh the `WallclockLagHistory` introspection and the `wallclock_lag_*_seconds` metrics
+    /// with the current lag values.
     ///
     /// We measure the lag of write frontiers behind the wallclock time every second and track the
     /// maximum over 60 measurements (i.e., one minute). Every minute, we emit a new lag event to
     /// the `WallclockLagHistory` introspection.
     ///
     /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
     /// per second during normal operation.
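+    ///
+    /// Note that the metrics are updated on every call, while the introspection is only
+    /// refreshed once per `WALLCLOCK_LAG_REFRESH_INTERVAL`, and never in read-only mode.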
-    fn update_wallclock_lag_introspection(&mut self) {
+    fn refresh_wallclock_lag(&mut self) {
         let refresh_introspection = !self.read_only
             && self.wallclock_lag_last_refresh.elapsed()
                 >= WALLCLOCK_LAG_REFRESH_INTERVAL.get(self.config.config_set());
@@ -3462,6 +3485,8 @@
                 let row = pack_row(id, lag);
                 updates.push((row, 1));
             }
+
+            collection.wallclock_lag_metrics.observe(lag);
         }
 
         let active_exports = self.exports.iter_mut().filter(|(_id, e)| !e.is_dropped());
@@ -3474,6 +3499,8 @@
                 let row = pack_row(*id, lag);
                 updates.push((row, 1));
             }
+
+            export.wallclock_lag_metrics.observe(lag);
         }
 
         if let Some(updates) = introspection_updates {
@@ -3488,7 +3515,7 @@
     /// for tasks that need to run periodically, such as state cleanup or updating of metrics.
     fn maintain(&mut self) {
         self.update_frontier_introspection();
-        self.update_wallclock_lag_introspection();
+        self.refresh_wallclock_lag();
     }
 }
 
@@ -3673,6 +3700,8 @@ struct CollectionState<T: TimelyTimestamp> {
 
     /// Maximum frontier wallclock lag since the last introspection update.
     wallclock_lag_max: Duration,
+    /// Frontier wallclock lag metrics tracked for this collection.
+    wallclock_lag_metrics: WallclockLagMetrics,
 }
 
 /// Additional state that the controller maintains for select collection types.