diff --git a/.github/workflows/go.yaml b/.github/workflows/go.yaml index 83d16f927c6..3f7d3f698b7 100644 --- a/.github/workflows/go.yaml +++ b/.github/workflows/go.yaml @@ -66,6 +66,43 @@ jobs: - name: Linting & vetting run: make go-lint + unit: + strategy: + fail-fast: false + runs-on: ubuntu-latest + name: Thanos unit tests + env: + GOBIN: /home/runner/go/bin + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Install Go. + uses: actions/setup-go@v3 + with: + go-version: 1.21.x + + - name: Install bingo and configure PATH + run: | + go install github.com/bwplotka/bingo@latest + ls -l $GOPATH/bin + echo $PATH + + - name: Install Prometheus using bingo + run: | + bingo get prometheus + + - uses: actions/cache@v3 + with: + path: | + ~/.cache/go-build + ~/.cache/golangci-lint + ~/go/pkg/mod + key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} + + - name: Run unit tests + run: make test-local + e2e: strategy: fail-fast: false diff --git a/Dockerfile.multi-stage b/Dockerfile.multi-stage index a7b58f8bb21..1888368034e 100644 --- a/Dockerfile.multi-stage +++ b/Dockerfile.multi-stage @@ -15,8 +15,9 @@ COPY . 
$GOPATH/src/github.com/thanos-io/thanos RUN git update-index --refresh; make build # ----------------------------------------------------------------------------- +FROM alpine:3.15 -FROM quay.io/prometheus/busybox@sha256:${BASE_DOCKER_SHA} +#FROM quay.io/prometheus/busybox@sha256:${BASE_DOCKER_SHA} LABEL maintainer="The Thanos Authors" COPY --from=builder /go/bin/thanos /bin/thanos diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 8923bd376e5..a6364a3bee9 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -377,12 +377,14 @@ func runCompact( compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason), ) blocksCleaner := compact.NewBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures) - compactor, err := compact.NewBucketCompactor( + compactor, err := compact.NewBucketCompactorWithCheckerAndCallback( logger, sy, grouper, planner, comp, + compact.DefaultBlockDeletableChecker{}, + compact.NewOverlappingCompactionLifecycleCallback(reg, conf.enableOverlappingRemoval), compactDir, insBkt, conf.compactionConcurrency, @@ -437,7 +439,7 @@ func runCompact( compactMainFn := func() error { if err := compactor.Compact(ctx); err != nil { - return errors.Wrap(err, "compaction") + return errors.Wrap(err, "whole compaction error") } if !conf.disableDownsampling { @@ -719,6 +721,7 @@ type compactConfig struct { maxBlockIndexSize units.Base2Bytes hashFunc string enableVerticalCompaction bool + enableOverlappingRemoval bool dedupFunc string skipBlockWithOutOfOrderChunks bool progressCalculateInterval time.Duration @@ -798,6 +801,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { "NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set."). 
Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction) + cmd.Flag("compact.enable-overlapping-removal", "In house flag to remove overlapping blocks. Turn this on to fix https://github.com/thanos-io/thanos/issues/6775."). + Default("false").BoolVar(&cc.enableOverlappingRemoval) + cmd.Flag("deduplication.func", "Experimental. Deduplication algorithm for merging overlapping blocks. "+ "Possible values are: \"\", \"penalty\". If no value is specified, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. "+ "When set to penalty, penalty based deduplication algorithm will be used. At least one replica label has to be set via --deduplication.replica-label flag."). diff --git a/cmd/thanos/query_frontend.go b/cmd/thanos/query_frontend.go index 5fa7cf3c5e6..417067135d9 100644 --- a/cmd/thanos/query_frontend.go +++ b/cmd/thanos/query_frontend.go @@ -146,6 +146,8 @@ func registerQueryFrontend(app *extkingpin.App) { cmd.Flag("query-frontend.log-queries-longer-than", "Log queries that are slower than the specified duration. "+ "Set to 0 to disable. Set to < 0 to enable on all queries.").Default("0").DurationVar(&cfg.CortexHandlerConfig.LogQueriesLongerThan) + cmd.Flag("query-frontend.log-failed-queries", "Log failed queries due to any reason").Default("true").BoolVar(&cfg.CortexHandlerConfig.LogFailedQueries) + cmd.Flag("query-frontend.org-id-header", "Deprecation Warning - This flag will be soon deprecated in favor of query-frontend.tenant-header"+ " and both flags cannot be used at the same time. "+ "Request header names used to identify the source of slow queries (repeated flag). "+ diff --git a/docs/components/query-frontend.md b/docs/components/query-frontend.md index 8025cdecc8d..ab418040bd0 100644 --- a/docs/components/query-frontend.md +++ b/docs/components/query-frontend.md @@ -279,6 +279,8 @@ Flags: --query-frontend.forward-header= ... 
List of headers forwarded by the query-frontend to downstream queriers, default is empty + --query-frontend.log-failed-queries + Log failed queries due to any reason --query-frontend.log-queries-longer-than=0 Log queries that are slower than the specified duration. Set to 0 to disable. Set to < 0 to diff --git a/docs/components/tools.md b/docs/components/tools.md index a1f7f706aeb..50dadb09e48 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -949,7 +949,7 @@ Flags: --rewrite.to-relabel-config-file= Path to YAML file that contains relabel configs that will be applied to blocks - --tmp.dir="/tmp/thanos-rewrite" + --tmp.dir="/tmp/thanos-rewrite" Working directory for temporary files --tracing.config= Alternative to 'tracing.config-file' flag diff --git a/internal/cortex/frontend/transport/handler.go b/internal/cortex/frontend/transport/handler.go index 602a54d027d..77c8b7ab3bc 100644 --- a/internal/cortex/frontend/transport/handler.go +++ b/internal/cortex/frontend/transport/handler.go @@ -46,6 +46,7 @@ type HandlerConfig struct { LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"` MaxBodySize int64 `yaml:"max_body_size"` QueryStatsEnabled bool `yaml:"query_stats_enabled"` + LogFailedQueries bool `yaml:"log_failed_queries"` } // Handler accepts queries and forwards them to RoundTripper. It can log slow queries, @@ -127,6 +128,10 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err != nil { writeError(w, err) + if f.cfg.LogFailedQueries { + queryString = f.parseRequestQueryString(r, buf) + f.reportFailedQuery(r, queryString, err) + } return } @@ -160,6 +165,33 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } +func (f *Handler) reportFailedQuery(r *http.Request, queryString url.Values, err error) { + // NOTE(GiedriusS): see https://github.com/grafana/grafana/pull/60301 for more info. 
+ grafanaDashboardUID := "-" + if dashboardUID := r.Header.Get("X-Dashboard-Uid"); dashboardUID != "" { + grafanaDashboardUID = dashboardUID + } + grafanaPanelID := "-" + if panelID := r.Header.Get("X-Panel-Id"); panelID != "" { + grafanaPanelID = panelID + } + remoteUser, _, _ := r.BasicAuth() + + logMessage := append([]interface{}{ + "msg", "failed query", + "method", r.Method, + "host", r.Host, + "path", r.URL.Path, + "remote_user", remoteUser, + "remote_addr", r.RemoteAddr, + "error", err.Error(), + "grafana_dashboard_uid", grafanaDashboardUID, + "grafana_panel_id", grafanaPanelID, + }, formatQueryString(queryString)...) + + level.Error(util_log.WithContext(r.Context(), f.log)).Log(logMessage...) +} + // reportSlowQuery reports slow queries. func (f *Handler) reportSlowQuery(r *http.Request, responseHeaders http.Header, queryString url.Values, queryResponseTime time.Duration) { // NOTE(GiedriusS): see https://github.com/grafana/grafana/pull/60301 for more info. diff --git a/internal/cortex/querier/queryrange/instrumentation.go b/internal/cortex/querier/queryrange/instrumentation.go index 9cc92c70181..99bc158b9e0 100644 --- a/internal/cortex/querier/queryrange/instrumentation.go +++ b/internal/cortex/querier/queryrange/instrumentation.go @@ -5,6 +5,8 @@ package queryrange import ( "context" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "time" "github.com/prometheus/client_golang/prometheus" @@ -12,19 +14,27 @@ import ( "github.com/weaveworks/common/instrument" ) +const DAY = 24 * time.Hour +const queryRangeBucket = "query_range_bucket" +const invalidDurationBucket = "Invalid" + +var queryRangeBuckets = []float64{.005, .01, .05, .1, .25, .5, 1, 3, 5, 10, 30, 60, 120} + // InstrumentMiddleware can be inserted into the middleware chain to expose timing information. 
-func InstrumentMiddleware(name string, metrics *InstrumentMiddlewareMetrics) Middleware { - var durationCol instrument.Collector +func InstrumentMiddleware(name string, metrics *InstrumentMiddlewareMetrics, log log.Logger) Middleware { + var durationCol instrument.Collector // Support the case metrics shouldn't be tracked (ie. unit tests). if metrics != nil { - durationCol = instrument.NewHistogramCollector(metrics.duration) + durationCol = NewDurationHistogramCollector(metrics.duration, log) } else { durationCol = &NoopCollector{} } return MiddlewareFunc(func(next Handler) Handler { return HandlerFunc(func(ctx context.Context, req Request) (Response, error) { + queryRangeDurationBucket := getRangeBucket(req) + ctx = context.WithValue(ctx, queryRangeBucket, queryRangeDurationBucket) var resp Response err := instrument.CollectedRequest(ctx, name, durationCol, instrument.ErrorCode, func(ctx context.Context) error { var err error @@ -36,6 +46,32 @@ func InstrumentMiddleware(name string, metrics *InstrumentMiddlewareMetrics) Mid }) } +func getRangeBucket(req Request) string { + queryRangeDuration := req.GetEnd() - req.GetStart() + switch { + case queryRangeDuration < 0: + return invalidDurationBucket + case queryRangeDuration == 0: + return "Instant" + case queryRangeDuration <= time.Hour.Milliseconds(): + return "1h" + case queryRangeDuration <= 6*time.Hour.Milliseconds(): + return "6h" + case queryRangeDuration <= 12*time.Hour.Milliseconds(): + return "12h" + case queryRangeDuration <= DAY.Milliseconds(): + return "1d" + case queryRangeDuration <= 2*DAY.Milliseconds(): + return "2d" + case queryRangeDuration <= 7*DAY.Milliseconds(): + return "7d" + case queryRangeDuration <= 30*DAY.Milliseconds(): + return "30d" + default: + return "+INF" + } +} + // InstrumentMiddlewareMetrics holds the metrics tracked by InstrumentMiddleware. 
type InstrumentMiddlewareMetrics struct { duration *prometheus.HistogramVec @@ -48,13 +84,13 @@ func NewInstrumentMiddlewareMetrics(registerer prometheus.Registerer) *Instrumen Namespace: "cortex", Name: "frontend_query_range_duration_seconds", Help: "Total time spent in seconds doing query range requests.", - Buckets: prometheus.DefBuckets, - }, []string{"method", "status_code"}), + Buckets: queryRangeBuckets, + }, []string{"method", "status_code", queryRangeBucket}), } } // NoopCollector is a noop collector that can be used as placeholder when no metric -// should tracked by the instrumentation. +// should be tracked by the instrumentation. type NoopCollector struct{} // Register implements instrument.Collector. @@ -65,3 +101,33 @@ func (c *NoopCollector) Before(ctx context.Context, method string, start time.Ti // After implements instrument.Collector. func (c *NoopCollector) After(ctx context.Context, method, statusCode string, start time.Time) {} + +// DurationHistogramCollector collects the duration of a request +type DurationHistogramCollector struct { + metric *prometheus.HistogramVec + log log.Logger +} + +func (c *DurationHistogramCollector) Register() { + prometheus.MustRegister(c.metric) +} + +func (c *DurationHistogramCollector) Before(ctx context.Context, method string, start time.Time) { +} + +func (c *DurationHistogramCollector) After(ctx context.Context, method, statusCode string, start time.Time) { + durationBucket, ok := ctx.Value(queryRangeBucket).(string) + + if !ok { + level.Warn(c.log).Log("msg", "failed to get query range bucket for frontend_query_range_duration_seconds metrics", + "method", method, "start_time", start) + durationBucket = invalidDurationBucket + } + if c.metric != nil { + instrument.ObserveWithExemplar(ctx, c.metric.WithLabelValues(method, statusCode, durationBucket), time.Since(start).Seconds()) + } +} + +func NewDurationHistogramCollector(metric *prometheus.HistogramVec, log log.Logger) *DurationHistogramCollector { + 
return &DurationHistogramCollector{metric, log} +} diff --git a/internal/cortex/querier/queryrange/instrumentation_test.go b/internal/cortex/querier/queryrange/instrumentation_test.go new file mode 100644 index 00000000000..2dbe4085165 --- /dev/null +++ b/internal/cortex/querier/queryrange/instrumentation_test.go @@ -0,0 +1,117 @@ +// Copyright (c) The Cortex Authors. +// Licensed under the Apache License 2.0. + +package queryrange + +import ( + "context" + "github.com/go-kit/log" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + "testing" +) + +type mockRequest struct { + start int64 + end int64 +} + +func (m mockRequest) Reset() { +} + +func (m mockRequest) String() string { + return "mock" +} + +func (m mockRequest) ProtoMessage() { +} + +func (m mockRequest) GetStart() int64 { + return m.start +} + +func (m mockRequest) GetEnd() int64 { + return m.end +} + +func (m mockRequest) GetStep() int64 { + return 0 +} + +func (m mockRequest) GetQuery() string { + return "" +} + +func (m mockRequest) GetCachingOptions() CachingOptions { + return CachingOptions{} +} + +func (m mockRequest) WithStartEnd(startTime int64, endTime int64) Request { + return mockRequest{start: startTime, end: endTime} +} + +func (m mockRequest) WithQuery(query string) Request { + return mockRequest{} +} + +func (m mockRequest) LogToSpan(span opentracing.Span) {} + +func (m mockRequest) GetStats() string { + return "" +} + +func (m mockRequest) WithStats(stats string) Request { + return mockRequest{} +} + +func TestGetRangeBucket(t *testing.T) { + cases := []struct { + start int64 + end int64 + expected string + }{ + {0, -1, "Invalid"}, + {1, 60 * 60 * 1000, "1h"}, + {1, 6 * 60 * 60 * 1000, "6h"}, + {1, 12 * 60 * 60 * 1000, "12h"}, + {1, 24 * 60 * 60 * 1000, "1d"}, + {1, 48 * 60 * 60 * 1000, "2d"}, + {1, 7 * 24 * 60 * 60 * 1000, "7d"}, + {1, 30 * 24 * 60 
* 60 * 1000, "30d"}, + {1, 31 * 24 * 60 * 60 * 1000, "+INF"}, + } + + for _, c := range cases { + req := mockRequest{start: c.start, end: c.end} + bucket := getRangeBucket(req) + if bucket != c.expected { + t.Errorf("getRangeBucket(%v) returned %v, expected %v", req, bucket, c.expected) + } + } +} + +func TestInstrumentMiddleware(t *testing.T) { + registry := prometheus.NewRegistry() // isolated registry: avoids duplicate-registration panics on the global default registerer + + metrics := NewInstrumentMiddlewareMetrics(registry) + + logger := log.NewNopLogger() + + middleware := InstrumentMiddleware("step_align", metrics, logger) + + // Create a new dummy Request object with a duration of 6 hours. + req := mockRequest{1, 6 * 60 * 60 * 1000} + + // Create a dummy Handler object that just returns a Response object. + handler := HandlerFunc(func(ctx context.Context, req Request) (Response, error) { + return Response(nil), nil + }) + + _, err := middleware.Wrap(handler).Do(context.Background(), req) + assert.NoError(t, err) + + _, lintErr := testutil.CollectAndLint(metrics.duration, "cortex_frontend_query_range_duration_seconds") + assert.NoError(t, lintErr) +} diff --git a/internal/cortex/querier/queryrange/roundtrip.go b/internal/cortex/querier/queryrange/roundtrip.go index 6e82b8968a2..3f254105dc7 100644 --- a/internal/cortex/querier/queryrange/roundtrip.go +++ b/internal/cortex/querier/queryrange/roundtrip.go @@ -156,11 +156,11 @@ func NewTripperware( queryRangeMiddleware := []Middleware{NewLimitsMiddleware(limits)} if cfg.AlignQueriesWithStep { - queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("step_align", metrics), StepAlignMiddleware) + queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("step_align", metrics, log), StepAlignMiddleware) } if cfg.SplitQueriesByInterval != 0 { staticIntervalFn := func(_ Request) time.Duration { return cfg.SplitQueriesByInterval } - queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("split_by_interval", metrics), 
SplitByIntervalMiddleware(staticIntervalFn, limits, codec, registerer)) + queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("split_by_interval", metrics, log), SplitByIntervalMiddleware(staticIntervalFn, limits, codec, registerer)) } var c cache.Cache @@ -173,11 +173,11 @@ func NewTripperware( return nil, nil, err } c = cache - queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("results_cache", metrics), queryCacheMiddleware) + queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("results_cache", metrics, log), queryCacheMiddleware) } if cfg.MaxRetries > 0 { - queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer))) + queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("retry", metrics, log), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer))) } // Start cleanup. If cleaner stops or fail, we will simply not clean the metrics for inactive users. diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 4911b4748b7..fe9fc6c244d 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -11,6 +11,7 @@ import ( "path" "path/filepath" "sort" + "strconv" "strings" "sync" "time" @@ -41,7 +42,9 @@ const FetcherConcurrency = 32 // to allow depending projects (eg. Cortex) to implement their own custom metadata fetcher while tracking // compatible metrics. type BaseFetcherMetrics struct { - Syncs prometheus.Counter + Syncs prometheus.Counter + CacheMemoryHit prometheus.Counter + CacheDiskHit prometheus.Counter } // FetcherMetrics holds metrics tracked by the metadata fetcher. 
This struct and its fields are exported @@ -54,22 +57,29 @@ type FetcherMetrics struct { Synced *extprom.TxGaugeVec Modified *extprom.TxGaugeVec + + SyncedByTenant *extprom.TxGaugeVec + Assigned *extprom.TxGaugeVec } // Submit applies new values for metrics tracked by transaction GaugeVec. func (s *FetcherMetrics) Submit() { s.Synced.Submit() s.Modified.Submit() + s.SyncedByTenant.Submit() + s.Assigned.Submit() } // ResetTx starts new transaction for metrics tracked by transaction GaugeVec. func (s *FetcherMetrics) ResetTx() { s.Synced.ResetTx() s.Modified.ResetTx() + s.SyncedByTenant.ResetTx() + s.Assigned.ResetTx() } const ( - FetcherSubSys = "blocks_meta" + fetcherSubSys = "blocks_meta" CorruptedMeta = "corrupted-meta-json" NoMeta = "no-meta-json" @@ -99,11 +109,20 @@ func NewBaseFetcherMetrics(reg prometheus.Registerer) *BaseFetcherMetrics { var m BaseFetcherMetrics m.Syncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Subsystem: FetcherSubSys, + Subsystem: fetcherSubSys, Name: "base_syncs_total", Help: "Total blocks metadata synchronization attempts by base Fetcher", }) - + m.CacheMemoryHit = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Subsystem: fetcherSubSys, + Name: "base_cache_memory_hits_total", + Help: "Total blocks metadata from memory cache hits", + }) + m.CacheDiskHit = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Subsystem: fetcherSubSys, + Name: "base_cache_disk_hits_total", + Help: "Total blocks metadata from disk cache hits", + }) return &m } @@ -111,17 +130,17 @@ func NewFetcherMetrics(reg prometheus.Registerer, syncedExtraLabels, modifiedExt var m FetcherMetrics m.Syncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Subsystem: FetcherSubSys, + Subsystem: fetcherSubSys, Name: "syncs_total", Help: "Total blocks metadata synchronization attempts", }) m.SyncFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Subsystem: FetcherSubSys, + Subsystem: fetcherSubSys, Name: "sync_failures_total", 
Help: "Total blocks metadata synchronization failures", }) m.SyncDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ - Subsystem: FetcherSubSys, + Subsystem: fetcherSubSys, Name: "sync_duration_seconds", Help: "Duration of the blocks metadata synchronization in seconds", Buckets: []float64{0.01, 1, 10, 100, 300, 600, 1000}, @@ -129,7 +148,7 @@ func NewFetcherMetrics(reg prometheus.Registerer, syncedExtraLabels, modifiedExt m.Synced = extprom.NewTxGaugeVec( reg, prometheus.GaugeOpts{ - Subsystem: FetcherSubSys, + Subsystem: fetcherSubSys, Name: "synced", Help: "Number of block metadata synced", }, @@ -139,13 +158,33 @@ func NewFetcherMetrics(reg prometheus.Registerer, syncedExtraLabels, modifiedExt m.Modified = extprom.NewTxGaugeVec( reg, prometheus.GaugeOpts{ - Subsystem: FetcherSubSys, + Subsystem: fetcherSubSys, Name: "modified", Help: "Number of blocks whose metadata changed", }, []string{"modified"}, append(DefaultModifiedLabelValues(), modifiedExtraLabels...)..., ) + m.SyncedByTenant = extprom.NewTxGaugeVec( + reg, + prometheus.GaugeOpts{ + Subsystem: fetcherSubSys, + Name: "synced_by_tenant", + Help: "Number of metadata blocks synced broken down by tenant", + }, + []string{"tenant"}, + // No init label values is fine. The only downside is those gauges won't be reset to 0, but it's fine for the use case. + ) + m.Assigned = extprom.NewTxGaugeVec( + reg, + prometheus.GaugeOpts{ + Subsystem: fetcherSubSys, + Name: "assigned", + Help: "Number of metadata blocks assigned to this pod after all filters.", + }, + []string{"tenant", "level"}, + // No init label values is fine. The only downside is those gauges won't be reset to 0, but it's fine for the use case. + ) return &m } @@ -310,11 +349,12 @@ type BaseFetcher struct { // Optional local directory to cache meta.json files. 
cacheDir string - syncs prometheus.Counter g singleflight.Group mtx sync.Mutex cached map[ulid.ULID]*metadata.Meta + + metrics *BaseFetcherMetrics } // NewBaseFetcher constructs BaseFetcher. @@ -343,7 +383,7 @@ func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore. blockIDsLister: blockIDsLister, cacheDir: cacheDir, cached: map[ulid.ULID]*metadata.Meta{}, - syncs: metrics.Syncs, + metrics: metrics, }, nil } @@ -395,6 +435,7 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met ) if m, seen := f.cached[id]; seen { + f.metrics.CacheMemoryHit.Inc() return m, nil } @@ -402,6 +443,7 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met if f.cacheDir != "" { m, err := metadata.ReadFromDir(cachedBlockDir) if err == nil { + f.metrics.CacheDiskHit.Inc() return m, nil } @@ -462,7 +504,7 @@ type response struct { } func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { - f.syncs.Inc() + f.metrics.Syncs.Inc() var ( resp = response{ @@ -473,11 +515,21 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { ch = make(chan ulid.ULID, f.concurrency) mtx sync.Mutex ) - level.Debug(f.logger).Log("msg", "fetching meta data", "concurrency", f.concurrency) + level.Debug(f.logger).Log("msg", "fetching meta data", "concurrency", f.concurrency, "cache_dir", f.cacheDir) for i := 0; i < f.concurrency; i++ { eg.Go(func() error { + numBlocks := 0 for id := range ch { meta, err := f.loadMeta(ctx, id) + numBlocks += 1 + if err == nil && numBlocks%1000 == 0 { // meta may be nil when loadMeta fails, so only log successful loads + level.Debug(f.logger).Log("msg", "loaded the metadata of a block from one goroutine", + "block", id, + "n_th_block", numBlocks, + "min_time", meta.MinTime, + "duration_hours", float64(meta.MaxTime-meta.MinTime)/1000/3600, // float64 keeps fractional hours; 1.0*int64 was still integer division + ) + } if err == nil { mtx.Lock() resp.metas[id] = meta @@ -521,6 +573,7 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { if err := eg.Wait(); err != nil { return 
nil, errors.Wrap(err, "BaseFetcher: iter bucket") } + level.Debug(f.logger).Log("msg", "fetched meta data of all blocks", "num_blocks", len(resp.metas)) mtx.Lock() for blockULID, isPartial := range partialBlocks { @@ -599,22 +652,36 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter } resp := v.(response) + numBlocksByTenant := map[string]int{} // Copy as same response might be reused by different goroutines. metas := make(map[ulid.ULID]*metadata.Meta, len(resp.metas)) for id, m := range resp.metas { metas[id] = m + numBlocksByTenant[m.Thanos.GetTenant()]++ + } + + for tenant, numBlocks := range numBlocksByTenant { + metrics.SyncedByTenant.WithLabelValues(tenant).Set(float64(numBlocks)) } metrics.Synced.WithLabelValues(FailedMeta).Set(float64(len(resp.metaErrs))) metrics.Synced.WithLabelValues(NoMeta).Set(resp.noMetas) metrics.Synced.WithLabelValues(CorruptedMeta).Set(resp.corruptedMetas) - + blockMetaBeforeFilters := len(metas) for _, filter := range filters { // NOTE: filter can update synced metric accordingly to the reason of the exclude. if err := filter.Filter(ctx, metas, metrics.Synced, metrics.Modified); err != nil { return nil, nil, errors.Wrap(err, "filter metas") } } + level.Info(f.logger).Log("msg", "filtered out block meta data", "before", blockMetaBeforeFilters, "after", len(metas), "filters", len(filters)) + // If filters is empty, it's a global fetch for all block file metadata. metrics.Assigned is for the blocks assigned to this instance. + // Therefore, it's skipped to update the gauge. 
+ if len(filters) > 0 { + for _, m := range metas { + metrics.Assigned.WithLabelValues(m.Thanos.GetTenant(), strconv.Itoa(m.BlockMeta.Compaction.Level)).Inc() + } + } metrics.Synced.WithLabelValues(LoadedMeta).Set(float64(len(metas))) metrics.Submit() @@ -704,8 +771,13 @@ func NewLabelShardedMetaFilter(relabelConfig []*relabel.Config) *LabelShardedMet return &LabelShardedMetaFilter{relabelConfig: relabelConfig} } -// Special label that will have an ULID of the meta.json being referenced to. -const BlockIDLabel = "__block_id" +const ( + // BlockIDLabel Special label that will have an ULID of the meta.json being referenced to. + BlockIDLabel = "__block_id" + + // BlockLevelLabel Special label that will have the compaction level of the meta.json being referenced to. + BlockLevelLabel = "__block_level" +) // Filter filters out blocks that have no labels after relabelling of each block external (Thanos) labels. func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error { @@ -713,6 +785,7 @@ func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]* for id, m := range metas { b.Reset(labels.EmptyLabels()) b.Set(BlockIDLabel, id.String()) + b.Set(BlockLevelLabel, strconv.Itoa(m.BlockMeta.Compaction.Level)) for k, v := range m.Thanos.Labels { b.Set(k, v) diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 5e24e265382..c9d787547cb 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -281,7 +281,7 @@ func TestMetaFetcher_Fetch(t *testing.T) { if tcase.expectedMetaErr != nil { expectedFailures = 1 } - testutil.Equals(t, float64(i+1), promtest.ToFloat64(baseFetcher.syncs)) + testutil.Equals(t, float64(i+1), promtest.ToFloat64(baseFetcher.metrics.Syncs)) testutil.Equals(t, float64(i+1), promtest.ToFloat64(fetcher.metrics.Syncs)) testutil.Equals(t, float64(len(tcase.expectedMetas)), 
promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(LoadedMeta))) testutil.Equals(t, float64(len(tcase.expectedNoMeta)), promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(NoMeta))) @@ -362,6 +362,60 @@ func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) { } +func TestLabelShardedMetaFilter_Filter_Level(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + relabelContentYaml := ` + - action: keep + regex: "0|1|2|3|4" + source_labels: [%s] + ` + relabelConfig, err := ParseRelabelConfig([]byte(fmt.Sprintf(relabelContentYaml, BlockLevelLabel)), SelectorSupportedRelabelActions) + testutil.Ok(t, err) + + f := NewLabelShardedMetaFilter(relabelConfig) + + input := map[ulid.ULID]*metadata.Meta{ + ULID(1): { + BlockMeta: tsdb.BlockMeta{ + Compaction: tsdb.BlockMetaCompaction{Level: 5}, + }, + }, + ULID(2): { + BlockMeta: tsdb.BlockMeta{ + Compaction: tsdb.BlockMetaCompaction{Level: 2}, + }, + }, + ULID(3): { + BlockMeta: tsdb.BlockMeta{ + Compaction: tsdb.BlockMetaCompaction{Level: 0}, + }, + }, + ULID(4): { + BlockMeta: tsdb.BlockMeta{ + Compaction: tsdb.BlockMetaCompaction{Level: 23}, + }, + }, + ULID(5): { + BlockMeta: tsdb.BlockMeta{ + Compaction: tsdb.BlockMetaCompaction{Level: 1}, + }, + }, + } + expected := map[ulid.ULID]*metadata.Meta{ + ULID(2): input[ULID(2)], + ULID(3): input[ULID(3)], + ULID(5): input[ULID(5)], + } + + m := newTestFetcherMetrics() + testutil.Ok(t, f.Filter(ctx, input, m.Synced, nil)) + + testutil.Equals(t, 2.0, promtest.ToFloat64(m.Synced.WithLabelValues(labelExcludedMeta))) + testutil.Equals(t, expected, input) +} + func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index a479ee242de..2a2829b6b4a 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -14,6 +14,8 @@ import ( "io" 
"os" "path/filepath" + "sort" + "strings" "github.com/go-kit/log" "github.com/oklog/ulid" @@ -54,6 +56,12 @@ const ( ThanosVersion1 = 1 ) +const ( + TenantLabel = "__tenant__" + + DefaultTenant = "__not_set__" +) + // Meta describes the a block's meta. It wraps the known TSDB meta structure and // extends it by Thanos-specific fields. type Meta struct { @@ -108,6 +116,23 @@ func (m *Thanos) ParseExtensions(v any) (any, error) { return ConvertExtensions(m.Extensions, v) } +func (m *Thanos) GetTenant() string { + if tenant, ok := m.Labels[TenantLabel]; ok { + return tenant + } else { + return DefaultTenant + } +} + +func (m *Thanos) GetLabels() string { + res := make([]string, 0, len(m.Labels)) + for k, v := range m.Labels { + res = append(res, fmt.Sprintf("%s=%s", k, v)) + } + sort.Strings(res) + return strings.Join(res, ",") +} + // ConvertExtensions converts extensions with `any` type into specific type `v` // that the caller expects. func ConvertExtensions(extensions any, v any) (any, error) { diff --git a/pkg/block/metadata/meta_test.go b/pkg/block/metadata/meta_test.go index ecfa075228d..ca00c7c3655 100644 --- a/pkg/block/metadata/meta_test.go +++ b/pkg/block/metadata/meta_test.go @@ -341,6 +341,15 @@ func TestMeta_ReadWrite(t *testing.T) { }) } +func TestMeta_GetLabels(t *testing.T) { + m := Meta{ + Thanos: Thanos{ + Labels: map[string]string{"a": "b", "c": "d"}, + }, + } + testutil.Equals(t, "a=b,c=d", m.Thanos.GetLabels()) +} + type TestExtensions struct { Field1 int `json:"field1"` Field2 string `json:"field2"` diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index ad5c92bdb9c..6e849195e75 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -1127,6 +1127,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp if err := compactionLifecycleCallback.PreCompactionCallback(ctx, cg.logger, cg, toCompact); err != nil { return false, ulid.ULID{}, errors.Wrapf(err, "failed to run pre compaction callback for 
plan: %s", fmt.Sprintf("%v", toCompact)) } + toCompact = FilterRemovedBlocks(toCompact) level.Info(cg.logger).Log("msg", "finished running pre compaction callback; downloading blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "plan", fmt.Sprintf("%v", toCompact)) begin = time.Now() @@ -1347,7 +1348,7 @@ func NewBucketCompactor( concurrency int, skipBlocksWithOutOfOrderChunks bool, ) (*BucketCompactor, error) { - if concurrency <= 0 { - return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency) + if concurrency < 0 { + return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be >= 0", concurrency) } return NewBucketCompactorWithCheckerAndCallback( @@ -1378,7 +1379,7 @@ func NewBucketCompactorWithCheckerAndCallback( concurrency int, skipBlocksWithOutOfOrderChunks bool, ) (*BucketCompactor, error) { - if concurrency <= 0 { - return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency) + if concurrency < 0 { + return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be >= 0", concurrency) } return &BucketCompactor{ @@ -1398,6 +1399,10 @@ func NewBucketCompactorWithCheckerAndCallback( // Compact runs compaction over bucket. func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { + if c.concurrency == 0 { + level.Warn(c.logger).Log("msg", "compactor is disabled") + return nil + } defer func() { // Do not remove the compactDir if an error has occurred // because potentially on the next run we would not have to download diff --git a/pkg/compact/overlapping.go b/pkg/compact/overlapping.go new file mode 100644 index 00000000000..adc363ba63e --- /dev/null +++ b/pkg/compact/overlapping.go @@ -0,0 +1,120 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package compact + +import ( + "context" + "fmt" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +type OverlappingCompactionLifecycleCallback struct { + overlappingBlocks prometheus.Counter +} + +func NewOverlappingCompactionLifecycleCallback(reg *prometheus.Registry, enabled bool) CompactionLifecycleCallback { + if enabled { + return OverlappingCompactionLifecycleCallback{ + overlappingBlocks: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_compact_group_overlapping_blocks_total", + Help: "Total number of blocks that are overlapping and to be deleted.", + }), + } + } + return DefaultCompactionLifecycleCallback{} +} + +// PreCompactionCallback given the assumption that toCompact is sorted by MinTime in ascending order from Planner +// (not guaranteed on MaxTime order), we will detect overlapping blocks and delete them while retaining all others. 
+func (o OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx context.Context, logger log.Logger, cg *Group, toCompact []*metadata.Meta) error { + if len(toCompact) == 0 { + return nil + } + prev := 0 + for curr, currB := range toCompact { + prevB := toCompact[prev] + if curr == 0 || currB.Thanos.Source == metadata.ReceiveSource || prevB.MaxTime <= currB.MinTime { + // no overlapping with previous blocks, skip it + prev = curr + continue + } else if currB.MinTime < prevB.MinTime { + // halt when the assumption is broken, the input toCompact isn't sorted by minTime, need manual investigation + return halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", prevB.String(), currB.String())) + } else if prevB.MaxTime < currB.MaxTime && prevB.MinTime != currB.MinTime { + err := errors.Errorf("found partially overlapping block: %s -- %s", prevB.String(), currB.String()) + if cg.enableVerticalCompaction { + level.Error(logger).Log("msg", "best effort to vertical compact", "err", err) + prev = curr + continue + } else { + return halt(err) + } + } else if prevB.MinTime == currB.MinTime && prevB.MaxTime == currB.MaxTime { + if prevB.Stats.NumSeries != currB.Stats.NumSeries || prevB.Stats.NumSamples != currB.Stats.NumSamples { + level.Warn(logger).Log("msg", "found same time range but different stats, keep both blocks", + "prev", prevB.String(), "prevSeries", prevB.Stats.NumSeries, "prevSamples", prevB.Stats.NumSamples, + "curr", currB.String(), "currSeries", currB.Stats.NumSeries, "currSamples", currB.Stats.NumSamples, + ) + prev = curr + continue + } + } + // prev min <= curr min < prev max + toDelete := -1 + if prevB.MaxTime >= currB.MaxTime { + toDelete = curr + level.Warn(logger).Log("msg", "found overlapping block in plan, keep previous block", + "toKeep", prevB.String(), "toDelete", currB.String()) + } else if prevB.MaxTime < currB.MaxTime { + toDelete = prev + prev = curr + level.Warn(logger).Log("msg", "found overlapping block 
in plan, keep current block", + "toKeep", currB.String(), "toDelete", prevB.String()) + } + o.overlappingBlocks.Inc() + if err := DeleteBlockNow(ctx, logger, cg.bkt, toCompact[toDelete]); err != nil { + return retry(err) + } + toCompact[toDelete] = nil + } + return nil +} + +func (o OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error { + return nil +} + +func (o OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) { + return tsdb.DefaultBlockPopulator{}, nil +} + +func FilterRemovedBlocks(blocks []*metadata.Meta) (res []*metadata.Meta) { + for _, b := range blocks { + if b != nil { + res = append(res, b) + } + } + return res +} + +func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta) error { + level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(), + "level", m.Compaction.Level, "parents", fmt.Sprintf("%v", m.Compaction.Parents), + "resolution", m.Thanos.Downsample.Resolution, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels(), + "series", m.Stats.NumSeries, "samples", m.Stats.NumSamples, "chunks", m.Stats.NumChunks) + if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil { + return errors.Wrapf(err, "delete overlapping block %s", m.String()) + } + return nil +} diff --git a/pkg/compact/overlapping_test.go b/pkg/compact/overlapping_test.go new file mode 100644 index 00000000000..ab6d1051bb3 --- /dev/null +++ b/pkg/compact/overlapping_test.go @@ -0,0 +1,192 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package compact + +import ( + "context" + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/compact/downsample" +) + +func TestFilterNilCompact(t *testing.T) { + blocks := []*metadata.Meta{nil, nil} + filtered := FilterRemovedBlocks(blocks) + testutil.Equals(t, 0, len(filtered)) + + meta := []*metadata.Meta{ + createCustomBlockMeta(6, 1, 3, metadata.CompactorSource, 1), + nil, + createCustomBlockMeta(7, 3, 5, metadata.CompactorSource, 2), + createCustomBlockMeta(8, 5, 10, metadata.CompactorSource, 3), + nil, + } + testutil.Equals(t, 3, len(FilterRemovedBlocks(meta))) +} + +func TestPreCompactionCallback(t *testing.T) { + reg := prometheus.NewRegistry() + logger := log.NewNopLogger() + bkt := objstore.NewInMemBucket() + group := &Group{ + logger: log.NewNopLogger(), + bkt: bkt, + } + callback := NewOverlappingCompactionLifecycleCallback(reg, true) + for _, tcase := range []struct { + testName string + input []*metadata.Meta + enableVerticalCompaction bool + expectedSize int + expectedBlocks []*metadata.Meta + err error + }{ + { + testName: "empty blocks", + }, + { + testName: "no overlapping blocks", + input: []*metadata.Meta{ + createCustomBlockMeta(6, 1, 3, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 3, 5, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 5, 10, metadata.CompactorSource, 1), + }, + expectedSize: 3, + }, + { + testName: "duplicated blocks", + input: []*metadata.Meta{ + createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 1, 7, metadata.CompactorSource, 1), + }, + expectedSize: 1, + expectedBlocks: []*metadata.Meta{ + createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1), + }, + }, + { + testName: 
"overlap non dup blocks", + input: []*metadata.Meta{ + createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 2), + createCustomBlockMeta(8, 1, 7, metadata.CompactorSource, 2), + }, + expectedSize: 2, + expectedBlocks: []*metadata.Meta{ + createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 2), + }, + }, + { + testName: "receive blocks", + input: []*metadata.Meta{ + createCustomBlockMeta(6, 1, 7, metadata.ReceiveSource, 1), + createCustomBlockMeta(7, 1, 7, metadata.ReceiveSource, 2), + createCustomBlockMeta(8, 1, 7, metadata.ReceiveSource, 3), + }, + expectedSize: 3, + }, + { + testName: "receive + compactor blocks", + input: []*metadata.Meta{ + createCustomBlockMeta(6, 1, 7, metadata.ReceiveSource, 1), + createCustomBlockMeta(7, 2, 7, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 2, 8, metadata.ReceiveSource, 1), + }, + expectedSize: 2, + expectedBlocks: []*metadata.Meta{ + createCustomBlockMeta(6, 1, 7, metadata.ReceiveSource, 1), + createCustomBlockMeta(8, 2, 8, metadata.ReceiveSource, 1), + }, + }, + { + testName: "full overlapping blocks", + input: []*metadata.Meta{ + createCustomBlockMeta(6, 1, 10, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), + }, + expectedSize: 1, + expectedBlocks: []*metadata.Meta{ + createCustomBlockMeta(6, 1, 10, metadata.CompactorSource, 1), + }, + }, + { + testName: "part overlapping blocks", + input: []*metadata.Meta{ + createCustomBlockMeta(1, 1, 2, metadata.CompactorSource, 1), + createCustomBlockMeta(2, 1, 6, metadata.CompactorSource, 1), + createCustomBlockMeta(3, 6, 8, metadata.CompactorSource, 1), + }, + expectedSize: 2, + expectedBlocks: []*metadata.Meta{ + createCustomBlockMeta(2, 1, 6, metadata.CompactorSource, 1), + createCustomBlockMeta(3, 6, 8, metadata.CompactorSource, 1), 
+ }, + }, + { + testName: "out of order blocks", + input: []*metadata.Meta{ + createCustomBlockMeta(6, 2, 3, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 0, 5, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), + }, + err: halt(errors.Errorf("expect halt error")), + }, + { + testName: "partially overlapping blocks with vertical compaction off", + input: []*metadata.Meta{ + createCustomBlockMeta(6, 2, 4, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), + }, + err: halt(errors.Errorf("expect halt error")), + }, + { + testName: "partially overlapping blocks with vertical compaction on", + input: []*metadata.Meta{ + createCustomBlockMeta(6, 2, 4, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), + }, + enableVerticalCompaction: true, + expectedSize: 3, + }, + } { + if ok := t.Run(tcase.testName, func(t *testing.T) { + group.enableVerticalCompaction = tcase.enableVerticalCompaction + err := callback.PreCompactionCallback(context.Background(), logger, group, tcase.input) + if tcase.err != nil { + testutil.NotOk(t, err) + if IsHaltError(tcase.err) { + testutil.Assert(t, IsHaltError(err), "expected halt error") + } else if IsRetryError(tcase.err) { + testutil.Assert(t, IsRetryError(err), "expected retry error") + } + return + } + testutil.Equals(t, tcase.expectedSize, len(FilterRemovedBlocks(tcase.input))) + if tcase.expectedSize != len(tcase.input) { + testutil.Equals(t, tcase.expectedBlocks, FilterRemovedBlocks(tcase.input)) + } + }); !ok { + return + } + } +} + +func createCustomBlockMeta(id uint64, minTime, maxTime int64, source metadata.SourceType, numSeries uint64) *metadata.Meta { + labels := map[string]string{"a": "1"} + m := createBlockMeta(id, minTime, maxTime, labels, downsample.ResLevel0, 
[]uint64{}) + m.Thanos.Source = source + m.Stats.NumSeries = numSeries + return m +} diff --git a/pkg/queryfrontend/roundtrip.go b/pkg/queryfrontend/roundtrip.go index b901950ba89..a018d6f98a1 100644 --- a/pkg/queryfrontend/roundtrip.go +++ b/pkg/queryfrontend/roundtrip.go @@ -173,7 +173,7 @@ func newQueryRangeTripperware( if config.AlignRangeWithStep { queryRangeMiddleware = append( queryRangeMiddleware, - queryrange.InstrumentMiddleware("step_align", m), + queryrange.InstrumentMiddleware("step_align", m, logger), queryrange.StepAlignMiddleware, ) } @@ -181,7 +181,7 @@ func newQueryRangeTripperware( if config.RequestDownsampled { queryRangeMiddleware = append( queryRangeMiddleware, - queryrange.InstrumentMiddleware("downsampled", m), + queryrange.InstrumentMiddleware("downsampled", m, logger), DownsampledMiddleware(codec, reg), ) } @@ -191,7 +191,7 @@ func newQueryRangeTripperware( queryRangeMiddleware = append( queryRangeMiddleware, - queryrange.InstrumentMiddleware("split_by_interval", m), + queryrange.InstrumentMiddleware("split_by_interval", m, logger), SplitByIntervalMiddleware(queryIntervalFn, limits, codec, reg), ) } @@ -222,7 +222,7 @@ func newQueryRangeTripperware( queryRangeMiddleware = append( queryRangeMiddleware, - queryrange.InstrumentMiddleware("results_cache", m), + queryrange.InstrumentMiddleware("results_cache", m, logger), queryCacheMiddleware, ) } @@ -230,7 +230,7 @@ func newQueryRangeTripperware( if config.MaxRetries > 0 { queryRangeMiddleware = append( queryRangeMiddleware, - queryrange.InstrumentMiddleware("retry", m), + queryrange.InstrumentMiddleware("retry", m, logger), queryrange.NewRetryMiddleware(logger, config.MaxRetries, queryrange.NewRetryMiddlewareMetrics(reg)), ) } @@ -285,7 +285,7 @@ func newLabelsTripperware( if config.SplitQueriesByInterval != 0 { labelsMiddleware = append( labelsMiddleware, - queryrange.InstrumentMiddleware("split_interval", m), + queryrange.InstrumentMiddleware("split_interval", m, logger), 
SplitByIntervalMiddleware(queryIntervalFn, limits, codec, reg), ) } @@ -309,7 +309,7 @@ func newLabelsTripperware( labelsMiddleware = append( labelsMiddleware, - queryrange.InstrumentMiddleware("results_cache", m), + queryrange.InstrumentMiddleware("results_cache", m, logger), queryCacheMiddleware, ) } @@ -317,7 +317,7 @@ func newLabelsTripperware( if config.MaxRetries > 0 { labelsMiddleware = append( labelsMiddleware, - queryrange.InstrumentMiddleware("retry", m), + queryrange.InstrumentMiddleware("retry", m, logger), queryrange.NewRetryMiddleware(logger, config.MaxRetries, queryrange.NewRetryMiddlewareMetrics(reg)), ) } @@ -342,7 +342,7 @@ func newInstantQueryTripperware( analyzer := querysharding.NewQueryAnalyzer() instantQueryMiddlewares = append( instantQueryMiddlewares, - queryrange.InstrumentMiddleware("sharding", m), + queryrange.InstrumentMiddleware("sharding", m, log.NewNopLogger()), PromQLShardingMiddleware(analyzer, numShards, limits, codec, reg), ) } diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index ec41466ce45..0fb145beebe 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -153,8 +153,10 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR numSamplesOutOfOrder++ level.Debug(tLogger).Log("msg", "Out of order sample", "lset", lset, "value", s.Value, "timestamp", s.Timestamp) case storage.ErrDuplicateSampleForTimestamp: - numSamplesDuplicates++ - level.Debug(tLogger).Log("msg", "Duplicate sample for timestamp", "lset", lset, "value", s.Value, "timestamp", s.Timestamp) + // we don't care about duplicated sample for the same timestamp + continue + // numSamplesDuplicates++ + // level.Debug(tLogger).Log("msg", "Duplicate sample for timestamp", "lset", lset, "value", s.Value, "timestamp", s.Timestamp) case storage.ErrOutOfBounds: numSamplesOutOfBounds++ level.Debug(tLogger).Log("msg", "Out of bounds metric", "lset", lset, "value", s.Value, "timestamp", s.Timestamp) @@ -207,7 +209,6 @@ func (r 
*Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR for _, ex := range t.Exemplars { exLset := labelpb.ZLabelsToPromLabels(ex.Labels) exLogger := log.With(tLogger, "exemplarLset", exLset, "exemplar", ex.String()) - if _, err = app.AppendExemplar(ref, lset, exemplar.Exemplar{ Labels: exLset, Value: ex.Value, diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 1951c217bf5..a0968471934 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -647,6 +647,7 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { }() } + level.Info(s.logger).Log("msg", "started to sync blocks", "num_blocks", len(metas)) for id, meta := range metas { if b := s.getBlock(id); b != nil { continue @@ -740,7 +741,16 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er } start := time.Now() - level.Debug(s.logger).Log("msg", "loading new block", "id", meta.ULID) + level.Debug(s.logger).Log("msg", "loading new block", + "id", meta.ULID, + "min_time", meta.MinTime, + "duration_hours", 1.0*(meta.MaxTime-meta.MinTime)/(3600*1000), + "num_series", meta.Stats.NumSeries, + "num_samples", meta.Stats.NumSamples, + "num_chunks", meta.Stats.NumChunks, + "resolution", meta.Thanos.Downsample.Resolution, + "compaction_level", meta.Compaction.Level, + ) defer func() { if err != nil { s.metrics.blockLoadFailures.Inc() @@ -3519,6 +3529,9 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a } else { buf = make([]byte, r.block.estimatedMaxChunkSize) } + if cap(buf) < r.block.estimatedMaxChunkSize { + return errors.Errorf("chunk buffer too small, expected at least %d, got %d", r.block.estimatedMaxChunkSize, cap(buf)) + } defer r.block.chunkPool.Put(&buf) for i, pIdx := range pIdxs { @@ -3539,6 +3552,10 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a chunkLen = int(diff) } } + if cap(buf) < chunkLen { + return errors.Errorf("chunk buffer too small. 
expected at least %d(estimatedMaxChunkSize = %d), got %d", + chunkLen, r.block.estimatedMaxChunkSize, cap(buf)) + } cb := buf[:chunkLen] n, err = io.ReadFull(bufReader, cb) readOffset += n