diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go index b3d744bd77..e345801c9d 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -46,6 +46,7 @@ const ( NodeKindWorkflow NodeKind = "workflow" // Either an inline workflow or a remote workflow definition NodeKindGate NodeKind = "gate" // A Gate node with a condition NodeKindArray NodeKind = "array" // An array node with a subtask Node + NodeKindFailure NodeKind = "failure" // A failure node with a subtask Node NodeKindStart NodeKind = "start" // Start node is a special node NodeKindEnd NodeKind = "end" ) diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go index 2421ddf9bb..e0541ed765 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go @@ -92,6 +92,7 @@ func buildFlyteWorkflowSpec(wf *core.CompiledWorkflow, tasks []*core.CompiledTas return nil, errs } failureN = nodes[0] + // failureN.Kind = v1alpha1.NodeKindFailure } nodes, _ := buildNodes(wf.Template.GetNodes(), tasks, errs.NewScope()) diff --git a/flytepropeller/pkg/compiler/workflow_compiler.go b/flytepropeller/pkg/compiler/workflow_compiler.go index 07fc7bf938..25e2ebf437 100644 --- a/flytepropeller/pkg/compiler/workflow_compiler.go +++ b/flytepropeller/pkg/compiler/workflow_compiler.go @@ -187,8 +187,11 @@ func (w workflowBuilder) ValidateWorkflow(fg *flyteWorkflow, errs errors.Compile return nil, !errs.HasErrors() } - checkpoint := make([]*core.Node, 0, len(fg.Template.Nodes)) + checkpoint := make([]*core.Node, 0, len(fg.Template.Nodes)) // nodes in the workflow + failure node checkpoint = append(checkpoint, fg.Template.Nodes...) + //if fg.Template.FailureNode != nil { + // checkpoint = append(checkpoint, fg.Template.FailureNode) + //} fg.Template.Nodes = make([]*core.Node, 0, len(fg.Template.Nodes)) wf.GetCoreWorkflow().Connections = &core.ConnectionSet{ Downstream: make(map[string]*core.ConnectionSet_IdList), diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index bd96edcd5f..e2f606541d 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -266,6 +266,7 @@ func (c *recursiveNodeExecutor) handleDownstream(ctx context.Context, execContex Kind: core.ExecutionError_SYSTEM, }), nil } + logger.Debugf(ctx, "Downstream nodes [%v]", downstreamNodes) if len(downstreamNodes) == 0 { logger.Debugf(ctx, "No downstream nodes found. Complete.") return interfaces.NodeStatusComplete, nil