Skip to content

Commit

Permalink
Support async create VM
Browse files Browse the repository at this point in the history
This patch introduces an async variant of the provider's
`CreateOrUpdateVirtualMachine` function. The new function,
`CreateOrUpdateVirtualMachineAsync`, returns `(<-chan error, error)`
and is used when a non-blocking create is desired.

Please note, support for async create is only enabled when the
async signal feature is enabled. This is because the new async
create workflow no longer:

- Falls into a post-create reconfigure, instead allowing async
  signal to enqueue a new reconcile when the create has completed.

- Requeues the VM after N amount of time based on the VM's state,
  e.g., when the VM is powered on and does not yet have an IP address.
  This also now relies on async signal when the feature is enabled.

Finally, while a non-blocking create does mean the reconciler
threads are no longer consumed by create operations, it does not
mean VM Op will allow unbounded, concurrent creates. Because each
non-blocking create operation consumes a goroutine, the number of
concurrent create operations is still limited. The limit is the same
as the number of threads previously allowed to do create operations.
The difference is, with async create disabled, if 16 threads are
doing creates, that is 16 threads that cannot do anything else. With
async create enabled, if 16 goroutines are doing creates, there are
still 16 reconciler threads available to do other things.
  • Loading branch information
akutz committed Oct 29, 2024
1 parent ce8d39c commit c8e1590
Show file tree
Hide file tree
Showing 11 changed files with 798 additions and 424 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,14 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
return ctrl.Result{}, err
}

// Do not requeue if async signal is enabled.
if pkgcfg.FromContext(ctx).Features.WorkloadDomainIsolation &&
!pkgcfg.FromContext(ctx).AsyncSignalDisabled {

return ctrl.Result{}, nil
}

// Requeue after N amount of time according to the state of the VM.
return ctrl.Result{RequeueAfter: requeueDelay(vmCtx)}, nil
}

Expand All @@ -338,7 +346,7 @@ func requeueDelay(ctx *pkgctx.VirtualMachineContext) time.Duration {
return pkgcfg.FromContext(ctx).CreateVMRequeueDelay
}

if ctx.VM.Status.PowerState == vmopv1.VirtualMachinePowerStateOn {
if ctx.VM.Status.PowerState == "" || ctx.VM.Status.PowerState == vmopv1.VirtualMachinePowerStateOn {
networkSpec := ctx.VM.Spec.Network
if networkSpec != nil && !networkSpec.Disabled {
networkStatus := ctx.VM.Status.Network
Expand Down Expand Up @@ -439,11 +447,46 @@ func (r *Reconciler) ReconcileNormal(ctx *pkgctx.VirtualMachineContext) (reterr
// Upgrade schema fields where needed
upgradeSchema(ctx)

err := r.VMProvider.CreateOrUpdateVirtualMachine(ctx, ctx.VM)
var (
err error
chanErr <-chan error
)

if pkgcfg.FromContext(ctx).Features.WorkloadDomainIsolation &&
!pkgcfg.FromContext(ctx).AsyncSignalDisabled {
//
// Non-blocking create
//
chanErr, err = r.VMProvider.CreateOrUpdateVirtualMachineAsync(ctx, ctx.VM)
} else {
//
// Blocking create
//
err = r.VMProvider.CreateOrUpdateVirtualMachine(ctx, ctx.VM)
}

switch {
case ctxop.IsCreate(ctx):
r.Recorder.EmitEvent(ctx.VM, "Create", err, false)
if chanErr == nil {
//
// Blocking create
//
r.Recorder.EmitEvent(ctx.VM, "Create", err, false)
} else {
//
// Non-blocking create
//
if err != nil {
// Failed before goroutine.
r.Recorder.EmitEvent(ctx.VM, "Create", err, false)
} else {
// Emit event once goroutine is complete.
go func(obj client.Object) {
err := <-chanErr
r.Recorder.EmitEvent(obj, "Create", err, false)
}(ctx.VM.DeepCopy())
}
}
case ctxop.IsUpdate(ctx):
r.Recorder.EmitEvent(ctx.VM, "Update", err, false)
case err != nil:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,13 @@ func intgTestsReconcile() {
dummyInstanceUUID := uuid.NewString()

BeforeEach(func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
return nil
},
)
})

AfterEach(func() {
Expand All @@ -136,13 +138,15 @@ func intgTestsReconcile() {
})

By("Set InstanceUUID in CreateOrUpdateVM", func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
// Just using InstanceUUID here for a field to update.
vm.Status.InstanceUUID = dummyInstanceUUID
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
// Just using InstanceUUID here for a field to update.
vm.Status.InstanceUUID = dummyInstanceUUID
return nil
},
)
})

By("VirtualMachine should have InstanceUUID set", func() {
Expand All @@ -155,13 +159,14 @@ func intgTestsReconcile() {
})

By("Set Created condition in CreateOrUpdateVM", func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.PowerState = vmopv1.VirtualMachinePowerStateOn
conditions.MarkTrue(vm, vmopv1.VirtualMachineConditionCreated)
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
conditions.MarkTrue(vm, vmopv1.VirtualMachineConditionCreated)
return nil
},
)
})

