Skip to content

Commit

Permalink
optimizations for readability of 'apply --rebalance' output (#116)
Browse files Browse the repository at this point in the history
* optimizations for readability of 'apply --rebalance' output

* add throttleBytes to output

* make throttle unit string match what's used elsewhere

* format elapsed time to a single decimal place (%.1f)

* report on brokers that fail on applying throttle
  • Loading branch information
hhahn-tw authored Apr 17, 2023
1 parent df50bbb commit 8cf79d5
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 17 deletions.
6 changes: 3 additions & 3 deletions pkg/admin/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func FormatTopicPartitions(partitions []PartitionInfo, brokers []BrokerInfo) str
} else if !inSync {
statusPrinter = color.New(color.FgRed).SprintfFunc()
} else if !correctLeader {
statusPrinter = color.New(color.FgBlue).SprintfFunc()
statusPrinter = color.New(color.FgCyan).SprintfFunc()
}

var statusStr string
Expand Down Expand Up @@ -797,7 +797,7 @@ func assignmentRacksDiffStr(
elements := []string{}

added := color.New(color.FgRed).SprintfFunc()
moved := color.New(color.FgBlue).SprintfFunc()
moved := color.New(color.FgCyan).SprintfFunc()

for r, replica := range new.Replicas {
var element string
Expand Down Expand Up @@ -829,7 +829,7 @@ func partitionCountDiffStr(diffValue int) string {
decreasedSprintf = fmt.Sprintf
} else {
increasedSprintf = color.New(color.FgRed).SprintfFunc()
decreasedSprintf = color.New(color.FgBlue).SprintfFunc()
decreasedSprintf = color.New(color.FgCyan).SprintfFunc()
}

if diffValue > 0 {
Expand Down
41 changes: 31 additions & 10 deletions pkg/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ func (t *TopicApplier) updatePartitionsHelper(
return errors.New("Stopping because of user response")
}

err = t.updatePartitionsIteration(ctx, currAssignments, desiredAssignments, true)
err = t.updatePartitionsIteration(ctx, currAssignments, desiredAssignments, true, "")
if err != nil {
return err
}
Expand Down Expand Up @@ -861,24 +861,27 @@ func (t *TopicApplier) updatePlacementRunner(
}

numRounds := (len(assignmentsToUpdate) + batchSize - 1) / batchSize // Ceil() with integer math
roundScoreboard := color.New(color.FgYellow, color.Bold).SprintfFunc()
highlighter := color.New(color.FgYellow, color.Bold).SprintfFunc()
for i, round := 0, 1; i < len(assignmentsToUpdate); i, round = i+batchSize, round+1 {
end := i + batchSize

if end > len(assignmentsToUpdate) {
end = len(assignmentsToUpdate)
}

var roundLabel string // "x of y" used to mark progress in balancing rounds
roundLabel = highlighter("%d of %d", round, numRounds)
log.Infof(
"Balancing round %s",
roundScoreboard("%d of %d", round, numRounds),
roundLabel,
)

err := t.updatePartitionsIteration(
ctx,
currDiffAssignments[i:end],
assignmentsToUpdate[i:end],
newTopic,
roundLabel,
)
if err != nil {
return err
Expand Down Expand Up @@ -912,6 +915,7 @@ func (t *TopicApplier) updatePartitionsIteration(
currAssignments []admin.PartitionAssignment,
assignmentsToUpdate []admin.PartitionAssignment,
newTopic bool,
roundLabel string,
) error {
idsToUpdate := []int{}
for _, assignment := range assignmentsToUpdate {
Expand Down Expand Up @@ -952,12 +956,14 @@ func (t *TopicApplier) updatePartitionsIteration(
defer checkTimer.Stop()

log.Info("Sleeping then entering check loop")
highlighter := color.New(color.FgYellow, color.Bold).SprintfFunc()
roundStartTime := time.Now()

outerLoop:
for {
select {
case <-checkTimer.C:
log.Info("Checking if all partitions in topic are properly replicated...")
log.Infof("Checking if all partitions in topic %s are properly replicated...", highlighter(t.topicName))

topicInfo, err := t.adminClient.GetTopic(ctx, t.topicName, true)
if err != nil {
Expand All @@ -981,7 +987,7 @@ outerLoop:
partitionInfo := topicInfo.Partitions[assignment.ID]

if !util.SameElements(partitionInfo.Replicas, partitionInfo.ISR) {
log.Infof("Out of sync: %+v, %+v", partitionInfo.Replicas, partitionInfo.ISR)
log.Debugf("Out of sync: %+v, %+v", partitionInfo.Replicas, partitionInfo.ISR)
notReady = append(notReady, partitionInfo)
continue
}
Expand All @@ -997,7 +1003,11 @@ outerLoop:
}

if len(notReady) == 0 {
log.Infof("Partition(s) %+v looks good, continuing", idsToUpdate)
elapsed := time.Now().Sub(roundStartTime)
log.Infof("Partition(s) %+v looks good, continuing (last round duration: %s)",
idsToUpdate,
highlighter("%.1fs", float64(elapsed)/1000000000), // time.Duration is int64 nanoseconds
)
break outerLoop
}
log.Infof(">>> Not ready: %+v", notReady)
Expand All @@ -1008,7 +1018,14 @@ outerLoop:
len(assignmentsToUpdate),
admin.FormatTopicPartitions(notReady, t.brokers),
)
log.Infof("Sleeping for %s", t.config.SleepLoopDuration.String())

var roundString string // convert to " (round x of y)" if roundLabel is present
if roundLabel != "" {
roundString = fmt.Sprintf(" (current round %s, %+v elapsed)", roundLabel, time.Now().Sub(roundStartTime))
} else {
roundString = roundLabel
}
log.Infof("Sleeping for %s%s", t.config.SleepLoopDuration.String(), roundString)
case <-ctx.Done():
return ctx.Err()
}
Expand Down Expand Up @@ -1050,7 +1067,7 @@ func (t *TopicApplier) applyThrottles(
var throttledTopic bool

if len(topicConfigEntries) > 0 {
log.Infof("Applying topic throttles: %+v", topicConfigEntries)
log.Infof("Applying topic throttles (%d MB/sec): %+v", t.throttleBytes/1000000, topicConfigEntries)
_, err := t.adminClient.UpdateTopicConfig(
ctx,
t.topicName,
Expand All @@ -1066,14 +1083,16 @@ func (t *TopicApplier) applyThrottles(
throttledBrokers := []int{}

for _, brokerThrottle := range brokerThrottles {
log.Infof("Applying throttle to broker %d", brokerThrottle.Broker)
log.Debugf("Applying throttle to broker %d", brokerThrottle.Broker)
updatedKeys, err := t.adminClient.UpdateBrokerConfig(
ctx,
brokerThrottle.Broker,
brokerThrottle.ConfigEntries(),
false,
)
if err != nil {
log.Infof("Applied throttles to brokers %+v", throttledBrokers) // report on successful ones
log.Errorf("Error occurred applying throttle to broker %d", brokerThrottle.Broker) // log failed one here
return throttledTopic, throttledBrokers, err
}

Expand All @@ -1084,6 +1103,7 @@ func (t *TopicApplier) applyThrottles(
)
}
}
log.Infof("Applied throttles to brokers %+v", throttledBrokers)

return throttledTopic, throttledBrokers, nil
}
Expand Down Expand Up @@ -1123,7 +1143,7 @@ func (t *TopicApplier) removeThottles(
}

for _, throttledBroker := range throttledBrokers {
log.Infof("Removing throttle from broker %d", throttledBroker)
log.Debugf("Removing throttle from broker %d", throttledBroker)
_, brokerErr := t.adminClient.UpdateBrokerConfig(
ctx,
throttledBroker,
Expand All @@ -1148,6 +1168,7 @@ func (t *TopicApplier) removeThottles(
err = multierror.Append(err, brokerErr)
}
}
log.Infof("Removed throttles from brokers %+v", throttledBrokers)

return err
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/briandowns/spinner"
"github.com/fatih/color"
"github.com/segmentio/topicctl/pkg/admin"
"github.com/segmentio/topicctl/pkg/apply"
"github.com/segmentio/topicctl/pkg/check"
Expand Down Expand Up @@ -91,11 +92,13 @@ func (c *CLIRunner) ApplyTopic(
return err
}

highlighter := color.New(color.FgYellow, color.Bold).SprintfFunc()

c.printer(
"Starting apply for topic %s in environment %s, cluster %s",
applierConfig.TopicConfig.Meta.Name,
applierConfig.TopicConfig.Meta.Environment,
applierConfig.TopicConfig.Meta.Cluster,
highlighter(applierConfig.TopicConfig.Meta.Name),
highlighter(applierConfig.TopicConfig.Meta.Environment),
highlighter(applierConfig.TopicConfig.Meta.Cluster),
)

err = applier.Apply(ctx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/messages/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (t *TopicTailer) LogMessages(
messagePrinter = fmt.Sprintf
} else {
dividerPrinter = color.New(color.FgGreen, color.Faint).SprintfFunc()
keyPrinter = color.New(color.FgBlue, color.Bold).SprintfFunc()
keyPrinter = color.New(color.FgCyan, color.Bold).SprintfFunc()
valuePrinter = color.New(color.FgYellow).SprintfFunc()
messagePrinter = fmt.Sprintf
}
Expand Down

0 comments on commit 8cf79d5

Please sign in to comment.