From c03a5e09445d7d4279bb6450b0749362ca233d93 Mon Sep 17 00:00:00 2001 From: johnjcasey <95318300+johnjcasey@users.noreply.github.com> Date: Tue, 12 Nov 2024 15:27:25 -0500 Subject: [PATCH] Change dead partition detection to only look at the current topic (#33089) * Change dead partition detection to only look at the current topic, instead of looking at all topics * spotless * fix test, simplify existence check --- .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 21 +++++-------------- .../sdk/io/kafka/ReadFromKafkaDoFnTest.java | 11 ++++++++++ 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index add76c9682a0..4d7aa6b32aef 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -21,11 +21,9 @@ import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; @@ -463,7 +461,8 @@ public ProcessContinuation processElement( // and move to process the next element. 
if (rawRecords.isEmpty()) { if (!topicPartitionExists( - kafkaSourceDescriptor.getTopicPartition(), consumer.listTopics())) { + kafkaSourceDescriptor.getTopicPartition(), + consumer.partitionsFor(kafkaSourceDescriptor.getTopic()))) { return ProcessContinuation.stop(); } if (timestampPolicy != null) { @@ -557,20 +556,10 @@ public ProcessContinuation processElement( } private boolean topicPartitionExists( - TopicPartition topicPartition, Map<String, List<PartitionInfo>> topicListMap) { + TopicPartition topicPartition, List<PartitionInfo> partitionInfos) { // Check if the current TopicPartition still exists. - Set<TopicPartition> existingTopicPartitions = new HashSet<>(); - for (List<PartitionInfo> topicPartitionList : topicListMap.values()) { - topicPartitionList.forEach( - partitionInfo -> { - existingTopicPartitions.add( - new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); - }); - } - if (!existingTopicPartitions.contains(topicPartition)) { - return false; - } - return true; + return partitionInfos.stream() + .anyMatch(partitionInfo -> partitionInfo.partition() == (topicPartition.partition())); } // see https://github.com/apache/beam/issues/25962 diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java index 52c141685760..cbff0f896619 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java @@ -252,6 +252,17 @@ public synchronized Map<String, List<PartitionInfo>> listTopics() { topicPartition.topic(), topicPartition.partition(), null, null, null))); } + @Override + public synchronized List<PartitionInfo> partitionsFor(String partition) { + if (this.isRemoved) { + return ImmutableList.of(); + } else { + return ImmutableList.of( + new PartitionInfo( + topicPartition.topic(), topicPartition.partition(), null, null, null)); + } + } + @Override public synchronized void assign(Collection<TopicPartition> 
partitions) { assertTrue(Iterables.getOnlyElement(partitions).equals(this.topicPartition));