Skip to content

Commit

Permalink
Ruler: Add support for per-user external labels
Browse files Browse the repository at this point in the history
Signed-off-by: Xiaochao Dong (@damnever) <[email protected]>
  • Loading branch information
damnever committed Nov 25, 2024
1 parent 24efa2b commit 50d0a69
Show file tree
Hide file tree
Showing 12 changed files with 171 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
* [FEATURE] Ruler: Add support for per-user external labels. #6340
* [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355
* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333
* [ENHANCEMENT] S3 Bucket Client: Add a list objects version configs to configure list api object version. #6280
Expand Down
3 changes: 3 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3614,6 +3614,9 @@ query_rejection:

# list of rule groups to disable
[disabled_rule_groups: <list of DisabledRuleGroup> | default = []]

# external labels for alerting rules
[external_labels: <map of string (labelName) to string (labelValue)> | default = []]
```
### `memberlist_config`
Expand Down
7 changes: 4 additions & 3 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ func (t *Cortex) initRuntimeConfig() (services.Service, error) {
// no need to initialize module if load path is empty
return nil, nil
}
t.Cfg.RuntimeConfig.Loader = loadRuntimeConfig
runtimeConfigLoader := runtimeConfigLoader{cfg: t.Cfg}
t.Cfg.RuntimeConfig.Loader = runtimeConfigLoader.load

// make sure to set default limits before we start loading configuration into memory
validation.SetDefaultLimitsForYAMLUnmarshalling(t.Cfg.LimitsConfig)
Expand Down Expand Up @@ -612,14 +613,14 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
}

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
} else {
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
// TODO: Consider wrapping logger to differentiate from querier module logger
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger)

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
}

if err != nil {
Expand Down
12 changes: 11 additions & 1 deletion pkg/cortex/runtime_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ func (l *runtimeConfigTenantLimits) AllByUserID() map[string]*validation.Limits
return nil
}

func loadRuntimeConfig(r io.Reader) (interface{}, error) {
// runtimeConfigLoader decodes runtime configuration. It carries the Config
// whose settings are consulted when validating the per-tenant limits it loads.
type runtimeConfigLoader struct {
	cfg Config
}

func (l runtimeConfigLoader) load(r io.Reader) (interface{}, error) {
var overrides = &RuntimeConfigValues{}

decoder := yaml.NewDecoder(r)
Expand All @@ -74,6 +78,12 @@ func loadRuntimeConfig(r io.Reader) (interface{}, error) {
return nil, errMultipleDocuments
}

for _, ul := range overrides.TenantLimits {
if err := ul.Validate(l.cfg.Distributor.ShardByAllLabels); err != nil {
return nil, err
}
}

return overrides, nil
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/cortex/runtime_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/util/validation"
)

Expand All @@ -28,7 +29,8 @@ overrides:
'1235': *id001
'1236': *id001
`)
runtimeCfg, err := loadRuntimeConfig(yamlFile)
loader := runtimeConfigLoader{cfg: Config{Distributor: distributor.Config{ShardByAllLabels: true}}}
runtimeCfg, err := loader.load(yamlFile)
require.NoError(t, err)

