From 900e96ecf01756658c2d8f831f37f8334f6d4b41 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 28 Dec 2023 05:54:40 -0800 Subject: [PATCH] update commit Signed-off-by: Ben Ye --- go.mod | 2 +- go.sum | 4 +- .../block/indexheader/lazy_binary_reader.go | 7 +- .../pkg/block/indexheader/reader_pool.go | 46 ++++++++- .../thanos/pkg/clientconfig/config.go | 99 +++++++++++++++++++ .../thanos-io/thanos/pkg/clientconfig/grpc.go | 8 ++ .../pkg/{httpconfig => clientconfig}/http.go | 57 ++++++----- .../thanos-io/thanos/pkg/httpconfig/config.go | 75 -------------- .../thanos/pkg/promclient/promclient.go | 4 +- .../thanos-io/thanos/pkg/store/bucket.go | 44 ++++++--- .../thanos-io/thanos/pkg/store/prometheus.go | 4 +- .../thanos/pkg/store/storepb/prompb/custom.go | 12 +++ vendor/modules.txt | 4 +- 13 files changed, 238 insertions(+), 128 deletions(-) create mode 100644 vendor/github.com/thanos-io/thanos/pkg/clientconfig/config.go create mode 100644 vendor/github.com/thanos-io/thanos/pkg/clientconfig/grpc.go rename vendor/github.com/thanos-io/thanos/pkg/{httpconfig => clientconfig}/http.go (87%) delete mode 100644 vendor/github.com/thanos-io/thanos/pkg/httpconfig/config.go diff --git a/go.mod b/go.mod index 929e564accd..fe3ac4cfd2e 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/thanos-io/objstore v0.0.0-20231123170144-bffedaa58acb github.com/thanos-io/promql-engine v0.0.0-20231214130043-41b2cf818e81 - github.com/thanos-io/thanos v0.32.5-0.20231214182650-88f7119f2166 + github.com/thanos-io/thanos v0.33.1-0.20231224215600-665e64370a2c github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d go.etcd.io/etcd/api/v3 v3.5.10 diff --git a/go.sum b/go.sum index 9991bf2a110..78bbef6e37e 100644 --- a/go.sum +++ b/go.sum @@ -1512,8 +1512,8 @@ github.com/thanos-io/objstore v0.0.0-20231123170144-bffedaa58acb h1:3s/a99MWpt5Z github.com/thanos-io/objstore v0.0.0-20231123170144-bffedaa58acb/go.mod h1:RMvJQnpB4QQiYGg1gF8mnPJg6IkIPY28Buh8f6b+F0c= github.com/thanos-io/promql-engine v0.0.0-20231214130043-41b2cf818e81 h1:EdFfVjUhwfj6JRjuZf+EchsxBD+60T6X0rPbzhraJj4= github.com/thanos-io/promql-engine v0.0.0-20231214130043-41b2cf818e81/go.mod h1:uzl2mg4OyB9A54Hhrk/wViZiZoHT2o2qq+NGQkEmfzs= -github.com/thanos-io/thanos v0.32.5-0.20231214182650-88f7119f2166 h1:gYfXsT+TKoHLcH6xqKWgaF62qQIpjBxJ67d4JSUuXvc= -github.com/thanos-io/thanos v0.32.5-0.20231214182650-88f7119f2166/go.mod h1:P7euwwXd8qA71hRJj/EiE4rDC3354FAW2wB2/qW4agA= +github.com/thanos-io/thanos v0.33.1-0.20231224215600-665e64370a2c h1:4Ftcc3CJL6puX5FQUvFzhjhM6xF6NdlEZPrxMzeZ6vY= +github.com/thanos-io/thanos v0.33.1-0.20231224215600-665e64370a2c/go.mod h1:P7euwwXd8qA71hRJj/EiE4rDC3354FAW2wB2/qW4agA= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/lazy_binary_reader.go b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/lazy_binary_reader.go index d7e589c724f..2b36bf80259 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/lazy_binary_reader.go +++ b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/lazy_binary_reader.go @@ -83,6 +83,9 @@ type LazyBinaryReader struct { // Keep track of the last time it was used. usedAt *atomic.Int64 + + // If true, index header will be downloaded at query time rather than initialization time. + lazyDownload bool } // NewLazyBinaryReader makes a new LazyBinaryReader. If the index-header does not exist @@ -99,8 +102,9 @@ func NewLazyBinaryReader( metrics *LazyBinaryReaderMetrics, binaryReaderMetrics *BinaryReaderMetrics, onClosed func(*LazyBinaryReader), + lazyDownload bool, ) (*LazyBinaryReader, error) { - if dir != "" { + if dir != "" && !lazyDownload { indexHeaderFile := filepath.Join(dir, id.String(), block.IndexHeaderFilename) // If the index-header doesn't exist we should download it. if _, err := os.Stat(indexHeaderFile); err != nil { @@ -131,6 +135,7 @@ func NewLazyBinaryReader( binaryReaderMetrics: binaryReaderMetrics, usedAt: atomic.NewInt64(time.Now().UnixNano()), onClosed: onClosed, + lazyDownload: lazyDownload, }, nil } diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/reader_pool.go b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/reader_pool.go index fc8cb268139..e9fe5eb7dca 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/reader_pool.go +++ b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/reader_pool.go @@ -14,6 +14,8 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" + + "github.com/thanos-io/thanos/pkg/block/metadata" ) // ReaderPoolMetrics holds metrics tracked by ReaderPool. @@ -46,10 +48,47 @@ type ReaderPool struct { // Keep track of all readers managed by the pool. lazyReadersMx sync.Mutex lazyReaders map[*LazyBinaryReader]struct{} + + lazyDownloadFunc LazyDownloadIndexHeaderFunc +} + +// IndexHeaderLazyDownloadStrategy specifies how to download index headers +// lazily. Only used when lazy mmap is enabled. +type IndexHeaderLazyDownloadStrategy string + +const ( + // EagerDownloadStrategy always disables lazy downloading index headers. + EagerDownloadStrategy IndexHeaderLazyDownloadStrategy = "eager" + // LazyDownloadStrategy always lazily download index headers. + LazyDownloadStrategy IndexHeaderLazyDownloadStrategy = "lazy" +) + +func (s IndexHeaderLazyDownloadStrategy) StrategyToDownloadFunc() LazyDownloadIndexHeaderFunc { + switch s { + case LazyDownloadStrategy: + return AlwaysLazyDownloadIndexHeader + default: + // Always fallback to eager download index header. + return AlwaysEagerDownloadIndexHeader + } +} + +// LazyDownloadIndexHeaderFunc is used to determinte whether to download the index header lazily +// or not by checking its block metadata. Usecase can be by time or by index file size. +type LazyDownloadIndexHeaderFunc func(meta *metadata.Meta) bool + +// AlwaysEagerDownloadIndexHeader always eagerly download index header. +func AlwaysEagerDownloadIndexHeader(meta *metadata.Meta) bool { + return false +} + +// AlwaysLazyDownloadIndexHeader always lazily download index header. +func AlwaysLazyDownloadIndexHeader(meta *metadata.Meta) bool { + return true } // NewReaderPool makes a new ReaderPool. -func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, metrics *ReaderPoolMetrics) *ReaderPool { +func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, metrics *ReaderPoolMetrics, lazyDownloadFunc LazyDownloadIndexHeaderFunc) *ReaderPool { p := &ReaderPool{ logger: logger, metrics: metrics, @@ -57,6 +96,7 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime lazyReaderIdleTimeout: lazyReaderIdleTimeout, lazyReaders: make(map[*LazyBinaryReader]struct{}), close: make(chan struct{}), + lazyDownloadFunc: lazyDownloadFunc, } // Start a goroutine to close idle readers (only if required). @@ -81,12 +121,12 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime // NewBinaryReader creates and returns a new binary reader. If the pool has been configured // with lazy reader enabled, this function will return a lazy reader. The returned lazy reader // is tracked by the pool and automatically closed once the idle timeout expires. -func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (Reader, error) { +func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, meta *metadata.Meta) (Reader, error) { var reader Reader var err error if p.lazyReaderEnabled { - reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.metrics.binaryReader, p.onLazyReaderClosed) + reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.metrics.binaryReader, p.onLazyReaderClosed, p.lazyDownloadFunc(meta)) } else { reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.binaryReader) } diff --git a/vendor/github.com/thanos-io/thanos/pkg/clientconfig/config.go b/vendor/github.com/thanos-io/thanos/pkg/clientconfig/config.go new file mode 100644 index 00000000000..9de1b4f5808 --- /dev/null +++ b/vendor/github.com/thanos-io/thanos/pkg/clientconfig/config.go @@ -0,0 +1,99 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +// Package clientconfig is a wrapper around github.com/prometheus/common/config with additional +// support for gRPC clients. +package clientconfig + +import ( + "fmt" + "net/url" + "strings" + + "github.com/pkg/errors" + "gopkg.in/yaml.v2" +) + +// Config is a structure that allows pointing to various HTTP and GRPC endpoints, e.g. ruler connecting to queriers. +type Config struct { + HTTPConfig HTTPConfig `yaml:",inline"` + GRPCConfig *GRPCConfig `yaml:"grpc_config"` +} + +func DefaultConfig() Config { + return Config{ + HTTPConfig: HTTPConfig{ + EndpointsConfig: HTTPEndpointsConfig{ + Scheme: "http", + StaticAddresses: []string{}, + FileSDConfigs: []HTTPFileSDConfig{}, + }, + }, + GRPCConfig: &GRPCConfig{ + EndpointAddrs: []string{}, + }, + } +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { + *c = DefaultConfig() + type plain Config + return unmarshal((*plain)(c)) +} + +// LoadConfigs loads a list of Config from YAML data. +func LoadConfigs(confYAML []byte) ([]Config, error) { + var clientCfg []Config + if err := yaml.UnmarshalStrict(confYAML, &clientCfg); err != nil { + return nil, err + } + return clientCfg, nil +} + +// BuildConfigFromHTTPAddresses returns a configuration from static addresses. +func BuildConfigFromHTTPAddresses(addrs []string) ([]Config, error) { + configs := make([]Config, 0, len(addrs)) + for i, addr := range addrs { + if addr == "" { + return nil, errors.Errorf("static address cannot be empty at index %d", i) + } + // If addr is missing schema, add http. + if !strings.Contains(addr, "://") { + addr = fmt.Sprintf("http://%s", addr) + } + u, err := url.Parse(addr) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse addr %q", addr) + } + if u.Scheme != "http" && u.Scheme != "https" { + return nil, errors.Errorf("%q is not supported scheme for address", u.Scheme) + } + configs = append(configs, Config{ + HTTPConfig: HTTPConfig{ + EndpointsConfig: HTTPEndpointsConfig{ + Scheme: u.Scheme, + StaticAddresses: []string{u.Host}, + PathPrefix: u.Path, + }, + }, + }) + } + return configs, nil +} + +// BuildConfigFromGRPCAddresses returns a configuration from a static addresses. +func BuildConfigFromGRPCAddresses(addrs []string) ([]Config, error) { + configs := make([]Config, 0, len(addrs)) + for i, addr := range addrs { + if addr == "" { + return nil, errors.Errorf("static address cannot be empty at index %d", i) + } + configs = append(configs, Config{ + GRPCConfig: &GRPCConfig{ + EndpointAddrs: []string{addr}, + }, + }) + } + return configs, nil +} diff --git a/vendor/github.com/thanos-io/thanos/pkg/clientconfig/grpc.go b/vendor/github.com/thanos-io/thanos/pkg/clientconfig/grpc.go new file mode 100644 index 00000000000..e0987b6c8a9 --- /dev/null +++ b/vendor/github.com/thanos-io/thanos/pkg/clientconfig/grpc.go @@ -0,0 +1,8 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package clientconfig + +type GRPCConfig struct { + EndpointAddrs []string `yaml:"endpoint_addresses"` +} diff --git a/vendor/github.com/thanos-io/thanos/pkg/httpconfig/http.go b/vendor/github.com/thanos-io/thanos/pkg/clientconfig/http.go similarity index 87% rename from vendor/github.com/thanos-io/thanos/pkg/httpconfig/http.go rename to vendor/github.com/thanos-io/thanos/pkg/clientconfig/http.go index 6cc54de4937..b99dd9ef8f9 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/httpconfig/http.go +++ b/vendor/github.com/thanos-io/thanos/pkg/clientconfig/http.go @@ -1,8 +1,7 @@ // Copyright (c) The Thanos Authors. // Licensed under the Apache License 2.0. -// Package httpconfig is a wrapper around github.com/prometheus/common/config. -package httpconfig +package clientconfig import ( "context" @@ -32,8 +31,18 @@ import ( "github.com/thanos-io/thanos/pkg/discovery/cache" ) -// ClientConfig configures an HTTP client. -type ClientConfig struct { +// HTTPConfig is a structure that allows pointing to various HTTP endpoint, e.g ruler connecting to queriers. +type HTTPConfig struct { + HTTPClientConfig HTTPClientConfig `yaml:"http_config"` + EndpointsConfig HTTPEndpointsConfig `yaml:",inline"` +} + +func (c *HTTPConfig) NotEmpty() bool { + return len(c.EndpointsConfig.FileSDConfigs) > 0 || len(c.EndpointsConfig.StaticAddresses) > 0 +} + +// HTTPClientConfig configures an HTTP client. +type HTTPClientConfig struct { // The HTTP basic authentication credentials for the targets. BasicAuth BasicAuth `yaml:"basic_auth"` // The bearer token for the targets. @@ -77,7 +86,7 @@ func (b BasicAuth) IsZero() bool { return b.Username == "" && b.Password == "" && b.PasswordFile == "" } -// Transport configures client's transport properties. +// TransportConfig configures client's transport properties. type TransportConfig struct { MaxIdleConns int `yaml:"max_idle_conns"` MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host"` @@ -102,12 +111,12 @@ var defaultTransportConfig TransportConfig = TransportConfig{ DialerTimeout: int64(5 * time.Second), } -func NewDefaultClientConfig() ClientConfig { - return ClientConfig{TransportConfig: defaultTransportConfig} +func NewDefaultHTTPClientConfig() HTTPClientConfig { + return HTTPClientConfig{TransportConfig: defaultTransportConfig} } -func NewClientConfigFromYAML(cfg []byte) (*ClientConfig, error) { - conf := &ClientConfig{TransportConfig: defaultTransportConfig} +func NewHTTPClientConfigFromYAML(cfg []byte) (*HTTPClientConfig, error) { + conf := &HTTPClientConfig{TransportConfig: defaultTransportConfig} if err := yaml.Unmarshal(cfg, conf); err != nil { return nil, err } @@ -190,7 +199,7 @@ func NewRoundTripperFromConfig(cfg config_util.HTTPClientConfig, transportConfig } // NewHTTPClient returns a new HTTP client. -func NewHTTPClient(cfg ClientConfig, name string) (*http.Client, error) { +func NewHTTPClient(cfg HTTPClientConfig, name string) (*http.Client, error) { httpClientConfig := config_util.HTTPClientConfig{ BearerToken: config_util.Secret(cfg.BearerToken), BearerTokenFile: cfg.BearerTokenFile, @@ -274,13 +283,13 @@ func (u userAgentRoundTripper) RoundTrip(r *http.Request) (*http.Response, error return u.rt.RoundTrip(r) } -// EndpointsConfig configures a cluster of HTTP endpoints from static addresses and +// HTTPEndpointsConfig configures a cluster of HTTP endpoints from static addresses and // file service discovery. -type EndpointsConfig struct { +type HTTPEndpointsConfig struct { // List of addresses with DNS prefixes. StaticAddresses []string `yaml:"static_configs"` // List of file configurations (our FileSD supports different DNS lookups). - FileSDConfigs []FileSDConfig `yaml:"file_sd_configs"` + FileSDConfigs []HTTPFileSDConfig `yaml:"file_sd_configs"` // The URL scheme to use when talking to targets. Scheme string `yaml:"scheme"` @@ -289,13 +298,13 @@ type EndpointsConfig struct { PathPrefix string `yaml:"path_prefix"` } -// FileSDConfig represents a file service discovery configuration. -type FileSDConfig struct { +// HTTPFileSDConfig represents a file service discovery configuration. +type HTTPFileSDConfig struct { Files []string `yaml:"files"` RefreshInterval model.Duration `yaml:"refresh_interval"` } -func (c FileSDConfig) convert() (file.SDConfig, error) { +func (c HTTPFileSDConfig) convert() (file.SDConfig, error) { var fileSDConfig file.SDConfig b, err := yaml.Marshal(c) if err != nil { @@ -310,8 +319,8 @@ type AddressProvider interface { Addresses() []string } -// Client represents a client that can send requests to a cluster of HTTP-based endpoints. -type Client struct { +// HTTPClient represents a client that can send requests to a cluster of HTTP-based endpoints. +type HTTPClient struct { logger log.Logger httpClient *http.Client @@ -326,7 +335,7 @@ type Client struct { } // NewClient returns a new Client. -func NewClient(logger log.Logger, cfg EndpointsConfig, client *http.Client, provider AddressProvider) (*Client, error) { +func NewClient(logger log.Logger, cfg HTTPEndpointsConfig, client *http.Client, provider AddressProvider) (*HTTPClient, error) { if logger == nil { logger = log.NewNopLogger() } @@ -344,7 +353,7 @@ func NewClient(logger log.Logger, cfg EndpointsConfig, client *http.Client, prov } discoverers = append(discoverers, discovery) } - return &Client{ + return &HTTPClient{ logger: logger, httpClient: client, scheme: cfg.Scheme, @@ -357,12 +366,12 @@ func NewClient(logger log.Logger, cfg EndpointsConfig, client *http.Client, prov } // Do executes an HTTP request with the underlying HTTP client. -func (c *Client) Do(req *http.Request) (*http.Response, error) { +func (c *HTTPClient) Do(req *http.Request) (*http.Response, error) { return c.httpClient.Do(req) } // Endpoints returns the list of known endpoints. -func (c *Client) Endpoints() []*url.URL { +func (c *HTTPClient) Endpoints() []*url.URL { var urls []*url.URL for _, addr := range c.provider.Addresses() { urls = append(urls, @@ -377,7 +386,7 @@ func (c *Client) Endpoints() []*url.URL { } // Discover runs the service to discover endpoints until the given context is done. -func (c *Client) Discover(ctx context.Context) { +func (c *HTTPClient) Discover(ctx context.Context) { var wg sync.WaitGroup ch := make(chan []*targetgroup.Group) @@ -407,6 +416,6 @@ func (c *Client) Discover(ctx context.Context) { } // Resolve refreshes and resolves the list of targets. -func (c *Client) Resolve(ctx context.Context) error { +func (c *HTTPClient) Resolve(ctx context.Context) error { return c.provider.Resolve(ctx, append(c.fileSDCache.Addresses(), c.staticAddresses...)) } diff --git a/vendor/github.com/thanos-io/thanos/pkg/httpconfig/config.go b/vendor/github.com/thanos-io/thanos/pkg/httpconfig/config.go deleted file mode 100644 index 3280e333782..00000000000 --- a/vendor/github.com/thanos-io/thanos/pkg/httpconfig/config.go +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package httpconfig - -import ( - "fmt" - "net/url" - "strings" - - "gopkg.in/yaml.v2" - - "github.com/pkg/errors" -) - -// Config is a structure that allows pointing to various HTTP endpoint, e.g ruler connecting to queriers. -type Config struct { - HTTPClientConfig ClientConfig `yaml:"http_config"` - EndpointsConfig EndpointsConfig `yaml:",inline"` -} - -func DefaultConfig() Config { - return Config{ - EndpointsConfig: EndpointsConfig{ - Scheme: "http", - StaticAddresses: []string{}, - FileSDConfigs: []FileSDConfig{}, - }, - } -} - -// UnmarshalYAML implements the yaml.Unmarshaler interface. -func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { - *c = DefaultConfig() - type plain Config - return unmarshal((*plain)(c)) -} - -// LoadConfigs loads a list of Config from YAML data. -func LoadConfigs(confYAML []byte) ([]Config, error) { - var queryCfg []Config - if err := yaml.UnmarshalStrict(confYAML, &queryCfg); err != nil { - return nil, err - } - return queryCfg, nil -} - -// BuildConfig returns a configuration from a static addresses. -func BuildConfig(addrs []string) ([]Config, error) { - configs := make([]Config, 0, len(addrs)) - for i, addr := range addrs { - if addr == "" { - return nil, errors.Errorf("static address cannot be empty at index %d", i) - } - // If addr is missing schema, add http. - if !strings.Contains(addr, "://") { - addr = fmt.Sprintf("http://%s", addr) - } - u, err := url.Parse(addr) - if err != nil { - return nil, errors.Wrapf(err, "failed to parse addr %q", addr) - } - if u.Scheme != "http" && u.Scheme != "https" { - return nil, errors.Errorf("%q is not supported scheme for address", u.Scheme) - } - configs = append(configs, Config{ - EndpointsConfig: EndpointsConfig{ - Scheme: u.Scheme, - StaticAddresses: []string{u.Host}, - PathPrefix: u.Path, - }, - }) - } - return configs, nil -} diff --git a/vendor/github.com/thanos-io/thanos/pkg/promclient/promclient.go b/vendor/github.com/thanos-io/thanos/pkg/promclient/promclient.go index 16c292d2f86..30cf85af408 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/promclient/promclient.go +++ b/vendor/github.com/thanos-io/thanos/pkg/promclient/promclient.go @@ -33,8 +33,8 @@ import ( "google.golang.org/grpc/codes" "gopkg.in/yaml.v2" + "github.com/thanos-io/thanos/pkg/clientconfig" "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" - "github.com/thanos-io/thanos/pkg/httpconfig" "github.com/thanos-io/thanos/pkg/metadata/metadatapb" "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/runutil" @@ -85,7 +85,7 @@ func NewClient(c HTTPClient, logger log.Logger, userAgent string) *Client { // NewDefaultClient returns Client with tracing tripperware. func NewDefaultClient() *Client { - client, _ := httpconfig.NewHTTPClient(httpconfig.ClientConfig{}, "") + client, _ := clientconfig.NewHTTPClient(clientconfig.HTTPClientConfig{}, "") return NewWithTracingClient( log.NewNopLogger(), client, diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go index 3ebd6f06a47..fd4fb7392c4 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go @@ -413,6 +413,8 @@ type BucketStore struct { blockEstimatedMaxSeriesFunc BlockEstimator blockEstimatedMaxChunkFunc BlockEstimator + + indexHeaderLazyDownloadStrategy indexheader.LazyDownloadIndexHeaderFunc } func (s *BucketStore) validate() error { @@ -531,6 +533,14 @@ func WithDontResort(true bool) BucketStoreOption { } } +// WithIndexHeaderLazyDownloadStrategy specifies what block to lazy download its index header. +// Only used when lazy mmap is enabled at the same time. +func WithIndexHeaderLazyDownloadStrategy(strategy indexheader.LazyDownloadIndexHeaderFunc) BucketStoreOption { + return func(s *BucketStore) { + s.indexHeaderLazyDownloadStrategy = strategy + } +} + // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. func NewBucketStore( @@ -559,21 +569,22 @@ func NewBucketStore( b := make([]byte, 0, initialBufSize) return &b }}, - chunkPool: pool.NoopBytes{}, - blocks: map[ulid.ULID]*bucketBlock{}, - blockSets: map[uint64]*bucketBlockSet{}, - blockSyncConcurrency: blockSyncConcurrency, - queryGate: gate.NewNoop(), - chunksLimiterFactory: chunksLimiterFactory, - seriesLimiterFactory: seriesLimiterFactory, - bytesLimiterFactory: bytesLimiterFactory, - partitioner: partitioner, - enableCompatibilityLabel: enableCompatibilityLabel, - postingOffsetsInMemSampling: postingOffsetsInMemSampling, - enableSeriesResponseHints: enableSeriesResponseHints, - enableChunkHashCalculation: enableChunkHashCalculation, - seriesBatchSize: SeriesBatchSize, - sortingStrategy: sortingStrategyStore, + chunkPool: pool.NoopBytes{}, + blocks: map[ulid.ULID]*bucketBlock{}, + blockSets: map[uint64]*bucketBlockSet{}, + blockSyncConcurrency: blockSyncConcurrency, + queryGate: gate.NewNoop(), + chunksLimiterFactory: chunksLimiterFactory, + seriesLimiterFactory: seriesLimiterFactory, + bytesLimiterFactory: bytesLimiterFactory, + partitioner: partitioner, + enableCompatibilityLabel: enableCompatibilityLabel, + postingOffsetsInMemSampling: postingOffsetsInMemSampling, + enableSeriesResponseHints: enableSeriesResponseHints, + enableChunkHashCalculation: enableChunkHashCalculation, + seriesBatchSize: SeriesBatchSize, + sortingStrategy: sortingStrategyStore, + indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader, } for _, option := range options { @@ -582,7 +593,7 @@ func NewBucketStore( // Depend on the options indexReaderPoolMetrics := indexheader.NewReaderPoolMetrics(extprom.WrapRegistererWithPrefix("thanos_bucket_store_", s.reg)) - s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics) + s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics, s.indexHeaderLazyDownloadStrategy) s.metrics = newBucketStoreMetrics(s.reg) // TODO(metalmatze): Might be possible via Option too if err := s.validate(); err != nil { @@ -759,6 +770,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er s.dir, meta.ULID, s.postingOffsetsInMemSampling, + meta, ) if err != nil { return errors.Wrap(err, "create index header reader") diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/prometheus.go b/vendor/github.com/thanos-io/thanos/pkg/store/prometheus.go index ea48f7e1a27..197364ca040 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/prometheus.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/prometheus.go @@ -33,9 +33,9 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "github.com/thanos-io/thanos/pkg/clientconfig" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/dedup" - "github.com/thanos-io/thanos/pkg/httpconfig" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/runutil" @@ -578,7 +578,7 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que preq.Header.Set("Content-Type", "application/x-stream-protobuf") preq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") - preq.Header.Set("User-Agent", httpconfig.ThanosUserAgent) + preq.Header.Set("User-Agent", clientconfig.ThanosUserAgent) presp, err = p.client.Do(preq.WithContext(ctx)) if err != nil { return nil, errors.Wrap(err, "send request") diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/prompb/custom.go b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/prompb/custom.go index fb3b395a9a6..5619977da94 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/prompb/custom.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/prompb/custom.go @@ -3,7 +3,19 @@ package prompb +import ( + "github.com/prometheus/prometheus/model/histogram" +) + func (h Histogram) IsFloatHistogram() bool { _, ok := h.GetCount().(*Histogram_CountFloat) return ok } + +func FromProtoHistogram(h Histogram) *histogram.FloatHistogram { + if h.IsFloatHistogram() { + return FloatHistogramProtoToFloatHistogram(h) + } else { + return HistogramProtoToFloatHistogram(h) + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index e8d43e97500..1269212ad5e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -917,13 +917,14 @@ github.com/thanos-io/promql-engine/execution/warnings github.com/thanos-io/promql-engine/extlabels github.com/thanos-io/promql-engine/logicalplan github.com/thanos-io/promql-engine/query -# github.com/thanos-io/thanos v0.32.5-0.20231214182650-88f7119f2166 +# github.com/thanos-io/thanos v0.33.1-0.20231224215600-665e64370a2c ## explicit; go 1.21 github.com/thanos-io/thanos/pkg/block github.com/thanos-io/thanos/pkg/block/indexheader github.com/thanos-io/thanos/pkg/block/metadata github.com/thanos-io/thanos/pkg/cache github.com/thanos-io/thanos/pkg/cacheutil +github.com/thanos-io/thanos/pkg/clientconfig github.com/thanos-io/thanos/pkg/compact github.com/thanos-io/thanos/pkg/compact/downsample github.com/thanos-io/thanos/pkg/component @@ -940,7 +941,6 @@ github.com/thanos-io/thanos/pkg/extkingpin github.com/thanos-io/thanos/pkg/extprom github.com/thanos-io/thanos/pkg/extprom/http github.com/thanos-io/thanos/pkg/gate -github.com/thanos-io/thanos/pkg/httpconfig github.com/thanos-io/thanos/pkg/info/infopb github.com/thanos-io/thanos/pkg/metadata/metadatapb github.com/thanos-io/thanos/pkg/model