diff --git a/CHANGELOG.md b/CHANGELOG.md
index 429fae9183..7630e0d6f9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
 # Changelog
 
 ## master / unreleased
+* [FEATURE] Ruler: Add support for disabling rule groups. #5521
 * [FEATURE] Added the flag `-alertmanager.alerts-gc-interval` to configure alert manager alerts Garbage collection interval. #5550
 * [FEATURE] Ruler: Add support for Limit field on RuleGroup. #5528
 * [FEATURE] AlertManager: Add support for Webex, Discord and Telegram Receiver. #5493
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index ff64668a33..9f7bd445aa 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -3097,6 +3097,9 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
 # alerts will fail with a log message and metric increment. 0 = no limit.
 # CLI flag: -alertmanager.max-alerts-size-bytes
 [alertmanager_max_alerts_size_bytes: <int> | default = 0]
+
+# list of rule groups to disable
+[disabled_rule_groups: <list of rule groups to disable> | default = ]
 ```
 
 ### `memberlist_config`
diff --git a/integration/ruler_test.go b/integration/ruler_test.go
index 9413e75c95..f8de7223d0 100644
--- a/integration/ruler_test.go
+++ b/integration/ruler_test.go
@@ -4,6 +4,7 @@
 package integration
 
 import (
+	"bytes"
 	"context"
 	"crypto/x509"
 	"crypto/x509/pkix"
@@ -29,6 +30,7 @@ import (
 	"github.com/prometheus/prometheus/prompb"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"github.com/thanos-io/objstore/providers/s3"
 	"gopkg.in/yaml.v3"
 
 	"github.com/cortexproject/cortex/integration/ca"
@@ -915,6 +917,127 @@ func TestRulerMetricsWhenIngesterFails(t *testing.T) {
 	})
 }
 
+func TestRulerDisablesRuleGroups(t *testing.T) {
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	// Start dependencies.
+	consul := e2edb.NewConsul()
+	minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
+	require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+	const blockRangePeriod = 2 * time.Second
+	// Configure the ruler.
+	flags := mergeFlags(
+		BlocksStorageFlags(),
+		RulerFlags(),
+		map[string]string{
+			"-blocks-storage.tsdb.block-ranges-period":         blockRangePeriod.String(),
+			"-blocks-storage.tsdb.ship-interval":               "1s",
+			"-blocks-storage.bucket-store.sync-interval":       "1s",
+			"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
+			"-blocks-storage.tsdb.retention-period":            ((blockRangePeriod * 2) - 1).String(),
+
+			// Enable the bucket index so we can skip the initial bucket scan.
+			"-blocks-storage.bucket-store.bucket-index.enabled": "false",
+			// Evaluate rules often, so that we don't need to wait for metrics to show up.
+			"-ruler.evaluation-interval": "2s",
+			"-ruler.poll-interval":       "2s",
+			// No delay
+			"-ruler.evaluation-delay-duration": "0",
+
+			// We run single ingester only, no replication.
+			"-distributor.replication-factor": "1",
+
+			// Very low limit so that ruler hits it.
+ "-querier.max-fetched-chunks-per-query": "15", + "-querier.query-store-after": (1 * time.Second).String(), + "-querier.query-ingesters-within": (2 * time.Second).String(), + }, + ) + + const namespace = "test" + const user = "user" + configFileName := "runtime-config.yaml" + bucketName := "cortex" + + storeGateway := e2ecortex.NewStoreGateway("store-gateway-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + + flags = mergeFlags(flags, map[string]string{ + "-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint(), + "-runtime-config.backend": "s3", + "-runtime-config.s3.access-key-id": e2edb.MinioAccessKey, + "-runtime-config.s3.secret-access-key": e2edb.MinioSecretKey, + "-runtime-config.s3.bucket-name": bucketName, + "-runtime-config.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName), + "-runtime-config.s3.insecure": "true", + "-runtime-config.file": configFileName, + "-runtime-config.reload-period": "2s", + }) + + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + + client, err := s3.NewBucketWithConfig(nil, s3.Config{ + Endpoint: minio.HTTPEndpoint(), + Insecure: true, + Bucket: bucketName, + AccessKey: e2edb.MinioAccessKey, + SecretKey: e2edb.MinioSecretKey, + }, "runtime-config-test") + + require.NoError(t, err) + + // update runtime config + newRuntimeConfig := []byte(`overrides: + user: + disabled_rule_groups: + - name: bad_rule + namespace: test`) + require.NoError(t, client.Upload(context.Background(), configFileName, bytes.NewReader(newRuntimeConfig))) + time.Sleep(2 * time.Second) + + ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "") + + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler, storeGateway)) + + // Wait until both the distributor and ruler have updated the ring. The querier will also watch + // the store-gateway ring if blocks sharding is enabled. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(1024), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user) + require.NoError(t, err) + + expression := "absent(sum_over_time(metric{}[2s] offset 1h))" + + t.Run("disable_rule_group", func(t *testing.T) { + + ruleGroup := ruleGroupWithRule("bad_rule", "rule", expression) + ruleGroup.Interval = 2 + require.NoError(t, c.SetRuleGroup(ruleGroup, namespace)) + + ruleGroup = ruleGroupWithRule("good_rule", "rule", expression) + ruleGroup.Interval = 2 + require.NoError(t, c.SetRuleGroup(ruleGroup, namespace)) + + m1 := ruleGroupMatcher(user, namespace, "good_rule") + + // Wait until ruler has loaded the group. 
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_sync_rules_total"}, e2e.WaitMissingMetrics))
+
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m1), e2e.WaitMissingMetrics))
+
+		filter := e2ecortex.RuleFilter{}
+		actualGroups, err := c.GetPrometheusRules(filter)
+		require.NoError(t, err)
+		assert.Equal(t, 1, len(actualGroups))
+		assert.Equal(t, "good_rule", actualGroups[0].Name)
+		assert.Equal(t, "test", actualGroups[0].File)
+	})
+}
+
 func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher {
 	return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName))
 }
diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go
index 879f3f82e3..e8e597f515 100644
--- a/pkg/ruler/compat.go
+++ b/pkg/ruler/compat.go
@@ -5,6 +5,8 @@ import (
 	"errors"
 	"time"
 
+	"github.com/cortexproject/cortex/pkg/util/validation"
+
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
@@ -142,6 +144,7 @@ type RulesLimits interface {
 	RulerTenantShardSize(userID string) int
 	RulerMaxRuleGroupsPerTenant(userID string) int
 	RulerMaxRulesPerRuleGroup(userID string) int
+	DisabledRuleGroups(userID string) validation.DisabledRuleGroups
 }
 
 // EngineQueryFunc returns a new engine query function by passing an altered timestamp.
diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go
index ae5089a1b4..e90c9b7e2c 100644
--- a/pkg/ruler/ruler.go
+++ b/pkg/ruler/ruler.go
@@ -71,6 +71,14 @@ const (
 	recordingRuleFilter string = "record"
 )
 
+type DisabledRuleGroupErr struct {
+	Message string
+}
+
+func (e *DisabledRuleGroupErr) Error() string {
+	return e.Message
+}
+
 // Config is the configuration for the recording rules server.
 type Config struct {
 	// This is used for template expansion in alerts; must be a valid URL.
@@ -400,6 +408,17 @@ func SendAlerts(n sender, externalURL string) promRules.NotifyFunc {
 	}
 }
 
+func ruleGroupDisabled(ruleGroup *rulespb.RuleGroupDesc, disabledRuleGroupsForUser validation.DisabledRuleGroups) bool {
+	for _, disabledRuleGroupForUser := range disabledRuleGroupsForUser {
+		if ruleGroup.Namespace == disabledRuleGroupForUser.Namespace &&
+			ruleGroup.Name == disabledRuleGroupForUser.Name &&
+			ruleGroup.User == disabledRuleGroupForUser.User {
+			return true
+		}
+	}
+	return false
+}
+
 var sep = []byte("/")
 
 func tokenForGroup(g *rulespb.RuleGroupDesc) uint32 {
@@ -415,7 +434,8 @@ func tokenForGroup(g *rulespb.RuleGroupDesc) uint32 {
 	return ringHasher.Sum32()
 }
 
-func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, instanceAddr string) (bool, error) {
+func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, instanceAddr string) (bool, error) {
+
 	hash := tokenForGroup(g)
 
 	rlrs, err := r.Get(hash, RingOp, nil, nil, nil)
@@ -423,7 +443,12 @@ func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, instanceAd
 		return false, errors.Wrap(err, "error reading ring to verify rule group ownership")
 	}
 
-	return rlrs.Instances[0].Addr == instanceAddr, nil
+	ownsRuleGroup := rlrs.Instances[0].Addr == instanceAddr
+	if ownsRuleGroup && ruleGroupDisabled(g, disabledRuleGroups) {
+		return false, &DisabledRuleGroupErr{Message: fmt.Sprintf("rule group %s, namespace %s, user %s is disabled", g.Name, g.Namespace, g.User)}
+	}
+
+	return ownsRuleGroup, nil
 }
 
 func (r *Ruler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@@ -533,7 +558,26 @@ func (r *Ruler) listRules(ctx context.Context) (result map[string]rulespb.RuleGr
 }
 
 func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
-	return r.store.ListAllRuleGroups(ctx)
+	allRuleGroups, err := r.store.ListAllRuleGroups(ctx)
+	if err != nil {
+		return nil, err
+	}
+	for userID, groups := range allRuleGroups {
+		disabledRuleGroupsForUser := r.limits.DisabledRuleGroups(userID)
+		if len(disabledRuleGroupsForUser) == 0 {
+			continue
+		}
+		filteredGroupsForUser := rulespb.RuleGroupList{}
+		for _, group := range groups {
+			if !ruleGroupDisabled(group, disabledRuleGroupsForUser) {
+				filteredGroupsForUser = append(filteredGroupsForUser, group)
+			} else {
+				level.Info(r.logger).Log("msg", "rule group disabled", "name", group.Name, "namespace", group.Namespace, "user", group.User)
+			}
+		}
+		allRuleGroups[userID] = filteredGroupsForUser
+	}
+	return allRuleGroups, nil
 }
 
 func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
@@ -544,7 +588,7 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp
 
 	filteredConfigs := make(map[string]rulespb.RuleGroupList)
 	for userID, groups := range configs {
-		filtered := filterRuleGroups(userID, groups, r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
+		filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
 		if len(filtered) > 0 {
 			filteredConfigs[userID] = filtered
 		}
@@ -602,7 +646,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
 				return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID)
 			}
 
-			filtered := filterRuleGroups(userID, groups, userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
+			filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
 			if len(filtered) == 0 {
 				continue
 			}
@@ -624,15 +668,21 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
 //
 // Reason why this function is not a method on Ruler is to make sure we don't accidentally use r.ring,
 // but only ring passed as parameter.
-func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc {
+func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc {
 	// Prune the rule group to only contain rules that this ruler is responsible for, based on ring.
 	var result []*rulespb.RuleGroupDesc
 	for _, g := range ruleGroups {
-		owned, err := instanceOwnsRuleGroup(ring, g, instanceAddr)
+		owned, err := instanceOwnsRuleGroup(ring, g, disabledRuleGroups, instanceAddr)
 		if err != nil {
-			ringCheckErrors.Inc()
-			level.Error(log).Log("msg", "failed to check if the ruler replica owns the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err)
-			continue
+			switch e := err.(type) {
+			case *DisabledRuleGroupErr:
+				level.Info(log).Log("msg", e.Message)
+				continue
+			default:
+				ringCheckErrors.Inc()
+				level.Error(log).Log("msg", "failed to check if the ruler replica owns the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err)
+				continue
+			}
 		}
 
 		if owned {
diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go
index c8034ee62f..4d89b9d693 100644
--- a/pkg/ruler/ruler_test.go
+++ b/pkg/ruler/ruler_test.go
@@ -85,6 +85,7 @@ type ruleLimits struct {
 	tenantShard          int
 	maxRulesPerRuleGroup int
 	maxRuleGroups        int
+	disabledRuleGroups   validation.DisabledRuleGroups
 }
 
 func (r ruleLimits) EvaluationDelay(_ string) time.Duration {
@@ -103,6 +104,10 @@ func (r ruleLimits) RulerMaxRulesPerRuleGroup(_ string) int {
 	return r.maxRulesPerRuleGroup
 }
 
+func (r ruleLimits) DisabledRuleGroups(userID string) validation.DisabledRuleGroups {
+	return r.disabledRuleGroups
+}
+
 func newEmptyQueryable() storage.Queryable {
 	return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
 		return emptyQuerier{}, nil
@@ -1481,3 +1486,245 @@ func TestRecoverAlertsPostOutage(t *testing.T) {
 	require.Equal(t, promRules.StateFiring, promRules.AlertState(activeAlertRuleRaw.FieldByName("State").Int()))
 }
+
+func TestRulerDisablesRuleGroups(t *testing.T) {
+	const (
+		ruler1     = "ruler-1"
+		ruler1Host = "1.1.1.1"
+		ruler1Port = 9999
+		ruler1Addr = "1.1.1.1:9999"
+
+		ruler2     = "ruler-2"
+		ruler2Host = "2.2.2.2"
+		ruler2Port = 9999
+		ruler2Addr = "2.2.2.2:9999"
+
+		ruler3     = "ruler-3"
+		ruler3Host = "3.3.3.3"
+		ruler3Port = 9999
+		ruler3Addr = "3.3.3.3:9999"
+	)
+	const (
+		user1 = "user1"
+		user2 = "user2"
+		user3 = "user3"
+	)
+
+	user1Group1 := &rulespb.RuleGroupDesc{User: user1, Namespace: "namespace1", Name: "group1"}
+	user1Group2 := &rulespb.RuleGroupDesc{User: user1, Namespace: "namespace1", Name: "group2"}
+	user2Group1 := &rulespb.RuleGroupDesc{User: user2, Namespace: "namespace1", Name: "group1"}
+	user3Group1 := &rulespb.RuleGroupDesc{User: user3, Namespace: "namespace1", Name: "group1"}
+
+	user1Group1Token := tokenForGroup(user1Group1)
+	user1Group2Token := tokenForGroup(user1Group2)
+	user2Group1Token := tokenForGroup(user2Group1)
+	user3Group1Token := tokenForGroup(user3Group1)
+
+	d := &querier.MockDistributor{}
+	d.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
+		model.Matrix{
+			&model.SampleStream{
+				Values: []model.SamplePair{},
+			},
+		}, nil)
+	querierConfig := querier.DefaultQuerierConfig()
+	querierConfig.IngesterStreaming = false
+
+	ruleGroupDesc := func(user, name, namespace string) *rulespb.RuleGroupDesc {
+		return &rulespb.RuleGroupDesc{
+			Name:      name,
+			Namespace: namespace,
+			User:      user,
+		}
+	}
+
+	ruleGroupWithRule := func(expr, user, name, namespace string) *rulespb.RuleGroupDesc {
+		rg := ruleGroupDesc(user, name, namespace)
+		rg.Rules = []*rulespb.RuleDesc{
+			{
+				Record: "RecordingRule",
+				Expr:   expr,
+			},
+		}
+		return rg
+	}
+
+	disabledRuleGroups := validation.DisabledRuleGroups{
+		validation.DisabledRuleGroup{
+			Namespace: "namespace1",
+			Name:      "group1",
+			User:      "user1",
+		},
+	}
+
+	for _, tc := range []struct {
+		name                      string
+		rules                     map[string]rulespb.RuleGroupList
+		expectedRuleGroupsForUser map[string]rulespb.RuleGroupList
+		sharding                  bool
+		shardingStrategy          string
+		setupRing                 func(*ring.Desc)
+		disabledRuleGroups        validation.DisabledRuleGroups
+	}{
+		{
+			name: "disables rule group - shuffle sharding",
+			rules: map[string]rulespb.RuleGroupList{
+				"user1": {ruleGroupWithRule("up[240m:1s]", "user1", "group1", "namespace1")},
+				"user2": {ruleGroupWithRule("up[240m:1s]", "user2", "group1", "namespace1")},
+			},
+			sharding:         true,
+			shardingStrategy: util.ShardingStrategyShuffle,
+			expectedRuleGroupsForUser: map[string]rulespb.RuleGroupList{
+				"user2": {ruleGroupDesc("user2", "group1", "namespace1")},
+			},
+			setupRing: func(desc *ring.Desc) {
+				desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now())
+				desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now())
+				desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
+			},
+			disabledRuleGroups: disabledRuleGroups,
+		},
+		{
+			name: "disables rule group - no sharding",
+			rules: map[string]rulespb.RuleGroupList{
+				"user1": {ruleGroupWithRule("up[240m:1s]", "user1", "group1", "namespace1")},
+				"user2": {ruleGroupWithRule("up[240m:1s]", "user2", "group1", "namespace1")},
+			},
+			sharding: false,
+			expectedRuleGroupsForUser: map[string]rulespb.RuleGroupList{
+				"user2": {ruleGroupDesc("user2", "group1", "namespace1")},
+			},
+			disabledRuleGroups: disabledRuleGroups,
+		},
+		{
+			name: "disables rule group - default sharding",
+			rules: map[string]rulespb.RuleGroupList{
+				"user1": {ruleGroupWithRule("up[240m:1s]", "user1", "group1", "namespace1")},
+				"user2": {ruleGroupWithRule("up[240m:1s]", "user2", "group1", "namespace1")},
+			},
+			sharding:         true,
+			shardingStrategy: util.ShardingStrategyDefault,
+			expectedRuleGroupsForUser: map[string]rulespb.RuleGroupList{
+				"user2": {ruleGroupDesc("user2", "group1", "namespace1")},
+			},
+			setupRing: func(desc *ring.Desc) {
+				desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now())
+				desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now())
+				desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
+			},
+			disabledRuleGroups: disabledRuleGroups,
+		},
+		{
+			name: "does not disable rule group - default sharding",
+			rules: map[string]rulespb.RuleGroupList{
+				"user1": {ruleGroupWithRule("up[240m:1s]", "user1", "group1", "namespace1")},
+				"user2": {ruleGroupWithRule("up[240m:1s]", "user2", "group1", "namespace1")},
+			},
+			sharding:         true,
+			shardingStrategy: util.ShardingStrategyDefault,
+			expectedRuleGroupsForUser: map[string]rulespb.RuleGroupList{
+				"user1": {ruleGroupDesc("user1", "group1", "namespace1")},
+				"user2": {ruleGroupDesc("user2", "group1", "namespace1")},
+			},
+			setupRing: func(desc *ring.Desc) {
+				desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now())
+				desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now())
+				desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
+			},
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
+			t.Cleanup(func() { assert.NoError(t, closer.Close()) })
+
+			setupRuler := func(id string, host string, port int, forceRing *ring.Ring) *Ruler {
+				store := newMockRuleStore(tc.rules)
+				cfg := Config{
+					EnableSharding:   tc.sharding,
+					ShardingStrategy: tc.shardingStrategy,
+					Ring: RingConfig{
+						InstanceID:   id,
+						InstanceAddr: host,
+						InstancePort: port,
+						KVStore: kv.Config{
+							Mock: kvStore,
+						},
+						HeartbeatTimeout: 1 * time.Minute,
+					},
+					FlushCheckPeriod: 0,
+				}
+
+				r := buildRuler(t, cfg, nil, store, nil)
+				r.limits = ruleLimits{evalDelay: 0, tenantShard: 3, disabledRuleGroups: tc.disabledRuleGroups}
+
+				if forceRing != nil {
+					r.ring = forceRing
+				}
+				return r
+			}
+
+			r1 := setupRuler(ruler1, ruler1Host, ruler1Port, nil)
+
+			rulerRing := r1.ring
+
+			// We start ruler's ring, but nothing else (not even lifecycler).
+			if rulerRing != nil {
+				require.NoError(t, services.StartAndAwaitRunning(context.Background(), rulerRing))
+				t.Cleanup(rulerRing.StopAsync)
+			}
+
+			var r2, r3 *Ruler
+			if rulerRing != nil {
+				// Reuse ring from r1.
+				r2 = setupRuler(ruler2, ruler2Host, ruler2Port, rulerRing)
+				r3 = setupRuler(ruler3, ruler3Host, ruler3Port, rulerRing)
+			}
+
+			if tc.setupRing != nil {
+				err := kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
+					d, _ := in.(*ring.Desc)
+					if d == nil {
+						d = ring.NewDesc()
+					}
+
+					tc.setupRing(d)
+
+					return d, true, nil
+				})
+				require.NoError(t, err)
+				// Wait a bit to make sure ruler's ring is updated.
+				time.Sleep(100 * time.Millisecond)
+			}
+
+			actualRules := map[string]rulespb.RuleGroupList{}
+			loadedRules, err := r1.listRules(context.Background())
+			require.NoError(t, err)
+			for k, v := range loadedRules {
+				if len(v) > 0 {
+					actualRules[k] = v
+				}
+			}
+
+			fetchRules := func(id string, r *Ruler) {
+				// Only expect rules from other rulers when using ring, and they are present in the ring.
+ if r != nil && rulerRing != nil && rulerRing.HasInstance(id) { + loaded, err := r.listRules(context.Background()) + require.NoError(t, err) + + // Normalize nil map to empty one. + if loaded == nil { + loaded = map[string]rulespb.RuleGroupList{} + } + for k, v := range loaded { + actualRules[k] = v + } + } + } + + fetchRules(ruler2, r2) + fetchRules(ruler3, r3) + + require.Equal(t, tc.expectedRuleGroupsForUser, actualRules) + }) + } +} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 1c690d6992..fe46c6b561 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -38,6 +38,14 @@ func (e LimitError) Error() string { return string(e) } +type DisabledRuleGroup struct { + Namespace string `yaml:"namespace"` + Name string `yaml:"name"` + User string `yaml:"-"` +} + +type DisabledRuleGroups []DisabledRuleGroup + // Limits describe all the limits for users; can be used to describe global default // limits via flags, or per-user limits via yaml config. type Limits struct { @@ -122,12 +130,13 @@ type Limits struct { NotificationRateLimit float64 `yaml:"alertmanager_notification_rate_limit" json:"alertmanager_notification_rate_limit"` NotificationRateLimitPerIntegration NotificationRateLimitMap `yaml:"alertmanager_notification_rate_limit_per_integration" json:"alertmanager_notification_rate_limit_per_integration"` - AlertmanagerMaxConfigSizeBytes int `yaml:"alertmanager_max_config_size_bytes" json:"alertmanager_max_config_size_bytes"` - AlertmanagerMaxTemplatesCount int `yaml:"alertmanager_max_templates_count" json:"alertmanager_max_templates_count"` - AlertmanagerMaxTemplateSizeBytes int `yaml:"alertmanager_max_template_size_bytes" json:"alertmanager_max_template_size_bytes"` - AlertmanagerMaxDispatcherAggregationGroups int `yaml:"alertmanager_max_dispatcher_aggregation_groups" json:"alertmanager_max_dispatcher_aggregation_groups"` - AlertmanagerMaxAlertsCount int `yaml:"alertmanager_max_alerts_count" json:"alertmanager_max_alerts_count"` - AlertmanagerMaxAlertsSizeBytes int `yaml:"alertmanager_max_alerts_size_bytes" json:"alertmanager_max_alerts_size_bytes"` + AlertmanagerMaxConfigSizeBytes int `yaml:"alertmanager_max_config_size_bytes" json:"alertmanager_max_config_size_bytes"` + AlertmanagerMaxTemplatesCount int `yaml:"alertmanager_max_templates_count" json:"alertmanager_max_templates_count"` + AlertmanagerMaxTemplateSizeBytes int `yaml:"alertmanager_max_template_size_bytes" json:"alertmanager_max_template_size_bytes"` + AlertmanagerMaxDispatcherAggregationGroups int `yaml:"alertmanager_max_dispatcher_aggregation_groups" json:"alertmanager_max_dispatcher_aggregation_groups"` + AlertmanagerMaxAlertsCount int `yaml:"alertmanager_max_alerts_count" json:"alertmanager_max_alerts_count"` + AlertmanagerMaxAlertsSizeBytes int `yaml:"alertmanager_max_alerts_size_bytes" json:"alertmanager_max_alerts_size_bytes"` + DisabledRuleGroups DisabledRuleGroups `yaml:"disabled_rule_groups" json:"disabled_rule_groups" doc:"nocli|description=list of rule groups to disable"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -667,6 +676,26 @@ func (o *Overrides) AlertmanagerMaxAlertsSizeBytes(userID string) int { return o.GetOverridesForUser(userID).AlertmanagerMaxAlertsSizeBytes } +func (o *Overrides) DisabledRuleGroups(userID string) DisabledRuleGroups { + if o.tenantLimits != nil { + l := o.tenantLimits.ByUserID(userID) + if l != nil { + disabledRuleGroupsForUser := make(DisabledRuleGroups, len(l.DisabledRuleGroups)) + + for i, 
disabledRuleGroup := range l.DisabledRuleGroups { + disabledRuleGroupForUser := DisabledRuleGroup{ + Namespace: disabledRuleGroup.Namespace, + Name: disabledRuleGroup.Name, + User: userID, + } + disabledRuleGroupsForUser[i] = disabledRuleGroupForUser + } + return disabledRuleGroupsForUser + } + } + return DisabledRuleGroups{} +} + // GetOverridesForUser returns the per-tenant limits with overrides. func (o *Overrides) GetOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { diff --git a/tools/doc-generator/parser.go b/tools/doc-generator/parser.go index a8fb2133b2..a203259357 100644 --- a/tools/doc-generator/parser.go +++ b/tools/doc-generator/parser.go @@ -259,6 +259,8 @@ func getFieldType(t reflect.Type) (string, error) { return "relabel_config...", nil case "labels.Labels": return "map of string to string", nil + case "validation.DisabledRuleGroups": + return "list of rule groups to disable", nil } // Fallback to auto-detection of built-in data types
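---

For reviewers, a minimal sketch of the per-tenant override this change introduces, mirroring the runtime-config payload uploaded in the integration test above; the tenant ID (`user`) and the group/namespace values are example values only:

```yaml
# Runtime overrides file (referenced by -runtime-config.file).
# Rule groups listed here are skipped by the ruler for the given tenant.
overrides:
  user:                  # tenant ID
    disabled_rule_groups:
      - name: bad_rule   # rule group name
        namespace: test  # namespace the group was uploaded under
```

Note that `Overrides.DisabledRuleGroups` in `pkg/util/validation/limits.go` only consults per-tenant overrides and returns an empty list otherwise, so setting `disabled_rule_groups` as a global default would have no effect.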