Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use grafana configuration in the Alertmanager #8066

Merged
merged 24 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
520bc9a
Add promoted bool field to Grafana state and configuration
santihernandezc May 3, 2024
af5c9f9
fix tests
santihernandezc May 6, 2024
8112ab8
add function to compute final configuration from Mimir and Grafana co…
santihernandezc May 6, 2024
24bd0f3
parse grafana configuration into an AlertConfigDesc
santihernandezc May 7, 2024
8ecff41
apply grafana configuration to the alertmanager ignoring grafana rece…
santihernandezc May 7, 2024
38a3fb3
include grafana receivers
santihernandezc May 8, 2024
f60ee36
implement webhook sender, send notifications
santihernandezc May 8, 2024
7079756
Merge branch 'main' of https://github.com/grafana/mimir into santiher…
santihernandezc May 10, 2024
0359ebe
Merge branch 'main' of https://github.com/grafana/mimir into santiher…
santihernandezc May 10, 2024
79fcbdd
add version and orgID to BuildReceiverIntegrations(), fix whSenderFn()
santihernandezc May 20, 2024
effcdc4
make tests compile
santihernandezc May 20, 2024
b3aa755
add logger implementation and logger factory
santihernandezc May 20, 2024
8187b69
Merge branch 'main' of https://github.com/grafana/mimir into santiher…
santihernandezc May 23, 2024
46a5f94
Merge branch 'main' of https://github.com/grafana/mimir into santiher…
santihernandezc May 27, 2024
9aadec6
use NoopDecrypt from alerting package, delete getActiveReceiversMap
santihernandezc May 27, 2024
6555447
make sender work with the go-kit logger, use the sender, use logger f…
santihernandezc May 27, 2024
06e1253
go back to using pointers for integrations, update alerting package a…
santihernandezc May 27, 2024
55c2817
Merge branch 'main' of https://github.com/grafana/mimir into santiher…
santihernandezc May 28, 2024
2b83927
Merge branch 'main' of https://github.com/grafana/mimir into santiher…
santihernandezc Jun 3, 2024
3697724
update AM fork
santihernandezc Jun 3, 2024
fd33a39
Merge branch 'main' of https://github.com/grafana/mimir into santiher…
santihernandezc Jun 6, 2024
24501d1
pass 1 as orgID when creating integrations
santihernandezc Jun 6, 2024
1492f98
comments, tests
santihernandezc Jun 6, 2024
25202d3
address code review comments, refactor
santihernandezc Jun 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ require (
github.com/pires/go-proxyproto v0.7.0 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
gopkg.in/telebot.v3 v3.2.1 // indirect
k8s.io/apimachinery v0.29.3 // indirect
k8s.io/client-go v0.29.3 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
Expand Down Expand Up @@ -250,7 +251,6 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240429193739-8cf5692501f6
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/telebot.v3 v3.2.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
Expand Down
59 changes: 54 additions & 5 deletions pkg/alertmanager/alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/alerting/definition"
"github.com/grafana/alerting/images"
alertingNotify "github.com/grafana/alerting/notify"
alertingReceivers "github.com/grafana/alerting/receivers"
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"
"github.com/prometheus/alertmanager/api"
Expand Down Expand Up @@ -60,6 +63,7 @@ import (

"github.com/grafana/mimir/pkg/alertmanager/alertstore"
util_net "github.com/grafana/mimir/pkg/util/net"
"github.com/grafana/mimir/pkg/util/version"
)

const (
Expand Down Expand Up @@ -362,7 +366,7 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *definition.PostableApiA
// Create a firewall binded to the per-tenant config.
firewallDialer := util_net.NewFirewallDialer(newFirewallDialerConfigProvider(userID, am.cfg.Limits))

integrationsMap, err := buildIntegrationsMap(cfg.Receivers, tmpl, firewallDialer, am.logger, func(integrationName string, notifier notify.Notifier) notify.Notifier {
integrationsMap, err := buildIntegrationsMap(conf.Receivers, tmpl, firewallDialer, am.logger, func(integrationName string, notifier notify.Notifier) notify.Notifier {
if am.cfg.Limits != nil {
rl := &tenantRateLimits{
tenant: userID,
Expand Down Expand Up @@ -455,20 +459,65 @@ func (am *Alertmanager) getFullState() (*clusterpb.FullState, error) {
return am.state.GetFullState()
}

// buildIntegrationsMap builds a map of name to the list of integration notifiers off of a
// list of receiver config.
func buildIntegrationsMap(nc []config.Receiver, tmpl *template.Template, firewallDialer *util_net.FirewallDialer, logger log.Logger, notifierWrapper func(string, notify.Notifier) notify.Notifier) (map[string][]*notify.Integration, error) {
// buildIntegrationsMap builds a map of name to the list of integration notifiers off of a list of receiver config.
func buildIntegrationsMap(nc []*definition.PostableApiReceiver, tmpl *template.Template, firewallDialer *util_net.FirewallDialer, logger log.Logger, notifierWrapper func(string, notify.Notifier) notify.Notifier) (map[string][]*notify.Integration, error) {
integrationsMap := make(map[string][]*notify.Integration, len(nc))
for _, rcv := range nc {
integrations, err := buildReceiverIntegrations(rcv, tmpl, firewallDialer, logger, notifierWrapper)
var integrations []*notify.Integration
var err error
if rcv.Type() == definition.GrafanaReceiverType {
integrations, err = buildGrafanaReceiverIntegrations(rcv, tmpl, logger)
} else {
integrations, err = buildReceiverIntegrations(rcv.Receiver, tmpl, firewallDialer, logger, notifierWrapper)
}
if err != nil {
return nil, err
}

integrationsMap[rcv.Name] = integrations
}

return integrationsMap, nil
}

func buildGrafanaReceiverIntegrations(rcv *definition.PostableApiReceiver, tmpl *template.Template, logger log.Logger) ([]*notify.Integration, error) {
loggerFactory := newLoggerFactory(logger)
whFn := func(n alertingReceivers.Metadata) (alertingReceivers.WebhookSender, error) {
return NewSender(logger), nil
}
emailFn := func(n alertingReceivers.Metadata) (alertingReceivers.EmailSender, error) {
return NewSender(logger), nil
}

// The decrypt functions and the context are used to decrypt the configuration.
// We don't need to decrypt anything, so we can pass a no-op decrypt func and a context.Background().
rCfg, err := alertingNotify.BuildReceiverConfiguration(context.Background(), definition.PostableAPIReceiverToAPIReceiver(rcv), alertingNotify.NoopDecrypt)
if err != nil {
return nil, err
}

integrations, err := alertingNotify.BuildReceiverIntegrations(
rCfg,
tmpl,
&images.UnavailableProvider{}, // TODO: include images in notifications
loggerFactory,
whFn,
emailFn,
1, // orgID is always 1.
version.Version,
)
if err != nil {
return nil, err
}

// TODO: use the nfstatus.Integration wrapper for all integrations.
upstreamIntegrations := make([]*notify.Integration, 0, len(integrations))
for _, integration := range integrations {
upstreamIntegrations = append(upstreamIntegrations, integration.Integration())
}
return upstreamIntegrations, nil
}

// buildReceiverIntegrations builds a list of integration notifiers off of a
// receiver config.
// Taken from https://github.com/prometheus/alertmanager/blob/94d875f1227b29abece661db1a68c001122d1da5/cmd/alertmanager/main.go#L112-L159.
Expand Down
2 changes: 1 addition & 1 deletion pkg/alertmanager/alertspb/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type AlertConfigDescs struct {

// ToProto transforms a yaml Alertmanager config and map of template files to an AlertConfigDesc.
func ToProto(cfg string, templates map[string]string, user string) AlertConfigDesc {
tmpls := []*TemplateDesc{}
tmpls := make([]*TemplateDesc, 0, len(templates))
for fn, body := range templates {
tmpls = append(tmpls, &TemplateDesc{
Body: body,
Expand Down
28 changes: 28 additions & 0 deletions pkg/alertmanager/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/alertmanager/distributor.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Cortex Authors.

package alertmanager

import (
"encoding/json"
"fmt"

"github.com/grafana/mimir/pkg/alertmanager/alertspb"
)

// parseGrafanaConfig creates an AlertConfigDesc from a GrafanaAlertConfigDesc.
func parseGrafanaConfig(cfg alertspb.GrafanaAlertConfigDesc) (alertspb.AlertConfigDesc, error) {
var amCfg GrafanaAlertmanagerConfig
if err := json.Unmarshal([]byte(cfg.RawConfig), &amCfg); err != nil {
return alertspb.AlertConfigDesc{}, fmt.Errorf("failed to unmarshal Grafana Alertmanager configuration %w", err)
}

rawCfg, err := json.Marshal(amCfg.AlertmanagerConfig)
if err != nil {
return alertspb.AlertConfigDesc{}, fmt.Errorf("failed to marshal Grafana Alertmanager configuration %w", err)
}

return alertspb.ToProto(string(rawCfg), amCfg.Templates, cfg.User), nil
}
2 changes: 0 additions & 2 deletions pkg/alertmanager/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ type alertingLogger struct {
}

// newLoggerFactory returns a function that implements the alertingLogging.LoggerFactory interface.
//
//lint:ignore U1000 Ignore unused functions for now, they will be used to create the Grafana notifiers.
func newLoggerFactory(logger log.Logger) alertingLogging.LoggerFactory {
return func(loggerName string, ctx ...any) alertingLogging.Logger {
keyvals := append([]any{"logger", loggerName}, ctx...)
Expand Down
52 changes: 46 additions & 6 deletions pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, store alerts
return createMultitenantAlertmanager(cfg, fallbackConfig, store, ringStore, limits, features, logger, registerer)
}

// ComputeFallbackConfig will load, vaildate and return the provided fallbackConfigFile
// ComputeFallbackConfig will load, validate and return the provided fallbackConfigFile
// or return an valid empty default configuration if none is provided.
func ComputeFallbackConfig(fallbackConfigFile string) ([]byte, error) {
if fallbackConfigFile != "" {
Expand Down Expand Up @@ -638,11 +638,17 @@ func (am *MultitenantAlertmanager) isUserOwned(userID string) bool {
return alertmanagers.Includes(am.ringLifecycler.GetInstanceAddr())
}

func (am *MultitenantAlertmanager) syncConfigs(cfgs map[string]alertspb.AlertConfigDescs) {
level.Debug(am.logger).Log("msg", "adding configurations", "num_configs", len(cfgs))
for user, cfg := range cfgs {
err := am.setConfig(cfg.Mimir)
func (am *MultitenantAlertmanager) syncConfigs(cfgMap map[string]alertspb.AlertConfigDescs) {
level.Debug(am.logger).Log("msg", "adding configurations", "num_configs", len(cfgMap))
for user, cfgs := range cfgMap {
cfg, err := am.computeConfig(cfgs)
if err != nil {
am.multitenantMetrics.lastReloadSuccessful.WithLabelValues(user).Set(float64(0))
level.Warn(am.logger).Log("msg", "error computing config", "err", err)
continue
}

if err := am.setConfig(cfg); err != nil {
am.multitenantMetrics.lastReloadSuccessful.WithLabelValues(user).Set(float64(0))
level.Warn(am.logger).Log("msg", "error applying config", "err", err)
continue
Expand All @@ -656,7 +662,7 @@ func (am *MultitenantAlertmanager) syncConfigs(cfgs map[string]alertspb.AlertCon

am.alertmanagersMtx.Lock()
for userID, userAM := range am.alertmanagers {
if _, exists := cfgs[userID]; !exists {
if _, exists := cfgMap[userID]; !exists {
userAlertmanagersToStop[userID] = userAM
delete(am.alertmanagers, userID)
delete(am.cfgs, userID)
Expand All @@ -675,6 +681,40 @@ func (am *MultitenantAlertmanager) syncConfigs(cfgs map[string]alertspb.AlertCon
}
}

// computeConfig takes an AlertConfigDescs struct containing Mimir and Grafana configurations.
// It returns the final configuration the Alertmanager will use.
func (am *MultitenantAlertmanager) computeConfig(cfgs alertspb.AlertConfigDescs) (alertspb.AlertConfigDesc, error) {
var cfg alertspb.AlertConfigDesc
switch {
// Mimir configuration.
case !cfgs.Grafana.Promoted:
level.Debug(am.logger).Log("msg", "grafana configuration not promoted, using mimir config", "user", cfgs.Mimir.User)
cfg = cfgs.Mimir
case cfgs.Grafana.Default:
level.Debug(am.logger).Log("msg", "grafana configuration is default, using mimir config", "user", cfgs.Mimir.User)
cfg = cfgs.Mimir
case cfgs.Grafana.RawConfig == "":
level.Debug(am.logger).Log("msg", "grafana configuration is empty, using mimir config", "user", cfgs.Mimir.User)
cfg = cfgs.Mimir

// Grafana configuration.
case cfgs.Mimir.RawConfig == am.fallbackConfig:
level.Debug(am.logger).Log("msg", "mimir configuration is default, using grafana config", "user", cfgs.Mimir.User)
return parseGrafanaConfig(cfgs.Grafana)
case cfgs.Mimir.RawConfig == "":
level.Debug(am.logger).Log("msg", "mimir configuration is empty, using grafana config", "user", cfgs.Grafana.User)
return parseGrafanaConfig(cfgs.Grafana)

// Both configurations.
// TODO: merge configurations.
default:
level.Warn(am.logger).Log("msg", "merging configurations not implemented, using mimir config", "user", cfgs.Mimir.User)
return cfgs.Mimir, nil
}

return cfg, nil
}

// setConfig applies the given configuration to the alertmanager for `userID`,
// creating an alertmanager if it doesn't already exist.
func (am *MultitenantAlertmanager) setConfig(cfg alertspb.AlertConfigDesc) error {
Expand Down
76 changes: 75 additions & 1 deletion pkg/alertmanager/multitenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ receivers:

receivers:
- name: dummy`

grafanaConfig = `{"template_files":{},"alertmanager_config":{"route":{"receiver":"grafana-default-email","group_by":["grafana_folder","alertname"]},"templates":null,"receivers":[{"name":"grafana-default-email","grafana_managed_receiver_configs":[{"uid":"dde6ntuob69dtf","name":"WH","type":"webhook","disableResolveMessage":false,"settings":{"url":"http://localhost:8080","username":"test"},"secureSettings":{"password":"test"}}]}]}}`
)

func mockAlertmanagerConfig(t *testing.T) *MultitenantAlertmanagerConfig {
Expand Down Expand Up @@ -350,6 +352,75 @@ templates:
require.True(t, cfgExists)
require.Equal(t, simpleConfigTwo, currentConfig.RawConfig)

// Ensure that when a Grafana config is added, it is synced correctly
userGrafanaCfg := alertspb.GrafanaAlertConfigDesc{
User: "user4",
RawConfig: grafanaConfig,
Hash: "test",
CreatedAtTimestamp: time.Now().Unix(),
Default: false,
Promoted: true,
}
emptyMimirConfig := alertspb.AlertConfigDesc{User: "user4"}
require.NoError(t, store.SetGrafanaAlertConfig(ctx, userGrafanaCfg))
require.NoError(t, store.SetAlertConfig(ctx, alertspb.AlertConfigDesc{User: "user4"}))

err = am.loadAndSyncConfigs(ctx, reasonPeriodic)
require.NoError(t, err)
require.Len(t, am.alertmanagers, 4)

// The Mimir configuration was empty, so the Grafana configuration should be chosen for user 4.
parsed, err := parseGrafanaConfig(userGrafanaCfg)
require.NoError(t, err)
require.Equal(t, parsed, am.cfgs["user4"])

dirs = am.getPerUserDirectories()
user4Dir := dirs["user4"]
require.NotZero(t, user4Dir)
require.True(t, dirExists(t, user4Dir))

require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_alertmanager_config_last_reload_successful Boolean set to 1 whenever the last configuration reload attempt was successful.
# TYPE cortex_alertmanager_config_last_reload_successful gauge
cortex_alertmanager_config_last_reload_successful{user="user1"} 1
cortex_alertmanager_config_last_reload_successful{user="user2"} 1
cortex_alertmanager_config_last_reload_successful{user="user3"} 1
cortex_alertmanager_config_last_reload_successful{user="user4"} 1
`), "cortex_alertmanager_config_last_reload_successful"))

// Ensure the config can be unpromoted.
userGrafanaCfg.Promoted = false
require.NoError(t, store.SetGrafanaAlertConfig(ctx, userGrafanaCfg))

err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic)
require.NoError(t, err)
require.Equal(t, emptyMimirConfig, am.cfgs["user4"])

// Ensure the Grafana config is used when it's promoted again.
userGrafanaCfg.Promoted = true
require.NoError(t, store.SetGrafanaAlertConfig(ctx, userGrafanaCfg))

err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic)
require.NoError(t, err)
require.Equal(t, parsed, am.cfgs["user4"])

// Ensure the Grafana config is ignored when it's marked as default.
userGrafanaCfg.Default = true
require.NoError(t, store.SetGrafanaAlertConfig(ctx, userGrafanaCfg))

err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic)
require.NoError(t, err)
require.Equal(t, emptyMimirConfig, am.cfgs["user4"])

// Ensure the Grafana config is ignored when it's empty.
userGrafanaCfg.Default = false
userGrafanaCfg.RawConfig = ""
require.NoError(t, store.SetGrafanaAlertConfig(ctx, userGrafanaCfg))

err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic)
require.NoError(t, err)
require.Equal(t, emptyMimirConfig, am.cfgs["user4"])

// Test Delete User, ensure config is removed and the resources are freed.
require.NoError(t, store.DeleteAlertConfig(ctx, "user3"))
err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic)
Expand All @@ -371,6 +442,7 @@ templates:
# TYPE cortex_alertmanager_config_last_reload_successful gauge
cortex_alertmanager_config_last_reload_successful{user="user1"} 1
cortex_alertmanager_config_last_reload_successful{user="user2"} 1
cortex_alertmanager_config_last_reload_successful{user="user4"} 1
`), "cortex_alertmanager_config_last_reload_successful"))

// Ensure when a 3rd config is re-added, it is synced correctly
Expand Down Expand Up @@ -402,6 +474,7 @@ templates:
cortex_alertmanager_config_last_reload_successful{user="user1"} 1
cortex_alertmanager_config_last_reload_successful{user="user2"} 1
cortex_alertmanager_config_last_reload_successful{user="user3"} 1
cortex_alertmanager_config_last_reload_successful{user="user4"} 1
`), "cortex_alertmanager_config_last_reload_successful"))

// Removed template files should be cleaned up
Expand Down Expand Up @@ -2148,7 +2221,8 @@ func TestAlertmanager_StateReplication_InitialSyncFromPeers(t *testing.T) {

// prepareInMemoryAlertStore builds and returns an in-memory alert store.
func prepareInMemoryAlertStore() alertstore.AlertStore {
return bucketclient.NewBucketAlertStore(bucketclient.BucketAlertStoreConfig{}, objstore.NewInMemBucket(), nil, log.NewNopLogger())
cfg := bucketclient.BucketAlertStoreConfig{FetchGrafanaConfig: true}
return bucketclient.NewBucketAlertStore(cfg, objstore.NewInMemBucket(), nil, log.NewNopLogger())
}

func TestSafeTemplateFilepath(t *testing.T) {
Expand Down
Loading