Skip to content

Commit

Permalink
tests: use remote write in query frontend tests (thanos-io#7017)
Browse files Browse the repository at this point in the history
Signed-off-by: hanyuting8 <[email protected]>
  • Loading branch information
pawarpranav83 authored and hanyuting8 committed Jan 19, 2024
1 parent 4bc5883 commit b69210c
Showing 1 changed file with 95 additions and 41 deletions.
136 changes: 95 additions & 41 deletions test/e2e/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@ import (
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/prompb"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/cacheutil"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/queryfrontend"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
"github.com/thanos-io/thanos/test/e2e/e2ethanos"
)

Expand Down Expand Up @@ -76,7 +74,7 @@ func TestQueryFrontend(t *testing.T) {
{Name: "replica", Value: "0"},
},
Samples: []prompb.Sample{
{Value: float64(0), Timestamp: timestamp.FromTime(predefTimestamp)},
{Value: float64(1), Timestamp: timestamp.FromTime(predefTimestamp)},
}}},
i.Endpoint("remote-write"),
))
Expand Down Expand Up @@ -477,7 +475,7 @@ func TestQueryFrontendMemcachedCache(t *testing.T) {
{Name: "replica", Value: "0"},
},
Samples: []prompb.Sample{
{Value: float64(0), Timestamp: timestamp.FromTime(predefTimestamp)},
{Value: float64(1), Timestamp: timestamp.FromTime(predefTimestamp)},
}}},
i.Endpoint("remote-write")))

Expand Down Expand Up @@ -576,10 +574,11 @@ func TestRangeQueryShardingWithRandomData(t *testing.T) {
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

promConfig := e2ethanos.DefaultPromConfig("p1", 0, "", "")
prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "p1", promConfig, "", e2ethanos.DefaultPrometheusImage(), "", "remote-write-receiver")
i := e2ethanos.NewReceiveBuilder(e, "ingestor-rw").WithIngestionEnabled().Init()
testutil.Ok(t, e2e.StartAndWaitReady(i))

predefTimestamp := model.TimeFromUnixNano(time.Date(2023, time.December, 22, 12, 0, 0, 0, time.UTC).UnixNano())

now := model.Now()
ctx := context.Background()
timeSeries := []labels.Labels{
{{Name: labels.MetricName, Value: "http_requests_total"}, {Name: "pod", Value: "1"}, {Name: "handler", Value: "/"}},
Expand All @@ -596,14 +595,36 @@ func TestRangeQueryShardingWithRandomData(t *testing.T) {
{{Name: labels.MetricName, Value: "http_requests_total"}, {Name: "pod", Value: "6"}, {Name: "handler", Value: "/metrics"}},
}

startTime := now.Time().Add(-1 * time.Hour)
endTime := now.Time().Add(1 * time.Hour)
_, err = e2eutil.CreateBlock(ctx, prom.Dir(), timeSeries, 20, timestamp.FromTime(startTime), timestamp.FromTime(endTime), nil, 0, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar))
// Ensure labels are ordered.
for _, ts := range timeSeries {
sort.Slice(ts, func(i, j int) bool {
return ts[i].Name < ts[j].Name
})
}

samplespb := make([]prompb.TimeSeries, 0, len(timeSeries))
for _, labels := range timeSeries {
labelspb := make([]prompb.Label, 0, len(labels))
for _, label := range labels {
labelspb = append(labelspb, prompb.Label{
Name: string(label.Name),
Value: string(label.Value),
})
}
samplespb = append(samplespb, prompb.TimeSeries{
Labels: labelspb,
Samples: []prompb.Sample{
{
Value: float64(1),
Timestamp: timestamp.FromTime(predefTimestamp.Time()),
},
},
})
}

testutil.Ok(t, remoteWrite(ctx, samplespb, i.Endpoint("remote-write")))

