From a08f18a0c5c6119a37be604910eef056cfbdada5 Mon Sep 17 00:00:00 2001 From: alanprot Date: Wed, 27 Nov 2024 15:53:43 -0800 Subject: [PATCH] Cache Matchers on ingesters Signed-off-by: alanprot --- docs/configuration/config-file-reference.md | 4 + integration/query_fuzz_test.go | 5 +- pkg/distributor/distributor_test.go | 8 +- pkg/ingester/client/compat.go | 24 +++--- pkg/ingester/client/compat_test.go | 95 +++++++++++++-------- pkg/ingester/ingester.go | 25 ++++-- pkg/ingester/lifecycle_test.go | 1 + pkg/querier/remote_read.go | 3 +- pkg/util/matchers.go | 55 +++++++++--- 9 files changed, 145 insertions(+), 75 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 67ae897e02..f006582559 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3128,6 +3128,10 @@ instance_limits: # change by changing this option. # CLI flag: -ingester.disable-chunk-trimming [disable_chunk_trimming: <boolean> | default = false] + +# Maximum number of entries in the matchers cache. 0 to disable. +# CLI flag: -ingester.matchers-cache-max-items +[matchers_cache_max_items: <int> | default = 0] ``` ### `ingester_client_config` diff --git a/integration/query_fuzz_test.go b/integration/query_fuzz_test.go index 3f7e47c3ae..b64ce4ef1c 100644 --- a/integration/query_fuzz_test.go +++ b/integration/query_fuzz_test.go @@ -373,8 +373,9 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) { "-blocks-storage.expanded_postings_cache.head.enabled": "true", "-blocks-storage.expanded_postings_cache.block.enabled": "true", // Ingester. - "-ring.store": "consul", - "-consul.hostname": consul2.NetworkHTTPEndpoint(), + "-ring.store": "consul", + "-ingester.matchers-cache-max-items": "1024", + "-consul.hostname": consul2.NetworkHTTPEndpoint(), // Distributor. "-distributor.replication-factor": "1", // Store-gateway. 
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 3681d13b21..0f6807ae82 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -3333,7 +3333,7 @@ func (i *mockIngester) Query(ctx context.Context, req *client.QueryRequest, opts return nil, errFail } - _, _, matchers, err := client.FromQueryRequest(req) + _, _, matchers, err := client.FromQueryRequest(req, labels.NewMatcher) if err != nil { return nil, err } @@ -3359,7 +3359,7 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest return nil, errFail } - _, _, matchers, err := client.FromQueryRequest(req) + _, _, matchers, err := client.FromQueryRequest(req, labels.NewMatcher) if err != nil { return nil, err } @@ -3418,7 +3418,7 @@ func (i *mockIngester) MetricsForLabelMatchersStream(ctx context.Context, req *c return nil, errFail } - _, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req) + _, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req, labels.NewMatcher) if err != nil { return nil, err } @@ -3450,7 +3450,7 @@ func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client. return nil, errFail } - _, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req) + _, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req, labels.NewMatcher) if err != nil { return nil, err } diff --git a/pkg/ingester/client/compat.go b/pkg/ingester/client/compat.go index 1a8e178641..292abca509 100644 --- a/pkg/ingester/client/compat.go +++ b/pkg/ingester/client/compat.go @@ -26,8 +26,8 @@ func ToQueryRequest(from, to model.Time, matchers []*labels.Matcher) (*QueryRequ } // FromQueryRequest unpacks a QueryRequest proto. 
-func FromQueryRequest(req *QueryRequest) (model.Time, model.Time, []*labels.Matcher, error) { - matchers, err := FromLabelMatchers(req.Matchers) +func FromQueryRequest(req *QueryRequest, newMatcher func(t labels.MatchType, n, v string) (*labels.Matcher, error)) (model.Time, model.Time, []*labels.Matcher, error) { + matchers, err := FromLabelMatchers(req.Matchers, newMatcher) if err != nil { return 0, 0, nil, err } @@ -55,10 +55,10 @@ func ToExemplarQueryRequest(from, to model.Time, matchers ...[]*labels.Matcher) } // FromExemplarQueryRequest unpacks a ExemplarQueryRequest proto. -func FromExemplarQueryRequest(req *ExemplarQueryRequest) (int64, int64, [][]*labels.Matcher, error) { +func FromExemplarQueryRequest(req *ExemplarQueryRequest, newMatcher func(t labels.MatchType, n, v string) (*labels.Matcher, error)) (int64, int64, [][]*labels.Matcher, error) { var result [][]*labels.Matcher for _, m := range req.Matchers { - matchers, err := FromLabelMatchers(m.Matchers) + matchers, err := FromLabelMatchers(m.Matchers, newMatcher) if err != nil { return 0, 0, nil, err } @@ -175,10 +175,10 @@ func SeriesSetToQueryResponse(s storage.SeriesSet) (*QueryResponse, error) { } // FromMetricsForLabelMatchersRequest unpacks a MetricsForLabelMatchersRequest proto -func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, int, [][]*labels.Matcher, error) { +func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest, newMatcher func(t labels.MatchType, n, v string) (*labels.Matcher, error)) (model.Time, model.Time, int, [][]*labels.Matcher, error) { matchersSet := make([][]*labels.Matcher, 0, len(req.MatchersSet)) for _, matchers := range req.MatchersSet { - matchers, err := FromLabelMatchers(matchers.Matchers) + matchers, err := FromLabelMatchers(matchers.Matchers, newMatcher) if err != nil { return 0, 0, 0, nil, err } @@ -206,12 +206,12 @@ func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, limit } 
// FromLabelValuesRequest unpacks a LabelValuesRequest proto -func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, int, []*labels.Matcher, error) { +func FromLabelValuesRequest(req *LabelValuesRequest, newMatcher func(t labels.MatchType, n, v string) (*labels.Matcher, error)) (string, int64, int64, int, []*labels.Matcher, error) { var err error var matchers []*labels.Matcher if req.Matchers != nil { - matchers, err = FromLabelMatchers(req.Matchers.Matchers) + matchers, err = FromLabelMatchers(req.Matchers.Matchers, newMatcher) if err != nil { return "", 0, 0, 0, nil, err } @@ -236,12 +236,12 @@ func ToLabelNamesRequest(from, to model.Time, limit int, matchers []*labels.Matc } // FromLabelNamesRequest unpacks a LabelNamesRequest proto -func FromLabelNamesRequest(req *LabelNamesRequest) (int64, int64, int, []*labels.Matcher, error) { +func FromLabelNamesRequest(req *LabelNamesRequest, newMatcher func(t labels.MatchType, n, v string) (*labels.Matcher, error)) (int64, int64, int, []*labels.Matcher, error) { var err error var matchers []*labels.Matcher if req.Matchers != nil { - matchers, err = FromLabelMatchers(req.Matchers.Matchers) + matchers, err = FromLabelMatchers(req.Matchers.Matchers, newMatcher) if err != nil { return 0, 0, 0, nil, err } @@ -275,7 +275,7 @@ func toLabelMatchers(matchers []*labels.Matcher) ([]*LabelMatcher, error) { return result, nil } -func FromLabelMatchers(matchers []*LabelMatcher) ([]*labels.Matcher, error) { +func FromLabelMatchers(matchers []*LabelMatcher, newMatcher func(t labels.MatchType, n, v string) (*labels.Matcher, error)) ([]*labels.Matcher, error) { result := make([]*labels.Matcher, 0, len(matchers)) for _, matcher := range matchers { var mtype labels.MatchType @@ -291,7 +291,7 @@ func FromLabelMatchers(matchers []*LabelMatcher) ([]*labels.Matcher, error) { default: return nil, fmt.Errorf("invalid matcher type") } - matcher, err := labels.NewMatcher(mtype, matcher.Name, matcher.Value) + matcher, err := 
newMatcher(mtype, matcher.Name, matcher.Value) if err != nil { return nil, err } diff --git a/pkg/ingester/client/compat_test.go b/pkg/ingester/client/compat_test.go index c9467abd0a..477f248e5d 100644 --- a/pkg/ingester/client/compat_test.go +++ b/pkg/ingester/client/compat_test.go @@ -7,53 +7,74 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/util" ) func TestQueryRequest(t *testing.T) { - from, to := model.Time(int64(0)), model.Time(int64(10)) - matchers := []*labels.Matcher{} - matcher1, err := labels.NewMatcher(labels.MatchEqual, "foo", "1") - if err != nil { - t.Fatal(err) + c, err := util.NewMatcherCache(1024) + require.NoError(t, err) + + tc := map[string]struct { + newMatcherFunc func(t labels.MatchType, n, v string) (*labels.Matcher, error) + }{ + "no cache": { + newMatcherFunc: labels.NewMatcher, + }, + "cache": { + newMatcherFunc: c.GetMatcher, + }, } - matchers = append(matchers, matcher1) - matcher2, err := labels.NewMatcher(labels.MatchNotEqual, "bar", "2") - if err != nil { - t.Fatal(err) - } - matchers = append(matchers, matcher2) + for name, tc := range tc { + t.Run(name, func(t *testing.T) { + from, to := model.Time(int64(0)), model.Time(int64(10)) + matchers := []*labels.Matcher{} + matcher1, err := labels.NewMatcher(labels.MatchEqual, "foo", "1") + if err != nil { + t.Fatal(err) + } + matchers = append(matchers, matcher1) - matcher3, err := labels.NewMatcher(labels.MatchRegexp, "baz", "3") - if err != nil { - t.Fatal(err) - } - matchers = append(matchers, matcher3) + matcher2, err := labels.NewMatcher(labels.MatchNotEqual, "bar", "2") + if err != nil { + t.Fatal(err) + } + matchers = append(matchers, matcher2) - matcher4, err := labels.NewMatcher(labels.MatchNotRegexp, "bop", "4") - if err != nil { - t.Fatal(err) - } - matchers = append(matchers, matcher4) + matcher3, err := labels.NewMatcher(labels.MatchRegexp, 
"baz", "3") + if err != nil { + t.Fatal(err) + } + matchers = append(matchers, matcher3) - req, err := ToQueryRequest(from, to, matchers) - if err != nil { - t.Fatal(err) - } + matcher4, err := labels.NewMatcher(labels.MatchNotRegexp, "bop", "4") + if err != nil { + t.Fatal(err) + } + matchers = append(matchers, matcher4) - haveFrom, haveTo, haveMatchers, err := FromQueryRequest(req) - if err != nil { - t.Fatal(err) - } + req, err := ToQueryRequest(from, to, matchers) + if err != nil { + t.Fatal(err) + } - if !reflect.DeepEqual(haveFrom, from) { - t.Fatalf("Bad from FromQueryRequest(ToQueryRequest) round trip") - } - if !reflect.DeepEqual(haveTo, to) { - t.Fatalf("Bad to FromQueryRequest(ToQueryRequest) round trip") - } - if !matchersEqual(haveMatchers, matchers) { - t.Fatalf("Bad have FromQueryRequest(ToQueryRequest) round trip - %v != %v", haveMatchers, matchers) + haveFrom, haveTo, haveMatchers, err := FromQueryRequest(req, tc.newMatcherFunc) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(haveFrom, from) { + t.Fatalf("Bad from FromQueryRequest(ToQueryRequest) round trip") + } + if !reflect.DeepEqual(haveTo, to) { + t.Fatalf("Bad to FromQueryRequest(ToQueryRequest) round trip") + } + if !matchersEqual(haveMatchers, matchers) { + t.Fatalf("Bad have FromQueryRequest(ToQueryRequest) round trip - %v != %v", haveMatchers, matchers) + } + }) } } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index e6fb3b9838..e98c30df2b 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -142,6 +142,8 @@ type Config struct { // When disabled, the result may contain samples outside the queried time range but Select() performances // may be improved. 
DisableChunkTrimming bool `yaml:"disable_chunk_trimming"` + + MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -170,6 +172,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.LabelsStringInterningEnabled, "ingester.labels-string-interning-enabled", false, "Experimental: Enable string interning for metrics labels.") f.BoolVar(&cfg.DisableChunkTrimming, "ingester.disable-chunk-trimming", false, "Disable trimming of matching series chunks based on query Start and End time. When disabled, the result may contain samples outside the queried time range but select performances may be improved. Note that certain query results might change by changing this option.") + f.IntVar(&cfg.MatchersCacheMaxItems, "ingester.matchers-cache-max-items", 0, "Maximum number of entries in the matchers cache. 0 to disable.") } func (cfg *Config) Validate() error { @@ -240,6 +243,8 @@ type Ingester struct { maxInflightQueryRequests util_math.MaxTracker expandedPostingsCacheFactory *cortex_tsdb.ExpandedPostingsCacheFactory + + newMatcherFunc func(t labels.MatchType, n, v string) (*labels.Matcher, error) } // Shipper interface is used to have an easy way to mock it in tests. 
@@ -698,12 +703,22 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe return nil, errors.Wrap(err, "failed to create the bucket client") } + newMatcherFunc := labels.NewMatcher + if cfg.MatchersCacheMaxItems > 0 { + matcherCache, err := util.NewMatcherCache(cfg.MatchersCacheMaxItems) + if err != nil { + return nil, errors.Wrap(err, "failed to create matcher cache") + } + newMatcherFunc = matcherCache.GetMatcher + } + i := &Ingester{ cfg: cfg, limits: limits, usersMetadata: map[string]*userMetricsMetadata{}, TSDBState: newTSDBState(bucketClient, registerer), logger: logger, + newMatcherFunc: newMatcherFunc, ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), expandedPostingsCacheFactory: cortex_tsdb.NewExpandedPostingsCacheFactory(cfg.BlocksStorageConfig.TSDB.PostingsCache), } @@ -1448,7 +1463,7 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery return nil, err } - from, through, matchers, err := client.FromExemplarQueryRequest(req) + from, through, matchers, err := client.FromExemplarQueryRequest(req, i.newMatcherFunc) if err != nil { return nil, err } @@ -1538,7 +1553,7 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu return nil, cleanup, err } - labelName, startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelValuesRequest(req) + labelName, startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelValuesRequest(req, i.newMatcherFunc) if err != nil { return nil, cleanup, err } @@ -1628,7 +1643,7 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR return nil, cleanup, err } - startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelNamesRequest(req) + startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelNamesRequest(req, i.newMatcherFunc) if err != nil { return nil, cleanup, err } @@ -1727,7 +1742,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx
context.Context, req *clien } // Parse the request - _, _, limit, matchersSet, err := client.FromMetricsForLabelMatchersRequest(req) + _, _, limit, matchersSet, err := client.FromMetricsForLabelMatchersRequest(req, i.newMatcherFunc) if err != nil { return nil, cleanup, err } @@ -1946,7 +1961,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ return err } - from, through, matchers, err := client.FromQueryRequest(req) + from, through, matchers, err := client.FromQueryRequest(req, i.newMatcherFunc) if err != nil { return err } diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index efa739b426..4fab7d716e 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -43,6 +43,7 @@ func defaultIngesterTestConfig(t testing.TB) Config { cfg.LifecyclerConfig.FinalSleep = 0 cfg.ActiveSeriesMetricsEnabled = true cfg.LabelsStringInterningEnabled = true + cfg.MatchersCacheMaxItems = 1024 return cfg } diff --git a/pkg/querier/remote_read.go b/pkg/querier/remote_read.go index a7e86b96e5..cc30e43bde 100644 --- a/pkg/querier/remote_read.go +++ b/pkg/querier/remote_read.go @@ -5,6 +5,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/cortexproject/cortex/pkg/ingester/client" @@ -34,7 +35,7 @@ func RemoteReadHandler(q storage.Queryable, logger log.Logger) http.Handler { errors := make(chan error) for i, qr := range req.Queries { go func(i int, qr *client.QueryRequest) { - from, to, matchers, err := client.FromQueryRequest(qr) + from, to, matchers, err := client.FromQueryRequest(qr, labels.NewMatcher) if err != nil { errors <- err return diff --git a/pkg/util/matchers.go b/pkg/util/matchers.go index d0ee099b54..b09a68deaa 100644 --- a/pkg/util/matchers.go +++ b/pkg/util/matchers.go @@ -1,23 +1,50 @@ package util import ( + "strings" + + lru "github.com/hashicorp/golang-lru/v2" 
"github.com/prometheus/prometheus/model/labels" ) -// SplitFiltersAndMatchers splits empty matchers off, which are treated as filters, see #220 -func SplitFiltersAndMatchers(allMatchers []*labels.Matcher) (filters, matchers []*labels.Matcher) { - for _, matcher := range allMatchers { - // If a matcher matches "", we need to fetch possible chunks where - // there is no value and will therefore not be in our label index. - // e.g. {foo=""} and {foo!="bar"} both match "", so we need to return - // chunks which do not have a foo label set. When looking entries in - // the index, we should ignore this matcher to fetch all possible chunks - // and then filter on the matcher after the chunks have been fetched. - if matcher.Matches("") { - filters = append(filters, matcher) - } else { - matchers = append(matchers, matcher) +type MatcherCache struct { + lru *lru.Cache[string, *labels.Matcher] +} + +func NewMatcherCache(size int) (*MatcherCache, error) { + l, err := lru.New[string, *labels.Matcher](size) + return &MatcherCache{ + lru: l, + }, err +} + +func (c *MatcherCache) GetMatcher(t labels.MatchType, n, v string) (*labels.Matcher, error) { + switch t { + // only cache regex matchers + case labels.MatchRegexp, labels.MatchNotRegexp: + k := cacheKey(t, n, v) + if m, ok := c.lru.Get(k); ok { + return m, nil } + m, err := labels.NewMatcher(t, n, v) + if err == nil { + c.lru.Add(k, m) + } + return m, err + default: + return labels.NewMatcher(t, n, v) } - return +} + +func cacheKey(t labels.MatchType, n, v string) string { + const ( + typeLen = 2 + ) + + sb := strings.Builder{} + sb.Grow(typeLen + len(n) + len(v)) + sb.WriteString(n) + sb.WriteString(t.String()) + sb.WriteString(v) + return sb.String() }