Skip to content

Commit

Permalink
query-frontend: Allow for multiple topics in readConsistencyRoundTripper
Browse files Browse the repository at this point in the history
  • Loading branch information
leizor committed Dec 11, 2024
1 parent 5086788 commit 4caad1c
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 141 deletions.
55 changes: 38 additions & 17 deletions pkg/frontend/querymiddleware/read_consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ package querymiddleware

import (
"net/http"
"sync"

"github.com/go-kit/log"
"github.com/grafana/dskit/tenant"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"

apierror "github.com/grafana/mimir/pkg/api/error"
querierapi "github.com/grafana/mimir/pkg/querier/api"
Expand All @@ -19,19 +21,21 @@ import (
type readConsistencyRoundTripper struct {
next http.RoundTripper

offsetsReader *ingest.TopicOffsetsReader
limits Limits
logger log.Logger
metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64]
// offsetsReaders is a map of offsets readers keyed by the request header the offsets get attached to.
offsetsReaders map[string]*ingest.TopicOffsetsReader

limits Limits
logger log.Logger
metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64]
}

func newReadConsistencyRoundTripper(next http.RoundTripper, offsetsReader *ingest.TopicOffsetsReader, limits Limits, logger log.Logger, metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64]) http.RoundTripper {
func newReadConsistencyRoundTripper(next http.RoundTripper, offsetsReaders map[string]*ingest.TopicOffsetsReader, limits Limits, logger log.Logger, metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64]) http.RoundTripper {
return &readConsistencyRoundTripper{
next: next,
limits: limits,
logger: logger,
offsetsReader: offsetsReader,
metrics: metrics,
next: next,
offsetsReaders: offsetsReaders,
limits: limits,
logger: logger,
metrics: metrics,
}
}

Expand All @@ -57,15 +61,32 @@ func (r *readConsistencyRoundTripper) RoundTrip(req *http.Request) (_ *http.Resp
return r.next.RoundTrip(req)
}

