diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index 9bb950bb8e6c..952e29f75104 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -441,10 +441,11 @@ public ProcessContinuation processElement( ConsumerSpEL.evaluateAssign( consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition())); long startOffset = tracker.currentRestriction().getFrom(); - long expectedOffset = startOffset; consumer.seek(kafkaSourceDescriptor.getTopicPartition(), startOffset); ConsumerRecords rawRecords = ConsumerRecords.empty(); + long skippedRecords = 0L; + final Stopwatch sw = Stopwatch.createStarted(); while (true) { rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition()); @@ -461,6 +462,36 @@ public ProcessContinuation processElement( return ProcessContinuation.resume(); } for (ConsumerRecord rawRecord : rawRecords) { + // If the Kafka consumer returns a record with an offset that is already processed + // the record can be safely skipped. This is needed because there is a possibility + // that the seek() above fails to move the offset to the desired position. In which + // case poll() would return records that are already cnsumed. + if (rawRecord.offset() < startOffset) { + // If the start offset is not reached even after skipping the records for 10 seconds + // then the processing is stopped with a backoff to give the Kakfa server some time + // catch up. + if (sw.elapsed().getSeconds() > 10L) { + LOG.error( + "The expected offset ({}) was not reached even after" + + " skipping consumed records for 10 seconds. The offset we could" + + " reach was {}. The processing of this bundle will be attempted" + + " at a later time.", + expectedOffset, + rawRecord.offset()); + return ProcessContinuation.resume() + .withResumeDelay(org.joda.time.Duration.standardSeconds(10L)); + } + skippedRecords++; + continue; + } + if (skippedRecords > 0L) { + LOG.warn( + "{} records were skipped due to seek returning an" + + " earlier position than requested position of {}", + skippedRecords, + expectedOffset); + skippedRecords = 0L; + } if (!tracker.tryClaim(rawRecord.offset())) { return ProcessContinuation.stop(); } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java index 75b9cfc9a74c..a9e4a4eddb61 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java @@ -110,6 +110,12 @@ public class ReadFromKafkaDoFnTest { private final ReadFromKafkaDoFn exceptionDofnInstance = ReadFromKafkaDoFn.create(makeReadSourceDescriptor(exceptionConsumer), RECORDS); + private final SimpleMockKafkaConsumerWithBrokenSeek consumerWithBrokenSeek = + new SimpleMockKafkaConsumerWithBrokenSeek(OffsetResetStrategy.NONE, topicPartition); + + private final ReadFromKafkaDoFn dofnInstanceWithBrokenSeek = + ReadFromKafkaDoFn.create(makeReadSourceDescriptor(consumerWithBrokenSeek), RECORDS); + private ReadSourceDescriptors makeReadSourceDescriptor( Consumer kafkaMockConsumer) { return ReadSourceDescriptors.read() @@ -295,6 +301,17 @@ public synchronized long position(TopicPartition partition) { } } + private static class SimpleMockKafkaConsumerWithBrokenSeek extends SimpleMockKafkaConsumer { + + public SimpleMockKafkaConsumerWithBrokenSeek( + OffsetResetStrategy offsetResetStrategy, TopicPartition topicPartition) { + super(offsetResetStrategy, topicPartition); + } + + @Override + public synchronized void seek(TopicPartition partition, long offset) {} + } + private static class MockMultiOutputReceiver implements MultiOutputReceiver { MockOutputReceiver>> mockOutputReceiver = @@ -377,6 +394,7 @@ private List>> createExpec public void setUp() throws Exception { dofnInstance.setup(); exceptionDofnInstance.setup(); + dofnInstanceWithBrokenSeek.setup(); consumer.reset(); } @@ -475,6 +493,24 @@ public void testProcessElement() throws Exception { receiver.getGoodRecords()); } + @Test + public void testProcessElementWithEarlierOffset() throws Exception { + MockMultiOutputReceiver receiver = new MockMultiOutputReceiver(); + consumerWithBrokenSeek.setNumOfRecordsPerPoll(6L); + consumerWithBrokenSeek.setCurrentPos(0L); + long startOffset = 3L; + OffsetRangeTracker tracker = + new OffsetRangeTracker(new OffsetRange(startOffset, startOffset + 3)); + KafkaSourceDescriptor descriptor = + KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null); + ProcessContinuation result = + dofnInstanceWithBrokenSeek.processElement(descriptor, tracker, null, receiver); + assertEquals(ProcessContinuation.stop(), result); + assertEquals( + createExpectedRecords(descriptor, startOffset, 3, "key", "value"), + receiver.getGoodRecords()); + } + @Test public void testRawSizeMetric() throws Exception { final int numElements = 1000;