diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index 902a2bde421..3b5764fe351 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -19,9 +19,9 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import java.io.IOException; import java.math.BigDecimal; import java.math.MathContext; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -30,8 +30,13 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors; import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg; @@ -41,6 +46,8 @@ import org.apache.beam.sdk.metrics.Gauge; import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.ExecutorOptions; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; @@ -56,16 +63,18 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Futures; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListenableFuture; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -73,6 +82,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.Deserializer; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -189,7 +199,6 @@ private ReadFromKafkaDoFn( ReadSourceDescriptors transform, TupleTag>> recordTag) { this.consumerConfig = transform.getConsumerConfig(); - this.offsetConsumerConfig = transform.getOffsetConsumerConfig(); this.keyDeserializerProvider = Preconditions.checkArgumentNotNull(transform.getKeyDeserializerProvider()); this.valueDeserializerProvider = @@ -220,15 +229,208 @@ private static final class SharedStateHolder { OFFSET_ESTIMATOR_CACHE = new ConcurrentHashMap<>(); private static final Map> AVG_RECORD_SIZE_CACHE = new ConcurrentHashMap<>(); + private static final Map< + Long, LoadingCache>, ConsumerExecutionContext>> + CONSUMER_EXECUTION_CONTEXT_CACHE = new ConcurrentHashMap<>(); + } + + static final class TopicPartitionState implements AutoCloseable { + private final AtomicBoolean closed; + private final TopicPartition topicPartition; + private final AtomicLong position; + private final AtomicLong endOffset; + private final AtomicReference deferredThrowable; + private final ListenableFuture restriction; + private final SynchronousQueue>> records; + private Optional>> pendingRecords; + + TopicPartitionState( + final KafkaSourceDescriptor kafkaSourceDescriptor, + final AtomicReference deferredThrowable, + final Optional optionalOffsetRange) { + this.closed = new AtomicBoolean(); + this.topicPartition = kafkaSourceDescriptor.getTopicPartition(); + this.position = new AtomicLong(-1); + this.endOffset = new AtomicLong(-1); + this.deferredThrowable = deferredThrowable; + this.restriction = + optionalOffsetRange.map(Futures::immediateFuture).orElseGet(SettableFuture::create); + this.pendingRecords = Optional.empty(); + this.records = new SynchronousQueue<>(); + } + + boolean isClosed() { + return this.closed.get(); + } + + TopicPartition getTopicPartition() { + return this.topicPartition; + } + + boolean hasExistingTopicPartition() { + return this.existingTopicPartition.get(); + } + + long getPosition() throws Throwable { + Throwable deferredThrowable = this.deferredThrowable.get(); + if (deferredThrowable != null) { + throw deferredThrowable; + } + return this.position.get(); + } + + long setPosition(long position) { + this.position.set(position); + } + + long getEndOffset() throws Throwable { + Throwable deferredThrowable = this.deferredThrowable.get(); + if (deferredThrowable != null) { + throw deferredThrowable; + } + return this.endOffset.get(); + } + + void setEndOffset(long endOffset) { + this.endOffset.set(endOffset); + } + + OffsetRange getInitialRestriction() throws Throwable { + return this.restriction.get(); + } + + List> getRecords() throws Throwable { + Throwable deferredThrowable = this.deferredThrowable.get(); + if (deferredThrowable != null) { + throw deferredThrowable; + } + return this.records.poll(1, TimeUnit.SECONDS); + } + + boolean offerPendingRecords() { + this.pendingRecords = + this.pendingRecords.filter(rs -> !rs.isEmpty() && this.records.offer(rs)); + return !this.pendingRecords.isPresent(); + } + + boolean offerRecords(List> records) { + assert !this.pendingRecords.isPresent(); + + if (this.records.offer(records)) { + return true; + } else { + this.pendingRecords = Optional.of(records); + return false; + } + } + + @Override + public void close() { + this.closed.set(true); + } + } + + static final class ConsumerExecutionContext implements Runnable { + AtomicBoolean running = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + Consumer consumer; + AtomicReference deferredThrowable; + PriorityBlockingQueue queue; + + ConsumerExecutionContext(Consumer consumer) { + this.consumer = consumer; + this.deferredThrowable = new AtomicReference<>(); + this.queue = new PriorityBlockingQueue<>(); + this.unassignments = new PriorityBlockingQueue<>(); + } + + @Override + public void run() { + if (!running.compareAndSet(false, true)) { + return; + } + try { + Set splits = new HashSet<>(); + while (true) { + // Poll with timeout while empty + + Set newSplits = new HashSet<>(); + + if (splits.isEmpty()) { + @Nullable TopicPartitionState split = queue.poll(1, TimeUnit.MINUTES); + if (split == null) { + // Received no response within timeout + continue; + } + newSplits.add(split); + } else { + queue.drainTo(newSplits); + } + + consumer.assign( + splits.stream() + .map(TopicPartitionState::getTopicPartition) + .collect(Collectors.toSet())); + + if (consumer.assignment().isEmpty()) { + continue; + } + + // consumer.position(); + + try { + ConsumerRecords batch = consumer.poll(1000); + Map endOffsets = consumer.endOffsets(consumer.assignment()); + splits.stream() + .parallel() + .filter(state -> endOffsets.containsKey(state.getTopicPartition())) + .forEach( + state -> state.setEndOffset((long) endOffsets.get(state.getTopicPartition()))); + consumer.pause( + splits.stream() + .parallel() + .filter(state -> !state.offerRecords(batch.records(state.getTopicPartition()))) + .map(TopicPartitionState::getTopicPartition) + .collect(Collectors.toList())); + } catch (WakeupException e) { + // TODO: continue + } + } + } catch (Throwable t) { + this.deferredThrowable.set(t); + } finally { + done.set(true); + consumer.close(); + } + } + + public boolean isRunning() { + return running.get(); + } + + public boolean isDone() { + return done.get(); + } + + public TopicPartitionState assign( + KafkaSourceDescriptor kafkaSourceDescriptor, Optional offsetRange) { + TopicPartitionState state = + new TopicPartitionState( + kafkaSourceDescriptor, this.deferredThrowable, Optional.of(offsetRange)); + assert this.queue.offer(state); + return state; + } + + public void wakeup() {} } private static final AtomicLong FN_ID = new AtomicLong(); + private static final Joiner COMMA_JOINER = Joiner.on(','); + // A unique identifier for the instance. Generally unique unless the ID generator overflows. private final long fnId = FN_ID.getAndIncrement(); - private final @Nullable Map offsetConsumerConfig; - private final @Nullable CheckStopReadingFn checkStopReadingFn; private final SerializableFunction, Consumer> @@ -245,11 +447,21 @@ private static final class SharedStateHolder { // Valid between bundle start and bundle finish. private transient @Nullable Deserializer keyDeserializerInstance = null; private transient @Nullable Deserializer valueDeserializerInstance = null; - private transient @Nullable LoadingCache - offsetEstimatorCache; + // Only used to retain a strong reference to the consumer execution context until this function + // instance is torn down. + // This ties the lifetime of the consumer execution context to that of the bundle processor (or + // equivalent for non-portable runners). + // The consumer execution context cache stores weak references to consumer execution contexts, + // thus allowing the garbage collector to finalize the consumer execution context when no strong + // references to it are held. + @SuppressWarnings("unused") + private transient @Nullable ConsumerExecutionContext consumerExecutionContextInstance = null; + + private transient LoadingCache avgRecordSizeCache; + + private transient LoadingCache>, ConsumerExecutionContext> + consumerExecutionContextCache; - private transient @Nullable LoadingCache - avgRecordSizeCache; private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L; @VisibleForTesting final long consumerPollingTimeout; @VisibleForTesting final DeserializerProvider keyDeserializerProvider; @@ -267,43 +479,20 @@ private static final class SharedStateHolder { private static class KafkaLatestOffsetEstimator implements GrowableOffsetRangeTracker.RangeEndEstimator { - private final Consumer offsetConsumer; - private final TopicPartition topicPartition; - private final Supplier memoizedBacklog; - - KafkaLatestOffsetEstimator( - Consumer offsetConsumer, TopicPartition topicPartition) { - this.offsetConsumer = offsetConsumer; - this.topicPartition = topicPartition; - memoizedBacklog = - Suppliers.memoizeWithExpiration( - () -> { - synchronized (offsetConsumer) { - return Preconditions.checkStateNotNull( - offsetConsumer - .endOffsets(Collections.singleton(topicPartition)) - .get(topicPartition), - "No end offset found for partition %s.", - topicPartition); - } - }, - 1, - TimeUnit.SECONDS); - } + private final TopicPartitionState topicPartitionState; - @Override - protected void finalize() { - try { - Closeables.close(offsetConsumer, true); - LOG.info("Offset Estimator consumer was closed for {}", topicPartition); - } catch (Exception anyException) { - LOG.warn("Failed to close offset consumer for {}", topicPartition); - } + KafkaLatestOffsetEstimator(TopicPartitionState topicPartitionState) { + this.topicPartitionState = topicPartitionState; } @Override public long estimate() { - return memoizedBacklog.get(); + long endOffset = topicPartitionState.endOffset.get(); + checkState( + endOffset != -1, + "No end offset found for partition %s.", + topicPartitionState.getTopicPartition()); + return endOffset; } } @@ -313,7 +502,8 @@ public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSource overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor); TopicPartition partition = kafkaSourceDescriptor.getTopicPartition(); LOG.info("Creating Kafka consumer for initial restriction for {}", kafkaSourceDescriptor); - try (Consumer offsetConsumer = consumerFactoryFn.apply(updatedConsumerConfig)) { + try (TopicPartitionState topicPartitionState = + consumerExecutionContext.assign(kafkaSourceDescriptor, Optional.empty())) { ConsumerSpEL.evaluateAssign(offsetConsumer, ImmutableList.of(partition)); long startOffset; @Nullable Instant startReadTime = kafkaSourceDescriptor.getStartReadTime(); @@ -332,14 +522,14 @@ public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSource } else if (stopReadTime != null) { endOffset = ConsumerSpEL.offsetForTime(offsetConsumer, partition, stopReadTime); } - new OffsetRange(startOffset, endOffset); + OffsetRange offsetRange = new OffsetRange(startOffset, endOffset); Lineage.getSources() .add( "kafka", ImmutableList.of( (String) updatedConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), MoreObjects.firstNonNull(kafkaSourceDescriptor.getTopic(), partition.topic()))); - return new OffsetRange(startOffset, endOffset); + return offsetRange; } } @@ -375,21 +565,32 @@ public double getSize( @NewTracker public OffsetRangeTracker restrictionTracker( - @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) + @Element KafkaSourceDescriptor kafkaSourceDescriptor, + @Restriction OffsetRange restriction, + PipelineOptions options) throws ExecutionException { if (restriction.getTo() < Long.MAX_VALUE) { return new OffsetRangeTracker(restriction); } - - // OffsetEstimators are cached for each topic-partition because they hold a stateful connection, - // so we want to minimize the amount of connections that we start and track with Kafka. Another - // point is that it has a memoized backlog, and this should make that more reusable estimations. - final LoadingCache offsetEstimatorCache = - Preconditions.checkStateNotNull(this.offsetEstimatorCache); - final KafkaLatestOffsetEstimator offsetEstimator = - offsetEstimatorCache.get(kafkaSourceDescriptor); - - return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetEstimator); + Optional> consumerExecutionContextKey = + ImmutableSet.of(kafkaSourceDescriptor.getBootStrapServers()); + ConsumerExecutionContext consumerExecutionContext = + this.consumerExecutionContextCache.get(consumerExecutionContextKey); + if (consumerExecutionContext.isDone()) { + this.consumerExecutionContextCache.refresh(consumerExecutionContextKey); + consumerExecutionContext = + this.consumerExecutionContextCache.get(consumerExecutionContextKey); + } + if (!consumerExecutionContext.isRunning()) { + options + .as(ExecutorOptions.class) + .getScheduledExecutorService() + .submit(consumerExecutionContext); + } + return new GrowableOffsetRangeTracker( + restriction.getFrom(), + new KafkaLatestOffsetEstimator( + consumerExecutionContext.assign(kafkaSourceDescriptor), Optional.of(restriction))); } @ProcessElement @@ -398,17 +599,13 @@ public ProcessContinuation processElement( RestrictionTracker tracker, WatermarkEstimator watermarkEstimator, MultiOutputReceiver receiver) - throws Exception { - final LoadingCache avgRecordSizeCache = - Preconditions.checkStateNotNull(this.avgRecordSizeCache); - final LoadingCache offsetEstimatorCache = - Preconditions.checkStateNotNull(this.offsetEstimatorCache); + throws Throwable { final Deserializer keyDeserializerInstance = Preconditions.checkStateNotNull(this.keyDeserializerInstance); final Deserializer valueDeserializerInstance = Preconditions.checkStateNotNull(this.valueDeserializerInstance); final TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition(); - final AverageRecordSize avgRecordSize = avgRecordSizeCache.get(kafkaSourceDescriptor); + final AverageRecordSize avgRecordSize = this.avgRecordSizeCache.get(kafkaSourceDescriptor); // TODO: Metrics should be reported per split instead of partition, add bootstrap server hash? final Distribution rawSizes = Metrics.distribution(METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + topicPartition.toString()); @@ -424,8 +621,24 @@ public ProcessContinuation processElement( tracker.tryClaim(tracker.currentRestriction().getTo() - 1); return ProcessContinuation.stop(); } - Map updatedConsumerConfig = - overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor); + + Optional> consumerExecutionContextKey = + Optional.ofNullable(kafkaSourceDescriptor.getBootStrapServers()).map(ImmutableSet::copyOf); + ConsumerExecutionContext consumerExecutionContext = + this.consumerExecutionContextCache.get(consumerExecutionContextKey); + if (consumerExecutionContext.isDone()) { + this.consumerExecutionContextCache.refresh(consumerExecutionContextKey); + consumerExecutionContext = + this.consumerExecutionContextCache.get(consumerExecutionContextKey); + } + if (!consumerExecutionContext.isRunning()) { + options + .as(ExecutorOptions.class) + .getScheduledExecutorService() + .submit(consumerExecutionContext); + } + this.consumerExecutionContextInstance = consumerExecutionContext; + // If there is a timestampPolicyFactory, create the TimestampPolicy for current // TopicPartition. TimestampPolicy timestampPolicy = null; @@ -436,25 +649,20 @@ public ProcessContinuation processElement( } LOG.info("Creating Kafka consumer for process continuation for {}", kafkaSourceDescriptor); - try (Consumer consumer = consumerFactoryFn.apply(updatedConsumerConfig)) { - ConsumerSpEL.evaluateAssign( - consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition())); + + try (TopicPartitionState topicPartitionState = + consumerExecutionContext.assign(kafkaSourceDescriptor)) { long startOffset = tracker.currentRestriction().getFrom(); long expectedOffset = startOffset; - consumer.seek(kafkaSourceDescriptor.getTopicPartition(), startOffset); ConsumerRecords rawRecords = ConsumerRecords.empty(); long skippedRecords = 0L; final Stopwatch sw = Stopwatch.createStarted(); - while (true) { - rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition()); + while (!topicPartitionState.isClosed()) { + rawRecords = topicPartitionState.getRecords(); // When there are no records available for the current TopicPartition, self-checkpoint // and move to process the next element. if (rawRecords.isEmpty()) { - if (!topicPartitionExists( - kafkaSourceDescriptor.getTopicPartition(), consumer.listTopics())) { - return ProcessContinuation.stop(); - } if (timestampPolicy != null) { updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker); } @@ -494,6 +702,7 @@ public ProcessContinuation processElement( if (!tracker.tryClaim(rawRecord.offset())) { return ProcessContinuation.stop(); } + topicPartitionState.setPosition(rawRecord.offset()); try { KafkaRecord kafkaRecord = new KafkaRecord<>( @@ -545,13 +754,13 @@ public ProcessContinuation processElement( backlogBytes.set( (long) avgRecordSize.getTotalSize( - BigDecimal.valueOf( - Preconditions.checkStateNotNull( - offsetEstimatorCache.get(kafkaSourceDescriptor).estimate())) + BigDecimal.valueOf(topicPartitionState.getEndOffset()) .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128) .doubleValue())); } } + + return ProcessContinuation.stop(); } private boolean topicPartitionExists( @@ -619,58 +828,8 @@ public Coder restrictionCoder() { @Setup public void setup() throws Exception { - // Start to track record size and offset gap per bundle. - avgRecordSizeCache = - SharedStateHolder.AVG_RECORD_SIZE_CACHE.computeIfAbsent( - fnId, - k -> { - return CacheBuilder.newBuilder() - .maximumSize(1000L) - .build( - new CacheLoader() { - @Override - public AverageRecordSize load(KafkaSourceDescriptor kafkaSourceDescriptor) - throws Exception { - return new AverageRecordSize(); - } - }); - }); keyDeserializerInstance = keyDeserializerProvider.getDeserializer(consumerConfig, true); valueDeserializerInstance = valueDeserializerProvider.getDeserializer(consumerConfig, false); - offsetEstimatorCache = - SharedStateHolder.OFFSET_ESTIMATOR_CACHE.computeIfAbsent( - fnId, - k -> { - final Map consumerConfig = ImmutableMap.copyOf(this.consumerConfig); - final @Nullable Map offsetConsumerConfig = - this.offsetConsumerConfig == null - ? null - : ImmutableMap.copyOf(this.offsetConsumerConfig); - return CacheBuilder.newBuilder() - .weakValues() - .expireAfterAccess(1, TimeUnit.MINUTES) - .build( - new CacheLoader() { - @Override - public KafkaLatestOffsetEstimator load( - KafkaSourceDescriptor kafkaSourceDescriptor) throws Exception { - LOG.info( - "Creating Kafka consumer for offset estimation for {}", - kafkaSourceDescriptor); - - TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition(); - Map updatedConsumerConfig = - overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor); - Consumer offsetConsumer = - consumerFactoryFn.apply( - KafkaIOUtils.getOffsetConsumerConfig( - "tracker-" + topicPartition, - offsetConsumerConfig, - updatedConsumerConfig)); - return new KafkaLatestOffsetEstimator(offsetConsumer, topicPartition); - } - }); - }); if (checkStopReadingFn != null) { checkStopReadingFn.setup(); } @@ -680,8 +839,6 @@ public KafkaLatestOffsetEstimator load( public void teardown() throws Exception { final LoadingCache avgRecordSizeCache = Preconditions.checkStateNotNull(this.avgRecordSizeCache); - final LoadingCache offsetEstimatorCache = - Preconditions.checkStateNotNull(this.offsetEstimatorCache); try { if (valueDeserializerInstance != null) { Closeables.close(valueDeserializerInstance, true); @@ -700,7 +857,6 @@ public void teardown() throws Exception { // Allow the cache to perform clean up tasks when this instance is about to be deleted. avgRecordSizeCache.cleanUp(); - offsetEstimatorCache.cleanUp(); } private Map overrideBootstrapServersConfig( @@ -717,6 +873,61 @@ private Map overrideBootstrapServersConfig( return config; } + private void writeObject(java.io.ObjectOutputStream stream) { + stream.defaultWriteObject(); + stream.writeLong(FN_ID.getAndIncrement()); + } + + private void readObject(java.io.ObjectInputStream stream) + throws IOException, ClassNotFoundException { + stream.defaultReadObject(); + this.fnId = stream.readLong(); + // Start to track record size and offset gap per bundle. + this.avgRecordSizeCache = + SharedStateHolder.AVG_RECORD_SIZE_CACHE.computeIfAbsent( + fnId, + k -> { + return CacheBuilder.newBuilder() + .maximumSize(1000L) + .build( + new CacheLoader() { + @Override + public AverageRecordSize load(KafkaSourceDescriptor kafkaSourceDescriptor) + throws Exception { + return new AverageRecordSize(); + } + }); + }); + this.consumerExecutionContextCache = + SharedStateHolder.CONSUMER_EXECUTION_CONTEXT_CACHE.computeIfAbsent( + fnId, + k -> { + return CacheBuilder.newBuilder() + .weakValues() + .build( + new CacheLoader>, ConsumerExecutionContext>() { + @Override + public ConsumerExecutionContext load( + Optional> optionalBootstrapServers) + throws Exception { + final Map consumerConfig = + new HashMap<>(ReadFromKafkaDoFn.this.consumerConfig); + ImmutableSet bootstrapServers; + if (optionalBootstrapServers.isPresent() + && (bootstrapServers = optionalBootstrapServers.get()).size() > 0) { + consumerConfig.put( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + COMMA_JOINER.join(bootstrapServers)); + } + checkState( + consumerConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + return new ConsumerExecutionContext( + ReadFromKafkaDoFn.this.consumerFactoryFn.apply(consumerConfig)); + } + }); + }); + } + private static class AverageRecordSize { private MovingAvg avgRecordSize; private MovingAvg avgRecordGap;