diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java index 81517129c8e9..c3e4fb1388b0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java @@ -22,9 +22,11 @@ import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Gauge; +import org.apache.beam.sdk.metrics.Histogram; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.util.HistogramData; /** * An implementation of {@link MetricsContainer} that reads the current execution state (tracked in @@ -56,6 +58,11 @@ public Counter getCounter(MetricName metricName) { return getCurrentContainer().getCounter(metricName); } + @Override + public Counter getPerWorkerCounter(MetricName metricName) { + return getCurrentContainer().getPerWorkerCounter(metricName); + } + @Override public Distribution getDistribution(MetricName metricName) { return getCurrentContainer().getDistribution(metricName); @@ -65,4 +72,10 @@ public Distribution getDistribution(MetricName metricName) { public Gauge getGauge(MetricName metricName) { return getCurrentContainer().getGauge(metricName); } + + @Override + public Histogram getPerWorkerHistogram( + MetricName metricName, HistogramData.BucketType bucketType) { + return getCurrentContainer().getPerWorkerHistogram(metricName, bucketType); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 32b272c67280..4c1693d61387 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -465,6 +465,11 @@ public static void main(String[] args) throws Exception { // metrics. MetricsEnvironment.setProcessWideContainer(new MetricsLogger(null)); + // When enabled, the Pipeline will record Per-Worker metrics that will be piped to WMW. + StreamingStepMetricsContainer.setEnablePerWorkerMetrics( + options.isEnableStreamingEngine() + && DataflowRunner.hasExperiment(options, "enable_per_worker_metrics")); + JvmInitializers.runBeforeProcessing(options); worker.startStatusPages(); worker.start(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index 8c5b9c2f2b66..875a2d649ece 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -24,13 +24,17 @@ import javax.annotation.Nonnull; import org.apache.beam.runners.core.metrics.DistributionData; import org.apache.beam.runners.core.metrics.GaugeCell; +import org.apache.beam.runners.core.metrics.HistogramCell; import org.apache.beam.runners.core.metrics.MetricsMap; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Gauge; +import org.apache.beam.sdk.metrics.Histogram; import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.beam.sdk.util.HistogramData; +import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable; @@ -47,14 +51,22 @@ public class StreamingStepMetricsContainer implements MetricsContainer { private final String stepName; + private static Boolean enablePerWorkerMetrics; + private MetricsMap counters = new MetricsMap<>(DeltaCounterCell::new); + private MetricsMap perWorkerCounters = + new MetricsMap<>(DeltaCounterCell::new); + private MetricsMap gauges = new MetricsMap<>(GaugeCell::new); private MetricsMap distributions = new MetricsMap<>(DeltaDistributionCell::new); + private MetricsMap, HistogramCell> perWorkerHistograms = + new MetricsMap<>(HistogramCell::new); + private StreamingStepMetricsContainer(String stepName) { this.stepName = stepName; } @@ -73,6 +85,15 @@ public Counter getCounter(MetricName metricName) { return counters.get(metricName); } + @Override + public Counter getPerWorkerCounter(MetricName metricName) { + if (enablePerWorkerMetrics) { + return perWorkerCounters.get(metricName); + } else { + return MetricsContainer.super.getPerWorkerCounter(metricName); + } + } + @Override public Distribution getDistribution(MetricName metricName) { return distributions.get(metricName); @@ -83,6 +104,16 @@ public Gauge getGauge(MetricName metricName) { return gauges.get(metricName); } + @Override + public Histogram getPerWorkerHistogram( + MetricName metricName, HistogramData.BucketType bucketType) { + if (enablePerWorkerMetrics) { + return perWorkerHistograms.get(KV.of(metricName, bucketType)); + } else { + return MetricsContainer.super.getPerWorkerHistogram(metricName, bucketType); + } + } + public Iterable extractUpdates() { return counterUpdates().append(distributionUpdates()); } @@ -142,4 +173,8 @@ public static Iterable extractMetricUpdates( .getContainers() .transformAndConcat(StreamingStepMetricsContainer::extractUpdates); } + + public static void setEnablePerWorkerMetrics(Boolean enablePerWorkerMetrics) { + StreamingStepMetricsContainer.enablePerWorkerMetrics = enablePerWorkerMetrics; + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index 1a4c43905d20..9e6d45a2351b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -20,6 +20,7 @@ import static org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.longToSplitInt; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.sameInstance; @@ -33,6 +34,9 @@ import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.beam.sdk.metrics.NoOpCounter; +import org.apache.beam.sdk.metrics.NoOpHistogram; +import org.apache.beam.sdk.util.HistogramData; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -178,4 +182,22 @@ public void testDistributionUpdateExtraction() { .setMin(longToSplitInt(3)) .setSum(longToSplitInt(3))))); } + + @Test + public void testPerWorkerMetrics() { + StreamingStepMetricsContainer.setEnablePerWorkerMetrics(false); + MetricsContainer metricsContainer = registry.getContainer("test_step"); + assertThat( + metricsContainer.getPerWorkerCounter(name1), sameInstance(NoOpCounter.getInstance())); + HistogramData.BucketType testBucket = HistogramData.LinearBuckets.of(1, 1, 1); + assertThat( + metricsContainer.getPerWorkerHistogram(name1, testBucket), + sameInstance(NoOpHistogram.getInstance())); + + StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true); + assertThat(metricsContainer.getPerWorkerCounter(name1), not(instanceOf(NoOpCounter.class))); + assertThat( + metricsContainer.getPerWorkerHistogram(name1, testBucket), + not(instanceOf(NoOpHistogram.class))); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java index e93f8677b814..f48b9195c37c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java @@ -33,6 +33,14 @@ public interface MetricsContainer extends Serializable { */ Counter getCounter(MetricName metricName); + /** + * Return the {@link Counter} that should be used for implementing the given per-worker {@code metricName) + * in this container. + */ + default Counter getPerWorkerCounter(MetricName metricName) { + return NoOpCounter.getInstance(); + } + /** * Return the {@link Distribution} that should be used for implementing the given {@code * metricName} in this container. @@ -52,6 +60,14 @@ public interface MetricsContainer extends Serializable { default Histogram getHistogram(MetricName metricName, HistogramData.BucketType bucketType) { throw new RuntimeException("Histogram metric is not supported yet."); } + /** + * Return the {@link Histogram} that should be used for implementing the given per-worker {@code + * metricName} in this container. + */ + default Histogram getPerWorkerHistogram( + MetricName metricName, HistogramData.BucketType bucketType) { + return NoOpHistogram.getInstance(); + } /** Return the cumulative values for any metrics in this container as MonitoringInfos. */ default Iterable getMonitoringInfos() { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpCounter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpCounter.java new file mode 100644 index 000000000000..ab4fa685f9c2 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpCounter.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +/** + * A no-op implementation of Counter. This class exists to provide a default if an implementation of + * MetricsContainer does not override a Counter getter. + */ +public class NoOpCounter implements Counter { + + private static final NoOpCounter singleton = new NoOpCounter(); + private static final MetricName name = MetricName.named(NoOpCounter.class, "singleton"); + + private NoOpCounter() {} + + @Override + public void inc() {} + + @Override + public void inc(long n) {} + + @Override + public void dec() {} + + @Override + public void dec(long n) {} + + @Override + public MetricName getName() { + return name; + } + + public static NoOpCounter getInstance() { + return singleton; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpHistogram.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpHistogram.java new file mode 100644 index 000000000000..a088223ffe2b --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpHistogram.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +/** + * A no-op implementation of Histogram. This class exists to provide a default if an implementation + * of MetricsContainer does not override a Histogram getter. + */ +public class NoOpHistogram implements Histogram { + + private static final NoOpHistogram singleton = new NoOpHistogram(); + private static final MetricName name = MetricName.named(NoOpHistogram.class, "singleton"); + + private NoOpHistogram() {} + + @Override + public void update(double value) {} + + @Override + public MetricName getName() { + return name; + } + + public static NoOpHistogram getInstance() { + return singleton; + } +}