Skip to content

Commit

Permalink
ReadFromKafkaDoFn: handle failed seek (apache#32456)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dippatel98 authored and reeba212 committed Dec 4, 2024
1 parent 3f43922 commit bff7a47
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,11 @@ public ProcessContinuation processElement(
ConsumerSpEL.evaluateAssign(
consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
long startOffset = tracker.currentRestriction().getFrom();

long expectedOffset = startOffset;
consumer.seek(kafkaSourceDescriptor.getTopicPartition(), startOffset);
ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();
long skippedRecords = 0L;
final Stopwatch sw = Stopwatch.createStarted();

while (true) {
rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition());
Expand All @@ -461,6 +462,36 @@ public ProcessContinuation processElement(
return ProcessContinuation.resume();
}
for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
// If the Kafka consumer returns a record with an offset that is already processed
// the record can be safely skipped. This is needed because there is a possibility
// that the seek() above fails to move the offset to the desired position. In which
// case poll() would return records that are already cnsumed.
if (rawRecord.offset() < startOffset) {
// If the start offset is not reached even after skipping the records for 10 seconds
// then the processing is stopped with a backoff to give the Kakfa server some time
// catch up.
if (sw.elapsed().getSeconds() > 10L) {
LOG.error(
"The expected offset ({}) was not reached even after"
+ " skipping consumed records for 10 seconds. The offset we could"
+ " reach was {}. The processing of this bundle will be attempted"
+ " at a later time.",
expectedOffset,
rawRecord.offset());
return ProcessContinuation.resume()
.withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
}
skippedRecords++;
continue;
}
if (skippedRecords > 0L) {
LOG.warn(
"{} records were skipped due to seek returning an"
+ " earlier position than requested position of {}",
skippedRecords,
expectedOffset);
skippedRecords = 0L;
}
if (!tracker.tryClaim(rawRecord.offset())) {
return ProcessContinuation.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ public class ReadFromKafkaDoFnTest {
private final ReadFromKafkaDoFn<String, String> exceptionDofnInstance =
ReadFromKafkaDoFn.create(makeReadSourceDescriptor(exceptionConsumer), RECORDS);

private final SimpleMockKafkaConsumerWithBrokenSeek consumerWithBrokenSeek =
new SimpleMockKafkaConsumerWithBrokenSeek(OffsetResetStrategy.NONE, topicPartition);

private final ReadFromKafkaDoFn<String, String> dofnInstanceWithBrokenSeek =
ReadFromKafkaDoFn.create(makeReadSourceDescriptor(consumerWithBrokenSeek), RECORDS);

private ReadSourceDescriptors<String, String> makeReadSourceDescriptor(
Consumer<byte[], byte[]> kafkaMockConsumer) {
return ReadSourceDescriptors.<String, String>read()
Expand Down Expand Up @@ -295,6 +301,17 @@ public synchronized long position(TopicPartition partition) {
}
}

private static class SimpleMockKafkaConsumerWithBrokenSeek extends SimpleMockKafkaConsumer {

public SimpleMockKafkaConsumerWithBrokenSeek(
OffsetResetStrategy offsetResetStrategy, TopicPartition topicPartition) {
super(offsetResetStrategy, topicPartition);
}

@Override
public synchronized void seek(TopicPartition partition, long offset) {}
}

private static class MockMultiOutputReceiver implements MultiOutputReceiver {

MockOutputReceiver<KV<KafkaSourceDescriptor, KafkaRecord<String, String>>> mockOutputReceiver =
Expand Down Expand Up @@ -377,6 +394,7 @@ private List<KV<KafkaSourceDescriptor, KafkaRecord<String, String>>> createExpec
public void setUp() throws Exception {
dofnInstance.setup();
exceptionDofnInstance.setup();
dofnInstanceWithBrokenSeek.setup();
consumer.reset();
}

Expand Down Expand Up @@ -475,6 +493,24 @@ public void testProcessElement() throws Exception {
receiver.getGoodRecords());
}

@Test
public void testProcessElementWithEarlierOffset() throws Exception {
MockMultiOutputReceiver receiver = new MockMultiOutputReceiver();
consumerWithBrokenSeek.setNumOfRecordsPerPoll(6L);
consumerWithBrokenSeek.setCurrentPos(0L);
long startOffset = 3L;
OffsetRangeTracker tracker =
new OffsetRangeTracker(new OffsetRange(startOffset, startOffset + 3));
KafkaSourceDescriptor descriptor =
KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null);
ProcessContinuation result =
dofnInstanceWithBrokenSeek.processElement(descriptor, tracker, null, receiver);
assertEquals(ProcessContinuation.stop(), result);
assertEquals(
createExpectedRecords(descriptor, startOffset, 3, "key", "value"),
receiver.getGoodRecords());
}

@Test
public void testRawSizeMetric() throws Exception {
final int numElements = 1000;
Expand Down

0 comments on commit bff7a47

Please sign in to comment.