Skip to content

Commit

Permalink
Fix review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
QuentinBisson committed Jun 10, 2024
1 parent cb7327e commit efa2965
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 33 deletions.
2 changes: 1 addition & 1 deletion docs/monitoring/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

The Observability Operator is in charge of configuring the Prometheus Agent instances running in workload clusters like remote write configuration, [sharding](sharding.md) and so on.

TODO(atlas): Add documentation here.
TODO(atlas): Add operator specific documentation here ("sequence diagrams", list of created and managed resources)
13 changes: 8 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/giantswarm/observability-operator/pkg/monitoring/heartbeat"
"github.com/giantswarm/observability-operator/pkg/monitoring/mimir"
"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent"
"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/sharding"
//+kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -197,10 +198,12 @@ func main() {
organizationRepository := organization.NewNamespaceRepository(mgr.GetClient())

monitoringConfig := monitoring.Config{
Enabled: monitoringEnabled,
ShardingScaleUpSeriesCount: monitoringShardingScaleUpSeriesCount,
ShardingScaleDownPercentage: monitoringShardingScaleDownPercentage,
PrometheusVersion: prometheusVersion,
Enabled: monitoringEnabled,
DefaultShardingStrategy: sharding.Strategy{
ScaleUpSeriesCount: monitoringShardingScaleUpSeriesCount,
ScaleDownPercentage: monitoringShardingScaleDownPercentage,
},
PrometheusVersion: prometheusVersion,
}

