diff --git a/pkg/distributor/influx.go b/pkg/distributor/influx.go index 1eb9deaa92..82ae887ddd 100644 --- a/pkg/distributor/influx.go +++ b/pkg/distributor/influx.go @@ -30,24 +30,16 @@ func influxRequestParser(ctx context.Context, r *http.Request, maxSize int, _ *u spanLogger.SetTag("content_encoding", r.Header.Get("Content-Encoding")) spanLogger.SetTag("content_length", r.ContentLength) - ts, bytesRead, err := influxpush.ParseInfluxLineReader(ctx, r, maxSize) + pts, bytesRead, err := influxpush.ParseInfluxLineReader(ctx, r, maxSize) level.Debug(spanLogger).Log("msg", "decodeAndConvert complete", "bytesRead", bytesRead) if err != nil { level.Error(logger).Log("msg", "failed to parse Influx push request", "err", err) return bytesRead, err } - // Sigh, a write API optimisation needs me to jump through hoops. - pts := make([]mimirpb.PreallocTimeseries, 0, len(ts)) - for i := range ts { - pts = append(pts, mimirpb.PreallocTimeseries{ - TimeSeries: &ts[i], - }) - } - level.Debug(spanLogger).Log( "msg", "Influx to Prometheus conversion complete", - "metric_count", len(ts), + "metric_count", len(pts), ) req.Timeseries = pts diff --git a/pkg/distributor/influxpush/parser.go b/pkg/distributor/influxpush/parser.go index 8b3156da69..0f0a1c6efe 100644 --- a/pkg/distributor/influxpush/parser.go +++ b/pkg/distributor/influxpush/parser.go @@ -22,7 +22,7 @@ import ( const internalLabel = "__mimir_source__" // ParseInfluxLineReader parses a Influx Line Protocol request from an io.Reader. -func ParseInfluxLineReader(_ context.Context, r *http.Request, maxSize int) ([]mimirpb.TimeSeries, int, error) { +func ParseInfluxLineReader(_ context.Context, r *http.Request, maxSize int) ([]mimirpb.PreallocTimeseries, int, error) { qp := r.URL.Query() precision := qp.Get("precision") if precision == "" { @@ -57,32 +57,34 @@ func ParseInfluxLineReader(_ context.Context, r *http.Request, maxSize int) ([]m return ts, dataLen, err } -func writeRequestFromInfluxPoints(points []models.Point) ([]mimirpb.TimeSeries, error) { +func writeRequestFromInfluxPoints(points []models.Point) ([]mimirpb.PreallocTimeseries, error) { // Technically the same series should not be repeated. We should put all the samples for // a series in single client.Timeseries. Having said that doing it is not very optimal and the // occurrence of multiple timestamps for the same series is rare. Only reason I see it happening is // for backfilling and this is not the API for that. Keeping that in mind, we are going to create a new // client.Timeseries for each sample. - var returnTs []mimirpb.TimeSeries + returnTs := mimirpb.PreallocTimeseriesSliceFromPool()[:0] + if cap(returnTs) < len(points) { + returnTs = make([]mimirpb.PreallocTimeseries, 0, len(points)) + } for _, pt := range points { - ts, err := influxPointToTimeseries(pt) + var err error + err, returnTs = influxPointToTimeseries(pt, returnTs) if err != nil { return nil, err } - returnTs = append(returnTs, ts...) } return returnTs, nil } // Points to Prometheus is heavily inspired from https://github.com/prometheus/influxdb_exporter/blob/a1dc16ad596a990d8854545ea39a57a99a3c7c43/main.go#L148-L211 -func influxPointToTimeseries(pt models.Point) ([]mimirpb.TimeSeries, error) { - var returnTs []mimirpb.TimeSeries +func influxPointToTimeseries(pt models.Point, returnTs []mimirpb.PreallocTimeseries) (error, []mimirpb.PreallocTimeseries) { fields, err := pt.Fields() if err != nil { - return nil, fmt.Errorf("can't get fields from point: %w", err) + return fmt.Errorf("can't get fields from point: %w", err), returnTs } for field, v := range fields { var value float64 @@ -132,16 +134,18 @@ func influxPointToTimeseries(pt models.Point) ([]mimirpb.TimeSeries, error) { return lbls[i].Name < lbls[j].Name }) - returnTs = append(returnTs, mimirpb.TimeSeries{ + ts := mimirpb.TimeSeries{ Labels: lbls, Samples: []mimirpb.Sample{{ TimestampMs: util.TimeToMillis(pt.Time()), Value: value, }}, - }) + } + + returnTs = append(returnTs, mimirpb.PreallocTimeseries{TimeSeries: &ts}) } - return returnTs, nil + return nil, returnTs } // analog of invalidChars = regexp.MustCompile("[^a-zA-Z0-9_]")