From b8e50f716ec29ba5b72cd8049e99c918fa40932f Mon Sep 17 00:00:00 2001 From: Santiago Date: Fri, 7 Jun 2024 17:18:18 +0200 Subject: [PATCH] Use grafana configuration in the Alertmanager (#8066) * Add promoted bool field to Grafana state and configuration * fix tests * add function to compute final configuration from Mimir and Grafana configurations * parse grafana configuration into an AlertConfigDesc * apply grafana configuration to the alertmanager ignoring grafana receivers * include grafana receivers * implement webhook sender, send notifications * add version and orgID to BuildReceiverIntegrations(), fix whSenderFn() * make tests compile * add logger implementation and logger factory * use NoopDecrypt from alerting package, delete getActiveReceiversMap * make sender work with the go-kit logger, use the sender, use logger factory * go back to using pointers for integrations, update alerting package and use helper functions, reduce diff * update AM fork * pass 1 as orgID when creating integrations * comments, tests * address code review comments, refactor --- go.mod | 2 +- pkg/alertmanager/alertmanager.go | 59 +++++++++++++++++++-- pkg/alertmanager/alertspb/compat.go | 2 +- pkg/alertmanager/config.go | 28 ++++++++++ pkg/alertmanager/log.go | 2 - pkg/alertmanager/multitenant.go | 52 ++++++++++++++++--- pkg/alertmanager/multitenant_test.go | 76 +++++++++++++++++++++++++++- 7 files changed, 205 insertions(+), 16 deletions(-) create mode 100644 pkg/alertmanager/config.go diff --git a/go.mod b/go.mod index 71d05764a6d..a4030e02ebd 100644 --- a/go.mod +++ b/go.mod @@ -111,6 +111,7 @@ require ( github.com/pires/go-proxyproto v0.7.0 // indirect github.com/ryanuber/go-glob v1.0.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect + gopkg.in/telebot.v3 v3.2.1 // indirect k8s.io/apimachinery v0.29.3 // indirect k8s.io/client-go v0.29.3 // indirect k8s.io/klog/v2 v2.120.1 // indirect @@ -250,7 +251,6 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240429193739-8cf5692501f6 gopkg.in/ini.v1 v1.67.0 // indirect - gopkg.in/telebot.v3 v3.2.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect sigs.k8s.io/yaml v1.4.0 // indirect diff --git a/pkg/alertmanager/alertmanager.go b/pkg/alertmanager/alertmanager.go index b8651217242..7b1b83c4136 100644 --- a/pkg/alertmanager/alertmanager.go +++ b/pkg/alertmanager/alertmanager.go @@ -21,6 +21,9 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/alerting/definition" + "github.com/grafana/alerting/images" + alertingNotify "github.com/grafana/alerting/notify" + alertingReceivers "github.com/grafana/alerting/receivers" "github.com/grafana/dskit/flagext" "github.com/pkg/errors" "github.com/prometheus/alertmanager/api" @@ -60,6 +63,7 @@ import ( "github.com/grafana/mimir/pkg/alertmanager/alertstore" util_net "github.com/grafana/mimir/pkg/util/net" + "github.com/grafana/mimir/pkg/util/version" ) const ( @@ -362,7 +366,7 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *definition.PostableApiA // Create a firewall binded to the per-tenant config. firewallDialer := util_net.NewFirewallDialer(newFirewallDialerConfigProvider(userID, am.cfg.Limits)) - integrationsMap, err := buildIntegrationsMap(cfg.Receivers, tmpl, firewallDialer, am.logger, func(integrationName string, notifier notify.Notifier) notify.Notifier { + integrationsMap, err := buildIntegrationsMap(conf.Receivers, tmpl, firewallDialer, am.logger, func(integrationName string, notifier notify.Notifier) notify.Notifier { if am.cfg.Limits != nil { rl := &tenantRateLimits{ tenant: userID, @@ -455,20 +459,65 @@ func (am *Alertmanager) getFullState() (*clusterpb.FullState, error) { return am.state.GetFullState() } -// buildIntegrationsMap builds a map of name to the list of integration notifiers off of a -// list of receiver config. -func buildIntegrationsMap(nc []config.Receiver, tmpl *template.Template, firewallDialer *util_net.FirewallDialer, logger log.Logger, notifierWrapper func(string, notify.Notifier) notify.Notifier) (map[string][]*notify.Integration, error) { +// buildIntegrationsMap builds a map of name to the list of integration notifiers off of a list of receiver config. +func buildIntegrationsMap(nc []*definition.PostableApiReceiver, tmpl *template.Template, firewallDialer *util_net.FirewallDialer, logger log.Logger, notifierWrapper func(string, notify.Notifier) notify.Notifier) (map[string][]*notify.Integration, error) { integrationsMap := make(map[string][]*notify.Integration, len(nc)) for _, rcv := range nc { - integrations, err := buildReceiverIntegrations(rcv, tmpl, firewallDialer, logger, notifierWrapper) + var integrations []*notify.Integration + var err error + if rcv.Type() == definition.GrafanaReceiverType { + integrations, err = buildGrafanaReceiverIntegrations(rcv, tmpl, logger) + } else { + integrations, err = buildReceiverIntegrations(rcv.Receiver, tmpl, firewallDialer, logger, notifierWrapper) + } if err != nil { return nil, err } + integrationsMap[rcv.Name] = integrations } + return integrationsMap, nil } +func buildGrafanaReceiverIntegrations(rcv *definition.PostableApiReceiver, tmpl *template.Template, logger log.Logger) ([]*notify.Integration, error) { + loggerFactory := newLoggerFactory(logger) + whFn := func(n alertingReceivers.Metadata) (alertingReceivers.WebhookSender, error) { + return NewSender(logger), nil + } + emailFn := func(n alertingReceivers.Metadata) (alertingReceivers.EmailSender, error) { + return NewSender(logger), nil + } + + // The decrypt functions and the context are used to decrypt the configuration. + // We don't need to decrypt anything, so we can pass a no-op decrypt func and a context.Background(). + rCfg, err := alertingNotify.BuildReceiverConfiguration(context.Background(), definition.PostableAPIReceiverToAPIReceiver(rcv), alertingNotify.NoopDecrypt) + if err != nil { + return nil, err + } + + integrations, err := alertingNotify.BuildReceiverIntegrations( + rCfg, + tmpl, + &images.UnavailableProvider{}, // TODO: include images in notifications + loggerFactory, + whFn, + emailFn, + 1, // orgID is always 1. + version.Version, + ) + if err != nil { + return nil, err + } + + // TODO: use the nfstatus.Integration wrapper for all integrations. + upstreamIntegrations := make([]*notify.Integration, 0, len(integrations)) + for _, integration := range integrations { + upstreamIntegrations = append(upstreamIntegrations, integration.Integration()) + } + return upstreamIntegrations, nil +} + // buildReceiverIntegrations builds a list of integration notifiers off of a // receiver config. // Taken from https://github.com/prometheus/alertmanager/blob/94d875f1227b29abece661db1a68c001122d1da5/cmd/alertmanager/main.go#L112-L159. diff --git a/pkg/alertmanager/alertspb/compat.go b/pkg/alertmanager/alertspb/compat.go index 1a629f32b6a..06e4505cf10 100644 --- a/pkg/alertmanager/alertspb/compat.go +++ b/pkg/alertmanager/alertspb/compat.go @@ -19,7 +19,7 @@ type AlertConfigDescs struct { // ToProto transforms a yaml Alertmanager config and map of template files to an AlertConfigDesc. func ToProto(cfg string, templates map[string]string, user string) AlertConfigDesc { - tmpls := []*TemplateDesc{} + tmpls := make([]*TemplateDesc, 0, len(templates)) for fn, body := range templates { tmpls = append(tmpls, &TemplateDesc{ Body: body, diff --git a/pkg/alertmanager/config.go b/pkg/alertmanager/config.go new file mode 100644 index 00000000000..e9264068ec3 --- /dev/null +++ b/pkg/alertmanager/config.go @@ -0,0 +1,28 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/alertmanager/distributor.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Cortex Authors. + +package alertmanager + +import ( + "encoding/json" + "fmt" + + "github.com/grafana/mimir/pkg/alertmanager/alertspb" +) + +// parseGrafanaConfig creates an AlertConfigDesc from a GrafanaAlertConfigDesc. +func parseGrafanaConfig(cfg alertspb.GrafanaAlertConfigDesc) (alertspb.AlertConfigDesc, error) { + var amCfg GrafanaAlertmanagerConfig + if err := json.Unmarshal([]byte(cfg.RawConfig), &amCfg); err != nil { + return alertspb.AlertConfigDesc{}, fmt.Errorf("failed to unmarshal Grafana Alertmanager configuration %w", err) + } + + rawCfg, err := json.Marshal(amCfg.AlertmanagerConfig) + if err != nil { + return alertspb.AlertConfigDesc{}, fmt.Errorf("failed to marshal Grafana Alertmanager configuration %w", err) + } + + return alertspb.ToProto(string(rawCfg), amCfg.Templates, cfg.User), nil +} diff --git a/pkg/alertmanager/log.go b/pkg/alertmanager/log.go index b64ac23938c..9b0bcafaf42 100644 --- a/pkg/alertmanager/log.go +++ b/pkg/alertmanager/log.go @@ -14,8 +14,6 @@ type alertingLogger struct { } // newLoggerFactory returns a function that implements the alertingLogging.LoggerFactory interface. -// -//lint:ignore U1000 Ignore unused functions for now, they will be used to create the Grafana notifiers. func newLoggerFactory(logger log.Logger) alertingLogging.LoggerFactory { return func(loggerName string, ctx ...any) alertingLogging.Logger { keyvals := append([]any{"logger", loggerName}, ctx...) diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index e8232c876b3..614349df7b7 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -324,7 +324,7 @@ func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, store alerts return createMultitenantAlertmanager(cfg, fallbackConfig, store, ringStore, limits, features, logger, registerer) } -// ComputeFallbackConfig will load, vaildate and return the provided fallbackConfigFile +// ComputeFallbackConfig will load, validate and return the provided fallbackConfigFile // or return an valid empty default configuration if none is provided. func ComputeFallbackConfig(fallbackConfigFile string) ([]byte, error) { if fallbackConfigFile != "" { @@ -638,11 +638,17 @@ func (am *MultitenantAlertmanager) isUserOwned(userID string) bool { return alertmanagers.Includes(am.ringLifecycler.GetInstanceAddr()) } -func (am *MultitenantAlertmanager) syncConfigs(cfgs map[string]alertspb.AlertConfigDescs) { - level.Debug(am.logger).Log("msg", "adding configurations", "num_configs", len(cfgs)) - for user, cfg := range cfgs { - err := am.setConfig(cfg.Mimir) +func (am *MultitenantAlertmanager) syncConfigs(cfgMap map[string]alertspb.AlertConfigDescs) { + level.Debug(am.logger).Log("msg", "adding configurations", "num_configs", len(cfgMap)) + for user, cfgs := range cfgMap { + cfg, err := am.computeConfig(cfgs) if err != nil { + am.multitenantMetrics.lastReloadSuccessful.WithLabelValues(user).Set(float64(0)) + level.Warn(am.logger).Log("msg", "error computing config", "err", err) + continue + } + + if err := am.setConfig(cfg); err != nil { am.multitenantMetrics.lastReloadSuccessful.WithLabelValues(user).Set(float64(0)) level.Warn(am.logger).Log("msg", "error applying config", "err", err) continue @@ -656,7 +662,7 @@ func (am *MultitenantAlertmanager) syncConfigs(cfgs map[string]alertspb.AlertCon am.alertmanagersMtx.Lock() for userID, userAM := range am.alertmanagers { - if _, exists := cfgs[userID]; !exists { + if _, exists := cfgMap[userID]; !exists { userAlertmanagersToStop[userID] = userAM delete(am.alertmanagers, userID) delete(am.cfgs, userID) @@ -675,6 +681,40 @@ func (am *MultitenantAlertmanager) syncConfigs(cfgs map[string]alertspb.AlertCon } } +// computeConfig takes an AlertConfigDescs struct containing Mimir and Grafana configurations. +// It returns the final configuration the Alertmanager will use. +func (am *MultitenantAlertmanager) computeConfig(cfgs alertspb.AlertConfigDescs) (alertspb.AlertConfigDesc, error) { + var cfg alertspb.AlertConfigDesc + switch { + // Mimir configuration. + case !cfgs.Grafana.Promoted: + level.Debug(am.logger).Log("msg", "grafana configuration not promoted, using mimir config", "user", cfgs.Mimir.User) + cfg = cfgs.Mimir + case cfgs.Grafana.Default: + level.Debug(am.logger).Log("msg", "grafana configuration is default, using mimir config", "user", cfgs.Mimir.User) + cfg = cfgs.Mimir + case cfgs.Grafana.RawConfig == "": + level.Debug(am.logger).Log("msg", "grafana configuration is empty, using mimir config", "user", cfgs.Mimir.User) + cfg = cfgs.Mimir + + // Grafana configuration. + case cfgs.Mimir.RawConfig == am.fallbackConfig: + level.Debug(am.logger).Log("msg", "mimir configuration is default, using grafana config", "user", cfgs.Mimir.User) + return parseGrafanaConfig(cfgs.Grafana) + case cfgs.Mimir.RawConfig == "": + level.Debug(am.logger).Log("msg", "mimir configuration is empty, using grafana config", "user", cfgs.Grafana.User) + return parseGrafanaConfig(cfgs.Grafana) + + // Both configurations. + // TODO: merge configurations. + default: + level.Warn(am.logger).Log("msg", "merging configurations not implemented, using mimir config", "user", cfgs.Mimir.User) + return cfgs.Mimir, nil + } + + return cfg, nil +} + // setConfig applies the given configuration to the alertmanager for `userID`, // creating an alertmanager if it doesn't already exist. func (am *MultitenantAlertmanager) setConfig(cfg alertspb.AlertConfigDesc) error { diff --git a/pkg/alertmanager/multitenant_test.go b/pkg/alertmanager/multitenant_test.go index a8a3005bff8..d141802af58 100644 --- a/pkg/alertmanager/multitenant_test.go +++ b/pkg/alertmanager/multitenant_test.go @@ -70,6 +70,8 @@ receivers: receivers: - name: dummy` + + grafanaConfig = `{"template_files":{},"alertmanager_config":{"route":{"receiver":"grafana-default-email","group_by":["grafana_folder","alertname"]},"templates":null,"receivers":[{"name":"grafana-default-email","grafana_managed_receiver_configs":[{"uid":"dde6ntuob69dtf","name":"WH","type":"webhook","disableResolveMessage":false,"settings":{"url":"http://localhost:8080","username":"test"},"secureSettings":{"password":"test"}}]}]}}` ) func mockAlertmanagerConfig(t *testing.T) *MultitenantAlertmanagerConfig { @@ -350,6 +352,75 @@ templates: require.True(t, cfgExists) require.Equal(t, simpleConfigTwo, currentConfig.RawConfig) + // Ensure that when a Grafana config is added, it is synced correctly + userGrafanaCfg := alertspb.GrafanaAlertConfigDesc{ + User: "user4", + RawConfig: grafanaConfig, + Hash: "test", + CreatedAtTimestamp: time.Now().Unix(), + Default: false, + Promoted: true, + } + emptyMimirConfig := alertspb.AlertConfigDesc{User: "user4"} + require.NoError(t, store.SetGrafanaAlertConfig(ctx, userGrafanaCfg)) + require.NoError(t, store.SetAlertConfig(ctx, alertspb.AlertConfigDesc{User: "user4"})) + + err = am.loadAndSyncConfigs(ctx, reasonPeriodic) + require.NoError(t, err) + require.Len(t, am.alertmanagers, 4) + + // The Mimir configuration was empty, so the Grafana configuration should be chosen for user 4. + parsed, err := parseGrafanaConfig(userGrafanaCfg) + require.NoError(t, err) + require.Equal(t, parsed, am.cfgs["user4"]) + + dirs = am.getPerUserDirectories() + user4Dir := dirs["user4"] + require.NotZero(t, user4Dir) + require.True(t, dirExists(t, user4Dir)) + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_alertmanager_config_last_reload_successful Boolean set to 1 whenever the last configuration reload attempt was successful. + # TYPE cortex_alertmanager_config_last_reload_successful gauge + cortex_alertmanager_config_last_reload_successful{user="user1"} 1 + cortex_alertmanager_config_last_reload_successful{user="user2"} 1 + cortex_alertmanager_config_last_reload_successful{user="user3"} 1 + cortex_alertmanager_config_last_reload_successful{user="user4"} 1 + `), "cortex_alertmanager_config_last_reload_successful")) + + // Ensure the config can be unpromoted. + userGrafanaCfg.Promoted = false + require.NoError(t, store.SetGrafanaAlertConfig(ctx, userGrafanaCfg)) + + err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic) + require.NoError(t, err) + require.Equal(t, emptyMimirConfig, am.cfgs["user4"]) + + // Ensure the Grafana config is used when it's promoted again. + userGrafanaCfg.Promoted = true + require.NoError(t, store.SetGrafanaAlertConfig(ctx, userGrafanaCfg)) + + err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic) + require.NoError(t, err) + require.Equal(t, parsed, am.cfgs["user4"]) + + // Ensure the Grafana config is ignored when it's marked as default. + userGrafanaCfg.Default = true + require.NoError(t, store.SetGrafanaAlertConfig(ctx, userGrafanaCfg)) + + err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic) + require.NoError(t, err) + require.Equal(t, emptyMimirConfig, am.cfgs["user4"]) + + // Ensure the Grafana config is ignored when it's empty. + userGrafanaCfg.Default = false + userGrafanaCfg.RawConfig = "" + require.NoError(t, store.SetGrafanaAlertConfig(ctx, userGrafanaCfg)) + + err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic) + require.NoError(t, err) + require.Equal(t, emptyMimirConfig, am.cfgs["user4"]) + // Test Delete User, ensure config is removed and the resources are freed. require.NoError(t, store.DeleteAlertConfig(ctx, "user3")) err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic) @@ -371,6 +442,7 @@ templates: # TYPE cortex_alertmanager_config_last_reload_successful gauge cortex_alertmanager_config_last_reload_successful{user="user1"} 1 cortex_alertmanager_config_last_reload_successful{user="user2"} 1 + cortex_alertmanager_config_last_reload_successful{user="user4"} 1 `), "cortex_alertmanager_config_last_reload_successful")) // Ensure when a 3rd config is re-added, it is synced correctly @@ -402,6 +474,7 @@ templates: cortex_alertmanager_config_last_reload_successful{user="user1"} 1 cortex_alertmanager_config_last_reload_successful{user="user2"} 1 cortex_alertmanager_config_last_reload_successful{user="user3"} 1 + cortex_alertmanager_config_last_reload_successful{user="user4"} 1 `), "cortex_alertmanager_config_last_reload_successful")) // Removed template files should be cleaned up @@ -2148,7 +2221,8 @@ func TestAlertmanager_StateReplication_InitialSyncFromPeers(t *testing.T) { // prepareInMemoryAlertStore builds and returns an in-memory alert store. func prepareInMemoryAlertStore() alertstore.AlertStore { - return bucketclient.NewBucketAlertStore(bucketclient.BucketAlertStoreConfig{}, objstore.NewInMemBucket(), nil, log.NewNopLogger()) + cfg := bucketclient.BucketAlertStoreConfig{FetchGrafanaConfig: true} + return bucketclient.NewBucketAlertStore(cfg, objstore.NewInMemBucket(), nil, log.NewNopLogger()) } func TestSafeTemplateFilepath(t *testing.T) {