From 584d51ef9baa7980943336866ebb602d82cd0aa2 Mon Sep 17 00:00:00 2001 From: Alan Clucas Date: Tue, 7 May 2024 23:09:18 +0100 Subject: [PATCH] chore: Revert "feat: add sqlite-based memory store for live workflows. Fixes #12025 (#12736)" This reverts commit f1ab5aa32f766090998bcaf5a2706c7e3a6cc608. Signed-off-by: Alan Clucas --- go.mod | 11 +- go.sum | 20 +- persist/sqldb/archived_workflow_labels.go | 26 +- .../sqldb/archived_workflow_labels_test.go | 2 +- persist/sqldb/db_type.go | 1 - persist/sqldb/mocks/WorkflowArchive.go | 39 ++- persist/sqldb/null_workflow_archive.go | 7 +- persist/sqldb/selector.go | 89 ----- persist/sqldb/workflow_archive.go | 62 ++-- pkg/apiclient/argo-kube-client.go | 13 +- server/apiserver/argoserver.go | 15 +- server/utils/list_options.go | 136 -------- server/workflow/store/sqlite_store.go | 306 ------------------ server/workflow/store/sqlite_store_test.go | 155 --------- server/workflow/workflow_server.go | 157 ++++----- server/workflow/workflow_server_test.go | 61 ++-- .../archived_workflow_server.go | 89 ++++- .../archived_workflow_server_test.go | 20 +- test/e2e/fixtures/e2e_suite.go | 6 +- .../estimation/estimator_factory.go | 10 +- .../estimation/estimator_factory_test.go | 8 +- 21 files changed, 275 insertions(+), 958 deletions(-) delete mode 100644 persist/sqldb/selector.go delete mode 100644 server/utils/list_options.go delete mode 100644 server/workflow/store/sqlite_store.go delete mode 100644 server/workflow/store/sqlite_store_test.go diff --git a/go.mod b/go.mod index a78539644814..3954f4c9f8d3 100644 --- a/go.mod +++ b/go.mod @@ -74,7 +74,6 @@ require ( k8s.io/kubectl v0.26.15 k8s.io/utils v0.0.0-20221107191617-1a15be271d1d sigs.k8s.io/yaml v1.4.0 - zombiezen.com/go/sqlite v1.2.0 ) require ( @@ -106,9 +105,7 @@ require ( github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/ncruces/go-strftime v0.1.9 // indirect github.com/pjbgf/sha1cd v0.3.0 // indirect - github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/segmentio/fasthash v1.0.3 // indirect @@ -121,14 +118,10 @@ require ( go.opentelemetry.io/otel/metric v1.22.0 // indirect go.opentelemetry.io/otel/trace v1.22.0 // indirect go.uber.org/multierr v1.10.0 // indirect - golang.org/x/mod v0.14.0 // indirect - golang.org/x/tools v0.17.0 // indirect + golang.org/x/mod v0.12.0 // indirect + golang.org/x/tools v0.13.0 // indirect google.golang.org/genproto v0.0.0-20240116215550-a9fa1716bcac // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe // indirect - modernc.org/libc v1.41.0 // indirect - modernc.org/mathutil v1.6.0 // indirect - modernc.org/memory v1.7.2 // indirect - modernc.org/sqlite v1.29.1 // indirect ) require ( diff --git a/go.sum b/go.sum index 54896be505d1..c584480eb7c7 100644 --- a/go.sum +++ b/go.sum @@ -648,8 +648,6 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= -github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= -github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nsf/termbox-go v0.0.0-20190121233118-02980233997d/go.mod h1:IuKpRQcYE1Tfu+oAQqaLisqDeXgjyyltCfsaoYN18NQ= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -703,7 +701,6 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= -github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= @@ -915,8 +912,8 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= -golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= +golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1045,8 +1042,8 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= -golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= +golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ= +golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1200,23 +1197,16 @@ modernc.org/internal v1.1.0/go.mod h1:IFhfxUE81NbN8Riy+oHylA3PIYgAvIQ5eMufNzg7/Q modernc.org/lex v1.1.1/go.mod h1:6r8o8DLJkAnOsQaGi8fMoi+Vt6LTbDaCrkUK729D8xM= modernc.org/lexer v1.0.4/go.mod h1:tOajb8S4sdfOYitzCgXDFmbVJ/LE0v1fNJ7annTw36U= modernc.org/lexer v1.0.5/go.mod h1:8npHn3u/NxCEtlC/tRSY77x5+WB3HvHMzMVElQ76ayI= -modernc.org/libc v1.41.0 h1:g9YAc6BkKlgORsUWj+JwqoB1wU3o4DE3bM3yvA3k+Gk= -modernc.org/libc v1.41.0/go.mod h1:w0eszPsiXoOnoMJgrXjglgLuDy/bt5RR4y3QzUUeodY= modernc.org/lldb v1.0.4/go.mod h1:AKDI6wUJk7iJS8nRX54St8rq9wUIi3o5YGN3rlejR5o= modernc.org/lldb v1.0.8/go.mod h1:ybOcsZ/RNZo3q8fiGadQFRnD+1Jc+RWGcTPdeilCnUk= modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k= modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= -modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= -modernc.org/memory v1.7.2 h1:Klh90S215mmH8c9gO98QxQFsY+W451E8AnzjoE2ee1E= -modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E= modernc.org/ql v1.4.7/go.mod h1:I900l6z8ckpPy1y9VR0gu4pZ9hl9AhmQla4F8KERzdc= modernc.org/sortutil v1.1.0/go.mod h1:ZyL98OQHJgH9IEfN71VsamvJgrtRX9Dj2gX+vH86L1k= modernc.org/sortutil v1.1.1/go.mod h1:DTj/8BqjEBLZFVPYvEGDfFFg94SsfPxQ70R+SQJ98qA= modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss= -modernc.org/sqlite v1.29.1 h1:19GY2qvWB4VPw0HppFlZCPAbmxFU41r+qjKZQdQ1ryA= -modernc.org/sqlite v1.29.1/go.mod h1:hG41jCYxOAOoO6BRK66AdRlmOcDzXf7qnwlwjUIOqa0= modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw= modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= modernc.org/zappy v1.0.5/go.mod h1:Q5T4ra3/JJNORGK16oe8rRAti7kWtRW4Z93fzin2gBc= @@ -1239,5 +1229,3 @@ sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= -zombiezen.com/go/sqlite v1.2.0 h1:jja0Ubpzpl6bjr/bSaPyvafHO+extoDJJXIaqXT7VOU= -zombiezen.com/go/sqlite v1.2.0/go.mod h1:yRl27//s/9aXU3RWs8uFQwjkTG9gYNGEls6+6SvrclY= diff --git a/persist/sqldb/archived_workflow_labels.go b/persist/sqldb/archived_workflow_labels.go index add2bbad4bf3..04ce353ce209 100644 --- a/persist/sqldb/archived_workflow_labels.go +++ b/persist/sqldb/archived_workflow_labels.go @@ -52,9 +52,9 @@ func (r *workflowArchive) ListWorkflowsLabelValues(key string) (*wfv1.LabelValue return &wfv1.LabelValues{Items: labels}, nil } -func labelsClause(selector db.Selector, t dbType, requirements labels.Requirements, tableName, labelTableName string, hasClusterName bool) (db.Selector, error) { +func labelsClause(selector db.Selector, t dbType, requirements labels.Requirements) (db.Selector, error) { for _, req := range requirements { - cond, err := requirementToCondition(t, req, tableName, labelTableName, hasClusterName) + cond, err := requirementToCondition(t, req) if err != nil { return nil, err } @@ -63,40 +63,36 @@ func labelsClause(selector db.Selector, t dbType, requirements labels.Requiremen return selector, nil } -func requirementToCondition(t dbType, r labels.Requirement, tableName, labelTableName string, hasClusterName bool) (*db.RawExpr, error) { - clusterNameSelector := "" - if hasClusterName { - clusterNameSelector = fmt.Sprintf("clustername = %s.clustername and", tableName) - } +func requirementToCondition(t dbType, r labels.Requirement) (*db.RawExpr, error) { // Should we "sanitize our inputs"? No. // https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ // Valid label values must be 63 characters or less and must be empty or begin and end with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_), dots (.), and alphanumerics between. // https://kb.objectrocket.com/postgresql/casting-in-postgresql-570#string+to+integer+casting switch r.Operator() { case selection.DoesNotExist: - return db.Raw(fmt.Sprintf("not exists (select 1 from %s where %s uid = %s.uid and name = '%s')", labelTableName, clusterNameSelector, tableName, r.Key())), nil + return db.Raw(fmt.Sprintf("not exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s')", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key())), nil case selection.Equals, selection.DoubleEquals: - return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s' and value = '%s')", labelTableName, clusterNameSelector, tableName, r.Key(), r.Values().List()[0])), nil + return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and value = '%s')", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), r.Values().List()[0])), nil case selection.In: - return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s' and value in ('%s'))", labelTableName, clusterNameSelector, tableName, r.Key(), strings.Join(r.Values().List(), "', '"))), nil + return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and value in ('%s'))", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), strings.Join(r.Values().List(), "', '"))), nil case selection.NotEquals: - return db.Raw(fmt.Sprintf("not exists (select 1 from %s where %s uid = %s.uid and name = '%s' and value = '%s')", labelTableName, clusterNameSelector, tableName, r.Key(), r.Values().List()[0])), nil + return db.Raw(fmt.Sprintf("not exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and value = '%s')", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), r.Values().List()[0])), nil case selection.NotIn: - return db.Raw(fmt.Sprintf("not exists (select 1 from %s where %s uid = %s.uid and name = '%s' and value in ('%s'))", labelTableName, clusterNameSelector, tableName, r.Key(), strings.Join(r.Values().List(), "', '"))), nil + return db.Raw(fmt.Sprintf("not exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and value in ('%s'))", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), strings.Join(r.Values().List(), "', '"))), nil case selection.Exists: - return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s')", labelTableName, clusterNameSelector, tableName, r.Key())), nil + return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s')", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key())), nil case selection.GreaterThan: i, err := strconv.Atoi(r.Values().List()[0]) if err != nil { return nil, err } - return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s' and cast(value as %s) > %d)", labelTableName, clusterNameSelector, tableName, r.Key(), t.intType(), i)), nil + return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and cast(value as %s) > %d)", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), t.intType(), i)), nil case selection.LessThan: i, err := strconv.Atoi(r.Values().List()[0]) if err != nil { return nil, err } - return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s' and cast(value as %s) < %d)", labelTableName, clusterNameSelector, tableName, r.Key(), t.intType(), i)), nil + return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and cast(value as %s) < %d)", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), t.intType(), i)), nil } return nil, fmt.Errorf("operation %v is not supported", r.Operator()) } diff --git a/persist/sqldb/archived_workflow_labels_test.go b/persist/sqldb/archived_workflow_labels_test.go index 61f0ca447d1f..144212cfae2e 100644 --- a/persist/sqldb/archived_workflow_labels_test.go +++ b/persist/sqldb/archived_workflow_labels_test.go @@ -31,7 +31,7 @@ func Test_labelsClause(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { for _, req := range tt.requirements { - got, err := requirementToCondition(tt.dbType, req, archiveTableName, archiveLabelsTableName, true) + got, err := requirementToCondition(tt.dbType, req) if assert.NoError(t, err) { assert.Equal(t, tt.want, *got) } diff --git a/persist/sqldb/db_type.go b/persist/sqldb/db_type.go index 258eedb087f3..edf590ed7bf5 100644 --- a/persist/sqldb/db_type.go +++ b/persist/sqldb/db_type.go @@ -12,7 +12,6 @@ type dbType string const ( MySQL dbType = "mysql" Postgres dbType = "postgres" - SQLite dbType = "sqlite" ) func dbTypeFor(session db.Session) dbType { diff --git a/persist/sqldb/mocks/WorkflowArchive.go b/persist/sqldb/mocks/WorkflowArchive.go index 19f9eed7b0cf..bf9aa0c7a32c 100644 --- a/persist/sqldb/mocks/WorkflowArchive.go +++ b/persist/sqldb/mocks/WorkflowArchive.go @@ -4,11 +4,10 @@ package mocks import ( mock "github.com/stretchr/testify/mock" + labels "k8s.io/apimachinery/pkg/labels" time "time" - utils "github.com/argoproj/argo-workflows/v3/server/utils" - v1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" ) @@ -35,9 +34,9 @@ func (_m *WorkflowArchive) ArchiveWorkflow(wf *v1alpha1.Workflow) error { return r0 } -// CountWorkflows provides a mock function with given fields: options -func (_m *WorkflowArchive) CountWorkflows(options utils.ListOptions) (int64, error) { - ret := _m.Called(options) +// CountWorkflows provides a mock function with given fields: namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements +func (_m *WorkflowArchive) CountWorkflows(namespace string, name string, namePrefix string, minStartAt time.Time, maxStartAt time.Time, labelRequirements labels.Requirements) (int64, error) { + ret := _m.Called(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) if len(ret) == 0 { panic("no return value specified for CountWorkflows") @@ -45,17 +44,17 @@ func (_m *WorkflowArchive) CountWorkflows(options utils.ListOptions) (int64, err var r0 int64 var r1 error - if rf, ok := ret.Get(0).(func(utils.ListOptions) (int64, error)); ok { - return rf(options) + if rf, ok := ret.Get(0).(func(string, string, string, time.Time, time.Time, labels.Requirements) (int64, error)); ok { + return rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) } - if rf, ok := ret.Get(0).(func(utils.ListOptions) int64); ok { - r0 = rf(options) + if rf, ok := ret.Get(0).(func(string, string, string, time.Time, time.Time, labels.Requirements) int64); ok { + r0 = rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) } else { r0 = ret.Get(0).(int64) } - if rf, ok := ret.Get(1).(func(utils.ListOptions) error); ok { - r1 = rf(options) + if rf, ok := ret.Get(1).(func(string, string, string, time.Time, time.Time, labels.Requirements) error); ok { + r1 = rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) } else { r1 = ret.Error(1) } @@ -147,9 +146,9 @@ func (_m *WorkflowArchive) IsEnabled() bool { return r0 } -// ListWorkflows provides a mock function with given fields: options -func (_m *WorkflowArchive) ListWorkflows(options utils.ListOptions) (v1alpha1.Workflows, error) { - ret := _m.Called(options) +// ListWorkflows provides a mock function with given fields: namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements, limit, offset +func (_m *WorkflowArchive) ListWorkflows(namespace string, name string, namePrefix string, minStartAt time.Time, maxStartAt time.Time, labelRequirements labels.Requirements, limit int, offset int) (v1alpha1.Workflows, error) { + ret := _m.Called(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements, limit, offset) if len(ret) == 0 { panic("no return value specified for ListWorkflows") @@ -157,19 +156,19 @@ func (_m *WorkflowArchive) ListWorkflows(options utils.ListOptions) (v1alpha1.Wo var r0 v1alpha1.Workflows var r1 error - if rf, ok := ret.Get(0).(func(utils.ListOptions) (v1alpha1.Workflows, error)); ok { - return rf(options) + if rf, ok := ret.Get(0).(func(string, string, string, time.Time, time.Time, labels.Requirements, int, int) (v1alpha1.Workflows, error)); ok { + return rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements, limit, offset) } - if rf, ok := ret.Get(0).(func(utils.ListOptions) v1alpha1.Workflows); ok { - r0 = rf(options) + if rf, ok := ret.Get(0).(func(string, string, string, time.Time, time.Time, labels.Requirements, int, int) v1alpha1.Workflows); ok { + r0 = rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements, limit, offset) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(v1alpha1.Workflows) } } - if rf, ok := ret.Get(1).(func(utils.ListOptions) error); ok { - r1 = rf(options) + if rf, ok := ret.Get(1).(func(string, string, string, time.Time, time.Time, labels.Requirements, int, int) error); ok { + r1 = rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements, limit, offset) } else { r1 = ret.Error(1) } diff --git a/persist/sqldb/null_workflow_archive.go b/persist/sqldb/null_workflow_archive.go index e3f4863bcc7c..e8e37b481c9f 100644 --- a/persist/sqldb/null_workflow_archive.go +++ b/persist/sqldb/null_workflow_archive.go @@ -4,8 +4,9 @@ import ( "fmt" "time" + "k8s.io/apimachinery/pkg/labels" + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" - sutils "github.com/argoproj/argo-workflows/v3/server/utils" ) var NullWorkflowArchive WorkflowArchive = &nullWorkflowArchive{} @@ -20,11 +21,11 @@ func (r *nullWorkflowArchive) ArchiveWorkflow(*wfv1.Workflow) error { return nil } -func (r *nullWorkflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error) { +func (r *nullWorkflowArchive) ListWorkflows(string, string, string, time.Time, time.Time, labels.Requirements, int, int) (wfv1.Workflows, error) { return wfv1.Workflows{}, nil } -func (r *nullWorkflowArchive) CountWorkflows(options sutils.ListOptions) (int64, error) { +func (r *nullWorkflowArchive) CountWorkflows(string, string, string, time.Time, time.Time, labels.Requirements) (int64, error) { return 0, nil } diff --git a/persist/sqldb/selector.go b/persist/sqldb/selector.go deleted file mode 100644 index 5e2b9cbb53ca..000000000000 --- a/persist/sqldb/selector.go +++ /dev/null @@ -1,89 +0,0 @@ -package sqldb - -import ( - "github.com/upper/db/v4" - - "github.com/argoproj/argo-workflows/v3/server/utils" -) - -func BuildArchivedWorkflowSelector(selector db.Selector, tableName, labelTableName string, t dbType, options utils.ListOptions, count bool) (db.Selector, error) { - selector = selector. - And(namespaceEqual(options.Namespace)). - And(nameEqual(options.Name)). - And(namePrefixClause(options.NamePrefix)). - And(startedAtFromClause(options.MinStartedAt)). - And(startedAtToClause(options.MaxStartedAt)) - - selector, err := labelsClause(selector, t, options.LabelRequirements, tableName, labelTableName, true) - if err != nil { - return nil, err - } - if count { - return selector, nil - } - // If we were passed 0 as the limit, then we should load all available archived workflows - // to match the behavior of the `List` operations in the Kubernetes API - if options.Limit == 0 { - options.Limit = -1 - options.Offset = -1 - } - return selector. - OrderBy("-startedat"). - Limit(options.Limit). - Offset(options.Offset), nil -} - -func BuildWorkflowSelector(in string, inArgs []any, tableName, labelTableName string, t dbType, options utils.ListOptions, count bool) (out string, outArgs []any, err error) { - var clauses []*db.RawExpr - if options.Namespace != "" { - clauses = append(clauses, db.Raw("namespace = ?", options.Namespace)) - } - if options.Name != "" { - clauses = append(clauses, db.Raw("name = ?", options.Name)) - } - if options.NamePrefix != "" { - clauses = append(clauses, db.Raw("name like ?", options.NamePrefix+"%")) - } - if !options.MinStartedAt.IsZero() { - clauses = append(clauses, db.Raw("startedat >= ?", options.MinStartedAt)) - } - if !options.MaxStartedAt.IsZero() { - clauses = append(clauses, db.Raw("startedat <= ?", options.MaxStartedAt)) - } - for _, r := range options.LabelRequirements { - q, err := requirementToCondition(t, r, tableName, labelTableName, false) - if err != nil { - return "", nil, err - } - clauses = append(clauses, q) - } - out = in - outArgs = inArgs - for _, c := range clauses { - if c == nil || c.Empty() { - continue - } - out += " and " + c.Raw() - outArgs = append(outArgs, c.Arguments()...) - } - if count { - return out, outArgs, nil - } - if options.StartedAtAscending { - out += " order by startedat asc" - } else { - out += " order by startedat desc" - } - - // If we were passed 0 as the limit, then we should load all available archived workflows - // to match the behavior of the `List` operations in the Kubernetes API - if options.Limit == 0 { - options.Limit = -1 - options.Offset = -1 - } - out += " limit ?" - outArgs = append(outArgs, options.Limit) - out += " offset ?" - outArgs = append(outArgs, options.Offset) - return out, outArgs, nil -} diff --git a/persist/sqldb/workflow_archive.go b/persist/sqldb/workflow_archive.go index 55d4800cfe89..fce2ff97b432 100644 --- a/persist/sqldb/workflow_archive.go +++ b/persist/sqldb/workflow_archive.go @@ -9,6 +9,7 @@ import ( "github.com/upper/db/v4" "google.golang.org/grpc/codes" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" @@ -58,8 +59,8 @@ type archivedWorkflowCount struct { type WorkflowArchive interface { ArchiveWorkflow(wf *wfv1.Workflow) error // list workflows, with the most recently started workflows at the beginning (i.e. index 0 is the most recent) - ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error) - CountWorkflows(options sutils.ListOptions) (int64, error) + ListWorkflows(namespace string, name string, namePrefix string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements, limit, offset int) (wfv1.Workflows, error) + CountWorkflows(namespace string, name string, namePrefix string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements) (int64, error) GetWorkflow(uid string, namespace string, name string) (*wfv1.Workflow, error) DeleteWorkflow(uid string) error DeleteExpiredWorkflows(ttl time.Duration) error @@ -145,9 +146,16 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error { }) } -func (r *workflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error) { +func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefix string, minStartedAt, maxStartedAt time.Time, labelRequirements labels.Requirements, limit int, offset int) (wfv1.Workflows, error) { var archivedWfs []archivedWorkflowMetadata + // If we were passed 0 as the limit, then we should load all available archived workflows + // to match the behavior of the `List` operations in the Kubernetes API + if limit == 0 { + limit = -1 + offset = -1 + } + selectQuery, err := selectArchivedWorkflowQuery(r.dbType) if err != nil { return nil, err @@ -156,14 +164,22 @@ func (r *workflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workfl selector := r.session.SQL(). Select(selectQuery). From(archiveTableName). - Where(r.clusterManagedNamespaceAndInstanceID()) + Where(r.clusterManagedNamespaceAndInstanceID()). + And(namespaceEqual(namespace)). + And(nameEqual(name)). + And(namePrefixClause(namePrefix)). + And(startedAtFromClause(minStartedAt)). + And(startedAtToClause(maxStartedAt)) - selector, err = BuildArchivedWorkflowSelector(selector, archiveTableName, archiveLabelsTableName, r.dbType, options, false) + selector, err = labelsClause(selector, r.dbType, labelRequirements) if err != nil { return nil, err } - - err = selector.All(&archivedWfs) + err = selector. + OrderBy("-startedat"). + Limit(limit). + Offset(offset). + All(&archivedWfs) if err != nil { return nil, err } @@ -202,15 +218,20 @@ func (r *workflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workfl return wfs, nil } -func (r *workflowArchive) CountWorkflows(options sutils.ListOptions) (int64, error) { +func (r *workflowArchive) CountWorkflows(namespace string, name string, namePrefix string, minStartedAt, maxStartedAt time.Time, labelRequirements labels.Requirements) (int64, error) { total := &archivedWorkflowCount{} selector := r.session.SQL(). Select(db.Raw("count(*) as total")). From(archiveTableName). - Where(r.clusterManagedNamespaceAndInstanceID()) + Where(r.clusterManagedNamespaceAndInstanceID()). + And(namespaceEqual(namespace)). + And(nameEqual(name)). + And(namePrefixClause(namePrefix)). + And(startedAtFromClause(minStartedAt)). + And(startedAtToClause(maxStartedAt)) - selector, err := BuildArchivedWorkflowSelector(selector, archiveTableName, archiveLabelsTableName, r.dbType, options, true) + selector, err := labelsClause(selector, r.dbType, labelRequirements) if err != nil { return 0, err } @@ -232,37 +253,40 @@ func (r *workflowArchive) clusterManagedNamespaceAndInstanceID() *db.AndExpr { func startedAtFromClause(from time.Time) db.Cond { if !from.IsZero() { - return db.Cond{"startedat >=": from} + return db.Cond{"startedat > ": from} } return db.Cond{} } func startedAtToClause(to time.Time) db.Cond { if !to.IsZero() { - return db.Cond{"startedat <=": to} + return db.Cond{"startedat < ": to} } return db.Cond{} } func namespaceEqual(namespace string) db.Cond { - if namespace != "" { + if namespace == "" { + return db.Cond{} + } else { return db.Cond{"namespace": namespace} } - return db.Cond{} } func nameEqual(name string) db.Cond { - if name != "" { + if name == "" { + return db.Cond{} + } else { return db.Cond{"name": name} } - return db.Cond{} } func namePrefixClause(namePrefix string) db.Cond { - if namePrefix != "" { - return db.Cond{"name LIKE": namePrefix + "%"} + if namePrefix == "" { + return db.Cond{} + } else { + return db.Cond{"name LIKE ": namePrefix + "%"} } - return db.Cond{} } func (r *workflowArchive) GetWorkflow(uid string, namespace string, name string) (*wfv1.Workflow, error) { diff --git a/pkg/apiclient/argo-kube-client.go b/pkg/apiclient/argo-kube-client.go index 8f4f32c91d29..b56deb251852 100644 --- a/pkg/apiclient/argo-kube-client.go +++ b/pkg/apiclient/argo-kube-client.go @@ -25,7 +25,7 @@ import ( cronworkflowserver "github.com/argoproj/argo-workflows/v3/server/cronworkflow" "github.com/argoproj/argo-workflows/v3/server/types" workflowserver "github.com/argoproj/argo-workflows/v3/server/workflow" - "github.com/argoproj/argo-workflows/v3/server/workflow/store" + "github.com/argoproj/argo-workflows/v3/server/workflowarchive" workflowtemplateserver "github.com/argoproj/argo-workflows/v3/server/workflowtemplate" "github.com/argoproj/argo-workflows/v3/util/help" "github.com/argoproj/argo-workflows/v3/util/instanceid" @@ -38,8 +38,6 @@ var ( type argoKubeClient struct { instanceIDService instanceid.Service - wfClient workflow.Interface - wfStore store.WorkflowStore } var _ Client = &argoKubeClient{} @@ -86,16 +84,13 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig, if err != nil { return nil, nil, err } - wfStore, err := store.NewSQLiteStore(instanceIDService) - if err != nil { - return nil, nil, err - } - return ctx, &argoKubeClient{instanceIDService, wfClient, wfStore}, nil + return ctx, &argoKubeClient{instanceIDService}, nil } func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient { wfArchive := sqldb.NullWorkflowArchive - return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, a.wfStore)}} + wfaServer := workflowarchive.NewWorkflowArchiveServer(wfArchive, argoKubeOffloadNodeStatusRepo) + return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfaServer)}} } func (a *argoKubeClient) NewCronWorkflowServiceClient() (cronworkflow.CronWorkflowServiceClient, error) { diff --git a/server/apiserver/argoserver.go b/server/apiserver/argoserver.go index 83b7ac0d34bc..815698ea6eb9 100644 --- a/server/apiserver/argoserver.go +++ b/server/apiserver/argoserver.go @@ -55,7 +55,6 @@ import ( "github.com/argoproj/argo-workflows/v3/server/static" "github.com/argoproj/argo-workflows/v3/server/types" "github.com/argoproj/argo-workflows/v3/server/workflow" - "github.com/argoproj/argo-workflows/v3/server/workflow/store" "github.com/argoproj/argo-workflows/v3/server/workflowarchive" "github.com/argoproj/argo-workflows/v3/server/workflowtemplate" grpcutil "github.com/argoproj/argo-workflows/v3/util/grpc" @@ -231,13 +230,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st artifactRepositories := artifactrepositories.New(as.clients.Kubernetes, as.managedNamespace, &config.ArtifactRepository) artifactServer := artifacts.NewArtifactServer(as.gatekeeper, hydrator.New(offloadRepo), wfArchive, instanceIDService, artifactRepositories) eventServer := event.NewController(instanceIDService, eventRecorderManager, as.eventQueueSize, as.eventWorkerCount, as.eventAsyncDispatch) - wfArchiveServer := workflowarchive.NewWorkflowArchiveServer(wfArchive, offloadRepo) - wfStore, err := store.NewSQLiteStore(instanceIDService) - if err != nil { - log.Fatal(err) - } - workflowServer := workflow.NewWorkflowServer(instanceIDService, offloadRepo, wfArchive, as.clients.Workflow, wfStore) - grpcServer := as.newGRPCServer(instanceIDService, workflowServer, wfArchiveServer, eventServer, config.Links, config.Columns, config.NavColor) + grpcServer := as.newGRPCServer(instanceIDService, offloadRepo, wfArchive, eventServer, config.Links, config.Columns, config.NavColor) httpServer := as.newHTTPServer(ctx, port, artifactServer) // Start listener @@ -267,7 +260,6 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st grpcL := tcpm.Match(cmux.Any()) go eventServer.Run(as.stopCh) - go workflowServer.Run(as.stopCh) go func() { as.checkServeErr("grpcServer", grpcServer.Serve(grpcL)) }() go func() { as.checkServeErr("httpServer", httpServer.Serve(httpL)) }() go func() { as.checkServeErr("tcpm", tcpm.Serve()) }() @@ -284,7 +276,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st <-as.stopCh } -func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, workflowServer workflowpkg.WorkflowServiceServer, wfArchiveServer workflowarchivepkg.ArchivedWorkflowServiceServer, eventServer *event.Controller, links []*v1alpha1.Link, columns []*v1alpha1.Column, navColor string) *grpc.Server { +func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, eventServer *event.Controller, links []*v1alpha1.Link, columns []*v1alpha1.Column, navColor string) *grpc.Server { serverLog := log.NewEntry(log.StandardLogger()) // "Prometheus histograms are a great way to measure latency distributions of your RPCs. However, since it is bad practice to have metrics of high cardinality the latency monitoring metrics are disabled by default. To enable them please call the following in your server initialization code:" @@ -316,11 +308,12 @@ func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, workfl } grpcServer := grpc.NewServer(sOpts...) + wfArchiveServer := workflowarchive.NewWorkflowArchiveServer(wfArchive, offloadNodeStatusRepo) infopkg.RegisterInfoServiceServer(grpcServer, info.NewInfoServer(as.managedNamespace, links, columns, navColor)) eventpkg.RegisterEventServiceServer(grpcServer, eventServer) eventsourcepkg.RegisterEventSourceServiceServer(grpcServer, eventsource.NewEventSourceServer()) sensorpkg.RegisterSensorServiceServer(grpcServer, sensor.NewSensorServer()) - workflowpkg.RegisterWorkflowServiceServer(grpcServer, workflowServer) + workflowpkg.RegisterWorkflowServiceServer(grpcServer, workflow.NewWorkflowServer(instanceIDService, offloadNodeStatusRepo, wfArchiveServer)) workflowtemplatepkg.RegisterWorkflowTemplateServiceServer(grpcServer, workflowtemplate.NewWorkflowTemplateServer(instanceIDService)) cronworkflowpkg.RegisterCronWorkflowServiceServer(grpcServer, cronworkflow.NewCronWorkflowServer(instanceIDService)) workflowarchivepkg.RegisterArchivedWorkflowServiceServer(grpcServer, wfArchiveServer) diff --git a/server/utils/list_options.go b/server/utils/list_options.go deleted file mode 100644 index 697c09b43905..000000000000 --- a/server/utils/list_options.go +++ /dev/null @@ -1,136 +0,0 @@ -package utils - -import ( - "fmt" - "strconv" - "strings" - "time" - - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" -) - -type ListOptions struct { - Namespace, Name, NamePrefix string - MinStartedAt, MaxStartedAt time.Time - LabelRequirements labels.Requirements - Limit, Offset int - ShowRemainingItemCount bool - StartedAtAscending bool -} - -func (l ListOptions) WithLimit(limit int) ListOptions { - l.Limit = limit - return l -} - -func (l ListOptions) WithOffset(offset int) ListOptions { - l.Offset = offset - return l -} - -func (l ListOptions) WithShowRemainingItemCount(showRemainingItemCount bool) ListOptions { - l.ShowRemainingItemCount = showRemainingItemCount - return l -} - -func (l ListOptions) WithMaxStartedAt(maxStartedAt time.Time) ListOptions { - l.MaxStartedAt = maxStartedAt - return l -} - -func (l ListOptions) WithMinStartedAt(minStartedAt time.Time) ListOptions { - l.MinStartedAt = minStartedAt - return l -} - -func (l ListOptions) WithStartedAtAscending(ascending bool) ListOptions { - l.StartedAtAscending = ascending - return l -} - -func BuildListOptions(options *metav1.ListOptions, ns, namePrefix string) (ListOptions, error) { - if options == nil { - options = &metav1.ListOptions{} - } - if options.Continue == "" { - options.Continue = "0" - } - limit := int(options.Limit) - - offset, err := strconv.Atoi(options.Continue) - if err != nil { - // no need to use sutils here - return ListOptions{}, status.Error(codes.InvalidArgument, "listOptions.continue must be int") - } - if offset < 0 { - // no need to use sutils here - return ListOptions{}, status.Error(codes.InvalidArgument, "listOptions.continue must >= 0") - } - - // namespace is now specified as its own query parameter - // note that for backward compatibility, the field selector 'metadata.namespace' is also supported for now - namespace := ns // optional - name := "" - minStartedAt := time.Time{} - maxStartedAt := time.Time{} - showRemainingItemCount := false - for _, selector := range strings.Split(options.FieldSelector, ",") { - if len(selector) == 0 { - continue - } - if strings.HasPrefix(selector, "metadata.namespace=") { - // for backward compatibility, the field selector 'metadata.namespace' is supported for now despite the addition - // of the new 'namespace' query parameter, which is what the UI uses - fieldSelectedNamespace := strings.TrimPrefix(selector, "metadata.namespace=") - switch namespace { - case "": - namespace = fieldSelectedNamespace - case fieldSelectedNamespace: - break - default: - return ListOptions{}, status.Errorf(codes.InvalidArgument, - "'namespace' query param (%q) and fieldselector 'metadata.namespace' (%q) are both specified and contradict each other", namespace, fieldSelectedNamespace) - } - } else if strings.HasPrefix(selector, "metadata.name=") { - name = strings.TrimPrefix(selector, "metadata.name=") - } else if strings.HasPrefix(selector, "spec.startedAt>") { - minStartedAt, err = time.Parse(time.RFC3339, strings.TrimPrefix(selector, "spec.startedAt>")) - if err != nil { - // startedAt is populated by us, it should therefore be valid. - return ListOptions{}, ToStatusError(err, codes.Internal) - } - } else if strings.HasPrefix(selector, "spec.startedAt<") { - maxStartedAt, err = time.Parse(time.RFC3339, strings.TrimPrefix(selector, "spec.startedAt<")) - if err != nil { - // no need to use sutils here - return ListOptions{}, ToStatusError(err, codes.Internal) - } - } else if strings.HasPrefix(selector, "ext.showRemainingItemCount") { - showRemainingItemCount, err = strconv.ParseBool(strings.TrimPrefix(selector, "ext.showRemainingItemCount=")) - if err != nil { - // populated by us, it should therefore be valid. - return ListOptions{}, ToStatusError(err, codes.Internal) - } - } else { - return ListOptions{}, ToStatusError(fmt.Errorf("unsupported requirement %s", selector), codes.InvalidArgument) - } - } - requirements, err := labels.ParseToRequirements(options.LabelSelector) - if err != nil { - return ListOptions{}, ToStatusError(err, codes.InvalidArgument) - } - return ListOptions{ - Namespace: namespace, - Name: name, - NamePrefix: namePrefix, - MinStartedAt: minStartedAt, - MaxStartedAt: maxStartedAt, - LabelRequirements: requirements, - Limit: limit, - Offset: offset, - ShowRemainingItemCount: showRemainingItemCount, - }, nil -} diff --git a/server/workflow/store/sqlite_store.go b/server/workflow/store/sqlite_store.go deleted file mode 100644 index 8819219e55c1..000000000000 --- a/server/workflow/store/sqlite_store.go +++ /dev/null @@ -1,306 +0,0 @@ -package store - -import ( - "encoding/json" - "fmt" - - log "github.com/sirupsen/logrus" - "k8s.io/client-go/tools/cache" - "zombiezen.com/go/sqlite" - "zombiezen.com/go/sqlite/sqlitex" - - "github.com/argoproj/argo-workflows/v3/persist/sqldb" - wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" - sutils "github.com/argoproj/argo-workflows/v3/server/utils" - "github.com/argoproj/argo-workflows/v3/util/instanceid" - "github.com/argoproj/argo-workflows/v3/workflow/common" -) - -const ( - workflowTableName = "argo_workflows" - workflowLabelsTableName = "argo_workflows_labels" - tableInitializationQuery = `create table if not exists argo_workflows ( - uid varchar(128) not null, - instanceid varchar(64), - name varchar(256), - namespace varchar(256), - phase varchar(25), - startedat timestamp, - finishedat timestamp, - workflow text, - primary key (uid) -); -create index if not exists idx_instanceid on argo_workflows (instanceid); -create table if not exists argo_workflows_labels ( - uid varchar(128) not null, - name varchar(317) not null, - value varchar(63) not null, - primary key (uid, name, value), - foreign key (uid) references argo_workflows (uid) on delete cascade -); -create index if not exists idx_name_value on argo_workflows_labels (name, value); -` - insertWorkflowQuery = `insert into argo_workflows (uid, instanceid, name, namespace, phase, startedat, finishedat, workflow) values (?, ?, ?, ?, ?, ?, ?, ?)` - insertWorkflowLabelQuery = `insert into argo_workflows_labels (uid, name, value) values (?, ?, ?)` - deleteWorkflowQuery = `delete from argo_workflows where uid = ?` -) - -func initDB() (*sqlite.Conn, error) { - conn, err := sqlite.OpenConn(":memory:", sqlite.OpenReadWrite) - if err != nil { - return nil, err - } - err = sqlitex.ExecuteTransient(conn, "pragma foreign_keys = on", nil) - if err != nil { - return nil, fmt.Errorf("failed to enable foreign key support: %w", err) - } - - err = sqlitex.ExecuteScript(conn, tableInitializationQuery, nil) - if err != nil { - return nil, err - } - return conn, nil -} - -type WorkflowStore interface { - cache.Store - ListWorkflows(options sutils.ListOptions) ([]wfv1.Workflow, error) - CountWorkflows(options sutils.ListOptions) (int64, error) -} - -// sqliteStore is a sqlite-based store. -type sqliteStore struct { - conn *sqlite.Conn - instanceService instanceid.Service -} - -var _ WorkflowStore = &sqliteStore{} - -func NewSQLiteStore(instanceService instanceid.Service) (WorkflowStore, error) { - conn, err := initDB() - if err != nil { - return nil, err - } - return &sqliteStore{conn: conn, instanceService: instanceService}, nil -} - -func (s *sqliteStore) ListWorkflows(options sutils.ListOptions) ([]wfv1.Workflow, error) { - query := `select workflow from argo_workflows -where instanceid = ? -` - args := []any{s.instanceService.InstanceID()} - - query, args, err := sqldb.BuildWorkflowSelector(query, args, workflowTableName, workflowLabelsTableName, sqldb.SQLite, options, false) - if err != nil { - return nil, err - } - - var workflows = wfv1.Workflows{} - err = sqlitex.Execute(s.conn, query, &sqlitex.ExecOptions{ - Args: args, - ResultFunc: func(stmt *sqlite.Stmt) error { - wf := stmt.ColumnText(0) - w := wfv1.Workflow{} - err := json.Unmarshal([]byte(wf), &w) - if err != nil { - log.WithFields(log.Fields{"workflow": wf}).Errorln("unable to unmarshal workflow from database") - } else { - workflows = append(workflows, w) - } - return nil - }, - }) - if err != nil { - return nil, err - } - - return workflows, nil -} - -func (s *sqliteStore) CountWorkflows(options sutils.ListOptions) (int64, error) { - query := `select count(*) as total from argo_workflows -where instanceid = ? -` - args := []any{s.instanceService.InstanceID()} - - options.Limit = 0 - options.Offset = 0 - query, args, err := sqldb.BuildWorkflowSelector(query, args, workflowTableName, workflowLabelsTableName, sqldb.SQLite, options, true) - if err != nil { - return 0, err - } - - var total int64 - err = sqlitex.Execute(s.conn, query, &sqlitex.ExecOptions{ - Args: args, - ResultFunc: func(stmt *sqlite.Stmt) error { - total = stmt.ColumnInt64(0) - return nil - }, - }) - if err != nil { - return 0, err - } - return total, nil -} - -func (s *sqliteStore) Add(obj interface{}) error { - wf, ok := obj.(*wfv1.Workflow) - if !ok { - return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) - } - done := sqlitex.Transaction(s.conn) - err := s.upsertWorkflow(wf) - defer done(&err) - return err -} - -func (s *sqliteStore) Update(obj interface{}) error { - wf, ok := obj.(*wfv1.Workflow) - if !ok { - return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) - } - done := sqlitex.Transaction(s.conn) - err := s.upsertWorkflow(wf) - defer done(&err) - return err -} - -func (s *sqliteStore) Delete(obj interface{}) error { - wf, ok := obj.(*wfv1.Workflow) - if !ok { - return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) - } - return sqlitex.Execute(s.conn, deleteWorkflowQuery, &sqlitex.ExecOptions{Args: []any{string(wf.UID)}}) -} - -func (s *sqliteStore) Replace(list []interface{}, resourceVersion string) error { - wfs := make([]*wfv1.Workflow, 0, len(list)) - for _, obj := range list { - wf, ok := obj.(*wfv1.Workflow) - if !ok { - return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) - } - wfs = append(wfs, wf) - } - done := sqlitex.Transaction(s.conn) - err := s.replaceWorkflows(wfs) - defer done(&err) - return err -} - -func (s *sqliteStore) Resync() error { - return nil -} - -func (s *sqliteStore) List() []interface{} { - panic("not implemented") -} - -func (s *sqliteStore) ListKeys() []string { - panic("not implemented") -} - -func (s *sqliteStore) Get(obj interface{}) (item interface{}, exists bool, err error) { - panic("not implemented") -} - -func (s *sqliteStore) GetByKey(key string) (item interface{}, exists bool, err error) { - panic("not implemented") -} - -func (s *sqliteStore) upsertWorkflow(wf *wfv1.Workflow) error { - err := sqlitex.Execute(s.conn, deleteWorkflowQuery, &sqlitex.ExecOptions{Args: []any{string(wf.UID)}}) - if err != nil { - return err - } - // if workflow is archived, we don't need to store it in the sqlite store, we get if from the archive store instead - if wf.GetLabels()[common.LabelKeyWorkflowArchivingStatus] == "Archived" { - return nil - } - workflow, err := json.Marshal(wf) - if err != nil { - return err - } - err = sqlitex.Execute(s.conn, insertWorkflowQuery, - &sqlitex.ExecOptions{ - Args: []any{string(wf.UID), s.instanceService.InstanceID(), wf.Name, wf.Namespace, wf.Status.Phase, wf.Status.StartedAt.Time, wf.Status.FinishedAt.Time, string(workflow)}, - }, - ) - if err != nil { - return err - } - stmt, err := s.conn.Prepare(insertWorkflowLabelQuery) - if err != nil { - return err - } - for key, value := range wf.GetLabels() { - if err = stmt.Reset(); err != nil { - return err - } - stmt.BindText(1, string(wf.UID)) - stmt.BindText(2, key) - stmt.BindText(3, value) - if _, err = stmt.Step(); err != nil { - return err - } - } - return nil -} - -func (s *sqliteStore) replaceWorkflows(workflows []*wfv1.Workflow) error { - err := sqlitex.Execute(s.conn, `delete from argo_workflows`, nil) - if err != nil { - return err - } - wfs := make([]*wfv1.Workflow, 0, len(workflows)) - for _, wf := range workflows { - // if workflow is archived, we don't need to store it in the sqlite store, we get if from the archive store instead - if wf.GetLabels()[common.LabelKeyWorkflowArchivingStatus] != "Archived" { - wfs = append(wfs, wf) - } - } - // add all workflows to argo_workflows table - stmt, err := s.conn.Prepare(insertWorkflowQuery) - if err != nil { - return err - } - for _, wf := range wfs { - if err = stmt.Reset(); err != nil { - return err - } - stmt.BindText(1, string(wf.UID)) - stmt.BindText(2, s.instanceService.InstanceID()) - stmt.BindText(3, wf.Name) - stmt.BindText(4, wf.Namespace) - stmt.BindText(5, string(wf.Status.Phase)) - stmt.BindText(6, wf.Status.StartedAt.String()) - stmt.BindText(7, wf.Status.FinishedAt.String()) - workflow, err := json.Marshal(wf) - if err != nil { - return err - } - stmt.BindText(8, string(workflow)) - if _, err = stmt.Step(); err != nil { - return err - } - } - stmt, err = s.conn.Prepare(insertWorkflowLabelQuery) - if err != nil { - return err - } - for _, wf := range wfs { - for key, val := range wf.GetLabels() { - if err = stmt.Reset(); err != nil { - return err - } - stmt.BindText(1, string(wf.UID)) - stmt.BindText(2, key) - stmt.BindText(3, val) - if _, err = stmt.Step(); err != nil { - return err - } - } - } - return nil -} diff --git a/server/workflow/store/sqlite_store_test.go b/server/workflow/store/sqlite_store_test.go deleted file mode 100644 index 11e6d731152c..000000000000 --- a/server/workflow/store/sqlite_store_test.go +++ /dev/null @@ -1,155 +0,0 @@ -package store - -import ( - "encoding/json" - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "zombiezen.com/go/sqlite" - "zombiezen.com/go/sqlite/sqlitex" - - wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" - sutils "github.com/argoproj/argo-workflows/v3/server/utils" - "github.com/argoproj/argo-workflows/v3/util/instanceid" -) - -func TestInitDB(t *testing.T) { - conn, err := initDB() - assert.NoError(t, err) - defer conn.Close() - t.Run("TestTablesCreated", func(t *testing.T) { - err = sqlitex.Execute(conn, `select name from sqlite_master where type='table'`, &sqlitex.ExecOptions{ - ResultFunc: func(stmt *sqlite.Stmt) error { - name := stmt.ColumnText(0) - assert.Contains(t, []string{workflowTableName, workflowLabelsTableName}, name) - return nil - }, - }) - require.NoError(t, err) - }) - t.Run("TestForeignKeysEnabled", func(t *testing.T) { - err = sqlitex.Execute(conn, `pragma foreign_keys`, &sqlitex.ExecOptions{ - ResultFunc: func(stmt *sqlite.Stmt) error { - assert.Equal(t, "1", stmt.ColumnText(0)) - return nil - }, - }) - require.NoError(t, err) - }) - t.Run("TestIndexesCreated", func(t *testing.T) { - var indexes []string - err = sqlitex.Execute(conn, `select name from sqlite_master where type='index'`, &sqlitex.ExecOptions{ - ResultFunc: func(stmt *sqlite.Stmt) error { - name := stmt.ColumnText(0) - indexes = append(indexes, name) - return nil - }, - }) - require.NoError(t, err) - assert.Contains(t, indexes, "idx_instanceid") - assert.Contains(t, indexes, "idx_name_value") - }) - t.Run("TestForeignKeysAdded", func(t *testing.T) { - err = sqlitex.Execute(conn, `pragma foreign_key_list('argo_workflows_labels')`, &sqlitex.ExecOptions{ - ResultFunc: func(stmt *sqlite.Stmt) error { - assert.Equal(t, "argo_workflows", stmt.ColumnText(2)) - assert.Equal(t, "uid", stmt.ColumnText(3)) - assert.Equal(t, "uid", stmt.ColumnText(4)) - assert.Equal(t, "CASCADE", stmt.ColumnText(6)) - return nil - }, - }) - require.NoError(t, err) - }) -} - -func TestStoreOperation(t *testing.T) { - instanceIdSvc := instanceid.NewService("my-instanceid") - conn, err := initDB() - require.NoError(t, err) - store := sqliteStore{ - conn: conn, - instanceService: instanceIdSvc, - } - t.Run("TestAddWorkflow", func(t *testing.T) { - for i := 0; i < 10; i++ { - require.NoError(t, store.Add(generateWorkflow(i))) - } - num, err := store.CountWorkflows(sutils.ListOptions{Namespace: "argo"}) - require.NoError(t, err) - assert.Equal(t, int64(10), num) - // Labels are also added - require.NoError(t, sqlitex.Execute(conn, `select count(*) from argo_workflows_labels`, &sqlitex.ExecOptions{ - ResultFunc: func(stmt *sqlite.Stmt) error { - assert.Equal(t, 10*4, stmt.ColumnInt(0)) - return nil - }, - })) - }) - t.Run("TestUpdateWorkflow", func(t *testing.T) { - wf := generateWorkflow(0) - wf.Labels["test-label-2"] = "value-2" - require.NoError(t, store.Update(wf)) - // workflow is updated - require.NoError(t, sqlitex.Execute(conn, `select workflow from argo_workflows where uid = 'uid-0'`, &sqlitex.ExecOptions{ - ResultFunc: func(stmt *sqlite.Stmt) error { - w := stmt.ColumnText(0) - require.NoError(t, json.Unmarshal([]byte(w), &wf)) - assert.Len(t, wf.Labels, 5) - return nil - }, - })) - require.NoError(t, sqlitex.Execute(conn, `select count(*) from argo_workflows_labels where name = 'test-label-2' and value = 'value-2'`, &sqlitex.ExecOptions{ - ResultFunc: func(stmt *sqlite.Stmt) error { - assert.Equal(t, 1, stmt.ColumnInt(0)) - return nil - }, - })) - }) - t.Run("TestDeleteWorkflow", func(t *testing.T) { - wf := generateWorkflow(0) - require.NoError(t, store.Delete(wf)) - // workflow is deleted - require.NoError(t, sqlitex.Execute(conn, `select count(*) from argo_workflows where uid = 'uid-0'`, &sqlitex.ExecOptions{ - ResultFunc: func(stmt *sqlite.Stmt) error { - assert.Equal(t, 0, stmt.ColumnInt(0)) - return nil - }, - })) - // labels are also deleted - require.NoError(t, sqlitex.Execute(conn, `select count(*) from argo_workflows_labels where uid = 'uid-0'`, &sqlitex.ExecOptions{ - ResultFunc: func(stmt *sqlite.Stmt) error { - assert.Equal(t, 0, stmt.ColumnInt(0)) - return nil - }, - })) - }) - t.Run("TestListWorkflows", func(t *testing.T) { - wfList, err := store.ListWorkflows(sutils.ListOptions{Namespace: "argo", Limit: 5}) - require.NoError(t, err) - assert.Len(t, wfList, 5) - }) - t.Run("TestCountWorkflows", func(t *testing.T) { - num, err := store.CountWorkflows(sutils.ListOptions{Namespace: "argo"}) - require.NoError(t, err) - assert.Equal(t, int64(9), num) - }) -} - -func generateWorkflow(uid int) *wfv1.Workflow { - return &wfv1.Workflow{ObjectMeta: metav1.ObjectMeta{ - UID: types.UID(fmt.Sprintf("uid-%d", uid)), - Name: fmt.Sprintf("workflow-%d", uid), - Namespace: "argo", - Labels: map[string]string{ - "workflows.argoproj.io/completed": "true", - "workflows.argoproj.io/phase": "Succeeded", - "workflows.argoproj.io/controller-instanceid": "my-instanceid", - "test-label": fmt.Sprintf("label-%d", uid), - }, - }} -} diff --git a/server/workflow/workflow_server.go b/server/workflow/workflow_server.go index e009a9371935..584278e21c6f 100644 --- a/server/workflow/workflow_server.go +++ b/server/workflow/workflow_server.go @@ -7,29 +7,24 @@ import ( "io" "sort" "sync" - "time" log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/cache" "github.com/argoproj/argo-workflows/v3/errors" "github.com/argoproj/argo-workflows/v3/persist/sqldb" workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow" + workflowarchivepkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflowarchive" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned" "github.com/argoproj/argo-workflows/v3/server/auth" sutils "github.com/argoproj/argo-workflows/v3/server/utils" - "github.com/argoproj/argo-workflows/v3/server/workflow/store" argoutil "github.com/argoproj/argo-workflows/v3/util" "github.com/argoproj/argo-workflows/v3/util/fields" "github.com/argoproj/argo-workflows/v3/util/instanceid" @@ -42,39 +37,18 @@ import ( "github.com/argoproj/argo-workflows/v3/workflow/validate" ) -const ( - latestAlias = "@latest" - reSyncDuration = 20 * time.Minute -) - type workflowServer struct { instanceIDService instanceid.Service offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo hydrator hydrator.Interface - wfArchive sqldb.WorkflowArchive - wfReflector *cache.Reflector - wfStore store.WorkflowStore + wfArchiveServer workflowarchivepkg.ArchivedWorkflowServiceServer } -var _ workflowpkg.WorkflowServiceServer = &workflowServer{} - -// NewWorkflowServer returns a new WorkflowServer -func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, wfClientSet versioned.Interface, wfStore store.WorkflowStore) *workflowServer { - ctx := context.Background() - lw := &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return wfClientSet.ArgoprojV1alpha1().Workflows(metav1.NamespaceAll).List(ctx, options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return wfClientSet.ArgoprojV1alpha1().Workflows(metav1.NamespaceAll).Watch(ctx, options) - }, - } - wfReflector := cache.NewReflector(lw, &wfv1.Workflow{}, wfStore, reSyncDuration) - return &workflowServer{instanceIDService, offloadNodeStatusRepo, hydrator.New(offloadNodeStatusRepo), wfArchive, wfReflector, wfStore} -} +const latestAlias = "@latest" -func (s *workflowServer) Run(stopCh <-chan struct{}) { - s.wfReflector.Run(stopCh) +// NewWorkflowServer returns a new workflowServer +func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchiveServer workflowarchivepkg.ArchivedWorkflowServiceServer) workflowpkg.WorkflowServiceServer { + return &workflowServer{instanceIDService, offloadNodeStatusRepo, hydrator.New(offloadNodeStatusRepo), wfArchiveServer} } func (s *workflowServer) CreateWorkflow(ctx context.Context, req *workflowpkg.WorkflowCreateRequest) (*wfv1.Workflow, error) { @@ -155,72 +129,65 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf return wf, nil } +func mergeWithArchivedWorkflows(liveWfs wfv1.WorkflowList, archivedWfs wfv1.WorkflowList, numWfsToKeep int) *wfv1.WorkflowList { + var mergedWfs []wfv1.Workflow + var uidToWfs = map[types.UID][]wfv1.Workflow{} + for _, item := range liveWfs.Items { + uidToWfs[item.UID] = append(uidToWfs[item.UID], item) + } + for _, item := range archivedWfs.Items { + uidToWfs[item.UID] = append(uidToWfs[item.UID], item) + } + + for _, v := range uidToWfs { + // The archived workflow we saved in the database have "Persisted" as the archival status. + // Prioritize 'Archived' over 'Persisted' because 'Archived' means the workflow is in the cluster + if len(v) == 1 { + mergedWfs = append(mergedWfs, v[0]) + } else { + if ok := v[0].Labels[common.LabelKeyWorkflowArchivingStatus] == "Archived"; ok { + mergedWfs = append(mergedWfs, v[0]) + } else { + mergedWfs = append(mergedWfs, v[1]) + } + } + } + mergedWfsList := wfv1.WorkflowList{Items: mergedWfs, ListMeta: liveWfs.ListMeta} + sort.Sort(mergedWfsList.Items) + numWfs := 0 + var finalWfs []wfv1.Workflow + for _, item := range mergedWfsList.Items { + if numWfsToKeep == 0 || numWfs < numWfsToKeep { + finalWfs = append(finalWfs, item) + numWfs += 1 + } + } + return &wfv1.WorkflowList{Items: finalWfs, ListMeta: liveWfs.ListMeta} +} + func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*wfv1.WorkflowList, error) { + wfClient := auth.GetWfClient(ctx) + listOption := &metav1.ListOptions{} if req.ListOptions != nil { listOption = req.ListOptions } s.instanceIDService.With(listOption) - - options, err := sutils.BuildListOptions(req.ListOptions, req.Namespace, "") - if err != nil { - return nil, err - } - // verify if we have permission to list Workflows - allowed, err := auth.CanI(ctx, "list", workflow.WorkflowPlural, options.Namespace, "") + wfList, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).List(ctx, *listOption) if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } - if !allowed { - return nil, status.Error(codes.PermissionDenied, fmt.Sprintf("Permission denied, you are not allowed to list workflows in namespace \"%s\". Maybe you want to specify a namespace with query parameter `.namespace=%s`?", options.Namespace, options.Namespace)) - } - - var wfs wfv1.Workflows - liveWfCount, err := s.wfStore.CountWorkflows(options) - if err != nil { - return nil, sutils.ToStatusError(err, codes.Internal) - } - archivedCount, err := s.wfArchive.CountWorkflows(options) + archivedWfList, err := s.wfArchiveServer.ListArchivedWorkflows(ctx, &workflowarchivepkg.ListArchivedWorkflowsRequest{ + ListOptions: listOption, + NamePrefix: "", + Namespace: req.Namespace, + }) if err != nil { - return nil, sutils.ToStatusError(err, codes.Internal) - } - totalCount := liveWfCount + archivedCount - - // first fetch live workflows - var liveWfList []wfv1.Workflow - if liveWfCount > 0 && (options.Limit == 0 || options.Offset < int(liveWfCount)) { - liveWfList, err = s.wfStore.ListWorkflows(options) - if err != nil { - return nil, sutils.ToStatusError(err, codes.Internal) + log.Warnf("unable to list archived workflows:%v", err) + } else { + if archivedWfList != nil { + wfList = mergeWithArchivedWorkflows(*wfList, *archivedWfList, int(listOption.Limit)) } - wfs = append(wfs, liveWfList...) - } - - // then fetch archived workflows - if options.Limit == 0 || - int64(options.Offset+options.Limit) > liveWfCount { - archivedOffset := options.Offset - int(liveWfCount) - archivedLimit := options.Limit - if archivedOffset < 0 { - archivedOffset = 0 - archivedLimit = options.Limit - len(liveWfList) - } - archivedWfList, err := s.wfArchive.ListWorkflows(options.WithLimit(archivedLimit).WithOffset(archivedOffset)) - if err != nil { - return nil, sutils.ToStatusError(err, codes.Internal) - } - wfs = append(wfs, archivedWfList...) - } - meta := metav1.ListMeta{ResourceVersion: s.wfReflector.LastSyncResourceVersion()} - remainCount := totalCount - int64(options.Offset) - int64(len(wfs)) - if remainCount < 0 { - remainCount = 0 - } - if remainCount > 0 { - meta.Continue = fmt.Sprintf("%v", options.Offset+len(wfs)) - } - if options.ShowRemainingItemCount { - meta.RemainingItemCount = &remainCount } cleaner := fields.NewCleaner(req.Fields) @@ -229,10 +196,10 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } - for i, wf := range wfs { + for i, wf := range wfList.Items { if wf.Status.IsOffloadNodeStatus() { if s.offloadNodeStatusRepo.IsEnabled() { - wfs[i].Status.Nodes = offloadedNodes[sqldb.UUIDVersion{UID: string(wf.UID), Version: wf.GetOffloadNodeStatusVersion()}] + wfList.Items[i].Status.Nodes = offloadedNodes[sqldb.UUIDVersion{UID: string(wf.UID), Version: wf.GetOffloadNodeStatusVersion()}] } else { log.WithFields(log.Fields{"namespace": wf.Namespace, "name": wf.Name}).Warn(sqldb.OffloadNodeStatusDisabled) } @@ -241,9 +208,9 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor } // we make no promises about the overall list sorting, we just sort each page - sort.Sort(wfs) + sort.Sort(wfList.Items) - res := &wfv1.WorkflowList{ListMeta: meta, Items: wfs} + res := &wfv1.WorkflowList{ListMeta: metav1.ListMeta{Continue: wfList.Continue, ResourceVersion: wfList.ResourceVersion}, Items: wfList.Items} newRes := &wfv1.WorkflowList{} if ok, err := cleaner.Clean(res, &newRes); err != nil { return nil, sutils.ToStatusError(fmt.Errorf("unable to CleanFields in request: %w", err), codes.Internal) @@ -698,15 +665,15 @@ func (s *workflowServer) getWorkflow(ctx context.Context, wfClient versioned.Int var err error wf, origErr := wfClient.ArgoprojV1alpha1().Workflows(namespace).Get(ctx, name, options) if wf == nil || origErr != nil { - wf, err = s.wfArchive.GetWorkflow("", namespace, name) + wf, err = s.wfArchiveServer.GetArchivedWorkflow(ctx, &workflowarchivepkg.GetArchivedWorkflowRequest{ + Namespace: namespace, + Name: name, + }) if err != nil { log.Errorf("failed to get live workflow: %v; failed to get archived workflow: %v", origErr, err) // We only return the original error to preserve the original status code. return nil, sutils.ToStatusError(origErr, codes.Internal) } - if wf == nil { - return nil, status.Error(codes.NotFound, "not found") - } } return wf, nil } diff --git a/server/workflow/workflow_server_test.go b/server/workflow/workflow_server_test.go index 0150bdf70f43..e91e672ba1cb 100644 --- a/server/workflow/workflow_server_test.go +++ b/server/workflow/workflow_server_test.go @@ -5,11 +5,11 @@ import ( "fmt" "testing" + "time" "github.com/go-jose/go-jose/v3/jwt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - authorizationv1 "k8s.io/api/authorization/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -25,8 +25,7 @@ import ( v1alpha "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake" "github.com/argoproj/argo-workflows/v3/server/auth" "github.com/argoproj/argo-workflows/v3/server/auth/types" - sutils "github.com/argoproj/argo-workflows/v3/server/utils" - "github.com/argoproj/argo-workflows/v3/server/workflow/store" + "github.com/argoproj/argo-workflows/v3/server/workflowarchive" "github.com/argoproj/argo-workflows/v3/util" "github.com/argoproj/argo-workflows/v3/util/instanceid" "github.com/argoproj/argo-workflows/v3/workflow/common" @@ -137,7 +136,7 @@ const wf2 = ` "namespace": "workflows", "resourceVersion": "52919656", "selfLink": "/apis/argoproj.io/v1alpha1/namespaces/workflows/workflows/hello-world-b6h5m", - "uid": "91066a6c-1ddc-11ea-b443-42010aa80074" + "uid": "91066a6c-1ddc-11ea-b443-42010aa80075" }, "spec": { @@ -200,7 +199,7 @@ const wf3 = ` "namespace": "test", "resourceVersion": "53020772", "selfLink": "/apis/argoproj.io/v1alpha1/namespaces/workflows/workflows/hello-world-9tql2", - "uid": "6522aff1-1e01-11ea-b443-42010aa80074" + "uid": "6522aff1-1e01-11ea-b443-42010aa80075" }, "spec": { @@ -326,7 +325,7 @@ const wf5 = ` "namespace": "workflows", "resourceVersion": "53020772", "selfLink": "/apis/argoproj.io/v1alpha1/namespaces/workflows/workflows/hello-world-9tql2", - "uid": "6522aff1-1e01-11ea-b443-42010aa80073" + "uid": "6522aff1-1e01-11ea-b443-42010aa80075" }, "spec": { @@ -575,6 +574,7 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) { v1alpha1.MustUnmarshal(unlabelled, &unlabelledObj) v1alpha1.MustUnmarshal(wf1, &wfObj1) + v1alpha1.MustUnmarshal(wf1, &wfObj1) v1alpha1.MustUnmarshal(wf2, &wfObj2) v1alpha1.MustUnmarshal(wf3, &wfObj3) v1alpha1.MustUnmarshal(wf4, &wfObj4) @@ -590,6 +590,7 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) { archivedRepo := &mocks.WorkflowArchive{} + wfaServer := workflowarchive.NewWorkflowArchiveServer(archivedRepo, offloadNodeStatusRepo) archivedRepo.On("GetWorkflow", "", "test", "hello-world-9tql2-test").Return(&v1alpha1.Workflow{ ObjectMeta: metav1.ObjectMeta{Name: "hello-world-9tql2-test", Namespace: "test"}, Spec: v1alpha1.WorkflowSpec{ @@ -603,37 +604,11 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) { archivedRepo.On("GetWorkflow", "", "test", "unlabelled").Return(nil, nil) archivedRepo.On("GetWorkflow", "", "workflows", "latest").Return(nil, nil) archivedRepo.On("GetWorkflow", "", "workflows", "hello-world-9tql2-not").Return(nil, nil) - archivedRepo.On("CountWorkflows", sutils.ListOptions{Namespace: "workflows"}).Return(int64(2), nil) - archivedRepo.On("ListWorkflows", sutils.ListOptions{Namespace: "workflows", Limit: -2}).Return(v1alpha1.Workflows{wfObj2, failedWfObj}, nil) - archivedRepo.On("CountWorkflows", sutils.ListOptions{Namespace: "test"}).Return(int64(1), nil) - archivedRepo.On("ListWorkflows", sutils.ListOptions{Namespace: "test", Limit: -1}).Return(v1alpha1.Workflows{wfObj4}, nil) - + server := NewWorkflowServer(instanceid.NewService("my-instanceid"), offloadNodeStatusRepo, wfaServer) kubeClientSet := fake.NewSimpleClientset() - kubeClientSet.PrependReactor("create", "selfsubjectaccessreviews", func(action ktesting.Action) (handled bool, ret runtime.Object, err error) { - return true, &authorizationv1.SelfSubjectAccessReview{ - Status: authorizationv1.SubjectAccessReviewStatus{Allowed: true}, - }, nil - }) wfClientset := v1alpha.NewSimpleClientset(&unlabelledObj, &wfObj1, &wfObj2, &wfObj3, &wfObj4, &wfObj5, &failedWfObj, &wftmpl, &cronwfObj, &cwfTmpl) wfClientset.PrependReactor("create", "workflows", generateNameReactor) ctx := context.WithValue(context.WithValue(context.WithValue(context.TODO(), auth.WfKey, wfClientset), auth.KubeKey, kubeClientSet), auth.ClaimsKey, &types.Claims{Claims: jwt.Claims{Subject: "my-sub"}}) - listOptions := &metav1.ListOptions{} - instanceIdSvc := instanceid.NewService("my-instanceid") - instanceIdSvc.With(listOptions) - wfStore, err := store.NewSQLiteStore(instanceIdSvc) - if err != nil { - panic(err) - } - if err = wfStore.Add(&wfObj1); err != nil { - panic(err) - } - if err = wfStore.Add(&wfObj3); err != nil { - panic(err) - } - if err = wfStore.Add(&wfObj5); err != nil { - panic(err) - } - server := NewWorkflowServer(instanceIdSvc, offloadNodeStatusRepo, archivedRepo, wfClientset, wfStore) return server, ctx } @@ -675,6 +650,26 @@ func (t testWatchWorkflowServer) Send(*workflowpkg.WorkflowWatchEvent) error { panic("implement me") } +func TestMergeWithArchivedWorkflows(t *testing.T) { + timeNow := time.Now() + wf1Live := v1alpha1.Workflow{ + ObjectMeta: metav1.ObjectMeta{UID: "1", CreationTimestamp: metav1.Time{Time: timeNow.Add(time.Second)}, + Labels: map[string]string{common.LabelKeyWorkflowArchivingStatus: "Archived"}}} + wf1Archived := v1alpha1.Workflow{ + ObjectMeta: metav1.ObjectMeta{UID: "1", CreationTimestamp: metav1.Time{Time: timeNow.Add(time.Second)}, + Labels: map[string]string{common.LabelKeyWorkflowArchivingStatus: "Persisted"}}} + wf2 := v1alpha1.Workflow{ + ObjectMeta: metav1.ObjectMeta{UID: "2", CreationTimestamp: metav1.Time{Time: timeNow.Add(2 * time.Second)}}} + wf3 := v1alpha1.Workflow{ + ObjectMeta: metav1.ObjectMeta{UID: "3", CreationTimestamp: metav1.Time{Time: timeNow.Add(3 * time.Second)}}} + liveWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Live, wf2}} + archivedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Archived, wf3, wf2}} + expectedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2, wf1Live}} + expectedShortWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2}} + assert.Equal(t, expectedWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 0).Items) + assert.Equal(t, expectedShortWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 2).Items) +} + func TestWatchWorkflows(t *testing.T) { server, ctx := getWorkflowServer() wf := &v1alpha1.Workflow{ diff --git a/server/workflowarchive/archived_workflow_server.go b/server/workflowarchive/archived_workflow_server.go index 2103e7592594..e1a70e3bed7b 100644 --- a/server/workflowarchive/archived_workflow_server.go +++ b/server/workflowarchive/archived_workflow_server.go @@ -6,6 +6,9 @@ import ( "os" "regexp" "sort" + "strconv" + "strings" + "time" log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" @@ -40,42 +43,108 @@ func NewWorkflowArchiveServer(wfArchive sqldb.WorkflowArchive, offloadNodeStatus } func (w *archivedWorkflowServer) ListArchivedWorkflows(ctx context.Context, req *workflowarchivepkg.ListArchivedWorkflowsRequest) (*wfv1.WorkflowList, error) { + options := req.ListOptions + namePrefix := req.NamePrefix + if options == nil { + options = &metav1.ListOptions{} + } + if options.Continue == "" { + options.Continue = "0" + } + limit := int(options.Limit) - options, err := sutils.BuildListOptions(req.ListOptions, req.Namespace, req.NamePrefix) + offset, err := strconv.Atoi(options.Continue) + if err != nil { + // no need to use sutils here + return nil, status.Error(codes.InvalidArgument, "listOptions.continue must be int") + } + if offset < 0 { + // no need to use sutils here + return nil, status.Error(codes.InvalidArgument, "listOptions.continue must >= 0") + } + + // namespace is now specified as its own query parameter + // note that for backward compatibility, the field selector 'metadata.namespace' is also supported for now + namespace := req.Namespace // optional + name := "" + minStartedAt := time.Time{} + maxStartedAt := time.Time{} + showRemainingItemCount := false + for _, selector := range strings.Split(options.FieldSelector, ",") { + if len(selector) == 0 { + continue + } + if strings.HasPrefix(selector, "metadata.namespace=") { + // for backward compatibility, the field selector 'metadata.namespace' is supported for now despite the addition + // of the new 'namespace' query parameter, which is what the UI uses + fieldSelectedNamespace := strings.TrimPrefix(selector, "metadata.namespace=") + switch namespace { + case "": + namespace = fieldSelectedNamespace + case fieldSelectedNamespace: + break + default: + return nil, status.Errorf(codes.InvalidArgument, + "'namespace' query param (%q) and fieldselector 'metadata.namespace' (%q) are both specified and contradict each other", namespace, fieldSelectedNamespace) + } + } else if strings.HasPrefix(selector, "metadata.name=") { + name = strings.TrimPrefix(selector, "metadata.name=") + } else if strings.HasPrefix(selector, "spec.startedAt>") { + minStartedAt, err = time.Parse(time.RFC3339, strings.TrimPrefix(selector, "spec.startedAt>")) + if err != nil { + // startedAt is populated by us, it should therefore be valid. + return nil, sutils.ToStatusError(err, codes.Internal) + } + } else if strings.HasPrefix(selector, "spec.startedAt<") { + maxStartedAt, err = time.Parse(time.RFC3339, strings.TrimPrefix(selector, "spec.startedAt<")) + if err != nil { + // no need to use sutils here + return nil, sutils.ToStatusError(err, codes.Internal) + } + } else if strings.HasPrefix(selector, "ext.showRemainingItemCount") { + showRemainingItemCount, err = strconv.ParseBool(strings.TrimPrefix(selector, "ext.showRemainingItemCount=")) + if err != nil { + // populated by us, it should therefore be valid. + return nil, sutils.ToStatusError(err, codes.Internal) + } + } else { + return nil, sutils.ToStatusError(fmt.Errorf("unsupported requirement %s", selector), codes.InvalidArgument) + } + } + requirements, err := labels.ParseToRequirements(options.LabelSelector) if err != nil { - return nil, err + return nil, sutils.ToStatusError(err, codes.InvalidArgument) } // verify if we have permission to list Workflows - allowed, err := auth.CanI(ctx, "list", workflow.WorkflowPlural, options.Namespace, "") + allowed, err := auth.CanI(ctx, "list", workflow.WorkflowPlural, namespace, "") if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } if !allowed { - return nil, status.Error(codes.PermissionDenied, fmt.Sprintf("Permission denied, you are not allowed to list workflows in namespace \"%s\". Maybe you want to specify a namespace with query parameter `.namespace=%s`?", options.Namespace, options.Namespace)) + return nil, status.Error(codes.PermissionDenied, fmt.Sprintf("Permission denied, you are not allowed to list workflows in namespace \"%s\". Maybe you want to specify a namespace with query parameter `.namespace=%s`?", namespace, namespace)) } - limit := options.Limit - offset := options.Offset // When the zero value is passed, we should treat this as returning all results // to align ourselves with the behavior of the `List` endpoints in the Kubernetes API loadAll := limit == 0 + limitWithMore := 0 if !loadAll { // Attempt to load 1 more record than we actually need as an easy way to determine whether or not more // records exist than we're currently requesting - options.Limit += 1 + limitWithMore = limit + 1 } - items, err := w.wfArchive.ListWorkflows(options) + items, err := w.wfArchive.ListWorkflows(namespace, name, namePrefix, minStartedAt, maxStartedAt, requirements, limitWithMore, offset) if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } meta := metav1.ListMeta{} - if options.ShowRemainingItemCount && !loadAll { - total, err := w.wfArchive.CountWorkflows(options) + if showRemainingItemCount && !loadAll { + total, err := w.wfArchive.CountWorkflows(namespace, name, namePrefix, minStartedAt, maxStartedAt, requirements) if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } diff --git a/server/workflowarchive/archived_workflow_server_test.go b/server/workflowarchive/archived_workflow_server_test.go index 7f26bbbb20eb..50d68d27db7b 100644 --- a/server/workflowarchive/archived_workflow_server_test.go +++ b/server/workflowarchive/archived_workflow_server_test.go @@ -13,6 +13,7 @@ import ( authorizationv1 "k8s.io/api/authorization/v1" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" kubefake "k8s.io/client-go/kubernetes/fake" k8stesting "k8s.io/client-go/testing" @@ -24,7 +25,6 @@ import ( wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" argofake "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake" "github.com/argoproj/argo-workflows/v3/server/auth" - sutils "github.com/argoproj/argo-workflows/v3/server/utils" "github.com/argoproj/argo-workflows/v3/workflow/common" ) @@ -54,20 +54,18 @@ func Test_archivedWorkflowServer(t *testing.T) { }, nil }) // two pages of results for limit 1 - repo.On("ListWorkflows", sutils.ListOptions{Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}, {}}, nil) - repo.On("ListWorkflows", sutils.ListOptions{Limit: 2, Offset: 1}).Return(wfv1.Workflows{{}}, nil) + repo.On("ListWorkflows", "", "", "", time.Time{}, time.Time{}, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}, {}}, nil) + repo.On("ListWorkflows", "", "", "", time.Time{}, time.Time{}, labels.Requirements(nil), 2, 1).Return(wfv1.Workflows{{}}, nil) minStartAt, _ := time.Parse(time.RFC3339, "2020-01-01T00:00:00Z") maxStartAt, _ := time.Parse(time.RFC3339, "2020-01-02T00:00:00Z") createdTime := metav1.Time{Time: time.Now().UTC()} finishedTime := metav1.Time{Time: createdTime.Add(time.Second * 2)} - repo.On("ListWorkflows", sutils.ListOptions{Namespace: "", Name: "", NamePrefix: "", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}}, nil) - repo.On("ListWorkflows", sutils.ListOptions{Namespace: "", Name: "my-name", NamePrefix: "", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}}, nil) - repo.On("ListWorkflows", sutils.ListOptions{Namespace: "", Name: "", NamePrefix: "my-", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}}, nil) - repo.On("ListWorkflows", sutils.ListOptions{Namespace: "", Name: "my-name", NamePrefix: "my-", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}}, nil) - repo.On("ListWorkflows", sutils.ListOptions{Namespace: "", Name: "my-name", NamePrefix: "my-", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0, ShowRemainingItemCount: true}).Return(wfv1.Workflows{{}}, nil) - repo.On("ListWorkflows", sutils.ListOptions{Namespace: "user-ns", Name: "", NamePrefix: "", MinStartedAt: time.Time{}, MaxStartedAt: time.Time{}, Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}, {}}, nil) - repo.On("CountWorkflows", sutils.ListOptions{Namespace: "", Name: "my-name", NamePrefix: "my-", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0}).Return(int64(5), nil) - repo.On("CountWorkflows", sutils.ListOptions{Namespace: "", Name: "my-name", NamePrefix: "my-", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0, ShowRemainingItemCount: true}).Return(int64(5), nil) + repo.On("ListWorkflows", "", "", "", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil) + repo.On("ListWorkflows", "", "my-name", "", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil) + repo.On("ListWorkflows", "", "", "my-", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil) + repo.On("ListWorkflows", "", "my-name", "my-", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil) + repo.On("ListWorkflows", "user-ns", "", "", time.Time{}, time.Time{}, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}, {}}, nil) + repo.On("CountWorkflows", "", "my-name", "my-", minStartAt, maxStartAt, labels.Requirements(nil)).Return(int64(5), nil) repo.On("GetWorkflow", "", "", "").Return(nil, nil) repo.On("GetWorkflow", "my-uid", "", "").Return(&wfv1.Workflow{ ObjectMeta: metav1.ObjectMeta{Name: "my-name"}, diff --git a/test/e2e/fixtures/e2e_suite.go b/test/e2e/fixtures/e2e_suite.go index 350398faf74d..c518f7a51062 100644 --- a/test/e2e/fixtures/e2e_suite.go +++ b/test/e2e/fixtures/e2e_suite.go @@ -8,7 +8,6 @@ import ( "os" "time" - "github.com/argoproj/argo-workflows/v3/server/utils" "github.com/argoproj/argo-workflows/v3/util/secrets" apierr "k8s.io/apimachinery/pkg/api/errors" @@ -178,10 +177,7 @@ func (s *E2ESuite) DeleteResources() { archive := s.Persistence.workflowArchive parse, err := labels.ParseToRequirements(Label) s.CheckError(err) - workflows, err := archive.ListWorkflows(utils.ListOptions{ - Namespace: Namespace, - LabelRequirements: parse, - }) + workflows, err := archive.ListWorkflows(Namespace, "", "", time.Time{}, time.Time{}, parse, 0, 0) s.CheckError(err) for _, w := range workflows { err := archive.DeleteWorkflow(string(w.UID)) diff --git a/workflow/controller/estimation/estimator_factory.go b/workflow/controller/estimation/estimator_factory.go index 984e76d98cdc..60311eb8f5c3 100644 --- a/workflow/controller/estimation/estimator_factory.go +++ b/workflow/controller/estimation/estimator_factory.go @@ -2,6 +2,7 @@ package estimation import ( "fmt" + "time" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" @@ -9,7 +10,6 @@ import ( "github.com/argoproj/argo-workflows/v3/persist/sqldb" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" - "github.com/argoproj/argo-workflows/v3/server/utils" "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/controller/indexes" "github.com/argoproj/argo-workflows/v3/workflow/hydrator" @@ -76,13 +76,7 @@ func (f *estimatorFactory) NewEstimator(wf *wfv1.Workflow) (Estimator, error) { if err != nil { return defaultEstimator, fmt.Errorf("failed to parse selector to requirements: %v", err) } - workflows, err := f.wfArchive.ListWorkflows( - utils.ListOptions{ - Namespace: wf.Namespace, - LabelRequirements: requirements, - Limit: 1, - Offset: 0, - }) + workflows, err := f.wfArchive.ListWorkflows(wf.Namespace, "", "", time.Time{}, time.Time{}, requirements, 1, 0) if err != nil { return defaultEstimator, fmt.Errorf("failed to list archived workflows: %v", err) } diff --git a/workflow/controller/estimation/estimator_factory_test.go b/workflow/controller/estimation/estimator_factory_test.go index c4bb0ab06981..aeef2a7128c3 100644 --- a/workflow/controller/estimation/estimator_factory_test.go +++ b/workflow/controller/estimation/estimator_factory_test.go @@ -2,6 +2,7 @@ package estimation import ( "testing" + "time" "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -9,7 +10,6 @@ import ( sqldbmocks "github.com/argoproj/argo-workflows/v3/persist/sqldb/mocks" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" - "github.com/argoproj/argo-workflows/v3/server/utils" testutil "github.com/argoproj/argo-workflows/v3/test/util" "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/controller/indexes" @@ -53,11 +53,7 @@ metadata: wfArchive := &sqldbmocks.WorkflowArchive{} r, err := labels.ParseToRequirements("workflows.argoproj.io/phase=Succeeded,workflows.argoproj.io/workflow-template=my-archived-wftmpl") assert.NoError(t, err) - wfArchive.On("ListWorkflows", utils.ListOptions{ - Namespace: "my-ns", - LabelRequirements: r, - Limit: 1, - }).Return(wfv1.Workflows{ + wfArchive.On("ListWorkflows", "my-ns", "", "", time.Time{}, time.Time{}, labels.Requirements(r), 1, 0).Return(wfv1.Workflows{ *testutil.MustUnmarshalWorkflow(` metadata: name: my-archived-wftmpl-baseline`),