Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store: Add flag ignore-deletion-marks-errors to be able to ignore errors while retrieving deletion marks #7013

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6943](https://github.com/thanos-io/thanos/pull/6943) Ruler: Added `keep_firing_for` field in alerting rule.
- [#6972](https://github.com/thanos-io/thanos/pull/6972) Store Gateway: Apply series limit when streaming series for series actually matched if lazy postings is enabled.
- [#6984](https://github.com/thanos-io/thanos/pull/6984) Store Gateway: Added `--store.index-header-lazy-download-strategy` to specify how to lazily download index headers when lazy mmap is enabled.
- [#7013](https://github.com/thanos-io/thanos/pull/7013) Store Gateway: Added `--ignore-deletion-marks-errors` to ignore errors while retrieving deletion marks.

- [#6887](https://github.com/thanos-io/thanos/pull/6887) Query Frontend: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing custom dashboard queries to be incorrect due to the added label.

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func runCompact(
// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, deleteDelay/2, conf.blockMetaFetchConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, deleteDelay/2, false, conf.blockMetaFetchConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(conf.blockMetaFetchConcurrency)
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(logger, insBkt, conf.blockMetaFetchConcurrency)
noDownsampleMarkerFilter := downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, conf.blockMetaFetchConcurrency)
Expand Down
7 changes: 6 additions & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type storeConfig struct {
advertiseCompatibilityLabel bool
consistencyDelay commonmodel.Duration
ignoreDeletionMarksDelay commonmodel.Duration
ignoreDeletionMarksErrors bool
disableWeb bool
webConfig webConfig
label string
Expand Down Expand Up @@ -180,6 +181,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
"Default is 24h, half of the default value for --delete-delay on compactor.").
Default("24h").SetValue(&sc.ignoreDeletionMarksDelay)

cmd.Flag("ignore-deletion-marks-errors", "If true, Store Gateway will ignore errors while trying to fetch and parse deletion-marks. This is desirable if the storage provider intermittently time out or returning errors for non-existent files. "+
"Default is false.").
Default("false").BoolVar(&sc.ignoreDeletionMarksErrors)

cmd.Flag("store.enable-index-header-lazy-reader", "If true, Store Gateway will lazy memory map index-header only once the block is required by a query.").
Default("false").BoolVar(&sc.lazyIndexReaderEnabled)

Expand Down Expand Up @@ -345,7 +350,7 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.ignoreDeletionMarksErrors, conf.blockMetaFetchConcurrency)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
[]block.MetadataFilter{
Expand Down
8 changes: 4 additions & 4 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path
}

// We ignore any block that has the deletion marker file.
filters := []block.MetadataFilter{block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)}
filters := []block.MetadataFilter{block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, false, block.FetcherConcurrency)}
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
if err != nil {
Expand Down Expand Up @@ -405,7 +405,7 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo
var filters []block.MetadataFilter

if tbc.excludeDelete {
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, false, block.FetcherConcurrency)
filters = append(filters, ignoreDeletionMarkFilter)
}
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
Expand Down Expand Up @@ -825,7 +825,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, tbc.deleteDelay/2, tbc.blockSyncConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, tbc.deleteDelay/2, false, tbc.blockSyncConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(tbc.blockSyncConcurrency)
blocksCleaner := compact.NewBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, tbc.deleteDelay, stubCounter, stubCounter)

Expand Down Expand Up @@ -1370,7 +1370,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P
// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, tbc.deleteDelay/2, tbc.blockSyncConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, tbc.deleteDelay/2, false, tbc.blockSyncConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(tbc.blockSyncConcurrency)
stubCounter := promauto.With(nil).NewCounter(prometheus.CounterOpts{})

Expand Down
6 changes: 6 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ Flags:
blocks before being deleted from bucket.
Default is 24h, half of the default value for
--delete-delay on compactor.
--ignore-deletion-marks-errors
If true, Store Gateway will ignore errors while
trying to fetch and parse deletion-marks.
This is desirable if the storage provider
intermittently times out or returns errors for
non-existent files. Default is false.
--index-cache-size=250MB Maximum size of items held in the in-memory
index cache. Ignored if --index-cache.config or
--index-cache.config-file option is specified.
Expand Down
24 changes: 15 additions & 9 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,22 +878,24 @@ func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.UL
// Delay is not considered when computing DeletionMarkBlocks map.
// Not go-routine safe.
type IgnoreDeletionMarkFilter struct {
logger log.Logger
delay time.Duration
concurrency int
bkt objstore.InstrumentedBucketReader
logger log.Logger
delay time.Duration
ignoreErrors bool
concurrency int
bkt objstore.InstrumentedBucketReader

mtx sync.Mutex
deletionMarkMap map[ulid.ULID]*metadata.DeletionMark
}

// NewIgnoreDeletionMarkFilter creates IgnoreDeletionMarkFilter.
func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, delay time.Duration, concurrency int) *IgnoreDeletionMarkFilter {
func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, delay time.Duration, ignoreErrors bool, concurrency int) *IgnoreDeletionMarkFilter {
return &IgnoreDeletionMarkFilter{
logger: logger,
bkt: bkt,
delay: delay,
concurrency: concurrency,
logger: logger,
bkt: bkt,
delay: delay,
ignoreErrors: ignoreErrors,
concurrency: concurrency,
}
}

