Skip to content

Commit

Permalink
MQE: Panic when memory isn't managed as we expect (#9121)
Browse files Browse the repository at this point in the history
* MQE: Panic on negative memory consumption estimates

* Add pedantic option for panicking when pools aren't returned

For use in tests

* Fix accidentally getting extra slices

We would get a new slice when we already had an existing slice we were
keeping around for use.

* Update CHANGELOG

* Fix linting

* Address review feedback
  • Loading branch information
jhesketh authored Aug 28, 2024
1 parent a4a8792 commit 40da1cb
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 7 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* [CHANGE] Distributor: Replace `-distributor.retry-after-header.max-backoff-exponent` and `-distributor.retry-after-header.base-seconds` with `-distributor.retry-after-header.min-backoff` and `-distributor.retry-after-header.max-backoff` for easier configuration. #8694
* [CHANGE] Ingester: increase the default inactivity timeout of active series (`-ingester.active-series-metrics-idle-timeout`) from `10m` to `20m`. #8975
* [CHANGE] Distributor: Remove `-distributor.enable-otlp-metadata-storage` flag, which was deprecated in version 2.12. #9069
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9017 #9018 #9008 #9120
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9017 #9018 #9008 #9120 #9121
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
4 changes: 4 additions & 0 deletions pkg/streamingpromql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
type EngineOpts struct {
// CommonOpts carries the engine options shared with Prometheus' standard PromQL engine.
CommonOpts promql.EngineOpts
// FeatureToggles enables or disables individual experimental engine features.
FeatureToggles FeatureToggles

// When operating in pedantic mode, we panic if memory consumption is > 0 after Query.Close()
// (indicating something was not returned to a pool).
Pedantic bool
}

type FeatureToggles struct {
Expand Down
6 changes: 6 additions & 0 deletions pkg/streamingpromql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func NewEngine(opts EngineOpts, limitsProvider QueryLimitsProvider, metrics *sta
NativeHistogramBucketFactor: 1.1,
}),
queriesRejectedDueToPeakMemoryConsumption: metrics.QueriesRejectedTotal.WithLabelValues(stats.RejectReasonMaxEstimatedQueryMemoryConsumption),

pedantic: opts.Pedantic,
}, nil
}

Expand All @@ -67,6 +69,10 @@ type Engine struct {
logger log.Logger
estimatedPeakMemoryConsumption prometheus.Histogram
queriesRejectedDueToPeakMemoryConsumption prometheus.Counter

// When operating in pedantic mode, we panic if memory consumption is > 0 after Query.Close()
// (indicating something was not returned to a pool).
pedantic bool
}

func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
Expand Down
13 changes: 11 additions & 2 deletions pkg/streamingpromql/functions/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestDropSeriesName(t *testing.T) {
func TestFloatTransformationFunc(t *testing.T) {
transform := func(f float64) float64 { return f * 2 }
transformFunc := floatTransformationFunc(transform)
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil)

seriesData := types.InstantVectorSeriesData{
Floats: []promql.FPoint{
Expand All @@ -43,6 +44,8 @@ func TestFloatTransformationFunc(t *testing.T) {
{H: &histogram.FloatHistogram{Count: 1, Sum: 2}},
},
}
// Increase the memory tracker for 2 FPoints, and 1 HPoint
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*2+types.HPointSize*1))

expected := types.InstantVectorSeriesData{
Floats: []promql.FPoint{
Expand All @@ -54,14 +57,16 @@ func TestFloatTransformationFunc(t *testing.T) {
},
}

modifiedSeriesData, err := transformFunc(seriesData, limiting.NewMemoryConsumptionTracker(0, nil))
modifiedSeriesData, err := transformFunc(seriesData, memoryConsumptionTracker)
require.NoError(t, err)
require.Equal(t, expected, modifiedSeriesData)
require.Equal(t, types.FPointSize*2+types.HPointSize*1, memoryConsumptionTracker.CurrentEstimatedMemoryConsumptionBytes)
}

func TestFloatTransformationDropHistogramsFunc(t *testing.T) {
transform := func(f float64) float64 { return f * 2 }
transformFunc := FloatTransformationDropHistogramsFunc(transform)
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil)

seriesData := types.InstantVectorSeriesData{
Floats: []promql.FPoint{
Expand All @@ -72,6 +77,8 @@ func TestFloatTransformationDropHistogramsFunc(t *testing.T) {
{H: &histogram.FloatHistogram{Count: 1, Sum: 2}},
},
}
// Increase the memory tracker for 2 FPoints, and 1 HPoint
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*2+types.HPointSize*1))

