Skip to content

Commit

Permalink
Streaming PromQL engine: benchmarking improvements (#8114)
Browse files Browse the repository at this point in the history
* Use clearer name for Prometheus' engine in benchmark output

* Add ability to start ingester and use it for multiple benchmark runs
  • Loading branch information
charleskorn authored May 13, 2024
1 parent 31edde1 commit 1d4aac1
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 28 deletions.
20 changes: 10 additions & 10 deletions pkg/streamingpromql/benchmarks/comparison_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ func BenchmarkQuery(b *testing.B) {
cases := TestCases(MetricSizes)

opts := streamingpromql.NewTestEngineOpts()
standardEngine := promql.NewEngine(opts)
prometheusEngine := promql.NewEngine(opts)
streamingEngine, err := streamingpromql.NewEngine(opts)
require.NoError(b, err)

// Important: the names below must remain in sync with the names used in tools/benchmark-query-engine.
engines := map[string]promql.QueryEngine{
"standard": standardEngine,
"streaming": streamingEngine,
"Prometheus": prometheusEngine,
"streaming": streamingEngine,
}

ctx := user.InjectOrgID(context.Background(), UserID)
Expand All @@ -65,12 +65,12 @@ func BenchmarkQuery(b *testing.B) {
b.Run(c.Name(), func(b *testing.B) {
if !skipCompareResults {
// Check both engines produce the same result before running the benchmark.
standardResult, standardClose := c.Run(ctx, b, start, end, interval, standardEngine, q)
prometheusResult, prometheusClose := c.Run(ctx, b, start, end, interval, prometheusEngine, q)
streamingResult, streamingClose := c.Run(ctx, b, start, end, interval, streamingEngine, q)

requireEqualResults(b, standardResult, streamingResult)
requireEqualResults(b, prometheusResult, streamingResult)

standardClose()
prometheusClose()
streamingClose()
}

Expand All @@ -95,7 +95,7 @@ func TestBothEnginesReturnSameResultsForBenchmarkQueries(t *testing.T) {
cases := TestCases(metricSizes)

opts := streamingpromql.NewTestEngineOpts()
standardEngine := promql.NewEngine(opts)
prometheusEngine := promql.NewEngine(opts)
streamingEngine, err := streamingpromql.NewEngine(opts)
require.NoError(t, err)

Expand All @@ -106,12 +106,12 @@ func TestBothEnginesReturnSameResultsForBenchmarkQueries(t *testing.T) {
start := time.Unix(int64((NumIntervals-c.Steps)*intervalSeconds), 0)
end := time.Unix(int64(NumIntervals*intervalSeconds), 0)

standardResult, standardClose := c.Run(ctx, t, start, end, interval, standardEngine, q)
prometheusResult, prometheusClose := c.Run(ctx, t, start, end, interval, prometheusEngine, q)
streamingResult, streamingClose := c.Run(ctx, t, start, end, interval, streamingEngine, q)

requireEqualResults(t, standardResult, streamingResult)
requireEqualResults(t, prometheusResult, streamingResult)

standardClose()
prometheusClose()
streamingClose()
})
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/streamingpromql/compare.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@

set -euo pipefail

RESULTS_FILE="$1" # Should be the path to a file produced by a command like `go test -run=XXX -bench="BenchmarkQuery" -count=6 -benchmem -timeout=1h .`
RESULTS_FILE="$1" # Should be the path to a file produced by a command like `go run ./tools/benchmark-query-engine -count=6 | tee output.txt`

STANDARD_RESULTS_FILE=$(mktemp /tmp/standard.XXXX)
PROMETHEUS_RESULTS_FILE=$(mktemp /tmp/prometheus.XXXX)
STREAMING_RESULTS_FILE=$(mktemp /tmp/streaming.XXXX)

grep --invert-match "streaming-" "$RESULTS_FILE" | sed -E 's#/standard-[0-9]+##g' > "$STANDARD_RESULTS_FILE"
grep --invert-match "standard-" "$RESULTS_FILE" | sed -E 's#/streaming-[0-9]+##g' > "$STREAMING_RESULTS_FILE"
grep --invert-match "streaming-" "$RESULTS_FILE" | sed -E 's#/Prometheus-[0-9]+##g' > "$PROMETHEUS_RESULTS_FILE"
grep --invert-match "Prometheus-" "$RESULTS_FILE" | sed -E 's#/streaming-[0-9]+##g' > "$STREAMING_RESULTS_FILE"

benchstat "$STANDARD_RESULTS_FILE" "$STREAMING_RESULTS_FILE" | sed "s#$STANDARD_RESULTS_FILE# standard #g" | sed "s#$STREAMING_RESULTS_FILE# streaming #g"
benchstat "$PROMETHEUS_RESULTS_FILE" "$STREAMING_RESULTS_FILE" | sed "s#$PROMETHEUS_RESULTS_FILE# Prometheus #g" | sed "s#$STREAMING_RESULTS_FILE# streaming #g"

rm "$STANDARD_RESULTS_FILE" "$STREAMING_RESULTS_FILE"
rm "$PROMETHEUS_RESULTS_FILE" "$STREAMING_RESULTS_FILE"
2 changes: 2 additions & 0 deletions tools/benchmark-query-engine/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ Usage:
- `go run . -bench=abc`: run all benchmarks with names matching regex `abc`
- `go run . -count=X`: run all benchmarks X times
- `go run . -bench=abc -count=X`: run all benchmarks with names matching regex `abc` X times
- `go run . -start-ingester`: start ingester and wait (run no benchmarks)
- `go run . -use-existing-ingester=localhost:1234`: use existing ingester started with `-start-ingester` to reduce startup time
71 changes: 59 additions & 12 deletions tools/benchmark-query-engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ package main

import (
"bytes"
"errors"
"flag"
"fmt"
"log/slog"
"os"
"os/exec"
"os/signal"
"path/filepath"
"slices"
"strings"
Expand Down Expand Up @@ -38,13 +40,16 @@ type app struct {
ingesterAddress string
cleanup func()

count uint
testFilter string
listTests bool
count uint
testFilter string
listTests bool
justRunIngester bool
}

func (a *app) run() error {
a.parseArgs()
if err := a.parseArgs(); err != nil {
return err
}

// Do this early, to avoid doing a bunch of pointless work if the regex is invalid or doesn't match any tests.
filteredNames, err := a.filteredTestCaseNames()
Expand All @@ -67,6 +72,23 @@ func (a *app) run() error {

defer os.RemoveAll(a.tempDir)

if err := a.startIngesterAndLoadData(); err != nil {
return fmt.Errorf("starting ingester and loading data failed: %w", err)
}
defer a.cleanup()

if a.justRunIngester {
return a.waitForExit()
}

if err := a.runBenchmarks(filteredNames); err != nil {
return err
}

return nil
}

func (a *app) runBenchmarks(filteredNames []string) error {
if err := a.buildBinary(); err != nil {
return fmt.Errorf("building binary failed: %w", err)
}
Expand All @@ -75,11 +97,6 @@ func (a *app) run() error {
return fmt.Errorf("benchmark binary failed validation: %w", err)
}

if err := a.startIngesterAndLoadData(); err != nil {
return fmt.Errorf("starting ingester and loading data failed: %w", err)
}
defer a.cleanup()

haveRunAnyTests := false

for _, name := range filteredNames {
Expand All @@ -92,21 +109,43 @@ func (a *app) run() error {
}
}

slog.Info("benchmarks completed successfully, cleaning up...")
slog.Info("benchmarks completed successfully")
return nil
}

func (a *app) waitForExit() error {
// I know it's a bit weird to use string formatting like this when using structured logging, but this produces the clearest message.
slog.Info(fmt.Sprintf("ingester running, run benchmark-query-engine with -use-existing-ingester=%v", a.ingesterAddress))
slog.Info("press Ctrl+C to exit")

done := make(chan os.Signal, 1)
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
<-done

println()
slog.Info("interrupt received, shutting down...")

return nil
}

func (a *app) parseArgs() {
func (a *app) parseArgs() error {
flag.UintVar(&a.count, "count", 1, "run each benchmark n times")
flag.StringVar(&a.testFilter, "bench", ".", "only run benchmarks matching regexp")
flag.BoolVar(&a.listTests, "list", false, "list known benchmarks and exit")
flag.BoolVar(&a.justRunIngester, "start-ingester", false, "start ingester and wait, run no benchmarks")
flag.StringVar(&a.ingesterAddress, "use-existing-ingester", "", "use existing ingester rather than creating a new one")

if err := flagext.ParseFlagsWithoutArguments(flag.CommandLine); err != nil {
fmt.Printf("%v\n", err)
flag.Usage()
os.Exit(1)
}

if a.justRunIngester && a.ingesterAddress != "" {
return errors.New("cannot specify both '-start-ingester' and an existing ingester address with '-use-existing-ingester'")
}

return nil
}

func (a *app) findBenchmarkPackageDir() error {
Expand Down Expand Up @@ -181,6 +220,14 @@ func (a *app) validateBinary() error {
}

func (a *app) startIngesterAndLoadData() error {
if a.ingesterAddress != "" {
slog.Warn("using existing ingester; not checking data required for benchmark is present", "address", a.ingesterAddress)
a.cleanup = func() {
// Nothing to do.
}
return nil
}

slog.Info("starting ingester and loading data...")

address, cleanup, err := benchmarks.StartIngesterAndLoadData(a.dataDir, benchmarks.MetricSizes)
Expand Down Expand Up @@ -211,7 +258,7 @@ func (a *app) allTestCaseNames() []string {

for _, c := range cases {
names = append(names, benchmarkName+"/"+c.Name()+"/streaming")
names = append(names, benchmarkName+"/"+c.Name()+"/standard")
names = append(names, benchmarkName+"/"+c.Name()+"/Prometheus")
}

return names
Expand Down

0 comments on commit 1d4aac1

Please sign in to comment.