Skip to content

Commit

Permalink
add some logs to ring batch to help debug remotewrite errors from rul… (
Browse files Browse the repository at this point in the history
#5514)

* add some logs to ring batch to help debug remotewrite errors from ruler or distributor

Signed-off-by: Wen Xu <[email protected]>

* fix lint

Signed-off-by: Wen Xu <[email protected]>

* fix unit test

Signed-off-by: Wen Xu <[email protected]>

---------

Signed-off-by: Wen Xu <[email protected]>
  • Loading branch information
wenxu1024 authored Sep 15, 2023
1 parent 6a49e3b commit a064bce
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
4 changes: 4 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"google.golang.org/grpc/codes"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -283,6 +284,9 @@ func TestDistributor_Push(t *testing.T) {
request := makeWriteRequest(tc.samples.startTimestampMs, tc.samples.num, tc.metadata)
response, err := ds[0].Push(ctx, request)
assert.Equal(t, tc.expectedResponse, response)
if err != nil {
err = errors.Cause(err)
}
assert.Equal(t, tc.expectedError, err)

// Check tracked Prometheus metrics. Since the Push() response is sent as soon as the quorum
Expand Down
22 changes: 16 additions & 6 deletions pkg/ring/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"google.golang.org/grpc/status"

"github.com/pkg/errors"
"go.uber.org/atomic"
)

Expand Down Expand Up @@ -116,7 +117,7 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb
for _, i := range instances {
go func(i instance) {
err := callback(i.desc, i.indexes)
tracker.record(i.itemTrackers, err)
tracker.record(i, err)
wg.Done()
}(i)
}
Expand All @@ -138,29 +139,38 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb
}
}

func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) {
func (b *batchTracker) record(instance instance, err error) {
// If we reach the required number of successful puts on this sample, then decrement the
// number of pending samples by one.
//
// The use of atomic increments here is needed as:
// * rpcsPending and rpcsPending guarantees only a single sendSamples goroutine will write to either channel
// * succeeded, failed4xx, failed5xx and remaining guarantees that the "return decision" is made atomically
// avoiding race condition
sampleTrackers := instance.itemTrackers
for i := range sampleTrackers {
if err != nil {
// Track the number of errors by error family, and if it exceeds maxFailures
// shortcut the waiting rpc.
errCount := sampleTrackers[i].recordError(err)
wrappedErr := errors.Wrapf(err, "addr=%s state=%s zone=%s", instance.desc.Addr, instance.desc.State, instance.desc.Zone)
errCount := instance.itemTrackers[i].recordError(wrappedErr)
// We should return an error if we reach the maxFailure (quorum) on a given error family OR
// we dont have any remaining ingesters to try
// Ex: 2xx, 4xx, 5xx -> return 4xx
// Ex: 2xx, 5xx, 4xx -> return 4xx
// Ex: 4xx, 4xx, _ -> return 4xx
// Ex: 5xx, _, 5xx -> return 5xx
if errCount > int32(sampleTrackers[i].maxFailures) || sampleTrackers[i].remaining.Dec() == 0 {
if errCount > int32(sampleTrackers[i].maxFailures) {
if b.rpcsFailed.Inc() == 1 {
b.err <- sampleTrackers[i].getError()
b.err <- errors.Wrap(sampleTrackers[i].getError(), "maxFailure (quorum) on a given error family")
}
continue
}
if sampleTrackers[i].remaining.Dec() == 0 {
if b.rpcsFailed.Inc() == 1 {
b.err <- errors.Wrap(sampleTrackers[i].getError(), "not enough remaining instances to try")
}
continue
}
} else {
// If we successfully push all samples to min success instances,
Expand All @@ -177,7 +187,7 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) {
// Ex: 4xx, 5xx, 2xx
if sampleTrackers[i].remaining.Dec() == 0 {
if b.rpcsFailed.Inc() == 1 {
b.err <- sampleTrackers[i].getError()
b.err <- errors.Wrap(sampleTrackers[i].getError(), "not enough remaining instances to try")
}
}
}
Expand Down

0 comments on commit a064bce

Please sign in to comment.