Skip to content

Commit

Permalink
KEP-2170: Initial Implementations for v2 Manager
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Iwai <[email protected]>
  • Loading branch information
tenzen-y committed Aug 30, 2024
1 parent 2455f7d commit dafd032
Show file tree
Hide file tree
Showing 13 changed files with 751 additions and 1 deletion.
6 changes: 5 additions & 1 deletion .github/workflows/unittests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,14 @@ jobs:
with:
go-version-file: ${{ env.GOPATH }}/src/github.com/kubeflow/training-operator/go.mod

- name: Run Go test
- name: Run Go test for v1
run: |
make test ENVTEST_K8S_VERSION=${{ matrix.kubernetes-version }}
- name: Run Go test for v2
run: |
make test-integrationv2 ENVTEST_K8S_VERSION=${{ matrix.kubernetes-version }}
- name: Coveralls report
uses: shogo82148/actions-goveralls@v1
with:
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ testall: manifests generate fmt vet golangci-lint test ## Run tests.
test: envtest
KUBEBUILDER_ASSETS="$(shell setup-envtest use $(ENVTEST_K8S_VERSION) -p path)" go test ./... -coverprofile cover.out

# Depends on envtest (like `test`) so that setup-envtest is installed before
# the recipe shells out to it; without the prerequisite the target only works
# when another target has already installed the tool.
# NOTE(review): this writes cover.out, the same file `test` writes, so running
# both targets back to back overwrites the earlier coverage profile.
.PHONY: test-integrationv2
test-integrationv2: envtest ## Run Go tests under ./test for the v2 manager.
	KUBEBUILDER_ASSETS="$(shell setup-envtest use $(ENVTEST_K8S_VERSION) -p path)" go test ./test/... -coverprofile cover.out

envtest:
ifndef HAS_SETUP_ENVTEST
go install sigs.k8s.io/controller-runtime/tools/setup-envtest@bf15e44028f908c790721fc8fe67c7bf2d06a611 # v0.17.2
Expand Down
160 changes: 160 additions & 0 deletions cmd/training-operator.v2alpha1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,166 @@ limitations under the License.

package main

import (
"crypto/tls"
"errors"
"flag"
"net/http"
"os"

zaplog "go.uber.org/zap"
"go.uber.org/zap/zapcore"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
jobsetv1alpha2 "sigs.k8s.io/jobset/api/jobset/v1alpha2"

kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1"
"github.com/kubeflow/training-operator/pkg/cert"
controllerv2 "github.com/kubeflow/training-operator/pkg/controller.v2"
webhookv2 "github.com/kubeflow/training-operator/pkg/webhook.v2"
)

var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(kubeflowv2.AddToScheme(scheme))
utilruntime.Must(jobsetv1alpha2.AddToScheme(scheme))
}

