Add optional querier response streaming #7173

Merged 32 commits on Feb 23, 2024

Commits:
0df6aa6
Allow streaming responses from querier to query-frontend
flxbk Jan 19, 2024
ff19c96
reuse response body for stream chunks
flxbk Feb 12, 2024
369513c
remove unneeded doc string
flxbk Feb 12, 2024
a87c5f5
delete internal header
flxbk Feb 12, 2024
78c0361
assert on response content
flxbk Feb 20, 2024
1b14164
defer request cleanup
flxbk Feb 20, 2024
e1af708
rename wireType to queryResult
flxbk Feb 20, 2024
000551c
extract struct construction into separate statement
flxbk Feb 20, 2024
ee941e0
naming (removeStreamingHeader)
flxbk Feb 20, 2024
4959ec3
godoc for `ResponseStreamingEnabledHeader`
flxbk Feb 20, 2024
35af831
improve query frontend test
flxbk Feb 20, 2024
c1c2796
refactor integration test
flxbk Feb 20, 2024
dda46ba
make frontend test more readable
flxbk Feb 21, 2024
4a49a79
stream only responses whose size exceeds the chunk size limit
flxbk Feb 21, 2024
cf4626f
improve usage string for `querier.response-streaming-enabled`
flxbk Feb 21, 2024
fcd80f9
reduce cardinality of integration test flags
flxbk Feb 21, 2024
d73516a
add test case for frontend
flxbk Feb 21, 2024
53a060f
do not use defer in cleanup
flxbk Feb 22, 2024
425b28f
add test for context cancellation while streaming
flxbk Feb 22, 2024
c4f1361
check for leaking goroutines
flxbk Feb 22, 2024
bc52bc9
test aborting stream on query cancellation
flxbk Feb 22, 2024
4881608
use `VerifyNoLeakTestMain` for frontend/v2 tests
flxbk Feb 23, 2024
316ef6c
add logging for aborted response stream
flxbk Feb 23, 2024
33255f9
cancel only query context
flxbk Feb 23, 2024
bf205e7
add test for non-cancellation error during streaming
flxbk Feb 23, 2024
112ac43
add test for frontend returning error during stream
flxbk Feb 23, 2024
dd5581f
improve logs for stream abortion
flxbk Feb 23, 2024
15e0c70
assert that response body is streamed in full on worker context cance…
flxbk Feb 23, 2024
3ce483d
CHANGELOG.md
flxbk Feb 23, 2024
25d5111
about-versioning.md
flxbk Feb 23, 2024
0d40276
split up tests for worker cancellation / scheduler client error
flxbk Feb 23, 2024
a7edf35
PR number
flxbk Feb 23, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -51,6 +51,7 @@
* [FEATURE] Querier: added `histogram_avg()` function support to PromQL. #7293
* [FEATURE] Ingester: added `-blocks-storage.tsdb.timely-head-compaction` flag, which enables more timely head compaction, and defaults to `false`. #7372
* [FEATURE] Compactor: Added `/compactor/tenants` and `/compactor/tenant/{tenant}/planned_jobs` endpoints that provide functionality that was provided by `tools/compaction-planner` -- listing of planned compaction jobs based on tenants' bucket index. #7381
* [FEATURE] Add experimental support for streaming response bodies from queriers to frontends via `-querier.response-streaming-enabled`. This is currently only supported for the `/api/v1/cardinality/active_series` endpoint. #7173
* [ENHANCEMENT] Distributor: Add a new metric `cortex_distributor_otlp_requests_total` to track the total number of OTLP requests. #7385
* [ENHANCEMENT] Vault: add lifecycle manager for token used to authenticate to Vault. This ensures the client token is always valid. Includes a gauge (`cortex_vault_token_lease_renewal_active`) to check whether token renewal is active, and the counters `cortex_vault_token_lease_renewal_success_total` and `cortex_vault_auth_success_total` to see the total number of successful lease renewals / authentications. #7337
* [ENHANCEMENT] Store-gateway: add no-compact details column on store-gateway tenants admin UI. #6848
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -4622,6 +4622,17 @@
],
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "field",
"name": "response_streaming_enabled",
"required": false,
"desc": "Enables streaming of responses from querier to query-frontend for response types that support it (currently only `active_series` responses do).",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "querier.response-streaming-enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1733,6 +1733,8 @@ Usage of ./cmd/mimir/mimir:
Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester. (default 13h)
-querier.query-store-after duration
The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. If this option is enabled, the time range of the query sent to the store-gateway will be manipulated to ensure the query end is not more recent than 'now - query-store-after'. (default 12h0m0s)
-querier.response-streaming-enabled
[experimental] Enables streaming of responses from querier to query-frontend for response types that support it (currently only `active_series` responses do).
-querier.scheduler-address string
Address of the query-scheduler component, in host:port format. The host should resolve to all query-scheduler instances. This option should be set only when query-scheduler component is in use and -query-scheduler.service-discovery-mode is set to 'dns'.
-querier.scheduler-client.backoff-max-period duration
2 changes: 2 additions & 0 deletions development/mimir-microservices-mode/config/mimir.yaml
@@ -149,6 +149,8 @@ frontend:
compression: snappy

