Skip to content
This repository has been archived by the owner on May 31, 2024. It is now read-only.

Filtering on activation state of workflows #305

Merged
merged 2 commits into from
Apr 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions cmd/get/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Retrieve all the workflows within project and domain (workflow/workflows can be

flytectl get workflow -p flytesnacks -d development

Retrieve workflow by name within project and domain:
Retrieve all versions of a workflow by name within project and domain:

::

Expand Down Expand Up @@ -103,6 +103,14 @@ var listWorkflowColumns = []printer.Column{
{Header: "Created At", JSONPath: "$.closure.createdAt"},
}

var namedEntityColumns = []printer.Column{
{Header: "Project", JSONPath: "$.id.project"},
{Header: "Domain", JSONPath: "$.id.domain"},
{Header: "Name", JSONPath: "$.id.name"},
{Header: "Description", JSONPath: "$.metadata.description"},
{Header: "State", JSONPath: "$.metadata.state"},
}

func WorkflowToProtoMessages(l []*admin.Workflow) []proto.Message {
messages := make([]proto.Message, 0, len(l))
for _, m := range l {
Expand All @@ -111,6 +119,14 @@ func WorkflowToProtoMessages(l []*admin.Workflow) []proto.Message {
return messages
}

func NamedEntityToProtoMessages(l []*admin.NamedEntity) []proto.Message {
messages := make([]proto.Message, 0, len(l))
for _, m := range l {
messages = append(messages, m)
}
return messages
}

func WorkflowToTableProtoMessages(l []*admin.Workflow) []proto.Message {
messages := make([]proto.Message, 0, len(l))
for _, m := range l {
Expand Down Expand Up @@ -155,16 +171,13 @@ func getWorkflowFunc(ctx context.Context, args []string, cmdCtx cmdCore.CommandC
return adminPrinter.Print(config.GetConfig().MustOutputFormat(), columns, WorkflowToProtoMessages(workflows)...)
}

workflows, err = cmdCtx.AdminFetcherExt().FetchAllVerOfWorkflow(ctx, "", config.GetConfig().Project, config.GetConfig().Domain, workflowconfig.DefaultConfig.Filter)
nameEntities, err := cmdCtx.AdminFetcherExt().FetchAllWorkflows(ctx, config.GetConfig().Project, config.GetConfig().Domain, workflowconfig.DefaultConfig.Filter)
if err != nil {
return err
}

logger.Debugf(ctx, "Retrieved %v workflows", len(workflows))
if config.GetConfig().MustOutputFormat() == printer.OutputFormatTABLE {
return adminPrinter.Print(config.GetConfig().MustOutputFormat(), listWorkflowColumns, WorkflowToTableProtoMessages(workflows)...)
}
return adminPrinter.Print(config.GetConfig().MustOutputFormat(), listWorkflowColumns, WorkflowToProtoMessages(workflows)...)
logger.Debugf(ctx, "Retrieved %v workflows", len(nameEntities))
return adminPrinter.Print(config.GetConfig().MustOutputFormat(), namedEntityColumns, NamedEntityToProtoMessages(nameEntities)...)
}

// FetchWorkflowForName fetches the workflow give it name.
Expand Down
37 changes: 23 additions & 14 deletions cmd/get/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,12 @@ import (
)

var (
resourceListRequestWorkflow *admin.ResourceListRequest
workflowListResponse *admin.WorkflowList
argsWf []string
workflow1 *admin.Workflow
workflows []*admin.Workflow
argsWf []string
workflow1 *admin.Workflow
workflows []*admin.Workflow
)

func getWorkflowSetup() {
resourceListRequestWorkflow = &admin.ResourceListRequest{
Id: &admin.NamedEntityIdentifier{
Project: projectValue,
Domain: domainValue,
},
}

variableMap := map[string]*core.Variable{
"var1": {
Expand Down Expand Up @@ -97,9 +89,6 @@ func getWorkflowSetup() {
},
}
workflows = []*admin.Workflow{workflow1, workflow2}
workflowListResponse = &admin.WorkflowList{
Workflows: workflows,
}
argsWf = []string{"workflow1"}
workflow.DefaultConfig.Latest = false
workflow.DefaultConfig.Version = ""
Expand Down Expand Up @@ -149,6 +138,26 @@ func TestGetWorkflowFuncWithError(t *testing.T) {
assert.NotNil(t, err)
})

t.Run("fetching all workflow success", func(t *testing.T) {
s := setup()
getWorkflowSetup()
var args []string
s.FetcherExt.OnFetchAllWorkflowsMatch(mock.Anything, mock.Anything,
mock.Anything, mock.Anything).Return([]*admin.NamedEntity{}, nil)
err := getWorkflowFunc(s.Ctx, args, s.CmdCtx)
assert.Nil(t, err)
})

t.Run("fetching all workflow error", func(t *testing.T) {
s := setup()
getWorkflowSetup()
var args []string
s.FetcherExt.OnFetchAllWorkflowsMatch(mock.Anything, mock.Anything,
mock.Anything, mock.Anything).Return(nil, fmt.Errorf("error fetching all workflows"))
err := getWorkflowFunc(s.Ctx, args, s.CmdCtx)
assert.NotNil(t, err)
})

}

func TestGetWorkflowFuncLatestWithTable(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/ext/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type AdminFetcherExtInterface interface {
// FetchTaskVersion fetches particular version of task in a project, domain
FetchTaskVersion(ctx context.Context, name, version, project, domain string) (*admin.Task, error)

// FetchAllWorkflows fetches all workflows in project domain
FetchAllWorkflows(ctx context.Context, project, domain string, filter filters.Filters) ([]*admin.NamedEntity, error)

// FetchAllVerOfWorkflow fetches all versions of task in a project, domain
FetchAllVerOfWorkflow(ctx context.Context, name, project, domain string, filter filters.Filters) ([]*admin.Workflow, error)

Expand Down
41 changes: 41 additions & 0 deletions pkg/ext/mocks/admin_fetcher_ext_interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions pkg/ext/workflow_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@ func (a *AdminFetcherExtClient) FetchAllVerOfWorkflow(ctx context.Context, workf
return wList.Workflows, nil
}

// FetchAllWorkflows fetches all workflows in project domain
func (a *AdminFetcherExtClient) FetchAllWorkflows(ctx context.Context, project, domain string, filter filters.Filters) ([]*admin.NamedEntity, error) {
tranformFilters, err := filters.BuildNamedEntityListRequest(filter, project, domain, core.ResourceType_WORKFLOW)
if err != nil {
return nil, err
}
wList, err := a.AdminServiceClient().ListNamedEntities(ctx, tranformFilters)
if err != nil {
return nil, err
}
if len(wList.Entities) == 0 {
return nil, fmt.Errorf("no workflow retrieved for %v project %v domain", project, domain)
}
return wList.Entities, nil
}

// FetchWorkflowLatestVersion fetches latest version for given workflow name
func (a *AdminFetcherExtClient) FetchWorkflowLatestVersion(ctx context.Context, name, project, domain string, filter filters.Filters) (*admin.Workflow, error) {
// Fetch the latest version of the workflow.
Expand Down
42 changes: 39 additions & 3 deletions pkg/ext/workflow_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (
)

var (
workflowListResponse *admin.WorkflowList
workflowFilter = filters.Filters{}
workflowResponse *admin.Workflow
workflowListResponse *admin.WorkflowList
namedEntityListResponse *admin.NamedEntityList
workflowFilter = filters.Filters{}
workflowResponse *admin.Workflow
)

func getWorkflowFetcherSetup() {
Expand Down Expand Up @@ -79,14 +80,49 @@ func getWorkflowFetcherSetup() {
},
}

namedEntity := &admin.NamedEntity{
Id: &admin.NamedEntityIdentifier{
Project: "project",
Domain: "domain",
Name: "workflow",
},
ResourceType: core.ResourceType_WORKFLOW,
}

workflows := []*admin.Workflow{workflow2, workflow1}

namedEntityListResponse = &admin.NamedEntityList{
Entities: []*admin.NamedEntity{namedEntity},
}
workflowListResponse = &admin.WorkflowList{
Workflows: workflows,
}
workflowResponse = workflows[0]
}

func TestFetchAllWorkflows(t *testing.T) {
t.Run("non empty response", func(t *testing.T) {
getWorkflowFetcherSetup()
adminClient.OnListNamedEntitiesMatch(mock.Anything, mock.Anything).Return(namedEntityListResponse, nil)
_, err := adminFetcherExt.FetchAllWorkflows(ctx, "project", "domain", workflowFilter)
assert.Nil(t, err)
})
t.Run("empty response", func(t *testing.T) {
getWorkflowFetcherSetup()
namedEntityListResponse := &admin.NamedEntityList{}
adminClient.OnListNamedEntitiesMatch(mock.Anything, mock.Anything).Return(namedEntityListResponse, nil)
_, err := adminFetcherExt.FetchAllWorkflows(ctx, "project", "domain", workflowFilter)
assert.Equal(t, fmt.Errorf("no workflow retrieved for project project domain domain"), err)
})
}

func TestFetchAllWorkflowsError(t *testing.T) {
getWorkflowFetcherSetup()
adminClient.OnListNamedEntitiesMatch(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("failed"))
_, err := adminFetcherExt.FetchAllWorkflows(ctx, "project", "domain", workflowFilter)
assert.Equal(t, fmt.Errorf("failed"), err)
}

func TestFetchAllVerOfWorkflow(t *testing.T) {
getWorkflowFetcherSetup()
adminClient.OnListWorkflowsMatch(mock.Anything, mock.Anything).Return(workflowListResponse, nil)
Expand Down
20 changes: 20 additions & 0 deletions pkg/filters/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"strconv"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
)

func BuildResourceListRequestWithName(c Filters, project, domain, name string) (*admin.ResourceListRequest, error) {
Expand All @@ -29,6 +30,25 @@ func BuildResourceListRequestWithName(c Filters, project, domain, name string) (
return request, nil
}

func BuildNamedEntityListRequest(c Filters, project, domain string, resourceType core.ResourceType) (*admin.NamedEntityListRequest, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: c Filters is kinda funky :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure . will fix this in followup

fieldSelector, err := Transform(SplitTerms(c.FieldSelector))
if err != nil {
return nil, err
}
request := &admin.NamedEntityListRequest{
Limit: uint32(c.Limit),
Token: getToken(c),
Filters: fieldSelector,
Project: project,
Domain: domain,
ResourceType: resourceType,
}
if sort := buildSortingRequest(c); sort != nil {
request.SortBy = sort
}
return request, nil
}

func BuildProjectListRequest(c Filters) (*admin.ProjectListRequest, error) {
fieldSelector, err := Transform(SplitTerms(c.FieldSelector))
if err != nil {
Expand Down