From 7e48ecb787bac60b1afba372a9941b8efb455561 Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Thu, 12 Dec 2024 12:52:05 +0100
Subject: [PATCH] Refactoring: allow to customize Kafka reader client metrics
 prefix

Signed-off-by: Marco Pracucci
---
 pkg/blockbuilder/blockbuilder.go        | 2 +-
 pkg/blockbuilder/scheduler/scheduler.go | 2 +-
 pkg/mimir/modules.go                    | 2 +-
 pkg/storage/ingest/reader.go            | 5 ++++-
 pkg/storage/ingest/reader_client.go     | 4 ++--
 5 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/pkg/blockbuilder/blockbuilder.go b/pkg/blockbuilder/blockbuilder.go
index 728d6f56671..865455661be 100644
--- a/pkg/blockbuilder/blockbuilder.go
+++ b/pkg/blockbuilder/blockbuilder.go
@@ -92,7 +92,7 @@ func (b *BlockBuilder) starting(context.Context) (err error) {
 
 	b.kafkaClient, err = ingest.NewKafkaReaderClient(
 		b.cfg.Kafka,
-		ingest.NewKafkaReaderClientMetrics("block-builder", b.register),
+		ingest.NewKafkaReaderClientMetrics(ingest.ReaderMetricsPrefix, "block-builder", b.register),
 		b.logger,
 	)
 	if err != nil {
diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go
index efbce4b46b5..971752a9c7d 100644
--- a/pkg/blockbuilder/scheduler/scheduler.go
+++ b/pkg/blockbuilder/scheduler/scheduler.go
@@ -60,7 +60,7 @@ func New(
 func (s *BlockBuilderScheduler) starting(ctx context.Context) error {
 	kc, err := ingest.NewKafkaReaderClient(
 		s.cfg.Kafka,
-		ingest.NewKafkaReaderClientMetrics("block-builder-scheduler", s.register),
+		ingest.NewKafkaReaderClientMetrics(ingest.ReaderMetricsPrefix, "block-builder-scheduler", s.register),
 		s.logger,
 	)
 	if err != nil {
diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go
index 62e136dcd3d..697501af98f 100644
--- a/pkg/mimir/modules.go
+++ b/pkg/mimir/modules.go
@@ -711,7 +711,7 @@ func (t *Mimir) initQueryFrontendTopicOffsetsReader() (services.Service, error)
 
 	var err error
 
-	kafkaMetrics := ingest.NewKafkaReaderClientMetrics("query-frontend", t.Registerer)
+	kafkaMetrics := ingest.NewKafkaReaderClientMetrics(ingest.ReaderMetricsPrefix, "query-frontend", t.Registerer)
 	kafkaClient, err := ingest.NewKafkaReaderClient(t.Cfg.IngestStorage.KafkaConfig, kafkaMetrics, util_log.Logger)
 	if err != nil {
 		return nil, err
diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go
index 1ef3f1f189f..4abe1932138 100644
--- a/pkg/storage/ingest/reader.go
+++ b/pkg/storage/ingest/reader.go
@@ -39,6 +39,9 @@ const (
 	// This is usually used when there aren't enough records available to fulfil MinBytes, so the broker waits for more records to be produced.
 	// Warpstream clamps this between 5s and 30s.
 	defaultMinBytesMaxWaitTime = 5 * time.Second
+
+	// ReaderMetricsPrefix is the reader metrics prefix used by the ingest storage.
+	ReaderMetricsPrefix = "cortex_ingest_storage_reader"
 )
 
 var (
@@ -1032,7 +1035,7 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc
 		}),
 		strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg),
 		lastConsumedOffset:               lastConsumedOffset,
-		kprom:                            NewKafkaReaderClientMetrics(component, reg),
+		kprom:                            NewKafkaReaderClientMetrics(ReaderMetricsPrefix, component, reg),
 		missedRecords: promauto.With(reg).NewCounter(prometheus.CounterOpts{
 			Name: "cortex_ingest_storage_reader_missed_records_total",
 			Help: "The number of offsets that were never consumed by the reader because they weren't fetched.",
diff --git a/pkg/storage/ingest/reader_client.go b/pkg/storage/ingest/reader_client.go
index 628fa545fac..ef5fda2b5b8 100644
--- a/pkg/storage/ingest/reader_client.go
+++ b/pkg/storage/ingest/reader_client.go
@@ -36,8 +36,8 @@ func NewKafkaReaderClient(cfg KafkaConfig, metrics *kprom.Metrics, logger log.Lo
 	return client, nil
 }
 
-func NewKafkaReaderClientMetrics(component string, reg prometheus.Registerer) *kprom.Metrics {
-	return kprom.NewMetrics("cortex_ingest_storage_reader",
+func NewKafkaReaderClientMetrics(prefix, component string, reg prometheus.Registerer) *kprom.Metrics {
+	return kprom.NewMetrics(prefix,
 		kprom.Registerer(prometheus.WrapRegistererWith(prometheus.Labels{"component": component}, reg)),
 		// Do not export the client ID, because we use it to specify options to the backend.
 		kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes))
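
With the prefix extracted into a parameter, a caller outside the ingest storage reader can export its franz-go client metrics under its own namespace while reusing the same wiring. Below is a minimal sketch of such a caller, under stated assumptions: the customPrefix value, the "my-component" name, and the newCustomReaderClient helper are illustrative, not part of this patch, and it assumes NewKafkaReaderClient returns a *kgo.Client as suggested by the call sites above.

    // Sketch only: customPrefix, "my-component", and newCustomReaderClient
    // are hypothetical; the patch itself only changes the two signatures.
    package example

    import (
    	"github.com/go-kit/log"
    	"github.com/prometheus/client_golang/prometheus"
    	"github.com/twmb/franz-go/pkg/kgo"

    	"github.com/grafana/mimir/pkg/storage/ingest"
    )

    // newCustomReaderClient builds a Kafka reader client whose kprom metrics
    // are registered under customPrefix instead of ingest.ReaderMetricsPrefix.
    func newCustomReaderClient(cfg ingest.KafkaConfig, reg prometheus.Registerer, logger log.Logger) (*kgo.Client, error) {
    	// Assumption: any Prometheus-legal metric name prefix is accepted here.
    	const customPrefix = "cortex_example_reader"

    	metrics := ingest.NewKafkaReaderClientMetrics(customPrefix, "my-component", reg)
    	return ingest.NewKafkaReaderClient(cfg, metrics, logger)
    }

Passing ingest.ReaderMetricsPrefix instead of customPrefix preserves the previous behaviour, which is exactly what the updated call sites in this patch do.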