Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support S3 Objects List ETag map #126

Merged
merged 1 commit into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions pkg/rsstorage/servers/s3server/s3_enumeration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
// AwsOps groups the S3 bucket enumeration operations. DefaultAwsOps provides
// methods with these signatures.
type AwsOps interface {
// BucketDirs returns a list of names found under s3Prefix in bucket.
// NOTE(review): presumably the directory-like common prefixes; the
// implementation is not visible here, confirm.
BucketDirs(bucket, s3Prefix string) ([]string, error)
// BucketObjects returns the names of objects under s3Prefix in bucket.
// concurrency, recursive and reg have the same parameter positions as in
// BucketObjectsETagMap. NOTE(review): assumed to carry the same meaning;
// confirm against the implementation.
BucketObjects(bucket, s3Prefix string, concurrency int, recursive bool, reg *regexp.Regexp) ([]string, error)
// BucketObjectsETagMap returns a map from object name to ETag for the
// objects under s3Prefix in bucket. When recursive is false only the
// direct contents of the prefix are listed. reg, when non-nil, selects
// which keys are kept and supplies the name via its first capture group.
BucketObjectsETagMap(bucket, s3Prefix string, concurrency int, recursive bool, reg *regexp.Regexp) (map[string]string, error)
}

type DefaultAwsOps struct {
Expand Down Expand Up @@ -157,6 +158,114 @@ func (a *DefaultAwsOps) BucketObjects(bucket, s3Prefix string, concurrency int,
}
}

func (a *DefaultAwsOps) BucketObjectsETagMap(bucket, s3Prefix string, concurrency int, recursive bool, reg *regexp.Regexp) (map[string]string, error) {
svc := s3.New(a.sess)

nextMarkerChan := make(chan string, 100)
nextMarkerChan <- ""
defer close(nextMarkerChan)

binaryMeta := make(map[string]string)
binaryL := sync.Mutex{}

wg := sync.WaitGroup{}
waitCh := make(chan struct{})
wg.Add(1)

var ops uint64
var total uint64

errCh := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// If recursive is not true, include a delimiter so we only list the contents
// of the directory indicated by `s3Prefix`. Otherwise, leave the delimiter nil
// so we list everything recursively.
var delimiter *string
if !recursive {
delimiter = aws.String("/")
}

go func() {
for i := 0; i < concurrency; i++ {
go func() {
for nextMarker := range nextMarkerChan {
wg.Add(1)

query := &s3.ListObjectsInput{
Bucket: aws.String(bucket),
Prefix: aws.String(s3Prefix),
Delimiter: delimiter,
}

if nextMarker != "" {
query.SetMarker(nextMarker)
}

resp, err := svc.ListObjectsWithContext(ctx, query)
if err != nil {
errCh <- fmt.Errorf("something went wrong listing objects: %s", err)
return
}

nm := ""

if resp.NextMarker != nil {
nm = *resp.NextMarker
nextMarkerChan <- nm
}

// When there are no contents, we need to return
// early.
if len(resp.Contents) == 0 {
wg.Done()
// TODO: `nm` may always be blank when there are no
// contents, so this conditional may be unnecessary.
if nm == "" {
wg.Done()
}
return
}

bm := getObjectsAllMap(resp, s3Prefix, reg)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only line that differs materially from BucketObjects. It calls the new getObjectsAllMap method instead of getObjectsAll.


binaryL.Lock()
for key, val := range bm {
binaryMeta[key] = val
}
binaryL.Unlock()

wg.Done()
atomic.AddUint64(&ops, uint64(len(bm)))
if ops > 1000 {
atomic.AddUint64(&total, atomic.LoadUint64(&ops))
log.Printf("For S3 prefix %s parsed %d files", s3Prefix, atomic.LoadUint64(&total))
atomic.SwapUint64(&ops, 0)
}

if nm == "" {
wg.Done()
break
}
}
}()
}

wg.Wait()
close(waitCh)
}()

// Block until the wait group is done or we err
select {
case <-waitCh:
return binaryMeta, nil
case err := <-errCh:
cancel()
return nil, err
}
}

