From d5651b06b35c37ce162bb013376bf847a19e41f8 Mon Sep 17 00:00:00 2001 From: pawarpranav83 Date: Sun, 24 Dec 2023 05:55:27 +0530 Subject: [PATCH] Updated query_frontend_test.go and query_test.go Updated query_frontend_test: -- Added prompb import -- Using a predefined timestamp -- tempfn used in instant query function calls, need to return time -- removed prom instance, and using receiver instance instead, so that we can implement the tests using predefined timestamps -- Using a single sample which is remote written into the receiver -- The RemoteWrite function implemented in query_test, and performs a remote write of a Timeseries into teh receiver -- Have changed various query parameters and structure of Model.Metric to match the specified sample -- Changed testing parameters to match the changed configuration Updated query_test.go: -- Added "bytes" import -- Implemented teh RemoteWrite function that performs a remote write of a Timeseries into teh receiver Signed-off-by: pawarpranav83 --- test/e2e/query_frontend_test.go | 61 +++++++++++++++++++++++++-------- test/e2e/query_test.go | 33 ++++++++++++++++++ 2 files changed, 80 insertions(+), 14 deletions(-) diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index f93219cae6f..01605c06ae8 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/prompb" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/cacheutil" @@ -41,14 +42,36 @@ func TestQueryFrontend(t *testing.T) { testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, e)) - now := time.Now() + now := time.Date(2023, time.December, 22, 12, 0, 0, 0, time.UTC) + tempfn := func() time.Time { return now } - prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "1", e2ethanos.DefaultPromConfig("test", 0, "", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage(), "") - testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar)) + i := e2ethanos.NewReceiveBuilder(e, "ingestor-rw").WithIngestionEnabled().Init() + testutil.Ok(t, e2e.StartAndWaitReady(i)) - q := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).Init() + q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init() testutil.Ok(t, e2e.StartAndWaitReady(q)) + tsMillis := timestamp.FromTime(now) + + var samplesrw []prompb.TimeSeries + + metric := []prompb.Label{ + {Name: "__name__", Value: "up"}, + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "test"}, + {Name: "receive", Value: "receive-ingestor-rw"}, + {Name: "replica", Value: "0"}, + {Name: "tenant_id", Value: "default-tenant"}, + } + + samplesrw = append(samplesrw, prompb.TimeSeries{ + Labels: metric, + Samples: []prompb.Sample{ + {Value: float64(0), Timestamp: tsMillis}, + }, + }) + inMemoryCacheConfig := queryfrontend.CacheProviderConfig{ Type: queryfrontend.INMEMORY, Config: queryfrontend.InMemoryResponseCacheConfig{ @@ -64,17 +87,21 @@ func TestQueryFrontend(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) t.Cleanup(cancel) + testutil.Ok(t, RemoteWrite(ctx, samplesrw, i.Endpoint("remote-write"))) + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) // Ensure we can get the result from Querier first so that it // doesn't need to retry when we send queries to the frontend later. - queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ + queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, tempfn, promclient.QueryOptions{ Deduplicate: false, }, []model.Metric{ { "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", "replica": "0", + "tenant_id": "default-tenant", }, }) @@ -86,13 +113,15 @@ func TestQueryFrontend(t *testing.T) { queryTimes := vals[0] t.Run("query frontend works for instant query", func(t *testing.T) { - queryAndAssertSeries(t, ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ + queryAndAssertSeries(t, ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, tempfn, promclient.QueryOptions{ Deduplicate: false, }, []model.Metric{ { "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", "replica": "0", + "tenant_id": "default-tenant", }, }) @@ -181,7 +210,7 @@ func TestQueryFrontend(t *testing.T) { testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "cortex_cache_fetched_keys_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "cortex_cache_hits_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_new_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_added_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_entries")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_gets_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_misses_total")) @@ -194,7 +223,7 @@ func TestQueryFrontend(t *testing.T) { // One more request is needed in order to satisfy the req range. testutil.Ok(t, q.WaitSumMetricsWithOptions( - e2emon.Equals(2), + e2emon.Equals(1), []string{"http_requests_total"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "handler", "query_range"))), ) @@ -225,13 +254,13 @@ func TestQueryFrontend(t *testing.T) { []string{"thanos_query_frontend_queries_total"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "op", "query_range"))), ) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "cortex_cache_fetched_keys_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "cortex_cache_fetched_keys_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "cortex_cache_hits_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_new_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_added_new_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "querier_cache_added_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_entries")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "querier_cache_gets_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_misses_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_entries")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "querier_cache_gets_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_misses_total")) // Query is 25h so it will be split to 2 requests. testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( @@ -240,7 +269,7 @@ func TestQueryFrontend(t *testing.T) { ) testutil.Ok(t, q.WaitSumMetricsWithOptions( - e2emon.Equals(4), + e2emon.Equals(3), []string{"http_requests_total"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "handler", "query_range"))), ) @@ -345,6 +374,8 @@ func TestQueryFrontend(t *testing.T) { "instance": "localhost:9090", "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", + "tenant_id": "default-tenant", }) }, ) @@ -381,6 +412,8 @@ func TestQueryFrontend(t *testing.T) { "instance": "localhost:9090", "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", + "tenant_id": "default-tenant", }) }, ) diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 6584c7b8426..20fbbaa9a6d 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -4,6 +4,7 @@ package e2e_test import ( + "bytes" "context" "fmt" "io" @@ -1720,6 +1721,38 @@ func rangeQuery(t *testing.T, ctx context.Context, addr string, q func() string, return retExplanation } +func RemoteWrite(ctx context.Context, timeseries []prompb.TimeSeries, addr string) error { + // Create write request + data, err := proto.Marshal(&prompb.WriteRequest{Timeseries: timeseries}) + if err != nil { + return err + } + + // Create HTTP request + compressed := snappy.Encode(nil, data) + req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/receive", addr), bytes.NewReader(compressed)) + if err != nil { + return err + } + + req.Header.Add("Content-Encoding", "snappy") + req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + + // Execute HTTP request + res, err := promclient.NewDefaultClient().HTTPClient.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer runutil.ExhaustCloseWithErrCapture(&err, res.Body, "%s: close body", req.URL.String()) + + if res.StatusCode/100 != 2 { + return errors.Errorf("request failed with code %s", res.Status) + } + + return nil +} + func queryExemplars(t *testing.T, ctx context.Context, addr, q string, start, end int64, check func(data []*exemplarspb.ExemplarData) error) { t.Helper()