From 26b03e1971846dae3e988f47d271e52fabd04ba2 Mon Sep 17 00:00:00 2001 From: "ys.achinta" Date: Tue, 25 Feb 2020 12:48:15 +0530 Subject: [PATCH 1/6] Add ListConsumerGroups for a topic Co-authored-by: ys-achinta WIP: add method to describe consumer group and filter for specific topic Fix GetConsumerGroupsForTopic and add tests for ListConsumerGroups Refactor displaying consumer groups WIP: add test for get consumer groups for topic --- cmd/list/listconsumergroup.go | 57 +++++++++++++++++++++++++ cmd/list/listconsumergroup_test.go | 67 ++++++++++++++++++++++++++++++ cmd/root.go | 2 + go.mod | 1 + pkg/client/consumerlister.go | 6 +++ pkg/client/saramaclient.go | 49 ++++++++++++++++++++++ pkg/client/saramaclient_test.go | 35 ++++++++++++++++ 7 files changed, 217 insertions(+) create mode 100644 cmd/list/listconsumergroup.go create mode 100644 cmd/list/listconsumergroup_test.go create mode 100644 pkg/client/consumerlister.go diff --git a/cmd/list/listconsumergroup.go b/cmd/list/listconsumergroup.go new file mode 100644 index 0000000..358a2d9 --- /dev/null +++ b/cmd/list/listconsumergroup.go @@ -0,0 +1,57 @@ +package list + +import ( + "strings" + + "github.com/gojek/kat/cmd/base" + "github.com/gojek/kat/logger" + "github.com/gojek/kat/pkg/client" + "github.com/spf13/cobra" +) + +type consumerGroupAdmin struct { + saramaClient client.ConsumerLister +} + +var ListConsumerGroupsCmd = &cobra.Command{ + Use: "list", + Short: "Lists the consumer groups", + Run: func(command *cobra.Command, args []string) { + cobraUtil := base.NewCobraUtil(command) + + addr := strings.Split(cobraUtil.GetStringArg("broker-list"), ",") + + cgl := consumerGroupAdmin{ + saramaClient: client.NewSaramaClient(addr), + } + cgl.ListGroups(cobraUtil.GetStringArg("topic")) + }, +} + +func init() { + ListConsumerGroupsCmd.PersistentFlags().StringP("broker-list", "b", "", "Comma separated list of broker ips") + ListConsumerGroupsCmd.PersistentFlags().StringP("topic", "t", "", "Specify topic") + if err := ListConsumerGroupsCmd.MarkPersistentFlagRequired("broker-list"); err != nil { + logger.Fatal(err) + } +} + +func (c *consumerGroupAdmin) ListGroups(topic string) error { + consumerGroupsMap, err := c.saramaClient.ListConsumerGroups() + if err != nil { + return err + } + + var consumerGroups []string + + for consumerGroupID := range consumerGroupsMap { + consumerGroups = append(consumerGroups, consumerGroupID) + } + + _, err = c.saramaClient.GetConsumerGroupsForTopic(consumerGroups, topic) + if err != nil { + return err + } + + return nil +} diff --git a/cmd/list/listconsumergroup_test.go b/cmd/list/listconsumergroup_test.go new file mode 100644 index 0000000..cb5546f --- /dev/null +++ b/cmd/list/listconsumergroup_test.go @@ -0,0 +1,67 @@ +package list + +import ( + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "gotest.tools/assert" +) + +type mockConsumerListener struct{ mock.Mock } + +func (m *mockConsumerListener) ListConsumerGroups() (map[string]string, error) { + args := m.Called() + return args.Get(0).(map[string]string), args.Error(1) +} + +func (m *mockConsumerListener) GetConsumerGroupsForTopic(consumerGroups []string, topic string) (chan string, error) { + args := m.Called(consumerGroups, topic) + return args.Get(0).(chan string), args.Error(1) +} + +func TestListGroupsReturnsSuccess(t *testing.T) { + mockConsumer := new(mockConsumerListener) + admin := consumerGroupAdmin{mockConsumer} + mockChannel := make(chan string, 0) + + consumerGroupsMap := map[string]string{"consumer1": "", "consumer2": ""} + mockConsumer.On("ListConsumerGroups").Return(consumerGroupsMap, nil) + + mockConsumer.On("GetConsumerGroupsForTopic", []string{"consumer1", "consumer2"}, "").Return(mockChannel, nil) + + err := admin.ListGroups("") + + require.NoError(t, err) + mockConsumer.AssertExpectations(t) +} + +func TestListGroupsReturnsFailureIfListConsumerGroupsFails(t *testing.T) { + mockConsumer := new(mockConsumerListener) + admin := consumerGroupAdmin{mockConsumer} + multipleConsumers := map[string]string{"consumer1": "", "consumer2": ""} + mockConsumer.On("ListConsumerGroups").Return(multipleConsumers, errors.New("list consumer groups failed")) + + err := admin.ListGroups("") + + require.Error(t, err) + assert.Equal(t, "list consumer groups failed", err.Error()) + mockConsumer.AssertExpectations(t) +} + +func TestListGroupsReturnsFailureIfGetConsumerGroupsFails(t *testing.T) { + mockConsumer := new(mockConsumerListener) + admin := consumerGroupAdmin{mockConsumer} + mockChannel := make(chan string, 0) + + consumerGroupsMap := map[string]string{"consumer1": "", "consumer2": ""} + mockConsumer.On("ListConsumerGroups").Return(consumerGroupsMap, nil) + + mockConsumer.On("GetConsumerGroupsForTopic", []string{"consumer1", "consumer2"}, "").Return(mockChannel, errors.New("get consumer groups failed")) + + err := admin.ListGroups("") + require.Error(t, err) + assert.Equal(t, "get consumer groups failed", err.Error()) + mockConsumer.AssertExpectations(t) +} diff --git a/cmd/root.go b/cmd/root.go index e1f7c65..f4c6f6b 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -4,6 +4,7 @@ import ( "fmt" "os" + "github.com/gojek/kat/cmd/list" "github.com/gojek/kat/cmd/mirror" "github.com/gojek/kat/logger" @@ -20,6 +21,7 @@ func init() { cobra.OnInitialize() cliCmd.AddCommand(topicCmd) cliCmd.AddCommand(mirror.MirrorCmd) + cliCmd.AddCommand(list.ListConsumerGroupsCmd) } func Execute() { diff --git a/go.mod b/go.mod index ca31c65..f77baf7 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/mattn/go-runewidth v0.0.5 // indirect github.com/mitchellh/go-homedir v1.1.0 github.com/olekukonko/tablewriter v0.0.1 + github.com/pkg/errors v0.9.1 // indirect github.com/r3labs/diff v0.0.0-20191018104334-e3ae93f4edbb github.com/sirupsen/logrus v1.4.2 github.com/spf13/cobra v0.0.3 diff --git a/pkg/client/consumerlister.go b/pkg/client/consumerlister.go new file mode 100644 index 0000000..a24b0b6 --- /dev/null +++ b/pkg/client/consumerlister.go @@ -0,0 +1,6 @@ +package client + +type ConsumerLister interface { + ListConsumerGroups() (map[string]string, error) + GetConsumerGroupsForTopic([]string, string) (chan string, error) +} diff --git a/pkg/client/saramaclient.go b/pkg/client/saramaclient.go index 09dc349..b8be766 100644 --- a/pkg/client/saramaclient.go +++ b/pkg/client/saramaclient.go @@ -1,6 +1,9 @@ package client import ( + "fmt" + "sync" + "github.com/Shopify/sarama" "github.com/gojek/kat/logger" ) @@ -10,6 +13,10 @@ type SaramaClient struct { client sarama.Client } +type SaramaMember struct { + member sarama.GroupMemberDescription +} + func NewSaramaClient(addr []string) *SaramaClient { cfg := sarama.NewConfig() cfg.Version = sarama.V2_0_0_0 @@ -49,6 +56,48 @@ func (s *SaramaClient) ListBrokers() map[int]string { return brokerMap } +func (s *SaramaClient) ListConsumerGroups() (map[string]string, error) { + return s.admin.ListConsumerGroups() +} + +func (s *SaramaClient) GetConsumerGroupsForTopic(groups []string, topic string) (chan string, error) { + var wg sync.WaitGroup + consumerGroupsChannel := make(chan string, len(groups)) + + if len(topic) == 0 { + for i := 0; i < len(groups); i++ { + consumerGroupsChannel <- groups[i] + } + close(consumerGroupsChannel) + + return consumerGroupsChannel, nil + } + + for i := 0; i < len(groups); i++ { + wg.Add(1) + go func(i int, wg *sync.WaitGroup) { + defer wg.Done() + groupDescription, _ := s.admin.DescribeConsumerGroups([]string{groups[i]}) + for _, memberDesc := range groupDescription[0].Members { + ma, _ := memberDesc.GetMemberAssignment() + for topicName, _ := range ma.Topics { + if topicName == topic { + consumerGroupsChannel <- groupDescription[0].GroupId + fmt.Println(groupDescription[0].GroupId) + } + } + break + } + }(i, &wg) + } + + wg.Wait() + + close(consumerGroupsChannel) + + return consumerGroupsChannel, nil +} + func (s *SaramaClient) ListTopicDetails() (map[string]TopicDetail, error) { topics, err := s.admin.ListTopics() if err != nil { diff --git a/pkg/client/saramaclient_test.go b/pkg/client/saramaclient_test.go index ebe505c..7c806e9 100644 --- a/pkg/client/saramaclient_test.go +++ b/pkg/client/saramaclient_test.go @@ -7,6 +7,7 @@ import ( "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) func TestSaramaClient_ListTopicDetailsSuccess(t *testing.T) { @@ -269,3 +270,37 @@ func TestSaramaClient_CreatePartitionsFailure(t *testing.T) { assert.Error(t, err) admin.AssertExpectations(t) } + +type mockGroupDescription interface { + GetMemberAssignment() (*sarama.ConsumerGroupMemberAssignment, error) +} + +type mockMemberAssignment struct{} + +func (m *mockMemberAssignment) GetMemberAssignment() (*sarama.ConsumerGroupMemberAssignment, error) { + return &sarama.ConsumerGroupMemberAssignment{ + Version: 1, + Topics: map[string][]int32{"t1": {0, 1, 2}}, + UserData: []byte("ss"), + }, nil +} + +type mockGroupDescriptionStruct struct { + GroupId string + Members map[string]mockMemberAssignment +} + +func TestSaramaClient_GetConsumerGroupsForTopic(t *testing.T) { + admin := &MockClusterAdmin{} + client := SaramaClient{admin: admin} + + admin.On("DescribeConsumerGroups", []string{"c1"}).Return([]*mockGroupDescriptionStruct{{ + GroupId: "c1", + Members: map[string]mockMemberAssignment{"k1": {}}, + }}, nil) + + consumerGroupChannel, err := client.GetConsumerGroupsForTopic([]string{"c1"}, "t1") + + require.NoError(t, err) + assert.Equal(t, 1, len(consumerGroupChannel)) +} From b48b1169b58665dd1af4c0c5871b2034a11f4342 Mon Sep 17 00:00:00 2001 From: "ys.achinta" Date: Fri, 6 Mar 2020 16:44:58 +0530 Subject: [PATCH 2/6] add list consumer groups under a consumer group command --- cmd/consumergroup.go | 22 +++++++++++++++ cmd/list/listconsumergroup.go | 19 ++++++------- cmd/root.go | 3 +-- go.mod | 3 ++- go.sum | 4 +++ pkg/client/saramaclient.go | 11 ++++++++ pkg/client/saramaclient_test.go | 47 +++++++++++++++------------------ 7 files changed, 72 insertions(+), 37 deletions(-) create mode 100644 cmd/consumergroup.go diff --git a/cmd/consumergroup.go b/cmd/consumergroup.go new file mode 100644 index 0000000..e0bf619 --- /dev/null +++ b/cmd/consumergroup.go @@ -0,0 +1,22 @@ +package cmd + +import ( + "github.com/gojek/kat/cmd/list" + "github.com/gojek/kat/logger" + "github.com/spf13/cobra" +) + +var consumerGroupCmd = &cobra.Command{ + Use: "consumergroup", + Short: "Admin commands on consumergroups", +} + +func init() { + consumerGroupCmd.PersistentFlags().StringP("broker-list", "b", "", "Comma separated list of broker ips") + consumerGroupCmd.PersistentFlags().StringP("topic", "t", "", "Specify topic") + if err := consumerGroupCmd.MarkPersistentFlagRequired("broker-list"); err != nil { + logger.Fatal(err) + } + + consumerGroupCmd.AddCommand(list.ListConsumerGroupsCmd) +} diff --git a/cmd/list/listconsumergroup.go b/cmd/list/listconsumergroup.go index 358a2d9..b02e1f6 100644 --- a/cmd/list/listconsumergroup.go +++ b/cmd/list/listconsumergroup.go @@ -1,10 +1,10 @@ package list import ( + "sort" "strings" "github.com/gojek/kat/cmd/base" - "github.com/gojek/kat/logger" "github.com/gojek/kat/pkg/client" "github.com/spf13/cobra" ) @@ -28,20 +28,21 @@ var ListConsumerGroupsCmd = &cobra.Command{ }, } -func init() { - ListConsumerGroupsCmd.PersistentFlags().StringP("broker-list", "b", "", "Comma separated list of broker ips") - ListConsumerGroupsCmd.PersistentFlags().StringP("topic", "t", "", "Specify topic") - if err := ListConsumerGroupsCmd.MarkPersistentFlagRequired("broker-list"); err != nil { - logger.Fatal(err) - } -} - func (c *consumerGroupAdmin) ListGroups(topic string) error { consumerGroupsMap, err := c.saramaClient.ListConsumerGroups() if err != nil { return err } + consumerGroupList := make([]string, len(consumerGroupsMap)) + for group := range consumerGroupsMap { + consumerGroupList = append(consumerGroupList, group) + } + + sort.Slice(consumerGroupList, func(i int, j int) bool { + return consumerGroupList[i] < consumerGroupList[j] + }) + var consumerGroups []string for consumerGroupID := range consumerGroupsMap { diff --git a/cmd/root.go b/cmd/root.go index f4c6f6b..a151337 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -4,7 +4,6 @@ import ( "fmt" "os" - "github.com/gojek/kat/cmd/list" "github.com/gojek/kat/cmd/mirror" "github.com/gojek/kat/logger" @@ -21,7 +20,7 @@ func init() { cobra.OnInitialize() cliCmd.AddCommand(topicCmd) cliCmd.AddCommand(mirror.MirrorCmd) - cliCmd.AddCommand(list.ListConsumerGroupsCmd) + cliCmd.AddCommand(consumerGroupCmd) } func Execute() { diff --git a/go.mod b/go.mod index f77baf7..cec980b 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/mattn/go-runewidth v0.0.5 // indirect github.com/mitchellh/go-homedir v1.1.0 github.com/olekukonko/tablewriter v0.0.1 - github.com/pkg/errors v0.9.1 // indirect + github.com/pkg/errors v0.9.1 github.com/r3labs/diff v0.0.0-20191018104334-e3ae93f4edbb github.com/sirupsen/logrus v1.4.2 github.com/spf13/cobra v0.0.3 @@ -19,4 +19,5 @@ require ( github.com/stretchr/testify v1.3.0 golang.org/x/crypto v0.0.0-20191029031824-8986dd9e96cf golang.org/x/net v0.0.0-20191028085509-fe3aa8a45271 // indirect + gotest.tools v2.2.0+incompatible ) diff --git a/go.sum b/go.sum index f02b585..26fb2c3 100644 --- a/go.sum +++ b/go.sum @@ -48,6 +48,8 @@ github.com/olekukonko/tablewriter v0.0.1 h1:b3iUnf1v+ppJiOfNX4yxxqfWKMQPZR5yoh8u github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/pierrec/lz4 v2.2.6+incompatible h1:6aCX4/YZ9v8q69hTyiR7dNLnTA3fgtKHVVW5BCd5Znw= github.com/pierrec/lz4 v2.2.6+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/r3labs/diff v0.0.0-20191018104334-e3ae93f4edbb h1:kaV32NbiIn7ESdHB4PEW2VTKhB0odk9wo4/yW2acmoo= @@ -95,3 +97,5 @@ gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc440 gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= +gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= +gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= diff --git a/pkg/client/saramaclient.go b/pkg/client/saramaclient.go index b8be766..c7c720e 100644 --- a/pkg/client/saramaclient.go +++ b/pkg/client/saramaclient.go @@ -80,6 +80,17 @@ func (s *SaramaClient) GetConsumerGroupsForTopic(groups []string, topic string) groupDescription, _ := s.admin.DescribeConsumerGroups([]string{groups[i]}) for _, memberDesc := range groupDescription[0].Members { ma, _ := memberDesc.GetMemberAssignment() + fmt.Println("----version----") + fmt.Println(ma.Version) + fmt.Println("----version----") + + fmt.Println("----userdata----") + fmt.Println(ma.UserData) + fmt.Println("----userdata----") + + fmt.Println("----topics----") + fmt.Println(ma.Topics) + fmt.Println("----topics----") for topicName, _ := range ma.Topics { if topicName == topic { consumerGroupsChannel <- groupDescription[0].GroupId diff --git a/pkg/client/saramaclient_test.go b/pkg/client/saramaclient_test.go index 7c806e9..f51e579 100644 --- a/pkg/client/saramaclient_test.go +++ b/pkg/client/saramaclient_test.go @@ -270,36 +270,33 @@ func TestSaramaClient_CreatePartitionsFailure(t *testing.T) { assert.Error(t, err) admin.AssertExpectations(t) } - -type mockGroupDescription interface { - GetMemberAssignment() (*sarama.ConsumerGroupMemberAssignment, error) -} - -type mockMemberAssignment struct{} - -func (m *mockMemberAssignment) GetMemberAssignment() (*sarama.ConsumerGroupMemberAssignment, error) { - return &sarama.ConsumerGroupMemberAssignment{ - Version: 1, - Topics: map[string][]int32{"t1": {0, 1, 2}}, - UserData: []byte("ss"), - }, nil -} - -type mockGroupDescriptionStruct struct { - GroupId string - Members map[string]mockMemberAssignment -} - func TestSaramaClient_GetConsumerGroupsForTopic(t *testing.T) { admin := &MockClusterAdmin{} client := SaramaClient{admin: admin} - admin.On("DescribeConsumerGroups", []string{"c1"}).Return([]*mockGroupDescriptionStruct{{ - GroupId: "c1", - Members: map[string]mockMemberAssignment{"k1": {}}, - }}, nil) + groupDesciption := []*sarama.GroupDescription{{ + GroupId: "test-group-id", + Members: map[string]*sarama.GroupMemberDescription{ + "instance-id-0": { + ClientId: "instance-id-0", + MemberAssignment: []byte{0x04, 0x05, 0x06}, + }, + + "instance-id-1": { + ClientId: "instance-id-1", + MemberAssignment: []byte{0x04, 0x05, 0x06}, + }, + + "instance-id-2": { + ClientId: "instance-id-2", + MemberAssignment: []byte{0x04, 0x05, 0x06}, + }, + }, + }} + + admin.On("DescribeConsumerGroups", []string{"test-group-id"}).Return(groupDesciption, nil) - consumerGroupChannel, err := client.GetConsumerGroupsForTopic([]string{"c1"}, "t1") + consumerGroupChannel, err := client.GetConsumerGroupsForTopic([]string{"test-group-id"}, "test-topic") require.NoError(t, err) assert.Equal(t, 1, len(consumerGroupChannel)) From 7604eda831a64a4a71a35adf25bbb7986376b2ee Mon Sep 17 00:00:00 2001 From: "ys.achinta" Date: Mon, 9 Mar 2020 17:09:44 +0530 Subject: [PATCH 3/6] make topic flag mandatory. add error logging --- cmd/consumergroup.go | 6 +++++- pkg/client/saramaclient.go | 27 +++++---------------------- 2 files changed, 10 insertions(+), 23 deletions(-) diff --git a/cmd/consumergroup.go b/cmd/consumergroup.go index e0bf619..62ab8af 100644 --- a/cmd/consumergroup.go +++ b/cmd/consumergroup.go @@ -13,10 +13,14 @@ var consumerGroupCmd = &cobra.Command{ func init() { consumerGroupCmd.PersistentFlags().StringP("broker-list", "b", "", "Comma separated list of broker ips") - consumerGroupCmd.PersistentFlags().StringP("topic", "t", "", "Specify topic") if err := consumerGroupCmd.MarkPersistentFlagRequired("broker-list"); err != nil { logger.Fatal(err) } + consumerGroupCmd.PersistentFlags().StringP("topic", "t", "", "Specify topic") + if err := consumerGroupCmd.MarkPersistentFlagRequired("topic"); err != nil { + logger.Fatal(err) + } + consumerGroupCmd.AddCommand(list.ListConsumerGroupsCmd) } diff --git a/pkg/client/saramaclient.go b/pkg/client/saramaclient.go index c7c720e..803d1de 100644 --- a/pkg/client/saramaclient.go +++ b/pkg/client/saramaclient.go @@ -64,34 +64,17 @@ func (s *SaramaClient) GetConsumerGroupsForTopic(groups []string, topic string) var wg sync.WaitGroup consumerGroupsChannel := make(chan string, len(groups)) - if len(topic) == 0 { - for i := 0; i < len(groups); i++ { - consumerGroupsChannel <- groups[i] - } - close(consumerGroupsChannel) - - return consumerGroupsChannel, nil - } - for i := 0; i < len(groups); i++ { wg.Add(1) go func(i int, wg *sync.WaitGroup) { defer wg.Done() - groupDescription, _ := s.admin.DescribeConsumerGroups([]string{groups[i]}) + groupDescription, err := s.admin.DescribeConsumerGroups([]string{groups[i]}) + if err != nil { + logger.Fatalf("Err on describing consumer group %s: %v\n", groups[i], err) + } for _, memberDesc := range groupDescription[0].Members { ma, _ := memberDesc.GetMemberAssignment() - fmt.Println("----version----") - fmt.Println(ma.Version) - fmt.Println("----version----") - - fmt.Println("----userdata----") - fmt.Println(ma.UserData) - fmt.Println("----userdata----") - - fmt.Println("----topics----") - fmt.Println(ma.Topics) - fmt.Println("----topics----") - for topicName, _ := range ma.Topics { + for topicName := range ma.Topics { if topicName == topic { consumerGroupsChannel <- groupDescription[0].GroupId fmt.Println(groupDescription[0].GroupId) From 622e29094b54a8e36d1bac7c6bea31d38fcb9d0f Mon Sep 17 00:00:00 2001 From: Anirudh M Date: Tue, 17 Mar 2020 13:47:46 +0530 Subject: [PATCH 4/6] Abstract GetMemberAssignment into HasSubscription method --- cmd/list/listconsumergroup.go | 6 +++++- pkg/client/saramaclient.go | 30 +++++++++++++++++++----------- pkg/client/saramaclient_test.go | 3 +-- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/cmd/list/listconsumergroup.go b/cmd/list/listconsumergroup.go index b02e1f6..9c02ab9 100644 --- a/cmd/list/listconsumergroup.go +++ b/cmd/list/listconsumergroup.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/gojek/kat/cmd/base" + "github.com/gojek/kat/logger" "github.com/gojek/kat/pkg/client" "github.com/spf13/cobra" ) @@ -24,7 +25,10 @@ var ListConsumerGroupsCmd = &cobra.Command{ cgl := consumerGroupAdmin{ saramaClient: client.NewSaramaClient(addr), } - cgl.ListGroups(cobraUtil.GetStringArg("topic")) + err := cgl.ListGroups(cobraUtil.GetStringArg("topic")) + if err != nil { + logger.Fatalf("Error while listing consumer groups for %v topic", err) + } }, } diff --git a/pkg/client/saramaclient.go b/pkg/client/saramaclient.go index 803d1de..d1cd2af 100644 --- a/pkg/client/saramaclient.go +++ b/pkg/client/saramaclient.go @@ -13,8 +13,19 @@ type SaramaClient struct { client sarama.Client } -type SaramaMember struct { - member sarama.GroupMemberDescription +type consumerGroups map[string]*sarama.GroupMemberDescription + +func (c *consumerGroups) HasSubscription(topic string) bool { + for _, memberDesc := range *c { + ma, _ := memberDesc.GetMemberAssignment() + for topicName := range ma.Topics { + if topicName == topic { + return true + } + } + break + } + return false } func NewSaramaClient(addr []string) *SaramaClient { @@ -72,15 +83,12 @@ func (s *SaramaClient) GetConsumerGroupsForTopic(groups []string, topic string) if err != nil { logger.Fatalf("Err on describing consumer group %s: %v\n", groups[i], err) } - for _, memberDesc := range groupDescription[0].Members { - ma, _ := memberDesc.GetMemberAssignment() - for topicName := range ma.Topics { - if topicName == topic { - consumerGroupsChannel <- groupDescription[0].GroupId - fmt.Println(groupDescription[0].GroupId) - } - } - break + + var c consumerGroups = groupDescription[0].Members + + if c.HasSubscription(topic) { + consumerGroupsChannel <- groupDescription[0].GroupId + fmt.Println(groupDescription[0].GroupId) } }(i, &wg) } diff --git a/pkg/client/saramaclient_test.go b/pkg/client/saramaclient_test.go index f51e579..acc912f 100644 --- a/pkg/client/saramaclient_test.go +++ b/pkg/client/saramaclient_test.go @@ -296,8 +296,7 @@ func TestSaramaClient_GetConsumerGroupsForTopic(t *testing.T) { admin.On("DescribeConsumerGroups", []string{"test-group-id"}).Return(groupDesciption, nil) - consumerGroupChannel, err := client.GetConsumerGroupsForTopic([]string{"test-group-id"}, "test-topic") + _, err := client.GetConsumerGroupsForTopic([]string{"test-group-id"}, "test-topic") require.NoError(t, err) - assert.Equal(t, 1, len(consumerGroupChannel)) } From 07f5a5065d054f9bd4afe1943806e07a60a887e4 Mon Sep 17 00:00:00 2001 From: Anirudh M Date: Tue, 17 Mar 2020 14:59:27 +0530 Subject: [PATCH 5/6] Release only on master --- .circleci/config.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index 644584e..b5aac90 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -99,3 +99,6 @@ workflows: - release: requires: - test + filters: + branches: + only: master From 3353211aacd6ef7fb81f83ba22e1f727d419a2de Mon Sep 17 00:00:00 2001 From: Anirudh M Date: Tue, 17 Mar 2020 15:20:30 +0530 Subject: [PATCH 6/6] Update README and remove gotest.tools --- README.md | 7 +++++++ cmd/list/listconsumergroup_test.go | 2 +- go.mod | 1 - go.sum | 2 -- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index bb3f3a8..4db80cc 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ go install github.com/gojek/kat - [List Topics](#list-topics) - [Describe Topics](#describe-topics) - [Delete Topics](#delete-topics) +- [List Consumer Groups for a topic](#list-consumer-groups-for-a-topic) - [Increase Replication Factor](#increase-replication-factor) - [Reassign Partitions](#reassign-partitions) - [Show Topic Configs](#show-topic-configs) @@ -84,6 +85,12 @@ kat topic delete --broker-list <"broker1:9092,broker2:9092"> --last-write= --last-write= --data-dir= --topic-blacklist=<*test*> ``` +### List Consumer Groups for a Topic +* Lists all the consumer groups that are subscribed to a given topic +``` +kat consumergroup list -b <"broker1:9092,broker2:9092"> -t +``` + ### Increase Replication Factor * Increase the replication factor of topics that match given regex ``` diff --git a/cmd/list/listconsumergroup_test.go b/cmd/list/listconsumergroup_test.go index cb5546f..4deb694 100644 --- a/cmd/list/listconsumergroup_test.go +++ b/cmd/list/listconsumergroup_test.go @@ -4,9 +4,9 @@ import ( "testing" "github.com/pkg/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "gotest.tools/assert" ) type mockConsumerListener struct{ mock.Mock } diff --git a/go.mod b/go.mod index cec980b..46e4b9a 100644 --- a/go.mod +++ b/go.mod @@ -19,5 +19,4 @@ require ( github.com/stretchr/testify v1.3.0 golang.org/x/crypto v0.0.0-20191029031824-8986dd9e96cf golang.org/x/net v0.0.0-20191028085509-fe3aa8a45271 // indirect - gotest.tools v2.2.0+incompatible ) diff --git a/go.sum b/go.sum index 26fb2c3..033639e 100644 --- a/go.sum +++ b/go.sum @@ -97,5 +97,3 @@ gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc440 gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= -gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= -gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=