fix: apply custom cursor pagination where workflows and archived workflows are merged

Signed-off-by: sunyeongchoi <[email protected]>
sunyeongchoi committed Sep 6, 2023
1 parent 94fbb3b commit bd50093
Showing 1 changed file with 75 additions and 26 deletions.
101 changes: 75 additions & 26 deletions server/workflow/workflow_server.go
@@ -6,14 +6,7 @@ import (
"fmt"
"io"
"sort"

log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
corev1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"strconv"

"github.com/argoproj/argo-workflows/v3/errors"
"github.com/argoproj/argo-workflows/v3/persist/sqldb"
@@ -35,6 +28,13 @@ import (
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
"github.com/argoproj/argo-workflows/v3/workflow/util"
"github.com/argoproj/argo-workflows/v3/workflow/validate"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
corev1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

type workflowServer struct {
@@ -129,7 +129,50 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf
return wf, nil
}

-func mergeWithArchivedWorkflows(liveWfs v1alpha1.WorkflowList, archivedWfs v1alpha1.WorkflowList, numWfsToKeep int) *v1alpha1.WorkflowList {
+func cursorPaginationByResourceVersion(items []v1alpha1.Workflow, resourceVersion string, limit int64, wfList *v1alpha1.WorkflowList) {
+	// Use the Kubernetes resourceVersion as the cursor for pagination.
+	// To filter against the cursor, first sort the results in descending order by resourceVersion.
+	sort.Slice(items, func(i, j int) bool {
+		itemIRV, _ := strconv.Atoi(items[i].ResourceVersion)
+		itemJRV, _ := strconv.Atoi(items[j].ResourceVersion)
+		return itemIRV > itemJRV
+	})
+
+	// Keep only items whose resourceVersion is smaller than the received cursor;
+	// because of the descending sort, these are exactly the items that belong to the next page.
+	if resourceVersion != "" {
+		var newItems []v1alpha1.Workflow
+		for _, item := range items {
+			targetRV, _ := strconv.Atoi(item.ResourceVersion)
+			receivedRV, _ := strconv.Atoi(resourceVersion)
+			if targetRV < receivedRV {
+				newItems = append(newItems, item)
+			}
+			items = newItems
+		}
+	}
+
+	// Cut the list down to the requested limit.
+	if limit != 0 {
+		endIndex := int(limit)
+		if endIndex > len(items) || limit == 0 {
+			endIndex = len(items)
+		}
+		wfList.Items = items[0:endIndex]
+	} else {
+		wfList.Items = items
+	}
+
+	// Set the cursor for the next page: the resourceVersion of the last item
+	// returned here is stored in the Continue field.
+	if limit != 0 && len(wfList.Items) == int(limit) {
+		lastIndex := len(wfList.Items) - 1
+		wfList.ListMeta.Continue = wfList.Items[lastIndex].ResourceVersion
+	}
+}
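For illustration, the cursor mechanics of the function above can be reduced to a self-contained sketch (the item type and page helper are hypothetical, not part of this commit): sort descending by numeric resourceVersion, keep only entries strictly below the cursor, cut at the limit, and return the last entry's resourceVersion as the next cursor.

package main

import (
	"fmt"
	"sort"
	"strconv"
)

// item stands in for a Workflow; only the resourceVersion matters for the cursor.
type item struct{ rv string }

// page mirrors the cursor logic above: sort descending by resourceVersion,
// keep items strictly below the cursor, cut at limit, and return the next cursor.
func page(items []item, cursor string, limit int) ([]item, string) {
	sort.Slice(items, func(i, j int) bool {
		a, _ := strconv.Atoi(items[i].rv)
		b, _ := strconv.Atoi(items[j].rv)
		return a > b
	})
	if cursor != "" {
		c, _ := strconv.Atoi(cursor)
		var filtered []item
		for _, it := range items {
			if rv, _ := strconv.Atoi(it.rv); rv < c {
				filtered = append(filtered, it)
			}
		}
		items = filtered
	}
	if limit > 0 && limit < len(items) {
		items = items[:limit]
	}
	next := ""
	if limit > 0 && len(items) == limit {
		next = items[len(items)-1].rv
	}
	return items, next
}

func main() {
	all := []item{{"103"}, {"101"}, {"105"}, {"102"}, {"104"}}
	first, cursor := page(all, "", 2)
	fmt.Println(first, cursor) // [{105} {104}] 104
	second, cursor := page(all, cursor, 2)
	fmt.Println(second, cursor) // [{103} {102}] 102
}

Each Continue value is simply the smallest resourceVersion returned so far, which is why fetching everything, merging, and re-sorting is enough to page across live and archived workflows together.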

+func mergeWithArchivedWorkflows(liveWfs v1alpha1.WorkflowList, archivedWfs v1alpha1.WorkflowList, resourceVersion string, limit int64) *v1alpha1.WorkflowList {
var mergedWfs []v1alpha1.Workflow
var uidToWfs = map[types.UID][]v1alpha1.Workflow{}
for _, item := range liveWfs.Items {
@@ -152,43 +195,49 @@ func mergeWithArchivedWorkflows(liveWfs v1alpha1.WorkflowList, archivedWfs v1alp
}
}
}
-	mergedWfsList := v1alpha1.WorkflowList{Items: mergedWfs, ListMeta: liveWfs.ListMeta}
-	sort.Sort(mergedWfsList.Items)
-	numWfs := 0
-	var finalWfs []v1alpha1.Workflow
-	for _, item := range mergedWfsList.Items {
-		if numWfsToKeep == 0 || numWfs < numWfsToKeep {
-			finalWfs = append(finalWfs, item)
-			numWfs += 1
-		}
-	}
-	return &v1alpha1.WorkflowList{Items: finalWfs, ListMeta: liveWfs.ListMeta}
+	return &v1alpha1.WorkflowList{Items: mergedWfs, ListMeta: liveWfs.ListMeta}
}
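The remainder of mergeWithArchivedWorkflows is folded out of the hunk above and is unchanged by this commit apart from the signature; judging from the visible grouping by UID, its net effect is to keep one copy per workflow, with the live copy winning when a UID appears in both lists (that preference is an assumption here, not shown in the diff). A rough stand-in for that effect, using a hypothetical record type instead of v1alpha1.Workflow:

package example

// record is a hypothetical stand-in for v1alpha1.Workflow.
type record struct {
	uid  string
	name string
}

// mergeByUID sketches the merge: live entries are taken as-is, and an
// archived entry is only added when its UID was not seen among the live ones.
func mergeByUID(live, archived []record) []record {
	seen := make(map[string]bool, len(live))
	merged := make([]record, 0, len(live)+len(archived))
	for _, r := range live {
		seen[r.uid] = true
		merged = append(merged, r)
	}
	for _, r := range archived {
		if !seen[r.uid] {
			merged = append(merged, r)
		}
	}
	return merged
}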

func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*wfv1.WorkflowList, error) {
wfClient := auth.GetWfClient(ctx)

-	listOption := &metav1.ListOptions{}
+	options := &metav1.ListOptions{}
if req.ListOptions != nil {
-		listOption = req.ListOptions
+		options = req.ListOptions
}
-	s.instanceIDService.With(listOption)
-	wfList, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).List(ctx, *listOption)
+
+	// Save the original Continue and Limit so they can drive the custom cursor pagination below.
+	resourceVersion := options.Continue
+	limit := options.Limit
+
+	// Fetch everything: clearing Continue and setting Limit to 0 disables native Kubernetes pagination.
+	options.Continue = ""
+	options.Limit = 0
+
+	s.instanceIDService.With(options)
+	wfList, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).List(ctx, *options)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}

+	// Fetch everything from the archive as well: Limit 0 and Continue "0" disable archived-workflow pagination.
+	options.Continue = "0"
+	options.Limit = 0
archivedWfList, err := s.wfArchiveServer.ListArchivedWorkflows(ctx, &workflowarchivepkg.ListArchivedWorkflowsRequest{
-		ListOptions: listOption,
+		ListOptions: options,
NamePrefix: "",
Namespace: req.Namespace,
})
if err != nil {
log.Warnf("unable to list archived workflows:%v", err)
} else {
if archivedWfList != nil {
-			wfList = mergeWithArchivedWorkflows(*wfList, *archivedWfList, int(listOption.Limit))
+			wfList = mergeWithArchivedWorkflows(*wfList, *archivedWfList, resourceVersion, limit)
}
}
+	cursorPaginationByResourceVersion(wfList.Items, resourceVersion, limit, wfList)

cleaner := fields.NewCleaner(req.Fields)
if s.offloadNodeStatusRepo.IsEnabled() && !cleaner.WillExclude("items.status.nodes") {
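With the server-side change in place, a caller pages purely through ListOptions.Limit and the Continue cursor that ListMeta now carries. A client-side sketch under those assumptions (the listAll helper and the page size of 10 are illustrative; constructing the gRPC WorkflowServiceClient is out of scope here):

package example

import (
	"context"
	"fmt"

	workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// listAll walks every page of workflows in a namespace; Continue carries the
// resourceVersion cursor produced by cursorPaginationByResourceVersion above.
func listAll(ctx context.Context, client workflowpkg.WorkflowServiceClient, namespace string) error {
	opts := &metav1.ListOptions{Limit: 10}
	for {
		wfList, err := client.ListWorkflows(ctx, &workflowpkg.WorkflowListRequest{
			Namespace:   namespace,
			ListOptions: opts,
		})
		if err != nil {
			return err
		}
		for _, wf := range wfList.Items {
			fmt.Println(wf.Name)
		}
		if wfList.ListMeta.Continue == "" {
			return nil // an empty Continue means there are no further pages
		}
		opts.Continue = wfList.ListMeta.Continue
	}
}

If the final page happens to be exactly full, one extra call returns an empty list with an empty Continue, which ends the loop.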
