Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka replay speed: fall back to franz-go for ongoing fetching #9500

Conversation

dimitarvdimitrov
Copy link
Contributor

What this PR does

Striking the right configuration for ongoing fetching depends a lot on the characteristics of cluster. franz-go is better at adaptive concurrency, so we fall back to it until we've implemented adaptive concurrency and/or records-per-fetch.

Which issue(s) this PR fixes or relates to

Fixes #

Checklist

  • Tests updated.
  • Documentation added.
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX].
  • about-versioning.md updated with experimental features.

@dimitarvdimitrov dimitarvdimitrov requested review from tacole02 and a team as code owners October 2, 2024 15:14
@dimitarvdimitrov dimitarvdimitrov force-pushed the dimitar/ingest/replay-speed/use-franz-go-for-ongoing-fetching branch from f9627dd to bd2a078 Compare October 2, 2024 15:17
@@ -1440,6 +1440,125 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) {
})
}
})

t.Run("should read target lag and then consume more records after switching to 0 ongoing concurrency if position=start, startup_fetch_concurrency=2, ongoing_fetch_concurrency=0", func(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is effectively a copy of another test case above but with configured startup and ongoing concurrency

	t.Run("should consume partition from start if position=start, and wait until target lag is honored, and then consume some records after lag is honored", func(t *testing.T) {

@dimitarvdimitrov dimitarvdimitrov force-pushed the dimitar/ingest/replay-speed/use-franz-go-for-ongoing-fetching branch from bd2a078 to d8dcfc7 Compare October 2, 2024 15:39
pkg/storage/ingest/reader.go Outdated Show resolved Hide resolved
@dimitarvdimitrov dimitarvdimitrov enabled auto-merge (squash) October 2, 2024 16:38
@dimitarvdimitrov dimitarvdimitrov enabled auto-merge (squash) October 2, 2024 16:39
Striking the right configuration for ongoing fetching depends a lot on the characteristics of cluster. franz-go is better at adaptive concurrency, so we fall back to it until we've implemented adaptive concurrency and/or records-per-fetch.

Signed-off-by: Dimitar Dimitrov <[email protected]>
Signed-off-by: Dimitar Dimitrov <[email protected]>
@dimitarvdimitrov dimitarvdimitrov force-pushed the dimitar/ingest/replay-speed/use-franz-go-for-ongoing-fetching branch from de335b8 to e376f6b Compare October 3, 2024 07:56
@dimitarvdimitrov dimitarvdimitrov merged commit 3ea4cf4 into main Oct 3, 2024
29 checks passed
@dimitarvdimitrov dimitarvdimitrov deleted the dimitar/ingest/replay-speed/use-franz-go-for-ongoing-fetching branch October 3, 2024 08:13
grafanabot pushed a commit that referenced this pull request Oct 3, 2024
* kafka replay speed: fall back to franz-go for ongoing fetching

Striking the right configuration for ongoing fetching depends a lot on the characteristics of cluster. franz-go is better at adaptive concurrency, so we fall back to it until we've implemented adaptive concurrency and/or records-per-fetch.

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Remove redundant panic

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
(cherry picked from commit 3ea4cf4)
dimitarvdimitrov added a commit that referenced this pull request Oct 3, 2024
#9510)

* kafka replay speed: fall back to franz-go for ongoing fetching

Striking the right configuration for ongoing fetching depends a lot on the characteristics of cluster. franz-go is better at adaptive concurrency, so we fall back to it until we've implemented adaptive concurrency and/or records-per-fetch.

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Remove redundant panic

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
(cherry picked from commit 3ea4cf4)

Co-authored-by: Dimitar Dimitrov <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants