From 93ab67828546dc5887e9fe68d03db578ec9a9d88 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 18 Dec 2021 02:43:07 +0800 Subject: [PATCH 01/14] Deprecate ArrayJob proto Signed-off-by: Kevin Su --- go/tasks/plugins/array/array_tests_base.go | 22 ++++----- .../array/awsbatch/transformer_test.go | 4 +- go/tasks/plugins/array/catalog.go | 13 ++++-- go/tasks/plugins/array/catalog_test.go | 13 ++---- go/tasks/plugins/array/core/state.go | 46 +++++++++++-------- go/tasks/plugins/array/core/state_test.go | 26 +++++------ .../plugins/array/k8s/transformer_test.go | 8 ++-- 7 files changed, 66 insertions(+), 66 deletions(-) diff --git a/go/tasks/plugins/array/array_tests_base.go b/go/tasks/plugins/array/array_tests_base.go index dd8551bc0..9b9fd5235 100644 --- a/go/tasks/plugins/array/array_tests_base.go +++ b/go/tasks/plugins/array/array_tests_base.go @@ -6,8 +6,6 @@ import ( "github.com/flyteorg/flyteplugins/tests" idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins" "github.com/flyteorg/flytestdlib/utils" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" @@ -49,12 +47,10 @@ func RunArrayTestsEndToEnd(t *testing.T, executor core.Plugin, iter AdvanceItera } var err error - template.Custom, err = utils.MarshalPbToStruct(&plugins.ArrayJob{ - Parallelism: 10, - Size: 1, - SuccessCriteria: &plugins.ArrayJob_MinSuccesses{ - MinSuccesses: 1, - }, + template.Custom, err = utils.MarshalObjToStruct(&ArrayJob{ + Parallelism: 10, + Size: 1, + MinSuccesses: 1, }) assert.NoError(t, err) @@ -84,12 +80,10 @@ func RunArrayTestsEndToEnd(t *testing.T, executor core.Plugin, iter AdvanceItera } var err error - template.Custom, err = utils.MarshalPbToStruct(&plugins.ArrayJob{ - Parallelism: 10, - Size: 2, - SuccessCriteria: &plugins.ArrayJob_MinSuccesses{ - MinSuccesses: 1, - }, + template.Custom, err = utils.MarshalObjToStruct(&ArrayJob{ + Parallelism: 10, + Size: 2, + MinSuccesses: 1, }) assert.NoError(t, err) diff --git a/go/tasks/plugins/array/awsbatch/transformer_test.go b/go/tasks/plugins/array/awsbatch/transformer_test.go index f78fecf65..32fe7c66a 100644 --- a/go/tasks/plugins/array/awsbatch/transformer_test.go +++ b/go/tasks/plugins/array/awsbatch/transformer_test.go @@ -29,7 +29,7 @@ import ( "github.com/aws/aws-sdk-go/service/batch" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins" + "github.com/flyteorg/flyteplugins/go/tasks/plugins/array" "github.com/stretchr/testify/assert" ) @@ -136,7 +136,7 @@ func TestArrayJobToBatchInput(t *testing.T) { }, } - input := &plugins.ArrayJob{ + input := &array.ArrayJob{ Size: 10, Parallelism: 5, } diff --git a/go/tasks/plugins/array/catalog.go b/go/tasks/plugins/array/catalog.go index 781f87227..cf2297476 100644 --- a/go/tasks/plugins/array/catalog.go +++ b/go/tasks/plugins/array/catalog.go @@ -21,6 +21,13 @@ import ( idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" ) +type ArrayJob struct { + Parallelism int64 + Size int64 + MinSuccesses int64 + MinSuccessRatio float64 +} + // DetermineDiscoverability checks if there are any previously cached tasks. If there are we will only submit an // ArrayJob for the non-cached tasks. The ArrayJob is now a different size, and each task will get a new index location // which is different than their original location. To find the original index we construct an indexLookup array. @@ -38,7 +45,7 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex } // Extract the custom plugin pb - arrayJob, err := arrayCore.ToArrayJob(taskTemplate.GetCustom(), taskTemplate.TaskTypeVersion) + arrayJob, err := arrayCore.ToArrayJob(taskTemplate.Config, taskTemplate.TaskTypeVersion) if err != nil { return state, err } @@ -50,7 +57,7 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex if taskTemplate.TaskTypeVersion == 0 { state = state.SetOriginalArraySize(arrayJob.Size) arrayJobSize = arrayJob.Size - state = state.SetOriginalMinSuccesses(arrayJob.GetMinSuccesses()) + state = state.SetOriginalMinSuccesses(arrayJob.MinSuccesses) // build input readers inputReaders, err = ConstructRemoteFileInputReaders(ctx, tCtx.DataStore(), tCtx.InputReader().GetInputPrefixPath(), int(arrayJobSize)) @@ -81,7 +88,7 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex return state, errors.Errorf(errors.BadTaskSpecification, "Unable to determine array size from inputs") } - minSuccesses := math.Ceil(float64(arrayJob.GetMinSuccessRatio()) * float64(size)) + minSuccesses := math.Ceil(float64(arrayJob.MinSuccesses) * float64(size)) logger.Debugf(ctx, "Computed state: size [%d] and minSuccesses [%d]", int64(size), int64(minSuccesses)) state = state.SetOriginalArraySize(int64(size)) diff --git a/go/tasks/plugins/array/catalog_test.go b/go/tasks/plugins/array/catalog_test.go index aad365dd0..982b01602 100644 --- a/go/tasks/plugins/array/catalog_test.go +++ b/go/tasks/plugins/array/catalog_test.go @@ -5,7 +5,6 @@ import ( "errors" "testing" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils" structpb "github.com/golang/protobuf/ptypes/struct" @@ -244,13 +243,11 @@ func TestDiscoverabilityTaskType1(t *testing.T) { download.OnGetCachedResults().Return(bitarray.NewBitSet(1)).Once() toCache := arrayCore.InvertBitSet(bitarray.NewBitSet(uint(3)), uint(3)) - arrayJob := &plugins.ArrayJob{ - SuccessCriteria: &plugins.ArrayJob_MinSuccessRatio{ - MinSuccessRatio: 0.5, - }, + arrayJob := &ArrayJob{ + MinSuccessRatio: 0.5, } - var arrayJobCustom structpb.Struct - err := utils.MarshalStruct(arrayJob, &arrayJobCustom) + var arrayJobCustom *structpb.Struct + arrayJobCustom, err := utils.MarshalObjToStruct(arrayJob) assert.NoError(t, err) templateType1 := &core.TaskTemplate{ Id: &core.Identifier{ @@ -276,7 +273,7 @@ func TestDiscoverabilityTaskType1(t *testing.T) { }, }, TaskTypeVersion: 1, - Custom: &arrayJobCustom, + Custom: arrayJobCustom, } runDetermineDiscoverabilityTest(t, templateType1, f, &arrayCore.State{ diff --git a/go/tasks/plugins/array/core/state.go b/go/tasks/plugins/array/core/state.go index 3703d6480..0c08be79c 100644 --- a/go/tasks/plugins/array/core/state.go +++ b/go/tasks/plugins/array/core/state.go @@ -3,6 +3,7 @@ package core import ( "context" "fmt" + "strconv" "time" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" @@ -13,11 +14,9 @@ import ( "github.com/flyteorg/flytestdlib/bitarray" idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - idlPlugins "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils" + "github.com/flyteorg/flyteplugins/go/tasks/plugins/array" "github.com/flyteorg/flytestdlib/logger" - structpb "github.com/golang/protobuf/ptypes/struct" ) //go:generate mockery -all -case=underscore @@ -133,29 +132,36 @@ const ( ErrorK8sArrayGeneric errors.ErrorCode = "ARRAY_JOB_GENERIC_FAILURE" ) -func ToArrayJob(structObj *structpb.Struct, taskTypeVersion int32) (*idlPlugins.ArrayJob, error) { - if structObj == nil { +func ToArrayJob(config map[string]string, taskTypeVersion int32) (*array.ArrayJob, error) { + if config == nil { if taskTypeVersion == 0 { - - return &idlPlugins.ArrayJob{ - Parallelism: 1, - Size: 1, - SuccessCriteria: &idlPlugins.ArrayJob_MinSuccesses{ - MinSuccesses: 1, - }, + return &array.ArrayJob{ + Parallelism: 1, + Size: 1, + MinSuccesses: 1, }, nil } - return &idlPlugins.ArrayJob{ - Parallelism: 1, - Size: 1, - SuccessCriteria: &idlPlugins.ArrayJob_MinSuccessRatio{ - MinSuccessRatio: 1.0, - }, + return &array.ArrayJob{ + Parallelism: 1, + Size: 1, + MinSuccessRatio: 1.0, }, nil } - arrayJob := &idlPlugins.ArrayJob{} - err := utils.UnmarshalStruct(structObj, arrayJob) + arrayJob := &array.ArrayJob{} + var err error + if config["Parallelism"] != "" { + arrayJob.Parallelism, err = strconv.Atoi(config["Parallelism"]) + } + if config["Size"] != "" { + arrayJob.Size, err = strconv.Atoi(config["Size"]) + } + if config["MinSuccesses"] != "" { + arrayJob.MinSuccesses, err = strconv.Atoi(config["MinSuccesses"]) + } + if config["MinSuccessRatio"] != "" { + arrayJob.MinSuccessRatio, err = strconv.ParseFloat(config["MinSuccessRatio"], 64) + } return arrayJob, err } diff --git a/go/tasks/plugins/array/core/state_test.go b/go/tasks/plugins/array/core/state_test.go index 876612623..b55bf73fd 100644 --- a/go/tasks/plugins/array/core/state_test.go +++ b/go/tasks/plugins/array/core/state_test.go @@ -7,7 +7,7 @@ import ( "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins" + "github.com/flyteorg/flyteplugins/go/tasks/plugins/array" "github.com/golang/protobuf/proto" "github.com/flyteorg/flytestdlib/bitarray" @@ -247,24 +247,20 @@ func TestToArrayJob(t *testing.T) { t.Run("task_type_version == 0", func(t *testing.T) { arrayJob, err := ToArrayJob(nil, 0) assert.NoError(t, err) - assert.True(t, proto.Equal(arrayJob, &plugins.ArrayJob{ - Parallelism: 1, - Size: 1, - SuccessCriteria: &plugins.ArrayJob_MinSuccesses{ - MinSuccesses: 1, - }, - })) + assert.True(t, *arrayJob == array.ArrayJob{ + Parallelism: 1, + Size: 1, + MinSuccesses: 1, + }) }) t.Run("task_type_version == 1", func(t *testing.T) { arrayJob, err := ToArrayJob(nil, 1) assert.NoError(t, err) - assert.True(t, proto.Equal(arrayJob, &plugins.ArrayJob{ - Parallelism: 1, - Size: 1, - SuccessCriteria: &plugins.ArrayJob_MinSuccessRatio{ - MinSuccessRatio: 1.0, - }, - })) + assert.True(t, *arrayJob == array.ArrayJob{ + Parallelism: 1, + Size: 1, + MinSuccessRatio: 1.0, + }) }) } diff --git a/go/tasks/plugins/array/k8s/transformer_test.go b/go/tasks/plugins/array/k8s/transformer_test.go index 822e3c544..59bcf75e5 100644 --- a/go/tasks/plugins/array/k8s/transformer_test.go +++ b/go/tasks/plugins/array/k8s/transformer_test.go @@ -11,10 +11,10 @@ import ( "github.com/flyteorg/flytestdlib/storage" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - idlPlugins "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks" mocks2 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils" + "github.com/flyteorg/flyteplugins/go/tasks/plugins/array" structpb "github.com/golang/protobuf/ptypes/struct" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -42,7 +42,7 @@ var podSpec = v1.PodSpec{ }, } -var arrayJob = idlPlugins.ArrayJob{ +var arrayJob = array.ArrayJob{ Size: 100, } @@ -57,8 +57,8 @@ func getK8sPodTask(t *testing.T, annotations map[string]string) *core.TaskTempla t.Fatal(err) } - custom := &structpb.Struct{} - if err := utils.MarshalStruct(&arrayJob, custom); err != nil { + var custom *structpb.Struct + if custom, err = utils.MarshalObjToStruct(&arrayJob); err != nil { t.Fatal(err) } From 9517891cc61d2361dc245884c3c4ec85cf8822d6 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 18 Dec 2021 03:07:09 +0800 Subject: [PATCH 02/14] Deprecate ArrayJob proto Signed-off-by: Kevin Su --- go/tasks/plugins/array/array_tests_base.go | 5 ++-- .../array/awsbatch/transformer_test.go | 4 +-- go/tasks/plugins/array/catalog.go | 9 +------ go/tasks/plugins/array/catalog_test.go | 2 +- go/tasks/plugins/array/core/array_job.go | 26 +++++++++++++++++++ go/tasks/plugins/array/core/state.go | 15 +++++------ go/tasks/plugins/array/core/state_test.go | 5 ++-- go/tasks/plugins/array/k8s/transformer.go | 8 +++--- 8 files changed, 46 insertions(+), 28 deletions(-) create mode 100644 go/tasks/plugins/array/core/array_job.go diff --git a/go/tasks/plugins/array/array_tests_base.go b/go/tasks/plugins/array/array_tests_base.go index 9b9fd5235..f98b742d7 100644 --- a/go/tasks/plugins/array/array_tests_base.go +++ b/go/tasks/plugins/array/array_tests_base.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/flyteorg/flyteplugins/tests" + arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core" idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/utils" @@ -47,7 +48,7 @@ func RunArrayTestsEndToEnd(t *testing.T, executor core.Plugin, iter AdvanceItera } var err error - template.Custom, err = utils.MarshalObjToStruct(&ArrayJob{ + template.Custom, err = utils.MarshalObjToStruct(&arrayCore.ArrayJob{ Parallelism: 10, Size: 1, MinSuccesses: 1, @@ -80,7 +81,7 @@ func RunArrayTestsEndToEnd(t *testing.T, executor core.Plugin, iter AdvanceItera } var err error - template.Custom, err = utils.MarshalObjToStruct(&ArrayJob{ + template.Custom, err = utils.MarshalObjToStruct(&arrayCore.ArrayJob{ Parallelism: 10, Size: 2, MinSuccesses: 1, diff --git a/go/tasks/plugins/array/awsbatch/transformer_test.go b/go/tasks/plugins/array/awsbatch/transformer_test.go index 32fe7c66a..6b5ab7012 100644 --- a/go/tasks/plugins/array/awsbatch/transformer_test.go +++ b/go/tasks/plugins/array/awsbatch/transformer_test.go @@ -29,7 +29,7 @@ import ( "github.com/aws/aws-sdk-go/service/batch" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - "github.com/flyteorg/flyteplugins/go/tasks/plugins/array" + arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core" "github.com/stretchr/testify/assert" ) @@ -136,7 +136,7 @@ func TestArrayJobToBatchInput(t *testing.T) { }, } - input := &array.ArrayJob{ + input := &arrayCore.ArrayJob{ Size: 10, Parallelism: 5, } diff --git a/go/tasks/plugins/array/catalog.go b/go/tasks/plugins/array/catalog.go index cf2297476..704cebad8 100644 --- a/go/tasks/plugins/array/catalog.go +++ b/go/tasks/plugins/array/catalog.go @@ -21,13 +21,6 @@ import ( idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" ) -type ArrayJob struct { - Parallelism int64 - Size int64 - MinSuccesses int64 - MinSuccessRatio float64 -} - // DetermineDiscoverability checks if there are any previously cached tasks. If there are we will only submit an // ArrayJob for the non-cached tasks. The ArrayJob is now a different size, and each task will get a new index location // which is different than their original location. To find the original index we construct an indexLookup array. @@ -45,7 +38,7 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex } // Extract the custom plugin pb - arrayJob, err := arrayCore.ToArrayJob(taskTemplate.Config, taskTemplate.TaskTypeVersion) + arrayJob, err := arrayCore.ToArrayJob(taskTemplate.GetConfig(), taskTemplate.TaskTypeVersion) if err != nil { return state, err } diff --git a/go/tasks/plugins/array/catalog_test.go b/go/tasks/plugins/array/catalog_test.go index 982b01602..521ff8c30 100644 --- a/go/tasks/plugins/array/catalog_test.go +++ b/go/tasks/plugins/array/catalog_test.go @@ -243,7 +243,7 @@ func TestDiscoverabilityTaskType1(t *testing.T) { download.OnGetCachedResults().Return(bitarray.NewBitSet(1)).Once() toCache := arrayCore.InvertBitSet(bitarray.NewBitSet(uint(3)), uint(3)) - arrayJob := &ArrayJob{ + arrayJob := &arrayCore.ArrayJob{ MinSuccessRatio: 0.5, } var arrayJobCustom *structpb.Struct diff --git a/go/tasks/plugins/array/core/array_job.go b/go/tasks/plugins/array/core/array_job.go new file mode 100644 index 000000000..5d3f23973 --- /dev/null +++ b/go/tasks/plugins/array/core/array_job.go @@ -0,0 +1,26 @@ +package core + + +type ArrayJob struct { + Parallelism int64 + Size int64 + MinSuccesses int64 + MinSuccessRatio float64 +} + +func (a ArrayJob) GetParallelism() int64 { + return a.Parallelism +} + +func (a ArrayJob) GetSize() int64 { + return a.Size +} + +func (a ArrayJob) GetMinSuccesses() int64 { + return a.MinSuccesses +} + +func (a ArrayJob) GetMinSuccessRatio() float64 { + return a.MinSuccessRatio +} + diff --git a/go/tasks/plugins/array/core/state.go b/go/tasks/plugins/array/core/state.go index 0c08be79c..36857ca34 100644 --- a/go/tasks/plugins/array/core/state.go +++ b/go/tasks/plugins/array/core/state.go @@ -15,7 +15,6 @@ import ( idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" - "github.com/flyteorg/flyteplugins/go/tasks/plugins/array" "github.com/flyteorg/flytestdlib/logger" ) @@ -132,32 +131,32 @@ const ( ErrorK8sArrayGeneric errors.ErrorCode = "ARRAY_JOB_GENERIC_FAILURE" ) -func ToArrayJob(config map[string]string, taskTypeVersion int32) (*array.ArrayJob, error) { +func ToArrayJob(config map[string]string, taskTypeVersion int32) (*ArrayJob, error) { if config == nil { if taskTypeVersion == 0 { - return &array.ArrayJob{ + return &ArrayJob{ Parallelism: 1, Size: 1, MinSuccesses: 1, }, nil } - return &array.ArrayJob{ + return &ArrayJob{ Parallelism: 1, Size: 1, MinSuccessRatio: 1.0, }, nil } - arrayJob := &array.ArrayJob{} + arrayJob := &ArrayJob{} var err error if config["Parallelism"] != "" { - arrayJob.Parallelism, err = strconv.Atoi(config["Parallelism"]) + arrayJob.Parallelism, err = strconv.ParseInt(config["Parallelism"], 10,64) } if config["Size"] != "" { - arrayJob.Size, err = strconv.Atoi(config["Size"]) + arrayJob.Size, err = strconv.ParseInt(config["Size"], 10, 64) } if config["MinSuccesses"] != "" { - arrayJob.MinSuccesses, err = strconv.Atoi(config["MinSuccesses"]) + arrayJob.MinSuccesses, err = strconv.ParseInt(config["MinSuccesses"], 10, 64) } if config["MinSuccessRatio"] != "" { arrayJob.MinSuccessRatio, err = strconv.ParseFloat(config["MinSuccessRatio"], 64) diff --git a/go/tasks/plugins/array/core/state_test.go b/go/tasks/plugins/array/core/state_test.go index b55bf73fd..06d19d8ab 100644 --- a/go/tasks/plugins/array/core/state_test.go +++ b/go/tasks/plugins/array/core/state_test.go @@ -7,7 +7,6 @@ import ( "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" - "github.com/flyteorg/flyteplugins/go/tasks/plugins/array" "github.com/golang/protobuf/proto" "github.com/flyteorg/flytestdlib/bitarray" @@ -247,7 +246,7 @@ func TestToArrayJob(t *testing.T) { t.Run("task_type_version == 0", func(t *testing.T) { arrayJob, err := ToArrayJob(nil, 0) assert.NoError(t, err) - assert.True(t, *arrayJob == array.ArrayJob{ + assert.True(t, *arrayJob == ArrayJob{ Parallelism: 1, Size: 1, MinSuccesses: 1, @@ -257,7 +256,7 @@ func TestToArrayJob(t *testing.T) { t.Run("task_type_version == 1", func(t *testing.T) { arrayJob, err := ToArrayJob(nil, 1) assert.NoError(t, err) - assert.True(t, *arrayJob == array.ArrayJob{ + assert.True(t, *arrayJob == ArrayJob{ Parallelism: 1, Size: 1, MinSuccessRatio: 1.0, diff --git a/go/tasks/plugins/array/k8s/transformer.go b/go/tasks/plugins/array/k8s/transformer.go index 8b10c54d3..c152d3660 100644 --- a/go/tasks/plugins/array/k8s/transformer.go +++ b/go/tasks/plugins/array/k8s/transformer.go @@ -14,7 +14,7 @@ import ( "github.com/flyteorg/flyteplugins/go/tasks/plugins/array" idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - idlPlugins "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins" + arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core" "github.com/flyteorg/flyteplugins/go/tasks/errors" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s" @@ -97,7 +97,7 @@ func buildPodMapTask(task *idlCore.TaskTemplate, metadata core.TaskExecutionMeta // FlyteArrayJobToK8sPodTemplate returns a pod template for the given task context. Note that Name is not set on the // result object. It's up to the caller to set the Name before creating the object in K8s. func FlyteArrayJobToK8sPodTemplate(ctx context.Context, tCtx core.TaskExecutionContext, namespaceTemplate string) ( - podTemplate v1.Pod, job *idlPlugins.ArrayJob, err error) { + podTemplate v1.Pod, job *arrayCore.ArrayJob, err error) { // Check that the taskTemplate is valid taskTemplate, err := tCtx.TaskReader().Read(ctx) @@ -117,9 +117,9 @@ func FlyteArrayJobToK8sPodTemplate(ctx context.Context, tCtx core.TaskExecutionC arrayInputReader: array.GetInputReader(tCtx, taskTemplate), } - var arrayJob *idlPlugins.ArrayJob + var arrayJob *arrayCore.ArrayJob if taskTemplate.GetCustom() != nil { - arrayJob, err = core2.ToArrayJob(taskTemplate.GetCustom(), taskTemplate.TaskTypeVersion) + arrayJob, err = core2.ToArrayJob(taskTemplate.GetConfig(), taskTemplate.TaskTypeVersion) if err != nil { return v1.Pod{}, nil, err } From 555c6c523fb5e33a5e6478e6d6367c45ef38f4fa Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 22 Dec 2021 03:25:43 +0800 Subject: [PATCH 03/14] Fix lint Signed-off-by: Kevin Su --- go/tasks/plugins/array/array_tests_base.go | 2 +- go/tasks/plugins/array/catalog.go | 4 ++-- go/tasks/plugins/array/core/array_job.go | 2 -- go/tasks/plugins/array/core/state.go | 2 +- go/tasks/plugins/array/k8s/transformer.go | 2 +- go/tasks/plugins/array/k8s/transformer_test.go | 4 ++-- 6 files changed, 7 insertions(+), 9 deletions(-) diff --git a/go/tasks/plugins/array/array_tests_base.go b/go/tasks/plugins/array/array_tests_base.go index f98b742d7..bb42986dd 100644 --- a/go/tasks/plugins/array/array_tests_base.go +++ b/go/tasks/plugins/array/array_tests_base.go @@ -3,8 +3,8 @@ package array import ( "testing" - "github.com/flyteorg/flyteplugins/tests" arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core" + "github.com/flyteorg/flyteplugins/tests" idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/utils" diff --git a/go/tasks/plugins/array/catalog.go b/go/tasks/plugins/array/catalog.go index 704cebad8..6fe181c0a 100644 --- a/go/tasks/plugins/array/catalog.go +++ b/go/tasks/plugins/array/catalog.go @@ -50,7 +50,7 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex if taskTemplate.TaskTypeVersion == 0 { state = state.SetOriginalArraySize(arrayJob.Size) arrayJobSize = arrayJob.Size - state = state.SetOriginalMinSuccesses(arrayJob.MinSuccesses) + state = state.SetOriginalMinSuccesses(arrayJob.GetMinSuccesses()) // build input readers inputReaders, err = ConstructRemoteFileInputReaders(ctx, tCtx.DataStore(), tCtx.InputReader().GetInputPrefixPath(), int(arrayJobSize)) @@ -81,7 +81,7 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex return state, errors.Errorf(errors.BadTaskSpecification, "Unable to determine array size from inputs") } - minSuccesses := math.Ceil(float64(arrayJob.MinSuccesses) * float64(size)) + minSuccesses := math.Ceil(float64(arrayJob.GetMinSuccesses()) * float64(size)) logger.Debugf(ctx, "Computed state: size [%d] and minSuccesses [%d]", int64(size), int64(minSuccesses)) state = state.SetOriginalArraySize(int64(size)) diff --git a/go/tasks/plugins/array/core/array_job.go b/go/tasks/plugins/array/core/array_job.go index 5d3f23973..3289a1ce7 100644 --- a/go/tasks/plugins/array/core/array_job.go +++ b/go/tasks/plugins/array/core/array_job.go @@ -1,6 +1,5 @@ package core - type ArrayJob struct { Parallelism int64 Size int64 @@ -23,4 +22,3 @@ func (a ArrayJob) GetMinSuccesses() int64 { func (a ArrayJob) GetMinSuccessRatio() float64 { return a.MinSuccessRatio } - diff --git a/go/tasks/plugins/array/core/state.go b/go/tasks/plugins/array/core/state.go index 36857ca34..d010bb52b 100644 --- a/go/tasks/plugins/array/core/state.go +++ b/go/tasks/plugins/array/core/state.go @@ -150,7 +150,7 @@ func ToArrayJob(config map[string]string, taskTypeVersion int32) (*ArrayJob, err arrayJob := &ArrayJob{} var err error if config["Parallelism"] != "" { - arrayJob.Parallelism, err = strconv.ParseInt(config["Parallelism"], 10,64) + arrayJob.Parallelism, err = strconv.ParseInt(config["Parallelism"], 10, 64) } if config["Size"] != "" { arrayJob.Size, err = strconv.ParseInt(config["Size"], 10, 64) diff --git a/go/tasks/plugins/array/k8s/transformer.go b/go/tasks/plugins/array/k8s/transformer.go index c152d3660..04b750e3f 100644 --- a/go/tasks/plugins/array/k8s/transformer.go +++ b/go/tasks/plugins/array/k8s/transformer.go @@ -14,10 +14,10 @@ import ( "github.com/flyteorg/flyteplugins/go/tasks/plugins/array" idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core" "github.com/flyteorg/flyteplugins/go/tasks/errors" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s" + arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core" core2 "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/go/tasks/plugins/array/k8s/transformer_test.go b/go/tasks/plugins/array/k8s/transformer_test.go index 59bcf75e5..b810ab9d1 100644 --- a/go/tasks/plugins/array/k8s/transformer_test.go +++ b/go/tasks/plugins/array/k8s/transformer_test.go @@ -14,7 +14,7 @@ import ( "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks" mocks2 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils" - "github.com/flyteorg/flyteplugins/go/tasks/plugins/array" + arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core" structpb "github.com/golang/protobuf/ptypes/struct" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -42,7 +42,7 @@ var podSpec = v1.PodSpec{ }, } -var arrayJob = array.ArrayJob{ +var arrayJob = arrayCore.ArrayJob{ Size: 100, } From debb637917c899a02c9e3355d9dee2f778efdffb Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 22 Dec 2021 04:16:07 +0800 Subject: [PATCH 04/14] Fix lint Signed-off-by: Kevin Su --- go/tasks/plugins/array/k8s/transformer.go | 9 +++------ go/tasks/plugins/array/k8s/transformer_test.go | 8 +------- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/go/tasks/plugins/array/k8s/transformer.go b/go/tasks/plugins/array/k8s/transformer.go index 04b750e3f..e702c8446 100644 --- a/go/tasks/plugins/array/k8s/transformer.go +++ b/go/tasks/plugins/array/k8s/transformer.go @@ -117,12 +117,9 @@ func FlyteArrayJobToK8sPodTemplate(ctx context.Context, tCtx core.TaskExecutionC arrayInputReader: array.GetInputReader(tCtx, taskTemplate), } - var arrayJob *arrayCore.ArrayJob - if taskTemplate.GetCustom() != nil { - arrayJob, err = core2.ToArrayJob(taskTemplate.GetConfig(), taskTemplate.TaskTypeVersion) - if err != nil { - return v1.Pod{}, nil, err - } + arrayJob, err := core2.ToArrayJob(taskTemplate.GetConfig(), taskTemplate.TaskTypeVersion) + if err != nil { + return v1.Pod{}, nil, err } annotations := utils.UnionMaps(config.GetK8sPluginConfig().DefaultAnnotations, tCtx.TaskExecutionMetadata().GetAnnotations()) diff --git a/go/tasks/plugins/array/k8s/transformer_test.go b/go/tasks/plugins/array/k8s/transformer_test.go index b810ab9d1..b3a97a368 100644 --- a/go/tasks/plugins/array/k8s/transformer_test.go +++ b/go/tasks/plugins/array/k8s/transformer_test.go @@ -13,7 +13,6 @@ import ( "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks" mocks2 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils" arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core" structpb "github.com/golang/protobuf/ptypes/struct" "github.com/stretchr/testify/assert" @@ -57,15 +56,11 @@ func getK8sPodTask(t *testing.T, annotations map[string]string) *core.TaskTempla t.Fatal(err) } - var custom *structpb.Struct - if custom, err = utils.MarshalObjToStruct(&arrayJob); err != nil { - t.Fatal(err) - } - return &core.TaskTemplate{ TaskTypeVersion: 2, Config: map[string]string{ primaryContainerKey: testPrimaryContainerName, + "Size": "100", }, Target: &core.TaskTemplate_K8SPod{ K8SPod: &core.K8SPod{ @@ -78,7 +73,6 @@ func getK8sPodTask(t *testing.T, annotations map[string]string) *core.TaskTempla }, }, }, - Custom: custom, } } From 16199d199189413cd0e963813ef4e8a50a93931b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 22 Dec 2021 17:21:28 +0800 Subject: [PATCH 05/14] Fix test Signed-off-by: Kevin Su --- go/tasks/plugins/array/catalog.go | 2 +- go/tasks/plugins/array/catalog_test.go | 12 +++--------- go/tasks/plugins/array/core/state.go | 8 ++++---- 3 files changed, 8 insertions(+), 14 deletions(-) diff --git a/go/tasks/plugins/array/catalog.go b/go/tasks/plugins/array/catalog.go index 6fe181c0a..1ddb800b7 100644 --- a/go/tasks/plugins/array/catalog.go +++ b/go/tasks/plugins/array/catalog.go @@ -81,7 +81,7 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex return state, errors.Errorf(errors.BadTaskSpecification, "Unable to determine array size from inputs") } - minSuccesses := math.Ceil(float64(arrayJob.GetMinSuccesses()) * float64(size)) + minSuccesses := math.Ceil(float64(arrayJob.GetMinSuccessRatio()) * float64(size)) logger.Debugf(ctx, "Computed state: size [%d] and minSuccesses [%d]", int64(size), int64(minSuccesses)) state = state.SetOriginalArraySize(int64(size)) diff --git a/go/tasks/plugins/array/catalog_test.go b/go/tasks/plugins/array/catalog_test.go index 521ff8c30..668a98876 100644 --- a/go/tasks/plugins/array/catalog_test.go +++ b/go/tasks/plugins/array/catalog_test.go @@ -5,9 +5,6 @@ import ( "errors" "testing" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils" - structpb "github.com/golang/protobuf/ptypes/struct" - stdErrors "github.com/flyteorg/flytestdlib/errors" pluginErrors "github.com/flyteorg/flyteplugins/go/tasks/errors" @@ -243,12 +240,9 @@ func TestDiscoverabilityTaskType1(t *testing.T) { download.OnGetCachedResults().Return(bitarray.NewBitSet(1)).Once() toCache := arrayCore.InvertBitSet(bitarray.NewBitSet(uint(3)), uint(3)) - arrayJob := &arrayCore.ArrayJob{ - MinSuccessRatio: 0.5, + arrayJob := map[string]string{ + "MinSuccessRatio": "0.5", } - var arrayJobCustom *structpb.Struct - arrayJobCustom, err := utils.MarshalObjToStruct(arrayJob) - assert.NoError(t, err) templateType1 := &core.TaskTemplate{ Id: &core.Identifier{ ResourceType: core.ResourceType_TASK, @@ -273,7 +267,7 @@ func TestDiscoverabilityTaskType1(t *testing.T) { }, }, TaskTypeVersion: 1, - Custom: arrayJobCustom, + Config: arrayJob, } runDetermineDiscoverabilityTest(t, templateType1, f, &arrayCore.State{ diff --git a/go/tasks/plugins/array/core/state.go b/go/tasks/plugins/array/core/state.go index d010bb52b..f58ee5f60 100644 --- a/go/tasks/plugins/array/core/state.go +++ b/go/tasks/plugins/array/core/state.go @@ -149,16 +149,16 @@ func ToArrayJob(config map[string]string, taskTypeVersion int32) (*ArrayJob, err arrayJob := &ArrayJob{} var err error - if config["Parallelism"] != "" { + if len(config["Parallelism"]) != 0 { arrayJob.Parallelism, err = strconv.ParseInt(config["Parallelism"], 10, 64) } - if config["Size"] != "" { + if len(config["Size"]) != 0 { arrayJob.Size, err = strconv.ParseInt(config["Size"], 10, 64) } - if config["MinSuccesses"] != "" { + if len(config["MinSuccesses"]) != 0 { arrayJob.MinSuccesses, err = strconv.ParseInt(config["MinSuccesses"], 10, 64) } - if config["MinSuccessRatio"] != "" { + if len(config["MinSuccessRatio"]) != 0 { arrayJob.MinSuccessRatio, err = strconv.ParseFloat(config["MinSuccessRatio"], 64) } return arrayJob, err From 2fbfd2d05c8f323a6aa8b4e9e48056219a0c981c Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 22 Dec 2021 17:32:53 +0800 Subject: [PATCH 06/14] Fix test Signed-off-by: Kevin Su --- go/tasks/plugins/array/array_tests_base.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/go/tasks/plugins/array/array_tests_base.go b/go/tasks/plugins/array/array_tests_base.go index bb42986dd..89ed06b8a 100644 --- a/go/tasks/plugins/array/array_tests_base.go +++ b/go/tasks/plugins/array/array_tests_base.go @@ -80,14 +80,11 @@ func RunArrayTestsEndToEnd(t *testing.T, executor core.Plugin, iter AdvanceItera }, } - var err error - template.Custom, err = utils.MarshalObjToStruct(&arrayCore.ArrayJob{ - Parallelism: 10, - Size: 2, - MinSuccesses: 1, - }) - - assert.NoError(t, err) + template.Config = map[string]string{ + "Parallelism": "10", + "Size": "2", + "MinSuccesses": "1", + } expectedOutputs := coreutils.MustMakeLiteral(map[string]interface{}{ "x": []interface{}{5, 5}, From ac34b203e5c026cded75a6df0b61b16b7c789950 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 23 Dec 2021 01:25:27 +0800 Subject: [PATCH 07/14] Fix lint Signed-off-by: Kevin Su --- go/tasks/plugins/array/catalog.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/tasks/plugins/array/catalog.go b/go/tasks/plugins/array/catalog.go index 1ddb800b7..b7396bb42 100644 --- a/go/tasks/plugins/array/catalog.go +++ b/go/tasks/plugins/array/catalog.go @@ -81,7 +81,7 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex return state, errors.Errorf(errors.BadTaskSpecification, "Unable to determine array size from inputs") } - minSuccesses := math.Ceil(float64(arrayJob.GetMinSuccessRatio()) * float64(size)) + minSuccesses := math.Ceil(arrayJob.GetMinSuccessRatio() * float64(size)) logger.Debugf(ctx, "Computed state: size [%d] and minSuccesses [%d]", int64(size), int64(minSuccesses)) state = state.SetOriginalArraySize(int64(size)) From 8847068e20768303a6e64b9e13c1ed71366ac73e Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 29 Dec 2021 01:27:33 +0800 Subject: [PATCH 08/14] Added test Signed-off-by: Kevin Su --- go/tasks/plugins/array/core/state_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/go/tasks/plugins/array/core/state_test.go b/go/tasks/plugins/array/core/state_test.go index 06d19d8ab..cdcaf112d 100644 --- a/go/tasks/plugins/array/core/state_test.go +++ b/go/tasks/plugins/array/core/state_test.go @@ -262,4 +262,20 @@ func TestToArrayJob(t *testing.T) { MinSuccessRatio: 1.0, }) }) + + t.Run("ToArrayJob with config", func(t *testing.T) { + config := map[string]string{ + "Parallelism": "10", + "Size": "10", + "MinSuccesses": "1", + "MinSuccessRatio": "1.0", + } + + arrayJob, err := ToArrayJob(config, 0) + assert.NoError(t, err) + assert.Equal(t, arrayJob.GetParallelism(), int64(10)) + assert.Equal(t, arrayJob.GetSize(), int64(10)) + assert.Equal(t, arrayJob.GetMinSuccesses(), int64(1)) + assert.Equal(t, arrayJob.GetMinSuccessRatio(), 1.0) + }) } From f61ee5e6b9aee9e2faaaa4eba4468f7c0ad6d72b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 29 Dec 2021 01:55:36 +0800 Subject: [PATCH 09/14] Fixed test errors Signed-off-by: Kevin Su --- go/tasks/plugins/array/core/state_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/tasks/plugins/array/core/state_test.go b/go/tasks/plugins/array/core/state_test.go index cdcaf112d..78ca3ff55 100644 --- a/go/tasks/plugins/array/core/state_test.go +++ b/go/tasks/plugins/array/core/state_test.go @@ -265,9 +265,9 @@ func TestToArrayJob(t *testing.T) { t.Run("ToArrayJob with config", func(t *testing.T) { config := map[string]string{ - "Parallelism": "10", - "Size": "10", - "MinSuccesses": "1", + "Parallelism": "10", + "Size": "10", + "MinSuccesses": "1", "MinSuccessRatio": "1.0", } From 86f2271af50fd11ed7e663bb0a20e57cf8b1101b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 18 Jan 2022 22:59:52 +0800 Subject: [PATCH 10/14] keep backward compatibility Signed-off-by: Kevin Su --- go/tasks/plugins/array/array_tests_base.go | 13 ++-- go/tasks/plugins/array/catalog.go | 2 +- go/tasks/plugins/array/catalog_test.go | 26 ++++++++ go/tasks/plugins/array/core/state.go | 69 ++++++++++++++-------- go/tasks/plugins/array/core/state_test.go | 9 ++- go/tasks/plugins/array/k8s/transformer.go | 2 +- 6 files changed, 81 insertions(+), 40 deletions(-) diff --git a/go/tasks/plugins/array/array_tests_base.go b/go/tasks/plugins/array/array_tests_base.go index 89ed06b8a..90ed5f531 100644 --- a/go/tasks/plugins/array/array_tests_base.go +++ b/go/tasks/plugins/array/array_tests_base.go @@ -3,12 +3,9 @@ package array import ( "testing" - arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core" "github.com/flyteorg/flyteplugins/tests" idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - "github.com/flyteorg/flytestdlib/utils" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" "context" @@ -48,11 +45,11 @@ func RunArrayTestsEndToEnd(t *testing.T, executor core.Plugin, iter AdvanceItera } var err error - template.Custom, err = utils.MarshalObjToStruct(&arrayCore.ArrayJob{ - Parallelism: 10, - Size: 1, - MinSuccesses: 1, - }) + template.Config = map[string]string{ + "Parallelism": "10", + "Size": "1", + "MinSuccesses": "1", + } assert.NoError(t, err) diff --git a/go/tasks/plugins/array/catalog.go b/go/tasks/plugins/array/catalog.go index b7396bb42..4712b1b40 100644 --- a/go/tasks/plugins/array/catalog.go +++ b/go/tasks/plugins/array/catalog.go @@ -38,7 +38,7 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex } // Extract the custom plugin pb - arrayJob, err := arrayCore.ToArrayJob(taskTemplate.GetConfig(), taskTemplate.TaskTypeVersion) + arrayJob, err := arrayCore.ToArrayJob(taskTemplate, taskTemplate.TaskTypeVersion) if err != nil { return state, err } diff --git a/go/tasks/plugins/array/catalog_test.go b/go/tasks/plugins/array/catalog_test.go index 668a98876..4e029936b 100644 --- a/go/tasks/plugins/array/catalog_test.go +++ b/go/tasks/plugins/array/catalog_test.go @@ -5,6 +5,10 @@ import ( "errors" "testing" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils" + structpb "github.com/golang/protobuf/ptypes/struct" + stdErrors "github.com/flyteorg/flytestdlib/errors" pluginErrors "github.com/flyteorg/flyteplugins/go/tasks/errors" @@ -279,5 +283,27 @@ func TestDiscoverabilityTaskType1(t *testing.T) { IndexesToCache: toCache, Reason: "Task is not discoverable.", }, nil) + + // Get ArrayJob information from taskTemplate.config + arrayJobProto := &plugins.ArrayJob{ + SuccessCriteria: &plugins.ArrayJob_MinSuccessRatio{ + MinSuccessRatio: 0.5, + }, + } + var arrayJobCustom structpb.Struct + err := utils.MarshalStruct(arrayJobProto, &arrayJobCustom) + assert.NoError(t, err) + templateType1.Config = nil + templateType1.Custom = &arrayJobCustom + + runDetermineDiscoverabilityTest(t, templateType1, f, &arrayCore.State{ + CurrentPhase: arrayCore.PhasePreLaunch, + PhaseVersion: core2.DefaultPhaseVersion, + ExecutionArraySize: 3, + OriginalArraySize: 3, + OriginalMinSuccesses: 2, + IndexesToCache: toCache, + Reason: "Task is not discoverable.", + }, nil) }) } diff --git a/go/tasks/plugins/array/core/state.go b/go/tasks/plugins/array/core/state.go index f58ee5f60..5a7de7367 100644 --- a/go/tasks/plugins/array/core/state.go +++ b/go/tasks/plugins/array/core/state.go @@ -6,6 +6,8 @@ import ( "strconv" "time" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flytestdlib/errors" @@ -14,6 +16,7 @@ import ( "github.com/flyteorg/flytestdlib/bitarray" idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + idlPlugins "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flytestdlib/logger" ) @@ -131,37 +134,53 @@ const ( ErrorK8sArrayGeneric errors.ErrorCode = "ARRAY_JOB_GENERIC_FAILURE" ) -func ToArrayJob(config map[string]string, taskTypeVersion int32) (*ArrayJob, error) { - if config == nil { - if taskTypeVersion == 0 { - return &ArrayJob{ - Parallelism: 1, - Size: 1, - MinSuccesses: 1, - }, nil +func ToArrayJob(taskTemplate *idlCore.TaskTemplate, taskTypeVersion int32) (*ArrayJob, error) { + if taskTemplate.GetConfig() != nil { + config := taskTemplate.GetConfig() + arrayJob := &ArrayJob{} + var err error + if len(config["Parallelism"]) != 0 { + arrayJob.Parallelism, err = strconv.ParseInt(config["Parallelism"], 10, 64) + } + if len(config["Size"]) != 0 { + arrayJob.Size, err = strconv.ParseInt(config["Size"], 10, 64) + } + if len(config["MinSuccesses"]) != 0 { + arrayJob.MinSuccesses, err = strconv.ParseInt(config["MinSuccesses"], 10, 64) + } + if len(config["MinSuccessRatio"]) != 0 { + arrayJob.MinSuccessRatio, err = strconv.ParseFloat(config["MinSuccessRatio"], 64) + } + return arrayJob, err + } + + // Keep backward compatibility for those who use arrayJob proto + if taskTemplate.GetCustom() != nil { + arrayJob := &idlPlugins.ArrayJob{} + err := utils.UnmarshalStruct(taskTemplate.GetCustom(), arrayJob) + if err != nil { + return nil, err } return &ArrayJob{ - Parallelism: 1, - Size: 1, - MinSuccessRatio: 1.0, + Parallelism: arrayJob.GetParallelism(), + Size: arrayJob.GetSize(), + MinSuccessRatio: float64(arrayJob.GetMinSuccessRatio()), + MinSuccesses: arrayJob.GetMinSuccesses(), }, nil } - arrayJob := &ArrayJob{} - var err error - if len(config["Parallelism"]) != 0 { - arrayJob.Parallelism, err = strconv.ParseInt(config["Parallelism"], 10, 64) - } - if len(config["Size"]) != 0 { - arrayJob.Size, err = strconv.ParseInt(config["Size"], 10, 64) - } - if len(config["MinSuccesses"]) != 0 { - arrayJob.MinSuccesses, err = strconv.ParseInt(config["MinSuccesses"], 10, 64) - } - if len(config["MinSuccessRatio"]) != 0 { - arrayJob.MinSuccessRatio, err = strconv.ParseFloat(config["MinSuccessRatio"], 64) + if taskTypeVersion == 0 { + return &ArrayJob{ + Parallelism: 1, + Size: 1, + MinSuccesses: 1, + }, nil } - return arrayJob, err + return &ArrayJob{ + Parallelism: 1, + Size: 1, + MinSuccessRatio: 1.0, + }, nil } func GetPhaseVersionOffset(currentPhase Phase, length int64) uint32 { diff --git a/go/tasks/plugins/array/core/state_test.go b/go/tasks/plugins/array/core/state_test.go index 78ca3ff55..72cf7e5b1 100644 --- a/go/tasks/plugins/array/core/state_test.go +++ b/go/tasks/plugins/array/core/state_test.go @@ -6,12 +6,11 @@ import ( "testing" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" - "github.com/golang/protobuf/proto" - "github.com/flyteorg/flytestdlib/bitarray" - + idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" + "github.com/flyteorg/flytestdlib/bitarray" "github.com/stretchr/testify/assert" ) @@ -270,8 +269,8 @@ func TestToArrayJob(t *testing.T) { "MinSuccesses": "1", "MinSuccessRatio": "1.0", } - - arrayJob, err := ToArrayJob(config, 0) + taskTemplate := &idlCore.TaskTemplate{Config: config} + arrayJob, err := ToArrayJob(taskTemplate, 0) assert.NoError(t, err) assert.Equal(t, arrayJob.GetParallelism(), int64(10)) assert.Equal(t, arrayJob.GetSize(), int64(10)) diff --git a/go/tasks/plugins/array/k8s/transformer.go b/go/tasks/plugins/array/k8s/transformer.go index e702c8446..9eb5819ec 100644 --- a/go/tasks/plugins/array/k8s/transformer.go +++ b/go/tasks/plugins/array/k8s/transformer.go @@ -117,7 +117,7 @@ func FlyteArrayJobToK8sPodTemplate(ctx context.Context, tCtx core.TaskExecutionC arrayInputReader: array.GetInputReader(tCtx, taskTemplate), } - arrayJob, err := core2.ToArrayJob(taskTemplate.GetConfig(), taskTemplate.TaskTypeVersion) + arrayJob, err := core2.ToArrayJob(taskTemplate, taskTemplate.TaskTypeVersion) if err != nil { return v1.Pod{}, nil, err } From 42ae6c5bae933af644782adc5b945352514cbe38 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 18 Jan 2022 23:20:35 +0800 Subject: [PATCH 11/14] Added tests Signed-off-by: Kevin Su --- go/tasks/plugins/array/core/state_test.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/go/tasks/plugins/array/core/state_test.go b/go/tasks/plugins/array/core/state_test.go index 72cf7e5b1..602c4b05f 100644 --- a/go/tasks/plugins/array/core/state_test.go +++ b/go/tasks/plugins/array/core/state_test.go @@ -5,6 +5,9 @@ import ( "fmt" "testing" + idlPlugins "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins" + "github.com/flyteorg/flytestdlib/utils" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/golang/protobuf/proto" @@ -277,4 +280,23 @@ func TestToArrayJob(t *testing.T) { assert.Equal(t, arrayJob.GetMinSuccesses(), int64(1)) assert.Equal(t, arrayJob.GetMinSuccessRatio(), 1.0) }) + + t.Run("ToArrayJob with custom", func(t *testing.T) { + arrayJobProto := &idlPlugins.ArrayJob{ + Parallelism: 10, + Size: 10, + SuccessCriteria: &idlPlugins.ArrayJob_MinSuccessRatio{ + MinSuccessRatio: 1.0, + }, + } + custom, err := utils.MarshalPbToStruct(arrayJobProto) + assert.NoError(t, err) + taskTemplate := &idlCore.TaskTemplate{Custom: custom} + arrayJob, err := ToArrayJob(taskTemplate, 0) + assert.NoError(t, err) + assert.Equal(t, arrayJob.GetParallelism(), int64(10)) + assert.Equal(t, arrayJob.GetSize(), int64(10)) + assert.Equal(t, arrayJob.GetMinSuccesses(), int64(0)) + assert.Equal(t, arrayJob.GetMinSuccessRatio(), 1.0) + }) } From f2c393ccc19374ee1981b4c193497c84c51c6911 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 7 Feb 2022 08:37:24 +0800 Subject: [PATCH 12/14] Fixed tests Signed-off-by: Kevin Su --- go/tasks/plugins/array/awsbatch/executor.go | 4 ++-- go/tasks/plugins/array/awsbatch/transformer_test.go | 2 +- go/tasks/plugins/array/catalog.go | 1 - go/tasks/plugins/array/catalog_test.go | 2 +- go/tasks/plugins/array/core/state.go | 5 +++-- go/tasks/plugins/array/inputs.go | 3 ++- go/tasks/plugins/array/outputs.go | 2 +- 7 files changed, 10 insertions(+), 9 deletions(-) diff --git a/go/tasks/plugins/array/awsbatch/executor.go b/go/tasks/plugins/array/awsbatch/executor.go index ec4b5bc3c..7f7c0e64b 100644 --- a/go/tasks/plugins/array/awsbatch/executor.go +++ b/go/tasks/plugins/array/awsbatch/executor.go @@ -189,10 +189,10 @@ func init() { pluginmachinery.PluginRegistry().RegisterCorePlugin( core.PluginEntry{ ID: executorName, - RegisteredTaskTypes: []core.TaskType{arrayTaskType, array.AwsBatchTaskType}, + RegisteredTaskTypes: []core.TaskType{arrayTaskType, arrayCore.AwsBatchTaskType}, LoadPlugin: createNewExecutorPlugin, IsDefault: false, - DefaultForTaskTypes: []core.TaskType{arrayTaskType, array.AwsBatchTaskType}, + DefaultForTaskTypes: []core.TaskType{arrayTaskType, arrayCore.AwsBatchTaskType}, }) } diff --git a/go/tasks/plugins/array/awsbatch/transformer_test.go b/go/tasks/plugins/array/awsbatch/transformer_test.go index cc67283ca..effea38e6 100644 --- a/go/tasks/plugins/array/awsbatch/transformer_test.go +++ b/go/tasks/plugins/array/awsbatch/transformer_test.go @@ -207,7 +207,7 @@ func TestArrayJobToBatchInput(t *testing.T) { assert.NotNil(t, batchInput) assert.Equal(t, *expectedBatchInput, *batchInput) - taskTemplate.Type = array.AwsBatchTaskType + taskTemplate.Type = arrayCore.AwsBatchTaskType tr.OnReadMatch(mock.Anything).Return(taskTemplate, nil) taskCtx.OnTaskReader().Return(tr) diff --git a/go/tasks/plugins/array/catalog.go b/go/tasks/plugins/array/catalog.go index 7104f042d..6c7373d9c 100644 --- a/go/tasks/plugins/array/catalog.go +++ b/go/tasks/plugins/array/catalog.go @@ -21,7 +21,6 @@ import ( idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" ) -const AwsBatchTaskType = "aws-batch" // DetermineDiscoverability checks if there are any previously cached tasks. If there are we will only submit an // ArrayJob for the non-cached tasks. The ArrayJob is now a different size, and each task will get a new index location diff --git a/go/tasks/plugins/array/catalog_test.go b/go/tasks/plugins/array/catalog_test.go index cb38461da..d4ccb3858 100644 --- a/go/tasks/plugins/array/catalog_test.go +++ b/go/tasks/plugins/array/catalog_test.go @@ -177,7 +177,7 @@ func TestDetermineDiscoverability(t *testing.T) { t.Run("Run AWS Batch single job", func(t *testing.T) { toCache := arrayCore.InvertBitSet(bitarray.NewBitSet(1), 1) - template.Type = AwsBatchTaskType + template.Type = arrayCore.AwsBatchTaskType runDetermineDiscoverabilityTest(t, template, f, &arrayCore.State{ CurrentPhase: arrayCore.PhasePreLaunch, PhaseVersion: core2.DefaultPhaseVersion, diff --git a/go/tasks/plugins/array/core/state.go b/go/tasks/plugins/array/core/state.go index cd75f2a37..a712d2285 100644 --- a/go/tasks/plugins/array/core/state.go +++ b/go/tasks/plugins/array/core/state.go @@ -10,7 +10,6 @@ import ( "github.com/flyteorg/flytestdlib/errors" - "github.com/flyteorg/flyteplugins/go/tasks/plugins/array" "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/arraystatus" "github.com/flyteorg/flytestdlib/bitarray" @@ -40,6 +39,8 @@ const ( PhasePermanentFailure ) +const AwsBatchTaskType = "aws-batch" + type State struct { CurrentPhase Phase `json:"phase"` PhaseVersion uint32 `json:"phaseVersion"` @@ -176,7 +177,7 @@ func ToArrayJob(taskTemplate *idlCore.TaskTemplate, taskTypeVersion int32) (*Arr }, nil } - if taskTypeVersion == 0 || taskTemplate.Type == array.AwsBatchTaskType { + if taskTypeVersion == 0 || taskTemplate.Type == AwsBatchTaskType { return &ArrayJob{ Parallelism: 1, Size: 1, diff --git a/go/tasks/plugins/array/inputs.go b/go/tasks/plugins/array/inputs.go index ac0c2acb5..aa525ae6c 100644 --- a/go/tasks/plugins/array/inputs.go +++ b/go/tasks/plugins/array/inputs.go @@ -6,6 +6,7 @@ import ( idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" + arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core" "github.com/flyteorg/flytestdlib/storage" ) @@ -20,7 +21,7 @@ func (i arrayJobInputReader) GetInputPath() storage.DataReference { } func GetInputReader(tCtx core.TaskExecutionContext, taskTemplate *idlCore.TaskTemplate) io.InputReader { - if taskTemplate.GetTaskTypeVersion() == 0 && taskTemplate.Type != AwsBatchTaskType { + if taskTemplate.GetTaskTypeVersion() == 0 && taskTemplate.Type != arrayCore.AwsBatchTaskType { // Prior to task type version == 1, dynamic type tasks (including array tasks) would write input files for each // individual array task instance. In this case we use a modified input reader to only pass in the parent input // directory. diff --git a/go/tasks/plugins/array/outputs.go b/go/tasks/plugins/array/outputs.go index 4177fcb61..0d7b2ece1 100644 --- a/go/tasks/plugins/array/outputs.go +++ b/go/tasks/plugins/array/outputs.go @@ -198,7 +198,7 @@ func AssembleFinalOutputs(ctx context.Context, assemblyQueue OutputAssembler, tC finalPhases: finalPhases, outputPaths: tCtx.OutputWriter(), dataStore: tCtx.DataStore(), - isAwsSingleJob: taskTemplate.Type == AwsBatchTaskType, + isAwsSingleJob: taskTemplate.Type == arrayCore.AwsBatchTaskType, }) if err != nil { From e75e0edc2155fc1c51f489357259146d944c7dba Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 7 Feb 2022 09:05:48 +0800 Subject: [PATCH 13/14] Fixed tests Signed-off-by: Kevin Su --- go/tasks/plugins/array/awsbatch/transformer_test.go | 1 - go/tasks/plugins/array/inputs_test.go | 11 +++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/go/tasks/plugins/array/awsbatch/transformer_test.go b/go/tasks/plugins/array/awsbatch/transformer_test.go index effea38e6..ebbbbb1bd 100644 --- a/go/tasks/plugins/array/awsbatch/transformer_test.go +++ b/go/tasks/plugins/array/awsbatch/transformer_test.go @@ -22,7 +22,6 @@ import ( "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils" - "github.com/flyteorg/flyteplugins/go/tasks/plugins/array" "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/awsbatch/config" v12 "k8s.io/api/core/v1" diff --git a/go/tasks/plugins/array/inputs_test.go b/go/tasks/plugins/array/inputs_test.go index 42b0f8703..1026c3aa8 100644 --- a/go/tasks/plugins/array/inputs_test.go +++ b/go/tasks/plugins/array/inputs_test.go @@ -6,6 +6,7 @@ import ( "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" pluginsCoreMock "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks" pluginsIOMock "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks" + arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -38,4 +39,14 @@ func TestGetInputReader(t *testing.T) { }) assert.Equal(t, inputReader.GetInputPath().String(), "test-data-reference") }) + + t.Run("task_type_version == AwsBatchTaskType", func(t *testing.T) { + taskCtx := &pluginsCoreMock.TaskExecutionContext{} + taskCtx.On("InputReader").Return(inputReader) + + inputReader := GetInputReader(taskCtx, &core.TaskTemplate{ + Type: arrayCore.AwsBatchTaskType, + }) + assert.Equal(t, inputReader.GetInputPath().String(), "test-data-reference") + }) } From 0dd93c23ed2edeca65d58e89b0edb613f88120e0 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 7 Feb 2022 09:34:08 +0800 Subject: [PATCH 14/14] Fixed tests Signed-off-by: Kevin Su --- go/tasks/plugins/array/catalog.go | 1 - go/tasks/plugins/array/core/state.go | 6 +++--- go/tasks/plugins/array/core/state_test.go | 11 +++++++++++ 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/go/tasks/plugins/array/catalog.go b/go/tasks/plugins/array/catalog.go index 6c7373d9c..4712b1b40 100644 --- a/go/tasks/plugins/array/catalog.go +++ b/go/tasks/plugins/array/catalog.go @@ -21,7 +21,6 @@ import ( idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" ) - // DetermineDiscoverability checks if there are any previously cached tasks. If there are we will only submit an // ArrayJob for the non-cached tasks. The ArrayJob is now a different size, and each task will get a new index location // which is different than their original location. To find the original index we construct an indexLookup array. diff --git a/go/tasks/plugins/array/core/state.go b/go/tasks/plugins/array/core/state.go index a712d2285..787634ace 100644 --- a/go/tasks/plugins/array/core/state.go +++ b/go/tasks/plugins/array/core/state.go @@ -143,7 +143,7 @@ const ( ) func ToArrayJob(taskTemplate *idlCore.TaskTemplate, taskTypeVersion int32) (*ArrayJob, error) { - if taskTemplate.GetConfig() != nil { + if taskTemplate != nil && taskTemplate.GetConfig() != nil { config := taskTemplate.GetConfig() arrayJob := &ArrayJob{} var err error @@ -163,7 +163,7 @@ func ToArrayJob(taskTemplate *idlCore.TaskTemplate, taskTypeVersion int32) (*Arr } // Keep backward compatibility for those who use arrayJob proto - if taskTemplate.GetCustom() != nil { + if taskTemplate != nil && taskTemplate.GetCustom() != nil { arrayJob := &idlPlugins.ArrayJob{} err := utils.UnmarshalStruct(taskTemplate.GetCustom(), arrayJob) if err != nil { @@ -177,7 +177,7 @@ func ToArrayJob(taskTemplate *idlCore.TaskTemplate, taskTypeVersion int32) (*Arr }, nil } - if taskTypeVersion == 0 || taskTemplate.Type == AwsBatchTaskType { + if taskTypeVersion == 0 || (taskTemplate != nil && taskTemplate.Type == AwsBatchTaskType) { return &ArrayJob{ Parallelism: 1, Size: 1, diff --git a/go/tasks/plugins/array/core/state_test.go b/go/tasks/plugins/array/core/state_test.go index 9e55c6e43..6a3423c59 100644 --- a/go/tasks/plugins/array/core/state_test.go +++ b/go/tasks/plugins/array/core/state_test.go @@ -312,6 +312,17 @@ func TestToArrayJob(t *testing.T) { }) }) + t.Run("task_type_version == AwsBatchTaskType", func(t *testing.T) { + taskTemplate := &idlCore.TaskTemplate{Type: AwsBatchTaskType} + arrayJob, err := ToArrayJob(taskTemplate, 1) + assert.NoError(t, err) + assert.True(t, *arrayJob == ArrayJob{ + Parallelism: 1, + Size: 1, + MinSuccesses: 1, + }) + }) + t.Run("ToArrayJob with config", func(t *testing.T) { config := map[string]string{ "Parallelism": "10",