diff --git a/pkg/blockbuilder/blockbuilder.go b/pkg/blockbuilder/blockbuilder.go
index 728d6f5667..865455661b 100644
--- a/pkg/blockbuilder/blockbuilder.go
+++ b/pkg/blockbuilder/blockbuilder.go
@@ -92,7 +92,7 @@ func (b *BlockBuilder) starting(context.Context) (err error) {
 
 	b.kafkaClient, err = ingest.NewKafkaReaderClient(
 		b.cfg.Kafka,
-		ingest.NewKafkaReaderClientMetrics("block-builder", b.register),
+		ingest.NewKafkaReaderClientMetrics(ingest.ReaderMetricsPrefix, "block-builder", b.register),
 		b.logger,
 	)
 	if err != nil {
diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go
index efbce4b46b..971752a9c7 100644
--- a/pkg/blockbuilder/scheduler/scheduler.go
+++ b/pkg/blockbuilder/scheduler/scheduler.go
@@ -60,7 +60,7 @@ func New(
 func (s *BlockBuilderScheduler) starting(ctx context.Context) error {
 	kc, err := ingest.NewKafkaReaderClient(
 		s.cfg.Kafka,
-		ingest.NewKafkaReaderClientMetrics("block-builder-scheduler", s.register),
+		ingest.NewKafkaReaderClientMetrics(ingest.ReaderMetricsPrefix, "block-builder-scheduler", s.register),
 		s.logger,
 	)
 	if err != nil {
diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go
index 62e136dcd3..697501af98 100644
--- a/pkg/mimir/modules.go
+++ b/pkg/mimir/modules.go
@@ -711,7 +711,7 @@ func (t *Mimir) initQueryFrontendTopicOffsetsReader() (services.Service, error)
 
 	var err error
 
-	kafkaMetrics := ingest.NewKafkaReaderClientMetrics("query-frontend", t.Registerer)
+	kafkaMetrics := ingest.NewKafkaReaderClientMetrics(ingest.ReaderMetricsPrefix, "query-frontend", t.Registerer)
 	kafkaClient, err := ingest.NewKafkaReaderClient(t.Cfg.IngestStorage.KafkaConfig, kafkaMetrics, util_log.Logger)
 	if err != nil {
 		return nil, err
diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go
index 1ef3f1f189..4abe193213 100644
--- a/pkg/storage/ingest/reader.go
+++ b/pkg/storage/ingest/reader.go
@@ -39,6 +39,9 @@ const (
 	// This is usually used when there aren't enough records available to fulfil MinBytes, so the broker waits for more records to be produced.
 	// Warpstream clamps this between 5s and 30s.
 	defaultMinBytesMaxWaitTime = 5 * time.Second
+
+	// ReaderMetricsPrefix is the metrics prefix used by the ingest storage reader.
+	ReaderMetricsPrefix = "cortex_ingest_storage_reader"
 )
 
 var (
@@ -1032,7 +1035,7 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc
 		}),
 		strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg),
 		lastConsumedOffset:               lastConsumedOffset,
-		kprom:                            NewKafkaReaderClientMetrics(component, reg),
+		kprom:                            NewKafkaReaderClientMetrics(ReaderMetricsPrefix, component, reg),
 		missedRecords: promauto.With(reg).NewCounter(prometheus.CounterOpts{
 			Name: "cortex_ingest_storage_reader_missed_records_total",
 			Help: "The number of offsets that were never consumed by the reader because they weren't fetched.",
diff --git a/pkg/storage/ingest/reader_client.go b/pkg/storage/ingest/reader_client.go
index 628fa545fa..ef5fda2b5b 100644
--- a/pkg/storage/ingest/reader_client.go
+++ b/pkg/storage/ingest/reader_client.go
@@ -36,8 +36,8 @@ func NewKafkaReaderClient(cfg KafkaConfig, metrics *kprom.Metrics, logger log.Lo
 	return client, nil
 }
 
-func NewKafkaReaderClientMetrics(component string, reg prometheus.Registerer) *kprom.Metrics {
-	return kprom.NewMetrics("cortex_ingest_storage_reader",
+func NewKafkaReaderClientMetrics(prefix, component string, reg prometheus.Registerer) *kprom.Metrics {
+	return kprom.NewMetrics(prefix,
 		kprom.Registerer(prometheus.WrapRegistererWith(prometheus.Labels{"component": component}, reg)),
 		// Do not export the client ID, because we use it to specify options to the backend.
 		kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes))
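
For context: this change threads an explicit metrics prefix through NewKafkaReaderClientMetrics instead of hard-coding "cortex_ingest_storage_reader" inside it, and every caller updated here preserves the old behaviour by passing the new ingest.ReaderMetricsPrefix constant. A minimal sketch of the updated call pattern, from outside the ingest package, follows; the package name, helper function, and "example-reader" component label are illustrative assumptions, not part of this diff.

package example

import (
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/twmb/franz-go/pkg/kgo"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

// newExampleReaderClient shows the two-step wiring after this change: build
// kprom metrics with an explicit prefix plus a per-caller component label,
// then hand them to the reader client constructor. "example-reader" is a
// hypothetical component name used only for illustration.
func newExampleReaderClient(cfg ingest.KafkaConfig, reg prometheus.Registerer, logger log.Logger) (*kgo.Client, error) {
	metrics := ingest.NewKafkaReaderClientMetrics(ingest.ReaderMetricsPrefix, "example-reader", reg)
	return ingest.NewKafkaReaderClient(cfg, metrics, logger)
}

For the callers touched in this diff, metrics still land under the cortex_ingest_storage_reader_* namespace with a component label distinguishing callers; the prefix parameter simply makes the namespace a caller's choice rather than a constant baked into the helper.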