From efbfebf384417df71e46030cd549249b19021971 Mon Sep 17 00:00:00 2001 From: Yanjun Zhou Date: Fri, 27 Dec 2024 15:22:40 +0800 Subject: [PATCH] Refactor Subnet lock for SubnetPort creation (#974) Instead of using lock for Subnet, this PR adds a cache in SubnetPort service to record the number of SubnetPorts on each Subnet. Read/Write operation for this cache is protected by lock to avoid race condition and ensure the number of SubnetPort on the Subnet will be controlled below the Subnet capacity. Testing done: - Create 16 Pods on default SubnetSet with size 16 (capacity 12), got all Pods ready; delete all Pods and observe the Subnets under the SubnetSet will be deleted by GC. - Create 16 VMs on DHCPServer SubnetSet with size 16 (capacity 12), all Vms can get IP; delete all VMs and observe the Subnet under the SubnetSet will be deleted by GC. - Create 16 VMs on DHCPServer Subnet with size 16 (capacity 12), only 12 VMs can PowerOn and get IP. The remainings will have `VirtualMachineNetworkReady` as `NotReady` and Message like `network interface "eth0" error: subnetPort is not ready: SubnetPortNotReady - error occurred while processing the SubnetPort CR. Error: no valid IP in Subnet /orgs/default/projects/project-quality/vpcs/ns-1_7fcad11c-da38-4137-96ec-f0f80ae8d96b/subnets/subnet-test-1_60ebfa36-bbe2-4e6e-85df-8228be4e190a` Signed-off-by: Yanjun Zhou --- pkg/controllers/common/utils.go | 26 +++--- pkg/controllers/common/utils_test.go | 2 +- pkg/controllers/pod/pod_controller.go | 20 ++-- pkg/controllers/pod/pod_controller_test.go | 26 +++--- pkg/controllers/subnet/subnet_controller.go | 1 + .../subnet/subnet_controller_test.go | 28 ++++-- .../subnetport/subnetport_controller.go | 19 ++-- .../subnetport/subnetport_controller_test.go | 29 ++++-- .../subnetset/subnetset_controller.go | 42 ++++----- .../subnetset/subnetset_controller_test.go | 11 +-- pkg/mock/services_mock.go | 24 +++-- pkg/nsx/services/common/services.go | 11 +-- pkg/nsx/services/subnet/store.go | 38 -------- pkg/nsx/services/subnet/subnet.go | 66 ++++---------- pkg/nsx/services/subnet/subnet_test.go | 14 +-- pkg/nsx/services/subnetport/store.go | 12 +++ pkg/nsx/services/subnetport/subnetport.go | 91 ++++++++++++++++--- .../services/subnetport/subnetport_test.go | 18 ++++ 18 files changed, 261 insertions(+), 217 deletions(-) diff --git a/pkg/controllers/common/utils.go b/pkg/controllers/common/utils.go index f9a1a93a0..d7ccb93f9 100644 --- a/pkg/controllers/common/utils.go +++ b/pkg/controllers/common/utils.go @@ -19,7 +19,6 @@ import ( "github.com/vmware-tanzu/nsx-operator/pkg/logger" "github.com/vmware-tanzu/nsx-operator/pkg/metrics" servicecommon "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common" - "github.com/vmware-tanzu/nsx-operator/pkg/util" ) var ( @@ -29,19 +28,11 @@ var ( func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servicecommon.VPCServiceProvider, subnetService servicecommon.SubnetServiceProvider, subnetPortService servicecommon.SubnetPortServiceProvider) (string, error) { // Use SubnetSet uuid lock to make sure when multiple ports are created on the same SubnetSet, only one Subnet will be created - subnetSetLock := lockSubnetSet(subnetSet.GetUID()) - defer unlockSubnetSet(subnetSet.GetUID(), subnetSetLock) + subnetSetLock := LockSubnetSet(subnetSet.GetUID()) + defer UnlockSubnetSet(subnetSet.GetUID(), subnetSetLock) subnetList := subnetService.GetSubnetsByIndex(servicecommon.TagScopeSubnetSetCRUID, string(subnetSet.GetUID())) for _, nsxSubnet := range subnetList { - portNums := len(subnetPortService.GetPortsOfSubnet(*nsxSubnet.Id)) - totalIP := int(*nsxSubnet.Ipv4SubnetSize) - if len(nsxSubnet.IpAddresses) > 0 { - // totalIP will be overrided if IpAddresses are specified. - totalIP, _ = util.CalculateIPFromCIDRs(nsxSubnet.IpAddresses) - } - // NSX reserves 4 ip addresses in each subnet for network address, gateway address, - // dhcp server address and broadcast address. - if portNums < totalIP-4 { + if subnetPortService.AllocatePortFromSubnet(nsxSubnet) { return *nsxSubnet.Path, nil } } @@ -56,7 +47,12 @@ func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servi log.Error(err, "Failed to allocate Subnet") return "", err } - return subnetService.CreateOrUpdateSubnet(subnetSet, vpcInfoList[0], tags) + nsxSubnet, err := subnetService.CreateOrUpdateSubnet(subnetSet, vpcInfoList[0], tags) + if err != nil { + return "", err + } + subnetPortService.AllocatePortFromSubnet(nsxSubnet) + return *nsxSubnet.Path, nil } func getSharedNamespaceForNamespace(client k8sclient.Client, ctx context.Context, namespaceName string) (string, error) { @@ -241,7 +237,7 @@ func NewStatusUpdater(client k8sclient.Client, nsxConfig *config.NSXOperatorConf } } -func lockSubnetSet(uuid types.UID) *sync.Mutex { +func LockSubnetSet(uuid types.UID) *sync.Mutex { lock := sync.Mutex{} subnetSetLock, _ := SubnetSetLocks.LoadOrStore(uuid, &lock) log.V(1).Info("Lock SubnetSet", "uuid", uuid) @@ -249,7 +245,7 @@ func lockSubnetSet(uuid types.UID) *sync.Mutex { return subnetSetLock.(*sync.Mutex) } -func unlockSubnetSet(uuid types.UID, subnetSetLock *sync.Mutex) { +func UnlockSubnetSet(uuid types.UID, subnetSetLock *sync.Mutex) { if subnetSetLock != nil { log.V(1).Info("Unlock SubnetSet", "uuid", uuid) subnetSetLock.Unlock() diff --git a/pkg/controllers/common/utils_test.go b/pkg/controllers/common/utils_test.go index e4765eefd..195694f46 100644 --- a/pkg/controllers/common/utils_test.go +++ b/pkg/controllers/common/utils_test.go @@ -120,7 +120,7 @@ func TestAllocateSubnetFromSubnetSet(t *testing.T) { Return([]*model.VpcSubnet{}) ssp.(*pkg_mock.MockSubnetServiceProvider).On("GenerateSubnetNSTags", mock.Anything) vsp.(*pkg_mock.MockVPCServiceProvider).On("ListVPCInfo", mock.Anything).Return([]servicecommon.VPCResourceInfo{{}}) - ssp.(*pkg_mock.MockSubnetServiceProvider).On("CreateOrUpdateSubnet", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(expectedSubnetPath, nil) + ssp.(*pkg_mock.MockSubnetServiceProvider).On("CreateOrUpdateSubnet", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&model.VpcSubnet{Path: &expectedSubnetPath}, nil) }, expectedResult: expectedSubnetPath, }, diff --git a/pkg/controllers/pod/pod_controller.go b/pkg/controllers/pod/pod_controller.go index 3ce5035f8..feb301372 100644 --- a/pkg/controllers/pod/pod_controller.go +++ b/pkg/controllers/pod/pod_controller.go @@ -74,11 +74,14 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R if !podIsDeleted(pod) { r.StatusUpdater.IncreaseUpdateTotal() - nsxSubnetPath, err := r.GetSubnetPathForPod(ctx, pod) + isExisting, nsxSubnetPath, err := r.GetSubnetPathForPod(ctx, pod) if err != nil { log.Error(err, "failed to get NSX resource path from subnet", "pod.Name", pod.Name, "pod.UID", pod.UID) return common.ResultRequeue, err } + if !isExisting { + defer r.SubnetPortService.ReleasePortInSubnet(nsxSubnetPath) + } log.Info("got NSX subnet for pod", "NSX subnet path", nsxSubnetPath, "pod.Name", pod.Name, "pod.UID", pod.UID) node, err := r.GetNodeByName(pod.Spec.NodeName) if err != nil { @@ -87,11 +90,6 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R return common.ResultRequeue, err } contextID := *node.UniqueId - // There is a race condition that the subnetset controller may delete the - // subnet during CollectGarbage. So check the subnet under lock. - lock := r.SubnetService.RLockSubnet(&nsxSubnetPath) - defer r.SubnetService.RUnlockSubnet(&nsxSubnetPath, lock) - nsxSubnet, err := r.SubnetService.GetSubnetByPath(nsxSubnetPath) if err != nil { return common.ResultRequeue, err @@ -200,24 +198,24 @@ func (r *PodReconciler) CollectGarbage(ctx context.Context) { } } -func (r *PodReconciler) GetSubnetPathForPod(ctx context.Context, pod *v1.Pod) (string, error) { +func (r *PodReconciler) GetSubnetPathForPod(ctx context.Context, pod *v1.Pod) (bool, string, error) { subnetPortIDForPod := r.SubnetPortService.BuildSubnetPortId(&pod.ObjectMeta) subnetPath := r.SubnetPortService.GetSubnetPathForSubnetPortFromStore(subnetPortIDForPod) if len(subnetPath) > 0 { log.V(1).Info("NSX subnet port had been created, returning the existing NSX subnet path", "pod.UID", pod.UID, "subnetPath", subnetPath) - return subnetPath, nil + return true, subnetPath, nil } subnetSet, err := common.GetDefaultSubnetSet(r.SubnetPortService.Client, ctx, pod.Namespace, servicecommon.LabelDefaultPodSubnetSet) if err != nil { - return "", err + return false, "", err } log.Info("got default subnetset for pod, allocating the NSX subnet", "subnetSet.Name", subnetSet.Name, "subnetSet.UID", subnetSet.UID, "pod.Name", pod.Name, "pod.UID", pod.UID) subnetPath, err = common.AllocateSubnetFromSubnetSet(subnetSet, r.VPCService, r.SubnetService, r.SubnetPortService) if err != nil { - return subnetPath, err + return false, subnetPath, err } log.Info("allocated NSX subnet for pod", "nsxSubnetPath", subnetPath, "pod.Name", pod.Name, "pod.UID", pod.UID) - return subnetPath, nil + return false, subnetPath, nil } func podIsDeleted(pod *v1.Pod) bool { diff --git a/pkg/controllers/pod/pod_controller_test.go b/pkg/controllers/pod/pod_controller_test.go index 731b0dfe4..a885e312f 100644 --- a/pkg/controllers/pod/pod_controller_test.go +++ b/pkg/controllers/pod/pod_controller_test.go @@ -52,6 +52,7 @@ func TestPodReconciler_Reconcile(t *testing.T) { }, }, }, + SubnetPortStore: &subnetport.SubnetPortStore{}, }, SubnetService: &subnet.SubnetService{ SubnetStore: &subnet.SubnetStore{}, @@ -99,8 +100,8 @@ func TestPodReconciler_Reconcile(t *testing.T) { return nil }) patchesGetSubnetPathForPod := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod, - func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) { - return "", errors.New("failed to get subnet path") + func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) { + return false, "", errors.New("failed to get subnet path") }) return patchesGetSubnetPathForPod }, @@ -124,8 +125,8 @@ func TestPodReconciler_Reconcile(t *testing.T) { return nil }) patches := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod, - func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) { - return "subnet-path-1", nil + func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) { + return false, "subnet-path-1", nil }) patches.ApplyFunc((*PodReconciler).GetNodeByName, func(r *PodReconciler, nodeName string) (*model.HostTransportNode, error) { @@ -145,8 +146,8 @@ func TestPodReconciler_Reconcile(t *testing.T) { return nil }) patches := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod, - func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) { - return "subnet-path-1", nil + func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) { + return false, "subnet-path-1", nil }) patches.ApplyFunc((*PodReconciler).GetNodeByName, func(r *PodReconciler, nodeName string) (*model.HostTransportNode, error) { @@ -170,8 +171,8 @@ func TestPodReconciler_Reconcile(t *testing.T) { return nil }) patches := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod, - func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) { - return "subnet-path-1", nil + func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) { + return false, "subnet-path-1", nil }) patches.ApplyFunc((*PodReconciler).GetNodeByName, func(r *PodReconciler, nodeName string) (*model.HostTransportNode, error) { @@ -199,8 +200,8 @@ func TestPodReconciler_Reconcile(t *testing.T) { return nil }) patches := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod, - func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) { - return "subnet-path-1", nil + func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) { + return false, "subnet-path-1", nil }) patches.ApplyFunc((*PodReconciler).GetNodeByName, func(r *PodReconciler, nodeName string) (*model.HostTransportNode, error) { @@ -386,6 +387,7 @@ func TestSubnetPortReconciler_GetSubnetPathForPod(t *testing.T) { prepareFunc func(*testing.T, *PodReconciler) *gomonkey.Patches expectedErr string expectedSubnetPath string + expectedIsExisting bool }{ { name: "SubnetExisted", @@ -397,6 +399,7 @@ func TestSubnetPortReconciler_GetSubnetPathForPod(t *testing.T) { return patches }, expectedSubnetPath: subnetPath, + expectedIsExisting: true, }, { name: "NoGetDefaultSubnetSet", @@ -466,7 +469,7 @@ func TestSubnetPortReconciler_GetSubnetPathForPod(t *testing.T) { t.Run(tt.name, func(t *testing.T) { patches := tt.prepareFunc(t, r) defer patches.Reset() - path, err := r.GetSubnetPathForPod(context.TODO(), &v1.Pod{ + isExisting, path, err := r.GetSubnetPathForPod(context.TODO(), &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "pod-1", Namespace: "ns-1", @@ -477,6 +480,7 @@ func TestSubnetPortReconciler_GetSubnetPathForPod(t *testing.T) { } else { assert.Nil(t, err) assert.Equal(t, subnetPath, path) + assert.Equal(t, tt.expectedIsExisting, isExisting) } }) } diff --git a/pkg/controllers/subnet/subnet_controller.go b/pkg/controllers/subnet/subnet_controller.go index b4fe1b0fa..5d21896b9 100644 --- a/pkg/controllers/subnet/subnet_controller.go +++ b/pkg/controllers/subnet/subnet_controller.go @@ -194,6 +194,7 @@ func (r *SubnetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet) error { log.Error(err, "Failed to delete Subnet", "ID", *nsxSubnet.Id) return err } + r.SubnetPortService.DeletePortCount(*nsxSubnet.Path) log.Info("Successfully deleted Subnet", "ID", *nsxSubnet.Id) } log.Info("Successfully cleaned Subnets", "subnetCount", len(nsxSubnets)) diff --git a/pkg/controllers/subnet/subnet_controller_test.go b/pkg/controllers/subnet/subnet_controller_test.go index 186ec7b0e..632b6d8f4 100644 --- a/pkg/controllers/subnet/subnet_controller_test.go +++ b/pkg/controllers/subnet/subnet_controller_test.go @@ -53,9 +53,9 @@ func TestSubnetReconciler_GarbageCollector(t *testing.T) { tags2 := []model.Tag{{Scope: common.String(common.TagScopeSubnetCRUID), Tag: common.String("fake-id2")}} var nsxSubnets []*model.VpcSubnet id1 := "fake-id1" - nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id1, Tags: tags1}) + nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id1, Tags: tags1, Path: common.String("fake-path")}) id2 := "fake-id2" - nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id2, Tags: tags2}) + nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id2, Tags: tags2, Path: common.String("fake-path")}) return nsxSubnets }) patch.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "GetPortsOfSubnet", func(_ *subnetport.SubnetPortService, _ string) (ports []*model.VpcSubnetPort) { @@ -64,6 +64,9 @@ func TestSubnetReconciler_GarbageCollector(t *testing.T) { patch.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error { return nil }) + patch.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) { + return + }) return patch }, }, @@ -272,6 +275,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) { patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error { return nil }) + patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) { + return + }) return patches }, expectRes: ResultNormal, @@ -343,6 +349,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) { patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error { return nil }) + patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) { + return + }) return patches }, expectRes: ResultRequeue, @@ -418,6 +427,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) { patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error { return nil }) + patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) { + return + }) return patches }, expectRes: ResultNormal, @@ -488,8 +500,8 @@ func TestSubnetReconciler_Reconcile(t *testing.T) { {OrgID: "org-id", ProjectID: "project-id", VPCID: "vpc-id", ID: "fake-id"}, } }) - patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (string, error) { - return "", errors.New("create or update failed") + patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (*model.VpcSubnet, error) { + return nil, errors.New("create or update failed") }) return patches }, @@ -521,8 +533,8 @@ func TestSubnetReconciler_Reconcile(t *testing.T) { } }) - patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (string, error) { - return "", nil + patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (*model.VpcSubnet, error) { + return nil, nil }) patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "GetSubnetByKey", func(_ *subnet.SubnetService, key string) (*model.VpcSubnet, error) { @@ -575,8 +587,8 @@ func TestSubnetReconciler_Reconcile(t *testing.T) { } }) - patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (string, error) { - return "", nil + patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (*model.VpcSubnet, error) { + return nil, nil }) patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "GetSubnetByKey", func(_ *subnet.SubnetService, key string) (*model.VpcSubnet, error) { diff --git a/pkg/controllers/subnetport/subnetport_controller.go b/pkg/controllers/subnetport/subnetport_controller.go index 80cac49be..e15998402 100644 --- a/pkg/controllers/subnetport/subnetport_controller.go +++ b/pkg/controllers/subnetport/subnetport_controller.go @@ -94,7 +94,7 @@ func (r *SubnetPortReconciler) Reconcile(ctx context.Context, req ctrl.Request) r.StatusUpdater.IncreaseUpdateTotal() old_status := subnetPort.Status.DeepCopy() - isParentResourceTerminating, nsxSubnetPath, err := r.CheckAndGetSubnetPathForSubnetPort(ctx, subnetPort) + isExisting, isParentResourceTerminating, nsxSubnetPath, err := r.CheckAndGetSubnetPathForSubnetPort(ctx, subnetPort) if isParentResourceTerminating { err = errors.New("parent resource is terminating, SubnetPort cannot be created") r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "", setSubnetPortReadyStatusFalse, r.SubnetPortService) @@ -104,15 +104,14 @@ func (r *SubnetPortReconciler) Reconcile(ctx context.Context, req ctrl.Request) r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "Failed to get NSX resource path from Subnet", setSubnetPortReadyStatusFalse, r.SubnetPortService) return common.ResultRequeue, err } + if !isExisting { + defer r.SubnetPortService.ReleasePortInSubnet(nsxSubnetPath) + } labels, err := r.getLabelsFromVirtualMachine(ctx, subnetPort) if err != nil { r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "Failed to get labels from VirtualMachine", setSubnetPortReadyStatusFalse, r.SubnetPortService) return common.ResultRequeue, err } - // There is a race condition that the subnetset controller may delete the - // subnet during CollectGarbage. So check the subnet under lock. - lock := r.SubnetService.RLockSubnet(&nsxSubnetPath) - defer r.SubnetService.RUnlockSubnet(&nsxSubnetPath, lock) nsxSubnet, err := r.SubnetService.GetSubnetByPath(nsxSubnetPath) if err != nil { @@ -426,7 +425,7 @@ func getExistingConditionOfType(conditionType v1alpha1.ConditionType, existingCo return nil } -func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Context, subnetPort *v1alpha1.SubnetPort) (isStale bool, subnetPath string, err error) { +func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Context, subnetPort *v1alpha1.SubnetPort) (existing bool, isStale bool, subnetPath string, err error) { subnetPortID := r.SubnetPortService.BuildSubnetPortId(&subnetPort.ObjectMeta) subnetPath = r.SubnetPortService.GetSubnetPathForSubnetPortFromStore(subnetPortID) if len(subnetPath) > 0 { @@ -442,6 +441,7 @@ func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Co } } else { log.V(1).Info("NSX subnet port had been created, returning the existing NSX subnet path", "subnetPort.UID", subnetPort.UID, "subnetPath", subnetPath) + existing = true return } } @@ -469,7 +469,12 @@ func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Co log.Error(err, "failed to get NSX subnet by subnet CR UID", "subnetList", subnetList) return } - subnetPath = *subnetList[0].Path + nsxSubnet := subnetList[0] + if !r.SubnetPortService.AllocatePortFromSubnet(nsxSubnet) { + err = fmt.Errorf("no valid IP in Subnet %s", *nsxSubnet.Path) + return + } + subnetPath = *nsxSubnet.Path } else if len(subnetPort.Spec.SubnetSet) > 0 { subnetSet := &v1alpha1.SubnetSet{} namespacedName := types.NamespacedName{ diff --git a/pkg/controllers/subnetport/subnetport_controller_test.go b/pkg/controllers/subnetport/subnetport_controller_test.go index ec69d98ea..3d37392cf 100644 --- a/pkg/controllers/subnetport/subnetport_controller_test.go +++ b/pkg/controllers/subnetport/subnetport_controller_test.go @@ -138,8 +138,8 @@ func TestSubnetPortReconciler_Reconcile(t *testing.T) { sp := &v1alpha1.SubnetPort{} err = errors.New("CheckAndGetSubnetPathForSubnetPort failed") patchesCheckAndGetSubnetPathForSubnetPort := gomonkey.ApplyFunc((*SubnetPortReconciler).CheckAndGetSubnetPathForSubnetPort, - func(r *SubnetPortReconciler, ctx context.Context, obj *v1alpha1.SubnetPort) (bool, string, error) { - return false, "", err + func(r *SubnetPortReconciler, ctx context.Context, obj *v1alpha1.SubnetPort) (bool, bool, string, error) { + return false, false, "", err }) defer patchesCheckAndGetSubnetPathForSubnetPort.Reset() patchesGetByKey := gomonkey.ApplyFunc((*subnetport.SubnetPortStore).GetByKey, @@ -160,8 +160,8 @@ func TestSubnetPortReconciler_Reconcile(t *testing.T) { // getLabelsFromVirtualMachine fails err = errors.New("getLabelsFromVirtualMachine failed") patchesCheckAndGetSubnetPathForSubnetPort = gomonkey.ApplyFunc((*SubnetPortReconciler).CheckAndGetSubnetPathForSubnetPort, - func(r *SubnetPortReconciler, ctx context.Context, obj *v1alpha1.SubnetPort) (bool, string, error) { - return false, "", nil + func(r *SubnetPortReconciler, ctx context.Context, obj *v1alpha1.SubnetPort) (bool, bool, string, error) { + return true, false, "", nil }) defer patchesCheckAndGetSubnetPathForSubnetPort.Reset() patchesGetLabelsFromVirtualMachine := gomonkey.ApplyFunc((*SubnetPortReconciler).getLabelsFromVirtualMachine, @@ -573,9 +573,11 @@ func TestSubnetPortReconciler_CheckAndGetSubnetPathForSubnetPort(t *testing.T) { k8sClient := mock_client.NewMockClient(mockCtl) defer mockCtl.Finish() r := &SubnetPortReconciler{ - Client: k8sClient, - SubnetPortService: &subnetport.SubnetPortService{}, - SubnetService: &subnet.SubnetService{}, + Client: k8sClient, + SubnetPortService: &subnetport.SubnetPortService{ + SubnetPortStore: &subnetport.SubnetPortStore{}, + }, + SubnetService: &subnet.SubnetService{}, } tests := []struct { @@ -584,6 +586,7 @@ func TestSubnetPortReconciler_CheckAndGetSubnetPathForSubnetPort(t *testing.T) { expectedIsStale bool expectedErr string expectedSubnetPath string + expectedIsExisting bool subnetport *v1alpha1.SubnetPort }{ { @@ -606,6 +609,7 @@ func TestSubnetPortReconciler_CheckAndGetSubnetPathForSubnetPort(t *testing.T) { Namespace: "ns-1", }, }, + expectedIsExisting: true, }, { name: "FailedToDeleteSubnetPort", @@ -725,9 +729,15 @@ func TestSubnetPortReconciler_CheckAndGetSubnetPathForSubnetPort(t *testing.T) { patches.ApplyFunc((*subnet.SubnetService).GetSubnetsByIndex, func(s *subnet.SubnetService, key string, value string) []*model.VpcSubnet { return []*model.VpcSubnet{{ - Path: servicecommon.String("subnet-path-1"), + Path: servicecommon.String("subnet-path-1"), + Ipv4SubnetSize: servicecommon.Int64(16), + Id: servicecommon.String("subnet-1"), }} }) + patches.ApplyFunc((*subnetport.SubnetPortService).AllocatePortFromSubnet, + func(s *subnetport.SubnetPortService, nsxSubnet *model.VpcSubnet) bool { + return true + }) return patches }, expectedSubnetPath: "subnet-path-1", @@ -878,8 +888,9 @@ func TestSubnetPortReconciler_CheckAndGetSubnetPathForSubnetPort(t *testing.T) { ctx := context.TODO() patches := tt.prepareFunc(t, r) defer patches.Reset() - isStale, subnetPath, err := r.CheckAndGetSubnetPathForSubnetPort(ctx, tt.subnetport) + isExisting, isStale, subnetPath, err := r.CheckAndGetSubnetPathForSubnetPort(ctx, tt.subnetport) assert.Equal(t, tt.expectedIsStale, isStale) + assert.Equal(t, tt.expectedIsExisting, isExisting) if tt.expectedErr != "" { assert.Contains(t, err.Error(), tt.expectedErr) } else { diff --git a/pkg/controllers/subnetset/subnetset_controller.go b/pkg/controllers/subnetset/subnetset_controller.go index 3ae5dec4a..2efb9d230 100644 --- a/pkg/controllers/subnetset/subnetset_controller.go +++ b/pkg/controllers/subnetset/subnetset_controller.go @@ -346,12 +346,14 @@ func (r *SubnetSetReconciler) deleteSubnetBySubnetSetName(ctx context.Context, s } func (r *SubnetSetReconciler) deleteSubnetForSubnetSet(subnetSet v1alpha1.SubnetSet, updateStatus, ignoreStaleSubnetPort bool) error { + subnetSetLock := common.LockSubnetSet(subnetSet.GetUID()) nsxSubnets := r.SubnetService.SubnetStore.GetByIndex(servicecommon.TagScopeSubnetSetCRUID, string(subnetSet.GetUID())) // If ignoreStaleSubnetPort is true, we will actively delete the existing SubnetConnectionBindingMaps connected to the // corresponding NSX Subnet. This happens in the GC case to scale-in the NSX Subnet if no SubnetPort exists. // For SubnetSet CR deletion event, we don't delete the existing SubnetConnectionBindingMaps but let the // SubnetConnectionBindingMap controller do it after the binding CR is removed. hasStaleSubnetPort, deleteErr := r.deleteSubnets(nsxSubnets, ignoreStaleSubnetPort) + common.UnlockSubnetSet(subnetSet.GetUID(), subnetSetLock) if updateStatus { if err := r.SubnetService.UpdateSubnetSetStatus(&subnetSet); err != nil { return err @@ -375,32 +377,30 @@ func (r *SubnetSetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet, delet } var deleteErrs []error for _, nsxSubnet := range nsxSubnets { - lock := r.SubnetService.LockSubnet(nsxSubnet.Path) - func() { - defer r.SubnetService.UnlockSubnet(nsxSubnet.Path, lock) - - portNums := len(r.SubnetPortService.GetPortsOfSubnet(*nsxSubnet.Id)) - if portNums > 0 { - hasStalePort = true - log.Info("Skipped deleting NSX Subnet due to stale ports", "nsxSubnet", *nsxSubnet.Id) - return - } - if deleteBindingMaps { - if err := r.BindingService.DeleteSubnetConnectionBindingMapsByParentSubnet(nsxSubnet); err != nil { - deleteErr := fmt.Errorf("failed to delete NSX SubnetConnectionBindingMaps connected to NSX Subnet/%s: %+v", *nsxSubnet.Id, err) - deleteErrs = append(deleteErrs, deleteErr) - log.Error(deleteErr, "Skipping to next Subnet") - return - } - } + if !r.SubnetPortService.IsEmptySubnet(*nsxSubnet.Id, *nsxSubnet.Path) { + hasStalePort = true + log.Info("Skipped deleting NSX Subnet due to stale ports", "nsxSubnet", *nsxSubnet.Id) + continue + } - if err := r.SubnetService.DeleteSubnet(*nsxSubnet); err != nil { - deleteErr := fmt.Errorf("failed to delete NSX Subnet/%s: %+v", *nsxSubnet.Id, err) + if deleteBindingMaps { + if err = r.BindingService.DeleteSubnetConnectionBindingMapsByParentSubnet(nsxSubnet); err != nil { + deleteErr := fmt.Errorf("failed to delete NSX SubnetConnectionBindingMaps connected to NSX Subnet/%s: %+v", *nsxSubnet.Id, err) deleteErrs = append(deleteErrs, deleteErr) log.Error(deleteErr, "Skipping to next Subnet") + continue } - }() + } + + if err := r.SubnetService.DeleteSubnet(*nsxSubnet); err != nil { + deleteErr := fmt.Errorf("failed to delete NSX Subnet/%s: %+v", *nsxSubnet.Id, err) + deleteErrs = append(deleteErrs, deleteErr) + log.Error(deleteErr, "Skipping to next Subnet") + } else { + r.SubnetPortService.DeletePortCount(*nsxSubnet.Path) + } + } if len(deleteErrs) > 0 { err = fmt.Errorf("multiple errors occurred while deleting Subnets: %v", deleteErrs) diff --git a/pkg/controllers/subnetset/subnetset_controller_test.go b/pkg/controllers/subnetset/subnetset_controller_test.go index ca70a2f42..2f9433811 100644 --- a/pkg/controllers/subnetset/subnetset_controller_test.go +++ b/pkg/controllers/subnetset/subnetset_controller_test.go @@ -119,7 +119,7 @@ func createFakeSubnetSetReconciler(objs []client.Object) *SubnetSetReconciler { Client: nil, NSXClient: &nsx.Client{}, }, - SubnetPortStore: nil, + SubnetPortStore: &subnetport.SubnetPortStore{}, } return &SubnetSetReconciler{ @@ -396,13 +396,8 @@ func TestReconcile_DeleteSubnetSet(t *testing.T) { return nil }) - patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "GetPortsOfSubnet", func(_ *subnetport.SubnetPortService, _ string) (ports []*model.VpcSubnetPort) { - id := "fake-subnetport-0" - return []*model.VpcSubnetPort{ - { - Id: &id, - }, - } + patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "IsEmptySubnet", func(_ *subnetport.SubnetPortService, _ string) bool { + return false }) patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error { return nil diff --git a/pkg/mock/services_mock.go b/pkg/mock/services_mock.go index 963e34f0a..8d7cdc9a2 100644 --- a/pkg/mock/services_mock.go +++ b/pkg/mock/services_mock.go @@ -1,8 +1,6 @@ package mock import ( - "sync" - "github.com/stretchr/testify/mock" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model" "sigs.k8s.io/controller-runtime/pkg/client" @@ -71,9 +69,9 @@ func (m *MockSubnetServiceProvider) GetSubnetsByIndex(key, value string) []*mode return arg.Get(0).([]*model.VpcSubnet) } -func (m *MockSubnetServiceProvider) CreateOrUpdateSubnet(obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (string, error) { +func (m *MockSubnetServiceProvider) CreateOrUpdateSubnet(obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (*model.VpcSubnet, error) { arg := m.Called(obj, vpcInfo, tags) - return arg.Get(0).(string), arg.Error(1) + return arg.Get(0).(*model.VpcSubnet), arg.Error(1) } func (m *MockSubnetServiceProvider) GenerateSubnetNSTags(obj client.Object) []model.Tag { @@ -81,26 +79,26 @@ func (m *MockSubnetServiceProvider) GenerateSubnetNSTags(obj client.Object) []mo return []model.Tag{} } -func (m *MockSubnetServiceProvider) LockSubnet(path *string) *sync.RWMutex { - return nil +type MockSubnetPortServiceProvider struct { + mock.Mock } -func (m *MockSubnetServiceProvider) UnlockSubnet(path *string, lock *sync.RWMutex) { +func (m *MockSubnetPortServiceProvider) GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort) { return } -func (m *MockSubnetServiceProvider) RLockSubnet(path *string) *sync.RWMutex { - return nil +func (m *MockSubnetPortServiceProvider) AllocatePortFromSubnet(subnet *model.VpcSubnet) bool { + return true } -func (m *MockSubnetServiceProvider) RUnlockSubnet(path *string, lock *sync.RWMutex) { +func (m *MockSubnetPortServiceProvider) ReleasePortInSubnet(path string) { return } -type MockSubnetPortServiceProvider struct { - mock.Mock +func (m *MockSubnetPortServiceProvider) IsEmptySubnet(id string, path string) bool { + return true } -func (m *MockSubnetPortServiceProvider) GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort) { +func (m *MockSubnetPortServiceProvider) DeletePortCount(path string) { return } diff --git a/pkg/nsx/services/common/services.go b/pkg/nsx/services/common/services.go index 1248720c5..b9e139fc9 100644 --- a/pkg/nsx/services/common/services.go +++ b/pkg/nsx/services/common/services.go @@ -2,7 +2,6 @@ package common import ( "context" - "sync" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model" "sigs.k8s.io/controller-runtime/pkg/client" @@ -29,16 +28,16 @@ type SubnetServiceProvider interface { GetSubnetByKey(key string) (*model.VpcSubnet, error) GetSubnetByPath(path string) (*model.VpcSubnet, error) GetSubnetsByIndex(key, value string) []*model.VpcSubnet - CreateOrUpdateSubnet(obj client.Object, vpcInfo VPCResourceInfo, tags []model.Tag) (string, error) + CreateOrUpdateSubnet(obj client.Object, vpcInfo VPCResourceInfo, tags []model.Tag) (*model.VpcSubnet, error) GenerateSubnetNSTags(obj client.Object) []model.Tag - LockSubnet(path *string) *sync.RWMutex - UnlockSubnet(path *string, lock *sync.RWMutex) - RLockSubnet(path *string) *sync.RWMutex - RUnlockSubnet(path *string, lock *sync.RWMutex) } type SubnetPortServiceProvider interface { GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort) + AllocatePortFromSubnet(subnet *model.VpcSubnet) bool + ReleasePortInSubnet(path string) + IsEmptySubnet(id string, path string) bool + DeletePortCount(path string) } type NodeServiceReader interface { diff --git a/pkg/nsx/services/subnet/store.go b/pkg/nsx/services/subnet/store.go index 03dd4144d..a8ee9d230 100644 --- a/pkg/nsx/services/subnet/store.go +++ b/pkg/nsx/services/subnet/store.go @@ -2,7 +2,6 @@ package subnet import ( "errors" - "sync" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model" @@ -70,43 +69,6 @@ func subnetSetIndexFunc(obj interface{}) ([]string, error) { // SubnetStore is a store for subnet. type SubnetStore struct { common.ResourceStore - // save locks for subnet by path - pathLocks sync.Map -} - -func (subnetStore *SubnetStore) Add(i interface{}) error { - subnet := i.(*model.VpcSubnet) - if subnet.Path == nil { - log.Info("Store a subnet without path", "subnet", subnet) - return subnetStore.ResourceStore.Add(i) - } - lock := sync.RWMutex{} - subnetStore.pathLocks.LoadOrStore(*subnet.Path, &lock) - return subnetStore.ResourceStore.Add(i) -} - -func (subnetStore *SubnetStore) Delete(i interface{}) error { - subnet := i.(*model.VpcSubnet) - if subnet.Path == nil { - log.Info("Delete a subnet without path", "subnet", subnet) - return subnetStore.ResourceStore.Delete(i) - } - subnetStore.pathLocks.Delete(*subnet.Path) - return subnetStore.ResourceStore.Delete(i) -} - -func (subnetStore *SubnetStore) Lock(path string) *sync.RWMutex { - lock := sync.RWMutex{} - subnetLock, _ := subnetStore.pathLocks.LoadOrStore(path, &lock) - subnetLock.(*sync.RWMutex).Lock() - return subnetLock.(*sync.RWMutex) -} - -func (subnetStore *SubnetStore) RLock(path string) *sync.RWMutex { - lock := sync.RWMutex{} - subnetLock, _ := subnetStore.pathLocks.LoadOrStore(path, &lock) - subnetLock.(*sync.RWMutex).RLock() - return subnetLock.(*sync.RWMutex) } func (subnetStore *SubnetStore) Apply(i interface{}) error { diff --git a/pkg/nsx/services/subnet/subnet.go b/pkg/nsx/services/subnet/subnet.go index 9b5ab00e7..b14508d89 100644 --- a/pkg/nsx/services/subnet/subnet.go +++ b/pkg/nsx/services/subnet/subnet.go @@ -83,25 +83,25 @@ func InitializeSubnetService(service common.Service) (*SubnetService, error) { return subnetService, nil } -func (service *SubnetService) CreateOrUpdateSubnet(obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (subnetPath string, err error) { - if subnetPath, err = service.createOrUpdateSubnetWithAPI(obj, vpcInfo, tags, service.useLegacyAPI); err != nil { +func (service *SubnetService) CreateOrUpdateSubnet(obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (subnet *model.VpcSubnet, err error) { + if subnet, err = service.createOrUpdateSubnetWithAPI(obj, vpcInfo, tags, service.useLegacyAPI); err != nil { if nsxErr, ok := err.(*nsxutil.NSXApiError); ok { if *nsxErr.ErrorCode == ErrorCodeUnrecognizedField { log.Info("NSX does not support subnet_dhcp_config, using old API", "error", err) service.useLegacyAPI = true - subnetPath, err = service.createOrUpdateSubnetWithAPI(obj, vpcInfo, tags, service.useLegacyAPI) + subnet, err = service.createOrUpdateSubnetWithAPI(obj, vpcInfo, tags, service.useLegacyAPI) } } } - return subnetPath, err + return subnet, err } -func (service *SubnetService) createOrUpdateSubnetWithAPI(obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag, useLegacyAPI bool) (string, error) { +func (service *SubnetService) createOrUpdateSubnetWithAPI(obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag, useLegacyAPI bool) (*model.VpcSubnet, error) { uid := string(obj.GetUID()) nsxSubnet, err := service.buildSubnet(obj, tags, useLegacyAPI) if err != nil { log.Error(err, "Failed to build Subnet") - return "", err + return nil, err } // Only check whether it needs update when obj is v1alpha1.Subnet if subnet, ok := obj.(*v1alpha1.Subnet); ok { @@ -124,26 +124,26 @@ func (service *SubnetService) createOrUpdateSubnetWithAPI(obj client.Object, vpc } if !changed { log.Info("Subnet not changed, skip updating", "SubnetId", uid) - return uid, nil + return existingSubnet, nil } } return service.createOrUpdateSubnet(obj, nsxSubnet, &vpcInfo) } -func (service *SubnetService) createOrUpdateSubnet(obj client.Object, nsxSubnet *model.VpcSubnet, vpcInfo *common.VPCResourceInfo) (string, error) { +func (service *SubnetService) createOrUpdateSubnet(obj client.Object, nsxSubnet *model.VpcSubnet, vpcInfo *common.VPCResourceInfo) (*model.VpcSubnet, error) { orgRoot, err := service.WrapHierarchySubnet(nsxSubnet, vpcInfo) if err != nil { log.Error(err, "Failed to WrapHierarchySubnet") - return "", err + return nil, err } if err = service.NSXClient.OrgRootClient.Patch(*orgRoot, &EnforceRevisionCheckParam); err != nil { err = nsxutil.TransNSXApiError(err) - return "", err + return nil, err } // Get Subnet from NSX after patch operation as NSX renders several fields like `path`/`parent_path`. if *nsxSubnet, err = service.NSXClient.SubnetsClient.Get(vpcInfo.OrgID, vpcInfo.ProjectID, vpcInfo.VPCID, *nsxSubnet.Id); err != nil { err = nsxutil.TransNSXApiError(err) - return "", err + return nil, err } realizeService := realizestate.InitializeRealizeState(service.Service) backoff := wait.Backoff{ @@ -162,21 +162,21 @@ func (service *SubnetService) createOrUpdateSubnet(obj client.Object, nsxSubnet deleteErr := service.DeleteSubnet(*nsxSubnet) if deleteErr != nil { log.Error(deleteErr, "Failed to delete Subnet after realization check failure", "ID", *nsxSubnet.Id) - return "", fmt.Errorf("realization check failed: %v; deletion failed: %v", err, deleteErr) + return nil, fmt.Errorf("realization check failed: %v; deletion failed: %v", err, deleteErr) } - return "", err + return nil, err } if err = service.SubnetStore.Apply(nsxSubnet); err != nil { log.Error(err, "Failed to add subnet to store", "ID", *nsxSubnet.Id) - return "", err + return nil, err } if subnetSet, ok := obj.(*v1alpha1.SubnetSet); ok { if err = service.UpdateSubnetSetStatus(subnetSet); err != nil { - return "", err + return nil, err } } log.Info("Successfully updated nsxSubnet", "nsxSubnet", nsxSubnet) - return *nsxSubnet.Path, nil + return nsxSubnet, nil } func (service *SubnetService) DeleteSubnet(nsxSubnet model.VpcSubnet) error { @@ -478,37 +478,3 @@ func (service *SubnetService) UpdateSubnetSet(ns string, vpcSubnets []*model.Vpc } return nil } - -func (service *SubnetService) LockSubnet(path *string) *sync.RWMutex { - if path != nil && *path != "" { - log.V(1).Info("Locked Subnet for writing", "path", *path) - return service.SubnetStore.Lock(*path) - } - return nil -} - -func (service *SubnetService) UnlockSubnet(path *string, lock *sync.RWMutex) { - if lock != nil { - if path != nil && *path != "" { - log.V(1).Info("Unlocked Subnet for writing", "path", *path) - } - lock.Unlock() - } -} - -func (service *SubnetService) RLockSubnet(path *string) *sync.RWMutex { - if path != nil && *path != "" { - log.V(1).Info("Locked Subnet for reading", "path", *path) - return service.SubnetStore.RLock(*path) - } - return nil -} - -func (service *SubnetService) RUnlockSubnet(path *string, lock *sync.RWMutex) { - if lock != nil { - if path != nil && *path != "" { - log.V(1).Info("Unlocked Subnet for reading", "path", *path) - } - lock.RUnlock() - } -} diff --git a/pkg/nsx/services/subnet/subnet_test.go b/pkg/nsx/services/subnet/subnet_test.go index 7c03601da..9fc11ce8b 100644 --- a/pkg/nsx/services/subnet/subnet_test.go +++ b/pkg/nsx/services/subnet/subnet_test.go @@ -199,7 +199,7 @@ func TestInitializeSubnetService(t *testing.T) { }, subnetCRTags: []model.Tag{}, expectAllSubnetNumAfterCreate: 1, - expectCreateSubnetUID: fakeSubnetPath, + expectCreateSubnetUID: nsxSubnetID, }, { name: "Subnet exists and not change", @@ -232,7 +232,7 @@ func TestInitializeSubnetService(t *testing.T) { }, expectAllSubnetNum: 1, expectAllSubnetNumAfterCreate: 1, - expectCreateSubnetUID: subnetID, + expectCreateSubnetUID: nsxSubnetID, }, { name: "Subnet exists and changed", @@ -267,7 +267,7 @@ func TestInitializeSubnetService(t *testing.T) { }, expectAllSubnetNum: 1, expectAllSubnetNumAfterCreate: 1, - expectCreateSubnetUID: fakeSubnetPath, + expectCreateSubnetUID: nsxSubnetID, }, } @@ -307,9 +307,9 @@ func TestInitializeSubnetService(t *testing.T) { res := service.ListAllSubnet() assert.Equal(t, tc.expectAllSubnetNum, len(res)) - createdNSXSubnetUID, err := service.CreateOrUpdateSubnet(tc.existingSubnetCR, *tc.existingVPCInfo, tc.subnetCRTags) + createdNSXSubnet, err := service.CreateOrUpdateSubnet(tc.existingSubnetCR, *tc.existingVPCInfo, tc.subnetCRTags) assert.NoError(t, err) - assert.Equal(t, tc.expectCreateSubnetUID, createdNSXSubnetUID) + assert.Equal(t, tc.expectCreateSubnetUID, *createdNSXSubnet.Id) res = service.ListAllSubnet() @@ -398,8 +398,8 @@ func TestSubnetService_UpdateSubnetSet(t *testing.T) { }) patchesCreateOrUpdateSubnet := gomonkey.ApplyFunc((*SubnetService).createOrUpdateSubnet, - func(r *SubnetService, obj client.Object, nsxSubnet *model.VpcSubnet, vpcInfo *common.VPCResourceInfo) (string, error) { - return fakeSubnetPath, nil + func(r *SubnetService, obj client.Object, nsxSubnet *model.VpcSubnet, vpcInfo *common.VPCResourceInfo) (*model.VpcSubnet, error) { + return &model.VpcSubnet{Path: &fakeSubnetPath}, nil }) defer patchesCreateOrUpdateSubnet.Reset() diff --git a/pkg/nsx/services/subnetport/store.go b/pkg/nsx/services/subnetport/store.go index 493f2f867..30f6177a4 100644 --- a/pkg/nsx/services/subnetport/store.go +++ b/pkg/nsx/services/subnetport/store.go @@ -2,6 +2,7 @@ package subnetport import ( "errors" + "sync" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model" "k8s.io/apimachinery/pkg/types" @@ -88,6 +89,17 @@ func subnetPortIndexPodNamespace(obj interface{}) ([]string, error) { // SubnetPortStore is a store for SubnetPorts type SubnetPortStore struct { common.ResourceStore + // PortCountInfo stores the Subnet and the information + // regarding SubnetPort count on that Subnet + PortCountInfo sync.Map +} + +type CountInfo struct { + // dirtyCount defines the number of SubnetPorts under creation in the Subnet + dirtyCount int + lock sync.Mutex + // totalIp defines the number of available IP in the Subnet + totalIp int } func (vs *SubnetPortStore) Apply(i interface{}) error { diff --git a/pkg/nsx/services/subnetport/subnetport.go b/pkg/nsx/services/subnetport/subnetport.go index 112188191..8f341408a 100644 --- a/pkg/nsx/services/subnetport/subnetport.go +++ b/pkg/nsx/services/subnetport/subnetport.go @@ -46,18 +46,20 @@ func InitializeSubnetPort(service servicecommon.Service) (*SubnetPortService, er subnetPortService := &SubnetPortService{Service: service} - subnetPortService.SubnetPortStore = &SubnetPortStore{ResourceStore: servicecommon.ResourceStore{ - Indexer: cache.NewIndexer( - keyFunc, - cache.Indexers{ - servicecommon.TagScopeSubnetPortCRUID: subnetPortIndexByCRUID, - servicecommon.TagScopePodUID: subnetPortIndexByPodUID, - servicecommon.TagScopeVMNamespace: subnetPortIndexNamespace, - servicecommon.TagScopeNamespace: subnetPortIndexPodNamespace, - servicecommon.IndexKeySubnetID: subnetPortIndexBySubnetID, - }), - BindingType: model.VpcSubnetPortBindingType(), - }} + subnetPortService.SubnetPortStore = &SubnetPortStore{ + ResourceStore: servicecommon.ResourceStore{ + Indexer: cache.NewIndexer( + keyFunc, + cache.Indexers{ + servicecommon.TagScopeSubnetPortCRUID: subnetPortIndexByCRUID, + servicecommon.TagScopePodUID: subnetPortIndexByPodUID, + servicecommon.TagScopeVMNamespace: subnetPortIndexNamespace, + servicecommon.TagScopeNamespace: subnetPortIndexPodNamespace, + servicecommon.IndexKeySubnetID: subnetPortIndexBySubnetID, + }), + BindingType: model.VpcSubnetPortBindingType(), + }, + } go subnetPortService.InitializeResourceStore(&wg, fatalErrors, ResourceTypeSubnetPort, nil, subnetPortService.SubnetPortStore) @@ -373,3 +375,68 @@ func (service *SubnetPortService) Cleanup(ctx context.Context) error { } return nil } + +// AllocatePortFromSubnet checks the number of SubnetPorts on the Subnet. +// If the Subnet has capacity for the new SubnetPorts, it will increase +// the number of SubnetPort under creation and return true. +func (service *SubnetPortService) AllocatePortFromSubnet(subnet *model.VpcSubnet) bool { + info := &CountInfo{} + obj, ok := service.SubnetPortStore.PortCountInfo.LoadOrStore(*subnet.Path, info) + info = obj.(*CountInfo) + + info.lock.Lock() + defer info.lock.Unlock() + if !ok { + totalIP := int(*subnet.Ipv4SubnetSize) + if len(subnet.IpAddresses) > 0 { + // totalIP will be overrided if IpAddresses are specified. + totalIP, _ = util.CalculateIPFromCIDRs(subnet.IpAddresses) + } + // NSX reserves 4 ip addresses in each subnet for network address, gateway address, + // dhcp server address and broadcast address. + info.totalIp = totalIP - 4 + } + + // Number of SubnetPorts on the Subnet includes the SubnetPorts under creation + // and the SubnetPorts already created + existingPortCount := len(service.GetPortsOfSubnet(*subnet.Id)) + if info.dirtyCount+existingPortCount < info.totalIp { + info.dirtyCount += 1 + log.V(2).Info("Allocate Subnetport to Subnet", "Subnet", *subnet.Path, "dirtyPortCount", info.dirtyCount, "existingPortCount", existingPortCount) + return true + } + return false +} + +// ReleasePortInSubnet decreases the number of SubnetPort under creation. +func (service *SubnetPortService) ReleasePortInSubnet(path string) { + obj, ok := service.SubnetPortStore.PortCountInfo.Load(path) + if !ok { + log.Error(nil, "Subnet does not have Subnetport to remove", "Subnet", path) + return + } + info := obj.(*CountInfo) + info.lock.Lock() + defer info.lock.Unlock() + if info.dirtyCount < 1 { + log.Error(nil, "Subnet does not have Subnetport to remove", "Subnet", path) + return + } + info.dirtyCount -= 1 + log.V(2).Info("Release Subnetport from Subnet", "Subnet", path, "dirtyPortCount", info.dirtyCount) +} + +// IsEmptySubnet check if there is any SubnetPort created or being creating on the Subnet. +func (service *SubnetPortService) IsEmptySubnet(id string, path string) bool { + portCount := len(service.GetPortsOfSubnet(id)) + obj, ok := service.SubnetPortStore.PortCountInfo.Load(path) + if ok { + info := obj.(*CountInfo) + portCount += info.dirtyCount + } + return portCount < 1 +} + +func (service *SubnetPortService) DeletePortCount(path string) { + service.SubnetPortStore.PortCountInfo.Delete(path) +} diff --git a/pkg/nsx/services/subnetport/subnetport_test.go b/pkg/nsx/services/subnetport/subnetport_test.go index 72cdc6ce6..bdd42ea23 100644 --- a/pkg/nsx/services/subnetport/subnetport_test.go +++ b/pkg/nsx/services/subnetport/subnetport_test.go @@ -813,6 +813,24 @@ func TestSubnetPortService_ListSubnetPortByPodName(t *testing.T) { assert.Equal(t, subnetPort2, subnetPorts[0]) } +func TestSubnetPortService_AllocatePortFromSubnet(t *testing.T) { + subnetPath := "subnet-path-1" + subnetId := "subnet-id-1" + subnetPortService := createSubnetPortService() + ok := subnetPortService.AllocatePortFromSubnet(&model.VpcSubnet{ + Ipv4SubnetSize: common.Int64(16), + IpAddresses: []string{"10.0.0.1/28"}, + Path: &subnetPath, + Id: &subnetId, + }) + assert.True(t, ok) + empty := subnetPortService.IsEmptySubnet(subnetId, subnetPath) + assert.False(t, empty) + subnetPortService.ReleasePortInSubnet(subnetPath) + empty = subnetPortService.IsEmptySubnet(subnetId, subnetPath) + assert.True(t, empty) +} + func createSubnetPortService() *SubnetPortService { return &SubnetPortService{ SubnetPortStore: &SubnetPortStore{ResourceStore: common.ResourceStore{