Enable streaming chunks from store-gateways to queriers by default (#6646)

* Enable streaming chunks from store-gateways to queriers by default.

# Conflicts:
#	cmd/mimir/help-all.txt.tmpl
#	docs/sources/mimir/configure/about-versioning.md
#	pkg/querier/querier.go

* Add changelog entry.

# Conflicts:
#	CHANGELOG.md

* Fix failing TestQuerierWithBlocksStorageOnMissingBlocksFromStorage test with store-gateway chunks streaming enabled.

* Partially fix cache-related assertions in TestQuerierWithBlocksStorageRunningInSingleBinaryMode

* Address PR feedback, and add more details to failing assertions

# Conflicts:
#	integration/querier_test.go

* Update tests to reflect behaviour of chunks streaming with #8039 in place.
charleskorn authored Jun 17, 2024
1 parent 5893053 commit fee8a90
Showing 7 changed files with 32 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@
* [CHANGE] Added new metric `cortex_compactor_disk_out_of_space_errors_total` which counts how many times a compaction failed due to the compactor being out of disk. #8237
* [CHANGE] Anonymous usage statistics tracking: report active series in addition to in-memory series. #8279
* [CHANGE] Ruler: `evaluation_delay` field in the rule group configuration has been deprecated. Please use `query_offset` instead (it has the same exact meaning and behaviour). #8295
* [CHANGE] Store-gateway / querier: enable streaming chunks from store-gateways to queriers by default. #6646
* [CHANGE] General: remove `-log.buffered`. The configuration option has been enabled by default and deprecated since Mimir 2.11.
* [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
4 changes: 2 additions & 2 deletions cmd/mimir/config-descriptor.json
@@ -1860,7 +1860,7 @@
"required": false,
"desc": "Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldDefaultValue": true,
"fieldFlag": "querier.prefer-streaming-chunks-from-store-gateways",
"fieldType": "boolean",
"fieldCategory": "experimental"
@@ -1885,7 +1885,7 @@
"fieldDefaultValue": 256,
"fieldFlag": "querier.streaming-chunks-per-store-gateway-buffer-size",
"fieldType": "int",
"fieldCategory": "experimental"
"fieldCategory": "advanced"
},
{
"kind": "field",
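To check the new default programmatically, a descriptor entry like the one above can be decoded with the standard `encoding/json` package. The following is a minimal sketch: the `fieldEntry` type, the `decodeEntry` helper, and the `sample` document are hypothetical and model only the keys visible in this excerpt, not the full descriptor schema.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fieldEntry is a hypothetical type that models only the keys shown in the
// excerpt above. Keys that are not listed here are ignored when decoding.
type fieldEntry struct {
	Desc         string          `json:"desc"`
	DefaultValue json.RawMessage `json:"fieldDefaultValue"` // kept raw: may be bool, int, string
	Flag         string          `json:"fieldFlag"`
	Type         string          `json:"fieldType"`
	Category     string          `json:"fieldCategory"`
}

// sample reproduces the entry after this commit (description omitted).
const sample = `{
  "required": false,
  "fieldValue": null,
  "fieldDefaultValue": true,
  "fieldFlag": "querier.prefer-streaming-chunks-from-store-gateways",
  "fieldType": "boolean",
  "fieldCategory": "experimental"
}`

// decodeEntry parses a single descriptor entry.
func decodeEntry(raw string) (fieldEntry, error) {
	var e fieldEntry
	err := json.Unmarshal([]byte(raw), &e)
	return e, err
}

func main() {
	e, err := decodeEntry(sample)
	if err != nil {
		panic(err)
	}
	fmt.Printf("-%s (%s, %s) default=%s\n", e.Flag, e.Type, e.Category, e.DefaultValue)
}
```

Keeping `fieldDefaultValue` as `json.RawMessage` avoids committing to one Go type, since defaults in the excerpt are booleans in one entry and integers in another.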
4 changes: 2 additions & 2 deletions cmd/mimir/help-all.txt.tmpl
@@ -1794,7 +1794,7 @@ Usage of ./cmd/mimir/mimir:
-querier.minimize-ingester-requests-hedging-delay duration
Delay before initiating requests to further ingesters when request minimization is enabled and the initially selected set of ingesters have not all responded. Ignored if -querier.minimize-ingester-requests is not enabled. (default 3s)
-querier.prefer-streaming-chunks-from-store-gateways
[experimental] Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.
[experimental] Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this. (default true)
-querier.promql-engine string
[experimental] PromQL engine to use, either 'prometheus' or 'mimir' (default "prometheus")
-querier.promql-experimental-functions-enabled
@@ -1872,7 +1872,7 @@ Usage of ./cmd/mimir/mimir:
-querier.streaming-chunks-per-ingester-buffer-size uint
Number of series to buffer per ingester when streaming chunks from ingesters. (default 256)
-querier.streaming-chunks-per-store-gateway-buffer-size uint
[experimental] Number of series to buffer per store-gateway when streaming chunks from store-gateways. (default 256)
Number of series to buffer per store-gateway when streaming chunks from store-gateways. (default 256)
-querier.timeout duration
The timeout for a query. This config option should be set on query-frontend too when query sharding is enabled. This also applies to queries evaluated by the ruler (internally or remotely). (default 2m0s)
-query-frontend.active-series-write-timeout duration
4 changes: 3 additions & 1 deletion docs/sources/mimir/configure/about-versioning.md
@@ -141,7 +141,7 @@ The following features are currently experimental:
- `-ingester.client.circuit-breaker.cooldown-period`
- Querier
- Use of Redis cache backend (`-blocks-storage.bucket-store.metadata-cache.backend=redis`)
- Streaming chunks from store-gateway to querier (`-querier.prefer-streaming-chunks-from-store-gateways`, `-querier.streaming-chunks-per-store-gateway-buffer-size`)
- Streaming chunks from store-gateway to querier (`-querier.prefer-streaming-chunks-from-store-gateways`)
- Limiting queries based on the estimated number of chunks that will be used (`-querier.max-estimated-fetched-chunks-per-query-multiplier`)
- Max concurrency for tenant federated queries (`-tenant-federation.max-concurrent`)
- Maximum response size for active series queries (`-querier.active-series-results-max-size-bytes`)
@@ -214,6 +214,8 @@ The following features or configuration parameters are currently deprecated and
- `-ingester.client.report-grpc-codes-in-instrumentation-label-enabled`
- Mimirtool
- the flag `--rule-files`
- Querier
- the flag `-querier.prefer-streaming-chunks-from-store-gateways`

The following features or configuration parameters are currently deprecated and will be **removed in a future release (to be announced)**:

Original file line number Diff line number Diff line change
@@ -1389,15 +1389,15 @@ store_gateway_client:
# respond with a stream of chunks if the target store-gateway supports this, and
# this preference will be ignored by store-gateways that do not support this.
# CLI flag: -querier.prefer-streaming-chunks-from-store-gateways
[prefer_streaming_chunks_from_store_gateways: <boolean> | default = false]
[prefer_streaming_chunks_from_store_gateways: <boolean> | default = true]
# (advanced) Number of series to buffer per ingester when streaming chunks from
# ingesters.
# CLI flag: -querier.streaming-chunks-per-ingester-buffer-size
[streaming_chunks_per_ingester_series_buffer_size: <int> | default = 256]
# (experimental) Number of series to buffer per store-gateway when streaming
# chunks from store-gateways.
# (advanced) Number of series to buffer per store-gateway when streaming chunks
# from store-gateways.
# CLI flag: -querier.streaming-chunks-per-store-gateway-buffer-size
[streaming_chunks_per_store_gateway_series_buffer_size: <int> | default = 256]
34 changes: 18 additions & 16 deletions integration/querier_test.go
@@ -15,7 +15,6 @@ import (
"github.com/grafana/e2e"
e2ecache "github.com/grafana/e2e/cache"
e2edb "github.com/grafana/e2e/db"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
@@ -446,6 +445,8 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
require.NoError(t, cluster.WaitSumMetrics(e2e.GreaterOrEqual(float64(shippedBlocks*seriesReplicationFactor)), "cortex_bucket_store_blocks_loaded"))

var expectedCacheRequests int
var expectedCacheHits int
var expectedMemcachedOps int

// Query back the series (1 only in the storage, 1 only in the ingesters, 1 on both).
// For every series in a block we expect
@@ -459,51 +460,53 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector1, result.(model.Vector))
expectedCacheRequests += seriesReplicationFactor * 3 // expanded postings, postings, series
expectedCacheRequests += seriesReplicationFactor * 3 // expanded postings, postings, series
expectedMemcachedOps += (seriesReplicationFactor * 3) + (seriesReplicationFactor * 3) // Same reasoning as for expectedCacheRequests, but this also includes a set for each get that is not a hit

result, err = c.Query(series2Name, series2Timestamp)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector2, result.(model.Vector))
expectedCacheRequests += seriesReplicationFactor*3 + seriesReplicationFactor // expanded postings, postings, series for 1 time range; only expaned postings for another
expectedCacheRequests += seriesReplicationFactor*3 + seriesReplicationFactor // expanded postings, postings, series for 1 time range; only expanded postings for another
expectedMemcachedOps += 2 * (seriesReplicationFactor*3 + seriesReplicationFactor) // Same reasoning as for expectedCacheRequests, but this also includes a set for each get that is not a hit

result, err = c.Query(series3Name, series3Timestamp)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector3, result.(model.Vector))
expectedCacheRequests += seriesReplicationFactor * 2 // expanded postings for 2 time ranges

expectedMemcachedOps := 2 * expectedCacheRequests // Same reasoning as for expectedCacheRequests, but this also includes a set for each get
expectedCacheRequests += seriesReplicationFactor * 2 // expanded postings for 2 time ranges
expectedMemcachedOps += seriesReplicationFactor*2 + seriesReplicationFactor*2 // Same reasoning as for expectedCacheRequests, but this also includes a set for each get that is not a hit

// Check the in-memory index cache metrics (in the store-gateway).
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedCacheRequests)), "thanos_store_index_cache_requests_total"))
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedCacheRequests)), "thanos_store_index_cache_requests_total"), "expected %v requests", expectedCacheRequests)
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedCacheHits)), "thanos_store_index_cache_hits_total"), "expected %v hits", expectedCacheHits)
if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((2*2+2+3)*seriesReplicationFactor)), "thanos_store_index_cache_items")) // 2 series both for postings and series cache, 2 expanded postings on one block, 3 on another one
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((2*2+2+3)*seriesReplicationFactor)), "thanos_store_index_cache_items_added_total")) // 2 series both for postings and series cache, 2 expanded postings on one block, 3 on another one
} else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedMemcachedOps)), "thanos_cache_operations_total"))
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedMemcachedOps)), "thanos_cache_operations_total"), "expected %v operations", expectedMemcachedOps)
}

// Query back again the 1st series from storage. This time it should use the index cache.
// It should get a hit on expanded postings; this means that it will not request individual postings for matchers.
// It should get a hit on series.
// We expect 3 cache requests and 3 cache hits.
// We expect 2 cache requests and 2 cache hits.
result, err = c.Query(series1Name, series1Timestamp)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector1, result.(model.Vector))
expectedCacheRequests += seriesReplicationFactor * 2 // expanded postings and series
expectedMemcachedOps += seriesReplicationFactor * 2 // there is no set after the gets this time
expectedCacheHits += seriesReplicationFactor * 2
expectedMemcachedOps += seriesReplicationFactor * 2 // there is no set after the gets this time

require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedCacheRequests)), "thanos_store_index_cache_requests_total"))
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*seriesReplicationFactor)), "thanos_store_index_cache_hits_total")) // this time has used the index cache
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedCacheRequests)), "thanos_store_index_cache_requests_total"), "expected %v requests", expectedCacheRequests)
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedCacheHits)), "thanos_store_index_cache_hits_total"), "expected %v hits", expectedCacheHits) // this time has used the index cache

if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((2*2+2+3)*seriesReplicationFactor)), "thanos_store_index_cache_items")) // as before
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((2*2+2+3)*seriesReplicationFactor)), "thanos_store_index_cache_items_added_total")) // as before
} else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedMemcachedOps)), "thanos_cache_operations_total"))
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedMemcachedOps)), "thanos_cache_operations_total"), "expected %v operations", expectedMemcachedOps)
}

// Query metadata.
@@ -881,8 +884,7 @@ func TestQuerierWithBlocksStorageOnMissingBlocksFromStorage(t *testing.T) {
// missing from the storage.
_, err = c.Query(series1Name, series1Timestamp)
require.Error(t, err)
assert.Contains(t, err.Error(), "500")
assert.Contains(t, err.(*promv1.Error).Detail, "failed to fetch some blocks")
assert.Contains(t, err.Error(), "get range reader: The specified key does not exist")

// We expect this to still be queryable as it was not in the cleared storage
_, err = c.Query(series2Name, series2Timestamp)
6 changes: 3 additions & 3 deletions pkg/querier/querier.go
@@ -50,10 +50,10 @@ type Config struct {

ShuffleShardingIngestersEnabled bool `yaml:"shuffle_sharding_ingesters_enabled" category:"advanced"`

PreferStreamingChunksFromStoreGateways bool `yaml:"prefer_streaming_chunks_from_store_gateways" category:"experimental"`
PreferStreamingChunksFromStoreGateways bool `yaml:"prefer_streaming_chunks_from_store_gateways" category:"experimental"` // Enabled by default as of Mimir 2.13, remove altogether in 2.14.
PreferAvailabilityZone string `yaml:"prefer_availability_zone" category:"experimental" doc:"hidden"`
StreamingChunksPerIngesterSeriesBufferSize uint64 `yaml:"streaming_chunks_per_ingester_series_buffer_size" category:"advanced"`
StreamingChunksPerStoreGatewaySeriesBufferSize uint64 `yaml:"streaming_chunks_per_store_gateway_series_buffer_size" category:"experimental"`
StreamingChunksPerStoreGatewaySeriesBufferSize uint64 `yaml:"streaming_chunks_per_store_gateway_series_buffer_size" category:"advanced"`
MinimizeIngesterRequests bool `yaml:"minimize_ingester_requests" category:"advanced"`
MinimiseIngesterRequestsHedgingDelay time.Duration `yaml:"minimize_ingester_requests_hedging_delay" category:"advanced"`
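The `category` struct tag is the only thing that changes for the buffer-size field in this hunk. As an illustration of how such a tag can be read, here is a minimal sketch using the standard `reflect` package. The trimmed `Config` type and the `categories` helper are hypothetical, and this is not a claim about how Mimir's own tooling consumes the tag.

```go
package main

import (
	"fmt"
	"reflect"
)

// Config is a hypothetical, trimmed-down copy of the two fields touched above,
// with the tags as they stand after this commit.
type Config struct {
	PreferStreamingChunksFromStoreGateways         bool   `yaml:"prefer_streaming_chunks_from_store_gateways" category:"experimental"`
	StreamingChunksPerStoreGatewaySeriesBufferSize uint64 `yaml:"streaming_chunks_per_store_gateway_series_buffer_size" category:"advanced"`
}

// categories maps each field's YAML name to the value of its `category` tag.
// It expects a struct value, not a pointer.
func categories(v any) map[string]string {
	out := map[string]string{}
	t := reflect.TypeOf(v)
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		out[f.Tag.Get("yaml")] = f.Tag.Get("category")
	}
	return out
}

func main() {
	for name, cat := range categories(Config{}) {
		fmt.Printf("%s: %s\n", name, cat)
	}
}
```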

@@ -77,7 +77,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.")
f.DurationVar(&cfg.QueryStoreAfter, queryStoreAfterFlag, 12*time.Hour, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. If this option is enabled, the time range of the query sent to the store-gateway will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.")
f.BoolVar(&cfg.ShuffleShardingIngestersEnabled, "querier.shuffle-sharding-ingesters-enabled", true, fmt.Sprintf("Fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since -%s. If this setting is false or -%s is '0', queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).", validation.QueryIngestersWithinFlag, validation.QueryIngestersWithinFlag))
f.BoolVar(&cfg.PreferStreamingChunksFromStoreGateways, "querier.prefer-streaming-chunks-from-store-gateways", false, "Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.")
f.BoolVar(&cfg.PreferStreamingChunksFromStoreGateways, "querier.prefer-streaming-chunks-from-store-gateways", true, "Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.")
f.StringVar(&cfg.PreferAvailabilityZone, "querier.prefer-availability-zone", "", "Preferred availability zone to query ingesters from when using the ingest storage.")

const minimiseIngesterRequestsFlagName = "querier.minimize-ingester-requests"
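The behavioural change in this hunk is the third argument to `f.BoolVar` flipping from `false` to `true`. A minimal sketch with the standard `flag` package shows the effect: the option is on when the flag is absent, and can still be switched off explicitly. The one-field `Config`, the shortened usage string, and the `parse` helper are hypothetical simplifications.

```go
package main

import (
	"flag"
	"fmt"
)

const preferStreamingFlag = "querier.prefer-streaming-chunks-from-store-gateways"

// Config is a hypothetical stand-in holding only the option changed in this commit.
type Config struct {
	PreferStreamingChunksFromStoreGateways bool
}

// RegisterFlags registers the flag with its new default of true.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&cfg.PreferStreamingChunksFromStoreGateways, preferStreamingFlag, true, "Request store-gateways stream chunks.")
}

// parse builds a fresh FlagSet, registers the flag, and parses args.
func parse(args []string) (Config, error) {
	var cfg Config
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	err := fs.Parse(args)
	return cfg, err
}

func main() {
	def, _ := parse(nil)
	off, _ := parse([]string{"-" + preferStreamingFlag + "=false"})
	fmt.Println(def.PreferStreamingChunksFromStoreGateways, off.PreferStreamingChunksFromStoreGateways)
}
```

Boolean flags in the `flag` package need the `-name=false` form to be disabled; a bare `-name` sets the value to true.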