From 0ed139bf00fa94bd6fb96500d60c5447047f1ac3 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 20 Nov 2024 19:18:41 -0800 Subject: [PATCH] KAFKA-18038: fix flakey test StreamThreadTest.shouldLogAndRecordSkippedRecordsForInvalidTimestamps With KAFKA-17872, we changed some internals that effects the conditions of this test, introducing a race condition when the expected log messages are printed. This PR adds additional wait-conditions to the test to close the race condition. --- .../processor/internals/StreamThreadTest.java | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 42b90bf3ec61..7af603b3239c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.MockRebalanceListener; import org.apache.kafka.clients.producer.MockProducer; @@ -141,6 +142,7 @@ import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.startsWith; @@ -262,7 +264,8 @@ private Properties configProps(final boolean enableEoS, final boolean stateUpdat mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()), mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()), mkEntry(InternalConfig.STATE_UPDATER_ENABLED, Boolean.toString(stateUpdaterEnabled)), - mkEntry(InternalConfig.PROCESSING_THREADS_ENABLED, Boolean.toString(processingThreadsEnabled)) + mkEntry(InternalConfig.PROCESSING_THREADS_ENABLED, Boolean.toString(processingThreadsEnabled)), + mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1") )); } @@ -2979,7 +2982,11 @@ public void shouldNotCommitNonRunningNonRestoringTasks(final boolean stateUpdate @ParameterizedTest @MethodSource("data") - public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps( + final boolean stateUpdaterEnabled, + final boolean processingThreadsEnabled + ) throws Exception { + internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); final Properties properties = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); @@ -3013,12 +3020,20 @@ public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(final boolean s addRecord(mockConsumer, ++offset); runOnce(processingThreadsEnabled); + if (processingThreadsEnabled) { + waitForCommit(mockConsumer, offset + 1); + } + addRecord(mockConsumer, ++offset); addRecord(mockConsumer, ++offset); addRecord(mockConsumer, ++offset); addRecord(mockConsumer, ++offset); runOnce(processingThreadsEnabled); + if (processingThreadsEnabled) { + waitForCommit(mockConsumer, offset + 1); + } + addRecord(mockConsumer, ++offset, 1L); addRecord(mockConsumer, ++offset, 1L); runOnce(processingThreadsEnabled); @@ -3059,6 +3074,18 @@ public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(final boolean s } } + private void waitForCommit(final MockConsumer mockConsumer, final long expectedOffset) throws Exception { + waitForCondition(() -> { + mockTime.sleep(10L); + runOnce(true); + final Map committed = mockConsumer.committed(Collections.singleton(t1p1)); + return !committed.isEmpty() && committed.get(t1p1).offset() == expectedOffset; + }, + "Never committed offset " + expectedOffset + ); + + } + @ParameterizedTest @MethodSource("data") public void shouldTransmitTaskManagerMetrics(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {