Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache Matchers on ingesters #6382

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3128,6 +3128,10 @@ instance_limits:
# change by changing this option.
# CLI flag: -ingester.disable-chunk-trimming
[disable_chunk_trimming: <boolean> | default = false]

# Maximum number of entries in the matchers cache. 0 to disable.
# CLI flag: -ingester.matchers-cache-max-items
[matchers_cache_max_items: <int> | default = 0]
```

### `ingester_client_config`
Expand Down
5 changes: 3 additions & 2 deletions integration/query_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,9 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
"-blocks-storage.expanded_postings_cache.head.enabled": "true",
"-blocks-storage.expanded_postings_cache.block.enabled": "true",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul2.NetworkHTTPEndpoint(),
"-ring.store": "consul",
"-ingester.matchers-cache-max-items": "1024",
"-consul.hostname": consul2.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
// Store-gateway.
Expand Down
8 changes: 4 additions & 4 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3333,7 +3333,7 @@ func (i *mockIngester) Query(ctx context.Context, req *client.QueryRequest, opts
return nil, errFail
}

_, _, matchers, err := client.FromQueryRequest(req)
_, _, matchers, err := client.FromQueryRequest(req, labels.NewMatcher)
if err != nil {
return nil, err
}
Expand All @@ -3359,7 +3359,7 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest
return nil, errFail
}

_, _, matchers, err := client.FromQueryRequest(req)
_, _, matchers, err := client.FromQueryRequest(req, labels.NewMatcher)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3418,7 +3418,7 @@ func (i *mockIngester) MetricsForLabelMatchersStream(ctx context.Context, req *c
return nil, errFail
}

_, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req)
_, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req, labels.NewMatcher)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3450,7 +3450,7 @@ func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.
return nil, errFail
}

_, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req)
_, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req, labels.NewMatcher)
if err != nil {
return nil, err
}
Expand Down
24 changes: 12 additions & 12 deletions pkg/ingester/client/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func ToQueryRequest(from, to model.Time, matchers []*labels.Matcher) (*QueryRequ
}

// FromQueryRequest unpacks a QueryRequest proto.
func FromQueryRequest(req *QueryRequest) (model.Time, model.Time, []*labels.Matcher, error) {
matchers, err := FromLabelMatchers(req.Matchers)
func FromQueryRequest(req *QueryRequest, newMatcher func(t labels.MatchType, n, v string) (*labels.Matcher, error)) (model.Time, model.Time, []*labels.Matcher, error) {
matchers, err := FromLabelMatchers(req.Matchers, newMatcher)
if err != nil {
return 0, 0, nil, err
}
Expand Down Expand Up @@ -55,10 +55,10 @@ func ToExemplarQueryRequest(from, to model.Time, matchers ...[]*labels.Matcher)
}

// FromExemplarQueryRequest unpacks a ExemplarQueryRequest proto.
func FromExemplarQueryRequest(req *ExemplarQueryRequest) (int64, int64, [][]*labels.Matcher, error) {
func FromExemplarQueryRequest(req *ExemplarQueryRequest, newMatcher func(t labels.MatchType, n, v string) (*labels.Matcher, error)) (int64, int64, [][]*labels.Matcher, error) {
var result [][]*labels.Matcher
for _, m := range req.Matchers {
matchers, err := FromLabelMatchers(m.Matchers)
matchers, err := FromLabelMatchers(m.Matchers, newMatcher)
if err != nil {
return 0, 0, nil, err
}
Expand Down Expand Up @@ -175,10 +175,10 @@ func SeriesSetToQueryResponse(s storage.SeriesSet) (*QueryResponse, error) {
}

// FromMetricsForLabelMatchersRequest unpacks a MetricsForLabelMatchersRequest proto
func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, int, [][]*labels.Matcher, error) {
func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest, newMatcher func(t labels.MatchType, n, v string) (*labels.Matcher, error)) (model.Time, model.Time, int, [][]*labels.Matcher, error) {
matchersSet := make([][]*labels.Matcher, 0, len(req.MatchersSet))
for _, matchers := range req.MatchersSet {
matchers, err := FromLabelMatchers(matchers.Matchers)
matchers, err := FromLabelMatchers(matchers.Matchers, newMatcher)
if err != nil {
return 0, 0, 0, nil, err
}
Expand Down Expand Up @@ -206,12 +206,12 @@ func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, limit
}

// FromLabelValuesRequest unpacks a LabelValuesRequest proto
func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, int, []*labels.Matcher, error) {
func FromLabelValuesRequest(req *LabelValuesRequest, newMatcher func(t labels.MatchType, n, v string) (*labels.Matcher, error)) (string, int64, int64, int, []*labels.Matcher, error) {
var err error
var matchers []*labels.Matcher

if req.Matchers != nil {
matchers, err = FromLabelMatchers(req.Matchers.Matchers)
matchers, err = FromLabelMatchers(req.Matchers.Matchers, newMatcher)
if err != nil {
return "", 0, 0, 0, nil, err
}
Expand All @@ -236,12 +236,12 @@ func ToLabelNamesRequest(from, to model.Time, limit int, matchers []*labels.Matc
}

// FromLabelNamesRequest unpacks a LabelNamesRequest proto
func FromLabelNamesRequest(req *LabelNamesRequest) (int64, int64, int, []*labels.Matcher, error) {
func FromLabelNamesRequest(req *LabelNamesRequest, newMatcher func(t labels.MatchType, n, v string) (*labels.Matcher, error)) (int64, int64, int, []*labels.Matcher, error) {
var err error
var matchers []*labels.Matcher

if req.Matchers != nil {
matchers, err = FromLabelMatchers(req.Matchers.Matchers)
matchers, err = FromLabelMatchers(req.Matchers.Matchers, newMatcher)
if err != nil {
return 0, 0, 0, nil, err
}
Expand Down Expand Up @@ -275,7 +275,7 @@ func toLabelMatchers(matchers []*labels.Matcher) ([]*LabelMatcher, error) {
return result, nil
}

func FromLabelMatchers(matchers []*LabelMatcher) ([]*labels.Matcher, error) {
func FromLabelMatchers(matchers []*LabelMatcher, newMatcher func(t labels.MatchType, n, v string) (*labels.Matcher, error)) ([]*labels.Matcher, error) {
result := make([]*labels.Matcher, 0, len(matchers))
for _, matcher := range matchers {
var mtype labels.MatchType
Expand All @@ -291,7 +291,7 @@ func FromLabelMatchers(matchers []*LabelMatcher) ([]*labels.Matcher, error) {
default:
return nil, fmt.Errorf("invalid matcher type")
}
matcher, err := labels.NewMatcher(mtype, matcher.Name, matcher.Value)
matcher, err := newMatcher(mtype, matcher.Name, matcher.Value)
if err != nil {
return nil, err
}
Expand Down
95 changes: 58 additions & 37 deletions pkg/ingester/client/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,53 +7,74 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/util"
)

func TestQueryRequest(t *testing.T) {
from, to := model.Time(int64(0)), model.Time(int64(10))
matchers := []*labels.Matcher{}
matcher1, err := labels.NewMatcher(labels.MatchEqual, "foo", "1")
if err != nil {
t.Fatal(err)
c, err := util.NewMatcherCache(1024)
require.NoError(t, err)

tc := map[string]struct {
newMatcherFunc func(t labels.MatchType, n, v string) (*labels.Matcher, error)
}{
"no cache": {
newMatcherFunc: labels.NewMatcher,
},
"cache": {
newMatcherFunc: c.GetMatcher,
},
}
matchers = append(matchers, matcher1)

matcher2, err := labels.NewMatcher(labels.MatchNotEqual, "bar", "2")
if err != nil {
t.Fatal(err)
}
matchers = append(matchers, matcher2)
for name, tc := range tc {
t.Run(name, func(t *testing.T) {
from, to := model.Time(int64(0)), model.Time(int64(10))
matchers := []*labels.Matcher{}
matcher1, err := labels.NewMatcher(labels.MatchEqual, "foo", "1")
if err != nil {
t.Fatal(err)
}
matchers = append(matchers, matcher1)

matcher3, err := labels.NewMatcher(labels.MatchRegexp, "baz", "3")
if err != nil {
t.Fatal(err)
}
matchers = append(matchers, matcher3)
matcher2, err := labels.NewMatcher(labels.MatchNotEqual, "bar", "2")
if err != nil {
t.Fatal(err)
}
matchers = append(matchers, matcher2)

matcher4, err := labels.NewMatcher(labels.MatchNotRegexp, "bop", "4")
if err != nil {
t.Fatal(err)
}
matchers = append(matchers, matcher4)
matcher3, err := labels.NewMatcher(labels.MatchRegexp, "baz", "3")
if err != nil {
t.Fatal(err)
}
matchers = append(matchers, matcher3)

req, err := ToQueryRequest(from, to, matchers)
if err != nil {
t.Fatal(err)
}
matcher4, err := labels.NewMatcher(labels.MatchNotRegexp, "bop", "4")
if err != nil {
t.Fatal(err)
}
matchers = append(matchers, matcher4)

haveFrom, haveTo, haveMatchers, err := FromQueryRequest(req)
if err != nil {
t.Fatal(err)
}
req, err := ToQueryRequest(from, to, matchers)
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(haveFrom, from) {
t.Fatalf("Bad from FromQueryRequest(ToQueryRequest) round trip")
}
if !reflect.DeepEqual(haveTo, to) {
t.Fatalf("Bad to FromQueryRequest(ToQueryRequest) round trip")
}
if !matchersEqual(haveMatchers, matchers) {
t.Fatalf("Bad have FromQueryRequest(ToQueryRequest) round trip - %v != %v", haveMatchers, matchers)
haveFrom, haveTo, haveMatchers, err := FromQueryRequest(req, tc.newMatcherFunc)
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(haveFrom, from) {
t.Fatalf("Bad from FromQueryRequest(ToQueryRequest) round trip")
}
if !reflect.DeepEqual(haveTo, to) {
t.Fatalf("Bad to FromQueryRequest(ToQueryRequest) round trip")
}
if !matchersEqual(haveMatchers, matchers) {
t.Fatalf("Bad have FromQueryRequest(ToQueryRequest) round trip - %v != %v", haveMatchers, matchers)
}
})
}
}

Expand Down
25 changes: 20 additions & 5 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ type Config struct {
// When disabled, the result may contain samples outside the queried time range but Select() performances
// may be improved.
DisableChunkTrimming bool `yaml:"disable_chunk_trimming"`

MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand Down Expand Up @@ -170,6 +172,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.LabelsStringInterningEnabled, "ingester.labels-string-interning-enabled", false, "Experimental: Enable string interning for metrics labels.")

f.BoolVar(&cfg.DisableChunkTrimming, "ingester.disable-chunk-trimming", false, "Disable trimming of matching series chunks based on query Start and End time. When disabled, the result may contain samples outside the queried time range but select performances may be improved. Note that certain query results might change by changing this option.")
f.IntVar(&cfg.MatchersCacheMaxItems, "ingester.matchers-cache-max-items", 0, "Maximum number of entries in the matchers cache. 0 to disable.")
}

func (cfg *Config) Validate() error {
Expand Down Expand Up @@ -240,6 +243,8 @@ type Ingester struct {
maxInflightQueryRequests util_math.MaxTracker

expandedPostingsCacheFactory *cortex_tsdb.ExpandedPostingsCacheFactory

newMatcherFunc func(t labels.MatchType, n, v string) (*labels.Matcher, error)
}

// Shipper interface is used to have an easy way to mock it in tests.
Expand Down Expand Up @@ -698,12 +703,22 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
return nil, errors.Wrap(err, "failed to create the bucket client")
}

newMatcherFunc := labels.NewMatcher
if cfg.MatchersCacheMaxItems > 0 {
matcherCache, err := util.NewMatcherCache(1024)
if err != nil {
return nil, errors.Wrap(err, "failed to create matcher cache")
}
newMatcherFunc = matcherCache.GetMatcher
}

i := &Ingester{
cfg: cfg,
limits: limits,
usersMetadata: map[string]*userMetricsMetadata{},
TSDBState: newTSDBState(bucketClient, registerer),
logger: logger,
newMatcherFunc: newMatcherFunc,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
expandedPostingsCacheFactory: cortex_tsdb.NewExpandedPostingsCacheFactory(cfg.BlocksStorageConfig.TSDB.PostingsCache),
}
Expand Down Expand Up @@ -1448,7 +1463,7 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
return nil, err
}

from, through, matchers, err := client.FromExemplarQueryRequest(req)
from, through, matchers, err := client.FromExemplarQueryRequest(req, i.newMatcherFunc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1538,7 +1553,7 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
return nil, cleanup, err
}

labelName, startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelValuesRequest(req)
labelName, startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelValuesRequest(req, i.newMatcherFunc)
if err != nil {
return nil, cleanup, err
}
Expand Down Expand Up @@ -1628,7 +1643,7 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
return nil, cleanup, err
}

startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelNamesRequest(req)
startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelNamesRequest(req, i.newMatcherFunc)
if err != nil {
return nil, cleanup, err
}
Expand Down Expand Up @@ -1727,7 +1742,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
}

// Parse the request
_, _, limit, matchersSet, err := client.FromMetricsForLabelMatchersRequest(req)
_, _, limit, matchersSet, err := client.FromMetricsForLabelMatchersRequest(req, i.newMatcherFunc)
if err != nil {
return nil, cleanup, err
}
Expand Down Expand Up @@ -1946,7 +1961,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
return err
}

from, through, matchers, err := client.FromQueryRequest(req)
from, through, matchers, err := client.FromQueryRequest(req, i.newMatcherFunc)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ingester/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func defaultIngesterTestConfig(t testing.TB) Config {
cfg.LifecyclerConfig.FinalSleep = 0
cfg.ActiveSeriesMetricsEnabled = true
cfg.LabelsStringInterningEnabled = true
cfg.MatchersCacheMaxItems = 1024
return cfg
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/remote_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"

"github.com/cortexproject/cortex/pkg/ingester/client"
Expand Down Expand Up @@ -34,7 +35,7 @@ func RemoteReadHandler(q storage.Queryable, logger log.Logger) http.Handler {
errors := make(chan error)
for i, qr := range req.Queries {
go func(i int, qr *client.QueryRequest) {
from, to, matchers, err := client.FromQueryRequest(qr)
from, to, matchers, err := client.FromQueryRequest(qr, labels.NewMatcher)
if err != nil {
errors <- err
return
Expand Down
Loading
Loading