Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't load entire archived workflow into memory in list APIs #12912

Merged
merged 5 commits into from
Apr 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 52 additions & 13 deletions persist/sqldb/workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
log "github.com/sirupsen/logrus"
"github.com/upper/db/v4"
"google.golang.org/grpc/codes"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
sutils "github.com/argoproj/argo-workflows/v3/server/utils"
Expand All @@ -30,6 +32,9 @@ type archivedWorkflowMetadata struct {
Phase wfv1.WorkflowPhase `db:"phase"`
StartedAt time.Time `db:"startedat"`
FinishedAt time.Time `db:"finishedat"`
Labels string `db:"labels,omitempty"`
Annotations string `db:"annotations,omitempty"`
Progress string `db:"progress,omitempty"`
}

type archivedWorkflowRecord struct {
Expand Down Expand Up @@ -142,7 +147,7 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
}

func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefix string, minStartedAt, maxStartedAt time.Time, labelRequirements labels.Requirements, limit int, offset int) (wfv1.Workflows, error) {
var archivedWfs []archivedWorkflowRecord
var archivedWfs []archivedWorkflowMetadata

// If we were passed 0 as the limit, then we should load all available archived workflows
// to match the behavior of the `List` operations in the Kubernetes API
Expand All @@ -151,8 +156,13 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefi
offset = -1
}

selectQuery, err := selectArchivedWorkflowQuery(r.dbType)
if err != nil {
return nil, err
}

selector := r.session.SQL().
Select("workflow").
Select(selectQuery).
From(archiveTableName).
Where(r.clusterManagedNamespaceAndInstanceID()).
And(namespaceEqual(namespace)).
Expand All @@ -161,7 +171,7 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefi
And(startedAtFromClause(minStartedAt)).
And(startedAtToClause(maxStartedAt))

selector, err := labelsClause(selector, r.dbType, labelRequirements)
selector, err = labelsClause(selector, r.dbType, labelRequirements)
if err != nil {
return nil, err
}
Expand All @@ -174,16 +184,35 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefi
return nil, err
}

wfs := make(wfv1.Workflows, 0)
for _, archivedWf := range archivedWfs {
wf := wfv1.Workflow{}
err = json.Unmarshal([]byte(archivedWf.Workflow), &wf)
if err != nil {
log.WithFields(log.Fields{"workflowUID": archivedWf.UID, "workflowName": archivedWf.Name}).Errorln("unable to unmarshal workflow from database")
} else {
// For backward compatibility, we should label workflow retrieved from DB as Persisted.
wf.ObjectMeta.Labels[common.LabelKeyWorkflowArchivingStatus] = "Persisted"
wfs = append(wfs, wf)
wfs := make(wfv1.Workflows, len(archivedWfs))
for i, md := range archivedWfs {
labels := make(map[string]string)
if err := json.Unmarshal([]byte(md.Labels), &labels); err != nil {
return nil, err
}
// For backward compatibility, we should label workflow retrieved from DB as Persisted.
labels[common.LabelKeyWorkflowArchivingStatus] = "Persisted"

annotations := make(map[string]string)
if err := json.Unmarshal([]byte(md.Annotations), &annotations); err != nil {
return nil, err
}

wfs[i] = wfv1.Workflow{
ObjectMeta: v1.ObjectMeta{
Name: md.Name,
Namespace: md.Namespace,
UID: types.UID(md.UID),
CreationTimestamp: v1.Time{Time: md.StartedAt},
Labels: labels,
Annotations: annotations,
},
Status: wfv1.WorkflowStatus{
Phase: md.Phase,
StartedAt: v1.Time{Time: md.StartedAt},
FinishedAt: v1.Time{Time: md.FinishedAt},
Progress: wfv1.Progress(md.Progress),
},
Copy link
Contributor

@agilgur5 agilgur5 Apr 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What did you base the metadata requirements on? Based on workflows-service.ts and reports.tsx, we may also want status.message, status.estimatedDuration, status.resourceDuration, and spec.suspend

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was checking columns in the workflow UI and decided what metadata to include, but if the fields that you mentioned are required, I can open another PR to add them

Copy link
Contributor

@agilgur5 agilgur5 Apr 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically speaking, fields is a query parameter in the API, so ideally we'd dynamically set the fields.

If you didn't notice some of them it may be because you can expand items in the list?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, that could be, I will prepare a PR to add those fields.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be causing a regression now: #13098

Copy link
Contributor

@agilgur5 agilgur5 May 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm looking at #11121 (comment) more, it may have also unintentionally introduced a feature where you can get all parts of an Archived Workflows in the list API, whereas you could not in 3.4 if I'm reading correctly.

I.e. fixing that bug also removed that feature. I don't necessarily mind that if it wasn't present in 3.4, but I can see how that could cause some back-and-forth turbulence & instability for users (which, well, #11121 caused a lot of regressions & instability unfortunately (and predates me))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR to add the above missing fields: #13136

}
}
return wfs, nil
Expand Down Expand Up @@ -347,3 +376,13 @@ func (r *workflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error {
log.WithFields(log.Fields{"rowsAffected": rowsAffected}).Info("Deleted archived workflows")
return nil
}

func selectArchivedWorkflowQuery(t dbType) (*db.RawExpr, error) {
switch t {
case MySQL:
return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce(workflow->>'$.metadata.labels', '{}') as labels,coalesce(workflow->>'$.metadata.annotations', '{}') as annotations, coalesce(workflow->>'$.status.progress', '') as progress"), nil
case Postgres:
return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce((workflow::json)->'metadata'->>'labels', '{}') as labels, coalesce((workflow::json)->'metadata'->>'annotations', '{}') as annotations, coalesce((workflow::json)->'status'->>'progress', '') as progress"), nil
}
return nil, fmt.Errorf("unsupported db type %s", t)
}
Loading