Skip to content

Commit

Permalink
Command Builder
Browse files Browse the repository at this point in the history
This patch implements command-builder which allows multiple command strings
to be batched, then executed using one of several execution policies, including
- in series
- concurrent
- piped (not ready)
- exit on error
- continue on error

Changes also include refactor and updated tests.
  • Loading branch information
vladimirvivien committed Oct 2, 2021
1 parent a66ccd3 commit 29d5ba4
Show file tree
Hide file tree
Showing 5 changed files with 392 additions and 16 deletions.
143 changes: 143 additions & 0 deletions exec/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package exec

import (
"sync"
)

type CommandPolicy byte

const (
CmdOnErrContinue CommandPolicy = 1 << iota
CmdOnErrExit
CmdExecSerial
CmdExecConcurrent
CmdExecPipe
)

type CommandProcs struct {
procs []*Proc
}
type CommandBuilder struct {
cmdPolicy CommandPolicy
procs []*Proc
procChan chan *Proc
}

// Commands creates a *CommandBuilder used to collect
// command strings to be executed.
func Commands(cmds ...string) *CommandBuilder {
cb := new(CommandBuilder)
for _, cmd := range cmds {
cb.procs = append(cb.procs, NewProc(cmd))
}
return cb
}

// WithPolicy sets one or more command policy mask values, i.e. (CmdOnErrContinue | CmdExecConcurrent)
func (cb *CommandBuilder) WithPolicy(policyMask CommandPolicy) *CommandBuilder {
cb.cmdPolicy = policyMask
return cb
}

// Add adds a new command string to the builder
func (cb *CommandBuilder) Add(cmds ...string) *CommandBuilder {
for _, cmd := range cmds {
cb.procs = append(cb.procs, NewProc(cmd))
}
return cb
}

// Run is a shortcut for executing the procs serially:
//
// cb.WithPolicy(CmdOnErrContinue).Start().Wait()
//
func (cb *CommandBuilder) Run() CommandProcs {
return cb.WithPolicy(CmdOnErrContinue).Start().Wait()
}

// ConcurRun is a shortcut for executing procs concurrently:
//
// cb.WithPolicy(CmdExecConcurrent).Start().Wait()
//
func (cb *CommandBuilder) ConcurRun() CommandProcs {
return cb.WithPolicy(CmdOnErrContinue | CmdExecConcurrent).Start().Wait()
}

// Start starts running the registered procs serially and returns immediately.
// This should be followed by a call to Wait to retrieve results.
func (cb *CommandBuilder) Start() *CommandBuilder {
if len(cb.procs) == 0 {
return cb
}

cb.procChan = make(chan *Proc, len(cb.procs))
switch {
case hasPolicy(cb.cmdPolicy, CmdExecConcurrent):
// launch each command in its own goroutine
go func() {
defer close(cb.procChan)
var gate sync.WaitGroup
for _, proc := range cb.procs {
gate.Add(1)
go func(wg *sync.WaitGroup, ch chan<- *Proc, p *Proc) {
defer wg.Done()
ch <- p.Start()
}(&gate, cb.procChan, proc)
}
// wait for procs to launch
gate.Wait()
}()

case hasPolicy(cb.cmdPolicy, CmdExecPipe):
// pipe successive commands serially
go func(ch chan<- *Proc) {
defer close(cb.procChan)
if len(cb.procs) == 1 {
ch <- cb.procs[0].Start()
return
}
}(cb.procChan)
default:
// launch all procs (serially), return immediately
go func(ch chan<- *Proc) {
defer close(cb.procChan)
for _, proc := range cb.procs {
ch <- proc.Start()
}
}(cb.procChan)
}
return cb
}

func (cb *CommandBuilder) Wait() CommandProcs {
if len(cb.procs) == 0 || cb.procChan == nil {
return CommandProcs{procs: []*Proc{}}
}

var result CommandProcs
for proc := range cb.procChan {
result.procs = append(result.procs, proc)

// check for start err
if proc.Err() != nil {
if hasPolicy(cb.cmdPolicy, CmdOnErrExit) {
break
}
}

// wait for command to complete
if err := proc.Wait().Err(); err != nil {
if hasPolicy(cb.cmdPolicy, CmdOnErrExit) {
break
}
}
}
return result
}

func hasPolicy(mask, pol CommandPolicy) bool {
return (mask & pol) != 0
}

// TODO - add termination methods
// - Pipe() - Runs each command, piping result of prev command into std input of next command
194 changes: 194 additions & 0 deletions exec/builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package exec

import (
"testing"
)

