From a064bce0d1b34d82dda2581810d90a8e7a636df7 Mon Sep 17 00:00:00 2001 From: Wen Xu Date: Fri, 15 Sep 2023 12:22:59 -0400 Subject: [PATCH] =?UTF-8?q?add=20some=20logs=20to=20ring=20batch=20to=20he?= =?UTF-8?q?lp=20debug=20remotewrite=20errors=20from=20rul=E2=80=A6=20(#551?= =?UTF-8?q?4)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add some logs to ring batch to help debug remotewrite errors from ruler or distributor Signed-off-by: Wen Xu * fix lint Signed-off-by: Wen Xu * fix unit test Signed-off-by: Wen Xu --------- Signed-off-by: Wen Xu --- pkg/distributor/distributor_test.go | 4 ++++ pkg/ring/batch.go | 22 ++++++++++++++++------ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 16c02e3de1..134fcd78eb 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -17,6 +17,7 @@ import ( "google.golang.org/grpc/codes" "github.com/go-kit/log" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" @@ -283,6 +284,9 @@ func TestDistributor_Push(t *testing.T) { request := makeWriteRequest(tc.samples.startTimestampMs, tc.samples.num, tc.metadata) response, err := ds[0].Push(ctx, request) assert.Equal(t, tc.expectedResponse, response) + if err != nil { + err = errors.Cause(err) + } assert.Equal(t, tc.expectedError, err) // Check tracked Prometheus metrics. Since the Push() response is sent as soon as the quorum diff --git a/pkg/ring/batch.go b/pkg/ring/batch.go index db18fc2d09..01ff9bd4c5 100644 --- a/pkg/ring/batch.go +++ b/pkg/ring/batch.go @@ -7,6 +7,7 @@ import ( "google.golang.org/grpc/status" + "github.com/pkg/errors" "go.uber.org/atomic" ) @@ -116,7 +117,7 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb for _, i := range instances { go func(i instance) { err := callback(i.desc, i.indexes) - tracker.record(i.itemTrackers, err) + tracker.record(i, err) wg.Done() }(i) } @@ -138,7 +139,7 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb } } -func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) { +func (b *batchTracker) record(instance instance, err error) { // If we reach the required number of successful puts on this sample, then decrement the // number of pending samples by one. // @@ -146,21 +147,30 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) { // * rpcsPending and rpcsPending guarantees only a single sendSamples goroutine will write to either channel // * succeeded, failed4xx, failed5xx and remaining guarantees that the "return decision" is made atomically // avoiding race condition + sampleTrackers := instance.itemTrackers for i := range sampleTrackers { if err != nil { // Track the number of errors by error family, and if it exceeds maxFailures // shortcut the waiting rpc. - errCount := sampleTrackers[i].recordError(err) + wrappedErr := errors.Wrapf(err, "addr=%s state=%s zone=%s", instance.desc.Addr, instance.desc.State, instance.desc.Zone) + errCount := instance.itemTrackers[i].recordError(wrappedErr) // We should return an error if we reach the maxFailure (quorum) on a given error family OR // we dont have any remaining ingesters to try // Ex: 2xx, 4xx, 5xx -> return 4xx // Ex: 2xx, 5xx, 4xx -> return 4xx // Ex: 4xx, 4xx, _ -> return 4xx // Ex: 5xx, _, 5xx -> return 5xx - if errCount > int32(sampleTrackers[i].maxFailures) || sampleTrackers[i].remaining.Dec() == 0 { + if errCount > int32(sampleTrackers[i].maxFailures) { if b.rpcsFailed.Inc() == 1 { - b.err <- sampleTrackers[i].getError() + b.err <- errors.Wrap(sampleTrackers[i].getError(), "maxFailure (quorum) on a given error family") } + continue + } + if sampleTrackers[i].remaining.Dec() == 0 { + if b.rpcsFailed.Inc() == 1 { + b.err <- errors.Wrap(sampleTrackers[i].getError(), "not enough remaining instances to try") + } + continue } } else { // If we successfully push all samples to min success instances, @@ -177,7 +187,7 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) { // Ex: 4xx, 5xx, 2xx if sampleTrackers[i].remaining.Dec() == 0 { if b.rpcsFailed.Inc() == 1 { - b.err <- sampleTrackers[i].getError() + b.err <- errors.Wrap(sampleTrackers[i].getError(), "not enough remaining instances to try") } } }