diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json
index ce9208db4fb..32f331dde34 100644
--- a/cmd/mimir/config-descriptor.json
+++ b/cmd/mimir/config-descriptor.json
@@ -6664,7 +6664,7 @@
         "kind": "field",
         "name": "ongoing_fetch_concurrency",
         "required": false,
-        "desc": "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0.",
+        "desc": "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. 0 to disable.",
         "fieldValue": null,
         "fieldDefaultValue": 0,
         "fieldFlag": "ingest-storage.kafka.ongoing-fetch-concurrency",
diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl
index ef0ff249051..c1a9957ffbf 100644
--- a/cmd/mimir/help-all.txt.tmpl
+++ b/cmd/mimir/help-all.txt.tmpl
@@ -1374,7 +1374,7 @@ Usage of ./cmd/mimir/mimir:
   -ingest-storage.kafka.max-consumer-lag-at-startup duration
     	The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
   -ingest-storage.kafka.ongoing-fetch-concurrency int
-    	The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0.
+    	The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. 0 to disable.
   -ingest-storage.kafka.ongoing-records-per-fetch int
     	The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
   -ingest-storage.kafka.producer-max-buffered-bytes int
diff --git a/cmd/mimir/help.txt.tmpl b/cmd/mimir/help.txt.tmpl
index f556d397e28..17978b52626 100644
--- a/cmd/mimir/help.txt.tmpl
+++ b/cmd/mimir/help.txt.tmpl
@@ -422,7 +422,7 @@ Usage of ./cmd/mimir/mimir:
   -ingest-storage.kafka.max-consumer-lag-at-startup duration
     	The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
   -ingest-storage.kafka.ongoing-fetch-concurrency int
-    	The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0.
+    	The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. 0 to disable.
   -ingest-storage.kafka.ongoing-records-per-fetch int
     	The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
   -ingest-storage.kafka.producer-max-buffered-bytes int
diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md
index f63aff4ded1..5f1ad1215ec 100644
--- a/docs/sources/mimir/configure/configuration-parameters/index.md
+++ b/docs/sources/mimir/configure/configuration-parameters/index.md
@@ -3861,8 +3861,8 @@ kafka:
 
   # The number of concurrent fetch requests that the ingester makes when reading
   # data continuously from Kafka after startup. Is disabled unless
-  # ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be
-  # greater than 0.
+  # ingest-storage.kafka.startup-fetch-concurrency is greater than 0. 0 to
+  # disable.
   # CLI flag: -ingest-storage.kafka.ongoing-fetch-concurrency
   [ongoing_fetch_concurrency: <int> | default = 0]
 
diff --git a/pkg/storage/ingest/config.go b/pkg/storage/ingest/config.go
index f4d9aeab1e7..43ae99618d0 100644
--- a/pkg/storage/ingest/config.go
+++ b/pkg/storage/ingest/config.go
@@ -138,7 +138,7 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
 	f.IntVar(&cfg.StartupFetchConcurrency, prefix+".startup-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.")
 	f.IntVar(&cfg.StartupRecordsPerFetch, prefix+".startup-records-per-fetch", 2500, "The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on "+prefix+".startup-fetch-concurrency being greater than 0.")
-	f.IntVar(&cfg.OngoingFetchConcurrency, prefix+".ongoing-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless "+prefix+".startup-fetch-concurrency is greater than 0. It must be greater than 0.")
+	f.IntVar(&cfg.OngoingFetchConcurrency, prefix+".ongoing-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless "+prefix+".startup-fetch-concurrency is greater than 0. 0 to disable.")
 	f.IntVar(&cfg.OngoingRecordsPerFetch, prefix+".ongoing-records-per-fetch", 30, "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on "+prefix+".ongoing-fetch-concurrency being greater than 0.")
 	f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+".use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.")
@@ -183,8 +183,8 @@ func (cfg *KafkaConfig) Validate() error {
 		return fmt.Errorf("ingest-storage.kafka.startup-fetch-concurrency must be greater or equal to 0")
 	}
 
-	if cfg.StartupFetchConcurrency > 0 && cfg.OngoingFetchConcurrency <= 0 {
-		return fmt.Errorf("ingest-storage.kafka.ongoing-fetch-concurrency must be greater than 0 when ingest-storage.kafka.startup-fetch-concurrency is greater than 0")
+	if cfg.OngoingFetchConcurrency > 0 && cfg.StartupFetchConcurrency <= 0 {
+		return fmt.Errorf("ingest-storage.kafka.startup-fetch-concurrency must be greater than 0 when ingest-storage.kafka.ongoing-fetch-concurrency is greater than 0")
 	}
 
 	if cfg.StartupRecordsPerFetch <= 0 || cfg.OngoingRecordsPerFetch <= 0 {
diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go
index b6744557f66..917b3235b48 100644
--- a/pkg/storage/ingest/reader.go
+++ b/pkg/storage/ingest/reader.go
@@ -231,7 +231,7 @@ func (r *PartitionReader) stopDependencies() error {
 }
 
 func (r *PartitionReader) run(ctx context.Context) error {
-	r.fetcher.Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch)
+	r.switchToOngoingFetcher(ctx)
 
 	for ctx.Err() == nil {
 		err := r.processNextFetches(ctx, r.metrics.receiveDelayWhenRunning)
@@ -244,6 +244,47 @@ func (r *PartitionReader) run(ctx context.Context) error {
 	return nil
 }
 
+func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) {
+	if r.kafkaCfg.StartupFetchConcurrency == r.kafkaCfg.OngoingFetchConcurrency && r.kafkaCfg.StartupRecordsPerFetch == r.kafkaCfg.OngoingRecordsPerFetch {
+		// We're already using the same settings, no need to switch.
+		return
+	}
+
+	if r.kafkaCfg.StartupFetchConcurrency > 0 && r.kafkaCfg.OngoingFetchConcurrency > 0 {
+		// No need to switch the fetcher, just update the records per fetch.
+		r.fetcher.Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch)
+		return
+	}
+
+	if r.kafkaCfg.StartupFetchConcurrency > 0 && r.kafkaCfg.OngoingFetchConcurrency == 0 {
+		if r.fetcher == r {
+			// This method has been called before, no need to switch the fetcher.
+			return
+		}
+		// We need to switch to franz-go for ongoing fetches.
+		// If we've already fetched some records, we should discard them from franz-go and start from the last consumed offset.
+		r.fetcher = r
+
+		lastConsumed := r.consumedOffsetWatcher.LastConsumedOffset()
+		if lastConsumed == -1 {
+			// We haven't consumed any records yet with the other fetcher.
+			// The franz-go client is initialized to start consuming from the same place as the other fetcher.
+			// We can just use the client.
+			return
+		}
+
+		// The client might have some buffered records already while we were using the other fetcher.
+		// Remove the buffered records.
+		r.client.RemoveConsumePartitions(map[string][]int32{
+			r.kafkaCfg.Topic: {r.partitionID},
+		})
+		// Resume from the next unconsumed offset.
+		r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
+			r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(lastConsumed + 1)},
+		})
+	}
+}
+
 func (r *PartitionReader) processNextFetches(ctx context.Context, delayObserver prometheus.Observer) error {
 	fetches, fetchCtx := r.fetcher.PollFetches(ctx) // Propagate the fetching span to consuming the records.
@@ -284,7 +325,7 @@ func (r *PartitionReader) processNextFetchesUntilTargetOrMaxLagHonored(ctx conte
 
 			// Don't use timedCtx because we want the fetchers to continue running.
 			// At this point we're close enough to the end of the partition that we should switch to the more responsive fetcher.
-			r.fetcher.Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch)
+			r.switchToOngoingFetcher(ctx)
 
 			return r.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger)
 		},
diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go
index a9daf464de4..01cac29704c 100644
--- a/pkg/storage/ingest/reader_test.go
+++ b/pkg/storage/ingest/reader_test.go
@@ -1439,6 +1439,125 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) {
 			})
 		}
 	})
+
+	t.Run("should read target lag and then consume more records after switching to 0 ongoing concurrency if position=start, startup_fetch_concurrency=2, ongoing_fetch_concurrency=0", func(t *testing.T) {
+		t.Parallel()
+
+		var (
+			cluster, clusterAddr = testkafka.CreateCluster(t, partitionID+1, topicName)
+			fetchRequestsCount   = atomic.NewInt64(0)
+			fetchShouldFail      = atomic.NewBool(false)
+			consumedRecordsMx    sync.Mutex
+			consumedRecords      []string
+		)
+
+		consumer := consumerFunc(func(_ context.Context, records []record) error {
+			consumedRecordsMx.Lock()
+			defer consumedRecordsMx.Unlock()
+
+			for _, r := range records {
+				consumedRecords = append(consumedRecords, string(r.content))
+			}
+			return nil
+		})
+
+		cluster.ControlKey(int16(kmsg.Fetch), func(kmsg.Request) (kmsg.Response, error, bool) {
+			cluster.KeepControl()
+			fetchRequestsCount.Inc()
+
+			if fetchShouldFail.Load() {
+				return nil, errors.New("mocked error"), true
+			}
+
+			return nil, nil, false
+		})
+
+		// Produce some records.
+		writeClient := newKafkaProduceClient(t, clusterAddr)
+		produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-1"))
+		produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-2"))
+		t.Log("produced 2 records")
+
+		// Run the test twice with the same Kafka cluster to show that second time it consumes all records again.
+		// Reset the test.
+		fetchShouldFail.Store(true)
+		fetchRequestsCount.Store(0)
+		consumedRecordsMx.Lock()
+		consumedRecords = nil
+		consumedRecordsMx.Unlock()
+
+		// Create and start the reader.
+		reg := prometheus.NewPedanticRegistry()
+		logs := &concurrency.SyncBuffer{}
+		reader := createReader(t, clusterAddr, topicName, partitionID, consumer,
+			withConsumeFromPositionAtStartup(consumeFromStart),
+			withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second),
+			withRegistry(reg),
+			withLogger(log.NewLogfmtLogger(logs)),
+			withStartupConcurrency(2),
+			withOngoingConcurrency(0))
+
+		require.NoError(t, reader.StartAsync(ctx))
+		t.Cleanup(func() {
+			require.NoError(t, services.StopAndAwaitTerminated(ctx, reader))
+		})
+
+		// Wait until the Kafka cluster has received a few Fetch requests.
+		test.Poll(t, 5*time.Second, true, func() interface{} {
+			return fetchRequestsCount.Load() > 2
+		})
+
+		// Since the mocked Kafka cluster is configured to fail any Fetch, we expect the reader hasn't
+		// caught up yet, and it's still in Starting state.
+		assert.Equal(t, services.Starting, reader.State())
+
+		// Unblock the Fetch requests. Now they will succeed.
+		fetchShouldFail.Store(false)
+
+		// We expect the reader to catch up, and then switch to Running state.
+		test.Poll(t, 5*time.Second, services.Running, func() interface{} {
+			return reader.State()
+		})
+
+		// We expect the reader to have switched to running because target consumer lag has been honored.
+		assert.Contains(t, logs.String(), "partition reader consumed partition and current lag is lower than configured target consumer lag")
+
+		// We expect the reader to have consumed the partition from start.
+		test.Poll(t, time.Second, []string{"record-1", "record-2"}, func() interface{} {
+			consumedRecordsMx.Lock()
+			defer consumedRecordsMx.Unlock()
+			return slices.Clone(consumedRecords)
+		})
+
+		// We expect the last consumed offset to be tracked in a metric.
+		test.Poll(t, time.Second, nil, func() interface{} {
+			return promtest.GatherAndCompare(reg, strings.NewReader(`
+				# HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet.
+				# TYPE cortex_ingest_storage_reader_last_consumed_offset gauge
+				cortex_ingest_storage_reader_last_consumed_offset{partition="1"} 1
+			`), "cortex_ingest_storage_reader_last_consumed_offset")
+		})
+
+		produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-3"))
+		produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-4"))
+		t.Log("produced 2 records")
+
+		// We expect the reader to have consumed the partition from start.
+		test.Poll(t, time.Second, []string{"record-1", "record-2", "record-3", "record-4"}, func() interface{} {
+			consumedRecordsMx.Lock()
+			defer consumedRecordsMx.Unlock()
+			return slices.Clone(consumedRecords)
+		})
+
+		// We expect the last consumed offset to be tracked in a metric.
+		test.Poll(t, time.Second, nil, func() interface{} {
+			return promtest.GatherAndCompare(reg, strings.NewReader(`
+				# HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet.
+				# TYPE cortex_ingest_storage_reader_last_consumed_offset gauge
+				cortex_ingest_storage_reader_last_consumed_offset{partition="1"} 3
+			`), "cortex_ingest_storage_reader_last_consumed_offset")
+		})
+	})
 }
 
 func TestPartitionReader_fetchLastCommittedOffset(t *testing.T) {
@@ -1827,6 +1946,18 @@ func withLogger(logger log.Logger) func(cfg *readerTestCfg) {
 	}
 }
 
+func withStartupConcurrency(i int) readerTestCfgOtp {
+	return func(cfg *readerTestCfg) {
+		cfg.kafka.StartupFetchConcurrency = i
+	}
+}
+
+func withOngoingConcurrency(i int) readerTestCfgOtp {
+	return func(cfg *readerTestCfg) {
+		cfg.kafka.OngoingFetchConcurrency = i
+	}
+}
+
 var testingLogger = mimirtest.NewTestingLogger(nil)
 
 func defaultReaderTestConfig(t *testing.T, addr string, topicName string, partitionID int32, consumer recordConsumer) *readerTestCfg {
diff --git a/pkg/storage/ingest/writer_test.go b/pkg/storage/ingest/writer_test.go
index ebf27fff98d..40f7f4b0b76 100644
--- a/pkg/storage/ingest/writer_test.go
+++ b/pkg/storage/ingest/writer_test.go
@@ -1101,7 +1101,7 @@ func createTestKafkaConfig(clusterAddr, topicName string) KafkaConfig {
 	cfg.WriteTimeout = 2 * time.Second
 	cfg.StartupFetchConcurrency = 2
 	cfg.StartupRecordsPerFetch = 2
-	cfg.OngoingFetchConcurrency = 2
+	cfg.OngoingFetchConcurrency = 0
 	cfg.OngoingRecordsPerFetch = 2
 
 	return cfg
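
The net effect of the config and validation changes above is that concurrent fetching can now be enabled for the startup catch-up phase only, with the reader falling back to the franz-go client for ongoing fetches once it is running. A minimal sketch of such a configuration follows; the field names come from the docs and config-descriptor hunks above, while the enclosing ingest_storage block name is assumed from Mimir's standard YAML layout and only the relevant fields are shown:

ingest_storage:
  kafka:
    # Fetch concurrently while the ingester catches up at startup.
    startup_fetch_concurrency: 2
    # 0 switches back to the default (non-concurrent) fetcher after startup.
    ongoing_fetch_concurrency: 0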