From ac515abb34ff0923b58807e2047ca782b7b680b1 Mon Sep 17 00:00:00 2001 From: Yanjun Zhou Date: Fri, 20 Dec 2024 11:01:37 +0800 Subject: [PATCH] Refactor Subnet lock for SubnetPort creation Signed-off-by: Yanjun Zhou --- pkg/controllers/common/utils.go | 10 ++- pkg/controllers/pod/pod_controller.go | 11 +-- pkg/controllers/subnet/subnet_controller.go | 1 + .../subnetport/subnetport_controller.go | 22 ++++-- .../subnetset/subnetset_controller.go | 40 +++++----- .../subnetset/subnetset_controller_test.go | 11 +-- pkg/mock/services_mock.go | 20 +++-- pkg/nsx/services/common/services.go | 9 +-- pkg/nsx/services/subnet/store.go | 38 ---------- pkg/nsx/services/subnet/subnet.go | 34 --------- pkg/nsx/services/subnetport/store.go | 75 +++++++++++++++++++ pkg/nsx/services/subnetport/subnetport.go | 73 ++++++++++++++---- 12 files changed, 201 insertions(+), 143 deletions(-) diff --git a/pkg/controllers/common/utils.go b/pkg/controllers/common/utils.go index f9a1a93a0..66fef6889 100644 --- a/pkg/controllers/common/utils.go +++ b/pkg/controllers/common/utils.go @@ -33,7 +33,6 @@ func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servi defer unlockSubnetSet(subnetSet.GetUID(), subnetSetLock) subnetList := subnetService.GetSubnetsByIndex(servicecommon.TagScopeSubnetSetCRUID, string(subnetSet.GetUID())) for _, nsxSubnet := range subnetList { - portNums := len(subnetPortService.GetPortsOfSubnet(*nsxSubnet.Id)) totalIP := int(*nsxSubnet.Ipv4SubnetSize) if len(nsxSubnet.IpAddresses) > 0 { // totalIP will be overrided if IpAddresses are specified. @@ -41,7 +40,7 @@ func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servi } // NSX reserves 4 ip addresses in each subnet for network address, gateway address, // dhcp server address and broadcast address. - if portNums < totalIP-4 { + if subnetPortService.AddPortToSubnet(*nsxSubnet.Path, totalIP-4) { return *nsxSubnet.Path, nil } } @@ -56,7 +55,12 @@ func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servi log.Error(err, "Failed to allocate Subnet") return "", err } - return subnetService.CreateOrUpdateSubnet(subnetSet, vpcInfoList[0], tags) + path, err := subnetService.CreateOrUpdateSubnet(subnetSet, vpcInfoList[0], tags) + if err != nil { + return path, err + } + subnetPortService.AddPortToSubnet(path, 0) + return path, nil } func getSharedNamespaceForNamespace(client k8sclient.Client, ctx context.Context, namespaceName string) (string, error) { diff --git a/pkg/controllers/pod/pod_controller.go b/pkg/controllers/pod/pod_controller.go index 3ce5035f8..8ecac36ee 100644 --- a/pkg/controllers/pod/pod_controller.go +++ b/pkg/controllers/pod/pod_controller.go @@ -84,20 +84,21 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R if err != nil { // The error at the very beginning of the operator startup is expected because at that time the node may be not cached yet. We can expect the retry to become normal. log.Error(err, "failed to get node ID for pod", "pod.Name", req.NamespacedName, "pod.UID", pod.UID, "node", pod.Spec.NodeName) + r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath) return common.ResultRequeue, err } contextID := *node.UniqueId - // There is a race condition that the subnetset controller may delete the - // subnet during CollectGarbage. So check the subnet under lock. - lock := r.SubnetService.RLockSubnet(&nsxSubnetPath) - defer r.SubnetService.RUnlockSubnet(&nsxSubnetPath, lock) - nsxSubnet, err := r.SubnetService.GetSubnetByPath(nsxSubnetPath) if err != nil { + r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath) return common.ResultRequeue, err } _, err = r.SubnetPortService.CreateOrUpdateSubnetPort(pod, nsxSubnet, contextID, &pod.ObjectMeta.Labels) if err != nil { + // Remove SubnetPort from Subnet if the SubnetPort is not saved to store + if subnetport.IsNotCreatedError(err) { + r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath) + } r.StatusUpdater.UpdateFail(ctx, pod, err, "", nil) return common.ResultRequeue, err } diff --git a/pkg/controllers/subnet/subnet_controller.go b/pkg/controllers/subnet/subnet_controller.go index b4fe1b0fa..5d21896b9 100644 --- a/pkg/controllers/subnet/subnet_controller.go +++ b/pkg/controllers/subnet/subnet_controller.go @@ -194,6 +194,7 @@ func (r *SubnetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet) error { log.Error(err, "Failed to delete Subnet", "ID", *nsxSubnet.Id) return err } + r.SubnetPortService.DeletePortCount(*nsxSubnet.Path) log.Info("Successfully deleted Subnet", "ID", *nsxSubnet.Id) } log.Info("Successfully cleaned Subnets", "subnetCount", len(nsxSubnets)) diff --git a/pkg/controllers/subnetport/subnetport_controller.go b/pkg/controllers/subnetport/subnetport_controller.go index 80cac49be..19cdc8f82 100644 --- a/pkg/controllers/subnetport/subnetport_controller.go +++ b/pkg/controllers/subnetport/subnetport_controller.go @@ -106,21 +106,23 @@ func (r *SubnetPortReconciler) Reconcile(ctx context.Context, req ctrl.Request) } labels, err := r.getLabelsFromVirtualMachine(ctx, subnetPort) if err != nil { + r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath) r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "Failed to get labels from VirtualMachine", setSubnetPortReadyStatusFalse, r.SubnetPortService) return common.ResultRequeue, err } - // There is a race condition that the subnetset controller may delete the - // subnet during CollectGarbage. So check the subnet under lock. - lock := r.SubnetService.RLockSubnet(&nsxSubnetPath) - defer r.SubnetService.RUnlockSubnet(&nsxSubnetPath, lock) nsxSubnet, err := r.SubnetService.GetSubnetByPath(nsxSubnetPath) if err != nil { + r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath) r.StatusUpdater.UpdateFail(ctx, subnetPort, err, fmt.Sprintf("Failed to get Subnet by path: %s", nsxSubnetPath), setSubnetPortReadyStatusFalse, r.SubnetPortService) return common.ResultRequeue, err } nsxSubnetPortState, err := r.SubnetPortService.CreateOrUpdateSubnetPort(subnetPort, nsxSubnet, "", labels) if err != nil { + // Remove SubnetPort from Subnet if the SubnetPort is not saved to store + if subnetport.IsNotCreatedError(err) { + r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath) + } r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "", setSubnetPortReadyStatusFalse, r.SubnetPortService) return common.ResultRequeue, err } @@ -469,7 +471,17 @@ func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Co log.Error(err, "failed to get NSX subnet by subnet CR UID", "subnetList", subnetList) return } - subnetPath = *subnetList[0].Path + nsxSubnet := subnetList[0] + totalIP := int(*nsxSubnet.Ipv4SubnetSize) + if len(nsxSubnet.IpAddresses) > 0 { + // totalIP will be overrided if IpAddresses are specified. + totalIP, _ = util.CalculateIPFromCIDRs(nsxSubnet.IpAddresses) + } + subnetPath = *nsxSubnet.Path + if !r.SubnetPortService.AddPortToSubnet(*nsxSubnet.Path, totalIP-4) { + err = fmt.Errorf("no valid IP in Subnet %s", subnetPath) + return + } } else if len(subnetPort.Spec.SubnetSet) > 0 { subnetSet := &v1alpha1.SubnetSet{} namespacedName := types.NamespacedName{ diff --git a/pkg/controllers/subnetset/subnetset_controller.go b/pkg/controllers/subnetset/subnetset_controller.go index 3ae5dec4a..08a61f1cf 100644 --- a/pkg/controllers/subnetset/subnetset_controller.go +++ b/pkg/controllers/subnetset/subnetset_controller.go @@ -375,32 +375,30 @@ func (r *SubnetSetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet, delet } var deleteErrs []error for _, nsxSubnet := range nsxSubnets { - lock := r.SubnetService.LockSubnet(nsxSubnet.Path) - func() { - defer r.SubnetService.UnlockSubnet(nsxSubnet.Path, lock) - - portNums := len(r.SubnetPortService.GetPortsOfSubnet(*nsxSubnet.Id)) - if portNums > 0 { - hasStalePort = true - log.Info("Skipped deleting NSX Subnet due to stale ports", "nsxSubnet", *nsxSubnet.Id) - return - } - if deleteBindingMaps { - if err := r.BindingService.DeleteSubnetConnectionBindingMapsByParentSubnet(nsxSubnet); err != nil { - deleteErr := fmt.Errorf("failed to delete NSX SubnetConnectionBindingMaps connected to NSX Subnet/%s: %+v", *nsxSubnet.Id, err) - deleteErrs = append(deleteErrs, deleteErr) - log.Error(deleteErr, "Skipping to next Subnet") - return - } - } + if !r.SubnetPortService.IsEmptySubnet(*nsxSubnet.Path) { + hasStalePort = true + log.Info("Skipped deleting NSX Subnet due to stale ports", "nsxSubnet", *nsxSubnet.Id) + continue + } - if err := r.SubnetService.DeleteSubnet(*nsxSubnet); err != nil { - deleteErr := fmt.Errorf("failed to delete NSX Subnet/%s: %+v", *nsxSubnet.Id, err) + if deleteBindingMaps { + if err = r.BindingService.DeleteSubnetConnectionBindingMapsByParentSubnet(nsxSubnet); err != nil { + deleteErr := fmt.Errorf("failed to delete NSX SubnetConnectionBindingMaps connected to NSX Subnet/%s: %+v", *nsxSubnet.Id, err) deleteErrs = append(deleteErrs, deleteErr) log.Error(deleteErr, "Skipping to next Subnet") + continue } - }() + } + + if err := r.SubnetService.DeleteSubnet(*nsxSubnet); err != nil { + deleteErr := fmt.Errorf("failed to delete NSX Subnet/%s: %+v", *nsxSubnet.Id, err) + deleteErrs = append(deleteErrs, deleteErr) + log.Error(deleteErr, "Skipping to next Subnet") + } else { + r.SubnetPortService.DeletePortCount(*nsxSubnet.Path) + } + } if len(deleteErrs) > 0 { err = fmt.Errorf("multiple errors occurred while deleting Subnets: %v", deleteErrs) diff --git a/pkg/controllers/subnetset/subnetset_controller_test.go b/pkg/controllers/subnetset/subnetset_controller_test.go index ca70a2f42..2f9433811 100644 --- a/pkg/controllers/subnetset/subnetset_controller_test.go +++ b/pkg/controllers/subnetset/subnetset_controller_test.go @@ -119,7 +119,7 @@ func createFakeSubnetSetReconciler(objs []client.Object) *SubnetSetReconciler { Client: nil, NSXClient: &nsx.Client{}, }, - SubnetPortStore: nil, + SubnetPortStore: &subnetport.SubnetPortStore{}, } return &SubnetSetReconciler{ @@ -396,13 +396,8 @@ func TestReconcile_DeleteSubnetSet(t *testing.T) { return nil }) - patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "GetPortsOfSubnet", func(_ *subnetport.SubnetPortService, _ string) (ports []*model.VpcSubnetPort) { - id := "fake-subnetport-0" - return []*model.VpcSubnetPort{ - { - Id: &id, - }, - } + patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "IsEmptySubnet", func(_ *subnetport.SubnetPortService, _ string) bool { + return false }) patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error { return nil diff --git a/pkg/mock/services_mock.go b/pkg/mock/services_mock.go index 963e34f0a..9f40bb6ed 100644 --- a/pkg/mock/services_mock.go +++ b/pkg/mock/services_mock.go @@ -1,8 +1,6 @@ package mock import ( - "sync" - "github.com/stretchr/testify/mock" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model" "sigs.k8s.io/controller-runtime/pkg/client" @@ -81,26 +79,26 @@ func (m *MockSubnetServiceProvider) GenerateSubnetNSTags(obj client.Object) []mo return []model.Tag{} } -func (m *MockSubnetServiceProvider) LockSubnet(path *string) *sync.RWMutex { - return nil +type MockSubnetPortServiceProvider struct { + mock.Mock } -func (m *MockSubnetServiceProvider) UnlockSubnet(path *string, lock *sync.RWMutex) { +func (m *MockSubnetPortServiceProvider) GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort) { return } -func (m *MockSubnetServiceProvider) RLockSubnet(path *string) *sync.RWMutex { - return nil +func (m *MockSubnetPortServiceProvider) AddPortToSubnet(path string, size int) bool { + return true } -func (m *MockSubnetServiceProvider) RUnlockSubnet(path *string, lock *sync.RWMutex) { +func (m *MockSubnetPortServiceProvider) RemovePortFromSubnet(path string) { return } -type MockSubnetPortServiceProvider struct { - mock.Mock +func (m *MockSubnetPortServiceProvider) IsEmptySubnet(path string) bool { + return true } -func (m *MockSubnetPortServiceProvider) GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort) { +func (m *MockSubnetPortServiceProvider) DeletePortCount(path string) { return } diff --git a/pkg/nsx/services/common/services.go b/pkg/nsx/services/common/services.go index 1248720c5..f76b0c4b9 100644 --- a/pkg/nsx/services/common/services.go +++ b/pkg/nsx/services/common/services.go @@ -2,7 +2,6 @@ package common import ( "context" - "sync" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model" "sigs.k8s.io/controller-runtime/pkg/client" @@ -31,14 +30,14 @@ type SubnetServiceProvider interface { GetSubnetsByIndex(key, value string) []*model.VpcSubnet CreateOrUpdateSubnet(obj client.Object, vpcInfo VPCResourceInfo, tags []model.Tag) (string, error) GenerateSubnetNSTags(obj client.Object) []model.Tag - LockSubnet(path *string) *sync.RWMutex - UnlockSubnet(path *string, lock *sync.RWMutex) - RLockSubnet(path *string) *sync.RWMutex - RUnlockSubnet(path *string, lock *sync.RWMutex) } type SubnetPortServiceProvider interface { GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort) + AddPortToSubnet(path string, size int) bool + RemovePortFromSubnet(path string) + IsEmptySubnet(path string) bool + DeletePortCount(path string) } type NodeServiceReader interface { diff --git a/pkg/nsx/services/subnet/store.go b/pkg/nsx/services/subnet/store.go index 03dd4144d..a8ee9d230 100644 --- a/pkg/nsx/services/subnet/store.go +++ b/pkg/nsx/services/subnet/store.go @@ -2,7 +2,6 @@ package subnet import ( "errors" - "sync" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model" @@ -70,43 +69,6 @@ func subnetSetIndexFunc(obj interface{}) ([]string, error) { // SubnetStore is a store for subnet. type SubnetStore struct { common.ResourceStore - // save locks for subnet by path - pathLocks sync.Map -} - -func (subnetStore *SubnetStore) Add(i interface{}) error { - subnet := i.(*model.VpcSubnet) - if subnet.Path == nil { - log.Info("Store a subnet without path", "subnet", subnet) - return subnetStore.ResourceStore.Add(i) - } - lock := sync.RWMutex{} - subnetStore.pathLocks.LoadOrStore(*subnet.Path, &lock) - return subnetStore.ResourceStore.Add(i) -} - -func (subnetStore *SubnetStore) Delete(i interface{}) error { - subnet := i.(*model.VpcSubnet) - if subnet.Path == nil { - log.Info("Delete a subnet without path", "subnet", subnet) - return subnetStore.ResourceStore.Delete(i) - } - subnetStore.pathLocks.Delete(*subnet.Path) - return subnetStore.ResourceStore.Delete(i) -} - -func (subnetStore *SubnetStore) Lock(path string) *sync.RWMutex { - lock := sync.RWMutex{} - subnetLock, _ := subnetStore.pathLocks.LoadOrStore(path, &lock) - subnetLock.(*sync.RWMutex).Lock() - return subnetLock.(*sync.RWMutex) -} - -func (subnetStore *SubnetStore) RLock(path string) *sync.RWMutex { - lock := sync.RWMutex{} - subnetLock, _ := subnetStore.pathLocks.LoadOrStore(path, &lock) - subnetLock.(*sync.RWMutex).RLock() - return subnetLock.(*sync.RWMutex) } func (subnetStore *SubnetStore) Apply(i interface{}) error { diff --git a/pkg/nsx/services/subnet/subnet.go b/pkg/nsx/services/subnet/subnet.go index 9b5ab00e7..fb064be79 100644 --- a/pkg/nsx/services/subnet/subnet.go +++ b/pkg/nsx/services/subnet/subnet.go @@ -478,37 +478,3 @@ func (service *SubnetService) UpdateSubnetSet(ns string, vpcSubnets []*model.Vpc } return nil } - -func (service *SubnetService) LockSubnet(path *string) *sync.RWMutex { - if path != nil && *path != "" { - log.V(1).Info("Locked Subnet for writing", "path", *path) - return service.SubnetStore.Lock(*path) - } - return nil -} - -func (service *SubnetService) UnlockSubnet(path *string, lock *sync.RWMutex) { - if lock != nil { - if path != nil && *path != "" { - log.V(1).Info("Unlocked Subnet for writing", "path", *path) - } - lock.Unlock() - } -} - -func (service *SubnetService) RLockSubnet(path *string) *sync.RWMutex { - if path != nil && *path != "" { - log.V(1).Info("Locked Subnet for reading", "path", *path) - return service.SubnetStore.RLock(*path) - } - return nil -} - -func (service *SubnetService) RUnlockSubnet(path *string, lock *sync.RWMutex) { - if lock != nil { - if path != nil && *path != "" { - log.V(1).Info("Unlocked Subnet for reading", "path", *path) - } - lock.RUnlock() - } -} diff --git a/pkg/nsx/services/subnetport/store.go b/pkg/nsx/services/subnetport/store.go index 493f2f867..2283ec713 100644 --- a/pkg/nsx/services/subnetport/store.go +++ b/pkg/nsx/services/subnetport/store.go @@ -2,6 +2,7 @@ package subnetport import ( "errors" + "sync" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model" "k8s.io/apimachinery/pkg/types" @@ -88,6 +89,8 @@ func subnetPortIndexPodNamespace(obj interface{}) ([]string, error) { // SubnetPortStore is a store for SubnetPorts type SubnetPortStore struct { common.ResourceStore + portCountMap map[string]*CountWithMark + portCountMapLock sync.Mutex } func (vs *SubnetPortStore) Apply(i interface{}) error { @@ -128,3 +131,75 @@ func (subnetPortStore *SubnetPortStore) GetByIndex(key string, value string) []* } return subnetPorts } + +func (subnetPortStore *SubnetPortStore) AddPortToSubnet(path string, size int) bool { + subnetPortStore.portCountMapLock.Lock() + defer subnetPortStore.portCountMapLock.Unlock() + count, ok := subnetPortStore.portCountMap[path] + if !ok { + count = &CountWithMark{ + value: 0, + markForDelete: false, + } + } + // size 0 indicates the Subnet is empty + if !count.markForDelete && (count.value < size || size == 0) { + count.value += 1 + subnetPortStore.portCountMap[path] = count + log.V(2).Info("Add Subnetport to Subnet", "Subnet", path, "count", count.value) + return true + } + return false +} + +func (subnetPortStore *SubnetPortStore) RemovePortFromSubnet(path string) { + subnetPortStore.portCountMapLock.Lock() + defer subnetPortStore.portCountMapLock.Unlock() + count, ok := subnetPortStore.portCountMap[path] + if !ok || count.value < 1 { + log.Error(nil, "Subnet does not have Subnetport to remove", "Subnet", path) + return + } + count.value -= 1 + subnetPortStore.portCountMap[path] = count + log.V(2).Info("Remove Subnetport from Subnet", "Subnet", path, "count", count.value) +} + +func (subnetPortStore *SubnetPortStore) IsEmptySubnet(path string) bool { + subnetPortStore.portCountMapLock.Lock() + defer subnetPortStore.portCountMapLock.Unlock() + count, ok := subnetPortStore.portCountMap[path] + if !ok || count.value < 1 { + return true + } + return false +} + +func (subnetPortStore *SubnetPortStore) DeletePortCount(path string) { + subnetPortStore.portCountMapLock.Lock() + defer subnetPortStore.portCountMapLock.Unlock() + _, ok := subnetPortStore.portCountMap[path] + if ok { + delete(subnetPortStore.portCountMap, path) + } +} + +func (subnetPortStore *SubnetPortStore) InitializeSubnetPortCount() { + subnetPortStore.portCountMapLock.Lock() + defer subnetPortStore.portCountMapLock.Unlock() + for _, obj := range subnetPortStore.List() { + subnetPort, ok := obj.(*model.VpcSubnetPort) + if !ok || subnetPort.ParentPath == nil { + continue + } + count, ok := subnetPortStore.portCountMap[*subnetPort.ParentPath] + if !ok { + count = &CountWithMark{ + value: 0, + markForDelete: false, + } + } + count.value += 1 + subnetPortStore.portCountMap[*subnetPort.ParentPath] = count + } +} diff --git a/pkg/nsx/services/subnetport/subnetport.go b/pkg/nsx/services/subnetport/subnetport.go index 112188191..c5dbfeddd 100644 --- a/pkg/nsx/services/subnetport/subnetport.go +++ b/pkg/nsx/services/subnetport/subnetport.go @@ -31,11 +31,25 @@ var ( MarkedForDelete = true ) +type NotCreatedError struct { + error +} + +func IsNotCreatedError(err error) bool { + _, ok := err.(*NotCreatedError) + return ok +} + type SubnetPortService struct { servicecommon.Service SubnetPortStore *SubnetPortStore } +type CountWithMark struct { + value int + markForDelete bool +} + // InitializeSubnetPort sync NSX resources. func InitializeSubnetPort(service servicecommon.Service) (*SubnetPortService, error) { wg := sync.WaitGroup{} @@ -46,18 +60,21 @@ func InitializeSubnetPort(service servicecommon.Service) (*SubnetPortService, er subnetPortService := &SubnetPortService{Service: service} - subnetPortService.SubnetPortStore = &SubnetPortStore{ResourceStore: servicecommon.ResourceStore{ - Indexer: cache.NewIndexer( - keyFunc, - cache.Indexers{ - servicecommon.TagScopeSubnetPortCRUID: subnetPortIndexByCRUID, - servicecommon.TagScopePodUID: subnetPortIndexByPodUID, - servicecommon.TagScopeVMNamespace: subnetPortIndexNamespace, - servicecommon.TagScopeNamespace: subnetPortIndexPodNamespace, - servicecommon.IndexKeySubnetID: subnetPortIndexBySubnetID, - }), - BindingType: model.VpcSubnetPortBindingType(), - }} + subnetPortService.SubnetPortStore = &SubnetPortStore{ + ResourceStore: servicecommon.ResourceStore{ + Indexer: cache.NewIndexer( + keyFunc, + cache.Indexers{ + servicecommon.TagScopeSubnetPortCRUID: subnetPortIndexByCRUID, + servicecommon.TagScopePodUID: subnetPortIndexByPodUID, + servicecommon.TagScopeVMNamespace: subnetPortIndexNamespace, + servicecommon.TagScopeNamespace: subnetPortIndexPodNamespace, + servicecommon.IndexKeySubnetID: subnetPortIndexBySubnetID, + }), + BindingType: model.VpcSubnetPortBindingType(), + }, + portCountMap: make(map[string]*CountWithMark), + } go subnetPortService.InitializeResourceStore(&wg, fatalErrors, ResourceTypeSubnetPort, nil, subnetPortService.SubnetPortStore) @@ -68,6 +85,8 @@ func InitializeSubnetPort(service servicecommon.Service) (*SubnetPortService, er select { case <-wgDone: + // Load the Subnet Port map after the store initialization + subnetPortService.SubnetPortStore.InitializeSubnetPortCount() break case err := <-fatalErrors: close(fatalErrors) @@ -89,7 +108,7 @@ func (service *SubnetPortService) CreateOrUpdateSubnetPort(obj interface{}, nsxS nsxSubnetPort, err := service.buildSubnetPort(obj, nsxSubnet, contextID, tags) if err != nil { log.Error(err, "failed to build NSX subnet port", "nsxSubnetPort.Id", uid, "*nsxSubnet.Path", *nsxSubnet.Path, "contextID", contextID) - return nil, err + return nil, &NotCreatedError{error: err} } existingSubnetPort := service.SubnetPortStore.GetByKey(*nsxSubnetPort.Id) isChanged := true @@ -102,6 +121,9 @@ func (service *SubnetPortService) CreateOrUpdateSubnetPort(obj interface{}, nsxS } subnetInfo, err := servicecommon.ParseVPCResourcePath(*nsxSubnet.Path) if err != nil { + if existingSubnetPort == nil { + err = &NotCreatedError{error: err} + } return nil, err } if !isChanged { @@ -114,10 +136,16 @@ func (service *SubnetPortService) CreateOrUpdateSubnetPort(obj interface{}, nsxS err = nsxutil.TransNSXApiError(err) if err != nil { log.Error(err, "failed to create or update subnet port", "nsxSubnetPort.Id", *nsxSubnetPort.Id, "nsxSubnetPath", *nsxSubnet.Path) + if existingSubnetPort == nil { + err = &NotCreatedError{error: err} + } return nil, err } err = service.SubnetPortStore.Apply(nsxSubnetPort) if err != nil { + if existingSubnetPort == nil { + err = &NotCreatedError{error: err} + } return nil, err } if existingSubnetPort != nil { @@ -230,6 +258,9 @@ func (service *SubnetPortService) DeleteSubnetPort(nsxSubnetPort *model.VpcSubne return err } log.Info("successfully deleted nsxSubnetPort", "nsxSubnetPortID", *nsxSubnetPort.Id) + if nsxSubnetPort.ParentPath != nil { + service.SubnetPortStore.RemovePortFromSubnet(*nsxSubnetPort.ParentPath) + } return nil } @@ -373,3 +404,19 @@ func (service *SubnetPortService) Cleanup(ctx context.Context) error { } return nil } + +func (service *SubnetPortService) AddPortToSubnet(path string, size int) bool { + return service.SubnetPortStore.AddPortToSubnet(path, size) +} + +func (service *SubnetPortService) RemovePortFromSubnet(path string) { + service.SubnetPortStore.RemovePortFromSubnet(path) +} + +func (service *SubnetPortService) IsEmptySubnet(path string) bool { + return service.SubnetPortStore.IsEmptySubnet(path) +} + +func (service *SubnetPortService) DeletePortCount(path string) { + service.SubnetPortStore.DeletePortCount(path) +}