diff --git a/flytepropeller/pkg/controller/nodes/array/execution_context.go b/flytepropeller/pkg/controller/nodes/array/execution_context.go index 4fb5a8a214..9bd42c0b6b 100644 --- a/flytepropeller/pkg/controller/nodes/array/execution_context.go +++ b/flytepropeller/pkg/controller/nodes/array/execution_context.go @@ -23,11 +23,18 @@ func (a *arrayExecutionContext) GetExecutionConfig() v1alpha1.ExecutionConfig { func newArrayExecutionContext(executionContext executors.ExecutionContext, subNodeIndex int) *arrayExecutionContext { executionConfig := executionContext.GetExecutionConfig() - if executionConfig.EnvironmentVariables == nil { - executionConfig.EnvironmentVariables = make(map[string]string) + + // since maps are all reference types in golang if we are going to modify the + // EnvironmentVariables for each subNode we need to at least shallow copy the map to ensure + // there are no concurrent modifications during parallelized evaluation of subNodes + environmentVariables := executionConfig.EnvironmentVariables + executionConfig.EnvironmentVariables = make(map[string]string) + for key, value := range environmentVariables { + executionConfig.EnvironmentVariables[key] = value } executionConfig.EnvironmentVariables[JobIndexVarName] = FlyteK8sArrayIndexVarName executionConfig.EnvironmentVariables[FlyteK8sArrayIndexVarName] = strconv.Itoa(subNodeIndex) + executionConfig.MaxParallelism = 0 // hardcoded to 0 because parallelism is handled by the array node return &arrayExecutionContext{