Skip to content

Commit

Permalink
Add sharding strategy support
Browse files Browse the repository at this point in the history
Signed-off-by: QuentinBisson <[email protected]>
  • Loading branch information
QuentinBisson committed May 13, 2024
1 parent 80458ef commit 588c04b
Show file tree
Hide file tree
Showing 14 changed files with 230 additions and 118 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Add per cluster and installation overridable sharding strategy support for mimir-backed installations.

## [0.0.2] - 2024-04-08

### Fixed
Expand Down
5 changes: 5 additions & 0 deletions docs/monitoring/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Introduction

The Observability Operator is in charge of configuring the Prometheus Agent instances running in workload clusters like remote write configuration, [sharding](sharding.md) and so on.

TODO(atlas): Add documentation here.
24 changes: 24 additions & 0 deletions docs/monitoring/sharding.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@

# Prometheus Agent Sharding

To be able to ingest metrics without disrupting the workload running in the clusters, the observability operator decides on the number of running __prometheus agent shards__ for each workload cluster based . The number of shards depends on the __total number of time series__ stored for a given cluster.

__By default__, the operator configures a shard for every 1M time series present in Mimir for the workload cluster. To avoid scaling down too abruptly, we defined a scale down threshold of 20%.

As this default value was not enough to avoid workload disruptions, we added 2 ways to be able to override the scale up series count target and the scale down percentage.

1. Those values can be configured at the installation level by overriding the following values:

```yaml
monitoring:
sharding:
scaleUpSeriesCount: 1000000
scaleDownPercentage: 0.20
```
2. Those values can also be set per cluster using the following cluster annotations:
```yaml
monitoring.giantswarm.io/prometheus-agent-scale-up-series-count: 1000000
monitoring.giantswarm.io/prometheus-agent-scale-down-percentage: 0.20
```
2 changes: 2 additions & 0 deletions helm/observability-operator/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ spec:
- --management-cluster-pipeline={{ $.Values.managementCluster.pipeline }}
- --management-cluster-region={{ $.Values.managementCluster.region }}
- --monitoring-enabled={{ $.Values.monitoring.enabled }}
- --monitoring-sharding-scale-up-series-count={{ $.Values.monitoring.sharding.scaleUpSeriesCount }}
- --monitoring-sharding-scale-down-percentage={{ $.Values.monitoring.sharding.scaleDownPercentage }}
{{- if .Values.monitoring.prometheusVersion }}
- --prometheus-version={{ $.Values.monitoring.prometheusVersion }}
{{- end }}
Expand Down
3 changes: 3 additions & 0 deletions helm/observability-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ monitoring:
enabled: false
opsgenieApiKey: ""
prometheusVersion: ""
sharding:
scaleUpSeriesCount: 1000000
scaleDownPercentage: 0.20

operator:
# -- Configures the resources for the operator deployment
Expand Down
6 changes: 3 additions & 3 deletions internal/controller/cluster_monitoring_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type ClusterMonitoringReconciler struct {
prometheusagent.PrometheusAgentService
// HeartbeatRepository is the repository for managing heartbeats.
heartbeat.HeartbeatRepository
// MonitoringEnabled defines whether monitoring is enabled at the installation level.
MonitoringEnabled bool
// MonitoringConfig is the configuration for the monitoring package.
MonitoringConfig monitoring.Config
}

// SetupWithManager sets up the controller with the Manager.
Expand Down Expand Up @@ -81,7 +81,7 @@ func (r *ClusterMonitoringReconciler) Reconcile(ctx context.Context, req ctrl.Re
}

