From 76cd0bafec859215b94d79201a406376ae038508 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Wed, 9 Oct 2024 18:11:59 -0700 Subject: [PATCH] Fix healthcheck interceptors (#6257) Signed-off-by: alanprot --- CHANGELOG.md | 2 +- pkg/util/grpcclient/health_check.go | 47 ++++++++++++++---------- pkg/util/grpcclient/health_check_test.go | 23 +++++++++--- 3 files changed, 47 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d815747bce..69e340bbf0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,7 @@ * [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to track the number of histogram samples which resolution was reduced. #6182 * [ENHANCEMENT] StoreGateway: Implement metadata API limit in queryable. #6195 * [ENHANCEMENT] Ingester: Add matchers to ingester LabelNames() and LabelNamesStream() RPC. #6209 -* [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225 +* [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225 #6257 * [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224 ## 1.18.0 2024-09-03 diff --git a/pkg/util/grpcclient/health_check.go b/pkg/util/grpcclient/health_check.go index ad378e73b5..9cbcc064d5 100644 --- a/pkg/util/grpcclient/health_check.go +++ b/pkg/util/grpcclient/health_check.go @@ -5,6 +5,7 @@ import ( "errors" "flag" "fmt" + "io" "sync" "time" @@ -55,15 +56,17 @@ type HealthCheckInterceptors struct { activeInstances map[string]*healthCheckEntry instanceGcTimeout time.Duration - healthClientFactory func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient + healthClientFactory func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer) } func NewHealthCheckInterceptors(logger log.Logger) *HealthCheckInterceptors { h := &HealthCheckInterceptors{ - logger: logger, - instanceGcTimeout: 2 * time.Minute, - healthClientFactory: grpc_health_v1.NewHealthClient, - activeInstances: make(map[string]*healthCheckEntry), + logger: logger, + instanceGcTimeout: 2 * time.Minute, + healthClientFactory: func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer) { + return grpc_health_v1.NewHealthClient(cc), cc + }, + activeInstances: make(map[string]*healthCheckEntry), } h.Service = services. @@ -107,16 +110,6 @@ func (h *HealthCheckInterceptors) registeredInstances() []*healthCheckEntry { func (h *HealthCheckInterceptors) iteration(ctx context.Context) error { level.Debug(h.logger).Log("msg", "Performing health check", "registeredInstances", len(h.registeredInstances())) for _, instance := range h.registeredInstances() { - dialOpts, err := instance.clientConfig.Config.DialOption(nil, nil) - if err != nil { - return err - } - conn, err := grpc.NewClient(instance.address, dialOpts...) - c := h.healthClientFactory(conn) - if err != nil { - return err - } - if time.Since(instance.lastTickTime.Load()) >= h.instanceGcTimeout { h.Lock() delete(h.activeInstances, instance.address) @@ -131,11 +124,27 @@ func (h *HealthCheckInterceptors) iteration(ctx context.Context) error { instance.lastCheckTime.Store(time.Now()) go func(i *healthCheckEntry) { - if err := i.recordHealth(healthCheck(c, i.clientConfig.HealthCheckConfig.Timeout)); !i.isHealthy() { - level.Warn(h.logger).Log("msg", "instance marked as unhealthy", "address", i.address, "err", err) + dialOpts, err := i.clientConfig.Config.DialOption(nil, nil) + if err != nil { + level.Error(h.logger).Log("msg", "error creating dialOpts to perform healthcheck", "address", i.address, "err", err) + return + } + conn, err := grpc.NewClient(i.address, dialOpts...) + if err != nil { + level.Error(h.logger).Log("msg", "error creating client to perform healthcheck", "address", i.address, "err", err) + return } - if err := conn.Close(); err != nil { - level.Warn(h.logger).Log("msg", "error closing connection", "address", i.address, "err", err) + + client, closer := h.healthClientFactory(conn) + + defer func() { + if err := closer.Close(); err != nil { + level.Warn(h.logger).Log("msg", "error closing connection", "address", i.address, "err", err) + } + }() + + if err := i.recordHealth(healthCheck(client, i.clientConfig.HealthCheckConfig.Timeout)); !i.isHealthy() { + level.Warn(h.logger).Log("msg", "instance marked as unhealthy", "address", i.address, "err", err) } }(instance) } diff --git a/pkg/util/grpcclient/health_check_test.go b/pkg/util/grpcclient/health_check_test.go index 5483befa43..6491878506 100644 --- a/pkg/util/grpcclient/health_check_test.go +++ b/pkg/util/grpcclient/health_check_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "testing" "time" @@ -20,7 +21,13 @@ import ( type healthClientMock struct { grpc_health_v1.HealthClient - err atomic.Error + err atomic.Error + open atomic.Bool +} + +func (h *healthClientMock) Close() error { + h.open.Store(false) + return nil } func (h *healthClientMock) Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) { @@ -36,8 +43,9 @@ func TestNewHealthCheckService(t *testing.T) { i.instanceGcTimeout = time.Second * 5 hMock := &healthClientMock{} - i.healthClientFactory = func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient { - return hMock + i.healthClientFactory = func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer) { + hMock.open.Store(true) + return hMock, hMock } require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -79,6 +87,8 @@ func TestNewHealthCheckService(t *testing.T) { cortex_testutil.Poll(t, i.instanceGcTimeout*2, 0, func() interface{} { return len(i.registeredInstances()) }) + + require.False(t, hMock.open.Load()) } func TestNewHealthCheckInterceptors(t *testing.T) { @@ -92,8 +102,9 @@ func TestNewHealthCheckInterceptors(t *testing.T) { Timeout: time.Second, }, } - i.healthClientFactory = func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient { - return hMock + i.healthClientFactory = func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer) { + hMock.open.Store(true) + return hMock, hMock } ui := i.UnaryHealthCheckInterceptor(&cfg) @@ -113,6 +124,7 @@ func TestNewHealthCheckInterceptors(t *testing.T) { // first health check require.NoError(t, i.iteration(context.Background())) + require.False(t, hMock.open.Load()) //Should second call even with error require.NoError(t, ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker)) @@ -121,6 +133,7 @@ func TestNewHealthCheckInterceptors(t *testing.T) { // Second Healthcheck -> should mark as unhealthy require.NoError(t, i.iteration(context.Background())) + require.False(t, hMock.open.Load()) cortex_testutil.Poll(t, time.Second, true, func() interface{} { return errors.Is(ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker), unhealthyErr)