diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto index cc3144e5be22..e2eef855ee69 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto @@ -378,25 +378,6 @@ message MonitoringInfoSpecs { value: "URN utilized to report user metric." }] }]; - - // Represents per worker metrics - PER_WORKER_LATENCY_METRIC= 23 [(monitoring_info_spec) = { - urn: "beam:metrics:per_worker_metric:v1", - type: "beam:metrics:per_worker_histogram_int64:v1", - required_labels: [ - "PTRANSFORM" - ], - annotations: [ - { - key: "description", - value: "Histogram counts for request latencies made to IO service APIs to batch read or write elements." - }, - { - key: "units", - value: "Milliseconds" - } - ] - }]; } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java index 551550012bf5..12a26ff8a390 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java @@ -401,7 +401,7 @@ public MetricUpdates getUpdates() { return metricToMonitoringMetadata( metricKey, MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE, - MonitoringInfoConstants.Urns.PER_WORKER_LATENCY_METRIC); + MonitoringInfoConstants.Urns.USER_PER_WORKER_HISTOGRAM); } /** @@ -656,7 +656,7 @@ public void update(Iterable monitoringInfos) { break; case PER_WORKER_HISTOGRAM_TYPE: - updateForPerWorkerHistogramInt64(monitoringInfo); // use type, and not urn info + updateForPerWorkerHistogramInt64(monitoringInfo); break; default: LOG.warn("Unsupported metric type {}", monitoringInfo.getType()); @@ -830,6 +830,7 @@ public static MetricsContainerImpl deltaContainer( deltaValueCell.incTopBucketCount( currValue.getTopBucketCount() - prevValue.getTopBucketCount()); } + // Do the same for perWorkerHistograms for (Map.Entry, HistogramCell> cell : curr.perWorkerHistograms.entries()) { HistogramData.BucketType bt = cell.getKey().getValue(); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java index c818e422f5e5..3c6441a52e5a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java @@ -42,7 +42,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.joda.time.Instant; -// TODO(naireenhussain): Refactor out DataflowHistogramValue to be runner agnostic, and rename to +// TODO(#33093): Refactor out DataflowHistogramValue to be runner agnostic, and rename to // remove Dataflow reference. /** A set of functions used to encode and decode common monitoring info types. */