Skip to content

Commit

Permalink
apply: Allow ignoring a specific error returned from updatePartitions()
Browse files Browse the repository at this point in the history
When applying topic's configuration, we'd like to be able
avoid failing in case the desired topic's partition count
is smaller than the actual topic's partitions count (on
the broker).
We know that Kafka doesn't allow partitions decrease,
so instead of failing the operation we'd like to
continue without doing anything.
This is very convenient when trying to apply a batch
of topics in which case, we don't want to fail the rest
of the batch for one topic that its partition count
configuration is lower than the actual partitions count.

This change is backward compatible, as the default behavior
is kept.

Signed-off-by: shimon-armis <[email protected]>
  • Loading branch information
shimonturjeman committed Mar 21, 2024
1 parent 48409ce commit 19daf93
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
8 changes: 8 additions & 0 deletions cmd/topicctl/subcmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type applyCmdConfig struct {
autoContinueRebalance bool
retentionDropStepDurationStr string
skipConfirm bool
ignoreFewerPartitionsError bool
sleepLoopDuration time.Duration

shared sharedOptions
Expand Down Expand Up @@ -99,6 +100,12 @@ func init() {
false,
"Skip confirmation prompts during apply process",
)
applyCmd.Flags().BoolVar(
&applyConfig.ignoreFewerPartitionsError,
"ignore-fewer-partitions-error",
false,
"Don't return error when topic's config specifies fewer partitions than it currently has",
)
applyCmd.Flags().DurationVar(
&applyConfig.sleepLoopDuration,
"sleep-loop-duration",
Expand Down Expand Up @@ -231,6 +238,7 @@ func applyTopic(
AutoContinueRebalance: applyConfig.autoContinueRebalance,
RetentionDropStepDuration: applyConfig.retentionDropStepDuration,
SkipConfirm: applyConfig.skipConfirm,
IgnoreFewerPartitionsError: applyConfig.ignoreFewerPartitionsError,
SleepLoopDuration: applyConfig.sleepLoopDuration,
TopicConfig: topicConfig,
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type TopicApplierConfig struct {
AutoContinueRebalance bool
RetentionDropStepDuration time.Duration
SkipConfirm bool
IgnoreFewerPartitionsError bool
SleepLoopDuration time.Duration
TopicConfig config.TopicConfig
}
Expand Down Expand Up @@ -194,6 +195,10 @@ func (t *TopicApplier) applyNewTopic(ctx context.Context) error {
return nil
}

func isFewerPartitionsError(err error) bool {
return strings.Contains(err.Error(), "Fewer partitions in topic config")
}

func (t *TopicApplier) applyExistingTopic(
ctx context.Context,
topicInfo admin.TopicInfo,
Expand All @@ -213,6 +218,11 @@ func (t *TopicApplier) applyExistingTopic(
}

if err := t.updatePartitions(ctx, topicInfo); err != nil {
if isFewerPartitionsError(err) && t.config.IgnoreFewerPartitionsError {
log.Warnf("UpdatePartitions failure ignored. topic: %v, error: %v", t.topicName, err)
return nil
}

return err
}

Expand Down

0 comments on commit 19daf93

Please sign in to comment.