kafka replay speed: concurrency fetching improvements (#9389)
* fetched records include timestamps

Signed-off-by: Dimitar Dimitrov <[email protected]>

* try with defaultMinBytesWaitTime=3s

Signed-off-by: Dimitar Dimitrov <[email protected]>

* add fetch_min_bytes_max_wait

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Don't block on sending to the channel

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Remove wait for when we're fetching from the end

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix bug with blocking on fetch

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Slightly easier to follow lifecycle of previousResult

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Correct merging of results

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Avoid double-logging events

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Revert "add fetch_min_bytes_max_wait"

This reverts commit 6197d4b.

* Increase MinBytesWaitTime to 5s

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Add comment about warpstream and MinBytes

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Address review comments

Signed-off-by: gotjosh <[email protected]>

* Add tests for concurrentFetchers

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix bugs in tracking lastReturnedRecord

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Renamed method

Signed-off-by: gotjosh <[email protected]>

* use the older context

Signed-off-by: gotjosh <[email protected]>

* Name variable correct variable name

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Reduce MaxWaitTime in PartitionReader tests

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Change test createConcurrentFetchers signature

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Sort imports

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
Signed-off-by: gotjosh <[email protected]>
Co-authored-by: gotjosh <[email protected]>
dimitarvdimitrov and gotjosh authored Sep 26, 2024
1 parent 41559b1 commit 861563d
Showing 3 changed files with 334 additions and 43 deletions.
72 changes: 51 additions & 21 deletions pkg/storage/ingest/reader.go
@@ -38,9 +38,10 @@ const (
// kafkaOffsetEnd is a special offset value that means the end of the partition.
kafkaOffsetEnd = int64(-1)

// defaultMinBytesWaitTime is the time the Kafka broker can wait for MinBytes to be filled.
// defaultMinBytesMaxWaitTime is the time the Kafka broker can wait for MinBytes to be filled.
// This is usually used when there aren't enough records available to fulfil MinBytes, so the broker waits for more records to be produced.
defaultMinBytesWaitTime = 10 * time.Second
// Warpstream clamps this between 5s and 30s.
defaultMinBytesMaxWaitTime = 5 * time.Second
)

var (
@@ -119,7 +120,7 @@ func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID stri
consumerGroup: kafkaCfg.GetConsumerGroup(instanceID, partitionID),
metrics: newReaderMetrics(partitionID, reg),
consumedOffsetWatcher: newPartitionOffsetWatcher(),
concurrentFetchersMinBytesMaxWaitTime: defaultMinBytesWaitTime,
concurrentFetchersMinBytesMaxWaitTime: defaultMinBytesMaxWaitTime,
logger: log.With(logger, "partition", partitionID),
reg: reg,
}
@@ -736,7 +737,7 @@ func fetchWantFrom(offset int64, recordsPerFetch int) fetchWant {
return fetchWant{
startOffset: offset,
endOffset: offset + int64(recordsPerFetch),
result: make(chan fetchResult, 1), // buffer of 1 so we can do secondary attempt requests in the background
result: make(chan fetchResult),
}
}

@@ -819,6 +820,11 @@ func (fr *fetchResult) logCompletedFetch(fetchStartTime time.Time, w fetchWant)
default:
logger = level.Error(logger)
}
var firstTimestamp, lastTimestamp string
if gotRecords > 0 {
firstTimestamp = fr.Records[0].Timestamp.String()
lastTimestamp = fr.Records[gotRecords-1].Timestamp.String()
}
logger.Log(
"msg", msg,
"duration", time.Since(fetchStartTime),
@@ -830,6 +836,8 @@ func (fr *fetchResult) logCompletedFetch(fetchStartTime time.Time, w fetchWant)
"asked_bytes", w.MaxBytes(),
"got_bytes", fr.fetchedBytes,
"diff_bytes", int(w.MaxBytes())-fr.fetchedBytes,
"first_timestamp", firstTimestamp,
"last_timestamp", lastTimestamp,
"hwm", fr.HighWatermark,
"lso", fr.LogStartOffset,
"err", fr.Err,
@@ -841,9 +849,26 @@ func (fr *fetchResult) startWaitingForConsumption() {
}

func (fr *fetchResult) finishWaitingForConsumption() {
if fr.waitingToBePickedUpFromOrderedFetchesSpan == nil {
fr.waitingToBePickedUpFromOrderedFetchesSpan, fr.ctx = opentracing.StartSpanFromContext(fr.ctx, "fetchResult.noWaitingForConsumption")
}
fr.waitingToBePickedUpFromOrderedFetchesSpan.Finish()
}

// Merge merges fr with an older fetchResult. The merged result keeps most of the fields of fr and assumes they are more up to date than older's.
func (fr *fetchResult) Merge(older fetchResult) fetchResult {
if older.ctx != nil {
level.Debug(spanlogger.FromContext(older.ctx, log.NewNopLogger())).Log("msg", "merged fetch result with the next result")
}

// older.Records are older than fr.Records, so we append them first.
fr.Records = append(older.Records, fr.Records...)

// We ignore HighWatermark, LogStartOffset, LastStableOffset because this result should be more up to date.
fr.fetchedBytes += older.fetchedBytes
return *fr
}
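
A standalone sketch (not part of this commit) of the merge semantics above, using simplified stand-in types: records from the older result come first, fetched byte counts accumulate, and the newer result's metadata (high watermark, offsets) wins.

package main

import "fmt"

// result loosely mirrors fetchResult for illustration; these are not the real Mimir types.
type result struct {
	records       []int64 // record offsets, in order
	fetchedBytes  int
	highWatermark int64
}

// merge keeps r's metadata (e.g. highWatermark) but prepends the older records,
// matching the ordering in fetchResult.Merge above.
func (r result) merge(older result) result {
	r.records = append(older.records, r.records...)
	r.fetchedBytes += older.fetchedBytes
	return r
}

func main() {
	older := result{records: []int64{10, 11}, fetchedBytes: 100, highWatermark: 50}
	newer := result{records: []int64{12, 13}, fetchedBytes: 80, highWatermark: 60}
	merged := newer.merge(older)
	fmt.Println(merged.records)       // [10 11 12 13] — oldest first
	fmt.Println(merged.fetchedBytes)  // 180
	fmt.Println(merged.highWatermark) // 60 — taken from the newer result
}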

func newEmptyFetchResult(ctx context.Context, err error) fetchResult {
return fetchResult{
ctx: ctx,
@@ -944,8 +969,10 @@ func (r *concurrentFetchers) pollFetches(ctx context.Context) (kgo.Fetches, cont
firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord)
r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime)

r.lastReturnedRecord = f.Records[len(f.Records)-1].Offset
f.Records = f.Records[firstUnreturnedRecordIdx:]
if len(f.Records) > 0 {
r.lastReturnedRecord = f.Records[len(f.Records)-1].Offset
}

return kgo.Fetches{{
Topics: []kgo.FetchTopic{
@@ -964,7 +991,7 @@ func recordIndexAfterOffset(records []*kgo.Record, offset int64) int {
return i
}
}
return len(records) - 1
return len(records)
}
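
The return value change above matters when every fetched record has already been returned: returning len(records) (instead of len(records)-1) makes the slice in pollFetches empty rather than re-emitting the last record, and the new length guard in pollFetches keeps lastReturnedRecord from indexing an empty slice. A standalone sketch of the fixed behaviour (simplified types, not part of this commit):

package main

import "fmt"

type rec struct{ Offset int64 }

// indexAfterOffset mirrors the fixed logic above: when no record has an offset
// greater than `offset`, it returns len(records) so that records[idx:] is empty.
func indexAfterOffset(records []rec, offset int64) int {
	for i, r := range records {
		if r.Offset > offset {
			return i
		}
	}
	return len(records)
}

func main() {
	records := []rec{{10}, {11}, {12}}

	// Everything up to offset 12 was already returned to the consumer.
	idx := indexAfterOffset(records, 12)
	fmt.Println(records[idx:]) // [] — the old "len(records)-1" would re-emit offset 12

	// Offset 10 was returned; 11 and 12 are still new.
	idx = indexAfterOffset(records, 10)
	fmt.Println(records[idx:]) // [{11} {12}]
}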

func (r *concurrentFetchers) recordOrderedFetchTelemetry(f fetchResult, firstReturnedRecordIndex int, waitStartTime time.Time) {
@@ -1014,7 +1041,7 @@ func (r *concurrentFetchers) fetchSingle(ctx context.Context, fw fetchWant) (fr

func (r *concurrentFetchers) buildFetchRequest(fw fetchWant, leaderEpoch int32) kmsg.FetchRequest {
req := kmsg.NewFetchRequest()
req.MinBytes = 1
req.MinBytes = 1 // Warpstream ignores this field. This means that the WaitTime below is always waited and MaxBytes plays a bigger role in how fast Warpstream responds.
req.Version = 13
req.MaxWaitMillis = int32(r.minBytesWaitTime / time.Millisecond)
req.MaxBytes = fw.MaxBytes()
@@ -1110,27 +1137,22 @@ func (r *concurrentFetchers) runFetcher(ctx context.Context, fetchersWg *sync.Wa
MaxRetries: 0, // retry forever
})

// more aggressive backoff when we're waiting for records to be produced.
// It's likely there's already some records produced by the time we get back the response and send another request.
newRecordsProducedBackoff := backoff.New(ctx, backoff.Config{
MinBackoff: 10 * time.Millisecond,
MaxBackoff: time.Second,
MaxRetries: 0, // retry forever
})

for w := range wants {
// Start new span for each fetchWant. We want to record the lifecycle of a single record from being fetched to being ingested.
wantSpan, ctx := spanlogger.NewWithLogger(ctx, logger, "concurrentFetcher.fetch")
wantSpan.SetTag("start_offset", w.startOffset)
wantSpan.SetTag("end_offset", w.endOffset)

var previousResult fetchResult
for attempt := 0; errBackoff.Ongoing() && w.endOffset > w.startOffset; attempt++ {
attemptSpan, ctx := spanlogger.NewWithLogger(ctx, wantSpan, "concurrentFetcher.fetch.attempt")
attemptSpan, ctx := spanlogger.NewWithLogger(ctx, logger, "concurrentFetcher.fetch.attempt")
attemptSpan.SetTag("attempt", attempt)

f := r.fetchSingle(ctx, w)
f = f.Merge(previousResult)
previousResult = f
if f.Err != nil {
w = handleKafkaFetchErr(f.Err, w, errBackoff, newRecordsProducedBackoff, r.startOffsets, r.client, attemptSpan)
w = handleKafkaFetchErr(f.Err, w, errBackoff, r.startOffsets, r.client, attemptSpan)
}
if len(f.Records) == 0 {
// Typically if we had an error, then there wouldn't be any records.
@@ -1144,12 +1166,21 @@ func (r *concurrentFetchers) runFetcher(ctx context.Context, fetchersWg *sync.Wa
// We reset the backoff if we received any records whatsoever. A received record means _some_ success.
// We don't want to slow down until we hit a larger error.
errBackoff.Reset()
newRecordsProducedBackoff.Reset()

f.startWaitingForConsumption()
select {
case w.result <- f:
previousResult = fetchResult{}
case <-ctx.Done():
default:
if w.startOffset >= w.endOffset {
// We've fetched all we were asked for; the whole batch is ready, and we definitely have to wait to send on the channel now.
f.startWaitingForConsumption()
select {
case w.result <- f:
previousResult = fetchResult{}
case <-ctx.Done():
}
}
}
attemptSpan.Finish()
}
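
A standalone sketch (not part of this commit) of the delivery strategy above, now that fetchWant.result is unbuffered: a partial result is offered to the consumer without blocking; if nobody is ready, the fetcher keeps fetching and merging, and only blocks on the send once the whole want has been satisfied. Names and types here are illustrative.

package main

import (
	"context"
	"fmt"
	"time"
)

// tryDeliver offers batch to out without blocking. If the consumer isn't ready and the
// batch is still incomplete, it returns false so the caller can fetch and merge more
// records; once the batch is complete it blocks until the consumer takes it (or ctx ends).
func tryDeliver(ctx context.Context, out chan []int64, batch []int64, complete bool) bool {
	select {
	case out <- batch:
		return true
	case <-ctx.Done():
		return false
	default:
		if !complete {
			return false
		}
		select {
		case out <- batch:
			return true
		case <-ctx.Done():
			return false
		}
	}
}

func main() {
	ctx := context.Background()
	out := make(chan []int64) // unbuffered, like fetchWant.result after this change

	go func() {
		time.Sleep(50 * time.Millisecond)
		fmt.Println("consumed:", <-out)
	}()

	fmt.Println("partial sent:", tryDeliver(ctx, out, []int64{10, 11}, false))     // false: consumer not ready yet
	fmt.Println("complete sent:", tryDeliver(ctx, out, []int64{10, 11, 12}, true)) // true: blocks until consumed
	time.Sleep(100 * time.Millisecond)
}
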
Expand Down Expand Up @@ -1233,7 +1264,7 @@ type metadataRefresher interface {
// handleKafkaFetchErr handles all the errors listed in the franz-go documentation as possible errors when fetching records.
// For most of them we just apply a backoff. They are listed here so we can be explicit in what we're handling and how.
// It may also return an adjusted fetchWant in case the error indicated we were consuming not-yet-produced records or records already deleted due to retention.
func handleKafkaFetchErr(err error, fw fetchWant, shortBackoff, longBackoff waiter, partitionStartOffset *genericOffsetReader[int64], refresher metadataRefresher, logger log.Logger) fetchWant {
func handleKafkaFetchErr(err error, fw fetchWant, longBackoff waiter, partitionStartOffset *genericOffsetReader[int64], refresher metadataRefresher, logger log.Logger) fetchWant {
// Typically franz-go will update its own metadata when it detects a change in brokers. But it's hard to verify this.
// So we force a metadata refresh here to be sure.
// It's ok to call this from multiple fetchers concurrently. franz-go will only be sending one metadata request at a time (whether automatic, periodic, or forced).
@@ -1269,7 +1300,6 @@ func handleKafkaFetchErr(err error, fw fetchWant, shortBackoff, longBackoff wait
// We set a MaxWaitMillis on fetch requests, but even then there may be no records for some time.
// Wait for a short time to allow the broker to catch up or for new records to be produced.
level.Debug(logger).Log("msg", "offset out of range; waiting for new records to be produced")
shortBackoff.Wait()
}
case errors.Is(err, kerr.TopicAuthorizationFailed):
longBackoff.Wait()
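
An illustrative reconstruction (not the actual Mimir code, which is not shown in this diff) of the idea in handleKafkaFetchErr's doc comment: on OffsetOutOfRange, move the requested start offset forward to the partition's start if records were deleted by retention; if instead we're ahead of what has been produced, keep the want as-is — the fetch request's MaxWaitMillis already gives the broker time to accumulate new records, which is why this commit drops the extra short backoff.

package main

import (
	"errors"
	"fmt"

	"github.com/twmb/franz-go/pkg/kerr"
)

// adjustWant is a simplified sketch of the adjustment described above.
func adjustWant(err error, startOffset, partitionStartOffset int64) int64 {
	if errors.Is(err, kerr.OffsetOutOfRange) && startOffset < partitionStartOffset {
		// Records were deleted due to retention; resume from what the partition can serve.
		return partitionStartOffset
	}
	return startOffset
}

func main() {
	// Records 0-39 were removed by retention; we asked to start at 5.
	fmt.Println(adjustWant(kerr.OffsetOutOfRange, 5, 40)) // 40
	// We're ahead of the partition; nothing to adjust, just retry the fetch.
	fmt.Println(adjustWant(kerr.OffsetOutOfRange, 120, 40)) // 120
}
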
4 changes: 1 addition & 3 deletions pkg/storage/ingest/reader_client.go
@@ -3,8 +3,6 @@
package ingest

import (
"time"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@@ -20,7 +18,7 @@ func NewKafkaReaderClient(cfg KafkaConfig, metrics *kprom.Metrics, logger log.Lo
opts = append(opts,
kgo.FetchMinBytes(1),
kgo.FetchMaxBytes(fetchMaxBytes), // these are unused by concurrent fetchers
kgo.FetchMaxWait(5*time.Second),
kgo.FetchMaxWait(defaultMinBytesMaxWaitTime),
kgo.FetchMaxPartitionBytes(50_000_000), // these are unused by concurrent fetchers

// BrokerMaxReadBytes sets the maximum response size that can be read from
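
For context on the FetchMaxWait change: in franz-go this option caps how long the broker may hold a fetch open waiting for FetchMinBytes to accumulate, mirroring MaxWaitMillis in the hand-built fetch request used by the concurrent fetchers. A minimal client-construction sketch showing these options together (broker address, topic, and byte limits are placeholders, not from this commit):

package main

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),  // placeholder broker
		kgo.ConsumeTopics("example-topic"), // placeholder topic
		kgo.FetchMinBytes(1),               // respond as soon as any data is available...
		kgo.FetchMaxWait(5*time.Second),    // ...but let the broker wait up to 5s for it
		kgo.FetchMaxBytes(100_000_000),     // illustrative cap on the total response size
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()
}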
