diff --git a/cmd/csi_driver/main.go b/cmd/csi_driver/main.go index 8b19016be..6815a997d 100644 --- a/cmd/csi_driver/main.go +++ b/cmd/csi_driver/main.go @@ -74,10 +74,6 @@ func main() { if *httpEndpoint != "" && metrics.IsGKEComponentVersionAvailable() { mm = metrics.NewMetricsManager() mm.InitializeHTTPHandler(*httpEndpoint, *metricsPath) - err = mm.EmitGKEComponentVersion() - if err != nil { - klog.Fatalf("Failed to emit GKE compoent version: %v", err) - } } } else { if *nodeID == "" { diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index 8ec2f00ca..789479256 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -20,6 +20,7 @@ import ( "flag" "net/http" + "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/metrics" wh "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/webhook" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client/config" @@ -39,6 +40,8 @@ var ( memoryLimit = flag.String("sidecar-memory-limit", "256Mi", "The default memory limit for gcsfuse sidecar container.") ephemeralStorageLimit = flag.String("sidecar-ephemeral-storage-limit", "10Gi", "The default ephemeral storage limit for gcsfuse sidecar container.") sidecarImageName = flag.String("sidecar-image-name", "", "The gcsfuse sidecar container image name.") + httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled.") + metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.") // These are set at compile time. version = "unknown" @@ -51,6 +54,14 @@ func main() { klog.Infof("Running Google Cloud Storage FUSE CSI driver admission webhook version %v, sidecar container image %v:%v", version, *sidecarImageName, sidecarImageVersion) + if *httpEndpoint != "" && metrics.IsGKEComponentVersionAvailable() { + mm := metrics.NewMetricsManager() + mm.InitializeHTTPHandler(*httpEndpoint, *metricsPath) + if err := mm.EmitGKEComponentVersion(); err != nil { + klog.Fatalf("Failed to emit GKE component version: %v", err) + } + } + // Load webhook config c, err := wh.LoadConfig(*sidecarImageName, sidecarImageVersion, *imagePullPolicy, *cpuLimit, *memoryLimit, *ephemeralStorageLimit) if err != nil { diff --git a/deploy/base/webhook/deployment.yaml b/deploy/base/webhook/deployment.yaml index 5745371a7..549b0ccf1 100644 --- a/deploy/base/webhook/deployment.yaml +++ b/deploy/base/webhook/deployment.yaml @@ -52,11 +52,14 @@ spec: - --cert-dir=/etc/tls-certs - --port=22030 - --health-probe-bind-address=:22031 + - --http-endpoint=:22032 env: - name: SIDECAR_IMAGE_PULL_POLICY value: "IfNotPresent" - name: SIDECAR_IMAGE_REGISTRY value: "" + - name: GKE_GCSFUSECSI_VERSION + value: "v999.999.999" resources: limits: cpu: 200m @@ -69,6 +72,9 @@ spec: containerPort: 22030 - name: readyz containerPort: 22031 + - name: http-endpoint + containerPort: 22032 + protocol: TCP livenessProbe: httpGet: scheme: HTTP @@ -95,6 +101,11 @@ spec: selector: app: gcs-fuse-csi-driver-webhook ports: - - protocol: TCP + - name: injector + protocol: TCP port: 443 targetPort: 22030 + - name: metrics + protocol: TCP + port: 8080 + targetPort: 22032 \ No newline at end of file diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index c7e76ab10..010d19cc2 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -22,43 +22,21 @@ import ( "os" "time" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "k8s.io/component-base/metrics" "k8s.io/klog/v2" ) const ( - // envGKEGCSCSIVersion is an environment variable set in the Cloud Storage FUSE CSI driver controller manifest + // envGKEGCSFuseCSIVersion is an environment variable set in the Cloud Storage FUSE CSI driver webhook manifest // with the current version of the GKE component. - envGKEGCSCSIVersion = "GKE_GCSCSI_VERSION" - - subSystem = "gcscsi" - operationsLatencyMetricName = "operations_seconds" - - labelStatusCode = "grpc_status_code" - labelMethodName = "method_name" + envGKEGCSFuseCSIVersion = "GKE_GCSFUSECSI_VERSION" ) -var ( - metricBuckets = []float64{.1, .25, .5, 1, 2.5, 5, 10, 15, 30, 60, 120, 300, 600} - - // This metric is exposed only from the controller driver component when GKE_GCSCSI_VERSION env variable is set. - gkeComponentVersion = metrics.NewGaugeVec(&metrics.GaugeOpts{ - Name: "component_version", - Help: "Metric to expose the version of the GCSCSI GKE component.", - }, []string{"component_version"}) - - operationSeconds = metrics.NewHistogramVec( - &metrics.HistogramOpts{ - Subsystem: subSystem, - Name: operationsLatencyMetricName, - Buckets: metricBuckets, - Help: "Operation latency in seconds", - }, - []string{labelStatusCode, labelMethodName}, - ) -) +// This metric is exposed only from the webhook component when GKE_GCSFUSECSI_VERSION env variable is set. +var gkeComponentVersion = metrics.NewGaugeVec(&metrics.GaugeOpts{ + Name: "component_version", + Help: "Metric to expose the version of the gcsfusecsi GKE component.", +}, []string{"component_version"}) type Manager struct { registry metrics.KubeRegistry @@ -68,25 +46,44 @@ func NewMetricsManager() *Manager { mm := &Manager{ registry: metrics.NewKubeRegistry(), } - mm.registry.MustRegister(operationSeconds) return mm } -func (mm *Manager) GetRegistry() metrics.KubeRegistry { - return mm.registry +// InitializeHTTPHandler sets up a server and creates a handler for metrics. +func (mm *Manager) InitializeHTTPHandler(address, path string) { + mux := http.NewServeMux() + mux.Handle(path, metrics.HandlerFor( + mm.registry, + metrics.HandlerOpts{ + ErrorHandling: metrics.ContinueOnError, + })) + server := &http.Server{ + Addr: address, + ReadHeaderTimeout: 3 * time.Second, + Handler: mux, + } + + go func() { + klog.Infof("Metric server listening at %q", address) + if err := server.ListenAndServe(); err != nil { + klog.Fatalf("Failed to start metric server at specified address (%q) and path (%q): %s", address, path, err) + } + }() } -func (mm *Manager) registerComponentVersionMetric() { +func (mm *Manager) EmitGKEComponentVersion() error { mm.registry.MustRegister(gkeComponentVersion) + + return mm.recordComponentVersionMetric() } func (mm *Manager) recordComponentVersionMetric() error { - v := getEnvVar(envGKEGCSCSIVersion) + v := getEnvVar(envGKEGCSFuseCSIVersion) if v == "" { klog.V(2).Info("Skip emitting component version metric") - return fmt.Errorf("failed to register GKE component version metric, env variable %v not defined", envGKEGCSCSIVersion) + return fmt.Errorf("failed to register GKE component version metric, env variable %v not defined", envGKEGCSFuseCSIVersion) } klog.Infof("Emit component_version metric with value %v", v) @@ -95,61 +92,6 @@ func (mm *Manager) recordComponentVersionMetric() error { return nil } -func (mm *Manager) RecordOperationMetrics(opErr error, methodName string, opDuration time.Duration) { - operationSeconds.WithLabelValues(getErrorCode(opErr), methodName).Observe(opDuration.Seconds()) -} - -func getErrorCode(err error) string { - if err == nil { - return codes.OK.String() - } - - st, ok := status.FromError(err) - if !ok { - // This is not gRPC error. The operation must have failed before gRPC - // method was called, otherwise we would get gRPC error. - return "unknown-non-grpc" - } - - return st.Code().String() -} - -func (mm *Manager) EmitGKEComponentVersion() error { - mm.registerComponentVersionMetric() - - return mm.recordComponentVersionMetric() -} - -// Server represents any type that could serve HTTP requests for the metrics -// endpoint. -type Server interface { - Handle(pattern string, handler http.Handler) -} - -// RegisterToServer registers an HTTP handler for this metrics manager to the -// given server at the specified address/path. -func (mm *Manager) registerToServer(s Server, metricsPath string) { - s.Handle(metricsPath, metrics.HandlerFor( - mm.GetRegistry(), - metrics.HandlerOpts{ - ErrorHandling: metrics.ContinueOnError, - })) -} - -// InitializeHTTPHandler sets up a server and creates a handler for metrics. -func (mm *Manager) InitializeHTTPHandler(address, path string) { - mux := http.NewServeMux() - mm.registerToServer(mux, path) - - go func() { - klog.Infof("Metric server listening at %q", address) - //nolint:gosec - if err := http.ListenAndServe(address, mux); err != nil { - klog.Fatalf("Failed to start metric server at specified address (%q) and path (%q): %s", address, path, err) - } - }() -} - func getEnvVar(envVarName string) string { v, ok := os.LookupEnv(envVarName) if !ok { @@ -162,5 +104,5 @@ func getEnvVar(envVarName string) string { } func IsGKEComponentVersionAvailable() bool { - return getEnvVar(envGKEGCSCSIVersion) != "" + return getEnvVar(envGKEGCSFuseCSIVersion) != "" }