// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0

package nbs

import (
	"fmt"
	"io"
	"net"
	"os"
	"time"

	"golang.org/x/sys/unix"

	"github.com/attic-labs/noms/go/d"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/jpillora/backoff"
)

const (
	s3RangePrefix = "bytes"
	s3BlockSize   = (1 << 10) * 512 // 512K
)

type s3TableReaderAt struct {
	s3 *s3ObjectReader
	h  addr
}

// s3svc is the subset of the AWS SDK's S3 client methods that this package
// uses; declaring it as an interface lets callers supply either a real *s3.S3
// or a stub implementation.
type s3svc interface {
	AbortMultipartUpload(input *s3.AbortMultipartUploadInput) (*s3.AbortMultipartUploadOutput, error)
	CreateMultipartUpload(input *s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error)
	UploadPart(input *s3.UploadPartInput) (*s3.UploadPartOutput, error)
	UploadPartCopy(input *s3.UploadPartCopyInput) (*s3.UploadPartCopyOutput, error)
	CompleteMultipartUpload(input *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error)
	GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error)
	PutObject(input *s3.PutObjectInput) (*s3.PutObjectOutput, error)
}

func (s3tra *s3TableReaderAt) ReadAtWithStats(p []byte, off int64, stats *Stats) (n int, err error) {
	return s3tra.s3.ReadAt(s3tra.h, p, off, stats)
}

// TODO: Bring all the multipart upload and remote-conjoin stuff over here and
// make this a better analogue to ddbTableStore
type s3ObjectReader struct {
	s3     s3svc
	bucket string
	readRl chan struct{} // optional semaphore channel bounding concurrent S3 reads
	tc     tableCache    // optional cache of table files; may be nil
}

func (s3or *s3ObjectReader) ReadAt(name addr, p []byte, off int64, stats *Stats) (n int, err error) {
	t1 := time.Now()

	if s3or.tc != nil {
		r := s3or.tc.checkout(name)
		if r != nil {
			defer func() {
				stats.FileBytesPerRead.Sample(uint64(len(p)))
				stats.FileReadLatency.SampleTimeSince(t1)
			}()
			defer s3or.tc.checkin(name)
			return r.ReadAt(p, off)
		}
	}

	defer func() {
		stats.S3BytesPerRead.Sample(uint64(len(p)))
		stats.S3ReadLatency.SampleTimeSince(t1)
	}()
	return s3or.readRange(name, p, s3RangeHeader(off, int64(len(p))))
}
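
// A tableCache hit above is served from the local cache and sampled under
// FileBytesPerRead/FileReadLatency; a miss falls through to a ranged S3 GET
// and is sampled under S3BytesPerRead/S3ReadLatency.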

func s3RangeHeader(off, length int64) string {
	lastByte := off + length - 1 // insanely, the HTTP range header specifies ranges inclusively.
	return fmt.Sprintf("%s=%d-%d", s3RangePrefix, off, lastByte)
}
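
// For example, a 4KB read at offset 0 covers bytes 0 through 4095 inclusive,
// and a 512-byte read at offset 512 covers bytes 512 through 1023:
//
//	s3RangeHeader(0, 4096)  // "bytes=0-4095"
//	s3RangeHeader(512, 512) // "bytes=512-1023"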

func (s3or *s3ObjectReader) ReadFromEnd(name addr, p []byte, stats *Stats) (n int, err error) {
	// TODO: enable this to use the tableCache. The wrinkle is the tableCache
	// currently just returns a ReaderAt, which doesn't give you the length of
	// the object that backs it, so you can't calculate an offset if all you
	// know is that you want the last N bytes.
	defer func(t1 time.Time) {
		stats.S3BytesPerRead.Sample(uint64(len(p)))
		stats.S3ReadLatency.SampleTimeSince(t1)
	}(time.Now())
	return s3or.readRange(name, p, fmt.Sprintf("%s=-%d", s3RangePrefix, len(p)))
}
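
// The suffix form of the range header asks for the final len(p) bytes without
// needing to know the object's total length; reading the last 20 bytes, for
// example, sends "Range: bytes=-20" and S3 resolves the starting offset
// server-side.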

func (s3or *s3ObjectReader) readRange(name addr, p []byte, rangeHeader string) (n int, err error) {
	read := func() (int, error) {
		if s3or.readRl != nil {
			s3or.readRl <- struct{}{}
			defer func() {
				<-s3or.readRl
			}()
		}

		input := &s3.GetObjectInput{
			Bucket: aws.String(s3or.bucket),
			Key:    aws.String(name.String()),
			Range:  aws.String(rangeHeader),
		}
		result, err := s3or.s3.GetObject(input)
		d.PanicIfError(err)
		d.PanicIfFalse(*result.ContentLength == int64(len(p)))

		n, err := io.ReadFull(result.Body, p)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Failed ranged read from S3\n%s\nerr type: %T\nerror: %v\n", input.GoString(), err, err)
		}
		return n, err
	}

	n, err = read()
	// We hit the point of diminishing returns investigating #3255, so add
	// retries. In conversations with AWS people, it's not surprising to get
	// transient failures when talking to S3, though the SDKs are intended to
	// do their own retrying. The issue may be that, in Go, making the S3
	// request and reading the data are separate operations, and the SDK can't
	// really do its own retrying to handle failures in the latter.
	if isConnReset(err) {
		// We back off here because it's possible, and likely, that the rate of
		// requests to S3 is the underlying issue.
		b := &backoff.Backoff{
			Min:    128 * time.Microsecond,
			Max:    1024 * time.Millisecond,
			Factor: 2,
			Jitter: true,
		}
		for ; isConnReset(err); n, err = read() {
			dur := b.Duration()
			fmt.Fprintf(os.Stderr, "Retrying S3 read in %s\n", dur.String())
			time.Sleep(dur)
		}
	}
	return
}
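
// With the parameters above (Min=128µs, Max=1024ms, Factor=2, Jitter=true),
// successive retry delays grow roughly as 128µs, 256µs, 512µs, and so on,
// randomized by the jitter, until they are capped at about one second.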

func isConnReset(err error) bool {
	nErr, ok := err.(*net.OpError)
	if !ok {
		return false
	}
	scErr, ok := nErr.Err.(*os.SyscallError)
	return ok && scErr.Err == unix.ECONNRESET
}
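
// Hypothetical usage sketch: svc, someTableAddr, buf, and stats below are
// placeholders, not identifiers defined in this package. A caller could bound
// the number of concurrent S3 reads with a semaphore channel like so:
//
//	readRl := make(chan struct{}, 32) // at most 32 in-flight GETs
//	reader := &s3ObjectReader{s3: svc, bucket: "my-bucket", readRl: readRl}
//	tra := &s3TableReaderAt{s3: reader, h: someTableAddr}
//	n, err := tra.ReadAtWithStats(buf, 0, stats)
//
// where svc is any s3svc implementation (for example *s3.S3), someTableAddr is
// the addr of a table file already uploaded to the bucket, and stats is a
// *Stats.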