From d7f7c3263514bb7560ba090b555f8dc997c5c622 Mon Sep 17 00:00:00 2001 From: Benjamin Yolken <54862872+yolken-segment@users.noreply.github.com> Date: Fri, 9 Jul 2021 12:03:14 -0700 Subject: [PATCH] Support sorting member lags by value (#39) * Update cli parsing * Update command parsing * Check flags more strictly * Add checkArgs tests * Bump version --- cmd/topicctl/subcmd/get.go | 15 ++++- pkg/cli/cli.go | 13 ++++- pkg/cli/command.go | 79 +++++++++++++++++++++++++ pkg/cli/command_test.go | 71 +++++++++++++++++++++++ pkg/cli/repl.go | 115 +++++++++++++++++++------------------ pkg/version/version.go | 2 +- 6 files changed, 237 insertions(+), 58 deletions(-) create mode 100644 pkg/cli/command.go create mode 100644 pkg/cli/command_test.go diff --git a/cmd/topicctl/subcmd/get.go b/cmd/topicctl/subcmd/get.go index a2f22475..d2ea35f3 100644 --- a/cmd/topicctl/subcmd/get.go +++ b/cmd/topicctl/subcmd/get.go @@ -35,6 +35,7 @@ var getCmd = &cobra.Command{ type getCmdConfig struct { clusterConfig string full bool + sortValues bool zkAddr string zkPrefix string } @@ -54,6 +55,12 @@ func init() { false, "Show more full information for resources", ) + getCmd.Flags().BoolVar( + &getConfig.sortValues, + "sort-values", + false, + "Sort by value instead of name; only applies for lags at the moment", + ) getCmd.Flags().StringVarP( &getConfig.zkAddr, "zk-addr", @@ -153,7 +160,13 @@ func getRun(cmd *cobra.Command, args []string) error { return fmt.Errorf("Must provide topic and groupID as additional positional arguments") } - return cliRunner.GetMemberLags(ctx, args[1], args[2], getConfig.full) + return cliRunner.GetMemberLags( + ctx, + args[1], + args[2], + getConfig.full, + getConfig.sortValues, + ) case "members": if len(args) != 2 { return fmt.Errorf("Must provide group ID as second positional argument") diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index e3b0e6d9..084bd4ca 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "regexp" + "sort" "strconv" "strings" "time" @@ -357,6 +358,7 @@ func (c *CLIRunner) GetMemberLags( topic string, groupID string, full bool, + sortByValues bool, ) error { c.startSpinner() @@ -375,7 +377,16 @@ func (c *CLIRunner) GetMemberLags( return err } - c.printer("Group member lags:\n%s", groups.FormatMemberLags(memberLags, full)) + if sortByValues { + sort.Slice(memberLags, func(a, b int) bool { + return memberLags[a].TimeLag() < memberLags[b].TimeLag() + }) + } + + c.printer( + "Group member lags:\n%s", + groups.FormatMemberLags(memberLags, full), + ) return nil } diff --git a/pkg/cli/command.go b/pkg/cli/command.go new file mode 100644 index 00000000..c117bd30 --- /dev/null +++ b/pkg/cli/command.go @@ -0,0 +1,79 @@ +package cli + +import ( + "fmt" + "strings" +) + +type replCommand struct { + args []string + flags map[string]string +} + +func (r replCommand) getBoolValue(key string) bool { + value, ok := r.flags[key] + + if value == "true" { + return true + } else if value == "" && ok { + // If key is set but value is not, treat this as "true" + return true + } else { + return false + } +} + +func (r replCommand) checkArgs( + minArgs int, + maxArgs int, + allowedFlags map[string]struct{}, +) error { + if minArgs == maxArgs { + if len(r.args) != minArgs { + return fmt.Errorf("Expected %d args", minArgs) + } + } else { + if len(r.args) < minArgs || len(r.args) > maxArgs { + return fmt.Errorf("Expected between %d and %d args", minArgs, maxArgs) + } + } + + for key := range r.flags { + if allowedFlags == nil { + return fmt.Errorf("Flag %s not recognized", key) + } + if _, ok := allowedFlags[key]; !ok { + return fmt.Errorf("Flag %s not recognized", key) + } + } + + return nil +} + +func parseReplInputs(input string) replCommand { + args := []string{} + flags := map[string]string{} + + components := strings.Split(input, " ") + + for c, component := range components { + if component == "" { + continue + } else if c > 0 && strings.HasPrefix(component, "--") { + subcomponents := strings.SplitN(component, "=", 2) + key := subcomponents[0][2:] + var value string + if len(subcomponents) > 1 { + value = subcomponents[1] + } + flags[key] = value + } else { + args = append(args, component) + } + } + + return replCommand{ + args: args, + flags: flags, + } +} diff --git a/pkg/cli/command_test.go b/pkg/cli/command_test.go new file mode 100644 index 00000000..0359d155 --- /dev/null +++ b/pkg/cli/command_test.go @@ -0,0 +1,71 @@ +package cli + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParseReplInputs(t *testing.T) { + assert.Equal( + t, + replCommand{ + args: []string{"arg1", "arg2"}, + flags: map[string]string{}, + }, + parseReplInputs("arg1 arg2"), + ) + assert.Equal( + t, + replCommand{ + args: []string{"--flag1=value1", "arg1", "arg2"}, + flags: map[string]string{}, + }, + parseReplInputs("--flag1=value1 arg1 arg2"), + ) + assert.Equal( + t, + replCommand{ + args: []string{"arg1", "arg2", "arg3"}, + flags: map[string]string{ + "flag1": "value1", + "flag2": "value2", + }, + }, + parseReplInputs("arg1 arg2 --flag1=value1 arg3 --flag2=value2"), + ) +} + +func TestGetBoolValue(t *testing.T) { + command := replCommand{ + flags: map[string]string{ + "key1": "", + "key2": "true", + "key3": "false", + }, + } + assert.True(t, command.getBoolValue("key1")) + assert.True(t, command.getBoolValue("key2")) + assert.False(t, command.getBoolValue("key3")) + assert.False(t, command.getBoolValue("non-existent-key")) +} + +func TestCheckArgs(t *testing.T) { + command := replCommand{ + args: []string{ + "arg1", + "arg2", + }, + flags: map[string]string{ + "key1": "value1", + }, + } + assert.NoError(t, command.checkArgs(2, 2, map[string]struct{}{"key1": {}})) + assert.NoError(t, command.checkArgs(2, 3, map[string]struct{}{"key1": {}})) + assert.NoError(t, command.checkArgs(1, 2, map[string]struct{}{"key1": {}})) + assert.NoError(t, command.checkArgs(1, 2, map[string]struct{}{"key1": {}, "key2": {}})) + assert.Error(t, command.checkArgs(3, 3, map[string]struct{}{"key1": {}})) + assert.Error(t, command.checkArgs(3, 5, map[string]struct{}{"key1": {}})) + assert.Error(t, command.checkArgs(2, 2, map[string]struct{}{"key2": {}})) + assert.Error(t, command.checkArgs(2, 2, nil)) +} diff --git a/pkg/cli/repl.go b/pkg/cli/repl.go index 1c4a5a3f..476142dd 100644 --- a/pkg/cli/repl.go +++ b/pkg/cli/repl.go @@ -204,26 +204,30 @@ func (r *Repl) executor(in string) { }() defer signal.Stop(sigChan) - words := strings.Split(in, " ") - switch words[0] { + command := parseReplInputs(in) + if len(command.args) == 0 { + return + } + + switch command.args[0] { case "exit": fmt.Println("Bye!") os.Exit(0) case "get": - if len(words) == 1 { + if len(command.args) == 1 { log.Error("Unrecognized input. Run 'help' for details on available commands.") return } - switch words[1] { + switch command.args[1] { case "balance": - if err := checkArgsMax(words, 3); err != nil { + if err := command.checkArgs(2, 3, nil); err != nil { log.Errorf("Error: %+v", err) return } var topicName string - if len(words) == 3 { - topicName = words[2] + if len(command.args) == 3 { + topicName = command.args[2] } if err := r.cliRunner.GetBrokerBalance(ctx, topicName); err != nil { @@ -231,25 +235,25 @@ func (r *Repl) executor(in string) { return } case "brokers": - if err := checkArgs(words, 2); err != nil { + if err := command.checkArgs(2, 2, map[string]struct{}{"full": {}}); err != nil { log.Errorf("Error: %+v", err) return } - if err := r.cliRunner.GetBrokers(ctx, false); err != nil { + if err := r.cliRunner.GetBrokers(ctx, command.getBoolValue("full")); err != nil { log.Errorf("Error: %+v", err) return } case "config": - if err := checkArgs(words, 3); err != nil { + if err := command.checkArgs(3, 3, nil); err != nil { log.Errorf("Error: %+v", err) return } - if err := r.cliRunner.GetConfig(ctx, words[2]); err != nil { + if err := r.cliRunner.GetConfig(ctx, command.args[2]); err != nil { log.Errorf("Error: %+v", err) return } case "groups": - if err := checkArgs(words, 2); err != nil { + if err := command.checkArgs(2, 2, nil); err != nil { log.Errorf("Error: %+v", err) return } @@ -258,43 +262,57 @@ func (r *Repl) executor(in string) { return } case "lags": - if err := checkArgs(words, 4); err != nil { + if err := command.checkArgs( + 4, + 4, + map[string]struct{}{"full": {}, "sort-values": {}}, + ); err != nil { log.Errorf("Error: %+v", err) return } - if err := r.cliRunner.GetMemberLags(ctx, words[2], words[3], false); err != nil { + if err := r.cliRunner.GetMemberLags( + ctx, + command.args[2], + command.args[3], + command.getBoolValue("full"), + command.getBoolValue("sort-values"), + ); err != nil { log.Errorf("Error: %+v", err) return } case "members": - if err := checkArgs(words, 3); err != nil { + if err := command.checkArgs(3, 3, map[string]struct{}{"full": {}}); err != nil { log.Errorf("Error: %+v", err) return } - if err := r.cliRunner.GetGroupMembers(ctx, words[2], false); err != nil { + if err := r.cliRunner.GetGroupMembers( + ctx, + command.args[2], + command.getBoolValue("full"), + ); err != nil { log.Errorf("Error: %+v", err) return } case "partitions": - if err := checkArgs(words, 3); err != nil { + if err := command.checkArgs(3, 3, nil); err != nil { log.Errorf("Error: %+v", err) return } - if err := r.cliRunner.GetPartitions(ctx, words[2]); err != nil { + if err := r.cliRunner.GetPartitions(ctx, command.args[2]); err != nil { log.Errorf("Error: %+v", err) return } case "offsets": - if err := checkArgs(words, 3); err != nil { + if err := command.checkArgs(3, 3, nil); err != nil { log.Errorf("Error: %+v", err) return } - if err := r.cliRunner.GetOffsets(ctx, words[2]); err != nil { + if err := r.cliRunner.GetOffsets(ctx, command.args[2]); err != nil { log.Errorf("Error: %+v", err) return } case "topics": - if err := checkArgs(words, 2); err != nil { + if err := command.checkArgs(2, 2, nil); err != nil { log.Errorf("Error: %+v", err) return } @@ -306,31 +324,39 @@ func (r *Repl) executor(in string) { log.Error("Unrecognized input. Run 'help' for details on available commands.") } case "help": - fmt.Printf("> Commands:\n%s\n", helpTableStr) - return - case "tail": - if err := checkArgsMin(words, 2); err != nil { + if err := command.checkArgs(1, 1, nil); err != nil { log.Errorf("Error: %+v", err) return } - if err := checkArgsMax(words, 3); err != nil { + + fmt.Printf("> Commands:\n%s\n", helpTableStr) + return + case "tail": + if err := command.checkArgs( + 2, + 3, + map[string]struct{}{"filter": {}, "raw": {}}, + ); err != nil { log.Errorf("Error: %+v", err) return } + // Support filter as either an arg or a flag for backwards-compatibility purposes var filterRegexp string - if len(words) == 3 { - filterRegexp = words[2] + if len(command.args) == 3 { + filterRegexp = command.args[2] + } else { + filterRegexp = command.flags["filter"] } err := r.cliRunner.Tail( ctx, - words[1], + command.args[1], kafka.LastOffset, nil, -1, filterRegexp, - false, + command.getBoolValue("raw"), ) if err != nil { log.Errorf("Error: %+v", err) @@ -376,27 +402,6 @@ func (r *Repl) completer(doc prompt.Document) []prompt.Suggest { ) } -func checkArgs(args []string, expectedCount int) error { - if len(args) != expectedCount { - return fmt.Errorf("Expected %d args", expectedCount) - } - return nil -} - -func checkArgsMin(args []string, expectedCount int) error { - if len(args) < expectedCount { - return fmt.Errorf("Expected at least %d args", expectedCount) - } - return nil -} - -func checkArgsMax(args []string, expectedCount int) error { - if len(args) > expectedCount { - return fmt.Errorf("Expected at most %d args", expectedCount) - } - return nil -} - func helpTable() string { buf := &bytes.Buffer{} @@ -425,7 +430,7 @@ func helpTable() string { "Get positions of all brokers in topic or across cluster", }, { - " get brokers", + " get brokers [--full]", "Get all brokers", }, { @@ -437,11 +442,11 @@ func helpTable() string { "Get all consumer groups", }, { - " get lags [topic] [group]", + " get lags [topic] [group] [--full] [--sort-values]", "Get consumer group lags for all partitions in a topic", }, { - " get members [group]", + " get members [group] [--full]", "Get the members of a consumer group", }, { @@ -457,7 +462,7 @@ func helpTable() string { "Get all topics", }, { - " tail [topic] [optional filter regexp]", + " tail [topic] [optional filter regexp] [--raw]", "Tail all messages in a topic", }, { diff --git a/pkg/version/version.go b/pkg/version/version.go index dad4cfb8..3d71666b 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -1,4 +1,4 @@ package version // Version is the current topicctl version. -const Version = "0.0.3" +const Version = "0.0.4"