Skip to content

Commit

Permalink
tests: use remote write in query frontend tests (thanos-io#6998)
Browse files Browse the repository at this point in the history
Signed-off-by: hanyuting8 <[email protected]>
  • Loading branch information
pawarpranav83 authored and hanyuting8 committed Jan 19, 2024
1 parent 665e643 commit 4e2fdec
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 38 deletions.
116 changes: 78 additions & 38 deletions test/e2e/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/prompb"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/cacheutil"
Expand All @@ -41,12 +42,13 @@ func TestQueryFrontend(t *testing.T) {
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

now := time.Now()
// Predefined Timestamp
predefTimestamp := time.Date(2023, time.December, 22, 12, 0, 0, 0, time.UTC)

prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "1", e2ethanos.DefaultPromConfig("test", 0, "", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage(), "")
testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar))
i := e2ethanos.NewReceiveBuilder(e, "ingestor-rw").WithIngestionEnabled().Init()
testutil.Ok(t, e2e.StartAndWaitReady(i))

q := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).Init()
q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q))

inMemoryCacheConfig := queryfrontend.CacheProviderConfig{
Expand All @@ -64,17 +66,34 @@ func TestQueryFrontend(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)

// Writing a custom Timeseries into the receiver
testutil.Ok(t, remoteWrite(ctx, []prompb.TimeSeries{{
Labels: []prompb.Label{
{Name: "__name__", Value: "up"},
{Name: "instance", Value: "localhost:9090"},
{Name: "job", Value: "myself"},
{Name: "prometheus", Value: "test"},
{Name: "replica", Value: "0"},
},
Samples: []prompb.Sample{
{Value: float64(0), Timestamp: timestamp.FromTime(predefTimestamp)},
}}},
i.Endpoint("remote-write"),
))

testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics()))

// Ensure we can get the result from Querier first so that it
// doesn't need to retry when we send queries to the frontend later.
queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, func() time.Time { return predefTimestamp }, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
"job": "myself",
"prometheus": "test",
"receive": "receive-ingestor-rw",
"replica": "0",
"tenant_id": "default-tenant",
},
})

Expand All @@ -86,13 +105,15 @@ func TestQueryFrontend(t *testing.T) {
queryTimes := vals[0]

t.Run("query frontend works for instant query", func(t *testing.T) {
queryAndAssertSeries(t, ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, func() time.Time { return predefTimestamp }, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
"job": "myself",
"prometheus": "test",
"receive": "receive-ingestor-rw",
"replica": "0",
"tenant_id": "default-tenant",
},
})

Expand All @@ -115,8 +136,8 @@ func TestQueryFrontend(t *testing.T) {
ctx,
queryFrontend.Endpoint("http"),
e2ethanos.QueryUpWithoutInstance,
timestamp.FromTime(now.Add(-time.Hour)),
timestamp.FromTime(now.Add(time.Hour)),
timestamp.FromTime(predefTimestamp.Add(-time.Hour)),
timestamp.FromTime(predefTimestamp.Add(time.Hour)),
14,
promclient.QueryOptions{
Deduplicate: true,
Expand Down Expand Up @@ -159,8 +180,8 @@ func TestQueryFrontend(t *testing.T) {
ctx,
queryFrontend.Endpoint("http"),
e2ethanos.QueryUpWithoutInstance,
timestamp.FromTime(now.Add(-time.Hour)),
timestamp.FromTime(now.Add(time.Hour)),
timestamp.FromTime(predefTimestamp.Add(-time.Hour)),
timestamp.FromTime(predefTimestamp.Add(time.Hour)),
14,
promclient.QueryOptions{
Deduplicate: true,
Expand All @@ -181,7 +202,7 @@ func TestQueryFrontend(t *testing.T) {
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "cortex_cache_fetched_keys_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "cortex_cache_hits_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_new_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_added_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_entries"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_gets_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_misses_total"))
Expand All @@ -192,9 +213,8 @@ func TestQueryFrontend(t *testing.T) {
e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tripperware", "query_range"))),
)

// One more request is needed in order to satisfy the req range.
testutil.Ok(t, q.WaitSumMetricsWithOptions(
e2emon.Equals(2),
e2emon.Equals(1),
[]string{"http_requests_total"},
e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "handler", "query_range"))),
)
Expand All @@ -206,8 +226,8 @@ func TestQueryFrontend(t *testing.T) {
ctx,
queryFrontend.Endpoint("http"),
e2ethanos.QueryUpWithoutInstance,
timestamp.FromTime(now.Add(-time.Hour)),
timestamp.FromTime(now.Add(24*time.Hour)),
timestamp.FromTime(predefTimestamp.Add(-time.Hour)),
timestamp.FromTime(predefTimestamp.Add(24*time.Hour)),
14,
promclient.QueryOptions{
Deduplicate: true,
Expand All @@ -225,13 +245,13 @@ func TestQueryFrontend(t *testing.T) {
[]string{"thanos_query_frontend_queries_total"},
e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "op", "query_range"))),
)
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "cortex_cache_fetched_keys_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "cortex_cache_fetched_keys_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "cortex_cache_hits_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_new_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_added_new_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "querier_cache_added_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_entries"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "querier_cache_gets_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_misses_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_entries"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "querier_cache_gets_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_misses_total"))

// Query is 25h so it will be split to 2 requests.
testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions(
Expand All @@ -240,15 +260,15 @@ func TestQueryFrontend(t *testing.T) {
)

testutil.Ok(t, q.WaitSumMetricsWithOptions(
e2emon.Equals(4),
e2emon.Equals(3),
[]string{"http_requests_total"},
e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "handler", "query_range"))),
)
})

