diff --git a/CHANGELOG.md b/CHANGELOG.md index aae93e6b24..27091a2a74 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,11 +1,13 @@ # Changelog ## master / unreleased +* [FEATURE] Ruler: Add support for disabling rule groups. #5521 * [FEATURE] Added the flag `-alertmanager.alerts-gc-interval` to configure alert manager alerts Garbage collection interval. #5550 * [FEATURE] Ruler: Add support for Limit field on RuleGroup. #5528 * [FEATURE] AlertManager: Add support for Webex, Discord and Telegram Receiver. #5493 * [FEATURE] Ingester: added `-admin-limit-message` to customize the message contained in limit errors. #5460 * [FEATURE] AlertManager: Update version to v0.26.0 and bring in Microsoft Teams receiver. #5543 +* [FEATURE] Store Gateway: Support lazy expanded posting optimization. Added new flag `-blocks-storage.bucket-store.lazy-expanded-postings-enabled` and new metrics `cortex_bucket_store_lazy_expanded_postings_total`, `cortex_bucket_store_lazy_expanded_posting_size_bytes_total` and `cortex_bucket_store_lazy_expanded_posting_series_overfetched_size_bytes_total`. #5556 * [CHANGE] AlertManager: include reason label in cortex_alertmanager_notifications_failed_total. #5409 * [CHANGE] Query: Set CORS Origin headers for Query API #5388 * [CHANGE] Updating prometheus/alertmanager from v0.25.0 to v0.25.1-0.20230505130626-263ca5c9438e. This includes the below changes. #5276 @@ -33,6 +35,7 @@ * [FEATURE] Ruler: Support for filtering rules in the API. #5417 * [FEATURE] Compactor: Add `-compactor.ring.tokens-file-path` to store generated tokens locally. #5432 * [FEATURE] Query Frontend: Add `-frontend.retry-on-too-many-outstanding-requests` to re-enqueue 429 requests if there are multiple query-schedulers available. #5496 +* [FEATURE] Store Gateway: Add `-blocks-storage.bucket-store.max-inflight-requests` for store gateways to reject further requests upon reaching the limit. #5553 * [FEATURE] Compactor: Implemented partitioning compactor based on proposal #4843. #5465 * [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319 * [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292 @@ -58,6 +61,7 @@ * [ENHANCEMENT] Store Gateway: add metric `cortex_bucket_store_chunk_refetches_total` for number of chunk refetches. #5532 * [ENHANCEMENT] BasicLifeCycler: allow final-sleep during shutdown #5517 * [ENHANCEMENT] All: Handling CMK Access Denied errors. #5420 #5542 +* [ENHANCEMENT] Querier: Retry store gateway client connection closing gRPC error. #5558 * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286 * [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index a981b5411d..76ecf8a179 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -499,6 +499,11 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.max-concurrent [max_concurrent: | default = 100] + # Max number of inflight queries to execute against the long-term storage. + # The limit is shared across all tenants. 0 to disable. + # CLI flag: -blocks-storage.bucket-store.max-inflight-requests + [max_inflight_requests: | default = 0] + # Maximum number of concurrent tenants synching blocks.
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency [tenant_sync_concurrency: | default = 10] @@ -1101,6 +1106,11 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout [index_header_lazy_loading_idle_timeout: | default = 20m] + # If true, Store Gateway will estimate postings size and try to lazily + # expand postings if it downloads less data than expanding all postings. + # CLI flag: -blocks-storage.bucket-store.lazy-expanded-postings-enabled + [lazy_expanded_postings_enabled: | default = false] + tsdb: # Local directory to store TSDBs in the ingesters. # CLI flag: -blocks-storage.tsdb.dir diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 891b5824cf..d407806542 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -602,6 +602,11 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.max-concurrent [max_concurrent: | default = 100] + # Max number of inflight queries to execute against the long-term storage. + # The limit is shared across all tenants. 0 to disable. + # CLI flag: -blocks-storage.bucket-store.max-inflight-requests + [max_inflight_requests: | default = 0] + # Maximum number of concurrent tenants synching blocks. # CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency [tenant_sync_concurrency: | default = 10] @@ -1204,6 +1209,11 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout [index_header_lazy_loading_idle_timeout: | default = 20m] + # If true, Store Gateway will estimate postings size and try to lazily + # expand postings if it downloads less data than expanding all postings. + # CLI flag: -blocks-storage.bucket-store.lazy-expanded-postings-enabled + [lazy_expanded_postings_enabled: | default = false] + tsdb: # Local directory to store TSDBs in the ingesters. # CLI flag: -blocks-storage.tsdb.dir diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 02d3037b11..3dec1c8c4b 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1042,6 +1042,11 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.max-concurrent [max_concurrent: | default = 100] + # Max number of inflight queries to execute against the long-term storage. The + # limit is shared across all tenants. 0 to disable. + # CLI flag: -blocks-storage.bucket-store.max-inflight-requests + [max_inflight_requests: | default = 0] + # Maximum number of concurrent tenants synching blocks. # CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency [tenant_sync_concurrency: | default = 10] @@ -1643,6 +1648,11 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout [index_header_lazy_loading_idle_timeout: | default = 20m] + # If true, Store Gateway will estimate postings size and try to lazily expand + # postings if it downloads less data than expanding all postings. + # CLI flag: -blocks-storage.bucket-store.lazy-expanded-postings-enabled + [lazy_expanded_postings_enabled: | default = false] + tsdb: # Local directory to store TSDBs in the ingesters. # CLI flag: -blocks-storage.tsdb.dir @@ -2857,7 +2867,7 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # List of metric relabel configurations. Note that in most situations, it is # more effective to use metrics relabeling directly in the Prometheus server, # e.g. 
remote_write.write_relabel_configs. -[metric_relabel_configs: | default = ] +[metric_relabel_configs: | default = []] # Enables support for exemplars in TSDB and sets the maximum number that will be # stored. less than zero means disabled. If the value is set to zero, cortex @@ -3105,6 +3115,9 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # alerts will fail with a log message and metric increment. 0 = no limit. # CLI flag: -alertmanager.max-alerts-size-bytes [alertmanager_max_alerts_size_bytes: | default = 0] + +# list of rule groups to disable +[disabled_rule_groups: | default = []] ``` ### `memberlist_config` @@ -3724,7 +3737,7 @@ The `ruler_config` configures the Cortex ruler. [external_url: | default = ] # Labels to add to all alerts. -[external_labels: | default = ] +[external_labels: | default = []] ruler_client: # gRPC client max receive message size (bytes). @@ -4937,3 +4950,21 @@ otel: # CLI flag: -tracing.otel.tls.tls-insecure-skip-verify [tls_insecure_skip_verify: | default = false] ``` + +### `DisabledRuleGroup` + +```yaml +# namespace in which the rule group belongs +[namespace: | default = ""] + +# name of the rule group +[name: | default = ""] +``` + +### `Label` + +```yaml +[name: | default = ""] + +[value: | default = ""] +``` diff --git a/go.mod b/go.mod index 59b6252d09..2d3bd3850a 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/thanos-io/objstore v0.0.0-20230816175749-20395bffdf26 github.com/thanos-io/promql-engine v0.0.0-20230821193351-e1ae4275b96e - github.com/thanos-io/thanos v0.32.1-0.20230831143954-f75e44ac929c + github.com/thanos-io/thanos v0.32.3-0.20230911095949-f6a39507b6bd github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d go.etcd.io/etcd/api/v3 v3.5.9 diff --git a/go.sum b/go.sum index 533f3e5f50..a89bbfe090 100644 --- a/go.sum +++ b/go.sum @@ -1216,8 +1216,8 @@ github.com/thanos-io/objstore v0.0.0-20230816175749-20395bffdf26 h1:q1lin/af0lw+ github.com/thanos-io/objstore v0.0.0-20230816175749-20395bffdf26/go.mod h1:oJ82xgcBDzGJrEgUsjlTj6n01+ZWUMMUR8BlZzX5xDE= github.com/thanos-io/promql-engine v0.0.0-20230821193351-e1ae4275b96e h1:kwsFCU8eSkZehbrAN3nXPw5RdMHi/Bok/y8l2C4M+gk= github.com/thanos-io/promql-engine v0.0.0-20230821193351-e1ae4275b96e/go.mod h1:+T/ZYNCGybT6eTsGGvVtGb63nT1cvUmH6MjqRrcQoKw= -github.com/thanos-io/thanos v0.32.1-0.20230831143954-f75e44ac929c h1:d5IJk0L61FaewLnGoVLlJb206vMz8WD6ash104tsc2w= -github.com/thanos-io/thanos v0.32.1-0.20230831143954-f75e44ac929c/go.mod h1:J81dp4qaOX+GfPmRoYqu/aZXfEBri7+i3TzY2xamthg= +github.com/thanos-io/thanos v0.32.3-0.20230911095949-f6a39507b6bd h1:JAXqwb/nzY7WzijekZrhrL63m988VLyoFUEaKLU15iA= +github.com/thanos-io/thanos v0.32.3-0.20230911095949-f6a39507b6bd/go.mod h1:J81dp4qaOX+GfPmRoYqu/aZXfEBri7+i3TzY2xamthg= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/integration/ruler_test.go b/integration/ruler_test.go index 9413e75c95..f8de7223d0 100644 --- a/integration/ruler_test.go +++ b/integration/ruler_test.go @@ -4,6 +4,7 @@ package integration import ( + "bytes" "context" "crypto/x509" "crypto/x509/pkix" @@ -29,6 +30,7 @@ import ( 
"github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore/providers/s3" "gopkg.in/yaml.v3" "github.com/cortexproject/cortex/integration/ca" @@ -915,6 +917,127 @@ func TestRulerMetricsWhenIngesterFails(t *testing.T) { }) } +func TestRulerDisablesRuleGroups(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + const blockRangePeriod = 2 * time.Second + // Configure the ruler. + flags := mergeFlags( + BlocksStorageFlags(), + RulerFlags(), + map[string]string{ + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.bucket-store.sync-interval": "1s", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + + // Enable the bucket index so we can skip the initial bucket scan. + "-blocks-storage.bucket-store.bucket-index.enabled": "false", + // Evaluate rules often, so that we don't need to wait for metrics to show up. + "-ruler.evaluation-interval": "2s", + "-ruler.poll-interval": "2s", + // No delay + "-ruler.evaluation-delay-duration": "0", + + // We run single ingester only, no replication. + "-distributor.replication-factor": "1", + + // Very low limit so that ruler hits it. + "-querier.max-fetched-chunks-per-query": "15", + "-querier.query-store-after": (1 * time.Second).String(), + "-querier.query-ingesters-within": (2 * time.Second).String(), + }, + ) + + const namespace = "test" + const user = "user" + configFileName := "runtime-config.yaml" + bucketName := "cortex" + + storeGateway := e2ecortex.NewStoreGateway("store-gateway-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + + flags = mergeFlags(flags, map[string]string{ + "-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint(), + "-runtime-config.backend": "s3", + "-runtime-config.s3.access-key-id": e2edb.MinioAccessKey, + "-runtime-config.s3.secret-access-key": e2edb.MinioSecretKey, + "-runtime-config.s3.bucket-name": bucketName, + "-runtime-config.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName), + "-runtime-config.s3.insecure": "true", + "-runtime-config.file": configFileName, + "-runtime-config.reload-period": "2s", + }) + + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + + client, err := s3.NewBucketWithConfig(nil, s3.Config{ + Endpoint: minio.HTTPEndpoint(), + Insecure: true, + Bucket: bucketName, + AccessKey: e2edb.MinioAccessKey, + SecretKey: e2edb.MinioSecretKey, + }, "runtime-config-test") + + require.NoError(t, err) + + // update runtime config + newRuntimeConfig := []byte(`overrides: + user: + disabled_rule_groups: + - name: bad_rule + namespace: test`) + require.NoError(t, client.Upload(context.Background(), configFileName, bytes.NewReader(newRuntimeConfig))) + time.Sleep(2 * time.Second) + + ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "") + + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler, storeGateway)) + + // Wait until both 
the distributor and ruler have updated the ring. The querier will also watch + // the store-gateway ring if blocks sharding is enabled. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(1024), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user) + require.NoError(t, err) + + expression := "absent(sum_over_time(metric{}[2s] offset 1h))" + + t.Run("disable_rule_group", func(t *testing.T) { + + ruleGroup := ruleGroupWithRule("bad_rule", "rule", expression) + ruleGroup.Interval = 2 + require.NoError(t, c.SetRuleGroup(ruleGroup, namespace)) + + ruleGroup = ruleGroupWithRule("good_rule", "rule", expression) + ruleGroup.Interval = 2 + require.NoError(t, c.SetRuleGroup(ruleGroup, namespace)) + + m1 := ruleGroupMatcher(user, namespace, "good_rule") + + // Wait until ruler has loaded the group. + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_sync_rules_total"}, e2e.WaitMissingMetrics)) + + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m1), e2e.WaitMissingMetrics)) + + filter := e2ecortex.RuleFilter{} + actualGroups, err := c.GetPrometheusRules(filter) + require.NoError(t, err) + assert.Equal(t, 1, len(actualGroups)) + assert.Equal(t, "good_rule", actualGroups[0].Name) + assert.Equal(t, "test", actualGroups[0].File) + }) +} + func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher { return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName)) } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index c78d895236..5729a82a17 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -1116,6 +1116,12 @@ func isRetryableError(err error) bool { switch status.Code(err) { case codes.Unavailable: return true + case codes.ResourceExhausted: + return errors.Is(err, storegateway.ErrTooManyInflightRequests) + // Client side connection closing, this error happens during store gateway deployment. 
+ // https://github.com/grpc/grpc-go/blob/03172006f5d168fc646d87928d85cb9c4a480291/clientconn.go#L67 + case codes.Canceled: + return strings.Contains(err.Error(), "grpc: the client connection is closing") default: return false } diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 26935a1f39..a01d4f2893 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc/status" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + "github.com/cortexproject/cortex/pkg/storegateway" "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/limiter" @@ -638,6 +639,35 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, }, + "multiple store-gateways has the block, but one of them fails to return due to clientconn closing": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedSeriesErr: status.Error(codes.Canceled, "grpc: the client connection is closing"), + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2), + mockHintsResponse(block1), + }}: {block1}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel, series1Label), + values: []valueResult{ + {t: minT, v: 2}, + }, + }, + }, + }, "all store-gateways return PermissionDenied": { finderResult: bucketindex.Blocks{ {ID: block1}, @@ -708,6 +738,56 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, }, + "multiple store-gateways has the block, but one of them had too many inflight requests": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedSeriesErr: storegateway.ErrTooManyInflightRequests, + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2), + mockHintsResponse(block1), + }}: {block1}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel, series1Label), + values: []valueResult{ + {t: minT, v: 2}, + }, + }, + }, + }, + "store gateway returns resource exhausted error other than max inflight request": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedSeriesErr: status.Error(codes.ResourceExhausted, "some other resource"), + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2), + mockHintsResponse(block1), + }}: {block1}, + }, + }, + limits: &blocksStoreLimitsMock{}, + expectedErr: errors.Wrapf(status.Error(codes.ResourceExhausted, "some other resource"), 
"failed to fetch series from 1.1.1.1"), + }, } for testName, testData := range tests { diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 879f3f82e3..e8e597f515 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -5,6 +5,8 @@ import ( "errors" "time" + "github.com/cortexproject/cortex/pkg/util/validation" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" @@ -142,6 +144,7 @@ type RulesLimits interface { RulerTenantShardSize(userID string) int RulerMaxRuleGroupsPerTenant(userID string) int RulerMaxRulesPerRuleGroup(userID string) int + DisabledRuleGroups(userID string) validation.DisabledRuleGroups } // EngineQueryFunc returns a new engine query function by passing an altered timestamp. diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index ae5089a1b4..e90c9b7e2c 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -71,6 +71,14 @@ const ( recordingRuleFilter string = "record" ) +type DisabledRuleGroupErr struct { + Message string +} + +func (e *DisabledRuleGroupErr) Error() string { + return e.Message +} + // Config is the configuration for the recording rules server. type Config struct { // This is used for template expansion in alerts; must be a valid URL. @@ -400,6 +408,17 @@ func SendAlerts(n sender, externalURL string) promRules.NotifyFunc { } } +func ruleGroupDisabled(ruleGroup *rulespb.RuleGroupDesc, disabledRuleGroupsForUser validation.DisabledRuleGroups) bool { + for _, disabledRuleGroupForUser := range disabledRuleGroupsForUser { + if ruleGroup.Namespace == disabledRuleGroupForUser.Namespace && + ruleGroup.Name == disabledRuleGroupForUser.Name && + ruleGroup.User == disabledRuleGroupForUser.User { + return true + } + } + return false +} + var sep = []byte("/") func tokenForGroup(g *rulespb.RuleGroupDesc) uint32 { @@ -415,7 +434,8 @@ func tokenForGroup(g *rulespb.RuleGroupDesc) uint32 { return ringHasher.Sum32() } -func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, instanceAddr string) (bool, error) { +func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, instanceAddr string) (bool, error) { + hash := tokenForGroup(g) rlrs, err := r.Get(hash, RingOp, nil, nil, nil) @@ -423,7 +443,12 @@ func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, instanceAd return false, errors.Wrap(err, "error reading ring to verify rule group ownership") } - return rlrs.Instances[0].Addr == instanceAddr, nil + ownsRuleGroup := rlrs.Instances[0].Addr == instanceAddr + if ownsRuleGroup && ruleGroupDisabled(g, disabledRuleGroups) { + return false, &DisabledRuleGroupErr{Message: fmt.Sprintf("rule group %s, namespace %s, user %s is disabled", g.Name, g.Namespace, g.User)} + } + + return ownsRuleGroup, nil } func (r *Ruler) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -533,7 +558,26 @@ func (r *Ruler) listRules(ctx context.Context) (result map[string]rulespb.RuleGr } func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { - return r.store.ListAllRuleGroups(ctx) + allRuleGroups, err := r.store.ListAllRuleGroups(ctx) + if err != nil { + return nil, err + } + for userID, groups := range allRuleGroups { + disabledRuleGroupsForUser := r.limits.DisabledRuleGroups(userID) + if len(disabledRuleGroupsForUser) == 0 { + continue + } + filteredGroupsForUser := rulespb.RuleGroupList{} + for _, group := range groups { + if !ruleGroupDisabled(group, disabledRuleGroupsForUser) { + 
filteredGroupsForUser = append(filteredGroupsForUser, group) + } else { + level.Info(r.logger).Log("msg", "rule group disabled", "name", group.Name, "namespace", group.Namespace, "user", group.User) + } + } + allRuleGroups[userID] = filteredGroupsForUser + } + return allRuleGroups, nil } func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { @@ -544,7 +588,7 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp filteredConfigs := make(map[string]rulespb.RuleGroupList) for userID, groups := range configs { - filtered := filterRuleGroups(userID, groups, r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) + filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) if len(filtered) > 0 { filteredConfigs[userID] = filtered } @@ -602,7 +646,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID) } - filtered := filterRuleGroups(userID, groups, userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) + filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) if len(filtered) == 0 { continue } @@ -624,15 +668,21 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp // // Reason why this function is not a method on Ruler is to make sure we don't accidentally use r.ring, // but only ring passed as parameter. -func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc { +func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc { // Prune the rule group to only contain rules that this ruler is responsible for, based on ring. 
var result []*rulespb.RuleGroupDesc for _, g := range ruleGroups { - owned, err := instanceOwnsRuleGroup(ring, g, instanceAddr) + owned, err := instanceOwnsRuleGroup(ring, g, disabledRuleGroups, instanceAddr) if err != nil { - ringCheckErrors.Inc() - level.Error(log).Log("msg", "failed to check if the ruler replica owns the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err) - continue + switch e := err.(type) { + case *DisabledRuleGroupErr: + level.Info(log).Log("msg", e.Message) + continue + default: + ringCheckErrors.Inc() + level.Error(log).Log("msg", "failed to check if the ruler replica owns the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err) + continue + } } if owned { diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index c8034ee62f..4d89b9d693 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -85,6 +85,7 @@ type ruleLimits struct { tenantShard int maxRulesPerRuleGroup int maxRuleGroups int + disabledRuleGroups validation.DisabledRuleGroups } func (r ruleLimits) EvaluationDelay(_ string) time.Duration { @@ -103,6 +104,10 @@ func (r ruleLimits) RulerMaxRulesPerRuleGroup(_ string) int { return r.maxRulesPerRuleGroup } +func (r ruleLimits) DisabledRuleGroups(userID string) validation.DisabledRuleGroups { + return r.disabledRuleGroups +} + func newEmptyQueryable() storage.Queryable { return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { return emptyQuerier{}, nil @@ -1481,3 +1486,245 @@ func TestRecoverAlertsPostOutage(t *testing.T) { require.Equal(t, promRules.StateFiring, promRules.AlertState(activeAlertRuleRaw.FieldByName("State").Int())) } + +func TestRulerDisablesRuleGroups(t *testing.T) { + const ( + ruler1 = "ruler-1" + ruler1Host = "1.1.1.1" + ruler1Port = 9999 + ruler1Addr = "1.1.1.1:9999" + + ruler2 = "ruler-2" + ruler2Host = "2.2.2.2" + ruler2Port = 9999 + ruler2Addr = "2.2.2.2:9999" + + ruler3 = "ruler-3" + ruler3Host = "3.3.3.3" + ruler3Port = 9999 + ruler3Addr = "3.3.3.3:9999" + ) + const ( + user1 = "user1" + user2 = "user2" + user3 = "user3" + ) + + user1Group1 := &rulespb.RuleGroupDesc{User: user1, Namespace: "namespace1", Name: "group1"} + user1Group2 := &rulespb.RuleGroupDesc{User: user1, Namespace: "namespace1", Name: "group2"} + user2Group1 := &rulespb.RuleGroupDesc{User: user2, Namespace: "namespace1", Name: "group1"} + user3Group1 := &rulespb.RuleGroupDesc{User: user3, Namespace: "namespace1", Name: "group1"} + + user1Group1Token := tokenForGroup(user1Group1) + user1Group2Token := tokenForGroup(user1Group2) + user2Group1Token := tokenForGroup(user2Group1) + user3Group1Token := tokenForGroup(user3Group1) + + d := &querier.MockDistributor{} + d.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( + model.Matrix{ + &model.SampleStream{ + Values: []model.SamplePair{}, + }, + }, nil) + querierConfig := querier.DefaultQuerierConfig() + querierConfig.IngesterStreaming = false + + ruleGroupDesc := func(user, name, namespace string) *rulespb.RuleGroupDesc { + return &rulespb.RuleGroupDesc{ + Name: name, + Namespace: namespace, + User: user, + } + } + + ruleGroupWithRule := func(expr, user, name, namespace string) *rulespb.RuleGroupDesc { + rg := ruleGroupDesc(user, name, namespace) + rg.Rules = []*rulespb.RuleDesc{ + { + Record: "RecordingRule", + Expr: expr, + }, + } + return rg + } + + disabledRuleGroups := validation.DisabledRuleGroups{ + validation.DisabledRuleGroup{ + Namespace: "namespace1", + 
Name: "group1", + User: "user1", + }, + } + + for _, tc := range []struct { + name string + rules map[string]rulespb.RuleGroupList + expectedRuleGroupsForUser map[string]rulespb.RuleGroupList + sharding bool + shardingStrategy string + setupRing func(*ring.Desc) + disabledRuleGroups validation.DisabledRuleGroups + }{ + { + name: "disables rule group - shuffle sharding", + rules: map[string]rulespb.RuleGroupList{ + "user1": {ruleGroupWithRule("up[240m:1s]", "user1", "group1", "namespace1")}, + "user2": {ruleGroupWithRule("up[240m:1s]", "user2", "group1", "namespace1")}, + }, + sharding: true, + shardingStrategy: util.ShardingStrategyShuffle, + expectedRuleGroupsForUser: map[string]rulespb.RuleGroupList{ + "user2": {ruleGroupDesc("user2", "group1", "namespace1")}, + }, + setupRing: func(desc *ring.Desc) { + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + }, + disabledRuleGroups: disabledRuleGroups, + }, + { + name: "disables rule group - no sharding", + rules: map[string]rulespb.RuleGroupList{ + "user1": {ruleGroupWithRule("up[240m:1s]", "user1", "group1", "namespace1")}, + "user2": {ruleGroupWithRule("up[240m:1s]", "user2", "group1", "namespace1")}, + }, + sharding: false, + expectedRuleGroupsForUser: map[string]rulespb.RuleGroupList{ + "user2": {ruleGroupDesc("user2", "group1", "namespace1")}, + }, + disabledRuleGroups: disabledRuleGroups, + }, + { + name: "disables rule group - default sharding", + rules: map[string]rulespb.RuleGroupList{ + "user1": {ruleGroupWithRule("up[240m:1s]", "user1", "group1", "namespace1")}, + "user2": {ruleGroupWithRule("up[240m:1s]", "user2", "group1", "namespace1")}, + }, + sharding: true, + shardingStrategy: util.ShardingStrategyDefault, + expectedRuleGroupsForUser: map[string]rulespb.RuleGroupList{ + "user2": {ruleGroupDesc("user2", "group1", "namespace1")}, + }, + setupRing: func(desc *ring.Desc) { + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + }, + disabledRuleGroups: disabledRuleGroups, + }, + { + name: "disables rule group - default sharding", + rules: map[string]rulespb.RuleGroupList{ + "user1": {ruleGroupWithRule("up[240m:1s]", "user1", "group1", "namespace1")}, + "user2": {ruleGroupWithRule("up[240m:1s]", "user2", "group1", "namespace1")}, + }, + sharding: true, + shardingStrategy: util.ShardingStrategyDefault, + expectedRuleGroupsForUser: map[string]rulespb.RuleGroupList{ + "user1": {ruleGroupDesc("user1", "group1", "namespace1")}, + "user2": {ruleGroupDesc("user2", "group1", "namespace1")}, + }, + setupRing: func(desc *ring.Desc) { + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), 
ring.ACTIVE, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + setupRuler := func(id string, host string, port int, forceRing *ring.Ring) *Ruler { + store := newMockRuleStore(tc.rules) + cfg := Config{ + EnableSharding: tc.sharding, + ShardingStrategy: tc.shardingStrategy, + Ring: RingConfig{ + InstanceID: id, + InstanceAddr: host, + InstancePort: port, + KVStore: kv.Config{ + Mock: kvStore, + }, + HeartbeatTimeout: 1 * time.Minute, + }, + FlushCheckPeriod: 0, + } + + r := buildRuler(t, cfg, nil, store, nil) + r.limits = ruleLimits{evalDelay: 0, tenantShard: 3, disabledRuleGroups: tc.disabledRuleGroups} + + if forceRing != nil { + r.ring = forceRing + } + return r + } + + r1 := setupRuler(ruler1, ruler1Host, ruler1Port, nil) + + rulerRing := r1.ring + + // We start ruler's ring, but nothing else (not even lifecycler). + if rulerRing != nil { + require.NoError(t, services.StartAndAwaitRunning(context.Background(), rulerRing)) + t.Cleanup(rulerRing.StopAsync) + } + + var r2, r3 *Ruler + if rulerRing != nil { + // Reuse ring from r1. + r2 = setupRuler(ruler2, ruler2Host, ruler2Port, rulerRing) + r3 = setupRuler(ruler3, ruler3Host, ruler3Port, rulerRing) + } + + if tc.setupRing != nil { + err := kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) { + d, _ := in.(*ring.Desc) + if d == nil { + d = ring.NewDesc() + } + + tc.setupRing(d) + + return d, true, nil + }) + require.NoError(t, err) + // Wait a bit to make sure ruler's ring is updated. + time.Sleep(100 * time.Millisecond) + } + + actualRules := map[string]rulespb.RuleGroupList{} + loadedRules, err := r1.listRules(context.Background()) + require.NoError(t, err) + for k, v := range loadedRules { + if len(v) > 0 { + actualRules[k] = v + } + } + + fetchRules := func(id string, r *Ruler) { + // Only expect rules from other rulers when using ring, and they are present in the ring. + if r != nil && rulerRing != nil && rulerRing.HasInstance(id) { + loaded, err := r.listRules(context.Background()) + require.NoError(t, err) + + // Normalize nil map to empty one. 
+ if loaded == nil { + loaded = map[string]rulespb.RuleGroupList{} + } + for k, v := range loaded { + actualRules[k] = v + } + } + } + + fetchRules(ruler2, r2) + fetchRules(ruler3, r3) + + require.Equal(t, tc.expectedRuleGroupsForUser, actualRules) + }) + } +} diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 73897cc112..e8af5e1c41 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -241,6 +241,7 @@ type BucketStoreConfig struct { SyncDir string `yaml:"sync_dir"` SyncInterval time.Duration `yaml:"sync_interval"` MaxConcurrent int `yaml:"max_concurrent"` + MaxInflightRequests int `yaml:"max_inflight_requests"` TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"` BlockSyncConcurrency int `yaml:"block_sync_concurrency"` MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` @@ -261,6 +262,9 @@ type BucketStoreConfig struct { IndexHeaderLazyLoadingEnabled bool `yaml:"index_header_lazy_loading_enabled"` IndexHeaderLazyLoadingIdleTimeout time.Duration `yaml:"index_header_lazy_loading_idle_timeout"` + // Controls whether lazy expanded posting optimization is enabled or not. + LazyExpandedPostingsEnabled bool `yaml:"lazy_expanded_postings_enabled"` + // Controls the partitioner, used to aggregate multiple GET object API requests. // The config option is hidden until experimental. PartitionerMaxGapBytes uint64 `yaml:"partitioner_max_gap_bytes" doc:"hidden"` @@ -291,6 +295,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.ChunkPoolMinBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-min-bucket-size-bytes", ChunkPoolDefaultMinBucketSize, "Size - in bytes - of the smallest chunks pool bucket.") f.IntVar(&cfg.ChunkPoolMaxBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-max-bucket-size-bytes", ChunkPoolDefaultMaxBucketSize, "Size - in bytes - of the largest chunks pool bucket.") f.IntVar(&cfg.MaxConcurrent, "blocks-storage.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.") + f.IntVar(&cfg.MaxInflightRequests, "blocks-storage.bucket-store.max-inflight-requests", 0, "Max number of inflight queries to execute against the long-term storage. The limit is shared across all tenants. 0 to disable.") f.IntVar(&cfg.TenantSyncConcurrency, "blocks-storage.bucket-store.tenant-sync-concurrency", 10, "Maximum number of concurrent tenants synching blocks.") f.IntVar(&cfg.BlockSyncConcurrency, "blocks-storage.bucket-store.block-sync-concurrency", 20, "Maximum number of concurrent blocks synching per tenant.") f.IntVar(&cfg.MetaSyncConcurrency, "blocks-storage.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.") @@ -305,6 +310,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.Uint64Var(&cfg.PartitionerMaxGapBytes, "blocks-storage.bucket-store.partitioner-max-gap-bytes", store.PartitionerMaxGapSize, "Max size - in bytes - of a gap for which the partitioner aggregates together two bucket GET object requests.") f.Uint64Var(&cfg.EstimatedMaxSeriesSizeBytes, "blocks-storage.bucket-store.estimated-max-series-size-bytes", store.EstimatedMaxSeriesSize, "Estimated max series size in bytes. Setting a large value might result in over fetching data while a small value might result in data refetch. 
Default value is 64KB.") f.Uint64Var(&cfg.EstimatedMaxChunkSizeBytes, "blocks-storage.bucket-store.estimated-max-chunk-size-bytes", store.EstimatedMaxChunkSize, "Estimated max chunk size in bytes. Setting a large value might result in over fetching data while a small value might result in data refetch. Default value is 16KiB.") + f.BoolVar(&cfg.LazyExpandedPostingsEnabled, "blocks-storage.bucket-store.lazy-expanded-postings-enabled", false, "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.") } // Validate the config. diff --git a/pkg/storegateway/bucket_store_metrics.go b/pkg/storegateway/bucket_store_metrics.go index e0f728ca8a..3977b85480 100644 --- a/pkg/storegateway/bucket_store_metrics.go +++ b/pkg/storegateway/bucket_store_metrics.go @@ -42,6 +42,10 @@ type BucketStoreMetrics struct { postingsFetchDuration *prometheus.Desc chunkFetchDuration *prometheus.Desc + lazyExpandedPostingsCount *prometheus.Desc + lazyExpandedPostingSizeBytes *prometheus.Desc + lazyExpandedPostingSeriesOverfetchedSizeBytes *prometheus.Desc + indexHeaderLazyLoadCount *prometheus.Desc indexHeaderLazyLoadFailedCount *prometheus.Desc indexHeaderLazyUnloadCount *prometheus.Desc @@ -185,6 +189,19 @@ func NewBucketStoreMetrics() *BucketStoreMetrics { "cortex_bucket_store_indexheader_lazy_load_duration_seconds", "Duration of the index-header lazy loading in seconds.", nil, nil), + + lazyExpandedPostingsCount: prometheus.NewDesc( + "cortex_bucket_store_lazy_expanded_postings_total", + "Total number of lazy expanded postings when fetching block series.", + nil, nil), + lazyExpandedPostingSizeBytes: prometheus.NewDesc( + "cortex_bucket_store_lazy_expanded_posting_size_bytes_total", + "Total number of lazy posting group size in bytes.", + nil, nil), + lazyExpandedPostingSeriesOverfetchedSizeBytes: prometheus.NewDesc( + "cortex_bucket_store_lazy_expanded_posting_series_overfetched_size_bytes_total", + "Total number of series size in bytes overfetched due to posting lazy expansion.", + nil, nil), } } @@ -232,6 +249,10 @@ func (m *BucketStoreMetrics) Describe(out chan<- *prometheus.Desc) { out <- m.indexHeaderLazyUnloadCount out <- m.indexHeaderLazyUnloadFailedCount out <- m.indexHeaderLazyLoadDuration + + out <- m.lazyExpandedPostingsCount + out <- m.lazyExpandedPostingSizeBytes + out <- m.lazyExpandedPostingSeriesOverfetchedSizeBytes } func (m *BucketStoreMetrics) Collect(out chan<- prometheus.Metric) { @@ -275,4 +296,8 @@ func (m *BucketStoreMetrics) Collect(out chan<- prometheus.Metric) { data.SendSumOfCounters(out, m.indexHeaderLazyUnloadCount, "thanos_bucket_store_indexheader_lazy_unload_total") data.SendSumOfCounters(out, m.indexHeaderLazyUnloadFailedCount, "thanos_bucket_store_indexheader_lazy_unload_failed_total") data.SendSumOfHistograms(out, m.indexHeaderLazyLoadDuration, "thanos_bucket_store_indexheader_lazy_load_duration_seconds") + + data.SendSumOfCounters(out, m.lazyExpandedPostingsCount, "thanos_bucket_store_lazy_expanded_postings_total") + data.SendSumOfCounters(out, m.lazyExpandedPostingSizeBytes, "thanos_bucket_store_lazy_expanded_posting_size_bytes_total") + data.SendSumOfCounters(out, m.lazyExpandedPostingSeriesOverfetchedSizeBytes, "thanos_bucket_store_lazy_expanded_posting_series_overfetched_size_bytes_total") } diff --git a/pkg/storegateway/bucket_store_metrics_test.go b/pkg/storegateway/bucket_store_metrics_test.go index e0d4bc3243..33061ca25c 100644 --- a/pkg/storegateway/bucket_store_metrics_test.go +++ 
b/pkg/storegateway/bucket_store_metrics_test.go @@ -492,6 +492,15 @@ func TestBucketStoreMetrics(t *testing.T) { # HELP cortex_bucket_store_indexheader_lazy_unload_total Total number of index-header lazy unload operations. # TYPE cortex_bucket_store_indexheader_lazy_unload_total counter cortex_bucket_store_indexheader_lazy_unload_total 1.396178e+06 + # HELP cortex_bucket_store_lazy_expanded_posting_series_overfetched_size_bytes_total Total number of series size in bytes overfetched due to posting lazy expansion. + # TYPE cortex_bucket_store_lazy_expanded_posting_series_overfetched_size_bytes_total counter + cortex_bucket_store_lazy_expanded_posting_series_overfetched_size_bytes_total 180152 + # HELP cortex_bucket_store_lazy_expanded_posting_size_bytes_total Total number of lazy posting group size in bytes. + # TYPE cortex_bucket_store_lazy_expanded_posting_size_bytes_total counter + cortex_bucket_store_lazy_expanded_posting_size_bytes_total 157633 + # HELP cortex_bucket_store_lazy_expanded_postings_total Total number of lazy expanded postings when fetching block series. + # TYPE cortex_bucket_store_lazy_expanded_postings_total counter + cortex_bucket_store_lazy_expanded_postings_total 135114 # HELP cortex_bucket_store_postings_size_bytes Size in bytes of the postings for a single series call. # TYPE cortex_bucket_store_postings_size_bytes histogram cortex_bucket_store_postings_size_bytes_bucket{le="32"} 0 @@ -620,6 +629,10 @@ func populateMockedBucketStoreMetrics(base float64) *prometheus.Registry { m.emptyPostingCount.Add(5 * base) + m.lazyExpandedPostingsCount.Add(6 * base) + m.lazyExpandedPostingSizeBytes.Add(7 * base) + m.lazyExpandedPostingSeriesOverfetchedSizeBytes.Add(8 * base) + return reg } @@ -660,6 +673,10 @@ type mockedBucketStoreMetrics struct { indexHeaderLazyUnloadCount prometheus.Counter indexHeaderLazyUnloadFailedCount prometheus.Counter indexHeaderLazyLoadDuration prometheus.Histogram + + lazyExpandedPostingsCount prometheus.Counter + lazyExpandedPostingSizeBytes prometheus.Counter + lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter } func newMockedBucketStoreMetrics(reg prometheus.Registerer) *mockedBucketStoreMetrics { @@ -822,5 +839,20 @@ func newMockedBucketStoreMetrics(reg prometheus.Registerer) *mockedBucketStoreMe Help: "Total number of empty postings when fetching block series.", }) + m.lazyExpandedPostingsCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_lazy_expanded_postings_total", + Help: "Total number of times when lazy expanded posting optimization applies.", + }) + + m.lazyExpandedPostingSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_lazy_expanded_posting_size_bytes_total", + Help: "Total number of lazy posting group size in bytes.", + }) + + m.lazyExpandedPostingSeriesOverfetchedSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_lazy_expanded_posting_series_overfetched_size_bytes_total", + Help: "Total number of series size in bytes overfetched due to posting lazy expansion.", + }) + return &m } diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index cc5cb5a527..d7c709c4ec 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -31,6 +31,7 @@ import ( "github.com/weaveworks/common/logging" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" "github.com/cortexproject/cortex/pkg/storage/bucket" 
"github.com/cortexproject/cortex/pkg/storage/tsdb" @@ -72,6 +73,10 @@ type BucketStores struct { storesErrorsMu sync.RWMutex storesErrors map[string]error + // Keeps number of inflight requests + inflightRequestCnt int + inflightRequestMu sync.RWMutex + // Metrics. syncTimes prometheus.Histogram syncLastSuccess prometheus.Gauge @@ -79,6 +84,8 @@ type BucketStores struct { tenantsSynced prometheus.Gauge } +var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway") + // NewBucketStores makes a new BucketStores. func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) { cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg) @@ -313,6 +320,16 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri return nil } + maxInflightRequests := u.cfg.BucketStore.MaxInflightRequests + if maxInflightRequests > 0 { + if u.getInflightRequestCnt() >= maxInflightRequests { + return ErrTooManyInflightRequests + } + + u.incrementInflightRequestCnt() + defer u.decrementInflightRequestCnt() + } + err = store.Series(req, spanSeriesServer{ Store_SeriesServer: srv, ctx: spanCtx, @@ -321,6 +338,24 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri return err } +func (u *BucketStores) getInflightRequestCnt() int { + u.inflightRequestMu.RLock() + defer u.inflightRequestMu.RUnlock() + return u.inflightRequestCnt +} + +func (u *BucketStores) incrementInflightRequestCnt() { + u.inflightRequestMu.Lock() + u.inflightRequestCnt++ + u.inflightRequestMu.Unlock() +} + +func (u *BucketStores) decrementInflightRequestCnt() { + u.inflightRequestMu.Lock() + u.inflightRequestCnt-- + u.inflightRequestMu.Unlock() +} + // LabelNames implements the Storegateway proto service. 
func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames") @@ -552,6 +587,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro } return u.cfg.BucketStore.EstimatedMaxSeriesSizeBytes }), + store.WithLazyExpandedPostings(u.cfg.BucketStore.LazyExpandedPostingsEnabled), } if u.logLevel.String() == "debug" { bucketStoreOpts = append(bucketStoreOpts, store.WithDebugLogging()) diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 7cb3188e74..1b9b488768 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -514,6 +514,48 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t } } +func TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *testing.T) { + cfg := prepareStorageConfig(t) + cfg.BucketStore.MaxInflightRequests = 10 + reg := prometheus.NewPedanticRegistry() + storageDir := t.TempDir() + generateStorageBlock(t, storageDir, "user_id", "series_1", 0, 100, 15) + bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + require.NoError(t, err) + require.NoError(t, stores.InitialSync(context.Background())) + + stores.inflightRequestMu.Lock() + stores.inflightRequestCnt = 10 + stores.inflightRequestMu.Unlock() + series, warnings, err := querySeries(stores, "user_id", "series_1", 0, 100) + assert.ErrorIs(t, err, ErrTooManyInflightRequests) + assert.Empty(t, series) + assert.Empty(t, warnings) +} + +func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabled(t *testing.T) { + cfg := prepareStorageConfig(t) + reg := prometheus.NewPedanticRegistry() + storageDir := t.TempDir() + generateStorageBlock(t, storageDir, "user_id", "series_1", 0, 100, 15) + bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + require.NoError(t, err) + require.NoError(t, stores.InitialSync(context.Background())) + + stores.inflightRequestMu.Lock() + stores.inflightRequestCnt = 10 // max_inflight_request is set to 0 by default = disabled + stores.inflightRequestMu.Unlock() + series, _, err := querySeries(stores, "user_id", "series_1", 0, 100) + require.NoError(t, err) + assert.Equal(t, 1, len(series)) +} + func prepareStorageConfig(t *testing.T) cortex_tsdb.BlocksStorageConfig { cfg := cortex_tsdb.BlocksStorageConfig{} flagext.DefaultValues(&cfg) diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index cdf4930d8f..fe99a32fa1 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -39,6 +39,10 @@ const ( // ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance // in the ring will be automatically removed. ringAutoForgetUnhealthyPeriods = 10 + + instanceLimitsMetric = "cortex_storegateway_instance_limits" + instanceLimitsMetricHelp = "Instance limits used by this store gateway." 
+ limitLabel = "limit" ) var ( @@ -142,6 +146,22 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf g.bucketSync.WithLabelValues(syncReasonPeriodic) g.bucketSync.WithLabelValues(syncReasonRingChange) + promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: instanceLimitsMetric, + Help: instanceLimitsMetricHelp, + ConstLabels: map[string]string{limitLabel: "max_inflight_requests"}, + }).Set(float64(storageCfg.BucketStore.MaxInflightRequests)) + promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: instanceLimitsMetric, + Help: instanceLimitsMetricHelp, + ConstLabels: map[string]string{limitLabel: "max_concurrent"}, + }).Set(float64(storageCfg.BucketStore.MaxConcurrent)) + promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: instanceLimitsMetric, + Help: instanceLimitsMetricHelp, + ConstLabels: map[string]string{limitLabel: "max_chunk_pool_bytes"}, + }).Set(float64(storageCfg.BucketStore.MaxChunkPoolBytes)) + // Init sharding strategy. var shardingStrategy ShardingStrategy diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 587ce75f62..ca05c8cc10 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -38,6 +38,14 @@ func (e LimitError) Error() string { return string(e) } +type DisabledRuleGroup struct { + Namespace string `yaml:"namespace" doc:"nocli|description=namespace in which the rule group belongs"` + Name string `yaml:"name" doc:"nocli|description=name of the rule group"` + User string `yaml:"-" doc:"nocli"` +} + +type DisabledRuleGroups []DisabledRuleGroup + // Limits describe all the limits for users; can be used to describe global default // limits via flags, or per-user limits via yaml config. type Limits struct { @@ -124,12 +132,13 @@ type Limits struct { NotificationRateLimit float64 `yaml:"alertmanager_notification_rate_limit" json:"alertmanager_notification_rate_limit"` NotificationRateLimitPerIntegration NotificationRateLimitMap `yaml:"alertmanager_notification_rate_limit_per_integration" json:"alertmanager_notification_rate_limit_per_integration"` - AlertmanagerMaxConfigSizeBytes int `yaml:"alertmanager_max_config_size_bytes" json:"alertmanager_max_config_size_bytes"` - AlertmanagerMaxTemplatesCount int `yaml:"alertmanager_max_templates_count" json:"alertmanager_max_templates_count"` - AlertmanagerMaxTemplateSizeBytes int `yaml:"alertmanager_max_template_size_bytes" json:"alertmanager_max_template_size_bytes"` - AlertmanagerMaxDispatcherAggregationGroups int `yaml:"alertmanager_max_dispatcher_aggregation_groups" json:"alertmanager_max_dispatcher_aggregation_groups"` - AlertmanagerMaxAlertsCount int `yaml:"alertmanager_max_alerts_count" json:"alertmanager_max_alerts_count"` - AlertmanagerMaxAlertsSizeBytes int `yaml:"alertmanager_max_alerts_size_bytes" json:"alertmanager_max_alerts_size_bytes"` + AlertmanagerMaxConfigSizeBytes int `yaml:"alertmanager_max_config_size_bytes" json:"alertmanager_max_config_size_bytes"` + AlertmanagerMaxTemplatesCount int `yaml:"alertmanager_max_templates_count" json:"alertmanager_max_templates_count"` + AlertmanagerMaxTemplateSizeBytes int `yaml:"alertmanager_max_template_size_bytes" json:"alertmanager_max_template_size_bytes"` + AlertmanagerMaxDispatcherAggregationGroups int `yaml:"alertmanager_max_dispatcher_aggregation_groups" json:"alertmanager_max_dispatcher_aggregation_groups"` + AlertmanagerMaxAlertsCount int `yaml:"alertmanager_max_alerts_count" json:"alertmanager_max_alerts_count"` + AlertmanagerMaxAlertsSizeBytes int 
`yaml:"alertmanager_max_alerts_size_bytes" json:"alertmanager_max_alerts_size_bytes"` + DisabledRuleGroups DisabledRuleGroups `yaml:"disabled_rule_groups" json:"disabled_rule_groups" doc:"nocli|description=list of rule groups to disable"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -681,6 +690,26 @@ func (o *Overrides) AlertmanagerMaxAlertsSizeBytes(userID string) int { return o.GetOverridesForUser(userID).AlertmanagerMaxAlertsSizeBytes } +func (o *Overrides) DisabledRuleGroups(userID string) DisabledRuleGroups { + if o.tenantLimits != nil { + l := o.tenantLimits.ByUserID(userID) + if l != nil { + disabledRuleGroupsForUser := make(DisabledRuleGroups, len(l.DisabledRuleGroups)) + + for i, disabledRuleGroup := range l.DisabledRuleGroups { + disabledRuleGroupForUser := DisabledRuleGroup{ + Namespace: disabledRuleGroup.Namespace, + Name: disabledRuleGroup.Name, + User: userID, + } + disabledRuleGroupsForUser[i] = disabledRuleGroupForUser + } + return disabledRuleGroupsForUser + } + } + return DisabledRuleGroups{} +} + // GetOverridesForUser returns the per-tenant limits with overrides. func (o *Overrides) GetOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { diff --git a/tools/doc-generator/main.go b/tools/doc-generator/main.go index 65ee0b8212..1fc1cf4701 100644 --- a/tools/doc-generator/main.go +++ b/tools/doc-generator/main.go @@ -46,6 +46,11 @@ const ( ) var ( + typesToIgnoreCLI = map[string]bool{ + "labels.Label": true, + "flagext.CIDR": true, + } + // Ordered list of root blocks. The order is the same order that will // follow the markdown generation. rootBlocks = []rootBlock{ diff --git a/tools/doc-generator/parser.go b/tools/doc-generator/parser.go index a8fb2133b2..3ebb313368 100644 --- a/tools/doc-generator/parser.go +++ b/tools/doc-generator/parser.go @@ -112,7 +112,7 @@ func parseConfig(block *configBlock, cfg interface{}, flags map[uintptr]*flag.Fl } // Skip field types which are non configurable - if field.Type.Kind() == reflect.Func { + if field.Type.Kind() == reflect.Func || field.Type.Kind() == reflect.Pointer { continue } @@ -123,7 +123,7 @@ func parseConfig(block *configBlock, cfg interface{}, flags map[uintptr]*flag.Fl } // Handle custom fields in vendored libs upon which we have no control. - fieldEntry, err := getCustomFieldEntry(field, fieldValue, flags) + fieldEntry, err := getCustomFieldEntry(t, field, fieldValue, flags) if err != nil { return nil, err } @@ -189,17 +189,45 @@ func parseConfig(block *configBlock, cfg interface{}, flags map[uintptr]*flag.Fl return nil, errors.Wrapf(err, "config=%s.%s", t.PkgPath(), t.Name()) } - fieldFlag, err := getFieldFlag(field, fieldValue, flags) + fieldDefault := "" + if field.Type.Kind() == reflect.Slice { + sliceElementType := field.Type.Elem() + if sliceElementType.Kind() == reflect.Struct { + rootBlocks = append(rootBlocks, rootBlock{ + name: field.Type.Elem().Name(), + structType: field.Type.Elem(), + }) + sliceElementBlock := &configBlock{ + name: field.Type.Elem().Name(), + desc: "", + } + sliceElementCfg := reflect.New(sliceElementType).Interface() + otherBlocks, err := parseConfig(sliceElementBlock, sliceElementCfg, flags) + + if err != nil { + return nil, err + } + if len(sliceElementBlock.entries) > 0 { + blocks = append(blocks, sliceElementBlock) + } + + blocks = append(blocks, otherBlocks...) 
+ } + fieldDefault = "[]" + } + + fieldFlag, err := getFieldFlag(t, field, fieldValue, flags) if err != nil { return nil, errors.Wrapf(err, "config=%s.%s", t.PkgPath(), t.Name()) } if fieldFlag == nil { block.Add(&configEntry{ - kind: "field", - name: fieldName, - required: isFieldRequired(field), - fieldDesc: getFieldDescription(field, ""), - fieldType: fieldType, + kind: "field", + name: fieldName, + required: isFieldRequired(field), + fieldDesc: getFieldDescription(field, ""), + fieldType: fieldType, + fieldDefault: fieldDefault, }) continue } @@ -213,6 +241,7 @@ func parseConfig(block *configBlock, cfg interface{}, flags map[uintptr]*flag.Fl fieldType: fieldType, fieldDefault: fieldFlag.DefValue, }) + } return blocks, nil @@ -257,15 +286,14 @@ func getFieldType(t reflect.Type) (string, error) { return "string", nil case "[]*relabel.Config": return "relabel_config...", nil - case "labels.Labels": - return "map of string to string", nil } // Fallback to auto-detection of built-in data types switch t.Kind() { + case reflect.Struct: + return t.Name(), nil case reflect.Bool: return "boolean", nil - case reflect.Int: fallthrough case reflect.Int8: @@ -312,8 +340,9 @@ func getFieldType(t reflect.Type) (string, error) { } } -func getFieldFlag(field reflect.StructField, fieldValue reflect.Value, flags map[uintptr]*flag.Flag) (*flag.Flag, error) { - if isAbsentInCLI(field) { +func getFieldFlag(parent reflect.Type, field reflect.StructField, fieldValue reflect.Value, flags map[uintptr]*flag.Flag) (*flag.Flag, error) { + + if isAbsentInCLI(field) || ignoreCLI(parent) { return nil, nil } fieldPtr := fieldValue.Addr().Pointer() @@ -325,9 +354,9 @@ func getFieldFlag(field reflect.StructField, fieldValue reflect.Value, flags map return fieldFlag, nil } -func getCustomFieldEntry(field reflect.StructField, fieldValue reflect.Value, flags map[uintptr]*flag.Flag) (*configEntry, error) { +func getCustomFieldEntry(parent reflect.Type, field reflect.StructField, fieldValue reflect.Value, flags map[uintptr]*flag.Flag) (*configEntry, error) { if field.Type == reflect.TypeOf(logging.Level{}) || field.Type == reflect.TypeOf(logging.Format{}) { - fieldFlag, err := getFieldFlag(field, fieldValue, flags) + fieldFlag, err := getFieldFlag(parent, field, fieldValue, flags) if err != nil { return nil, err } @@ -343,7 +372,7 @@ func getCustomFieldEntry(field reflect.StructField, fieldValue reflect.Value, fl }, nil } if field.Type == reflect.TypeOf(flagext.URLValue{}) { - fieldFlag, err := getFieldFlag(field, fieldValue, flags) + fieldFlag, err := getFieldFlag(parent, field, fieldValue, flags) if err != nil { return nil, err } @@ -359,7 +388,7 @@ func getCustomFieldEntry(field reflect.StructField, fieldValue reflect.Value, fl }, nil } if field.Type == reflect.TypeOf(flagext.Secret{}) { - fieldFlag, err := getFieldFlag(field, fieldValue, flags) + fieldFlag, err := getFieldFlag(parent, field, fieldValue, flags) if err != nil { return nil, err } @@ -375,7 +404,7 @@ func getCustomFieldEntry(field reflect.StructField, fieldValue reflect.Value, fl }, nil } if field.Type == reflect.TypeOf(model.Duration(0)) { - fieldFlag, err := getFieldFlag(field, fieldValue, flags) + fieldFlag, err := getFieldFlag(parent, field, fieldValue, flags) if err != nil { return nil, err } @@ -391,7 +420,7 @@ func getCustomFieldEntry(field reflect.StructField, fieldValue reflect.Value, fl }, nil } if field.Type == reflect.TypeOf(flagext.Time{}) { - fieldFlag, err := getFieldFlag(field, fieldValue, flags) + fieldFlag, err := getFieldFlag(parent, field, 
fieldValue, flags) if err != nil { return nil, err } @@ -418,6 +447,13 @@ func isAbsentInCLI(f reflect.StructField) bool { return getDocTagFlag(f, "nocli") } +func ignoreCLI(f reflect.Type) bool { + if ignore, OK := typesToIgnoreCLI[f.String()]; OK && ignore { + return true + } + return false +} + func isFieldRequired(f reflect.StructField) bool { return getDocTagFlag(f, "required") } diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/binary_reader.go b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/binary_reader.go index 1befe63a7f..16ef73ac3b 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/binary_reader.go +++ b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/binary_reader.go @@ -47,6 +47,8 @@ const ( postingLengthFieldSize = 4 ) +var NotFoundRange = index.Range{Start: -1, End: -1} + // The table gets initialized with sync.Once but may still cause a race // with any other use of the crc32 package anywhere. Thus we initialize it // before. @@ -747,13 +749,18 @@ func (r *BinaryReader) IndexVersion() (int, error) { return r.indexVersion, nil } +// PostingsOffsets implements Reader. +func (r *BinaryReader) PostingsOffsets(name string, values ...string) ([]index.Range, error) { + return r.postingsOffset(name, values...) +} + // TODO(bwplotka): Get advantage of multi value offset fetch. func (r *BinaryReader) PostingsOffset(name, value string) (index.Range, error) { rngs, err := r.postingsOffset(name, value) if err != nil { return index.Range{}, err } - if len(rngs) != 1 { + if len(rngs) != 1 || rngs[0] == NotFoundRange { return index.Range{}, NotFoundRangeErr } return rngs[0], nil @@ -801,6 +808,7 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra valueIndex := 0 for valueIndex < len(values) && values[valueIndex] < e.offsets[0].value { // Discard values before the start. + rngs = append(rngs, NotFoundRange) valueIndex++ } @@ -811,6 +819,9 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra i := sort.Search(len(e.offsets), func(i int) bool { return e.offsets[i].value >= wantedValue }) if i == len(e.offsets) { // We're past the end. + for len(rngs) < len(values) { + rngs = append(rngs, NotFoundRange) + } break } if i > 0 && e.offsets[i].value != wantedValue { @@ -858,6 +869,8 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra // Record on the way if wanted value is equal to the current value. if string(value) == wantedValue { newSameRngs = append(newSameRngs, index.Range{Start: postingOffset + postingLengthFieldSize}) + } else { + rngs = append(rngs, NotFoundRange) } valueIndex++ if valueIndex == len(values) { @@ -877,6 +890,10 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra } if valueIndex != len(values) && wantedValue <= e.offsets[i+1].value { + // Increment i when wanted value is same as next offset. + if wantedValue == e.offsets[i+1].value { + i++ + } // wantedValue is smaller or same as the next offset we know about, let's iterate further to add those. continue } diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/header.go b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/header.go index 8ecef33564..d0b4141afd 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/header.go +++ b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/header.go @@ -20,10 +20,16 @@ type Reader interface { // IndexVersion returns version of index. 
IndexVersion() (int, error) + // PostingsOffsets returns start and end offsets for postings for given name and values. + // Input values need to be sorted. + // If the requested label name doesn't exist, then no posting and error will be returned. + // If the requested label name exists, but some values don't exist, the corresponding index range + // will be set to -1 for both start and end. + PostingsOffsets(name string, value ...string) ([]index.Range, error) + // PostingsOffset returns start and end offsets of postings for given name and value. // The end offset might be bigger than the actual posting ending, but not larger than the whole index file. // NotFoundRangeErr is returned when no index can be found for given name and value. - // TODO(bwplotka): Move to PostingsOffsets(name string, value ...string) []index.Range and benchmark. PostingsOffset(name string, value string) (index.Range, error) // LookupSymbol returns string based on given reference. diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/lazy_binary_reader.go b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/lazy_binary_reader.go index c3bee382c2..451a79b6ee 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/lazy_binary_reader.go +++ b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/lazy_binary_reader.go @@ -154,6 +154,19 @@ func (r *LazyBinaryReader) IndexVersion() (int, error) { return r.reader.IndexVersion() } +// PostingsOffsets implements Reader. +func (r *LazyBinaryReader) PostingsOffsets(name string, values ...string) ([]index.Range, error) { + r.readerMx.RLock() + defer r.readerMx.RUnlock() + + if err := r.load(); err != nil { + return nil, err + } + + r.usedAt.Store(time.Now().UnixNano()) + return r.reader.PostingsOffsets(name, values...) +} + // PostingsOffset implements Reader. 
func (r *LazyBinaryReader) PostingsOffset(name, value string) (index.Range, error) { r.readerMx.RLock() diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go index 0d780fc2c7..4a8eae4572 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go @@ -138,6 +138,10 @@ type bucketStoreMetrics struct { chunkRefetches prometheus.Counter emptyPostingCount prometheus.Counter + lazyExpandedPostingsCount prometheus.Counter + lazyExpandedPostingSizeBytes prometheus.Counter + lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter + cachedPostingsCompressions *prometheus.CounterVec cachedPostingsCompressionErrors *prometheus.CounterVec cachedPostingsCompressionTimeSeconds *prometheus.CounterVec @@ -302,6 +306,21 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Help: "Total number of empty postings when fetching block series.", }) + m.lazyExpandedPostingsCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_lazy_expanded_postings_total", + Help: "Total number of times when lazy expanded posting optimization applies.", + }) + + m.lazyExpandedPostingSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_lazy_expanded_posting_size_bytes_total", + Help: "Total number of lazy posting group size in bytes.", + }) + + m.lazyExpandedPostingSeriesOverfetchedSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_lazy_expanded_posting_series_overfetched_size_bytes_total", + Help: "Total number of series size in bytes overfetched due to posting lazy expansion.", + }) + return &m } @@ -366,6 +385,8 @@ type BucketStore struct { enableChunkHashCalculation bool + enabledLazyExpandedPostings bool + bmtx sync.Mutex labelNamesSet stringset.Set @@ -473,6 +494,13 @@ func WithBlockEstimatedMaxChunkFunc(f BlockEstimator) BucketStoreOption { } } +// WithLazyExpandedPostings enables lazy expanded postings. +func WithLazyExpandedPostings(enabled bool) BucketStoreOption { + return func(s *BucketStore) { + s.enabledLazyExpandedPostings = enabled + } +} + // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. func NewBucketStore( @@ -898,20 +926,27 @@ type blockSeriesClient struct { chunksLimiter ChunksLimiter bytesLimiter BytesLimiter + lazyExpandedPostingEnabled bool + lazyExpandedPostingsCount prometheus.Counter + lazyExpandedPostingSizeBytes prometheus.Counter + lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter + skipChunks bool shardMatcher *storepb.ShardMatcher + blockMatchers []*labels.Matcher calculateChunkHash bool chunkFetchDuration prometheus.Histogram // Internal state. 
- i uint64 - postings []storage.SeriesRef - chkMetas []chunks.Meta - lset labels.Labels - symbolizedLset []symbolizedLabel - entries []seriesEntry - hasMorePostings bool - batchSize int + i uint64 + lazyPostings *lazyExpandedPostings + expandedPostings []storage.SeriesRef + chkMetas []chunks.Meta + lset labels.Labels + symbolizedLset []symbolizedLabel + entries []seriesEntry + hasMorePostings bool + batchSize int } func newBlockSeriesClient( @@ -921,11 +956,16 @@ func newBlockSeriesClient( req *storepb.SeriesRequest, limiter ChunksLimiter, bytesLimiter BytesLimiter, + blockMatchers []*labels.Matcher, shardMatcher *storepb.ShardMatcher, calculateChunkHash bool, batchSize int, chunkFetchDuration prometheus.Histogram, extLsetToRemove map[string]struct{}, + lazyExpandedPostingEnabled bool, + lazyExpandedPostingsCount prometheus.Counter, + lazyExpandedPostingSizeBytes prometheus.Counter, + lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter, ) *blockSeriesClient { var chunkr *bucketChunkReader if !req.SkipChunks { @@ -952,8 +992,14 @@ func newBlockSeriesClient( skipChunks: req.SkipChunks, chunkFetchDuration: chunkFetchDuration, + lazyExpandedPostingEnabled: lazyExpandedPostingEnabled, + lazyExpandedPostingsCount: lazyExpandedPostingsCount, + lazyExpandedPostingSizeBytes: lazyExpandedPostingSizeBytes, + lazyExpandedPostingSeriesOverfetchedSizeBytes: lazyExpandedPostingSeriesOverfetchedSizeBytes, + loadAggregates: req.Aggregates, shardMatcher: shardMatcher, + blockMatchers: blockMatchers, calculateChunkHash: calculateChunkHash, hasMorePostings: true, batchSize: batchSize, @@ -996,22 +1042,30 @@ func (b *blockSeriesClient) ExpandPostings( matchers sortedMatchers, seriesLimiter SeriesLimiter, ) error { - ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter) + ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.lazyExpandedPostingSizeBytes) if err != nil { return errors.Wrap(err, "expanded matching posting") } - if len(ps) == 0 { + if ps == nil || len(ps.postings) == 0 { + b.lazyPostings = emptyLazyPostings return nil } + b.lazyPostings = ps - if err := seriesLimiter.Reserve(uint64(len(ps))); err != nil { + // If lazy expanded posting enabled, it is possible to fetch more series + // so easier to hit the series limit. + if err := seriesLimiter.Reserve(uint64(len(ps.postings))); err != nil { return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err) } - b.postings = ps - if b.batchSize > len(ps) { - b.batchSize = len(ps) + if b.batchSize > len(ps.postings) { + b.batchSize = len(ps.postings) + } + if b.lazyPostings.lazyExpanded() { + // Assume lazy expansion could cut actual expanded postings length to 50%. 
+ b.expandedPostings = make([]storage.SeriesRef, 0, len(b.lazyPostings.postings)/2) + b.lazyExpandedPostingsCount.Inc() } b.entries = make([]seriesEntry, 0, b.batchSize) return nil @@ -1043,14 +1097,26 @@ func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { func (b *blockSeriesClient) nextBatch() error { start := b.i end := start + SeriesBatchSize - if end > uint64(len(b.postings)) { - end = uint64(len(b.postings)) + if end > uint64(len(b.lazyPostings.postings)) { + end = uint64(len(b.lazyPostings.postings)) } b.i = end - postingsBatch := b.postings[start:end] + postingsBatch := b.lazyPostings.postings[start:end] if len(postingsBatch) == 0 { b.hasMorePostings = false + if b.lazyPostings.lazyExpanded() { + v, err := b.indexr.IndexVersion() + if err != nil { + return errors.Wrap(err, "get index version") + } + if v >= 2 { + for i := range b.expandedPostings { + b.expandedPostings[i] = b.expandedPostings[i] / 16 + } + } + b.indexr.storeExpandedPostingsToCache(b.blockMatchers, index.NewListPostings(b.expandedPostings), len(b.expandedPostings)) + } return nil } @@ -1064,6 +1130,7 @@ func (b *blockSeriesClient) nextBatch() error { } b.entries = b.entries[:0] +OUTER: for i := 0; i < len(postingsBatch); i++ { if err := b.ctx.Err(); err != nil { return err @@ -1080,6 +1147,19 @@ func (b *blockSeriesClient) nextBatch() error { return errors.Wrap(err, "Lookup labels symbols") } + for _, matcher := range b.lazyPostings.matchers { + val := b.lset.Get(matcher.Name) + if !matcher.Matches(val) { + // Series not matched means series we overfetched due to lazy posting expansion. + seriesBytes := b.indexr.loadedSeries[postingsBatch[i]] + b.lazyExpandedPostingSeriesOverfetchedSizeBytes.Add(float64(len(seriesBytes))) + continue OUTER + } + } + if b.lazyPostings.lazyExpanded() { + b.expandedPostings = append(b.expandedPostings, postingsBatch[i]) + } + completeLabelset := labelpb.ExtendSortedLabels(b.lset, b.extLset) if b.extLsetToRemove != nil { completeLabelset = rmLabels(completeLabelset, b.extLsetToRemove) @@ -1255,7 +1335,6 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill // Series implements the storepb.StoreServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) { srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), req.WithoutReplicaLabels) - if s.queryGate != nil { tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { err = s.queryGate.Start(srv.Context()) @@ -1319,7 +1398,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store if !ok { continue } - + // Sort matchers to make sure we generate the same cache key + // when fetching expanded postings. 
sortedBlockMatchers := newSortedMatchers(blockMatchers) blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow, reqBlockMatchers) @@ -1346,11 +1426,16 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store req, chunksLimiter, bytesLimiter, + sortedBlockMatchers, shardMatcher, s.enableChunkHashCalculation, s.seriesBatchSize, s.metrics.chunkFetchDuration, extLsetToRemove, + s.enabledLazyExpandedPostings, + s.metrics.lazyExpandedPostingsCount, + s.metrics.lazyExpandedPostingSizeBytes, + s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, ) defer blockClient.Close() @@ -1370,27 +1455,56 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store mtx.Unlock() } - if err := blockClient.ExpandPostings(sortedBlockMatchers, seriesLimiter); err != nil { + if err := blockClient.ExpandPostings( + sortedBlockMatchers, + seriesLimiter, + ); err != nil { onClose() span.Finish() return errors.Wrapf(err, "fetch postings for block %s", blk.meta.ULID) } - part := newLazyRespSet( - srv.Context(), - span, - 10*time.Minute, - blk.meta.ULID.String(), - []labels.Labels{blk.extLset}, - onClose, - blockClient, - shardMatcher, - false, - s.metrics.emptyPostingCount, - ) + // If we have inner replica labels we need to resort. + s.mtx.Lock() + needsEagerRetrival := len(req.WithoutReplicaLabels) > 0 && s.labelNamesSet.HasAny(req.WithoutReplicaLabels) + s.mtx.Unlock() + + var resp respSet + if needsEagerRetrival { + labelsToRemove := make(map[string]struct{}) + for _, replicaLabel := range req.WithoutReplicaLabels { + labelsToRemove[replicaLabel] = struct{}{} + } + resp = newEagerRespSet( + srv.Context(), + span, + 10*time.Minute, + blk.meta.ULID.String(), + []labels.Labels{blk.extLset}, + onClose, + blockClient, + shardMatcher, + false, + s.metrics.emptyPostingCount, + labelsToRemove, + ) + } else { + resp = newLazyRespSet( + srv.Context(), + span, + 10*time.Minute, + blk.meta.ULID.String(), + []labels.Labels{blk.extLset}, + onClose, + blockClient, + shardMatcher, + false, + s.metrics.emptyPostingCount, + ) + } mtx.Lock() - respSets = append(respSets, part) + respSets = append(respSets, resp) mtx.Unlock() return nil @@ -1618,11 +1732,16 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq seriesReq, nil, bytesLimiter, + reqSeriesMatchersNoExtLabels, nil, true, SeriesBatchSize, s.metrics.chunkFetchDuration, nil, + s.enabledLazyExpandedPostings, + s.metrics.lazyExpandedPostingsCount, + s.metrics.lazyExpandedPostingSizeBytes, + s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, ) defer blockClient.Close() @@ -1846,11 +1965,16 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR seriesReq, nil, bytesLimiter, + reqSeriesMatchersNoExtLabels, nil, true, SeriesBatchSize, s.metrics.chunkFetchDuration, nil, + s.enabledLazyExpandedPostings, + s.metrics.lazyExpandedPostingsCount, + s.metrics.lazyExpandedPostingSizeBytes, + s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, ) defer blockClient.Close() @@ -2250,6 +2374,8 @@ type bucketIndexReader struct { mtx sync.Mutex loadedSeries map[storage.SeriesRef][]byte + + indexVersion int } func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { @@ -2263,6 +2389,20 @@ func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { } return r } + +// IndexVersion caches the index header version. 
+func (r *bucketIndexReader) IndexVersion() (int, error) { + if r.indexVersion != 0 { + return r.indexVersion, nil + } + v, err := r.block.indexHeaderReader.IndexVersion() + if err != nil { + return 0, err + } + r.indexVersion = v + return v, nil +} + func (r *bucketIndexReader) reset() { r.loadedSeries = map[storage.SeriesRef][]byte{} } @@ -2276,7 +2416,7 @@ func (r *bucketIndexReader) reset() { // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. -func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) { +func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter) (*lazyExpandedPostings, error) { // Shortcut the case of `len(postingGroups) == 0`. It will only happen when no // matchers specified, and we don't need to fetch expanded postings from cache. if len(ms) == 0 { @@ -2288,12 +2428,11 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch return nil, err } if hit { - return postings, nil + return newLazyExpandedPostings(postings), nil } var ( allRequested = false hasAdds = false - keys []labels.Label ) postingGroups, err := matchersToPostingGroups(ctx, r.block.indexHeaderReader.LabelValues, ms) @@ -2304,83 +2443,50 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch r.storeExpandedPostingsToCache(ms, index.EmptyPostings(), 0) return nil, nil } + i := 0 for _, pg := range postingGroups { allRequested = allRequested || pg.addAll hasAdds = hasAdds || len(pg.addKeys) > 0 - // Postings returned by fetchPostings will be in the same order as keys - // so it's important that we iterate them in the same order later. - // We don't have any other way of pairing keys and fetched postings. - for _, key := range pg.addKeys { - keys = append(keys, labels.Label{Name: pg.name, Value: key}) - } - for _, key := range pg.removeKeys { - keys = append(keys, labels.Label{Name: pg.name, Value: key}) + // If a posting group doesn't have any keys, like posting group created + // from `=~".*"`, we don't have to keep the posting group as long as we + // keep track of whether we need to add all postings or not. + if len(pg.addKeys) == 0 && len(pg.removeKeys) == 0 { + continue } + postingGroups[i] = pg + i++ } + postingGroups = postingGroups[:i] + addAllPostings := allRequested && !hasAdds // We only need special All postings if there are no other adds. If there are, we can skip fetching // special All postings completely. - if allRequested && !hasAdds { + if addAllPostings { // add group with label to fetch "special All postings". 
name, value := index.AllPostingsKey() - allPostingsLabel := labels.Label{Name: name, Value: value} - postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil)) - keys = append(keys, allPostingsLabel) } - fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter) - defer func() { - for _, closeFn := range closeFns { - closeFn() - } - }() + ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, lazyExpandedPostingSizeBytes) if err != nil { - return nil, errors.Wrap(err, "get postings") - } - - // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys - // again, and this is exactly the same order as before (when building the groups), so we can simply - // use one incrementing index to fetch postings from returned slice. - postingIndex := 0 - - var groupAdds, groupRemovals []index.Postings - for _, g := range postingGroups { - // We cannot add empty set to groupAdds, since they are intersected. - if len(g.addKeys) > 0 { - toMerge := make([]index.Postings, 0, len(g.addKeys)) - for _, l := range g.addKeys { - toMerge = append(toMerge, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) - postingIndex++ - } - - groupAdds = append(groupAdds, index.Merge(toMerge...)) - } - - for _, l := range g.removeKeys { - groupRemovals = append(groupRemovals, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) - postingIndex++ - } + return nil, errors.Wrap(err, "fetch and expand postings") } - - result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...)) - ps, err := ExpandPostingsWithContext(ctx, result) - if err != nil { - return nil, errors.Wrap(err, "expand") + // If postings still have matchers to be applied lazily, cache expanded postings after filtering series so skip here. + if !ps.lazyExpanded() { + r.storeExpandedPostingsToCache(ms, index.NewListPostings(ps.postings), len(ps.postings)) } - r.storeExpandedPostingsToCache(ms, index.NewListPostings(ps), len(ps)) - if len(ps) > 0 { + if len(ps.postings) > 0 { // As of version two all series entries are 16 byte padded. All references // we get have to account for that to get the correct offset. - version, err := r.block.indexHeaderReader.IndexVersion() + version, err := r.IndexVersion() if err != nil { return nil, errors.Wrap(err, "get index version") } if version >= 2 { - for i, id := range ps { - ps[i] = id * 16 + for i, id := range ps.postings { + ps.postings[i] = id * 16 } } } @@ -2403,22 +2509,26 @@ func ExpandPostingsWithContext(ctx context.Context, p index.Postings) (res []sto // If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels // This computation happens in ExpandedPostings. type postingGroup struct { - addAll bool - name string - addKeys []string - removeKeys []string + addAll bool + name string + matchers []*labels.Matcher + addKeys []string + removeKeys []string + cardinality int64 + lazy bool } func newPostingGroup(addAll bool, name string, addKeys, removeKeys []string) *postingGroup { return &postingGroup{ - addAll: addAll, name: name, + addAll: addAll, addKeys: addKeys, removeKeys: removeKeys, } } -func (pg postingGroup) merge(other *postingGroup) *postingGroup { +// mergeKeys merges keys from two posting groups and ignores other fields. 
+func (pg postingGroup) mergeKeys(other *postingGroup) *postingGroup { if other == nil { return &pg } @@ -2514,12 +2624,16 @@ func checkNilPosting(name, value string, p index.Postings) index.Postings { } func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]string, error), ms []*labels.Matcher) ([]*postingGroup, error) { - matchersMap := make(map[string][]*labels.Matcher) + matchersMap := make(map[string]map[string]*labels.Matcher) for _, m := range ms { - matchersMap[m.Name] = append(matchersMap[m.Name], m) + m := m + if _, ok := matchersMap[m.Name]; !ok { + matchersMap[m.Name] = make(map[string]*labels.Matcher) + } + matchersMap[m.Name][m.String()] = m } - pgs := make([]*postingGroup, 0) + pgs := make([]*postingGroup, 0, len(matchersMap)) // NOTE: Derived from tsdb.PostingsForMatchers. for _, values := range matchersMap { var ( @@ -2530,8 +2644,9 @@ func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]s valuesCached bool ) lvalsFunc := lvalsFn + matchers := make([]*labels.Matcher, 0, len(vals)) // Merge PostingGroups with the same matcher into 1 to - // avoid fetching duplicate postings. + // avoid fetching duplicate postings. for _, val := range values { pg, vals, err = toPostingGroup(ctx, lvalsFunc, val) if err != nil { @@ -2554,7 +2669,7 @@ func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]s if mergedPG == nil { mergedPG = pg } else { - mergedPG = mergedPG.merge(pg) + mergedPG = mergedPG.mergeKeys(pg) } // If this groups adds nothing, it's an empty group. We can shortcut this, since intersection with empty @@ -2563,7 +2678,10 @@ func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]s if !mergedPG.addAll && len(mergedPG.addKeys) == 0 { return nil, nil } + matchers = append(matchers, val) } + // Set and sort matchers to be used when picking up posting fetch strategy. + mergedPG.matchers = newSortedMatchers(matchers) pgs = append(pgs, mergedPG) } slices.SortFunc(pgs, func(a, b *postingGroup) bool { @@ -2574,13 +2692,6 @@ func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]s // NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication. func toPostingGroup(ctx context.Context, lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, []string, error) { - if m.Type == labels.MatchRegexp { - if vals := findSetMatches(m.Value); len(vals) > 0 { - sort.Strings(vals) - return newPostingGroup(false, m.Name, vals, nil), nil, nil - } - } - // If the matcher selects an empty value, it selects all the series which don't // have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575 // and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555. @@ -2619,6 +2730,12 @@ func toPostingGroup(ctx context.Context, lvalsFn func(name string) ([]string, er return newPostingGroup(true, m.Name, nil, toRemove), vals, nil } + if m.Type == labels.MatchRegexp { + if vals := findSetMatches(m.Value); len(vals) > 0 { + sort.Strings(vals) + return newPostingGroup(false, m.Name, vals, nil), nil, nil + } + } // Fast-path for equal matching. if m.Type == labels.MatchEqual { diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go b/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go new file mode 100644 index 0000000000..2e02836c0c --- /dev/null +++ b/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go @@ -0,0 +1,272 @@ +// Copyright (c) The Thanos Authors. 
+// Licensed under the Apache License 2.0. + +package store + +import ( + "context" + "math" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/index" + "golang.org/x/exp/slices" + + "github.com/thanos-io/thanos/pkg/block/indexheader" +) + +var emptyLazyPostings = &lazyExpandedPostings{postings: nil, matchers: nil} + +// lazyExpandedPostings contains expanded postings (series IDs). If lazy posting expansion is +// enabled, it might contain matchers that can be lazily applied during series filtering time. +type lazyExpandedPostings struct { + postings []storage.SeriesRef + matchers []*labels.Matcher +} + +func newLazyExpandedPostings(ps []storage.SeriesRef, matchers ...*labels.Matcher) *lazyExpandedPostings { + return &lazyExpandedPostings{ + postings: ps, + matchers: matchers, + } +} + +func (p *lazyExpandedPostings) lazyExpanded() bool { + return p != nil && len(p.matchers) > 0 +} + +func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups []*postingGroup, seriesMaxSize int64, seriesMatchRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter) ([]*postingGroup, bool, error) { + if len(postingGroups) <= 1 { + return postingGroups, false, nil + } + // Collect posting cardinality of each posting group. + for _, pg := range postingGroups { + // A posting group can have either add keys or remove keys but not both the same time. + vals := pg.addKeys + if len(pg.removeKeys) > 0 { + vals = pg.removeKeys + } + rngs, err := r.block.indexHeaderReader.PostingsOffsets(pg.name, vals...) + if err != nil { + return nil, false, errors.Wrapf(err, "postings offsets for %s", pg.name) + } + + // No posting ranges found means empty posting. + if len(rngs) == 0 { + return nil, true, nil + } + for _, r := range rngs { + if r == indexheader.NotFoundRange { + continue + } + // Each range starts from the #entries field which is 4 bytes. + // Need to subtract it when calculating number of postings. + // https://github.com/prometheus/prometheus/blob/v2.46.0/tsdb/docs/format/index.md. + pg.cardinality += (r.End - r.Start - 4) / 4 + } + } + slices.SortFunc(postingGroups, func(a, b *postingGroup) bool { + if a.cardinality == b.cardinality { + return a.name < b.name + } + return a.cardinality < b.cardinality + }) + + /* + Algorithm of choosing what postings we need to fetch right now and what + postings we expand lazily. + Sort posting groups by cardinality, so we can iterate from posting group with the smallest posting size. + The algorithm focuses on fetching fewer data, including postings and series. + + We need to fetch at least 1 posting group in order to fetch series. So if we only fetch the first posting group, + the data bytes we need to download is formula F1: P1 * 4 + P1 * S where P1 is the number of postings in group 1 + and S is the size per series. 4 is the byte size per posting. + + If we are going to fetch 2 posting groups, we can intersect the two postings to reduce series we need to download (hopefully). + Assuming for each intersection, the series matching ratio is R (0 < R < 1). Then the data bytes we need to download is + formula F2: P1 * 4 + P2 * 4 + P1 * S * R. + We can get formula F3 if we are going to fetch 3 posting groups: + F3: P1 * 4 + P2 * 4 + P3 * 4 + P1 * S * R^2. + + Let's compare formula F2 and F1 first. 
+ P1 * 4 + P2 * 4 + P1 * S * R < P1 * 4 + P1 * S + => P2 * 4 < P1 * S * (1 - R) + Left hand side is the posting group size and right hand side is basically the series size we don't need to fetch + by having the additional intersection. In order to fetch less data for F2 than F1, we just need to ensure that + the additional postings size is smaller. + + Let's compare formula F3 and F2. + P1 * 4 + P2 * 4 + P3 * 4 + P1 * S * R^2 < P1 * 4 + P2 * 4 + P1 * S * R + => P3 * 4 < P1 * S * R * (1 - R) + Same as the previous formula. + + Compare formula F4 (Cost to fetch up to 4 posting groups) and F3. + P4 * 4 < P1 * S * R^2 * (1 - R) + + We can generalize this to formula: Pn * 4 < P1 * S * R^(n - 2) * (1 - R) + + The idea of the algorithm: + By iterating the posting group in sorted order of cardinality, we need to make sure that by fetching the current posting group, + the total data fetched is smaller than the previous posting group. If so, then we continue to next posting group, + otherwise we stop. + + This ensures that when we stop at one posting group, posting groups after it always need to fetch more data. + Based on formula Pn * 4 < P1 * S * R^(n - 2) * (1 - R), left hand side is always increasing while iterating to larger + posting groups while right hand side value is always decreasing as R < 1. + */ + seriesBytesToFetch := postingGroups[0].cardinality * seriesMaxSize + p := float64(1) + i := 1 // Start from index 1 as we always need to fetch the smallest posting group. + hasAdd := !postingGroups[0].addAll + for i < len(postingGroups) { + pg := postingGroups[i] + // Need to fetch more data on postings than series we avoid fetching, stop here and lazy expanding rest of matchers. + // If there is no posting group with add keys, don't skip any posting group until we have one. + // Fetch posting group with addAll is much more expensive due to fetch all postings. + if hasAdd && pg.cardinality*4 > int64(p*math.Ceil((1-seriesMatchRatio)*float64(seriesBytesToFetch))) { + break + } + hasAdd = hasAdd || !pg.addAll + p = p * seriesMatchRatio + i++ + } + for i < len(postingGroups) { + postingGroups[i].lazy = true + lazyExpandedPostingSizeBytes.Add(float64(4 * postingGroups[i].cardinality)) + i++ + } + return postingGroups, false, nil +} + +func fetchLazyExpandedPostings( + ctx context.Context, + postingGroups []*postingGroup, + r *bucketIndexReader, + bytesLimiter BytesLimiter, + addAllPostings bool, + lazyExpandedPostingEnabled bool, + lazyExpandedPostingSizeBytes prometheus.Counter, +) (*lazyExpandedPostings, error) { + var ( + err error + emptyPostingGroup bool + ) + /* + There are several cases that we skip postings fetch optimization: + - Lazy expanded posting disabled. + - Add all postings. This means we don't have a posting group with any add keys. + - Block estimated max series size not set which means we don't have a way to estimate series bytes downloaded. + - `SeriesMaxSize` not set for this block then we have no way to estimate series size. + - Only one effective posting group available. We need to at least download postings from 1 posting group so no need to optimize. + */ + if lazyExpandedPostingEnabled && !addAllPostings && + r.block.estimatedMaxSeriesSize > 0 && len(postingGroups) > 1 { + postingGroups, emptyPostingGroup, err = optimizePostingsFetchByDownloadedBytes( + r, + postingGroups, + int64(r.block.estimatedMaxSeriesSize), + 0.5, // TODO(yeya24): Expose this as a flag. 
+ lazyExpandedPostingSizeBytes, + ) + if err != nil { + return nil, err + } + if emptyPostingGroup { + return emptyLazyPostings, nil + } + } + + ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter) + if err != nil { + return nil, err + } + return &lazyExpandedPostings{postings: ps, matchers: matchers}, nil +} + +// keysToFetchFromPostingGroups returns label pairs (postings) to fetch +// and matchers we need to use for lazy posting expansion. +// Input `postingGroups` needs to be ordered by cardinality in case lazy +// expansion is enabled. When we find the first lazy posting group we can exit. +func keysToFetchFromPostingGroups(postingGroups []*postingGroup) ([]labels.Label, []*labels.Matcher) { + var lazyMatchers []*labels.Matcher + keys := make([]labels.Label, 0) + i := 0 + for i < len(postingGroups) { + pg := postingGroups[i] + if pg.lazy { + break + } + + // Postings returned by fetchPostings will be in the same order as keys + // so it's important that we iterate them in the same order later. + // We don't have any other way of pairing keys and fetched postings. + for _, key := range pg.addKeys { + keys = append(keys, labels.Label{Name: pg.name, Value: key}) + } + for _, key := range pg.removeKeys { + keys = append(keys, labels.Label{Name: pg.name, Value: key}) + } + i++ + } + if i < len(postingGroups) { + lazyMatchers = make([]*labels.Matcher, 0) + for i < len(postingGroups) { + lazyMatchers = append(lazyMatchers, postingGroups[i].matchers...) + i++ + } + } + return keys, lazyMatchers +} + +func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter) ([]storage.SeriesRef, []*labels.Matcher, error) { + keys, lazyMatchers := keysToFetchFromPostingGroups(postingGroups) + fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter) + defer func() { + for _, closeFn := range closeFns { + closeFn() + } + }() + if err != nil { + return nil, nil, errors.Wrap(err, "get postings") + } + + // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys + // again, and this is exactly the same order as before (when building the groups), so we can simply + // use one incrementing index to fetch postings from returned slice. + postingIndex := 0 + + var groupAdds, groupRemovals []index.Postings + for _, g := range postingGroups { + if g.lazy { + break + } + // We cannot add empty set to groupAdds, since they are intersected. 
+ if len(g.addKeys) > 0 { + toMerge := make([]index.Postings, 0, len(g.addKeys)) + for _, l := range g.addKeys { + toMerge = append(toMerge, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) + postingIndex++ + } + + groupAdds = append(groupAdds, index.Merge(toMerge...)) + } + + for _, l := range g.removeKeys { + groupRemovals = append(groupRemovals, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) + postingIndex++ + } + } + + result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...)) + + if ctx.Err() != nil { + return nil, nil, ctx.Err() + } + ps, err := ExpandPostingsWithContext(ctx, result) + if err != nil { + return nil, nil, errors.Wrap(err, "expand") + } + return ps, lazyMatchers, nil +} diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/proxy_heap.go b/vendor/github.com/thanos-io/thanos/pkg/store/proxy_heap.go index d5cc940637..7ea18b134d 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/proxy_heap.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/proxy_heap.go @@ -605,7 +605,8 @@ func newAsyncRespSet( seriesCtx, span, frameTimeout, - st, + st.String(), + st.LabelSets(), closeSeries, cl, shardMatcher, @@ -639,12 +640,14 @@ type eagerRespSet struct { ctx context.Context closeSeries context.CancelFunc - st Client frameTimeout time.Duration shardMatcher *storepb.ShardMatcher removeLabels map[string]struct{} - storeLabels map[string]struct{} + + storeName string + storeLabels map[string]struct{} + storeLabelSets []labels.Labels // Internal bookkeeping. bufferedResponses []*storepb.SeriesResponse @@ -656,7 +659,8 @@ func newEagerRespSet( ctx context.Context, span opentracing.Span, frameTimeout time.Duration, - st Client, + storeName string, + storeLabelSets []labels.Labels, closeSeries context.CancelFunc, cl storepb.Store_SeriesClient, shardMatcher *storepb.ShardMatcher, @@ -666,7 +670,6 @@ func newEagerRespSet( ) respSet { ret := &eagerRespSet{ span: span, - st: st, closeSeries: closeSeries, cl: cl, frameTimeout: frameTimeout, @@ -675,9 +678,11 @@ func newEagerRespSet( wg: &sync.WaitGroup{}, shardMatcher: shardMatcher, removeLabels: removeLabels, + storeName: storeName, + storeLabelSets: storeLabelSets, } ret.storeLabels = make(map[string]struct{}) - for _, ls := range st.LabelSets() { + for _, ls := range storeLabelSets { for _, l := range ls { ret.storeLabels[l.Name] = struct{}{} } @@ -686,7 +691,7 @@ func newEagerRespSet( ret.wg.Add(1) // Start a goroutine and immediately buffer everything. - go func(st Client, l *eagerRespSet) { + go func(l *eagerRespSet) { seriesStats := &storepb.SeriesStatsCounter{} bytesProcessed := 0 @@ -715,7 +720,7 @@ func newEagerRespSet( select { case <-l.ctx.Done(): - err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st.String()) + err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", storeName) l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err)) l.span.SetTag("err", err.Error()) return false @@ -731,9 +736,9 @@ func newEagerRespSet( // Most likely the per-Recv timeout has been reached. // There's a small race between canceling and the Recv() // but this is most likely true. 
- rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st.String()) + rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, storeName) } else { - rerr = errors.Wrapf(err, "receive series from %s", st.String()) + rerr = errors.Wrapf(err, "receive series from %s", storeName) } l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) l.span.SetTag("err", rerr.Error()) @@ -773,7 +778,7 @@ func newEagerRespSet( sortWithoutLabels(l.bufferedResponses, l.removeLabels) } - }(st, ret) + }(ret) return ret } @@ -845,6 +850,7 @@ func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string] } func (l *eagerRespSet) Close() { + l.closeSeries() l.shardMatcher.Close() } @@ -873,11 +879,11 @@ func (l *eagerRespSet) Empty() bool { } func (l *eagerRespSet) StoreID() string { - return l.st.String() + return l.storeName } func (l *eagerRespSet) Labelset() string { - return labelpb.PromLabelSetsToString(l.st.LabelSets()) + return labelpb.PromLabelSetsToString(l.storeLabelSets) } func (l *eagerRespSet) StoreLabels() map[string]struct{} { diff --git a/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/prometheus.go b/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/prometheus.go index bf7e900a9b..9da879de82 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/prometheus.go +++ b/vendor/github.com/thanos-io/thanos/pkg/testutil/e2eutil/prometheus.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -38,8 +39,6 @@ import ( "go.uber.org/atomic" "golang.org/x/sync/errgroup" - "github.com/efficientgo/core/testutil" - "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/runutil" ) @@ -446,15 +445,16 @@ func createBlockWithDelay(ctx context.Context, dir string, series []labels.Label return ulid.ULID{}, errors.Wrap(err, "create block id") } - m, err := metadata.ReadFromDir(path.Join(dir, blockID.String())) + bdir := path.Join(dir, blockID.String()) + m, err := metadata.ReadFromDir(bdir) if err != nil { return ulid.ULID{}, errors.Wrap(err, "open meta file") } + logger := log.NewNopLogger() m.ULID = id m.Compaction.Sources = []ulid.ULID{id} - - if err := m.WriteToDir(log.NewNopLogger(), path.Join(dir, blockID.String())); err != nil { + if err := m.WriteToDir(logger, path.Join(dir, blockID.String())); err != nil { return ulid.ULID{}, errors.Wrap(err, "write meta.json file") } @@ -555,6 +555,11 @@ func createBlock( } blockDir := filepath.Join(dir, id.String()) + logger := log.NewNopLogger() + seriesSize, err := gatherMaxSeriesSize(filepath.Join(blockDir, "index")) + if err != nil { + return id, errors.Wrap(err, "gather max series size") + } files := []metadata.File{} if hashFunc != metadata.NoneFunc { @@ -581,11 +586,12 @@ func createBlock( } } - if _, err = metadata.InjectThanos(log.NewNopLogger(), blockDir, metadata.Thanos{ + if _, err = metadata.InjectThanos(logger, blockDir, metadata.Thanos{ Labels: extLset.Map(), Downsample: metadata.ThanosDownsample{Resolution: resolution}, Source: metadata.TestSource, Files: files, + IndexStats: metadata.IndexStats{SeriesMaxSize: seriesSize}, }, nil); err != nil { return id, errors.Wrap(err, "finalize block") } @@ -599,6 +605,49 @@ func createBlock( return id, nil } +func gatherMaxSeriesSize(fn string) (int64, error) { + r, err := index.NewFileReader(fn) + if err != nil { + return 0, errors.Wrap(err, "open index 
file") + } + defer runutil.CloseWithErrCapture(&err, r, "gather index issue file reader") + + p, err := r.Postings(index.AllPostingsKey()) + if err != nil { + return 0, errors.Wrap(err, "get all postings") + } + + // As of version two all series entries are 16 byte padded. All references + // we get have to account for that to get the correct offset. + offsetMultiplier := 1 + version := r.Version() + if version >= 2 { + offsetMultiplier = 16 + } + + // Per series. + var ( + prevId storage.SeriesRef + maxSeriesSize int64 + ) + for p.Next() { + id := p.At() + if prevId != 0 { + // Approximate size. + seriesSize := int64(id-prevId) * int64(offsetMultiplier) + if seriesSize > maxSeriesSize { + maxSeriesSize = seriesSize + } + } + prevId = id + } + if p.Err() != nil { + return 0, errors.Wrap(err, "walk postings") + } + + return maxSeriesSize, nil +} + var indexFilename = "index" type indexWriterSeries struct { diff --git a/vendor/modules.txt b/vendor/modules.txt index f34cdeec90..a4f3202b16 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -908,7 +908,7 @@ github.com/thanos-io/promql-engine/logicalplan github.com/thanos-io/promql-engine/parser github.com/thanos-io/promql-engine/query github.com/thanos-io/promql-engine/worker -# github.com/thanos-io/thanos v0.32.1-0.20230831143954-f75e44ac929c +# github.com/thanos-io/thanos v0.32.3-0.20230911095949-f6a39507b6bd ## explicit; go 1.18 github.com/thanos-io/thanos/pkg/block github.com/thanos-io/thanos/pkg/block/indexheader