diff --git a/CHANGELOG.md b/CHANGELOG.md index 1bb2812d4b..d16ae03e1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -67,6 +67,7 @@ * [BUGFIX] Query-frontend: fix empty metric name matcher not being applied under certain conditions. #8076 * [BUGFIX] Querying: Fix regex matching of multibyte runes with dot operator. #8089 * [BUGFIX] Querying: matrix results returned from instant queries were not sorted by series. #8113 +* [BUGFIX] Query scheduler: Fix a crash in result marshaling. #8140 ### Mixin diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go index 930ef48525..6e30b8f866 100644 --- a/pkg/querier/stats/stats.go +++ b/pkg/querier/stats/stats.go @@ -204,6 +204,17 @@ func (s *Stats) Merge(other *Stats) { s.AddQueueTime(other.LoadQueueTime()) } +// Copy returns a copy of the stats. Use this rather than regular struct assignment +// to make sure atomic modifications are observed. +func (s *Stats) Copy() *Stats { + if s == nil { + return nil + } + c := &Stats{} + c.Merge(s) + return c +} + func ShouldTrackHTTPGRPCResponse(r *httpgrpc.HTTPResponse) bool { // Do no track statistics for requests failed because of a server error. return r.Code < 500 diff --git a/pkg/querier/stats/stats_test.go b/pkg/querier/stats/stats_test.go index 7562e97b59..f45e90187e 100644 --- a/pkg/querier/stats/stats_test.go +++ b/pkg/querier/stats/stats_test.go @@ -178,3 +178,22 @@ func TestStats_Merge(t *testing.T) { assert.Equal(t, time.Duration(0), stats1.LoadQueueTime()) }) } + +func TestStats_Copy(t *testing.T) { + s1 := &Stats{ + WallTime: 1, + FetchedSeriesCount: 2, + FetchedChunkBytes: 3, + FetchedChunksCount: 4, + ShardedQueries: 5, + SplitQueries: 6, + FetchedIndexBytes: 7, + EstimatedSeriesCount: 8, + QueueTime: 9, + } + s2 := s1.Copy() + assert.NotSame(t, s1, s2) + assert.EqualValues(t, s1, s2) + + assert.Nil(t, (*Stats)(nil).Copy()) +} diff --git a/pkg/querier/worker/scheduler_processor.go b/pkg/querier/worker/scheduler_processor.go index e15187554b..9f6e5e4862 100644 --- a/pkg/querier/worker/scheduler_processor.go +++ b/pkg/querier/worker/scheduler_processor.go @@ -305,6 +305,10 @@ func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger, response.Headers, hasStreamHeader = removeStreamingHeader(response.Headers) shouldStream := hasStreamHeader && sp.streamingEnabled && len(response.Body) > responseStreamingBodyChunkSizeBytes + // Protect against not-yet-exited querier handler goroutines that could + // still be incrementing stats when sent for marshaling below. + stats = stats.Copy() + for bof.Ongoing() { c, err = sp.frontendPool.GetClientFor(frontendAddress) if err != nil { diff --git a/pkg/querier/worker/scheduler_processor_test.go b/pkg/querier/worker/scheduler_processor_test.go index e5ae51a55b..17ee857639 100644 --- a/pkg/querier/worker/scheduler_processor_test.go +++ b/pkg/querier/worker/scheduler_processor_test.go @@ -258,7 +258,7 @@ func TestSchedulerProcessor_processQueriesOnSingleStream(t *testing.T) { } func TestSchedulerProcessor_QueryTime(t *testing.T) { - runTest := func(t *testing.T, statsEnabled bool) { + runTest := func(t *testing.T, statsEnabled bool, statsRace bool) { fp, processClient, requestHandler, frontend := prepareSchedulerProcessor(t) recvCount := atomic.NewInt64(0) @@ -291,6 +291,11 @@ func TestSchedulerProcessor_QueryTime(t *testing.T) { if statsEnabled { require.Equal(t, queueTime, stat.LoadQueueTime()) + + if statsRace { + // This triggers the race detector reliably if the same stats object is marshaled. + go stat.AddEstimatedSeriesCount(1) + } } else { require.Equal(t, time.Duration(0), stat.LoadQueueTime()) } @@ -306,11 +311,15 @@ func TestSchedulerProcessor_QueryTime(t *testing.T) { } t.Run("query stats enabled should record queue time", func(t *testing.T) { - runTest(t, true) + runTest(t, true, false) + }) + + t.Run("query stats enabled should not trigger race detector", func(t *testing.T) { + runTest(t, true, true) }) t.Run("query stats disabled will not record queue time", func(t *testing.T) { - runTest(t, false) + runTest(t, false, false) }) }