diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index abe7d0d364d3..68f0568cab7b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -98,6 +98,7 @@
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
 import org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscribeTransform;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
@@ -400,6 +401,20 @@ && isServiceEndpoint(dataflowOptions.getDataflowEndpoint())) {
       dataflowOptions.setGcsUploadBufferSizeBytes(GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT);
     }
 
+    boolean hasExperimentEnableKafkaMetrics = false;
+    if (dataflowOptions.getExperiments() != null) {
+      for (String experiment : dataflowOptions.getExperiments()) {
+        if (experiment.startsWith("enable_kafka_metrics")) {
+          hasExperimentEnableKafkaMetrics = true;
+          break;
+        }
+      }
+    }
+
+    if (dataflowOptions.isStreaming() && hasExperimentEnableKafkaMetrics) {
+      KafkaSinkMetrics.setSupportKafkaMetrics(true);
+    }
+
     // Adding the Java version to the SDK name for user's and support convenience.
     String agentJavaVer = "(JRE 8 environment)";
     if (Environments.getJavaVersion() != Environments.JavaVersion.java8) {
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java
index 147a30dcdd1a..0bbef88900ae 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java
@@ -31,8 +31,12 @@
 /** Stores and exports metrics for a batch of Kafka Client RPCs. */
 public interface KafkaMetrics {
 
+  /* Record latency, to be used later to update/create histogram in another thread */
   void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime);
 
+  /* Record and create histogram in current thread */
+  void recordRpcLatencyMetric(String topic, Duration duration);
+
   void updateKafkaMetrics();
 
   /** No-op implementation of {@code KafkaResults}. */
@@ -42,6 +46,9 @@ private NoOpKafkaMetrics() {}
     @Override
     public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {}
 
+    @Override
+    public void recordRpcLatencyMetric(String topic, Duration elapsedTime) {}
+
     @Override
     public void updateKafkaMetrics() {}
 
@@ -78,7 +85,7 @@ public static KafkaMetricsImpl create() {
           new HashMap<String, ConcurrentLinkedQueue<Duration>>(), new AtomicBoolean(true));
     }
 
-    /** Record the rpc status and latency of a successful Kafka poll RPC call. */
+    /** Record the rpc latency of a successful Kafka poll RPC call. */
     @Override
     public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
       if (isWritable().get()) {
@@ -93,7 +100,7 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
       }
     }
 
-    /** Record rpc latency histogram metrics for all recorded topics. */
+    /** Create or update histograms with rpc latency metrics for all recorded topics. */
     private void recordRpcLatencyMetrics() {
       for (Map.Entry<String, ConcurrentLinkedQueue<Duration>> topicLatencies :
           perTopicRpcLatencies().entrySet()) {
@@ -106,7 +113,6 @@ private void recordRpcLatencyMetrics() {
               KafkaSinkMetrics.RpcMethod.POLL, topicLatencies.getKey());
           latencyHistograms.put(topicLatencies.getKey(), topicHistogram);
         }
-        // update all the latencies
         for (Duration d : topicLatencies.getValue()) {
           Preconditions.checkArgumentNotNull(topicHistogram);
           topicHistogram.update(d.toMillis());
@@ -114,6 +120,20 @@ private void recordRpcLatencyMetrics() {
       }
     }
 
+    /** Create or update latency histogram for a singular topic. */
+    @Override
+    public void recordRpcLatencyMetric(String topic, Duration duration) {
+      Histogram topicHistogram;
+      if (latencyHistograms.containsKey(topic)) {
+        topicHistogram = latencyHistograms.get(topic);
+      } else {
+        topicHistogram =
+            KafkaSinkMetrics.createRPCLatencyHistogram(KafkaSinkMetrics.RpcMethod.POLL, topic);
+        latencyHistograms.put(topic, topicHistogram);
+      }
+      topicHistogram.update(duration.toMillis());
+    }
+
     /**
      * Export all metrics recorded in this instance to the underlying {@code perWorkerMetrics}
      * containers. This function will only report metrics once per instance. Subsequent calls to
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java
index f71926f97d27..5b8b7fc5aa7a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java
@@ -34,7 +34,8 @@
 // TODO, refactor out common parts for BQ sink, so it can be reused with other sinks, eg, GCS?
 // @SuppressWarnings("unused")
 public class KafkaSinkMetrics {
-  private static boolean supportKafkaMetrics = false;
+  private static boolean supportKafkaMetrics =
+      true; // where to set to true for UW if experiment is passed
 
   public static final String METRICS_NAMESPACE = "KafkaSink";
 
@@ -50,6 +51,14 @@ enum RpcMethod {
   private static final String TOPIC_LABEL = "topic_name";
   private static final String RPC_METHOD = "rpc_method";
 
+  private static MetricName createMetricName(RpcMethod method, String topic) {
+    LabeledMetricNameUtils.MetricNameBuilder nameBuilder =
+        LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(RPC_LATENCY);
+    nameBuilder.addLabel(RPC_METHOD, method.toString());
+    nameBuilder.addLabel(TOPIC_LABEL, topic);
+    return nameBuilder.build(METRICS_NAMESPACE);
+  }
+
   /**
    * Creates an Histogram metric to record RPC latency. Metric will have name.
    *
@@ -60,14 +69,8 @@ enum RpcMethod {
    * @param method Kafka method associated with this metric.
    * @param topic Kafka topic associated with this metric.
   * @return Histogram with exponential buckets with a sqrt(2) growth factor.
    */
   public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic) {
-    LabeledMetricNameUtils.MetricNameBuilder nameBuilder =
-        LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(RPC_LATENCY);
-    nameBuilder.addLabel(RPC_METHOD, method.toString());
-    nameBuilder.addLabel(TOPIC_LABEL, topic);
-
-    MetricName metricName = nameBuilder.build(METRICS_NAMESPACE);
+    MetricName metricName = createMetricName(method, topic);
     HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 17);
