Skip to content

Commit

Permalink
Fix revisit records (#30)
Browse files Browse the repository at this point in the history
* Fix: revisit record's dates

* Fix: revisit date

Add a test to ensure we are writing correct revisit records. It checks that the date is correct and double-checks that the digest matches between the requests.

* feat: add size threshold for writing revisit records.

---------

Co-authored-by: Jake LaFountain <[email protected]>
  • Loading branch information
CorentinB and NGTmeaty authored Apr 4, 2023
1 parent fd94387 commit c6e08a4
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 17 deletions.
5 changes: 5 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ func NewWARCWritingHTTPClient(HTTPClientSettings HTTPClientSettings) (httpClient
httpClient.dedupeOptions = HTTPClientSettings.DedupeOptions
httpClient.dedupeHashTable = new(sync.Map)

// Set default deduplication threshold to 1024 bytes
if httpClient.dedupeOptions.SizeThreshold == 0 {
httpClient.dedupeOptions.SizeThreshold = 1024
}

// Configure HTTP status code skipping (usually 429)
httpClient.skipHTTPStatusCodes = HTTPClientSettings.SkipHTTPStatusCodes

Expand Down
6 changes: 4 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ func TestHTTPClientLocalDedupe(t *testing.T) {

for _, path := range files {
testFileSingleHashCheck(t, path, "sha1:UIRWL5DFIPQ4MX3D3GFHM2HCVU3TZ6I3", []string{"26882", "142"}, 2)
testFileRevisitVailidity(t, path)
}
}

Expand Down Expand Up @@ -490,7 +491,7 @@ func TestHTTPClientRemoteDedupe(t *testing.T) {
}
}()

for i := 0; i < 2; i++ {
for i := 0; i < 4; i++ {
req, err := http.NewRequest("GET", server.URL, nil)
if err != nil {
t.Fatal(err)
Expand All @@ -515,7 +516,8 @@ func TestHTTPClientRemoteDedupe(t *testing.T) {
}

for _, path := range files {
testFileSingleHashCheck(t, path, "sha1:UIRWL5DFIPQ4MX3D3GFHM2HCVU3TZ6I3", []string{"26882", "142"}, 2)
testFileSingleHashCheck(t, path, "sha1:UIRWL5DFIPQ4MX3D3GFHM2HCVU3TZ6I3", []string{"26882", "142"}, 4)
testFileRevisitVailidity(t, path)
}
}

Expand Down
14 changes: 6 additions & 8 deletions dedupe.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ import (
"net/http"
"net/url"
"strings"
"time"
)

type DedupeOptions struct {
LocalDedupe bool
CDXDedupe bool
CDXURL string
LocalDedupe bool
CDXDedupe bool
CDXURL string
SizeThreshold int
}

type revisitRecord struct {
responseUUID string
targetURI string
date time.Time
date string
}

func (d *customDialer) checkLocalRevisit(digest string) revisitRecord {
Expand All @@ -44,12 +44,10 @@ func checkCDXRevisit(CDXURL string, digest string, targetURI string) (revisitRec
cdxReply := strings.Fields(string(body))

if len(cdxReply) >= 7 {
CDXDate, _ := time.Parse("20060102150405", cdxReply[1])

return revisitRecord{
responseUUID: "",
targetURI: cdxReply[2],
date: CDXDate,
date: cdxReply[1],
}, nil
}

Expand Down
16 changes: 9 additions & 7 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (d *customDialer) writeWARCFromConnection(reqPipe, respPipe *io.PipeReader,
d.client.dedupeHashTable.Store(r.Header.Get("WARC-Payload-Digest")[5:], revisitRecord{
responseUUID: recordIDs[i],
targetURI: warcTargetURI,
date: time.Now().UTC(),
date: batch.CaptureTime,
})
}
}
Expand All @@ -255,7 +255,7 @@ func (d *customDialer) readResponse(respPipe *io.PipeReader, warcTargetURIChanne
responseRecord.Header.Set("Content-Type", "application/http; msgtype=response")

// Read the response from the pipe
_, err := io.Copy(responseRecord.Content, respPipe)
bytesCopied, err := io.Copy(responseRecord.Content, respPipe)
if err != nil {
return err
}
Expand Down Expand Up @@ -287,16 +287,18 @@ func (d *customDialer) readResponse(respPipe *io.PipeReader, warcTargetURIChanne

// Write revisit record if local or CDX dedupe is activated
var revisit = revisitRecord{}
if d.client.dedupeOptions.LocalDedupe {
revisit = d.checkLocalRevisit(payloadDigest)
} else if d.client.dedupeOptions.CDXDedupe {
revisit, _ = checkCDXRevisit(d.client.dedupeOptions.CDXURL, payloadDigest, warcTargetURI)
if bytesCopied >= int64(d.client.dedupeOptions.SizeThreshold) {
if d.client.dedupeOptions.LocalDedupe {
revisit = d.checkLocalRevisit(payloadDigest)
} else if d.client.dedupeOptions.CDXDedupe {
revisit, _ = checkCDXRevisit(d.client.dedupeOptions.CDXURL, payloadDigest, warcTargetURI)
}
}

if revisit.targetURI != "" {
responseRecord.Header.Set("WARC-Type", "revisit")
responseRecord.Header.Set("WARC-Refers-To-Target-URI", revisit.targetURI)
responseRecord.Header.Set("WARC-Refers-To-Date", revisit.date.UTC().Format(time.RFC3339))
responseRecord.Header.Set("WARC-Refers-To-Date", revisit.date)

if revisit.responseUUID != "" {
responseRecord.Header.Set("WARC-Refers-To", "<urn:uuid:"+revisit.responseUUID+">")
Expand Down
56 changes: 56 additions & 0 deletions read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,62 @@ func testFileSingleHashCheck(t *testing.T, path string, hash string, expectedCon
return -1
}

// testFileRevisitVailidity reads the WARC file at path and validates every
// revisit record against the most recent response record that precedes it in
// the file.
//
// For each response record, its WARC-Payload-Digest and WARC-Date are
// remembered. For each revisit record, the test fails unless its
// WARC-Payload-Digest equals the remembered digest and its
// WARC-Refers-To-Date equals the remembered date. Records of any other type
// are ignored. The function returns once the reader reports io.EOF.
func testFileRevisitVailidity(t *testing.T, path string) {
	file, err := os.Open(path)
	if err != nil {
		t.Fatalf("failed to open %q: %v", path, err)
	}
	defer file.Close()

	t.Logf("checking 'WARC-Refers-To-Date' and 'WARC-Payload-Digest' for revisits on %q", path)

	reader, err := NewReader(file)
	if err != nil {
		t.Fatalf("warc.NewReader failed for %q: %v", path, err)
	}

	var (
		originalTime   string
		originalDigest string
	)

	for {
		record, err := reader.ReadRecord()
		if err == io.EOF {
			return
		}
		if err != nil {
			// The record must not be touched when ReadRecord reports an error:
			// it may be nil, and dereferencing it would hide the real failure
			// behind a panic.
			t.Fatalf("warc.ReadRecord failed: %v", err)
		}

		// Only response and revisit records are of interest here; every other
		// record type falls through to the close below.
		switch record.Header.Get("WARC-Type") {
		case "response":
			originalDigest = record.Header.Get("WARC-Payload-Digest")
			originalTime = record.Header.Get("WARC-Date")
		case "revisit":
			digest := record.Header.Get("WARC-Payload-Digest")
			refersToDate := record.Header.Get("WARC-Refers-To-Date")
			if digest != originalDigest || refersToDate != originalTime {
				// Fatalf stops the test immediately, so release the record first.
				record.Content.Close()
				t.Fatalf("Revisit digest or date doesn't match intended result %s != %s (or %s != %s)", digest, originalDigest, refersToDate, originalTime)
			}
		}

		record.Content.Close()
	}
}

func TestReader(t *testing.T) {
var paths = []string{
"testdata/test.warc.gz",
Expand Down

0 comments on commit c6e08a4

Please sign in to comment.