t.Run("query frontend splitting works for labels names API", func(t *testing.T) {
// LabelNames and LabelValues API should still work via query frontend.
labelNames(t, ctx, queryFrontend.Endpoint("http"), nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
labelNames(t, ctx, queryFrontend.Endpoint("http"), nil, timestamp.FromTime(predefTimestamp.Add(-time.Hour)), timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []string) bool {
return len(res) > 0
})
testutil.Ok(t, q.WaitSumMetricsWithOptions(
Expand All @@ -267,7 +287,7 @@ func TestQueryFrontend(t *testing.T) {
e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tripperware", "labels"))),
)

labelNames(t, ctx, queryFrontend.Endpoint("http"), nil, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
labelNames(t, ctx, queryFrontend.Endpoint("http"), nil, timestamp.FromTime(predefTimestamp.Add(-24*time.Hour)), timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []string) bool {
return len(res) > 0
})
testutil.Ok(t, q.WaitSumMetricsWithOptions(
Expand All @@ -288,7 +308,7 @@ func TestQueryFrontend(t *testing.T) {
})

t.Run("query frontend splitting works for labels values API", func(t *testing.T) {
labelValues(t, ctx, queryFrontend.Endpoint("http"), "instance", nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
labelValues(t, ctx, queryFrontend.Endpoint("http"), "instance", nil, timestamp.FromTime(predefTimestamp.Add(-time.Hour)), timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []string) bool {
return len(res) == 1 && res[0] == "localhost:9090"
})
testutil.Ok(t, q.WaitSumMetricsWithOptions(
Expand All @@ -307,7 +327,7 @@ func TestQueryFrontend(t *testing.T) {
e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tripperware", "labels"))),
)

labelValues(t, ctx, queryFrontend.Endpoint("http"), "instance", nil, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
labelValues(t, ctx, queryFrontend.Endpoint("http"), "instance", nil, timestamp.FromTime(predefTimestamp.Add(-24*time.Hour)), timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []string) bool {
return len(res) == 1 && res[0] == "localhost:9090"
})
testutil.Ok(t, q.WaitSumMetricsWithOptions(
Expand All @@ -333,8 +353,8 @@ func TestQueryFrontend(t *testing.T) {
ctx,
queryFrontend.Endpoint("http"),
[]*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "up")},
timestamp.FromTime(now.Add(-time.Hour)),
timestamp.FromTime(now.Add(time.Hour)),
timestamp.FromTime(predefTimestamp.Add(-time.Hour)),
timestamp.FromTime(predefTimestamp.Add(time.Hour)),
func(res []map[string]string) bool {
if len(res) != 1 {
return false
Expand All @@ -345,6 +365,8 @@ func TestQueryFrontend(t *testing.T) {
"instance": "localhost:9090",
"job": "myself",
"prometheus": "test",
"receive": "receive-ingestor-rw",
"tenant_id": "default-tenant",
})
},
)
Expand All @@ -369,8 +391,8 @@ func TestQueryFrontend(t *testing.T) {
ctx,
queryFrontend.Endpoint("http"),
[]*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "up")},
timestamp.FromTime(now.Add(-24*time.Hour)),
timestamp.FromTime(now.Add(time.Hour)),
timestamp.FromTime(predefTimestamp.Add(-24*time.Hour)),
timestamp.FromTime(predefTimestamp.Add(time.Hour)),
func(res []map[string]string) bool {
if len(res) != 1 {
return false
Expand All @@ -381,6 +403,8 @@ func TestQueryFrontend(t *testing.T) {
"instance": "localhost:9090",
"job": "myself",
"prometheus": "test",
"receive": "receive-ingestor-rw",
"tenant_id": "default-tenant",
})
},
)
Expand Down Expand Up @@ -409,12 +433,13 @@ func TestQueryFrontendMemcachedCache(t *testing.T) {
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

now := time.Now()
// Predefined timestamp
predefTimestamp := time.Date(2023, time.December, 22, 12, 0, 0, 0, time.UTC)

prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "1", e2ethanos.DefaultPromConfig("test", 0, "", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage(), "")
testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar))
i := e2ethanos.NewReceiveBuilder(e, "ingestor-rw").WithIngestionEnabled().Init()
testutil.Ok(t, e2e.StartAndWaitReady(i))

q := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).Init()
q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q))

memcached := e2ethanos.NewMemcached(e, "1")
Expand Down Expand Up @@ -443,19 +468,34 @@ func TestQueryFrontendMemcachedCache(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)

testutil.Ok(t, remoteWrite(ctx, []prompb.TimeSeries{{
Labels: []prompb.Label{
{Name: "__name__", Value: "up"},
{Name: "instance", Value: "localhost:9090"},
{Name: "job", Value: "myself"},
{Name: "prometheus", Value: "test"},
{Name: "replica", Value: "0"},
},
Samples: []prompb.Sample{
{Value: float64(0), Timestamp: timestamp.FromTime(predefTimestamp)},
}}},
i.Endpoint("remote-write")))

testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics()))

testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "cortex_memcache_client_servers"))

