From 4d3fe76e1ebb55e8491960b625a848e2f20c4c60 Mon Sep 17 00:00:00 2001
From: Aditya R
Date: Thu, 1 Sep 2022 11:11:47 +0530
Subject: [PATCH] docker, BlobInfoCache: try to reuse compressed blobs when pushing across registries

It seems we try to reuse blobs only for the specified registry; however, we can have valid known compressed digests across registries as well. This PR attempts to use that by taking the following steps.

* `CandidateLocations2` now processes all known blobs and appends them to the returned candidates at the lowest priority. As a result, when `TryReusingBlob` tries to process these candidates and the blobs filtered by the `Opaque` set by the `transport` fail to match, an attempt is made against all known blobs (the ones which do not belong to the current registry).

* Increase the sample set of potential blob reuse to all known compressed digests, including the ones which do not belong to the current registry.

* If a blob is found, match it against the registry where we are attempting to push. If the blob is already there, consider it a `CACHE HIT!` and reply by skipping the blob, since it is already there.

How to verify this?

* Remove all images with `buildah rmi --all` // needed so all new blobs can be tagged again in a common bucket

* Remove any previous `blob-info-cache`:

```console
rm /home//.local/share/containers/cache/blob-info-cache-v1.boltdb
```

```console
$ skopeo copy docker://registry.fedoraproject.org/fedora-minimal docker://quay.io/fl/test:some-tag
$ buildah pull registry.fedoraproject.org/fedora-minimal
$ buildah tag registry.fedoraproject.org/fedora-minimal quay.io/fl/test
$ buildah push quay.io/fl/test
```

```console
Getting image source signatures
Copying blob a3497ca15bbf skipped: already exists
Copying config f7e02de757 done
Writing manifest to image destination
Storing signatures
```

Signed-off-by: Aditya R
---
 docker/docker_image_dest.go | 59 +++++++++-----
 internal/blobinfocache/types.go | 9 ++-
 pkg/blobinfocache/boltdb/boltdb.go | 76 +++++++++++--------
 .../internal/prioritize/prioritize.go | 56 ++++++++++----
 .../internal/prioritize/prioritize_test.go | 30 ++++++--
 pkg/blobinfocache/internal/test/test.go | 42 +++++++---
 pkg/blobinfocache/memory/memory.go | 54 ++++++++-----
 pkg/blobinfocache/sqlite/sqlite.go | 38 ++++++++--
 8 files changed, 253 insertions(+), 111 deletions(-)

diff --git a/docker/docker_image_dest.go b/docker/docker_image_dest.go
index 0e7b154cc6..db33d466e6 100644
--- a/docker/docker_image_dest.go
+++ b/docker/docker_image_dest.go
@@ -341,39 +341,58 @@ func (d *dockerImageDestination) TryReusingBlobWithOptions(ctx context.Context,
 	// Then try reusing blobs from other locations.
 	candidates := options.Cache.CandidateLocations2(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, options.CanSubstitute)
 	for _, candidate := range candidates {
-		candidateRepo, err := parseBICLocationReference(candidate.Location)
-		if err != nil {
-			logrus.Debugf("Error parsing BlobInfoCache location reference: %s", err)
-			continue
-		}
+		var err error
 		compressionOperation, compressionAlgorithm, err := blobinfocache.OperationAndAlgorithmForCompressor(candidate.CompressorName)
 		if err != nil {
 			logrus.Debugf("OperationAndAlgorithmForCompressor Failed: %v", err)
 			continue
 		}
+		var candidateRepo reference.Named
+		if !candidate.UnknownLocation {
+			candidateRepo, err = parseBICLocationReference(candidate.Location)
+			if err != nil {
+				logrus.Debugf("Error parsing BlobInfoCache location reference: %s", err)
+				continue
+			}
+		}
 		if !impl.BlobMatchesRequiredCompression(options, compressionAlgorithm) {
 			requiredCompression := "nil"
 			if compressionAlgorithm != nil {
 				requiredCompression = compressionAlgorithm.Name()
 			}
-			logrus.Debugf("Ignoring candidate blob %s as reuse candidate due to compression mismatch ( %s vs %s ) in %s", candidate.Digest.String(), options.RequiredCompression.Name(), requiredCompression, candidateRepo.Name())
+			repoString := "with no location match, checking current repo"
+			if !candidate.UnknownLocation {
+				repoString = fmt.Sprintf("in %s", candidateRepo.Name())
+			}
+			logrus.Debugf("Ignoring candidate blob %s as reuse candidate due to compression mismatch ( %s vs %s ) %s", candidate.Digest.String(), options.RequiredCompression.Name(), requiredCompression, repoString)
 			continue
 		}
-		if candidate.CompressorName != blobinfocache.Uncompressed {
-			logrus.Debugf("Trying to reuse cached location %s compressed with %s in %s", candidate.Digest.String(), candidate.CompressorName, candidateRepo.Name())
+		if !candidate.UnknownLocation {
+			if candidate.CompressorName != blobinfocache.Uncompressed {
+				logrus.Debugf("Trying to reuse blob with cached digest %s compressed with %s in destination repo %s", candidate.Digest.String(), candidate.CompressorName, candidateRepo.Name())
+			} else {
+				logrus.Debugf("Trying to reuse blob with cached digest %s in destination repo %s", candidate.Digest.String(), candidateRepo.Name())
+			}
+			// Sanity checks:
+			if reference.Domain(candidateRepo) != reference.Domain(d.ref.ref) {
+				// OCI distribution spec 1.1 allows mounting blobs without specifying the source repo
+				// (the "from" parameter); in that case we might try to use these candidates as well.
+				//
+				// OTOH that would mean we can’t do the “blobExists” check, and if there is no match
+				// we could get an upload request that we would have to cancel.
+				logrus.Debugf("... Internal error: domain %s does not match destination %s", reference.Domain(candidateRepo), reference.Domain(d.ref.ref))
+				continue
+			}
 		} else {
-			logrus.Debugf("Trying to reuse cached location %s with no compression in %s", candidate.Digest.String(), candidateRepo.Name())
-		}
-
-		// Sanity checks:
-		if reference.Domain(candidateRepo) != reference.Domain(d.ref.ref) {
-			// OCI distribution spec 1.1 allows mounting blobs without specifying the source repo
-			// (the "from" parameter); in that case we might try to use these candidates as well.
-			//
-			// OTOH that would mean we can’t do the “blobExists” check, and if there is no match
-			// we could get an upload request that we would have to cancel.
-			logrus.Debugf("... Internal error: domain %s does not match destination %s", reference.Domain(candidateRepo), reference.Domain(d.ref.ref))
-			continue
+			if candidate.CompressorName != blobinfocache.Uncompressed {
+				logrus.Debugf("Trying to reuse blob with cached digest %s compressed with %s with no location match, checking current repo", candidate.Digest.String(), candidate.CompressorName)
+			} else {
+				logrus.Debugf("Trying to reuse blob with cached digest %s in destination repo with no location match, checking current repo", candidate.Digest.String())
+			}
+			// This digest is a known variant of this blob but we don’t
+			// have a recorded location in this registry; let’s try looking
+			// for it in the current repo.
+			candidateRepo = reference.TrimNamed(d.ref.ref)
 		}
 		if candidateRepo.Name() == d.ref.ref.Name() && candidate.Digest == info.Digest {
 			logrus.Debug("... Already tried the primary destination")
diff --git a/internal/blobinfocache/types.go b/internal/blobinfocache/types.go
index fdd245812b..429d682635 100644
--- a/internal/blobinfocache/types.go
+++ b/internal/blobinfocache/types.go
@@ -32,7 +32,7 @@ type BlobInfoCache2 interface {
 	// otherwise the cache could be poisoned and cause us to make incorrect edits to type
 	// information in a manifest.
 	RecordDigestCompressorName(anyDigest digest.Digest, compressorName string)
-	// CandidateLocations2 returns a prioritized, limited, number of blobs and their locations
+	// CandidateLocations2 returns a prioritized, limited, number of blobs and their locations (if known)
 	// that could possibly be reused within the specified (transport scope) (if they still
 	// exist, which is not guaranteed).
 	//
@@ -46,7 +46,8 @@ type BlobInfoCache2 interface {
 // BICReplacementCandidate2 is an item returned by BlobInfoCache2.CandidateLocations2.
 type BICReplacementCandidate2 struct {
-	Digest digest.Digest
-	CompressorName string // either the Name() of a known pkg/compression.Algorithm, or Uncompressed or UnknownCompression
-	Location types.BICLocationReference
+	Digest digest.Digest
+	CompressorName string // either the Name() of a known pkg/compression.Algorithm, or Uncompressed or UnknownCompression
+	UnknownLocation bool // is true when `Location` for this blob is not set
+	Location types.BICLocationReference // not set if UnknownLocation is set to `true`
 }
diff --git a/pkg/blobinfocache/boltdb/boltdb.go b/pkg/blobinfocache/boltdb/boltdb.go
index e20670b340..4f66bcee4b 100644
--- a/pkg/blobinfocache/boltdb/boltdb.go
+++ b/pkg/blobinfocache/boltdb/boltdb.go
@@ -296,13 +296,14 @@ func (bdc *cache) RecordKnownLocation(transport types.ImageTransport, scope type
 	}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
 }
-// appendReplacementCandidates creates prioritize.CandidateWithTime values for digest in scopeBucket with corresponding compression info from compressionBucket (if compressionBucket is not nil), and returns the result of appending them to candidates.
-func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, scopeBucket, compressionBucket *bolt.Bucket, digest digest.Digest, requireCompressionInfo bool) []prioritize.CandidateWithTime {
+// appendReplacementCandidates creates prioritize.CandidateWithTime values for digest in scopeBucket
+// (which might be nil) with corresponding compression
+// info from compressionBucket (which might be nil), and returns the result of appending them
+// to candidates.
+// v2Output allows including candidates with unknown location, and filters out candidates
+// with unknown compression.
+func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, scopeBucket, compressionBucket *bolt.Bucket, digest digest.Digest, v2Output bool) []prioritize.CandidateWithTime {
 	digestKey := []byte(digest.String())
-	b := scopeBucket.Bucket(digestKey)
-	if b == nil {
-		return candidates
-	}
 	compressorName := blobinfocache.UnknownCompression
 	if compressionBucket != nil {
 		// the bucket won't exist if the cache was created by a v1 implementation and
@@ -311,28 +312,44 @@ func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateW
 			compressorName = string(compressorNameValue)
 		}
 	}
-	if compressorName == blobinfocache.UnknownCompression && requireCompressionInfo {
+	if compressorName == blobinfocache.UnknownCompression && v2Output {
 		return candidates
 	}
-	_ = b.ForEach(func(k, v []byte) error {
-		t := time.Time{}
-		if err := t.UnmarshalBinary(v); err != nil {
-			return err
-		}
+	var b *bolt.Bucket
+	if scopeBucket != nil {
+		b = scopeBucket.Bucket(digestKey)
+	}
+	if b != nil {
+		_ = b.ForEach(func(k, v []byte) error {
+			t := time.Time{}
+			if err := t.UnmarshalBinary(v); err != nil {
+				return err
+			}
+			candidates = append(candidates, prioritize.CandidateWithTime{
+				Candidate: blobinfocache.BICReplacementCandidate2{
+					Digest: digest,
+					CompressorName: compressorName,
+					Location: types.BICLocationReference{Opaque: string(k)},
+				},
+				LastSeen: t,
+			})
+			return nil
+		}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
+	} else if v2Output {
 		candidates = append(candidates, prioritize.CandidateWithTime{
 			Candidate: blobinfocache.BICReplacementCandidate2{
-				Digest: digest,
-				CompressorName: compressorName,
-				Location: types.BICLocationReference{Opaque: string(k)},
+				Digest: digest,
+				CompressorName: compressorName,
+				UnknownLocation: true,
+				Location: types.BICLocationReference{Opaque: ""},
 			},
-			LastSeen: t,
+			LastSeen: time.Time{},
 		})
-		return nil
-	}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
+	}
 	return candidates
 }
-// CandidateLocations2 returns a prioritized, limited, number of blobs and their locations that could possibly be reused
+// CandidateLocations2 returns a prioritized, limited, number of blobs and their locations (if known) that could possibly be reused
 // within the specified (transport scope) (if they still exist, which is not guaranteed).
 //
 // If !canSubstitute, the returned candidates will match the submitted digest exactly; if canSubstitute,
@@ -342,27 +359,22 @@ func (bdc *cache) CandidateLocations2(transport types.ImageTransport, scope type
 	return bdc.candidateLocations(transport, scope, primaryDigest, canSubstitute, true)
 }
-func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, requireCompressionInfo bool) []blobinfocache.BICReplacementCandidate2 {
+func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, v2Output bool) []blobinfocache.BICReplacementCandidate2 {
 	res := []prioritize.CandidateWithTime{}
 	var uncompressedDigestValue digest.Digest // = ""
 	if err := bdc.view(func(tx *bolt.Tx) error {
 		scopeBucket := tx.Bucket(knownLocationsBucket)
-		if scopeBucket == nil {
-			return nil
+		if scopeBucket != nil {
+			scopeBucket = scopeBucket.Bucket([]byte(transport.Name()))
 		}
-		scopeBucket = scopeBucket.Bucket([]byte(transport.Name()))
-		if scopeBucket == nil {
-			return nil
-		}
-		scopeBucket = scopeBucket.Bucket([]byte(scope.Opaque))
-		if scopeBucket == nil {
-			return nil
+		if scopeBucket != nil {
+			scopeBucket = scopeBucket.Bucket([]byte(scope.Opaque))
 		}
 		// compressionBucket won't have been created if previous writers never recorded info about compression,
 		// and we don't want to fail just because of that
 		compressionBucket := tx.Bucket(digestCompressorBucket)
-		res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, primaryDigest, requireCompressionInfo)
+		res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, primaryDigest, v2Output)
 		if canSubstitute {
 			if uncompressedDigestValue = bdc.uncompressedDigest(tx, primaryDigest); uncompressedDigestValue != "" {
 				b := tx.Bucket(digestByUncompressedBucket)
@@ -375,7 +387,7 @@ func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types
 					return err
 				}
 				if d != primaryDigest && d != uncompressedDigestValue {
-					res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, d, requireCompressionInfo)
+					res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, d, v2Output)
 				}
 				return nil
 			}); err != nil {
@@ -384,7 +396,7 @@ func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types
 			}
 		}
 		if uncompressedDigestValue != primaryDigest {
-			res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, uncompressedDigestValue, requireCompressionInfo)
+			res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, uncompressedDigestValue, v2Output)
 		}
 	}
 }
diff --git a/pkg/blobinfocache/internal/prioritize/prioritize.go b/pkg/blobinfocache/internal/prioritize/prioritize.go
index bc9315f6ef..97562687c8 100644
--- a/pkg/blobinfocache/internal/prioritize/prioritize.go
+++ b/pkg/blobinfocache/internal/prioritize/prioritize.go
@@ -10,15 +10,20 @@ import (
 	"github.com/opencontainers/go-digest"
 )
-// replacementAttempts is the number of blob replacement candidates returned by destructivelyPrioritizeReplacementCandidates,
+// replacementAttempts is the number of blob replacement candidates with known location returned by destructivelyPrioritizeReplacementCandidates,
 // and therefore ultimately by types.BlobInfoCache.CandidateLocations.
 // This is a heuristic/guess, and could well use a different value.
 const replacementAttempts = 5
+// replacementUnknownLocationAttempts is the number of blob replacement candidates with unknown Location returned by destructivelyPrioritizeReplacementCandidates,
+// and therefore ultimately by blobinfocache.BlobInfoCache2.CandidateLocations2.
+// This is a heuristic/guess, and could well use a different value.
+const replacementUnknownLocationAttempts = 2
+
 // CandidateWithTime is the input to types.BICReplacementCandidate prioritization.
 type CandidateWithTime struct {
 	Candidate blobinfocache.BICReplacementCandidate2 // The replacement candidate
-	LastSeen time.Time // Time the candidate was last known to exist (either read or written)
+	LastSeen time.Time // Time the candidate was last known to exist (either read or written) (not set when Candidate.UnknownLocation is true)
 }
 // candidateSortState is a local state implementing sort.Interface on candidates to prioritize,
@@ -77,9 +82,22 @@ func (css *candidateSortState) Swap(i, j int) {
 	css.cs[i], css.cs[j] = css.cs[j], css.cs[i]
 }
-// destructivelyPrioritizeReplacementCandidatesWithMax is destructivelyPrioritizeReplacementCandidates with a parameter for the
-// number of entries to limit, only to make testing simpler.
-func destructivelyPrioritizeReplacementCandidatesWithMax(cs []CandidateWithTime, primaryDigest, uncompressedDigest digest.Digest, maxCandidates int) []blobinfocache.BICReplacementCandidate2 {
+func min(a, b int) int {
+	if a < b {
+		return a
+	}
+	return b
+}
+
+// destructivelyPrioritizeReplacementCandidatesWithMax is destructivelyPrioritizeReplacementCandidates with parameters for the
+// number of entries to limit for known and unknown location separately, only to make testing simpler.
+// TODO: this function is no longer destructive in nature; instead, the prioritized result is actually a copy of the original
+// candidate set, so in the future we might want to rename this public API and remove the "destructively" prefix.
+func destructivelyPrioritizeReplacementCandidatesWithMax(cs []CandidateWithTime, primaryDigest, uncompressedDigest digest.Digest, totalLimit int, noLocationLimit int) []blobinfocache.BICReplacementCandidate2 {
+	// Split the candidates into known-location and unknown-location sets,
+	// and limit them separately.
+	var knownLocationCandidates []CandidateWithTime
+	var unknownLocationCandidates []CandidateWithTime
 	// We don't need to use sort.Stable() because nanosecond timestamps are (presumably?) unique, so no two elements should
 	// compare equal.
 	// FIXME: Use slices.SortFunc after we update to Go 1.20 (Go 1.21?) and Time.Compare and cmp.Compare are available.
@@ -88,24 +106,34 @@ func destructivelyPrioritizeReplacementCandidatesWithMax(cs []CandidateWithTime,
 		primaryDigest: primaryDigest,
 		uncompressedDigest: uncompressedDigest,
 	})
+	for _, candidate := range cs {
+		if candidate.Candidate.UnknownLocation {
+			unknownLocationCandidates = append(unknownLocationCandidates, candidate)
+		} else {
+			knownLocationCandidates = append(knownLocationCandidates, candidate)
+		}
+	}
-	resLength := len(cs)
-	if resLength > maxCandidates {
-		resLength = maxCandidates
+	knownLocationCandidatesUsed := min(len(knownLocationCandidates), totalLimit)
+	remainingCapacity := totalLimit - knownLocationCandidatesUsed
+	unknownLocationCandidatesUsed := min(noLocationLimit, min(remainingCapacity, len(unknownLocationCandidates)))
+	res := make([]blobinfocache.BICReplacementCandidate2, knownLocationCandidatesUsed)
+	for i := 0; i < knownLocationCandidatesUsed; i++ {
+		res[i] = knownLocationCandidates[i].Candidate
 	}
-	res := make([]blobinfocache.BICReplacementCandidate2, resLength)
-	for i := range res {
-		res[i] = cs[i].Candidate
+	// If any candidates with unknown location are found, let's add them to the final list
+	for i := 0; i < unknownLocationCandidatesUsed; i++ {
+		res = append(res, unknownLocationCandidates[i].Candidate)
 	}
 	return res
 }
 // DestructivelyPrioritizeReplacementCandidates consumes AND DESTROYS an array of possible replacement candidates with their last known existence times,
-// the primary digest the user actually asked for, and the corresponding uncompressed digest (if known, possibly equal to the primary digest),
-// and returns an appropriately prioritized and/or trimmed result suitable for a return value from types.BlobInfoCache.CandidateLocations.
+// the primary digest the user actually asked for, and the corresponding uncompressed digest (if known, possibly equal to the primary digest), and returns an
+// appropriately prioritized and/or trimmed result suitable for a return value from types.BlobInfoCache.CandidateLocations.
 //
 // WARNING: The array of candidates is destructively modified. (The implementation of this function could of course
 // make a copy, but all CandidateLocations implementations build the slice of candidates only for the single purpose of calling this function anyway.)
 func DestructivelyPrioritizeReplacementCandidates(cs []CandidateWithTime, primaryDigest, uncompressedDigest digest.Digest) []blobinfocache.BICReplacementCandidate2 {
-	return destructivelyPrioritizeReplacementCandidatesWithMax(cs, primaryDigest, uncompressedDigest, replacementAttempts)
+	return destructivelyPrioritizeReplacementCandidatesWithMax(cs, primaryDigest, uncompressedDigest, replacementAttempts, replacementUnknownLocationAttempts)
 }
diff --git a/pkg/blobinfocache/internal/prioritize/prioritize_test.go b/pkg/blobinfocache/internal/prioritize/prioritize_test.go
index 77167e904c..4ada40682f 100644
--- a/pkg/blobinfocache/internal/prioritize/prioritize_test.go
+++ b/pkg/blobinfocache/internal/prioritize/prioritize_test.go
@@ -32,6 +32,10 @@ var (
 		{blobinfocache.BICReplacementCandidate2{Digest: digestCompressedPrimary, Location: types.BICLocationReference{Opaque: "P2"}, CompressorName: compressiontypes.GzipAlgorithmName}, time.Unix(1, 1)},
 		{blobinfocache.BICReplacementCandidate2{Digest: digestCompressedB, Location: types.BICLocationReference{Opaque: "B2"}, CompressorName: blobinfocache.Uncompressed}, time.Unix(2, 0)},
 		{blobinfocache.BICReplacementCandidate2{Digest: digestUncompressed, Location: types.BICLocationReference{Opaque: "U1"}, CompressorName: blobinfocache.UnknownCompression}, time.Unix(1, 0)},
+		{blobinfocache.BICReplacementCandidate2{Digest: digestUncompressed, UnknownLocation: true, Location: types.BICLocationReference{Opaque: ""}, CompressorName: blobinfocache.UnknownCompression}, time.Time{}},
+		{blobinfocache.BICReplacementCandidate2{Digest: digestCompressedA, UnknownLocation: true, Location: types.BICLocationReference{Opaque: ""}, CompressorName: blobinfocache.UnknownCompression}, time.Time{}},
+		{blobinfocache.BICReplacementCandidate2{Digest: digestCompressedB, UnknownLocation: true, Location: types.BICLocationReference{Opaque: ""}, CompressorName: blobinfocache.UnknownCompression}, time.Time{}},
+		{blobinfocache.BICReplacementCandidate2{Digest: digestCompressedPrimary, UnknownLocation: true, Location: types.BICLocationReference{Opaque: ""}, CompressorName: blobinfocache.UnknownCompression}, time.Time{}},
 	},
 	primaryDigest: digestCompressedPrimary,
 	uncompressedDigest: digestUncompressed,
@@ -46,12 +50,16 @@ var (
 	{Digest: digestCompressedA, Location: types.BICLocationReference{Opaque: "A1"}, CompressorName: compressiontypes.XzAlgorithmName},
 	{Digest: digestUncompressed, Location: types.BICLocationReference{Opaque: "U2"}, CompressorName: compressiontypes.GzipAlgorithmName},
 	{Digest: digestUncompressed, Location: types.BICLocationReference{Opaque: "U1"}, CompressorName: blobinfocache.UnknownCompression},
+	{Digest: digestCompressedPrimary, UnknownLocation: true, Location: types.BICLocationReference{Opaque: ""}, CompressorName: blobinfocache.UnknownCompression},
+	{Digest: digestCompressedA, UnknownLocation: true, Location: types.BICLocationReference{Opaque: ""}, CompressorName: blobinfocache.UnknownCompression},
+	{Digest: digestCompressedB, UnknownLocation: true, Location: types.BICLocationReference{Opaque: ""}, CompressorName: blobinfocache.UnknownCompression},
+	{Digest: digestUncompressed, UnknownLocation: true, Location: types.BICLocationReference{Opaque: ""}, CompressorName: blobinfocache.UnknownCompression},
 }
 )
 func TestCandidateSortStateLen(t *testing.T) {
 	css := cssLiteral
-	assert.Equal(t, 8, css.Len())
+	assert.Equal(t, 12, css.Len())
 	css.cs = []CandidateWithTime{}
 	assert.Equal(t, 0, css.Len())
@@ -156,13 +164,21 @@ func TestCandidateSortStateSwap(t *testing.T) {
 }
 func TestDestructivelyPrioritizeReplacementCandidatesWithMax(t *testing.T) {
-	for _, max := range []int{0, 1, replacementAttempts, 100} {
-		// Just a smoke test; we mostly rely on test coverage in TestCandidateSortStateLess
-		res := destructivelyPrioritizeReplacementCandidatesWithMax(slices.Clone(cssLiteral.cs), digestCompressedPrimary, digestUncompressed, max)
-		if max > len(cssExpectedReplacementCandidates) {
-			max = len(cssExpectedReplacementCandidates)
+	totalUnknownLocationCandidates := 4
+	for _, max := range []int{0, 1, replacementAttempts, 100, replacementUnknownLocationAttempts} {
+		for _, unknownMaxLimit := range []int{0, 1, replacementAttempts, 100, replacementUnknownLocationAttempts} {
+			if unknownMaxLimit > max {
+				continue
+			}
+			// Just a smoke test; we mostly rely on test coverage in TestCandidateSortStateLess
+			res := destructivelyPrioritizeReplacementCandidatesWithMax(slices.Clone(cssLiteral.cs), digestCompressedPrimary, digestUncompressed, max, unknownMaxLimit)
+			if max >= len(cssExpectedReplacementCandidates) {
+				max = len(cssExpectedReplacementCandidates)
+				max = max - totalUnknownLocationCandidates
+				max += unknownMaxLimit
+			}
+			assert.Equal(t, cssExpectedReplacementCandidates[:max], res)
 		}
-		assert.Equal(t, cssExpectedReplacementCandidates[:max], res)
 	}
 }
diff --git a/pkg/blobinfocache/internal/test/test.go b/pkg/blobinfocache/internal/test/test.go
index d5e92634f9..a33fad1a3e 100644
--- a/pkg/blobinfocache/internal/test/test.go
+++ b/pkg/blobinfocache/internal/test/test.go
@@ -16,6 +16,7 @@ const (
 	digestUncompressed = digest.Digest("sha256:2222222222222222222222222222222222222222222222222222222222222222")
 	digestCompressedA = digest.Digest("sha256:3333333333333333333333333333333333333333333333333333333333333333")
 	digestCompressedB = digest.Digest("sha256:4444444444444444444444444444444444444444444444444444444444444444")
+	digestUncompressedC = digest.Digest("sha256:7777777777777777777777777777777777777777777777777777777777777777")
 	digestCompressedUnrelated = digest.Digest("sha256:5555555555555555555555555555555555555555555555555555555555555555")
 	compressorNameU = "compressorName/U"
 	compressorNameA = "compressorName/A"
@@ -201,27 +202,50 @@ func testGenericCandidateLocations2(t *testing.T, cache blobinfocache.BlobInfoCa
 	cache.RecordDigestUncompressedPair(digestCompressedB, digestUncompressed)
 	cache.RecordDigestUncompressedPair(digestUncompressed, digestUncompressed)
 	digestNameSet := []struct {
-		n string
-		d digest.Digest
-		m string
+		n string
+		d digest.Digest
+		m string
+		unknownLocation bool
 	}{
-		{"U", digestUncompressed, compressorNameU},
-		{"A", digestCompressedA, compressorNameA},
-		{"B", digestCompressedB, compressorNameB},
-		{"CU", digestCompressedUnrelated, compressorNameCU},
+		{"U", digestUncompressed, compressorNameU, false},
+		{"A", digestCompressedA, compressorNameA, false},
+		{"B", digestCompressedB, compressorNameB, false},
+		{"CU", digestCompressedUnrelated, compressorNameCU, false},
 	}
 	for scopeIndex, scopeName := range []string{"A", "B", "C"} { // Run the test in two different scopes to verify they don't affect each other.
 		scope := types.BICTransportScope{Opaque: scopeName}
 		// Nothing is known.
 		assert.Equal(t, []blobinfocache.BICReplacementCandidate2{}, cache.CandidateLocations2(transport, scope, digestUnknown, false))
 		assert.Equal(t, []blobinfocache.BICReplacementCandidate2{}, cache.CandidateLocations2(transport, scope, digestUnknown, true))
+		// If a record exists with compression but without a location,
+		// then return a record without a location and with `UnknownLocation: true`
+		cache.RecordDigestCompressorName(digestUncompressedC, "somecompression")
+		assert.Equal(t, []blobinfocache.BICReplacementCandidate2{
+			{
+				Digest: digestUncompressedC,
+				CompressorName: "somecompression",
+				UnknownLocation: true,
+				Location: types.BICLocationReference{Opaque: ""},
+			}}, cache.CandidateLocations2(transport, scope, digestUncompressedC, true))
+		// When another entry with a scope and a location is recorded, it should be returned, as it has higher
+		// priority.
+		cache.RecordKnownLocation(transport, scope, digestUncompressedC, types.BICLocationReference{Opaque: "somelocation"})
+		assert.Equal(t, []blobinfocache.BICReplacementCandidate2{
+			{
+				Digest: digestUncompressedC,
+				CompressorName: "somecompression",
+				UnknownLocation: false,
+				Location: types.BICLocationReference{Opaque: "somelocation"},
+			}}, cache.CandidateLocations2(transport, scope, digestUncompressedC, true))
+
 		// Record "2" entries before "1" entries; then results should sort "1" (more recent) before "2" (older)
 		for _, suffix := range []string{"2", "1"} {
 			for _, e := range digestNameSet {
-				cache.RecordKnownLocation(transport, scope, e.d, types.BICLocationReference{Opaque: scopeName + e.n + suffix})
+				if !e.unknownLocation {
+					cache.RecordKnownLocation(transport, scope, e.d, types.BICLocationReference{Opaque: scopeName + e.n + suffix})
+				}
 			}
 		}
diff --git a/pkg/blobinfocache/memory/memory.go b/pkg/blobinfocache/memory/memory.go
index 9e5c4256ba..cfad16b2ec 100644
--- a/pkg/blobinfocache/memory/memory.go
+++ b/pkg/blobinfocache/memory/memory.go
@@ -133,24 +133,39 @@ func (mem *cache) RecordDigestCompressorName(blobDigest digest.Digest, compresso
 	mem.compressors[blobDigest] = compressorName
 }
-// appendReplacementCandidates creates prioritize.CandidateWithTime values for (transport, scope, digest), and returns the result of appending them to candidates.
-func (mem *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, requireCompressionInfo bool) []prioritize.CandidateWithTime {
+// appendReplacementCandidates creates prioritize.CandidateWithTime values for digest in memory
+// with corresponding compression info from mem.compressors, and returns the result of appending
+// them to candidates. v2Output allows including candidates with unknown location, and filters out
+// candidates with unknown compression.
+func (mem *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, v2Output bool) []prioritize.CandidateWithTime {
+	compressorName := blobinfocache.UnknownCompression
+	if v, ok := mem.compressors[digest]; ok {
+		compressorName = v
+	}
+	if compressorName == blobinfocache.UnknownCompression && v2Output {
+		return candidates
+	}
 	locations := mem.knownLocations[locationKey{transport: transport.Name(), scope: scope, blobDigest: digest}] // nil if not present
-	for l, t := range locations {
-		compressorName, compressorKnown := mem.compressors[digest]
-		if !compressorKnown {
-			if requireCompressionInfo {
-				continue
-			}
-			compressorName = blobinfocache.UnknownCompression
+	if len(locations) > 0 {
+		for l, t := range locations {
+			candidates = append(candidates, prioritize.CandidateWithTime{
+				Candidate: blobinfocache.BICReplacementCandidate2{
+					Digest: digest,
+					CompressorName: compressorName,
+					Location: l,
+				},
+				LastSeen: t,
+			})
 		}
+	} else if v2Output {
 		candidates = append(candidates, prioritize.CandidateWithTime{
 			Candidate: blobinfocache.BICReplacementCandidate2{
-				Digest: digest,
-				CompressorName: compressorName,
-				Location: l,
+				Digest: digest,
+				CompressorName: compressorName,
+				UnknownLocation: true,
+				Location: types.BICLocationReference{Opaque: ""},
 			},
-			LastSeen: t,
+			LastSeen: time.Time{},
 		})
 	}
 	return candidates
@@ -166,7 +181,7 @@ func (mem *cache) CandidateLocations(transport types.ImageTransport, scope types
 	return blobinfocache.CandidateLocationsFromV2(mem.candidateLocations(transport, scope, primaryDigest, canSubstitute, false))
 }
-// CandidateLocations2 returns a prioritized, limited, number of blobs and their locations that could possibly be reused
+// CandidateLocations2 returns a prioritized, limited, number of blobs and their locations (if known) that could possibly be reused
 // within the specified (transport scope) (if they still exist, which is not guaranteed).
 //
 // If !canSubstitute, the returned cadidates will match the submitted digest exactly; if canSubstitute,
@@ -176,23 +191,24 @@ func (mem *cache) CandidateLocations2(transport types.ImageTransport, scope type
 	return mem.candidateLocations(transport, scope, primaryDigest, canSubstitute, true)
 }
-func (mem *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, requireCompressionInfo bool) []blobinfocache.BICReplacementCandidate2 {
+func (mem *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, v2Output bool) []blobinfocache.BICReplacementCandidate2 {
 	mem.mutex.Lock()
 	defer mem.mutex.Unlock()
 	res := []prioritize.CandidateWithTime{}
-	res = mem.appendReplacementCandidates(res, transport, scope, primaryDigest, requireCompressionInfo)
+	res = mem.appendReplacementCandidates(res, transport, scope, primaryDigest, v2Output)
 	var uncompressedDigest digest.Digest // = ""
 	if canSubstitute {
 		if uncompressedDigest = mem.uncompressedDigestLocked(primaryDigest); uncompressedDigest != "" {
-			if otherDigests, ok := mem.digestsByUncompressed[uncompressedDigest]; ok {
+			otherDigests := mem.digestsByUncompressed[uncompressedDigest] // nil if not present in the map
+			if otherDigests != nil {
 				for _, d := range otherDigests.Values() {
 					if d != primaryDigest && d != uncompressedDigest {
-						res = mem.appendReplacementCandidates(res, transport, scope, d, requireCompressionInfo)
+						res = mem.appendReplacementCandidates(res, transport, scope, d, v2Output)
 					}
 				}
 			}
 			if uncompressedDigest != primaryDigest {
-				res = mem.appendReplacementCandidates(res, transport, scope, uncompressedDigest, requireCompressionInfo)
+				res = mem.appendReplacementCandidates(res, transport, scope, uncompressedDigest, v2Output)
 			}
 		}
 	}
diff --git a/pkg/blobinfocache/sqlite/sqlite.go b/pkg/blobinfocache/sqlite/sqlite.go
index 8a4b7e6096..38d3b4067b 100644
--- a/pkg/blobinfocache/sqlite/sqlite.go
+++ b/pkg/blobinfocache/sqlite/sqlite.go
@@ -427,10 +427,14 @@ func (sqc *cache) RecordDigestCompressorName(anyDigest digest.Digest, compressor
 	}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
 }
-// appendReplacementCandidates creates prioritize.CandidateWithTime values for (transport, scope, digest), and returns the result of appending them to candidates.
+// appendReplacementCandidates creates prioritize.CandidateWithTime values for (transport, scope, digest),
+// and returns the result of appending them to candidates. requireCompressionInfo allows including candidates with unknown
+// location, and filters out candidates with unknown compression.
 func (sqc *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, tx *sql.Tx, transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, requireCompressionInfo bool) ([]prioritize.CandidateWithTime, error) {
 	var rows *sql.Rows
+	res := []prioritize.CandidateWithTime{}
 	var err error
+	compressorName := blobinfocache.UnknownCompression
 	if requireCompressionInfo {
 		rows, err = tx.Query("SELECT location, time, compressor FROM KnownLocations JOIN DigestCompressors "+
 			"ON KnownLocations.digest = DigestCompressors.digest "+
@@ -451,14 +455,14 @@ func (sqc *cache) appendReplacementCandidates(candidates []prioritize.CandidateW
 	for rows.Next() {
 		var location string
 		var time time.Time
-		var compressorName string
-		if err := rows.Scan(&location, &time, &compressorName); err != nil {
+		var compressorNameLocal string
+		if err := rows.Scan(&location, &time, &compressorNameLocal); err != nil {
 			return nil, fmt.Errorf("scanning candidate: %w", err)
 		}
-		candidates = append(candidates, prioritize.CandidateWithTime{
+		res = append(res, prioritize.CandidateWithTime{
 			Candidate: blobinfocache.BICReplacementCandidate2{
 				Digest: digest,
-				CompressorName: compressorName,
+				CompressorName: compressorNameLocal,
 				Location: types.BICLocationReference{Opaque: location},
 			},
 			LastSeen: time,
@@ -467,10 +471,32 @@ func (sqc *cache) appendReplacementCandidates(candidates []prioritize.CandidateW
 	if err := rows.Err(); err != nil {
 		return nil, fmt.Errorf("iterating through locations: %w", err)
 	}
+
+	if len(res) == 0 {
+		compressor, found, err2 := querySingleValue[string](tx, "SELECT compressor FROM DigestCompressors WHERE digest = ?", digest.String())
+		if err2 != nil {
+			return nil, fmt.Errorf("scanning compressorName: %w", err2)
+		}
+		if found {
+			compressorName = compressor
+		}
+		if requireCompressionInfo && compressorName != blobinfocache.UnknownCompression {
+			res = append(res, prioritize.CandidateWithTime{
+				Candidate: blobinfocache.BICReplacementCandidate2{
+					Digest: digest,
+					CompressorName: compressorName,
+					UnknownLocation: true,
+					Location: types.BICLocationReference{Opaque: ""},
+				},
+				LastSeen: time.Time{},
+			})
+		}
+	}
+	candidates = append(candidates, res...)
 	return candidates, nil
 }
-// CandidateLocations2 returns a prioritized, limited, number of blobs and their locations
+// CandidateLocations2 returns a prioritized, limited, number of blobs and their locations (if known)
 // that could possibly be reused within the specified (transport scope) (if they still
 // exist, which is not guaranteed).
 //
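A note for reviewers, outside the patch itself: the heart of the new prioritization is the split-and-trim rule in destructivelyPrioritizeReplacementCandidatesWithMax. Below is a minimal, self-contained Go sketch of that rule, not the library code itself; the `candidate` type is a simplified stand-in for blobinfocache.BICReplacementCandidate2, and the limits mirror replacementAttempts (5) and replacementUnknownLocationAttempts (2) from the patch.

```go
package main

import "fmt"

// candidate is a simplified stand-in for blobinfocache.BICReplacementCandidate2.
type candidate struct {
	digest          string
	unknownLocation bool
}

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// trim mirrors the new limiting logic: known-location candidates fill the
// result first (up to totalLimit), and unknown-location candidates are only
// appended into the remaining capacity, further capped at noLocationLimit.
func trim(sorted []candidate, totalLimit, noLocationLimit int) []candidate {
	var known, unknown []candidate
	for _, c := range sorted { // sorted is assumed to be prioritized already
		if c.unknownLocation {
			unknown = append(unknown, c)
		} else {
			known = append(known, c)
		}
	}
	knownUsed := min(len(known), totalLimit)
	remaining := totalLimit - knownUsed
	unknownUsed := min(noLocationLimit, min(remaining, len(unknown)))
	res := append([]candidate{}, known[:knownUsed]...)
	return append(res, unknown[:unknownUsed]...)
}

func main() {
	sorted := []candidate{ // hypothetical digests, for illustration only
		{"sha256:aaaa", false},
		{"sha256:bbbb", false},
		{"sha256:cccc", true},
		{"sha256:dddd", true},
		{"sha256:eeee", true},
	}
	for _, c := range trim(sorted, 5, 2) {
		fmt.Println(c.digest, "unknownLocation:", c.unknownLocation)
	}
	// With totalLimit=5 and noLocationLimit=2, both known-location entries
	// are kept, and only two of the three unknown-location entries follow.
}
```

With the defaults in the patch, a destination therefore tries at most five candidates overall, of which at most two are speculative entries with no recorded location; those are the ones TryReusingBlobWithOptions then checks against the current repo via the blobExists probe.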