add Kafka sdk poll metrics
Naireen committed Nov 12, 2024
1 parent 3a712dd commit ce6e175
Showing 6 changed files with 81 additions and 34 deletions.
DataflowRunner.java
@@ -98,6 +98,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscribeTransform;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
@@ -400,6 +401,20 @@ && isServiceEndpoint(dataflowOptions.getDataflowEndpoint())) {
dataflowOptions.setGcsUploadBufferSizeBytes(GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT);
}

boolean hasExperimentEnableKafkaMetrics = false;
if (dataflowOptions.getExperiments() != null) {
for (String experiment : dataflowOptions.getExperiments()) {
if (experiment.startsWith("enable_kafka_metrics")) {
hasExperimentEnableKafkaMetrics = true;
break;
}
}
}

if (dataflowOptions.isStreaming() && hasExperimentEnableKafkaMetrics) {
KafkaSinkMetrics.setSupportKafkaMetrics(true);
}

// Adding the Java version to the SDK name for users' and support convenience.
String agentJavaVer = "(JRE 8 environment)";
if (Environments.getJavaVersion() != Environments.JavaVersion.java8) {
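For context, a minimal sketch of how a pipeline would opt in to the new metrics, assuming the standard --experiments flag that the check above reads; aside from the "enable_kafka_metrics" string, every option here is ordinary Dataflow setup and not part of this commit:

    // Hypothetical opt-in: a streaming job passing the experiment the runner looks for.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(
                "--runner=DataflowRunner",
                "--streaming=true",
                "--experiments=enable_kafka_metrics")
            .as(DataflowPipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);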
BigQuerySinkMetricsTest.java
@@ -32,10 +32,10 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.runners.core.metrics.CounterCell;
import org.apache.beam.runners.core.metrics.HistogramCell;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
@@ -51,10 +51,14 @@
@RunWith(JUnit4.class)
public class BigQuerySinkMetricsTest {

public static class TestHistogram implements Histogram {
public static class TestHistogramCell extends HistogramCell {
public List<Double> values = Lists.newArrayList();
private MetricName metricName = MetricName.named("namespace", "name");

public TestHistogramCell(KV<MetricName, HistogramData.BucketType> kv) {
super(kv);
}

@Override
public void update(double value) {
values.add(value);
@@ -68,10 +72,9 @@ public MetricName getName() {

public static class TestMetricsContainer extends MetricsContainerImpl {

// public TestHistogram testHistogram = new TestHistogram();
public ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogram>
public ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogramCell>
perWorkerHistograms =
new ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogram>();
new ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogramCell>();
public ConcurrentHashMap<MetricName, CounterCell> perWorkerCounters =
new ConcurrentHashMap<MetricName, CounterCell>();

@@ -80,11 +83,11 @@ public TestMetricsContainer() {
}

@Override
public Histogram getPerWorkerHistogram(
public TestHistogramCell getPerWorkerHistogram(
MetricName metricName, HistogramData.BucketType bucketType) {
perWorkerHistograms.computeIfAbsent(KV.of(metricName, bucketType), kv -> new TestHistogram());
perWorkerHistograms.computeIfAbsent(
KV.of(metricName, bucketType), kv -> new TestHistogramCell(kv));
return perWorkerHistograms.get(KV.of(metricName, bucketType));
// return testHistogram;
}

@Override
@@ -95,7 +98,6 @@ public Counter getPerWorkerCounter(MetricName metricName) {

@Override
public void reset() {
// testHistogram.values.clear();
perWorkerHistograms.clear();
perWorkerCounters.clear();
}
@@ -313,4 +315,4 @@ public void testStreamingInsertsMetrics_enabled() {
BigQuerySinkMetrics.streamingInsertsMetrics(),
instanceOf(StreamingInsertsMetrics.StreamingInsertsMetricsImpl.class));
}
}
}
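To show how this test double is driven, a hedged sketch (not part of the diff) of the usual wiring: install the container, push one update through a DelegatingHistogram like the one KafkaSinkMetrics builds, and assert on the captured cell. The metric name and value are illustrative.

    TestMetricsContainer testContainer = new TestMetricsContainer();
    MetricsEnvironment.setCurrentContainer(testContainer);

    MetricName name = MetricName.named("BigQuerySink", "RpcLatency"); // illustrative name
    HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 17);
    new DelegatingHistogram(name, buckets, false, true).update(10.0);

    // The update is routed through the overridden getPerWorkerHistogram above.
    assertThat(testContainer.perWorkerHistograms.get(KV.of(name, buckets)).values, contains(10.0));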
KafkaMetrics.java
@@ -31,8 +31,12 @@
/** Stores and exports metrics for a batch of Kafka Client RPCs. */
public interface KafkaMetrics {

/* Record latency, to be used later to update or create a histogram in another thread. */
void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime);

/* Record latency and create/update the histogram in the current thread. */
void recordRpcLatencyMetric(String topic, Duration duration);

void updateKafkaMetrics();

/** No-op implementation of {@code KafkaMetrics}. */
@@ -42,6 +46,9 @@ private NoOpKafkaMetrics() {}
@Override
public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {}

@Override
public void recordRpcLatencyMetric(String topic, Duration elapsedTime) {}

@Override
public void updateKafkaMetrics() {}

@@ -78,7 +85,7 @@ public static KafkaMetricsImpl create() {
new HashMap<String, ConcurrentLinkedQueue<Duration>>(), new AtomicBoolean(true));
}

/** Record the rpc status and latency of a successful Kafka poll RPC call. */
/** Record the rpc latency of a successful Kafka poll RPC call. */
@Override
public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
if (isWritable().get()) {
@@ -93,7 +100,7 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
}
}

/** Record rpc latency histogram metrics for all recorded topics. */
/** Create or update histograms with rpc latency metrics for all recorded topics. */
private void recordRpcLatencyMetrics() {
for (Map.Entry<String, ConcurrentLinkedQueue<Duration>> topicLatencies :
perTopicRpcLatencies().entrySet()) {
@@ -106,14 +113,27 @@ private void recordRpcLatencyMetrics() {
KafkaSinkMetrics.RpcMethod.POLL, topicLatencies.getKey());
latencyHistograms.put(topicLatencies.getKey(), topicHistogram);
}
// update all the latencies
for (Duration d : topicLatencies.getValue()) {
Preconditions.checkArgumentNotNull(topicHistogram);
topicHistogram.update(d.toMillis());
}
}
}

/** Create or update the latency histogram for a single topic. */
@Override
public void recordRpcLatencyMetric(String topic, Duration duration) {
Histogram topicHistogram;
if (latencyHistograms.containsKey(topic)) {
topicHistogram = latencyHistograms.get(topic);
} else {
topicHistogram =
KafkaSinkMetrics.createRPCLatencyHistogram(KafkaSinkMetrics.RpcMethod.POLL, topic);
latencyHistograms.put(topic, topicHistogram);
}
topicHistogram.update(duration.toMillis());
}

/**
* Export all metrics recorded in this instance to the underlying {@code perWorkerMetrics}
* containers. This function will only report metrics once per instance. Subsequent calls to
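Taken together, the interface now offers two recording paths; a hedged sketch with an illustrative topic and latency:

    KafkaMetrics metrics = KafkaSinkMetrics.kafkaMetrics();

    // Deferred path: queue the latency now; histograms are created/updated later,
    // when updateKafkaMetrics() runs (possibly on another thread).
    metrics.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(25));
    metrics.updateKafkaMetrics(); // exports once per instance; later calls are no-ops

    // Immediate path (new in this commit): create/update the histogram on the calling thread.
    metrics.recordRpcLatencyMetric("test-topic", Duration.ofMillis(25));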
KafkaSinkMetrics.java
@@ -34,7 +34,8 @@
// TODO: refactor out common parts with the BQ sink so they can be reused with other sinks, e.g., GCS.
// @SuppressWarnings("unused")
public class KafkaSinkMetrics {
private static boolean supportKafkaMetrics = false;
private static boolean supportKafkaMetrics =
    true; // set to true for UW when the experiment is passed

public static final String METRICS_NAMESPACE = "KafkaSink";

@@ -50,6 +51,14 @@ enum RpcMethod {
private static final String TOPIC_LABEL = "topic_name";
private static final String RPC_METHOD = "rpc_method";

private static MetricName createMetricName(RpcMethod method, String topic) {
LabeledMetricNameUtils.MetricNameBuilder nameBuilder =
LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(RPC_LATENCY);
nameBuilder.addLabel(RPC_METHOD, method.toString());
nameBuilder.addLabel(TOPIC_LABEL, topic);
return nameBuilder.build(METRICS_NAMESPACE);
}

/**
* Creates a Histogram metric to record RPC latency. The metric name includes the RPC method and topic as labels.
*
@@ -60,14 +69,8 @@
* @return Histogram with exponential buckets with a sqrt(2) growth factor.
*/
public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic) {
LabeledMetricNameUtils.MetricNameBuilder nameBuilder =
LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(RPC_LATENCY);
nameBuilder.addLabel(RPC_METHOD, method.toString());
nameBuilder.addLabel(TOPIC_LABEL, topic);

MetricName metricName = nameBuilder.build(METRICS_NAMESPACE);
MetricName metricName = createMetricName(method, topic);
HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 17);

return new DelegatingHistogram(metricName, buckets, false, true);
}

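For illustration, a hedged sketch of the refactored helper in use. RpcMethod is package-private, so this would run inside org.apache.beam.sdk.io.kafka; the topic string is made up:

    Histogram histogram =
        KafkaSinkMetrics.createRPCLatencyHistogram(KafkaSinkMetrics.RpcMethod.POLL, "test-topic");
    histogram.update(42.0); // one poll latency in ms; 17 exponential buckets, sqrt(2) growth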
ReadFromKafkaDoFn.java
@@ -229,6 +229,9 @@ private ReadFromKafkaDoFn(

private HashMap<String, Long> perPartitionBacklogMetrics = new HashMap<String, Long>();

// Initialize only when used, since it's not serializable.
private @Nullable KafkaMetrics kafkaResults = null;

@VisibleForTesting final long consumerPollingTimeout;
@VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
@VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider;
@@ -569,7 +572,12 @@ private ConsumerRecords<byte[], byte[]> poll(
java.time.Duration elapsed = java.time.Duration.ZERO;
java.time.Duration timeout = java.time.Duration.ofSeconds(this.consumerPollingTimeout);
while (true) {
kafkaResults = KafkaSinkMetrics.kafkaMetrics();
final ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(timeout.minus(elapsed));
elapsed = sw.elapsed();
Preconditions.checkStateNotNull(kafkaResults);
kafkaResults.recordRpcLatencyMetric(topicPartition.topic(), elapsed);

if (!rawRecords.isEmpty()) {
// return as we have found some entries
return rawRecords;
@@ -578,7 +586,6 @@
// there was no progress on the offset/position, which indicates end of stream
return rawRecords;
}
elapsed = sw.elapsed();
if (elapsed.toMillis() >= timeout.toMillis()) {
// timeout is over
LOG.warn(
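Condensed, the reworked poll loop looks like the sketch below (exit conditions elided): elapsed is now refreshed immediately after each consumer.poll and fed to the topic's POLL histogram, instead of only being updated after the emptiness checks.

    Stopwatch sw = Stopwatch.createStarted();
    java.time.Duration elapsed = java.time.Duration.ZERO;
    java.time.Duration timeout = java.time.Duration.ofSeconds(this.consumerPollingTimeout);
    while (true) {
      kafkaResults = KafkaSinkMetrics.kafkaMetrics();
      final ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(timeout.minus(elapsed));
      elapsed = sw.elapsed();
      kafkaResults.recordRpcLatencyMetric(topicPartition.topic(), elapsed);
      // ... return rawRecords on progress or end of stream; warn and return once
      // elapsed.toMillis() >= timeout.toMillis() ...
    }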
KafkaMetricsTest.java
@@ -24,8 +24,8 @@
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.runners.core.metrics.HistogramCell;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.HistogramData;
@@ -39,10 +39,14 @@
// TODO(Naireen): Refactor to remove duplicate code between the two sinks.
@RunWith(JUnit4.class)
public class KafkaMetricsTest {
public static class TestHistogram implements Histogram {
public static class TestHistogramCell extends HistogramCell {
public List<Double> values = Lists.newArrayList();
private MetricName metricName = MetricName.named("KafkaSink", "name");

public TestHistogramCell(KV<MetricName, HistogramData.BucketType> kv) {
super(kv);
}

@Override
public void update(double value) {
values.add(value);
@@ -55,25 +59,21 @@ public MetricName getName() {
}

public static class TestMetricsContainer extends MetricsContainerImpl {
public ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogram>
public ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogramCell>
perWorkerHistograms =
new ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogram>();
new ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogramCell>();

public TestMetricsContainer() {
super("TestStep");
}

@Override
public Histogram getPerWorkerHistogram(
public TestHistogramCell getPerWorkerHistogram(
MetricName metricName, HistogramData.BucketType bucketType) {
perWorkerHistograms.computeIfAbsent(KV.of(metricName, bucketType), kv -> new TestHistogram());
perWorkerHistograms.computeIfAbsent(
KV.of(metricName, bucketType), kv -> new TestHistogramCell(kv));
return perWorkerHistograms.get(KV.of(metricName, bucketType));
}

@Override
public void reset() {
perWorkerHistograms.clear();
}
}

@Test
Expand Down Expand Up @@ -126,4 +126,4 @@ public void testKafkaRPCLatencyMetricsAreNotRecorded() throws Exception {
results.updateKafkaMetrics();
assertThat(testContainer.perWorkerHistograms.size(), equalTo(0));
}
}
}
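And the positive-path counterpart to the disabled-metrics test above, as a hedged sketch; the topic and latency are illustrative, and the flag is flipped the same way DataflowRunner does for streaming jobs:

    KafkaSinkMetrics.setSupportKafkaMetrics(true);
    TestMetricsContainer testContainer = new TestMetricsContainer();
    MetricsEnvironment.setCurrentContainer(testContainer);

    KafkaMetrics results = KafkaSinkMetrics.kafkaMetrics();
    results.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10)); // queue one latency
    results.updateKafkaMetrics(); // flush into per-worker histograms
    assertThat(testContainer.perWorkerHistograms.size(), equalTo(1));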
