Skip to content

Commit

Permalink
Add argument to ReadRecord to process payload fully locally, instead …
Browse files Browse the repository at this point in the history
…of loading everything in RAM
  • Loading branch information
CorentinB committed Dec 23, 2020
1 parent c64f63f commit 3e3b48a
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 12 deletions.
67 changes: 58 additions & 9 deletions read.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ func readUntilDelim(r reader, delim []byte) (line []byte, err error) {
}

// ReadRecord reads the next record from the opened WARC file.
func (r *Reader) ReadRecord() (*Record, error) {
// If onDisk is true, the record's payload is written to a temporary
// file on disk and that file's path is stored in Record.PayloadPath;
// otherwise, everything happens in memory.
func (r *Reader) ReadRecord(onDisk bool) (*Record, error) {
r.gzipReader.Multistream(false)

// Dump gzip block to a temporary file
Expand Down Expand Up @@ -108,16 +111,62 @@ func (r *Reader) ReadRecord() (*Record, error) {
}
}

content, err := ioutil.ReadAll(tempReader)
if err != nil {
return nil, err
}
// If onDisk is specified, then we write the payload to a new temp file
if onDisk {
payloadTempFile, err := ioutil.TempFile("", "warc-reading-*")
if err != nil {
return nil, err
}
defer payloadTempFile.Close()

// Copy all the payload (including the potential trailing CRLF)
// to a newly created temporary file
_, err = io.Copy(payloadTempFile, tempReader)
if err != nil {
payloadTempFile.Close()
os.Remove(payloadTempFile.Name())
return nil, err
}

// Check whether the last 4 bytes are \r\n\r\n;
// if so, truncate those last 4 bytes
buf := make([]byte, 16)
stats, err := os.Stat(payloadTempFile.Name())
if err != nil {
payloadTempFile.Close()
os.Remove(payloadTempFile.Name())
return nil, err
}

content = bytes.TrimSuffix(content, []byte("\r\n\r\n"))
start := stats.Size() - 16
_, err = payloadTempFile.ReadAt(buf, start)
if err != nil {
payloadTempFile.Close()
os.Remove(payloadTempFile.Name())
return nil, err
}

r.record = &Record{
Header: header,
Content: bytes.NewReader(content),
if bytes.HasSuffix(buf, []byte("\r\n\r\n")) {
os.Truncate(payloadTempFile.Name(), stats.Size()-4)
}

r.record = &Record{
Header: header,
Content: nil,
PayloadPath: payloadTempFile.Name(),
}
} else {
content, err := ioutil.ReadAll(tempReader)
if err != nil {
return nil, err
}

content = bytes.TrimSuffix(content, []byte("\r\n\r\n"))

r.record = &Record{
Header: header,
Content: bytes.NewReader(content),
}
}

// Reset the reader for the next block
Expand Down
6 changes: 3 additions & 3 deletions read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func testFileHash(t *testing.T, path string) {
}

for {
record, err := reader.ReadRecord()
record, err := reader.ReadRecord(false)
if err != nil {
if err != io.EOF {
t.Fatalf("failed to read all record content: %v", err)
Expand Down Expand Up @@ -58,7 +58,7 @@ func testFileScan(t *testing.T, path string) {

total := 0
for {
if _, err := reader.ReadRecord(); err != nil {
if _, err := reader.ReadRecord(false); err != nil {
break
}
total++
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestSimpleWriteRead(t *testing.T) {
// We read the records and test if we get the expected output
for i, testRecord := range testRecords {
t.Logf("reading record %d", i)
record, err := reader.ReadRecord()
record, err := reader.ReadRecord(false)
if err != nil {
t.Fatalf("expected record, got %v", err)
}
Expand Down

0 comments on commit 3e3b48a

Please sign in to comment.