Skip to content

Commit

Permalink
retry chunk pool exhaustion error in querier, not in query frontend (c…
Browse files Browse the repository at this point in the history
…ortexproject#5569)

* stop retrying chunk pool exhaustion at query frontend, retry at querier level

Signed-off-by: Ben Ye <[email protected]>

* update integration test

Signed-off-by: Ben Ye <[email protected]>

* refactor

Signed-off-by: Ben Ye <[email protected]>

fix e2e test

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Sep 22, 2023
1 parent b7a3a5d commit 179c0d6
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 29 deletions.
85 changes: 85 additions & 0 deletions integration/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/pool"

"github.com/cortexproject/cortex/integration/ca"
"github.com/cortexproject/cortex/integration/e2e"
Expand Down Expand Up @@ -436,3 +437,87 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
assertServiceMetricsPrefixes(t, QueryFrontend, queryFrontend)
assertServiceMetricsPrefixes(t, QueryScheduler, queryScheduler)
}

func TestQueryFrontendNoRetryChunkPool(t *testing.T) {
const blockRangePeriod = 5 * time.Second

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Configure the blocks storage to frequently compact TSDB head
// and ship blocks to the storage.
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
"-blocks-storage.bucket-store.max-chunk-pool-bytes": "1",
})

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Start Cortex components for the write path.
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester))

// Wait until the distributor has updated the ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

// Push some series to Cortex.
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
require.NoError(t, err)

seriesTimestamp := time.Now()
series2Timestamp := seriesTimestamp.Add(blockRangePeriod * 2)
series1, _ := generateSeries("series_1", seriesTimestamp, prompb.Label{Name: "job", Value: "test"})
series2, _ := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "job", Value: "test"})

res, err := c.Push(series1)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

res, err = c.Push(series2)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// Wait until the TSDB head is compacted and shipped to the storage.
// The shipped block contains the 1st series, while the 2ns series is in the head.
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total"))
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_created_total"))
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series_removed_total"))
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series"))

queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
require.NoError(t, s.Start(queryFrontend))

// Start the querier and store-gateway, and configure them to frequently sync blocks fast enough to trigger consistency check.
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-blocks-storage.bucket-store.sync-interval": "5s",
}), "")
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-blocks-storage.bucket-store.sync-interval": "5s",
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
}), "")
require.NoError(t, s.StartAndWaitReady(querier, storeGateway))

// Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for consistency check
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total"))
require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"cortex_querier_blocks_scan_duration_seconds"}, e2e.WithMetricCount))

// Query back the series.
c, err = e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

// We expect request to hit chunk pool exhaustion.
resp, body, err := c.QueryRaw(`{job="test"}`, series2Timestamp)
require.NoError(t, err)
require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
require.Contains(t, string(body), pool.ErrPoolExhausted.Error())
// We shouldn't be able to see any retries.
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_query_frontend_retries"}, e2e.WaitMissingMetrics))
}
39 changes: 35 additions & 4 deletions pkg/frontend/transport/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ package transport

import (
"context"
"errors"
"strings"
"unsafe"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/pool"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/querier/tripperware"
)

type Retry struct {
Expand Down Expand Up @@ -44,13 +50,38 @@ func (r *Retry) Do(ctx context.Context, f func() (*httpgrpc.HTTPResponse, error)
}

resp, err = f()
if err != nil && err != context.Canceled {
if err != nil && !errors.Is(err, context.Canceled) {
continue // Retryable
} else if resp != nil && resp.Code/100 == 5 {
continue // Retryable
} else {
break
// This is not that efficient as we might decode the body multiple
// times. But error response should be too large so we should be fine.
// TODO: investigate ways to decode only once.
body, err := tripperware.BodyBufferFromHTTPGRPCResponse(resp, nil)
if err != nil {
return nil, err
}

if tries < r.maxRetries-1 && isBodyRetryable(yoloString(body)) {
continue
}

return resp, nil
}
break
}
if err != nil {
return nil, err
}

return resp, err
}

func isBodyRetryable(body string) bool {
// If pool exhausted, retry at query frontend might make things worse.
// Rely on retries at querier level only.
return !strings.Contains(body, pool.ErrPoolExhausted.Error())
}

func yoloString(b []byte) string {
return *((*string)(unsafe.Pointer(&b)))
}
23 changes: 23 additions & 0 deletions pkg/frontend/transport/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/pool"
"github.com/weaveworks/common/httpgrpc"
"go.uber.org/atomic"
)
Expand All @@ -29,3 +30,25 @@ func TestRetry(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int32(200), res.Code)
}

func TestNoRetryOnChunkPoolExhaustion(t *testing.T) {
tries := atomic.NewInt64(3)
r := NewRetry(3, nil)
ctx := context.Background()
res, err := r.Do(ctx, func() (*httpgrpc.HTTPResponse, error) {
try := tries.Dec()
if try > 1 {
return &httpgrpc.HTTPResponse{
Code: 500,
Body: []byte(pool.ErrPoolExhausted.Error()),
}, nil
}
return &httpgrpc.HTTPResponse{
Code: 200,
}, nil

})

require.NoError(t, err)
require.Equal(t, int32(500), res.Code)
}
Loading

0 comments on commit 179c0d6

Please sign in to comment.