diff --git a/pkg/storage/ingest/config.go b/pkg/storage/ingest/config.go index 4de4e2dd73e..63cd9124b32 100644 --- a/pkg/storage/ingest/config.go +++ b/pkg/storage/ingest/config.go @@ -64,7 +64,8 @@ type KafkaConfig struct { WriteTimeout time.Duration `yaml:"write_timeout"` WriteClients int `yaml:"write_clients"` - ConsumerGroup string `yaml:"consumer_group"` + ConsumerGroup string `yaml:"consumer_group"` + ConsumerGroupOffsetCommitInterval time.Duration `yaml:"consumer_group_offset_commit_interval"` LastProducedOffsetPollInterval time.Duration `yaml:"last_produced_offset_poll_interval"` LastProducedOffsetRetryTimeout time.Duration `yaml:"last_produced_offset_retry_timeout"` @@ -90,6 +91,7 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) f.IntVar(&cfg.WriteClients, prefix+".write-clients", 1, "The number of Kafka clients used by producers. When the configured number of clients is greater than 1, partitions are sharded among Kafka clients. An higher number of clients may provide higher write throughput at the cost of additional Metadata requests pressure to Kafka.") f.StringVar(&cfg.ConsumerGroup, prefix+".consumer-group", "", "The consumer group used by the consumer to track the last consumed offset. The consumer group must be different for each ingester. If the configured consumer group contains the '' placeholder, it will be replaced with the actual partition ID owned by the ingester. When empty (recommended), Mimir will use the ingester instance ID to guarantee uniqueness.") + f.DurationVar(&cfg.ConsumerGroupOffsetCommitInterval, prefix+".consumer-group-offset-commit-interval", time.Second, "How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left.") f.DurationVar(&cfg.LastProducedOffsetPollInterval, prefix+".last-produced-offset-poll-interval", time.Second, "How frequently to poll the last produced offset, used to enforce strong read consistency.") f.DurationVar(&cfg.LastProducedOffsetRetryTimeout, prefix+".last-produced-offset-retry-timeout", 10*time.Second, "How long to retry a failed request to get the last produced offset.") diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 50a3d6f041c..df15ba35669 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -59,8 +59,7 @@ type PartitionReader struct { consumer recordConsumer metrics readerMetrics - committer *partitionCommitter - commitInterval time.Duration + committer *partitionCommitter // consumedOffsetWatcher is used to wait until a given offset has been consumed. // This gets initialised with -1 which means nothing has been consumed from the partition yet. @@ -83,7 +82,6 @@ func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID stri consumer: consumer, consumerGroup: kafkaCfg.GetConsumerGroup(instanceID, partitionID), metrics: newReaderMetrics(partitionID, reg), - commitInterval: time.Second, consumedOffsetWatcher: newPartitionOffsetWatcher(), logger: log.With(logger, "partition", partitionID), reg: reg, @@ -119,7 +117,7 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) { if err != nil { return errors.Wrap(err, "creating kafka reader client") } - r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.client), r.partitionID, r.consumerGroup, r.commitInterval, r.logger, r.reg) + r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.client), r.partitionID, r.consumerGroup, r.logger, r.reg) r.offsetReader = newPartitionOffsetReader(r.client, r.kafkaCfg.Topic, r.partitionID, r.kafkaCfg.LastProducedOffsetPollInterval, r.reg, r.logger) @@ -576,10 +574,9 @@ func (r *PartitionReader) WaitReadConsistency(ctx context.Context) (returnErr er type partitionCommitter struct { services.Service - kafkaCfg KafkaConfig - commitInterval time.Duration - partitionID int32 - consumerGroup string + kafkaCfg KafkaConfig + partitionID int32 + consumerGroup string toCommit *atomic.Int64 admClient *kadm.Client @@ -593,15 +590,14 @@ type partitionCommitter struct { lastCommittedOffset prometheus.Gauge } -func newPartitionCommitter(kafkaCfg KafkaConfig, admClient *kadm.Client, partitionID int32, consumerGroup string, commitInterval time.Duration, logger log.Logger, reg prometheus.Registerer) *partitionCommitter { +func newPartitionCommitter(kafkaCfg KafkaConfig, admClient *kadm.Client, partitionID int32, consumerGroup string, logger log.Logger, reg prometheus.Registerer) *partitionCommitter { c := &partitionCommitter{ - logger: logger, - kafkaCfg: kafkaCfg, - partitionID: partitionID, - consumerGroup: consumerGroup, - toCommit: atomic.NewInt64(-1), - admClient: admClient, - commitInterval: commitInterval, + logger: logger, + kafkaCfg: kafkaCfg, + partitionID: partitionID, + consumerGroup: consumerGroup, + toCommit: atomic.NewInt64(-1), + admClient: admClient, commitRequestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "cortex_ingest_storage_reader_offset_commit_requests_total", @@ -641,7 +637,7 @@ func (r *partitionCommitter) enqueueOffset(o int64) { } func (r *partitionCommitter) run(ctx context.Context) error { - commitTicker := time.NewTicker(r.commitInterval) + commitTicker := time.NewTicker(r.kafkaCfg.ConsumerGroupOffsetCommitInterval) defer commitTicker.Stop() previousOffset := r.toCommit.Load() diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go index 8504319d88b..3104f9c4ca8 100644 --- a/pkg/storage/ingest/reader_test.go +++ b/pkg/storage/ingest/reader_test.go @@ -1190,8 +1190,7 @@ func TestPartitionCommitter(t *testing.T) { adm := kadm.NewClient(client) reg := prometheus.NewPedanticRegistry() - interval := time.Second - committer := newPartitionCommitter(cfg, adm, partitionID, consumerGroup, interval, logger, reg) + committer := newPartitionCommitter(cfg, adm, partitionID, consumerGroup, logger, reg) require.NoError(t, services.StartAndAwaitRunning(context.Background(), committer)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(context.Background(), committer)) @@ -1209,7 +1208,7 @@ func TestPartitionCommitter(t *testing.T) { commitRequestsShouldFail.Store(false) // Now we expect the commit to succeed, once the committer will trigger the commit the next interval. - test.Poll(t, 10*interval, nil, func() interface{} { + test.Poll(t, 10*cfg.ConsumerGroupOffsetCommitInterval, nil, func() interface{} { return promtest.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_ingest_storage_reader_last_committed_offset The last consumed offset successfully committed by the partition reader. Set to -1 if not offset has been committed yet. # TYPE cortex_ingest_storage_reader_last_committed_offset gauge @@ -1231,7 +1230,7 @@ func TestPartitionCommitter(t *testing.T) { // Since we haven't enqueued any other offset and the last enqueued one has been successfully committed, // we expect the committer to not issue any other request in the future. expectedRequestsCount := commitRequestsCount.Load() - time.Sleep(3 * interval) + time.Sleep(3 * cfg.ConsumerGroupOffsetCommitInterval) assert.Equal(t, expectedRequestsCount, commitRequestsCount.Load()) }) } @@ -1257,7 +1256,7 @@ func TestPartitionCommitter_commit(t *testing.T) { adm := kadm.NewClient(client) reg := prometheus.NewPedanticRegistry() - committer := newPartitionCommitter(cfg, adm, partitionID, consumerGroup, time.Second, log.NewNopLogger(), reg) + committer := newPartitionCommitter(cfg, adm, partitionID, consumerGroup, log.NewNopLogger(), reg) require.NoError(t, committer.commit(context.Background(), 123)) @@ -1297,7 +1296,7 @@ func TestPartitionCommitter_commit(t *testing.T) { adm := kadm.NewClient(client) reg := prometheus.NewPedanticRegistry() - committer := newPartitionCommitter(cfg, adm, partitionID, consumerGroup, time.Second, log.NewNopLogger(), reg) + committer := newPartitionCommitter(cfg, adm, partitionID, consumerGroup, log.NewNopLogger(), reg) require.Error(t, committer.commit(context.Background(), 123)) @@ -1342,19 +1341,18 @@ func produceRecord(ctx context.Context, t *testing.T, writeClient *kgo.Client, t } type readerTestCfg struct { - kafka KafkaConfig - partitionID int32 - consumer recordConsumer - registry *prometheus.Registry - logger log.Logger - commitInterval time.Duration + kafka KafkaConfig + partitionID int32 + consumer recordConsumer + registry *prometheus.Registry + logger log.Logger } type readerTestCfgOtp func(cfg *readerTestCfg) func withCommitInterval(i time.Duration) func(cfg *readerTestCfg) { return func(cfg *readerTestCfg) { - cfg.commitInterval = i + cfg.kafka.ConsumerGroupOffsetCommitInterval = i } } @@ -1391,12 +1389,11 @@ func withRegistry(reg *prometheus.Registry) func(cfg *readerTestCfg) { func defaultReaderTestConfig(t *testing.T, addr string, topicName string, partitionID int32, consumer recordConsumer) *readerTestCfg { return &readerTestCfg{ - registry: prometheus.NewPedanticRegistry(), - logger: testutil.NewLogger(t), - kafka: createTestKafkaConfig(addr, topicName), - partitionID: partitionID, - consumer: consumer, - commitInterval: 10 * time.Second, + registry: prometheus.NewPedanticRegistry(), + logger: testutil.NewLogger(t), + kafka: createTestKafkaConfig(addr, topicName), + partitionID: partitionID, + consumer: consumer, } } @@ -1407,7 +1404,6 @@ func createReader(t *testing.T, addr string, topicName string, partitionID int32 } reader, err := newPartitionReader(cfg.kafka, cfg.partitionID, "test-group", cfg.consumer, cfg.logger, cfg.registry) require.NoError(t, err) - reader.commitInterval = cfg.commitInterval return reader }