From b69210c8247b4c73919c6f3d7461f67b60076dad Mon Sep 17 00:00:00 2001 From: Pranav <101933072+pawarpranav83@users.noreply.github.com> Date: Tue, 2 Jan 2024 11:08:21 +0530 Subject: [PATCH] tests: use remote write in query frontend tests (#7017) Signed-off-by: hanyuting8 --- test/e2e/query_frontend_test.go | 136 ++++++++++++++++++++++---------- 1 file changed, 95 insertions(+), 41 deletions(-) diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index 5a338dd2f1..e14832d8b9 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -26,12 +26,10 @@ import ( "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/prompb" - "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/cacheutil" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/queryfrontend" "github.com/thanos-io/thanos/pkg/tenancy" - "github.com/thanos-io/thanos/pkg/testutil/e2eutil" "github.com/thanos-io/thanos/test/e2e/e2ethanos" ) @@ -76,7 +74,7 @@ func TestQueryFrontend(t *testing.T) { {Name: "replica", Value: "0"}, }, Samples: []prompb.Sample{ - {Value: float64(0), Timestamp: timestamp.FromTime(predefTimestamp)}, + {Value: float64(1), Timestamp: timestamp.FromTime(predefTimestamp)}, }}}, i.Endpoint("remote-write"), )) @@ -477,7 +475,7 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { {Name: "replica", Value: "0"}, }, Samples: []prompb.Sample{ - {Value: float64(0), Timestamp: timestamp.FromTime(predefTimestamp)}, + {Value: float64(1), Timestamp: timestamp.FromTime(predefTimestamp)}, }}}, i.Endpoint("remote-write"))) @@ -576,10 +574,11 @@ func TestRangeQueryShardingWithRandomData(t *testing.T) { testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, e)) - promConfig := e2ethanos.DefaultPromConfig("p1", 0, "", "") - prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "p1", promConfig, "", e2ethanos.DefaultPrometheusImage(), "", "remote-write-receiver") + i := e2ethanos.NewReceiveBuilder(e, "ingestor-rw").WithIngestionEnabled().Init() + testutil.Ok(t, e2e.StartAndWaitReady(i)) + + predefTimestamp := model.TimeFromUnixNano(time.Date(2023, time.December, 22, 12, 0, 0, 0, time.UTC).UnixNano()) - now := model.Now() ctx := context.Background() timeSeries := []labels.Labels{ {{Name: labels.MetricName, Value: "http_requests_total"}, {Name: "pod", Value: "1"}, {Name: "handler", Value: "/"}}, @@ -596,14 +595,36 @@ func TestRangeQueryShardingWithRandomData(t *testing.T) { {{Name: labels.MetricName, Value: "http_requests_total"}, {Name: "pod", Value: "6"}, {Name: "handler", Value: "/metrics"}}, } - startTime := now.Time().Add(-1 * time.Hour) - endTime := now.Time().Add(1 * time.Hour) - _, err = e2eutil.CreateBlock(ctx, prom.Dir(), timeSeries, 20, timestamp.FromTime(startTime), timestamp.FromTime(endTime), nil, 0, metadata.NoneFunc) - testutil.Ok(t, err) - testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar)) + // Ensure labels are ordered. + for _, ts := range timeSeries { + sort.Slice(ts, func(i, j int) bool { + return ts[i].Name < ts[j].Name + }) + } + + samplespb := make([]prompb.TimeSeries, 0, len(timeSeries)) + for _, labels := range timeSeries { + labelspb := make([]prompb.Label, 0, len(labels)) + for _, label := range labels { + labelspb = append(labelspb, prompb.Label{ + Name: string(label.Name), + Value: string(label.Value), + }) + } + samplespb = append(samplespb, prompb.TimeSeries{ + Labels: labelspb, + Samples: []prompb.Sample{ + { + Value: float64(1), + Timestamp: timestamp.FromTime(predefTimestamp.Time()), + }, + }, + }) + } + + testutil.Ok(t, remoteWrite(ctx, samplespb, i.Endpoint("remote-write"))) - stores := []string{sidecar.InternalEndpoint("grpc")} - q1 := e2ethanos.NewQuerierBuilder(e, "q1", stores...).Init() + q1 := e2ethanos.NewQuerierBuilder(e, "q1", i.InternalEndpoint("grpc")).Init() testutil.Ok(t, e2e.StartAndWaitReady(q1)) inMemoryCacheConfig := queryfrontend.CacheProviderConfig{ @@ -625,13 +646,16 @@ func TestRangeQueryShardingWithRandomData(t *testing.T) { qryFunc := func() string { return `sum by (pod) (http_requests_total)` } queryOpts := promclient.QueryOptions{Deduplicate: true} + startTime := timestamp.FromTime(predefTimestamp.Time().Add(-1 * time.Hour)) + endTime := timestamp.FromTime(predefTimestamp.Time().Add(1 * time.Hour)) + var resultWithoutSharding model.Matrix - rangeQuery(t, ctx, q1.Endpoint("http"), qryFunc, timestamp.FromTime(startTime), timestamp.FromTime(endTime), 30, queryOpts, func(res model.Matrix) error { + rangeQuery(t, ctx, q1.Endpoint("http"), qryFunc, startTime, endTime, 30, queryOpts, func(res model.Matrix) error { resultWithoutSharding = res return nil }) var resultWithSharding model.Matrix - rangeQuery(t, ctx, qfe.Endpoint("http"), qryFunc, timestamp.FromTime(startTime), timestamp.FromTime(endTime), 30, queryOpts, func(res model.Matrix) error { + rangeQuery(t, ctx, qfe.Endpoint("http"), qryFunc, startTime, endTime, 30, queryOpts, func(res model.Matrix) error { resultWithSharding = res return nil }) @@ -646,12 +670,12 @@ func TestRangeQueryDynamicHorizontalSharding(t *testing.T) { testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, e)) - now := time.Now() + predefTimestamp := time.Date(2023, time.December, 22, 12, 0, 0, 0, time.UTC) - prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "1", e2ethanos.DefaultPromConfig("test", 0, "", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage(), "") - testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar)) + i := e2ethanos.NewReceiveBuilder(e, "ingestor-rw").WithIngestionEnabled().Init() + testutil.Ok(t, e2e.StartAndWaitReady(i)) - querier := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).Init() + querier := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init() testutil.Ok(t, e2e.StartAndWaitReady(querier)) inMemoryCacheConfig := queryfrontend.CacheProviderConfig{ @@ -676,17 +700,32 @@ func TestRangeQueryDynamicHorizontalSharding(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) t.Cleanup(cancel) + testutil.Ok(t, remoteWrite(ctx, []prompb.TimeSeries{{ + Labels: []prompb.Label{ + {Name: "__name__", Value: "up"}, + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "test"}, + {Name: "replica", Value: "0"}, + }, + Samples: []prompb.Sample{ + {Value: float64(1), Timestamp: timestamp.FromTime(predefTimestamp)}, + }}}, + i.Endpoint("remote-write"))) + testutil.Ok(t, querier.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) // Ensure we can get the result from Querier first so that it // doesn't need to retry when we send queries to the frontend later. - queryAndAssertSeries(t, ctx, querier.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ + queryAndAssertSeries(t, ctx, querier.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, func() time.Time { return predefTimestamp }, promclient.QueryOptions{ Deduplicate: false, }, []model.Metric{ { "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", "replica": "0", + "tenant_id": "default-tenant", }, }) @@ -696,8 +735,8 @@ func TestRangeQueryDynamicHorizontalSharding(t *testing.T) { ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), 14, promclient.QueryOptions{ Deduplicate: true, @@ -717,18 +756,17 @@ func TestRangeQueryDynamicHorizontalSharding(t *testing.T) { )) // make sure that we don't break cortex cache code. - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "cortex_cache_fetched_keys_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "cortex_cache_fetched_keys_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(0), "cortex_cache_hits_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_added_new_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "querier_cache_added_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "querier_cache_misses_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "querier_cache_added_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "querier_cache_misses_total")) // Query interval is 2 hours, which is greater than min-slit-interval, query will be broken down into 4 parts - // + rest (of interval) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(5), "thanos_frontend_split_queries_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "thanos_frontend_split_queries_total")) testutil.Ok(t, querier.WaitSumMetricsWithOptions( - e2emon.Equals(5), + e2emon.Equals(4), []string{"http_requests_total"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "handler", "query_range")), )) @@ -741,11 +779,12 @@ func TestInstantQueryShardingWithRandomData(t *testing.T) { testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, e)) - promConfig := e2ethanos.DefaultPromConfig("p1", 0, "", "") - prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "p1", promConfig, "", e2ethanos.DefaultPrometheusImage(), "", "remote-write-receiver") + i := e2ethanos.NewReceiveBuilder(e, "ingestor-rw").WithIngestionEnabled().Init() + testutil.Ok(t, e2e.StartAndWaitReady(i)) - now := model.Now() + predefTimestamp := model.TimeFromUnixNano(time.Date(2023, time.December, 22, 12, 0, 0, 0, time.UTC).UnixNano()) ctx := context.Background() + timeSeries := []labels.Labels{ {{Name: labels.MetricName, Value: "http_requests_total"}, {Name: "pod", Value: "1"}, {Name: "handler", Value: "/"}}, {{Name: labels.MetricName, Value: "http_requests_total"}, {Name: "pod", Value: "1"}, {Name: "handler", Value: "/metrics"}}, @@ -768,14 +807,29 @@ func TestInstantQueryShardingWithRandomData(t *testing.T) { }) } - startTime := now.Time().Add(-1 * time.Hour) - endTime := now.Time().Add(1 * time.Hour) - _, err = e2eutil.CreateBlock(ctx, prom.Dir(), timeSeries, 20, timestamp.FromTime(startTime), timestamp.FromTime(endTime), nil, 0, metadata.NoneFunc) - testutil.Ok(t, err) - testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar)) + samplespb := make([]prompb.TimeSeries, 0, len(timeSeries)) + for _, labels := range timeSeries { + labelspb := make([]prompb.Label, 0, len(labels)) + for _, label := range labels { + labelspb = append(labelspb, prompb.Label{ + Name: string(label.Name), + Value: string(label.Value), + }) + } + samplespb = append(samplespb, prompb.TimeSeries{ + Labels: labelspb, + Samples: []prompb.Sample{ + { + Value: float64(1), + Timestamp: timestamp.FromTime(predefTimestamp.Time()), + }, + }, + }) + } + + testutil.Ok(t, remoteWrite(ctx, samplespb, i.Endpoint("remote-write"))) - stores := []string{sidecar.InternalEndpoint("grpc")} - q1 := e2ethanos.NewQuerierBuilder(e, "q1", stores...).Init() + q1 := e2ethanos.NewQuerierBuilder(e, "q1", i.InternalEndpoint("grpc")).Init() testutil.Ok(t, e2e.StartAndWaitReady(q1)) inMemoryCacheConfig := queryfrontend.CacheProviderConfig{ @@ -848,10 +902,10 @@ func TestInstantQueryShardingWithRandomData(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { resultWithoutSharding := instantQuery(t, ctx, q1.Endpoint("http"), tc.qryFunc, func() time.Time { - return now.Time() + return predefTimestamp.Time() }, queryOpts, tc.expectedSeries) resultWithSharding := instantQuery(t, ctx, qfe.Endpoint("http"), tc.qryFunc, func() time.Time { - return now.Time() + return predefTimestamp.Time() }, queryOpts, tc.expectedSeries) testutil.Equals(t, resultWithoutSharding, resultWithSharding) })