diff --git a/CHANGELOG.md b/CHANGELOG.md
index c15b12c8..417967be 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added
+
+- Add sharding strategy support for mimir-backed installations, overridable per installation and per cluster.
+
 ### Fixed
 
 - Fix an issue where remote-write secret was not being created when head series query fails.
@@ -19,7 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [0.1.0] - 2024-06-06
 
-### Add
+### Added
 
 - Add support for mimir in remoteWrite secret creation.
 - Add mimir ingress secret for basic auth creation.
diff --git a/docs/monitoring/README.md b/docs/monitoring/README.md
new file mode 100644
index 00000000..7323afe8
--- /dev/null
+++ b/docs/monitoring/README.md
@@ -0,0 +1,5 @@
+# Introduction
+
+The Observability Operator is in charge of configuring the Prometheus Agent instances running in workload clusters: remote write configuration, [sharding](sharding.md), and so on.
+
+TODO(atlas): Add operator-specific documentation here (sequence diagrams, list of created and managed resources)
diff --git a/docs/monitoring/sharding.md b/docs/monitoring/sharding.md
new file mode 100644
index 00000000..4fe5fd53
--- /dev/null
+++ b/docs/monitoring/sharding.md
@@ -0,0 +1,23 @@
+# Prometheus Agent Sharding
+
+To ingest metrics without disrupting the workloads running in the clusters, the observability operator decides how many __prometheus agent shards__ run on each workload cluster. The number of shards is based on the __total number of time series__ ingested for a given cluster.
+
+__By default__, the operator configures 1 shard for every 1M time series present in Mimir for the workload cluster. To avoid scaling down too abruptly, the operator applies a 20% scale-down threshold.
+
+Both the scale-up series count and the scale-down percentage can be overridden.
+
+1. Those values can be configured at the installation level by overriding the following values:
+
+```yaml
+monitoring:
+  sharding:
+    scaleUpSeriesCount: 1000000
+    scaleDownPercentage: 0.20
+```
+
+2. Those values can also be set per cluster using the following cluster annotations:
+
+```yaml
+monitoring.giantswarm.io/prometheus-agent-scale-up-series-count: "1000000"
+monitoring.giantswarm.io/prometheus-agent-scale-down-percentage: "0.20"
+```
diff --git a/helm/observability-operator/templates/deployment.yaml b/helm/observability-operator/templates/deployment.yaml
index d21c7ab8..9b9d1e42 100644
--- a/helm/observability-operator/templates/deployment.yaml
+++ b/helm/observability-operator/templates/deployment.yaml
@@ -31,6 +31,8 @@ spec:
             - --management-cluster-pipeline={{ $.Values.managementCluster.pipeline }}
             - --management-cluster-region={{ $.Values.managementCluster.region }}
             - --monitoring-enabled={{ $.Values.monitoring.enabled }}
+            - --monitoring-sharding-scale-up-series-count={{ $.Values.monitoring.sharding.scaleUpSeriesCount }}
+            - --monitoring-sharding-scale-down-percentage={{ $.Values.monitoring.sharding.scaleDownPercentage }}
             {{- if .Values.monitoring.prometheusVersion }}
             - --prometheus-version={{ $.Values.monitoring.prometheusVersion }}
             {{- end }}
diff --git a/helm/observability-operator/values.yaml b/helm/observability-operator/values.yaml
index 2d4231e0..87e69e3f 100644
--- a/helm/observability-operator/values.yaml
+++ b/helm/observability-operator/values.yaml
@@ -19,6 +19,9 @@ monitoring:
   enabled: false
   opsgenieApiKey: ""
   prometheusVersion: ""
+  sharding:
+    scaleUpSeriesCount: 1000000
+    scaleDownPercentage: 0.20
 
 operator:
   # -- Configures the resources for the operator deployment
diff --git a/internal/controller/cluster_monitoring_controller.go b/internal/controller/cluster_monitoring_controller.go
index 60b63933..c0873033 100644
--- a/internal/controller/cluster_monitoring_controller.go
+++ b/internal/controller/cluster_monitoring_controller.go
@@ -48,8 +48,8 @@ type ClusterMonitoringReconciler struct {
 	heartbeat.HeartbeatRepository
 	// MimirService is the service for managing mimir configuration.
 	mimir.MimirService
-	// MonitoringEnabled defines whether monitoring is enabled at the installation level.
-	MonitoringEnabled bool
+	// MonitoringConfig is the configuration for the monitoring package.
+	MonitoringConfig monitoring.Config
 }
 
 // SetupWithManager sets up the controller with the Manager.
@@ -82,12 +82,12 @@ func (r *ClusterMonitoringReconciler) Reconcile(ctx context.Context, req ctrl.Re
 		return ctrl.Result{}, errors.WithStack(err)
 	}
 
-	// Linting is disabled for the 2 following lines as otherwise it fails with the following error:
+	// Linting is disabled for the following line as otherwise it fails with the following error:
 	// "should not use built-in type string as key for value"
-	logger := log.FromContext(ctx).WithValues("cluster", cluster.Name).WithValues("installation", r.ManagementCluster.Name) // nolint
+	logger := log.FromContext(ctx).WithValues("installation", r.ManagementCluster.Name) // nolint
 	ctx = log.IntoContext(ctx, logger)
 
-	if !r.MonitoringEnabled {
+	if !r.MonitoringConfig.Enabled {
 		logger.Info("Monitoring is disabled at the installation level")
 		return ctrl.Result{}, nil
 	}
diff --git a/main.go b/main.go
index 06de81aa..b46647cf 100644
--- a/main.go
+++ b/main.go
@@ -41,9 +41,11 @@ import (
 	"github.com/giantswarm/observability-operator/pkg/common"
 	"github.com/giantswarm/observability-operator/pkg/common/organization"
 	"github.com/giantswarm/observability-operator/pkg/common/password"
+	"github.com/giantswarm/observability-operator/pkg/monitoring"
 	"github.com/giantswarm/observability-operator/pkg/monitoring/heartbeat"
 	"github.com/giantswarm/observability-operator/pkg/monitoring/mimir"
 	"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent"
+	"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/sharding"
 	//+kubebuilder:scaffold:imports
 )
 
@@ -51,19 +53,23 @@ var (
 	scheme   = runtime.NewScheme()
 	setupLog = ctrl.Log.WithName("setup")
 
-	metricsAddr                 string
-	enableLeaderElection        bool
-	probeAddr                   string
-	secureMetrics               bool
-	enableHTTP2                 bool
+	metricsAddr          string
+	enableLeaderElection bool
+	probeAddr            string
+	secureMetrics        bool
+	enableHTTP2          bool
+
 	managementClusterBaseDomain string
 	managementClusterCustomer   string
 	managementClusterInsecureCA bool
 	managementClusterName       string
 	managementClusterPipeline   string
 	managementClusterRegion     string
-	monitoringEnabled           bool
-	prometheusVersion           string
+
+	monitoringEnabled                     bool
+	monitoringShardingScaleUpSeriesCount  float64
+	monitoringShardingScaleDownPercentage float64
+	prometheusVersion                     string
 )
 
 const (
@@ -103,6 +109,10 @@ func main() {
 		"The region of the management cluster.")
 	flag.BoolVar(&monitoringEnabled, "monitoring-enabled", false,
 		"Enable monitoring at the management cluster level.")
+	flag.Float64Var(&monitoringShardingScaleUpSeriesCount, "monitoring-sharding-scale-up-series-count", 0,
+		"Configures the number of time series needed to add an extra prometheus agent shard.")
+	flag.Float64Var(&monitoringShardingScaleDownPercentage, "monitoring-sharding-scale-down-percentage", 0,
+		"Configures the percentage of the scale-up series count by which usage must drop before scaling down the number of prometheus agent shards.")
 	flag.StringVar(&prometheusVersion, "prometheus-version", "",
 		"The version of Prometheus Agents to deploy.")
 	opts := zap.Options{
@@ -187,13 +197,21 @@ func main() {
 
 	organizationRepository := organization.NewNamespaceRepository(mgr.GetClient())
 
-	// TODO(atlas): validate prometheus version
+	monitoringConfig := monitoring.Config{
+		Enabled: monitoringEnabled,
+		DefaultShardingStrategy: sharding.Strategy{
+			ScaleUpSeriesCount:  monitoringShardingScaleUpSeriesCount,
+			ScaleDownPercentage: monitoringShardingScaleDownPercentage,
+		},
+		PrometheusVersion: prometheusVersion,
+	}
+
 	prometheusAgentService := prometheusagent.PrometheusAgentService{
 		Client:                 mgr.GetClient(),
 		OrganizationRepository: organizationRepository,
 		PasswordManager:        password.SimpleManager{},
 		ManagementCluster:      managementCluster,
-		PrometheusVersion:      prometheusVersion,
+		MonitoringConfig:       monitoringConfig,
 	}
 
 	mimirService := mimir.MimirService{
@@ -208,7 +226,7 @@ func main() {
 		HeartbeatRepository:    heartbeatRepository,
 		PrometheusAgentService: prometheusAgentService,
 		MimirService:           mimirService,
-		MonitoringEnabled:      monitoringEnabled,
+		MonitoringConfig:       monitoringConfig,
 	}).SetupWithManager(mgr); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "Cluster")
 		os.Exit(1)
diff --git a/pkg/monitoring/config.go b/pkg/monitoring/config.go
new file mode 100644
index 00000000..77ad1477
--- /dev/null
+++ b/pkg/monitoring/config.go
@@ -0,0 +1,11 @@
+package monitoring
+
+import "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/sharding"
+
+// Config represents the configuration used by the monitoring package.
+type Config struct {
+	Enabled                 bool
+	DefaultShardingStrategy sharding.Strategy
+	// TODO(atlas): validate prometheus version using SemVer
+	PrometheusVersion string
+}
diff --git a/pkg/monitoring/prometheusagent/config.go b/pkg/monitoring/prometheusagent/configmap.go
similarity index 73%
rename from pkg/monitoring/prometheusagent/config.go
rename to pkg/monitoring/prometheusagent/configmap.go
index f6014f4b..9e577a72 100644
--- a/pkg/monitoring/prometheusagent/config.go
+++ b/pkg/monitoring/prometheusagent/configmap.go
@@ -3,6 +3,7 @@ package prometheusagent
 import (
 	"context"
 	"fmt"
+	"strconv"
 
 	"github.com/go-logr/logr"
 	"github.com/pkg/errors"
@@ -14,7 +15,7 @@ import (
 	"github.com/giantswarm/observability-operator/pkg/common"
 	"github.com/giantswarm/observability-operator/pkg/metrics"
 	"github.com/giantswarm/observability-operator/pkg/monitoring/mimir/querier"
-	"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/shards"
+	"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/sharding"
 )
 
 func (pas PrometheusAgentService) buildRemoteWriteConfig(ctx context.Context,
@@ -50,16 +51,23 @@ func (pas PrometheusAgentService) buildRemoteWriteConfig(ctx context.Context,
 		logger.Error(err, "failed to query head series")
 		metrics.MimirQueryErrors.WithLabelValues().Inc()
 	}
-	shards := shards.ComputeShards(currentShards, headSeries)
+
+	clusterShardingStrategy, err := getClusterShardingStrategy(cluster)
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	shardingStrategy := pas.MonitoringConfig.DefaultShardingStrategy.Merge(clusterShardingStrategy)
+	shards := shardingStrategy.ComputeShards(currentShards, headSeries)
 
 	config, err := yaml.Marshal(RemoteWriteConfig{
 		PrometheusAgentConfig: &PrometheusAgentConfig{
 			ExternalLabels: externalLabels,
 			Image: &PrometheusAgentImage{
-				Tag: pas.PrometheusVersion,
+				Tag: pas.MonitoringConfig.PrometheusVersion,
 			},
 			Shards:  shards,
-			Version: pas.PrometheusVersion,
+			Version: pas.MonitoringConfig.PrometheusVersion,
 		},
 	})
 	if err != nil {
@@ -94,6 +102,25 @@ func getServicePriority(cluster *clusterv1.Cluster) string {
 	return defaultServicePriority
 }
 
+func getClusterShardingStrategy(cluster metav1.Object) (*sharding.Strategy, error) {
+	var err error
+	var scaleUpSeriesCount, scaleDownPercentage float64
+	if value, ok := cluster.GetAnnotations()["monitoring.giantswarm.io/prometheus-agent-scale-up-series-count"]; ok {
+		if scaleUpSeriesCount, err = strconv.ParseFloat(value, 64); err != nil {
+			return nil, err
+		}
+	}
+	if value, ok := cluster.GetAnnotations()["monitoring.giantswarm.io/prometheus-agent-scale-down-percentage"]; ok {
+		if scaleDownPercentage, err = strconv.ParseFloat(value, 64); err != nil {
+			return nil, err
+		}
+	}
+	return &sharding.Strategy{
+		ScaleUpSeriesCount:  scaleUpSeriesCount,
+		ScaleDownPercentage: scaleDownPercentage,
+	}, nil
+}
+
 func readCurrentShardsFromConfig(configMap corev1.ConfigMap) (int, error) {
 	remoteWriteConfig := RemoteWriteConfig{}
 	err := yaml.Unmarshal([]byte(configMap.Data["values"]), &remoteWriteConfig)
diff --git a/pkg/monitoring/prometheusagent/service.go b/pkg/monitoring/prometheusagent/service.go
index 523b602a..f49bcc21 100644
--- a/pkg/monitoring/prometheusagent/service.go
+++ b/pkg/monitoring/prometheusagent/service.go
@@ -15,6 +15,7 @@ import (
 	"github.com/giantswarm/observability-operator/pkg/common"
 	"github.com/giantswarm/observability-operator/pkg/common/organization"
 	"github.com/giantswarm/observability-operator/pkg/common/password"
+	"github.com/giantswarm/observability-operator/pkg/monitoring"
 )
 
 type PrometheusAgentService struct {
@@ -22,14 +23,14 @@ type PrometheusAgentService struct {
 	organization.OrganizationRepository
 	PasswordManager password.Manager
 	common.ManagementCluster
-	PrometheusVersion string
+	MonitoringConfig monitoring.Config
 }
 
 // ReconcileRemoteWriteConfiguration ensures that the prometheus remote write config is present in the cluster.
 func (pas *PrometheusAgentService) ReconcileRemoteWriteConfiguration(
 	ctx context.Context, cluster *clusterv1.Cluster) error {
 
-	logger := log.FromContext(ctx).WithValues("cluster", cluster.Name)
+	logger := log.FromContext(ctx)
 	logger.Info("ensuring prometheus agent remote write configmap and secret")
 
 	err := pas.createOrUpdateConfigMap(ctx, cluster, logger)
@@ -140,7 +141,7 @@ func (pas PrometheusAgentService) createOrUpdateSecret(ctx context.Context,
 func (pas *PrometheusAgentService) DeleteRemoteWriteConfiguration(
 	ctx context.Context, cluster *clusterv1.Cluster) error {
 
-	logger := log.FromContext(ctx).WithValues("cluster", cluster.Name)
+	logger := log.FromContext(ctx)
 	logger.Info("deleting prometheus agent remote write configmap and secret")
 
 	err := pas.deleteConfigMap(ctx, cluster)
diff --git a/pkg/monitoring/prometheusagent/sharding/sharding.go b/pkg/monitoring/prometheusagent/sharding/sharding.go
new file mode 100644
index 00000000..9b0eae3a
--- /dev/null
+++ b/pkg/monitoring/prometheusagent/sharding/sharding.go
@@ -0,0 +1,46 @@
+package sharding
+
+import "math"
+
+type Strategy struct {
+	// ScaleUpSeriesCount is the number of time series one shard is expected to handle: the desired shard count is ceil(number of series / ScaleUpSeriesCount).
+	ScaleUpSeriesCount float64
+	// ScaleDownPercentage is the fraction of ScaleUpSeriesCount by which usage must drop below a shard boundary before agents are scaled down.
+	ScaleDownPercentage float64
+}
+
+func (s Strategy) Merge(newStrategy *Strategy) Strategy {
+	strategy := Strategy{
+		s.ScaleUpSeriesCount,
+		s.ScaleDownPercentage,
+	}
+	if newStrategy != nil {
+		if newStrategy.ScaleUpSeriesCount > 0 {
+			strategy.ScaleUpSeriesCount = newStrategy.ScaleUpSeriesCount
+		}
+		if newStrategy.ScaleDownPercentage > 0 {
+			strategy.ScaleDownPercentage = newStrategy.ScaleDownPercentage
+		}
+	}
+	return strategy
+}
+
+// ComputeShards returns the desired number of prometheus-agent shards: one shard per ScaleUpSeriesCount time series, with scale down held back by the ScaleDownPercentage buffer (one shard per 1M series and a 20% buffer with the chart defaults).
+func (s Strategy) ComputeShards(currentShardCount int, timeSeries float64) int {
+	shardScaleDownThreshold := s.ScaleDownPercentage * s.ScaleUpSeriesCount
+	desiredShardCount := int(math.Ceil(timeSeries / s.ScaleUpSeriesCount))
+
+	// Compute Scale Down
+	if currentShardCount > desiredShardCount {
+		// Keep the current shard count while the remainder of (timeSeries mod ScaleUpSeriesCount) stays above ScaleUpSeriesCount minus the scale-down threshold.
+		if math.Mod(timeSeries, s.ScaleUpSeriesCount) > s.ScaleUpSeriesCount-shardScaleDownThreshold {
+			desiredShardCount = currentShardCount
+		}
+	}
+
+	// We always have a minimum of 1 agent, even if there is no worker node
+	if desiredShardCount <= 0 {
+		return 1
+	}
+	return desiredShardCount
+}
diff --git a/pkg/monitoring/prometheusagent/sharding/sharding_test.go b/pkg/monitoring/prometheusagent/sharding/sharding_test.go
new file mode 100644
index 00000000..007c6cda
--- /dev/null
+++ b/pkg/monitoring/prometheusagent/sharding/sharding_test.go
@@ -0,0 +1,102 @@
+package sharding
+
+import (
+	"testing"
+)
+
+type testCase struct {
+	currentShardCount int
+	timeSeries        float64
+	expected          int
+}
+
+var defaultShardingStrategy = Strategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)}
+
+var tests = []struct {
+	name     string
+	strategy Strategy
+	cases    []testCase
+}{
+	{
+		name:     "scale up",
+		strategy: defaultShardingStrategy,
+		cases: []testCase{
+			{
+				currentShardCount: 0,
+				timeSeries:        float64(1_000_000),
+				expected:          1,
+			},
+			{
+				currentShardCount: 0,
+				timeSeries:        float64(1_000_001),
+				expected:          2,
+			},
+			{
+				currentShardCount: 0,
+				timeSeries:        float64(2_000_001),
+				expected:          3,
+			},
+		},
+	},
+	{
+		name:     "scale down",
+		strategy: defaultShardingStrategy,
+		cases: []testCase{
+			{
+				currentShardCount: 1,
+				timeSeries:        float64(1_000_001),
+				expected:          2,
+			},
+			{
+				currentShardCount: 2,
+				timeSeries:        float64(999_999),
+				expected:          2,
+			},
+			{
+				currentShardCount: 2,
+				timeSeries:        float64(800_001),
+				expected:          2,
+			},
+			{
+				currentShardCount: 2,
+				// 20% default threshold hit
+				timeSeries: float64(800_000),
+				expected:   1,
+			},
+		},
+	},
+	{
+		name:     "always defaults to 1",
+		strategy: defaultShardingStrategy,
+		cases: []testCase{
+			{
+				currentShardCount: 0,
+				timeSeries:        float64(0),
+				expected:          1,
+			},
+			{
+				currentShardCount: 0,
+				timeSeries:        float64(-5),
+				expected:          1,
+			},
+		},
+	},
+}
+
+func TestShardComputationLogic(t *testing.T) {
+	t.Parallel()
+	for _, tt := range tests {
+		tt := tt
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+			for _, c := range tt.cases {
+				c := c
+				result := tt.strategy.ComputeShards(c.currentShardCount, c.timeSeries)
+				if result != c.expected {
+					t.Errorf(`expected ComputeShards(%d, %f) to be %d, got %d`, c.currentShardCount, c.timeSeries, c.expected, result)
+				}
+				t.Logf(`ComputeShards(%d, %f) = %d`, c.currentShardCount, c.timeSeries, result)
+			}
+		})
+	}
+}
diff --git a/pkg/monitoring/prometheusagent/shards/shards.go b/pkg/monitoring/prometheusagent/shards/shards.go
deleted file mode 100644
index 25394ff4..00000000
--- a/pkg/monitoring/prometheusagent/shards/shards.go
+++ /dev/null
@@ -1,28 +0,0 @@
-package shards
-
-import "math"
-
-const (
-	shardStep                = float64(1_000_000)
-	shardScaleDownPercentage = float64(0.20)
-	shardScaleDownThreshold  = shardScaleDownPercentage * shardStep
-)
-
-// We want to start with 1 prometheus-agent for each 1M time series with a scale down 25% threshold.
-func ComputeShards(currentShardCount int, timeSeries float64) int {
-	desiredShardCount := int(math.Ceil(timeSeries / shardStep))
-
-	// Compute Scale Down
-	if currentShardCount > desiredShardCount {
-		// We get the rest of a division of timeSeries by shardStep and we compare it with the scale down threshold
-		if math.Mod(timeSeries, shardStep) > shardStep-shardScaleDownThreshold {
-			desiredShardCount = currentShardCount
-		}
-	}
-
-	// We always have a minimum of 1 agent, even if there is no worker node
-	if desiredShardCount <= 0 {
-		return 1
-	}
-	return desiredShardCount
-}
diff --git a/pkg/monitoring/prometheusagent/shards/shards_test.go b/pkg/monitoring/prometheusagent/shards/shards_test.go
deleted file mode 100644
index 3043a5ec..00000000
--- a/pkg/monitoring/prometheusagent/shards/shards_test.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package shards
-
-import (
-	"flag"
-	"testing"
-)
-
-var _ = flag.Bool("update", false, "update the output file")
-
-func TestShardComputationScaleUp(t *testing.T) {
-	expected := 1
-	result := ComputeShards(0, float64(1_000_000))
-	if result != expected {
-		t.Errorf(`expected ComputeShards(0, 1_000_000) to be %d, got %d`, expected, result)
-	}
-
-	expected = 2
-	result = ComputeShards(0, float64(1_000_001))
-	if result != expected {
-		t.Errorf(`expected ComputeShards(0, 1_000_001) to be %d, got %d`, expected, result)
-	}
-
-	expected = 3
-	result = ComputeShards(0, float64(2_000_001))
-	if result != expected {
-		t.Errorf(`expected ComputeShards(0, 2_000_001) to be %d, got %d`, expected, result)
-	}
-}
-
-func TestShardComputationReturnsAtLeast1Shart(t *testing.T) {
-	expected := 1
-	result := ComputeShards(0, 0)
-	if result != expected {
-		t.Errorf(`expected ComputeShards(0, 0) to be %d, got %d`, expected, result)
-	}
-
-	expected = 1
-	result = ComputeShards(0, -5)
-	if result != expected {
-		t.Errorf(`expected ComputeShards(0, -5) to be %d, got %d`, expected, result)
-	}
-}
-
-func TestShardComputationScaleDown(t *testing.T) {
-	expected := 2
-	result := ComputeShards(1, 1_000_001)
-	if result != expected {
-		t.Errorf(`expected ComputeShards(1, 1_000_001) to be %d, got %d`, expected, result)
-	}
-
-	expected = 2
-	result = ComputeShards(2, 999_999)
-	if result != expected {
-		t.Errorf(`expected ComputeShards(2, 999_999) to be %d, got %d`, expected, result)
-	}
-
-	expected = 2
-	result = ComputeShards(2, 800_001)
-	if result != expected {
-		t.Errorf(`expected ComputeShards(2, 800_001) to be %d, got %d`, expected, result)
-	}
-
-	// threshold hit
-	expected = 1
-	result = ComputeShards(2, 800_000)
-	if result != expected {
-		t.Errorf(`expected ComputeShards(2, 800_000) to be %d, got %d`, expected, result)
-	}
-}
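Below the patch, a minimal sketch (not part of the change) of how the new `sharding.Strategy` behaves with the chart defaults of 1M series per shard and a 20% scale-down buffer. The import path is the package added above; the `main` wrapper and the sample series counts are illustrative assumptions, and the expected results mirror the unit tests in the diff.

```go
package main

import (
	"fmt"

	// Package introduced by this change.
	"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/sharding"
)

func main() {
	// Installation-level defaults, as shipped in values.yaml.
	defaults := sharding.Strategy{ScaleUpSeriesCount: 1_000_000, ScaleDownPercentage: 0.20}

	// A per-cluster override only replaces the fields it sets (> 0); the rest keep the defaults.
	perCluster := &sharding.Strategy{ScaleUpSeriesCount: 2_000_000}
	merged := defaults.Merge(perCluster)
	fmt.Printf("merged: %+v\n", merged) // {ScaleUpSeriesCount:2e+06 ScaleDownPercentage:0.2}

	// Scale up: 1_000_001 series no longer fit in one shard, so a second one is requested.
	fmt.Println(defaults.ComputeShards(1, 1_000_001)) // 2

	// Scale-down hysteresis: at 900_000 series the remainder is still above
	// 800_000 (= 1M minus the 20% buffer), so the two existing shards are kept...
	fmt.Println(defaults.ComputeShards(2, 900_000)) // 2

	// ...and only at 800_000 series or fewer does the shard count drop back to 1.
	fmt.Println(defaults.ComputeShards(2, 800_000)) // 1
}
```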