Skip to content

Commit

Permalink
Store: add flag ignore-deletion-marks-errors to be able to ignore err…
Browse files Browse the repository at this point in the history
…ors while retrieving deletion marks

Signed-off-by: Petter Solberg <[email protected]>
  • Loading branch information
pettersolberg88 committed Dec 29, 2023
1 parent 93840dc commit 95f111c
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 20 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func runCompact(
// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, deleteDelay/2, conf.blockMetaFetchConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, deleteDelay/2, false, conf.blockMetaFetchConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(conf.blockMetaFetchConcurrency)
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(logger, insBkt, conf.blockMetaFetchConcurrency)
noDownsampleMarkerFilter := downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, conf.blockMetaFetchConcurrency)
Expand Down
7 changes: 6 additions & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type storeConfig struct {
advertiseCompatibilityLabel bool
consistencyDelay commonmodel.Duration
ignoreDeletionMarksDelay commonmodel.Duration
ignoreDeletionMarksErrors bool
disableWeb bool
webConfig webConfig
label string
Expand Down Expand Up @@ -180,6 +181,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
"Default is 24h, half of the default value for --delete-delay on compactor.").
Default("24h").SetValue(&sc.ignoreDeletionMarksDelay)

cmd.Flag("ignore-deletion-marks-errors", "If true, Store Gateway will ignore errors while trying to fetch and parse deletion-marks. This is desirable if the storage provider intermittently times out or returns errors for non-existent files. "+
"Default is false.").
Default("false").BoolVar(&sc.ignoreDeletionMarksErrors)

cmd.Flag("store.enable-index-header-lazy-reader", "If true, Store Gateway will lazy memory map index-header only once the block is required by a query.").
Default("false").BoolVar(&sc.lazyIndexReaderEnabled)

Expand Down Expand Up @@ -345,7 +350,7 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.ignoreDeletionMarksErrors, conf.blockMetaFetchConcurrency)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
[]block.MetadataFilter{
Expand Down
8 changes: 4 additions & 4 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path
}

// We ignore any block that has the deletion marker file.
filters := []block.MetadataFilter{block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)}
filters := []block.MetadataFilter{block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, false, block.FetcherConcurrency)}
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
if err != nil {
Expand Down Expand Up @@ -405,7 +405,7 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo
var filters []block.MetadataFilter

if tbc.excludeDelete {
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, false, block.FetcherConcurrency)
filters = append(filters, ignoreDeletionMarkFilter)
}
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
Expand Down Expand Up @@ -825,7 +825,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, tbc.deleteDelay/2, tbc.blockSyncConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, tbc.deleteDelay/2, false, tbc.blockSyncConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(tbc.blockSyncConcurrency)
blocksCleaner := compact.NewBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, tbc.deleteDelay, stubCounter, stubCounter)

Expand Down Expand Up @@ -1370,7 +1370,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P
// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, tbc.deleteDelay/2, tbc.blockSyncConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, tbc.deleteDelay/2, false, tbc.blockSyncConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(tbc.blockSyncConcurrency)
stubCounter := promauto.With(nil).NewCounter(prometheus.CounterOpts{})

Expand Down
24 changes: 15 additions & 9 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,22 +878,24 @@ func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.UL
// Delay is not considered when computing DeletionMarkBlocks map.
// Not go-routine safe.
type IgnoreDeletionMarkFilter struct {
	logger log.Logger
	// delay: deletion marks younger than this are remembered but the block is
	// still kept (see the comment above: delay is not considered when
	// computing DeletionMarkBlocks).
	delay time.Duration
	// ignoreErrors makes failures to fetch or parse a block's
	// deletion-mark.json non-fatal: the error is logged and the block is
	// skipped instead of failing the whole filter pass.
	ignoreErrors bool
	concurrency  int
	bkt          objstore.InstrumentedBucketReader

	// mtx guards deletionMarkMap.
	mtx             sync.Mutex
	deletionMarkMap map[ulid.ULID]*metadata.DeletionMark
}

// NewIgnoreDeletionMarkFilter creates IgnoreDeletionMarkFilter.
func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, delay time.Duration, concurrency int) *IgnoreDeletionMarkFilter {
func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, delay time.Duration, ignoreErrors bool, concurrency int) *IgnoreDeletionMarkFilter {
return &IgnoreDeletionMarkFilter{
logger: logger,
bkt: bkt,
delay: delay,
concurrency: concurrency,
logger: logger,
bkt: bkt,
delay: delay,
ignoreErrors: ignoreErrors,
concurrency: concurrency,
}
}

Expand Down Expand Up @@ -941,6 +943,10 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL
level.Warn(f.logger).Log("msg", "found partial deletion-mark.json; if we will see it happening often for the same block, consider manually deleting deletion-mark.json from the object storage", "block", id, "err", err)
continue
}
level.Warn(f.logger).Log("msg", "failed to fetch deletion-mark.json; if you see it happening often for the same block, consider manually deleting the whole block from the object storage", "block", id, "err", err)
if f.ignoreErrors {
continue
}
// Remember the last error and continue to drain the channel.
lastErr = err
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,7 @@ func TestIgnoreDeletionMarkFilter_Filter(t *testing.T) {
defer cancel()

now := time.Now()
f := NewIgnoreDeletionMarkFilter(log.NewNopLogger(), objstore.WithNoopInstr(bkt), 48*time.Hour, 32)
f := NewIgnoreDeletionMarkFilter(log.NewNopLogger(), objstore.WithNoopInstr(bkt), 48*time.Hour, false, 32)

shouldFetch := &metadata.DeletionMark{
ID: ULID(1),
Expand Down
6 changes: 3 additions & 3 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
blockMarkedForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, nil, 48*time.Hour, fetcherConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, nil, 48*time.Hour, false, fetcherConcurrency)
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks)
testutil.Ok(t, err)

Expand Down Expand Up @@ -193,7 +193,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg

reg := prometheus.NewRegistry()

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, objstore.WithNoopInstr(bkt), 48*time.Hour, fetcherConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, objstore.WithNoopInstr(bkt), 48*time.Hour, false, fetcherConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency)
noCompactMarkerFilter := NewGatherNoCompactionMarkFilter(logger, objstore.WithNoopInstr(bkt), 2)
insBkt := objstore.WithNoopInstr(bkt)
Expand Down Expand Up @@ -505,7 +505,7 @@ func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T

blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, objstore.WithNoopInstr(bkt), 48*time.Hour, fetcherConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, objstore.WithNoopInstr(bkt), 48*time.Hour, false, fetcherConcurrency)

duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency)
insBkt := objstore.WithNoopInstr(bkt)
Expand Down
2 changes: 1 addition & 1 deletion pkg/replicate/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func newMetaFetcher(
thanosblock.NewTimePartitionMetaFilter(minTime, maxTime),
}
if ignoreMarkedForDeletion {
filters = append(filters, thanosblock.NewIgnoreDeletionMarkFilter(logger, fromBkt, 0, concurrency))
filters = append(filters, thanosblock.NewIgnoreDeletionMarkFilter(logger, fromBkt, 0, false, concurrency))
}
baseBlockIDsFetcher := thanosblock.NewBaseBlockIDsFetcher(logger, fromBkt)
return thanosblock.NewMetaFetcher(
Expand Down

0 comments on commit 95f111c

Please sign in to comment.