Skip to content

Commit

Permalink
Remove mandatory beam-sdks-io-kafka dependency for dataflow worker jar (
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn authored Dec 11, 2024
1 parent 7a94b61 commit 04b059f
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 11 deletions.
1 change: 0 additions & 1 deletion runners/google-cloud-dataflow-java/worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ def sdk_provided_project_dependencies = [
":runners:google-cloud-dataflow-java",
":sdks:java:extensions:avro",
":sdks:java:extensions:google-cloud-platform-core",
":sdks:java:io:kafka", // For metric propagation into worker
":sdks:java:io:google-cloud-platform",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.Map.Entry;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics;
import org.apache.beam.sdk.metrics.LabeledMetricNameUtils;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;
Expand All @@ -43,6 +42,9 @@
* converter.
*/
public class MetricsToPerStepNamespaceMetricsConverter {
// Duplicated here to avoid introducing a mandatory kafka-io dependency into the Dataflow worker.
// Keep in sync with org.apache.beam.sdk.io.kafka.KafkaSinkMetrics.METRICS_NAMESPACE.
// Declared final so the namespace used for metric filtering cannot be reassigned at runtime.
public static final String KAFKA_SINK_METRICS_NAMESPACE = "KafkaSink";

private static Optional<LabeledMetricNameUtils.ParsedMetricName> getParsedMetricName(
MetricName metricName,
Expand Down Expand Up @@ -70,7 +72,7 @@ private static Optional<MetricValue> convertCounterToMetricValue(

if (value == 0
|| (!metricName.getNamespace().equals(BigQuerySinkMetrics.METRICS_NAMESPACE)
&& !metricName.getNamespace().equals(KafkaSinkMetrics.METRICS_NAMESPACE))) {
&& !metricName.getNamespace().equals(KAFKA_SINK_METRICS_NAMESPACE))) {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@
import org.apache.beam.sdk.fn.JvmInitializers;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -835,10 +834,6 @@ public static void main(String[] args) throws Exception {
enableBigQueryMetrics();
}

if (DataflowRunner.hasExperiment(options, "enable_kafka_metrics")) {
KafkaSinkMetrics.setSupportKafkaMetrics(true);
}

JvmInitializers.runBeforeProcessing(options);
worker.startStatusPages();
worker.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.dataflow.worker.streaming;

import static org.apache.beam.runners.dataflow.worker.MetricsToPerStepNamespaceMetricsConverter.KAFKA_SINK_METRICS_NAMESPACE;
import static org.apache.beam.sdk.metrics.Metrics.THROTTLE_TIME_COUNTER_NAME;

import com.google.api.services.dataflow.model.CounterStructuredName;
Expand All @@ -35,7 +36,6 @@
import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;

/** Contains a few of the stage specific fields. E.g. metrics container registry, counters etc. */
Expand Down Expand Up @@ -120,8 +120,7 @@ private void translateKnownPerWorkerCounters(List<PerStepNamespaceMetrics> metri
for (PerStepNamespaceMetrics perStepnamespaceMetrics : metrics) {
if (!BigQuerySinkMetrics.METRICS_NAMESPACE.equals(
perStepnamespaceMetrics.getMetricsNamespace())
&& !KafkaSinkMetrics.METRICS_NAMESPACE.equals(
perStepnamespaceMetrics.getMetricsNamespace())) {
&& !KAFKA_SINK_METRICS_NAMESPACE.equals(perStepnamespaceMetrics.getMetricsNamespace())) {
continue;
}
for (MetricValue metric : perStepnamespaceMetrics.getMetricValues()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kafka;

import com.google.auto.service.AutoService;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;

/**
 * {@link JvmInitializer} that applies KafkaIO feature flags on the worker.
 *
 * <p>Before processing starts, the pipeline options are checked for the {@code
 * enable_kafka_metrics} experiment; when it is present, Kafka sink metrics support is switched on.
 */
@AutoService(JvmInitializer.class)
public class KafkaIOInitializer implements JvmInitializer {
  /** Experiment name that opts a pipeline into Kafka sink metrics. */
  private static final String ENABLE_KAFKA_METRICS_EXPERIMENT = "enable_kafka_metrics";

  /**
   * Turns on Kafka sink metrics support when the experiment is set on {@code options}.
   *
   * @param options pipeline options inspected for the experiment flag
   */
  @Override
  public void beforeProcessing(PipelineOptions options) {
    boolean kafkaMetricsRequested =
        ExperimentalOptions.hasExperiment(options, ENABLE_KAFKA_METRICS_EXPERIMENT);
    if (!kafkaMetricsRequested) {
      // Experiment not requested: leave the KafkaSinkMetrics flag untouched.
      return;
    }
    KafkaSinkMetrics.setSupportKafkaMetrics(true);
  }
}

0 comments on commit 04b059f

Please sign in to comment.