Skip to content

Commit

Permalink
Expanded Postings Cache can cache results without the newly created …
Browse files Browse the repository at this point in the history
…series under high load. (#6417)

* [ExpandedPostingsCache] Querying and adding series concurrently can cache wrong results

Signed-off-by: alanprot <[email protected]>

* Expiring the series after commit call

Signed-off-by: alanprot <[email protected]>

* Adding option to run tests with no-race check

Signed-off-by: alanprot <[email protected]>

---------

Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot authored Dec 12, 2024
1 parent 95236cf commit 75525a9
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 0 deletions.
18 changes: 18 additions & 0 deletions .github/workflows/test-build-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,24 @@ jobs:
ln -s $GITHUB_WORKSPACE/* /go/src/github.com/cortexproject/cortex
- name: Run Tests
run: make BUILD_IN_CONTAINER=false test
test-no-race:
runs-on: ubuntu-20.04
container:
image: quay.io/cortexproject/build-image:master-0ddced051
steps:
- name: Checkout Repo
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Setup Git safe.directory
run: |
echo "this step is needed because when running in container, actions/checkout does not set safe.directory effectively."
echo "See https://github.com/actions/runner/issues/2033. We should use --system instead of --global"
git config --system --add safe.directory $GITHUB_WORKSPACE
- name: Sym Link Expected Path to Workspace
run: |
mkdir -p /go/src/github.com/cortexproject/cortex
ln -s $GITHUB_WORKSPACE/* /go/src/github.com/cortexproject/cortex
- name: Run Tests
run: make BUILD_IN_CONTAINER=false test-no-race

security:
name: CodeQL
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ lint:
test:
go test -tags netgo -timeout 30m -race -count 1 ./...

test-no-race:
go test -tags netgo -timeout 30m -count 1 ./...

cover:
$(eval COVERDIR := $(shell mktemp -d coverage.XXXXXXXXXX))
$(eval COVERFILE := $(shell mktemp $(COVERDIR)/unit.XXXXXXXXXX))
Expand Down
21 changes: 21 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte

// Walk the samples, appending them to the users database
app := db.Appender(ctx).(extendedAppender)
var newSeries []labels.Labels

for _, ts := range req.Timeseries {
// The labels must be sorted (in our case, it's guaranteed a write request
// has sorted labels once hit the ingester).
Expand Down Expand Up @@ -1233,6 +1235,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels)
// Retain the reference in case there are multiple samples for the series.
if ref, err = app.Append(0, copiedLabels, s.TimestampMs, s.Value); err == nil {
// Keep track of what series needs to be expired on the postings cache
if db.postingCache != nil {
newSeries = append(newSeries, copiedLabels)
}
succeededSamplesCount++
continue
}
Expand Down Expand Up @@ -1274,6 +1280,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
// Copy the label set because both TSDB and the active series tracker may retain it.
copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels)
if ref, err = app.AppendHistogram(0, copiedLabels, hp.TimestampMs, h, fh); err == nil {
// Keep track of what series needs to be expired on the postings cache
if db.postingCache != nil {
newSeries = append(newSeries, copiedLabels)
}
succeededHistogramsCount++
continue
}
Expand Down Expand Up @@ -1342,6 +1352,17 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
if err := app.Commit(); err != nil {
return nil, wrapWithUser(err, userID)
}

// This is a workaround of https://github.com/prometheus/prometheus/pull/15579
// Calling expire here may result in the series names being expired multiple times,
// as there may be multiple Push operations concurrently for the same new timeseries.
// TODO: alanprot remove this when/if the PR is merged
if db.postingCache != nil {
for _, s := range newSeries {
db.postingCache.ExpireSeries(s)
}
}

i.TSDBState.appenderCommitDuration.Observe(time.Since(startCommit).Seconds())

// If only invalid samples or histograms are pushed, don't change "last update", as TSDB was not modified.
Expand Down
90 changes: 90 additions & 0 deletions pkg/ingester/ingester_no_race_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//go:build !race

package ingester

import (
"context"
"fmt"
"math"
"strconv"
"sync"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/test"
)

// TestExpandedCachePostings_Race stresses the head expanded-postings cache by
// pushing new series and querying the same metric name concurrently, then
// verifying a final query sees every series. It is built without the race
// detector (`//go:build !race` at the top of this file) because of known
// Prometheus race conditions.
// See https://github.com/prometheus/prometheus/pull/15141 and https://github.com/prometheus/prometheus/pull/15316
func TestExpandedCachePostings_Race(t *testing.T) {
	cfg := defaultIngesterTestConfig(t)
	cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}
	cfg.LifecyclerConfig.JoinAfter = 0
	// The postings cache is the component under test.
	cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled = true

	r := prometheus.NewRegistry()
	i, err := prepareIngesterWithBlocksStorage(t, cfg, r)
	require.NoError(t, err)
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

	// Wait until the ingester is ACTIVE
	test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
		return i.lifecycler.GetState()
	})

	ctx := user.InjectOrgID(context.Background(), "test")

	wg := sync.WaitGroup{}
	labelNames := 100
	seriesPerLabelName := 200

	for j := 0; j < labelNames; j++ {
		metricName := fmt.Sprintf("test_metric_%d", j)
		wg.Add(seriesPerLabelName * 2)
		for k := 0; k < seriesPerLabelName; k++ {
			go func() {
				defer wg.Done()
				// Report failures with t.Errorf: testify's require calls
				// t.FailNow, which must not be called from a goroutine other
				// than the one running the test (see testing.T docs).
				_, err := i.Push(ctx, cortexpb.ToWriteRequest(
					[]labels.Labels{labels.FromStrings(labels.MetricName, metricName, "k", strconv.Itoa(k))},
					[]cortexpb.Sample{{Value: 1, TimestampMs: 9}}, nil, nil, cortexpb.API))
				if err != nil {
					t.Errorf("unexpected error on Push: %v", err)
				}
			}()

			go func() {
				defer wg.Done()
				// Query concurrently with the pushes above to race the cache.
				err := i.QueryStream(&client.QueryRequest{
					StartTimestampMs: 0,
					EndTimestampMs:   math.MaxInt64,
					Matchers:         []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: metricName}},
				}, &mockQueryStreamServer{ctx: ctx})
				if err != nil {
					t.Errorf("unexpected error on QueryStream: %v", err)
				}
			}()
		}

		wg.Wait()

		// Once all concurrent pushes/queries for this metric name have
		// finished, a fresh query must return exactly one series per "k"
		// value; a stale expanded-postings cache entry would return fewer.
		s := &mockQueryStreamServer{ctx: ctx}
		err = i.QueryStream(&client.QueryRequest{
			StartTimestampMs: 0,
			EndTimestampMs:   math.MaxInt64,
			Matchers:         []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: metricName}},
		}, s)
		require.NoError(t, err)

		set, err := seriesSetFromResponseStream(s)
		require.NoError(t, err)
		res, err := client.MatrixFromSeriesSet(set)
		require.NoError(t, err)
		require.Equal(t, seriesPerLabelName, res.Len())
	}
}

0 comments on commit 75525a9

Please sign in to comment.