diff --git a/development/mimir-ingest-storage/config/mimir.yaml b/development/mimir-ingest-storage/config/mimir.yaml index e17247366d..d00c390191 100644 --- a/development/mimir-ingest-storage/config/mimir.yaml +++ b/development/mimir-ingest-storage/config/mimir.yaml @@ -15,8 +15,10 @@ ingest_storage: address: kafka_1:9092 topic: mimir-ingest last_produced_offset_poll_interval: 500ms - fetch_concurrency: 3 - ingestion_concurrency: 8 + startup_fetch_concurrency: 15 + startup_records_per_fetch: 2400 + ongoing_fetch_concurrency: 2 + ongoing_records_per_fetch: 30 ingester: track_ingester_owned_series: false # suppress log messages in c-61 about empty ring; doesn't affect testing diff --git a/development/mimir-ingest-storage/docker-compose.jsonnet b/development/mimir-ingest-storage/docker-compose.jsonnet index ee94f4a0bd..e401c98586 100644 --- a/development/mimir-ingest-storage/docker-compose.jsonnet +++ b/development/mimir-ingest-storage/docker-compose.jsonnet @@ -48,12 +48,14 @@ std.manifestYamlDoc({ '-ingester.ring.instance-id=ingester-zone-c-61', '-ingester.partition-ring.prefix=exclusive-prefix', '-ingester.ring.prefix=exclusive-prefix', - '-ingest-storage.kafka.consume-from-position-at-startup=start', + '-ingest-storage.kafka.consume-from-position-at-startup=end', '-ingest-storage.kafka.consume-from-timestamp-at-startup=0', '-ingest-storage.kafka.ingestion-concurrency=2', '-ingest-storage.kafka.ingestion-concurrency-batch-size=150', - '-ingest-storage.kafka.fetch-concurrency=4', - '-ingest-storage.kafka.records-per-fetch=6000', + '-ingest-storage.kafka.startup-fetch-concurrency=15', + '-ingest-storage.kafka.startup-records-per-fetch=2400', + '-ingest-storage.kafka.ongoing-fetch-concurrency=2', + '-ingest-storage.kafka.ongoing-records-per-fetch=30', ], extraVolumes: ['.data-mimir-write-zone-c-61:/data:delegated'], }), diff --git a/development/mimir-ingest-storage/docker-compose.yml b/development/mimir-ingest-storage/docker-compose.yml index 82c394fecc..9b13f200fd 100644 --- a/development/mimir-ingest-storage/docker-compose.yml +++ b/development/mimir-ingest-storage/docker-compose.yml @@ -322,7 +322,7 @@ "command": - "sh" - "-c" - - "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=start -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.ingestion-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150 -ingest-storage.kafka.fetch-concurrency=4 -ingest-storage.kafka.records-per-fetch=6000" + - "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.ingestion-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150 -ingest-storage.kafka.startup-fetch-concurrency=15 -ingest-storage.kafka.startup-records-per-fetch=2400 -ingest-storage.kafka.ongoing-fetch-concurrency=2 -ingest-storage.kafka.ongoing-records-per-fetch=30" "depends_on": "kafka_1": "condition": "service_healthy" diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 4be9840ebd..a48e7e538f 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -288,14 +288,16 @@ func (r *PartitionReader) processNextFetchesUntilTargetOrMaxLagHonored(ctx conte // If the target lag hasn't been reached with the first attempt (which stops once at least the max lag // is honored) then we try to reach the (lower) target lag within a fixed time (best-effort). - // The timeout is equal to the max lag. This is done because we expect at least a 2x replay speed - // from Kafka (which means at most it takes 1s to ingest 2s of data): assuming new data is continuously - // written to the partition, we give the reader maxLag time to replay the backlog + ingest the new data - // written in the meanwhile. + // The timeout is equal to the max lag x2. This is done because the ongoing fetcher config reduces lag more slowly, + // but is better at keeping up with the partition and minimizing e2e lag. func() (time.Duration, error) { - timedCtx, cancel := context.WithTimeoutCause(ctx, maxLag, errWaitTargetLagDeadlineExceeded) + timedCtx, cancel := context.WithTimeoutCause(ctx, 2*maxLag, errWaitTargetLagDeadlineExceeded) defer cancel() + // Don't use timedCtx because we want the fetchers to continue running + // At this point we're close enough to the end of the partition that we should switch to the more responsive fetcher. + r.fetcher.Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch) + return r.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger) }, @@ -925,6 +927,7 @@ func (r *concurrentFetchers) Stop() { close(r.done) r.wg.Wait() + level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord) } // newConcurrentFetchers creates a new concurrentFetchers. startOffset can be kafkaOffsetStart, kafkaOffsetEnd or a specific offset. @@ -944,23 +947,6 @@ func newConcurrentFetchers( metrics *readerMetrics, ) (*concurrentFetchers, error) { - const noReturnedRecords = -1 // we still haven't returned the 0 offset. - - f := &concurrentFetchers{ - client: client, - logger: logger, - topicName: topic, - partitionID: partition, - metrics: metrics, - minBytesWaitTime: minBytesWaitTime, - lastReturnedRecord: noReturnedRecords, - startOffsets: startOffsetsReader, - trackCompressedBytes: trackCompressedBytes, - tracer: recordsTracer(), - orderedFetches: make(chan fetchResult), - done: make(chan struct{}), - } - var err error switch startOffset { case kafkaOffsetStart: @@ -974,6 +960,20 @@ func newConcurrentFetchers( if err != nil { return nil, fmt.Errorf("resolving offset to start consuming from: %w", err) } + f := &concurrentFetchers{ + client: client, + logger: logger, + topicName: topic, + partitionID: partition, + metrics: metrics, + minBytesWaitTime: minBytesWaitTime, + lastReturnedRecord: startOffset - 1, + startOffsets: startOffsetsReader, + trackCompressedBytes: trackCompressedBytes, + tracer: recordsTracer(), + orderedFetches: make(chan fetchResult), + done: make(chan struct{}), + } topics, err := kadm.NewClient(client).ListTopics(ctx, topic) if err != nil { @@ -984,6 +984,7 @@ func newConcurrentFetchers( } f.topicID = topics[topic].ID + f.wg.Add(1) go f.start(ctx, startOffset, concurrency, recordsPerFetch) return f, nil @@ -993,7 +994,8 @@ func (r *concurrentFetchers) Update(ctx context.Context, concurrency, records in r.Stop() r.done = make(chan struct{}) - go r.start(ctx, r.lastReturnedRecord, concurrency, records) + r.wg.Add(1) + go r.start(ctx, r.lastReturnedRecord+1, concurrency, records) } func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, context.Context) { @@ -1248,10 +1250,11 @@ func (r *concurrentFetchers) run(ctx context.Context, wants chan fetchWant, logg } func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concurrency, recordsPerFetch int) { - r.wg.Add(concurrency) + level.Info(r.logger).Log("msg", "starting concurrent fetchers", "start_offset", startOffset, "concurrency", concurrency, "recordsPerFetch", recordsPerFetch) wants := make(chan fetchWant) defer close(wants) + r.wg.Add(concurrency) for i := 0; i < concurrency; i++ { logger := log.With(r.logger, "fetcher", i) go r.run(ctx, wants, logger) @@ -1268,7 +1271,6 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu nextFetch.bytesPerRecord = 10_000 // start with an estimation, we will update it as we consume // We need to make sure we don't leak any goroutine given that start is called within a goroutine. - r.wg.Add(1) defer r.wg.Done() for { refillBufferedResult := nextResult @@ -1340,6 +1342,7 @@ func handleKafkaFetchErr(err error, fw fetchWant, longBackoff waiter, partitionS case errors.Is(err, kerr.OffsetOutOfRange): // We're either consuming from before the first offset or after the last offset. partitionStart, err := partitionStartOffset.CachedOffset() + logger = log.With(logger, "log_start_offset", partitionStart, "start_offset", fw.startOffset, "end_offset", fw.endOffset) if err != nil { level.Error(logger).Log("msg", "failed to find start offset to readjust on OffsetOutOfRange; retrying same records range", "err", err) break @@ -1350,12 +1353,12 @@ func handleKafkaFetchErr(err error, fw fetchWant, longBackoff waiter, partitionS if partitionStart >= fw.endOffset { // The next fetch want is responsible for this range. We set startOffset=endOffset to effectively mark this fetch as complete. fw.startOffset = fw.endOffset - level.Debug(logger).Log("msg", "we're too far behind aborting fetch", "log_start_offset", partitionStart, "start_offset", fw.startOffset, "end_offset", fw.endOffset) + level.Debug(logger).Log("msg", "we're too far behind aborting fetch") break } // Only some of the offsets of our want are out of range, so let's fast-forward. fw.startOffset = partitionStart - level.Debug(logger).Log("msg", "part of fetch want is outside of available offsets, adjusted start offset", "log_start_offset", partitionStart, "start_offset", fw.startOffset, "end_offset", fw.endOffset) + level.Debug(logger).Log("msg", "part of fetch want is outside of available offsets, adjusted start offset") } else { // If the broker is behind or if we are requesting offsets which have not yet been produced, we end up here. // We set a MaxWaitMillis on fetch requests, but even then there may be no records for some time. diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go index 4e4669e8db..95b227935c 100644 --- a/pkg/storage/ingest/reader_test.go +++ b/pkg/storage/ingest/reader_test.go @@ -2410,20 +2410,167 @@ func TestConcurrentFetchers(t *testing.T) { }) + t.Run("update concurrency with continuous production", func(t *testing.T) { + t.Parallel() + const ( + testDuration = 10 * time.Second + produceInterval = 10 * time.Millisecond + initialConcurrency = 2 + ) + + ctx, cancel := context.WithTimeout(context.Background(), testDuration) + defer cancel() + + produceCtx, cancelProduce := context.WithTimeout(context.Background(), testDuration) + defer cancelProduce() + + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) + client := newKafkaProduceClient(t, clusterAddr) + + producedCount := atomic.NewInt64(0) + + // Start producing records continuously + go func() { + ticker := time.NewTicker(produceInterval) + defer ticker.Stop() + + for { + select { + case <-produceCtx.Done(): + return + case <-ticker.C: + count := producedCount.Inc() + record := fmt.Sprintf("record-%d", count) + produceRecord(produceCtx, t, client, topicName, partitionID, []byte(record)) + } + } + }() + + fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, initialConcurrency, recordsPerFetch) + + fetchedRecords := make([]*kgo.Record, 0) + fetchedCount := atomic.NewInt64(0) + + fetchRecords := func(duration time.Duration) { + deadline := time.Now().Add(duration) + for time.Now().Before(deadline) { + fetches, _ := fetchers.PollFetches(ctx) + fetches.EachRecord(func(r *kgo.Record) { + fetchedRecords = append(fetchedRecords, r) + fetchedCount.Inc() + }) + } + } + + // Initial fetch with starting concurrency + fetchRecords(2 * time.Second) + initialFetched := fetchedCount.Load() + + // Update to higher concurrency + fetchers.Update(ctx, 4, recordsPerFetch) + fetchRecords(3 * time.Second) + highConcurrencyFetched := fetchedCount.Load() - initialFetched + + // Update to lower concurrency + fetchers.Update(ctx, 1, recordsPerFetch) + fetchRecords(3 * time.Second) + + cancelProduce() + // Produce everything that's left now. + fetchRecords(time.Second) + totalProduced := producedCount.Load() + totalFetched := fetchedCount.Load() + + // Verify fetched records + assert.True(t, totalFetched > 0, "Expected to fetch some records") + assert.Equal(t, totalFetched, totalProduced, "Should not fetch more records than produced") + assert.True(t, highConcurrencyFetched > initialFetched, "Expected to fetch more records with higher concurrency") + + // Verify record contents + for i, record := range fetchedRecords { + expectedContent := fmt.Sprintf("record-%d", i+1) + assert.Equal(t, expectedContent, string(record.Value), + "Record %d has unexpected content: %s", i, string(record.Value)) + } + + // Log some statistics + t.Logf("Total produced: %d, Total fetched: %d", totalProduced, totalFetched) + t.Logf("Fetched with initial concurrency: %d", initialFetched) + t.Logf("Fetched with high concurrency: %d", highConcurrencyFetched) + t.Logf("Fetched with low concurrency: %d", totalFetched-initialFetched-highConcurrencyFetched) + }) + + t.Run("consume from end and update immediately", func(t *testing.T) { + t.Parallel() + const ( + initialRecords = 100 + additionalRecords = 50 + initialConcurrency = 2 + updatedConcurrency = 4 + ) + + ctx := context.Background() + + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) + client := newKafkaProduceClient(t, clusterAddr) + + // Produce initial records + for i := 0; i < initialRecords; i++ { + record := fmt.Sprintf("initial-record-%d", i+1) + produceRecord(ctx, t, client, topicName, partitionID, []byte(record)) + } + + // Start concurrent fetchers from the end + fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, kafkaOffsetEnd, initialConcurrency, recordsPerFetch) + + // Immediately update concurrency + fetchers.Update(ctx, updatedConcurrency, recordsPerFetch) + + // Produce additional records + for i := 0; i < additionalRecords; i++ { + record := fmt.Sprintf("additional-record-%d", i+1) + produceRecord(ctx, t, client, topicName, partitionID, []byte(record)) + } + + fetchedRecords := make([]*kgo.Record, 0, additionalRecords) + fetchDeadline := time.Now().Add(5 * time.Second) + + // Fetch records + for len(fetchedRecords) < additionalRecords && time.Now().Before(fetchDeadline) { + fetches, _ := fetchers.PollFetches(ctx) + fetches.EachRecord(func(r *kgo.Record) { + fetchedRecords = append(fetchedRecords, r) + }) + } + + // Verify fetched records + assert.LessOrEqual(t, len(fetchedRecords), additionalRecords, + "Should not fetch more records than produced after start") + + // Verify record contents + for i, record := range fetchedRecords { + expectedContent := fmt.Sprintf("additional-record-%d", i+1) + assert.Equal(t, expectedContent, string(record.Value), + "Record %d has unexpected content: %s", i, string(record.Value)) + } + + // Log some statistics + t.Logf("Total records produced: %d", initialRecords+additionalRecords) + t.Logf("Records produced after start: %d", additionalRecords) + t.Logf("Records fetched: %d", len(fetchedRecords)) + }) } func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Client, topic string, partition int32, startOffset int64, concurrency, recordsPerFetch int) *concurrentFetchers { logger := log.NewNopLogger() - metrics := newReaderMetrics(partition, prometheus.NewRegistry()) + reg := prometheus.NewPedanticRegistry() + metrics := newReaderMetrics(partition, reg) // This instantiates the fields of kprom. // This is usually done by franz-go, but since now we use the metrics ourselves, we need to instantiate the metrics ourselves. metrics.kprom.OnNewClient(client) - offsetReader := &partitionOffsetClient{ - client: client, - topic: topic, - } + offsetReader := newPartitionOffsetClient(client, topic, reg, logger) startOffsetsReader := newGenericOffsetReader(func(ctx context.Context) (int64, error) { return offsetReader.FetchPartitionStartOffset(ctx, partition) @@ -2445,6 +2592,7 @@ func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Cli &metrics, ) require.NoError(t, err) + t.Cleanup(f.Stop) return f }