From 22f6b3e1d9caed60391928dfe5ff77cbc4263afc Mon Sep 17 00:00:00 2001 From: Bertrand Paquet Date: Sun, 25 Aug 2024 23:05:58 +0200 Subject: [PATCH] Parallel layer upload for s3 cache Signed-off-by: Bertrand Paquet --- README.md | 1 + cache/remotecache/s3/s3.go | 230 +++++++++++++++++++++---------------- 2 files changed, 129 insertions(+), 102 deletions(-) diff --git a/README.md b/README.md index 76d0fbc3d995..5a1d8e9ab952 100644 --- a/README.md +++ b/README.md @@ -578,6 +578,7 @@ Other options are: * Multiple manifest names can be specified at the same time, separated by `;`. The standard use case is to use the git sha1 as name, and the branch name as duplicate, and load both with 2 `import-cache` commands. * `ignore-error=`: specify if error is ignored in case cache export fails (default: `false`) * `touch_refresh=24h`: Instead of being uploaded again when not changed, blobs files will be "touched" on s3 every `touch_refresh`, default is 24h. Due to this, an expiration policy can be set on the S3 bucket to cleanup useless files automatically. Manifests files are systematically rewritten, there is no need to touch them. +* `upload_parallelism=4`: Number of layers uploaded to s3 in parallel; must be a positive integer, default is 4. Each individual layer is uploaded with 5 threads, using the Upload manager provided by the AWS SDK. 
`--import-cache` options: * `type=s3` diff --git a/cache/remotecache/s3/s3.go b/cache/remotecache/s3/s3.go index 065c581204ae..ffa2ae6c3eca 100644 --- a/cache/remotecache/s3/s3.go +++ b/cache/remotecache/s3/s3.go @@ -30,37 +30,40 @@ import ( digest "github.com/opencontainers/go-digest" ocispecs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" + "golang.org/x/sync/errgroup" ) const ( - attrBucket = "bucket" - attrRegion = "region" - attrPrefix = "prefix" - attrManifestsPrefix = "manifests_prefix" - attrBlobsPrefix = "blobs_prefix" - attrName = "name" - attrTouchRefresh = "touch_refresh" - attrEndpointURL = "endpoint_url" - attrAccessKeyID = "access_key_id" - attrSecretAccessKey = "secret_access_key" - attrSessionToken = "session_token" - attrUsePathStyle = "use_path_style" - maxCopyObjectSize = 5 * 1024 * 1024 * 1024 + attrBucket = "bucket" + attrRegion = "region" + attrPrefix = "prefix" + attrManifestsPrefix = "manifests_prefix" + attrBlobsPrefix = "blobs_prefix" + attrName = "name" + attrTouchRefresh = "touch_refresh" + attrEndpointURL = "endpoint_url" + attrAccessKeyID = "access_key_id" + attrSecretAccessKey = "secret_access_key" + attrSessionToken = "session_token" + attrUsePathStyle = "use_path_style" + attrUploadParallelism = "upload_parallelism" + maxCopyObjectSize = 5 * 1024 * 1024 * 1024 ) type Config struct { - Bucket string - Region string - Prefix string - ManifestsPrefix string - BlobsPrefix string - Names []string - TouchRefresh time.Duration - EndpointURL string - AccessKeyID string - SecretAccessKey string - SessionToken string - UsePathStyle bool + Bucket string + Region string + Prefix string + ManifestsPrefix string + BlobsPrefix string + Names []string + TouchRefresh time.Duration + EndpointURL string + AccessKeyID string + SecretAccessKey string + SessionToken string + UsePathStyle bool + UploadParallelism int } func getConfig(attrs map[string]string) (Config, error) { @@ -125,19 +128,33 @@ func getConfig(attrs 
map[string]string) (Config, error) { } } + uploadParallelism := 4 + uploadParallelismStr, ok := attrs[attrUploadParallelism] + if ok { + uploadParallelismInt, err := strconv.Atoi(uploadParallelismStr) + if err != nil { + return Config{}, errors.Errorf("upload_parallelism must be a positive integer") + } + if uploadParallelismInt <= 0 { + return Config{}, errors.Errorf("upload_parallelism must be a positive integer") + } + uploadParallelism = uploadParallelismInt + } + return Config{ - Bucket: bucket, - Region: region, - Prefix: prefix, - ManifestsPrefix: manifestsPrefix, - BlobsPrefix: blobsPrefix, - Names: names, - TouchRefresh: touchRefresh, - EndpointURL: endpointURL, - AccessKeyID: accessKeyID, - SecretAccessKey: secretAccessKey, - SessionToken: sessionToken, - UsePathStyle: usePathStyle, + Bucket: bucket, + Region: region, + Prefix: prefix, + ManifestsPrefix: manifestsPrefix, + BlobsPrefix: blobsPrefix, + Names: names, + TouchRefresh: touchRefresh, + EndpointURL: endpointURL, + AccessKeyID: accessKeyID, + SecretAccessKey: secretAccessKey, + SessionToken: sessionToken, + UsePathStyle: usePathStyle, + UploadParallelism: uploadParallelism, }, nil } @@ -187,64 +204,84 @@ func (e *exporter) Finalize(ctx context.Context) (map[string]string, error) { return nil, err } - for i, l := range cacheConfig.Layers { - dgstPair, ok := descs[l.Blob] - if !ok { - return nil, errors.Errorf("missing blob %s", l.Blob) - } - if dgstPair.Descriptor.Annotations == nil { - return nil, errors.Errorf("invalid descriptor without annotations") - } - v, ok := dgstPair.Descriptor.Annotations[labels.LabelUncompressed] - if !ok { - return nil, errors.Errorf("invalid descriptor without uncompressed annotation") - } - diffID, err := digest.Parse(v) - if err != nil { - return nil, errors.Wrapf(err, "failed to parse uncompressed annotation") - } + eg, groupCtx := errgroup.WithContext(ctx) + tasks := make(chan int, e.config.UploadParallelism) - key := e.s3Client.blobKey(dgstPair.Descriptor.Digest) 
- exists, size, err := e.s3Client.exists(ctx, key) - if err != nil { - return nil, errors.Wrapf(err, "failed to check file presence in cache") + go func() { + for i := range cacheConfig.Layers { + tasks <- i } - if exists != nil { - if time.Since(*exists) > e.config.TouchRefresh { - err = e.s3Client.touch(ctx, key, size) + close(tasks) + }() + + for k := 0; k < e.config.UploadParallelism; k++ { + eg.Go(func() error { + for index := range tasks { + blob := cacheConfig.Layers[index].Blob + dgstPair, ok := descs[blob] + if !ok { + return errors.Errorf("missing blob %s", blob) + } + if dgstPair.Descriptor.Annotations == nil { + return errors.Errorf("invalid descriptor without annotations") + } + v, ok := dgstPair.Descriptor.Annotations[labels.LabelUncompressed] + if !ok { + return errors.Errorf("invalid descriptor without uncompressed annotation") + } + diffID, err := digest.Parse(v) if err != nil { - return nil, errors.Wrapf(err, "failed to touch file") + return errors.Wrapf(err, "failed to parse uncompressed annotation") } - } - } else { - layerDone := progress.OneOff(ctx, fmt.Sprintf("writing layer %s", l.Blob)) - // TODO: once buildkit uses v2, start using - // https://github.com/containerd/containerd/pull/9657 - // currently inline data should never happen. 
- ra, err := dgstPair.Provider.ReaderAt(ctx, dgstPair.Descriptor) - if err != nil { - return nil, layerDone(errors.Wrap(err, "error reading layer blob from provider")) - } - defer ra.Close() - if err := e.s3Client.saveMutableAt(ctx, key, &nopCloserSectionReader{io.NewSectionReader(ra, 0, ra.Size())}); err != nil { - return nil, layerDone(errors.Wrap(err, "error writing layer blob")) - } - layerDone(nil) - } - la := &v1.LayerAnnotations{ - DiffID: diffID, - Size: dgstPair.Descriptor.Size, - MediaType: dgstPair.Descriptor.MediaType, - } - if v, ok := dgstPair.Descriptor.Annotations["buildkit/createdat"]; ok { - var t time.Time - if err := (&t).UnmarshalText([]byte(v)); err != nil { - return nil, err + key := e.s3Client.blobKey(dgstPair.Descriptor.Digest) + exists, size, err := e.s3Client.exists(groupCtx, key) + if err != nil { + return errors.Wrapf(err, "failed to check file presence in cache") + } + if exists != nil { + if time.Since(*exists) > e.config.TouchRefresh { + err = e.s3Client.touch(groupCtx, key, size) + if err != nil { + return errors.Wrapf(err, "failed to touch file") + } + } + } else { + layerDone := progress.OneOff(groupCtx, fmt.Sprintf("writing layer %s", blob)) + // TODO: once buildkit uses v2, start using + // https://github.com/containerd/containerd/pull/9657 + // currently inline data should never happen. 
+ ra, err := dgstPair.Provider.ReaderAt(groupCtx, dgstPair.Descriptor) + if err != nil { + return layerDone(errors.Wrap(err, "error reading layer blob from provider")) + } + defer ra.Close() + if err := e.s3Client.saveMutableAt(groupCtx, key, &nopCloserSectionReader{io.NewSectionReader(ra, 0, ra.Size())}); err != nil { + return layerDone(errors.Wrap(err, "error writing layer blob")) + } + layerDone(nil) + } + + la := &v1.LayerAnnotations{ + DiffID: diffID, + Size: dgstPair.Descriptor.Size, + MediaType: dgstPair.Descriptor.MediaType, + } + if v, ok := dgstPair.Descriptor.Annotations["buildkit/createdat"]; ok { + var t time.Time + if err := (&t).UnmarshalText([]byte(v)); err != nil { + return err + } + la.CreatedAt = t.UTC() + } + cacheConfig.Layers[index].Annotations = la } - la.CreatedAt = t.UTC() - } - cacheConfig.Layers[i].Annotations = la + return nil + }) + } + + if err := eg.Wait(); err != nil { + return nil, err } dt, err := json.Marshal(cacheConfig) @@ -253,7 +290,7 @@ func (e *exporter) Finalize(ctx context.Context) (map[string]string, error) { } for _, name := range e.config.Names { - if err := e.s3Client.saveMutable(ctx, e.s3Client.manifestKey(name), dt); err != nil { + if err := e.s3Client.saveMutableAt(ctx, e.s3Client.manifestKey(name), bytes.NewReader(dt)); err != nil { return nil, errors.Wrapf(err, "error writing manifest: %s", name) } } @@ -430,18 +467,7 @@ func (s3Client *s3Client) getReader(ctx context.Context, key string) (io.ReadClo return output.Body, nil } -func (s3Client *s3Client) saveMutable(ctx context.Context, key string, value []byte) error { - input := &s3.PutObjectInput{ - Bucket: &s3Client.bucket, - Key: &key, - - Body: bytes.NewReader(value), - } - _, err := s3Client.Upload(ctx, input) - return err -} - -func (s3Client *s3Client) saveMutableAt(ctx context.Context, key string, body io.ReadSeekCloser) error { +func (s3Client *s3Client) saveMutableAt(ctx context.Context, key string, body io.Reader) error { input := &s3.PutObjectInput{ 
Bucket: &s3Client.bucket, Key: &key,