Skip to content

Commit

Permalink
Trying to make the cache more reusable introducing interface
Browse files Browse the repository at this point in the history
Signed-off-by: Pedro Tanaka <[email protected]>
  • Loading branch information
pedro-stanaka committed Dec 9, 2024
1 parent 07a5c89 commit b2b65a2
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 32 deletions.
5 changes: 2 additions & 3 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ import (
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/wlog"
"google.golang.org/grpc"
"gopkg.in/yaml.v2"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"
"google.golang.org/grpc"
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
Expand Down
10 changes: 4 additions & 6 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@ import (
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/objstore"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/block/metadata"
Expand Down
12 changes: 6 additions & 6 deletions pkg/store/cache/matcher_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,26 @@ func TestMatchersCache(t *testing.T) {
cache, err := storecache.NewMatchersCache(storecache.WithSize(2))
testutil.Ok(t, err)

matcher := storepb.LabelMatcher{
matcher := &storepb.LabelMatcher{
Type: storepb.LabelMatcher_EQ,
Name: "key",
Value: "val",
}

matcher2 := storepb.LabelMatcher{
matcher2 := &storepb.LabelMatcher{
Type: storepb.LabelMatcher_RE,
Name: "key2",
Value: "val2|val3",
}

matcher3 := storepb.LabelMatcher{
matcher3 := &storepb.LabelMatcher{
Type: storepb.LabelMatcher_EQ,
Name: "key3",
Value: "val3",
}

var cacheHit bool
newItem := func(matcher storepb.LabelMatcher) (*labels.Matcher, error) {
newItem := func(matcher storepb.ConversionLabelMatcher) (*labels.Matcher, error) {
cacheHit = false
return storepb.MatcherToPromMatcher(matcher)
}
Expand Down Expand Up @@ -92,15 +92,15 @@ func BenchmarkMatchersCache(b *testing.B) {
b.Fatalf("failed to create cache: %v", err)
}

matchers := []storepb.LabelMatcher{
matchers := []*storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "key1", Value: "val1"},
{Type: storepb.LabelMatcher_EQ, Name: "key2", Value: "val2"},
{Type: storepb.LabelMatcher_EQ, Name: "key3", Value: "val3"},
{Type: storepb.LabelMatcher_EQ, Name: "key4", Value: "val4"},
{Type: storepb.LabelMatcher_RE, Name: "key5", Value: "^(val5|val6|val7|val8|val9).*$"},
}

newItem := func(matcher storepb.LabelMatcher) (*labels.Matcher, error) {
newItem := func(matcher storepb.ConversionLabelMatcher) (*labels.Matcher, error) {
return storepb.MatcherToPromMatcher(matcher)
}

Expand Down
20 changes: 10 additions & 10 deletions pkg/store/cache/matchers_cache.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package storecache

import (
"github.com/hashicorp/golang-lru/v2"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -12,12 +12,12 @@ import (

const DefaultCacheSize = 200

type NewItemFunc func(matcher storepb.LabelMatcher) (*labels.Matcher, error)
type NewItemFunc func(matcher storepb.ConversionLabelMatcher) (*labels.Matcher, error)

type MatchersCache interface {
// GetOrSet retrieves a matcher from cache or creates and stores it if not present.
// If the matcher is not in cache, it uses the provided newItem function to create it.
GetOrSet(key storepb.LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error)
GetOrSet(key storepb.ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error)
}

// Ensure implementations satisfy the interface.
Expand All @@ -35,14 +35,14 @@ func NewNoopMatcherCache() MatchersCache {
}

// GetOrSet implements MatchersCache by always creating a new matcher without caching.
func (n *NoopMatcherCache) GetOrSet(key storepb.LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
func (n *NoopMatcherCache) GetOrSet(key storepb.ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
return newItem(key)
}

// LruMatchersCache implements MatchersCache with an LRU cache and metrics.
type LruMatchersCache struct {
reg prometheus.Registerer
cache *lru.Cache[storepb.LabelMatcher, *labels.Matcher]
cache *lru.Cache[storepb.ConversionLabelMatcher, *labels.Matcher]
metrics *matcherCacheMetrics
size int
sf singleflight.Group
Expand Down Expand Up @@ -73,7 +73,7 @@ func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) {
}
cache.metrics = newMatcherCacheMetrics(cache.reg)

lruCache, err := lru.NewWithEvict[storepb.LabelMatcher, *labels.Matcher](cache.size, cache.onEvict)
lruCache, err := lru.NewWithEvict[storepb.ConversionLabelMatcher, *labels.Matcher](cache.size, cache.onEvict)
if err != nil {
return nil, err
}
Expand All @@ -82,7 +82,7 @@ func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) {
return cache, nil
}

func (c *LruMatchersCache) GetOrSet(key storepb.LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
func (c *LruMatchersCache) GetOrSet(key storepb.ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
c.metrics.requestsTotal.Inc()
if item, ok := c.cache.Get(key); ok {
c.metrics.hitsTotal.Inc()
Expand Down Expand Up @@ -110,7 +110,7 @@ func (c *LruMatchersCache) GetOrSet(key storepb.LabelMatcher, newItem NewItemFun
return v.(*labels.Matcher), nil
}

func (c *LruMatchersCache) onEvict(_ storepb.LabelMatcher, _ *labels.Matcher) {
func (c *LruMatchersCache) onEvict(_ storepb.ConversionLabelMatcher, _ *labels.Matcher) {
c.metrics.evicted.Inc()
c.metrics.numItems.Set(float64(c.cache.Len()))
}
Expand Down Expand Up @@ -153,8 +153,8 @@ func newMatcherCacheMetrics(reg prometheus.Registerer) *matcherCacheMetrics {
// NOTE: It (can) allocate memory.
func MatchersToPromMatchersCached(cache MatchersCache, ms ...storepb.LabelMatcher) ([]*labels.Matcher, error) {
res := make([]*labels.Matcher, 0, len(ms))
for _, m := range ms {
pm, err := cache.GetOrSet(m, storepb.MatcherToPromMatcher)
for i := range ms {
pm, err := cache.GetOrSet(&ms[i], storepb.MatcherToPromMatcher)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/cache"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
)
Expand Down
1 change: 0 additions & 1 deletion pkg/store/matcher_cache.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/cache"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store/cache"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/strutil"
Expand Down
42 changes: 39 additions & 3 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"google.golang.org/grpc/codes"

"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

var PartialResponseStrategyValues = func() []string {
Expand Down Expand Up @@ -385,8 +386,8 @@ func PromMatchersToMatchers(ms ...*labels.Matcher) ([]LabelMatcher, error) {
// NOTE: It allocates memory.
func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) {
res := make([]*labels.Matcher, 0, len(ms))
for _, m := range ms {
pm, err := MatcherToPromMatcher(m)
for i := range ms {
pm, err := MatcherToPromMatcher(&ms[i])
if err != nil {
return nil, err
}
Expand All @@ -396,7 +397,11 @@ func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) {
}

// MatcherToPromMatcher converts a Thanos label matcher to Prometheus label matcher.
func MatcherToPromMatcher(m LabelMatcher) (*labels.Matcher, error) {
func MatcherToPromMatcher(mi ConversionLabelMatcher) (*labels.Matcher, error) {
var m, ok = mi.(*LabelMatcher)
if !ok {
return nil, errors.Errorf("unexpected matcher type %T", mi)
}
var t labels.MatchType

switch m.Type {
Expand Down Expand Up @@ -444,6 +449,37 @@ func (m *LabelMatcher) PromString() string {
return fmt.Sprintf("%s%s%q", m.Name, m.Type.PromString(), m.Value)
}

func (m *LabelMatcher) GetName() string {
return m.Name
}

func (m *LabelMatcher) GetValue() string {
return m.Value
}

func (m *LabelMatcher) GetType() prompb.LabelMatcher_Type {
switch m.Type {
case LabelMatcher_EQ:
return prompb.LabelMatcher_EQ
case LabelMatcher_NEQ:
return prompb.LabelMatcher_NEQ
case LabelMatcher_RE:
return prompb.LabelMatcher_RE
case LabelMatcher_NRE:
return prompb.LabelMatcher_NRE
default:
return prompb.LabelMatcher_EQ
}
}

// ConversionLabelMatcher is a common interface for the Prometheus and Thanos label matchers.
type ConversionLabelMatcher interface {
String() string
GetName() string
GetType() prompb.LabelMatcher_Type
GetValue() string
}

func (x LabelMatcher_Type) PromString() string {
typeToStr := map[LabelMatcher_Type]string{
LabelMatcher_EQ: "=",
Expand Down

0 comments on commit b2b65a2

Please sign in to comment.