Skip to content

Commit

Permalink
Raw output prefix (flyteorg#169)
Browse files Browse the repository at this point in the history
* attempt to bring in IDL change, and add to workflow struct

* go sum

* changes

* spelling

* make generate

* just use meta

* unit tests

* wip

* wip

* make goimports

* wip

* add comment in config.yaml

* Revert local changes

Co-authored-by: Haytham AbuelFutuh <[email protected]>
  • Loading branch information
wild-endeavor and EngHabu authored Aug 12, 2020
1 parent 91e5d0a commit a2d8a74
Show file tree
Hide file tree
Showing 17 changed files with 328 additions and 21 deletions.
5 changes: 3 additions & 2 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ tasks:
- container
- K8S-ARRAY
- qubole-hive-executor
- sagemaker_training
- sagemaker_hyperparameter_tuning
# Uncomment to enable sagemaker plugin
# - sagemaker_training
# - sagemaker_hyperparameter_tuning
# Sample plugins config
plugins:
# All k8s plugins default configuration
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.14.3 // indirect
github.com/imdario/mergo v0.3.8 // indirect
github.com/lyft/datacatalog v0.2.1
github.com/lyft/flyteidl v0.18.0
github.com/lyft/flyteidl v0.18.1
github.com/lyft/flyteplugins v0.4.4
github.com/lyft/flytestdlib v0.3.9
github.com/magiconair/properties v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,8 @@ github.com/lyft/datacatalog v0.2.1/go.mod h1:ktrPvzTDUwHO5Lv0hLH38zLHnOJ++rGoAO0
github.com/lyft/flyteidl v0.17.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.0 h1:f4yv1MafE26wpMC6QlthM02EeTEDXpy/waL54dRDiSs=
github.com/lyft/flyteidl v0.18.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.4.2 h1:DUyvi7PkJtQ+WV5ZlVypIfJOJOL3THn6QJgh5g24kG4=
github.com/lyft/flyteplugins v0.4.2/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c=
github.com/lyft/flyteidl v0.18.1 h1:COKkZi5k6bQvUYOk5gE70+FJX9/NUn0WOQ1uMrw3Qio=
github.com/lyft/flyteidl v0.18.1/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.4.4 h1:2tFBAtcxjd81wVByI5yVSIBKJ/UECk7XQK3F1XzttNA=
github.com/lyft/flyteplugins v0.4.4/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
Expand Down
15 changes: 15 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package v1alpha1

import "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"

// This contains an OutputLocationPrefix. When running against AWS, this should be something of the form
// s3://my-bucket, or s3://my-bucket/ A sharding string will automatically be appended to this prefix before
// handing off to plugins/tasks. Sharding behavior may change in the future.
// Background available at https://github.com/lyft/flyte/issues/211
type RawOutputDataConfig struct {
*admin.RawOutputDataConfig
}

func (in *RawOutputDataConfig) DeepCopyInto(out *RawOutputDataConfig) {
*out = *in
}
15 changes: 15 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/admin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package v1alpha1

import (
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/stretchr/testify/assert"

"testing"
)

func TestRawOutputConfig(t *testing.T) {
r := RawOutputDataConfig{&admin.RawOutputDataConfig{
OutputLocationPrefix: "s3://bucket",
}}
assert.Equal(t, "s3://bucket", r.OutputLocationPrefix)
}
1 change: 1 addition & 0 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ type Meta interface {
GetName() string
GetServiceAccountName() string
IsInterruptible() bool
GetRawOutputDataConfig() RawOutputDataConfig
}

type TaskDetailsGetter interface {
Expand Down
32 changes: 32 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/MetaExtended.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions pkg/apis/flyteworkflow/v1alpha1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,14 @@ type FlyteWorkflow struct {
ServiceAccountName string `json:"serviceAccountName,omitempty" protobuf:"bytes,8,opt,name=serviceAccountName"`
// Status is the only mutable section in the workflow. It holds all the execution information
Status WorkflowStatus `json:"status,omitempty"`

// non-Serialized fields
// RawOutputDataConfig defines the configurations to use for generating raw outputs (e.g. blobs, schemas).
RawOutputDataConfig RawOutputDataConfig `json:"rawOutputDataConfig,omitempty"`

// non-Serialized fields (these will not get written to etcd)
// As of 2020-07, the only real implementation of this interface is a URLPathConstructor, which is just an empty
// struct. However, because this field is an interface, we create it once when the crd is hydrated from etcd,
// so that it can be used downstream without any confusion.
// This field is here because it's easier to put it here than pipe through a new object through all of propeller.
DataReferenceConstructor storage.ReferenceConstructor `json:"-"`
}

Expand Down Expand Up @@ -110,6 +116,10 @@ func (in *FlyteWorkflow) IsInterruptible() bool {
return in.NodeDefaults.Interruptible
}

func (in *FlyteWorkflow) GetRawOutputDataConfig() RawOutputDataConfig {
return in.RawOutputDataConfig
}

type Inputs struct {
*core.LiteralMap
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/controller/executors/mocks/execution_context.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions pkg/controller/executors/mocks/immutable_execution_context.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions pkg/controller/nodes/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
"time"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"
"github.com/lyft/flytestdlib/promutils/labeled"
"github.com/lyft/flytestdlib/storage"
Expand Down Expand Up @@ -193,6 +195,9 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseStartNodes(t *testing.T) {
},
},
DataReferenceConstructor: store,
RawOutputDataConfig: v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
},
}, startNode, startNodeStatus

}
Expand Down Expand Up @@ -292,6 +297,9 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseEndNode(t *testing.T) {
},
},
DataReferenceConstructor: store,
RawOutputDataConfig: v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
},
}, n, ns

}
Expand Down Expand Up @@ -377,6 +385,9 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseEndNode(t *testing.T) {
},
},
DataReferenceConstructor: store,
RawOutputDataConfig: v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
},
}, n, ns

}
Expand Down Expand Up @@ -507,6 +518,9 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) {
},
},
DataReferenceConstructor: store,
RawOutputDataConfig: v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
},
}, n, ns

}
Expand Down Expand Up @@ -599,6 +613,9 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) {
mockWf.OnGetLabels().Return(make(map[string]string))
mockWf.OnIsInterruptible().Return(false)
mockWf.OnGetOnFailurePolicy().Return(v1alpha1.WorkflowOnFailurePolicy(core.WorkflowMetadata_FAIL_IMMEDIATELY))
mockWf.OnGetRawOutputDataConfig().Return(v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
})
mockWfStatus.OnGetDataDir().Return(storage.DataReference("x"))
mockWfStatus.OnConstructNodeDataDirMatch(mock.Anything, mock.Anything, mock.Anything).Return("x", nil)
return mockWf, mockN2Status
Expand Down Expand Up @@ -1098,6 +1115,9 @@ func TestNodeExecutor_RecursiveNodeHandler_UpstreamNotReady(t *testing.T) {
},
},
DataReferenceConstructor: store,
RawOutputDataConfig: v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
},
}, n, ns

}
Expand Down Expand Up @@ -1210,6 +1230,9 @@ func TestNodeExecutor_RecursiveNodeHandler_BranchNode(t *testing.T) {
eCtx.OnIsInterruptible().Return(true)
eCtx.OnGetExecutionID().Return(v1alpha1.WorkflowExecutionIdentifier{WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{}})
eCtx.OnGetLabels().Return(nil)
eCtx.OnGetRawOutputDataConfig().Return(v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
})

branchTakenNodeID := "branchTakenNode"
branchTakenNode := &mocks.ExecutableNode{}
Expand Down
Loading

0 comments on commit a2d8a74

Please sign in to comment.