Merge branch 'db_main' into pull-latest-main
jnyi committed Mar 29, 2024
2 parents 943401f + 3e50e25 commit 40fee2c
Showing 20 changed files with 805 additions and 46 deletions.
37 changes: 37 additions & 0 deletions .github/workflows/go.yaml
@@ -66,6 +66,43 @@ jobs:
- name: Linting & vetting
run: make go-lint

unit:
strategy:
fail-fast: false
runs-on: ubuntu-latest
name: Thanos unit tests
env:
GOBIN: /home/runner/go/bin
steps:
- name: Checkout code
uses: actions/checkout@v3

- name: Install Go.
uses: actions/setup-go@v3
with:
go-version: 1.21.x

- name: Install bingo and configure PATH
run: |
go install github.com/bwplotka/bingo@latest
ls -l $GOPATH/bin
echo $PATH
- name: Install Prometheus using bingo
run: |
bingo get prometheus
- uses: actions/cache@v3
with:
path: |
~/.cache/go-build
~/.cache/golangci-lint
~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}

- name: Run unit tests
run: make test-local

e2e:
strategy:
fail-fast: false
3 changes: 2 additions & 1 deletion Dockerfile.multi-stage
@@ -15,8 +15,9 @@ COPY . $GOPATH/src/github.com/thanos-io/thanos
RUN git update-index --refresh; make build

# -----------------------------------------------------------------------------
FROM alpine:3.15

FROM quay.io/prometheus/busybox@sha256:${BASE_DOCKER_SHA}
#FROM quay.io/prometheus/busybox@sha256:${BASE_DOCKER_SHA}
LABEL maintainer="The Thanos Authors"

COPY --from=builder /go/bin/thanos /bin/thanos
10 changes: 8 additions & 2 deletions cmd/thanos/compact.go
@@ -377,12 +377,14 @@ func runCompact(
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason),
)
blocksCleaner := compact.NewBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(
compactor, err := compact.NewBucketCompactorWithCheckerAndCallback(
logger,
sy,
grouper,
planner,
comp,
compact.DefaultBlockDeletableChecker{},
compact.NewOverlappingCompactionLifecycleCallback(reg, conf.enableOverlappingRemoval),
compactDir,
insBkt,
conf.compactionConcurrency,
@@ -437,7 +439,7 @@ func runCompact(

compactMainFn := func() error {
if err := compactor.Compact(ctx); err != nil {
return errors.Wrap(err, "compaction")
return errors.Wrap(err, "whole compaction error")
}

if !conf.disableDownsampling {
@@ -719,6 +721,7 @@ type compactConfig struct {
maxBlockIndexSize units.Base2Bytes
hashFunc string
enableVerticalCompaction bool
enableOverlappingRemoval bool
dedupFunc string
skipBlockWithOutOfOrderChunks bool
progressCalculateInterval time.Duration
@@ -798,6 +801,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set.").
Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction)

cmd.Flag("compact.enable-overlapping-removal", "In house flag to remove overlapping blocks. Turn this on to fix https://github.com/thanos-io/thanos/issues/6775.").
Default("false").BoolVar(&cc.enableOverlappingRemoval)

cmd.Flag("deduplication.func", "Experimental. Deduplication algorithm for merging overlapping blocks. "+
"Possible values are: \"\", \"penalty\". If no value is specified, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. "+
"When set to penalty, penalty based deduplication algorithm will be used. At least one replica label has to be set via --deduplication.replica-label flag.").
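The new lifecycle callback wires overlap handling into the compaction path: the flag above toggles it, and NewOverlappingCompactionLifecycleCallback receives both the registry and the toggle. As a rough illustration of the kind of check such a callback must perform, here is a self-contained sketch; blockMeta and overlapping are illustrative assumptions for this note, not Thanos' actual API.

package main

import (
	"fmt"
	"math"
	"sort"
)

// blockMeta is an illustrative stand-in for a block's time range.
type blockMeta struct {
	MinTime, MaxTime int64 // milliseconds, half-open [MinTime, MaxTime)
}

// overlapping sorts blocks by MinTime and returns the indices of blocks whose
// range starts before an earlier block has ended — the condition an
// overlap-removal callback would need to detect before compacting a group.
func overlapping(blocks []blockMeta) []int {
	sort.Slice(blocks, func(i, j int) bool { return blocks[i].MinTime < blocks[j].MinTime })
	var out []int
	maxSeen := int64(math.MinInt64)
	for i, b := range blocks {
		if b.MinTime < maxSeen {
			out = append(out, i)
		}
		if b.MaxTime > maxSeen {
			maxSeen = b.MaxTime
		}
	}
	return out
}

func main() {
	fmt.Println(overlapping([]blockMeta{{0, 100}, {50, 150}, {150, 200}}))
	// Prints [1]: the second block starts inside the first; the third only touches.
}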
2 changes: 2 additions & 0 deletions cmd/thanos/query_frontend.go
@@ -146,6 +146,8 @@ func registerQueryFrontend(app *extkingpin.App) {
cmd.Flag("query-frontend.log-queries-longer-than", "Log queries that are slower than the specified duration. "+
"Set to 0 to disable. Set to < 0 to enable on all queries.").Default("0").DurationVar(&cfg.CortexHandlerConfig.LogQueriesLongerThan)

cmd.Flag("query-frontend.log-failed-queries", "Log failed queries due to any reason").Default("true").BoolVar(&cfg.CortexHandlerConfig.LogFailedQueries)

cmd.Flag("query-frontend.org-id-header", "Deprecation Warning - This flag will be soon deprecated in favor of query-frontend.tenant-header"+
" and both flags cannot be used at the same time. "+
"Request header names used to identify the source of slow queries (repeated flag). "+
2 changes: 2 additions & 0 deletions docs/components/query-frontend.md
@@ -279,6 +279,8 @@ Flags:
--query-frontend.forward-header=<http-header-name> ...
List of headers forwarded by the query-frontend
to downstream queriers, default is empty
--query-frontend.log-failed-queries
Log failed queries due to any reason
--query-frontend.log-queries-longer-than=0
Log queries that are slower than the specified
duration. Set to 0 to disable. Set to < 0 to
2 changes: 1 addition & 1 deletion docs/components/tools.md
@@ -949,7 +949,7 @@ Flags:
--rewrite.to-relabel-config-file=<file-path>
Path to YAML file that contains relabel configs
that will be applied to blocks
--tmp.dir="/tmp/thanos-rewrite"
--tmp.dir="/var/folders/7w/bk4g23r116j_srlrlf8_ys7r0000gp/T/thanos-rewrite"
Working directory for temporary files
--tracing.config=<content>
Alternative to 'tracing.config-file' flag
32 changes: 32 additions & 0 deletions internal/cortex/frontend/transport/handler.go
@@ -46,6 +46,7 @@ type HandlerConfig struct {
LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
MaxBodySize int64 `yaml:"max_body_size"`
QueryStatsEnabled bool `yaml:"query_stats_enabled"`
LogFailedQueries bool `yaml:"log_failed_queries"`
}
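For reference, the new field rides along in the handler's YAML config. A minimal sketch of how it is set, assuming gopkg.in/yaml.v2 (the exact YAML library is an assumption here) and a trimmed-down copy of the struct for illustration:

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// handlerConfig is a trimmed-down, illustrative copy of HandlerConfig.
type handlerConfig struct {
	QueryStatsEnabled bool `yaml:"query_stats_enabled"`
	LogFailedQueries  bool `yaml:"log_failed_queries"`
}

func main() {
	var cfg handlerConfig
	if err := yaml.Unmarshal([]byte("log_failed_queries: true\n"), &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.LogFailedQueries) // true
}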

// Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
@@ -127,6 +128,10 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

if err != nil {
writeError(w, err)
if f.cfg.LogFailedQueries {
queryString = f.parseRequestQueryString(r, buf)
f.reportFailedQuery(r, queryString, err)
}
return
}

@@ -160,6 +165,33 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

func (f *Handler) reportFailedQuery(r *http.Request, queryString url.Values, err error) {
// NOTE(GiedriusS): see https://github.com/grafana/grafana/pull/60301 for more info.
grafanaDashboardUID := "-"
if dashboardUID := r.Header.Get("X-Dashboard-Uid"); dashboardUID != "" {
grafanaDashboardUID = dashboardUID
}
grafanaPanelID := "-"
if panelID := r.Header.Get("X-Panel-Id"); panelID != "" {
grafanaPanelID = panelID
}
remoteUser, _, _ := r.BasicAuth()

logMessage := append([]interface{}{
"msg", "failed query",
"method", r.Method,
"host", r.Host,
"path", r.URL.Path,
"remote_user", remoteUser,
"remote_addr", r.RemoteAddr,
"error", err.Error(),
"grafana_dashboard_uid", grafanaDashboardUID,
"grafana_panel_id", grafanaPanelID,
}, formatQueryString(queryString)...)

level.Error(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}
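reportFailedQuery builds the log line as alternating key/value pairs, which is the shape go-kit's Logger.Log expects; formatQueryString (defined elsewhere in this file) flattens the URL query into the same shape. A self-contained sketch — flattenQuery only mirrors the role of formatQueryString, it is not the actual implementation:

package main

import (
	"net/url"
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

// flattenQuery turns url.Values into alternating key/value pairs suitable
// for a go-kit logger, mirroring what formatQueryString does upstream.
func flattenQuery(qs url.Values) []interface{} {
	var kvs []interface{}
	for k, vs := range qs {
		for _, v := range vs {
			kvs = append(kvs, "param_"+k, v)
		}
	}
	return kvs
}

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)
	qs := url.Values{"query": {"up"}}
	msg := append([]interface{}{"msg", "failed query", "method", "GET"}, flattenQuery(qs)...)
	level.Error(logger).Log(msg...)
	// Emits logfmt along the lines of:
	// level=error msg="failed query" method=GET param_query=up
}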

// reportSlowQuery reports slow queries.
func (f *Handler) reportSlowQuery(r *http.Request, responseHeaders http.Header, queryString url.Values, queryResponseTime time.Duration) {
// NOTE(GiedriusS): see https://github.com/grafana/grafana/pull/60301 for more info.
78 changes: 72 additions & 6 deletions internal/cortex/querier/queryrange/instrumentation.go
@@ -5,26 +5,36 @@ package queryrange

import (
"context"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/instrument"
)

const DAY = 24 * time.Hour
const queryRangeBucket = "query_range_bucket"
const invalidDurationBucket = "Invalid"

var queryRangeBuckets = []float64{.005, .01, .05, .1, .25, .5, 1, 3, 5, 10, 30, 60, 120}

// InstrumentMiddleware can be inserted into the middleware chain to expose timing information.
func InstrumentMiddleware(name string, metrics *InstrumentMiddlewareMetrics) Middleware {
var durationCol instrument.Collector
func InstrumentMiddleware(name string, metrics *InstrumentMiddlewareMetrics, log log.Logger) Middleware {

var durationCol instrument.Collector
// Support the case metrics shouldn't be tracked (ie. unit tests).
if metrics != nil {
durationCol = instrument.NewHistogramCollector(metrics.duration)
durationCol = NewDurationHistogramCollector(metrics.duration, log)
} else {
durationCol = &NoopCollector{}
}

return MiddlewareFunc(func(next Handler) Handler {
return HandlerFunc(func(ctx context.Context, req Request) (Response, error) {
queryRangeDurationBucket := getRangeBucket(req)
ctx = context.WithValue(ctx, queryRangeBucket, queryRangeDurationBucket)
var resp Response
err := instrument.CollectedRequest(ctx, name, durationCol, instrument.ErrorCode, func(ctx context.Context) error {
var err error
@@ -36,6 +46,32 @@ func InstrumentMiddleware(name string, metrics *InstrumentMiddlewareMetrics) Mid
})
}

func getRangeBucket(req Request) string {
queryRangeDuration := req.GetEnd() - req.GetStart()
switch {
case queryRangeDuration < 0:
return invalidDurationBucket
case queryRangeDuration == 0:
return "Instant"
case queryRangeDuration <= time.Hour.Milliseconds():
return "1h"
case queryRangeDuration <= 6*time.Hour.Milliseconds():
return "6h"
case queryRangeDuration <= 12*time.Hour.Milliseconds():
return "12h"
case queryRangeDuration <= DAY.Milliseconds():
return "1d"
case queryRangeDuration <= 2*DAY.Milliseconds():
return "2d"
case queryRangeDuration <= 7*DAY.Milliseconds():
return "7d"
case queryRangeDuration <= 30*DAY.Milliseconds():
return "30d"
default:
return "+INF"
}
}
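Since GetStart and GetEnd are millisecond timestamps, each case compares a millisecond span against time.Hour.Milliseconds() and friends; a 3-hour query, for example, falls through "1h" and lands in "6h". A self-contained worked example (rangeBucket reproduces the switch shape above with the middle buckets elided):

package main

import (
	"fmt"
	"time"
)

func rangeBucket(startMs, endMs int64) string {
	d := endMs - startMs
	switch {
	case d < 0:
		return "Invalid"
	case d == 0:
		return "Instant"
	case d <= time.Hour.Milliseconds():
		return "1h"
	case d <= 6*time.Hour.Milliseconds():
		return "6h"
	default:
		return "+INF" // 12h, 1d, 2d, 7d, 30d elided for brevity
	}
}

func main() {
	start := time.Date(2024, 3, 29, 0, 0, 0, 0, time.UTC).UnixMilli()
	end := start + (3 * time.Hour).Milliseconds()
	fmt.Println(rangeBucket(start, end)) // "6h", since 1h < 3h <= 6h
}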

// InstrumentMiddlewareMetrics holds the metrics tracked by InstrumentMiddleware.
type InstrumentMiddlewareMetrics struct {
duration *prometheus.HistogramVec
@@ -48,13 +84,13 @@ func NewInstrumentMiddlewareMetrics(registerer prometheus.Registerer) *Instrumen
Namespace: "cortex",
Name: "frontend_query_range_duration_seconds",
Help: "Total time spent in seconds doing query range requests.",
Buckets: prometheus.DefBuckets,
}, []string{"method", "status_code"}),
Buckets: queryRangeBuckets,
}, []string{"method", "status_code", queryRangeBucket}),
}
}

// NoopCollector is a noop collector that can be used as placeholder when no metric
// should tracked by the instrumentation.
// should be tracked by the instrumentation.
type NoopCollector struct{}

// Register implements instrument.Collector.
@@ -65,3 +101,33 @@ func (c *NoopCollector) Before(ctx context.Context, method string, start time.Ti

// After implements instrument.Collector.
func (c *NoopCollector) After(ctx context.Context, method, statusCode string, start time.Time) {}

// DurationHistogramCollector collects the duration of a request
type DurationHistogramCollector struct {
metric *prometheus.HistogramVec
log log.Logger
}

func (c *DurationHistogramCollector) Register() {
prometheus.MustRegister(c.metric)
}

func (c *DurationHistogramCollector) Before(ctx context.Context, method string, start time.Time) {
}

func (c *DurationHistogramCollector) After(ctx context.Context, method, statusCode string, start time.Time) {
durationBucket, ok := ctx.Value(queryRangeBucket).(string)

if !ok {
level.Warn(c.log).Log("msg", "failed to get query range bucket for frontend_query_range_duration_seconds metrics",
"method", method, "start_time", start)
durationBucket = invalidDurationBucket
}
if c.metric != nil {
instrument.ObserveWithExemplar(ctx, c.metric.WithLabelValues(method, statusCode, durationBucket), time.Since(start).Seconds())
}
}

func NewDurationHistogramCollector(metric *prometheus.HistogramVec, log log.Logger) *DurationHistogramCollector {
return &DurationHistogramCollector{metric, log}
}
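With the logger threaded through, the bucket label travels from InstrumentMiddleware to the collector via the request context. A usage sketch as it could appear inside the queryrange package; the "step_align" name, the throwaway registry, and the next handler are illustrative, and os is an extra import this sketch assumes alongside the go-kit log and prometheus packages already imported in this file:

// exampleWiring shows the new call shape: pass a logger alongside metrics.
func exampleWiring(next Handler) Handler {
	logger := log.NewLogfmtLogger(os.Stderr)
	metrics := NewInstrumentMiddlewareMetrics(prometheus.NewRegistry())
	// The middleware stamps the range bucket into the request context before
	// timing; DurationHistogramCollector.After reads it back as the
	// query_range_bucket label on frontend_query_range_duration_seconds.
	return InstrumentMiddleware("step_align", metrics, logger).Wrap(next)
}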