From fc0f7cd99b458e83e7f6ca4456f6410edfd80f6b Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario Date: Tue, 17 Sep 2024 20:03:57 -0400 Subject: [PATCH 1/2] Rename literal offloading flags and enable offloading in local config Signed-off-by: Eduardo Apolinario --- flyte-single-binary-local.yaml | 2 ++ flytepropeller/pkg/controller/config/config.go | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/flyte-single-binary-local.yaml b/flyte-single-binary-local.yaml index 3a4dc780b2..487eae5bf1 100644 --- a/flyte-single-binary-local.yaml +++ b/flyte-single-binary-local.yaml @@ -24,6 +24,8 @@ propeller: create-flyteworkflow-crd: true kube-config: $HOME/.flyte/sandbox/kubeconfig rawoutput-prefix: s3://my-s3-bucket/data + literal-offloading-config: + enabled: true server: kube-config: $HOME/.flyte/sandbox/kubeconfig diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index 2d61c94970..f045058e95 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -173,17 +173,17 @@ type Config struct { CreateFlyteWorkflowCRD bool `json:"create-flyteworkflow-crd" pflag:",Enable creation of the FlyteWorkflow CRD on startup"` NodeExecutionWorkerCount int `json:"node-execution-worker-count" pflag:",Number of workers to evaluate node executions, currently only used for array nodes"` ArrayNode ArrayNodeConfig `json:"array-node-config,omitempty" pflag:",Configuration for array nodes"` - LiteralOffloadingConfig LiteralOffloadingConfig `json:"literalOffloadingConfig" pflag:",config used for literal offloading."` + LiteralOffloadingConfig LiteralOffloadingConfig `json:"literal-offloading-config" pflag:",config used for literal offloading."` } type LiteralOffloadingConfig struct { Enabled bool // Maps flytekit and union SDK names to minimum supported version that can handle reading offloaded literals. - SupportedSDKVersions map[string]string + SupportedSDKVersions map[string]string `json:"supported-sdk-versions" pflag:",Maps flytekit and union SDK names to minimum supported version that can handle reading offloaded literals."` // Default, 10Mbs. Determines the size of a literal at which to trigger offloading - MinSizeInMBForOffloading int64 + MinSizeInMBForOffloading int64 `json:"min-size-in-mb-for-offloading" pflag:",Size of a literal at which to trigger offloading"` // Fail fast threshold - MaxSizeInMBForOffloading int64 + MaxSizeInMBForOffloading int64 `json:"max-size-in-mb-for-offloading" pflag:",Size of a literal at which to fail fast"` } // IsSupportedSDKVersion returns true if the provided SDK and version are supported by the literal offloading config. From c04a0f6d8b52ca819570b7116d62a6340d332d28 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario Date: Tue, 17 Sep 2024 20:16:07 -0400 Subject: [PATCH 2/2] Run `make -C flytepropeller generate` Signed-off-by: Eduardo Apolinario --- .../pkg/controller/config/config_flags.go | 8 +++---- .../controller/config/config_flags_test.go | 24 +++++++++---------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/flytepropeller/pkg/controller/config/config_flags.go b/flytepropeller/pkg/controller/config/config_flags.go index b2e88e88e6..d8496a56fe 100755 --- a/flytepropeller/pkg/controller/config/config_flags.go +++ b/flytepropeller/pkg/controller/config/config_flags.go @@ -112,9 +112,9 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "node-execution-worker-count"), defaultConfig.NodeExecutionWorkerCount, "Number of workers to evaluate node executions, currently only used for array nodes") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "array-node-config.event-version"), defaultConfig.ArrayNode.EventVersion, "ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "array-node-config.default-parallelism-behavior"), defaultConfig.ArrayNode.DefaultParallelismBehavior, "Default parallelism behavior for array nodes") - cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "literalOffloadingConfig.Enabled"), defaultConfig.LiteralOffloadingConfig.Enabled, "") - cmdFlags.StringToString(fmt.Sprintf("%v%v", prefix, "literalOffloadingConfig.SupportedSDKVersions"), defaultConfig.LiteralOffloadingConfig.SupportedSDKVersions, "") - cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "literalOffloadingConfig.MinSizeInMBForOffloading"), defaultConfig.LiteralOffloadingConfig.MinSizeInMBForOffloading, "") - cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "literalOffloadingConfig.MaxSizeInMBForOffloading"), defaultConfig.LiteralOffloadingConfig.MaxSizeInMBForOffloading, "") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "literal-offloading-config.Enabled"), defaultConfig.LiteralOffloadingConfig.Enabled, "") + cmdFlags.StringToString(fmt.Sprintf("%v%v", prefix, "literal-offloading-config.supported-sdk-versions"), defaultConfig.LiteralOffloadingConfig.SupportedSDKVersions, "Maps flytekit and union SDK names to minimum supported version that can handle reading offloaded literals.") + cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "literal-offloading-config.min-size-in-mb-for-offloading"), defaultConfig.LiteralOffloadingConfig.MinSizeInMBForOffloading, "Size of a literal at which to trigger offloading") + cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "literal-offloading-config.max-size-in-mb-for-offloading"), defaultConfig.LiteralOffloadingConfig.MaxSizeInMBForOffloading, "Size of a literal at which to fail fast") return cmdFlags } diff --git a/flytepropeller/pkg/controller/config/config_flags_test.go b/flytepropeller/pkg/controller/config/config_flags_test.go index aadb24b36a..109dc47b28 100755 --- a/flytepropeller/pkg/controller/config/config_flags_test.go +++ b/flytepropeller/pkg/controller/config/config_flags_test.go @@ -967,13 +967,13 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_literalOffloadingConfig.Enabled", func(t *testing.T) { + t.Run("Test_literal-offloading-config.Enabled", func(t *testing.T) { t.Run("Override", func(t *testing.T) { testValue := "1" - cmdFlags.Set("literalOffloadingConfig.Enabled", testValue) - if vBool, err := cmdFlags.GetBool("literalOffloadingConfig.Enabled"); err == nil { + cmdFlags.Set("literal-offloading-config.Enabled", testValue) + if vBool, err := cmdFlags.GetBool("literal-offloading-config.Enabled"); err == nil { testDecodeJson_Config(t, fmt.Sprintf("%v", vBool), &actual.LiteralOffloadingConfig.Enabled) } else { @@ -981,13 +981,13 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_literalOffloadingConfig.SupportedSDKVersions", func(t *testing.T) { + t.Run("Test_literal-offloading-config.supported-sdk-versions", func(t *testing.T) { t.Run("Override", func(t *testing.T) { testValue := "a=1,b=2" - cmdFlags.Set("literalOffloadingConfig.SupportedSDKVersions", testValue) - if vStringToString, err := cmdFlags.GetStringToString("literalOffloadingConfig.SupportedSDKVersions"); err == nil { + cmdFlags.Set("literal-offloading-config.supported-sdk-versions", testValue) + if vStringToString, err := cmdFlags.GetStringToString("literal-offloading-config.supported-sdk-versions"); err == nil { testDecodeRaw_Config(t, vStringToString, &actual.LiteralOffloadingConfig.SupportedSDKVersions) } else { @@ -995,13 +995,13 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_literalOffloadingConfig.MinSizeInMBForOffloading", func(t *testing.T) { + t.Run("Test_literal-offloading-config.min-size-in-mb-for-offloading", func(t *testing.T) { t.Run("Override", func(t *testing.T) { testValue := "1" - cmdFlags.Set("literalOffloadingConfig.MinSizeInMBForOffloading", testValue) - if vInt64, err := cmdFlags.GetInt64("literalOffloadingConfig.MinSizeInMBForOffloading"); err == nil { + cmdFlags.Set("literal-offloading-config.min-size-in-mb-for-offloading", testValue) + if vInt64, err := cmdFlags.GetInt64("literal-offloading-config.min-size-in-mb-for-offloading"); err == nil { testDecodeJson_Config(t, fmt.Sprintf("%v", vInt64), &actual.LiteralOffloadingConfig.MinSizeInMBForOffloading) } else { @@ -1009,13 +1009,13 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_literalOffloadingConfig.MaxSizeInMBForOffloading", func(t *testing.T) { + t.Run("Test_literal-offloading-config.max-size-in-mb-for-offloading", func(t *testing.T) { t.Run("Override", func(t *testing.T) { testValue := "1" - cmdFlags.Set("literalOffloadingConfig.MaxSizeInMBForOffloading", testValue) - if vInt64, err := cmdFlags.GetInt64("literalOffloadingConfig.MaxSizeInMBForOffloading"); err == nil { + cmdFlags.Set("literal-offloading-config.max-size-in-mb-for-offloading", testValue) + if vInt64, err := cmdFlags.GetInt64("literal-offloading-config.max-size-in-mb-for-offloading"); err == nil { testDecodeJson_Config(t, fmt.Sprintf("%v", vInt64), &actual.LiteralOffloadingConfig.MaxSizeInMBForOffloading) } else {