From dfaa41dfc82e585853772c4bcfed5b2e4bc1e799 Mon Sep 17 00:00:00 2001
From: Jonathan Yoder
Date: Mon, 4 Dec 2023 15:06:38 -0500
Subject: [PATCH] Support S3 Objects List ETag map

---
 .../servers/s3server/s3_enumeration.go      | 127 ++++++++++++++++++
 .../servers/s3server/s3_enumeration_test.go |  48 +++++++
 2 files changed, 175 insertions(+)

diff --git a/pkg/rsstorage/servers/s3server/s3_enumeration.go b/pkg/rsstorage/servers/s3server/s3_enumeration.go
index 5d32725..d464c49 100644
--- a/pkg/rsstorage/servers/s3server/s3_enumeration.go
+++ b/pkg/rsstorage/servers/s3server/s3_enumeration.go
@@ -19,6 +19,7 @@ import (
 type AwsOps interface {
 	BucketDirs(bucket, s3Prefix string) ([]string, error)
 	BucketObjects(bucket, s3Prefix string, concurrency int, recursive bool, reg *regexp.Regexp) ([]string, error)
+	BucketObjectsETagMap(bucket, s3Prefix string, concurrency int, recursive bool, reg *regexp.Regexp) (map[string]string, error)
 }
 
 type DefaultAwsOps struct {
@@ -157,6 +158,114 @@ func (a *DefaultAwsOps) BucketObjects(bucket, s3Prefix string, concurrency int,
 	}
 }
 
+func (a *DefaultAwsOps) BucketObjectsETagMap(bucket, s3Prefix string, concurrency int, recursive bool, reg *regexp.Regexp) (map[string]string, error) {
+	svc := s3.New(a.sess)
+
+	nextMarkerChan := make(chan string, 100)
+	nextMarkerChan <- ""
+	defer close(nextMarkerChan)
+
+	binaryMeta := make(map[string]string)
+	binaryL := sync.Mutex{}
+
+	wg := sync.WaitGroup{}
+	waitCh := make(chan struct{})
+	wg.Add(1)
+
+	var ops uint64
+	var total uint64
+
+	errCh := make(chan error)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// If recursive is not true, include a delimiter so we only list the contents
+	// of the directory indicated by `s3Prefix`. Otherwise, leave the delimiter nil
+	// so we list everything recursively.
+	var delimiter *string
+	if !recursive {
+		delimiter = aws.String("/")
+	}
+
+	go func() {
+		for i := 0; i < concurrency; i++ {
+			go func() {
+				for nextMarker := range nextMarkerChan {
+					wg.Add(1)
+
+					query := &s3.ListObjectsInput{
+						Bucket:    aws.String(bucket),
+						Prefix:    aws.String(s3Prefix),
+						Delimiter: delimiter,
+					}
+
+					if nextMarker != "" {
+						query.SetMarker(nextMarker)
+					}
+
+					resp, err := svc.ListObjectsWithContext(ctx, query)
+					if err != nil {
+						errCh <- fmt.Errorf("something went wrong listing objects: %s", err)
+						return
+					}
+
+					nm := ""
+
+					if resp.NextMarker != nil {
+						nm = *resp.NextMarker
+						nextMarkerChan <- nm
+					}
+
+					// When there are no contents, we need to return
+					// early.
+					if len(resp.Contents) == 0 {
+						wg.Done()
+						// TODO: `nm` may always be blank when there are no
+						// contents, so this conditional may be unnecessary.
+ if nm == "" { + wg.Done() + } + return + } + + bm := getObjectsAllMap(resp, s3Prefix, reg) + + binaryL.Lock() + for key, val := range bm { + binaryMeta[key] = val + } + binaryL.Unlock() + + wg.Done() + atomic.AddUint64(&ops, uint64(len(bm))) + if ops > 1000 { + atomic.AddUint64(&total, atomic.LoadUint64(&ops)) + log.Printf("For S3 prefix %s parsed %d files", s3Prefix, atomic.LoadUint64(&total)) + atomic.SwapUint64(&ops, 0) + } + + if nm == "" { + wg.Done() + break + } + } + }() + } + + wg.Wait() + close(waitCh) + }() + + // Block until the wait group is done or we err + select { + case <-waitCh: + return binaryMeta, nil + case err := <-errCh: + cancel() + return nil, err + } +} + var BinaryReg = regexp.MustCompile(`(.+)(\.tar\.gz|\.zip)$`) func getObjectsAll(bucketObjectsList *s3.ListObjectsOutput, s3Prefix string, reg *regexp.Regexp) []string { @@ -176,3 +285,21 @@ func getObjectsAll(bucketObjectsList *s3.ListObjectsOutput, s3Prefix string, reg return binaryMeta } + +func getObjectsAllMap(bucketObjectsList *s3.ListObjectsOutput, s3Prefix string, reg *regexp.Regexp) map[string]string { + binaryMeta := make(map[string]string) + + for _, key := range bucketObjectsList.Contents { + + if reg != nil { + if s := reg.FindStringSubmatch(*key.Key); len(s) > 1 { + binaryMeta[strings.TrimPrefix(s[1], s3Prefix)] = *key.ETag + } + } else { + binaryMeta[strings.TrimPrefix(*key.Key, s3Prefix)] = *key.ETag + } + + } + + return binaryMeta +} diff --git a/pkg/rsstorage/servers/s3server/s3_enumeration_test.go b/pkg/rsstorage/servers/s3server/s3_enumeration_test.go index 7256eba..b737bf3 100644 --- a/pkg/rsstorage/servers/s3server/s3_enumeration_test.go +++ b/pkg/rsstorage/servers/s3server/s3_enumeration_test.go @@ -101,3 +101,51 @@ func (s *MetaTestSuite) TestBucketObjects(c *check.C) { c.Assert(err, check.IsNil) c.Check(files, check.DeepEquals, []string{"HIJKLMN", "OPQRSTU"}) } + +func (s *MetaTestSuite) TestBucketObjectsMap(c *check.C) { + sess, err := session.NewSession(&aws.Config{ + Region: aws.String("us-east-1"), + Credentials: credentials.AnonymousCredentials, + }) + c.Assert(err, check.IsNil) + + httpmock.ActivateNonDefault(sess.Config.HTTPClient) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", `https://sync.s3.amazonaws.com/?delimiter=%2F&prefix=bin%2F3.5-xenial`, + httpmock.NewStringResponder(http.StatusOK, ` + sync + bin/3.5-xenial/ + false + + ABCDEFG.json + 123 + + + HIJKLMN.tar.gz + 456 + + + OPQRSTU.zip + 789 + + + nothing + 0 + +`)) + httpmock.RegisterResponder("GET", `https://no-sync.s3.amazonaws.com/?delimiter=%2F&prefix=bin%2F3.5-xenial`, + httpmock.NewStringResponder(http.StatusNotFound, ``)) + + ops := &DefaultAwsOps{sess: sess} + files, err := ops.BucketObjectsETagMap("no-sync", "bin/3.5-xenial", 1, false, BinaryReg) + c.Assert(err.Error(), check.Equals, "something went wrong listing objects: NotFound: Not Found\n"+ + "\tstatus code: 404, request id: , host id: ") + + files, err = ops.BucketObjectsETagMap("sync", "bin/3.5-xenial", 1, false, BinaryReg) + c.Assert(err, check.IsNil) + c.Check(files, check.DeepEquals, map[string]string{ + "HIJKLMN": "456", + "OPQRSTU": "789", + }) +}