Skip to content

Commit

Permalink
osbuildprogress: use a WaitGroup to syncronise goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
mvo5 committed Jul 30, 2024
1 parent 82d376a commit 601e27b
Showing 1 changed file with 27 additions and 11 deletions.
38 changes: 27 additions & 11 deletions bib/internal/osbuildprogress/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"os/exec"
"strings"
"sync"
"time"

"github.com/cheggaaa/pb/v3"
Expand Down Expand Up @@ -45,16 +46,18 @@ type OsbuildJsonProgress struct {
Message string `json:"message"`
}

func scanJsonSeq(r io.Reader, ch chan OsbuildJsonProgress, errCh chan error) {
var progress OsbuildJsonProgress
func scanJsonSeq(wg *sync.WaitGroup, r io.Reader, ch chan OsbuildJsonProgress, errCh chan error) {
wg.Add(1)
defer wg.Done()

var progress OsbuildJsonProgress
scanner := bufio.NewScanner(r)
for scanner.Scan() {
// XXX: use a proper jsonseq reader?
line := scanner.Bytes()
line = bytes.Trim(line, "\x1e")
if err := json.Unmarshal(line, &progress); err != nil {
// XXX: provide an invalid lines chan
// XXX: provide an invalid lines chan here?
errCh <- err
continue
}
Expand All @@ -63,14 +66,17 @@ func scanJsonSeq(r io.Reader, ch chan OsbuildJsonProgress, errCh chan error) {
if err := scanner.Err(); err != nil && err != io.EOF {
errCh <- err
}
errCh <- io.EOF
}

func AttachProgress(r io.Reader, w io.Writer) {
var progress OsbuildJsonProgress
func AttachProgress(wg *sync.WaitGroup, r io.Reader, w io.Writer) {
wg.Add(1)
defer wg.Done()

var progress OsbuildJsonProgress
ch := make(chan OsbuildJsonProgress)
errCh := make(chan error)
go scanJsonSeq(r, ch, errCh)
go scanJsonSeq(wg, r, ch, errCh)

lastMessage := "-"

Expand All @@ -89,14 +95,17 @@ func AttachProgress(r io.Reader, w io.Writer) {
fmt.Fprintf(os.Stderr, "progress failed: %v\n", err)
return
}
defer pool.Stop()

contextMap := map[string]string{}

for {
select {
case err := <-errCh:
if err == io.EOF {
return
}
fmt.Fprintf(os.Stderr, "error: %v", err)
break
case progress = <-ch:
id := progress.Context.Pipeline.ID
pipelineName := contextMap[id]
Expand Down Expand Up @@ -144,11 +153,12 @@ func AttachProgress(r io.Reader, w io.Writer) {
// nothing
}
}
pool.Stop()
}

// XXX: merge back into images/pkg/osbuild/osbuild-exec.go(?)
func RunOSBuild(manifest []byte, store, outputDirectory string, exports, extraEnv []string) error {
wg := &sync.WaitGroup{}

rp, wp, err := os.Pipe()
if err != nil {
return fmt.Errorf("cannot create pipe for osbuild: %w", err)
Expand All @@ -171,7 +181,7 @@ func RunOSBuild(manifest []byte, store, outputDirectory string, exports, extraEn
// exported here
cmd.Stdout = nil
cmd.ExtraFiles = []*os.File{wp}
go AttachProgress(rp, os.Stdout)
go AttachProgress(wg, rp, os.Stdout)

for _, export := range exports {
cmd.Args = append(cmd.Args, "--export", export)
Expand All @@ -181,6 +191,12 @@ func RunOSBuild(manifest []byte, store, outputDirectory string, exports, extraEn
}
wp.Close()

// XXX: add WaitGroup
return cmd.Wait()
if err := cmd.Wait(); err != nil {
return fmt.Errorf("error running osbuild: %w", err)
}

// wait until the goroutines are finished too or we get premature
// exit of the progress reading and half finished progress bars
wg.Wait()
return nil
}

0 comments on commit 601e27b

Please sign in to comment.