Skip to content

Commit

Permalink
Create an experimental HealthCheck gRPC Handler (#6225)
Browse files Browse the repository at this point in the history
* Implementing HealthCheck grpc handlers

Signed-off-by: alanprot <[email protected]>

* tests

Signed-off-by: alanprot <[email protected]>

* lint

Signed-off-by: alanprot <[email protected]>

* Adding tests

Signed-off-by: alanprot <[email protected]>

* Name to the hc service + changelog

Signed-off-by: alanprot <[email protected]>

---------

Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot authored Oct 8, 2024
1 parent f971a60 commit e6e9fea
Show file tree
Hide file tree
Showing 10 changed files with 455 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to track the number of histogram samples whose resolution was reduced. #6182
* [ENHANCEMENT] StoreGateway: Implement metadata API limit in queryable. #6195
* [ENHANCEMENT] Ingester: Add matchers to ingester LabelNames() and LabelNamesStream() RPC. #6209
* [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224

## 1.18.0 2024-09-03
Expand Down
18 changes: 18 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,24 @@ querier:
# CLI flag: -querier.store-gateway-client.grpc-compression
[grpc_compression: <string> | default = ""]

# EXPERIMENTAL: If enabled, gRPC clients perform health checks for each
# target and fail the request if the target is marked as unhealthy.
healthcheck_config:
# The number of consecutive failed health checks required before
# considering a target unhealthy. 0 means disabled.
# CLI flag: -querier.store-gateway-client.unhealthy-threshold
[unhealthy_threshold: <int> | default = 0]

# The approximate amount of time between health checks of an individual
# target.
# CLI flag: -querier.store-gateway-client.interval
[interval: <duration> | default = 5s]

# The amount of time during which no response from a target means a failed
# health check.
# CLI flag: -querier.store-gateway-client.timeout
[timeout: <duration> | default = 1s]

# If enabled, store gateway query stats will be logged using `info` log level.
# CLI flag: -querier.store-gateway-query-stats-enabled
[store_gateway_query_stats: <boolean> | default = true]
Expand Down
36 changes: 36 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3099,6 +3099,24 @@ grpc_client_config:
# CLI flag: -ingester.client.tls-insecure-skip-verify
[tls_insecure_skip_verify: <boolean> | default = false]
# EXPERIMENTAL: If enabled, gRPC clients perform health checks for each target
# and fail the request if the target is marked as unhealthy.
healthcheck_config:
# The number of consecutive failed health checks required before considering
# a target unhealthy. 0 means disabled.
# CLI flag: -ingester.client.unhealthy-threshold
[unhealthy_threshold: <int> | default = 0]
# The approximate amount of time between health checks of an individual
# target.
# CLI flag: -ingester.client.interval
[interval: <duration> | default = 5s]
# The amount of time during which no response from a target means a failed
# health check.
# CLI flag: -ingester.client.timeout
[timeout: <duration> | default = 1s]
# Max inflight push requests that this ingester client can handle. This limit is
# per-ingester-client. Additional requests will be rejected. 0 = unlimited.
# CLI flag: -ingester.client.max-inflight-push-requests
Expand Down Expand Up @@ -3815,6 +3833,24 @@ store_gateway_client:
# CLI flag: -querier.store-gateway-client.grpc-compression
[grpc_compression: <string> | default = ""]
# EXPERIMENTAL: If enabled, gRPC clients perform health checks for each target
# and fail the request if the target is marked as unhealthy.
healthcheck_config:
# The number of consecutive failed health checks required before considering
# a target unhealthy. 0 means disabled.
# CLI flag: -querier.store-gateway-client.unhealthy-threshold
[unhealthy_threshold: <int> | default = 0]
# The approximate amount of time between health checks of an individual
# target.
# CLI flag: -querier.store-gateway-client.interval
[interval: <duration> | default = 5s]
# The amount of time during which no response from a target means a failed
# health check.
# CLI flag: -querier.store-gateway-client.timeout
[timeout: <duration> | default = 1s]
# If enabled, store gateway query stats will be logged using `info` log level.
# CLI flag: -querier.store-gateway-query-stats-enabled
[store_gateway_query_stats: <boolean> | default = true]
Expand Down
20 changes: 18 additions & 2 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/cortexproject/cortex/pkg/scheduler"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storegateway"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/modules"
"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
Expand All @@ -65,6 +66,7 @@ const (
Server string = "server"
Distributor string = "distributor"
DistributorService string = "distributor-service"
GrpcClientService string = "grpcclient-service"
Ingester string = "ingester"
IngesterService string = "ingester-service"
Flusher string = "flusher"
Expand Down Expand Up @@ -230,6 +232,19 @@ func (t *Cortex) initDistributorService() (serv services.Service, err error) {
return t.Distributor, nil
}

// initGrpcClientServices creates the health-check interceptors shared by the
// gRPC clients and returns them as the module's service.
//
// The interceptors are attached to the ingester client config and to the
// querier's store-gateway client config only when the respective
// UnhealthyThreshold is greater than zero; otherwise that config is left
// untouched. The interceptors are returned as the service in every case.
func (t *Cortex) initGrpcClientServices() (serv services.Service, err error) {
	interceptors := grpcclient.NewHealthCheckInterceptors(util_log.Logger)

	ingesterHC := &t.Cfg.IngesterClient.GRPCClientConfig.HealthCheckConfig
	if ingesterHC.UnhealthyThreshold > 0 {
		ingesterHC.HealthCheckInterceptors = interceptors
	}

	storeGatewayHC := &t.Cfg.Querier.StoreGatewayClient.HealthCheckConfig
	if storeGatewayHC.UnhealthyThreshold > 0 {
		storeGatewayHC.HealthCheckInterceptors = interceptors
	}

	return interceptors, nil
}

func (t *Cortex) initDistributor() (serv services.Service, err error) {
t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor)

Expand Down Expand Up @@ -754,6 +769,7 @@ func (t *Cortex) setupModuleManager() error {
mm.RegisterModule(OverridesExporter, t.initOverridesExporter)
mm.RegisterModule(Distributor, t.initDistributor)
mm.RegisterModule(DistributorService, t.initDistributorService, modules.UserInvisibleModule)
mm.RegisterModule(GrpcClientService, t.initGrpcClientServices, modules.UserInvisibleModule)
mm.RegisterModule(Ingester, t.initIngester)
mm.RegisterModule(IngesterService, t.initIngesterService, modules.UserInvisibleModule)
mm.RegisterModule(Flusher, t.initFlusher)
Expand Down Expand Up @@ -782,14 +798,14 @@ func (t *Cortex) setupModuleManager() error {
Ring: {API, RuntimeConfig, MemberlistKV},
Overrides: {RuntimeConfig},
OverridesExporter: {RuntimeConfig},
Distributor: {DistributorService, API},
Distributor: {DistributorService, API, GrpcClientService},
DistributorService: {Ring, Overrides},
Ingester: {IngesterService, Overrides, API},
IngesterService: {Overrides, RuntimeConfig, MemberlistKV},
Flusher: {Overrides, API},
Queryable: {Overrides, DistributorService, Overrides, Ring, API, StoreQueryable, MemberlistKV},
Querier: {TenantFederation},
StoreQueryable: {Overrides, Overrides, MemberlistKV},
StoreQueryable: {Overrides, Overrides, MemberlistKV, GrpcClientService},
QueryFrontendTripperware: {API, Overrides},
QueryFrontend: {QueryFrontendTripperware},
QueryScheduler: {API, Overrides},
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func (c *closableHealthAndIngesterClient) Close() error {

// Config is the configuration struct for the ingester client
type Config struct {
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"`
GRPCClientConfig grpcclient.ConfigWithHealthCheck `yaml:"grpc_client_config"`
MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"`
}

// RegisterFlags registers configuration settings used by the ingester client config.
Expand Down
33 changes: 19 additions & 14 deletions pkg/querier/store_gateway_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/cortexproject/cortex/pkg/util/tls"
)

func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory {
func newStoreGatewayClientFactory(clientCfg grpcclient.ConfigWithHealthCheck, reg prometheus.Registerer) client.PoolFactory {
requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "storegateway_client_request_duration_seconds",
Expand All @@ -31,7 +31,7 @@ func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Re
}
}

func dialStoreGatewayClient(clientCfg grpcclient.Config, addr string, requestDuration *prometheus.HistogramVec) (*storeGatewayClient, error) {
func dialStoreGatewayClient(clientCfg grpcclient.ConfigWithHealthCheck, addr string, requestDuration *prometheus.HistogramVec) (*storeGatewayClient, error) {
opts, err := clientCfg.DialOption(grpcclient.Instrument(requestDuration))
if err != nil {
return nil, err
Expand Down Expand Up @@ -69,15 +69,18 @@ func (c *storeGatewayClient) RemoteAddress() string {

func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConfig ClientConfig, logger log.Logger, reg prometheus.Registerer) *client.Pool {
// We prefer sane defaults instead of exposing further config options.
clientCfg := grpcclient.Config{
MaxRecvMsgSize: 100 << 20,
MaxSendMsgSize: 16 << 20,
GRPCCompression: clientConfig.GRPCCompression,
RateLimit: 0,
RateLimitBurst: 0,
BackoffOnRatelimits: false,
TLSEnabled: clientConfig.TLSEnabled,
TLS: clientConfig.TLS,
clientCfg := grpcclient.ConfigWithHealthCheck{
Config: grpcclient.Config{
MaxRecvMsgSize: 100 << 20,
MaxSendMsgSize: 16 << 20,
GRPCCompression: clientConfig.GRPCCompression,
RateLimit: 0,
RateLimitBurst: 0,
BackoffOnRatelimits: false,
TLSEnabled: clientConfig.TLSEnabled,
TLS: clientConfig.TLS,
},
HealthCheckConfig: clientConfig.HealthCheckConfig,
}
poolCfg := client.PoolConfig{
CheckInterval: time.Minute,
Expand All @@ -96,13 +99,15 @@ func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConf
}

type ClientConfig struct {
TLSEnabled bool `yaml:"tls_enabled"`
TLS tls.ClientConfig `yaml:",inline"`
GRPCCompression string `yaml:"grpc_compression"`
TLSEnabled bool `yaml:"tls_enabled"`
TLS tls.ClientConfig `yaml:",inline"`
GRPCCompression string `yaml:"grpc_compression"`
HealthCheckConfig grpcclient.HealthCheckConfig `yaml:"healthcheck_config" doc:"description=EXPERIMENTAL: If enabled, gRPC clients perform health checks for each target and fail the request if the target is marked as unhealthy."`
}

// RegisterFlagsWithPrefix registers the store-gateway client flags on f.
//
// The TLS-enabled and gRPC-compression flags are named prefix+".tls-enabled"
// and prefix+".grpc-compression". The same prefix is handed to the TLS and
// health-check configs, which register their own flags.
func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", cfg.TLSEnabled, "Enable TLS for gRPC client connecting to store-gateway.")
f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)")
cfg.TLS.RegisterFlagsWithPrefix(prefix, f)
cfg.HealthCheckConfig.RegisterFlagsWithPrefix(prefix, f)
}
2 changes: 1 addition & 1 deletion pkg/querier/store_gateway_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func Test_newStoreGatewayClientFactory(t *testing.T) {

// Create a client factory and query back the mocked service
// with different clients.
cfg := grpcclient.Config{}
cfg := grpcclient.ConfigWithHealthCheck{}
flagext.DefaultValues(&cfg)

reg := prometheus.NewPedanticRegistry()
Expand Down
19 changes: 19 additions & 0 deletions pkg/util/grpcclient/grpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,21 @@ type Config struct {
SignWriteRequestsEnabled bool `yaml:"-"`
}

// ConfigWithHealthCheck is a gRPC client Config extended with the
// experimental health-check settings. The embedded Config is inlined in YAML,
// while the health-check settings live under the healthcheck_config key.
type ConfigWithHealthCheck struct {
Config `yaml:",inline"`
HealthCheckConfig HealthCheckConfig `yaml:"healthcheck_config" doc:"description=EXPERIMENTAL: If enabled, gRPC clients perform health checks for each target and fail the request if the target is marked as unhealthy."`
}

// RegisterFlags registers the gRPC client flags on f, using an empty flag
// prefix and an empty default gRPC compression.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", "", f)
}

// RegisterFlagsWithPrefix registers the embedded Config flags and then the
// health-check flags on f, passing the same prefix to both.
// defaultGrpcCompression is forwarded unchanged to Config.RegisterFlagsWithPrefix.
func (cfg *ConfigWithHealthCheck) RegisterFlagsWithPrefix(prefix, defaultGrpcCompression string, f *flag.FlagSet) {
cfg.Config.RegisterFlagsWithPrefix(prefix, defaultGrpcCompression, f)
cfg.HealthCheckConfig.RegisterFlagsWithPrefix(prefix, f)
}

// RegisterFlagsWithPrefix registers flags with prefix.
func (cfg *Config) RegisterFlagsWithPrefix(prefix, defaultGrpcCompression string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).")
Expand Down Expand Up @@ -75,6 +85,15 @@ func (cfg *Config) CallOptions() []grpc.CallOption {
return opts
}

// DialOption returns the gRPC dial options for this config.
//
// When health-check interceptors have been set on HealthCheckConfig, the
// unary and stream health-check interceptors are appended after the
// caller-supplied ones before delegating to Config.DialOption. When they are
// not set, the caller-supplied interceptors are forwarded unchanged.
func (cfg *ConfigWithHealthCheck) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) {
	hc := &cfg.HealthCheckConfig

	// Health checking is not wired in: behave exactly like the plain Config.
	if hc.HealthCheckInterceptors == nil {
		return cfg.Config.DialOption(unaryClientInterceptors, streamClientInterceptors)
	}

	unaryClientInterceptors = append(unaryClientInterceptors, hc.UnaryHealthCheckInterceptor(cfg))
	streamClientInterceptors = append(streamClientInterceptors, hc.StreamClientInterceptor(cfg))

	return cfg.Config.DialOption(unaryClientInterceptors, streamClientInterceptors)
}

// DialOption returns the config as a grpc.DialOptions.
func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) {
var opts []grpc.DialOption
Expand Down
Loading

0 comments on commit e6e9fea

Please sign in to comment.