// Handle deletion reconciliation loop.
if !cluster.ObjectMeta.DeletionTimestamp.IsZero() || !r.MonitoringEnabled {
if !cluster.ObjectMeta.DeletionTimestamp.IsZero() || !r.MonitoringConfig.Enabled {
logger.Info("Handling deletion for Cluster", "cluster", cluster.Name)
return r.reconcileDelete(ctx, cluster)
}
Expand Down
35 changes: 25 additions & 10 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/giantswarm/observability-operator/pkg/common"
"github.com/giantswarm/observability-operator/pkg/common/organization"
"github.com/giantswarm/observability-operator/pkg/common/password"
"github.com/giantswarm/observability-operator/pkg/monitoring"
"github.com/giantswarm/observability-operator/pkg/monitoring/heartbeat"
"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent"
//+kubebuilder:scaffold:imports
Expand All @@ -50,19 +51,23 @@ var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")

metricsAddr string
enableLeaderElection bool
probeAddr string
secureMetrics bool
enableHTTP2 bool
metricsAddr string
enableLeaderElection bool
probeAddr string
secureMetrics bool
enableHTTP2 bool

managementClusterBaseDomain string
managementClusterCustomer string
managementClusterInsecureCA bool
managementClusterName string
managementClusterPipeline string
managementClusterRegion string
monitoringEnabled bool
prometheusVersion string

monitoringEnabled bool
monitoringShardingScaleUpSeriesCount float64
monitoringShardingScaleDownPercentage float64
prometheusVersion string
)

