diff --git a/go.mod b/go.mod index a79cc519..665210e7 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,12 @@ require ( github.com/onsi/gomega v1.32.0 github.com/opsgenie/opsgenie-go-sdk-v2 v1.2.22 github.com/pkg/errors v0.9.1 + github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.72.0 + github.com/prometheus/client_golang v1.19.0 + github.com/prometheus/common v0.51.1 github.com/sirupsen/logrus v1.9.0 + gopkg.in/yaml.v2 v2.4.0 + k8s.io/api v0.29.3 k8s.io/apimachinery v0.29.3 k8s.io/client-go v0.29.3 sigs.k8s.io/cluster-api v1.6.3 @@ -44,9 +49,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/prometheus/client_golang v1.19.0 // indirect github.com/prometheus/client_model v0.6.0 // indirect - github.com/prometheus/common v0.51.1 // indirect github.com/prometheus/procfs v0.13.0 // indirect github.com/spf13/pflag v1.0.5 // indirect go.uber.org/multierr v1.11.0 // indirect @@ -63,9 +66,7 @@ require ( google.golang.org/appengine v1.6.8 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.29.3 // indirect k8s.io/apiextensions-apiserver v0.29.3 // indirect k8s.io/component-base v0.29.3 // indirect k8s.io/klog/v2 v2.120.1 // indirect diff --git a/go.sum b/go.sum index 56fe11da..cd2994d2 100644 --- a/go.sum +++ b/go.sum @@ -59,6 +59,8 @@ github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0 
h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -77,6 +79,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/onsi/ginkgo/v2 v2.17.1 h1:V++EzdbhI4ZV4ev0UTIj0PzhzOcReJFyJaLjtSF55M8= github.com/onsi/ginkgo/v2 v2.17.1/go.mod h1:llBI3WDLL9Z6taip6f33H76YcWtJv+7R3HigUjbIBOs= github.com/onsi/gomega v1.32.0 h1:JRYU78fJ1LPxlckP6Txi/EYqJvjtMrDC04/MM5XRHPk= @@ -86,8 +90,11 @@ github.com/opsgenie/opsgenie-go-sdk-v2 v1.2.22/go.mod h1:4OjcxgwdXzezqytxN534Moo github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib 
v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.72.0 h1:9h7PxMhT1S8lOdadEKJnBh3ELMdO60XkoDV98grYjuM= +github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.72.0/go.mod h1:4FiLCL664L4dNGeqZewiiD0NS7hhqi/CxyM4UOq5dfM= github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos= diff --git a/internal/controller/cluster_monitoring_controller.go b/internal/controller/cluster_monitoring_controller.go index b1766029..39ed7596 100644 --- a/internal/controller/cluster_monitoring_controller.go +++ b/internal/controller/cluster_monitoring_controller.go @@ -18,8 +18,10 @@ package controller import ( "context" + "reflect" "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" ctrl "sigs.k8s.io/controller-runtime" @@ -29,8 +31,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/giantswarm/observability-operator/pkg/common" + "github.com/giantswarm/observability-operator/pkg/common/organization" "github.com/giantswarm/observability-operator/pkg/monitoring" "github.com/giantswarm/observability-operator/pkg/monitoring/heartbeat" + "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent" ) // ClusterMonitoringReconciler reconciles a Cluster object @@ -40,6 +44,8 @@ type ClusterMonitoringReconciler struct { common.ManagementCluster // HeartbeatRepository is the repository for managing heartbeats. heartbeat.HeartbeatRepository + // OrganizationRepository is the repository for reading cluster organization. 
+ organization.OrganizationRepository // MonitoringEnabled defines whether monitoring is enabled at the installation level. MonitoringEnabled bool } @@ -105,6 +111,18 @@ func (r *ClusterMonitoringReconciler) reconcile(ctx context.Context, cluster *cl return ctrl.Result{}, nil } + organization, err := r.OrganizationRepository.Read(ctx, cluster) + if err != nil { + logger.Error(err, "failed to get cluster organization") + return ctrl.Result{Requeue: true}, errors.WithStack(err) + } + + provider, err := common.GetClusterProvider(cluster) + if err != nil { + logger.Error(err, "failed to get cluster provider") + return ctrl.Result{Requeue: true}, errors.WithStack(err) + } + if cluster.Name == r.ManagementCluster.Name { err := r.HeartbeatRepository.CreateOrUpdate(ctx) if err != nil { @@ -113,6 +131,9 @@ func (r *ClusterMonitoringReconciler) reconcile(ctx context.Context, cluster *cl } } + // Create or update PrometheusAgent remote write configuration. + r.ensurePrometheusAgentRemoteWriteConfig(ctx, cluster, organization, provider) + return ctrl.Result{}, nil } @@ -120,7 +141,6 @@ func (r *ClusterMonitoringReconciler) reconcile(ctx context.Context, cluster *cl func (r *ClusterMonitoringReconciler) reconcileDelete(ctx context.Context, cluster *clusterv1.Cluster) (reconcile.Result, error) { logger := log.FromContext(ctx).WithValues("cluster", cluster.Name) if controllerutil.ContainsFinalizer(cluster, monitoring.MonitoringFinalizer) { - if cluster.Name == r.ManagementCluster.Name { err := r.HeartbeatRepository.Delete(ctx) if err != nil { @@ -129,6 +149,23 @@ func (r *ClusterMonitoringReconciler) reconcileDelete(ctx context.Context, clust } } + organization, err := r.OrganizationRepository.Read(ctx, cluster) + if err != nil { + logger.Error(err, "failed to get cluster organization") + return ctrl.Result{Requeue: true}, errors.WithStack(err) + } + + provider, err := common.GetClusterProvider(cluster) + if err != nil { + logger.Error(err, "failed to get cluster provider") + 
return ctrl.Result{Requeue: true}, errors.WithStack(err) + } + + err = r.deletePrometheusAgentRemoteWriteConfig(ctx, cluster, organization, provider) + if err != nil { + logger.Error(err, "failed to delete prometheus agent remote write config") + return ctrl.Result{Requeue: true}, errors.WithStack(err) + } // We get the latest state of the object to avoid race conditions. // Finalizer handling needs to come last. logger.Info("removing finalizer", "finalizer", monitoring.MonitoringFinalizer) @@ -145,3 +182,90 @@ func (r *ClusterMonitoringReconciler) reconcileDelete(ctx context.Context, clust controllerutil.RemoveFinalizer(cluster, monitoring.MonitoringFinalizer) return ctrl.Result{}, nil } + +// ensurePrometheusAgentRemoteWriteConfig ensures that the prometheus remote write config is present in the cluster. +func (r *ClusterMonitoringReconciler) ensurePrometheusAgentRemoteWriteConfig(ctx context.Context, cluster *clusterv1.Cluster, organization string, provider string) error { + logger := log.FromContext(ctx) + logger.Info("ensuring prometheus remote write config") + + objectKey := client.ObjectKey{ + Name: prometheusagent.GetPrometheusAgentRemoteWriteConfigName(cluster), + Namespace: cluster.GetNamespace(), + } + + current := &corev1.ConfigMap{} + // Get the current configmap if it exists. 
+ err := r.Client.Get(ctx, objectKey, current) + if apierrors.IsNotFound(err) { + configMap, err := prometheusagent.BuildRemoteWriteConfig(ctx, r.ManagementCluster, cluster, organization, provider, 1) + if err != nil { + return errors.WithStack(err) + } + + err = r.Client.Create(ctx, configMap) + return errors.WithStack(err) + } else if err != nil { + return errors.WithStack(err) + } + + if current != nil { + currentShards, err := prometheusagent.ReadCurrentShardsFromConfig(*current) + if err != nil { + return errors.WithStack(err) + } + + desired, err := prometheusagent.BuildRemoteWriteConfig(ctx, r.ManagementCluster, cluster, organization, provider, currentShards) + if err != nil { + return errors.WithStack(err) + } + + if !reflect.DeepEqual(current.Data, desired.Data) { + err = r.Client.Patch(ctx, current, client.MergeFrom(desired)) + if err != nil { + return errors.WithStack(err) + } + } + } + + logger.Info("ensured prometheus remote write config") + + return nil +} + +func (r *ClusterMonitoringReconciler) deletePrometheusAgentRemoteWriteConfig(ctx context.Context, cluster *clusterv1.Cluster, organization string, provider string) error { + + logger := log.FromContext(ctx) + logger.Info("deleting prometheus remote write config") + + objectKey := client.ObjectKey{ + Name: prometheusagent.GetPrometheusAgentRemoteWriteConfigName(cluster), + Namespace: cluster.GetNamespace(), + } + + current := &corev1.ConfigMap{} + // Get the current configmap if it exists. 
+ err := r.Client.Get(ctx, objectKey, current) + if apierrors.IsNotFound(err) { + // We ignore cases where the configmap is not found (if it was manually deleted for instance) + return nil + } else if err != nil { + return errors.WithStack(err) + } + + desired := current.DeepCopy() + // Delete the finalizer + controllerutil.RemoveFinalizer(desired, monitoring.MonitoringFinalizer) + err = r.Client.Patch(ctx, current, client.MergeFrom(desired)) + if err != nil { + return errors.WithStack(err) + } + + err = r.Client.Delete(ctx, desired) + if err != nil { + return errors.WithStack(err) + } + + logger.Info("deleted prometheus remote write config") + + return nil +} diff --git a/main.go b/main.go index 9845d80c..abe1bee4 100644 --- a/main.go +++ b/main.go @@ -39,6 +39,7 @@ import ( "github.com/giantswarm/observability-operator/internal/controller" "github.com/giantswarm/observability-operator/pkg/common" + "github.com/giantswarm/observability-operator/pkg/common/organization" "github.com/giantswarm/observability-operator/pkg/monitoring/heartbeat" //+kubebuilder:scaffold:imports ) @@ -170,11 +171,14 @@ func main() { os.Exit(1) } + organizationRepository := organization.NewNamespaceRepository(mgr.GetClient()) + if err = (&controller.ClusterMonitoringReconciler{ - Client: mgr.GetClient(), - ManagementCluster: managementCluster, - HeartbeatRepository: heartbeatRepository, - MonitoringEnabled: monitoringEnabled, + Client: mgr.GetClient(), + ManagementCluster: managementCluster, + HeartbeatRepository: heartbeatRepository, + OrganizationRepository: organizationRepository, + MonitoringEnabled: monitoringEnabled, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Cluster") os.Exit(1) diff --git a/pkg/common/organization/repository.go b/pkg/common/organization/repository.go new file mode 100644 index 00000000..5057cf9f --- /dev/null +++ b/pkg/common/organization/repository.go @@ -0,0 +1,39 @@ +package organization + +import ( + 
"context" + "errors" + + corev1 "k8s.io/api/core/v1" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + organizationLabel string = "giantswarm.io/organization" +) + +type OrganizationRepository interface { + Read(ctx context.Context, cluster *clusterv1.Cluster) (string, error) +} + +type NamespaceOrganizationRepository struct { + client.Client +} + +func NewNamespaceRepository(client client.Client) OrganizationRepository { + return NamespaceOrganizationRepository{client} +} + +func (r NamespaceOrganizationRepository) Read(ctx context.Context, cluster *clusterv1.Cluster) (string, error) { + namespace := &corev1.Namespace{} + err := r.Client.Get(ctx, client.ObjectKey{Name: cluster.GetNamespace()}, namespace) + if err != nil { + return "", err + } + + if organization, ok := namespace.Labels[organizationLabel]; ok { + return organization, nil + } + return "", errors.New("cluster namespace missing organization label") +} diff --git a/pkg/monitoring/mimir/querier/querier.go b/pkg/monitoring/mimir/querier/querier.go new file mode 100644 index 00000000..4f29a573 --- /dev/null +++ b/pkg/monitoring/mimir/querier/querier.go @@ -0,0 +1,67 @@ +package querier + +import ( + "context" + "errors" + "fmt" + "net/http" + "time" + + "github.com/prometheus/client_golang/api" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" +) + +// headerAdder is an http.RoundTripper that adds additional headers to the request +type headerAdder struct { + headers map[string][]string + + rt http.RoundTripper +} + +func (h *headerAdder) RoundTrip(req *http.Request) (*http.Response, error) { + for k, vv := range h.headers { + for _, v := range vv { + req.Header.Add(k, v) + } + } + return h.rt.RoundTrip(req) +} + +// QueryTSDBHeadSeries performs an instant query against Mimir. 
+func QueryTSDBHeadSeries(clusterName string) (float64, error) { + headerAdder := &headerAdder{ + headers: map[string][]string{ + "X-Org-Id": {"anonymous"}, + }, + rt: http.DefaultTransport, + } + config := api.Config{ + Address: "http://mimir-gateway.mimir.svc/prometheus", + RoundTripper: headerAdder, + } + + // Create new client. + c, err := api.NewClient(config) + if err != nil { + return 0, err + } + + // Run query against client. + api := v1.NewAPI(c) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + val, _, err := api.Query(ctx, fmt.Sprintf("max_over_time(count({cluster_id=\"%s\"})[6h])", clusterName), time.Now()) + cancel() + if err != nil { + return 0, err + } + + switch val.Type() { + case model.ValVector: + vector := val.(model.Vector) + return float64(vector[0].Value), nil + default: + return 0, errors.New("failed to get current number of time series") + } +} diff --git a/pkg/monitoring/prometheusagent/remotewrite/types.go b/pkg/monitoring/prometheusagent/remotewrite/types.go new file mode 100644 index 00000000..5e0483da --- /dev/null +++ b/pkg/monitoring/prometheusagent/remotewrite/types.go @@ -0,0 +1,21 @@ +package remotewrite + +import ( + promv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" +) + +type RemoteWriteConfig struct { + PrometheusAgentConfig PrometheusAgentConfig `yaml:"prometheus-agent,omitempty" json:"prometheus-agent,omitempty"` +} + +type PrometheusAgentConfig struct { + ExternalLabels map[string]string `yaml:"externalLabels,omitempty" json:"externalLabels,omitempty"` + RemoteWrite []RemoteWrite `yaml:"remoteWrite,omitempty" json:"remoteWrite,omitempty"` + Shards int `yaml:"shards,omitempty" json:"shards,omitempty"` +} + +type RemoteWrite struct { + promv1.RemoteWriteSpec `yaml:",inline" json:",inline"` + Password string `yaml:"password" json:"password"` + Username string `yaml:"username" json:"username"` +} diff --git a/pkg/monitoring/prometheusagent/shards/shards.go 
b/pkg/monitoring/prometheusagent/shards/shards.go new file mode 100644 index 00000000..25394ff4 --- /dev/null +++ b/pkg/monitoring/prometheusagent/shards/shards.go @@ -0,0 +1,28 @@ +package shards + +import "math" + +const ( + shardStep = float64(1_000_000) + shardScaleDownPercentage = float64(0.20) + shardScaleDownThreshold = shardScaleDownPercentage * shardStep +) + +// We want to start with 1 prometheus-agent for each 1M time series with a scale down 20% threshold. +func ComputeShards(currentShardCount int, timeSeries float64) int { + desiredShardCount := int(math.Ceil(timeSeries / shardStep)) + + // Compute Scale Down + if currentShardCount > desiredShardCount { + // We get the rest of a division of timeSeries by shardStep and we compare it with the scale down threshold + if math.Mod(timeSeries, shardStep) > shardStep-shardScaleDownThreshold { + desiredShardCount = currentShardCount + } + } + + // We always have a minimum of 1 agent, even if there is no worker node + if desiredShardCount <= 0 { + return 1 + } + return desiredShardCount +} diff --git a/pkg/monitoring/prometheusagent/shards/shards_test.go b/pkg/monitoring/prometheusagent/shards/shards_test.go new file mode 100644 index 00000000..3043a5ec --- /dev/null +++ b/pkg/monitoring/prometheusagent/shards/shards_test.go @@ -0,0 +1,69 @@ +package shards + +import ( + "flag" + "testing" +) + +var _ = flag.Bool("update", false, "update the output file") + +func TestShardComputationScaleUp(t *testing.T) { + expected := 1 + result := ComputeShards(0, float64(1_000_000)) + if result != expected { + t.Errorf(`expected ComputeShards(0, 1_000_000) to be %d, got %d`, expected, result) + } + + expected = 2 + result = ComputeShards(0, float64(1_000_001)) + if result != expected { + t.Errorf(`expected ComputeShards(0, 1_000_001) to be %d, got %d`, expected, result) + } + + expected = 3 + result = ComputeShards(0, float64(2_000_001)) + if result != expected { + t.Errorf(`expected ComputeShards(0, 2_000_001) to be %d, got 
%d`, expected, result) + } +} + +func TestShardComputationReturnsAtLeast1Shard(t *testing.T) { + expected := 1 + result := ComputeShards(0, 0) + if result != expected { + t.Errorf(`expected ComputeShards(0, 0) to be %d, got %d`, expected, result) + } + + expected = 1 + result = ComputeShards(0, -5) + if result != expected { + t.Errorf(`expected ComputeShards(0, -5) to be %d, got %d`, expected, result) + } +} + +func TestShardComputationScaleDown(t *testing.T) { + expected := 2 + result := ComputeShards(1, 1_000_001) + if result != expected { + t.Errorf(`expected ComputeShards(1, 1_000_001) to be %d, got %d`, expected, result) + } + + expected = 2 + result = ComputeShards(2, 999_999) + if result != expected { + t.Errorf(`expected ComputeShards(2, 999_999) to be %d, got %d`, expected, result) + } + + expected = 2 + result = ComputeShards(2, 800_001) + if result != expected { + t.Errorf(`expected ComputeShards(2, 800_001) to be %d, got %d`, expected, result) + } + + // threshold hit + expected = 1 + result = ComputeShards(2, 800_000) + if result != expected { + t.Errorf(`expected ComputeShards(2, 800_000) to be %d, got %d`, expected, result) + } +} diff --git a/pkg/monitoring/prometheusagent/utils.go b/pkg/monitoring/prometheusagent/utils.go new file mode 100644 index 00000000..882f6f85 --- /dev/null +++ b/pkg/monitoring/prometheusagent/utils.go @@ -0,0 +1,111 @@ +package prometheusagent + +import ( + "context" + "fmt" + "net" + + "github.com/pkg/errors" + "gopkg.in/yaml.v2" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + + "github.com/giantswarm/observability-operator/pkg/common" + "github.com/giantswarm/observability-operator/pkg/monitoring" + "github.com/giantswarm/observability-operator/pkg/monitoring/mimir/querier" + "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/remotewrite" + 
"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/shards" +) + +const ( + // defaultServicePriority is the default service priority if not set. + defaultServicePriority string = "highest" + // defaultShardCount is the default number of shards to use. + defaultShardCount = 1 + + // servicePriorityLabel is the label used to determine the priority of a service. + servicePriorityLabel string = "giantswarm.io/service-priority" +) + +func GetPrometheusAgentRemoteWriteConfigName(cluster *clusterv1.Cluster) string { + return fmt.Sprintf("%s-remote-write-config", cluster.Name) +} + +func BuildRemoteWriteConfig(ctx context.Context, mc common.ManagementCluster, cluster *clusterv1.Cluster, organization string, provider string, currentShards int) (*corev1.ConfigMap, error) { + clusterType := "workload_cluster" + if val, ok := cluster.Labels["cluster.x-k8s.io/cluster-name"]; ok && val == mc.Name { + clusterType = "management_cluster" + } + + externalLabels := map[string]string{ + "cluster_id": cluster.Name, + "cluster_type": clusterType, + "customer": mc.Customer, + "installation": mc.Name, + "organization": organization, + "pipeline": mc.Pipeline, + "provider": provider, + "region": mc.Region, + "service_priority": getServicePriority(cluster), + } + + shards, err := getShardsCountForCluster(ctx, cluster, currentShards) + if err != nil { + return nil, errors.WithStack(err) + } + + config, err := yaml.Marshal(remotewrite.RemoteWriteConfig{ + PrometheusAgentConfig: remotewrite.PrometheusAgentConfig{ + ExternalLabels: externalLabels, + Shards: shards, + }, + }) + if err != nil { + return nil, errors.WithStack(err) + } + + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: GetPrometheusAgentRemoteWriteConfigName(cluster), + Namespace: cluster.Namespace, + Finalizers: []string{ + monitoring.MonitoringFinalizer, + }, + }, + Data: map[string]string{ + "values": string(config), + }, + }, nil +} + +func ReadCurrentShardsFromConfig(configMap 
corev1.ConfigMap) (int, error) { + remoteWriteConfig := remotewrite.RemoteWriteConfig{} + err := yaml.Unmarshal([]byte(configMap.Data["values"]), &remoteWriteConfig) + if err != nil { + return 0, errors.WithStack(err) + } + + return remoteWriteConfig.PrometheusAgentConfig.Shards, nil +} + +// We want to compute the number of shards based on the number of nodes. +func getShardsCountForCluster(ctx context.Context, cluster *clusterv1.Cluster, currentShardCount int) (int, error) { + headSeries, err := querier.QueryTSDBHeadSeries(cluster.Name) + if err != nil { + // If prometheus is not accessible (for instance, not running because this is a new cluster, we check if prometheus is accessible) + var dnsError *net.DNSError + if errors.As(err, &dnsError) { + return shards.ComputeShards(currentShardCount, defaultShardCount), nil + } + return 0, errors.WithStack(err) + } + return shards.ComputeShards(currentShardCount, headSeries), nil +} + +func getServicePriority(cluster *clusterv1.Cluster) string { + if servicePriority, ok := cluster.GetLabels()[servicePriorityLabel]; ok && servicePriority != "" { + return servicePriority + } + return defaultServicePriority +}