Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingester: Fix regression on usage of cortex_ingester_queried_chunks #6398

Merged
merged 2 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
* [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled #6271
* [BUGFIX] Ingester: Fix regression on usage of cortex_ingester_queried_chunks #6398

## 1.18.1 2024-10-14

Expand Down
28 changes: 16 additions & 12 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1968,18 +1968,21 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
numSamples := 0
numSeries := 0
totalDataBytes := 0
numSeries, numSamples, totalDataBytes, err = i.queryStreamChunks(ctx, db, int64(from), int64(through), matchers, shardMatcher, stream)
numChunks := 0
numSeries, numSamples, totalDataBytes, numChunks, err = i.queryStreamChunks(ctx, db, int64(from), int64(through), matchers, shardMatcher, stream)

if err != nil {
return err
}

i.metrics.queriedSeries.Observe(float64(numSeries))
i.metrics.queriedSamples.Observe(float64(numSamples))
level.Debug(spanlog).Log("series", numSeries, "samples", numSamples, "data_bytes", totalDataBytes)
i.metrics.queriedChunks.Observe(float64(numChunks))
level.Debug(spanlog).Log("series", numSeries, "samples", numSamples, "data_bytes", totalDataBytes, "chunks", numChunks)
spanlog.SetTag("series", numSeries)
spanlog.SetTag("samples", numSamples)
spanlog.SetTag("data_bytes", totalDataBytes)
spanlog.SetTag("chunks", numChunks)
return nil
}

Expand All @@ -1998,16 +2001,16 @@ func (i *Ingester) trackInflightQueryRequest() (func(), error) {
}

// queryStreamChunks streams metrics from a TSDB. This implements the client.IngesterServer interface
func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, sm *storepb.ShardMatcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples, totalBatchSizeBytes int, _ error) {
func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, sm *storepb.ShardMatcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples, totalBatchSizeBytes, numChunks int, _ error) {
q, err := db.ChunkQuerier(from, through)
if err != nil {
return 0, 0, 0, err
return 0, 0, 0, 0, err
}
defer q.Close()

c, err := i.trackInflightQueryRequest()
if err != nil {
return 0, 0, 0, err
return 0, 0, 0, 0, err
}
hints := &storage.SelectHints{
Start: from,
Expand All @@ -2018,7 +2021,7 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
ss := q.Select(ctx, false, hints, matchers...)
c()
if ss.Err() != nil {
return 0, 0, 0, ss.Err()
return 0, 0, 0, 0, ss.Err()
}

chunkSeries := make([]client.TimeSeriesChunk, 0, queryStreamBatchSize)
Expand All @@ -2044,7 +2047,7 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
// It is not guaranteed that chunk returned by iterator is populated.
// For now just return error. We could also try to figure out how to read the chunk.
if meta.Chunk == nil {
return 0, 0, 0, errors.Errorf("unfilled chunk returned from TSDB chunk querier")
return 0, 0, 0, 0, errors.Errorf("unfilled chunk returned from TSDB chunk querier")
}

ch := client.Chunk{
Expand All @@ -2061,10 +2064,11 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
case chunkenc.EncFloatHistogram:
ch.Encoding = int32(encoding.PrometheusFloatHistogramChunk)
default:
return 0, 0, 0, errors.Errorf("unknown chunk encoding from TSDB chunk querier: %v", meta.Chunk.Encoding())
return 0, 0, 0, 0, errors.Errorf("unknown chunk encoding from TSDB chunk querier: %v", meta.Chunk.Encoding())
}

ts.Chunks = append(ts.Chunks, ch)
numChunks++
numSamples += meta.Chunk.NumSamples()
}
numSeries++
Expand All @@ -2078,7 +2082,7 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
Chunkseries: chunkSeries,
})
if err != nil {
return 0, 0, 0, err
return 0, 0, 0, 0, err
}

batchSizeBytes = 0
Expand All @@ -2091,7 +2095,7 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th

// Ensure no error occurred while iterating the series set.
if err := ss.Err(); err != nil {
return 0, 0, 0, err
return 0, 0, 0, 0, err
}

// Final flush any existing metrics
Expand All @@ -2100,11 +2104,11 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
Chunkseries: chunkSeries,
})
if err != nil {
return 0, 0, 0, err
return 0, 0, 0, 0, err
}
}

return numSeries, numSamples, totalBatchSizeBytes, nil
return numSeries, numSamples, totalBatchSizeBytes, numChunks, nil
}

func (i *Ingester) getTSDB(userID string) *userTSDB {
Expand Down
20 changes: 19 additions & 1 deletion pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3087,7 +3087,8 @@ func TestIngester_QueryStreamManySamplesChunks(t *testing.T) {
// Create ingester.
cfg := defaultIngesterTestConfig(t)

i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
reg := prometheus.NewRegistry()
i, err := prepareIngesterWithBlocksStorage(t, cfg, reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand Down Expand Up @@ -3154,6 +3155,7 @@ func TestIngester_QueryStreamManySamplesChunks(t *testing.T) {
recvMsgs := 0
series := 0
totalSamples := 0
totalChunks := 0

for {
resp, err := s.Recv()
Expand All @@ -3174,6 +3176,7 @@ func TestIngester_QueryStreamManySamplesChunks(t *testing.T) {
require.NoError(t, err)
totalSamples += chk.NumSamples()
}
totalChunks += len(ts.Chunks)
}
}

Expand All @@ -3183,6 +3186,21 @@ func TestIngester_QueryStreamManySamplesChunks(t *testing.T) {
require.True(t, 2 <= recvMsgs && recvMsgs <= 3)
require.Equal(t, 3, series)
require.Equal(t, 100000+500000+samplesCount, totalSamples)
require.Equal(t, 13335, totalChunks)
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_ingester_queried_chunks The total number of chunks returned from queries.
# TYPE cortex_ingester_queried_chunks histogram
cortex_ingester_queried_chunks_bucket{le="10"} 0
cortex_ingester_queried_chunks_bucket{le="80"} 0
cortex_ingester_queried_chunks_bucket{le="640"} 0
cortex_ingester_queried_chunks_bucket{le="5120"} 0
cortex_ingester_queried_chunks_bucket{le="40960"} 1
cortex_ingester_queried_chunks_bucket{le="327680"} 1
cortex_ingester_queried_chunks_bucket{le="2.62144e+06"} 1
cortex_ingester_queried_chunks_bucket{le="+Inf"} 1
cortex_ingester_queried_chunks_sum 13335
cortex_ingester_queried_chunks_count 1
`), `cortex_ingester_queried_chunks`))
}

func writeRequestSingleSeries(lbls labels.Labels, samples []cortexpb.Sample) *cortexpb.WriteRequest {
Expand Down
Loading