Skip to content

Commit

Permalink
ZSTD Dictionary support (#41)
Browse files Browse the repository at this point in the history
* Add example dictionary generated from test WARCs with `zstd --train`

* Update .gitignore to include zstd warcs

* feat: update packages and run tidy.

* fix: use 'proper' dictionary that decodes properly.

* feat: initial commit of zstd dictionary support.

This commit allows an externally generated ZSTD dictionary to be used in the compression process. This implementation will be spec compliant with the IIPC spec and currently works with all known ZSTD WARC tools. It is currently a WIP and needs additional testing and validation to ensure everything is working correctly.

* feat: ensure TLS handshake time is being respected

* fix: run fieldalignment

warc/client.go:15:25: struct of size 176 could be 144
warc/client.go:31:23: struct of size 232 could be 216
warc/dedupe.go:23:20: struct with 32 pointer bytes could be 24
warc/dedupe.go:31:20: struct with 48 pointer bytes could be 40
warc/dialer.go:24:19: struct with 168 pointer bytes could be 160
warc/random_local_ip.go:16:19: struct with 24 pointer bytes could be 8
warc/spooled.go:40:22: struct of size 80 could be 72
warc.go:15:22: struct with 72 pointer bytes could be 64
write.go:19:13: struct with 64 pointer bytes could be 56
warc/write.go:32:18: struct with 40 pointer bytes could be 32

* fix: add comments back

* fix: run fieldalignment (again?)

warc/client.go:15:25: struct with 96 pointer bytes could be 88
warc/client.go:31:23: struct with 176 pointer bytes could be 168
  • Loading branch information
NGTmeaty authored Sep 13, 2024
1 parent 66a771b commit d23f809
Show file tree
Hide file tree
Showing 12 changed files with 250 additions and 47 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ warcs/**
temp/**
output/**
warc
*.warc.gz
*.warc.gz
*.warc.zst
32 changes: 18 additions & 14 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,36 @@ type Error struct {

type HTTPClientSettings struct {
RotatorSettings *RotatorSettings
DedupeOptions DedupeOptions
Proxy string
DecompressBody bool
SkipHTTPStatusCodes []int
VerifyCerts bool
TempDir string
FullOnDisk bool
MaxReadBeforeTruncate int
FollowRedirects bool
SkipHTTPStatusCodes []int
DedupeOptions DedupeOptions
DialTimeout time.Duration
ResponseHeaderTimeout time.Duration
TLSHandshakeTimeout time.Duration
MaxReadBeforeTruncate int
TCPTimeout time.Duration
DecompressBody bool
FollowRedirects bool
FullOnDisk bool
VerifyCerts bool
RandomLocalIP bool
}

type CustomHTTPClient struct {
WARCWriter chan *RecordBatch
WaitGroup *WaitGroupWithCount
dedupeHashTable *sync.Map
ErrChan chan *Error
http.Client
WARCWriter chan *RecordBatch
TempDir string
WARCWriterDoneChannels []chan bool
WaitGroup *WaitGroupWithCount
dedupeHashTable *sync.Map
dedupeOptions DedupeOptions
skipHTTPStatusCodes []int
ErrChan chan *Error
dedupeOptions DedupeOptions
MaxReadBeforeTruncate int
TLSHandshakeTimeout time.Duration
verifyCerts bool
TempDir string
FullOnDisk bool
MaxReadBeforeTruncate int
randomLocalIP bool
}

Expand Down Expand Up @@ -142,6 +144,8 @@ func NewWARCWritingHTTPClient(HTTPClientSettings HTTPClientSettings) (httpClient
HTTPClientSettings.TLSHandshakeTimeout = 10 * time.Second
}

httpClient.TLSHandshakeTimeout = HTTPClientSettings.TLSHandshakeTimeout

// Configure custom dialer / transport
customDialer, err := newCustomDialer(httpClient, HTTPClientSettings.Proxy, HTTPClientSettings.DialTimeout)
if err != nil {
Expand Down
139 changes: 139 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,145 @@ func TestHTTPClientWithoutChunkEncoding(t *testing.T) {
}
}

// TestHTTPClientWithZStandard sends one GET request through a WARC writing
// HTTP client whose rotator is configured with ZSTD compression (no
// dictionary), then runs testFileSingleHashCheck on every file found in the
// temporary output directory.
func TestHTTPClientWithZStandard(t *testing.T) {
	var (
		rotatorSettings = NewRotatorSettings()
		errWg           sync.WaitGroup
		err             error
	)

	// init test HTTP endpoint
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fileBytes, err := os.ReadFile(path.Join("testdata", "image.svg"))
		if err != nil {
			// The handler runs on a server goroutine; t.Fatal (FailNow) may only
			// be called from the goroutine running the test, so record the
			// failure with Errorf and answer with a 500 instead.
			t.Errorf("unable to read test fixture: %s", err)
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		w.Header().Set("Content-Type", "image/svg+xml")
		w.WriteHeader(http.StatusOK)
		if _, err := w.Write(fileBytes); err != nil {
			t.Errorf("unable to write response body: %s", err)
		}
	}))
	defer server.Close()

	rotatorSettings.OutputDirectory, err = os.MkdirTemp("", "warc-tests-")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(rotatorSettings.OutputDirectory)

	rotatorSettings.Prefix = "TESTZSTD"
	rotatorSettings.Compression = "ZSTD"

	// init the HTTP client responsible for recording HTTP(s) requests / responses
	httpClient, err := NewWARCWritingHTTPClient(HTTPClientSettings{RotatorSettings: rotatorSettings})
	if err != nil {
		t.Fatalf("Unable to init WARC writing HTTP client: %s", err)
	}

	// Surface every error reported by the WARC writer as a test failure.
	// NOTE(review): errWg is never waited on; presumably httpClient.Close()
	// closes ErrChan so this goroutine exits — confirm before adding errWg.Wait().
	errWg.Add(1)
	go func() {
		defer errWg.Done()
		for err := range httpClient.ErrChan {
			t.Errorf("Error writing to WARC: %s", err.Err.Error())
		}
	}()

	req, err := http.NewRequest("GET", server.URL, nil)
	if err != nil {
		t.Fatal(err)
	}

	resp, err := httpClient.Do(req)
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()

	// Drain the body so the full response passes through the client.
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		t.Fatalf("unable to read response body: %s", err)
	}

	httpClient.Close()

	files, err := filepath.Glob(filepath.Join(rotatorSettings.OutputDirectory, "*"))
	if err != nil {
		t.Fatal(err)
	}

	// Without this guard the loop below would pass vacuously if nothing was written.
	if len(files) == 0 {
		t.Fatalf("no WARC file found in %s", rotatorSettings.OutputDirectory)
	}

	// warcPath (rather than path) avoids shadowing the imported path package.
	for _, warcPath := range files {
		testFileSingleHashCheck(t, warcPath, "sha1:UIRWL5DFIPQ4MX3D3GFHM2HCVU3TZ6I3", []string{"26872"}, 1)
	}
}

// TestHTTPClientWithZStandardDictionary sends one GET request through a WARC
// writing HTTP client whose rotator is configured with ZSTD compression and
// the compression dictionary stored at testdata/dictionary, then runs
// testFileSingleHashCheck on every file found in the temporary output
// directory.
func TestHTTPClientWithZStandardDictionary(t *testing.T) {
	var (
		rotatorSettings = NewRotatorSettings()
		errWg           sync.WaitGroup
		err             error
	)

	// init test HTTP endpoint
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fileBytes, err := os.ReadFile(path.Join("testdata", "image.svg"))
		if err != nil {
			// The handler runs on a server goroutine; t.Fatal (FailNow) may only
			// be called from the goroutine running the test, so record the
			// failure with Errorf and answer with a 500 instead.
			t.Errorf("unable to read test fixture: %s", err)
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		w.Header().Set("Content-Type", "image/svg+xml")
		w.WriteHeader(http.StatusOK)
		if _, err := w.Write(fileBytes); err != nil {
			t.Errorf("unable to write response body: %s", err)
		}
	}))
	defer server.Close()

	rotatorSettings.OutputDirectory, err = os.MkdirTemp("", "warc-tests-")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(rotatorSettings.OutputDirectory)

	rotatorSettings.Prefix = "TESTZSTDDICT"
	rotatorSettings.Compression = "ZSTD"

	// Use predefined compression dictionary in testdata to compress with.
	rotatorSettings.CompressionDictionary = "testdata/dictionary"

	// init the HTTP client responsible for recording HTTP(s) requests / responses
	httpClient, err := NewWARCWritingHTTPClient(HTTPClientSettings{RotatorSettings: rotatorSettings})
	if err != nil {
		t.Fatalf("Unable to init WARC writing HTTP client: %s", err)
	}

	// Surface every error reported by the WARC writer as a test failure.
	// NOTE(review): errWg is never waited on; presumably httpClient.Close()
	// closes ErrChan so this goroutine exits — confirm before adding errWg.Wait().
	errWg.Add(1)
	go func() {
		defer errWg.Done()
		for err := range httpClient.ErrChan {
			t.Errorf("Error writing to WARC: %s", err.Err.Error())
		}
	}()

	req, err := http.NewRequest("GET", server.URL, nil)
	if err != nil {
		t.Fatal(err)
	}

	resp, err := httpClient.Do(req)
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()

	// Drain the body so the full response passes through the client.
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		t.Fatalf("unable to read response body: %s", err)
	}

	httpClient.Close()

	files, err := filepath.Glob(filepath.Join(rotatorSettings.OutputDirectory, "*"))
	if err != nil {
		t.Fatal(err)
	}

	// Without this guard the loop below would pass vacuously if nothing was written.
	if len(files) == 0 {
		t.Fatalf("no WARC file found in %s", rotatorSettings.OutputDirectory)
	}

	// warcPath (rather than path) avoids shadowing the imported path package.
	for _, warcPath := range files {
		testFileSingleHashCheck(t, warcPath, "sha1:UIRWL5DFIPQ4MX3D3GFHM2HCVU3TZ6I3", []string{"26872"}, 1)
	}
}

func BenchmarkConcurrentUnder2MB(b *testing.B) {
var (
rotatorSettings = NewRotatorSettings()
Expand Down
6 changes: 3 additions & 3 deletions dedupe.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ var CDXHTTPClient = http.Client{
}

type DedupeOptions struct {
LocalDedupe bool
CDXDedupe bool
CDXURL string
CDXCookie string
SizeThreshold int
LocalDedupe bool
CDXDedupe bool
}

type revisitRecord struct {
responseUUID string
targetURI string
size int
date string
size int
}

func (d *customDialer) checkLocalRevisit(digest string) revisitRecord {
Expand Down
4 changes: 2 additions & 2 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
)

type customDialer struct {
net.Dialer
proxyDialer proxy.Dialer
client *CustomHTTPClient
net.Dialer
}

func newCustomDialer(httpClient *CustomHTTPClient, proxyURL string, DialTimeout time.Duration) (d *customDialer, err error) {
Expand Down Expand Up @@ -154,7 +154,7 @@ func (d *customDialer) CustomDialTLS(network, address string) (net.Conn, error)
}

errc := make(chan error, 2)
timer := time.AfterFunc(time.Second, func() {
timer := time.AfterFunc(d.client.TLSHandshakeTimeout, func() {
errc <- errors.New("TLS handshake timeout")
})

Expand Down
2 changes: 1 addition & 1 deletion random_local_ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ var (
)

type availableIPs struct {
IPs []net.IP
sync.Mutex
Index uint32
IPs []net.IP
}

func getAvailableIPs() (IPs []net.IP, err error) {
Expand Down
2 changes: 1 addition & 1 deletion spooled.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type spooledTempFile struct {
file *os.File
filePrefix string
tempDir string
fullOnDisk bool
maxInMemorySize int
fullOnDisk bool
reading bool // transitions at most once from false -> true
closed bool
}
Expand Down
Binary file added testdata/dictionary
Binary file not shown.
74 changes: 59 additions & 15 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/sha1"
"crypto/sha256"
"encoding/base32"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
Expand Down Expand Up @@ -113,7 +114,7 @@ func isLineStartingWithHTTPMethod(line string) bool {
}

// NewWriter creates a new WARC writer.
func NewWriter(writer io.Writer, fileName string, compression string, contentLengthHeader string) (*Writer, error) {
func NewWriter(writer io.Writer, fileName string, compression string, contentLengthHeader string, newFileCreation bool, dictionary []byte) (*Writer, error) {
if compression != "" {
if compression == "GZIP" {
gzipWriter := gzip.NewWriter(writer)
Expand All @@ -125,16 +126,58 @@ func NewWriter(writer io.Writer, fileName string, compression string, contentLen
FileWriter: bufio.NewWriter(gzipWriter),
}, nil
} else if compression == "ZSTD" {
zstdWriter, err := zstd.NewWriter(writer, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
if err != nil {
return nil, err
if newFileCreation && len(dictionary) > 0 {
dictionaryZstdwriter, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
if err != nil {
return nil, err
}

// Compress dictionary with ZSTD.
// TODO: Option to allow uncompressed dictionary (maybe? not sure there's any need.)
payload := dictionaryZstdwriter.EncodeAll(dictionary, nil)

// Magic number for skippable dictionary frame (0x184D2A5D).
// https://github.com/ArchiveTeam/wget-lua/releases/tag/v1.20.3-at.20200401.01
// https://iipc.github.io/warc-specifications/specifications/warc-zstd/
magic := uint32(0x184D2A5D)

// Create the frame header (magic + payload size)
header := make([]byte, 8)
binary.LittleEndian.PutUint32(header[:4], magic)
binary.LittleEndian.PutUint32(header[4:], uint32(len(payload)))

// Combine header and payload together into a full frame.
frame := append(header, payload...)

// Write generated frame directly to WARC file.
// The regular ZStandard writer will continue afterwards with normal ZStandard frames.
writer.Write(frame)
}

// Create ZStandard writer either with or without the encoder dictionary and return it.
if len(dictionary) > 0 {
zstdWriter, err := zstd.NewWriter(writer, zstd.WithEncoderLevel(zstd.SpeedBetterCompression), zstd.WithEncoderDict(dictionary))
if err != nil {
return nil, err
}
return &Writer{
FileName: fileName,
Compression: compression,
ZSTDWriter: zstdWriter,
FileWriter: bufio.NewWriter(zstdWriter),
}, nil
} else {
zstdWriter, err := zstd.NewWriter(writer, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
if err != nil {
return nil, err
}
return &Writer{
FileName: fileName,
Compression: compression,
ZSTDWriter: zstdWriter,
FileWriter: bufio.NewWriter(zstdWriter),
}, nil
}
return &Writer{
FileName: fileName,
Compression: compression,
ZSTDWriter: zstdWriter,
FileWriter: bufio.NewWriter(zstdWriter),
}, nil
}
return nil, errors.New("invalid compression algorithm: " + compression)
}
Expand Down Expand Up @@ -166,11 +209,12 @@ func NewRecordBatch() *RecordBatch {
// and initialize it with default values
func NewRotatorSettings() *RotatorSettings {
return &RotatorSettings{
WarcinfoContent: NewHeader(),
Prefix: "WARC",
WarcSize: 1000,
Compression: "GZIP",
OutputDirectory: "./",
WarcinfoContent: NewHeader(),
Prefix: "WARC",
WarcSize: 1000,
Compression: "GZIP",
CompressionDictionary: "",
OutputDirectory: "./",
}
}

Expand Down
4 changes: 4 additions & 0 deletions utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,8 @@ func TestNewRotatorSettings(t *testing.T) {
if rotatorSettings.Compression != "GZIP" {
t.Error("Failed to set WARC rotator's compression algorithm")
}

if rotatorSettings.CompressionDictionary != "" {
t.Error("Failed to set WARC rotator's compression dictionary")
}
}
Loading

0 comments on commit d23f809

Please sign in to comment.