Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't send inputURI for start-node #5780

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions flyteadmin/pkg/repositories/config/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,20 +1263,28 @@
return tx.Migrator().DropTable("execution_tags")
},
},

{
ID: "2024-06-06-drop-execution_admin-tags",
Migrate: func(tx *gorm.DB) error {
return tx.Migrator().DropTable("execution_admin_tags")
},
},

{
ID: "2024-06-06-drop-admin-tags",
Migrate: func(tx *gorm.DB) error {
return tx.Migrator().DropTable("admin_tags")
},
},
{
ID: "2024-08-08-remove-input-uri-for-start-nodes",
Migrate: func(db *gorm.DB) error {
return db.Exec("UPDATE node_executions SET input_uri = '' WHERE node_id = 'start-node'").Error
},
Rollback: func(db *gorm.DB) error {

Check warning on line 1283 in flyteadmin/pkg/repositories/config/migrations.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/config/migrations.go#L1280-L1283

Added lines #L1280 - L1283 were not covered by tests
// can't rollback missing data
return nil
},

Check warning on line 1286 in flyteadmin/pkg/repositories/config/migrations.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/config/migrations.go#L1285-L1286

Added lines #L1285 - L1286 were not covered by tests
},
}

var m = append(LegacyMigrations, NoopMigrations...)
Expand Down
15 changes: 11 additions & 4 deletions flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1248,10 +1248,17 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter

targetEntity := common.GetTargetEntity(ctx, nCtx)

nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(),
p, nCtx.InputReader().GetInputPath().String(), nCtx.NodeStatus(), nCtx.ExecutionContext().GetEventVersion(),
nCtx.ExecutionContext().GetParentInfo(), nCtx.Node(), c.clusterID, nCtx.NodeStateReader().GetDynamicNodeState().Phase,
c.eventConfig, targetEntity)
nev, err := ToNodeExecutionEvent(
nCtx.NodeExecutionMetadata().GetNodeExecutionID(),
p,
nCtx.InputReader().GetInputPath().String(),
nCtx.NodeStatus(),
nCtx.ExecutionContext().GetEventVersion(),
nCtx.ExecutionContext().GetParentInfo(), nCtx.Node(),
c.clusterID,
nCtx.NodeStateReader().GetDynamicNodeState().Phase,
c.eventConfig,
targetEntity)
if err != nil {
return interfaces.NodeStatusUndefined, errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "could not convert phase info to event")
}
Expand Down
8 changes: 8 additions & 0 deletions flytepropeller/pkg/controller/nodes/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,7 @@ func TestNodeExecutor_FinalizeHandler(t *testing.T) {
assert.NoError(t, exec.FinalizeHandler(ctx, nil, nil, nl, n))
})
}

