Skip to content

Commit

Permalink
Support S3 Objects List ETag map
Browse files Browse the repository at this point in the history
  • Loading branch information
jonyoder committed Dec 4, 2023
1 parent 3bdf6d3 commit 2383a72
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 0 deletions.
127 changes: 127 additions & 0 deletions pkg/rsstorage/servers/s3server/s3_enumeration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
// AwsOps is the set of S3 bucket enumeration operations. DefaultAwsOps has
// BucketObjects and BucketObjectsMap methods matching these signatures.
type AwsOps interface {
// BucketDirs takes a bucket name and a key prefix and returns a list of
// strings.
// NOTE(review): presumably the "directory" names found under the prefix;
// the implementation is not visible here, so confirm.
BucketDirs(bucket, s3Prefix string) ([]string, error)
// BucketObjects takes a bucket name, a key prefix, a worker count, a
// recursion flag and an optional regular expression, and returns a list of
// strings.
// NOTE(review): presumably the object names under the prefix, filtered by
// reg; confirm against the implementation.
BucketObjects(bucket, s3Prefix string, concurrency int, recursive bool, reg *regexp.Regexp) ([]string, error)
// BucketObjectsMap takes the same arguments as BucketObjects but returns a
// map. In the DefaultAwsOps implementation the map goes from object name
// (with s3Prefix trimmed from the front) to the object's ETag.
BucketObjectsMap(bucket, s3Prefix string, concurrency int, recursive bool, reg *regexp.Regexp) (map[string]string, error)
}

type DefaultAwsOps struct {
Expand Down Expand Up @@ -157,6 +158,114 @@ func (a *DefaultAwsOps) BucketObjects(bucket, s3Prefix string, concurrency int,
}
}

// BucketObjectsMap lists the objects in `bucket` whose keys start with
// `s3Prefix` and returns a map from object name to ETag.
//
// Names are produced by getObjectsAllMap: when `reg` is non-nil only keys that
// match it are kept and the first capture group is used as the name; in both
// cases `s3Prefix` is trimmed from the front of the name.
//
// When `recursive` is false a "/" delimiter is sent so that only the contents
// of the "directory" named by `s3Prefix` are listed. When it is true no
// delimiter is sent and everything below the prefix is listed.
//
// `concurrency` is the number of worker goroutines; values below 1 are treated
// as 1. Each page request needs the marker from the previous response, so
// requests are issued one after another; extra workers only let the processing
// of one page overlap with the fetch of the next.
//
// On failure a nil map is returned together with an error that wraps the
// underlying S3 error.
func (a *DefaultAwsOps) BucketObjectsMap(bucket, s3Prefix string, concurrency int, recursive bool, reg *regexp.Regexp) (map[string]string, error) {
	svc := s3.New(a.sess)

	// Always run at least one worker; with none, nothing would ever consume
	// the initial marker and the call would never return.
	if concurrency < 1 {
		concurrency = 1
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// If recursive is not true, include a delimiter so we only list the contents
	// of the directory indicated by `s3Prefix`. Otherwise, leave the delimiter nil
	// so we list everything recursively.
	var delimiter *string
	if !recursive {
		delimiter = aws.String("/")
	}

	// markers carries the marker of the next page to fetch. A worker only
	// sends a marker after it has received one, so at most one marker is ever
	// queued and a buffer of one guarantees that sends never block.
	markers := make(chan string, 1)
	markers <- ""

	var (
		mu         sync.Mutex // guards binaryMeta, ops and total
		binaryMeta = make(map[string]string)
		ops, total uint64
	)

	// done is closed exactly once, either by the worker that processed the
	// last page or by the worker that hit an error. Idle workers watch it so
	// that they exit instead of waiting for a marker that will never arrive.
	var (
		done     = make(chan struct{})
		doneOnce sync.Once
		listErr  error
	)
	finish := func(err error) {
		doneOnce.Do(func() {
			listErr = err
			close(done)
		})
	}

	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				var marker string
				select {
				case <-done:
					return
				case marker = <-markers:
				}

				query := &s3.ListObjectsInput{
					Bucket:    aws.String(bucket),
					Prefix:    aws.String(s3Prefix),
					Delimiter: delimiter,
				}
				if marker != "" {
					query.SetMarker(marker)
				}

				resp, err := svc.ListObjectsWithContext(ctx, query)
				if err != nil {
					finish(fmt.Errorf("something went wrong listing objects: %w", err))
					return
				}

				// S3 only returns NextMarker when a delimiter was supplied.
				// For a truncated response without one, the last key of the
				// page is the marker for the next page.
				next := aws.StringValue(resp.NextMarker)
				if next == "" && aws.BoolValue(resp.IsTruncated) && len(resp.Contents) > 0 {
					next = aws.StringValue(resp.Contents[len(resp.Contents)-1].Key)
				}

				// Hand the next page to another worker before processing this
				// one so that the fetch and the processing can overlap.
				if next != "" {
					markers <- next
				}

				page := getObjectsAllMap(resp, s3Prefix, reg)

				var parsed uint64
				mu.Lock()
				for name, etag := range page {
					binaryMeta[name] = etag
				}
				ops += uint64(len(page))
				if ops > 1000 {
					total += ops
					ops = 0
					parsed = total
				}
				mu.Unlock()

				if parsed > 0 {
					log.Printf("For S3 prefix %s parsed %d files", s3Prefix, parsed)
				}

				// No further marker means this was the last page. A page with
				// no contents but with a marker (for example one holding only
				// common prefixes) must keep the listing going.
				if next == "" {
					finish(nil)
					return
				}
			}
		}()
	}

	// Every worker exits once done is closed, so this returns only after all
	// pages that were fetched have been merged into binaryMeta.
	wg.Wait()

	if listErr != nil {
		return nil, listErr
	}
	return binaryMeta, nil
}

