[KafkaIO] Decouple consumer threads from harness threads #32986

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
@@ -0,0 +1,231 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kafka;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;
import java.util.function.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.AtomicLongMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ConcurrentConsumer<K, V> implements AutoCloseable {
Contributor:
An overview comment would be helpful.

Contributor Author:
Agreed, I've been putting off the documentation since I had a few variations of this change running in tandem.
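
A minimal, self-contained sketch of the java.util.concurrent.Phaser pattern this class builds on, assuming the intended flow is that every reader thread registers as a party and the last arrival in each phase runs onAdvance() exactly once to do the shared poll (illustrative names and output, not the PR's code):

import java.util.concurrent.Phaser;

final class PhaserPollDemo {
  public static void main(String[] args) {
    final Phaser phaser =
        new Phaser() {
          @Override
          protected boolean onAdvance(int phase, int registeredParties) {
            // Runs once per phase, on the thread of the last party to arrive;
            // this is where a single shared Consumer.poll() could happen on
            // behalf of all readers.
            System.out.println("phase " + phase + ": poll once for " + registeredParties + " parties");
            return registeredParties == 0; // terminate once everyone deregisters
          }
        };
    for (int i = 0; i < 3; i++) {
      phaser.register(); // each reader becomes a phaser party
      final int id = i;
      new Thread(
              () -> {
                phaser.arriveAndAwaitAdvance(); // block until all readers arrive
                System.out.println("reader " + id + " consumes its records");
                phaser.arriveAndDeregister(); // leave when done
              })
          .start();
    }
  }
}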

private static final Logger LOG = LoggerFactory.getLogger(ConcurrentConsumer.class);

private final ConsumerPhaser phaser;
private final Consumer<K, V> consumer;
private final Duration pollDuration;
private final AtomicLongMap<TopicPartition> times;
Contributor:
What kind of time? A better variable name could clarify. Comment on the keys; ditto for positions.

private final AtomicLongMap<TopicPartition> positions;
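// A gloss inferred from the usage below (the PR doesn't state this here):
// 'times' maps each partition to a requested epoch-millis timestamp for a
// pending offsetsForTimes() lookup; 'positions' maps each assigned partition
// to its next fetch offset, with the sign bit flagging a paused assignment.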
private final Supplier<Metric> recordsLagMax;
private final Map<TopicPartition, Supplier<Metric>> partitionRecordsLag;
private ConsumerRecords<K, V> pollResult;
private Map<TopicPartition, OffsetAndTimestamp> offsetsForTimesResult;

private final class ConsumerPhaser extends Phaser {
@Override
protected boolean onAdvance(final int phase, final int registeredParties) {
try {
final Map<TopicPartition, Long> positionsView = positions.asMap();
final Set<TopicPartition> prevAssignment = consumer.assignment();
final Set<TopicPartition> nextAssignment = positionsView.keySet();

if (!times.isEmpty()) {
offsetsForTimesResult = consumer.offsetsForTimes(times.asMap());
times.clear();
}

if (!prevAssignment.equals(nextAssignment)) {
consumer.assign(nextAssignment);
}

positionsView.forEach(
(tp, o) -> {
if (o == Long.MIN_VALUE) {
consumer.pause(Collections.singleton(tp));
Contributor:
Would it be better to pause just once, by building up all the partitions you want to pause?

Contributor Author:
Consumer.pause(Collection<TopicPartition>) iterates over the input collection, looks up each assignment in its subscription state table, and then mutates the state. There's virtually no difference between N invocations of pause/resume with singleton collections and a single invocation with a collection of size N.

I reworked the phaser interaction a bit to deregister when Consumer.poll() returns a non-empty list of records for a partition, so this can be rewritten as consumer.pause(consumerRecords.partitions()), since ConsumerRecords.partitions() returns the set of partitions with available records. That addresses the throughput regression observed on larger machine types, with the rate jumping from ~2.5 GiB/s to ~3.1 GiB/s. I'm looking into a stall at the moment, likely a missed arrive or deregister at the phaser, or a missed cache invalidation.
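
A minimal sketch of the batched-pause shape described above, assuming a raw Kafka consumer and pollDuration in scope (illustrative, not the PR's code):

// Pause exactly the partitions that returned records, in one call,
// so readers can drain them before the next poll fetches more.
final ConsumerRecords<K, V> consumerRecords = consumer.poll(pollDuration.toMillis());
consumer.pause(consumerRecords.partitions());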

} else if (!prevAssignment.contains(tp)) {
consumer.seek(tp, o);
}
});

if (consumer.paused().size() != nextAssignment.size()) {
pollResult = consumer.poll(pollDuration.toMillis());
}

nextAssignment.forEach(tp -> positions.put(tp, consumer.position(tp)));
return false;
} catch (WakeupException e) {
if (!this.isTerminated()) {
LOG.error("Unexpected wakeup while running", e);
}
} catch (Exception e) {
LOG.error("Exception while reading from Kafka", e);
}
return true;
}
}

ConcurrentConsumer(final Consumer<K, V> consumer, final Duration pollDuration) {
this.phaser = new ConsumerPhaser();
this.consumer = consumer;
this.pollDuration = pollDuration;
this.times = AtomicLongMap.create();
this.positions = AtomicLongMap.create();
this.recordsLagMax =
Suppliers.memoize(
Contributor:
I don't see guarantees in the Kafka interface that the same Metric object is updated, or about when the metric would be created.

Perhaps it would be better to have some expiration here? Or at least don't cache a null if we attempt to memoize before the metric has been created in the consumer?

() ->
this.consumer.metrics().values().stream()
.filter(
m ->
"consumer-fetch-manager-metrics".equals(m.metricName().group())
&& "records-lag-max".equals(m.metricName().name())
&& !m.metricName().tags().containsKey("topic")
&& !m.metricName().tags().containsKey("partition"))
.findAny()
.get());
this.partitionRecordsLag = new ConcurrentHashMap<>();
this.pollResult = ConsumerRecords.empty();
this.offsetsForTimesResult = Collections.emptyMap();
}
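
One possible shape for the non-caching lookup the reviewer suggests above, assuming an AtomicReference field and a hypothetical matchesRecordsLagMax predicate over MetricName (a sketch, not the PR's code):

private final AtomicReference<Metric> cachedRecordsLagMax = new AtomicReference<>();

private @Nullable Metric recordsLagMaxOrNull() {
  // Keep returning null (and retrying) until the metric shows up, then cache it.
  return cachedRecordsLagMax.updateAndGet(
      cached ->
          cached != null
              ? cached
              : consumer.metrics().values().stream()
                  .filter(m -> matchesRecordsLagMax(m.metricName()))
                  .findAny()
                  .orElse(null));
}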

@Override
public void close() {
this.phaser.forceTermination();
try {
this.consumer.wakeup();
this.consumer.close();
} catch (Exception e) {
LOG.error("Exception while closing Kafka consumer", e);
}
this.times.clear();
this.positions.clear();
this.pollResult = ConsumerRecords.empty();
this.offsetsForTimesResult = Collections.emptyMap();
}

boolean isClosed() {
return this.phaser.isTerminated();
}

Map<MetricName, ? extends Metric> metrics() {
return this.consumer.metrics();
}

long currentLagOrMaxLag(final TopicPartition topicPartition) {
final Supplier<Metric> metric =
this.partitionRecordsLag.getOrDefault(topicPartition, this.recordsLagMax);
try {
return ((Number) metric.get().metricValue()).longValue();
Contributor:
Handle null explicitly, without the exception?

Contributor Author:
Agreed.

} catch (Exception e) {
return 0;
}
}
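
A sketch of the explicit null handling agreed on above, assuming the metric supplier is reworked to yield null while the metric is absent, as in the sketch after the constructor (illustrative, not the PR's code):

// Replace the catch-all with direct checks on the metric and its value.
final Metric m = metric.get();
final Object value = m == null ? null : m.metricValue();
return value instanceof Number ? ((Number) value).longValue() : 0L;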

long position(final TopicPartition topicPartition) {
return this.positions.get(topicPartition) & Long.MAX_VALUE;
Contributor:
A comment on the purpose of the & would help; it looks like it protects against exposing a negative value?

Contributor Author:
Correct, but I don't think we should keep this style of pause/resume bookkeeping, since it's the least intuitive way to express it for readers and maintainers. It's unlikely that the representation of partition offsets will ever switch from signed to unsigned integers in a future version of Kafka, but not introducing a dependency on that assumption is also one less thing to worry about in the future.
}
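
A small illustration of the sign-bit bookkeeping under discussion, with hypothetical values:

// The sign bit flags a paused assignment; masking with Long.MAX_VALUE
// clears it, so a paused entry still decodes to its non-negative offset.
long paused = Long.MIN_VALUE | 42L;          // offset 42, flagged paused
long offset = paused & Long.MAX_VALUE;       // -> 42
long bare = Long.MIN_VALUE & Long.MAX_VALUE; // bare sentinel -> 0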

long initialOffsetForPartition(final TopicPartition topicPartition) {
// Offsets start at 0 and there is no position to advance to beyond Long.MAX_VALUE.
// The sign bit indicates that an assignment is paused.
checkState(this.phaser.register() >= 0);
this.positions.put(topicPartition, Long.MIN_VALUE);

// Synchronize and throw if the consumer was terminated in between.
checkState(this.phaser.arriveAndAwaitAdvance() >= 0);

// Removal will revoke the assignment when the phase advances.
final long result = this.positions.remove(topicPartition);

// Synchronize and ignore the consumer status since the result is already known.
this.phaser.arriveAndDeregister();

// Since Long.MIN_VALUE only has the sign bit set, this will return 0 as a default value if no
// position could be determined.
return result & Long.MAX_VALUE;
}

@Nullable
OffsetAndTimestamp initialOffsetForTime(final TopicPartition topicPartition, final long time) {
Contributor:
What are the units for time? Could an Instant be used here and converted to a long internally?

Contributor Author:
It's in milliseconds; the types requested by the ConcurrentConsumer wrapper match the client library where possible: https://kafka.apache.org/38/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes(java.util.Map)

// Request the offset closest to the provided time.
checkState(this.phaser.register() >= 0);
this.times.put(topicPartition, time);

// Synchronize and throw if the consumer was terminated in between.
checkState(this.phaser.arriveAndAwaitAdvance() >= 0);
final Map<TopicPartition, OffsetAndTimestamp> result = this.offsetsForTimesResult;

// Synchronize and ignore the consumer status since the result is already known.
this.phaser.arriveAndDeregister();

return result.get(topicPartition);
}
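
A hypothetical caller-side conversion showing the epoch-millisecond contract from the thread above; concurrentConsumer and partition are illustrative names, not from the PR:

// Convert a java.time.Instant to the epoch-millis long the wrapper expects.
final long timeMs = java.time.Instant.parse("2024-01-01T00:00:00Z").toEpochMilli();
final @Nullable OffsetAndTimestamp result = concurrentConsumer.initialOffsetForTime(partition, timeMs);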

void assignAndSeek(final TopicPartition topicPartition, final long offset) {
checkState(this.phaser.register() >= 0);

this.positions.put(topicPartition, offset);
this.partitionRecordsLag.computeIfAbsent(
topicPartition,
k ->
Suppliers.memoize(
Contributor:
Similar concern here: what if the metric isn't updated, or is missing on the original call?
() ->
this.consumer.metrics().values().stream()
.filter(
m ->
"consumer-fetch-manager-metrics".equals(m.metricName().group())
&& "records-lag-max".equals(m.metricName().name())
&& k.topic()
.replace('.', '_')
.equals(m.metricName().tags().get("topic"))
&& Integer.toString(k.partition())
.equals(m.metricName().tags().get("partition")))
.findAny()
.get()));
}

List<ConsumerRecord<K, V>> poll(final TopicPartition topicPartition) {
checkState(this.phaser.arriveAndAwaitAdvance() >= 0);

return this.pollResult.records(topicPartition);
}

void unassign(final TopicPartition topicPartition) {
this.positions.remove(topicPartition);
this.partitionRecordsLag.remove(topicPartition);

this.phaser.arriveAndDeregister();
}
}