Skip to content

Commit

Permalink
Change dead partition detection to only look at the current topic (#3…
Browse files Browse the repository at this point in the history
…3089)

* Change dead partition detection to only look at the current topic, instead of looking at all topics

* spotless

* fix test, simplify existence check
  • Loading branch information
johnjcasey authored Nov 12, 2024
1 parent 2604943 commit c03a5e0
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -463,7 +461,8 @@ public ProcessContinuation processElement(
// and move to process the next element.
if (rawRecords.isEmpty()) {
if (!topicPartitionExists(
kafkaSourceDescriptor.getTopicPartition(), consumer.listTopics())) {
kafkaSourceDescriptor.getTopicPartition(),
consumer.partitionsFor(kafkaSourceDescriptor.getTopic()))) {
return ProcessContinuation.stop();
}
if (timestampPolicy != null) {
Expand Down Expand Up @@ -557,20 +556,10 @@ public ProcessContinuation processElement(
}

private boolean topicPartitionExists(
TopicPartition topicPartition, Map<String, List<PartitionInfo>> topicListMap) {
TopicPartition topicPartition, List<PartitionInfo> partitionInfos) {
// Check if the current TopicPartition still exists.
Set<TopicPartition> existingTopicPartitions = new HashSet<>();
for (List<PartitionInfo> topicPartitionList : topicListMap.values()) {
topicPartitionList.forEach(
partitionInfo -> {
existingTopicPartitions.add(
new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
});
}
if (!existingTopicPartitions.contains(topicPartition)) {
return false;
}
return true;
return partitionInfos.stream()
.anyMatch(partitionInfo -> partitionInfo.partition() == (topicPartition.partition()));
}

// see https://github.com/apache/beam/issues/25962
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,17 @@ public synchronized Map<String, List<PartitionInfo>> listTopics() {
topicPartition.topic(), topicPartition.partition(), null, null, null)));
}

@Override
public synchronized List<PartitionInfo> partitionsFor(String partition) {
if (this.isRemoved) {
return ImmutableList.of();
} else {
return ImmutableList.of(
new PartitionInfo(
topicPartition.topic(), topicPartition.partition(), null, null, null));
}
}

@Override
public synchronized void assign(Collection<TopicPartition> partitions) {
assertTrue(Iterables.getOnlyElement(partitions).equals(this.topicPartition));
Expand Down

0 comments on commit c03a5e0

Please sign in to comment.