kafka replay speed: fix concurrent fetching concurrency transition (#9447)

* kafka replay speed: fix concurrent fetching concurrency transition

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Update pkg/storage/ingest/reader.go

* Make sure we evaluate r.lastReturnedRecord WHEN we return

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Redistribute wg.Add

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Add tests

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Remove defer causing data race

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Move defer to a different place

Signed-off-by: Dimitar Dimitrov <[email protected]>

* WIP

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Give more time to catch up with target_lag

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Clarify comment

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
dimitarvdimitrov authored Sep 27, 2024
1 parent e74927d commit 0cb28d6
Showing 5 changed files with 193 additions and 38 deletions.
6 changes: 4 additions & 2 deletions development/mimir-ingest-storage/config/mimir.yaml
@@ -15,8 +15,10 @@ ingest_storage:
address: kafka_1:9092
topic: mimir-ingest
last_produced_offset_poll_interval: 500ms
fetch_concurrency: 3
ingestion_concurrency: 8
startup_fetch_concurrency: 15
startup_records_per_fetch: 2400
ongoing_fetch_concurrency: 2
ongoing_records_per_fetch: 30

ingester:
track_ingester_owned_series: false # suppress log messages in c-61 about empty ring; doesn't affect testing
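
These options split the fetch settings into a startup phase and an ongoing phase: many large fetches to replay the backlog quickly, then few small fetches once the reader has caught up. A rough Go sketch of the shape this configuration takes (the Ongoing* field names appear in the reader.go diff below; the Startup* names and the struct itself are illustrative assumptions):

package sketch

// KafkaFetchConfig is an illustrative stand-in for the reader's Kafka config;
// only the Ongoing* field names are confirmed by the reader.go diff below.
type KafkaFetchConfig struct {
	StartupFetchConcurrency int // e.g. 15: many concurrent fetchers while replaying the backlog
	StartupRecordsPerFetch  int // e.g. 2400: large fetches maximize replay throughput
	OngoingFetchConcurrency int // e.g. 2: few fetchers once caught up
	OngoingRecordsPerFetch  int // e.g. 30: small fetches keep end-to-end latency low
}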
8 changes: 5 additions & 3 deletions development/mimir-ingest-storage/docker-compose.jsonnet
@@ -48,12 +48,14 @@ std.manifestYamlDoc({
'-ingester.ring.instance-id=ingester-zone-c-61',
'-ingester.partition-ring.prefix=exclusive-prefix',
'-ingester.ring.prefix=exclusive-prefix',
'-ingest-storage.kafka.consume-from-position-at-startup=start',
'-ingest-storage.kafka.consume-from-position-at-startup=end',
'-ingest-storage.kafka.consume-from-timestamp-at-startup=0',
'-ingest-storage.kafka.ingestion-concurrency=2',
'-ingest-storage.kafka.ingestion-concurrency-batch-size=150',
'-ingest-storage.kafka.fetch-concurrency=4',
'-ingest-storage.kafka.records-per-fetch=6000',
'-ingest-storage.kafka.startup-fetch-concurrency=15',
'-ingest-storage.kafka.startup-records-per-fetch=2400',
'-ingest-storage.kafka.ongoing-fetch-concurrency=2',
'-ingest-storage.kafka.ongoing-records-per-fetch=30',
],
extraVolumes: ['.data-mimir-write-zone-c-61:/data:delegated'],
}),
2 changes: 1 addition & 1 deletion development/mimir-ingest-storage/docker-compose.yml
@@ -322,7 +322,7 @@
"command":
- "sh"
- "-c"
- "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=start -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.ingestion-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150 -ingest-storage.kafka.fetch-concurrency=4 -ingest-storage.kafka.records-per-fetch=6000"
- "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.ingestion-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150 -ingest-storage.kafka.startup-fetch-concurrency=15 -ingest-storage.kafka.startup-records-per-fetch=2400 -ingest-storage.kafka.ongoing-fetch-concurrency=2 -ingest-storage.kafka.ongoing-records-per-fetch=30"
"depends_on":
"kafka_1":
"condition": "service_healthy"
57 changes: 30 additions & 27 deletions pkg/storage/ingest/reader.go
@@ -288,14 +288,16 @@ func (r *PartitionReader) processNextFetchesUntilTargetOrMaxLagHonored(ctx conte

// If the target lag hasn't been reached with the first attempt (which stops once at least the max lag
// is honored) then we try to reach the (lower) target lag within a fixed time (best-effort).
// The timeout is equal to the max lag. This is done because we expect at least a 2x replay speed
// from Kafka (which means at most it takes 1s to ingest 2s of data): assuming new data is continuously
// written to the partition, we give the reader maxLag time to replay the backlog + ingest the new data
// written in the meanwhile.
// The timeout is equal to the max lag x2. This is done because the ongoing fetcher config reduces lag more slowly,
// but is better at keeping up with the partition and minimizing e2e lag.
func() (time.Duration, error) {
timedCtx, cancel := context.WithTimeoutCause(ctx, maxLag, errWaitTargetLagDeadlineExceeded)
timedCtx, cancel := context.WithTimeoutCause(ctx, 2*maxLag, errWaitTargetLagDeadlineExceeded)
defer cancel()

// Don't use timedCtx because we want the fetchers to continue running
// At this point we're close enough to the end of the partition that we should switch to the more responsive fetcher.
r.fetcher.Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch)

return r.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger)
},
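
A minimal sketch of the two-phase startup implemented above, with simplified signatures: consumeUntilLagBelow is a hypothetical stand-in for processNextFetchesUntilLagHonored, and fetcherUpdater mirrors the concurrentFetchers.Update method changed further down.

package sketch

import (
	"context"
	"time"
)

// fetcherUpdater mirrors the shape of concurrentFetchers.Update shown further down.
type fetcherUpdater interface {
	Update(ctx context.Context, concurrency, records int)
}

// catchUp sketches the startup flow above: replay with the aggressive startup settings
// until maxLag is honored, then switch to the ongoing settings and make a best-effort
// attempt to reach the lower targetLag within 2x maxLag.
func catchUp(
	ctx context.Context,
	fetcher fetcherUpdater,
	ongoingConcurrency, ongoingRecordsPerFetch int,
	maxLag, targetLag time.Duration,
	consumeUntilLagBelow func(ctx context.Context, lag time.Duration) error, // hypothetical helper
) error {
	// Phase 1: startup fetcher settings until the lag drops below maxLag.
	if err := consumeUntilLagBelow(ctx, maxLag); err != nil {
		return err
	}

	// Phase 2: we're close to the end of the partition, so switch to the lighter
	// ongoing settings. Pass the parent ctx (not the timed one below) so the
	// fetchers keep running after this phase returns.
	fetcher.Update(ctx, ongoingConcurrency, ongoingRecordsPerFetch)

	// The ongoing settings reduce lag more slowly, so allow up to 2x maxLag to reach
	// the lower target lag; the caller treats hitting this deadline as acceptable
	// because this phase is best-effort.
	timedCtx, cancel := context.WithTimeout(ctx, 2*maxLag)
	defer cancel()
	return consumeUntilLagBelow(timedCtx, targetLag)
}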

@@ -925,6 +927,7 @@ func (r *concurrentFetchers) Stop() {
close(r.done)

r.wg.Wait()
level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord)
}

// newConcurrentFetchers creates a new concurrentFetchers. startOffset can be kafkaOffsetStart, kafkaOffsetEnd or a specific offset.
@@ -944,23 +947,6 @@ func newConcurrentFetchers(
metrics *readerMetrics,
) (*concurrentFetchers, error) {

const noReturnedRecords = -1 // we still haven't returned the 0 offset.

f := &concurrentFetchers{
client: client,
logger: logger,
topicName: topic,
partitionID: partition,
metrics: metrics,
minBytesWaitTime: minBytesWaitTime,
lastReturnedRecord: noReturnedRecords,
startOffsets: startOffsetsReader,
trackCompressedBytes: trackCompressedBytes,
tracer: recordsTracer(),
orderedFetches: make(chan fetchResult),
done: make(chan struct{}),
}

var err error
switch startOffset {
case kafkaOffsetStart:
@@ -974,6 +960,20 @@
if err != nil {
return nil, fmt.Errorf("resolving offset to start consuming from: %w", err)
}
f := &concurrentFetchers{
client: client,
logger: logger,
topicName: topic,
partitionID: partition,
metrics: metrics,
minBytesWaitTime: minBytesWaitTime,
lastReturnedRecord: startOffset - 1,
startOffsets: startOffsetsReader,
trackCompressedBytes: trackCompressedBytes,
tracer: recordsTracer(),
orderedFetches: make(chan fetchResult),
done: make(chan struct{}),
}

topics, err := kadm.NewClient(client).ListTopics(ctx, topic)
if err != nil {
@@ -984,6 +984,7 @@
}
f.topicID = topics[topic].ID

f.wg.Add(1)
go f.start(ctx, startOffset, concurrency, recordsPerFetch)

return f, nil
@@ -993,7 +994,8 @@ func (r *concurrentFetchers) Update(ctx context.Context, concurrency, records in
r.Stop()
r.done = make(chan struct{})

go r.start(ctx, r.lastReturnedRecord, concurrency, records)
r.wg.Add(1)
go r.start(ctx, r.lastReturnedRecord+1, concurrency, records)
}
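
This is the heart of the transition fix. Previously lastReturnedRecord started at a constant -1 and Update restarted the fetchers at lastReturnedRecord itself, so calling Update before any record had been returned (for example right after starting to consume from the end) resumed from the wrong offset. Now the field is primed with the resolved start offset minus one (see the newConcurrentFetchers change above) and Update always resumes at lastReturnedRecord+1. A small illustrative sketch of that invariant, with simplified names:

package sketch

// offsetTracker sketches the offset bookkeeping behind the concurrency transition.
// It is not the actual concurrentFetchers type.
type offsetTracker struct {
	lastReturnedRecord int64 // offset of the last record handed out to PollFetches callers
}

// newOffsetTracker mirrors newConcurrentFetchers: the tracker is built only after the
// start position (kafkaOffsetStart/kafkaOffsetEnd/explicit offset) has been resolved to
// a concrete offset, and lastReturnedRecord is primed to startOffset-1.
func newOffsetTracker(resolvedStartOffset int64) *offsetTracker {
	return &offsetTracker{lastReturnedRecord: resolvedStartOffset - 1}
}

// recordReturned is called as records are handed out to callers.
func (t *offsetTracker) recordReturned(offset int64) { t.lastReturnedRecord = offset }

// resumeOffset is where restarted fetchers must continue: the record after the last one
// already returned. This is correct both before any record has been returned (it equals
// the resolved start offset) and after an arbitrary amount of consumption.
func (t *offsetTracker) resumeOffset() int64 { return t.lastReturnedRecord + 1 }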

func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, context.Context) {
@@ -1248,10 +1250,11 @@ func (r *concurrentFetchers) run(ctx context.Context, wants chan fetchWant, logg
}

func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concurrency, recordsPerFetch int) {
r.wg.Add(concurrency)
level.Info(r.logger).Log("msg", "starting concurrent fetchers", "start_offset", startOffset, "concurrency", concurrency, "recordsPerFetch", recordsPerFetch)

wants := make(chan fetchWant)
defer close(wants)
r.wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
logger := log.With(r.logger, "fetcher", i)
go r.run(ctx, wants, logger)
@@ -1268,7 +1271,6 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu
nextFetch.bytesPerRecord = 10_000 // start with an estimation, we will update it as we consume

// We need to make sure we don't leak any goroutine given that start is called within a goroutine.
r.wg.Add(1)
defer r.wg.Done()
for {
refillBufferedResult := nextResult
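
The WaitGroup accounting moves with it: wg.Add(1) for the start goroutine now happens in the caller, before go f.start(...), both in newConcurrentFetchers and in Update, and wg.Add(concurrency) runs before the per-fetcher goroutines are spawned rather than via an Add inside start's own goroutine. Incrementing a WaitGroup from inside the goroutine races with a concurrent Stop()/wg.Wait(), which can return before the goroutine has registered itself. A minimal sketch of the pattern, illustrative only:

package sketch

import "sync"

// startWorkers shows the Add-before-go pattern the diff moves to: the WaitGroup is
// incremented by the spawner before the goroutines start, never from inside them, so a
// concurrent Wait() cannot observe the counter before the workers are registered.
func startWorkers(wg *sync.WaitGroup, concurrency int, run func(worker int)) {
	wg.Add(concurrency) // register every worker up front
	for i := 0; i < concurrency; i++ {
		go func(worker int) {
			defer wg.Done()
			run(worker)
		}(i)
	}
}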
@@ -1340,6 +1342,7 @@ func handleKafkaFetchErr(err error, fw fetchWant, longBackoff waiter, partitionS
case errors.Is(err, kerr.OffsetOutOfRange):
// We're either consuming from before the first offset or after the last offset.
partitionStart, err := partitionStartOffset.CachedOffset()
logger = log.With(logger, "log_start_offset", partitionStart, "start_offset", fw.startOffset, "end_offset", fw.endOffset)
if err != nil {
level.Error(logger).Log("msg", "failed to find start offset to readjust on OffsetOutOfRange; retrying same records range", "err", err)
break
@@ -1350,12 +1353,12 @@
if partitionStart >= fw.endOffset {
// The next fetch want is responsible for this range. We set startOffset=endOffset to effectively mark this fetch as complete.
fw.startOffset = fw.endOffset
level.Debug(logger).Log("msg", "we're too far behind aborting fetch", "log_start_offset", partitionStart, "start_offset", fw.startOffset, "end_offset", fw.endOffset)
level.Debug(logger).Log("msg", "we're too far behind aborting fetch")
break
}
// Only some of the offsets of our want are out of range, so let's fast-forward.
fw.startOffset = partitionStart
level.Debug(logger).Log("msg", "part of fetch want is outside of available offsets, adjusted start offset", "log_start_offset", partitionStart, "start_offset", fw.startOffset, "end_offset", fw.endOffset)
level.Debug(logger).Log("msg", "part of fetch want is outside of available offsets, adjusted start offset")
} else {
// If the broker is behind or if we are requesting offsets which have not yet been produced, we end up here.
// We set a MaxWaitMillis on fetch requests, but even then there may be no records for some time.
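
The change above hoists the offset fields into the base logger so every branch of the OffsetOutOfRange handling logs them. The adjustment logic itself, roughly, as a simplified sketch (names are shortened from fetchWant/handleKafkaFetchErr, and the broker-behind case is condensed into a single early return):

package sketch

// adjustFetchRange sketches the OffsetOutOfRange handling above for a fetch want
// covering [startOffset, endOffset), given the broker-reported log start offset.
// It returns the adjusted start offset and whether the want is already complete.
func adjustFetchRange(startOffset, endOffset, logStartOffset int64) (newStart int64, complete bool) {
	if logStartOffset <= startOffset {
		// Broker is behind, or we're requesting offsets that haven't been produced
		// yet: keep the same range and retry later.
		return startOffset, false
	}
	if logStartOffset >= endOffset {
		// The whole requested range has already been deleted; mark this want
		// complete and let the next want (which begins at endOffset) take over.
		return endOffset, true
	}
	// Only the head of the range is gone; fast-forward to the first retained offset.
	return logStartOffset, false
}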
158 changes: 153 additions & 5 deletions pkg/storage/ingest/reader_test.go
@@ -2410,20 +2410,167 @@ func TestConcurrentFetchers(t *testing.T) {

})

t.Run("update concurrency with continuous production", func(t *testing.T) {
t.Parallel()
const (
testDuration = 10 * time.Second
produceInterval = 10 * time.Millisecond
initialConcurrency = 2
)

ctx, cancel := context.WithTimeout(context.Background(), testDuration)
defer cancel()

produceCtx, cancelProduce := context.WithTimeout(context.Background(), testDuration)
defer cancelProduce()

_, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName)
client := newKafkaProduceClient(t, clusterAddr)

producedCount := atomic.NewInt64(0)

// Start producing records continuously
go func() {
ticker := time.NewTicker(produceInterval)
defer ticker.Stop()

for {
select {
case <-produceCtx.Done():
return
case <-ticker.C:
count := producedCount.Inc()
record := fmt.Sprintf("record-%d", count)
produceRecord(produceCtx, t, client, topicName, partitionID, []byte(record))
}
}
}()

fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, initialConcurrency, recordsPerFetch)

fetchedRecords := make([]*kgo.Record, 0)
fetchedCount := atomic.NewInt64(0)

fetchRecords := func(duration time.Duration) {
deadline := time.Now().Add(duration)
for time.Now().Before(deadline) {
fetches, _ := fetchers.PollFetches(ctx)
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecords = append(fetchedRecords, r)
fetchedCount.Inc()
})
}
}

// Initial fetch with starting concurrency
fetchRecords(2 * time.Second)
initialFetched := fetchedCount.Load()

// Update to higher concurrency
fetchers.Update(ctx, 4, recordsPerFetch)
fetchRecords(3 * time.Second)
highConcurrencyFetched := fetchedCount.Load() - initialFetched

// Update to lower concurrency
fetchers.Update(ctx, 1, recordsPerFetch)
fetchRecords(3 * time.Second)

cancelProduce()
// Fetch everything that's left now.
fetchRecords(time.Second)
totalProduced := producedCount.Load()
totalFetched := fetchedCount.Load()

// Verify fetched records
assert.True(t, totalFetched > 0, "Expected to fetch some records")
assert.Equal(t, totalFetched, totalProduced, "Should not fetch more records than produced")
assert.True(t, highConcurrencyFetched > initialFetched, "Expected to fetch more records with higher concurrency")

// Verify record contents
for i, record := range fetchedRecords {
expectedContent := fmt.Sprintf("record-%d", i+1)
assert.Equal(t, expectedContent, string(record.Value),
"Record %d has unexpected content: %s", i, string(record.Value))
}

// Log some statistics
t.Logf("Total produced: %d, Total fetched: %d", totalProduced, totalFetched)
t.Logf("Fetched with initial concurrency: %d", initialFetched)
t.Logf("Fetched with high concurrency: %d", highConcurrencyFetched)
t.Logf("Fetched with low concurrency: %d", totalFetched-initialFetched-highConcurrencyFetched)
})

t.Run("consume from end and update immediately", func(t *testing.T) {
t.Parallel()
const (
initialRecords = 100
additionalRecords = 50
initialConcurrency = 2
updatedConcurrency = 4
)

ctx := context.Background()

_, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName)
client := newKafkaProduceClient(t, clusterAddr)

// Produce initial records
for i := 0; i < initialRecords; i++ {
record := fmt.Sprintf("initial-record-%d", i+1)
produceRecord(ctx, t, client, topicName, partitionID, []byte(record))
}

// Start concurrent fetchers from the end
fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, kafkaOffsetEnd, initialConcurrency, recordsPerFetch)

// Immediately update concurrency
fetchers.Update(ctx, updatedConcurrency, recordsPerFetch)

// Produce additional records
for i := 0; i < additionalRecords; i++ {
record := fmt.Sprintf("additional-record-%d", i+1)
produceRecord(ctx, t, client, topicName, partitionID, []byte(record))
}

fetchedRecords := make([]*kgo.Record, 0, additionalRecords)
fetchDeadline := time.Now().Add(5 * time.Second)

// Fetch records
for len(fetchedRecords) < additionalRecords && time.Now().Before(fetchDeadline) {
fetches, _ := fetchers.PollFetches(ctx)
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecords = append(fetchedRecords, r)
})
}

// Verify fetched records
assert.LessOrEqual(t, len(fetchedRecords), additionalRecords,
"Should not fetch more records than produced after start")

// Verify record contents
for i, record := range fetchedRecords {
expectedContent := fmt.Sprintf("additional-record-%d", i+1)
assert.Equal(t, expectedContent, string(record.Value),
"Record %d has unexpected content: %s", i, string(record.Value))
}

// Log some statistics
t.Logf("Total records produced: %d", initialRecords+additionalRecords)
t.Logf("Records produced after start: %d", additionalRecords)
t.Logf("Records fetched: %d", len(fetchedRecords))
})
}

func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Client, topic string, partition int32, startOffset int64, concurrency, recordsPerFetch int) *concurrentFetchers {
logger := log.NewNopLogger()
metrics := newReaderMetrics(partition, prometheus.NewRegistry())
reg := prometheus.NewPedanticRegistry()
metrics := newReaderMetrics(partition, reg)

// This instantiates the fields of kprom.
// This is usually done by franz-go, but since now we use the metrics ourselves, we need to instantiate the metrics ourselves.
metrics.kprom.OnNewClient(client)

offsetReader := &partitionOffsetClient{
client: client,
topic: topic,
}
offsetReader := newPartitionOffsetClient(client, topic, reg, logger)

startOffsetsReader := newGenericOffsetReader(func(ctx context.Context) (int64, error) {
return offsetReader.FetchPartitionStartOffset(ctx, partition)
@@ -2445,6 +2592,7 @@ func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Cli
&metrics,
)
require.NoError(t, err)
t.Cleanup(f.Stop)

return f
}
