Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
117649: colexecagg: share agg func allocs between different functions if possible r=yuzefovich a=yuzefovich

Previously, for every aggregate function we would always create a fresh `aggregateFuncAlloc` struct. That allocator is used to batch-allocate buckets. However, we would never share the alloc between different output columns even if the same aggregate function type overload is used. This commit addresses that inefficiency by enumerating all possible aggregate overloads for optimized aggregate functions and storing them in a single stack-allocated slice for reuse. This slice first stores 11 anyNotNull aggregates, then 6 avg aggregates, then 1 boolAnd aggregate, etc. In other words, each aggregate overload has a fixed spot in the array. This scheme was chosen over a two-dimensional array to remove any unused slots (with a two-dimensional array we would have a separate slice for each aggregate type, and each such slice would need length up to 11 — the maximum number of type overloads any aggregate has right now).

The main observation that allows us to easily reuse these allocs is the fact that there is no concurrency when allocating buckets.

For hash aggregation we currently use an allocation size of 128 in these allocators, whereas for ordered / window aggregation we use just 1. This commit additionally extends the `aggregateFuncAlloc` interface with a method that increments the alloc size by 1. Namely, we don't change anything in the case of hash aggregation (meaning the same pool of size 128 can now be shared among multiple functions), but for ordered and window aggregations we increment the alloc size every time we reuse the allocator — this ensures that for ordered / window aggregations we batch-allocate all necessary objects at once.

Still, by far the main benefit of this change will be observed in case of the hash aggregation with small number of groups (buckets).

Addresses: cockroachdb#117546.

Epic: None.

Release note: None

117751: stress: reduce timeouts for `deadlock` and `race` under RBE r=celiala a=rickystewart

`deadlock` timeouts appear to be much higher than they need to be; `race` timeouts can also be reduced somewhat.

Epic: CRDB-8308
Release note: None

117752: workflows: remove experimental EngFlow `master` build r=celiala a=rickystewart

This did not improve anything so it can be removed for now.

Epic: CRDB-8308
Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
  • Loading branch information
3 people committed Jan 12, 2024
4 parents 099f355 + d39f563 + e7fa1b8 + 45fd74b commit 8c5b9c7
Show file tree
Hide file tree
Showing 33 changed files with 764 additions and 186 deletions.
14 changes: 0 additions & 14 deletions .github/workflows/experimental-github-actions-master.yml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
set -euo pipefail

export RUNS_PER_TEST=3
export EXTRA_TEST_ARGS="--define gotags=bazel,gss,deadlock --test_timeout=1800,3600,5395,5395 --heavy"
export EXTRA_TEST_ARGS="--define gotags=bazel,gss,deadlock --test_timeout=300,1000,1500,2000 --heavy"
export EXTRA_ISSUE_PARAMS=deadlock

THIS_DIR=$(cd "$(dirname "$0")" && pwd)
Expand Down
2 changes: 1 addition & 1 deletion build/teamcity/cockroach/nightlies/stress_engflow_race.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
set -euo pipefail

export RUNS_PER_TEST=3
export EXTRA_TEST_ARGS="--config=race --test_timeout=1800,3600,5395,5395"
export EXTRA_TEST_ARGS="--config=race --test_timeout=1200,2500,3200,4600"
export EXTRA_ISSUE_PARAMS=race

THIS_DIR=$(cd "$(dirname "$0")" && pwd)
Expand Down
73 changes: 39 additions & 34 deletions pkg/sql/colexec/aggregators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,8 +915,8 @@ func TestAggregatorRandom(t *testing.T) {
expNulls[curGroup] = false
expCounts[curGroup]++
expSums[curGroup] += aggCol[i]
expMins[curGroup] = min64(aggCol[i], expMins[curGroup])
expMaxs[curGroup] = max64(aggCol[i], expMaxs[curGroup])
expMins[curGroup] = min(aggCol[i], expMins[curGroup])
expMaxs[curGroup] = max(aggCol[i], expMaxs[curGroup])
}
groups[i] = int64(curGroup)
}
Expand Down Expand Up @@ -988,12 +988,16 @@ func TestAggregatorRandom(t *testing.T) {
}
}

