Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Subnet lock for SubnetPort creation #974

Merged
merged 1 commit into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions pkg/controllers/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/vmware-tanzu/nsx-operator/pkg/logger"
"github.com/vmware-tanzu/nsx-operator/pkg/metrics"
servicecommon "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common"
"github.com/vmware-tanzu/nsx-operator/pkg/util"
)

var (
Expand All @@ -29,19 +28,11 @@ var (

func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servicecommon.VPCServiceProvider, subnetService servicecommon.SubnetServiceProvider, subnetPortService servicecommon.SubnetPortServiceProvider) (string, error) {
// Use SubnetSet uuid lock to make sure when multiple ports are created on the same SubnetSet, only one Subnet will be created
subnetSetLock := lockSubnetSet(subnetSet.GetUID())
defer unlockSubnetSet(subnetSet.GetUID(), subnetSetLock)
subnetSetLock := LockSubnetSet(subnetSet.GetUID())
defer UnlockSubnetSet(subnetSet.GetUID(), subnetSetLock)
subnetList := subnetService.GetSubnetsByIndex(servicecommon.TagScopeSubnetSetCRUID, string(subnetSet.GetUID()))
for _, nsxSubnet := range subnetList {
portNums := len(subnetPortService.GetPortsOfSubnet(*nsxSubnet.Id))
totalIP := int(*nsxSubnet.Ipv4SubnetSize)
if len(nsxSubnet.IpAddresses) > 0 {
// totalIP will be overrided if IpAddresses are specified.
totalIP, _ = util.CalculateIPFromCIDRs(nsxSubnet.IpAddresses)
}
// NSX reserves 4 ip addresses in each subnet for network address, gateway address,
// dhcp server address and broadcast address.
if portNums < totalIP-4 {
if subnetPortService.AllocatePortFromSubnet(nsxSubnet) {
return *nsxSubnet.Path, nil
}
}
Expand All @@ -56,7 +47,12 @@ func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servi
log.Error(err, "Failed to allocate Subnet")
return "", err
}
return subnetService.CreateOrUpdateSubnet(subnetSet, vpcInfoList[0], tags)
nsxSubnet, err := subnetService.CreateOrUpdateSubnet(subnetSet, vpcInfoList[0], tags)
if err != nil {
return "", err
}
subnetPortService.AllocatePortFromSubnet(nsxSubnet)
return *nsxSubnet.Path, nil
}

func getSharedNamespaceForNamespace(client k8sclient.Client, ctx context.Context, namespaceName string) (string, error) {
Expand Down Expand Up @@ -241,15 +237,15 @@ func NewStatusUpdater(client k8sclient.Client, nsxConfig *config.NSXOperatorConf
}
}

func lockSubnetSet(uuid types.UID) *sync.Mutex {
func LockSubnetSet(uuid types.UID) *sync.Mutex {
lock := sync.Mutex{}
subnetSetLock, _ := SubnetSetLocks.LoadOrStore(uuid, &lock)
log.V(1).Info("Lock SubnetSet", "uuid", uuid)
subnetSetLock.(*sync.Mutex).Lock()
return subnetSetLock.(*sync.Mutex)
}

func unlockSubnetSet(uuid types.UID, subnetSetLock *sync.Mutex) {
func UnlockSubnetSet(uuid types.UID, subnetSetLock *sync.Mutex) {
if subnetSetLock != nil {
log.V(1).Info("Unlock SubnetSet", "uuid", uuid)
subnetSetLock.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/common/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestAllocateSubnetFromSubnetSet(t *testing.T) {
Return([]*model.VpcSubnet{})
ssp.(*pkg_mock.MockSubnetServiceProvider).On("GenerateSubnetNSTags", mock.Anything)
vsp.(*pkg_mock.MockVPCServiceProvider).On("ListVPCInfo", mock.Anything).Return([]servicecommon.VPCResourceInfo{{}})
ssp.(*pkg_mock.MockSubnetServiceProvider).On("CreateOrUpdateSubnet", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(expectedSubnetPath, nil)
ssp.(*pkg_mock.MockSubnetServiceProvider).On("CreateOrUpdateSubnet", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&model.VpcSubnet{Path: &expectedSubnetPath}, nil)
},
expectedResult: expectedSubnetPath,
},
Expand Down
20 changes: 9 additions & 11 deletions pkg/controllers/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,14 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R

if !podIsDeleted(pod) {
r.StatusUpdater.IncreaseUpdateTotal()
nsxSubnetPath, err := r.GetSubnetPathForPod(ctx, pod)
isExisting, nsxSubnetPath, err := r.GetSubnetPathForPod(ctx, pod)
if err != nil {
log.Error(err, "failed to get NSX resource path from subnet", "pod.Name", pod.Name, "pod.UID", pod.UID)
return common.ResultRequeue, err
}
if !isExisting {
defer r.SubnetPortService.ReleasePortInSubnet(nsxSubnetPath)
}
log.Info("got NSX subnet for pod", "NSX subnet path", nsxSubnetPath, "pod.Name", pod.Name, "pod.UID", pod.UID)
node, err := r.GetNodeByName(pod.Spec.NodeName)
if err != nil {
Expand All @@ -87,11 +90,6 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
return common.ResultRequeue, err
}
contextID := *node.UniqueId
// There is a race condition that the subnetset controller may delete the
// subnet during CollectGarbage. So check the subnet under lock.
lock := r.SubnetService.RLockSubnet(&nsxSubnetPath)
defer r.SubnetService.RUnlockSubnet(&nsxSubnetPath, lock)

nsxSubnet, err := r.SubnetService.GetSubnetByPath(nsxSubnetPath)
if err != nil {
return common.ResultRequeue, err
Expand Down Expand Up @@ -200,24 +198,24 @@ func (r *PodReconciler) CollectGarbage(ctx context.Context) {
}
}

func (r *PodReconciler) GetSubnetPathForPod(ctx context.Context, pod *v1.Pod) (string, error) {
func (r *PodReconciler) GetSubnetPathForPod(ctx context.Context, pod *v1.Pod) (bool, string, error) {
subnetPortIDForPod := r.SubnetPortService.BuildSubnetPortId(&pod.ObjectMeta)
subnetPath := r.SubnetPortService.GetSubnetPathForSubnetPortFromStore(subnetPortIDForPod)
if len(subnetPath) > 0 {
log.V(1).Info("NSX subnet port had been created, returning the existing NSX subnet path", "pod.UID", pod.UID, "subnetPath", subnetPath)
return subnetPath, nil
return true, subnetPath, nil
}
subnetSet, err := common.GetDefaultSubnetSet(r.SubnetPortService.Client, ctx, pod.Namespace, servicecommon.LabelDefaultPodSubnetSet)
if err != nil {
return "", err
return false, "", err
}
log.Info("got default subnetset for pod, allocating the NSX subnet", "subnetSet.Name", subnetSet.Name, "subnetSet.UID", subnetSet.UID, "pod.Name", pod.Name, "pod.UID", pod.UID)
subnetPath, err = common.AllocateSubnetFromSubnetSet(subnetSet, r.VPCService, r.SubnetService, r.SubnetPortService)
if err != nil {
return subnetPath, err
return false, subnetPath, err
}
log.Info("allocated NSX subnet for pod", "nsxSubnetPath", subnetPath, "pod.Name", pod.Name, "pod.UID", pod.UID)
return subnetPath, nil
return false, subnetPath, nil
}

func podIsDeleted(pod *v1.Pod) bool {
Expand Down
26 changes: 15 additions & 11 deletions pkg/controllers/pod/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func TestPodReconciler_Reconcile(t *testing.T) {
},
},
},
SubnetPortStore: &subnetport.SubnetPortStore{},
},
SubnetService: &subnet.SubnetService{
SubnetStore: &subnet.SubnetStore{},
Expand Down Expand Up @@ -99,8 +100,8 @@ func TestPodReconciler_Reconcile(t *testing.T) {
return nil
})
patchesGetSubnetPathForPod := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod,
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) {
return "", errors.New("failed to get subnet path")
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) {
return false, "", errors.New("failed to get subnet path")
})
return patchesGetSubnetPathForPod
},
Expand All @@ -124,8 +125,8 @@ func TestPodReconciler_Reconcile(t *testing.T) {
return nil
})
patches := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod,
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) {
return "subnet-path-1", nil
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) {
return false, "subnet-path-1", nil
})
patches.ApplyFunc((*PodReconciler).GetNodeByName,
func(r *PodReconciler, nodeName string) (*model.HostTransportNode, error) {
Expand All @@ -145,8 +146,8 @@ func TestPodReconciler_Reconcile(t *testing.T) {
return nil
})
patches := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod,
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) {
return "subnet-path-1", nil
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) {
return false, "subnet-path-1", nil
})
patches.ApplyFunc((*PodReconciler).GetNodeByName,
func(r *PodReconciler, nodeName string) (*model.HostTransportNode, error) {
Expand All @@ -170,8 +171,8 @@ func TestPodReconciler_Reconcile(t *testing.T) {
return nil
})
patches := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod,
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) {
return "subnet-path-1", nil
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) {
return false, "subnet-path-1", nil
})
patches.ApplyFunc((*PodReconciler).GetNodeByName,
func(r *PodReconciler, nodeName string) (*model.HostTransportNode, error) {
Expand Down Expand Up @@ -199,8 +200,8 @@ func TestPodReconciler_Reconcile(t *testing.T) {
return nil
})
patches := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod,
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) {
return "subnet-path-1", nil
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) {
return false, "subnet-path-1", nil
})
patches.ApplyFunc((*PodReconciler).GetNodeByName,
func(r *PodReconciler, nodeName string) (*model.HostTransportNode, error) {
Expand Down Expand Up @@ -386,6 +387,7 @@ func TestSubnetPortReconciler_GetSubnetPathForPod(t *testing.T) {
prepareFunc func(*testing.T, *PodReconciler) *gomonkey.Patches
expectedErr string
expectedSubnetPath string
expectedIsExisting bool
}{
{
name: "SubnetExisted",
Expand All @@ -397,6 +399,7 @@ func TestSubnetPortReconciler_GetSubnetPathForPod(t *testing.T) {
return patches
},
expectedSubnetPath: subnetPath,
expectedIsExisting: true,
},
{
name: "NoGetDefaultSubnetSet",
Expand Down Expand Up @@ -466,7 +469,7 @@ func TestSubnetPortReconciler_GetSubnetPathForPod(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
patches := tt.prepareFunc(t, r)
defer patches.Reset()
path, err := r.GetSubnetPathForPod(context.TODO(), &v1.Pod{
isExisting, path, err := r.GetSubnetPathForPod(context.TODO(), &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "ns-1",
Expand All @@ -477,6 +480,7 @@ func TestSubnetPortReconciler_GetSubnetPathForPod(t *testing.T) {
} else {
assert.Nil(t, err)
assert.Equal(t, subnetPath, path)
assert.Equal(t, tt.expectedIsExisting, isExisting)
}
})
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/subnet/subnet_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func (r *SubnetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet) error {
log.Error(err, "Failed to delete Subnet", "ID", *nsxSubnet.Id)
return err
}
r.SubnetPortService.DeletePortCount(*nsxSubnet.Path)
log.Info("Successfully deleted Subnet", "ID", *nsxSubnet.Id)
}
log.Info("Successfully cleaned Subnets", "subnetCount", len(nsxSubnets))
Expand Down
28 changes: 20 additions & 8 deletions pkg/controllers/subnet/subnet_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func TestSubnetReconciler_GarbageCollector(t *testing.T) {
tags2 := []model.Tag{{Scope: common.String(common.TagScopeSubnetCRUID), Tag: common.String("fake-id2")}}
var nsxSubnets []*model.VpcSubnet
id1 := "fake-id1"
nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id1, Tags: tags1})
nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id1, Tags: tags1, Path: common.String("fake-path")})
id2 := "fake-id2"
nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id2, Tags: tags2})
nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id2, Tags: tags2, Path: common.String("fake-path")})
return nsxSubnets
})
patch.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "GetPortsOfSubnet", func(_ *subnetport.SubnetPortService, _ string) (ports []*model.VpcSubnetPort) {
Expand All @@ -64,6 +64,9 @@ func TestSubnetReconciler_GarbageCollector(t *testing.T) {
patch.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
})
patch.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) {
return
})
return patch
},
},
Expand Down Expand Up @@ -272,6 +275,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
})
patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) {
return
})
return patches
},
expectRes: ResultNormal,
Expand Down Expand Up @@ -343,6 +349,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
})
patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) {
return
})
return patches
},
expectRes: ResultRequeue,
Expand Down Expand Up @@ -418,6 +427,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
})
patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) {
return
})
return patches
},
expectRes: ResultNormal,
Expand Down Expand Up @@ -488,8 +500,8 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
{OrgID: "org-id", ProjectID: "project-id", VPCID: "vpc-id", ID: "fake-id"},
}
})
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (string, error) {
return "", errors.New("create or update failed")
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (*model.VpcSubnet, error) {
return nil, errors.New("create or update failed")
})
return patches
},
Expand Down Expand Up @@ -521,8 +533,8 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
}
})

patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (string, error) {
return "", nil
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (*model.VpcSubnet, error) {
return nil, nil
})

patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "GetSubnetByKey", func(_ *subnet.SubnetService, key string) (*model.VpcSubnet, error) {
Expand Down Expand Up @@ -575,8 +587,8 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
}
})

patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (string, error) {
return "", nil
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (*model.VpcSubnet, error) {
return nil, nil
})

patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "GetSubnetByKey", func(_ *subnet.SubnetService, key string) (*model.VpcSubnet, error) {
Expand Down
19 changes: 12 additions & 7 deletions pkg/controllers/subnetport/subnetport_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (r *SubnetPortReconciler) Reconcile(ctx context.Context, req ctrl.Request)
r.StatusUpdater.IncreaseUpdateTotal()

old_status := subnetPort.Status.DeepCopy()
isParentResourceTerminating, nsxSubnetPath, err := r.CheckAndGetSubnetPathForSubnetPort(ctx, subnetPort)
isExisting, isParentResourceTerminating, nsxSubnetPath, err := r.CheckAndGetSubnetPathForSubnetPort(ctx, subnetPort)
if isParentResourceTerminating {
err = errors.New("parent resource is terminating, SubnetPort cannot be created")
r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "", setSubnetPortReadyStatusFalse, r.SubnetPortService)
Expand All @@ -104,15 +104,14 @@ func (r *SubnetPortReconciler) Reconcile(ctx context.Context, req ctrl.Request)
r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "Failed to get NSX resource path from Subnet", setSubnetPortReadyStatusFalse, r.SubnetPortService)
return common.ResultRequeue, err
}
if !isExisting {
defer r.SubnetPortService.ReleasePortInSubnet(nsxSubnetPath)
}
labels, err := r.getLabelsFromVirtualMachine(ctx, subnetPort)
if err != nil {
r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "Failed to get labels from VirtualMachine", setSubnetPortReadyStatusFalse, r.SubnetPortService)
return common.ResultRequeue, err
}
// There is a race condition that the subnetset controller may delete the
// subnet during CollectGarbage. So check the subnet under lock.
lock := r.SubnetService.RLockSubnet(&nsxSubnetPath)
defer r.SubnetService.RUnlockSubnet(&nsxSubnetPath, lock)

