Skip to content

Commit

Permalink
feat(ingest): support setting fields for csv without header row
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmalkmus committed Apr 3, 2024
1 parent 465ef08 commit 1fbdf5c
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 52 deletions.
28 changes: 22 additions & 6 deletions axiom/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"net/http"
"net/url"
"strings"
"time"
"unicode"

Expand Down Expand Up @@ -342,7 +343,7 @@ func (s *DatasetsService) Ingest(ctx context.Context, id string, r io.Reader, ty
return nil, spanError(span, err)
}

if err = setEventLabels(req, opts.EventLabels); err != nil {
if err = setOptionHeaders(req, opts, typ); err != nil {
return nil, spanError(span, err)
}

Expand Down Expand Up @@ -399,6 +400,10 @@ func (s *DatasetsService) IngestEvents(ctx context.Context, id string, events []
))
defer span.End()

if len(events) == 0 {
return &ingest.Status{}, nil
}

// Apply supplied options.
var opts ingest.Options
for _, option := range options {
Expand All @@ -407,10 +412,6 @@ func (s *DatasetsService) IngestEvents(ctx context.Context, id string, events []
}
}

if len(events) == 0 {
return &ingest.Status{}, nil
}

path, err := url.JoinPath("/v1/datasets", id, "ingest")
if err != nil {
return nil, spanError(span, err)
Expand Down Expand Up @@ -461,7 +462,7 @@ func (s *DatasetsService) IngestEvents(ctx context.Context, id string, events []
}
req.GetBody = getBody

if err = setEventLabels(req, opts.EventLabels); err != nil {
if err = setOptionHeaders(req, opts, NDJSON); err != nil {
return nil, spanError(span, err)
}

Expand Down Expand Up @@ -796,6 +797,21 @@ func setLegacyQueryResultOnSpan(span trace.Span, res querylegacy.Result) {
)
}

// setOptionHeaders applies header-based ingest options to the given request:
// event labels (valid for any content type) and CSV field names (valid for
// CSV content only). It returns an error if the event labels cannot be
// encoded.
func setOptionHeaders(req *http.Request, opts ingest.Options, typ ContentType) error {
	// Set event labels, if any.
	if err := setEventLabels(req, opts.EventLabels); err != nil {
		return err
	}

	// Set CSV fields. They are only valid for CSV content and entirely
	// optional: they name the columns when the payload lacks a header row.
	// Only set the header when fields are actually configured so we never
	// send an empty "X-Axiom-CSV-Fields" header.
	if typ == CSV && len(opts.CSVFields) > 0 {
		req.Header.Set("X-Axiom-CSV-Fields", strings.Join(opts.CSVFields, ","))
	}

	return nil
}

func setEventLabels(req *http.Request, labels map[string]any) error {
if len(labels) == 0 {
return nil
Expand Down
120 changes: 78 additions & 42 deletions axiom/datasets_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,37 @@ import (
"github.com/axiomhq/axiom-go/axiom/querylegacy"
)

const ingestData = `[
{
"time": "17/May/2015:08:05:30 +0000",
"remote_ip": "93.180.71.1",
"remote_user": "-",
"request": "GET /downloads/product_1 HTTP/1.1",
"response": 304,
"bytes": 0,
"referrer": "-",
"agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"
},
{
"time": "17/May/2015:08:05:31 +0000",
"remote_ip": "93.180.71.2",
"remote_user": "-",
"request": "GET /downloads/product_1 HTTP/1.1",
"response": 304,
"bytes": 0,
"referrer": "-",
"agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"
}
]`
const (
ingestData = `[
{
"time": "17/May/2015:08:05:30 +0000",
"remote_ip": "93.180.71.1",
"remote_user": "-",
"request": "GET /downloads/product_1 HTTP/1.1",
"response": 304,
"bytes": 0,
"referrer": "-",
"agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"
},
{
"time": "17/May/2015:08:05:31 +0000",
"remote_ip": "93.180.71.2",
"remote_user": "-",
"request": "GET /downloads/product_1 HTTP/1.1",
"response": 304,
"bytes": 0,
"referrer": "-",
"agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"
}
]`

csvIngestData = `17/May/2015:08:05:30 +0000,93.180.71.1,-,GET /downloads/product_1 HTTP/1.1,304,0,-,Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21
17/May/2015:08:05:31 +0000,93.180.71.2,-,GET /downloads/product_1 HTTP/1.1,304,0,-,Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)`

csvIngestDataHeader = `time,remote_ip,remote_user,request,response,bytes,referrer,agent
17/May/2015:08:05:30 +0000,93.180.71.1,-,GET /downloads/product_1 HTTP/1.1,304,0,-,Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21
17/May/2015:08:05:31 +0000,93.180.71.2,-,GET /downloads/product_1 HTTP/1.1,304,0,-,Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)`
)

var ingestEvents = []axiom.Event{
{
Expand Down Expand Up @@ -137,9 +146,9 @@ func (s *DatasetsTestSuite) Test() {
ingested bytes.Buffer
r io.Reader

resetBuffer = func(contentEncoders ...axiom.ContentEncoder) {
resetBuffer = func(data string, contentEncoders ...axiom.ContentEncoder) {
ingested.Reset()
r = io.TeeReader(strings.NewReader(ingestData), &ingested)
r = io.TeeReader(strings.NewReader(data), &ingested)

for _, contentEncoder := range contentEncoders {
var ceErr error
Expand All @@ -149,7 +158,7 @@ func (s *DatasetsTestSuite) Test() {
}
)

resetBuffer()
resetBuffer(ingestData)
ingestStatus, err := s.client.Datasets.Ingest(s.ctx, s.dataset.ID, r, axiom.JSON, axiom.Identity, ingest.SetEventLabel("region", "eu-west-1"))
s.Require().NoError(err)
s.Require().NotNil(ingestStatus)
Expand All @@ -160,7 +169,7 @@ func (s *DatasetsTestSuite) Test() {
s.EqualValues(ingested.Len()+22, ingestStatus.ProcessedBytes) // 22 bytes extra for the event label

// ... but gzip encoded...
resetBuffer(axiom.GzipEncoder())
resetBuffer(ingestData, axiom.GzipEncoder())
ingestStatus, err = s.client.Datasets.Ingest(s.ctx, s.dataset.ID, r, axiom.JSON, axiom.Gzip)
s.Require().NoError(err)
s.Require().NotNil(ingestStatus)
Expand All @@ -171,7 +180,7 @@ func (s *DatasetsTestSuite) Test() {
s.EqualValues(ingested.Len(), ingestStatus.ProcessedBytes)

// ... but zstd encoded...
resetBuffer(axiom.ZstdEncoder())
resetBuffer(ingestData, axiom.ZstdEncoder())
ingestStatus, err = s.client.Datasets.Ingest(s.ctx, s.dataset.ID, r, axiom.JSON, axiom.Zstd)
s.Require().NoError(err)
s.Require().NotNil(ingestStatus)
Expand All @@ -182,7 +191,6 @@ func (s *DatasetsTestSuite) Test() {
s.EqualValues(ingested.Len(), ingestStatus.ProcessedBytes)

// ... and from a map source...
resetBuffer()
ingestStatus, err = s.client.Datasets.IngestEvents(s.ctx, s.dataset.ID, ingestEvents)
s.Require().NoError(err)
s.Require().NotNil(ingestStatus)
Expand All @@ -192,8 +200,7 @@ func (s *DatasetsTestSuite) Test() {
s.Empty(ingestStatus.Failures)
s.EqualValues(448, int(ingestStatus.ProcessedBytes))

// ... and from a channel source.
resetBuffer()
// ... and from a channel source ...
ingestStatus, err = s.client.Datasets.IngestChannel(s.ctx, s.dataset.ID, getEventChan())
s.Require().NoError(err)
s.Require().NotNil(ingestStatus)
Expand All @@ -203,6 +210,29 @@ func (s *DatasetsTestSuite) Test() {
s.Empty(ingestStatus.Failures)
s.EqualValues(448, int(ingestStatus.ProcessedBytes))

// ... and from a CSV reader source with header...
resetBuffer(csvIngestDataHeader)
ingestStatus, err = s.client.Datasets.Ingest(s.ctx, s.dataset.ID, r, axiom.CSV, axiom.Identity)
s.Require().NoError(err)
s.Require().NotNil(ingestStatus)

s.EqualValues(ingestStatus.Ingested, 2)
s.Zero(ingestStatus.Failed)
s.Empty(ingestStatus.Failures)
s.EqualValues(325, int(ingestStatus.ProcessedBytes))

// ... and from a CSV reader source without header.
resetBuffer(csvIngestData)
ingestStatus, err = s.client.Datasets.Ingest(s.ctx, s.dataset.ID, r, axiom.CSV, axiom.Identity,
ingest.SetCSVFields("time", "remote_ip", "remote_user", "request", "response", "bytes", "referrer", "agent"))
s.Require().NoError(err)
s.Require().NotNil(ingestStatus)

s.EqualValues(ingestStatus.Ingested, 2)
s.Zero(ingestStatus.Failed)
s.Empty(ingestStatus.Failures)
s.EqualValues(258, int(ingestStatus.ProcessedBytes))

now := time.Now().Truncate(time.Second)
startTime := now.Add(-time.Minute)
endTime := now.Add(time.Minute)
Expand All @@ -216,9 +246,9 @@ func (s *DatasetsTestSuite) Test() {
s.Require().NoError(err)
s.Require().NotNil(queryResult)

s.EqualValues(10, queryResult.Status.RowsExamined)
s.EqualValues(10, queryResult.Status.RowsMatched)
s.Len(queryResult.Matches, 10)
s.EqualValues(14, queryResult.Status.RowsExamined)
s.EqualValues(14, queryResult.Status.RowsMatched)
s.Len(queryResult.Matches, 14)

// Also run a legacy query and make sure we see some results.
legacyQueryResult, err := s.client.Datasets.QueryLegacy(s.ctx, s.dataset.ID, querylegacy.Query{
Expand All @@ -228,9 +258,9 @@ func (s *DatasetsTestSuite) Test() {
s.Require().NoError(err)
s.Require().NotNil(legacyQueryResult)

s.EqualValues(10, legacyQueryResult.Status.RowsExamined)
s.EqualValues(10, legacyQueryResult.Status.RowsMatched)
s.Len(legacyQueryResult.Matches, 10)
s.EqualValues(14, legacyQueryResult.Status.RowsExamined)
s.EqualValues(14, legacyQueryResult.Status.RowsMatched)
s.Len(legacyQueryResult.Matches, 14)

// Run a more complex legacy query.
complexLegacyQuery := querylegacy.Query{
Expand All @@ -245,9 +275,15 @@ func (s *DatasetsTestSuite) Test() {
},
GroupBy: []string{"success", "remote_ip"},
Filter: querylegacy.Filter{
Op: querylegacy.OpEqual,
Op: querylegacy.OpExists,
Field: "response",
Value: 304,
Children: []querylegacy.Filter{
{
Op: querylegacy.OpContains,
Field: "request",
Value: "GET",
},
},
},
Order: []querylegacy.Order{
{
Expand All @@ -262,7 +298,7 @@ func (s *DatasetsTestSuite) Test() {
VirtualFields: []querylegacy.VirtualField{
{
Alias: "success",
Expression: "response < 400",
Expression: "toint(response) < 400",
},
},
Projections: []querylegacy.Projection{
Expand All @@ -277,12 +313,12 @@ func (s *DatasetsTestSuite) Test() {
s.Require().NoError(err)
s.Require().NotNil(complexLegacyQueryResult)

s.EqualValues(10, complexLegacyQueryResult.Status.RowsExamined)
s.EqualValues(10, complexLegacyQueryResult.Status.RowsMatched)
s.EqualValues(14, complexLegacyQueryResult.Status.RowsExamined)
s.EqualValues(14, complexLegacyQueryResult.Status.RowsMatched)
if s.Len(complexLegacyQueryResult.Buckets.Totals, 2) {
agg := complexLegacyQueryResult.Buckets.Totals[0].Aggregations[0]
s.EqualValues("event_count", agg.Alias)
s.EqualValues(5, agg.Value)
s.EqualValues(7, agg.Value)
}

// Trim the dataset down to a minimum.
Expand Down
6 changes: 3 additions & 3 deletions axiom/datasets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,13 +426,13 @@ func TestDatasetsService_Ingest(t *testing.T) {
hf := func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, http.MethodPost, r.Method)
assert.Equal(t, mediaTypeJSON, r.Header.Get("Content-Type"))
eventLabels := assertValidJSON(t, strings.NewReader(r.Header.Get("X-Axiom-Event-Labels")))
assert.Equal(t, "eu-west-1", eventLabels[0].(map[string]any)["region"])
assert.EqualValues(t, 1, eventLabels[0].(map[string]any)["instance"])

assert.Equal(t, "time", r.URL.Query().Get("timestamp-field"))
assert.Equal(t, "2/Jan/2006:15:04:05 +0000", r.URL.Query().Get("timestamp-format"))
assert.Equal(t, ";", r.URL.Query().Get("csv-delimiter"))
eventLabels := assertValidJSON(t, strings.NewReader(r.Header.Get("X-Axiom-Event-Labels")))
assert.Equal(t, "eu-west-1", eventLabels[0].(map[string]any)["region"])
assert.EqualValues(t, 1, eventLabels[0].(map[string]any)["instance"])

_ = assertValidJSON(t, r.Body)

Expand Down
21 changes: 20 additions & 1 deletion axiom/ingest/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ type Options struct {
// event data. This is especially useful when ingesting events from a
// third-party source that you do not have control over.
EventLabels map[string]any `url:"-"`
	// CSVFields is a list of field names that map to the columns of the CSV
	// content, in order. This is only valid for CSV content and completely
	// optional. It comes in handy when the CSV content does not have a
	// header row.
CSVFields []string `url:"-"`
}

// An Option applies optional parameters to an ingest operation.
Expand Down Expand Up @@ -52,7 +56,7 @@ func SetCSVDelimiter(delim string) Option {
func SetEventLabel(key string, value any) Option {
return func(o *Options) {
if o.EventLabels == nil {
o.EventLabels = make(map[string]any)
o.EventLabels = make(map[string]any, 1)
}
o.EventLabels[key] = value
}
Expand All @@ -63,3 +67,18 @@ func SetEventLabel(key string, value any) Option {
func SetEventLabels(labels map[string]any) Option {
	return func(o *Options) {
		// Replace any previously configured labels wholesale; the map is
		// stored as-is, not copied.
		o.EventLabels = labels
	}
}

// AddCSVField adds one or more fields to be ingested with every CSV event.
// Fields are appended to any fields configured by previous options.
func AddCSVField(field ...string) Option {
	return func(o *Options) {
		// append handles a nil slice, so no explicit initialization is
		// needed before appending.
		o.CSVFields = append(o.CSVFields, field...)
	}
}

// SetCSVFields sets the fields to be ingested with every CSV event.
func SetCSVFields(fields ...string) Option {
	return func(o *Options) {
		// Overwrite, rather than append to, any previously configured
		// fields.
		o.CSVFields = fields
	}
}
38 changes: 38 additions & 0 deletions axiom/ingest/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,44 @@ func TestOptions(t *testing.T) {
},
},
},
{
name: "add csv field",
options: []ingest.Option{
ingest.AddCSVField("foo"),
},
want: ingest.Options{
CSVFields: []string{"foo"},
},
},
{
name: "add multiple csv fields",
options: []ingest.Option{
ingest.AddCSVField("foo"),
ingest.AddCSVField("bar", "baz"),
},
want: ingest.Options{
CSVFields: []string{"foo", "bar", "baz"},
},
},
{
name: "set csv fields",
options: []ingest.Option{
ingest.SetCSVFields("foo", "bar"),
},
want: ingest.Options{
CSVFields: []string{"foo", "bar"},
},
},
{
name: "set csv fields on existing csv fields",
options: []ingest.Option{
ingest.SetCSVFields("foo", "bar"),
ingest.SetCSVFields("bar", "foo"),
},
want: ingest.Options{
CSVFields: []string{"bar", "foo"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down

0 comments on commit 1fbdf5c

Please sign in to comment.