store: add chunksize tests to acceptance tests
* add chunk size tests to acceptance tests
* refactor acceptance tests slightly

Signed-off-by: Michael Hoffmann <[email protected]>
MichaHoffmann committed Dec 30, 2023
1 parent d388b74 commit 15c5977
Showing 4 changed files with 105 additions and 104 deletions.
105 changes: 91 additions & 14 deletions pkg/store/acceptance_test.go
@@ -6,28 +6,34 @@ package store
import (
"context"
"fmt"
"math"
"net/url"
"os"
"path/filepath"
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
"golang.org/x/exp/slices"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/efficientgo/core/testutil"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
"github.com/thanos-io/thanos/pkg/testutil/custom"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)
@@ -62,8 +68,10 @@ type seriesCallCase struct {
expectErr error
}

type startStoreFn func(t *testing.T, extLset labels.Labels, append func(app storage.Appender)) storepb.StoreServer

// testStoreAPIsAcceptance tests StoreAPI from closed box perspective.
func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset labels.Labels, append func(app storage.Appender)) storepb.StoreServer) {
func testStoreAPIsAcceptance(t *testing.T, startStore startStoreFn) {
t.Helper()

now := time.Now()
@@ -232,7 +240,6 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
appendFn: func(app storage.Appender) {
_, err := app.Append(0, labels.FromStrings("foo", "bar", "region", "somewhere"), 0, 0)
testutil.Ok(t, err)

testutil.Ok(t, app.Commit())
},
seriesCalls: []seriesCallCase{
@@ -759,12 +766,69 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
}
}

// Regression test for https://github.com/thanos-io/thanos/issues/396 and
// https://github.com/thanos-io/thanos/issues/1038.
// Note: only the TSDB and Prometheus stores split chunks at 120 samples.
func testStoreAPIsSeriesSplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T, startStore startStoreFn) {
t.Run("should split into chunks of max size 120", func(t *testing.T) {
baseT := timestamp.FromTime(time.Now().AddDate(0, 0, -2)) / 1000 * 1000
offset := int64(2*math.MaxUint16 + 5)
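// 2*math.MaxUint16 + 5 = 131075 samples; at the 120-samples-per-chunk cut-off
// this yields 1092 full chunks plus a trailing chunk of 35 samples.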

extLset := labels.FromStrings("region", "eu-west")
appendFn := func(app storage.Appender) {
var (
ref storage.SeriesRef
err error
)
for i := int64(0); i < offset; i++ {
ref, err = app.Append(ref, labels.FromStrings("a", "b"), baseT+i, 1)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client := startStore(t, extLset, appendFn)
srv := newStoreSeriesServer(ctx)

testutil.Ok(t, client.Series(&storepb.SeriesRequest{
MinTime: baseT,
MaxTime: baseT + offset,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
{Type: storepb.LabelMatcher_EQ, Name: "region", Value: "eu-west"},
},
}, srv))

testutil.Equals(t, 1, len(srv.SeriesSet))

firstSeries := srv.SeriesSet[0]

testutil.Equals(t, []labelpb.ZLabel{
{Name: "a", Value: "b"},
{Name: "region", Value: "eu-west"},
}, firstSeries.Labels)

for i := 0; i < len(firstSeries.Chunks)-1; i++ {
chunk, err := chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[i].Raw.Data)
testutil.Ok(t, err)
testutil.Equals(t, 120, chunk.NumSamples())
}

chunk, err := chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[len(firstSeries.Chunks)-1].Raw.Data)
testutil.Ok(t, err)
testutil.Equals(t, 35, chunk.NumSamples())
})
}

func TestBucketStore_Acceptance(t *testing.T) {
t.Cleanup(func() { custom.TolerantVerifyLeak(t) })
ctx := context.Background()

for _, lazyExpandedPosting := range []bool{false, true} {
testStoreAPIsAcceptance(t, func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
startStore := func(lazyExpandedPostings bool) func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
return func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
tmpDir := tt.TempDir()
bktDir := filepath.Join(tmpDir, "bkt")
auxDir := filepath.Join(tmpDir, "aux")
@@ -835,22 +899,28 @@ func TestBucketStore_Acceptance(t *testing.T) {
1*time.Minute,
WithChunkPool(chunkPool),
WithFilterConfig(allowAllFilterConf),
WithLazyExpandedPostings(lazyExpandedPosting),
WithLazyExpandedPostings(lazyExpandedPostings),
)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, bucketStore.Close()) })

testutil.Ok(tt, bucketStore.SyncBlocks(context.Background()))

return bucketStore
}
}

for _, lazyExpandedPostings := range []bool{false, true} {
t.Run(fmt.Sprintf("lazyExpandedPostings:%t", lazyExpandedPostings), func(t *testing.T) {
testStoreAPIsAcceptance(t, startStore(lazyExpandedPostings))
})
}
}

func TestPrometheusStore_Acceptance(t *testing.T) {
t.Cleanup(func() { custom.TolerantVerifyLeak(t) })

testStoreAPIsAcceptance(t, func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
p, err := e2eutil.NewPrometheus()
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, p.Stop()) })
@@ -870,21 +940,28 @@ func TestPrometheusStore_Acceptance(t *testing.T) {
func() string { return version })
testutil.Ok(tt, err)

// We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only.
promStore.remoteReadAcceptableResponses = []prompb.ReadRequest_ResponseType{prompb.ReadRequest_SAMPLES}

return promStore
})
}

testStoreAPIsAcceptance(t, startStore)
testStoreAPIsSeriesSplitSamplesIntoChunksWithMaxSizeOf120(t, startStore)
}

func TestTSDBStore_Acceptance(t *testing.T) {
t.Cleanup(func() { custom.TolerantVerifyLeak(t) })

testStoreAPIsAcceptance(t, func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
db, err := e2eutil.NewTSDB()
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, db.Close()) })
appendFn(db.Appender(context.Background()))

tsdbStore := NewTSDBStore(nil, db, component.Rule, extLset)
return NewTSDBStore(nil, db, component.Rule, extLset)
}

appendFn(db.Appender(context.Background()))
return tsdbStore
})
testStoreAPIsAcceptance(t, startStore)
testStoreAPIsSeriesSplitSamplesIntoChunksWithMaxSizeOf120(t, startStore)
}
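
The constants asserted by testStoreAPIsSeriesSplitSamplesIntoChunksWithMaxSizeOf120 follow from simple arithmetic: the stores under test cut XOR chunks at 120 samples, and the test appends 2*math.MaxUint16 + 5 = 131075 samples. A minimal, standalone sketch of that calculation (an editorial aside, not part of the commit):

package main

import (
	"fmt"
	"math"
)

func main() {
	const maxSamplesPerChunk = 120         // chunk cut-off asserted by the acceptance test
	samples := int64(2*math.MaxUint16 + 5) // 131075 samples appended by the test

	full := samples / maxSamplesPerChunk // 1092 full chunks of 120 samples
	tail := samples % maxSamplesPerChunk // 35 samples left for the final chunk

	fmt.Printf("expect %d full chunks and one trailing chunk of %d samples\n", full, tail)
}

Running it prints 1092 and 35, matching the per-chunk assertions in the new test and the 1093-chunk total that the removed prometheus_test.go helper used to assert.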
79 changes: 0 additions & 79 deletions pkg/store/prometheus_test.go
@@ -18,7 +18,6 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/efficientgo/core/testutil"
@@ -27,7 +26,6 @@ import (
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
"github.com/thanos-io/thanos/pkg/testutil/custom"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)
@@ -520,80 +518,3 @@ func TestPrometheusStore_Info(t *testing.T) {
testutil.Equals(t, int64(123), resp.MinTime)
testutil.Equals(t, int64(456), resp.MaxTime)
}

func testSeries_SplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T, appender storage.Appender, newStore func() storepb.StoreServer) {
baseT := timestamp.FromTime(time.Now().AddDate(0, 0, -2)) / 1000 * 1000

offset := int64(2*math.MaxUint16 + 5)
for i := int64(0); i < offset; i++ {
_, err := appender.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+i, 1)
testutil.Ok(t, err)
}

testutil.Ok(t, appender.Commit())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client := newStore()
srv := newStoreSeriesServer(ctx)

testutil.Ok(t, client.Series(&storepb.SeriesRequest{
MinTime: baseT,
MaxTime: baseT + offset,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
{Type: storepb.LabelMatcher_EQ, Name: "region", Value: "eu-west"},
},
}, srv))

testutil.Equals(t, 1, len(srv.SeriesSet))

firstSeries := srv.SeriesSet[0]

testutil.Equals(t, []labelpb.ZLabel{
{Name: "a", Value: "b"},
{Name: "region", Value: "eu-west"},
}, firstSeries.Labels)

testutil.Equals(t, 1093, len(firstSeries.Chunks))

chunk, err := chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[0].Raw.Data)
testutil.Ok(t, err)
testutil.Equals(t, 120, chunk.NumSamples())

chunk, err = chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[1].Raw.Data)
testutil.Ok(t, err)
testutil.Equals(t, 120, chunk.NumSamples())

chunk, err = chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[len(firstSeries.Chunks)-1].Raw.Data)
testutil.Ok(t, err)
testutil.Equals(t, 35, chunk.NumSamples())
}

// Regression test for https://github.com/thanos-io/thanos/issues/396.
func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T) {
defer custom.TolerantVerifyLeak(t)

p, err := e2eutil.NewPrometheus()
testutil.Ok(t, err)
defer func() { testutil.Ok(t, p.Stop()) }()

testSeries_SplitSamplesIntoChunksWithMaxSizeOf120(t, p.Appender(), func() storepb.StoreServer {
testutil.Ok(t, p.Start(context.Background(), log.NewNopLogger()))

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 },
nil)
testutil.Ok(t, err)

// We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only.
proxy.remoteReadAcceptableResponses = []prompb.ReadRequest_ResponseType{prompb.ReadRequest_SAMPLES}

return proxy
})
}
14 changes: 14 additions & 0 deletions pkg/store/proxy_heap.go
@@ -20,6 +20,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -108,6 +109,19 @@ func (d *dedupResponseHeap) Next() bool {
}
}

// debugChunk decodes bs as an XOR-encoded chunk and prints its sample count
// followed by every (timestamp, value) pair; useful for ad-hoc debugging.
func debugChunk(bs []byte) {
ch, err := chunkenc.FromData(chunkenc.EncXOR, bs)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(ch.NumSamples())
it := ch.Iterator(nil)
for it.Next() != chunkenc.ValNone {
fmt.Println(it.At())
}
}
}

func chainSeriesAndRemIdenticalChunks(series []*storepb.SeriesResponse) *storepb.SeriesResponse {
chunkDedupMap := map[uint64]*storepb.AggrChunk{}

11 changes: 0 additions & 11 deletions pkg/store/tsdb_test.go
@@ -228,18 +228,7 @@ func TestTSDBStore_Series(t *testing.T) {
}
}

// Regression test for https://github.com/thanos-io/thanos/issues/1038.
func TestTSDBStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T) {
defer custom.TolerantVerifyLeak(t)

db, err := e2eutil.NewTSDB()
defer func() { testutil.Ok(t, db.Close()) }()
testutil.Ok(t, err)

testSeries_SplitSamplesIntoChunksWithMaxSizeOf120(t, db.Appender(context.Background()), func() storepb.StoreServer {
return NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west"))

})
}

type delegatorServer struct {