Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] master from cortexproject:master #580

Merged
merged 2 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@
* [ENHANCEMENT] Querier: Retry store gateway on different zones when zone awareness is enabled. #5476
* [ENHANCEMENT] DDBKV: Change metric name from dynamodb_kv_read_capacity_total to dynamodb_kv_consumed_capacity_total and include Delete, Put, Batch dimension. #5481
* [ENHANCEMENT] Compactor: allow unregisteronshutdown to be configurable. #5503
* [ENHANCEMENT] Querier: Batch adding series to query limiter to optimize locking. #5505
* [ENHANCEMENT] Store Gateway: add metric `cortex_bucket_store_chunk_refetches_total` for number of chunk refetches. #5532
* [ENHANCEMENT] BasicLifeCycler: allow final-sleep during shutdown #5517
* [ENHANCEMENT] All: Handling CMK Access Denied errors. #5420 #5542
* [ENHANCEMENT] Querier: Retry store gateway client connection closing gRPC error. #5558
* [ENHANCEMENT] QueryFrontend: Add generic retry for all APIs. #5561.
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
3 changes: 2 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,8 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
}

func (t *Cortex) initQueryFrontend() (serv services.Service, err error) {
roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(t.Cfg.Frontend, t.Overrides, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer)
retry := transport.NewRetry(t.Cfg.QueryRange.MaxRetries, prometheus.DefaultRegisterer)
roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(t.Cfg.Frontend, t.Overrides, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer, retry)
if err != nil {
return nil, err
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,17 +1093,18 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
return nil, validation.LimitError(err.Error())
}
s := make([][]cortexpb.LabelAdapter, 0, len(resp.Metric))
for _, m := range resp.Metric {
if err := queryLimiter.AddSeries(m.Labels); err != nil {
return nil, validation.LimitError(err.Error())
}
s = append(s, m.Labels)
m := cortexpb.FromLabelAdaptersToMetric(m.Labels)
fingerprint := m.Fingerprint()
mutex.Lock()
(*metrics)[fingerprint] = m
mutex.Unlock()
}

if err := queryLimiter.AddSeries(s...); err != nil {
return nil, validation.LimitError(err.Error())
}
return nil, nil
})

Expand All @@ -1130,19 +1131,18 @@ func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, t
} else if err != nil {
return nil, err
}

s := make([][]cortexpb.LabelAdapter, 0, len(resp.Metric))
for _, metric := range resp.Metric {
m := cortexpb.FromLabelAdaptersToMetricWithCopy(metric.Labels)

if err := queryLimiter.AddSeries(metric.Labels); err != nil {
return nil, validation.LimitError(err.Error())
}

s = append(s, metric.Labels)
fingerprint := m.Fingerprint()
mutex.Lock()
(*metrics)[fingerprint] = m
mutex.Unlock()
}
if err := queryLimiter.AddSeries(s...); err != nil {
return nil, validation.LimitError(err.Error())
}
}

return nil, nil
Expand Down
19 changes: 10 additions & 9 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,17 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
return nil, validation.LimitError(chunkLimitErr.Error())
}

s := make([][]cortexpb.LabelAdapter, 0, len(resp.Chunkseries)+len(resp.Timeseries))
for _, series := range resp.Chunkseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
return nil, validation.LimitError(limitErr.Error())
}
s = append(s, series.Labels)
}

for _, series := range resp.Timeseries {
s = append(s, series.Labels)
}

if limitErr := queryLimiter.AddSeries(s...); limitErr != nil {
return nil, validation.LimitError(limitErr.Error())
}

if chunkBytesLimitErr := queryLimiter.AddChunkBytes(resp.ChunksSize()); chunkBytesLimitErr != nil {
Expand All @@ -340,12 +347,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
return nil, validation.LimitError(dataBytesLimitErr.Error())
}

for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
return nil, validation.LimitError(limitErr.Error())
}
}

result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...)
result.Timeseries = append(result.Timeseries, resp.Timeseries...)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) {
// Returned RoundTripper can be wrapped in more round-tripper middlewares, and then eventually registered
// into HTTP server using the Handler from this package. Returned RoundTripper is always non-nil
// (if there are no errors), and it uses the returned frontend (if any).
func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort int, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, *v2.Frontend, error) {
func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort int, log log.Logger, reg prometheus.Registerer, retry *transport.Retry) (http.RoundTripper, *v1.Frontend, *v2.Frontend, error) {
switch {
case cfg.DownstreamURL != "":
// If the user has specified a downstream Prometheus, then we should use that.
Expand All @@ -59,12 +59,12 @@ func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort i
cfg.FrontendV2.Port = grpcListenPort
}

fr, err := v2.NewFrontend(cfg.FrontendV2, log, reg)
fr, err := v2.NewFrontend(cfg.FrontendV2, log, reg, retry)
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), nil, fr, err

default:
// No scheduler = use original frontend.
fr, err := v1.New(cfg.FrontendV1, limits, log, reg)
fr, err := v1.New(cfg.FrontendV1, limits, log, reg, retry)
if err != nil {
return nil, nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand
httpListen, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)

