Skip to content

Commit

Permalink
Add v1 test porting to v2
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Nov 4, 2024
1 parent a417e1f commit 04e1821
Show file tree
Hide file tree
Showing 10 changed files with 3,846 additions and 710 deletions.
37 changes: 37 additions & 0 deletions integration/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,43 @@ func GenerateSeriesV2WithSamples(
}
}

func GenerateSeriesWithSamplesV2(
st *writev2.SymbolsTable,
name string,
startTime time.Time,
scrapeInterval time.Duration,
startValue int,
numSamples int,
additionalLabels ...prompb.Label,
) (series cortexpbv2.TimeSeries) {
tsMillis := TimeToMilliseconds(startTime)
durMillis := scrapeInterval.Milliseconds()

lbls := labels.Labels{{Name: labels.MetricName, Value: name}}
st.Symbolize("__name__")
st.Symbolize(name)
for _, label := range additionalLabels {
st.Symbolize(label.Name)
st.Symbolize(label.Value)
lbls = append(lbls, labels.Label{Name: label.Name, Value: label.Value})
}

startTMillis := tsMillis
samples := make([]cortexpbv2.Sample, numSamples)
for i := 0; i < numSamples; i++ {
samples[i] = cortexpbv2.Sample{
Timestamp: startTMillis,
Value: float64(i + startValue),
}
startTMillis += durMillis
}

return cortexpbv2.TimeSeries{
LabelsRefs: cortexpbv2.GetLabelsRefsFromLabels(st.Symbols(), lbls),
Samples: samples,
}
}

func GenerateSeriesWithSamples(
name string,
startTime time.Time,
Expand Down
162 changes: 162 additions & 0 deletions integration/query_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/cortexpbv2"
"github.com/cortexproject/promqlsmith"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block"
Expand Down Expand Up @@ -52,6 +54,166 @@ func init() {
}
}

func TestRemoteWriteV1AndV2QueryResultFuzz(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul1 := e2edb.NewConsulWithName("consul1")
consul2 := e2edb.NewConsulWithName("consul2")
require.NoError(t, s.StartAndWaitReady(consul1, consul2))

flags := mergeFlags(
AlertmanagerLocalFlags(),
map[string]string{
"-store.engine": blocksStorageEngine,
"-blocks-storage.backend": "filesystem",
"-blocks-storage.tsdb.head-compaction-interval": "4m",
"-blocks-storage.tsdb.block-ranges-period": "2h",
"-blocks-storage.tsdb.ship-interval": "1h",
"-blocks-storage.bucket-store.sync-interval": "15m",
"-blocks-storage.tsdb.retention-period": "2h",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-querier.query-store-for-labels-enabled": "true",
// Ingester.
"-ring.store": "consul",
// Distributor.
"-distributor.replication-factor": "1",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
},
)

// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

path1 := path.Join(s.SharedDir(), "cortex-1")
path2 := path.Join(s.SharedDir(), "cortex-2")

flags1 := mergeFlags(flags, map[string]string{
"-blocks-storage.filesystem.dir": path1,
"-consul.hostname": consul1.NetworkHTTPEndpoint(),
})
flags2 := mergeFlags(flags, map[string]string{
"-blocks-storage.filesystem.dir": path2,
"-consul.hostname": consul2.NetworkHTTPEndpoint(),
})
// Start Cortex replicas.
cortex1 := e2ecortex.NewSingleBinary("cortex-1", flags1, "")
cortex2 := e2ecortex.NewSingleBinary("cortex-2", flags2, "")
require.NoError(t, s.StartAndWaitReady(cortex1, cortex2))

// Wait until Cortex replicas have updated the ring state.
require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))

c1, err := e2ecortex.NewClient(cortex1.HTTPEndpoint(), cortex1.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)
c2, err := e2ecortex.NewClient(cortex2.HTTPEndpoint(), cortex2.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

now := time.Now()
// Push some series to Cortex.
start := now.Add(-time.Minute * 60)
scrapeInterval := 30 * time.Second

numSeries := 10
numSamples := 120
serieses := make([]prompb.TimeSeries, numSeries)
seriesesV2 := make([]cortexpbv2.TimeSeries, numSeries)
lbls := make([]labels.Labels, numSeries)

// make v1 series
for i := 0; i < numSeries; i++ {
series := e2e.GenerateSeriesWithSamples("test_series", start, scrapeInterval, i*numSamples, numSamples, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "series", Value: strconv.Itoa(i)})
serieses[i] = series

builder := labels.NewBuilder(labels.EmptyLabels())
for _, lbl := range series.Labels {
builder.Set(lbl.Name, lbl.Value)
}
lbls[i] = builder.Labels()
}
// make v2 series
st := writev2.NewSymbolTable()
for i := 0; i < numSeries; i++ {
series := e2e.GenerateSeriesWithSamplesV2(&st, "test_series", start, scrapeInterval, i*numSamples, numSamples, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "series", Value: strconv.Itoa(i)})
seriesesV2[i] = series
}

