From 9bfbc29566b8731970adfa0a6fc46c63d96377ba Mon Sep 17 00:00:00 2001 From: Justin Lei Date: Wed, 11 Dec 2024 17:22:29 -0800 Subject: [PATCH] Add topic label to StrongReadConsistencyInstrumentation metrics --- .../querymiddleware/read_consistency.go | 12 +++-- .../querymiddleware/read_consistency_test.go | 12 ++--- pkg/frontend/querymiddleware/roundtrip.go | 2 +- pkg/storage/ingest/fetcher_test.go | 4 +- pkg/storage/ingest/reader.go | 44 +++++++++++-------- pkg/storage/ingest/reader_test.go | 40 ++++++++--------- 6 files changed, 65 insertions(+), 49 deletions(-) diff --git a/pkg/frontend/querymiddleware/read_consistency.go b/pkg/frontend/querymiddleware/read_consistency.go index e47c2dcec3..9c15ae0e02 100644 --- a/pkg/frontend/querymiddleware/read_consistency.go +++ b/pkg/frontend/querymiddleware/read_consistency.go @@ -69,7 +69,7 @@ func (r *readConsistencyRoundTripper) RoundTrip(req *http.Request) (_ *http.Resp offsetsReader := offsetsReader errGroup.Go(func() error { - offsets, err := r.metrics.Observe(false, func() (map[int32]int64, error) { + offsets, err := r.metrics.Observe(offsetsReader.Topic(), false, func() (map[int32]int64, error) { return offsetsReader.WaitNextFetchLastProducedOffset(ctx) }) if err != nil { @@ -103,7 +103,13 @@ func getDefaultReadConsistency(tenantIDs []string, limits Limits) string { return querierapi.ReadConsistencyEventual } -func newReadConsistencyMetrics(reg prometheus.Registerer) *ingest.StrongReadConsistencyInstrumentation[map[int32]int64] { +func newReadConsistencyMetrics(reg prometheus.Registerer, offsetsReaders map[string]*ingest.TopicOffsetsReader) *ingest.StrongReadConsistencyInstrumentation[map[int32]int64] { const component = "query-frontend" - return ingest.NewStrongReadConsistencyInstrumentation[map[int32]int64](component, reg) + + topics := make([]string, 0, len(offsetsReaders)) + for _, r := range offsetsReaders { + topics = append(topics, r.Topic()) + } + + return ingest.NewStrongReadConsistencyInstrumentation[map[int32]int64](component, reg, topics) } diff --git a/pkg/frontend/querymiddleware/read_consistency_test.go b/pkg/frontend/querymiddleware/read_consistency_test.go index a1c40fdf2c..0c68099ed1 100644 --- a/pkg/frontend/querymiddleware/read_consistency_test.go +++ b/pkg/frontend/querymiddleware/read_consistency_test.go @@ -102,8 +102,10 @@ func TestReadConsistencyRoundTripper(t *testing.T) { req = req.WithContext(querierapi.ContextWithReadConsistencyLevel(req.Context(), testData.reqConsistency)) } + offsetsReaders := map[string]*ingest.TopicOffsetsReader{querierapi.ReadConsistencyOffsetsHeader: reader} + reg := prometheus.NewPedanticRegistry() - rt := newReadConsistencyRoundTripper(downstream, map[string]*ingest.TopicOffsetsReader{querierapi.ReadConsistencyOffsetsHeader: reader}, testData.limits, log.NewNopLogger(), newReadConsistencyMetrics(reg)) + rt := newReadConsistencyRoundTripper(downstream, offsetsReaders, testData.limits, log.NewNopLogger(), newReadConsistencyMetrics(reg, offsetsReaders)) _, err = rt.RoundTrip(req) require.NoError(t, err) @@ -130,13 +132,13 @@ func TestReadConsistencyRoundTripper(t *testing.T) { assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", with_offset="false"} %d - cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", with_offset="true"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", topic="%s", with_offset="false"} %d + cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", topic="%s", with_offset="true"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="query-frontend"} 0 - `, expectedRequests)), + cortex_ingest_storage_strong_consistency_failures_total{component="query-frontend", topic="%s"} 0 + `, topic, expectedRequests, topic, topic)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) }) diff --git a/pkg/frontend/querymiddleware/roundtrip.go b/pkg/frontend/querymiddleware/roundtrip.go index 09417f6376..36d8850f4a 100644 --- a/pkg/frontend/querymiddleware/roundtrip.go +++ b/pkg/frontend/querymiddleware/roundtrip.go @@ -282,7 +282,7 @@ func newQueryTripperware( // Enforce read consistency after caching. if len(ingestStorageTopicOffsetsReaders) > 0 { - metrics := newReadConsistencyMetrics(registerer) + metrics := newReadConsistencyMetrics(registerer, ingestStorageTopicOffsetsReaders) queryrange = newReadConsistencyRoundTripper(queryrange, ingestStorageTopicOffsetsReaders, limits, log, metrics) instant = newReadConsistencyRoundTripper(instant, ingestStorageTopicOffsetsReaders, limits, log, metrics) diff --git a/pkg/storage/ingest/fetcher_test.go b/pkg/storage/ingest/fetcher_test.go index 34fe74dd89..233f809b0e 100644 --- a/pkg/storage/ingest/fetcher_test.go +++ b/pkg/storage/ingest/fetcher_test.go @@ -725,7 +725,7 @@ func TestConcurrentFetchers(t *testing.T) { logger := log.NewNopLogger() reg := prometheus.NewPedanticRegistry() - metrics := newReaderMetrics(partitionID, reg, noopReaderMetricsSource{}) + metrics := newReaderMetrics(partitionID, reg, noopReaderMetricsSource{}, topicName) client := newKafkaProduceClient(t, clusterAddr) @@ -1151,7 +1151,7 @@ func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Cli logger := testingLogger.WithT(t) reg := prometheus.NewPedanticRegistry() - metrics := newReaderMetrics(partition, reg, noopReaderMetricsSource{}) + metrics := newReaderMetrics(partition, reg, noopReaderMetricsSource{}, topic) // This instantiates the fields of kprom. // This is usually done by franz-go, but since now we use the metrics ourselves, we need to instantiate the metrics ourselves. diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 1ef3f1f189..1a987c1fa4 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -121,12 +121,16 @@ func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID stri reg: reg, } - r.metrics = newReaderMetrics(partitionID, reg, r) + r.metrics = newReaderMetrics(partitionID, reg, r, kafkaCfg.Topic) r.Service = services.NewBasicService(r.start, r.run, r.stop) return r, nil } +func (r *PartitionReader) Topic() string { + return r.kafkaCfg.Topic +} + // Stop implements fetcher func (r *PartitionReader) Stop() { // Given the partition reader has no concurrency it doesn't support stopping anything. @@ -758,7 +762,7 @@ func (r *PartitionReader) WaitReadConsistencyUntilOffset(ctx context.Context, of } func (r *PartitionReader) waitReadConsistency(ctx context.Context, withOffset bool, getOffset func(context.Context) (int64, error)) error { - _, err := r.metrics.strongConsistencyInstrumentation.Observe(withOffset, func() (struct{}, error) { + _, err := r.metrics.strongConsistencyInstrumentation.Observe(r.kafkaCfg.Topic, withOffset, func() (struct{}, error) { spanLog := spanlogger.FromContext(ctx, r.logger) spanLog.DebugLog("msg", "waiting for read consistency") @@ -965,7 +969,7 @@ type readerMetricsSource interface { EstimatedBytesPerRecord() int64 } -func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSource readerMetricsSource) readerMetrics { +func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSource readerMetricsSource, topic string) readerMetrics { const component = "partition-reader" receiveDelay := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ @@ -1030,7 +1034,7 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc Help: "How long a consumer spent processing a batch of records from Kafka. This includes retries on server errors.", NativeHistogramBucketFactor: 1.1, }), - strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg), + strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg, []string{topic}), lastConsumedOffset: lastConsumedOffset, kprom: NewKafkaReaderClientMetrics(component, reg), missedRecords: promauto.With(reg).NewCounter(prometheus.CounterOpts{ @@ -1048,23 +1052,23 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc type StrongReadConsistencyInstrumentation[T any] struct { requests *prometheus.CounterVec - failures prometheus.Counter - latency prometheus.Histogram + failures *prometheus.CounterVec + latency *prometheus.HistogramVec } -func NewStrongReadConsistencyInstrumentation[T any](component string, reg prometheus.Registerer) *StrongReadConsistencyInstrumentation[T] { +func NewStrongReadConsistencyInstrumentation[T any](component string, reg prometheus.Registerer, topics []string) *StrongReadConsistencyInstrumentation[T] { i := &StrongReadConsistencyInstrumentation[T]{ requests: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingest_storage_strong_consistency_requests_total", Help: "Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset.", ConstLabels: map[string]string{"component": component}, - }, []string{"with_offset"}), - failures: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + }, []string{"with_offset", "topic"}), + failures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingest_storage_strong_consistency_failures_total", Help: "Total number of failures while waiting for strong consistency to be enforced.", ConstLabels: map[string]string{"component": component}, - }), - latency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + }, []string{"topic"}), + latency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "cortex_ingest_storage_strong_consistency_wait_duration_seconds", Help: "How long a request spent waiting for strong consistency to be guaranteed.", NativeHistogramBucketFactor: 1.1, @@ -1072,20 +1076,24 @@ func NewStrongReadConsistencyInstrumentation[T any](component string, reg promet NativeHistogramMinResetDuration: 1 * time.Hour, Buckets: prometheus.DefBuckets, ConstLabels: map[string]string{"component": component}, - }), + }, []string{"topic"}), } // Init metrics. - for _, value := range []bool{true, false} { - i.requests.WithLabelValues(strconv.FormatBool(value)) + for _, topic := range topics { + for _, value := range []bool{true, false} { + i.requests.WithLabelValues(strconv.FormatBool(value), topic) + } + i.failures.WithLabelValues(topic) + i.latency.WithLabelValues(topic) } return i } -func (i *StrongReadConsistencyInstrumentation[T]) Observe(withOffset bool, f func() (T, error)) (_ T, returnErr error) { +func (i *StrongReadConsistencyInstrumentation[T]) Observe(topic string, withOffset bool, f func() (T, error)) (_ T, returnErr error) { startTime := time.Now() - i.requests.WithLabelValues(strconv.FormatBool(withOffset)).Inc() + i.requests.WithLabelValues(strconv.FormatBool(withOffset), topic).Inc() defer func() { // Do not track failure or latency if the request was canceled (because the tracking would be incorrect). @@ -1095,10 +1103,10 @@ func (i *StrongReadConsistencyInstrumentation[T]) Observe(withOffset bool, f fun // Track latency for failures too, so that we have a better measurement of latency if // backend latency is high and requests fail because of timeouts. - i.latency.Observe(time.Since(startTime).Seconds()) + i.latency.WithLabelValues(topic).Observe(time.Since(startTime).Seconds()) if returnErr != nil { - i.failures.Inc() + i.failures.WithLabelValues(topic).Inc() } }() diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go index 22cef17a54..c6fc2b896c 100644 --- a/pkg/storage/ingest/reader_test.go +++ b/pkg/storage/ingest/reader_test.go @@ -312,13 +312,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1 - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 0 - `, withOffset, !withOffset)), + cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 0 + `, topicName, withOffset, topicName, !withOffset, topicName)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) }) @@ -362,13 +362,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1 - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 1 - `, withOffset, !withOffset)), + cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 1 + `, topicName, withOffset, topicName, !withOffset, topicName)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) @@ -415,13 +415,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1 - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 1 - `, withOffset, !withOffset)), + cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 1 + `, topicName, withOffset, topicName, !withOffset, topicName)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) @@ -452,13 +452,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1 - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 0 - `, withOffset, !withOffset)), + cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 0 + `, topicName, withOffset, topicName, !withOffset, topicName)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) }) @@ -489,13 +489,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1 - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 1 - `, withOffset, !withOffset)), + cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 1 + `, topicName, withOffset, topicName, !withOffset, topicName)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) })