// benchmarkAggregateFunction runs aggregator microbenchmarks. numGroupCol is
// the number of grouping columns. groupSize is the number of tuples to target
// in each distinct aggregation group. chunkSize is the number of tuples to
// target in each distinct partially ordered group column, and is intended for
// use with partial order. Limit is the number of rows to retrieve from the
// aggregation function before ending the microbenchmark.
// benchmarkAggregateFunction runs aggregator micro benchmarks.
// - numGroupCol is the number of grouping columns.
// - groupSize is the number of tuples to target in each distinct aggregation
// group.
// - chunkSize is the number of tuples to target in each distinct partially
// ordered group column, and is intended for use with partial order.
// - limit is the number of rows to retrieve from the aggregation function
// before ending the microbenchmark.
// - numSameAggs, if positive, indicates the number of times that the provided
// aggregate function should be evaluated.
func benchmarkAggregateFunction(
b *testing.B,
agg aggType,
Expand All @@ -1005,6 +1009,7 @@ func benchmarkAggregateFunction(
numInputRows int,
chunkSize int,
limit int,
numSameAggs int,
) {
defer log.Scope(b).Close(b)
if groupSize > numInputRows {
Expand Down Expand Up @@ -1096,10 +1101,15 @@ func benchmarkAggregateFunction(
tc := aggregatorTestCase{
typs: typs,
groupCols: groupCols,
aggCols: [][]uint32{aggCols},
aggFns: []execinfrapb.AggregatorSpec_Func{aggFn},
unorderedInput: agg.order == unordered,
}
if numSameAggs < 1 {
numSameAggs = 1
}
for i := 0; i < numSameAggs; i++ {
tc.aggCols = append(tc.aggCols, aggCols)
tc.aggFns = append(tc.aggFns, aggFn)
}
if distinctProb > 0 {
if !typs[0].Identical(types.Int) {
skip.IgnoreLint(b, "benchmarking distinct aggregation is supported only on an INT argument")
Expand Down Expand Up @@ -1148,9 +1158,13 @@ func benchmarkAggregateFunction(
if distinctProb > 0 {
distinctProbString = fmt.Sprintf("/distinctProb=%.2f", distinctProb)
}
numSameAggsSuffix := ""
if numSameAggs != 1 {
numSameAggsSuffix = fmt.Sprintf("/numSameAggs=%d", numSameAggs)
}
b.Run(fmt.Sprintf(
"%s/%s/%s/groupSize=%d%s/numInputRows=%d",
fName, agg.name, inputTypesString, groupSize, distinctProbString, numInputRows),
"%s/%s/%s%s/groupSize=%d%s/numInputRows=%d",
fName, agg.name, inputTypesString, numSameAggsSuffix, groupSize, distinctProbString, numInputRows),
func(b *testing.B) {
b.SetBytes(int64(argumentsSize * numInputRows))
b.ResetTimer()
Expand Down Expand Up @@ -1201,12 +1215,15 @@ func BenchmarkAggregator(b *testing.B) {
// when benchmarking the aggregator logic.
aggFn := execinfrapb.AnyNotNull
for _, agg := range aggTypes {
for _, numInputRows := range numRows {
for _, groupSize := range groupSizes {
benchmarkAggregateFunction(
b, agg, aggFn, []*types.T{types.Int}, 1, /* numGroupCol */
groupSize, 0 /* distinctProb */, numInputRows,
0 /* chunkSize */, 0 /* limit */)
for _, numSameAggs := range []int{1, 4} {
for _, numInputRows := range numRows {
for _, groupSize := range groupSizes {
benchmarkAggregateFunction(
b, agg, aggFn, []*types.T{types.Int}, 1, /* numGroupCol */
groupSize, 0 /* distinctProb */, numInputRows,
0 /* chunkSize */, 0 /* limit */, numSameAggs,
)
}
}
}
}
Expand Down Expand Up @@ -1243,7 +1260,8 @@ func BenchmarkAllOptimizedAggregateFunctions(b *testing.B) {
benchmarkAggregateFunction(b, agg, aggFn, aggInputTypes,
1 /* numGroupCol */, groupSize,
0 /* distinctProb */, numInputRows,
0 /* chunkSize */, 0 /* limit */)
0 /* chunkSize */, 0 /* limit */, 0, /* numSameAggs */
)
}
}
}
Expand All @@ -1267,23 +1285,10 @@ func BenchmarkDistinctAggregation(b *testing.B) {
benchmarkAggregateFunction(b, agg, aggFn, []*types.T{types.Int},
1 /* numGroupCol */, groupSize,
0 /* distinctProb */, numInputRows,
0 /* chunkSize */, 0 /* limit */)
0 /* chunkSize */, 0 /* limit */, 0, /* numSameAggs */
)
}
}
}
}
}

func min64(a, b float64) float64 {
if a < b {
return a
}
return b
}

func max64(a, b float64) float64 {
if a > b {
return a
}
return b
}
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecagg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/buildutil",
"//pkg/util/duration",
"//pkg/util/json", # keep
"//pkg/util/mon",
Expand Down
Loading

0 comments on commit 8c5b9c7

Please sign in to comment.