nsxSubnet, err := r.SubnetService.GetSubnetByPath(nsxSubnetPath)
if err != nil {
Expand Down Expand Up @@ -426,7 +425,7 @@ func getExistingConditionOfType(conditionType v1alpha1.ConditionType, existingCo
return nil
}

func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Context, subnetPort *v1alpha1.SubnetPort) (isStale bool, subnetPath string, err error) {
func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Context, subnetPort *v1alpha1.SubnetPort) (existing bool, isStale bool, subnetPath string, err error) {
subnetPortID := r.SubnetPortService.BuildSubnetPortId(&subnetPort.ObjectMeta)
subnetPath = r.SubnetPortService.GetSubnetPathForSubnetPortFromStore(subnetPortID)
if len(subnetPath) > 0 {
Expand All @@ -442,6 +441,7 @@ func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Co
}
} else {
log.V(1).Info("NSX subnet port had been created, returning the existing NSX subnet path", "subnetPort.UID", subnetPort.UID, "subnetPath", subnetPath)
existing = true
return
}
}
Expand Down Expand Up @@ -469,7 +469,12 @@ func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Co
log.Error(err, "failed to get NSX subnet by subnet CR UID", "subnetList", subnetList)
return
}
subnetPath = *subnetList[0].Path
nsxSubnet := subnetList[0]
if !r.SubnetPortService.AllocatePortFromSubnet(nsxSubnet) {
err = fmt.Errorf("no valid IP in Subnet %s", *nsxSubnet.Path)
return
}
subnetPath = *nsxSubnet.Path
} else if len(subnetPort.Spec.SubnetSet) > 0 {
subnetSet := &v1alpha1.SubnetSet{}
namespacedName := types.NamespacedName{
Expand Down
Loading
Loading