Skip to content

Commit

Permalink
Refactor Subnet lock for SubnetPort creation
Browse files Browse the repository at this point in the history
Signed-off-by: Yanjun Zhou <[email protected]>
  • Loading branch information
yanjunz97 committed Dec 23, 2024
1 parent 86c0d65 commit f78aabf
Show file tree
Hide file tree
Showing 15 changed files with 225 additions and 149 deletions.
10 changes: 7 additions & 3 deletions pkg/controllers/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@ func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servi
defer unlockSubnetSet(subnetSet.GetUID(), subnetSetLock)
subnetList := subnetService.GetSubnetsByIndex(servicecommon.TagScopeSubnetSetCRUID, string(subnetSet.GetUID()))
for _, nsxSubnet := range subnetList {
portNums := len(subnetPortService.GetPortsOfSubnet(*nsxSubnet.Id))
totalIP := int(*nsxSubnet.Ipv4SubnetSize)
if len(nsxSubnet.IpAddresses) > 0 {
// totalIP will be overridden if IpAddresses are specified.
totalIP, _ = util.CalculateIPFromCIDRs(nsxSubnet.IpAddresses)
}
// NSX reserves 4 ip addresses in each subnet for network address, gateway address,
// dhcp server address and broadcast address.
if portNums < totalIP-4 {
if subnetPortService.AddPortToSubnet(*nsxSubnet.Path, totalIP-4) {
return *nsxSubnet.Path, nil
}
}
Expand All @@ -56,7 +55,12 @@ func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servi
log.Error(err, "Failed to allocate Subnet")
return "", err
}
return subnetService.CreateOrUpdateSubnet(subnetSet, vpcInfoList[0], tags)
path, err := subnetService.CreateOrUpdateSubnet(subnetSet, vpcInfoList[0], tags)
if err != nil {
return path, err
}
subnetPortService.AddPortToSubnet(path, 0)
return path, nil
}

func getSharedNamespaceForNamespace(client k8sclient.Client, ctx context.Context, namespaceName string) (string, error) {
Expand Down
11 changes: 6 additions & 5 deletions pkg/controllers/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,21 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
if err != nil {
// An error here at the very beginning of operator startup is expected, because the node may not be cached yet at that time. The requeued retry is expected to succeed once the cache is populated.
log.Error(err, "failed to get node ID for pod", "pod.Name", req.NamespacedName, "pod.UID", pod.UID, "node", pod.Spec.NodeName)
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
return common.ResultRequeue, err
}
contextID := *node.UniqueId
// There is a race condition that the subnetset controller may delete the
// subnet during CollectGarbage. So check the subnet under lock.
lock := r.SubnetService.RLockSubnet(&nsxSubnetPath)
defer r.SubnetService.RUnlockSubnet(&nsxSubnetPath, lock)

nsxSubnet, err := r.SubnetService.GetSubnetByPath(nsxSubnetPath)
if err != nil {
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
return common.ResultRequeue, err
}
_, err = r.SubnetPortService.CreateOrUpdateSubnetPort(pod, nsxSubnet, contextID, &pod.ObjectMeta.Labels)
if err != nil {
// Remove SubnetPort from Subnet if the SubnetPort is not saved to store
if subnetport.IsNotCreatedError(err) {
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
}
r.StatusUpdater.UpdateFail(ctx, pod, err, "", nil)
return common.ResultRequeue, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/pod/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func TestPodReconciler_Reconcile(t *testing.T) {
},
},
},
SubnetPortStore: &subnetport.SubnetPortStore{},
},
SubnetService: &subnet.SubnetService{
SubnetStore: &subnet.SubnetStore{},
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/subnet/subnet_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func (r *SubnetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet) error {
log.Error(err, "Failed to delete Subnet", "ID", *nsxSubnet.Id)
return err
}
r.SubnetPortService.DeletePortCount(*nsxSubnet.Path)
log.Info("Successfully deleted Subnet", "ID", *nsxSubnet.Id)
}
log.Info("Successfully cleaned Subnets", "subnetCount", len(nsxSubnets))
Expand Down
16 changes: 14 additions & 2 deletions pkg/controllers/subnet/subnet_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func TestSubnetReconciler_GarbageCollector(t *testing.T) {
tags2 := []model.Tag{{Scope: common.String(common.TagScopeSubnetCRUID), Tag: common.String("fake-id2")}}
var nsxSubnets []*model.VpcSubnet
id1 := "fake-id1"
nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id1, Tags: tags1})
nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id1, Tags: tags1, Path: common.String("fake-path")})
id2 := "fake-id2"
nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id2, Tags: tags2})
nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id2, Tags: tags2, Path: common.String("fake-path")})
return nsxSubnets
})
patch.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "GetPortsOfSubnet", func(_ *subnetport.SubnetPortService, _ string) (ports []*model.VpcSubnetPort) {
Expand All @@ -64,6 +64,9 @@ func TestSubnetReconciler_GarbageCollector(t *testing.T) {
patch.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
})
patch.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) {
return
})
return patch
},
},
Expand Down Expand Up @@ -272,6 +275,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
})
patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) {
return
})
return patches
},
expectRes: ResultNormal,
Expand Down Expand Up @@ -343,6 +349,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
})
patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) {
return
})
return patches
},
expectRes: ResultRequeue,
Expand Down Expand Up @@ -418,6 +427,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
})
patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) {
return
})
return patches
},
expectRes: ResultNormal,
Expand Down
22 changes: 17 additions & 5 deletions pkg/controllers/subnetport/subnetport_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,23 @@ func (r *SubnetPortReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
labels, err := r.getLabelsFromVirtualMachine(ctx, subnetPort)
if err != nil {
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "Failed to get labels from VirtualMachine", setSubnetPortReadyStatusFalse, r.SubnetPortService)
return common.ResultRequeue, err
}
// There is a race condition that the subnetset controller may delete the
// subnet during CollectGarbage. So check the subnet under lock.
lock := r.SubnetService.RLockSubnet(&nsxSubnetPath)
defer r.SubnetService.RUnlockSubnet(&nsxSubnetPath, lock)

