kafka replay speed: fall back to franz-go for ongoing fetching (#9500)
* kafka replay speed: fall back to franz-go for ongoing fetching

Finding the right configuration for ongoing fetching depends a lot on the characteristics of the cluster. franz-go is better at adaptive concurrency, so we fall back to it until we've implemented adaptive concurrency and/or records-per-fetch.

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Remove redundant panic

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
dimitarvdimitrov authored Oct 3, 2024
1 parent 5855711 commit 3ea4cf4
Showing 8 changed files with 183 additions and 11 deletions.
2 changes: 1 addition & 1 deletion cmd/mimir/config-descriptor.json
@@ -6675,7 +6675,7 @@
"kind": "field",
"name": "ongoing_fetch_concurrency",
"required": false,
"desc": "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0.",
"desc": "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. 0 to disable.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "ingest-storage.kafka.ongoing-fetch-concurrency",
2 changes: 1 addition & 1 deletion cmd/mimir/help-all.txt.tmpl
@@ -1374,7 +1374,7 @@ Usage of ./cmd/mimir/mimir:
-ingest-storage.kafka.max-consumer-lag-at-startup duration
The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
-ingest-storage.kafka.ongoing-fetch-concurrency int
The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0.
The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. 0 to disable.
-ingest-storage.kafka.ongoing-records-per-fetch int
The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
-ingest-storage.kafka.producer-max-buffered-bytes int
2 changes: 1 addition & 1 deletion cmd/mimir/help.txt.tmpl
@@ -422,7 +422,7 @@ Usage of ./cmd/mimir/mimir:
-ingest-storage.kafka.max-consumer-lag-at-startup duration
The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
-ingest-storage.kafka.ongoing-fetch-concurrency int
The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0.
The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. 0 to disable.
-ingest-storage.kafka.ongoing-records-per-fetch int
The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
-ingest-storage.kafka.producer-max-buffered-bytes int
@@ -3868,8 +3868,8 @@ kafka:
# The number of concurrent fetch requests that the ingester makes when reading
# data continuously from Kafka after startup. Is disabled unless
# ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be
# greater than 0.
# ingest-storage.kafka.startup-fetch-concurrency is greater than 0. 0 to
# disable.
# CLI flag: -ingest-storage.kafka.ongoing-fetch-concurrency
[ongoing_fetch_concurrency: <int> | default = 0]
6 changes: 3 additions & 3 deletions pkg/storage/ingest/config.go
@@ -138,7 +138,7 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

f.IntVar(&cfg.StartupFetchConcurrency, prefix+".startup-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.")
f.IntVar(&cfg.StartupRecordsPerFetch, prefix+".startup-records-per-fetch", 2500, "The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on "+prefix+".startup-fetch-concurrency being greater than 0.")
f.IntVar(&cfg.OngoingFetchConcurrency, prefix+".ongoing-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless "+prefix+".startup-fetch-concurrency is greater than 0. It must be greater than 0.")
f.IntVar(&cfg.OngoingFetchConcurrency, prefix+".ongoing-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless "+prefix+".startup-fetch-concurrency is greater than 0. 0 to disable.")
f.IntVar(&cfg.OngoingRecordsPerFetch, prefix+".ongoing-records-per-fetch", 30, "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on "+prefix+".ongoing-fetch-concurrency being greater than 0.")
f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+".use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.")

@@ -183,8 +183,8 @@ func (cfg *KafkaConfig) Validate() error {
return fmt.Errorf("ingest-storage.kafka.startup-fetch-concurrency must be greater or equal to 0")
}

if cfg.StartupFetchConcurrency > 0 && cfg.OngoingFetchConcurrency <= 0 {
return fmt.Errorf("ingest-storage.kafka.ongoing-fetch-concurrency must be greater than 0 when ingest-storage.kafka.startup-fetch-concurrency is greater than 0")
if cfg.OngoingFetchConcurrency > 0 && cfg.StartupFetchConcurrency <= 0 {
return fmt.Errorf("ingest-storage.kafka.startup-fetch-concurrency must be greater than 0 when ingest-storage.kafka.ongoing-fetch-concurrency is greater than 0")
}

if cfg.StartupRecordsPerFetch <= 0 || cfg.OngoingRecordsPerFetch <= 0 {
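To make the new flag relationship concrete, here is a minimal, self-contained Go sketch (not part of this commit) that mirrors only the two fetch-concurrency checks enforced by the updated Validate; validateFetchConcurrency is a hypothetical helper introduced purely for illustration, and all other KafkaConfig validation is omitted.

package main

import "fmt"

// validateFetchConcurrency mirrors the two fetch-concurrency checks from
// KafkaConfig.Validate above; everything else about KafkaConfig is omitted.
func validateFetchConcurrency(startupFetchConcurrency, ongoingFetchConcurrency int) error {
	if startupFetchConcurrency < 0 {
		return fmt.Errorf("startup-fetch-concurrency must be greater or equal to 0")
	}
	if ongoingFetchConcurrency > 0 && startupFetchConcurrency <= 0 {
		return fmt.Errorf("startup-fetch-concurrency must be greater than 0 when ongoing-fetch-concurrency is greater than 0")
	}
	return nil
}

func main() {
	fmt.Println(validateFetchConcurrency(2, 4)) // <nil>: concurrent fetching during startup and ongoing
	fmt.Println(validateFetchConcurrency(2, 0)) // <nil>: concurrent fetching during startup, then fall back to franz-go
	fmt.Println(validateFetchConcurrency(0, 4)) // error: ongoing concurrency requires startup concurrency
}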
45 changes: 43 additions & 2 deletions pkg/storage/ingest/reader.go
@@ -233,7 +233,7 @@ func (r *PartitionReader) stopDependencies() error {
}

func (r *PartitionReader) run(ctx context.Context) error {
r.fetcher.Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch)
r.switchToOngoingFetcher(ctx)

for ctx.Err() == nil {
err := r.processNextFetches(ctx, r.metrics.receiveDelayWhenRunning)
@@ -246,6 +246,47 @@ func (r *PartitionReader) run(ctx context.Context) error {
return nil
}

func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) {
if r.kafkaCfg.StartupFetchConcurrency == r.kafkaCfg.OngoingFetchConcurrency && r.kafkaCfg.StartupRecordsPerFetch == r.kafkaCfg.OngoingRecordsPerFetch {
// We're already using the same settings, no need to switch.
return
}

if r.kafkaCfg.StartupFetchConcurrency > 0 && r.kafkaCfg.OngoingFetchConcurrency > 0 {
// No need to switch the fetcher, just update its concurrency and records per fetch.
r.fetcher.Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch)
return
}

if r.kafkaCfg.StartupFetchConcurrency > 0 && r.kafkaCfg.OngoingFetchConcurrency == 0 {
if r.fetcher == r {
// This method has been called before, no need to switch the fetcher.
return
}
// We need to switch to franz-go for ongoing fetches.
// If we've already consumed some records with the concurrent fetcher, we discard anything
// franz-go has buffered and resume from the offset after the last consumed one.
r.fetcher = r

lastConsumed := r.consumedOffsetWatcher.LastConsumedOffset()
if lastConsumed == -1 {
// We haven't consumed any records yet with the other fetcher.
// The franz-go client is initialized to start consuming from the same place as the other fetcher.
// We can just use the client.
return
}

// The client might have some buffered records already while we were using the other fetcher.
// Remove the buffered records.
r.client.RemoveConsumePartitions(map[string][]int32{
r.kafkaCfg.Topic: {r.partitionID},
})
// Resume from the next unconsumed offset.
r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(lastConsumed + 1)},
})
}
}

func (r *PartitionReader) processNextFetches(ctx context.Context, delayObserver prometheus.Observer) error {
fetches, fetchCtx := r.fetcher.PollFetches(ctx)
// Propagate the fetching span to consuming the records.
@@ -286,7 +327,7 @@ func (r *PartitionReader) processNextFetchesUntilTargetOrMaxLagHonored(ctx conte

// Don't use timedCtx because we want the fetchers to continue running
// At this point we're close enough to the end of the partition that we should switch to the more responsive fetcher.
r.fetcher.Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch)
r.switchToOngoingFetcher(ctx)

return r.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger)
},
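As a standalone illustration of the franz-go repositioning pattern used by switchToOngoingFetcher above, the following sketch removes a partition from the client and re-adds it at an explicit offset before polling. It is an assumption-laden example, not Mimir code: the broker address, topic, partition and offsets are placeholders.

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	const (
		topic     = "example-topic" // placeholder topic
		partition = int32(1)        // placeholder partition
	)

	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{
			topic: {partition: kgo.NewOffset().AtStart()},
		}),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Drop whatever the client has buffered for this partition...
	client.RemoveConsumePartitions(map[string][]int32{topic: {partition}})

	// ...and resume from an explicit offset, e.g. one past the offset last consumed elsewhere.
	lastConsumed := int64(41) // placeholder offset
	client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
		topic: {partition: kgo.NewOffset().At(lastConsumed + 1)},
	})

	fetches := client.PollFetches(context.Background())
	fetches.EachRecord(func(rec *kgo.Record) {
		fmt.Println(rec.Offset, string(rec.Value))
	})
}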
131 changes: 131 additions & 0 deletions pkg/storage/ingest/reader_test.go
@@ -1440,6 +1440,125 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) {
})
}
})

t.Run("should read target lag and then consume more records after switching to 0 ongoing concurrency if position=start, startup_fetch_concurrency=2, ongoing_fetch_concurrency=0", func(t *testing.T) {
t.Parallel()

var (
cluster, clusterAddr = testkafka.CreateCluster(t, partitionID+1, topicName)
fetchRequestsCount = atomic.NewInt64(0)
fetchShouldFail = atomic.NewBool(false)
consumedRecordsMx sync.Mutex
consumedRecords []string
)

consumer := consumerFunc(func(_ context.Context, records []record) error {
consumedRecordsMx.Lock()
defer consumedRecordsMx.Unlock()

for _, r := range records {
consumedRecords = append(consumedRecords, string(r.content))
}
return nil
})

cluster.ControlKey(int16(kmsg.Fetch), func(kmsg.Request) (kmsg.Response, error, bool) {
cluster.KeepControl()
fetchRequestsCount.Inc()

if fetchShouldFail.Load() {
return nil, errors.New("mocked error"), true
}

return nil, nil, false
})

// Produce some records.
writeClient := newKafkaProduceClient(t, clusterAddr)
produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-1"))
produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-2"))
t.Log("produced 2 records")

// Reset the test state and make Fetch requests fail so that the reader can't catch up yet.
fetchShouldFail.Store(true)
fetchRequestsCount.Store(0)
consumedRecordsMx.Lock()
consumedRecords = nil
consumedRecordsMx.Unlock()

// Create and start the reader.
reg := prometheus.NewPedanticRegistry()
logs := &concurrency.SyncBuffer{}
reader := createReader(t, clusterAddr, topicName, partitionID, consumer,
withConsumeFromPositionAtStartup(consumeFromStart),
withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second),
withRegistry(reg),
withLogger(log.NewLogfmtLogger(logs)),
withStartupConcurrency(2),
withOngoingConcurrency(0))

require.NoError(t, reader.StartAsync(ctx))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, reader))
})

// Wait until the Kafka cluster has received a few Fetch requests.
test.Poll(t, 5*time.Second, true, func() interface{} {
return fetchRequestsCount.Load() > 2
})

// Since the mocked Kafka cluster is configured to fail any Fetch request, we expect that the reader
// hasn't caught up yet and is still in the Starting state.
assert.Equal(t, services.Starting, reader.State())

// Unblock the Fetch requests. Now they will succeed.
fetchShouldFail.Store(false)

// We expect the reader to catch up, and then switch to Running state.
test.Poll(t, 5*time.Second, services.Running, func() interface{} {
return reader.State()
})

// We expect the reader to have switched to running because target consumer lag has been honored.
assert.Contains(t, logs.String(), "partition reader consumed partition and current lag is lower than configured target consumer lag")

// We expect the reader to have consumed the partition from the start.
test.Poll(t, time.Second, []string{"record-1", "record-2"}, func() interface{} {
consumedRecordsMx.Lock()
defer consumedRecordsMx.Unlock()
return slices.Clone(consumedRecords)
})

// We expect the last consumed offset to be tracked in a metric.
test.Poll(t, time.Second, nil, func() interface{} {
return promtest.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet.
# TYPE cortex_ingest_storage_reader_last_consumed_offset gauge
cortex_ingest_storage_reader_last_consumed_offset{partition="1"} 1
`), "cortex_ingest_storage_reader_last_consumed_offset")
})

produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-3"))
produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-4"))
t.Log("produced 2 records")

// We expect the reader to also consume the newly produced records while running.
test.Poll(t, time.Second, []string{"record-1", "record-2", "record-3", "record-4"}, func() interface{} {
consumedRecordsMx.Lock()
defer consumedRecordsMx.Unlock()
return slices.Clone(consumedRecords)
})

// We expect the last consumed offset to be tracked in a metric.
test.Poll(t, time.Second, nil, func() interface{} {
return promtest.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet.
# TYPE cortex_ingest_storage_reader_last_consumed_offset gauge
cortex_ingest_storage_reader_last_consumed_offset{partition="1"} 3
`), "cortex_ingest_storage_reader_last_consumed_offset")
})
})
}

func TestPartitionReader_fetchLastCommittedOffset(t *testing.T) {
@@ -1829,6 +1948,18 @@ func withLogger(logger log.Logger) func(cfg *readerTestCfg) {
}
}

func withStartupConcurrency(i int) readerTestCfgOtp {
return func(cfg *readerTestCfg) {
cfg.kafka.StartupFetchConcurrency = i
}
}

func withOngoingConcurrency(i int) readerTestCfgOtp {
return func(cfg *readerTestCfg) {
cfg.kafka.OngoingFetchConcurrency = i
}
}

var testingLogger = mimirtest.NewTestingLogger(nil)

func defaultReaderTestConfig(t *testing.T, addr string, topicName string, partitionID int32, consumer recordConsumer) *readerTestCfg {
2 changes: 1 addition & 1 deletion pkg/storage/ingest/writer_test.go
@@ -1101,7 +1101,7 @@ func createTestKafkaConfig(clusterAddr, topicName string) KafkaConfig {
cfg.WriteTimeout = 2 * time.Second
cfg.StartupFetchConcurrency = 2
cfg.StartupRecordsPerFetch = 2
cfg.OngoingFetchConcurrency = 2
cfg.OngoingFetchConcurrency = 0
cfg.OngoingRecordsPerFetch = 2

return cfg