Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return 503 on hitting distributor instance limits #6387

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* [ENHANCEMENT] Distributor: Add new `cortex_distributor_inflight_client_requests` metric to track number of ingester client inflight requests. #6358
* [ENHANCEMENT] Distributor: Expose `cortex_label_size_bytes` native histogram metric. #6372
* [ENHANCEMENT] Add new option `-server.grpc_server-num-stream-workers` to configure the number of worker goroutines that should be used to process incoming streams. #6386
* [ENHANCEMENT] Distributor: Return HTTP 5XX instead of HTTP 4XX when instance limits are hit. #6358
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
* [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled #6271
Expand Down
11 changes: 3 additions & 8 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ var (
// Validation errors.
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
errInvalidTenantShardSize = errors.New("invalid tenant shard size. The value must be greater than or equal to 0")

// Distributor instance limits errors.
errTooManyInflightPushRequests = errors.New("too many inflight push requests in distributor")
errMaxSamplesPushRateLimitReached = errors.New("distributor's samples push rate limit reached")
errTooManyInflightClientRequests = errors.New("too many inflight ingester client requests in distributor")
)

const (
Expand Down Expand Up @@ -668,19 +663,19 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))

if d.cfg.InstanceLimits.MaxInflightPushRequests > 0 && inflight > int64(d.cfg.InstanceLimits.MaxInflightPushRequests) {
return nil, errTooManyInflightPushRequests
return nil, httpgrpc.Errorf(http.StatusServiceUnavailable, "too many inflight push requests in distributor")
}

if d.cfg.InstanceLimits.MaxIngestionRate > 0 {
if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate {
return nil, errMaxSamplesPushRateLimitReached
return nil, httpgrpc.Errorf(http.StatusServiceUnavailable, "distributor's samples push rate limit reached")
}
}

// only reject requests at this stage to allow distributor to finish sending the current batch request to all ingesters
// even if we've exceeded the MaxInflightClientRequests in the `doBatch`
if d.cfg.InstanceLimits.MaxInflightClientRequests > 0 && d.inflightClientRequests.Load() > int64(d.cfg.InstanceLimits.MaxInflightClientRequests) {
return nil, errTooManyInflightClientRequests
return nil, httpgrpc.Errorf(http.StatusServiceUnavailable, "too many inflight ingester client requests in distributor")
}

removeReplica := false
Expand Down
15 changes: 8 additions & 7 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
preInflight: 101,
inflightLimit: 101,
pushes: []testPush{
{samples: 100, expectedError: errTooManyInflightPushRequests},
{samples: 100, expectedError: httpgrpc.Errorf(http.StatusServiceUnavailable, "too many inflight push requests in distributor")},
},
},
"below inflight client limit": {
Expand All @@ -863,7 +863,8 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
preInflightClient: 103,
inflightClientLimit: 101,
pushes: []testPush{
{samples: 100, expectedError: errTooManyInflightClientRequests},
{samples: 100, expectedError: httpgrpc.Errorf(http.StatusServiceUnavailable,
"too many inflight ingester client requests in distributor")},
},
},
"below ingestion rate limit": {
Expand Down Expand Up @@ -892,7 +893,7 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
ingestionRateLimit: 1000,

pushes: []testPush{
{samples: 100, expectedError: errMaxSamplesPushRateLimitReached},
{samples: 100, expectedError: httpgrpc.Errorf(http.StatusServiceUnavailable, "distributor's samples push rate limit reached")},
{samples: 100, expectedError: nil},
},
},
Expand All @@ -902,10 +903,10 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
ingestionRateLimit: 1000,

pushes: []testPush{
{samples: 5000, expectedError: nil}, // after push, rate = 500 + 0.2*(5000-500) = 1400
{samples: 5000, expectedError: errMaxSamplesPushRateLimitReached}, // after push, rate = 1400 + 0.2*(0 - 1400) = 1120
{samples: 5000, expectedError: errMaxSamplesPushRateLimitReached}, // after push, rate = 1120 + 0.2*(0 - 1120) = 896
{samples: 5000, expectedError: nil}, // 896 is below 1000, so this push succeeds, new rate = 896 + 0.2*(5000-896) = 1716.8
{samples: 5000, expectedError: nil}, // after push, rate = 500 + 0.2*(5000-500) = 1400
{samples: 5000, expectedError: httpgrpc.Errorf(http.StatusServiceUnavailable, "distributor's samples push rate limit reached")}, // after push, rate = 1400 + 0.2*(0 - 1400) = 1120
{samples: 5000, expectedError: httpgrpc.Errorf(http.StatusServiceUnavailable, "distributor's samples push rate limit reached")}, // after push, rate = 1120 + 0.2*(0 - 1120) = 896
{samples: 5000, expectedError: nil}, // 896 is below 1000, so this push succeeds, new rate = 896 + 0.2*(5000-896) = 1716.8
},
},
}
Expand Down
Loading