From 685b1fd979421ac7d3fdfa66ec4899ffad03aa7b Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sun, 3 Dec 2023 17:54:59 -0800 Subject: [PATCH 1/3] remove unused tombstones loader and cache gen code Signed-off-by: Ben Ye --- integration/querier_tenant_federation_test.go | 1 - pkg/api/handlers.go | 6 +- pkg/api/handlers_test.go | 4 +- pkg/api/middlewares.go | 23 --- pkg/chunk/cache/cache_gen.go | 92 ---------- pkg/chunk/cache/cache_gen_test.go | 41 ----- pkg/cortex/cortex.go | 2 - pkg/cortex/modules.go | 13 +- pkg/purger/tombstones.go | 74 -------- pkg/querier/querier.go | 30 +--- pkg/querier/querier_test.go | 22 +-- pkg/querier/series/series_set.go | 162 ------------------ .../tripperware/queryrange/query_range.go | 11 -- .../queryrange/query_range_middlewares.go | 3 +- .../query_range_middlewares_test.go | 1 - .../tripperware/queryrange/results_cache.go | 37 ---- .../queryrange/results_cache_test.go | 119 +------------ pkg/ruler/ruler_test.go | 3 +- 18 files changed, 28 insertions(+), 616 deletions(-) delete mode 100644 pkg/chunk/cache/cache_gen.go delete mode 100644 pkg/chunk/cache/cache_gen_test.go delete mode 100644 pkg/purger/tombstones.go diff --git a/integration/querier_tenant_federation_test.go b/integration/querier_tenant_federation_test.go index 9bdee25f2d..11c4e545fc 100644 --- a/integration/querier_tenant_federation_test.go +++ b/integration/querier_tenant_federation_test.go @@ -164,7 +164,6 @@ func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationCon require.NoError(t, queueComponent.WaitSumMetricsWithOptions(e2e.Equals(0), []string{queueMetricName}, e2e.WithLabelMatchers( labels.MustNewMatcher(labels.MatchEqual, "user", strings.Join(tenantIDs, "|"))))) - // TODO: check cache invalidation on tombstone cache gen increase // TODO: check fairness in queryfrontend } diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index a1aa3daed8..a1b1afc200 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -24,7 +24,6 @@ import ( "github.com/weaveworks/common/instrument" "github.com/weaveworks/common/middleware" - "github.com/cortexproject/cortex/pkg/purger" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/util" @@ -162,7 +161,6 @@ func NewQuerierHandler( exemplarQueryable storage.ExemplarQueryable, engine v1.QueryEngine, distributor Distributor, - tombstonesLoader purger.TombstonesLoader, reg prometheus.Registerer, logger log.Logger, ) http.Handler { @@ -242,9 +240,7 @@ func NewQuerierHandler( ResponseBodySize: sentMessageSize, InflightRequests: inflightRequests, } - cacheGenHeaderMiddleware := getHTTPCacheGenNumberHeaderSetterMiddleware(tombstonesLoader) - middlewares := middleware.Merge(inst, cacheGenHeaderMiddleware) - router.Use(middlewares.Wrap) + router.Use(inst.Wrap) // Define the prefixes for all routes prefix := path.Join(cfg.ServerPrefix, cfg.PrometheusHTTPPrefix) diff --git a/pkg/api/handlers_test.go b/pkg/api/handlers_test.go index 8a3b614aeb..32e84d70a9 100644 --- a/pkg/api/handlers_test.go +++ b/pkg/api/handlers_test.go @@ -14,8 +14,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" - - "github.com/cortexproject/cortex/pkg/purger" ) func TestIndexHandlerPrefix(t *testing.T) { @@ -234,7 +232,7 @@ func TestBuildInfoAPI(t *testing.T) { version.Version = tc.version version.Branch = tc.branch version.Revision = tc.revision - handler := NewQuerierHandler(cfg, nil, nil, nil, nil, purger.NewNoopTombstonesLoader(), nil, &FakeLogger{}) + handler := NewQuerierHandler(cfg, nil, nil, nil, nil, nil, &FakeLogger{}) writer := httptest.NewRecorder() req := httptest.NewRequest("GET", "/api/v1/status/buildinfo", nil) req = req.WithContext(user.InjectOrgID(req.Context(), "test")) diff --git a/pkg/api/middlewares.go b/pkg/api/middlewares.go index 5ca69bb6ab..8ddefaa2c6 100644 --- a/pkg/api/middlewares.go +++ b/pkg/api/middlewares.go @@ -4,32 +4,9 @@ import ( "context" "net/http" - "github.com/weaveworks/common/middleware" - - "github.com/cortexproject/cortex/pkg/purger" - "github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange" - "github.com/cortexproject/cortex/pkg/tenant" util_log "github.com/cortexproject/cortex/pkg/util/log" ) -// middleware for setting cache gen header to let consumer of response know all previous responses could be invalid due to delete operation -func getHTTPCacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader purger.TombstonesLoader) middleware.Interface { - return middleware.Func(func(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - tenantIDs, err := tenant.TenantIDs(r.Context()) - if err != nil { - http.Error(w, err.Error(), http.StatusUnauthorized) - return - } - - cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(tenantIDs) - - w.Header().Set(queryrange.ResultsCacheGenNumberHeaderName, cacheGenNumber) - next.ServeHTTP(w, r) - }) - }) -} - // HTTPHeaderMiddleware adds specified HTTPHeaders to the request context type HTTPHeaderMiddleware struct { TargetHeaders []string diff --git a/pkg/chunk/cache/cache_gen.go b/pkg/chunk/cache/cache_gen.go deleted file mode 100644 index 3fd151db1d..0000000000 --- a/pkg/chunk/cache/cache_gen.go +++ /dev/null @@ -1,92 +0,0 @@ -package cache - -import ( - "context" -) - -type contextKey int - -// cacheGenContextKey is used for setting a Cache Generation number in context. -const cacheGenContextKey contextKey = 0 - -// GenNumMiddleware adds gen number to keys from context. Expected size of gen numbers is upto 2 digits. -// If we start seeing problems with keys exceeding length limit, we need to look into resetting gen numbers. -type GenNumMiddleware struct { - downstreamCache Cache -} - -// NewCacheGenNumMiddleware creates a new GenNumMiddleware. -func NewCacheGenNumMiddleware(downstreamCache Cache) Cache { - return &GenNumMiddleware{downstreamCache} -} - -// Store adds cache gen number to keys before calling Store method of downstream cache. -func (c GenNumMiddleware) Store(ctx context.Context, keys []string, buf [][]byte) { - keys = addCacheGenNumToCacheKeys(ctx, keys) - c.downstreamCache.Store(ctx, keys, buf) -} - -// Fetch adds cache gen number to keys before calling Fetch method of downstream cache. -// It also removes gen number before responding back with found and missing keys to make sure consumer of response gets to see same keys. -func (c GenNumMiddleware) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string) { - keys = addCacheGenNumToCacheKeys(ctx, keys) - - found, bufs, missing = c.downstreamCache.Fetch(ctx, keys) - - found = removeCacheGenNumFromKeys(ctx, found) - missing = removeCacheGenNumFromKeys(ctx, missing) - - return -} - -// Stop calls Stop method of downstream cache. -func (c GenNumMiddleware) Stop() { - c.downstreamCache.Stop() -} - -// InjectCacheGenNumber returns a derived context containing the cache gen. -func InjectCacheGenNumber(ctx context.Context, cacheGen string) context.Context { - return context.WithValue(ctx, interface{}(cacheGenContextKey), cacheGen) -} - -// ExtractCacheGenNumbersFromHeaders gets the cache gen from the context. -func ExtractCacheGenNumber(ctx context.Context) string { - cacheGenNumber, ok := ctx.Value(cacheGenContextKey).(string) - if !ok { - return "" - } - return cacheGenNumber -} - -// addCacheGenNumToCacheKeys adds gen number to keys as prefix. -func addCacheGenNumToCacheKeys(ctx context.Context, keys []string) []string { - cacheGen := ExtractCacheGenNumber(ctx) - if cacheGen == "" { - return keys - } - - prefixedKeys := make([]string, len(keys)) - - for i := range keys { - prefixedKeys[i] = cacheGen + keys[i] - } - - return prefixedKeys -} - -// removeCacheGenNumFromKeys removes prefixed gen number from keys. -func removeCacheGenNumFromKeys(ctx context.Context, keys []string) []string { - cacheGen := ExtractCacheGenNumber(ctx) - if cacheGen == "" { - return keys - } - - unprefixedKeys := make([]string, len(keys)) - cacheGenPrefixLen := len(cacheGen) - - for i := range keys { - unprefixedKeys[i] = keys[i][cacheGenPrefixLen:] - } - - return unprefixedKeys -} diff --git a/pkg/chunk/cache/cache_gen_test.go b/pkg/chunk/cache/cache_gen_test.go deleted file mode 100644 index 1ac45dc69a..0000000000 --- a/pkg/chunk/cache/cache_gen_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package cache - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestCacheGenNumCacheKeysPrefix(t *testing.T) { - keys := []string{"foo", "bar", "baz"} - - for _, tc := range []struct { - name string - prefix string - }{ - { - name: "empty-prefix", - }, - { - name: "with-prefix", - prefix: "prefix", - }, - } { - t.Run(tc.name, func(t *testing.T) { - ctx := InjectCacheGenNumber(context.Background(), tc.prefix) - - prefixedKeys := addCacheGenNumToCacheKeys(ctx, keys) - for i, key := range prefixedKeys { - require.Equal(t, tc.prefix+keys[i], key) - } - require.Len(t, prefixedKeys, len(keys)) - - unprefixedKeys := removeCacheGenNumFromKeys(ctx, prefixedKeys) - for i, key := range unprefixedKeys { - require.Equal(t, keys[i], key) - } - require.Len(t, unprefixedKeys, len(keys)) - }) - } -} diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 83496d3565..279bfcaece 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -38,7 +38,6 @@ import ( frontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/purger" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/querier/tenantfederation" "github.com/cortexproject/cortex/pkg/querier/tripperware" @@ -302,7 +301,6 @@ type Cortex struct { Flusher *flusher.Flusher Frontend *frontendv1.Frontend RuntimeConfig *runtimeconfig.Manager - TombstonesLoader purger.TombstonesLoader QuerierQueryable prom_storage.SampleAndChunkQueryable ExemplarQueryable prom_storage.ExemplarQueryable QuerierEngine v1.QueryEngine diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 4fc0969ee7..6526531b2d 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -230,7 +230,7 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) { querierRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "querier"}, prometheus.DefaultRegisterer) // Create a querier queryable and PromQL engine - t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, querierRegisterer, util_log.Logger) + t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger) // Register the default endpoints that are always enabled for the querier module t.API.RegisterQueryable(t.QuerierQueryable, t.Distributor) @@ -308,7 +308,6 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) { t.ExemplarQueryable, t.QuerierEngine, t.Distributor, - t.TombstonesLoader, prometheus.DefaultRegisterer, util_log.Logger, ) @@ -441,12 +440,6 @@ func (t *Cortex) initFlusher() (serv services.Service, err error) { return t.Flusher, nil } -func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) { - // no-op while blocks store does not support series deletion - t.TombstonesLoader = purger.NewNoopTombstonesLoader() - return -} - // initQueryFrontendTripperware instantiates the tripperware used by the query frontend // to optimize Prometheus query requests. func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) { @@ -462,7 +455,6 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro t.Overrides, queryrange.PrometheusResponseExtractor{}, prometheus.DefaultRegisterer, - t.TombstonesLoader, queryAnalyzer, prometheusCodec, shardedPrometheusCodec, @@ -591,7 +583,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { } else { rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer) // TODO: Consider wrapping logger to differentiate from querier module logger - queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, rulerRegisterer, util_log.Logger) + queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger) managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, prometheus.DefaultRegisterer) manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger) @@ -746,7 +738,6 @@ func (t *Cortex) setupModuleManager() error { mm.RegisterModule(OverridesExporter, t.initOverridesExporter) mm.RegisterModule(Distributor, t.initDistributor) mm.RegisterModule(DistributorService, t.initDistributorService, modules.UserInvisibleModule) - mm.RegisterModule(DeleteRequestsStore, t.initDeleteRequestsStore, modules.UserInvisibleModule) mm.RegisterModule(Ingester, t.initIngester) mm.RegisterModule(IngesterService, t.initIngesterService, modules.UserInvisibleModule) mm.RegisterModule(Flusher, t.initFlusher) diff --git a/pkg/purger/tombstones.go b/pkg/purger/tombstones.go deleted file mode 100644 index 31084dd529..0000000000 --- a/pkg/purger/tombstones.go +++ /dev/null @@ -1,74 +0,0 @@ -package purger - -import ( - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" -) - -// TombstonesSet holds all the pending delete requests for a user -type TombstonesSet interface { - // GetDeletedIntervals returns non-overlapping, sorted deleted intervals. - GetDeletedIntervals(lbls labels.Labels, from, to model.Time) []model.Interval - - // Len returns number of tombstones that are there - Len() int - - // HasTombstonesForInterval tells whether there are any tombstones which overlapping given interval - HasTombstonesForInterval(from, to model.Time) bool -} - -type noopTombstonesSet struct { -} - -// TombstonesLoader loads delete requests and gen numbers from store and keeps checking for updates. -// It keeps checking for changes in gen numbers, which also means changes in delete requests and reloads specific users delete requests. -type TombstonesLoader interface { - // GetPendingTombstones returns all pending tombstones - GetPendingTombstones(userID string) (TombstonesSet, error) - - // GetPendingTombstonesForInterval returns all pending tombstones between two times - GetPendingTombstonesForInterval(userID string, from, to model.Time) (TombstonesSet, error) - - // GetStoreCacheGenNumber returns store cache gen number for a user - GetStoreCacheGenNumber(tenantIDs []string) string - - // GetResultsCacheGenNumber returns results cache gen number for a user - GetResultsCacheGenNumber(tenantIDs []string) string -} - -type noopTombstonesLoader struct { - ts noopTombstonesSet -} - -// NewNoopTombstonesLoader creates a TombstonesLoader that does nothing -func NewNoopTombstonesLoader() TombstonesLoader { - return &noopTombstonesLoader{} -} - -func (tl *noopTombstonesLoader) GetPendingTombstones(userID string) (TombstonesSet, error) { - return &tl.ts, nil -} - -func (tl *noopTombstonesLoader) GetPendingTombstonesForInterval(userID string, from, to model.Time) (TombstonesSet, error) { - return &tl.ts, nil -} - -func (tl *noopTombstonesLoader) GetStoreCacheGenNumber(tenantIDs []string) string { - return "" -} - -func (tl *noopTombstonesLoader) GetResultsCacheGenNumber(tenantIDs []string) string { - return "" -} - -func (ts noopTombstonesSet) GetDeletedIntervals(lbls labels.Labels, from, to model.Time) []model.Interval { - return nil -} - -func (ts noopTombstonesSet) Len() int { - return 0 -} - -func (ts noopTombstonesSet) HasTombstonesForInterval(from, to model.Time) bool { - return false -} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index d66697674d..2a5e03af3b 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -26,11 +26,9 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/purger" "github.com/cortexproject/cortex/pkg/querier/batch" "github.com/cortexproject/cortex/pkg/querier/iterators" "github.com/cortexproject/cortex/pkg/querier/lazyquery" - "github.com/cortexproject/cortex/pkg/querier/series" seriesset "github.com/cortexproject/cortex/pkg/querier/series" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" @@ -156,7 +154,7 @@ func getChunksIteratorFunction(cfg Config) chunkIteratorFunc { } // New builds a queryable and promql engine. -func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, tombstonesLoader purger.TombstonesLoader, reg prometheus.Registerer, logger log.Logger) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, v1.QueryEngine) { +func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, v1.QueryEngine) { iteratorFunc := getChunksIteratorFunction(cfg) distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterStreaming, cfg.IngesterMetadataStreaming, iteratorFunc, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels) @@ -168,7 +166,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor QueryStoreAfter: cfg.QueryStoreAfter, } } - queryable := NewQueryable(distributorQueryable, ns, iteratorFunc, cfg, limits, tombstonesLoader) + queryable := NewQueryable(distributorQueryable, ns, iteratorFunc, cfg, limits) exemplarQueryable := newDistributorExemplarQueryable(distributor) lazyQueryable := storage.QueryableFunc(func(mint int64, maxt int64) (storage.Querier, error) { @@ -252,14 +250,13 @@ type limiterHolder struct { } // NewQueryable creates a new Queryable for cortex. -func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, chunkIterFn chunkIteratorFunc, cfg Config, limits *validation.Overrides, tombstonesLoader purger.TombstonesLoader) storage.Queryable { +func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, chunkIterFn chunkIteratorFunc, cfg Config, limits *validation.Overrides) storage.Queryable { return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) { q := querier{ now: time.Now(), mint: mint, maxt: maxt, chunkIterFn: chunkIterFn, - tombstonesLoader: tombstonesLoader, limits: limits, maxQueryIntoFuture: cfg.MaxQueryIntoFuture, queryStoreForLabels: cfg.QueryStoreForLabels, @@ -277,7 +274,6 @@ type querier struct { now time.Time mint, maxt int64 - tombstonesLoader purger.TombstonesLoader limits *validation.Overrides maxQueryIntoFuture time.Duration queryStoreForLabels bool @@ -398,19 +394,8 @@ func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.Select return storage.ErrSeriesSet(limitErr) } - tombstones, err := q.tombstonesLoader.GetPendingTombstonesForInterval(userID, startTime, endTime) - if err != nil { - return storage.ErrSeriesSet(err) - } - if len(queriers) == 1 { - seriesSet := queriers[0].Select(ctx, sortSeries, sp, matchers...) - - if tombstones.Len() != 0 { - seriesSet = series.NewDeletedSeriesSet(seriesSet, tombstones, model.Interval{Start: startTime, End: endTime}) - } - - return seriesSet + return queriers[0].Select(ctx, sortSeries, sp, matchers...) } sets := make(chan storage.SeriesSet, len(queriers)) @@ -434,12 +419,7 @@ func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.Select // we have all the sets from different sources (chunk from store, chunks from ingesters, // time series from store and time series from ingesters). // mergeSeriesSets will return sorted set. - seriesSet := q.mergeSeriesSets(result) - - if tombstones.Len() != 0 { - seriesSet = series.NewDeletedSeriesSet(seriesSet, tombstones, model.Interval{Start: startTime, End: endTime}) - } - return seriesSet + return q.mergeSeriesSets(result) } // LabelValues implements storage.Querier. diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index ed3d6abc29..f230e1dbf9 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -552,7 +552,7 @@ func TestQuerier(t *testing.T) { require.NoError(t, err) queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore)), UseAlwaysQueryable(db)} - queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewNoopTombstonesLoader(), nil, log.NewNopLogger()) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger()) testRangeQuery(t, queryable, queryEngine, through, query) }) } @@ -576,7 +576,7 @@ func TestQuerierMetric(t *testing.T) { queryables := []QueryableWithFilter{} r := prometheus.NewRegistry() reg := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "querier"}, r) - New(cfg, overrides, distributor, queryables, purger.NewNoopTombstonesLoader(), reg, log.NewNopLogger()) + New(cfg, overrides, distributor, queryables, reg, log.NewNopLogger()) assert.NoError(t, promutil.GatherAndCompare(r, strings.NewReader(` # HELP cortex_max_concurrent_queries The maximum number of concurrent queries. # TYPE cortex_max_concurrent_queries gauge @@ -701,7 +701,7 @@ func TestNoHistoricalQueryToIngester(t *testing.T) { require.NoError(t, err) ctx := user.InjectOrgID(context.Background(), "0") - queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore))}, purger.NewNoopTombstonesLoader(), nil, log.NewNopLogger()) + queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore))}, nil, log.NewNopLogger()) query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, "dummy", c.mint, c.maxt, 1*time.Minute) require.NoError(t, err) @@ -800,7 +800,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryIntoFuture(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "0") queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewNoopTombstonesLoader(), nil, log.NewNopLogger()) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger()) query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, "dummy", c.queryStartTime, c.queryEndTime, time.Minute) require.NoError(t, err) @@ -885,7 +885,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength(t *testing.T) { distributor := &emptyDistributor{} queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewNoopTombstonesLoader(), nil, log.NewNopLogger()) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger()) queryEngine := promql.NewEngine(opts) ctx := user.InjectOrgID(context.Background(), "test") @@ -1031,7 +1031,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil) distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewNoopTombstonesLoader(), nil, log.NewNopLogger()) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger()) require.NoError(t, err) query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, testData.query, testData.queryStartTime, testData.queryEndTime, time.Minute) @@ -1060,7 +1060,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil) distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewNoopTombstonesLoader(), nil, log.NewNopLogger()) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger()) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1101,7 +1101,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("LabelNames", mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) distributor.On("LabelNamesStream", mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewNoopTombstonesLoader(), nil, log.NewNopLogger()) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger()) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1129,7 +1129,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, matchers).Return([]metric.Metric{}, nil) distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, matchers).Return([]metric.Metric{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewNoopTombstonesLoader(), nil, log.NewNopLogger()) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger()) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1156,7 +1156,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("LabelValuesForLabelName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) distributor.On("LabelValuesForLabelNameStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewNoopTombstonesLoader(), nil, log.NewNopLogger()) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger()) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1507,7 +1507,7 @@ func TestShortTermQueryToLTS(t *testing.T) { overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil) require.NoError(t, err) - queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore))}, purger.NewNoopTombstonesLoader(), nil, log.NewNopLogger()) + queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore))}, nil, log.NewNopLogger()) ctx := user.InjectOrgID(context.Background(), "0") query, err := engine.NewRangeQuery(ctx, queryable, nil, "dummy", c.mint, c.maxt, 1*time.Minute) require.NoError(t, err) diff --git a/pkg/querier/series/series_set.go b/pkg/querier/series/series_set.go index 90e2b83f62..cd49bf23fe 100644 --- a/pkg/querier/series/series_set.go +++ b/pkg/querier/series/series_set.go @@ -26,7 +26,6 @@ import ( "github.com/prometheus/prometheus/util/annotations" "github.com/cortexproject/cortex/pkg/prom1/storage/metric" - "github.com/cortexproject/cortex/pkg/purger" "github.com/cortexproject/cortex/pkg/querier/iterators" ) @@ -199,167 +198,6 @@ func (b byLabels) Len() int { return len(b) } func (b byLabels) Swap(i, j int) { b[i], b[j] = b[j], b[i] } func (b byLabels) Less(i, j int) bool { return labels.Compare(b[i].Labels(), b[j].Labels()) < 0 } -type DeletedSeriesSet struct { - seriesSet storage.SeriesSet - tombstones purger.TombstonesSet - queryInterval model.Interval -} - -func NewDeletedSeriesSet(seriesSet storage.SeriesSet, tombstones purger.TombstonesSet, queryInterval model.Interval) storage.SeriesSet { - return &DeletedSeriesSet{ - seriesSet: seriesSet, - tombstones: tombstones, - queryInterval: queryInterval, - } -} - -func (d DeletedSeriesSet) Next() bool { - return d.seriesSet.Next() -} - -func (d DeletedSeriesSet) At() storage.Series { - series := d.seriesSet.At() - deletedIntervals := d.tombstones.GetDeletedIntervals(series.Labels(), d.queryInterval.Start, d.queryInterval.End) - - // series is deleted for whole query range so return empty series - if len(deletedIntervals) == 1 && deletedIntervals[0] == d.queryInterval { - return NewEmptySeries(series.Labels()) - } - - return NewDeletedSeries(series, deletedIntervals) -} - -func (d DeletedSeriesSet) Err() error { - return d.seriesSet.Err() -} - -func (d DeletedSeriesSet) Warnings() annotations.Annotations { - return nil -} - -type DeletedSeries struct { - series storage.Series - deletedIntervals []model.Interval -} - -func NewDeletedSeries(series storage.Series, deletedIntervals []model.Interval) storage.Series { - return &DeletedSeries{ - series: series, - deletedIntervals: deletedIntervals, - } -} - -func (d DeletedSeries) Labels() labels.Labels { - return d.series.Labels() -} - -func (d DeletedSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { - return NewDeletedSeriesIterator(d.series.Iterator(it), d.deletedIntervals) -} - -type DeletedSeriesIterator struct { - itr chunkenc.Iterator - deletedIntervals []model.Interval -} - -func NewDeletedSeriesIterator(itr chunkenc.Iterator, deletedIntervals []model.Interval) chunkenc.Iterator { - return iterators.NewCompatibleChunksIterator(&DeletedSeriesIterator{ - itr: itr, - deletedIntervals: deletedIntervals, - }) -} - -func (d DeletedSeriesIterator) Seek(t int64) bool { - if found := d.itr.Seek(t); found == chunkenc.ValNone { - return false - } - - seekedTs, _ := d.itr.At() - if d.isDeleted(seekedTs) { - // point we have seeked into is deleted, Next() should find a new non-deleted sample which is after t and seekedTs - return d.Next() - } - - return true -} - -func (d DeletedSeriesIterator) At() (t int64, v float64) { - return d.itr.At() -} - -func (d DeletedSeriesIterator) Next() bool { - for d.itr.Next() != chunkenc.ValNone { - ts, _ := d.itr.At() - - if d.isDeleted(ts) { - continue - } - return true - } - return false -} - -func (d DeletedSeriesIterator) Err() error { - return d.itr.Err() -} - -// isDeleted removes intervals which are past ts while checking for whether ts happens to be in one of the deleted intervals -func (d *DeletedSeriesIterator) isDeleted(ts int64) bool { - mts := model.Time(ts) - - for _, interval := range d.deletedIntervals { - if mts > interval.End { - d.deletedIntervals = d.deletedIntervals[1:] - continue - } else if mts < interval.Start { - return false - } - - return true - } - - return false -} - -type emptySeries struct { - labels labels.Labels -} - -func NewEmptySeries(labels labels.Labels) storage.Series { - return emptySeries{labels} -} - -func (e emptySeries) Labels() labels.Labels { - return e.labels -} - -func (emptySeries) Iterator(chunkenc.Iterator) chunkenc.Iterator { - return NewEmptySeriesIterator() -} - -type emptySeriesIterator struct { -} - -func NewEmptySeriesIterator() chunkenc.Iterator { - return iterators.NewCompatibleChunksIterator(emptySeriesIterator{}) -} - -func (emptySeriesIterator) Seek(t int64) bool { - return false -} - -func (emptySeriesIterator) At() (t int64, v float64) { - return 0, 0 -} - -func (emptySeriesIterator) Next() bool { - return false -} - -func (emptySeriesIterator) Err() error { - return nil -} - type seriesSetWithWarnings struct { wrapped storage.SeriesSet warnings annotations.Annotations diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 18b0be8888..8ca3364401 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -135,12 +135,8 @@ func (c prometheusCodec) MergeResponse(ctx context.Context, _ tripperware.Reques } promResponses := make([]*PrometheusResponse, 0, len(responses)) - // we need to pass on all the headers for results cache gen numbers. - var resultsCacheGenNumberHeaderValues []string - for _, res := range responses { promResponses = append(promResponses, res.(*PrometheusResponse)) - resultsCacheGenNumberHeaderValues = append(resultsCacheGenNumberHeaderValues, getHeaderValuesWithName(res, ResultsCacheGenNumberHeaderName)...) } // Merge the responses. @@ -159,13 +155,6 @@ func (c prometheusCodec) MergeResponse(ctx context.Context, _ tripperware.Reques }, } - if len(resultsCacheGenNumberHeaderValues) != 0 { - response.Headers = []*tripperware.PrometheusResponseHeader{{ - Name: ResultsCacheGenNumberHeaderName, - Values: resultsCacheGenNumberHeaderValues, - }} - } - return &response, nil } diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares.go b/pkg/querier/tripperware/queryrange/query_range_middlewares.go index 09a768028a..0201856179 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares.go @@ -76,7 +76,6 @@ func Middlewares( limits tripperware.Limits, cacheExtractor Extractor, registerer prometheus.Registerer, - cacheGenNumberLoader CacheGenNumberLoader, queryAnalyzer querysharding.Analyzer, prometheusCodec tripperware.Codec, shardedPrometheusCodec tripperware.Codec, @@ -101,7 +100,7 @@ func Middlewares( } return false } - queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, prometheusCodec, cacheExtractor, cacheGenNumberLoader, shouldCache, registerer) + queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, prometheusCodec, cacheExtractor, shouldCache, registerer) if err != nil { return nil, nil, err } diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go index 518cdce885..027dddb446 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go @@ -57,7 +57,6 @@ func TestRoundTrip(t *testing.T) { mockLimits{}, nil, nil, - nil, qa, PrometheusCodec, ShardedPrometheusCodec, diff --git a/pkg/querier/tripperware/queryrange/results_cache.go b/pkg/querier/tripperware/queryrange/results_cache.go index 41b43137c2..22ccce82d5 100644 --- a/pkg/querier/tripperware/queryrange/results_cache.go +++ b/pkg/querier/tripperware/queryrange/results_cache.go @@ -38,15 +38,8 @@ import ( var ( // Value that cacheControlHeader has if the response indicates that the results should not be cached. noStoreValue = "no-store" - - // ResultsCacheGenNumberHeaderName holds name of the header we want to set in http response - ResultsCacheGenNumberHeaderName = "Results-Cache-Gen-Number" ) -type CacheGenNumberLoader interface { - GetResultsCacheGenNumber(tenantIDs []string) string -} - // ResultsCacheConfig is the config for the results cache. type ResultsCacheConfig struct { CacheConfig cache.Config `yaml:"cache"` @@ -161,7 +154,6 @@ type resultsCache struct { extractor Extractor minCacheExtent int64 // discard any cache extent smaller than this merger tripperware.Merger - cacheGenNumberLoader CacheGenNumberLoader shouldCache ShouldCacheFn cacheQueryableSamplesStats bool } @@ -179,7 +171,6 @@ func NewResultsCacheMiddleware( limits tripperware.Limits, merger tripperware.Merger, extractor Extractor, - cacheGenNumberLoader CacheGenNumberLoader, shouldCache ShouldCacheFn, reg prometheus.Registerer, ) (tripperware.Middleware, cache.Cache, error) { @@ -191,10 +182,6 @@ func NewResultsCacheMiddleware( c = cache.NewSnappy(c, logger) } - if cacheGenNumberLoader != nil { - c = cache.NewCacheGenNumMiddleware(c) - } - return tripperware.MiddlewareFunc(func(next tripperware.Handler) tripperware.Handler { return &resultsCache{ logger: logger, @@ -206,7 +193,6 @@ func NewResultsCacheMiddleware( extractor: extractor, minCacheExtent: (5 * time.Minute).Milliseconds(), splitter: splitter, - cacheGenNumberLoader: cacheGenNumberLoader, shouldCache: shouldCache, cacheQueryableSamplesStats: cfg.CacheQueryableSamplesStats, } @@ -232,10 +218,6 @@ func (s resultsCache) Do(ctx context.Context, r tripperware.Request) (tripperwar return s.next.Do(ctx, r) } - if s.cacheGenNumberLoader != nil { - ctx = cache.InjectCacheGenNumber(ctx, s.cacheGenNumberLoader.GetResultsCacheGenNumber(tenantIDs)) - } - var ( key = s.splitter.GenerateCacheKey(tenant.JoinTenantIDs(tenantIDs), r) extents []Extent @@ -284,25 +266,6 @@ func (s resultsCache) shouldCacheResponse(ctx context.Context, req tripperware.R return false } - if s.cacheGenNumberLoader == nil { - return true - } - - genNumbersFromResp := getHeaderValuesWithName(r, ResultsCacheGenNumberHeaderName) - genNumberFromCtx := cache.ExtractCacheGenNumber(ctx) - - if len(genNumbersFromResp) == 0 && genNumberFromCtx != "" { - level.Debug(util_log.WithContext(ctx, s.logger)).Log("msg", fmt.Sprintf("we found results cache gen number %s set in store but none in headers", genNumberFromCtx)) - return false - } - - for _, gen := range genNumbersFromResp { - if gen != genNumberFromCtx { - level.Debug(util_log.WithContext(ctx, s.logger)).Log("msg", fmt.Sprintf("inconsistency in results cache gen numbers %s (GEN-FROM-RESPONSE) != %s (GEN-FROM-STORE), not caching the response", gen, genNumberFromCtx)) - return false - } - } - return true } diff --git a/pkg/querier/tripperware/queryrange/results_cache_test.go b/pkg/querier/tripperware/queryrange/results_cache_test.go index b7ba0d5f97..c6204ac57f 100644 --- a/pkg/querier/tripperware/queryrange/results_cache_test.go +++ b/pkg/querier/tripperware/queryrange/results_cache_test.go @@ -214,7 +214,6 @@ func TestStatsCacheQuerySamples(t *testing.T) { PrometheusResponseExtractor{}, nil, nil, - nil, ) require.NoError(t, err) @@ -237,13 +236,12 @@ func TestStatsCacheQuerySamples(t *testing.T) { func TestShouldCache(t *testing.T) { t.Parallel() maxCacheTime := int64(150 * 1000) - c := &resultsCache{logger: log.NewNopLogger(), cacheGenNumberLoader: newMockCacheGenNumberLoader()} + c := &resultsCache{logger: log.NewNopLogger()} for _, tc := range []struct { - name string - request tripperware.Request - input tripperware.Response - cacheGenNumberToInject string - expected bool + name string + request tripperware.Request + input tripperware.Response + expected bool }{ // Tests only for cacheControlHeader { @@ -307,94 +305,6 @@ func TestShouldCache(t *testing.T) { }), expected: true, }, - - // Tests only for cacheGenNumber header - { - name: "cacheGenNumber not set in both header and store", - request: &PrometheusRequest{Query: "metric"}, - input: tripperware.Response(&PrometheusResponse{ - Headers: []*tripperware.PrometheusResponseHeader{ - { - Name: "meaninglessheader", - Values: []string{}, - }, - }, - }), - expected: true, - }, - { - name: "cacheGenNumber set in store but not in header", - request: &PrometheusRequest{Query: "metric"}, - input: tripperware.Response(&PrometheusResponse{ - Headers: []*tripperware.PrometheusResponseHeader{ - { - Name: "meaninglessheader", - Values: []string{}, - }, - }, - }), - cacheGenNumberToInject: "1", - expected: false, - }, - { - name: "cacheGenNumber set in header but not in store", - request: &PrometheusRequest{Query: "metric"}, - input: tripperware.Response(&PrometheusResponse{ - Headers: []*tripperware.PrometheusResponseHeader{ - { - Name: ResultsCacheGenNumberHeaderName, - Values: []string{"1"}, - }, - }, - }), - expected: false, - }, - { - name: "cacheGenNumber in header and store are the same", - request: &PrometheusRequest{Query: "metric"}, - input: tripperware.Response(&PrometheusResponse{ - Headers: []*tripperware.PrometheusResponseHeader{ - { - Name: ResultsCacheGenNumberHeaderName, - Values: []string{"1", "1"}, - }, - }, - }), - cacheGenNumberToInject: "1", - expected: true, - }, - { - name: "inconsistency between cacheGenNumber in header and store", - request: &PrometheusRequest{Query: "metric"}, - input: tripperware.Response(&PrometheusResponse{ - Headers: []*tripperware.PrometheusResponseHeader{ - { - Name: ResultsCacheGenNumberHeaderName, - Values: []string{"1", "2"}, - }, - }, - }), - cacheGenNumberToInject: "1", - expected: false, - }, - { - name: "cacheControl header says not to catch and cacheGenNumbers in store and headers have consistency", - request: &PrometheusRequest{Query: "metric"}, - input: tripperware.Response(&PrometheusResponse{ - Headers: []*tripperware.PrometheusResponseHeader{ - { - Name: cacheControlHeader, - Values: []string{noStoreValue}, - }, - { - Name: ResultsCacheGenNumberHeaderName, - Values: []string{"1", "1"}, - }, - }, - }), - cacheGenNumberToInject: "1", - expected: false, - }, // @ modifier on vector selectors. { name: "@ modifier on vector selector, before end, before maxCacheTime", @@ -510,8 +420,7 @@ func TestShouldCache(t *testing.T) { { t.Run(tc.name, func(t *testing.T) { t.Parallel() - ctx := cache.InjectCacheGenNumber(context.Background(), tc.cacheGenNumberToInject) - ret := c.shouldCacheResponse(ctx, tc.request, tc.input, maxCacheTime) + ret := c.shouldCacheResponse(context.Background(), tc.request, tc.input, maxCacheTime) require.Equal(t, tc.expected, ret) }) } @@ -1009,7 +918,6 @@ func TestResultsCache(t *testing.T) { PrometheusResponseExtractor{}, nil, nil, - nil, ) require.NoError(t, err) @@ -1051,7 +959,6 @@ func TestResultsCacheRecent(t *testing.T) { PrometheusResponseExtractor{}, nil, nil, - nil, ) require.NoError(t, err) @@ -1117,7 +1024,6 @@ func TestResultsCacheMaxFreshness(t *testing.T) { PrometheusResponseExtractor{}, nil, nil, - nil, ) require.NoError(t, err) @@ -1155,7 +1061,6 @@ func Test_resultsCache_MissingData(t *testing.T) { PrometheusResponseExtractor{}, nil, nil, - nil, ) require.NoError(t, err) rc := rm.Wrap(nil).(*resultsCache) @@ -1266,7 +1171,6 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) { mockLimits{maxCacheFreshness: 10 * time.Minute}, PrometheusCodec, PrometheusResponseExtractor{}, - nil, tc.shouldCache, nil, ) @@ -1290,14 +1194,3 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) { func toMs(t time.Duration) int64 { return int64(t / time.Millisecond) } - -type mockCacheGenNumberLoader struct { -} - -func newMockCacheGenNumberLoader() CacheGenNumberLoader { - return mockCacheGenNumberLoader{} -} - -func (mockCacheGenNumberLoader) GetResultsCacheGenNumber(tenantIDs []string) string { - return "" -} diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 92f92094c9..aa3d5e0e02 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -41,7 +41,6 @@ import ( "gopkg.in/yaml.v3" "github.com/cortexproject/cortex/pkg/cortexpb" - "github.com/cortexproject/cortex/pkg/purger" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" @@ -141,7 +140,7 @@ func testQueryableFunc(querierTestConfig *querier.TestConfig, reg prometheus.Reg querierTestConfig.Cfg.ActiveQueryTrackerDir = "" overrides, _ := validation.NewOverrides(querier.DefaultLimitsConfig(), nil) - q, _, _ := querier.New(querierTestConfig.Cfg, overrides, querierTestConfig.Distributor, querierTestConfig.Stores, purger.NewNoopTombstonesLoader(), reg, logger) + q, _, _ := querier.New(querierTestConfig.Cfg, overrides, querierTestConfig.Distributor, querierTestConfig.Stores, reg, logger) return func(mint, maxt int64) (storage.Querier, error) { return q.Querier(mint, maxt) } From 411bd4b5999d0d8d6f591195930c8befbf952888 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sun, 3 Dec 2023 18:26:58 -0800 Subject: [PATCH 2/3] fix tests Signed-off-by: Ben Ye --- pkg/querier/querier_test.go | 5 +- pkg/querier/series/series_set_test.go | 101 -------------------------- 2 files changed, 2 insertions(+), 104 deletions(-) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index f230e1dbf9..bc7f43c550 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -31,7 +31,6 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/prom1/storage/metric" - "github.com/cortexproject/cortex/pkg/purger" "github.com/cortexproject/cortex/pkg/querier/batch" "github.com/cortexproject/cortex/pkg/querier/iterators" "github.com/cortexproject/cortex/pkg/tenant" @@ -278,7 +277,7 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) { for _, queriable := range tc.storeQueriables { wQueriables = append(wQueriables, &wrappedSampleAndChunkQueryable{QueryableWithFilter: queriable}) } - queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides, purger.NewNoopTombstonesLoader()) + queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides) opts := promql.EngineOpts{ Logger: log.NewNopLogger(), MaxSamples: 1e6, @@ -487,7 +486,7 @@ func TestLimits(t *testing.T) { overrides, err := validation.NewOverrides(DefaultLimitsConfig(), tc.tenantLimit) require.NoError(t, err) - queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides, purger.NewNoopTombstonesLoader()) + queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides) opts := promql.EngineOpts{ Logger: log.NewNopLogger(), MaxSamples: 1e6, diff --git a/pkg/querier/series/series_set_test.go b/pkg/querier/series/series_set_test.go index 304a761639..7e243a1444 100644 --- a/pkg/querier/series/series_set_test.go +++ b/pkg/querier/series/series_set_test.go @@ -1,13 +1,11 @@ package series import ( - "math/rand" "testing" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/stretchr/testify/require" ) @@ -56,102 +54,3 @@ func TestMatrixToSeriesSetSortsMetricLabels(t *testing.T) { {Name: "g", Value: "h"}, }, l) } - -func TestDeletedSeriesIterator(t *testing.T) { - t.Parallel() - cs := ConcreteSeries{labels: labels.FromStrings("foo", "bar")} - // Insert random stuff from (0, 1000). - for i := 0; i < 1000; i++ { - cs.samples = append(cs.samples, model.SamplePair{Timestamp: model.Time(i), Value: model.SampleValue(rand.Float64())}) - } - - cases := []struct { - r []model.Interval - }{ - {r: []model.Interval{{Start: 1, End: 20}}}, - {r: []model.Interval{{Start: 1, End: 10}, {Start: 12, End: 20}, {Start: 21, End: 23}, {Start: 25, End: 30}}}, - {r: []model.Interval{{Start: 1, End: 10}, {Start: 12, End: 20}, {Start: 20, End: 30}}}, - {r: []model.Interval{{Start: 1, End: 10}, {Start: 12, End: 23}, {Start: 25, End: 30}}}, - {r: []model.Interval{{Start: 1, End: 23}, {Start: 12, End: 20}, {Start: 25, End: 30}}}, - {r: []model.Interval{{Start: 1, End: 23}, {Start: 12, End: 20}, {Start: 25, End: 3000}}}, - {r: []model.Interval{{Start: 0, End: 2000}}}, - {r: []model.Interval{{Start: 500, End: 2000}}}, - {r: []model.Interval{{Start: 0, End: 200}}}, - {r: []model.Interval{{Start: 1000, End: 20000}}}, - } - - for _, c := range cases { - i := int64(-1) - it := NewDeletedSeriesIterator(NewConcreteSeriesIterator(&cs), c.r) - ranges := c.r[:] - - for it.Next() != chunkenc.ValNone { - i++ - for _, tr := range ranges { - if inbound(model.Time(i), tr) { - i = int64(tr.End + 1) - ranges = ranges[1:] - } - } - - require.Equal(t, true, i < 1000) - - ts, v := it.At() - require.Equal(t, int64(cs.samples[i].Timestamp), ts) - require.Equal(t, float64(cs.samples[i].Value), v) - } - - // There has been an extra call to Next(). - i++ - for _, tr := range ranges { - if inbound(model.Time(i), tr) { - i = int64(tr.End + 1) - ranges = ranges[1:] - } - } - - require.Equal(t, true, i >= 1000) - require.NoError(t, it.Err()) - } -} - -func TestDeletedIterator_WithSeek(t *testing.T) { - t.Parallel() - cs := ConcreteSeries{labels: labels.FromStrings("foo", "bar")} - // Insert random stuff from (0, 1000). - for i := 0; i < 1000; i++ { - cs.samples = append(cs.samples, model.SamplePair{Timestamp: model.Time(i), Value: model.SampleValue(rand.Float64())}) - } - - cases := []struct { - r []model.Interval - seek int64 - ok bool - seekedTs int64 - }{ - {r: []model.Interval{{Start: 1, End: 20}}, seek: 1, ok: true, seekedTs: 21}, - {r: []model.Interval{{Start: 1, End: 20}}, seek: 20, ok: true, seekedTs: 21}, - {r: []model.Interval{{Start: 1, End: 20}}, seek: 10, ok: true, seekedTs: 21}, - {r: []model.Interval{{Start: 1, End: 20}}, seek: 999, ok: true, seekedTs: 999}, - {r: []model.Interval{{Start: 1, End: 20}}, seek: 1000, ok: false}, - {r: []model.Interval{{Start: 1, End: 23}, {Start: 24, End: 40}, {Start: 45, End: 3000}}, seek: 1, ok: true, seekedTs: 41}, - {r: []model.Interval{{Start: 5, End: 23}, {Start: 24, End: 40}, {Start: 41, End: 3000}}, seek: 5, ok: false}, - {r: []model.Interval{{Start: 0, End: 2000}}, seek: 10, ok: false}, - {r: []model.Interval{{Start: 500, End: 2000}}, seek: 10, ok: true, seekedTs: 10}, - {r: []model.Interval{{Start: 500, End: 2000}}, seek: 501, ok: false}, - } - - for _, c := range cases { - it := NewDeletedSeriesIterator(NewConcreteSeriesIterator(&cs), c.r) - - require.NotEqual(t, c.ok, it.Seek(c.seek), chunkenc.ValNone) - if c.ok { - ts, _ := it.At() - require.Equal(t, c.seekedTs, ts) - } - } -} - -func inbound(t model.Time, interval model.Interval) bool { - return interval.Start <= t && t <= interval.End -} From 6ecad8a983302c46ab89ccb31176f621c84ecff5 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sun, 3 Dec 2023 18:39:02 -0800 Subject: [PATCH 3/3] cleanup unused modules Signed-off-by: Ben Ye --- pkg/cortex/modules.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 6526531b2d..08f7dba55f 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -74,7 +74,6 @@ const ( StoreQueryable string = "store-queryable" QueryFrontend string = "query-frontend" QueryFrontendTripperware string = "query-frontend-tripperware" - DeleteRequestsStore string = "delete-requests-store" RulerStorage string = "ruler-storage" Ruler string = "ruler" Configs string = "configs" @@ -768,28 +767,28 @@ func (t *Cortex) setupModuleManager() error { OverridesExporter: {RuntimeConfig}, Distributor: {DistributorService, API}, DistributorService: {Ring, Overrides}, - Ingester: {IngesterService, Overrides, DeleteRequestsStore, API}, + Ingester: {IngesterService, Overrides, API}, IngesterService: {Overrides, RuntimeConfig, MemberlistKV}, - Flusher: {Overrides, DeleteRequestsStore, API}, - Queryable: {Overrides, DistributorService, Overrides, DeleteRequestsStore, Ring, API, StoreQueryable, MemberlistKV}, + Flusher: {Overrides, API}, + Queryable: {Overrides, DistributorService, Overrides, Ring, API, StoreQueryable, MemberlistKV}, Querier: {TenantFederation}, - StoreQueryable: {Overrides, Overrides, DeleteRequestsStore, MemberlistKV}, - QueryFrontendTripperware: {API, Overrides, DeleteRequestsStore}, + StoreQueryable: {Overrides, Overrides, MemberlistKV}, + QueryFrontendTripperware: {API, Overrides}, QueryFrontend: {QueryFrontendTripperware}, QueryScheduler: {API, Overrides}, - Ruler: {DistributorService, Overrides, DeleteRequestsStore, StoreQueryable, RulerStorage}, + Ruler: {DistributorService, Overrides, StoreQueryable, RulerStorage}, RulerStorage: {Overrides}, Configs: {API}, AlertManager: {API, MemberlistKV, Overrides}, Compactor: {API, MemberlistKV, Overrides}, StoreGateway: {API, Overrides, MemberlistKV}, - TenantDeletion: {API, Overrides, DeleteRequestsStore}, + TenantDeletion: {API, Overrides}, Purger: {TenantDeletion}, TenantFederation: {Queryable}, All: {QueryFrontend, Querier, Ingester, Distributor, Purger, StoreGateway, Ruler}, } if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil { - deps[Ruler] = []string{Overrides, DeleteRequestsStore, RulerStorage} + deps[Ruler] = []string{Overrides, RulerStorage} } for mod, targets := range deps { if err := mm.AddDependency(mod, targets...); err != nil {