
Imbalanced kafka source actors' throughput when running nexmark benchmark #5214

Open
Tracked by #7289
lmatz opened this issue Sep 8, 2022 · 21 comments
Assignees
Labels
component/connector component/streaming Stream processing related issue. type/enhancement Improvements to existing implementation. type/perf

Comments

@lmatz
Contributor

lmatz commented Sep 8, 2022

This is a simple Nexmark q10 query with parallelism 12 on a single compute node with 16 CPUs.

The number of records in each Kafka partition:
[screenshot: record count per Kafka partition]

The throughput of each source connector:
[screenshot: throughput of each source connector]

If the source connectors' speeds do not match each other,
and the job uses event time with watermarks,
then the job will probably throw away more events than usual,
because one partition may make the watermark progress quickly
and thus other partitions' events get discarded more aggressively.

Some random ideas:
throttle some source actors by considering both the current throughput and the value of the event timestamp (a rough sketch follows below).
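A rough sketch of that idea in Rust (all names and the threshold are hypothetical; this is not actual RisingWave code): track the newest event timestamp emitted per partition and pause any partition that runs too far ahead in event time of the slowest one, so a fast partition cannot push the watermark past events that slower partitions have yet to emit.

```rust
/// Hypothetical per-partition progress; not actual RisingWave code.
struct PartitionProgress {
    /// Largest event timestamp (in ms) emitted by this partition so far.
    max_event_ts: i64,
}

/// Decide whether partition `i` should be throttled: if its event time runs
/// more than `max_skew_ms` ahead of the slowest partition, pausing it keeps
/// the watermark from discarding the slower partitions' events.
fn should_throttle(partitions: &[PartitionProgress], i: usize, max_skew_ms: i64) -> bool {
    let slowest = partitions
        .iter()
        .map(|p| p.max_event_ts)
        .min()
        .expect("at least one partition");
    partitions[i].max_event_ts - slowest > max_skew_ms
}
```

The current throughput could additionally feed into the choice of `max_skew_ms`, e.g. tightening the skew bound when one actor's rows/s is far above the others.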

@github-actions github-actions bot added this to the release-0.1.13 milestone Sep 8, 2022
@lmatz lmatz removed this from the release-0.1.13 milestone Sep 8, 2022
@lmatz lmatz added type/enhancement Improvements to existing implementation. component/streaming Stream processing related issue. component/connector type/perf labels Sep 8, 2022
@BugenZhao
Member

Can the throughput be synchronized automatically through backpressure? Note that the current bound of the connector message buffer, 512 chunks, is too large.

const CONNECTOR_MESSAGE_BUFFER_SIZE: usize = 512;
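For context, the bounded buffer is what turns downstream slowness into backpressure: once it is full, the reader's send awaits until the consumer drains a slot, so a bound as large as 512 chunks lets a fast partition run far ahead before it is slowed down. A minimal sketch of that mechanism, assuming a tokio bounded mpsc channel (illustrative only, not the actual connector code):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // With a bound of 512, the reader can buffer 512 chunks before feeling
    // any backpressure; a smaller bound couples it more tightly to the
    // downstream consumption speed.
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(512);

    tokio::spawn(async move {
        loop {
            let chunk = vec![0u8; 1024]; // stand-in for a fetched message chunk
            // `send` completes only when a slot is free, which is how
            // backpressure propagates back to the reader.
            if tx.send(chunk).await.is_err() {
                break; // receiver dropped
            }
        }
    });

    // Downstream consumer; in the real pipeline this is the source executor.
    while let Some(_chunk) = rx.recv().await {
        break; // keep the example finite
    }
}
```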

@lmatz
Contributor Author

lmatz commented Sep 8, 2022

Some more context
The plan is:

StreamMaterialize { columns: [auction, bidder, price, date_time, date, time, _row_id(hidden)], pk_columns: [_row_id] }
   StreamExchange { dist: HashShard(_row_id) }
     StreamProject { exprs: [Field(bid, 0:Int32), Field(bid, 1:Int32), Field(bid, 2:Int32), Field(bid, 5:Int32), ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar), ToChar(Field(bid, 5:Int32), 'HH:MI':Varchar), _row_id] }
       StreamSource { source: "nexmark_source", columns: [event_type, person, auction, bid, _row_id] }

But both StreamMaterialize and StreamExchange do nothing in this case, as their data-chunk processing code is commented out.

Every actor (with its corresponding source) should therefore run on its own.

@fuyufjh
Member

fuyufjh commented Sep 8, 2022

throttle some source actors by considering both the current throughput and the value of the event timestamp.

Should it be applied to NexMark source only or all kinds of sources?

@lmatz
Contributor Author

lmatz commented Sep 8, 2022

throttle some source actors by considering both the current throughput and the value of the event timestamp.

Should it be applied to NexMark source only or all kinds of sources?

Currently this does not block benchmarking with Nexmark sources, since the total throughput aggregated over all source actors is normal.
I feel it should be applied to all kinds of sources as long as the job involves watermarks.

@lmatz
Contributor Author

lmatz commented Sep 9, 2022

Another example:
[screenshot: per-actor source throughput from another run]

Whether an actor's throughput is high or low, it stays quite stable.

@github-actions
Contributor

github-actions bot commented Nov 9, 2022

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.

@BugenZhao
Member

Will the situation be better after #6170? I'd like to try it out.

@BugenZhao
Member

The throughputs are at least quite balanced when 3 nodes are running on the same host. 🤔

[screenshot: balanced per-actor source throughput with 3 nodes on one host]

@fuyufjh fuyufjh added this to the release-0.1.14 milestone Nov 9, 2022
@lmatz
Contributor Author

lmatz commented Nov 9, 2022

The setting I tested is a single-node setting, so there should be no remote exchange, only local exchange.
I am not sure whether #6170 should be expected to solve this.

@BugenZhao
Member

The setting I tested is a single-node setting

That's weird. 😕

@lmatz
Contributor Author

lmatz commented Nov 9, 2022

The setting I tested is a single-node setting

That's weird. 😕

No worries, let me track this issue manually once we have the performance dashboard.

So let's push this to the release where the performance dashboard is in place.

@lmatz lmatz modified the milestones: release-0.1.14, release-0.1.15 Nov 21, 2022
@lmatz lmatz removed this from the release-0.1.15 milestone Dec 19, 2022
mergify bot pushed a commit that referenced this issue Dec 26, 2022
Add a configurable parameter to limit the source throughput in terms of bytes.

This parameter may or may not be intended for public users at the moment (so it does not go into the docs or release notes this time).

It is convenient for testing resource utilization/performance at a fixed input throughput (https://redpanda.com/blog/redpanda-vs-kafka-performance-benchmark#:~:text=3.2%20under%20workloads%20(-,up%20to%201GB/sec,-)%20that%20are%20common).
Otherwise, people would have to generate events on the fly into Kafka/Redpanda at exactly the desired throughput, which may not be achievable and depends on many other factors.
Instead, we can pre-generate a large amount of data and then limit the source throughput to a stable rate.

Also, it may be a workaround for #5214.

Approved-By: tabVersion
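For illustration, a byte-based source limit can be modeled as a simple token bucket. The sketch below is hypothetical and not the code added by this commit, but it shows the intended behavior: refill a byte budget over time and sleep whenever a chunk would overdraw it.

```rust
use std::time::{Duration, Instant};

/// Hypothetical token-bucket limiter in bytes per second (illustrative only).
struct ByteRateLimiter {
    bytes_per_sec: f64,
    available: f64,
    last_refill: Instant,
}

impl ByteRateLimiter {
    fn new(bytes_per_sec: f64) -> Self {
        Self {
            bytes_per_sec,
            available: bytes_per_sec,
            last_refill: Instant::now(),
        }
    }

    /// Returns how long the caller should sleep before emitting `chunk_bytes`.
    fn acquire(&mut self, chunk_bytes: f64) -> Duration {
        let now = Instant::now();
        let refill = now.duration_since(self.last_refill).as_secs_f64() * self.bytes_per_sec;
        // Cap the burst at one second's worth of budget.
        self.available = (self.available + refill).min(self.bytes_per_sec);
        self.last_refill = now;
        self.available -= chunk_bytes;
        if self.available >= 0.0 {
            Duration::ZERO
        } else {
            // The budget went negative: wait until the refill covers the debt.
            Duration::from_secs_f64(-self.available / self.bytes_per_sec)
        }
    }
}
```

A source reader would call `acquire` with each chunk's size and sleep for the returned duration, which keeps the long-run rate at the configured bytes/sec regardless of how fast pre-generated data can be fetched.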
@BugenZhao
Member

Related: #8451 (comment)

@lmatz lmatz assigned BugenZhao and unassigned lmatz Mar 22, 2023
@lmatz
Contributor Author

lmatz commented Mar 23, 2023

[screenshot: imbalanced per-actor source throughput in a recent benchmark run]

This seems pretty easy to reproduce; it has happened almost every day recently.

@BugenZhao
Member

Per some discussions with @liurenjie1024 and @hzxa21, I tend to think this is an issue of Kafka (server or client, both are possible) when the workload is heavy. 😟

  • When testing with Nexmark datagen sources, I cannot observe any imbalance. That makes sense: if the actors do not interfere with each other, there is no natural way for their performance to become unbalanced.
  • There is also no Tokio I/O involved in this case, since there is no remote exchange, and rdkafka is driven by its own async poll interface.

I'll keep investigating if I get a new idea, but I guess we need some metrics from the Kafka cluster to make things clearer. 🤔

@lmatz
Contributor Author

lmatz commented Mar 27, 2023

The EBS bandwidth of the Kafka machine is likely limited.
[screenshots: Kafka machine disk/EBS bandwidth, peaking around 298 MB/s]

The maximum bandwidth of gp2 and the default bandwidth of gp3 are both smaller than the 298 MB/s shown in the figure.

As for why the throughput does not go up during the second half: probably there were some extra I/O credits at the beginning, which then have to be paid back. This is just a guess.
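For reference, assuming the standard EBS figures at the time: gp2 tops out at 250 MiB/s (≈ 262 MB/s) and gp3 defaults to 125 MiB/s (≈ 131 MB/s, provisionable up to 1,000 MiB/s), so a sustained 298 MB/s would exceed both unless extra gp3 throughput had been provisioned.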

@BugenZhao
Member

Yes. I also suspected that a bottleneck between Kafka and the source leads to this issue. But there is still the remaining question of why the performance across partitions becomes imbalanced when disk throughput is constrained. 😄 I guess it's caused by Kafka's internal implementation, which needs further investigation.

@lmatz
Contributor Author

lmatz commented Mar 27, 2023

But the weird thing is that, looking at the history of q2, it is less severe than for q0, even though q2 does more filtering than q0.

The good news is that q2 can achieve 1M rows/s when this phenomenon does not happen.

Also, the Kafka client may matter, since Flink does not seem to suffer from the same problem: it uses the native Java client, while rdkafka is an independent implementation.

@BugenZhao
Member

But the weird thing is that, looking at the history of q2, it is less severe than for q0, even though q2 does more filtering than q0.

I took a quick look and found that this happens randomly for all queries that reach a throughput of around 800k rows/s, like q0, q1, q3, q10, and q22. All the imbalances occur when the bytes per second is under 300 MB/s.

@shanicky
Contributor

Can we get the split distribution of each actor? Maybe we can see that information in the logs.

@BugenZhao
Member

Can we get the split distribution of each actor? Maybe we can see that information in the logs.

The splits are assigned equally. See the legends of "Source Throughput Per Partition". 🤔
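For reference, equal assignment here just means the Kafka partitions are spread evenly over the source actors, e.g. round-robin. A minimal sketch of such a policy (hypothetical names, not the actual split-assignment code):

```rust
use std::collections::HashMap;

/// Assign `num_partitions` Kafka partitions to `actor_ids` round-robin, so
/// every actor receives an (almost) equal number of splits.
fn assign_splits(actor_ids: &[u32], num_partitions: u32) -> HashMap<u32, Vec<u32>> {
    let mut assignment: HashMap<u32, Vec<u32>> = HashMap::new();
    for partition in 0..num_partitions {
        let actor = actor_ids[(partition as usize) % actor_ids.len()];
        assignment.entry(actor).or_default().push(partition);
    }
    assignment
}
```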

@BugenZhao BugenZhao changed the title from "Imbalanced source actors' throughput may cause trouble when there is watermark" to "Imbalanced kafka source actors' throughput when running nexmark benchmark" on Jun 13, 2024