[KafkaIO] Decouple consumer threads from harness threads #32986
Conversation
I'm interested in reviewing since I was looking at performance of this earlier. Let me know if you'd like me to take a pass in the current state or wait. Thanks! Looking forward to these improvements.
Feel free to take a peek @scwhittle! 😃 I had some inspiration on Sunday and revised the approach to thread-safe sharing of a Kafka consumer after hitting that 1 GiB/s bottleneck. The Kafka consumer's info logs are verbose enough to show that consumers on various workers are frequently assigned more than one partition. The image below is from a throughput test using 256-512 cores (n2d-standard-2) reading from 500 partitions that are filled by a producer at ~10 GiB/s; the time mark highlights the moment at which the pipeline scaled up from 256 to 512 cores. At the moment I seem to hit a cap at 2.5 GiB/s when I run this test with a shuffle step as a simple IO sink. The same workload using the unmodified source implementation on Dataflow's Runner V1 reports 2.5-3 GiB/s after scaling up. Without an IO sink, both are able to process 10 GiB/s with comparable behavior throughout the pipeline's lifetime.
Just some initial comments; I didn't get through everything.
```java
private static final Map<Long, LoadingCache<KafkaSourceDescriptor, AverageRecordSize>>
    AVG_RECORD_SIZE_CACHE = new ConcurrentHashMap<>();
private static final Map<
        Long, LoadingCache<Optional<ImmutableSet<String>>, ConsumerExecutionContext>>
```
Add a comment on the keys of the map and the loading cache.
```java
}

static final class TopicPartitionPollState implements AutoCloseable {
  private static final List<ConsumerRecord<byte[], byte[]>> CLOSED_SENTINEL = Arrays.asList();
```
nit: maybe ImmutableList.of() would make it clearer that this isn't to be modified.
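If that suggestion were adopted, the declaration might look like the sketch below (assuming Guava's ImmutableList is available, as it is elsewhere in Beam):

```java
import java.util.List;
import com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch of the reviewer's suggestion: an empty ImmutableList makes it
// explicit that the closed-state sentinel is never mutated; identity
// comparison (==) against the sentinel still works as before.
private static final List<ConsumerRecord<byte[], byte[]>> CLOSED_SENTINEL = ImmutableList.of();
```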
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ConcurrentConsumer<K, V> implements AutoCloseable {
```
An overview comment would be helpful
Agreed, I've been putting off the documentation since I had a few variations of this change running in tandem.
```java
final Supplier<Metric> metric =
    this.partitionRecordsLag.getOrDefault(topicPartition, this.recordsLagMax);
try {
  return ((Number) metric.get().metricValue()).longValue();
```
Handle null explicitly instead of relying on an exception?
Agreed.
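A minimal sketch of the null-explicit variant, reusing the field names from the snippet above (the 0L fallback when no lag has been reported yet is an assumption):

```java
import com.google.common.base.Supplier;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.TopicPartition;

// Sketch: check each step for null instead of letting a
// NullPointerException or ClassCastException escape when the metric
// hasn't been registered or reported yet.
long recordsLag(final TopicPartition topicPartition) {
  final Supplier<Metric> supplier =
      this.partitionRecordsLag.getOrDefault(topicPartition, this.recordsLagMax);
  final Metric metric = supplier == null ? null : supplier.get();
  final Object value = metric == null ? null : metric.metricValue();
  if (value instanceof Number) {
    return ((Number) value).longValue();
  }
  return 0L; // Assumed fallback until the client reports lag for this partition.
}
```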
```java
}

@Nullable
OffsetAndTimestamp initialOffsetForTime(final TopicPartition topicPartition, final long time) {
```
What are the units for time? Could an Instant be used here and converted to a long internally?
It's in milliseconds; the types requested by the ConcurrentConsumer wrapper match the client library where possible: https://kafka.apache.org/38/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes(java.util.Map)
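For reference, the underlying client call takes epoch milliseconds; a small sketch of what the wrapper presumably delegates to (the helper name and variable names are illustrative, not from the PR):

```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// Sketch: offsetsForTimes expects epoch milliseconds and may map a
// partition to null when no record at or after the timestamp exists,
// which matches the @Nullable return above.
static OffsetAndTimestamp offsetForTime(
    final Consumer<byte[], byte[]> consumer,
    final TopicPartition topicPartition,
    final long epochMillis) {
  final Map<TopicPartition, OffsetAndTimestamp> offsets =
      consumer.offsetsForTimes(Collections.singletonMap(topicPartition, epochMillis));
  return offsets.get(topicPartition); // may be null
}
```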
```java
this.partitionRecordsLag.computeIfAbsent(
    topicPartition,
    k ->
        Suppliers.memoize(
```
Similar concern here: what if metrics aren't updated, or the metric is missing on the original call?
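One possible shape for addressing this, sketched under the assumption that a missing metric can simply be re-resolved on a later call rather than memoized forever (lookupLagMetric is a hypothetical stand-in for however the metric is located):

```java
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.TopicPartition;

// Sketch: only cache the supplier once the metric actually exists, so a
// metric that is absent on the original call gets retried next time
// instead of a null result being memoized permanently.
private Supplier<Metric> lagMetricFor(final TopicPartition topicPartition) {
  final Supplier<Metric> cached = this.partitionRecordsLag.get(topicPartition);
  if (cached != null) {
    return cached;
  }
  final Metric resolved = lookupLagMetric(topicPartition); // hypothetical lookup
  if (resolved == null) {
    return this.recordsLagMax; // fall back until the per-partition metric appears
  }
  final Supplier<Metric> memoized = Suppliers.ofInstance(resolved);
  this.partitionRecordsLag.putIfAbsent(topicPartition, memoized);
  return memoized;
}
```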
```java
private final KafkaSourceDescriptor sourceDescriptor;
private final LoadingCache<Optional<ImmutableSet<String>>, ConcurrentConsumer<byte[], byte[]>>
    consumerExecutionContextCache;
private @MonotonicNonNull ConcurrentConsumer<byte[], byte[]> consumerExecutionContextInstance;
```
This doesn't appear to be used to avoid lookups in the cache. Should it be removed or should the logic be changed?
The context cache stores values as weak references, so this field is mainly there to retain a strong reference to a healthy context. The intent is for a context to become eligible for eviction from the cache only once the last remaining bundle processor referring to this ReadFromKafkaDoFn instance has been evicted and eventually collected; that avoids the unnecessary evictions a time- or size-based eviction policy would cause. The use of this field is a mess though, so I'll clean that up. 👍
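A sketch of the pattern being described, with the key and value types simplified for illustration (ConsumerContext is a hypothetical stand-in for ConcurrentConsumer):

```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

// Sketch: values are held weakly, so a cache entry becomes collectable as
// soon as nothing else strongly references its context. The `pinned` field
// keeps the context this instance uses alive until the instance itself is
// collected, instead of relying on a time- or size-based eviction policy.
final class ContextHolder {
  private final LoadingCache<String, ConsumerContext> contextCache =
      CacheBuilder.newBuilder()
          .weakValues()
          .build(CacheLoader.from(ConsumerContext::new)); // hypothetical factory

  private ConsumerContext pinned; // strong reference retained by this instance

  ConsumerContext contextFor(String key) {
    ConsumerContext context = contextCache.getUnchecked(key);
    pinned = context; // pin so the weak-valued entry survives while in use
    return context;
  }

  static final class ConsumerContext {
    ConsumerContext(String key) {} // stand-in for the real context type
  }
}
```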
```java
public long estimate() {
  return memoizedBacklog.get();
  final long position =
      this.consumerExecutionContextInstance.position(this.sourceDescriptor.getTopicPartition());
```
use consumerExecutionContext instead of this.consumerExecutionContext
Allows a Kafka consumer to be used for bundled assignments. By decoupling consumers from splits and running them in separate threads, the harness thread should be blocked less by the creation of network connections or outstanding polls. The consumer thread may prefetch a batch of records while the harness thread is processing the current record batch. Multiplexing assigned TopicPartitions onto a single consumer may improve utilization of the network connection. A follow-up PR may introduce consumer pools for cases where a single consumer would become a bottleneck.

These changes to KafkaIO's SDF should meet or exceed the throughput performance of the unbounded source implementation. Attached are the throughput and backlog bytes graphs: shown on the left is KafkaIO's unbounded source on Dataflow (legacy), and on the right is KafkaIO's SDF with these changes on Dataflow (portability). The input is produced in GCP by an n2d-standard-16 machine at a rate of ~110 MiB/s to a topic with 500 partitions in a cluster hosted on Google Cloud Managed Service for Apache Kafka. The pipelines each use a pool of up to 8 n2d-standard-2 machines, and the pipeline on the left was intentionally configured to not quite catch up with its backlog. It's possible that applying these changes to the unbounded source implementation would yield a slight throughput uplift there as well.
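Conceptually, the decoupling works roughly like the sketch below. This is an illustration, not the PR's implementation: the PrefetchingReader name, the queue depth of 4, and the 1-second poll timeout are all made up for the example.

```java
import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

// Sketch: one dedicated thread owns the (non-thread-safe) KafkaConsumer and
// prefetches batches into a bounded queue; harness threads only take
// completed batches, so they aren't blocked by connection setup or
// in-flight polls, and the next batch is fetched while the current one is
// being processed.
final class PrefetchingReader implements AutoCloseable {
  private final Consumer<byte[], byte[]> consumer;
  private final BlockingQueue<ConsumerRecords<byte[], byte[]>> batches =
      new ArrayBlockingQueue<>(4); // bounded so prefetching can't grow without limit
  private final ExecutorService consumerThread = Executors.newSingleThreadExecutor();
  private volatile boolean closed;

  PrefetchingReader(Consumer<byte[], byte[]> consumer) {
    this.consumer = consumer;
    consumerThread.execute(this::pollLoop);
  }

  private void pollLoop() {
    try {
      while (!closed) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
        if (!records.isEmpty()) {
          batches.put(records); // applies backpressure when the harness falls behind
        }
      }
    } catch (WakeupException | InterruptedException e) {
      // close() was called; fall through and release the consumer.
    } finally {
      consumer.close();
    }
  }

  /** Called from the harness thread; blocks only until a prefetched batch is ready. */
  ConsumerRecords<byte[], byte[]> nextBatch() throws InterruptedException {
    return batches.take();
  }

  @Override
  public void close() {
    closed = true;
    consumer.wakeup(); // unblocks a poll() in progress on the consumer thread
    consumerThread.shutdownNow(); // unblocks a put() waiting on a full queue
  }
}
```

Multiplexing several assigned TopicPartitions onto one such consumer would then amortize connection setup across partitions, at the cost of the thread-safe handoff the snippets above implement.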