Skip to content

Commit

Permalink
refactor!: remove deprecated refs fields (#176)
Browse files Browse the repository at this point in the history
  • Loading branch information
jahvon authored Oct 10, 2024
1 parent 015e778 commit d9eb251
Show file tree
Hide file tree
Showing 14 changed files with 29 additions and 471 deletions.
36 changes: 1 addition & 35 deletions cmd/internal/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,6 @@ func authRequired(ctx *context.Context, rootExec *executable.Executable) bool {
return true
}
}
for _, child := range rootExec.Serial.Refs {
childExec, err := ctx.ExecutableCache.GetExecutableByRef(ctx.Logger, child)
if err != nil {
continue
}
if authRequired(ctx, childExec) {
return true
}
}
for _, e := range rootExec.Serial.Execs {
if e.Ref != "" {
childExec, err := ctx.ExecutableCache.GetExecutableByRef(ctx.Logger, e.Ref)
Expand All @@ -265,15 +256,6 @@ func authRequired(ctx *context.Context, rootExec *executable.Executable) bool {
return true
}
}
for _, child := range rootExec.Parallel.Refs {
childExec, err := ctx.ExecutableCache.GetExecutableByRef(ctx.Logger, child)
if err != nil {
continue
}
if authRequired(ctx, childExec) {
return true
}
}
for _, e := range rootExec.Parallel.Execs {
if e.Ref != "" {
childExec, err := ctx.ExecutableCache.GetExecutableByRef(ctx.Logger, e.Ref)
Expand All @@ -289,7 +271,7 @@ func authRequired(ctx *context.Context, rootExec *executable.Executable) bool {
return false
}

//nolint:gocognit,funlen
//nolint:gocognit
func pendingFormFields(ctx *context.Context, rootExec *executable.Executable) []*views.FormField {
pending := make([]*views.FormField, 0)
switch {
Expand Down Expand Up @@ -323,14 +305,6 @@ func pendingFormFields(ctx *context.Context, rootExec *executable.Executable) []
pending = append(pending, &views.FormField{Key: param.EnvKey, Title: param.Prompt})
}
}
for _, child := range rootExec.Serial.Refs {
childExec, err := ctx.ExecutableCache.GetExecutableByRef(ctx.Logger, child)
if err != nil {
continue
}
childPending := pendingFormFields(ctx, childExec)
pending = append(pending, childPending...)
}
for _, child := range rootExec.Serial.Execs {
if child.Ref != "" {
childExec, err := ctx.ExecutableCache.GetExecutableByRef(ctx.Logger, child.Ref)
Expand All @@ -347,14 +321,6 @@ func pendingFormFields(ctx *context.Context, rootExec *executable.Executable) []
pending = append(pending, &views.FormField{Key: param.EnvKey, Title: param.Prompt})
}
}
for _, child := range rootExec.Parallel.Refs {
childExec, err := ctx.ExecutableCache.GetExecutableByRef(ctx.Logger, child)
if err != nil {
continue
}
childPending := pendingFormFields(ctx, childExec)
pending = append(pending, childPending...)
}
for _, child := range rootExec.Parallel.Execs {
if child.Ref != "" {
childExec, err := ctx.ExecutableCache.GetExecutableByRef(ctx.Logger, child.Ref)
Expand Down
25 changes: 6 additions & 19 deletions docs/schemas/flowfile_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,15 @@
"ExecutableParallelExecutableType": {
"type": "object",
"required": [
"refs"
"execs"
],
"properties": {
"args": {
"$ref": "#/definitions/ExecutableArgumentList"
},
"execs": {
"$ref": "#/definitions/ExecutableParallelRefConfigList",
"description": "A list of executables to run in parallel.\nEach executable can be a command or a reference to another executable.\nOne of `refs` or `execs` must be set.\n"
"description": "A list of executables to run in parallel.\nEach executable can be a command or a reference to another executable.\n"
},
"failFast": {
"description": "If set to true, the parallel executable will fail if any of the sub-executables fail.",
Expand All @@ -228,10 +228,6 @@
},
"params": {
"$ref": "#/definitions/ExecutableParameterList"
},
"refs": {
"$ref": "#/definitions/ExecutableRefList",
"description": "DEPRECATED: Use `execs` instead.\n\nA list of references to other executables to run in parallel.\nOne of `refs` or `execs` must be set.\n"
}
}
},
Expand Down Expand Up @@ -310,12 +306,6 @@
"description": "A reference to an executable.\nThe format is `\u003cverb\u003e \u003cworkspace\u003e/\u003cnamespace\u003e:\u003cexecutable name\u003e`.\nFor example, `exec ws/ns:my-workflow`.\n\nThe workspace and namespace are optional.\nIf the workspace is not specified, the current workspace will be used.\nIf the namespace is not specified, the current namespace will be used.\n",
"type": "string"
},
"ExecutableRefList": {
"type": "array",
"items": {
"$ref": "#/definitions/ExecutableRef"
}
},
"ExecutableRenderExecutableType": {
"description": "Renders a markdown template file with data.",
"type": "object",
Expand Down Expand Up @@ -450,15 +440,15 @@
"description": "Executes a list of executables in serial.",
"type": "object",
"required": [
"refs"
"execs"
],
"properties": {
"args": {
"$ref": "#/definitions/ExecutableArgumentList"
},
"execs": {
"$ref": "#/definitions/ExecutableSerialRefConfigList",
"description": "A list of executables to run in serial.\nEach executable can be a command or a reference to another executable.\nOne of `refs` or `execs` must be set.\n"
"description": "A list of executables to run in serial.\nEach executable can be a command or a reference to another executable.\n"
},
"failFast": {
"description": "If set to true, the serial executable will fail if any of the sub-executables fail.",
Expand All @@ -467,10 +457,6 @@
},
"params": {
"$ref": "#/definitions/ExecutableParameterList"
},
"refs": {
"$ref": "#/definitions/ExecutableRefList",
"description": "DEPRECATED: Use `execs` instead.\n\nA list of references to other executables to run in serial.\nOne of `refs` or `execs` must be set.\n"
}
}
},
Expand Down Expand Up @@ -559,7 +545,8 @@
"items": {
"type": "string"
}
}
},
"Ref": {}
},
"properties": {
"description": {
Expand Down
24 changes: 11 additions & 13 deletions docs/types/flowfile.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,10 @@ Launches an application or opens a URI.
| Field | Description | Type | Default | Required |
| ----- | ----------- | ---- | ------- | :--------: |
| `args` | | [ExecutableArgumentList](#ExecutableArgumentList) | <no value> | |
| `execs` | A list of executables to run in parallel. Each executable can be a command or a reference to another executable. One of `refs` or `execs` must be set. | [ExecutableParallelRefConfigList](#ExecutableParallelRefConfigList) | <no value> | |
| `execs` | A list of executables to run in parallel. Each executable can be a command or a reference to another executable. | [ExecutableParallelRefConfigList](#ExecutableParallelRefConfigList) | <no value> | |
| `failFast` | If set to true, the parallel executable will fail if any of the sub-executables fail. | `boolean` | false | |
| `maxThreads` | The maximum number of threads to use when executing the parallel executables. | `integer` | 5 | |
| `params` | | [ExecutableParameterList](#ExecutableParameterList) | <no value> | |
| `refs` | DEPRECATED: Use `execs` instead. A list of references to other executables to run in parallel. One of `refs` or `execs` must be set. | [ExecutableRefList](#ExecutableRefList) | <no value> ||

### ExecutableParallelRefConfig

Expand Down Expand Up @@ -264,15 +263,6 @@ If the namespace is not specified, the current namespace will be used.



### ExecutableRefList



**Type:** `array` ([ExecutableRef](#ExecutableRef))




### ExecutableRenderExecutableType

Renders a markdown template file with data.
Expand Down Expand Up @@ -344,10 +334,9 @@ Executes a list of executables in serial.
| Field | Description | Type | Default | Required |
| ----- | ----------- | ---- | ------- | :--------: |
| `args` | | [ExecutableArgumentList](#ExecutableArgumentList) | <no value> | |
| `execs` | A list of executables to run in serial. Each executable can be a command or a reference to another executable. One of `refs` or `execs` must be set. | [ExecutableSerialRefConfigList](#ExecutableSerialRefConfigList) | <no value> | |
| `execs` | A list of executables to run in serial. Each executable can be a command or a reference to another executable. | [ExecutableSerialRefConfigList](#ExecutableSerialRefConfigList) | <no value> | |
| `failFast` | If set to true, the serial executable will fail if any of the sub-executables fail. | `boolean` | false | |
| `params` | | [ExecutableParameterList](#ExecutableParameterList) | <no value> | |
| `refs` | DEPRECATED: Use `execs` instead. A list of references to other executables to run in serial. One of `refs` or `execs` must be set. | [ExecutableRefList](#ExecutableRefList) | <no value> ||

### ExecutableSerialRefConfig

Expand Down Expand Up @@ -446,4 +435,13 @@ A list of `.sh` files to convert into generated executables in the file's execut



### Ref









60 changes: 1 addition & 59 deletions internal/runner/parallel/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/jahvon/flow/internal/context"
"github.com/jahvon/flow/internal/runner"
"github.com/jahvon/flow/internal/utils"
argUtils "github.com/jahvon/flow/internal/utils/args"
execUtils "github.com/jahvon/flow/internal/utils/executables"
"github.com/jahvon/flow/types/executable"
Expand Down Expand Up @@ -39,70 +38,13 @@ func (r *parallelRunner) Exec(ctx *context.Context, e *executable.Executable, pr
return errors.Wrap(err, "unable to set parameters to env")
}

if err := utils.ValidateOneOf("executable list", parallelSpec.Refs, parallelSpec.Execs); err != nil {
return err
}

if len(parallelSpec.Refs) > 0 {
return handleExecRef(ctx, parallelSpec, promptedEnv)
} else if len(parallelSpec.Execs) > 0 {
if len(parallelSpec.Execs) > 0 {
return handleExec(ctx, e, parallelSpec, promptedEnv)
}

return fmt.Errorf("no parallel executables to run")
}

func handleExecRef(
ctx *context.Context, parallelSpec *executable.ParallelExecutableType, promptedEnv map[string]string,
) error {
refs := parallelSpec.Refs
groupCtx, cancel := stdCtx.WithCancel(ctx.Ctx)
defer cancel()
group, _ := errgroup.WithContext(groupCtx)
limit := parallelSpec.MaxThreads
if limit == 0 {
limit = 5
}
group.SetLimit(limit)
var errs []error
for _, ref := range refs {
ref = context.ExpandRef(ctx, ref)
exec, err := execUtils.ExecutableForRef(ctx, ref)
if err != nil {
return err
}

fields := map[string]interface{}{
"step": exec.ID(),
}
exec.Exec.SetLogFields(fields)

group.Go(func() error {
if parallelSpec.FailFast {
if err := runner.Exec(ctx, exec, promptedEnv); err != nil {
cancel()
return err
}
} else {
err := runner.Exec(ctx, exec, promptedEnv)
if err != nil {
errs = append(errs, err)
ctx.Logger.Errorx("execution error", "err", err, "ref", exec.Ref())
}
}
return nil
})
}
if err := group.Wait(); err != nil {
return errors.Wrap(err, "parallel execution error")
}

if len(errs) > 0 {
return fmt.Errorf("%d execution errors - %v", len(errs), errs)
}
return nil
}

//nolint:gocognit
func handleExec(
ctx *context.Context, parent *executable.Executable,
Expand Down
96 changes: 1 addition & 95 deletions internal/runner/parallel/parallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,101 +61,7 @@ var _ = Describe("ParallelRunner", func() {
})
})

When("Executables with ref", func() {
var (
rootExec *executable.Executable
subExecs executable.ExecutableList
)

BeforeEach(func() {
ns := "examples"
rootExec = builder.ParallelExecByRef(
builder.WithNamespaceName(ns),
builder.WithWorkspaceName(ctx.Ctx.CurrentWorkspace.AssignedName()),
builder.WithWorkspacePath(ctx.Ctx.CurrentWorkspace.Location()),
)
execFlowfile := builder.ExamplesExecFlowFile(
builder.WithNamespaceName(ns),
builder.WithWorkspaceName(ctx.Ctx.CurrentWorkspace.AssignedName()),
builder.WithWorkspacePath(ctx.Ctx.CurrentWorkspace.Location()),
)
subExecs = testUtils.FindSubExecs(rootExec, executable.FlowFileList{execFlowfile})

runner.RegisterRunner(parallelRnr)
runner.RegisterRunner(ctx.RunnerMock)
ctx.RunnerMock.EXPECT().IsCompatible(rootExec).Return(false).AnyTimes()
})

It("should execute all sub execs", func() {
promptedEnv := make(map[string]string)
mockRunner := ctx.RunnerMock
mockCache := ctx.ExecutableCache

for _, e := range subExecs {
isParallelExec := testUtils.ExecWithRef(e.Ref())
mockCache.EXPECT().GetExecutableByRef(ctx.Logger, e.Ref()).Return(e, nil).Times(1)
mockRunner.EXPECT().IsCompatible(isParallelExec).Return(true).Times(1)
mockRunner.EXPECT().Exec(ctx.Ctx, isParallelExec, promptedEnv).Return(nil).Times(1)
}

rootExec.Parallel.MaxThreads = 1
Expect(parallelRnr.Exec(ctx.Ctx, rootExec, promptedEnv)).To(Succeed())

})

It("should fail fast when enabled", func() {
promptedEnv := make(map[string]string)
mockRunner := ctx.RunnerMock
mockCache := ctx.ExecutableCache

for i, e := range subExecs {
switch i {
case 0, 2:
isParallelExec := testUtils.ExecWithRef(e.Ref())
mockCache.EXPECT().GetExecutableByRef(ctx.Logger, e.Ref()).Return(e, nil).MaxTimes(1)
mockRunner.EXPECT().IsCompatible(isParallelExec).Return(true).MaxTimes(1)
mockRunner.EXPECT().Exec(ctx.Ctx, isParallelExec, promptedEnv).Return(nil).MaxTimes(1)
case 1:
isParallelExec := testUtils.ExecWithRef(e.Ref())
mockCache.EXPECT().GetExecutableByRef(ctx.Logger, e.Ref()).Return(e, nil).Times(1)
mockRunner.EXPECT().IsCompatible(isParallelExec).Return(true).Times(1)
mockRunner.EXPECT().Exec(ctx.Ctx, isParallelExec, promptedEnv).Return(errors.New("error")).Times(1)
}
}

rootExec.Parallel.FailFast = true
Expect(parallelRnr.Exec(ctx.Ctx, rootExec, promptedEnv)).ToNot(Succeed())
})

It("should not fail fast when disabled", func() {
promptedEnv := make(map[string]string)
mockRunner := ctx.RunnerMock
mockCache := ctx.ExecutableCache
mockLogger := ctx.Logger

for i, e := range subExecs {
switch i {
case 0, 2:
isParallelExec := testUtils.ExecWithRef(e.Ref())
mockCache.EXPECT().GetExecutableByRef(ctx.Logger, e.Ref()).Return(e, nil).Times(1)
mockRunner.EXPECT().IsCompatible(isParallelExec).Return(true).Times(1)
mockRunner.EXPECT().Exec(ctx.Ctx, isParallelExec, promptedEnv).Return(nil).Times(1)
case 1:
isParallelExec := testUtils.ExecWithRef(e.Ref())
mockCache.EXPECT().GetExecutableByRef(ctx.Logger, e.Ref()).Return(e, nil).Times(1)
mockRunner.EXPECT().IsCompatible(isParallelExec).Return(true).Times(1)
mockRunner.EXPECT().Exec(ctx.Ctx, isParallelExec, promptedEnv).
Return(errors.New("error")).Times(1)
mockLogger.EXPECT().
Errorx("execution error", "err", gomock.Any(), "ref", e.Ref()).Times(1)
}
}

Expect(parallelRnr.Exec(ctx.Ctx, rootExec, promptedEnv)).ToNot(Succeed())
})
})

When("Executables with ref config", func() {
When("Exec", func() {
var (
rootExec *executable.Executable
subExecs executable.ExecutableList
Expand Down
Loading

0 comments on commit d9eb251

Please sign in to comment.