diff --git a/aptly/interfaces.go b/aptly/interfaces.go index e912daba2..9980d8981 100644 --- a/aptly/interfaces.go +++ b/aptly/interfaces.go @@ -70,7 +70,7 @@ type PublishedStorage interface { // Remove removes single file under public path Remove(path string) error // LinkFromPool links package file from pool to dist's pool location - LinkFromPool(publishedDirectory, fileName string, sourcePool PackagePool, sourcePath string, sourceChecksums utils.ChecksumInfo, force bool) error + LinkFromPool(publishedPrefix, publishedRelPath, fileName string, sourcePool PackagePool, sourcePath string, sourceChecksums utils.ChecksumInfo, force bool) error // Filelist returns list of files under prefix Filelist(prefix string) ([]string, error) // RenameFile renames (moves) file diff --git a/azure/public.go b/azure/public.go index cf1a7b205..eb54b0d3d 100644 --- a/azure/public.go +++ b/azure/public.go @@ -22,7 +22,7 @@ import ( type PublishedStorage struct { container azblob.ContainerURL prefix string - pathCache map[string]string + pathCache map[string]map[string]string } // Check interface @@ -174,31 +174,38 @@ func (storage *PublishedStorage) Remove(path string) error { // LinkFromPool links package file from pool to dist's pool location // -// publishedDirectory is desired location in pool (like prefix/pool/component/liba/libav/) +// publishedPrefix is desired prefix for the location in the pool. +// publishedRelPath is desired location in pool (like pool/component/liba/libav/) // sourcePool is instance of aptly.PackagePool // sourcePath is filepath to package file in package pool // // LinkFromPool returns relative path for the published file to be included in package index -func (storage *PublishedStorage) LinkFromPool(publishedDirectory, fileName string, sourcePool aptly.PackagePool, +func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath, fileName string, sourcePool aptly.PackagePool, sourcePath string, sourceChecksums utils.ChecksumInfo, force bool) error { - relPath := filepath.Join(publishedDirectory, fileName) - poolPath := filepath.Join(storage.prefix, relPath) + relFilePath := filepath.Join(publishedRelPath, fileName) + prefixRelFilePath := filepath.Join(publishedPrefix, relFilePath) + poolPath := filepath.Join(storage.prefix, prefixRelFilePath) if storage.pathCache == nil { - paths, md5s, err := storage.internalFilelist("") + storage.pathCache = make(map[string]map[string]string) + } + pathCache := storage.pathCache[publishedPrefix] + if pathCache == nil { + paths, md5s, err := storage.internalFilelist(publishedPrefix) if err != nil { return fmt.Errorf("error caching paths under prefix: %s", err) } - storage.pathCache = make(map[string]string, len(paths)) + pathCache = make(map[string]string, len(paths)) for i := range paths { - storage.pathCache[paths[i]] = md5s[i] + pathCache[paths[i]] = md5s[i] } + storage.pathCache[publishedPrefix] = pathCache } - destinationMD5, exists := storage.pathCache[relPath] + destinationMD5, exists := pathCache[relFilePath] sourceMD5 := sourceChecksums.MD5 if exists { @@ -221,9 +228,9 @@ func (storage *PublishedStorage) LinkFromPool(publishedDirectory, fileName strin } defer source.Close() - err = storage.putFile(relPath, source, sourceMD5) + err = storage.putFile(prefixRelFilePath, source, sourceMD5) if err == nil { - storage.pathCache[relPath] = sourceMD5 + pathCache[relFilePath] = sourceMD5 } else { err = errors.Wrap(err, fmt.Sprintf("error uploading %s to %s: %s", sourcePath, storage, poolPath)) } diff --git a/azure/public_test.go
b/azure/public_test.go index 79abfed23..c038ba8b9 100644 --- a/azure/public_test.go +++ b/azure/public_test.go @@ -300,45 +300,45 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) { c.Assert(err, IsNil) // first link from pool - err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) + err = s.storage.LinkFromPool("", filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) c.Check(err, IsNil) c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents")) // duplicate link from pool - err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) + err = s.storage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) c.Check(err, IsNil) c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents")) // link from pool with conflict - err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src2, cksum2, false) + err = s.storage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src2, cksum2, false) c.Check(err, ErrorMatches, ".*file already exists and is different.*") c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents")) // link from pool with conflict and force - err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src2, cksum2, true) + err = s.storage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src2, cksum2, true) c.Check(err, IsNil) c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Spam")) // for prefixed storage: // first link from pool - err = s.prefixedStorage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) + err = s.prefixedStorage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) c.Check(err, IsNil) // 2nd link from pool, providing wrong path for source file // // this test should check that file already exists in S3 and skip upload (which would fail if not skipped) s.prefixedStorage.pathCache = nil - err = s.prefixedStorage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, "wrong-looks-like-pathcache-doesnt-work", cksum1, false) + err = s.prefixedStorage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, "wrong-looks-like-pathcache-doesnt-work", cksum1, false) c.Check(err, IsNil) c.Check(s.GetFile(c, "lala/pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents")) // link from pool with nested file name - err = s.storage.LinkFromPool("dists/jessie/non-free/installer-i386/current/images", "netboot/boot.img.gz", pool, src3, cksum3, false) + err = s.storage.LinkFromPool("", "dists/jessie/non-free/installer-i386/current/images", "netboot/boot.img.gz", pool, src3, cksum3, false) c.Check(err, IsNil) c.Check(s.GetFile(c, "dists/jessie/non-free/installer-i386/current/images/netboot/boot.img.gz"), DeepEquals, []byte("Contents")) diff --git a/context/context.go 
b/context/context.go index a4fef877e..3be33a3a6 100644 --- a/context/context.go +++ b/context/context.go @@ -394,7 +394,8 @@ func (context *AptlyContext) GetPublishedStorage(name string) aptly.PublishedSto params.AccessKeyID, params.SecretAccessKey, params.SessionToken, params.Region, params.Endpoint, params.Bucket, params.ACL, params.Prefix, params.StorageClass, params.EncryptionMethod, params.PlusWorkaround, params.DisableMultiDel, - params.ForceSigV2, params.ForceVirtualHostedStyle, params.Debug) + params.ForceSigV2, params.ForceVirtualHostedStyle, params.ParallelListingRequests, + params.Debug) if err != nil { Fatal(err) } diff --git a/deb/package.go b/deb/package.go index ce16adf2f..5d372b2bc 100644 --- a/deb/package.go +++ b/deb/package.go @@ -621,9 +621,7 @@ func (p *Package) LinkFromPool(publishedStorage aptly.PublishedStorage, packageP return err } - publishedDirectory := filepath.Join(prefix, relPath) - - err = publishedStorage.LinkFromPool(publishedDirectory, f.Filename, packagePool, sourcePoolPath, f.Checksums, force) + err = publishedStorage.LinkFromPool(prefix, relPath, f.Filename, packagePool, sourcePoolPath, f.Checksums, force) if err != nil { return err } diff --git a/files/public.go b/files/public.go index fc87b4104..7e9e30035 100644 --- a/files/public.go +++ b/files/public.go @@ -118,16 +118,17 @@ func (storage *PublishedStorage) RemoveDirs(path string, progress aptly.Progress // LinkFromPool links package file from pool to dist's pool location // -// publishedDirectory is desired location in pool (like prefix/pool/component/liba/libav/) +// publishedPrefix is desired prefix for the location in the pool. +// publishedRelPath is desired location in pool (like pool/component/liba/libav/) // sourcePool is instance of aptly.PackagePool // sourcePath is a relative path to package file in package pool // // LinkFromPool returns relative path for the published file to be included in package index -func (storage *PublishedStorage) LinkFromPool(publishedDirectory, fileName string, sourcePool aptly.PackagePool, +func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath, fileName string, sourcePool aptly.PackagePool, sourcePath string, sourceChecksums utils.ChecksumInfo, force bool) error { baseName := filepath.Base(fileName) - poolPath := filepath.Join(storage.rootPath, publishedDirectory, filepath.Dir(fileName)) + poolPath := filepath.Join(storage.rootPath, publishedPrefix, publishedRelPath, filepath.Dir(fileName)) err := os.MkdirAll(poolPath, 0777) if err != nil { diff --git a/files/public_test.go b/files/public_test.go index 16f724cb7..f135bd3ef 100644 --- a/files/public_test.go +++ b/files/public_test.go @@ -233,7 +233,7 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) { c.Assert(err, IsNil) // Test using hardlinks - err = s.storage.LinkFromPool(filepath.Join(t.prefix, t.publishedDirectory), t.sourcePath, pool, srcPoolPath, sourceChecksum, false) + err = s.storage.LinkFromPool(t.prefix, t.publishedDirectory, t.sourcePath, pool, srcPoolPath, sourceChecksum, false) c.Assert(err, IsNil) st, err := os.Stat(filepath.Join(s.storage.rootPath, t.prefix, t.expectedFilename)) @@ -243,7 +243,7 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) { c.Check(int(info.Nlink), Equals, 3) // Test using symlinks - err = s.storageSymlink.LinkFromPool(filepath.Join(t.prefix, t.publishedDirectory), t.sourcePath, pool, srcPoolPath, sourceChecksum, false) + err = s.storageSymlink.LinkFromPool(t.prefix, t.publishedDirectory, t.sourcePath, pool, srcPoolPath,
sourceChecksum, false) c.Assert(err, IsNil) st, err = os.Lstat(filepath.Join(s.storageSymlink.rootPath, t.prefix, t.expectedFilename)) @@ -254,7 +254,7 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) { c.Check(int(info.Mode&syscall.S_IFMT), Equals, int(syscall.S_IFLNK)) // Test using copy with checksum verification - err = s.storageCopy.LinkFromPool(filepath.Join(t.prefix, t.publishedDirectory), t.sourcePath, pool, srcPoolPath, sourceChecksum, false) + err = s.storageCopy.LinkFromPool(t.prefix, t.publishedDirectory, t.sourcePath, pool, srcPoolPath, sourceChecksum, false) c.Assert(err, IsNil) st, err = os.Stat(filepath.Join(s.storageCopy.rootPath, t.prefix, t.expectedFilename)) @@ -264,7 +264,7 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) { c.Check(int(info.Nlink), Equals, 1) // Test using copy with size verification - err = s.storageCopySize.LinkFromPool(filepath.Join(t.prefix, t.publishedDirectory), t.sourcePath, pool, srcPoolPath, sourceChecksum, false) + err = s.storageCopySize.LinkFromPool(t.prefix, t.publishedDirectory, t.sourcePath, pool, srcPoolPath, sourceChecksum, false) c.Assert(err, IsNil) st, err = os.Stat(filepath.Join(s.storageCopySize.rootPath, t.prefix, t.expectedFilename)) @@ -289,7 +289,7 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) { c.Assert(err, IsNil) nlinks := int(st.Sys().(*syscall.Stat_t).Nlink) - err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, srcPoolPath, sourceChecksum, false) + err = s.storage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, srcPoolPath, sourceChecksum, false) c.Check(err, ErrorMatches, ".*file already exists and is different") st, err = pool.Stat(srcPoolPath) @@ -297,7 +297,7 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) { c.Check(int(st.Sys().(*syscall.Stat_t).Nlink), Equals, nlinks) // linking with force - err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, srcPoolPath, sourceChecksum, true) + err = s.storage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, srcPoolPath, sourceChecksum, true) c.Check(err, IsNil) st, err = pool.Stat(srcPoolPath) @@ -305,21 +305,21 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) { c.Check(int(st.Sys().(*syscall.Stat_t).Nlink), Equals, nlinks+1) // Test using symlinks - err = s.storageSymlink.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, srcPoolPath, sourceChecksum, false) + err = s.storageSymlink.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, srcPoolPath, sourceChecksum, false) c.Check(err, ErrorMatches, ".*file already exists and is different") - err = s.storageSymlink.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, srcPoolPath, sourceChecksum, true) + err = s.storageSymlink.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, srcPoolPath, sourceChecksum, true) c.Check(err, IsNil) // Test using copy with checksum verification - err = s.storageCopy.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, srcPoolPath, sourceChecksum, false) + err = s.storageCopy.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, srcPoolPath, sourceChecksum, 
false) c.Check(err, ErrorMatches, ".*file already exists and is different") - err = s.storageCopy.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, srcPoolPath, sourceChecksum, true) + err = s.storageCopy.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, srcPoolPath, sourceChecksum, true) c.Check(err, IsNil) // Test using copy with size verification (this will NOT detect the difference) - err = s.storageCopySize.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, srcPoolPath, sourceChecksum, false) + err = s.storageCopySize.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, srcPoolPath, sourceChecksum, false) c.Check(err, IsNil) } diff --git a/man/aptly.1 b/man/aptly.1 index fe986f0ff..bebf2dbd1 100644 --- a/man/aptly.1 +++ b/man/aptly.1 @@ -1,7 +1,7 @@ .\" generated with Ronn/v0.7.3 .\" http://github.com/rtomayko/ronn/tree/0.7.3 . -.TH "APTLY" "1" "January 2022" "" "" +.TH "APTLY" "1" "April 2023" "" "" . .SH "NAME" \fBaptly\fR \- Debian repository management tool @@ -84,7 +84,8 @@ Configuration file is stored in JSON format (default values shown below): "plusWorkaround": false, "disableMultiDel": false, "forceSigV2": false, - "forceVirtualHostedStyle": false, + "forceVirtualHostedStyle": true, + "parallelListingRequests": 0, "debug": false } }, @@ -105,7 +106,7 @@ Configuration file is stored in JSON format (default values shown below): "accountKey": "", "container": "repo", "prefix": "" - "endpoint": "blob.core.windows.net" + "endpoint": "blob\.core\.windows\.net" } } } @@ -270,6 +271,10 @@ bucket name (optional) disable path style visit, useful with non\-AWS S3\-compatible object stores which only support virtual hosted style . .TP +\fBparallelListingRequests\fR +(optional) if greater than 1, list the content of the bucket up to this number of requests in parallel +. +.TP \fBdebug\fR (optional) enables detailed request/response dump for each S3 operation . @@ -507,6 +512,10 @@ disable verification of Release file signatures gpg keyring to use when verifying Release file (could be specified multiple times) . .TP +\-\fBmax\-tries\fR=1 +max download tries till process fails with download error +. +.TP \-\fBwith\-installer\fR download additional not packaged installer files . @@ -1009,13 +1018,13 @@ custom format for result printing include dependencies into search results . .SH "ADD PACKAGES TO LOCAL REPOSITORIES BASED ON \.CHANGES FILES" -\fBaptly\fR \fBrepo\fR \fBinclude\fR |\fIdirectory\fR \fB\|\.\|\.\|\.\fR +\fBaptly\fR \fBrepo\fR \fBinclude\fR . .P Command include looks for \.changes files in list of arguments or specified directories\. Each \.changes file is verified, parsed, referenced files are put into separate temporary directory and added into local repository\. Successfully imported files are removed by default\. . .P -Additionally uploads could be restricted with file\. Rules in this file control uploads based on GPG key ID of \.changes file signature and queries on \.changes file fields\. +Additionally uploads could be restricted with . .P Example: @@ -1450,6 +1459,10 @@ run GPG with detached tty set value for ButAutomaticUpgrades field . .TP +\-\fBcodename\fR= +codename to publish (defaults to distribution) +. +.TP \-\fBcomponent\fR= component name to publish (for multi\-component publishing, separate components with commas) . 
@@ -1494,6 +1507,10 @@ GPG passphrase\-file for the key (warning: could be insecure) GPG secret keyring to use (instead of default) . .TP +\-\fBskip\-bz2\fR +don\(cqt generate bzipped indexes +. +.TP \-\fBskip\-contents\fR don\(cqt generate Contents indexes . @@ -1505,10 +1522,6 @@ don\(cqt sign Release files with GPG \-\fBsuite\fR= suite to publish (defaults to distribution) . -.TP -\-\fBcodename\fR= -codename to publish (defaults to distribution) -. .SH "PUBLISH SNAPSHOT" \fBaptly\fR \fBpublish\fR \fBsnapshot\fR \fIname\fR [[\fIendpoint\fR:]\fIprefix\fR] . @@ -1557,6 +1570,10 @@ run GPG with detached tty overwrite value for ButAutomaticUpgrades field . .TP +\-\fBcodename\fR= +codename to publish (defaults to distribution) +. +.TP \-\fBcomponent\fR= component name to publish (for multi\-component publishing, separate components with commas) . @@ -1601,6 +1618,10 @@ GPG passphrase\-file for the key (warning: could be insecure) GPG secret keyring to use (instead of default) . .TP +\-\fBskip\-bz2\fR +don\(cqt generate bzipped indexes +. +.TP \-\fBskip\-contents\fR don\(cqt generate Contents indexes . @@ -1612,10 +1633,6 @@ don\(cqt sign Release files with GPG \-\fBsuite\fR= suite to publish (defaults to distribution) . -.TP -\-\fBcodename\fR= -codename to publish (defaults to distribution) -. .SH "UPDATE PUBLISHED REPOSITORY BY SWITCHING TO NEW SNAPSHOT" \fBaptly\fR \fBpublish\fR \fBswitch\fR \fIdistribution\fR [[\fIendpoint\fR:]\fIprefix\fR] \fInew\-snapshot\fR . @@ -1687,6 +1704,10 @@ GPG passphrase\-file for the key (warning: could be insecure) GPG secret keyring to use (instead of default) . .TP +\-\fBskip\-bz2\fR +don\(cqt generate bzipped indexes +. +.TP \-\fBskip\-cleanup\fR don\(cqt remove unreferenced files in prefix/component . @@ -1752,6 +1773,10 @@ GPG passphrase\-file for the key (warning: could be insecure) GPG secret keyring to use (instead of default) . .TP +\-\fBskip\-bz2\fR +don\(cqt generate bzipped indexes +. +.TP \-\fBskip\-cleanup\fR don\(cqt remove unreferenced files in prefix/component . @@ -2160,5 +2185,32 @@ Lorenzo Bolla (https://github\.com/lbolla) .IP "\[ci]" 4 Benj Fassbind (https://github\.com/randombenj) . +.IP "\[ci]" 4 +Markus Muellner (https://github\.com/mmianl) +. +.IP "\[ci]" 4 +Chuan Liu (https://github\.com/chuan) +. +.IP "\[ci]" 4 +Samuel Mutel (https://github\.com/smutel) +. +.IP "\[ci]" 4 +Russell Greene (https://github\.com/russelltg) +. +.IP "\[ci]" 4 +Wade Simmons (https://github\.com/wadey) +. +.IP "\[ci]" 4 +Steven Stone (https://github\.com/smstone) +. +.IP "\[ci]" 4 +Josh Bayfield (https://github\.com/jbayfield) +. +.IP "\[ci]" 4 +Boxjan (https://github\.com/boxjan) +. +.IP "\[ci]" 4 +Mauro Regli (https://github\.com/reglim) +. 
.IP "" 0 diff --git a/man/aptly.1.ronn.tmpl b/man/aptly.1.ronn.tmpl index df49404a4..f8d5fe932 100644 --- a/man/aptly.1.ronn.tmpl +++ b/man/aptly.1.ronn.tmpl @@ -77,6 +77,7 @@ Configuration file is stored in JSON format (default values shown below): "disableMultiDel": false, "forceSigV2": false, "forceVirtualHostedStyle": true, + "parallelListingRequests": 0, "debug": false } }, @@ -255,6 +256,9 @@ and associated settings: * `forceVirtualHostedStyle`: (optional) disable path style visit, useful with non-AWS S3-compatible object stores which only support virtual hosted style + * `parallelListingRequests`: + (optional) if greater than 1, list the content of the bucket up to this number of requests + in parallel * `debug`: (optional) enables detailed request/response dump for each S3 operation diff --git a/s3/parallel_filelister.go b/s3/parallel_filelister.go new file mode 100644 index 000000000..894dcf5d8 --- /dev/null +++ b/s3/parallel_filelister.go @@ -0,0 +1,197 @@ +// Implements listing the content of an S3 bucket traversing the content tree in +// a parallel fashion. +package s3 + +import ( + "strings" + "sync" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/pkg/errors" +) + +// S3ListObjectsMaxKeys is the maximum number of keys the S3 ListObjects API +// will return. +const S3ListObjectsMaxKeys = 1000 + +type pathAndMd5 struct { + path string + md5 string + err error +} + +type prefixAndDepth struct { + prefix string + depth int +} + +// ParallelFilelister holds the data structures necessary to list the bucket. +type ParallelFilelister struct { + Paths []string + Md5s []string + Errs []error + s3 *s3.S3 + bucket string + rootPrefix string + pathAndMd5Chan chan *pathAndMd5 + collectPathAndMd5Done chan bool + prefixAndDepthChan chan *prefixAndDepth + wg sync.WaitGroup + hidePlusWorkaround bool +} + +// StartNewParallelFilelister create a ParallelFilelister object and start +// traversing the bucket in the background with up to `parallelNumber` +// concurrent requests. +// If `parallelNumber` is less than 1, it will be set to 1. +// Invoke `WaitForCompletion()` to block until listing is completed. +// Paths and MD5s will be stored in the `Paths` amd `Md5s` fields of the +// `ParallelFilelister` object. +// Errors will be stored in the `Errs` field. +func StartNewParallelFilelister( + s3 *s3.S3, bucket, rootPrefix string, parallelNumber int, hidePlusWorkaround bool, +) *ParallelFilelister { + filelister := &ParallelFilelister{ + Paths: make([]string, 0, 1024), + Md5s: make([]string, 0, 1024), + s3: s3, + bucket: bucket, + rootPrefix: rootPrefix, + pathAndMd5Chan: make(chan *pathAndMd5), + collectPathAndMd5Done: make(chan bool), + prefixAndDepthChan: make(chan *prefixAndDepth), + hidePlusWorkaround: hidePlusWorkaround, + } + + go filelister.collectPathsAndMd5s() + + for i := 0; i < parallelNumber; i++ { + go filelister.filelistWorkerLoop() + } + maxDepth := 0 + if parallelNumber > 1 { + maxDepth = -1 + prefixParts := strings.Split(strings.TrimRight(rootPrefix, "/"), "/") + for i := len(prefixParts) - 1; i >= 0; i-- { + if prefixParts[i] == "dists" { + maxDepth = 0 + break + } else if prefixParts[i] == "pool" { + maxDepth = 2 - (len(prefixParts) - 1 - i) + if maxDepth < 0 { + maxDepth = 0 + } + break + } + } + } + + filelister.listPrefix(rootPrefix, maxDepth) + return filelister +} + +// WaitForCompletion blocks until all requests have terminated and all paths +// (and errors) have been collected. 
+func (filelister *ParallelFilelister) WaitForCompletion() { + filelister.wg.Wait() + close(filelister.prefixAndDepthChan) + close(filelister.pathAndMd5Chan) + <-filelister.collectPathAndMd5Done +} + +// listPrefix queues a prefix for parallel listing up to the specified depth. +func (filelister *ParallelFilelister) listPrefix(prefix string, maxDepth int) { + filelister.wg.Add(1) + select { + case filelister.prefixAndDepthChan <- &prefixAndDepth{prefix, maxDepth}: + default: + // No worker is ready to receive, so list the prefix in + // the current goroutine to avoid deadlock. + filelister.filelistWorker(prefix, maxDepth) + } +} + +// filelistWorker lists the content of a prefix in the bucket. +// If maxDepth == 0, it will list the whole prefix sequentially. +// If maxDepth < 0, it will list common prefixes in parallel with no depth +// limit. +// If maxDepth > 0, it will list common prefixes in parallel for the next +// maxDepth levels, and then list sequentially. +func (filelister *ParallelFilelister) filelistWorker(prefix string, maxDepth int) { + defer filelister.wg.Done() + + params := &s3.ListObjectsInput{ + Bucket: aws.String(filelister.bucket), + Prefix: aws.String(prefix), + MaxKeys: aws.Int64(S3ListObjectsMaxKeys), + } + if maxDepth != 0 { + if strings.HasSuffix(prefix, "dists/") { + // Do not list the content of dists/ in parallel, as it can have + // hundreds of subdirectories. + maxDepth = 0 + } + if strings.HasSuffix(prefix, "pool/") { + // List in parallel up to pool/<component>/, as there could + // be hundreds of directories inside that. + maxDepth = 2 + } + } + if maxDepth != 0 { + // Not setting Delimiter will cause the whole prefix to be listed. + params.Delimiter = aws.String("/") + } + + err := filelister.s3.ListObjectsPages(params, func(contents *s3.ListObjectsOutput, lastPage bool) bool { + for _, key := range contents.Contents { + if filelister.hidePlusWorkaround && strings.Contains(*key.Key, " ") { + // if we use plusWorkaround, we want to hide those duplicates + // from listing + continue + } + + filelister.pathAndMd5Chan <- &pathAndMd5{ + path: *key.Key, + md5: strings.Replace(*key.ETag, "\"", "", -1), + } + + } + for _, c := range contents.CommonPrefixes { + if c.Prefix != nil { + filelister.listPrefix(*c.Prefix, maxDepth-1) + } + } + + return true + }) + + if err != nil { + filelister.pathAndMd5Chan <- &pathAndMd5{ + err: errors.WithMessagef(err, "error listing under prefix %s in %s: %s", prefix, filelister.bucket, err), + } + } +} + +// filelistWorkerLoop receives new prefixes to list and invokes filelistWorker() +func (filelister *ParallelFilelister) filelistWorkerLoop() { + for i := range filelister.prefixAndDepthChan { + filelister.filelistWorker(i.prefix, i.depth) + } +} + +// collectPathsAndMd5s collects paths, md5s, and errors produced by filelistWorker() +func (filelister *ParallelFilelister) collectPathsAndMd5s() { + for i := range filelister.pathAndMd5Chan { + if i.path != "" { + filelister.Paths = append(filelister.Paths, i.path[len(filelister.rootPrefix):]) + } + if i.md5 != "" { + filelister.Md5s = append(filelister.Md5s, i.md5) + } + if i.err != nil { + filelister.Errs = append(filelister.Errs, i.err) + } + } + close(filelister.collectPathAndMd5Done) +} diff --git a/s3/public.go b/s3/public.go index 0b6061282..44fa6ce93 100644 --- a/s3/public.go +++ b/s3/public.go @@ -24,16 +24,17 @@ const errCodeNotFound = "NotFound" // PublishedStorage abstract file system with published files (actually hosted on S3) type PublishedStorage struct { - s3 *s3.S3 -
config *aws.Config - bucket string - acl string - prefix string - storageClass string - encryptionMethod string - plusWorkaround bool - disableMultiDel bool - pathCache map[string]string + s3 *s3.S3 + config *aws.Config + bucket string + acl string + prefix string + storageClass string + encryptionMethod string + plusWorkaround bool + disableMultiDel bool + pathCache map[string]map[string]string + parallelListingRequests int } // Check interface @@ -45,6 +46,7 @@ var ( func NewPublishedStorageRaw( bucket, defaultACL, prefix, storageClass, encryptionMethod string, plusWorkaround, disabledMultiDel bool, + parallelListingRequests int, config *aws.Config, ) (*PublishedStorage, error) { if defaultACL == "" { @@ -63,15 +65,16 @@ func NewPublishedStorageRaw( } result := &PublishedStorage{ - s3: s3.New(sess), - bucket: bucket, - config: config, - acl: defaultACL, - prefix: prefix, - storageClass: storageClass, - encryptionMethod: encryptionMethod, - plusWorkaround: plusWorkaround, - disableMultiDel: disabledMultiDel, + s3: s3.New(sess), + bucket: bucket, + config: config, + acl: defaultACL, + prefix: prefix, + storageClass: storageClass, + encryptionMethod: encryptionMethod, + plusWorkaround: plusWorkaround, + disableMultiDel: disabledMultiDel, + parallelListingRequests: parallelListingRequests, } return result, nil @@ -81,7 +84,8 @@ func NewPublishedStorageRaw( // keys, region and bucket name func NewPublishedStorage( accessKey, secretKey, sessionToken, region, endpoint, bucket, defaultACL, prefix, storageClass, encryptionMethod string, - plusWorkaround, disableMultiDel, forceSigV2, forceVirtualHostedStyle, debug bool) (*PublishedStorage, error) { + plusWorkaround, disableMultiDel, forceSigV2, forceVirtualHostedStyle bool, parallelListingRequests int, + debug bool) (*PublishedStorage, error) { config := &aws.Config{ Region: aws.String(region), @@ -103,7 +107,7 @@ func NewPublishedStorage( } result, err := NewPublishedStorageRaw(bucket, defaultACL, prefix, storageClass, - encryptionMethod, plusWorkaround, disableMultiDel, config) + encryptionMethod, plusWorkaround, disableMultiDel, parallelListingRequests, config) if err == nil && forceSigV2 { creds := []awsauth.Credentials{} @@ -295,31 +299,38 @@ func (storage *PublishedStorage) RemoveDirs(path string, progress aptly.Progress // LinkFromPool links package file from pool to dist's pool location // -// publishedDirectory is desired location in pool (like prefix/pool/component/liba/libav/) +// publishedPrefix is desired prefix for the location in the pool. 
+// publishedRelPath is desired location in pool (like pool/component/liba/libav/) // sourcePool is instance of aptly.PackagePool // sourcePath is filepath to package file in package pool // // LinkFromPool returns relative path for the published file to be included in package index -func (storage *PublishedStorage) LinkFromPool(publishedDirectory, fileName string, sourcePool aptly.PackagePool, +func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath, fileName string, sourcePool aptly.PackagePool, sourcePath string, sourceChecksums utils.ChecksumInfo, force bool) error { - relPath := filepath.Join(publishedDirectory, fileName) - poolPath := filepath.Join(storage.prefix, relPath) + relFilePath := filepath.Join(publishedRelPath, fileName) + prefixRelFilePath := filepath.Join(publishedPrefix, relFilePath) + poolPath := filepath.Join(storage.prefix, prefixRelFilePath) if storage.pathCache == nil { - paths, md5s, err := storage.internalFilelist("", true) + storage.pathCache = make(map[string]map[string]string) + } + pathCache := storage.pathCache[publishedPrefix] + if pathCache == nil { + paths, md5s, err := storage.internalFilelist(publishedPrefix, true) if err != nil { return errors.Wrap(err, "error caching paths under prefix") } - storage.pathCache = make(map[string]string, len(paths)) + pathCache = make(map[string]string, len(paths)) for i := range paths { - storage.pathCache[paths[i]] = md5s[i] + pathCache[paths[i]] = md5s[i] } + storage.pathCache[publishedPrefix] = pathCache } - destinationMD5, exists := storage.pathCache[relPath] + destinationMD5, exists := pathCache[relFilePath] sourceMD5 := sourceChecksums.MD5 if exists { @@ -327,12 +338,12 @@ func (storage *PublishedStorage) LinkFromPool(publishedDirectory, fileName strin // doesn’t look like a valid MD5, // attempt to fetch one from the metadata var err error - destinationMD5, err = storage.getMD5(relPath) + destinationMD5, err = storage.getMD5(prefixRelFilePath) if err != nil { err = errors.Wrap(err, fmt.Sprintf("error verifying MD5 for %s: %s", storage, poolPath)) return err } - storage.pathCache[relPath] = destinationMD5 + pathCache[relFilePath] = destinationMD5 } if sourceMD5 == "" { return fmt.Errorf("unable to compare object, MD5 checksum missing") @@ -354,9 +365,9 @@ } defer source.Close() - err = storage.putFile(relPath, source, sourceMD5) + err = storage.putFile(prefixRelFilePath, source, sourceMD5) if err == nil { - storage.pathCache[relPath] = sourceMD5 + pathCache[relFilePath] = sourceMD5 } else { err = errors.Wrap(err, fmt.Sprintf("error uploading %s to %s: %s", sourcePath, storage, poolPath)) } @@ -371,43 +382,27 @@ func (storage *PublishedStorage) Filelist(prefix string) ([]string, error) { } func (storage *PublishedStorage) internalFilelist(prefix string, hidePlusWorkaround bool) (paths []string, md5s []string, err error) { - paths = make([]string, 0, 1024) - md5s = make([]string, 0, 1024) prefix = filepath.Join(storage.prefix, prefix) if prefix != "" { prefix += "/" } - params := &s3.ListObjectsInput{ - Bucket: aws.String(storage.bucket), - Prefix: aws.String(prefix), - MaxKeys: aws.Int64(1000), + parallelFilelistWorkers := storage.parallelListingRequests + if parallelFilelistWorkers < 1 { + parallelFilelistWorkers = 1 } + parallelFilelister := StartNewParallelFilelister(storage.s3, storage.bucket, prefix, parallelFilelistWorkers, storage.plusWorkaround && hidePlusWorkaround) - err =
storage.s3.ListObjectsPages(params, func(contents *s3.ListObjectsOutput, lastPage bool) bool { - for _, key := range contents.Contents { - if storage.plusWorkaround && hidePlusWorkaround && strings.Contains(*key.Key, " ") { - // if we use plusWorkaround, we want to hide those duplicates - /// from listing - continue - } + parallelFilelister.WaitForCompletion() - if prefix == "" { - paths = append(paths, *key.Key) - } else { - paths = append(paths, (*key.Key)[len(prefix):]) - } - md5s = append(md5s, strings.Replace(*key.ETag, "\"", "", -1)) + if len(parallelFilelister.Errs) > 0 { + errorStrings := make([]string, len(parallelFilelister.Errs)) + for i, e := range parallelFilelister.Errs { + errorStrings[i] = e.Error() } - - return true - }) - - if err != nil { - return nil, nil, errors.WithMessagef(err, "error listing under prefix %s in %s: %s", prefix, storage, err) + return nil, nil, errors.WithMessage(parallelFilelister.Errs[0], strings.Join(errorStrings, ", ")) } - - return paths, md5s, nil + return parallelFilelister.Paths, parallelFilelister.Md5s, nil } // RenameFile renames (moves) file diff --git a/s3/public_test.go b/s3/public_test.go index 5a216a01e..37ccff220 100644 --- a/s3/public_test.go +++ b/s3/public_test.go @@ -2,9 +2,12 @@ package s3 import ( "bytes" + "fmt" "io/ioutil" "os" "path/filepath" + "sort" + "strings" . "gopkg.in/check.v1" @@ -24,16 +27,23 @@ type PublishedStorageSuite struct { var _ = Suite(&PublishedStorageSuite{}) func (s *PublishedStorageSuite) SetUpTest(c *C) { + s.createStorageWithParallelRequests(c, 0) +} + +func (s *PublishedStorageSuite) createStorageWithParallelRequests(c *C, parallelRequests int) { var err error + if s.srv != nil { + s.srv.Quit() + } s.srv, err = NewServer(&Config{}) c.Assert(err, IsNil) c.Assert(s.srv, NotNil) - s.storage, err = NewPublishedStorage("aa", "bb", "", "test-1", s.srv.URL(), "test", "", "", "", "", false, true, false, false, false) + s.storage, err = NewPublishedStorage("aa", "bb", "", "test-1", s.srv.URL(), "test", "", "", "", "", false, true, false, false, parallelRequests, false) c.Assert(err, IsNil) - s.prefixedStorage, err = NewPublishedStorage("aa", "bb", "", "test-1", s.srv.URL(), "test", "", "lala", "", "", false, true, false, false, false) + s.prefixedStorage, err = NewPublishedStorage("aa", "bb", "", "test-1", s.srv.URL(), "test", "", "lala", "", "", false, true, false, false, parallelRequests, false) c.Assert(err, IsNil) - s.noSuchBucketStorage, err = NewPublishedStorage("aa", "bb", "", "test-1", s.srv.URL(), "no-bucket", "", "", "", "", false, true, false, false, false) + s.noSuchBucketStorage, err = NewPublishedStorage("aa", "bb", "", "test-1", s.srv.URL(), "no-bucket", "", "", "", "", false, true, false, false, parallelRequests, false) c.Assert(err, IsNil) _, err = s.storage.s3.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String("test")}) @@ -44,6 +54,17 @@ func (s *PublishedStorageSuite) TearDownTest(c *C) { s.srv.Quit() } +func (s *PublishedStorageSuite) checkGetRequestsEqual(c *C, prefix string, expectedGetRequestUris []string) { + getRequests := make([]string, 0, len(s.srv.Requests)) + for _, r := range s.srv.Requests { + if r.Method == "GET" && strings.HasPrefix(r.RequestURI, prefix) { + getRequests = append(getRequests, r.RequestURI) + } + } + sort.Strings(getRequests) + c.Check(getRequests, DeepEquals, expectedGetRequestUris) +} + func (s *PublishedStorageSuite) GetFile(c *C, path string) []byte { resp, err := s.storage.s3.GetObject(&s3.GetObjectInput{ Bucket: aws.String(s.storage.bucket), @@ -109,6
+130,13 @@ func (s *PublishedStorageSuite) TestPutFilePlusWorkaround(c *C) { } func (s *PublishedStorageSuite) TestFilelist(c *C) { + for _, parallelRequests := range []int{0, 2} { + s.createStorageWithParallelRequests(c, parallelRequests) + s.testFilelist(c) + } +} + +func (s *PublishedStorageSuite) testFilelist(c *C) { paths := []string{"a", "b", "c", "testa", "test/a", "test/b", "lala/a", "lala/b", "lala/c"} for _, path := range paths { s.PutFile(c, path, []byte("test")) @@ -116,6 +144,7 @@ func (s *PublishedStorageSuite) TestFilelist(c *C) { list, err := s.storage.Filelist("") c.Check(err, IsNil) + sort.Strings(list) c.Check(list, DeepEquals, []string{"a", "b", "c", "lala/a", "lala/b", "lala/c", "test/a", "test/b", "testa"}) list, err = s.storage.Filelist("test") @@ -131,7 +160,114 @@ func (s *PublishedStorageSuite) TestFilelist(c *C) { c.Check(list, DeepEquals, []string{"a", "b", "c"}) } +func (s *PublishedStorageSuite) TestFilelistDistAndPool(c *C) { + for _, parallelRequests := range []int{0, 2} { + s.createStorageWithParallelRequests(c, parallelRequests) + s.testFilelistDistAndPool(c, parallelRequests) + } +} + +func trimPrefix(s []string, prefix string) []string { + ret := make([]string, len(s)) + for i, ss := range s { + ret[i] = strings.TrimPrefix(ss, prefix) + } + return ret +} + +func (s *PublishedStorageSuite) testFilelistDistAndPool(c *C, parallelRequests int) { + poolPaths := []string{ + "pool/main/a/abc/abc.deb", + "pool/main/a/abc/abc-dev.deb", + "pool/main/a/agg/agg.deb", + "pool/main/x/xyz/xyz.deb", + } + for _, path := range poolPaths { + s.PutFile(c, path, []byte("test")) + } + distPaths := make([]string, 1010) + for i := 0; i < 1010; i++ { + distPaths[i] = fmt.Sprintf("dists/dist%d/binary-amd64/Packages", i) + } + for _, path := range distPaths { + s.PutFile(c, path, []byte("test")) + } + + prefixAndRequests := map[string][][]string{ + "": { + append(distPaths, poolPaths...), + { // parallelRequests == 0 + "/test?marker=dists%2Fdist99%2Fbinary-amd64%2FPackages&max-keys=1000&prefix=", + "/test?max-keys=1000&prefix=", + }, + { // parallelRequests > 1 + "/test?delimiter=%2F&max-keys=1000&prefix=", + "/test?delimiter=%2F&max-keys=1000&prefix=pool%2F", + "/test?delimiter=%2F&max-keys=1000&prefix=pool%2Fmain%2F", + "/test?marker=dists%2Fdist99%2Fbinary-amd64%2FPackages&max-keys=1000&prefix=dists%2F", + "/test?max-keys=1000&prefix=dists%2F", + "/test?max-keys=1000&prefix=pool%2Fmain%2Fa%2F", + "/test?max-keys=1000&prefix=pool%2Fmain%2Fx%2F", + }, + }, + "dists": { + trimPrefix(distPaths, "dists/"), + { // parallelRequests == 0 + "/test?marker=dists%2Fdist99%2Fbinary-amd64%2FPackages&max-keys=1000&prefix=dists%2F", + "/test?max-keys=1000&prefix=dists%2F", + }, + { // parallelRequests > 1 + "/test?marker=dists%2Fdist99%2Fbinary-amd64%2FPackages&max-keys=1000&prefix=dists%2F", + "/test?max-keys=1000&prefix=dists%2F", + }, + }, + "pool": { + trimPrefix(poolPaths, "pool/"), + { // parallelRequests == 0 + "/test?max-keys=1000&prefix=pool%2F", + }, + { // parallelRequests > 1 + "/test?delimiter=%2F&max-keys=1000&prefix=pool%2F", + "/test?delimiter=%2F&max-keys=1000&prefix=pool%2Fmain%2F", + "/test?max-keys=1000&prefix=pool%2Fmain%2Fa%2F", + "/test?max-keys=1000&prefix=pool%2Fmain%2Fx%2F", + }, + }, + "pool/main/": { + trimPrefix(poolPaths, "pool/main/"), + { // parallelRequests == 0 + "/test?max-keys=1000&prefix=pool%2Fmain%2F", + }, + { // parallelRequests > 1 + "/test?delimiter=%2F&max-keys=1000&prefix=pool%2Fmain%2F", + "/test?max-keys=1000&prefix=pool%2Fmain%2Fa%2F", + 
"/test?max-keys=1000&prefix=pool%2Fmain%2Fx%2F", + }, + }, + } + for prefix, expectedRequests := range prefixAndRequests { + s.srv.Requests = nil + list, err := s.storage.Filelist(prefix) + c.Check(err, IsNil) + sort.Strings(list) + sort.Strings(expectedRequests[0]) + c.Check(list, DeepEquals, expectedRequests[0]) + if parallelRequests > 1 { + s.checkGetRequestsEqual(c, "", expectedRequests[2]) + } else { + s.checkGetRequestsEqual(c, "", expectedRequests[1]) + } + } +} + func (s *PublishedStorageSuite) TestFilelistPlusWorkaround(c *C) { + for parallelRequests := range []int{0, 2} { + s.createStorageWithParallelRequests(c, parallelRequests) + s.testFilelistPlusWorkaround(c) + } +} + +func (s *PublishedStorageSuite) testFilelistPlusWorkaround(c *C) { s.storage.plusWorkaround = true s.prefixedStorage.plusWorkaround = true @@ -142,6 +278,7 @@ func (s *PublishedStorageSuite) TestFilelistPlusWorkaround(c *C) { list, err := s.storage.Filelist("") c.Check(err, IsNil) + sort.Strings(list) c.Check(list, DeepEquals, []string{"a", "b", "c", "lala/a+b", "lala/c", "test/a+1", "testa"}) list, err = s.storage.Filelist("test") @@ -223,6 +360,7 @@ func (s *PublishedStorageSuite) TestRemoveDirsPlusWorkaround(c *C) { list, err := s.storage.Filelist("") c.Check(err, IsNil) + sort.Strings(list) c.Check(list, DeepEquals, []string{"a", "b", "c", "lala/a", "lala/b", "lala/c", "testa"}) } @@ -264,50 +402,124 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) { c.Assert(err, IsNil) // first link from pool - err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) + err = s.storage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) c.Check(err, IsNil) c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents")) // duplicate link from pool - err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) + err = s.storage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) c.Check(err, IsNil) c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents")) // link from pool with conflict - err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src2, cksum2, false) + err = s.storage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src2, cksum2, false) c.Check(err, ErrorMatches, ".*file already exists and is different.*") c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents")) // link from pool with conflict and force - err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src2, cksum2, true) + err = s.storage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src2, cksum2, true) c.Check(err, IsNil) c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Spam")) // for prefixed storage: // first link from pool - err = s.prefixedStorage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) + err = s.prefixedStorage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), 
"mars-invaders_1.03.deb", pool, src1, cksum1, false) c.Check(err, IsNil) // 2nd link from pool, providing wrong path for source file // // this test should check that file already exists in S3 and skip upload (which would fail if not skipped) s.prefixedStorage.pathCache = nil - err = s.prefixedStorage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, "wrong-looks-like-pathcache-doesnt-work", cksum1, false) + err = s.prefixedStorage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, "wrong-looks-like-pathcache-doesnt-work", cksum1, false) c.Check(err, IsNil) c.Check(s.GetFile(c, "lala/pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents")) // link from pool with nested file name - err = s.storage.LinkFromPool("dists/jessie/non-free/installer-i386/current/images", "netboot/boot.img.gz", pool, src3, cksum3, false) + err = s.storage.LinkFromPool("", "dists/jessie/non-free/installer-i386/current/images", "netboot/boot.img.gz", pool, src3, cksum3, false) c.Check(err, IsNil) c.Check(s.GetFile(c, "dists/jessie/non-free/installer-i386/current/images/netboot/boot.img.gz"), DeepEquals, []byte("Contents")) } +func (s *PublishedStorageSuite) TestLinkFromPoolCache(c *C) { + root := c.MkDir() + pool := files.NewPackagePool(root, false) + cs := files.NewMockChecksumStorage() + + tmpFile1 := filepath.Join(c.MkDir(), "mars-invaders_1.03.deb") + err := ioutil.WriteFile(tmpFile1, []byte("Contents"), 0644) + c.Assert(err, IsNil) + cksum1 := utils.ChecksumInfo{MD5: "c1df1da7a1ce305a3b60af9d5733ac1d"} + + src1, err := pool.Import(tmpFile1, "mars-invaders_1.03.deb", &cksum1, true, cs) + c.Assert(err, IsNil) + + // Publish two packages at the same publish prefix + err = s.storage.LinkFromPool("", filepath.Join("pool", "a"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) + c.Check(err, IsNil) + + err = s.storage.LinkFromPool("", filepath.Join("pool", "b"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) + c.Check(err, IsNil) + + // Check only one listing request was done to the server + s.checkGetRequestsEqual(c, "/test?", []string{"/test?max-keys=1000&prefix="}) + + s.srv.Requests = nil + // Publish two packages at a different prefix + err = s.storage.LinkFromPool("publish-prefix", filepath.Join("pool", "a"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) + c.Check(err, IsNil) + + err = s.storage.LinkFromPool("publish-prefix", filepath.Join("pool", "b"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) + c.Check(err, IsNil) + + // Check only one listing request was done to the server + s.checkGetRequestsEqual(c, "/test?", []string{ + "/test?max-keys=1000&prefix=publish-prefix%2F", + }) + + s.srv.Requests = nil + // Publish two packages at a prefixed storage + err = s.prefixedStorage.LinkFromPool("", filepath.Join("pool", "a"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) + c.Check(err, IsNil) + + err = s.prefixedStorage.LinkFromPool("", filepath.Join("pool", "b"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) + c.Check(err, IsNil) + + // Check only one listing request was done to the server + s.checkGetRequestsEqual(c, "/test?", []string{ + "/test?max-keys=1000&prefix=lala%2F", + }) + + s.srv.Requests = nil + // Publish two packages at a prefixed storage plus a publish prefix. 
+ err = s.prefixedStorage.LinkFromPool("publish-prefix", filepath.Join("pool", "a"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) + c.Check(err, IsNil) + + err = s.prefixedStorage.LinkFromPool("publish-prefix", filepath.Join("pool", "b"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) + c.Check(err, IsNil) + + // Check only one listing request was done to the server + s.checkGetRequestsEqual(c, "/test?", []string{ + "/test?max-keys=1000&prefix=lala%2Fpublish-prefix%2F", + }) + + // This step checks that files already exists in S3 and skip upload (which would fail if not skipped). + s.prefixedStorage.pathCache = nil + err = s.prefixedStorage.LinkFromPool("publish-prefix", filepath.Join("pool", "a"), "mars-invaders_1.03.deb", pool, "non-existent-file", cksum1, false) + c.Check(err, IsNil) + err = s.prefixedStorage.LinkFromPool("", filepath.Join("pool", "a"), "mars-invaders_1.03.deb", pool, "non-existent-file", cksum1, false) + c.Check(err, IsNil) + err = s.storage.LinkFromPool("publish-prefix", filepath.Join("pool", "a"), "mars-invaders_1.03.deb", pool, "non-existent-file", cksum1, false) + c.Check(err, IsNil) + err = s.storage.LinkFromPool("", filepath.Join("pool", "a"), "mars-invaders_1.03.deb", pool, "non-existent-file", cksum1, false) + c.Check(err, IsNil) +} + func (s *PublishedStorageSuite) TestSymLink(c *C) { s.PutFile(c, "a/b", []byte("test")) diff --git a/s3/server_test.go b/s3/server_test.go index 573097abe..08c290406 100644 --- a/s3/server_test.go +++ b/s3/server_test.go @@ -59,6 +59,12 @@ func (c *Config) send409Conflict() bool { return false } +// Request stores the method and URI of an HTTP request. +type Request struct { + Method string + RequestURI string +} + // Server is a fake S3 server for testing purposes. // All of the data for the server is kept in memory. type Server struct { @@ -68,6 +74,8 @@ type Server struct { mu sync.Mutex buckets map[string]*bucket config *Config + // Requests holds a log of all requests received by the server. + Requests []Request } type bucket struct { @@ -140,6 +148,7 @@ func (srv *Server) serveHTTP(w http.ResponseWriter, req *http.Request) { if debug { log.Printf("s3test %q %q", req.Method, req.URL) } + srv.Requests = append(srv.Requests, Request{req.Method, req.RequestURI}) a := &action{ srv: srv, w: w, @@ -330,6 +339,10 @@ type Owner struct { DisplayName string } +type CommonPrefix struct { + Prefix string +} + // The ListResp type holds the results of a List bucket operation. type ListResp struct { Name string @@ -344,7 +357,7 @@ type ListResp struct { // http://goo.gl/YjQTc IsTruncated bool Contents []Key - CommonPrefixes []string `xml:">Prefix"` + CommonPrefixes []CommonPrefix } // The Key type represents an item stored in an S3 bucket. 
@@ -403,7 +416,7 @@ func (r bucketResource) get(a *action) interface{} { MaxKeys: maxKeys, } - var prefixes []string + var prefixes []CommonPrefix for _, obj := range objs { if !strings.HasPrefix(obj.name, prefix) { continue @@ -413,7 +426,7 @@ name = obj.name[:len(prefix)+i+len(delimiter)] - if prefixes != nil && prefixes[len(prefixes)-1] == name { + if prefixes != nil && prefixes[len(prefixes)-1].Prefix == name { continue } isPrefix = true @@ -427,7 +440,7 @@ break } if isPrefix { - prefixes = append(prefixes, name) + prefixes = append(prefixes, CommonPrefix{name}) } else { // Contents contains only keys not found in CommonPrefixes resp.Contents = append(resp.Contents, obj.s3Key()) diff --git a/swift/public.go b/swift/public.go index 0ca4796c5..590af091b 100644 --- a/swift/public.go +++ b/swift/public.go @@ -188,15 +188,16 @@ func (storage *PublishedStorage) RemoveDirs(path string, progress aptly.Progress // LinkFromPool links package file from pool to dist's pool location // -// publishedDirectory is desired location in pool (like prefix/pool/component/liba/libav/) +// publishedPrefix is desired prefix for the location in the pool. +// publishedRelPath is desired location in pool (like pool/component/liba/libav/) // sourcePool is instance of aptly.PackagePool // sourcePath is filepath to package file in package pool // // LinkFromPool returns relative path for the published file to be included in package index -func (storage *PublishedStorage) LinkFromPool(publishedDirectory, fileName string, sourcePool aptly.PackagePool, +func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath, fileName string, sourcePool aptly.PackagePool, sourcePath string, sourceChecksums utils.ChecksumInfo, force bool) error { - relPath := filepath.Join(publishedDirectory, fileName) + relPath := filepath.Join(publishedPrefix, publishedRelPath, fileName) poolPath := filepath.Join(storage.prefix, relPath) var ( diff --git a/swift/public_test.go b/swift/public_test.go index 31b633e81..a9eb43e73 100644 --- a/swift/public_test.go +++ b/swift/public_test.go @@ -172,7 +172,7 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) { c.Assert(err, IsNil) // first link from pool - err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) + err = s.storage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) c.Check(err, IsNil) data, err := s.storage.conn.ObjectGetBytes("test", "pool/main/m/mars-invaders/mars-invaders_1.03.deb") @@ -180,7 +180,7 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) { c.Check(data, DeepEquals, []byte("Contents")) // duplicate link from pool - err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) + err = s.storage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src1, cksum1, false) c.Check(err, IsNil) data, err = s.storage.conn.ObjectGetBytes("test", "pool/main/m/mars-invaders/mars-invaders_1.03.deb") @@ -188,7 +188,7 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) { c.Check(data, DeepEquals, []byte("Contents")) // link from pool with conflict - err = s.storage.LinkFromPool(filepath.Join("",
"pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src2, cksum2, false) + err = s.storage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src2, cksum2, false) c.Check(err, ErrorMatches, ".*file already exists and is different.*") data, err = s.storage.conn.ObjectGetBytes("test", "pool/main/m/mars-invaders/mars-invaders_1.03.deb") @@ -196,7 +196,7 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) { c.Check(data, DeepEquals, []byte("Contents")) // link from pool with conflict and force - err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src2, cksum2, true) + err = s.storage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, src2, cksum2, true) c.Check(err, IsNil) data, err = s.storage.conn.ObjectGetBytes("test", "pool/main/m/mars-invaders/mars-invaders_1.03.deb") @@ -204,7 +204,7 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) { c.Check(data, DeepEquals, []byte("Spam")) // link from pool with nested file name - err = s.storage.LinkFromPool("dists/jessie/non-free/installer-i386/current/images", "netboot/boot.img.gz", pool, src3, cksum3, false) + err = s.storage.LinkFromPool("", "dists/jessie/non-free/installer-i386/current/images", "netboot/boot.img.gz", pool, src3, cksum3, false) c.Check(err, IsNil) data, err = s.storage.conn.ObjectGetBytes("test", "dists/jessie/non-free/installer-i386/current/images/netboot/boot.img.gz") diff --git a/utils/config.go b/utils/config.go index 321b0473a..2d6d97b06 100644 --- a/utils/config.go +++ b/utils/config.go @@ -63,6 +63,7 @@ type S3PublishRoot struct { DisableMultiDel bool `json:"disableMultiDel"` ForceSigV2 bool `json:"forceSigV2"` ForceVirtualHostedStyle bool `json:"forceVirtualHostedStyle"` + ParallelListingRequests int `json:"parallelListingRequests"` Debug bool `json:"debug"` } diff --git a/utils/config_test.go b/utils/config_test.go index a9b407afb..633f5b4cd 100644 --- a/utils/config_test.go +++ b/utils/config_test.go @@ -106,6 +106,7 @@ func (s *ConfigSuite) TestSaveConfig(c *C) { " \"disableMultiDel\": false,\n"+ " \"forceSigV2\": false,\n"+ " \"forceVirtualHostedStyle\": false,\n"+ + " \"parallelListingRequests\": 0,\n"+ " \"debug\": false\n"+ " }\n"+ " },\n"+