// Fetch last produced offsets.
offsets, err := r.metrics.Observe(false, func() (map[int32]int64, error) {
return r.offsetsReader.WaitNextFetchLastProducedOffset(ctx)
})
if err != nil {
return nil, errors.Wrap(err, "wait for last produced offsets")
errGroup, ctx := errgroup.WithContext(ctx)
reqHeaderLock := &sync.Mutex{}

for headerKey, offsetsReader := range r.offsetsReaders {
headerKey := headerKey
offsetsReader := offsetsReader

errGroup.Go(func() error {
offsets, err := r.metrics.Observe(false, func() (map[int32]int64, error) {
return offsetsReader.WaitNextFetchLastProducedOffset(ctx)
})
if err != nil {
return errors.Wrapf(err, "wait for last produced offsets (%s)", headerKey)
}

reqHeaderLock.Lock()
req.Header.Add(headerKey, string(querierapi.EncodeOffsets(offsets)))
reqHeaderLock.Unlock()

return nil
})
}

req.Header.Add(querierapi.ReadConsistencyOffsetsHeader, string(querierapi.EncodeOffsets(offsets)))
if err = errGroup.Wait(); err != nil {
return nil, err
}

return r.next.RoundTrip(req)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/read_consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestReadConsistencyRoundTripper(t *testing.T) {
}

reg := prometheus.NewPedanticRegistry()
rt := newReadConsistencyRoundTripper(downstream, reader, testData.limits, log.NewNopLogger(), newReadConsistencyMetrics(reg))
rt := newReadConsistencyRoundTripper(downstream, map[string]*ingest.TopicOffsetsReader{querierapi.ReadConsistencyOffsetsHeader: reader}, testData.limits, log.NewNopLogger(), newReadConsistencyMetrics(reg))
_, err = rt.RoundTrip(req)
require.NoError(t, err)

Expand Down
26 changes: 13 additions & 13 deletions pkg/frontend/querymiddleware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,10 @@ func NewTripperware(
cacheExtractor Extractor,
engineOpts promql.EngineOpts,
engineExperimentalFunctionsEnabled bool,
ingestStorageTopicOffsetsReader *ingest.TopicOffsetsReader,
ingestStorageTopicOffsetsReaders map[string]*ingest.TopicOffsetsReader,
registerer prometheus.Registerer,
) (Tripperware, error) {
queryRangeTripperware, err := newQueryTripperware(cfg, log, limits, codec, cacheExtractor, engineOpts, engineExperimentalFunctionsEnabled, ingestStorageTopicOffsetsReader, registerer)
queryRangeTripperware, err := newQueryTripperware(cfg, log, limits, codec, cacheExtractor, engineOpts, engineExperimentalFunctionsEnabled, ingestStorageTopicOffsetsReaders, registerer)
if err != nil {
return nil, err
}
Expand All @@ -228,7 +228,7 @@ func newQueryTripperware(
cacheExtractor Extractor,
engineOpts promql.EngineOpts,
engineExperimentalFunctionsEnabled bool,
ingestStorageTopicOffsetsReader *ingest.TopicOffsetsReader,
ingestStorageTopicOffsetsReaders map[string]*ingest.TopicOffsetsReader,
registerer prometheus.Registerer,
) (Tripperware, error) {
// Disable concurrency limits for sharded queries.
Expand Down Expand Up @@ -281,18 +281,18 @@ func newQueryTripperware(
}

// Enforce read consistency after caching.
if ingestStorageTopicOffsetsReader != nil {
if len(ingestStorageTopicOffsetsReaders) > 0 {
metrics := newReadConsistencyMetrics(registerer)

queryrange = newReadConsistencyRoundTripper(queryrange, ingestStorageTopicOffsetsReader, limits, log, metrics)
instant = newReadConsistencyRoundTripper(instant, ingestStorageTopicOffsetsReader, limits, log, metrics)
cardinality = newReadConsistencyRoundTripper(cardinality, ingestStorageTopicOffsetsReader, limits, log, metrics)
activeSeries = newReadConsistencyRoundTripper(activeSeries, ingestStorageTopicOffsetsReader, limits, log, metrics)
activeNativeHistogramMetrics = newReadConsistencyRoundTripper(activeNativeHistogramMetrics, ingestStorageTopicOffsetsReader, limits, log, metrics)
labels = newReadConsistencyRoundTripper(labels, ingestStorageTopicOffsetsReader, limits, log, metrics)
series = newReadConsistencyRoundTripper(series, ingestStorageTopicOffsetsReader, limits, log, metrics)
remoteRead = newReadConsistencyRoundTripper(remoteRead, ingestStorageTopicOffsetsReader, limits, log, metrics)
next = newReadConsistencyRoundTripper(next, ingestStorageTopicOffsetsReader, limits, log, metrics)
queryrange = newReadConsistencyRoundTripper(queryrange, ingestStorageTopicOffsetsReaders, limits, log, metrics)
instant = newReadConsistencyRoundTripper(instant, ingestStorageTopicOffsetsReaders, limits, log, metrics)
cardinality = newReadConsistencyRoundTripper(cardinality, ingestStorageTopicOffsetsReaders, limits, log, metrics)
activeSeries = newReadConsistencyRoundTripper(activeSeries, ingestStorageTopicOffsetsReaders, limits, log, metrics)
activeNativeHistogramMetrics = newReadConsistencyRoundTripper(activeNativeHistogramMetrics, ingestStorageTopicOffsetsReaders, limits, log, metrics)
labels = newReadConsistencyRoundTripper(labels, ingestStorageTopicOffsetsReaders, limits, log, metrics)
series = newReadConsistencyRoundTripper(series, ingestStorageTopicOffsetsReaders, limits, log, metrics)
remoteRead = newReadConsistencyRoundTripper(remoteRead, ingestStorageTopicOffsetsReaders, limits, log, metrics)
next = newReadConsistencyRoundTripper(next, ingestStorageTopicOffsetsReaders, limits, log, metrics)
}

// Look up cache as first thing after validation.
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ func TestTripperware_ShouldSupportReadConsistencyOffsetsInjection(t *testing.T)
Timeout: time.Minute,
},
true,
offsetsReader,
map[string]*ingest.TopicOffsetsReader{querierapi.ReadConsistencyOffsetsHeader: offsetsReader},
nil,
)
require.NoError(t, err)
Expand Down
68 changes: 34 additions & 34 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,40 +705,40 @@ type Mimir struct {
ServiceMap map[string]services.Service
ModuleManager *modules.Manager

API *api.API
Server *server.Server
IngesterRing *ring.Ring
IngesterPartitionRingWatcher *ring.PartitionRingWatcher
IngesterPartitionInstanceRing *ring.PartitionInstanceRing
TenantLimits validation.TenantLimits
Overrides *validation.Overrides
ActiveGroupsCleanup *util.ActiveGroupsCleanupService
Distributor *distributor.Distributor
Ingester *ingester.Ingester
Flusher *flusher.Flusher
FrontendV1 *frontendv1.Frontend
RuntimeConfig *runtimeconfig.Manager
QuerierQueryable prom_storage.SampleAndChunkQueryable
ExemplarQueryable prom_storage.ExemplarQueryable
AdditionalStorageQueryables []querier.TimeRangeQueryable
MetadataSupplier querier.MetadataSupplier
QuerierEngine promql.QueryEngine
QueryFrontendTripperware querymiddleware.Tripperware
QueryFrontendTopicOffsetsReader *ingest.TopicOffsetsReader
QueryFrontendCodec querymiddleware.Codec
Ruler *ruler.Ruler
RulerStorage rulestore.RuleStore
Alertmanager *alertmanager.MultitenantAlertmanager
Compactor *compactor.MultitenantCompactor
StoreGateway *storegateway.StoreGateway
MemberlistKV *memberlist.KVInitService
ActivityTracker *activitytracker.ActivityTracker
Vault *vault.Vault
UsageStatsReporter *usagestats.Reporter
BlockBuilder *blockbuilder.BlockBuilder
BlockBuilderScheduler *blockbuilderscheduler.BlockBuilderScheduler
ContinuousTestManager *continuoustest.Manager
BuildInfoHandler http.Handler
API *api.API
Server *server.Server
IngesterRing *ring.Ring
IngesterPartitionRingWatcher *ring.PartitionRingWatcher
IngesterPartitionInstanceRing *ring.PartitionInstanceRing
TenantLimits validation.TenantLimits
Overrides *validation.Overrides
ActiveGroupsCleanup *util.ActiveGroupsCleanupService
Distributor *distributor.Distributor
Ingester *ingester.Ingester
Flusher *flusher.Flusher
FrontendV1 *frontendv1.Frontend
RuntimeConfig *runtimeconfig.Manager
QuerierQueryable prom_storage.SampleAndChunkQueryable
ExemplarQueryable prom_storage.ExemplarQueryable
AdditionalStorageQueryables []querier.TimeRangeQueryable
MetadataSupplier querier.MetadataSupplier
QuerierEngine promql.QueryEngine
QueryFrontendTripperware querymiddleware.Tripperware
QueryFrontendTopicOffsetsReaders map[string]*ingest.TopicOffsetsReader
QueryFrontendCodec querymiddleware.Codec
Ruler *ruler.Ruler
RulerStorage rulestore.RuleStore
Alertmanager *alertmanager.MultitenantAlertmanager
Compactor *compactor.MultitenantCompactor
StoreGateway *storegateway.StoreGateway
MemberlistKV *memberlist.KVInitService
ActivityTracker *activitytracker.ActivityTracker
Vault *vault.Vault
UsageStatsReporter *usagestats.Reporter
BlockBuilder *blockbuilder.BlockBuilder
BlockBuilderScheduler *blockbuilderscheduler.BlockBuilderScheduler
ContinuousTestManager *continuoustest.Manager
BuildInfoHandler http.Handler
}

// New makes a new Mimir.
Expand Down
Loading

0 comments on commit 4caad1c

Please sign in to comment.