Skip to content

Commit

Permalink
Introduce interface for matchers cache
Browse files Browse the repository at this point in the history
Signed-off-by: Pedro Tanaka <[email protected]>
  • Loading branch information
pedro-stanaka committed Dec 6, 2024
1 parent 67a2e88 commit cb16559
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 27 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func runReceive(
return errors.Wrap(err, "parse relabel configuration")
}

var cache *storepb.MatchersCache
var cache = storepb.NewNoopMatcherCache()
if conf.matcherCacheSize > 0 {
cache, err = storepb.NewMatchersCache(storepb.WithSize(conf.matcherCacheSize), storepb.WithPromRegistry(reg))
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type MultiTSDB struct {
hashFunc metadata.HashFunc
hashringConfigs []HashringConfig

cache *storepb.MatchersCache
cache storepb.MatchersCache

tsdbClients []store.Client
exemplarClients map[string]*exemplars.TSDB
Expand Down Expand Up @@ -97,7 +97,7 @@ func WithBlockExpandedPostingsCacheSize(size uint64) MultiTSDBOption {
}
}

func WithMatchersCache(cache *storepb.MatchersCache) MultiTSDBOption {
func WithMatchersCache(cache storepb.MatchersCache) MultiTSDBOption {
return func(s *MultiTSDB) {
s.cache = cache
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func ScanGRPCCurlProtoStreamMessages(data []byte, atEOF bool) (advance int, toke
// Series returns all series for a requested time range and label matcher. The returned data may
// exceed the requested time bounds.
func (s *LocalStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, nil)
match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, storepb.NewNoopMatcherCache())
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
15 changes: 6 additions & 9 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto

extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil)
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storepb.NewNoopMatcherCache())
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -488,16 +488,13 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que

// matchesExternalLabels returns false if given matchers are not matching external labels.
// If true, matchesExternalLabels also returns Prometheus matchers without those matching external labels.
func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels, cache *storepb.MatchersCache) (bool, []*labels.Matcher, error) {
func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels, cache storepb.MatchersCache) (bool, []*labels.Matcher, error) {
var (
tms []*labels.Matcher
err error
)
if cache != nil {
tms, err = storepb.MatchersToPromMatchersCached(cache, ms...)
} else {
tms, err = storepb.MatchersToPromMatchers(ms...)
}

tms, err = storepb.MatchersToPromMatchersCached(cache, ms...)
if err != nil {
return false, nil, err
}
Expand Down Expand Up @@ -545,7 +542,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin
func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil)
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storepb.NewNoopMatcherCache())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -608,7 +605,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue

extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil)
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storepb.NewNoopMatcherCache())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type ProxyStore struct {
retrievalStrategy RetrievalStrategy
debugLogging bool
tsdbSelector *TSDBSelector
matcherCache *storepb.MatchersCache
matcherCache storepb.MatchersCache
enableDedup bool
}

Expand Down Expand Up @@ -139,7 +139,7 @@ func WithoutDedup() ProxyStoreOption {
}