res, err := c1.Push(serieses)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

res, err = c2.PushV2(st.Symbols(), seriesesV2)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

waitUntilReady(t, context.Background(), c1, c2, `{job="test"}`, start, now)

rnd := rand.New(rand.NewSource(now.Unix()))
opts := []promqlsmith.Option{
promqlsmith.WithEnableOffset(true),
promqlsmith.WithEnableAtModifier(true),
promqlsmith.WithEnabledFunctions(enabledFunctions),
}
ps := promqlsmith.New(rnd, lbls, opts...)

type testCase struct {
query string
res1, res2 model.Value
err1, err2 error
}

queryStart := now.Add(-time.Minute * 50)
queryEnd := now.Add(-time.Minute * 10)
cases := make([]*testCase, 0, 500)
testRun := 500
var (
expr parser.Expr
query string
)
for i := 0; i < testRun; i++ {
for {
expr = ps.WalkRangeQuery()
query = expr.Pretty(0)
// timestamp is a known function that break with disable chunk trimming.
if isValidQuery(expr, 5) && !strings.Contains(query, "timestamp") {
break
}
}
res1, err1 := c1.QueryRange(query, queryStart, queryEnd, scrapeInterval)
res2, err2 := c2.QueryRange(query, queryStart, queryEnd, scrapeInterval)
cases = append(cases, &testCase{
query: query,
res1: res1,
res2: res2,
err1: err1,
err2: err2,
})
}

failures := 0
for i, tc := range cases {
qt := "range query"
if tc.err1 != nil || tc.err2 != nil {
if !cmp.Equal(tc.err1, tc.err2) {
t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, qt, tc.query, tc.err1, tc.err2)
failures++
}
} else if !cmp.Equal(tc.res1, tc.res2, comparer) {
t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
failures++
}
}
if failures > 0 {
require.Failf(t, "finished query fuzzing tests", "%d test cases failed", failures)
}
}

func TestDisableChunkTrimmingFuzz(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
Expand Down
17 changes: 16 additions & 1 deletion pkg/cortexpbv2/compatv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,27 @@ package cortexpbv2

import (
"github.com/prometheus/prometheus/model/labels"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"

"github.com/cortexproject/cortex/pkg/cortexpb"
)

// ToWriteRequestV2 converts matched slices of Labels, Samples, and Histograms into a WriteRequest proto.
func ToWriteRequestV2(lbls []labels.Labels, symbols []string, samples []Sample, histograms []Histogram, metadata []Metadata, source WriteRequest_SourceEnum) *WriteRequest {
func ToWriteRequestV2(lbls []labels.Labels, samples []Sample, histograms []Histogram, metadata []Metadata, source WriteRequest_SourceEnum, additionalSymbols ...string) *WriteRequest {
st := writev2.NewSymbolTable()
for _, lbl := range lbls {
lbl.Range(func(l labels.Label) {
st.Symbolize(l.Name)
st.Symbolize(l.Value)
})
}

for _, s := range additionalSymbols {
st.Symbolize(s)
}

symbols := st.Symbols()

req := &WriteRequest{
Symbols: symbols,
Source: source,
Expand Down
17 changes: 13 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,9 +768,18 @@ func (d *Distributor) sendV2(ctx context.Context, symbols []string, ingester rin
d.ingesterAppendFailures.WithLabelValues(id, typeSamples, getErrorStatus(err)).Inc()
}

d.ingesterAppends.WithLabelValues(id, typeMetadata).Inc()
if err != nil {
d.ingesterAppendFailures.WithLabelValues(id, typeMetadata, getErrorStatus(err)).Inc()
metadataAppend := false
for _, ts := range timeseries {
if ts.Metadata.Type != cortexpbv2.METRIC_TYPE_UNSPECIFIED {
metadataAppend = true
break
}
}
if metadataAppend {
d.ingesterAppends.WithLabelValues(id, typeMetadata).Inc()
if err != nil {
d.ingesterAppendFailures.WithLabelValues(id, typeMetadata, getErrorStatus(err)).Inc()
}
}
}

Expand Down Expand Up @@ -926,7 +935,7 @@ func (d *Distributor) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest)
// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples", d.ingestionRateLimiter.Limit(now, userID), totalSamples)
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), totalSamples, validatedMetadatas)
}

// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.
Expand Down
Loading

0 comments on commit 04e1821

Please sign in to comment.