frontend_worker:
response_streaming_enabled: true

# Uncomment when using "dns" service discovery mode for query-scheduler.
# scheduler_address: "query-scheduler:9011"

1 change: 1 addition & 0 deletions docs/sources/mimir/configure/about-versioning.md
@@ -123,6 +123,7 @@ The following features are currently experimental:
- Max concurrency for tenant federated queries (`-tenant-federation.max-concurrent`)
- Maximum response size for active series queries (`-querier.active-series-results-max-size-bytes`)
- Enable PromQL experimental functions (`-querier.promql-experimental-functions-enabled`)
- Allow streaming of `/active_series` responses to the frontend (`-querier.response-streaming-enabled`)
- Query-frontend
- `-query-frontend.querier-forget-delay`
- Instant query splitting (`-query-frontend.split-instant-queries-by-interval`)
@@ -2554,6 +2554,12 @@ The `frontend_worker` block configures the worker running within the querier, picking up and executing queries enqueued by the query-frontend or the query-scheduler.
# query-scheduler.
# The CLI flags prefix for this block configuration is: querier.scheduler-client
[query_scheduler_grpc_client_config: <grpc_client>]

# (experimental) Enables streaming of responses from querier to query-frontend
# for response types that support it (currently only `active_series` responses
# do).
# CLI flag: -querier.response-streaming-enabled
[response_streaming_enabled: <boolean> | default = false]
```

### etcd
62 changes: 45 additions & 17 deletions integration/query_frontend_active_series_test.go
@@ -4,6 +4,7 @@
package integration

import (
"fmt"
"net/http"
"strconv"
"testing"
@@ -19,21 +20,40 @@ import (
"github.com/grafana/mimir/integration/e2emimir"
)

func TestActiveSeriesWithQueryShardingHTTP(t *testing.T) {
config := queryFrontendTestConfig{
queryStatsEnabled: true,
querySchedulerEnabled: true,
querySchedulerDiscoveryMode: "ring",
setup: func(t *testing.T, s *e2e.Scenario) (string, map[string]string) {
flags := mergeFlags(BlocksStorageFlags(), BlocksStorageS3Flags())
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

return "", flags
},
func TestActiveSeriesWithQuerySharding(t *testing.T) {
for _, tc := range []struct {
querySchedulerEnabled bool
shardingEnabled bool
}{
{true, false},
{true, true},
{false, true},
} {
config := queryFrontendTestConfig{
queryStatsEnabled: true,
shardActiveSeriesQueries: tc.shardingEnabled,
querySchedulerEnabled: tc.querySchedulerEnabled,
querySchedulerDiscoveryMode: "ring",
setup: func(t *testing.T, s *e2e.Scenario) (string, map[string]string) {
flags := mergeFlags(BlocksStorageFlags(), BlocksStorageS3Flags(),
map[string]string{
"-querier.response-streaming-enabled": "true",
},
)
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

return "", flags
},
}

testName := fmt.Sprintf("query scheduler=%v/query sharding=%v",
tc.querySchedulerEnabled, tc.shardingEnabled,
)
t.Run(testName, func(t *testing.T) {
runTestActiveSeriesWithQueryShardingHTTPTest(t, config)
})
}

runTestActiveSeriesWithQueryShardingHTTPTest(t, config)
}

func runTestActiveSeriesWithQueryShardingHTTPTest(t *testing.T, cfg queryFrontendTestConfig) {
@@ -54,8 +74,8 @@ func runTestActiveSeriesWithQueryShardingHTTPTest(t *testing.T, cfg queryFrontendTestConfig) {
"-query-frontend.query-stats-enabled": strconv.FormatBool(cfg.queryStatsEnabled),
"-query-frontend.query-sharding-total-shards": "32",
"-query-frontend.query-sharding-max-sharded-queries": "128",
"-query-frontend.shard-active-series-queries": "true",
"-querier.cardinality-analysis-enabled": "true",
"-querier.max-concurrent": "128",
})

// Start the query-scheduler if enabled.
@@ -74,6 +94,10 @@ func runTestActiveSeriesWithQueryShardingHTTPTest(t *testing.T, cfg queryFrontendTestConfig) {
require.NoError(t, s.StartAndWaitReady(queryScheduler))
}

if cfg.shardActiveSeriesQueries {
flags["-query-frontend.shard-active-series-queries"] = "true"
}

// Start the query-frontend.
queryFrontend := e2emimir.NewQueryFrontend("query-frontend", flags, e2emimir.WithConfigFile(configFile))
require.NoError(t, s.Start(queryFrontend))
@@ -134,6 +158,10 @@ func runTestActiveSeriesWithQueryShardingHTTPTest(t *testing.T, cfg queryFrontendTestConfig) {
}

_, err = c.ActiveSeries(metricName, e2emimir.WithQueryShards(512))
require.Error(t, err)
require.Contains(t, err.Error(), "shard count 512 exceeds allowed maximum (128)")
if cfg.shardActiveSeriesQueries {
require.Error(t, err)
require.Contains(t, err.Error(), "shard count 512 exceeds allowed maximum (128)")
} else {
require.NoError(t, err)
}
}
1 change: 1 addition & 0 deletions integration/query_frontend_test.go
@@ -39,6 +39,7 @@ type queryFrontendTestConfig struct {
queryStatsEnabled bool
setup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string)
withHistograms bool
shardActiveSeriesQueries bool
}

func TestQueryFrontendWithBlocksStorageViaFlags(t *testing.T) {
31 changes: 25 additions & 6 deletions pkg/frontend/transport/roundtripper.go
@@ -10,13 +10,14 @@ import (
"context"
"io"
"net/http"
"strconv"

"github.com/grafana/dskit/httpgrpc"
)

// GrpcRoundTripper is similar to http.RoundTripper, but works with HTTP requests converted to protobuf messages.
type GrpcRoundTripper interface {
RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, io.ReadCloser, error)
}

func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper {
@@ -43,22 +44,40 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, error) {
return nil, err
}

resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req)
resp, body, err := a.roundTripper.RoundTripGRPC(r.Context(), req)
if err != nil {
var ok bool
if resp, ok = httpgrpc.HTTPResponseFromError(err); !ok {
return nil, err
}
}

var respBody io.ReadCloser
if body != nil {
respBody = body
} else {
respBody = &buffer{buff: resp.Body, ReadCloser: io.NopCloser(bytes.NewReader(resp.Body))}
}

httpResp := &http.Response{
StatusCode: int(resp.Code),
Body: &buffer{buff: resp.Body, ReadCloser: io.NopCloser(bytes.NewReader(resp.Body))},
Header: http.Header{},
ContentLength: int64(len(resp.Body)),
StatusCode: int(resp.Code),
Body: respBody,
Header: http.Header{},
}
for _, h := range resp.Headers {
httpResp.Header[h.Key] = h.Values
}

contentLength := -1
if len(resp.Body) > 0 {
contentLength = len(resp.Body)
} else if l := httpResp.Header.Get("Content-Length"); l != "" {
cl, err := strconv.Atoi(l)
if err == nil {
contentLength = cl
}
}
httpResp.ContentLength = int64(contentLength)

return httpResp, nil
}
13 changes: 7 additions & 6 deletions pkg/frontend/v1/frontend.go
@@ -9,6 +9,7 @@ import (
"context"
"flag"
"fmt"
"io"
"net/http"
"time"

@@ -162,14 +163,14 @@ func (f *Frontend) cleanupInactiveUserMetrics(user string) {
}

// RoundTripGRPC round trips a proto (instead of an HTTP request).
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, io.ReadCloser, error) {
// Propagate trace context in gRPC too - this will be ignored if using HTTP.
tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx)
if tracer != nil && span != nil {
carrier := (*httpgrpcutil.HttpgrpcHeadersCarrier)(req)
err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier)
if err != nil {
return nil, err
return nil, nil, err
}
}

@@ -185,18 +186,18 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
}

if err := f.queueRequest(ctx, &request); err != nil {
return nil, err
return nil, nil, err
}

select {
case <-ctx.Done():
return nil, ctx.Err()
return nil, nil, ctx.Err()

case resp := <-request.response:
return resp, nil
return resp, nil, nil

case err := <-request.err:
return nil, err
return nil, nil, err
}
}
