From c8cea3bdd27dbadd350e5312f8f0a757fca329db Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 14 Sep 2024 20:02:09 +0200 Subject: [PATCH] wait for output streams on process completion in `run_shell` task --- pkg/coordinator/tasks/run_shell/task.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/pkg/coordinator/tasks/run_shell/task.go b/pkg/coordinator/tasks/run_shell/task.go index 092953c..e4fcf33 100644 --- a/pkg/coordinator/tasks/run_shell/task.go +++ b/pkg/coordinator/tasks/run_shell/task.go @@ -96,10 +96,10 @@ func (t *Task) Execute(ctx context.Context) error { return err } - stdoutChan := t.readOutputStream(stdout, cmdLogger.WithField("stream", "stdout")) + stdoutChan, stdoutCloseChan := t.readOutputStream(stdout, cmdLogger.WithField("stream", "stdout")) defer close(stdoutChan) - stderrChan := t.readOutputStream(stderr, cmdLogger.WithField("stream", "stderr")) + stderrChan, stderrCloseChan := t.readOutputStream(stderr, cmdLogger.WithField("stream", "stderr")) defer close(stderrChan) // add env vars @@ -139,7 +139,7 @@ func (t *Task) Execute(ctx context.Context) error { stdin.Close() - // wait for process + // wait for process & output streams var execErr error waitChan := make(chan bool) @@ -148,9 +148,8 @@ func (t *Task) Execute(ctx context.Context) error { execErr = command.Wait() - // give stdout/stderr handler some time to parse remaining outputs - // TODO: find a better solution to wait for IO streams before continuing here - time.Sleep(100 * time.Millisecond) + <-stdoutCloseChan + <-stderrCloseChan }() // wait for output handler @@ -179,12 +178,15 @@ cmdloop: return nil } -func (t *Task) readOutputStream(pipe io.ReadCloser, logger logrus.FieldLogger) chan string { - resChan := make(chan string) +func (t *Task) readOutputStream(pipe io.ReadCloser, logger logrus.FieldLogger) (readChan chan string, closeChan chan bool) { + readChan = make(chan string) + closeChan = make(chan bool) go func() { var err error + defer close(closeChan) + reader := bufio.NewReader(pipe) for err == nil { @@ -197,7 +199,6 @@ func (t *Task) readOutputStream(pipe io.ReadCloser, logger logrus.FieldLogger) c line, isPrefix, err = reader.ReadLine() if err != nil { if err == io.EOF { - logger.Errorf("EOF") break } @@ -210,12 +211,12 @@ func (t *Task) readOutputStream(pipe io.ReadCloser, logger logrus.FieldLogger) c } if len(ln) > 0 { - resChan <- string(ln) + readChan <- string(ln) } } }() - return resChan + return readChan, closeChan } var outputVarPattern = regexp.MustCompile(`^::set-var +([^ ]+) +(.*)$`)