diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 969e2baa1d9..3b991d757a3 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -493,9 +493,12 @@ func runQuery( dns.ResolverType(dnsSDResolver), ) - options := []store.ProxyStoreOption{} + options := []store.StoreOption[store.ProxyStore]{ + store.WithLogger[store.ProxyStore](logger), + store.WithRegistry[store.ProxyStore](reg), + } if debugLogging { - options = append(options, store.WithProxyStoreDebugLogging()) + options = append(options, store.WithDebugLogging[store.ProxyStore]()) } var ( @@ -549,7 +552,7 @@ func runQuery( endpointInfoTimeout, queryConnMetricLabels..., ) - proxy = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy), options...) + proxy = store.NewProxyStore(endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy), options...) rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients) targetsProxy = targets.NewProxy(logger, endpoints.GetTargetsClients) metadataProxy = metadata.NewProxy(logger, endpoints.GetMetricMetadataClients) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 5b699d264dc..14427f0726c 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -315,14 +315,15 @@ func runReceive( return errors.Wrap(err, "setup gRPC server") } - options := []store.ProxyStoreOption{} + options := []store.StoreOption[store.ProxyStore]{ + store.WithLogger[store.ProxyStore](logger), + store.WithRegistry[store.ProxyStore](reg), + } if debugLogging { - options = append(options, store.WithProxyStoreDebugLogging()) + options = append(options, store.WithDebugLogging[store.ProxyStore]()) } proxy := store.NewProxyStore( - logger, - reg, dbs.TSDBLocalClients, comp, labels.Labels{}, diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index ceadf1159cd..0654cf61417 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -634,7 +634,7 @@ func runRule( } infoOptions := []info.ServerOptionFunc{info.WithRulesInfoFunc()} if tsdbDB != nil { - tsdbStore := store.NewTSDBStore(logger, tsdbDB, component.Rule, conf.lset) + tsdbStore := store.NewTSDBStore(tsdbDB, component.Rule, conf.lset, store.WithLogger[store.TSDBStore](logger)) infoOptions = append( infoOptions, info.WithLabelSetFunc(func() []labelpb.ZLabelSet { @@ -654,6 +654,7 @@ func runRule( return nil }), ) + storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore), reg, conf.storeRateLimits) options = append(options, grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger))) } diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 27cf759b2ab..7e26c38fe77 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -249,7 +249,7 @@ func runSidecar( { c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent) - promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version) + promStore, err := store.NewPrometheusStore(c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version, store.WithLogger[store.PrometheusStore](logger), store.WithRegistry[store.PrometheusStore](reg)) if err != nil { return errors.Wrap(err, "create Prometheus store") } diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 73e4b838ddc..0660b717adc 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -363,9 +363,9 @@ func runStore( return errors.Wrap(err, "create chunk pool") } - options := []store.BucketStoreOption{ - store.WithLogger(logger), - store.WithRegistry(reg), + options := []store.StoreOption[store.BucketStore]{ + store.WithLogger[store.BucketStore](logger), + store.WithRegistry[store.BucketStore](reg), store.WithIndexCache(indexCache), store.WithQueryGate(queriesGate), store.WithChunkPool(chunkPool), @@ -390,7 +390,7 @@ func runStore( } if conf.debugLogging { - options = append(options, store.WithDebugLogging()) + options = append(options, store.WithDebugLogging[store.BucketStore]()) } bs, err := store.NewBucketStore( diff --git a/pkg/api/query/grpc_test.go b/pkg/api/query/grpc_test.go index 208efb2e6c2..20206974a79 100644 --- a/pkg/api/query/grpc_test.go +++ b/pkg/api/query/grpc_test.go @@ -25,7 +25,7 @@ import ( func TestGRPCQueryAPIErrorHandling(t *testing.T) { logger := log.NewNopLogger() reg := prometheus.NewRegistry() - proxy := store.NewProxyStore(logger, reg, func() []store.Client { return nil }, component.Store, nil, 1*time.Minute, store.LazyRetrieval) + proxy := store.NewProxyStore(func() []store.Client { return nil }, component.Store, nil, 1*time.Minute, store.LazyRetrieval, store.WithLogger[store.ProxyStore](logger), store.WithRegistry[store.ProxyStore](reg)) queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute) lookbackDeltaFunc := func(i int64) time.Duration { return 5 * time.Minute } tests := []struct { diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index d4191f2e8c6..e3f0fa7047a 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -631,13 +631,11 @@ func TestQueryEndpoints(t *testing.T) { func newProxyStoreWithTSDBStore(db store.TSDBReader) *store.ProxyStore { c := &storetestutil.TestClient{ Name: "1", - StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, nil), 0), + StoreClient: storepb.ServerAsClient(store.NewTSDBStore(db, component.Query, nil), 0), MinTime: math.MinInt64, MaxTime: math.MaxInt64, } return store.NewProxyStore( - nil, - nil, func() []store.Client { return []store.Client{c} }, component.Query, nil, diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index ea005e8eda1..3c1c5fe56c4 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -848,8 +848,6 @@ func newProxyStore(storeAPIs ...storepb.StoreServer) *store.ProxyStore { } return store.NewProxyStore( - nil, - nil, func() []store.Client { return cls }, component.Query, nil, diff --git a/pkg/query/query_test.go b/pkg/query/query_test.go index 35949377e81..7cfa1a02f57 100644 --- a/pkg/query/query_test.go +++ b/pkg/query/query_test.go @@ -36,7 +36,7 @@ func TestQuerier_Proxy(t *testing.T) { q := NewQueryableCreator( logger, nil, - store.NewProxyStore(logger, nil, func() []store.Client { return clients }, + store.NewProxyStore(func() []store.Client { return clients }, component.Debug, nil, 5*time.Minute, store.EagerRetrieval), 1000000, 5*time.Minute, @@ -51,7 +51,7 @@ func TestQuerier_Proxy(t *testing.T) { // TODO(bwplotka): Parse external labels. clients = append(clients, &storetestutil.TestClient{ Name: fmt.Sprintf("store number %v", i), - StoreClient: storepb.ServerAsClient(selectedStore(store.NewTSDBStore(logger, st.storage.DB, component.Debug, nil), m, st.mint, st.maxt), 0), + StoreClient: storepb.ServerAsClient(selectedStore(store.NewTSDBStore(st.storage.DB, component.Debug, nil), m, st.mint, st.maxt), 0), MinTime: st.mint, MaxTime: st.maxt, }) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 509e9f35351..eecbbb44b9a 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -596,7 +596,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant t.hashFunc, ) } - tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset), s, ship, exemplars.NewTSDB(s, lset)) + tenant.set(store.NewTSDBStore(s, component.Receive, lset, store.WithLogger[store.TSDBStore](logger)), s, ship, exemplars.NewTSDB(s, lset)) level.Info(logger).Log("msg", "TSDB is now ready") return nil } diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index 58241cd5a05..bb597c13321 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -687,7 +687,7 @@ func appendSampleWithLabels(m *MultiTSDB, tenant string, lbls labels.Labels, tim } func queryLabelValues(ctx context.Context, m *MultiTSDB) error { - proxy := store.NewProxyStore(nil, nil, func() []store.Client { + proxy := store.NewProxyStore(func() []store.Client { clients := m.TSDBLocalClients() if len(clients) > 0 { clients[0] = &slowClient{clients[0]} diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index c22c27bf3c1..32bb9937411 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -823,7 +823,7 @@ func TestPrometheusStore_Acceptance(t *testing.T) { version, err := promclient.NewDefaultClient().BuildVersion(context.Background(), u) testutil.Ok(tt, err) - promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, + promStore, err := NewPrometheusStore(promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return extLset }, func() (int64, int64) { return timestamp.FromTime(minTime), timestamp.FromTime(maxTime) }, func() string { return version }) @@ -841,7 +841,7 @@ func TestTSDBStore_Acceptance(t *testing.T) { testutil.Ok(tt, err) tt.Cleanup(func() { testutil.Ok(tt, db.Close()) }) - tsdbStore := NewTSDBStore(nil, db, component.Rule, extLset) + tsdbStore := NewTSDBStore(db, component.Rule, extLset) appendFn(db.Appender(context.Background())) return tsdbStore diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 54150d178a0..d65c4f467b3 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -338,7 +338,7 @@ type BlockEstimator func(meta metadata.Meta) uint64 // When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller. type BucketStore struct { logger log.Logger - reg prometheus.Registerer // TODO(metalmatze) remove and add via BucketStoreOption + reg prometheus.Registerer // TODO(metalmatze) remove and add via Option metrics *bucketStoreMetrics bkt objstore.InstrumentedBucketReader fetcher block.MetadataFetcher @@ -414,84 +414,60 @@ func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []storage. return map[storage.SeriesRef][]byte{}, ids } -// BucketStoreOption are functions that configure BucketStore. -type BucketStoreOption func(s *BucketStore) - -// WithLogger sets the BucketStore logger to the one you pass. -func WithLogger(logger log.Logger) BucketStoreOption { - return func(s *BucketStore) { - s.logger = logger - } -} - -// WithRegistry sets a registry that BucketStore uses to register metrics with. -func WithRegistry(reg prometheus.Registerer) BucketStoreOption { - return func(s *BucketStore) { - s.reg = reg - } -} - // WithIndexCache sets a indexCache to use instead of a noopCache. -func WithIndexCache(cache storecache.IndexCache) BucketStoreOption { +func WithIndexCache(cache storecache.IndexCache) StoreOption[BucketStore] { return func(s *BucketStore) { s.indexCache = cache } } // WithQueryGate sets a queryGate to use instead of a noopGate. -func WithQueryGate(queryGate gate.Gate) BucketStoreOption { +func WithQueryGate(queryGate gate.Gate) StoreOption[BucketStore] { return func(s *BucketStore) { s.queryGate = queryGate } } // WithChunkPool sets a pool.Bytes to use for chunks. -func WithChunkPool(chunkPool pool.Bytes) BucketStoreOption { +func WithChunkPool(chunkPool pool.Bytes) StoreOption[BucketStore] { return func(s *BucketStore) { s.chunkPool = chunkPool } } // WithFilterConfig sets a filter which Store uses for filtering metrics based on time. -func WithFilterConfig(filter *FilterConfig) BucketStoreOption { +func WithFilterConfig(filter *FilterConfig) StoreOption[BucketStore] { return func(s *BucketStore) { s.filterConfig = filter } } -// WithDebugLogging enables debug logging. -func WithDebugLogging() BucketStoreOption { - return func(s *BucketStore) { - s.debugLogging = true - } -} - -func WithChunkHashCalculation(enableChunkHashCalculation bool) BucketStoreOption { +func WithChunkHashCalculation(enableChunkHashCalculation bool) StoreOption[BucketStore] { return func(s *BucketStore) { s.enableChunkHashCalculation = enableChunkHashCalculation } } -func WithSeriesBatchSize(seriesBatchSize int) BucketStoreOption { +func WithSeriesBatchSize(seriesBatchSize int) StoreOption[BucketStore] { return func(s *BucketStore) { s.seriesBatchSize = seriesBatchSize } } -func WithBlockEstimatedMaxSeriesFunc(f BlockEstimator) BucketStoreOption { +func WithBlockEstimatedMaxSeriesFunc(f BlockEstimator) StoreOption[BucketStore] { return func(s *BucketStore) { s.blockEstimatedMaxSeriesFunc = f } } -func WithBlockEstimatedMaxChunkFunc(f BlockEstimator) BucketStoreOption { +func WithBlockEstimatedMaxChunkFunc(f BlockEstimator) StoreOption[BucketStore] { return func(s *BucketStore) { s.blockEstimatedMaxChunkFunc = f } } // WithLazyExpandedPostings enables lazy expanded postings. -func WithLazyExpandedPostings(enabled bool) BucketStoreOption { +func WithLazyExpandedPostings(enabled bool) StoreOption[BucketStore] { return func(s *BucketStore) { s.enabledLazyExpandedPostings = enabled } @@ -513,7 +489,7 @@ func NewBucketStore( enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility. lazyIndexReaderEnabled bool, lazyIndexReaderIdleTimeout time.Duration, - options ...BucketStoreOption, + options ...StoreOption[BucketStore], ) (*BucketStore, error) { s := &BucketStore{ logger: log.NewNopLogger(), diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index ebd1ffa7095..ec46d9714e1 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -174,10 +174,10 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m true, true, time.Minute, - WithLogger(s.logger), + WithLogger[BucketStore](s.logger), WithIndexCache(s.cache), WithFilterConfig(filterConf), - WithRegistry(reg), + WithRegistry[BucketStore](reg), ) testutil.Ok(t, err) defer func() { testutil.Ok(t, store.Close()) }() diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 846fea2e981..4920d352b47 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -902,7 +902,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul false, false, 0, - WithLogger(logger), + WithLogger[BucketStore](logger), WithFilterConfig(allowAllFilterConf), ) testutil.Ok(t, err) @@ -1416,7 +1416,7 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk, false, false, 0, - WithLogger(logger), + WithLogger[BucketStore](logger), WithChunkPool(chunkPool), WithLazyExpandedPostings(lazyExpandedPostings), ) @@ -1864,7 +1864,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { true, false, 0, - WithLogger(logger), + WithLogger[BucketStore](logger), WithIndexCache(indexCache), ) testutil.Ok(tb, err) @@ -1955,7 +1955,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { true, false, 0, - WithLogger(logger), + WithLogger[BucketStore](logger), WithIndexCache(indexCache), ) testutil.Ok(tb, err) @@ -2113,7 +2113,7 @@ func TestSeries_SeriesSortedWithoutReplicaLabels(t *testing.T) { true, false, 0, - WithLogger(logger), + WithLogger[BucketStore](logger), WithIndexCache(indexCache), ) testutil.Ok(tb, err) @@ -2299,7 +2299,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb true, false, 0, - WithLogger(logger), + WithLogger[BucketStore](logger), WithIndexCache(indexCache), ) testutil.Ok(tb, err) @@ -2515,7 +2515,7 @@ func TestSeries_ChunksHaveHashRepresentation(t *testing.T) { true, false, 0, - WithLogger(logger), + WithLogger[BucketStore](logger), WithIndexCache(indexCache), ) testutil.Ok(tb, err) diff --git a/pkg/store/options.go b/pkg/store/options.go new file mode 100644 index 00000000000..0215eaf6efe --- /dev/null +++ b/pkg/store/options.go @@ -0,0 +1,53 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package store + +import ( + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +type StoreOption[T BucketStore | PrometheusStore | TSDBStore | ProxyStore] func(*T) + +// WithLogger sets the logger to the one you pass. +func WithLogger[T BucketStore | PrometheusStore | TSDBStore | ProxyStore](logger log.Logger) StoreOption[T] { + return func(s *T) { + switch s := any(s).(type) { + case *BucketStore: + s.logger = logger + case *PrometheusStore: + s.logger = logger + case *TSDBStore: + s.logger = logger + case *ProxyStore: + s.logger = logger + } + } +} + +// WithDebugLogging enables debug logging. +func WithDebugLogging[T BucketStore | ProxyStore]() StoreOption[T] { + return func(s *T) { + switch s := any(s).(type) { + case *BucketStore: + s.debugLogging = true + case *ProxyStore: + s.debugLogging = true + } + } +} + +// WithRegistry sets a registry that the store uses to register metrics with. +func WithRegistry[T BucketStore | PrometheusStore | ProxyStore](reg prometheus.Registerer) StoreOption[T] { + return func(s *T) { + switch s := any(s).(type) { + case *BucketStore: + s.reg = reg + case *PrometheusStore: + s.reg = reg + case *ProxyStore: + s.reg = reg + } + } +} diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index fd6d4c01952..576892fd2c6 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -48,6 +48,7 @@ import ( // PrometheusStore implements the store node API on top of the Prometheus remote read API. type PrometheusStore struct { logger log.Logger + reg prometheus.Registerer base *url.URL client *promclient.Client buffers sync.Pool @@ -72,20 +73,16 @@ const initialBufSize = 32 * 1024 // 32KB seems like a good minimum starting size // to talk to Prometheus. // It attaches the provided external labels to all results. Provided external labels has to be sorted. func NewPrometheusStore( - logger log.Logger, - reg prometheus.Registerer, client *promclient.Client, baseURL *url.URL, component component.StoreAPI, externalLabelsFn func() labels.Labels, timestamps func() (mint int64, maxt int64), promVersion func() string, + options ...StoreOption[PrometheusStore], ) (*PrometheusStore, error) { - if logger == nil { - logger = log.NewNopLogger() - } p := &PrometheusStore{ - logger: logger, + logger: log.NewNopLogger(), base: baseURL, client: client, component: component, @@ -97,14 +94,19 @@ func NewPrometheusStore( b := make([]byte, 0, initialBufSize) return &b }}, - framesRead: promauto.With(reg).NewHistogram( - prometheus.HistogramOpts{ - Name: "prometheus_store_received_frames", - Help: "Number of frames received per streamed response.", - Buckets: prometheus.ExponentialBuckets(10, 10, 5), - }, - ), } + + for _, option := range options { + option(p) + } + + p.framesRead = promauto.With(p.reg).NewHistogram( + prometheus.HistogramOpts{ + Name: "prometheus_store_received_frames", + Help: "Number of frames received per streamed response.", + Buckets: prometheus.ExponentialBuckets(10, 10, 5), + }, + ) return p, nil } diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index d0597b6e9cb..9158f711a48 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -68,7 +68,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { testutil.Ok(t, err) limitMinT := int64(0) - proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, + proxy, err := NewPrometheusStore(promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return limitMinT, -1 }, nil, @@ -229,7 +229,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) { u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) - promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, + promStore, err := NewPrometheusStore(promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, nil, @@ -411,7 +411,7 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) { u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) - proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, + proxy, err := NewPrometheusStore(promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, nil) @@ -474,7 +474,7 @@ func TestPrometheusStore_Series_ChunkHashCalculation_Integration(t *testing.T) { u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) - proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, + proxy, err := NewPrometheusStore(promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, nil) @@ -504,7 +504,7 @@ func TestPrometheusStore_Info(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), nil, component.Sidecar, + proxy, err := NewPrometheusStore(promclient.NewDefaultClient(), nil, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 123, 456 }, nil) @@ -583,7 +583,7 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testin u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) - proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, + proxy, err := NewPrometheusStore(promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, nil) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 63801b1dc81..91df77e3c82 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -77,6 +77,7 @@ type Client interface { // ProxyStore implements the store API that proxies request to all given underlying stores. type ProxyStore struct { logger log.Logger + reg prometheus.Registerer stores func() []Client component component.StoreAPI selectorLabels labels.Labels @@ -109,35 +110,18 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(* } } -// BucketStoreOption are functions that configure BucketStore. -type ProxyStoreOption func(s *ProxyStore) - -// WithProxyStoreDebugLogging enables debug logging. -func WithProxyStoreDebugLogging() ProxyStoreOption { - return func(s *ProxyStore) { - s.debugLogging = true - } -} - // NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client. // Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL). func NewProxyStore( - logger log.Logger, - reg prometheus.Registerer, stores func() []Client, component component.StoreAPI, selectorLabels labels.Labels, responseTimeout time.Duration, retrievalStrategy RetrievalStrategy, - options ...ProxyStoreOption, + options ...StoreOption[ProxyStore], ) *ProxyStore { - if logger == nil { - logger = log.NewNopLogger() - } - - metrics := newProxyStoreMetrics(reg) s := &ProxyStore{ - logger: logger, + logger: log.NewNopLogger(), stores: stores, component: component, selectorLabels: selectorLabels, @@ -146,7 +130,6 @@ func NewProxyStore( return &b }}, responseTimeout: responseTimeout, - metrics: metrics, retrievalStrategy: retrievalStrategy, } diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 25f3e84102d..9c214eac35a 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -63,8 +63,7 @@ func TestProxyStore_Info(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - q := NewProxyStore(nil, - nil, + q := NewProxyStore( func() []Client { return nil }, component.Query, nil, 0*time.Second, RetrievalStrategy(EagerRetrieval), @@ -94,7 +93,7 @@ func TestProxyStore_TSDBInfos(t *testing.T) { }, }, } - q := NewProxyStore(nil, nil, + q := NewProxyStore( func() []Client { return stores }, component.Query, nil, 0*time.Second, EagerRetrieval, ) @@ -632,8 +631,7 @@ func TestProxyStore_Series(t *testing.T) { } for _, strategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} { t.Run(string(strategy), func(t *testing.T) { - q := NewProxyStore(nil, - nil, + q := NewProxyStore( func() []Client { return tc.storeAPIs }, component.Query, tc.selectorLabels, @@ -1165,8 +1163,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { if ok := t.Run(tc.title, func(t *testing.T) { for _, strategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} { if ok := t.Run(string(strategy), func(t *testing.T) { - q := NewProxyStore(nil, - nil, + q := NewProxyStore( func() []Client { return tc.storeAPIs }, component.Query, tc.selectorLabels, @@ -1223,8 +1220,7 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) { MaxTime: 300, }, } - q := NewProxyStore(nil, - nil, + q := NewProxyStore( func() []Client { return cls }, component.Query, nil, @@ -1284,8 +1280,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) { } - q := NewProxyStore(nil, - nil, + q := NewProxyStore( func() []Client { return cls }, component.Query, labels.FromStrings("fed", "a"), @@ -1331,8 +1326,7 @@ func TestProxyStore_LabelValues(t *testing.T) { MaxTime: timestamp.FromTime(time.Now()), }, } - q := NewProxyStore(nil, - nil, + q := NewProxyStore( func() []Client { return cls }, component.Query, nil, @@ -1531,8 +1525,6 @@ func TestProxyStore_LabelNames(t *testing.T) { } { if ok := t.Run(tc.title, func(t *testing.T) { q := NewProxyStore( - nil, - nil, func() []Client { return tc.storeAPIs }, component.Query, nil, diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index b5182f30081..66023389bf1 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -62,12 +62,9 @@ type ReadWriteTSDBStore struct { // NewTSDBStore creates a new TSDBStore. // NOTE: Given lset has to be sorted. -func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI, extLset labels.Labels) *TSDBStore { - if logger == nil { - logger = log.NewNopLogger() - } - return &TSDBStore{ - logger: logger, +func NewTSDBStore(db TSDBReader, component component.StoreAPI, extLset labels.Labels, options ...StoreOption[TSDBStore]) *TSDBStore { + s := &TSDBStore{ + logger: log.NewNopLogger(), db: db, component: component, extLset: extLset, @@ -77,6 +74,10 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI return &b }}, } + for _, option := range options { + option(s) + } + return s } func (s *TSDBStore) SetExtLset(extLset labels.Labels) { diff --git a/pkg/store/tsdb_test.go b/pkg/store/tsdb_test.go index 6dcc033c1c7..fc4db3ba0e7 100644 --- a/pkg/store/tsdb_test.go +++ b/pkg/store/tsdb_test.go @@ -39,7 +39,7 @@ func TestTSDBStore_Info(t *testing.T) { defer func() { testutil.Ok(t, db.Close()) }() testutil.Ok(t, err) - tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west")) + tsdbStore := NewTSDBStore(db, component.Rule, labels.FromStrings("region", "eu-west")) resp, err := tsdbStore.Info(ctx, &storepb.InfoRequest{}) testutil.Ok(t, err) @@ -73,7 +73,7 @@ func TestTSDBStore_Series_ChunkChecksum(t *testing.T) { defer func() { testutil.Ok(t, db.Close()) }() testutil.Ok(t, err) - tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west")) + tsdbStore := NewTSDBStore(db, component.Rule, labels.FromStrings("region", "eu-west")) appender := db.Appender(context.Background()) @@ -113,7 +113,7 @@ func TestTSDBStore_Series(t *testing.T) { defer func() { testutil.Ok(t, db.Close()) }() testutil.Ok(t, err) - tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west")) + tsdbStore := NewTSDBStore(db, component.Rule, labels.FromStrings("region", "eu-west")) appender := db.Appender(context.Background()) @@ -237,7 +237,7 @@ func TestTSDBStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T) { testutil.Ok(t, err) testSeries_SplitSamplesIntoChunksWithMaxSizeOf120(t, db.Appender(context.Background()), func() storepb.StoreServer { - return NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west")) + return NewTSDBStore(db, component.Rule, labels.FromStrings("region", "eu-west")) }) } @@ -299,7 +299,7 @@ func TestTSDBStore_SeriesAccessWithDelegateClosing(t *testing.T) { }) extLabels := labels.FromStrings("ext", "1") - store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels) + store := NewTSDBStore(&mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels, WithLogger[TSDBStore](logger)) srv := storetestutil.NewSeriesServer(context.Background()) csrv := &delegatorServer{SeriesServer: srv} @@ -462,7 +462,7 @@ func TestTSDBStore_SeriesAccessWithoutDelegateClosing(t *testing.T) { }) extLabels := labels.FromStrings("ext", "1") - store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels) + store := NewTSDBStore(&mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels, WithLogger[TSDBStore](logger)) srv := storetestutil.NewSeriesServer(context.Background()) t.Run("call series and access results", func(t *testing.T) { @@ -603,7 +603,7 @@ func benchTSDBStoreSeries(t testutil.TB, totalSamples, totalSeries int) { defer func() { testutil.Ok(t, db.Close()) }() extLabels := labels.FromStrings("ext", "1") - store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels) + store := NewTSDBStore(&mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels, WithLogger[TSDBStore](logger)) var expected []*storepb.Series for _, resp := range resps { diff --git a/pkg/tenancy/tenancy_test.go b/pkg/tenancy/tenancy_test.go index c5c9563c441..e6bb9946a52 100644 --- a/pkg/tenancy/tenancy_test.go +++ b/pkg/tenancy/tenancy_test.go @@ -133,8 +133,7 @@ func TestTenantProxyPassing(t *testing.T) { &storetestutil.TestClient{StoreClient: mockedStore}, } - q := store.NewProxyStore(nil, - nil, + q := store.NewProxyStore( func() []store.Client { return cls }, component.Query, nil, 0*time.Second, store.EagerRetrieval, @@ -178,8 +177,7 @@ func TestTenantProxyPassing(t *testing.T) { &storetestutil.TestClient{StoreClient: mockedStore}, } - q := store.NewProxyStore(nil, - nil, + q := store.NewProxyStore( func() []store.Client { return cls }, component.Query, nil, 0*time.Second, store.EagerRetrieval,