Skip to content

Commit

Permalink
Store: minor cleanup in option passing
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Sep 15, 2023
1 parent 70de8fd commit d85a959
Show file tree
Hide file tree
Showing 23 changed files with 146 additions and 140 deletions.
9 changes: 6 additions & 3 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,9 +493,12 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
)

options := []store.ProxyStoreOption{}
options := []store.StoreOption[store.ProxyStore]{
store.WithLogger[store.ProxyStore](logger),
store.WithRegistry[store.ProxyStore](reg),
}
if debugLogging {
options = append(options, store.WithProxyStoreDebugLogging())
options = append(options, store.WithDebugLogging[store.ProxyStore]())
}

var (
Expand Down Expand Up @@ -549,7 +552,7 @@ func runQuery(
endpointInfoTimeout,
queryConnMetricLabels...,
)
proxy = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy), options...)
proxy = store.NewProxyStore(endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy), options...)
rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients)
targetsProxy = targets.NewProxy(logger, endpoints.GetTargetsClients)
metadataProxy = metadata.NewProxy(logger, endpoints.GetMetricMetadataClients)
Expand Down
9 changes: 5 additions & 4 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,15 @@ func runReceive(
return errors.Wrap(err, "setup gRPC server")
}

options := []store.ProxyStoreOption{}
options := []store.StoreOption[store.ProxyStore]{
store.WithLogger[store.ProxyStore](logger),
store.WithRegistry[store.ProxyStore](reg),
}
if debugLogging {
options = append(options, store.WithProxyStoreDebugLogging())
options = append(options, store.WithDebugLogging[store.ProxyStore]())
}

