diff --git a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java index 9bb8b9390..4f0dc5a97 100644 --- a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java +++ b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java @@ -316,9 +316,9 @@ public boolean recover() { private void updateRotationTimers(SinkRecord currentRecord) { long now = time.milliseconds(); // Wallclock-based partitioners should be independent of the record argument. - lastRotate = isWallclockBased + lastRotate = isWallclockBased || currentRecord == null ? (Long) now - : currentRecord != null ? timestampExtractor.extract(currentRecord) : null; + : timestampExtractor.extract(currentRecord); if (log.isDebugEnabled() && rotateIntervalMs > 0) { log.debug( "Update last rotation timer. Next rotation for {} will be in {}ms", @@ -600,10 +600,10 @@ private void setState(State state) { } private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long now) { - Long currentTimestamp = null; - if (isWallclockBased) { + Long currentTimestamp; + if (isWallclockBased || currentRecord == null) { currentTimestamp = now; - } else if (currentRecord != null) { + } else { currentTimestamp = timestampExtractor.extract(currentRecord); lastRotate = lastRotate == null ? currentTimestamp : lastRotate; }