Skip to content

Commit

Permalink
Fixing review findings
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Nikolic <[email protected]>
  • Loading branch information
duricanikolic committed Mar 19, 2024
1 parent 9805602 commit 010339b
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 63 deletions.
6 changes: 5 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4836,6 +4836,10 @@ type prepConfig struct {
ingestStoragePartitions int32 // Number of partitions. Auto-detected from configured ingesters if not explicitly set.
ingestStorageKafka *kfake.Cluster

// We need this setting to simulate a response from ingesters that didn't support responding
// with a stream of chunks, and were responding with chunk series instead. This is needed to
// ensure backwards compatibility, i.e., that queriers can still correctly handle both types
// or responses.
disableStreamingResponse bool
}

Expand Down Expand Up @@ -5805,7 +5809,7 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest
}
}

if i.disableStreamingResponse {
if i.disableStreamingResponse || req.StreamingChunksBatchSize == 0 {
nonStreamingResponses = append(nonStreamingResponses, &client.QueryStreamResponse{
Chunkseries: []client.TimeSeriesChunk{
{
Expand Down
102 changes: 46 additions & 56 deletions pkg/distributor/query_ingest_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/test"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -489,71 +488,62 @@ func TestDistributor_QueryStream_ShouldSupportIngestStorage(t *testing.T) {

for testName, testData := range tests {
testData := testData
for _, disableStreamingResponse := range []bool{false, true} {
disableStreamingResponse := disableStreamingResponse
t.Run(fmt.Sprintf("%s, streaming response disabled: %v", testName, disableStreamingResponse), func(t *testing.T) {
t.Parallel()

limits := prepareDefaultLimits()
limits.IngestionPartitionsTenantShardSize = testData.shuffleShardSize
t.Run(testName, func(t *testing.T) {
t.Parallel()

cfg := prepConfig{
numDistributors: 1,
ingestStorageEnabled: true,
ingesterStateByZone: testData.ingesterStateByZone,
ingesterDataByZone: testData.ingesterDataByZone,
ingesterDataTenantID: tenantID,
queryDelay: 250 * time.Millisecond, // Give some time to start the calls to all ingesters before failures are received.
replicationFactor: 1, // Ingest storage is not expected to use it.
limits: limits,
disableStreamingResponse: disableStreamingResponse,
configure: func(config *Config) {
config.PreferAvailabilityZone = testData.preferZone
config.MinimizeIngesterRequests = testData.minimizeIngesterRequests
},
}
limits := prepareDefaultLimits()
limits.IngestionPartitionsTenantShardSize = testData.shuffleShardSize

distributors, ingesters, distributorRegistries, _ := prepare(t, cfg)
require.Len(t, distributors, 1)
require.Len(t, distributorRegistries, 1)
cfg := prepConfig{
numDistributors: 1,
ingestStorageEnabled: true,
ingesterStateByZone: testData.ingesterStateByZone,
ingesterDataByZone: testData.ingesterDataByZone,
ingesterDataTenantID: tenantID,
queryDelay: 250 * time.Millisecond, // Give some time to start the calls to all ingesters before failures are received.
replicationFactor: 1, // Ingest storage is not expected to use it.
limits: limits,
configure: func(config *Config) {
config.PreferAvailabilityZone = testData.preferZone
config.MinimizeIngesterRequests = testData.minimizeIngesterRequests
},
}

// Query ingesters.
queryMetrics := stats.NewQueryMetrics(distributorRegistries[0])
resp, err := distributors[0].QueryStream(ctx, queryMetrics, 0, 10, testData.matchers...)
distributors, ingesters, distributorRegistries, _ := prepare(t, cfg)
require.Len(t, distributors, 1)
require.Len(t, distributorRegistries, 1)

if testData.expectedErr == nil {
require.NoError(t, err)
} else {
assert.ErrorIs(t, err, testData.expectedErr)
// Query ingesters.
queryMetrics := stats.NewQueryMetrics(distributorRegistries[0])
resp, err := distributors[0].QueryStream(ctx, queryMetrics, 0, 10, testData.matchers...)

// Assert that downstream gRPC statuses are passed back upstream.
_, expectedIsGRPC := grpcutil.ErrorToStatus(testData.expectedErr)
if expectedIsGRPC {
_, actualIsGRPC := grpcutil.ErrorToStatus(err)
assert.True(t, actualIsGRPC, fmt.Sprintf("expected error to be a status error, but got: %T", err))
}
}
if testData.expectedErr == nil {
require.NoError(t, err)
} else {
assert.ErrorIs(t, err, testData.expectedErr)

var responseMatrix model.Matrix
if len(resp.Chunkseries) == 0 {
responseMatrix, err = ingester_client.StreamingSeriesToMatrix(0, 5, resp.StreamingSeries)
} else {
responseMatrix, err = ingester_client.TimeSeriesChunksToMatrix(0, 5, resp.Chunkseries)
// Assert that downstream gRPC statuses are passed back upstream.
_, expectedIsGRPC := grpcutil.ErrorToStatus(testData.expectedErr)
if expectedIsGRPC {
_, actualIsGRPC := grpcutil.ErrorToStatus(err)
assert.True(t, actualIsGRPC, fmt.Sprintf("expected error to be a status error, but got: %T", err))
}
assert.NoError(t, err)
assert.Equal(t, testData.expectedResponse.String(), responseMatrix.String())
}

// Check how many ingesters have been queried.
// Because we return immediately on failures, it might take some time for all ingester calls to register.
test.Poll(t, 4*cfg.queryDelay, testData.expectedQueriedIngesters, func() any { return countMockIngestersCalls(ingesters, "QueryStream") })
var responseMatrix model.Matrix
if len(resp.Chunkseries) == 0 {
responseMatrix, err = ingester_client.StreamingSeriesToMatrix(0, 5, resp.StreamingSeries)
} else {
responseMatrix, err = ingester_client.TimeSeriesChunksToMatrix(0, 5, resp.Chunkseries)
}
assert.NoError(t, err)
assert.Equal(t, testData.expectedResponse.String(), responseMatrix.String())

if disableStreamingResponse {
// We expected the number of non-deduplicated chunks to be equal to the number of queried series
// given we expect 1 chunk per series.
assert.Equal(t, float64(testData.expectedResponse.Len()), testutil.ToFloat64(queryMetrics.IngesterChunksTotal)-testutil.ToFloat64(queryMetrics.IngesterChunksDeduplicated))
}
})
}
// Check how many ingesters have been queried.
// Because we return immediately on failures, it might take some time for all ingester calls to register.
test.Poll(t, 4*cfg.queryDelay, testData.expectedQueriedIngesters, func() any { return countMockIngestersCalls(ingesters, "QueryStream") })
})
}
}

Expand Down
15 changes: 9 additions & 6 deletions pkg/ingester/client/chunkcompat.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,21 @@ func StreamingSeriesToMatrix(from, through model.Time, sSeries []StreamingSeries
}

result := model.Matrix{}
var chunks []Chunk
for _, series := range sSeries {
chunks = chunks[:0]
for sourceIdx, source := range series.Sources {
chunks, err := source.StreamReader.GetChunks(source.SeriesIndex)
sourceChunks, err := source.StreamReader.GetChunks(source.SeriesIndex)
if err != nil {
return nil, fmt.Errorf("GetChunks() from stream reader for series %d from source %d: %w", source.SeriesIndex, sourceIdx, err)
}
stream, err := seriesChunksToMatrix(from, through, series.Labels, chunks)
if err != nil {
return nil, err
}
result = append(result, stream)
chunks = append(chunks, sourceChunks...)
}
stream, err := seriesChunksToMatrix(from, through, series.Labels, chunks)
if err != nil {
return nil, err
}
result = append(result, stream)
}
return result, nil
}
Expand Down

0 comments on commit 010339b

Please sign in to comment.