diff --git a/go/tasks/logs/config.go b/go/tasks/logs/config.go index 1f58e72cb..7f9a0769b 100755 --- a/go/tasks/logs/config.go +++ b/go/tasks/logs/config.go @@ -5,12 +5,12 @@ import ( "github.com/flyteorg/flyteplugins/go/tasks/config" ) -//go:generate pflags LogConfig +//go:generate pflags LogConfig --default-var=DefaultConfig -// A URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates. +// TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates. type TemplateURI = string -// Log plugins configs +// LogConfig encapsulates plugins' log configs type LogConfig struct { IsCloudwatchEnabled bool `json:"cloudwatch-enabled" pflag:",Enable Cloudwatch Logging"` // Deprecated: Please use CloudwatchTemplateURI @@ -41,14 +41,19 @@ type TemplateLogPluginConfig struct { } var ( - logConfigSection = config.MustRegisterSubSection("logs", &LogConfig{}) + DefaultConfig = LogConfig{ + IsKubernetesEnabled: true, + KubernetesTemplateURI: "http://localhost:30082/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", + } + + logConfigSection = config.MustRegisterSubSection("logs", &DefaultConfig) ) func GetLogConfig() *LogConfig { return logConfigSection.GetConfig().(*LogConfig) } -// This method should be used for unit testing only +// SetLogConfig should be used for unit testing only func SetLogConfig(logConfig *LogConfig) error { return logConfigSection.SetConfig(logConfig) } diff --git a/go/tasks/logs/logging_utils.go b/go/tasks/logs/logging_utils.go index b3cdd953d..f6b3e06db 100755 --- a/go/tasks/logs/logging_utils.go +++ b/go/tasks/logs/logging_utils.go @@ -84,7 +84,7 @@ func (t taskLogPluginWrapper) GetTaskLogs(input tasklog.Input) (logOutput tasklo }, nil } -// Internal +// InitializeLogPlugins initializes log plugin based on config. func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) { // Use a list to maintain order. logPlugins := make([]logPlugin, 0, 2) diff --git a/go/tasks/pluginmachinery/catalog/reader_processor.go b/go/tasks/pluginmachinery/catalog/reader_processor.go index 2fbea9815..bd999050a 100644 --- a/go/tasks/pluginmachinery/catalog/reader_processor.go +++ b/go/tasks/pluginmachinery/catalog/reader_processor.go @@ -80,7 +80,7 @@ func (p ReaderProcessor) Process(ctx context.Context, workItem workqueue.WorkIte wi.cached = true - logger.Debugf(ctx, "Successfully wrote to catalog. Key [%v]", wi.key) + logger.Debugf(ctx, "Successfully read from catalog. Key [%v]", wi.key) return workqueue.WorkStatusSucceeded, nil } diff --git a/go/tasks/plugins/array/awsbatch/executor_test.go b/go/tasks/plugins/array/awsbatch/executor_test.go index 95afb17e6..ccb257714 100644 --- a/go/tasks/plugins/array/awsbatch/executor_test.go +++ b/go/tasks/plugins/array/awsbatch/executor_test.go @@ -5,6 +5,10 @@ import ( "reflect" "testing" + mocks2 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks" + + "github.com/flyteorg/flytestdlib/storage" + v1 "k8s.io/api/core/v1" "github.com/flyteorg/flytestdlib/promutils" @@ -123,11 +127,21 @@ func TestExecutor_Handle(t *testing.T) { tMeta.OnGetTaskExecutionID().Return(tID) tMeta.OnGetOverrides().Return(overrides) + dataStore, err := storage.NewDataStore(&storage.Config{ + Type: storage.TypeMemory, + }, promutils.NewTestScope()) + assert.NoError(t, err) + + inputReader := &mocks2.InputReader{} + inputReader.OnGetInputPrefixPath().Return("/inputs.pb") + tCtx := &pluginMocks.TaskExecutionContext{} tCtx.OnPluginStateReader().Return(pluginStateReader) tCtx.OnPluginStateWriter().Return(pluginStateWriter) tCtx.OnTaskReader().Return(tr) tCtx.OnTaskExecutionMetadata().Return(tMeta) + tCtx.OnDataStore().Return(dataStore) + tCtx.OnInputReader().Return(inputReader) transition, err := e.Handle(ctx, tCtx) assert.NoError(t, err) diff --git a/go/tasks/plugins/array/awsbatch/job_config.go b/go/tasks/plugins/array/awsbatch/job_config.go index 0812181a8..40ba3de70 100644 --- a/go/tasks/plugins/array/awsbatch/job_config.go +++ b/go/tasks/plugins/array/awsbatch/job_config.go @@ -12,6 +12,7 @@ import ( const ( // Keep these in-sync with flyteAdmin @ // https://github.com/flyteorg/flyteadmin/blob/d1c61c34f62d8ee51964f47877802d070dfa9e98/pkg/manager/impl/execution_manager.go#L42-L43 + PrimaryTaskQueueKey = "primary_queue" DynamicTaskQueueKey = "dynamic_queue" ChildTaskQueueKey = "child_queue" diff --git a/go/tasks/plugins/array/catalog.go b/go/tasks/plugins/array/catalog.go index d04e0e83d..781f87227 100644 --- a/go/tasks/plugins/array/catalog.go +++ b/go/tasks/plugins/array/catalog.go @@ -21,8 +21,8 @@ import ( idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" ) -// Check if there are any previously cached tasks. If there are we will only submit an ArrayJob for the -// non-cached tasks. The ArrayJob is now a different size, and each task will get a new index location +// DetermineDiscoverability checks if there are any previously cached tasks. If there are we will only submit an +// ArrayJob for the non-cached tasks. The ArrayJob is now a different size, and each task will get a new index location // which is different than their original location. To find the original index we construct an indexLookup array. // The subtask can find it's original index value in indexLookup[JOB_ARRAY_INDEX] where JOB_ARRAY_INDEX is an // environment variable in the pod @@ -44,30 +44,43 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex } var arrayJobSize int64 + var inputReaders []io.InputReader // Save this in the state if taskTemplate.TaskTypeVersion == 0 { state = state.SetOriginalArraySize(arrayJob.Size) arrayJobSize = arrayJob.Size state = state.SetOriginalMinSuccesses(arrayJob.GetMinSuccesses()) + + // build input readers + inputReaders, err = ConstructRemoteFileInputReaders(ctx, tCtx.DataStore(), tCtx.InputReader().GetInputPrefixPath(), int(arrayJobSize)) + if err != nil { + return state, err + } } else { inputs, err := tCtx.InputReader().Get(ctx) if err != nil { return state, errors.Errorf(errors.MetadataAccessFailed, "Could not read inputs and therefore failed to determine array job size") } + size := 0 - for _, literal := range inputs.Literals { - if literal.GetCollection() != nil { + var literalCollection *idlCore.LiteralCollection + var discoveredInputName string + for inputName, literal := range inputs.Literals { + if literalCollection = literal.GetCollection(); literalCollection != nil { size = len(literal.GetCollection().Literals) + discoveredInputName = inputName break } } + if size == 0 { // Something is wrong, we should have inferred the array size when it is not specified by the size of the // input collection (for any input value). Non-collection type inputs are not currently supported for // taskTypeVersion > 0. return state, errors.Errorf(errors.BadTaskSpecification, "Unable to determine array size from inputs") } + minSuccesses := math.Ceil(float64(arrayJob.GetMinSuccessRatio()) * float64(size)) logger.Debugf(ctx, "Computed state: size [%d] and minSuccesses [%d]", int64(size), int64(minSuccesses)) @@ -76,6 +89,9 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex state = state.SetOriginalMinSuccesses(int64(minSuccesses)) arrayJobSize = int64(size) + + // build input readers + inputReaders = ConstructStaticInputReaders(tCtx.InputReader(), literalCollection.Literals, discoveredInputName) } // If the task is not discoverable, then skip data catalog work and move directly to launch @@ -90,11 +106,6 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex } // Otherwise, run the data catalog steps - create and submit work items to the catalog processor, - // build input readers - inputReaders, err := ConstructInputReaders(ctx, tCtx.DataStore(), tCtx.InputReader().GetInputPrefixPath(), int(arrayJobSize)) - if err != nil { - return state, err - } // build output writers outputWriters, err := ConstructOutputWriters(ctx, tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(), int(arrayJobSize)) @@ -182,22 +193,35 @@ func WriteToDiscovery(ctx context.Context, tCtx core.TaskExecutionContext, state return state.SetPhase(phaseOnSuccess, core.DefaultPhaseVersion).SetReason("Task is not discoverable."), nil } - // Extract the custom plugin pb - arrayJob, err := arrayCore.ToArrayJob(taskTemplate.GetCustom(), taskTemplate.TaskTypeVersion) - if err != nil { - return state, err - } else if arrayJob == nil { - return state, errors.Errorf(errors.BadTaskSpecification, "Could not extract custom array job") - } + var inputReaders []io.InputReader + arrayJobSize := int(state.GetOriginalArraySize()) + if taskTemplate.TaskTypeVersion == 0 { + // input readers + inputReaders, err = ConstructRemoteFileInputReaders(ctx, tCtx.DataStore(), tCtx.InputReader().GetInputPrefixPath(), arrayJobSize) + if err != nil { + return nil, err + } + } else { + inputs, err := tCtx.InputReader().Get(ctx) + if err != nil { + return state, errors.Errorf(errors.MetadataAccessFailed, "Could not read inputs and therefore failed to determine array job size") + } - // input readers - inputReaders, err := ConstructInputReaders(ctx, tCtx.DataStore(), tCtx.InputReader().GetInputPrefixPath(), int(arrayJob.Size)) - if err != nil { - return nil, err + var literalCollection *idlCore.LiteralCollection + var discoveredInputName string + for inputName, literal := range inputs.Literals { + if literalCollection = literal.GetCollection(); literalCollection != nil { + discoveredInputName = inputName + break + } + } + + // build input readers + inputReaders = ConstructStaticInputReaders(tCtx.InputReader(), literalCollection.Literals, discoveredInputName) } // output reader - outputReaders, err := ConstructOutputReaders(ctx, tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(), int(arrayJob.Size)) + outputReaders, err := ConstructOutputReaders(ctx, tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(), arrayJobSize) if err != nil { return nil, err } @@ -389,7 +413,22 @@ func ConstructCatalogReaderWorkItems(ctx context.Context, taskReader core.TaskRe return workItems, nil } -func ConstructInputReaders(ctx context.Context, dataStore *storage.DataStore, inputPrefix storage.DataReference, +// ConstructStaticInputReaders constructs input readers that comply with the io.InputReader interface but have their +// inputs already populated. +func ConstructStaticInputReaders(inputPaths io.InputFilePaths, inputs []*idlCore.Literal, inputName string) []io.InputReader { + inputReaders := make([]io.InputReader, 0, len(inputs)) + for i := 0; i < len(inputs); i++ { + inputReaders = append(inputReaders, NewStaticInputReader(inputPaths, &idlCore.LiteralMap{ + Literals: map[string]*idlCore.Literal{ + inputName: inputs[i], + }, + })) + } + + return inputReaders +} + +func ConstructRemoteFileInputReaders(ctx context.Context, dataStore *storage.DataStore, inputPrefix storage.DataReference, size int) ([]io.InputReader, error) { inputReaders := make([]io.InputReader, 0, size) diff --git a/go/tasks/plugins/array/core/state.go b/go/tasks/plugins/array/core/state.go index 2d82eb180..3703d6480 100644 --- a/go/tasks/plugins/array/core/state.go +++ b/go/tasks/plugins/array/core/state.go @@ -317,7 +317,7 @@ func NewPhasesCompactArray(count uint) bitarray.CompactArray { return a } -// Compute the original index of a sub-task. +// CalculateOriginalIndex computes the original index of a sub-task. func CalculateOriginalIndex(childIdx int, toCache *bitarray.BitSet) int { var sum = 0 for i := uint(0); i < toCache.Cap(); i++ { diff --git a/go/tasks/plugins/array/inputs.go b/go/tasks/plugins/array/inputs.go index 539b200c7..248398b42 100644 --- a/go/tasks/plugins/array/inputs.go +++ b/go/tasks/plugins/array/inputs.go @@ -1,31 +1,48 @@ package array import ( + "context" + idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" "github.com/flyteorg/flytestdlib/storage" ) -// A proxy inputreader that overrides the inputpath to be the inputpathprefix for array jobs +// arrayJobInputReader is a proxy inputreader that overrides the inputpath to be the inputpathprefix for array jobs type arrayJobInputReader struct { io.InputReader } -// We override the inputpath to return the prefix path for array jobs +// GetInputPath overrides the inputpath to return the prefix path for array jobs func (i arrayJobInputReader) GetInputPath() storage.DataReference { return i.GetInputPrefixPath() } func GetInputReader(tCtx core.TaskExecutionContext, taskTemplate *idlCore.TaskTemplate) io.InputReader { - var inputReader io.InputReader if taskTemplate.GetTaskTypeVersion() == 0 { // Prior to task type version == 1, dynamic type tasks (including array tasks) would write input files for each // individual array task instance. In this case we use a modified input reader to only pass in the parent input // directory. - inputReader = arrayJobInputReader{tCtx.InputReader()} - } else { - inputReader = tCtx.InputReader() + return arrayJobInputReader{tCtx.InputReader()} } - return inputReader + + return tCtx.InputReader() +} + +// StaticInputReader complies with the io.InputReader interface but has the input already populated. +type StaticInputReader struct { + io.InputFilePaths + input *idlCore.LiteralMap +} + +func NewStaticInputReader(inputPaths io.InputFilePaths, input *idlCore.LiteralMap) StaticInputReader { + return StaticInputReader{ + InputFilePaths: inputPaths, + input: input, + } +} + +func (i StaticInputReader) Get(_ context.Context) (*idlCore.LiteralMap, error) { + return i.input, nil } diff --git a/go/tasks/plugins/array/k8s/config.go b/go/tasks/plugins/array/k8s/config.go index df02803a7..ad039b8a7 100644 --- a/go/tasks/plugins/array/k8s/config.go +++ b/go/tasks/plugins/array/k8s/config.go @@ -37,6 +37,9 @@ var ( MaxRetries: 5, Workers: 10, }, + LogConfig: LogConfig{ + Config: logs.DefaultConfig, + }, } configSection = pluginsConfig.MustRegisterSubSection(configSectionKey, defaultConfig) @@ -76,8 +79,8 @@ func (auth Auth) GetToken() (string, error) { return string(token), nil } +// RemoteClusterConfig reads secret values from paths specified in the config to initialize a Kubernetes rest client Config. // TODO: Move logic to flytestdlib -// Reads secret values from paths specified in the config to initialize a Kubernetes rest client Config. func RemoteClusterConfig(host string, auth Auth) (*restclient.Config, error) { tokenString, err := auth.GetToken() if err != nil { @@ -106,7 +109,7 @@ func GetK8sClient(config ClusterConfig) (client.Client, error) { return client.New(kubeConf, client.Options{}) } -// Defines custom config for K8s Array plugin +// Config defines custom config for K8s Array plugin type Config struct { DefaultScheduler string `json:"scheduler" pflag:",Decides the scheduler to use when launching array-pods."` MaxErrorStringLength int `json:"maxErrorLength" pflag:",Determines the maximum length of the error string returned for the array."` diff --git a/go/tasks/plugins/array/k8s/monitor.go b/go/tasks/plugins/array/k8s/monitor.go index f8a1db07d..cea135181 100644 --- a/go/tasks/plugins/array/k8s/monitor.go +++ b/go/tasks/plugins/array/k8s/monitor.go @@ -71,6 +71,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon existingPhase := core.Phases[existingPhaseIdx] indexStr := strconv.Itoa(childIdx) podName := formatSubTaskName(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), indexStr) + originalIdx := arrayCore.CalculateOriginalIndex(childIdx, newState.GetIndexesToCache()) if existingPhase.IsTerminal() { // If we get here it means we have already "processed" this terminal phase since we will only persist @@ -84,7 +85,6 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon } newArrayStatus.Summary.Inc(existingPhase) newArrayStatus.Detailed.SetItem(childIdx, bitarray.Item(existingPhase)) - originalIdx := arrayCore.CalculateOriginalIndex(childIdx, newState.GetIndexesToCache()) phaseInfo, err := FetchPodStatusAndLogs(ctx, kubeClient, k8sTypes.NamespacedName{ @@ -111,9 +111,11 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon NewArrayStatus: newArrayStatus, Config: config, ChildIdx: childIdx, + OriginalIndex: originalIdx, MessageCollector: &msg, SubTaskIDs: subTaskIDs, } + // The first time we enter this state we will launch every subtask. On subsequent rounds, the pod // has already been created so we return a Success value and continue with the Monitor step. var launchResult LaunchResult @@ -136,7 +138,6 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon return currentState, logLinks, subTaskIDs, nil } - var monitorResult MonitorResult monitorResult, taskLogs, err := task.Monitor(ctx, tCtx, kubeClient, dataStore, outputPrefix, baseOutputDataSandbox, logPlugin) if len(taskLogs) > 0 { diff --git a/go/tasks/plugins/array/k8s/monitor_test.go b/go/tasks/plugins/array/k8s/monitor_test.go index 96db6b8ce..8f7c3414c 100644 --- a/go/tasks/plugins/array/k8s/monitor_test.go +++ b/go/tasks/plugins/array/k8s/monitor_test.go @@ -202,6 +202,7 @@ func TestCheckSubTasksState(t *testing.T) { ArrayStatus: arraystatus.ArrayStatus{ Detailed: arrayCore.NewPhasesCompactArray(uint(5)), }, + IndexesToCache: bitarray.NewBitSet(5), }) assert.Nil(t, err) diff --git a/go/tasks/plugins/array/k8s/task.go b/go/tasks/plugins/array/k8s/task.go index c5b5f7ab3..3af7721da 100644 --- a/go/tasks/plugins/array/k8s/task.go +++ b/go/tasks/plugins/array/k8s/task.go @@ -32,6 +32,7 @@ type Task struct { NewArrayStatus *arraystatus.ArrayStatus Config *Config ChildIdx int + OriginalIndex int MessageCollector *errorcollector.ErrorMessageCollector SubTaskIDs []*string } @@ -103,9 +104,11 @@ func (t Task) Launch(ctx context.Context, tCtx core.TaskExecutionContext, kubeCl if t.Config.RemoteClusterConfig.Enabled { podTemplate.OwnerReferences = nil } + if len(podTemplate.Spec.Containers) == 0 { return LaunchError, errors2.Wrapf(ErrReplaceCmdTemplate, err, "No containers found in podSpec.") } + containerIndex, err := getTaskContainerIndex(&podTemplate) if err != nil { return LaunchError, err @@ -113,12 +116,24 @@ func (t Task) Launch(ctx context.Context, tCtx core.TaskExecutionContext, kubeCl indexStr := strconv.Itoa(t.ChildIdx) podName := formatSubTaskName(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), indexStr) + allocationStatus, err := allocateResource(ctx, tCtx, t.Config, podName) + if err != nil { + return LaunchError, err + } + + if allocationStatus != core.AllocationStatusGranted { + t.NewArrayStatus.Detailed.SetItem(t.ChildIdx, bitarray.Item(core.PhaseWaitingForResources)) + t.NewArrayStatus.Summary.Inc(core.PhaseWaitingForResources) + return LaunchWaiting, nil + } pod := podTemplate.DeepCopy() pod.Name = podName pod.Spec.Containers[containerIndex].Env = append(pod.Spec.Containers[containerIndex].Env, corev1.EnvVar{ - Name: FlyteK8sArrayIndexVarName, - Value: indexStr, + Name: FlyteK8sArrayIndexVarName, + // Use the OriginalIndex which represents the position of the subtask in the original user's map task before + // compacting indexes caused by catalog-cache-check. + Value: strconv.Itoa(t.OriginalIndex), }) pod.Spec.Containers[containerIndex].Env = append(pod.Spec.Containers[containerIndex].Env, arrayJobEnvVars...) @@ -128,19 +143,11 @@ func (t Task) Launch(ctx context.Context, tCtx core.TaskExecutionContext, kubeCl } else if taskTemplate == nil { return LaunchError, errors2.Wrapf(ErrGetTaskTypeVersion, err, "Missing task template") } + pod = ApplyPodPolicies(ctx, t.Config, pod) pod = applyNodeSelectorLabels(ctx, t.Config, pod) pod = applyPodTolerations(ctx, t.Config, pod) pod = addPodFinalizer(pod) - allocationStatus, err := allocateResource(ctx, tCtx, t.Config, podName) - if err != nil { - return LaunchError, err - } - if allocationStatus != core.AllocationStatusGranted { - t.NewArrayStatus.Detailed.SetItem(t.ChildIdx, bitarray.Item(core.PhaseWaitingForResources)) - t.NewArrayStatus.Summary.Inc(core.PhaseWaitingForResources) - return LaunchWaiting, nil - } // Check for existing pods to prevent unnecessary Resource-Quota usage: https://github.com/kubernetes/kubernetes/issues/76787 existingPod := &corev1.Pod{} diff --git a/go/tasks/plugins/array/k8s/transformer.go b/go/tasks/plugins/array/k8s/transformer.go index 849d5f98e..8b10c54d3 100644 --- a/go/tasks/plugins/array/k8s/transformer.go +++ b/go/tasks/plugins/array/k8s/transformer.go @@ -33,7 +33,7 @@ type arrayTaskContext struct { arrayInputReader io.InputReader } -// Overrides the TaskExecutionContext from base and returns a specialized context for Array +// InputReader overrides the TaskExecutionContext from base and returns a specialized context for Array func (a *arrayTaskContext) InputReader() io.InputReader { return a.arrayInputReader } @@ -94,8 +94,8 @@ func buildPodMapTask(task *idlCore.TaskTemplate, metadata core.TaskExecutionMeta return pod, nil } -// Note that Name is not set on the result object. -// It's up to the caller to set the Name before creating the object in K8s. +// FlyteArrayJobToK8sPodTemplate returns a pod template for the given task context. Note that Name is not set on the +// result object. It's up to the caller to set the Name before creating the object in K8s. func FlyteArrayJobToK8sPodTemplate(ctx context.Context, tCtx core.TaskExecutionContext, namespaceTemplate string) ( podTemplate v1.Pod, job *idlPlugins.ArrayJob, err error) { @@ -116,6 +116,7 @@ func FlyteArrayJobToK8sPodTemplate(ctx context.Context, tCtx core.TaskExecutionC TaskExecutionContext: tCtx, arrayInputReader: array.GetInputReader(tCtx, taskTemplate), } + var arrayJob *idlPlugins.ArrayJob if taskTemplate.GetCustom() != nil { arrayJob, err = core2.ToArrayJob(taskTemplate.GetCustom(), taskTemplate.TaskTypeVersion) @@ -140,17 +141,20 @@ func FlyteArrayJobToK8sPodTemplate(ctx context.Context, tCtx core.TaskExecutionC OwnerReferences: []metav1.OwnerReference{tCtx.TaskExecutionMetadata().GetOwnerReference()}, }, } + if taskTemplate.GetContainer() != nil { podSpec, err := flytek8s.ToK8sPodSpecWithInterruptible(ctx, arrTCtx, true) if err != nil { return v1.Pod{}, nil, err } + pod.Spec = *podSpec } else if taskTemplate.GetK8SPod() != nil { k8sPod, err := buildPodMapTask(taskTemplate, tCtx.TaskExecutionMetadata()) if err != nil { return v1.Pod{}, nil, err } + pod.Labels = utils.UnionMaps(pod.Labels, k8sPod.Labels) pod.Annotations = utils.UnionMaps(pod.Annotations, k8sPod.Annotations) pod.Spec = k8sPod.Spec @@ -159,12 +163,14 @@ func FlyteArrayJobToK8sPodTemplate(ctx context.Context, tCtx core.TaskExecutionC if err != nil { return v1.Pod{}, nil, err } + templateParameters := template.Parameters{ TaskExecMetadata: tCtx.TaskExecutionMetadata(), Inputs: arrTCtx.arrayInputReader, OutputPath: tCtx.OutputWriter(), Task: tCtx.TaskReader(), } + err = flytek8s.AddFlyteCustomizationsToContainer( ctx, templateParameters, flytek8s.ResourceCustomizationModeMergeExistingResources, &pod.Spec.Containers[containerIndex]) if err != nil {