-
     return new DelegatingHistogram(metricName, buckets, false, true);
   }
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 4bda8cf28d4e..e10a3b0ed4ac 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -228,6 +228,9 @@ private ReadFromKafkaDoFn(
   private HashMap<TopicPartition, Long> perPartitionBacklogMetrics =
       new HashMap<TopicPartition, Long>();;
 
+  // Initialize only when used, since it's not serializable
+  private @Nullable KafkaMetrics kafkaResults = null;
+
   @VisibleForTesting final long consumerPollingTimeout;
   @VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
   @VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider;
@@ -571,7 +574,12 @@ private ConsumerRecords<byte[], byte[]> poll(
     java.time.Duration elapsed = java.time.Duration.ZERO;
     java.time.Duration timeout = java.time.Duration.ofSeconds(this.consumerPollingTimeout);
     while (true) {
+      kafkaResults = KafkaSinkMetrics.kafkaMetrics();
       final ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(timeout.minus(elapsed));
+      elapsed = sw.elapsed();
+      Preconditions.checkStateNotNull(kafkaResults);
+      kafkaResults.recordRpcLatencyMetric(topicPartition.topic(), elapsed);
+
       if (!rawRecords.isEmpty()) {
         // return as we have found some entries
         return rawRecords;
@@ -580,7 +588,6 @@ private ConsumerRecords<byte[], byte[]> poll(
         // there was no progress on the offset/position, which indicates end of stream
         return rawRecords;
       }
-      elapsed = sw.elapsed();
       if (elapsed.toMillis() >= timeout.toMillis()) {
         // timeout is over
         LOG.warn(
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java
index b84e143be773..44a41b7214e1 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java
@@ -24,8 +24,8 @@
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.runners.core.metrics.HistogramCell;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
-import org.apache.beam.sdk.metrics.Histogram;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.util.HistogramData;
@@ -39,10 +39,14 @@
 // TODO:Naireen - Refactor to remove duplicate code between the two sinks
 @RunWith(JUnit4.class)
 public class KafkaMetricsTest {
-  public static class TestHistogram implements Histogram {
+  public static class TestHistogramCell extends HistogramCell {
     public List<Double> values = Lists.newArrayList();
     private MetricName metricName = MetricName.named("KafkaSink", "name");
 
+    public TestHistogramCell(KV<MetricName, HistogramData.BucketType> kv) {
+      super(kv);
+    }
+
     @Override
     public void update(double value) {
       values.add(value);
@@ -55,25 +59,21 @@ public MetricName getName() {
   }
 
   public static class TestMetricsContainer extends MetricsContainerImpl {
-    public ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogram>
+    public ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogramCell>
         perWorkerHistograms =
-            new ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogram>();
+            new ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogramCell>();
 
     public TestMetricsContainer() {
       super("TestStep");
     }
 
     @Override
-    public Histogram getPerWorkerHistogram(
+    public TestHistogramCell getPerWorkerHistogram(
         MetricName metricName, HistogramData.BucketType bucketType) {
-      perWorkerHistograms.computeIfAbsent(KV.of(metricName, bucketType), kv -> new TestHistogram());
+      perWorkerHistograms.computeIfAbsent(
+          KV.of(metricName, bucketType), kv -> new TestHistogramCell(kv));
       return perWorkerHistograms.get(KV.of(metricName, bucketType));
     }
-
-    @Override
-    public void reset() {
-      perWorkerHistograms.clear();
-    }
   }
 
   @Test
@@ -126,4 +126,4 @@ public void testKafkaRPCLatencyMetricsAreNotRecorded() throws Exception {
     results.updateKafkaMetrics();
     assertThat(testContainer.perWorkerHistograms.size(), equalTo(0));
   }
-}
+}
\ No newline at end of file