Skip to content

Commit

Permalink
expose more series size stats
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Dec 4, 2024
1 parent dec2686 commit d56e0c7
Show file tree
Hide file tree
Showing 14 changed files with 228 additions and 43 deletions.
4 changes: 4 additions & 0 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,10 @@ func processDownsampling(
}
if stats.SeriesMaxSize > 0 {
meta.Thanos.IndexStats.SeriesMaxSize = stats.SeriesMaxSize
meta.Thanos.IndexStats.SeriesP90Size = stats.SeriesP90Size
meta.Thanos.IndexStats.SeriesP99Size = stats.SeriesP99Size
meta.Thanos.IndexStats.SeriesP999Size = stats.SeriesP999Size
meta.Thanos.IndexStats.SeriesP9999Size = stats.SeriesP9999Size
}
if err := meta.WriteToDir(logger, resdir); err != nil {
return errors.Wrap(err, "write meta")
Expand Down
38 changes: 38 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type storeConfig struct {
indexCacheSizeBytes units.Base2Bytes
chunkPoolSize units.Base2Bytes
estimatedMaxSeriesSize uint64
estimatedSeriesSizeStat string
estimatedMaxChunkSize uint64
seriesBatchSize int
storeRateLimits store.SeriesSelectLimits
Expand Down Expand Up @@ -164,6 +165,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("debug.estimated-max-series-size", "Estimated max series size. Setting a value might result in over fetching data while a small value might result in data refetch. Default value is 64KB.").
Hidden().Default(strconv.Itoa(store.EstimatedMaxSeriesSize)).Uint64Var(&sc.estimatedMaxSeriesSize)

// Register the statistic used to estimate per-block series size.
// Bug fix: BlockSeriesSizeP90 was missing from the EnumVar list even though the
// help text advertises p90 and runStore handles it, so "--estimated-series-size-stat=p90"
// was rejected at flag-parse time.
cmd.Flag("estimated-series-size-stat", "Statistic to use to estimate block series size. This is currently used for lazy expanded posting series size estimation. Available options are max, p90, p99, p999 and p9999. Default value is "+string(store.BlockSeriesSizeMax)).
	Default(string(store.BlockSeriesSizeMax)).
	EnumVar(&sc.estimatedSeriesSizeStat, string(store.BlockSeriesSizeMax), string(store.BlockSeriesSizeP90), string(store.BlockSeriesSizeP99), string(store.BlockSeriesSizeP999), string(store.BlockSeriesSizeP9999))

cmd.Flag("debug.estimated-max-chunk-size", "Estimated max chunk size. Setting a value might result in over fetching data while a small value might result in data refetch. Default value is 16KiB.").
Hidden().Default(strconv.Itoa(store.EstimatedMaxChunkSize)).Uint64Var(&sc.estimatedMaxChunkSize)

Expand Down Expand Up @@ -398,6 +403,8 @@ func runStore(
return errors.Wrap(err, "create chunk pool")
}

estimatedSeriesSizeStat := strings.ToLower(conf.estimatedSeriesSizeStat)

options := []store.BucketStoreOption{
store.WithLogger(logger),
store.WithRequestLoggerFunc(func(ctx context.Context, logger log.Logger) log.Logger {
Expand All @@ -421,6 +428,37 @@ func runStore(
}
return conf.estimatedMaxSeriesSize
}),
store.WithBlockEstimatedSeriesSizeFunc(func(m metadata.Meta) uint64 {
switch estimatedSeriesSizeStat {
case string(store.BlockSeriesSizeMax):
if m.Thanos.IndexStats.SeriesMaxSize > 0 {
return uint64(m.Thanos.IndexStats.SeriesMaxSize)
}
case string(store.BlockSeriesSizeP90):
if m.Thanos.IndexStats.SeriesP90Size > 0 {
return uint64(m.Thanos.IndexStats.SeriesP90Size)
}
case string(store.BlockSeriesSizeP99):
if m.Thanos.IndexStats.SeriesP99Size > 0 {
return uint64(m.Thanos.IndexStats.SeriesP99Size)
}
case string(store.BlockSeriesSizeP999):
if m.Thanos.IndexStats.SeriesP999Size > 0 {
return uint64(m.Thanos.IndexStats.SeriesP999Size)
}
case string(store.BlockSeriesSizeP9999):
if m.Thanos.IndexStats.SeriesP9999Size > 0 {
return uint64(m.Thanos.IndexStats.SeriesP9999Size)
}
}

// Always fallback to series max size if none of other stats available.
if m.Thanos.IndexStats.SeriesMaxSize > 0 {
return uint64(m.Thanos.IndexStats.SeriesMaxSize)
}
// If series max size not available from the metadata, fallback to the configured default.
return conf.estimatedMaxSeriesSize
}),
store.WithBlockEstimatedMaxChunkFunc(func(m metadata.Meta) uint64 {
if m.Thanos.IndexStats.ChunkMaxSize > 0 &&
uint64(m.Thanos.IndexStats.ChunkMaxSize) < conf.estimatedMaxChunkSize {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ require (

require (
capnproto.org/go/capnp/v3 v3.0.0-alpha.30
github.com/DataDog/sketches-go v1.4.6
github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1
github.com/hashicorp/golang-lru/v2 v2.0.7
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1349,6 +1349,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
github.com/Code-Hex/go-generics-cache v1.5.1 h1:6vhZGc5M7Y/YD8cIUcY8kcuQLB4cHR7U+0KMqAA0KcU=
github.com/Code-Hex/go-generics-cache v1.5.1/go.mod h1:qxcC9kRVrct9rHeiYpFWSoW1vxyillCVzX13KZG8dl4=
github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/DataDog/sketches-go v1.4.6 h1:acd5fb+QdUzGrosfNLwrIhqyrbMORpvBy7mE+vHlT3I=
github.com/DataDog/sketches-go v1.4.6/go.mod h1:7Y8GN8Jf66DLyDhc94zuWA3uHEt/7ttt8jHOBWWrSOg=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.8.3 h1:i84ZOPT35YCJROyuf97VP/VEdYhQce/8NTLOWq5tqJw=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.8.3/go.mod h1:3+qm+VCJbVmQ9uscVz+8h1rRkJEy9ZNFGgpT1XB9mPg=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.32.3 h1:FhsH8qgWFkkPlPXBZ68uuT/FH/R+DLTtVPxjLEBs1v4=
Expand Down
74 changes: 68 additions & 6 deletions pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strings"
"time"

"github.com/DataDog/sketches-go/ddsketch"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
Expand Down Expand Up @@ -85,9 +86,13 @@ type HealthStats struct {
ChunkAvgSize int64
ChunkMaxSize int64

SeriesMinSize int64
SeriesAvgSize int64
SeriesMaxSize int64
SeriesMinSize int64
SeriesAvgSize int64
SeriesMaxSize int64
SeriesP9999Size int64
SeriesP999Size int64
SeriesP99Size int64
SeriesP90Size int64

SingleSampleSeries int64
SingleSampleChunks int64
Expand Down Expand Up @@ -209,6 +214,59 @@ func (n *minMaxSumInt64) Avg() int64 {
return n.sum / n.cnt
}

// sketch is a wrapper for DDSketch which allows to calculate quantile values with a relative accuracy.
type sketch struct {
	// cnt tracks the number of added values separately, so emptiness and
	// integer averages can be computed without consulting the sketch itself.
	cnt int64
	// s is the underlying DDSketch holding the value distribution.
	s *ddsketch.DDSketch
}

// newSketch returns a sketch backed by a DDSketch with 0.1 relative accuracy,
// which is sufficient since only integer precision is needed here.
func newSketch() *sketch {
	sk, err := ddsketch.NewDefaultDDSketch(0.1)
	if err != nil {
		// Unreachable: any accuracy strictly inside (0, 1) is valid.
		return &sketch{}
	}
	return &sketch{s: sk}
}

func (s *sketch) Add(v int64) {
s.cnt++
s.s.Add(float64(v))

Check failure on line 232 in pkg/block/index.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

Error return value of `s.s.Add` is not checked (errcheck)
}

// Avg returns the integer mean of all recorded values, or 0 for an empty sketch.
func (s *sketch) Avg() int64 {
	if s.cnt == 0 {
		return 0
	}
	// cnt > 0 guarantees the divisor is non-zero.
	sum := int64(s.s.GetSum())
	return sum / s.cnt
}

// Max returns the largest recorded value, or 0 for an empty sketch.
func (s *sketch) Max() int64 {
	if s.cnt == 0 {
		return 0
	}
	// GetMaxValue cannot fail on a non-empty sketch, so the error is ignored.
	maxVal, _ := s.s.GetMaxValue()
	return int64(maxVal)
}

// Min returns the smallest recorded value, or 0 for an empty sketch.
func (s *sketch) Min() int64 {
	if s.cnt == 0 {
		return 0
	}
	// GetMinValue cannot fail on a non-empty sketch, so the error is ignored.
	minVal, _ := s.s.GetMinValue()
	return int64(minVal)
}

// Quantile returns the recorded value at the given quantile, or 0 for an
// empty sketch. The quantile is expected to lie within [0, 1].
func (s *sketch) Quantile(quantile float64) int64 {
	if s.cnt == 0 {
		return 0
	}
	// GetValueAtQuantile cannot fail for a valid quantile on a non-empty
	// sketch, so the error is ignored.
	val, _ := s.s.GetValueAtQuantile(quantile)
	return int64(val)
}

// GatherIndexHealthStats returns useful counters as well as outsider chunks (chunks outside of block time range) that
// helps to assess index health.
// It considers https://github.com/prometheus/tsdb/issues/347 as something that Thanos can handle.
Expand Down Expand Up @@ -237,7 +295,7 @@ func GatherIndexHealthStats(ctx context.Context, logger log.Logger, fn string, m
seriesChunks = newMinMaxSumInt64()
chunkDuration = newMinMaxSumInt64()
chunkSize = newMinMaxSumInt64()
seriesSize = newMinMaxSumInt64()
seriesSize = newSketch()
)

lnames, err := r.LabelNames(ctx)
Expand Down Expand Up @@ -391,9 +449,13 @@ func GatherIndexHealthStats(ctx context.Context, logger log.Logger, fn string, m
stats.ChunkAvgSize = chunkSize.Avg()
stats.ChunkMinSize = chunkSize.min

stats.SeriesMaxSize = seriesSize.max
stats.SeriesMaxSize = seriesSize.Max()
stats.SeriesAvgSize = seriesSize.Avg()
stats.SeriesMinSize = seriesSize.min
stats.SeriesMinSize = seriesSize.Min()
stats.SeriesP90Size = seriesSize.Quantile(0.90)
stats.SeriesP99Size = seriesSize.Quantile(0.99)
stats.SeriesP999Size = seriesSize.Quantile(0.999)
stats.SeriesP9999Size = seriesSize.Quantile(0.9999)

stats.ChunkMaxDuration = time.Duration(chunkDuration.max) * time.Millisecond
stats.ChunkAvgDuration = time.Duration(chunkDuration.Avg()) * time.Millisecond
Expand Down
8 changes: 6 additions & 2 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,12 @@ type Thanos struct {
}

type IndexStats struct {
SeriesMaxSize int64 `json:"series_max_size,omitempty"`
ChunkMaxSize int64 `json:"chunk_max_size,omitempty"`
SeriesMaxSize int64 `json:"series_max_size,omitempty"`
SeriesP90Size int64 `json:"series_p90_size,omitempty"`
SeriesP99Size int64 `json:"series_p99_size,omitempty"`
SeriesP999Size int64 `json:"series_p999_size,omitempty"`
SeriesP9999Size int64 `json:"series_p99999_size,omitempty"`
ChunkMaxSize int64 `json:"chunk_max_size,omitempty"`
}

func (m *Thanos) ParseExtensions(v any) (any, error) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,6 +1303,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
}
if stats.SeriesMaxSize > 0 {
thanosMeta.IndexStats.SeriesMaxSize = stats.SeriesMaxSize
thanosMeta.IndexStats.SeriesP90Size = stats.SeriesP90Size
thanosMeta.IndexStats.SeriesP99Size = stats.SeriesP99Size
thanosMeta.IndexStats.SeriesP999Size = stats.SeriesP999Size
thanosMeta.IndexStats.SeriesP9999Size = stats.SeriesP9999Size
}
newMeta, err = metadata.InjectThanos(cg.logger, bdir, thanosMeta, nil)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,10 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
testutil.Assert(t, len(meta.Thanos.SegmentFiles) > 0, "compacted blocks have segment files set")
// Only one chunk will be generated in that block, so we won't set chunk size.
testutil.Assert(t, meta.Thanos.IndexStats.SeriesMaxSize > 0, "compacted blocks have index stats series max size set")
testutil.Assert(t, meta.Thanos.IndexStats.SeriesP90Size > 0, "compacted blocks have index stats series P90 size set")
testutil.Assert(t, meta.Thanos.IndexStats.SeriesP99Size > 0, "compacted blocks have index stats series P99 size set")
testutil.Assert(t, meta.Thanos.IndexStats.SeriesP999Size > 0, "compacted blocks have index stats series P999 size set")
testutil.Assert(t, meta.Thanos.IndexStats.SeriesP9999Size > 0, "compacted blocks have index stats series P9999 size set")
}
})
}
Expand Down
18 changes: 16 additions & 2 deletions pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,14 @@ func TestBucketStore_Acceptance(t *testing.T) {
Labels: labels.NewBuilder(extLset).Set("replica", replica).Labels().Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
IndexStats: metadata.IndexStats{
SeriesMaxSize: stats.SeriesMaxSize,
SeriesP90Size: stats.SeriesP90Size,
SeriesP99Size: stats.SeriesP99Size,
SeriesP999Size: stats.SeriesP999Size,
SeriesP9999Size: stats.SeriesP9999Size,
ChunkMaxSize: stats.ChunkMaxSize,
},
}, nil)
testutil.Ok(tt, err)

Expand Down Expand Up @@ -1115,7 +1122,14 @@ func TestProxyStoreWithTSDBSelector_Acceptance(t *testing.T) {
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
IndexStats: metadata.IndexStats{
SeriesMaxSize: stats.SeriesMaxSize,
SeriesP90Size: stats.SeriesP90Size,
SeriesP99Size: stats.SeriesP99Size,
SeriesP999Size: stats.SeriesP999Size,
SeriesP9999Size: stats.SeriesP9999Size,
ChunkMaxSize: stats.ChunkMaxSize,
},
}, nil)
testutil.Ok(tt, err)

Expand Down
33 changes: 33 additions & 0 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ const (
ChunksTouched
)

// BlockSeriesSizeStat names the statistic used to estimate a block's series size.
type BlockSeriesSizeStat string

const (
	// BlockSeriesSizeMax selects the maximum observed series size.
	BlockSeriesSizeMax BlockSeriesSizeStat = "max"
	// BlockSeriesSizeP90 selects the 90th percentile series size.
	BlockSeriesSizeP90 BlockSeriesSizeStat = "p90"
	// BlockSeriesSizeP99 selects the 99th percentile series size.
	BlockSeriesSizeP99 BlockSeriesSizeStat = "p99"
	// BlockSeriesSizeP999 selects the 99.9th percentile series size.
	BlockSeriesSizeP999 BlockSeriesSizeStat = "p999"
	// BlockSeriesSizeP9999 selects the 99.99th percentile series size.
	BlockSeriesSizeP9999 BlockSeriesSizeStat = "p9999"
)

const (
// MaxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed
// for precalculating the number of samples that we may have to retrieve and decode for any given query
Expand Down Expand Up @@ -423,8 +433,14 @@ type BucketStore struct {

sortingStrategy sortingStrategy

// blockEstimatedMaxSeriesFunc is a function which estimates max series size of a block for series fetch purpose.
// We want to use max series size as metric to avoid series re-fetch.
blockEstimatedMaxSeriesFunc BlockEstimator
blockEstimatedMaxChunkFunc BlockEstimator
// blockEstimatedSeriesSizeFunc is a function which estimates series size of a block based on configured strategy.
// It can be either max, P90, P99, P999, etc series size of the block. It can be used for the purpose of lazy posting
// series size estimation when there is no strict requirement to use max series size of the block.
blockEstimatedSeriesSizeFunc BlockEstimator

indexHeaderLazyDownloadStrategy indexheader.LazyDownloadIndexHeaderFunc

Expand Down Expand Up @@ -539,6 +555,12 @@ func WithBlockEstimatedMaxSeriesFunc(f BlockEstimator) BucketStoreOption {
}
}

// WithBlockEstimatedSeriesSizeFunc configures the estimator used to derive a
// block's typical series size (max or a percentile, per the configured
// statistic) for lazy expanded postings series size estimation.
func WithBlockEstimatedSeriesSizeFunc(f BlockEstimator) BucketStoreOption {
	return func(bs *BucketStore) {
		bs.blockEstimatedSeriesSizeFunc = f
	}
}

func WithBlockEstimatedMaxChunkFunc(f BlockEstimator) BucketStoreOption {
return func(s *BucketStore) {
s.blockEstimatedMaxChunkFunc = f
Expand Down Expand Up @@ -819,6 +841,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
indexHeaderReader,
s.partitioner,
s.blockEstimatedMaxSeriesFunc,
s.blockEstimatedSeriesSizeFunc,
s.blockEstimatedMaxChunkFunc,
)
if err != nil {
Expand Down Expand Up @@ -2359,6 +2382,10 @@ type bucketBlock struct {

estimatedMaxChunkSize int
estimatedMaxSeriesSize int

// estimatedSeriesSize is an estimated series size used in lazy postings. It can be a different metric to
// estimatedMaxSeriesSize as when fetching series we need to use series max size to avoid re-fetch.
estimatedSeriesSize int
}

func newBucketBlock(
Expand All @@ -2372,12 +2399,17 @@ func newBucketBlock(
indexHeadReader indexheader.Reader,
p Partitioner,
maxSeriesSizeFunc BlockEstimator,
seriesSizeFunc BlockEstimator,
maxChunkSizeFunc BlockEstimator,
) (b *bucketBlock, err error) {
maxSeriesSize := EstimatedMaxSeriesSize
if maxSeriesSizeFunc != nil {
maxSeriesSize = int(maxSeriesSizeFunc(*meta))
}
seriesSize := maxSeriesSize
if seriesSizeFunc != nil {
seriesSize = int(seriesSizeFunc(*meta))
}
maxChunkSize := EstimatedMaxChunkSize
if maxChunkSizeFunc != nil {
maxChunkSize = int(maxChunkSizeFunc(*meta))
Expand All @@ -2398,6 +2430,7 @@ func newBucketBlock(
extLset: extLset,
relabelLabels: relabelLabels,
estimatedMaxSeriesSize: maxSeriesSize,
estimatedSeriesSize: seriesSize,
estimatedMaxChunkSize: maxChunkSize,
}

Expand Down
Loading

0 comments on commit d56e0c7

Please sign in to comment.