diff --git a/go.mod b/go.mod index ffe8b951b11a..6c573fd965eb 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( golang.org/x/crypto v0.22.0 golang.org/x/exp v0.0.0-20230905200255-921286631fa9 golang.org/x/oauth2 v0.13.0 - golang.org/x/sync v0.5.0 + golang.org/x/sync v0.6.0 golang.org/x/time v0.4.0 google.golang.org/api v0.151.0 google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b @@ -74,6 +74,7 @@ require ( k8s.io/kubectl v0.24.3 k8s.io/utils v0.0.0-20220713171938-56c0de1e6f5e sigs.k8s.io/yaml v1.4.0 + zombiezen.com/go/sqlite v1.2.0 ) require ( @@ -97,7 +98,10 @@ require ( github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/goidentity/v6 v6.0.1 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/ncruces/go-strftime v0.1.9 // indirect github.com/pjbgf/sha1cd v0.3.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/sagikazarmark/locafero v0.3.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/segmentio/fasthash v1.0.3 // indirect @@ -106,10 +110,14 @@ require ( github.com/vbatts/tar-split v0.11.3 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect - golang.org/x/mod v0.12.0 // indirect - golang.org/x/tools v0.13.0 // indirect + golang.org/x/mod v0.14.0 // indirect + golang.org/x/tools v0.17.0 // indirect google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect + modernc.org/libc v1.41.0 // indirect + modernc.org/mathutil v1.6.0 // indirect + modernc.org/memory v1.7.2 // indirect + modernc.org/sqlite v1.29.1 // indirect ) require ( diff --git a/go.sum b/go.sum index 4b93d5f78011..37dc8a22af64 100644 --- a/go.sum +++ b/go.sum @@ -805,6 +805,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= +github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= +github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nsf/termbox-go v0.0.0-20190121233118-02980233997d/go.mod h1:IuKpRQcYE1Tfu+oAQqaLisqDeXgjyyltCfsaoYN18NQ= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -882,6 +884,7 @@ github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+Pymzi github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= @@ -1155,8 +1158,8 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= -golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1188,8 +1191,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= -golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1383,8 +1386,8 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.10-0.20220218145154-897bd77cd717/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ= -golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= +golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= +golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1634,16 +1637,23 @@ modernc.org/internal v1.1.0/go.mod h1:IFhfxUE81NbN8Riy+oHylA3PIYgAvIQ5eMufNzg7/Q modernc.org/lex v1.1.1/go.mod h1:6r8o8DLJkAnOsQaGi8fMoi+Vt6LTbDaCrkUK729D8xM= modernc.org/lexer v1.0.4/go.mod h1:tOajb8S4sdfOYitzCgXDFmbVJ/LE0v1fNJ7annTw36U= modernc.org/lexer v1.0.5/go.mod h1:8npHn3u/NxCEtlC/tRSY77x5+WB3HvHMzMVElQ76ayI= +modernc.org/libc v1.41.0 h1:g9YAc6BkKlgORsUWj+JwqoB1wU3o4DE3bM3yvA3k+Gk= +modernc.org/libc v1.41.0/go.mod h1:w0eszPsiXoOnoMJgrXjglgLuDy/bt5RR4y3QzUUeodY= modernc.org/lldb v1.0.4/go.mod h1:AKDI6wUJk7iJS8nRX54St8rq9wUIi3o5YGN3rlejR5o= modernc.org/lldb v1.0.8/go.mod h1:ybOcsZ/RNZo3q8fiGadQFRnD+1Jc+RWGcTPdeilCnUk= modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k= modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/memory v1.7.2 h1:Klh90S215mmH8c9gO98QxQFsY+W451E8AnzjoE2ee1E= +modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E= modernc.org/ql v1.4.7/go.mod h1:I900l6z8ckpPy1y9VR0gu4pZ9hl9AhmQla4F8KERzdc= modernc.org/sortutil v1.1.0/go.mod h1:ZyL98OQHJgH9IEfN71VsamvJgrtRX9Dj2gX+vH86L1k= modernc.org/sortutil v1.1.1/go.mod h1:DTj/8BqjEBLZFVPYvEGDfFFg94SsfPxQ70R+SQJ98qA= modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss= +modernc.org/sqlite v1.29.1 h1:19GY2qvWB4VPw0HppFlZCPAbmxFU41r+qjKZQdQ1ryA= +modernc.org/sqlite v1.29.1/go.mod h1:hG41jCYxOAOoO6BRK66AdRlmOcDzXf7qnwlwjUIOqa0= modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw= modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= modernc.org/zappy v1.0.5/go.mod h1:Q5T4ra3/JJNORGK16oe8rRAti7kWtRW4Z93fzin2gBc= @@ -1672,3 +1682,5 @@ sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= +zombiezen.com/go/sqlite v1.2.0 h1:jja0Ubpzpl6bjr/bSaPyvafHO+extoDJJXIaqXT7VOU= +zombiezen.com/go/sqlite v1.2.0/go.mod h1:yRl27//s/9aXU3RWs8uFQwjkTG9gYNGEls6+6SvrclY= diff --git a/persist/sqldb/archived_workflow_labels.go b/persist/sqldb/archived_workflow_labels.go index 04ce353ce209..add2bbad4bf3 100644 --- a/persist/sqldb/archived_workflow_labels.go +++ b/persist/sqldb/archived_workflow_labels.go @@ -52,9 +52,9 @@ func (r *workflowArchive) ListWorkflowsLabelValues(key string) (*wfv1.LabelValue return &wfv1.LabelValues{Items: labels}, nil } -func labelsClause(selector db.Selector, t dbType, requirements labels.Requirements) (db.Selector, error) { +func labelsClause(selector db.Selector, t dbType, requirements labels.Requirements, tableName, labelTableName string, hasClusterName bool) (db.Selector, error) { for _, req := range requirements { - cond, err := requirementToCondition(t, req) + cond, err := requirementToCondition(t, req, tableName, labelTableName, hasClusterName) if err != nil { return nil, err } @@ -63,36 +63,40 @@ func labelsClause(selector db.Selector, t dbType, requirements labels.Requiremen return selector, nil } -func requirementToCondition(t dbType, r labels.Requirement) (*db.RawExpr, error) { +func requirementToCondition(t dbType, r labels.Requirement, tableName, labelTableName string, hasClusterName bool) (*db.RawExpr, error) { + clusterNameSelector := "" + if hasClusterName { + clusterNameSelector = fmt.Sprintf("clustername = %s.clustername and", tableName) + } // Should we "sanitize our inputs"? No. // https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ // Valid label values must be 63 characters or less and must be empty or begin and end with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_), dots (.), and alphanumerics between. // https://kb.objectrocket.com/postgresql/casting-in-postgresql-570#string+to+integer+casting switch r.Operator() { case selection.DoesNotExist: - return db.Raw(fmt.Sprintf("not exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s')", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key())), nil + return db.Raw(fmt.Sprintf("not exists (select 1 from %s where %s uid = %s.uid and name = '%s')", labelTableName, clusterNameSelector, tableName, r.Key())), nil case selection.Equals, selection.DoubleEquals: - return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and value = '%s')", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), r.Values().List()[0])), nil + return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s' and value = '%s')", labelTableName, clusterNameSelector, tableName, r.Key(), r.Values().List()[0])), nil case selection.In: - return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and value in ('%s'))", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), strings.Join(r.Values().List(), "', '"))), nil + return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s' and value in ('%s'))", labelTableName, clusterNameSelector, tableName, r.Key(), strings.Join(r.Values().List(), "', '"))), nil case selection.NotEquals: - return db.Raw(fmt.Sprintf("not exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and value = '%s')", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), r.Values().List()[0])), nil + return db.Raw(fmt.Sprintf("not exists (select 1 from %s where %s uid = %s.uid and name = '%s' and value = '%s')", labelTableName, clusterNameSelector, tableName, r.Key(), r.Values().List()[0])), nil case selection.NotIn: - return db.Raw(fmt.Sprintf("not exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and value in ('%s'))", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), strings.Join(r.Values().List(), "', '"))), nil + return db.Raw(fmt.Sprintf("not exists (select 1 from %s where %s uid = %s.uid and name = '%s' and value in ('%s'))", labelTableName, clusterNameSelector, tableName, r.Key(), strings.Join(r.Values().List(), "', '"))), nil case selection.Exists: - return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s')", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key())), nil + return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s')", labelTableName, clusterNameSelector, tableName, r.Key())), nil case selection.GreaterThan: i, err := strconv.Atoi(r.Values().List()[0]) if err != nil { return nil, err } - return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and cast(value as %s) > %d)", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), t.intType(), i)), nil + return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s' and cast(value as %s) > %d)", labelTableName, clusterNameSelector, tableName, r.Key(), t.intType(), i)), nil case selection.LessThan: i, err := strconv.Atoi(r.Values().List()[0]) if err != nil { return nil, err } - return db.Raw(fmt.Sprintf("exists (select 1 from %s where clustername = %s.clustername and uid = %s.uid and name = '%s' and cast(value as %s) < %d)", archiveLabelsTableName, archiveTableName, archiveTableName, r.Key(), t.intType(), i)), nil + return db.Raw(fmt.Sprintf("exists (select 1 from %s where %s uid = %s.uid and name = '%s' and cast(value as %s) < %d)", labelTableName, clusterNameSelector, tableName, r.Key(), t.intType(), i)), nil } return nil, fmt.Errorf("operation %v is not supported", r.Operator()) } diff --git a/persist/sqldb/archived_workflow_labels_test.go b/persist/sqldb/archived_workflow_labels_test.go index 144212cfae2e..61f0ca447d1f 100644 --- a/persist/sqldb/archived_workflow_labels_test.go +++ b/persist/sqldb/archived_workflow_labels_test.go @@ -31,7 +31,7 @@ func Test_labelsClause(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { for _, req := range tt.requirements { - got, err := requirementToCondition(tt.dbType, req) + got, err := requirementToCondition(tt.dbType, req, archiveTableName, archiveLabelsTableName, true) if assert.NoError(t, err) { assert.Equal(t, tt.want, *got) } diff --git a/persist/sqldb/db_type.go b/persist/sqldb/db_type.go index edf590ed7bf5..258eedb087f3 100644 --- a/persist/sqldb/db_type.go +++ b/persist/sqldb/db_type.go @@ -12,6 +12,7 @@ type dbType string const ( MySQL dbType = "mysql" Postgres dbType = "postgres" + SQLite dbType = "sqlite" ) func dbTypeFor(session db.Session) dbType { diff --git a/persist/sqldb/mocks/WorkflowArchive.go b/persist/sqldb/mocks/WorkflowArchive.go index 16edf80c38fd..634961944077 100644 --- a/persist/sqldb/mocks/WorkflowArchive.go +++ b/persist/sqldb/mocks/WorkflowArchive.go @@ -4,10 +4,11 @@ package mocks import ( mock "github.com/stretchr/testify/mock" - labels "k8s.io/apimachinery/pkg/labels" time "time" + utils "github.com/argoproj/argo-workflows/v3/server/utils" + v1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" ) @@ -30,23 +31,23 @@ func (_m *WorkflowArchive) ArchiveWorkflow(wf *v1alpha1.Workflow) error { return r0 } -// CountWorkflows provides a mock function with given fields: namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements -func (_m *WorkflowArchive) CountWorkflows(namespace string, name string, namePrefix string, minStartAt time.Time, maxStartAt time.Time, labelRequirements labels.Requirements) (int64, error) { - ret := _m.Called(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) +// CountWorkflows provides a mock function with given fields: options +func (_m *WorkflowArchive) CountWorkflows(options utils.ListOptions) (int64, error) { + ret := _m.Called(options) var r0 int64 var r1 error - if rf, ok := ret.Get(0).(func(string, string, string, time.Time, time.Time, labels.Requirements) (int64, error)); ok { - return rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) + if rf, ok := ret.Get(0).(func(utils.ListOptions) (int64, error)); ok { + return rf(options) } - if rf, ok := ret.Get(0).(func(string, string, string, time.Time, time.Time, labels.Requirements) int64); ok { - r0 = rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) + if rf, ok := ret.Get(0).(func(utils.ListOptions) int64); ok { + r0 = rf(options) } else { r0 = ret.Get(0).(int64) } - if rf, ok := ret.Get(1).(func(string, string, string, time.Time, time.Time, labels.Requirements) error); ok { - r1 = rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) + if rf, ok := ret.Get(1).(func(utils.ListOptions) error); ok { + r1 = rf(options) } else { r1 = ret.Error(1) } @@ -122,25 +123,25 @@ func (_m *WorkflowArchive) IsEnabled() bool { return r0 } -// ListWorkflows provides a mock function with given fields: namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements, limit, offset -func (_m *WorkflowArchive) ListWorkflows(namespace string, name string, namePrefix string, minStartAt time.Time, maxStartAt time.Time, labelRequirements labels.Requirements, limit int, offset int) (v1alpha1.Workflows, error) { - ret := _m.Called(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements, limit, offset) +// ListWorkflows provides a mock function with given fields: options +func (_m *WorkflowArchive) ListWorkflows(options utils.ListOptions) (v1alpha1.Workflows, error) { + ret := _m.Called(options) var r0 v1alpha1.Workflows var r1 error - if rf, ok := ret.Get(0).(func(string, string, string, time.Time, time.Time, labels.Requirements, int, int) (v1alpha1.Workflows, error)); ok { - return rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements, limit, offset) + if rf, ok := ret.Get(0).(func(utils.ListOptions) (v1alpha1.Workflows, error)); ok { + return rf(options) } - if rf, ok := ret.Get(0).(func(string, string, string, time.Time, time.Time, labels.Requirements, int, int) v1alpha1.Workflows); ok { - r0 = rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements, limit, offset) + if rf, ok := ret.Get(0).(func(utils.ListOptions) v1alpha1.Workflows); ok { + r0 = rf(options) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(v1alpha1.Workflows) } } - if rf, ok := ret.Get(1).(func(string, string, string, time.Time, time.Time, labels.Requirements, int, int) error); ok { - r1 = rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements, limit, offset) + if rf, ok := ret.Get(1).(func(utils.ListOptions) error); ok { + r1 = rf(options) } else { r1 = ret.Error(1) } diff --git a/persist/sqldb/null_workflow_archive.go b/persist/sqldb/null_workflow_archive.go index e8e37b481c9f..e3f4863bcc7c 100644 --- a/persist/sqldb/null_workflow_archive.go +++ b/persist/sqldb/null_workflow_archive.go @@ -4,9 +4,8 @@ import ( "fmt" "time" - "k8s.io/apimachinery/pkg/labels" - wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + sutils "github.com/argoproj/argo-workflows/v3/server/utils" ) var NullWorkflowArchive WorkflowArchive = &nullWorkflowArchive{} @@ -21,11 +20,11 @@ func (r *nullWorkflowArchive) ArchiveWorkflow(*wfv1.Workflow) error { return nil } -func (r *nullWorkflowArchive) ListWorkflows(string, string, string, time.Time, time.Time, labels.Requirements, int, int) (wfv1.Workflows, error) { +func (r *nullWorkflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error) { return wfv1.Workflows{}, nil } -func (r *nullWorkflowArchive) CountWorkflows(string, string, string, time.Time, time.Time, labels.Requirements) (int64, error) { +func (r *nullWorkflowArchive) CountWorkflows(options sutils.ListOptions) (int64, error) { return 0, nil } diff --git a/persist/sqldb/selector.go b/persist/sqldb/selector.go new file mode 100644 index 000000000000..5e2b9cbb53ca --- /dev/null +++ b/persist/sqldb/selector.go @@ -0,0 +1,89 @@ +package sqldb + +import ( + "github.com/upper/db/v4" + + "github.com/argoproj/argo-workflows/v3/server/utils" +) + +func BuildArchivedWorkflowSelector(selector db.Selector, tableName, labelTableName string, t dbType, options utils.ListOptions, count bool) (db.Selector, error) { + selector = selector. + And(namespaceEqual(options.Namespace)). + And(nameEqual(options.Name)). + And(namePrefixClause(options.NamePrefix)). + And(startedAtFromClause(options.MinStartedAt)). + And(startedAtToClause(options.MaxStartedAt)) + + selector, err := labelsClause(selector, t, options.LabelRequirements, tableName, labelTableName, true) + if err != nil { + return nil, err + } + if count { + return selector, nil + } + // If we were passed 0 as the limit, then we should load all available archived workflows + // to match the behavior of the `List` operations in the Kubernetes API + if options.Limit == 0 { + options.Limit = -1 + options.Offset = -1 + } + return selector. + OrderBy("-startedat"). + Limit(options.Limit). + Offset(options.Offset), nil +} + +func BuildWorkflowSelector(in string, inArgs []any, tableName, labelTableName string, t dbType, options utils.ListOptions, count bool) (out string, outArgs []any, err error) { + var clauses []*db.RawExpr + if options.Namespace != "" { + clauses = append(clauses, db.Raw("namespace = ?", options.Namespace)) + } + if options.Name != "" { + clauses = append(clauses, db.Raw("name = ?", options.Name)) + } + if options.NamePrefix != "" { + clauses = append(clauses, db.Raw("name like ?", options.NamePrefix+"%")) + } + if !options.MinStartedAt.IsZero() { + clauses = append(clauses, db.Raw("startedat >= ?", options.MinStartedAt)) + } + if !options.MaxStartedAt.IsZero() { + clauses = append(clauses, db.Raw("startedat <= ?", options.MaxStartedAt)) + } + for _, r := range options.LabelRequirements { + q, err := requirementToCondition(t, r, tableName, labelTableName, false) + if err != nil { + return "", nil, err + } + clauses = append(clauses, q) + } + out = in + outArgs = inArgs + for _, c := range clauses { + if c == nil || c.Empty() { + continue + } + out += " and " + c.Raw() + outArgs = append(outArgs, c.Arguments()...) + } + if count { + return out, outArgs, nil + } + if options.StartedAtAscending { + out += " order by startedat asc" + } else { + out += " order by startedat desc" + } + + // If we were passed 0 as the limit, then we should load all available archived workflows + // to match the behavior of the `List` operations in the Kubernetes API + if options.Limit == 0 { + options.Limit = -1 + options.Offset = -1 + } + out += " limit ?" + outArgs = append(outArgs, options.Limit) + out += " offset ?" + outArgs = append(outArgs, options.Offset) + return out, outArgs, nil +} diff --git a/persist/sqldb/workflow_archive.go b/persist/sqldb/workflow_archive.go index fce2ff97b432..55d4800cfe89 100644 --- a/persist/sqldb/workflow_archive.go +++ b/persist/sqldb/workflow_archive.go @@ -9,7 +9,6 @@ import ( "github.com/upper/db/v4" "google.golang.org/grpc/codes" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" @@ -59,8 +58,8 @@ type archivedWorkflowCount struct { type WorkflowArchive interface { ArchiveWorkflow(wf *wfv1.Workflow) error // list workflows, with the most recently started workflows at the beginning (i.e. index 0 is the most recent) - ListWorkflows(namespace string, name string, namePrefix string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements, limit, offset int) (wfv1.Workflows, error) - CountWorkflows(namespace string, name string, namePrefix string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements) (int64, error) + ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error) + CountWorkflows(options sutils.ListOptions) (int64, error) GetWorkflow(uid string, namespace string, name string) (*wfv1.Workflow, error) DeleteWorkflow(uid string) error DeleteExpiredWorkflows(ttl time.Duration) error @@ -146,16 +145,9 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error { }) } -func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefix string, minStartedAt, maxStartedAt time.Time, labelRequirements labels.Requirements, limit int, offset int) (wfv1.Workflows, error) { +func (r *workflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error) { var archivedWfs []archivedWorkflowMetadata - // If we were passed 0 as the limit, then we should load all available archived workflows - // to match the behavior of the `List` operations in the Kubernetes API - if limit == 0 { - limit = -1 - offset = -1 - } - selectQuery, err := selectArchivedWorkflowQuery(r.dbType) if err != nil { return nil, err @@ -164,22 +156,14 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefi selector := r.session.SQL(). Select(selectQuery). From(archiveTableName). - Where(r.clusterManagedNamespaceAndInstanceID()). - And(namespaceEqual(namespace)). - And(nameEqual(name)). - And(namePrefixClause(namePrefix)). - And(startedAtFromClause(minStartedAt)). - And(startedAtToClause(maxStartedAt)) + Where(r.clusterManagedNamespaceAndInstanceID()) - selector, err = labelsClause(selector, r.dbType, labelRequirements) + selector, err = BuildArchivedWorkflowSelector(selector, archiveTableName, archiveLabelsTableName, r.dbType, options, false) if err != nil { return nil, err } - err = selector. - OrderBy("-startedat"). - Limit(limit). - Offset(offset). - All(&archivedWfs) + + err = selector.All(&archivedWfs) if err != nil { return nil, err } @@ -218,20 +202,15 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefi return wfs, nil } -func (r *workflowArchive) CountWorkflows(namespace string, name string, namePrefix string, minStartedAt, maxStartedAt time.Time, labelRequirements labels.Requirements) (int64, error) { +func (r *workflowArchive) CountWorkflows(options sutils.ListOptions) (int64, error) { total := &archivedWorkflowCount{} selector := r.session.SQL(). Select(db.Raw("count(*) as total")). From(archiveTableName). - Where(r.clusterManagedNamespaceAndInstanceID()). - And(namespaceEqual(namespace)). - And(nameEqual(name)). - And(namePrefixClause(namePrefix)). - And(startedAtFromClause(minStartedAt)). - And(startedAtToClause(maxStartedAt)) + Where(r.clusterManagedNamespaceAndInstanceID()) - selector, err := labelsClause(selector, r.dbType, labelRequirements) + selector, err := BuildArchivedWorkflowSelector(selector, archiveTableName, archiveLabelsTableName, r.dbType, options, true) if err != nil { return 0, err } @@ -253,40 +232,37 @@ func (r *workflowArchive) clusterManagedNamespaceAndInstanceID() *db.AndExpr { func startedAtFromClause(from time.Time) db.Cond { if !from.IsZero() { - return db.Cond{"startedat > ": from} + return db.Cond{"startedat >=": from} } return db.Cond{} } func startedAtToClause(to time.Time) db.Cond { if !to.IsZero() { - return db.Cond{"startedat < ": to} + return db.Cond{"startedat <=": to} } return db.Cond{} } func namespaceEqual(namespace string) db.Cond { - if namespace == "" { - return db.Cond{} - } else { + if namespace != "" { return db.Cond{"namespace": namespace} } + return db.Cond{} } func nameEqual(name string) db.Cond { - if name == "" { - return db.Cond{} - } else { + if name != "" { return db.Cond{"name": name} } + return db.Cond{} } func namePrefixClause(namePrefix string) db.Cond { - if namePrefix == "" { - return db.Cond{} - } else { - return db.Cond{"name LIKE ": namePrefix + "%"} + if namePrefix != "" { + return db.Cond{"name LIKE": namePrefix + "%"} } + return db.Cond{} } func (r *workflowArchive) GetWorkflow(uid string, namespace string, name string) (*wfv1.Workflow, error) { diff --git a/pkg/apiclient/argo-kube-client.go b/pkg/apiclient/argo-kube-client.go index b56deb251852..8f4f32c91d29 100644 --- a/pkg/apiclient/argo-kube-client.go +++ b/pkg/apiclient/argo-kube-client.go @@ -25,7 +25,7 @@ import ( cronworkflowserver "github.com/argoproj/argo-workflows/v3/server/cronworkflow" "github.com/argoproj/argo-workflows/v3/server/types" workflowserver "github.com/argoproj/argo-workflows/v3/server/workflow" - "github.com/argoproj/argo-workflows/v3/server/workflowarchive" + "github.com/argoproj/argo-workflows/v3/server/workflow/store" workflowtemplateserver "github.com/argoproj/argo-workflows/v3/server/workflowtemplate" "github.com/argoproj/argo-workflows/v3/util/help" "github.com/argoproj/argo-workflows/v3/util/instanceid" @@ -38,6 +38,8 @@ var ( type argoKubeClient struct { instanceIDService instanceid.Service + wfClient workflow.Interface + wfStore store.WorkflowStore } var _ Client = &argoKubeClient{} @@ -84,13 +86,16 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig, if err != nil { return nil, nil, err } - return ctx, &argoKubeClient{instanceIDService}, nil + wfStore, err := store.NewSQLiteStore(instanceIDService) + if err != nil { + return nil, nil, err + } + return ctx, &argoKubeClient{instanceIDService, wfClient, wfStore}, nil } func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient { wfArchive := sqldb.NullWorkflowArchive - wfaServer := workflowarchive.NewWorkflowArchiveServer(wfArchive, argoKubeOffloadNodeStatusRepo) - return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfaServer)}} + return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, a.wfStore)}} } func (a *argoKubeClient) NewCronWorkflowServiceClient() (cronworkflow.CronWorkflowServiceClient, error) { diff --git a/server/apiserver/argoserver.go b/server/apiserver/argoserver.go index f16a9b28280d..93104ad78f04 100644 --- a/server/apiserver/argoserver.go +++ b/server/apiserver/argoserver.go @@ -54,6 +54,7 @@ import ( "github.com/argoproj/argo-workflows/v3/server/static" "github.com/argoproj/argo-workflows/v3/server/types" "github.com/argoproj/argo-workflows/v3/server/workflow" + "github.com/argoproj/argo-workflows/v3/server/workflow/store" "github.com/argoproj/argo-workflows/v3/server/workflowarchive" "github.com/argoproj/argo-workflows/v3/server/workflowtemplate" grpcutil "github.com/argoproj/argo-workflows/v3/util/grpc" @@ -229,7 +230,13 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st artifactRepositories := artifactrepositories.New(as.clients.Kubernetes, as.managedNamespace, &config.ArtifactRepository) artifactServer := artifacts.NewArtifactServer(as.gatekeeper, hydrator.New(offloadRepo), wfArchive, instanceIDService, artifactRepositories) eventServer := event.NewController(instanceIDService, eventRecorderManager, as.eventQueueSize, as.eventWorkerCount, as.eventAsyncDispatch) - grpcServer := as.newGRPCServer(instanceIDService, offloadRepo, wfArchive, eventServer, config.Links, config.Columns, config.NavColor) + wfArchiveServer := workflowarchive.NewWorkflowArchiveServer(wfArchive, offloadRepo) + wfStore, err := store.NewSQLiteStore(instanceIDService) + if err != nil { + log.Fatal(err) + } + workflowServer := workflow.NewWorkflowServer(instanceIDService, offloadRepo, wfArchive, as.clients.Workflow, wfStore) + grpcServer := as.newGRPCServer(instanceIDService, workflowServer, wfArchiveServer, eventServer, config.Links, config.Columns, config.NavColor) httpServer := as.newHTTPServer(ctx, port, artifactServer) // Start listener @@ -259,6 +266,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st grpcL := tcpm.Match(cmux.Any()) go eventServer.Run(as.stopCh) + go workflowServer.Run(as.stopCh) go func() { as.checkServeErr("grpcServer", grpcServer.Serve(grpcL)) }() go func() { as.checkServeErr("httpServer", httpServer.Serve(httpL)) }() go func() { as.checkServeErr("tcpm", tcpm.Serve()) }() @@ -275,7 +283,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st <-as.stopCh } -func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, eventServer *event.Controller, links []*v1alpha1.Link, columns []*v1alpha1.Column, navColor string) *grpc.Server { +func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, workflowServer workflowpkg.WorkflowServiceServer, wfArchiveServer workflowarchivepkg.ArchivedWorkflowServiceServer, eventServer *event.Controller, links []*v1alpha1.Link, columns []*v1alpha1.Column, navColor string) *grpc.Server { serverLog := log.NewEntry(log.StandardLogger()) // "Prometheus histograms are a great way to measure latency distributions of your RPCs. However, since it is bad practice to have metrics of high cardinality the latency monitoring metrics are disabled by default. To enable them please call the following in your server initialization code:" @@ -307,12 +315,11 @@ func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, offloa } grpcServer := grpc.NewServer(sOpts...) - wfArchiveServer := workflowarchive.NewWorkflowArchiveServer(wfArchive, offloadNodeStatusRepo) infopkg.RegisterInfoServiceServer(grpcServer, info.NewInfoServer(as.managedNamespace, links, columns, navColor)) eventpkg.RegisterEventServiceServer(grpcServer, eventServer) eventsourcepkg.RegisterEventSourceServiceServer(grpcServer, eventsource.NewEventSourceServer()) sensorpkg.RegisterSensorServiceServer(grpcServer, sensor.NewSensorServer()) - workflowpkg.RegisterWorkflowServiceServer(grpcServer, workflow.NewWorkflowServer(instanceIDService, offloadNodeStatusRepo, wfArchiveServer)) + workflowpkg.RegisterWorkflowServiceServer(grpcServer, workflowServer) workflowtemplatepkg.RegisterWorkflowTemplateServiceServer(grpcServer, workflowtemplate.NewWorkflowTemplateServer(instanceIDService)) cronworkflowpkg.RegisterCronWorkflowServiceServer(grpcServer, cronworkflow.NewCronWorkflowServer(instanceIDService)) workflowarchivepkg.RegisterArchivedWorkflowServiceServer(grpcServer, wfArchiveServer) diff --git a/server/utils/list_options.go b/server/utils/list_options.go new file mode 100644 index 000000000000..697c09b43905 --- /dev/null +++ b/server/utils/list_options.go @@ -0,0 +1,136 @@ +package utils + +import ( + "fmt" + "strconv" + "strings" + "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" +) + +type ListOptions struct { + Namespace, Name, NamePrefix string + MinStartedAt, MaxStartedAt time.Time + LabelRequirements labels.Requirements + Limit, Offset int + ShowRemainingItemCount bool + StartedAtAscending bool +} + +func (l ListOptions) WithLimit(limit int) ListOptions { + l.Limit = limit + return l +} + +func (l ListOptions) WithOffset(offset int) ListOptions { + l.Offset = offset + return l +} + +func (l ListOptions) WithShowRemainingItemCount(showRemainingItemCount bool) ListOptions { + l.ShowRemainingItemCount = showRemainingItemCount + return l +} + +func (l ListOptions) WithMaxStartedAt(maxStartedAt time.Time) ListOptions { + l.MaxStartedAt = maxStartedAt + return l +} + +func (l ListOptions) WithMinStartedAt(minStartedAt time.Time) ListOptions { + l.MinStartedAt = minStartedAt + return l +} + +func (l ListOptions) WithStartedAtAscending(ascending bool) ListOptions { + l.StartedAtAscending = ascending + return l +} + +func BuildListOptions(options *metav1.ListOptions, ns, namePrefix string) (ListOptions, error) { + if options == nil { + options = &metav1.ListOptions{} + } + if options.Continue == "" { + options.Continue = "0" + } + limit := int(options.Limit) + + offset, err := strconv.Atoi(options.Continue) + if err != nil { + // no need to use sutils here + return ListOptions{}, status.Error(codes.InvalidArgument, "listOptions.continue must be int") + } + if offset < 0 { + // no need to use sutils here + return ListOptions{}, status.Error(codes.InvalidArgument, "listOptions.continue must >= 0") + } + + // namespace is now specified as its own query parameter + // note that for backward compatibility, the field selector 'metadata.namespace' is also supported for now + namespace := ns // optional + name := "" + minStartedAt := time.Time{} + maxStartedAt := time.Time{} + showRemainingItemCount := false + for _, selector := range strings.Split(options.FieldSelector, ",") { + if len(selector) == 0 { + continue + } + if strings.HasPrefix(selector, "metadata.namespace=") { + // for backward compatibility, the field selector 'metadata.namespace' is supported for now despite the addition + // of the new 'namespace' query parameter, which is what the UI uses + fieldSelectedNamespace := strings.TrimPrefix(selector, "metadata.namespace=") + switch namespace { + case "": + namespace = fieldSelectedNamespace + case fieldSelectedNamespace: + break + default: + return ListOptions{}, status.Errorf(codes.InvalidArgument, + "'namespace' query param (%q) and fieldselector 'metadata.namespace' (%q) are both specified and contradict each other", namespace, fieldSelectedNamespace) + } + } else if strings.HasPrefix(selector, "metadata.name=") { + name = strings.TrimPrefix(selector, "metadata.name=") + } else if strings.HasPrefix(selector, "spec.startedAt>") { + minStartedAt, err = time.Parse(time.RFC3339, strings.TrimPrefix(selector, "spec.startedAt>")) + if err != nil { + // startedAt is populated by us, it should therefore be valid. + return ListOptions{}, ToStatusError(err, codes.Internal) + } + } else if strings.HasPrefix(selector, "spec.startedAt<") { + maxStartedAt, err = time.Parse(time.RFC3339, strings.TrimPrefix(selector, "spec.startedAt<")) + if err != nil { + // no need to use sutils here + return ListOptions{}, ToStatusError(err, codes.Internal) + } + } else if strings.HasPrefix(selector, "ext.showRemainingItemCount") { + showRemainingItemCount, err = strconv.ParseBool(strings.TrimPrefix(selector, "ext.showRemainingItemCount=")) + if err != nil { + // populated by us, it should therefore be valid. + return ListOptions{}, ToStatusError(err, codes.Internal) + } + } else { + return ListOptions{}, ToStatusError(fmt.Errorf("unsupported requirement %s", selector), codes.InvalidArgument) + } + } + requirements, err := labels.ParseToRequirements(options.LabelSelector) + if err != nil { + return ListOptions{}, ToStatusError(err, codes.InvalidArgument) + } + return ListOptions{ + Namespace: namespace, + Name: name, + NamePrefix: namePrefix, + MinStartedAt: minStartedAt, + MaxStartedAt: maxStartedAt, + LabelRequirements: requirements, + Limit: limit, + Offset: offset, + ShowRemainingItemCount: showRemainingItemCount, + }, nil +} diff --git a/server/workflow/store/sqlite_store.go b/server/workflow/store/sqlite_store.go new file mode 100644 index 000000000000..8819219e55c1 --- /dev/null +++ b/server/workflow/store/sqlite_store.go @@ -0,0 +1,306 @@ +package store + +import ( + "encoding/json" + "fmt" + + log "github.com/sirupsen/logrus" + "k8s.io/client-go/tools/cache" + "zombiezen.com/go/sqlite" + "zombiezen.com/go/sqlite/sqlitex" + + "github.com/argoproj/argo-workflows/v3/persist/sqldb" + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + sutils "github.com/argoproj/argo-workflows/v3/server/utils" + "github.com/argoproj/argo-workflows/v3/util/instanceid" + "github.com/argoproj/argo-workflows/v3/workflow/common" +) + +const ( + workflowTableName = "argo_workflows" + workflowLabelsTableName = "argo_workflows_labels" + tableInitializationQuery = `create table if not exists argo_workflows ( + uid varchar(128) not null, + instanceid varchar(64), + name varchar(256), + namespace varchar(256), + phase varchar(25), + startedat timestamp, + finishedat timestamp, + workflow text, + primary key (uid) +); +create index if not exists idx_instanceid on argo_workflows (instanceid); +create table if not exists argo_workflows_labels ( + uid varchar(128) not null, + name varchar(317) not null, + value varchar(63) not null, + primary key (uid, name, value), + foreign key (uid) references argo_workflows (uid) on delete cascade +); +create index if not exists idx_name_value on argo_workflows_labels (name, value); +` + insertWorkflowQuery = `insert into argo_workflows (uid, instanceid, name, namespace, phase, startedat, finishedat, workflow) values (?, ?, ?, ?, ?, ?, ?, ?)` + insertWorkflowLabelQuery = `insert into argo_workflows_labels (uid, name, value) values (?, ?, ?)` + deleteWorkflowQuery = `delete from argo_workflows where uid = ?` +) + +func initDB() (*sqlite.Conn, error) { + conn, err := sqlite.OpenConn(":memory:", sqlite.OpenReadWrite) + if err != nil { + return nil, err + } + err = sqlitex.ExecuteTransient(conn, "pragma foreign_keys = on", nil) + if err != nil { + return nil, fmt.Errorf("failed to enable foreign key support: %w", err) + } + + err = sqlitex.ExecuteScript(conn, tableInitializationQuery, nil) + if err != nil { + return nil, err + } + return conn, nil +} + +type WorkflowStore interface { + cache.Store + ListWorkflows(options sutils.ListOptions) ([]wfv1.Workflow, error) + CountWorkflows(options sutils.ListOptions) (int64, error) +} + +// sqliteStore is a sqlite-based store. +type sqliteStore struct { + conn *sqlite.Conn + instanceService instanceid.Service +} + +var _ WorkflowStore = &sqliteStore{} + +func NewSQLiteStore(instanceService instanceid.Service) (WorkflowStore, error) { + conn, err := initDB() + if err != nil { + return nil, err + } + return &sqliteStore{conn: conn, instanceService: instanceService}, nil +} + +func (s *sqliteStore) ListWorkflows(options sutils.ListOptions) ([]wfv1.Workflow, error) { + query := `select workflow from argo_workflows +where instanceid = ? +` + args := []any{s.instanceService.InstanceID()} + + query, args, err := sqldb.BuildWorkflowSelector(query, args, workflowTableName, workflowLabelsTableName, sqldb.SQLite, options, false) + if err != nil { + return nil, err + } + + var workflows = wfv1.Workflows{} + err = sqlitex.Execute(s.conn, query, &sqlitex.ExecOptions{ + Args: args, + ResultFunc: func(stmt *sqlite.Stmt) error { + wf := stmt.ColumnText(0) + w := wfv1.Workflow{} + err := json.Unmarshal([]byte(wf), &w) + if err != nil { + log.WithFields(log.Fields{"workflow": wf}).Errorln("unable to unmarshal workflow from database") + } else { + workflows = append(workflows, w) + } + return nil + }, + }) + if err != nil { + return nil, err + } + + return workflows, nil +} + +func (s *sqliteStore) CountWorkflows(options sutils.ListOptions) (int64, error) { + query := `select count(*) as total from argo_workflows +where instanceid = ? +` + args := []any{s.instanceService.InstanceID()} + + options.Limit = 0 + options.Offset = 0 + query, args, err := sqldb.BuildWorkflowSelector(query, args, workflowTableName, workflowLabelsTableName, sqldb.SQLite, options, true) + if err != nil { + return 0, err + } + + var total int64 + err = sqlitex.Execute(s.conn, query, &sqlitex.ExecOptions{ + Args: args, + ResultFunc: func(stmt *sqlite.Stmt) error { + total = stmt.ColumnInt64(0) + return nil + }, + }) + if err != nil { + return 0, err + } + return total, nil +} + +func (s *sqliteStore) Add(obj interface{}) error { + wf, ok := obj.(*wfv1.Workflow) + if !ok { + return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) + } + done := sqlitex.Transaction(s.conn) + err := s.upsertWorkflow(wf) + defer done(&err) + return err +} + +func (s *sqliteStore) Update(obj interface{}) error { + wf, ok := obj.(*wfv1.Workflow) + if !ok { + return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) + } + done := sqlitex.Transaction(s.conn) + err := s.upsertWorkflow(wf) + defer done(&err) + return err +} + +func (s *sqliteStore) Delete(obj interface{}) error { + wf, ok := obj.(*wfv1.Workflow) + if !ok { + return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) + } + return sqlitex.Execute(s.conn, deleteWorkflowQuery, &sqlitex.ExecOptions{Args: []any{string(wf.UID)}}) +} + +func (s *sqliteStore) Replace(list []interface{}, resourceVersion string) error { + wfs := make([]*wfv1.Workflow, 0, len(list)) + for _, obj := range list { + wf, ok := obj.(*wfv1.Workflow) + if !ok { + return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) + } + wfs = append(wfs, wf) + } + done := sqlitex.Transaction(s.conn) + err := s.replaceWorkflows(wfs) + defer done(&err) + return err +} + +func (s *sqliteStore) Resync() error { + return nil +} + +func (s *sqliteStore) List() []interface{} { + panic("not implemented") +} + +func (s *sqliteStore) ListKeys() []string { + panic("not implemented") +} + +func (s *sqliteStore) Get(obj interface{}) (item interface{}, exists bool, err error) { + panic("not implemented") +} + +func (s *sqliteStore) GetByKey(key string) (item interface{}, exists bool, err error) { + panic("not implemented") +} + +func (s *sqliteStore) upsertWorkflow(wf *wfv1.Workflow) error { + err := sqlitex.Execute(s.conn, deleteWorkflowQuery, &sqlitex.ExecOptions{Args: []any{string(wf.UID)}}) + if err != nil { + return err + } + // if workflow is archived, we don't need to store it in the sqlite store, we get if from the archive store instead + if wf.GetLabels()[common.LabelKeyWorkflowArchivingStatus] == "Archived" { + return nil + } + workflow, err := json.Marshal(wf) + if err != nil { + return err + } + err = sqlitex.Execute(s.conn, insertWorkflowQuery, + &sqlitex.ExecOptions{ + Args: []any{string(wf.UID), s.instanceService.InstanceID(), wf.Name, wf.Namespace, wf.Status.Phase, wf.Status.StartedAt.Time, wf.Status.FinishedAt.Time, string(workflow)}, + }, + ) + if err != nil { + return err + } + stmt, err := s.conn.Prepare(insertWorkflowLabelQuery) + if err != nil { + return err + } + for key, value := range wf.GetLabels() { + if err = stmt.Reset(); err != nil { + return err + } + stmt.BindText(1, string(wf.UID)) + stmt.BindText(2, key) + stmt.BindText(3, value) + if _, err = stmt.Step(); err != nil { + return err + } + } + return nil +} + +func (s *sqliteStore) replaceWorkflows(workflows []*wfv1.Workflow) error { + err := sqlitex.Execute(s.conn, `delete from argo_workflows`, nil) + if err != nil { + return err + } + wfs := make([]*wfv1.Workflow, 0, len(workflows)) + for _, wf := range workflows { + // if workflow is archived, we don't need to store it in the sqlite store, we get if from the archive store instead + if wf.GetLabels()[common.LabelKeyWorkflowArchivingStatus] != "Archived" { + wfs = append(wfs, wf) + } + } + // add all workflows to argo_workflows table + stmt, err := s.conn.Prepare(insertWorkflowQuery) + if err != nil { + return err + } + for _, wf := range wfs { + if err = stmt.Reset(); err != nil { + return err + } + stmt.BindText(1, string(wf.UID)) + stmt.BindText(2, s.instanceService.InstanceID()) + stmt.BindText(3, wf.Name) + stmt.BindText(4, wf.Namespace) + stmt.BindText(5, string(wf.Status.Phase)) + stmt.BindText(6, wf.Status.StartedAt.String()) + stmt.BindText(7, wf.Status.FinishedAt.String()) + workflow, err := json.Marshal(wf) + if err != nil { + return err + } + stmt.BindText(8, string(workflow)) + if _, err = stmt.Step(); err != nil { + return err + } + } + stmt, err = s.conn.Prepare(insertWorkflowLabelQuery) + if err != nil { + return err + } + for _, wf := range wfs { + for key, val := range wf.GetLabels() { + if err = stmt.Reset(); err != nil { + return err + } + stmt.BindText(1, string(wf.UID)) + stmt.BindText(2, key) + stmt.BindText(3, val) + if _, err = stmt.Step(); err != nil { + return err + } + } + } + return nil +} diff --git a/server/workflow/store/sqlite_store_test.go b/server/workflow/store/sqlite_store_test.go new file mode 100644 index 000000000000..11e6d731152c --- /dev/null +++ b/server/workflow/store/sqlite_store_test.go @@ -0,0 +1,155 @@ +package store + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "zombiezen.com/go/sqlite" + "zombiezen.com/go/sqlite/sqlitex" + + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + sutils "github.com/argoproj/argo-workflows/v3/server/utils" + "github.com/argoproj/argo-workflows/v3/util/instanceid" +) + +func TestInitDB(t *testing.T) { + conn, err := initDB() + assert.NoError(t, err) + defer conn.Close() + t.Run("TestTablesCreated", func(t *testing.T) { + err = sqlitex.Execute(conn, `select name from sqlite_master where type='table'`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + name := stmt.ColumnText(0) + assert.Contains(t, []string{workflowTableName, workflowLabelsTableName}, name) + return nil + }, + }) + require.NoError(t, err) + }) + t.Run("TestForeignKeysEnabled", func(t *testing.T) { + err = sqlitex.Execute(conn, `pragma foreign_keys`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + assert.Equal(t, "1", stmt.ColumnText(0)) + return nil + }, + }) + require.NoError(t, err) + }) + t.Run("TestIndexesCreated", func(t *testing.T) { + var indexes []string + err = sqlitex.Execute(conn, `select name from sqlite_master where type='index'`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + name := stmt.ColumnText(0) + indexes = append(indexes, name) + return nil + }, + }) + require.NoError(t, err) + assert.Contains(t, indexes, "idx_instanceid") + assert.Contains(t, indexes, "idx_name_value") + }) + t.Run("TestForeignKeysAdded", func(t *testing.T) { + err = sqlitex.Execute(conn, `pragma foreign_key_list('argo_workflows_labels')`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + assert.Equal(t, "argo_workflows", stmt.ColumnText(2)) + assert.Equal(t, "uid", stmt.ColumnText(3)) + assert.Equal(t, "uid", stmt.ColumnText(4)) + assert.Equal(t, "CASCADE", stmt.ColumnText(6)) + return nil + }, + }) + require.NoError(t, err) + }) +} + +func TestStoreOperation(t *testing.T) { + instanceIdSvc := instanceid.NewService("my-instanceid") + conn, err := initDB() + require.NoError(t, err) + store := sqliteStore{ + conn: conn, + instanceService: instanceIdSvc, + } + t.Run("TestAddWorkflow", func(t *testing.T) { + for i := 0; i < 10; i++ { + require.NoError(t, store.Add(generateWorkflow(i))) + } + num, err := store.CountWorkflows(sutils.ListOptions{Namespace: "argo"}) + require.NoError(t, err) + assert.Equal(t, int64(10), num) + // Labels are also added + require.NoError(t, sqlitex.Execute(conn, `select count(*) from argo_workflows_labels`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + assert.Equal(t, 10*4, stmt.ColumnInt(0)) + return nil + }, + })) + }) + t.Run("TestUpdateWorkflow", func(t *testing.T) { + wf := generateWorkflow(0) + wf.Labels["test-label-2"] = "value-2" + require.NoError(t, store.Update(wf)) + // workflow is updated + require.NoError(t, sqlitex.Execute(conn, `select workflow from argo_workflows where uid = 'uid-0'`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + w := stmt.ColumnText(0) + require.NoError(t, json.Unmarshal([]byte(w), &wf)) + assert.Len(t, wf.Labels, 5) + return nil + }, + })) + require.NoError(t, sqlitex.Execute(conn, `select count(*) from argo_workflows_labels where name = 'test-label-2' and value = 'value-2'`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + assert.Equal(t, 1, stmt.ColumnInt(0)) + return nil + }, + })) + }) + t.Run("TestDeleteWorkflow", func(t *testing.T) { + wf := generateWorkflow(0) + require.NoError(t, store.Delete(wf)) + // workflow is deleted + require.NoError(t, sqlitex.Execute(conn, `select count(*) from argo_workflows where uid = 'uid-0'`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + assert.Equal(t, 0, stmt.ColumnInt(0)) + return nil + }, + })) + // labels are also deleted + require.NoError(t, sqlitex.Execute(conn, `select count(*) from argo_workflows_labels where uid = 'uid-0'`, &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + assert.Equal(t, 0, stmt.ColumnInt(0)) + return nil + }, + })) + }) + t.Run("TestListWorkflows", func(t *testing.T) { + wfList, err := store.ListWorkflows(sutils.ListOptions{Namespace: "argo", Limit: 5}) + require.NoError(t, err) + assert.Len(t, wfList, 5) + }) + t.Run("TestCountWorkflows", func(t *testing.T) { + num, err := store.CountWorkflows(sutils.ListOptions{Namespace: "argo"}) + require.NoError(t, err) + assert.Equal(t, int64(9), num) + }) +} + +func generateWorkflow(uid int) *wfv1.Workflow { + return &wfv1.Workflow{ObjectMeta: metav1.ObjectMeta{ + UID: types.UID(fmt.Sprintf("uid-%d", uid)), + Name: fmt.Sprintf("workflow-%d", uid), + Namespace: "argo", + Labels: map[string]string{ + "workflows.argoproj.io/completed": "true", + "workflows.argoproj.io/phase": "Succeeded", + "workflows.argoproj.io/controller-instanceid": "my-instanceid", + "test-label": fmt.Sprintf("label-%d", uid), + }, + }} +} diff --git a/server/workflow/workflow_server.go b/server/workflow/workflow_server.go index 16fcc3b8c7d3..40f28e9771ee 100644 --- a/server/workflow/workflow_server.go +++ b/server/workflow/workflow_server.go @@ -6,24 +6,29 @@ import ( "fmt" "io" "sort" + "time" log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" "github.com/argoproj/argo-workflows/v3/errors" "github.com/argoproj/argo-workflows/v3/persist/sqldb" workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow" - workflowarchivepkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflowarchive" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned" "github.com/argoproj/argo-workflows/v3/server/auth" sutils "github.com/argoproj/argo-workflows/v3/server/utils" + "github.com/argoproj/argo-workflows/v3/server/workflow/store" argoutil "github.com/argoproj/argo-workflows/v3/util" "github.com/argoproj/argo-workflows/v3/util/fields" "github.com/argoproj/argo-workflows/v3/util/instanceid" @@ -36,18 +41,39 @@ import ( "github.com/argoproj/argo-workflows/v3/workflow/validate" ) +const ( + latestAlias = "@latest" + reSyncDuration = 20 * time.Minute +) + type workflowServer struct { instanceIDService instanceid.Service offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo hydrator hydrator.Interface - wfArchiveServer workflowarchivepkg.ArchivedWorkflowServiceServer + wfArchive sqldb.WorkflowArchive + wfReflector *cache.Reflector + wfStore store.WorkflowStore } -const latestAlias = "@latest" +var _ workflowpkg.WorkflowServiceServer = &workflowServer{} + +// NewWorkflowServer returns a new WorkflowServer +func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, wfClientSet versioned.Interface, wfStore store.WorkflowStore) *workflowServer { + ctx := context.Background() + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return wfClientSet.ArgoprojV1alpha1().Workflows(metav1.NamespaceAll).List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return wfClientSet.ArgoprojV1alpha1().Workflows(metav1.NamespaceAll).Watch(ctx, options) + }, + } + wfReflector := cache.NewReflector(lw, &wfv1.Workflow{}, wfStore, reSyncDuration) + return &workflowServer{instanceIDService, offloadNodeStatusRepo, hydrator.New(offloadNodeStatusRepo), wfArchive, wfReflector, wfStore} +} -// NewWorkflowServer returns a new workflowServer -func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchiveServer workflowarchivepkg.ArchivedWorkflowServiceServer) workflowpkg.WorkflowServiceServer { - return &workflowServer{instanceIDService, offloadNodeStatusRepo, hydrator.New(offloadNodeStatusRepo), wfArchiveServer} +func (s *workflowServer) Run(stopCh <-chan struct{}) { + s.wfReflector.Run(stopCh) } func (s *workflowServer) CreateWorkflow(ctx context.Context, req *workflowpkg.WorkflowCreateRequest) (*wfv1.Workflow, error) { @@ -128,65 +154,72 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf return wf, nil } -func mergeWithArchivedWorkflows(liveWfs wfv1.WorkflowList, archivedWfs wfv1.WorkflowList, numWfsToKeep int) *wfv1.WorkflowList { - var mergedWfs []wfv1.Workflow - var uidToWfs = map[types.UID][]wfv1.Workflow{} - for _, item := range liveWfs.Items { - uidToWfs[item.UID] = append(uidToWfs[item.UID], item) - } - for _, item := range archivedWfs.Items { - uidToWfs[item.UID] = append(uidToWfs[item.UID], item) - } - - for _, v := range uidToWfs { - // The archived workflow we saved in the database have "Persisted" as the archival status. - // Prioritize 'Archived' over 'Persisted' because 'Archived' means the workflow is in the cluster - if len(v) == 1 { - mergedWfs = append(mergedWfs, v[0]) - } else { - if ok := v[0].Labels[common.LabelKeyWorkflowArchivingStatus] == "Archived"; ok { - mergedWfs = append(mergedWfs, v[0]) - } else { - mergedWfs = append(mergedWfs, v[1]) - } - } - } - mergedWfsList := wfv1.WorkflowList{Items: mergedWfs, ListMeta: liveWfs.ListMeta} - sort.Sort(mergedWfsList.Items) - numWfs := 0 - var finalWfs []wfv1.Workflow - for _, item := range mergedWfsList.Items { - if numWfsToKeep == 0 || numWfs < numWfsToKeep { - finalWfs = append(finalWfs, item) - numWfs += 1 - } - } - return &wfv1.WorkflowList{Items: finalWfs, ListMeta: liveWfs.ListMeta} -} - func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*wfv1.WorkflowList, error) { - wfClient := auth.GetWfClient(ctx) - listOption := &metav1.ListOptions{} if req.ListOptions != nil { listOption = req.ListOptions } s.instanceIDService.With(listOption) - wfList, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).List(ctx, *listOption) + + options, err := sutils.BuildListOptions(req.ListOptions, req.Namespace, "") + if err != nil { + return nil, err + } + // verify if we have permission to list Workflows + allowed, err := auth.CanI(ctx, "list", workflow.WorkflowPlural, options.Namespace, "") if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } - archivedWfList, err := s.wfArchiveServer.ListArchivedWorkflows(ctx, &workflowarchivepkg.ListArchivedWorkflowsRequest{ - ListOptions: listOption, - NamePrefix: "", - Namespace: req.Namespace, - }) + if !allowed { + return nil, status.Error(codes.PermissionDenied, fmt.Sprintf("Permission denied, you are not allowed to list workflows in namespace \"%s\". Maybe you want to specify a namespace with query parameter `.namespace=%s`?", options.Namespace, options.Namespace)) + } + + var wfs wfv1.Workflows + liveWfCount, err := s.wfStore.CountWorkflows(options) + if err != nil { + return nil, sutils.ToStatusError(err, codes.Internal) + } + archivedCount, err := s.wfArchive.CountWorkflows(options) if err != nil { - log.Warnf("unable to list archived workflows:%v", err) - } else { - if archivedWfList != nil { - wfList = mergeWithArchivedWorkflows(*wfList, *archivedWfList, int(listOption.Limit)) + return nil, sutils.ToStatusError(err, codes.Internal) + } + totalCount := liveWfCount + archivedCount + + // first fetch live workflows + var liveWfList []wfv1.Workflow + if liveWfCount > 0 && (options.Limit == 0 || options.Offset < int(liveWfCount)) { + liveWfList, err = s.wfStore.ListWorkflows(options) + if err != nil { + return nil, sutils.ToStatusError(err, codes.Internal) } + wfs = append(wfs, liveWfList...) + } + + // then fetch archived workflows + if options.Limit == 0 || + int64(options.Offset+options.Limit) > liveWfCount { + archivedOffset := options.Offset - int(liveWfCount) + archivedLimit := options.Limit + if archivedOffset < 0 { + archivedOffset = 0 + archivedLimit = options.Limit - len(liveWfList) + } + archivedWfList, err := s.wfArchive.ListWorkflows(options.WithLimit(archivedLimit).WithOffset(archivedOffset)) + if err != nil { + return nil, sutils.ToStatusError(err, codes.Internal) + } + wfs = append(wfs, archivedWfList...) + } + meta := metav1.ListMeta{ResourceVersion: s.wfReflector.LastSyncResourceVersion()} + remainCount := totalCount - int64(options.Offset) - int64(len(wfs)) + if remainCount < 0 { + remainCount = 0 + } + if remainCount > 0 { + meta.Continue = fmt.Sprintf("%v", options.Offset+len(wfs)) + } + if options.ShowRemainingItemCount { + meta.RemainingItemCount = &remainCount } cleaner := fields.NewCleaner(req.Fields) @@ -195,10 +228,10 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } - for i, wf := range wfList.Items { + for i, wf := range wfs { if wf.Status.IsOffloadNodeStatus() { if s.offloadNodeStatusRepo.IsEnabled() { - wfList.Items[i].Status.Nodes = offloadedNodes[sqldb.UUIDVersion{UID: string(wf.UID), Version: wf.GetOffloadNodeStatusVersion()}] + wfs[i].Status.Nodes = offloadedNodes[sqldb.UUIDVersion{UID: string(wf.UID), Version: wf.GetOffloadNodeStatusVersion()}] } else { log.WithFields(log.Fields{"namespace": wf.Namespace, "name": wf.Name}).Warn(sqldb.OffloadNodeStatusDisabled) } @@ -207,9 +240,9 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor } // we make no promises about the overall list sorting, we just sort each page - sort.Sort(wfList.Items) + sort.Sort(wfs) - res := &wfv1.WorkflowList{ListMeta: metav1.ListMeta{Continue: wfList.Continue, ResourceVersion: wfList.ResourceVersion}, Items: wfList.Items} + res := &wfv1.WorkflowList{ListMeta: meta, Items: wfs} newRes := &wfv1.WorkflowList{} if ok, err := cleaner.Clean(res, &newRes); err != nil { return nil, sutils.ToStatusError(fmt.Errorf("unable to CleanFields in request: %w", err), codes.Internal) @@ -642,15 +675,15 @@ func (s *workflowServer) getWorkflow(ctx context.Context, wfClient versioned.Int var err error wf, origErr := wfClient.ArgoprojV1alpha1().Workflows(namespace).Get(ctx, name, options) if wf == nil || origErr != nil { - wf, err = s.wfArchiveServer.GetArchivedWorkflow(ctx, &workflowarchivepkg.GetArchivedWorkflowRequest{ - Namespace: namespace, - Name: name, - }) + wf, err = s.wfArchive.GetWorkflow("", namespace, name) if err != nil { log.Errorf("failed to get live workflow: %v; failed to get archived workflow: %v", origErr, err) // We only return the original error to preserve the original status code. return nil, sutils.ToStatusError(origErr, codes.Internal) } + if wf == nil { + return nil, status.Error(codes.NotFound, "not found") + } } return wf, nil } diff --git a/server/workflow/workflow_server_test.go b/server/workflow/workflow_server_test.go index e91e672ba1cb..0150bdf70f43 100644 --- a/server/workflow/workflow_server_test.go +++ b/server/workflow/workflow_server_test.go @@ -5,11 +5,11 @@ import ( "fmt" "testing" - "time" "github.com/go-jose/go-jose/v3/jwt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + authorizationv1 "k8s.io/api/authorization/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -25,7 +25,8 @@ import ( v1alpha "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake" "github.com/argoproj/argo-workflows/v3/server/auth" "github.com/argoproj/argo-workflows/v3/server/auth/types" - "github.com/argoproj/argo-workflows/v3/server/workflowarchive" + sutils "github.com/argoproj/argo-workflows/v3/server/utils" + "github.com/argoproj/argo-workflows/v3/server/workflow/store" "github.com/argoproj/argo-workflows/v3/util" "github.com/argoproj/argo-workflows/v3/util/instanceid" "github.com/argoproj/argo-workflows/v3/workflow/common" @@ -136,7 +137,7 @@ const wf2 = ` "namespace": "workflows", "resourceVersion": "52919656", "selfLink": "/apis/argoproj.io/v1alpha1/namespaces/workflows/workflows/hello-world-b6h5m", - "uid": "91066a6c-1ddc-11ea-b443-42010aa80075" + "uid": "91066a6c-1ddc-11ea-b443-42010aa80074" }, "spec": { @@ -199,7 +200,7 @@ const wf3 = ` "namespace": "test", "resourceVersion": "53020772", "selfLink": "/apis/argoproj.io/v1alpha1/namespaces/workflows/workflows/hello-world-9tql2", - "uid": "6522aff1-1e01-11ea-b443-42010aa80075" + "uid": "6522aff1-1e01-11ea-b443-42010aa80074" }, "spec": { @@ -325,7 +326,7 @@ const wf5 = ` "namespace": "workflows", "resourceVersion": "53020772", "selfLink": "/apis/argoproj.io/v1alpha1/namespaces/workflows/workflows/hello-world-9tql2", - "uid": "6522aff1-1e01-11ea-b443-42010aa80075" + "uid": "6522aff1-1e01-11ea-b443-42010aa80073" }, "spec": { @@ -574,7 +575,6 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) { v1alpha1.MustUnmarshal(unlabelled, &unlabelledObj) v1alpha1.MustUnmarshal(wf1, &wfObj1) - v1alpha1.MustUnmarshal(wf1, &wfObj1) v1alpha1.MustUnmarshal(wf2, &wfObj2) v1alpha1.MustUnmarshal(wf3, &wfObj3) v1alpha1.MustUnmarshal(wf4, &wfObj4) @@ -590,7 +590,6 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) { archivedRepo := &mocks.WorkflowArchive{} - wfaServer := workflowarchive.NewWorkflowArchiveServer(archivedRepo, offloadNodeStatusRepo) archivedRepo.On("GetWorkflow", "", "test", "hello-world-9tql2-test").Return(&v1alpha1.Workflow{ ObjectMeta: metav1.ObjectMeta{Name: "hello-world-9tql2-test", Namespace: "test"}, Spec: v1alpha1.WorkflowSpec{ @@ -604,11 +603,37 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) { archivedRepo.On("GetWorkflow", "", "test", "unlabelled").Return(nil, nil) archivedRepo.On("GetWorkflow", "", "workflows", "latest").Return(nil, nil) archivedRepo.On("GetWorkflow", "", "workflows", "hello-world-9tql2-not").Return(nil, nil) - server := NewWorkflowServer(instanceid.NewService("my-instanceid"), offloadNodeStatusRepo, wfaServer) + archivedRepo.On("CountWorkflows", sutils.ListOptions{Namespace: "workflows"}).Return(int64(2), nil) + archivedRepo.On("ListWorkflows", sutils.ListOptions{Namespace: "workflows", Limit: -2}).Return(v1alpha1.Workflows{wfObj2, failedWfObj}, nil) + archivedRepo.On("CountWorkflows", sutils.ListOptions{Namespace: "test"}).Return(int64(1), nil) + archivedRepo.On("ListWorkflows", sutils.ListOptions{Namespace: "test", Limit: -1}).Return(v1alpha1.Workflows{wfObj4}, nil) + kubeClientSet := fake.NewSimpleClientset() + kubeClientSet.PrependReactor("create", "selfsubjectaccessreviews", func(action ktesting.Action) (handled bool, ret runtime.Object, err error) { + return true, &authorizationv1.SelfSubjectAccessReview{ + Status: authorizationv1.SubjectAccessReviewStatus{Allowed: true}, + }, nil + }) wfClientset := v1alpha.NewSimpleClientset(&unlabelledObj, &wfObj1, &wfObj2, &wfObj3, &wfObj4, &wfObj5, &failedWfObj, &wftmpl, &cronwfObj, &cwfTmpl) wfClientset.PrependReactor("create", "workflows", generateNameReactor) ctx := context.WithValue(context.WithValue(context.WithValue(context.TODO(), auth.WfKey, wfClientset), auth.KubeKey, kubeClientSet), auth.ClaimsKey, &types.Claims{Claims: jwt.Claims{Subject: "my-sub"}}) + listOptions := &metav1.ListOptions{} + instanceIdSvc := instanceid.NewService("my-instanceid") + instanceIdSvc.With(listOptions) + wfStore, err := store.NewSQLiteStore(instanceIdSvc) + if err != nil { + panic(err) + } + if err = wfStore.Add(&wfObj1); err != nil { + panic(err) + } + if err = wfStore.Add(&wfObj3); err != nil { + panic(err) + } + if err = wfStore.Add(&wfObj5); err != nil { + panic(err) + } + server := NewWorkflowServer(instanceIdSvc, offloadNodeStatusRepo, archivedRepo, wfClientset, wfStore) return server, ctx } @@ -650,26 +675,6 @@ func (t testWatchWorkflowServer) Send(*workflowpkg.WorkflowWatchEvent) error { panic("implement me") } -func TestMergeWithArchivedWorkflows(t *testing.T) { - timeNow := time.Now() - wf1Live := v1alpha1.Workflow{ - ObjectMeta: metav1.ObjectMeta{UID: "1", CreationTimestamp: metav1.Time{Time: timeNow.Add(time.Second)}, - Labels: map[string]string{common.LabelKeyWorkflowArchivingStatus: "Archived"}}} - wf1Archived := v1alpha1.Workflow{ - ObjectMeta: metav1.ObjectMeta{UID: "1", CreationTimestamp: metav1.Time{Time: timeNow.Add(time.Second)}, - Labels: map[string]string{common.LabelKeyWorkflowArchivingStatus: "Persisted"}}} - wf2 := v1alpha1.Workflow{ - ObjectMeta: metav1.ObjectMeta{UID: "2", CreationTimestamp: metav1.Time{Time: timeNow.Add(2 * time.Second)}}} - wf3 := v1alpha1.Workflow{ - ObjectMeta: metav1.ObjectMeta{UID: "3", CreationTimestamp: metav1.Time{Time: timeNow.Add(3 * time.Second)}}} - liveWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Live, wf2}} - archivedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Archived, wf3, wf2}} - expectedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2, wf1Live}} - expectedShortWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2}} - assert.Equal(t, expectedWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 0).Items) - assert.Equal(t, expectedShortWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 2).Items) -} - func TestWatchWorkflows(t *testing.T) { server, ctx := getWorkflowServer() wf := &v1alpha1.Workflow{ diff --git a/server/workflowarchive/archived_workflow_server.go b/server/workflowarchive/archived_workflow_server.go index e1a70e3bed7b..2103e7592594 100644 --- a/server/workflowarchive/archived_workflow_server.go +++ b/server/workflowarchive/archived_workflow_server.go @@ -6,9 +6,6 @@ import ( "os" "regexp" "sort" - "strconv" - "strings" - "time" log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" @@ -43,108 +40,42 @@ func NewWorkflowArchiveServer(wfArchive sqldb.WorkflowArchive, offloadNodeStatus } func (w *archivedWorkflowServer) ListArchivedWorkflows(ctx context.Context, req *workflowarchivepkg.ListArchivedWorkflowsRequest) (*wfv1.WorkflowList, error) { - options := req.ListOptions - namePrefix := req.NamePrefix - if options == nil { - options = &metav1.ListOptions{} - } - if options.Continue == "" { - options.Continue = "0" - } - limit := int(options.Limit) - offset, err := strconv.Atoi(options.Continue) - if err != nil { - // no need to use sutils here - return nil, status.Error(codes.InvalidArgument, "listOptions.continue must be int") - } - if offset < 0 { - // no need to use sutils here - return nil, status.Error(codes.InvalidArgument, "listOptions.continue must >= 0") - } - - // namespace is now specified as its own query parameter - // note that for backward compatibility, the field selector 'metadata.namespace' is also supported for now - namespace := req.Namespace // optional - name := "" - minStartedAt := time.Time{} - maxStartedAt := time.Time{} - showRemainingItemCount := false - for _, selector := range strings.Split(options.FieldSelector, ",") { - if len(selector) == 0 { - continue - } - if strings.HasPrefix(selector, "metadata.namespace=") { - // for backward compatibility, the field selector 'metadata.namespace' is supported for now despite the addition - // of the new 'namespace' query parameter, which is what the UI uses - fieldSelectedNamespace := strings.TrimPrefix(selector, "metadata.namespace=") - switch namespace { - case "": - namespace = fieldSelectedNamespace - case fieldSelectedNamespace: - break - default: - return nil, status.Errorf(codes.InvalidArgument, - "'namespace' query param (%q) and fieldselector 'metadata.namespace' (%q) are both specified and contradict each other", namespace, fieldSelectedNamespace) - } - } else if strings.HasPrefix(selector, "metadata.name=") { - name = strings.TrimPrefix(selector, "metadata.name=") - } else if strings.HasPrefix(selector, "spec.startedAt>") { - minStartedAt, err = time.Parse(time.RFC3339, strings.TrimPrefix(selector, "spec.startedAt>")) - if err != nil { - // startedAt is populated by us, it should therefore be valid. - return nil, sutils.ToStatusError(err, codes.Internal) - } - } else if strings.HasPrefix(selector, "spec.startedAt<") { - maxStartedAt, err = time.Parse(time.RFC3339, strings.TrimPrefix(selector, "spec.startedAt<")) - if err != nil { - // no need to use sutils here - return nil, sutils.ToStatusError(err, codes.Internal) - } - } else if strings.HasPrefix(selector, "ext.showRemainingItemCount") { - showRemainingItemCount, err = strconv.ParseBool(strings.TrimPrefix(selector, "ext.showRemainingItemCount=")) - if err != nil { - // populated by us, it should therefore be valid. - return nil, sutils.ToStatusError(err, codes.Internal) - } - } else { - return nil, sutils.ToStatusError(fmt.Errorf("unsupported requirement %s", selector), codes.InvalidArgument) - } - } - requirements, err := labels.ParseToRequirements(options.LabelSelector) + options, err := sutils.BuildListOptions(req.ListOptions, req.Namespace, req.NamePrefix) if err != nil { - return nil, sutils.ToStatusError(err, codes.InvalidArgument) + return nil, err } // verify if we have permission to list Workflows - allowed, err := auth.CanI(ctx, "list", workflow.WorkflowPlural, namespace, "") + allowed, err := auth.CanI(ctx, "list", workflow.WorkflowPlural, options.Namespace, "") if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } if !allowed { - return nil, status.Error(codes.PermissionDenied, fmt.Sprintf("Permission denied, you are not allowed to list workflows in namespace \"%s\". Maybe you want to specify a namespace with query parameter `.namespace=%s`?", namespace, namespace)) + return nil, status.Error(codes.PermissionDenied, fmt.Sprintf("Permission denied, you are not allowed to list workflows in namespace \"%s\". Maybe you want to specify a namespace with query parameter `.namespace=%s`?", options.Namespace, options.Namespace)) } + limit := options.Limit + offset := options.Offset // When the zero value is passed, we should treat this as returning all results // to align ourselves with the behavior of the `List` endpoints in the Kubernetes API loadAll := limit == 0 - limitWithMore := 0 if !loadAll { // Attempt to load 1 more record than we actually need as an easy way to determine whether or not more // records exist than we're currently requesting - limitWithMore = limit + 1 + options.Limit += 1 } - items, err := w.wfArchive.ListWorkflows(namespace, name, namePrefix, minStartedAt, maxStartedAt, requirements, limitWithMore, offset) + items, err := w.wfArchive.ListWorkflows(options) if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } meta := metav1.ListMeta{} - if showRemainingItemCount && !loadAll { - total, err := w.wfArchive.CountWorkflows(namespace, name, namePrefix, minStartedAt, maxStartedAt, requirements) + if options.ShowRemainingItemCount && !loadAll { + total, err := w.wfArchive.CountWorkflows(options) if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } diff --git a/server/workflowarchive/archived_workflow_server_test.go b/server/workflowarchive/archived_workflow_server_test.go index 50d68d27db7b..7f26bbbb20eb 100644 --- a/server/workflowarchive/archived_workflow_server_test.go +++ b/server/workflowarchive/archived_workflow_server_test.go @@ -13,7 +13,6 @@ import ( authorizationv1 "k8s.io/api/authorization/v1" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" kubefake "k8s.io/client-go/kubernetes/fake" k8stesting "k8s.io/client-go/testing" @@ -25,6 +24,7 @@ import ( wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" argofake "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake" "github.com/argoproj/argo-workflows/v3/server/auth" + sutils "github.com/argoproj/argo-workflows/v3/server/utils" "github.com/argoproj/argo-workflows/v3/workflow/common" ) @@ -54,18 +54,20 @@ func Test_archivedWorkflowServer(t *testing.T) { }, nil }) // two pages of results for limit 1 - repo.On("ListWorkflows", "", "", "", time.Time{}, time.Time{}, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}, {}}, nil) - repo.On("ListWorkflows", "", "", "", time.Time{}, time.Time{}, labels.Requirements(nil), 2, 1).Return(wfv1.Workflows{{}}, nil) + repo.On("ListWorkflows", sutils.ListOptions{Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}, {}}, nil) + repo.On("ListWorkflows", sutils.ListOptions{Limit: 2, Offset: 1}).Return(wfv1.Workflows{{}}, nil) minStartAt, _ := time.Parse(time.RFC3339, "2020-01-01T00:00:00Z") maxStartAt, _ := time.Parse(time.RFC3339, "2020-01-02T00:00:00Z") createdTime := metav1.Time{Time: time.Now().UTC()} finishedTime := metav1.Time{Time: createdTime.Add(time.Second * 2)} - repo.On("ListWorkflows", "", "", "", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil) - repo.On("ListWorkflows", "", "my-name", "", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil) - repo.On("ListWorkflows", "", "", "my-", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil) - repo.On("ListWorkflows", "", "my-name", "my-", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil) - repo.On("ListWorkflows", "user-ns", "", "", time.Time{}, time.Time{}, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}, {}}, nil) - repo.On("CountWorkflows", "", "my-name", "my-", minStartAt, maxStartAt, labels.Requirements(nil)).Return(int64(5), nil) + repo.On("ListWorkflows", sutils.ListOptions{Namespace: "", Name: "", NamePrefix: "", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}}, nil) + repo.On("ListWorkflows", sutils.ListOptions{Namespace: "", Name: "my-name", NamePrefix: "", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}}, nil) + repo.On("ListWorkflows", sutils.ListOptions{Namespace: "", Name: "", NamePrefix: "my-", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}}, nil) + repo.On("ListWorkflows", sutils.ListOptions{Namespace: "", Name: "my-name", NamePrefix: "my-", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}}, nil) + repo.On("ListWorkflows", sutils.ListOptions{Namespace: "", Name: "my-name", NamePrefix: "my-", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0, ShowRemainingItemCount: true}).Return(wfv1.Workflows{{}}, nil) + repo.On("ListWorkflows", sutils.ListOptions{Namespace: "user-ns", Name: "", NamePrefix: "", MinStartedAt: time.Time{}, MaxStartedAt: time.Time{}, Limit: 2, Offset: 0}).Return(wfv1.Workflows{{}, {}}, nil) + repo.On("CountWorkflows", sutils.ListOptions{Namespace: "", Name: "my-name", NamePrefix: "my-", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0}).Return(int64(5), nil) + repo.On("CountWorkflows", sutils.ListOptions{Namespace: "", Name: "my-name", NamePrefix: "my-", MinStartedAt: minStartAt, MaxStartedAt: maxStartAt, Limit: 2, Offset: 0, ShowRemainingItemCount: true}).Return(int64(5), nil) repo.On("GetWorkflow", "", "", "").Return(nil, nil) repo.On("GetWorkflow", "my-uid", "", "").Return(&wfv1.Workflow{ ObjectMeta: metav1.ObjectMeta{Name: "my-name"}, diff --git a/test/e2e/fixtures/e2e_suite.go b/test/e2e/fixtures/e2e_suite.go index f8b06aefa01b..6b415cbc408c 100644 --- a/test/e2e/fixtures/e2e_suite.go +++ b/test/e2e/fixtures/e2e_suite.go @@ -7,6 +7,7 @@ import ( "os" "time" + "github.com/argoproj/argo-workflows/v3/server/utils" "github.com/argoproj/argo-workflows/v3/util/secrets" "github.com/TwiN/go-color" @@ -155,7 +156,10 @@ func (s *E2ESuite) DeleteResources() { archive := s.Persistence.workflowArchive parse, err := labels.ParseToRequirements(Label) s.CheckError(err) - workflows, err := archive.ListWorkflows(Namespace, "", "", time.Time{}, time.Time{}, parse, 0, 0) + workflows, err := archive.ListWorkflows(utils.ListOptions{ + Namespace: Namespace, + LabelRequirements: parse, + }) s.CheckError(err) for _, w := range workflows { err := archive.DeleteWorkflow(string(w.UID)) diff --git a/workflow/controller/estimation/estimator_factory.go b/workflow/controller/estimation/estimator_factory.go index 60311eb8f5c3..984e76d98cdc 100644 --- a/workflow/controller/estimation/estimator_factory.go +++ b/workflow/controller/estimation/estimator_factory.go @@ -2,7 +2,6 @@ package estimation import ( "fmt" - "time" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" @@ -10,6 +9,7 @@ import ( "github.com/argoproj/argo-workflows/v3/persist/sqldb" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/server/utils" "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/controller/indexes" "github.com/argoproj/argo-workflows/v3/workflow/hydrator" @@ -76,7 +76,13 @@ func (f *estimatorFactory) NewEstimator(wf *wfv1.Workflow) (Estimator, error) { if err != nil { return defaultEstimator, fmt.Errorf("failed to parse selector to requirements: %v", err) } - workflows, err := f.wfArchive.ListWorkflows(wf.Namespace, "", "", time.Time{}, time.Time{}, requirements, 1, 0) + workflows, err := f.wfArchive.ListWorkflows( + utils.ListOptions{ + Namespace: wf.Namespace, + LabelRequirements: requirements, + Limit: 1, + Offset: 0, + }) if err != nil { return defaultEstimator, fmt.Errorf("failed to list archived workflows: %v", err) } diff --git a/workflow/controller/estimation/estimator_factory_test.go b/workflow/controller/estimation/estimator_factory_test.go index aeef2a7128c3..c4bb0ab06981 100644 --- a/workflow/controller/estimation/estimator_factory_test.go +++ b/workflow/controller/estimation/estimator_factory_test.go @@ -2,7 +2,6 @@ package estimation import ( "testing" - "time" "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -10,6 +9,7 @@ import ( sqldbmocks "github.com/argoproj/argo-workflows/v3/persist/sqldb/mocks" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/server/utils" testutil "github.com/argoproj/argo-workflows/v3/test/util" "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/controller/indexes" @@ -53,7 +53,11 @@ metadata: wfArchive := &sqldbmocks.WorkflowArchive{} r, err := labels.ParseToRequirements("workflows.argoproj.io/phase=Succeeded,workflows.argoproj.io/workflow-template=my-archived-wftmpl") assert.NoError(t, err) - wfArchive.On("ListWorkflows", "my-ns", "", "", time.Time{}, time.Time{}, labels.Requirements(r), 1, 0).Return(wfv1.Workflows{ + wfArchive.On("ListWorkflows", utils.ListOptions{ + Namespace: "my-ns", + LabelRequirements: r, + Limit: 1, + }).Return(wfv1.Workflows{ *testutil.MustUnmarshalWorkflow(` metadata: name: my-archived-wftmpl-baseline`),