prometheusAgentService := prometheusagent.PrometheusAgentService{
Expand All @@ -223,7 +226,7 @@ func main() {
HeartbeatRepository: heartbeatRepository,
PrometheusAgentService: prometheusAgentService,
MimirService: mimirService,
MonitoringConfig: monitoringConfig,
MonitoringConfig: monitoringConfig,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Cluster")
os.Exit(1)
Expand Down
9 changes: 5 additions & 4 deletions pkg/monitoring/config.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package monitoring

import "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/sharding"

// Config represents the configuration used by the monitoring package.
type Config struct {
Enabled bool
ShardingScaleUpSeriesCount float64
ShardingScaleDownPercentage float64
// TODO(atlas): validate prometheus version
Enabled bool
DefaultShardingStrategy sharding.Strategy
// TODO(atlas): validate prometheus version using SemVer
PrometheusVersion string
}
15 changes: 6 additions & 9 deletions pkg/monitoring/prometheusagent/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,29 +96,26 @@ func getServicePriority(cluster *clusterv1.Cluster) string {
func (pas PrometheusAgentService) getShardsCountForCluster(
ctx context.Context, cluster *clusterv1.Cluster, currentShardCount int) (int, error) {

clusterShardingStrategy, err := getClusterShardingStrategy(cluster)
clusterStrategy, err := getClusterStrategy(cluster)
if err != nil {
return 0, errors.WithStack(err)
}

shardingStrategy := sharding.ShardingStrategy{
ScaleUpSeriesCount: pas.MonitoringConfig.ShardingScaleUpSeriesCount,
ScaleDownPercentage: pas.MonitoringConfig.ShardingScaleDownPercentage,
}.Merge(clusterShardingStrategy)
Strategy := pas.MonitoringConfig.DefaultShardingStrategy.Merge(clusterStrategy)
headSeries, err := querier.QueryTSDBHeadSeries(ctx, cluster.Name)
if err != nil {
// If Prometheus is not accessible (DNSError), or if we don't have any data yet (ErrNoTimeSeries)
// Then, return the default number of shards.
var dnsError *net.DNSError
if errors.As(err, &dnsError) || errors.Is(err, querier.ErrorNoTimeSeries) {
return shardingStrategy.ComputeShards(currentShardCount, defaultShards), nil
return Strategy.ComputeShards(currentShardCount, defaultShards), nil
}
return 0, errors.WithStack(err)
}
return shardingStrategy.ComputeShards(currentShardCount, headSeries), nil
return Strategy.ComputeShards(currentShardCount, headSeries), nil
}

func getClusterShardingStrategy(cluster metav1.Object) (*sharding.ShardingStrategy, error) {
func getClusterStrategy(cluster metav1.Object) (*sharding.Strategy, error) {
var err error
var scaleUpSeriesCount, scaleDownPercentage float64
if value, ok := cluster.GetAnnotations()["monitoring.giantswarm.io/prometheus-agent-scale-up-series-count"]; ok {
Expand All @@ -131,7 +128,7 @@ func getClusterShardingStrategy(cluster metav1.Object) (*sharding.ShardingStrate
return nil, err
}
}
return &sharding.ShardingStrategy{
return &sharding.Strategy{
ScaleUpSeriesCount: scaleUpSeriesCount,
ScaleDownPercentage: scaleDownPercentage,
}, nil
Expand Down
22 changes: 11 additions & 11 deletions pkg/monitoring/prometheusagent/sharding/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,31 @@ package sharding

import "math"

type ShardingStrategy struct {
type Strategy struct {
// Configures the number of series needed to add a new shard. Computation is number of series / ScaleUpSeriesCount
ScaleUpSeriesCount float64
// Percentage of needed series based on ScaleUpSeriesCount to scale down agents
ScaleDownPercentage float64
}

func (pass1 ShardingStrategy) Merge(pass2 *ShardingStrategy) ShardingStrategy {
strategy := ShardingStrategy{
pass1.ScaleUpSeriesCount,
pass1.ScaleDownPercentage,
func (s Strategy) Merge(newStrategy *Strategy) Strategy {
strategy := Strategy{
s.ScaleUpSeriesCount,
s.ScaleDownPercentage,
}
if pass2 != nil {
if pass2.ScaleUpSeriesCount > 0 {
strategy.ScaleUpSeriesCount = pass2.ScaleUpSeriesCount
if newStrategy != nil {
if newStrategy.ScaleUpSeriesCount > 0 {
strategy.ScaleUpSeriesCount = newStrategy.ScaleUpSeriesCount
}
if pass2.ScaleDownPercentage > 0 {
strategy.ScaleDownPercentage = pass2.ScaleDownPercentage
if newStrategy.ScaleDownPercentage > 0 {
strategy.ScaleDownPercentage = newStrategy.ScaleDownPercentage
}
}
return strategy
}

// We want to start with 1 prometheus-agent for each 1M time series with a scale down 20% threshold.
func (pass ShardingStrategy) ComputeShards(currentShardCount int, timeSeries float64) int {
func (pass Strategy) ComputeShards(currentShardCount int, timeSeries float64) int {
shardScaleDownThreshold := pass.ScaleDownPercentage * pass.ScaleUpSeriesCount
desiredShardCount := int(math.Ceil(timeSeries / pass.ScaleUpSeriesCount))

Expand Down
6 changes: 3 additions & 3 deletions pkg/monitoring/prometheusagent/sharding/sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

func TestShardComputationScaleUp(t *testing.T) {
pass := ShardingStrategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)}
pass := Strategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)}

expected := 1
result := pass.ComputeShards(0, float64(1_000_000))
Expand All @@ -27,7 +27,7 @@ func TestShardComputationScaleUp(t *testing.T) {
}

func TestShardComputationReturnsAtLeast1Shart(t *testing.T) {
pass := ShardingStrategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)}
pass := Strategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)}

expected := 1
result := pass.ComputeShards(0, 0)
Expand All @@ -43,7 +43,7 @@ func TestShardComputationReturnsAtLeast1Shart(t *testing.T) {
}

func TestShardComputationScaleDown(t *testing.T) {
pass := ShardingStrategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)}
pass := Strategy{ScaleUpSeriesCount: float64(1_000_000), ScaleDownPercentage: float64(0.20)}
expected := 2
result := pass.ComputeShards(1, 1_000_001)
if result != expected {
Expand Down

0 comments on commit efa2965

Please sign in to comment.