diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index f93219cae6..5fda32c0f7 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/prompb" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/cacheutil" @@ -41,12 +42,13 @@ func TestQueryFrontend(t *testing.T) { testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, e)) - now := time.Now() + // Predefined Timestamp + predefTimestamp := time.Date(2023, time.December, 22, 12, 0, 0, 0, time.UTC) - prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "1", e2ethanos.DefaultPromConfig("test", 0, "", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage(), "") - testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar)) + i := e2ethanos.NewReceiveBuilder(e, "ingestor-rw").WithIngestionEnabled().Init() + testutil.Ok(t, e2e.StartAndWaitReady(i)) - q := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).Init() + q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init() testutil.Ok(t, e2e.StartAndWaitReady(q)) inMemoryCacheConfig := queryfrontend.CacheProviderConfig{ @@ -64,17 +66,34 @@ func TestQueryFrontend(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) t.Cleanup(cancel) + // Writing a custom Timeseries into the receiver + testutil.Ok(t, remoteWrite(ctx, []prompb.TimeSeries{{ + Labels: []prompb.Label{ + {Name: "__name__", Value: "up"}, + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "test"}, + {Name: "replica", Value: "0"}, + }, + Samples: []prompb.Sample{ + {Value: float64(0), Timestamp: timestamp.FromTime(predefTimestamp)}, + }}}, + i.Endpoint("remote-write"), + )) + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) // Ensure we can get the result from Querier first so that it // doesn't need to retry when we send queries to the frontend later. - queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ + queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, func() time.Time { return predefTimestamp }, promclient.QueryOptions{ Deduplicate: false, }, []model.Metric{ { "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", "replica": "0", + "tenant_id": "default-tenant", }, }) @@ -86,13 +105,15 @@ func TestQueryFrontend(t *testing.T) { queryTimes := vals[0] t.Run("query frontend works for instant query", func(t *testing.T) { - queryAndAssertSeries(t, ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ + queryAndAssertSeries(t, ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, func() time.Time { return predefTimestamp }, promclient.QueryOptions{ Deduplicate: false, }, []model.Metric{ { "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", "replica": "0", + "tenant_id": "default-tenant", }, }) @@ -115,8 +136,8 @@ func TestQueryFrontend(t *testing.T) { ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), 14, promclient.QueryOptions{ Deduplicate: true, @@ -159,8 +180,8 @@ func TestQueryFrontend(t *testing.T) { ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), 14, promclient.QueryOptions{ Deduplicate: true, @@ -181,7 +202,7 @@ func TestQueryFrontend(t *testing.T) { testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "cortex_cache_fetched_keys_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "cortex_cache_hits_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_new_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_added_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_entries")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_gets_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_misses_total")) @@ -192,9 +213,8 @@ func TestQueryFrontend(t *testing.T) { e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tripperware", "query_range"))), ) - // One more request is needed in order to satisfy the req range. testutil.Ok(t, q.WaitSumMetricsWithOptions( - e2emon.Equals(2), + e2emon.Equals(1), []string{"http_requests_total"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "handler", "query_range"))), ) @@ -206,8 +226,8 @@ func TestQueryFrontend(t *testing.T) { ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(24*time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(24*time.Hour)), 14, promclient.QueryOptions{ Deduplicate: true, @@ -225,13 +245,13 @@ func TestQueryFrontend(t *testing.T) { []string{"thanos_query_frontend_queries_total"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "op", "query_range"))), ) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "cortex_cache_fetched_keys_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "cortex_cache_fetched_keys_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "cortex_cache_hits_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_new_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_added_new_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "querier_cache_added_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_entries")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "querier_cache_gets_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_misses_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_entries")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "querier_cache_gets_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_misses_total")) // Query is 25h so it will be split to 2 requests. testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( @@ -240,7 +260,7 @@ func TestQueryFrontend(t *testing.T) { ) testutil.Ok(t, q.WaitSumMetricsWithOptions( - e2emon.Equals(4), + e2emon.Equals(3), []string{"http_requests_total"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "handler", "query_range"))), ) @@ -248,7 +268,7 @@ func TestQueryFrontend(t *testing.T) { t.Run("query frontend splitting works for labels names API", func(t *testing.T) { // LabelNames and LabelValues API should still work via query frontend. - labelNames(t, ctx, queryFrontend.Endpoint("http"), nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + labelNames(t, ctx, queryFrontend.Endpoint("http"), nil, timestamp.FromTime(predefTimestamp.Add(-time.Hour)), timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []string) bool { return len(res) > 0 }) testutil.Ok(t, q.WaitSumMetricsWithOptions( @@ -267,7 +287,7 @@ func TestQueryFrontend(t *testing.T) { e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tripperware", "labels"))), ) - labelNames(t, ctx, queryFrontend.Endpoint("http"), nil, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + labelNames(t, ctx, queryFrontend.Endpoint("http"), nil, timestamp.FromTime(predefTimestamp.Add(-24*time.Hour)), timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []string) bool { return len(res) > 0 }) testutil.Ok(t, q.WaitSumMetricsWithOptions( @@ -288,7 +308,7 @@ func TestQueryFrontend(t *testing.T) { }) t.Run("query frontend splitting works for labels values API", func(t *testing.T) { - labelValues(t, ctx, queryFrontend.Endpoint("http"), "instance", nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + labelValues(t, ctx, queryFrontend.Endpoint("http"), "instance", nil, timestamp.FromTime(predefTimestamp.Add(-time.Hour)), timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []string) bool { return len(res) == 1 && res[0] == "localhost:9090" }) testutil.Ok(t, q.WaitSumMetricsWithOptions( @@ -307,7 +327,7 @@ func TestQueryFrontend(t *testing.T) { e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tripperware", "labels"))), ) - labelValues(t, ctx, queryFrontend.Endpoint("http"), "instance", nil, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + labelValues(t, ctx, queryFrontend.Endpoint("http"), "instance", nil, timestamp.FromTime(predefTimestamp.Add(-24*time.Hour)), timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []string) bool { return len(res) == 1 && res[0] == "localhost:9090" }) testutil.Ok(t, q.WaitSumMetricsWithOptions( @@ -333,8 +353,8 @@ func TestQueryFrontend(t *testing.T) { ctx, queryFrontend.Endpoint("http"), []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "up")}, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []map[string]string) bool { if len(res) != 1 { return false @@ -345,6 +365,8 @@ func TestQueryFrontend(t *testing.T) { "instance": "localhost:9090", "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", + "tenant_id": "default-tenant", }) }, ) @@ -369,8 +391,8 @@ func TestQueryFrontend(t *testing.T) { ctx, queryFrontend.Endpoint("http"), []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "up")}, - timestamp.FromTime(now.Add(-24*time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-24*time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []map[string]string) bool { if len(res) != 1 { return false @@ -381,6 +403,8 @@ func TestQueryFrontend(t *testing.T) { "instance": "localhost:9090", "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", + "tenant_id": "default-tenant", }) }, ) @@ -409,12 +433,13 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, e)) - now := time.Now() + // Predefined timestamp + predefTimestamp := time.Date(2023, time.December, 22, 12, 0, 0, 0, time.UTC) - prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "1", e2ethanos.DefaultPromConfig("test", 0, "", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage(), "") - testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar)) + i := e2ethanos.NewReceiveBuilder(e, "ingestor-rw").WithIngestionEnabled().Init() + testutil.Ok(t, e2e.StartAndWaitReady(i)) - q := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).Init() + q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init() testutil.Ok(t, e2e.StartAndWaitReady(q)) memcached := e2ethanos.NewMemcached(e, "1") @@ -443,19 +468,34 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) t.Cleanup(cancel) + testutil.Ok(t, remoteWrite(ctx, []prompb.TimeSeries{{ + Labels: []prompb.Label{ + {Name: "__name__", Value: "up"}, + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "test"}, + {Name: "replica", Value: "0"}, + }, + Samples: []prompb.Sample{ + {Value: float64(0), Timestamp: timestamp.FromTime(predefTimestamp)}, + }}}, + i.Endpoint("remote-write"))) + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "cortex_memcache_client_servers")) // Ensure we can get the result from Querier first so that it // doesn't need to retry when we send queries to the frontend later. - queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ + queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, func() time.Time { return predefTimestamp }, promclient.QueryOptions{ Deduplicate: false, }, []model.Metric{ { "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", "replica": "0", + "tenant_id": "default-tenant", }, }) @@ -469,8 +509,8 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), 14, promclient.QueryOptions{ Deduplicate: true, @@ -501,8 +541,8 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), 14, promclient.QueryOptions{ Deduplicate: true, diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 6584c7b842..5b9a120b90 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -4,6 +4,7 @@ package e2e_test import ( + "bytes" "context" "fmt" "io" @@ -1720,6 +1721,39 @@ func rangeQuery(t *testing.T, ctx context.Context, addr string, q func() string, return retExplanation } +// Performs a remote write at the receiver external endpoint. +func remoteWrite(ctx context.Context, timeseries []prompb.TimeSeries, addr string) error { + // Create write request + data, err := proto.Marshal(&prompb.WriteRequest{Timeseries: timeseries}) + if err != nil { + return err + } + + // Create HTTP request + compressed := snappy.Encode(nil, data) + req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/receive", addr), bytes.NewReader(compressed)) + if err != nil { + return err + } + + req.Header.Add("Content-Encoding", "snappy") + req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + + // Execute HTTP request + res, err := promclient.NewDefaultClient().HTTPClient.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer runutil.ExhaustCloseWithErrCapture(&err, res.Body, "%s: close body", req.URL.String()) + + if res.StatusCode/100 != 2 { + return errors.Errorf("request failed with code %s", res.Status) + } + + return nil +} + func queryExemplars(t *testing.T, ctx context.Context, addr, q string, start, end int64, check func(data []*exemplarspb.ExemplarData) error) { t.Helper()