Skip to content

Commit

Permalink
Update endpoints for NEGs from non-default subnet.
Browse files Browse the repository at this point in the history
  • Loading branch information
sawsa307 committed Nov 7, 2024
1 parent 2214c3c commit 9f71175
Show file tree
Hide file tree
Showing 2 changed files with 255 additions and 18 deletions.
42 changes: 25 additions & 17 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,19 +559,15 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
}

if operation == attachOp {
// TODO(sawsa307): Pass in subnet to help distinguish which NEGs needs
// update(in default/non-default subnets).
go s.attachNetworkEndpoints(zone, batch)
go s.attachNetworkEndpoints(endpointGroupInfo, batch)
}
if operation == detachOp {
if zone == migrationZone.Zone && subnet == migrationZone.Subnet {
// Prevent any further migration-detachments from starting while one
// is already in progress.
s.dsMigrator.Pause()
}
// TODO(sawsa307): Pass in subnet to help distinguish which NEGs needs
// update(in default/non-default subnets).
go s.detachNetworkEndpoints(zone, batch, zone == migrationZone.Zone && subnet == migrationZone.Subnet)
go s.detachNetworkEndpoints(endpointGroupInfo, batch, zone == migrationZone.Zone && subnet == migrationZone.Subnet)
}
}
return nil
Expand All @@ -588,18 +584,18 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
}

// attachNetworkEndpoints runs operation for attaching network endpoints.
func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
err := s.operationInternal(attachOp, zone, networkEndpointMap, s.logger)
func (s *transactionSyncer) attachNetworkEndpoints(epGroupInfo negtypes.EndpointGroupInfo, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", epGroupInfo.Zone, "subnet", epGroupInfo.Subnet)
err := s.operationInternal(attachOp, epGroupInfo, networkEndpointMap, s.logger)

// WARNING: commitTransaction must be called at last for analyzing the operation result
s.commitTransaction(err, networkEndpointMap)
}

// detachNetworkEndpoints runs operation for detaching network endpoints.
func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, hasMigrationDetachments bool) {
s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
err := s.operationInternal(detachOp, zone, networkEndpointMap, s.logger)
func (s *transactionSyncer) detachNetworkEndpoints(epGroupInfo negtypes.EndpointGroupInfo, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, hasMigrationDetachments bool) {
s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", epGroupInfo.Zone, "subnet", epGroupInfo.Subnet)
err := s.operationInternal(detachOp, epGroupInfo, networkEndpointMap, s.logger)

if hasMigrationDetachments {
// Unpause the migration since the ongoing migration-detachments have
Expand All @@ -614,26 +610,38 @@ func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointM
// operationInternal executes NEG API call and commits the transactions
// It will record events when operations are completed
// If error occurs or any transaction entry requires reconciliation, it will trigger resync
func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, logger klog.Logger) error {
func (s *transactionSyncer) operationInternal(operation transactionOp, epGroupInfo negtypes.EndpointGroupInfo, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, logger klog.Logger) error {
var err error
start := time.Now()
networkEndpoints := []*composite.NetworkEndpoint{}
for _, ne := range networkEndpointMap {
networkEndpoints = append(networkEndpoints, ne)
}
zone := epGroupInfo.Zone
negName := s.NegSyncerKey.NegName
if flags.F.EnableMultiSubnetClusterPhase1 {
defaultSubnet, err := utils.KeyName(s.networkInfo.SubnetworkURL)
if err != nil {
s.logger.Error(err, "Errored getting default subnet from NetworkInfo when commiting pods")
return err
}

if epGroupInfo.Subnet != defaultSubnet {
negName = s.namer.NonDefaultSubnetNEG(s.NegSyncerKey.Namespace, s.NegSyncerKey.Name, epGroupInfo.Subnet, s.NegSyncerKey.PortTuple.Port)
}
}
if operation == attachOp {
err = s.cloud.AttachNetworkEndpoints(s.NegSyncerKey.NegName, zone, networkEndpoints, s.NegSyncerKey.GetAPIVersion(), logger)
err = s.cloud.AttachNetworkEndpoints(negName, zone, networkEndpoints, s.NegSyncerKey.GetAPIVersion(), logger)
}
if operation == detachOp {
err = s.cloud.DetachNetworkEndpoints(s.NegSyncerKey.NegName, zone, networkEndpoints, s.NegSyncerKey.GetAPIVersion(), logger)
err = s.cloud.DetachNetworkEndpoints(negName, zone, networkEndpoints, s.NegSyncerKey.GetAPIVersion(), logger)
}

if err == nil {
s.recordEvent(apiv1.EventTypeNormal, operation.String(), fmt.Sprintf("%s %d network endpoint(s) (NEG %q in zone %q)", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone))
s.recordEvent(apiv1.EventTypeNormal, operation.String(), fmt.Sprintf("%s %d network endpoint(s) (NEG %q in zone %q)", operation.String(), len(networkEndpointMap), negName, zone))
s.syncMetricsCollector.UpdateSyncerStatusInMetrics(s.NegSyncerKey, nil, s.inErrorState())
} else {
s.recordEvent(apiv1.EventTypeWarning, operation.String()+"Failed", fmt.Sprintf("Failed to %s %d network endpoint(s) (NEG %q in zone %q): %v", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone, err))
s.recordEvent(apiv1.EventTypeWarning, operation.String()+"Failed", fmt.Sprintf("Failed to %s %d network endpoint(s) (NEG %q in zone %q): %v", operation.String(), len(networkEndpointMap), negName, zone, err))
err := checkEndpointBatchErr(err, operation)
syncErr := negtypes.ClassifyError(err)
// If the API call fails for invalid endpoint update request in any goroutine,
Expand Down
Loading

0 comments on commit 9f71175

Please sign in to comment.