From 588c04b30192d145efd9b89058f5c93ac81a53b9 Mon Sep 17 00:00:00 2001 From: QuentinBisson Date: Mon, 13 May 2024 14:16:43 +0200 Subject: [PATCH] Add sharding strategy support Signed-off-by: QuentinBisson --- CHANGELOG.md | 4 ++ docs/monitoring/README.md | 5 ++ docs/monitoring/sharding.md | 24 +++++++ .../templates/deployment.yaml | 2 + helm/observability-operator/values.yaml | 3 + .../cluster_monitoring_controller.go | 6 +- main.go | 35 ++++++--- pkg/monitoring/config.go | 10 +++ .../{config.go => configmap.go} | 43 +++++++++-- pkg/monitoring/prometheusagent/service.go | 2 +- .../prometheusagent/sharding/sharding.go | 46 ++++++++++++ .../prometheusagent/sharding/sharding_test.go | 71 +++++++++++++++++++ .../prometheusagent/shards/shards.go | 28 -------- .../prometheusagent/shards/shards_test.go | 69 ------------------ 14 files changed, 230 insertions(+), 118 deletions(-) create mode 100644 docs/monitoring/README.md create mode 100644 docs/monitoring/sharding.md create mode 100644 pkg/monitoring/config.go rename pkg/monitoring/prometheusagent/{config.go => configmap.go} (68%) create mode 100644 pkg/monitoring/prometheusagent/sharding/sharding.go create mode 100644 pkg/monitoring/prometheusagent/sharding/sharding_test.go delete mode 100644 pkg/monitoring/prometheusagent/shards/shards.go delete mode 100644 pkg/monitoring/prometheusagent/shards/shards_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d1d7b46..afff45db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add per cluster and installation overridable sharding strategy support for mimir-backed installations. + ## [0.0.2] - 2024-04-08 ### Fixed diff --git a/docs/monitoring/README.md b/docs/monitoring/README.md new file mode 100644 index 00000000..fab5de0a --- /dev/null +++ b/docs/monitoring/README.md @@ -0,0 +1,5 @@ +# Introduction + +The Observability Operator is in charge of configuring the Prometheus Agent instances running in workload clusters like remote write configuration, [sharding](sharding.md) and so on. + +TODO(atlas): Add documentation here. diff --git a/docs/monitoring/sharding.md b/docs/monitoring/sharding.md new file mode 100644 index 00000000..1aafa386 --- /dev/null +++ b/docs/monitoring/sharding.md @@ -0,0 +1,24 @@ + +# Prometheus Agent Sharding + +To be able to ingest metrics without disrupting the workload running in the clusters, the observability operator decides on the number of running __prometheus agent shards__ for each workload cluster based . The number of shards depends on the __total number of time series__ stored for a given cluster. + +__By default__, the operator configures a shard for every 1M time series present in Mimir for the workload cluster. To avoid scaling down too abruptly, we defined a scale down threshold of 20%. + +As this default value was not enough to avoid workload disruptions, we added 2 ways to be able to override the scale up series count target and the scale down percentage. + +1. Those values can be configured at the installation level by overriding the following values: + +```yaml +monitoring: + sharding: + scaleUpSeriesCount: 1000000 + scaleDownPercentage: 0.20 +``` + +2. Those values can also be set per cluster using the following cluster annotations: + +```yaml +monitoring.giantswarm.io/prometheus-agent-scale-up-series-count: 1000000 +monitoring.giantswarm.io/prometheus-agent-scale-down-percentage: 0.20 +``` diff --git a/helm/observability-operator/templates/deployment.yaml b/helm/observability-operator/templates/deployment.yaml index d21c7ab8..9b9d1e42 100644 --- a/helm/observability-operator/templates/deployment.yaml +++ b/helm/observability-operator/templates/deployment.yaml @@ -31,6 +31,8 @@ spec: - --management-cluster-pipeline={{ $.Values.managementCluster.pipeline }} - --management-cluster-region={{ $.Values.managementCluster.region }} - --monitoring-enabled={{ $.Values.monitoring.enabled }} + - --monitoring-sharding-scale-up-series-count={{ $.Values.monitoring.sharding.scaleUpSeriesCount }} + - --monitoring-sharding-scale-down-percentage={{ $.Values.monitoring.sharding.scaleDownPercentage }} {{- if .Values.monitoring.prometheusVersion }} - --prometheus-version={{ $.Values.monitoring.prometheusVersion }} {{- end }} diff --git a/helm/observability-operator/values.yaml b/helm/observability-operator/values.yaml index 2d4231e0..87e69e3f 100644 --- a/helm/observability-operator/values.yaml +++ b/helm/observability-operator/values.yaml @@ -19,6 +19,9 @@ monitoring: enabled: false opsgenieApiKey: "" prometheusVersion: "" + sharding: + scaleUpSeriesCount: 1000000 + scaleDownPercentage: 0.20 operator: # -- Configures the resources for the operator deployment diff --git a/internal/controller/cluster_monitoring_controller.go b/internal/controller/cluster_monitoring_controller.go index ba96b33b..9567e73c 100644 --- a/internal/controller/cluster_monitoring_controller.go +++ b/internal/controller/cluster_monitoring_controller.go @@ -44,8 +44,8 @@ type ClusterMonitoringReconciler struct { prometheusagent.PrometheusAgentService // HeartbeatRepository is the repository for managing heartbeats. heartbeat.HeartbeatRepository - // MonitoringEnabled defines whether monitoring is enabled at the installation level. - MonitoringEnabled bool + // MonitoringConfig is the configuration for the monitoring package. + MonitoringConfig monitoring.Config } // SetupWithManager sets up the controller with the Manager. @@ -81,7 +81,7 @@ func (r *ClusterMonitoringReconciler) Reconcile(ctx context.Context, req ctrl.Re } // Handle deletion reconciliation loop. - if !cluster.ObjectMeta.DeletionTimestamp.IsZero() || !r.MonitoringEnabled { + if !cluster.ObjectMeta.DeletionTimestamp.IsZero() || !r.MonitoringConfig.Enabled { logger.Info("Handling deletion for Cluster", "cluster", cluster.Name) return r.reconcileDelete(ctx, cluster) } diff --git a/main.go b/main.go index 1d40ecd8..5deaeee7 100644 --- a/main.go +++ b/main.go @@ -41,6 +41,7 @@ import ( "github.com/giantswarm/observability-operator/pkg/common" "github.com/giantswarm/observability-operator/pkg/common/organization" "github.com/giantswarm/observability-operator/pkg/common/password" + "github.com/giantswarm/observability-operator/pkg/monitoring" "github.com/giantswarm/observability-operator/pkg/monitoring/heartbeat" "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent" //+kubebuilder:scaffold:imports @@ -50,19 +51,23 @@ var ( scheme = runtime.NewScheme() setupLog = ctrl.Log.WithName("setup") - metricsAddr string - enableLeaderElection bool - probeAddr string - secureMetrics bool - enableHTTP2 bool + metricsAddr string + enableLeaderElection bool + probeAddr string + secureMetrics bool + enableHTTP2 bool + managementClusterBaseDomain string managementClusterCustomer string managementClusterInsecureCA bool managementClusterName string managementClusterPipeline string managementClusterRegion string - monitoringEnabled bool - prometheusVersion string + + monitoringEnabled bool + monitoringShardingScaleUpSeriesCount float64 + monitoringShardingScaleDownPercentage float64 + prometheusVersion string ) const ( @@ -102,6 +107,10 @@ func main() { "The region of the management cluster.") flag.BoolVar(&monitoringEnabled, "monitoring-enabled", false, "Enable monitoring at the management cluster level.") + flag.Float64Var(&monitoringShardingScaleUpSeriesCount, "monitoring-sharding-scale-up-series-count", 0, + "Configures the number of time series needed to add an extra prometheus agent shard.") + flag.Float64Var(&monitoringShardingScaleDownPercentage, "monitoring-sharding-scale-down-percentage", 0, + "Configures the percentage of series that need to be removed to be able to scale down the number of prometheus agent shards.") flag.StringVar(&prometheusVersion, "prometheus-version", "", "The version of Prometheus Agents to deploy.") opts := zap.Options{ @@ -186,13 +195,19 @@ func main() { organizationRepository := organization.NewNamespaceRepository(mgr.GetClient()) - // TODO(atlas): validate prometheus version + monitoringConfig := monitoring.Config{ + Enabled: monitoringEnabled, + ShardingScaleUpSeriesCount: monitoringShardingScaleUpSeriesCount, + ShardingScaleDownPercentage: monitoringShardingScaleDownPercentage, + PrometheusVersion: prometheusVersion, + } + prometheusAgentService := prometheusagent.PrometheusAgentService{ Client: mgr.GetClient(), OrganizationRepository: organizationRepository, PasswordManager: password.SimpleManager{}, ManagementCluster: managementCluster, - PrometheusVersion: prometheusVersion, + MonitoringConfig: monitoringConfig, } if err = (&controller.ClusterMonitoringReconciler{ @@ -200,7 +215,7 @@ func main() { ManagementCluster: managementCluster, HeartbeatRepository: heartbeatRepository, PrometheusAgentService: prometheusAgentService, - MonitoringEnabled: monitoringEnabled, + MonitoringConfig: monitoringConfig, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Cluster") os.Exit(1) diff --git a/pkg/monitoring/config.go b/pkg/monitoring/config.go new file mode 100644 index 00000000..e1427c17 --- /dev/null +++ b/pkg/monitoring/config.go @@ -0,0 +1,10 @@ +package monitoring + +// Config represents the configuration used by the monitoring package. +type Config struct { + Enabled bool + ShardingScaleUpSeriesCount float64 + ShardingScaleDownPercentage float64 + // TODO(atlas): validate prometheus version + PrometheusVersion string +} diff --git a/pkg/monitoring/prometheusagent/config.go b/pkg/monitoring/prometheusagent/configmap.go similarity index 68% rename from pkg/monitoring/prometheusagent/config.go rename to pkg/monitoring/prometheusagent/configmap.go index d6890521..879d7753 100644 --- a/pkg/monitoring/prometheusagent/config.go +++ b/pkg/monitoring/prometheusagent/configmap.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "strconv" "github.com/go-logr/logr" "github.com/pkg/errors" @@ -15,7 +16,7 @@ import ( "github.com/giantswarm/observability-operator/pkg/common" "github.com/giantswarm/observability-operator/pkg/monitoring" "github.com/giantswarm/observability-operator/pkg/monitoring/mimir/querier" - "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/shards" + "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/sharding" ) func (pas PrometheusAgentService) buildRemoteWriteConfig(ctx context.Context, @@ -45,7 +46,7 @@ func (pas PrometheusAgentService) buildRemoteWriteConfig(ctx context.Context, "service_priority": getServicePriority(cluster), } - shards, err := getShardsCountForCluster(ctx, cluster, currentShards) + shards, err := pas.getShardsCountForCluster(ctx, cluster, currentShards) if err != nil { return nil, errors.WithStack(err) } @@ -54,10 +55,10 @@ func (pas PrometheusAgentService) buildRemoteWriteConfig(ctx context.Context, PrometheusAgentConfig: &PrometheusAgentConfig{ ExternalLabels: externalLabels, Image: &PrometheusAgentImage{ - Tag: pas.PrometheusVersion, + Tag: pas.MonitoringConfig.PrometheusVersion, }, Shards: shards, - Version: pas.PrometheusVersion, + Version: pas.MonitoringConfig.PrometheusVersion, }, }) if err != nil { @@ -96,17 +97,45 @@ func getServicePriority(cluster *clusterv1.Cluster) string { } // We want to compute the number of shards based on the number of series. -func getShardsCountForCluster(ctx context.Context, cluster *clusterv1.Cluster, currentShardCount int) (int, error) { +func (pas PrometheusAgentService) getShardsCountForCluster(ctx context.Context, cluster *clusterv1.Cluster, currentShardCount int) (int, error) { + clusterShardingStrategy, err := getClusterShardingStrategy(cluster) + if err != nil { + return 0, errors.WithStack(err) + } + + shardingStrategy := sharding.ShardingStrategy{ + ScaleUpSeriesCount: pas.MonitoringConfig.ShardingScaleUpSeriesCount, + ScaleDownPercentage: pas.MonitoringConfig.ShardingScaleDownPercentage, + }.Merge(clusterShardingStrategy) headSeries, err := querier.QueryTSDBHeadSeries(ctx, cluster.Name) if err != nil { // Verify that Prometheus is accessible. If not, return the default number of shards. var dnsError *net.DNSError if errors.As(err, &dnsError) { - return shards.ComputeShards(currentShardCount, defaultShards), nil + return shardingStrategy.ComputeShards(currentShardCount, defaultShards), nil } return 0, errors.WithStack(err) } - return shards.ComputeShards(currentShardCount, headSeries), nil + return shardingStrategy.ComputeShards(currentShardCount, headSeries), nil +} + +func getClusterShardingStrategy(cluster metav1.Object) (*sharding.ShardingStrategy, error) { + var err error + var scaleUpSeriesCount, scaleDownPercentage float64 + if value, ok := cluster.GetAnnotations()["monitoring.giantswarm.io/prometheus-agent-scale-up-series-count"]; ok { + if scaleUpSeriesCount, err = strconv.ParseFloat(value, 64); err != nil { + return nil, err + } + } + if value, ok := cluster.GetAnnotations()["monitoring.giantswarm.io/prometheus-agent-scale-down-percentage"]; ok { + if scaleDownPercentage, err = strconv.ParseFloat(value, 64); err != nil { + return nil, err + } + } + return &sharding.ShardingStrategy{ + ScaleUpSeriesCount: scaleUpSeriesCount, + ScaleDownPercentage: scaleDownPercentage, + }, nil } func readCurrentShardsFromConfig(configMap corev1.ConfigMap) (int, error) { diff --git a/pkg/monitoring/prometheusagent/service.go b/pkg/monitoring/prometheusagent/service.go index 4eb5b9a4..1329116d 100644 --- a/pkg/monitoring/prometheusagent/service.go +++ b/pkg/monitoring/prometheusagent/service.go @@ -24,7 +24,7 @@ type PrometheusAgentService struct { organization.OrganizationRepository PasswordManager password.Manager common.ManagementCluster - PrometheusVersion string + MonitoringConfig monitoring.Config } // ReconcileRemoteWriteConfiguration ensures that the prometheus remote write config is present in the cluster. diff --git a/pkg/monitoring/prometheusagent/sharding/sharding.go b/pkg/monitoring/prometheusagent/sharding/sharding.go new file mode 100644 index 00000000..bdf71289 --- /dev/null +++ b/pkg/monitoring/prometheusagent/sharding/sharding.go @@ -0,0 +1,46 @@ +package sharding + +import "math" + +type ShardingStrategy struct { + // Configures the number of series needed to add a new shard. Computation is number of series / ScaleUpSeriesCount + ScaleUpSeriesCount float64 + // Percentage of needed series based on ScaleUpSeriesCount to scale down agents + ScaleDownPercentage float64 +} + +func (pass1 ShardingStrategy) Merge(pass2 *ShardingStrategy) ShardingStrategy { + strategy := ShardingStrategy{ + pass1.ScaleUpSeriesCount, + pass1.ScaleDownPercentage, + } + if pass2 != nil { + if pass2.ScaleUpSeriesCount > 0 { + strategy.ScaleUpSeriesCount = pass2.ScaleUpSeriesCount + } + if pass2.ScaleDownPercentage > 0 { + strategy.ScaleDownPercentage = pass2.ScaleDownPercentage + } + } + return strategy +} + +// We want to start with 1 prometheus-agent for each 1M time series with a scale down 20% threshold. +func (pass ShardingStrategy) ComputeShards(currentShardCount int, timeSeries float64) int { + shardScaleDownThreshold := pass.ScaleDownPercentage * pass.ScaleUpSeriesCount + desiredShardCount := int(math.Ceil(timeSeries / pass.ScaleUpSeriesCount)) + + // Compute Scale Down + if currentShardCount > desiredShardCount { + // We get the rest of a division of timeSeries by shardStep and we compare it with the scale down threshold + if math.Mod(timeSeries, pass.ScaleUpSeriesCount) > pass.ScaleUpSeriesCount-shardScaleDownThreshold { + desiredShardCount = currentShardCount + } + } + + // We always have a minimum of 1 agent, even if there is no worker node + if desiredShardCount <= 0 { + return 1 + } + return desiredShardCount +} diff --git a/pkg/monitoring/prometheusagent/sharding/sharding_test.go b/pkg/monitoring/prometheusagent/sharding/sharding_test.go new file mode 100644 index 00000000..2d3baa25 --- /dev/null +++ b/pkg/monitoring/prometheusagent/sharding/sharding_test.go @@ -0,0 +1,71 @@ +package sharding + +import ( + "testing" +) + +func TestShardComputationScaleUp(t *testing.T) { + pass := ShardingStrategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)} + + expected := 1 + result := pass.ComputeShards(0, float64(1_000_000)) + if result != expected { + t.Errorf(`expected computeShards(0, 1_000_000) to be %d, got %d`, expected, result) + } + + expected = 2 + result = pass.ComputeShards(0, float64(1_000_001)) + if result != expected { + t.Errorf(`expected computeShards(0, 1_000_001) to be %d, got %d`, expected, result) + } + + expected = 3 + result = pass.ComputeShards(0, float64(2_000_001)) + if result != expected { + t.Errorf(`expected computeShards(0, 2_000_001) to be %d, got %d`, expected, result) + } +} + +func TestShardComputationReturnsAtLeast1Shart(t *testing.T) { + pass := ShardingStrategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)} + + expected := 1 + result := pass.ComputeShards(0, 0) + if result != expected { + t.Errorf(`expected computeShards(0, 0) to be %d, got %d`, expected, result) + } + + expected = 1 + result = pass.ComputeShards(0, -5) + if result != expected { + t.Errorf(`expected computeShards(0, -5) to be %d, got %d`, expected, result) + } +} + +func TestShardComputationScaleDown(t *testing.T) { + pass := ShardingStrategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)} + expected := 2 + result := pass.ComputeShards(1, 1_000_001) + if result != expected { + t.Errorf(`expected computeShards(1, 1_000_001) to be %d, got %d`, expected, result) + } + + expected = 2 + result = pass.ComputeShards(2, 999_999) + if result != expected { + t.Errorf(`expected computeShards(2, 999_999) to be %d, got %d`, expected, result) + } + + expected = 2 + result = pass.ComputeShards(2, 800_001) + if result != expected { + t.Errorf(`expected computeShards(2, 800_001) to be %d, got %d`, expected, result) + } + + // threshold hit + expected = 1 + result = pass.ComputeShards(2, 800_000) + if result != expected { + t.Errorf(`expected computeShards(2, 800_000) to be %d, got %d`, expected, result) + } +} diff --git a/pkg/monitoring/prometheusagent/shards/shards.go b/pkg/monitoring/prometheusagent/shards/shards.go deleted file mode 100644 index 25394ff4..00000000 --- a/pkg/monitoring/prometheusagent/shards/shards.go +++ /dev/null @@ -1,28 +0,0 @@ -package shards - -import "math" - -const ( - shardStep = float64(1_000_000) - shardScaleDownPercentage = float64(0.20) - shardScaleDownThreshold = shardScaleDownPercentage * shardStep -) - -// We want to start with 1 prometheus-agent for each 1M time series with a scale down 25% threshold. -func ComputeShards(currentShardCount int, timeSeries float64) int { - desiredShardCount := int(math.Ceil(timeSeries / shardStep)) - - // Compute Scale Down - if currentShardCount > desiredShardCount { - // We get the rest of a division of timeSeries by shardStep and we compare it with the scale down threshold - if math.Mod(timeSeries, shardStep) > shardStep-shardScaleDownThreshold { - desiredShardCount = currentShardCount - } - } - - // We always have a minimum of 1 agent, even if there is no worker node - if desiredShardCount <= 0 { - return 1 - } - return desiredShardCount -} diff --git a/pkg/monitoring/prometheusagent/shards/shards_test.go b/pkg/monitoring/prometheusagent/shards/shards_test.go deleted file mode 100644 index 3043a5ec..00000000 --- a/pkg/monitoring/prometheusagent/shards/shards_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package shards - -import ( - "flag" - "testing" -) - -var _ = flag.Bool("update", false, "update the output file") - -func TestShardComputationScaleUp(t *testing.T) { - expected := 1 - result := ComputeShards(0, float64(1_000_000)) - if result != expected { - t.Errorf(`expected ComputeShards(0, 1_000_000) to be %d, got %d`, expected, result) - } - - expected = 2 - result = ComputeShards(0, float64(1_000_001)) - if result != expected { - t.Errorf(`expected ComputeShards(0, 1_000_001) to be %d, got %d`, expected, result) - } - - expected = 3 - result = ComputeShards(0, float64(2_000_001)) - if result != expected { - t.Errorf(`expected ComputeShards(0, 2_000_001) to be %d, got %d`, expected, result) - } -} - -func TestShardComputationReturnsAtLeast1Shart(t *testing.T) { - expected := 1 - result := ComputeShards(0, 0) - if result != expected { - t.Errorf(`expected ComputeShards(0, 0) to be %d, got %d`, expected, result) - } - - expected = 1 - result = ComputeShards(0, -5) - if result != expected { - t.Errorf(`expected ComputeShards(0, -5) to be %d, got %d`, expected, result) - } -} - -func TestShardComputationScaleDown(t *testing.T) { - expected := 2 - result := ComputeShards(1, 1_000_001) - if result != expected { - t.Errorf(`expected ComputeShards(1, 1_000_001) to be %d, got %d`, expected, result) - } - - expected = 2 - result = ComputeShards(2, 999_999) - if result != expected { - t.Errorf(`expected ComputeShards(2, 999_999) to be %d, got %d`, expected, result) - } - - expected = 2 - result = ComputeShards(2, 800_001) - if result != expected { - t.Errorf(`expected ComputeShards(2, 800_001) to be %d, got %d`, expected, result) - } - - // threshold hit - expected = 1 - result = ComputeShards(2, 800_000) - if result != expected { - t.Errorf(`expected ComputeShards(2, 800_000) to be %d, got %d`, expected, result) - } -}