diff --git a/pkg/providers/vsphere/vmprovider_vm.go b/pkg/providers/vsphere/vmprovider_vm.go index dacf0e960..a161add02 100644 --- a/pkg/providers/vsphere/vmprovider_vm.go +++ b/pkg/providers/vsphere/vmprovider_vm.go @@ -93,22 +93,21 @@ func (vs *vSphereVMProvider) CreateOrUpdateVirtualMachine( ctx context.Context, vm *vmopv1.VirtualMachine) error { - return vs.createOrUpdateVirtualMachine(ctx, vm, nil) + _, err := vs.createOrUpdateVirtualMachine(ctx, vm, false) + return err } func (vs *vSphereVMProvider) CreateOrUpdateVirtualMachineAsync( ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error) { - chanErr := make(chan error) - err := vs.createOrUpdateVirtualMachine(ctx, vm, chanErr) - return chanErr, err + return vs.createOrUpdateVirtualMachine(ctx, vm, true) } func (vs *vSphereVMProvider) createOrUpdateVirtualMachine( ctx context.Context, vm *vmopv1.VirtualMachine, - chanErr chan error) error { + async bool) (chan error, error) { vmCtx := pkgctx.VirtualMachineContext{ Context: context.WithValue( @@ -122,7 +121,7 @@ func (vs *vSphereVMProvider) createOrUpdateVirtualMachine( client, err := vs.getVcClient(vmCtx) if err != nil { - return err + return nil, err } // Set the VC UUID annotation on the VM before attempting creation or @@ -137,27 +136,19 @@ func (vs *vSphereVMProvider) createOrUpdateVirtualMachine( // Check to see if the VM can be found on the underlying platform. foundVM, err := vs.getVM(vmCtx, client, false) if err != nil { - return err + return nil, err } if foundVM != nil { // Mark that this is an update operation. ctxop.MarkUpdate(vmCtx) - if chanErr != nil { - defer close(chanErr) - } - return vs.updateVirtualMachine(vmCtx, foundVM, client, nil) + return nil, vs.updateVirtualMachine(vmCtx, foundVM, client, nil) } // Mark that this is a create operation. ctxop.MarkCreate(vmCtx) - createArgs, err := vs.getCreateArgs(vmCtx, client) - if err != nil { - return err - } - // Do not allow more than N create threads/goroutines. // // - In blocking create mode, this ensures there are reconciler threads @@ -165,26 +156,34 @@ func (vs *vSphereVMProvider) createOrUpdateVirtualMachine( // // - In non-blocking create mode, this ensures the number of goroutines // spawned to create VMs does not take up too much memory. - allowed, createDeferFn := vs.vmCreateConcurrentAllowed(vmCtx) + allowed, decrementConcurrentCreatesFn := vs.vmCreateConcurrentAllowed(vmCtx) if !allowed { - return providers.ErrTooManyCreates + return nil, providers.ErrTooManyCreates } - if chanErr == nil { + // cleanupFn tracks the function(s) that must be invoked upon leaving this + // function during a blocking create or after an async create. + cleanupFn := decrementConcurrentCreatesFn + + if !async { + defer cleanupFn() vmCtx.Logger.V(4).Info("Doing a blocking create") - defer createDeferFn() + createArgs, err := vs.getCreateArgs(vmCtx, client) + if err != nil { + return nil, err + } newVM, err := vs.createVirtualMachine(vmCtx, client, createArgs) if err != nil { - return err + return nil, err } if newVM != nil { // If the create actually occurred, fall-through to an update // post-reconfigure. - return vs.createdVirtualMachineFallthroughUpdate( + return nil, vs.createdVirtualMachineFallthroughUpdate( vmCtx, newVM, client, @@ -201,12 +200,24 @@ func (vs *vSphereVMProvider) createOrUpdateVirtualMachine( // // However, we need to make sure we decrement the number of concurrent // creates before returning. - createDeferFn() - return providers.ErrDuplicateCreate + decrementConcurrentCreatesFn() + return nil, providers.ErrDuplicateCreate } vmCtx.Logger.V(4).Info("Doing a non-blocking create") + // Update the cleanup function to include indicating a concurrent create is + // no longer occurring. + cleanupFn = func() { + currentlyCreating.Delete(vmCtx.VM.NamespacedName()) + decrementConcurrentCreatesFn() + } + + createArgs, err := vs.getCreateArgs(vmCtx, client) + if err != nil { + return nil, err + } + // Create a copy of the context and replace its VM with a copy to // ensure modifications in the goroutine below are not impacted or // impact the operations above us in the call stack. @@ -214,16 +225,17 @@ func (vs *vSphereVMProvider) createOrUpdateVirtualMachine( copyOfCtx.VM = vmCtx.VM.DeepCopy() // Start a goroutine to create the VM in the background. + chanErr := make(chan error) go vs.createVirtualMachineAsync( copyOfCtx, client, createArgs, chanErr, - createDeferFn) + cleanupFn) - // Return with no error. The VM will be re-enqueued once the create + // Return with the error channel. The VM will be re-enqueued once the create // completes with success or failure. - return nil + return chanErr, nil } func (vs *vSphereVMProvider) DeleteVirtualMachine( @@ -549,12 +561,11 @@ func (vs *vSphereVMProvider) createVirtualMachineAsync( vcClient *vcclient.Client, args *VMCreateArgs, chanErr chan error, - createDeferFn func()) { + cleanupFn func()) { defer func() { close(chanErr) - createDeferFn() - currentlyCreating.Delete(ctx.VM.NamespacedName()) + cleanupFn() }() moRef, vimErr := vmlifecycle.CreateVirtualMachine( @@ -565,8 +576,8 @@ func (vs *vSphereVMProvider) createVirtualMachineAsync( &args.CreateArgs) if vimErr != nil { - chanErr <- vimErr ctx.Logger.Error(vimErr, "CreateVirtualMachine failed") + chanErr <- vimErr } _, k8sErr := controllerutil.CreateOrPatch( @@ -592,8 +603,8 @@ func (vs *vSphereVMProvider) createVirtualMachineAsync( ) if k8sErr != nil { - chanErr <- k8sErr ctx.Logger.Error(k8sErr, "Failed to patch VM status after create") + chanErr <- k8sErr } }