diff --git a/flyte-single-binary-local.yaml b/flyte-single-binary-local.yaml index 487eae5bf1..7e403de641 100644 --- a/flyte-single-binary-local.yaml +++ b/flyte-single-binary-local.yaml @@ -90,6 +90,8 @@ storage: access_key_id: minio secret_key: miniostorage container: my-s3-bucket + limits: + maxDownloadMBs: 1000 task_resources: defaults: diff --git a/flyteadmin/pkg/manager/impl/validation/execution_validator.go b/flyteadmin/pkg/manager/impl/validation/execution_validator.go index 639f29073f..f5fd30598a 100644 --- a/flyteadmin/pkg/manager/impl/validation/execution_validator.go +++ b/flyteadmin/pkg/manager/impl/validation/execution_validator.go @@ -100,13 +100,7 @@ func CheckAndFetchInputsForExecution( } executionInputMap[name] = expectedInput.GetDefault() } else { - var inputType *core.LiteralType - switch executionInputMap[name].GetValue().(type) { - case *core.Literal_OffloadedMetadata: - inputType = executionInputMap[name].GetOffloadedMetadata().GetInferredType() - default: - inputType = validators.LiteralTypeForLiteral(executionInputMap[name]) - } + inputType := validators.LiteralTypeForLiteral(executionInputMap[name]) err := validators.ValidateLiteralType(inputType) if err != nil { return nil, errors.NewInvalidLiteralTypeError(name, err) diff --git a/flyteidl/clients/go/coreutils/extract_literal.go b/flyteidl/clients/go/coreutils/extract_literal.go index 5801296dc3..f9918dd0f8 100644 --- a/flyteidl/clients/go/coreutils/extract_literal.go +++ b/flyteidl/clients/go/coreutils/extract_literal.go @@ -96,6 +96,10 @@ func ExtractFromLiteral(literal *core.Literal) (interface{}, error) { } } return mapResult, nil + case *core.Literal_OffloadedMetadata: + // Return only the URI of the offloaded literal (used for display in flytectl); the nil-safe getter avoids a panic when the oneof wraps a nil message + return literalValue.OffloadedMetadata.GetUri(), nil + } return nil, fmt.Errorf("unsupported literal type %T", literal) } diff --git a/flyteidl/clients/go/coreutils/extract_literal_test.go
b/flyteidl/clients/go/coreutils/extract_literal_test.go index 2ce8747fd5..760e7bee0a 100644 --- a/flyteidl/clients/go/coreutils/extract_literal_test.go +++ b/flyteidl/clients/go/coreutils/extract_literal_test.go @@ -200,6 +200,30 @@ func TestFetchLiteral(t *testing.T) { assert.Equal(t, literalVal, extractedLiteralVal) }) + t.Run("Offloaded metadata", func(t *testing.T) { + literalVal := "s3://blah/blah/blah" + var storedLiteralType = &core.LiteralType{ + Type: &core.LiteralType_CollectionType{ + CollectionType: &core.LiteralType{ + Type: &core.LiteralType_Simple{ + Simple: core.SimpleType_INTEGER, + }, + }, + }, + } + offloadedLiteral := &core.Literal{ + Value: &core.Literal_OffloadedMetadata{ + OffloadedMetadata: &core.LiteralOffloadedMetadata{ + Uri: literalVal, + InferredType: storedLiteralType, + }, + }, + } + extractedLiteralVal, err := ExtractFromLiteral(offloadedLiteral) + assert.NoError(t, err) + assert.Equal(t, literalVal, extractedLiteralVal) + }) + t.Run("Union", func(t *testing.T) { literalVal := int64(1) var literalType = &core.LiteralType{ diff --git a/flyteidl/clients/go/coreutils/literals.go b/flyteidl/clients/go/coreutils/literals.go index 3527ac246b..278fa30dfc 100644 --- a/flyteidl/clients/go/coreutils/literals.go +++ b/flyteidl/clients/go/coreutils/literals.go @@ -636,7 +636,6 @@ func MakeLiteralForType(t *core.LiteralType, v interface{}) (*core.Literal, erro if !found { return nil, fmt.Errorf("incorrect union value [%s], supported values %+v", v, newT.UnionType.Variants) } - default: return nil, fmt.Errorf("unsupported type %s", t.String()) } diff --git a/flytepropeller/pkg/compiler/transformers/k8s/inputs.go b/flytepropeller/pkg/compiler/transformers/k8s/inputs.go index 21250bd28d..26f50d4ddd 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/inputs.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/inputs.go @@ -35,20 +35,12 @@ func validateInputs(nodeID common.NodeID, iface *core.TypedInterface, inputs cor continue } - var 
inputType *core.LiteralType - switch inputVal.GetValue().(type) { - case *core.Literal_OffloadedMetadata: - inputType = inputVal.GetOffloadedMetadata().GetInferredType() - default: - inputType = validators.LiteralTypeForLiteral(inputVal) - } - + inputType := validators.LiteralTypeForLiteral(inputVal) err := validators.ValidateLiteralType(inputType) if err != nil { errs.Collect(errors.NewInvalidLiteralTypeErr(nodeID, inputVar, err)) continue } - if !validators.AreTypesCastable(inputType, v.Type) { errs.Collect(errors.NewMismatchingTypesErr(nodeID, inputVar, v.Type.String(), inputType.String())) continue diff --git a/flytepropeller/pkg/compiler/validators/utils.go b/flytepropeller/pkg/compiler/validators/utils.go index fb05e32bb3..d1bffbbe09 100644 --- a/flytepropeller/pkg/compiler/validators/utils.go +++ b/flytepropeller/pkg/compiler/validators/utils.go @@ -284,8 +284,9 @@ func LiteralTypeForLiteral(l *core.Literal) *core.LiteralType { MapValueType: literalTypeForLiterals(maps.Values(l.GetMap().Literals)), }, } + case *core.Literal_OffloadedMetadata: + return l.GetOffloadedMetadata().GetInferredType() } - return nil } diff --git a/flytepropeller/pkg/compiler/validators/utils_test.go b/flytepropeller/pkg/compiler/validators/utils_test.go index 26e34988c3..b6737c7e62 100644 --- a/flytepropeller/pkg/compiler/validators/utils_test.go +++ b/flytepropeller/pkg/compiler/validators/utils_test.go @@ -413,6 +413,47 @@ func TestLiteralTypeForLiterals(t *testing.T) { assert.True(t, proto.Equal(expectedLt, lt)) }) + t.Run("nested Lists with different types", func(t *testing.T) { + inferredType := &core.LiteralType{ + Type: &core.LiteralType_CollectionType{ + CollectionType: &core.LiteralType{ + Type: &core.LiteralType_CollectionType{ + CollectionType: &core.LiteralType{ + Type: &core.LiteralType_UnionType{ + UnionType: &core.UnionType{ + Variants: []*core.LiteralType{ + { + Type: &core.LiteralType_Simple{ + Simple: core.SimpleType_INTEGER, + }, + }, + { + Type: 
&core.LiteralType_Simple{ + Simple: core.SimpleType_STRING, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + literals := &core.Literal{ + Value: &core.Literal_OffloadedMetadata{ + OffloadedMetadata: &core.LiteralOffloadedMetadata{ + Uri: "dummy/uri", + SizeBytes: 1000, + InferredType: inferredType, + }, + }, + } + expectedLt := inferredType + lt := LiteralTypeForLiteral(literals) + assert.True(t, proto.Equal(expectedLt, lt)) + }) + } func TestJoinVariableMapsUniqueKeys(t *testing.T) { diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 6859bf46f0..834a016cb2 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -191,6 +191,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } size := -1 + for key, variable := range literalMap.Literals { literalType := validators.LiteralTypeForLiteral(variable) err := validators.ValidateLiteralType(literalType) @@ -200,10 +201,19 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu handler.PhaseInfoFailure(idlcore.ExecutionError_USER, errors.IDLNotFoundErr, errMsg, nil), ), nil } + if variable.GetOffloadedMetadata() != nil { + // variable will be overwritten with the contents of the offloaded data which contains the actual large literal. 
+ // We need this for the map task to be able to create the subNodeSpec + err := common.ReadLargeLiteral(ctx, nCtx.DataStore(), variable) + if err != nil { + return handler.DoTransition(handler.TransitionTypeEphemeral, + handler.PhaseInfoFailure(idlcore.ExecutionError_SYSTEM, errors.RuntimeExecutionError, "couldn't read the offloaded literal", nil), + ), nil + } + } switch literalType.Type.(type) { case *idlcore.LiteralType_CollectionType: collectionLength := len(variable.GetCollection().Literals) - if size == -1 { size = collectionLength } else if size != collectionLength { diff --git a/flytepropeller/pkg/controller/nodes/attr_path_resolver.go b/flytepropeller/pkg/controller/nodes/attr_path_resolver.go index 42150cb887..fa19d2bf5c 100644 --- a/flytepropeller/pkg/controller/nodes/attr_path_resolver.go +++ b/flytepropeller/pkg/controller/nodes/attr_path_resolver.go @@ -1,15 +1,19 @@ package nodes import ( + "context" + "google.golang.org/protobuf/types/known/structpb" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/common" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/errors" + "github.com/flyteorg/flyte/flytestdlib/storage" ) // resolveAttrPathInPromise resolves the literal with attribute path // If the promise is chained with attributes (e.g. 
promise.a["b"][0]), then we need to resolve the promise -func resolveAttrPathInPromise(nodeID string, literal *core.Literal, bindAttrPath []*core.PromiseAttribute) (*core.Literal, error) { +func resolveAttrPathInPromise(ctx context.Context, datastore *storage.DataStore, nodeID string, literal *core.Literal, bindAttrPath []*core.PromiseAttribute) (*core.Literal, error) { var currVal *core.Literal = literal var tmpVal *core.Literal var err error @@ -17,7 +21,16 @@ func resolveAttrPathInPromise(nodeID string, literal *core.Literal, bindAttrPath count := 0 for _, attr := range bindAttrPath { + if currVal.GetOffloadedMetadata() != nil { + // currVal will be overwritten with the contents of the offloaded data which contains the actual large literal. + err := common.ReadLargeLiteral(ctx, datastore, currVal) + if err != nil { + return nil, errors.Errorf(errors.PromiseAttributeResolveError, nodeID, "failed to read offloaded metadata for promise") + } + } switch currVal.GetValue().(type) { + case *core.Literal_OffloadedMetadata: + return nil, errors.Errorf(errors.PromiseAttributeResolveError, nodeID, "unexpected offloaded metadata type") case *core.Literal_Map: tmpVal, exist = currVal.GetMap().GetLiterals()[attr.GetStringValue()] if !exist { diff --git a/flytepropeller/pkg/controller/nodes/attr_path_resolver_test.go b/flytepropeller/pkg/controller/nodes/attr_path_resolver_test.go index fb966c666e..8724f5287d 100644 --- a/flytepropeller/pkg/controller/nodes/attr_path_resolver_test.go +++ b/flytepropeller/pkg/controller/nodes/attr_path_resolver_test.go @@ -1,6 +1,7 @@ package nodes import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -319,7 +320,7 @@ func TestResolveAttrPathIn(t *testing.T) { } for i, arg := range args { - resolved, err := resolveAttrPathInPromise("", arg.literal, arg.path) + resolved, err := resolveAttrPathInPromise(context.Background(), nil, "", arg.literal, arg.path) if arg.hasError { assert.Error(t, err, i) assert.ErrorContains(t, err, 
errors.PromiseAttributeResolveError, i) diff --git a/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go b/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go index a04623f88a..00a99d6c54 100644 --- a/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go +++ b/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go @@ -122,7 +122,7 @@ func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry logger.Debugf(ctx, "DataCatalog failed to get artifact by tag %+v, err: %+v", tag, err) return catalog.Entry{}, err } - logger.Debugf(ctx, "Artifact found %v from tag %v", artifact, tag) + logger.Debugf(ctx, "Artifact found %v from tag %v", artifact.GetId(), tag) var relevantTag *datacatalog.Tag if len(artifact.GetTags()) > 0 { @@ -230,7 +230,7 @@ func (m *CatalogClient) createArtifact(ctx context.Context, key catalog.Key, dat createArtifactRequest := &datacatalog.CreateArtifactRequest{Artifact: cachedArtifact} _, err := m.client.CreateArtifact(ctx, createArtifactRequest) if err != nil { - logger.Errorf(ctx, "Failed to create Artifact %+v, err: %v", cachedArtifact, err) + logger.Errorf(ctx, "Failed to create Artifact %+v, err: %v", cachedArtifact.Id, err) return catalog.Status{}, err } logger.Debugf(ctx, "Created artifact: %v, with %v outputs from execution %+v", cachedArtifact.Id, len(artifactDataList), metadata) @@ -259,7 +259,7 @@ func (m *CatalogClient) createArtifact(ctx context.Context, key catalog.Key, dat } } - logger.Debugf(ctx, "Successfully created artifact %+v for key %+v, dataset %+v and execution %+v", cachedArtifact, key, datasetID, metadata) + logger.Debugf(ctx, "Successfully created artifact %+v for key %+v, dataset %+v and execution %+v", cachedArtifact.Id, key, datasetID, metadata) return catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, EventCatalogMetadata(datasetID, tag, nil)), nil } diff --git a/flytepropeller/pkg/controller/nodes/common/utils.go 
b/flytepropeller/pkg/controller/nodes/common/utils.go index b02d830fe9..dd16b53f3a 100644 --- a/flytepropeller/pkg/controller/nodes/common/utils.go +++ b/flytepropeller/pkg/controller/nodes/common/utils.go @@ -2,6 +2,7 @@ package common import ( "context" + "encoding/base64" "fmt" "strconv" @@ -17,6 +18,7 @@ import ( "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/handler" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces" "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/pbhash" "github.com/flyteorg/flyte/flytestdlib/storage" ) @@ -79,6 +81,27 @@ func GetTargetEntity(ctx context.Context, nCtx interfaces.NodeExecutionContext) return targetEntity } +// ReadLargeLiteral overwrites tobeRead in place with the literal stored at its offloaded URI; it returns an error if tobeRead is not offloaded, its URI is empty, or the read fails. +func ReadLargeLiteral(ctx context.Context, datastore *storage.DataStore, + tobeRead *idlcore.Literal) error { + if tobeRead.GetOffloadedMetadata() == nil { + return fmt.Errorf("unsupported type for reading offloaded literal") + } + dataReference := tobeRead.GetOffloadedMetadata().GetUri() + if dataReference == "" { + return fmt.Errorf("uri is empty for offloaded literal") + } + // capture the recorded size before the read, since ReadProtobuf deserializes into tobeRead itself + size := tobeRead.GetOffloadedMetadata().GetSizeBytes() + if err := datastore.ReadProtobuf(ctx, storage.DataReference(dataReference), tobeRead); err != nil { + logger.Errorf(ctx, "Failed to read the offloaded literal at location [%s] with error [%s]", dataReference, err) + return err + } + + logger.Infof(ctx, "read offloaded literal at location [%s] with size [%d]", dataReference, size) + return nil +} + // OffloadLargeLiteral offloads the large literal if meets the threshold conditions func OffloadLargeLiteral(ctx context.Context, datastore *storage.DataStore, dataReference storage.DataReference, toBeOffloaded *idlcore.Literal, literalOffloadingConfig config.LiteralOffloadingConfig) error { @@ -108,6 +131,16 @@ func OffloadLargeLiteral(ctx context.Context,
datastore *storage.DataStore, data return err } + if toBeOffloaded.GetHash() == "" { + // compute the hash of the literal + literalDigest, err := pbhash.ComputeHash(ctx, toBeOffloaded) + if err != nil { + logger.Errorf(ctx, "Failed to compute hash for offloaded literal with error [%s]", err) + return err + } + // Set the hash or else respect what the user set in the literal + toBeOffloaded.Hash = base64.RawURLEncoding.EncodeToString(literalDigest) + } // update the literal with the offloaded URI, size and inferred type toBeOffloaded.Value = &idlcore.Literal_OffloadedMetadata{ OffloadedMetadata: &idlcore.LiteralOffloadedMetadata{ diff --git a/flytepropeller/pkg/controller/nodes/common/utils_test.go b/flytepropeller/pkg/controller/nodes/common/utils_test.go index 7d5ce1e372..875ede858b 100644 --- a/flytepropeller/pkg/controller/nodes/common/utils_test.go +++ b/flytepropeller/pkg/controller/nodes/common/utils_test.go @@ -2,6 +2,7 @@ package common import ( "context" + "encoding/base64" "testing" "github.com/stretchr/testify/assert" @@ -14,6 +15,7 @@ import ( executorMocks "github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors/mocks" nodeMocks "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces/mocks" "github.com/flyteorg/flyte/flytestdlib/contextutils" + "github.com/flyteorg/flyte/flytestdlib/pbhash" "github.com/flyteorg/flyte/flytestdlib/promutils" "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" "github.com/flyteorg/flyte/flytestdlib/storage" @@ -82,6 +84,40 @@ func init() { labeled.SetMetricKeys(contextutils.AppNameKey) } +func TestReadLargeLiteral(t *testing.T) { + t.Run("read successful", func(t *testing.T) { + ctx := context.Background() + datastore, _ := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + dataReference := storage.DataReference("foo/bar") + toBeRead := &idlCore.Literal{ + Value: &idlCore.Literal_Scalar{ + Scalar: &idlCore.Scalar{ + Value: 
&idlCore.Scalar_Primitive{ + Primitive: &idlCore.Primitive{ + Value: &idlCore.Primitive_Integer{ + Integer: 1, + }, + }, + }, + }, + }, + } + err := datastore.WriteProtobuf(ctx, dataReference, storage.Options{}, toBeRead) + assert.Nil(t, err) + + offloadedLiteral := &idlCore.Literal{ + Value: &idlCore.Literal_OffloadedMetadata{ + OffloadedMetadata: &idlCore.LiteralOffloadedMetadata{ + Uri: dataReference.String(), + }, + }, + } + err = ReadLargeLiteral(ctx, datastore, offloadedLiteral) + assert.Nil(t, err) + assert.Equal(t, int64(1), offloadedLiteral.GetScalar().GetPrimitive().GetInteger()) + }) +} + func TestOffloadLargeLiteral(t *testing.T) { t.Run("offload successful with valid size", func(t *testing.T) { ctx := context.Background() @@ -100,17 +136,46 @@ func TestOffloadLargeLiteral(t *testing.T) { }, }, } + expectedLiteralDigest, err := pbhash.ComputeHash(ctx, toBeOffloaded) + assert.Nil(t, err) literalOffloadingConfig := config.LiteralOffloadingConfig{ MinSizeInMBForOffloading: 0, MaxSizeInMBForOffloading: 1, } inferredType := validators.LiteralTypeForLiteral(toBeOffloaded) - err := OffloadLargeLiteral(ctx, datastore, dataReference, toBeOffloaded, literalOffloadingConfig) + err = OffloadLargeLiteral(ctx, datastore, dataReference, toBeOffloaded, literalOffloadingConfig) assert.NoError(t, err) assert.Equal(t, "foo/bar", toBeOffloaded.GetOffloadedMetadata().GetUri()) assert.Equal(t, uint64(6), toBeOffloaded.GetOffloadedMetadata().GetSizeBytes()) assert.Equal(t, inferredType.GetSimple(), toBeOffloaded.GetOffloadedMetadata().InferredType.GetSimple()) + assert.Equal(t, base64.RawURLEncoding.EncodeToString(expectedLiteralDigest), toBeOffloaded.Hash) + }) + t.Run("offload successful with valid size and hash passed in", func(t *testing.T) { + ctx := context.Background() + datastore, _ := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + dataReference := storage.DataReference("foo/bar") + toBeOffloaded := &idlCore.Literal{ + 
Value: &idlCore.Literal_Scalar{ + Scalar: &idlCore.Scalar{ + Value: &idlCore.Scalar_Primitive{ + Primitive: &idlCore.Primitive{ + Value: &idlCore.Primitive_Integer{ + Integer: 1, + }, + }, + }, + }, + }, + Hash: "hash", + } + literalOffloadingConfig := config.LiteralOffloadingConfig{ + MinSizeInMBForOffloading: 0, + MaxSizeInMBForOffloading: 1, + } + err := OffloadLargeLiteral(ctx, datastore, dataReference, toBeOffloaded, literalOffloadingConfig) + assert.NoError(t, err) + assert.Equal(t, "hash", toBeOffloaded.Hash) }) t.Run("offload fails with size larger than max", func(t *testing.T) { diff --git a/flytepropeller/pkg/controller/nodes/output_resolver.go b/flytepropeller/pkg/controller/nodes/output_resolver.go index 85eebd4de5..df8a6dfe19 100644 --- a/flytepropeller/pkg/controller/nodes/output_resolver.go +++ b/flytepropeller/pkg/controller/nodes/output_resolver.go @@ -66,7 +66,7 @@ func (r remoteFileOutputResolver) ExtractOutput(ctx context.Context, nl executor // resolving binding attribute path if exist if len(bindAttrPath) > 0 { - output, err = resolveAttrPathInPromise(n.GetID(), output, bindAttrPath) + output, err = resolveAttrPathInPromise(ctx, r.store, n.GetID(), output, bindAttrPath) } return output, err