diff --git a/go.mod b/go.mod index c2d979d1c18..dbb8a66a3ce 100644 --- a/go.mod +++ b/go.mod @@ -51,9 +51,9 @@ require ( github.com/sony/gobreaker v1.0.0 github.com/spf13/afero v1.11.0 github.com/stretchr/testify v1.9.0 - github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d - github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31 - github.com/thanos-io/thanos v0.35.2-0.20241118113652-f998fc59c171 + github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 + github.com/thanos-io/promql-engine v0.0.0-20241106100125-097e6e9f425a + github.com/thanos-io/thanos v0.37.0 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5 go.etcd.io/etcd/api/v3 v3.5.17 diff --git a/go.sum b/go.sum index d74a22a901a..944c49a8096 100644 --- a/go.sum +++ b/go.sum @@ -1683,12 +1683,12 @@ github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4 github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw= github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1Zsv7OAU9iQhZwigp50Yl38W10g/vd5NC8Rdk1Jzng= github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM= -github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d h1:k+SLTP1mjNqXxsCiq4UYeKCe07le0ieffyuHm/YfmH8= -github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d/go.mod h1:/ZMUxFcp/nT6oYV5WslH9k07NU/+86+aibgZRmMMr/4= -github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31 h1:xPaP58g+3EPohdw4cv+6jv5+LcX6LynhHvQcYwTAMxQ= -github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00= -github.com/thanos-io/thanos v0.35.2-0.20241118113652-f998fc59c171 h1:khQQmo9VMMphKm10NS22iI8bAIf4DNRFvLoP4IR7kK4= -github.com/thanos-io/thanos v0.35.2-0.20241118113652-f998fc59c171/go.mod h1:v2AUVRd44iuv/PY0iwzKzgosLusUg7/dnnw5KEe1Has= +github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1DkncwDHFvrpd12/2TLfgYNRmEQA48ikp+0= +github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw= +github.com/thanos-io/promql-engine v0.0.0-20241106100125-097e6e9f425a h1:BhWU58VHOxkxQEMByih9fM2WwuwCGtk5AulIcSRSr0A= +github.com/thanos-io/promql-engine v0.0.0-20241106100125-097e6e9f425a/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00= +github.com/thanos-io/thanos v0.37.0 h1:RCO2pL3S1sU6Wd3ITviWSzouFg8VVzpz92eCJOUEnow= +github.com/thanos-io/thanos v0.37.0/go.mod h1:bkdS9wnItzpNofJjjp2R7Y5Xt5UeA688dLKcjmf7MaA= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/uber/jaeger-client-go v2.28.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o= diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go index d65bdfb22ed..e83f5a1f795 100644 --- a/pkg/storage/bucket/s3/bucket_client.go +++ b/pkg/storage/bucket/s3/bucket_client.go @@ -149,6 +149,16 @@ func (b *BucketWithRetries) Name() string { return b.bucket.Name() } +func (b *BucketWithRetries) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + return b.retry(ctx, func() error { + return b.bucket.IterWithAttributes(ctx, dir, f, options...) + }, fmt.Sprintf("IterWithAttributes %s", dir)) +} + +func (b *BucketWithRetries) SupportedIterOptions() []objstore.IterOptionType { + return b.bucket.SupportedIterOptions() +} + func (b *BucketWithRetries) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { return b.retry(ctx, func() error { return b.bucket.Iter(ctx, dir, f, options...) diff --git a/pkg/storage/bucket/s3/bucket_client_test.go b/pkg/storage/bucket/s3/bucket_client_test.go index 2454970517d..461b60b5e19 100644 --- a/pkg/storage/bucket/s3/bucket_client_test.go +++ b/pkg/storage/bucket/s3/bucket_client_test.go @@ -208,6 +208,14 @@ func (m *mockBucket) Name() string { return "mock" } +func (m *mockBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + return nil +} + +func (m *mockBucket) SupportedIterOptions() []objstore.IterOptionType { + return nil +} + // Iter mocks objstore.Bucket.Iter() func (m *mockBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { return nil diff --git a/vendor/github.com/thanos-io/objstore/CHANGELOG.md b/vendor/github.com/thanos-io/objstore/CHANGELOG.md index d2b1aaabdab..f0904faa198 100644 --- a/vendor/github.com/thanos-io/objstore/CHANGELOG.md +++ b/vendor/github.com/thanos-io/objstore/CHANGELOG.md @@ -25,6 +25,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#79](https://github.com/thanos-io/objstore/pull/79) Metrics: Fix `objstore_bucket_operation_duration_seconds` for `iter` operations. ### Added +- [#63](https://github.com/thanos-io/objstore/pull/63) Implement a `IterWithAttributes` method on the bucket client. - [#15](https://github.com/thanos-io/objstore/pull/15) Add Oracle Cloud Infrastructure Object Storage Bucket support. - [#25](https://github.com/thanos-io/objstore/pull/25) S3: Support specifying S3 storage class. - [#32](https://github.com/thanos-io/objstore/pull/32) Swift: Support authentication using application credentials. @@ -53,6 +54,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#116](https://github.com/thanos-io/objstore/pull/116) Azure: Add new storage_create_container configuration property - [#128](https://github.com/thanos-io/objstore/pull/128) GCS: Add support for `ChunkSize` for writer. - [#130](https://github.com/thanos-io/objstore/pull/130) feat: Decouple creating bucket metrics from instrumenting the bucket +- [#147](https://github.com/thanos-io/objstore/pull/147) feat: Add MaxRetries config to cos, gcs and obs. - [#150](https://github.com/thanos-io/objstore/pull/150) Add support for roundtripper wrapper. ### Changed diff --git a/vendor/github.com/thanos-io/objstore/README.md b/vendor/github.com/thanos-io/objstore/README.md index 6d848e79746..d8f58023137 100644 --- a/vendor/github.com/thanos-io/objstore/README.md +++ b/vendor/github.com/thanos-io/objstore/README.md @@ -48,7 +48,7 @@ See [MAINTAINERS.md](https://github.com/thanos-io/thanos/blob/main/MAINTAINERS.m The core this module is the [`Bucket` interface](objstore.go): -```go mdox-exec="sed -n '37,50p' objstore.go" +```go mdox-exec="sed -n '39,55p' objstore.go" // Bucket provides read and write access to an object storage bucket. // NOTE: We assume strong consistency for write-read flow. type Bucket interface { @@ -63,18 +63,31 @@ type Bucket interface { // If object does not exist in the moment of deletion, Delete should throw error. Delete(ctx context.Context, name string) error + // Name returns the bucket name for the provider. + Name() string +} ``` All [provider implementations](providers) have to implement `Bucket` interface that allows common read and write operations that all supported by all object providers. If you want to limit the code that will do bucket operation to only read access (smart idea, allowing to limit access permissions), you can use the [`BucketReader` interface](objstore.go): -```go mdox-exec="sed -n '68,93p' objstore.go" - +```go mdox-exec="sed -n '71,106p' objstore.go" // BucketReader provides read access to an object storage bucket. type BucketReader interface { // Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full // object name including the prefix of the inspected directory. + // Entries are passed to function in sorted order. - Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error + Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error + + // IterWithAttributes calls f for each entry in the given directory similar to Iter. + // In addition to Name, it also includes requested object attributes in the argument to f. + // + // Attributes can be requested using IterOption. + // Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported. + IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error + + // SupportedIterOptions returns a list of supported IterOptions by the underlying provider. + SupportedIterOptions() []IterOptionType // Get returns a reader for the given object name. Get(ctx context.Context, name string) (io.ReadCloser, error) @@ -374,6 +387,7 @@ config: server_name: "" insecure_skip_verify: false disable_compression: false + chunk_size_bytes: 0 prefix: "" ``` @@ -447,6 +461,7 @@ config: storage_account: "" storage_account_key: "" storage_connection_string: "" + storage_create_container: false container: "" endpoint: "" user_assigned_id: "" diff --git a/vendor/github.com/thanos-io/objstore/inmem.go b/vendor/github.com/thanos-io/objstore/inmem.go index ed256c9cd9d..d550e283ce0 100644 --- a/vendor/github.com/thanos-io/objstore/inmem.go +++ b/vendor/github.com/thanos-io/objstore/inmem.go @@ -106,6 +106,20 @@ func (b *InMemBucket) Iter(_ context.Context, dir string, f func(string) error, return nil } +func (i *InMemBucket) SupportedIterOptions() []IterOptionType { + return []IterOptionType{Recursive} +} + +func (b *InMemBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error { + if err := ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + + return b.Iter(ctx, dir, func(name string) error { + return f(IterObjectAttributes{Name: name}) + }, options...) +} + // Get returns a reader for the given object name. func (b *InMemBucket) Get(_ context.Context, name string) (io.ReadCloser, error) { if name == "" { diff --git a/vendor/github.com/thanos-io/objstore/objstore.go b/vendor/github.com/thanos-io/objstore/objstore.go index 62f1c655db7..33c6e5e8675 100644 --- a/vendor/github.com/thanos-io/objstore/objstore.go +++ b/vendor/github.com/thanos-io/objstore/objstore.go @@ -6,11 +6,13 @@ package objstore import ( "bytes" "context" + "fmt" "io" "io/fs" "os" "path" "path/filepath" + "slices" "strings" "sync" "time" @@ -70,8 +72,19 @@ type InstrumentedBucket interface { type BucketReader interface { // Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full // object name including the prefix of the inspected directory. + // Entries are passed to function in sorted order. - Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error + Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error + + // IterWithAttributes calls f for each entry in the given directory similar to Iter. + // In addition to Name, it also includes requested object attributes in the argument to f. + // + // Attributes can be requested using IterOption. + // Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported. + IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error + + // SupportedIterOptions returns a list of supported IterOptions by the underlying provider. + SupportedIterOptions() []IterOptionType // Get returns a reader for the given object name. Get(ctx context.Context, name string) (io.ReadCloser, error) @@ -101,24 +114,66 @@ type InstrumentedBucketReader interface { ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader } +var ErrOptionNotSupported = errors.New("iter option is not supported") + +// IterOptionType is used for type-safe option support checking. +type IterOptionType int + +const ( + Recursive IterOptionType = iota + UpdatedAt +) + // IterOption configures the provided params. -type IterOption func(params *IterParams) +type IterOption struct { + Type IterOptionType + Apply func(params *IterParams) +} // WithRecursiveIter is an option that can be applied to Iter() to recursively list objects // in the bucket. -func WithRecursiveIter(params *IterParams) { - params.Recursive = true +func WithRecursiveIter() IterOption { + return IterOption{ + Type: Recursive, + Apply: func(params *IterParams) { + params.Recursive = true + }, + } +} + +// WithUpdatedAt is an option that can be applied to Iter() to +// include the last modified time in the attributes. +// NB: Prefixes may not report last modified time. +// This option is currently supported for the azure, s3, bos, gcs and filesystem providers. +func WithUpdatedAt() IterOption { + return IterOption{ + Type: UpdatedAt, + Apply: func(params *IterParams) { + params.LastModified = true + }, + } } // IterParams holds the Iter() parameters and is used by objstore clients implementations. type IterParams struct { - Recursive bool + Recursive bool + LastModified bool +} + +func ValidateIterOptions(supportedOptions []IterOptionType, options ...IterOption) error { + for _, opt := range options { + if !slices.Contains(supportedOptions, opt.Type) { + return fmt.Errorf("%w: %v", ErrOptionNotSupported, opt.Type) + } + } + + return nil } func ApplyIterOptions(options ...IterOption) IterParams { out := IterParams{} for _, opt := range options { - opt(&out) + opt.Apply(&out) } return out } @@ -189,6 +244,20 @@ type ObjectAttributes struct { LastModified time.Time `json:"last_modified"` } +type IterObjectAttributes struct { + Name string + lastModified time.Time +} + +func (i *IterObjectAttributes) SetLastModified(t time.Time) { + i.lastModified = t +} + +// LastModified returns the timestamp the object was last modified. Returns false if the timestamp is not available. +func (i *IterObjectAttributes) LastModified() (time.Time, bool) { + return i.lastModified, !i.lastModified.IsZero() +} + // TryToGetSize tries to get upfront size from reader. // Some implementations may return only size of unread data in the reader, so it's best to call this method before // doing any reading. @@ -211,6 +280,8 @@ func TryToGetSize(r io.Reader) (int64, error) { return f.Size(), nil case ObjectSizer: return f.ObjectSize() + case *io.LimitedReader: + return f.N, nil } return 0, errors.Errorf("unsupported type of io.Reader: %T", r) } @@ -531,21 +602,43 @@ func (b *metricBucket) ReaderWithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket return b.WithExpectedErrs(fn) } -func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error { +func (b *metricBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error { const op = OpIter b.metrics.ops.WithLabelValues(op).Inc() - start := time.Now() + timer := prometheus.NewTimer(b.metrics.opsDuration.WithLabelValues(op)) + defer timer.ObserveDuration() + err := b.bkt.Iter(ctx, dir, f, options...) if err != nil { if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled { b.metrics.opsFailures.WithLabelValues(op).Inc() } } - b.metrics.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds()) return err } +func (b *metricBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error { + const op = OpIter + b.metrics.ops.WithLabelValues(op).Inc() + + timer := prometheus.NewTimer(b.metrics.opsDuration.WithLabelValues(op)) + defer timer.ObserveDuration() + + err := b.bkt.IterWithAttributes(ctx, dir, f, options...) + if err != nil { + if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled { + b.metrics.opsFailures.WithLabelValues(op).Inc() + } + } + + return err +} + +func (b *metricBucket) SupportedIterOptions() []IterOptionType { + return b.bkt.SupportedIterOptions() +} + func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) { const op = OpAttributes b.metrics.ops.WithLabelValues(op).Inc() diff --git a/vendor/github.com/thanos-io/objstore/prefixed_bucket.go b/vendor/github.com/thanos-io/objstore/prefixed_bucket.go index f2b71434683..a76b34c360e 100644 --- a/vendor/github.com/thanos-io/objstore/prefixed_bucket.go +++ b/vendor/github.com/thanos-io/objstore/prefixed_bucket.go @@ -54,6 +54,19 @@ func (p *PrefixedBucket) Iter(ctx context.Context, dir string, f func(string) er }, options...) } +func (p *PrefixedBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error { + pdir := withPrefix(p.prefix, dir) + + return p.bkt.IterWithAttributes(ctx, pdir, func(attrs IterObjectAttributes) error { + attrs.Name = strings.TrimPrefix(attrs.Name, p.prefix+DirDelim) + return f(attrs) + }, options...) +} + +func (p *PrefixedBucket) SupportedIterOptions() []IterOptionType { + return p.bkt.SupportedIterOptions() +} + // Get returns a reader for the given object name. func (p *PrefixedBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { return p.bkt.Get(ctx, conditionalPrefix(p.prefix, name)) diff --git a/vendor/github.com/thanos-io/objstore/providers/azure/azure.go b/vendor/github.com/thanos-io/objstore/providers/azure/azure.go index e125ca3511c..5689dc62b7b 100644 --- a/vendor/github.com/thanos-io/objstore/providers/azure/azure.go +++ b/vendor/github.com/thanos-io/objstore/providers/azure/azure.go @@ -193,9 +193,15 @@ func NewBucketWithConfig(logger log.Logger, conf Config, component string, wrapR return bkt, nil } -// Iter calls f for each entry in the given directory. The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + prefix := dir if prefix != "" && !strings.HasSuffix(prefix, DirDelim) { prefix += DirDelim @@ -211,7 +217,13 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return err } for _, blob := range resp.Segment.BlobItems { - if err := f(*blob.Name); err != nil { + attrs := objstore.IterObjectAttributes{ + Name: *blob.Name, + } + if params.LastModified { + attrs.SetLastModified(*blob.Properties.LastModified) + } + if err := f(attrs); err != nil { return err } } @@ -227,12 +239,18 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return err } for _, blobItem := range resp.Segment.BlobItems { - if err := f(*blobItem.Name); err != nil { + attrs := objstore.IterObjectAttributes{ + Name: *blobItem.Name, + } + if params.LastModified { + attrs.SetLastModified(*blobItem.Properties.LastModified) + } + if err := f(attrs); err != nil { return err } } for _, blobPrefix := range resp.Segment.BlobPrefixes { - if err := f(*blobPrefix.Name); err != nil { + if err := f(objstore.IterObjectAttributes{Name: *blobPrefix.Name}); err != nil { return err } } @@ -240,6 +258,23 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return nil } +// Iter calls f for each entry in the given directory. The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + // IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. func (b *Bucket) IsObjNotFoundErr(err error) bool { if err == nil { diff --git a/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go b/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go index 2ed42ee8b64..01dca4bbd34 100644 --- a/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go +++ b/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go @@ -50,13 +50,19 @@ func NewBucket(rootDir string) (*Bucket, error) { return &Bucket{rootDir: absDir}, nil } -// Iter calls f for each entry in the given directory. The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { if ctx.Err() != nil { return ctx.Err() } + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + params := objstore.ApplyIterOptions(options...) absDir := filepath.Join(b.rootDir, dir) info, err := os.Stat(absDir) @@ -92,7 +98,7 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt if params.Recursive { // Recursively list files in the subdirectory. - if err := b.Iter(ctx, name, f, options...); err != nil { + if err := b.IterWithAttributes(ctx, name, f, options...); err != nil { return err } @@ -101,13 +107,42 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt continue } } - if err := f(name); err != nil { + + attrs := objstore.IterObjectAttributes{ + Name: name, + } + if params.LastModified { + absPath := filepath.Join(absDir, file.Name()) + stat, err := os.Stat(absPath) + if err != nil { + return errors.Wrapf(err, "stat %s", name) + } + attrs.SetLastModified(stat.ModTime()) + } + if err := f(attrs); err != nil { return err } } return nil } +// Iter calls f for each entry in the given directory. The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + // Get returns a reader for the given object name. func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { return b.GetRange(ctx, name, 0, -1) diff --git a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go index efb208e60e4..d54a6782f42 100644 --- a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go +++ b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go @@ -54,6 +54,11 @@ type Config struct { // Used as storage.Writer.ChunkSize of https://pkg.go.dev/google.golang.org/cloud/storage#Writer ChunkSizeBytes int `yaml:"chunk_size_bytes"` noAuth bool `yaml:"no_auth"` + + // MaxRetries controls the number of retries for idempotent operations. + // Overrides the default gcs storage client behavior if this value is greater than 0. + // Set this to 1 to disable retries. + MaxRetries int `yaml:"max_retries"` } // Bucket implements the store.Bucket and shipper.Bucket interfaces against GCS. @@ -173,6 +178,11 @@ func newBucket(ctx context.Context, logger log.Logger, gc Config, opts []option. name: gc.Bucket, chunkSize: gc.ChunkSizeBytes, } + + if gc.MaxRetries > 0 { + bkt.bkt = bkt.bkt.Retryer(storage.WithMaxAttempts(gc.MaxRetries)) + } + return bkt, nil } @@ -181,18 +191,26 @@ func (b *Bucket) Name() string { return b.name } -// Iter calls f for each entry in the given directory. The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the // object itself as one prefix item. if dir != "" { dir = strings.TrimSuffix(dir, DirDelim) + DirDelim } + appliedOpts := objstore.ApplyIterOptions(options...) + // If recursive iteration is enabled we should pass an empty delimiter. delimiter := DirDelim - if objstore.ApplyIterOptions(options...).Recursive { + if appliedOpts.Recursive { delimiter = "" } @@ -200,11 +218,15 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt Prefix: dir, Delimiter: delimiter, } - err := query.SetAttrSelection([]string{"Name"}) - if err != nil { - return err + if appliedOpts.LastModified { + if err := query.SetAttrSelection([]string{"Name", "Updated"}); err != nil { + return err + } + } else { + if err := query.SetAttrSelection([]string{"Name"}); err != nil { + return err + } } - it := b.bkt.Objects(ctx, query) for { select { @@ -219,12 +241,34 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt if err != nil { return err } - if err := f(attrs.Prefix + attrs.Name); err != nil { + + objAttrs := objstore.IterObjectAttributes{Name: attrs.Prefix + attrs.Name} + if appliedOpts.LastModified { + objAttrs.SetLastModified(attrs.Updated) + } + if err := f(objAttrs); err != nil { return err } } } +// Iter calls f for each entry in the given directory. The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + // Get returns a reader for the given object name. func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { r, err := b.bkt.Object(name).NewReader(ctx) diff --git a/vendor/github.com/thanos-io/objstore/providers/s3/s3.go b/vendor/github.com/thanos-io/objstore/providers/s3/s3.go index 8e5b8b56402..27e82ffbafb 100644 --- a/vendor/github.com/thanos-io/objstore/providers/s3/s3.go +++ b/vendor/github.com/thanos-io/objstore/providers/s3/s3.go @@ -136,6 +136,7 @@ type Config struct { PartSize uint64 `yaml:"part_size"` SSEConfig SSEConfig `yaml:"sse_config"` STSEndpoint string `yaml:"sts_endpoint"` + MaxRetries int `yaml:"max_retries"` } // SSEConfig deals with the configuration of SSE for Minio. The following options are valid: @@ -263,6 +264,7 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string, wra Region: config.Region, Transport: tpt, BucketLookup: config.BucketLookupType.MinioType(), + MaxRetries: config.MaxRetries, }) if err != nil { return nil, errors.Wrap(err, "initialize s3 client") @@ -387,18 +389,26 @@ func ValidateForTests(conf Config) error { return nil } -// Iter calls f for each entry in the given directory. The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the // object itself as one prefix item. if dir != "" { dir = strings.TrimSuffix(dir, DirDelim) + DirDelim } + appliedOpts := objstore.ApplyIterOptions(options...) + opts := minio.ListObjectsOptions{ Prefix: dir, - Recursive: objstore.ApplyIterOptions(options...).Recursive, + Recursive: appliedOpts.Recursive, UseV1: b.listObjectsV1, } @@ -415,7 +425,15 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt if object.Key == dir { continue } - if err := f(object.Key); err != nil { + + attr := objstore.IterObjectAttributes{ + Name: object.Key, + } + if appliedOpts.LastModified { + attr.SetLastModified(object.LastModified) + } + + if err := f(attr); err != nil { return err } } @@ -423,6 +441,21 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return ctx.Err() } +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { sse, err := b.getServerSideEncryption(ctx) if err != nil { @@ -629,7 +662,7 @@ func NewTestBucketFromConfig(t testing.TB, location string, c Config, reuseBucke bktToCreate := c.Bucket if c.Bucket != "" && reuseBucket { - if err := b.Iter(ctx, "", func(f string) error { + if err := b.Iter(ctx, "", func(string) error { return errors.Errorf("bucket %s is not empty", c.Bucket) }); err != nil { return nil, nil, errors.Wrapf(err, "s3 check bucket %s", c.Bucket) diff --git a/vendor/github.com/thanos-io/objstore/providers/swift/swift.go b/vendor/github.com/thanos-io/objstore/providers/swift/swift.go index e872728e4d6..86caa0c1edf 100644 --- a/vendor/github.com/thanos-io/objstore/providers/swift/swift.go +++ b/vendor/github.com/thanos-io/objstore/providers/swift/swift.go @@ -21,6 +21,7 @@ import ( "github.com/ncw/swift" "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/exthttp" "gopkg.in/yaml.v2" @@ -222,9 +223,13 @@ func (c *Container) Name() string { return c.name } +func (c *Container) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive} +} + // Iter calls f for each entry in the given directory. The argument to f is the full // object name including the prefix of the inspected directory. -func (c *Container) Iter(_ context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (c *Container) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { if dir != "" { dir = strings.TrimSuffix(dir, string(DirDelim)) + string(DirDelim) } @@ -242,6 +247,7 @@ func (c *Container) Iter(_ context.Context, dir string, f func(string) error, op if err != nil { return objects, errors.Wrap(err, "list object names") } + for _, object := range objects { if object == SegmentsDir { continue @@ -254,6 +260,16 @@ func (c *Container) Iter(_ context.Context, dir string, f func(string) error, op }) } +func (c *Container) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(c.SupportedIterOptions(), options...); err != nil { + return err + } + + return c.Iter(ctx, dir, func(name string) error { + return f(objstore.IterObjectAttributes{Name: name}) + }, options...) +} + func (c *Container) get(name string, headers swift.Headers, checkHash bool) (io.ReadCloser, error) { if name == "" { return nil, errors.New("object name cannot be empty") diff --git a/vendor/github.com/thanos-io/objstore/testing.go b/vendor/github.com/thanos-io/objstore/testing.go index 28cbd658894..d3fa1def443 100644 --- a/vendor/github.com/thanos-io/objstore/testing.go +++ b/vendor/github.com/thanos-io/objstore/testing.go @@ -195,7 +195,7 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { testutil.Ok(t, bkt.Iter(ctx, "", func(fn string) error { seen = append(seen, fn) return nil - }, WithRecursiveIter)) + }, WithRecursiveIter())) expected = []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some", "id1/sub/subobj_1.some", "id1/sub/subobj_2.some", "id2/obj_4.some", "obj_5.some"} sort.Strings(expected) sort.Strings(seen) @@ -214,7 +214,7 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { testutil.Ok(t, bkt.Iter(ctx, "id1/", func(fn string) error { seen = append(seen, fn) return nil - }, WithRecursiveIter)) + }, WithRecursiveIter())) testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some", "id1/sub/subobj_1.some", "id1/sub/subobj_2.some"}, seen) // Can we iter over items from id1 dir? @@ -230,7 +230,7 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { testutil.Ok(t, bkt.Iter(ctx, "id1", func(fn string) error { seen = append(seen, fn) return nil - }, WithRecursiveIter)) + }, WithRecursiveIter())) testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some", "id1/sub/subobj_1.some", "id1/sub/subobj_2.some"}, seen) // Can we iter over items from not existing dir? @@ -295,6 +295,15 @@ func (d *delayingBucket) Iter(ctx context.Context, dir string, f func(string) er return d.bkt.Iter(ctx, dir, f, options...) } +func (d *delayingBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error { + time.Sleep(d.delay) + return d.bkt.IterWithAttributes(ctx, dir, f, options...) +} + +func (d *delayingBucket) SupportedIterOptions() []IterOptionType { + return d.bkt.SupportedIterOptions() +} + func (d *delayingBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { time.Sleep(d.delay) return d.bkt.GetRange(ctx, name, off, length) diff --git a/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go b/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go index 0a26ceeb66f..cabe07b2cf0 100644 --- a/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go +++ b/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go @@ -52,6 +52,18 @@ func (t TracingBucket) Iter(ctx context.Context, dir string, f func(string) erro return } +func (t TracingBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) (err error) { + doWithSpan(ctx, "bucket_iter_with_attrs", func(spanCtx context.Context, span opentracing.Span) { + span.LogKV("dir", dir) + err = t.bkt.IterWithAttributes(spanCtx, dir, f, options...) + }) + return +} + +func (t TracingBucket) SupportedIterOptions() []objstore.IterOptionType { + return t.bkt.SupportedIterOptions() +} + func (t TracingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { span, spanCtx := startSpan(ctx, "bucket_get") span.LogKV("name", name) diff --git a/vendor/github.com/thanos-io/promql-engine/engine/explain.go b/vendor/github.com/thanos-io/promql-engine/engine/explain.go index 13f69404db8..e1b42634bdc 100644 --- a/vendor/github.com/thanos-io/promql-engine/engine/explain.go +++ b/vendor/github.com/thanos-io/promql-engine/engine/explain.go @@ -4,6 +4,8 @@ package engine import ( + "sync" + "github.com/prometheus/prometheus/promql" "github.com/thanos-io/promql-engine/execution/model" @@ -18,7 +20,12 @@ type ExplainableQuery interface { type AnalyzeOutputNode struct { OperatorTelemetry model.OperatorTelemetry `json:"telemetry,omitempty"` - Children []AnalyzeOutputNode `json:"children,omitempty"` + Children []*AnalyzeOutputNode `json:"children,omitempty"` + + once sync.Once + totalSamples int64 + peakSamples int64 + totalSamplesPerStep []int64 } type ExplainOutputNode struct { @@ -29,60 +36,48 @@ type ExplainOutputNode struct { var _ ExplainableQuery = &compatibilityQuery{} func (a *AnalyzeOutputNode) TotalSamples() int64 { - var total int64 - if a.OperatorTelemetry.Samples() != nil { - total += a.OperatorTelemetry.Samples().TotalSamples - } - if a.OperatorTelemetry.SubQuery() { - // Returning here to avoid double counting samples from children of subquery. - return total - } - - for _, child := range a.Children { - c := child.TotalSamples() - if c > 0 { - total += child.TotalSamples() - } - } - - return total + a.aggregateSamples() + return a.totalSamples } func (a *AnalyzeOutputNode) TotalSamplesPerStep() []int64 { - if a.OperatorTelemetry.Samples() == nil { - return []int64{} - } - - total := a.OperatorTelemetry.Samples().TotalSamplesPerStep - for _, child := range a.Children { - for i, s := range child.TotalSamplesPerStep() { - total[i] += s - } - } - - return total + a.aggregateSamples() + return a.totalSamplesPerStep } func (a *AnalyzeOutputNode) PeakSamples() int64 { - var peak int64 - if a.OperatorTelemetry.Samples() != nil { - peak = int64(a.OperatorTelemetry.Samples().PeakSamples) - } - for _, child := range a.Children { - childPeak := child.PeakSamples() - if childPeak > peak { - peak = childPeak + a.aggregateSamples() + return a.peakSamples +} + +func (a *AnalyzeOutputNode) aggregateSamples() { + a.once.Do(func() { + if nodeSamples := a.OperatorTelemetry.Samples(); nodeSamples != nil { + a.totalSamples += nodeSamples.TotalSamples + a.peakSamples += int64(nodeSamples.PeakSamples) + a.totalSamplesPerStep = nodeSamples.TotalSamplesPerStep } - } - return peak + + for _, child := range a.Children { + childPeak := child.PeakSamples() + a.peakSamples = max(a.peakSamples, childPeak) + for i, s := range child.TotalSamplesPerStep() { + a.totalSamplesPerStep[i] += s + } + // Aggregate only if the node is not a subquery to avoid double counting samples from children. + if !a.OperatorTelemetry.SubQuery() { + a.totalSamples += child.TotalSamples() + } + } + }) } func analyzeQuery(obsv model.ObservableVectorOperator) *AnalyzeOutputNode { children := obsv.Explain() - var childTelemetry []AnalyzeOutputNode + var childTelemetry []*AnalyzeOutputNode for _, child := range children { if obsChild, ok := child.(model.ObservableVectorOperator); ok { - childTelemetry = append(childTelemetry, *analyzeQuery(obsChild)) + childTelemetry = append(childTelemetry, analyzeQuery(obsChild)) } } diff --git a/vendor/github.com/thanos-io/promql-engine/execution/aggregate/accumulator.go b/vendor/github.com/thanos-io/promql-engine/execution/aggregate/accumulator.go index 5ce536ce493..25e4a3886cd 100644 --- a/vendor/github.com/thanos-io/promql-engine/execution/aggregate/accumulator.go +++ b/vendor/github.com/thanos-io/promql-engine/execution/aggregate/accumulator.go @@ -290,13 +290,39 @@ type avgAcc struct { avg float64 count int64 hasValue bool + + histSum *histogram.FloatHistogram + histScratch *histogram.FloatHistogram + histSumScratch *histogram.FloatHistogram + histCount float64 } func newAvgAcc() *avgAcc { return &avgAcc{} } -func (a *avgAcc) Add(v float64, _ *histogram.FloatHistogram) error { +func (a *avgAcc) Add(v float64, h *histogram.FloatHistogram) error { + if h != nil { + a.histCount++ + if a.histSum == nil { + a.histSum = h.Copy() + a.histScratch = &histogram.FloatHistogram{} + a.histSumScratch = &histogram.FloatHistogram{} + return nil + } + + h.CopyTo(a.histScratch) + left := a.histScratch.Div(a.histCount) + a.histSum.CopyTo(a.histSumScratch) + right := a.histSumScratch.Div(a.histCount) + toAdd, err := left.Sub(right) + if err != nil { + return err + } + a.histSum, err = a.histSum.Add(toAdd) + return err + } + a.count++ if !a.hasValue { a.hasValue = true @@ -327,29 +353,43 @@ func (a *avgAcc) Add(v float64, _ *histogram.FloatHistogram) error { return nil } -func (a *avgAcc) AddVector(vs []float64, _ []*histogram.FloatHistogram) error { +func (a *avgAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) error { for _, v := range vs { if err := a.Add(v, nil); err != nil { return err } } + for _, h := range hs { + if err := a.Add(0, h); err != nil { + return err + } + } return nil } func (a *avgAcc) Value() (float64, *histogram.FloatHistogram) { - return a.avg, nil + return a.avg, a.histSum } -func (c *avgAcc) ValueType() ValueType { - if c.hasValue { +func (a *avgAcc) ValueType() ValueType { + hasFloat := a.count > 0 + hasHist := a.histCount > 0 + + if hasFloat && hasHist { + return MixedTypeValue + } + if hasFloat || hasHist { return SingleTypeValue - } else { - return NoValue } + return NoValue } + func (a *avgAcc) Reset(_ float64) { a.hasValue = false a.count = 0 + + a.histCount = 0 + a.histSum = nil } type statAcc struct { @@ -447,6 +487,56 @@ func (q *quantileAcc) Reset(f float64) { q.points = q.points[:0] } +type histogramAvg struct { + sum *histogram.FloatHistogram + count int64 + hasFloat bool +} + +func newHistogramAvg() *histogramAvg { + return &histogramAvg{ + sum: &histogram.FloatHistogram{}, + } +} + +func (acc *histogramAvg) Add(v float64, h *histogram.FloatHistogram) error { + if h == nil { + acc.hasFloat = true + } + if acc.count == 0 { + h.CopyTo(acc.sum) + } + var err error + if h.Schema >= acc.sum.Schema { + if acc.sum, err = acc.sum.Add(h); err != nil { + return err + } + } else { + t := h.Copy() + if _, err = t.Add(acc.sum); err != nil { + return err + } + acc.sum = t + } + acc.count++ + return nil +} + +func (acc *histogramAvg) Value() (float64, *histogram.FloatHistogram) { + return 0, acc.sum.Mul(1 / float64(acc.count)) +} + +func (acc *histogramAvg) ValueType() ValueType { + if acc.count > 0 && !acc.hasFloat { + return SingleTypeValue + } + return NoValue +} + +func (acc *histogramAvg) Reset(f float64) { + acc.count = 0 +} + // SumCompensated returns the sum of the elements of the slice calculated with greater // accuracy than Sum at the expense of additional computation. func SumCompensated(s []float64) float64 { diff --git a/vendor/github.com/thanos-io/promql-engine/execution/aggregate/scalar_table.go b/vendor/github.com/thanos-io/promql-engine/execution/aggregate/scalar_table.go index 28e6d82e2a7..401db8b5a48 100644 --- a/vendor/github.com/thanos-io/promql-engine/execution/aggregate/scalar_table.go +++ b/vendor/github.com/thanos-io/promql-engine/execution/aggregate/scalar_table.go @@ -194,7 +194,10 @@ func newScalarAccumulator(expr parser.ItemType) (accumulator, error) { return newStdVarAcc(), nil case "quantile": return newQuantileAcc(), nil + case "histogram_avg": + return newHistogramAvg(), nil } + msg := fmt.Sprintf("unknown aggregation function %s", t) return nil, errors.Wrap(parse.ErrNotSupportedExpr, msg) } diff --git a/vendor/github.com/thanos-io/promql-engine/execution/function/functions.go b/vendor/github.com/thanos-io/promql-engine/execution/function/functions.go index 5f1fa082e7c..a26546ddc8a 100644 --- a/vendor/github.com/thanos-io/promql-engine/execution/function/functions.go +++ b/vendor/github.com/thanos-io/promql-engine/execution/function/functions.go @@ -4,15 +4,11 @@ package function import ( - "fmt" "math" "time" - "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/promql/parser" - - "github.com/thanos-io/promql-engine/execution/parse" ) type functionCall func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) @@ -121,6 +117,12 @@ var instantVectorFuncs = map[string]functionCall{ } return h.Count, true }, + "histogram_avg": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { + if h == nil { + return 0., false + } + return h.Sum / h.Count, true + }, "histogram_fraction": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { if h == nil || len(vargs) != 2 { return 0., false @@ -274,12 +276,3 @@ func IsExtFunction(functionName string) bool { _, ok := XFunctions[functionName] return ok } - -func UnknownFunctionError(name string) error { - msg := fmt.Sprintf("unknown function: %s", name) - if _, ok := parser.Functions[name]; ok { - return errors.Wrap(parse.ErrNotImplemented, msg) - } - - return errors.Wrap(parse.ErrNotSupportedExpr, msg) -} diff --git a/vendor/github.com/thanos-io/promql-engine/execution/function/operator.go b/vendor/github.com/thanos-io/promql-engine/execution/function/operator.go index 533e370e2ea..1741d839197 100644 --- a/vendor/github.com/thanos-io/promql-engine/execution/function/operator.go +++ b/vendor/github.com/thanos-io/promql-engine/execution/function/operator.go @@ -47,7 +47,7 @@ func NewFunctionOperator(funcExpr *logicalplan.FunctionCall, nextOps []model.Vec func newNoArgsFunctionOperator(funcExpr *logicalplan.FunctionCall, stepsBatch int, opts *query.Options) (model.VectorOperator, error) { call, ok := noArgFuncs[funcExpr.Func.Name] if !ok { - return nil, UnknownFunctionError(funcExpr.Func.Name) + return nil, parse.UnknownFunctionError(funcExpr.Func.Name) } interval := opts.Step.Milliseconds() @@ -98,7 +98,7 @@ type functionOperator struct { func newInstantVectorFunctionOperator(funcExpr *logicalplan.FunctionCall, nextOps []model.VectorOperator, stepsBatch int, opts *query.Options) (model.VectorOperator, error) { call, ok := instantVectorFuncs[funcExpr.Func.Name] if !ok { - return nil, UnknownFunctionError(funcExpr.Func.Name) + return nil, parse.UnknownFunctionError(funcExpr.Func.Name) } scalarPoints := make([][]float64, stepsBatch) diff --git a/vendor/github.com/thanos-io/promql-engine/logicalplan/distribute.go b/vendor/github.com/thanos-io/promql-engine/logicalplan/distribute.go index ae786a69751..b19d3d9c822 100644 --- a/vendor/github.com/thanos-io/promql-engine/logicalplan/distribute.go +++ b/vendor/github.com/thanos-io/promql-engine/logicalplan/distribute.go @@ -494,7 +494,13 @@ func isDistributive(expr *Node, skipBinaryPushdown bool, engineLabels map[string case Deduplicate, RemoteExecution: return false case *Binary: - return isBinaryExpressionWithOneScalarSide(e) || (!skipBinaryPushdown && isBinaryExpressionWithDistributableMatching(e, engineLabels)) + if isBinaryExpressionWithOneScalarSide(e) { + return true + } + return !skipBinaryPushdown && + isBinaryExpressionWithDistributableMatching(e, engineLabels) && + isDistributive(&e.LHS, skipBinaryPushdown, engineLabels, warns) && + isDistributive(&e.RHS, skipBinaryPushdown, engineLabels, warns) case *Aggregation: // Certain aggregations are currently not supported. if _, ok := distributiveAggregations[e.Op]; !ok { diff --git a/vendor/github.com/thanos-io/promql-engine/ringbuffer/rate.go b/vendor/github.com/thanos-io/promql-engine/ringbuffer/rate.go index caeb334366b..550e21b7cce 100644 --- a/vendor/github.com/thanos-io/promql-engine/ringbuffer/rate.go +++ b/vendor/github.com/thanos-io/promql-engine/ringbuffer/rate.go @@ -5,6 +5,7 @@ package ringbuffer import ( "math" + "slices" "github.com/prometheus/prometheus/model/histogram" @@ -177,6 +178,7 @@ func (r *RateBuffer) Eval(_ float64, _ *int64) (float64, *histogram.FloatHistogr r.resets...), r.last, ) + r.rateBuffer = slices.CompactFunc(r.rateBuffer, func(s1 Sample, s2 Sample) bool { return s1.T == s2.T }) numSamples := r.stepRanges[0].numSamples f, h, err := extrapolatedRate(r.rateBuffer, numSamples, r.isCounter, r.isRate, r.evalTs, r.selectRange, r.offset) return f, h, true, err diff --git a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/histograms.go b/vendor/github.com/thanos-io/promql-engine/storage/prometheus/histograms.go deleted file mode 100644 index 769b7de59f9..00000000000 --- a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/histograms.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (c) The Thanos Community Authors. -// Licensed under the Apache License 2.0. - -package prometheus - -import ( - "github.com/prometheus/prometheus/model/histogram" - "github.com/prometheus/prometheus/model/value" - "github.com/prometheus/prometheus/tsdb/chunkenc" -) - -type histogramStatsIterator struct { - chunkenc.Iterator - hReader *histogram.Histogram - fhReader *histogram.FloatHistogram -} - -func NewHistogramStatsIterator(it chunkenc.Iterator) chunkenc.Iterator { - return histogramStatsIterator{ - Iterator: it, - hReader: &histogram.Histogram{}, - fhReader: &histogram.FloatHistogram{}, - } -} - -func (f histogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { - var t int64 - t, f.hReader = f.Iterator.AtHistogram(f.hReader) - if value.IsStaleNaN(f.hReader.Sum) { - return t, &histogram.Histogram{Sum: f.hReader.Sum} - } - - if h == nil { - return t, &histogram.Histogram{ - CounterResetHint: f.hReader.CounterResetHint, - Count: f.hReader.Count, - Sum: f.hReader.Sum, - } - } - - h.CounterResetHint = f.hReader.CounterResetHint - h.Count = f.hReader.Count - h.Sum = f.hReader.Sum - return t, h -} - -func (f histogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { - var t int64 - t, f.fhReader = f.Iterator.AtFloatHistogram(f.fhReader) - if value.IsStaleNaN(f.fhReader.Sum) { - return t, &histogram.FloatHistogram{Sum: f.fhReader.Sum} - } - - if fh == nil { - return t, &histogram.FloatHistogram{ - CounterResetHint: f.fhReader.CounterResetHint, - Count: f.fhReader.Count, - Sum: f.fhReader.Sum, - } - } - - fh.CounterResetHint = f.fhReader.CounterResetHint - fh.Count = f.fhReader.Count - fh.Sum = f.fhReader.Sum - return t, fh -} diff --git a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/scanners.go b/vendor/github.com/thanos-io/promql-engine/storage/prometheus/scanners.go index 4bdcd0e3313..c8883498386 100644 --- a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/scanners.go +++ b/vendor/github.com/thanos-io/promql-engine/storage/prometheus/scanners.go @@ -8,6 +8,7 @@ import ( "math" "github.com/efficientgo/core/errors" + "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -159,5 +160,5 @@ func newHistogramStatsSeries(series storage.Series) histogramStatsSeries { } func (h histogramStatsSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { - return NewHistogramStatsIterator(h.Series.Iterator(it)) + return promql.NewHistogramStatsIterator(h.Series.Iterator(it)) } diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go b/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go index 772d37a48bd..02e9dec513e 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go +++ b/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go @@ -213,7 +213,7 @@ func (f *RecursiveLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch ch case ch <- id: } return nil - }, objstore.WithRecursiveIter) + }, objstore.WithRecursiveIter()) return partialBlocks, err } diff --git a/vendor/github.com/thanos-io/thanos/pkg/cache/groupcache.go b/vendor/github.com/thanos-io/thanos/pkg/cache/groupcache.go index 43bf1aeefe2..c68828863fe 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/cache/groupcache.go +++ b/vendor/github.com/thanos-io/thanos/pkg/cache/groupcache.go @@ -208,7 +208,7 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf if err := bucket.Iter(ctx, parsedData.Name, func(s string) error { list = append(list, s) return nil - }, objstore.WithRecursiveIter); err != nil { + }, objstore.WithRecursiveIter()); err != nil { return err } diff --git a/vendor/modules.txt b/vendor/modules.txt index 8098de55701..4d736ccb940 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -949,8 +949,8 @@ github.com/stretchr/objx github.com/stretchr/testify/assert github.com/stretchr/testify/mock github.com/stretchr/testify/require -# github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d -## explicit; go 1.21 +# github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 +## explicit; go 1.22 github.com/thanos-io/objstore github.com/thanos-io/objstore/exthttp github.com/thanos-io/objstore/providers/azure @@ -959,7 +959,7 @@ github.com/thanos-io/objstore/providers/gcs github.com/thanos-io/objstore/providers/s3 github.com/thanos-io/objstore/providers/swift github.com/thanos-io/objstore/tracing/opentracing -# github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31 +# github.com/thanos-io/promql-engine v0.0.0-20241106100125-097e6e9f425a ## explicit; go 1.22.0 github.com/thanos-io/promql-engine/api github.com/thanos-io/promql-engine/engine @@ -982,7 +982,7 @@ github.com/thanos-io/promql-engine/query github.com/thanos-io/promql-engine/ringbuffer github.com/thanos-io/promql-engine/storage github.com/thanos-io/promql-engine/storage/prometheus -# github.com/thanos-io/thanos v0.35.2-0.20241118113652-f998fc59c171 +# github.com/thanos-io/thanos v0.37.0 ## explicit; go 1.23.0 github.com/thanos-io/thanos/pkg/api/query/querypb github.com/thanos-io/thanos/pkg/block