expected := types.InstantVectorSeriesData{
Floats: []promql.FPoint{
Expand All @@ -81,7 +88,9 @@ func TestFloatTransformationDropHistogramsFunc(t *testing.T) {
Histograms: nil, // Histograms should be dropped
}

modifiedSeriesData, err := transformFunc(seriesData, limiting.NewMemoryConsumptionTracker(0, nil))
modifiedSeriesData, err := transformFunc(seriesData, memoryConsumptionTracker)
require.NoError(t, err)
require.Equal(t, expected, modifiedSeriesData)
// We expect the dropped histogram to be returned to the pool
require.Equal(t, types.FPointSize*2, memoryConsumptionTracker.CurrentEstimatedMemoryConsumptionBytes)
}
3 changes: 3 additions & 0 deletions pkg/streamingpromql/limiting/memory_consumption.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,8 @@ func (l *MemoryConsumptionTracker) IncreaseMemoryConsumption(b uint64) error {

// DecreaseMemoryConsumption decreases the current memory consumption by b bytes.
//
// Panics if b exceeds the currently tracked consumption, since an unsigned
// underflow here would mean something was returned to a pool twice.
func (l *MemoryConsumptionTracker) DecreaseMemoryConsumption(b uint64) {
	current := l.CurrentEstimatedMemoryConsumptionBytes

	if current < b {
		panic("Estimated memory consumption of this query is negative. This indicates something has been returned to a pool more than once, which is a bug.")
	}

	l.CurrentEstimatedMemoryConsumptionBytes = current - b
}
6 changes: 6 additions & 0 deletions pkg/streamingpromql/limiting/memory_consumption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func TestMemoryConsumptionTracker_Unlimited(t *testing.T) {
require.Equal(t, uint64(131), tracker.PeakEstimatedMemoryConsumptionBytes)

assertRejectedQueriesCount(t, reg, 0)

// Test reducing memory consumption to a negative panics
require.Panics(t, func() { tracker.DecreaseMemoryConsumption(150) })
}

func TestMemoryConsumptionTracker_Limited(t *testing.T) {
Expand Down Expand Up @@ -93,6 +96,9 @@ func TestMemoryConsumptionTracker_Limited(t *testing.T) {
require.Equal(t, uint64(11), tracker.CurrentEstimatedMemoryConsumptionBytes)
require.Equal(t, uint64(11), tracker.PeakEstimatedMemoryConsumptionBytes)
assertRejectedQueriesCount(t, reg, 1)

// Test reducing memory consumption to a negative panics
require.Panics(t, func() { tracker.DecreaseMemoryConsumption(150) })
}

func assertRejectedQueriesCount(t *testing.T, reg *prometheus.Registry, expectedRejectionCount int) {
Expand Down
10 changes: 8 additions & 2 deletions pkg/streamingpromql/operators/binary_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,10 +778,13 @@ func (b *BinaryOperation) computeResult(left types.InstantVectorSeriesData, righ
// Can fit output in left side, and the left side is smaller than the right
canReturnLeftFPointSlice = false
fPoints = left.Floats[:0]
} else if !mixedPoints && maxPoints <= cap(right.Floats) {
return nil
}
if !mixedPoints && maxPoints <= cap(right.Floats) {
// Can otherwise fit in the right side
canReturnRightFPointSlice = false
fPoints = right.Floats[:0]
return nil
}
// Either we have mixed points or we can't fit in either left or right side, so create a new slice
var err error
Expand All @@ -796,10 +799,13 @@ func (b *BinaryOperation) computeResult(left types.InstantVectorSeriesData, righ
// Can fit output in left side, and the left side is smaller than the right
canReturnLeftHPointSlice = false
hPoints = left.Histograms[:0]
} else if !mixedPoints && maxPoints <= cap(right.Histograms) {
return nil
}
if !mixedPoints && maxPoints <= cap(right.Histograms) {
// Can otherwise fit in the right side
canReturnRightHPointSlice = false
hPoints = right.Histograms[:0]
return nil
}
// Either we have mixed points or we can't fit in either left or right side, so create a new slice
var err error
Expand Down
12 changes: 10 additions & 2 deletions pkg/streamingpromql/operators/binary_operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,14 +851,19 @@ func TestBinaryOperation_SeriesMerging(t *testing.T) {

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil)
o := &BinaryOperation{
// Simulate an expression with "on (env)".
// This is used to generate error messages.
VectorMatching: parser.VectorMatching{
On: true,
MatchingLabels: []string{"env"},
},
MemoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(0, nil),
MemoryConsumptionTracker: memoryConsumptionTracker,
}
for _, s := range testCase.input {
// Count the memory for the given floats + histograms
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*uint64(len(s.Floats))+types.HPointSize*uint64(len(s.Histograms))))
}

result, err := o.mergeOneSide(testCase.input, testCase.sourceSeriesIndices, testCase.sourceSeriesMetadata, "right")
Expand Down Expand Up @@ -1130,7 +1135,10 @@ func TestBinaryOperationSeriesBuffer(t *testing.T) {
}

seriesUsed := []bool{true, false, true, true, true}
buffer := newBinaryOperationSeriesBuffer(inner, seriesUsed, limiting.NewMemoryConsumptionTracker(0, nil))
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil)
// We have 6 FPoints from the inner series
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*6))
buffer := newBinaryOperationSeriesBuffer(inner, seriesUsed, memoryConsumptionTracker)
ctx := context.Background()

// Read first series.
Expand Down
6 changes: 6 additions & 0 deletions pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,12 @@ func (q *Query) Close() {
default:
panic(fmt.Sprintf("unknown result value type %T", q.result.Value))
}

if q.engine.pedantic && q.result.Err == nil {
if q.memoryConsumptionTracker.CurrentEstimatedMemoryConsumptionBytes > 0 {
panic("Memory consumption tracker still estimates > 0 bytes used. This indicates something has not been returned to a pool.")
}
}
}

func (q *Query) Statement() parser.Statement {
Expand Down
1 change: 1 addition & 0 deletions pkg/streamingpromql/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ func NewTestEngineOpts() EngineOpts {
},

FeatureToggles: EnableAllFeatures,
Pedantic: true,
}
}

0 comments on commit 40da1cb

Please sign in to comment.