From e0a1afa91d8e51ba2c6aed6c604f2a69bdb1b387 Mon Sep 17 00:00:00 2001 From: Julie Vogelman Date: Wed, 15 Jun 2022 22:24:30 -0700 Subject: [PATCH] fix: sync cluster Workflow Template Informer before it's used (#8961) * fix: After Controller leader election we need to make sure clusterWorkflowTemplateInformer's cache is updated first. Otherwise, we call Lister() on it before it's ready. Signed-off-by: Julie Vogelman * fix: empty commit Signed-off-by: Julie Vogelman * fix: empty commit Signed-off-by: Julie Vogelman * fix: fix condition for exiting function Signed-off-by: Julie Vogelman * feat: for efficiency move the call to create cluster workflow template informer before we sync Signed-off-by: Julie Vogelman * feat: empty commit Signed-off-by: Julie Vogelman * feat: empty commit Signed-off-by: Julie Vogelman * fix: additional log line for error just in case Signed-off-by: Julie Vogelman * fix: additional error handling Signed-off-by: Julie Vogelman * fix: revert previous change - nil is valid Signed-off-by: Julie Vogelman * fix: for safety add verification that pointer isn't nil before use Signed-off-by: Julie Vogelman --- workflow/controller/controller.go | 11 +++++++++-- workflow/controller/operator.go | 18 ++++++++++++------ 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 88801c0b7528..df063d6c400b 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -263,6 +263,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo go wfc.configMapInformer.Run(ctx.Done()) go wfc.wfTaskSetInformer.Informer().Run(ctx.Done()) go wfc.taskResultInformer.Run(ctx.Done()) + wfc.createClusterWorkflowTemplateInformer(ctx) // Wait for all involved caches to be synced, before processing items from the queue is started if !cache.WaitForCacheSync( @@ -277,8 +278,6 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo log.Fatal("Timed out waiting for caches to sync") } - wfc.createClusterWorkflowTemplateInformer(ctx) - // Start the metrics server go wfc.metrics.RunServer(ctx) @@ -420,6 +419,14 @@ func (wfc *WorkflowController) createClusterWorkflowTemplateInformer(ctx context if cwftGetAllowed && cwftListAllowed && cwftWatchAllowed { wfc.cwftmplInformer = informer.NewTolerantClusterWorkflowTemplateInformer(wfc.dynamicInterface, clusterWorkflowTemplateResyncPeriod) go wfc.cwftmplInformer.Informer().Run(ctx.Done()) + + // since the above call is asynchronous, make sure we populate our cache before we try to use it later + if !cache.WaitForCacheSync( + ctx.Done(), + wfc.cwftmplInformer.Informer().HasSynced, + ) { + log.Fatal("Timed out waiting for ClusterWorkflowTemplate cache to sync") + } } else { log.Warnf("Controller doesn't have RBAC access for ClusterWorkflowTemplates") } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 41cd6d2adaf7..37ebe2d8b831 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1151,7 +1151,11 @@ func printPodSpecLog(pod *apiv1.Pod, wfName string) { // and returns the new node status if something changed func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus) *wfv1.NodeStatus { new := old.DeepCopy() - tmpl := woc.GetNodeTemplate(old) + tmpl, err := woc.GetNodeTemplate(old) + if err != nil { + woc.log.Error(err) + return nil + } switch pod.Status.Phase { case apiv1.PodPending: new.Phase = wfv1.NodePending @@ -1162,7 +1166,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus new.Daemoned = nil case apiv1.PodFailed: // ignore pod failure for daemoned steps - if tmpl.IsDaemon() { + if tmpl != nil && tmpl.IsDaemon() { new.Phase = wfv1.NodeSucceeded } else { new.Phase, new.Message = woc.inferFailedReason(pod, tmpl) @@ -1172,7 +1176,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus // Daemons are a special case we need to understand the rules: // A node transitions into "daemoned" only if it's a daemon template and it becomes running AND ready. // A node will be unmarked "daemoned" when its boundary node completes, anywhere killDaemonedChildren is called. - if tmpl.IsDaemon() { + if tmpl != nil && tmpl.IsDaemon() { if !old.Fulfilled() { // pod is running and template is marked daemon. check if everything is ready for _, ctrStatus := range pod.Status.ContainerStatuses { @@ -2083,19 +2087,21 @@ func (woc *wfOperationCtx) hasDaemonNodes() bool { return false } -func (woc *wfOperationCtx) GetNodeTemplate(node *wfv1.NodeStatus) *wfv1.Template { +func (woc *wfOperationCtx) GetNodeTemplate(node *wfv1.NodeStatus) (*wfv1.Template, error) { if node.TemplateRef != nil { tmplCtx, err := woc.createTemplateContext(node.GetTemplateScope()) if err != nil { woc.markNodeError(node.Name, err) + return nil, err } tmpl, err := tmplCtx.GetTemplateFromRef(node.TemplateRef) if err != nil { woc.markNodeError(node.Name, err) + return tmpl, err } - return tmpl + return tmpl, nil } - return woc.wf.GetTemplateByName(node.TemplateName) + return woc.wf.GetTemplateByName(node.TemplateName), nil } func (woc *wfOperationCtx) markWorkflowRunning(ctx context.Context) {