diff --git a/.busybox-versions b/.busybox-versions index 3b5b9ee44f..8cbed5844e 100644 --- a/.busybox-versions +++ b/.busybox-versions @@ -1,7 +1,7 @@ # Auto generated by busybox-updater.sh. DO NOT EDIT -amd64=650cefc7292b429ac55e3f9cc49a55a4cdd58698474310bb280c5e57adebf40a -arm64=63e8d2c5aa02d37f81bc9bfd80b92bbc59b31a0099a990649b887f861264b3b5 -arm=810be036d4e95b015f26c01ba999a2e14fde69b5af0c264fe607971a35ba3787 -ppc64le=b698a18e2d756e00697a63ac749825ee8c3f8a50133b8998dbed092c474c1f46 -riscv64=02a69336c173007423c0be20cfd1c8a76b3fd6d4696a5b7c19022be9a631688b -s390x=72832339f6003c44a4cdf9d576efa3ff19d5ce55e27198b4089528bf0dd04788 +amd64=7926ace2c2ee4c8fd52ae5badcc409685bae02090f796e26ef7bb504245acaaa +arm64=2754fd36a6cd1753c82f7954477b9864a0d0d2f7825c9ad015f3f2ce58d82f14 +arm=a2f2a8253820f58e0a9721045b9e5f61df2aef79d679a638e2b901d4be6e27df +ppc64le=5e26d389ce9d04cf55740944dab6d7896766fd21a5eca05201e0aef4eab96852 +riscv64=d671b149d69a3497f12d802a2cb3bf490bc16d9f048583868176bd016f35da61 +s390x=50895280c397af8998357e1a522a4f82774d6abeddc7a95bafeb10cb46692bba diff --git a/CHANGELOG.md b/CHANGELOG.md index 91152dbb6b..486de8c6e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,9 +12,6 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed -- [#6692](https://github.com/thanos-io/thanos/pull/6692) Store: Fix matching bug when using empty alternative in regex matcher, for example (a||b). -- [#6679](https://github.com/thanos-io/thanos/pull/6697) Store: fix block deduplication - ### Added - [#6605](https://github.com/thanos-io/thanos/pull/6605) Query Frontend: Support vertical sharding binary expression with metric name when no matching labels specified. @@ -22,7 +19,6 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Changed -- [#6664](https://github.com/thanos-io/thanos/pull/6664) *: Update Prometheus to 2.46.1. - [#6698](https://github.com/thanos-io/thanos/pull/6608) Receive: Change write log level from warn to info. - [#6699](https://github.com/thanos-io/thanos/pull/6699) Receive: Use new file reloading logic for hashring configuration. @@ -30,6 +26,24 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6686](https://github.com/thanos-io/thanos/pull/6686) Remove deprecated `--log.request.decision` flag. We now use `--request.logging-config` to set logging decisions. +## [v0.32.3](https://github.com/thanos-io/thanos/tree/release-0.32) - 20.09.2023 + +### Fixed + +- [#6692](https://github.com/thanos-io/thanos/pull/6692) Store: Fix matching bug when using empty alternative in regex matcher, for example (a||b). +- [#6679](https://github.com/thanos-io/thanos/pull/6697) Store: Fix block deduplication +- [#6706](https://github.com/thanos-io/thanos/pull/6706) Store: Series responses should always be sorted + +### Added + +### Changed + +- [#6664](https://github.com/thanos-io/thanos/pull/6664) *: Update Prometheus to 2.46.1. +- [#6722](https://github.com/thanos-io/thanos/pull/6722) *: Optimize iterations on GCS buckets by requesting only object names. +- [#6544](https://github.com/thanos-io/thanos/pull/6500) Objstore: Update objstore to latest version which adds a new metric regarding uploaded TSDB bytes + +### Removed + ## [v0.32.2](https://github.com/thanos-io/thanos/tree/release-0.32) - 31.08.2023 ### Fixed @@ -58,6 +72,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Changed +- [#6664](https://github.com/thanos-io/thanos/pull/6664) *: Update Prometheus to 2.46.1. + ### Removed ## [v0.32.0](https://github.com/thanos-io/thanos/tree/release-0.32) - 23.08.2023 diff --git a/VERSION b/VERSION index d535f78d38..6eb78516dd 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.33.0-dev \ No newline at end of file +0.33.0-dev diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 1f6be9ad91..90a40bba6b 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -367,21 +367,6 @@ func runReceive( grpcserver.WithTLSConfig(tlsCfg), ) - ctx, cancel := context.WithCancel(context.Background()) - level.Debug(logger).Log("msg", "setting up periodic update for label names") - g.Add(func() error { - return runutil.Repeat(10*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "Starting label names update") - - dbs.UpdateLabelNames(ctx) - - level.Debug(logger).Log("msg", "Finished label names update") - return nil - }) - }, func(err error) { - cancel() - }) - g.Add( func() error { level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", conf.grpcConfig.bindAddress) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 2d519185a3..ceadf1159c 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -656,21 +656,6 @@ func runRule( ) storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore), reg, conf.storeRateLimits) options = append(options, grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger))) - - ctx, cancel := context.WithCancel(context.Background()) - level.Debug(logger).Log("msg", "setting up periodic update for label names") - g.Add(func() error { - return runutil.Repeat(10*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "Starting label names update") - - tsdbStore.UpdateLabelNames(ctx) - - level.Debug(logger).Log("msg", "Finished label names update") - return nil - }) - }, func(err error) { - cancel() - }) } options = append(options, grpcserver.WithServer( diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 968a09ee9c..27cf759b2a 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -48,7 +48,6 @@ import ( "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/targets" "github.com/thanos-io/thanos/pkg/tls" ) @@ -113,9 +112,8 @@ func runSidecar( mint: conf.limitMinTime.PrometheusTimestamp(), maxt: math.MaxInt64, - limitMinTime: conf.limitMinTime, - client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"), - labelNamesSet: stringset.AllStrings(), + limitMinTime: conf.limitMinTime, + client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"), } confContentYaml, err := conf.objStore.Content() @@ -239,19 +237,6 @@ func runSidecar( }, func(error) { cancel() }) - - g.Add(func() error { - return runutil.Repeat(10*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "Starting label names update") - - m.UpdateLabelNames(context.Background()) - - level.Debug(logger).Log("msg", "Finished label names update") - return nil - }) - }, func(err error) { - cancel() - }) } { ctx, cancel := context.WithCancel(context.Background()) @@ -264,7 +249,7 @@ func runSidecar( { c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent) - promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.LabelNamesSet, m.Version) + promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version) if err != nil { return errors.Wrap(err, "create Prometheus store") } @@ -434,8 +419,6 @@ type promMetadata struct { limitMinTime thanosmodel.TimeOrDurationValue client *promclient.Client - - labelNamesSet stringset.Set } func (s *promMetadata) UpdateLabels(ctx context.Context) error { @@ -463,30 +446,6 @@ func (s *promMetadata) UpdateTimestamps(mint, maxt int64) { s.maxt = maxt } -func (s *promMetadata) UpdateLabelNames(ctx context.Context) { - mint, _ := s.Timestamps() - labelNames, err := s.client.LabelNamesInGRPC(ctx, s.promURL, nil, mint, time.Now().UnixMilli()) - if err != nil { - s.mtx.Lock() - defer s.mtx.Unlock() - - s.labelNamesSet = stringset.AllStrings() - return - } - - filter := stringset.NewFromStrings(labelNames...) - s.mtx.Lock() - s.labelNamesSet = filter - s.mtx.Unlock() -} - -func (s *promMetadata) LabelNamesSet() stringset.Set { - s.mtx.Lock() - defer s.mtx.Unlock() - - return s.labelNamesSet -} - func (s *promMetadata) Labels() labels.Labels { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 29ac6921a2..73e4b838dd 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -501,23 +501,6 @@ func runStore( }) } - { - ctx, cancel := context.WithCancel(context.Background()) - level.Debug(logger).Log("msg", "setting up periodic update for label names") - g.Add(func() error { - return runutil.Repeat(10*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "Starting label names update") - - bs.UpdateLabelNames() - - level.Debug(logger).Log("msg", "Finished label names update") - return nil - }) - }, func(err error) { - cancel() - }) - - } // Add bucket UI for loaded blocks. { ins := extpromhttp.NewInstrumentationMiddleware(reg, nil) diff --git a/docs/components/query.md b/docs/components/query.md index 87733ff02d..237eb83794 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -103,16 +103,6 @@ thanos query \ This logic can also be controlled via parameter on QueryAPI. More details below. -### Deduplication on non-external labels - -In `v0.31.0` we have implemented an [optimization](../proposals-accepted/20221129-avoid-global-sort.md) which broke deduplication on non-external labels. We think that it was just a coincidence that deduplication worked at all on non-external labels in previous versions. - -External labels always override any labels a series might have and this makes it so that it is possible to remove replica labels on series returned by a StoreAPI as an optimization. If deduplication happens on internal labels then that might lead to unsorted series from a StoreAPI and that breaks deduplication. - -To fix this use-case, in 0.32.0 we've implemented a cuckoo filter on label names that is updated every 10 seconds. Using it we can detect whether deduplication was requested on internal labels. If that is the case then the series set is resorted before being sent off to the querier. It is strongly recommended to set replica labels which are external labels because otherwise the optimization cannot be applied and your queries will be slower by 20-30%. - -In the future we have plans to expose this cuckoo filter through the InfoAPI. This will allow better scoping queries to StoreAPIs. - ## Experimental PromQL Engine By default, Thanos querier comes with standard Prometheus PromQL engine. However, when `--query.promql-engine=thanos` is specified, Thanos will use [experimental Thanos PromQL engine](http://github.com/thanos-community/promql-engine) which is a drop-in, efficient implementation of PromQL engine with query planner and optimizers. diff --git a/go.mod b/go.mod index be47d64ae0..404442782e 100644 --- a/go.mod +++ b/go.mod @@ -65,7 +65,7 @@ require ( github.com/prometheus/prometheus v0.46.1-0.20230818184859-4d8e380269da github.com/sony/gobreaker v0.5.0 github.com/stretchr/testify v1.8.4 - github.com/thanos-io/objstore v0.0.0-20230908084555-8d397d4d88e7 + github.com/thanos-io/objstore v0.0.0-20230913122821-eb06103887ab github.com/thanos-io/promql-engine v0.0.0-20230821193351-e1ae4275b96e github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/uber/jaeger-lib v2.4.1+incompatible // indirect @@ -118,14 +118,12 @@ require ( require ( github.com/onsi/gomega v1.27.10 - github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb go.opentelemetry.io/contrib/propagators/autoprop v0.38.0 go4.org/intern v0.0.0-20230525184215-6c62f75575cb golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 ) require ( - github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect github.com/golang-jwt/jwt/v5 v5.0.0 // indirect github.com/google/s2a-go v0.1.4 // indirect github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible // indirect @@ -246,7 +244,7 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.12.0 // indirect golang.org/x/oauth2 v0.11.0 // indirect - golang.org/x/sys v0.11.0 // indirect + golang.org/x/sys v0.12.0 // indirect golang.org/x/tools v0.11.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gonum.org/v1/gonum v0.12.0 // indirect diff --git a/go.sum b/go.sum index 2044edb79a..2f61a3076b 100644 --- a/go.sum +++ b/go.sum @@ -223,8 +223,6 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8Yc github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE= github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= -github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E= -github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= github.com/digitalocean/godo v1.99.0 h1:gUHO7n9bDaZFWvbzOum4bXE0/09ZuYA9yA8idQHX57E= github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8= @@ -851,8 +849,6 @@ github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHi github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.20 h1:a9hSJdJcd16e0HoMsnFvaHvxB3pxSD+SC7+CISp7xY0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= -github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb h1:XfLJSPIOUX+osiMraVgIrMR27uMXnRJWGm1+GL8/63U= -github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg= github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8= github.com/sercand/kuberesolver v2.4.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ= github.com/shirou/gopsutil/v3 v3.21.2/go.mod h1:ghfMypLDrFSWN2c9cDYFLHyynQ+QUht0cv/18ZqVczw= @@ -906,8 +902,8 @@ github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4 github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw= github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1Zsv7OAU9iQhZwigp50Yl38W10g/vd5NC8Rdk1Jzng= github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM= -github.com/thanos-io/objstore v0.0.0-20230908084555-8d397d4d88e7 h1:P1mukL6u3wKv4gRLjhnEYltZf8k5dXkE7y7UvEJo0fU= -github.com/thanos-io/objstore v0.0.0-20230908084555-8d397d4d88e7/go.mod h1:oJ82xgcBDzGJrEgUsjlTj6n01+ZWUMMUR8BlZzX5xDE= +github.com/thanos-io/objstore v0.0.0-20230913122821-eb06103887ab h1:IfcvGL/erj7I/P5Ugoae4lUFJQ/O71ELPdFtDSNtcCQ= +github.com/thanos-io/objstore v0.0.0-20230913122821-eb06103887ab/go.mod h1:oJ82xgcBDzGJrEgUsjlTj6n01+ZWUMMUR8BlZzX5xDE= github.com/thanos-io/promql-engine v0.0.0-20230821193351-e1ae4275b96e h1:kwsFCU8eSkZehbrAN3nXPw5RdMHi/Bok/y8l2C4M+gk= github.com/thanos-io/promql-engine v0.0.0-20230821193351-e1ae4275b96e/go.mod h1:+T/ZYNCGybT6eTsGGvVtGb63nT1cvUmH6MjqRrcQoKw= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= @@ -1291,8 +1287,8 @@ golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 4dadb97343..509e9f3535 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -867,19 +867,6 @@ func (t *MultiTSDB) extractTenantsLabels(tenantID string, initialLset labels.Lab return initialLset, nil } -func (t *MultiTSDB) UpdateLabelNames(ctx context.Context) { - t.mtx.RLock() - defer t.mtx.RUnlock() - - for _, tenant := range t.tenants { - db := tenant.storeTSDB - if db == nil { - continue - } - db.UpdateLabelNames(ctx) - } -} - // extendLabels extends external labels of the initial label set. // If an external label shares same name with a label in the initial label set, // use the label in the initial label set and inform user about it. diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index f6a5ef55ec..c22c27bf3c 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -28,7 +28,6 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -238,9 +237,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "n", Value: "1"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -251,7 +250,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "i", Value: "a"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -270,9 +269,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "missing", Value: ""}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), labels.FromStrings("n", "2", "region", "eu-west"), labels.FromStrings("n", "2.5", "region", "eu-west"), }, @@ -295,8 +294,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: ".+"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -306,9 +305,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: ".*"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), labels.FromStrings("n", "2", "region", "eu-west"), labels.FromStrings("n", "2.5", "region", "eu-west"), }, @@ -332,8 +331,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NEQ, Name: "i", Value: ""}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -352,8 +351,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NEQ, Name: "i", Value: "a"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -363,9 +362,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "n", Value: "^1$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -376,7 +375,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^a$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -387,8 +386,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^a?$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), }, }, { @@ -422,9 +421,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^.*$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -435,8 +434,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^.+$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -489,8 +488,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^a$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -501,7 +500,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^a?$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -512,8 +511,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -545,7 +544,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "i", Value: "a"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -557,7 +556,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^(b|a).*$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -567,9 +566,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "n", Value: "(1|2)"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), labels.FromStrings("n", "2", "region", "eu-west"), }, }, @@ -580,8 +579,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "a|b"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -591,8 +590,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "(a|b)"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -706,12 +705,15 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset } testutil.Ok(t, err) + testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) bool { + return labels.Compare(x.PromLabels(), y.PromLabels()) < 0 + })) + receivedLabels := make([]labels.Labels, 0) for _, s := range srv.SeriesSet { receivedLabels = append(receivedLabels, s.PromLabels()) } - slices.SortFunc(c.expectedLabels, func(a, b labels.Labels) bool { return labels.Compare(a, b) < 0 }) - slices.SortFunc(receivedLabels, func(a, b labels.Labels) bool { return labels.Compare(a, b) < 0 }) + testutil.Equals(t, c.expectedLabels, receivedLabels) }) } @@ -824,7 +826,6 @@ func TestPrometheusStore_Acceptance(t *testing.T) { promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return extLset }, func() (int64, int64) { return timestamp.FromTime(minTime), timestamp.FromTime(maxTime) }, - func() stringset.Set { return stringset.AllStrings() }, func() string { return version }) testutil.Ok(tt, err) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 4a8eae4572..5a6f31c42d 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -59,7 +59,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/strutil" "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" @@ -148,9 +147,15 @@ type bucketStoreMetrics struct { cachedPostingsOriginalSizeBytes prometheus.Counter cachedPostingsCompressedSizeBytes prometheus.Counter - seriesFetchDuration prometheus.Histogram - postingsFetchDuration prometheus.Histogram - chunkFetchDuration prometheus.Histogram + seriesFetchDuration prometheus.Histogram + // Counts time for fetching series across all batches. + seriesFetchDurationSum prometheus.Histogram + postingsFetchDuration prometheus.Histogram + // chunkFetchDuration counts total time loading chunks, but since we spawn + // multiple goroutines the actual latency is usually much lower than it. + chunkFetchDuration prometheus.Histogram + // Actual absolute total time for loading chunks. + chunkFetchDurationSum prometheus.Histogram } func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { @@ -289,6 +294,12 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, }) + m.seriesFetchDurationSum = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "thanos_bucket_store_series_fetch_duration_sum_seconds", + Help: "The total time it takes to fetch series to respond to a request sent to a store gateway across all series batches. It includes both the time to fetch it from the cache and from storage in case of cache misses.", + Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, + }) + m.postingsFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "thanos_bucket_store_postings_fetch_duration_seconds", Help: "The time it takes to fetch postings to respond to a request sent to a store gateway. It includes both the time to fetch it from the cache and from storage in case of cache misses.", @@ -297,7 +308,13 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { m.chunkFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "thanos_bucket_store_chunks_fetch_duration_seconds", - Help: "The total time spent fetching chunks within a single request a store gateway.", + Help: "The total time spent fetching chunks within a single request for one block.", + Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, + }) + + m.chunkFetchDurationSum = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "thanos_bucket_store_chunks_fetch_duration_sum_seconds", + Help: "The total absolute time spent fetching chunks within a single request for one block.", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, }) @@ -387,9 +404,6 @@ type BucketStore struct { enabledLazyExpandedPostings bool - bmtx sync.Mutex - labelNamesSet stringset.Set - blockEstimatedMaxSeriesFunc BlockEstimator blockEstimatedMaxChunkFunc BlockEstimator } @@ -543,7 +557,6 @@ func NewBucketStore( enableSeriesResponseHints: enableSeriesResponseHints, enableChunkHashCalculation: enableChunkHashCalculation, seriesBatchSize: SeriesBatchSize, - labelNamesSet: stringset.AllStrings(), } for _, option := range options { @@ -931,11 +944,13 @@ type blockSeriesClient struct { lazyExpandedPostingSizeBytes prometheus.Counter lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter - skipChunks bool - shardMatcher *storepb.ShardMatcher - blockMatchers []*labels.Matcher - calculateChunkHash bool - chunkFetchDuration prometheus.Histogram + skipChunks bool + shardMatcher *storepb.ShardMatcher + blockMatchers []*labels.Matcher + calculateChunkHash bool + seriesFetchDurationSum prometheus.Histogram + chunkFetchDuration prometheus.Histogram + chunkFetchDurationSum prometheus.Histogram // Internal state. i uint64 @@ -960,7 +975,9 @@ func newBlockSeriesClient( shardMatcher *storepb.ShardMatcher, calculateChunkHash bool, batchSize int, + seriesFetchDurationSum prometheus.Histogram, chunkFetchDuration prometheus.Histogram, + chunkFetchDurationSum prometheus.Histogram, extLsetToRemove map[string]struct{}, lazyExpandedPostingEnabled bool, lazyExpandedPostingsCount prometheus.Counter, @@ -983,14 +1000,16 @@ func newBlockSeriesClient( extLset: extLset, extLsetToRemove: extLsetToRemove, - mint: req.MinTime, - maxt: req.MaxTime, - indexr: b.indexReader(), - chunkr: chunkr, - chunksLimiter: limiter, - bytesLimiter: bytesLimiter, - skipChunks: req.SkipChunks, - chunkFetchDuration: chunkFetchDuration, + mint: req.MinTime, + maxt: req.MaxTime, + indexr: b.indexReader(), + chunkr: chunkr, + chunksLimiter: limiter, + bytesLimiter: bytesLimiter, + skipChunks: req.SkipChunks, + seriesFetchDurationSum: seriesFetchDurationSum, + chunkFetchDuration: chunkFetchDuration, + chunkFetchDurationSum: chunkFetchDurationSum, lazyExpandedPostingEnabled: lazyExpandedPostingEnabled, lazyExpandedPostingsCount: lazyExpandedPostingsCount, @@ -1079,8 +1098,10 @@ func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { } if len(b.entries) == 0 { + b.seriesFetchDurationSum.Observe(b.indexr.stats.SeriesDownloadLatencySum.Seconds()) if b.chunkr != nil { b.chunkFetchDuration.Observe(b.chunkr.stats.ChunksFetchDurationSum.Seconds()) + b.chunkFetchDurationSum.Observe(b.chunkr.stats.ChunksDownloadLatencySum.Seconds()) } return nil, io.EOF } @@ -1334,7 +1355,8 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill // Series implements the storepb.StoreServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) { - srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), req.WithoutReplicaLabels) + srv := newFlushableServer(seriesSrv, sortingStrategyNone) + if s.queryGate != nil { tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { err = s.queryGate.Start(srv.Context()) @@ -1430,7 +1452,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store shardMatcher, s.enableChunkHashCalculation, s.seriesBatchSize, + s.metrics.seriesFetchDurationSum, s.metrics.chunkFetchDuration, + s.metrics.chunkFetchDurationSum, extLsetToRemove, s.enabledLazyExpandedPostings, s.metrics.lazyExpandedPostingsCount, @@ -1464,44 +1488,19 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store return errors.Wrapf(err, "fetch postings for block %s", blk.meta.ULID) } - // If we have inner replica labels we need to resort. - s.mtx.Lock() - needsEagerRetrival := len(req.WithoutReplicaLabels) > 0 && s.labelNamesSet.HasAny(req.WithoutReplicaLabels) - s.mtx.Unlock() - - var resp respSet - if needsEagerRetrival { - labelsToRemove := make(map[string]struct{}) - for _, replicaLabel := range req.WithoutReplicaLabels { - labelsToRemove[replicaLabel] = struct{}{} - } - resp = newEagerRespSet( - srv.Context(), - span, - 10*time.Minute, - blk.meta.ULID.String(), - []labels.Labels{blk.extLset}, - onClose, - blockClient, - shardMatcher, - false, - s.metrics.emptyPostingCount, - labelsToRemove, - ) - } else { - resp = newLazyRespSet( - srv.Context(), - span, - 10*time.Minute, - blk.meta.ULID.String(), - []labels.Labels{blk.extLset}, - onClose, - blockClient, - shardMatcher, - false, - s.metrics.emptyPostingCount, - ) - } + resp := newEagerRespSet( + srv.Context(), + span, + 10*time.Minute, + blk.meta.ULID.String(), + []labels.Labels{blk.extLset}, + onClose, + blockClient, + shardMatcher, + false, + s.metrics.emptyPostingCount, + nil, + ) mtx.Lock() respSets = append(respSets, resp) @@ -1736,7 +1735,9 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq nil, true, SeriesBatchSize, - s.metrics.chunkFetchDuration, + s.metrics.seriesFetchDurationSum, + nil, + nil, nil, s.enabledLazyExpandedPostings, s.metrics.lazyExpandedPostingsCount, @@ -1814,38 +1815,6 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq }, nil } -func (s *BucketStore) UpdateLabelNames() { - s.mtx.RLock() - defer s.mtx.RUnlock() - - newSet := stringset.New() - for _, b := range s.blocks { - labelNames, err := b.indexHeaderReader.LabelNames() - if err != nil { - level.Warn(s.logger).Log("msg", "error getting label names", "block", b.meta.ULID, "err", err.Error()) - s.updateLabelNamesSet(stringset.AllStrings()) - return - } - for _, l := range labelNames { - newSet.Insert(l) - } - } - s.updateLabelNamesSet(newSet) -} - -func (s *BucketStore) updateLabelNamesSet(newSet stringset.Set) { - s.bmtx.Lock() - s.labelNamesSet = newSet - s.bmtx.Unlock() -} - -func (b *BucketStore) LabelNamesSet() stringset.Set { - b.bmtx.Lock() - defer b.bmtx.Unlock() - - return b.labelNamesSet -} - func (b *bucketBlock) FilterExtLabelsMatchers(matchers []*labels.Matcher) ([]*labels.Matcher, bool) { // We filter external labels from matchers so we won't try to match series on them. var result []*labels.Matcher @@ -1969,7 +1938,9 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR nil, true, SeriesBatchSize, - s.metrics.chunkFetchDuration, + s.metrics.seriesFetchDurationSum, + nil, + nil, nil, s.enabledLazyExpandedPostings, s.metrics.lazyExpandedPostingsCount, @@ -3073,7 +3044,10 @@ func (it *bigEndianPostings) length() int { func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter) error { timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration) - defer timer.ObserveDuration() + defer func() { + d := timer.ObserveDuration() + r.stats.SeriesDownloadLatencySum += d + }() // Load series from cache, overwriting the list of ids to preload // with the missing ones. @@ -3391,7 +3365,10 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [ r.loadingChunks = true r.loadingChunksMtx.Unlock() + begin := time.Now() defer func() { + r.stats.ChunksDownloadLatencySum += time.Since(begin) + r.loadingChunksMtx.Lock() r.loadingChunks = false r.loadingChunksMtx.Unlock() @@ -3620,19 +3597,21 @@ type queryStats struct { cachedPostingsDecompressionErrors int CachedPostingsDecompressionTimeSum time.Duration - seriesTouched int - SeriesTouchedSizeSum units.Base2Bytes - seriesFetched int - SeriesFetchedSizeSum units.Base2Bytes - seriesFetchCount int - SeriesFetchDurationSum time.Duration - - chunksTouched int - ChunksTouchedSizeSum units.Base2Bytes - chunksFetched int - ChunksFetchedSizeSum units.Base2Bytes - chunksFetchCount int - ChunksFetchDurationSum time.Duration + seriesTouched int + SeriesTouchedSizeSum units.Base2Bytes + seriesFetched int + SeriesFetchedSizeSum units.Base2Bytes + seriesFetchCount int + SeriesFetchDurationSum time.Duration + SeriesDownloadLatencySum time.Duration + + chunksTouched int + ChunksTouchedSizeSum units.Base2Bytes + chunksFetched int + ChunksFetchedSizeSum units.Base2Bytes + chunksFetchCount int + ChunksFetchDurationSum time.Duration + ChunksDownloadLatencySum time.Duration GetAllDuration time.Duration mergedSeriesCount int @@ -3668,6 +3647,7 @@ func (s queryStats) merge(o *queryStats) *queryStats { s.SeriesFetchedSizeSum += o.SeriesFetchedSizeSum s.seriesFetchCount += o.seriesFetchCount s.SeriesFetchDurationSum += o.SeriesFetchDurationSum + s.SeriesDownloadLatencySum += o.SeriesDownloadLatencySum s.chunksTouched += o.chunksTouched s.ChunksTouchedSizeSum += o.ChunksTouchedSizeSum @@ -3675,6 +3655,7 @@ func (s queryStats) merge(o *queryStats) *queryStats { s.ChunksFetchedSizeSum += o.ChunksFetchedSizeSum s.chunksFetchCount += o.chunksFetchCount s.ChunksFetchDurationSum += o.ChunksFetchDurationSum + s.ChunksDownloadLatencySum += o.ChunksDownloadLatencySum s.GetAllDuration += o.GetAllDuration s.mergedSeriesCount += o.mergedSeriesCount @@ -3708,6 +3689,8 @@ func (s queryStats) toHints() *hintspb.QueryStats { MergedSeriesCount: int64(s.mergedSeriesCount), MergedChunksCount: int64(s.mergedChunksCount), DataDownloadedSizeSum: int64(s.DataDownloadedSizeSum), + GetAllDuration: s.GetAllDuration, + MergeDuration: s.MergeDuration, } } diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 7262dee155..ebd1ffa709 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -779,30 +779,6 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) { }) } -func TestBucketStore_LabelNamesSet_e2e(t *testing.T) { - objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { - dir := t.TempDir() - - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) - s.cache.SwapWith(noopCache{}) - - mint, maxt := s.store.TimeRange() - testutil.Equals(t, s.minTime, mint) - testutil.Equals(t, s.maxTime, maxt) - - s.store.UpdateLabelNames() - for _, b := range s.store.blocks { - waitTimeout(t, &b.pendingReaders, 5*time.Second) - } - - filter := s.store.LabelNamesSet() - for _, n := range []string{"a", "b", "c"} { - testutil.Assert(t, filter.Has(n), "expected filter to have %s", n) - } - testutil.Equals(t, 3, filter.Count()) - }) -} - func TestBucketStore_LabelNames_SeriesLimiter_e2e(t *testing.T) { cases := map[string]struct { maxSeriesLimit uint64 diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 82d42dd9d7..3df9359bf4 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/cespare/xxhash" + "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" @@ -47,8 +48,6 @@ import ( "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/providers/filesystem" - "github.com/efficientgo/core/testutil" - "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/indexheader" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -61,7 +60,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -1661,7 +1659,6 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { chunksLimiterFactory: NewChunksLimiterFactory(0), seriesLimiterFactory: NewSeriesLimiterFactory(0), bytesLimiterFactory: NewBytesLimiterFactory(0), - labelNamesSet: stringset.AllStrings(), } t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) { @@ -2769,6 +2766,8 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet false, SeriesBatchSize, dummyHistogram, + dummyHistogram, + dummyHistogram, nil, false, dummyCounter, @@ -3471,9 +3470,6 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) { testutil.Ok(t, bucketStore.SyncBlocks(context.Background())) - // make sure to have updated inner label names - bucketStore.UpdateLabelNames() - srv := newStoreSeriesServer(context.Background()) testutil.Ok(t, bucketStore.Series(&storepb.SeriesRequest{ WithoutReplicaLabels: []string{"replica"}, @@ -3484,5 +3480,128 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) { }, }, srv)) + testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) bool { + return labels.Compare(x.PromLabels(), y.PromLabels()) < 0 + })) testutil.Equals(t, 2, len(srv.SeriesSet)) } + +func TestQueryStatsMerge(t *testing.T) { + s := &queryStats{ + blocksQueried: 1, + postingsTouched: 1, + PostingsTouchedSizeSum: 1, + postingsToFetch: 1, + postingsFetched: 1, + PostingsFetchedSizeSum: 1, + postingsFetchCount: 1, + PostingsFetchDurationSum: 1, + cachedPostingsCompressions: 1, + cachedPostingsCompressionErrors: 1, + CachedPostingsOriginalSizeSum: 1, + CachedPostingsCompressedSizeSum: 1, + CachedPostingsCompressionTimeSum: 1, + cachedPostingsDecompressions: 1, + cachedPostingsDecompressionErrors: 1, + CachedPostingsDecompressionTimeSum: 1, + seriesTouched: 1, + SeriesTouchedSizeSum: 1, + seriesFetched: 1, + SeriesFetchedSizeSum: 1, + seriesFetchCount: 1, + SeriesFetchDurationSum: 1, + SeriesDownloadLatencySum: 1, + chunksTouched: 1, + ChunksTouchedSizeSum: 1, + chunksFetched: 1, + ChunksFetchedSizeSum: 1, + chunksFetchCount: 1, + ChunksFetchDurationSum: 1, + ChunksDownloadLatencySum: 1, + GetAllDuration: 1, + mergedSeriesCount: 1, + mergedChunksCount: 1, + MergeDuration: 1, + DataDownloadedSizeSum: 1, + } + + o := &queryStats{ + blocksQueried: 100, + postingsTouched: 100, + PostingsTouchedSizeSum: 100, + postingsToFetch: 100, + postingsFetched: 100, + PostingsFetchedSizeSum: 100, + postingsFetchCount: 100, + PostingsFetchDurationSum: 100, + cachedPostingsCompressions: 100, + cachedPostingsCompressionErrors: 100, + CachedPostingsOriginalSizeSum: 100, + CachedPostingsCompressedSizeSum: 100, + CachedPostingsCompressionTimeSum: 100, + cachedPostingsDecompressions: 100, + cachedPostingsDecompressionErrors: 100, + CachedPostingsDecompressionTimeSum: 100, + seriesTouched: 100, + SeriesTouchedSizeSum: 100, + seriesFetched: 100, + SeriesFetchedSizeSum: 100, + seriesFetchCount: 100, + SeriesFetchDurationSum: 100, + SeriesDownloadLatencySum: 100, + chunksTouched: 100, + ChunksTouchedSizeSum: 100, + chunksFetched: 100, + ChunksFetchedSizeSum: 100, + chunksFetchCount: 100, + ChunksFetchDurationSum: 100, + ChunksDownloadLatencySum: 100, + GetAllDuration: 100, + mergedSeriesCount: 100, + mergedChunksCount: 100, + MergeDuration: 100, + DataDownloadedSizeSum: 100, + } + + // Expected stats. + e := &queryStats{ + blocksQueried: 101, + postingsTouched: 101, + PostingsTouchedSizeSum: 101, + postingsToFetch: 101, + postingsFetched: 101, + PostingsFetchedSizeSum: 101, + postingsFetchCount: 101, + PostingsFetchDurationSum: 101, + cachedPostingsCompressions: 101, + cachedPostingsCompressionErrors: 101, + CachedPostingsOriginalSizeSum: 101, + CachedPostingsCompressedSizeSum: 101, + CachedPostingsCompressionTimeSum: 101, + cachedPostingsDecompressions: 101, + cachedPostingsDecompressionErrors: 101, + CachedPostingsDecompressionTimeSum: 101, + seriesTouched: 101, + SeriesTouchedSizeSum: 101, + seriesFetched: 101, + SeriesFetchedSizeSum: 101, + seriesFetchCount: 101, + SeriesFetchDurationSum: 101, + SeriesDownloadLatencySum: 101, + chunksTouched: 101, + ChunksTouchedSizeSum: 101, + chunksFetched: 101, + ChunksFetchedSizeSum: 101, + chunksFetchCount: 101, + ChunksFetchDurationSum: 101, + ChunksDownloadLatencySum: 101, + GetAllDuration: 101, + mergedSeriesCount: 101, + mergedChunksCount: 101, + MergeDuration: 101, + DataDownloadedSizeSum: 101, + } + + output := s.merge(o) + testutil.Equals(t, e, output) +} diff --git a/pkg/store/flushable.go b/pkg/store/flushable.go index c41b67d152..e6cadfbea9 100644 --- a/pkg/store/flushable.go +++ b/pkg/store/flushable.go @@ -9,24 +9,35 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" +) + +type sortingStrategy uint64 + +const ( + sortingStrategyStore sortingStrategy = iota + 1 + sortingStrategyNone ) // flushableServer is an extension of storepb.Store_SeriesServer with a Flush method. type flushableServer interface { storepb.Store_SeriesServer + Flush() error } func newFlushableServer( upstream storepb.Store_SeriesServer, - labelNames stringset.Set, - replicaLabels []string, + sortingsortingStrategy sortingStrategy, ) flushableServer { - if labelNames.HasAny(replicaLabels) { + switch sortingsortingStrategy { + case sortingStrategyStore: return &resortingServer{Store_SeriesServer: upstream} + case sortingStrategyNone: + return &passthroughServer{Store_SeriesServer: upstream} + default: + // should not happen. + panic("unexpected sorting strategy") } - return &passthroughServer{Store_SeriesServer: upstream} } // passthroughServer is a flushableServer that forwards all data to diff --git a/pkg/store/hintspb/custom.go b/pkg/store/hintspb/custom.go index 9d7da86e94..bf82d245e2 100644 --- a/pkg/store/hintspb/custom.go +++ b/pkg/store/hintspb/custom.go @@ -47,4 +47,7 @@ func (m *QueryStats) Merge(other *QueryStats) { m.ChunksFetchedSizeSum += other.ChunksFetchedSizeSum m.ChunksTouched += other.ChunksTouched m.ChunksTouchedSizeSum += other.ChunksTouchedSizeSum + + m.GetAllDuration += other.GetAllDuration + m.MergeDuration += other.MergeDuration } diff --git a/pkg/store/hintspb/hints.pb.go b/pkg/store/hintspb/hints.pb.go index 457793cf6a..2098d7489c 100644 --- a/pkg/store/hintspb/hints.pb.go +++ b/pkg/store/hintspb/hints.pb.go @@ -5,12 +5,16 @@ package hintspb import ( fmt "fmt" + + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" + io "io" math "math" math_bits "math/bits" + time "time" - _ "github.com/gogo/protobuf/gogoproto" - proto "github.com/gogo/protobuf/proto" storepb "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -18,6 +22,7 @@ import ( var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf +var _ = time.Kitchen // This is a compile-time assertion to ensure that this generated file // is compatible with the proto package it is being compiled against. @@ -301,26 +306,28 @@ var xxx_messageInfo_LabelValuesResponseHints proto.InternalMessageInfo // / QueryStats fields are unstable and might change in the future. type QueryStats struct { - BlocksQueried int64 `protobuf:"varint,1,opt,name=blocks_queried,json=blocksQueried,proto3" json:"blocks_queried,omitempty"` - MergedSeriesCount int64 `protobuf:"varint,2,opt,name=merged_series_count,json=mergedSeriesCount,proto3" json:"merged_series_count,omitempty"` - MergedChunksCount int64 `protobuf:"varint,3,opt,name=merged_chunks_count,json=mergedChunksCount,proto3" json:"merged_chunks_count,omitempty"` - PostingsTouched int64 `protobuf:"varint,4,opt,name=postings_touched,json=postingsTouched,proto3" json:"postings_touched,omitempty"` - PostingsTouchedSizeSum int64 `protobuf:"varint,5,opt,name=postings_touched_size_sum,json=postingsTouchedSizeSum,proto3" json:"postings_touched_size_sum,omitempty"` - PostingsToFetch int64 `protobuf:"varint,6,opt,name=postings_to_fetch,json=postingsToFetch,proto3" json:"postings_to_fetch,omitempty"` - PostingsFetched int64 `protobuf:"varint,7,opt,name=postings_fetched,json=postingsFetched,proto3" json:"postings_fetched,omitempty"` - PostingsFetchedSizeSum int64 `protobuf:"varint,8,opt,name=postings_fetched_size_sum,json=postingsFetchedSizeSum,proto3" json:"postings_fetched_size_sum,omitempty"` - PostingsFetchCount int64 `protobuf:"varint,9,opt,name=postings_fetch_count,json=postingsFetchCount,proto3" json:"postings_fetch_count,omitempty"` - SeriesTouched int64 `protobuf:"varint,10,opt,name=series_touched,json=seriesTouched,proto3" json:"series_touched,omitempty"` - SeriesTouchedSizeSum int64 `protobuf:"varint,11,opt,name=series_touched_size_sum,json=seriesTouchedSizeSum,proto3" json:"series_touched_size_sum,omitempty"` - SeriesFetched int64 `protobuf:"varint,12,opt,name=series_fetched,json=seriesFetched,proto3" json:"series_fetched,omitempty"` - SeriesFetchedSizeSum int64 `protobuf:"varint,13,opt,name=series_fetched_size_sum,json=seriesFetchedSizeSum,proto3" json:"series_fetched_size_sum,omitempty"` - SeriesFetchCount int64 `protobuf:"varint,14,opt,name=series_fetch_count,json=seriesFetchCount,proto3" json:"series_fetch_count,omitempty"` - ChunksTouched int64 `protobuf:"varint,15,opt,name=chunks_touched,json=chunksTouched,proto3" json:"chunks_touched,omitempty"` - ChunksTouchedSizeSum int64 `protobuf:"varint,16,opt,name=chunks_touched_size_sum,json=chunksTouchedSizeSum,proto3" json:"chunks_touched_size_sum,omitempty"` - ChunksFetched int64 `protobuf:"varint,17,opt,name=chunks_fetched,json=chunksFetched,proto3" json:"chunks_fetched,omitempty"` - ChunksFetchedSizeSum int64 `protobuf:"varint,18,opt,name=chunks_fetched_size_sum,json=chunksFetchedSizeSum,proto3" json:"chunks_fetched_size_sum,omitempty"` - ChunksFetchCount int64 `protobuf:"varint,19,opt,name=chunks_fetch_count,json=chunksFetchCount,proto3" json:"chunks_fetch_count,omitempty"` - DataDownloadedSizeSum int64 `protobuf:"varint,20,opt,name=data_downloaded_size_sum,json=dataDownloadedSizeSum,proto3" json:"data_downloaded_size_sum,omitempty"` + BlocksQueried int64 `protobuf:"varint,1,opt,name=blocks_queried,json=blocksQueried,proto3" json:"blocks_queried,omitempty"` + MergedSeriesCount int64 `protobuf:"varint,2,opt,name=merged_series_count,json=mergedSeriesCount,proto3" json:"merged_series_count,omitempty"` + MergedChunksCount int64 `protobuf:"varint,3,opt,name=merged_chunks_count,json=mergedChunksCount,proto3" json:"merged_chunks_count,omitempty"` + PostingsTouched int64 `protobuf:"varint,4,opt,name=postings_touched,json=postingsTouched,proto3" json:"postings_touched,omitempty"` + PostingsTouchedSizeSum int64 `protobuf:"varint,5,opt,name=postings_touched_size_sum,json=postingsTouchedSizeSum,proto3" json:"postings_touched_size_sum,omitempty"` + PostingsToFetch int64 `protobuf:"varint,6,opt,name=postings_to_fetch,json=postingsToFetch,proto3" json:"postings_to_fetch,omitempty"` + PostingsFetched int64 `protobuf:"varint,7,opt,name=postings_fetched,json=postingsFetched,proto3" json:"postings_fetched,omitempty"` + PostingsFetchedSizeSum int64 `protobuf:"varint,8,opt,name=postings_fetched_size_sum,json=postingsFetchedSizeSum,proto3" json:"postings_fetched_size_sum,omitempty"` + PostingsFetchCount int64 `protobuf:"varint,9,opt,name=postings_fetch_count,json=postingsFetchCount,proto3" json:"postings_fetch_count,omitempty"` + SeriesTouched int64 `protobuf:"varint,10,opt,name=series_touched,json=seriesTouched,proto3" json:"series_touched,omitempty"` + SeriesTouchedSizeSum int64 `protobuf:"varint,11,opt,name=series_touched_size_sum,json=seriesTouchedSizeSum,proto3" json:"series_touched_size_sum,omitempty"` + SeriesFetched int64 `protobuf:"varint,12,opt,name=series_fetched,json=seriesFetched,proto3" json:"series_fetched,omitempty"` + SeriesFetchedSizeSum int64 `protobuf:"varint,13,opt,name=series_fetched_size_sum,json=seriesFetchedSizeSum,proto3" json:"series_fetched_size_sum,omitempty"` + SeriesFetchCount int64 `protobuf:"varint,14,opt,name=series_fetch_count,json=seriesFetchCount,proto3" json:"series_fetch_count,omitempty"` + ChunksTouched int64 `protobuf:"varint,15,opt,name=chunks_touched,json=chunksTouched,proto3" json:"chunks_touched,omitempty"` + ChunksTouchedSizeSum int64 `protobuf:"varint,16,opt,name=chunks_touched_size_sum,json=chunksTouchedSizeSum,proto3" json:"chunks_touched_size_sum,omitempty"` + ChunksFetched int64 `protobuf:"varint,17,opt,name=chunks_fetched,json=chunksFetched,proto3" json:"chunks_fetched,omitempty"` + ChunksFetchedSizeSum int64 `protobuf:"varint,18,opt,name=chunks_fetched_size_sum,json=chunksFetchedSizeSum,proto3" json:"chunks_fetched_size_sum,omitempty"` + ChunksFetchCount int64 `protobuf:"varint,19,opt,name=chunks_fetch_count,json=chunksFetchCount,proto3" json:"chunks_fetch_count,omitempty"` + DataDownloadedSizeSum int64 `protobuf:"varint,20,opt,name=data_downloaded_size_sum,json=dataDownloadedSizeSum,proto3" json:"data_downloaded_size_sum,omitempty"` + GetAllDuration time.Duration `protobuf:"bytes,21,opt,name=get_all_duration,json=getAllDuration,proto3,stdduration" json:"get_all_duration"` + MergeDuration time.Duration `protobuf:"bytes,22,opt,name=merge_duration,json=mergeDuration,proto3,stdduration" json:"merge_duration"` } func (m *QueryStats) Reset() { *m = QueryStats{} } @@ -370,48 +377,53 @@ func init() { func init() { proto.RegisterFile("store/hintspb/hints.proto", fileDescriptor_b82aa23c4c11e83f) } var fileDescriptor_b82aa23c4c11e83f = []byte{ - // 652 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0xcd, 0x4f, 0x13, 0x41, - 0x1c, 0xed, 0x52, 0x3e, 0x7f, 0x95, 0x52, 0xa6, 0x15, 0x16, 0x0e, 0x2b, 0x69, 0x42, 0x82, 0x86, - 0x14, 0x83, 0x31, 0x46, 0x3d, 0x09, 0x86, 0x78, 0x50, 0x13, 0x5a, 0x83, 0x89, 0x9a, 0x4c, 0xf6, - 0x63, 0xec, 0x6e, 0x68, 0x77, 0x96, 0x9d, 0xd9, 0x18, 0xb8, 0x7b, 0x35, 0xfe, 0x59, 0xc4, 0x13, - 0x47, 0x4f, 0x46, 0xe1, 0x1f, 0x31, 0x3b, 0x1f, 0xec, 0x4c, 0xb9, 0xf6, 0x02, 0xcd, 0xfb, 0xbd, - 0xf7, 0xe6, 0xbd, 0xdf, 0x4c, 0xb2, 0xb0, 0xc1, 0x38, 0xcd, 0xc9, 0x5e, 0x9c, 0xa4, 0x9c, 0x65, - 0x81, 0xfc, 0xdf, 0xcb, 0x72, 0xca, 0x29, 0x5a, 0x50, 0xe0, 0x66, 0x67, 0x48, 0x87, 0x54, 0x60, - 0x7b, 0xe5, 0x2f, 0x39, 0xde, 0x54, 0x4a, 0xf1, 0x37, 0x0b, 0xf6, 0xf8, 0x79, 0x46, 0x94, 0xb2, - 0xfb, 0xdd, 0x01, 0x34, 0x20, 0x79, 0x42, 0x58, 0x9f, 0x9c, 0x15, 0x84, 0xf1, 0x37, 0xa5, 0x13, - 0x7a, 0x05, 0xcd, 0x60, 0x44, 0xc3, 0x53, 0x3c, 0xf6, 0x79, 0x18, 0x93, 0x9c, 0xb9, 0xce, 0x56, - 0x7d, 0xa7, 0xb1, 0xdf, 0xe9, 0xf1, 0xd8, 0x4f, 0x29, 0xeb, 0xbd, 0xf5, 0x03, 0x32, 0x7a, 0x27, - 0x87, 0x07, 0xb3, 0x97, 0x7f, 0x1e, 0xd4, 0xfa, 0xcb, 0x42, 0xa1, 0x30, 0x86, 0x76, 0x01, 0x91, - 0xd4, 0x0f, 0x46, 0x04, 0x9f, 0x15, 0x24, 0x3f, 0xc7, 0x8c, 0xfb, 0x9c, 0xb9, 0x33, 0x5b, 0xce, - 0xce, 0x62, 0xbf, 0x25, 0x27, 0xc7, 0xe5, 0x60, 0x50, 0xe2, 0xdd, 0x1f, 0x0e, 0xb4, 0x75, 0x0e, - 0x96, 0xd1, 0x94, 0x11, 0x19, 0xe4, 0x25, 0x34, 0x4b, 0x79, 0x42, 0x22, 0x2c, 0xec, 0x75, 0x90, - 0x66, 0x4f, 0x55, 0xee, 0x1d, 0x94, 0xb0, 0x8e, 0xa0, 0xb8, 0x02, 0x63, 0xe8, 0x05, 0x34, 0x26, - 0xcf, 0x6e, 0xec, 0xb7, 0x6f, 0x95, 0xd5, 0xf1, 0x42, 0xee, 0xf4, 0xe1, 0xac, 0x0a, 0xb4, 0x0e, - 0x73, 0xc2, 0x05, 0x35, 0x61, 0x26, 0x89, 0x5c, 0x67, 0xcb, 0xd9, 0x59, 0xea, 0xcf, 0x24, 0x51, - 0xf7, 0x33, 0xac, 0x89, 0xf2, 0xef, 0xfd, 0xf1, 0xd4, 0x97, 0xd6, 0x3d, 0x81, 0x75, 0xd3, 0x7c, - 0x5a, 0x9b, 0xe8, 0x7e, 0x51, 0xbe, 0x27, 0xfe, 0xa8, 0x98, 0x7e, 0xea, 0x8f, 0xe0, 0x5a, 0xee, - 0x53, 0x8b, 0xfd, 0x6b, 0x01, 0xa0, 0xba, 0x25, 0xb4, 0xad, 0xa2, 0x32, 0xac, 0x68, 0xe2, 0x5a, - 0xea, 0x2a, 0x0e, 0x3b, 0x96, 0x20, 0xea, 0x41, 0x7b, 0x4c, 0xf2, 0x21, 0x89, 0x30, 0x13, 0x2f, - 0x0a, 0x87, 0xb4, 0x48, 0xb9, 0xb8, 0xfe, 0x7a, 0x7f, 0x55, 0x8e, 0xe4, 0x5b, 0x3b, 0x2c, 0x07, - 0x06, 0x3f, 0x8c, 0x8b, 0xf4, 0x54, 0xf3, 0xeb, 0x26, 0xff, 0x50, 0x4c, 0x24, 0xff, 0x21, 0xb4, - 0x32, 0xca, 0x78, 0x92, 0x0e, 0x19, 0xe6, 0xb4, 0x08, 0x63, 0x12, 0xb9, 0xb3, 0x82, 0xbc, 0xa2, - 0xf1, 0x0f, 0x12, 0x46, 0xcf, 0x61, 0x63, 0x92, 0x8a, 0x59, 0x72, 0x41, 0x30, 0x2b, 0xc6, 0xee, - 0x9c, 0xd0, 0xac, 0x4d, 0x68, 0x06, 0xc9, 0x05, 0x19, 0x14, 0x63, 0xf4, 0x08, 0x56, 0x0d, 0x29, - 0xfe, 0x4a, 0x78, 0x18, 0xbb, 0xf3, 0x93, 0xc7, 0x1c, 0x95, 0xb0, 0x95, 0x48, 0x10, 0x49, 0xe4, - 0x2e, 0xd8, 0xd4, 0x23, 0x09, 0x5b, 0x89, 0x14, 0xb5, 0x4a, 0xb4, 0x68, 0x27, 0x52, 0x1a, 0x9d, - 0xe8, 0x31, 0x74, 0x6c, 0xa9, 0x5a, 0xd4, 0x92, 0x50, 0x21, 0x4b, 0x25, 0x37, 0xb5, 0x0d, 0x4d, - 0x75, 0x05, 0x7a, 0x4f, 0x20, 0x2f, 0x4c, 0xa2, 0x7a, 0x4b, 0x4f, 0x61, 0xdd, 0xa6, 0x55, 0x89, - 0x1a, 0x82, 0xdf, 0xb1, 0xf8, 0x3a, 0x4f, 0xe5, 0xae, 0x3b, 0xdf, 0x33, 0xdd, 0x75, 0xe3, 0xca, - 0xfd, 0x4e, 0xdf, 0x65, 0xd3, 0x7d, 0xa2, 0xed, 0x2e, 0x20, 0x53, 0xa6, 0xba, 0x36, 0x85, 0xa2, - 0x65, 0x28, 0x6e, 0x9b, 0xaa, 0xc7, 0xa3, 0x9b, 0xae, 0xc8, 0x2c, 0x12, 0x35, 0x9a, 0xda, 0xb4, - 0x2a, 0x4b, 0x4b, 0x66, 0xb1, 0xf8, 0x46, 0x53, 0x25, 0xd3, 0x4d, 0x57, 0x4d, 0x77, 0xa3, 0xa9, - 0x4d, 0xab, 0xdc, 0x91, 0xe9, 0x7e, 0xb7, 0xa9, 0x29, 0x53, 0x4d, 0xdb, 0xb2, 0xa9, 0xa1, 0x90, - 0x4d, 0x9f, 0x81, 0x1b, 0xf9, 0xdc, 0xc7, 0x11, 0xfd, 0x96, 0x8e, 0xa8, 0x1f, 0x99, 0xa7, 0x74, - 0x84, 0xe6, 0x7e, 0x39, 0x7f, 0x7d, 0x3b, 0x56, 0xc7, 0x1c, 0x6c, 0x5f, 0xfe, 0xf3, 0x6a, 0x97, - 0xd7, 0x9e, 0x73, 0x75, 0xed, 0x39, 0x7f, 0xaf, 0x3d, 0xe7, 0xe7, 0x8d, 0x57, 0xbb, 0xba, 0xf1, - 0x6a, 0xbf, 0x6f, 0xbc, 0xda, 0x27, 0xfd, 0x09, 0x0b, 0xe6, 0xc5, 0x87, 0xe9, 0xc9, 0xff, 0x00, - 0x00, 0x00, 0xff, 0xff, 0x4e, 0x26, 0x0a, 0x60, 0xef, 0x06, 0x00, 0x00, + // 731 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0xcd, 0x4f, 0xdb, 0x30, + 0x1c, 0x6d, 0x28, 0x1f, 0xc5, 0x1d, 0xa1, 0xb8, 0x05, 0x02, 0x87, 0x80, 0x2a, 0x21, 0xb1, 0x09, + 0xa5, 0x13, 0xd3, 0x34, 0x6d, 0x3b, 0xf1, 0x21, 0x34, 0x4d, 0x63, 0x12, 0xe9, 0xc4, 0xa4, 0x6d, + 0x92, 0x95, 0x34, 0x26, 0x8d, 0x48, 0xe3, 0x12, 0x3b, 0x9a, 0xe0, 0xbe, 0xeb, 0xb4, 0xe3, 0xfe, + 0x24, 0x8e, 0x1c, 0x77, 0xda, 0x07, 0x68, 0xff, 0xc7, 0x14, 0x7f, 0x34, 0x4e, 0xb9, 0xec, 0xd0, + 0x0b, 0xb4, 0xef, 0xf7, 0xde, 0xf3, 0x7b, 0x76, 0x1a, 0x83, 0x35, 0xca, 0x48, 0x8a, 0x3b, 0xfd, + 0x28, 0x61, 0x74, 0xe8, 0x8b, 0xff, 0xce, 0x30, 0x25, 0x8c, 0xc0, 0x39, 0x09, 0xae, 0xb7, 0x42, + 0x12, 0x12, 0x8e, 0x75, 0xf2, 0x4f, 0x62, 0xbc, 0x6e, 0x87, 0x84, 0x84, 0x31, 0xee, 0xf0, 0x6f, + 0x7e, 0x76, 0xd6, 0x09, 0xb2, 0xd4, 0x63, 0x11, 0x49, 0xe4, 0x5c, 0x3a, 0xf3, 0xbf, 0x43, 0xbf, + 0xc3, 0x2e, 0x87, 0x58, 0x3a, 0xb7, 0xbf, 0x18, 0x00, 0x76, 0x71, 0x1a, 0x61, 0xea, 0xe2, 0x8b, + 0x0c, 0x53, 0xf6, 0x2a, 0x5f, 0x09, 0xee, 0x01, 0xd3, 0x8f, 0x49, 0xef, 0x1c, 0x0d, 0x3c, 0xd6, + 0xeb, 0xe3, 0x94, 0x5a, 0xc6, 0x66, 0x75, 0xbb, 0xbe, 0xdb, 0x72, 0x58, 0xdf, 0x4b, 0x08, 0x75, + 0xde, 0x78, 0x3e, 0x8e, 0x8f, 0xc5, 0x70, 0x7f, 0xfa, 0xfa, 0xe7, 0x46, 0xc5, 0x5d, 0xe0, 0x0a, + 0x89, 0x51, 0xb8, 0x03, 0x20, 0x4e, 0x3c, 0x3f, 0xc6, 0xe8, 0x22, 0xc3, 0xe9, 0x25, 0xa2, 0xcc, + 0x63, 0xd4, 0x9a, 0xda, 0x34, 0xb6, 0x6b, 0x6e, 0x43, 0x4c, 0x4e, 0xf2, 0x41, 0x37, 0xc7, 0xdb, + 0x5f, 0x0d, 0xd0, 0x54, 0x39, 0xe8, 0x90, 0x24, 0x14, 0x8b, 0x20, 0x2f, 0x81, 0x99, 0xcb, 0x23, + 0x1c, 0x20, 0x6e, 0xaf, 0x82, 0x98, 0x8e, 0xdc, 0x12, 0x67, 0x3f, 0x87, 0x55, 0x04, 0xc9, 0xe5, + 0x18, 0x85, 0x2f, 0x40, 0x7d, 0x7c, 0xed, 0xfa, 0x6e, 0x73, 0xa4, 0x2c, 0x96, 0xe7, 0x72, 0xc3, + 0x05, 0x17, 0x45, 0xa0, 0x55, 0x30, 0xc3, 0x5d, 0xa0, 0x09, 0xa6, 0xa2, 0xc0, 0x32, 0x36, 0x8d, + 0xed, 0x79, 0x77, 0x2a, 0x0a, 0xda, 0x1f, 0xc1, 0x0a, 0x2f, 0xff, 0xd6, 0x1b, 0x4c, 0x7c, 0xd3, + 0xda, 0xa7, 0x60, 0x55, 0x37, 0x9f, 0xd4, 0x4e, 0xb4, 0x3f, 0x49, 0xdf, 0x53, 0x2f, 0xce, 0x26, + 0x9f, 0xfa, 0x3d, 0xb0, 0x4a, 0xee, 0x13, 0x8b, 0xfd, 0xb7, 0x06, 0x40, 0x71, 0x4a, 0x70, 0x4b, + 0x46, 0xa5, 0x48, 0xd2, 0xf8, 0xb1, 0x54, 0x65, 0x1c, 0x7a, 0x22, 0x40, 0xe8, 0x80, 0xe6, 0x00, + 0xa7, 0x21, 0x0e, 0x10, 0xe5, 0x4f, 0x14, 0xea, 0x91, 0x2c, 0x61, 0xfc, 0xf8, 0xab, 0xee, 0x92, + 0x18, 0x89, 0x67, 0xed, 0x20, 0x1f, 0x68, 0xfc, 0x5e, 0x3f, 0x4b, 0xce, 0x15, 0xbf, 0xaa, 0xf3, + 0x0f, 0xf8, 0x44, 0xf0, 0x1f, 0x82, 0xc6, 0x90, 0x50, 0x16, 0x25, 0x21, 0x45, 0x8c, 0x64, 0xbd, + 0x3e, 0x0e, 0xac, 0x69, 0x4e, 0x5e, 0x54, 0xf8, 0x3b, 0x01, 0xc3, 0xe7, 0x60, 0x6d, 0x9c, 0x8a, + 0x68, 0x74, 0x85, 0x11, 0xcd, 0x06, 0xd6, 0x0c, 0xd7, 0xac, 0x8c, 0x69, 0xba, 0xd1, 0x15, 0xee, + 0x66, 0x03, 0xf8, 0x08, 0x2c, 0x69, 0x52, 0x74, 0x86, 0x59, 0xaf, 0x6f, 0xcd, 0x8e, 0x2f, 0x73, + 0x94, 0xc3, 0xa5, 0x44, 0x9c, 0x88, 0x03, 0x6b, 0xae, 0x4c, 0x3d, 0x12, 0x70, 0x29, 0x91, 0xa4, + 0x16, 0x89, 0x6a, 0xe5, 0x44, 0x52, 0xa3, 0x12, 0x3d, 0x06, 0xad, 0xb2, 0x54, 0x6e, 0xd4, 0x3c, + 0x57, 0xc1, 0x92, 0x4a, 0xec, 0xd4, 0x16, 0x30, 0xe5, 0x11, 0xa8, 0x7d, 0x02, 0xe2, 0xc0, 0x04, + 0xaa, 0x76, 0xe9, 0x29, 0x58, 0x2d, 0xd3, 0x8a, 0x44, 0x75, 0xce, 0x6f, 0x95, 0xf8, 0x2a, 0x4f, + 0xe1, 0xae, 0x3a, 0x3f, 0xd0, 0xdd, 0x55, 0xe3, 0xc2, 0xfd, 0x5e, 0xdf, 0x05, 0xdd, 0x7d, 0xac, + 0xed, 0x0e, 0x80, 0xba, 0x4c, 0x76, 0x35, 0xb9, 0xa2, 0xa1, 0x29, 0x46, 0x4d, 0xe5, 0xc3, 0xa3, + 0x9a, 0x2e, 0x8a, 0x2c, 0x02, 0xd5, 0x9a, 0x96, 0x69, 0x45, 0x96, 0x86, 0xc8, 0x52, 0xe2, 0x6b, + 0x4d, 0xa5, 0x4c, 0x35, 0x5d, 0xd2, 0xdd, 0xb5, 0xa6, 0x65, 0x5a, 0xe1, 0x0e, 0x75, 0xf7, 0xfb, + 0x4d, 0x75, 0x99, 0x6c, 0xda, 0x14, 0x4d, 0x35, 0x85, 0x68, 0xfa, 0x0c, 0x58, 0x81, 0xc7, 0x3c, + 0x14, 0x90, 0xcf, 0x49, 0x4c, 0xbc, 0x40, 0x5f, 0xa5, 0xc5, 0x35, 0xcb, 0xf9, 0xfc, 0x70, 0x34, + 0x56, 0xcb, 0x1c, 0x83, 0x46, 0x88, 0x19, 0xf2, 0xe2, 0x18, 0xa9, 0xfb, 0xc9, 0x5a, 0xe6, 0xaf, + 0xe4, 0x35, 0x47, 0x5c, 0x60, 0x8e, 0xba, 0xc0, 0x9c, 0x43, 0x49, 0xd8, 0xaf, 0xe5, 0xaf, 0x85, + 0xef, 0xbf, 0x36, 0x0c, 0xd7, 0x0c, 0x31, 0xdb, 0x8b, 0x63, 0x35, 0x81, 0xaf, 0x81, 0xc9, 0x7f, + 0x9a, 0x85, 0xd9, 0xca, 0xff, 0x9b, 0x2d, 0x70, 0xe9, 0x68, 0xb0, 0x75, 0xfd, 0xc7, 0xae, 0x5c, + 0xdf, 0xda, 0xc6, 0xcd, 0xad, 0x6d, 0xfc, 0xbe, 0xb5, 0x8d, 0x6f, 0x77, 0x76, 0xe5, 0xe6, 0xce, + 0xae, 0xfc, 0xb8, 0xb3, 0x2b, 0x1f, 0xd4, 0xed, 0xeb, 0xcf, 0x72, 0xcb, 0x27, 0xff, 0x02, 0x00, + 0x00, 0xff, 0xff, 0x49, 0xb6, 0x83, 0x90, 0xaa, 0x07, 0x00, 0x00, } func (m *SeriesRequestHints) Marshal() (dAtA []byte, err error) { @@ -708,6 +720,26 @@ func (m *QueryStats) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + n2, err2 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.MergeDuration, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.MergeDuration):]) + if err2 != nil { + return 0, err2 + } + i -= n2 + i = encodeVarintHints(dAtA, i, uint64(n2)) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0xb2 + n3, err3 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.GetAllDuration, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.GetAllDuration):]) + if err3 != nil { + return 0, err3 + } + i -= n3 + i = encodeVarintHints(dAtA, i, uint64(n3)) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0xaa if m.DataDownloadedSizeSum != 0 { i = encodeVarintHints(dAtA, i, uint64(m.DataDownloadedSizeSum)) i-- @@ -1008,6 +1040,10 @@ func (m *QueryStats) Size() (n int) { if m.DataDownloadedSizeSum != 0 { n += 2 + sovHints(uint64(m.DataDownloadedSizeSum)) } + l = github_com_gogo_protobuf_types.SizeOfStdDuration(m.GetAllDuration) + n += 2 + l + sovHints(uint64(l)) + l = github_com_gogo_protobuf_types.SizeOfStdDuration(m.MergeDuration) + n += 2 + l + sovHints(uint64(l)) return n } @@ -2068,6 +2104,72 @@ func (m *QueryStats) Unmarshal(dAtA []byte) error { break } } + case 21: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field GetAllDuration", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthHints + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthHints + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdDurationUnmarshal(&m.GetAllDuration, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 22: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MergeDuration", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthHints + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthHints + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdDurationUnmarshal(&m.MergeDuration, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipHints(dAtA[iNdEx:]) diff --git a/pkg/store/hintspb/hints.proto b/pkg/store/hintspb/hints.proto index aeb3ac11d1..69c60d2a7d 100644 --- a/pkg/store/hintspb/hints.proto +++ b/pkg/store/hintspb/hints.proto @@ -5,6 +5,7 @@ syntax = "proto3"; package hintspb; import "gogoproto/gogo.proto"; +import "google/protobuf/duration.proto"; import "store/storepb/types.proto"; option go_package = "hintspb"; @@ -90,4 +91,6 @@ message QueryStats { int64 chunks_fetch_count = 19; int64 data_downloaded_size_sum = 20; + google.protobuf.Duration get_all_duration = 21 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false]; + google.protobuf.Duration merge_duration = 22 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false]; } \ No newline at end of file diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 244ae5592d..fd6d4c0195 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -42,7 +42,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -54,7 +53,6 @@ type PrometheusStore struct { buffers sync.Pool component component.StoreAPI externalLabelsFn func() labels.Labels - labelNamesSet func() stringset.Set promVersion func() string timestamps func() (mint int64, maxt int64) @@ -81,7 +79,6 @@ func NewPrometheusStore( component component.StoreAPI, externalLabelsFn func() labels.Labels, timestamps func() (mint int64, maxt int64), - labelNamesSet func() stringset.Set, promVersion func() string, ) (*PrometheusStore, error) { if logger == nil { @@ -95,7 +92,6 @@ func NewPrometheusStore( externalLabelsFn: externalLabelsFn, promVersion: promVersion, timestamps: timestamps, - labelNamesSet: labelNamesSet, remoteReadAcceptableResponses: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES}, buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) @@ -149,7 +145,8 @@ func (p *PrometheusStore) putBuffer(b *[]byte) { // Series returns all series for a requested time range and label matcher. func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { - s := newFlushableServer(seriesSrv, p.labelNamesSet(), r.WithoutReplicaLabels) + s := newFlushableServer(seriesSrv, sortingStrategyStore) + extLset := p.externalLabelsFn() match, matchers, err := matchesExternalLabels(r.Matchers, extLset) diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index 82965672c7..d0597b6e9c 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -26,7 +26,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -72,7 +71,6 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return limitMinT, -1 }, - func() stringset.Set { return stringset.AllStrings() }, nil, ) // MaxTime does not matter. testutil.Ok(t, err) @@ -234,7 +232,6 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) { promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, - func() stringset.Set { return stringset.AllStrings() }, nil, ) testutil.Ok(t, err) @@ -417,7 +414,6 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - func() stringset.Set { return stringset.AllStrings() }, nil) testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) @@ -481,7 +477,6 @@ func TestPrometheusStore_Series_ChunkHashCalculation_Integration(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - func() stringset.Set { return stringset.AllStrings() }, nil) testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) @@ -512,7 +507,6 @@ func TestPrometheusStore_Info(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), nil, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 123, 456 }, - func() stringset.Set { return stringset.AllStrings() }, nil) testutil.Ok(t, err) @@ -592,7 +586,6 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testin proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - func() stringset.Set { return stringset.AllStrings() }, nil) testutil.Ok(t, err) diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index 7ea18b134d..51631b388a 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -164,9 +164,7 @@ func (d *dedupResponseHeap) At() *storepb.SeriesResponse { // tournament trees need n-1 auxiliary nodes so there // might not be much of a difference. type ProxyResponseHeap struct { - nodes []ProxyResponseHeapNode - iLblsScratch labels.Labels - jLblsScratch labels.Labels + nodes []ProxyResponseHeapNode } func (h *ProxyResponseHeap) Less(i, j int) bool { @@ -174,26 +172,10 @@ func (h *ProxyResponseHeap) Less(i, j int) bool { jResp := h.nodes[j].rs.At() if iResp.GetSeries() != nil && jResp.GetSeries() != nil { - // Response sets are sorted before adding external labels. - // This comparison excludes those labels to keep the same order. - iStoreLbls := h.nodes[i].rs.StoreLabels() - jStoreLbls := h.nodes[j].rs.StoreLabels() - iLbls := labelpb.ZLabelsToPromLabels(iResp.GetSeries().Labels) jLbls := labelpb.ZLabelsToPromLabels(jResp.GetSeries().Labels) - copyLabels(&h.iLblsScratch, iLbls) - copyLabels(&h.jLblsScratch, jLbls) - - var iExtLbls, jExtLbls labels.Labels - h.iLblsScratch, iExtLbls = dropLabels(h.iLblsScratch, iStoreLbls) - h.jLblsScratch, jExtLbls = dropLabels(h.jLblsScratch, jStoreLbls) - - c := labels.Compare(h.iLblsScratch, h.jLblsScratch) - if c != 0 { - return c < 0 - } - return labels.Compare(iExtLbls, jExtLbls) < 0 + return labels.Compare(iLbls, jLbls) < 0 } else if iResp.GetSeries() == nil && jResp.GetSeries() != nil { return true } else if iResp.GetSeries() != nil && jResp.GetSeries() == nil { @@ -774,9 +756,9 @@ func newEagerRespSet( // This should be used only for stores that does not support doing this on server side. // See docs/proposals-accepted/20221129-avoid-global-sort.md for details. - if len(l.removeLabels) > 0 { - sortWithoutLabels(l.bufferedResponses, l.removeLabels) - } + // NOTE. Client is not guaranteed to give a sorted response when extLset is added + // Generally we need to resort here. + sortWithoutLabels(l.bufferedResponses, l.removeLabels) }(ret) @@ -794,34 +776,6 @@ func rmLabels(l labels.Labels, labelsToRemove map[string]struct{}) labels.Labels return l } -// dropLabels removes labels from the given label set and returns the removed labels. -func dropLabels(l labels.Labels, labelsToDrop map[string]struct{}) (labels.Labels, labels.Labels) { - cutoff := len(l) - for i := 0; i < len(l); i++ { - if i == cutoff { - break - } - if _, ok := labelsToDrop[l[i].Name]; !ok { - continue - } - - lbl := l[i] - l = append(append(l[:i], l[i+1:]...), lbl) - cutoff-- - i-- - } - - return l[:cutoff], l[cutoff:] -} - -func copyLabels(dest *labels.Labels, src labels.Labels) { - if len(*dest) < cap(src) { - *dest = make([]labels.Label, len(src)) - } - *dest = (*dest)[:len(src)] - copy(*dest, src) -} - // sortWithoutLabels removes given labels from series and re-sorts the series responses that the same // series with different labels are coming right after each other. Other types of responses are moved to front. func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string]struct{}) { @@ -831,7 +785,9 @@ func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string] continue } - ser.Labels = labelpb.ZLabelsFromPromLabels(rmLabels(labelpb.ZLabelsToPromLabels(ser.Labels), labelsToRemove)) + if len(labelsToRemove) > 0 { + ser.Labels = labelpb.ZLabelsFromPromLabels(rmLabels(labelpb.ZLabelsToPromLabels(ser.Labels), labelsToRemove)) + } } // With the re-ordered label sets, re-sorting all series aligns the same series diff --git a/pkg/store/proxy_heap_test.go b/pkg/store/proxy_heap_test.go index fdfec178ca..50fe2d46be 100644 --- a/pkg/store/proxy_heap_test.go +++ b/pkg/store/proxy_heap_test.go @@ -82,33 +82,6 @@ func TestProxyResponseHeapSort(t *testing.T) { storeSeriesResponse(t, labelsFromStrings("g", "7", "h", "8", "i", "9")), }, }, - { - title: "merge duplicated sets that were ordered before adding external labels", - input: []respSet{ - &eagerRespSet{ - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - }, - storeLabels: map[string]struct{}{"c": {}}, - }, - &eagerRespSet{ - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - }, - storeLabels: map[string]struct{}{"c": {}}, - }, - }, - exp: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - }, - }, { title: "merge repeated series in stores with different external labels", input: []respSet{ @@ -190,6 +163,37 @@ func TestProxyResponseHeapSort(t *testing.T) { storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), }, }, + { + title: "test", + input: []respSet{ + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + }, + storeLabels: map[string]struct{}{"receive": {}, "tenant_id": {}, "thanos_replica": {}}, + }, + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + }, + storeLabels: map[string]struct{}{"cluster": {}, "prometheus": {}, "prometheus_replica": {}, "receive": {}, "tenant_id": {}, "thanos_replica": {}, "thanos_ruler_replica": {}}, + }, + }, + exp: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + }, + }, } { t.Run(tcase.title, func(t *testing.T) { h := NewProxyResponseHeap(tcase.input...) diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 73604b9236..b5182f3008 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -13,7 +13,6 @@ import ( "sync" "github.com/go-kit/log" - "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -26,7 +25,6 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" ) const RemoteReadFrameLimit = 1048576 @@ -46,9 +44,6 @@ type TSDBStore struct { buffers sync.Pool maxBytesPerFrame int - lmx sync.RWMutex - labelNamesSet stringset.Set - extLset labels.Labels mtx sync.RWMutex } @@ -77,7 +72,6 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI component: component, extLset: extLset, maxBytesPerFrame: RemoteReadFrameLimit, - labelNamesSet: stringset.AllStrings(), buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) return &b @@ -175,7 +169,7 @@ type CloseDelegator interface { // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { - srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), r.WithoutReplicaLabels) + srv := newFlushableServer(seriesSrv, sortingStrategyStore) match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) if err != nil { @@ -376,38 +370,3 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque return &storepb.LabelValuesResponse{Values: values}, nil } - -func (s *TSDBStore) UpdateLabelNames(ctx context.Context) { - newSet := stringset.New() - q, err := s.db.ChunkQuerier(ctx, math.MinInt64, math.MaxInt64) - if err != nil { - level.Warn(s.logger).Log("msg", "error creating tsdb querier", "err", err.Error()) - s.setLabelNamesSet(stringset.AllStrings()) - return - } - defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb querier label names") - - res, _, err := q.LabelNames() - if err != nil { - level.Warn(s.logger).Log("msg", "error getting label names", "err", err.Error()) - s.setLabelNamesSet(stringset.AllStrings()) - return - } - for _, l := range res { - newSet.Insert(l) - } - s.setLabelNamesSet(newSet) -} - -func (s *TSDBStore) setLabelNamesSet(newSet stringset.Set) { - s.lmx.Lock() - s.labelNamesSet = newSet - s.lmx.Unlock() -} - -func (b *TSDBStore) LabelNamesSet() stringset.Set { - b.lmx.RLock() - defer b.lmx.RUnlock() - - return b.labelNamesSet -} diff --git a/pkg/stringset/set.go b/pkg/stringset/set.go deleted file mode 100644 index 080071570f..0000000000 --- a/pkg/stringset/set.go +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package stringset - -import ( - cuckoo "github.com/seiflotfy/cuckoofilter" -) - -type Set interface { - Has(string) bool - HasAny([]string) bool - // Count returns the number of elements in the set. - // A value of -1 indicates infinite size and can be returned by a - // set representing all possible string values. - Count() int -} - -type fixedSet struct { - cuckoo *cuckoo.Filter -} - -func (f fixedSet) HasAny(strings []string) bool { - for _, s := range strings { - if f.Has(s) { - return true - } - } - return false -} - -func NewFromStrings(items ...string) Set { - f := cuckoo.NewFilter(uint(len(items))) - for _, label := range items { - f.InsertUnique([]byte(label)) - } - - return &fixedSet{cuckoo: f} -} - -func (f fixedSet) Has(s string) bool { - return f.cuckoo.Lookup([]byte(s)) -} - -func (f fixedSet) Count() int { - return int(f.cuckoo.Count()) -} - -type mutableSet struct { - cuckoo *cuckoo.ScalableCuckooFilter -} - -type MutableSet interface { - Set - Insert(string) -} - -func New() MutableSet { - return &mutableSet{ - cuckoo: cuckoo.NewScalableCuckooFilter(), - } -} - -func (e mutableSet) Insert(s string) { - e.cuckoo.InsertUnique([]byte(s)) -} - -func (e mutableSet) Has(s string) bool { - return e.cuckoo.Lookup([]byte(s)) -} - -func (e mutableSet) HasAny(strings []string) bool { - for _, s := range strings { - if e.Has(s) { - return true - } - } - return false -} - -func (e mutableSet) Count() int { - return int(e.cuckoo.Count()) -} - -type allStringsSet struct{} - -func (e allStringsSet) HasAny(_ []string) bool { - return true -} - -func AllStrings() *allStringsSet { - return &allStringsSet{} -} - -func (e allStringsSet) Has(_ string) bool { - return true -} - -func (e allStringsSet) Count() int { - return -1 -} diff --git a/website/data/adopters.yml b/website/data/adopters.yml index f56890c677..c9cd978eba 100644 --- a/website/data/adopters.yml +++ b/website/data/adopters.yml @@ -228,3 +228,6 @@ adopters: - name: Zenduty url: https://www.zenduty.com logo: zenduty.png +- name: Banco do Brasil + url: https://www.bb.com.br/ + logo: bb.png \ No newline at end of file diff --git a/website/static/logos/bb.png b/website/static/logos/bb.png new file mode 100644 index 0000000000..fc72986f23 Binary files /dev/null and b/website/static/logos/bb.png differ