Add topic label to StrongReadConsistencyInstrumentation metrics
leizor committed Dec 12, 2024
1 parent 6591673 commit 9bfbc29
Showing 6 changed files with 65 additions and 49 deletions.
12 changes: 9 additions & 3 deletions pkg/frontend/querymiddleware/read_consistency.go
@@ -69,7 +69,7 @@ func (r *readConsistencyRoundTripper) RoundTrip(req *http.Request) (_ *http.Resp
 		offsetsReader := offsetsReader

 		errGroup.Go(func() error {
-			offsets, err := r.metrics.Observe(false, func() (map[int32]int64, error) {
+			offsets, err := r.metrics.Observe(offsetsReader.Topic(), false, func() (map[int32]int64, error) {
 				return offsetsReader.WaitNextFetchLastProducedOffset(ctx)
 			})
 			if err != nil {
@@ -103,7 +103,13 @@ func getDefaultReadConsistency(tenantIDs []string, limits Limits) string {
 	return querierapi.ReadConsistencyEventual
 }

-func newReadConsistencyMetrics(reg prometheus.Registerer) *ingest.StrongReadConsistencyInstrumentation[map[int32]int64] {
+func newReadConsistencyMetrics(reg prometheus.Registerer, offsetsReaders map[string]*ingest.TopicOffsetsReader) *ingest.StrongReadConsistencyInstrumentation[map[int32]int64] {
 	const component = "query-frontend"
-	return ingest.NewStrongReadConsistencyInstrumentation[map[int32]int64](component, reg)
+
+	topics := make([]string, 0, len(offsetsReaders))
+	for _, r := range offsetsReaders {
+		topics = append(topics, r.Topic())
+	}
+
+	return ingest.NewStrongReadConsistencyInstrumentation[map[int32]int64](component, reg, topics)
 }
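
For orientation, here is a condensed sketch of the frontend-side call pattern after this change: the constructor learns every topic from the offsets readers up front, and each wait is observed under the topic of the reader that served it. The helper name below is hypothetical and the import path is assumed from the repository layout; the calls themselves mirror the diff above.

package querymiddleware

import (
	"context"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

// observeStrongConsistency is an illustrative helper, not part of the commit.
// It shows the Observe call now naming the topic of the offsets reader it waits on,
// so the resulting request, latency and failure series carry that topic label.
func observeStrongConsistency(ctx context.Context, metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64], offsetsReader *ingest.TopicOffsetsReader) (map[int32]int64, error) {
	return metrics.Observe(offsetsReader.Topic(), false, func() (map[int32]int64, error) {
		return offsetsReader.WaitNextFetchLastProducedOffset(ctx)
	})
}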
12 changes: 7 additions & 5 deletions pkg/frontend/querymiddleware/read_consistency_test.go
@@ -102,8 +102,10 @@ func TestReadConsistencyRoundTripper(t *testing.T) {
 				req = req.WithContext(querierapi.ContextWithReadConsistencyLevel(req.Context(), testData.reqConsistency))
 			}

+			offsetsReaders := map[string]*ingest.TopicOffsetsReader{querierapi.ReadConsistencyOffsetsHeader: reader}
+
 			reg := prometheus.NewPedanticRegistry()
-			rt := newReadConsistencyRoundTripper(downstream, map[string]*ingest.TopicOffsetsReader{querierapi.ReadConsistencyOffsetsHeader: reader}, testData.limits, log.NewNopLogger(), newReadConsistencyMetrics(reg))
+			rt := newReadConsistencyRoundTripper(downstream, offsetsReaders, testData.limits, log.NewNopLogger(), newReadConsistencyMetrics(reg, offsetsReaders))
 			_, err = rt.RoundTrip(req)
 			require.NoError(t, err)

@@ -130,13 +132,13 @@ func TestReadConsistencyRoundTripper(t *testing.T) {
 			assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
 				# HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset.
 				# TYPE cortex_ingest_storage_strong_consistency_requests_total counter
-				cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", with_offset="false"} %d
-				cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", with_offset="true"} 0
+				cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", topic="%s", with_offset="false"} %d
+				cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", topic="%s", with_offset="true"} 0
 				# HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced.
 				# TYPE cortex_ingest_storage_strong_consistency_failures_total counter
-				cortex_ingest_storage_strong_consistency_failures_total{component="query-frontend"} 0
-			`, expectedRequests)),
+				cortex_ingest_storage_strong_consistency_failures_total{component="query-frontend", topic="%s"} 0
+			`, topic, expectedRequests, topic, topic)),
 				"cortex_ingest_storage_strong_consistency_requests_total",
 				"cortex_ingest_storage_strong_consistency_failures_total"))
 		})
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/roundtrip.go
@@ -282,7 +282,7 @@ func newQueryTripperware(

 	// Enforce read consistency after caching.
 	if len(ingestStorageTopicOffsetsReaders) > 0 {
-		metrics := newReadConsistencyMetrics(registerer)
+		metrics := newReadConsistencyMetrics(registerer, ingestStorageTopicOffsetsReaders)

 		queryrange = newReadConsistencyRoundTripper(queryrange, ingestStorageTopicOffsetsReaders, limits, log, metrics)
 		instant = newReadConsistencyRoundTripper(instant, ingestStorageTopicOffsetsReaders, limits, log, metrics)
4 changes: 2 additions & 2 deletions pkg/storage/ingest/fetcher_test.go
@@ -725,7 +725,7 @@ func TestConcurrentFetchers(t *testing.T) {

 		logger := log.NewNopLogger()
 		reg := prometheus.NewPedanticRegistry()
-		metrics := newReaderMetrics(partitionID, reg, noopReaderMetricsSource{})
+		metrics := newReaderMetrics(partitionID, reg, noopReaderMetricsSource{}, topicName)

 		client := newKafkaProduceClient(t, clusterAddr)

@@ -1151,7 +1151,7 @@ func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Cli
 	logger := testingLogger.WithT(t)

 	reg := prometheus.NewPedanticRegistry()
-	metrics := newReaderMetrics(partition, reg, noopReaderMetricsSource{})
+	metrics := newReaderMetrics(partition, reg, noopReaderMetricsSource{}, topic)

 	// This instantiates the fields of kprom.
 	// This is usually done by franz-go, but since now we use the metrics ourselves, we need to instantiate the metrics ourselves.
44 changes: 26 additions & 18 deletions pkg/storage/ingest/reader.go
@@ -121,12 +121,16 @@ func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID stri
 		reg: reg,
 	}

-	r.metrics = newReaderMetrics(partitionID, reg, r)
+	r.metrics = newReaderMetrics(partitionID, reg, r, kafkaCfg.Topic)

 	r.Service = services.NewBasicService(r.start, r.run, r.stop)
 	return r, nil
 }

+func (r *PartitionReader) Topic() string {
+	return r.kafkaCfg.Topic
+}
+
 // Stop implements fetcher
 func (r *PartitionReader) Stop() {
 	// Given the partition reader has no concurrency it doesn't support stopping anything.
@@ -758,7 +762,7 @@ func (r *PartitionReader) WaitReadConsistencyUntilOffset(ctx context.Context, of
 }

 func (r *PartitionReader) waitReadConsistency(ctx context.Context, withOffset bool, getOffset func(context.Context) (int64, error)) error {
-	_, err := r.metrics.strongConsistencyInstrumentation.Observe(withOffset, func() (struct{}, error) {
+	_, err := r.metrics.strongConsistencyInstrumentation.Observe(r.kafkaCfg.Topic, withOffset, func() (struct{}, error) {
 		spanLog := spanlogger.FromContext(ctx, r.logger)
 		spanLog.DebugLog("msg", "waiting for read consistency")

@@ -965,7 +969,7 @@ type readerMetricsSource interface {
 	EstimatedBytesPerRecord() int64
 }

-func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSource readerMetricsSource) readerMetrics {
+func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSource readerMetricsSource, topic string) readerMetrics {
 	const component = "partition-reader"

 	receiveDelay := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
@@ -1030,7 +1034,7 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc
 			Help: "How long a consumer spent processing a batch of records from Kafka. This includes retries on server errors.",
 			NativeHistogramBucketFactor: 1.1,
 		}),
-		strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg),
+		strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg, []string{topic}),
 		lastConsumedOffset: lastConsumedOffset,
 		kprom: NewKafkaReaderClientMetrics(component, reg),
 		missedRecords: promauto.With(reg).NewCounter(prometheus.CounterOpts{
@@ -1048,44 +1052,48 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc

 type StrongReadConsistencyInstrumentation[T any] struct {
 	requests *prometheus.CounterVec
-	failures prometheus.Counter
-	latency  prometheus.Histogram
+	failures *prometheus.CounterVec
+	latency  *prometheus.HistogramVec
 }

-func NewStrongReadConsistencyInstrumentation[T any](component string, reg prometheus.Registerer) *StrongReadConsistencyInstrumentation[T] {
+func NewStrongReadConsistencyInstrumentation[T any](component string, reg prometheus.Registerer, topics []string) *StrongReadConsistencyInstrumentation[T] {
 	i := &StrongReadConsistencyInstrumentation[T]{
 		requests: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 			Name: "cortex_ingest_storage_strong_consistency_requests_total",
 			Help: "Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset.",
 			ConstLabels: map[string]string{"component": component},
-		}, []string{"with_offset"}),
-		failures: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+		}, []string{"with_offset", "topic"}),
+		failures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 			Name: "cortex_ingest_storage_strong_consistency_failures_total",
 			Help: "Total number of failures while waiting for strong consistency to be enforced.",
 			ConstLabels: map[string]string{"component": component},
-		}),
-		latency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
+		}, []string{"topic"}),
+		latency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
 			Name: "cortex_ingest_storage_strong_consistency_wait_duration_seconds",
 			Help: "How long a request spent waiting for strong consistency to be guaranteed.",
 			NativeHistogramBucketFactor: 1.1,
 			NativeHistogramMaxBucketNumber: 100,
 			NativeHistogramMinResetDuration: 1 * time.Hour,
 			Buckets: prometheus.DefBuckets,
 			ConstLabels: map[string]string{"component": component},
-		}),
+		}, []string{"topic"}),
 	}

 	// Init metrics.
-	for _, value := range []bool{true, false} {
-		i.requests.WithLabelValues(strconv.FormatBool(value))
+	for _, topic := range topics {
+		for _, value := range []bool{true, false} {
+			i.requests.WithLabelValues(strconv.FormatBool(value), topic)
+		}
+		i.failures.WithLabelValues(topic)
+		i.latency.WithLabelValues(topic)
 	}

 	return i
 }

-func (i *StrongReadConsistencyInstrumentation[T]) Observe(withOffset bool, f func() (T, error)) (_ T, returnErr error) {
+func (i *StrongReadConsistencyInstrumentation[T]) Observe(topic string, withOffset bool, f func() (T, error)) (_ T, returnErr error) {
 	startTime := time.Now()
-	i.requests.WithLabelValues(strconv.FormatBool(withOffset)).Inc()
+	i.requests.WithLabelValues(strconv.FormatBool(withOffset), topic).Inc()

 	defer func() {
 		// Do not track failure or latency if the request was canceled (because the tracking would be incorrect).
@@ -1095,10 +1103,10 @@ func (i *StrongReadConsistencyInstrumentation[T]) Observe(withOffset bool, f fun

 		// Track latency for failures too, so that we have a better measurement of latency if
 		// backend latency is high and requests fail because of timeouts.
-		i.latency.Observe(time.Since(startTime).Seconds())
+		i.latency.WithLabelValues(topic).Observe(time.Since(startTime).Seconds())

 		if returnErr != nil {
-			i.failures.Inc()
+			i.failures.WithLabelValues(topic).Inc()
 		}
 	}()

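A rough, self-contained usage sketch of the updated instrumentation (not taken from the repository; the topic name is a placeholder): it is constructed with the set of topics it will serve, and every observation names its topic, so requests, wait latency and, on error, failures are all recorded against that topic label.

package main

import (
	"fmt"

	"github.com/grafana/mimir/pkg/storage/ingest"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewPedanticRegistry()

	// The constructor pre-creates the per-topic series, so they are exported at zero
	// before the first observation.
	inst := ingest.NewStrongReadConsistencyInstrumentation[struct{}]("partition-reader", reg, []string{"my-topic"})

	// The topic passed to Observe selects which per-topic series get updated.
	_, err := inst.Observe("my-topic", false, func() (struct{}, error) {
		// The real callers wait for strong consistency here; elided in this sketch.
		return struct{}{}, nil
	})
	fmt.Println("observe returned:", err)
}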
40 changes: 20 additions & 20 deletions pkg/storage/ingest/reader_test.go
@@ -312,13 +312,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead
 		assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
 			# HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset.
 			# TYPE cortex_ingest_storage_strong_consistency_requests_total counter
-			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1
-			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0
+			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1
+			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0
 			# HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced.
 			# TYPE cortex_ingest_storage_strong_consistency_failures_total counter
-			cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 0
-		`, withOffset, !withOffset)),
+			cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 0
+		`, topicName, withOffset, topicName, !withOffset, topicName)),
 			"cortex_ingest_storage_strong_consistency_requests_total",
 			"cortex_ingest_storage_strong_consistency_failures_total"))
 	})
@@ -362,13 +362,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead
 		assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
 			# HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset.
 			# TYPE cortex_ingest_storage_strong_consistency_requests_total counter
-			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1
-			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0
+			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1
+			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0
 			# HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced.
 			# TYPE cortex_ingest_storage_strong_consistency_failures_total counter
-			cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 1
-		`, withOffset, !withOffset)),
+			cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 1
+		`, topicName, withOffset, topicName, !withOffset, topicName)),
 			"cortex_ingest_storage_strong_consistency_requests_total",
 			"cortex_ingest_storage_strong_consistency_failures_total"))

@@ -415,13 +415,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead
 		assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
 			# HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset.
 			# TYPE cortex_ingest_storage_strong_consistency_requests_total counter
-			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1
-			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0
+			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1
+			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0
 			# HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced.
 			# TYPE cortex_ingest_storage_strong_consistency_failures_total counter
-			cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 1
-		`, withOffset, !withOffset)),
+			cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 1
+		`, topicName, withOffset, topicName, !withOffset, topicName)),
 			"cortex_ingest_storage_strong_consistency_requests_total",
 			"cortex_ingest_storage_strong_consistency_failures_total"))

@@ -452,13 +452,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead
 		assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
 			# HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset.
 			# TYPE cortex_ingest_storage_strong_consistency_requests_total counter
-			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1
-			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0
+			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1
+			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0
 			# HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced.
 			# TYPE cortex_ingest_storage_strong_consistency_failures_total counter
-			cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 0
-		`, withOffset, !withOffset)),
+			cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 0
+		`, topicName, withOffset, topicName, !withOffset, topicName)),
 			"cortex_ingest_storage_strong_consistency_requests_total",
 			"cortex_ingest_storage_strong_consistency_failures_total"))
 	})
@@ -489,13 +489,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead
 		assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
 			# HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset.
 			# TYPE cortex_ingest_storage_strong_consistency_requests_total counter
-			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1
-			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0
+			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1
+			cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0
 			# HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced.
 			# TYPE cortex_ingest_storage_strong_consistency_failures_total counter
-			cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 1
-		`, withOffset, !withOffset)),
+			cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 1
+		`, topicName, withOffset, topicName, !withOffset, topicName)),
 			"cortex_ingest_storage_strong_consistency_requests_total",
 			"cortex_ingest_storage_strong_consistency_failures_total"))
 	})
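To make the resulting series shape concrete, here is a small standalone example (independent of Mimir, using only client_golang) with a stand-in counter that mirrors the new label scheme, component as a constant label and with_offset plus topic as variable labels, and prints the gathered series:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// Stand-in for cortex_ingest_storage_strong_consistency_requests_total using the
	// label scheme introduced by this commit.
	requests := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name:        "cortex_ingest_storage_strong_consistency_requests_total",
		Help:        "Total number of requests for which strong consistency has been requested.",
		ConstLabels: prometheus.Labels{"component": "partition-reader"},
	}, []string{"with_offset", "topic"})
	reg.MustRegister(requests)

	requests.WithLabelValues("false", "my-topic").Inc()

	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		for _, m := range mf.GetMetric() {
			fmt.Println(mf.GetName(), m.GetLabel(), m.GetCounter().GetValue())
		}
	}
}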
