From d5423e4e79a29317021fa95821a856a4cc6f9869 Mon Sep 17 00:00:00 2001 From: QuentinBisson Date: Mon, 13 May 2024 14:16:43 +0200 Subject: [PATCH 1/9] Add sharding strategy support Signed-off-by: QuentinBisson --- CHANGELOG.md | 6 +- docs/monitoring/README.md | 5 ++ docs/monitoring/sharding.md | 24 +++++++ .../templates/deployment.yaml | 2 + helm/observability-operator/values.yaml | 3 + .../cluster_monitoring_controller.go | 8 +-- main.go | 35 ++++++--- pkg/monitoring/config.go | 10 +++ .../{config.go => configmap.go} | 39 ++++++++-- pkg/monitoring/prometheusagent/service.go | 7 +- .../prometheusagent/sharding/sharding.go | 46 ++++++++++++ .../prometheusagent/sharding/sharding_test.go | 71 +++++++++++++++++++ .../prometheusagent/shards/shards.go | 28 -------- .../prometheusagent/shards/shards_test.go | 69 ------------------ 14 files changed, 234 insertions(+), 119 deletions(-) create mode 100644 docs/monitoring/README.md create mode 100644 docs/monitoring/sharding.md create mode 100644 pkg/monitoring/config.go rename pkg/monitoring/prometheusagent/{config.go => configmap.go} (71%) create mode 100644 pkg/monitoring/prometheusagent/sharding/sharding.go create mode 100644 pkg/monitoring/prometheusagent/sharding/sharding_test.go delete mode 100644 pkg/monitoring/prometheusagent/shards/shards.go delete mode 100644 pkg/monitoring/prometheusagent/shards/shards_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index c15b12c8..417967be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add per cluster and installation overridable sharding strategy support for mimir-backed installations. + ### Fixed - Fix an issue where remote-write secret was not being created when head series query fails. 
@@ -19,7 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [0.1.0] - 2024-06-06 -### Add +### Added - Add support for mimir in remoteWrite secret creation. - Add mimir ingress secret for basic auth creation. diff --git a/docs/monitoring/README.md b/docs/monitoring/README.md new file mode 100644 index 00000000..fab5de0a --- /dev/null +++ b/docs/monitoring/README.md @@ -0,0 +1,5 @@ +# Introduction + +The Observability Operator is in charge of configuring the Prometheus Agent instances running in workload clusters like remote write configuration, [sharding](sharding.md) and so on. + +TODO(atlas): Add documentation here. diff --git a/docs/monitoring/sharding.md b/docs/monitoring/sharding.md new file mode 100644 index 00000000..1aafa386 --- /dev/null +++ b/docs/monitoring/sharding.md @@ -0,0 +1,24 @@ + +# Prometheus Agent Sharding + +To be able to ingest metrics without disrupting the workload running in the clusters, the observability operator decides on the number of running __prometheus agent shards__ for each workload cluster based . The number of shards depends on the __total number of time series__ stored for a given cluster. + +__By default__, the operator configures a shard for every 1M time series present in Mimir for the workload cluster. To avoid scaling down too abruptly, we defined a scale down threshold of 20%. + +As this default value was not enough to avoid workload disruptions, we added 2 ways to be able to override the scale up series count target and the scale down percentage. + +1. Those values can be configured at the installation level by overriding the following values: + +```yaml +monitoring: + sharding: + scaleUpSeriesCount: 1000000 + scaleDownPercentage: 0.20 +``` + +2. 
Those values can also be set per cluster using the following cluster annotations: + +```yaml +monitoring.giantswarm.io/prometheus-agent-scale-up-series-count: 1000000 +monitoring.giantswarm.io/prometheus-agent-scale-down-percentage: 0.20 +``` diff --git a/helm/observability-operator/templates/deployment.yaml b/helm/observability-operator/templates/deployment.yaml index d21c7ab8..9b9d1e42 100644 --- a/helm/observability-operator/templates/deployment.yaml +++ b/helm/observability-operator/templates/deployment.yaml @@ -31,6 +31,8 @@ spec: - --management-cluster-pipeline={{ $.Values.managementCluster.pipeline }} - --management-cluster-region={{ $.Values.managementCluster.region }} - --monitoring-enabled={{ $.Values.monitoring.enabled }} + - --monitoring-sharding-scale-up-series-count={{ $.Values.monitoring.sharding.scaleUpSeriesCount }} + - --monitoring-sharding-scale-down-percentage={{ $.Values.monitoring.sharding.scaleDownPercentage }} {{- if .Values.monitoring.prometheusVersion }} - --prometheus-version={{ $.Values.monitoring.prometheusVersion }} {{- end }} diff --git a/helm/observability-operator/values.yaml b/helm/observability-operator/values.yaml index 2d4231e0..87e69e3f 100644 --- a/helm/observability-operator/values.yaml +++ b/helm/observability-operator/values.yaml @@ -19,6 +19,9 @@ monitoring: enabled: false opsgenieApiKey: "" prometheusVersion: "" + sharding: + scaleUpSeriesCount: 1000000 + scaleDownPercentage: 0.20 operator: # -- Configures the resources for the operator deployment diff --git a/internal/controller/cluster_monitoring_controller.go b/internal/controller/cluster_monitoring_controller.go index 60b63933..0aabcad7 100644 --- a/internal/controller/cluster_monitoring_controller.go +++ b/internal/controller/cluster_monitoring_controller.go @@ -48,8 +48,8 @@ type ClusterMonitoringReconciler struct { heartbeat.HeartbeatRepository // MimirService is the service for managing mimir configuration. 
mimir.MimirService - // MonitoringEnabled defines whether monitoring is enabled at the installation level. - MonitoringEnabled bool + // MonitoringConfig is the configuration for the monitoring package. + MonitoringConfig monitoring.Config } // SetupWithManager sets up the controller with the Manager. @@ -84,10 +84,10 @@ func (r *ClusterMonitoringReconciler) Reconcile(ctx context.Context, req ctrl.Re // Linting is disabled for the 2 following lines as otherwise it fails with the following error: // "should not use built-in type string as key for value" - logger := log.FromContext(ctx).WithValues("cluster", cluster.Name).WithValues("installation", r.ManagementCluster.Name) // nolint + logger := log.FromContext(ctx).WithValues("installation", r.ManagementCluster.Name) // nolint ctx = log.IntoContext(ctx, logger) - if !r.MonitoringEnabled { + if !r.MonitoringConfig.Enabled { logger.Info("Monitoring is disabled at the installation level") return ctrl.Result{}, nil } diff --git a/main.go b/main.go index 06de81aa..61b7c53b 100644 --- a/main.go +++ b/main.go @@ -41,6 +41,7 @@ import ( "github.com/giantswarm/observability-operator/pkg/common" "github.com/giantswarm/observability-operator/pkg/common/organization" "github.com/giantswarm/observability-operator/pkg/common/password" + "github.com/giantswarm/observability-operator/pkg/monitoring" "github.com/giantswarm/observability-operator/pkg/monitoring/heartbeat" "github.com/giantswarm/observability-operator/pkg/monitoring/mimir" "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent" @@ -51,19 +52,23 @@ var ( scheme = runtime.NewScheme() setupLog = ctrl.Log.WithName("setup") - metricsAddr string - enableLeaderElection bool - probeAddr string - secureMetrics bool - enableHTTP2 bool + metricsAddr string + enableLeaderElection bool + probeAddr string + secureMetrics bool + enableHTTP2 bool + managementClusterBaseDomain string managementClusterCustomer string managementClusterInsecureCA bool 
managementClusterName string managementClusterPipeline string managementClusterRegion string - monitoringEnabled bool - prometheusVersion string + + monitoringEnabled bool + monitoringShardingScaleUpSeriesCount float64 + monitoringShardingScaleDownPercentage float64 + prometheusVersion string ) const ( @@ -103,6 +108,10 @@ func main() { "The region of the management cluster.") flag.BoolVar(&monitoringEnabled, "monitoring-enabled", false, "Enable monitoring at the management cluster level.") + flag.Float64Var(&monitoringShardingScaleUpSeriesCount, "monitoring-sharding-scale-up-series-count", 0, + "Configures the number of time series needed to add an extra prometheus agent shard.") + flag.Float64Var(&monitoringShardingScaleDownPercentage, "monitoring-sharding-scale-down-percentage", 0, + "Configures the percentage of removed series to scale down the number of prometheus agent shards.") flag.StringVar(&prometheusVersion, "prometheus-version", "", "The version of Prometheus Agents to deploy.") opts := zap.Options{ @@ -187,13 +196,19 @@ func main() { organizationRepository := organization.NewNamespaceRepository(mgr.GetClient()) - // TODO(atlas): validate prometheus version + monitoringConfig := monitoring.Config{ + Enabled: monitoringEnabled, + ShardingScaleUpSeriesCount: monitoringShardingScaleUpSeriesCount, + ShardingScaleDownPercentage: monitoringShardingScaleDownPercentage, + PrometheusVersion: prometheusVersion, + } + prometheusAgentService := prometheusagent.PrometheusAgentService{ Client: mgr.GetClient(), OrganizationRepository: organizationRepository, PasswordManager: password.SimpleManager{}, ManagementCluster: managementCluster, - PrometheusVersion: prometheusVersion, + MonitoringConfig: monitoringConfig, } mimirService := mimir.MimirService{ @@ -208,7 +223,7 @@ func main() { HeartbeatRepository: heartbeatRepository, PrometheusAgentService: prometheusAgentService, MimirService: mimirService, - MonitoringEnabled: monitoringEnabled, + MonitoringConfig: 
monitoringConfig, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Cluster") os.Exit(1) diff --git a/pkg/monitoring/config.go b/pkg/monitoring/config.go new file mode 100644 index 00000000..e1427c17 --- /dev/null +++ b/pkg/monitoring/config.go @@ -0,0 +1,10 @@ +package monitoring + +// Config represents the configuration used by the monitoring package. +type Config struct { + Enabled bool + ShardingScaleUpSeriesCount float64 + ShardingScaleDownPercentage float64 + // TODO(atlas): validate prometheus version + PrometheusVersion string +} diff --git a/pkg/monitoring/prometheusagent/config.go b/pkg/monitoring/prometheusagent/configmap.go similarity index 71% rename from pkg/monitoring/prometheusagent/config.go rename to pkg/monitoring/prometheusagent/configmap.go index f6014f4b..3396705b 100644 --- a/pkg/monitoring/prometheusagent/config.go +++ b/pkg/monitoring/prometheusagent/configmap.go @@ -3,6 +3,7 @@ package prometheusagent import ( "context" "fmt" + "strconv" "github.com/go-logr/logr" "github.com/pkg/errors" @@ -14,7 +15,7 @@ import ( "github.com/giantswarm/observability-operator/pkg/common" "github.com/giantswarm/observability-operator/pkg/metrics" "github.com/giantswarm/observability-operator/pkg/monitoring/mimir/querier" - "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/shards" + "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/sharding" ) func (pas PrometheusAgentService) buildRemoteWriteConfig(ctx context.Context, @@ -50,16 +51,27 @@ func (pas PrometheusAgentService) buildRemoteWriteConfig(ctx context.Context, logger.Error(err, "failed to query head series") metrics.MimirQueryErrors.WithLabelValues().Inc() } - shards := shards.ComputeShards(currentShards, headSeries) + + clusterShardingStrategy, err := getClusterShardingStrategy(cluster) + if err != nil { + return nil, errors.WithStack(err) + } + + shardingStrategy := sharding.ShardingStrategy{ + 
ScaleUpSeriesCount: pas.MonitoringConfig.ShardingScaleUpSeriesCount, + ScaleDownPercentage: pas.MonitoringConfig.ShardingScaleDownPercentage, + }.Merge(clusterShardingStrategy) + + shards := shardingStrategy.ComputeShards(currentShards, headSeries) config, err := yaml.Marshal(RemoteWriteConfig{ PrometheusAgentConfig: &PrometheusAgentConfig{ ExternalLabels: externalLabels, Image: &PrometheusAgentImage{ - Tag: pas.PrometheusVersion, + Tag: pas.MonitoringConfig.PrometheusVersion, }, Shards: shards, - Version: pas.PrometheusVersion, + Version: pas.MonitoringConfig.PrometheusVersion, }, }) if err != nil { @@ -94,6 +106,25 @@ func getServicePriority(cluster *clusterv1.Cluster) string { return defaultServicePriority } +func getClusterShardingStrategy(cluster metav1.Object) (*sharding.ShardingStrategy, error) { + var err error + var scaleUpSeriesCount, scaleDownPercentage float64 + if value, ok := cluster.GetAnnotations()["monitoring.giantswarm.io/prometheus-agent-scale-up-series-count"]; ok { + if scaleUpSeriesCount, err = strconv.ParseFloat(value, 64); err != nil { + return nil, err + } + } + if value, ok := cluster.GetAnnotations()["monitoring.giantswarm.io/prometheus-agent-scale-down-percentage"]; ok { + if scaleDownPercentage, err = strconv.ParseFloat(value, 64); err != nil { + return nil, err + } + } + return &sharding.ShardingStrategy{ + ScaleUpSeriesCount: scaleUpSeriesCount, + ScaleDownPercentage: scaleDownPercentage, + }, nil +} + func readCurrentShardsFromConfig(configMap corev1.ConfigMap) (int, error) { remoteWriteConfig := RemoteWriteConfig{} err := yaml.Unmarshal([]byte(configMap.Data["values"]), &remoteWriteConfig) diff --git a/pkg/monitoring/prometheusagent/service.go b/pkg/monitoring/prometheusagent/service.go index 523b602a..f49bcc21 100644 --- a/pkg/monitoring/prometheusagent/service.go +++ b/pkg/monitoring/prometheusagent/service.go @@ -15,6 +15,7 @@ import ( "github.com/giantswarm/observability-operator/pkg/common" 
"github.com/giantswarm/observability-operator/pkg/common/organization" "github.com/giantswarm/observability-operator/pkg/common/password" + "github.com/giantswarm/observability-operator/pkg/monitoring" ) type PrometheusAgentService struct { @@ -22,14 +23,14 @@ type PrometheusAgentService struct { organization.OrganizationRepository PasswordManager password.Manager common.ManagementCluster - PrometheusVersion string + MonitoringConfig monitoring.Config } // ReconcileRemoteWriteConfiguration ensures that the prometheus remote write config is present in the cluster. func (pas *PrometheusAgentService) ReconcileRemoteWriteConfiguration( ctx context.Context, cluster *clusterv1.Cluster) error { - logger := log.FromContext(ctx).WithValues("cluster", cluster.Name) + logger := log.FromContext(ctx) logger.Info("ensuring prometheus agent remote write configmap and secret") err := pas.createOrUpdateConfigMap(ctx, cluster, logger) @@ -140,7 +141,7 @@ func (pas PrometheusAgentService) createOrUpdateSecret(ctx context.Context, func (pas *PrometheusAgentService) DeleteRemoteWriteConfiguration( ctx context.Context, cluster *clusterv1.Cluster) error { - logger := log.FromContext(ctx).WithValues("cluster", cluster.Name) + logger := log.FromContext(ctx) logger.Info("deleting prometheus agent remote write configmap and secret") err := pas.deleteConfigMap(ctx, cluster) diff --git a/pkg/monitoring/prometheusagent/sharding/sharding.go b/pkg/monitoring/prometheusagent/sharding/sharding.go new file mode 100644 index 00000000..bdf71289 --- /dev/null +++ b/pkg/monitoring/prometheusagent/sharding/sharding.go @@ -0,0 +1,46 @@ +package sharding + +import "math" + +type ShardingStrategy struct { + // Configures the number of series needed to add a new shard. 
Computation is number of series / ScaleUpSeriesCount + ScaleUpSeriesCount float64 + // Percentage of needed series based on ScaleUpSeriesCount to scale down agents + ScaleDownPercentage float64 +} + +func (pass1 ShardingStrategy) Merge(pass2 *ShardingStrategy) ShardingStrategy { + strategy := ShardingStrategy{ + pass1.ScaleUpSeriesCount, + pass1.ScaleDownPercentage, + } + if pass2 != nil { + if pass2.ScaleUpSeriesCount > 0 { + strategy.ScaleUpSeriesCount = pass2.ScaleUpSeriesCount + } + if pass2.ScaleDownPercentage > 0 { + strategy.ScaleDownPercentage = pass2.ScaleDownPercentage + } + } + return strategy +} + +// We want to start with 1 prometheus-agent for each 1M time series with a scale down 20% threshold. +func (pass ShardingStrategy) ComputeShards(currentShardCount int, timeSeries float64) int { + shardScaleDownThreshold := pass.ScaleDownPercentage * pass.ScaleUpSeriesCount + desiredShardCount := int(math.Ceil(timeSeries / pass.ScaleUpSeriesCount)) + + // Compute Scale Down + if currentShardCount > desiredShardCount { + // We get the rest of a division of timeSeries by shardStep and we compare it with the scale down threshold + if math.Mod(timeSeries, pass.ScaleUpSeriesCount) > pass.ScaleUpSeriesCount-shardScaleDownThreshold { + desiredShardCount = currentShardCount + } + } + + // We always have a minimum of 1 agent, even if there is no worker node + if desiredShardCount <= 0 { + return 1 + } + return desiredShardCount +} diff --git a/pkg/monitoring/prometheusagent/sharding/sharding_test.go b/pkg/monitoring/prometheusagent/sharding/sharding_test.go new file mode 100644 index 00000000..2d3baa25 --- /dev/null +++ b/pkg/monitoring/prometheusagent/sharding/sharding_test.go @@ -0,0 +1,71 @@ +package sharding + +import ( + "testing" +) + +func TestShardComputationScaleUp(t *testing.T) { + pass := ShardingStrategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)} + + expected := 1 + result := pass.ComputeShards(0, float64(1_000_000)) + if 
result != expected { + t.Errorf(`expected computeShards(0, 1_000_000) to be %d, got %d`, expected, result) + } + + expected = 2 + result = pass.ComputeShards(0, float64(1_000_001)) + if result != expected { + t.Errorf(`expected computeShards(0, 1_000_001) to be %d, got %d`, expected, result) + } + + expected = 3 + result = pass.ComputeShards(0, float64(2_000_001)) + if result != expected { + t.Errorf(`expected computeShards(0, 2_000_001) to be %d, got %d`, expected, result) + } +} + +func TestShardComputationReturnsAtLeast1Shart(t *testing.T) { + pass := ShardingStrategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)} + + expected := 1 + result := pass.ComputeShards(0, 0) + if result != expected { + t.Errorf(`expected computeShards(0, 0) to be %d, got %d`, expected, result) + } + + expected = 1 + result = pass.ComputeShards(0, -5) + if result != expected { + t.Errorf(`expected computeShards(0, -5) to be %d, got %d`, expected, result) + } +} + +func TestShardComputationScaleDown(t *testing.T) { + pass := ShardingStrategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)} + expected := 2 + result := pass.ComputeShards(1, 1_000_001) + if result != expected { + t.Errorf(`expected computeShards(1, 1_000_001) to be %d, got %d`, expected, result) + } + + expected = 2 + result = pass.ComputeShards(2, 999_999) + if result != expected { + t.Errorf(`expected computeShards(2, 999_999) to be %d, got %d`, expected, result) + } + + expected = 2 + result = pass.ComputeShards(2, 800_001) + if result != expected { + t.Errorf(`expected computeShards(2, 800_001) to be %d, got %d`, expected, result) + } + + // threshold hit + expected = 1 + result = pass.ComputeShards(2, 800_000) + if result != expected { + t.Errorf(`expected computeShards(2, 800_000) to be %d, got %d`, expected, result) + } +} diff --git a/pkg/monitoring/prometheusagent/shards/shards.go b/pkg/monitoring/prometheusagent/shards/shards.go deleted file mode 100644 index 
25394ff4..00000000 --- a/pkg/monitoring/prometheusagent/shards/shards.go +++ /dev/null @@ -1,28 +0,0 @@ -package shards - -import "math" - -const ( - shardStep = float64(1_000_000) - shardScaleDownPercentage = float64(0.20) - shardScaleDownThreshold = shardScaleDownPercentage * shardStep -) - -// We want to start with 1 prometheus-agent for each 1M time series with a scale down 25% threshold. -func ComputeShards(currentShardCount int, timeSeries float64) int { - desiredShardCount := int(math.Ceil(timeSeries / shardStep)) - - // Compute Scale Down - if currentShardCount > desiredShardCount { - // We get the rest of a division of timeSeries by shardStep and we compare it with the scale down threshold - if math.Mod(timeSeries, shardStep) > shardStep-shardScaleDownThreshold { - desiredShardCount = currentShardCount - } - } - - // We always have a minimum of 1 agent, even if there is no worker node - if desiredShardCount <= 0 { - return 1 - } - return desiredShardCount -} diff --git a/pkg/monitoring/prometheusagent/shards/shards_test.go b/pkg/monitoring/prometheusagent/shards/shards_test.go deleted file mode 100644 index 3043a5ec..00000000 --- a/pkg/monitoring/prometheusagent/shards/shards_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package shards - -import ( - "flag" - "testing" -) - -var _ = flag.Bool("update", false, "update the output file") - -func TestShardComputationScaleUp(t *testing.T) { - expected := 1 - result := ComputeShards(0, float64(1_000_000)) - if result != expected { - t.Errorf(`expected ComputeShards(0, 1_000_000) to be %d, got %d`, expected, result) - } - - expected = 2 - result = ComputeShards(0, float64(1_000_001)) - if result != expected { - t.Errorf(`expected ComputeShards(0, 1_000_001) to be %d, got %d`, expected, result) - } - - expected = 3 - result = ComputeShards(0, float64(2_000_001)) - if result != expected { - t.Errorf(`expected ComputeShards(0, 2_000_001) to be %d, got %d`, expected, result) - } -} - -func 
TestShardComputationReturnsAtLeast1Shart(t *testing.T) { - expected := 1 - result := ComputeShards(0, 0) - if result != expected { - t.Errorf(`expected ComputeShards(0, 0) to be %d, got %d`, expected, result) - } - - expected = 1 - result = ComputeShards(0, -5) - if result != expected { - t.Errorf(`expected ComputeShards(0, -5) to be %d, got %d`, expected, result) - } -} - -func TestShardComputationScaleDown(t *testing.T) { - expected := 2 - result := ComputeShards(1, 1_000_001) - if result != expected { - t.Errorf(`expected ComputeShards(1, 1_000_001) to be %d, got %d`, expected, result) - } - - expected = 2 - result = ComputeShards(2, 999_999) - if result != expected { - t.Errorf(`expected ComputeShards(2, 999_999) to be %d, got %d`, expected, result) - } - - expected = 2 - result = ComputeShards(2, 800_001) - if result != expected { - t.Errorf(`expected ComputeShards(2, 800_001) to be %d, got %d`, expected, result) - } - - // threshold hit - expected = 1 - result = ComputeShards(2, 800_000) - if result != expected { - t.Errorf(`expected ComputeShards(2, 800_000) to be %d, got %d`, expected, result) - } -} From 37ba8ac91881b72056bb3adec4b572cd25742c04 Mon Sep 17 00:00:00 2001 From: Quentin Bisson Date: Thu, 30 May 2024 14:17:31 +0200 Subject: [PATCH 2/9] Update docs/monitoring/sharding.md --- docs/monitoring/sharding.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/monitoring/sharding.md b/docs/monitoring/sharding.md index 1aafa386..f4817398 100644 --- a/docs/monitoring/sharding.md +++ b/docs/monitoring/sharding.md @@ -1,4 +1,3 @@ - # Prometheus Agent Sharding To be able to ingest metrics without disrupting the workload running in the clusters, the observability operator decides on the number of running __prometheus agent shards__ for each workload cluster based . The number of shards depends on the __total number of time series__ stored for a given cluster. 
From 8aaff3ac5090eb9e7727ea8d7cdc38a61f4ab1da Mon Sep 17 00:00:00 2001 From: Quentin Bisson Date: Thu, 6 Jun 2024 22:47:23 +0200 Subject: [PATCH 3/9] Update docs/monitoring/sharding.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Théo Brigitte --- docs/monitoring/sharding.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/monitoring/sharding.md b/docs/monitoring/sharding.md index f4817398..65be7273 100644 --- a/docs/monitoring/sharding.md +++ b/docs/monitoring/sharding.md @@ -1,6 +1,6 @@ # Prometheus Agent Sharding -To be able to ingest metrics without disrupting the workload running in the clusters, the observability operator decides on the number of running __prometheus agent shards__ for each workload cluster based . The number of shards depends on the __total number of time series__ stored for a given cluster. +To be able to ingest metrics without disrupting the workload running in the clusters, the observability operator chooses the number of running __prometheus agent shards__ on each workload cluster. The number of shards is based on the __total number of time series__ ingested for a given cluster. __By default__, the operator configures a shard for every 1M time series present in Mimir for the workload cluster. To avoid scaling down too abruptly, we defined a scale down threshold of 20%. 
From b80fb97f7d47f4067dd33eaa5de2a49107ae25d4 Mon Sep 17 00:00:00 2001 From: Quentin Bisson Date: Thu, 6 Jun 2024 22:47:30 +0200 Subject: [PATCH 4/9] Update docs/monitoring/sharding.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Théo Brigitte --- docs/monitoring/sharding.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/monitoring/sharding.md b/docs/monitoring/sharding.md index 65be7273..45a3dce0 100644 --- a/docs/monitoring/sharding.md +++ b/docs/monitoring/sharding.md @@ -2,7 +2,7 @@ To be able to ingest metrics without disrupting the workload running in the clusters, the observability operator chooses the number of running __prometheus agent shards__ on each workload cluster. The number of shards is based on the __total number of time series__ ingested for a given cluster. -__By default__, the operator configures a shard for every 1M time series present in Mimir for the workload cluster. To avoid scaling down too abruptly, we defined a scale down threshold of 20%. +__By default__, the operator configures 1 shard for every 1M time series present in Mimir for the workload cluster. To avoid scaling down too abruptly, we defined a scale down threshold of 20%. As this default value was not enough to avoid workload disruptions, we added 2 ways to be able to override the scale up series count target and the scale down percentage. 
From 718de328821138da4f2836a6a671acd990938f2f Mon Sep 17 00:00:00 2001 From: Quentin Bisson Date: Thu, 6 Jun 2024 22:47:48 +0200 Subject: [PATCH 5/9] Update docs/monitoring/sharding.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Théo Brigitte --- docs/monitoring/sharding.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/monitoring/sharding.md b/docs/monitoring/sharding.md index 45a3dce0..4fe5fd53 100644 --- a/docs/monitoring/sharding.md +++ b/docs/monitoring/sharding.md @@ -4,7 +4,7 @@ To be able to ingest metrics without disrupting the workload running in the clus __By default__, the operator configures 1 shard for every 1M time series present in Mimir for the workload cluster. To avoid scaling down too abruptly, we defined a scale down threshold of 20%. -As this default value was not enough to avoid workload disruptions, we added 2 ways to be able to override the scale up series count target and the scale down percentage. +Scale up series threshold and scale down percentage are overridable. 1. 
Those values can be configured at the installation level by overriding the following values: From b249e1d2190b2f030124fb29b4bfb0ac12f60d04 Mon Sep 17 00:00:00 2001 From: Quentin Bisson Date: Thu, 6 Jun 2024 22:56:17 +0200 Subject: [PATCH 6/9] Update internal/controller/cluster_monitoring_controller.go --- internal/controller/cluster_monitoring_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/controller/cluster_monitoring_controller.go b/internal/controller/cluster_monitoring_controller.go index 0aabcad7..c0873033 100644 --- a/internal/controller/cluster_monitoring_controller.go +++ b/internal/controller/cluster_monitoring_controller.go @@ -82,7 +82,7 @@ func (r *ClusterMonitoringReconciler) Reconcile(ctx context.Context, req ctrl.Re return ctrl.Result{}, errors.WithStack(err) } - // Linting is disabled for the 2 following lines as otherwise it fails with the following error: + // Linting is disabled for the following line as otherwise it fails with the following error: // "should not use built-in type string as key for value" logger := log.FromContext(ctx).WithValues("installation", r.ManagementCluster.Name) // nolint ctx = log.IntoContext(ctx, logger) From ca373a575576df6c1afb59b662906bbcf3a4b125 Mon Sep 17 00:00:00 2001 From: Quentin Bisson Date: Mon, 10 Jun 2024 11:58:44 +0200 Subject: [PATCH 7/9] Update pkg/monitoring/prometheusagent/sharding/sharding.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Théo Brigitte --- pkg/monitoring/prometheusagent/sharding/sharding.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/monitoring/prometheusagent/sharding/sharding.go b/pkg/monitoring/prometheusagent/sharding/sharding.go index bdf71289..f462bfb9 100644 --- a/pkg/monitoring/prometheusagent/sharding/sharding.go +++ b/pkg/monitoring/prometheusagent/sharding/sharding.go @@ -32,7 +32,7 @@ func (pass ShardingStrategy) ComputeShards(currentShardCount 
int, timeSeries flo // Compute Scale Down if currentShardCount > desiredShardCount { - // We get the rest of a division of timeSeries by shardStep and we compare it with the scale down threshold + // Check if the remaining time series from ( timeSeries mod ScaleupSeriesCount ) is bigger than the scale down threshold. if math.Mod(timeSeries, pass.ScaleUpSeriesCount) > pass.ScaleUpSeriesCount-shardScaleDownThreshold { desiredShardCount = currentShardCount } From eeea93714da5ea2b73223b104295104e5499d1eb Mon Sep 17 00:00:00 2001 From: QuentinBisson Date: Mon, 10 Jun 2024 12:02:37 +0200 Subject: [PATCH 8/9] Fix review comments Signed-off-by: QuentinBisson --- docs/monitoring/README.md | 2 +- main.go | 11 ++++++---- pkg/monitoring/config.go | 9 ++++---- pkg/monitoring/prometheusagent/configmap.go | 10 +++------ .../prometheusagent/sharding/sharding.go | 22 +++++++++---------- .../prometheusagent/sharding/sharding_test.go | 6 ++--- 6 files changed, 30 insertions(+), 30 deletions(-) diff --git a/docs/monitoring/README.md b/docs/monitoring/README.md index fab5de0a..7323afe8 100644 --- a/docs/monitoring/README.md +++ b/docs/monitoring/README.md @@ -2,4 +2,4 @@ The Observability Operator is in charge of configuring the Prometheus Agent instances running in workload clusters like remote write configuration, [sharding](sharding.md) and so on. -TODO(atlas): Add documentation here. 
+TODO(atlas): Add operator specific documentation here ("sequence diagrams", list of created and managed resources) diff --git a/main.go b/main.go index 61b7c53b..b46647cf 100644 --- a/main.go +++ b/main.go @@ -45,6 +45,7 @@ import ( "github.com/giantswarm/observability-operator/pkg/monitoring/heartbeat" "github.com/giantswarm/observability-operator/pkg/monitoring/mimir" "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent" + "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/sharding" //+kubebuilder:scaffold:imports ) @@ -197,10 +198,12 @@ func main() { organizationRepository := organization.NewNamespaceRepository(mgr.GetClient()) monitoringConfig := monitoring.Config{ - Enabled: monitoringEnabled, - ShardingScaleUpSeriesCount: monitoringShardingScaleUpSeriesCount, - ShardingScaleDownPercentage: monitoringShardingScaleDownPercentage, - PrometheusVersion: prometheusVersion, + Enabled: monitoringEnabled, + DefaultShardingStrategy: sharding.Strategy{ + ScaleUpSeriesCount: monitoringShardingScaleUpSeriesCount, + ScaleDownPercentage: monitoringShardingScaleDownPercentage, + }, + PrometheusVersion: prometheusVersion, } prometheusAgentService := prometheusagent.PrometheusAgentService{ diff --git a/pkg/monitoring/config.go b/pkg/monitoring/config.go index e1427c17..77ad1477 100644 --- a/pkg/monitoring/config.go +++ b/pkg/monitoring/config.go @@ -1,10 +1,11 @@ package monitoring +import "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/sharding" + // Config represents the configuration used by the monitoring package. 
type Config struct { - Enabled bool - ShardingScaleUpSeriesCount float64 - ShardingScaleDownPercentage float64 - // TODO(atlas): validate prometheus version + Enabled bool + DefaultShardingStrategy sharding.Strategy + // TODO(atlas): validate prometheus version using SemVer PrometheusVersion string } diff --git a/pkg/monitoring/prometheusagent/configmap.go b/pkg/monitoring/prometheusagent/configmap.go index 3396705b..9e577a72 100644 --- a/pkg/monitoring/prometheusagent/configmap.go +++ b/pkg/monitoring/prometheusagent/configmap.go @@ -57,11 +57,7 @@ func (pas PrometheusAgentService) buildRemoteWriteConfig(ctx context.Context, return nil, errors.WithStack(err) } - shardingStrategy := sharding.ShardingStrategy{ - ScaleUpSeriesCount: pas.MonitoringConfig.ShardingScaleUpSeriesCount, - ScaleDownPercentage: pas.MonitoringConfig.ShardingScaleDownPercentage, - }.Merge(clusterShardingStrategy) - + shardingStrategy := pas.MonitoringConfig.DefaultShardingStrategy.Merge(clusterShardingStrategy) shards := shardingStrategy.ComputeShards(currentShards, headSeries) config, err := yaml.Marshal(RemoteWriteConfig{ @@ -106,7 +102,7 @@ func getServicePriority(cluster *clusterv1.Cluster) string { return defaultServicePriority } -func getClusterShardingStrategy(cluster metav1.Object) (*sharding.ShardingStrategy, error) { +func getClusterShardingStrategy(cluster metav1.Object) (*sharding.Strategy, error) { var err error var scaleUpSeriesCount, scaleDownPercentage float64 if value, ok := cluster.GetAnnotations()["monitoring.giantswarm.io/prometheus-agent-scale-up-series-count"]; ok { @@ -119,7 +115,7 @@ func getClusterShardingStrategy(cluster metav1.Object) (*sharding.ShardingStrate return nil, err } } - return &sharding.ShardingStrategy{ + return &sharding.Strategy{ ScaleUpSeriesCount: scaleUpSeriesCount, ScaleDownPercentage: scaleDownPercentage, }, nil diff --git a/pkg/monitoring/prometheusagent/sharding/sharding.go b/pkg/monitoring/prometheusagent/sharding/sharding.go index 
f462bfb9..ffaa0845 100644 --- a/pkg/monitoring/prometheusagent/sharding/sharding.go +++ b/pkg/monitoring/prometheusagent/sharding/sharding.go @@ -2,31 +2,31 @@ package sharding import "math" -type ShardingStrategy struct { +type Strategy struct { // Configures the number of series needed to add a new shard. Computation is number of series / ScaleUpSeriesCount ScaleUpSeriesCount float64 // Percentage of needed series based on ScaleUpSeriesCount to scale down agents ScaleDownPercentage float64 } -func (pass1 ShardingStrategy) Merge(pass2 *ShardingStrategy) ShardingStrategy { - strategy := ShardingStrategy{ - pass1.ScaleUpSeriesCount, - pass1.ScaleDownPercentage, +func (s Strategy) Merge(newStrategy *Strategy) Strategy { + strategy := Strategy{ + s.ScaleUpSeriesCount, + s.ScaleDownPercentage, } - if pass2 != nil { - if pass2.ScaleUpSeriesCount > 0 { - strategy.ScaleUpSeriesCount = pass2.ScaleUpSeriesCount + if newStrategy != nil { + if newStrategy.ScaleUpSeriesCount > 0 { + strategy.ScaleUpSeriesCount = newStrategy.ScaleUpSeriesCount } - if pass2.ScaleDownPercentage > 0 { - strategy.ScaleDownPercentage = pass2.ScaleDownPercentage + if newStrategy.ScaleDownPercentage > 0 { + strategy.ScaleDownPercentage = newStrategy.ScaleDownPercentage } } return strategy } // We want to start with 1 prometheus-agent for each 1M time series with a scale down 20% threshold. 
-func (pass ShardingStrategy) ComputeShards(currentShardCount int, timeSeries float64) int { +func (pass Strategy) ComputeShards(currentShardCount int, timeSeries float64) int { shardScaleDownThreshold := pass.ScaleDownPercentage * pass.ScaleUpSeriesCount desiredShardCount := int(math.Ceil(timeSeries / pass.ScaleUpSeriesCount)) diff --git a/pkg/monitoring/prometheusagent/sharding/sharding_test.go b/pkg/monitoring/prometheusagent/sharding/sharding_test.go index 2d3baa25..479aa8bc 100644 --- a/pkg/monitoring/prometheusagent/sharding/sharding_test.go +++ b/pkg/monitoring/prometheusagent/sharding/sharding_test.go @@ -5,7 +5,7 @@ import ( ) func TestShardComputationScaleUp(t *testing.T) { - pass := ShardingStrategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)} + pass := Strategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)} expected := 1 result := pass.ComputeShards(0, float64(1_000_000)) @@ -27,7 +27,7 @@ func TestShardComputationScaleUp(t *testing.T) { } func TestShardComputationReturnsAtLeast1Shart(t *testing.T) { - pass := ShardingStrategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)} + pass := Strategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)} expected := 1 result := pass.ComputeShards(0, 0) @@ -43,7 +43,7 @@ func TestShardComputationReturnsAtLeast1Shart(t *testing.T) { } func TestShardComputationScaleDown(t *testing.T) { - pass := ShardingStrategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)} + pass := Strategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)} expected := 2 result := pass.ComputeShards(1, 1_000_001) if result != expected { From f61bb41c06f71ed5b9aa84ce312b9988bdd3605e Mon Sep 17 00:00:00 2001 From: QuentinBisson Date: Sat, 22 Jun 2024 15:01:35 +0200 Subject: [PATCH 9/9] improve test logic using table driven testing Signed-off-by: QuentinBisson --- 
.../prometheusagent/sharding/sharding.go | 2 +- .../prometheusagent/sharding/sharding_test.go | 147 +++++++++++------- 2 files changed, 90 insertions(+), 59 deletions(-) diff --git a/pkg/monitoring/prometheusagent/sharding/sharding.go b/pkg/monitoring/prometheusagent/sharding/sharding.go index ffaa0845..9b0eae3a 100644 --- a/pkg/monitoring/prometheusagent/sharding/sharding.go +++ b/pkg/monitoring/prometheusagent/sharding/sharding.go @@ -32,7 +32,7 @@ func (pass Strategy) ComputeShards(currentShardCount int, timeSeries float64) in // Compute Scale Down if currentShardCount > desiredShardCount { - // Check if the remaining time series from ( timeSeries mod ScaleupSeriesCount ) is bigger than the scale down threshold. + // Check if the remainder of (timeSeries mod ScaleupSeriesCount) is bigger than the scale down threshold. if math.Mod(timeSeries, pass.ScaleUpSeriesCount) > pass.ScaleUpSeriesCount-shardScaleDownThreshold { desiredShardCount = currentShardCount } diff --git a/pkg/monitoring/prometheusagent/sharding/sharding_test.go b/pkg/monitoring/prometheusagent/sharding/sharding_test.go index 479aa8bc..007c6cda 100644 --- a/pkg/monitoring/prometheusagent/sharding/sharding_test.go +++ b/pkg/monitoring/prometheusagent/sharding/sharding_test.go @@ -4,68 +4,99 @@ import ( "testing" ) -func TestShardComputationScaleUp(t *testing.T) { - pass := Strategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)} - - expected := 1 - result := pass.ComputeShards(0, float64(1_000_000)) - if result != expected { - t.Errorf(`expected computeShards(0, 1_000_000) to be %d, got %d`, expected, result) - } - - expected = 2 - result = pass.ComputeShards(0, float64(1_000_001)) - if result != expected { - t.Errorf(`expected computeShards(0, 1_000_001) to be %d, got %d`, expected, result) - } - - expected = 3 - result = pass.ComputeShards(0, float64(2_000_001)) - if result != expected { - t.Errorf(`expected computeShards(0, 2_000_001) to be %d, got %d`, expected, 
result) - } +type testCase struct { + currentShardCount int + timeSeries float64 + expected int } -func TestShardComputationReturnsAtLeast1Shart(t *testing.T) { - pass := Strategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)} - - expected := 1 - result := pass.ComputeShards(0, 0) - if result != expected { - t.Errorf(`expected computeShards(0, 0) to be %d, got %d`, expected, result) - } +var defaultShardingStrategy = Strategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)} - expected = 1 - result = pass.ComputeShards(0, -5) - if result != expected { - t.Errorf(`expected computeShards(0, -5) to be %d, got %d`, expected, result) - } +var tests = []struct { + name string + strategy Strategy + cases []testCase +}{ + { + name: "scale up", + strategy: defaultShardingStrategy, + cases: []testCase{ + { + currentShardCount: 0, + timeSeries: float64(1_000_000), + expected: 1, + }, + { + currentShardCount: 0, + timeSeries: float64(1_000_001), + expected: 2, + }, + { + currentShardCount: 0, + timeSeries: float64(2_000_001), + expected: 3, + }, + }, + }, + { + name: "scale down", + strategy: defaultShardingStrategy, + cases: []testCase{ + { + currentShardCount: 1, + timeSeries: float64(1_000_001), + expected: 2, + }, + { + currentShardCount: 2, + timeSeries: float64(999_999), + expected: 2, + }, + { + currentShardCount: 2, + timeSeries: float64(800_001), + expected: 2, + }, + { + currentShardCount: 2, + // 20% default threshold hit + timeSeries: float64(800_000), + expected: 1, + }, + }, + }, + { + name: "always defaults to 1", + strategy: defaultShardingStrategy, + cases: []testCase{ + { + currentShardCount: 0, + timeSeries: float64(0), + expected: 1, + }, + { + currentShardCount: 0, + timeSeries: float64(-5), + expected: 1, + }, + }, + }, } -func TestShardComputationScaleDown(t *testing.T) { - pass := Strategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)} - expected := 2 - result := 
pass.ComputeShards(1, 1_000_001) - if result != expected { - t.Errorf(`expected computeShards(1, 1_000_001) to be %d, got %d`, expected, result) - } - - expected = 2 - result = pass.ComputeShards(2, 999_999) - if result != expected { - t.Errorf(`expected computeShards(2, 999_999) to be %d, got %d`, expected, result) - } - - expected = 2 - result = pass.ComputeShards(2, 800_001) - if result != expected { - t.Errorf(`expected computeShards(2, 800_001) to be %d, got %d`, expected, result) - } - - // threshold hit - expected = 1 - result = pass.ComputeShards(2, 800_000) - if result != expected { - t.Errorf(`expected computeShards(2, 800_000) to be %d, got %d`, expected, result) +func TestShardComputationLogic(t *testing.T) { + t.Parallel() + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + for _, c := range tt.cases { + c := c + result := tt.strategy.ComputeShards(c.currentShardCount, c.timeSeries) + if result != c.expected { + t.Errorf(`expected computeShards(%d, %f) to be %d, got %d`, c.currentShardCount, c.timeSeries, c.expected, result) + } + t.Logf(`computeShards(%d, %f) = %d`, c.currentShardCount, c.timeSeries, result) + } + }) } }