diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 16c02e3de1..134fcd78eb 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -17,6 +17,7 @@ import ( "google.golang.org/grpc/codes" "github.com/go-kit/log" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" @@ -283,6 +284,9 @@ func TestDistributor_Push(t *testing.T) { request := makeWriteRequest(tc.samples.startTimestampMs, tc.samples.num, tc.metadata) response, err := ds[0].Push(ctx, request) assert.Equal(t, tc.expectedResponse, response) + if err != nil { + err = errors.Cause(err) + } assert.Equal(t, tc.expectedError, err) // Check tracked Prometheus metrics. Since the Push() response is sent as soon as the quorum diff --git a/pkg/ring/batch.go b/pkg/ring/batch.go index db18fc2d09..01ff9bd4c5 100644 --- a/pkg/ring/batch.go +++ b/pkg/ring/batch.go @@ -7,6 +7,7 @@ import ( "google.golang.org/grpc/status" + "github.com/pkg/errors" "go.uber.org/atomic" ) @@ -116,7 +117,7 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb for _, i := range instances { go func(i instance) { err := callback(i.desc, i.indexes) - tracker.record(i.itemTrackers, err) + tracker.record(i, err) wg.Done() }(i) } @@ -138,7 +139,7 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb } } -func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) { +func (b *batchTracker) record(instance instance, err error) { // If we reach the required number of successful puts on this sample, then decrement the // number of pending samples by one. // @@ -146,21 +147,30 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) { // * rpcsPending and rpcsPending guarantees only a single sendSamples goroutine will write to either channel // * succeeded, failed4xx, failed5xx and remaining guarantees that the "return decision" is made atomically // avoiding race condition + sampleTrackers := instance.itemTrackers for i := range sampleTrackers { if err != nil { // Track the number of errors by error family, and if it exceeds maxFailures // shortcut the waiting rpc. - errCount := sampleTrackers[i].recordError(err) + wrappedErr := errors.Wrapf(err, "addr=%s state=%s zone=%s", instance.desc.Addr, instance.desc.State, instance.desc.Zone) + errCount := instance.itemTrackers[i].recordError(wrappedErr) // We should return an error if we reach the maxFailure (quorum) on a given error family OR // we dont have any remaining ingesters to try // Ex: 2xx, 4xx, 5xx -> return 4xx // Ex: 2xx, 5xx, 4xx -> return 4xx // Ex: 4xx, 4xx, _ -> return 4xx // Ex: 5xx, _, 5xx -> return 5xx - if errCount > int32(sampleTrackers[i].maxFailures) || sampleTrackers[i].remaining.Dec() == 0 { + if errCount > int32(sampleTrackers[i].maxFailures) { if b.rpcsFailed.Inc() == 1 { - b.err <- sampleTrackers[i].getError() + b.err <- errors.Wrap(sampleTrackers[i].getError(), "maxFailure (quorum) on a given error family") } + continue + } + if sampleTrackers[i].remaining.Dec() == 0 { + if b.rpcsFailed.Inc() == 1 { + b.err <- errors.Wrap(sampleTrackers[i].getError(), "not enough remaining instances to try") + } + continue } } else { // If we successfully push all samples to min success instances, @@ -177,7 +187,7 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) { // Ex: 4xx, 5xx, 2xx if sampleTrackers[i].remaining.Dec() == 0 { if b.rpcsFailed.Inc() == 1 { - b.err <- sampleTrackers[i].getError() + b.err <- errors.Wrap(sampleTrackers[i].getError(), "not enough remaining instances to try") } } }