Skip to content

Commit

Permalink
disable rule groups
Browse files Browse the repository at this point in the history
Signed-off-by: Anand Rajagopal <[email protected]>
  • Loading branch information
rajagopalanand committed Sep 8, 2023
1 parent 94192ff commit 6fa6f92
Show file tree
Hide file tree
Showing 8 changed files with 474 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## master / unreleased
* [FEATURE] Ruler: Add support for disabling rule groups. #5521
* [FEATURE] Added the flag `-alertmanager.alerts-gc-interval` to configure alert manager alerts Garbage collection interval. #5550
* [FEATURE] Ruler: Add support for Limit field on RuleGroup. #5528
* [FEATURE] AlertManager: Add support for Webex, Discord and Telegram Receiver. #5493
Expand Down
3 changes: 3 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3097,6 +3097,9 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# alerts will fail with a log message and metric increment. 0 = no limit.
# CLI flag: -alertmanager.max-alerts-size-bytes
[alertmanager_max_alerts_size_bytes: <int> | default = 0]

# List of rule groups to disable for this tenant.
[disabled_rule_groups: <list of rule groups to disable> | default = ]
```
### `memberlist_config`
Expand Down
123 changes: 123 additions & 0 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package integration

import (
"bytes"
"context"
"crypto/x509"
"crypto/x509/pkix"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore/providers/s3"
"gopkg.in/yaml.v3"

"github.com/cortexproject/cortex/integration/ca"
Expand Down Expand Up @@ -915,6 +917,127 @@ func TestRulerMetricsWhenIngesterFails(t *testing.T) {
})
}

// TestRulerDisablesRuleGroups is an e2e test verifying that a rule group
// listed under a tenant's `disabled_rule_groups` runtime-config override is
// not loaded or evaluated by the ruler, while other groups keep working.
func TestRulerDisablesRuleGroups(t *testing.T) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	// Start dependencies.
	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
	require.NoError(t, s.StartAndWaitReady(consul, minio))

	const blockRangePeriod = 2 * time.Second
	// Configure the ruler.
	flags := mergeFlags(
		BlocksStorageFlags(),
		RulerFlags(),
		map[string]string{
			"-blocks-storage.tsdb.block-ranges-period":         blockRangePeriod.String(),
			"-blocks-storage.tsdb.ship-interval":               "1s",
			"-blocks-storage.bucket-store.sync-interval":       "1s",
			"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
			"-blocks-storage.tsdb.retention-period":            ((blockRangePeriod * 2) - 1).String(),

			// Bucket index disabled: the store-gateway scans the bucket directly.
			"-blocks-storage.bucket-store.bucket-index.enabled": "false",
			// Evaluate rules often, so that we don't need to wait for metrics to show up.
			"-ruler.evaluation-interval": "2s",
			"-ruler.poll-interval":       "2s",
			// No delay
			"-ruler.evaluation-delay-duration": "0",

			// We run single ingester only, no replication.
			"-distributor.replication-factor": "1",

			// Low chunk limit and query-time windows, presumably carried over from
			// the ruler-metrics test setup — TODO confirm they matter for this test.
			"-querier.max-fetched-chunks-per-query": "15",
			"-querier.query-store-after":            (1 * time.Second).String(),
			"-querier.query-ingesters-within":       (2 * time.Second).String(),
		},
	)

	const namespace = "test"
	const user = "user"
	configFileName := "runtime-config.yaml"
	// NOTE(review): this local shadows the package-level bucketName used for
	// minio above; it names the separate bucket holding the runtime config.
	bucketName := "cortex"

	storeGateway := e2ecortex.NewStoreGateway("store-gateway-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")

	// Point every component at the store-gateway and at the S3-backed
	// runtime configuration file, reloaded every 2s.
	flags = mergeFlags(flags, map[string]string{
		"-querier.store-gateway-addresses":     storeGateway.NetworkGRPCEndpoint(),
		"-runtime-config.backend":              "s3",
		"-runtime-config.s3.access-key-id":     e2edb.MinioAccessKey,
		"-runtime-config.s3.secret-access-key": e2edb.MinioSecretKey,
		"-runtime-config.s3.bucket-name":       bucketName,
		"-runtime-config.s3.endpoint":          fmt.Sprintf("%s-minio-9000:9000", networkName),
		"-runtime-config.s3.insecure":          "true",
		"-runtime-config.file":                 configFileName,
		"-runtime-config.reload-period":        "2s",
	})

	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")

	// Direct S3 client used only to upload the runtime configuration object.
	client, err := s3.NewBucketWithConfig(nil, s3.Config{
		Endpoint:  minio.HTTPEndpoint(),
		Insecure:  true,
		Bucket:    bucketName,
		AccessKey: e2edb.MinioAccessKey,
		SecretKey: e2edb.MinioSecretKey,
	}, "runtime-config-test")

	require.NoError(t, err)

	// Update runtime config: disable group "bad_rule" in namespace "test" for
	// tenant "user" before any component starts.
	newRuntimeConfig := []byte(`overrides:
  user:
    disabled_rule_groups:
      - name: bad_rule
        namespace: test`)
	require.NoError(t, client.Upload(context.Background(), configFileName, bytes.NewReader(newRuntimeConfig)))
	// Give the 2s reload-period a chance to pick the file up.
	time.Sleep(2 * time.Second)

	ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "")

	ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
	require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler, storeGateway))

	// Wait until both the distributor and ruler have updated the ring. The querier will also watch
	// the store-gateway ring if blocks sharding is enabled.
	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
	require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(1024), "cortex_ring_tokens_total"))

	c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user)
	require.NoError(t, err)

	expression := "absent(sum_over_time(metric{}[2s] offset 1h))"

	t.Run("disable_rule_group", func(t *testing.T) {

		// Upload both the disabled group and a healthy one.
		ruleGroup := ruleGroupWithRule("bad_rule", "rule", expression)
		ruleGroup.Interval = 2
		require.NoError(t, c.SetRuleGroup(ruleGroup, namespace))

		ruleGroup = ruleGroupWithRule("good_rule", "rule", expression)
		ruleGroup.Interval = 2
		require.NoError(t, c.SetRuleGroup(ruleGroup, namespace))

		m1 := ruleGroupMatcher(user, namespace, "good_rule")

		// Wait until ruler has loaded the group.
		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_sync_rules_total"}, e2e.WaitMissingMetrics))

		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m1), e2e.WaitMissingMetrics))

		// Only the non-disabled group should be reported by the rules API.
		filter := e2ecortex.RuleFilter{}
		actualGroups, err := c.GetPrometheusRules(filter)
		require.NoError(t, err)
		assert.Equal(t, 1, len(actualGroups))
		assert.Equal(t, "good_rule", actualGroups[0].Name)
		assert.Equal(t, "test", actualGroups[0].File)
	})
}

// ruleGroupMatcher builds an equality matcher on the "rule_group" label for
// the metrics exported by the given user/namespace/group combination.
func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher {
	ruleGroupLabelValue := fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName)
	return labels.MustNewMatcher(labels.MatchEqual, "rule_group", ruleGroupLabelValue)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"errors"
"time"

"github.com/cortexproject/cortex/pkg/util/validation"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -142,6 +144,7 @@ type RulesLimits interface {
RulerTenantShardSize(userID string) int
RulerMaxRuleGroupsPerTenant(userID string) int
RulerMaxRulesPerRuleGroup(userID string) int
DisabledRuleGroups(userID string) validation.DisabledRuleGroups
}

// EngineQueryFunc returns a new engine query function by passing an altered timestamp.
Expand Down
70 changes: 60 additions & 10 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ const (
recordingRuleFilter string = "record"
)

// DisabledRuleGroupErr is returned by rule group ownership checks when the
// group has been disabled for the tenant (see ruleGroupDisabled); callers
// type-switch on it to log-and-skip instead of counting a ring error.
type DisabledRuleGroupErr struct {
	// Message is the human-readable description returned by Error().
	Message string
}

// Error implements the error interface.
func (e *DisabledRuleGroupErr) Error() string {
	return e.Message
}

// Config is the configuration for the recording rules server.
type Config struct {
// This is used for template expansion in alerts; must be a valid URL.
Expand Down Expand Up @@ -400,6 +408,17 @@ func SendAlerts(n sender, externalURL string) promRules.NotifyFunc {
}
}

// ruleGroupDisabled reports whether ruleGroup matches one of the tenant's
// disabled-rule-group entries. A match requires namespace, name and user to
// all be equal.
func ruleGroupDisabled(ruleGroup *rulespb.RuleGroupDesc, disabledRuleGroupsForUser validation.DisabledRuleGroups) bool {
	for _, candidate := range disabledRuleGroupsForUser {
		sameNamespace := candidate.Namespace == ruleGroup.Namespace
		sameName := candidate.Name == ruleGroup.Name
		sameUser := candidate.User == ruleGroup.User
		if sameNamespace && sameName && sameUser {
			return true
		}
	}
	return false
}

var sep = []byte("/")

func tokenForGroup(g *rulespb.RuleGroupDesc) uint32 {
Expand All @@ -415,15 +434,21 @@ func tokenForGroup(g *rulespb.RuleGroupDesc) uint32 {
return ringHasher.Sum32()
}

func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, instanceAddr string) (bool, error) {
func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, instanceAddr string) (bool, error) {

hash := tokenForGroup(g)

rlrs, err := r.Get(hash, RingOp, nil, nil, nil)
if err != nil {
return false, errors.Wrap(err, "error reading ring to verify rule group ownership")
}

return rlrs.Instances[0].Addr == instanceAddr, nil
ownsRuleGroup := rlrs.Instances[0].Addr == instanceAddr
if ownsRuleGroup && ruleGroupDisabled(g, disabledRuleGroups) {
return false, &DisabledRuleGroupErr{Message: fmt.Sprintf("rule group %s, namespace %s, user %s is disabled", g.Name, g.Namespace, g.User)}
}

return ownsRuleGroup, nil
}

func (r *Ruler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -533,7 +558,26 @@ func (r *Ruler) listRules(ctx context.Context) (result map[string]rulespb.RuleGr
}

// listRulesNoSharding returns every tenant's rule groups from the store,
// minus any groups disabled via the per-tenant limits. Disabled groups are
// logged at info level as they are dropped.
func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
	allRuleGroups, err := r.store.ListAllRuleGroups(ctx)
	if err != nil {
		return nil, err
	}
	for userID, groups := range allRuleGroups {
		disabled := r.limits.DisabledRuleGroups(userID)
		if len(disabled) == 0 {
			// Nothing disabled for this tenant; keep the list untouched.
			continue
		}
		kept := make(rulespb.RuleGroupList, 0, len(groups))
		for _, group := range groups {
			if ruleGroupDisabled(group, disabled) {
				level.Info(r.logger).Log("msg", "rule group disabled", "name", group.Name, "namespace", group.Namespace, "user", group.User)
				continue
			}
			kept = append(kept, group)
		}
		allRuleGroups[userID] = kept
	}
	return allRuleGroups, nil
}

func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
Expand All @@ -544,7 +588,7 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp

filteredConfigs := make(map[string]rulespb.RuleGroupList)
for userID, groups := range configs {
filtered := filterRuleGroups(userID, groups, r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
if len(filtered) > 0 {
filteredConfigs[userID] = filtered
}
Expand Down Expand Up @@ -602,7 +646,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID)
}

filtered := filterRuleGroups(userID, groups, userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
if len(filtered) == 0 {
continue
}
Expand All @@ -624,15 +668,21 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
//
// Reason why this function is not a method on Ruler is to make sure we don't accidentally use r.ring,
// but only ring passed as parameter.
func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc {
func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc {
// Prune the rule group to only contain rules that this ruler is responsible for, based on ring.
var result []*rulespb.RuleGroupDesc
for _, g := range ruleGroups {
owned, err := instanceOwnsRuleGroup(ring, g, instanceAddr)
owned, err := instanceOwnsRuleGroup(ring, g, disabledRuleGroups, instanceAddr)
if err != nil {
ringCheckErrors.Inc()
level.Error(log).Log("msg", "failed to check if the ruler replica owns the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err)
continue
switch e := err.(type) {
case *DisabledRuleGroupErr:
level.Info(log).Log("msg", e.Message)
continue
default:
ringCheckErrors.Inc()
level.Error(log).Log("msg", "failed to check if the ruler replica owns the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err)
continue
}
}

if owned {
Expand Down
Loading

0 comments on commit 6fa6f92

Please sign in to comment.