Skip to content

Commit

Permalink
Refactoring: allow to customize Kafka reader client metrics prefix (#…
Browse files Browse the repository at this point in the history
…10226)

Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci authored Dec 12, 2024
1 parent c91873a commit 89fd0e4
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (b *BlockBuilder) starting(context.Context) (err error) {

b.kafkaClient, err = ingest.NewKafkaReaderClient(
b.cfg.Kafka,
ingest.NewKafkaReaderClientMetrics("block-builder", b.register),
ingest.NewKafkaReaderClientMetrics(ingest.ReaderMetricsPrefix, "block-builder", b.register),
b.logger,
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func New(
func (s *BlockBuilderScheduler) starting(ctx context.Context) error {
kc, err := ingest.NewKafkaReaderClient(
s.cfg.Kafka,
ingest.NewKafkaReaderClientMetrics("block-builder-scheduler", s.register),
ingest.NewKafkaReaderClientMetrics(ingest.ReaderMetricsPrefix, "block-builder-scheduler", s.register),
s.logger,
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ func (t *Mimir) initQueryFrontendTopicOffsetsReader() (services.Service, error)

var err error

kafkaMetrics := ingest.NewKafkaReaderClientMetrics("query-frontend", t.Registerer)
kafkaMetrics := ingest.NewKafkaReaderClientMetrics(ingest.ReaderMetricsPrefix, "query-frontend", t.Registerer)
kafkaClient, err := ingest.NewKafkaReaderClient(t.Cfg.IngestStorage.KafkaConfig, kafkaMetrics, util_log.Logger)
if err != nil {
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ const (
// This is usually used when there aren't enough records available to fulfil MinBytes, so the broker waits for more records to be produced.
// Warpstream clamps this between 5s and 30s.
defaultMinBytesMaxWaitTime = 5 * time.Second

// ReaderMetricsPrefix is the reader metrics prefix used by the ingest storage.
ReaderMetricsPrefix = "cortex_ingest_storage_reader"
)

var (
Expand Down Expand Up @@ -1032,7 +1035,7 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc
}),
strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg),
lastConsumedOffset: lastConsumedOffset,
kprom: NewKafkaReaderClientMetrics(component, reg),
kprom: NewKafkaReaderClientMetrics(ReaderMetricsPrefix, component, reg),
missedRecords: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_missed_records_total",
Help: "The number of offsets that were never consumed by the reader because they weren't fetched.",
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/ingest/reader_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func NewKafkaReaderClient(cfg KafkaConfig, metrics *kprom.Metrics, logger log.Lo
return client, nil
}

func NewKafkaReaderClientMetrics(component string, reg prometheus.Registerer) *kprom.Metrics {
return kprom.NewMetrics("cortex_ingest_storage_reader",
func NewKafkaReaderClientMetrics(prefix, component string, reg prometheus.Registerer) *kprom.Metrics {
return kprom.NewMetrics(prefix,
kprom.Registerer(prometheus.WrapRegistererWith(prometheus.Labels{"component": component}, reg)),
// Do not export the client ID, because we use it to specify options to the backend.
kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes))
Expand Down

0 comments on commit 89fd0e4

Please sign in to comment.