diff --git a/CHANGELOG.md b/CHANGELOG.md index 042c546291..c31ba2b06f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## main / unreleased +* [CHANGE] Query-frontend: Add `topic` label to `cortex_ingest_storage_strong_consistency_requests_total`, `cortex_ingest_storage_strong_consistency_failures_total`, and `cortex_ingest_storage_strong_consistency_wait_duration_seconds` metrics. The same label is also added to these metrics when exposed with `component="partition-reader"`. #10220 + ### Grafana Mimir * [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236 diff --git a/pkg/frontend/querymiddleware/read_consistency.go b/pkg/frontend/querymiddleware/read_consistency.go index 7455a924af..9c15ae0e02 100644 --- a/pkg/frontend/querymiddleware/read_consistency.go +++ b/pkg/frontend/querymiddleware/read_consistency.go @@ -4,11 +4,13 @@ package querymiddleware import ( "net/http" + "sync" "github.com/go-kit/log" "github.com/grafana/dskit/tenant" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sync/errgroup" apierror "github.com/grafana/mimir/pkg/api/error" querierapi "github.com/grafana/mimir/pkg/querier/api" @@ -19,19 +21,21 @@ import ( type readConsistencyRoundTripper struct { next http.RoundTripper - offsetsReader *ingest.TopicOffsetsReader - limits Limits - logger log.Logger - metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64] + // offsetsReaders is a map of offsets readers keyed by the request header the offsets get attached to. 
+ offsetsReaders map[string]*ingest.TopicOffsetsReader + + limits Limits + logger log.Logger + metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64] } -func newReadConsistencyRoundTripper(next http.RoundTripper, offsetsReader *ingest.TopicOffsetsReader, limits Limits, logger log.Logger, metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64]) http.RoundTripper { +func newReadConsistencyRoundTripper(next http.RoundTripper, offsetsReaders map[string]*ingest.TopicOffsetsReader, limits Limits, logger log.Logger, metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64]) http.RoundTripper { return &readConsistencyRoundTripper{ - next: next, - limits: limits, - logger: logger, - offsetsReader: offsetsReader, - metrics: metrics, + next: next, + offsetsReaders: offsetsReaders, + limits: limits, + logger: logger, + metrics: metrics, } } @@ -57,15 +61,32 @@ func (r *readConsistencyRoundTripper) RoundTrip(req *http.Request) (_ *http.Resp return r.next.RoundTrip(req) } - // Fetch last produced offsets. 
- offsets, err := r.metrics.Observe(false, func() (map[int32]int64, error) { - return r.offsetsReader.WaitNextFetchLastProducedOffset(ctx) - }) - if err != nil { - return nil, errors.Wrap(err, "wait for last produced offsets") + errGroup, ctx := errgroup.WithContext(ctx) + reqHeaderLock := &sync.Mutex{} + + for headerKey, offsetsReader := range r.offsetsReaders { + headerKey := headerKey + offsetsReader := offsetsReader + + errGroup.Go(func() error { + offsets, err := r.metrics.Observe(offsetsReader.Topic(), false, func() (map[int32]int64, error) { + return offsetsReader.WaitNextFetchLastProducedOffset(ctx) + }) + if err != nil { + return errors.Wrapf(err, "wait for last produced offsets of topic '%s'", offsetsReader.Topic()) + } + + reqHeaderLock.Lock() + req.Header.Add(headerKey, string(querierapi.EncodeOffsets(offsets))) + reqHeaderLock.Unlock() + + return nil + }) } - req.Header.Add(querierapi.ReadConsistencyOffsetsHeader, string(querierapi.EncodeOffsets(offsets))) + if err = errGroup.Wait(); err != nil { + return nil, err + } return r.next.RoundTrip(req) } @@ -82,7 +103,13 @@ func getDefaultReadConsistency(tenantIDs []string, limits Limits) string { return querierapi.ReadConsistencyEventual } -func newReadConsistencyMetrics(reg prometheus.Registerer) *ingest.StrongReadConsistencyInstrumentation[map[int32]int64] { +func newReadConsistencyMetrics(reg prometheus.Registerer, offsetsReaders map[string]*ingest.TopicOffsetsReader) *ingest.StrongReadConsistencyInstrumentation[map[int32]int64] { const component = "query-frontend" - return ingest.NewStrongReadConsistencyInstrumentation[map[int32]int64](component, reg) + + topics := make([]string, 0, len(offsetsReaders)) + for _, r := range offsetsReaders { + topics = append(topics, r.Topic()) + } + + return ingest.NewStrongReadConsistencyInstrumentation[map[int32]int64](component, reg, topics) } diff --git a/pkg/frontend/querymiddleware/read_consistency_test.go b/pkg/frontend/querymiddleware/read_consistency_test.go 
index 805207b8a2..0c68099ed1 100644 --- a/pkg/frontend/querymiddleware/read_consistency_test.go +++ b/pkg/frontend/querymiddleware/read_consistency_test.go @@ -102,8 +102,10 @@ func TestReadConsistencyRoundTripper(t *testing.T) { req = req.WithContext(querierapi.ContextWithReadConsistencyLevel(req.Context(), testData.reqConsistency)) } + offsetsReaders := map[string]*ingest.TopicOffsetsReader{querierapi.ReadConsistencyOffsetsHeader: reader} + reg := prometheus.NewPedanticRegistry() - rt := newReadConsistencyRoundTripper(downstream, reader, testData.limits, log.NewNopLogger(), newReadConsistencyMetrics(reg)) + rt := newReadConsistencyRoundTripper(downstream, offsetsReaders, testData.limits, log.NewNopLogger(), newReadConsistencyMetrics(reg, offsetsReaders)) _, err = rt.RoundTrip(req) require.NoError(t, err) @@ -130,13 +132,13 @@ func TestReadConsistencyRoundTripper(t *testing.T) { assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", with_offset="false"} %d - cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", with_offset="true"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", topic="%s", with_offset="false"} %d + cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", topic="%s", with_offset="true"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. 
# TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="query-frontend"} 0 - `, expectedRequests)), + cortex_ingest_storage_strong_consistency_failures_total{component="query-frontend", topic="%s"} 0 + `, topic, expectedRequests, topic, topic)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) }) diff --git a/pkg/frontend/querymiddleware/roundtrip.go b/pkg/frontend/querymiddleware/roundtrip.go index b0c190c210..0aa5d49fa5 100644 --- a/pkg/frontend/querymiddleware/roundtrip.go +++ b/pkg/frontend/querymiddleware/roundtrip.go @@ -207,10 +207,10 @@ func NewTripperware( cacheExtractor Extractor, engineOpts promql.EngineOpts, engineExperimentalFunctionsEnabled bool, - ingestStorageTopicOffsetsReader *ingest.TopicOffsetsReader, + ingestStorageTopicOffsetsReaders map[string]*ingest.TopicOffsetsReader, registerer prometheus.Registerer, ) (Tripperware, error) { - queryRangeTripperware, err := newQueryTripperware(cfg, log, limits, codec, cacheExtractor, engineOpts, engineExperimentalFunctionsEnabled, ingestStorageTopicOffsetsReader, registerer) + queryRangeTripperware, err := newQueryTripperware(cfg, log, limits, codec, cacheExtractor, engineOpts, engineExperimentalFunctionsEnabled, ingestStorageTopicOffsetsReaders, registerer) if err != nil { return nil, err } @@ -228,7 +228,7 @@ func newQueryTripperware( cacheExtractor Extractor, engineOpts promql.EngineOpts, engineExperimentalFunctionsEnabled bool, - ingestStorageTopicOffsetsReader *ingest.TopicOffsetsReader, + ingestStorageTopicOffsetsReaders map[string]*ingest.TopicOffsetsReader, registerer prometheus.Registerer, ) (Tripperware, error) { // Disable concurrency limits for sharded queries. @@ -281,18 +281,18 @@ func newQueryTripperware( } // Enforce read consistency after caching. 
- if ingestStorageTopicOffsetsReader != nil { - metrics := newReadConsistencyMetrics(registerer) - - queryrange = newReadConsistencyRoundTripper(queryrange, ingestStorageTopicOffsetsReader, limits, log, metrics) - instant = newReadConsistencyRoundTripper(instant, ingestStorageTopicOffsetsReader, limits, log, metrics) - cardinality = newReadConsistencyRoundTripper(cardinality, ingestStorageTopicOffsetsReader, limits, log, metrics) - activeSeries = newReadConsistencyRoundTripper(activeSeries, ingestStorageTopicOffsetsReader, limits, log, metrics) - activeNativeHistogramMetrics = newReadConsistencyRoundTripper(activeNativeHistogramMetrics, ingestStorageTopicOffsetsReader, limits, log, metrics) - labels = newReadConsistencyRoundTripper(labels, ingestStorageTopicOffsetsReader, limits, log, metrics) - series = newReadConsistencyRoundTripper(series, ingestStorageTopicOffsetsReader, limits, log, metrics) - remoteRead = newReadConsistencyRoundTripper(remoteRead, ingestStorageTopicOffsetsReader, limits, log, metrics) - next = newReadConsistencyRoundTripper(next, ingestStorageTopicOffsetsReader, limits, log, metrics) + if len(ingestStorageTopicOffsetsReaders) > 0 { + metrics := newReadConsistencyMetrics(registerer, ingestStorageTopicOffsetsReaders) + + queryrange = newReadConsistencyRoundTripper(queryrange, ingestStorageTopicOffsetsReaders, limits, log, metrics) + instant = newReadConsistencyRoundTripper(instant, ingestStorageTopicOffsetsReaders, limits, log, metrics) + cardinality = newReadConsistencyRoundTripper(cardinality, ingestStorageTopicOffsetsReaders, limits, log, metrics) + activeSeries = newReadConsistencyRoundTripper(activeSeries, ingestStorageTopicOffsetsReaders, limits, log, metrics) + activeNativeHistogramMetrics = newReadConsistencyRoundTripper(activeNativeHistogramMetrics, ingestStorageTopicOffsetsReaders, limits, log, metrics) + labels = newReadConsistencyRoundTripper(labels, ingestStorageTopicOffsetsReaders, limits, log, metrics) + series = 
newReadConsistencyRoundTripper(series, ingestStorageTopicOffsetsReaders, limits, log, metrics) + remoteRead = newReadConsistencyRoundTripper(remoteRead, ingestStorageTopicOffsetsReaders, limits, log, metrics) + next = newReadConsistencyRoundTripper(next, ingestStorageTopicOffsetsReaders, limits, log, metrics) } // Look up cache as first thing after validation. diff --git a/pkg/frontend/querymiddleware/roundtrip_test.go b/pkg/frontend/querymiddleware/roundtrip_test.go index e967205429..0e829478ae 100644 --- a/pkg/frontend/querymiddleware/roundtrip_test.go +++ b/pkg/frontend/querymiddleware/roundtrip_test.go @@ -874,7 +874,7 @@ func TestTripperware_ShouldSupportReadConsistencyOffsetsInjection(t *testing.T) Timeout: time.Minute, }, true, - offsetsReader, + map[string]*ingest.TopicOffsetsReader{querierapi.ReadConsistencyOffsetsHeader: offsetsReader}, nil, ) require.NoError(t, err) diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index 7bcd3eac25..ca8ebfe53d 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -705,40 +705,40 @@ type Mimir struct { ServiceMap map[string]services.Service ModuleManager *modules.Manager - API *api.API - Server *server.Server - IngesterRing *ring.Ring - IngesterPartitionRingWatcher *ring.PartitionRingWatcher - IngesterPartitionInstanceRing *ring.PartitionInstanceRing - TenantLimits validation.TenantLimits - Overrides *validation.Overrides - ActiveGroupsCleanup *util.ActiveGroupsCleanupService - Distributor *distributor.Distributor - Ingester *ingester.Ingester - Flusher *flusher.Flusher - FrontendV1 *frontendv1.Frontend - RuntimeConfig *runtimeconfig.Manager - QuerierQueryable prom_storage.SampleAndChunkQueryable - ExemplarQueryable prom_storage.ExemplarQueryable - AdditionalStorageQueryables []querier.TimeRangeQueryable - MetadataSupplier querier.MetadataSupplier - QuerierEngine promql.QueryEngine - QueryFrontendTripperware querymiddleware.Tripperware - QueryFrontendTopicOffsetsReader *ingest.TopicOffsetsReader - 
QueryFrontendCodec querymiddleware.Codec - Ruler *ruler.Ruler - RulerStorage rulestore.RuleStore - Alertmanager *alertmanager.MultitenantAlertmanager - Compactor *compactor.MultitenantCompactor - StoreGateway *storegateway.StoreGateway - MemberlistKV *memberlist.KVInitService - ActivityTracker *activitytracker.ActivityTracker - Vault *vault.Vault - UsageStatsReporter *usagestats.Reporter - BlockBuilder *blockbuilder.BlockBuilder - BlockBuilderScheduler *blockbuilderscheduler.BlockBuilderScheduler - ContinuousTestManager *continuoustest.Manager - BuildInfoHandler http.Handler + API *api.API + Server *server.Server + IngesterRing *ring.Ring + IngesterPartitionRingWatcher *ring.PartitionRingWatcher + IngesterPartitionInstanceRing *ring.PartitionInstanceRing + TenantLimits validation.TenantLimits + Overrides *validation.Overrides + ActiveGroupsCleanup *util.ActiveGroupsCleanupService + Distributor *distributor.Distributor + Ingester *ingester.Ingester + Flusher *flusher.Flusher + FrontendV1 *frontendv1.Frontend + RuntimeConfig *runtimeconfig.Manager + QuerierQueryable prom_storage.SampleAndChunkQueryable + ExemplarQueryable prom_storage.ExemplarQueryable + AdditionalStorageQueryables []querier.TimeRangeQueryable + MetadataSupplier querier.MetadataSupplier + QuerierEngine promql.QueryEngine + QueryFrontendTripperware querymiddleware.Tripperware + QueryFrontendTopicOffsetsReaders map[string]*ingest.TopicOffsetsReader + QueryFrontendCodec querymiddleware.Codec + Ruler *ruler.Ruler + RulerStorage rulestore.RuleStore + Alertmanager *alertmanager.MultitenantAlertmanager + Compactor *compactor.MultitenantCompactor + StoreGateway *storegateway.StoreGateway + MemberlistKV *memberlist.KVInitService + ActivityTracker *activitytracker.ActivityTracker + Vault *vault.Vault + UsageStatsReporter *usagestats.Reporter + BlockBuilder *blockbuilder.BlockBuilder + BlockBuilderScheduler *blockbuilderscheduler.BlockBuilderScheduler + ContinuousTestManager *continuoustest.Manager + 
BuildInfoHandler http.Handler } // New makes a new Mimir. diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index 39868511b5..a2839280bb 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -50,6 +50,7 @@ import ( "github.com/grafana/mimir/pkg/frontend/transport" "github.com/grafana/mimir/pkg/ingester" "github.com/grafana/mimir/pkg/querier" + querierapi "github.com/grafana/mimir/pkg/querier/api" "github.com/grafana/mimir/pkg/querier/engine" "github.com/grafana/mimir/pkg/querier/tenantfederation" querier_worker "github.com/grafana/mimir/pkg/querier/worker" @@ -70,42 +71,42 @@ import ( // The various modules that make up Mimir. const ( - ActivityTracker string = "activity-tracker" - API string = "api" - SanityCheck string = "sanity-check" - IngesterRing string = "ingester-ring" - IngesterPartitionRing string = "ingester-partitions-ring" - RuntimeConfig string = "runtime-config" - Overrides string = "overrides" - OverridesExporter string = "overrides-exporter" - Server string = "server" - ActiveGroupsCleanupService string = "active-groups-cleanup-service" - Distributor string = "distributor" - DistributorService string = "distributor-service" - Ingester string = "ingester" - IngesterService string = "ingester-service" - Flusher string = "flusher" - Querier string = "querier" - Queryable string = "queryable" - StoreQueryable string = "store-queryable" - QueryFrontend string = "query-frontend" - QueryFrontendCodec string = "query-frontend-codec" - QueryFrontendTripperware string = "query-frontend-tripperware" - QueryFrontendTopicOffsetsReader string = "query-frontend-topic-offsets-reader" - RulerStorage string = "ruler-storage" - Ruler string = "ruler" - AlertManager string = "alertmanager" - Compactor string = "compactor" - StoreGateway string = "store-gateway" - MemberlistKV string = "memberlist-kv" - QueryScheduler string = "query-scheduler" - Vault string = "vault" - TenantFederation string = "tenant-federation" - UsageStats string = "usage-stats" 
- BlockBuilder string = "block-builder" - BlockBuilderScheduler string = "block-builder-scheduler" - ContinuousTest string = "continuous-test" - All string = "all" + ActivityTracker string = "activity-tracker" + API string = "api" + SanityCheck string = "sanity-check" + IngesterRing string = "ingester-ring" + IngesterPartitionRing string = "ingester-partitions-ring" + RuntimeConfig string = "runtime-config" + Overrides string = "overrides" + OverridesExporter string = "overrides-exporter" + Server string = "server" + ActiveGroupsCleanupService string = "active-groups-cleanup-service" + Distributor string = "distributor" + DistributorService string = "distributor-service" + Ingester string = "ingester" + IngesterService string = "ingester-service" + Flusher string = "flusher" + Querier string = "querier" + Queryable string = "queryable" + StoreQueryable string = "store-queryable" + QueryFrontend string = "query-frontend" + QueryFrontendCodec string = "query-frontend-codec" + QueryFrontendTripperware string = "query-frontend-tripperware" + QueryFrontendTopicOffsetsReaders string = "query-frontend-topic-offsets-reader" + RulerStorage string = "ruler-storage" + Ruler string = "ruler" + AlertManager string = "alertmanager" + Compactor string = "compactor" + StoreGateway string = "store-gateway" + MemberlistKV string = "memberlist-kv" + QueryScheduler string = "query-scheduler" + Vault string = "vault" + TenantFederation string = "tenant-federation" + UsageStats string = "usage-stats" + BlockBuilder string = "block-builder" + BlockBuilderScheduler string = "block-builder-scheduler" + ContinuousTest string = "continuous-test" + All string = "all" // Write Read and Backend are the targets used when using the read-write deployment mode. 
Write string = "write" @@ -702,9 +703,9 @@ func (t *Mimir) initQueryFrontendCodec() (services.Service, error) { return nil, nil } -// initQueryFrontendTopicOffsetsReader instantiates the topic offsets reader used by the query-frontend +// initQueryFrontendTopicOffsetsReaders instantiates the topic offsets reader used by the query-frontend // when the ingest storage is enabled. -func (t *Mimir) initQueryFrontendTopicOffsetsReader() (services.Service, error) { +func (t *Mimir) initQueryFrontendTopicOffsetsReaders() (services.Service, error) { if !t.Cfg.IngestStorage.Enabled { return nil, nil } @@ -726,8 +727,14 @@ func (t *Mimir) initQueryFrontendTopicOffsetsReader() (services.Service, error) return t.IngesterPartitionRingWatcher.PartitionRing().PartitionIDs(), nil } - t.QueryFrontendTopicOffsetsReader = ingest.NewTopicOffsetsReader(kafkaClient, t.Cfg.IngestStorage.KafkaConfig.Topic, getPartitionIDs, t.Cfg.IngestStorage.KafkaConfig.LastProducedOffsetPollInterval, t.Registerer, util_log.Logger) - return t.QueryFrontendTopicOffsetsReader, nil + ingestTopicOffsetsReader := ingest.NewTopicOffsetsReader(kafkaClient, t.Cfg.IngestStorage.KafkaConfig.Topic, getPartitionIDs, t.Cfg.IngestStorage.KafkaConfig.LastProducedOffsetPollInterval, t.Registerer, util_log.Logger) + + if t.QueryFrontendTopicOffsetsReaders == nil { + t.QueryFrontendTopicOffsetsReaders = make(map[string]*ingest.TopicOffsetsReader) + } + t.QueryFrontendTopicOffsetsReaders[querierapi.ReadConsistencyOffsetsHeader] = ingestTopicOffsetsReader + + return ingestTopicOffsetsReader, nil } // initQueryFrontendTripperware instantiates the tripperware used by the query frontend @@ -745,7 +752,7 @@ func (t *Mimir) initQueryFrontendTripperware() (serv services.Service, err error querymiddleware.PrometheusResponseExtractor{}, engineOpts, engineExperimentalFunctionsEnabled, - t.QueryFrontendTopicOffsetsReader, + t.QueryFrontendTopicOffsetsReaders, t.Registerer, ) if err != nil { @@ -1149,7 +1156,7 @@ func (t *Mimir) 
setupModuleManager() error { mm.RegisterModule(StoreQueryable, t.initStoreQueryable, modules.UserInvisibleModule) mm.RegisterModule(QueryFrontendCodec, t.initQueryFrontendCodec, modules.UserInvisibleModule) mm.RegisterModule(QueryFrontendTripperware, t.initQueryFrontendTripperware, modules.UserInvisibleModule) - mm.RegisterModule(QueryFrontendTopicOffsetsReader, t.initQueryFrontendTopicOffsetsReader, modules.UserInvisibleModule) + mm.RegisterModule(QueryFrontendTopicOffsetsReaders, t.initQueryFrontendTopicOffsetsReaders, modules.UserInvisibleModule) mm.RegisterModule(QueryFrontend, t.initQueryFrontend) mm.RegisterModule(RulerStorage, t.initRulerStorage, modules.UserInvisibleModule) mm.RegisterModule(Ruler, t.initRuler) @@ -1170,39 +1177,39 @@ func (t *Mimir) setupModuleManager() error { // Add dependencies deps := map[string][]string{ - Server: {ActivityTracker, SanityCheck, UsageStats}, - API: {Server}, - MemberlistKV: {API, Vault}, - RuntimeConfig: {API}, - IngesterRing: {API, RuntimeConfig, MemberlistKV, Vault}, - IngesterPartitionRing: {MemberlistKV, IngesterRing, API}, - Overrides: {RuntimeConfig}, - OverridesExporter: {Overrides, MemberlistKV, Vault}, - Distributor: {DistributorService, API, ActiveGroupsCleanupService, Vault}, - DistributorService: {IngesterRing, IngesterPartitionRing, Overrides, Vault}, - Ingester: {IngesterService, API, ActiveGroupsCleanupService, Vault}, - IngesterService: {IngesterRing, IngesterPartitionRing, Overrides, RuntimeConfig, MemberlistKV}, - Flusher: {Overrides, API}, - Queryable: {Overrides, DistributorService, IngesterRing, IngesterPartitionRing, API, StoreQueryable, MemberlistKV}, - Querier: {TenantFederation, Vault}, - StoreQueryable: {Overrides, MemberlistKV}, - QueryFrontendTripperware: {API, Overrides, QueryFrontendCodec, QueryFrontendTopicOffsetsReader}, - QueryFrontend: {QueryFrontendTripperware, MemberlistKV, Vault}, - QueryFrontendTopicOffsetsReader: {IngesterPartitionRing}, - QueryScheduler: {API, Overrides, 
MemberlistKV, Vault}, - Ruler: {DistributorService, StoreQueryable, RulerStorage, Vault}, - RulerStorage: {Overrides}, - AlertManager: {API, MemberlistKV, Overrides, Vault}, - Compactor: {API, MemberlistKV, Overrides, Vault}, - StoreGateway: {API, Overrides, MemberlistKV, Vault}, - TenantFederation: {Queryable}, - BlockBuilder: {API, Overrides}, - BlockBuilderScheduler: {API}, - ContinuousTest: {API}, - Write: {Distributor, Ingester}, - Read: {QueryFrontend, Querier}, - Backend: {QueryScheduler, Ruler, StoreGateway, Compactor, AlertManager, OverridesExporter}, - All: {QueryFrontend, Querier, Ingester, Distributor, StoreGateway, Ruler, Compactor}, + Server: {ActivityTracker, SanityCheck, UsageStats}, + API: {Server}, + MemberlistKV: {API, Vault}, + RuntimeConfig: {API}, + IngesterRing: {API, RuntimeConfig, MemberlistKV, Vault}, + IngesterPartitionRing: {MemberlistKV, IngesterRing, API}, + Overrides: {RuntimeConfig}, + OverridesExporter: {Overrides, MemberlistKV, Vault}, + Distributor: {DistributorService, API, ActiveGroupsCleanupService, Vault}, + DistributorService: {IngesterRing, IngesterPartitionRing, Overrides, Vault}, + Ingester: {IngesterService, API, ActiveGroupsCleanupService, Vault}, + IngesterService: {IngesterRing, IngesterPartitionRing, Overrides, RuntimeConfig, MemberlistKV}, + Flusher: {Overrides, API}, + Queryable: {Overrides, DistributorService, IngesterRing, IngesterPartitionRing, API, StoreQueryable, MemberlistKV}, + Querier: {TenantFederation, Vault}, + StoreQueryable: {Overrides, MemberlistKV}, + QueryFrontendTripperware: {API, Overrides, QueryFrontendCodec, QueryFrontendTopicOffsetsReaders}, + QueryFrontend: {QueryFrontendTripperware, MemberlistKV, Vault}, + QueryFrontendTopicOffsetsReaders: {IngesterPartitionRing}, + QueryScheduler: {API, Overrides, MemberlistKV, Vault}, + Ruler: {DistributorService, StoreQueryable, RulerStorage, Vault}, + RulerStorage: {Overrides}, + AlertManager: {API, MemberlistKV, Overrides, Vault}, + Compactor: {API, 
MemberlistKV, Overrides, Vault}, + StoreGateway: {API, Overrides, MemberlistKV, Vault}, + TenantFederation: {Queryable}, + BlockBuilder: {API, Overrides}, + BlockBuilderScheduler: {API}, + ContinuousTest: {API}, + Write: {Distributor, Ingester}, + Read: {QueryFrontend, Querier}, + Backend: {QueryScheduler, Ruler, StoreGateway, Compactor, AlertManager, OverridesExporter}, + All: {QueryFrontend, Querier, Ingester, Distributor, StoreGateway, Ruler, Compactor}, } for mod, targets := range deps { if err := mm.AddDependency(mod, targets...); err != nil { diff --git a/pkg/storage/ingest/fetcher_test.go b/pkg/storage/ingest/fetcher_test.go index 34fe74dd89..233f809b0e 100644 --- a/pkg/storage/ingest/fetcher_test.go +++ b/pkg/storage/ingest/fetcher_test.go @@ -725,7 +725,7 @@ func TestConcurrentFetchers(t *testing.T) { logger := log.NewNopLogger() reg := prometheus.NewPedanticRegistry() - metrics := newReaderMetrics(partitionID, reg, noopReaderMetricsSource{}) + metrics := newReaderMetrics(partitionID, reg, noopReaderMetricsSource{}, topicName) client := newKafkaProduceClient(t, clusterAddr) @@ -1151,7 +1151,7 @@ func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Cli logger := testingLogger.WithT(t) reg := prometheus.NewPedanticRegistry() - metrics := newReaderMetrics(partition, reg, noopReaderMetricsSource{}) + metrics := newReaderMetrics(partition, reg, noopReaderMetricsSource{}, topic) // This instantiates the fields of kprom. // This is usually done by franz-go, but since now we use the metrics ourselves, we need to instantiate the metrics ourselves. 
diff --git a/pkg/storage/ingest/partition_offset_reader.go b/pkg/storage/ingest/partition_offset_reader.go index 9a3f43112a..2acd6892ab 100644 --- a/pkg/storage/ingest/partition_offset_reader.go +++ b/pkg/storage/ingest/partition_offset_reader.go @@ -199,3 +199,7 @@ func (p *TopicOffsetsReader) FetchLastProducedOffset(ctx context.Context) (map[i return p.client.FetchPartitionsLastProducedOffsets(ctx, partitionIDs) } + +func (p *TopicOffsetsReader) Topic() string { + return p.topic +} diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 4abe193213..982d896231 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -124,7 +124,7 @@ func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID stri reg: reg, } - r.metrics = newReaderMetrics(partitionID, reg, r) + r.metrics = newReaderMetrics(partitionID, reg, r, kafkaCfg.Topic) r.Service = services.NewBasicService(r.start, r.run, r.stop) return r, nil @@ -761,7 +761,7 @@ func (r *PartitionReader) WaitReadConsistencyUntilOffset(ctx context.Context, of } func (r *PartitionReader) waitReadConsistency(ctx context.Context, withOffset bool, getOffset func(context.Context) (int64, error)) error { - _, err := r.metrics.strongConsistencyInstrumentation.Observe(withOffset, func() (struct{}, error) { + _, err := r.metrics.strongConsistencyInstrumentation.Observe(r.kafkaCfg.Topic, withOffset, func() (struct{}, error) { spanLog := spanlogger.FromContext(ctx, r.logger) spanLog.DebugLog("msg", "waiting for read consistency") @@ -968,7 +968,7 @@ type readerMetricsSource interface { EstimatedBytesPerRecord() int64 } -func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSource readerMetricsSource) readerMetrics { +func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSource readerMetricsSource, topic string) readerMetrics { const component = "partition-reader" receiveDelay := 
promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ @@ -1033,7 +1033,7 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc Help: "How long a consumer spent processing a batch of records from Kafka. This includes retries on server errors.", NativeHistogramBucketFactor: 1.1, }), - strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg), + strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg, []string{topic}), lastConsumedOffset: lastConsumedOffset, kprom: NewKafkaReaderClientMetrics(ReaderMetricsPrefix, component, reg), missedRecords: promauto.With(reg).NewCounter(prometheus.CounterOpts{ @@ -1051,23 +1051,23 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc type StrongReadConsistencyInstrumentation[T any] struct { requests *prometheus.CounterVec - failures prometheus.Counter - latency prometheus.Histogram + failures *prometheus.CounterVec + latency *prometheus.HistogramVec } -func NewStrongReadConsistencyInstrumentation[T any](component string, reg prometheus.Registerer) *StrongReadConsistencyInstrumentation[T] { +func NewStrongReadConsistencyInstrumentation[T any](component string, reg prometheus.Registerer, topics []string) *StrongReadConsistencyInstrumentation[T] { i := &StrongReadConsistencyInstrumentation[T]{ requests: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingest_storage_strong_consistency_requests_total", Help: "Total number of requests for which strong consistency has been requested. 
The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset.", ConstLabels: map[string]string{"component": component}, - }, []string{"with_offset"}), - failures: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + }, []string{"with_offset", "topic"}), + failures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingest_storage_strong_consistency_failures_total", Help: "Total number of failures while waiting for strong consistency to be enforced.", ConstLabels: map[string]string{"component": component}, - }), - latency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + }, []string{"topic"}), + latency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "cortex_ingest_storage_strong_consistency_wait_duration_seconds", Help: "How long a request spent waiting for strong consistency to be guaranteed.", NativeHistogramBucketFactor: 1.1, @@ -1075,20 +1075,24 @@ func NewStrongReadConsistencyInstrumentation[T any](component string, reg promet NativeHistogramMinResetDuration: 1 * time.Hour, Buckets: prometheus.DefBuckets, ConstLabels: map[string]string{"component": component}, - }), + }, []string{"topic"}), } // Init metrics. 
- for _, value := range []bool{true, false} { - i.requests.WithLabelValues(strconv.FormatBool(value)) + for _, topic := range topics { + for _, value := range []bool{true, false} { + i.requests.WithLabelValues(strconv.FormatBool(value), topic) + } + i.failures.WithLabelValues(topic) + i.latency.WithLabelValues(topic) } return i } -func (i *StrongReadConsistencyInstrumentation[T]) Observe(withOffset bool, f func() (T, error)) (_ T, returnErr error) { +func (i *StrongReadConsistencyInstrumentation[T]) Observe(topic string, withOffset bool, f func() (T, error)) (_ T, returnErr error) { startTime := time.Now() - i.requests.WithLabelValues(strconv.FormatBool(withOffset)).Inc() + i.requests.WithLabelValues(strconv.FormatBool(withOffset), topic).Inc() defer func() { // Do not track failure or latency if the request was canceled (because the tracking would be incorrect). @@ -1098,10 +1102,10 @@ func (i *StrongReadConsistencyInstrumentation[T]) Observe(withOffset bool, f fun // Track latency for failures too, so that we have a better measurement of latency if // backend latency is high and requests fail because of timeouts. - i.latency.Observe(time.Since(startTime).Seconds()) + i.latency.WithLabelValues(topic).Observe(time.Since(startTime).Seconds()) if returnErr != nil { - i.failures.Inc() + i.failures.WithLabelValues(topic).Inc() } }() diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go index 22cef17a54..c6fc2b896c 100644 --- a/pkg/storage/ingest/reader_test.go +++ b/pkg/storage/ingest/reader_test.go @@ -312,13 +312,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. 
The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. # TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1 - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 0 - `, withOffset, !withOffset)), + cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 0 + `, topicName, withOffset, topicName, !withOffset, topicName)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) }) @@ -362,13 +362,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. 
# TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1 - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 1 - `, withOffset, !withOffset)), + cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 1 + `, topicName, withOffset, topicName, !withOffset, topicName)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) @@ -415,13 +415,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. 
# TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1 - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 1 - `, withOffset, !withOffset)), + cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 1 + `, topicName, withOffset, topicName, !withOffset, topicName)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) @@ -452,13 +452,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. 
# TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1 - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 0 - `, withOffset, !withOffset)), + cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 0 + `, topicName, withOffset, topicName, !withOffset, topicName)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) }) @@ -489,13 +489,13 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset. 
# TYPE cortex_ingest_storage_strong_consistency_requests_total counter - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 1 - cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", with_offset="%t"} 0 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 1 + cortex_ingest_storage_strong_consistency_requests_total{component="partition-reader", topic="%s", with_offset="%t"} 0 # HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced. # TYPE cortex_ingest_storage_strong_consistency_failures_total counter - cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader"} 1 - `, withOffset, !withOffset)), + cortex_ingest_storage_strong_consistency_failures_total{component="partition-reader", topic="%s"} 1 + `, topicName, withOffset, topicName, !withOffset, topicName)), "cortex_ingest_storage_strong_consistency_requests_total", "cortex_ingest_storage_strong_consistency_failures_total")) })