Skip to content

Commit

Permalink
Differentiate retry and halt error and retry failed compaction only o…
Browse files Browse the repository at this point in the history
…n retriable error (cortexproject#6111)

* Differentiate between retriable and halt errors. Retry a failed compaction only on a retriable error

Signed-off-by: Alex Le <[email protected]>

* update CHANGELOG

Signed-off-by: Alex Le <[email protected]>

* fix lint and address comment

Signed-off-by: Alex Le <[email protected]>

* Merge the halt and retriable error metrics into one and differentiate them with a type label

Signed-off-by: Alex Le <[email protected]>

* reverted log change

Signed-off-by: Alex Le <[email protected]>

* rename

Signed-off-by: Alex Le <[email protected]>

---------

Signed-off-by: Alex Le <[email protected]>
  • Loading branch information
alexqyle authored Jul 24, 2024
1 parent fe788be commit 42d7327
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* [ENHANCEMENT] Distributor: Reduce memory usage when error volume is high. #6095
* [ENHANCEMENT] Compactor: Centralize metrics used by compactor and add user label to compactor metrics. #6096
* [ENHANCEMENT] Compactor: Add unique execution ID for each compaction cycle in log for easy debugging. #6097
* [ENHANCEMENT] Compactor: Differentiate between retriable and halt errors, and retry a failed compaction only on a retriable error. #6111
* [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040
* [ENHANCEMENT] Compactor: Split cleaner cycle for active and deleted tenants. #6112
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
Expand Down
12 changes: 6 additions & 6 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)

Expand Down Expand Up @@ -192,7 +192,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
Expand Down Expand Up @@ -417,7 +417,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
Expand Down Expand Up @@ -475,7 +475,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
Expand Down Expand Up @@ -616,7 +616,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))
}, append(commonLabels, reasonLabelName))

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)

Expand Down
10 changes: 10 additions & 0 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,10 +792,20 @@ func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) e
if lastErr == nil {
return nil
}
if ctx.Err() != nil {
return ctx.Err()
}
if c.isCausedByPermissionDenied(lastErr) {
level.Warn(c.logger).Log("msg", "skipping compactUser due to PermissionDenied", "user", userID, "err", lastErr)
c.compactorMetrics.compactionErrorsCount.WithLabelValues(userID, unauthorizedError).Inc()
return nil
}
if compact.IsHaltError(lastErr) {
level.Error(c.logger).Log("msg", "compactor returned critical error", "user", userID, "err", lastErr)
c.compactorMetrics.compactionErrorsCount.WithLabelValues(userID, haltError).Inc()
return lastErr
}
c.compactorMetrics.compactionErrorsCount.WithLabelValues(userID, retriableError).Inc()

retries.Wait()
}
Expand Down
22 changes: 16 additions & 6 deletions pkg/compactor/compactor_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,23 @@ type compactorMetrics struct {
compactionFailures *prometheus.CounterVec
verticalCompactions *prometheus.CounterVec
remainingPlannedCompactions *prometheus.GaugeVec
compactionErrorsCount *prometheus.CounterVec
}

const (
UserLabelName = "user"
TimeRangeLabelName = "time_range_milliseconds"
ReasonLabelName = "reason"
userLabelName = "user"
timeRangeLabelName = "time_range_milliseconds"
reasonLabelName = "reason"
compactionErrorTypesLabelName = "type"

retriableError = "retriable"
haltError = "halt"
unauthorizedError = "unauthorized"
)

var (
commonLabels = []string{UserLabelName}
compactionLabels = []string{TimeRangeLabelName}
commonLabels = []string{userLabelName}
compactionLabels = []string{timeRangeLabelName}
)

func newDefaultCompactorMetrics(reg prometheus.Registerer) *compactorMetrics {
Expand Down Expand Up @@ -129,7 +135,7 @@ func newCompactorMetricsWithLabels(reg prometheus.Registerer, commonLabels []str
m.syncerBlocksMarkedForDeletion = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, ReasonLabelName))
}, append(commonLabels, reasonLabelName))

m.compactions = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_compactor_group_compactions_total",
Expand Down Expand Up @@ -159,6 +165,10 @@ func newCompactorMetricsWithLabels(reg prometheus.Registerer, commonLabels []str
Name: "cortex_compactor_remaining_planned_compactions",
Help: "Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy",
}, commonLabels)
m.compactionErrorsCount = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_compactor_compaction_error_total",
Help: "Total number of errors from compactions.",
}, append(commonLabels, compactionErrorTypesLabelName))

