Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
QuentinBisson committed Apr 4, 2024
1 parent 2511a7e commit 85eeb72
Show file tree
Hide file tree
Showing 10 changed files with 481 additions and 10 deletions.
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ require (
github.com/onsi/gomega v1.32.0
github.com/opsgenie/opsgenie-go-sdk-v2 v1.2.22
github.com/pkg/errors v0.9.1
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.72.0
github.com/prometheus/client_golang v1.19.0
github.com/prometheus/common v0.51.1
github.com/sirupsen/logrus v1.9.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.29.3
k8s.io/apimachinery v0.29.3
k8s.io/client-go v0.29.3
sigs.k8s.io/cluster-api v1.6.3
Expand Down Expand Up @@ -44,9 +49,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.51.1 // indirect
github.com/prometheus/procfs v0.13.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand All @@ -63,9 +66,7 @@ require (
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.29.3 // indirect
k8s.io/apiextensions-apiserver v0.29.3 // indirect
k8s.io/component-base v0.29.3 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
Expand Down
9 changes: 8 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
Expand All @@ -77,6 +79,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/onsi/ginkgo/v2 v2.17.1 h1:V++EzdbhI4ZV4ev0UTIj0PzhzOcReJFyJaLjtSF55M8=
github.com/onsi/ginkgo/v2 v2.17.1/go.mod h1:llBI3WDLL9Z6taip6f33H76YcWtJv+7R3HigUjbIBOs=
github.com/onsi/gomega v1.32.0 h1:JRYU78fJ1LPxlckP6Txi/EYqJvjtMrDC04/MM5XRHPk=
Expand All @@ -86,8 +90,11 @@ github.com/opsgenie/opsgenie-go-sdk-v2 v1.2.22/go.mod h1:4OjcxgwdXzezqytxN534Moo
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.72.0 h1:9h7PxMhT1S8lOdadEKJnBh3ELMdO60XkoDV98grYjuM=
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.72.0/go.mod h1:4FiLCL664L4dNGeqZewiiD0NS7hhqi/CxyM4UOq5dfM=
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU=
github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k=
github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos=
Expand Down
126 changes: 125 additions & 1 deletion internal/controller/cluster_monitoring_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package controller

import (
"context"
"reflect"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -29,8 +31,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/giantswarm/observability-operator/pkg/common"
"github.com/giantswarm/observability-operator/pkg/common/organization"
"github.com/giantswarm/observability-operator/pkg/monitoring"
"github.com/giantswarm/observability-operator/pkg/monitoring/heartbeat"
"github.com/giantswarm/observability-operator/pkg/monitoring/prometheusagent"
)

// ClusterMonitoringReconciler reconciles a Cluster object
Expand All @@ -40,6 +44,8 @@ type ClusterMonitoringReconciler struct {
common.ManagementCluster
// HeartbeatRepository is the repository for managing heartbeats.
heartbeat.HeartbeatRepository
// OrganizationRepository is the repository for reading cluster organization.
organization.OrganizationRepository
// MonitoringEnabled defines whether monitoring is enabled at the installation level.
MonitoringEnabled bool
}
Expand Down Expand Up @@ -105,6 +111,18 @@ func (r *ClusterMonitoringReconciler) reconcile(ctx context.Context, cluster *cl
return ctrl.Result{}, nil
}

organization, err := r.OrganizationRepository.Read(ctx, cluster)
if err != nil {
logger.Error(err, "failed to get cluster organization")
return ctrl.Result{Requeue: true}, errors.WithStack(err)
}

provider, err := common.GetClusterProvider(cluster)
if err != nil {
logger.Error(err, "failed to get cluster provider")
return ctrl.Result{Requeue: true}, errors.WithStack(err)
}

if cluster.Name == r.ManagementCluster.Name {
err := r.HeartbeatRepository.CreateOrUpdate(ctx)
if err != nil {
Expand All @@ -113,14 +131,16 @@ func (r *ClusterMonitoringReconciler) reconcile(ctx context.Context, cluster *cl
}
}

// Create or update PrometheusAgent remote write configuration.
r.ensurePrometheusAgentRemoteWriteConfig(ctx, cluster, organization, provider)

return ctrl.Result{}, nil
}

// reconcileDelete handles cluster deletion.
func (r *ClusterMonitoringReconciler) reconcileDelete(ctx context.Context, cluster *clusterv1.Cluster) (reconcile.Result, error) {
logger := log.FromContext(ctx).WithValues("cluster", cluster.Name)
if controllerutil.ContainsFinalizer(cluster, monitoring.MonitoringFinalizer) {

if cluster.Name == r.ManagementCluster.Name {
err := r.HeartbeatRepository.Delete(ctx)
if err != nil {
Expand All @@ -129,6 +149,23 @@ func (r *ClusterMonitoringReconciler) reconcileDelete(ctx context.Context, clust
}
}

organization, err := r.OrganizationRepository.Read(ctx, cluster)
if err != nil {
logger.Error(err, "failed to get cluster organization")
return ctrl.Result{Requeue: true}, errors.WithStack(err)
}

provider, err := common.GetClusterProvider(cluster)
if err != nil {
logger.Error(err, "failed to get cluster provider")
return ctrl.Result{Requeue: true}, errors.WithStack(err)
}

err = r.deletePrometheusAgentRemoteWriteConfig(ctx, cluster, organization, provider)
if err != nil {
logger.Error(err, "failed to dekete prometheus agent remote write config")
return ctrl.Result{Requeue: true}, errors.WithStack(err)
}
// We get the latest state of the object to avoid race conditions.
// Finalizer handling needs to come last.
logger.Info("removing finalizer", "finalizer", monitoring.MonitoringFinalizer)
Expand All @@ -145,3 +182,90 @@ func (r *ClusterMonitoringReconciler) reconcileDelete(ctx context.Context, clust
controllerutil.RemoveFinalizer(cluster, monitoring.MonitoringFinalizer)
return ctrl.Result{}, nil
}

// ensurePrometheusAgentRemoteWriteConfig ensures that the prometheus remote write config is present in the cluster.
//
// When the ConfigMap does not exist yet it is created with a single shard.
// Otherwise the desired content is rebuilt using the shard count read from the
// existing ConfigMap, and the stored object is patched only when its Data
// differs from the desired Data. organization and provider are passed through
// to the config builder. Any error is returned wrapped with a stack trace.
func (r *ClusterMonitoringReconciler) ensurePrometheusAgentRemoteWriteConfig(ctx context.Context, cluster *clusterv1.Cluster, organization string, provider string) error {
	logger := log.FromContext(ctx)
	logger.Info("ensuring prometheus remote write config")

	objectKey := client.ObjectKey{
		Name:      prometheusagent.GetPrometheusAgentRemoteWriteConfigName(cluster),
		Namespace: cluster.GetNamespace(),
	}

	// Get the current configmap if it exists.
	current := &corev1.ConfigMap{}
	err := r.Client.Get(ctx, objectKey, current)
	if apierrors.IsNotFound(err) {
		// Nothing exists yet: start with a single shard.
		configMap, err := prometheusagent.BuildRemoteWriteConfig(ctx, r.ManagementCluster, cluster, organization, provider, 1)
		if err != nil {
			return errors.WithStack(err)
		}

		if err := r.Client.Create(ctx, configMap); err != nil {
			return errors.WithStack(err)
		}

		logger.Info("ensured prometheus remote write config")
		return nil
	}
	if err != nil {
		return errors.WithStack(err)
	}

	// Keep the shard count that is currently configured when rebuilding the desired state.
	currentShards, err := prometheusagent.ReadCurrentShardsFromConfig(*current)
	if err != nil {
		return errors.WithStack(err)
	}

	desired, err := prometheusagent.BuildRemoteWriteConfig(ctx, r.ManagementCluster, cluster, organization, provider, currentShards)
	if err != nil {
		return errors.WithStack(err)
	}

	if !reflect.DeepEqual(current.Data, desired.Data) {
		// Patch sends the difference between the base given to MergeFrom and
		// the object passed to Patch, so the stored object must be the base
		// and the modified copy must be the patched object. Only Data is
		// replaced so that server-managed metadata is left untouched.
		updated := current.DeepCopy()
		updated.Data = desired.Data
		if err := r.Client.Patch(ctx, updated, client.MergeFrom(current)); err != nil {
			return errors.WithStack(err)
		}
	}

	logger.Info("ensured prometheus remote write config")

	return nil
}

// deletePrometheusAgentRemoteWriteConfig removes the prometheus remote write config of the given cluster.
//
// It strips the monitoring finalizer from the ConfigMap and then deletes it.
// A ConfigMap that is already gone is not treated as an error. organization
// and provider are accepted for signature symmetry with the ensure method and
// are not used here.
func (r *ClusterMonitoringReconciler) deletePrometheusAgentRemoteWriteConfig(ctx context.Context, cluster *clusterv1.Cluster, organization string, provider string) error {
	logger := log.FromContext(ctx)
	logger.Info("deleting prometheus remote write config")

	objectKey := client.ObjectKey{
		Name:      prometheusagent.GetPrometheusAgentRemoteWriteConfigName(cluster),
		Namespace: cluster.GetNamespace(),
	}

	// Get the current configmap if it exists.
	current := &corev1.ConfigMap{}
	err := r.Client.Get(ctx, objectKey, current)
	if apierrors.IsNotFound(err) {
		// We ignore cases where the configmap is not found (if it was manually deleted for instance).
		return nil
	}
	if err != nil {
		return errors.WithStack(err)
	}

	// Remove the finalizer on a copy and patch relative to the stored object:
	// MergeFrom takes the base, Patch takes the modified object.
	desired := current.DeepCopy()
	controllerutil.RemoveFinalizer(desired, monitoring.MonitoringFinalizer)
	if err := r.Client.Patch(ctx, desired, client.MergeFrom(current)); err != nil {
		return errors.WithStack(err)
	}

	// The object may disappear between the patch and the delete (for example
	// when it was already being deleted and only the finalizer held it back).
	if err := r.Client.Delete(ctx, desired); err != nil && !apierrors.IsNotFound(err) {
		return errors.WithStack(err)
	}

	logger.Info("deleted prometheus remote write config")

	return nil
}
12 changes: 8 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

"github.com/giantswarm/observability-operator/internal/controller"
"github.com/giantswarm/observability-operator/pkg/common"
"github.com/giantswarm/observability-operator/pkg/common/organization"
"github.com/giantswarm/observability-operator/pkg/monitoring/heartbeat"
//+kubebuilder:scaffold:imports
)
Expand Down Expand Up @@ -170,11 +171,14 @@ func main() {
os.Exit(1)
}

organizationRepository := organization.NewNamespaceRepository(mgr.GetClient())

if err = (&controller.ClusterMonitoringReconciler{
Client: mgr.GetClient(),
ManagementCluster: managementCluster,
HeartbeatRepository: heartbeatRepository,
MonitoringEnabled: monitoringEnabled,
Client: mgr.GetClient(),
ManagementCluster: managementCluster,
HeartbeatRepository: heartbeatRepository,
OrganizationRepository: organizationRepository,
MonitoringEnabled: monitoringEnabled,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Cluster")
os.Exit(1)
Expand Down
39 changes: 39 additions & 0 deletions pkg/common/organization/repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package organization

import (
"context"
"errors"

corev1 "k8s.io/api/core/v1"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// organizationLabel is the label key looked up on a cluster's namespace to
// find the organization the cluster belongs to.
const organizationLabel string = "giantswarm.io/organization"

// OrganizationRepository gives read access to the organization of a cluster.
type OrganizationRepository interface {
// Read returns the organization of the given cluster, or an error when it
// cannot be determined.
Read(ctx context.Context, cluster *clusterv1.Cluster) (string, error)
}

// NamespaceOrganizationRepository is a repository type that embeds a
// Kubernetes client for looking up cluster organization information.
type NamespaceOrganizationRepository struct {
// Client is the embedded Kubernetes client used for API reads.
client.Client
}

// NewNamespaceRepository builds a NamespaceOrganizationRepository around the
// given client and returns it as an OrganizationRepository.
func NewNamespaceRepository(client client.Client) OrganizationRepository {
	repository := NamespaceOrganizationRepository{Client: client}
	return repository
}

// Read fetches the namespace the cluster lives in and returns the value of its
// organization label. It returns the client error unchanged when the namespace
// cannot be fetched, and a dedicated error when the label is absent.
func (r NamespaceOrganizationRepository) Read(ctx context.Context, cluster *clusterv1.Cluster) (string, error) {
	var ns corev1.Namespace

	// Namespaces are cluster-scoped, so the key carries a name only.
	key := client.ObjectKey{Name: cluster.GetNamespace()}
	if err := r.Client.Get(ctx, key, &ns); err != nil {
		return "", err
	}

	org, found := ns.Labels[organizationLabel]
	if !found {
		return "", errors.New("cluster namespace missing organization label")
	}
	return org, nil
}
67 changes: 67 additions & 0 deletions pkg/monitoring/mimir/querier/querier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package querier

import (
"context"
"errors"
"fmt"
"net/http"
"time"

"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
)

// headerAdder is an http.RoundTripper that adds additional headers to the request
// before handing it to the wrapped RoundTripper.
type headerAdder struct {
	// headers maps a header name to the values added to every outgoing request.
	headers map[string][]string

	// rt is the wrapped RoundTripper that performs the actual request.
	rt http.RoundTripper
}

// RoundTrip adds the configured headers to a copy of req and forwards the copy
// to the wrapped RoundTripper, returning its response and error unchanged.
//
// The http.RoundTripper contract forbids modifying the caller's request, so the
// headers are added to a clone; this also prevents headers from accumulating
// when the same request value is sent more than once.
func (h *headerAdder) RoundTrip(req *http.Request) (*http.Response, error) {
	outgoing := req.Clone(req.Context())
	if outgoing.Header == nil {
		// Cloning a request without headers yields a nil map; Add would panic on it.
		outgoing.Header = make(http.Header)
	}
	for name, values := range h.headers {
		for _, value := range values {
			outgoing.Header.Add(name, value)
		}
	}
	return h.rt.RoundTrip(outgoing)
}

// QueryTSDBHeadSeries performs an instant query against Mimir and returns the
// maximum, over the last 6 hours, of the number of series labelled with the
// given cluster_id.
//
// It returns an error when the client cannot be created, when the query fails
// or exceeds the 2 minute timeout, or when the result is not a non-empty vector.
func QueryTSDBHeadSeries(clusterName string) (float64, error) {
	transport := &headerAdder{
		headers: map[string][]string{
			// NOTE(review): "anonynous" looks like a misspelling of "anonymous";
			// confirm which tenant ID the gateway expects before changing it.
			"X-Org-Id": {"anonynous"},
		},
		rt: http.DefaultTransport,
	}
	config := api.Config{
		Address:      "http://mimir-gateway.mimir.svc/prometheus",
		RoundTripper: transport,
	}

	// Create new client.
	c, err := api.NewClient(config)
	if err != nil {
		return 0, err
	}

	// Named promAPI so the imported api package is not shadowed.
	promAPI := v1.NewAPI(c)

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	// count(...) is an expression, not a plain selector, so the range has to be
	// written as a subquery ("[6h:]"); a bare "[6h]" is rejected by the PromQL parser.
	query := fmt.Sprintf("max_over_time(count({cluster_id=\"%s\"})[6h:])", clusterName)

	// Run query against client.
	val, _, err := promAPI.Query(ctx, query, time.Now())
	if err != nil {
		return 0, err
	}

	// An instant query for a cluster without any series yields an empty
	// vector; indexing it would panic, so it is reported as an error instead.
	vector, ok := val.(model.Vector)
	if !ok || len(vector) == 0 {
		return 0, errors.New("failed to get current number of time series")
	}
	return float64(vector[0].Value), nil
}
Loading

0 comments on commit 85eeb72

Please sign in to comment.