Skip to content

Commit

Permalink
add component_version metric to the webhook
Browse files Browse the repository at this point in the history
  • Loading branch information
songjiaxun committed Mar 31, 2023
1 parent b47e946 commit bfd18c7
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 96 deletions.
4 changes: 0 additions & 4 deletions cmd/csi_driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,6 @@ func main() {
if *httpEndpoint != "" && metrics.IsGKEComponentVersionAvailable() {
mm = metrics.NewMetricsManager()
mm.InitializeHTTPHandler(*httpEndpoint, *metricsPath)
err = mm.EmitGKEComponentVersion()
if err != nil {
klog.Fatalf("Failed to emit GKE compoent version: %v", err)
}
}
} else {
if *nodeID == "" {
Expand Down
11 changes: 11 additions & 0 deletions cmd/webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"flag"
"net/http"

"github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/metrics"
wh "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/webhook"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client/config"
Expand All @@ -39,6 +40,8 @@ var (
memoryLimit = flag.String("sidecar-memory-limit", "256Mi", "The default memory limit for gcsfuse sidecar container.")
ephemeralStorageLimit = flag.String("sidecar-ephemeral-storage-limit", "10Gi", "The default ephemeral storage limit for gcsfuse sidecar container.")
sidecarImageName = flag.String("sidecar-image-name", "", "The gcsfuse sidecar container image name.")
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled.")
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")

// These are set at compile time.
version = "unknown"
Expand All @@ -51,6 +54,14 @@ func main() {

klog.Infof("Running Google Cloud Storage FUSE CSI driver admission webhook version %v, sidecar container image %v:%v", version, *sidecarImageName, sidecarImageVersion)

if *httpEndpoint != "" && metrics.IsGKEComponentVersionAvailable() {
mm := metrics.NewMetricsManager()
mm.InitializeHTTPHandler(*httpEndpoint, *metricsPath)
if err := mm.EmitGKEComponentVersion(); err != nil {
klog.Fatalf("Failed to emit GKE component version: %v", err)
}
}

// Load webhook config
c, err := wh.LoadConfig(*sidecarImageName, sidecarImageVersion, *imagePullPolicy, *cpuLimit, *memoryLimit, *ephemeralStorageLimit)
if err != nil {
Expand Down
13 changes: 12 additions & 1 deletion deploy/base/webhook/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,14 @@ spec:
- --cert-dir=/etc/tls-certs
- --port=22030
- --health-probe-bind-address=:22031
- --http-endpoint=:22032
env:
- name: SIDECAR_IMAGE_PULL_POLICY
value: "IfNotPresent"
- name: SIDECAR_IMAGE_REGISTRY
value: ""
- name: GKE_GCSFUSECSI_VERSION
value: "v999.999.999"
resources:
limits:
cpu: 200m
Expand All @@ -69,6 +72,9 @@ spec:
containerPort: 22030
- name: readyz
containerPort: 22031
- name: http-endpoint
containerPort: 22032
protocol: TCP
livenessProbe:
httpGet:
scheme: HTTP
Expand All @@ -95,6 +101,11 @@ spec:
selector:
app: gcs-fuse-csi-driver-webhook
ports:
- protocol: TCP
- name: injector
protocol: TCP
port: 443
targetPort: 22030
- name: metrics
protocol: TCP
port: 8080
targetPort: 22032
124 changes: 33 additions & 91 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,43 +22,21 @@ import (
"os"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/component-base/metrics"
"k8s.io/klog/v2"
)

const (
// envGKEGCSCSIVersion is an environment variable set in the Cloud Storage FUSE CSI driver controller manifest
// envGKEGCSFuseCSIVersion is an environment variable set in the Cloud Storage FUSE CSI driver webhook manifest
// with the current version of the GKE component.
envGKEGCSCSIVersion = "GKE_GCSCSI_VERSION"

subSystem = "gcscsi"
operationsLatencyMetricName = "operations_seconds"

labelStatusCode = "grpc_status_code"
labelMethodName = "method_name"
envGKEGCSFuseCSIVersion = "GKE_GCSFUSECSI_VERSION"
)

var (
// metricBuckets lists the histogram bucket boundaries used by operationSeconds,
// whose help text states that latency is measured in seconds.
metricBuckets = []float64{.1, .25, .5, 1, 2.5, 5, 10, 15, 30, 60, 120, 300, 600}

// gkeComponentVersion is a gauge vector with a single "component_version" label.
// This metric is exposed only from the controller driver component when GKE_GCSCSI_VERSION env variable is set.
gkeComponentVersion = metrics.NewGaugeVec(&metrics.GaugeOpts{
Name: "component_version",
Help: "Metric to expose the version of the GCSCSI GKE component.",
}, []string{"component_version"})

// operationSeconds is a histogram vector of operation latency, labelled by
// gRPC status code and method name.
operationSeconds = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: subSystem,
Name: operationsLatencyMetricName,
Buckets: metricBuckets,
Help: "Operation latency in seconds",
},
[]string{labelStatusCode, labelMethodName},
)
)
// gkeComponentVersion is a gauge vector with a single "component_version" label.
// This metric is exposed only from the webhook component when GKE_GCSFUSECSI_VERSION env variable is set.
var gkeComponentVersion = metrics.NewGaugeVec(&metrics.GaugeOpts{
Name: "component_version",
Help: "Metric to expose the version of the gcsfusecsi GKE component.",
}, []string{"component_version"})

type Manager struct {
registry metrics.KubeRegistry
Expand All @@ -68,25 +46,44 @@ func NewMetricsManager() *Manager {
mm := &Manager{
registry: metrics.NewKubeRegistry(),
}
mm.registry.MustRegister(operationSeconds)

return mm
}

func (mm *Manager) GetRegistry() metrics.KubeRegistry {
return mm.registry
// InitializeHTTPHandler serves the manager's registry over HTTP.
//
// A handler for the registry is mounted at path on a new ServeMux, and an
// http.Server bound to address is started in a background goroutine, so this
// method returns without waiting for the listener. Any error returned by
// ListenAndServe is reported through klog.Fatalf.
func (mm *Manager) InitializeHTTPHandler(address, path string) {
	handlerOpts := metrics.HandlerOpts{ErrorHandling: metrics.ContinueOnError}

	router := http.NewServeMux()
	router.Handle(path, metrics.HandlerFor(mm.registry, handlerOpts))

	srv := &http.Server{
		Addr:              address,
		Handler:           router,
		ReadHeaderTimeout: 3 * time.Second,
	}

	go func() {
		klog.Infof("Metric server listening at %q", address)
		err := srv.ListenAndServe()
		if err != nil {
			klog.Fatalf("Failed to start metric server at specified address (%q) and path (%q): %s", address, path, err)
		}
	}()
}

func (mm *Manager) registerComponentVersionMetric() {
func (mm *Manager) EmitGKEComponentVersion() error {
mm.registry.MustRegister(gkeComponentVersion)

return mm.recordComponentVersionMetric()
}

func (mm *Manager) recordComponentVersionMetric() error {
v := getEnvVar(envGKEGCSCSIVersion)
v := getEnvVar(envGKEGCSFuseCSIVersion)
if v == "" {
klog.V(2).Info("Skip emitting component version metric")

return fmt.Errorf("failed to register GKE component version metric, env variable %v not defined", envGKEGCSCSIVersion)
return fmt.Errorf("failed to register GKE component version metric, env variable %v not defined", envGKEGCSFuseCSIVersion)
}

klog.Infof("Emit component_version metric with value %v", v)
Expand All @@ -95,61 +92,6 @@ func (mm *Manager) recordComponentVersionMetric() error {
return nil
}

// RecordOperationMetrics observes opDuration, converted to seconds, on the
// operationSeconds histogram. The observation is labelled with the status
// code string derived from opErr and with methodName.
func (mm *Manager) RecordOperationMetrics(opErr error, methodName string, opDuration time.Duration) {
	statusCode := getErrorCode(opErr)
	elapsed := opDuration.Seconds()
	operationSeconds.WithLabelValues(statusCode, methodName).Observe(elapsed)
}

// getErrorCode maps err to the string used for the status code label.
//
// A nil error yields the string form of codes.OK. An error that
// status.FromError recognises yields the string form of its gRPC code. Any
// other error yields "unknown-non-grpc".
func getErrorCode(err error) string {
	if err == nil {
		return codes.OK.String()
	}

	if grpcStatus, isGRPC := status.FromError(err); isGRPC {
		return grpcStatus.Code().String()
	}

	// Not a gRPC error: the operation must have failed before the gRPC method
	// was called, otherwise a gRPC error would have been returned.
	return "unknown-non-grpc"
}

// EmitGKEComponentVersion registers the component version metric and then
// records its value, returning whatever error the recording step reports.
func (mm *Manager) EmitGKEComponentVersion() error {
mm.registerComponentVersionMetric()

return mm.recordComponentVersionMetric()
}

// Server represents any type that could serve HTTP requests for the metrics
// endpoint.
type Server interface {
// Handle has the same signature as (*http.ServeMux).Handle; an
// *http.ServeMux is the value InitializeHTTPHandler passes in.
Handle(pattern string, handler http.Handler)
}

// registerToServer registers an HTTP handler for this metrics manager's
// registry on the given server at metricsPath. The handler is created with
// ContinueOnError error handling.
func (mm *Manager) registerToServer(s Server, metricsPath string) {
	opts := metrics.HandlerOpts{ErrorHandling: metrics.ContinueOnError}
	handler := metrics.HandlerFor(mm.GetRegistry(), opts)
	s.Handle(metricsPath, handler)
}

// InitializeHTTPHandler sets up a server and creates a handler for metrics.
func (mm *Manager) InitializeHTTPHandler(address, path string) {
mux := http.NewServeMux()
mm.registerToServer(mux, path)

go func() {
klog.Infof("Metric server listening at %q", address)
//nolint:gosec
if err := http.ListenAndServe(address, mux); err != nil {
klog.Fatalf("Failed to start metric server at specified address (%q) and path (%q): %s", address, path, err)
}
}()
}

func getEnvVar(envVarName string) string {
v, ok := os.LookupEnv(envVarName)
if !ok {
Expand All @@ -162,5 +104,5 @@ func getEnvVar(envVarName string) string {
}

func IsGKEComponentVersionAvailable() bool {
return getEnvVar(envGKEGCSCSIVersion) != ""
return getEnvVar(envGKEGCSFuseCSIVersion) != ""
}

0 comments on commit bfd18c7

Please sign in to comment.