diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go
index 67d6b5f8c5..02054d117e 100644
--- a/integration/query_frontend_test.go
+++ b/integration/query_frontend_test.go
@@ -21,6 +21,7 @@ import (
 	"github.com/prometheus/prometheus/prompb"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"github.com/thanos-io/thanos/pkg/pool"
 
 	"github.com/cortexproject/cortex/integration/ca"
 	"github.com/cortexproject/cortex/integration/e2e"
@@ -436,3 +437,87 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
 	assertServiceMetricsPrefixes(t, QueryFrontend, queryFrontend)
 	assertServiceMetricsPrefixes(t, QueryScheduler, queryScheduler)
 }
+
+func TestQueryFrontendNoRetryChunkPool(t *testing.T) {
+	const blockRangePeriod = 5 * time.Second
+
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	// Configure the blocks storage to frequently compact TSDB head
+	// and ship blocks to the storage.
+	flags := mergeFlags(BlocksStorageFlags(), map[string]string{
+		"-blocks-storage.tsdb.block-ranges-period":          blockRangePeriod.String(),
+		"-blocks-storage.tsdb.ship-interval":                "1s",
+		"-blocks-storage.tsdb.retention-period":             ((blockRangePeriod * 2) - 1).String(),
+		"-blocks-storage.bucket-store.max-chunk-pool-bytes": "1",
+	})
+
+	// Start dependencies.
+	consul := e2edb.NewConsul()
+	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
+	require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+	// Start Cortex components for the write path.
+	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+	ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+	require.NoError(t, s.StartAndWaitReady(distributor, ingester))
+
+	// Wait until the distributor has updated the ring.
+	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
+
+	// Push some series to Cortex.
+	c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
+	require.NoError(t, err)
+
+	seriesTimestamp := time.Now()
+	series2Timestamp := seriesTimestamp.Add(blockRangePeriod * 2)
+	series1, _ := generateSeries("series_1", seriesTimestamp, prompb.Label{Name: "job", Value: "test"})
+	series2, _ := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "job", Value: "test"})
+
+	res, err := c.Push(series1)
+	require.NoError(t, err)
+	require.Equal(t, 200, res.StatusCode)
+
+	res, err = c.Push(series2)
+	require.NoError(t, err)
+	require.Equal(t, 200, res.StatusCode)
+
+	// Wait until the TSDB head is compacted and shipped to the storage.
+	// The shipped block contains the 1st series, while the 2nd series is in the head.
+	require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total"))
+	require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_created_total"))
+	require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series_removed_total"))
+	require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series"))
+
+	queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
+	require.NoError(t, s.Start(queryFrontend))
+
+	// Start the querier and store-gateway, and configure them to sync blocks frequently enough to trigger the consistency check.
+	storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
+		"-blocks-storage.bucket-store.sync-interval": "5s",
+	}), "")
+	querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
+		"-blocks-storage.bucket-store.sync-interval": "5s",
+		"-querier.frontend-address":                  queryFrontend.NetworkGRPCEndpoint(),
+	}), "")
+	require.NoError(t, s.StartAndWaitReady(querier, storeGateway))
+
+	// Wait until the querier and store-gateway have updated the ring, and until the blocks are old enough for the consistency check.
+	require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total"))
+	require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
+	require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"cortex_querier_blocks_scan_duration_seconds"}, e2e.WithMetricCount))
+
+	// Query back the series.
+	c, err = e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1")
+	require.NoError(t, err)
+
+	// We expect the request to hit chunk pool exhaustion.
+	resp, body, err := c.QueryRaw(`{job="test"}`, series2Timestamp)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
+	require.Contains(t, string(body), pool.ErrPoolExhausted.Error())
+	// We shouldn't see any retries.
+	require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_query_frontend_retries"}, e2e.WaitMissingMetrics))
+}
diff --git a/pkg/frontend/transport/retry.go b/pkg/frontend/transport/retry.go
index bf010745ac..bf1b4faa1c 100644
--- a/pkg/frontend/transport/retry.go
+++ b/pkg/frontend/transport/retry.go
@@ -2,10 +2,16 @@ package transport
 
 import (
 	"context"
+	"errors"
+	"strings"
+	"unsafe"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/thanos-io/thanos/pkg/pool"
 	"github.com/weaveworks/common/httpgrpc"
+
+	"github.com/cortexproject/cortex/pkg/querier/tripperware"
 )
 
 type Retry struct {
@@ -44,13 +50,38 @@ func (r *Retry) Do(ctx context.Context, f func() (*httpgrpc.HTTPResponse, error)
 		}
 
 		resp, err = f()
-		if err != nil && err != context.Canceled {
+		if err != nil && !errors.Is(err, context.Canceled) {
 			continue // Retryable
 		} else if resp != nil && resp.Code/100 == 5 {
-			continue // Retryable
-		} else {
-			break
+			// This is not that efficient as we might decode the body multiple
+			// times. But the error response shouldn't be too large, so we should be fine.
+			// TODO: investigate ways to decode only once.
+			body, err := tripperware.BodyBufferFromHTTPGRPCResponse(resp, nil)
+			if err != nil {
+				return nil, err
+			}
+
+			if tries < r.maxRetries-1 && isBodyRetryable(yoloString(body)) {
+				continue
+			}
+
+			return resp, nil
 		}
+		break
+	}
+	if err != nil {
+		return nil, err
 	}
+
 	return resp, err
 }