rt, v1, v2, err := InitFrontend(config, frontendv1.MockLimits{}, 0, logger, nil)
rt, v1, v2, err := InitFrontend(config, frontendv1.MockLimits{}, 0, logger, nil, transport.NewRetry(0, nil))
require.NoError(t, err)
require.NotNil(t, rt)
// v1 will be nil if DownstreamURL is defined.
Expand Down
56 changes: 56 additions & 0 deletions pkg/frontend/transport/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package transport

import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
)

type Retry struct {
maxRetries int
retriesCount prometheus.Histogram
}

func NewRetry(maxRetries int, reg prometheus.Registerer) *Retry {
return &Retry{
maxRetries: maxRetries,
retriesCount: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "query_frontend_retries",
Help: "Number of times a request is retried.",
Buckets: []float64{0, 1, 2, 3, 4, 5},
}),
}
}

func (r *Retry) Do(ctx context.Context, f func() (*httpgrpc.HTTPResponse, error)) (*httpgrpc.HTTPResponse, error) {
if r.maxRetries == 0 {
// Retries are disabled. Try only once.
return f()
}

tries := 0
defer func() { r.retriesCount.Observe(float64(tries)) }()

var (
resp *httpgrpc.HTTPResponse
err error
)
for ; tries < r.maxRetries; tries++ {
if ctx.Err() != nil {
return nil, ctx.Err()
}

resp, err = f()
if err != nil && err != context.Canceled {
continue // Retryable
} else if resp != nil && resp.Code/100 == 5 {
continue // Retryable
} else {
break
}
}
return resp, err
}
31 changes: 31 additions & 0 deletions pkg/frontend/transport/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package transport

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"go.uber.org/atomic"
)

func TestRetry(t *testing.T) {
tries := atomic.NewInt64(3)
r := NewRetry(3, nil)
ctx := context.Background()
res, err := r.Do(ctx, func() (*httpgrpc.HTTPResponse, error) {
try := tries.Dec()
if try > 1 {
return &httpgrpc.HTTPResponse{
Code: 500,
}, nil
}
return &httpgrpc.HTTPResponse{
Code: 200,
}, nil

})

require.NoError(t, err)
require.Equal(t, int32(200), res.Code)
}
47 changes: 26 additions & 21 deletions pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/scheduler/queue"
Expand Down Expand Up @@ -66,6 +67,7 @@ type Frontend struct {
cfg Config
log log.Logger
limits Limits
retry *transport.Retry

requestQueue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
Expand All @@ -92,11 +94,12 @@ type request struct {
}

// New creates a new frontend. Frontend implements service, and must be started and stopped.
func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Frontend, error) {
func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer, retry *transport.Retry) (*Frontend, error) {
f := &Frontend{
cfg: cfg,
log: log,
limits: limits,
retry: retry,
queueLength: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_query_frontend_queue_length",
Help: "Number of queries in the queue.",
Expand Down Expand Up @@ -173,31 +176,33 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
}
}

request := request{
request: req,
originalCtx: ctx,
return f.retry.Do(ctx, func() (*httpgrpc.HTTPResponse, error) {
request := request{
request: req,
originalCtx: ctx,

// Buffer of 1 to ensure response can be written by the server side
// of the Process stream, even if this goroutine goes away due to
// client context cancellation.
err: make(chan error, 1),
response: make(chan *httpgrpc.HTTPResponse, 1),
}
// Buffer of 1 to ensure response can be written by the server side
// of the Process stream, even if this goroutine goes away due to
// client context cancellation.
err: make(chan error, 1),
response: make(chan *httpgrpc.HTTPResponse, 1),
}

if err := f.queueRequest(ctx, &request); err != nil {
return nil, err
}
if err := f.queueRequest(ctx, &request); err != nil {
return nil, err
}

select {
case <-ctx.Done():
return nil, ctx.Err()
select {
case <-ctx.Done():
return nil, ctx.Err()

case resp := <-request.response:
return resp, nil
case resp := <-request.response:
return resp, nil

case err := <-request.err:
return nil, err
}
case err := <-request.err:
return nil, err
}
})
}

// Process allows backends to pull requests from the frontend.
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/v1/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a
require.NoError(t, err)

limits := MockLimits{MockLimits: queue.MockLimits{MaxOutstanding: 100}}
v1, err := New(config, limits, logger, reg)
v1, err := New(config, limits, logger, reg, transport.NewRetry(0, nil))
require.NoError(t, err)
require.NotNil(t, v1)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), v1))
Expand Down
3 changes: 2 additions & 1 deletion pkg/frontend/v1/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/weaveworks/common/user"
"google.golang.org/grpc/metadata"

"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/scheduler/queue"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand All @@ -25,7 +26,7 @@ func setupFrontend(t *testing.T, config Config) (*Frontend, error) {
logger := log.NewNopLogger()

limits := MockLimits{Queriers: 3, MockLimits: queue.MockLimits{MaxOutstanding: 100}}
frontend, err := New(config, limits, logger, nil)
frontend, err := New(config, limits, logger, nil, transport.NewRetry(0, nil))
require.NoError(t, err)

t.Cleanup(func() {
Expand Down
Loading