Skip to content

Commit

Permalink
wait for output streams on process completion in run_shell task
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Sep 14, 2024
1 parent f2cb5bd commit c8cea3b
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions pkg/coordinator/tasks/run_shell/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ func (t *Task) Execute(ctx context.Context) error {
return err
}

stdoutChan := t.readOutputStream(stdout, cmdLogger.WithField("stream", "stdout"))
stdoutChan, stdoutCloseChan := t.readOutputStream(stdout, cmdLogger.WithField("stream", "stdout"))
defer close(stdoutChan)

stderrChan := t.readOutputStream(stderr, cmdLogger.WithField("stream", "stderr"))
stderrChan, stderrCloseChan := t.readOutputStream(stderr, cmdLogger.WithField("stream", "stderr"))
defer close(stderrChan)

// add env vars
Expand Down Expand Up @@ -139,7 +139,7 @@ func (t *Task) Execute(ctx context.Context) error {

stdin.Close()

// wait for process
// wait for process & output streams
var execErr error

waitChan := make(chan bool)
Expand All @@ -148,9 +148,8 @@ func (t *Task) Execute(ctx context.Context) error {

execErr = command.Wait()

// give stdout/stderr handler some time to parse remaining outputs
// TODO: find a better solution to wait for IO streams before continuing here
time.Sleep(100 * time.Millisecond)
<-stdoutCloseChan
<-stderrCloseChan
}()

// wait for output handler
Expand Down Expand Up @@ -179,12 +178,15 @@ cmdloop:
return nil
}

func (t *Task) readOutputStream(pipe io.ReadCloser, logger logrus.FieldLogger) chan string {
resChan := make(chan string)
func (t *Task) readOutputStream(pipe io.ReadCloser, logger logrus.FieldLogger) (readChan chan string, closeChan chan bool) {
readChan = make(chan string)
closeChan = make(chan bool)

go func() {
var err error

defer close(closeChan)

reader := bufio.NewReader(pipe)

for err == nil {
Expand All @@ -197,7 +199,6 @@ func (t *Task) readOutputStream(pipe io.ReadCloser, logger logrus.FieldLogger) c
line, isPrefix, err = reader.ReadLine()
if err != nil {
if err == io.EOF {
logger.Errorf("EOF")
break
}

Expand All @@ -210,12 +211,12 @@ func (t *Task) readOutputStream(pipe io.ReadCloser, logger logrus.FieldLogger) c
}

if len(ln) > 0 {
resChan <- string(ln)
readChan <- string(ln)
}
}
}()

return resChan
return readChan, closeChan
}

var outputVarPattern = regexp.MustCompile(`^::set-var +([^ ]+) +(.*)$`)
Expand Down

0 comments on commit c8cea3b

Please sign in to comment.