diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java new file mode 100644 index 000000000000..f3f31ccfd36f --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker; + +import com.google.api.services.dataflow.model.Base2Exponent; +import com.google.api.services.dataflow.model.BucketOptions; +import com.google.api.services.dataflow.model.DataflowHistogramValue; +import com.google.api.services.dataflow.model.Linear; +import com.google.api.services.dataflow.model.MetricValue; +import com.google.api.services.dataflow.model.OutlierStats; +import com.google.api.services.dataflow.model.PerStepNamespaceMetrics; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.util.HistogramData; + +/** + * Converts metric updates to {@link PerStepNamespaceMetrics} protos. Currently we only support + * converting metrics from {@link BigQuerySinkMetrics} with this converter. + */ +public class MetricsToPerStepNamespaceMetricsConverter { + /** + * @param metricName The {@link MetricName} that represents this counter. + * @param value The counter value. + * @return If the conversion succeeds, {@code MetricValue} that represents this counter. Otherwise + * returns an empty optional + */ + private static Optional convertCounterToMetricValue( + MetricName metricName, Long value) { + if (value == 0 || !metricName.getNamespace().equals(BigQuerySinkMetrics.METRICS_NAMESPACE)) { + return Optional.empty(); + } + + BigQuerySinkMetrics.ParsedMetricName labeledName = + BigQuerySinkMetrics.parseMetricName(metricName.getName()); + if (labeledName == null || labeledName.getBaseName().isEmpty()) { + return Optional.empty(); + } + + return Optional.of( + new MetricValue() + .setMetric(labeledName.getBaseName()) + .setMetricLabels(labeledName.getMetricLabels()) + .setValueInt64(value)); + } + + /** + * @param metricName The {@link MetricName} that represents this Histogram. + * @param value The histogram value. Currently we only support converting histograms that use + * {@code linear} or {@code exponential} buckets. + * @return If this conversion succeeds, a {@code MetricValue} that represents this histogram. + * Otherwise returns an empty optional. + */ + private static Optional convertHistogramToMetricValue( + MetricName metricName, HistogramData value) { + if (value.getTotalCount() == 0L) { + return Optional.empty(); + } + + BigQuerySinkMetrics.ParsedMetricName labeledName = + BigQuerySinkMetrics.parseMetricName(metricName.getName()); + if (labeledName == null || labeledName.getBaseName().isEmpty()) { + return Optional.empty(); + } + + DataflowHistogramValue histogramValue = new DataflowHistogramValue(); + int numberOfBuckets = value.getBucketType().getNumBuckets(); + + if (value.getBucketType() instanceof HistogramData.LinearBuckets) { + HistogramData.LinearBuckets buckets = (HistogramData.LinearBuckets) value.getBucketType(); + Linear linearOptions = + new Linear() + .setNumberOfBuckets(numberOfBuckets) + .setWidth(buckets.getWidth()) + .setStart(buckets.getStart()); + histogramValue.setBucketOptions(new BucketOptions().setLinear(linearOptions)); + } else if (value.getBucketType() instanceof HistogramData.ExponentialBuckets) { + HistogramData.ExponentialBuckets buckets = + (HistogramData.ExponentialBuckets) value.getBucketType(); + Base2Exponent expoenntialOptions = + new Base2Exponent().setNumberOfBuckets(numberOfBuckets).setScale(buckets.getScale()); + histogramValue.setBucketOptions(new BucketOptions().setExponential(expoenntialOptions)); + } else { + return Optional.empty(); + } + + histogramValue.setCount(value.getTotalCount()); + List bucketCounts = new ArrayList<>(value.getBucketType().getNumBuckets()); + + for (int i = 0; i < value.getBucketType().getNumBuckets(); i++) { + bucketCounts.add(value.getCount(i)); + } + + // Remove trailing 0 buckets. + for (int i = bucketCounts.size() - 1; i >= 0; i--) { + if (bucketCounts.get(i) != 0) { + break; + } + bucketCounts.remove(i); + } + + histogramValue.setBucketCounts(bucketCounts); + + OutlierStats outlierStats = + new OutlierStats() + .setOverflowCount(value.getTopBucketCount()) + .setOverflowMean(value.getTopBucketMean()) + .setUnderflowCount(value.getBottomBucketCount()) + .setUnderflowMean(value.getBottomBucketMean()); + + histogramValue.setOutlierStats(outlierStats); + + return Optional.of( + new MetricValue() + .setMetric(labeledName.getBaseName()) + .setMetricLabels(labeledName.getMetricLabels()) + .setValueHistogram(histogramValue)); + } + + /** + * @param stepName The unfused stage that these metrics are associated with. + * @param counters Counter updates to convert. + * @param histograms Histogram updates to convert. + * @return Collection of {@code PerStepNamespaceMetrics} that represent these metric updates. Each + * {@code PerStepNamespaceMetrics} contains a list of {@code MetricUpdates} for a {unfused + * stage, metrics namespace} pair. + */ + public static Collection convert( + String stepName, Map counters, Map histograms) { + + Map metricsByNamespace = new HashMap<>(); + + for (Entry entry : counters.entrySet()) { + MetricName metricName = entry.getKey(); + Optional metricValue = convertCounterToMetricValue(metricName, entry.getValue()); + if (!metricValue.isPresent()) { + continue; + } + + PerStepNamespaceMetrics stepNamespaceMetrics = + metricsByNamespace.get(metricName.getNamespace()); + if (stepNamespaceMetrics == null) { + stepNamespaceMetrics = + new PerStepNamespaceMetrics() + .setMetricValues(new ArrayList<>()) + .setOriginalStep(stepName) + .setMetricsNamespace(metricName.getNamespace()); + metricsByNamespace.put(metricName.getNamespace(), stepNamespaceMetrics); + } + + stepNamespaceMetrics.getMetricValues().add(metricValue.get()); + } + + for (Entry entry : histograms.entrySet()) { + MetricName metricName = entry.getKey(); + Optional metricValue = + convertHistogramToMetricValue(metricName, entry.getValue()); + if (!metricValue.isPresent()) { + continue; + } + + PerStepNamespaceMetrics stepNamespaceMetrics = + metricsByNamespace.get(metricName.getNamespace()); + if (stepNamespaceMetrics == null) { + stepNamespaceMetrics = + new PerStepNamespaceMetrics() + .setMetricValues(new ArrayList<>()) + .setOriginalStep(stepName) + .setMetricsNamespace(metricName.getNamespace()); + metricsByNamespace.put(metricName.getNamespace(), stepNamespaceMetrics); + } + + stepNamespaceMetrics.getMetricValues().add(metricValue.get()); + } + + return metricsByNamespace.values(); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java new file mode 100644 index 000000000000..a37bf643b7ee --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; + +import com.google.api.services.dataflow.model.Base2Exponent; +import com.google.api.services.dataflow.model.BucketOptions; +import com.google.api.services.dataflow.model.DataflowHistogramValue; +import com.google.api.services.dataflow.model.Linear; +import com.google.api.services.dataflow.model.MetricValue; +import com.google.api.services.dataflow.model.OutlierStats; +import com.google.api.services.dataflow.model.PerStepNamespaceMetrics; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.util.HistogramData; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class MetricsToPerStepNamespaceMetricsConverterTest { + + public static class TestBucketType implements HistogramData.BucketType { + @Override + public double getRangeFrom() { + return 0.0; + } + + @Override + public double getRangeTo() { + return 5.0; + } + + @Override + public int getNumBuckets() { + return 1; + } + + @Override + public int getBucketIndex(double value) { + return 0; + } + + @Override + public double getBucketSize(int index) { + return 5.0; + } + + @Override + public double getAccumulatedBucketSize(int endIndex) { + return 5.0; + } + } + + @Test + public void testConvert_successfulyConvertCounters() { + String step = "testStepName"; + Map emptyHistograms = new HashMap<>(); + Map counters = new HashMap(); + MetricName bigQueryMetric1 = MetricName.named("BigQuerySink", "metric1"); + MetricName bigQueryMetric2 = + MetricName.named("BigQuerySink", "metric2-label1:val1;label2:val2;"); + MetricName bigQueryMetric3 = MetricName.named("BigQuerySink", "zeroValue"); + + counters.put(bigQueryMetric1, 5L); + counters.put(bigQueryMetric2, 10L); + counters.put(bigQueryMetric3, 0L); + + Collection conversionResult = + MetricsToPerStepNamespaceMetricsConverter.convert(step, counters, emptyHistograms); + + MetricValue expectedVal1 = + new MetricValue().setMetric("metric1").setValueInt64(5L).setMetricLabels(new HashMap<>()); + Map val2LabelMap = new HashMap<>(); + val2LabelMap.put("label1", "val1"); + val2LabelMap.put("label2", "val2"); + MetricValue expectedVal2 = + new MetricValue().setMetric("metric2").setValueInt64(10L).setMetricLabels(val2LabelMap); + + assertThat(conversionResult.size(), equalTo(1)); + PerStepNamespaceMetrics perStepNamespaceMetrics = conversionResult.iterator().next(); + + assertThat(perStepNamespaceMetrics.getOriginalStep(), equalTo(step)); + assertThat(perStepNamespaceMetrics.getMetricsNamespace(), equalTo("BigQuerySink")); + assertThat(perStepNamespaceMetrics.getMetricValues().size(), equalTo(2)); + assertThat( + perStepNamespaceMetrics.getMetricValues(), containsInAnyOrder(expectedVal1, expectedVal2)); + } + + @Test + public void testConvert_skipInvalidMetricNames() { + Map counters = new HashMap<>(); + MetricName bigQueryMetric1 = MetricName.named("BigQuerySink", "invalid-metric-name1"); + counters.put(bigQueryMetric1, 5L); + + Map histograms = new HashMap<>(); + MetricName bigQueryMetric2 = MetricName.named("BigQuerySink", "invalid-metric-name2"); + HistogramData nonEmptyLinearHistogram = HistogramData.linear(0, 10, 10); + nonEmptyLinearHistogram.record(-5.0); + histograms.put(bigQueryMetric2, nonEmptyLinearHistogram); + + Collection conversionResult = + MetricsToPerStepNamespaceMetricsConverter.convert("testStep", counters, histograms); + assertThat(conversionResult.size(), equalTo(0)); + } + + @Test + public void testConvert_successfulConvertHistograms() { + Map histograms = new HashMap(); + MetricName bigQueryMetric1 = MetricName.named("BigQuerySink", "baseLabel"); + MetricName bigQueryMetric2 = + MetricName.named("BigQuerySink", "baseLabel-label1:val1;label2:val2;"); + MetricName bigQueryMetric3 = MetricName.named("BigQuerySink", "zeroValue"); + + HistogramData nonEmptyLinearHistogram = HistogramData.linear(0, 10, 10); + nonEmptyLinearHistogram.record(-5.0, 15.0, 25.0, 35.0, 105.0); + histograms.put(bigQueryMetric1, nonEmptyLinearHistogram); + + HistogramData noEmptyExponentialHistogram = HistogramData.exponential(0, 5); + noEmptyExponentialHistogram.record(-5.0, 15.0, 25.0, 35.0, 105.0); + histograms.put(bigQueryMetric2, noEmptyExponentialHistogram); + + HistogramData emptyHistogram = HistogramData.linear(0, 10, 10); + histograms.put(bigQueryMetric3, emptyHistogram); + + String step = "testStep"; + Map emptyCounters = new HashMap<>(); + Collection conversionResult = + MetricsToPerStepNamespaceMetricsConverter.convert(step, emptyCounters, histograms); + + // Expected value 1 + List bucketCounts1 = ImmutableList.of(0L, 1L, 1L, 1L); + + Linear linearOptions1 = new Linear().setNumberOfBuckets(10).setWidth(10.0).setStart(0.0); + BucketOptions bucketOptions1 = new BucketOptions().setLinear(linearOptions1); + + OutlierStats outlierStats1 = + new OutlierStats() + .setUnderflowCount(1L) + .setUnderflowMean(-5.0) + .setOverflowCount(1L) + .setOverflowMean(105.0); + DataflowHistogramValue linearHistogram1 = + new DataflowHistogramValue() + .setCount(5L) + .setBucketOptions(bucketOptions1) + .setBucketCounts(bucketCounts1) + .setOutlierStats(outlierStats1); + + MetricValue expectedVal1 = + new MetricValue() + .setMetric("baseLabel") + .setMetricLabels(new HashMap<>()) + .setValueHistogram(linearHistogram1); + + // Expected value 2 + List bucketCounts2 = ImmutableList.of(0L, 0L, 0L, 1L, 1L); + OutlierStats outlierStats2 = + new OutlierStats() + .setUnderflowCount(1L) + .setUnderflowMean(-5.0) + .setOverflowCount(2L) + .setOverflowMean(70.0); + Base2Exponent exponentialOptions2 = new Base2Exponent().setNumberOfBuckets(5).setScale(0); + + BucketOptions bucketOptions2 = new BucketOptions().setExponential(exponentialOptions2); + + DataflowHistogramValue exponentialHistogram2 = + new DataflowHistogramValue() + .setCount(5L) + .setBucketOptions(bucketOptions2) + .setBucketCounts(bucketCounts2) + .setOutlierStats(outlierStats2); + + Map metric2Labels = new HashMap<>(); + metric2Labels.put("label1", "val1"); + metric2Labels.put("label2", "val2"); + MetricValue expectedVal2 = + new MetricValue() + .setMetric("baseLabel") + .setValueHistogram(exponentialHistogram2) + .setMetricLabels(metric2Labels); + + assertThat(conversionResult.size(), equalTo(1)); + PerStepNamespaceMetrics perStepNamespaceMetrics = conversionResult.iterator().next(); + + assertThat(perStepNamespaceMetrics.getOriginalStep(), equalTo(step)); + assertThat(perStepNamespaceMetrics.getMetricsNamespace(), equalTo("BigQuerySink")); + assertThat(perStepNamespaceMetrics.getMetricValues().size(), equalTo(2)); + assertThat( + perStepNamespaceMetrics.getMetricValues(), containsInAnyOrder(expectedVal1, expectedVal2)); + } + + @Test + public void testConvert_skipUnknownHistogramBucketType() { + String step = "testStep"; + Map emptyCounters = new HashMap<>(); + Map histograms = new HashMap(); + + HistogramData histogram = new HistogramData(new TestBucketType()); + histogram.record(1.0, 2.0); + MetricName bigQueryMetric1 = MetricName.named("BigQuerySink", "baseLabel"); + histograms.put(bigQueryMetric1, histogram); + + Collection conversionResult = + MetricsToPerStepNamespaceMetricsConverter.convert(step, emptyCounters, histograms); + assertThat(conversionResult.size(), equalTo(0)); + } + + @Test + public void testConvert_convertCountersAndHistograms() { + String step = "testStep"; + Map counters = new HashMap<>(); + Map histograms = new HashMap(); + + MetricName counterMetricName = MetricName.named("BigQuerySink", "counter-label1:val1;"); + counters.put(counterMetricName, 3L); + + MetricName histogramMetricName = MetricName.named("BigQuerySink", "histogram-label2:val2;"); + HistogramData linearHistogram = HistogramData.linear(0, 10, 10); + linearHistogram.record(5.0); + histograms.put(histogramMetricName, linearHistogram); + + Collection conversionResult = + MetricsToPerStepNamespaceMetricsConverter.convert(step, counters, histograms); + + // Expected counter MetricValue + Map counterLabelMap = new HashMap<>(); + counterLabelMap.put("label1", "val1"); + MetricValue expectedCounter = + new MetricValue().setMetric("counter").setValueInt64(3L).setMetricLabels(counterLabelMap); + + // Expected histogram MetricValue + List bucketCounts1 = ImmutableList.of(1L); + + Linear linearOptions1 = new Linear().setNumberOfBuckets(10).setWidth(10.0).setStart(0.0); + BucketOptions bucketOptions1 = new BucketOptions().setLinear(linearOptions1); + + OutlierStats outlierStats1 = + new OutlierStats() + .setUnderflowCount(0L) + .setUnderflowMean(0.0) + .setOverflowCount(0L) + .setOverflowMean(0.0); + DataflowHistogramValue linearHistogram1 = + new DataflowHistogramValue() + .setCount(1L) + .setBucketOptions(bucketOptions1) + .setBucketCounts(bucketCounts1) + .setOutlierStats(outlierStats1); + + Map histogramLabelMap = new HashMap<>(); + histogramLabelMap.put("label2", "val2"); + + MetricValue expectedHistogram = + new MetricValue() + .setMetric("histogram") + .setMetricLabels(histogramLabelMap) + .setValueHistogram(linearHistogram1); + + assertThat(conversionResult.size(), equalTo(1)); + PerStepNamespaceMetrics perStepNamespaceMetrics = conversionResult.iterator().next(); + + assertThat(perStepNamespaceMetrics.getOriginalStep(), equalTo(step)); + assertThat(perStepNamespaceMetrics.getMetricsNamespace(), equalTo("BigQuerySink")); + assertThat(perStepNamespaceMetrics.getMetricValues().size(), equalTo(2)); + assertThat( + perStepNamespaceMetrics.getMetricValues(), + containsInAnyOrder(expectedCounter, expectedHistogram)); + } +}