Skip to content

Commit

Permalink
Refactor chanErr creation in async create
Browse files Browse the repository at this point in the history
This patch changes the way the error channel is created during the
async create workflow to make this more explicit and reduces the
chances someone does not close the channel.
  • Loading branch information
Bryan Venteicher authored and akutz committed Dec 19, 2024
1 parent 9d40af0 commit e24f48e
Showing 1 changed file with 45 additions and 35 deletions.
80 changes: 45 additions & 35 deletions pkg/providers/vsphere/vmprovider_vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,21 @@ func (vs *vSphereVMProvider) CreateOrUpdateVirtualMachine(
ctx context.Context,
vm *vmopv1.VirtualMachine) error {

return vs.createOrUpdateVirtualMachine(ctx, vm, nil)
_, err := vs.createOrUpdateVirtualMachine(ctx, vm, false)
return err
}

func (vs *vSphereVMProvider) CreateOrUpdateVirtualMachineAsync(
ctx context.Context,
vm *vmopv1.VirtualMachine) (<-chan error, error) {

chanErr := make(chan error)
err := vs.createOrUpdateVirtualMachine(ctx, vm, chanErr)
return chanErr, err
return vs.createOrUpdateVirtualMachine(ctx, vm, true)
}

func (vs *vSphereVMProvider) createOrUpdateVirtualMachine(
ctx context.Context,
vm *vmopv1.VirtualMachine,
chanErr chan error) error {
async bool) (chan error, error) {

vmCtx := pkgctx.VirtualMachineContext{
Context: context.WithValue(
Expand All @@ -122,7 +121,7 @@ func (vs *vSphereVMProvider) createOrUpdateVirtualMachine(

client, err := vs.getVcClient(vmCtx)
if err != nil {
return err
return nil, err
}

// Set the VC UUID annotation on the VM before attempting creation or
Expand All @@ -137,93 +136,105 @@ func (vs *vSphereVMProvider) createOrUpdateVirtualMachine(
// Check to see if the VM can be found on the underlying platform.
foundVM, err := vs.getVM(vmCtx, client, false)
if err != nil {
return err
return nil, err
}

if foundVM != nil {
// Mark that this is an update operation.
ctxop.MarkUpdate(vmCtx)

if chanErr != nil {
defer close(chanErr)
}
return vs.updateVirtualMachine(vmCtx, foundVM, client, nil)
return nil, vs.updateVirtualMachine(vmCtx, foundVM, client, nil)
}

// Mark that this is a create operation.
ctxop.MarkCreate(vmCtx)

createArgs, err := vs.getCreateArgs(vmCtx, client)
if err != nil {
return err
}

// Do not allow more than N create threads/goroutines.
//
// - In blocking create mode, this ensures there are reconciler threads
// available to reconcile non-create operations.
//
// - In non-blocking create mode, this ensures the number of goroutines
// spawned to create VMs does not take up too much memory.
allowed, createDeferFn := vs.vmCreateConcurrentAllowed(vmCtx)
allowed, decrementConcurrentCreatesFn := vs.vmCreateConcurrentAllowed(vmCtx)
if !allowed {
return providers.ErrTooManyCreates
return nil, providers.ErrTooManyCreates
}

if chanErr == nil {
// cleanupFn tracks the function(s) that must be invoked upon leaving this
// function during a blocking create or after an async create.
cleanupFn := decrementConcurrentCreatesFn

if !async {
defer cleanupFn()

vmCtx.Logger.V(4).Info("Doing a blocking create")

defer createDeferFn()
createArgs, err := vs.getCreateArgs(vmCtx, client)
if err != nil {
return nil, err
}

newVM, err := vs.createVirtualMachine(vmCtx, client, createArgs)
if err != nil {
return err
return nil, err
}

if newVM != nil {
// If the create actually occurred, fall-through to an update
// post-reconfigure.
return vs.createdVirtualMachineFallthroughUpdate(
return nil, vs.createdVirtualMachineFallthroughUpdate(
vmCtx,
newVM,
client,
createArgs)
}
}

if _, ok := currentlyCreating.LoadOrStore(
vm.NamespacedName(),
struct{}{}); ok {
vmNamespacedName := vm.NamespacedName()

if _, ok := currentlyCreating.LoadOrStore(vmNamespacedName, struct{}{}); ok {
// If the VM is already being created in a goroutine, then there is no
// need to create it again.
//
// However, we need to make sure we decrement the number of concurrent
// creates before returning.
createDeferFn()
return providers.ErrDuplicateCreate
decrementConcurrentCreatesFn()
return nil, providers.ErrDuplicateCreate
}

vmCtx.Logger.V(4).Info("Doing a non-blocking create")

// Update the cleanup function to include indicating a concurrent create is
// no longer occurring.
cleanupFn = func() {
currentlyCreating.Delete(vmNamespacedName)
decrementConcurrentCreatesFn()
}

createArgs, err := vs.getCreateArgs(vmCtx, client)
if err != nil {
return nil, err
}

// Create a copy of the context and replace its VM with a copy to
// ensure modifications in the goroutine below are not impacted or
// impact the operations above us in the call stack.
copyOfCtx := vmCtx
copyOfCtx.VM = vmCtx.VM.DeepCopy()

// Start a goroutine to create the VM in the background.
chanErr := make(chan error)
go vs.createVirtualMachineAsync(
copyOfCtx,
client,
createArgs,
chanErr,
createDeferFn)
cleanupFn)

// Return with no error. The VM will be re-enqueued once the create
// Return with the error channel. The VM will be re-enqueued once the create
// completes with success or failure.
return nil
return chanErr, nil
}

func (vs *vSphereVMProvider) DeleteVirtualMachine(
Expand Down Expand Up @@ -549,12 +560,11 @@ func (vs *vSphereVMProvider) createVirtualMachineAsync(
vcClient *vcclient.Client,
args *VMCreateArgs,
chanErr chan error,
createDeferFn func()) {
cleanupFn func()) {

defer func() {
close(chanErr)
createDeferFn()
currentlyCreating.Delete(ctx.VM.NamespacedName())
cleanupFn()
}()

moRef, vimErr := vmlifecycle.CreateVirtualMachine(
Expand All @@ -565,8 +575,8 @@ func (vs *vSphereVMProvider) createVirtualMachineAsync(
&args.CreateArgs)

if vimErr != nil {
chanErr <- vimErr
ctx.Logger.Error(vimErr, "CreateVirtualMachine failed")
chanErr <- vimErr
}

_, k8sErr := controllerutil.CreateOrPatch(
Expand All @@ -592,8 +602,8 @@ func (vs *vSphereVMProvider) createVirtualMachineAsync(
)

if k8sErr != nil {
chanErr <- k8sErr
ctx.Logger.Error(k8sErr, "Failed to patch VM status after create")
chanErr <- k8sErr
}
}

Expand Down

0 comments on commit e24f48e

Please sign in to comment.