Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Fix MapTasks caching and default Log Links (#227)
Browse files Browse the repository at this point in the history
* Fix input caching for catalog lookup in map tasks

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Use static input readers for catalog write

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Rename

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Filter by cachable subtasks

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* wip

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* wip

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* wip

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* wip

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* wip

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* wip

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* wip

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* wip

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* wip

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Unit tests and PR Comments

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* PR comments

Signed-off-by: Haytham Abuelfutuh <[email protected]>
  • Loading branch information
EngHabu authored Dec 9, 2021
1 parent 1bf97e9 commit 973a51a
Show file tree
Hide file tree
Showing 13 changed files with 149 additions and 55 deletions.
15 changes: 10 additions & 5 deletions go/tasks/logs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"github.com/flyteorg/flyteplugins/go/tasks/config"
)

//go:generate pflags LogConfig
//go:generate pflags LogConfig --default-var=DefaultConfig

// A URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates.
// TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates.
type TemplateURI = string

// Log plugins configs
// LogConfig encapsulates plugins' log configs
type LogConfig struct {
IsCloudwatchEnabled bool `json:"cloudwatch-enabled" pflag:",Enable Cloudwatch Logging"`
// Deprecated: Please use CloudwatchTemplateURI
Expand Down Expand Up @@ -41,14 +41,19 @@ type TemplateLogPluginConfig struct {
}

var (
logConfigSection = config.MustRegisterSubSection("logs", &LogConfig{})
DefaultConfig = LogConfig{
IsKubernetesEnabled: true,
KubernetesTemplateURI: "http://localhost:30082/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}",
}

logConfigSection = config.MustRegisterSubSection("logs", &DefaultConfig)
)

func GetLogConfig() *LogConfig {
return logConfigSection.GetConfig().(*LogConfig)
}

// This method should be used for unit testing only
// SetLogConfig should be used for unit testing only
func SetLogConfig(logConfig *LogConfig) error {
return logConfigSection.SetConfig(logConfig)
}
2 changes: 1 addition & 1 deletion go/tasks/logs/logging_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (t taskLogPluginWrapper) GetTaskLogs(input tasklog.Input) (logOutput tasklo
}, nil
}

