diff --git a/pkg/search/controller.go b/pkg/search/controller.go index 61c25c4d1c59..608ee9d367fa 100644 --- a/pkg/search/controller.go +++ b/pkg/search/controller.go @@ -35,6 +35,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" @@ -349,6 +350,13 @@ func (c *Controller) getRegistryBackendHandler(cluster string, matchedRegistries return handler, nil } +var controlPlaneClientBuilder = func(restConfig *rest.Config) client.Client { + return gclient.NewForConfigOrDie(restConfig) +} +var clusterDynamicClientBuilder = func(cluster string, controlPlaneClient client.Client) (*util.DynamicClusterClient, error) { + return util.NewClusterDynamicClientSet(cluster, controlPlaneClient) +} + // doCacheCluster processes the resourceRegistry object // TODO: update status func (c *Controller) doCacheCluster(cluster string) error { @@ -386,9 +394,9 @@ func (c *Controller) doCacheCluster(cluster string) error { // STEP2: added/updated cluster, builds an informer manager for a specific cluster. if !c.InformerManager.IsManagerExist(cluster) { klog.Info("Try to build informer manager for cluster ", cluster) - controlPlaneClient := gclient.NewForConfigOrDie(c.restConfig) + controlPlaneClient := controlPlaneClientBuilder(c.restConfig) - clusterDynamicClient, err := util.NewClusterDynamicClientSet(cluster, controlPlaneClient) + clusterDynamicClient, err := clusterDynamicClientBuilder(cluster, controlPlaneClient) if err != nil { return err } diff --git a/pkg/search/controllers_test.go b/pkg/search/controllers_test.go new file mode 100644 index 000000000000..6576b3d35956 --- /dev/null +++ b/pkg/search/controllers_test.go @@ -0,0 +1,762 @@ +/* +Copyright 2024 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package search + +import ( + "context" + "errors" + "fmt" + "strings" + "testing" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + fakedynamic "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + searchv1alpha1 "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1" + versioned "github.com/karmada-io/karmada/pkg/generated/clientset/versioned" + fakekarmadaclient "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake" + informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions" + "github.com/karmada-io/karmada/pkg/search/backendstore" + "github.com/karmada-io/karmada/pkg/util" +) + +func TestNewKarmadaSearchController(t *testing.T) { + tests := []struct { + name string + restConfig *rest.Config + factory informerfactory.SharedInformerFactory + restMapper meta.RESTMapper + client versioned.Interface + prep func(*informerfactory.SharedInformerFactory, versioned.Interface) error + wantErr bool + errMsg string + }{ + { + name: "NewKarmadaSearchController", + restConfig: &rest.Config{}, + restMapper: meta.NewDefaultRESTMapper(nil), + client: fakekarmadaclient.NewSimpleClientset(), + factory: informerfactory.NewSharedInformerFactory(fakekarmadaclient.NewSimpleClientset(), 0), + prep: func(*informerfactory.SharedInformerFactory, versioned.Interface) error { return nil }, + wantErr: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if err := test.prep(&test.factory, test.client); err != nil { + t.Fatalf("failed to prep test environment before creating new controller, got: %v", err) + } + _, err := NewController(test.restConfig, test.factory, test.restMapper) + if err == nil && test.wantErr { + t.Fatal("expected an error, but got none") + } + if err != nil && !test.wantErr { + t.Errorf("unexpected error, got: %v", err) + } + if err != nil && test.wantErr && !strings.Contains(err.Error(), test.errMsg) { + t.Errorf("expected error message %s to be in %s", test.errMsg, err.Error()) + } + }) + } +} + +func TestAddClusterEventHandler(t *testing.T) { + tests := []struct { + name string + restConfig *rest.Config + client *fakekarmadaclient.Clientset + restMapper meta.RESTMapper + stopCh chan struct{} + prep func(*fakekarmadaclient.Clientset, *rest.Config, meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) + verify func(*fakekarmadaclient.Clientset, *Controller) error + }{ + { + name: "AddAllEventHandlers_TriggerAddClusterEvent_ClusterAddedToWorkQueue", + restConfig: &rest.Config{}, + client: fakekarmadaclient.NewSimpleClientset(), + restMapper: meta.NewDefaultRESTMapper(nil), + stopCh: make(chan struct{}), + prep: func(clientConnector *fakekarmadaclient.Clientset, restConfig *rest.Config, restMapper meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) { + factory := informerfactory.NewSharedInformerFactory(clientConnector, 0) + controller, err := createController(restConfig, factory, restMapper) + return controller, factory, err + }, + verify: func(clientConnector *fakekarmadaclient.Clientset, controller *Controller) error { + var ( + clusterName, resourceVersion = "test-cluster", "1000" + apiEndpoint, labels = "10.0.0.1", map[string]string{} + ) + + // Wait a bit to allow addCluster + // background thread to complete its execution. + if err := upsertCluster(clientConnector, labels, apiEndpoint, clusterName, resourceVersion); err != nil { + return err + } + time.Sleep(time.Millisecond * 250) + if err := cacheNextWrapper(controller); err != nil { + return err + } + + return nil + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + controller, informer, err := test.prep(test.client, test.restConfig, test.restMapper) + if err != nil { + t.Fatalf("failed to prepare test environment for event handler setup, got: %v", err) + } + + // Add event handlers and start the informer to watch for changes. + controller.addAllEventHandlers() + informer.Start(test.stopCh) + defer close(test.stopCh) + + if err := test.verify(test.client, controller); err != nil { + t.Errorf("failed to verify controller, got: %v", err) + } + }) + } +} + +func TestUpdateClusterEventHandler(t *testing.T) { + tests := []struct { + name string + restConfig *rest.Config + client *fakekarmadaclient.Clientset + restMapper meta.RESTMapper + stopCh chan struct{} + prep func(*fakekarmadaclient.Clientset, *rest.Config, meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) + verify func(*fakekarmadaclient.Clientset, *Controller) error + }{ + { + name: "AddAllEventHandlers_TriggerUpdateClusterEvent_UpdatedClusterAddedToWorkQueue", + restConfig: &rest.Config{}, + client: fakekarmadaclient.NewSimpleClientset(), + restMapper: meta.NewDefaultRESTMapper(nil), + stopCh: make(chan struct{}), + prep: func(clientConnector *fakekarmadaclient.Clientset, restConfig *rest.Config, restMapper meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) { + factory := informerfactory.NewSharedInformerFactory(clientConnector, 0) + controller, err := createController(restConfig, factory, restMapper) + return controller, factory, err + }, + verify: func(clientConnector *fakekarmadaclient.Clientset, controller *Controller) error { + var ( + clusterName, resourceVersion, updatedResourceVerison = "test-cluster", "1000", "1001" + apiEndpoint, oldLabels, newLabels = "10.0.0.1", map[string]string{"status": "old"}, map[string]string{"status": "new"} + ) + + // Wait a bit to allow addCluster + // background thread to complete its execution. + if err := upsertCluster(clientConnector, oldLabels, apiEndpoint, clusterName, resourceVersion); err != nil { + return err + } + time.Sleep(time.Millisecond * 250) + if err := cacheNextWrapper(controller); err != nil { + return err + } + + // Wait a bit to allow updateCluster + // background thread to complete its execution. + if err := upsertCluster(clientConnector, newLabels, apiEndpoint, clusterName, updatedResourceVerison); err != nil { + return err + } + time.Sleep(time.Millisecond * 250) + if err := cacheNextWrapper(controller); err != nil { + return err + } + + return nil + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + controller, informer, err := test.prep(test.client, test.restConfig, test.restMapper) + if err != nil { + t.Fatalf("failed to prepare test environment for event handler setup, got: %v", err) + } + + // Add event handlers and start the informer to watch for changes. + controller.addAllEventHandlers() + informer.Start(test.stopCh) + defer close(test.stopCh) + + if err := test.verify(test.client, controller); err != nil { + t.Errorf("failed to verify controller, got: %v", err) + } + }) + } +} + +func TestDeleteClusterEventHandler(t *testing.T) { + tests := []struct { + name string + restConfig *rest.Config + client *fakekarmadaclient.Clientset + controlPlaneClient client.WithWatch + restMapper meta.RESTMapper + stopCh chan struct{} + prep func(*fakekarmadaclient.Clientset, *rest.Config, meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) + verify func(*fakekarmadaclient.Clientset, *Controller, client.Client) error + }{ + { + name: "AddAllEventHandlers_TriggerDeleteClusterEvent_DeletedClusterAddedToWorkQueue", + restConfig: &rest.Config{}, + client: fakekarmadaclient.NewSimpleClientset(), + controlPlaneClient: fake.NewFakeClient(), + restMapper: meta.NewDefaultRESTMapper(nil), + stopCh: make(chan struct{}), + prep: func(clientConnector *fakekarmadaclient.Clientset, restConfig *rest.Config, restMapper meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) { + factory := informerfactory.NewSharedInformerFactory(clientConnector, 0) + controller, err := createController(restConfig, factory, restMapper) + return controller, factory, err + }, + verify: func(clientConnector *fakekarmadaclient.Clientset, controller *Controller, controlPlaneClient client.Client) error { + var ( + registryName, clusterName = "test-registry", "test-cluster" + resourceVersion, apiEndpoint = "1000", "10.0.0.1" + labels = map[string]string{} + resourceSelectors = []searchv1alpha1.ResourceSelector{ + { + APIVersion: "apps/v1", + Kind: "Deployment", + }, + } + ) + + if err := clusterv1alpha1.Install(scheme.Scheme); err != nil { + return fmt.Errorf("failed to install scheme: %w", err) + } + if err := upsertClusterControllerRuntime(controlPlaneClient, labels, clusterName, apiEndpoint); err != nil { + return err + } + controlPlaneClientBuilder = func(*rest.Config) client.Client { + return controlPlaneClient + } + clusterDynamicClientBuilder = func(string, client.Client) (*util.DynamicClusterClient, error) { + return &util.DynamicClusterClient{ + DynamicClientSet: fakedynamic.NewSimpleDynamicClient(scheme.Scheme), + ClusterName: clusterName, + }, nil + } + + // Wait a bit to allow for addCluster background + // thread to complete its execution. + if err := upsertCluster(clientConnector, labels, apiEndpoint, clusterName, resourceVersion); err != nil { + return err + } + time.Sleep(time.Millisecond * 250) + if err := cacheNextWrapper(controller); err != nil { + return err + } + + // Wait a bit to allow for addResourceRegistry background + // thread to complete its execution. + if err := upsertResourceRegistry(clientConnector, resourceSelectors, registryName, resourceVersion, []string{clusterName}); err != nil { + return err + } + time.Sleep(time.Millisecond * 250) + if err := cacheNextWrapper(controller); err != nil { + return err + } + + // Wait a bit to allow for deleteCluster on the controller + // background thread to complete its execution. + if err := deleteCluster(clientConnector, clusterName); err != nil { + return err + } + time.Sleep(time.Millisecond * 250) + if err := cacheNextWrapper(controller); err != nil { + return err + } + + // Verify no backend store for this deleted cluster. + if backend := backendstore.GetBackend(clusterName); backend != nil { + return fmt.Errorf("expected backend store for cluster %s to be deleted, but got: %v", clusterName, backend) + } + + return nil + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + controller, informer, err := test.prep(test.client, test.restConfig, test.restMapper) + if err != nil { + t.Fatalf("failed to prepare test environment for event handler setup, got: %v", err) + } + + // Add event handlers and start the informer to watch for changes. + controller.addAllEventHandlers() + informer.Start(test.stopCh) + defer close(test.stopCh) + + if err := test.verify(test.client, controller, test.controlPlaneClient); err != nil { + t.Errorf("failed to verify controller, got: %v", err) + } + }) + } +} + +func TestAddResourceRegistryEventHandler(t *testing.T) { + tests := []struct { + name string + restConfig *rest.Config + client *fakekarmadaclient.Clientset + controlPlaneClient client.WithWatch + restMapper meta.RESTMapper + stopCh chan struct{} + prep func(*fakekarmadaclient.Clientset, *rest.Config, meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) + verify func(*fakekarmadaclient.Clientset, *Controller, client.Client) error + }{ + { + name: "AddAllEventHandlers_TriggerAddResourceRegistryEvent_ResourceRegistryAddedToWorkQueue", + restConfig: &rest.Config{}, + client: fakekarmadaclient.NewSimpleClientset(), + controlPlaneClient: fake.NewFakeClient(), + restMapper: meta.NewDefaultRESTMapper(nil), + stopCh: make(chan struct{}), + prep: func(clientConnector *fakekarmadaclient.Clientset, restConfig *rest.Config, restMapper meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) { + factory := informerfactory.NewSharedInformerFactory(clientConnector, 0) + controller, err := createController(restConfig, factory, restMapper) + return controller, factory, err + }, + verify: func(clientConnector *fakekarmadaclient.Clientset, controller *Controller, controlPlaneClient client.Client) error { + var ( + registryName, clusterName = "test-registry", "test-cluster" + resourceVersion, apiEndpoint = "1000", "10.0.0.1" + labels = map[string]string{} + resourceSelectors = []searchv1alpha1.ResourceSelector{ + { + APIVersion: "apps/v1", + Kind: "Deployment", + }, + } + ) + + controlPlaneClientBuilder = func(*rest.Config) client.Client { + return controlPlaneClient + } + clusterDynamicClientBuilder = func(string, client.Client) (*util.DynamicClusterClient, error) { + return &util.DynamicClusterClient{ + DynamicClientSet: fakedynamic.NewSimpleDynamicClient(scheme.Scheme), + ClusterName: clusterName, + }, nil + } + + // Wait a bit to allow addCluster background + // thread to complete its execution. + if err := upsertCluster(clientConnector, labels, apiEndpoint, clusterName, resourceVersion); err != nil { + return err + } + time.Sleep(time.Millisecond * 250) + if err := cacheNextWrapper(controller); err != nil { + return err + } + + // Wait a bit to allow addResourceRegistry background + // thread to complete its execution. + if err := upsertResourceRegistry(clientConnector, resourceSelectors, registryName, resourceVersion, []string{clusterName}); err != nil { + return err + } + time.Sleep(time.Millisecond * 250) + if err := cacheNextWrapper(controller); err != nil { + return err + } + + return nil + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + controller, informer, err := test.prep(test.client, test.restConfig, test.restMapper) + if err != nil { + t.Fatalf("failed to prepare test environment for event handler setup, got: %v", err) + } + + // Add event handlers and start the informer to watch for changes. + controller.addAllEventHandlers() + informer.Start(test.stopCh) + defer close(test.stopCh) + + if err := test.verify(test.client, controller, test.controlPlaneClient); err != nil { + t.Errorf("failed to verify controller, got: %v", err) + } + }) + } +} + +func TestUpdateResourceRegistryEventHandler(t *testing.T) { + tests := []struct { + name string + restConfig *rest.Config + client *fakekarmadaclient.Clientset + restMapper meta.RESTMapper + stopCh chan struct{} + prep func(*fakekarmadaclient.Clientset, *rest.Config, meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) + verify func(*fakekarmadaclient.Clientset, *Controller) error + }{ + { + name: "AddAllEventHandlers_TriggerUpdateResourceRegistryEvent_UpdatedResourceRegistryAddedToWorkQueue", + restConfig: &rest.Config{}, + client: fakekarmadaclient.NewSimpleClientset(), + restMapper: meta.NewDefaultRESTMapper(nil), + stopCh: make(chan struct{}), + prep: func(clientConnector *fakekarmadaclient.Clientset, restConfig *rest.Config, restMapper meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) { + factory := informerfactory.NewSharedInformerFactory(clientConnector, 0) + controller, err := createController(restConfig, factory, restMapper) + return controller, factory, err + }, + verify: func(clientConnector *fakekarmadaclient.Clientset, controller *Controller) error { + var ( + registryName, clusterName, resourceVersion = "test-registry", "test-cluster", "1000" + apiEndpoint, labels = "10.0.0.1", map[string]string{} + resourceSelectors = []searchv1alpha1.ResourceSelector{ + { + APIVersion: "apps/v1", + Kind: "Deployment", + }, + } + resourceSelectorsUpdated = []searchv1alpha1.ResourceSelector{ + { + APIVersion: "apps/v1", + Kind: "Deployment", + }, + { + APIVersion: "v1", + Kind: "Pod", + }, + } + ) + + // Wait a bit to allow addCluster background thread + // to complete its execution. + if err := upsertCluster(clientConnector, labels, apiEndpoint, clusterName, resourceVersion); err != nil { + return err + } + time.Sleep(time.Millisecond * 250) + if err := cacheNextWrapper(controller); err != nil { + return err + } + + // Wait a bit to allow addResourceRegistry background thread + // to complete its execution. + if err := upsertResourceRegistry(clientConnector, resourceSelectors, registryName, resourceVersion, []string{clusterName}); err != nil { + return err + } + time.Sleep(time.Millisecond * 250) + if err := cacheNextWrapper(controller); err != nil { + return err + } + + // Wait a bit to allow updateResourceRegistry background thread + // to complete its execution. + if err := upsertResourceRegistry(clientConnector, resourceSelectorsUpdated, registryName, resourceVersion, []string{clusterName}); err != nil { + return err + } + time.Sleep(time.Millisecond * 250) + if err := cacheNextWrapper(controller); err != nil { + return err + } + + return nil + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + controller, informer, err := test.prep(test.client, test.restConfig, test.restMapper) + if err != nil { + t.Fatalf("failed to prepare test environment for event handler setup, got: %v", err) + } + + // Add event handlers and start the informer to watch for changes. + controller.addAllEventHandlers() + informer.Start(test.stopCh) + defer close(test.stopCh) + + if err := test.verify(test.client, controller); err != nil { + t.Errorf("failed to verify controller, got: %v", err) + } + }) + } +} + +func TestDeleteResourceRegistryEventHandler(t *testing.T) { + tests := []struct { + name string + restConfig *rest.Config + client *fakekarmadaclient.Clientset + controlPlaneClient client.WithWatch + restMapper meta.RESTMapper + stopCh chan struct{} + prep func(*fakekarmadaclient.Clientset, *rest.Config, meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) + verify func(*fakekarmadaclient.Clientset, *Controller, client.Client) error + }{ + { + name: "AddAllEventHandlers_TriggerDeleteResourceRegistryEvent_DeletedResourceRegistryAddedToWorkQueue", + restConfig: &rest.Config{}, + client: fakekarmadaclient.NewSimpleClientset(), + controlPlaneClient: fake.NewFakeClient(), + restMapper: meta.NewDefaultRESTMapper(nil), + stopCh: make(chan struct{}), + prep: func(clientConnector *fakekarmadaclient.Clientset, restConfig *rest.Config, restMapper meta.RESTMapper) (*Controller, informerfactory.SharedInformerFactory, error) { + factory := informerfactory.NewSharedInformerFactory(clientConnector, 0) + controller, err := createController(restConfig, factory, restMapper) + return controller, factory, err + }, + verify: func(clientConnector *fakekarmadaclient.Clientset, controller *Controller, controlPlaneClient client.Client) error { + var ( + registryName, clusterName = "test-registry", "test-cluster" + resourceVersion, apiEndpoint = "1000", "10.0.0.1" + labels = map[string]string{} + resourceSelectors = []searchv1alpha1.ResourceSelector{ + { + APIVersion: "apps/v1", + Kind: "Deployment", + }, + } + ) + + controlPlaneClientBuilder = func(*rest.Config) client.Client { + return controlPlaneClient + } + clusterDynamicClientBuilder = func(string, client.Client) (*util.DynamicClusterClient, error) { + return &util.DynamicClusterClient{ + DynamicClientSet: fakedynamic.NewSimpleDynamicClient(scheme.Scheme), + ClusterName: clusterName, + }, nil + } + + // Wait a bit to allow addCluster + // background thread to complete its execution. + if err := upsertCluster(clientConnector, labels, apiEndpoint, clusterName, resourceVersion); err != nil { + return err + } + time.Sleep(time.Millisecond * 250) + if err := cacheNextWrapper(controller); err != nil { + return err + } + + // Wait a bit to allow addResourceRegistry + // background thread to complete its execution. + if err := upsertResourceRegistry(clientConnector, resourceSelectors, registryName, resourceVersion, []string{clusterName}); err != nil { + return err + } + time.Sleep(time.Millisecond * 250) + if err := cacheNextWrapper(controller); err != nil { + return err + } + + // Wait a bit to allow deleteResourceRegistry + // background thread to complete its execution. + if err := deleteResourceRegistry(clientConnector, registryName); err != nil { + return err + } + time.Sleep(time.Millisecond * 250) + if err := cacheNextWrapper(controller); err != nil { + return err + } + + return nil + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + controller, informer, err := test.prep(test.client, test.restConfig, test.restMapper) + if err != nil { + t.Fatalf("failed to prepare test environment for event handler setup, got: %v", err) + } + + // Add event handlers and start the informer to watch for changes. + controller.addAllEventHandlers() + informer.Start(test.stopCh) + defer close(test.stopCh) + + if err := test.verify(test.client, controller, test.controlPlaneClient); err != nil { + t.Errorf("failed to verify controller, got: %v", err) + } + }) + } +} + +// createController initializes a new Controller instance using the provided +// Kubernetes REST configuration, shared informer factory, and REST mapper. +// It returns the created Controller or an error if initialization fails. +func createController(restConfig *rest.Config, factory informerfactory.SharedInformerFactory, restMapper meta.RESTMapper) (*Controller, error) { + newController, err := NewController(restConfig, factory, restMapper) + if err != nil { + return nil, fmt.Errorf("failed to create new controller, got: %v", err) + } + return newController, nil +} + +// upsertCluster creates or updates a Cluster resource in the Kubernetes API using the provided +// client, labels, API endpoint, cluster name, and resource version. +func upsertCluster(client *fakekarmadaclient.Clientset, labels map[string]string, apiEndpoint, clusterName, resourceVersion string) error { + cluster := &clusterv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + ResourceVersion: resourceVersion, + Labels: labels, + }, + Spec: clusterv1alpha1.ClusterSpec{ + APIEndpoint: apiEndpoint, + }, + Status: clusterv1alpha1.ClusterStatus{ + Conditions: []metav1.Condition{ + { + Type: clusterv1alpha1.ClusterConditionReady, + Status: metav1.ConditionTrue, + }, + }, + }, + } + + // Try to create the Cluster. + _, err := client.ClusterV1alpha1().Clusters().Create(context.TODO(), cluster, metav1.CreateOptions{}) + if err == nil { + // Successfully created the Cluster. + return nil + } + + // If the Cluster already exists, update it. + if apierrors.IsAlreadyExists(err) { + _, updateErr := client.ClusterV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{}) + if updateErr != nil { + return fmt.Errorf("failed to update cluster: %v", updateErr) + } + return nil + } + + // Return any other errors encountered. + return err +} + +// upsertClusterControllerRuntime creates or updates a Cluster resource using the controller-runtime +// client. The function takes labels, API endpoint, and cluster name to define the Cluster. +func upsertClusterControllerRuntime(controlPlaneClient client.Client, labels map[string]string, clusterName, apiEndpoint string) error { + cluster := &clusterv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Labels: labels, + }, + Spec: clusterv1alpha1.ClusterSpec{ + APIEndpoint: apiEndpoint, + }, + Status: clusterv1alpha1.ClusterStatus{ + Conditions: []metav1.Condition{ + { + Type: clusterv1alpha1.ClusterConditionReady, + Status: metav1.ConditionTrue, + }, + }, + }, + } + + // Try to create the Cluster. + err := controlPlaneClient.Create(context.TODO(), cluster) + if err == nil { + // Successfully created the Cluster. + return nil + } + + // If the Cluster already exists, update it. + if apierrors.IsAlreadyExists(err) { + if updateErr := controlPlaneClient.Update(context.TODO(), cluster); updateErr != nil { + return fmt.Errorf("failed to update cluster: %v", updateErr) + } + return nil + } + + return err +} + +// upsertResourceRegistry creates or updates a ResourceRegistry resource in the Kubernetes API. +// It uses the provided client, resource selectors, registry name, resource version, and target cluster names. +func upsertResourceRegistry(client *fakekarmadaclient.Clientset, resourceSelectors []searchv1alpha1.ResourceSelector, registryName, resourceVersion string, clusterNames []string) error { + resourceRegistry := &searchv1alpha1.ResourceRegistry{ + ObjectMeta: metav1.ObjectMeta{ + Name: registryName, + ResourceVersion: resourceVersion, + }, + Spec: searchv1alpha1.ResourceRegistrySpec{ + TargetCluster: policyv1alpha1.ClusterAffinity{ + ClusterNames: clusterNames, + }, + ResourceSelectors: resourceSelectors, + }, + } + + // Try to create the ResourceRegistry. + _, err := client.SearchV1alpha1().ResourceRegistries().Create(context.TODO(), resourceRegistry, metav1.CreateOptions{}) + if err == nil { + // Successfully created the ResourceRegistry. + return nil + } + + // If the ResourceRegistry already exists, update it. + if apierrors.IsAlreadyExists(err) { + _, updateErr := client.SearchV1alpha1().ResourceRegistries().Update(context.TODO(), resourceRegistry, metav1.UpdateOptions{}) + if updateErr != nil { + return fmt.Errorf("failed to update ResourceRegistry: %v", updateErr) + } + return nil + } + + // Return any other errors encountered. + return err +} + +// deleteCluster deletes a Cluster resource by name from the Kubernetes API using the provided client. +func deleteCluster(client *fakekarmadaclient.Clientset, clusterName string) error { + if err := client.ClusterV1alpha1().Clusters().Delete(context.TODO(), clusterName, metav1.DeleteOptions{}); err != nil { + return fmt.Errorf("failed to delete cluster, got: %v", err) + } + return nil +} + +// deleteResourceRegistry deletes a ResourceRegistry resource by name from the Kubernetes API +// using the provided client. +func deleteResourceRegistry(client *fakekarmadaclient.Clientset, resourceRegistryName string) error { + if err := client.SearchV1alpha1().ResourceRegistries().Delete(context.TODO(), resourceRegistryName, metav1.DeleteOptions{}); err != nil { + return fmt.Errorf("failed to delete resource registry, got: %v", err) + } + return nil +} + +// cacheNextWrapper calls the cacheNext method on the provided Controller instance. +// If the cacheNext method fails, it returns an error. +func cacheNextWrapper(controller *Controller) error { + if !controller.cacheNext() { + return errors.New("failed to cache next object") + } + return nil +}