jobid: 2024-10-03_13_46_39-12160241026546509250 with map global per worker explicit thread kill after timeout #3

Open. Wants to merge 1 commit into base: master.
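This change swaps the per-tracker memoized backlog supplier for a worker-global map of per-partition offset estimators, each refreshed by a scheduled background thread that kills itself once the estimator has gone unqueried for roughly ten seconds. Below is a minimal, self-contained sketch of that pattern, for orientation only: the class and field names are simplified stand-ins, and a plain Supplier<Long> takes the place of the Kafka consumer's endOffsets() call.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// One poller per partition key, shared worker-wide; a scheduled task keeps the cached end offset
// fresh and cancels itself when estimate() has not been called recently.
final class SelfCancellingOffsetPoller {
  static final Map<String, SelfCancellingOffsetPoller> PARTITION_STATE = new ConcurrentHashMap<>();

  private final AtomicLong endOffset = new AtomicLong(Long.MAX_VALUE);
  private final Supplier<Long> fetchEndOffset; // stands in for consumer.endOffsets(...).get(partition)
  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  private volatile long lastQueriedMillis = System.currentTimeMillis();
  private volatile ScheduledFuture<?> future;

  SelfCancellingOffsetPoller(Supplier<Long> fetchEndOffset) {
    this.fetchEndOffset = fetchEndOffset;
  }

  void start() {
    future = executor.scheduleAtFixedRate(this::poll, 0, 1, TimeUnit.SECONDS);
  }

  long estimate() {
    lastQueriedMillis = System.currentTimeMillis();
    return endOffset.get();
  }

  private void poll() {
    if (System.currentTimeMillis() - lastQueriedMillis > 10_000) {
      ScheduledFuture<?> f = future;
      if (f != null) {
        f.cancel(true); // nobody has asked for an estimate lately; stop polling
      }
      return;
    }
    Long latest = fetchEndOffset.get();
    if (latest != null) {
      endOffset.set(latest);
    }
  }
}

// Usage, e.g. from a restriction tracker:
// PARTITION_STATE.computeIfAbsent("topic-0", k -> new SelfCancellingOffsetPoller(fetchFn)).start();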
@@ -25,7 +25,13 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
@@ -52,8 +58,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
@@ -68,6 +72,7 @@
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -237,53 +242,99 @@ private ReadFromKafkaDoFn(
@VisibleForTesting
static final String RAW_SIZE_METRIC_PREFIX = KafkaUnboundedReader.RAW_SIZE_METRIC_PREFIX;

private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 1;
private static final Map<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> partitionState =
new ConcurrentHashMap<>();
private static final UUID machineUUID = UUID.randomUUID();
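// Worker-global state: every ReadFromKafkaDoFn instance in this JVM shares partitionState, which
// holds at most one KafkaLatestOffsetEstimator (and its background polling task) per source
// descriptor; machineUUID tags this worker in the debug logs, while each estimator also logs its
// own instanceUUID.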
/**
* A {@link GrowableOffsetRangeTracker.RangeEndEstimator} which uses a Kafka {@link Consumer} to
* fetch backlog.
*/
private static class KafkaLatestOffsetEstimator
implements GrowableOffsetRangeTracker.RangeEndEstimator {

private final Consumer<byte[], byte[]> offsetConsumer;
private final TopicPartition topicPartition;
private final Supplier<Long> memoizedBacklog;
private boolean closed;

KafkaLatestOffsetEstimator(
Consumer<byte[], byte[]> offsetConsumer, TopicPartition topicPartition) {
this.offsetConsumer = offsetConsumer;
this.topicPartition = topicPartition;
ConsumerSpEL.evaluateAssign(this.offsetConsumer, ImmutableList.of(this.topicPartition));
memoizedBacklog =
Suppliers.memoizeWithExpiration(
() -> {
synchronized (offsetConsumer) {
ConsumerSpEL.evaluateSeek2End(offsetConsumer, topicPartition);
return offsetConsumer.position(topicPartition);
}
},
1,
TimeUnit.SECONDS);
}

@Override
protected void finalize() {
try {
Closeables.close(offsetConsumer, true);
closed = true;
LOG.info("Offset Estimator consumer was closed for {}", topicPartition);
} catch (Exception anyException) {
LOG.warn("Failed to close offset consumer for {}", topicPartition);
private final AtomicLong endOffset = new AtomicLong(Long.MAX_VALUE);
private final KafkaSourceDescriptor kafkaSourceDescriptor;
private final Consumer<byte[], byte[]> localOffsetConsumer;
@Nullable ScheduledFuture<?> scheduledFuture;
ScheduledExecutorService offsetFetcherThread = Executors.newSingleThreadScheduledExecutor();
// private AtomicBoolean isStarted;
UUID instanceUUID = UUID.randomUUID();
Instant lastQueried = Instant.now();
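// endOffset caches the partition's most recently fetched end offset; once startThread() is called,
// offsetFetcherThread refreshes it every OFFSET_UPDATE_INTERVAL_SECONDS, so estimate() only reads
// the cached value. lastQueried records the last estimate() call and lets the polling task cancel
// itself after a stretch with no queries.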

public KafkaLatestOffsetEstimator(
KafkaSourceDescriptor kafkaSourceDescriptor, Consumer<byte[], byte[]> localOffsetConsumer) {
this.kafkaSourceDescriptor = kafkaSourceDescriptor;
this.localOffsetConsumer = localOffsetConsumer;

LOG.info(
"bzablockilog query for end position from constructor for {}, instanceUUID: {}, machineUUID: {}",
kafkaSourceDescriptor.getTopicPartition(),
instanceUUID,
machineUUID);

Long currentEndOffset =
localOffsetConsumer
.endOffsets(ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()))
.get(kafkaSourceDescriptor.getTopicPartition());

LOG.info(
"bzablockilog queried end pos from constructor for {}, offset: {}, instanceUUID: {}, machineUUID: {}",
kafkaSourceDescriptor.getTopicPartition(),
currentEndOffset,
instanceUUID,
machineUUID);
if (currentEndOffset != null) {
endOffset.set(currentEndOffset);
}
}

@Override
public long estimate() {
return memoizedBacklog.get();
lastQueried = Instant.now();
return endOffset.get();
}

// public void updateEndOffset(Long currentEndOffset) {
// endOffset.set(currentEndOffset);
// }

public void startThread() {
scheduledFuture =
offsetFetcherThread.scheduleAtFixedRate(
this::updateLatestOffsets, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
// this.isStarted.set(true);
}

private void updateLatestOffsets() {
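// Refresh the cached end offset for this partition; if estimate() has not been called for more
// than 10 seconds, cancel this scheduled task so an idle partition stops polling Kafka.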
if (new Duration(lastQueried, Instant.now()).toStandardSeconds().getSeconds() > 10) {
LOG.info(
"bzablockilog kill thread for {}, instanceUUID: {}, machineUUID: {}",
kafkaSourceDescriptor.getTopicPartition(),
instanceUUID,
machineUUID);
killThread();
return;
}
LOG.info(
"bzablockilog query for end position for {}, instanceUUID: {}, machineUUID: {}",
kafkaSourceDescriptor.getTopicPartition(),
instanceUUID,
machineUUID);

Long currentEndOffset =
localOffsetConsumer
.endOffsets(ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()))
.get(kafkaSourceDescriptor.getTopicPartition());

if (currentEndOffset != null) {
endOffset.set(currentEndOffset);
}
}

public boolean isClosed() {
return closed;
public void killThread() {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
}
}

@@ -293,34 +344,52 @@ public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSource
overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
TopicPartition partition = kafkaSourceDescriptor.getTopicPartition();
LOG.info("Creating Kafka consumer for initial restriction for {}", partition);
try (Consumer<byte[], byte[]> offsetConsumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
ConsumerSpEL.evaluateAssign(offsetConsumer, ImmutableList.of(partition));
long startOffset;
@Nullable Instant startReadTime = kafkaSourceDescriptor.getStartReadTime();
if (kafkaSourceDescriptor.getStartReadOffset() != null) {
startOffset = kafkaSourceDescriptor.getStartReadOffset();
} else if (startReadTime != null) {
startOffset = ConsumerSpEL.offsetForTime(offsetConsumer, partition, startReadTime);
} else {
startOffset = offsetConsumer.position(partition);
}

long endOffset = Long.MAX_VALUE;
@Nullable Instant stopReadTime = kafkaSourceDescriptor.getStopReadTime();
if (kafkaSourceDescriptor.getStopReadOffset() != null) {
endOffset = kafkaSourceDescriptor.getStopReadOffset();
} else if (stopReadTime != null) {
endOffset = ConsumerSpEL.offsetForTime(offsetConsumer, partition, stopReadTime);
}
new OffsetRange(startOffset, endOffset);
Lineage.getSources()
.add(
"kafka",
ImmutableList.of(
(String) updatedConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
MoreObjects.firstNonNull(kafkaSourceDescriptor.getTopic(), partition.topic())));
return new OffsetRange(startOffset, endOffset);
Map<String, Object> updatedConsumerConfig2 =
KafkaIOUtils.getOffsetConsumerConfig(
"tracker-" + kafkaSourceDescriptor.getTopicPartition(),
offsetConsumerConfig,
updatedConsumerConfig);

Consumer<byte[], byte[]> offsetConsumer = consumerFactoryFn.apply(updatedConsumerConfig2);
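// Unlike the previous try-with-resources block, this consumer is left open: it is handed to the
// long-lived KafkaLatestOffsetEstimator below, which uses it for periodic endOffsets() calls.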

ConsumerSpEL.evaluateAssign(offsetConsumer, ImmutableList.of(partition));

long startOffset;
@Nullable Instant startReadTime = kafkaSourceDescriptor.getStartReadTime();
if (kafkaSourceDescriptor.getStartReadOffset() != null) {
startOffset = kafkaSourceDescriptor.getStartReadOffset();
} else if (startReadTime != null) {
startOffset = ConsumerSpEL.offsetForTime(offsetConsumer, partition, startReadTime);
} else {
startOffset = offsetConsumer.position(partition);
}

long endOffset = Long.MAX_VALUE;
@Nullable Instant stopReadTime = kafkaSourceDescriptor.getStopReadTime();
if (kafkaSourceDescriptor.getStopReadOffset() != null) {
endOffset = kafkaSourceDescriptor.getStopReadOffset();
} else if (stopReadTime != null) {
endOffset = ConsumerSpEL.offsetForTime(offsetConsumer, partition, stopReadTime);
}

// Fetch offsets once before running periodically.
LOG.info(
"bzablockilog end offset background thread entry for {} ",
kafkaSourceDescriptor.getTopicPartition());

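// Register (or reuse) the worker-global estimator for this source descriptor and start its
// periodic end-offset polling task.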
KafkaLatestOffsetEstimator kafkaLatestOffsetEstimator =
partitionState.computeIfAbsent(
kafkaSourceDescriptor, k -> new KafkaLatestOffsetEstimator(k, offsetConsumer));
kafkaLatestOffsetEstimator.startThread();

Lineage.getSources()
.add(
"kafka",
ImmutableList.of(
(String) updatedConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
MoreObjects.firstNonNull(kafkaSourceDescriptor.getTopic(), partition.topic())));
return new OffsetRange(startOffset, endOffset);
}

@GetInitialWatermarkEstimatorState
@@ -365,29 +434,70 @@ public OffsetRangeTracker restrictionTracker(
return new OffsetRangeTracker(restriction);
}

// OffsetEstimators are cached for each topic-partition because they hold a stateful connection,
// so we want to minimize the number of connections that we start and track with Kafka. Another
// point is that it has a memoized backlog, which should make estimations more reusable.
final Map<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCacheInstance =
Preconditions.checkStateNotNull(this.offsetEstimatorCache);

TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
KafkaLatestOffsetEstimator offsetEstimator = offsetEstimatorCacheInstance.get(topicPartition);
if (offsetEstimator == null || offsetEstimator.isClosed()) {
KafkaLatestOffsetEstimator rangeEndEstimator = partitionState.get(kafkaSourceDescriptor);
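// The estimator can be absent here, e.g. when the initial restriction was computed on a different
// worker or when a failed tryClaim removed the entry, in which case a dedicated offset consumer
// and a fresh estimator are created below.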
if (rangeEndEstimator == null) {
LOG.info(
"bzablockilog KafkaLatestOffsetEstimator not in cache for {}",
kafkaSourceDescriptor.getTopicPartition());
TopicPartition partition = kafkaSourceDescriptor.getTopicPartition();
LOG.info("Creating Kafka consumer for initial restriction for {}", partition);
Map<String, Object> updatedConsumerConfig =
overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
Map<String, Object> updatedConsumerConfig2 =
KafkaIOUtils.getOffsetConsumerConfig(
"tracker-" + kafkaSourceDescriptor.getTopicPartition(),
offsetConsumerConfig,
updatedConsumerConfig);

LOG.info("Creating Kafka consumer for offset estimation for {}", topicPartition);
Consumer<byte[], byte[]> offsetConsumer = consumerFactoryFn.apply(updatedConsumerConfig2);

Consumer<byte[], byte[]> offsetConsumer =
consumerFactoryFn.apply(
KafkaIOUtils.getOffsetConsumerConfig(
"tracker-" + topicPartition, offsetConsumerConfig, updatedConsumerConfig));
offsetEstimator = new KafkaLatestOffsetEstimator(offsetConsumer, topicPartition);
offsetEstimatorCacheInstance.put(topicPartition, offsetEstimator);
}
ConsumerSpEL.evaluateAssign(offsetConsumer, ImmutableList.of(partition));

LOG.info(
"bzablockilog NewTracker creating end offset background thread entry for {} ",
kafkaSourceDescriptor.getTopicPartition());

return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetEstimator);
rangeEndEstimator =
partitionState.computeIfAbsent(
kafkaSourceDescriptor, k -> new KafkaLatestOffsetEstimator(k, offsetConsumer));
rangeEndEstimator.startThread();

// throw new RuntimeException(
// "bzablockilog KafkaLatestOffsetEstimator for "
// + kafkaSourceDescriptor.getTopicPartition()
// + " doesn't exist.");
}
return new GrowableOffsetRangeTracker(restriction.getFrom(), rangeEndEstimator);

// // return new GrowableOffsetRangeTracker(restriction.getFrom(), new
// KafkaLatestOffsetEstimator(kafkaSourceDescriptor));
// // OffsetEstimators are cached for each topic-partition because they hold a stateful
// connection,
// // so we want to minimize the amount of connections that we start and track with Kafka.
// Another
// // point is that it has a memoized backlog, and this should make that more reusable
// estimations.
// final Map<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCacheInstance =
// Preconditions.checkStateNotNull(this.offsetEstimatorCache);
//
// TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
// KafkaLatestOffsetEstimator offsetEstimator =
// offsetEstimatorCacheInstance.get(topicPartition);
// if (offsetEstimator == null || offsetEstimator.isClosed()) {
// Map<String, Object> updatedConsumerConfig =
// overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
//
// LOG.info("Creating Kafka consumer for offset estimation for {}", topicPartition);
//
// Consumer<byte[], byte[]> offsetConsumer =
// consumerFactoryFn.apply(
// KafkaIOUtils.getOffsetConsumerConfig(
// "tracker-" + topicPartition, offsetConsumerConfig, updatedConsumerConfig));
// offsetEstimator = new KafkaLatestOffsetEstimator(offsetConsumer, topicPartition);
// offsetEstimatorCacheInstance.put(topicPartition, offsetEstimator);
// }
//
// return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetEstimator);
}

@ProcessElement
@@ -493,6 +603,19 @@ public ProcessContinuation processElement(
skippedRecords = 0L;
}
if (!tracker.tryClaim(rawRecord.offset())) {
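// The claim was rejected, so this restriction is finished: cancel the partition's background
// polling task and drop its estimator from the worker-global map.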
KafkaLatestOffsetEstimator kafkaLatestOffsetEstimator =
partitionState.get(kafkaSourceDescriptor);
LOG.info(
"bzablockilog tryClaim failed for {} for offset {}. range was to {}. Trying to stop associated backgroundthread with kafkaLatestOffsetEstimator. Is it null? {}",
kafkaSourceDescriptor.getTopicPartition(),
rawRecord.offset(),
tracker.currentRestriction().getTo(),
kafkaLatestOffsetEstimator == null);
if (kafkaLatestOffsetEstimator != null) {
LOG.info("bzablockilog remove key for {}", kafkaSourceDescriptor.getTopicPartition());
kafkaLatestOffsetEstimator.killThread();
partitionState.remove(kafkaSourceDescriptor);
}
return ProcessContinuation.stop();
}
try {