Skip to content

Commit

Permalink
Address Sam's comments
Browse files Browse the repository at this point in the history
[Dataflow Streaming] Use isolated windmill streams based on job settings (apache#32503)
  • Loading branch information
Naireen committed Sep 26, 2024
1 parent 129094b commit c03a6e3
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.auto.value.AutoValue;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -30,7 +29,7 @@
/** Stores and exports metrics for a batch of Kafka Client RPCs. */
public interface KafkaMetrics {

void updateSuccessfulRpcMetrics(String topic, Instant start, Instant end);
void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime);

void updateKafkaMetrics();

Expand All @@ -39,7 +38,7 @@ class NoOpKafkaMetrics implements KafkaMetrics {
private NoOpKafkaMetrics() {}

@Override
public void updateSuccessfulRpcMetrics(String topic, Instant start, Instant end) {}
public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {}

@Override
public void updateKafkaMetrics() {}
Expand Down Expand Up @@ -77,15 +76,15 @@ public static KafkaMetricsImpl create() {

/** Record the rpc status and latency of a successful Kafka poll RPC call. */
@Override
public void updateSuccessfulRpcMetrics(String topic, Instant start, Instant end) {
public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
if (isWritable().get()) {
ConcurrentLinkedQueue<Duration> latencies = perTopicRpcLatencies().get(topic);
if (latencies == null) {
latencies = new ConcurrentLinkedQueue<Duration>();
latencies.add(Duration.between(start, end));
latencies.add(elapsedTime);
perTopicRpcLatencies().put(topic, latencies);
} else {
latencies.add(Duration.between(start, end));
latencies.add(elapsedTime);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables;
Expand Down Expand Up @@ -407,6 +408,8 @@ public long getSplitBacklogBytes() {

// Created in each next batch, and updated at the end.
public KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics();
private Stopwatch stopwatch = Stopwatch.createUnstarted();
private String kafkaTopic = "";

@Override
public String toString() {
Expand Down Expand Up @@ -518,6 +521,13 @@ String name() {

List<TopicPartition> partitions =
Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());

// Each source has a single unique topic.
for (TopicPartition topicPartition : partitions) {
this.kafkaTopic = topicPartition.topic();
break;
}

List<PartitionState<K, V>> states = new ArrayList<>(partitions.size());

if (checkpointMark != null) {
Expand Down Expand Up @@ -577,21 +587,12 @@ private void consumerPollLoop() {
while (!closed.get()) {
try {
if (records.isEmpty()) {
// Each source has a single unique topic.
List<TopicPartition> topicPartitions = source.getSpec().getTopicPartitions();
Preconditions.checkStateNotNull(topicPartitions);
String topicName = "null"; // value will be overridden
for (TopicPartition topicPartition : topicPartitions) {
topicName = topicPartition.topic();
break;
}

java.time.Instant operationStartTime = java.time.Instant.now();
records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
java.time.Instant operationEndTime = java.time.Instant.now();

stopwatch.start();
records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
stopwatch.stop();
kafkaResults.updateSuccessfulRpcMetrics(
topicName, operationStartTime, operationEndTime);
kafkaTopic, java.time.Duration.ofMillis(stopwatch.elapsed(TimeUnit.MILLISECONDS)));

} else if (availableRecordsQueue.offer(
records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) {
Expand All @@ -616,6 +617,7 @@ private void consumerPollLoop() {

private void commitCheckpointMark() {
KafkaCheckpointMark checkpointMark = finalizedCheckpointMark.getAndSet(null);

if (checkpointMark != null) {
LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.hamcrest.Matchers.equalTo;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
Expand Down Expand Up @@ -67,7 +66,6 @@ public TestMetricsContainer() {
@Override
public Histogram getPerWorkerHistogram(
MetricName metricName, HistogramData.BucketType bucketType) {
System.out.println("xxx metricName " + metricName);
perWorkerHistograms.computeIfAbsent(KV.of(metricName, bucketType), kv -> new TestHistogram());
return perWorkerHistograms.get(KV.of(metricName, bucketType));
}
Expand All @@ -83,9 +81,8 @@ public void testNoOpKafkaMetrics() throws Exception {
TestMetricsContainer testContainer = new TestMetricsContainer();
MetricsEnvironment.setCurrentContainer(testContainer);

Instant t1 = Instant.now();
KafkaMetrics results = KafkaMetrics.NoOpKafkaMetrics.getInstance();
results.updateSuccessfulRpcMetrics("test-topic", t1, t1.plus(Duration.ofMillis(10)));
results.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10));

results.updateKafkaMetrics();

Expand All @@ -97,10 +94,9 @@ public void testKafkaRPCLatencyMetrics() throws Exception {
TestMetricsContainer testContainer = new TestMetricsContainer();
MetricsEnvironment.setCurrentContainer(testContainer);

Instant t1 = Instant.now();
KafkaMetrics results = KafkaSinkMetrics.kafkaMetrics();

results.updateSuccessfulRpcMetrics("test-topic", t1, t1.plus(Duration.ofMillis(10)));
results.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10));

results.updateKafkaMetrics();
// RpcLatency*rpc_method:POLL;topic_name:test-topic
Expand Down

0 comments on commit c03a6e3

Please sign in to comment.