+
+func isBodyRetryable(body string) bool {
+	// If the pool is exhausted, retrying at the query frontend might make things worse.
+	// Rely on retries at the querier level only.
+	return !strings.Contains(body, pool.ErrPoolExhausted.Error())
+}
+
+func yoloString(b []byte) string {
+	return *((*string)(unsafe.Pointer(&b)))
+}
diff --git a/pkg/frontend/transport/retry_test.go b/pkg/frontend/transport/retry_test.go
index a79c083640..3b8ead1a89 100644
--- a/pkg/frontend/transport/retry_test.go
+++ b/pkg/frontend/transport/retry_test.go
@@ -5,6 +5,7 @@ import (
 	"testing"
 
 	"github.com/stretchr/testify/require"
+	"github.com/thanos-io/thanos/pkg/pool"
 	"github.com/weaveworks/common/httpgrpc"
 	"go.uber.org/atomic"
 )
@@ -29,3 +30,25 @@ func TestRetry(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, int32(200), res.Code)
 }
+
+func TestNoRetryOnChunkPoolExhaustion(t *testing.T) {
+	tries := atomic.NewInt64(3)
+	r := NewRetry(3, nil)
+	ctx := context.Background()
+	res, err := r.Do(ctx, func() (*httpgrpc.HTTPResponse, error) {
+		try := tries.Dec()
+		if try > 1 {
+			return &httpgrpc.HTTPResponse{
+				Code: 500,
+				Body: []byte(pool.ErrPoolExhausted.Error()),
+			}, nil
+		}
+		return &httpgrpc.HTTPResponse{
+			Code: 200,
+		}, nil
+
+	})
+
+	require.NoError(t, err)
+	require.Equal(t, int32(500), res.Code)
+}
diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go
index b3e7f9073c..f7dac096c7 100644
--- a/pkg/querier/blocks_store_queryable.go
+++ b/pkg/querier/blocks_store_queryable.go
@@ -23,6 +23,7 @@ import (
 	"github.com/prometheus/prometheus/storage"
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/extprom"
+	"github.com/thanos-io/thanos/pkg/pool"
 	"github.com/thanos-io/thanos/pkg/store/hintspb"
 	"github.com/thanos-io/thanos/pkg/store/storepb"
 	"github.com/thanos-io/thanos/pkg/strutil"
@@ -46,6 +47,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/util/limiter"
 	util_log "github.com/cortexproject/cortex/pkg/util/log"
 	"github.com/cortexproject/cortex/pkg/util/math"
+	"github.com/cortexproject/cortex/pkg/util/multierror"
 	"github.com/cortexproject/cortex/pkg/util/services"
 	"github.com/cortexproject/cortex/pkg/util/spanlogger"
 	"github.com/cortexproject/cortex/pkg/util/validation"
@@ -341,10 +343,10 @@ func (q *blocksStoreQuerier) LabelNames(matchers ...*labels.Matcher) ([]string,
 		convertedMatchers = convertMatchersToLabelMatcher(matchers)
 	)
 
-	queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) {
-		nameSets, warnings, queriedBlocks, err := q.fetchLabelNamesFromStore(spanCtx, clients, minT, maxT, convertedMatchers)
+	queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error) {
+		nameSets, warnings, queriedBlocks, err, retryableError := q.fetchLabelNamesFromStore(spanCtx, clients, minT, maxT, convertedMatchers)
 		if err != nil {
-			return nil, err
+			return nil, err, retryableError
 		}
 
 		resMtx.Lock()
@@ -352,7 +354,7 @@ func (q *blocksStoreQuerier) LabelNames(matchers ...*labels.Matcher) ([]string,
 		resWarnings = append(resWarnings, warnings...)
 		resMtx.Unlock()
 
-		return queriedBlocks, nil
+		return queriedBlocks, nil, retryableError
 	}
 
 	err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, queryFunc)
