Skip to content

Commit

Permalink
fix: apply custom cursor pagination where workflows and archived work…
Browse files Browse the repository at this point in the history
…flows are merged (#11761)

Signed-off-by: sunyeongchoi <[email protected]>
Signed-off-by: happyso <[email protected]>
Co-authored-by: Yuan (Terry) Tang <[email protected]>
  • Loading branch information
sunyeongchoi and terrytangyuan authored Sep 12, 2023
1 parent a5c7d51 commit e073dcc
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 23 deletions.
90 changes: 72 additions & 18 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"sort"
"strconv"

log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -128,7 +129,50 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf
return wf, nil
}

func mergeWithArchivedWorkflows(liveWfs wfv1.WorkflowList, archivedWfs wfv1.WorkflowList, numWfsToKeep int) *wfv1.WorkflowList {
func cursorPaginationByResourceVersion(items []wfv1.Workflow, resourceVersion string, limit int64, wfList *wfv1.WorkflowList) {
// Sort the workflow list in descending order by resourceVersion.
sort.Slice(items, func(i, j int) bool {
itemIRV, _ := strconv.Atoi(items[i].ResourceVersion)
itemJRV, _ := strconv.Atoi(items[j].ResourceVersion)
return itemIRV > itemJRV
})

// resourceVersion: unique value to identify the version of the object by Kubernetes. It is used for pagination in workflows.
// receivedRV: resourceVersion value used for previous pagination
// Due to the descending sorting above, the items are filtered to have a resourceVersion smaller than receivedRV.
// The data with values smaller than the receivedRV on the current page will be used for the next page.
if resourceVersion != "" {
var newItems []wfv1.Workflow
for _, item := range items {
targetRV, _ := strconv.Atoi(item.ResourceVersion)
receivedRV, _ := strconv.Atoi(resourceVersion)
if targetRV < receivedRV {
newItems = append(newItems, item)
}
items = newItems
}
}

// Indexing list by limit count
if limit != 0 {
endIndex := int(limit)
if endIndex > len(items) || limit == 0 {
endIndex = len(items)
}
wfList.Items = items[0:endIndex]
} else {
wfList.Items = items
}

// Calculate new offset for next page
// For the next pagination, the resourceVersion of the last item is set in the Continue field.
if limit != 0 && len(wfList.Items) == int(limit) {
lastIndex := len(wfList.Items) - 1
wfList.ListMeta.Continue = wfList.Items[lastIndex].ResourceVersion
}
}

func mergeWithArchivedWorkflows(liveWfs wfv1.WorkflowList, archivedWfs wfv1.WorkflowList) *wfv1.WorkflowList {
var mergedWfs []wfv1.Workflow
var uidToWfs = map[types.UID][]wfv1.Workflow{}
for _, item := range liveWfs.Items {
Expand All @@ -151,43 +195,53 @@ func mergeWithArchivedWorkflows(liveWfs wfv1.WorkflowList, archivedWfs wfv1.Work
}
}
}
mergedWfsList := wfv1.WorkflowList{Items: mergedWfs, ListMeta: liveWfs.ListMeta}
sort.Sort(mergedWfsList.Items)
numWfs := 0
var finalWfs []wfv1.Workflow
for _, item := range mergedWfsList.Items {
if numWfsToKeep == 0 || numWfs < numWfsToKeep {
finalWfs = append(finalWfs, item)
numWfs += 1
}
}
return &wfv1.WorkflowList{Items: finalWfs, ListMeta: liveWfs.ListMeta}
// The ListMeta of type WorkflowList requires a resourceVersion for the List object.
// While archivedWfs does not have a resourceVersion corresponding to the List object,
// liveWfs does have a resourceVersion corresponding to the List object.
// Therefore, the ListMetadata should contain the ListMetadata value of liveWfs.
return &wfv1.WorkflowList{Items: mergedWfs, ListMeta: liveWfs.ListMeta}
}

func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*wfv1.WorkflowList, error) {
wfClient := auth.GetWfClient(ctx)

listOption := &metav1.ListOptions{}
options := &metav1.ListOptions{}
if req.ListOptions != nil {
listOption = req.ListOptions
options = req.ListOptions
}
s.instanceIDService.With(listOption)
wfList, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).List(ctx, *listOption)

// Save the original Continue and Limit.
resourceVersion := options.Continue
limit := options.Limit

// Search whole with Limit 0.
// Reset the Continue "" to prevent Kubernetes native pagination.
options.Continue = ""
options.Limit = 0

s.instanceIDService.With(options)
wfList, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).List(ctx, *options)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}

// Search whole with Limit 0.
// Reset the Continue "0" to prevent archive workflow pagination.
options.Continue = "0"
options.Limit = 0
archivedWfList, err := s.wfArchiveServer.ListArchivedWorkflows(ctx, &workflowarchivepkg.ListArchivedWorkflowsRequest{
ListOptions: listOption,
ListOptions: options,
NamePrefix: "",
Namespace: req.Namespace,
})
if err != nil {
log.Warnf("unable to list archived workflows:%v", err)
} else {
if archivedWfList != nil {
wfList = mergeWithArchivedWorkflows(*wfList, *archivedWfList, int(listOption.Limit))
wfList = mergeWithArchivedWorkflows(*wfList, *archivedWfList)
}
}
cursorPaginationByResourceVersion(wfList.Items, resourceVersion, limit, wfList)

cleaner := fields.NewCleaner(req.Fields)
if s.offloadNodeStatusRepo.IsEnabled() && !cleaner.WillExclude("items.status.nodes") {
Expand Down
41 changes: 36 additions & 5 deletions server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,11 +662,42 @@ func TestMergeWithArchivedWorkflows(t *testing.T) {
wf3 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{UID: "3", CreationTimestamp: metav1.Time{Time: timeNow.Add(3 * time.Second)}}}
liveWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Live, wf2}}
archivedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Archived, wf3, wf2}}
expectedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2, wf1Live}}
expectedShortWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2}}
assert.Equal(t, expectedWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 0).Items)
assert.Equal(t, expectedShortWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 2).Items)
archivedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2, wf1Archived}}
expectedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Live, wf2, wf3}}
assert.Equal(t, expectedWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList).Items)
}

func TestCursorPaginationByResourceVersion(t *testing.T) {
wf1 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1", Name: "wf1"}}
wf2 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2", Name: "wf2"}}
wf3 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3", Name: "wf3"}}
wf4 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4", Name: "wf4"}}
wf5 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5", Name: "wf5"}}
wf6 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6", Name: "wf6"}}
wf7 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "7", Name: "wf7"}}
wf8 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "8", Name: "wf8"}}
wf9 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "9", Name: "wf9"}}
wf10 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "10", Name: "wf10"}}

items := []v1alpha1.Workflow{wf2, wf1, wf4, wf3, wf6, wf5, wf8, wf7, wf10, wf9}
wfList := &v1alpha1.WorkflowList{}

cursorPaginationByResourceVersion(items, "8", 5, wfList)
expectedWfList := &v1alpha1.WorkflowList{}
expectedWfList.Items = []v1alpha1.Workflow{wf7, wf6, wf5, wf4, wf3}
expectedWfList.ListMeta.Continue = "3"

assert.Equal(t, expectedWfList, wfList)
}

func TestWatchWorkflows(t *testing.T) {
Expand Down

0 comments on commit e073dcc

Please sign in to comment.