diff --git a/go.mod b/go.mod
index cf8371c8b3c..800daf8da79 100644
--- a/go.mod
+++ b/go.mod
@@ -276,7 +276,7 @@ require (
 )
 
 // Using a fork of Prometheus with Mimir-specific changes.
-replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240812035817-c17c11f77f03
+replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240820071320-b0cc5ec7c595
 
 // Replace memberlist with our fork which includes some fixes that haven't been
 // merged upstream yet:
diff --git a/go.sum b/go.sum
index e330586c7d2..bc718c72eef 100644
--- a/go.sum
+++ b/go.sum
@@ -524,8 +524,8 @@ github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wp
 github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU=
 github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
 github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
-github.com/grafana/mimir-prometheus v0.0.0-20240812035817-c17c11f77f03 h1:Vkq7Jib3/vBZO3oyxzSUcbUdpyK2rm3CzFIJyt4vtfQ=
-github.com/grafana/mimir-prometheus v0.0.0-20240812035817-c17c11f77f03/go.mod h1:cNDAD0ooSyLfNtakmnGbChNg7JPYmKsRn7CQ01Rpu2E=
+github.com/grafana/mimir-prometheus v0.0.0-20240820071320-b0cc5ec7c595 h1:s9H0moHZeFYlDQ1Eich7N0gPfNXUjB6C7ARauy85mTA=
+github.com/grafana/mimir-prometheus v0.0.0-20240820071320-b0cc5ec7c595/go.mod h1:cNDAD0ooSyLfNtakmnGbChNg7JPYmKsRn7CQ01Rpu2E=
 github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
 github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
 github.com/grafana/prometheus-alertmanager v0.25.1-0.20240625192351-66ec17e3aa45 h1:AJKOtDKAOg8XNFnIZSmqqqutoTSxVlRs6vekL2p2KEY=
diff --git a/vendor/github.com/prometheus/prometheus/promql/engine.go b/vendor/github.com/prometheus/prometheus/promql/engine.go
index 94df60e00bf..c2d7c690af6 100644
--- a/vendor/github.com/prometheus/prometheus/promql/engine.go
+++ b/vendor/github.com/prometheus/prometheus/promql/engine.go
@@ -713,6 +713,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
 		lookbackDelta:            s.LookbackDelta,
 		samplesStats:             query.sampleStats,
 		noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn,
+		querier:                  querier,
 	}
 	query.sampleStats.InitStepTracking(start, start, 1)
 
@@ -771,6 +772,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
 		lookbackDelta:            s.LookbackDelta,
 		samplesStats:             query.sampleStats,
 		noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn,
+		querier:                  querier,
 	}
 	query.sampleStats.InitStepTracking(evaluator.startTimestamp, evaluator.endTimestamp, evaluator.interval)
 	val, warnings, err := evaluator.Eval(s.Expr)
@@ -939,6 +941,7 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s
 			evalRange = 0
 			hints.By, hints.Grouping = extractGroupsFromPath(path)
 			n.UnexpandedSeriesSet = querier.Select(ctx, false, hints, n.LabelMatchers...)
+			n.SelectHints = hints
 
 		case *parser.MatrixSelector:
 			evalRange = n.Range
@@ -977,10 +980,10 @@ func extractGroupsFromPath(p []parser.Node) (bool, []string) {
 	return false, nil
 }
 
-func checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (annotations.Annotations, error) {
+func (ev *evaluator) checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (annotations.Annotations, error) {
 	switch e := expr.(type) {
 	case *parser.MatrixSelector:
-		return checkAndExpandSeriesSet(ctx, e.VectorSelector)
+		return ev.checkAndExpandSeriesSet(ctx, e.VectorSelector)
 	case *parser.VectorSelector:
 		if e.Series != nil {
 			return nil, nil
@@ -992,6 +995,7 @@ func checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (annotations
 			}
 		}
 		e.Series = series
+		ev.selectHints = e.SelectHints
 		return ws, err
 	}
 	return nil, nil
@@ -1033,6 +1037,11 @@ type evaluator struct {
 	lookbackDelta            time.Duration
 	samplesStats             *stats.QuerySamples
 	noStepSubqueryIntervalFn func(rangeMillis int64) int64
+
+	querier storage.Querier
+
+	// selectHints is the top-most SelectHints in the expression tree.
+	selectHints *storage.SelectHints
 }
 
 // errorf causes a panic with the input formatted into an error.
@@ -1102,6 +1111,10 @@ type EvalNodeHelper struct {
 	rightSigs    map[string]Sample
 	matchedSigs  map[string]map[uint64]struct{}
 	resultMetric map[string]labels.Labels
+
+	// For base and info vector matching.
+	infoSamplesBySig map[string]Sample
+	labelBuilder     *labels.ScratchBuilder
 }
 
 func (enh *EvalNodeHelper) resetBuilder(lbls labels.Labels) {
@@ -1420,6 +1433,64 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
 	return result, warnings
 }
 
+// expandSeriesToMatrix expands a set of storage.Series to a Matrix.
+func (ev *evaluator) expandSeriesToMatrix(series []storage.Series, offset time.Duration, start, end, interval int64) Matrix {
+	numSteps := int((end-start)/interval) + 1
+
+	mat := make(Matrix, 0, len(series))
+	var prevSS *Series
+	it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta))
+	var chkIter chunkenc.Iterator
+	for _, s := range series {
+		if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
+			ev.error(err)
+		}
+
+		chkIter = s.Iterator(chkIter)
+		it.Reset(chkIter)
+		ss := Series{
+			Metric: s.Labels(),
+		}
+
+		for ts, step := start, -1; ts <= end; ts += interval {
+			step++
+			_, f, h, ok := ev.vectorSelectorSingle(it, offset, ts)
+			if ok {
+				if h == nil {
+					ev.currentSamples++
+					ev.samplesStats.IncrementSamplesAtStep(step, 1)
+					if ev.currentSamples > ev.maxSamples {
+						ev.error(ErrTooManySamples(env))
+					}
+					if ss.Floats == nil {
+						ss.Floats = reuseOrGetFPointSlices(prevSS, numSteps)
+					}
+					ss.Floats = append(ss.Floats, FPoint{F: f, T: ts})
+				} else {
+					point := HPoint{H: h, T: ts}
+					histSize := point.size()
+					ev.currentSamples += histSize
+					ev.samplesStats.IncrementSamplesAtStep(step, int64(histSize))
+					if ev.currentSamples > ev.maxSamples {
+						ev.error(ErrTooManySamples(env))
+					}
+					if ss.Histograms == nil {
+						ss.Histograms = reuseOrGetHPointSlices(prevSS, numSteps)
+					}
+					ss.Histograms = append(ss.Histograms, point)
+				}
+			}
+		}
+
+		if len(ss.Floats)+len(ss.Histograms) > 0 {
+			mat = append(mat, ss)
+			prevSS = &mat[len(mat)-1]
+		}
+	}
+	ev.samplesStats.UpdatePeak(ev.currentSamples)
+	return mat
+}
+
 // evalSubquery evaluates given SubqueryExpr and returns an equivalent
 // evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
 func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) {
@@ -1564,6 +1635,8 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
 			return ev.evalLabelReplace(e.Args)
 		case "label_join":
 			return ev.evalLabelJoin(e.Args)
+		case "info":
+			return ev.evalInfo(ev.ctx, e.Args)
 		}
 
 		if !matrixArg {
@@ -1594,7 +1667,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
 		sel := arg.(*parser.MatrixSelector)
 		selVS := sel.VectorSelector.(*parser.VectorSelector)
 
-		ws, err := checkAndExpandSeriesSet(ev.ctx, sel)
+		ws, err := ev.checkAndExpandSeriesSet(ev.ctx, sel)
 		warnings.Merge(ws)
 		if err != nil {
 			ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), warnings})
@@ -1840,60 +1913,11 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
 		return String{V: e.Val, T: ev.startTimestamp}, nil
 
 	case *parser.VectorSelector:
-		ws, err := checkAndExpandSeriesSet(ev.ctx, e)
+		ws, err := ev.checkAndExpandSeriesSet(ev.ctx, e)
 		if err != nil {
 			ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
 		}
-		mat := make(Matrix, 0, len(e.Series))
-		var prevSS *Series
-		it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta))
-		var chkIter chunkenc.Iterator
-		for i, s := range e.Series {
-			if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
-				ev.error(err)
-			}
-			chkIter = s.Iterator(chkIter)
-			it.Reset(chkIter)
-			ss := Series{
-				Metric: e.Series[i].Labels(),
-			}
-
-			for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval {
-				step++
-				_, f, h, ok := ev.vectorSelectorSingle(it, e, ts)
-				if ok {
-					if h == nil {
-						ev.currentSamples++
-						ev.samplesStats.IncrementSamplesAtStep(step, 1)
-						if ev.currentSamples > ev.maxSamples {
-							ev.error(ErrTooManySamples(env))
-						}
-						if ss.Floats == nil {
-							ss.Floats = reuseOrGetFPointSlices(prevSS, numSteps)
-						}
-						ss.Floats = append(ss.Floats, FPoint{F: f, T: ts})
-					} else {
-						point := HPoint{H: h, T: ts}
-						histSize := point.size()
-						ev.currentSamples += histSize
-						ev.samplesStats.IncrementSamplesAtStep(step, int64(histSize))
-						if ev.currentSamples > ev.maxSamples {
-							ev.error(ErrTooManySamples(env))
-						}
-						if ss.Histograms == nil {
-							ss.Histograms = reuseOrGetHPointSlices(prevSS, numSteps)
-						}
-						ss.Histograms = append(ss.Histograms, point)
-					}
-				}
-			}
-
-			if len(ss.Floats)+len(ss.Histograms) > 0 {
-				mat = append(mat, ss)
-				prevSS = &mat[len(mat)-1]
-			}
-		}
-		ev.samplesStats.UpdatePeak(ev.currentSamples)
+		mat := ev.expandSeriesToMatrix(e.Series, e.Offset, ev.startTimestamp, ev.endTimestamp, ev.interval)
 		return mat, ws
 
 	case *parser.MatrixSelector:
@@ -2036,7 +2060,7 @@ func reuseOrGetFPointSlices(prevSS *Series, numSteps int) (r []FPoint) {
 }
 
 func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, annotations.Annotations) {
-	ws, err := checkAndExpandSeriesSet(ev.ctx, vs)
+	ws, err := ev.checkAndExpandSeriesSet(ev.ctx, vs)
 	if err != nil {
 		ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
 	}
@@ -2058,7 +2082,7 @@ func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.Vec
 	vec := make(Vector, 0, len(vs.Series))
 	for i, s := range vs.Series {
 		it := seriesIterators[i]
-		t, _, _, ok := ev.vectorSelectorSingle(it, vs, enh.Ts)
+		t, _, _, ok := ev.vectorSelectorSingle(it, vs.Offset, enh.Ts)
 		if !ok {
 			continue
 		}
@@ -2082,10 +2106,10 @@ func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.Vec
 }
 
 // vectorSelectorSingle evaluates an instant vector for the iterator of one time series.
-func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, node *parser.VectorSelector, ts int64) (
+func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, offset time.Duration, ts int64) (
 	int64, float64, *histogram.FloatHistogram, bool,
 ) {
-	refTime := ts - durationMilliseconds(node.Offset)
+	refTime := ts - durationMilliseconds(offset)
 	var t int64
 	var v float64
 	var h *histogram.FloatHistogram
@@ -2194,7 +2218,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annota
 		it = storage.NewBuffer(durationMilliseconds(node.Range))
 	)
 
-	ws, err := checkAndExpandSeriesSet(ev.ctx, node)
+	ws, err := ev.checkAndExpandSeriesSet(ev.ctx, node)
 	if err != nil {
 		ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
 	}
diff --git a/vendor/github.com/prometheus/prometheus/promql/functions.go b/vendor/github.com/prometheus/prometheus/promql/functions.go
index 018023bf028..c71248a3f84 100644
--- a/vendor/github.com/prometheus/prometheus/promql/functions.go
+++ b/vendor/github.com/prometheus/prometheus/promql/functions.go
@@ -1493,7 +1493,7 @@ func (ev *evaluator) evalLabelJoin(args parser.Expressions) (parser.Value, annot
 
 // === label_join(vector model.ValVector, dest_labelname, separator, src_labelname...) (Vector, Annotations) ===
 func funcLabelJoin(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
-	panic("funcLabelReplace wrong implementation called")
+	panic("funcLabelJoin wrong implementation called")
 }
 
 // Common code for date related functions.
@@ -1572,6 +1572,11 @@ func funcYear(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper)
 	}), nil
 }
 
+// === info(matrix parser.ValueTypeVector, [ls label-selector]) (Vector, Annotations) ===
+func funcInfo(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
+	panic("funcInfo wrong implementation called")
+}
+
 // FunctionCalls is a list of all functions supported by PromQL, including their types.
 var FunctionCalls = map[string]FunctionCall{
 	"abs": funcAbs,
@@ -1612,6 +1617,7 @@ var FunctionCalls = map[string]FunctionCall{
 	"hour":     funcHour,
 	"idelta":   funcIdelta,
 	"increase": funcIncrease,
+	"info":     funcInfo,
 	"irate":    funcIrate,
 	"label_replace": funcLabelReplace,
 	"label_join":    funcLabelJoin,
diff --git a/vendor/github.com/prometheus/prometheus/promql/info.go b/vendor/github.com/prometheus/prometheus/promql/info.go
new file mode 100644
index 00000000000..84f91269fd1
--- /dev/null
+++ b/vendor/github.com/prometheus/prometheus/promql/info.go
@@ -0,0 +1,462 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package promql
+
+import (
+	"context"
+	"fmt"
+	"slices"
+	"strings"
+
+	"github.com/grafana/regexp"
+
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/promql/parser"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/util/annotations"
+)
+
+const targetInfo = "target_info"
+
+var infoMetricsMap = map[string][]string{
+	targetInfo: {"instance", "job"},
+}
+var ignoreInfoMetrics = []*regexp.Regexp{regexp.MustCompile("^" + targetInfo + "$")}
+
+// evalInfo implements the info PromQL function.
+func (ev *evaluator) evalInfo(ctx context.Context, args parser.Expressions) (parser.Value, annotations.Annotations) {
+	val, annots := ev.eval(args[0])
+	mat := val.(Matrix)
+	// Map from data label name to matchers.
+	dataLabelMatchers := map[string][]*labels.Matcher{}
+	if len(args) > 1 {
+		// TODO: Introduce a dedicated LabelSelector type
+		labelSelector := args[1].(*parser.VectorSelector)
+		for _, m := range labelSelector.LabelMatchers {
+			dataLabelMatchers[m.Name] = append(dataLabelMatchers[m.Name], m)
+		}
+	}
+
+	ignoreSeries := map[int]struct{}{}
+loop:
+	for i, s := range mat {
+		lblMap := s.Metric.Map()
+		name := lblMap[labels.MetricName]
+		for _, reIgnore := range ignoreInfoMetrics {
+			if reIgnore.MatchString(name) {
+				ignoreSeries[i] = struct{}{}
+				continue loop
+			}
+		}
+	}
+
+	infoSeries, ws, err := ev.fetchInfoSeries(ctx, mat, ignoreSeries, dataLabelMatchers)
+	annots.Merge(ws)
+	if err != nil {
+		annots.Add(err)
+		return nil, annots
+	}
+
+	res, ws := ev.combineWithInfoSeries(mat, infoSeries, ignoreSeries, dataLabelMatchers)
+	annots.Merge(ws)
+	return res, annots
+}
+
+// fetchInfoSeries fetches info series given ev.selectHints and matching identifying labels in mat.
+// Series in ignoreSeries are not fetched.
+func (ev *evaluator) fetchInfoSeries(ctx context.Context, mat Matrix, ignoreSeries map[int]struct{}, dataLabelMatchers map[string][]*labels.Matcher) (Matrix, annotations.Annotations, error) {
+	if ev.selectHints == nil {
+		// ev.selectHints should have been set.
+		var annots annotations.Annotations
+		annots.Add(fmt.Errorf("ev.selectHints not set"))
+		return nil, annots, nil
+	}
+
+	// A map of values for all identifying labels we are interested in.
+	idLblValues := map[string]map[string]struct{}{}
+	for _, identifyingLabels := range infoMetricsMap {
+		for i, s := range mat {
+			if _, exists := ignoreSeries[i]; exists {
+				continue
+			}
+
+			// Register relevant values per identifying label for this series.
+			lblMap := s.Metric.Map()
+			for _, l := range identifyingLabels {
+				val := lblMap[l]
+				if val == "" {
+					continue
+				}
+
+				if idLblValues[l] == nil {
+					idLblValues[l] = map[string]struct{}{}
+				}
+				idLblValues[l][val] = struct{}{}
+			}
+		}
+	}
+	if len(idLblValues) == 0 {
+		return nil, nil, nil
+	}
+
+	// Generate regexps for every interesting value per identifying label.
+	var sb strings.Builder
+	idLblRegexps := make(map[string]string, len(idLblValues))
+	for name, vals := range idLblValues {
+		sb.Reset()
+		i := 0
+		for v := range vals {
+			if i > 0 {
+				sb.WriteRune('|')
+			}
+			sb.WriteString(regexp.QuoteMeta(v))
+			i++
+		}
+		idLblRegexps[name] = sb.String()
+	}
+
+	i := 0
+	sb.Reset()
+	for name := range infoMetricsMap {
+		if i > 0 {
+			sb.WriteRune('|')
+		}
+		sb.WriteString(regexp.QuoteMeta(name))
+		i++
+	}
+	var infoLabelMatchers []*labels.Matcher
+	for name, re := range idLblRegexps {
+		infoLabelMatchers = append(infoLabelMatchers, labels.MustNewMatcher(labels.MatchRegexp, name, re))
+	}
+	var nameMatcher *labels.Matcher
+	for name, ms := range dataLabelMatchers {
+		for i, m := range ms {
+			if m.Name == labels.MetricName {
+				nameMatcher = m
+				ms = slices.Delete(ms, i, i+1)
+			}
+			infoLabelMatchers = append(infoLabelMatchers, m)
+		}
+		if len(ms) > 0 {
+			dataLabelMatchers[name] = ms
+		} else {
+			delete(dataLabelMatchers, name)
+		}
+	}
+	if nameMatcher == nil {
+		infoLabelMatchers = append([]*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, sb.String())}, infoLabelMatchers...)
+	} else if !nameMatcher.Matches(targetInfo) {
+		// We only support target_info for now.
+		var annots annotations.Annotations
+		annots.Add(fmt.Errorf("unsupported info metric matcher %s", nameMatcher))
+		return nil, annots, nil
+	}
+
+	infoIt := ev.querier.Select(ctx, false, ev.selectHints, infoLabelMatchers...)
+	annots := infoIt.Warnings()
+	if infoIt.Err() != nil {
+		return nil, annots, infoIt.Err()
+	}
+	var infoSeries []storage.Series
+	for infoIt.Next() {
+		select {
+		case <-ctx.Done():
+			return nil, nil, ctx.Err()
+		default:
+		}
+		infoSeries = append(infoSeries, infoIt.At())
+	}
+	infoMat := ev.expandSeriesToMatrix(infoSeries, 0, ev.startTimestamp, ev.endTimestamp, ev.interval)
+	return infoMat, annots, infoIt.Err()
+}
+
+// combineWithInfoSeries combines mat with select data labels from infoMat.
+func (ev *evaluator) combineWithInfoSeries(mat, infoMat Matrix, ignoreSeries map[int]struct{}, dataLabelMatchers map[string][]*labels.Matcher) (Matrix, annotations.Annotations) {
+	buf := make([]byte, 0, 1024)
+	lb := labels.NewScratchBuilder(0)
+	sigFunction := func(name string, identifyingLabels []string) func(labels.Labels) string {
+		return func(lset labels.Labels) string {
+			lb.Reset()
+			lb.Add(labels.MetricName, name)
+			lset.MatchLabels(true, identifyingLabels...).Range(func(l labels.Label) {
+				lb.Add(l.Name, l.Value)
+			})
+			lb.Sort()
+			return string(lb.Labels().Bytes(buf))
+		}
+	}
+
+	sigfs := make(map[string]func(labels.Labels) string, len(infoMetricsMap))
+	for name, identifyingLabels := range infoMetricsMap {
+		sigfs[name] = sigFunction(name, identifyingLabels)
+	}
+
+	// Keep a copy of the original point slices so they can be returned to the pool.
+	origMatrices := []Matrix{
+		make(Matrix, len(mat)),
+		make(Matrix, len(infoMat)),
+	}
+	copy(origMatrices[0], mat)
+	copy(origMatrices[1], infoMat)
+
+	numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
+	originalNumSamples := ev.currentSamples
+
+	// Create an output vector that is as big as the input matrix with
+	// the most time series.
+	biggestLen := max(len(mat), len(infoMat))
+	baseVector := make(Vector, 0, len(mat))
+	infoVector := make(Vector, 0, len(infoMat))
+	enh := &EvalNodeHelper{
+		Out:          make(Vector, 0, biggestLen),
+		labelBuilder: &lb,
+	}
+	type seriesAndTimestamp struct {
+		Series
+		ts int64
+	}
+	seriess := make(map[uint64]seriesAndTimestamp, biggestLen) // Output series by series hash.
+	tempNumSamples := ev.currentSamples
+
+	// For every base series, compute signature per info metric.
+	baseSigs := make([]map[string]string, 0, len(mat))
+	for _, s := range mat {
+		sigs := make(map[string]string, len(infoMetricsMap))
+		for infoName := range infoMetricsMap {
+			sigs[infoName] = sigfs[infoName](s.Metric)
+		}
+		baseSigs = append(baseSigs, sigs)
+	}
+
+	infoSigs := make([]string, 0, len(infoMat))
+	for _, s := range infoMat {
+		name := s.Metric.Map()[labels.MetricName]
+		infoSigs = append(infoSigs, sigfs[name](s.Metric))
+	}
+
+	var warnings annotations.Annotations
+	for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
+		if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
+			ev.error(err)
+		}
+
+		// Reset number of samples in memory after each timestamp.
+		ev.currentSamples = tempNumSamples
+		// Gather input vectors for this timestamp.
+		baseVector = ev.gatherVector(ts, mat, baseVector)
+		infoVector = ev.gatherVector(ts, infoMat, infoVector)
+
+		enh.Ts = ts
+		result, err := ev.combineWithInfoVector(baseVector, infoVector, ignoreSeries, baseSigs, infoSigs, enh, dataLabelMatchers)
+		if err != nil {
+			warnings.Add(err)
+		}
+		enh.Out = result[:0] // Reuse result vector.
+
+		vecNumSamples := result.TotalSamples()
+		ev.currentSamples += vecNumSamples
+		// When we reset currentSamples to tempNumSamples during the next iteration of the loop it also
+		// needs to include the samples from the result here, as they're still in memory.
+		tempNumSamples += vecNumSamples
+		ev.samplesStats.UpdatePeak(ev.currentSamples)
+		if ev.currentSamples > ev.maxSamples {
+			ev.error(ErrTooManySamples(env))
+		}
+
+		// Add samples in result vector to output series.
+		for _, sample := range result {
+			h := sample.Metric.Hash()
+			ss, exists := seriess[h]
+			if exists {
+				if ss.ts == ts { // If we've seen this output series before at this timestamp, it's a duplicate.
+					ev.errorf("vector cannot contain metrics with the same labelset")
+				}
+				ss.ts = ts
+			} else {
+				ss = seriesAndTimestamp{Series{Metric: sample.Metric}, ts}
+			}
+			addToSeries(&ss.Series, enh.Ts, sample.F, sample.H, numSteps)
+			seriess[h] = ss
+		}
+	}
+
+	// Reuse the original point slices.
+	for _, m := range origMatrices {
+		for _, s := range m {
+			putFPointSlice(s.Floats)
+			putHPointSlice(s.Histograms)
+		}
+	}
+	// Assemble the output matrix. By the time we get here we know we don't have too many samples.
+	numSamples := 0
+	output := make(Matrix, 0, len(seriess))
+	for _, ss := range seriess {
+		numSamples += len(ss.Floats) + totalHPointSize(ss.Histograms)
+		output = append(output, ss.Series)
+	}
+	ev.currentSamples = originalNumSamples + numSamples
+	ev.samplesStats.UpdatePeak(ev.currentSamples)
+	return output, warnings
+}
+
+func (ev *evaluator) gatherVector(ts int64, input Matrix, output Vector) Vector {
+	output = output[:0]
+	for i, series := range input {
+		switch {
+		case len(series.Floats) > 0 && series.Floats[0].T == ts:
+			output = append(output, Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts})
+			// Move input vectors forward so we don't have to re-scan the same
+			// past points at the next step.
+			input[i].Floats = series.Floats[1:]
+		case len(series.Histograms) > 0 && series.Histograms[0].T == ts:
+			output = append(output, Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts})
+			input[i].Histograms = series.Histograms[1:]
+		default:
+			continue
+		}
+
+		// Don't add histogram size here because we only
+		// copy the pointer above, not the whole
+		// histogram.
+		ev.currentSamples++
+		if ev.currentSamples > ev.maxSamples {
+			ev.error(ErrTooManySamples(env))
+		}
+	}
+	ev.samplesStats.UpdatePeak(ev.currentSamples)
+
+	return output
+}
+
+// combineWithInfoVector combines base and info Vectors.
+// Base series in ignoreSeries are not combined.
+func (ev *evaluator) combineWithInfoVector(base, info Vector, ignoreSeries map[int]struct{}, baseSigs []map[string]string, infoSigs []string, enh *EvalNodeHelper, dataLabelMatchers map[string][]*labels.Matcher) (Vector, error) {
+	if len(base) == 0 {
+		return nil, nil // Short-circuit: nothing is going to match.
+	}
+
+	// All samples from the info Vectors hashed by the matching label/values.
+	if enh.infoSamplesBySig == nil {
+		enh.infoSamplesBySig = make(map[string]Sample, len(enh.Out))
+	} else {
+		clear(enh.infoSamplesBySig)
+	}
+
+	for i, s := range info {
+		sig := infoSigs[i]
+		if _, exists := enh.infoSamplesBySig[sig]; exists {
+			// TODO: Let the newest sample win.
+			name := s.Metric.Map()[labels.MetricName]
+			ev.errorf("found duplicate series for info metric %s", name)
+		}
+		enh.infoSamplesBySig[sig] = s
+	}
+
+	lb := enh.labelBuilder
+	for i, bs := range base {
+		if _, exists := ignoreSeries[i]; exists {
+			// This series should not be enriched with info metric data labels.
+			enh.Out = append(enh.Out, Sample{
+				Metric: bs.Metric,
+				F:      bs.F,
+				H:      bs.H,
+			})
+			continue
+		}
+
+		baseLabels := bs.Metric.Map()
+		infoLblMap := map[string]string{}
+
+		// For every info metric name, try to find an info series with the same signature.
+		seenInfoMetrics := map[string]struct{}{}
+		for infoName, sig := range baseSigs[i] {
+			is, exists := enh.infoSamplesBySig[sig]
+			if !exists {
+				continue
+			}
+			if _, exists := seenInfoMetrics[infoName]; exists {
+				continue
+			}
+
+			var err error
+			is.Metric.Range(func(l labels.Label) {
+				if err != nil {
+					return
+				}
+				if l.Name == labels.MetricName {
+					return
+				}
+				if _, exists := dataLabelMatchers[l.Name]; len(dataLabelMatchers) > 0 && !exists {
+					// Not among the specified data label matchers.
+					return
+				}
+
+				if v, exists := infoLblMap[l.Name]; exists && v != l.Value {
+					err = fmt.Errorf("conflicting label: %s", l.Name)
+					return
+				}
+				if _, exists := baseLabels[l.Name]; exists {
+					// Skip labels already on the base metric.
+					return
+				}
+
+				infoLblMap[l.Name] = l.Value
+			})
+			if err != nil {
+				return nil, err
+			}
+			seenInfoMetrics[infoName] = struct{}{}
+		}
+
+		lb.Reset()
+		for n, v := range infoLblMap {
+			lb.Add(n, v)
+		}
+		lb.Sort()
+		infoLbls := lb.Labels()
+
+		if infoLbls.Len() == 0 {
+			// If there's at least one data label matcher not matching the empty string,
+			// we have to ignore this series as there are no matching info series.
+			allMatchersMatchEmpty := true
+			for _, ms := range dataLabelMatchers {
+				for _, m := range ms {
+					if !m.Matches("") {
+						allMatchersMatchEmpty = false
+						break
+					}
+				}
+			}
+			if !allMatchersMatchEmpty {
+				continue
+			}
+		}
+
+		lb.Reset()
+		bs.Metric.Range(func(l labels.Label) {
+			lb.Add(l.Name, l.Value)
+		})
+		infoLbls.Range(func(l labels.Label) {
+			lb.Add(l.Name, l.Value)
+		})
+		lb.Sort()
+
+		enh.Out = append(enh.Out, Sample{
+			Metric: lb.Labels(),
+			F:      bs.F,
+			H:      bs.H,
+		})
+	}
+	return enh.Out, nil
+}
diff --git a/vendor/github.com/prometheus/prometheus/promql/parser/ast.go b/vendor/github.com/prometheus/prometheus/promql/parser/ast.go
index 830e8a2c5e4..b829d42575c 100644
--- a/vendor/github.com/prometheus/prometheus/promql/parser/ast.go
+++ b/vendor/github.com/prometheus/prometheus/promql/parser/ast.go
@@ -208,6 +208,9 @@ type VectorSelector struct {
 	UnexpandedSeriesSet storage.SeriesSet
 	Series              []storage.Series
 
+	// SelectHints are the SelectHints to use when selecting corresponding info series, if enabled.
+	SelectHints *storage.SelectHints
+
 	PosRange posrange.PositionRange
 }
 
diff --git a/vendor/github.com/prometheus/prometheus/promql/parser/functions.go b/vendor/github.com/prometheus/prometheus/promql/parser/functions.go
index 99b41321fed..434d3cdc1c7 100644
--- a/vendor/github.com/prometheus/prometheus/promql/parser/functions.go
+++ b/vendor/github.com/prometheus/prometheus/promql/parser/functions.go
@@ -223,6 +223,13 @@ var Functions = map[string]*Function{
 		ArgTypes:   []ValueType{ValueTypeMatrix},
 		ReturnType: ValueTypeVector,
 	},
+	"info": {
+		Name:         "info",
+		ArgTypes:     []ValueType{ValueTypeVector, ValueTypeVector},
+		ReturnType:   ValueTypeVector,
+		Experimental: true,
+		Variadic:     1,
+	},
 	"irate": {
 		Name:       "irate",
 		ArgTypes:   []ValueType{ValueTypeMatrix},
diff --git a/vendor/github.com/prometheus/prometheus/promql/parser/parse.go b/vendor/github.com/prometheus/prometheus/promql/parser/parse.go
index 6dda2705297..e39fb1c51ce 100644
--- a/vendor/github.com/prometheus/prometheus/promql/parser/parse.go
+++ b/vendor/github.com/prometheus/prometheus/promql/parser/parse.go
@@ -798,6 +798,12 @@ func (p *parser) checkAST(node Node) (typ ValueType) {
 			p.expectType(arg, n.Func.ArgTypes[i], fmt.Sprintf("call to function %q", n.Func.Name))
 		}
 
+		if n.Func.Name == "info" && len(n.Args) > 1 {
+			if n.Args[1].(*VectorSelector).Name != "" {
+				p.addParseErrf(n.Args[1].PositionRange(), "expected label selectors only, got vector selector instead")
+			}
+		}
+
 	case *ParenExpr:
 		p.checkAST(n.Expr)
 
diff --git a/vendor/github.com/prometheus/prometheus/promql/promqltest/test.go b/vendor/github.com/prometheus/prometheus/promql/promqltest/test.go
index 8b1ec381adf..69b93645bcc 100644
--- a/vendor/github.com/prometheus/prometheus/promql/promqltest/test.go
+++ b/vendor/github.com/prometheus/prometheus/promql/promqltest/test.go
@@ -80,7 +80,7 @@ func LoadedStorage(t testutil.T, input string) *teststorage.TestStorage {
 }
 
 func NewTestEngine(enablePerStepStats bool, lookbackDelta time.Duration, maxSamples int) *promql.Engine {
-	return promql.NewEngine(promql.EngineOpts{
+	o := promql.EngineOpts{
 		Logger:     nil,
 		Reg:        nil,
 		MaxSamples: maxSamples,
@@ -90,7 +90,8 @@ func NewTestEngine(enablePerStepStats bool, lookbackDelta time.Duration, maxSamp
 		EnableNegativeOffset: true,
 		EnablePerStepStats:   enablePerStepStats,
 		LookbackDelta:        lookbackDelta,
-	})
+	}
+	return promql.NewEngine(o)
 }
 
 // RunBuiltinTests runs an acceptance test suite against the provided engine.
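Note on the parser changes above: "info" is registered as an experimental, variadic function (the second argument, a label selector, is optional), and checkAST rejects a metric name in that second argument. The following is a minimal sketch of how that surfaces through the parser API; it is not part of this patch, the query strings and metric names are made up, and it assumes experimental functions are gated by parser.EnableExperimentalFunctions.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/promql/parser"
)

func main() {
	// The info function is marked Experimental in the Functions map, so it only
	// parses when experimental functions are enabled.
	parser.EnableExperimentalFunctions = true

	// Second argument restricted to label selectors: this is expected to parse.
	if _, err := parser.ParseExpr(`info(rate(http_request_counter_total{cluster="us-east"}[5m]), {data_label="value"})`); err != nil {
		fmt.Println("unexpected error:", err)
	}

	// A metric name in the second argument should trigger the new checkAST error:
	// "expected label selectors only, got vector selector instead".
	if _, err := parser.ParseExpr(`info(up, some_info_metric{data_label="value"})`); err != nil {
		fmt.Println("parse error:", err)
	}
}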
diff --git a/vendor/github.com/prometheus/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/prometheus/tsdb/compact.go
index cc73b4f9550..9b57f28edd2 100644
--- a/vendor/github.com/prometheus/prometheus/tsdb/compact.go
+++ b/vendor/github.com/prometheus/prometheus/tsdb/compact.go
@@ -939,7 +939,7 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blockPop
 	return nil
 }
 
-func debugOutOfOrderChunks(chks []chunks.Meta, logger log.Logger) {
+func debugOutOfOrderChunks(lbls labels.Labels, chks []chunks.Meta, logger log.Logger) {
 	if len(chks) <= 1 {
 		return
 	}
@@ -954,25 +954,11 @@ func debugOutOfOrderChunks(chks []chunks.Meta, logger log.Logger) {
 		}
 
 		// Looks like the chunk is out of order.
-		prevSafeChk, prevIsSafeChk := prevChk.Chunk.(*safeHeadChunk)
-		currSafeChk, currIsSafeChk := currChk.Chunk.(*safeHeadChunk)
-
-		// Get info out of safeHeadChunk (if possible).
-		prevHeadChunkID := chunks.HeadChunkID(0)
-		currHeadChunkID := chunks.HeadChunkID(0)
-		prevLabels := labels.Labels{}
-		currLabels := labels.Labels{}
-		if prevSafeChk != nil {
-			prevHeadChunkID = prevSafeChk.cid
-			prevLabels = prevSafeChk.s.lset
-		}
-		if currSafeChk != nil {
-			currHeadChunkID = currSafeChk.cid
-			currLabels = currSafeChk.s.lset
-		}
-
-		level.Warn(logger).Log(
+		logValues := []any{
 			"msg", "found out-of-order chunk when compacting",
+			"num_chunks_for_series", len(chks),
+			"index", i,
+			"labels", lbls.String(),
 			"prev_ref", prevChk.Ref,
 			"curr_ref", currChk.Ref,
 			"prev_min_time", timeFromMillis(prevChk.MinTime).UTC().String(),
 			"curr_min_time", timeFromMillis(currChk.MinTime).UTC().String(),
 			"curr_max_time", timeFromMillis(currChk.MaxTime).UTC().String(),
 			"prev_samples", prevChk.Chunk.NumSamples(),
 			"curr_samples", currChk.Chunk.NumSamples(),
-			"prev_is_safe_chunk", prevIsSafeChk,
-			"curr_is_safe_chunk", currIsSafeChk,
-			"prev_head_chunk_id", prevHeadChunkID,
-			"curr_head_chunk_id", currHeadChunkID,
-			"prev_labelset", prevLabels.String(),
-			"curr_labelset", currLabels.String(),
-			"num_chunks_for_series", len(chks),
-		)
+		}
+
+		// Get info out of safeHeadChunk (if possible).
+		if prevSafeChk, prevIsSafeChk := prevChk.Chunk.(*safeHeadChunk); prevIsSafeChk {
+			logValues = append(logValues,
+				"prev_head_chunk_id", prevSafeChk.cid,
+				"prev_labelset", prevSafeChk.s.lset.String(),
+			)
+		}
+		if currSafeChk, currIsSafeChk := currChk.Chunk.(*safeHeadChunk); currIsSafeChk {
+			logValues = append(logValues,
+				"curr_head_chunk_id", currSafeChk.cid,
+				"curr_labelset", currSafeChk.s.lset.String(),
+			)
+		}
+
+		level.Warn(logger).Log(logValues...)
 	}
 }
 
@@ -1173,7 +1168,7 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
 			continue
 		}
 
-		debugOutOfOrderChunks(chks, logger)
+		debugOutOfOrderChunks(s.Labels(), chks, logger)
 
 		obIx := uint64(0)
 		if len(outBlocks) > 1 {
diff --git a/vendor/github.com/prometheus/prometheus/tsdb/querier.go b/vendor/github.com/prometheus/prometheus/tsdb/querier.go
index ca30f2f0ad9..22af2620caa 100644
--- a/vendor/github.com/prometheus/prometheus/tsdb/querier.go
+++ b/vendor/github.com/prometheus/prometheus/tsdb/querier.go
@@ -1076,7 +1076,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
 		// Check if the encoding has changed (i.e. we need to create a new
 		// chunk as chunks can't have multiple encoding types).
 		// For the first sample, the following condition will always be true as
-		// ValNoneNone != ValFloat | ValHistogram | ValFloatHistogram.
+		// ValNone != ValFloat | ValHistogram | ValFloatHistogram.
 		if currentValueType != prevValueType {
 			if prevValueType != chunkenc.ValNone {
 				p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt})
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 35ccfd1208a..1140aef289e 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -997,7 +997,7 @@ github.com/prometheus/exporter-toolkit/web
 github.com/prometheus/procfs
 github.com/prometheus/procfs/internal/fs
 github.com/prometheus/procfs/internal/util
-# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240812035817-c17c11f77f03
+# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240820071320-b0cc5ec7c595
 ## explicit; go 1.21.0
 github.com/prometheus/prometheus/config
 github.com/prometheus/prometheus/discovery
@@ -1648,7 +1648,7 @@ sigs.k8s.io/kustomize/kyaml/yaml/walk
 sigs.k8s.io/yaml
 sigs.k8s.io/yaml/goyaml.v2
 sigs.k8s.io/yaml/goyaml.v3
-# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240812035817-c17c11f77f03
+# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240820071320-b0cc5ec7c595
 # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe
 # gopkg.in/yaml.v3 => github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094
 # github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20240531075221-3685f1377d7b
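Note on info.go above: base series and info series are joined on a signature built from the info metric name plus its identifying labels ("instance" and "job" for target_info), and the matching info series' remaining data labels are copied onto the base sample unless they collide with labels already present. Below is a rough, self-contained sketch of that matching idea, illustrative only and not the vendored implementation; the metric names and label values are made up, and it loosely mirrors the sigFunction and label-enrichment logic in combineWithInfoSeries/combineWithInfoVector.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// sig builds a join key from the identifying labels of a series.
func sig(lset labels.Labels, identifying ...string) string {
	sb := labels.NewScratchBuilder(len(identifying))
	for _, name := range identifying {
		sb.Add(name, lset.Get(name))
	}
	sb.Sort()
	return sb.Labels().String()
}

func main() {
	base := labels.FromStrings("__name__", "http_requests_total", "instance", "10.0.0.1:8080", "job", "app")
	info := labels.FromStrings("__name__", "target_info", "instance", "10.0.0.1:8080", "job", "app", "k8s_cluster_name", "us-east")

	// Only series whose identifying labels agree are combined.
	if sig(base, "instance", "job") != sig(info, "instance", "job") {
		return
	}

	// Enrich the base labels with the info series' data labels, skipping the
	// metric name and anything already present on the base series.
	sb := labels.NewScratchBuilder(base.Len() + 1)
	base.Range(func(l labels.Label) { sb.Add(l.Name, l.Value) })
	info.Range(func(l labels.Label) {
		if l.Name != labels.MetricName && !base.Has(l.Name) {
			sb.Add(l.Name, l.Value)
		}
	})
	sb.Sort()
	fmt.Println(sb.Labels()) // Base series labels plus k8s_cluster_name="us-east".
}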