diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index d86a5d0ce686..209dee14da1e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -23,7 +23,6 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -227,10 +226,6 @@ public boolean advance() throws IOException {
               METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + pState.topicPartition.toString());
       rawSizes.update(recordSize);
 
-      for (Map.Entry<String, Long> backlogSplit : perPartitionBacklogMetrics.entrySet()) {
-        backlogBytesOfSplit.set(backlogSplit.getValue());
-      }
-
       // Pass metrics to container.
       kafkaResults.updateKafkaMetrics();
       return true;
@@ -349,7 +344,6 @@ public long getSplitBacklogBytes() {
   private final Counter bytesReadBySplit;
   private final Gauge backlogBytesOfSplit;
   private final Gauge backlogElementsOfSplit;
-  private HashMap<String, Long> perPartitionBacklogMetrics = new HashMap<String, Long>();;
   private final Counter checkpointMarkCommitsEnqueued =
       Metrics.counter(METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC);
   // Checkpoint marks skipped in favor of newer mark (only the latest needs to be committed).
@@ -506,10 +500,6 @@ Instant updateAndGetWatermark() {
       lastWatermark = timestampPolicy.getWatermark(mkTimestampPolicyContext());
       return lastWatermark;
     }
-
-    String name() {
-      return this.topicPartition.toString();
-    }
   }
 
   KafkaUnboundedReader(
@@ -554,16 +544,14 @@ String name() {
         prevWatermark = Optional.of(new Instant(ckptMark.getWatermarkMillis()));
       }
 
-      PartitionState<K, V> state =
-          new PartitionState<K, V>(
+      states.add(
+          new PartitionState<>(
               tp,
               nextOffset,
               source
                   .getSpec()
                   .getTimestampPolicyFactory()
-                  .createTimestampPolicy(tp, prevWatermark));
-      states.add(state);
-      perPartitionBacklogMetrics.put(state.name(), 0L);
+                  .createTimestampPolicy(tp, prevWatermark)));
     }
 
     partitionStates = ImmutableList.copyOf(states);
@@ -680,6 +668,8 @@ private void nextBatch() throws IOException {
 
     partitionStates.forEach(p -> p.recordIter = records.records(p.topicPartition).iterator());
 
+    reportBacklog();
+
     // cycle through the partitions in order to interleave records from each.
     curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
   }
@@ -758,7 +748,6 @@ private long getSplitBacklogMessageCount() {
       if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
         return UnboundedReader.BACKLOG_UNKNOWN;
       }
-      perPartitionBacklogMetrics.put(p.name(), pBacklog);
       backlogCount += pBacklog;
     }
 
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 4d7aa6b32aef..26964d43a16f 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -19,11 +19,14 @@
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
+import java.math.BigDecimal;
+import java.math.MathContext;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
@@ -222,13 +225,12 @@ private ReadFromKafkaDoFn(
   // Valid between bundle start and bundle finish.
   private transient @Nullable Deserializer<K> keyDeserializerInstance = null;
   private transient @Nullable Deserializer<V> valueDeserializerInstance = null;
-  private transient @Nullable Map<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCache;
+  private transient @Nullable LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
+      offsetEstimatorCache;
 
-  private transient @Nullable LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize;
+  private transient @Nullable LoadingCache<KafkaSourceDescriptor, AverageRecordSize>
+      avgRecordSizeCache;
 
   private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L;
-
-  private HashMap<String, Long> perPartitionBacklogMetrics = new HashMap<String, Long>();;
-
   @VisibleForTesting final long consumerPollingTimeout;
   @VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
   @VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider;
@@ -290,7 +292,7 @@ public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSource
     Map<String, Object> updatedConsumerConfig =
         overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
     TopicPartition partition = kafkaSourceDescriptor.getTopicPartition();
-    LOG.info("Creating Kafka consumer for initial restriction for {}", partition);
+    LOG.info("Creating Kafka consumer for initial restriction for {}", kafkaSourceDescriptor);
     try (Consumer<byte[], byte[]> offsetConsumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
       ConsumerSpEL.evaluateAssign(offsetConsumer, ImmutableList.of(partition));
       long startOffset;
@@ -337,38 +339,31 @@ public WatermarkEstimator<Instant> newWatermarkEstimator(
   @GetSize
   public double getSize(
       @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange offsetRange)
-      throws Exception {
+      throws ExecutionException {
     // If present, estimates the record size to offset gap ratio. Compacted topics may hold less
     // records than the estimated offset range due to record deletion within a partition.
-    final LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize =
-        Preconditions.checkStateNotNull(this.avgRecordSize);
+    final LoadingCache<KafkaSourceDescriptor, AverageRecordSize> avgRecordSizeCache =
+        Preconditions.checkStateNotNull(this.avgRecordSizeCache);
+    final @Nullable AverageRecordSize avgRecordSize =
+        avgRecordSizeCache.getIfPresent(kafkaSourceDescriptor);
     // The tracker estimates the offset range by subtracting the last claimed position from the
     // currently observed end offset for the partition belonging to this split.
     double estimatedOffsetRange =
         restrictionTracker(kafkaSourceDescriptor, offsetRange).getProgress().getWorkRemaining();
 
     // Before processing elements, we don't have a good estimated size of records and offset gap.
     // Return the estimated offset range without scaling by a size to gap ratio.
-    if (!avgRecordSize.asMap().containsKey(kafkaSourceDescriptor.getTopicPartition())) {
+    if (avgRecordSize == null) {
       return estimatedOffsetRange;
     }
 
-    if (offsetEstimatorCache != null) {
-      for (Map.Entry<TopicPartition, KafkaLatestOffsetEstimator> tp :
-          offsetEstimatorCache.entrySet()) {
-        perPartitionBacklogMetrics.put(tp.getKey().toString(), tp.getValue().estimate());
-      }
-    }
-
     // When processing elements, a moving average estimates the size of records and offset gap.
     // Return the estimated offset range scaled by the estimated size to gap ratio.
-    return estimatedOffsetRange
-        * avgRecordSize
-            .get(kafkaSourceDescriptor.getTopicPartition())
-            .estimateRecordByteSizeToOffsetCountRatio();
+    return estimatedOffsetRange * avgRecordSize.estimateRecordByteSizeToOffsetCountRatio();
   }
 
   @NewTracker
   public OffsetRangeTracker restrictionTracker(
-      @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) {
+      @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction)
+      throws ExecutionException {
     if (restriction.getTo() < Long.MAX_VALUE) {
       return new OffsetRangeTracker(restriction);
     }
 
     // OffsetEstimators are cached for each topic-partition because they hold a stateful connection,
     // so we want to minimize the amount of connections that we start and track with Kafka. Another
     // point is that it has a memoized backlog, and this should make that more reusable estimations.
-    final Map<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCacheInstance =
+    final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> offsetEstimatorCache =
         Preconditions.checkStateNotNull(this.offsetEstimatorCache);
-
-    TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
-    KafkaLatestOffsetEstimator offsetEstimator = offsetEstimatorCacheInstance.get(topicPartition);
-    if (offsetEstimator == null) {
-      Map<String, Object> updatedConsumerConfig =
-          overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
-
-      LOG.info("Creating Kafka consumer for offset estimation for {}", topicPartition);
-
-      Consumer<byte[], byte[]> offsetConsumer =
-          consumerFactoryFn.apply(
-              KafkaIOUtils.getOffsetConsumerConfig(
-                  "tracker-" + topicPartition, offsetConsumerConfig, updatedConsumerConfig));
-      offsetEstimator = new KafkaLatestOffsetEstimator(offsetConsumer, topicPartition);
-      offsetEstimatorCacheInstance.put(topicPartition, offsetEstimator);
-    }
+    final KafkaLatestOffsetEstimator offsetEstimator =
+        offsetEstimatorCache.get(kafkaSourceDescriptor);
 
     return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetEstimator);
   }
 
@@ -405,22 +386,22 @@ public ProcessContinuation processElement(
       WatermarkEstimator<Instant> watermarkEstimator,
       MultiOutputReceiver receiver)
       throws Exception {
-    final LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize =
-        Preconditions.checkStateNotNull(this.avgRecordSize);
+    final LoadingCache<KafkaSourceDescriptor, AverageRecordSize> avgRecordSizeCache =
+        Preconditions.checkStateNotNull(this.avgRecordSizeCache);
+    final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> offsetEstimatorCache =
+        Preconditions.checkStateNotNull(this.offsetEstimatorCache);
     final Deserializer<K> keyDeserializerInstance =
         Preconditions.checkStateNotNull(this.keyDeserializerInstance);
     final Deserializer<V> valueDeserializerInstance =
         Preconditions.checkStateNotNull(this.valueDeserializerInstance);
+    final TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
+    final AverageRecordSize avgRecordSize = avgRecordSizeCache.get(kafkaSourceDescriptor);
+    // TODO: Metrics should be reported per split instead of partition, add bootstrap server hash?
     final Distribution rawSizes =
-        Metrics.distribution(
-            METRIC_NAMESPACE,
-            RAW_SIZE_METRIC_PREFIX + kafkaSourceDescriptor.getTopicPartition().toString());
-    for (Map.Entry<String, Long> backlogSplit : perPartitionBacklogMetrics.entrySet()) {
-      Gauge backlog =
-          Metrics.gauge(
-              METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + "backlogBytes_" + backlogSplit.getKey());
-      backlog.set(backlogSplit.getValue());
-    }
+        Metrics.distribution(METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + topicPartition.toString());
+    final Gauge backlogBytes =
+        Metrics.gauge(
+            METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + "backlogBytes_" + topicPartition.toString());
 
     // Stop processing current TopicPartition when it's time to stop.
     if (checkStopReadingFn != null
@@ -438,13 +419,10 @@ public ProcessContinuation processElement(
     if (timestampPolicyFactory != null) {
       timestampPolicy =
           timestampPolicyFactory.createTimestampPolicy(
-              kafkaSourceDescriptor.getTopicPartition(),
-              Optional.ofNullable(watermarkEstimator.currentWatermark()));
+              topicPartition, Optional.ofNullable(watermarkEstimator.currentWatermark()));
     }
 
-    LOG.info(
-        "Creating Kafka consumer for process continuation for {}",
-        kafkaSourceDescriptor.getTopicPartition());
+    LOG.info("Creating Kafka consumer for process continuation for {}", kafkaSourceDescriptor);
     try (Consumer<byte[], byte[]> consumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
       ConsumerSpEL.evaluateAssign(
           consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
@@ -518,8 +496,8 @@ public ProcessContinuation processElement(
           int recordSize =
               (rawRecord.key() == null ? 0 : rawRecord.key().length)
                   + (rawRecord.value() == null ? 0 : rawRecord.value().length);
-          avgRecordSize
-              .getUnchecked(kafkaSourceDescriptor.getTopicPartition())
+          avgRecordSizeCache
+              .getUnchecked(kafkaSourceDescriptor)
              .update(recordSize, rawRecord.offset() - expectedOffset);
           rawSizes.update(recordSize);
           expectedOffset = rawRecord.offset() + 1;
@@ -551,6 +529,15 @@ public ProcessContinuation processElement(
             }
           }
         }
+
+        backlogBytes.set(
+            (long)
+                (BigDecimal.valueOf(
+                        Preconditions.checkStateNotNull(
+                            offsetEstimatorCache.get(kafkaSourceDescriptor).estimate()))
+                    .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
+                    .doubleValue()
+                    * avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
       }
     }
   }
@@ -611,19 +598,44 @@ public Coder<OffsetRange> restrictionCoder() {
   @Setup
   public void setup() throws Exception {
     // Start to track record size and offset gap per bundle.
-    avgRecordSize =
+    avgRecordSizeCache =
         CacheBuilder.newBuilder()
             .maximumSize(1000L)
             .build(
-                new CacheLoader<TopicPartition, AverageRecordSize>() {
+                new CacheLoader<KafkaSourceDescriptor, AverageRecordSize>() {
                   @Override
-                  public AverageRecordSize load(TopicPartition topicPartition) throws Exception {
+                  public AverageRecordSize load(KafkaSourceDescriptor kafkaSourceDescriptor)
+                      throws Exception {
                     return new AverageRecordSize();
                   }
                 });
     keyDeserializerInstance = keyDeserializerProvider.getDeserializer(consumerConfig, true);
     valueDeserializerInstance = valueDeserializerProvider.getDeserializer(consumerConfig, false);
-    offsetEstimatorCache = new HashMap<>();
+    offsetEstimatorCache =
+        CacheBuilder.newBuilder()
+            .weakValues()
+            .expireAfterAccess(1, TimeUnit.MINUTES)
+            .build(
+                new CacheLoader<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>() {
+                  @Override
+                  public KafkaLatestOffsetEstimator load(
+                      KafkaSourceDescriptor kafkaSourceDescriptor) throws Exception {
+                    LOG.info(
+                        "Creating Kafka consumer for offset estimation for {}",
+                        kafkaSourceDescriptor);
+
+                    TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
+                    Map<String, Object> updatedConsumerConfig =
+                        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
+                    Consumer<byte[], byte[]> offsetConsumer =
+                        consumerFactoryFn.apply(
+                            KafkaIOUtils.getOffsetConsumerConfig(
+                                "tracker-" + topicPartition,
+                                offsetConsumerConfig,
+                                updatedConsumerConfig));
+                    return new KafkaLatestOffsetEstimator(offsetConsumer, topicPartition);
+                  }
+                });
     if (checkStopReadingFn != null) {
       checkStopReadingFn.setup();
     }
@@ -645,7 +657,7 @@ public void teardown() throws Exception {
     }
 
     if (offsetEstimatorCache != null) {
-      offsetEstimatorCache.clear();
+      offsetEstimatorCache.invalidateAll();
     }
     if (checkStopReadingFn != null) {
       checkStopReadingFn.teardown();
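
Note on the pattern above (an illustrative sketch, not part of the patch): the ReadFromKafkaDoFn side of this change replaces a hand-rolled HashMap of offset estimators with a Guava LoadingCache keyed by KafkaSourceDescriptor, so an estimator (and the Kafka consumer it owns) is created once on first access and dropped when idle, and the new backlogBytes gauge is computed as (latest end offset - next expected offset) scaled by the estimated bytes per offset. The self-contained Java sketch below reproduces both mechanisms against plain Guava rather than Beam's vendored copy; Estimator, the String cache key, and the hard-coded numbers are hypothetical stand-ins for KafkaLatestOffsetEstimator, KafkaSourceDescriptor, and real broker state.

import java.math.BigDecimal;
import java.math.MathContext;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public final class OffsetEstimatorCacheSketch {

  // Stand-in for KafkaLatestOffsetEstimator: in the patch this holds a live
  // Kafka consumer, which is why instances are cached and expired instead of
  // being recreated on every getSize()/restrictionTracker() call.
  static final class Estimator {
    long estimate() {
      return 42L; // pretend latest end offset of the partition
    }
  }

  public static void main(String[] args) throws Exception {
    LoadingCache<String, Estimator> cache =
        CacheBuilder.newBuilder()
            .weakValues() // let GC reclaim an estimator once nothing references it
            .expireAfterAccess(1, TimeUnit.MINUTES) // drop idle estimators, as in setup()
            .build(
                new CacheLoader<String, Estimator>() {
                  @Override
                  public Estimator load(String key) {
                    return new Estimator(); // runs once per key, on first get()
                  }
                });

    Estimator a = cache.get("topic-0");
    Estimator b = cache.get("topic-0");
    System.out.println(a == b); // true: second call is a cache hit, no new consumer

    // Backlog arithmetic mirroring the new backlogBytes.set(...) call:
    // (end offset - next expected offset) * estimated bytes per offset.
    long expectedOffset = 10L;
    double bytesPerOffset = 120.5; // the patch derives this from AverageRecordSize
    long backlogBytes =
        (long)
            (BigDecimal.valueOf(cache.get("topic-0").estimate())
                .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
                .doubleValue()
                * bytesPerOffset);
    System.out.println(backlogBytes); // (42 - 10) * 120.5 = 3856

    cache.invalidateAll(); // teardown() analogue of the old clear()
  }
}

One design consequence worth noting: keying both caches by KafkaSourceDescriptor instead of TopicPartition means two descriptors that name the same partition but different bootstrap servers no longer share (and clobber) a single estimator, a distinction the old per-partition HashMap could not make.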