Skip to content

Commit

Permalink
move to multi-cluster controller manager
Browse files Browse the repository at this point in the history
Signed-off-by: Umanga Chapagain <[email protected]>
  • Loading branch information
umangachapagain committed Jul 18, 2024
1 parent 6570168 commit 8787924
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 106 deletions.
8 changes: 7 additions & 1 deletion addons/agent_mirrorpeer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// MirrorPeerReconciler reconciles a MirrorPeer object
type MirrorPeerReconciler struct {
HubCluster cluster.Cluster
HubClient client.Client
Scheme *runtime.Scheme
SpokeClient client.Client
Expand Down Expand Up @@ -452,7 +456,9 @@ func (r *MirrorPeerReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.Logger.Info("Setting up controller with manager")
mpPredicate := utils.ComposePredicates(predicate.GenerationChangedPredicate{}, mirrorPeerSpokeClusterPredicate)
return ctrl.NewControllerManagedBy(mgr).
For(&multiclusterv1alpha1.MirrorPeer{}, builder.WithPredicates(mpPredicate)).
Named("agent_mirrorpeer_controller").
WatchesRawSource(source.Kind(r.HubCluster.GetCache(), &multiclusterv1alpha1.MirrorPeer{}), &handler.EnqueueRequestForObject{},
builder.WithPredicates(mpPredicate)).
Complete(r)
}

Expand Down
5 changes: 4 additions & 1 deletion addons/green_secret_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// GreenSecretReconciler reconciles a MirrorPeer object
type GreenSecretReconciler struct {
Scheme *runtime.Scheme
HubCluster cluster.Cluster
HubClient client.Client
SpokeClient client.Client
SpokeClusterName string
Expand Down Expand Up @@ -56,7 +59,7 @@ func (r *GreenSecretReconciler) SetupWithManager(mgr ctrl.Manager) error {

return ctrl.NewControllerManagedBy(mgr).
Named("greensecret_controller").
Watches(&corev1.Secret{}, &handler.EnqueueRequestForObject{},
WatchesRawSource(source.Kind(r.HubCluster.GetCache(), &corev1.Secret{}), &handler.EnqueueRequestForObject{},
builder.WithPredicates(predicate.GenerationChangedPredicate{}, greenSecretPredicate)).
Complete(r)
}
Expand Down
165 changes: 61 additions & 104 deletions addons/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package addons
import (
"context"
"fmt"
"log/slog"
"os"
"time"

Expand Down Expand Up @@ -36,7 +35,7 @@ import (
clusterv1 "open-cluster-management.io/api/cluster/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
)
Expand Down Expand Up @@ -113,6 +112,7 @@ func (o *AddonAgentOptions) RunAgent(ctx context.Context) {
}()
ctrl.SetLogger(zapr.NewLogger(zapLogger))
logger := utils.GetLogger(zapLogger)

logger.Info("Starting addon agents.")
cc, err := addonutils.NewConfigChecker("agent kubeconfig checker", o.HubKubeconfigFile)
if err != nil {
Expand All @@ -123,105 +123,22 @@ func (o *AddonAgentOptions) RunAgent(ctx context.Context) {
logger.Info("Serving health probes on port 8000")
go setup.ServeHealthProbes(ctx.Done(), ":8000", cc.Check, logger)

logger.Info("Starting spoke manager")
go runSpokeManager(ctx, *o, logger)

logger.Info("Starting hub manager")
go runHubManager(ctx, *o, logger)

logger.Info("Addon agent is running, waiting for context cancellation")
<-ctx.Done()
logger.Info("Addon agent has stopped")
}

// GetClientFromConfig returns a controller-runtime Client for a rest.Config
func GetClientFromConfig(config *rest.Config) (client.Client, error) {
cl, err := client.New(config, client.Options{Scheme: mgrScheme})
if err != nil {
return nil, err
}
return cl, nil
}

// GetClientConfig returns the rest.Config for a kubeconfig file
func GetClientConfig(kubeConfigFile string) (*rest.Config, error) {
clientconfig, err := clientcmd.BuildConfigFromFlags("", kubeConfigFile)
if err != nil {
return nil, fmt.Errorf("get clientconfig failed: %w", err)
}
return clientconfig, nil
}

func runHubManager(ctx context.Context, options AddonAgentOptions, logger *slog.Logger) {
hubConfig, err := GetClientConfig(options.HubKubeconfigFile)
if err != nil {
logger.Error("Failed to get kubeconfig", "error", err)
os.Exit(1)
}

spokeClient, err := GetClientFromConfig(ctrl.GetConfigOrDie())
hubConfig, err := GetClientConfig(o.HubKubeconfigFile)
if err != nil {
logger.Error("Failed to get spoke client", "error", err)
logger.Error("Failed to get hub kubeconfig", "error", err)
os.Exit(1)
}

mgr, err := ctrl.NewManager(hubConfig, ctrl.Options{
Scheme: mgrScheme,
Metrics: server.Options{
BindAddress: "0", // disable metrics
},
HealthProbeBindAddress: "0", // disable health probe
ReadinessEndpointName: "0", // disable readiness probe
Cache: cache.Options{
hubCluster, err := cluster.New(hubConfig, func(options *cluster.Options) {
options.Scheme = mgrScheme
options.Cache = cache.Options{
DefaultNamespaces: map[string]cache.Config{
options.SpokeClusterName: {},
o.SpokeClusterName: {},
},
},
}
})
if err != nil {
logger.Error("Failed to start manager", "error", err)
os.Exit(1)
}

if err = (&MirrorPeerReconciler{
Scheme: mgr.GetScheme(),
HubClient: mgr.GetClient(),
SpokeClient: spokeClient,
SpokeClusterName: options.SpokeClusterName,
Logger: logger.With("controller", "MirrorPeerReconciler"),
}).SetupWithManager(mgr); err != nil {
logger.Error("Failed to create MirrorPeer controller", "controller", "MirrorPeer", "error", err)
os.Exit(1)
}

if err = (&GreenSecretReconciler{
Scheme: mgr.GetScheme(),
HubClient: mgr.GetClient(),
SpokeClient: spokeClient,
SpokeClusterName: options.SpokeClusterName,
Logger: logger.With("controller", "GreenSecretReconciler"),
}).SetupWithManager(mgr); err != nil {
logger.Error("Failed to create GreenSecret controller", "controller", "GreenSecret", "error", err)
os.Exit(1)
}

logger.Info("Starting hub controller manager")
if err := mgr.Start(ctx); err != nil {
logger.Error("Problem running hub controller manager", "error", err)
os.Exit(1)
}
}

func runSpokeManager(ctx context.Context, options AddonAgentOptions, logger *slog.Logger) {
hubConfig, err := GetClientConfig(options.HubKubeconfigFile)
if err != nil {
logger.Error("Failed to get kubeconfig", "error", err)
os.Exit(1)
}

hubClient, err := GetClientFromConfig(hubConfig)
if err != nil {
logger.Error("Failed to get spoke client", "error", err)
logger.Error("Failed to get hub cluster", "error", err)
os.Exit(1)
}

Expand All @@ -233,14 +150,12 @@ func runSpokeManager(ctx context.Context, options AddonAgentOptions, logger *slo
HealthProbeBindAddress: "0", // disable health probe
ReadinessEndpointName: "0", // disable readiness probe
})

if err != nil {
logger.Error("Failed to start manager", "error", err)
logger.Error("Failed to create a new controller manager", "error", err)
os.Exit(1)
}

spokeKubeConfig := mgr.GetConfig()
spokeKubeClient, err := kubernetes.NewForConfig(spokeKubeConfig)
spokeKubeClient, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
logger.Error("Failed to get spoke kube client", "error", err)
os.Exit(1)
Expand All @@ -266,7 +181,7 @@ func runSpokeManager(ctx context.Context, options AddonAgentOptions, logger *slo
if err = (&MaintenanceModeReconciler{
Scheme: mgr.GetScheme(),
SpokeClient: mgr.GetClient(),
SpokeClusterName: options.SpokeClusterName,
SpokeClusterName: o.SpokeClusterName,
Logger: logger.With("controller", "MaintenanceModeReconciler"),
}).SetupWithManager(mgr); err != nil {
klog.Error("Unable to create MaintenanceMode controller.", err)
Expand Down Expand Up @@ -301,9 +216,9 @@ func runSpokeManager(ctx context.Context, options AddonAgentOptions, logger *slo

if err = (&BlueSecretReconciler{
Scheme: mgr.GetScheme(),
HubClient: hubClient,
HubClient: hubCluster.GetClient(),
SpokeClient: mgr.GetClient(),
SpokeClusterName: options.SpokeClusterName,
SpokeClusterName: o.SpokeClusterName,
Logger: logger.With("controller", "BlueSecretReconciler"),
}).SetupWithManager(mgr); err != nil {
logger.Error("Failed to create BlueSecret controller", "controller", "BlueSecret", "error", err)
Expand All @@ -312,18 +227,60 @@ func runSpokeManager(ctx context.Context, options AddonAgentOptions, logger *slo

if err = (&S3SecretReconciler{
Scheme: mgr.GetScheme(),
HubClient: hubClient,
HubClient: hubCluster.GetClient(),
SpokeClient: mgr.GetClient(),
SpokeClusterName: options.SpokeClusterName,
SpokeClusterName: o.SpokeClusterName,
Logger: logger.With("controller", "S3SecretReconciler"),
}).SetupWithManager(mgr); err != nil {
logger.Error("Failed to create S3Secret controller", "controller", "S3Secret", "error", err)
os.Exit(1)
}

logger.Info("Starting spoke controller manager")
if err := mgr.Add(hubCluster); err != nil {
logger.Error("Problem adding hub cluster to controller manager", "error", err)
os.Exit(1)
}

if err = (&MirrorPeerReconciler{
Scheme: mgr.GetScheme(),
HubCluster: hubCluster,
HubClient: hubCluster.GetClient(),
SpokeClient: mgr.GetClient(),
SpokeClusterName: o.SpokeClusterName,
Logger: logger.With("controller", "MirrorPeerReconciler"),
}).SetupWithManager(mgr); err != nil {
logger.Error("Failed to create MirrorPeer controller", "controller", "MirrorPeer", "error", err)
os.Exit(1)
}

if err = (&GreenSecretReconciler{
Scheme: mgr.GetScheme(),
HubCluster: hubCluster,
HubClient: hubCluster.GetClient(),
SpokeClient: mgr.GetClient(),
SpokeClusterName: o.SpokeClusterName,
Logger: logger.With("controller", "GreenSecretReconciler"),
}).SetupWithManager(mgr); err != nil {
logger.Error("Failed to create GreenSecret controller", "controller", "GreenSecret", "error", err)
os.Exit(1)
}

logger.Info("Starting controller manager")
if err := mgr.Start(ctx); err != nil {
logger.Error("Problem running spoke controller manager", "error", err)
logger.Error("Problem running controller manager", "error", err)
os.Exit(1)
}

logger.Info("Addon agent is running, waiting for context cancellation")
<-ctx.Done()
logger.Info("Addon agent has stopped")
}

// GetClientConfig returns the rest.Config for a kubeconfig file
func GetClientConfig(kubeConfigFile string) (*rest.Config, error) {
clientconfig, err := clientcmd.BuildConfigFromFlags("", kubeConfigFile)
if err != nil {
return nil, fmt.Errorf("get clientconfig failed: %w", err)
}
return clientconfig, nil
}

0 comments on commit 8787924

Please sign in to comment.