Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to the VPA admission-controller to reload it's certificate #6653

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions vertical-pod-autoscaler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module k8s.io/autoscaler/vertical-pod-autoscaler
go 1.21

require (
github.com/fsnotify/fsnotify v1.7.0
github.com/golang/mock v1.6.0
github.com/prometheus/client_golang v1.17.0
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16
Expand Down
2 changes: 2 additions & 0 deletions vertical-pod-autoscaler/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ github.com/emicklei/go-restful/v3 v3.10.1 h1:rc42Y5YTp7Am7CS630D7JmhRjq4UlEUuEKf
github.com/emicklei/go-restful/v3 v3.10.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
Expand Down
96 changes: 88 additions & 8 deletions vertical-pod-autoscaler/pkg/admission-controller/certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,31 @@ package main

import (
"os"
"path"

"crypto/tls"
"sync"

"k8s.io/klog/v2"

"github.com/fsnotify/fsnotify"
)

type certsContainer struct {
caCert, serverKey, serverCert []byte
// KeypairReloader structs holds cert path and certs
type KeypairReloader struct {
certMu sync.RWMutex
cert *tls.Certificate
certPath string
keyPath string
}

type certsConfig struct {
clientCaFile, tlsCertFile, tlsPrivateKey *string
reload *bool
}

func (cc *certsConfig) ReadCA() []byte {
return readFile(*cc.clientCaFile)
}

func readFile(filePath string) []byte {
Expand All @@ -41,10 +56,75 @@ func readFile(filePath string) []byte {
return res
}

func initCerts(config certsConfig) certsContainer {
res := certsContainer{}
res.caCert = readFile(*config.clientCaFile)
res.serverCert = readFile(*config.tlsCertFile)
res.serverKey = readFile(*config.tlsPrivateKey)
return res
// NewKeypairReloader will load certs on first run and trigger a goroutine for fsnotify watcher
func NewKeypairReloader(config certsConfig) (*KeypairReloader, error) {
result := &KeypairReloader{
certPath: *config.tlsCertFile,
keyPath: *config.tlsPrivateKey,
}
cert, err := tls.LoadX509KeyPair(*config.tlsCertFile, *config.tlsPrivateKey)
if err != nil {
return nil, err
}
result.cert = &cert

// creates a new file watcher
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}

defer func() {
if err != nil {
watcher.Close()
}
}()

if err := watcher.Add(path.Dir(*config.tlsCertFile)); err != nil {
return nil, err
}
if err := watcher.Add(path.Dir(*config.tlsPrivateKey)); err != nil {
return nil, err
}

go func() {
for {
select {
// watch for events
case event := <-watcher.Events:
// fsnotify.create events will tell us if there are new certs
if event.Op.Has(fsnotify.Create) || event.Op.Has(fsnotify.Write) {
klog.V(2).Info("New certificate found, reloading")
if err := result.reload(); err != nil {
klog.Warningf("Could not load new certs: %v", err)
}
}

// watch for errors
case err := <-watcher.Errors:
klog.Warningf("Error watching certificate: %v", err)
}
}
}()

return result, nil
}

// reload loads updated cert and key whenever they are updated
func (kpr *KeypairReloader) reload() error {
newCert, err := tls.LoadX509KeyPair(kpr.certPath, kpr.keyPath)
if err != nil {
return err
}
kpr.certMu.Lock()
defer kpr.certMu.Unlock()
kpr.cert = &newCert
return nil
}

// GetCertificate is the function which will be used as tls.Config.GetCertificate
func (kpr *KeypairReloader) GetCertificate(_ *tls.ClientHelloInfo) (*tls.Certificate, error) {
kpr.certMu.RLock()
defer kpr.certMu.RUnlock()
return kpr.cert, nil
}
150 changes: 150 additions & 0 deletions vertical-pod-autoscaler/pkg/admission-controller/certs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"bytes"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"math/big"
"net"
"os"
"path"
"testing"
"time"
)

func TestKeypairReloader(t *testing.T) {
tempDir, err := os.MkdirTemp(os.TempDir(), "vpa-test")
if err != nil {
t.Error(err)
}
defer os.RemoveAll(tempDir)

caCert := &x509.Certificate{
SerialNumber: big.NewInt(0),
Subject: pkix.Name{
Organization: []string{"ca"},
},
NotBefore: time.Now(),
NotAfter: time.Now().AddDate(2, 0, 0),
IsCA: true,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
BasicConstraintsValid: true,
}
caKey, err := rsa.GenerateKey(rand.Reader, 4096)
if err != nil {
t.Error(err)
}
caBytes, err := x509.CreateCertificate(rand.Reader, caCert, caCert, &caKey.PublicKey, caKey)
if err != nil {
t.Error(err)
}
caPath := path.Join(tempDir, "ca.crt")
caFile, err := os.Create(caPath)
if err != nil {
t.Error(err)
}
err = pem.Encode(caFile, &pem.Block{
Type: "CERTIFICATE",
Bytes: caBytes,
})
if err != nil {
t.Error(err)
}

generateCerts := func(org string) ([]byte, []byte) {
cert := &x509.Certificate{
SerialNumber: big.NewInt(0),
Subject: pkix.Name{
Organization: []string{org},
},
IPAddresses: []net.IP{net.IPv4(127, 0, 0, 1), net.IPv6loopback},
NotBefore: time.Now(),
NotAfter: time.Now().AddDate(1, 0, 0),
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
KeyUsage: x509.KeyUsageDigitalSignature,
}
certKey, err := rsa.GenerateKey(rand.Reader, 4096)
if err != nil {
t.Error(err)
}
certBytes, err := x509.CreateCertificate(rand.Reader, cert, caCert, &certKey.PublicKey, caKey)
if err != nil {
t.Error(err)
}

var certPem bytes.Buffer
err = pem.Encode(&certPem, &pem.Block{
Type: "CERTIFICATE",
Bytes: certBytes,
})
if err != nil {
t.Error(err)
}

var certKeyPem bytes.Buffer
err = pem.Encode(&certKeyPem, &pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: x509.MarshalPKCS1PrivateKey(certKey),
})
if err != nil {
t.Error(err)
}
return certPem.Bytes(), certKeyPem.Bytes()
}

pub, privateKey := generateCerts("first")
certPath := path.Join(tempDir, "cert.crt")
if err = os.WriteFile(certPath, pub, 0666); err != nil {
t.Error(err)
}
keyPath := path.Join(tempDir, "cert.key")
if err = os.WriteFile(keyPath, privateKey, 0666); err != nil {
t.Error(err)
}

kpr, err := NewKeypairReloader(certsConfig{
clientCaFile: &caPath,
tlsCertFile: &certPath,
tlsPrivateKey: &keyPath,
})
if err != nil {
t.Error(err)
}

pub, privateKey = generateCerts("second")
if err = os.WriteFile(certPath, pub, 0666); err != nil {
t.Error(err)
}
if err = os.WriteFile(keyPath, privateKey, 0666); err != nil {
t.Error(err)
}
time.Sleep(40 * time.Millisecond)
tlsCert, err := kpr.GetCertificate(nil)
if err != nil {
t.Error(err)
}
pubDER, _ := pem.Decode(pub)
if string(tlsCert.Certificate[0]) != string(pubDER.Bytes) {
t.Error("did not reload certificate")
}
}
24 changes: 17 additions & 7 deletions vertical-pod-autoscaler/pkg/admission-controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,10 @@ const (
webhookConfigName = "vpa-webhook-config"
)

func configTLS(serverCert, serverKey []byte, minTlsVersion, ciphers string) *tls.Config {
func configTLS(cfg certsConfig, minTlsVersion, ciphers string) (*tls.Config, error) {
var tlsVersion uint16
var ciphersuites []uint16
reverseCipherMap := make(map[string]uint16)
sCert, err := tls.X509KeyPair(serverCert, serverKey)
if err != nil {
klog.Fatal(err)
}

for _, c := range tls.CipherSuites() {
reverseCipherMap[c.Name] = c.ID
Expand All @@ -66,11 +62,25 @@ func configTLS(serverCert, serverKey []byte, minTlsVersion, ciphers string) *tls
klog.Fatal(fmt.Errorf("Unable to determine value for --min-tls-version (%s), must be either tls1_2 or tls1_3", minTlsVersion))
}

return &tls.Config{
config := &tls.Config{
MinVersion: tlsVersion,
Certificates: []tls.Certificate{sCert},
CipherSuites: ciphersuites,
}
if *cfg.reload {
kpr, err := NewKeypairReloader(cfg)
if err != nil {
return nil, err
}
// this will check if there are new certs before every tls handshake
config.GetCertificate = kpr.GetCertificate
} else {
cert, err := tls.LoadX509KeyPair(*cfg.tlsCertFile, *cfg.tlsPrivateKey)
if err != nil {
return nil, err
}
config.Certificates = []tls.Certificate{cert}
}
return config, nil
}

// register this webhook admission controller with the kube-apiserver
Expand Down
14 changes: 10 additions & 4 deletions vertical-pod-autoscaler/pkg/admission-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ var (
clientCaFile: flag.String("client-ca-file", "/etc/tls-certs/caCert.pem", "Path to CA PEM file."),
tlsCertFile: flag.String("tls-cert-file", "/etc/tls-certs/serverCert.pem", "Path to server certificate PEM file."),
tlsPrivateKey: flag.String("tls-private-key", "/etc/tls-certs/serverKey.pem", "Path to server certificate key PEM file."),
reload: flag.Bool("reload-cert", false, "If true, reload leaf certificate on file changes"),
}
ciphers = flag.String("tls-ciphers", "", "A comma-separated or colon-separated list of ciphers to accept. Only works when min-tls-version is set to tls1_2.")
minTlsVersion = flag.String("min-tls-version", "tls1_2", "The minimum TLS version to accept. Must be set to either tls1_2 (default) or tls1_3.")
Expand Down Expand Up @@ -86,7 +87,6 @@ func main() {
metrics.Initialize(*address, healthCheck)
metrics_admission.Register()

certs := initCerts(*certsConfiguration)
config := common.CreateKubeConfigOrDie(*kubeconfig, float32(*kubeApiQps), int(*kubeApiBurst))

vpaClient := vpa_clientset.NewForConfigOrDie(config)
Expand All @@ -98,7 +98,8 @@ func main() {
podPreprocessor := pod.NewDefaultPreProcessor()
vpaPreprocessor := vpa.NewDefaultPreProcessor()
var limitRangeCalculator limitrange.LimitRangeCalculator
limitRangeCalculator, err := limitrange.NewLimitsRangeCalculator(factory)
var err error
limitRangeCalculator, err = limitrange.NewLimitsRangeCalculator(factory)
if err != nil {
klog.Errorf("Failed to create limitRangeCalculator, falling back to not checking limits. Error message: %s", err)
limitRangeCalculator = limitrange.NewNoopLimitsCalculator()
Expand Down Expand Up @@ -131,14 +132,19 @@ func main() {
as.Serve(w, r)
healthCheck.UpdateLastActivity()
})
tlsConfig, err := configTLS(*certsConfiguration, *minTlsVersion, *ciphers)
if err != nil {
klog.Fatalf("failed to configure TLS: %s", err)
}
server := &http.Server{
Addr: fmt.Sprintf(":%d", *port),
TLSConfig: configTLS(certs.serverCert, certs.serverKey, *minTlsVersion, *ciphers),
TLSConfig: tlsConfig,
}

url := fmt.Sprintf("%v:%v", *webhookAddress, *webhookPort)
go func() {
if *registerWebhook {
selfRegistration(kubeClient, certs.caCert, namespace, *serviceName, url, *registerByURL, int32(*webhookTimeout))
selfRegistration(kubeClient, certsConfiguration.ReadCA(), namespace, *serviceName, url, *registerByURL, int32(*webhookTimeout))
}
// Start status updates after the webhook is initialized.
statusUpdater.Run(stopCh)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading