store: add chunksize tests to acceptance tests #7018

Merged
merged 1 commit on Jan 3, 2024
109 changes: 93 additions & 16 deletions pkg/store/acceptance_test.go
@@ -6,28 +6,33 @@ package store
import (
"context"
"fmt"
"math"
"net/url"
"os"
"path/filepath"
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
"golang.org/x/exp/slices"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/efficientgo/core/testutil"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
"github.com/thanos-io/thanos/pkg/testutil/custom"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)
@@ -62,8 +67,10 @@ type seriesCallCase struct {
expectErr error
}

type startStoreFn func(t *testing.T, extLset labels.Labels, append func(app storage.Appender)) storepb.StoreServer

// testStoreAPIsAcceptance tests StoreAPI from a closed-box perspective.
func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset labels.Labels, append func(app storage.Appender)) storepb.StoreServer) {
func testStoreAPIsAcceptance(t *testing.T, startStore startStoreFn) {
t.Helper()

now := time.Now()
@@ -232,7 +239,6 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
appendFn: func(app storage.Appender) {
_, err := app.Append(0, labels.FromStrings("foo", "bar", "region", "somewhere"), 0, 0)
testutil.Ok(t, err)

testutil.Ok(t, app.Commit())
},
seriesCalls: []seriesCallCase{
@@ -743,9 +749,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
}
testutil.Ok(t, err)

testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) int {
testutil.Assert(t, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) int {
return labels.Compare(x.PromLabels(), y.PromLabels())
}))
}), "Unsorted Series response returned")

receivedLabels := make([]labels.Labels, 0)
for _, s := range srv.SeriesSet {
@@ -759,12 +765,70 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
}
}

// Regression test for https://github.com/thanos-io/thanos/issues/396.
// Note: only the TSDB and Prometheus stores cut chunks at 120 samples.
func testStoreAPIsSeriesSplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T, startStore startStoreFn) {
t.Run("should split into chunks of max size 120", func(t *testing.T) {
baseT := timestamp.FromTime(time.Now().AddDate(0, 0, -2)) / 1000 * 1000
offset := int64(2*math.MaxUint16 + 5)

extLset := labels.FromStrings("region", "eu-west")
appendFn := func(app storage.Appender) {

var (
ref storage.SeriesRef
err error
)
for i := int64(0); i < offset; i++ {
ref, err = app.Append(ref, labels.FromStrings("a", "b"), baseT+i, 1)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())

}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client := startStore(t, extLset, appendFn)
srv := newStoreSeriesServer(ctx)

testutil.Ok(t, client.Series(&storepb.SeriesRequest{
MinTime: baseT,
MaxTime: baseT + offset,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
{Type: storepb.LabelMatcher_EQ, Name: "region", Value: "eu-west"},
},
}, srv))

testutil.Equals(t, 1, len(srv.SeriesSet))

firstSeries := srv.SeriesSet[0]

testutil.Equals(t, []labelpb.ZLabel{
{Name: "a", Value: "b"},
{Name: "region", Value: "eu-west"},
}, firstSeries.Labels)

testutil.Equals(t, 1093, len(firstSeries.Chunks))
for i := 0; i < len(firstSeries.Chunks)-1; i++ {
chunk, err := chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[i].Raw.Data)
testutil.Ok(t, err)
testutil.Equals(t, 120, chunk.NumSamples())
}

chunk, err := chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[len(firstSeries.Chunks)-1].Raw.Data)
testutil.Ok(t, err)
testutil.Equals(t, 35, chunk.NumSamples())
})
}
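
The expected numbers in testStoreAPIsSeriesSplitSamplesIntoChunksWithMaxSizeOf120 come straight from the arithmetic of the regression scenario: 2*math.MaxUint16+5 = 131,075 appended samples, cut into chunks of at most 120 samples, yield 1,092 full chunks plus one trailing chunk of 35 samples, i.e. 1,093 chunks in total. The standalone Go sketch below (illustrative only, not part of this diff; all identifiers in it are hypothetical) reproduces that calculation.

// Standalone sketch reproducing the chunk-count arithmetic used by the
// acceptance test above.
package main

import (
	"fmt"
	"math"
)

func main() {
	// Same sample count the test appends: 2*math.MaxUint16 + 5 = 131075.
	samples := int64(2*math.MaxUint16 + 5)

	// TSDB and the Prometheus remote-read path cap XOR chunks at 120 samples.
	const maxSamplesPerChunk int64 = 120

	fullChunks := samples / maxSamplesPerChunk       // 1092 chunks of 120 samples
	lastChunkSamples := samples % maxSamplesPerChunk // 35 samples left over

	totalChunks := fullChunks
	if lastChunkSamples > 0 {
		totalChunks++ // the partial trailing chunk
	}

	fmt.Printf("total chunks: %d, samples in last chunk: %d\n", totalChunks, lastChunkSamples)
	// Output: total chunks: 1093, samples in last chunk: 35
}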

func TestBucketStore_Acceptance(t *testing.T) {
t.Cleanup(func() { custom.TolerantVerifyLeak(t) })
ctx := context.Background()

for _, lazyExpandedPosting := range []bool{false, true} {
testStoreAPIsAcceptance(t, func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
startStore := func(lazyExpandedPostings bool) func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
return func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
tmpDir := tt.TempDir()
bktDir := filepath.Join(tmpDir, "bkt")
auxDir := filepath.Join(tmpDir, "aux")
@@ -835,22 +899,28 @@ func TestBucketStore_Acceptance(t *testing.T) {
1*time.Minute,
WithChunkPool(chunkPool),
WithFilterConfig(allowAllFilterConf),
WithLazyExpandedPostings(lazyExpandedPosting),
WithLazyExpandedPostings(lazyExpandedPostings),
)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, bucketStore.Close()) })

testutil.Ok(tt, bucketStore.SyncBlocks(context.Background()))

return bucketStore
}
}

for _, lazyExpandedPostings := range []bool{false, true} {
t.Run(fmt.Sprintf("lazyExpandedPostings:%t", lazyExpandedPostings), func(t *testing.T) {
testStoreAPIsAcceptance(t, startStore(lazyExpandedPostings))
})
}
}

func TestPrometheusStore_Acceptance(t *testing.T) {
t.Cleanup(func() { custom.TolerantVerifyLeak(t) })

testStoreAPIsAcceptance(t, func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
p, err := e2eutil.NewPrometheus()
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, p.Stop()) })
@@ -870,21 +940,28 @@ func TestPrometheusStore_Acceptance(t *testing.T) {
func() string { return version })
testutil.Ok(tt, err)

// We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only.
promStore.remoteReadAcceptableResponses = []prompb.ReadRequest_ResponseType{prompb.ReadRequest_SAMPLES}

return promStore
})
}

testStoreAPIsAcceptance(t, startStore)
testStoreAPIsSeriesSplitSamplesIntoChunksWithMaxSizeOf120(t, startStore)
}

func TestTSDBStore_Acceptance(t *testing.T) {
t.Cleanup(func() { custom.TolerantVerifyLeak(t) })

testStoreAPIsAcceptance(t, func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
db, err := e2eutil.NewTSDB()
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, db.Close()) })
appendFn(db.Appender(context.Background()))

tsdbStore := NewTSDBStore(nil, db, component.Rule, extLset)
return NewTSDBStore(nil, db, component.Rule, extLset)
}

appendFn(db.Appender(context.Background()))
return tsdbStore
})
testStoreAPIsAcceptance(t, startStore)
testStoreAPIsSeriesSplitSamplesIntoChunksWithMaxSizeOf120(t, startStore)
}
79 changes: 0 additions & 79 deletions pkg/store/prometheus_test.go
@@ -18,7 +18,6 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/efficientgo/core/testutil"
@@ -27,7 +26,6 @@ import (
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
"github.com/thanos-io/thanos/pkg/testutil/custom"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)
@@ -520,80 +518,3 @@ func TestPrometheusStore_Info(t *testing.T) {
testutil.Equals(t, int64(123), resp.MinTime)
testutil.Equals(t, int64(456), resp.MaxTime)
}

func testSeries_SplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T, appender storage.Appender, newStore func() storepb.StoreServer) {
baseT := timestamp.FromTime(time.Now().AddDate(0, 0, -2)) / 1000 * 1000

offset := int64(2*math.MaxUint16 + 5)
for i := int64(0); i < offset; i++ {
_, err := appender.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+i, 1)
testutil.Ok(t, err)
}

testutil.Ok(t, appender.Commit())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client := newStore()
srv := newStoreSeriesServer(ctx)

testutil.Ok(t, client.Series(&storepb.SeriesRequest{
MinTime: baseT,
MaxTime: baseT + offset,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
{Type: storepb.LabelMatcher_EQ, Name: "region", Value: "eu-west"},
},
}, srv))

testutil.Equals(t, 1, len(srv.SeriesSet))

firstSeries := srv.SeriesSet[0]

testutil.Equals(t, []labelpb.ZLabel{
{Name: "a", Value: "b"},
{Name: "region", Value: "eu-west"},
}, firstSeries.Labels)

testutil.Equals(t, 1093, len(firstSeries.Chunks))

chunk, err := chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[0].Raw.Data)
testutil.Ok(t, err)
testutil.Equals(t, 120, chunk.NumSamples())

chunk, err = chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[1].Raw.Data)
testutil.Ok(t, err)
testutil.Equals(t, 120, chunk.NumSamples())

chunk, err = chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[len(firstSeries.Chunks)-1].Raw.Data)
testutil.Ok(t, err)
testutil.Equals(t, 35, chunk.NumSamples())
}

// Regression test for https://github.com/thanos-io/thanos/issues/396.
func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T) {
defer custom.TolerantVerifyLeak(t)

p, err := e2eutil.NewPrometheus()
testutil.Ok(t, err)
defer func() { testutil.Ok(t, p.Stop()) }()

testSeries_SplitSamplesIntoChunksWithMaxSizeOf120(t, p.Appender(), func() storepb.StoreServer {
testutil.Ok(t, p.Start(context.Background(), log.NewNopLogger()))

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 },
nil)
testutil.Ok(t, err)

// We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only.
proxy.remoteReadAcceptableResponses = []prompb.ReadRequest_ResponseType{prompb.ReadRequest_SAMPLES}

return proxy
})
}
14 changes: 0 additions & 14 deletions pkg/store/tsdb_test.go
@@ -228,20 +228,6 @@ func TestTSDBStore_Series(t *testing.T) {
}
}

// Regression test for https://github.com/thanos-io/thanos/issues/1038.
func TestTSDBStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T) {
defer custom.TolerantVerifyLeak(t)

db, err := e2eutil.NewTSDB()
defer func() { testutil.Ok(t, db.Close()) }()
testutil.Ok(t, err)

testSeries_SplitSamplesIntoChunksWithMaxSizeOf120(t, db.Appender(context.Background()), func() storepb.StoreServer {
return NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west"))

})
}

type delegatorServer struct {
*storetestutil.SeriesServer