// Internal
// InitializeLogPlugins initializes log plugins based on config.
func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) {
// Use a list to maintain order.
logPlugins := make([]logPlugin, 0, 2)
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/pluginmachinery/catalog/reader_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (p ReaderProcessor) Process(ctx context.Context, workItem workqueue.WorkIte

wi.cached = true

logger.Debugf(ctx, "Successfully wrote to catalog. Key [%v]", wi.key)
logger.Debugf(ctx, "Successfully read from catalog. Key [%v]", wi.key)
return workqueue.WorkStatusSucceeded, nil
}

Expand Down
14 changes: 14 additions & 0 deletions go/tasks/plugins/array/awsbatch/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
"reflect"
"testing"

mocks2 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks"

"github.com/flyteorg/flytestdlib/storage"

v1 "k8s.io/api/core/v1"

"github.com/flyteorg/flytestdlib/promutils"
Expand Down Expand Up @@ -123,11 +127,21 @@ func TestExecutor_Handle(t *testing.T) {
tMeta.OnGetTaskExecutionID().Return(tID)
tMeta.OnGetOverrides().Return(overrides)

dataStore, err := storage.NewDataStore(&storage.Config{
Type: storage.TypeMemory,
}, promutils.NewTestScope())
assert.NoError(t, err)

inputReader := &mocks2.InputReader{}
inputReader.OnGetInputPrefixPath().Return("/inputs.pb")

tCtx := &pluginMocks.TaskExecutionContext{}
tCtx.OnPluginStateReader().Return(pluginStateReader)
tCtx.OnPluginStateWriter().Return(pluginStateWriter)
tCtx.OnTaskReader().Return(tr)
tCtx.OnTaskExecutionMetadata().Return(tMeta)
tCtx.OnDataStore().Return(dataStore)
tCtx.OnInputReader().Return(inputReader)

transition, err := e.Handle(ctx, tCtx)
assert.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions go/tasks/plugins/array/awsbatch/job_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
const (
// Keep these in-sync with flyteAdmin @
// https://github.com/flyteorg/flyteadmin/blob/d1c61c34f62d8ee51964f47877802d070dfa9e98/pkg/manager/impl/execution_manager.go#L42-L43

PrimaryTaskQueueKey = "primary_queue"
DynamicTaskQueueKey = "dynamic_queue"
ChildTaskQueueKey = "child_queue"
Expand Down
83 changes: 61 additions & 22 deletions go/tasks/plugins/array/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
)

// Check if there are any previously cached tasks. If there are we will only submit an ArrayJob for the
// non-cached tasks. The ArrayJob is now a different size, and each task will get a new index location
// DetermineDiscoverability checks if there are any previously cached tasks. If there are we will only submit an
// ArrayJob for the non-cached tasks. The ArrayJob is now a different size, and each task will get a new index location
// which is different than their original location. To find the original index we construct an indexLookup array.
// The subtask can find its original index value in indexLookup[JOB_ARRAY_INDEX] where JOB_ARRAY_INDEX is an
// environment variable in the pod
Expand All @@ -44,30 +44,43 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex
}

var arrayJobSize int64
var inputReaders []io.InputReader

// Save this in the state
if taskTemplate.TaskTypeVersion == 0 {
state = state.SetOriginalArraySize(arrayJob.Size)
arrayJobSize = arrayJob.Size
state = state.SetOriginalMinSuccesses(arrayJob.GetMinSuccesses())

// build input readers
inputReaders, err = ConstructRemoteFileInputReaders(ctx, tCtx.DataStore(), tCtx.InputReader().GetInputPrefixPath(), int(arrayJobSize))
if err != nil {
return state, err
}
} else {
inputs, err := tCtx.InputReader().Get(ctx)
if err != nil {
return state, errors.Errorf(errors.MetadataAccessFailed, "Could not read inputs and therefore failed to determine array job size")
}

size := 0
for _, literal := range inputs.Literals {
if literal.GetCollection() != nil {
var literalCollection *idlCore.LiteralCollection
var discoveredInputName string
for inputName, literal := range inputs.Literals {
if literalCollection = literal.GetCollection(); literalCollection != nil {
size = len(literal.GetCollection().Literals)
discoveredInputName = inputName
break
}
}

if size == 0 {
// Something is wrong, we should have inferred the array size when it is not specified by the size of the
// input collection (for any input value). Non-collection type inputs are not currently supported for
// taskTypeVersion > 0.
return state, errors.Errorf(errors.BadTaskSpecification, "Unable to determine array size from inputs")
}

minSuccesses := math.Ceil(float64(arrayJob.GetMinSuccessRatio()) * float64(size))

logger.Debugf(ctx, "Computed state: size [%d] and minSuccesses [%d]", int64(size), int64(minSuccesses))
Expand All @@ -76,6 +89,9 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex
state = state.SetOriginalMinSuccesses(int64(minSuccesses))

arrayJobSize = int64(size)

// build input readers
inputReaders = ConstructStaticInputReaders(tCtx.InputReader(), literalCollection.Literals, discoveredInputName)
}

// If the task is not discoverable, then skip data catalog work and move directly to launch
Expand All @@ -90,11 +106,6 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex
}

// Otherwise, run the data catalog steps - create and submit work items to the catalog processor,
// build input readers
inputReaders, err := ConstructInputReaders(ctx, tCtx.DataStore(), tCtx.InputReader().GetInputPrefixPath(), int(arrayJobSize))
if err != nil {
return state, err
}

// build output writers
outputWriters, err := ConstructOutputWriters(ctx, tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(), int(arrayJobSize))
Expand Down Expand Up @@ -182,22 +193,35 @@ func WriteToDiscovery(ctx context.Context, tCtx core.TaskExecutionContext, state
return state.SetPhase(phaseOnSuccess, core.DefaultPhaseVersion).SetReason("Task is not discoverable."), nil
}

// Extract the custom plugin pb
arrayJob, err := arrayCore.ToArrayJob(taskTemplate.GetCustom(), taskTemplate.TaskTypeVersion)
if err != nil {
return state, err
} else if arrayJob == nil {
return state, errors.Errorf(errors.BadTaskSpecification, "Could not extract custom array job")
}
var inputReaders []io.InputReader
arrayJobSize := int(state.GetOriginalArraySize())
if taskTemplate.TaskTypeVersion == 0 {
// input readers
inputReaders, err = ConstructRemoteFileInputReaders(ctx, tCtx.DataStore(), tCtx.InputReader().GetInputPrefixPath(), arrayJobSize)
if err != nil {
return nil, err
}
} else {
inputs, err := tCtx.InputReader().Get(ctx)
if err != nil {
return state, errors.Errorf(errors.MetadataAccessFailed, "Could not read inputs and therefore failed to determine array job size")
}

// input readers
inputReaders, err := ConstructInputReaders(ctx, tCtx.DataStore(), tCtx.InputReader().GetInputPrefixPath(), int(arrayJob.Size))
if err != nil {
return nil, err
var literalCollection *idlCore.LiteralCollection
var discoveredInputName string
for inputName, literal := range inputs.Literals {
if literalCollection = literal.GetCollection(); literalCollection != nil {
discoveredInputName = inputName
break
}
}

// build input readers
inputReaders = ConstructStaticInputReaders(tCtx.InputReader(), literalCollection.Literals, discoveredInputName)
}

// output reader
outputReaders, err := ConstructOutputReaders(ctx, tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(), int(arrayJob.Size))
outputReaders, err := ConstructOutputReaders(ctx, tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(), arrayJobSize)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -389,7 +413,22 @@ func ConstructCatalogReaderWorkItems(ctx context.Context, taskReader core.TaskRe
return workItems, nil
}

func ConstructInputReaders(ctx context.Context, dataStore *storage.DataStore, inputPrefix storage.DataReference,
// ConstructStaticInputReaders constructs input readers that comply with the io.InputReader interface but have their
// inputs already populated.
func ConstructStaticInputReaders(inputPaths io.InputFilePaths, inputs []*idlCore.Literal, inputName string) []io.InputReader {
inputReaders := make([]io.InputReader, 0, len(inputs))
for i := 0; i < len(inputs); i++ {
inputReaders = append(inputReaders, NewStaticInputReader(inputPaths, &idlCore.LiteralMap{
Literals: map[string]*idlCore.Literal{
inputName: inputs[i],
},
}))
}

return inputReaders
}

func ConstructRemoteFileInputReaders(ctx context.Context, dataStore *storage.DataStore, inputPrefix storage.DataReference,
size int) ([]io.InputReader, error) {

inputReaders := make([]io.InputReader, 0, size)
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/array/core/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func NewPhasesCompactArray(count uint) bitarray.CompactArray {
return a
}

// Compute the original index of a sub-task.
// CalculateOriginalIndex computes the original index of a sub-task.
func CalculateOriginalIndex(childIdx int, toCache *bitarray.BitSet) int {
var sum = 0
for i := uint(0); i < toCache.Cap(); i++ {
Expand Down
31 changes: 24 additions & 7 deletions go/tasks/plugins/array/inputs.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,48 @@
package array

import (
"context"

idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io"
"github.com/flyteorg/flytestdlib/storage"
)

// A proxy inputreader that overrides the inputpath to be the inputpathprefix for array jobs
// arrayJobInputReader is a proxy inputreader that overrides the inputpath to be the inputpathprefix for array jobs
type arrayJobInputReader struct {
io.InputReader
}

// We override the inputpath to return the prefix path for array jobs
// GetInputPath overrides the inputpath to return the prefix path for array jobs
func (i arrayJobInputReader) GetInputPath() storage.DataReference {
return i.GetInputPrefixPath()
}

func GetInputReader(tCtx core.TaskExecutionContext, taskTemplate *idlCore.TaskTemplate) io.InputReader {
var inputReader io.InputReader
if taskTemplate.GetTaskTypeVersion() == 0 {
// Prior to task type version == 1, dynamic type tasks (including array tasks) would write input files for each
// individual array task instance. In this case we use a modified input reader to only pass in the parent input
// directory.
inputReader = arrayJobInputReader{tCtx.InputReader()}
} else {
inputReader = tCtx.InputReader()
return arrayJobInputReader{tCtx.InputReader()}
}
return inputReader

return tCtx.InputReader()
}

// StaticInputReader complies with the io.InputReader interface but has the input already populated.
type StaticInputReader struct {
io.InputFilePaths
input *idlCore.LiteralMap
}

func NewStaticInputReader(inputPaths io.InputFilePaths, input *idlCore.LiteralMap) StaticInputReader {
return StaticInputReader{
InputFilePaths: inputPaths,
input: input,
}
}

func (i StaticInputReader) Get(_ context.Context) (*idlCore.LiteralMap, error) {
return i.input, nil
}
7 changes: 5 additions & 2 deletions go/tasks/plugins/array/k8s/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ var (
MaxRetries: 5,
Workers: 10,
},
LogConfig: LogConfig{
Config: logs.DefaultConfig,
},
}

configSection = pluginsConfig.MustRegisterSubSection(configSectionKey, defaultConfig)
Expand Down Expand Up @@ -76,8 +79,8 @@ func (auth Auth) GetToken() (string, error) {
return string(token), nil
}

// RemoteClusterConfig reads secret values from paths specified in the config to initialize a Kubernetes rest client Config.
// TODO: Move logic to flytestdlib
// Reads secret values from paths specified in the config to initialize a Kubernetes rest client Config.
func RemoteClusterConfig(host string, auth Auth) (*restclient.Config, error) {
tokenString, err := auth.GetToken()
if err != nil {
Expand Down Expand Up @@ -106,7 +109,7 @@ func GetK8sClient(config ClusterConfig) (client.Client, error) {
return client.New(kubeConf, client.Options{})
}

// Defines custom config for K8s Array plugin
// Config defines custom config for K8s Array plugin
type Config struct {
DefaultScheduler string `json:"scheduler" pflag:",Decides the scheduler to use when launching array-pods."`
MaxErrorStringLength int `json:"maxErrorLength" pflag:",Determines the maximum length of the error string returned for the array."`
Expand Down
5 changes: 3 additions & 2 deletions go/tasks/plugins/array/k8s/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
existingPhase := core.Phases[existingPhaseIdx]
indexStr := strconv.Itoa(childIdx)
podName := formatSubTaskName(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), indexStr)
originalIdx := arrayCore.CalculateOriginalIndex(childIdx, newState.GetIndexesToCache())

if existingPhase.IsTerminal() {
// If we get here it means we have already "processed" this terminal phase since we will only persist
Expand All @@ -84,7 +85,6 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
}
newArrayStatus.Summary.Inc(existingPhase)
newArrayStatus.Detailed.SetItem(childIdx, bitarray.Item(existingPhase))
originalIdx := arrayCore.CalculateOriginalIndex(childIdx, newState.GetIndexesToCache())

phaseInfo, err := FetchPodStatusAndLogs(ctx, kubeClient,
k8sTypes.NamespacedName{
Expand All @@ -111,9 +111,11 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
NewArrayStatus: newArrayStatus,
Config: config,
ChildIdx: childIdx,
OriginalIndex: originalIdx,
MessageCollector: &msg,
SubTaskIDs: subTaskIDs,
}

// The first time we enter this state we will launch every subtask. On subsequent rounds, the pod
// has already been created so we return a Success value and continue with the Monitor step.
var launchResult LaunchResult
Expand All @@ -136,7 +138,6 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
return currentState, logLinks, subTaskIDs, nil
}

var monitorResult MonitorResult
monitorResult, taskLogs, err := task.Monitor(ctx, tCtx, kubeClient, dataStore, outputPrefix, baseOutputDataSandbox, logPlugin)

if len(taskLogs) > 0 {
Expand Down
1 change: 1 addition & 0 deletions go/tasks/plugins/array/k8s/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func TestCheckSubTasksState(t *testing.T) {
ArrayStatus: arraystatus.ArrayStatus{
Detailed: arrayCore.NewPhasesCompactArray(uint(5)),
},
IndexesToCache: bitarray.NewBitSet(5),
})

assert.Nil(t, err)
Expand Down
Loading

0 comments on commit 973a51a

Please sign in to comment.