diff --git a/go.mod b/go.mod index 6b6e10e6..d5602a74 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,17 @@ module github.com/giantswarm/observability-operator go 1.21 require ( + github.com/go-logr/logr v1.4.1 github.com/onsi/ginkgo/v2 v2.17.1 github.com/onsi/gomega v1.32.0 github.com/opsgenie/opsgenie-go-sdk-v2 v1.2.22 github.com/pkg/errors v0.9.1 + github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.72.0 + github.com/prometheus/client_golang v1.19.0 + github.com/prometheus/common v0.51.1 github.com/sirupsen/logrus v1.9.3 + gopkg.in/yaml.v2 v2.4.0 + k8s.io/api v0.29.3 k8s.io/apimachinery v0.29.3 k8s.io/client-go v0.29.3 sigs.k8s.io/cluster-api v1.6.3 @@ -21,7 +27,6 @@ require ( github.com/emicklei/go-restful/v3 v3.12.0 // indirect github.com/evanphx/json-patch/v5 v5.9.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect - github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/zapr v1.3.0 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonreference v0.21.0 // indirect @@ -44,9 +49,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/prometheus/client_golang v1.19.0 // indirect github.com/prometheus/client_model v0.6.0 // indirect - github.com/prometheus/common v0.51.1 // indirect github.com/prometheus/procfs v0.13.0 // indirect github.com/spf13/pflag v1.0.5 // indirect go.uber.org/multierr v1.11.0 // indirect @@ -63,9 +66,7 @@ require ( google.golang.org/appengine v1.6.8 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.29.3 // indirect k8s.io/apiextensions-apiserver v0.29.3 // indirect k8s.io/component-base v0.29.3 // indirect k8s.io/klog/v2 v2.120.1 // indirect diff --git a/go.sum 
b/go.sum index 6fae4526..dfc74ed7 100644 --- a/go.sum +++ b/go.sum @@ -59,6 +59,8 @@ github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -77,6 +79,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/onsi/ginkgo/v2 v2.17.1 h1:V++EzdbhI4ZV4ev0UTIj0PzhzOcReJFyJaLjtSF55M8= github.com/onsi/ginkgo/v2 v2.17.1/go.mod h1:llBI3WDLL9Z6taip6f33H76YcWtJv+7R3HigUjbIBOs= github.com/onsi/gomega v1.32.0 h1:JRYU78fJ1LPxlckP6Txi/EYqJvjtMrDC04/MM5XRHPk= @@ -86,8 +90,11 @@ github.com/opsgenie/opsgenie-go-sdk-v2 v1.2.22/go.mod h1:4OjcxgwdXzezqytxN534Moo github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 
h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.72.0 h1:9h7PxMhT1S8lOdadEKJnBh3ELMdO60XkoDV98grYjuM= +github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.72.0/go.mod h1:4FiLCL664L4dNGeqZewiiD0NS7hhqi/CxyM4UOq5dfM= github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos= diff --git a/helm/observability-operator/templates/deployment.yaml b/helm/observability-operator/templates/deployment.yaml index e3626322..1bea8099 100644 --- a/helm/observability-operator/templates/deployment.yaml +++ b/helm/observability-operator/templates/deployment.yaml @@ -29,6 +29,9 @@ spec: - --management-cluster-pipeline={{ $.Values.managementCluster.pipeline }} - --management-cluster-region={{ $.Values.managementCluster.region }} - --monitoring-enabled={{ $.Values.monitoring.enabled }} + {{- if $.Values.monitoring.prometheusVersion }} + - --prometheus-version={{ $.Values.monitoring.prometheusVersion }} + {{- end }} env: - name: OPSGENIE_API_KEY valueFrom: diff --git a/helm/observability-operator/values.yaml b/helm/observability-operator/values.yaml index 3688290c..8e02a133 100644 --- a/helm/observability-operator/values.yaml +++ b/helm/observability-operator/values.yaml @@
-16,6 +16,7 @@ managementCluster: monitoring: enabled: false opsgenieApiKey: "" + prometheusVersion: "" operator: # -- Configures the resources for the operator deployment diff --git a/internal/controller/cluster_monitoring_controller.go b/internal/controller/cluster_monitoring_controller.go index b1766029..e80d490e 100644 --- a/internal/controller/cluster_monitoring_controller.go +++ b/internal/controller/cluster_monitoring_controller.go @@ -31,6 +31,7 @@ import ( "github.com/giantswarm/observability-operator/pkg/common" "github.com/giantswarm/observability-operator/pkg/monitoring" "github.com/giantswarm/observability-operator/pkg/monitoring/heartbeat" + "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent" ) // ClusterMonitoringReconciler reconciles a Cluster object @@ -38,6 +39,8 @@ type ClusterMonitoringReconciler struct { // Client is the controller client. client.Client common.ManagementCluster + // PrometheusAgentService is the service for managing PrometheusAgent resources. + prometheusagent.PrometheusAgentService // HeartbeatRepository is the repository for managing heartbeats. heartbeat.HeartbeatRepository // MonitoringEnabled defines whether monitoring is enabled at the installation level. @@ -113,6 +116,13 @@ func (r *ClusterMonitoringReconciler) reconcile(ctx context.Context, cluster *cl } } + // Create or update PrometheusAgent remote write configuration. 
+ err := r.PrometheusAgentService.ReconcilePrometheusAgentRemoteWriteConfig(ctx, cluster) + if err != nil { + logger.Error(err, "failed to create or update prometheus agent remote write config") + return ctrl.Result{Requeue: true}, errors.WithStack(err) + } + return ctrl.Result{}, nil } @@ -120,7 +130,6 @@ func (r *ClusterMonitoringReconciler) reconcile(ctx context.Context, cluster *cl func (r *ClusterMonitoringReconciler) reconcileDelete(ctx context.Context, cluster *clusterv1.Cluster) (reconcile.Result, error) { logger := log.FromContext(ctx).WithValues("cluster", cluster.Name) if controllerutil.ContainsFinalizer(cluster, monitoring.MonitoringFinalizer) { - if cluster.Name == r.ManagementCluster.Name { err := r.HeartbeatRepository.Delete(ctx) if err != nil { @@ -129,11 +138,17 @@ func (r *ClusterMonitoringReconciler) reconcileDelete(ctx context.Context, clust } } + err := r.PrometheusAgentService.DeletePrometheusAgentRemoteWriteConfig(ctx, cluster) + if err != nil { + logger.Error(err, "failed to delete prometheus agent remote write config") + return ctrl.Result{Requeue: true}, errors.WithStack(err) + } + // We get the latest state of the object to avoid race conditions. // Finalizer handling needs to come last. logger.Info("removing finalizer", "finalizer", monitoring.MonitoringFinalizer) controllerutil.RemoveFinalizer(cluster, monitoring.MonitoringFinalizer) - err := r.Client.Update(ctx, cluster) + err = r.Client.Update(ctx, cluster) if err != nil { // We need to requeue if we fail to remove the finalizer because of race conditions between multiple operators. // This will be eventually consistent. 
diff --git a/main.go b/main.go index 9845d80c..1a90c946 100644 --- a/main.go +++ b/main.go @@ -39,7 +39,9 @@ import ( "github.com/giantswarm/observability-operator/internal/controller" "github.com/giantswarm/observability-operator/pkg/common" + "github.com/giantswarm/observability-operator/pkg/common/organization" "github.com/giantswarm/observability-operator/pkg/monitoring/heartbeat" + "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent" //+kubebuilder:scaffold:imports ) @@ -57,6 +59,7 @@ var ( managementClusterPipeline string managementClusterRegion string monitoringEnabled bool + prometheusVersion string ) const ( @@ -92,6 +95,8 @@ func main() { "The region of the management cluster.") flag.BoolVar(&monitoringEnabled, "monitoring-enabled", false, "Enable monitoring at the management cluster level.") + flag.StringVar(&prometheusVersion, "prometheus-version", "", + "The version of Prometheus Agents to deploy.") opts := zap.Options{ Development: true, } @@ -170,11 +175,22 @@ func main() { os.Exit(1) } - if err = (&controller.ClusterMonitoringReconciler{ - Client: mgr.GetClient(), - ManagementCluster: managementCluster, - HeartbeatRepository: heartbeatRepository, - MonitoringEnabled: monitoringEnabled, + organizationRepository := organization.NewNamespaceRepository(mgr.GetClient()) + + // TODO(atlas): validate prometheus version + prometheusAgentService := prometheusagent.PrometheusAgentService{ + Client: mgr.GetClient(), + OrganizationRepository: organizationRepository, + ManagementCluster: managementCluster, + PrometheusVersion: prometheusVersion, + } + + if err = (&controller.ClusterMonitoringReconciler{ + Client: mgr.GetClient(), + ManagementCluster: managementCluster, + HeartbeatRepository: heartbeatRepository, + PrometheusAgentService: prometheusAgentService, + MonitoringEnabled: monitoringEnabled, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Cluster") os.Exit(1) diff --git 
a/pkg/common/organization/repository.go b/pkg/common/organization/repository.go new file mode 100644 index 00000000..5057cf9f --- /dev/null +++ b/pkg/common/organization/repository.go @@ -0,0 +1,39 @@ +package organization + +import ( + "context" + "errors" + + corev1 "k8s.io/api/core/v1" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + organizationLabel string = "giantswarm.io/organization" +) + +type OrganizationRepository interface { + Read(ctx context.Context, cluster *clusterv1.Cluster) (string, error) +} + +type NamespaceOrganizationRepository struct { + client.Client +} + +func NewNamespaceRepository(client client.Client) OrganizationRepository { + return NamespaceOrganizationRepository{client} +} + +func (r NamespaceOrganizationRepository) Read(ctx context.Context, cluster *clusterv1.Cluster) (string, error) { + namespace := &corev1.Namespace{} + err := r.Client.Get(ctx, client.ObjectKey{Name: cluster.GetNamespace()}, namespace) + if err != nil { + return "", err + } + + if organization, ok := namespace.Labels[organizationLabel]; ok { + return organization, nil + } + return "", errors.New("cluster namespace missing organization label") +} diff --git a/pkg/monitoring/mimir/querier/querier.go b/pkg/monitoring/mimir/querier/querier.go new file mode 100644 index 00000000..83c32e27 --- /dev/null +++ b/pkg/monitoring/mimir/querier/querier.go @@ -0,0 +1,67 @@ +package querier + +import ( + "context" + "errors" + "fmt" + "net/http" + "time" + + "github.com/prometheus/client_golang/api" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" +) + +// headerAdder is an http.RoundTripper that adds additional headers to the request +type headerAdder struct { + headers map[string][]string + + rt http.RoundTripper +} + +func (h *headerAdder) RoundTrip(req *http.Request) (*http.Response, error) { + for k, vv := range h.headers { + for _, v := range vv { + 
req.Header.Add(k, v) + } + } + return h.rt.RoundTrip(req) +} + +// QueryTSDBHeadSeries performs an instant query against Mimir. +func QueryTSDBHeadSeries(ctx context.Context, clusterName string) (float64, error) { + headerAdder := &headerAdder{ + headers: map[string][]string{ + "X-Org-Id": {"anonymous"}, + }, + rt: http.DefaultTransport, + } + config := api.Config{ + Address: "http://mimir-gateway.mimir.svc/prometheus", + RoundTripper: headerAdder, + } + + // Create new client. + c, err := api.NewClient(config) + if err != nil { + return 0, err + } + + // Run query against client (promAPI avoids shadowing the imported api package). + promAPI := v1.NewAPI(c) + + queryContext, cancel := context.WithTimeout(ctx, 2*time.Minute) + val, _, err := promAPI.Query(queryContext, fmt.Sprintf("max_over_time(count({cluster_id=\"%s\"})[6h:])", clusterName), time.Now()) + cancel() + if err != nil { + return 0, err + } + + switch val.Type() { + case model.ValVector: + vector := val.(model.Vector) + return float64(vector[0].Value), nil + default: + return 0, errors.New("failed to get current number of time series") + } +} diff --git a/pkg/monitoring/prometheusagent/remotewrite/types.go b/pkg/monitoring/prometheusagent/remotewrite/types.go new file mode 100644 index 00000000..309cf566 --- /dev/null +++ b/pkg/monitoring/prometheusagent/remotewrite/types.go @@ -0,0 +1,27 @@ +package remotewrite + +import ( + promv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" +) + +type RemoteWriteConfig struct { + PrometheusAgentConfig PrometheusAgentConfig `yaml:"prometheus-agent,omitempty" json:"prometheus-agent,omitempty"` +} + +type PrometheusAgentConfig struct { + ExternalLabels map[string]string `yaml:"externalLabels,omitempty" json:"externalLabels,omitempty"` + Image PrometheusAgentImage `yaml:"image,omitempty" json:"image,omitempty"` + RemoteWrite []RemoteWrite `yaml:"remoteWrite,omitempty" json:"remoteWrite,omitempty"` + Shards int `yaml:"shards,omitempty" json:"shards,omitempty"` + Version string
`yaml:"version,omitempty" json:"version,omitempty"` +} + +type PrometheusAgentImage struct { + Tag string `yaml:"tag" json:"tag"` +} + +type RemoteWrite struct { + promv1.RemoteWriteSpec `yaml:",inline" json:",inline"` + Password string `yaml:"password" json:"password"` + Username string `yaml:"username" json:"username"` +} diff --git a/pkg/monitoring/prometheusagent/service.go b/pkg/monitoring/prometheusagent/service.go new file mode 100644 index 00000000..29d8d18c --- /dev/null +++ b/pkg/monitoring/prometheusagent/service.go @@ -0,0 +1,225 @@ +package prometheusagent + +import ( + "context" + "fmt" + "net" + "reflect" + + "github.com/go-logr/logr" + "github.com/pkg/errors" + "gopkg.in/yaml.v2" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/giantswarm/observability-operator/pkg/common" + "github.com/giantswarm/observability-operator/pkg/common/organization" + "github.com/giantswarm/observability-operator/pkg/monitoring" + "github.com/giantswarm/observability-operator/pkg/monitoring/mimir/querier" + "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/remotewrite" + "github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent/shards" +) + +const ( + // defaultServicePriority is the default service priority if not set. + defaultServicePriority string = "highest" + // defaultShards is the default number of shards to use. + defaultShards = 1 + + // servicePriorityLabel is the label used to determine the priority of a service. 
+ servicePriorityLabel string = "giantswarm.io/service-priority" +) + +type PrometheusAgentService struct { + client.Client + organization.OrganizationRepository + common.ManagementCluster + PrometheusVersion string +} + +// ReconcilePrometheusAgentRemoteWriteConfig ensures that the prometheus remote write config is present in the cluster. +func (pas *PrometheusAgentService) ReconcilePrometheusAgentRemoteWriteConfig(ctx context.Context, cluster *clusterv1.Cluster) error { + logger := log.FromContext(ctx).WithValues("cluster", cluster.Name) + logger.Info("ensuring prometheus remote write config") + + objectKey := client.ObjectKey{ + Name: getPrometheusAgentRemoteWriteConfigName(cluster), + Namespace: cluster.GetNamespace(), + } + + current := &corev1.ConfigMap{} + // Get the current configmap if it exists. + err := pas.Client.Get(ctx, objectKey, current) + if apierrors.IsNotFound(err) { + configMap, err := pas.buildRemoteWriteConfig(ctx, cluster, logger, defaultShards) + if err != nil { + return errors.WithStack(err) + } + + err = pas.Client.Create(ctx, configMap) + return errors.WithStack(err) + } else if err != nil { + return errors.WithStack(err) + } + + currentShards, err := readCurrentShardsFromConfig(*current) + if err != nil { + return errors.WithStack(err) + } + + desired, err := pas.buildRemoteWriteConfig(ctx, cluster, logger, currentShards) + if err != nil { + return errors.WithStack(err) + } + + if !reflect.DeepEqual(current.Data, desired.Data) { + err = pas.Client.Patch(ctx, desired, client.MergeFrom(current)) + if err != nil { + return errors.WithStack(err) + } + } + + logger.Info("ensured prometheus remote write config") + + return nil +} + +func (pas *PrometheusAgentService) DeletePrometheusAgentRemoteWriteConfig(ctx context.Context, cluster *clusterv1.Cluster) error { + logger := log.FromContext(ctx).WithValues("cluster", cluster.Name) + logger.Info("deleting prometheus remote write config") + + objectKey := client.ObjectKey{ + Name:
getPrometheusAgentRemoteWriteConfigName(cluster), + Namespace: cluster.GetNamespace(), + } + + current := &corev1.ConfigMap{} + // Get the current configmap if it exists. + err := pas.Client.Get(ctx, objectKey, current) + if apierrors.IsNotFound(err) { + // We ignore cases where the configmap is not found (if it was manually deleted for instance) + return nil + } else if err != nil { + return errors.WithStack(err) + } + + desired := current.DeepCopy() + // Delete the finalizer + controllerutil.RemoveFinalizer(desired, monitoring.MonitoringFinalizer) + err = pas.Client.Patch(ctx, desired, client.MergeFrom(current)) + if err != nil { + return errors.WithStack(err) + } + + err = pas.Client.Delete(ctx, desired) + if err != nil { + return errors.WithStack(err) + } + + logger.Info("deleted prometheus remote write config") + + return nil +} + +func (pas PrometheusAgentService) buildRemoteWriteConfig(ctx context.Context, cluster *clusterv1.Cluster, logger logr.Logger, currentShards int) (*corev1.ConfigMap, error) { + organization, err := pas.OrganizationRepository.Read(ctx, cluster) + if err != nil { + logger.Error(err, "failed to get cluster organization") + return nil, errors.WithStack(err) + } + + provider, err := common.GetClusterProvider(cluster) + if err != nil { + logger.Error(err, "failed to get cluster provider") + return nil, errors.WithStack(err) + } + + clusterType := "workload_cluster" + if val, ok := cluster.Labels["cluster.x-k8s.io/cluster-name"]; ok && val == pas.ManagementCluster.Name { + clusterType = "management_cluster" + } + + externalLabels := map[string]string{ + "cluster_id": cluster.Name, + "cluster_type": clusterType, + "customer": pas.ManagementCluster.Customer, + "installation": pas.ManagementCluster.Name, + "organization": organization, + "pipeline": pas.ManagementCluster.Pipeline, + "provider": provider, + "region": pas.ManagementCluster.Region, + "service_priority": getServicePriority(cluster), + } + + shards, err :=
getShardsCountForCluster(ctx, cluster, currentShards) + if err != nil { + return nil, errors.WithStack(err) + } + + config, err := yaml.Marshal(remotewrite.RemoteWriteConfig{ + PrometheusAgentConfig: remotewrite.PrometheusAgentConfig{ + ExternalLabels: externalLabels, + Image: remotewrite.PrometheusAgentImage{ + Tag: pas.PrometheusVersion, + }, + Shards: shards, + Version: pas.PrometheusVersion, + }, + }) + if err != nil { + return nil, errors.WithStack(err) + } + + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: getPrometheusAgentRemoteWriteConfigName(cluster), + Namespace: cluster.Namespace, + Finalizers: []string{ + monitoring.MonitoringFinalizer, + }, + }, + Data: map[string]string{ + "values": string(config), + }, + }, nil +} + +func getPrometheusAgentRemoteWriteConfigName(cluster *clusterv1.Cluster) string { + return fmt.Sprintf("%s-remote-write-config", cluster.Name) +} + +func readCurrentShardsFromConfig(configMap corev1.ConfigMap) (int, error) { + remoteWriteConfig := remotewrite.RemoteWriteConfig{} + err := yaml.Unmarshal([]byte(configMap.Data["values"]), &remoteWriteConfig) + if err != nil { + return 0, errors.WithStack(err) + } + + return remoteWriteConfig.PrometheusAgentConfig.Shards, nil +} + +// We want to compute the number of shards based on the number of nodes. 
+func getShardsCountForCluster(ctx context.Context, cluster *clusterv1.Cluster, currentShardCount int) (int, error) { + headSeries, err := querier.QueryTSDBHeadSeries(ctx, cluster.Name) + if err != nil { + // If Prometheus is not accessible (for instance, not yet running because this is a new cluster), fall back to the default shard count. + var dnsError *net.DNSError + if errors.As(err, &dnsError) { + return shards.ComputeShards(currentShardCount, defaultShards), nil + } + return 0, errors.WithStack(err) + } + return shards.ComputeShards(currentShardCount, headSeries), nil +} + +func getServicePriority(cluster *clusterv1.Cluster) string { + if servicePriority, ok := cluster.GetLabels()[servicePriorityLabel]; ok && servicePriority != "" { + return servicePriority + } + return defaultServicePriority +} diff --git a/pkg/monitoring/prometheusagent/shards/shards.go b/pkg/monitoring/prometheusagent/shards/shards.go new file mode 100644 index 00000000..25394ff4 --- /dev/null +++ b/pkg/monitoring/prometheusagent/shards/shards.go @@ -0,0 +1,28 @@ +package shards + +import "math" + +const ( + shardStep = float64(1_000_000) + shardScaleDownPercentage = float64(0.20) + shardScaleDownThreshold = shardScaleDownPercentage * shardStep +) + +// We want to start with 1 prometheus-agent for each 1M time series with a scale down 20% threshold.
+func ComputeShards(currentShardCount int, timeSeries float64) int { + desiredShardCount := int(math.Ceil(timeSeries / shardStep)) + + // Compute Scale Down + if currentShardCount > desiredShardCount { + // Compare the remainder of timeSeries / shardStep with the scale down threshold + if math.Mod(timeSeries, shardStep) > shardStep-shardScaleDownThreshold { + desiredShardCount = currentShardCount + } + } + + // We always have a minimum of 1 agent, even if there is no worker node + if desiredShardCount <= 0 { + return 1 + } + return desiredShardCount +} diff --git a/pkg/monitoring/prometheusagent/shards/shards_test.go b/pkg/monitoring/prometheusagent/shards/shards_test.go new file mode 100644 index 00000000..3043a5ec --- /dev/null +++ b/pkg/monitoring/prometheusagent/shards/shards_test.go @@ -0,0 +1,69 @@ +package shards + +import ( + "flag" + "testing" +) + +var _ = flag.Bool("update", false, "update the output file") + +func TestShardComputationScaleUp(t *testing.T) { + expected := 1 + result := ComputeShards(0, float64(1_000_000)) + if result != expected { + t.Errorf(`expected ComputeShards(0, 1_000_000) to be %d, got %d`, expected, result) + } + + expected = 2 + result = ComputeShards(0, float64(1_000_001)) + if result != expected { + t.Errorf(`expected ComputeShards(0, 1_000_001) to be %d, got %d`, expected, result) + } + + expected = 3 + result = ComputeShards(0, float64(2_000_001)) + if result != expected { + t.Errorf(`expected ComputeShards(0, 2_000_001) to be %d, got %d`, expected, result) + } +} + +func TestShardComputationReturnsAtLeast1Shard(t *testing.T) { + expected := 1 + result := ComputeShards(0, 0) + if result != expected { + t.Errorf(`expected ComputeShards(0, 0) to be %d, got %d`, expected, result) + } + + expected = 1 + result = ComputeShards(0, -5) + if result != expected { + t.Errorf(`expected ComputeShards(0, -5) to be %d, got %d`, expected, result) + } +} + +func TestShardComputationScaleDown(t *testing.T) { +
expected := 2 + result := ComputeShards(1, 1_000_001) + if result != expected { + t.Errorf(`expected ComputeShards(1, 1_000_001) to be %d, got %d`, expected, result) + } + + expected = 2 + result = ComputeShards(2, 999_999) + if result != expected { + t.Errorf(`expected ComputeShards(2, 999_999) to be %d, got %d`, expected, result) + } + + expected = 2 + result = ComputeShards(2, 800_001) + if result != expected { + t.Errorf(`expected ComputeShards(2, 800_001) to be %d, got %d`, expected, result) + } + + // threshold hit + expected = 1 + result = ComputeShards(2, 800_000) + if result != expected { + t.Errorf(`expected ComputeShards(2, 800_000) to be %d, got %d`, expected, result) + } +}