From f002f9265b722d18726aa6f066b0018f1e698c6e Mon Sep 17 00:00:00 2001
From: Andrei Kvapil
Date: Mon, 7 Oct 2024 15:31:33 +0200
Subject: [PATCH 1/3] lb: endpointslice controller for External Traffic Policy
 Local.

Signed-off-by: Bart Vercoulen
---
 .../kubevirteps.go                            | 109 +++
 cmd/kubevirt-cloud-controller-manager/main.go |  12 +-
 .../kubevirteps/kubevirteps_controller.go     | 681 ++++++++++++++++++
 .../kubevirteps_controller_suite_test.go      |  13 +
 .../kubevirteps_controller_test.go            | 635 ++++++++++++++++
 .../kubevirteps_controller_utils.go           |  98 +++
 pkg/provider/cloud.go                         |  49 +-
 pkg/provider/cloud_test.go                    |   2 +-
 pkg/provider/loadbalancer.go                  |  20 +-
 pkg/provider/loadbalancer_test.go             |  63 +-
 10 files changed, 1651 insertions(+), 31 deletions(-)
 create mode 100644 cmd/kubevirt-cloud-controller-manager/kubevirteps.go
 create mode 100644 pkg/controller/kubevirteps/kubevirteps_controller.go
 create mode 100644 pkg/controller/kubevirteps/kubevirteps_controller_suite_test.go
 create mode 100644 pkg/controller/kubevirteps/kubevirteps_controller_test.go
 create mode 100644 pkg/controller/kubevirteps/kubevirteps_controller_utils.go

diff --git a/cmd/kubevirt-cloud-controller-manager/kubevirteps.go b/cmd/kubevirt-cloud-controller-manager/kubevirteps.go
new file mode 100644
index 000000000..e6b41fbb0
--- /dev/null
+++ b/cmd/kubevirt-cloud-controller-manager/kubevirteps.go
@@ -0,0 +1,109 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/clientcmd"
+	cloudprovider "k8s.io/cloud-provider"
+	"k8s.io/cloud-provider/app"
+	"k8s.io/cloud-provider/app/config"
+	genericcontrollermanager "k8s.io/controller-manager/app"
+	"k8s.io/controller-manager/controller"
+	"k8s.io/klog/v2"
+	"kubevirt.io/cloud-provider-kubevirt/pkg/controller/kubevirteps"
+	kubevirt "kubevirt.io/cloud-provider-kubevirt/pkg/provider"
+)
+
+func StartKubevirtCloudControllerWrapper(initContext app.ControllerInitContext, completedConfig *config.CompletedConfig, cloud cloudprovider.Interface) app.InitFunc {
+	return func(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext) (controller.Interface, bool, error) {
+		return startKubevirtCloudController(ctx, initContext, controllerContext, completedConfig, cloud)
+	}
+}
+
+func startKubevirtCloudController(
+	ctx context.Context,
+	initContext app.ControllerInitContext,
+	controllerContext genericcontrollermanager.ControllerContext,
+	ccmConfig *config.CompletedConfig,
+	cloud cloudprovider.Interface) (controller.Interface, bool, error) {
+
+	klog.Infof("Starting %s.", kubevirteps.ControllerName)
+
+	kubevirtCloud, ok := cloud.(*kubevirt.Cloud)
+	if !ok {
+		err := fmt.Errorf("%s does not support provider %q", kubevirteps.ControllerName, cloud.ProviderName())
+		return nil, false, err
+	}
+
+	// The flag is optional in the cloud config, so guard against a nil pointer.
+	epsEnabled := kubevirtCloud.GetCloudConfig().LoadBalancer.EnableEPSController
+	if epsEnabled == nil || !*epsEnabled {
+		klog.Infof("%s is not enabled.", kubevirteps.ControllerName)
+		return nil, false, nil
+	}
+
+	klog.Infof("Setting up tenant client.")
+
+	// This is the kubeconfig for the tenant (in-cluster) cluster.
+	tenantClient, err := kubernetes.NewForConfig(ccmConfig.Complete().Kubeconfig)
+	if err != nil {
+		return nil, false, err
+	}
+
+	klog.Infof("Setting up infra client.")
+
+	// This is the kubeconfig for the infra cluster.
+	infraKubeConfig, err := kubevirtCloud.GetInfraKubeconfig()
+	if err != nil {
+		klog.Errorf("Failed to get infra kubeconfig: %v", err)
+		return nil, false, err
+	}
+
+	infraClientConfig, err := clientcmd.NewClientConfigFromBytes([]byte(infraKubeConfig))
+	if err != nil {
+		klog.Errorf("Failed to create client config for infra cluster: %v", err)
+		return nil, false, err
+	}
+
+	infraConfig, err := infraClientConfig.ClientConfig()
+	if err != nil {
+		klog.Errorf("Failed to create client config for infra cluster: %v", err)
+		return nil, false, err
+	}
+
+	// Create a client for the infra cluster.
+	infraClient, err := kubernetes.NewForConfig(infraConfig)
+	if err != nil {
+		klog.Errorf("Failed to create client for infra cluster: %v", err)
+		return nil, false, err
+	}
+
+	infraDynamic, err := dynamic.NewForConfig(infraConfig)
+	if err != nil {
+		klog.Errorf("Failed to create dynamic client for infra cluster: %v", err)
+		return nil, false, err
+	}
+
+	klog.Infof("Setting up kubevirtEPSController")
+
+	kubevirtEPSController := kubevirteps.NewKubevirtEPSController(tenantClient, infraClient, infraDynamic, kubevirtCloud.Namespace())
+
+	klog.Infof("Initializing kubevirtEPSController")
+
+	err = kubevirtEPSController.Init()
+	if err != nil {
+		klog.Errorf("Failed to initialize kubevirtEPSController: %v", err)
+		return nil, false, err
+	}
+
+	klog.Infof("Running kubevirtEPSController")
+	go kubevirtEPSController.Run(1, controllerContext.Stop, controllerContext.ControllerManagerMetrics)
+
+	return nil, false, nil
+}
diff --git a/cmd/kubevirt-cloud-controller-manager/main.go b/cmd/kubevirt-cloud-controller-manager/main.go
index 83372c342..564b52007 100644
--- a/cmd/kubevirt-cloud-controller-manager/main.go
+++ b/cmd/kubevirt-cloud-controller-manager/main.go
@@ -23,6 +23,8 @@ package main
 import (
 	"os"
 
+	"kubevirt.io/cloud-provider-kubevirt/pkg/controller/kubevirteps"
+
 	"k8s.io/apimachinery/pkg/util/wait"
 	cloudprovider "k8s.io/cloud-provider"
 	"k8s.io/cloud-provider/app"
@@ -33,8 +35,6 @@ import (
 	_ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugins
 	_ "k8s.io/component-base/metrics/prometheus/version"  // for version metric registration
 	"k8s.io/klog/v2"
-
-	_ "kubevirt.io/cloud-provider-kubevirt/pkg/provider"
 )
 
 func main() {
@@ -45,8 +45,14 @@ func main() {
 	fss := cliflag.NamedFlagSets{}
 
 	controllerInitializers := app.DefaultInitFuncConstructors
+	controllerAliases := map[string]string{"kubevirt-eps": kubevirteps.ControllerName.String()}
+
+	// Register the kubevirt EPS controller with the cloud-controller-manager.
+	controllerInitializers[kubevirteps.ControllerName.String()] = app.ControllerInitFuncConstructor{
+		Constructor: StartKubevirtCloudControllerWrapper,
+	}
 
-	command := app.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, controllerInitializers, map[string]string{}, fss, wait.NeverStop)
+	command := app.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, controllerInitializers, controllerAliases, fss, wait.NeverStop)
 	code := cli.Run(command)
 	os.Exit(code)
 }
diff --git a/pkg/controller/kubevirteps/kubevirteps_controller.go b/pkg/controller/kubevirteps/kubevirteps_controller.go
new file mode 100644
index 000000000..9366d7c26
--- /dev/null
+++ b/pkg/controller/kubevirteps/kubevirteps_controller.go
@@ -0,0 +1,681 @@
+package kubevirteps
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	v1 "k8s.io/api/core/v1"
+	discovery "k8s.io/api/discovery/v1"
+	apiequality "k8s.io/apimachinery/pkg/api/equality"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1
"k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers" + endpointsliceutil "k8s.io/endpointslice/util" + "k8s.io/klog/v2" + kubevirtv1 "kubevirt.io/api/core/v1" + kubevirt "kubevirt.io/cloud-provider-kubevirt/pkg/provider" + "strings" + "time" +) + +const ( + ControllerName = controllerName("kubevirt_eps_controller") +) + +type controllerName string + +func (c controllerName) dashed() string { + // replace underscores with dashes + return strings.ReplaceAll(string(c), "_", "-") +} + +func (c controllerName) String() string { + return string(c) +} + +type Controller struct { + tenantClient kubernetes.Interface + tenantFactory informers.SharedInformerFactory + tenantEPSTracker tenantEPSTracker + + infraClient kubernetes.Interface + infraDynamic dynamic.Interface + infraFactory informers.SharedInformerFactory + + infraNamespace string + queue workqueue.RateLimitingInterface + maxRetries int + + maxEndPointsPerSlice int +} + +func NewKubevirtEPSController( + tenantClient kubernetes.Interface, + infraClient kubernetes.Interface, + infraDynamic dynamic.Interface, + infraNamespace string) *Controller { + + tenantFactory := informers.NewSharedInformerFactory(tenantClient, 0) + infraFactory := informers.NewSharedInformerFactoryWithOptions(infraClient, 0, informers.WithNamespace(infraNamespace)) + queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + + return &Controller{ + tenantClient: tenantClient, + tenantFactory: tenantFactory, + tenantEPSTracker: tenantEPSTracker{}, + infraClient: infraClient, + infraDynamic: infraDynamic, + infraFactory: infraFactory, + infraNamespace: infraNamespace, + queue: queue, + maxRetries: 25, + maxEndPointsPerSlice: 100, + } +} + +type ReqType string + +const ( + AddReq ReqType = "add" + UpdateReq ReqType = "update" + DeleteReq ReqType = "delete" +) + +type Request struct { + ReqType ReqType + Obj interface{} + OldObj interface{} +} + +func newRequest(reqType ReqType, obj interface{}, oldObj interface{}) *Request { + return &Request{ + ReqType: reqType, + Obj: obj, + OldObj: oldObj, + } +} + +func (c *Controller) Init() error { + + // Act on events from Services on the infra cluster. These are created by the EnsureLoadBalancer function. + // We need to watch for these events so that we can update the EndpointSlices in the infra cluster accordingly. 
+	_, err := c.infraFactory.Core().V1().Services().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			// cast obj to Service
+			svc := obj.(*v1.Service)
+			// Only act on Services of type LoadBalancer
+			if svc.Spec.Type == v1.ServiceTypeLoadBalancer {
+				klog.Infof("Service added: %v/%v", svc.Namespace, svc.Name)
+				c.queue.Add(newRequest(AddReq, obj, nil))
+			}
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			// cast newObj to Service
+			newSvc := newObj.(*v1.Service)
+			// Only act on Services of type LoadBalancer
+			if newSvc.Spec.Type == v1.ServiceTypeLoadBalancer {
+				klog.Infof("Service updated: %v/%v", newSvc.Namespace, newSvc.Name)
+				c.queue.Add(newRequest(UpdateReq, newObj, oldObj))
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			// cast obj to Service
+			svc := obj.(*v1.Service)
+			// Only act on Services of type LoadBalancer
+			if svc.Spec.Type == v1.ServiceTypeLoadBalancer {
+				klog.Infof("Service deleted: %v/%v", svc.Namespace, svc.Name)
+				c.queue.Add(newRequest(DeleteReq, obj, nil))
+			}
+		},
+	})
+	if err != nil {
+		return err
+	}
+
+	// Monitor the tenant EndpointSlices that we are interested in, based on known Services in the infra cluster
+	_, err = c.tenantFactory.Discovery().V1().EndpointSlices().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			eps := obj.(*discovery.EndpointSlice)
+			if c.tenantEPSTracker.contains(eps) {
+				klog.Infof("Getting infra Service for tenant EndpointSlice: %v/%v", eps.Namespace, eps.Name)
+				infraSvc, err := c.getInfraServiceFromTenantEPS(context.TODO(), eps)
+				if err != nil {
+					klog.Errorf("Failed to get Service in Infra cluster for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, err)
+					return
+				}
+				klog.Infof("EndpointSlice added: %v/%v", eps.Namespace, eps.Name)
+				c.queue.Add(newRequest(AddReq, infraSvc, nil))
+			}
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			eps := newObj.(*discovery.EndpointSlice)
+			if c.tenantEPSTracker.contains(eps) {
+				klog.Infof("Getting infra Service for tenant EndpointSlice: %v/%v", eps.Namespace, eps.Name)
+				infraSvc, err := c.getInfraServiceFromTenantEPS(context.TODO(), eps)
+				if err != nil {
+					klog.Errorf("Failed to get Service in Infra cluster for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, err)
+					return
+				}
+				klog.Infof("EndpointSlice updated: %v/%v", eps.Namespace, eps.Name)
+				c.queue.Add(newRequest(UpdateReq, infraSvc, nil))
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			eps := obj.(*discovery.EndpointSlice)
+			if c.tenantEPSTracker.contains(eps) {
+				c.tenantEPSTracker.remove(eps)
+				klog.Infof("Getting infra Service for tenant EndpointSlice: %v/%v", eps.Namespace, eps.Name)
+				infraSvc, err := c.getInfraServiceFromTenantEPS(context.TODO(), eps)
+				if err != nil {
+					klog.Errorf("Failed to get Service in Infra cluster for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, err)
+					return
+				}
+				klog.Infof("EndpointSlice deleted: %v/%v", eps.Namespace, eps.Name)
+				c.queue.Add(newRequest(DeleteReq, infraSvc, nil))
+			}
+		},
+	})
+	if err != nil {
+		return err
+	}
+
+	//TODO: Add informer for EndpointSlices in the infra cluster to watch for (unwanted) changes
+	return nil
+}
+
+// Run starts an asynchronous loop that watches the infra cluster's LoadBalancer
+// Services and the tenant cluster's EndpointSlices, and reconciles the
+// EndpointSlices managed by this controller in the infra cluster.
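+//
+// A minimal sketch of how it is started (this mirrors the call in
+// cmd/kubevirt-cloud-controller-manager/kubevirteps.go):
+//
+//	metrics := controllersmetrics.NewControllerManagerMetrics("kubevirt-eps")
+//	go controller.Run(1, stopCh, metrics)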
+func (c *Controller) Run(numWorkers int, stopCh <-chan struct{}, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
+	defer utilruntime.HandleCrash()
+
+	ctx, cancelFn := context.WithCancel(context.Background())
+	defer cancelFn()
+	defer c.queue.ShutDown()
+
+	klog.Infof("Starting %s", ControllerName)
+	defer klog.Infof("Shutting down %s", ControllerName)
+	controllerManagerMetrics.ControllerStarted(ControllerName.String())
+	defer controllerManagerMetrics.ControllerStopped(ControllerName.String())
+
+	c.tenantFactory.Start(stopCh)
+	c.infraFactory.Start(stopCh)
+
+	if !cache.WaitForNamedCacheSync(ControllerName.String(), stopCh,
+		c.infraFactory.Core().V1().Services().Informer().HasSynced,
+		c.tenantFactory.Discovery().V1().EndpointSlices().Informer().HasSynced) {
+		return
+	}
+
+	for i := 0; i < numWorkers; i++ {
+		go wait.UntilWithContext(ctx, c.runWorker, time.Second)
+	}
+
+	<-stopCh
+}
+
+// worker pattern adapted from https://github.com/kubernetes/client-go/blob/master/examples/workqueue/main.go
+func (c *Controller) runWorker(ctx context.Context) {
+	for c.processNextItem(ctx) {
+	}
+}
+
+func (c *Controller) processNextItem(ctx context.Context) bool {
+	req, quit := c.queue.Get()
+	if quit {
+		return false
+	}
+
+	defer c.queue.Done(req)
+
+	err := c.reconcile(ctx, req.(*Request))
+
+	if err == nil {
+		c.queue.Forget(req)
+	} else if c.queue.NumRequeues(req) < c.maxRetries {
+		c.queue.AddRateLimited(req)
+	} else {
+		c.queue.Forget(req)
+		klog.Errorf("Dropping object out of queue after too many retries: %v", req)
+		utilruntime.HandleError(err)
+	}
+
+	return true
+}
+
+// getInfraServiceFromTenantEPS returns the Service in the infra cluster that is associated with the given tenant endpoint slice.
+func (c *Controller) getInfraServiceFromTenantEPS(ctx context.Context, slice *discovery.EndpointSlice) (*v1.Service, error) {
+	infraServices, err := c.infraClient.CoreV1().Services(c.infraNamespace).List(ctx,
+		metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s,%s=%s", kubevirt.TenantServiceNameLabelKey, slice.Labels[discovery.LabelServiceName],
+			kubevirt.TenantServiceNamespaceLabelKey, slice.Namespace)})
+	if err != nil {
+		klog.Errorf("Failed to get Service in Infra for EndpointSlice %s in namespace %s: %v", slice.Name, slice.Namespace, err)
+		return nil, err
+	}
+	if len(infraServices.Items) > 1 {
+		// This should never happen: only one infra Service should exist for a given tenant endpoint slice.
+		klog.Errorf("Multiple services found for tenant endpoint slice %s in namespace %s", slice.Name, slice.Namespace)
+		return nil, errors.New("multiple services found for tenant endpoint slice")
+	}
+	if len(infraServices.Items) == 1 {
+		return &infraServices.Items[0], nil
+	}
+	// No service found; possible if the service was deleted.
+	return nil, nil
+}
+
+// getTenantEPSFromInfraService returns the EndpointSlices in the tenant cluster that are associated with the given infra service.
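+// The lookup is label-based: slices are listed in the tenant namespace recorded
+// on the infra Service, using the standard "kubernetes.io/service-name"
+// selector (discovery.LabelServiceName).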
+func (c *Controller) getTenantEPSFromInfraService(ctx context.Context, svc *v1.Service) ([]*discovery.EndpointSlice, error) {
+	var tenantEPSSlices []*discovery.EndpointSlice
+	tenantServiceName := svc.Labels[kubevirt.TenantServiceNameLabelKey]
+	tenantServiceNamespace := svc.Labels[kubevirt.TenantServiceNamespaceLabelKey]
+	clusterName := svc.Labels[kubevirt.TenantClusterNameLabelKey]
+	klog.Infof("Searching for endpoints on tenant cluster %s for service %s in namespace %s.", clusterName, tenantServiceName, tenantServiceNamespace)
+	result, err := c.tenantClient.DiscoveryV1().EndpointSlices(tenantServiceNamespace).List(ctx,
+		metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discovery.LabelServiceName, tenantServiceName)})
+	if err != nil {
+		klog.Errorf("Failed to get EndpointSlices for Service %s in namespace %s: %v", tenantServiceName,
+			tenantServiceNamespace, err)
+		return nil, err
+	}
+	for i := range result.Items {
+		// Take the address of the slice element, not of the loop variable, so
+		// every appended pointer refers to a distinct EndpointSlice.
+		eps := &result.Items[i]
+		c.tenantEPSTracker.add(eps)
+		tenantEPSSlices = append(tenantEPSSlices, eps)
+	}
+	return tenantEPSSlices, nil
+}
+
+// getInfraEPSFromInfraService returns the EndpointSlices in the infra cluster that are associated with the given infra service.
+func (c *Controller) getInfraEPSFromInfraService(ctx context.Context, svc *v1.Service) ([]*discovery.EndpointSlice, error) {
+	var infraEPSSlices []*discovery.EndpointSlice
+	klog.Infof("Searching for endpoints on infra cluster for service %s in namespace %s.", svc.Name, svc.Namespace)
+	result, err := c.infraClient.DiscoveryV1().EndpointSlices(svc.Namespace).List(ctx,
+		metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discovery.LabelServiceName, svc.Name)})
+	if err != nil {
+		klog.Errorf("Failed to get EndpointSlices for Service %s in namespace %s: %v", svc.Name, svc.Namespace, err)
+		return nil, err
+	}
+	for i := range result.Items {
+		// Address of the element, not of the loop variable (see above).
+		infraEPSSlices = append(infraEPSSlices, &result.Items[i])
+	}
+	return infraEPSSlices, nil
+}
+
+func (c *Controller) reconcile(ctx context.Context, r *Request) error {
+	service, ok := r.Obj.(*v1.Service)
+	if !ok {
+		return errors.New("could not cast object to service")
+	}
+
+	if service.Labels[kubevirt.TenantServiceNameLabelKey] == "" ||
+		service.Labels[kubevirt.TenantServiceNamespaceLabelKey] == "" ||
+		service.Labels[kubevirt.TenantClusterNameLabelKey] == "" {
+		klog.Infof("LoadBalancer Service %s is not managed by %s. Skipping.", service.Name, ControllerName)
+		return nil
+	}
+	klog.Infof("Reconciling: %v", service.Name)
+
+	serviceDeleted := false
+	svc, err := c.infraFactory.Core().V1().Services().Lister().Services(c.infraNamespace).Get(service.Name)
+	if err != nil {
+		// Only NotFound means the Service is gone; other lister errors are retried.
+		if !k8serrors.IsNotFound(err) {
+			return err
+		}
+		klog.Infof("Service %s in namespace %s is deleted.", service.Name, service.Namespace)
+		serviceDeleted = true
+	} else {
+		service = svc
+	}
+
+	infraExistingEpSlices, err := c.getInfraEPSFromInfraService(ctx, service)
+	if err != nil {
+		return err
+	}
+
+	// At this point we have the current state of the 3 main objects we are interested in:
+	// 1. The Service in the infra cluster, the one created by the KubevirtCloudController.
+	// 2. The EndpointSlices in the tenant cluster, created for the tenant cluster's Service.
+	// 3. The EndpointSlices in the infra cluster, managed by this controller.
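+	//
+	// From here the decision logic is, in short:
+	//   - service deleted or externalTrafficPolicy != Local -> delete the slices we manage;
+	//   - slice address type no longer supported by the service -> delete that slice;
+	//   - otherwise reconcile the slices per address type against the tenant endpoints.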
+
+	slicesToDelete := []*discovery.EndpointSlice{}
+	slicesByAddressType := make(map[discovery.AddressType][]*discovery.EndpointSlice)
+
+	serviceSupportedAddressesTypes := getAddressTypesForService(service)
+	// If the service switched to a different address type, the old slices must be
+	// deleted, because the address type of an EndpointSlice is immutable.
+	// Likewise, if the service was deleted or no longer uses
+	// externalTrafficPolicy Local, the slices we manage are obsolete.
+	for _, eps := range infraExistingEpSlices {
+		if service.Spec.ExternalTrafficPolicy != v1.ServiceExternalTrafficPolicyTypeLocal || serviceDeleted {
+			// Make sure we never delete a slice that is not managed by us.
+			if c.managedByController(eps) {
+				klog.Infof("Marked EndpointSlice %s in namespace %s for deletion: service deleted or externalTrafficPolicy is not Local", eps.Name, eps.Namespace)
+				slicesToDelete = append(slicesToDelete, eps)
+			}
+			continue
+		}
+		if !serviceSupportedAddressesTypes.Has(eps.AddressType) {
+			if c.managedByController(eps) {
+				klog.Infof("Marked EndpointSlice %s in namespace %s for deletion: unsupported address type %v", eps.Name, eps.Namespace, eps.AddressType)
+				slicesToDelete = append(slicesToDelete, eps)
+			}
+			continue
+		}
+		slicesByAddressType[eps.AddressType] = append(slicesByAddressType[eps.AddressType], eps)
+	}
+
+	if !serviceDeleted {
+		// Get the tenant's endpoint slices for this service
+		tenantEpSlices, err := c.getTenantEPSFromInfraService(ctx, service)
+		if err != nil {
+			return err
+		}
+
+		// Reconcile the EndpointSlices for each address type, e.g. IPv4, IPv6
+		for addressType := range serviceSupportedAddressesTypes {
+			existingSlices := slicesByAddressType[addressType]
+			err := c.reconcileByAddressType(service, tenantEpSlices, existingSlices, addressType)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	// Delete the EndpointSlices that are no longer needed
+	for _, eps := range slicesToDelete {
+		err := c.infraClient.DiscoveryV1().EndpointSlices(eps.Namespace).Delete(ctx, eps.Name, metav1.DeleteOptions{})
+		if err != nil {
+			klog.Errorf("Failed to delete EndpointSlice %s in namespace %s: %v", eps.Name, eps.Namespace, err)
+			return err
+		}
+		klog.Infof("Deleted EndpointSlice %s in namespace %s", eps.Name, eps.Namespace)
+	}
+
+	return nil
+}
+
+//TODO: From here cleanup!
+
+func (c *Controller) reconcileByAddressType(service *v1.Service, tenantSlices []*discovery.EndpointSlice, existingSlices []*discovery.EndpointSlice, addressType discovery.AddressType) error {
+
+	slicesToCreate := []*discovery.EndpointSlice{}
+	slicesToUpdate := []*discovery.EndpointSlice{}
+	slicesToDelete := []*discovery.EndpointSlice{}
+	slicesUntouched := []*discovery.EndpointSlice{}
+
+	// Create the desired port configuration
+	var desiredPorts []discovery.EndpointPort
+
+	for _, port := range service.Spec.Ports {
+		port := port // copy: the EndpointPort below stores pointers into this variable
+		desiredPorts = append(desiredPorts, discovery.EndpointPort{
+			Port:     &port.TargetPort.IntVal,
+			Protocol: &port.Protocol,
+			Name:     &port.Name,
+		})
+	}
+
+	// Create the desired endpoint configuration
+	desiredEndpoints := c.getDesiredEndpoints(service, tenantSlices)
+	desiredEndpointSet := endpointsliceutil.EndpointSet{}
+	desiredEndpointSet.Insert(desiredEndpoints...)
+
+	// 1. Iterate through the existing slices: drop endpoints that are no longer
+	// desired and update matching endpoints that have changed. Also check that
+	// each slice carries the labels of the parent service, and update them if not.
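+	// Note: desiredEndpointSet deduplicates endpoints by an identity hash, so
+	// Get() returns the desired endpoint matching an existing one, and
+	// EndpointsEqualBeyondHash() reports whether any of the remaining fields
+	// (e.g. conditions or nodeName) differ, which is what forces an update.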
+	for _, existingSlice := range existingSlices {
+		var coveredEndpoints []discovery.Endpoint
+		sliceUpdated := false
+		// First enforce the desired port mapping
+		if !apiequality.Semantic.DeepEqual(existingSlice.Ports, desiredPorts) {
+			existingSlice.Ports = desiredPorts
+			sliceUpdated = true
+		}
+		for _, endpoint := range existingSlice.Endpoints {
+			present := desiredEndpointSet.Get(&endpoint)
+			if present != nil {
+				// One of the desired endpoints is covered by this slice
+				coveredEndpoints = append(coveredEndpoints, *present)
+				// Check if the endpoint needs updating
+				if !endpointsliceutil.EndpointsEqualBeyondHash(present, &endpoint) {
+					sliceUpdated = true
+				}
+				// Remove the endpoint from the desired set because it is already covered.
+				desiredEndpointSet.Delete(&endpoint)
+			}
+		}
+		// Check if the labels need updating
+		labels, labelsChanged := c.ensureEndpointSliceLabels(existingSlice, service)
+
+		// If an endpoint was updated or removed, mark the slice for update or deletion
+		if sliceUpdated || labelsChanged || len(existingSlice.Endpoints) != len(coveredEndpoints) {
+			if len(coveredEndpoints) == 0 {
+				// No desired endpoint is covered by this slice, so it should be deleted
+				slicesToDelete = append(slicesToDelete, existingSlice)
+			} else {
+				// Override the existing endpoints with the covered ones.
+				// This also drops the unwanted endpoints from the existing slice.
+				existingSlice.Endpoints = coveredEndpoints
+				existingSlice.Labels = labels
+				slicesToUpdate = append(slicesToUpdate, existingSlice)
+			}
+		} else {
+			slicesUntouched = append(slicesUntouched, existingSlice)
+		}
+	}
+	// 2. Iterate through the slices modified in step 1 and fill them up with
+	// any remaining desired endpoints.
+	// FillAlreadyUpdatedSlicesWithDesiredEndpoints
+	if desiredEndpointSet.Len() > 0 {
+		for _, existingUpdatedSlice := range slicesToUpdate {
+			for len(existingUpdatedSlice.Endpoints) < c.maxEndPointsPerSlice {
+				endpoint, ok := desiredEndpointSet.PopAny()
+				if !ok {
+					break
+				}
+				existingUpdatedSlice.Endpoints = append(existingUpdatedSlice.Endpoints, *endpoint)
+			}
+		}
+	}
+
+	// 3. If there are still desired endpoints left, try to fit them into a
+	// previously unchanged slice and/or create new ones.
+	// FillUntouchedSlicesWithDesiredEndpoints
+	if desiredEndpointSet.Len() > 0 {
+		for _, untouchedSlice := range slicesUntouched {
+			if desiredEndpointSet.Len() == 0 {
+				break
+			}
+			added := false
+			for len(untouchedSlice.Endpoints) < c.maxEndPointsPerSlice {
+				endpoint, ok := desiredEndpointSet.PopAny()
+				if !ok {
+					break
+				}
+				untouchedSlice.Endpoints = append(untouchedSlice.Endpoints, *endpoint)
+				added = true
+			}
+			// Only schedule an update if this slice actually absorbed endpoints.
+			if added {
+				slicesToUpdate = append(slicesToUpdate, untouchedSlice)
+			}
+		}
+	}
+
+	// 4. If there are still desired endpoints left, create new slices.
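+	// Each slice holds at most maxEndPointsPerSlice endpoints (100 as configured
+	// in NewKubevirtEPSController), so large services may need several slices.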
+	for desiredEndpointSet.Len() > 0 {
+		slice := c.newSlice(service, desiredPorts, addressType)
+		slice.Labels, _ = c.ensureEndpointSliceLabels(slice, service)
+		for len(slice.Endpoints) < c.maxEndPointsPerSlice {
+			endpoint, ok := desiredEndpointSet.PopAny()
+			if !ok {
+				break
+			}
+			slice.Endpoints = append(slice.Endpoints, *endpoint)
+		}
+		slicesToCreate = append(slicesToCreate, slice)
+	}
+
+	return c.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete)
+}
+
+func ownedBy(endpointSlice *discovery.EndpointSlice, svc *v1.Service) bool {
+	for _, o := range endpointSlice.OwnerReferences {
+		if o.UID == svc.UID && o.Kind == "Service" && o.APIVersion == "v1" {
+			return true
+		}
+	}
+	return false
+}
+
+func (c *Controller) finalize(service *v1.Service, slicesToCreate []*discovery.EndpointSlice, slicesToUpdate []*discovery.EndpointSlice, slicesToDelete []*discovery.EndpointSlice) error {
+	// If there are slices to delete and slices to create, turn matching pairs
+	// into updates so that existing slice names are reused.
+	for i := 0; i < len(slicesToDelete); {
+		if len(slicesToCreate) == 0 {
+			break
+		}
+		if slicesToDelete[i].AddressType == slicesToCreate[0].AddressType && ownedBy(slicesToDelete[i], service) {
+			// Take the pending create off the list *before* re-slicing so the
+			// renamed slice, not its successor, is queued for update.
+			slice := slicesToCreate[0]
+			slice.Name = slicesToDelete[i].Name
+			slicesToCreate = slicesToCreate[1:]
+			slicesToUpdate = append(slicesToUpdate, slice)
+			slicesToDelete = append(slicesToDelete[:i], slicesToDelete[i+1:]...)
+		} else {
+			i++
+		}
+	}
+
+	// Create the new slices if the service is not marked for deletion
+	if service.DeletionTimestamp == nil {
+		for _, slice := range slicesToCreate {
+			createdSlice, err := c.infraClient.DiscoveryV1().EndpointSlices(slice.Namespace).Create(context.TODO(), slice, metav1.CreateOptions{})
+			if err != nil {
+				klog.Errorf("Failed to create EndpointSlice %s in namespace %s: %v", slice.Name, slice.Namespace, err)
+				if k8serrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
+					return nil
+				}
+				return err
+			}
+			klog.Infof("Created EndpointSlice %s in namespace %s", createdSlice.Name, createdSlice.Namespace)
+		}
+	}
+
+	// Update slices
+	for _, slice := range slicesToUpdate {
+		_, err := c.infraClient.DiscoveryV1().EndpointSlices(slice.Namespace).Update(context.TODO(), slice, metav1.UpdateOptions{})
+		if err != nil {
+			klog.Errorf("Failed to update EndpointSlice %s in namespace %s: %v", slice.Name, slice.Namespace, err)
+			return err
+		}
+		klog.Infof("Updated EndpointSlice %s in namespace %s", slice.Name, slice.Namespace)
+	}
+
+	// Delete slices
+	for _, slice := range slicesToDelete {
+		err := c.infraClient.DiscoveryV1().EndpointSlices(slice.Namespace).Delete(context.TODO(), slice.Name, metav1.DeleteOptions{})
+		if err != nil {
+			klog.Errorf("Failed to delete EndpointSlice %s in namespace %s: %v", slice.Name, slice.Namespace, err)
+			return err
+		}
+		klog.Infof("Deleted EndpointSlice %s in namespace %s", slice.Name, slice.Namespace)
+	}
+
+	return nil
+}
+
+func (c *Controller) newSlice(service *v1.Service, desiredPorts []discovery.EndpointPort, addressType discovery.AddressType) *discovery.EndpointSlice {
+	ownerRef := metav1.NewControllerRef(service, schema.GroupVersionKind{Version: "v1", Kind: "Service"})
+
+	slice := &discovery.EndpointSlice{
+		ObjectMeta: metav1.ObjectMeta{
+			Labels: map[string]string{},
+			// Trailing dash so generated names read "<service>-<suffix>".
+			GenerateName:    service.Name + "-",
+			Namespace:       service.Namespace,
+			OwnerReferences: []metav1.OwnerReference{*ownerRef},
+		},
+		Ports:       desiredPorts,
+		AddressType: addressType,
+		Endpoints:   []discovery.Endpoint{},
+	}
+	return slice
+}
+
+func (c *Controller) getDesiredEndpoints(service *v1.Service, tenantSlices []*discovery.EndpointSlice) []*discovery.Endpoint {
+	var desiredEndpoints []*discovery.Endpoint
+	if service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
+		// Extract the desired endpoints from the tenant EndpointSlices. For
+		// collecting the nodes the address type does not matter: every node
+		// backing an endpoint in a corresponding tenant slice is selected.
+		nodeSet := sets.Set[string]{}
+		for _, slice := range tenantSlices {
+			for _, endpoint := range slice.Endpoints {
+				// Find all unique nodes that correspond to an endpoint in a tenant slice.
+				// NodeName can still be unset on brand-new endpoints; skip those.
+				if endpoint.NodeName != nil {
+					nodeSet.Insert(*endpoint.NodeName)
+				}
+			}
+		}
+
+		klog.Infof("Desired nodes for service %s in namespace %s: %v", service.Name, service.Namespace, sets.List(nodeSet))
+
+		for _, node := range sets.List(nodeSet) {
+			// Find the VMI for the node name
+			obj := &unstructured.Unstructured{}
+			vmi := &kubevirtv1.VirtualMachineInstance{}
+
+			obj, err := c.infraDynamic.Resource(kubevirtv1.VirtualMachineInstanceGroupVersionKind.GroupVersion().WithResource("virtualmachineinstances")).Namespace(c.infraNamespace).Get(context.TODO(), node, metav1.GetOptions{})
+			if err != nil {
+				klog.Errorf("Failed to get VirtualMachineInstance %s in namespace %s: %v", node, c.infraNamespace, err)
+				continue
+			}
+
+			err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, vmi)
+			if err != nil {
+				// Skip this node rather than crashing the whole controller on a single bad object.
+				klog.Errorf("Failed to convert Unstructured to VirtualMachineInstance: %v", err)
+				continue
+			}
+
+			// With only the VMI phase to go on, Ready and Serving are both derived from Running.
+			ready := vmi.Status.Phase == kubevirtv1.Running
+			serving := vmi.Status.Phase == kubevirtv1.Running
+			terminating := vmi.Status.Phase == kubevirtv1.Failed || vmi.Status.Phase == kubevirtv1.Succeeded
+
+			for _, i := range vmi.Status.Interfaces {
+				if i.Name == "default" {
+					desiredEndpoints = append(desiredEndpoints, &discovery.Endpoint{
+						Addresses: []string{i.IP},
+						Conditions: discovery.EndpointConditions{
+							Ready:       &ready,
+							Serving:     &serving,
+							Terminating: &terminating,
+						},
+						NodeName: &vmi.Status.NodeName,
+					})
+					break
+				}
+			}
+		}
+	}
+
+	return desiredEndpoints
+}
+
+func (c *Controller) ensureEndpointSliceLabels(slice *discovery.EndpointSlice, svc *v1.Service) (map[string]string, bool) {
+	labels := make(map[string]string)
+	labelsChanged := false
+
+	for k, v := range slice.Labels {
+		labels[k] = v
+	}
+
+	for k, v := range svc.ObjectMeta.Labels {
+		labels[k] = v
+	}
+
+	labels[discovery.LabelServiceName] = svc.Name
+	labels[discovery.LabelManagedBy] = ControllerName.dashed()
+	if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == v1.ClusterIPNone {
+		labels[v1.IsHeadlessService] = ""
+	} else {
+		delete(labels, v1.IsHeadlessService)
+	}
+
+	if !apiequality.Semantic.DeepEqual(slice.Labels, labels) {
+		labelsChanged = true
+	}
+	return labels, labelsChanged
+}
+
+func (c *Controller) managedByController(slice *discovery.EndpointSlice) bool {
+	return slice.Labels[discovery.LabelManagedBy] == ControllerName.dashed()
+}
diff --git a/pkg/controller/kubevirteps/kubevirteps_controller_suite_test.go b/pkg/controller/kubevirteps/kubevirteps_controller_suite_test.go
new file mode 100644
index 000000000..59cb0da07
--- /dev/null
+++ b/pkg/controller/kubevirteps/kubevirteps_controller_suite_test.go
@@ -0,0 +1,13 @@
+package kubevirteps_test
+
+import (
+	. "github.com/onsi/ginkgo/v2"
+	.
"github.com/onsi/gomega" + + "testing" +) + +func TestProvider(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "KubevirtEPS Controller Suite") +} diff --git a/pkg/controller/kubevirteps/kubevirteps_controller_test.go b/pkg/controller/kubevirteps/kubevirteps_controller_test.go new file mode 100644 index 000000000..7e645a0bc --- /dev/null +++ b/pkg/controller/kubevirteps/kubevirteps_controller_test.go @@ -0,0 +1,635 @@ +package kubevirteps + +import ( + "context" + g "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/intstr" + dfake "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" + "k8s.io/component-base/metrics/prometheus/controllers" + "k8s.io/klog/v2" + kubevirtv1 "kubevirt.io/api/core/v1" + kubevirt "kubevirt.io/cloud-provider-kubevirt/pkg/provider" +) + +const ( + tenantNamespace = "tenant-namespace" + infraNamespace = "test" +) + +type testKubevirtEPSController struct { + controller *Controller + tenantClient *fake.Clientset + infraClient *fake.Clientset + infraDynamic *dfake.FakeDynamicClient +} + +func createInfraServiceLB(name, tenantServiceName, clusterName string, servicePort v1.ServicePort, externalTrafficPolicy v1.ServiceExternalTrafficPolicy) *v1.Service { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: infraNamespace, + Labels: map[string]string{ + kubevirt.TenantServiceNameLabelKey: tenantServiceName, + kubevirt.TenantServiceNamespaceLabelKey: tenantNamespace, + kubevirt.TenantClusterNameLabelKey: clusterName, + }, + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{ + servicePort, + }, + Type: v1.ServiceTypeLoadBalancer, + ExternalTrafficPolicy: externalTrafficPolicy, + IPFamilies: []v1.IPFamily{ + v1.IPv4Protocol, + }, + }, + } +} + +func createUnstructuredVMINode(name, nodeName, ip string) *unstructured.Unstructured { + vmi := &unstructured.Unstructured{} + vmi.SetUnstructuredContent(map[string]interface{}{ + "apiVersion": "kubevirt.io/v1", + "kind": "VirtualMachineInstance", + "metadata": map[string]interface{}{ + "name": name, + "namespace": infraNamespace, + }, + "status": map[string]interface{}{ + "phase": "Running", + "nodeName": nodeName, + "interfaces": []interface{}{ + map[string]interface{}{ + "name": "default", + "ipAddress": ip, + }, + }, + }, + }) + return vmi +} + +func createPort(name string, port int32, protocol v1.Protocol) *discoveryv1.EndpointPort { + return &discoveryv1.EndpointPort{ + Name: &name, + Port: &port, + Protocol: &protocol, + } +} + +func createEndpoint(ip, nodeName string, ready, serving, terminating bool) *discoveryv1.Endpoint { + return &discoveryv1.Endpoint{ + Addresses: []string{ip}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &ready, + Serving: &serving, + Terminating: &terminating, + }, + NodeName: &nodeName, + } +} + +func createTenantEPSlice( + name, labelServiceName string, addressType discoveryv1.AddressType, + port discoveryv1.EndpointPort, endpoints []discoveryv1.Endpoint) *discoveryv1.EndpointSlice { + return &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: tenantNamespace, + Labels: map[string]string{ + discoveryv1.LabelServiceName: labelServiceName, + }, + }, + 
AddressType: addressType, + Ports: []discoveryv1.EndpointPort{ + port, + }, + Endpoints: endpoints, + } +} + +func createAndAssertVMI(node, nodeName, ip string) { + vmi := createUnstructuredVMINode(node, nodeName, ip) + _, err := testVals.infraDynamic.Resource(kubevirtv1.VirtualMachineInstanceGroupVersionKind.GroupVersion().WithResource("virtualmachineinstances")). + Namespace(infraNamespace).Create(context.TODO(), vmi, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + Eventually(func() (bool, error) { + vmiList, err := testVals.infraDynamic.Resource(kubevirtv1.VirtualMachineInstanceGroupVersionKind.GroupVersion().WithResource("virtualmachineinstances")). + Namespace(infraNamespace).Get(context.TODO(), node, metav1.GetOptions{}) + if err == nil || vmiList != nil { + return true, err + } + return false, err + }).Should(BeTrue(), "VMI in infra cluster should be created") +} + +func createAndAssertTenantSlice(name, labelServiceName string, addressType discoveryv1.AddressType, port discoveryv1.EndpointPort, endpoints []discoveryv1.Endpoint) { + epSlice := createTenantEPSlice(name, labelServiceName, addressType, port, endpoints) + _, _ = testVals.tenantClient.DiscoveryV1().EndpointSlices(tenantNamespace).Create(context.TODO(), epSlice, metav1.CreateOptions{}) + // Check if tenant Endpointslice is created + Eventually(func() (bool, error) { + eps, err := testVals.tenantClient.DiscoveryV1().EndpointSlices(tenantNamespace).Get(context.TODO(), name, metav1.GetOptions{}) + if err == nil || eps != nil { + return true, err + } + return false, err + }).Should(BeTrue(), "EndpointSlice in tenant cluster should be created") +} + +func createAndAssertInfraServiceLB(name, tenantServiceName, clusterName string, servicePort v1.ServicePort, externalTrafficPolicy v1.ServiceExternalTrafficPolicy) { + svc := createInfraServiceLB(name, tenantServiceName, clusterName, servicePort, externalTrafficPolicy) + _, _ = testVals.infraClient.CoreV1().Services(infraNamespace).Create(context.TODO(), svc, metav1.CreateOptions{}) + // Check if the service is created + Eventually(func() (bool, error) { + svc, err := testVals.infraClient.CoreV1().Services(infraNamespace).Get(context.TODO(), name, metav1.GetOptions{}) + if err == nil || svc != nil { + return true, err + } + return false, err + + }).Should(BeTrue(), "") +} + +func setupTestKubevirtEPSController(ctx context.Context) *testKubevirtEPSController { + var tenantClient *fake.Clientset + var infraClient *fake.Clientset + + tenantClient = fake.NewSimpleClientset() + infraClient = fake.NewSimpleClientset() + + s := runtime.NewScheme() + infraDynamic := dfake.NewSimpleDynamicClientWithCustomListKinds(s, map[schema.GroupVersionResource]string{ + schema.GroupVersionResource{ + Group: kubevirtv1.GroupVersion.Group, + Version: kubevirtv1.GroupVersion.Version, + Resource: "virtualmachineinstances", + }: "VirtualMachineInstanceList", + }) + + controller := NewKubevirtEPSController(tenantClient, infraClient, infraDynamic, "test") + + err := controller.Init() + if err != nil { + klog.Errorf("Failed to initialize kubevirtEPSController: %v", err) + klog.Fatal(err) + } + + return &testKubevirtEPSController{ + controller: controller, + tenantClient: tenantClient, + infraClient: infraClient, + infraDynamic: infraDynamic, + } +} + +func (testVals *testKubevirtEPSController) runKubevirtEPSController(ctx context.Context) { + metrics := controllers.NewControllerManagerMetrics("test") + go testVals.controller.Run(1, ctx.Done(), metrics) +} + +var _ = g.Describe("KubevirtEPSController 
start", g.Ordered, func() { + g.Context("With starting the controller", g.Ordered, func() { + + g.It("Should start the controller", func() { + ctx, stop := context.WithCancel(context.Background()) + defer stop() + testVals := setupTestKubevirtEPSController(ctx) + testVals.runKubevirtEPSController(ctx) + }) + }) +}) + +var ( + stop context.CancelFunc + ctx context.Context + testVals *testKubevirtEPSController +) + +var _ = g.Describe("KubevirtEPSController", g.Ordered, func() { + + g.Context("With starting the controller", g.Ordered, func() { + g.It("Should start the controller", func() { + ctx, stop = context.WithCancel(context.Background()) + defer stop() + testVals := setupTestKubevirtEPSController(ctx) + testVals.runKubevirtEPSController(ctx) + + cache.WaitForCacheSync(ctx.Done(), + testVals.controller.tenantFactory.Discovery().V1().EndpointSlices().Informer().HasSynced, + testVals.controller.infraFactory.Core().V1().Services().Informer().HasSynced) + }) + }) + + g.Context("With adding an infraService", g.Ordered, func() { + // Startup and wait for cache sync + g.BeforeEach(func() { + ctx, stop = context.WithCancel(context.Background()) + testVals = setupTestKubevirtEPSController(ctx) + testVals.runKubevirtEPSController(ctx) + + cache.WaitForCacheSync(ctx.Done(), + testVals.controller.tenantFactory.Discovery().V1().EndpointSlices().Informer().HasSynced, + testVals.controller.infraFactory.Core().V1().Services().Informer().HasSynced) + + }) + + // Stop the controller + g.AfterEach(func() { + stop() + }) + + g.It("Should reconcile a new Endpointslice on the infra cluster", func() { + // Create VMI in infra cluster + createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89") + + // Create Endpoinslices in tenant cluster + createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4, + *createPort("http", 80, v1.ProtocolTCP), + []discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)}) + + // Create service in infra cluster + createAndAssertInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster", + v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}}, + v1.ServiceExternalTrafficPolicyLocal) + + var epsList *discoveryv1.EndpointSliceList + var err error + // Check if the controller creates the EndpointSlice in the infra cluster + Eventually(func() (bool, error) { + epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{}) + if len(epsList.Items) == 1 { + return true, err + } else { + return false, err + } + }).Should(BeTrue(), "EndpointSlice in infra cluster should be created by the controller reconciler") + + Expect(*epsList.Items[0].Endpoints[0].NodeName).To(Equal("ip-10-32-5-13")) + }) + + g.It("Should update the Endpointslice when a tenant Endpointslice is updated", func() { + + ipAddr1 := "123.45.67.11" + ipAddr2 := "123.99.99.99" + // Create VMI in infra cluster + createAndAssertVMI("worker-0-test", "ip-10-32-5-13", ipAddr1) + createAndAssertVMI("worker-1-test", "ip-10-32-5-15", ipAddr2) + + // Create Endpoinslices in tenant cluster + createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4, + *createPort("http", 80, v1.ProtocolTCP), + []discoveryv1.Endpoint{*createEndpoint(ipAddr1, "worker-0-test", true, true, false)}) + + // Create service in infra cluster + createAndAssertInfraServiceLB("infra-service-name", 
"tenant-service-name", "test-cluster", + v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}}, + v1.ServiceExternalTrafficPolicyLocal) + + // Check if the controller creates the EndpointSlice in the infra cluster + Eventually(func() (bool, error) { + epsList, err := testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{}) + if len(epsList.Items) == 1 && + len(epsList.Items[0].Endpoints) == 1 && + *epsList.Items[0].Endpoints[0].NodeName == "ip-10-32-5-13" { + return true, err + } else { + return false, err + } + }).Should(BeTrue(), "EndpointSlice in infra cluster should be created by the controller reconciler") + + // Update the tenant Endpointslice + epSlice := createTenantEPSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4, + *createPort("http", 80, v1.ProtocolTCP), + []discoveryv1.Endpoint{ + *createEndpoint(ipAddr1, "worker-0-test", true, true, false), + *createEndpoint(ipAddr2, "worker-1-test", true, true, false), + }) + _, err := testVals.tenantClient.DiscoveryV1().EndpointSlices(tenantNamespace).Update(context.TODO(), epSlice, metav1.UpdateOptions{}) + Expect(err).To(BeNil()) + + // Check if tenant Endpointslice is updated + Eventually(func() (bool, error) { + epsList, err := testVals.tenantClient.DiscoveryV1().EndpointSlices(tenantNamespace).List(context.TODO(), metav1.ListOptions{}) + if len(epsList.Items) == 1 && len(epsList.Items[0].Endpoints) == 2 { + return true, err + } else { + return false, err + } + }).Should(BeTrue(), "EndpointSlice in tenant cluster should be updated") + + // Check if the controller updates the EndpointSlice in the infra cluster + Eventually(func() (bool, error) { + epsList, err := testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{}) + if len(epsList.Items) == 1 && len(epsList.Items[0].Endpoints) == 2 { + return true, err + } else { + return false, err + } + }).Should(BeTrue(), "EndpointSlice in infra cluster should be updated by the controller reconciler") + }) + + g.It("Should update the Endpointslice when the infra Service external traffic policy changes.", func() { + // Create VMI in infra cluster + createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89") + + // Create Endpoinslices in tenant cluster + createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4, + *createPort("http", 80, v1.ProtocolTCP), + []discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)}) + + // Create service in infra cluster + createAndAssertInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster", + v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}}, + v1.ServiceExternalTrafficPolicyLocal) + + var epsList *discoveryv1.EndpointSliceList + var err error + // Check if the controller creates the EndpointSlice in the infra cluster + Eventually(func() (bool, error) { + epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{}) + if len(epsList.Items) == 1 { + return true, err + } else { + return false, err + } + }).Should(BeTrue(), "EndpointSlice in infra cluster should be created by the controller reconciler") + + Expect(*epsList.Items[0].Endpoints[0].NodeName).To(Equal("ip-10-32-5-13")) + + // Update the service's external traffic policy 
to Cluster + svc := createInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster", + v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}}, + v1.ServiceExternalTrafficPolicyCluster) + + _, err = testVals.infraClient.CoreV1().Services(infraNamespace).Update(context.TODO(), svc, metav1.UpdateOptions{}) + Expect(err).To(BeNil()) + + Eventually(func() (bool, error) { + epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{}) + if len(epsList.Items) == 0 { + return true, err + } else { + return false, err + } + }).Should(BeTrue(), "EndpointSlice in infra cluster should be deleted by the controller reconciler") + + // Update the service's external traffic policy to Local + svc = createInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster", + v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}}, + v1.ServiceExternalTrafficPolicyLocal) + + _, err = testVals.infraClient.CoreV1().Services(infraNamespace).Update(context.TODO(), svc, metav1.UpdateOptions{}) + Expect(err).To(BeNil()) + + Eventually(func() (bool, error) { + epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{}) + if len(epsList.Items) == 1 { + return true, err + } else { + return false, err + } + }).Should(BeTrue(), "EndpointSlice in infra cluster should be created by the controller reconciler") + }) + + g.It("Should update the Endpointslice when the infra Service labels are updated.", func() { + // Create VMI in infra cluster + createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89") + + // Create Endpoinslices in tenant cluster + createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4, + *createPort("http", 80, v1.ProtocolTCP), + []discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)}) + + // Create service in infra cluster + createAndAssertInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster", + v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}}, + v1.ServiceExternalTrafficPolicyLocal) + + var epsList *discoveryv1.EndpointSliceList + var err error + // Check if the controller creates the EndpointSlice in the infra cluster + Eventually(func() (bool, error) { + epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{}) + if len(epsList.Items) == 1 { + return true, err + } else { + return false, err + } + }).Should(BeTrue(), "EndpointSlice in infra cluster should be created by the controller reconciler") + + Expect(*epsList.Items[0].Endpoints[0].NodeName).To(Equal("ip-10-32-5-13")) + + // Update the service's labels + svc := createInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster", + v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}}, + v1.ServiceExternalTrafficPolicyLocal) + svc.Labels["test-label"] = "test-value" + svc.Labels["test-label-2"] = "test-value-2" + + _, err = testVals.infraClient.CoreV1().Services(infraNamespace).Update(context.TODO(), svc, metav1.UpdateOptions{}) + Expect(err).To(BeNil()) + + Eventually(func() (bool, error) { + epsList, err = 
testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
+				if len(epsList.Items) == 1 {
+					if epsList.Items[0].Labels["test-label"] == "test-value" && epsList.Items[0].Labels["test-label-2"] == "test-value-2" {
+						return true, err
+					}
+					return false, err
+				} else {
+					return false, err
+				}
+			}).Should(BeTrue(), "EndpointSlice in infra cluster should have the two added labels")
+
+			// Update the service again: keep the Local policy but drop the test-label-2 label
+			svc = createInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster",
+				v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}},
+				v1.ServiceExternalTrafficPolicyLocal)
+			svc.Labels["test-label"] = "test-value"
+
+			_, err = testVals.infraClient.CoreV1().Services(infraNamespace).Update(context.TODO(), svc, metav1.UpdateOptions{})
+			Expect(err).To(BeNil())
+
+			Eventually(func() (bool, error) {
+				epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
+				if len(epsList.Items) == 1 {
+					if epsList.Items[0].Labels["test-label"] == "test-value" && epsList.Items[0].Labels["test-label-2"] == "test-value-2" {
+						return true, err
+					}
+					return false, err
+				} else {
+					return false, err
+				}
+			}).Should(BeTrue(), "EndpointSlice in infra cluster still has the two added labels")
+		})
+
+		g.It("Should update the EndpointSlice when the infra Service port is updated.", func() {
+			// Create VMI in infra cluster
+			createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89")
+
+			// Create EndpointSlices in tenant cluster
+			createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4,
+				*createPort("http", 80, v1.ProtocolTCP),
+				[]discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)})
+
+			// Create service in infra cluster
+			createAndAssertInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster",
+				v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}},
+				v1.ServiceExternalTrafficPolicyLocal)
+
+			var epsList *discoveryv1.EndpointSliceList
+			var err error
+			// Check if the controller creates the EndpointSlice in the infra cluster
+			Eventually(func() (bool, error) {
+				epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
+				if len(epsList.Items) == 1 {
+					if *epsList.Items[0].Ports[0].Port == 30390 {
+						return true, err
+					}
+					return false, err
+				} else {
+					return false, err
+				}
+			}).Should(BeTrue(), "EndpointSlice in infra cluster should be created by the controller reconciler")
+
+			Expect(*epsList.Items[0].Endpoints[0].NodeName).To(Equal("ip-10-32-5-13"))
+
+			// Update the service's port
+			svc := createInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster",
+				v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30440}},
+				v1.ServiceExternalTrafficPolicyLocal)
+
+			_, err = testVals.infraClient.CoreV1().Services(infraNamespace).Update(context.TODO(), svc, metav1.UpdateOptions{})
+			Expect(err).To(BeNil())
+
+			Eventually(func() (bool, error) {
+				epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
+				if len(epsList.Items) == 1 {
+					if *epsList.Items[0].Ports[0].Port == 30440 {
+						return true, err
+					}
+					return false, err
+				} else {
+					return false, err
+				}
+			}).Should(BeTrue(), "EndpointSlice in infra cluster should have the updated target port")
+		})
+
+		g.It("Should delete the EndpointSlice when the Service in infra is deleted", func() {
+			// Create VMI in infra cluster
+			createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89")
+
+			// Create EndpointSlices in tenant cluster
+			createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4,
+				*createPort("http", 80, v1.ProtocolTCP),
+				[]discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)})
+
+			// Create service in infra cluster
+			createAndAssertInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster",
+				v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}},
+				v1.ServiceExternalTrafficPolicyLocal)
+
+			var epsList *discoveryv1.EndpointSliceList
+			var err error
+			// Check if the controller creates the EndpointSlice in the infra cluster
+			Eventually(func() (bool, error) {
+				epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
+				if len(epsList.Items) == 1 {
+					if *epsList.Items[0].Ports[0].Port == 30390 {
+						return true, err
+					}
+					return false, err
+				} else {
+					return false, err
+				}
+			}).Should(BeTrue(), "EndpointSlice in infra cluster should be created by the controller reconciler")
+
+			Expect(*epsList.Items[0].Endpoints[0].NodeName).To(Equal("ip-10-32-5-13"))
+
+			// Delete the service
+			err = testVals.infraClient.CoreV1().Services(infraNamespace).Delete(context.TODO(), "infra-service-name", metav1.DeleteOptions{})
+			Expect(err).To(BeNil())
+
+			Eventually(func() (bool, error) {
+				epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
+				if len(epsList.Items) == 0 {
+					return true, err
+				} else {
+					return false, err
+				}
+			}).Should(BeTrue(), "EndpointSlice in infra cluster should be deleted.")
+		})
+
+		g.It("Should not update the EndpointSlice on the infra cluster because VMI is not present", func() {
+			// Create VMI in infra cluster
+			createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89")
+
+			// Create EndpointSlices in tenant cluster
+			createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4,
+				*createPort("http", 80, v1.ProtocolTCP),
+				[]discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)})
+
+			// Create service in infra cluster
+			createAndAssertInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster",
+				v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}}, v1.ServiceExternalTrafficPolicyLocal)
+
+			// Check if the controller creates the EndpointSlice in the infra cluster
+			Eventually(func() (bool, error) {
+				epsList, err := testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
+				if len(epsList.Items) == 1 {
+					return true, err
+				} else {
+					return false, err
+				}
+			}).Should(BeTrue(), "EndpointSlice in infra cluster should be created by the controller reconciler")
+
+			// Add an endpoint backed by a node (worker-1-test) whose VMI does not exist in the infra cluster.
+			epSlice := createTenantEPSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4,
+				*createPort("http", 80, v1.ProtocolTCP),
+				[]discoveryv1.Endpoint{
+					*createEndpoint("123.45.67.89", "worker-0-test", true, true, false),
+				*createEndpoint("112.34.56.78", "worker-1-test", true, true, false),
+			})
+
+		_, err := testVals.tenantClient.DiscoveryV1().EndpointSlices(tenantNamespace).Update(context.TODO(), epSlice, metav1.UpdateOptions{})
+		Expect(err).To(BeNil())
+
+		// Check if the tenant EndpointSlice is updated
+		Eventually(func() (bool, error) {
+			epsList, err := testVals.tenantClient.DiscoveryV1().EndpointSlices(tenantNamespace).List(context.TODO(), metav1.ListOptions{})
+			if len(epsList.Items) == 1 && len(epsList.Items[0].Endpoints) == 2 {
+				return true, err
+			} else {
+				return false, err
+			}
+		}).Should(BeTrue(), "EndpointSlice in tenant cluster should be updated")
+
+		// Expect a call to infraDynamic.Get for the VMI
+		Eventually(func() (bool, error) {
+			for _, action := range testVals.infraDynamic.Actions() {
+				if action.Matches("get", "virtualmachineinstances") &&
+					action.GetNamespace() == infraNamespace {
+					getAction := action.(testing.GetAction)
+					if getAction.GetName() == "worker-1-test" {
+						return true, nil
+					}
+				}
+			}
+			return false, nil
+		}).Should(BeTrue(), "Expected a call to infraDynamic.Get for the VMI")
+
+	})
+	})
+})
diff --git a/pkg/controller/kubevirteps/kubevirteps_controller_utils.go b/pkg/controller/kubevirteps/kubevirteps_controller_utils.go
new file mode 100644
index 000000000..0d3dbfd5e
--- /dev/null
+++ b/pkg/controller/kubevirteps/kubevirteps_controller_utils.go
@@ -0,0 +1,98 @@
+package kubevirteps
+
+import (
+	v1 "k8s.io/api/core/v1"
+	discovery "k8s.io/api/discovery/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/klog/v2"
+	utilnet "k8s.io/utils/net"
+	"sync"
+)
+
+// source: https://github.com/kubernetes/endpointslice/blob/master/utils.go#L280
+func getAddressTypesForService(service *v1.Service) sets.Set[discovery.AddressType] {
+	serviceSupportedAddresses := sets.New[discovery.AddressType]()
+
+	// If IPFamilies is set on the Service, derive the supported address types from it.
+	for _, family := range service.Spec.IPFamilies {
+		if family == v1.IPv4Protocol {
+			serviceSupportedAddresses.Insert(discovery.AddressTypeIPv4)
+		}
+
+		if family == v1.IPv6Protocol {
+			serviceSupportedAddresses.Insert(discovery.AddressTypeIPv6)
+		}
+	}
+
+	if serviceSupportedAddresses.Len() > 0 {
+		return serviceSupportedAddresses // we have found families for this service
+	}
+
+	// If no families are found, we will use the ClusterIP to determine the address type
+	if len(service.Spec.ClusterIP) > 0 && service.Spec.ClusterIP != v1.ClusterIPNone { // headfull
+		addrType := discovery.AddressTypeIPv4
+		if utilnet.IsIPv6String(service.Spec.ClusterIP) {
+			addrType = discovery.AddressTypeIPv6
+		}
+		serviceSupportedAddresses.Insert(addrType)
+		klog.V(2).Info("Couldn't find ipfamilies for service. This could happen if controller manager is connected to an old apiserver that does not support ip families yet. EndpointSlices for this Service will use addressType as the IP Family based on familyOf(ClusterIP).", "service", klog.KObj(service), "addressType", addrType, "clusterIP", service.Spec.ClusterIP)
+		return serviceSupportedAddresses
+	}
+
+	serviceSupportedAddresses.Insert(discovery.AddressTypeIPv4)
+	serviceSupportedAddresses.Insert(discovery.AddressTypeIPv6)
+	klog.V(2).Info("Couldn't find ipfamilies for headless service, likely because the controller manager is connected to an old apiserver that does not support ip families yet. The Service's EndpointSlice will use dual-stack families until the api-server defaults it correctly", "service", klog.KObj(service))
+	return serviceSupportedAddresses
+}
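+
+// Illustration (not part of the upstream source): spec.ipFamilies [IPv4] yields
+// {IPv4}; no ipFamilies but an IPv6 ClusterIP yields {IPv6}; a headless Service
+// with neither set falls back to dual stack {IPv4, IPv6}.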
+
+// tenantEPSTracker keeps track of which tenant-cluster EndpointSlices are watched by the
+// KubevirtEPSController. The controller watches EndpointSlices in the tenant cluster that
+// correspond to Services in the infra cluster, so that it can update the matching infra
+// EndpointSlices whenever the tenant cluster's EndpointSlices change.
+type tenantEPSTracker struct {
+	sync.RWMutex
+	register []types.NamespacedName
+}
+
+func (t *tenantEPSTracker) add(eps *discovery.EndpointSlice) {
+	t.Lock()
+	defer t.Unlock()
+	klog.Infof("Adding EndpointSlice %s to the tenantEPSTracker", eps.Name)
+	name := types.NamespacedName{
+		Namespace: eps.Namespace,
+		Name:      eps.Name,
+	}
+	t.register = append(t.register, name)
+}
+
+func (t *tenantEPSTracker) remove(eps *discovery.EndpointSlice) {
+	t.Lock()
+	defer t.Unlock()
+	klog.Infof("Removing EndpointSlice %s from the tenantEPSTracker", eps.Name)
+	name := types.NamespacedName{
+		Namespace: eps.Namespace,
+		Name:      eps.Name,
+	}
+	for i, n := range t.register {
+		if n == name {
+			t.register = append(t.register[:i], t.register[i+1:]...)
+			return
+		}
+	}
+}
+
+func (t *tenantEPSTracker) contains(eps *discovery.EndpointSlice) bool {
+	t.RLock()
+	defer t.RUnlock()
+	name := types.NamespacedName{
+		Namespace: eps.Namespace,
+		Name:      eps.Name,
+	}
+	for _, n := range t.register {
+		if n == name {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/provider/cloud.go b/pkg/provider/cloud.go
index 23400c2d7..30b787847 100644
--- a/pkg/provider/cloud.go
+++ b/pkg/provider/cloud.go
@@ -35,7 +35,7 @@ func init() {
 	}
 }
 
-type cloud struct {
+type Cloud struct {
 	namespace string
 	client    client.Client
 	config    CloudConfig
@@ -62,6 +62,11 @@ type LoadBalancerConfig struct {
 	// Selectorless delegate endpointslices creation on third party by
 	// skipping service selector creation
 	Selectorless *bool `yaml:"selectorless,omitempty"`
+
+	// EnableEPSController determines whether the EPS controller is enabled.
+	// This is a temporary flag to enable/disable the EPS controller.
+	// When disabled, the service selector is used.
+	EnableEPSController *bool `yaml:"enableEPSController,omitempty"`
 }
 
 type InstancesV2Config struct {
@@ -119,7 +124,7 @@ func kubevirtCloudProviderFactory(config io.Reader) (cloudprovider.Interface, er
 		}
 	} else {
 		var infraKubeConfig string
-		infraKubeConfig, err = getInfraKubeConfig(cloudConf.Kubeconfig)
+		infraKubeConfig, err = GetInfraKubeConfig(cloudConf.Kubeconfig)
 		if err != nil {
 			return nil, err
 		}
@@ -146,20 +151,20 @@ func kubevirtCloudProviderFactory(config io.Reader) (cloudprovider.Interface, er
 	if err != nil {
 		return nil, err
 	}
-	return &cloud{
+	return &Cloud{
 		namespace: namespace,
 		client:    c,
 		config:    cloudConf,
 	}, nil
 }
 
-// Initialize provides the cloud with a kubernetes client builder and may spawn goroutines
-// to perform housekeeping activities within the cloud provider.
+// Initialize provides the Cloud with a kubernetes client builder and may spawn goroutines
+// to perform housekeeping activities within the Cloud provider.
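+// For this provider Initialize starts no housekeeping goroutines; the
+// EndpointSlice controller is instead registered as a controller init func of
+// the cloud-controller-manager (see cmd/kubevirt-cloud-controller-manager).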
+func (c *Cloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, stop <-chan struct{}) {
 }
 
 // LoadBalancer returns a balancer interface. Also returns true if the interface is supported, false otherwise.
-func (c *cloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
+func (c *Cloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
 	if !c.config.LoadBalancer.Enabled {
 		return nil, false
 	}
@@ -172,11 +177,11 @@ func (c *cloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
 }
 
 // Instances returns an instances interface. Also returns true if the interface is supported, false otherwise.
-func (c *cloud) Instances() (cloudprovider.Instances, bool) {
+func (c *Cloud) Instances() (cloudprovider.Instances, bool) {
 	return nil, false
 }
 
-func (c *cloud) InstancesV2() (cloudprovider.InstancesV2, bool) {
+func (c *Cloud) InstancesV2() (cloudprovider.InstancesV2, bool) {
 	if !c.config.InstancesV2.Enabled {
 		return nil, false
 	}
@@ -189,31 +194,43 @@ func (c *cloud) InstancesV2() (cloudprovider.InstancesV2, bool) {
 
 // Zones returns a zones interface. Also returns true if the interface is supported, false otherwise.
 // DEPRECATED: Zones is deprecated in favor of retrieving zone/region information from InstancesV2.
-func (c *cloud) Zones() (cloudprovider.Zones, bool) {
+func (c *Cloud) Zones() (cloudprovider.Zones, bool) {
 	return nil, false
 }
 
 // Clusters returns a clusters interface. Also returns true if the interface is supported, false otherwise.
-func (c *cloud) Clusters() (cloudprovider.Clusters, bool) {
+func (c *Cloud) Clusters() (cloudprovider.Clusters, bool) {
 	return nil, false
 }
 
 // Routes returns a routes interface along with whether the interface is supported.
-func (c *cloud) Routes() (cloudprovider.Routes, bool) {
+func (c *Cloud) Routes() (cloudprovider.Routes, bool) {
 	return nil, false
 }
 
-// ProviderName returns the cloud provider ID.
-func (c *cloud) ProviderName() string {
+// ProviderName returns the Cloud provider ID.
+func (c *Cloud) ProviderName() string {
 	return ProviderName
 }
 
 // HasClusterID returns true if a ClusterID is required and set
-func (c *cloud) HasClusterID() bool {
+func (c *Cloud) HasClusterID() bool {
 	return true
 }
 
-func getInfraKubeConfig(infraKubeConfigPath string) (string, error) {
+func (c *Cloud) GetInfraKubeconfig() (string, error) {
+	return GetInfraKubeConfig(c.config.Kubeconfig)
+}
+
+func (c *Cloud) Namespace() string {
+	return c.namespace
+}
+
+func (c *Cloud) GetCloudConfig() CloudConfig {
+	return c.config
+}
+
+func GetInfraKubeConfig(infraKubeConfigPath string) (string, error) {
 	config, err := os.Open(infraKubeConfigPath)
 	if err != nil {
 		return "", fmt.Errorf("Couldn't open infra-kubeconfig: %v", err)
diff --git a/pkg/provider/cloud_test.go b/pkg/provider/cloud_test.go
index 53d88d33d..88b5e6540 100644
--- a/pkg/provider/cloud_test.go
+++ b/pkg/provider/cloud_test.go
@@ -22,7 +22,7 @@ var (
 	invalidKubeconf = "bla"
 )
 
-func makeCloudConfig(kubeconfig, namespace string, loadbalancerEnabled, instancesEnabled bool, zoneAndRegionEnabled bool, lbCreationPollInterval int, lbCreationPollTimeout int) CloudConfig {
+func makeCloudConfig(kubeconfig, namespace string, loadbalancerEnabled, instancesEnabled, zoneAndRegionEnabled bool, lbCreationPollInterval int, lbCreationPollTimeout int) CloudConfig {
 	return CloudConfig{
 		Kubeconfig: kubeconfig,
 		LoadBalancer: LoadBalancerConfig{
diff --git a/pkg/provider/loadbalancer.go b/pkg/provider/loadbalancer.go
index 56cc5587b..6048ef749 100644
--- a/pkg/provider/loadbalancer.go
+++ b/pkg/provider/loadbalancer.go
@@ -21,6 +21,11 @@ const (
 
 	// Default timeout between polling the service after creation
 	defaultLoadBalancerCreatePollTimeout = 5 * time.Minute
+
+	TenantServiceNameLabelKey      = "cluster.x-k8s.io/tenant-service-name"
+	TenantServiceNamespaceLabelKey = "cluster.x-k8s.io/tenant-service-namespace"
+	TenantClusterNameLabelKey      = "cluster.x-k8s.io/cluster-name"
+	TenantNodeRoleLabelKey         = "cluster.x-k8s.io/role"
 )
 
 type loadbalancer struct {
@@ -75,14 +80,14 @@ func (lb *loadbalancer) EnsureLoadBalancer(ctx context.Context, clusterName stri
 	}
 
 	vmiLabels := map[string]string{
-		"cluster.x-k8s.io/role":         "worker",
-		"cluster.x-k8s.io/cluster-name": clusterName,
+		TenantNodeRoleLabelKey:    "worker",
+		TenantClusterNameLabelKey: clusterName,
 	}
 
 	lbLabels := map[string]string{
-		"cluster.x-k8s.io/tenant-service-name":      service.Name,
-		"cluster.x-k8s.io/tenant-service-namespace": service.Namespace,
-		"cluster.x-k8s.io/cluster-name":             clusterName,
+		TenantServiceNameLabelKey:      service.Name,
+		TenantServiceNamespaceLabelKey: service.Namespace,
+		TenantClusterNameLabelKey:      clusterName,
 	}
 
 	for key, val := range lb.infraLabels {
@@ -202,7 +207,10 @@ func (lb *loadbalancer) createLoadBalancerService(ctx context.Context, lbName st
 			ExternalTrafficPolicy: service.Spec.ExternalTrafficPolicy,
 		},
 	}
-	if lb.config.Selectorless == nil || !*lb.config.Selectorless {
+	// Give controller privilege above selectorless
+	if lb.config.EnableEPSController != nil && *lb.config.EnableEPSController && service.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeCluster {
+		lbService.Spec.Selector = vmiLabels
+	} else if lb.config.Selectorless == nil || !*lb.config.Selectorless {
 		lbService.Spec.Selector = vmiLabels
 	}
 	if len(service.Spec.ExternalIPs) > 0 {
diff --git a/pkg/provider/loadbalancer_test.go b/pkg/provider/loadbalancer_test.go
index c7297b9f0..c51df803e 100644
--- a/pkg/provider/loadbalancer_test.go
+++ b/pkg/provider/loadbalancer_test.go
@@ -127,7 +127,7 @@
func cmpLoadBalancerStatuses(a, b *corev1.LoadBalancerStatus) bool {
 }
 
 func generateInfraService(tenantSvc *corev1.Service, ports []corev1.ServicePort) *corev1.Service {
-	return &corev1.Service{
+	svc := &corev1.Service{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      lbServiceName,
 			Namespace: lbServiceNamespace,
@@ -142,12 +142,15 @@ func generateInfraService(tenantSvc *corev1.Service, ports []corev1.ServicePort)
 			Type:                  corev1.ServiceTypeLoadBalancer,
 			Ports:                 ports,
 			ExternalTrafficPolicy: tenantSvc.Spec.ExternalTrafficPolicy,
-			Selector: map[string]string{
-				"cluster.x-k8s.io/role":         "worker",
-				"cluster.x-k8s.io/cluster-name": clusterName,
-			},
 		},
 	}
+	if tenantSvc.Spec.ExternalTrafficPolicy != corev1.ServiceExternalTrafficPolicyLocal {
+		svc.Spec.Selector = map[string]string{
+			TenantNodeRoleLabelKey:    "worker",
+			TenantClusterNameLabelKey: clusterName,
+		}
+	}
+	return svc
 }
 
 var _ = Describe("LoadBalancer", func() {
@@ -278,6 +281,56 @@ var _ = Describe("LoadBalancer", func() {
 
 	})
 
+	It("Should create a LoadBalancer Service without selectors when ExternalTrafficPolicy is Local and the EPS controller is enabled", func() {
+		checkSvcExistErr := notFoundErr
+		getCount := 3
+
+		tenantService.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyTypeLocal
+		lb.config.EnableEPSController = pointer.Bool(true)
+		lb.config.Selectorless = pointer.Bool(true)
+
+		c.EXPECT().
+			Get(ctx, client.ObjectKey{Name: "af6ebf1722bb111e9b210d663bd873d9", Namespace: "test"}, gomock.AssignableToTypeOf(&corev1.Service{})).
+			Return(checkSvcExistErr)
+
+		infraService1 := generateInfraService(
+			tenantService,
+			[]corev1.ServicePort{
+				{Name: "port1", Protocol: corev1.ProtocolTCP, Port: 80, TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 30001}},
+			},
+		)
+		infraService1.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyTypeLocal
+
+		c.EXPECT().Create(ctx, infraService1)
+
+		for i := 0; i < getCount; i++ {
+			infraService2 := infraService1.DeepCopy()
+			if i == getCount-1 {
+				infraService2.Status = corev1.ServiceStatus{
+					LoadBalancer: corev1.LoadBalancerStatus{
+						Ingress: []corev1.LoadBalancerIngress{
+							{
+								IP: loadBalancerIP,
+							},
+						},
+					},
+				}
+			}
+			c.EXPECT().Get(
+				ctx,
+				client.ObjectKey{Name: "af6ebf1722bb111e9b210d663bd873d9", Namespace: "test"},
+				gomock.AssignableToTypeOf(&corev1.Service{}),
+			).Do(func(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) {
+				infraService2.DeepCopyInto(obj.(*corev1.Service))
+			})
+		}
+
+		lbStatus, err := lb.EnsureLoadBalancer(ctx, clusterName, tenantService, nodes)
+		Expect(err).To(BeNil())
+		Expect(len(lbStatus.Ingress)).Should(Equal(1))
+		Expect(lbStatus.Ingress[0].IP).Should(Equal(loadBalancerIP))
+	})
+
 	It("Should create new Service and poll LoadBalancer service 1 time", func() {
 		checkSvcExistErr := notFoundErr
 		getCount := 1
From ec7e45422b0fdbb467af61b9868e5470915e6ae0 Mon Sep 17 00:00:00 2001
From: Andrei Kvapil
Date: Thu, 26 Sep 2024 16:12:16 +0200
Subject: [PATCH 2/3] Introduce endpointslice controller

Signed-off-by: Andrei Kvapil

---
 .../kubevirteps.go                            | 53 +++++++++++--------
 cmd/kubevirt-cloud-controller-manager/main.go |  3 +-
 .../kubevirteps/kubevirteps_controller.go     | 13 ++---
 .../kubevirteps_controller_test.go            | 10 ++++
 pkg/provider/loadbalancer.go                  |  8 +--
 5 files changed, 55 insertions(+), 32 deletions(-)

diff --git a/cmd/kubevirt-cloud-controller-manager/kubevirteps.go b/cmd/kubevirt-cloud-controller-manager/kubevirteps.go
index e6b41fbb0..f8a5999b9 100644
--- a/cmd/kubevirt-cloud-controller-manager/kubevirteps.go
+++ 
b/cmd/kubevirt-cloud-controller-manager/kubevirteps.go @@ -3,8 +3,10 @@ package main import ( "context" "fmt" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" cloudprovider "k8s.io/cloud-provider" "k8s.io/cloud-provider/app" @@ -37,7 +39,7 @@ func startKubevirtCloudController( return nil, false, err } - if !*kubevirtCloud.GetCloudConfig().LoadBalancer.EnableEPSController { + if kubevirtCloud.GetCloudConfig().LoadBalancer.EnableEPSController == nil || !*kubevirtCloud.GetCloudConfig().LoadBalancer.EnableEPSController { klog.Infof(fmt.Sprintf("%s is not enabled.", kubevirteps.ControllerName)) return nil, false, nil } @@ -54,37 +56,46 @@ func startKubevirtCloudController( klog.Infof("Setting up infra client.") // This is the kubeconfig for the infra cluster - infraKubeConfig, err := kubevirtCloud.GetInfraKubeconfig() - if err != nil { - klog.Errorf("Failed to get infra kubeconfig: %v", err) - return nil, false, err - } - - var infraClientConfig clientcmd.ClientConfig - infraClientConfig, err = clientcmd.NewClientConfigFromBytes([]byte(infraKubeConfig)) - if err != nil { - klog.Errorf("Failed to create client config for infra cluster: %v", err) - return nil, false, err - } - - infraConfig, err := infraClientConfig.ClientConfig() - if err != nil { - klog.Errorf("Failed to create client config for infra cluster: %v", err) - return nil, false, err + var restConfig *rest.Config + + if kubevirtCloud.GetCloudConfig().Kubeconfig == "" { + restConfig, err = rest.InClusterConfig() + if err != nil { + klog.Errorf("Failed to get in-cluster config: %v", err) + return nil, false, err + } + } else { + var infraKubeConfig string + infraKubeConfig, err = kubevirtCloud.GetInfraKubeconfig() + if err != nil { + klog.Errorf("Failed to get infra kubeconfig: %v", err) + return nil, false, err + } + var clientConfig clientcmd.ClientConfig + clientConfig, err = clientcmd.NewClientConfigFromBytes([]byte(infraKubeConfig)) + if err != nil { + klog.Errorf("Failed to create client config from infra kubeconfig: %v", err) + return nil, false, err + } + restConfig, err = clientConfig.ClientConfig() + if err != nil { + klog.Errorf("Failed to create rest config for infra cluster: %v", err) + return nil, false, err + } } var infraClient kubernetes.Interface // create new client for the infra cluster - infraClient, err = kubernetes.NewForConfig(infraConfig) + infraClient, err = kubernetes.NewForConfig(restConfig) if err != nil { - klog.Errorf("Failed to create client for infra cluster: %v", err) + klog.Errorf("Failed to create infra cluster client: %v", err) return nil, false, err } var infraDynamic dynamic.Interface - infraDynamic, err = dynamic.NewForConfig(infraConfig) + infraDynamic, err = dynamic.NewForConfig(restConfig) if err != nil { klog.Errorf("Failed to create dynamic client for infra cluster: %v", err) return nil, false, err diff --git a/cmd/kubevirt-cloud-controller-manager/main.go b/cmd/kubevirt-cloud-controller-manager/main.go index 564b52007..3f91c4cd0 100644 --- a/cmd/kubevirt-cloud-controller-manager/main.go +++ b/cmd/kubevirt-cloud-controller-manager/main.go @@ -45,14 +45,13 @@ func main() { fss := cliflag.NamedFlagSets{} controllerInitializers := app.DefaultInitFuncConstructors - controllerAliases := map[string]string{"kubevirt-eps": kubevirteps.ControllerName.String()} // add kubevirt-cloud-controller to the list of controllers controllerInitializers[kubevirteps.ControllerName.String()] = app.ControllerInitFuncConstructor{ Constructor: 
StartKubevirtCloudControllerWrapper, } - command := app.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, controllerInitializers, controllerAliases, fss, wait.NeverStop) + command := app.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, controllerInitializers, fss, wait.NeverStop) code := cli.Run(command) os.Exit(code) } diff --git a/pkg/controller/kubevirteps/kubevirteps_controller.go b/pkg/controller/kubevirteps/kubevirteps_controller.go index 9366d7c26..4dabf238b 100644 --- a/pkg/controller/kubevirteps/kubevirteps_controller.go +++ b/pkg/controller/kubevirteps/kubevirteps_controller.go @@ -4,6 +4,9 @@ import ( "context" "errors" "fmt" + "strings" + "time" + v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" @@ -25,8 +28,6 @@ import ( "k8s.io/klog/v2" kubevirtv1 "kubevirt.io/api/core/v1" kubevirt "kubevirt.io/cloud-provider-kubevirt/pkg/provider" - "strings" - "time" ) const ( @@ -312,7 +313,7 @@ func (c *Controller) getInfraEPSFromInfraService(ctx context.Context, svc *v1.Se func (c *Controller) reconcile(ctx context.Context, r *Request) error { service, ok := r.Obj.(*v1.Service) - if !ok { + if !ok || service == nil { return errors.New("could not cast object to service") } @@ -350,8 +351,8 @@ func (c *Controller) reconcile(ctx context.Context, r *Request) error { // If the services switched to a different address type, we need to delete the old ones, because it's immutable. // If the services switched to a different externalTrafficPolicy, we need to delete the old ones. for _, eps := range infraExistingEpSlices { - if service.Spec.ExternalTrafficPolicy != v1.ServiceExternalTrafficPolicyTypeLocal || serviceDeleted { - klog.Infof("Added for deletion EndpointSlice %s in namespace %s because it has an externalTrafficPolicy different from Local", eps.Name, eps.Namespace) + if service.Spec.Selector != nil || serviceDeleted { + klog.Infof("Added for deletion EndpointSlice %s in namespace %s because it has a selector", eps.Name, eps.Namespace) // to be sure we don't delete any slice that is not managed by us if c.managedByController(eps) { slicesToDelete = append(slicesToDelete, eps) @@ -595,7 +596,7 @@ func (c *Controller) newSlice(service *v1.Service, desiredPorts []discovery.Endp func (c *Controller) getDesiredEndpoints(service *v1.Service, tenantSlices []*discovery.EndpointSlice) []*discovery.Endpoint { var desiredEndpoints []*discovery.Endpoint - if service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal { + if service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal || service.Spec.Selector == nil { // Extract the desired endpoints from the tenant EndpointSlices // for extracting the nodes it does not matter what type of address we are dealing with // all nodes with an endpoint for a corresponding slice will be selected. diff --git a/pkg/controller/kubevirteps/kubevirteps_controller_test.go b/pkg/controller/kubevirteps/kubevirteps_controller_test.go index 7e645a0bc..23b38d369 100644 --- a/pkg/controller/kubevirteps/kubevirteps_controller_test.go +++ b/pkg/controller/kubevirteps/kubevirteps_controller_test.go @@ -2,6 +2,7 @@ package kubevirteps import ( "context" + g "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" v1 "k8s.io/api/core/v1" @@ -34,6 +35,14 @@ type testKubevirtEPSController struct { } func createInfraServiceLB(name, tenantServiceName, clusterName string, servicePort v1.ServicePort, externalTrafficPolicy v1.ServiceExternalTrafficPolicy) *v1.Service { + var selector map[string]string + if externalTrafficPolicy == v1.ServiceExternalTrafficPolicyCluster { + selector = map[string]string{ + "cluster.x-k8s.io/role": "worker", + "cluster.x-k8s.io/cluster-name": clusterName, + } + } + return &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -50,6 +59,7 @@ func createInfraServiceLB(name, tenantServiceName, clusterName string, servicePo }, Type: v1.ServiceTypeLoadBalancer, ExternalTrafficPolicy: externalTrafficPolicy, + Selector: selector, IPFamilies: []v1.IPFamily{ v1.IPv4Protocol, }, diff --git a/pkg/provider/loadbalancer.go b/pkg/provider/loadbalancer.go index 6048ef749..4ae521a4a 100644 --- a/pkg/provider/loadbalancer.go +++ b/pkg/provider/loadbalancer.go @@ -208,9 +208,11 @@ func (lb *loadbalancer) createLoadBalancerService(ctx context.Context, lbName st }, } // Give controller privilege above selectorless - if lb.config.EnableEPSController != nil && *lb.config.EnableEPSController && service.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeCluster { - lbService.Spec.Selector = vmiLabels - } else if lb.config.Selectorless == nil || !*lb.config.Selectorless { + if lb.config.EnableEPSController != nil && *lb.config.EnableEPSController && service.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeLocal { + lbService.Spec.Selector = nil + } else if lb.config.Selectorless != nil && *lb.config.Selectorless { + lbService.Spec.Selector = nil + } else { lbService.Spec.Selector = vmiLabels } if len(service.Spec.ExternalIPs) > 0 { From d550079c4e6c0f508855215edf10e9b8670c461d Mon Sep 17 00:00:00 2001 From: Andrei Kvapil Date: Mon, 7 Oct 2024 16:21:10 +0200 Subject: [PATCH 3/3] Fix lint Signed-off-by: Andrei Kvapil --- cmd/kubevirt-cloud-controller-manager/kubevirteps.go | 4 +--- cmd/kubevirt-cloud-controller-manager/main.go | 4 +++- pkg/controller/kubevirteps/kubevirteps_controller.go | 9 ++++++--- .../kubevirteps/kubevirteps_controller_test.go | 11 ++++++----- .../kubevirteps/kubevirteps_controller_utils.go | 3 ++- 5 files changed, 18 insertions(+), 13 deletions(-) diff --git a/cmd/kubevirt-cloud-controller-manager/kubevirteps.go b/cmd/kubevirt-cloud-controller-manager/kubevirteps.go index f8a5999b9..74166b5d9 100644 --- a/cmd/kubevirt-cloud-controller-manager/kubevirteps.go +++ b/cmd/kubevirt-cloud-controller-manager/kubevirteps.go @@ -20,13 +20,11 @@ import ( func StartKubevirtCloudControllerWrapper(initContext app.ControllerInitContext, completedConfig *config.CompletedConfig, cloud cloudprovider.Interface) app.InitFunc { return func(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext) (controller.Interface, bool, error) { - return startKubevirtCloudController(ctx, initContext, controllerContext, completedConfig, cloud) + return startKubevirtCloudController(controllerContext, completedConfig, cloud) } } func startKubevirtCloudController( - ctx context.Context, - initContext app.ControllerInitContext, controllerContext genericcontrollermanager.ControllerContext, ccmConfig *config.CompletedConfig, cloud cloudprovider.Interface) (controller.Interface, bool, error) { diff --git a/cmd/kubevirt-cloud-controller-manager/main.go b/cmd/kubevirt-cloud-controller-manager/main.go index 3f91c4cd0..22efe0f4b 
100644 --- a/cmd/kubevirt-cloud-controller-manager/main.go +++ b/cmd/kubevirt-cloud-controller-manager/main.go @@ -35,6 +35,8 @@ import ( _ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugins _ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration "k8s.io/klog/v2" + + _ "kubevirt.io/cloud-provider-kubevirt/pkg/provider" ) func main() { @@ -51,7 +53,7 @@ func main() { Constructor: StartKubevirtCloudControllerWrapper, } - command := app.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, controllerInitializers, fss, wait.NeverStop) + command := app.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, controllerInitializers, map[string]string{}, fss, wait.NeverStop) code := cli.Run(command) os.Exit(code) } diff --git a/pkg/controller/kubevirteps/kubevirteps_controller.go b/pkg/controller/kubevirteps/kubevirteps_controller.go index 4dabf238b..a3c1aa338 100644 --- a/pkg/controller/kubevirteps/kubevirteps_controller.go +++ b/pkg/controller/kubevirteps/kubevirteps_controller.go @@ -150,7 +150,8 @@ func (c *Controller) Init() error { eps := obj.(*discovery.EndpointSlice) if c.tenantEPSTracker.contains(eps) { klog.Infof("get Infra Service for Tenant EndpointSlice: %v/%v", eps.Namespace, eps.Name) - infraSvc, err := c.getInfraServiceFromTenantEPS(context.TODO(), eps) + var infraSvc *v1.Service + infraSvc, err = c.getInfraServiceFromTenantEPS(context.TODO(), eps) if err != nil { klog.Errorf("Failed to get Service in Infra cluster for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, err) return @@ -163,7 +164,8 @@ func (c *Controller) Init() error { eps := newObj.(*discovery.EndpointSlice) if c.tenantEPSTracker.contains(eps) { klog.Infof("get Infra Service for Tenant EndpointSlice: %v/%v", eps.Namespace, eps.Name) - infraSvc, err := c.getInfraServiceFromTenantEPS(context.TODO(), eps) + var infraSvc *v1.Service + infraSvc, err = c.getInfraServiceFromTenantEPS(context.TODO(), eps) if err != nil { klog.Errorf("Failed to get Service in Infra cluster for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, err) return @@ -177,7 +179,8 @@ func (c *Controller) Init() error { if c.tenantEPSTracker.contains(eps) { c.tenantEPSTracker.remove(eps) klog.Infof("get Infra Service for Tenant EndpointSlice: %v/%v", eps.Namespace, eps.Name) - infraSvc, err := c.getInfraServiceFromTenantEPS(context.TODO(), eps) + var infraSvc *v1.Service + infraSvc, err = c.getInfraServiceFromTenantEPS(context.TODO(), eps) if err != nil { klog.Errorf("Failed to get Service in Infra cluster for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, err) return diff --git a/pkg/controller/kubevirteps/kubevirteps_controller_test.go b/pkg/controller/kubevirteps/kubevirteps_controller_test.go index 23b38d369..99664b903 100644 --- a/pkg/controller/kubevirteps/kubevirteps_controller_test.go +++ b/pkg/controller/kubevirteps/kubevirteps_controller_test.go @@ -1,3 +1,4 @@ +//nolint:unparam package kubevirteps import ( @@ -172,7 +173,7 @@ func createAndAssertInfraServiceLB(name, tenantServiceName, clusterName string, }).Should(BeTrue(), "") } -func setupTestKubevirtEPSController(ctx context.Context) *testKubevirtEPSController { +func setupTestKubevirtEPSController() *testKubevirtEPSController { var tenantClient *fake.Clientset var infraClient *fake.Clientset @@ -181,7 +182,7 @@ func setupTestKubevirtEPSController(ctx context.Context) *testKubevirtEPSControl s := runtime.NewScheme() infraDynamic := dfake.NewSimpleDynamicClientWithCustomListKinds(s, 
map[schema.GroupVersionResource]string{ - schema.GroupVersionResource{ + { Group: kubevirtv1.GroupVersion.Group, Version: kubevirtv1.GroupVersion.Version, Resource: "virtualmachineinstances", @@ -215,7 +216,7 @@ var _ = g.Describe("KubevirtEPSController start", g.Ordered, func() { g.It("Should start the controller", func() { ctx, stop := context.WithCancel(context.Background()) defer stop() - testVals := setupTestKubevirtEPSController(ctx) + testVals = setupTestKubevirtEPSController() testVals.runKubevirtEPSController(ctx) }) }) @@ -233,7 +234,7 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() { g.It("Should start the controller", func() { ctx, stop = context.WithCancel(context.Background()) defer stop() - testVals := setupTestKubevirtEPSController(ctx) + testVals = setupTestKubevirtEPSController() testVals.runKubevirtEPSController(ctx) cache.WaitForCacheSync(ctx.Done(), @@ -246,7 +247,7 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() { // Startup and wait for cache sync g.BeforeEach(func() { ctx, stop = context.WithCancel(context.Background()) - testVals = setupTestKubevirtEPSController(ctx) + testVals = setupTestKubevirtEPSController() testVals.runKubevirtEPSController(ctx) cache.WaitForCacheSync(ctx.Done(), diff --git a/pkg/controller/kubevirteps/kubevirteps_controller_utils.go b/pkg/controller/kubevirteps/kubevirteps_controller_utils.go index 0d3dbfd5e..385128521 100644 --- a/pkg/controller/kubevirteps/kubevirteps_controller_utils.go +++ b/pkg/controller/kubevirteps/kubevirteps_controller_utils.go @@ -1,13 +1,14 @@ package kubevirteps import ( + "sync" + v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" utilnet "k8s.io/utils/net" - "sync" ) // source: https://github.com/kubernetes/endpointslice/blob/master/utils.go#L280