Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw committed Nov 15, 2023
1 parent e8b678d commit 8454aa9
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 1 deletion.
1 change: 1 addition & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
NodeKindWorkflow NodeKind = "workflow" // Either an inline workflow or a remote workflow definition
NodeKindGate NodeKind = "gate" // A Gate node with a condition
NodeKindArray NodeKind = "array" // An array node with a subtask Node
NodeKindFailure NodeKind = "failure" // A failure node with a subtask Node
NodeKindStart NodeKind = "start" // Start node is a special node
NodeKindEnd NodeKind = "end"
)
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/compiler/transformers/k8s/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func buildFlyteWorkflowSpec(wf *core.CompiledWorkflow, tasks []*core.CompiledTas
return nil, errs
}
failureN = nodes[0]
// failureN.Kind = v1alpha1.NodeKindFailure
}

nodes, _ := buildNodes(wf.Template.GetNodes(), tasks, errs.NewScope())
Expand Down
5 changes: 4 additions & 1 deletion flytepropeller/pkg/compiler/workflow_compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,11 @@ func (w workflowBuilder) ValidateWorkflow(fg *flyteWorkflow, errs errors.Compile
return nil, !errs.HasErrors()
}

checkpoint := make([]*core.Node, 0, len(fg.Template.Nodes))
checkpoint := make([]*core.Node, 0, len(fg.Template.Nodes)) // nodes in the workflow + failure node
checkpoint = append(checkpoint, fg.Template.Nodes...)
//if fg.Template.FailureNode != nil {
// checkpoint = append(checkpoint, fg.Template.FailureNode)
//}
fg.Template.Nodes = make([]*core.Node, 0, len(fg.Template.Nodes))
wf.GetCoreWorkflow().Connections = &core.ConnectionSet{
Downstream: make(map[string]*core.ConnectionSet_IdList),
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ func (c *recursiveNodeExecutor) handleDownstream(ctx context.Context, execContex
Kind: core.ExecutionError_SYSTEM,
}), nil
}
logger.Debugf(ctx, "Downstream nodes [%v]", downstreamNodes)
if len(downstreamNodes) == 0 {
logger.Debugf(ctx, "No downstream nodes found. Complete.")
return interfaces.NodeStatusComplete, nil
Expand Down

0 comments on commit 8454aa9

Please sign in to comment.