diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 5af91ec2f056..41a225480879 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -739,7 +739,7 @@ class BeamModulePlugin implements Plugin {
 google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version
 google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20240815-2.0.0", // [bomupgrader] sets version
 google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20240310-2.0.0", // [bomupgrader] sets version
- google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20240817-$google_clients_version",
+ google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20241209-$google_clients_version",
 google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20240130-$google_clients_version",
 google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version",
 google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20240706-2.0.0", // [bomupgrader] sets version
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java
index f9cd098edaa6..5bc6ecc64ea6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java
@@ -74,6 +74,11 @@ public Gauge getGauge(MetricName metricName) {
 return getCurrentContainer().getGauge(metricName);
 }
+ @Override
+ public Gauge getPerWorkerGauge(MetricName metricName) {
+ return getCurrentContainer().getPerWorkerGauge(metricName);
+ }
+
 @Override
 public StringSet getStringSet(MetricName metricName) {
 return getCurrentContainer().getStringSet(metricName);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java
index 6291b4a75b14..de1ac5804cdc 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java
@@ -19,6 +19,7 @@
 import com.google.api.services.dataflow.model.Base2Exponent;
 import com.google.api.services.dataflow.model.BucketOptions;
+import com.google.api.services.dataflow.model.DataflowGaugeValue;
 import com.google.api.services.dataflow.model.DataflowHistogramValue;
 import com.google.api.services.dataflow.model.Linear;
 import com.google.api.services.dataflow.model.MetricValue;
@@ -84,7 +85,7 @@ private static Optional convertCounterToMetricValue(
 .setValueInt64(value));
 }
- /**
+ /**
 * @param metricName The {@link MetricName} that represents this gauge.
 * @param value The gauge value.
 * @return If the conversion succeeds, {@code MetricValue} that represents this gauge. Otherwise
@@ -96,18 +97,24 @@ private static Optional convertGaugeToMetricValue(
 Map parsedPerWorkerMetricsCache) {
 if ((!metricName.getNamespace().equals(BigQuerySinkMetrics.METRICS_NAMESPACE)
- && !metricName.getNamespace().equals(KafkaSinkMetrics.METRICS_NAMESPACE))) {
+ && !metricName.getNamespace().equals(KafkaSinkMetrics.METRICS_NAMESPACE))) {
 return Optional.empty();
 }
- return getParsedMetricName(metricName, parsedPerWorkerMetricsCache)
- .filter(labeledName -> !labeledName.getBaseName().isEmpty())
- .map(
- labeledName ->
- new MetricValue()
- .setMetric(labeledName.getBaseName())
- .setMetricLabels(labeledName.getMetricLabels())
- .setGaugeInt64(value)); // change this
+ Optional labeledName =
+ getParsedMetricName(metricName, parsedPerWorkerMetricsCache);
+ if (!labeledName.isPresent() || labeledName.get().getBaseName().isEmpty()) {
+ return Optional.empty();
+ }
+
+ DataflowGaugeValue gaugeValue = new DataflowGaugeValue();
+ gaugeValue.setValue(value);
+
+ return Optional.of(
+ new MetricValue()
+ .setMetric(labeledName.get().getBaseName())
+ .setMetricLabels(labeledName.get().getMetricLabels())
+ .setValueGauge64(gaugeValue));
 }
 /**
@@ -288,7 +295,6 @@ public static Collection convert(
 .setMetricsNamespace(metricName.getNamespace());
 metricsByNamespace.put(metricName.getNamespace(), stepNamespaceMetrics);
 }
-
 stepNamespaceMetrics.getMetricValues().add(metricValue.get());
 }
 return metricsByNamespace.values();
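The gauge branch above mirrors the existing counter conversion: the labeled metric name is split into a base name plus labels, and the value is wrapped in the DataflowGaugeValue message (available after the v1b3-rev20241209 client bump) rather than set with setValueInt64. A minimal sketch of the MetricValue this produces for a labeled gauge; the standalone class and main() wrapper are illustrative only, but every setter below appears in this PR:

    // Sketch only: the MetricValue produced for a gauge named
    // "metric2*label1:val1;label2:val2;" with value 10.
    import com.google.api.services.dataflow.model.DataflowGaugeValue;
    import com.google.api.services.dataflow.model.MetricValue;
    import java.util.HashMap;
    import java.util.Map;

    public class GaugeConversionSketch {
      public static void main(String[] args) {
        // LabeledMetricNameUtils splits the name into base name "metric2"
        // plus labels {label1=val1, label2=val2}.
        Map<String, String> labels = new HashMap<>();
        labels.put("label1", "val1");
        labels.put("label2", "val2");

        DataflowGaugeValue gaugeValue = new DataflowGaugeValue();
        gaugeValue.setValue(10L); // latest observed value, not a delta

        MetricValue converted =
            new MetricValue()
                .setMetric("metric2")
                .setMetricLabels(labels)
                .setValueGauge64(gaugeValue);
        System.out.println(converted);
      }
    }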
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 7daded233294..6ce60283735f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -665,7 +665,6 @@ public static void main(String[] args) throws Exception {
 }
 if (DataflowRunner.hasExperiment(options, "enable_kafka_metrics")) {
- LOG.info("xxx enable experiement");
 KafkaSinkMetrics.setSupportKafkaMetrics(true);
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
index 8d03f5500af3..075ac54f441f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
@@ -49,8 +49,7 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
 import org.checkerframework.checker.nullness.qual.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 /**
 * For Dataflow Streaming, we want to efficiently support many threads reporting metric updates, and a
 * single total delta being reported periodically as physical counters.
@@ -60,8 +59,6 @@ })
 public class StreamingStepMetricsContainer implements MetricsContainer {
- private static final Logger LOG = LoggerFactory.getLogger(StreamingStepMetricsContainer.class);
-
 private final String stepName;
 private static boolean enablePerWorkerMetrics = false;
@@ -74,7 +71,8 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
 private MetricsMap gauges = new MetricsMap<>(GaugeCell::new);
 // Per-worker gauges, keyed by metric name; the ConcurrentHashMap keeps updates thread-safe.
- private final ConcurrentHashMap perWorkerGauges = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap perWorkerGauges =
+ new ConcurrentHashMap<>();
 private MetricsMap stringSet = new MetricsMap<>(StringSetCell::new);
@@ -89,7 +87,7 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
 private final ConcurrentHashMap parsedPerWorkerMetricsCache;
- // PerWorkerCounters and PerWorkerGauges that have been longer than this value will be removed from the underlying
+ // PerWorkerCounters that have been stale for longer than this value will be removed from the underlying
 // metrics map.
 private final Duration maximumPerWorkerCounterStaleness = Duration.ofMinutes(5);
@@ -173,16 +171,14 @@ public Gauge getGauge(MetricName metricName) {
 @Override
 public Gauge getPerWorkerGauge(MetricName metricName) {
 if (!enablePerWorkerMetrics) {
- return MetricsContainer.super.getPerWorkerGauge(metricName);
+ return MetricsContainer.super.getPerWorkerGauge(metricName); // returns a no-op gauge
 }
-
 Gauge val = perWorkerGauges.get(metricName);
 if (val != null) {
 return val;
 }
- return perWorkerGauges.computeIfAbsent(
- metricName, name -> new GaugeCell(metricName));
+ return perWorkerGauges.computeIfAbsent(metricName, name -> new GaugeCell(metricName));
 }
 @Override
@@ -356,8 +352,6 @@ Iterable extractPerWorkerMetricUpdates() {
 ConcurrentHashMap histograms = new ConcurrentHashMap();
 HashSet currentZeroValuedCounters = new HashSet();
- LOG.info("xxx per worker extractPerWorkerMetricUpdates");
-
 // Extract metrics updates.
 perWorkerCounters.forEach(
 (k, v) -> {
 Long val = v.getAndSet(0);
@@ -367,12 +361,13 @@ Iterable extractPerWorkerMetricUpdates() {
 }
 counters.put(k, val);
 });
+
 perWorkerGauges.forEach(
- (k, v) -> {
- Long val = v.getCumulative().value();
- LOG.info("xxx per worker gauges k {} val {}", k.getName(), val);
- gauges.put(k, val); // no special handing for zero, since that value is important
- });
+ (k, v) -> {
+ Long val = v.getCumulative().value();
+ gauges.put(k, val); // no special handling for zero, since that value is important
+ v.reset();
+ });
 perWorkerHistograms.forEach(
 (k, v) -> {
 v.getSnapshotAndReset().ifPresent(snapshot -> histograms.put(k, snapshot));
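Note the asymmetry the loops above introduce: per-worker counters are drained as deltas with getAndSet(0), while per-worker gauges report their most recent value (zero included) and are then reset. A small sketch of the difference, assuming an AtomicLong-backed counter cell (consistent with the getAndSet(0) call above) and the GaugeCell type used in this diff:

    // Sketch only: delta semantics for counters vs. latest-value semantics for gauges.
    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.beam.runners.core.metrics.GaugeCell;
    import org.apache.beam.sdk.metrics.MetricName;

    public class ExtractionSemanticsSketch {
      public static void main(String[] args) {
        AtomicLong counter = new AtomicLong();
        GaugeCell gauge = new GaugeCell(MetricName.named("KafkaSink", "example"));

        counter.addAndGet(5);
        gauge.set(10L);

        long counterDelta = counter.getAndSet(0); // 5; the next extraction sees 0
        long gaugeValue = gauge.getCumulative().value(); // 10; zero would also be reported
        gauge.reset();

        System.out.println(counterDelta + " / " + gaugeValue);
      }
    }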
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java
index ba77d8e1ce26..c43b877a9ddb 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java
@@ -398,7 +398,6 @@ private Optional createWorkerMessageForPerWorkerMetrics() {
 List metrics = new ArrayList<>();
 allStageInfo.get().forEach(s -> metrics.addAll(s.extractPerWorkerMetricValues()));
-
 if (metrics.isEmpty()) {
 return Optional.empty();
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java
index ef99da9dcdca..7939db911245 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java
@@ -23,6 +23,7 @@
 import com.google.api.services.dataflow.model.Base2Exponent;
 import com.google.api.services.dataflow.model.BucketOptions;
+import com.google.api.services.dataflow.model.DataflowGaugeValue;
 import com.google.api.services.dataflow.model.DataflowHistogramValue;
 import com.google.api.services.dataflow.model.Linear;
 import com.google.api.services.dataflow.model.MetricValue;
@@ -44,6 +45,7 @@
 @RunWith(JUnit4.class)
 public class MetricsToPerStepNamespaceMetricsConverterTest {
+
 private static final HistogramData.BucketType lienarBuckets =
 HistogramData.LinearBuckets.of(0, 10, 10);
 private static final HistogramData.BucketType exponentialBuckets =
@@ -86,6 +88,7 @@ public void testConvert_successfulyConvertCounters() {
 String step = "testStepName";
 Map emptyHistograms = new HashMap<>();
 Map counters = new HashMap();
+ Map emptyGauges = new HashMap();
 Map parsedMetricNames = new HashMap<>();
 MetricName bigQueryMetric1 = MetricName.named("BigQuerySink", "metric1");
@@ -99,7 +102,7 @@ public void testConvert_successfulyConvertCounters() {
 Collection conversionResult =
 MetricsToPerStepNamespaceMetricsConverter.convert(
- step, counters, counters, emptyHistograms, parsedMetricNames);
+ step, counters, emptyGauges, emptyHistograms, parsedMetricNames);
 MetricValue expectedVal1 =
 new MetricValue().setMetric("metric1").setValueInt64(5L).setMetricLabels(new HashMap<>());
@@ -133,6 +136,7 @@ public void testConvert_skipInvalidMetricNames() {
 Map parsedMetricNames = new HashMap<>();
 Map counters = new HashMap<>();
+ Map emptyGauges = new HashMap();
 MetricName invalidName1 = MetricName.named("BigQuerySink", "**");
 counters.put(invalidName1, 5L);
@@ -144,7 +148,7 @@ public void testConvert_skipInvalidMetricNames() {
 Collection conversionResult =
 MetricsToPerStepNamespaceMetricsConverter.convert(
- "testStep", counters, counters, histograms, parsedMetricNames);
+ "testStep", counters, emptyGauges, histograms, parsedMetricNames);
 assertThat(conversionResult.size(), equalTo(0));
 assertThat(parsedMetricNames.size(), equalTo(0));
 }
@@ -152,7 +156,7 @@ public void testConvert_skipInvalidMetricNames() {
 @Test
 public void testConvert_successfulConvertHistograms() {
 Map parsedMetricNames = new HashMap<>();
-
+ Map emptyGauges = new HashMap();
 Map histograms = new HashMap<>();
 MetricName bigQueryMetric1 = MetricName.named("BigQuerySink", "baseLabel");
 MetricName bigQueryMetric2 =
@@ -181,7 +185,7 @@ public void testConvert_successfulConvertHistograms() {
 Map emptyCounters = new HashMap<>();
 Collection conversionResult =
 MetricsToPerStepNamespaceMetricsConverter.convert(
- step, emptyCounters,emptyCounters, histograms, parsedMetricNames);
+ step, emptyCounters, emptyGauges, histograms, parsedMetricNames);
 // Expected value 1
 List bucketCounts1 = ImmutableList.of(0L, 1L, 1L, 1L);
@@ -280,6 +284,7 @@ public void testConvert_skipUnknownHistogramBucketType() {
 public void testConvert_convertCountersAndHistograms() {
 String step = "testStep";
 Map counters = new HashMap<>();
+ Map emptyGauges = new HashMap<>();
 Map histograms = new HashMap<>();
 Map parsedMetricNames = new HashMap<>();
@@ -293,7 +298,7 @@ public void testConvert_convertCountersAndHistograms() {
 Collection conversionResult =
 MetricsToPerStepNamespaceMetricsConverter.convert(
- step, counters, counters, histograms, parsedMetricNames);
+ step, counters, emptyGauges, histograms, parsedMetricNames);
 // Expected counter MetricValue
 Map counterLabelMap = new HashMap<>();
@@ -346,52 +351,75 @@ public void testConvert_convertCountersAndHistograms() {
 IsMapContaining.hasEntry(histogramMetricName, parsedHistogramMetricName));
 }
-// @Test
-// public void testConvert_successfulyConvertGauges() {
-// String step = "testStepName";
-// Map emptyHistograms = new HashMap<>();
-// Map counters = new HashMap();
-// Map gauge = new HashMap();
-
-// Map parsedMetricNames = new HashMap<>();
-
-// MetricName bigQueryMetric1 = MetricName.named("KafkaSink", "metric1");
-// MetricName bigQueryMetric2 =
-// MetricName.named("KafkaSink", "metric2*label1:val1;label2:val2;");
-// MetricName bigQueryMetric3 = MetricName.named("BigQuerySink", "zeroValue"); // ?
-
-// counters.put(bigQueryMetric1, 5L);
-// counters.put(bigQueryMetric2, 10L);
-// counters.put(bigQueryMetric3, 0L);
-
-// Collection conversionResult =
-// MetricsToPerStepNamespaceMetricsConverter.convert(
-// step, counters, emptyHistograms, parsedMetricNames);
-
-// MetricValue expectedVal1 =
-// new MetricValue().setMetric("metric1").setGaugeValue(5L).setMetricLabels(new HashMap<>());
-// Map val2LabelMap = new HashMap<>();
-// val2LabelMap.put("label1", "val1");
-// val2LabelMap.put("label2", "val2");
-// MetricValue expectedVal2 =
-// new MetricValue().setMetric("metric2").setValueInt64(10L).setMetricLabels(val2LabelMap);
-
-// assertThat(conversionResult.size(), equalTo(1));
-// PerStepNamespaceMetrics perStepNamespaceMetrics = conversionResult.iterator().next();
-
-// assertThat(perStepNamespaceMetrics.getOriginalStep(), equalTo(step));
-// assertThat(perStepNamespaceMetrics.getMetricsNamespace(), equalTo("BigQuerySink"));
-// assertThat(perStepNamespaceMetrics.getMetricValues().size(), equalTo(2));
-// assertThat(
-// perStepNamespaceMetrics.getMetricValues(), containsInAnyOrder(expectedVal1, expectedVal2));
-
-// LabeledMetricNameUtils.ParsedMetricName parsedBigQueryMetric1 =
-// LabeledMetricNameUtils.parseMetricName(bigQueryMetric1.getName()).get();
-// LabeledMetricNameUtils.ParsedMetricName parsedBigQueryMetric2 =
-// LabeledMetricNameUtils.parseMetricName(bigQueryMetric2.getName()).get();
-
-// assertThat(parsedMetricNames.size(), equalTo(2));
-// assertThat(parsedMetricNames, IsMapContaining.hasEntry(bigQueryMetric1, parsedBigQueryMetric1));
-// assertThat(parsedMetricNames, IsMapContaining.hasEntry(bigQueryMetric2, parsedBigQueryMetric2));
-// }
+ @Test
+ public void testConvert_successfulyConvertGauges() {
+ String step = "testStepName";
+ Map emptyHistograms = new HashMap<>();
+ Map counters = new HashMap();
+ Map gauges = new HashMap();
+ Map parsedMetricNames = new HashMap<>();
+
+ MetricName kafkaMetric1 = MetricName.named("KafkaSink", "metric1");
+ MetricName kafkaMetric2 = MetricName.named("KafkaSink", "metric2*label1:val1;label2:val2;");
+ MetricName kafkaMetric3 = MetricName.named("KafkaSink", "metric3");
+
+ gauges.put(kafkaMetric1, 5L);
+ gauges.put(kafkaMetric2, 10L);
+ gauges.put(kafkaMetric3, 0L); // zero-valued metric is still reported
+
+ Collection conversionResult =
+ MetricsToPerStepNamespaceMetricsConverter.convert(
+ step, counters, gauges, emptyHistograms, parsedMetricNames);
+
+ DataflowGaugeValue gaugeValue1 = new DataflowGaugeValue();
+ gaugeValue1.setValue(5L);
+
+ DataflowGaugeValue gaugeValue2 = new DataflowGaugeValue();
+ gaugeValue2.setValue(10L);
+
+ DataflowGaugeValue gaugeValue3 = new DataflowGaugeValue();
+ gaugeValue3.setValue(0L); // zero-valued
+
+ MetricValue expectedVal1 =
+ new MetricValue()
+ .setMetric("metric1")
+ .setValueGauge64(gaugeValue1)
+ .setMetricLabels(new HashMap<>());
+
+ Map val2LabelMap = new HashMap<>();
+ val2LabelMap.put("label1", "val1");
+ val2LabelMap.put("label2", "val2");
+ MetricValue expectedVal2 =
+ new MetricValue()
+ .setMetric("metric2")
+ .setValueGauge64(gaugeValue2)
+ .setMetricLabels(val2LabelMap);
+
+ MetricValue expectedVal3 =
+ new MetricValue()
+ .setMetric("metric3")
+ .setValueGauge64(gaugeValue3)
+ .setMetricLabels(new HashMap<>());
+
+ assertThat(conversionResult.size(), equalTo(1));
+ PerStepNamespaceMetrics perStepNamespaceMetrics = conversionResult.iterator().next();
+
+ assertThat(perStepNamespaceMetrics.getOriginalStep(), equalTo(step));
+ assertThat(perStepNamespaceMetrics.getMetricsNamespace(), equalTo("KafkaSink"));
+ assertThat(perStepNamespaceMetrics.getMetricValues().size(), equalTo(3));
+ assertThat(
+ perStepNamespaceMetrics.getMetricValues(),
+ containsInAnyOrder(expectedVal1, expectedVal2, expectedVal3));
+
+ LabeledMetricNameUtils.ParsedMetricName parsedKafkaMetric1 =
+ LabeledMetricNameUtils.parseMetricName(kafkaMetric1.getName()).get();
+ LabeledMetricNameUtils.ParsedMetricName parsedKafkaMetric2 =
+ LabeledMetricNameUtils.parseMetricName(kafkaMetric2.getName()).get();
+ LabeledMetricNameUtils.ParsedMetricName parsedKafkaMetric3 =
+ LabeledMetricNameUtils.parseMetricName(kafkaMetric3.getName()).get();
+ assertThat(parsedMetricNames.size(), equalTo(3));
+ assertThat(parsedMetricNames, IsMapContaining.hasEntry(kafkaMetric1, parsedKafkaMetric1));
+ assertThat(parsedMetricNames, IsMapContaining.hasEntry(kafkaMetric2, parsedKafkaMetric2));
+ assertThat(parsedMetricNames, IsMapContaining.hasEntry(kafkaMetric3, parsedKafkaMetric3));
+ }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
index c138833cf803..2d07df1d899b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -82,6 +82,7 @@ default Histogram getPerWorkerHistogram(
 default Gauge getPerWorkerGauge(MetricName metricName) {
 return NoOpGauge.getInstance();
 }
+
 /** Return the cumulative values for any metrics in this container as MonitoringInfos.
 */
 default Iterable getMonitoringInfos() {
 throw new RuntimeException("getMonitoringInfos is not implemented on this MetricsContainer.");
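The no-op default above is what lets SDK-level code ask any MetricsContainer for a per-worker gauge: unless a runner container such as StreamingStepMetricsContainer overrides it, writes are silently dropped. A minimal usage sketch; the null check is illustrative, since the current thread may have no container set:

    // Sketch only: callers always receive a Gauge; it is a no-op unless the
    // runner's container overrides getPerWorkerGauge.
    import org.apache.beam.sdk.metrics.Gauge;
    import org.apache.beam.sdk.metrics.MetricName;
    import org.apache.beam.sdk.metrics.MetricsContainer;
    import org.apache.beam.sdk.metrics.MetricsEnvironment;

    public class PerWorkerGaugeSketch {
      public static void main(String[] args) {
        MetricsContainer container = MetricsEnvironment.getCurrentContainer();
        if (container != null) {
          Gauge backlog = container.getPerWorkerGauge(MetricName.named("KafkaSink", "example"));
          backlog.set(42L); // dropped silently when the default no-op gauge is returned
        }
      }
    }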
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java
index 6f1cf56c0aea..ee03627302d2 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java
@@ -23,15 +23,12 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.sdk.metrics.Gauge;
 import org.apache.beam.sdk.metrics.Histogram;
+import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.DelegatingCounter;
-import org.apache.beam.sdk.metrics.Gauge;
-import org.apache.beam.sdk.metrics.MetricName;
-
 /** Stores and exports metrics for a batch of Kafka Client RPCs. */
 public interface KafkaMetrics {
@@ -84,7 +81,7 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {
 // metric name
 abstract HashMap> perTopicRpcLatencies();
- // worry about concurrency?
+ // TODO: revisit whether backlogGauges needs to be thread-safe.
 static HashMap backlogGauges = new HashMap();
 abstract HashMap perTopicPartitionBacklogs();
@@ -93,7 +90,9 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {
 public static KafkaMetricsImpl create() {
 return new AutoValue_KafkaMetrics_KafkaMetricsImpl(
- new HashMap>(), new HashMap(), new AtomicBoolean(true));
+ new HashMap>(),
+ new HashMap(),
+ new AtomicBoolean(true));
 }
 /** Record the rpc status and latency of a successful Kafka poll RPC call. */
@@ -120,10 +119,10 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
 */
 @Override
 public void updateBacklogBytes(String topicName, int partitionId, long backlog) {
- // more efficient to create each time? or to store them in memory?
- String name = KafkaSinkMetrics.GetMetricGaugeName(topicName, partitionId).getName();
- // dont need to check if it exists since we rewrite it
- perTopicPartitionBacklogs().put(name, backlog);
+ if (isWritable().get()) {
+ String name = KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId).getName();
+ perTopicPartitionBacklogs().put(name, backlog);
+ }
 }
 /** Record rpc latency histogram metrics for all recorded topics. */
@@ -147,11 +146,10 @@ private void recordRpcLatencyMetrics() {
 }
 }
- // Create gauges from current map
- private void recordBacklogBytes(){
- // LOG.info("xxx recordBacklogBytes backlog bytes");
- for (Map.Entry backlogs : perTopicPartitionBacklogs().entrySet()){
- Gauge gauge = KafkaSinkMetrics.createBacklogGauge( MetricName.named("KafkaSink", backlogs.getKey()));
+ private void recordBacklogBytes() {
+ for (Map.Entry backlogs : perTopicPartitionBacklogs().entrySet()) {
+ Gauge gauge =
+ KafkaSinkMetrics.createBacklogGauge(MetricName.named("KafkaSink", backlogs.getKey()));
 gauge.set(backlogs.getValue());
 }
 }
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java
index be3ff502df9f..9ddbcf8f0ba6 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java
@@ -17,8 +17,8 @@
 */
 package org.apache.beam.sdk.io.kafka;
-import org.apache.beam.sdk.metrics.DelegatingHistogram;
 import org.apache.beam.sdk.metrics.DelegatingGauge;
+import org.apache.beam.sdk.metrics.DelegatingHistogram;
 import org.apache.beam.sdk.metrics.Gauge;
 import org.apache.beam.sdk.metrics.Histogram;
 import org.apache.beam.sdk.metrics.LabeledMetricNameUtils;
@@ -36,7 +36,7 @@
 // TODO, refactor out common parts for BQ sink, so it can be reused with other sinks, eg, GCS?
 // @SuppressWarnings("unused")
 public class KafkaSinkMetrics {
- private static boolean supportKafkaMetrics = true;
+ private static boolean supportKafkaMetrics = false;
 public static final String METRICS_NAMESPACE = "KafkaSink";
@@ -54,7 +54,6 @@ enum RpcMethod {
 private static final String RPC_METHOD = "rpc_method";
 private static final String PARTITION_ID = "partition_id";
-
 /**
 * Creates a Histogram metric to record RPC latency. Metric will have name.
 *
@@ -76,36 +75,44 @@ public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic
 return new DelegatingHistogram(metricName, buckets, false, true);
 }
- /**
+ /**
 * Creates a Gauge metric to record per-partition backlog. Metric will have name.
 *
- * 'EstimatedBacklogSize*topic_name:{topic};partition_id:{partition_id};'
+ * 'EstimatedBacklogSize*partition_id:{partitionId};topic_name:{topic};'
 *
 * @param topic Kafka topic associated with this metric.
- * @param partition_id partition id associated with this metric.
+ * @param partitionId partition id associated with this metric.
 * @return Gauge.
 */
- public static Gauge createBacklogGauge(String topic, int partition_id) {
- return new DelegatingGauge(GetMetricGaugeName(topic, partition_id), false, true);
+ public static Gauge createBacklogGauge(String topic, int partitionId) {
+ return new DelegatingGauge(getMetricGaugeName(topic, partitionId), false, true);
 }
- /**
- * Creates an Histogram metric to record RPC latency. Metric will have name.
+ /**
+ * Creates a Gauge metric to record per-partition backlog. Metric will have name.
 *
- * 'EstimatedBacklogSize*topic_name:{topic};partition_id:{partition_id};'
+ * 'EstimatedBacklogSize*partition_id:{partitionId};topic_name:{topic};'
 *
- * @param topic Kafka topic associated with this metric.
- * @param partition_id partition id associated with this metric.
+ * @param name MetricName for the KafkaSink.
 * @return Gauge.
 */
 public static Gauge createBacklogGauge(MetricName name) {
 return new DelegatingGauge(name, false, true);
 }
- public static MetricName GetMetricGaugeName(String topic, int partition_id){
+ /**
+ * Creates a MetricName based on topic name and partition id.
+ *
+ * 'EstimatedBacklogSize*partition_id:{partitionId};topic_name:{topic};'
+ *
+ * @param topic Kafka topic associated with this metric.
+ * @param partitionId partition id associated with this metric.
+ * @return MetricName.
+ */
+ public static MetricName getMetricGaugeName(String topic, int partitionId) {
 LabeledMetricNameUtils.MetricNameBuilder nameBuilder =
- LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(ESTIAMTED_BACKLOG_SIZE);
+ LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(ESTIAMTED_BACKLOG_SIZE);
- nameBuilder.addLabel(PARTITION_ID, String.valueOf(partition_id));
+ nameBuilder.addLabel(PARTITION_ID, String.valueOf(partitionId));
 nameBuilder.addLabel(TOPIC_LABEL, topic);
 return nameBuilder.build(METRICS_NAMESPACE);
 }
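getMetricGaugeName emits the labeled-name format that MetricsToPerStepNamespaceMetricsConverter parses back apart, with labels in insertion order (partition_id before topic_name). A minimal sketch for topic "test-topic" and partition 0; the expected string matches the assertion added to KafkaMetricsTest below:

    // Sketch only: the gauge name produced by getMetricGaugeName("test-topic", 0).
    import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics;
    import org.apache.beam.sdk.metrics.MetricName;

    public class GaugeNameSketch {
      public static void main(String[] args) {
        MetricName name = KafkaSinkMetrics.getMetricGaugeName("test-topic", 0);
        System.out.println(name.getNamespace()); // "KafkaSink"
        System.out.println(name.getName()); // "EstimatedBacklogSize*partition_id:0;topic_name:test-topic;"
      }
    }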
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 4b33af07ae44..0f32cfbaefd8 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -71,7 +71,6 @@
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.HashMap;
 /**
 * An unbounded reader to read from Kafka. Each reader consumes messages from one or more Kafka
@@ -669,7 +668,6 @@ private void nextBatch() throws IOException {
 partitionStates.forEach(p -> p.recordIter = records.records(p.topicPartition).iterator());
 reportBacklog();
- // update metrics. Cant be a part of reportBacklog, since its done when we checkpoint as well.
 reportBacklogMetrics();
 // cycle through the partitions in order to interleave records from each.
@@ -746,8 +744,8 @@ private void reportBacklogMetrics() {
 for (PartitionState p : partitionStates) {
 long pBacklog = p.approxBacklogInBytes();
 if (pBacklog != UnboundedReader.BACKLOG_UNKNOWN) {
- // LOG.info("xxx backlog bytes {} split {}", pBacklog, p.topicPartition().partition());
- kafkaResults.updateBacklogBytes(p.topicPartition().topic(), p.topicPartition().partition(), pBacklog);
+ kafkaResults.updateBacklogBytes(
+ p.topicPartition().topic(), p.topicPartition().partition(), pBacklog);
 }
 }
 }
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java
index b84e143be773..e9339878d42f 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java
@@ -24,7 +24,9 @@
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.runners.core.metrics.GaugeCell;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.sdk.metrics.Gauge;
 import org.apache.beam.sdk.metrics.Histogram;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
@@ -59,6 +61,9 @@ public static class TestMetricsContainer extends MetricsContainerImpl {
 perWorkerHistograms = new ConcurrentHashMap, TestHistogram>();
+ public ConcurrentHashMap perWorkerGauges =
+ new ConcurrentHashMap();
+
 public TestMetricsContainer() {
 super("TestStep");
 }
@@ -70,9 +75,15 @@ public Histogram getPerWorkerHistogram(
 return perWorkerHistograms.get(KV.of(metricName, bucketType));
 }
+ @Override
+ public Gauge getPerWorkerGauge(MetricName metricName) {
+ return perWorkerGauges.computeIfAbsent(metricName, name -> new GaugeCell(metricName));
+ }
+
 @Override
 public void reset() {
 perWorkerHistograms.clear();
+ perWorkerGauges.clear();
 }
 }
@@ -83,10 +94,11 @@ public void testNoOpKafkaMetrics() throws Exception {
 KafkaMetrics results = KafkaMetrics.NoOpKafkaMetrics.getInstance();
 results.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10));
-
+ results.updateBacklogBytes("test-topic", 0, 10);
 results.updateKafkaMetrics();
 assertThat(testContainer.perWorkerHistograms.size(), equalTo(0));
+ assertThat(testContainer.perWorkerGauges.size(), equalTo(0));
 }
 @Test
@@ -99,6 +111,7 @@ public void testKafkaRPCLatencyMetrics() throws Exception {
 KafkaMetrics results = KafkaSinkMetrics.kafkaMetrics();
 results.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10));
+ results.updateBacklogBytes("test-topic", 0, 10);
 results.updateKafkaMetrics();
 // RpcLatency*rpc_method:POLL;topic_name:test-topic
@@ -110,6 +123,11 @@ public void testKafkaRPCLatencyMetrics() throws Exception {
 assertThat(
 testContainer.perWorkerHistograms.get(KV.of(histogramName, bucketType)).values,
 containsInAnyOrder(Double.valueOf(10.0)));
+
+ MetricName gaugeName =
+ MetricName.named("KafkaSink", "EstimatedBacklogSize*partition_id:0;topic_name:test-topic;");
+ assertThat(testContainer.perWorkerGauges.size(), equalTo(1));
+ assertThat(testContainer.perWorkerGauges.get(gaugeName).getCumulative().value(), equalTo(10L));
 }
 @Test