Skip to content

Commit

Permalink
query-frontend: Allow for multiple topics in readConsistencyRoundTrip…
Browse files Browse the repository at this point in the history
…per (#10220)

* query-frontend: Allow for multiple topics in readConsistencyRoundTripper

* Add topic label to StrongReadConsistencyInstrumentation metrics

* Update CHANGELOG.md

* Remove unused PartitionReader.Topic() method

* Avoid overwriting QueryFrontendTopicOffsetsReaders map if already populated
  • Loading branch information
leizor authored Dec 20, 2024
1 parent f2217e9 commit 79c32ab
Show file tree
Hide file tree
Showing 11 changed files with 235 additions and 189 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## main / unreleased

* [CHANGE] Query-frontend: Add `topic` label to `cortex_ingest_storage_strong_consistency_requests_total`, `cortex_ingest_storage_strong_consistency_failures_total`, and `cortex_ingest_storage_strong_consistency_wait_duration_seconds` metrics. #10220

### Grafana Mimir

* [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236
Expand Down
65 changes: 46 additions & 19 deletions pkg/frontend/querymiddleware/read_consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ package querymiddleware

import (
"net/http"
"sync"

"github.com/go-kit/log"
"github.com/grafana/dskit/tenant"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"

apierror "github.com/grafana/mimir/pkg/api/error"
querierapi "github.com/grafana/mimir/pkg/querier/api"
Expand All @@ -19,19 +21,21 @@ import (
type readConsistencyRoundTripper struct {
next http.RoundTripper

offsetsReader *ingest.TopicOffsetsReader
limits Limits
logger log.Logger
metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64]
// offsetsReaders is a map of offsets readers keyed by the request header the offsets get attached to.
offsetsReaders map[string]*ingest.TopicOffsetsReader

limits Limits
logger log.Logger
metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64]
}

func newReadConsistencyRoundTripper(next http.RoundTripper, offsetsReader *ingest.TopicOffsetsReader, limits Limits, logger log.Logger, metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64]) http.RoundTripper {
func newReadConsistencyRoundTripper(next http.RoundTripper, offsetsReaders map[string]*ingest.TopicOffsetsReader, limits Limits, logger log.Logger, metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64]) http.RoundTripper {
return &readConsistencyRoundTripper{
next: next,
limits: limits,
logger: logger,
offsetsReader: offsetsReader,
metrics: metrics,
next: next,
offsetsReaders: offsetsReaders,
limits: limits,
logger: logger,
metrics: metrics,
}
}

Expand All @@ -57,15 +61,32 @@ func (r *readConsistencyRoundTripper) RoundTrip(req *http.Request) (_ *http.Resp
return r.next.RoundTrip(req)
}

// Fetch last produced offsets.
offsets, err := r.metrics.Observe(false, func() (map[int32]int64, error) {
return r.offsetsReader.WaitNextFetchLastProducedOffset(ctx)
})
if err != nil {
return nil, errors.Wrap(err, "wait for last produced offsets")
errGroup, ctx := errgroup.WithContext(ctx)
reqHeaderLock := &sync.Mutex{}

for headerKey, offsetsReader := range r.offsetsReaders {
headerKey := headerKey
offsetsReader := offsetsReader

errGroup.Go(func() error {
offsets, err := r.metrics.Observe(offsetsReader.Topic(), false, func() (map[int32]int64, error) {
return offsetsReader.WaitNextFetchLastProducedOffset(ctx)
})
if err != nil {
return errors.Wrapf(err, "wait for last produced offsets of topic '%s'", offsetsReader.Topic())
}

reqHeaderLock.Lock()
req.Header.Add(headerKey, string(querierapi.EncodeOffsets(offsets)))
reqHeaderLock.Unlock()

return nil
})
}

req.Header.Add(querierapi.ReadConsistencyOffsetsHeader, string(querierapi.EncodeOffsets(offsets)))
if err = errGroup.Wait(); err != nil {
return nil, err
}

return r.next.RoundTrip(req)
}
Expand All @@ -82,7 +103,13 @@ func getDefaultReadConsistency(tenantIDs []string, limits Limits) string {
return querierapi.ReadConsistencyEventual
}

func newReadConsistencyMetrics(reg prometheus.Registerer) *ingest.StrongReadConsistencyInstrumentation[map[int32]int64] {
func newReadConsistencyMetrics(reg prometheus.Registerer, offsetsReaders map[string]*ingest.TopicOffsetsReader) *ingest.StrongReadConsistencyInstrumentation[map[int32]int64] {
const component = "query-frontend"
return ingest.NewStrongReadConsistencyInstrumentation[map[int32]int64](component, reg)

topics := make([]string, 0, len(offsetsReaders))
for _, r := range offsetsReaders {
topics = append(topics, r.Topic())
}

return ingest.NewStrongReadConsistencyInstrumentation[map[int32]int64](component, reg, topics)
}
12 changes: 7 additions & 5 deletions pkg/frontend/querymiddleware/read_consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ func TestReadConsistencyRoundTripper(t *testing.T) {
req = req.WithContext(querierapi.ContextWithReadConsistencyLevel(req.Context(), testData.reqConsistency))
}

offsetsReaders := map[string]*ingest.TopicOffsetsReader{querierapi.ReadConsistencyOffsetsHeader: reader}

reg := prometheus.NewPedanticRegistry()
rt := newReadConsistencyRoundTripper(downstream, reader, testData.limits, log.NewNopLogger(), newReadConsistencyMetrics(reg))
rt := newReadConsistencyRoundTripper(downstream, offsetsReaders, testData.limits, log.NewNopLogger(), newReadConsistencyMetrics(reg, offsetsReaders))
_, err = rt.RoundTrip(req)
require.NoError(t, err)

Expand All @@ -130,13 +132,13 @@ func TestReadConsistencyRoundTripper(t *testing.T) {
assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
# HELP cortex_ingest_storage_strong_consistency_requests_total Total number of requests for which strong consistency has been requested. The metric distinguishes between requests with an offset specified and requests requesting to enforce strong consistency up until the last produced offset.
# TYPE cortex_ingest_storage_strong_consistency_requests_total counter
cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", with_offset="false"} %d
cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", with_offset="true"} 0
cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", topic="%s", with_offset="false"} %d
cortex_ingest_storage_strong_consistency_requests_total{component="query-frontend", topic="%s", with_offset="true"} 0
# HELP cortex_ingest_storage_strong_consistency_failures_total Total number of failures while waiting for strong consistency to be enforced.
# TYPE cortex_ingest_storage_strong_consistency_failures_total counter
cortex_ingest_storage_strong_consistency_failures_total{component="query-frontend"} 0
`, expectedRequests)),
cortex_ingest_storage_strong_consistency_failures_total{component="query-frontend", topic="%s"} 0
`, topic, expectedRequests, topic, topic)),
"cortex_ingest_storage_strong_consistency_requests_total",
"cortex_ingest_storage_strong_consistency_failures_total"))
})
Expand Down
30 changes: 15 additions & 15 deletions pkg/frontend/querymiddleware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,10 @@ func NewTripperware(
cacheExtractor Extractor,
engineOpts promql.EngineOpts,
engineExperimentalFunctionsEnabled bool,
ingestStorageTopicOffsetsReader *ingest.TopicOffsetsReader,
ingestStorageTopicOffsetsReaders map[string]*ingest.TopicOffsetsReader,
registerer prometheus.Registerer,
) (Tripperware, error) {
queryRangeTripperware, err := newQueryTripperware(cfg, log, limits, codec, cacheExtractor, engineOpts, engineExperimentalFunctionsEnabled, ingestStorageTopicOffsetsReader, registerer)
queryRangeTripperware, err := newQueryTripperware(cfg, log, limits, codec, cacheExtractor, engineOpts, engineExperimentalFunctionsEnabled, ingestStorageTopicOffsetsReaders, registerer)
if err != nil {
return nil, err
}
Expand All @@ -228,7 +228,7 @@ func newQueryTripperware(
cacheExtractor Extractor,
engineOpts promql.EngineOpts,
engineExperimentalFunctionsEnabled bool,
ingestStorageTopicOffsetsReader *ingest.TopicOffsetsReader,
ingestStorageTopicOffsetsReaders map[string]*ingest.TopicOffsetsReader,
registerer prometheus.Registerer,
) (Tripperware, error) {
// Disable concurrency limits for sharded queries.
Expand Down Expand Up @@ -281,18 +281,18 @@ func newQueryTripperware(
}

// Enforce read consistency after caching.
if ingestStorageTopicOffsetsReader != nil {
metrics := newReadConsistencyMetrics(registerer)

queryrange = newReadConsistencyRoundTripper(queryrange, ingestStorageTopicOffsetsReader, limits, log, metrics)
instant = newReadConsistencyRoundTripper(instant, ingestStorageTopicOffsetsReader, limits, log, metrics)
cardinality = newReadConsistencyRoundTripper(cardinality, ingestStorageTopicOffsetsReader, limits, log, metrics)
activeSeries = newReadConsistencyRoundTripper(activeSeries, ingestStorageTopicOffsetsReader, limits, log, metrics)
activeNativeHistogramMetrics = newReadConsistencyRoundTripper(activeNativeHistogramMetrics, ingestStorageTopicOffsetsReader, limits, log, metrics)
labels = newReadConsistencyRoundTripper(labels, ingestStorageTopicOffsetsReader, limits, log, metrics)
series = newReadConsistencyRoundTripper(series, ingestStorageTopicOffsetsReader, limits, log, metrics)
remoteRead = newReadConsistencyRoundTripper(remoteRead, ingestStorageTopicOffsetsReader, limits, log, metrics)
next = newReadConsistencyRoundTripper(next, ingestStorageTopicOffsetsReader, limits, log, metrics)
if len(ingestStorageTopicOffsetsReaders) > 0 {
metrics := newReadConsistencyMetrics(registerer, ingestStorageTopicOffsetsReaders)

queryrange = newReadConsistencyRoundTripper(queryrange, ingestStorageTopicOffsetsReaders, limits, log, metrics)
instant = newReadConsistencyRoundTripper(instant, ingestStorageTopicOffsetsReaders, limits, log, metrics)
cardinality = newReadConsistencyRoundTripper(cardinality, ingestStorageTopicOffsetsReaders, limits, log, metrics)
activeSeries = newReadConsistencyRoundTripper(activeSeries, ingestStorageTopicOffsetsReaders, limits, log, metrics)
activeNativeHistogramMetrics = newReadConsistencyRoundTripper(activeNativeHistogramMetrics, ingestStorageTopicOffsetsReaders, limits, log, metrics)
labels = newReadConsistencyRoundTripper(labels, ingestStorageTopicOffsetsReaders, limits, log, metrics)
series = newReadConsistencyRoundTripper(series, ingestStorageTopicOffsetsReaders, limits, log, metrics)
remoteRead = newReadConsistencyRoundTripper(remoteRead, ingestStorageTopicOffsetsReaders, limits, log, metrics)
next = newReadConsistencyRoundTripper(next, ingestStorageTopicOffsetsReaders, limits, log, metrics)
}

// Look up cache as first thing after validation.
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ func TestTripperware_ShouldSupportReadConsistencyOffsetsInjection(t *testing.T)
Timeout: time.Minute,
},
true,
offsetsReader,
map[string]*ingest.TopicOffsetsReader{querierapi.ReadConsistencyOffsetsHeader: offsetsReader},
nil,
)
require.NoError(t, err)
Expand Down
68 changes: 34 additions & 34 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,40 +705,40 @@ type Mimir struct {
ServiceMap map[string]services.Service
ModuleManager *modules.Manager

API *api.API
Server *server.Server
IngesterRing *ring.Ring
IngesterPartitionRingWatcher *ring.PartitionRingWatcher
IngesterPartitionInstanceRing *ring.PartitionInstanceRing
TenantLimits validation.TenantLimits
Overrides *validation.Overrides
ActiveGroupsCleanup *util.ActiveGroupsCleanupService
Distributor *distributor.Distributor
Ingester *ingester.Ingester
Flusher *flusher.Flusher
FrontendV1 *frontendv1.Frontend
RuntimeConfig *runtimeconfig.Manager
QuerierQueryable prom_storage.SampleAndChunkQueryable
ExemplarQueryable prom_storage.ExemplarQueryable
AdditionalStorageQueryables []querier.TimeRangeQueryable
MetadataSupplier querier.MetadataSupplier
QuerierEngine promql.QueryEngine
QueryFrontendTripperware querymiddleware.Tripperware
QueryFrontendTopicOffsetsReader *ingest.TopicOffsetsReader
QueryFrontendCodec querymiddleware.Codec
Ruler *ruler.Ruler
RulerStorage rulestore.RuleStore
Alertmanager *alertmanager.MultitenantAlertmanager
Compactor *compactor.MultitenantCompactor
StoreGateway *storegateway.StoreGateway
MemberlistKV *memberlist.KVInitService
ActivityTracker *activitytracker.ActivityTracker
Vault *vault.Vault
UsageStatsReporter *usagestats.Reporter
BlockBuilder *blockbuilder.BlockBuilder
BlockBuilderScheduler *blockbuilderscheduler.BlockBuilderScheduler
ContinuousTestManager *continuoustest.Manager
BuildInfoHandler http.Handler
API *api.API
Server *server.Server
IngesterRing *ring.Ring
IngesterPartitionRingWatcher *ring.PartitionRingWatcher
IngesterPartitionInstanceRing *ring.PartitionInstanceRing
TenantLimits validation.TenantLimits
Overrides *validation.Overrides
ActiveGroupsCleanup *util.ActiveGroupsCleanupService
Distributor *distributor.Distributor
Ingester *ingester.Ingester
Flusher *flusher.Flusher
FrontendV1 *frontendv1.Frontend
RuntimeConfig *runtimeconfig.Manager
QuerierQueryable prom_storage.SampleAndChunkQueryable
ExemplarQueryable prom_storage.ExemplarQueryable
AdditionalStorageQueryables []querier.TimeRangeQueryable
MetadataSupplier querier.MetadataSupplier
QuerierEngine promql.QueryEngine
QueryFrontendTripperware querymiddleware.Tripperware
QueryFrontendTopicOffsetsReaders map[string]*ingest.TopicOffsetsReader
QueryFrontendCodec querymiddleware.Codec
Ruler *ruler.Ruler
RulerStorage rulestore.RuleStore
Alertmanager *alertmanager.MultitenantAlertmanager
Compactor *compactor.MultitenantCompactor
StoreGateway *storegateway.StoreGateway
MemberlistKV *memberlist.KVInitService
ActivityTracker *activitytracker.ActivityTracker
Vault *vault.Vault
UsageStatsReporter *usagestats.Reporter
BlockBuilder *blockbuilder.BlockBuilder
BlockBuilderScheduler *blockbuilderscheduler.BlockBuilderScheduler
ContinuousTestManager *continuoustest.Manager
BuildInfoHandler http.Handler
}

// New makes a new Mimir.
Expand Down
Loading

0 comments on commit 79c32ab

Please sign in to comment.