@@ -376,10 +378,10 @@ func (q *blocksStoreQuerier) LabelValues(name string, matchers ...*labels.Matche
 		resultMtx sync.Mutex
 	)
 
-	queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) {
-		valueSets, warnings, queriedBlocks, err := q.fetchLabelValuesFromStore(spanCtx, name, clients, minT, maxT, matchers...)
+	queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error) {
+		valueSets, warnings, queriedBlocks, err, retryableError := q.fetchLabelValuesFromStore(spanCtx, name, clients, minT, maxT, matchers...)
 		if err != nil {
-			return nil, err
+			return nil, err, retryableError
 		}
 
 		resultMtx.Lock()
@@ -387,7 +389,7 @@ func (q *blocksStoreQuerier) LabelValues(name string, matchers ...*labels.Matche
 		resWarnings = append(resWarnings, warnings...)
 		resultMtx.Unlock()
 
-		return queriedBlocks, nil
+		return queriedBlocks, nil, retryableError
 	}
 
 	err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, queryFunc)
@@ -421,11 +423,10 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
 		resultMtx sync.Mutex
 	)
 
-	queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) {
-		seriesSets, queriedBlocks, warnings, numChunks, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, maxChunksLimit, leftChunksLimit)
+	queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error) {
+		seriesSets, queriedBlocks, warnings, numChunks, err, retryableError := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, maxChunksLimit, leftChunksLimit)
 		if err != nil {
-
-			return nil, err
+			return nil, err, retryableError
 		}
 
 		resultMtx.Lock()
@@ -440,7 +441,7 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
 		}
 		resultMtx.Unlock()
 
-		return queriedBlocks, nil
+		return queriedBlocks, nil, retryableError
 	}
 
 	err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, queryFunc)
@@ -458,7 +459,7 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
 }
 
 func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logger log.Logger, minT, maxT int64,
-	queryFunc func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error)) error {
+	queryFunc func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error)) error {
 	// If queryStoreAfter is enabled, we do manipulate the query maxt to query samples up until
 	// now - queryStoreAfter, because the most recent time range is covered by ingesters. This
 	// optimization is particularly important for the blocks storage because can be used to skip
@@ -501,6 +502,9 @@ func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logg
 		resQueriedBlocks     = []ulid.ULID(nil)
 		attemptedBlocksZones = make(map[ulid.ULID]map[string]int, len(remainingBlocks))
+
+		queriedBlocks  []ulid.ULID
+		retryableError error
 	)
 
 	for attempt := 1; attempt <= maxFetchSeriesAttempts; attempt++ {
@@ -521,7 +525,7 @@ func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logg
 
 		// Fetch series from stores. If an error occur we do not retry because retries
 		// are only meant to cover missing blocks.
-		queriedBlocks, err := queryFunc(clients, minT, maxT)
+		queriedBlocks, err, retryableError = queryFunc(clients, minT, maxT)
 		if err != nil {
 			return err
 		}
@@ -553,6 +557,12 @@ func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logg
 		remainingBlocks = missingBlocks
 	}
 
+	// After we have exhausted all retries, return the retryable error if there is one.
+	// It can be helpful for the caller to know whether it should retry again or not.
+	if retryableError != nil {
+		return retryableError
+	}
+
 	// We've not been able to query all expected blocks after all retries.
 	level.Warn(util_log.WithContext(ctx, logger)).Log("msg", "failed consistency check", "err", err)
 	return fmt.Errorf("consistency check failed because some blocks were not queried: %s", strings.Join(convertULIDsToString(remainingBlocks), " "))
