Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Make execution transformer configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Pradithya Aria Pura <[email protected]>
  • Loading branch information
pradithya committed May 6, 2023
1 parent 93c4007 commit 3adb19b
Show file tree
Hide file tree
Showing 12 changed files with 53 additions and 19 deletions.
7 changes: 6 additions & 1 deletion pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1442,7 +1442,12 @@ func (m *ExecutionManager) ListExecutions(
logger.Debugf(ctx, "Failed to list executions using input [%+v] with err %v", listExecutionsInput, err)
return nil, err
}
executionList, err := transformers.FromExecutionModels(output.Executions, transformers.ListExecutionTransformerOptions)

listExecutionTransformer := &transformers.ExecutionTransformerOptions{
TrimErrorMessage: m.config.ApplicationConfiguration().GetTopLevelConfig().ListExecutionTransformersConfig.TrimErrorMessages,
MaxErrorMessageLength: m.config.ApplicationConfiguration().GetTopLevelConfig().ListExecutionTransformersConfig.MaxErrorMessageLength,
}
executionList, err := transformers.FromExecutionModels(output.Executions, listExecutionTransformer)
if err != nil {
logger.Errorf(ctx,
"Failed to transform execution models [%+v] with err: %v", output.Executions, err)
Expand Down
7 changes: 6 additions & 1 deletion pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,11 @@ func (m *NodeExecutionManager) transformNodeExecutionModel(ctx context.Context,

func (m *NodeExecutionManager) transformNodeExecutionModelList(ctx context.Context, nodeExecutionModels []models.NodeExecution) ([]*admin.NodeExecution, error) {
nodeExecutions := make([]*admin.NodeExecution, len(nodeExecutionModels))
listExecutionTransformer := &transformers.ExecutionTransformerOptions{
TrimErrorMessage: m.config.ApplicationConfiguration().GetTopLevelConfig().ListExecutionTransformersConfig.TrimErrorMessages,
MaxErrorMessageLength: m.config.ApplicationConfiguration().GetTopLevelConfig().ListExecutionTransformersConfig.MaxErrorMessageLength,
}

for idx, nodeExecutionModel := range nodeExecutionModels {
nodeExecution, err := m.transformNodeExecutionModel(ctx, nodeExecutionModel, &core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Expand All @@ -341,7 +346,7 @@ func (m *NodeExecutionManager) transformNodeExecutionModelList(ctx context.Conte
Name: nodeExecutionModel.Name,
},
NodeId: nodeExecutionModel.NodeID,
}, transformers.ListExecutionTransformerOptions)
}, listExecutionTransformer)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/manager/impl/node_execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,8 @@ func TestTransformNodeExecutionModelList(t *testing.T) {
})

manager := NodeExecutionManager{
db: repository,
db: repository,
config: getMockExecutionsConfigProvider(),
}
nodeExecutions, err := manager.transformNodeExecutionModelList(ctx, []models.NodeExecution{
{
Expand Down
3 changes: 3 additions & 0 deletions pkg/manager/impl/testutils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,8 @@ func GetApplicationConfigWithDefaultDomains() runtimeInterfaces.ApplicationConfi
Scheme: common.Local, SignedURL: runtimeInterfaces.SignedURL{
Enabled: true,
}})

config.GetTopLevelConfig().ListExecutionTransformersConfig.TrimErrorMessages = true
config.GetTopLevelConfig().ListExecutionTransformersConfig.MaxErrorMessageLength = 10240
return &config
}
12 changes: 4 additions & 8 deletions pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
)

const trimmedErrMessageLen = 10240

var clusterReassignablePhases = sets.NewString(core.WorkflowExecution_UNDEFINED.String(), core.WorkflowExecution_QUEUED.String())

// CreateExecutionModelInput encapsulates request parameters for calls to CreateExecutionModel.
Expand All @@ -47,13 +45,11 @@ type CreateExecutionModelInput struct {
}

type ExecutionTransformerOptions struct {
TrimErrorMessage bool
TrimErrorMessage bool
MaxErrorMessageLength int
}

var DefaultExecutionTransformerOptions = &ExecutionTransformerOptions{}
var ListExecutionTransformerOptions = &ExecutionTransformerOptions{
TrimErrorMessage: true,
}