func TestNodeExecutionEventStartNode(t *testing.T) {
execID := &core.WorkflowExecutionIdentifier{
Name: "e1",
Expand Down Expand Up @@ -1763,9 +1764,11 @@ func TestNodeExecutionEventStartNode(t *testing.T) {
ns.OnGetParentTaskID().Return(tID)
ns.OnGetOutputDirMatch(mock.Anything).Return("dummy://dummyOutUrl")
ns.OnGetDynamicNodeStatus().Return(&v1alpha1.DynamicNodeStatus{})

ev, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion0, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone, &config.EventConfig{
RawOutputPolicy: config.RawOutputPolicyReference,
}, subWfID)

assert.NoError(t, err)
assert.Equal(t, "start-node", ev.Id.NodeId)
assert.Equal(t, execID, ev.Id.ExecutionId)
Expand All @@ -1778,6 +1781,7 @@ func TestNodeExecutionEventStartNode(t *testing.T) {
ev.OutputResult.(*event.NodeExecutionEvent_OutputUri).OutputUri)
assert.Equal(t, ev.ProducerId, testClusterID)
assert.Equal(t, subWfID, ev.GetTargetEntity())
assert.Nil(t, ev.InputValue)
}

func TestNodeExecutionEventV0(t *testing.T) {
Expand Down Expand Up @@ -1821,6 +1825,7 @@ func TestNodeExecutionEventV0(t *testing.T) {
assert.Empty(t, ev.NodeName)
assert.Empty(t, ev.RetryGroup)
assert.Empty(t, ev.TargetEntity)
assert.Equal(t, "reference", ev.GetInputUri())
}

func TestNodeExecutionEventV1(t *testing.T) {
Expand Down Expand Up @@ -1859,9 +1864,11 @@ func TestNodeExecutionEventV1(t *testing.T) {
ns.OnGetPhase().Return(v1alpha1.NodePhaseNotYetStarted)
nl.OnGetNodeExecutionStatusMatch(mock.Anything, id).Return(ns)
ns.OnGetParentTaskID().Return(tID)

eventOpt, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion1, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone, &config.EventConfig{
RawOutputPolicy: config.RawOutputPolicyInline,
}, nil)

assert.NoError(t, err)
assert.Equal(t, "np1-2-n1", eventOpt.Id.NodeId)
assert.Equal(t, execID, eventOpt.Id.ExecutionId)
Expand All @@ -1875,6 +1882,7 @@ func TestNodeExecutionEventV1(t *testing.T) {
assert.Equal(t, "2", eventOpt.RetryGroup)
assert.True(t, proto.Equal(eventOpt.GetInputData(), inputs))
assert.Empty(t, eventOpt.TargetEntity)
assert.Equal(t, inputs, eventOpt.GetInputData())
}

func TestNodeExecutor_RecursiveNodeHandler_ParallelismLimit(t *testing.T) {
Expand Down
30 changes: 16 additions & 14 deletions flytepropeller/pkg/controller/nodes/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ func ToNodeExecEventPhase(p handler.EPhase) core.NodeExecution_Phase {
}
}

func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier,
func ToNodeExecutionEvent(
nodeExecID *core.NodeExecutionIdentifier,
info handler.PhaseInfo,
inputPath string,
status v1alpha1.ExecutableNodeStatus,
Expand Down Expand Up @@ -109,9 +110,11 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier,
dynamicChain = true
}

eInfo := info.GetInfo()
var nev *event.NodeExecutionEvent
// Start node is special case where the Inputs and Outputs are the same and hence here we copy the Output file
// Start node is special case where the Outputs are the same and hence here we copy the Output file
// into the OutputResult and in admin we copy it over into input as well.
// Start node doesn't have inputs.
if nodeExecID.NodeId == v1alpha1.StartNodeID {
outputsFile := v1alpha1.GetOutputsFile(status.GetOutputDir())
nev = &event.NodeExecutionEvent{
Expand Down Expand Up @@ -139,6 +142,17 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier,
TargetEntity: targetEntity,
IsInDynamicChain: dynamicChain,
}
if eventConfig.RawOutputPolicy == config.RawOutputPolicyInline {
if eInfo != nil {
nev.InputValue = &event.NodeExecutionEvent_InputData{
InputData: eInfo.Inputs,
}
}
} else {
nev.InputValue = &event.NodeExecutionEvent_InputUri{
InputUri: inputPath,
}
}
}

if eventVersion == v1alpha1.EventVersion0 && status.GetParentTaskID() != nil {
Expand All @@ -163,7 +177,6 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier,
nev.NodeName = node.GetName()
}

eInfo := info.GetInfo()
if eInfo != nil {
if eInfo.WorkflowNodeInfo != nil {
v := ToNodeExecWorkflowNodeMetadata(eInfo.WorkflowNodeInfo)
Expand Down Expand Up @@ -201,17 +214,6 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier,
nev.IsParent = true
}
}
if eventConfig.RawOutputPolicy == config.RawOutputPolicyInline {
if eInfo != nil {
nev.InputValue = &event.NodeExecutionEvent_InputData{
InputData: eInfo.Inputs,
}
}
} else {
nev.InputValue = &event.NodeExecutionEvent_InputUri{
InputUri: inputPath,
}
}

return nev, nil
}
Expand Down
Loading