From 8d1d0e43d79e2172ec49ee48df7b30949429c045 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 18 Dec 2024 23:39:30 +0800 Subject: [PATCH] DeckStatus Signed-off-by: Future-Outlier --- .../pkg/controller/nodes/task/handler.go | 40 ++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index d2d538549d..3764a340ff 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "runtime/debug" + "strconv" "strings" "time" @@ -43,6 +44,14 @@ import ( const pluginContextKey = contextutils.Key("plugin") const FLYTE_ENABLE_DECK = string("FLYTE_ENABLE_DECK") +type DeckStatus int + +const ( + DeckUnknown DeckStatus = iota + DeckEnabled + DeckDisabled +) + type metrics struct { pluginPanics labeled.Counter unsupportedTaskType labeled.Counter @@ -423,19 +432,32 @@ func (t Handler) fetchPluginTaskMetrics(pluginID, taskType string) (*taskMetrics return t.taskMetricsMap[metricNameKey], nil } -func IsDeckEnabled(ctx context.Context, tCtx *taskExecutionContext) (bool, error) { +func IsDeckEnabled(ctx context.Context, tCtx *taskExecutionContext) (DeckStatus, error) { template, err := tCtx.tr.Read(ctx) if err != nil { - return false, regErrors.Wrapf(err, "failed to read task template") + return DeckUnknown, regErrors.Wrapf(err, "failed to read task template") } templateConfig := template.GetConfig() if templateConfig == nil { - return false, nil + return DeckUnknown, nil + } + + rawValue, ok := templateConfig[FLYTE_ENABLE_DECK] + if !ok { + return DeckUnknown, nil } - deckEnabled := strings.ToLower(templateConfig[FLYTE_ENABLE_DECK]) - return deckEnabled == "1" || deckEnabled == "t" || deckEnabled == "true", nil + rawValue = strings.ToLower(rawValue) + deckEnabled, err := strconv.ParseBool(rawValue) + if err != nil { + return DeckUnknown, nil + } + + if deckEnabled { + return DeckEnabled, nil + } + return DeckDisabled, nil } func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *taskExecutionContext, ts handler.TaskNodeState) (*pluginRequestedTransition, error) { @@ -526,11 +548,11 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta // The deck should be accessible even if the task is still running or has failed. // It's possible that the deck URI may not exist in remote storage yet or will never exist. // So, it is console's responsibility to handle the case when the deck URI actually does not exist. - deckEnabled, err := IsDeckEnabled(ctx, tCtx) + deckStatus, err := IsDeckEnabled(ctx, tCtx) if err != nil { return nil, err } - if deckEnabled { + if deckStatus == DeckEnabled { pluginTrns.AddDeckURI(tCtx) } @@ -551,7 +573,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta switch pluginTrns.pInfo.Phase() { case pluginCore.PhaseSuccess: // This is for backward compatibility with older Flytekit versions. - if !deckEnabled { + if deckStatus == DeckUnknown { err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx) } if err != nil { @@ -601,7 +623,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta fallthrough case pluginCore.PhasePermanentFailure: // This is for backward compatibility with older Flytekit versions. - if !deckEnabled { + if deckStatus == DeckUnknown { err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx) } if err != nil {