Skip to content

Commit

Permalink
Merge pull request #781 from akutz/feature/async-create-vm
Browse files Browse the repository at this point in the history
✨ Support async create VM
  • Loading branch information
akutz authored Nov 15, 2024
2 parents 2fd67f4 + 8f51f41 commit fa8791a
Show file tree
Hide file tree
Showing 18 changed files with 930 additions and 375 deletions.
104 changes: 88 additions & 16 deletions controllers/virtualmachine/virtualmachine/virtualmachine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,28 +318,48 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
return ctrl.Result{}, err
}

if err := r.ReconcileNormal(vmCtx); err != nil {
if err = r.ReconcileNormal(vmCtx); err != nil && !ignoredCreateErr(err) {
vmCtx.Logger.Error(err, "Failed to reconcile VirtualMachine")
return ctrl.Result{}, err
}

return ctrl.Result{RequeueAfter: requeueDelay(vmCtx)}, nil
// Requeue after N amount of time according to the state of the VM.
return ctrl.Result{RequeueAfter: requeueDelay(vmCtx, err)}, nil
}

// Determine if we should request a non-zero requeue delay in order to trigger a non-rate limited reconcile
// at some point in the future. Use this delay-based reconcile to trigger a specific reconcile to discovery the VM IP
// address rather than relying on the resync period to do.
// Determine if we should request a non-zero requeue delay in order to trigger a
// non-rate limited reconcile at some point in the future.
//
// TODO: It would be much preferable to determine that a non-error resync is required at the source of the determination that
// TODO: the VM IP isn't available rather than up here in the reconcile loop. However, in the interest of time, we are making
// TODO: this determination here and will have to refactor at some later date.
func requeueDelay(ctx *pkgctx.VirtualMachineContext) time.Duration {
// If the VM is in Creating phase, the reconciler has run out of threads to Create VMs on the provider. Do not queue
// immediately to avoid exponential backoff.
if !conditions.IsTrue(ctx.VM, vmopv1.VirtualMachineConditionCreated) {
// When async signal is disabled, this is used to trigger a specific reconcile
// to discovery the VM IP address rather than relying on the resync period to
// do.
//
// TODO
// It would be much preferable to determine that a non-error resync is required
// at the source of the determination that the VM IP is not available rather
// than up here in the reconcile loop. However, in the interest of time, we are
// making this determination here and will have to refactor at some later date.
func requeueDelay(
ctx *pkgctx.VirtualMachineContext,
err error) time.Duration {

// If there were too many concurrent create operations or if the VM is in
// Creating phase, the reconciler has run out of threads or goroutines to
// Create VMs on the provider. Do not queue immediately to avoid exponential
// backoff.
if ignoredCreateErr(err) ||
!conditions.IsTrue(ctx.VM, vmopv1.VirtualMachineConditionCreated) {

return pkgcfg.FromContext(ctx).CreateVMRequeueDelay
}

// Do not requeue for the IP address if async signal is enabled.
if pkgcfg.FromContext(ctx).Features.WorkloadDomainIsolation &&
!pkgcfg.FromContext(ctx).AsyncSignalDisabled {

return 0
}

if ctx.VM.Status.PowerState == vmopv1.VirtualMachinePowerStateOn {
networkSpec := ctx.VM.Spec.Network
if networkSpec != nil && !networkSpec.Disabled {
Expand Down Expand Up @@ -441,14 +461,54 @@ func (r *Reconciler) ReconcileNormal(ctx *pkgctx.VirtualMachineContext) (reterr
// Upgrade schema fields where needed
upgradeSchema(ctx)

err := r.VMProvider.CreateOrUpdateVirtualMachine(ctx, ctx.VM)
var (
err error
chanErr <-chan error
)

if pkgcfg.FromContext(ctx).Features.WorkloadDomainIsolation &&
!pkgcfg.FromContext(ctx).AsyncSignalDisabled &&
!pkgcfg.FromContext(ctx).AsyncCreateDisabled {
//
// Non-blocking create
//
chanErr, err = r.VMProvider.CreateOrUpdateVirtualMachineAsync(ctx, ctx.VM)
} else {
//
// Blocking create
//
err = r.VMProvider.CreateOrUpdateVirtualMachine(ctx, ctx.VM)
}

switch {
case ctxop.IsCreate(ctx):
r.Recorder.EmitEvent(ctx.VM, "Create", err, false)
case ctxop.IsCreate(ctx) && !ignoredCreateErr(err):

if chanErr == nil {
//
// Blocking create
//
r.Recorder.EmitEvent(ctx.VM, "Create", err, false)
} else {
//
// Non-blocking create
//
if err != nil {
// Failed before goroutine.
r.Recorder.EmitEvent(ctx.VM, "Create", err, false)
} else {
// Emit event once goroutine is complete.
go func(obj client.Object) {
err := <-chanErr
r.Recorder.EmitEvent(obj, "Create", err, false)
}(ctx.VM.DeepCopy())
}
}
case ctxop.IsUpdate(ctx):

r.Recorder.EmitEvent(ctx.VM, "Update", err, false)
case err != nil:

case err != nil && !ignoredCreateErr(err):

// Catch all event for neither create nor update op.
r.Recorder.EmitEvent(ctx.VM, "ReconcileNormal", err, true)
}
Expand Down Expand Up @@ -481,3 +541,15 @@ func getIsDefaultVMClassController(ctx context.Context) bool {
}
return false
}

// ignoredCreateErr is written this way in order to illustrate coverage more
// accurately.
func ignoredCreateErr(err error) bool {
if err == providers.ErrDuplicateCreate {
return true
}
if err == providers.ErrTooManyCreates {
return true
}
return false
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
pkgcfg "github.com/vmware-tanzu/vm-operator/pkg/config"
"github.com/vmware-tanzu/vm-operator/pkg/constants/testlabels"
pkgctx "github.com/vmware-tanzu/vm-operator/pkg/context"
"github.com/vmware-tanzu/vm-operator/pkg/providers"
providerfake "github.com/vmware-tanzu/vm-operator/pkg/providers/fake"
"github.com/vmware-tanzu/vm-operator/pkg/util/kube/cource"
"github.com/vmware-tanzu/vm-operator/pkg/util/ptr"
Expand Down Expand Up @@ -105,11 +106,13 @@ func intgTestsReconcile() {
dummyInstanceUUID := uuid.NewString()

BeforeEach(func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
return nil
},
)
})

AfterEach(func() {
Expand All @@ -128,21 +131,63 @@ func intgTestsReconcile() {
})

It("Reconciles after VirtualMachine creation", func() {
var createAttempts int32

By("Exceed number of allowed concurrent create operations", func() {
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
atomic.AddInt32(&createAttempts, 1)
return providers.ErrTooManyCreates
},
)
})

vm.Spec.Network = &vmopv1.VirtualMachineNetworkSpec{}
Expect(ctx.Client.Create(ctx, vm)).To(Succeed())

By("VirtualMachine should have finalizer added", func() {
waitForVirtualMachineFinalizer(ctx, vmKey)
})

Eventually(func(g Gomega) {
g.Expect(atomic.LoadInt32(&createAttempts)).To(BeNumerically(">=", int32(3)))
g.Expect(conditions.IsTrue(vm, vmopv1.VirtualMachineConditionCreated)).To(BeFalse())
}, "5s").Should(
Succeed(),
"waiting for reconcile to be requeued at least three times")

atomic.StoreInt32(&createAttempts, 0)

By("Causing duplicate creates", func() {
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
atomic.AddInt32(&createAttempts, 1)
return providers.ErrDuplicateCreate
},
)
})

Eventually(func(g Gomega) {
g.Expect(atomic.LoadInt32(&createAttempts)).To(BeNumerically(">=", int32(3)))
g.Expect(conditions.IsTrue(vm, vmopv1.VirtualMachineConditionCreated)).To(BeFalse())
}, "5s").Should(
Succeed(),
"waiting for reconcile to be requeued at least three times")

By("Set InstanceUUID in CreateOrUpdateVM", func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
// Just using InstanceUUID here for a field to update.
vm.Status.InstanceUUID = dummyInstanceUUID
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
// Just using InstanceUUID here for a field to update.
vm.Status.InstanceUUID = dummyInstanceUUID
return nil
},
)
})

