-
Notifications
You must be signed in to change notification settings - Fork 4
/
client.go
152 lines (123 loc) · 4.01 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package warc
import (
"net/http"
"os"
"sync"
"time"
"github.com/paulbellamy/ratecounter"
)
// Error reports a WARC-related failure. Pointers to it are sent on
// CustomHTTPClient.ErrChan.
type Error struct {
	// Err is the underlying error that occurred.
	Err error

	// Func records where Err originated (judging by its name, the name of
	// the reporting function).
	Func string
}
// HTTPClientSettings configures NewWARCWritingHTTPClient. Several zero values
// are replaced by defaults there; those defaults are noted per field.
type HTTPClientSettings struct {
	// RotatorSettings configures the WARC rotator; NewWARCWritingHTTPClient
	// calls its NewWARCRotator method (presumably it must be non-nil).
	RotatorSettings *RotatorSettings

	// DedupeOptions controls deduplication. A zero SizeThreshold is
	// replaced by 1024.
	DedupeOptions DedupeOptions

	// Proxy is handed to the custom dialer.
	Proxy string

	// DecompressBody is handed to the custom transport.
	DecompressBody bool

	// SkipHTTPStatusCodes lists HTTP status codes to skip (usually 429).
	SkipHTTPStatusCodes []int

	// VerifyCerts enables certificate verification. The client stores its
	// inverse, because the value is used as InsecureSkipVerify.
	VerifyCerts bool

	// TempDir is the directory for WARC temporary files. When non-empty it
	// is created (with parents) if missing.
	TempDir string

	// FullOnDisk stores responses only on disk instead of in memory and on
	// disk.
	FullOnDisk bool

	// MaxReadBeforeTruncate is the read limit after which records are
	// truncated (presumably in bytes). Zero means 1000000000.
	MaxReadBeforeTruncate int

	// FollowRedirects, when false, makes the client return the redirect
	// response itself instead of following it.
	FollowRedirects bool

	// TCPTimeout is handed to the custom dialer. Zero means 10 seconds.
	TCPTimeout time.Duration

	// TLSHandshakeTimeout is handed to the custom transport. Zero means
	// 10 seconds.
	TLSHandshakeTimeout time.Duration
}
// CustomHTTPClient is an http.Client wired to a WARC writing pipeline.
// Create it with NewWARCWritingHTTPClient and shut it down with Close.
type CustomHTTPClient struct {
	http.Client

	// WARCWriter receives batches of records to write. It is obtained from
	// RotatorSettings.NewWARCRotator and closed by Close.
	WARCWriter chan *RecordBatch

	// WARCWriterDoneChannels are returned by NewWARCRotator together with
	// WARCWriter. Close receives once from each of them before returning.
	WARCWriterDoneChannels []chan bool

	// WaitGroup is waited on by Close before WARCWriter is closed
	// (presumably it counts in-flight requests; confirm with its users).
	WaitGroup *WaitGroupWithCount

	// dedupeHashTable holds deduplication records.
	dedupeHashTable *sync.Map

	// dedupeOptions configures deduplication; a zero SizeThreshold is
	// replaced by 1024 at construction.
	dedupeOptions DedupeOptions

	// skipHTTPStatusCodes lists HTTP status codes to skip (usually 429).
	skipHTTPStatusCodes []int

	// ErrChan is an unbuffered channel on which WARC errors are reported.
	// It is closed by Close.
	ErrChan chan *Error

	// verifyCerts, despite its name, holds the InsecureSkipVerify value:
	// it is the inverse of HTTPClientSettings.VerifyCerts.
	verifyCerts bool

	// TempDir is the directory for WARC temporary files ("" if unset).
	TempDir string

	// FullOnDisk stores responses only on disk instead of in memory and on
	// disk.
	FullOnDisk bool

	// MaxReadBeforeTruncate is the read limit after which records are
	// truncated (presumably in bytes); defaults to 1000000000.
	MaxReadBeforeTruncate int

	// DataTotal counts the data handled by the client (unit not visible
	// from this file).
	DataTotal *ratecounter.Counter
}
// Close shuts the client down in the following order:
//  1. It waits on c.WaitGroup.
//  2. It releases idle connections.
//  3. It closes WARCWriter.
//  4. It waits until every channel in WARCWriterDoneChannels has delivered a
//     value (or been closed).
//  5. It closes ErrChan.
//
// Close is not idempotent: the channels are closed unconditionally, so a
// second call panics. Its signature matches io.Closer; the returned error is
// always nil.
func (c *CustomHTTPClient) Close() error {
	// Wait for tracked work first (presumably so nothing sends on
	// WARCWriter after it has been closed).
	c.WaitGroup.Wait()
	c.CloseIdleConnections()

	// Closing the record channel is the shutdown signal for the writers.
	close(c.WARCWriter)

	// Receiving from each done channel in turn waits for all writers just as
	// a goroutine per channel would: either way Close cannot return before
	// the slowest writer has finished.
	for _, done := range c.WARCWriterDoneChannels {
		<-done
	}

	// ErrChan is closed last, once all writers have signalled completion.
	close(c.ErrChan)
	return nil
}
// NewWARCWritingHTTPClient builds a CustomHTTPClient wired to a WARC rotator
// created from settings.RotatorSettings. The client's custom dialer is given
// a reference to the client itself (presumably so traffic can be written as
// WARC records).
//
// Zero-valued settings are replaced with defaults:
//   - DedupeOptions.SizeThreshold: 1024
//   - MaxReadBeforeTruncate: 1000000000
//   - TCPTimeout and TLSHandshakeTimeout: 10 seconds
//
// A non-empty TempDir is created (with parents) if missing. When
// FollowRedirects is false, redirects are not followed and the redirect
// response itself is returned to the caller.
//
// On error the returned client is nil. If the failure happens after the
// rotator was started, the rotator is shut down (its record channel closed
// and its done channels drained) before returning, so its writers do not
// leak.
func NewWARCWritingHTTPClient(settings HTTPClientSettings) (httpClient *CustomHTTPClient, err error) {
	httpClient = new(CustomHTTPClient)

	// Counter for the data handled by the client.
	httpClient.DataTotal = new(ratecounter.Counter)

	// Deduplication options and the table that holds deduplication records.
	httpClient.dedupeOptions = settings.DedupeOptions
	httpClient.dedupeHashTable = new(sync.Map)
	if httpClient.dedupeOptions.SizeThreshold == 0 {
		httpClient.dedupeOptions.SizeThreshold = 1024
	}

	// HTTP status codes to skip (usually 429).
	httpClient.skipHTTPStatusCodes = settings.SkipHTTPStatusCodes

	// Unbuffered channel on which WARC errors are reported.
	httpClient.ErrChan = make(chan *Error)

	// The field holds the InsecureSkipVerify value, which is the inverse of
	// the caller's VerifyCerts flag.
	httpClient.verifyCerts = !settings.VerifyCerts

	if settings.TempDir != "" {
		httpClient.TempDir = settings.TempDir
		if err = os.MkdirAll(httpClient.TempDir, os.ModePerm); err != nil {
			return nil, err
		}
	}

	// Whether responses are stored only on disk, or in memory and on disk.
	httpClient.FullOnDisk = settings.FullOnDisk

	// Read limit after which records are truncated.
	httpClient.MaxReadBeforeTruncate = settings.MaxReadBeforeTruncate
	if httpClient.MaxReadBeforeTruncate == 0 {
		httpClient.MaxReadBeforeTruncate = 1000000000
	}

	httpClient.WaitGroup = new(WaitGroupWithCount)

	httpClient.WARCWriter, httpClient.WARCWriterDoneChannels, err = settings.RotatorSettings.NewWARCRotator()
	if err != nil {
		return nil, err
	}

	// From here on the rotator is running, and any later failure must stop
	// it again. Otherwise nothing would ever close WARCWriter or receive from
	// the done channels, and the writers would stay alive. The rotator was
	// never handed ErrChan, so draining here cannot block on an unread error.
	stopRotator := func() {
		close(httpClient.WARCWriter)
		for _, done := range httpClient.WARCWriterDoneChannels {
			<-done
		}
	}

	if !settings.FollowRedirects {
		httpClient.CheckRedirect = func(req *http.Request, via []*http.Request) error {
			// Hand the redirect response back to the caller unfollowed.
			return http.ErrUseLastResponse
		}
	}

	// settings is a value copy, so filling in defaults does not affect the
	// caller's struct.
	if settings.TCPTimeout == 0 {
		settings.TCPTimeout = 10 * time.Second
	}
	if settings.TLSHandshakeTimeout == 0 {
		settings.TLSHandshakeTimeout = 10 * time.Second
	}

	customDialer, err := newCustomDialer(httpClient, settings.Proxy, settings.TCPTimeout)
	if err != nil {
		stopRotator()
		return nil, err
	}

	customTransport, err := newCustomTransport(customDialer, settings.DecompressBody, settings.TLSHandshakeTimeout)
	if err != nil {
		stopRotator()
		return nil, err
	}

	httpClient.Transport = customTransport
	return httpClient, nil
}