Skip to content

Commit

Permalink
KAFKA-18038: fix flakey test StreamThreadTest.shouldLogAndRecordSkipp…
Browse files Browse the repository at this point in the history
…edRecordsForInvalidTimestamps

With KAFKA-17872, we changed some internals that effects the conditions
of this test, introducing a race condition when the expected log
messages are printed.

This PR adds additional wait-conditions to the test to close the race
condition.
  • Loading branch information
mjsax committed Nov 21, 2024
1 parent 57299cf commit 0ed139b
Showing 1 changed file with 29 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.producer.MockProducer;
Expand Down Expand Up @@ -141,6 +142,7 @@
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.startsWith;
Expand Down Expand Up @@ -262,7 +264,8 @@ private Properties configProps(final boolean enableEoS, final boolean stateUpdat
mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()),
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()),
mkEntry(InternalConfig.STATE_UPDATER_ENABLED, Boolean.toString(stateUpdaterEnabled)),
mkEntry(InternalConfig.PROCESSING_THREADS_ENABLED, Boolean.toString(processingThreadsEnabled))
mkEntry(InternalConfig.PROCESSING_THREADS_ENABLED, Boolean.toString(processingThreadsEnabled)),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1")
));
}

Expand Down Expand Up @@ -2979,7 +2982,11 @@ public void shouldNotCommitNonRunningNonRestoringTasks(final boolean stateUpdate

@ParameterizedTest
@MethodSource("data")
public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(
final boolean stateUpdaterEnabled,
final boolean processingThreadsEnabled
) throws Exception {

internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);

final Properties properties = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
Expand Down Expand Up @@ -3013,12 +3020,20 @@ public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(final boolean s
addRecord(mockConsumer, ++offset);
runOnce(processingThreadsEnabled);

if (processingThreadsEnabled) {
waitForCommit(mockConsumer, offset + 1);
}

addRecord(mockConsumer, ++offset);
addRecord(mockConsumer, ++offset);
addRecord(mockConsumer, ++offset);
addRecord(mockConsumer, ++offset);
runOnce(processingThreadsEnabled);

if (processingThreadsEnabled) {
waitForCommit(mockConsumer, offset + 1);
}

addRecord(mockConsumer, ++offset, 1L);
addRecord(mockConsumer, ++offset, 1L);
runOnce(processingThreadsEnabled);
Expand Down Expand Up @@ -3059,6 +3074,18 @@ public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(final boolean s
}
}

private void waitForCommit(final MockConsumer<byte[], byte[]> mockConsumer, final long expectedOffset) throws Exception {
waitForCondition(() -> {
mockTime.sleep(10L);
runOnce(true);
final Map<TopicPartition, OffsetAndMetadata> committed = mockConsumer.committed(Collections.singleton(t1p1));
return !committed.isEmpty() && committed.get(t1p1).offset() == expectedOffset;
},
"Never committed offset " + expectedOffset
);

}

@ParameterizedTest
@MethodSource("data")
public void shouldTransmitTaskManagerMetrics(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
Expand Down

0 comments on commit 0ed139b

Please sign in to comment.