// BinaryReg matches names ending in ".tar.gz" or ".zip". The first capture
// group holds everything before that suffix and the second holds the suffix.
var BinaryReg = regexp.MustCompile(`(.+)(\.tar\.gz|\.zip)$`)

func getObjectsAll(bucketObjectsList *s3.ListObjectsOutput, s3Prefix string, reg *regexp.Regexp) []string {
Expand All @@ -176,3 +285,21 @@ func getObjectsAll(bucketObjectsList *s3.ListObjectsOutput, s3Prefix string, reg

return binaryMeta
}

func getObjectsAllMap(bucketObjectsList *s3.ListObjectsOutput, s3Prefix string, reg *regexp.Regexp) map[string]string {
binaryMeta := make(map[string]string)

for _, key := range bucketObjectsList.Contents {

if reg != nil {
if s := reg.FindStringSubmatch(*key.Key); len(s) > 1 {
binaryMeta[strings.TrimPrefix(s[1], s3Prefix)] = *key.ETag
}
} else {
binaryMeta[strings.TrimPrefix(*key.Key, s3Prefix)] = *key.ETag
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is nearly identical to the existing getObjectsAll method except that it returns a map and sets the map key/value pairs here instead of simply appending to a string array.

}

}

return binaryMeta
}
48 changes: 48 additions & 0 deletions pkg/rsstorage/servers/s3server/s3_enumeration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,51 @@ func (s *MetaTestSuite) TestBucketObjects(c *check.C) {
c.Assert(err, check.IsNil)
c.Check(files, check.DeepEquals, []string{"HIJKLMN", "OPQRSTU"})
}

// TestBucketObjectsMap checks BucketObjectsETagMap against mocked S3
// responses: a bucket that answers 404 must yield an error and no result, and
// a bucket with a single listing page must yield only the entries whose keys
// match BinaryReg, named by the regexp's first capture group.
func (s *MetaTestSuite) TestBucketObjectsMap(c *check.C) {
	sess, err := session.NewSession(&aws.Config{
		Region:      aws.String("us-east-1"),
		Credentials: credentials.AnonymousCredentials,
	})
	c.Assert(err, check.IsNil)

	httpmock.ActivateNonDefault(sess.Config.HTTPClient)
	defer httpmock.DeactivateAndReset()

	httpmock.RegisterResponder("GET", `https://sync.s3.amazonaws.com/?delimiter=%2F&prefix=bin%2F3.5-xenial`,
		httpmock.NewStringResponder(http.StatusOK, `<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Name>sync</Name>
<Prefix>bin/3.5-xenial/</Prefix>
<IsTruncated>false</IsTruncated>
<Contents>
<Key>ABCDEFG.json</Key>
<ETag>123</ETag>
</Contents>
<Contents>
<Key>HIJKLMN.tar.gz</Key>
<ETag>456</ETag>
</Contents>
<Contents>
<Key>OPQRSTU.zip</Key>
<ETag>789</ETag>
</Contents>
<Contents>
<Key>nothing</Key>
<ETag>0</ETag>
</Contents>
</ListBucketResult>`))
	httpmock.RegisterResponder("GET", `https://no-sync.s3.amazonaws.com/?delimiter=%2F&prefix=bin%2F3.5-xenial`,
		httpmock.NewStringResponder(http.StatusNotFound, ``))

	ops := &DefaultAwsOps{sess: sess}

	// Assert the error is non-nil before inspecting its message, so that a
	// regression fails the test cleanly instead of panicking on a nil error.
	files, err := ops.BucketObjectsETagMap("no-sync", "bin/3.5-xenial", 1, false, BinaryReg)
	c.Assert(err, check.NotNil)
	c.Check(err.Error(), check.Equals, "something went wrong listing objects: NotFound: Not Found\n"+
		"\tstatus code: 404, request id: , host id: ")
	c.Check(files, check.IsNil)

	files, err = ops.BucketObjectsETagMap("sync", "bin/3.5-xenial", 1, false, BinaryReg)
	c.Assert(err, check.IsNil)
	c.Check(files, check.DeepEquals, map[string]string{
		"HIJKLMN": "456",
		"OPQRSTU": "789",
	})
}
Loading