Make chunk and series limit error messages returned by store-gateways consistent with those returned by queriers (#6347)

* Make chunk and series limit error messages returned by store-gateways consistent with those returned by queriers

* Add changelog entry
charleskorn authored Oct 13, 2023
1 parent b9335fb commit 231d362
Showing 10 changed files with 40 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -37,6 +37,7 @@
* [ENHANCEMENT] All: improved IPv6 support by using the proper host:port formatting. #6311
* [ENHANCEMENT] Querier: always return error encountered during chunks streaming, rather than `the stream has already been exhausted`. #6345
* [ENHANCEMENT] Query-frontend: add `instance_enable_ipv6` to support IPv6. #6111
+* [ENHANCEMENT] Store-gateway: return same detailed error messages as queriers when chunks or series limits are reached. #6347
* [ENHANCEMENT] Querier: reduce memory consumed for queries that hit store-gateways. #6348
* [BUGFIX] Ring: Ensure network addresses used for component hash rings are formatted correctly when using IPv6. #6068
* [BUGFIX] Query-scheduler: don't retain connections from queriers that have shut down, leading to gradually increasing enqueue latency over time. #6100 #6145
9 changes: 4 additions & 5 deletions integration/store_gateway_limits_hit_test.go
@@ -18,7 +18,6 @@ import (
	"github.com/stretchr/testify/require"

	"github.com/grafana/mimir/integration/e2emimir"
-	"github.com/grafana/mimir/pkg/storegateway"
"github.com/grafana/mimir/pkg/util/globalerror"
)

@@ -83,17 +82,17 @@ func Test_MaxSeriesAndChunksPerQueryLimitHit(t *testing.T) {
additionalQuerierFlags map[string]string
expectedErrorKey string
}{
-	"when store-gateway hits max_fetched_series_per_query, 'exceeded series limit' is returned": {
+	"when store-gateway hits max_fetched_series_per_query, 'err-mimir-max-series-per-query' is returned": {
additionalStoreGatewayFlags: map[string]string{"-querier.max-fetched-series-per-query": "1"},
-	expectedErrorKey: storegateway.ErrSeriesLimitMessage,
+	expectedErrorKey: string(globalerror.MaxSeriesPerQuery),
},
"when querier hits max_fetched_series_per_query, 'err-mimir-max-series-per-query' is returned": {
additionalQuerierFlags: map[string]string{"-querier.max-fetched-series-per-query": "1"},
expectedErrorKey: string(globalerror.MaxSeriesPerQuery),
},
-	"when store-gateway hits max_fetched_chunks_per_query, 'exceeded chunks limit' is returned": {
+	"when store-gateway hits max_fetched_chunks_per_query, 'err-mimir-max-chunks-per-query' is returned": {
additionalStoreGatewayFlags: map[string]string{"-querier.max-fetched-chunks-per-query": "1"},
-	expectedErrorKey: storegateway.ErrChunksLimitMessage,
+	expectedErrorKey: string(globalerror.MaxChunksPerQuery),
},
"when querier hits max_fetched_chunks_per_query, 'err-mimir-max-chunks-per-query' is returned": {
additionalQuerierFlags: map[string]string{"-querier.max-fetched-chunks-per-query": "1"},
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket.go
@@ -1418,7 +1418,7 @@ func blockLabelNames(ctx context.Context, indexr *bucketIndexReader, matchers []
if err != nil {
return nil, errors.Wrap(err, "fetch series")
}
-	seriesSetsIterator = newLimitingSeriesChunkRefsSetIterator(seriesSetsIterator, NewLimiter(0, nil), seriesLimiter)
+	seriesSetsIterator = newLimitingSeriesChunkRefsSetIterator(seriesSetsIterator, NewLimiter(0, nil, ""), seriesLimiter)
seriesSet := newSeriesChunkRefsSeriesSet(seriesSetsIterator)
// Extract label names from all series. Many label names will be the same, so we need to deduplicate them.
labelNames := map[string]struct{}{}
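A brief illustrative sketch, not part of the commit: NewLimiter's doc comment (further down in this diff) says a limit of 0 disables the check, so the empty message format and nil counter passed here in blockLabelNames are never used.

```go
package main

import (
	"fmt"

	"github.com/grafana/mimir/pkg/storegateway"
)

func main() {
	// Zero limit: the limiter is effectively disabled, so neither the nil
	// failure counter nor the empty error-message format is ever exercised.
	unlimited := storegateway.NewLimiter(0, nil, "")

	fmt.Println(unlimited.Reserve(1_000_000)) // expected output: <nil>
}
```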
7 changes: 3 additions & 4 deletions pkg/storegateway/bucket_e2e_test.go
@@ -11,7 +11,6 @@ import (
	"net/http"
	"os"
	"path/filepath"
-	"strings"
"testing"
"time"

@@ -623,12 +622,12 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
},
"should fail if the max chunks limit is exceeded - 422": {
maxChunksLimit: expectedChunks - 1,
-	expectedErr: "exceeded chunks limit",
+	expectedErr: "the query exceeded the maximum number of chunks (limit: 11 chunks) (err-mimir-max-chunks-per-query)",
expectedCode: http.StatusUnprocessableEntity,
},
"should fail if the max series limit is exceeded - 422": {
maxChunksLimit: expectedChunks,
-	expectedErr: "exceeded series limit",
+	expectedErr: "the query exceeded the maximum number of series (limit: 1 series) (err-mimir-max-series-per-query)",
maxSeriesLimit: 1,
expectedCode: http.StatusUnprocessableEntity,
},
@@ -665,7 +664,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
assert.NoError(t, err)
} else {
assert.Error(t, err)
-	assert.True(t, strings.Contains(err.Error(), testData.expectedErr))
+	assert.Contains(t, err.Error(), testData.expectedErr)
status, ok := status.FromError(err)
assert.Equal(t, true, ok)
assert.Equal(t, testData.expectedCode, status.Code())
6 changes: 3 additions & 3 deletions pkg/storegateway/bucket_test.go
@@ -268,7 +268,7 @@ func TestBlockLabelNames(t *testing.T) {
slices.Sort(jFooLabelNames)
slices.Sort(jNotFooLabelNames)

-	sl := NewLimiter(math.MaxUint64, promauto.With(nil).NewCounter(prometheus.CounterOpts{Name: "test"}))
+	sl := NewLimiter(math.MaxUint64, promauto.With(nil).NewCounter(prometheus.CounterOpts{Name: "test"}), "exceeded unlimited limit of %v")
newTestBucketBlock := prepareTestBlock(test.NewTB(t), appendTestSeries(series))

t.Run("happy case with no matchers", func(t *testing.T) {
@@ -2322,7 +2322,7 @@ func TestBucketStore_Series_Limits(t *testing.T) {
"should fail if the number of unique series queried is greater than the configured series limit": {
reqMatchers: []storepb.LabelMatcher{{Type: storepb.LabelMatcher_RE, Name: labels.MetricName, Value: "series_[123]"}},
seriesLimit: 1,
-	expectedErr: ErrSeriesLimitMessage,
+	expectedErr: "the query exceeded the maximum number of series (limit: 1 series) (err-mimir-max-series-per-query)",
},
"should pass if the number of unique series queried is equal or less than the configured series limit": {
reqMatchers: []storepb.LabelMatcher{{Type: storepb.LabelMatcher_RE, Name: labels.MetricName, Value: "series_[123]"}},
@@ -2332,7 +2332,7 @@ func TestBucketStore_Series_Limits(t *testing.T) {
"should fail if the number of chunks queried is greater than the configured chunks limit": {
reqMatchers: []storepb.LabelMatcher{{Type: storepb.LabelMatcher_RE, Name: labels.MetricName, Value: "series_[123]"}},
chunksLimit: 3,
-	expectedErr: ErrChunksLimitMessage,
+	expectedErr: "the query exceeded the maximum number of chunks (limit: 3 chunks) (err-mimir-max-chunks-per-query)",
},
"should pass if the number of chunks queried is equal or less than the configured chunks limit": {
reqMatchers: []storepb.LabelMatcher{{Type: storepb.LabelMatcher_RE, Name: labels.MetricName, Value: "series_[123]"}},
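For illustration only (not part of the diff): the expected-error strings above are the shared series-limit format from pkg/util/limiter rendered with the configured limit, which is what makes the store-gateway and querier messages identical. Assuming limiter.MaxSeriesHitMsgFormat renders as in the expectations above, a limit of 1 reproduces the exact string asserted here.

```go
package main

import (
	"fmt"

	"github.com/grafana/mimir/pkg/util/limiter"
)

func main() {
	// Assuming the format matches the expectation strings in the tests above,
	// this should print:
	//   the query exceeded the maximum number of series (limit: 1 series) (err-mimir-max-series-per-query)
	fmt.Printf(limiter.MaxSeriesHitMsgFormat+"\n", 1)
}
```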
5 changes: 3 additions & 2 deletions pkg/storegateway/gateway_test.go
@@ -52,6 +52,7 @@ import (
	mimir_testutil "github.com/grafana/mimir/pkg/storage/tsdb/testutil"
	"github.com/grafana/mimir/pkg/storegateway/storepb"
	"github.com/grafana/mimir/pkg/util"
+	"github.com/grafana/mimir/pkg/util/limiter"
"github.com/grafana/mimir/pkg/util/test"
"github.com/grafana/mimir/pkg/util/validation"
)
@@ -1424,7 +1425,7 @@ func TestStoreGateway_SeriesQueryingShouldEnforceMaxChunksPerQueryLimit(t *testi
},
"should return error if the actual number of queried chunks is > limit": {
limit: chunksQueried - 1,
-	expectedErr: status.Error(http.StatusUnprocessableEntity, fmt.Sprintf("exceeded chunks limit: rpc error: code = Code(422) desc = limit %d exceeded", chunksQueried-1)),
+	expectedErr: status.Error(http.StatusUnprocessableEntity, "rpc error: code = Code(422) desc = "+fmt.Sprintf(limiter.MaxChunksPerQueryLimitMsgFormat, chunksQueried-1)),
},
}

@@ -1488,7 +1489,7 @@ func TestStoreGateway_SeriesQueryingShouldEnforceMaxChunksPerQueryLimit(t *testi
assert.True(t, ok)
s2, ok := status.FromError(errors.Cause(testData.expectedErr))
assert.True(t, ok)
-	assert.True(t, strings.Contains(s1.Message(), s2.Message()))
+	assert.Contains(t, s1.Message(), s2.Message())
assert.Equal(t, s1.Code(), s2.Code())
} else {
require.NoError(t, err)
17 changes: 10 additions & 7 deletions pkg/storegateway/limiter.go
@@ -12,6 +12,8 @@ import (
	"github.com/grafana/dskit/httpgrpc"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/atomic"
+
+	"github.com/grafana/mimir/pkg/util/limiter"
)

type ChunksLimiter interface {
@@ -37,17 +39,18 @@ type SeriesLimiterFactory func(failedCounter prometheus.Counter) SeriesLimiter

// Limiter is a simple mechanism for checking if something has passed a certain threshold.
type Limiter struct {
-	limit    uint64
-	reserved atomic.Uint64
+	limit              uint64
+	reserved           atomic.Uint64
+	errorMessageFormat string

// Counter metric which we will increase if limit is exceeded.
failedCounter prometheus.Counter
failedOnce sync.Once
}

// NewLimiter returns a new limiter with a specified limit. 0 disables the limit.
-func NewLimiter(limit uint64, ctr prometheus.Counter) *Limiter {
-	return &Limiter{limit: limit, failedCounter: ctr}
+func NewLimiter(limit uint64, ctr prometheus.Counter, errorMessageFormat string) *Limiter {
+	return &Limiter{limit: limit, failedCounter: ctr, errorMessageFormat: errorMessageFormat}
}

// Reserve implements ChunksLimiter.
@@ -59,21 +62,21 @@ func (l *Limiter) Reserve(num uint64) error {
// We need to protect from the counter being incremented twice due to concurrency
// while calling Reserve().
l.failedOnce.Do(l.failedCounter.Inc)
-	return httpgrpc.Errorf(http.StatusUnprocessableEntity, "limit %v exceeded", l.limit)
+	return httpgrpc.Errorf(http.StatusUnprocessableEntity, l.errorMessageFormat, l.limit)
}
return nil
}

// NewChunksLimiterFactory makes a new ChunksLimiterFactory with a dynamic limit.
func NewChunksLimiterFactory(limitsExtractor func() uint64) ChunksLimiterFactory {
return func(failedCounter prometheus.Counter) ChunksLimiter {
-	return NewLimiter(limitsExtractor(), failedCounter)
+	return NewLimiter(limitsExtractor(), failedCounter, limiter.MaxChunksPerQueryLimitMsgFormat)
}
}

// NewSeriesLimiterFactory makes a new NewSeriesLimiterFactory with a dynamic limit.
func NewSeriesLimiterFactory(limitsExtractor func() uint64) SeriesLimiterFactory {
return func(failedCounter prometheus.Counter) SeriesLimiter {
-	return NewLimiter(limitsExtractor(), failedCounter)
+	return NewLimiter(limitsExtractor(), failedCounter, limiter.MaxSeriesHitMsgFormat)
}
}
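To summarise the reworked API, here is a minimal sketch (not part of this commit) of how a chunks limiter built through the factory now reports a breached limit: the factory supplies limiter.MaxChunksPerQueryLimitMsgFormat to NewLimiter, so the store-gateway surfaces the same detailed message a querier would. NewSeriesLimiterFactory behaves the same way with limiter.MaxSeriesHitMsgFormat.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	"github.com/grafana/mimir/pkg/storegateway"
)

func main() {
	failedCounter := promauto.With(nil).NewCounter(prometheus.CounterOpts{Name: "example_limits_failed_total"})

	// Chunks limiter allowing 10 chunks; the factory wires the shared querier
	// message format into NewLimiter internally.
	chunksLimiter := storegateway.NewChunksLimiterFactory(func() uint64 { return 10 })(failedCounter)

	if err := chunksLimiter.Reserve(11); err != nil {
		// Expected to print something like:
		//   rpc error: code = Code(422) desc = the query exceeded the maximum
		//   number of chunks (limit: 10 chunks) (err-mimir-max-chunks-per-query)
		fmt.Println(err)
	}
}
```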
18 changes: 9 additions & 9 deletions pkg/storegateway/limiter_test.go
@@ -18,7 +18,7 @@ import (

func TestLimiter(t *testing.T) {
c := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
-	l := NewLimiter(10, c)
+	l := NewLimiter(10, c, "limit of %v exceeded")

assert.NoError(t, l.Reserve(5))
assert.Equal(t, float64(0), prom_testutil.ToFloat64(c))
@@ -27,12 +27,12 @@ func TestLimiter(t *testing.T) {
assert.Equal(t, float64(0), prom_testutil.ToFloat64(c))

err := l.Reserve(1)
-	assert.Error(t, err)
+	assert.ErrorContains(t, err, "limit of 10 exceeded")
assert.Equal(t, float64(1), prom_testutil.ToFloat64(c))
checkErrorStatusCode(t, err)

err = l.Reserve(2)
-	assert.Error(t, err)
+	assert.ErrorContains(t, err, "limit of 10 exceeded")
assert.Equal(t, float64(1), prom_testutil.ToFloat64(c))
checkErrorStatusCode(t, err)
}
@@ -45,14 +45,14 @@ func checkErrorStatusCode(t *testing.T, err error) {

// newStaticChunksLimiterFactory makes a new ChunksLimiterFactory with a static limit.
func newStaticChunksLimiterFactory(limit uint64) ChunksLimiterFactory {
-	return func(failedCounter prometheus.Counter) ChunksLimiter {
-		return NewLimiter(limit, failedCounter)
-	}
+	return NewChunksLimiterFactory(func() uint64 {
+		return limit
+	})
}

// newStaticSeriesLimiterFactory makes a new ChunksLimiterFactory with a static limit.
func newStaticSeriesLimiterFactory(limit uint64) SeriesLimiterFactory {
-	return func(failedCounter prometheus.Counter) SeriesLimiter {
-		return NewLimiter(limit, failedCounter)
-	}
+	return NewSeriesLimiterFactory(func() uint64 {
+		return limit
+	})
}
9 changes: 2 additions & 7 deletions pkg/storegateway/series_refs.go
@@ -51,11 +51,6 @@ var (
})
)

-const (
-	ErrSeriesLimitMessage = "exceeded series limit"
-	ErrChunksLimitMessage = "exceeded chunks limit"
-)

// seriesChunkRefsSetIterator is the interface implemented by an iterator returning a sequence of seriesChunkRefsSet.
type seriesChunkRefsSetIterator interface {
Next() bool
@@ -649,7 +644,7 @@ func (l *limitingSeriesChunkRefsSetIterator) Next() bool {
l.currentBatch = l.from.At()
err := l.seriesLimiter.Reserve(uint64(l.currentBatch.len()))
if err != nil {
-	l.err = errors.Wrap(err, ErrSeriesLimitMessage)
+	l.err = err
return false
}

Expand All @@ -660,7 +655,7 @@ func (l *limitingSeriesChunkRefsSetIterator) Next() bool {

err = l.chunksLimiter.Reserve(uint64(totalChunks))
if err != nil {
-	l.err = errors.Wrap(err, ErrChunksLimitMessage)
+	l.err = err
return false
}
return true
7 changes: 4 additions & 3 deletions pkg/storegateway/series_refs_test.go
@@ -1048,8 +1048,8 @@ func TestLimitingSeriesChunkRefsSetIterator(t *testing.T) {
t.Run(testName, func(t *testing.T) {
iterator := newLimitingSeriesChunkRefsSetIterator(
newSliceSeriesChunkRefsSetIterator(testCase.upstreamErr, testCase.sets...),
-	&staticLimiter{limit: testCase.chunksLimit},
-	&staticLimiter{limit: testCase.seriesLimit},
+	&staticLimiter{limit: testCase.chunksLimit, msg: "exceeded chunks limit"},
+	&staticLimiter{limit: testCase.seriesLimit, msg: "exceeded series limit"},
)

sets := readAllSeriesChunkRefsSet(iterator)
@@ -2362,12 +2362,13 @@ func (s *sliceSeriesChunkRefsSetIterator) Err() error {

type staticLimiter struct {
limit int
+	msg string
current atomic.Uint64
}

func (l *staticLimiter) Reserve(num uint64) error {
if l.current.Add(num) > uint64(l.limit) {
-	return errors.New("test limit exceeded")
+	return errors.New(l.msg)
}
return nil
}
