diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 7bc4b47ab087..f65365f274e2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -1492,7 +1492,7 @@ private void throwIfInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) { throw new IllegalArgumentException("Consumer group metadata could not be null"); } else if (groupMetadata.generationId() > 0 && JoinGroupRequest.UNKNOWN_MEMBER_ID.equals(groupMetadata.memberId())) { - throw new IllegalArgumentException("Passed in group metadata " + groupMetadata + " has generationId > 0 but member.id "); + throw new IllegalArgumentException("Passed in group metadata " + groupMetadata + " has generationId > 0 but the member.id is unknown"); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 1fdd298088bb..3ea6a374e84a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -786,7 +786,7 @@ record = partitionGroup.nextRecord(recordInfo, wallClockTime); // after processing this record, if its partition queue's buffered size has been // decreased to the threshold, we can then resume the consumption on this partition - if (recordInfo.queue().size() == maxBufferedSize) { + if (recordInfo.queue().size() <= maxBufferedSize) { partitionsToResume.add(partition); }