Expand Down Expand Up @@ -941,6 +943,10 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL
level.Warn(f.logger).Log("msg", "found partial deletion-mark.json; if we will see it happening often for the same block, consider manually deleting deletion-mark.json from the object storage", "block", id, "err", err)
continue
}
level.Warn(f.logger).Log("msg", "failed to fetch deletion-mark.json; if you see it happening often for the same block, consider manually deleting the whole block from the object storage", "block", id, "err", err)
if f.ignoreErrors {
continue
}
// Remember the last error and continue to drain the channel.
lastErr = err
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,7 @@ func TestIgnoreDeletionMarkFilter_Filter(t *testing.T) {
defer cancel()

now := time.Now()
f := NewIgnoreDeletionMarkFilter(log.NewNopLogger(), objstore.WithNoopInstr(bkt), 48*time.Hour, 32)
f := NewIgnoreDeletionMarkFilter(log.NewNopLogger(), objstore.WithNoopInstr(bkt), 48*time.Hour, false, 32)

shouldFetch := &metadata.DeletionMark{
ID: ULID(1),
Expand Down
6 changes: 3 additions & 3 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
blockMarkedForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, nil, 48*time.Hour, fetcherConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, nil, 48*time.Hour, false, fetcherConcurrency)
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks)
testutil.Ok(t, err)

Expand Down Expand Up @@ -193,7 +193,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg

reg := prometheus.NewRegistry()

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, objstore.WithNoopInstr(bkt), 48*time.Hour, fetcherConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, objstore.WithNoopInstr(bkt), 48*time.Hour, false, fetcherConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency)
noCompactMarkerFilter := NewGatherNoCompactionMarkFilter(logger, objstore.WithNoopInstr(bkt), 2)
insBkt := objstore.WithNoopInstr(bkt)
Expand Down Expand Up @@ -505,7 +505,7 @@ func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T

blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, objstore.WithNoopInstr(bkt), 48*time.Hour, fetcherConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, objstore.WithNoopInstr(bkt), 48*time.Hour, false, fetcherConcurrency)

duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency)
insBkt := objstore.WithNoopInstr(bkt)
Expand Down
2 changes: 1 addition & 1 deletion pkg/replicate/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func newMetaFetcher(
thanosblock.NewTimePartitionMetaFilter(minTime, maxTime),
}
if ignoreMarkedForDeletion {
filters = append(filters, thanosblock.NewIgnoreDeletionMarkFilter(logger, fromBkt, 0, concurrency))
filters = append(filters, thanosblock.NewIgnoreDeletionMarkFilter(logger, fromBkt, 0, false, concurrency))
}
baseBlockIDsFetcher := thanosblock.NewBaseBlockIDsFetcher(logger, fromBkt)
return thanosblock.NewMetaFetcher(
Expand Down
Loading