@@ -567,7 +577,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
 	matchers []*labels.Matcher,
 	maxChunksLimit int,
 	leftChunksLimit int,
-) ([]storage.SeriesSet, []ulid.ULID, storage.Warnings, int, error) {
+) ([]storage.SeriesSet, []ulid.ULID, storage.Warnings, int, error, error) {
 	var (
 		reqCtx  = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID)
 		g, gCtx = errgroup.WithContext(reqCtx)
@@ -579,11 +589,13 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
 		spanLog      = spanlogger.FromContext(ctx)
 		queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
 		reqStats     = stats.FromContext(ctx)
+		merrMtx      = sync.Mutex{}
+		merr         = multierror.MultiError{}
 	)
 
 	matchers, shardingInfo, err := querysharding.ExtractShardingInfo(matchers)
 	if err != nil {
-		return nil, nil, nil, 0, err
+		return nil, nil, nil, 0, err, merr.Err()
 	}
 
 	convertedMatchers := convertMatchersToLabelMatcher(matchers)
@@ -614,6 +626,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
 			if err != nil {
 				if isRetryableError(err) {
 					level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch series from %s due to retryable error", c.RemoteAddress()))
+					merrMtx.Lock()
+					merr.Add(err)
+					merrMtx.Unlock()
 					return nil
 				}
 				return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress())
@@ -637,6 +652,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
 
 				if isRetryableError(err) {
 					level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to receive series from %s due to retryable error", c.RemoteAddress()))
+					merrMtx.Lock()
+					merr.Add(err)
+					merrMtx.Unlock()
 					return nil
 				}
 
@@ -773,10 +791,10 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
 
 	// Wait until all client requests complete.
 	if err := g.Wait(); err != nil {
-		return nil, nil, nil, 0, err
+		return nil, nil, nil, 0, err, merr.Err()
 	}
 
-	return seriesSets, queriedBlocks, warnings, int(numChunks.Load()), nil
+	return seriesSets, queriedBlocks, warnings, int(numChunks.Load()), nil, merr.Err()
 }
 
 func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
@@ -785,7 +803,7 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
 	minT int64,
 	maxT int64,
 	matchers []storepb.LabelMatcher,
-) ([][]string, storage.Warnings, []ulid.ULID, error) {
+) ([][]string, storage.Warnings, []ulid.ULID, error, error) {
 	var (
 		reqCtx  = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID)
 		g, gCtx = errgroup.WithContext(reqCtx)
@@ -794,6 +812,8 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
 		warnings      = storage.Warnings(nil)
 		queriedBlocks = []ulid.ULID(nil)
 		spanLog       = spanlogger.FromContext(ctx)
+		merrMtx       = sync.Mutex{}
+		merr          = multierror.MultiError{}
 	)
 
 	// Concurrently fetch series from all clients.
@@ -812,6 +832,9 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
 			if err != nil {
 				if isRetryableError(err) {
 					level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch label names from %s due to retryable error", c.RemoteAddress()))
+					merrMtx.Lock()
+					merr.Add(err)
+					merrMtx.Unlock()
 					return nil
 				}
 
@@ -868,10 +891,10 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
 
 	// Wait until all client requests complete.
 	if err := g.Wait(); err != nil {
-		return nil, nil, nil, err
+		return nil, nil, nil, err, merr.Err()
 	}
 
-	return nameSets, warnings, queriedBlocks, nil
+	return nameSets, warnings, queriedBlocks, nil, merr.Err()
 }
 
 func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
@@ -881,7 +904,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
 	minT int64,
 	maxT int64,
 	matchers ...*labels.Matcher,
-) ([][]string, storage.Warnings, []ulid.ULID, error) {
+) ([][]string, storage.Warnings, []ulid.ULID, error, error) {
 	var (
 		reqCtx  = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID)
 		g, gCtx = errgroup.WithContext(reqCtx)
@@ -890,6 +913,8 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
 		warnings      = storage.Warnings(nil)
 		queriedBlocks = []ulid.ULID(nil)
 		spanLog       = spanlogger.FromContext(ctx)
+		merrMtx       = sync.Mutex{}
+		merr          = multierror.MultiError{}
 	)
 
 	// Concurrently fetch series from all clients.
