diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 952e29f75104..585490c1d843 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -25,7 +25,13 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
 import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
@@ -52,8 +58,6 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
@@ -68,6 +72,7 @@
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -237,6 +242,10 @@ private ReadFromKafkaDoFn(
   @VisibleForTesting
   static final String RAW_SIZE_METRIC_PREFIX = KafkaUnboundedReader.RAW_SIZE_METRIC_PREFIX;
 
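+  // Shared per-process state: one offset estimator per source descriptor, plus UUIDs that make
+  // it possible to correlate log lines across estimator instances and worker processes.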
+  private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 1;
+  private static final Map<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> partitionState =
+      new ConcurrentHashMap<>();
+  private static final UUID machineUUID = UUID.randomUUID();
   /**
    * A {@link GrowableOffsetRangeTracker.RangeEndEstimator} which uses a Kafka {@link Consumer} to
    * fetch backlog.
@@ -244,46 +253,88 @@ private ReadFromKafkaDoFn(
   private static class KafkaLatestOffsetEstimator
       implements GrowableOffsetRangeTracker.RangeEndEstimator {
 
-    private final Consumer<byte[], byte[]> offsetConsumer;
-    private final TopicPartition topicPartition;
-    private final Supplier<Long> memoizedBacklog;
-    private boolean closed;
-
-    KafkaLatestOffsetEstimator(
-        Consumer<byte[], byte[]> offsetConsumer, TopicPartition topicPartition) {
-      this.offsetConsumer = offsetConsumer;
-      this.topicPartition = topicPartition;
-      ConsumerSpEL.evaluateAssign(this.offsetConsumer, ImmutableList.of(this.topicPartition));
-      memoizedBacklog =
-          Suppliers.memoizeWithExpiration(
-              () -> {
-                synchronized (offsetConsumer) {
-                  ConsumerSpEL.evaluateSeek2End(offsetConsumer, topicPartition);
-                  return offsetConsumer.position(topicPartition);
-                }
-              },
-              1,
-              TimeUnit.SECONDS);
-    }
-
-    @Override
-    protected void finalize() {
-      try {
-        Closeables.close(offsetConsumer, true);
-        closed = true;
-        LOG.info("Offset Estimator consumer was closed for {}", topicPartition);
-      } catch (Exception anyException) {
-        LOG.warn("Failed to close offset consumer for {}", topicPartition);
+    private final AtomicLong endOffset = new AtomicLong(Long.MAX_VALUE);
+    private final KafkaSourceDescriptor kafkaSourceDescriptor;
+    private final Consumer<byte[], byte[]> localOffsetConsumer;
+    private @Nullable ScheduledFuture<?> scheduledFuture;
+    private final ScheduledExecutorService offsetFetcherThread =
+        Executors.newSingleThreadScheduledExecutor();
+    private final UUID instanceUUID = UUID.randomUUID();
+    // Written by the runner thread in estimate(), read by the fetcher thread's idle watchdog.
+    private volatile Instant lastQueried = Instant.now();
+
+    public KafkaLatestOffsetEstimator(
+        KafkaSourceDescriptor kafkaSourceDescriptor, Consumer<byte[], byte[]> localOffsetConsumer) {
+      this.kafkaSourceDescriptor = kafkaSourceDescriptor;
+      this.localOffsetConsumer = localOffsetConsumer;
+
+      LOG.info(
+          "Fetching initial end offset for {}, instanceUUID: {}, machineUUID: {}",
+          kafkaSourceDescriptor.getTopicPartition(),
+          instanceUUID,
+          machineUUID);
+
+      Long currentEndOffset =
+          localOffsetConsumer
+              .endOffsets(ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()))
+              .get(kafkaSourceDescriptor.getTopicPartition());
+
+      LOG.info(
+          "Initial end offset for {} is {}, instanceUUID: {}, machineUUID: {}",
+          kafkaSourceDescriptor.getTopicPartition(),
+          currentEndOffset,
+          instanceUUID,
+          machineUUID);
+      if (currentEndOffset != null) {
+        endOffset.set(currentEndOffset);
       }
     }
 
     @Override
     public long estimate() {
-      return memoizedBacklog.get();
+      // Refresh the idle watchdog and return the most recently fetched end offset.
+      lastQueried = Instant.now();
+      return endOffset.get();
+    }
+
+    public synchronized void startThread() {
+      // Guard against double-scheduling when startThread() is called again for a cached
+      // estimator.
+      if (scheduledFuture == null || scheduledFuture.isCancelled()) {
+        scheduledFuture =
+            offsetFetcherThread.scheduleAtFixedRate(
+                this::updateLatestOffsets, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
+      }
+    }
+
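+    // Runs on the fetcher thread: refreshes the cached end offset once per interval, and
+    // cancels itself (deregistering the estimator) once estimate() has not been called for
+    // more than 10 seconds, so idle partitions do not keep polling Kafka forever.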
+    private void updateLatestOffsets() {
+      // Joda Duration takes (start, end): measure how long ago estimate() was last called.
+      if (new Duration(lastQueried, Instant.now()).toStandardSeconds().getSeconds() > 10) {
+        LOG.info(
+            "Offset fetcher idle, stopping it for {}, instanceUUID: {}, machineUUID: {}",
+            kafkaSourceDescriptor.getTopicPartition(),
+            instanceUUID,
+            machineUUID);
+        killThread();
+        partitionState.remove(kafkaSourceDescriptor, this);
+        return;
+      }
+      LOG.debug(
+          "Fetching end offset for {}, instanceUUID: {}, machineUUID: {}",
+          kafkaSourceDescriptor.getTopicPartition(),
+          instanceUUID,
+          machineUUID);
+
+      Long currentEndOffset =
+          localOffsetConsumer
+              .endOffsets(ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()))
+              .get(kafkaSourceDescriptor.getTopicPartition());
+
+      if (currentEndOffset != null) {
+        endOffset.set(currentEndOffset);
+      }
     }
 
-    public boolean isClosed() {
-      return closed;
+    public synchronized void killThread() {
+      if (scheduledFuture != null) {
+        scheduledFuture.cancel(true);
+      }
+      // Let the single-threaded executor terminate so idle estimators do not pin a thread.
+      offsetFetcherThread.shutdown();
     }
   }
 
@@ -293,34 +344,52 @@ public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSource
         overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
     TopicPartition partition = kafkaSourceDescriptor.getTopicPartition();
     LOG.info("Creating Kafka consumer for initial restriction for {}", partition);
-    try (Consumer<byte[], byte[]> offsetConsumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
-      ConsumerSpEL.evaluateAssign(offsetConsumer, ImmutableList.of(partition));
-      long startOffset;
-      @Nullable Instant startReadTime = kafkaSourceDescriptor.getStartReadTime();
-      if (kafkaSourceDescriptor.getStartReadOffset() != null) {
-        startOffset = kafkaSourceDescriptor.getStartReadOffset();
-      } else if (startReadTime != null) {
-        startOffset = ConsumerSpEL.offsetForTime(offsetConsumer, partition, startReadTime);
-      } else {
-        startOffset = offsetConsumer.position(partition);
-      }
-      long endOffset = Long.MAX_VALUE;
-      @Nullable Instant stopReadTime = kafkaSourceDescriptor.getStopReadTime();
-      if (kafkaSourceDescriptor.getStopReadOffset() != null) {
-        endOffset = kafkaSourceDescriptor.getStopReadOffset();
-      } else if (stopReadTime != null) {
-        endOffset = ConsumerSpEL.offsetForTime(offsetConsumer, partition, stopReadTime);
-      }
-      new OffsetRange(startOffset, endOffset);
-      Lineage.getSources()
-          .add(
-              "kafka",
-              ImmutableList.of(
-                  (String) updatedConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
-                  MoreObjects.firstNonNull(kafkaSourceDescriptor.getTopic(), partition.topic())));
-      return new OffsetRange(startOffset, endOffset);
+    Map<String, Object> trackerConsumerConfig =
+        KafkaIOUtils.getOffsetConsumerConfig(
+            "tracker-" + kafkaSourceDescriptor.getTopicPartition(),
+            offsetConsumerConfig,
+            updatedConsumerConfig);
+
+    Consumer<byte[], byte[]> offsetConsumer = consumerFactoryFn.apply(trackerConsumerConfig);
+
+    ConsumerSpEL.evaluateAssign(offsetConsumer, ImmutableList.of(partition));
+
+    long startOffset;
+    @Nullable Instant startReadTime = kafkaSourceDescriptor.getStartReadTime();
+    if (kafkaSourceDescriptor.getStartReadOffset() != null) {
+      startOffset = kafkaSourceDescriptor.getStartReadOffset();
+    } else if (startReadTime != null) {
+      startOffset = ConsumerSpEL.offsetForTime(offsetConsumer, partition, startReadTime);
+    } else {
+      startOffset = offsetConsumer.position(partition);
+    }
+
+    long endOffset = Long.MAX_VALUE;
+    @Nullable Instant stopReadTime = kafkaSourceDescriptor.getStopReadTime();
+    if (kafkaSourceDescriptor.getStopReadOffset() != null) {
+      endOffset = kafkaSourceDescriptor.getStopReadOffset();
+    } else if (stopReadTime != null) {
+      endOffset = ConsumerSpEL.offsetForTime(offsetConsumer, partition, stopReadTime);
     }
+
+    // The estimator fetches the current end offset once in its constructor, then keeps it
+    // refreshed from the background thread started below.
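+    // partitionState is keyed by the whole KafkaSourceDescriptor rather than just the
+    // TopicPartition, so descriptors that differ only in start/stop position get their own
+    // estimator and fetcher thread.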
+    KafkaLatestOffsetEstimator kafkaLatestOffsetEstimator =
+        partitionState.computeIfAbsent(
+            kafkaSourceDescriptor, k -> new KafkaLatestOffsetEstimator(k, offsetConsumer));
+    if (kafkaLatestOffsetEstimator.localOffsetConsumer != offsetConsumer) {
+      // An estimator for this descriptor already existed; release the consumer created above.
+      offsetConsumer.close();
+    }
+    kafkaLatestOffsetEstimator.startThread();
+
+    Lineage.getSources()
+        .add(
+            "kafka",
+            ImmutableList.of(
+                (String) updatedConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+                MoreObjects.firstNonNull(kafkaSourceDescriptor.getTopic(), partition.topic())));
+    return new OffsetRange(startOffset, endOffset);
   }
 
   @GetInitialWatermarkEstimatorState
@@ -365,29 +434,70 @@ public OffsetRangeTracker restrictionTracker(
       return new OffsetRangeTracker(restriction);
     }
 
-    // OffsetEstimators are cached for each topic-partition because they hold a stateful connection,
-    // so we want to minimize the amount of connections that we start and track with Kafka. Another
-    // point is that it has a memoized backlog, and this should make that more reusable estimations.
-    final Map<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCacheInstance =
-        Preconditions.checkStateNotNull(this.offsetEstimatorCache);
-
-    TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
-    KafkaLatestOffsetEstimator offsetEstimator = offsetEstimatorCacheInstance.get(topicPartition);
-    if (offsetEstimator == null || offsetEstimator.isClosed()) {
+    KafkaLatestOffsetEstimator rangeEndEstimator = partitionState.get(kafkaSourceDescriptor);
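+    // Normally initialRestriction has already registered an estimator on this worker; rebuild
+    // only on a cache miss, e.g. after the idle watchdog removed it or the restriction moved
+    // to a different worker.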
+    if (rangeEndEstimator == null) {
+      LOG.info(
+          "KafkaLatestOffsetEstimator not in cache for {}",
+          kafkaSourceDescriptor.getTopicPartition());
+      TopicPartition partition = kafkaSourceDescriptor.getTopicPartition();
+      LOG.info("Creating Kafka consumer for offset estimation for {}", partition);
       Map<String, Object> updatedConsumerConfig =
           overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
+      Map<String, Object> trackerConsumerConfig =
+          KafkaIOUtils.getOffsetConsumerConfig(
+              "tracker-" + kafkaSourceDescriptor.getTopicPartition(),
+              offsetConsumerConfig,
+              updatedConsumerConfig);
 
-      LOG.info("Creating Kafka consumer for offset estimation for {}", topicPartition);
+      Consumer<byte[], byte[]> offsetConsumer = consumerFactoryFn.apply(trackerConsumerConfig);
+
+      ConsumerSpEL.evaluateAssign(offsetConsumer, ImmutableList.of(partition));
 
-      Consumer<byte[], byte[]> offsetConsumer =
-          consumerFactoryFn.apply(
-              KafkaIOUtils.getOffsetConsumerConfig(
-                  "tracker-" + topicPartition, offsetConsumerConfig, updatedConsumerConfig));
-      offsetEstimator = new KafkaLatestOffsetEstimator(offsetConsumer, topicPartition);
-      offsetEstimatorCacheInstance.put(topicPartition, offsetEstimator);
-    }
+      rangeEndEstimator =
+          partitionState.computeIfAbsent(
+              kafkaSourceDescriptor, k -> new KafkaLatestOffsetEstimator(k, offsetConsumer));
+      if (rangeEndEstimator.localOffsetConsumer != offsetConsumer) {
+        // Lost the registration race; release the consumer created above.
+        offsetConsumer.close();
+      }
+      rangeEndEstimator.startThread();
+    }
 
-    return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetEstimator);
+    return new GrowableOffsetRangeTracker(restriction.getFrom(), rangeEndEstimator);
   }
 
   @ProcessElement
@@ -493,6 +603,19 @@ public ProcessContinuation processElement(
           skippedRecords = 0L;
         }
         if (!tracker.tryClaim(rawRecord.offset())) {
+          KafkaLatestOffsetEstimator kafkaLatestOffsetEstimator =
+              partitionState.get(kafkaSourceDescriptor);
+          LOG.info(
+              "tryClaim failed for {} at offset {}; restriction end was {}. Stopping the"
+                  + " background offset fetcher (estimator found: {}).",
+              kafkaSourceDescriptor.getTopicPartition(),
+              rawRecord.offset(),
+              tracker.currentRestriction().getTo(),
+              kafkaLatestOffsetEstimator != null);
+          if (kafkaLatestOffsetEstimator != null) {
+            LOG.info(
+                "Removing offset estimator for {}", kafkaSourceDescriptor.getTopicPartition());
+            kafkaLatestOffsetEstimator.killThread();
+            partitionState.remove(kafkaSourceDescriptor);
+          }
           return ProcessContinuation.stop();
         }
         try {