Merge pull request #166 from stealthly/barrier-delete-fix
Barrier delete fix
edgefox committed Oct 22, 2015
2 parents 28fba2b + c16232d commit f8e0846
Showing 6 changed files with 297 additions and 84 deletions.
166 changes: 122 additions & 44 deletions consumer.go
@@ -27,12 +27,12 @@ import (
)

const (
- // Offset with invalid value
+ // Offset with invalid value
InvalidOffset int64 = -1

- // Reset the offset to the smallest offset if it is out of range
+ // Reset the offset to the smallest offset if it is out of range
SmallestOffset = "smallest"
- // Reset the offset to the largest offset if it is out of range
+ // Reset the offset to the largest offset if it is out of range
LargestOffset = "largest"
)

@@ -301,7 +301,9 @@ func (c *Consumer) reinitializeConsumer() {

func (c *Consumer) initializeWorkerManagers() {
inLock(&c.workerManagersLock, func() {
- Infof(c, "Initializing worker managers from topic registry: %s", c.topicRegistry)
+ if Logger.IsAllowed(DebugLevel) {
+ Debugf(c, "Initializing worker managers from topic registry: %s", c.topicRegistry)
+ }
for topic, partitions := range c.topicRegistry {
for partition := range partitions {
topicPartition := TopicAndPartition{topic, partition}
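
A change repeated throughout this diff is wrapping log calls in Logger.IsAllowed(level) guards. The usual motivation: any work needed to build the call's arguments runs at the call site regardless of whether the level is enabled, so a filtered-out debug line can still pay for formatting a large structure such as the topic registry. A minimal, self-contained sketch of the pattern follows; the kafkaLogger type and its levels are illustrative stand-ins, not this repo's actual Logger API.

package main

import (
	"fmt"
	"os"
)

type logLevel int

const (
	debugLevel logLevel = iota
	infoLevel
)

// kafkaLogger is a stand-in for the library's leveled logger.
type kafkaLogger struct{ min logLevel }

// IsAllowed reports whether a message at the given level would be emitted.
func (l kafkaLogger) IsAllowed(level logLevel) bool { return level >= l.min }

// Debugf emits a debug message; note the level check happens here,
// after the caller has already built every argument.
func (l kafkaLogger) Debugf(format string, args ...interface{}) {
	if l.IsAllowed(debugLevel) {
		fmt.Fprintf(os.Stderr, "DEBUG "+format+"\n", args...)
	}
}

func main() {
	logger := kafkaLogger{min: infoLevel}
	registry := map[string][]int32{"events": {0, 1, 2}}

	// Unguarded: fmt.Sprintf formats the whole registry at the call site
	// even though debug output is filtered away inside Debugf.
	logger.Debugf("topic registry: %s", fmt.Sprintf("%v", registry))

	// Guarded, as in this commit: the formatting never runs.
	if logger.IsAllowed(debugLevel) {
		logger.Debugf("topic registry: %v", registry)
	}
}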
@@ -553,17 +555,23 @@ func (c *Consumer) stopWorkerManagers() bool {
}

func (c *Consumer) updateFetcher(numStreams int) {
- Infof(c, "Updating fetcher with numStreams = %d", numStreams)
+ if Logger.IsAllowed(InfoLevel) {
+ Infof(c, "Updating fetcher with numStreams = %d", numStreams)
+ }
allPartitionInfos := make([]*partitionTopicInfo, 0)
- Infof(c, "Topic Registry = %s", c.topicRegistry)
+ if Logger.IsAllowed(DebugLevel) {
+ Debugf(c, "Topic Registry = %s", c.topicRegistry)
+ }
for _, partitionAndInfo := range c.topicRegistry {
for _, partitionInfo := range partitionAndInfo {
allPartitionInfos = append(allPartitionInfos, partitionInfo)
}
}

c.fetcher.startConnections(allPartitionInfos, numStreams)
- Infof(c, "Updated fetcher")
+ if Logger.IsAllowed(InfoLevel) {
+ Infof(c, "Updated fetcher")
+ }
}

func (c *Consumer) subscribeForChanges(group string) {
@@ -626,7 +634,9 @@ func (c *Consumer) rebalance() {
success := false
var stateHash string
barrierTimeout := c.config.BarrierTimeout
- Infof(c, "rebalance triggered for %s\n", c.config.Consumerid)
+ if Logger.IsAllowed(InfoLevel) {
+ Infof(c, "rebalance triggered for %s\n", c.config.Consumerid)
+ }
for i := 0; i <= int(c.config.RebalanceMaxRetries) && !success; i++ {
partitionAssignor := newPartitionAssignor(c.config.PartitionAssignmentStrategy)
var context *assignmentContext
@@ -637,20 +647,26 @@ func (c *Consumer) rebalance() {
context, err = newAssignmentContext(c.config.Groupid, c.config.Consumerid,
c.config.ExcludeInternalTopics, c.config.Coordinator)
if err != nil {
- Errorf(c, "Failed to initialize assignment context: %s", err)
+ if Logger.IsAllowed(ErrorLevel) {
+ Errorf(c, "Failed to initialize assignment context: %s", err)
+ }
panic(err)
}
barrierSize := len(context.Consumers)
stateHash = context.hash()

if c.lastSuccessfulRebalanceHash == stateHash {
- Info(c, "No need in rebalance this time")
+ if Logger.IsAllowed(InfoLevel) {
+ Info(c, "No need in rebalance this time")
+ }
return
}
c.releasePartitionOwnership(c.topicRegistry)
err = c.config.Coordinator.RemoveStateBarrier(c.config.Groupid, fmt.Sprintf("%s-ack", stateHash), string(Rebalance))
if err != nil {
- Warnf(c, "Failed to remove state barrier %s due to: %s", stateHash, err.Error())
+ if Logger.IsAllowed(WarnLevel) {
+ Warnf(c, "Failed to remove state barrier %s due to: %s", stateHash, err.Error())
+ }
}
barrierPassed = c.config.Coordinator.AwaitOnStateBarrier(c.config.Consumerid, c.config.Groupid,
stateHash, barrierSize, string(Rebalance),
@@ -659,7 +675,9 @@ func (c *Consumer) rebalance() {
// If the barrier failed to have consensus remove it.
err = c.config.Coordinator.RemoveStateBarrier(c.config.Groupid, stateHash, string(Rebalance))
if err != nil {
- Warnf(c, "Failed to remove state barrier %s due to: %s", stateHash, err.Error())
+ if Logger.IsAllowed(WarnLevel) {
+ Warnf(c, "Failed to remove state barrier %s due to: %s", stateHash, err.Error())
+ }
}
}
}
@@ -675,20 +693,25 @@ func (c *Consumer) rebalance() {

err = c.config.Coordinator.RemoveStateBarrier(c.config.Groupid, stateHash, string(Rebalance))
if err != nil {
- Warnf(c, "Failed to remove state barrier %s due to: %s", stateHash, err.Error())
+ if Logger.IsAllowed(WarnLevel) {
+ Warnf(c, "Failed to remove state barrier %s due to: %s", stateHash, err.Error())
+ }
}
barrierTimeout += c.config.BarrierTimeout
}
if !success && !c.isShuttingdown {
panic(fmt.Sprintf("Failed to rebalance after %d retries", c.config.RebalanceMaxRetries))
} else {
c.config.Coordinator.RemoveStateBarrier(c.config.Groupid, fmt.Sprintf("%s-ack", c.lastSuccessfulRebalanceHash), string(Rebalance))
c.lastSuccessfulRebalanceHash = stateHash
- Info(c, "Rebalance has been successfully completed")
+ if Logger.IsAllowed(InfoLevel) {
+ Info(c, "Rebalance has been successfully completed")
+ }
}
})
} else {
- Infof(c, "Rebalance was triggered during consumer '%s' shutdown sequence. Ignoring...", c.config.Consumerid)
+ if Logger.IsAllowed(InfoLevel) {
+ Infof(c, "Rebalance was triggered during consumer '%s' shutdown sequence. Ignoring...", c.config.Consumerid)
+ }
}
}
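
The retry loop shown here is what the commit title refers to: when AwaitOnStateBarrier fails to reach consensus, RemoveStateBarrier deletes the stale barrier node so the next attempt starts clean, and barrierTimeout is extended by another BarrierTimeout each round. A compressed, self-contained sketch of that shape follows; the coordinator interface and fakeCoordinator are hypothetical stand-ins for the real ZooKeeper-backed coordinator, not this repo's API.

package main

import (
	"errors"
	"fmt"
	"time"
)

// coordinator is a hypothetical stand-in for the consumer coordinator API.
type coordinator interface {
	AwaitOnStateBarrier(consumerID, group, barrier string, size int, timeout time.Duration) bool
	RemoveStateBarrier(group, barrier string) error
}

// rebalanceWithRetries mirrors the await/remove/retry shape of the diff:
// on failure the stale barrier is deleted so the next round starts clean,
// and the timeout grows linearly per attempt.
func rebalanceWithRetries(co coordinator, consumerID, group, stateHash string,
	barrierSize, maxRetries int, baseTimeout time.Duration) error {
	timeout := baseTimeout
	for i := 0; i <= maxRetries; i++ {
		if co.AwaitOnStateBarrier(consumerID, group, stateHash, barrierSize, timeout) {
			return nil // consensus reached; the rebalance can proceed
		}
		if err := co.RemoveStateBarrier(group, stateHash); err != nil {
			fmt.Printf("failed to remove state barrier %s: %v\n", stateHash, err)
		}
		timeout += baseTimeout
	}
	return errors.New("failed to rebalance after retries")
}

// fakeCoordinator fails a fixed number of rounds, then succeeds.
type fakeCoordinator struct{ failuresLeft int }

func (f *fakeCoordinator) AwaitOnStateBarrier(_, _, _ string, _ int, _ time.Duration) bool {
	if f.failuresLeft > 0 {
		f.failuresLeft--
		return false
	}
	return true
}

func (f *fakeCoordinator) RemoveStateBarrier(_, _ string) error { return nil }

func main() {
	err := rebalanceWithRetries(&fakeCoordinator{failuresLeft: 2},
		"consumer-1", "group-1", "hash-abc", 3, 4, time.Second)
	fmt.Println("rebalance result:", err)
}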

@@ -701,14 +724,18 @@ func tryRebalance(c *Consumer, context *assignmentContext, partitionAssignor ass

offsets, err := c.fetchOffsets(topicPartitions)
if err != nil {
- Errorf(c, "Failed to fetch offsets during rebalance: %s", err)
+ if Logger.IsAllowed(ErrorLevel) {
+ Errorf(c, "Failed to fetch offsets during rebalance: %s", err)
+ }
return false
}

currentTopicRegistry := make(map[string]map[int32]*partitionTopicInfo)

if c.isShuttingdown {
- Warnf(c, "Aborting consumer '%s' rebalancing, since shutdown sequence started.", c.config.Consumerid)
+ if Logger.IsAllowed(WarnLevel) {
+ Warnf(c, "Aborting consumer '%s' rebalancing, since shutdown sequence started.", c.config.Consumerid)
+ }
return true
} else {
for _, topicPartition := range topicPartitions {
@@ -719,7 +746,9 @@ }
}

if c.reflectPartitionOwnershipDecision(partitionOwnershipDecision) {
- Info(c, "Partition ownership has been successfully reflected")
+ if Logger.IsAllowed(InfoLevel) {
+ Info(c, "Partition ownership has been successfully reflected")
+ }
barrierPassed := false
retriesLeft := 3
stateHash := context.hash()
@@ -736,11 +765,17 @@ func tryRebalance(c *Consumer, context *assignmentContext, partitionAssignor ass
}

c.topicRegistry = currentTopicRegistry
- Infof(c, "Trying to reinitialize fetchers and workers")
+ if Logger.IsAllowed(InfoLevel) {
+ Infof(c, "Trying to reinitialize fetchers and workers")
+ }
c.initFetchersAndWorkers(context)
- Infof(c, "Fetchers and workers have been successfully reinitialized")
+ if Logger.IsAllowed(InfoLevel) {
+ Infof(c, "Fetchers and workers have been successfully reinitialized")
+ }
} else {
- Errorf(c, "Failed to reflect partition ownership during rebalance")
+ if Logger.IsAllowed(ErrorLevel) {
+ Errorf(c, "Failed to reflect partition ownership during rebalance")
+ }
return false
}

@@ -749,27 +784,34 @@ func tryRebalance(c *Consumer, context *assignmentContext, partitionAssignor ass

func (c *Consumer) initFetchersAndWorkers(assignmentContext *assignmentContext) {
switch topicCount := assignmentContext.MyTopicToNumStreams.(type) {
- case *StaticTopicsToNumStreams:
+ case *StaticTopicsToNumStreams:
{
c.topicCount = topicCount
var numStreams int
for _, v := range c.topicCount.GetConsumerThreadIdsPerTopic() {
numStreams = len(v)
break
}
- Infof(c, "Trying to update fetcher")
+ if Logger.IsAllowed(InfoLevel) {
+ Infof(c, "Trying to update fetcher")
+ }
c.updateFetcher(numStreams)
}
- case *WildcardTopicsToNumStreams:
+ case *WildcardTopicsToNumStreams:
{
c.topicCount = topicCount
c.updateFetcher(topicCount.NumStreams)
}
}
- Infof(c, "Fetcher has been updated %s", assignmentContext)
+
+ if Logger.IsAllowed(DebugLevel) {
+ Debugf(c, "Fetcher has been updated %s", assignmentContext)
+ }
c.initializeWorkerManagers()

- Infof(c, "Restarted streams")
+ if Logger.IsAllowed(InfoLevel) {
+ Infof(c, "Restarted streams")
+ }
c.connectChannels <- true
}

@@ -788,9 +830,11 @@ func (c *Consumer) fetchOffsets(topicPartitions []*TopicAndPartition) (map[Topic
}

func (c *Consumer) addPartitionTopicInfo(currenttopicRegistry map[string]map[int32]*partitionTopicInfo,
- topicPartition *TopicAndPartition, offset int64,
- consumerThreadId ConsumerThreadId) {
- Tracef(c, "Adding partitionTopicInfo: %v \n %s", currenttopicRegistry, topicPartition)
+ topicPartition *TopicAndPartition, offset int64,
+ consumerThreadId ConsumerThreadId) {
+ if Logger.IsAllowed(DebugLevel) {
+ Debugf(c, "Adding partitionTopicInfo: %s", topicPartition)
+ }
partTopicInfoMap, exists := currenttopicRegistry[topicPartition.Topic]
if !exists {
partTopicInfoMap = make(map[int32]*partitionTopicInfo)
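
The registry insertion above uses Go's usual get-or-create idiom for a nested topic-to-partition map. In isolation, with partitionTopicInfo trimmed down to a single field for the sketch, the idiom looks like this:

package main

import "fmt"

// partitionTopicInfo is trimmed down to one field for the sketch.
type partitionTopicInfo struct{ fetchedOffset int64 }

func main() {
	registry := make(map[string]map[int32]*partitionTopicInfo)

	topic, partition := "events", int32(3)

	// Get-or-create the inner map before writing to it; writing into
	// a nil map would panic.
	partMap, exists := registry[topic]
	if !exists {
		partMap = make(map[int32]*partitionTopicInfo)
		registry[topic] = partMap
	}
	partMap[partition] = &partitionTopicInfo{fetchedOffset: 42}

	fmt.Println(registry["events"][3].fetchedOffset)
}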
@@ -814,23 +858,34 @@ consumerThreadId ConsumerThreadId) {
}

func (c *Consumer) reflectPartitionOwnershipDecision(partitionOwnershipDecision map[TopicAndPartition]ConsumerThreadId) bool {
- Infof(c, "Consumer is trying to reflect partition ownership decision: %v\n", partitionOwnershipDecision)
+ if Logger.IsAllowed(InfoLevel) {
+ Info(c, "Consumer is trying to reflect partition ownership decision")
+ }
+ if Logger.IsAllowed(DebugLevel) {
+ Debugf(c, "Partition ownership decision: %v", partitionOwnershipDecision)
+ }
+
+ pool := NewRoutinePool(c.config.RoutinePoolSize)
+
successfullyOwnedPartitions := make([]*TopicAndPartition, 0)
- for topicPartition, consumerThreadId := range partitionOwnershipDecision {
- success, err := c.config.Coordinator.ClaimPartitionOwnership(c.config.Groupid, topicPartition.Topic, topicPartition.Partition, consumerThreadId)
- if err != nil {
- panic(err)
- }
- if success {
- Debugf(c, "Consumer successfully claimed partition %d for topic %s", topicPartition.Partition, topicPartition.Topic)
- successfullyOwnedPartitions = append(successfullyOwnedPartitions, &topicPartition)
- } else {
- Warnf(c, "Consumer failed to claim partition %d for topic %s", topicPartition.Partition, topicPartition.Topic)
+ successChan := make(chan TopicAndPartition)
+ go func() {
+ for tp := range successChan {
+ successfullyOwnedPartitions = append(successfullyOwnedPartitions, &tp)
+ }
+ }()
+
+ for topicPartition, consumerThreadId := range partitionOwnershipDecision {
+ pool.Do(c.claimPartitionOwnershipFunc(topicPartition, successChan, consumerThreadId))
+ }
+
+ pool.Stop()
+ close(successChan)
+
if len(partitionOwnershipDecision) > len(successfullyOwnedPartitions) {
- Warnf(c, "Consumer failed to reflect all partitions %d of %d", len(successfullyOwnedPartitions), len(partitionOwnershipDecision))
+ if Logger.IsAllowed(WarnLevel) {
+ Warnf(c, "Consumer failed to reflect all partitions %d of %d", len(successfullyOwnedPartitions), len(partitionOwnershipDecision))
+ }
for _, topicPartition := range successfullyOwnedPartitions {
c.config.Coordinator.ReleasePartitionOwnership(c.config.Groupid, topicPartition.Topic, topicPartition.Partition)
}
@@ -840,8 +895,29 @@ func (c *Consumer) reflectPartitionOwnershipDecision(partitionOwnershipDecision
return true
}

+ func (c *Consumer) claimPartitionOwnershipFunc(topicPartition TopicAndPartition, successChan chan TopicAndPartition, consumerThreadId ConsumerThreadId) func() {
+ return func() {
+ success, err := c.config.Coordinator.ClaimPartitionOwnership(c.config.Groupid, topicPartition.Topic, topicPartition.Partition, consumerThreadId)
+ if err != nil {
+ panic(err)
+ }
+ if success {
+ if Logger.IsAllowed(DebugLevel) {
+ Debugf(c, "Consumer successfully claimed partition %d for topic %s", topicPartition.Partition, topicPartition.Topic)
+ }
+ successChan <- topicPartition
+ } else {
+ if Logger.IsAllowed(WarnLevel) {
+ Warnf(c, "Consumer failed to claim partition %d for topic %s", topicPartition.Partition, topicPartition.Topic)
+ }
+ }
+ }
+ }
+
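The rewritten ownership code fans the claims out to a routine pool and funnels successes back through successChan, so a single collector goroutine is the only writer of the shared slice. Below is a self-contained sketch of that fan-out/collect pattern, with a plain sync.WaitGroup standing in for the repo's RoutinePool and a hypothetical claim function; the extra done channel ensures the collector has finished appending before the slice is read, a synchronization detail this pattern needs.

package main

import (
	"fmt"
	"sync"
)

type topicPartition struct {
	topic     string
	partition int32
}

// claim is a hypothetical stand-in for Coordinator.ClaimPartitionOwnership.
func claim(tp topicPartition) bool { return tp.partition%2 == 0 }

func main() {
	decision := []topicPartition{{"events", 0}, {"events", 1}, {"events", 2}}

	successChan := make(chan topicPartition)
	done := make(chan struct{})
	owned := make([]topicPartition, 0)

	// A single collector goroutine is the only writer of the slice,
	// so no mutex is needed around the appends.
	go func() {
		for tp := range successChan {
			owned = append(owned, tp)
		}
		close(done)
	}()

	// Fan the claims out; the WaitGroup stands in for RoutinePool and Stop().
	var wg sync.WaitGroup
	for _, tp := range decision {
		wg.Add(1)
		go func(tp topicPartition) {
			defer wg.Done()
			if claim(tp) {
				successChan <- tp
			}
		}(tp)
	}
	wg.Wait()
	close(successChan)
	<-done // make sure the collector finished before reading owned

	fmt.Printf("owned %d of %d partitions\n", len(owned), len(decision))
}
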
func (c *Consumer) releasePartitionOwnership(localtopicRegistry map[string]map[int32]*partitionTopicInfo) {
- Info(c, "Releasing partition ownership")
+ if Logger.IsAllowed(InfoLevel) {
+ Info(c, "Releasing partition ownership")
+ }
for topic, partitionInfos := range localtopicRegistry {
for partition, _ := range partitionInfos {
if err := c.config.Coordinator.ReleasePartitionOwnership(c.config.Groupid, topic, partition); err != nil {
@@ -850,7 +926,9 @@ func (c *Consumer) releasePartitionOwnership(localtopicRegistry map[string]map[i
}
delete(localtopicRegistry, topic)
}
- Info(c, "Successfully released partition ownership")
+ if Logger.IsAllowed(InfoLevel) {
+ Info(c, "Successfully released partition ownership")
+ }
}
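
Note that releasePartitionOwnership deletes each topic from the registry while ranging over it. Unlike in many languages, deleting the current key during a map range is well defined in Go: the spec guarantees that removed entries simply will not be produced later in the iteration. A tiny sketch:

package main

import "fmt"

func main() {
	registry := map[string][]int32{
		"events": {0, 1},
		"logs":   {0},
	}

	// Deleting entries during range is explicitly allowed by the Go spec;
	// deleted keys will not be produced later in the iteration.
	for topic := range registry {
		fmt.Println("releasing", topic)
		delete(registry, topic)
	}

	fmt.Println("remaining:", len(registry)) // 0
}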

// Returns a state snapshot for this consumer. The state snapshot contains a set of metrics split by topics and partitions.
18 changes: 18 additions & 0 deletions consumer_config.go
@@ -167,6 +167,13 @@ type ConsumerConfig struct {

/* Metrics Prefix if the client wants to organize the way metric names are emitted. (optional) */
MetricsPrefix string

+ /* Config to skip corrupted messages. If set to true, the consumer will increment the topic-partition offset by 1
+ on each corrupted response until the corrupted stretch of data has been passed. Turned off by default. */
+ SkipCorruptedMessages bool
+
+ /* RoutinePoolSize defines the size of routine pools created within this consumer. */
+ RoutinePoolSize int
}
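
For the two new fields, programmatic configuration would look roughly like the fragment below. SkipCorruptedMessages, RoutinePoolSize, and DefaultConsumerConfig come straight from this diff; the group id value and the NewConsumer constructor are assumptions for illustration.

// Sketch only: NewConsumer and the group id are illustrative assumptions.
config := DefaultConsumerConfig()
config.Groupid = "my-group"         // hypothetical group id
config.SkipCorruptedMessages = true // step over undecodable messages, offset+1 at a time
config.RoutinePoolSize = 100        // default is 50, set in DefaultConsumerConfig
consumer := NewConsumer(config)
_ = consumer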

//DefaultConsumerConfig creates a ConsumerConfig with sane defaults. Note that several required config entries (like Strategy and callbacks) are still not set.
@@ -217,6 +224,8 @@ func DefaultConsumerConfig() *ConsumerConfig {
config.KeyDecoder = &ByteDecoder{}
config.ValueDecoder = config.KeyDecoder

+ config.RoutinePoolSize = 50
+
return config
}

@@ -490,6 +499,15 @@ func ConsumerConfigFromFile(filename string) (*ConsumerConfig, error) {
if err := setDurationConfig(&config.FetchRequestBackoff, c["fetch.request.backoff"]); err != nil {
return nil, err
}
+ if err := setDurationConfig(&config.DeploymentTimeout, c["deployment.timeout"]); err != nil {
+ return nil, err
+ }
+ if err := setDurationConfig(&config.BarrierTimeout, c["barrier.timeout"]); err != nil {
+ return nil, err
+ }
+ if err := setIntConfig(&config.RoutinePoolSize, c["routine.pool.size"]); err != nil {
+ return nil, err
+ }
setBoolConfig(&config.BlueGreenDeploymentEnabled, c["blue.green.deployment.enabled"])

return config, nil
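
The three new keys would sit next to the existing entries in the consumer's config file. A hedged example follows, assuming a key=value layout and Go-style duration strings; neither the file format nor the accepted duration syntax is shown in this diff.

# barrier/deployment timeouts, parsed by setDurationConfig
deployment.timeout=5m
barrier.timeout=30s

# size of the internal routine pools, parsed by setIntConfig (default 50)
routine.pool.size=50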