limits := validation.Limits{
Expand All @@ -51,7 +53,7 @@ func TestLoadRuntimeConfig_ShouldLoadEmptyFile(t *testing.T) {
yamlFile := strings.NewReader(`
# This is an empty YAML.
`)
actual, err := loadRuntimeConfig(yamlFile)
actual, err := runtimeConfigLoader{}.load(yamlFile)
require.NoError(t, err)
assert.Equal(t, &RuntimeConfigValues{}, actual)
}
Expand All @@ -60,7 +62,7 @@ func TestLoadRuntimeConfig_MissingPointerFieldsAreNil(t *testing.T) {
yamlFile := strings.NewReader(`
# This is an empty YAML.
`)
actual, err := loadRuntimeConfig(yamlFile)
actual, err := runtimeConfigLoader{}.load(yamlFile)
require.NoError(t, err)

actualCfg, ok := actual.(*RuntimeConfigValues)
Expand Down Expand Up @@ -102,7 +104,7 @@ overrides:
}

for _, tc := range cases {
actual, err := loadRuntimeConfig(strings.NewReader(tc))
actual, err := runtimeConfigLoader{}.load(strings.NewReader(tc))
assert.Equal(t, errMultipleDocuments, err)
assert.Nil(t, actual)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type RulesLimits interface {
RulerMaxRulesPerRuleGroup(userID string) int
RulerQueryOffset(userID string) time.Duration
DisabledRuleGroups(userID string) validation.DisabledRuleGroups
ExternalLabels(userID string) labels.Labels
}

// EngineQueryFunc returns a new engine query function validating max queryLength.
Expand Down
68 changes: 68 additions & 0 deletions pkg/ruler/external_labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package ruler

import (
"sync"

"github.com/prometheus/prometheus/model/labels"
)

// userExternalLabels tracks, per tenant, the external label set obtained by
// overlaying the tenant's own external labels on top of the global ones.
type userExternalLabels struct {
	// global is the base label set every tenant starts from.
	global labels.Labels
	// limits supplies each tenant's configured external labels.
	limits RulesLimits
	// builder is scratch space reused by update; it is only used while mtx is held.
	builder *labels.Builder

	// mtx guards builder and users.
	mtx sync.Mutex
	// users holds the most recently stored label set for each tenant.
	users map[string]labels.Labels
}

// newUserExternalLabels returns an empty per-tenant external label tracker.
// global is the label set shared by every tenant; limits is consulted for each
// tenant's own external labels.
func newUserExternalLabels(global labels.Labels, limits RulesLimits) *userExternalLabels {
	tracker := new(userExternalLabels)
	tracker.global = global
	tracker.limits = limits
	tracker.builder = labels.NewBuilder(nil)
	tracker.users = make(map[string]labels.Labels)
	// mtx is deliberately left at its zero value, which is an unlocked mutex.
	return tracker
}

// get returns the label set currently stored for userID and reports whether
// an entry for that tenant exists.
func (e *userExternalLabels) get(userID string) (labels.Labels, bool) {
	e.mtx.Lock()
	stored, found := e.users[userID]
	e.mtx.Unlock()

	return stored, found
}

// update recomputes the external labels for userID by overlaying the labels
// reported by the limits on top of the global labels.
//
// It returns the merged label set together with a flag telling whether that
// set differs from the one previously stored for the tenant. The stored entry
// is replaced only when the set changed.
//
// NOTE(review): a tenant with no stored entry is compared against the
// zero-value Labels, so a tenant whose merged set is empty is presumably
// reported as unchanged and never stored — confirm that this is intended.
func (e *userExternalLabels) update(userID string) (labels.Labels, bool) {
	// Look the tenant's labels up before locking so the limits call does not
	// run while mtx is held.
	lset := e.limits.ExternalLabels(userID)

	e.mtx.Lock()
	defer e.mtx.Unlock()

	// Start from the global labels; a tenant label with the same name
	// overrides the global value.
	e.builder.Reset(e.global)
	// Iterate through Labels.Range instead of a range statement: Range is
	// available on every labels.Labels implementation, whereas ranging over
	// the value directly only compiles when Labels is a slice.
	lset.Range(func(l labels.Label) {
		e.builder.Set(l.Name, l.Value)
	})
	lset = e.builder.Labels()

	if !labels.Equal(e.users[userID], lset) {
		e.users[userID] = lset
		return lset, true
	}
	return lset, false
}

// remove drops the stored label set for user, if there is one.
func (e *userExternalLabels) remove(user string) {
	e.mtx.Lock()
	delete(e.users, user)
	e.mtx.Unlock()
}

func (e *userExternalLabels) cleanup() {
e.mtx.Lock()
defer e.mtx.Unlock()
for user := range e.users {
delete(e.users, user)
}
}
38 changes: 33 additions & 5 deletions pkg/ruler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/notifier"
promRules "github.com/prometheus/prometheus/rules"
Expand Down Expand Up @@ -47,6 +48,9 @@ type DefaultMultiTenantManager struct {
notifiers map[string]*rulerNotifier
notifiersDiscoveryMetrics map[string]discovery.DiscovererMetrics

// Per-user externalLabels.
userExternalLabels *userExternalLabels

// rules backup
rulesBackupManager *rulesBackupManager

Expand All @@ -62,7 +66,7 @@ type DefaultMultiTenantManager struct {
syncRuleMtx sync.Mutex
}

func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
func NewDefaultMultiTenantManager(cfg Config, limits RulesLimits, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
ncfg, err := buildNotifierConfig(&cfg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -92,6 +96,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
frontendPool: newFrontendPool(cfg, logger, reg),
ruleEvalMetrics: evalMetrics,
notifiers: map[string]*rulerNotifier{},
userExternalLabels: newUserExternalLabels(cfg.ExternalLabels, limits),
notifiersDiscoveryMetrics: notifiersDiscoveryMetrics,
mapper: newMapper(cfg.RulePath, logger),
userManagers: map[string]RulesManager{},
Expand Down Expand Up @@ -146,6 +151,7 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou

r.removeNotifier(userID)
r.mapper.cleanupUser(userID)
r.userExternalLabels.remove(userID)
r.lastReloadSuccessful.DeleteLabelValues(userID)
r.lastReloadSuccessfulTimestamp.DeleteLabelValues(userID)
r.configUpdatesTotal.DeleteLabelValues(userID)
Expand Down Expand Up @@ -183,12 +189,13 @@ func (r *DefaultMultiTenantManager) BackUpRuleGroups(ctx context.Context, ruleGr
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) {
// Map the files to disk and return the file names to be passed to the users manager if they
// have been updated
update, files, err := r.mapper.MapRules(user, groups.Formatted())
rulesUpdated, files, err := r.mapper.MapRules(user, groups.Formatted())
if err != nil {
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
level.Error(r.logger).Log("msg", "unable to map rule files", "user", user, "err", err)
return
}
externalLabels, externalLabelsUpdated := r.userExternalLabels.update(user)

existing := true
manager := r.getRulesManager(user, ctx)
Expand All @@ -201,19 +208,26 @@ func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user
return
}

if !existing || update {
if !existing || rulesUpdated || externalLabelsUpdated {
level.Debug(r.logger).Log("msg", "updating rules", "user", user)
r.configUpdatesTotal.WithLabelValues(user).Inc()
if update && existing {
if rulesUpdated && existing {
r.updateRuleCache(user, manager.RuleGroups())
}
err = manager.Update(r.cfg.EvaluationInterval, files, r.cfg.ExternalLabels, r.cfg.ExternalURL.String(), ruleGroupIterationFunc)
err = manager.Update(r.cfg.EvaluationInterval, files, externalLabels, r.cfg.ExternalURL.String(), ruleGroupIterationFunc)
r.deleteRuleCache(user)
if err != nil {
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
level.Error(r.logger).Log("msg", "unable to update rule manager", "user", user, "err", err)
return
}
if externalLabelsUpdated {
if err = r.notifierApplyExternalLabels(user, externalLabels); err != nil {
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
level.Error(r.logger).Log("msg", "unable to update notifier", "user", user, "err", err)
return
}
}

r.lastReloadSuccessful.WithLabelValues(user).Set(1)
r.lastReloadSuccessfulTimestamp.WithLabelValues(user).SetToCurrentTime()
Expand Down Expand Up @@ -348,6 +362,19 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string, userManag
return n.notifier, nil
}

// notifierApplyExternalLabels applies the notifier configuration to the
// notifier registered for userID, with externalLabels set as the global
// external labels. It returns an error when no notifier is registered for the
// tenant, and otherwise returns the result of applying the configuration.
func (r *DefaultMultiTenantManager) notifierApplyExternalLabels(userID string, externalLabels labels.Labels) error {
	r.notifiersMtx.Lock()
	defer r.notifiersMtx.Unlock()

	userNotifier, found := r.notifiers[userID]
	if !found {
		return fmt.Errorf("notifier not found")
	}

	// Copy the base notifier config by value before overriding its external labels.
	userCfg := *r.notifierCfg
	userCfg.GlobalConfig.ExternalLabels = externalLabels
	return userNotifier.applyConfig(&userCfg)
}

func (r *DefaultMultiTenantManager) getCachedRules(userID string) ([]*promRules.Group, bool) {
r.ruleCacheMtx.RLock()
defer r.ruleCacheMtx.RUnlock()
Expand Down Expand Up @@ -402,6 +429,7 @@ func (r *DefaultMultiTenantManager) Stop() {

// cleanup user rules directories
r.mapper.cleanup()
r.userExternalLabels.cleanup()
}

func (*DefaultMultiTenantManager) ValidateRuleGroup(g rulefmt.RuleGroup) []error {
Expand Down
14 changes: 10 additions & 4 deletions pkg/ruler/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ func TestSyncRuleGroups(t *testing.T) {
}

ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
limits := ruleLimits{externalLabels: labels.FromStrings("from", "cortex")}

m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, nil, nil, log.NewNopLogger())
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, limits, ruleManagerFactory, nil, nil, log.NewNopLogger())
require.NoError(t, err)

const user = "testUser"
Expand Down Expand Up @@ -61,6 +62,9 @@ func TestSyncRuleGroups(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []string{user}, users)
require.True(t, ok)
lset, ok := m.userExternalLabels.get(user)
require.True(t, ok)
require.Equal(t, limits.externalLabels, lset)
}

// Passing empty map / nil stops all managers.
Expand All @@ -79,6 +83,8 @@ func TestSyncRuleGroups(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []string(nil), users)
require.False(t, ok)
_, ok = m.userExternalLabels.get(user)
require.False(t, ok)
}

// Resync same rules as before. Previously this didn't restart the manager.
Expand Down Expand Up @@ -154,7 +160,7 @@ func TestSlowRuleGroupSyncDoesNotSlowdownListRules(t *testing.T) {
}

ruleManagerFactory := RuleManagerFactory(groupsToReturn, waitDurations)
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, nil, prometheus.NewRegistry(), log.NewNopLogger())
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleLimits{}, ruleManagerFactory, nil, prometheus.NewRegistry(), log.NewNopLogger())
require.NoError(t, err)

m.SyncRuleGroups(context.Background(), userRules)
Expand Down Expand Up @@ -217,7 +223,7 @@ func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {

ruleManagerFactory := RuleManagerFactory(nil, waitDurations)

m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleLimits{}, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
require.NoError(t, err)

const user = "testUser"
Expand Down Expand Up @@ -265,7 +271,7 @@ func TestBackupRules(t *testing.T) {
ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
config := Config{RulePath: dir}
config.Ring.ReplicationFactor = 3
m, err := NewDefaultMultiTenantManager(config, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
m, err := NewDefaultMultiTenantManager(config, ruleLimits{}, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
require.NoError(t, err)

const user1 = "testUser"
Expand Down
Loading

0 comments on commit 50d0a69

Please sign in to comment.