By("VirtualMachine should have Created condition set", func() {
Expand All @@ -174,14 +179,16 @@ func intgTestsReconcile() {
})

By("Set IP address in CreateOrUpdateVM", func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.Network = &vmopv1.VirtualMachineNetworkStatus{
PrimaryIP4: dummyIPAddress,
}
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.Network = &vmopv1.VirtualMachineNetworkStatus{
PrimaryIP4: dummyIPAddress,
}
return nil
},
)
})

By("VirtualMachine should have IP address set", func() {
Expand Down Expand Up @@ -213,13 +220,15 @@ func intgTestsReconcile() {
biosUUID := uuid.NewString()

BeforeEach(func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.InstanceUUID = instanceUUID
vm.Status.BiosUUID = biosUUID
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.InstanceUUID = instanceUUID
vm.Status.BiosUUID = biosUUID
return nil
},
)
})

// NOTE: mutating webhook sets the default spec.instanceUUID, but is not run in this test -
Expand Down Expand Up @@ -261,13 +270,15 @@ func intgTestsReconcile() {
})

It("Reconciles after VirtualMachineClass change", func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
// Set this so requeueDelay() returns 0.
conditions.MarkTrue(vm, vmopv1.VirtualMachineConditionCreated)
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
// Set this so requeueDelay() returns 0.
conditions.MarkTrue(vm, vmopv1.VirtualMachineConditionCreated)
return nil
},
)

Expect(ctx.Client.Create(ctx, vm)).To(Succeed())
// Wait for initial reconcile.
Expand All @@ -283,12 +294,14 @@ func intgTestsReconcile() {

instanceUUID := uuid.NewString()

intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.InstanceUUID = instanceUUID
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.InstanceUUID = instanceUUID
return nil
},
)

vmClass := builder.DummyVirtualMachineClass(vm.Spec.ClassName)
vmClass.Namespace = vm.Namespace
Expand Down Expand Up @@ -390,6 +403,7 @@ var _ = Describe(
ctx = pkgcfg.UpdateContext(
ctx,
func(config *pkgcfg.Config) {
config.AsyncSignalDisabled = false
config.Features.WorkloadDomainIsolation = true
},
)
Expand All @@ -400,10 +414,14 @@ var _ = Describe(
provider.VSphereClientFn = func(ctx context.Context) (*vsclient.Client, error) {
return vsclient.NewClient(ctx, vcSimCtx.VCClientConfig)
}
provider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, obj *vmopv1.VirtualMachine) error {
atomic.AddInt32(&numCreateOrUpdateCalls, 1)
return nil
}
providerfake.SetCreateOrUpdateFunction(
ctx,
provider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
atomic.AddInt32(&numCreateOrUpdateCalls, 1)
return nil
},
)

vcSimCtx = builder.NewIntegrationTestContextForVCSim(
ctx,
Expand Down
Loading

0 comments on commit c8e1590

Please sign in to comment.