From cb16559291fd0040224ff09d7f9b0ea81ef72f2d Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Thu, 5 Dec 2024 12:03:40 +0100 Subject: [PATCH] Introduce interface for matchers cache Signed-off-by: Pedro Tanaka --- cmd/thanos/receive.go | 2 +- pkg/receive/multitsdb.go | 4 +-- pkg/store/local.go | 2 +- pkg/store/prometheus.go | 15 +++++------ pkg/store/proxy.go | 5 ++-- pkg/store/proxy_test.go | 3 +++ pkg/store/storepb/custom.go | 4 +-- pkg/store/storepb/matcher_cache.go | 42 ++++++++++++++++++++++++------ pkg/store/tsdb.go | 5 ++-- 9 files changed, 55 insertions(+), 27 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index e0105bfc73..57c62b19df 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -226,7 +226,7 @@ func runReceive( return errors.Wrap(err, "parse relabel configuration") } - var cache *storepb.MatchersCache + var cache = storepb.NewNoopMatcherCache() if conf.matcherCacheSize > 0 { cache, err = storepb.NewMatchersCache(storepb.WithSize(conf.matcherCacheSize), storepb.WithPromRegistry(reg)) if err != nil { diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 706592adae..91f226ccde 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -64,7 +64,7 @@ type MultiTSDB struct { hashFunc metadata.HashFunc hashringConfigs []HashringConfig - cache *storepb.MatchersCache + cache storepb.MatchersCache tsdbClients []store.Client exemplarClients map[string]*exemplars.TSDB @@ -97,7 +97,7 @@ func WithBlockExpandedPostingsCacheSize(size uint64) MultiTSDBOption { } } -func WithMatchersCache(cache *storepb.MatchersCache) MultiTSDBOption { +func WithMatchersCache(cache storepb.MatchersCache) MultiTSDBOption { return func(s *MultiTSDB) { s.cache = cache } diff --git a/pkg/store/local.go b/pkg/store/local.go index c9f05609b4..4d63d2d64a 100644 --- a/pkg/store/local.go +++ b/pkg/store/local.go @@ -130,7 +130,7 @@ func ScanGRPCCurlProtoStreamMessages(data []byte, atEOF bool) (advance int, toke // Series 
returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *LocalStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { - match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, nil) + match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, storepb.NewNoopMatcherCache()) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 7cf7e2941e..ba326b4e98 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -125,7 +125,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storepb.NewNoopMatcherCache()) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -488,16 +488,13 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que // matchesExternalLabels returns false if given matchers are not matching external labels. // If true, matchesExternalLabels also returns Prometheus matchers without those matching external labels. -func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels, cache *storepb.MatchersCache) (bool, []*labels.Matcher, error) { +func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels, cache storepb.MatchersCache) (bool, []*labels.Matcher, error) { var ( tms []*labels.Matcher err error ) - if cache != nil { - tms, err = storepb.MatchersToPromMatchersCached(cache, ms...) - } else { - tms, err = storepb.MatchersToPromMatchers(ms...) - } + + tms, err = storepb.MatchersToPromMatchersCached(cache, ms...) 
if err != nil { return false, nil, err } @@ -545,7 +542,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storepb.NewNoopMatcherCache()) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -608,7 +605,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storepb.NewNoopMatcherCache()) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 76b4488fc8..6d414f6922 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -89,7 +89,7 @@ type ProxyStore struct { retrievalStrategy RetrievalStrategy debugLogging bool tsdbSelector *TSDBSelector - matcherCache *storepb.MatchersCache + matcherCache storepb.MatchersCache enableDedup bool } @@ -139,7 +139,7 @@ func WithoutDedup() ProxyStoreOption { } // WithMatcherCache sets the matcher cache instance for the proxy. 
-func WithMatcherCache(cache *storepb.MatchersCache) ProxyStoreOption { +func WithMatcherCache(cache storepb.MatchersCache) ProxyStoreOption { return func(s *ProxyStore) { s.matcherCache = cache } @@ -176,6 +176,7 @@ func NewProxyStore( retrievalStrategy: retrievalStrategy, tsdbSelector: DefaultSelector, enableDedup: true, + matcherCache: storepb.NewNoopMatcherCache(), } for _, option := range options { diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 937926aaff..7d51972977 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -2174,6 +2174,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { responseTimeout: 5 * time.Second, retrievalStrategy: EagerRetrieval, tsdbSelector: DefaultSelector, + matcherCache: storepb.NewNoopMatcherCache(), } var allResps []*storepb.SeriesResponse @@ -2310,6 +2311,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { responseTimeout: 50 * time.Millisecond, retrievalStrategy: respStrategy, tsdbSelector: DefaultSelector, + matcherCache: storepb.NewNoopMatcherCache(), } ctx, cancel := context.WithCancel(context.Background()) @@ -2347,6 +2349,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { responseTimeout: 50 * time.Millisecond, retrievalStrategy: respStrategy, tsdbSelector: DefaultSelector, + matcherCache: storepb.NewNoopMatcherCache(), } ctx := context.Background() diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index 2b6c872acd..f0d7b9dd60 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -397,8 +397,8 @@ func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) { // MatchersToPromMatchersCached returns Prometheus matchers from proto matchers. // Works analogously to MatchersToPromMatchers but uses cache to avoid unnecessary allocations and conversions. -// NOTE: It allocates memory. 
-func MatchersToPromMatchersCached(cache *MatchersCache, ms ...LabelMatcher) ([]*labels.Matcher, error) { +// NOTE: It can allocate memory. +func MatchersToPromMatchersCached(cache MatchersCache, ms ...LabelMatcher) ([]*labels.Matcher, error) { res := make([]*labels.Matcher, 0, len(ms)) for _, m := range ms { pm, err := cache.GetOrSet(m, MatcherToPromMatcher) diff --git a/pkg/store/storepb/matcher_cache.go b/pkg/store/storepb/matcher_cache.go index 5d77d73711..cc77cdca1d 100644 --- a/pkg/store/storepb/matcher_cache.go +++ b/pkg/store/storepb/matcher_cache.go @@ -15,7 +15,33 @@ const DefaultCacheSize = 200 type NewItemFunc func(matcher LabelMatcher) (*labels.Matcher, error) -type MatchersCache struct { +type MatchersCache interface { + // GetOrSet retrieves a matcher from cache or creates and stores it if not present. + // If the matcher is not in cache, it uses the provided newItem function to create it. + GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) +} + +// Ensure implementations satisfy the interface. +var ( + _ MatchersCache = (*LruMatchersCache)(nil) + _ MatchersCache = (*NoopMatcherCache)(nil) +) + +// NoopMatcherCache is a no-op implementation of MatchersCache that doesn't cache anything. +type NoopMatcherCache struct{} + +// NewNoopMatcherCache creates a new no-op matcher cache. +func NewNoopMatcherCache() MatchersCache { + return &NoopMatcherCache{} +} + +// GetOrSet implements MatchersCache by always creating a new matcher without caching. +func (n *NoopMatcherCache) GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) { + return newItem(key) +} + +// LruMatchersCache implements MatchersCache with an LRU cache and metrics. 
+type LruMatchersCache struct { reg prometheus.Registerer cache *lru.Cache[LabelMatcher, *labels.Matcher] metrics *matcherCacheMetrics @@ -23,22 +49,22 @@ type MatchersCache struct { sf singleflight.Group } -type MatcherCacheOption func(*MatchersCache) +type MatcherCacheOption func(*LruMatchersCache) func WithPromRegistry(reg prometheus.Registerer) MatcherCacheOption { - return func(c *MatchersCache) { + return func(c *LruMatchersCache) { c.reg = reg } } func WithSize(size int) MatcherCacheOption { - return func(c *MatchersCache) { + return func(c *LruMatchersCache) { c.size = size } } -func NewMatchersCache(opts ...MatcherCacheOption) (*MatchersCache, error) { - cache := &MatchersCache{ +func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) { + cache := &LruMatchersCache{ reg: prometheus.NewRegistry(), size: DefaultCacheSize, } @@ -57,7 +83,7 @@ func NewMatchersCache(opts ...MatcherCacheOption) (*MatchersCache, error) { return cache, nil } -func (c *MatchersCache) GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) { +func (c *LruMatchersCache) GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) { c.metrics.requestsTotal.Inc() if item, ok := c.cache.Get(key); ok { c.metrics.hitsTotal.Inc() @@ -85,7 +111,7 @@ func (c *MatchersCache) GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels return v.(*labels.Matcher), nil } -func (c *MatchersCache) onEvict(_ LabelMatcher, _ *labels.Matcher) { +func (c *LruMatchersCache) onEvict(_ LabelMatcher, _ *labels.Matcher) { c.metrics.evicted.Inc() c.metrics.numItems.Set(float64(c.cache.Len())) } diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 790413b69a..f0b13615bb 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -53,7 +53,7 @@ func WithCuckooMetricNameStoreFilter() TSDBStoreOption { } } -func WithMatcherCacheInstance(cache *storepb.MatchersCache) TSDBStoreOption { +func WithMatcherCacheInstance(cache storepb.MatchersCache) TSDBStoreOption { 
return func(s *TSDBStore) { s.matcherCache = cache } @@ -68,7 +68,7 @@ type TSDBStore struct { component component.StoreAPI buffers sync.Pool maxBytesPerFrame int - matcherCache *storepb.MatchersCache + matcherCache storepb.MatchersCache extLset labels.Labels startStoreFilterUpdate bool @@ -119,6 +119,7 @@ func NewTSDBStore( b := make([]byte, 0, initialBufSize) return &b }}, + matcherCache: storepb.NewNoopMatcherCache(), } for _, option := range options {