const (
Expand Down Expand Up @@ -102,6 +107,10 @@ func main() {
"The region of the management cluster.")
flag.BoolVar(&monitoringEnabled, "monitoring-enabled", false,
"Enable monitoring at the management cluster level.")
flag.Float64Var(&monitoringShardingScaleUpSeriesCount, "monitoring-sharding-scale-up-series-count", 0,
"Configures the number of time series needed to add an extra prometheus agent shard.")
flag.Float64Var(&monitoringShardingScaleDownPercentage, "monitoring-sharding-scale-down-percentage", 0,
"Configures the percentage of series that need to be removed to be able to scale down the number of prometheus agent shards.")
flag.StringVar(&prometheusVersion, "prometheus-version", "",
"The version of Prometheus Agents to deploy.")
opts := zap.Options{
Expand Down Expand Up @@ -186,21 +195,27 @@ func main() {

organizationRepository := organization.NewNamespaceRepository(mgr.GetClient())

// TODO(atlas): validate prometheus version
monitoringConfig := monitoring.Config{
Enabled: monitoringEnabled,
ShardingScaleUpSeriesCount: monitoringShardingScaleUpSeriesCount,
ShardingScaleDownPercentage: monitoringShardingScaleDownPercentage,
PrometheusVersion: prometheusVersion,
}

prometheusAgentService := prometheusagent.PrometheusAgentService{
Client: mgr.GetClient(),
OrganizationRepository: organizationRepository,
PasswordManager: password.SimpleManager{},
ManagementCluster: managementCluster,
PrometheusVersion: prometheusVersion,
MonitoringConfig: monitoringConfig,
}

if err = (&controller.ClusterMonitoringReconciler{
Client: mgr.GetClient(),
ManagementCluster: managementCluster,
HeartbeatRepository: heartbeatRepository,
PrometheusAgentService: prometheusAgentService,
MonitoringEnabled: monitoringEnabled,
MonitoringConfig: monitoringConfig,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Cluster")
os.Exit(1)
Expand Down
10 changes: 10 additions & 0 deletions pkg/monitoring/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package monitoring

// Config represents the configuration used by the monitoring package.
type Config struct {
Enabled bool
ShardingScaleUpSeriesCount float64
ShardingScaleDownPercentage float64
// TODO(atlas): validate prometheus version
PrometheusVersion string
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"strconv"

"github.com/go-logr/logr"
"github.com/pkg/errors"
Expand All @@ -15,7 +16,7 @@ import (
"github.com/giantswarm/observability-operator/pkg/common"
"github.com/giantswarm/observability-operator/pkg/monitoring"
"github.com/giantswarm/observability-operator/pkg/monitoring/mimir/querier"
"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/shards"
"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/sharding"
)

func (pas PrometheusAgentService) buildRemoteWriteConfig(ctx context.Context,
Expand Down Expand Up @@ -45,7 +46,7 @@ func (pas PrometheusAgentService) buildRemoteWriteConfig(ctx context.Context,
"service_priority": getServicePriority(cluster),
}

shards, err := getShardsCountForCluster(ctx, cluster, currentShards)
shards, err := pas.getShardsCountForCluster(ctx, cluster, currentShards)
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -54,10 +55,10 @@ func (pas PrometheusAgentService) buildRemoteWriteConfig(ctx context.Context,
PrometheusAgentConfig: &PrometheusAgentConfig{
ExternalLabels: externalLabels,
Image: &PrometheusAgentImage{
Tag: pas.PrometheusVersion,
Tag: pas.MonitoringConfig.PrometheusVersion,
},
Shards: shards,
Version: pas.PrometheusVersion,
Version: pas.MonitoringConfig.PrometheusVersion,
},
})
if err != nil {
Expand Down Expand Up @@ -96,17 +97,45 @@ func getServicePriority(cluster *clusterv1.Cluster) string {
}

// We want to compute the number of shards based on the number of series.
func getShardsCountForCluster(ctx context.Context, cluster *clusterv1.Cluster, currentShardCount int) (int, error) {
func (pas PrometheusAgentService) getShardsCountForCluster(ctx context.Context, cluster *clusterv1.Cluster, currentShardCount int) (int, error) {
clusterShardingStrategy, err := getClusterShardingStrategy(cluster)
if err != nil {
return 0, errors.WithStack(err)
}

shardingStrategy := sharding.ShardingStrategy{
ScaleUpSeriesCount: pas.MonitoringConfig.ShardingScaleUpSeriesCount,
ScaleDownPercentage: pas.MonitoringConfig.ShardingScaleDownPercentage,
}.Merge(clusterShardingStrategy)
headSeries, err := querier.QueryTSDBHeadSeries(ctx, cluster.Name)
if err != nil {
// Verify that Prometheus is accessible. If not, return the default number of shards.
var dnsError *net.DNSError
if errors.As(err, &dnsError) {
return shards.ComputeShards(currentShardCount, defaultShards), nil
return shardingStrategy.ComputeShards(currentShardCount, defaultShards), nil
}
return 0, errors.WithStack(err)
}
return shards.ComputeShards(currentShardCount, headSeries), nil
return shardingStrategy.ComputeShards(currentShardCount, headSeries), nil
}

func getClusterShardingStrategy(cluster metav1.Object) (*sharding.ShardingStrategy, error) {
var err error
var scaleUpSeriesCount, scaleDownPercentage float64
if value, ok := cluster.GetAnnotations()["monitoring.giantswarm.io/prometheus-agent-scale-up-series-count"]; ok {
if scaleUpSeriesCount, err = strconv.ParseFloat(value, 64); err != nil {
return nil, err
}
}
if value, ok := cluster.GetAnnotations()["monitoring.giantswarm.io/prometheus-agent-scale-down-percentage"]; ok {
if scaleDownPercentage, err = strconv.ParseFloat(value, 64); err != nil {
return nil, err
}
}
return &sharding.ShardingStrategy{
ScaleUpSeriesCount: scaleUpSeriesCount,
ScaleDownPercentage: scaleDownPercentage,
}, nil
}

func readCurrentShardsFromConfig(configMap corev1.ConfigMap) (int, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/monitoring/prometheusagent/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type PrometheusAgentService struct {
organization.OrganizationRepository
PasswordManager password.Manager
common.ManagementCluster
PrometheusVersion string
MonitoringConfig monitoring.Config
}

// ReconcileRemoteWriteConfiguration ensures that the prometheus remote write config is present in the cluster.
Expand Down
46 changes: 46 additions & 0 deletions pkg/monitoring/prometheusagent/sharding/sharding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package sharding

import "math"

type ShardingStrategy struct {
// Configures the number of series needed to add a new shard. Computation is number of series / ScaleUpSeriesCount
ScaleUpSeriesCount float64
// Percentage of needed series based on ScaleUpSeriesCount to scale down agents
ScaleDownPercentage float64
}

func (pass1 ShardingStrategy) Merge(pass2 *ShardingStrategy) ShardingStrategy {
strategy := ShardingStrategy{
pass1.ScaleUpSeriesCount,
pass1.ScaleDownPercentage,
}
if pass2 != nil {
if pass2.ScaleUpSeriesCount > 0 {
strategy.ScaleUpSeriesCount = pass2.ScaleUpSeriesCount
}
if pass2.ScaleDownPercentage > 0 {
strategy.ScaleDownPercentage = pass2.ScaleDownPercentage
}
}
return strategy
}

// We want to start with 1 prometheus-agent for each 1M time series with a scale down 20% threshold.
func (pass ShardingStrategy) ComputeShards(currentShardCount int, timeSeries float64) int {
shardScaleDownThreshold := pass.ScaleDownPercentage * pass.ScaleUpSeriesCount
desiredShardCount := int(math.Ceil(timeSeries / pass.ScaleUpSeriesCount))

// Compute Scale Down
if currentShardCount > desiredShardCount {
// We get the rest of a division of timeSeries by shardStep and we compare it with the scale down threshold
if math.Mod(timeSeries, pass.ScaleUpSeriesCount) > pass.ScaleUpSeriesCount-shardScaleDownThreshold {
desiredShardCount = currentShardCount
}
}

// We always have a minimum of 1 agent, even if there is no worker node
if desiredShardCount <= 0 {
return 1
}
return desiredShardCount
}
71 changes: 71 additions & 0 deletions pkg/monitoring/prometheusagent/sharding/sharding_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package sharding

import (
"testing"
)

func TestShardComputationScaleUp(t *testing.T) {
pass := ShardingStrategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)}

expected := 1
result := pass.ComputeShards(0, float64(1_000_000))
if result != expected {
t.Errorf(`expected computeShards(0, 1_000_000) to be %d, got %d`, expected, result)
}

expected = 2
result = pass.ComputeShards(0, float64(1_000_001))
if result != expected {
t.Errorf(`expected computeShards(0, 1_000_001) to be %d, got %d`, expected, result)
}

expected = 3
result = pass.ComputeShards(0, float64(2_000_001))
if result != expected {
t.Errorf(`expected computeShards(0, 2_000_001) to be %d, got %d`, expected, result)
}
}

func TestShardComputationReturnsAtLeast1Shart(t *testing.T) {
pass := ShardingStrategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)}

expected := 1
result := pass.ComputeShards(0, 0)
if result != expected {
t.Errorf(`expected computeShards(0, 0) to be %d, got %d`, expected, result)
}

expected = 1
result = pass.ComputeShards(0, -5)
if result != expected {
t.Errorf(`expected computeShards(0, -5) to be %d, got %d`, expected, result)
}
}

func TestShardComputationScaleDown(t *testing.T) {
pass := ShardingStrategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)}
expected := 2
result := pass.ComputeShards(1, 1_000_001)
if result != expected {
t.Errorf(`expected computeShards(1, 1_000_001) to be %d, got %d`, expected, result)
}

expected = 2
result = pass.ComputeShards(2, 999_999)
if result != expected {
t.Errorf(`expected computeShards(2, 999_999) to be %d, got %d`, expected, result)
}

expected = 2
result = pass.ComputeShards(2, 800_001)
if result != expected {
t.Errorf(`expected computeShards(2, 800_001) to be %d, got %d`, expected, result)
}

// threshold hit
expected = 1
result = pass.ComputeShards(2, 800_000)
if result != expected {
t.Errorf(`expected computeShards(2, 800_000) to be %d, got %d`, expected, result)
}
}
Loading

0 comments on commit 588c04b

Please sign in to comment.