func TestCommandBuilder(t *testing.T) {
tests := []struct {
name string
initCmds []string
additionalCmds []string
}{
{name: "no procs"},
{
name: "initial procs only",
initCmds: []string{"echo 'hello world'", "date", "ls -al"},
},
{
name: "initial and one additional",
initCmds: []string{"echo 'hello world'", "date", "ls -al"},
additionalCmds: []string{"git commit --signoff"},
},
{
name: "initial and multiple additional",
initCmds: []string{"echo 'hello world'", "date", "ls -al"},
additionalCmds: []string{"git commit --signoff", "history", "man time", "man man"},
},
{
name: "no initial multiple additional",
additionalCmds: []string{"git commit --signoff", "history", "man time", "man man"},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c := Commands(test.initCmds...)
if len(test.initCmds) != len(c.procs) {
t.Error("unexpected command count in CommandBuilder")
}
c.Add(test.additionalCmds...)
if (len(test.initCmds) + len(test.additionalCmds)) != len(c.procs) {
t.Error("procs are not added to builder properly")
}
})
}
}

func TestCommandBuilder_Run(t *testing.T) {
tests := []struct {
name string
commands []string
expectedCmds int
}{
{
name: "zero procs",
},
{
name: "no error in procs",
commands: []string{"echo 'hello world'", "date", "ls -al"},
expectedCmds: 3,
},
{
name: "continue on 1 error",
commands: []string{"foobar", "echo 'hello world'", "date", "ls -al"},
expectedCmds: 4,
},
{
name: "continue on 2 errors",
commands: []string{"foobar", "echo 'hello world'", "daftpunk", "date", "ls -al"},
expectedCmds: 5,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c := Commands(test.commands...).Run()
if len(c.procs) != test.expectedCmds {
t.Errorf("expecting %d procs to run, got %d", test.expectedCmds, len(c.procs))
}
})
}
}

func TestCommandBuilder_ConcurRun(t *testing.T) {
tests := []struct {
name string
commands []string
expectedCmds int
}{
{
name: "zero procs",
},
{
name: "no error in procs",
commands: []string{"echo 'hello world'", "date", "ls -al"},
expectedCmds: 3,
},
{
name: "continue on 1 error",
commands: []string{"foobar", "echo 'hello world'", "date", "ls -al"},
expectedCmds: 4,
},
{
name: "continue on 2 errors",
commands: []string{"foobar", "echo 'hello world'", "daftpunk", "date", "ls -al"},
expectedCmds: 5,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c := Commands(test.commands...).ConcurRun()
if len(c.procs) != test.expectedCmds {
t.Errorf("expecting %d procs to run, got %d", test.expectedCmds, len(c.procs))
}
})
}
}

func TestCommandBuilder_StartWait(t *testing.T) {
tests := []struct {
name string
commands []string
policy CommandPolicy
expectedCmds int
}{
{
name: "zero procs",
},
{
name: "no error in procs",
commands: []string{"echo 'hello world'", "date", "ls -al"},
policy: CmdOnErrContinue,
expectedCmds: 3,
},
{
name: "continue on 1 error",
commands: []string{"foobar", "echo 'hello world'", "date", "ls -al"},
policy: CmdOnErrContinue,
expectedCmds: 4,
},
{
name: "break on 1 error",
commands: []string{"foobar", "echo 'hello world'", "date", "ls -al"},
policy: CmdOnErrExit,
expectedCmds: 1,
},
{
name: "continue on 2 errors",
commands: []string{"foobar", "echo 'hello world'", "daftpunk", "date", "ls -al"},
policy: CmdOnErrContinue,
expectedCmds: 5,
},
{
name: "break on 2 errors",
commands: []string{"foobar", "echo 'hello world'", "daftpunk", "date", "ls -al"},
policy: CmdOnErrExit,
expectedCmds: 1,
},
{
name: "concurrently no errors",
commands: []string{"echo 'hello world'", "date", "ls -al"},
policy: CmdExecConcurrent,
expectedCmds: 3,
},
{
name: "concurrent 1 error",
commands: []string{"foobar", "echo 'hello world'", "date", "ls -al"},
policy: CmdExecConcurrent,
expectedCmds: 4,
},
{
name: "continue on 2 errors",
commands: []string{"foobar", "echo 'hello world'", "daftpunk", "date", "ls -al"},
policy: CmdExecConcurrent,
expectedCmds: 5,
},
{
name: "Concurr|Continue with 1 err",
commands: []string{"man cat", "echo 'hello world'", "foo", "ls -al"},
policy: CmdOnErrContinue | CmdExecConcurrent,
expectedCmds: 4,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c := Commands(test.commands...).WithPolicy(test.policy).Start().Wait()
if len(c.procs) != test.expectedCmds {
t.Errorf("expecting %d procs to run, got %d", test.expectedCmds, len(c.procs))
}
})
}
}
Loading

0 comments on commit 29d5ba4

Please sign in to comment.