Skip to content

Commit

Permalink
Add GZIP compression to collector endpoint (#95)
Browse files Browse the repository at this point in the history
* Add GZIP compression to collector endpoint

* Check 'Accept-Encoding' header
  • Loading branch information
Ivaka authored Jan 26, 2024
1 parent 3b2b8f8 commit 71a3541
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 7 deletions.
48 changes: 45 additions & 3 deletions collector/collector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package collector

import (
"compress/gzip"
"context"
"encoding/binary"
"fmt"
Expand All @@ -22,6 +23,11 @@ import (
"github.com/castai/egressd/pb"
)

var (
acceptEncoding = http.CanonicalHeaderKey("Accept-Encoding")
contentEncoding = http.CanonicalHeaderKey("Content-Encoding")
)

func CurrentTimeGetter() func() time.Time {
return func() time.Time {
return time.Now()
Expand Down Expand Up @@ -139,10 +145,20 @@ func (c *Collector) GetRawNetworkMetricsHandler(w http.ResponseWriter, req *http
w.WriteHeader(http.StatusInternalServerError)
return
}
if _, err := w.Write(batchBytes); err != nil {
c.log.Errorf("write batch: %v", err)
return

enc := req.Header.Get(acceptEncoding)
if strings.Contains(strings.ToLower(enc), "gzip") {
if err := c.writeGzipBody(w, batchBytes); err != nil {
c.log.Errorf("write batch %v", err)
return
}
} else {
if err := c.writePlainBody(w, batchBytes); err != nil {
c.log.Errorf("write batch %v", err)
return
}
}

if c.cfg.SendTrafficDelta {
// reset metric tx/rx values, so only delta numbers will be sent with the next batch
for _, m := range c.podMetrics {
Expand All @@ -154,6 +170,32 @@ func (c *Collector) GetRawNetworkMetricsHandler(w http.ResponseWriter, req *http
}
}

func (c *Collector) writeGzipBody(w http.ResponseWriter, body []byte) error {
writer, err := gzip.NewWriterLevel(w, gzip.BestCompression)
if err != nil {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return fmt.Errorf("cannot create gzip writer: %w", err)
}
defer writer.Close()

w.Header().Add(contentEncoding, "gzip")

if _, err := writer.Write(body); err != nil {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return err
}

return nil
}

func (c *Collector) writePlainBody(w http.ResponseWriter, body []byte) error {
if _, err := w.Write(body); err != nil {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return err
}
return nil
}

// collect aggregates conntract records into reduced pod metrics.
func (c *Collector) collect() error {
start := time.Now()
Expand Down
27 changes: 23 additions & 4 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package collector

import (
"compress/gzip"
"io"
"net/http"
"net/http/httptest"
"sort"
Expand Down Expand Up @@ -420,11 +422,19 @@ func TestCollector__GetRawNetworkMetricsHandler(t *testing.T) {

w := httptest.NewRecorder()
// scrape pods metrics and prepare them to be sent
coll.GetRawNetworkMetricsHandler(w, &http.Request{})
coll.GetRawNetworkMetricsHandler(w, &http.Request{Header: http.Header{
"Accept-Encoding": []string{"gzip"},
}})
r.Equal(200, w.Code)

batch := &pb.RawNetworkMetricBatch{}
err := proto.Unmarshal(w.Body.Bytes(), batch)
gzreader, err := gzip.NewReader(w.Body)
r.NoError(err)

body, err := io.ReadAll(gzreader)
r.NoError(err)

err = proto.Unmarshal(body, batch)
r.NoError(err)
r.Len(batch.Items, 2)
sort.Slice(batch.Items, func(i, j int) bool {
Expand Down Expand Up @@ -507,11 +517,20 @@ func TestCollector__GetRawNetworkMetricsHandler(t *testing.T) {

w := httptest.NewRecorder()
// scrape pods metrics and prepare them to be sent
coll.GetRawNetworkMetricsHandler(w, &http.Request{})
coll.GetRawNetworkMetricsHandler(w, &http.Request{Header: http.Header{
"Accept-Encoding": []string{"gzip"},
}})
r.Equal(200, w.Code)

batch := &pb.RawNetworkMetricBatch{}
err := proto.Unmarshal(w.Body.Bytes(), batch)

gzreader, err := gzip.NewReader(w.Body)
r.NoError(err)

body, err := io.ReadAll(gzreader)
r.NoError(err)

err = proto.Unmarshal(body, batch)
r.NoError(err)
r.Len(batch.Items, 2)
sort.Slice(batch.Items, func(i, j int) bool {
Expand Down

0 comments on commit 71a3541

Please sign in to comment.