nsxSubnet, err := r.SubnetService.GetSubnetByPath(nsxSubnetPath)
if err != nil {
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
r.StatusUpdater.UpdateFail(ctx, subnetPort, err, fmt.Sprintf("Failed to get Subnet by path: %s", nsxSubnetPath), setSubnetPortReadyStatusFalse, r.SubnetPortService)
return common.ResultRequeue, err
}
nsxSubnetPortState, err := r.SubnetPortService.CreateOrUpdateSubnetPort(subnetPort, nsxSubnet, "", labels)
if err != nil {
// Remove SubnetPort from Subnet if the SubnetPort is not saved to store
if subnetport.IsNotCreatedError(err) {
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
}
r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "", setSubnetPortReadyStatusFalse, r.SubnetPortService)
return common.ResultRequeue, err
}
Expand Down Expand Up @@ -469,7 +471,17 @@ func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Co
log.Error(err, "failed to get NSX subnet by subnet CR UID", "subnetList", subnetList)
return
}
subnetPath = *subnetList[0].Path
nsxSubnet := subnetList[0]
totalIP := int(*nsxSubnet.Ipv4SubnetSize)
if len(nsxSubnet.IpAddresses) > 0 {
// totalIP will be overridden if IpAddresses are specified.
totalIP, _ = util.CalculateIPFromCIDRs(nsxSubnet.IpAddresses)
}
subnetPath = *nsxSubnet.Path
if !r.SubnetPortService.AddPortToSubnet(*nsxSubnet.Path, totalIP-4) {
err = fmt.Errorf("no valid IP in Subnet %s", subnetPath)
return
}
} else if len(subnetPort.Spec.SubnetSet) > 0 {
subnetSet := &v1alpha1.SubnetSet{}
namespacedName := types.NamespacedName{
Expand Down
13 changes: 9 additions & 4 deletions pkg/controllers/subnetport/subnetport_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,9 +573,13 @@ func TestSubnetPortReconciler_CheckAndGetSubnetPathForSubnetPort(t *testing.T) {
k8sClient := mock_client.NewMockClient(mockCtl)
defer mockCtl.Finish()
r := &SubnetPortReconciler{
Client: k8sClient,
SubnetPortService: &subnetport.SubnetPortService{},
SubnetService: &subnet.SubnetService{},
Client: k8sClient,
SubnetPortService: &subnetport.SubnetPortService{
SubnetPortStore: &subnetport.SubnetPortStore{
PortCountMap: make(map[string]*subnetport.CountWithMark),
},
},
SubnetService: &subnet.SubnetService{},
}

tests := []struct {
Expand Down Expand Up @@ -725,7 +729,8 @@ func TestSubnetPortReconciler_CheckAndGetSubnetPathForSubnetPort(t *testing.T) {
patches.ApplyFunc((*subnet.SubnetService).GetSubnetsByIndex,
func(s *subnet.SubnetService, key string, value string) []*model.VpcSubnet {
return []*model.VpcSubnet{{
Path: servicecommon.String("subnet-path-1"),
Path: servicecommon.String("subnet-path-1"),
Ipv4SubnetSize: servicecommon.Int64(16),
}}
})
return patches
Expand Down
40 changes: 19 additions & 21 deletions pkg/controllers/subnetset/subnetset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,32 +375,30 @@ func (r *SubnetSetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet, delet
}
var deleteErrs []error
for _, nsxSubnet := range nsxSubnets {
lock := r.SubnetService.LockSubnet(nsxSubnet.Path)
func() {
defer r.SubnetService.UnlockSubnet(nsxSubnet.Path, lock)

portNums := len(r.SubnetPortService.GetPortsOfSubnet(*nsxSubnet.Id))
if portNums > 0 {
hasStalePort = true
log.Info("Skipped deleting NSX Subnet due to stale ports", "nsxSubnet", *nsxSubnet.Id)
return
}

if deleteBindingMaps {
if err := r.BindingService.DeleteSubnetConnectionBindingMapsByParentSubnet(nsxSubnet); err != nil {
deleteErr := fmt.Errorf("failed to delete NSX SubnetConnectionBindingMaps connected to NSX Subnet/%s: %+v", *nsxSubnet.Id, err)
deleteErrs = append(deleteErrs, deleteErr)
log.Error(deleteErr, "Skipping to next Subnet")
return
}
}
if !r.SubnetPortService.IsEmptySubnet(*nsxSubnet.Path) {
hasStalePort = true
log.Info("Skipped deleting NSX Subnet due to stale ports", "nsxSubnet", *nsxSubnet.Id)
continue
}

if err := r.SubnetService.DeleteSubnet(*nsxSubnet); err != nil {
deleteErr := fmt.Errorf("failed to delete NSX Subnet/%s: %+v", *nsxSubnet.Id, err)
if deleteBindingMaps {
if err = r.BindingService.DeleteSubnetConnectionBindingMapsByParentSubnet(nsxSubnet); err != nil {
deleteErr := fmt.Errorf("failed to delete NSX SubnetConnectionBindingMaps connected to NSX Subnet/%s: %+v", *nsxSubnet.Id, err)
deleteErrs = append(deleteErrs, deleteErr)
log.Error(deleteErr, "Skipping to next Subnet")
continue
}
}()
}

if err := r.SubnetService.DeleteSubnet(*nsxSubnet); err != nil {
deleteErr := fmt.Errorf("failed to delete NSX Subnet/%s: %+v", *nsxSubnet.Id, err)
deleteErrs = append(deleteErrs, deleteErr)
log.Error(deleteErr, "Skipping to next Subnet")
} else {
r.SubnetPortService.DeletePortCount(*nsxSubnet.Path)
}

}
if len(deleteErrs) > 0 {
err = fmt.Errorf("multiple errors occurred while deleting Subnets: %v", deleteErrs)
Expand Down
11 changes: 3 additions & 8 deletions pkg/controllers/subnetset/subnetset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func createFakeSubnetSetReconciler(objs []client.Object) *SubnetSetReconciler {
Client: nil,
NSXClient: &nsx.Client{},
},
SubnetPortStore: nil,
SubnetPortStore: &subnetport.SubnetPortStore{},
}

return &SubnetSetReconciler{
Expand Down Expand Up @@ -396,13 +396,8 @@ func TestReconcile_DeleteSubnetSet(t *testing.T) {
return nil
})

patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "GetPortsOfSubnet", func(_ *subnetport.SubnetPortService, _ string) (ports []*model.VpcSubnetPort) {
id := "fake-subnetport-0"
return []*model.VpcSubnetPort{
{
Id: &id,
},
}
patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "IsEmptySubnet", func(_ *subnetport.SubnetPortService, _ string) bool {
return false
})
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
Expand Down
20 changes: 9 additions & 11 deletions pkg/mock/services_mock.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package mock

import (
"sync"

"github.com/stretchr/testify/mock"
"github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -81,26 +79,26 @@ func (m *MockSubnetServiceProvider) GenerateSubnetNSTags(obj client.Object) []mo
return []model.Tag{}
}

func (m *MockSubnetServiceProvider) LockSubnet(path *string) *sync.RWMutex {
return nil
type MockSubnetPortServiceProvider struct {
mock.Mock
}

func (m *MockSubnetServiceProvider) UnlockSubnet(path *string, lock *sync.RWMutex) {
func (m *MockSubnetPortServiceProvider) GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort) {
return
}

func (m *MockSubnetServiceProvider) RLockSubnet(path *string) *sync.RWMutex {
return nil
func (m *MockSubnetPortServiceProvider) AddPortToSubnet(path string, size int) bool {
return true
}

func (m *MockSubnetServiceProvider) RUnlockSubnet(path *string, lock *sync.RWMutex) {
func (m *MockSubnetPortServiceProvider) RemovePortFromSubnet(path string) {
return
}

type MockSubnetPortServiceProvider struct {
mock.Mock
func (m *MockSubnetPortServiceProvider) IsEmptySubnet(path string) bool {
return true
}

func (m *MockSubnetPortServiceProvider) GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort) {
func (m *MockSubnetPortServiceProvider) DeletePortCount(path string) {
return
}
9 changes: 4 additions & 5 deletions pkg/nsx/services/common/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package common

import (
"context"
"sync"

"github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -31,14 +30,14 @@ type SubnetServiceProvider interface {
GetSubnetsByIndex(key, value string) []*model.VpcSubnet
CreateOrUpdateSubnet(obj client.Object, vpcInfo VPCResourceInfo, tags []model.Tag) (string, error)
GenerateSubnetNSTags(obj client.Object) []model.Tag
LockSubnet(path *string) *sync.RWMutex
UnlockSubnet(path *string, lock *sync.RWMutex)
RLockSubnet(path *string) *sync.RWMutex
RUnlockSubnet(path *string, lock *sync.RWMutex)
}

// SubnetPortServiceProvider is the set of SubnetPort service operations that
// other packages consume through an interface: listing the ports of a Subnet
// and maintaining per-Subnet port accounting keyed by the Subnet path.
type SubnetPortServiceProvider interface {
// GetPortsOfSubnet returns the SubnetPorts for the NSX Subnet identified by
// nsxSubnetID.
GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort)
// AddPortToSubnet tries to account for one more port on the Subnet at path
// and reports whether that succeeded; callers treat false as "no IP left in
// this Subnet".
// NOTE(review): size looks like the usable port capacity of the Subnet
// (callers pass totalIP-4, or 0 right after creating a Subnet) — confirm the
// exact meaning of 0 against the implementation.
AddPortToSubnet(path string, size int) bool
// RemovePortFromSubnet undoes a previous AddPortToSubnet for the Subnet at
// path; callers invoke it when SubnetPort creation fails after the port was
// already counted.
RemovePortFromSubnet(path string)
// IsEmptySubnet reports whether the Subnet at path has no ports; callers
// skip deleting a Subnet when this returns false.
IsEmptySubnet(path string) bool
// DeletePortCount drops the port accounting kept for the Subnet at path;
// callers invoke it after the Subnet itself has been deleted successfully.
DeletePortCount(path string)
}

type NodeServiceReader interface {
Expand Down
Loading

0 comments on commit f78aabf

Please sign in to comment.