By("VirtualMachine should have InstanceUUID set", func() {
Expand All @@ -155,13 +200,15 @@ func intgTestsReconcile() {
})

By("Set Created condition in CreateOrUpdateVM", func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.PowerState = vmopv1.VirtualMachinePowerStateOn
conditions.MarkTrue(vm, vmopv1.VirtualMachineConditionCreated)
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.PowerState = vmopv1.VirtualMachinePowerStateOn
conditions.MarkTrue(vm, vmopv1.VirtualMachineConditionCreated)
return nil
},
)
})

By("VirtualMachine should have Created condition set", func() {
Expand All @@ -174,14 +221,16 @@ func intgTestsReconcile() {
})

By("Set IP address in CreateOrUpdateVM", func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.Network = &vmopv1.VirtualMachineNetworkStatus{
PrimaryIP4: dummyIPAddress,
}
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.Network = &vmopv1.VirtualMachineNetworkStatus{
PrimaryIP4: dummyIPAddress,
}
return nil
},
)
})

By("VirtualMachine should have IP address set", func() {
Expand Down Expand Up @@ -213,13 +262,15 @@ func intgTestsReconcile() {
biosUUID := uuid.NewString()

BeforeEach(func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.InstanceUUID = instanceUUID
vm.Status.BiosUUID = biosUUID
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.InstanceUUID = instanceUUID
vm.Status.BiosUUID = biosUUID
return nil
},
)
})