// WithMatcherCache sets the matcher cache instance for the proxy.
func WithMatcherCache(cache *storepb.MatchersCache) ProxyStoreOption {
func WithMatcherCache(cache storepb.MatchersCache) ProxyStoreOption {
return func(s *ProxyStore) {
s.matcherCache = cache
}
Expand Down Expand Up @@ -176,6 +176,7 @@ func NewProxyStore(
retrievalStrategy: retrievalStrategy,
tsdbSelector: DefaultSelector,
enableDedup: true,
matcherCache: storepb.NewNoopMatcherCache(),
}

for _, option := range options {
Expand Down
3 changes: 3 additions & 0 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2174,6 +2174,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
responseTimeout: 5 * time.Second,
retrievalStrategy: EagerRetrieval,
tsdbSelector: DefaultSelector,
matcherCache: storepb.NewNoopMatcherCache(),
}

var allResps []*storepb.SeriesResponse
Expand Down Expand Up @@ -2310,6 +2311,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
responseTimeout: 50 * time.Millisecond,
retrievalStrategy: respStrategy,
tsdbSelector: DefaultSelector,
matcherCache: storepb.NewNoopMatcherCache(),
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -2347,6 +2349,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
responseTimeout: 50 * time.Millisecond,
retrievalStrategy: respStrategy,
tsdbSelector: DefaultSelector,
matcherCache: storepb.NewNoopMatcherCache(),
}

ctx := context.Background()
Expand Down
4 changes: 2 additions & 2 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) {

// MatchersToPromMatchersCached returns Prometheus matchers from proto matchers.
// Works analogously to MatchersToPromMatchers but uses cache to avoid unnecessary allocations and conversions.
// NOTE: It allocates memory.
func MatchersToPromMatchersCached(cache *MatchersCache, ms ...LabelMatcher) ([]*labels.Matcher, error) {
// NOTE: It (can) allocate memory.
func MatchersToPromMatchersCached(cache MatchersCache, ms ...LabelMatcher) ([]*labels.Matcher, error) {
res := make([]*labels.Matcher, 0, len(ms))
for _, m := range ms {
pm, err := cache.GetOrSet(m, MatcherToPromMatcher)
Expand Down
42 changes: 34 additions & 8 deletions pkg/store/storepb/matcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,56 @@ const DefaultCacheSize = 200

type NewItemFunc func(matcher LabelMatcher) (*labels.Matcher, error)

type MatchersCache struct {
type MatchersCache interface {
// GetOrSet retrieves a matcher from cache or creates and stores it if not present.
// If the matcher is not in cache, it uses the provided newItem function to create it.
GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error)
}

// Ensure implementations satisfy the interface
var (
_ MatchersCache = (*LruMatchersCache)(nil)
_ MatchersCache = (*NoopMatcherCache)(nil)
)

// NoopMatcherCache is a no-op implementation of MatchersCache that doesn't cache anything.
type NoopMatcherCache struct{}

// NewNoopMatcherCache creates a new no-op matcher cache.
func NewNoopMatcherCache() MatchersCache {
return &NoopMatcherCache{}
}

// GetOrSet implements MatchersCache by always creating a new matcher without caching.
func (n *NoopMatcherCache) GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
return newItem(key)
}

// LruMatchersCache implements MatchersCache with an LRU cache and metrics.
type LruMatchersCache struct {
reg prometheus.Registerer
cache *lru.Cache[LabelMatcher, *labels.Matcher]
metrics *matcherCacheMetrics
size int
sf singleflight.Group
}

type MatcherCacheOption func(*MatchersCache)
type MatcherCacheOption func(*LruMatchersCache)

func WithPromRegistry(reg prometheus.Registerer) MatcherCacheOption {
return func(c *MatchersCache) {
return func(c *LruMatchersCache) {
c.reg = reg
}
}

func WithSize(size int) MatcherCacheOption {
return func(c *MatchersCache) {
return func(c *LruMatchersCache) {
c.size = size
}
}

func NewMatchersCache(opts ...MatcherCacheOption) (*MatchersCache, error) {
cache := &MatchersCache{
func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) {
cache := &LruMatchersCache{
reg: prometheus.NewRegistry(),
size: DefaultCacheSize,
}
Expand All @@ -57,7 +83,7 @@ func NewMatchersCache(opts ...MatcherCacheOption) (*MatchersCache, error) {
return cache, nil
}

func (c *MatchersCache) GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
func (c *LruMatchersCache) GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
c.metrics.requestsTotal.Inc()
if item, ok := c.cache.Get(key); ok {
c.metrics.hitsTotal.Inc()
Expand Down Expand Up @@ -85,7 +111,7 @@ func (c *MatchersCache) GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels
return v.(*labels.Matcher), nil
}

func (c *MatchersCache) onEvict(_ LabelMatcher, _ *labels.Matcher) {
func (c *LruMatchersCache) onEvict(_ LabelMatcher, _ *labels.Matcher) {
c.metrics.evicted.Inc()
c.metrics.numItems.Set(float64(c.cache.Len()))
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func WithCuckooMetricNameStoreFilter() TSDBStoreOption {
}
}

func WithMatcherCacheInstance(cache *storepb.MatchersCache) TSDBStoreOption {
func WithMatcherCacheInstance(cache storepb.MatchersCache) TSDBStoreOption {
return func(s *TSDBStore) {
s.matcherCache = cache
}
Expand All @@ -68,7 +68,7 @@ type TSDBStore struct {
component component.StoreAPI
buffers sync.Pool
maxBytesPerFrame int
matcherCache *storepb.MatchersCache
matcherCache storepb.MatchersCache

extLset labels.Labels
startStoreFilterUpdate bool
Expand Down Expand Up @@ -119,6 +119,7 @@ func NewTSDBStore(
b := make([]byte, 0, initialBufSize)
return &b
}},
matcherCache: storepb.NewNoopMatcherCache(),
}

for _, option := range options {
Expand Down

0 comments on commit cb16559

Please sign in to comment.