diff --git a/pkg/streamingpromql/benchmarks/comparison_test.go b/pkg/streamingpromql/benchmarks/comparison_test.go index 68ac6e63c3c..7d7d7e4dfa0 100644 --- a/pkg/streamingpromql/benchmarks/comparison_test.go +++ b/pkg/streamingpromql/benchmarks/comparison_test.go @@ -43,14 +43,14 @@ func BenchmarkQuery(b *testing.B) { cases := TestCases(MetricSizes) opts := streamingpromql.NewTestEngineOpts() - standardEngine := promql.NewEngine(opts) + prometheusEngine := promql.NewEngine(opts) streamingEngine, err := streamingpromql.NewEngine(opts) require.NoError(b, err) // Important: the names below must remain in sync with the names used in tools/benchmark-query-engine. engines := map[string]promql.QueryEngine{ - "standard": standardEngine, - "streaming": streamingEngine, + "Prometheus": prometheusEngine, + "streaming": streamingEngine, } ctx := user.InjectOrgID(context.Background(), UserID) @@ -65,12 +65,12 @@ func BenchmarkQuery(b *testing.B) { b.Run(c.Name(), func(b *testing.B) { if !skipCompareResults { // Check both engines produce the same result before running the benchmark. 
- standardResult, standardClose := c.Run(ctx, b, start, end, interval, standardEngine, q) + prometheusResult, prometheusClose := c.Run(ctx, b, start, end, interval, prometheusEngine, q) streamingResult, streamingClose := c.Run(ctx, b, start, end, interval, streamingEngine, q) - requireEqualResults(b, standardResult, streamingResult) + requireEqualResults(b, prometheusResult, streamingResult) - standardClose() + prometheusClose() streamingClose() } @@ -95,7 +95,7 @@ func TestBothEnginesReturnSameResultsForBenchmarkQueries(t *testing.T) { cases := TestCases(metricSizes) opts := streamingpromql.NewTestEngineOpts() - standardEngine := promql.NewEngine(opts) + prometheusEngine := promql.NewEngine(opts) streamingEngine, err := streamingpromql.NewEngine(opts) require.NoError(t, err) @@ -106,12 +106,12 @@ func TestBothEnginesReturnSameResultsForBenchmarkQueries(t *testing.T) { start := time.Unix(int64((NumIntervals-c.Steps)*intervalSeconds), 0) end := time.Unix(int64(NumIntervals*intervalSeconds), 0) - standardResult, standardClose := c.Run(ctx, t, start, end, interval, standardEngine, q) + prometheusResult, prometheusClose := c.Run(ctx, t, start, end, interval, prometheusEngine, q) streamingResult, streamingClose := c.Run(ctx, t, start, end, interval, streamingEngine, q) - requireEqualResults(t, standardResult, streamingResult) + requireEqualResults(t, prometheusResult, streamingResult) - standardClose() + prometheusClose() streamingClose() }) } diff --git a/pkg/streamingpromql/compare.sh b/pkg/streamingpromql/compare.sh index 63f2ba6854d..5c7cbbf345d 100755 --- a/pkg/streamingpromql/compare.sh +++ b/pkg/streamingpromql/compare.sh @@ -4,14 +4,14 @@ set -euo pipefail -RESULTS_FILE="$1" # Should be the path to a file produced by a command like `go test -run=XXX -bench="BenchmarkQuery" -count=6 -benchmem -timeout=1h .` +RESULTS_FILE="$1" # Should be the path to a file produced by a command like `go run ./tools/benchmark-query-engine -count=6 | tee output.txt` 
-STANDARD_RESULTS_FILE=$(mktemp /tmp/standard.XXXX) +PROMETHEUS_RESULTS_FILE=$(mktemp /tmp/prometheus.XXXX) STREAMING_RESULTS_FILE=$(mktemp /tmp/streaming.XXXX) -grep --invert-match "streaming-" "$RESULTS_FILE" | sed -E 's#/standard-[0-9]+##g' > "$STANDARD_RESULTS_FILE" -grep --invert-match "standard-" "$RESULTS_FILE" | sed -E 's#/streaming-[0-9]+##g' > "$STREAMING_RESULTS_FILE" +grep --invert-match "streaming-" "$RESULTS_FILE" | sed -E 's#/Prometheus-[0-9]+##g' > "$PROMETHEUS_RESULTS_FILE" +grep --invert-match "Prometheus-" "$RESULTS_FILE" | sed -E 's#/streaming-[0-9]+##g' > "$STREAMING_RESULTS_FILE" -benchstat "$STANDARD_RESULTS_FILE" "$STREAMING_RESULTS_FILE" | sed "s#$STANDARD_RESULTS_FILE# standard #g" | sed "s#$STREAMING_RESULTS_FILE# streaming #g" +benchstat "$PROMETHEUS_RESULTS_FILE" "$STREAMING_RESULTS_FILE" | sed "s#$PROMETHEUS_RESULTS_FILE# Prometheus #g" | sed "s#$STREAMING_RESULTS_FILE# streaming #g" -rm "$STANDARD_RESULTS_FILE" "$STREAMING_RESULTS_FILE" +rm "$PROMETHEUS_RESULTS_FILE" "$STREAMING_RESULTS_FILE" diff --git a/tools/benchmark-query-engine/README.md b/tools/benchmark-query-engine/README.md index 2ef48ee89a2..96171d4d8f4 100644 --- a/tools/benchmark-query-engine/README.md +++ b/tools/benchmark-query-engine/README.md @@ -13,3 +13,5 @@ Usage: - `go run . -bench=abc`: run all benchmarks with names matching regex `abc` - `go run . -count=X`: run all benchmarks X times - `go run . -bench=abc -count=X`: run all benchmarks with names matching regex `abc` X times +- `go run . -start-ingester`: start ingester and wait (run no benchmarks) +- `go run . 
-use-existing-ingester=localhost:1234`: use existing ingester started with `-start-ingester` to reduce startup time diff --git a/tools/benchmark-query-engine/main.go b/tools/benchmark-query-engine/main.go index d6d05ddb60d..2ef52290fa3 100644 --- a/tools/benchmark-query-engine/main.go +++ b/tools/benchmark-query-engine/main.go @@ -4,11 +4,13 @@ package main import ( "bytes" + "errors" "flag" "fmt" "log/slog" "os" "os/exec" + "os/signal" "path/filepath" "slices" "strings" @@ -38,13 +40,16 @@ type app struct { ingesterAddress string cleanup func() - count uint - testFilter string - listTests bool + count uint + testFilter string + listTests bool + justRunIngester bool } func (a *app) run() error { - a.parseArgs() + if err := a.parseArgs(); err != nil { + return err + } // Do this early, to avoid doing a bunch of pointless work if the regex is invalid or doesn't match any tests. filteredNames, err := a.filteredTestCaseNames() @@ -67,6 +72,23 @@ func (a *app) run() error { defer os.RemoveAll(a.tempDir) + if err := a.startIngesterAndLoadData(); err != nil { + return fmt.Errorf("starting ingester and loading data failed: %w", err) + } + defer a.cleanup() + + if a.justRunIngester { + return a.waitForExit() + } + + if err := a.runBenchmarks(filteredNames); err != nil { + return err + } + + return nil +} + +func (a *app) runBenchmarks(filteredNames []string) error { if err := a.buildBinary(); err != nil { return fmt.Errorf("building binary failed: %w", err) } @@ -75,11 +97,6 @@ func (a *app) run() error { return fmt.Errorf("benchmark binary failed validation: %w", err) } - if err := a.startIngesterAndLoadData(); err != nil { - return fmt.Errorf("starting ingester and loading data failed: %w", err) - } - defer a.cleanup() - haveRunAnyTests := false for _, name := range filteredNames { @@ -92,21 +109,43 @@ func (a *app) run() error { } } - slog.Info("benchmarks completed successfully, cleaning up...") + slog.Info("benchmarks completed successfully") + return nil +} + +func (a 
*app) waitForExit() error { + // Deliberately formatting the address into the message rather than passing it as a structured logging attribute: this produces the clearest message. + slog.Info(fmt.Sprintf("ingester running, run benchmark-query-engine with -use-existing-ingester=%v", a.ingesterAddress)) + slog.Info("press Ctrl+C to exit") + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) + <-done + + println() + slog.Info("interrupt received, shutting down...") return nil } -func (a *app) parseArgs() { +func (a *app) parseArgs() error { flag.UintVar(&a.count, "count", 1, "run each benchmark n times") flag.StringVar(&a.testFilter, "bench", ".", "only run benchmarks matching regexp") flag.BoolVar(&a.listTests, "list", false, "list known benchmarks and exit") + flag.BoolVar(&a.justRunIngester, "start-ingester", false, "start ingester and wait, run no benchmarks") + flag.StringVar(&a.ingesterAddress, "use-existing-ingester", "", "use existing ingester rather than creating a new one") if err := flagext.ParseFlagsWithoutArguments(flag.CommandLine); err != nil { fmt.Printf("%v\n", err) flag.Usage() os.Exit(1) } + + if a.justRunIngester && a.ingesterAddress != "" { + return errors.New("cannot specify both '-start-ingester' and an existing ingester address with '-use-existing-ingester'") + } + + return nil } func (a *app) findBenchmarkPackageDir() error { @@ -181,6 +220,14 @@ func (a *app) validateBinary() error { } func (a *app) startIngesterAndLoadData() error { + if a.ingesterAddress != "" { + slog.Warn("using existing ingester; not checking data required for benchmark is present", "address", a.ingesterAddress) + a.cleanup = func() { + // Nothing to do. 
+ } + return nil + } + slog.Info("starting ingester and loading data...") address, cleanup, err := benchmarks.StartIngesterAndLoadData(a.dataDir, benchmarks.MetricSizes) @@ -211,7 +258,7 @@ func (a *app) allTestCaseNames() []string { for _, c := range cases { names = append(names, benchmarkName+"/"+c.Name()+"/streaming") - names = append(names, benchmarkName+"/"+c.Name()+"/standard") + names = append(names, benchmarkName+"/"+c.Name()+"/Prometheus") } return names