// NOTE: mutating webhook sets the default spec.instanceUUID, but is not run in this test -
Expand Down Expand Up @@ -261,13 +312,15 @@ func intgTestsReconcile() {
})

It("Reconciles after VirtualMachineClass change", func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
// Set this so requeueDelay() returns 0.
conditions.MarkTrue(vm, vmopv1.VirtualMachineConditionCreated)
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
// Set this so requeueDelay() returns 0.
conditions.MarkTrue(vm, vmopv1.VirtualMachineConditionCreated)
return nil
},
)

Expect(ctx.Client.Create(ctx, vm)).To(Succeed())
// Wait for initial reconcile.
Expand All @@ -283,12 +336,14 @@ func intgTestsReconcile() {

instanceUUID := uuid.NewString()

intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.InstanceUUID = instanceUUID
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.InstanceUUID = instanceUUID
return nil
},
)

vmClass := builder.DummyVirtualMachineClass(vm.Spec.ClassName)
vmClass.Namespace = vm.Namespace
Expand Down Expand Up @@ -390,6 +445,8 @@ var _ = Describe(
ctx = pkgcfg.UpdateContext(
ctx,
func(config *pkgcfg.Config) {
config.AsyncSignalDisabled = false
config.AsyncCreateDisabled = false
config.Features.WorkloadDomainIsolation = true
},
)
Expand All @@ -400,10 +457,14 @@ var _ = Describe(
provider.VSphereClientFn = func(ctx context.Context) (*vsclient.Client, error) {
return vsclient.NewClient(ctx, vcSimCtx.VCClientConfig)
}
provider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, obj *vmopv1.VirtualMachine) error {
atomic.AddInt32(&numCreateOrUpdateCalls, 1)
return nil
}
providerfake.SetCreateOrUpdateFunction(
ctx,
provider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
atomic.AddInt32(&numCreateOrUpdateCalls, 1)
return nil
},
)

vcSimCtx = builder.NewIntegrationTestContextForVCSim(
ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ var suite = builder.NewTestSuiteForControllerWithContext(
config.SyncPeriod = 60 * time.Minute
config.CreateVMRequeueDelay = 1 * time.Second
config.PoweredOnVMHasIPRequeueDelay = 1 * time.Second
config.AsyncSignalDisabled = true
config.AsyncCreateDisabled = true
})),
virtualmachine.AddToManager,
func(ctx *pkgctx.ControllerManagerContext, _ ctrlmgr.Manager) error {
Expand Down
Loading

0 comments on commit fa8791a

Please sign in to comment.