From d36b24f45f700de318062eeeda90cfb657892151 Mon Sep 17 00:00:00 2001 From: Laxman Ch Date: Sat, 23 Nov 2024 08:02:00 +0530 Subject: [PATCH 1/2] KAFKA-17299: Fix Kafka Streams consumer hang issue (#17899) When Kafka Streams skips overs corrupted messages, it might not resume previously paused partitions, if more than one record is skipped at once, and if the buffer drop below the max-buffer limit at the same time. Reviewers: Matthias J. Sax --- .../apache/kafka/streams/processor/internals/StreamTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 1fdd298088bb..3ea6a374e84a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -786,7 +786,7 @@ record = partitionGroup.nextRecord(recordInfo, wallClockTime); // after processing this record, if its partition queue's buffered size has been // decreased to the threshold, we can then resume the consumption on this partition - if (recordInfo.queue().size() == maxBufferedSize) { + if (recordInfo.queue().size() <= maxBufferedSize) { partitionsToResume.add(partition); } From b04a49831784b8bb5e15bfc2c7f63a7e501097b4 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 23 Nov 2024 14:07:12 +0800 Subject: [PATCH 2/2] MINOR: Enhance error message in KafkaProducer#throwIfInvalidGroupMetadata (#17915) Reviewers: Chia-Ping Tsai --- .../java/org/apache/kafka/clients/producer/KafkaProducer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 7bc4b47ab087..f65365f274e2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -1492,7 +1492,7 @@ private void throwIfInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) { throw new IllegalArgumentException("Consumer group metadata could not be null"); } else if (groupMetadata.generationId() > 0 && JoinGroupRequest.UNKNOWN_MEMBER_ID.equals(groupMetadata.memberId())) { - throw new IllegalArgumentException("Passed in group metadata " + groupMetadata + " has generationId > 0 but member.id "); + throw new IllegalArgumentException("Passed in group metadata " + groupMetadata + " has generationId > 0 but the member.id is unknown"); } }