
Add sharding strategy support #20

Merged: 9 commits, Jun 25, 2024
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Add per-cluster and per-installation overridable sharding strategy support for Mimir-backed installations.

### Fixed

- Fix an issue where the remote-write secret was not created when the head series query fails.
@@ -19,7 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [0.1.0] - 2024-06-06

### Add
### Added

- Add support for mimir in remoteWrite secret creation.
- Add mimir ingress secret for basic auth creation.
5 changes: 5 additions & 0 deletions docs/monitoring/README.md
@@ -0,0 +1,5 @@
# Introduction

The Observability Operator is in charge of configuring the Prometheus Agent instances running in workload clusters: remote write configuration, [sharding](sharding.md), and so on.

TODO(atlas): Add operator specific documentation here ("sequence diagrams", list of created and managed resources)
23 changes: 23 additions & 0 deletions docs/monitoring/sharding.md
@@ -0,0 +1,23 @@
# Prometheus Agent Sharding

To ingest metrics without disrupting the workloads running in the clusters, the observability operator chooses the number of __prometheus agent shards__ to run on each workload cluster. The number of shards is based on the __total number of time series__ ingested for a given cluster.

__By default__, the operator configures 1 shard for every 1M time series present in Mimir for the workload cluster. To avoid scaling down too abruptly, a scale-down threshold of 20% is applied: for example, with these defaults the operator only scales a cluster down from 3 shards to 2 once its series count drops to 1.8M or below (20% under the 2M boundary).

Both the scale-up series count and the scale-down percentage can be overridden, as shown below.

1. These values can be configured at the installation level by overriding the following Helm values:

```yaml
monitoring:
sharding:
scaleUpSeriesCount: 1000000
scaleDownPercentage: 0.20
```

2. These values can also be set per cluster using the following cluster annotations (annotation values are strings, so they must be quoted; the override precedence is illustrated in the sketch after this list):

```yaml
monitoring.giantswarm.io/prometheus-agent-scale-up-series-count: "1000000"
monitoring.giantswarm.io/prometheus-agent-scale-down-percentage: "0.20"
```
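
As an illustration of the override precedence (not part of this PR's docs): a per-cluster annotation wins over the installation-level default whenever it is set to a value greater than zero, which follows from the `Strategy.Merge` logic added further down in this PR. A minimal sketch, using the package path added in this PR:

```go
package main

import (
	"fmt"

	"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/sharding"
)

func main() {
	// Installation-level defaults (mirroring the Helm values above).
	defaults := sharding.Strategy{ScaleUpSeriesCount: 1_000_000, ScaleDownPercentage: 0.20}

	// Per-cluster override, as would be parsed from the cluster annotations; only the
	// scale-up series count is set, so the scale-down percentage keeps its default.
	override := &sharding.Strategy{ScaleUpSeriesCount: 2_000_000}

	effective := defaults.Merge(override)
	fmt.Printf("%+v\n", effective) // {ScaleUpSeriesCount:2e+06 ScaleDownPercentage:0.2}
}
```
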
2 changes: 2 additions & 0 deletions helm/observability-operator/templates/deployment.yaml
@@ -31,6 +31,8 @@ spec:
- --management-cluster-pipeline={{ $.Values.managementCluster.pipeline }}
- --management-cluster-region={{ $.Values.managementCluster.region }}
- --monitoring-enabled={{ $.Values.monitoring.enabled }}
- --monitoring-sharding-scale-up-series-count={{ $.Values.monitoring.sharding.scaleUpSeriesCount }}
- --monitoring-sharding-scale-down-percentage={{ $.Values.monitoring.sharding.scaleDownPercentage }}
{{- if .Values.monitoring.prometheusVersion }}
- --prometheus-version={{ $.Values.monitoring.prometheusVersion }}
{{- end }}
3 changes: 3 additions & 0 deletions helm/observability-operator/values.yaml
@@ -19,6 +19,9 @@ monitoring:
enabled: false
opsgenieApiKey: ""
prometheusVersion: ""
sharding:
scaleUpSeriesCount: 1000000
scaleDownPercentage: 0.20

operator:
# -- Configures the resources for the operator deployment
10 changes: 5 additions & 5 deletions internal/controller/cluster_monitoring_controller.go
@@ -48,8 +48,8 @@ type ClusterMonitoringReconciler struct {
heartbeat.HeartbeatRepository
// MimirService is the service for managing mimir configuration.
mimir.MimirService
// MonitoringEnabled defines whether monitoring is enabled at the installation level.
MonitoringEnabled bool
// MonitoringConfig is the configuration for the monitoring package.
MonitoringConfig monitoring.Config
}

// SetupWithManager sets up the controller with the Manager.
@@ -82,12 +82,12 @@ func (r *ClusterMonitoringReconciler) Reconcile(ctx context.Context, req ctrl.Re
return ctrl.Result{}, errors.WithStack(err)
}

// Linting is disabled for the 2 following lines as otherwise it fails with the following error:
// Linting is disabled for the following line as otherwise it fails with the following error:
// "should not use built-in type string as key for value"
logger := log.FromContext(ctx).WithValues("cluster", cluster.Name).WithValues("installation", r.ManagementCluster.Name) // nolint
logger := log.FromContext(ctx).WithValues("installation", r.ManagementCluster.Name) // nolint
ctx = log.IntoContext(ctx, logger)

if !r.MonitoringEnabled {
if !r.MonitoringConfig.Enabled {
logger.Info("Monitoring is disabled at the installation level")
return ctrl.Result{}, nil
}
38 changes: 28 additions & 10 deletions main.go
@@ -41,29 +41,35 @@ import (
"github.com/giantswarm/observability-operator/pkg/common"
"github.com/giantswarm/observability-operator/pkg/common/organization"
"github.com/giantswarm/observability-operator/pkg/common/password"
"github.com/giantswarm/observability-operator/pkg/monitoring"
"github.com/giantswarm/observability-operator/pkg/monitoring/heartbeat"
"github.com/giantswarm/observability-operator/pkg/monitoring/mimir"
"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent"
"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/sharding"
//+kubebuilder:scaffold:imports
)

var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")

metricsAddr string
enableLeaderElection bool
probeAddr string
secureMetrics bool
enableHTTP2 bool
metricsAddr string
enableLeaderElection bool
probeAddr string
secureMetrics bool
enableHTTP2 bool

managementClusterBaseDomain string
managementClusterCustomer string
managementClusterInsecureCA bool
managementClusterName string
managementClusterPipeline string
managementClusterRegion string
monitoringEnabled bool
prometheusVersion string

monitoringEnabled bool
monitoringShardingScaleUpSeriesCount float64
monitoringShardingScaleDownPercentage float64
prometheusVersion string
)

const (
@@ -103,6 +109,10 @@ func main() {
"The region of the management cluster.")
flag.BoolVar(&monitoringEnabled, "monitoring-enabled", false,
"Enable monitoring at the management cluster level.")
flag.Float64Var(&monitoringShardingScaleUpSeriesCount, "monitoring-sharding-scale-up-series-count", 0,
"Configures the number of time series needed to add an extra prometheus agent shard.")
flag.Float64Var(&monitoringShardingScaleDownPercentage, "monitoring-sharding-scale-down-percentage", 0,
"Configures the percentage of removed series to scale down the number of prometheus agent shards.")
flag.StringVar(&prometheusVersion, "prometheus-version", "",
"The version of Prometheus Agents to deploy.")
opts := zap.Options{
@@ -187,13 +197,21 @@ func main() {

organizationRepository := organization.NewNamespaceRepository(mgr.GetClient())

// TODO(atlas): validate prometheus version
monitoringConfig := monitoring.Config{
Enabled: monitoringEnabled,
DefaultShardingStrategy: sharding.Strategy{
ScaleUpSeriesCount: monitoringShardingScaleUpSeriesCount,
ScaleDownPercentage: monitoringShardingScaleDownPercentage,
},
PrometheusVersion: prometheusVersion,
}

prometheusAgentService := prometheusagent.PrometheusAgentService{
Client: mgr.GetClient(),
OrganizationRepository: organizationRepository,
PasswordManager: password.SimpleManager{},
ManagementCluster: managementCluster,
PrometheusVersion: prometheusVersion,
MonitoringConfig: monitoringConfig,
}

mimirService := mimir.MimirService{
@@ -208,7 +226,7 @@ func main() {
HeartbeatRepository: heartbeatRepository,
PrometheusAgentService: prometheusAgentService,
MimirService: mimirService,
MonitoringEnabled: monitoringEnabled,
MonitoringConfig: monitoringConfig,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Cluster")
os.Exit(1)
11 changes: 11 additions & 0 deletions pkg/monitoring/config.go
@@ -0,0 +1,11 @@
package monitoring

import "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/sharding"

// Config represents the configuration used by the monitoring package.
type Config struct {
Enabled bool
DefaultShardingStrategy sharding.Strategy
// TODO(atlas): validate prometheus version using SemVer
PrometheusVersion string
}
@@ -3,6 +3,7 @@ package prometheusagent
import (
"context"
"fmt"
"strconv"

"github.com/go-logr/logr"
"github.com/pkg/errors"
@@ -14,7 +15,7 @@ import (
"github.com/giantswarm/observability-operator/pkg/common"
"github.com/giantswarm/observability-operator/pkg/metrics"
"github.com/giantswarm/observability-operator/pkg/monitoring/mimir/querier"
"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/shards"
"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/sharding"
)

func (pas PrometheusAgentService) buildRemoteWriteConfig(ctx context.Context,
@@ -50,16 +51,23 @@ func (pas PrometheusAgentService) buildRemoteWriteConfig(ctx context.Context,
logger.Error(err, "failed to query head series")
metrics.MimirQueryErrors.WithLabelValues().Inc()
}
shards := shards.ComputeShards(currentShards, headSeries)

clusterShardingStrategy, err := getClusterShardingStrategy(cluster)
if err != nil {
return nil, errors.WithStack(err)
}

shardingStrategy := pas.MonitoringConfig.DefaultShardingStrategy.Merge(clusterShardingStrategy)
shards := shardingStrategy.ComputeShards(currentShards, headSeries)

config, err := yaml.Marshal(RemoteWriteConfig{
PrometheusAgentConfig: &PrometheusAgentConfig{
ExternalLabels: externalLabels,
Image: &PrometheusAgentImage{
Tag: pas.PrometheusVersion,
Tag: pas.MonitoringConfig.PrometheusVersion,
},
Shards: shards,
Version: pas.PrometheusVersion,
Version: pas.MonitoringConfig.PrometheusVersion,
},
})
if err != nil {
@@ -94,6 +102,25 @@ func getServicePriority(cluster *clusterv1.Cluster) string {
return defaultServicePriority
}

func getClusterShardingStrategy(cluster metav1.Object) (*sharding.Strategy, error) {
var err error
var scaleUpSeriesCount, scaleDownPercentage float64
if value, ok := cluster.GetAnnotations()["monitoring.giantswarm.io/prometheus-agent-scale-up-series-count"]; ok {
if scaleUpSeriesCount, err = strconv.ParseFloat(value, 64); err != nil {
return nil, err
}
}
if value, ok := cluster.GetAnnotations()["monitoring.giantswarm.io/prometheus-agent-scale-down-percentage"]; ok {
if scaleDownPercentage, err = strconv.ParseFloat(value, 64); err != nil {
return nil, err
}
}
return &sharding.Strategy{
ScaleUpSeriesCount: scaleUpSeriesCount,
ScaleDownPercentage: scaleDownPercentage,
}, nil
}

func readCurrentShardsFromConfig(configMap corev1.ConfigMap) (int, error) {
remoteWriteConfig := RemoteWriteConfig{}
err := yaml.Unmarshal([]byte(configMap.Data["values"]), &remoteWriteConfig)
7 changes: 4 additions & 3 deletions pkg/monitoring/prometheusagent/service.go
@@ -15,21 +15,22 @@ import (
"github.com/giantswarm/observability-operator/pkg/common"
"github.com/giantswarm/observability-operator/pkg/common/organization"
"github.com/giantswarm/observability-operator/pkg/common/password"
"github.com/giantswarm/observability-operator/pkg/monitoring"
)

type PrometheusAgentService struct {
client.Client
organization.OrganizationRepository
PasswordManager password.Manager
common.ManagementCluster
PrometheusVersion string
MonitoringConfig monitoring.Config
}

// ReconcileRemoteWriteConfiguration ensures that the prometheus remote write config is present in the cluster.
func (pas *PrometheusAgentService) ReconcileRemoteWriteConfiguration(
ctx context.Context, cluster *clusterv1.Cluster) error {

logger := log.FromContext(ctx).WithValues("cluster", cluster.Name)
logger := log.FromContext(ctx)
logger.Info("ensuring prometheus agent remote write configmap and secret")

err := pas.createOrUpdateConfigMap(ctx, cluster, logger)
@@ -140,7 +141,7 @@ func (pas PrometheusAgentService) createOrUpdateSecret(ctx context.Context,
func (pas *PrometheusAgentService) DeleteRemoteWriteConfiguration(
ctx context.Context, cluster *clusterv1.Cluster) error {

logger := log.FromContext(ctx).WithValues("cluster", cluster.Name)
logger := log.FromContext(ctx)
logger.Info("deleting prometheus agent remote write configmap and secret")

err := pas.deleteConfigMap(ctx, cluster)
46 changes: 46 additions & 0 deletions pkg/monitoring/prometheusagent/sharding/sharding.go
@@ -0,0 +1,46 @@
package sharding

import "math"

type Strategy struct {
// Configures the number of series needed to add a new shard. Computation is number of series / ScaleUpSeriesCount
ScaleUpSeriesCount float64
// Percentage of needed series based on ScaleUpSeriesCount to scale down agents
ScaleDownPercentage float64
}

func (s Strategy) Merge(newStrategy *Strategy) Strategy {
strategy := Strategy{
s.ScaleUpSeriesCount,
s.ScaleDownPercentage,
}
if newStrategy != nil {
if newStrategy.ScaleUpSeriesCount > 0 {
strategy.ScaleUpSeriesCount = newStrategy.ScaleUpSeriesCount
}
if newStrategy.ScaleDownPercentage > 0 {
strategy.ScaleDownPercentage = newStrategy.ScaleDownPercentage
}
}
return strategy
}

// We want to start with 1 prometheus-agent for each 1M time series with a scale down 20% threshold.
func (pass Strategy) ComputeShards(currentShardCount int, timeSeries float64) int {
shardScaleDownThreshold := pass.ScaleDownPercentage * pass.ScaleUpSeriesCount
desiredShardCount := int(math.Ceil(timeSeries / pass.ScaleUpSeriesCount))

// Compute Scale Down
if currentShardCount > desiredShardCount {
// Keep the current shard count if the remainder (timeSeries mod ScaleUpSeriesCount) is within the scale-down threshold of the next shard boundary.
if math.Mod(timeSeries, pass.ScaleUpSeriesCount) > pass.ScaleUpSeriesCount-shardScaleDownThreshold {
desiredShardCount = currentShardCount
}
}

// We always have a minimum of 1 agent, even if there is no worker node
if desiredShardCount <= 0 {
return 1
}
return desiredShardCount
}
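
For a quick sanity check of the arithmetic (illustrative only, not part of the PR), here is a minimal sketch exercising `ComputeShards` with the default strategy; the expected results in the comments are derived from the code above:

```go
package main

import (
	"fmt"

	"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/sharding"
)

func main() {
	// Default strategy: 1 shard per 1M series, 20% scale-down threshold.
	s := sharding.Strategy{ScaleUpSeriesCount: 1_000_000, ScaleDownPercentage: 0.20}

	fmt.Println(s.ComputeShards(1, 2_500_000)) // 3: ceil(2.5M / 1M)
	fmt.Println(s.ComputeShards(3, 1_900_000)) // 3: remainder 900k is within the 20% threshold, no scale-down
	fmt.Println(s.ComputeShards(3, 1_700_000)) // 2: remainder 700k is outside the threshold, scale down
	fmt.Println(s.ComputeShards(0, 0))         // 1: always at least one shard
}
```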