Make sure controller and tests shut down cleanly #215

Open · wants to merge 4 commits into base: main
12 changes: 9 additions & 3 deletions controllers/leveltriggered/caching.go
@@ -44,7 +44,8 @@ type caches struct {
 	runner *runner
 }
 
-func newCaches(events chan event.GenericEvent, targetScheme *runtime.Scheme) *caches {
+func newCaches(targetScheme *runtime.Scheme) *caches {
+	events := make(chan event.GenericEvent)
 	return &caches{
 		targetScheme: targetScheme,
 		events:       events,
@@ -53,12 +54,16 @@ func newCaches(events chan event.GenericEvent, targetScheme *runtime.Scheme) *ca
 	}
 }
 
+func (c *caches) appEvents() <-chan event.GenericEvent {
+	return c.events
+}
+
 func (c *caches) setupWithManager(mgr ctrl.Manager) error {
 	c.localClusterConfig = mgr.GetConfig()
 	c.baseLogger = mgr.GetLogger().WithValues("component", "target-cache")
 	c.reader = mgr.GetClient() // this specifically gets the client that has the indexing installed below; i.e., these are coupled.
 
-	c.runner = newRunner()
+	c.runner = newRunner(mgr.GetLogger().WithValues("component", "cache-runner"))
 	if err := mgr.Add(c.runner); err != nil {
 		return err
 	}
@@ -188,7 +193,7 @@ func (c *caches) watchTargetAndGetReader(ctx context.Context, clusterObject *clu
 		return nil, false, err
 	}
 
-	cancel := c.runner.run(func(ctx context.Context) {
+	cancel := c.runner.run("cache-"+cacheKey.String(), func(ctx context.Context) {
 		if err := ca.Start(ctx); err != nil {
 			logger.Error(err, "cache exited with error")
 		}
@@ -368,6 +373,7 @@ func (gc *gc) loop() {
 	for {
 		item, shutdown := gc.queue.Get()
 		if shutdown {
+			gc.log.Info("exiting cache GC loop")
 			return
 		}
 		key, ok := item.(clusterAndGVK)
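
The GC loop above follows the standard client-go workqueue contract: `Get` blocks until an item is available and reports `shutdown == true` once the queue has been shut down, which is what lets the goroutine exit cleanly. A minimal sketch of that consumer pattern, using a hypothetical `drain` helper (the real loop additionally decodes `clusterAndGVK` keys and tears down caches):

```go
package main

import "k8s.io/client-go/util/workqueue"

// drain consumes items until the queue is shut down. Get blocks while the
// queue is empty and returns shutdown=true after ShutDown() is called.
func drain(q workqueue.Interface) {
	for {
		item, shutdown := q.Get()
		if shutdown {
			return // mirrors the "exiting cache GC loop" branch above
		}
		// ... process item ...
		// Done tells the queue we finished with the item, so it can be re-added.
		q.Done(item)
	}
}
```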
10 changes: 2 additions & 8 deletions controllers/leveltriggered/controller.go
@@ -16,7 +16,6 @@ import (
 	"k8s.io/client-go/tools/record"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/source"
@@ -34,13 +33,9 @@ type PipelineReconciler struct {
 	caches   *caches
 	recorder record.EventRecorder
 	stratReg strategy.StrategyRegistry
-
-	appEvents chan event.GenericEvent
 }
 
 func NewPipelineReconciler(c client.Client, s *runtime.Scheme, controllerName string, eventRecorder record.EventRecorder, stratReg strategy.StrategyRegistry) *PipelineReconciler {
-	appEvents := make(chan event.GenericEvent)
-
 	// this is empty because we're going to use unstructured.Unstructured objects to support arbitrary types.
 	// If something changed and we wanted typed objects, this scheme would need to have those registered.
 	targetScheme := runtime.NewScheme()
@@ -51,8 +46,7 @@ func NewPipelineReconciler(c client.Client, s *runtime.Scheme, controllerName st
 		recorder:       eventRecorder,
 		ControllerName: controllerName,
 		stratReg:       stratReg,
-		caches:         newCaches(appEvents, targetScheme),
-		appEvents:      appEvents,
+		caches:         newCaches(targetScheme),
 	}
 	return pc
 }
@@ -444,7 +438,7 @@ func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error {
 			&clusterctrlv1alpha1.GitopsCluster{},
 			handler.EnqueueRequestsFromMapFunc(r.requestsForCluster(gitopsClusterIndexKey)),
 		).
-		WatchesRawSource(&source.Channel{Source: r.appEvents}, &handler.EnqueueRequestForObject{}).
+		WatchesRawSource(&source.Channel{Source: r.caches.appEvents()}, &handler.EnqueueRequestForObject{}).
 		Complete(r)
 }

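For context on the `WatchesRawSource` change: `source.Channel` adapts arbitrary events into reconcile requests, and `handler.EnqueueRequestForObject` enqueues the name/namespace of whatever object the event carries. A hedged sketch of the producing side (the `notify` helper is hypothetical; only the object's name and namespace matter to the handler):

```go
package main

import (
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"sigs.k8s.io/controller-runtime/pkg/event"
)

// notify triggers a reconcile of the named object by sending a GenericEvent
// into the channel that SetupWithManager watches via source.Channel.
func notify(events chan<- event.GenericEvent, namespace, name string) {
	obj := &unstructured.Unstructured{}
	obj.SetNamespace(namespace)
	obj.SetName(name)
	events <- event.GenericEvent{Object: obj}
}
```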
1 change: 1 addition & 0 deletions controllers/leveltriggered/controller_remote_test.go
@@ -33,6 +33,7 @@ func TestRemoteTargets(t *testing.T) {
 	if err != nil {
 		t.Error("starting leaf test env failed", err)
 	}
+	envsToStop = append(envsToStop, leafEnv)
 
 	user, err := leafEnv.ControlPlane.AddUser(envtest.User{
 		Name: "leaf-admin",
53 changes: 39 additions & 14 deletions controllers/leveltriggered/runner.go
@@ -1,47 +1,60 @@
 package leveltriggered
 
-import "context"
+import (
+	"context"
+	"github.com/go-logr/logr"
+	"sync"
+)
 
-// This is a dead simple way to run things using a manager's context as a base, so that they will
-// get shut down when the manager does. It must be constructed with `newRunner`, and added to a manager:
+// ctrl.Manager makes sure everything that is `Add`ed is started with a context, and cancels the context
+// to signal shutdown. But: all the runnables added share the same context, so they all get shut down at
+// the same time.
 //
-//	r := newRunner()
+// `runner` is a dead simple way to run things with their own context, using a manager's context as a
+// base, so that they will get shut down when the manager does _and_ you can shut them down individually.
+// It must be constructed with `newRunner`, and added to a manager:
+//
+//	r := newRunner(logger)
 //	mgr.Add(r)
 //
 // then you can use it to run funcs:
 //
-//	cancel := r.run(func(context.Context))
+//	cancel := r.run(string, func(context.Context))
 //
-// The func will be run with its own context derived from the root context supplied by the manager,
+// The func will be run with its own context, derived from the root context supplied by the manager,
 // with the cancel func returned to the caller as shown. This way you can cancel the context yourself,
 // or let it be canceled when the manager shuts down.
 //
 // It'll deadlock if you call `run` before adding it to a manager (or otherwise calling `Start`).
 
 type runWithContext struct {
-	ctx context.Context
-	run func(context.Context)
+	name string
+	ctx  context.Context
+	do   func(context.Context)
 }
 
 type runner struct {
+	log         logr.Logger
 	rootContext context.Context
 	tostart     chan runWithContext
 	ready       chan struct{}
 }
 
-func newRunner() *runner {
+func newRunner(log logr.Logger) *runner {
 	return &runner{
+		log:     log,
 		tostart: make(chan runWithContext),
 		ready:   make(chan struct{}),
 	}
 }
 
-func (r *runner) run(fn func(ctx context.Context)) context.CancelFunc {
+func (r *runner) run(name string, fn func(ctx context.Context)) context.CancelFunc {
 	<-r.ready // wait until there's a root context
 	ctx, cancel := context.WithCancel(r.rootContext)
 	r.tostart <- runWithContext{
-		run: fn,
-		ctx: ctx,
+		name: name,
+		do:   fn,
+		ctx:  ctx,
 	}
 	return cancel
 }
@@ -51,12 +64,24 @@ func (r *runner) run(fn func(ctx context.Context)) context.CancelFunc {
 func (r *runner) Start(ctx context.Context) error {
 	r.rootContext = ctx
 	close(r.ready) // broadcast that things can be run
+	var wg sync.WaitGroup
+loop:
 	for {
 		select {
 		case randc := <-r.tostart:
-			go randc.run(randc.ctx)
+			r.log.Info("starting child", "name", randc.name)
+			wg.Add(1)
+			go func(rc runWithContext) {
+				defer wg.Done()
+				rc.do(rc.ctx)
+				r.log.Info("child exited", "name", rc.name)
+			}(randc)
 		case <-r.rootContext.Done():
-			return nil
+			break loop
 		}
 	}
+	r.log.Info("Stopping and waiting for children")
+	wg.Wait()
+	r.log.Info("All children stopped; runner exit")
+	return nil
 }
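
Taken together, the intended call pattern looks roughly like this (a sketch, assuming the `newRunner` and `run` signatures above; `wireRunner` and the `"demo"` child are illustrative, not part of the PR). Since `run` blocks until the manager has started the runner, it must be called from code that executes after startup, such as a reconciler:

```go
package leveltriggered

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
)

// wireRunner shows the intended usage: add the runner to the manager, then
// launch named children that can be canceled individually.
func wireRunner(mgr ctrl.Manager) error {
	r := newRunner(mgr.GetLogger().WithValues("component", "demo-runner"))
	if err := mgr.Add(r); err != nil { // the manager calls r.Start and cancels it on shutdown
		return err
	}

	go func() {
		// Blocks until the manager has started the runner, then starts the child.
		cancel := r.run("demo", func(ctx context.Context) {
			<-ctx.Done() // stand-in for real work that honors ctx
		})
		_ = cancel // call cancel() later to stop just this child
	}()
	return nil
}
```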
18 changes: 15 additions & 3 deletions controllers/leveltriggered/suite_test.go
@@ -34,6 +34,8 @@ var kubeConfig []byte
 var eventRecorder *testEventRecorder
 var pipelineReconciler *PipelineReconciler
 
+var envsToStop []*envtest.Environment
+
 type testEvent struct {
 	object    runtime.Object
 	eventType string
@@ -100,6 +102,7 @@ func TestMain(m *testing.M) {
 	if err != nil {
 		log.Fatalf("starting test env failed: %s", err)
 	}
+	envsToStop = append(envsToStop, testEnv)
 
 	user, err := testEnv.ControlPlane.AddUser(envtest.User{
 		Name: "envtest-admin",
@@ -176,11 +179,20 @@
 
 	cancel()
 	wg.Wait()
+	log.Println("manager exited")
 
-	err = testEnv.Stop()
-	if err != nil {
-		log.Fatalf("stoping test env failed: %s", err)
+	var failedToStopEnvs bool
+	for _, env := range envsToStop {
+		err = env.Stop()
+		if err != nil {
+			failedToStopEnvs = true
+			log.Printf("stopping test env failed: %s\n", err)
+		}
 	}
+	if failedToStopEnvs {
+		log.Fatalf("failed to stop all test envs")
+	}
 
+	log.Println("test envs stopped")
 	os.Exit(retCode)
 }
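
A note on the design: collecting every `envtest.Environment` into `envsToStop` and deferring the `log.Fatalf` until after the loop means one environment failing to stop no longer prevents the others from being shut down; the process still exits non-zero on failure, but without leaking control planes.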
2 changes: 1 addition & 1 deletion controllers/suite_test.go
@@ -163,7 +163,7 @@ func TestMain(m *testing.M) {
 
 	err = testEnv.Stop()
 	if err != nil {
-		log.Fatalf("stoping test env failed: %s", err)
+		log.Fatalf("stopping test env failed: %s", err)
 	}
 
 	os.Exit(retCode)