From 810c9dc3bd88e158325bf665993e5d7987990c0a Mon Sep 17 00:00:00 2001 From: Wen Xu Date: Mon, 18 Sep 2023 03:56:11 +0000 Subject: [PATCH] can not use errors wrap, otherwise httpgrpc error will be converted to err, which will be considered as 5xx error later --- pkg/distributor/distributor_test.go | 6 +----- pkg/ring/batch.go | 10 +++++----- pkg/util/httpgrpcutil/errors.go | 23 +++++++++++++++++++++++ 3 files changed, 29 insertions(+), 10 deletions(-) create mode 100644 pkg/util/httpgrpcutil/errors.go diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 134fcd78eb6..d0b18f689cb 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -17,7 +17,6 @@ import ( "google.golang.org/grpc/codes" "github.com/go-kit/log" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" @@ -284,10 +283,7 @@ func TestDistributor_Push(t *testing.T) { request := makeWriteRequest(tc.samples.startTimestampMs, tc.samples.num, tc.metadata) response, err := ds[0].Push(ctx, request) assert.Equal(t, tc.expectedResponse, response) - if err != nil { - err = errors.Cause(err) - } - assert.Equal(t, tc.expectedError, err) + assert.Equal(t, status.Code(tc.expectedError), status.Code(err)) // Check tracked Prometheus metrics. Since the Push() response is sent as soon as the quorum // is reached, when we reach this point the 3rd ingester may not have received series/metadata diff --git a/pkg/ring/batch.go b/pkg/ring/batch.go index 01ff9bd4c57..04616907af0 100644 --- a/pkg/ring/batch.go +++ b/pkg/ring/batch.go @@ -3,11 +3,11 @@ package ring import ( "context" "fmt" + "github.com/cortexproject/cortex/pkg/util/httpgrpcutil" "sync" "google.golang.org/grpc/status" - "github.com/pkg/errors" "go.uber.org/atomic" ) @@ -152,7 +152,7 @@ func (b *batchTracker) record(instance instance, err error) { if err != nil { // Track the number of errors by error family, and if it exceeds maxFailures // shortcut the waiting rpc. - wrappedErr := errors.Wrapf(err, "addr=%s state=%s zone=%s", instance.desc.Addr, instance.desc.State, instance.desc.Zone) + wrappedErr := httpgrpcutil.WrapHttpGrpcError(err, "addr=%s state=%s zone=%s", instance.desc.Addr, instance.desc.State, instance.desc.Zone) errCount := instance.itemTrackers[i].recordError(wrappedErr) // We should return an error if we reach the maxFailure (quorum) on a given error family OR // we dont have any remaining ingesters to try @@ -162,13 +162,13 @@ func (b *batchTracker) record(instance instance, err error) { // Ex: 5xx, _, 5xx -> return 5xx if errCount > int32(sampleTrackers[i].maxFailures) { if b.rpcsFailed.Inc() == 1 { - b.err <- errors.Wrap(sampleTrackers[i].getError(), "maxFailure (quorum) on a given error family") + b.err <- httpgrpcutil.WrapHttpGrpcError(sampleTrackers[i].getError(), "maxFailure (quorum) on a given error family") } continue } if sampleTrackers[i].remaining.Dec() == 0 { if b.rpcsFailed.Inc() == 1 { - b.err <- errors.Wrap(sampleTrackers[i].getError(), "not enough remaining instances to try") + b.err <- httpgrpcutil.WrapHttpGrpcError(sampleTrackers[i].getError(), "not enough remaining instances to try") } continue } @@ -187,7 +187,7 @@ func (b *batchTracker) record(instance instance, err error) { // Ex: 4xx, 5xx, 2xx if sampleTrackers[i].remaining.Dec() == 0 { if b.rpcsFailed.Inc() == 1 { - b.err <- errors.Wrap(sampleTrackers[i].getError(), "not enough remaining instances to try") + b.err <- httpgrpcutil.WrapHttpGrpcError(sampleTrackers[i].getError(), "not enough remaining instances to try") } } } diff --git a/pkg/util/httpgrpcutil/errors.go b/pkg/util/httpgrpcutil/errors.go new file mode 100644 index 00000000000..6100bd58550 --- /dev/null +++ b/pkg/util/httpgrpcutil/errors.go @@ -0,0 +1,23 @@ +package httpgrpcutil + +import ( + "fmt" + "github.com/weaveworks/common/httpgrpc" + "net/http" +) + +func WrapHttpGrpcError(err error, format string, args ...interface{}) error { + if err == nil { + return nil + } + msg := fmt.Sprintf(format, args...) + resp, ok := httpgrpc.HTTPResponseFromError(err) + if !ok { + return httpgrpc.Errorf(http.StatusInternalServerError, "%s, %s", msg, err) + } + return httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{ + Code: resp.Code, + Headers: resp.Headers, + Body: []byte(fmt.Sprintf("%s, %s", msg, err)), + }) +}