Skip to content

Commit

Permalink
Don't propagate cancel signal to the Prometheus rules manager context
Browse files Browse the repository at this point in the history
This change allows the rules that are still  executing queries to complete
before cortex if sully shutdown.

Signed-off-by: Raphael Silva <[email protected]>
  • Loading branch information
rapphil committed Nov 8, 2024
1 parent 7e35ef5 commit 6f53f67
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* [ENHANCEMENT] Querier/Ruler: Expose `store_gateway_consistency_check_max_attempts` for max retries when querying store gateway in consistency check. #6276
* [ENHANCEMENT] StoreGateway: Add new `cortex_bucket_store_chunk_pool_inuse_bytes` metric to track the usage in chunk pool. #6310
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326

## 1.18.1 2024-10-14

Expand Down
6 changes: 5 additions & 1 deletion pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,11 +341,15 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
queryFunc = metricsQueryFunc
}

// We let the Prometheus rules manager control the context so that there is a chance
// for graceful shutdown of rules that are still in execution even in case the cortex context is canceled.
prometheusContext := user.InjectOrgID(context.WithoutCancel(ctx), userID)

return rules.NewManager(&rules.ManagerOptions{
Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites),
Queryable: q,
QueryFunc: queryFunc,
Context: user.InjectOrgID(ctx, userID),
Context: prometheusContext,
ExternalURL: cfg.ExternalURL.URL,
NotifyFunc: SendAlerts(notifier, cfg.ExternalURL.URL.String()),
Logger: log.With(logger, "user", userID),
Expand Down
128 changes: 128 additions & 0 deletions pkg/ruler/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,56 @@ func (e emptyQuerier) Select(ctx context.Context, sortSeries bool, hints *storag
return storage.EmptySeriesSet()
}

func fixedQueryable(querier storage.Querier) storage.Queryable {
return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
return querier, nil
})
}

type blockingQuerier struct {
queryStarted chan struct{}
queryFinished chan struct{}
queryBlocker chan struct{}
successfulQuery bool
}

func (s *blockingQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, nil
}

func (s *blockingQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, nil
}

func (s *blockingQuerier) Close() error {
return nil
}

func (s *blockingQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (returnSeries storage.SeriesSet) {
select {
case <-s.queryStarted:
default:
close(s.queryStarted)
}

select {
case <-ctx.Done():
s.successfulQuery = false
returnSeries = storage.ErrSeriesSet(ctx.Err())
case <-s.queryBlocker:
s.successfulQuery = true
returnSeries = storage.EmptySeriesSet()
}

select {
case <-s.queryFinished:
default:
close(s.queryFinished)
}

return returnSeries
}

func testQueryableFunc(querierTestConfig *querier.TestConfig, reg prometheus.Registerer, logger log.Logger) storage.QueryableFunc {
if querierTestConfig != nil {
// disable active query tracking for test
Expand Down Expand Up @@ -322,6 +372,84 @@ func TestNotifierSendsUserIDHeader(t *testing.T) {
`), "prometheus_notifications_dropped_total"))
}

func TestRuler_TestShutdown(t *testing.T) {
tests := []struct {
name string
shutdownFn func(*blockingQuerier, *Ruler)
}{
{
name: "successful query after shutdown",
shutdownFn: func(querier *blockingQuerier, ruler *Ruler) {
// Wait query to start
<-querier.queryStarted

// The following cancel the context of the ruler service.
ruler.StopAsync()

// Simulate the completion of the query
close(querier.queryBlocker)

// Wait query to finish
<-querier.queryFinished

require.True(t, querier.successfulQuery, "query failed to complete successfully failed to complete")
},
},
{
name: "query timeout while shutdown",
shutdownFn: func(querier *blockingQuerier, ruler *Ruler) {
// Wait query to start
<-querier.queryStarted

// The following cancel the context of the ruler service.
ruler.StopAsync()

// Wait query to finish
<-querier.queryFinished

require.False(t, querier.successfulQuery, "query should not be succesfull")
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
store := newMockRuleStore(mockRules, nil)
cfg := defaultRulerConfig(t)
mockQuerier := &blockingQuerier{
queryBlocker: make(chan struct{}),
queryStarted: make(chan struct{}),
queryFinished: make(chan struct{}),
}
sleepQueriable := fixedQueryable(mockQuerier)

d := &querier.MockDistributor{}

d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&client.QueryStreamResponse{
Chunkseries: []client.TimeSeriesChunk{},
}, nil)
d.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Panic("This should not be called for the ruler use-cases.")

r := newTestRuler(t, cfg, store, &querier.TestConfig{
Distributor: d,
Stores: []querier.QueryableWithFilter{
querier.UseAlwaysQueryable(sleepQueriable),
},
})

test.shutdownFn(mockQuerier, r)

err := r.AwaitTerminated(context.Background())
require.NoError(t, err)

e := r.FailureCase()
require.NoError(t, e)
})
}

}

func TestRuler_Rules(t *testing.T) {
store := newMockRuleStore(mockRules, nil)
cfg := defaultRulerConfig(t)
Expand Down

0 comments on commit 6f53f67

Please sign in to comment.