Skip to content

Commit

Permalink
Merge pull request #13 from gojek/listConsumerGroups
Browse files Browse the repository at this point in the history
Add support for ListConsumerGroups for a topic closes #7
  • Loading branch information
devdinu authored Mar 17, 2020
2 parents d200de6 + 3353211 commit 719698d
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,6 @@ workflows:
- release:
requires:
- test
filters:
branches:
only: master
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go install github.com/gojek/kat
- [List Topics](#list-topics)
- [Describe Topics](#describe-topics)
- [Delete Topics](#delete-topics)
- [List Consumer Groups for a topic](#list-consumer-groups-for-a-topic)
- [Increase Replication Factor](#increase-replication-factor)
- [Reassign Partitions](#reassign-partitions)
- [Show Topic Configs](#show-topic-configs)
Expand Down Expand Up @@ -84,6 +85,12 @@ kat topic delete --broker-list <"broker1:9092,broker2:9092"> --last-write=<epoch
kat topic delete --broker-list <"broker1:9092,broker2:9092"> --last-write=<epoch time> --data-dir=<kafka logs directory> --topic-blacklist=<*test*>
```

### List Consumer Groups for a Topic
* Lists all the consumer groups that are subscribed to a given topic
```
kat consumergroup list -b <"broker1:9092,broker2:9092"> -t <topic-name>
```

### Increase Replication Factor
* Increase the replication factor of topics that match given regex
```
Expand Down
26 changes: 26 additions & 0 deletions cmd/consumergroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package cmd

import (
"github.com/gojek/kat/cmd/list"
"github.com/gojek/kat/logger"
"github.com/spf13/cobra"
)

var consumerGroupCmd = &cobra.Command{
Use: "consumergroup",
Short: "Admin commands on consumergroups",
}

func init() {
consumerGroupCmd.PersistentFlags().StringP("broker-list", "b", "", "Comma separated list of broker ips")
if err := consumerGroupCmd.MarkPersistentFlagRequired("broker-list"); err != nil {
logger.Fatal(err)
}

consumerGroupCmd.PersistentFlags().StringP("topic", "t", "", "Specify topic")
if err := consumerGroupCmd.MarkPersistentFlagRequired("topic"); err != nil {
logger.Fatal(err)
}

consumerGroupCmd.AddCommand(list.ListConsumerGroupsCmd)
}
62 changes: 62 additions & 0 deletions cmd/list/listconsumergroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package list

import (
"sort"
"strings"

"github.com/gojek/kat/cmd/base"
"github.com/gojek/kat/logger"
"github.com/gojek/kat/pkg/client"
"github.com/spf13/cobra"
)

type consumerGroupAdmin struct {
saramaClient client.ConsumerLister
}

var ListConsumerGroupsCmd = &cobra.Command{
Use: "list",
Short: "Lists the consumer groups",
Run: func(command *cobra.Command, args []string) {
cobraUtil := base.NewCobraUtil(command)

addr := strings.Split(cobraUtil.GetStringArg("broker-list"), ",")

cgl := consumerGroupAdmin{
saramaClient: client.NewSaramaClient(addr),
}
err := cgl.ListGroups(cobraUtil.GetStringArg("topic"))
if err != nil {
logger.Fatalf("Error while listing consumer groups for %v topic", err)
}
},
}

func (c *consumerGroupAdmin) ListGroups(topic string) error {
consumerGroupsMap, err := c.saramaClient.ListConsumerGroups()
if err != nil {
return err
}

consumerGroupList := make([]string, len(consumerGroupsMap))
for group := range consumerGroupsMap {
consumerGroupList = append(consumerGroupList, group)
}

sort.Slice(consumerGroupList, func(i int, j int) bool {
return consumerGroupList[i] < consumerGroupList[j]
})

var consumerGroups []string

for consumerGroupID := range consumerGroupsMap {
consumerGroups = append(consumerGroups, consumerGroupID)
}

_, err = c.saramaClient.GetConsumerGroupsForTopic(consumerGroups, topic)
if err != nil {
return err
}

return nil
}
67 changes: 67 additions & 0 deletions cmd/list/listconsumergroup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package list

import (
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

type mockConsumerListener struct{ mock.Mock }

func (m *mockConsumerListener) ListConsumerGroups() (map[string]string, error) {
args := m.Called()
return args.Get(0).(map[string]string), args.Error(1)
}

func (m *mockConsumerListener) GetConsumerGroupsForTopic(consumerGroups []string, topic string) (chan string, error) {
args := m.Called(consumerGroups, topic)
return args.Get(0).(chan string), args.Error(1)
}

func TestListGroupsReturnsSuccess(t *testing.T) {
mockConsumer := new(mockConsumerListener)
admin := consumerGroupAdmin{mockConsumer}
mockChannel := make(chan string, 0)

consumerGroupsMap := map[string]string{"consumer1": "", "consumer2": ""}
mockConsumer.On("ListConsumerGroups").Return(consumerGroupsMap, nil)

mockConsumer.On("GetConsumerGroupsForTopic", []string{"consumer1", "consumer2"}, "").Return(mockChannel, nil)

err := admin.ListGroups("")

require.NoError(t, err)
mockConsumer.AssertExpectations(t)
}

func TestListGroupsReturnsFailureIfListConsumerGroupsFails(t *testing.T) {
mockConsumer := new(mockConsumerListener)
admin := consumerGroupAdmin{mockConsumer}
multipleConsumers := map[string]string{"consumer1": "", "consumer2": ""}
mockConsumer.On("ListConsumerGroups").Return(multipleConsumers, errors.New("list consumer groups failed"))

err := admin.ListGroups("")

require.Error(t, err)
assert.Equal(t, "list consumer groups failed", err.Error())
mockConsumer.AssertExpectations(t)
}

func TestListGroupsReturnsFailureIfGetConsumerGroupsFails(t *testing.T) {
mockConsumer := new(mockConsumerListener)
admin := consumerGroupAdmin{mockConsumer}
mockChannel := make(chan string, 0)

consumerGroupsMap := map[string]string{"consumer1": "", "consumer2": ""}
mockConsumer.On("ListConsumerGroups").Return(consumerGroupsMap, nil)

mockConsumer.On("GetConsumerGroupsForTopic", []string{"consumer1", "consumer2"}, "").Return(mockChannel, errors.New("get consumer groups failed"))

err := admin.ListGroups("")
require.Error(t, err)
assert.Equal(t, "get consumer groups failed", err.Error())
mockConsumer.AssertExpectations(t)
}
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func init() {
cobra.OnInitialize()
cliCmd.AddCommand(topicCmd)
cliCmd.AddCommand(mirror.MirrorCmd)
cliCmd.AddCommand(consumerGroupCmd)
}

func Execute() {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/mattn/go-runewidth v0.0.5 // indirect
github.com/mitchellh/go-homedir v1.1.0
github.com/olekukonko/tablewriter v0.0.1
github.com/pkg/errors v0.9.1
github.com/r3labs/diff v0.0.0-20191018104334-e3ae93f4edbb
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ github.com/olekukonko/tablewriter v0.0.1 h1:b3iUnf1v+ppJiOfNX4yxxqfWKMQPZR5yoh8u
github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/pierrec/lz4 v2.2.6+incompatible h1:6aCX4/YZ9v8q69hTyiR7dNLnTA3fgtKHVVW5BCd5Znw=
github.com/pierrec/lz4 v2.2.6+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/r3labs/diff v0.0.0-20191018104334-e3ae93f4edbb h1:kaV32NbiIn7ESdHB4PEW2VTKhB0odk9wo4/yW2acmoo=
Expand Down
6 changes: 6 additions & 0 deletions pkg/client/consumerlister.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package client

type ConsumerLister interface {
ListConsumerGroups() (map[string]string, error)
GetConsumerGroupsForTopic([]string, string) (chan string, error)
}
51 changes: 51 additions & 0 deletions pkg/client/saramaclient.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package client

import (
"fmt"
"sync"

"github.com/Shopify/sarama"
"github.com/gojek/kat/logger"
)
Expand All @@ -10,6 +13,21 @@ type SaramaClient struct {
client sarama.Client
}

type consumerGroups map[string]*sarama.GroupMemberDescription

func (c *consumerGroups) HasSubscription(topic string) bool {
for _, memberDesc := range *c {
ma, _ := memberDesc.GetMemberAssignment()
for topicName := range ma.Topics {
if topicName == topic {
return true
}
}
break
}
return false
}

func NewSaramaClient(addr []string) *SaramaClient {
cfg := sarama.NewConfig()
cfg.Version = sarama.V2_0_0_0
Expand Down Expand Up @@ -49,6 +67,39 @@ func (s *SaramaClient) ListBrokers() map[int]string {
return brokerMap
}

func (s *SaramaClient) ListConsumerGroups() (map[string]string, error) {
return s.admin.ListConsumerGroups()
}

func (s *SaramaClient) GetConsumerGroupsForTopic(groups []string, topic string) (chan string, error) {
var wg sync.WaitGroup
consumerGroupsChannel := make(chan string, len(groups))

for i := 0; i < len(groups); i++ {
wg.Add(1)
go func(i int, wg *sync.WaitGroup) {
defer wg.Done()
groupDescription, err := s.admin.DescribeConsumerGroups([]string{groups[i]})
if err != nil {
logger.Fatalf("Err on describing consumer group %s: %v\n", groups[i], err)
}

var c consumerGroups = groupDescription[0].Members

if c.HasSubscription(topic) {
consumerGroupsChannel <- groupDescription[0].GroupId
fmt.Println(groupDescription[0].GroupId)
}
}(i, &wg)
}

wg.Wait()

close(consumerGroupsChannel)

return consumerGroupsChannel, nil
}

func (s *SaramaClient) ListTopicDetails() (map[string]TopicDetail, error) {
topics, err := s.admin.ListTopics()
if err != nil {
Expand Down
31 changes: 31 additions & 0 deletions pkg/client/saramaclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestSaramaClient_ListTopicDetailsSuccess(t *testing.T) {
Expand Down Expand Up @@ -269,3 +270,33 @@ func TestSaramaClient_CreatePartitionsFailure(t *testing.T) {
assert.Error(t, err)
admin.AssertExpectations(t)
}
func TestSaramaClient_GetConsumerGroupsForTopic(t *testing.T) {
admin := &MockClusterAdmin{}
client := SaramaClient{admin: admin}

groupDesciption := []*sarama.GroupDescription{{
GroupId: "test-group-id",
Members: map[string]*sarama.GroupMemberDescription{
"instance-id-0": {
ClientId: "instance-id-0",
MemberAssignment: []byte{0x04, 0x05, 0x06},
},

"instance-id-1": {
ClientId: "instance-id-1",
MemberAssignment: []byte{0x04, 0x05, 0x06},
},

"instance-id-2": {
ClientId: "instance-id-2",
MemberAssignment: []byte{0x04, 0x05, 0x06},
},
},
}}

admin.On("DescribeConsumerGroups", []string{"test-group-id"}).Return(groupDesciption, nil)

_, err := client.GetConsumerGroupsForTopic([]string{"test-group-id"}, "test-topic")

require.NoError(t, err)
}

0 comments on commit 719698d

Please sign in to comment.