diff --git a/CHANGELOG.md b/CHANGELOG.md index 5510db5500..e92927842d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,22 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.) +## [v0.37.1](https://github.com/thanos-io/thanos/tree/release-0.37) - 03.12.2024 + +### Fixed + +- [#7674](https://github.com/thanos-io/thanos/pull/7674) Query-frontend: Fix connection to Redis cluster with TLS. +- [#7945](https://github.com/thanos-io/thanos/pull/7945) Receive: Capnproto - use segment from existing message. +- [#7941](https://github.com/thanos-io/thanos/pull/7941) Receive: Fix race condition when adding multiple new tenants, see [issue-7892](https://github.com/thanos-io/thanos/issues/7892). +- [#7954](https://github.com/thanos-io/thanos/pull/7954) Sidecar: Ensure limit param is positive for compatibility with older Prometheus. +- [#7953](https://github.com/thanos-io/thanos/pull/7953) Query: Update promql-engine for subquery avg fix. + +### Added + +### Changed + +### Removed + ## [v0.37.0](https://github.com/thanos-io/thanos/tree/release-0.37) - 25.11.2024 ### Fixed diff --git a/VERSION b/VERSION index 0f1a7dfc7c..9b1bb85123 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.37.0 +0.37.1 diff --git a/docs/sharding.md b/docs/sharding.md index 9cfa0fbf8a..f943ec071b 100644 --- a/docs/sharding.md +++ b/docs/sharding.md @@ -18,7 +18,7 @@ Queries against store gateway which are touching large number of blocks (no matt # Relabelling -Similar to [promtail](https://grafana.com/docs/loki/latest/send-data/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax. +Similar to [promtail](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax. Currently, thanos only supports the following relabel actions: diff --git a/go.mod b/go.mod index 309e0f6432..9a725cb167 100644 --- a/go.mod +++ b/go.mod @@ -62,7 +62,7 @@ require ( github.com/sony/gobreaker v0.5.0 github.com/stretchr/testify v1.9.0 github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 - github.com/thanos-io/promql-engine v0.0.0-20241106100125-097e6e9f425a + github.com/thanos-io/promql-engine v0.0.0-20241203103240-2f49f80c7c68 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/uber/jaeger-lib v2.4.1+incompatible // indirect github.com/vimeo/galaxycache v0.0.0-20210323154928-b7e5d71c067a @@ -112,7 +112,7 @@ require ( ) require ( - capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af + capnproto.org/go/capnp/v3 v3.0.0-alpha.30 github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 github.com/hashicorp/golang-lru/v2 v2.0.7 @@ -133,7 +133,6 @@ require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect github.com/cilium/ebpf v0.11.0 // indirect - github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 // indirect github.com/containerd/cgroups/v3 v3.0.3 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/elastic/go-licenser v0.3.1 // indirect @@ -167,6 +166,7 @@ require ( k8s.io/client-go v0.31.0 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect + zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5 // indirect ) require ( diff --git a/go.sum b/go.sum index 9250fff8b8..9933abda40 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af h1:A5wxH0ZidOtYYUGjhtBaRuB87M73bGfc06uWB8sHpg0= -capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af/go.mod h1:2vT5D2dtG8sJGEoEKU17e+j7shdaYp1Myl8X03B3hmc= +capnproto.org/go/capnp/v3 v3.0.0-alpha.30 h1:iABQan/YiHFCgSXym5aNj27osapnEgAk4WaWYqb4sQM= +capnproto.org/go/capnp/v3 v3.0.0-alpha.30/go.mod h1:+ysMHvOh1EWNOyorxJWs1omhRFiDoKxKkWQACp54jKM= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU= @@ -1500,8 +1500,6 @@ github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnThWgvH2wg8376yUJmPhEH4H3kw= github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= -github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 h1:d5EKgQfRQvO97jnISfR89AiCCCJMwMFoSxUiU0OGCRU= -github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381/go.mod h1:OU76gHeRo8xrzGJU3F3I1CqX1ekM8dfJw0+wPeMwnp0= github.com/containerd/cgroups/v3 v3.0.3 h1:S5ByHZ/h9PMe5IOQoN7E+nMc2UcLEM/V48DGDJ9kip0= github.com/containerd/cgroups/v3 v3.0.3/go.mod h1:8HBe7V3aWGLFPd/k03swSIsGjZhHI2WzJmticMgVuz0= github.com/coreos/go-systemd/v22 v22.4.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= @@ -2106,8 +2104,8 @@ github.com/ovh/go-ovh v1.6.0 h1:ixLOwxQdzYDx296sXcgS35TOPEahJkpjMGtzPadCjQI= github.com/ovh/go-ovh v1.6.0/go.mod h1:cTVDnl94z4tl8pP1uZ/8jlVxntjSIf09bNcQ5TJSC7c= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= -github.com/philhofer/fwd v1.1.2 h1:bnDivRJ1EWPjUIRXV5KfORO897HTbpFAQddBdE8t7Gw= -github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2tUTP0= +github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ= +github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY= github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= github.com/phpdave11/gofpdi v1.0.13/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= @@ -2259,14 +2257,12 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1 github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM= github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1DkncwDHFvrpd12/2TLfgYNRmEQA48ikp+0= github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw= -github.com/thanos-io/promql-engine v0.0.0-20241106100125-097e6e9f425a h1:BhWU58VHOxkxQEMByih9fM2WwuwCGtk5AulIcSRSr0A= -github.com/thanos-io/promql-engine v0.0.0-20241106100125-097e6e9f425a/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00= +github.com/thanos-io/promql-engine v0.0.0-20241203103240-2f49f80c7c68 h1:cChM/FbpXeYmrSmXO1/MmmSlONviLVxWAWCB0/g4JrY= +github.com/thanos-io/promql-engine v0.0.0-20241203103240-2f49f80c7c68/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= -github.com/tinylib/msgp v1.1.9 h1:SHf3yoO2sGA0veCJeCBYLHuttAVFHGm2RHgNodW7wQU= -github.com/tinylib/msgp v1.1.9/go.mod h1:BCXGB54lDD8qUEPmiG0cQQUANC4IUQyB2ItS2UDlO/k= -github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk= -github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk= +github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0= +github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek= github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw= github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk= @@ -3337,3 +3333,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= +zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5 h1:yksDCGMVzyn3vlyf0GZ3huiF5FFaMGQpQ3UJvR0EoGA= +zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5/go.mod h1:1LtNdPAs8WH+BTcQiZAOo2MIKD/5jyK/u7sZ9ZPe5SE= diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index eeca44db92..b655ea1ab1 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -787,7 +787,9 @@ func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*la q.Add("match[]", storepb.PromMatchersToString(matchers...)) q.Add("start", formatTime(timestamp.Time(startTime))) q.Add("end", formatTime(timestamp.Time(endTime))) - q.Add("limit", strconv.Itoa(limit)) + if limit > 0 { + q.Add("limit", strconv.Itoa(limit)) + } u.RawQuery = q.Encode() var m struct { @@ -809,7 +811,9 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [ } q.Add("start", formatTime(timestamp.Time(startTime))) q.Add("end", formatTime(timestamp.Time(endTime))) - q.Add("limit", strconv.Itoa(limit)) + if limit > 0 { + q.Add("limit", strconv.Itoa(limit)) + } u.RawQuery = q.Encode() var m struct { @@ -830,7 +834,9 @@ func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label str } q.Add("start", formatTime(timestamp.Time(startTime))) q.Add("end", formatTime(timestamp.Time(endTime))) - q.Add("limit", strconv.Itoa(limit)) + if limit > 0 { + q.Add("limit", strconv.Itoa(limit)) + } u.RawQuery = q.Encode() var m struct { @@ -898,7 +904,6 @@ func (c *Client) MetricMetadataInGRPC(ctx context.Context, base *url.URL, metric if metric != "" { q.Add("metric", metric) } - // We only set limit when it is >= 0. if limit >= 0 { q.Add("limit", strconv.Itoa(limit)) } diff --git a/pkg/queryfrontend/config.go b/pkg/queryfrontend/config.go index 437f0cf9ad..aabb841405 100644 --- a/pkg/queryfrontend/config.go +++ b/pkg/queryfrontend/config.go @@ -162,13 +162,15 @@ func NewCacheConfig(logger log.Logger, confContentYaml []byte) (*cortexcache.Con } return &cortexcache.Config{ Redis: cortexcache.RedisConfig{ - Endpoint: config.Redis.Addr, - Timeout: config.Redis.ReadTimeout, - MasterName: config.Redis.MasterName, - Expiration: config.Expiration, - DB: config.Redis.DB, - Password: flagext.Secret{Value: config.Redis.Password}, - Username: config.Redis.Username, + Endpoint: config.Redis.Addr, + Timeout: config.Redis.ReadTimeout, + MasterName: config.Redis.MasterName, + Expiration: config.Expiration, + DB: config.Redis.DB, + Password: flagext.Secret{Value: config.Redis.Password}, + Username: config.Redis.Username, + EnableTLS: config.Redis.TLSEnabled, + InsecureSkipVerify: config.Redis.TLSConfig.InsecureSkipVerify, }, Background: cortexcache.BackgroundConfig{ WriteBackBuffer: config.Redis.MaxSetMultiConcurrency * config.Redis.SetMultiBatchSize, diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 26e6284b73..9c9954d1bd 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -62,11 +62,8 @@ type MultiTSDB struct { hashFunc metadata.HashFunc hashringConfigs []HashringConfig - tsdbClients []store.Client - tsdbClientsNeedUpdate bool - - exemplarClients map[string]*exemplars.TSDB - exemplarClientsNeedUpdate bool + tsdbClients []store.Client + exemplarClients map[string]*exemplars.TSDB metricNameFilterEnabled bool } @@ -100,19 +97,19 @@ func NewMultiTSDB( } mt := &MultiTSDB{ - dataDir: dataDir, - logger: log.With(l, "component", "multi-tsdb"), - reg: reg, - tsdbOpts: tsdbOpts, - mtx: &sync.RWMutex{}, - tenants: map[string]*tenant{}, - labels: labels, - tsdbClientsNeedUpdate: true, - exemplarClientsNeedUpdate: true, - tenantLabelName: tenantLabelName, - bucket: bucket, - allowOutOfOrderUpload: allowOutOfOrderUpload, - hashFunc: hashFunc, + dataDir: dataDir, + logger: log.With(l, "component", "multi-tsdb"), + reg: reg, + tsdbOpts: tsdbOpts, + mtx: &sync.RWMutex{}, + tenants: map[string]*tenant{}, + labels: labels, + tsdbClients: make([]store.Client, 0), + exemplarClients: map[string]*exemplars.TSDB{}, + tenantLabelName: tenantLabelName, + bucket: bucket, + allowOutOfOrderUpload: allowOutOfOrderUpload, + hashFunc: hashFunc, } for _, option := range options { @@ -122,6 +119,49 @@ func NewMultiTSDB( return mt } +// testGetTenant returns the tenant with the given tenantID for testing purposes. +func (t *MultiTSDB) testGetTenant(tenantID string) *tenant { + t.mtx.RLock() + defer t.mtx.RUnlock() + return t.tenants[tenantID] +} + +func (t *MultiTSDB) updateTSDBClients() { + t.tsdbClients = t.tsdbClients[:0] + for _, tenant := range t.tenants { + client := tenant.client() + if client != nil { + t.tsdbClients = append(t.tsdbClients, client) + } + } +} + +func (t *MultiTSDB) addTenantUnlocked(tenantID string, newTenant *tenant) { + t.tenants[tenantID] = newTenant + t.updateTSDBClients() + if newTenant.exemplars() != nil { + t.exemplarClients[tenantID] = newTenant.exemplars() + } +} + +func (t *MultiTSDB) addTenantLocked(tenantID string, newTenant *tenant) { + t.mtx.Lock() + defer t.mtx.Unlock() + t.addTenantUnlocked(tenantID, newTenant) +} + +func (t *MultiTSDB) removeTenantUnlocked(tenantID string) { + delete(t.tenants, tenantID) + delete(t.exemplarClients, tenantID) + t.updateTSDBClients() +} + +func (t *MultiTSDB) removeTenantLocked(tenantID string) { + t.mtx.Lock() + defer t.mtx.Unlock() + t.removeTenantUnlocked(tenantID) +} + type localClient struct { store *store.TSDBStore @@ -416,9 +456,7 @@ func (t *MultiTSDB) Prune(ctx context.Context) error { } level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID) - delete(t.tenants, tenantID) - t.tsdbClientsNeedUpdate = true - t.exemplarClientsNeedUpdate = true + t.removeTenantUnlocked(tenantID) } return merr.Err() @@ -578,58 +616,17 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error { return merr.Err() } +// TSDBLocalClients should be used as read-only. func (t *MultiTSDB) TSDBLocalClients() []store.Client { t.mtx.RLock() - if !t.tsdbClientsNeedUpdate { - t.mtx.RUnlock() - return t.tsdbClients - } - - t.mtx.RUnlock() - t.mtx.Lock() - defer t.mtx.Unlock() - if !t.tsdbClientsNeedUpdate { - return t.tsdbClients - } - - res := make([]store.Client, 0, len(t.tenants)) - for _, tenant := range t.tenants { - client := tenant.client() - if client != nil { - res = append(res, client) - } - } - - t.tsdbClientsNeedUpdate = false - t.tsdbClients = res - + defer t.mtx.RUnlock() return t.tsdbClients } +// TSDBExemplars should be used as read-only. func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB { t.mtx.RLock() - if !t.exemplarClientsNeedUpdate { - t.mtx.RUnlock() - return t.exemplarClients - } - t.mtx.RUnlock() - t.mtx.Lock() - defer t.mtx.Unlock() - - if !t.exemplarClientsNeedUpdate { - return t.exemplarClients - } - - res := make(map[string]*exemplars.TSDB, len(t.tenants)) - for k, tenant := range t.tenants { - e := tenant.exemplars() - if e != nil { - res[k] = e - } - } - - t.exemplarClientsNeedUpdate = false - t.exemplarClients = res + defer t.mtx.RUnlock() return t.exemplarClients } @@ -705,11 +702,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant nil, ) if err != nil { - t.mtx.Lock() - delete(t.tenants, tenantID) - t.tsdbClientsNeedUpdate = true - t.exemplarClientsNeedUpdate = true - t.mtx.Unlock() + t.removeTenantLocked(tenantID) return err } var ship *shipper.Shipper @@ -732,6 +725,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant options = append(options, store.WithCuckooMetricNameStoreFilter()) } tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset)) + t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil level.Info(logger).Log("msg", "TSDB is now ready") return nil } @@ -760,9 +754,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan } tenant = newTenant() - t.tenants[tenantID] = tenant - t.tsdbClientsNeedUpdate = true - t.exemplarClientsNeedUpdate = true + t.addTenantUnlocked(tenantID, tenant) t.mtx.Unlock() logger := log.With(t.logger, "tenant", tenantID) diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index 4bee9c0514..a36db4b402 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -5,6 +5,7 @@ package receive import ( "context" + "fmt" "io" "math" "os" @@ -193,7 +194,7 @@ func TestMultiTSDB(t *testing.T) { testutil.Ok(t, m.Open()) testutil.Ok(t, appendSample(m, testTenant, time.Now())) - tenant := m.tenants[testTenant] + tenant := m.testGetTenant(testTenant) db := tenant.readyStorage().Get() testutil.Equals(t, 0, len(db.Blocks())) @@ -541,6 +542,47 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) { testutil.Equals(t, 1, len(m.TSDBLocalClients())) } +func TestMultiTSDBAddNewTenant(t *testing.T) { + t.Parallel() + const iterations = 10 + // This test detects race conditions, so we run it multiple times to increase the chance of catching the issue. + for i := 0; i < iterations; i++ { + t.Run(fmt.Sprintf("iteration-%d", i), func(t *testing.T) { + dir := t.TempDir() + m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(), + &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + }, + labels.FromStrings("replica", "test"), + "tenant_id", + objstore.NewInMemBucket(), + false, + metadata.NoneFunc, + ) + defer func() { testutil.Ok(t, m.Close()) }() + + concurrency := 50 + var wg sync.WaitGroup + for i := 0; i < concurrency; i++ { + wg.Add(1) + // simulate remote write with new tenant concurrently + go func(i int) { + defer wg.Done() + testutil.Ok(t, appendSample(m, fmt.Sprintf("tenant-%d", i), time.UnixMilli(int64(10)))) + }(i) + // simulate read request concurrently + go func() { + m.TSDBLocalClients() + }() + } + wg.Wait() + testutil.Equals(t, concurrency, len(m.TSDBLocalClients())) + }) + } +} + func TestAlignedHeadFlush(t *testing.T) { t.Parallel() @@ -801,7 +843,10 @@ func appendSampleWithLabels(m *MultiTSDB, tenant string, lbls labels.Labels, tim func queryLabelValues(ctx context.Context, m *MultiTSDB) error { proxy := store.NewProxyStore(nil, nil, func() []store.Client { - clients := m.TSDBLocalClients() + m.mtx.Lock() + defer m.mtx.Unlock() + clients := make([]store.Client, len(m.tsdbClients)) + copy(clients, m.tsdbClients) if len(clients) > 0 { clients[0] = &slowClient{clients[0]} } diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index 7e90c3c204..bf38cb06ed 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -210,7 +210,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { for _, c := range tc.cfg { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -294,7 +294,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -319,7 +319,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { for _, c := range changedConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -534,7 +534,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -704,7 +704,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -778,7 +778,7 @@ func TestReceiverLabelsNotOverwrittenByExternalLabels(t *testing.T) { for _, c := range cfg { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } diff --git a/pkg/receive/writecapnp/client.go b/pkg/receive/writecapnp/client.go index 0a20d90d44..3cd9f2d082 100644 --- a/pkg/receive/writecapnp/client.go +++ b/pkg/receive/writecapnp/client.go @@ -69,22 +69,9 @@ func (r *RemoteWriteClient) writeWithReconnect(ctx context.Context, numReconnect if err := r.connect(ctx); err != nil { return nil, err } - arena := capnp.SingleSegment(nil) - defer arena.Release() result, release := r.writer.Write(ctx, func(params Writer_write_Params) error { - _, seg, err := capnp.NewMessage(arena) - if err != nil { - return err - } - wr, err := NewRootWriteRequest(seg) - if err != nil { - return err - } - if err := params.SetWr(wr); err != nil { - return err - } - wr, err = params.Wr() + wr, err := params.NewWr() if err != nil { return err } diff --git a/pkg/receive/writecapnp/marshal.go b/pkg/receive/writecapnp/marshal.go index 2d42d60b84..efc1a8ef03 100644 --- a/pkg/receive/writecapnp/marshal.go +++ b/pkg/receive/writecapnp/marshal.go @@ -6,6 +6,8 @@ package writecapnp import ( "capnproto.org/go/capnp/v3" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) @@ -46,7 +48,7 @@ func Build(tenant string, tsreq []prompb.TimeSeries) (WriteRequest, error) { func BuildInto(wr WriteRequest, tenant string, tsreq []prompb.TimeSeries) error { if err := wr.SetTenant(tenant); err != nil { - return err + return errors.Wrap(err, "set tenant") } series, err := wr.NewTimeSeries(int32(len(tsreq))) @@ -59,27 +61,30 @@ func BuildInto(wr WriteRequest, tenant string, tsreq []prompb.TimeSeries) error lblsc, err := tsc.NewLabels(int32(len(ts.Labels))) if err != nil { - return err + return errors.Wrap(err, "new labels") } if err := marshalLabels(lblsc, ts.Labels, builder); err != nil { - return err + return errors.Wrap(err, "marshal labels") } if err := marshalSamples(tsc, ts.Samples); err != nil { - return err + return errors.Wrap(err, "marshal samples") } if err := marshalHistograms(tsc, ts.Histograms); err != nil { - return err + return errors.Wrap(err, "marshal histograms") } if err := marshalExemplars(tsc, ts.Exemplars, builder); err != nil { - return err + return errors.Wrap(err, "marshal exemplars") } } symbols, err := wr.NewSymbols() if err != nil { - return err + return errors.Wrap(err, "new symbols") } - return marshalSymbols(builder, symbols) + if err := marshalSymbols(builder, symbols); err != nil { + return errors.Wrap(err, "marshal symbols") + } + return nil } func marshalSymbols(builder *symbolsBuilder, symbols Symbols) error {