stores := []string{sidecar.InternalEndpoint("grpc")}
q1 := e2ethanos.NewQuerierBuilder(e, "q1", stores...).Init()
q1 := e2ethanos.NewQuerierBuilder(e, "q1", i.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q1))

inMemoryCacheConfig := queryfrontend.CacheProviderConfig{
Expand All @@ -625,13 +646,16 @@ func TestRangeQueryShardingWithRandomData(t *testing.T) {
qryFunc := func() string { return `sum by (pod) (http_requests_total)` }
queryOpts := promclient.QueryOptions{Deduplicate: true}

startTime := timestamp.FromTime(predefTimestamp.Time().Add(-1 * time.Hour))
endTime := timestamp.FromTime(predefTimestamp.Time().Add(1 * time.Hour))

var resultWithoutSharding model.Matrix
rangeQuery(t, ctx, q1.Endpoint("http"), qryFunc, timestamp.FromTime(startTime), timestamp.FromTime(endTime), 30, queryOpts, func(res model.Matrix) error {
rangeQuery(t, ctx, q1.Endpoint("http"), qryFunc, startTime, endTime, 30, queryOpts, func(res model.Matrix) error {
resultWithoutSharding = res
return nil
})
var resultWithSharding model.Matrix
rangeQuery(t, ctx, qfe.Endpoint("http"), qryFunc, timestamp.FromTime(startTime), timestamp.FromTime(endTime), 30, queryOpts, func(res model.Matrix) error {
rangeQuery(t, ctx, qfe.Endpoint("http"), qryFunc, startTime, endTime, 30, queryOpts, func(res model.Matrix) error {
resultWithSharding = res
return nil
})
Expand All @@ -646,12 +670,12 @@ func TestRangeQueryDynamicHorizontalSharding(t *testing.T) {
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

now := time.Now()
predefTimestamp := time.Date(2023, time.December, 22, 12, 0, 0, 0, time.UTC)

prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "1", e2ethanos.DefaultPromConfig("test", 0, "", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage(), "")
testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar))
i := e2ethanos.NewReceiveBuilder(e, "ingestor-rw").WithIngestionEnabled().Init()
testutil.Ok(t, e2e.StartAndWaitReady(i))

querier := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).Init()
querier := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(querier))

inMemoryCacheConfig := queryfrontend.CacheProviderConfig{
Expand All @@ -676,17 +700,32 @@ func TestRangeQueryDynamicHorizontalSharding(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)

testutil.Ok(t, remoteWrite(ctx, []prompb.TimeSeries{{
Labels: []prompb.Label{
{Name: "__name__", Value: "up"},
{Name: "instance", Value: "localhost:9090"},
{Name: "job", Value: "myself"},
{Name: "prometheus", Value: "test"},
{Name: "replica", Value: "0"},
},
Samples: []prompb.Sample{
{Value: float64(1), Timestamp: timestamp.FromTime(predefTimestamp)},
}}},
i.Endpoint("remote-write")))

testutil.Ok(t, querier.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics()))

// Ensure we can get the result from Querier first so that it
// doesn't need to retry when we send queries to the frontend later.
queryAndAssertSeries(t, ctx, querier.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, querier.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, func() time.Time { return predefTimestamp }, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
"job": "myself",
"prometheus": "test",
"receive": "receive-ingestor-rw",
"replica": "0",
"tenant_id": "default-tenant",
},
})

Expand All @@ -696,8 +735,8 @@ func TestRangeQueryDynamicHorizontalSharding(t *testing.T) {
ctx,
queryFrontend.Endpoint("http"),
e2ethanos.QueryUpWithoutInstance,
timestamp.FromTime(now.Add(-time.Hour)),
timestamp.FromTime(now.Add(time.Hour)),
timestamp.FromTime(predefTimestamp.Add(-time.Hour)),
timestamp.FromTime(predefTimestamp.Add(time.Hour)),
14,
promclient.QueryOptions{
Deduplicate: true,
Expand All @@ -717,18 +756,17 @@ func TestRangeQueryDynamicHorizontalSharding(t *testing.T) {
))

// make sure that we don't break cortex cache code.
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "cortex_cache_fetched_keys_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "cortex_cache_fetched_keys_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(0), "cortex_cache_hits_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_added_new_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "querier_cache_added_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "querier_cache_misses_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "querier_cache_added_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "querier_cache_misses_total"))

