Skip to content

Commit

Permalink
Updated query_frontend_test.go and query_test.go
Browse files Browse the repository at this point in the history
Updated query_frontend_test:
    -- Added prompb import
    -- Using a predefined timestamp
    -- tempfn used in instant query function calls, need to return time
    -- removed prom instance, and using receiver instance instead, so that we can implement the tests using predefined timestamps
    -- Using a single sample which is remote written into the receiver
    -- The RemoteWrite function implemented in query_test, and performs a remote write of a Timeseries into teh receiver
    -- Have changed various query parameters and structure of Model.Metric to match the specified sample
    -- Changed testing parameters to match the changed configuration

Updated query_test.go:
    -- Added "bytes" import
    -- Implemented teh RemoteWrite function that performs a remote write of a Timeseries into teh receiver

Signed-off-by: pawarpranav83 <[email protected]>
  • Loading branch information
pawarpranav83 committed Dec 24, 2023
1 parent bd7accb commit d5651b0
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 14 deletions.
61 changes: 47 additions & 14 deletions test/e2e/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/prompb"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/cacheutil"
Expand All @@ -41,14 +42,36 @@ func TestQueryFrontend(t *testing.T) {
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

now := time.Now()
now := time.Date(2023, time.December, 22, 12, 0, 0, 0, time.UTC)
tempfn := func() time.Time { return now }

prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "1", e2ethanos.DefaultPromConfig("test", 0, "", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage(), "")
testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar))
i := e2ethanos.NewReceiveBuilder(e, "ingestor-rw").WithIngestionEnabled().Init()
testutil.Ok(t, e2e.StartAndWaitReady(i))

q := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).Init()
q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q))

tsMillis := timestamp.FromTime(now)

var samplesrw []prompb.TimeSeries

metric := []prompb.Label{
{Name: "__name__", Value: "up"},
{Name: "instance", Value: "localhost:9090"},
{Name: "job", Value: "myself"},
{Name: "prometheus", Value: "test"},
{Name: "receive", Value: "receive-ingestor-rw"},
{Name: "replica", Value: "0"},
{Name: "tenant_id", Value: "default-tenant"},
}

samplesrw = append(samplesrw, prompb.TimeSeries{
Labels: metric,
Samples: []prompb.Sample{
{Value: float64(0), Timestamp: tsMillis},
},
})

inMemoryCacheConfig := queryfrontend.CacheProviderConfig{
Type: queryfrontend.INMEMORY,
Config: queryfrontend.InMemoryResponseCacheConfig{
Expand All @@ -64,17 +87,21 @@ func TestQueryFrontend(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)

testutil.Ok(t, RemoteWrite(ctx, samplesrw, i.Endpoint("remote-write")))

testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics()))

// Ensure we can get the result from Querier first so that it
// doesn't need to retry when we send queries to the frontend later.
queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, tempfn, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
"job": "myself",
"prometheus": "test",
"receive": "receive-ingestor-rw",
"replica": "0",
"tenant_id": "default-tenant",
},
})

Expand All @@ -86,13 +113,15 @@ func TestQueryFrontend(t *testing.T) {
queryTimes := vals[0]

t.Run("query frontend works for instant query", func(t *testing.T) {
queryAndAssertSeries(t, ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, tempfn, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
"job": "myself",
"prometheus": "test",
"receive": "receive-ingestor-rw",
"replica": "0",
"tenant_id": "default-tenant",
},
})

Expand Down Expand Up @@ -181,7 +210,7 @@ func TestQueryFrontend(t *testing.T) {
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "cortex_cache_fetched_keys_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "cortex_cache_hits_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_new_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_added_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_entries"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_gets_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_misses_total"))
Expand All @@ -194,7 +223,7 @@ func TestQueryFrontend(t *testing.T) {

// One more request is needed in order to satisfy the req range.
testutil.Ok(t, q.WaitSumMetricsWithOptions(
e2emon.Equals(2),
e2emon.Equals(1),
[]string{"http_requests_total"},
e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "handler", "query_range"))),
)
Expand Down Expand Up @@ -225,13 +254,13 @@ func TestQueryFrontend(t *testing.T) {
[]string{"thanos_query_frontend_queries_total"},
e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "op", "query_range"))),
)
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "cortex_cache_fetched_keys_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "cortex_cache_fetched_keys_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "cortex_cache_hits_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_new_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_added_new_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "querier_cache_added_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_entries"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "querier_cache_gets_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_misses_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_entries"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "querier_cache_gets_total"))
testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_misses_total"))

// Query is 25h so it will be split to 2 requests.
testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions(
Expand All @@ -240,7 +269,7 @@ func TestQueryFrontend(t *testing.T) {
)

testutil.Ok(t, q.WaitSumMetricsWithOptions(
e2emon.Equals(4),
e2emon.Equals(3),
[]string{"http_requests_total"},
e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "handler", "query_range"))),
)
Expand Down Expand Up @@ -345,6 +374,8 @@ func TestQueryFrontend(t *testing.T) {
"instance": "localhost:9090",
"job": "myself",
"prometheus": "test",
"receive": "receive-ingestor-rw",
"tenant_id": "default-tenant",
})
},
)
Expand Down Expand Up @@ -381,6 +412,8 @@ func TestQueryFrontend(t *testing.T) {
"instance": "localhost:9090",
"job": "myself",
"prometheus": "test",
"receive": "receive-ingestor-rw",
"tenant_id": "default-tenant",
})
},
)
Expand Down
33 changes: 33 additions & 0 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package e2e_test

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -1720,6 +1721,38 @@ func rangeQuery(t *testing.T, ctx context.Context, addr string, q func() string,
return retExplanation
}

func RemoteWrite(ctx context.Context, timeseries []prompb.TimeSeries, addr string) error {
// Create write request
data, err := proto.Marshal(&prompb.WriteRequest{Timeseries: timeseries})
if err != nil {
return err
}

// Create HTTP request
compressed := snappy.Encode(nil, data)
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/receive", addr), bytes.NewReader(compressed))
if err != nil {
return err
}

req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")

// Execute HTTP request
res, err := promclient.NewDefaultClient().HTTPClient.Do(req.WithContext(ctx))
if err != nil {
return err
}
defer runutil.ExhaustCloseWithErrCapture(&err, res.Body, "%s: close body", req.URL.String())

if res.StatusCode/100 != 2 {
return errors.Errorf("request failed with code %s", res.Status)
}

return nil
}

func queryExemplars(t *testing.T, ctx context.Context, addr, q string, start, end int64, check func(data []*exemplarspb.ExemplarData) error) {
t.Helper()

Expand Down

0 comments on commit d5651b0

Please sign in to comment.