Merge pull request #143 from CrowdStrike/master
Various fixes to make re-balancing and partition ownership more stable.
edgefox committed Jul 29, 2015
2 parents 41834bc + 6f2394d commit 8f4abea
Showing 3 changed files with 142 additions and 31 deletions.
76 changes: 59 additions & 17 deletions consumer.go
@@ -54,6 +54,8 @@ type Consumer struct {
workerManagersLock sync.Mutex
stopStreams chan bool
close chan bool
stopCleanup chan struct{}
topicCount TopicsToNumStreams
bgInProgress bool
bgInProgressLock sync.Mutex
bgInProgressCond *sync.Cond
@@ -127,12 +129,12 @@ func (c *Consumer) StartStaticPartitions(topicPartitionMap map[string][]int32) {
topicsToNumStreamsMap[topic] = c.config.NumConsumerFetchers
}

topicCount := &StaticTopicsToNumStreams{
c.topicCount = &StaticTopicsToNumStreams{
ConsumerId: c.config.Consumerid,
TopicsToNumStreamsMap: topicsToNumStreamsMap,
}

c.config.Coordinator.RegisterConsumer(c.config.Consumerid, c.config.Groupid, topicCount)
c.config.Coordinator.RegisterConsumer(c.config.Consumerid, c.config.Groupid, c.topicCount)
allTopics, err := c.config.Coordinator.GetAllTopics()
if err != nil {
panic(err)
@@ -144,7 +146,7 @@ func (c *Consumer) StartStaticPartitions(topicPartitionMap map[string][]int32) {

time.Sleep(c.config.DeploymentTimeout)

assignmentContext := newStaticAssignmentContext(c.config.Groupid, c.config.Consumerid, []string{c.config.Consumerid}, allTopics, brokers, topicCount, topicPartitionMap)
assignmentContext := newStaticAssignmentContext(c.config.Groupid, c.config.Consumerid, []string{c.config.Consumerid}, allTopics, brokers, c.topicCount, topicPartitionMap)
partitionOwnershipDecision := newPartitionAssignor(c.config.PartitionAssignmentStrategy)(assignmentContext)

topicPartitions := make([]*TopicAndPartition, 0)
@@ -178,6 +180,7 @@ func (c *Consumer) StartStaticPartitions(topicPartitionMap map[string][]int32) {
}

func (c *Consumer) startStreams() {
c.maintainCleanCoordinator()
stopRedirects := make(map[TopicAndPartition]chan bool)
for {
select {
@@ -229,6 +232,27 @@ func (c *Consumer) startStreams() {
}
}

// maintainCleanCoordinator periodically asks the coordinator to remove old API requests so they do not accumulate over time.
func (c *Consumer) maintainCleanCoordinator() {
if c.stopCleanup != nil {
return
}

c.stopCleanup = make(chan struct{})
go func() {
tick := time.NewTicker(5 * time.Minute)
for {
select {
case <-tick.C:
Infof(c, "Starting coordinator cleanup of API reqeusts for %s", c.config.Groupid)
c.config.Coordinator.RemoveOldApiRequests(c.config.Groupid)
case <-c.stopCleanup:
return
}
}
}()
}

func (c *Consumer) pipeChannels(stopRedirects map[TopicAndPartition]chan bool) {
inLock(&c.workerManagersLock, func() {
Debugf(c, "connect channels registry: %v", c.topicRegistry)
@@ -258,28 +282,28 @@ func (c *Consumer) disconnectChannels(stopRedirects map[TopicAndPartition]chan b
}

func (c *Consumer) createMessageStreams(topicCountMap map[string]int) {
topicCount := &StaticTopicsToNumStreams{
c.topicCount = &StaticTopicsToNumStreams{
ConsumerId: c.config.Consumerid,
TopicsToNumStreamsMap: topicCountMap,
}

c.config.Coordinator.RegisterConsumer(c.config.Consumerid, c.config.Groupid, topicCount)
c.config.Coordinator.RegisterConsumer(c.config.Consumerid, c.config.Groupid, c.topicCount)

time.Sleep(c.config.DeploymentTimeout)

c.reinitializeConsumer()
}

func (c *Consumer) createMessageStreamsByFilterN(topicFilter TopicFilter, numStreams int) {
topicCount := &WildcardTopicsToNumStreams{
c.topicCount = &WildcardTopicsToNumStreams{
Coordinator: c.config.Coordinator,
ConsumerId: c.config.Consumerid,
TopicFilter: topicFilter,
NumStreams: numStreams,
ExcludeInternalTopics: c.config.ExcludeInternalTopics,
}

c.config.Coordinator.RegisterConsumer(c.config.Consumerid, c.config.Groupid, topicCount)
c.config.Coordinator.RegisterConsumer(c.config.Consumerid, c.config.Groupid, c.topicCount)

time.Sleep(c.config.DeploymentTimeout)

@@ -338,18 +362,22 @@ func (c *Consumer) Close() <-chan bool {

c.stopStreams <- true

c.releasePartitionOwnership(c.topicRegistry)

Info(c, "Deregistering consumer")
c.config.Coordinator.DeregisterConsumer(c.config.Consumerid, c.config.Groupid)
c.stopCleanup <- struct{}{} // Stop the background cleanup job.
c.stopCleanup = nil // Reset it so it can be used again.
Info(c, "Successfully deregistered consumer")

Info(c, "Closing low-level client")
c.config.LowLevelClient.Close()
Info(c, "Disconnecting from consumer coordinator")
// Other consumers will wait to take partition ownership until the ownership in the coordinator is released.
// As such this should be one of the last things we do, to prevent duplicate ownership or ownership that is "released" while this consumer is still running.
c.releasePartitionOwnership(c.topicRegistry)
c.config.Coordinator.Disconnect()
Info(c, "Disconnected from consumer coordinator")


Info(c, "Unregistering all metrics")
c.metrics.close()
Info(c, "Unregistered all metrics")
@@ -417,18 +445,17 @@ func (c *Consumer) handleBlueGreenRequest(requestId string, blueGreenRequest *Bl
c.config.Coordinator.RemoveOldApiRequests(blueGreenRequest.Group)

//Generating new topicCount
var topicCount TopicsToNumStreams
switch blueGreenRequest.Pattern {
case blackListPattern:
topicCount = &WildcardTopicsToNumStreams{
c.topicCount = &WildcardTopicsToNumStreams{
Coordinator: c.config.Coordinator,
ConsumerId: c.config.Consumerid,
TopicFilter: NewBlackList(blueGreenRequest.Topics),
NumStreams: c.config.NumConsumerFetchers,
ExcludeInternalTopics: c.config.ExcludeInternalTopics,
}
case whiteListPattern:
topicCount = &WildcardTopicsToNumStreams{
c.topicCount = &WildcardTopicsToNumStreams{
Coordinator: c.config.Coordinator,
ConsumerId: c.config.Consumerid,
TopicFilter: NewWhiteList(blueGreenRequest.Topics),
@@ -441,15 +468,15 @@ func (c *Consumer) handleBlueGreenRequest(requestId string, blueGreenRequest *Bl
for _, topic := range strings.Split(blueGreenRequest.Topics, ",") {
topicMap[topic] = c.config.NumConsumerFetchers
}
topicCount = &StaticTopicsToNumStreams{
c.topicCount = &StaticTopicsToNumStreams{
ConsumerId: c.config.Consumerid,
TopicsToNumStreamsMap: topicMap,
}
}
}

//Getting the partitions for specified topics
myTopicThreadIds := topicCount.GetConsumerThreadIdsPerTopic()
myTopicThreadIds := c.topicCount.GetConsumerThreadIdsPerTopic()
topics := make([]string, 0)
for topic, _ := range myTopicThreadIds {
topics = append(topics, topic)
@@ -461,7 +488,7 @@ func (c *Consumer) handleBlueGreenRequest(requestId string, blueGreenRequest *Bl

//Creating assignment context with new parameters
newContext := newStaticAssignmentContext(blueGreenRequest.Group, c.config.Consumerid, context.Consumers,
context.AllTopics, context.Brokers, topicCount, topicPartitionMap)
context.AllTopics, context.Brokers, c.topicCount, topicPartitionMap)
c.config.Groupid = blueGreenRequest.Group

//Resume consuming
@@ -592,6 +619,11 @@ func (c *Consumer) subscribeForChanges(group string) {
break
}
}
} else if eventType == Reinitialize {
err := c.config.Coordinator.RegisterConsumer(c.config.Consumerid, c.config.Groupid, c.topicCount)
if err != nil {
panic(err)
}
} else {
go c.rebalance()
}
@@ -622,7 +654,7 @@ func (c *Consumer) rebalance() {
var context *assignmentContext
var err error
barrierPassed := false
timeLimit := time.Now().Add(3*time.Minute)
timeLimit := time.Now().Add(3 * time.Minute)
for !barrierPassed && time.Now().Before(timeLimit) {
context, err = newAssignmentContext(c.config.Groupid, c.config.Consumerid,
c.config.ExcludeInternalTopics, c.config.Coordinator)
@@ -645,6 +677,13 @@ func (c *Consumer) rebalance() {
barrierPassed = c.config.Coordinator.AwaitOnStateBarrier(c.config.Consumerid, c.config.Groupid,
stateHash, barrierSize, string(Rebalance),
barrierTimeout)
if !barrierPassed {
// If the barrier failed to reach consensus, remove it.
err = c.config.Coordinator.RemoveStateBarrier(c.config.Groupid, stateHash, string(Rebalance))
if err != nil {
Warnf(c, "Failed to remove state barrier %s due to: %s", stateHash, err.Error())
}
}
}
if !barrierPassed {
panic("Could not reach consensus on state barrier.")
@@ -665,6 +704,7 @@ func (c *Consumer) rebalance() {
if !success && !c.isShuttingdown {
panic(fmt.Sprintf("Failed to rebalance after %d retries", c.config.RebalanceMaxRetries))
} else {
c.config.Coordinator.RemoveStateBarrier(c.config.Groupid, fmt.Sprintf("%s-ack", c.lastSuccessfulRebalanceHash), string(Rebalance))
c.lastSuccessfulRebalanceHash = stateHash
Info(c, "Rebalance has been successfully completed")
}
@@ -733,8 +773,9 @@ func (c *Consumer) initFetchersAndWorkers(assignmentContext *assignmentContext)
switch topicCount := assignmentContext.MyTopicToNumStreams.(type) {
case *StaticTopicsToNumStreams:
{
c.topicCount = topicCount
var numStreams int
for _, v := range topicCount.GetConsumerThreadIdsPerTopic() {
for _, v := range c.topicCount.GetConsumerThreadIdsPerTopic() {
numStreams = len(v)
break
}
Expand All @@ -743,6 +784,7 @@ func (c *Consumer) initFetchersAndWorkers(assignmentContext *assignmentContext)
}
case *WildcardTopicsToNumStreams:
{
c.topicCount = topicCount
c.updateFetcher(topicCount.NumStreams)
}
}
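The maintainCleanCoordinator goroutine added above uses the ticker-plus-stop-channel idiom for background housekeeping. Below is a minimal, self-contained sketch of that same idiom; the cleaner interface, noopCleaner, group name, and interval are illustrative stand-ins, not this library's actual API or settings.

package main

import (
	"fmt"
	"time"
)

// cleaner is a stand-in for the slice of the coordinator API the cleanup loop needs.
type cleaner interface {
	RemoveOldApiRequests(group string) error
}

type noopCleaner struct{}

func (noopCleaner) RemoveOldApiRequests(group string) error {
	fmt.Println("cleaning old API requests for group", group)
	return nil
}

// startCleanup launches a goroutine that invokes the cleaner on every tick
// until the returned stop channel is closed.
func startCleanup(c cleaner, group string, interval time.Duration) chan struct{} {
	stop := make(chan struct{})
	go func() {
		tick := time.NewTicker(interval)
		defer tick.Stop() // release ticker resources when the loop exits
		for {
			select {
			case <-tick.C:
				if err := c.RemoveOldApiRequests(group); err != nil {
					fmt.Println("cleanup failed:", err)
				}
			case <-stop:
				return
			}
		}
	}()
	return stop
}

func main() {
	stop := startCleanup(noopCleaner{}, "my-group", 50*time.Millisecond)
	time.Sleep(200 * time.Millisecond)
	close(stop) // a close is observed even if no goroutine is currently receiving
}

In this sketch the stop signal is delivered by closing the channel rather than sending a value on it, which keeps shutdown safe even when the cleanup goroutine was never started.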
3 changes: 3 additions & 0 deletions structs.go
@@ -224,6 +224,9 @@ const (
// A regular coordinator event that should normally trigger consumer rebalance.
Regular CoordinatorEvent = "Regular"

// A coordinator event that should trigger consumer re-registration.
Reinitialize CoordinatorEvent = "Reinitialize"

// A coordinator event that informs a consumer group of new deployed topics.
BlueGreenRequest CoordinatorEvent = "BlueGreenRequest"
)
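subscribeForChanges in consumer.go above handles the new Reinitialize event by re-registering the consumer with the coordinator. A rough, self-contained sketch of that flow follows; the events channel, registrar interface, and mockRegistrar are hypothetical stand-ins rather than the library's real types.

package main

import "log"

type CoordinatorEvent string

const (
	Regular      CoordinatorEvent = "Regular"
	Reinitialize CoordinatorEvent = "Reinitialize"
)

// registrar is a stand-in for the single coordinator call the handler needs.
type registrar interface {
	RegisterConsumer(consumerID, group string) error
}

type mockRegistrar struct{}

func (mockRegistrar) RegisterConsumer(consumerID, group string) error {
	log.Printf("re-registering %s in group %s", consumerID, group)
	return nil
}

// handleEvents re-registers the consumer when the coordinator asks for it
// and triggers a rebalance for any other event.
func handleEvents(events <-chan CoordinatorEvent, r registrar, id, group string, rebalance func()) {
	for event := range events {
		if event == Reinitialize {
			if err := r.RegisterConsumer(id, group); err != nil {
				log.Fatalf("re-registration failed: %s", err)
			}
		} else {
			rebalance()
		}
	}
}

func main() {
	events := make(chan CoordinatorEvent, 2)
	events <- Reinitialize
	events <- Regular
	close(events)
	handleEvents(events, mockRegistrar{}, "consumer-1", "my-group",
		func() { log.Println("rebalancing") })
}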
