From 75525a9b3b52755eda2b14927ef5ff1629a5cb87 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Wed, 11 Dec 2024 16:43:13 -0800 Subject: [PATCH] Expanded Postings Cache can cache results without the nearly created series under high load. (#6417) * [ExpandedPostingsCache] Quering and adding series concurrently can cache wrong results Signed-off-by: alanprot * Expiring the series after commit call Signed-off-by: alanprot * Adding option to run tests with no-race check Signed-off-by: alanprot --------- Signed-off-by: alanprot --- .github/workflows/test-build-deploy.yml | 18 +++++ Makefile | 3 + pkg/ingester/ingester.go | 21 ++++++ pkg/ingester/ingester_no_race_test.go | 90 +++++++++++++++++++++++++ 4 files changed, 132 insertions(+) create mode 100644 pkg/ingester/ingester_no_race_test.go diff --git a/.github/workflows/test-build-deploy.yml b/.github/workflows/test-build-deploy.yml index a5627859d3..92fd95b69e 100644 --- a/.github/workflows/test-build-deploy.yml +++ b/.github/workflows/test-build-deploy.yml @@ -61,6 +61,24 @@ jobs: ln -s $GITHUB_WORKSPACE/* /go/src/github.com/cortexproject/cortex - name: Run Tests run: make BUILD_IN_CONTAINER=false test + test-no-race: + runs-on: ubuntu-20.04 + container: + image: quay.io/cortexproject/build-image:master-0ddced051 + steps: + - name: Checkout Repo + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + - name: Setup Git safe.directory + run: | + echo "this step is needed because when running in container, actions/checkout does not set safe.directory effectively." + echo "See https://github.com/actions/runner/issues/2033. We should use --system instead of --global" + git config --system --add safe.directory $GITHUB_WORKSPACE + - name: Sym Link Expected Path to Workspace + run: | + mkdir -p /go/src/github.com/cortexproject/cortex + ln -s $GITHUB_WORKSPACE/* /go/src/github.com/cortexproject/cortex + - name: Run Tests + run: make BUILD_IN_CONTAINER=false test-no-race security: name: CodeQL diff --git a/Makefile b/Makefile index f19e7e0340..1a145e3f72 100644 --- a/Makefile +++ b/Makefile @@ -218,6 +218,9 @@ lint: test: go test -tags netgo -timeout 30m -race -count 1 ./... +test-no-race: + go test -tags netgo -timeout 30m -count 1 ./... + cover: $(eval COVERDIR := $(shell mktemp -d coverage.XXXXXXXXXX)) $(eval COVERFILE := $(shell mktemp $(COVERDIR)/unit.XXXXXXXXXX)) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index d22e1f6dc2..27d094c441 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1204,6 +1204,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte // Walk the samples, appending them to the users database app := db.Appender(ctx).(extendedAppender) + var newSeries []labels.Labels + for _, ts := range req.Timeseries { // The labels must be sorted (in our case, it's guaranteed a write request // has sorted labels once hit the ingester). @@ -1233,6 +1235,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels) // Retain the reference in case there are multiple samples for the series. if ref, err = app.Append(0, copiedLabels, s.TimestampMs, s.Value); err == nil { + // Keep track of what series needs to be expired on the postings cache + if db.postingCache != nil { + newSeries = append(newSeries, copiedLabels) + } succeededSamplesCount++ continue } @@ -1274,6 +1280,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte // Copy the label set because both TSDB and the active series tracker may retain it. copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels) if ref, err = app.AppendHistogram(0, copiedLabels, hp.TimestampMs, h, fh); err == nil { + // Keep track of what series needs to be expired on the postings cache + if db.postingCache != nil { + newSeries = append(newSeries, copiedLabels) + } succeededHistogramsCount++ continue } @@ -1342,6 +1352,17 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if err := app.Commit(); err != nil { return nil, wrapWithUser(err, userID) } + + // This is a workaround of https://github.com/prometheus/prometheus/pull/15579 + // Calling expire here may result in the series names being expired multiple times, + // as there may be multiple Push operations concurrently for the same new timeseries. + // TODO: alanprot remove this when/if the PR is merged + if db.postingCache != nil { + for _, s := range newSeries { + db.postingCache.ExpireSeries(s) + } + } + i.TSDBState.appenderCommitDuration.Observe(time.Since(startCommit).Seconds()) // If only invalid samples or histograms are pushed, don't change "last update", as TSDB was not modified. diff --git a/pkg/ingester/ingester_no_race_test.go b/pkg/ingester/ingester_no_race_test.go new file mode 100644 index 0000000000..656a7ab28c --- /dev/null +++ b/pkg/ingester/ingester_no_race_test.go @@ -0,0 +1,90 @@ +//go:build !race + +package ingester + +import ( + "context" + "fmt" + "math" + "strconv" + "sync" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util/services" + "github.com/cortexproject/cortex/pkg/util/test" +) + +// Running this test without race check as there is a known prometheus race condition. +// See https://github.com/prometheus/prometheus/pull/15141 and https://github.com/prometheus/prometheus/pull/15316 +func TestExpandedCachePostings_Race(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour} + cfg.LifecyclerConfig.JoinAfter = 0 + cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled = true + + r := prometheus.NewRegistry() + i, err := prepareIngesterWithBlocksStorage(t, cfg, r) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until the ingester is ACTIVE + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + ctx := user.InjectOrgID(context.Background(), "test") + + wg := sync.WaitGroup{} + labelNames := 100 + seriesPerLabelName := 200 + + for j := 0; j < labelNames; j++ { + metricName := fmt.Sprintf("test_metric_%d", j) + wg.Add(seriesPerLabelName * 2) + for k := 0; k < seriesPerLabelName; k++ { + go func() { + defer wg.Done() + _, err := i.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(labels.MetricName, metricName, "k", strconv.Itoa(k))}, + []cortexpb.Sample{{Value: 1, TimestampMs: 9}}, nil, nil, cortexpb.API)) + require.NoError(t, err) + }() + + go func() { + defer wg.Done() + err := i.QueryStream(&client.QueryRequest{ + StartTimestampMs: 0, + EndTimestampMs: math.MaxInt64, + Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: metricName}}, + }, &mockQueryStreamServer{ctx: ctx}) + require.NoError(t, err) + }() + } + + wg.Wait() + + s := &mockQueryStreamServer{ctx: ctx} + err = i.QueryStream(&client.QueryRequest{ + StartTimestampMs: 0, + EndTimestampMs: math.MaxInt64, + Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: metricName}}, + }, s) + require.NoError(t, err) + + set, err := seriesSetFromResponseStream(s) + require.NoError(t, err) + res, err := client.MatrixFromSeriesSet(set) + require.NoError(t, err) + require.Equal(t, seriesPerLabelName, res.Len()) + } +}