Skip to content

Commit

Permalink
Merge pull request moby#49064 from vvoland/c8d-extract-progress
Browse files Browse the repository at this point in the history
c8d/pull: Show `Extracting` layer status
  • Loading branch information
thaJeztah authored Dec 13, 2024
2 parents 1e0477f + 96ef852 commit 944e403
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 8 deletions.
7 changes: 6 additions & 1 deletion daemon/containerd/image_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ func (i *ImageService) pullTag(ctx context.Context, ref reference.Named, platfor
})
opts = append(opts, containerd.WithImageHandler(h))

pp := pullProgress{store: i.content, showExists: true}
pp := &pullProgress{
store: i.content,
snapshotter: i.snapshotterService(i.snapshotter),
showExists: true,
}
finishProgress := jobs.showProgress(ctx, out, pp)

defer func() {
Expand Down Expand Up @@ -195,6 +199,7 @@ func (i *ImageService) pullTag(ctx context.Context, ref reference.Named, platfor

// AppendInfoHandlerWrapper will annotate the image with basic information like manifest and layer digests as labels;
// this information is used to enable remote snapshotters like nydus and stargz to query a registry.
// This is also needed for the pull progress to detect the `Extracting` status.
infoHandler := snapshotters.AppendInfoHandlerWrapper(ref.String())
opts = append(opts, containerd.WithImageHandlerWrapper(infoHandler))

Expand Down
2 changes: 1 addition & 1 deletion daemon/containerd/image_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (i *ImageService) pushRef(ctx context.Context, targetRef reference.Named, p
jobsQueue := newJobs()
finishProgress := jobsQueue.showProgress(ctx, out, combinedProgress([]progressUpdater{
&pp,
pullProgress{showExists: false, store: store},
&pullProgress{showExists: false, store: store},
}))
defer func() {
finishProgress()
Expand Down
81 changes: 75 additions & 6 deletions daemon/containerd/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package containerd
import (
"context"
"errors"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/pkg/snapshotters"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/snapshots"
cerrdefs "github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/distribution/reference"
Expand Down Expand Up @@ -107,12 +110,15 @@ func (j *jobs) Jobs() []ocispec.Descriptor {
}

type pullProgress struct {
store content.Store
showExists bool
hideLayers bool
store content.Store
showExists bool
hideLayers bool
snapshotter snapshots.Snapshotter
layers []ocispec.Descriptor
unpackStart map[digest.Digest]time.Time
}

func (p pullProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
func (p *pullProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
actives, err := p.store.ListStatuses(ctx, "")
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
Expand Down Expand Up @@ -157,22 +163,85 @@ func (p pullProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out pro
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Download complete",
HideCounts: true,
LastUpdate: true,
})
p.finished(ctx, out, j)
ongoing.Remove(j)
} else if p.showExists {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Already exists",
HideCounts: true,
LastUpdate: true,
})
p.finished(ctx, out, j)
ongoing.Remove(j)
}
}

var committedIdx []int
for idx, desc := range p.layers {
// Find the snapshot corresponding to this layer
walkFilter := "labels.\"" + snapshotters.TargetLayerDigestLabel + "\"==" + p.layers[idx].Digest.String()

err := p.snapshotter.Walk(ctx, func(ctx context.Context, sn snapshots.Info) error {
if sn.Kind == snapshots.KindActive {
if p.unpackStart == nil {
p.unpackStart = make(map[digest.Digest]time.Time)
}
var seconds int64
if began, ok := p.unpackStart[desc.Digest]; !ok {
p.unpackStart[desc.Digest] = time.Now()
} else {
seconds = int64(time.Since(began).Seconds())
}

// We _could_ get the current size of snapshot by calling Usage, but this is too expensive
// and could impact performance. So we just show the "Extracting" message with the elapsed time as progress.
out.WriteProgress(
progress.Progress{
ID: stringid.TruncateID(desc.Digest.Encoded()),
Action: "Extracting",
// Start from 1s, because without Total, 0 won't be shown at all.
Current: 1 + seconds,
Units: "s",
})
return nil
}

if sn.Kind == snapshots.KindCommitted {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(desc.Digest.Encoded()),
Action: "Pull complete",
HideCounts: true,
LastUpdate: true,
})

committedIdx = append(committedIdx, idx)
return nil
}
return nil
}, walkFilter)
if err != nil {
return err
}
}

// Remove finished/committed layers from p.layers
if len(committedIdx) > 0 {
sort.Ints(committedIdx)
for i := len(committedIdx) - 1; i >= 0; i-- {
p.layers = append(p.layers[:committedIdx[i]], p.layers[committedIdx[i]+1:]...)
}
}

return nil
}

func (p *pullProgress) finished(ctx context.Context, out progress.Output, desc ocispec.Descriptor) {
if images.IsLayerType(desc.MediaType) {
p.layers = append(p.layers, desc)
}
}

type pushProgress struct {
Tracker docker.StatusTracker
notStartedWaitingAreUnavailable atomic.Bool
Expand Down

0 comments on commit 944e403

Please sign in to comment.