// main parses the command-line flags, builds the controller-runtime manager,
// wires certificate management, probe endpoints and controllers into it, and
// then runs the manager until the signal handler's context is cancelled.
// Every setup failure is logged through setupLog and terminates the process
// with exit code 1.
func main() {
	// Name of the leader-election lock, used only when --leader-elect is set.
	// NOTE(review): this identifier is new; confirm it does not collide with
	// the lock name of any other manager deployed in the same namespace.
	const leaderElectionID = "training-operator-v2.kubeflow.org"

	var (
		metricsAddr          string
		enableLeaderElection bool
		probeAddr            string
		secureMetrics        bool
		enableHTTP2          bool
		webhookServerPort    int
		webhookServiceName   string
		webhookSecretName    string
		tlsOpts              []func(*tls.Config)
	)

	flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
		"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
	flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
	flag.BoolVar(&enableLeaderElection, "leader-elect", false,
		"Enable leader election for controller manager. "+
			"Enabling this will ensure there is only one active controller manager.")
	flag.BoolVar(&secureMetrics, "metrics-secure", true,
		"If set, the metrics endpoint is served securely via HTTPS. Use --metrics-secure=false to use HTTP instead.")
	flag.BoolVar(&enableHTTP2, "enable-http2", false,
		"If set, HTTP/2 will be enabled for the metrics and webhook servers")

	// Cert generation flags
	flag.IntVar(&webhookServerPort, "webhook-server-port", 9443, "Endpoint port for the webhook server.")
	flag.StringVar(&webhookServiceName, "webhook-service-name", "training-operator", "Name of the Service used as part of the DNSName")
	flag.StringVar(&webhookSecretName, "webhook-secret-name", "training-operator-webhook-cert", "Name of the Secret to store CA and server certs")

	opts := zap.Options{
		TimeEncoder: zapcore.RFC3339NanoTimeEncoder,
		ZapOpts:     []zaplog.Option{zaplog.AddCaller()},
	}
	// The zap flags must be bound before flag.Parse so they are recognised.
	opts.BindFlags(flag.CommandLine)
	flag.Parse()

	ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

	if !enableHTTP2 {
		// if the enable-http2 flag is false (the default), http/2 should be disabled
		// due to its vulnerabilities. More specifically, disabling http/2 will
		// prevent from being vulnerable to the HTTP/2 Stream Cancellation and
		// Rapid Reset CVEs. For more information see:
		// - https://github.com/advisories/GHSA-qppj-fm5r-hxr3
		// - https://github.com/advisories/GHSA-4374-p667-p6c8
		tlsOpts = append(tlsOpts, func(c *tls.Config) {
			setupLog.Info("disabling http/2")
			c.NextProtos = []string{"http/1.1"}
		})
	}
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme: scheme,
		Metrics: metricsserver.Options{
			BindAddress:   metricsAddr,
			SecureServing: secureMetrics,
			TLSOpts:       tlsOpts,
		},
		WebhookServer: webhook.NewServer(webhook.Options{
			Port:    webhookServerPort,
			TLSOpts: tlsOpts,
		}),
		HealthProbeBindAddress: probeAddr,
		// Honour --leader-elect; previously the flag was parsed but never
		// handed to the manager, so it had no effect.
		LeaderElection:   enableLeaderElection,
		LeaderElectionID: leaderElectionID,
	})
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}

	// certsReady is handed to the cert manager, the readiness probe and the
	// controller setup goroutine; the latter two wait on it.
	certsReady := make(chan struct{})
	if err = cert.ManageCerts(mgr, cert.Config{
		WebhookSecretName:  webhookSecretName,
		WebhookServiceName: webhookServiceName,
	}, certsReady); err != nil {
		setupLog.Error(err, "unable to set up cert rotation")
		os.Exit(1)
	}

	setupProbeEndpoints(mgr, certsReady)
	// Set up controllers using goroutines to start the manager quickly.
	go setupControllers(mgr, certsReady)

	setupLog.Info("Starting manager")
	if err = mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		setupLog.Error(err, "Could not run manager")
		os.Exit(1)
	}
}

// setupControllers blocks until certsReady yields (a value or a close), then
// registers the v2 controllers followed by the v2 webhooks on mgr. If either
// registration fails, the failing component is logged and the process exits
// with code 1.
func setupControllers(mgr ctrl.Manager, certsReady <-chan struct{}) {
	setupLog.Info("Waiting for certificate generation to complete")
	<-certsReady
	setupLog.Info("Certs ready")

	ctrlName, ctrlErr := controllerv2.SetupControllers(mgr)
	if ctrlErr != nil {
		setupLog.Error(ctrlErr, "Could not create controller", "controller", ctrlName)
		os.Exit(1)
	}

	hookName, hookErr := webhookv2.Setup(mgr)
	if hookErr != nil {
		setupLog.Error(hookErr, "Could not create webhook", "webhook", hookName)
		os.Exit(1)
	}
}

