From 78bdd135fea167882357e599452f4ded427d73c4 Mon Sep 17 00:00:00 2001 From: Aditya R Date: Thu, 1 Sep 2022 11:11:47 +0530 Subject: [PATCH] docker, BlobInfoCache: try to reuse compressed blobs when pushing across registries It seems we try to reuse blobs only for the specified registry, however we can have valid known compressed digests across registry as well following pr attempts to use that by doing following steps. * `CandidateLocations2` now processes all known blobs and appends them to returned candidates at the lowest priority. As a result when `TryReusingBlob` tries to process these candidates and if the blobs filtered by the `Opaque` set by the `transport` fail to match then attempt is made against all known blobs (ones which do not belong to the current registry). * Increase the sample set of potential blob reuse to all known compressed digests , also involving the one which do not belong to current registry. * If a blob is found match it against the registry where we are attempting to push. If blob is already there consider it a `CACHE HIT!` and reply skipping blob, since its already there. How to verify this ? * Remove all images `buildah rmi --all` // needed so all new blobs can be tagged again in common bucket * Remove any previous `blob-info-cache` by ```console rm /home//.local/share/containers/cache/blob-info-cache-v1.boltdb ``` ```console $ skopeo copy docker://registry.fedoraproject.org/fedora-minimal docker://quay.io/fl/test:some-tag $ buildah pull registry.fedoraproject.org/fedora-minimal $ buildah tag registry.fedoraproject.org/fedora-minimal quay.io/fl/test $ buildah push quay.io/fl/test ``` ```console Getting image source signatures Copying blob a3497ca15bbf skipped: already exists Copying config f7e02de757 done Writing manifest to image destination Storing signatures ``` Signed-off-by: Aditya R --- docker/docker_image_dest.go | 40 ++++++---- internal/blobinfocache/types.go | 7 +- pkg/blobinfocache/boltdb/boltdb.go | 78 +++++++++++-------- .../internal/prioritize/prioritize.go | 10 +++ pkg/blobinfocache/memory/memory.go | 72 +++++++++++------ 5 files changed, 134 insertions(+), 73 deletions(-) diff --git a/docker/docker_image_dest.go b/docker/docker_image_dest.go index 44b45c472c..e6b30857b6 100644 --- a/docker/docker_image_dest.go +++ b/docker/docker_image_dest.go @@ -332,21 +332,33 @@ func (d *dockerImageDestination) TryReusingBlobWithOptions(ctx context.Context, // Then try reusing blobs from other locations. candidates := options.Cache.CandidateLocations2(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, options.CanSubstitute) for _, candidate := range candidates { - candidateRepo, err := parseBICLocationReference(candidate.Location) - if err != nil { - logrus.Debugf("Error parsing BlobInfoCache location reference: %s", err) - continue - } - if candidate.CompressorName != blobinfocache.Uncompressed { - logrus.Debugf("Trying to reuse cached location %s compressed with %s in %s", candidate.Digest.String(), candidate.CompressorName, candidateRepo.Name()) + var candidateRepo reference.Named + if !candidate.UnknownLocation { + candidateRepo, err = parseBICLocationReference(candidate.Location) + if err != nil { + logrus.Debugf("Error parsing BlobInfoCache location reference: %s", err) + continue + } + if candidate.CompressorName != blobinfocache.Uncompressed { + logrus.Debugf("Trying to reuse cached location %s compressed with %s in %s", candidate.Digest.String(), candidate.CompressorName, candidateRepo.Name()) + } else { + logrus.Debugf("Trying to reuse cached location %s with no compression in %s", candidate.Digest.String(), candidateRepo.Name()) + } + // Sanity checks: + if reference.Domain(candidateRepo) != reference.Domain(d.ref.ref) { + logrus.Debugf("... Internal error: domain %s does not match destination %s", reference.Domain(candidateRepo), reference.Domain(d.ref.ref)) + continue + } } else { - logrus.Debugf("Trying to reuse cached location %s with no compression in %s", candidate.Digest.String(), candidateRepo.Name()) - } - - // Sanity checks: - if reference.Domain(candidateRepo) != reference.Domain(d.ref.ref) { - logrus.Debugf("... Internal error: domain %s does not match destination %s", reference.Domain(candidateRepo), reference.Domain(d.ref.ref)) - continue + if candidate.CompressorName != blobinfocache.Uncompressed { + logrus.Debugf("Trying to reuse cached location %s compressed with %s", candidate.Digest.String(), candidate.CompressorName) + } else { + logrus.Debugf("Trying to reuse cached location %s with no compression", candidate.Digest.String()) + } + // This digest is a known variant of this blob but we don’t + // have a recorded location in this registry, let’s try looking + // for it in the current repo. + candidateRepo = reference.TrimNamed(d.ref.ref) } if candidateRepo.Name() == d.ref.ref.Name() && candidate.Digest == info.Digest { logrus.Debug("... Already tried the primary destination") diff --git a/internal/blobinfocache/types.go b/internal/blobinfocache/types.go index 3c2be57f32..d28bbfdd19 100644 --- a/internal/blobinfocache/types.go +++ b/internal/blobinfocache/types.go @@ -39,7 +39,8 @@ type BlobInfoCache2 interface { // BICReplacementCandidate2 is an item returned by BlobInfoCache2.CandidateLocations2. type BICReplacementCandidate2 struct { - Digest digest.Digest - CompressorName string // either the Name() of a known pkg/compression.Algorithm, or Uncompressed or UnknownCompression - Location types.BICLocationReference + Digest digest.Digest + CompressorName string // either the Name() of a known pkg/compression.Algorithm, or Uncompressed or UnknownCompression + UnknownLocation bool // is true when `Location` for this blob is not set + Location types.BICLocationReference // not set if UnknownLocation is set to `true` } diff --git a/pkg/blobinfocache/boltdb/boltdb.go b/pkg/blobinfocache/boltdb/boltdb.go index a472efd95b..a7a66f29f0 100644 --- a/pkg/blobinfocache/boltdb/boltdb.go +++ b/pkg/blobinfocache/boltdb/boltdb.go @@ -282,12 +282,15 @@ func (bdc *cache) RecordKnownLocation(transport types.ImageTransport, scope type }) // FIXME? Log error (but throttle the log volume on repeated accesses)? } -// appendReplacementCandidates creates prioritize.CandidateWithTime values for digest in scopeBucket with corresponding compression info from compressionBucket (if compressionBucket is not nil), and returns the result of appending them to candidates. -func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, scopeBucket, compressionBucket *bolt.Bucket, digest digest.Digest, requireCompressionInfo bool) []prioritize.CandidateWithTime { +// appendReplacementCandidates creates prioritize.CandidateWithTime values for digest in scopeBucket +// (or with an unknown location if scopeBucket is nil and v2Output is set) with corresponding compression +// info from compressionBucket (if compressionBucket is not nil), and returns the result of appending them +// to candidates. +func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, scopeBucket, compressionBucket *bolt.Bucket, digest digest.Digest, v2Output bool) []prioritize.CandidateWithTime { digestKey := []byte(digest.String()) - b := scopeBucket.Bucket(digestKey) - if b == nil { - return candidates + var b *bolt.Bucket + if scopeBucket != nil { + b = scopeBucket.Bucket(digestKey) } compressorName := blobinfocache.UnknownCompression if compressionBucket != nil { @@ -297,24 +300,38 @@ func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateW compressorName = string(compressorNameValue) } } - if compressorName == blobinfocache.UnknownCompression && requireCompressionInfo { + if compressorName == blobinfocache.UnknownCompression && v2Output { return candidates } - _ = b.ForEach(func(k, v []byte) error { - t := time.Time{} - if err := t.UnmarshalBinary(v); err != nil { - return err + if b != nil { + _ = b.ForEach(func(k, v []byte) error { + t := time.Time{} + if err := t.UnmarshalBinary(v); err != nil { + return err + } + candidates = append(candidates, prioritize.CandidateWithTime{ + Candidate: blobinfocache.BICReplacementCandidate2{ + Digest: digest, + CompressorName: compressorName, + Location: types.BICLocationReference{Opaque: string(k)}, + }, + LastSeen: t, + }) + return nil + }) // FIXME? Log error (but throttle the log volume on repeated accesses)? + } else { + if v2Output { + candidates = append(candidates, prioritize.CandidateWithTime{ + Candidate: blobinfocache.BICReplacementCandidate2{ + Digest: digest, + CompressorName: compressorName, + UnknownLocation: true, + Location: types.BICLocationReference{Opaque: ""}, + }, + LastSeen: time.Time{}, + }) } - candidates = append(candidates, prioritize.CandidateWithTime{ - Candidate: blobinfocache.BICReplacementCandidate2{ - Digest: digest, - CompressorName: compressorName, - Location: types.BICLocationReference{Opaque: string(k)}, - }, - LastSeen: t, - }) - return nil - }) // FIXME? Log error (but throttle the log volume on repeated accesses)? + } return candidates } @@ -328,27 +345,22 @@ func (bdc *cache) CandidateLocations2(transport types.ImageTransport, scope type return bdc.candidateLocations(transport, scope, primaryDigest, canSubstitute, true) } -func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, requireCompressionInfo bool) []blobinfocache.BICReplacementCandidate2 { +func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, v2Output bool) []blobinfocache.BICReplacementCandidate2 { res := []prioritize.CandidateWithTime{} var uncompressedDigestValue digest.Digest // = "" if err := bdc.view(func(tx *bolt.Tx) error { scopeBucket := tx.Bucket(knownLocationsBucket) - if scopeBucket == nil { - return nil + if scopeBucket != nil { + scopeBucket = scopeBucket.Bucket([]byte(transport.Name())) } - scopeBucket = scopeBucket.Bucket([]byte(transport.Name())) - if scopeBucket == nil { - return nil - } - scopeBucket = scopeBucket.Bucket([]byte(scope.Opaque)) - if scopeBucket == nil { - return nil + if scopeBucket != nil { + scopeBucket = scopeBucket.Bucket([]byte(scope.Opaque)) } // compressionBucket won't have been created if previous writers never recorded info about compression, // and we don't want to fail just because of that compressionBucket := tx.Bucket(digestCompressorBucket) - res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, primaryDigest, requireCompressionInfo) + res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, primaryDigest, v2Output) if canSubstitute { if uncompressedDigestValue = bdc.uncompressedDigest(tx, primaryDigest); uncompressedDigestValue != "" { b := tx.Bucket(digestByUncompressedBucket) @@ -361,7 +373,7 @@ func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types return err } if d != primaryDigest && d != uncompressedDigestValue { - res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, d, requireCompressionInfo) + res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, d, v2Output) } return nil }); err != nil { @@ -370,7 +382,7 @@ func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types } } if uncompressedDigestValue != primaryDigest { - res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, uncompressedDigestValue, requireCompressionInfo) + res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, uncompressedDigestValue, v2Output) } } } diff --git a/pkg/blobinfocache/internal/prioritize/prioritize.go b/pkg/blobinfocache/internal/prioritize/prioritize.go index 6f5506d94c..cab60701b4 100644 --- a/pkg/blobinfocache/internal/prioritize/prioritize.go +++ b/pkg/blobinfocache/internal/prioritize/prioritize.go @@ -37,6 +37,16 @@ func (css *candidateSortState) Less(i, j int) bool { xi := css.cs[i] xj := css.cs[j] + // If i has location unknown and j has known location + // always treat i as lesser than j or vice-versa so the + // candidate with location unknown alaways gets lower priority. + if xi.Candidate.UnknownLocation && !xj.Candidate.UnknownLocation { + return true + } + if !xi.Candidate.UnknownLocation && xj.Candidate.UnknownLocation { + return false + } + // primaryDigest entries come first, more recent first. // uncompressedDigest entries, if uncompressedDigest is set and != primaryDigest, come last, more recent entry first. // Other digest values are primarily sorted by time (more recent first), secondarily by digest (to provide a deterministic order) diff --git a/pkg/blobinfocache/memory/memory.go b/pkg/blobinfocache/memory/memory.go index 426640366f..65d4d4fb98 100644 --- a/pkg/blobinfocache/memory/memory.go +++ b/pkg/blobinfocache/memory/memory.go @@ -120,25 +120,43 @@ func (mem *cache) RecordDigestCompressorName(blobDigest digest.Digest, compresso mem.compressors[blobDigest] = compressorName } -// appendReplacementCandidates creates prioritize.CandidateWithTime values for (transport, scope, digest), and returns the result of appending them to candidates. -func (mem *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, requireCompressionInfo bool) []prioritize.CandidateWithTime { - locations := mem.knownLocations[locationKey{transport: transport.Name(), scope: scope, blobDigest: digest}] // nil if not present - for l, t := range locations { - compressorName, compressorKnown := mem.compressors[digest] - if !compressorKnown { - if requireCompressionInfo { - continue - } - compressorName = blobinfocache.UnknownCompression +// appendReplacementCandidates creates prioritize.CandidateWithTime values for digest in scopeBucket +// (or with an unknown location if scopeBucket is nil and v2Output is set) with corresponding compression +// info from compressionBucket (if compressionBucket is not nil), and returns the result of appending them +// to candidates. +func (mem *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, transport types.ImageTransport, scope *types.BICTransportScope, digest digest.Digest, v2Output bool) []prioritize.CandidateWithTime { + compressorName := blobinfocache.UnknownCompression + compressorNameValue, compressorKnown := mem.compressors[digest] + if compressorKnown { + compressorName = compressorNameValue + } + if compressorName == blobinfocache.UnknownCompression && v2Output { + return candidates + } + if scope != nil { + locations := mem.knownLocations[locationKey{transport: transport.Name(), scope: *scope, blobDigest: digest}] // nil if not present + for l, t := range locations { + candidates = append(candidates, prioritize.CandidateWithTime{ + Candidate: blobinfocache.BICReplacementCandidate2{ + Digest: digest, + CompressorName: compressorName, + Location: l, + }, + LastSeen: t, + }) + } + } else { + if v2Output { + candidates = append(candidates, prioritize.CandidateWithTime{ + Candidate: blobinfocache.BICReplacementCandidate2{ + Digest: digest, + CompressorName: compressorName, + UnknownLocation: true, + Location: types.BICLocationReference{Opaque: ""}, + }, + LastSeen: time.Time{}, + }) } - candidates = append(candidates, prioritize.CandidateWithTime{ - Candidate: blobinfocache.BICReplacementCandidate2{ - Digest: digest, - CompressorName: compressorName, - Location: l, - }, - LastSeen: t, - }) } return candidates } @@ -163,24 +181,32 @@ func (mem *cache) CandidateLocations2(transport types.ImageTransport, scope type return mem.candidateLocations(transport, scope, primaryDigest, canSubstitute, true) } -func (mem *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, requireCompressionInfo bool) []blobinfocache.BICReplacementCandidate2 { +func (mem *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, v2Output bool) []blobinfocache.BICReplacementCandidate2 { mem.mutex.Lock() defer mem.mutex.Unlock() res := []prioritize.CandidateWithTime{} - res = mem.appendReplacementCandidates(res, transport, scope, primaryDigest, requireCompressionInfo) + resAllBlobs := []prioritize.CandidateWithTime{} var uncompressedDigest digest.Digest // = "" - if canSubstitute { + getCandidatesForUncompressedDigest := func(res []prioritize.CandidateWithTime, scope *types.BICTransportScope, digest digest.Digest, v2Output bool) []prioritize.CandidateWithTime { if uncompressedDigest = mem.uncompressedDigestLocked(primaryDigest); uncompressedDigest != "" { otherDigests := mem.digestsByUncompressed[uncompressedDigest] // nil if not present in the map for d := range otherDigests { if d != primaryDigest && d != uncompressedDigest { - res = mem.appendReplacementCandidates(res, transport, scope, d, requireCompressionInfo) + res = mem.appendReplacementCandidates(res, transport, scope, d, v2Output) } } if uncompressedDigest != primaryDigest { - res = mem.appendReplacementCandidates(res, transport, scope, uncompressedDigest, requireCompressionInfo) + res = mem.appendReplacementCandidates(res, transport, scope, uncompressedDigest, v2Output) } } + return res + } + res = mem.appendReplacementCandidates(res, transport, &scope, primaryDigest, v2Output) + if canSubstitute { + res = getCandidatesForUncompressedDigest(res, &scope, primaryDigest, v2Output) } + // Add alls potential blobs with location unknown if v2Output is configured true. + resAllBlobs = getCandidatesForUncompressedDigest(resAllBlobs, nil, primaryDigest, v2Output) + res = append(res, resAllBlobs...) return prioritize.DestructivelyPrioritizeReplacementCandidates(res, primaryDigest, uncompressedDigest) }