diff --git a/cmd/internal/exec.go b/cmd/internal/exec.go index 6effc3b..c66a5c8 100644 --- a/cmd/internal/exec.go +++ b/cmd/internal/exec.go @@ -239,15 +239,6 @@ func authRequired(ctx *context.Context, rootExec *executable.Executable) bool { return true } } - for _, child := range rootExec.Serial.Refs { - childExec, err := ctx.ExecutableCache.GetExecutableByRef(ctx.Logger, child) - if err != nil { - continue - } - if authRequired(ctx, childExec) { - return true - } - } for _, e := range rootExec.Serial.Execs { if e.Ref != "" { childExec, err := ctx.ExecutableCache.GetExecutableByRef(ctx.Logger, e.Ref) @@ -265,15 +256,6 @@ func authRequired(ctx *context.Context, rootExec *executable.Executable) bool { return true } } - for _, child := range rootExec.Parallel.Refs { - childExec, err := ctx.ExecutableCache.GetExecutableByRef(ctx.Logger, child) - if err != nil { - continue - } - if authRequired(ctx, childExec) { - return true - } - } for _, e := range rootExec.Parallel.Execs { if e.Ref != "" { childExec, err := ctx.ExecutableCache.GetExecutableByRef(ctx.Logger, e.Ref) @@ -289,7 +271,7 @@ func authRequired(ctx *context.Context, rootExec *executable.Executable) bool { return false } -//nolint:gocognit,funlen +//nolint:gocognit func pendingFormFields(ctx *context.Context, rootExec *executable.Executable) []*views.FormField { pending := make([]*views.FormField, 0) switch { @@ -323,14 +305,6 @@ func pendingFormFields(ctx *context.Context, rootExec *executable.Executable) [] pending = append(pending, &views.FormField{Key: param.EnvKey, Title: param.Prompt}) } } - for _, child := range rootExec.Serial.Refs { - childExec, err := ctx.ExecutableCache.GetExecutableByRef(ctx.Logger, child) - if err != nil { - continue - } - childPending := pendingFormFields(ctx, childExec) - pending = append(pending, childPending...) - } for _, child := range rootExec.Serial.Execs { if child.Ref != "" { childExec, err := ctx.ExecutableCache.GetExecutableByRef(ctx.Logger, child.Ref) @@ -347,14 +321,6 @@ func pendingFormFields(ctx *context.Context, rootExec *executable.Executable) [] pending = append(pending, &views.FormField{Key: param.EnvKey, Title: param.Prompt}) } } - for _, child := range rootExec.Parallel.Refs { - childExec, err := ctx.ExecutableCache.GetExecutableByRef(ctx.Logger, child) - if err != nil { - continue - } - childPending := pendingFormFields(ctx, childExec) - pending = append(pending, childPending...) - } for _, child := range rootExec.Parallel.Execs { if child.Ref != "" { childExec, err := ctx.ExecutableCache.GetExecutableByRef(ctx.Logger, child.Ref) diff --git a/docs/schemas/flowfile_schema.json b/docs/schemas/flowfile_schema.json index 4b01667..3e34866 100644 --- a/docs/schemas/flowfile_schema.json +++ b/docs/schemas/flowfile_schema.json @@ -206,7 +206,7 @@ "ExecutableParallelExecutableType": { "type": "object", "required": [ - "refs" + "execs" ], "properties": { "args": { @@ -214,7 +214,7 @@ }, "execs": { "$ref": "#/definitions/ExecutableParallelRefConfigList", - "description": "A list of executables to run in parallel.\nEach executable can be a command or a reference to another executable.\nOne of `refs` or `execs` must be set.\n" + "description": "A list of executables to run in parallel.\nEach executable can be a command or a reference to another executable.\n" }, "failFast": { "description": "If set to true, the parallel executable will fail if any of the sub-executables fail.", @@ -228,10 +228,6 @@ }, "params": { "$ref": "#/definitions/ExecutableParameterList" - }, - "refs": { - "$ref": "#/definitions/ExecutableRefList", - "description": "DEPRECATED: Use `execs` instead.\n\nA list of references to other executables to run in parallel.\nOne of `refs` or `execs` must be set.\n" } } }, @@ -310,12 +306,6 @@ "description": "A reference to an executable.\nThe format is `\u003cverb\u003e \u003cworkspace\u003e/\u003cnamespace\u003e:\u003cexecutable name\u003e`.\nFor example, `exec ws/ns:my-workflow`.\n\nThe workspace and namespace are optional.\nIf the workspace is not specified, the current workspace will be used.\nIf the namespace is not specified, the current namespace will be used.\n", "type": "string" }, - "ExecutableRefList": { - "type": "array", - "items": { - "$ref": "#/definitions/ExecutableRef" - } - }, "ExecutableRenderExecutableType": { "description": "Renders a markdown template file with data.", "type": "object", @@ -450,7 +440,7 @@ "description": "Executes a list of executables in serial.", "type": "object", "required": [ - "refs" + "execs" ], "properties": { "args": { @@ -458,7 +448,7 @@ }, "execs": { "$ref": "#/definitions/ExecutableSerialRefConfigList", - "description": "A list of executables to run in serial.\nEach executable can be a command or a reference to another executable.\nOne of `refs` or `execs` must be set.\n" + "description": "A list of executables to run in serial.\nEach executable can be a command or a reference to another executable.\n" }, "failFast": { "description": "If set to true, the serial executable will fail if any of the sub-executables fail.", @@ -467,10 +457,6 @@ }, "params": { "$ref": "#/definitions/ExecutableParameterList" - }, - "refs": { - "$ref": "#/definitions/ExecutableRefList", - "description": "DEPRECATED: Use `execs` instead.\n\nA list of references to other executables to run in serial.\nOne of `refs` or `execs` must be set.\n" } } }, @@ -559,7 +545,8 @@ "items": { "type": "string" } - } + }, + "Ref": {} }, "properties": { "description": { diff --git a/docs/types/flowfile.md b/docs/types/flowfile.md index e67c3fe..31263ea 100644 --- a/docs/types/flowfile.md +++ b/docs/types/flowfile.md @@ -187,11 +187,10 @@ Launches an application or opens a URI. | Field | Description | Type | Default | Required | | ----- | ----------- | ---- | ------- | :--------: | | `args` | | [ExecutableArgumentList](#ExecutableArgumentList) | | | -| `execs` | A list of executables to run in parallel. Each executable can be a command or a reference to another executable. One of `refs` or `execs` must be set. | [ExecutableParallelRefConfigList](#ExecutableParallelRefConfigList) | | | +| `execs` | A list of executables to run in parallel. Each executable can be a command or a reference to another executable. | [ExecutableParallelRefConfigList](#ExecutableParallelRefConfigList) | | ✘ | | `failFast` | If set to true, the parallel executable will fail if any of the sub-executables fail. | `boolean` | false | | | `maxThreads` | The maximum number of threads to use when executing the parallel executables. | `integer` | 5 | | | `params` | | [ExecutableParameterList](#ExecutableParameterList) | | | -| `refs` | DEPRECATED: Use `execs` instead. A list of references to other executables to run in parallel. One of `refs` or `execs` must be set. | [ExecutableRefList](#ExecutableRefList) | | ✘ | ### ExecutableParallelRefConfig @@ -264,15 +263,6 @@ If the namespace is not specified, the current namespace will be used. -### ExecutableRefList - - - -**Type:** `array` ([ExecutableRef](#ExecutableRef)) - - - - ### ExecutableRenderExecutableType Renders a markdown template file with data. @@ -344,10 +334,9 @@ Executes a list of executables in serial. | Field | Description | Type | Default | Required | | ----- | ----------- | ---- | ------- | :--------: | | `args` | | [ExecutableArgumentList](#ExecutableArgumentList) | | | -| `execs` | A list of executables to run in serial. Each executable can be a command or a reference to another executable. One of `refs` or `execs` must be set. | [ExecutableSerialRefConfigList](#ExecutableSerialRefConfigList) | | | +| `execs` | A list of executables to run in serial. Each executable can be a command or a reference to another executable. | [ExecutableSerialRefConfigList](#ExecutableSerialRefConfigList) | | ✘ | | `failFast` | If set to true, the serial executable will fail if any of the sub-executables fail. | `boolean` | false | | | `params` | | [ExecutableParameterList](#ExecutableParameterList) | | | -| `refs` | DEPRECATED: Use `execs` instead. A list of references to other executables to run in serial. One of `refs` or `execs` must be set. | [ExecutableRefList](#ExecutableRefList) | | ✘ | ### ExecutableSerialRefConfig @@ -446,4 +435,13 @@ A list of `.sh` files to convert into generated executables in the file's execut +### Ref + + + + + + + + diff --git a/internal/runner/parallel/parallel.go b/internal/runner/parallel/parallel.go index 6bff8bb..0bedfc1 100644 --- a/internal/runner/parallel/parallel.go +++ b/internal/runner/parallel/parallel.go @@ -10,7 +10,6 @@ import ( "github.com/jahvon/flow/internal/context" "github.com/jahvon/flow/internal/runner" - "github.com/jahvon/flow/internal/utils" argUtils "github.com/jahvon/flow/internal/utils/args" execUtils "github.com/jahvon/flow/internal/utils/executables" "github.com/jahvon/flow/types/executable" @@ -39,70 +38,13 @@ func (r *parallelRunner) Exec(ctx *context.Context, e *executable.Executable, pr return errors.Wrap(err, "unable to set parameters to env") } - if err := utils.ValidateOneOf("executable list", parallelSpec.Refs, parallelSpec.Execs); err != nil { - return err - } - - if len(parallelSpec.Refs) > 0 { - return handleExecRef(ctx, parallelSpec, promptedEnv) - } else if len(parallelSpec.Execs) > 0 { + if len(parallelSpec.Execs) > 0 { return handleExec(ctx, e, parallelSpec, promptedEnv) } return fmt.Errorf("no parallel executables to run") } -func handleExecRef( - ctx *context.Context, parallelSpec *executable.ParallelExecutableType, promptedEnv map[string]string, -) error { - refs := parallelSpec.Refs - groupCtx, cancel := stdCtx.WithCancel(ctx.Ctx) - defer cancel() - group, _ := errgroup.WithContext(groupCtx) - limit := parallelSpec.MaxThreads - if limit == 0 { - limit = 5 - } - group.SetLimit(limit) - var errs []error - for _, ref := range refs { - ref = context.ExpandRef(ctx, ref) - exec, err := execUtils.ExecutableForRef(ctx, ref) - if err != nil { - return err - } - - fields := map[string]interface{}{ - "step": exec.ID(), - } - exec.Exec.SetLogFields(fields) - - group.Go(func() error { - if parallelSpec.FailFast { - if err := runner.Exec(ctx, exec, promptedEnv); err != nil { - cancel() - return err - } - } else { - err := runner.Exec(ctx, exec, promptedEnv) - if err != nil { - errs = append(errs, err) - ctx.Logger.Errorx("execution error", "err", err, "ref", exec.Ref()) - } - } - return nil - }) - } - if err := group.Wait(); err != nil { - return errors.Wrap(err, "parallel execution error") - } - - if len(errs) > 0 { - return fmt.Errorf("%d execution errors - %v", len(errs), errs) - } - return nil -} - //nolint:gocognit func handleExec( ctx *context.Context, parent *executable.Executable, diff --git a/internal/runner/parallel/parallel_test.go b/internal/runner/parallel/parallel_test.go index cc121ab..a630162 100644 --- a/internal/runner/parallel/parallel_test.go +++ b/internal/runner/parallel/parallel_test.go @@ -61,101 +61,7 @@ var _ = Describe("ParallelRunner", func() { }) }) - When("Executables with ref", func() { - var ( - rootExec *executable.Executable - subExecs executable.ExecutableList - ) - - BeforeEach(func() { - ns := "examples" - rootExec = builder.ParallelExecByRef( - builder.WithNamespaceName(ns), - builder.WithWorkspaceName(ctx.Ctx.CurrentWorkspace.AssignedName()), - builder.WithWorkspacePath(ctx.Ctx.CurrentWorkspace.Location()), - ) - execFlowfile := builder.ExamplesExecFlowFile( - builder.WithNamespaceName(ns), - builder.WithWorkspaceName(ctx.Ctx.CurrentWorkspace.AssignedName()), - builder.WithWorkspacePath(ctx.Ctx.CurrentWorkspace.Location()), - ) - subExecs = testUtils.FindSubExecs(rootExec, executable.FlowFileList{execFlowfile}) - - runner.RegisterRunner(parallelRnr) - runner.RegisterRunner(ctx.RunnerMock) - ctx.RunnerMock.EXPECT().IsCompatible(rootExec).Return(false).AnyTimes() - }) - - It("should execute all sub execs", func() { - promptedEnv := make(map[string]string) - mockRunner := ctx.RunnerMock - mockCache := ctx.ExecutableCache - - for _, e := range subExecs { - isParallelExec := testUtils.ExecWithRef(e.Ref()) - mockCache.EXPECT().GetExecutableByRef(ctx.Logger, e.Ref()).Return(e, nil).Times(1) - mockRunner.EXPECT().IsCompatible(isParallelExec).Return(true).Times(1) - mockRunner.EXPECT().Exec(ctx.Ctx, isParallelExec, promptedEnv).Return(nil).Times(1) - } - - rootExec.Parallel.MaxThreads = 1 - Expect(parallelRnr.Exec(ctx.Ctx, rootExec, promptedEnv)).To(Succeed()) - - }) - - It("should fail fast when enabled", func() { - promptedEnv := make(map[string]string) - mockRunner := ctx.RunnerMock - mockCache := ctx.ExecutableCache - - for i, e := range subExecs { - switch i { - case 0, 2: - isParallelExec := testUtils.ExecWithRef(e.Ref()) - mockCache.EXPECT().GetExecutableByRef(ctx.Logger, e.Ref()).Return(e, nil).MaxTimes(1) - mockRunner.EXPECT().IsCompatible(isParallelExec).Return(true).MaxTimes(1) - mockRunner.EXPECT().Exec(ctx.Ctx, isParallelExec, promptedEnv).Return(nil).MaxTimes(1) - case 1: - isParallelExec := testUtils.ExecWithRef(e.Ref()) - mockCache.EXPECT().GetExecutableByRef(ctx.Logger, e.Ref()).Return(e, nil).Times(1) - mockRunner.EXPECT().IsCompatible(isParallelExec).Return(true).Times(1) - mockRunner.EXPECT().Exec(ctx.Ctx, isParallelExec, promptedEnv).Return(errors.New("error")).Times(1) - } - } - - rootExec.Parallel.FailFast = true - Expect(parallelRnr.Exec(ctx.Ctx, rootExec, promptedEnv)).ToNot(Succeed()) - }) - - It("should not fail fast when disabled", func() { - promptedEnv := make(map[string]string) - mockRunner := ctx.RunnerMock - mockCache := ctx.ExecutableCache - mockLogger := ctx.Logger - - for i, e := range subExecs { - switch i { - case 0, 2: - isParallelExec := testUtils.ExecWithRef(e.Ref()) - mockCache.EXPECT().GetExecutableByRef(ctx.Logger, e.Ref()).Return(e, nil).Times(1) - mockRunner.EXPECT().IsCompatible(isParallelExec).Return(true).Times(1) - mockRunner.EXPECT().Exec(ctx.Ctx, isParallelExec, promptedEnv).Return(nil).Times(1) - case 1: - isParallelExec := testUtils.ExecWithRef(e.Ref()) - mockCache.EXPECT().GetExecutableByRef(ctx.Logger, e.Ref()).Return(e, nil).Times(1) - mockRunner.EXPECT().IsCompatible(isParallelExec).Return(true).Times(1) - mockRunner.EXPECT().Exec(ctx.Ctx, isParallelExec, promptedEnv). - Return(errors.New("error")).Times(1) - mockLogger.EXPECT(). - Errorx("execution error", "err", gomock.Any(), "ref", e.Ref()).Times(1) - } - } - - Expect(parallelRnr.Exec(ctx.Ctx, rootExec, promptedEnv)).ToNot(Succeed()) - }) - }) - - When("Executables with ref config", func() { + When("Exec", func() { var ( rootExec *executable.Executable subExecs executable.ExecutableList diff --git a/internal/runner/serial/serial.go b/internal/runner/serial/serial.go index e986fa3..d90bb68 100644 --- a/internal/runner/serial/serial.go +++ b/internal/runner/serial/serial.go @@ -10,7 +10,6 @@ import ( "github.com/jahvon/flow/internal/context" "github.com/jahvon/flow/internal/runner" - "github.com/jahvon/flow/internal/utils" argUtils "github.com/jahvon/flow/internal/utils/args" execUtils "github.com/jahvon/flow/internal/utils/executables" "github.com/jahvon/flow/types/executable" @@ -39,54 +38,12 @@ func (r *serialRunner) Exec(ctx *context.Context, e *executable.Executable, prom return errors.Wrap(err, "unable to set parameters to env") } - if err := utils.ValidateOneOf("executable list", serialSpec.Refs, serialSpec.Execs); err != nil { - return err - } - - if len(serialSpec.Refs) > 0 { - return handleExecRef(ctx, serialSpec, promptedEnv) - } else if len(serialSpec.Execs) > 0 { + if len(serialSpec.Execs) > 0 { return handleExec(ctx, e, serialSpec, promptedEnv) } return fmt.Errorf("no serial executables to run") } -func handleExecRef( - ctx *context.Context, - serialSpec *executable.SerialExecutableType, - promptedEnv map[string]string, -) error { - order := serialSpec.Refs - - var errs []error - for i, executableRef := range order { - ctx.Logger.Debugf("executing %s (%d/%d)", executableRef, i+1, len(order)) - exec, err := execUtils.ExecutableForRef(ctx, executableRef) - if err != nil { - return err - } else if exec == nil { - return fmt.Errorf("unable to find e with reference %s", executableRef) - } - - fields := map[string]interface{}{ - "step": exec.ID(), - } - exec.Exec.SetLogFields(fields) - - if err := runner.Exec(ctx, exec, promptedEnv); err != nil { - if serialSpec.FailFast { - return errors.Wrapf(err, "execution error ref='%s'", executableRef) - } - errs = append(errs, err) - ctx.Logger.Errorx("execution error", "err", err, "ref", exec.Ref()) - } - } - if len(errs) > 0 { - return fmt.Errorf("%d execution errors - %v", len(errs), errs) - } - return nil -} - //nolint:gocognit func handleExec( ctx *context.Context, diff --git a/internal/runner/serial/serial_test.go b/internal/runner/serial/serial_test.go index 39b224f..a793774 100644 --- a/internal/runner/serial/serial_test.go +++ b/internal/runner/serial/serial_test.go @@ -61,99 +61,7 @@ var _ = Describe("SerialRunner", func() { }) }) - When("Executables with ref", func() { - var ( - rootExec *executable.Executable - subExecs executable.ExecutableList - ) - - BeforeEach(func() { - ns := "examples" - rootExec = builder.SerialExecByRef( - builder.WithNamespaceName(ns), - builder.WithWorkspaceName(ctx.Ctx.CurrentWorkspace.AssignedName()), - builder.WithWorkspacePath(ctx.Ctx.CurrentWorkspace.Location()), - ) - execFlowfile := builder.ExamplesExecFlowFile( - builder.WithNamespaceName(ns), - builder.WithWorkspaceName(ctx.Ctx.CurrentWorkspace.AssignedName()), - builder.WithWorkspacePath(ctx.Ctx.CurrentWorkspace.Location()), - ) - subExecs = testUtils.FindSubExecs(rootExec, executable.FlowFileList{execFlowfile}) - - runner.RegisterRunner(serialRnr) - runner.RegisterRunner(ctx.RunnerMock) - ctx.RunnerMock.EXPECT().IsCompatible(rootExec).Return(false).AnyTimes() - }) - - It("should execute in order", func() { - promptedEnv := make(map[string]string) - mockRunner := ctx.RunnerMock - mockCache := ctx.ExecutableCache - - for _, e := range subExecs { - isSerialExec := testUtils.ExecWithRef(e.Ref()) - mockCache.EXPECT().GetExecutableByRef(ctx.Logger, e.Ref()).Return(e, nil).Times(1) - mockRunner.EXPECT().IsCompatible(isSerialExec).Return(true).Times(1) - mockRunner.EXPECT().Exec(ctx.Ctx, isSerialExec, promptedEnv).Return(nil).Times(1) - } - - Expect(serialRnr.Exec(ctx.Ctx, rootExec, promptedEnv)).To(Succeed()) - }) - - It("should fail fast when enabled", func() { - promptedEnv := make(map[string]string) - mockRunner := ctx.RunnerMock - mockCache := ctx.ExecutableCache - - for i, e := range subExecs { - switch i { - case 0: - isSerialExec := testUtils.ExecWithRef(e.Ref()) - mockCache.EXPECT().GetExecutableByRef(ctx.Logger, e.Ref()).Return(e, nil).Times(1) - mockRunner.EXPECT().IsCompatible(isSerialExec).Return(true).Times(1) - mockRunner.EXPECT().Exec(ctx.Ctx, isSerialExec, promptedEnv).Return(nil).Times(1) - case 1: - isSerialExec := testUtils.ExecWithRef(e.Ref()) - mockCache.EXPECT().GetExecutableByRef(ctx.Logger, e.Ref()).Return(e, nil).Times(1) - mockRunner.EXPECT().IsCompatible(isSerialExec).Return(true).Times(1) - mockRunner.EXPECT().Exec(ctx.Ctx, isSerialExec, promptedEnv).Return(errors.New("error")).Times(1) - } - } - - rootExec.Serial.FailFast = true - Expect(serialRnr.Exec(ctx.Ctx, rootExec, promptedEnv)).ToNot(Succeed()) - }) - - It("should not fail fast when disabled", func() { - promptedEnv := make(map[string]string) - mockRunner := ctx.RunnerMock - mockLogger := ctx.Logger - mockCache := ctx.ExecutableCache - - for i, e := range subExecs { - switch i { - case 0, 2: - isSerialExec := testUtils.ExecWithRef(e.Ref()) - mockCache.EXPECT().GetExecutableByRef(ctx.Logger, e.Ref()).Return(e, nil).Times(1) - mockRunner.EXPECT().IsCompatible(isSerialExec).Return(true).Times(1) - mockRunner.EXPECT().Exec(ctx.Ctx, isSerialExec, promptedEnv).Return(nil).Times(1) - case 1: - isSerialExec := testUtils.ExecWithRef(e.Ref()) - mockCache.EXPECT().GetExecutableByRef(ctx.Logger, e.Ref()).Return(e, nil).Times(1) - mockRunner.EXPECT().IsCompatible(isSerialExec).Return(true).Times(1) - mockRunner.EXPECT().Exec(ctx.Ctx, isSerialExec, promptedEnv).Return(errors.New("error")).Times(1) - mockLogger.EXPECT(). - Errorx("execution error", "err", gomock.Any(), "ref", e.Ref()). - Times(1) - } - } - - Expect(serialRnr.Exec(ctx.Ctx, rootExec, promptedEnv)).ToNot(Succeed()) - }) - }) - - When("Executables with ref configs", func() { + When("Exec", func() { var ( rootExec *executable.Executable subExecs executable.ExecutableList diff --git a/tests/utils/sub_execs.go b/tests/utils/sub_execs.go index e1bfcbf..280280d 100644 --- a/tests/utils/sub_execs.go +++ b/tests/utils/sub_execs.go @@ -20,14 +20,6 @@ func FindSubExecs(rootExec *executable.Executable, flowFiles executable.FlowFile func findSerialSubExecs(root *executable.Executable, flowFiles executable.FlowFileList) []*executable.Executable { serial := root.Serial var subExecs []*executable.Executable - for _, ref := range serial.Refs { - for _, flowFile := range flowFiles { - if e, _ := flowFile.Executables.FindByVerbAndID(ref.Verb(), ref.ID()); e != nil { - subExecs = append(subExecs, e) - break - } - } - } for i, refCfg := range serial.Execs { if refCfg.Cmd != "" { subExecs = append(subExecs, execUtils.ExecutableForCmd(root, refCfg.Cmd, i)) @@ -46,14 +38,6 @@ func findSerialSubExecs(root *executable.Executable, flowFiles executable.FlowFi func findParallelSubExecs(root *executable.Executable, flowFiles executable.FlowFileList) []*executable.Executable { parallel := root.Parallel var subExecs []*executable.Executable - for _, ref := range parallel.Refs { - for _, flowFile := range flowFiles { - if e, _ := flowFile.Executables.FindByVerbAndID(ref.Verb(), ref.ID()); e != nil { - subExecs = append(subExecs, e) - break - } - } - } for i, refCfg := range parallel.Execs { if refCfg.Cmd != "" { subExecs = append(subExecs, execUtils.ExecutableForCmd(root, refCfg.Cmd, i)) diff --git a/tools/builder/flowfile.go b/tools/builder/flowfile.go index d95897c..908e378 100644 --- a/tools/builder/flowfile.go +++ b/tools/builder/flowfile.go @@ -33,9 +33,7 @@ func ExamplesMultiExecFlowFile(opts ...Option) *executable.FlowFile { Visibility: privateFlowFileVisibility(), Tags: sharedExecTags(), Executables: []*executable.Executable{ - SerialExecByRef(opts...), SerialExecWithExit(opts...), - ParallelExecByRef(opts...), ParallelExecWithExit(opts...), ParallelExecWithMaxThreads(opts...), }, diff --git a/tools/builder/parallel.go b/tools/builder/parallel.go index 9ca1cf3..9ca5ccd 100644 --- a/tools/builder/parallel.go +++ b/tools/builder/parallel.go @@ -1,4 +1,3 @@ -//nolint:lll package builder import ( @@ -9,30 +8,6 @@ const ( parallelBaseDesc = "Multiple executables can be run concurrently using a parallel executable." ) -func ParallelExecByRef(opts ...Option) *executable.Executable { - name := "parallel" - docstring := parallelBaseDesc + - "The `refs` field is required and must be a valid reference to an executable.\n" + - "The environment derived from parameters and arguments on the root executable is inherited by all the sub-executables.\n" - e1 := ExecWithPauses(opts...) - e2 := ExecWithPauses(opts...) - e3 := ExecWithPauses(opts...) - e := &executable.Executable{ - Verb: "start", - Name: name, - Visibility: privateExecVisibility(), - Description: docstring, - Parallel: &executable.ParallelExecutableType{ - Refs: []executable.Ref{e1.Ref(), e2.Ref(), e3.Ref()}, - }, - } - if len(opts) > 0 { - vals := NewOptionValues(opts...) - e.SetContext(vals.WorkspaceName, vals.WorkspacePath, vals.NamespaceName, vals.FlowFilePath) - } - return e -} - func ParallelExecByRefConfig(opts ...Option) *executable.Executable { name := "parallel-config" e1 := SimpleExec(opts...) @@ -73,7 +48,7 @@ func ParallelExecWithExit(opts ...Option) *executable.Executable { "The `failFast` option can be set to `true` to stop the flow if a sub-executable fails.", Parallel: &executable.ParallelExecutableType{ FailFast: true, - Refs: executable.RefList{e1.Ref(), e2.Ref(), e3.Ref()}, + Execs: executable.ParallelRefConfigList{{Ref: e1.Ref()}, {Ref: e2.Ref()}, {Ref: e3.Ref()}}, }, } if len(opts) > 0 { @@ -84,7 +59,7 @@ func ParallelExecWithExit(opts ...Option) *executable.Executable { } func ParallelExecWithMaxThreads(opts ...Option) *executable.Executable { - e := ParallelExecByRef(opts...) + e := ParallelExecByRefConfig(opts...) e.Description = parallelBaseDesc + "\n\nThe `maxThreads` option can be set to limit the number of concurrent executions." e.Parallel.MaxThreads = 1 diff --git a/tools/builder/serial.go b/tools/builder/serial.go index e718aef..171aeb4 100644 --- a/tools/builder/serial.go +++ b/tools/builder/serial.go @@ -1,4 +1,3 @@ -//nolint:lll package builder import ( @@ -9,30 +8,6 @@ const ( serialBaseDesc = "Multiple executables can be run in sequence using a serial executable.\n" ) -func SerialExecByRef(opts ...Option) *executable.Executable { - name := "serial" - docstring := serialBaseDesc + - "The `refs` field is required and must be a valid reference to an executable.\n" + - "The environment derived from parameters and arguments on the root executable is inherited by all the sub-executables." - e1 := SimpleExec(opts...) - e2 := SimpleExec(opts...) - e3 := SimpleExec(opts...) - e := &executable.Executable{ - Verb: "start", - Name: name, - Visibility: privateExecVisibility(), - Description: docstring, - Serial: &executable.SerialExecutableType{ - Refs: executable.RefList{e1.Ref(), e2.Ref(), e3.Ref()}, - }, - } - if len(opts) > 0 { - vals := NewOptionValues(opts...) - e.SetContext(vals.WorkspaceName, vals.WorkspacePath, vals.NamespaceName, vals.FlowFilePath) - } - return e -} - func SerialExecByRefConfig(opts ...Option) *executable.Executable { name := "serial-config" e1 := SimpleExec(opts...) @@ -73,7 +48,7 @@ func SerialExecWithExit(opts ...Option) *executable.Executable { "The `failFast` option can be set to `true` to stop the executable if a sub-executable fails.", Serial: &executable.SerialExecutableType{ FailFast: true, - Refs: executable.RefList{e1.Ref(), e2.Ref(), e3.Ref()}, + Execs: []executable.SerialRefConfig{{Ref: e1.Ref()}, {Ref: e2.Ref()}, {Ref: e3.Ref()}}, }, } if len(opts) > 0 { diff --git a/types/executable/executable.gen.go b/types/executable/executable.gen.go index 7b23cbc..e30f43a 100644 --- a/types/executable/executable.gen.go +++ b/types/executable/executable.gen.go @@ -191,9 +191,8 @@ type ParallelExecutableType struct { // A list of executables to run in parallel. // Each executable can be a command or a reference to another executable. - // One of `refs` or `execs` must be set. // - Execs ParallelRefConfigList `json:"execs,omitempty" yaml:"execs,omitempty" mapstructure:"execs,omitempty"` + Execs ParallelRefConfigList `json:"execs" yaml:"execs" mapstructure:"execs"` // If set to true, the parallel executable will fail if any of the sub-executables // fail. @@ -204,13 +203,6 @@ type ParallelExecutableType struct { // Params corresponds to the JSON schema field "params". Params ParameterList `json:"params,omitempty" yaml:"params,omitempty" mapstructure:"params,omitempty"` - - // DEPRECATED: Use `execs` instead. - // - // A list of references to other executables to run in parallel. - // One of `refs` or `execs` must be set. - // - Refs RefList `json:"refs" yaml:"refs" mapstructure:"refs"` } // Configuration for a parallel executable. @@ -362,9 +354,8 @@ type SerialExecutableType struct { // A list of executables to run in serial. // Each executable can be a command or a reference to another executable. - // One of `refs` or `execs` must be set. // - Execs SerialRefConfigList `json:"execs,omitempty" yaml:"execs,omitempty" mapstructure:"execs,omitempty"` + Execs SerialRefConfigList `json:"execs" yaml:"execs" mapstructure:"execs"` // If set to true, the serial executable will fail if any of the sub-executables // fail. @@ -372,13 +363,6 @@ type SerialExecutableType struct { // Params corresponds to the JSON schema field "params". Params ParameterList `json:"params,omitempty" yaml:"params,omitempty" mapstructure:"params,omitempty"` - - // DEPRECATED: Use `execs` instead. - // - // A list of references to other executables to run in serial. - // One of `refs` or `execs` must be set. - // - Refs RefList `json:"refs" yaml:"refs" mapstructure:"refs"` } // Configuration for a serial executable. diff --git a/types/executable/executable_md.go b/types/executable/executable_md.go index b7a3364..5ae4c2f 100644 --- a/types/executable/executable_md.go +++ b/types/executable/executable_md.go @@ -193,9 +193,6 @@ func serialExecMarkdown(e *ExecutableEnvironment, s *SerialExecutableType) strin mkdwn += "**Fail Fast:** enabled\n" } mkdwn += "**Executables**\n" - for i, ref := range s.Refs { - mkdwn += fmt.Sprintf("%d. %s\n", i+1, ref) - } for i, refCfg := range s.Execs { if refCfg.Ref != "" { mkdwn += fmt.Sprintf("%d. ref: %s\n", i+1, refCfg.Ref) @@ -231,9 +228,6 @@ func parallelExecMarkdown(e *ExecutableEnvironment, p *ParallelExecutableType) s mkdwn += "**Fail Fast:** enabled\n" } mkdwn += "**Executables**\n" - for _, ref := range p.Refs { - mkdwn += fmt.Sprintf("- %s\n", ref) - } for i, refCfg := range p.Execs { if refCfg.Ref != "" { mkdwn += fmt.Sprintf("%d. ref: %s\n", i+1, refCfg.Ref) diff --git a/types/executable/executable_schema.yaml b/types/executable/executable_schema.yaml index 2055777..3568e16 100644 --- a/types/executable/executable_schema.yaml +++ b/types/executable/executable_schema.yaml @@ -239,25 +239,17 @@ definitions: ParallelExecutableType: type: object - required: [refs] + required: [execs] properties: params: $ref: '#/definitions/ParameterList' args: $ref: '#/definitions/ArgumentList' - refs: - $ref: '#/definitions/RefList' - description: | - DEPRECATED: Use `execs` instead. - - A list of references to other executables to run in parallel. - One of `refs` or `execs` must be set. execs: $ref: '#/definitions/ParallelRefConfigList' description: | A list of executables to run in parallel. Each executable can be a command or a reference to another executable. - One of `refs` or `execs` must be set. maxThreads: type: integer description: The maximum number of threads to use when executing the parallel executables. @@ -402,26 +394,18 @@ definitions: SerialExecutableType: type: object - required: [refs] + required: [execs] description: Executes a list of executables in serial. properties: params: $ref: '#/definitions/ParameterList' args: $ref: '#/definitions/ArgumentList' - refs: - $ref: '#/definitions/RefList' - description: | - DEPRECATED: Use `execs` instead. - - A list of references to other executables to run in serial. - One of `refs` or `execs` must be set. execs: $ref: '#/definitions/SerialRefConfigList' description: | A list of executables to run in serial. Each executable can be a command or a reference to another executable. - One of `refs` or `execs` must be set. failFast: type: boolean description: If set to true, the serial executable will fail if any of the sub-executables fail.