// setupProbeEndpoints registers the "healthz" liveness check and the "readyz"
// readiness check on mgr. A registration failure is logged and terminates the
// process with exit code 1; on success a confirmation line is logged.
func setupProbeEndpoints(mgr ctrl.Manager, certsReady <-chan struct{}) {
	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
		setupLog.Error(err, "unable to set up health check")
		os.Exit(1)
	}

	// The replica is only advertised as ready once the webhook server is
	// listening, so that clients can hold back webhook-dependent requests
	// until the deployment is available instead of having them rejected
	// during startup.
	// GetWebhookServer is deliberately called inside the closure, i.e. lazily
	// per probe: touching a webhook server whose certs are not ready yet
	// would fail the start of the manager.
	readyCheck := func(req *http.Request) error {
		select {
		case <-certsReady:
			return mgr.GetWebhookServer().StartedChecker()(req)
		default:
			// Non-blocking: certs are not available yet.
		}
		return errors.New("certificates are not ready")
	}
	if err := mgr.AddReadyzCheck("readyz", readyCheck); err != nil {
		setupLog.Error(err, "unable to set up ready check")
		os.Exit(1)
	}

	setupLog.Info("Probe endpoints are configured on healthz and readyz")
}
60 changes: 60 additions & 0 deletions pkg/controller.v2/clustertrainingruntime_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
Copyright 2024 The Kubeflow Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllerv2

import (
"context"

"github.com/go-logr/logr"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1"
)

// ClusterTrainingRuntimeReconciler reconciles ClusterTrainingRuntime objects.
type ClusterTrainingRuntimeReconciler struct {
// log is the reconciler's own named logger.
// NOTE(review): Reconcile in this file logs through the request context
// instead of this field — confirm whether the field is still needed.
log logr.Logger
// client is used to read ClusterTrainingRuntime objects.
client client.Client
// recorder is the event recorder supplied at construction time.
recorder record.EventRecorder
}

// NewClusterTrainingRuntimeReconciler builds a reconciler for
// ClusterTrainingRuntime objects that uses the given client and event
// recorder, and a logger named "clustertrainingruntime-reconciler".
func NewClusterTrainingRuntimeReconciler(client client.Client, recorder record.EventRecorder) *ClusterTrainingRuntimeReconciler {
	reconciler := new(ClusterTrainingRuntimeReconciler)
	reconciler.log = ctrl.Log.WithName("clustertrainingruntime-reconciler")
	reconciler.client = client
	reconciler.recorder = recorder
	return reconciler
}

// Reconcile fetches the ClusterTrainingRuntime identified by req and logs, at
// verbosity 2, that it is being reconciled. No further action is taken yet.
//
// A failed Get is returned to the caller, except for a not-found error, which
// client.IgnoreNotFound turns into nil so that deleted objects are not
// requeued. The returned Result is always empty.
func (r *ClusterTrainingRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	var clRuntime kubeflowv2.ClusterTrainingRuntime
	if err := r.client.Get(ctx, req.NamespacedName, &clRuntime); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	// ctrl.LoggerInto returns a derived context rather than mutating ctx, so
	// calling it and dropping the result (as this code used to) did nothing.
	// Nothing below consumes ctx yet; once it does, propagate the logger with
	// `ctx = ctrl.LoggerInto(ctx, log)`.
	log := ctrl.LoggerFrom(ctx).WithValues("clusterTrainingRuntime", klog.KObj(&clRuntime))
	log.V(2).Info("Reconciling ClusterTrainingRuntime")
	return ctrl.Result{}, nil
}

// SetupWithManager registers r with mgr as the controller for
// ClusterTrainingRuntime objects and returns any error from building it.
func (r *ClusterTrainingRuntimeReconciler) SetupWithManager(mgr ctrl.Manager) error {
	bldr := ctrl.NewControllerManagedBy(mgr)
	bldr = bldr.For(&kubeflowv2.ClusterTrainingRuntime{})
	return bldr.Complete(r)
}
41 changes: 41 additions & 0 deletions pkg/controller.v2/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright 2024 The Kubeflow Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllerv2

import ctrl "sigs.k8s.io/controller-runtime"

