From d33f87871af036b51ec747433c6e36205e513453 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Sun, 1 Dec 2024 03:48:41 +0900 Subject: [PATCH] Upgrade thanos version to v0.37.0 (#6385) --- go.mod | 6 +- go.sum | 12 +- .../rulestore/bucketclient/bucket_client.go | 4 +- pkg/storage/bucket/client_mock.go | 10 ++ pkg/storage/bucket/prefixed_bucket_client.go | 9 ++ pkg/storage/bucket/s3/bucket_client.go | 10 ++ pkg/storage/bucket/s3/bucket_client_test.go | 10 ++ pkg/storage/bucket/sse_bucket_client.go | 8 ++ .../tsdb/bucketindex/markers_bucket_client.go | 10 ++ .../thanos-io/objstore/CHANGELOG.md | 2 + .../github.com/thanos-io/objstore/README.md | 23 +++- vendor/github.com/thanos-io/objstore/inmem.go | 14 +++ .../github.com/thanos-io/objstore/objstore.go | 111 ++++++++++++++++-- .../thanos-io/objstore/prefixed_bucket.go | 13 ++ .../objstore/providers/azure/azure.go | 47 +++++++- .../providers/filesystem/filesystem.go | 45 ++++++- .../thanos-io/objstore/providers/gcs/gcs.go | 62 ++++++++-- .../thanos-io/objstore/providers/s3/s3.go | 45 ++++++- .../objstore/providers/swift/swift.go | 18 ++- .../github.com/thanos-io/objstore/testing.go | 15 ++- .../tracing/opentracing/opentracing.go | 12 ++ .../thanos-io/promql-engine/engine/explain.go | 79 ++++++------- .../execution/aggregate/accumulator.go | 104 ++++++++++++++-- .../execution/aggregate/scalar_table.go | 3 + .../execution/function/functions.go | 19 +-- .../execution/function/operator.go | 4 +- .../promql-engine/logicalplan/distribute.go | 8 +- .../promql-engine/ringbuffer/rate.go | 2 + .../storage/prometheus/histograms.go | 66 ----------- .../storage/prometheus/scanners.go | 3 +- .../thanos-io/thanos/pkg/block/fetcher.go | 2 +- .../thanos-io/thanos/pkg/cache/groupcache.go | 2 +- vendor/modules.txt | 8 +- 33 files changed, 594 insertions(+), 192 deletions(-) delete mode 100644 vendor/github.com/thanos-io/promql-engine/storage/prometheus/histograms.go diff --git a/go.mod b/go.mod index cee28a7ef1..6fbc1405f8 100644 --- a/go.mod +++ b/go.mod @@ -51,9 +51,9 @@ require ( github.com/sony/gobreaker v1.0.0 github.com/spf13/afero v1.11.0 github.com/stretchr/testify v1.10.0 - github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d - github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31 - github.com/thanos-io/thanos v0.35.2-0.20241118113652-f998fc59c171 + github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 + github.com/thanos-io/promql-engine v0.0.0-20241106100125-097e6e9f425a + github.com/thanos-io/thanos v0.37.0 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5 go.etcd.io/etcd/api/v3 v3.5.17 diff --git a/go.sum b/go.sum index 6fd960db4a..e32a7f18ee 100644 --- a/go.sum +++ b/go.sum @@ -1684,12 +1684,12 @@ github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4 github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw= github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1Zsv7OAU9iQhZwigp50Yl38W10g/vd5NC8Rdk1Jzng= github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM= -github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d h1:k+SLTP1mjNqXxsCiq4UYeKCe07le0ieffyuHm/YfmH8= -github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d/go.mod h1:/ZMUxFcp/nT6oYV5WslH9k07NU/+86+aibgZRmMMr/4= -github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31 h1:xPaP58g+3EPohdw4cv+6jv5+LcX6LynhHvQcYwTAMxQ= -github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00= -github.com/thanos-io/thanos v0.35.2-0.20241118113652-f998fc59c171 h1:khQQmo9VMMphKm10NS22iI8bAIf4DNRFvLoP4IR7kK4= -github.com/thanos-io/thanos v0.35.2-0.20241118113652-f998fc59c171/go.mod h1:v2AUVRd44iuv/PY0iwzKzgosLusUg7/dnnw5KEe1Has= +github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1DkncwDHFvrpd12/2TLfgYNRmEQA48ikp+0= +github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw= +github.com/thanos-io/promql-engine v0.0.0-20241106100125-097e6e9f425a h1:BhWU58VHOxkxQEMByih9fM2WwuwCGtk5AulIcSRSr0A= +github.com/thanos-io/promql-engine v0.0.0-20241106100125-097e6e9f425a/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00= +github.com/thanos-io/thanos v0.37.0 h1:RCO2pL3S1sU6Wd3ITviWSzouFg8VVzpz92eCJOUEnow= +github.com/thanos-io/thanos v0.37.0/go.mod h1:bkdS9wnItzpNofJjjp2R7Y5Xt5UeA688dLKcjmf7MaA= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/uber/jaeger-client-go v2.28.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o= diff --git a/pkg/ruler/rulestore/bucketclient/bucket_client.go b/pkg/ruler/rulestore/bucketclient/bucket_client.go index c59e1ad47a..049524f500 100644 --- a/pkg/ruler/rulestore/bucketclient/bucket_client.go +++ b/pkg/ruler/rulestore/bucketclient/bucket_client.go @@ -126,7 +126,7 @@ func (b *BucketRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rul Name: group, }) return nil - }, objstore.WithRecursiveIter) + }, objstore.WithRecursiveIter()) if err != nil { return nil, err @@ -163,7 +163,7 @@ func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, Name: group, }) return nil - }, objstore.WithRecursiveIter) + }, objstore.WithRecursiveIter()) if err != nil { return nil, err } diff --git a/pkg/storage/bucket/client_mock.go b/pkg/storage/bucket/client_mock.go index e503a027e7..f323000db2 100644 --- a/pkg/storage/bucket/client_mock.go +++ b/pkg/storage/bucket/client_mock.go @@ -57,6 +57,16 @@ func (m *ClientMock) Name() string { return "mock" } +func (m *ClientMock) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + args := m.Called(ctx, dir, f, options) + return args.Error(0) +} + +func (m *ClientMock) SupportedIterOptions() []objstore.IterOptionType { + args := m.Called() + return args.Get(0).([]objstore.IterOptionType) +} + // Iter mocks objstore.Bucket.Iter() func (m *ClientMock) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { args := m.Called(ctx, dir, f, options) diff --git a/pkg/storage/bucket/prefixed_bucket_client.go b/pkg/storage/bucket/prefixed_bucket_client.go index f6606e654d..ac3ca06ce3 100644 --- a/pkg/storage/bucket/prefixed_bucket_client.go +++ b/pkg/storage/bucket/prefixed_bucket_client.go @@ -44,6 +44,15 @@ func (b *PrefixedBucketClient) Delete(ctx context.Context, name string) error { // Name returns the bucket name for the provider. func (b *PrefixedBucketClient) Name() string { return b.bucket.Name() } +// TODO(Sungjin1212): Implement if needed +func (b *PrefixedBucketClient) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + return b.bucket.IterWithAttributes(ctx, dir, f, options...) +} + +func (b *PrefixedBucketClient) SupportedIterOptions() []objstore.IterOptionType { + return b.bucket.SupportedIterOptions() +} + // Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full // object name including the prefix of the inspected directory. The configured prefix will be stripped // before supplied function is applied. diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go index d65bdfb22e..e83f5a1f79 100644 --- a/pkg/storage/bucket/s3/bucket_client.go +++ b/pkg/storage/bucket/s3/bucket_client.go @@ -149,6 +149,16 @@ func (b *BucketWithRetries) Name() string { return b.bucket.Name() } +func (b *BucketWithRetries) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + return b.retry(ctx, func() error { + return b.bucket.IterWithAttributes(ctx, dir, f, options...) + }, fmt.Sprintf("IterWithAttributes %s", dir)) +} + +func (b *BucketWithRetries) SupportedIterOptions() []objstore.IterOptionType { + return b.bucket.SupportedIterOptions() +} + func (b *BucketWithRetries) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { return b.retry(ctx, func() error { return b.bucket.Iter(ctx, dir, f, options...) diff --git a/pkg/storage/bucket/s3/bucket_client_test.go b/pkg/storage/bucket/s3/bucket_client_test.go index 2454970517..ec757100a0 100644 --- a/pkg/storage/bucket/s3/bucket_client_test.go +++ b/pkg/storage/bucket/s3/bucket_client_test.go @@ -208,6 +208,16 @@ func (m *mockBucket) Name() string { return "mock" } +// IterWithAttributes mocks objstore.Bucket.IterWithAttributes() +func (m *mockBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + return nil +} + +// SupportedIterOptions mocks objstore.Bucket.SupportedIterOptions() +func (m *mockBucket) SupportedIterOptions() []objstore.IterOptionType { + return nil +} + // Iter mocks objstore.Bucket.Iter() func (m *mockBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { return nil diff --git a/pkg/storage/bucket/sse_bucket_client.go b/pkg/storage/bucket/sse_bucket_client.go index b88e25e39c..873b74e74a 100644 --- a/pkg/storage/bucket/sse_bucket_client.go +++ b/pkg/storage/bucket/sse_bucket_client.go @@ -98,6 +98,14 @@ func (b *SSEBucketClient) getCustomS3SSEConfig() (encrypt.ServerSide, error) { return sse, nil } +func (b *SSEBucketClient) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + return b.bucket.IterWithAttributes(ctx, dir, f, options...) +} + +func (b *SSEBucketClient) SupportedIterOptions() []objstore.IterOptionType { + return b.bucket.SupportedIterOptions() +} + // Iter implements objstore.Bucket. func (b *SSEBucketClient) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { return b.bucket.Iter(ctx, dir, f, options...) diff --git a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go index c0dbb6e766..e2271cc393 100644 --- a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go +++ b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go @@ -75,6 +75,16 @@ func (b *globalMarkersBucket) Close() error { return b.parent.Close() } +// IterWithAttributes implements objstore.Bucket. +func (b *globalMarkersBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + return b.parent.IterWithAttributes(ctx, dir, f, options...) +} + +// SupportedIterOptions implements objstore.Bucket. +func (b *globalMarkersBucket) SupportedIterOptions() []objstore.IterOptionType { + return b.parent.SupportedIterOptions() +} + // Iter implements objstore.Bucket. func (b *globalMarkersBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { return b.parent.Iter(ctx, dir, f, options...) diff --git a/vendor/github.com/thanos-io/objstore/CHANGELOG.md b/vendor/github.com/thanos-io/objstore/CHANGELOG.md index d2b1aaabda..f0904faa19 100644 --- a/vendor/github.com/thanos-io/objstore/CHANGELOG.md +++ b/vendor/github.com/thanos-io/objstore/CHANGELOG.md @@ -25,6 +25,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#79](https://github.com/thanos-io/objstore/pull/79) Metrics: Fix `objstore_bucket_operation_duration_seconds` for `iter` operations. ### Added +- [#63](https://github.com/thanos-io/objstore/pull/63) Implement a `IterWithAttributes` method on the bucket client. - [#15](https://github.com/thanos-io/objstore/pull/15) Add Oracle Cloud Infrastructure Object Storage Bucket support. - [#25](https://github.com/thanos-io/objstore/pull/25) S3: Support specifying S3 storage class. - [#32](https://github.com/thanos-io/objstore/pull/32) Swift: Support authentication using application credentials. @@ -53,6 +54,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#116](https://github.com/thanos-io/objstore/pull/116) Azure: Add new storage_create_container configuration property - [#128](https://github.com/thanos-io/objstore/pull/128) GCS: Add support for `ChunkSize` for writer. - [#130](https://github.com/thanos-io/objstore/pull/130) feat: Decouple creating bucket metrics from instrumenting the bucket +- [#147](https://github.com/thanos-io/objstore/pull/147) feat: Add MaxRetries config to cos, gcs and obs. - [#150](https://github.com/thanos-io/objstore/pull/150) Add support for roundtripper wrapper. ### Changed diff --git a/vendor/github.com/thanos-io/objstore/README.md b/vendor/github.com/thanos-io/objstore/README.md index 6d848e7974..d8f5802313 100644 --- a/vendor/github.com/thanos-io/objstore/README.md +++ b/vendor/github.com/thanos-io/objstore/README.md @@ -48,7 +48,7 @@ See [MAINTAINERS.md](https://github.com/thanos-io/thanos/blob/main/MAINTAINERS.m The core this module is the [`Bucket` interface](objstore.go): -```go mdox-exec="sed -n '37,50p' objstore.go" +```go mdox-exec="sed -n '39,55p' objstore.go" // Bucket provides read and write access to an object storage bucket. // NOTE: We assume strong consistency for write-read flow. type Bucket interface { @@ -63,18 +63,31 @@ type Bucket interface { // If object does not exist in the moment of deletion, Delete should throw error. Delete(ctx context.Context, name string) error + // Name returns the bucket name for the provider. + Name() string +} ``` All [provider implementations](providers) have to implement `Bucket` interface that allows common read and write operations that all supported by all object providers. If you want to limit the code that will do bucket operation to only read access (smart idea, allowing to limit access permissions), you can use the [`BucketReader` interface](objstore.go): -```go mdox-exec="sed -n '68,93p' objstore.go" - +```go mdox-exec="sed -n '71,106p' objstore.go" // BucketReader provides read access to an object storage bucket. type BucketReader interface { // Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full // object name including the prefix of the inspected directory. + // Entries are passed to function in sorted order. - Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error + Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error + + // IterWithAttributes calls f for each entry in the given directory similar to Iter. + // In addition to Name, it also includes requested object attributes in the argument to f. + // + // Attributes can be requested using IterOption. + // Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported. + IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error + + // SupportedIterOptions returns a list of supported IterOptions by the underlying provider. + SupportedIterOptions() []IterOptionType // Get returns a reader for the given object name. Get(ctx context.Context, name string) (io.ReadCloser, error) @@ -374,6 +387,7 @@ config: server_name: "" insecure_skip_verify: false disable_compression: false + chunk_size_bytes: 0 prefix: "" ``` @@ -447,6 +461,7 @@ config: storage_account: "" storage_account_key: "" storage_connection_string: "" + storage_create_container: false container: "" endpoint: "" user_assigned_id: "" diff --git a/vendor/github.com/thanos-io/objstore/inmem.go b/vendor/github.com/thanos-io/objstore/inmem.go index ed256c9cd9..d550e283ce 100644 --- a/vendor/github.com/thanos-io/objstore/inmem.go +++ b/vendor/github.com/thanos-io/objstore/inmem.go @@ -106,6 +106,20 @@ func (b *InMemBucket) Iter(_ context.Context, dir string, f func(string) error, return nil } +func (i *InMemBucket) SupportedIterOptions() []IterOptionType { + return []IterOptionType{Recursive} +} + +func (b *InMemBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error { + if err := ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + + return b.Iter(ctx, dir, func(name string) error { + return f(IterObjectAttributes{Name: name}) + }, options...) +} + // Get returns a reader for the given object name. func (b *InMemBucket) Get(_ context.Context, name string) (io.ReadCloser, error) { if name == "" { diff --git a/vendor/github.com/thanos-io/objstore/objstore.go b/vendor/github.com/thanos-io/objstore/objstore.go index 62f1c655db..33c6e5e867 100644 --- a/vendor/github.com/thanos-io/objstore/objstore.go +++ b/vendor/github.com/thanos-io/objstore/objstore.go @@ -6,11 +6,13 @@ package objstore import ( "bytes" "context" + "fmt" "io" "io/fs" "os" "path" "path/filepath" + "slices" "strings" "sync" "time" @@ -70,8 +72,19 @@ type InstrumentedBucket interface { type BucketReader interface { // Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full // object name including the prefix of the inspected directory. + // Entries are passed to function in sorted order. - Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error + Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error + + // IterWithAttributes calls f for each entry in the given directory similar to Iter. + // In addition to Name, it also includes requested object attributes in the argument to f. + // + // Attributes can be requested using IterOption. + // Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported. + IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error + + // SupportedIterOptions returns a list of supported IterOptions by the underlying provider. + SupportedIterOptions() []IterOptionType // Get returns a reader for the given object name. Get(ctx context.Context, name string) (io.ReadCloser, error) @@ -101,24 +114,66 @@ type InstrumentedBucketReader interface { ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader } +var ErrOptionNotSupported = errors.New("iter option is not supported") + +// IterOptionType is used for type-safe option support checking. +type IterOptionType int + +const ( + Recursive IterOptionType = iota + UpdatedAt +) + // IterOption configures the provided params. -type IterOption func(params *IterParams) +type IterOption struct { + Type IterOptionType + Apply func(params *IterParams) +} // WithRecursiveIter is an option that can be applied to Iter() to recursively list objects // in the bucket. -func WithRecursiveIter(params *IterParams) { - params.Recursive = true +func WithRecursiveIter() IterOption { + return IterOption{ + Type: Recursive, + Apply: func(params *IterParams) { + params.Recursive = true + }, + } +} + +// WithUpdatedAt is an option that can be applied to Iter() to +// include the last modified time in the attributes. +// NB: Prefixes may not report last modified time. +// This option is currently supported for the azure, s3, bos, gcs and filesystem providers. +func WithUpdatedAt() IterOption { + return IterOption{ + Type: UpdatedAt, + Apply: func(params *IterParams) { + params.LastModified = true + }, + } } // IterParams holds the Iter() parameters and is used by objstore clients implementations. type IterParams struct { - Recursive bool + Recursive bool + LastModified bool +} + +func ValidateIterOptions(supportedOptions []IterOptionType, options ...IterOption) error { + for _, opt := range options { + if !slices.Contains(supportedOptions, opt.Type) { + return fmt.Errorf("%w: %v", ErrOptionNotSupported, opt.Type) + } + } + + return nil } func ApplyIterOptions(options ...IterOption) IterParams { out := IterParams{} for _, opt := range options { - opt(&out) + opt.Apply(&out) } return out } @@ -189,6 +244,20 @@ type ObjectAttributes struct { LastModified time.Time `json:"last_modified"` } +type IterObjectAttributes struct { + Name string + lastModified time.Time +} + +func (i *IterObjectAttributes) SetLastModified(t time.Time) { + i.lastModified = t +} + +// LastModified returns the timestamp the object was last modified. Returns false if the timestamp is not available. +func (i *IterObjectAttributes) LastModified() (time.Time, bool) { + return i.lastModified, !i.lastModified.IsZero() +} + // TryToGetSize tries to get upfront size from reader. // Some implementations may return only size of unread data in the reader, so it's best to call this method before // doing any reading. @@ -211,6 +280,8 @@ func TryToGetSize(r io.Reader) (int64, error) { return f.Size(), nil case ObjectSizer: return f.ObjectSize() + case *io.LimitedReader: + return f.N, nil } return 0, errors.Errorf("unsupported type of io.Reader: %T", r) } @@ -531,21 +602,43 @@ func (b *metricBucket) ReaderWithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket return b.WithExpectedErrs(fn) } -func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error { +func (b *metricBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error { const op = OpIter b.metrics.ops.WithLabelValues(op).Inc() - start := time.Now() + timer := prometheus.NewTimer(b.metrics.opsDuration.WithLabelValues(op)) + defer timer.ObserveDuration() + err := b.bkt.Iter(ctx, dir, f, options...) if err != nil { if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled { b.metrics.opsFailures.WithLabelValues(op).Inc() } } - b.metrics.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds()) return err } +func (b *metricBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error { + const op = OpIter + b.metrics.ops.WithLabelValues(op).Inc() + + timer := prometheus.NewTimer(b.metrics.opsDuration.WithLabelValues(op)) + defer timer.ObserveDuration() + + err := b.bkt.IterWithAttributes(ctx, dir, f, options...) + if err != nil { + if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled { + b.metrics.opsFailures.WithLabelValues(op).Inc() + } + } + + return err +} + +func (b *metricBucket) SupportedIterOptions() []IterOptionType { + return b.bkt.SupportedIterOptions() +} + func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) { const op = OpAttributes b.metrics.ops.WithLabelValues(op).Inc() diff --git a/vendor/github.com/thanos-io/objstore/prefixed_bucket.go b/vendor/github.com/thanos-io/objstore/prefixed_bucket.go index f2b7143468..a76b34c360 100644 --- a/vendor/github.com/thanos-io/objstore/prefixed_bucket.go +++ b/vendor/github.com/thanos-io/objstore/prefixed_bucket.go @@ -54,6 +54,19 @@ func (p *PrefixedBucket) Iter(ctx context.Context, dir string, f func(string) er }, options...) } +func (p *PrefixedBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error { + pdir := withPrefix(p.prefix, dir) + + return p.bkt.IterWithAttributes(ctx, pdir, func(attrs IterObjectAttributes) error { + attrs.Name = strings.TrimPrefix(attrs.Name, p.prefix+DirDelim) + return f(attrs) + }, options...) +} + +func (p *PrefixedBucket) SupportedIterOptions() []IterOptionType { + return p.bkt.SupportedIterOptions() +} + // Get returns a reader for the given object name. func (p *PrefixedBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { return p.bkt.Get(ctx, conditionalPrefix(p.prefix, name)) diff --git a/vendor/github.com/thanos-io/objstore/providers/azure/azure.go b/vendor/github.com/thanos-io/objstore/providers/azure/azure.go index e125ca3511..5689dc62b7 100644 --- a/vendor/github.com/thanos-io/objstore/providers/azure/azure.go +++ b/vendor/github.com/thanos-io/objstore/providers/azure/azure.go @@ -193,9 +193,15 @@ func NewBucketWithConfig(logger log.Logger, conf Config, component string, wrapR return bkt, nil } -// Iter calls f for each entry in the given directory. The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + prefix := dir if prefix != "" && !strings.HasSuffix(prefix, DirDelim) { prefix += DirDelim @@ -211,7 +217,13 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return err } for _, blob := range resp.Segment.BlobItems { - if err := f(*blob.Name); err != nil { + attrs := objstore.IterObjectAttributes{ + Name: *blob.Name, + } + if params.LastModified { + attrs.SetLastModified(*blob.Properties.LastModified) + } + if err := f(attrs); err != nil { return err } } @@ -227,12 +239,18 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return err } for _, blobItem := range resp.Segment.BlobItems { - if err := f(*blobItem.Name); err != nil { + attrs := objstore.IterObjectAttributes{ + Name: *blobItem.Name, + } + if params.LastModified { + attrs.SetLastModified(*blobItem.Properties.LastModified) + } + if err := f(attrs); err != nil { return err } } for _, blobPrefix := range resp.Segment.BlobPrefixes { - if err := f(*blobPrefix.Name); err != nil { + if err := f(objstore.IterObjectAttributes{Name: *blobPrefix.Name}); err != nil { return err } } @@ -240,6 +258,23 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return nil } +// Iter calls f for each entry in the given directory. The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + // IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. func (b *Bucket) IsObjNotFoundErr(err error) bool { if err == nil { diff --git a/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go b/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go index 2ed42ee8b6..01dca4bbd3 100644 --- a/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go +++ b/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go @@ -50,13 +50,19 @@ func NewBucket(rootDir string) (*Bucket, error) { return &Bucket{rootDir: absDir}, nil } -// Iter calls f for each entry in the given directory. The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { if ctx.Err() != nil { return ctx.Err() } + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + params := objstore.ApplyIterOptions(options...) absDir := filepath.Join(b.rootDir, dir) info, err := os.Stat(absDir) @@ -92,7 +98,7 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt if params.Recursive { // Recursively list files in the subdirectory. - if err := b.Iter(ctx, name, f, options...); err != nil { + if err := b.IterWithAttributes(ctx, name, f, options...); err != nil { return err } @@ -101,13 +107,42 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt continue } } - if err := f(name); err != nil { + + attrs := objstore.IterObjectAttributes{ + Name: name, + } + if params.LastModified { + absPath := filepath.Join(absDir, file.Name()) + stat, err := os.Stat(absPath) + if err != nil { + return errors.Wrapf(err, "stat %s", name) + } + attrs.SetLastModified(stat.ModTime()) + } + if err := f(attrs); err != nil { return err } } return nil } +// Iter calls f for each entry in the given directory. The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + // Get returns a reader for the given object name. func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { return b.GetRange(ctx, name, 0, -1) diff --git a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go index efb208e60e..d54a6782f4 100644 --- a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go +++ b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go @@ -54,6 +54,11 @@ type Config struct { // Used as storage.Writer.ChunkSize of https://pkg.go.dev/google.golang.org/cloud/storage#Writer ChunkSizeBytes int `yaml:"chunk_size_bytes"` noAuth bool `yaml:"no_auth"` + + // MaxRetries controls the number of retries for idempotent operations. + // Overrides the default gcs storage client behavior if this value is greater than 0. + // Set this to 1 to disable retries. + MaxRetries int `yaml:"max_retries"` } // Bucket implements the store.Bucket and shipper.Bucket interfaces against GCS. @@ -173,6 +178,11 @@ func newBucket(ctx context.Context, logger log.Logger, gc Config, opts []option. name: gc.Bucket, chunkSize: gc.ChunkSizeBytes, } + + if gc.MaxRetries > 0 { + bkt.bkt = bkt.bkt.Retryer(storage.WithMaxAttempts(gc.MaxRetries)) + } + return bkt, nil } @@ -181,18 +191,26 @@ func (b *Bucket) Name() string { return b.name } -// Iter calls f for each entry in the given directory. The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the // object itself as one prefix item. if dir != "" { dir = strings.TrimSuffix(dir, DirDelim) + DirDelim } + appliedOpts := objstore.ApplyIterOptions(options...) + // If recursive iteration is enabled we should pass an empty delimiter. delimiter := DirDelim - if objstore.ApplyIterOptions(options...).Recursive { + if appliedOpts.Recursive { delimiter = "" } @@ -200,11 +218,15 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt Prefix: dir, Delimiter: delimiter, } - err := query.SetAttrSelection([]string{"Name"}) - if err != nil { - return err + if appliedOpts.LastModified { + if err := query.SetAttrSelection([]string{"Name", "Updated"}); err != nil { + return err + } + } else { + if err := query.SetAttrSelection([]string{"Name"}); err != nil { + return err + } } - it := b.bkt.Objects(ctx, query) for { select { @@ -219,12 +241,34 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt if err != nil { return err } - if err := f(attrs.Prefix + attrs.Name); err != nil { + + objAttrs := objstore.IterObjectAttributes{Name: attrs.Prefix + attrs.Name} + if appliedOpts.LastModified { + objAttrs.SetLastModified(attrs.Updated) + } + if err := f(objAttrs); err != nil { return err } } } +// Iter calls f for each entry in the given directory. The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + // Get returns a reader for the given object name. func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { r, err := b.bkt.Object(name).NewReader(ctx) diff --git a/vendor/github.com/thanos-io/objstore/providers/s3/s3.go b/vendor/github.com/thanos-io/objstore/providers/s3/s3.go index 8e5b8b5640..27e82ffbaf 100644 --- a/vendor/github.com/thanos-io/objstore/providers/s3/s3.go +++ b/vendor/github.com/thanos-io/objstore/providers/s3/s3.go @@ -136,6 +136,7 @@ type Config struct { PartSize uint64 `yaml:"part_size"` SSEConfig SSEConfig `yaml:"sse_config"` STSEndpoint string `yaml:"sts_endpoint"` + MaxRetries int `yaml:"max_retries"` } // SSEConfig deals with the configuration of SSE for Minio. The following options are valid: @@ -263,6 +264,7 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string, wra Region: config.Region, Transport: tpt, BucketLookup: config.BucketLookupType.MinioType(), + MaxRetries: config.MaxRetries, }) if err != nil { return nil, errors.Wrap(err, "initialize s3 client") @@ -387,18 +389,26 @@ func ValidateForTests(conf Config) error { return nil } -// Iter calls f for each entry in the given directory. The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the // object itself as one prefix item. if dir != "" { dir = strings.TrimSuffix(dir, DirDelim) + DirDelim } + appliedOpts := objstore.ApplyIterOptions(options...) + opts := minio.ListObjectsOptions{ Prefix: dir, - Recursive: objstore.ApplyIterOptions(options...).Recursive, + Recursive: appliedOpts.Recursive, UseV1: b.listObjectsV1, } @@ -415,7 +425,15 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt if object.Key == dir { continue } - if err := f(object.Key); err != nil { + + attr := objstore.IterObjectAttributes{ + Name: object.Key, + } + if appliedOpts.LastModified { + attr.SetLastModified(object.LastModified) + } + + if err := f(attr); err != nil { return err } } @@ -423,6 +441,21 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return ctx.Err() } +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { sse, err := b.getServerSideEncryption(ctx) if err != nil { @@ -629,7 +662,7 @@ func NewTestBucketFromConfig(t testing.TB, location string, c Config, reuseBucke bktToCreate := c.Bucket if c.Bucket != "" && reuseBucket { - if err := b.Iter(ctx, "", func(f string) error { + if err := b.Iter(ctx, "", func(string) error { return errors.Errorf("bucket %s is not empty", c.Bucket) }); err != nil { return nil, nil, errors.Wrapf(err, "s3 check bucket %s", c.Bucket) diff --git a/vendor/github.com/thanos-io/objstore/providers/swift/swift.go b/vendor/github.com/thanos-io/objstore/providers/swift/swift.go index e872728e4d..86caa0c1ed 100644 --- a/vendor/github.com/thanos-io/objstore/providers/swift/swift.go +++ b/vendor/github.com/thanos-io/objstore/providers/swift/swift.go @@ -21,6 +21,7 @@ import ( "github.com/ncw/swift" "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/exthttp" "gopkg.in/yaml.v2" @@ -222,9 +223,13 @@ func (c *Container) Name() string { return c.name } +func (c *Container) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive} +} + // Iter calls f for each entry in the given directory. The argument to f is the full // object name including the prefix of the inspected directory. -func (c *Container) Iter(_ context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (c *Container) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { if dir != "" { dir = strings.TrimSuffix(dir, string(DirDelim)) + string(DirDelim) } @@ -242,6 +247,7 @@ func (c *Container) Iter(_ context.Context, dir string, f func(string) error, op if err != nil { return objects, errors.Wrap(err, "list object names") } + for _, object := range objects { if object == SegmentsDir { continue @@ -254,6 +260,16 @@ func (c *Container) Iter(_ context.Context, dir string, f func(string) error, op }) } +func (c *Container) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(c.SupportedIterOptions(), options...); err != nil { + return err + } + + return c.Iter(ctx, dir, func(name string) error { + return f(objstore.IterObjectAttributes{Name: name}) + }, options...) +} + func (c *Container) get(name string, headers swift.Headers, checkHash bool) (io.ReadCloser, error) { if name == "" { return nil, errors.New("object name cannot be empty") diff --git a/vendor/github.com/thanos-io/objstore/testing.go b/vendor/github.com/thanos-io/objstore/testing.go index 28cbd65889..d3fa1def44 100644 --- a/vendor/github.com/thanos-io/objstore/testing.go +++ b/vendor/github.com/thanos-io/objstore/testing.go @@ -195,7 +195,7 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { testutil.Ok(t, bkt.Iter(ctx, "", func(fn string) error { seen = append(seen, fn) return nil - }, WithRecursiveIter)) + }, WithRecursiveIter())) expected = []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some", "id1/sub/subobj_1.some", "id1/sub/subobj_2.some", "id2/obj_4.some", "obj_5.some"} sort.Strings(expected) sort.Strings(seen) @@ -214,7 +214,7 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { testutil.Ok(t, bkt.Iter(ctx, "id1/", func(fn string) error { seen = append(seen, fn) return nil - }, WithRecursiveIter)) + }, WithRecursiveIter())) testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some", "id1/sub/subobj_1.some", "id1/sub/subobj_2.some"}, seen) // Can we iter over items from id1 dir? @@ -230,7 +230,7 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { testutil.Ok(t, bkt.Iter(ctx, "id1", func(fn string) error { seen = append(seen, fn) return nil - }, WithRecursiveIter)) + }, WithRecursiveIter())) testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some", "id1/sub/subobj_1.some", "id1/sub/subobj_2.some"}, seen) // Can we iter over items from not existing dir? @@ -295,6 +295,15 @@ func (d *delayingBucket) Iter(ctx context.Context, dir string, f func(string) er return d.bkt.Iter(ctx, dir, f, options...) } +func (d *delayingBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error { + time.Sleep(d.delay) + return d.bkt.IterWithAttributes(ctx, dir, f, options...) +} + +func (d *delayingBucket) SupportedIterOptions() []IterOptionType { + return d.bkt.SupportedIterOptions() +} + func (d *delayingBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { time.Sleep(d.delay) return d.bkt.GetRange(ctx, name, off, length) diff --git a/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go b/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go index 0a26ceeb66..cabe07b2cf 100644 --- a/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go +++ b/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go @@ -52,6 +52,18 @@ func (t TracingBucket) Iter(ctx context.Context, dir string, f func(string) erro return } +func (t TracingBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) (err error) { + doWithSpan(ctx, "bucket_iter_with_attrs", func(spanCtx context.Context, span opentracing.Span) { + span.LogKV("dir", dir) + err = t.bkt.IterWithAttributes(spanCtx, dir, f, options...) + }) + return +} + +func (t TracingBucket) SupportedIterOptions() []objstore.IterOptionType { + return t.bkt.SupportedIterOptions() +} + func (t TracingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { span, spanCtx := startSpan(ctx, "bucket_get") span.LogKV("name", name) diff --git a/vendor/github.com/thanos-io/promql-engine/engine/explain.go b/vendor/github.com/thanos-io/promql-engine/engine/explain.go index 13f69404db..e1b42634bd 100644 --- a/vendor/github.com/thanos-io/promql-engine/engine/explain.go +++ b/vendor/github.com/thanos-io/promql-engine/engine/explain.go @@ -4,6 +4,8 @@ package engine import ( + "sync" + "github.com/prometheus/prometheus/promql" "github.com/thanos-io/promql-engine/execution/model" @@ -18,7 +20,12 @@ type ExplainableQuery interface { type AnalyzeOutputNode struct { OperatorTelemetry model.OperatorTelemetry `json:"telemetry,omitempty"` - Children []AnalyzeOutputNode `json:"children,omitempty"` + Children []*AnalyzeOutputNode `json:"children,omitempty"` + + once sync.Once + totalSamples int64 + peakSamples int64 + totalSamplesPerStep []int64 } type ExplainOutputNode struct { @@ -29,60 +36,48 @@ type ExplainOutputNode struct { var _ ExplainableQuery = &compatibilityQuery{} func (a *AnalyzeOutputNode) TotalSamples() int64 { - var total int64 - if a.OperatorTelemetry.Samples() != nil { - total += a.OperatorTelemetry.Samples().TotalSamples - } - if a.OperatorTelemetry.SubQuery() { - // Returning here to avoid double counting samples from children of subquery. - return total - } - - for _, child := range a.Children { - c := child.TotalSamples() - if c > 0 { - total += child.TotalSamples() - } - } - - return total + a.aggregateSamples() + return a.totalSamples } func (a *AnalyzeOutputNode) TotalSamplesPerStep() []int64 { - if a.OperatorTelemetry.Samples() == nil { - return []int64{} - } - - total := a.OperatorTelemetry.Samples().TotalSamplesPerStep - for _, child := range a.Children { - for i, s := range child.TotalSamplesPerStep() { - total[i] += s - } - } - - return total + a.aggregateSamples() + return a.totalSamplesPerStep } func (a *AnalyzeOutputNode) PeakSamples() int64 { - var peak int64 - if a.OperatorTelemetry.Samples() != nil { - peak = int64(a.OperatorTelemetry.Samples().PeakSamples) - } - for _, child := range a.Children { - childPeak := child.PeakSamples() - if childPeak > peak { - peak = childPeak + a.aggregateSamples() + return a.peakSamples +} + +func (a *AnalyzeOutputNode) aggregateSamples() { + a.once.Do(func() { + if nodeSamples := a.OperatorTelemetry.Samples(); nodeSamples != nil { + a.totalSamples += nodeSamples.TotalSamples + a.peakSamples += int64(nodeSamples.PeakSamples) + a.totalSamplesPerStep = nodeSamples.TotalSamplesPerStep } - } - return peak + + for _, child := range a.Children { + childPeak := child.PeakSamples() + a.peakSamples = max(a.peakSamples, childPeak) + for i, s := range child.TotalSamplesPerStep() { + a.totalSamplesPerStep[i] += s + } + // Aggregate only if the node is not a subquery to avoid double counting samples from children. + if !a.OperatorTelemetry.SubQuery() { + a.totalSamples += child.TotalSamples() + } + } + }) } func analyzeQuery(obsv model.ObservableVectorOperator) *AnalyzeOutputNode { children := obsv.Explain() - var childTelemetry []AnalyzeOutputNode + var childTelemetry []*AnalyzeOutputNode for _, child := range children { if obsChild, ok := child.(model.ObservableVectorOperator); ok { - childTelemetry = append(childTelemetry, *analyzeQuery(obsChild)) + childTelemetry = append(childTelemetry, analyzeQuery(obsChild)) } } diff --git a/vendor/github.com/thanos-io/promql-engine/execution/aggregate/accumulator.go b/vendor/github.com/thanos-io/promql-engine/execution/aggregate/accumulator.go index 5ce536ce49..25e4a3886c 100644 --- a/vendor/github.com/thanos-io/promql-engine/execution/aggregate/accumulator.go +++ b/vendor/github.com/thanos-io/promql-engine/execution/aggregate/accumulator.go @@ -290,13 +290,39 @@ type avgAcc struct { avg float64 count int64 hasValue bool + + histSum *histogram.FloatHistogram + histScratch *histogram.FloatHistogram + histSumScratch *histogram.FloatHistogram + histCount float64 } func newAvgAcc() *avgAcc { return &avgAcc{} } -func (a *avgAcc) Add(v float64, _ *histogram.FloatHistogram) error { +func (a *avgAcc) Add(v float64, h *histogram.FloatHistogram) error { + if h != nil { + a.histCount++ + if a.histSum == nil { + a.histSum = h.Copy() + a.histScratch = &histogram.FloatHistogram{} + a.histSumScratch = &histogram.FloatHistogram{} + return nil + } + + h.CopyTo(a.histScratch) + left := a.histScratch.Div(a.histCount) + a.histSum.CopyTo(a.histSumScratch) + right := a.histSumScratch.Div(a.histCount) + toAdd, err := left.Sub(right) + if err != nil { + return err + } + a.histSum, err = a.histSum.Add(toAdd) + return err + } + a.count++ if !a.hasValue { a.hasValue = true @@ -327,29 +353,43 @@ func (a *avgAcc) Add(v float64, _ *histogram.FloatHistogram) error { return nil } -func (a *avgAcc) AddVector(vs []float64, _ []*histogram.FloatHistogram) error { +func (a *avgAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) error { for _, v := range vs { if err := a.Add(v, nil); err != nil { return err } } + for _, h := range hs { + if err := a.Add(0, h); err != nil { + return err + } + } return nil } func (a *avgAcc) Value() (float64, *histogram.FloatHistogram) { - return a.avg, nil + return a.avg, a.histSum } -func (c *avgAcc) ValueType() ValueType { - if c.hasValue { +func (a *avgAcc) ValueType() ValueType { + hasFloat := a.count > 0 + hasHist := a.histCount > 0 + + if hasFloat && hasHist { + return MixedTypeValue + } + if hasFloat || hasHist { return SingleTypeValue - } else { - return NoValue } + return NoValue } + func (a *avgAcc) Reset(_ float64) { a.hasValue = false a.count = 0 + + a.histCount = 0 + a.histSum = nil } type statAcc struct { @@ -447,6 +487,56 @@ func (q *quantileAcc) Reset(f float64) { q.points = q.points[:0] } +type histogramAvg struct { + sum *histogram.FloatHistogram + count int64 + hasFloat bool +} + +func newHistogramAvg() *histogramAvg { + return &histogramAvg{ + sum: &histogram.FloatHistogram{}, + } +} + +func (acc *histogramAvg) Add(v float64, h *histogram.FloatHistogram) error { + if h == nil { + acc.hasFloat = true + } + if acc.count == 0 { + h.CopyTo(acc.sum) + } + var err error + if h.Schema >= acc.sum.Schema { + if acc.sum, err = acc.sum.Add(h); err != nil { + return err + } + } else { + t := h.Copy() + if _, err = t.Add(acc.sum); err != nil { + return err + } + acc.sum = t + } + acc.count++ + return nil +} + +func (acc *histogramAvg) Value() (float64, *histogram.FloatHistogram) { + return 0, acc.sum.Mul(1 / float64(acc.count)) +} + +func (acc *histogramAvg) ValueType() ValueType { + if acc.count > 0 && !acc.hasFloat { + return SingleTypeValue + } + return NoValue +} + +func (acc *histogramAvg) Reset(f float64) { + acc.count = 0 +} + // SumCompensated returns the sum of the elements of the slice calculated with greater // accuracy than Sum at the expense of additional computation. func SumCompensated(s []float64) float64 { diff --git a/vendor/github.com/thanos-io/promql-engine/execution/aggregate/scalar_table.go b/vendor/github.com/thanos-io/promql-engine/execution/aggregate/scalar_table.go index 28e6d82e2a..401db8b5a4 100644 --- a/vendor/github.com/thanos-io/promql-engine/execution/aggregate/scalar_table.go +++ b/vendor/github.com/thanos-io/promql-engine/execution/aggregate/scalar_table.go @@ -194,7 +194,10 @@ func newScalarAccumulator(expr parser.ItemType) (accumulator, error) { return newStdVarAcc(), nil case "quantile": return newQuantileAcc(), nil + case "histogram_avg": + return newHistogramAvg(), nil } + msg := fmt.Sprintf("unknown aggregation function %s", t) return nil, errors.Wrap(parse.ErrNotSupportedExpr, msg) } diff --git a/vendor/github.com/thanos-io/promql-engine/execution/function/functions.go b/vendor/github.com/thanos-io/promql-engine/execution/function/functions.go index 5f1fa082e7..a26546ddc8 100644 --- a/vendor/github.com/thanos-io/promql-engine/execution/function/functions.go +++ b/vendor/github.com/thanos-io/promql-engine/execution/function/functions.go @@ -4,15 +4,11 @@ package function import ( - "fmt" "math" "time" - "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/promql/parser" - - "github.com/thanos-io/promql-engine/execution/parse" ) type functionCall func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) @@ -121,6 +117,12 @@ var instantVectorFuncs = map[string]functionCall{ } return h.Count, true }, + "histogram_avg": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { + if h == nil { + return 0., false + } + return h.Sum / h.Count, true + }, "histogram_fraction": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { if h == nil || len(vargs) != 2 { return 0., false @@ -274,12 +276,3 @@ func IsExtFunction(functionName string) bool { _, ok := XFunctions[functionName] return ok } - -func UnknownFunctionError(name string) error { - msg := fmt.Sprintf("unknown function: %s", name) - if _, ok := parser.Functions[name]; ok { - return errors.Wrap(parse.ErrNotImplemented, msg) - } - - return errors.Wrap(parse.ErrNotSupportedExpr, msg) -} diff --git a/vendor/github.com/thanos-io/promql-engine/execution/function/operator.go b/vendor/github.com/thanos-io/promql-engine/execution/function/operator.go index 533e370e2e..1741d83919 100644 --- a/vendor/github.com/thanos-io/promql-engine/execution/function/operator.go +++ b/vendor/github.com/thanos-io/promql-engine/execution/function/operator.go @@ -47,7 +47,7 @@ func NewFunctionOperator(funcExpr *logicalplan.FunctionCall, nextOps []model.Vec func newNoArgsFunctionOperator(funcExpr *logicalplan.FunctionCall, stepsBatch int, opts *query.Options) (model.VectorOperator, error) { call, ok := noArgFuncs[funcExpr.Func.Name] if !ok { - return nil, UnknownFunctionError(funcExpr.Func.Name) + return nil, parse.UnknownFunctionError(funcExpr.Func.Name) } interval := opts.Step.Milliseconds() @@ -98,7 +98,7 @@ type functionOperator struct { func newInstantVectorFunctionOperator(funcExpr *logicalplan.FunctionCall, nextOps []model.VectorOperator, stepsBatch int, opts *query.Options) (model.VectorOperator, error) { call, ok := instantVectorFuncs[funcExpr.Func.Name] if !ok { - return nil, UnknownFunctionError(funcExpr.Func.Name) + return nil, parse.UnknownFunctionError(funcExpr.Func.Name) } scalarPoints := make([][]float64, stepsBatch) diff --git a/vendor/github.com/thanos-io/promql-engine/logicalplan/distribute.go b/vendor/github.com/thanos-io/promql-engine/logicalplan/distribute.go index ae786a6975..b19d3d9c82 100644 --- a/vendor/github.com/thanos-io/promql-engine/logicalplan/distribute.go +++ b/vendor/github.com/thanos-io/promql-engine/logicalplan/distribute.go @@ -494,7 +494,13 @@ func isDistributive(expr *Node, skipBinaryPushdown bool, engineLabels map[string case Deduplicate, RemoteExecution: return false case *Binary: - return isBinaryExpressionWithOneScalarSide(e) || (!skipBinaryPushdown && isBinaryExpressionWithDistributableMatching(e, engineLabels)) + if isBinaryExpressionWithOneScalarSide(e) { + return true + } + return !skipBinaryPushdown && + isBinaryExpressionWithDistributableMatching(e, engineLabels) && + isDistributive(&e.LHS, skipBinaryPushdown, engineLabels, warns) && + isDistributive(&e.RHS, skipBinaryPushdown, engineLabels, warns) case *Aggregation: // Certain aggregations are currently not supported. if _, ok := distributiveAggregations[e.Op]; !ok { diff --git a/vendor/github.com/thanos-io/promql-engine/ringbuffer/rate.go b/vendor/github.com/thanos-io/promql-engine/ringbuffer/rate.go index caeb334366..550e21b7cc 100644 --- a/vendor/github.com/thanos-io/promql-engine/ringbuffer/rate.go +++ b/vendor/github.com/thanos-io/promql-engine/ringbuffer/rate.go @@ -5,6 +5,7 @@ package ringbuffer import ( "math" + "slices" "github.com/prometheus/prometheus/model/histogram" @@ -177,6 +178,7 @@ func (r *RateBuffer) Eval(_ float64, _ *int64) (float64, *histogram.FloatHistogr r.resets...), r.last, ) + r.rateBuffer = slices.CompactFunc(r.rateBuffer, func(s1 Sample, s2 Sample) bool { return s1.T == s2.T }) numSamples := r.stepRanges[0].numSamples f, h, err := extrapolatedRate(r.rateBuffer, numSamples, r.isCounter, r.isRate, r.evalTs, r.selectRange, r.offset) return f, h, true, err diff --git a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/histograms.go b/vendor/github.com/thanos-io/promql-engine/storage/prometheus/histograms.go deleted file mode 100644 index 769b7de59f..0000000000 --- a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/histograms.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (c) The Thanos Community Authors. -// Licensed under the Apache License 2.0. - -package prometheus - -import ( - "github.com/prometheus/prometheus/model/histogram" - "github.com/prometheus/prometheus/model/value" - "github.com/prometheus/prometheus/tsdb/chunkenc" -) - -type histogramStatsIterator struct { - chunkenc.Iterator - hReader *histogram.Histogram - fhReader *histogram.FloatHistogram -} - -func NewHistogramStatsIterator(it chunkenc.Iterator) chunkenc.Iterator { - return histogramStatsIterator{ - Iterator: it, - hReader: &histogram.Histogram{}, - fhReader: &histogram.FloatHistogram{}, - } -} - -func (f histogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { - var t int64 - t, f.hReader = f.Iterator.AtHistogram(f.hReader) - if value.IsStaleNaN(f.hReader.Sum) { - return t, &histogram.Histogram{Sum: f.hReader.Sum} - } - - if h == nil { - return t, &histogram.Histogram{ - CounterResetHint: f.hReader.CounterResetHint, - Count: f.hReader.Count, - Sum: f.hReader.Sum, - } - } - - h.CounterResetHint = f.hReader.CounterResetHint - h.Count = f.hReader.Count - h.Sum = f.hReader.Sum - return t, h -} - -func (f histogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { - var t int64 - t, f.fhReader = f.Iterator.AtFloatHistogram(f.fhReader) - if value.IsStaleNaN(f.fhReader.Sum) { - return t, &histogram.FloatHistogram{Sum: f.fhReader.Sum} - } - - if fh == nil { - return t, &histogram.FloatHistogram{ - CounterResetHint: f.fhReader.CounterResetHint, - Count: f.fhReader.Count, - Sum: f.fhReader.Sum, - } - } - - fh.CounterResetHint = f.fhReader.CounterResetHint - fh.Count = f.fhReader.Count - fh.Sum = f.fhReader.Sum - return t, fh -} diff --git a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/scanners.go b/vendor/github.com/thanos-io/promql-engine/storage/prometheus/scanners.go index 4bdcd0e331..c888349838 100644 --- a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/scanners.go +++ b/vendor/github.com/thanos-io/promql-engine/storage/prometheus/scanners.go @@ -8,6 +8,7 @@ import ( "math" "github.com/efficientgo/core/errors" + "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -159,5 +160,5 @@ func newHistogramStatsSeries(series storage.Series) histogramStatsSeries { } func (h histogramStatsSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { - return NewHistogramStatsIterator(h.Series.Iterator(it)) + return promql.NewHistogramStatsIterator(h.Series.Iterator(it)) } diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go b/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go index 772d37a48b..02e9dec513 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go +++ b/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go @@ -213,7 +213,7 @@ func (f *RecursiveLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch ch case ch <- id: } return nil - }, objstore.WithRecursiveIter) + }, objstore.WithRecursiveIter()) return partialBlocks, err } diff --git a/vendor/github.com/thanos-io/thanos/pkg/cache/groupcache.go b/vendor/github.com/thanos-io/thanos/pkg/cache/groupcache.go index 43bf1aeefe..c68828863f 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/cache/groupcache.go +++ b/vendor/github.com/thanos-io/thanos/pkg/cache/groupcache.go @@ -208,7 +208,7 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf if err := bucket.Iter(ctx, parsedData.Name, func(s string) error { list = append(list, s) return nil - }, objstore.WithRecursiveIter); err != nil { + }, objstore.WithRecursiveIter()); err != nil { return err } diff --git a/vendor/modules.txt b/vendor/modules.txt index 60202dbd57..fee5727173 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -950,8 +950,8 @@ github.com/stretchr/testify/assert github.com/stretchr/testify/assert/yaml github.com/stretchr/testify/mock github.com/stretchr/testify/require -# github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d -## explicit; go 1.21 +# github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 +## explicit; go 1.22 github.com/thanos-io/objstore github.com/thanos-io/objstore/exthttp github.com/thanos-io/objstore/providers/azure @@ -960,7 +960,7 @@ github.com/thanos-io/objstore/providers/gcs github.com/thanos-io/objstore/providers/s3 github.com/thanos-io/objstore/providers/swift github.com/thanos-io/objstore/tracing/opentracing -# github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31 +# github.com/thanos-io/promql-engine v0.0.0-20241106100125-097e6e9f425a ## explicit; go 1.22.0 github.com/thanos-io/promql-engine/api github.com/thanos-io/promql-engine/engine @@ -983,7 +983,7 @@ github.com/thanos-io/promql-engine/query github.com/thanos-io/promql-engine/ringbuffer github.com/thanos-io/promql-engine/storage github.com/thanos-io/promql-engine/storage/prometheus -# github.com/thanos-io/thanos v0.35.2-0.20241118113652-f998fc59c171 +# github.com/thanos-io/thanos v0.37.0 ## explicit; go 1.23.0 github.com/thanos-io/thanos/pkg/api/query/querypb github.com/thanos-io/thanos/pkg/block