Skip to content

Commit

Permalink
Add counter for the total amount of data crawled since startup (#28)
Browse files Browse the repository at this point in the history
Add: counter for the total amount of data crawled since startup
  • Loading branch information
CorentinB authored Oct 26, 2022
1 parent c655036 commit 8f76b75
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 18 deletions.
31 changes: 23 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@ import (
"net/http"
"os"
"sync"

"github.com/paulbellamy/ratecounter"
)

type HTTPClientSettings struct {
RotatorSettings *RotatorSettings
DedupeOptions DedupeOptions
Proxy string
DecompressBody bool
SkipHTTPStatusCodes []int
VerifyCerts bool
TempDir string
FullOnDisk bool
RotatorSettings *RotatorSettings
DedupeOptions DedupeOptions
Proxy string
DecompressBody bool
SkipHTTPStatusCodes []int
VerifyCerts bool
TempDir string
FullOnDisk bool
MaxReadBeforeTruncate int
}

type CustomHTTPClient struct {
Expand All @@ -29,6 +32,8 @@ type CustomHTTPClient struct {
verifyCerts bool
TempDir string
FullOnDisk bool
MaxReadBeforeTruncate int
DataTotal *ratecounter.Counter
}

func (c *CustomHTTPClient) Close() error {
Expand All @@ -55,6 +60,9 @@ func (c *CustomHTTPClient) Close() error {
func NewWARCWritingHTTPClient(HTTPClientSettings HTTPClientSettings) (httpClient *CustomHTTPClient, errChan chan error, err error) {
httpClient = new(CustomHTTPClient)

// Init data counters
httpClient.DataTotal = new(ratecounter.Counter)

// Toggle deduplication options and create map for deduplication records.
httpClient.dedupeOptions = HTTPClientSettings.DedupeOptions
httpClient.dedupeHashTable = new(sync.Map)
Expand Down Expand Up @@ -82,6 +90,13 @@ func NewWARCWritingHTTPClient(HTTPClientSettings HTTPClientSettings) (httpClient
// Configure if we are only storing responses only on disk or in memory and on disk.
httpClient.FullOnDisk = HTTPClientSettings.FullOnDisk

// Configure our max read before we start truncating records
if HTTPClientSettings.MaxReadBeforeTruncate == 0 {
httpClient.MaxReadBeforeTruncate = 1000000000
} else {
httpClient.MaxReadBeforeTruncate = HTTPClientSettings.MaxReadBeforeTruncate
}

// Configure the waitgroup
httpClient.WaitGroup = new(WaitGroupWithCount)

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
module github.com/CorentinB/warc

go 1.18
go 1.19

require (
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
github.com/klauspost/compress v1.15.1
github.com/klauspost/pgzip v1.2.5
github.com/paulbellamy/ratecounter v0.2.0
github.com/refraction-networking/utls v1.1.1
github.com/satori/go.uuid v1.2.0
go.uber.org/goleak v1.1.12
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/paulbellamy/ratecounter v0.2.0 h1:2L/RhJq+HA8gBQImDXtLPrDXK5qAj6ozWVK/zFXVJGs=
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/refraction-networking/utls v1.1.1 h1:4p66eNC+MOrL3tI7oMdA5Z8d1TgQXB8fxfuueE9DA7U=
Expand Down
17 changes: 15 additions & 2 deletions warc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"os"
"strings"
"sync"

"github.com/paulbellamy/ratecounter"
)

// RotatorSettings is used to store the settings
Expand All @@ -28,8 +30,18 @@ type RotatorSettings struct {
WARCWriterPoolSize int
}

// Create mutex to ensure we are generating WARC files one at a time and not naming them the same thing.
var fileMutex sync.Mutex
var (
// Create mutex to ensure we are generating WARC files one at a time and not naming them the same thing.
fileMutex sync.Mutex

// Create a counter to keep track of the number of bytes written to WARC files
DataTotal *ratecounter.Counter
)

func init() {
// Initialize the counters
DataTotal = new(ratecounter.Counter)
}

// NewWARCRotator creates and return a channel that can be used
// to communicate records to be written to WARC files to the
Expand Down Expand Up @@ -179,6 +191,7 @@ func recordWriter(settings *RotatorSettings, records chan *RecordBatch, done cha
warcWriter.CloseCompressedWriter()
}
}

warcWriter.FileWriter.Flush()

if recordBatch.Done != nil {
Expand Down
21 changes: 14 additions & 7 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,18 @@ type Record struct {
// WriteRecord writes a record to the underlying WARC file.
// A record consists of a version string, the record header followed by a
// record content block and two newlines:
// Version CLRF
// Header-Key: Header-Value CLRF
// CLRF
// Content
// CLRF
// CLRF
//
// Version CLRF
// Header-Key: Header-Value CLRF
// CLRF
// Content
// CLRF
// CLRF
func (w *Writer) WriteRecord(r *Record) (recordID string, err error) {
defer r.Content.Close()

var written int64

// Add the mandatories headers
if r.Header.Get("WARC-Date") == "" {
r.Header.Set("WARC-Date", time.Now().UTC().Format(time.RFC3339Nano))
Expand Down Expand Up @@ -92,10 +95,14 @@ func (w *Writer) WriteRecord(r *Record) (recordID string, err error) {
}

r.Content.Seek(0, 0)
if _, err := io.Copy(w.FileWriter, r.Content); err != nil {
if written, err = io.Copy(w.FileWriter, r.Content); err != nil {
return recordID, err
}

if written > 0 {
DataTotal.Incr(written)
}

if _, err := io.WriteString(w.FileWriter, "\r\n\r\n"); err != nil {
return recordID, err
}
Expand Down

0 comments on commit 8f76b75

Please sign in to comment.