From eadde1439e6663e02ed4e2ca5bafa6448f1b98eb Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Thu, 24 Oct 2024 17:55:33 +0200 Subject: [PATCH] Share AvgRecordSize and KafkaLatestOffsetEstimator caches among DoFns --- .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 113 ++++++++++++------ 1 file changed, 75 insertions(+), 38 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index a90361e9641..902a2bde421 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -28,8 +28,10 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors; import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg; @@ -62,6 +64,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -207,6 +210,23 @@ private ReadFromKafkaDoFn( private static final Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class); + /** + * A holder class for all construction time unique instances of {@link ReadFromKafkaDoFn}. 
Caches + * must run clean up tasks when {@link #teardown()} is called. + */ + private static final class SharedStateHolder { + + private static final Map<Long, LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>> + OFFSET_ESTIMATOR_CACHE = new ConcurrentHashMap<>(); + private static final Map<Long, LoadingCache<KafkaSourceDescriptor, AverageRecordSize>> + AVG_RECORD_SIZE_CACHE = new ConcurrentHashMap<>(); + } + + private static final AtomicLong FN_ID = new AtomicLong(); + + // A unique identifier for the instance. Generally unique unless the ID generator overflows. + private final long fnId = FN_ID.getAndIncrement(); + private final @Nullable Map<String, Object> offsetConsumerConfig; private final @Nullable CheckStopReadingFn checkStopReadingFn; @@ -601,43 +621,56 @@ public Coder<OffsetRange> restrictionCoder() { public void setup() throws Exception { // Start to track record size and offset gap per bundle. avgRecordSizeCache = - CacheBuilder.newBuilder() - .maximumSize(1000L) - .build( - new CacheLoader<KafkaSourceDescriptor, AverageRecordSize>() { - @Override - public AverageRecordSize load(KafkaSourceDescriptor kafkaSourceDescriptor) - throws Exception { - return new AverageRecordSize(); - } - }); + SharedStateHolder.AVG_RECORD_SIZE_CACHE.computeIfAbsent( + fnId, + k -> { + return CacheBuilder.newBuilder() + .maximumSize(1000L) + .build( + new CacheLoader<KafkaSourceDescriptor, AverageRecordSize>() { + @Override + public AverageRecordSize load(KafkaSourceDescriptor kafkaSourceDescriptor) + throws Exception { + return new AverageRecordSize(); + } + }); + }); keyDeserializerInstance = keyDeserializerProvider.getDeserializer(consumerConfig, true); valueDeserializerInstance = valueDeserializerProvider.getDeserializer(consumerConfig, false); offsetEstimatorCache = - CacheBuilder.newBuilder() - .weakValues() - .expireAfterAccess(1, TimeUnit.MINUTES) - .build( - new CacheLoader<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>() { - @Override - public KafkaLatestOffsetEstimator load( - KafkaSourceDescriptor kafkaSourceDescriptor) throws Exception { - LOG.info( - "Creating Kafka consumer for offset estimation for {}", - kafkaSourceDescriptor); - - TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition(); - Map<String, Object> 
updatedConsumerConfig = - overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor); - Consumer<byte[], byte[]> offsetConsumer = - consumerFactoryFn.apply( - KafkaIOUtils.getOffsetConsumerConfig( - "tracker-" + topicPartition, - offsetConsumerConfig, - updatedConsumerConfig)); - return new KafkaLatestOffsetEstimator(offsetConsumer, topicPartition); - } - }); + SharedStateHolder.OFFSET_ESTIMATOR_CACHE.computeIfAbsent( + fnId, + k -> { + final Map<String, Object> consumerConfig = ImmutableMap.copyOf(this.consumerConfig); + final @Nullable Map<String, Object> offsetConsumerConfig = + this.offsetConsumerConfig == null + ? null + : ImmutableMap.copyOf(this.offsetConsumerConfig); + return CacheBuilder.newBuilder() + .weakValues() + .expireAfterAccess(1, TimeUnit.MINUTES) + .build( + new CacheLoader<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>() { + @Override + public KafkaLatestOffsetEstimator load( + KafkaSourceDescriptor kafkaSourceDescriptor) throws Exception { + LOG.info( + "Creating Kafka consumer for offset estimation for {}", + kafkaSourceDescriptor); + + TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition(); + Map<String, Object> updatedConsumerConfig = + overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor); + Consumer<byte[], byte[]> offsetConsumer = + consumerFactoryFn.apply( + KafkaIOUtils.getOffsetConsumerConfig( + "tracker-" + topicPartition, + offsetConsumerConfig, + updatedConsumerConfig)); + return new KafkaLatestOffsetEstimator(offsetConsumer, topicPartition); + } + }); + }); if (checkStopReadingFn != null) { checkStopReadingFn.setup(); } @@ -645,6 +678,10 @@ public KafkaLatestOffsetEstimator load( @Teardown public void teardown() throws Exception { + final LoadingCache<KafkaSourceDescriptor, AverageRecordSize> avgRecordSizeCache = + Preconditions.checkStateNotNull(this.avgRecordSizeCache); + final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> offsetEstimatorCache = + Preconditions.checkStateNotNull(this.offsetEstimatorCache); try { if (valueDeserializerInstance != null) { Closeables.close(valueDeserializerInstance, true); @@ -657,13 +694,13 @@ public void teardown() throws Exception { } catch 
(Exception anyException) { LOG.warn("Fail to close resource during finishing bundle.", anyException); } - - if (offsetEstimatorCache != null) { - offsetEstimatorCache.invalidateAll(); - } if (checkStopReadingFn != null) { checkStopReadingFn.teardown(); } + + // Allow the cache to perform clean up tasks when this instance is about to be deleted. + avgRecordSizeCache.cleanUp(); + offsetEstimatorCache.cleanUp(); } private Map<String, Object> overrideBootstrapServersConfig(