Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MetricsContainer::getPerWorker{Counter|Histogram} (#28903) #28923

Merged
merged 3 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.HistogramData;

/**
* An implementation of {@link MetricsContainer} that reads the current execution state (tracked in
Expand Down Expand Up @@ -56,6 +58,11 @@ public Counter getCounter(MetricName metricName) {
return getCurrentContainer().getCounter(metricName);
}

@Override
public Counter getPerWorkerCounter(MetricName metricName) {
return getCurrentContainer().getPerWorkerCounter(metricName);
}

@Override
public Distribution getDistribution(MetricName metricName) {
return getCurrentContainer().getDistribution(metricName);
Expand All @@ -65,4 +72,10 @@ public Distribution getDistribution(MetricName metricName) {
public Gauge getGauge(MetricName metricName) {
return getCurrentContainer().getGauge(metricName);
}

@Override
public Histogram getPerWorkerHistogram(
MetricName metricName, HistogramData.BucketType bucketType) {
return getCurrentContainer().getPerWorkerHistogram(metricName, bucketType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,11 @@ public static void main(String[] args) throws Exception {
// metrics.
MetricsEnvironment.setProcessWideContainer(new MetricsLogger(null));

// When enabled, the Pipeline will record Per-Worker metrics that will be piped to WMW.
StreamingStepMetricsContainer.setEnablePerWorkerMetrics(
options.isEnableStreamingEngine()
&& DataflowRunner.hasExperiment(options, "enable_per_worker_metrics"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be an experiment, or just on by default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will leave this as an experiment until I've done some load tests. Then I will enable it as default for Streaming Engine Jobs.


JvmInitializers.runBeforeProcessing(options);
worker.startStatusPages();
worker.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.GaugeCell;
import org.apache.beam.runners.core.metrics.HistogramCell;
import org.apache.beam.runners.core.metrics.MetricsMap;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
Expand All @@ -47,14 +51,22 @@ public class StreamingStepMetricsContainer implements MetricsContainer {

private final String stepName;

private static Boolean enablePerWorkerMetrics;

private MetricsMap<MetricName, DeltaCounterCell> counters =
new MetricsMap<>(DeltaCounterCell::new);

private MetricsMap<MetricName, DeltaCounterCell> perWorkerCounters =
new MetricsMap<>(DeltaCounterCell::new);

private MetricsMap<MetricName, GaugeCell> gauges = new MetricsMap<>(GaugeCell::new);

private MetricsMap<MetricName, DeltaDistributionCell> distributions =
new MetricsMap<>(DeltaDistributionCell::new);

private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> perWorkerHistograms =
new MetricsMap<>(HistogramCell::new);

private StreamingStepMetricsContainer(String stepName) {
this.stepName = stepName;
}
Expand All @@ -73,6 +85,15 @@ public Counter getCounter(MetricName metricName) {
return counters.get(metricName);
}

@Override
public Counter getPerWorkerCounter(MetricName metricName) {
if (enablePerWorkerMetrics) {
return perWorkerCounters.get(metricName);
} else {
return MetricsContainer.super.getPerWorkerCounter(metricName);
}
}

@Override
public Distribution getDistribution(MetricName metricName) {
return distributions.get(metricName);
Expand All @@ -83,6 +104,16 @@ public Gauge getGauge(MetricName metricName) {
return gauges.get(metricName);
}

@Override
public Histogram getPerWorkerHistogram(
MetricName metricName, HistogramData.BucketType bucketType) {
if (enablePerWorkerMetrics) {
return perWorkerHistograms.get(KV.of(metricName, bucketType));
} else {
return MetricsContainer.super.getPerWorkerHistogram(metricName, bucketType);
}
}

public Iterable<CounterUpdate> extractUpdates() {
return counterUpdates().append(distributionUpdates());
}
Expand Down Expand Up @@ -142,4 +173,8 @@ public static Iterable<CounterUpdate> extractMetricUpdates(
.getContainers()
.transformAndConcat(StreamingStepMetricsContainer::extractUpdates);
}

public static void setEnablePerWorkerMetrics(Boolean enablePerWorkerMetrics) {
StreamingStepMetricsContainer.enablePerWorkerMetrics = enablePerWorkerMetrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.longToSplitInt;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;

Expand All @@ -33,6 +34,9 @@
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.NoOpCounter;
import org.apache.beam.sdk.metrics.NoOpHistogram;
import org.apache.beam.sdk.util.HistogramData;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -178,4 +182,22 @@ public void testDistributionUpdateExtraction() {
.setMin(longToSplitInt(3))
.setSum(longToSplitInt(3)))));
}

@Test
public void testPerWorkerMetrics() {
StreamingStepMetricsContainer.setEnablePerWorkerMetrics(false);
MetricsContainer metricsContainer = registry.getContainer("test_step");
assertThat(
metricsContainer.getPerWorkerCounter(name1), sameInstance(NoOpCounter.getInstance()));
HistogramData.BucketType testBucket = HistogramData.LinearBuckets.of(1, 1, 1);
assertThat(
metricsContainer.getPerWorkerHistogram(name1, testBucket),
sameInstance(NoOpHistogram.getInstance()));

StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);
assertThat(metricsContainer.getPerWorkerCounter(name1), not(instanceOf(NoOpCounter.class)));
assertThat(
metricsContainer.getPerWorkerHistogram(name1, testBucket),
not(instanceOf(NoOpHistogram.class)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ public interface MetricsContainer extends Serializable {
*/
Counter getCounter(MetricName metricName);

/**
* Return the {@link Counter} that should be used for implementing the given per-worker {@code metricName)
* in this container.
*/
default Counter getPerWorkerCounter(MetricName metricName) {
return NoOpCounter.getInstance();
}

/**
* Return the {@link Distribution} that should be used for implementing the given {@code
* metricName} in this container.
Expand All @@ -52,6 +60,14 @@ public interface MetricsContainer extends Serializable {
default Histogram getHistogram(MetricName metricName, HistogramData.BucketType bucketType) {
throw new RuntimeException("Histogram metric is not supported yet.");
}
/**
* Return the {@link Histogram} that should be used for implementing the given per-worker {@code
* metricName} in this container.
*/
default Histogram getPerWorkerHistogram(
MetricName metricName, HistogramData.BucketType bucketType) {
return NoOpHistogram.getInstance();
}

/** Return the cumulative values for any metrics in this container as MonitoringInfos. */
default Iterable<MetricsApi.MonitoringInfo> getMonitoringInfos() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.metrics;

/**
* A no-op implementation of Counter. This class exists to provide a default if an implementation of
* MetricsContainer does not override a Counter getter.
*/
public class NoOpCounter implements Counter {
JayajP marked this conversation as resolved.
Show resolved Hide resolved

private static final NoOpCounter singleton = new NoOpCounter();
private static final MetricName name = MetricName.named(NoOpCounter.class, "singleton");

private NoOpCounter() {}

@Override
public void inc() {}

@Override
public void inc(long n) {}

@Override
public void dec() {}

@Override
public void dec(long n) {}

@Override
public MetricName getName() {
return name;
}

public static NoOpCounter getInstance() {
return singleton;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.metrics;

/**
* A no-op implementation of Histogram. This class exists to provide a default if an implementation
* of MetricsContainer does not override a Histogram getter.
*/
public class NoOpHistogram implements Histogram {

private static final NoOpHistogram singleton = new NoOpHistogram();
private static final MetricName name = MetricName.named(NoOpHistogram.class, "singleton");

private NoOpHistogram() {}

@Override
public void update(double value) {}

@Override
public MetricName getName() {
return name;
}

public static NoOpHistogram getInstance() {
return singleton;
}
}
Loading