Skip to content

Commit

Permalink
use mimirpb.PreallocTimeseries in influx parser
Browse files Browse the repository at this point in the history
Signed-off-by: alexgreenbank <[email protected]>
  • Loading branch information
alexgreenbank committed Dec 19, 2024
1 parent de27d4b commit af3def1
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 21 deletions.
12 changes: 2 additions & 10 deletions pkg/distributor/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,16 @@ func influxRequestParser(ctx context.Context, r *http.Request, maxSize int, _ *u
spanLogger.SetTag("content_encoding", r.Header.Get("Content-Encoding"))
spanLogger.SetTag("content_length", r.ContentLength)

ts, bytesRead, err := influxpush.ParseInfluxLineReader(ctx, r, maxSize)
pts, bytesRead, err := influxpush.ParseInfluxLineReader(ctx, r, maxSize)
level.Debug(spanLogger).Log("msg", "decodeAndConvert complete", "bytesRead", bytesRead)
if err != nil {
level.Error(logger).Log("msg", "failed to parse Influx push request", "err", err)
return bytesRead, err
}

// Sigh, a write API optimisation needs me to jump through hoops.
pts := make([]mimirpb.PreallocTimeseries, 0, len(ts))
for i := range ts {
pts = append(pts, mimirpb.PreallocTimeseries{
TimeSeries: &ts[i],
})
}

level.Debug(spanLogger).Log(
"msg", "Influx to Prometheus conversion complete",
"metric_count", len(ts),
"metric_count", len(pts),
)

req.Timeseries = pts
Expand Down
26 changes: 15 additions & 11 deletions pkg/distributor/influxpush/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
const internalLabel = "__mimir_source__"

// ParseInfluxLineReader parses a Influx Line Protocol request from an io.Reader.
func ParseInfluxLineReader(_ context.Context, r *http.Request, maxSize int) ([]mimirpb.TimeSeries, int, error) {
func ParseInfluxLineReader(_ context.Context, r *http.Request, maxSize int) ([]mimirpb.PreallocTimeseries, int, error) {
qp := r.URL.Query()
precision := qp.Get("precision")
if precision == "" {
Expand Down Expand Up @@ -57,32 +57,34 @@ func ParseInfluxLineReader(_ context.Context, r *http.Request, maxSize int) ([]m
return ts, dataLen, err
}

func writeRequestFromInfluxPoints(points []models.Point) ([]mimirpb.TimeSeries, error) {
func writeRequestFromInfluxPoints(points []models.Point) ([]mimirpb.PreallocTimeseries, error) {
// Technically the same series should not be repeated. We should put all the samples for
// a series in single client.Timeseries. Having said that doing it is not very optimal and the
// occurrence of multiple timestamps for the same series is rare. Only reason I see it happening is
// for backfilling and this is not the API for that. Keeping that in mind, we are going to create a new
// client.Timeseries for each sample.

var returnTs []mimirpb.TimeSeries
returnTs := mimirpb.PreallocTimeseriesSliceFromPool()[:0]
if cap(returnTs) < len(points) {
returnTs = make([]mimirpb.PreallocTimeseries, 0, len(points))
}
for _, pt := range points {
ts, err := influxPointToTimeseries(pt)
var err error
err, returnTs = influxPointToTimeseries(pt, returnTs)
if err != nil {
return nil, err
}
returnTs = append(returnTs, ts...)
}

return returnTs, nil
}

// Points to Prometheus is heavily inspired from https://github.com/prometheus/influxdb_exporter/blob/a1dc16ad596a990d8854545ea39a57a99a3c7c43/main.go#L148-L211
func influxPointToTimeseries(pt models.Point) ([]mimirpb.TimeSeries, error) {
var returnTs []mimirpb.TimeSeries
func influxPointToTimeseries(pt models.Point, returnTs []mimirpb.PreallocTimeseries) (error, []mimirpb.PreallocTimeseries) {

fields, err := pt.Fields()
if err != nil {
return nil, fmt.Errorf("can't get fields from point: %w", err)
return fmt.Errorf("can't get fields from point: %w", err), returnTs
}
for field, v := range fields {
var value float64
Expand Down Expand Up @@ -132,16 +134,18 @@ func influxPointToTimeseries(pt models.Point) ([]mimirpb.TimeSeries, error) {
return lbls[i].Name < lbls[j].Name
})

returnTs = append(returnTs, mimirpb.TimeSeries{
ts := mimirpb.TimeSeries{
Labels: lbls,
Samples: []mimirpb.Sample{{
TimestampMs: util.TimeToMillis(pt.Time()),
Value: value,
}},
})
}

returnTs = append(returnTs, mimirpb.PreallocTimeseries{TimeSeries: &ts})
}

return returnTs, nil
return nil, returnTs
}

// analog of invalidChars = regexp.MustCompile("[^a-zA-Z0-9_]")
Expand Down

0 comments on commit af3def1

Please sign in to comment.