Skip to content

Commit

Permalink
Merge pull request containerd#9769 from fuweid/17-backport-syncfs
Browse files Browse the repository at this point in the history
[release/1.7] Add option to perform syncfs after pull
  • Loading branch information
estesp authored Feb 7, 2024
2 parents 88fd474 + ea0a92e commit e52570c
Show file tree
Hide file tree
Showing 15 changed files with 137 additions and 60 deletions.
7 changes: 7 additions & 0 deletions api/next.pb.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3730,6 +3730,13 @@ file {
type_name: ".containerd.services.diff.v1.ApplyRequest.PayloadsEntry"
json_name: "payloads"
}
field {
name: "sync_fs"
number: 4
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "syncFs"
}
nested_type {
name: "PayloadsEntry"
field {
Expand Down
122 changes: 66 additions & 56 deletions api/services/diff/v1/diff.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions api/services/diff/v1/diff.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ message ApplyRequest {
repeated containerd.types.Mount mounts = 2;

map<string, google.protobuf.Any> payloads = 3;
// SyncFs is to synchronize the underlying filesystem containing files.
bool sync_fs = 4;
}

message ApplyResponse {
Expand Down
1 change: 1 addition & 0 deletions contrib/diffservice/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (s *service) Apply(ctx context.Context, er *diffapi.ApplyRequest) (*diffapi
}
opts = append(opts, diff.WithPayloads(payloads))
}
opts = append(opts, diff.WithSyncFs(er.SyncFs))

ocidesc, err = s.applier.Apply(ctx, desc, mounts, opts...)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion diff/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [
r: io.TeeReader(processor, digester.Hash()),
}

if err := apply(ctx, mounts, rc); err != nil {
if err := apply(ctx, mounts, rc, config.SyncFs); err != nil {
return emptyDesc, err
}

Expand Down
4 changes: 3 additions & 1 deletion diff/apply/apply_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/containerd/containerd/mount"
)

func apply(ctx context.Context, mounts []mount.Mount, r io.Reader) error {
func apply(ctx context.Context, mounts []mount.Mount, r io.Reader, _sync bool) error {
// We currently do not support mounts nor bind mounts on MacOS in the containerd daemon.
// Using this as an exception to enable native snapshotter and allow further research.
if len(mounts) == 1 && mounts[0].Type == "bind" {
Expand All @@ -38,6 +38,8 @@ func apply(ctx context.Context, mounts []mount.Mount, r io.Reader) error {
path := mounts[0].Source
_, err := archive.Apply(ctx, path, r, opts...)
return err

// TODO: Do we need to sync all the filesystems?
}

return mount.WithTempMount(ctx, mounts, func(root string) error {
Expand Down
30 changes: 29 additions & 1 deletion diff/apply/apply_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@ import (
"context"
"fmt"
"io"
"os"
"strings"

"github.com/containerd/containerd/archive"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/pkg/userns"

"golang.org/x/sys/unix"
)

func apply(ctx context.Context, mounts []mount.Mount, r io.Reader) error {
func apply(ctx context.Context, mounts []mount.Mount, r io.Reader, sync bool) (retErr error) {
switch {
case len(mounts) == 1 && mounts[0].Type == "overlay":
// OverlayConvertWhiteout (mknod c 0 0) doesn't work in userns.
Expand All @@ -50,6 +53,9 @@ func apply(ctx context.Context, mounts []mount.Mount, r io.Reader) error {
opts = append(opts, archive.WithParents(parents))
}
_, err = archive.Apply(ctx, path, r, opts...)
if err == nil && sync {
err = doSyncFs(path)
}
return err
case len(mounts) == 1 && mounts[0].Type == "aufs":
path, parents, err := getAufsPath(mounts[0].Options)
Expand All @@ -67,6 +73,14 @@ func apply(ctx context.Context, mounts []mount.Mount, r io.Reader) error {
}
_, err = archive.Apply(ctx, path, r, opts...)
return err
case sync && len(mounts) == 1 && mounts[0].Type == "bind":
defer func() {
if retErr != nil {
return
}

retErr = doSyncFs(mounts[0].Source)
}()
}
return mount.WithTempMount(ctx, mounts, func(root string) error {
_, err := archive.Apply(ctx, root, r)
Expand Down Expand Up @@ -130,3 +144,17 @@ func getAufsPath(options []string) (upper string, lower []string, err error) {
}
return
}

func doSyncFs(file string) error {
fd, err := os.Open(file)
if err != nil {
return fmt.Errorf("failed to open %s: %w", file, err)
}
defer fd.Close()

_, _, errno := unix.Syscall(unix.SYS_SYNCFS, fd.Fd(), 0, 0)
if errno != 0 {
return fmt.Errorf("failed to syncfs for %s: %w", file, errno)
}
return nil
}
3 changes: 2 additions & 1 deletion diff/apply/apply_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
"github.com/containerd/containerd/mount"
)

func apply(ctx context.Context, mounts []mount.Mount, r io.Reader) error {
func apply(ctx context.Context, mounts []mount.Mount, r io.Reader, _sync bool) error {
// TODO: for windows, how to sync?
return mount.WithTempMount(ctx, mounts, func(root string) error {
_, err := archive.Apply(ctx, root, r)
return err
Expand Down
10 changes: 10 additions & 0 deletions diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type Comparer interface {
type ApplyConfig struct {
// ProcessorPayloads specifies the payload sent to various processors
ProcessorPayloads map[string]typeurl.Any
// SyncFs is to synchronize the underlying filesystem containing files
SyncFs bool
}

// ApplyOpt is used to configure an Apply operation
Expand Down Expand Up @@ -133,3 +135,11 @@ func WithSourceDateEpoch(tm *time.Time) Opt {
return nil
}
}

// WithSyncFs sets sync flag to the config.
func WithSyncFs(sync bool) ApplyOpt {
return func(_ context.Context, _ ocispec.Descriptor, c *ApplyConfig) error {
c.SyncFs = sync
return nil
}
}
1 change: 1 addition & 0 deletions diff/proxy/differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (r *diffRemote) Apply(ctx context.Context, desc ocispec.Descriptor, mounts
Diff: fromDescriptor(desc),
Mounts: fromMounts(mounts),
Payloads: payloads,
SyncFs: config.SyncFs,
}
resp, err := r.client.Apply(ctx, req)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions image.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,14 @@ func WithUnpackDuplicationSuppressor(suppressor kmutex.KeyedLocker) UnpackOpt {
}
}

// WithUnpackApplyOpts appends new apply options on the UnpackConfig.
func WithUnpackApplyOpts(opts ...diff.ApplyOpt) UnpackOpt {
return func(ctx context.Context, uc *UnpackConfig) error {
uc.ApplyOpts = append(uc.ApplyOpts, opts...)
return nil
}
}

func (i *image) Unpack(ctx context.Context, snapshotterName string, opts ...UnpackOpt) error {
ctx, done, err := i.client.WithLease(ctx)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/cri/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,9 @@ type PluginConfig struct {
//
// For example, the value can be '5h', '2h30m', '10s'.
DrainExecSyncIOTimeout string `toml:"drain_exec_sync_io_timeout" json:"drainExecSyncIOTimeout"`
// ImagePullWithSyncFs is an experimental setting. It's to force sync
// filesystem during unpacking to ensure that data integrity.
ImagePullWithSyncFs bool `toml:"image_pull_with_sync_fs" json:"imagePullWithSyncFs"`
}

// X509KeyPairStreaming contains the x509 configuration for streaming
Expand Down
1 change: 1 addition & 0 deletions pkg/cri/config/config_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,5 +109,6 @@ func DefaultConfig() PluginConfig {
CDISpecDirs: []string{"/etc/cdi", "/var/run/cdi"},
ImagePullProgressTimeout: defaultImagePullProgressTimeoutDuration.String(),
DrainExecSyncIOTimeout: "0s",
ImagePullWithSyncFs: false,
}
}
2 changes: 2 additions & 0 deletions pkg/cri/server/image_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"

"github.com/containerd/containerd"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
containerdimages "github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
Expand Down Expand Up @@ -168,6 +169,7 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
containerd.WithImageHandler(imageHandler),
containerd.WithUnpackOpts([]containerd.UnpackOpt{
containerd.WithUnpackDuplicationSuppressor(c.unpackDuplicationSuppressor),
containerd.WithUnpackApplyOpts(diff.WithSyncFs(c.config.ImagePullWithSyncFs)),
}),
}

Expand Down
1 change: 1 addition & 0 deletions services/diff/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (l *local) Apply(ctx context.Context, er *diffapi.ApplyRequest, _ ...grpc.C
}
opts = append(opts, diff.WithPayloads(payloads))
}
opts = append(opts, diff.WithSyncFs(er.SyncFs))

for _, differ := range l.differs {
ocidesc, err = differ.Apply(ctx, desc, mounts, opts...)
Expand Down

0 comments on commit e52570c

Please sign in to comment.