diff --git a/cmd/register/files_test.go b/cmd/register/files_test.go index 0eec7701..129d4f49 100644 --- a/cmd/register/files_test.go +++ b/cmd/register/files_test.go @@ -60,6 +60,31 @@ func TestRegisterFromFiles(t *testing.T) { err = registerFromFilesFunc(s.Ctx, args, s.CmdCtx) assert.Nil(t, err) }) + t.Run("Register a workflow with a failure node", func(t *testing.T) { + s := setup() + testScope := promutils.NewTestScope() + labeled.SetMetricKeys(contextutils.AppNameKey, contextutils.ProjectKey, contextutils.DomainKey) + registerFilesSetup() + rconfig.DefaultFilesConfig.Archive = true + rconfig.DefaultFilesConfig.OutputLocationPrefix = s3Output + rconfig.DefaultFilesConfig.DeprecatedSourceUploadPath = s3Output + mockStorage, err := storage.NewDataStore(&storage.Config{ + Type: storage.TypeMemory, + }, testScope.NewSubScope("flytectl")) + assert.Nil(t, err) + Client = mockStorage + + args := []string{"testdata/failure-node.tgz"} + s.MockAdminClient.OnCreateTaskMatch(mock.Anything, mock.Anything).Return(nil, nil) + s.MockAdminClient.OnCreateWorkflowMatch(mock.Anything, mock.Anything).Return(nil, nil) + s.MockAdminClient.OnCreateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil) + s.MockAdminClient.OnUpdateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil) + mockDataProxy := s.MockClient.DataProxyClient().(*mocks.DataProxyServiceClient) + mockDataProxy.OnCreateUploadLocationMatch(s.Ctx, mock.Anything).Return(&service.CreateUploadLocationResponse{}, nil) + + err = registerFromFilesFunc(s.Ctx, args, s.CmdCtx) + assert.Nil(t, err) + }) t.Run("Failed fast registration while uploading the codebase", func(t *testing.T) { s := setup() registerFilesSetup() diff --git a/cmd/register/register.go b/cmd/register/register.go index a04c99bd..7caa1e9b 100644 --- a/cmd/register/register.go +++ b/cmd/register/register.go @@ -10,7 +10,7 @@ import ( // Long descriptions are whitespace sensitive when generating docs using sphinx. const ( registerCmdShort = "Registers tasks, workflows, and launch plans from a list of generated serialized files." - registercmdLong = ` + registerCmdLong = ` Take input files as serialized versions of the tasks/workflows/launchplans and register them with FlyteAdmin. Currently, these input files are protobuf files generated as output from Flytekit serialize. Project and Domain are mandatory fields to be passed for registration and an optional version which defaults to v1. @@ -23,7 +23,7 @@ func RemoteRegisterCommand() *cobra.Command { registerCmd := &cobra.Command{ Use: "register", Short: registerCmdShort, - Long: registercmdLong, + Long: registerCmdLong, } registerResourcesFuncs := map[string]cmdcore.CommandEntry{ "files": {CmdFunc: registerFromFilesFunc, Aliases: []string{"file"}, PFlagProvider: rconfig.DefaultFilesConfig, diff --git a/cmd/register/register_util.go b/cmd/register/register_util.go index e4dd4355..5234e866 100644 --- a/cmd/register/register_util.go +++ b/cmd/register/register_util.go @@ -491,6 +491,11 @@ func hydrateSpec(message proto.Message, uploadLocation storage.DataReference, co return err } } + if workflowSpec.Template.GetFailureNode() != nil { + if err := hydrateNode(workflowSpec.Template.GetFailureNode(), config.Version, config.Force); err != nil { + return err + } + } hydrateIdentifier(workflowSpec.Template.Id, config.Version, config.Force) for _, subWorkflow := range workflowSpec.SubWorkflows { for _, Noderef := range subWorkflow.Nodes { @@ -498,6 +503,11 @@ func hydrateSpec(message proto.Message, uploadLocation storage.DataReference, co return err } } + if subWorkflow.GetFailureNode() != nil { + if err := hydrateNode(subWorkflow.GetFailureNode(), config.Version, config.Force); err != nil { + return err + } + } hydrateIdentifier(subWorkflow.Id, config.Version, config.Force) } case *admin.TaskSpec: diff --git a/cmd/register/testdata/failure-node.tgz b/cmd/register/testdata/failure-node.tgz new file mode 100644 index 00000000..7ac63e86 Binary files /dev/null and b/cmd/register/testdata/failure-node.tgz differ