Skip to content

Commit

Permalink
Support async create VM
Browse files Browse the repository at this point in the history
This patch updates the CreateOrUpdateVirtualMachine implementation
to be asynchronous, i.e. non-blocking. This also removes the need
for "create threads," as the moment a create operation is enqueued,
the reconcile thread is returned to the pool.
  • Loading branch information
akutz committed Oct 28, 2024
1 parent ce8d39c commit 67dbe35
Show file tree
Hide file tree
Showing 10 changed files with 426 additions and 235 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func requeueDelay(ctx *pkgctx.VirtualMachineContext) time.Duration {
return pkgcfg.FromContext(ctx).CreateVMRequeueDelay
}

if ctx.VM.Status.PowerState == vmopv1.VirtualMachinePowerStateOn {
if ctx.VM.Status.PowerState == "" || ctx.VM.Status.PowerState == vmopv1.VirtualMachinePowerStateOn {
networkSpec := ctx.VM.Spec.Network
if networkSpec != nil && !networkSpec.Disabled {
networkStatus := ctx.VM.Status.Network
Expand Down Expand Up @@ -439,11 +439,18 @@ func (r *Reconciler) ReconcileNormal(ctx *pkgctx.VirtualMachineContext) (reterr
// Upgrade schema fields where needed
upgradeSchema(ctx)

err := r.VMProvider.CreateOrUpdateVirtualMachine(ctx, ctx.VM)
chanErr, err := r.VMProvider.CreateOrUpdateVirtualMachine(ctx, ctx.VM)

switch {
case ctxop.IsCreate(ctx):
r.Recorder.EmitEvent(ctx.VM, "Create", err, false)
if err != nil {
r.Recorder.EmitEvent(ctx.VM, "Create", err, false)
} else if chanErr != nil {
go func(obj client.Object) {
err := <-chanErr
r.Recorder.EmitEvent(obj, "Create", err, false)
}(ctx.VM.DeepCopy())
}
case ctxop.IsUpdate(ctx):
r.Recorder.EmitEvent(ctx.VM, "Update", err, false)
case err != nil:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ func intgTestsReconcile() {

BeforeEach(func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
return nil
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error) {
chanErr := make(chan error)
close(chanErr)
return chanErr, nil
}
intgFakeVMProvider.Unlock()
})
Expand Down Expand Up @@ -137,10 +139,12 @@ func intgTestsReconcile() {

By("Set InstanceUUID in CreateOrUpdateVM", func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error) {
chanErr := make(chan error)
close(chanErr)
// Just using InstanceUUID here for a field to update.
vm.Status.InstanceUUID = dummyInstanceUUID
return nil
return chanErr, nil
}
intgFakeVMProvider.Unlock()
})
Expand All @@ -156,10 +160,11 @@ func intgTestsReconcile() {

By("Set Created condition in CreateOrUpdateVM", func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.PowerState = vmopv1.VirtualMachinePowerStateOn
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error) {
chanErr := make(chan error)
close(chanErr)
conditions.MarkTrue(vm, vmopv1.VirtualMachineConditionCreated)
return nil
return chanErr, nil
}
intgFakeVMProvider.Unlock()
})
Expand All @@ -175,11 +180,13 @@ func intgTestsReconcile() {

By("Set IP address in CreateOrUpdateVM", func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error) {
chanErr := make(chan error)
close(chanErr)
vm.Status.Network = &vmopv1.VirtualMachineNetworkStatus{
PrimaryIP4: dummyIPAddress,
}
return nil
return chanErr, nil
}
intgFakeVMProvider.Unlock()
})
Expand Down Expand Up @@ -214,10 +221,13 @@ func intgTestsReconcile() {

BeforeEach(func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error) {
chanErr := make(chan error)
close(chanErr)
vm.Status.InstanceUUID = instanceUUID
vm.Status.BiosUUID = biosUUID
return nil

return chanErr, nil
}
intgFakeVMProvider.Unlock()
})
Expand Down Expand Up @@ -262,10 +272,12 @@ func intgTestsReconcile() {

It("Reconciles after VirtualMachineClass change", func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error) {
chanErr := make(chan error)
close(chanErr)
// Set this so requeueDelay() returns 0.
conditions.MarkTrue(vm, vmopv1.VirtualMachineConditionCreated)
return nil
return chanErr, nil
}
intgFakeVMProvider.Unlock()

Expand All @@ -284,9 +296,11 @@ func intgTestsReconcile() {
instanceUUID := uuid.NewString()

intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error) {
chanErr := make(chan error)
close(chanErr)
vm.Status.InstanceUUID = instanceUUID
return nil
return chanErr, nil
}
intgFakeVMProvider.Unlock()

Expand Down Expand Up @@ -400,9 +414,11 @@ var _ = Describe(
provider.VSphereClientFn = func(ctx context.Context) (*vsclient.Client, error) {
return vsclient.NewClient(ctx, vcSimCtx.VCClientConfig)
}
provider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, obj *vmopv1.VirtualMachine) error {
provider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error) {
chanErr := make(chan error)
close(chanErr)
atomic.AddInt32(&numCreateOrUpdateCalls, 1)
return nil
return chanErr, nil
}

vcSimCtx = builder.NewIntegrationTestContextForVCSim(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package virtualmachine_test
import (
"context"
"errors"
"fmt"
"strings"

. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -160,8 +159,10 @@ func unitTestsReconcile() {
Context("ProberManager", func() {

It("Should call add to Prober Manager if ReconcileNormal fails", func() {
fakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
return errors.New(providerError)
fakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error) {
chanErr := make(chan error)
close(chanErr)
return chanErr, errors.New(providerError)
}

err := reconciler.ReconcileNormal(vmCtx)
Expand All @@ -180,48 +181,58 @@ func unitTestsReconcile() {
})

It("Should emit a CreateSuccess event if ReconcileNormal causes a successful VM creation", func() {
fakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
fakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error) {
chanErr := make(chan error)
close(chanErr)
ctxop.MarkCreate(ctx)
return nil
return chanErr, nil
}

Expect(reconciler.ReconcileNormal(vmCtx)).Should(Succeed())
expectEvents(ctx, "CreateSuccess")
})

It("Should emit CreateFailure event if ReconcileNormal causes a failed VM create", func() {
fakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
fakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error) {
chanErr := make(chan error)
close(chanErr)
ctxop.MarkCreate(ctx)
return fmt.Errorf("fake")
return chanErr, errors.New("fake")
}

Expect(reconciler.ReconcileNormal(vmCtx)).ShouldNot(Succeed())
expectEvents(ctx, "CreateFailure")
})

It("Should emit UpdateSuccess event if ReconcileNormal causes a successful VM update", func() {
fakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
fakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error) {
chanErr := make(chan error)
close(chanErr)
ctxop.MarkUpdate(ctx)
return nil
return chanErr, nil
}

Expect(reconciler.ReconcileNormal(vmCtx)).Should(Succeed())
expectEvents(ctx, "UpdateSuccess")
})

It("Should emit UpdateFailure event if ReconcileNormal causes a failed VM update", func() {
fakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
fakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error) {
chanErr := make(chan error)
close(chanErr)
ctxop.MarkUpdate(ctx)
return fmt.Errorf("fake")
return chanErr, errors.New("fake")
}

Expect(reconciler.ReconcileNormal(vmCtx)).ShouldNot(Succeed())
expectEvents(ctx, "UpdateFailure")
})

It("Should emit ReconcileNormalFailure if ReconcileNormal fails for neither create or update op", func() {
fakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
return fmt.Errorf("fake")
fakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error) {
chanErr := make(chan error)
close(chanErr)
return chanErr, errors.New("fake")
}

Expect(reconciler.ReconcileNormal(vmCtx)).ShouldNot(Succeed())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,16 @@ func intgTestsReconcile() {

BeforeEach(func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error) {
chanErr := make(chan error)
close(chanErr)

// Used below just to check for something in the Status is updated.
vm.Status.InstanceUUID = dummyInstanceUUID
return nil

return chanErr, nil
}

intgFakeVMProvider.Unlock()
})

Expand Down
6 changes: 3 additions & 3 deletions pkg/providers/fake/fake_vm_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// expected to evolve as more tests get added in the future.

type funcs struct {
CreateOrUpdateVirtualMachineFn func(ctx context.Context, vm *vmopv1.VirtualMachine) error
CreateOrUpdateVirtualMachineFn func(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error)
DeleteVirtualMachineFn func(ctx context.Context, vm *vmopv1.VirtualMachine) error
PublishVirtualMachineFn func(ctx context.Context, vm *vmopv1.VirtualMachine,
vmPub *vmopv1.VirtualMachinePublishRequest, cl *imgregv1a1.ContentLibrary, actID string) (string, error)
Expand Down Expand Up @@ -82,14 +82,14 @@ func (s *VMProvider) Reset() {
s.isPublishVMCalled = false
}

func (s *VMProvider) CreateOrUpdateVirtualMachine(ctx context.Context, vm *vmopv1.VirtualMachine) error {
func (s *VMProvider) CreateOrUpdateVirtualMachine(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error) {
s.Lock()
defer s.Unlock()
if s.CreateOrUpdateVirtualMachineFn != nil {
return s.CreateOrUpdateVirtualMachineFn(ctx, vm)
}
s.addToVMMap(vm)
return nil
return nil, nil
}

func (s *VMProvider) DeleteVirtualMachine(ctx context.Context, vm *vmopv1.VirtualMachine) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/providers/vm_provider_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

// VirtualMachineProviderInterface is a pluggable interface for VM Providers.
type VirtualMachineProviderInterface interface {
CreateOrUpdateVirtualMachine(ctx context.Context, vm *vmopv1.VirtualMachine) error
CreateOrUpdateVirtualMachine(ctx context.Context, vm *vmopv1.VirtualMachine) (<-chan error, error)
DeleteVirtualMachine(ctx context.Context, vm *vmopv1.VirtualMachine) error
PublishVirtualMachine(ctx context.Context, vm *vmopv1.VirtualMachine,
vmPub *vmopv1.VirtualMachinePublishRequest, cl *imgregv1a1.ContentLibrary, actID string) (string, error)
Expand Down
Loading

0 comments on commit 67dbe35

Please sign in to comment.