diff --git a/CHANGELOG.md b/CHANGELOG.md index c3b8626b32..9389500ed5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ * [CHANGE] DDBKV: Change metric name from `dynamodb_kv_read_capacity_total` to `dynamodb_kv_consumed_capacity_total` and include Delete, Put, Batch dimension. #5487 * [CHANGE] Compactor: Adding the userId on the compact dir path. #5524 * [CHANGE] Ingester: Remove deprecated ingester metrics. #5472 +* [CHANGE] Query Frontend: Expose `-querier.max-subquery-steps` to configure subquery max steps check. By default, the limit is set to 0, which is disabled. #5656 * [FEATURE] Store Gateway: Implementing multi level index cache. #5451 * [FEATURE] Ruler: Add support for disabling rule groups. #5521 * [FEATURE] Support object storage backends for runtime configuration file. #5292 @@ -102,6 +103,7 @@ * [BUGFIX] DDBKV: When no change detected in ring, retry the CAS until there is change. #5502 * [BUGFIX] Fix bug on objstore when configured to use S3 fips endpoints. #5540 * [BUGFIX] Ruler: Fix bug on ruler where a failure to load a single RuleGroup would prevent rulers to sync all RuleGroup. #5563 +* [BUGFIX] Query Frontend: Fix query string being omitted in query stats log. #5655 ## 1.15.3 2023-06-22 diff --git a/VERSION b/VERSION index b6ea925546..f389f4bfea 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.16.0-rc.0 +1.16.0-rc.1 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 2e9ec58de4..cd79d59a55 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -157,6 +157,11 @@ querier: # CLI flag: -querier.default-evaluation-interval [default_evaluation_interval: | default = 1m] + # Max number of steps allowed for every subquery expression in query. Number + # of steps is calculated using subquery range / step. A value > 0 enables it. + # CLI flag: -querier.max-subquery-steps + [max_subquery_steps: | default = 0] + # Active query tracker monitors active queries, and writes them to the file in # given directory. If Cortex discovers any queries in this log during startup, # it will log them to the log file. Setting to empty value disables active diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 5103df9a38..64e15b1ed5 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3420,6 +3420,11 @@ The `querier_config` configures the Cortex querier. # CLI flag: -querier.default-evaluation-interval [default_evaluation_interval: | default = 1m] +# Max number of steps allowed for every subquery expression in query. Number of +# steps is calculated using subquery range / step. A value > 0 enables it. +# CLI flag: -querier.max-subquery-steps +[max_subquery_steps: | default = 0] + # Active query tracker monitors active queries, and writes them to the file in # given directory. If Cortex discovers any queries in this log during startup, # it will log them to the log file. Setting to empty value disables active query diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 0dbbadeb10..5b3c0029c0 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -220,6 +220,11 @@ func TestQueryFrontendSubQueryStepSize(t *testing.T) { minio := e2edb.NewMinio(9000, BlocksStorageFlags()["-blocks-storage.s3.bucket-name"]) require.NoError(t, s.StartAndWaitReady(minio)) + + // Enable subquery step size check. + flags = mergeFlags(e2e.EmptyFlags(), map[string]string{ + "-querier.max-subquery-steps": "11000", + }) return cortexConfigFile, flags }, }) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 64cce59870..7858483c68 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -486,6 +486,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro t.Overrides, queryAnalyzer, t.Cfg.Querier.DefaultEvaluationInterval, + t.Cfg.Querier.MaxSubQuerySteps, ) return services.NewIdleService(nil, func(_ error) error { diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 080712056e..36263c2c5d 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -98,7 +98,7 @@ type Handler struct { } // NewHandler creates a new frontend handler. -func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) http.Handler { +func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) *Handler { h := &Handler{ cfg: cfg, log: log, @@ -407,17 +407,17 @@ func (f *Handler) parseRequestQueryString(r *http.Request, bodyBuf bytes.Buffer) } func formatQueryString(queryString url.Values) (fields []interface{}) { - var queryFields []string + var queryFields []interface{} for k, v := range queryString { // If `query` or `match[]` field exists, we always put it as the last field. if k == "query" || k == "match[]" { - queryFields = []string{fmt.Sprintf("param_%s", k), strings.Join(v, ",")} + queryFields = []interface{}{fmt.Sprintf("param_%s", k), strings.Join(v, ",")} continue } fields = append(fields, fmt.Sprintf("param_%s", k), strings.Join(v, ",")) } if len(queryFields) > 0 { - fields = append(fields, queryFields) + fields = append(fields, queryFields...) } return fields } diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index d075f5ec4c..7cf4a4bd34 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -1,12 +1,15 @@ package transport import ( + "bytes" "context" "io" "net/http" "net/http/httptest" + "net/url" "strings" "testing" + "time" "github.com/go-kit/log" "github.com/pkg/errors" @@ -16,6 +19,8 @@ import ( "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" + + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" ) type roundTripperFunc func(*http.Request) (*http.Response, error) @@ -274,8 +279,44 @@ func TestHandler_ServeHTTP(t *testing.T) { assert.Equal(t, tt.expectedMetrics, count) if tt.additionalMetricsCheckFunc != nil { - tt.additionalMetricsCheckFunc(handler.(*Handler)) + tt.additionalMetricsCheckFunc(handler) } }) } } + +func TestReportQueryStatsFormat(t *testing.T) { + outputBuf := bytes.NewBuffer(nil) + logger := log.NewSyncLogger(log.NewLogfmtLogger(outputBuf)) + handler := NewHandler(HandlerConfig{QueryStatsEnabled: true}, http.DefaultTransport, logger, nil) + + userID := "fake" + queryString := url.Values(map[string][]string{"query": {"up"}}) + req, err := http.NewRequest(http.MethodGet, "http://localhost:8080/prometheus/api/v1/query", nil) + require.NoError(t, err) + req.Header = http.Header{ + "User-Agent": []string{"Grafana"}, + } + resp := &http.Response{ + ContentLength: 1000, + } + stats := &querier_stats.QueryStats{ + Stats: querier_stats.Stats{ + WallTime: 3 * time.Second, + FetchedSeriesCount: 100, + FetchedChunksCount: 200, + FetchedSamplesCount: 300, + FetchedChunkBytes: 1024, + FetchedDataBytes: 2048, + }, + } + responseErr := errors.New("foo_err") + handler.reportQueryStats(req, userID, queryString, time.Second, stats, responseErr, http.StatusOK, resp) + + data, err := io.ReadAll(outputBuf) + require.NoError(t, err) + + expectedLog := `level=error msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 status_code=200 response_size=1000 query_length=2 user_agent=Grafana error=foo_err param_query=up +` + require.Equal(t, expectedLog, string(data)) +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index df13fb7ce6..d66697674d 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -64,6 +64,9 @@ type Config struct { // step if not specified. DefaultEvaluationInterval time.Duration `yaml:"default_evaluation_interval"` + // Limit of number of steps allowed for every subquery expression in a query. + MaxSubQuerySteps int64 `yaml:"max_subquery_steps"` + // Directory for ActiveQueryTracker. If empty, ActiveQueryTracker will be disabled and MaxConcurrent will not be applied (!). // ActiveQueryTracker logs queries that were active during the last crash, but logs them on the next startup. // However, we need to use active query tracker, otherwise we cannot limit Max Concurrent queries in the PromQL @@ -114,6 +117,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.") f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).") f.BoolVar(&cfg.ThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.") + f.Int64Var(&cfg.MaxSubQuerySteps, "querier.max-subquery-steps", 0, "Max number of steps allowed for every subquery expression in query. Number of steps is calculated using subquery range / step. A value > 0 enables it.") } // Validate the config diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go index 5ade2abd52..6276be72bd 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go @@ -74,6 +74,7 @@ func TestRoundTrip(t *testing.T) { nil, qa, time.Minute, + 0, ) for i, tc := range []struct { diff --git a/pkg/querier/tripperware/roundtrip.go b/pkg/querier/tripperware/roundtrip.go index 93fa1234cd..6aefe4ccec 100644 --- a/pkg/querier/tripperware/roundtrip.go +++ b/pkg/querier/tripperware/roundtrip.go @@ -103,6 +103,7 @@ func NewQueryTripperware( limits Limits, queryAnalyzer querysharding.Analyzer, defaultSubQueryInterval time.Duration, + maxSubQuerySteps int64, ) Tripperware { // Per tenant query metrics. queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ @@ -145,10 +146,10 @@ func NewQueryTripperware( activeUsers.UpdateUserTimestamp(userStr, time.Now()) queriesPerTenant.WithLabelValues(op, userStr).Inc() - if isQuery || isQueryRange { + if maxSubQuerySteps > 0 && (isQuery || isQueryRange) { query := r.FormValue("query") // Check subquery step size. - if err := SubQueryStepSizeCheck(query, defaultSubQueryInterval, MaxStep); err != nil { + if err := SubQueryStepSizeCheck(query, defaultSubQueryInterval, maxSubQuerySteps); err != nil { return nil, err } } diff --git a/pkg/querier/tripperware/roundtrip_test.go b/pkg/querier/tripperware/roundtrip_test.go index 8497491ba6..e52514ee8d 100644 --- a/pkg/querier/tripperware/roundtrip_test.go +++ b/pkg/querier/tripperware/roundtrip_test.go @@ -109,46 +109,69 @@ func TestRoundTrip(t *testing.T) { path, expectedBody string expectedErr error limits Limits + maxSubQuerySteps int64 }{ { - path: "/foo", - expectedBody: "bar", - limits: defaultOverrides, + path: "/foo", + expectedBody: "bar", + limits: defaultOverrides, + maxSubQuerySteps: 11000, }, { - path: queryExemplar, - expectedBody: "bar", - limits: defaultOverrides, + path: queryExemplar, + expectedBody: "bar", + limits: defaultOverrides, + maxSubQuerySteps: 11000, }, { - path: queryRange, - expectedBody: responseBody, - limits: defaultOverrides, + path: queryRange, + expectedBody: responseBody, + limits: defaultOverrides, + maxSubQuerySteps: 11000, }, { - path: query, - expectedBody: "bar", - limits: defaultOverrides, + path: query, + expectedBody: "bar", + limits: defaultOverrides, + maxSubQuerySteps: 11000, }, { - path: queryNonShardable, - expectedBody: "bar", - limits: defaultOverrides, + path: queryNonShardable, + expectedBody: "bar", + limits: defaultOverrides, + maxSubQuerySteps: 11000, }, { - path: queryNonShardable, - expectedBody: "bar", - limits: shardingOverrides, + path: queryNonShardable, + expectedBody: "bar", + limits: shardingOverrides, + maxSubQuerySteps: 11000, }, { - path: query, - expectedBody: responseBody, - limits: shardingOverrides, + path: query, + expectedBody: responseBody, + limits: shardingOverrides, + maxSubQuerySteps: 11000, }, + // Shouldn't hit subquery step limit because max steps is set to 0 so this check is disabled. { - path: querySubqueryStepSizeTooSmall, - expectedErr: httpgrpc.Errorf(http.StatusBadRequest, ErrSubQueryStepTooSmall, 11000), - limits: defaultOverrides, + path: querySubqueryStepSizeTooSmall, + expectedBody: "bar", + limits: defaultOverrides, + maxSubQuerySteps: 0, + }, + // Shouldn't hit subquery step limit because max steps is higher, which is 100K. + { + path: querySubqueryStepSizeTooSmall, + expectedBody: "bar", + limits: defaultOverrides, + maxSubQuerySteps: 100000, + }, + { + path: querySubqueryStepSizeTooSmall, + expectedErr: httpgrpc.Errorf(http.StatusBadRequest, ErrSubQueryStepTooSmall, 11000), + limits: defaultOverrides, + maxSubQuerySteps: 11000, }, } { t.Run(tc.path, func(t *testing.T) { @@ -177,6 +200,7 @@ func TestRoundTrip(t *testing.T) { tc.limits, querysharding.NewQueryAnalyzer(), time.Minute, + tc.maxSubQuerySteps, ) resp, err := tw(downstream).RoundTrip(req) if tc.expectedErr == nil {