return &m
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/compactor/compactor_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ func TestSyncerMetrics(t *testing.T) {
cortex_compactor_remaining_planned_compactions{user="aaa"} 377740
cortex_compactor_remaining_planned_compactions{user="bbb"} 388850
cortex_compactor_remaining_planned_compactions{user="ccc"} 399960
# HELP cortex_compactor_compaction_error_total Total number of errors from compactions.
# TYPE cortex_compactor_compaction_error_total counter
cortex_compactor_compaction_error_total{type="halt",user="aaa"} 444400
cortex_compactor_compaction_error_total{type="halt",user="bbb"} 455510
cortex_compactor_compaction_error_total{type="halt",user="ccc"} 466620
cortex_compactor_compaction_error_total{type="retriable",user="aaa"} 411070
cortex_compactor_compaction_error_total{type="retriable",user="bbb"} 422180
cortex_compactor_compaction_error_total{type="retriable",user="ccc"} 433290
cortex_compactor_compaction_error_total{type="unauthorized",user="aaa"} 477730
cortex_compactor_compaction_error_total{type="unauthorized",user="bbb"} 488840
cortex_compactor_compaction_error_total{type="unauthorized",user="ccc"} 499950
`))
require.NoError(t, err)

Expand Down Expand Up @@ -163,4 +174,13 @@ func generateTestData(cm *compactorMetrics, base float64) {
cm.remainingPlannedCompactions.WithLabelValues("aaa").Add(34 * base)
cm.remainingPlannedCompactions.WithLabelValues("bbb").Add(35 * base)
cm.remainingPlannedCompactions.WithLabelValues("ccc").Add(36 * base)
cm.compactionErrorsCount.WithLabelValues("aaa", retriableError).Add(37 * base)
cm.compactionErrorsCount.WithLabelValues("bbb", retriableError).Add(38 * base)
cm.compactionErrorsCount.WithLabelValues("ccc", retriableError).Add(39 * base)
cm.compactionErrorsCount.WithLabelValues("aaa", haltError).Add(40 * base)
cm.compactionErrorsCount.WithLabelValues("bbb", haltError).Add(41 * base)
cm.compactionErrorsCount.WithLabelValues("ccc", haltError).Add(42 * base)
cm.compactionErrorsCount.WithLabelValues("aaa", unauthorizedError).Add(43 * base)
cm.compactionErrorsCount.WithLabelValues("bbb", unauthorizedError).Add(44 * base)
cm.compactionErrorsCount.WithLabelValues("ccc", unauthorizedError).Add(45 * base)
}
110 changes: 108 additions & 2 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1615,8 +1615,8 @@ func (m *tsdbPlannerMock) getNoCompactBlocks() []string {
return result
}

func mockBlockMetaJSON(id string) string {
meta := tsdb.BlockMeta{
func mockBlockMeta(id string) tsdb.BlockMeta {
return tsdb.BlockMeta{
Version: 1,
ULID: ulid.MustParse(id),
MinTime: 1574776800000,
Expand All @@ -1626,6 +1626,10 @@ func mockBlockMetaJSON(id string) string {
Sources: []ulid.ULID{ulid.MustParse(id)},
},
}
}

func mockBlockMetaJSON(id string) string {
meta := mockBlockMeta(id)

content, err := json.Marshal(meta)
if err != nil {
Expand Down Expand Up @@ -1988,3 +1992,105 @@ func TestCompactor_ShouldNotFailCompactionIfAccessDeniedErrReturnedFromBucket(t

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
}

// TestCompactor_FailedWithRetriableError verifies how the compactor accounts
// for a compaction failure that is not a halt error.
//
// Listing either block directory in the mocked bucket fails with a plain
// error. With CompactionRetries set to 2, the test waits until the
// "retriable" series of cortex_compactor_compaction_error_total for user-1
// reaches 2, then asserts that this is the only series exposed for that
// metric (no "halt" or "unauthorized" samples).
func TestCompactor_FailedWithRetriableError(t *testing.T) {
	t.Parallel()

	ss := bucketindex.Status{Status: bucketindex.Ok, Version: bucketindex.SyncStatusFileVersion}
	content, err := json.Marshal(ss)
	require.NoError(t, err)

	blockIDs := []string{"01DTVP434PA9VFXSW2JKB3392D", "01DTW0ZCPDDNV4BV83Q2SV4QAZ"}

	bucketClient := &bucket.ClientMock{}
	bucketClient.MockIter("__markers__", []string{}, nil)
	bucketClient.MockIter("", []string{"user-1"}, nil)
	bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil)
	bucketClient.MockIter("user-1/markers/", nil, nil)
	bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
	bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
	for _, id := range blockIDs {
		blockDir := "user-1/" + id
		// Listing the block directory is the injected failure point; a plain
		// error is not a halt error, so it is counted as retriable.
		bucketClient.MockIter(blockDir, nil, errors.New("test retriable error"))
		bucketClient.MockGet(blockDir+"/meta.json", mockBlockMetaJSON(id), nil)
		bucketClient.MockGet(blockDir+"/deletion-mark.json", "", nil)
		bucketClient.MockGet(blockDir+"/no-compact-mark.json", "", nil)
	}
	bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
	bucketClient.MockGet("user-1/bucket-index-sync-status.json", string(content), nil)
	bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
	bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)

	cfg := prepareConfig()
	cfg.CompactionRetries = 2

	c, _, tsdbPlanner, _, registry := prepare(t, cfg, bucketClient, nil)
	tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{{BlockMeta: mockBlockMeta("01DTVP434PA9VFXSW2JKB3392D")}, {BlockMeta: mockBlockMeta("01DTW0ZCPDDNV4BV83Q2SV4QAZ")}}, nil)

	require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

	// Wait until the retriable error counter matches the configured number of
	// compaction retries.
	cortex_testutil.Poll(t, 1*time.Second, 2.0, func() interface{} {
		return prom_testutil.ToFloat64(c.compactorMetrics.compactionErrorsCount.WithLabelValues("user-1", retriableError))
	})

	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))

	// Only the merged, type-labelled error metric is compared.
	assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(`
		# HELP cortex_compactor_compaction_error_total Total number of errors from compactions.
		# TYPE cortex_compactor_compaction_error_total counter
		cortex_compactor_compaction_error_total{type="retriable", user="user-1"} 2
	`),
		"cortex_compactor_compaction_error_total",
	))
}

// TestCompactor_FailedWithHaltError verifies how the compactor accounts for a
// compaction failure that is a halt error.
//
// Listing either block directory in the mocked bucket fails with
// compact.HaltError{}. Although CompactionRetries is set to 2, the test
// expects the "halt" series of cortex_compactor_compaction_error_total for
// user-1 to reach exactly 1 and to be the only series exposed for that
// metric, i.e. the failed compaction is not retried.
func TestCompactor_FailedWithHaltError(t *testing.T) {
	t.Parallel()

	ss := bucketindex.Status{Status: bucketindex.Ok, Version: bucketindex.SyncStatusFileVersion}
	content, err := json.Marshal(ss)
	require.NoError(t, err)

	blockIDs := []string{"01DTVP434PA9VFXSW2JKB3392D", "01DTW0ZCPDDNV4BV83Q2SV4QAZ"}

	bucketClient := &bucket.ClientMock{}
	bucketClient.MockIter("__markers__", []string{}, nil)
	bucketClient.MockIter("", []string{"user-1"}, nil)
	bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil)
	bucketClient.MockIter("user-1/markers/", nil, nil)
	bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
	bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
	for _, id := range blockIDs {
		blockDir := "user-1/" + id
		// Listing the block directory is the injected failure point; it
		// surfaces a halt error instead of a plain one.
		bucketClient.MockIter(blockDir, nil, compact.HaltError{})
		bucketClient.MockGet(blockDir+"/meta.json", mockBlockMetaJSON(id), nil)
		bucketClient.MockGet(blockDir+"/deletion-mark.json", "", nil)
		bucketClient.MockGet(blockDir+"/no-compact-mark.json", "", nil)
	}
	bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
	bucketClient.MockGet("user-1/bucket-index-sync-status.json", string(content), nil)
	bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
	bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)

	cfg := prepareConfig()
	cfg.CompactionRetries = 2

	c, _, tsdbPlanner, _, registry := prepare(t, cfg, bucketClient, nil)
	tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{{BlockMeta: mockBlockMeta("01DTVP434PA9VFXSW2JKB3392D")}, {BlockMeta: mockBlockMeta("01DTW0ZCPDDNV4BV83Q2SV4QAZ")}}, nil)

	require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

	// A single halt error is expected even though two retries are configured.
	cortex_testutil.Poll(t, 1*time.Second, 1.0, func() interface{} {
		return prom_testutil.ToFloat64(c.compactorMetrics.compactionErrorsCount.WithLabelValues("user-1", haltError))
	})

	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))

	// Only the merged, type-labelled error metric is compared.
	assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(`
		# HELP cortex_compactor_compaction_error_total Total number of errors from compactions.
		# TYPE cortex_compactor_compaction_error_total counter
		cortex_compactor_compaction_error_total{type="halt", user="user-1"} 1
	`),
		"cortex_compactor_compaction_error_total",
	))
}

0 comments on commit 42d7327

Please sign in to comment.