diff --git a/pkg/parcacol/querier.go b/pkg/parcacol/querier.go index 593bebcc4ef..0cf28044b06 100644 --- a/pkg/parcacol/querier.go +++ b/pkg/parcacol/querier.go @@ -344,6 +344,7 @@ const ( ColumnPeriodSum = "sum(" + profile.ColumnPeriod + ")" ColumnValueCount = "count(" + profile.ColumnValue + ")" ColumnValueSum = "sum(" + profile.ColumnValue + ")" + ColumnValueFirst = "first(" + profile.ColumnValue + ")" ) func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Expr, step time.Duration, sampleTypeUnit string) ([]*pb.MetricsSeries, error) { @@ -531,13 +532,22 @@ func (q *Querier) queryRangeNonDelta(ctx context.Context, filterExpr logicalplan Filter(filterExpr). Aggregate( []logicalplan.Expr{ - logicalplan.Sum(logicalplan.Col(profile.ColumnValue)), + logicalplan.Sum(logicalplan.Col(profile.ColumnValue)).Alias(ColumnValueFirst), }, []logicalplan.Expr{ logicalplan.DynCol(profile.ColumnLabels), logicalplan.Col(profile.ColumnTimestamp), }, ). + Aggregate( + []logicalplan.Expr{ + logicalplan.Take(logicalplan.Col(profile.ColumnValue), 1).Alias(ColumnValueFirst), + }, + []logicalplan.Expr{ + logicalplan.DynCol(profile.ColumnLabels), + logicalplan.Duration(1000 * time.Millisecond), + }, + ). Execute(ctx, func(ctx context.Context, r arrow.Record) error { r.Retain() records = append(records, r) @@ -561,7 +571,7 @@ func (q *Querier) queryRangeNonDelta(ctx context.Context, filterExpr logicalplan // Add necessary columns and their found value is false by default. columnIndices := map[string]columnIndex{ profile.ColumnTimestamp: {}, - ColumnValueSum: {}, + ColumnValueFirst: {}, } labelColumnIndices := []int{} labelSet := labels.Labels{} @@ -623,17 +633,17 @@ func (q *Querier) queryRangeNonDelta(ctx context.Context, filterExpr logicalplan } ts := ar.Column(columnIndices[profile.ColumnTimestamp].index).(*array.Int64).Value(i) - value := ar.Column(columnIndices[ColumnValueSum].index).(*array.Int64).Value(i) + + // value := ar.Column(columnIndices[ColumnValueFirst].index).(*array.Int64).Value(i) + valueList := ar.Column(columnIndices[ColumnValueFirst].index).(*array.List) + start, _ := valueList.ValueOffsets(i) + value := valueList.ListValues().(*array.Int64).Value(int(start)) // Each step bucket will only return one of the timestamps and its value. // For this reason we'll take each timestamp and divide it by the step seconds. // If we have seen a MetricsSample for this bucket before, we'll ignore this one. // If we haven't seen one we'll add this sample to the response. - // TODO: This still queries way too much data from the underlying database. - // This needs to be moved to FrostDB to not even query all of this data in the first place. - // With a scrape interval of 10s and a query range of 1d we'd query 8640 samples and at most return 960. - // Even worse for a week, we'd query 60480 samples and only return 1000. tsBucket := ts / 1000 / int64(step.Seconds()) if _, found := resSeriesBuckets[index][tsBucket]; found { // We already have a MetricsSample for this timestamp bucket, ignore it.