Refactor querier worker to handle both query-frontend and query-scheduler (cortexproject#3457)


Signed-off-by: Peter Štibraný <[email protected]>
pstibrany authored Nov 4, 2020
1 parent e69d628 commit eba13a8
Showing 19 changed files with 770 additions and 1,096 deletions.
10 changes: 7 additions & 3 deletions docs/configuration/arguments.md
@@ -30,19 +30,23 @@ Duration arguments should be specified with a unit like `5s` or `3h`. Valid time

Maximum number of samples a single query can load into memory, to avoid blowing up on enormous queries.

The next three options only apply when the querier is used together with the Query Frontend:
The next three options only apply when the querier is used together with the Query Frontend or Query Scheduler:

- `-querier.frontend-address`

Address of query frontend service, used by workers to find the frontend which will give them queries to execute.

- `-querier.scheduler-address`

Address of query scheduler service, used by workers to find the scheduler which will give them queries to execute. If set, `-querier.frontend-address` is ignored, and querier will use query scheduler.

- `-querier.dns-lookup-period`

How often the workers will query DNS to re-check where the frontend is.
How often the workers will query DNS to re-check where the query frontend or query scheduler is.

- `-querier.worker-parallelism`

Number of simultaneous queries to process, per query frontend.
Number of simultaneous queries to process, per query frontend or scheduler.
See note on `-querier.max-concurrent`

- `-querier.worker-match-max-concurrent`
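The precedence rule described for `-querier.scheduler-address` can be illustrated with a minimal Go sketch. The helper `workerTarget` and the example addresses are hypothetical and are not part of Cortex; the sketch only assumes what the documentation states, namely that a non-empty scheduler address causes the frontend address to be ignored.

```go
package main

import "fmt"

// workerTarget is a hypothetical helper (not a Cortex function). It mirrors the
// documented rule: if the scheduler address is set, the frontend address is ignored.
func workerTarget(frontendAddr, schedulerAddr string) (kind, addr string) {
	switch {
	case schedulerAddr != "":
		return "query-scheduler", schedulerAddr
	case frontendAddr != "":
		return "query-frontend", frontendAddr
	default:
		// Neither address is configured.
		return "", ""
	}
}

func main() {
	// Both addresses are set, so the scheduler is expected to be chosen.
	kind, addr := workerTarget("frontend:9095", "scheduler:9095")
	fmt.Println(kind, addr)
}
```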
33 changes: 15 additions & 18 deletions docs/configuration/config-file-reference.md
@@ -111,7 +111,8 @@ api:
[limits: <limits_config>]

# The frontend_worker_config configures the worker - running within the Cortex
# querier - picking up and executing queries enqueued by the query-frontend.
# querier - picking up and executing queries enqueued by the query-frontend or
# query-scheduler.
[frontend_worker: <frontend_worker_config>]

# The query_frontend_config configures the Cortex query-frontend.
@@ -2607,7 +2608,7 @@ grpc_client_config:

### `frontend_worker_config`

The `frontend_worker_config` configures the worker - running within the Cortex querier - picking up and executing queries enqueued by the query-frontend.
The `frontend_worker_config` configures the worker - running within the Cortex querier - picking up and executing queries enqueued by the query-frontend or query-scheduler.

```yaml
# Address of query frontend service, in host:port format. If
@@ -2617,7 +2618,18 @@ The `frontend_worker_config` configures the worker - running within the Cortex q
# CLI flag: -querier.frontend-address
[frontend_address: <string> | default = ""]
# Number of simultaneous queries to process per query frontend.
# Hostname (and port) of scheduler that querier will periodically resolve,
# connect to and receive queries from. If set, takes precedence over
# -querier.frontend-address.
# CLI flag: -querier.scheduler-address
[scheduler_address: <string> | default = ""]
# How often to query DNS for query-frontend or query-scheduler address.
# CLI flag: -querier.dns-lookup-period
[dns_lookup_duration: <duration> | default = 10s]
# Number of simultaneous queries to process per query-frontend or
# query-scheduler.
# CLI flag: -querier.worker-parallelism
[parallelism: <int> | default = 10]
@@ -2626,10 +2638,6 @@ The `frontend_worker_config` configures the worker - running within the Cortex q
# CLI flag: -querier.worker-match-max-concurrent
[match_max_concurrent: <boolean> | default = false]
# How often to query DNS.
# CLI flag: -querier.dns-lookup-period
[dns_lookup_duration: <duration> | default = 10s]
# Querier ID, sent to frontend service to identify requests from the same
# querier. Defaults to hostname.
# CLI flag: -querier.id
@@ -2697,17 +2705,6 @@ grpc_client_config:
# Skip validating server certificate.
# CLI flag: -querier.frontend-client.tls-insecure-skip-verify
[tls_insecure_skip_verify: <boolean> | default = false]
# Hostname (and port) of scheduler that querier will periodically resolve,
# connect to and receive queries from. If set, takes precedence over
# -querier.frontend-address.
# CLI flag: -querier.scheduler-address
[scheduler_address: <string> | default = ""]
# How often to resolve the scheduler-address, in order to look for new
# query-scheduler instances.
# CLI flag: -querier.scheduler-dns-lookup-period
[scheduler_dns_lookup_period: <duration> | default = 10s]
```

### `etcd_config`
2 changes: 0 additions & 2 deletions integration/backward_compatibility_test.go
@@ -37,15 +37,13 @@ func preCortex14Flags(flags map[string]string) map[string]string {
"-store-gateway.sharding-ring.replication-factor": "",
// Query-scheduler has been introduced in 1.6.0
"-frontend.scheduler-dns-lookup-period": "",
"-querier.scheduler-dns-lookup-period": "",
})
}

func preCortex16Flags(flags map[string]string) map[string]string {
return e2e.MergeFlagsWithoutRemovingEmpty(flags, map[string]string{
// Query-scheduler has been introduced in 1.6.0
"-frontend.scheduler-dns-lookup-period": "",
"-querier.scheduler-dns-lookup-period": "",
})
}

4 changes: 2 additions & 2 deletions integration/e2ecortex/services.go
@@ -82,8 +82,8 @@ func NewQuerierWithConfigFile(name, consulAddress, configFile string, flags map[
"-querier.frontend-client.backoff-max-period": "100ms",
"-querier.frontend-client.backoff-retries": "1",
"-querier.worker-parallelism": "1",
// Quickly detect query-scheduler when running it.
"-querier.scheduler-dns-lookup-period": "1s",
// Quickly detect query-frontend and query-scheduler when running it.
"-querier.dns-lookup-period": "1s",
// Store-gateway ring backend.
"-store-gateway.sharding-enabled": "true",
"-store-gateway.sharding-ring.store": "consul",
42 changes: 21 additions & 21 deletions pkg/cortex/cortex.go
@@ -80,27 +80,27 @@ type Config struct {
PrintConfig bool `yaml:"-"`
HTTPPrefix string `yaml:"http_prefix"`

API api.Config `yaml:"api"`
Server server.Config `yaml:"server"`
Distributor distributor.Config `yaml:"distributor"`
Querier querier.Config `yaml:"querier"`
IngesterClient client.Config `yaml:"ingester_client"`
Ingester ingester.Config `yaml:"ingester"`
Flusher flusher.Config `yaml:"flusher"`
Storage storage.Config `yaml:"storage"`
ChunkStore chunk.StoreConfig `yaml:"chunk_store"`
Schema chunk.SchemaConfig `yaml:"schema" doc:"hidden"` // Doc generation tool doesn't support it because part of the SchemaConfig doesn't support CLI flags (needs manual documentation)
LimitsConfig validation.Limits `yaml:"limits"`
Prealloc client.PreallocConfig `yaml:"prealloc" doc:"hidden"`
Worker querier_worker.CombinedWorkerConfig `yaml:"frontend_worker"`
Frontend frontend.CombinedFrontendConfig `yaml:"frontend"`
QueryRange queryrange.Config `yaml:"query_range"`
TableManager chunk.TableManagerConfig `yaml:"table_manager"`
Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags.
BlocksStorage tsdb.BlocksStorageConfig `yaml:"blocks_storage"`
Compactor compactor.Config `yaml:"compactor"`
StoreGateway storegateway.Config `yaml:"store_gateway"`
PurgerConfig purger.Config `yaml:"purger"`
API api.Config `yaml:"api"`
Server server.Config `yaml:"server"`
Distributor distributor.Config `yaml:"distributor"`
Querier querier.Config `yaml:"querier"`
IngesterClient client.Config `yaml:"ingester_client"`
Ingester ingester.Config `yaml:"ingester"`
Flusher flusher.Config `yaml:"flusher"`
Storage storage.Config `yaml:"storage"`
ChunkStore chunk.StoreConfig `yaml:"chunk_store"`
Schema chunk.SchemaConfig `yaml:"schema" doc:"hidden"` // Doc generation tool doesn't support it because part of the SchemaConfig doesn't support CLI flags (needs manual documentation)
LimitsConfig validation.Limits `yaml:"limits"`
Prealloc client.PreallocConfig `yaml:"prealloc" doc:"hidden"`
Worker querier_worker.Config `yaml:"frontend_worker"`
Frontend frontend.CombinedFrontendConfig `yaml:"frontend"`
QueryRange queryrange.Config `yaml:"query_range"`
TableManager chunk.TableManagerConfig `yaml:"table_manager"`
Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags.
BlocksStorage tsdb.BlocksStorageConfig `yaml:"blocks_storage"`
Compactor compactor.Config `yaml:"compactor"`
StoreGateway storegateway.Config `yaml:"store_gateway"`
PurgerConfig purger.Config `yaml:"purger"`

Ruler ruler.Config `yaml:"ruler"`
Configs configs.Config `yaml:"configs"`
14 changes: 10 additions & 4 deletions pkg/cortex/modules.go
@@ -12,6 +12,7 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
prom_storage "github.com/prometheus/prometheus/storage"
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/server"

"github.com/cortexproject/cortex/pkg/alertmanager"
@@ -285,10 +286,10 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
} else {
// Single binary mode requires a query frontend endpoint for the worker. If no frontend or scheduler endpoint
// is configured, Cortex will default to using frontend on localhost on it's own GRPC listening port.
if t.Cfg.Worker.WorkerV1.FrontendAddress == "" || t.Cfg.Worker.WorkerV2.SchedulerAddress == "" {
if t.Cfg.Worker.FrontendAddress == "" || t.Cfg.Worker.SchedulerAddress == "" {
address := fmt.Sprintf("127.0.0.1:%d", t.Cfg.Server.GRPCListenPort)
level.Warn(util.Logger).Log("msg", "Worker address is empty in single binary mode. Attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.", "address", address)
t.Cfg.Worker.WorkerV1.FrontendAddress = address
t.Cfg.Worker.FrontendAddress = address
}

// If queries are processed using the external HTTP Server, we need wrap the internal querier with
@@ -297,8 +298,13 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
internalQuerierRouter = t.API.AuthMiddleware.Wrap(internalQuerierRouter)
}

// If neither frontend address or scheduler address is configured, no worker will be created.
return querier_worker.InitQuerierWorker(t.Cfg.Worker, t.Cfg.Querier, internalQuerierRouter, util.Logger)
// If neither frontend address or scheduler address is configured, no worker is needed.
if t.Cfg.Worker.FrontendAddress == "" && t.Cfg.Worker.SchedulerAddress == "" {
return nil, nil
}

t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util.Logger, prometheus.DefaultRegisterer)
}

func (t *Cortex) initStoreQueryables() (services.Service, error) {
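The new guard in `initQuerier` returns `nil, nil` when neither address is configured, so the caller has to tolerate a nil service. A minimal Go sketch of that pattern follows; `workerConfig`, `service`, `worker`, and `newWorkerIfConfigured` are hypothetical names for illustration and are not the Cortex types. It also shows the querier's max-concurrent value being copied into the worker configuration before the worker is built, as the commit does with `MaxConcurrentRequests`.

```go
package main

import "fmt"

// workerConfig is a hypothetical, reduced version of the worker configuration.
type workerConfig struct {
	FrontendAddress       string
	SchedulerAddress      string
	MaxConcurrentRequests int
}

// service is a hypothetical stand-in for a long-running service.
type service interface {
	Name() string
}

type worker struct {
	target        string
	maxConcurrent int
}

func (w *worker) Name() string { return "worker -> " + w.target }

// newWorkerIfConfigured returns (nil, nil) when no address is set, meaning
// "no worker is needed" rather than an error.
func newWorkerIfConfigured(cfg workerConfig, querierMaxConcurrent int) (service, error) {
	if cfg.FrontendAddress == "" && cfg.SchedulerAddress == "" {
		return nil, nil
	}
	// The worker config carries the concurrency limit itself, so the
	// constructor does not need the whole querier config.
	cfg.MaxConcurrentRequests = querierMaxConcurrent

	target := cfg.FrontendAddress
	if cfg.SchedulerAddress != "" {
		target = cfg.SchedulerAddress // scheduler takes precedence
	}
	return &worker{target: target, maxConcurrent: cfg.MaxConcurrentRequests}, nil
}

func main() {
	s, err := newWorkerIfConfigured(workerConfig{}, 20)
	if err != nil {
		panic(err)
	}
	if s == nil {
		fmt.Println("no worker configured")
		return
	}
	fmt.Println(s.Name())
}
```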
10 changes: 3 additions & 7 deletions pkg/frontend/frontend_test.go
@@ -28,7 +28,6 @@ import (

"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/querier"
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
@@ -233,14 +232,11 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand
logger = l
}

var (
workerConfig querier_worker.WorkerConfig
querierConfig querier.Config
)
var workerConfig querier_worker.Config
flagext.DefaultValues(&workerConfig)
workerConfig.Parallelism = 1
workerConfig.MatchMaxConcurrency = matchMaxConcurrency
querierConfig.MaxConcurrent = 1
workerConfig.MaxConcurrentRequests = 1

// localhost:0 prevents firewall warnings on Mac OS X.
grpcListen, err := net.Listen("tcp", "localhost:0")
@@ -283,7 +279,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand
go grpcServer.Serve(grpcListen) //nolint:errcheck

var worker services.Service
worker, err = querier_worker.NewWorker(workerConfig, querierConfig, httpgrpc_server.NewServer(handler), logger)
worker, err = querier_worker.NewQuerierWorker(workerConfig, httpgrpc_server.NewServer(handler), logger, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker))

10 changes: 3 additions & 7 deletions pkg/frontend/v1/frontend_test.go
@@ -26,7 +26,6 @@ import (

"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/querier"
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
@@ -178,14 +177,11 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a
logger = l
}

var (
workerConfig querier_worker.WorkerConfig
querierConfig querier.Config
)
var workerConfig querier_worker.Config
flagext.DefaultValues(&workerConfig)
workerConfig.Parallelism = 1
workerConfig.MatchMaxConcurrency = matchMaxConcurrency
querierConfig.MaxConcurrent = 1
workerConfig.MaxConcurrentRequests = 1

// localhost:0 prevents firewall warnings on Mac OS X.
grpcListen, err := net.Listen("tcp", "localhost:0")
@@ -227,7 +223,7 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a
go grpcServer.Serve(grpcListen) //nolint:errcheck

var worker services.Service
worker, err = querier_worker.NewWorker(workerConfig, querierConfig, httpgrpc_server.NewServer(handler), logger)
worker, err = querier_worker.NewQuerierWorker(workerConfig, httpgrpc_server.NewServer(handler), logger, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker))

55 changes: 0 additions & 55 deletions pkg/querier/worker/config.go

This file was deleted.
