diff --git a/copy/single.go b/copy/single.go index b9ea05970c..9003965c95 100644 --- a/copy/single.go +++ b/copy/single.go @@ -20,6 +20,7 @@ import ( compressiontypes "github.com/containers/image/v5/pkg/compression/types" "github.com/containers/image/v5/transports" "github.com/containers/image/v5/types" + chunkedToc "github.com/containers/storage/pkg/chunked/toc" digest "github.com/opencontainers/go-digest" imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1" "github.com/sirupsen/logrus" @@ -694,6 +695,13 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to requiredCompression = ic.compressionFormat originalCompression = srcInfo.CompressionAlgorithm } + + // Check if we have a chunked layer in storage that's based on that blob. These layers are stored by their TOC digest. + tocDigest, err := chunkedToc.GetTOCDigest(srcInfo.Annotations) + if err != nil { + return types.BlobInfo{}, "", err + } + reused, reusedBlob, err := ic.c.dest.TryReusingBlobWithOptions(ctx, srcInfo, private.TryReusingBlobOptions{ Cache: ic.c.blobInfoCache, CanSubstitute: canSubstitute, @@ -702,6 +710,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to SrcRef: srcRef, RequiredCompression: requiredCompression, OriginalCompression: originalCompression, + TOCDigest: tocDigest, }) if err != nil { return types.BlobInfo{}, "", fmt.Errorf("trying to reuse blob %s at destination: %w", srcInfo.Digest, err) diff --git a/go.mod b/go.mod index ba3a51a892..5c0701bd68 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/BurntSushi/toml v1.3.2 github.com/containers/libtrust v0.0.0-20230121012942-c1716e8a8d01 github.com/containers/ocicrypt v1.1.9 - github.com/containers/storage v1.51.1-0.20231129011200-e9f9f6605078 + github.com/containers/storage v1.51.1-0.20231205203947-fe005407c7d5 github.com/cyberphone/json-canonicalization v0.0.0-20231011164504-785e29786b46 github.com/distribution/reference v0.5.0 github.com/docker/cli v24.0.7+incompatible diff --git a/go.sum b/go.sum index 9b5122a15d..8450a17360 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,8 @@ github.com/containers/libtrust v0.0.0-20230121012942-c1716e8a8d01 h1:Qzk5C6cYgle github.com/containers/libtrust v0.0.0-20230121012942-c1716e8a8d01/go.mod h1:9rfv8iPl1ZP7aqh9YA68wnZv2NUDbXdcdPHVz0pFbPY= github.com/containers/ocicrypt v1.1.9 h1:2Csfba4jse85Raxk5HIyEk8OwZNjRvfkhEGijOjIdEM= github.com/containers/ocicrypt v1.1.9/go.mod h1:dTKx1918d8TDkxXvarscpNVY+lyPakPNFN4jwA9GBys= -github.com/containers/storage v1.51.1-0.20231129011200-e9f9f6605078 h1:b9vDfBHAbKKWotUoInxAP/rUf6KmqGRZBND6lSzUcbo= -github.com/containers/storage v1.51.1-0.20231129011200-e9f9f6605078/go.mod h1:FHXkEBvKRmsTeB1JQIFfXnSyXCp+wVrt172O2ZlSzM4= +github.com/containers/storage v1.51.1-0.20231205203947-fe005407c7d5 h1:eiCkAt+i9BYRjR7KEKPI3iORCSABhY+spM/w8BkI2lo= +github.com/containers/storage v1.51.1-0.20231205203947-fe005407c7d5/go.mod h1:pMhG1O3eMGlQKpuEuv7ves+K3BsK8/UJs8ctV5fEaoI= github.com/coreos/go-oidc/v3 v3.7.0 h1:FTdj0uexT4diYIPlF4yoFVI5MRO1r5+SEcIpEw9vC0o= github.com/coreos/go-oidc/v3 v3.7.0/go.mod h1:yQzSCqBnK3e6Fs5l+f5i0F8Kwf0zpH9bPEsbY00KanM= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= diff --git a/internal/private/private.go b/internal/private/private.go index 95d561fcdd..72b574a5bd 100644 --- a/internal/private/private.go +++ b/internal/private/private.go @@ -117,6 +117,7 @@ type TryReusingBlobOptions struct { EmptyLayer bool // True if the blob is an "empty"/"throwaway" layer, and may not necessarily be physically represented. LayerIndex *int // If the blob is a layer, a zero-based index of the layer within the image; nil otherwise. SrcRef reference.Named // A reference to the source image that contains the input blob. + TOCDigest *digest.Digest // If specified, the blob can be looked up in the destination also by its TOC digest. } // ReusedBlob is information about a blob reused in a destination. diff --git a/manifest/oci.go b/manifest/oci.go index a85641c36a..6d5acb45d8 100644 --- a/manifest/oci.go +++ b/manifest/oci.go @@ -9,6 +9,7 @@ import ( compressiontypes "github.com/containers/image/v5/pkg/compression/types" "github.com/containers/image/v5/types" ociencspec "github.com/containers/ocicrypt/spec" + chunkedToc "github.com/containers/storage/pkg/chunked/toc" "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/specs-go" imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1" @@ -235,7 +236,7 @@ func (m *OCI1) Inspect(configGetter func(types.BlobInfo) ([]byte, error)) (*type } // ImageID computes an ID which can uniquely identify this image by its contents. -func (m *OCI1) ImageID([]digest.Digest) (string, error) { +func (m *OCI1) ImageID(diffIDs []digest.Digest) (string, error) { // The way m.Config.Digest “uniquely identifies” an image is // by containing RootFS.DiffIDs, which identify the layers of the image. // For non-image artifacts, the we can’t expect the config to change @@ -259,9 +260,44 @@ func (m *OCI1) ImageID([]digest.Digest) (string, error) { if err := m.Config.Digest.Validate(); err != nil { return "", err } + + // If there is any layer that is using partial content, we calculate the image ID + // in a different way since the diffID cannot be validated as for regular pulled images. + for _, layer := range m.Layers { + toc, err := chunkedToc.GetTOCDigest(layer.Annotations) + if err != nil { + return "", fmt.Errorf("error looking up annotation for layer %q: %w", layer.Digest, err) + } + if toc != nil { + return m.calculateImageIDForPartialImage(diffIDs) + } + } + return m.Config.Digest.Hex(), nil } +func (m *OCI1) calculateImageIDForPartialImage(diffIDs []digest.Digest) (string, error) { + newID := digest.Canonical.Digester() + for i, layer := range m.Layers { + diffID := diffIDs[i] + _, err := newID.Hash().Write([]byte(diffID.Hex())) + if err != nil { + return "", fmt.Errorf("error writing diffID %q: %w", diffID, err) + } + toc, err := chunkedToc.GetTOCDigest(layer.Annotations) + if err != nil { + return "", fmt.Errorf("error looking up annotation for layer %q: %w", layer.Digest, err) + } + if toc != nil { + _, err = newID.Hash().Write([]byte(toc.Hex())) + if err != nil { + return "", fmt.Errorf("error writing TOC %q: %w", toc, err) + } + } + } + return newID.Digest().Hex(), nil +} + // CanChangeLayerCompression returns true if we can compress/decompress layers with mimeType in the current image // (and the code can handle that). // NOTE: Even if this returns true, the relevant format might not accept all compression algorithms; the set of accepted diff --git a/storage/storage_dest.go b/storage/storage_dest.go index 07e1d5e1f9..bbbff6cf98 100644 --- a/storage/storage_dest.go +++ b/storage/storage_dest.go @@ -77,13 +77,13 @@ type storageImageDestination struct { indexToStorageID map[int]*string // All accesses to below data are protected by `lock` which is made // *explicit* in the code. - blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs - fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes - filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them - currentIndex int // The index of the layer to be committed (i.e., lower indices have already been committed) - indexToAddedLayerInfo map[int]addedLayerInfo // Mapping from layer (by index) to blob to add to the image - blobAdditionalLayer map[digest.Digest]storage.AdditionalLayer // Mapping from layer blobsums to their corresponding additional layer - diffOutputs map[digest.Digest]*graphdriver.DriverWithDifferOutput // Mapping from digest to differ output + uncompressedOrTocDigest map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs or TOC IDs. + fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes + filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them + currentIndex int // The index of the layer to be committed (i.e., lower indices have already been committed) + indexToAddedLayerInfo map[int]addedLayerInfo // Mapping from layer (by index) to blob to add to the image + blobAdditionalLayer map[digest.Digest]storage.AdditionalLayer // Mapping from layer blobsums to their corresponding additional layer + diffOutputs map[digest.Digest]*graphdriver.DriverWithDifferOutput // Mapping from digest to differ output } // addedLayerInfo records data about a layer to use in this image. @@ -117,18 +117,18 @@ func newImageDestination(sys *types.SystemContext, imageRef storageReference) (* HasThreadSafePutBlob: true, }), - imageRef: imageRef, - directory: directory, - signatureses: make(map[digest.Digest][]byte), - blobDiffIDs: make(map[digest.Digest]digest.Digest), - blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer), - fileSizes: make(map[digest.Digest]int64), - filenames: make(map[digest.Digest]string), - SignatureSizes: []int{}, - SignaturesSizes: make(map[digest.Digest][]int), - indexToStorageID: make(map[int]*string), - indexToAddedLayerInfo: make(map[int]addedLayerInfo), - diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput), + imageRef: imageRef, + directory: directory, + signatureses: make(map[digest.Digest][]byte), + uncompressedOrTocDigest: make(map[digest.Digest]digest.Digest), + blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer), + fileSizes: make(map[digest.Digest]int64), + filenames: make(map[digest.Digest]string), + SignatureSizes: []int{}, + SignaturesSizes: make(map[digest.Digest][]int), + indexToStorageID: make(map[int]*string), + indexToAddedLayerInfo: make(map[int]addedLayerInfo), + diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput), } dest.Compat = impl.AddCompat(dest) return dest, nil @@ -227,7 +227,7 @@ func (s *storageImageDestination) putBlobToPendingFile(stream io.Reader, blobinf // Record information about the blob. s.lock.Lock() - s.blobDiffIDs[blobDigest] = diffID.Digest() + s.uncompressedOrTocDigest[blobDigest] = diffID.Digest() s.fileSizes[blobDigest] = counter.Count s.filenames[blobDigest] = filename s.lock.Unlock() @@ -289,7 +289,7 @@ func (s *storageImageDestination) PutBlobPartial(ctx context.Context, chunkAcces blobDigest := srcInfo.Digest s.lock.Lock() - s.blobDiffIDs[blobDigest] = blobDigest + s.uncompressedOrTocDigest[blobDigest] = blobDigest s.fileSizes[blobDigest] = 0 s.filenames[blobDigest] = "" s.diffOutputs[blobDigest] = out @@ -321,7 +321,7 @@ func (s *storageImageDestination) TryReusingBlobWithOptions(ctx context.Context, }) } -// tryReusingBlobAsPending implements TryReusingBlobWithOptions for (digest, size or -1), filling s.blobDiffIDs and other metadata. +// tryReusingBlobAsPending implements TryReusingBlobWithOptions for (digest, size or -1), filling s.uncompressedOrTocDigest and other metadata. // The caller must arrange the blob to be eventually committed using s.commitLayer(). func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, size int64, options *private.TryReusingBlobOptions) (bool, private.ReusedBlob, error) { // lock the entire method as it executes fairly quickly @@ -335,7 +335,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, return false, private.ReusedBlob{}, fmt.Errorf(`looking for compressed layers with digest %q and labels: %w`, digest, err) } else if err == nil { // Record the uncompressed value so that we can use it to calculate layer IDs. - s.blobDiffIDs[digest] = aLayer.UncompressedDigest() + s.uncompressedOrTocDigest[digest] = aLayer.UncompressedDigest() s.blobAdditionalLayer[digest] = aLayer return true, private.ReusedBlob{ Digest: digest, @@ -366,7 +366,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, } if len(layers) > 0 { // Save this for completeness. - s.blobDiffIDs[digest] = layers[0].UncompressedDigest + s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest return true, private.ReusedBlob{ Digest: digest, Size: layers[0].UncompressedSize, @@ -380,7 +380,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, } if len(layers) > 0 { // Record the uncompressed value so that we can use it to calculate layer IDs. - s.blobDiffIDs[digest] = layers[0].UncompressedDigest + s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest return true, private.ReusedBlob{ Digest: digest, Size: layers[0].CompressedSize, @@ -398,7 +398,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, } if len(layers) > 0 { if size != -1 { - s.blobDiffIDs[digest] = layers[0].UncompressedDigest + s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest return true, private.ReusedBlob{ Digest: digest, Size: size, @@ -407,7 +407,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, if !options.CanSubstitute { return false, private.ReusedBlob{}, fmt.Errorf("Internal error: options.CanSubstitute was expected to be true for blob with digest %s", digest) } - s.blobDiffIDs[uncompressedDigest] = layers[0].UncompressedDigest + s.uncompressedOrTocDigest[uncompressedDigest] = layers[0].UncompressedDigest return true, private.ReusedBlob{ Digest: uncompressedDigest, Size: layers[0].UncompressedSize, @@ -416,6 +416,25 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, } } + tocDigest := digest + if options.TOCDigest != nil { + tocDigest = *options.TOCDigest + } + + // Check if we have a chunked layer in storage with the same TOC digest. + layers, err = s.imageRef.transport.store.LayersByTOCDigest(tocDigest) + if err != nil && !errors.Is(err, storage.ErrLayerUnknown) { + return false, private.ReusedBlob{}, fmt.Errorf(`looking for layers with TOC digest %q: %w`, tocDigest, err) + } + if len(layers) > 0 { + // Save this for completeness. + s.uncompressedOrTocDigest[digest] = layers[0].TOCDigest + return true, private.ReusedBlob{ + Digest: layers[0].TOCDigest, + Size: layers[0].UncompressedSize, + }, nil + } + // Nope, we don't have it. return false, private.ReusedBlob{}, nil } @@ -438,16 +457,20 @@ func (s *storageImageDestination) computeID(m manifest.Manifest) string { continue } blobSum := m.FSLayers[i].BlobSum - diffID, ok := s.blobDiffIDs[blobSum] + diffID, ok := s.uncompressedOrTocDigest[blobSum] if !ok { logrus.Infof("error looking up diffID for layer %q", blobSum.String()) return "" } diffIDs = append([]digest.Digest{diffID}, diffIDs...) } - case *manifest.Schema2, *manifest.OCI1: - // We know the ID calculation for these formats doesn't actually use the diffIDs, - // so we don't need to populate the diffID list. + case *manifest.Schema2: + // We know the ID calculation doesn't actually use the diffIDs, so we don't need to populate + // the diffID list. + case *manifest.OCI1: + for _, l := range m.Layers { + diffIDs = append(diffIDs, l.Digest) + } default: return "" } @@ -518,7 +541,7 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo) } s.lock.Unlock() // Note: commitLayer locks on-demand. - if err := s.commitLayer(index, info, -1); err != nil { + if stopQueue, err := s.commitLayer(index, info, -1); stopQueue || err != nil { return err } s.lock.Lock() @@ -532,18 +555,32 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo) return nil } +// getDiffIDOrTOCDigest returns the diffID for the specified digest or the digest for the TOC, if known. +func (s *storageImageDestination) getDiffIDOrTOCDigest(uncompressedDigest digest.Digest) (digest.Digest, bool) { + s.lock.Lock() + defer s.lock.Unlock() + + if d, found := s.diffOutputs[uncompressedDigest]; found { + return d.TOCDigest, found + } + d, found := s.uncompressedOrTocDigest[uncompressedDigest] + return d, found +} + // commitLayer commits the specified layer with the given index to the storage. -// size can usually be -1; it can be provided if the layer is not known to be already present in blobDiffIDs. +// size can usually be -1; it can be provided if the layer is not known to be already present in uncompressedOrTocDigest. +// +// If the layer cannot be committed yet, the function returns (true, nil). // // Note that the previous layer is expected to already be committed. // // Caution: this function must be called without holding `s.lock`. Callers // must guarantee that, at any given time, at most one goroutine may execute // `commitLayer()`. -func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, size int64) error { +func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, size int64) (bool, error) { // Already committed? Return early. if _, alreadyCommitted := s.indexToStorageID[index]; alreadyCommitted { - return nil + return false, nil } // Start with an empty string or the previous layer ID. Note that @@ -557,68 +594,96 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si // Carry over the previous ID for empty non-base layers. if info.emptyLayer { s.indexToStorageID[index] = &lastLayer - return nil + return false, nil } // Check if there's already a layer with the ID that we'd give to the result of applying // this layer blob to its parent, if it has one, or the blob's hex value otherwise. - s.lock.Lock() - diffID, haveDiffID := s.blobDiffIDs[info.digest] - s.lock.Unlock() - if !haveDiffID { + // The diffIDOrTOCDigest refers either to the DiffID or the digest of the TOC. + diffIDOrTOCDigest, haveDiffIDOrTOCDigest := s.getDiffIDOrTOCDigest(info.digest) + if !haveDiffIDOrTOCDigest { // Check if it's elsewhere and the caller just forgot to pass it to us in a PutBlob(), // or to even check if we had it. // Use none.NoCache to avoid a repeated DiffID lookup in the BlobInfoCache; a caller // that relies on using a blob digest that has never been seen by the store had better call // TryReusingBlob; not calling PutBlob already violates the documented API, so there’s only // so far we are going to accommodate that (if we should be doing that at all). - logrus.Debugf("looking for diffID for blob %+v", info.digest) + logrus.Debugf("looking for diffID or TOC digest for blob %+v", info.digest) // Use tryReusingBlobAsPending, not the top-level TryReusingBlobWithOptions, to prevent recursion via queueOrCommit. has, _, err := s.tryReusingBlobAsPending(info.digest, size, &private.TryReusingBlobOptions{ Cache: none.NoCache, CanSubstitute: false, }) if err != nil { - return fmt.Errorf("checking for a layer based on blob %q: %w", info.digest.String(), err) + return false, fmt.Errorf("checking for a layer based on blob %q: %w", info.digest.String(), err) } if !has { - return fmt.Errorf("error determining uncompressed digest for blob %q", info.digest.String()) + return false, fmt.Errorf("error determining uncompressed digest or TOC digest for blob %q", info.digest.String()) } - diffID, haveDiffID = s.blobDiffIDs[info.digest] - if !haveDiffID { - return fmt.Errorf("we have blob %q, but don't know its uncompressed digest", info.digest.String()) + diffIDOrTOCDigest, haveDiffIDOrTOCDigest = s.getDiffIDOrTOCDigest(info.digest) + if !haveDiffIDOrTOCDigest { + return false, fmt.Errorf("we have blob %q, but don't know its uncompressed or TOC digest", info.digest.String()) } } - id := diffID.Hex() + id := diffIDOrTOCDigest.Hex() if lastLayer != "" { - id = digest.Canonical.FromBytes([]byte(lastLayer + "+" + diffID.Hex())).Hex() + id = digest.Canonical.FromBytes([]byte(lastLayer + "+" + diffIDOrTOCDigest.Hex())).Hex() } if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil { // There's already a layer that should have the right contents, just reuse it. lastLayer = layer.ID s.indexToStorageID[index] = &lastLayer - return nil + return false, nil } s.lock.Lock() diffOutput, ok := s.diffOutputs[info.digest] s.lock.Unlock() if ok { + if s.manifest == nil { + logrus.Debugf("Skipping commit for TOC=%q, manifest not yet available", id) + return true, nil + } + + man, err := manifest.FromBlob(s.manifest, manifest.GuessMIMEType(s.manifest)) + if err != nil { + return false, fmt.Errorf("parsing manifest: %w", err) + } + + cb, err := s.getConfigBlob(man.ConfigInfo()) + if err != nil { + return false, err + } + + // retrieve the expected uncompressed digest from the config blob. + configOCI := &imgspecv1.Image{} + if err := json.Unmarshal(cb, configOCI); err != nil { + return false, err + } + if index >= len(configOCI.RootFS.DiffIDs) { + return false, fmt.Errorf("index %d out of range for configOCI.RootFS.DiffIDs", index) + } + layer, err := s.imageRef.transport.store.CreateLayer(id, lastLayer, nil, "", false, nil) if err != nil { - return err + return false, err } - // FIXME: what to do with the uncompressed digest? - diffOutput.UncompressedDigest = info.digest + // let the storage layer know what was the original uncompressed layer. + flags := make(map[string]interface{}) + flags[expectedLayerDiffIDFlag] = configOCI.RootFS.DiffIDs[index] + logrus.Debugf("Setting uncompressed digest to %q for layer %q", configOCI.RootFS.DiffIDs[index], id) + options := &graphdriver.ApplyDiffWithDifferOpts{ + Flags: flags, + } - if err := s.imageRef.transport.store.ApplyDiffFromStagingDirectory(layer.ID, diffOutput.Target, diffOutput, nil); err != nil { + if err := s.imageRef.transport.store.ApplyDiffFromStagingDirectory(layer.ID, diffOutput.Target, diffOutput, options); err != nil { _ = s.imageRef.transport.store.Delete(layer.ID) - return err + return false, err } s.indexToStorageID[index] = &layer.ID - return nil + return false, nil } s.lock.Lock() @@ -627,11 +692,11 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si if ok { layer, err := al.PutAs(id, lastLayer, nil) if err != nil && !errors.Is(err, storage.ErrDuplicateID) { - return fmt.Errorf("failed to put layer from digest and labels: %w", err) + return false, fmt.Errorf("failed to put layer from digest and labels: %w", err) } lastLayer = layer.ID s.indexToStorageID[index] = &lastLayer - return nil + return false, nil } // Check if we previously cached a file with that blob's contents. If we didn't, @@ -642,7 +707,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si if !ok { // Try to find the layer with contents matching that blobsum. layer := "" - layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(diffID) + layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(diffIDOrTOCDigest) if err2 == nil && len(layers) > 0 { layer = layers[0].ID } else { @@ -652,7 +717,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si } } if layer == "" { - return fmt.Errorf("locating layer for blob %q: %w", info.digest, err2) + return false, fmt.Errorf("locating layer for blob %q: %w", info.digest, err2) } // Read the layer's contents. noCompression := archive.Uncompressed @@ -661,17 +726,17 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si } diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions) if err2 != nil { - return fmt.Errorf("reading layer %q for blob %q: %w", layer, info.digest, err2) + return false, fmt.Errorf("reading layer %q for blob %q: %w", layer, info.digest, err2) } // Copy the layer diff to a file. Diff() takes a lock that it holds // until the ReadCloser that it returns is closed, and PutLayer() wants // the same lock, so the diff can't just be directly streamed from one // to the other. filename = s.computeNextBlobCacheFile() - file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600) + file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0o600) if err != nil { diff.Close() - return fmt.Errorf("creating temporary file %q: %w", filename, err) + return false, fmt.Errorf("creating temporary file %q: %w", filename, err) } // Copy the data to the file. // TODO: This can take quite some time, and should ideally be cancellable using @@ -680,7 +745,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si diff.Close() file.Close() if err != nil { - return fmt.Errorf("storing blob to file %q: %w", filename, err) + return false, fmt.Errorf("storing blob to file %q: %w", filename, err) } // Make sure that we can find this file later, should we need the layer's // contents again. @@ -691,21 +756,21 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si // Read the cached blob and use it as a diff. file, err := os.Open(filename) if err != nil { - return fmt.Errorf("opening file %q: %w", filename, err) + return false, fmt.Errorf("opening file %q: %w", filename, err) } defer file.Close() // Build the new layer using the diff, regardless of where it came from. // TODO: This can take quite some time, and should ideally be cancellable using ctx.Done(). layer, _, err := s.imageRef.transport.store.PutLayer(id, lastLayer, nil, "", false, &storage.LayerOptions{ OriginalDigest: info.digest, - UncompressedDigest: diffID, + UncompressedDigest: diffIDOrTOCDigest, }, file) if err != nil && !errors.Is(err, storage.ErrDuplicateID) { - return fmt.Errorf("adding layer with blob %q: %w", info.digest, err) + return false, fmt.Errorf("adding layer with blob %q: %w", info.digest, err) } s.indexToStorageID[index] = &layer.ID - return nil + return false, nil } // Commit marks the process of storing the image as successful and asks for the image to be persisted. @@ -752,11 +817,13 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t // Extract, commit, or find the layers. for i, blob := range layerBlobs { - if err := s.commitLayer(i, addedLayerInfo{ + if stopQueue, err := s.commitLayer(i, addedLayerInfo{ digest: blob.Digest, emptyLayer: blob.EmptyLayer, }, blob.Size); err != nil { return err + } else if stopQueue { + return fmt.Errorf("Internal error: storageImageDestination.Commit(): commitLayer() not ready to commit for layer %q", blob.Digest) } } var lastLayer string diff --git a/storage/storage_src.go b/storage/storage_src.go index f1ce0861e0..28df60da7b 100644 --- a/storage/storage_src.go +++ b/storage/storage_src.go @@ -29,21 +29,33 @@ import ( "github.com/sirupsen/logrus" ) +// getBlobMutexProtected is a struct to hold the state of the getBlobMutex mutex. +type getBlobMutexProtected struct { + // digestToLayerID is a lookup map from the layer digest (either the uncompressed digest or the TOC digest) to the + // layer ID in the store. + digestToLayerID map[digest.Digest]string + + // layerPosition stores where we are in reading a blob's layers + layerPosition map[digest.Digest]int +} + type storageImageSource struct { impl.Compat impl.PropertyMethodsInitialize stubs.NoGetBlobAtInitialize - imageRef storageReference - image *storage.Image - systemContext *types.SystemContext // SystemContext used in GetBlob() to create temporary files - layerPosition map[digest.Digest]int // Where we are in reading a blob's layers - cachedManifest []byte // A cached copy of the manifest, if already known, or nil - getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions - SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice - SignaturesSizes map[digest.Digest][]int `json:"signatures-sizes,omitempty"` // List of sizes of each signature slice + imageRef storageReference + image *storage.Image + systemContext *types.SystemContext // SystemContext used in GetBlob() to create temporary files + cachedManifest []byte // A cached copy of the manifest, if already known, or nil + getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions (it guards layerPosition and digestToLayerID) + getBlobMutexProtected getBlobMutexProtected + SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice + SignaturesSizes map[digest.Digest][]int `json:"signatures-sizes,omitempty"` // List of sizes of each signature slice } +const expectedLayerDiffIDFlag = "expected-layer-diffid" + // newImageSource sets up an image for reading. func newImageSource(sys *types.SystemContext, imageRef storageReference) (*storageImageSource, error) { // First, locate the image. @@ -62,9 +74,12 @@ func newImageSource(sys *types.SystemContext, imageRef storageReference) (*stora imageRef: imageRef, systemContext: sys, image: img, - layerPosition: make(map[digest.Digest]int), SignatureSizes: []int{}, SignaturesSizes: make(map[digest.Digest][]int), + getBlobMutexProtected: getBlobMutexProtected{ + digestToLayerID: make(map[digest.Digest]string), + layerPosition: make(map[digest.Digest]int), + }, } image.Compat = impl.AddCompat(image) if img.Metadata != "" { @@ -91,6 +106,7 @@ func (s *storageImageSource) Close() error { func (s *storageImageSource) GetBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache) (rc io.ReadCloser, n int64, err error) { // We need a valid digest value. digest := info.Digest + err = digest.Validate() if err != nil { return nil, 0, err @@ -100,10 +116,24 @@ func (s *storageImageSource) GetBlob(ctx context.Context, info types.BlobInfo, c return io.NopCloser(bytes.NewReader(image.GzippedEmptyLayer)), int64(len(image.GzippedEmptyLayer)), nil } - // Check if the blob corresponds to a diff that was used to initialize any layers. Our - // callers should try to retrieve layers using their uncompressed digests, so no need to - // check if they're using one of the compressed digests, which we can't reproduce anyway. - layers, _ := s.imageRef.transport.store.LayersByUncompressedDigest(digest) + var layers []storage.Layer + + // If the digest was overriden by LayerInfosForCopy, then we need to use the TOC digest + // to retrieve it from the storage. + s.getBlobMutex.Lock() + layerID, found := s.getBlobMutexProtected.digestToLayerID[digest] + s.getBlobMutex.Unlock() + + if found { + if layer, err := s.imageRef.transport.store.Layer(layerID); err == nil { + layers = []storage.Layer{*layer} + } + } else { + // Check if the blob corresponds to a diff that was used to initialize any layers. Our + // callers should try to retrieve layers using their uncompressed digests, so no need to + // check if they're using one of the compressed digests, which we can't reproduce anyway. + layers, _ = s.imageRef.transport.store.LayersByUncompressedDigest(digest) + } // If it's not a layer, then it must be a data item. if len(layers) == 0 { @@ -174,8 +204,8 @@ func (s *storageImageSource) getBlobAndLayerID(digest digest.Digest, layers []st // which claim to have the same contents, that we actually do have multiple layers, otherwise we could // just go ahead and use the first one every time. s.getBlobMutex.Lock() - i := s.layerPosition[digest] - s.layerPosition[digest] = i + 1 + i := s.getBlobMutexProtected.layerPosition[digest] + s.getBlobMutexProtected.layerPosition[digest] = i + 1 s.getBlobMutex.Unlock() if len(layers) > 0 { layer = layers[i%len(layers)] @@ -267,14 +297,35 @@ func (s *storageImageSource) LayerInfosForCopy(ctx context.Context, instanceDige if err != nil { return nil, fmt.Errorf("reading layer %q in image %q: %w", layerID, s.image.ID, err) } - if layer.UncompressedDigest == "" { - return nil, fmt.Errorf("uncompressed digest for layer %q is unknown", layerID) + if layer.UncompressedDigest == "" && layer.TOCDigest == "" { + return nil, fmt.Errorf("uncompressed digest and TOC digest for layer %q is unknown", layerID) } if layer.UncompressedSize < 0 { return nil, fmt.Errorf("uncompressed size for layer %q is unknown", layerID) } + + blobDigest := layer.UncompressedDigest + + if layer.TOCDigest != "" { + if layer.Flags == nil || layer.Flags[expectedLayerDiffIDFlag] == nil { + return nil, fmt.Errorf("TOC digest %q for layer %q is present but %q flag is not set", layer.TOCDigest, layerID, expectedLayerDiffIDFlag) + } + if expectedDigest, ok := layer.Flags[expectedLayerDiffIDFlag].(string); ok { + // if the layer is stored by its TOC, report the expected diffID as the layer Digest + // but store the TOC digest so we can later retrieve it from the storage. + blobDigest, err = digest.Parse(expectedDigest) + if err != nil { + return nil, fmt.Errorf("parsing expected diffID %q for layer %q: %w", expectedDigest, layerID, err) + } + } else { + return nil, fmt.Errorf("TOC digest %q for layer %q is present but %q flag is not a string", layer.TOCDigest, layerID, expectedLayerDiffIDFlag) + } + } + s.getBlobMutex.Lock() + s.getBlobMutexProtected.digestToLayerID[blobDigest] = layer.ID + s.getBlobMutex.Unlock() blobInfo := types.BlobInfo{ - Digest: layer.UncompressedDigest, + Digest: blobDigest, Size: layer.UncompressedSize, MediaType: uncompressedLayerType, } @@ -384,7 +435,7 @@ func (s *storageImageSource) getSize() (int64, error) { if err != nil { return -1, err } - if layer.UncompressedDigest == "" || layer.UncompressedSize < 0 { + if (layer.TOCDigest == "" && layer.UncompressedDigest == "") || layer.UncompressedSize < 0 { return -1, fmt.Errorf("size for layer %q is unknown, failing getSize()", layerID) } sum += layer.UncompressedSize