@@ -908,6 +933,9 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
 			if err != nil {
 				if isRetryableError(err) {
 					level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch label values from %s due to retryable error", c.RemoteAddress()))
+					merrMtx.Lock()
+					merr.Add(err)
+					merrMtx.Unlock()
 					return nil
 				}
 
@@ -967,10 +995,10 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
 
 	// Wait until all client requests complete.
 	if err := g.Wait(); err != nil {
-		return nil, nil, nil, err
+		return nil, nil, nil, err, merr.Err()
 	}
 
-	return valueSets, warnings, queriedBlocks, nil
+	return valueSets, warnings, queriedBlocks, nil, merr.Err()
 }
 
 func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, shardingInfo *storepb.ShardInfo, skipChunks bool, blockIDs []ulid.ULID) (*storepb.SeriesRequest, error) {
@@ -1126,6 +1154,9 @@ func isRetryableError(err error) bool {
 	// https://github.com/grpc/grpc-go/blob/03172006f5d168fc646d87928d85cb9c4a480291/clientconn.go#L67
 	case codes.Canceled:
 		return strings.Contains(err.Error(), "grpc: the client connection is closing")
+	case codes.Unknown:
+		// Catch the chunk pool exhaustion error only.
+		return strings.Contains(err.Error(), pool.ErrPoolExhausted.Error())
 	default:
 		return false
 	}
diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go
index a01d4f2893..23114f2663 100644
--- a/pkg/querier/blocks_store_queryable_test.go
+++ b/pkg/querier/blocks_store_queryable_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/thanos-io/promql-engine/engine"
 	"github.com/thanos-io/promql-engine/logicalplan"
+	"github.com/thanos-io/thanos/pkg/pool"
 	"github.com/thanos-io/thanos/pkg/store/hintspb"
 	"github.com/thanos-io/thanos/pkg/store/labelpb"
 	"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -668,6 +669,35 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
 				},
 			},
 		},
+		"multiple store-gateways have the block, but one of them fails to return due to chunk pool exhaustion": {
+			finderResult: bucketindex.Blocks{
+				{ID: block1},
+			},
+			storeSetResponses: []interface{}{
+				map[BlocksStoreClient][]ulid.ULID{
+					&storeGatewayClientMock{
+						remoteAddr:      "1.1.1.1",
+						mockedSeriesErr: status.Error(codes.Unknown, pool.ErrPoolExhausted.Error()),
+					}: {block1},
+				},
+				map[BlocksStoreClient][]ulid.ULID{
+					&storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{
+						mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2),
+						mockHintsResponse(block1),
+					}}: {block1},
+				},
+			},
+			limits:       &blocksStoreLimitsMock{},
+			queryLimiter: noOpQueryLimiter,
+			expectedSeries: []seriesResult{
+				{
+					lbls: labels.New(metricNameLabel, series1Label),
+					values: []valueResult{
+						{t: minT, v: 2},
+					},
+				},
+			},
+		},
 		"all store-gateways return PermissionDenied": {
 			finderResult: bucketindex.Blocks{
 				{ID: block1},
diff --git a/pkg/querier/tripperware/query.go b/pkg/querier/tripperware/query.go
index 42de413e52..f893d20b66 100644
--- a/pkg/querier/tripperware/query.go
+++ b/pkg/querier/tripperware/query.go
@@ -231,6 +231,25 @@ func BodyBuffer(res *http.Response, logger log.Logger) ([]byte, error) {
 	return buf.Bytes(), nil
 }
 
+func BodyBufferFromHTTPGRPCResponse(res *httpgrpc.HTTPResponse, logger log.Logger) ([]byte, error) {
+	// If the response is gzipped, unzip it here.
+	headers := http.Header{}
+	for _, h := range res.Headers {
+		headers[h.Key] = h.Values
+	}
+	if strings.EqualFold(headers.Get("Content-Encoding"), "gzip") {
+		gReader, err := gzip.NewReader(bytes.NewBuffer(res.Body))
+		if err != nil {
+			return nil, err
+		}
+		defer runutil.CloseWithLogOnErr(logger, gReader, "close gzip reader")
+
+		return io.ReadAll(gReader)
+	}
+
+	return res.Body, nil
+}
+
 func StatsMerge(stats map[int64]*PrometheusResponseQueryableSamplesStatsPerStep) *PrometheusResponseStats {
 	keys := make([]int64, 0, len(stats))
 	for key := range stats {