// BinaryReg matches strings that end in ".tar.gz" or ".zip". Capture group 1
// holds everything before that extension and group 2 holds the extension.
var BinaryReg = regexp.MustCompile(`(.+)(\.tar\.gz|\.zip)$`)

func getObjectsAll(bucketObjectsList *s3.ListObjectsOutput, s3Prefix string, reg *regexp.Regexp) []string {
Expand All @@ -176,3 +285,21 @@ func getObjectsAll(bucketObjectsList *s3.ListObjectsOutput, s3Prefix string, reg

return binaryMeta
}

// getObjectsAllMap converts one page of listing results into a map from
// object name to ETag.
//
// With a nil `reg` every object in the page is included and its name is the
// full key. With a non-nil `reg` only keys for which the expression yields at
// least one capture group are included, and the name is the first capture
// group. In both cases `s3Prefix` is trimmed from the front of the name.
func getObjectsAllMap(bucketObjectsList *s3.ListObjectsOutput, s3Prefix string, reg *regexp.Regexp) map[string]string {
	etags := map[string]string{}

	for _, obj := range bucketObjectsList.Contents {
		name := *obj.Key
		if reg != nil {
			groups := reg.FindStringSubmatch(name)
			if len(groups) <= 1 {
				// The key did not match, so the object is left out.
				continue
			}
			name = groups[1]
		}
		etags[strings.TrimPrefix(name, s3Prefix)] = *obj.ETag
	}

	return etags
}
48 changes: 48 additions & 0 deletions pkg/rsstorage/servers/s3server/s3_enumeration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,51 @@ func (s *MetaTestSuite) TestBucketObjects(c *check.C) {
c.Assert(err, check.IsNil)
c.Check(files, check.DeepEquals, []string{"HIJKLMN", "OPQRSTU"})
}

// TestBucketObjectsMap runs BucketObjectsMap against two mocked S3 endpoints:
// the "no-sync" bucket answers 404 and must surface a wrapped listing error,
// and the "sync" bucket returns four objects of which only the two matching
// BinaryReg must appear in the resulting name-to-ETag map.
func (s *MetaTestSuite) TestBucketObjectsMap(c *check.C) {
	sess, err := session.NewSession(&aws.Config{
		Region:      aws.String("us-east-1"),
		Credentials: credentials.AnonymousCredentials,
	})
	c.Assert(err, check.IsNil)

	httpmock.ActivateNonDefault(sess.Config.HTTPClient)
	defer httpmock.DeactivateAndReset()

	httpmock.RegisterResponder("GET", `https://sync.s3.amazonaws.com/?delimiter=%2F&prefix=bin%2F3.5-xenial`,
		httpmock.NewStringResponder(http.StatusOK, `<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Name>sync</Name>
<Prefix>bin/3.5-xenial/</Prefix>
<IsTruncated>false</IsTruncated>
<Contents>
<Key>ABCDEFG.json</Key>
<ETag>123</ETag>
</Contents>
<Contents>
<Key>HIJKLMN.tar.gz</Key>
<ETag>456</ETag>
</Contents>
<Contents>
<Key>OPQRSTU.zip</Key>
<ETag>789</ETag>
</Contents>
<Contents>
<Key>nothing</Key>
<ETag>0</ETag>
</Contents>
</ListBucketResult>`))
	httpmock.RegisterResponder("GET", `https://no-sync.s3.amazonaws.com/?delimiter=%2F&prefix=bin%2F3.5-xenial`,
		httpmock.NewStringResponder(http.StatusNotFound, ``))

	ops := &DefaultAwsOps{sess: sess}

	// Failure path: assert the error is non-nil before reading its message so
	// that a regression fails the test instead of panicking on a nil error.
	files, err := ops.BucketObjectsMap("no-sync", "bin/3.5-xenial", 1, false, BinaryReg)
	c.Assert(err, check.NotNil)
	c.Assert(err.Error(), check.Equals, "something went wrong listing objects: NotFound: Not Found\n"+
		"\tstatus code: 404, request id: , host id: ")
	c.Check(files, check.IsNil)

	// Success path: only the .tar.gz and .zip objects match BinaryReg.
	files, err = ops.BucketObjectsMap("sync", "bin/3.5-xenial", 1, false, BinaryReg)
	c.Assert(err, check.IsNil)
	c.Check(files, check.DeepEquals, map[string]string{
		"HIJKLMN": "456",
		"OPQRSTU": "789",
	})
}

0 comments on commit 2383a72

Please sign in to comment.