// SetupControllers registers the TrainingRuntime, ClusterTrainingRuntime and
// TrainJob reconcilers with mgr, in that order, stopping at the first failure.
//
// On failure it returns the kind name of the controller that could not be set
// up together with the error; on success it returns an empty name and nil.
func SetupControllers(mgr ctrl.Manager) (string, error) {
	// The event recorder names previously misspelled "training" as
	// "traininig"; they are corrected here.
	if err := NewTrainingRuntimeReconciler(
		mgr.GetClient(),
		mgr.GetEventRecorderFor("training-operator-trainingruntime-controller"),
	).SetupWithManager(mgr); err != nil {
		return "TrainingRuntime", err
	}
	if err := NewClusterTrainingRuntimeReconciler(
		mgr.GetClient(),
		mgr.GetEventRecorderFor("training-operator-clustertrainingruntime-controller"),
	).SetupWithManager(mgr); err != nil {
		return "ClusterTrainingRuntime", err
	}
	if err := NewTrainJobReconciler(
		mgr.GetClient(),
		mgr.GetEventRecorderFor("training-operator-trainjob-controller"),
	).SetupWithManager(mgr); err != nil {
		return "TrainJob", err
	}
	return "", nil
}
59 changes: 59 additions & 0 deletions pkg/controller.v2/traininigruntime_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
Copyright 2024 The Kubeflow Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllerv2

import (
"context"

"github.com/go-logr/logr"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1"
)

// TrainingRuntimeReconciler reconciles TrainingRuntime objects.
type TrainingRuntimeReconciler struct {
// log is the reconciler's own named logger.
// NOTE(review): Reconcile in this file logs through the request context
// instead of this field — confirm whether the field is still needed.
log logr.Logger
// client is used to read TrainingRuntime objects.
client client.Client
// recorder is the event recorder supplied at construction time.
recorder record.EventRecorder
}

// NewTrainingRuntimeReconciler builds a reconciler for TrainingRuntime objects
// that uses the given client and event recorder, and a logger named
// "trainingruntime-reconciler".
func NewTrainingRuntimeReconciler(client client.Client, recorder record.EventRecorder) *TrainingRuntimeReconciler {
	reconciler := new(TrainingRuntimeReconciler)
	reconciler.log = ctrl.Log.WithName("trainingruntime-reconciler")
	reconciler.client = client
	reconciler.recorder = recorder
	return reconciler
}

// Reconcile fetches the TrainingRuntime identified by req and logs, at
// verbosity 2, that it is being reconciled. No further action is taken yet.
//
// A failed Get is returned to the caller, except for a not-found error, which
// client.IgnoreNotFound turns into nil so that deleted objects are not
// requeued. The returned Result is always empty.
func (r *TrainingRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	var runtime kubeflowv2.TrainingRuntime
	if err := r.client.Get(ctx, req.NamespacedName, &runtime); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	// Attach only the object's namespace/name to the logger. The previous code
	// passed the whole TrainingRuntime struct by value, which copies the
	// object and dumps it into every log line.
	// ctrl.LoggerInto returns a derived context rather than mutating ctx, so
	// the former call that dropped its result did nothing and is removed; once
	// ctx is passed on, propagate with `ctx = ctrl.LoggerInto(ctx, log)`.
	log := ctrl.LoggerFrom(ctx).WithValues("trainingRuntime", client.ObjectKeyFromObject(&runtime))
	log.V(2).Info("Reconciling TrainingRuntime")
	return ctrl.Result{}, nil
}

// SetupWithManager registers r with mgr as the controller for TrainingRuntime
// objects and returns any error from building it.
func (r *TrainingRuntimeReconciler) SetupWithManager(mgr ctrl.Manager) error {
	bldr := ctrl.NewControllerManagedBy(mgr)
	bldr = bldr.For(&kubeflowv2.TrainingRuntime{})
	return bldr.Complete(r)
}
Loading

0 comments on commit dafd032

Please sign in to comment.