Skip to content

Commit

Permalink
Node execution refactor for Parent-child relationship (flyteorg#178)
Browse files Browse the repository at this point in the history
* Node execution refactor for Parent-child relationship
* Test fixes
* Comments and test
  • Loading branch information
anandswaminathan authored Aug 19, 2020
1 parent a2d8a74 commit c53e1d7
Show file tree
Hide file tree
Showing 32 changed files with 1,159 additions and 81 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.6.1
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb
gomodules.xyz/jsonpatch/v2 v2.1.0 // indirect
google.golang.org/genproto v0.0.0-20200312145019-da6875a35672 // indirect
google.golang.org/grpc v1.28.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,7 @@ golang.org/x/tools v0.0.0-20200204074204-1cc6d1ef6c74/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200212150539-ea181f53ac56/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200224181240-023911ca70b2/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb h1:iKlO7ROJc6SttHKlxzwGytRtBUqX4VARrNTgP2YLX5M=
golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ type ExecutableNode interface {
GetExecutionDeadline() *time.Duration
GetActiveDeadline() *time.Duration
IsInterruptible() *bool
GetName() string
}

// Interface for the Workflow p. This is the mutable portion for a Workflow
Expand Down Expand Up @@ -432,6 +433,7 @@ type Meta interface {
GetName() string
GetServiceAccountName() string
IsInterruptible() bool
GetEventVersion() EventVersion
GetRawOutputDataConfig() RawOutputDataConfig
}

Expand Down
32 changes: 32 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNode.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/MetaExtended.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (in *NodeMetadata) DeepCopyInto(out *NodeMetadata) {

type NodeSpec struct {
ID NodeID `json:"id"`
Name string `json:"name,omitempty"`
Resources *typesv1.ResourceRequirements `json:"resources,omitempty"`
Kind NodeKind `json:"kind"`
BranchNode *BranchNodeSpec `json:"branch,omitempty"`
Expand Down Expand Up @@ -145,6 +146,10 @@ type NodeSpec struct {
Interruptibe *bool `json:"interruptible,omitempty"`
}

func (in *NodeSpec) GetName() string {
return in.Name
}

func (in *NodeSpec) GetRetryStrategy() *RetryStrategy {
return in.RetryStrategy
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type FlyteWorkflow struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
*WorkflowSpec `json:"spec"`
WorkflowMeta *WorkflowMeta `json:"workflowMeta,omitempty"`
Inputs *Inputs `json:"inputs,omitempty"`
ExecutionID ExecutionID `json:"executionId"`
Tasks map[TaskID]*TaskSpec `json:"tasks"`
Expand Down Expand Up @@ -55,6 +56,24 @@ type FlyteWorkflow struct {
DataReferenceConstructor storage.ReferenceConstructor `json:"-"`
}

func (in *FlyteWorkflow) GetEventVersion() EventVersion {
if in.WorkflowMeta != nil {
return in.WorkflowMeta.EventVersion
}
return EventVersion0
}

type WorkflowMeta struct {
EventVersion EventVersion `json:"eventVersion,omitempty"`
}

type EventVersion int

const (
EventVersion0 EventVersion = iota
EventVersion1
)

type NodeDefaults struct {
// Default behaviour for Interruptible for nodes unless explicitly set at the node level.
Interruptible bool `json:"interruptible,omitempty"`
Expand Down
11 changes: 8 additions & 3 deletions pkg/compiler/transformers/k8s/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,18 @@ func buildNodeSpec(n *core.Node, tasks []*core.CompiledTask, errs errors.Compile
}

var interruptible *bool
if n.GetMetadata() != nil && n.GetMetadata().GetInterruptibleValue() != nil {
interruptVal := n.GetMetadata().GetInterruptible()
interruptible = &interruptVal
var name string
if n.GetMetadata() != nil {
if n.GetMetadata().GetInterruptibleValue() != nil {
interruptVal := n.GetMetadata().GetInterruptible()
interruptible = &interruptVal
}
name = n.GetMetadata().Name
}

nodeSpec := &v1alpha1.NodeSpec{
ID: n.GetId(),
Name: name,
RetryStrategy: computeRetryStrategy(n, task),
ExecutionDeadline: timeout,
Resources: res,
Expand Down
46 changes: 43 additions & 3 deletions pkg/controller/executors/execution_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,70 @@ type ImmutableExecutionContext interface {
GetOnFailurePolicy() v1alpha1.WorkflowOnFailurePolicy
}

type ParentInfoGetter interface {
GetParentInfo() ImmutableParentInfo
}

type ImmutableParentInfo interface {
GetUniqueID() v1alpha1.NodeID
CurrentAttempt() uint32
}

type ExecutionContext interface {
ImmutableExecutionContext
TaskDetailsGetter
SubWorkflowGetter
ParentInfoGetter
}

type execContext struct {
ImmutableExecutionContext
TaskDetailsGetter
SubWorkflowGetter
parentInfo ImmutableParentInfo
}

func (e execContext) GetParentInfo() ImmutableParentInfo {
return e.parentInfo
}

type parentExecutionInfo struct {
uniqueID v1alpha1.NodeID
currentAttempts uint32
}

func (p *parentExecutionInfo) GetUniqueID() v1alpha1.NodeID {
return p.uniqueID
}

func (p *parentExecutionInfo) CurrentAttempt() uint32 {
return p.currentAttempts
}

func NewExecutionContextWithTasksGetter(prevExecContext ExecutionContext, taskGetter TaskDetailsGetter) ExecutionContext {
return NewExecutionContext(prevExecContext, taskGetter, prevExecContext)
return NewExecutionContext(prevExecContext, taskGetter, prevExecContext, prevExecContext.GetParentInfo())
}

func NewExecutionContextWithWorkflowGetter(prevExecContext ExecutionContext, getter SubWorkflowGetter) ExecutionContext {
return NewExecutionContext(prevExecContext, prevExecContext, getter)
return NewExecutionContext(prevExecContext, prevExecContext, getter, prevExecContext.GetParentInfo())
}

func NewExecutionContextWithParentInfo(prevExecContext ExecutionContext, parentInfo ImmutableParentInfo) ExecutionContext {
return NewExecutionContext(prevExecContext, prevExecContext, prevExecContext, parentInfo)
}

func NewExecutionContext(immExecContext ImmutableExecutionContext, tasksGetter TaskDetailsGetter, workflowGetter SubWorkflowGetter) ExecutionContext {
func NewExecutionContext(immExecContext ImmutableExecutionContext, tasksGetter TaskDetailsGetter, workflowGetter SubWorkflowGetter, parentInfo ImmutableParentInfo) ExecutionContext {
return execContext{
ImmutableExecutionContext: immExecContext,
TaskDetailsGetter: tasksGetter,
SubWorkflowGetter: workflowGetter,
parentInfo: parentInfo,
}
}

func NewParentInfo(uniqueID string, currentAttempts uint32) ImmutableParentInfo {
return &parentExecutionInfo{
currentAttempts: currentAttempts,
uniqueID: uniqueID,
}
}
18 changes: 17 additions & 1 deletion pkg/controller/executors/execution_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@ type subWfGetter struct {
SubWorkflowGetter
}

type immutableParentInfo struct {
ImmutableParentInfo
}

func TestExecutionContext(t *testing.T) {
eCtx := immExecContext{}
taskGetter := tdGetter{}
subWfGetter := subWfGetter{}
immutableParentInfo := immutableParentInfo{}

ec := NewExecutionContext(eCtx, taskGetter, subWfGetter)
ec := NewExecutionContext(eCtx, taskGetter, subWfGetter, immutableParentInfo)
assert.NotNil(t, ec)
typed := ec.(execContext)
assert.Equal(t, typed.ImmutableExecutionContext, eCtx)
assert.Equal(t, typed.SubWorkflowGetter, subWfGetter)
assert.Equal(t, typed.TaskDetailsGetter, taskGetter)
assert.Equal(t, typed.GetParentInfo(), immutableParentInfo)

taskGetter2 := tdGetter{}
NewExecutionContextWithTasksGetter(ec, taskGetter2)
Expand All @@ -37,6 +43,7 @@ func TestExecutionContext(t *testing.T) {
assert.Equal(t, typed.ImmutableExecutionContext, eCtx)
assert.Equal(t, typed.SubWorkflowGetter, subWfGetter)
assert.Equal(t, typed.TaskDetailsGetter, taskGetter2)
assert.Equal(t, typed.GetParentInfo(), immutableParentInfo)

subWfGetter2 := subWfGetter
NewExecutionContextWithWorkflowGetter(ec, subWfGetter2)
Expand All @@ -45,5 +52,14 @@ func TestExecutionContext(t *testing.T) {
assert.Equal(t, typed.ImmutableExecutionContext, eCtx)
assert.Equal(t, typed.SubWorkflowGetter, subWfGetter2)
assert.Equal(t, typed.TaskDetailsGetter, taskGetter)
assert.Equal(t, typed.GetParentInfo(), immutableParentInfo)

immutableParentInfo2 := immutableParentInfo
NewExecutionContextWithParentInfo(ec, immutableParentInfo2)
assert.NotNil(t, ec)
typed = ec.(execContext)
assert.Equal(t, typed.ImmutableExecutionContext, eCtx)
assert.Equal(t, typed.SubWorkflowGetter, subWfGetter2)
assert.Equal(t, typed.TaskDetailsGetter, taskGetter)
assert.Equal(t, typed.GetParentInfo(), immutableParentInfo2)
}
Loading

0 comments on commit c53e1d7

Please sign in to comment.