// Query interval is 2 hours, which is greater than min-slit-interval, query will be broken down into 4 parts
// + rest (of interval)
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(5), "thanos_frontend_split_queries_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "thanos_frontend_split_queries_total"))

testutil.Ok(t, querier.WaitSumMetricsWithOptions(
e2emon.Equals(5),
e2emon.Equals(4),
[]string{"http_requests_total"},
e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "handler", "query_range")),
))
Expand All @@ -741,11 +779,12 @@ func TestInstantQueryShardingWithRandomData(t *testing.T) {
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

promConfig := e2ethanos.DefaultPromConfig("p1", 0, "", "")
prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "p1", promConfig, "", e2ethanos.DefaultPrometheusImage(), "", "remote-write-receiver")
i := e2ethanos.NewReceiveBuilder(e, "ingestor-rw").WithIngestionEnabled().Init()
testutil.Ok(t, e2e.StartAndWaitReady(i))

now := model.Now()
predefTimestamp := model.TimeFromUnixNano(time.Date(2023, time.December, 22, 12, 0, 0, 0, time.UTC).UnixNano())
ctx := context.Background()

timeSeries := []labels.Labels{
{{Name: labels.MetricName, Value: "http_requests_total"}, {Name: "pod", Value: "1"}, {Name: "handler", Value: "/"}},
{{Name: labels.MetricName, Value: "http_requests_total"}, {Name: "pod", Value: "1"}, {Name: "handler", Value: "/metrics"}},
Expand All @@ -768,14 +807,29 @@ func TestInstantQueryShardingWithRandomData(t *testing.T) {
})
}

startTime := now.Time().Add(-1 * time.Hour)
endTime := now.Time().Add(1 * time.Hour)
_, err = e2eutil.CreateBlock(ctx, prom.Dir(), timeSeries, 20, timestamp.FromTime(startTime), timestamp.FromTime(endTime), nil, 0, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar))
samplespb := make([]prompb.TimeSeries, 0, len(timeSeries))
for _, labels := range timeSeries {
labelspb := make([]prompb.Label, 0, len(labels))
for _, label := range labels {
labelspb = append(labelspb, prompb.Label{
Name: string(label.Name),
Value: string(label.Value),
})
}
samplespb = append(samplespb, prompb.TimeSeries{
Labels: labelspb,
Samples: []prompb.Sample{
{
Value: float64(1),
Timestamp: timestamp.FromTime(predefTimestamp.Time()),
},
},
})
}

testutil.Ok(t, remoteWrite(ctx, samplespb, i.Endpoint("remote-write")))

stores := []string{sidecar.InternalEndpoint("grpc")}
q1 := e2ethanos.NewQuerierBuilder(e, "q1", stores...).Init()
q1 := e2ethanos.NewQuerierBuilder(e, "q1", i.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q1))

inMemoryCacheConfig := queryfrontend.CacheProviderConfig{
Expand Down Expand Up @@ -848,10 +902,10 @@ func TestInstantQueryShardingWithRandomData(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
resultWithoutSharding := instantQuery(t, ctx, q1.Endpoint("http"), tc.qryFunc, func() time.Time {
return now.Time()
return predefTimestamp.Time()
}, queryOpts, tc.expectedSeries)
resultWithSharding := instantQuery(t, ctx, qfe.Endpoint("http"), tc.qryFunc, func() time.Time {
return now.Time()
return predefTimestamp.Time()
}, queryOpts, tc.expectedSeries)
testutil.Equals(t, resultWithoutSharding, resultWithSharding)
})
Expand Down

0 comments on commit b69210c

Please sign in to comment.