// CreateExecutionModel transforms a ExecutionCreateRequest to a Execution model
func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, error) {
Expand Down Expand Up @@ -328,8 +324,8 @@ func FromExecutionModel(executionModel models.Execution, opts *ExecutionTransfor
}
if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 {
trimmedErrOutputResult := closure.GetError()
if len(trimmedErrOutputResult.Message) > trimmedErrMessageLen {
trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:trimmedErrMessageLen]
if len(trimmedErrOutputResult.Message) > opts.MaxErrorMessageLength {
trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:opts.MaxErrorMessageLength]
}
closure.OutputResult = &admin.ExecutionClosure_Error{
Error: trimmedErrOutputResult,
Expand Down
4 changes: 3 additions & 1 deletion pkg/repositories/transformers/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ func TestFromExecutionModel_Aborted(t *testing.T) {
}

func TestFromExecutionModel_Error(t *testing.T) {
trimmedErrMessageLen := 10240
extraLongErrMsg := string(make([]byte, 2*trimmedErrMessageLen))
execErr := &core.ExecutionError{
Code: "CODE",
Expand All @@ -590,7 +591,8 @@ func TestFromExecutionModel_Error(t *testing.T) {
Closure: executionClosureBytes,
}
execution, err := FromExecutionModel(executionModel, &ExecutionTransformerOptions{
TrimErrorMessage: true,
TrimErrorMessage: true,
MaxErrorMessageLength: trimmedErrMessageLen,
})
expectedExecErr := execErr
expectedExecErr.Message = string(make([]byte, trimmedErrMessageLen))
Expand Down
4 changes: 2 additions & 2 deletions pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,8 @@ func FromNodeExecutionModel(nodeExecutionModel models.NodeExecution, opts *Execu
}
if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 {
trimmedErrOutputResult := closure.GetError()
if len(trimmedErrOutputResult.Message) > trimmedErrMessageLen {
trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:trimmedErrMessageLen]
if len(trimmedErrOutputResult.Message) > opts.MaxErrorMessageLength {
trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:opts.MaxErrorMessageLength]
}
closure.OutputResult = &admin.NodeExecutionClosure_Error{
Error: trimmedErrOutputResult,
Expand Down
6 changes: 5 additions & 1 deletion pkg/repositories/transformers/node_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ func TestFromNodeExecutionModel(t *testing.T) {
}

func TestFromNodeExecutionModel_Error(t *testing.T) {
trimmedErrMessageLen := 10240
extraLongErrMsg := string(make([]byte, 2*trimmedErrMessageLen))
execErr := &core.ExecutionError{
Code: "CODE",
Expand All @@ -551,7 +552,10 @@ func TestFromNodeExecutionModel_Error(t *testing.T) {
NodeExecutionMetadata: nodeExecutionMetadataBytes,
InputURI: "input uri",
Duration: duration,
}, &ExecutionTransformerOptions{TrimErrorMessage: true})
}, &ExecutionTransformerOptions{
TrimErrorMessage: true,
MaxErrorMessageLength: trimmedErrMessageLen,
})
assert.Nil(t, err)

expectedExecErr := execErr
Expand Down
4 changes: 2 additions & 2 deletions pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,8 @@ func FromTaskExecutionModel(taskExecutionModel models.TaskExecution, opts *Execu
}
if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 {
trimmedErrOutputResult := closure.GetError()
if len(trimmedErrOutputResult.Message) > trimmedErrMessageLen {
trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:trimmedErrMessageLen]
if len(trimmedErrOutputResult.Message) > opts.MaxErrorMessageLength {
trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:opts.MaxErrorMessageLength]
}
closure.OutputResult = &admin.TaskExecutionClosure_Error{
Error: trimmedErrOutputResult,
Expand Down
7 changes: 5 additions & 2 deletions pkg/repositories/transformers/task_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ func TestFromTaskExecutionModel(t *testing.T) {
}

func TestFromTaskExecutionModel_Error(t *testing.T) {
trimmedErrMessageLen := 10240
extraLongErrMsg := string(make([]byte, 2*trimmedErrMessageLen))
execErr := &core.ExecutionError{
Code: "CODE",
Expand Down Expand Up @@ -633,7 +634,8 @@ func TestFromTaskExecutionModel_Error(t *testing.T) {
Closure: closureBytes,
}
taskExecution, err := FromTaskExecutionModel(taskExecutionModel, &ExecutionTransformerOptions{
TrimErrorMessage: true,
TrimErrorMessage: true,
MaxErrorMessageLength: trimmedErrMessageLen,
})

expectedExecErr := execErr
Expand All @@ -653,7 +655,8 @@ func TestFromTaskExecutionModel_Error(t *testing.T) {
})
taskExecutionModel.Closure = closureBytes
taskExecution, err = FromTaskExecutionModel(taskExecutionModel, &ExecutionTransformerOptions{
TrimErrorMessage: true,
TrimErrorMessage: true,
MaxErrorMessageLength: trimmedErrMessageLen,
})
expectedExecErr = execErr
expectedExecErr.Message = string(make([]byte, 10))
Expand Down
5 changes: 5 additions & 0 deletions pkg/runtime/application_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ var flyteAdminConfig = config.MustRegisterSection(flyteAdmin, &interfaces.Applic
MaxParallelism: 25,
K8SServiceAccount: "",
UseOffloadedWorkflowClosure: false,

ListExecutionTransformersConfig: interfaces.ExecutionTransformersConfig{
TrimErrorMessages: true,
MaxErrorMessageLength: 10240,
},
})

var schedulerConfig = config.MustRegisterSection(scheduler, &interfaces.SchedulerConfig{
Expand Down
10 changes: 10 additions & 0 deletions pkg/runtime/interfaces/application_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ type PostgresConfig struct {
Debug bool `json:"debug" pflag:" Whether or not to start the database connection with debug mode enabled."`
}

type ExecutionTransformersConfig struct {
// TrimErrorMessages indicates whether error messages returned by the list workflow execution API should be trimmed.
TrimErrorMessages bool `json:"trimErrorMessages"`
// The maximum length of an error message returned by the list workflow execution API.
MaxErrorMessageLength int `json:"maxErrorMessageLength"`
}

// ApplicationConfig is the base configuration to start admin
type ApplicationConfig struct {
// The RoleName key inserted as an annotation (https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/)
Expand Down Expand Up @@ -94,6 +101,9 @@ type ApplicationConfig struct {

// Enabling will use Storage (s3/gcs/etc) to offload static parts of CRDs.
UseOffloadedWorkflowClosure bool `json:"useOffloadedWorkflowClosure"`

// Configures the execution transformers
ListExecutionTransformersConfig ExecutionTransformersConfig `json:"listExecutionTransformersConfig"`
}

func (a *ApplicationConfig) GetRoleNameKey() string {
Expand Down

0 comments on commit 3adb19b

Please sign in to comment.