Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/new-reloader-receive-hashring' i…
Browse files Browse the repository at this point in the history
…nto new-reloader-receive-hashring
  • Loading branch information
douglascamata committed Sep 19, 2023
2 parents 82cde59 + 57650b1 commit cd779a1
Show file tree
Hide file tree
Showing 32 changed files with 2,370 additions and 324 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Added

- [#6605](https://github.com/thanos-io/thanos/pull/6605) Query Frontend: Support vertical sharding binary expression with metric name when no matching labels specified.
- [#6308](https://github.com/thanos-io/thanos/pull/6308) Ruler: Support configuration flag that allows customizing template for alert message.

### Changed

Expand Down Expand Up @@ -110,6 +111,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6592](https://github.com/thanos-io/thanos/pull/6592) Query Frontend: fix bugs in vertical sharding `without` and `union` function to allow more queries to be shardable.
- [#6317](https://github.com/thanos-io/thanos/pull/6317) *: Fix internal label deduplication bug, by resorting store response set.
- [#6189](https://github.com/thanos-io/thanos/pull/6189) Rule: Fix panic when calling API `/api/v1/rules?type=alert`.
- [#6598](https://github.com/thanos-io/thanos/pull/6598) Compact: Fix data corruption with "invalid size" error during downsample.

### Changed
- [#6049](https://github.com/thanos-io/thanos/pull/6049) Compact: *breaking :warning:* Replace group with resolution in compact metrics to avoid cardinality explosion on compact metrics for large numbers of groups.
Expand Down Expand Up @@ -271,6 +273,7 @@ NOTE: Querier's `query.promql-engine` flag enabling new PromQL engine is now unh
- [#5741](https://github.com/thanos-io/thanos/pull/5741) Query: add metrics on how much data is being selected by downstream Store APIs.
- [#5673](https://github.com/thanos-io/thanos/pull/5673) Receive: Reload tenant limit configuration on file change.
- [#5749](https://github.com/thanos-io/thanos/pull/5749) Query Frontend: Added small LRU cache to cache query analysis results.
- [#6544](https://github.com/thanos-io/thanos/pull/6500) Objstore: Update objstore to latest version which adds a new metric regarding uploaded TSDB bytes

### Changed

Expand Down
3 changes: 3 additions & 0 deletions cmd/thanos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ type alertMgrConfig struct {
alertExcludeLabels []string
alertQueryURL *string
alertRelabelConfigPath *extflag.PathOrContent
alertSourceTemplate *string
}

func (ac *alertMgrConfig) registerFlag(cmd extflag.FlagClause) *alertMgrConfig {
Expand All @@ -234,5 +235,7 @@ func (ac *alertMgrConfig) registerFlag(cmd extflag.FlagClause) *alertMgrConfig {
cmd.Flag("alert.label-drop", "Labels by name to drop before sending to alertmanager. This allows alert to be deduplicated on replica label (repeated). Similar Prometheus alert relabelling").
StringsVar(&ac.alertExcludeLabels)
ac.alertRelabelConfigPath = extflag.RegisterPathOrContent(cmd, "alert.relabel-config", "YAML file that contains alert relabelling configuration.", extflag.WithEnvSubstitution())
ac.alertSourceTemplate = cmd.Flag("alert.query-template", "Template to use in alerts source field. Need only include {{.Expr}} parameter").Default("/graph?g0.expr={{.Expr}}&g0.tab=1").String()

return ac
}
6 changes: 3 additions & 3 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func (b *erroringBucket) IsObjNotFoundErr(err error) bool {
return b.bkt.IsObjNotFoundErr(err)
}

// IsCustomerManagedKeyError returns true if error means that customer managed key is invalid.
func (b *erroringBucket) IsCustomerManagedKeyError(err error) bool {
return b.bkt.IsCustomerManagedKeyError(err)
// IsAccessDeniedErr returns true if error means that access to the object was denied.
func (b *erroringBucket) IsAccessDeniedErr(err error) bool {
return b.bkt.IsAccessDeniedErr(err)
}

// Attributes returns information about the specified object.
Expand Down
48 changes: 46 additions & 2 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
package main

import (
"bytes"
"context"
"fmt"
"html/template"
"math/rand"
"net/http"
"net/url"
Expand Down Expand Up @@ -38,7 +41,6 @@ import (
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/agent"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/strutil"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
Expand Down Expand Up @@ -101,6 +103,10 @@ type ruleConfig struct {
storeRateLimits store.SeriesSelectLimits
}

// Expression is the data value passed to the alert source template when it is
// parsed and executed; templates reference its single field as {{.Expr}}.
type Expression struct {
// Expr is the expression text substituted for {{.Expr}} in the template.
Expr string
}

func (rc *ruleConfig) registerFlag(cmd extkingpin.FlagClause) {
rc.http.registerFlag(cmd)
rc.grpc.registerFlag(cmd)
Expand Down Expand Up @@ -329,6 +335,10 @@ func runRule(
}
}

if err := validateTemplate(*conf.alertmgr.alertSourceTemplate); err != nil {
return errors.Wrap(err, "invalid alert source template")
}

queryProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_rule_query_apis_", reg),
Expand Down Expand Up @@ -492,11 +502,15 @@ func runRule(
if alrt.State == rules.StatePending {
continue
}
expressionURL, err := tableLinkForExpression(*conf.alertmgr.alertSourceTemplate, expr)
if err != nil {
level.Warn(logger).Log("msg", "failed to generate link for expression", "expr", expr, "err", err)
}
a := &notifier.Alert{
StartsAt: alrt.FiredAt,
Labels: alrt.Labels,
Annotations: alrt.Annotations,
GeneratorURL: conf.alertQueryURL.String() + strutil.TableLinkForExpression(expr),
GeneratorURL: conf.alertQueryURL.String() + expressionURL,
}
if !alrt.ResolvedAt.IsZero() {
a.EndsAt = alrt.ResolvedAt
Expand Down Expand Up @@ -934,3 +948,33 @@ func reloadRules(logger log.Logger,
}
return errs.Err()
}

// tableLinkForExpression renders tmpl with expr exposed as {{.Expr}} and
// returns the resulting link. Template example:
// "/graph?g0.expr={{.Expr}}&g0.tab=1".
//
// expr is query-escaped before rendering. The escaped value is handed to
// html/template as template.HTML so that it is emitted verbatim in text
// context: url.QueryEscape encodes spaces as '+', which html/template would
// otherwise rewrite to "&#43;" and corrupt the generated link. Marking the
// value as safe is sound here because the output of url.QueryEscape contains
// only letters, digits and the characters "-_.~+%", none of which are HTML
// metacharacters.
//
// An empty string and a non-nil error are returned when tmpl cannot be parsed
// or executed (for example when it references a field other than .Expr).
func tableLinkForExpression(tmpl string, expr string) (string, error) {
	t, err := template.New("url").Parse(tmpl)
	if err != nil {
		return "", fmt.Errorf("failed to parse template: %w", err)
	}

	// Same field name as Expression, but typed so the escaper leaves it alone.
	data := struct{ Expr template.HTML }{
		Expr: template.HTML(url.QueryEscape(expr)),
	}

	var buf bytes.Buffer
	if err := t.Execute(&buf, data); err != nil {
		return "", fmt.Errorf("failed to execute template: %w", err)
	}
	return buf.String(), nil
}

// validateTemplate reports whether tmplStr is usable as an alert source
// template: it must parse, and it must execute successfully against an
// Expression value holding a placeholder expression. The rendered output is
// discarded; only the error, if any, is returned.
func validateTemplate(tmplStr string) error {
	parsed, parseErr := template.New("test").Parse(tmplStr)
	if parseErr != nil {
		return fmt.Errorf("failed to parse the template: %w", parseErr)
	}

	sample := Expression{Expr: "test_expr"}
	var rendered bytes.Buffer
	if execErr := parsed.Execute(&rendered, sample); execErr != nil {
		return fmt.Errorf("failed to execute the template: %w", execErr)
	}
	return nil
}
56 changes: 56 additions & 0 deletions cmd/thanos/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,59 @@ func Test_parseFlagLabels(t *testing.T) {
testutil.Equals(t, err != nil, td.expectErr)
}
}

// Test_validateTemplate checks that validateTemplate accepts templates that
// only reference {{.Expr}} and rejects one referencing an unknown field.
func Test_validateTemplate(t *testing.T) {
	type testCase struct {
		template  string
		expectErr bool
	}

	cases := []testCase{
		{template: `/graph?g0.expr={{.Expr}}&g0.tab=1`, expectErr: false},
		{template: `/graph?g0.expr={{.Expression}}&g0.tab=1`, expectErr: true},
		{template: `another template includes {{.Expr}}`, expectErr: false},
	}

	for i := range cases {
		tc := cases[i]
		gotErr := validateTemplate(tc.template) != nil
		testutil.Equals(t, gotErr, tc.expectErr)
	}
}

// Test_tableLinkForExpression checks the rendered link (and error outcome) of
// tableLinkForExpression for a valid URL template, a template referencing an
// unknown field, and a free-form template.
func Test_tableLinkForExpression(t *testing.T) {
	type testCase struct {
		template  string
		expr      string
		expectStr string
		expectErr bool
	}

	cases := []testCase{
		{
			template:  `/graph?g0.expr={{.Expr}}&g0.tab=1`,
			expr:      `up{app="foo"}`,
			expectStr: `/graph?g0.expr=up%7Bapp%3D%22foo%22%7D&g0.tab=1`,
			expectErr: false,
		},
		{
			// Unknown field: an error and an empty result are expected.
			template:  `/graph?g0.expr={{.Expression}}&g0.tab=1`,
			expr:      "test_expr",
			expectErr: true,
		},
		{
			template:  `another template includes {{.Expr}}`,
			expr:      "test_expr",
			expectStr: `another template includes test_expr`,
			expectErr: false,
		},
	}

	for i := range cases {
		tc := cases[i]
		link, err := tableLinkForExpression(tc.template, tc.expr)
		gotErr := err != nil
		testutil.Equals(t, gotErr, tc.expectErr)
		testutil.Equals(t, link, tc.expectStr)
	}
}
5 changes: 5 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type storeConfig struct {
reqLogConfig *extflag.PathOrContent
lazyIndexReaderEnabled bool
lazyIndexReaderIdleTimeout time.Duration
lazyExpandedPostingsEnabled bool
}

func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -182,6 +183,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity.").
Hidden().Default("5m").DurationVar(&sc.lazyIndexReaderIdleTimeout)

cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.").
Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled)

cmd.Flag("web.disable", "Disable Block Viewer UI.").Default("false").BoolVar(&sc.disableWeb)

cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path.").
Expand Down Expand Up @@ -382,6 +386,7 @@ func runStore(
}
return conf.estimatedMaxChunkSize
}),
store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled),
}

if conf.debugLogging {
Expand Down
3 changes: 3 additions & 0 deletions docs/components/rule.md
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,9 @@ Flags:
to alertmanager. This allows alert to be
deduplicated on replica label (repeated).
Similar Prometheus alert relabelling
--alert.query-template="/graph?g0.expr={{.Expr}}&g0.tab=1"
Template to use in alerts source field.
Need only include {{.Expr}} parameter
--alert.query-url=ALERT.QUERY-URL
The external Thanos Query URL that would be set
in all alerts 'Source' field
Expand Down
5 changes: 5 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ Flags:
If true, Store Gateway will lazy memory map
index-header only once the block is required by
a query.
--store.enable-lazy-expanded-postings
If true, Store Gateway will estimate postings
size and try to lazily expand postings if
it downloads less data than expanding all
postings.
--store.grpc.downloaded-bytes-limit=0
Maximum amount of downloaded (either
fetched or touched) bytes in a single
Expand Down
1 change: 1 addition & 0 deletions docs/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ type: AZURE
config:
storage_account: ""
storage_account_key: ""
storage_connection_string: ""
container: ""
endpoint: ""
user_assigned_id: ""
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ require (
github.com/prometheus/prometheus v0.46.1-0.20230818184859-4d8e380269da
github.com/sony/gobreaker v0.5.0
github.com/stretchr/testify v1.8.4
github.com/thanos-io/objstore v0.0.0-20230804084840-c042a6a16c58
github.com/thanos-io/objstore v0.0.0-20230908084555-8d397d4d88e7
github.com/thanos-io/promql-engine v0.0.0-20230821193351-e1ae4275b96e
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -906,8 +906,8 @@ github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4
github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw=
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1Zsv7OAU9iQhZwigp50Yl38W10g/vd5NC8Rdk1Jzng=
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM=
github.com/thanos-io/objstore v0.0.0-20230804084840-c042a6a16c58 h1:4cDXsvm3mb1NvW1B1qJ9/fy6h+OOYit0h8oVA957hLM=
github.com/thanos-io/objstore v0.0.0-20230804084840-c042a6a16c58/go.mod h1:oJ82xgcBDzGJrEgUsjlTj6n01+ZWUMMUR8BlZzX5xDE=
github.com/thanos-io/objstore v0.0.0-20230908084555-8d397d4d88e7 h1:P1mukL6u3wKv4gRLjhnEYltZf8k5dXkE7y7UvEJo0fU=
github.com/thanos-io/objstore v0.0.0-20230908084555-8d397d4d88e7/go.mod h1:oJ82xgcBDzGJrEgUsjlTj6n01+ZWUMMUR8BlZzX5xDE=
github.com/thanos-io/promql-engine v0.0.0-20230821193351-e1ae4275b96e h1:kwsFCU8eSkZehbrAN3nXPw5RdMHi/Bok/y8l2C4M+gk=
github.com/thanos-io/promql-engine v0.0.0-20230821193351-e1ae4275b96e/go.mod h1:+T/ZYNCGybT6eTsGGvVtGb63nT1cvUmH6MjqRrcQoKw=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
Expand Down
10 changes: 6 additions & 4 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 567, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
testutil.Equals(t, 595, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))

// File stats are gathered.
testutil.Equals(t, fmt.Sprintf(`{
Expand Down Expand Up @@ -184,7 +184,9 @@ func TestUpload(t *testing.T) {
"rel_path": "meta.json"
}
],
"index_stats": {}
"index_stats": {
"series_max_size": 16
}
}
}
`, b1.String(), b1.String()), string(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
Expand All @@ -195,7 +197,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 567, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
testutil.Equals(t, 595, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
}
{
// Upload with no external labels should be blocked.
Expand Down Expand Up @@ -227,7 +229,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 6, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b2.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b2.String(), IndexFilename)]))
testutil.Equals(t, 546, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
testutil.Equals(t, 574, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
}
}

Expand Down
19 changes: 18 additions & 1 deletion pkg/block/indexheader/binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
postingLengthFieldSize = 4
)

// NotFoundRange is the placeholder range (Start and End both -1) that
// postingsOffset appends for a requested value it does not find.
// PostingsOffset reports NotFoundRangeErr when its single result equals it.
var NotFoundRange = index.Range{Start: -1, End: -1}

// The table gets initialized with sync.Once but may still cause a race
// with any other use of the crc32 package anywhere. Thus we initialize it
// before.
Expand Down Expand Up @@ -747,13 +749,18 @@ func (r *BinaryReader) IndexVersion() (int, error) {
return r.indexVersion, nil
}

// PostingsOffsets implements Reader.
// It forwards the label name and all requested values to postingsOffset and
// returns that call's ranges and error unchanged.
func (r *BinaryReader) PostingsOffsets(name string, values ...string) ([]index.Range, error) {
return r.postingsOffset(name, values...)
}

// TODO(bwplotka): Get advantage of multi value offset fetch.
func (r *BinaryReader) PostingsOffset(name, value string) (index.Range, error) {
rngs, err := r.postingsOffset(name, value)
if err != nil {
return index.Range{}, err
}
if len(rngs) != 1 {
if len(rngs) != 1 || rngs[0] == NotFoundRange {
return index.Range{}, NotFoundRangeErr
}
return rngs[0], nil
Expand Down Expand Up @@ -801,6 +808,7 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
valueIndex := 0
for valueIndex < len(values) && values[valueIndex] < e.offsets[0].value {
// Discard values before the start.
rngs = append(rngs, NotFoundRange)
valueIndex++
}

Expand All @@ -811,6 +819,9 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
i := sort.Search(len(e.offsets), func(i int) bool { return e.offsets[i].value >= wantedValue })
if i == len(e.offsets) {
// We're past the end.
for len(rngs) < len(values) {
rngs = append(rngs, NotFoundRange)
}
break
}
if i > 0 && e.offsets[i].value != wantedValue {
Expand Down Expand Up @@ -858,6 +869,8 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
// Record on the way if wanted value is equal to the current value.
if string(value) == wantedValue {
newSameRngs = append(newSameRngs, index.Range{Start: postingOffset + postingLengthFieldSize})
} else {
rngs = append(rngs, NotFoundRange)
}
valueIndex++
if valueIndex == len(values) {
Expand All @@ -877,6 +890,10 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
}

if valueIndex != len(values) && wantedValue <= e.offsets[i+1].value {
// Increment i when wanted value is same as next offset.
if wantedValue == e.offsets[i+1].value {
i++
}
// wantedValue is smaller or same as the next offset we know about, let's iterate further to add those.
continue
}
Expand Down
Loading

0 comments on commit cd779a1

Please sign in to comment.