diff --git a/CHANGELOG.md b/CHANGELOG.md index 07327dab21..9d3827e883 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## master / unreleased +* [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store` that reports the total number of rule groups per tenant in the store; it can be compared with `cortex_prometheus_rule_group_rules` to count the number of rule groups that are not loaded by any ruler. #5869 ## 1.18.0 in progress diff --git a/integration/ruler_test.go b/integration/ruler_test.go index c05fd2ceb4..d56bb696aa 100644 --- a/integration/ruler_test.go +++ b/integration/ruler_test.go @@ -267,6 +267,9 @@ func TestRulerSharding(t *testing.T) { // between the two rulers. require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules")) require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules")) + // Even with rules sharded, we expect rulers to have the same cortex_ruler_rule_groups_in_store metric values + require.NoError(t, ruler1.WaitSumMetrics(e2e.Equals(numRulesGroups), "cortex_ruler_rule_groups_in_store")) + require.NoError(t, ruler2.WaitSumMetrics(e2e.Equals(numRulesGroups), "cortex_ruler_rule_groups_in_store")) // Fetch the rules and ensure they match the configured ones. 
actualGroups, err := c.GetPrometheusRules(e2ecortex.DefaultFilter) diff --git a/pkg/ruler/manager_metrics.go b/pkg/ruler/manager_metrics.go index 130387407c..ef211ddd89 100644 --- a/pkg/ruler/manager_metrics.go +++ b/pkg/ruler/manager_metrics.go @@ -271,3 +271,39 @@ func (m *RuleEvalMetrics) deletePerUserMetrics(userID string) { m.RulerQuerySeconds.DeleteLabelValues(userID) } } + +type RuleGroupMetrics struct { + RuleGroupsInStore *prometheus.GaugeVec + tenants map[string]struct{} + allowedTenants *util.AllowedTenants +} + +func NewRuleGroupMetrics(reg prometheus.Registerer, allowedTenants *util.AllowedTenants) *RuleGroupMetrics { + m := &RuleGroupMetrics{ + RuleGroupsInStore: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ruler_rule_groups_in_store", + Help: "The number of rule groups a tenant has in store.", + }, []string{"user"}), + allowedTenants: allowedTenants, + } + return m +} + +// UpdateRuleGroupsInStore updates the cortex_ruler_rule_groups_in_store metric with the provided number of rule +// groups per tenant and removes the metrics for tenants that are no longer present. +func (r *RuleGroupMetrics) UpdateRuleGroupsInStore(ruleGroupsCount map[string]int) { + tenants := make(map[string]struct{}, len(ruleGroupsCount)) + for userID, count := range ruleGroupsCount { + if !r.allowedTenants.IsAllowed(userID) { // if the tenant is disabled just ignore its rule groups + continue + } + tenants[userID] = struct{}{} + r.RuleGroupsInStore.WithLabelValues(userID).Set(float64(count)) + } + for userID := range r.tenants { + if _, ok := tenants[userID]; !ok { + r.RuleGroupsInStore.DeleteLabelValues(userID) + } + } + r.tenants = tenants +} diff --git a/pkg/ruler/manager_metrics_test.go b/pkg/ruler/manager_metrics_test.go index 4b851ef192..60b68452ca 100644 --- a/pkg/ruler/manager_metrics_test.go +++ b/pkg/ruler/manager_metrics_test.go @@ -595,3 +595,41 @@ func TestRuleEvalMetricsDeletePerUserMetrics(t *testing.T) { require.Contains(t, 
mfm[name].String(), "value:\"fake2\"") } } + +func TestRuleGroupMetrics(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + m := NewRuleGroupMetrics(reg, util.NewAllowedTenants(nil, []string{"fake3"})) + m.UpdateRuleGroupsInStore(map[string]int{ + "fake1": 10, + "fake2": 20, + "fake3": 30, + }) + gm, err := reg.Gather() + require.NoError(t, err) + mfm, err := util.NewMetricFamilyMap(gm) + require.NoError(t, err) + require.Equal(t, 2, len(mfm["cortex_ruler_rule_groups_in_store"].Metric)) + requireMetricEqual(t, mfm["cortex_ruler_rule_groups_in_store"].Metric[0], map[string]string{ + "user": "fake1", + }, float64(10)) + requireMetricEqual(t, mfm["cortex_ruler_rule_groups_in_store"].Metric[1], map[string]string{ + "user": "fake2", + }, float64(20)) + m.UpdateRuleGroupsInStore(map[string]int{ + "fake2": 30, + }) + gm, err = reg.Gather() + require.NoError(t, err) + mfm, err = util.NewMetricFamilyMap(gm) + require.NoError(t, err) + require.Equal(t, 1, len(mfm["cortex_ruler_rule_groups_in_store"].Metric)) + requireMetricEqual(t, mfm["cortex_ruler_rule_groups_in_store"].Metric[0], map[string]string{ + "user": "fake2", + }, float64(30)) + m.UpdateRuleGroupsInStore(make(map[string]int)) + gm, err = reg.Gather() + require.NoError(t, err) + mfm, err = util.NewMetricFamilyMap(gm) + require.NoError(t, err) + require.Nil(t, mfm["cortex_ruler_rule_groups_in_store"]) +} diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index d47e4320c8..4f121570cc 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -294,6 +294,7 @@ type Ruler struct { ruleGroupStoreLoadDuration prometheus.Gauge ruleGroupSyncDuration prometheus.Gauge rulerGetRulesFailures *prometheus.CounterVec + ruleGroupMetrics *RuleGroupMetrics allowedTenants *util.AllowedTenants @@ -342,6 +343,7 @@ func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, Help: "The total number of failed rules request sent to rulers in getShardedRules.", }, []string{"ruler"}), } + ruler.ruleGroupMetrics 
= NewRuleGroupMetrics(reg, ruler.allowedTenants) if len(cfg.EnabledTenants) > 0 { level.Info(ruler.logger).Log("msg", "ruler using enabled users", "enabled", strings.Join(cfg.EnabledTenants, ", ")) @@ -667,7 +669,9 @@ func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.Rul if err != nil { return nil, nil, err } + ruleGroupCounts := make(map[string]int, len(allRuleGroups)) for userID, groups := range allRuleGroups { + ruleGroupCounts[userID] = len(groups) disabledRuleGroupsForUser := r.limits.DisabledRuleGroups(userID) if len(disabledRuleGroupsForUser) == 0 { continue @@ -682,6 +686,7 @@ func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.Rul } allRuleGroups[userID] = filteredGroupsForUser } + r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts) return allRuleGroups, nil, nil } @@ -691,9 +696,11 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp return nil, nil, err } + ruleGroupCounts := make(map[string]int, len(configs)) ownedConfigs := make(map[string]rulespb.RuleGroupList) backedUpConfigs := make(map[string]rulespb.RuleGroupList) for userID, groups := range configs { + ruleGroupCounts[userID] = len(groups) owned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) if len(owned) > 0 { ownedConfigs[userID] = owned @@ -705,6 +712,7 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp } } } + r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts) return ownedConfigs, backedUpConfigs, nil } @@ -732,6 +740,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp } if len(userRings) == 0 { + r.ruleGroupMetrics.UpdateRuleGroupsInStore(make(map[string]int)) return nil, nil, nil } @@ -744,6 +753,8 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp mu := sync.Mutex{} owned := 
map[string]rulespb.RuleGroupList{} backedUp := map[string]rulespb.RuleGroupList{} + gLock := sync.Mutex{} + ruleGroupCounts := make(map[string]int, len(userRings)) concurrency := loadRulesConcurrency if len(userRings) < concurrency { @@ -758,6 +769,9 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp if err != nil { return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID) } + gLock.Lock() + ruleGroupCounts[userID] = len(groups) + gLock.Unlock() filterOwned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) var filterBackup []*rulespb.RuleGroupDesc @@ -781,6 +795,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp } err = g.Wait() + r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts) return owned, backedUp, err } diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index c0d9e43804..af338443ca 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -247,10 +247,23 @@ func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.Tes func newTestRuler(t *testing.T, rulerConfig Config, store rulestore.RuleStore, querierTestConfig *querier.TestConfig) *Ruler { ruler, _ := buildRuler(t, rulerConfig, querierTestConfig, store, nil) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ruler)) + rgs, err := store.ListAllRuleGroups(context.Background()) + require.NoError(t, err) - // Ensure all rules are loaded before usage - ruler.syncRules(context.Background(), rulerSyncReasonInitial) - + // Wait to ensure syncRules has finished and all rules are loaded before usage + deadline := time.Now().Add(3 * time.Second) + for { + loaded := true + for tenantId := range rgs { + if len(ruler.manager.GetRules(tenantId)) == 0 { + loaded = false + } + } + if time.Now().After(deadline) || loaded { + break + } + time.Sleep(50 * 
time.Millisecond) + } return ruler }