// Ensure we can get the result from Querier first so that it
// doesn't need to retry when we send queries to the frontend later.
queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, func() time.Time { return predefTimestamp }, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
"job": "myself",
"prometheus": "test",
"receive": "receive-ingestor-rw",
"replica": "0",
"tenant_id": "default-tenant",
},
})

Expand All @@ -469,8 +509,8 @@ func TestQueryFrontendMemcachedCache(t *testing.T) {
ctx,
queryFrontend.Endpoint("http"),
e2ethanos.QueryUpWithoutInstance,
timestamp.FromTime(now.Add(-time.Hour)),
timestamp.FromTime(now.Add(time.Hour)),
timestamp.FromTime(predefTimestamp.Add(-time.Hour)),
timestamp.FromTime(predefTimestamp.Add(time.Hour)),
14,
promclient.QueryOptions{
Deduplicate: true,
Expand Down Expand Up @@ -501,8 +541,8 @@ func TestQueryFrontendMemcachedCache(t *testing.T) {
ctx,
queryFrontend.Endpoint("http"),
e2ethanos.QueryUpWithoutInstance,
timestamp.FromTime(now.Add(-time.Hour)),
timestamp.FromTime(now.Add(time.Hour)),
timestamp.FromTime(predefTimestamp.Add(-time.Hour)),
timestamp.FromTime(predefTimestamp.Add(time.Hour)),
14,
promclient.QueryOptions{
Deduplicate: true,
Expand Down
34 changes: 34 additions & 0 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package e2e_test

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -1720,6 +1721,39 @@ func rangeQuery(t *testing.T, ctx context.Context, addr string, q func() string,
return retExplanation
}

// Performs a remote write at the receiver external endpoint.
func remoteWrite(ctx context.Context, timeseries []prompb.TimeSeries, addr string) error {
// Create write request
data, err := proto.Marshal(&prompb.WriteRequest{Timeseries: timeseries})
if err != nil {
return err
}

// Create HTTP request
compressed := snappy.Encode(nil, data)
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/receive", addr), bytes.NewReader(compressed))
if err != nil {
return err
}

req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")

// Execute HTTP request
res, err := promclient.NewDefaultClient().HTTPClient.Do(req.WithContext(ctx))
if err != nil {
return err
}
defer runutil.ExhaustCloseWithErrCapture(&err, res.Body, "%s: close body", req.URL.String())

if res.StatusCode/100 != 2 {
return errors.Errorf("request failed with code %s", res.Status)
}

return nil
}

func queryExemplars(t *testing.T, ctx context.Context, addr, q string, start, end int64, check func(data []*exemplarspb.ExemplarData) error) {
t.Helper()

Expand Down

0 comments on commit 4e2fdec

Please sign in to comment.