proxy := store.NewProxyStore(
logger,
reg,
dbs.TSDBLocalClients,
comp,
labels.Labels{},
Expand Down
3 changes: 2 additions & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ func runRule(
}
infoOptions := []info.ServerOptionFunc{info.WithRulesInfoFunc()}
if tsdbDB != nil {
tsdbStore := store.NewTSDBStore(logger, tsdbDB, component.Rule, conf.lset)
tsdbStore := store.NewTSDBStore(tsdbDB, component.Rule, conf.lset, store.WithLogger[store.TSDBStore](logger))
infoOptions = append(
infoOptions,
info.WithLabelSetFunc(func() []labelpb.ZLabelSet {
Expand All @@ -654,6 +654,7 @@ func runRule(
return nil
}),
)

storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore), reg, conf.storeRateLimits)
options = append(options, grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)))
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func runSidecar(
{
c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
promStore, err := store.NewPrometheusStore(c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version, store.WithLogger[store.PrometheusStore](logger), store.WithRegistry[store.PrometheusStore](reg))
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,9 @@ func runStore(
return errors.Wrap(err, "create chunk pool")
}

options := []store.BucketStoreOption{
store.WithLogger(logger),
store.WithRegistry(reg),
options := []store.StoreOption[store.BucketStore]{
store.WithLogger[store.BucketStore](logger),
store.WithRegistry[store.BucketStore](reg),
store.WithIndexCache(indexCache),
store.WithQueryGate(queriesGate),
store.WithChunkPool(chunkPool),
Expand All @@ -390,7 +390,7 @@ func runStore(
}

if conf.debugLogging {
options = append(options, store.WithDebugLogging())
options = append(options, store.WithDebugLogging[store.BucketStore]())
}

bs, err := store.NewBucketStore(
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/query/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
func TestGRPCQueryAPIErrorHandling(t *testing.T) {
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
proxy := store.NewProxyStore(logger, reg, func() []store.Client { return nil }, component.Store, nil, 1*time.Minute, store.LazyRetrieval)
proxy := store.NewProxyStore(func() []store.Client { return nil }, component.Store, nil, 1*time.Minute, store.LazyRetrieval, store.WithLogger[store.ProxyStore](logger), store.WithRegistry[store.ProxyStore](reg))
queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute)
lookbackDeltaFunc := func(i int64) time.Duration { return 5 * time.Minute }
tests := []struct {
Expand Down
4 changes: 1 addition & 3 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,13 +631,11 @@ func TestQueryEndpoints(t *testing.T) {
func newProxyStoreWithTSDBStore(db store.TSDBReader) *store.ProxyStore {
c := &storetestutil.TestClient{
Name: "1",
StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, nil), 0),
StoreClient: storepb.ServerAsClient(store.NewTSDBStore(db, component.Query, nil), 0),
MinTime: math.MinInt64, MaxTime: math.MaxInt64,
}

return store.NewProxyStore(
nil,
nil,
func() []store.Client { return []store.Client{c} },
component.Query,
nil,
Expand Down
2 changes: 0 additions & 2 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,8 +848,6 @@ func newProxyStore(storeAPIs ...storepb.StoreServer) *store.ProxyStore {
}

return store.NewProxyStore(
nil,
nil,
func() []store.Client { return cls },
component.Query,
nil,
Expand Down
4 changes: 2 additions & 2 deletions pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestQuerier_Proxy(t *testing.T) {
q := NewQueryableCreator(
logger,
nil,
store.NewProxyStore(logger, nil, func() []store.Client { return clients },
store.NewProxyStore(func() []store.Client { return clients },
component.Debug, nil, 5*time.Minute, store.EagerRetrieval),
1000000,
5*time.Minute,
Expand All @@ -51,7 +51,7 @@ func TestQuerier_Proxy(t *testing.T) {
// TODO(bwplotka): Parse external labels.
clients = append(clients, &storetestutil.TestClient{
Name: fmt.Sprintf("store number %v", i),
StoreClient: storepb.ServerAsClient(selectedStore(store.NewTSDBStore(logger, st.storage.DB, component.Debug, nil), m, st.mint, st.maxt), 0),
StoreClient: storepb.ServerAsClient(selectedStore(store.NewTSDBStore(st.storage.DB, component.Debug, nil), m, st.mint, st.maxt), 0),
MinTime: st.mint,
MaxTime: st.maxt,
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
t.hashFunc,
)
}
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset), s, ship, exemplars.NewTSDB(s, lset))
tenant.set(store.NewTSDBStore(s, component.Receive, lset, store.WithLogger[store.TSDBStore](logger)), s, ship, exemplars.NewTSDB(s, lset))
level.Info(logger).Log("msg", "TSDB is now ready")
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func appendSampleWithLabels(m *MultiTSDB, tenant string, lbls labels.Labels, tim
}

func queryLabelValues(ctx context.Context, m *MultiTSDB) error {
proxy := store.NewProxyStore(nil, nil, func() []store.Client {
proxy := store.NewProxyStore(func() []store.Client {
clients := m.TSDBLocalClients()
if len(clients) > 0 {
clients[0] = &slowClient{clients[0]}
Expand Down
4 changes: 2 additions & 2 deletions pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ func TestPrometheusStore_Acceptance(t *testing.T) {
version, err := promclient.NewDefaultClient().BuildVersion(context.Background(), u)
testutil.Ok(tt, err)

promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
promStore, err := NewPrometheusStore(promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return extLset },
func() (int64, int64) { return timestamp.FromTime(minTime), timestamp.FromTime(maxTime) },
func() string { return version })
Expand All @@ -841,7 +841,7 @@ func TestTSDBStore_Acceptance(t *testing.T) {
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, db.Close()) })

tsdbStore := NewTSDBStore(nil, db, component.Rule, extLset)
tsdbStore := NewTSDBStore(db, component.Rule, extLset)

appendFn(db.Appender(context.Background()))
return tsdbStore
Expand Down
46 changes: 11 additions & 35 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ type BlockEstimator func(meta metadata.Meta) uint64
// When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller.
type BucketStore struct {
logger log.Logger
reg prometheus.Registerer // TODO(metalmatze) remove and add via BucketStoreOption
reg prometheus.Registerer // TODO(metalmatze) remove and add via Option
metrics *bucketStoreMetrics
bkt objstore.InstrumentedBucketReader
fetcher block.MetadataFetcher
Expand Down Expand Up @@ -414,84 +414,60 @@ func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []storage.
return map[storage.SeriesRef][]byte{}, ids
}

// BucketStoreOption are functions that configure BucketStore.
type BucketStoreOption func(s *BucketStore)

// WithLogger sets the BucketStore logger to the one you pass.
func WithLogger(logger log.Logger) BucketStoreOption {
return func(s *BucketStore) {
s.logger = logger
}
}

// WithRegistry sets a registry that BucketStore uses to register metrics with.
func WithRegistry(reg prometheus.Registerer) BucketStoreOption {
return func(s *BucketStore) {
s.reg = reg
}
}

// WithIndexCache sets a indexCache to use instead of a noopCache.
func WithIndexCache(cache storecache.IndexCache) BucketStoreOption {
func WithIndexCache(cache storecache.IndexCache) StoreOption[BucketStore] {
return func(s *BucketStore) {
s.indexCache = cache
}
}

// WithQueryGate sets a queryGate to use instead of a noopGate.
func WithQueryGate(queryGate gate.Gate) BucketStoreOption {
func WithQueryGate(queryGate gate.Gate) StoreOption[BucketStore] {
return func(s *BucketStore) {
s.queryGate = queryGate
}
}

// WithChunkPool sets a pool.Bytes to use for chunks.
func WithChunkPool(chunkPool pool.Bytes) BucketStoreOption {
func WithChunkPool(chunkPool pool.Bytes) StoreOption[BucketStore] {
return func(s *BucketStore) {
s.chunkPool = chunkPool
}
}

// WithFilterConfig sets a filter which Store uses for filtering metrics based on time.
func WithFilterConfig(filter *FilterConfig) BucketStoreOption {
func WithFilterConfig(filter *FilterConfig) StoreOption[BucketStore] {
return func(s *BucketStore) {
s.filterConfig = filter
}
}

// WithDebugLogging enables debug logging.
func WithDebugLogging() BucketStoreOption {
return func(s *BucketStore) {
s.debugLogging = true
}
}

func WithChunkHashCalculation(enableChunkHashCalculation bool) BucketStoreOption {
func WithChunkHashCalculation(enableChunkHashCalculation bool) StoreOption[BucketStore] {
return func(s *BucketStore) {
s.enableChunkHashCalculation = enableChunkHashCalculation
}
}

func WithSeriesBatchSize(seriesBatchSize int) BucketStoreOption {
func WithSeriesBatchSize(seriesBatchSize int) StoreOption[BucketStore] {
return func(s *BucketStore) {
s.seriesBatchSize = seriesBatchSize
}
}

func WithBlockEstimatedMaxSeriesFunc(f BlockEstimator) BucketStoreOption {
func WithBlockEstimatedMaxSeriesFunc(f BlockEstimator) StoreOption[BucketStore] {
return func(s *BucketStore) {
s.blockEstimatedMaxSeriesFunc = f
}
}

func WithBlockEstimatedMaxChunkFunc(f BlockEstimator) BucketStoreOption {
func WithBlockEstimatedMaxChunkFunc(f BlockEstimator) StoreOption[BucketStore] {
return func(s *BucketStore) {
s.blockEstimatedMaxChunkFunc = f
}
}

// WithLazyExpandedPostings enables lazy expanded postings.
func WithLazyExpandedPostings(enabled bool) BucketStoreOption {
func WithLazyExpandedPostings(enabled bool) StoreOption[BucketStore] {
return func(s *BucketStore) {
s.enabledLazyExpandedPostings = enabled
}
Expand All @@ -513,7 +489,7 @@ func NewBucketStore(
enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility.
lazyIndexReaderEnabled bool,
lazyIndexReaderIdleTimeout time.Duration,
options ...BucketStoreOption,
options ...StoreOption[BucketStore],
) (*BucketStore, error) {
s := &BucketStore{
logger: log.NewNopLogger(),
Expand Down
4 changes: 2 additions & 2 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,10 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
true,
true,
time.Minute,
WithLogger(s.logger),
WithLogger[BucketStore](s.logger),
WithIndexCache(s.cache),
WithFilterConfig(filterConf),
WithRegistry(reg),
WithRegistry[BucketStore](reg),
)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, store.Close()) }()
Expand Down
14 changes: 7 additions & 7 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
false,
false,
0,
WithLogger(logger),
WithLogger[BucketStore](logger),
WithFilterConfig(allowAllFilterConf),
)
testutil.Ok(t, err)
Expand Down Expand Up @@ -1416,7 +1416,7 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk,
false,
false,
0,
WithLogger(logger),
WithLogger[BucketStore](logger),
WithChunkPool(chunkPool),
WithLazyExpandedPostings(lazyExpandedPostings),
)
Expand Down Expand Up @@ -1864,7 +1864,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
true,
false,
0,
WithLogger(logger),
WithLogger[BucketStore](logger),
WithIndexCache(indexCache),
)
testutil.Ok(tb, err)
Expand Down Expand Up @@ -1955,7 +1955,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
true,
false,
0,
WithLogger(logger),
WithLogger[BucketStore](logger),
WithIndexCache(indexCache),
)
testutil.Ok(tb, err)
Expand Down Expand Up @@ -2113,7 +2113,7 @@ func TestSeries_SeriesSortedWithoutReplicaLabels(t *testing.T) {
true,
false,
0,
WithLogger(logger),
WithLogger[BucketStore](logger),
WithIndexCache(indexCache),
)
testutil.Ok(tb, err)
Expand Down Expand Up @@ -2299,7 +2299,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb
true,
false,
0,
WithLogger(logger),
WithLogger[BucketStore](logger),
WithIndexCache(indexCache),
)
testutil.Ok(tb, err)
Expand Down Expand Up @@ -2515,7 +2515,7 @@ func TestSeries_ChunksHaveHashRepresentation(t *testing.T) {
true,
false,
0,
WithLogger(logger),
WithLogger[BucketStore](logger),
WithIndexCache(indexCache),
)
testutil.Ok(tb, err)
Expand Down
Loading

0 comments on commit d85a959

Please sign in to comment.