
kafka replay speed: upstream push sharding #9454

Merged
merged 3 commits into main from dimitar/ingest/replay-speed/sharding-pushers on Sep 30, 2024

Conversation

@dimitarvdimitrov (Contributor) commented Sep 27, 2024

What this PR does

This is the third in a series of PRs to upstream the code for improving Kafka replay speed in the ingester.

In this PR we upstream push sharding. The basic idea is that instead of ingesting each Kafka record sequentially, we shard the timeseries within each record and ingest the shards in parallel.

I'm submitting the PR, but the work was done jointly by @gotjosh and me.

Basic breakdown

  1. pusherConsumer

    • This component receives records from Kafka and pushes them to storage.
    • It uses a separate goroutine for unmarshalling records while the current record is being pushed to storage.
    • After unmarshalling, each record is pushed to a parallelStoragePusher.
  2. parallelStoragePusher

    • Manages parallel pushing of WriteRequests to storage.
    • Tenant separation: It maintains a map of tenant IDs to parallelStorageShards. This ensures each tenant's data is processed independently and in parallel.
    • It also maintains a separate set of shards per write request source (Ruler, API), which doubles the total number of shards.
    • When closing, all shards are flushed because:
      a) It provides a complete unit (batch of records) that can be retried if necessary.
      b) It ensures all data from the batch is processed before moving to the next batch.
  3. parallelStorageShards

    • Handles sharding of series for parallel processing for a single (tenant, WriteRequest.Source) pair.
    • Sharding method: each series is assigned to a shard based on the hash of its labels. The hash function is the same one used by Prometheus TSDB's stripeSeries, to make the best use of its reduced lock contention (see the sketch after this list).
  4. batchingQueue

    • Manages batching and flushing of data for each shard.
    • Batching: Accumulates time series and metadata up to a specified batch size.
    • Flushing: When the batch size is reached or when explicitly closed, it pushes the batch to a channel for processing. The other end of that channel invokes the ingester pushing logic.
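
To make the fan-out concrete, below is a minimal, self-contained sketch of the sharding and batching idea. The names parallelStorageShards and batchingQueue are taken from this PR, but the code is not the actual pusher.go implementation: the Label, TimeSeries and PushFunc types, the FNV hash (standing in for the Prometheus stripeSeries hash), and the shard/buffer sizes are illustrative assumptions.

```go
package ingest

import (
	"context"
	"hash/fnv"
)

// Label and TimeSeries are simplified stand-ins for the real protobuf types.
type Label struct{ Name, Value string }
type TimeSeries struct{ Labels []Label } // samples, exemplars, etc. omitted

// PushFunc is a stand-in for the ingester push interface.
type PushFunc func(ctx context.Context, batch []TimeSeries) error

// batchingQueue accumulates series and flushes them to a channel once batchSize is reached.
type batchingQueue struct {
	batchSize int
	batch     []TimeSeries
	out       chan []TimeSeries
}

func newBatchingQueue(batchSize, buffer int) *batchingQueue {
	return &batchingQueue{batchSize: batchSize, out: make(chan []TimeSeries, buffer)}
}

func (q *batchingQueue) Add(ts TimeSeries) {
	q.batch = append(q.batch, ts)
	if len(q.batch) >= q.batchSize {
		q.flush()
	}
}

func (q *batchingQueue) flush() {
	if len(q.batch) > 0 {
		q.out <- q.batch
		q.batch = nil
	}
}

// Close flushes the remaining partial batch so the record batch forms a complete, retryable unit.
func (q *batchingQueue) Close() {
	q.flush()
	close(q.out)
}

// parallelStorageShards fans series out to numShards queues, each drained by a goroutine
// that pushes batches to storage.
type parallelStorageShards struct {
	queues []*batchingQueue
	errs   chan error
}

func newParallelStorageShards(ctx context.Context, push PushFunc, numShards, batchSize int) *parallelStorageShards {
	s := &parallelStorageShards{errs: make(chan error, numShards)}
	for i := 0; i < numShards; i++ {
		q := newBatchingQueue(batchSize, 4)
		s.queues = append(s.queues, q)
		go func(q *batchingQueue) {
			var firstErr error
			for batch := range q.out {
				if err := push(ctx, batch); err != nil && firstErr == nil {
					firstErr = err // errors surface only once the shards are closed
				}
			}
			s.errs <- firstErr
		}(q)
	}
	return s
}

// ShardSeries routes a series to a shard based on the hash of its labels.
func (s *parallelStorageShards) ShardSeries(ts TimeSeries) {
	h := fnv.New64a()
	for _, l := range ts.Labels {
		h.Write([]byte(l.Name))
		h.Write([]byte{0})
		h.Write([]byte(l.Value))
		h.Write([]byte{0})
	}
	s.queues[h.Sum64()%uint64(len(s.queues))].Add(ts)
}

// Close flushes every shard, waits for the push goroutines, and returns the first error seen.
func (s *parallelStorageShards) Close() error {
	for _, q := range s.queues {
		q.Close()
	}
	var firstErr error
	for range s.queues {
		if err := <-s.errs; err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
```

In this sketch, a consumer calls ShardSeries for every series in a record batch and then Close once the batch is fully enqueued, which is what makes the batch a complete, retryable unit.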

Error handling

Due to the concurrent nature of pushing, it is hard to abort fetching immediately. As a result, errors are surfaced with a delay, and we may keep ingesting for a little while before detecting the error.

The error handling has been extracted from pusherConsumer and implemented in the parallelStoragePusher and sequentialStoragePusher. This approach allows errors to be handled closer to their source. Instead of propagating lists of errors, only retriable critical errors that require aborting consumption are propagated. This simplifies error management and allows for more targeted retry strategies.
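
To illustrate the kind of classification this enables, here is a hedged sketch. It assumes push errors carry gRPC status codes and uses a hypothetical handlePushErr helper together with a go-kit logger; the actual pusher.go logic may differ.

```go
package ingest

import (
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// handlePushErr decides whether a push error must abort consumption.
// Client errors (bad samples, limits hit) are logged and swallowed close to where
// they happened; only server-side errors are returned, since retrying them can succeed.
func handlePushErr(logger log.Logger, err error) error {
	if err == nil {
		return nil
	}
	if st, ok := status.FromError(err); ok {
		switch st.Code() {
		case codes.InvalidArgument, codes.FailedPrecondition, codes.ResourceExhausted:
			// Retrying the same data would fail again, so log and keep consuming.
			level.Warn(logger).Log("msg", "failed to push to storage", "err", err)
			return nil
		}
	}
	// Server (or unknown) error: propagate so consumption can be aborted and the batch retried.
	return err
}
```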

Cosmetic Changes

In pusherConsumer.Consume, the two goroutines for unmarshaling and ingestion have been inlined into the Consume function. There shouldn't be any functional changes there.
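
For context, the pattern now inlined in Consume looks roughly like the sketch below. It assumes a simplified writeRequest type and a storagePusher interface with a PushToStorage method; the real signatures in pusher.go differ.

```go
package ingest

import "context"

// writeRequest and storagePusher are simplified stand-ins for the real request type
// and push interface; unmarshalWriteRequest is a placeholder for protobuf decoding.
type writeRequest struct{ data []byte }

func unmarshalWriteRequest(b []byte) (*writeRequest, error) { return &writeRequest{data: b}, nil }

type storagePusher interface {
	PushToStorage(ctx context.Context, req *writeRequest) error
}

// consume decodes the next record in a background goroutine while the main loop
// pushes the previously decoded request to storage, so decoding and pushing overlap.
func consume(ctx context.Context, records [][]byte, pusher storagePusher) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // unblocks the decoding goroutine if we return early

	type parsed struct {
		req *writeRequest
		err error
	}
	parsedCh := make(chan parsed, 1) // keeps decoding one record ahead of pushing

	go func() {
		defer close(parsedCh)
		for _, rec := range records {
			req, err := unmarshalWriteRequest(rec)
			select {
			case parsedCh <- parsed{req: req, err: err}:
			case <-ctx.Done():
				return
			}
		}
	}()

	for p := range parsedCh {
		if p.err != nil {
			return p.err
		}
		if err := pusher.PushToStorage(ctx, p.req); err != nil {
			return err
		}
	}
	return nil
}
```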

Metrics changes

We renamed cortex_ingest_storage_reader_processing_time_seconds to cortex_ingest_storage_reader_records_processing_time_seconds to be more consistent.

Which issue(s) this PR fixes or relates to

Fixes #

Checklist

  • Tests updated.
  • Documentation added.
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX].
  • about-versioning.md updated with experimental features.

Signed-off-by: Dimitar Dimitrov <[email protected]>
Co-authored-by: gotjosh <[email protected]>
Signed-off-by: Dimitar Dimitrov <[email protected]>
@dimitarvdimitrov dimitarvdimitrov requested review from tacole02 and a team as code owners September 27, 2024 16:19
@gotjosh (Contributor) left a comment:

LGTM

pkg/storage/ingest/pusher.go: 4 review threads (outdated, resolved)
@dimitarvdimitrov dimitarvdimitrov enabled auto-merge (squash) September 30, 2024 08:43
Co-Authored-By: Dimitar Dimitrov <[email protected]>
@dimitarvdimitrov dimitarvdimitrov merged commit 456bbfd into main Sep 30, 2024
29 checks passed
@dimitarvdimitrov dimitarvdimitrov deleted the dimitar/ingest/replay-speed/sharding-pushers branch September 30, 2024 16:29
@grafanabot (Contributor) commented:

The backport to r293 failed:

The process '/usr/bin/git' failed with exit code 1

To backport manually, run these commands in your terminal:

# Fetch latest updates from GitHub
git fetch
# Create a new branch
git switch --create backport-9454-to-r293 origin/r293
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x 456bbfde5745fcb711cee460d208c4e073aa5974
# Push it to GitHub
git push --set-upstream origin backport-9454-to-r293
git switch main
# Remove the local backport branch
git branch -D backport-9454-to-r293

Then, create a pull request where the base branch is r293 and the compare/head branch is backport-9454-to-r293.

grafanabot pushed a commit that referenced this pull request Sep 30, 2024
* kafka replay speed: upstream push sharding

Signed-off-by: Dimitar Dimitrov <[email protected]>
Co-authored-by: gotjosh <[email protected]>
Signed-off-by: Dimitar Dimitrov <[email protected]>

* Apply suggestions from code review

Co-authored-by: gotjosh <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
Co-authored-by: gotjosh <[email protected]>
(cherry picked from commit 456bbfd)
gotjosh pushed a commit that referenced this pull request Sep 30, 2024
* kafka replay speed: upstream push sharding

Signed-off-by: Dimitar Dimitrov <[email protected]>
Co-authored-by: gotjosh <[email protected]>
Signed-off-by: Dimitar Dimitrov <[email protected]>

* Apply suggestions from code review

Co-authored-by: gotjosh <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
Co-authored-by: gotjosh <[email protected]>
(cherry picked from commit 456bbfd)

Co-authored-by: Dimitar Dimitrov <[email protected]>