Skip to content

Commit

Permalink
refactor: move shared retry and concurrency handling (#195)
Browse files Browse the repository at this point in the history
  • Loading branch information
jahvon authored Nov 17, 2024
1 parent 6f324a0 commit 4ab1e10
Show file tree
Hide file tree
Showing 29 changed files with 851 additions and 456 deletions.
4 changes: 3 additions & 1 deletion cmd/internal/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/jahvon/flow/internal/context"
"github.com/jahvon/flow/internal/io"
"github.com/jahvon/flow/internal/runner"
"github.com/jahvon/flow/internal/runner/engine"
"github.com/jahvon/flow/internal/runner/exec"
"github.com/jahvon/flow/internal/runner/launch"
"github.com/jahvon/flow/internal/runner/parallel"
Expand Down Expand Up @@ -132,7 +133,8 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar
}
}
startTime := time.Now()
if err := runner.Exec(ctx, e, envMap); err != nil {
eng := engine.NewExecEngine()
if err := runner.Exec(ctx, e, eng, envMap); err != nil {
logger.FatalErr(err)
}
dur := time.Since(startTime)
Expand Down
1 change: 1 addition & 0 deletions codecov.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ coverage:
only_pulls: true
ignore:
- "**/*.gen.go" # ignore generated code
- "**/*_md.go" # ignore markdown generators
- "tools/**" # ignore tools directory
- "tests/utils/**" # ignore test utilities
- "**/testdata/**" # ignore test data
Expand Down
10 changes: 4 additions & 6 deletions docs/schemas/flowfile_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,8 @@
"description": "A list of executables to run in parallel.\nEach executable can be a command or a reference to another executable.\n"
},
"failFast": {
"description": "If set to true, the parallel executable will fail if any of the sub-executables fail.",
"type": "boolean",
"default": false
"description": "End the parallel execution as soon as an exec exits with a non-zero status. This is the default behavior.\nWhen set to false, all execs will be run regardless of the exit status of parallel execs.\n",
"type": "boolean"
},
"maxThreads": {
"description": "The maximum number of threads to use when executing the parallel executables.",
Expand Down Expand Up @@ -451,9 +450,8 @@
"description": "A list of executables to run in serial.\nEach executable can be a command or a reference to another executable.\n"
},
"failFast": {
"description": "If set to true, the serial executable will fail if any of the sub-executables fail.",
"type": "boolean",
"default": false
"description": "End the serial execution as soon as an exec exits with a non-zero status. This is the default behavior.\nWhen set to false, all execs will be run regardless of the exit status of the previous exec.\n",
"type": "boolean"
},
"params": {
"$ref": "#/definitions/ExecutableParameterList"
Expand Down
4 changes: 2 additions & 2 deletions docs/types/flowfile.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ Launches an application or opens a URI.
| ----- | ----------- | ---- | ------- | :--------: |
| `args` | | [ExecutableArgumentList](#ExecutableArgumentList) | <no value> | |
| `execs` | A list of executables to run in parallel. Each executable can be a command or a reference to another executable. | [ExecutableParallelRefConfigList](#ExecutableParallelRefConfigList) | <no value> ||
| `failFast` | If set to true, the parallel executable will fail if any of the sub-executables fail. | `boolean` | false | |
| `failFast` | End the parallel execution as soon as an exec exits with a non-zero status. This is the default behavior. When set to false, all execs will be run regardless of the exit status of parallel execs. | `boolean` | <no value> | |
| `maxThreads` | The maximum number of threads to use when executing the parallel executables. | `integer` | 5 | |
| `params` | | [ExecutableParameterList](#ExecutableParameterList) | <no value> | |

Expand Down Expand Up @@ -335,7 +335,7 @@ Executes a list of executables in serial.
| ----- | ----------- | ---- | ------- | :--------: |
| `args` | | [ExecutableArgumentList](#ExecutableArgumentList) | <no value> | |
| `execs` | A list of executables to run in serial. Each executable can be a command or a reference to another executable. | [ExecutableSerialRefConfigList](#ExecutableSerialRefConfigList) | <no value> ||
| `failFast` | If set to true, the serial executable will fail if any of the sub-executables fail. | `boolean` | false | |
| `failFast` | End the serial execution as soon as an exec exits with a non-zero status. This is the default behavior. When set to false, all execs will be run regardless of the exit status of the previous exec. | `boolean` | <no value> | |
| `params` | | [ExecutableParameterList](#ExecutableParameterList) | <no value> | |

### ExecutableSerialRefConfig
Expand Down
182 changes: 182 additions & 0 deletions internal/runner/engine/engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package engine

import (
"context"
"fmt"

"golang.org/x/sync/errgroup"

"github.com/jahvon/flow/internal/runner/engine/retry"
)

//go:generate mockgen -destination=mocks/mock_engine.go -package=mocks github.com/jahvon/flow/internal/runner/engine Engine

// Result captures the outcome of running a single Exec.
type Result struct {
	// ID is the identifier of the Exec this result belongs to.
	ID string
	// Error is the error produced by the exec; nil means no error was recorded.
	Error error
	// Retries is the number of attempts made beyond the first one.
	Retries int
}

// ResultSummary aggregates the results produced by one engine execution.
type ResultSummary struct {
	Results []Result
}

// HasErrors reports whether at least one result carries a non-nil error.
func (rs ResultSummary) HasErrors() bool {
	for _, r := range rs.Results {
		if r.Error != nil {
			return true
		}
	}
	return false
}

// String renders a human-readable report of the failed results.
//
// It returns the empty string when no result has an error. Otherwise the
// report starts with a header line followed by one entry per failed result.
// Every entry is terminated by a newline, so consecutive entries never run
// together regardless of whether a retry count is printed.
func (rs ResultSummary) String() string {
	var out []byte
	for _, r := range rs.Results {
		if r.Error == nil {
			continue
		}
		if len(out) == 0 {
			// The header is emitted once, just before the first failure.
			out = append(out, "execution error encountered\n\n"...)
		}
		out = append(out, fmt.Sprintf("- Executable: %s\n Error: %v\n", r.ID, r.Error)...)
		if r.Retries > 0 {
			out = append(out, fmt.Sprintf(" Retries: %d\n", r.Retries)...)
		}
	}
	return string(out)
}

// Exec describes a single unit of work submitted to the engine.
type Exec struct {
// ID identifies the exec; the engine copies it onto the Result it records.
ID string
// Function is the work to run. The engine hands it to the retry handler's
// Execute and stores the error that call returns in Result.Error.
Function func() error
// MaxRetries is passed to retry.NewRetryHandler when the exec is run.
MaxRetries int
}

// ExecutionMode selects how Execute schedules the supplied execs.
type ExecutionMode int

const (
// Parallel dispatches the execs to the parallel executor. It is the zero
// value of ExecutionMode.
Parallel ExecutionMode = iota
// Serial dispatches the execs to the serial executor. Execute starts from
// this mode when no WithMode option is supplied.
Serial
)

// Options holds the settings that control a single Execute call.
type Options struct {
// MaxThreads is the concurrency limit used by the parallel executor; when it
// is 0 the limit is set to the number of execs.
MaxThreads int
// ExecutionMode chooses between the Parallel and Serial executors.
ExecutionMode ExecutionMode
// FailFast is tri-state: the executors treat nil the same as true.
FailFast *bool
}

// OptionFunc mutates an Options value; Execute applies each one in the order
// it was passed.
type OptionFunc func(*Options)

// Engine runs a batch of execs and reports what happened to each of them.
type Engine interface {
// Execute runs execs as configured by opts and returns the collected
// results. Failures are reported through the returned ResultSummary rather
// than a separate error value.
Execute(ctx context.Context, execs []Exec, opts ...OptionFunc) ResultSummary
}

// execEngine is the package's Engine implementation. It has no fields, so
// every instance is interchangeable.
type execEngine struct{}

// NewExecEngine constructs the default Engine.
func NewExecEngine() Engine {
	return new(execEngine)
}

// WithMaxThreads returns an option that stores maxThreads in
// Options.MaxThreads.
func WithMaxThreads(maxThreads int) OptionFunc {
	apply := func(cfg *Options) {
		cfg.MaxThreads = maxThreads
	}
	return apply
}

// WithFailFast returns an option that stores failFast in Options.FailFast.
// The pointer is stored as given, so a nil pointer leaves the setting unset.
func WithFailFast(failFast *bool) OptionFunc {
	apply := func(cfg *Options) {
		cfg.FailFast = failFast
	}
	return apply
}

// WithMode returns an option that stores mode in Options.ExecutionMode.
func WithMode(mode ExecutionMode) OptionFunc {
	apply := func(cfg *Options) {
		cfg.ExecutionMode = mode
	}
	return apply
}

// Execute runs execs using the mode selected through opts and wraps the
// per-exec results in a ResultSummary.
//
// Options start from MaxThreads 0 and Serial mode; each non-nil option is then
// applied in order. Nil options are skipped instead of being invoked, so a
// conditionally-built option list cannot panic the engine. An unrecognized
// execution mode yields a summary with a single error result.
func (e *execEngine) Execute(ctx context.Context, execs []Exec, opts ...OptionFunc) ResultSummary {
	options := Options{MaxThreads: 0, ExecutionMode: Serial}
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		opt(&options)
	}

	switch options.ExecutionMode {
	case Parallel:
		return ResultSummary{Results: e.executeParallel(ctx, execs, options)}
	case Serial:
		return ResultSummary{Results: e.executeSerial(ctx, execs, options)}
	default:
		return ResultSummary{Results: []Result{{Error: fmt.Errorf("invalid execution mode")}}}
	}
}

// executeParallel runs execs concurrently and returns one Result per exec, in
// the same order as execs.
//
// Concurrency is limited to opts.MaxThreads; a value of 0 sets the limit to
// len(execs). Fail-fast is enabled when opts.FailFast is nil or points to true.
//
// An exec is skipped (its Function is never called) when, at the moment it is
// about to start, either
//   - the caller's ctx is already done: the Result carries ctx.Err(), or
//   - fail-fast is enabled and another exec has already failed: the Result
//     carries only the ID, with no error of its own.
//
// Execs that are already running are never interrupted, because Function does
// not accept a context.
func (e *execEngine) executeParallel(ctx context.Context, execs []Exec, opts Options) []Result {
	results := make([]Result, len(execs))
	failFast := opts.FailFast == nil || *opts.FailFast

	// groupCtx is cancelled when ctx is cancelled or as soon as any task
	// returns a non-nil error; tasks only do that for a cancelled ctx or for a
	// failure under fail-fast.
	group, groupCtx := errgroup.WithContext(ctx)
	limit := opts.MaxThreads
	if limit == 0 {
		limit = len(execs)
	}
	group.SetLimit(limit)

	// The index and exec are passed as arguments so each task owns its own
	// copies regardless of the toolchain's loop-variable semantics. Each task
	// writes only to its own slot in results, so no locking is required.
	newTask := func(idx int, ex Exec) func() error {
		return func() error {
			if groupCtx.Err() != nil {
				// ctx.Err() is nil when the stop was caused by a sibling
				// failure rather than by the caller.
				callerErr := ctx.Err()
				results[idx] = Result{ID: ex.ID, Error: callerErr}
				return callerErr
			}

			rh := retry.NewRetryHandler(ex.MaxRetries, 0)
			err := rh.Execute(ex.Function)
			results[idx] = Result{
				ID:      ex.ID,
				Error:   err,
				Retries: rh.GetStats().Attempts - 1,
			}
			if err != nil && failFast {
				return err
			}
			return nil
		}
	}

	for i, exec := range execs {
		group.Go(newTask(i, exec))
	}

	// The group error is intentionally dropped: any error a task returns has
	// already been recorded in that task's Result.
	_ = group.Wait()
	return results
}

// executeSerial runs execs one after another and returns the results of the
// execs that were attempted, in order.
//
// Before each exec the context is checked; if it is done, a Result carrying
// ctx.Err() is recorded for that exec and execution stops. When fail-fast is
// enabled (opts.FailFast is nil or points to true) execution also stops after
// the first exec that returns an error.
//
// Both early exits return only the entries up to and including the exec that
// triggered the stop, so the slice never contains zero-valued placeholders for
// execs that were not attempted.
func (e *execEngine) executeSerial(ctx context.Context, execs []Exec, opts Options) []Result {
	results := make([]Result, 0, len(execs))
	failFast := opts.FailFast == nil || *opts.FailFast

	for _, exec := range execs {
		if err := ctx.Err(); err != nil {
			return append(results, Result{ID: exec.ID, Error: err})
		}

		rh := retry.NewRetryHandler(exec.MaxRetries, 0)
		err := rh.Execute(exec.Function)
		results = append(results, Result{
			ID:      exec.ID,
			Error:   err,
			Retries: rh.GetStats().Attempts - 1,
		})

		if err != nil && failFast {
			return results
		}
	}

	return results
}
126 changes: 126 additions & 0 deletions internal/runner/engine/engine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package engine_test

import (
"context"
"errors"
"testing"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/jahvon/flow/internal/runner/engine"
)

// TestEngine_Execute is the go test entry point for the Ginkgo suite: it
// registers Ginkgo's Fail as the Gomega failure handler and runs the specs.
func TestEngine_Execute(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Execute Engine Suite")
}

// Specs for Engine.Execute covering the Parallel and Serial modes. Several
// specs assert on wall-clock duration, so the sleep lengths and thresholds
// below are tied to each other.
var _ = Describe("e.Execute", func() {
var (
eng engine.Engine
ctx context.Context
cancel context.CancelFunc
)

// Every spec gets a fresh engine and its own cancellable context.
BeforeEach(func() {
eng = engine.NewExecEngine()
ctx, cancel = context.WithCancel(context.Background())
})

// Release the per-spec context.
AfterEach(func() {
cancel()
})

Context("Parallel execution", func() {
It("should execute execs in parallel", func() {
execs := []engine.Exec{
{ID: "exec1", Function: func() error { time.Sleep(100 * time.Millisecond); return nil }},
{ID: "exec2", Function: func() error { return nil }},
}

start := time.Now()
ff := false
summary := eng.Execute(ctx, execs, engine.WithMode(engine.Parallel), engine.WithFailFast(&ff))
duration := time.Since(start)

Expect(summary.Results).To(HaveLen(2))
Expect(summary.Results[0].Error).NotTo(HaveOccurred())
Expect(summary.Results[1].Error).NotTo(HaveOccurred())
// The only sleep is 100ms, so the whole run must finish in under 200ms.
Expect(duration).To(BeNumerically("<", 200*time.Millisecond))
})

It("should handle exec failures with fail fast", func() {
execs := []engine.Exec{
{ID: "exec1", Function: func() error { return errors.New("error") }},
{ID: "exec2", Function: func() error { time.Sleep(100 * time.Millisecond); return nil }},
}

ff := true
summary := eng.Execute(ctx, execs, engine.WithMode(engine.Parallel), engine.WithFailFast(&ff))

// Both slots are still reported; only exec1 carries an error.
Expect(summary.Results).To(HaveLen(2))
Expect(summary.Results[0].Error).To(HaveOccurred())
Expect(summary.Results[1].Error).ToNot(HaveOccurred())
Expect(summary.HasErrors()).To(BeTrue())
})

It("should limit the number of concurrent execs", func() {
execs := []engine.Exec{
{ID: "exec1", Function: func() error { time.Sleep(100 * time.Millisecond); return nil }},
{ID: "exec2", Function: func() error { time.Sleep(100 * time.Millisecond); return nil }},
{ID: "exec3", Function: func() error { time.Sleep(100 * time.Millisecond); return nil }},
{ID: "exec4", Function: func() error { time.Sleep(100 * time.Millisecond); return nil }},
{ID: "exec5", Function: func() error { time.Sleep(100 * time.Millisecond); return nil }},
}

start := time.Now()
ff := false
summary := eng.Execute(ctx, execs,
engine.WithMode(engine.Parallel), engine.WithFailFast(&ff), engine.WithMaxThreads(2))
duration := time.Since(start)

Expect(summary.Results).To(HaveLen(5))
Expect(summary.Results[0].Error).NotTo(HaveOccurred())
Expect(summary.Results[1].Error).NotTo(HaveOccurred())
Expect(summary.Results[2].Error).NotTo(HaveOccurred())
Expect(summary.Results[3].Error).NotTo(HaveOccurred())
Expect(summary.Results[4].Error).NotTo(HaveOccurred())
// Five 100ms execs with MaxThreads 2 must take at least 250ms in total.
Expect(duration).To(BeNumerically(">=", 250*time.Millisecond))
})
})

Context("Serial execution", func() {
It("should execute execs serially", func() {
execs := []engine.Exec{
{ID: "exec1", Function: func() error { time.Sleep(100 * time.Millisecond); return nil }},
{ID: "exec2", Function: func() error { time.Sleep(110 * time.Millisecond); return nil }},
}

start := time.Now()
ff := false
summary := eng.Execute(ctx, execs, engine.WithMode(engine.Serial), engine.WithFailFast(&ff))
duration := time.Since(start)

Expect(summary.Results).To(HaveLen(2))
Expect(summary.Results[0].Error).NotTo(HaveOccurred())
Expect(summary.Results[1].Error).NotTo(HaveOccurred())
// The sleeps add up to 210ms, so a serial run cannot finish before 200ms.
Expect(duration).To(BeNumerically(">=", 200*time.Millisecond))
})

It("should handle exec failures with fail fast", func() {
execs := []engine.Exec{
{ID: "exec1", Function: func() error { return errors.New("error") }},
{ID: "exec2", Function: func() error { return nil }},
}

ff := true
summary := eng.Execute(ctx, execs, engine.WithMode(engine.Serial), engine.WithFailFast(&ff))

// Only exec1 is reported: the serial run stops at the first failure.
Expect(summary.Results).To(HaveLen(1))
Expect(summary.Results[0].Error).To(HaveOccurred())
Expect(summary.HasErrors()).To(BeTrue())
})
})
})
Loading

0 comments on commit 4ab1e10

Please sign in to comment.