Optionally wait for the query-frontend to start up before rejecting requests (#6621)

* Backoff and retry requests received while the query-frontend is starting up

* Add changelog entry.

* Address PR feedback: disable backoff by default, rename flag, make `isTerminalError` resilient to other states added in the future

* Make formatting of newQueryTripperware consistent.

* Check frontend is running earlier, to avoid unnecessary work and avoid other middleware running while frontend is in an inconsistent state

* Refactor to use `service.AwaitRunning()`

* Update changelog entry to reflect new behaviour and config option name

* Address PR feedback: `ie.` -> `i.e.`

* Address PR feedback: pass interface, not function

* Simplify code.

* Apply same logic to cardinality and series endpoints as well.

* Address PR feedback: remove implicit circular dependency.

* Update outdated comment.

* Make method and trace span names consistent
charleskorn authored Nov 20, 2023
1 parent 880d23f commit a27952a
Showing 11 changed files with 206 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -82,6 +82,7 @@
* [ENHANCEMENT] Server: Add `-server.http-log-closed-connections-without-response-enabled` option to log details about connections to HTTP server that were closed before any data was sent back. This can happen if client doesn't manage to send complete HTTP headers before timeout. #6612
* [ENHANCEMENT] Query-frontend: include length of query, time since the earliest and latest points of a query, time since the earliest and latest points of a query, cached/uncached bytes in "query stats" logs. Time parameters (start/end/time) are always formatted as RFC3339 now. #6473 #6477
* [ENHANCEMENT] Distributor: added support for reducing the resolution of native histogram samples upon ingestion if the sample has too many buckets compared to `-validation.max-native-histogram-buckets`. This is enabled by default and can be turned off by setting `-validation.reduce-native-histogram-over-max-buckets` to `false`. #6535
* [ENHANCEMENT] Query-frontend: optionally wait for the frontend to complete startup if requests are received while the frontend is still starting. Disabled by default, set `-query-frontend.not-running-timeout` to a non-zero value to enable. #6621
* [ENHANCEMENT] Distributor: Include source IPs in OTLP push handler logs. #6652
* [ENHANCEMENT] Query-frontend: return clearer error message when a query request is received while shutting down. #6675
* [BUGFIX] Distributor: return server overload error in the event of exceeding the ingestion rate limit. #6549
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -5519,6 +5519,17 @@
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "not_running_timeout",
"required": false,
"desc": "Maximum time to wait for the query-frontend to become ready before rejecting requests received before the frontend was ready. 0 to disable (i.e. fail immediately if a request is received while the frontend is still starting up)",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "query-frontend.not-running-timeout",
"fieldType": "duration",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "parallelize_shardable_queries",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1851,6 +1851,8 @@ Usage of ./cmd/mimir/mimir:
Maximum number of retries for a single request; beyond this, the downstream error is returned. (default 5)
-query-frontend.max-total-query-length duration
Limit the total query time range (end - start time). This limit is enforced in the query-frontend on the received query.
-query-frontend.not-running-timeout duration
[experimental] Maximum time to wait for the query-frontend to become ready before rejecting requests received before the frontend was ready. 0 to disable (i.e. fail immediately if a request is received while the frontend is still starting up)
-query-frontend.parallelize-shardable-queries
True to enable query sharding.
-query-frontend.querier-forget-delay duration
1 change: 1 addition & 0 deletions docs/sources/mimir/configure/about-versioning.md
@@ -130,6 +130,7 @@ The following features are currently experimental:
- Lower TTL for cache entries overlapping the out-of-order samples ingestion window (re-using `-ingester.out-of-order-allowance` from ingesters)
- Use of Redis cache backend (`-query-frontend.results-cache.backend=redis`)
- Query blocking on a per-tenant basis (configured with the limit `blocked_queries`)
- Wait for the query-frontend to complete startup if a query request is received while it is starting up (`-query-frontend.not-running-timeout`)
- Query-scheduler
- `-query-scheduler.querier-forget-delay`
- Store-gateway
@@ -1446,6 +1446,13 @@ results_cache:
# CLI flag: -query-frontend.max-retries-per-request
[max_retries: <int> | default = 5]
# (experimental) Maximum time to wait for the query-frontend to become ready
# before rejecting requests received before the frontend was ready. 0 to disable
# (i.e. fail immediately if a request is received while the frontend is still
# starting up)
# CLI flag: -query-frontend.not-running-timeout
[not_running_timeout: <duration> | default = 0s]
# True to enable query sharding.
# CLI flag: -query-frontend.parallelize-shardable-queries
[parallelize_shardable_queries: <boolean> | default = false]
27 changes: 14 additions & 13 deletions pkg/frontend/querymiddleware/roundtrip.go
@@ -50,11 +50,12 @@ type Config struct {
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval" category:"advanced"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
- CacheResults bool `yaml:"cache_results"`
- MaxRetries int `yaml:"max_retries" category:"advanced"`
- ShardedQueries bool `yaml:"parallelize_shardable_queries"`
- DeprecatedCacheUnalignedRequests bool `yaml:"cache_unaligned_requests" category:"advanced" doc:"hidden"` // Deprecated: Deprecated in Mimir 2.10.0, remove in Mimir 2.12.0 (https://github.com/grafana/mimir/issues/5253)
- TargetSeriesPerShard uint64 `yaml:"query_sharding_target_series_per_shard" category:"advanced"`
+ CacheResults bool `yaml:"cache_results"`
+ MaxRetries int `yaml:"max_retries" category:"advanced"`
+ NotRunningTimeout time.Duration `yaml:"not_running_timeout" category:"experimental"`
+ ShardedQueries bool `yaml:"parallelize_shardable_queries"`
+ DeprecatedCacheUnalignedRequests bool `yaml:"cache_unaligned_requests" category:"advanced" doc:"hidden"` // Deprecated: Deprecated in Mimir 2.10.0, remove in Mimir 2.12.0 (https://github.com/grafana/mimir/issues/5253)
+ TargetSeriesPerShard uint64 `yaml:"query_sharding_target_series_per_shard" category:"advanced"`

// CacheSplitter allows to inject a CacheSplitter to use for generating cache keys.
// If nil, the querymiddleware package uses a ConstSplitter with SplitQueriesByInterval.
@@ -66,6 +67,7 @@ type Config struct {
// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, "query-frontend.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
f.DurationVar(&cfg.NotRunningTimeout, "query-frontend.not-running-timeout", 0, "Maximum time to wait for the query-frontend to become ready before rejecting requests received before the frontend was ready. 0 to disable (i.e. fail immediately if a request is received while the frontend is still starting up)")
f.DurationVar(&cfg.SplitQueriesByInterval, "query-frontend.split-queries-by-interval", 24*time.Hour, "Split range queries by an interval and execute in parallel. You should use a multiple of 24 hours to optimize querying blocks. 0 to disable it.")
f.BoolVar(&cfg.AlignQueriesWithStep, "query-frontend.align-queries-with-step", false, "Mutate incoming queries to align their start and end with their step.")
f.BoolVar(&cfg.CacheResults, "query-frontend.cache-results", false, "Cache query results.")
@@ -249,13 +251,11 @@ func newQueryTripperware(
))
}

- queryInstantMiddleware := []Middleware{newLimitsMiddleware(limits, log)}
-
- queryInstantMiddleware = append(
- queryInstantMiddleware,
+ queryInstantMiddleware := []Middleware{
+ newLimitsMiddleware(limits, log),
  newSplitInstantQueryByIntervalMiddleware(limits, log, engine, registerer),
  queryBlockerMiddleware,
- )
+ }

if cfg.ShardedQueries {
// Inject the cardinality estimation middleware after time-based splitting and
@@ -283,7 +283,8 @@ func newQueryTripperware(
registerer,
)

- queryRangeMiddleware = append(queryRangeMiddleware,
+ queryRangeMiddleware = append(
+ queryRangeMiddleware,
newInstrumentMiddleware("querysharding", metrics),
queryshardingMiddleware,
)
@@ -311,8 +312,8 @@ func newQueryTripperware(
labels := next

if cfg.CacheResults {
- cardinality = newCardinalityQueryCacheRoundTripper(c, limits, next, log, registerer)
- labels = newLabelsQueryCacheRoundTripper(c, limits, next, log, registerer)
+ cardinality = newCardinalityQueryCacheRoundTripper(c, limits, cardinality, log, registerer)
+ labels = newLabelsQueryCacheRoundTripper(c, limits, labels, log, registerer)
}

return RoundTripFunc(func(r *http.Request) (*http.Response, error) {
73 changes: 73 additions & 0 deletions pkg/frontend/querymiddleware/running.go
@@ -0,0 +1,73 @@
// SPDX-License-Identifier: AGPL-3.0-only

package querymiddleware

import (
"context"
"fmt"
"net/http"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/opentracing/opentracing-go/ext"

apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/util/spanlogger"
)

func NewFrontendRunningRoundTripper(next http.RoundTripper, service services.Service, timeout time.Duration, log log.Logger) http.RoundTripper {
return &frontendRunningRoundTripper{
next: next,
service: service,
timeout: timeout,
log: log,
}
}

type frontendRunningRoundTripper struct {
next http.RoundTripper
service services.Service
timeout time.Duration
log log.Logger
}

func (f *frontendRunningRoundTripper) RoundTrip(request *http.Request) (*http.Response, error) {
if err := awaitQueryFrontendServiceRunning(request.Context(), f.service, f.timeout, f.log); err != nil {
return nil, apierror.New(apierror.TypeUnavailable, err.Error())
}

return f.next.RoundTrip(request)
}

// This method is not on frontendRunningRoundTripper to make it easier to test this logic.
func awaitQueryFrontendServiceRunning(ctx context.Context, service services.Service, timeout time.Duration, log log.Logger) error {
if state := service.State(); state == services.Running {
// Fast path: frontend is already running, nothing more to do.
return nil
} else if timeout == 0 {
// If waiting for the frontend to be ready is disabled by config, and it's not ready, abort now.
return fmt.Errorf("frontend not running: %v", state)
}

spanLog, ctx := spanlogger.NewWithLogger(ctx, log, "awaitQueryFrontendServiceRunning")
defer spanLog.Finish()

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

if err := service.AwaitRunning(ctx); err != nil {
ext.Error.Set(spanLog.Span, true)

if ctx.Err() != nil {
level.Warn(spanLog).Log("msg", "frontend not running, timed out waiting for it to be running", "state", service.State(), "timeout", timeout)
return fmt.Errorf("frontend not running (is %v), timed out waiting for it to be running after %v", service.State(), timeout)
}

level.Warn(spanLog).Log("msg", "failed waiting for frontend to be running", "err", err)
return fmt.Errorf("frontend not running: %w", err)
}

return nil
}
85 changes: 85 additions & 0 deletions pkg/frontend/querymiddleware/running_test.go
@@ -0,0 +1,85 @@
// SPDX-License-Identifier: AGPL-3.0-only

package querymiddleware

import (
"context"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/services"
"github.com/stretchr/testify/require"
)

func TestAwaitQueryFrontendServiceRunning_ServiceIsReady(t *testing.T) {
run := func(ctx context.Context) error {
<-ctx.Done()
return nil
}

service := services.NewBasicService(nil, run, nil)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), service))
defer func() { require.NoError(t, services.StopAndAwaitTerminated(context.Background(), service)) }()

err := awaitQueryFrontendServiceRunning(context.Background(), service, time.Second, log.NewNopLogger())
require.NoError(t, err)
}

func TestAwaitQueryFrontendServiceRunning_ServiceIsNotReadyWaitDisabled(t *testing.T) {
startChan := make(chan struct{})
start := func(ctx context.Context) error {
<-startChan
return nil
}

service := services.NewBasicService(start, nil, nil)
require.NoError(t, service.StartAsync(context.Background()))
defer func() { require.NoError(t, services.StopAndAwaitTerminated(context.Background(), service)) }()

err := awaitQueryFrontendServiceRunning(context.Background(), service, 0, log.NewNopLogger())
require.EqualError(t, err, "frontend not running: Starting")

close(startChan)
}

func TestAwaitQueryFrontendServiceRunning_ServiceIsNotReadyInitially(t *testing.T) {
startChan := make(chan struct{})
start := func(ctx context.Context) error {
<-startChan
return nil
}
run := func(ctx context.Context) error {
<-ctx.Done()
return nil
}

service := services.NewBasicService(start, run, nil)
require.NoError(t, service.StartAsync(context.Background()))
defer func() { require.NoError(t, services.StopAndAwaitTerminated(context.Background(), service)) }()

go func() {
time.Sleep(500 * time.Millisecond)
close(startChan)
}()

err := awaitQueryFrontendServiceRunning(context.Background(), service, time.Second, log.NewNopLogger())
require.NoError(t, err)
}

func TestAwaitQueryFrontendServiceRunning_ServiceIsNotReadyAfterTimeout(t *testing.T) {
serviceChan := make(chan struct{})
start := func(ctx context.Context) error {
<-serviceChan
return nil
}

service := services.NewBasicService(start, nil, nil)
require.NoError(t, service.StartAsync(context.Background()))
defer func() { require.NoError(t, services.StopAndAwaitTerminated(context.Background(), service)) }()

err := awaitQueryFrontendServiceRunning(context.Background(), service, 100*time.Millisecond, log.NewNopLogger())
require.EqualError(t, err, "frontend not running (is Starting), timed out waiting for it to be running after 100ms")

close(serviceChan)
}
1 change: 1 addition & 0 deletions pkg/frontend/v2/frontend.go
@@ -187,6 +187,7 @@ func (f *Frontend) stopping(_ error) error {
// RoundTripGRPC round trips a proto (instead of an HTTP request).
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
if s := f.State(); s != services.Running {
// This should never happen: requests should be blocked by frontendRunningRoundTripper before they get here.
return nil, fmt.Errorf("frontend not running: %v", s)
}

6 changes: 3 additions & 3 deletions pkg/mimir/mimir.go
@@ -684,7 +684,7 @@ type Mimir struct {
Distributor *distributor.Distributor
Ingester *ingester.Ingester
Flusher *flusher.Flusher
- Frontend *frontendv1.Frontend
+ FrontendV1 *frontendv1.Frontend
RuntimeConfig *runtimeconfig.Manager
QuerierQueryable prom_storage.SampleAndChunkQueryable
ExemplarQueryable prom_storage.ExemplarQueryable
@@ -944,8 +944,8 @@ func (t *Mimir) readyHandler(sm *services.Manager, shutdownRequested *atomic.Boo

// Query Frontend has a special check that makes sure that a querier is attached before it signals
// itself as ready
- if t.Frontend != nil {
- if err := t.Frontend.CheckReady(r.Context()); err != nil {
+ if t.FrontendV1 != nil {
+ if err := t.FrontendV1.CheckReady(r.Context()); err != nil {
http.Error(w, "Query Frontend not ready: "+err.Error(), http.StatusServiceUnavailable)
return
}
15 changes: 8 additions & 7 deletions pkg/mimir/modules.go
@@ -700,22 +700,23 @@ func (t *Mimir) initQueryFrontend() (serv services.Service, err error) {
return nil, err
}

- // Wrap roundtripper into Tripperware.
- roundTripper = t.QueryFrontendTripperware(roundTripper)
-
- handler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util_log.Logger, t.Registerer, t.ActivityTracker)
- t.API.RegisterQueryFrontendHandler(handler, t.BuildInfoHandler)
-
  var frontendSvc services.Service
  if frontendV1 != nil {
  t.API.RegisterQueryFrontend1(frontendV1)
- t.Frontend = frontendV1
+ t.FrontendV1 = frontendV1
  frontendSvc = frontendV1
  } else if frontendV2 != nil {
  t.API.RegisterQueryFrontend2(frontendV2)
  frontendSvc = frontendV2
  }

+ // Wrap roundtripper into Tripperware and then wrap this with the roundtripper that checks that the frontend is ready to receive requests.
+ roundTripper = t.QueryFrontendTripperware(roundTripper)
+ roundTripper = querymiddleware.NewFrontendRunningRoundTripper(roundTripper, frontendSvc, t.Cfg.Frontend.QueryMiddleware.NotRunningTimeout, util_log.Logger)
+
+ handler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util_log.Logger, t.Registerer, t.ActivityTracker)
+ t.API.RegisterQueryFrontendHandler(handler, t.BuildInfoHandler)
+
w := services.NewFailureWatcher()
return services.NewBasicService(func(_ context.Context) error {
if frontendSvc != nil {