From 8147515df60d646ebd6f69574c364206533518bf Mon Sep 17 00:00:00 2001 From: ilyutov Date: Thu, 9 Jul 2015 15:27:18 +0300 Subject: [PATCH 01/13] Zookeeper client reconnection feature --- consumer.go | 35 +++++++++++++++++++++-------------- structs.go | 3 +++ zk_coordinator.go | 47 +++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 67 insertions(+), 18 deletions(-) diff --git a/consumer.go b/consumer.go index cf04440..94b2353 100644 --- a/consumer.go +++ b/consumer.go @@ -54,6 +54,7 @@ type Consumer struct { workerManagersLock sync.Mutex stopStreams chan bool close chan bool + topicCount TopicsToNumStreams metrics *ConsumerMetrics @@ -123,12 +124,12 @@ func (c *Consumer) StartStaticPartitions(topicPartitionMap map[string][]int32) { topicsToNumStreamsMap[topic] = c.config.NumConsumerFetchers } - topicCount := &StaticTopicsToNumStreams{ + c.topicCount = &StaticTopicsToNumStreams{ ConsumerId: c.config.Consumerid, TopicsToNumStreamsMap: topicsToNumStreamsMap, } - c.config.Coordinator.RegisterConsumer(c.config.Consumerid, c.config.Groupid, topicCount) + c.config.Coordinator.RegisterConsumer(c.config.Consumerid, c.config.Groupid, c.topicCount) allTopics, err := c.config.Coordinator.GetAllTopics() if err != nil { panic(err) @@ -140,7 +141,7 @@ func (c *Consumer) StartStaticPartitions(topicPartitionMap map[string][]int32) { time.Sleep(c.config.DeploymentTimeout) - assignmentContext := newStaticAssignmentContext(c.config.Groupid, c.config.Consumerid, []string{c.config.Consumerid}, allTopics, brokers, topicCount, topicPartitionMap) + assignmentContext := newStaticAssignmentContext(c.config.Groupid, c.config.Consumerid, []string{c.config.Consumerid}, allTopics, brokers, c.topicCount, topicPartitionMap) partitionOwnershipDecision := newPartitionAssignor(c.config.PartitionAssignmentStrategy)(assignmentContext) topicPartitions := make([]*TopicAndPartition, 0) @@ -238,12 +239,12 @@ func (c *Consumer) disconnectChannels(stopRedirects map[TopicAndPartition]chan b } func (c *Consumer) createMessageStreams(topicCountMap map[string]int) { - topicCount := &StaticTopicsToNumStreams{ + c.topicCount = &StaticTopicsToNumStreams{ ConsumerId: c.config.Consumerid, TopicsToNumStreamsMap: topicCountMap, } - c.config.Coordinator.RegisterConsumer(c.config.Consumerid, c.config.Groupid, topicCount) + c.config.Coordinator.RegisterConsumer(c.config.Consumerid, c.config.Groupid, c.topicCount) time.Sleep(c.config.DeploymentTimeout) @@ -251,7 +252,7 @@ func (c *Consumer) createMessageStreams(topicCountMap map[string]int) { } func (c *Consumer) createMessageStreamsByFilterN(topicFilter TopicFilter, numStreams int) { - topicCount := &WildcardTopicsToNumStreams{ + c.topicCount = &WildcardTopicsToNumStreams{ Coordinator: c.config.Coordinator, ConsumerId: c.config.Consumerid, TopicFilter: topicFilter, @@ -259,7 +260,7 @@ func (c *Consumer) createMessageStreamsByFilterN(topicFilter TopicFilter, numStr ExcludeInternalTopics: c.config.ExcludeInternalTopics, } - c.config.Coordinator.RegisterConsumer(c.config.Consumerid, c.config.Groupid, topicCount) + c.config.Coordinator.RegisterConsumer(c.config.Consumerid, c.config.Groupid, c.topicCount) time.Sleep(c.config.DeploymentTimeout) @@ -394,10 +395,9 @@ func (c *Consumer) handleBlueGreenRequest(requestId string, blueGreenRequest *Bl c.config.Coordinator.RemoveOldApiRequests(blueGreenRequest.Group) //Generating new topicCount - var topicCount TopicsToNumStreams switch blueGreenRequest.Pattern { case blackListPattern: - topicCount = &WildcardTopicsToNumStreams{ + c.topicCount = &WildcardTopicsToNumStreams{ Coordinator: c.config.Coordinator, ConsumerId: c.config.Consumerid, TopicFilter: NewBlackList(blueGreenRequest.Topics), @@ -405,7 +405,7 @@ func (c *Consumer) handleBlueGreenRequest(requestId string, blueGreenRequest *Bl ExcludeInternalTopics: c.config.ExcludeInternalTopics, } case whiteListPattern: - topicCount = &WildcardTopicsToNumStreams{ + c.topicCount = &WildcardTopicsToNumStreams{ Coordinator: c.config.Coordinator, ConsumerId: c.config.Consumerid, TopicFilter: NewWhiteList(blueGreenRequest.Topics), @@ -418,7 +418,7 @@ func (c *Consumer) handleBlueGreenRequest(requestId string, blueGreenRequest *Bl for _, topic := range strings.Split(blueGreenRequest.Topics, ",") { topicMap[topic] = c.config.NumConsumerFetchers } - topicCount = &StaticTopicsToNumStreams{ + c.topicCount = &StaticTopicsToNumStreams{ ConsumerId: c.config.Consumerid, TopicsToNumStreamsMap: topicMap, } @@ -426,7 +426,7 @@ func (c *Consumer) handleBlueGreenRequest(requestId string, blueGreenRequest *Bl } //Getting the partitions for specified topics - myTopicThreadIds := topicCount.GetConsumerThreadIdsPerTopic() + myTopicThreadIds := c.topicCount.GetConsumerThreadIdsPerTopic() topics := make([]string, 0) for topic, _ := range myTopicThreadIds { topics = append(topics, topic) @@ -438,7 +438,7 @@ func (c *Consumer) handleBlueGreenRequest(requestId string, blueGreenRequest *Bl //Creating assignment context with new parameters newContext := newStaticAssignmentContext(blueGreenRequest.Group, c.config.Consumerid, context.Consumers, - context.AllTopics, context.Brokers, topicCount, topicPartitionMap) + context.AllTopics, context.Brokers, c.topicCount, topicPartitionMap) c.config.Groupid = blueGreenRequest.Group //Resume consuming @@ -564,6 +564,11 @@ func (c *Consumer) subscribeForChanges(group string) { break } } + } else if eventType == Reinitialize { + err := c.config.Coordinator.RegisterConsumer(c.config.Consumerid, c.config.Groupid, c.topicCount) + if err != nil { + panic(err) + } } else { go c.rebalance() } @@ -701,8 +706,9 @@ func (c *Consumer) initFetchersAndWorkers(assignmentContext *assignmentContext) switch topicCount := assignmentContext.MyTopicToNumStreams.(type) { case *StaticTopicsToNumStreams: { + c.topicCount = topicCount var numStreams int - for _, v := range topicCount.GetConsumerThreadIdsPerTopic() { + for _, v := range c.topicCount.GetConsumerThreadIdsPerTopic() { numStreams = len(v) break } @@ -711,6 +717,7 @@ func (c *Consumer) initFetchersAndWorkers(assignmentContext *assignmentContext) } case *WildcardTopicsToNumStreams: { + c.topicCount = topicCount c.updateFetcher(topicCount.NumStreams) } } diff --git a/structs.go b/structs.go index 04e98ee..5ff9328 100644 --- a/structs.go +++ b/structs.go @@ -224,6 +224,9 @@ const ( // A regular coordinator event that should normally trigger consumer rebalance. Regular CoordinatorEvent = "Regular" + // Coordinator event that should trigger consumer re-registrer + Reinitialize CoordinatorEvent = "Reinitialize" + // A coordinator event that informs a consumer group of new deployed topics. BlueGreenRequest CoordinatorEvent = "BlueGreenRequest" ) diff --git a/zk_coordinator.go b/zk_coordinator.go index 61075f6..031178a 100644 --- a/zk_coordinator.go +++ b/zk_coordinator.go @@ -40,6 +40,8 @@ type ZookeeperCoordinator struct { config *ZookeeperConfig zkConn *zk.Conn unsubscribe chan bool + closed bool + watches map[string]chan CoordinatorEvent } func (this *ZookeeperCoordinator) String() string { @@ -52,33 +54,62 @@ func NewZookeeperCoordinator(Config *ZookeeperConfig) *ZookeeperCoordinator { return &ZookeeperCoordinator{ config: Config, unsubscribe: make(chan bool), + watches: make(map[string]chan CoordinatorEvent), } } /* Establish connection to this ConsumerCoordinator. Returns an error if fails to connect, nil otherwise. */ func (this *ZookeeperCoordinator) Connect() (err error) { + var connectionEvents <-chan zk.Event for i := 0; i <= this.config.MaxRequestRetries; i++ { - this.zkConn, err = this.tryConnect() + this.zkConn, connectionEvents, err = this.tryConnect() if err == nil { + go this.listenConnectionEvents(connectionEvents) return } Tracef(this, "Zookeeper connect failed after %d-th retry", i) time.Sleep(this.config.RequestBackoff) } + return } -func (this *ZookeeperCoordinator) tryConnect() (zkConn *zk.Conn, err error) { +func (this *ZookeeperCoordinator) tryConnect() (zkConn *zk.Conn, connectionEvents <-chan zk.Event, err error) { Infof(this, "Connecting to ZK at %s\n", this.config.ZookeeperConnect) - zkConn, _, err = zk.Connect(this.config.ZookeeperConnect, this.config.ZookeeperTimeout) + zkConn, connectionEvents, err = zk.Connect(this.config.ZookeeperConnect, this.config.ZookeeperTimeout) return } func (this *ZookeeperCoordinator) Disconnect() { Infof(this, "Closing connection to ZK at %s\n", this.config.ZookeeperConnect) + this.closed = true this.zkConn.Close() } +func (this *ZookeeperCoordinator) listenConnectionEvents(connectionEvents <-chan zk.Event) { + for event := range connectionEvents { + if (this.closed) { + return + } + + if (event.State == zk.StateExpired && event.Type == zk.EventSession) { + err := this.Connect() + if (err != nil) { + panic(err) + } + for groupId, watch := range this.watches { + _, err := this.SubscribeForChanges(groupId) + if (err != nil) { + panic(err) + } + watch <- Reinitialize + } + + return + } + } +} + /* Registers a new consumer with Consumerid id and TopicCount subscription that is a part of consumer group Groupid in this ConsumerCoordinator. Returns an error if registration failed, nil otherwise. */ func (this *ZookeeperCoordinator) RegisterConsumer(Consumerid string, Groupid string, TopicCount TopicsToNumStreams) (err error) { backoffMultiplier := 1 @@ -443,7 +474,15 @@ func (this *ZookeeperCoordinator) SubscribeForChanges(Groupid string) (events <- } func (this *ZookeeperCoordinator) trySubscribeForChanges(Groupid string) (<-chan CoordinatorEvent, error) { - changes := make(chan CoordinatorEvent) + var changes chan CoordinatorEvent + if _, ok := this.watches[Groupid]; !ok { + changes = make(chan CoordinatorEvent) + this.watches[Groupid] = changes + } else { + changes = this.watches[Groupid] + } + + Infof(this, "Subscribing for changes for %s", Groupid) consumersWatcher, err := this.getConsumersInGroupWatcher(Groupid) From ee4988a65ec53ad9b4c23d8a20abb9a3b64b4c99 Mon Sep 17 00:00:00 2001 From: ilyutov Date: Thu, 9 Jul 2015 15:28:34 +0300 Subject: [PATCH 02/13] Go format --- consumer.go | 2 +- consumer_config.go | 20 +-- low_level_client.go | 4 +- metrics.go | 2 +- mirror_maker.go | 6 +- producer.go | 306 ++++++++++++++++++++-------------------- producer_sarama.go | 20 +-- schema_registry_test.go | 8 +- structs.go | 12 +- syslog_producer.go | 6 +- testing_utils.go | 6 +- zk_coordinator.go | 15 +- 12 files changed, 203 insertions(+), 204 deletions(-) diff --git a/consumer.go b/consumer.go index 94b2353..7d1e367 100644 --- a/consumer.go +++ b/consumer.go @@ -54,7 +54,7 @@ type Consumer struct { workerManagersLock sync.Mutex stopStreams chan bool close chan bool - topicCount TopicsToNumStreams + topicCount TopicsToNumStreams metrics *ConsumerMetrics diff --git a/consumer_config.go b/consumer_config.go index 3ee8115..a81d7db 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -139,8 +139,8 @@ type ConsumerConfig struct { /* Coordinator used to coordinate consumer's actions, e.g. trigger rebalance events, store offsets and consumer metadata etc. */ Coordinator ConsumerCoordinator - /* OffsetStorage is used to store and retrieve consumer offsets. */ - OffsetStorage OffsetStorage + /* OffsetStorage is used to store and retrieve consumer offsets. */ + OffsetStorage OffsetStorage /* Indicates whether the client supports blue-green deployment. This config entry is needed because blue-green deployment won't work with RoundRobin partition assignment strategy. @@ -340,14 +340,14 @@ func (c *ConsumerConfig) Validate() error { return errors.New("Please provide a Coordinator") } - if c.OffsetStorage == nil { - // This is for folks who already use this client - if zookeeper, ok := c.Coordinator.(*ZookeeperCoordinator); ok { - c.OffsetStorage = zookeeper - } else { - return errors.New("Please provide an OffsetStorage") - } - } + if c.OffsetStorage == nil { + // This is for folks who already use this client + if zookeeper, ok := c.Coordinator.(*ZookeeperCoordinator); ok { + c.OffsetStorage = zookeeper + } else { + return errors.New("Please provide an OffsetStorage") + } + } if c.BlueGreenDeploymentEnabled && c.PartitionAssignmentStrategy != RangeStrategy { return errors.New("In order to use Blue-Green deployment Range partition assignment strategy should be used") diff --git a/low_level_client.go b/low_level_client.go index c426cc5..0b1e0b2 100644 --- a/low_level_client.go +++ b/low_level_client.go @@ -326,13 +326,13 @@ func (this *SiestaClient) GetAvailableOffset(topic string, partition int32, offs // Gets the offset for a given group, topic and partition. // May return an error if fails to retrieve the offset. func (this *SiestaClient) GetOffset(group string, topic string, partition int32) (int64, error) { - return this.connector.GetOffset(group, topic, partition) + return this.connector.GetOffset(group, topic, partition) } // Commits the given offset for a given group, topic and partition. // May return an error if fails to commit the offset. func (this *SiestaClient) CommitOffset(group string, topic string, partition int32, offset int64) error { - return this.connector.CommitOffset(group, topic, partition, offset) + return this.connector.CommitOffset(group, topic, partition, offset) } // Gracefully shuts down this client. diff --git a/metrics.go b/metrics.go index 2d9fd59..25276a8 100644 --- a/metrics.go +++ b/metrics.go @@ -44,7 +44,7 @@ func newConsumerMetrics(consumerName, prefix string) *ConsumerMetrics { } // Ensure prefix ends with a dot (.) so it plays nice with statsd/graphite - prefix = strings.Trim(prefix, " ") + prefix = strings.Trim(prefix, " ") if prefix != "" && prefix[len(prefix)-1:] != "." { prefix += "." } diff --git a/mirror_maker.go b/mirror_maker.go index 714d6fa..db6ffef 100644 --- a/mirror_maker.go +++ b/mirror_maker.go @@ -109,10 +109,10 @@ func NewMirrorMaker(config *MirrorMakerConfig) *MirrorMaker { logLineSchema = readLoglineSchema() } return &MirrorMaker{ - config: config, + config: config, logLineSchema: logLineSchema, - evolutioned: newSchemaSet(), - errors: make(chan *FailedMessage), + evolutioned: newSchemaSet(), + errors: make(chan *FailedMessage), } } diff --git a/producer.go b/producer.go index dd88c2f..5d5ce9c 100644 --- a/producer.go +++ b/producer.go @@ -18,82 +18,82 @@ package go_kafka_client import ( - "bytes" - "encoding/binary" - "errors" - hashing "hash" - "hash/fnv" - "math/rand" - "time" + "bytes" + "encoding/binary" + "errors" + hashing "hash" + "hash/fnv" + "math/rand" + "time" ) type Producer interface { - Errors() <-chan *FailedMessage - Successes() <-chan *ProducerMessage - Input() chan <- *ProducerMessage - Close() error - AsyncClose() + Errors() <-chan *FailedMessage + Successes() <-chan *ProducerMessage + Input() chan<- *ProducerMessage + Close() error + AsyncClose() } type ProducerConstructor func(config *ProducerConfig) Producer type ProducerMessage struct { - Topic string - Key interface{} - Value interface{} - KeyEncoder Encoder - ValueEncoder Encoder - - offset int64 - partition int32 + Topic string + Key interface{} + Value interface{} + KeyEncoder Encoder + ValueEncoder Encoder + + offset int64 + partition int32 } type Partitioner interface { - Partition(key []byte, numPartitions int32) (int32, error) - RequiresConsistency() bool + Partition(key []byte, numPartitions int32) (int32, error) + RequiresConsistency() bool } type PartitionerConstructor func() Partitioner type ProducerConfig struct { - Clientid string - BrokerList []string - SendBufferSize int - CompressionCodec string - FlushByteCount int - FlushTimeout time.Duration - BatchSize int - MaxMessageBytes int - MaxMessagesPerRequest int - Acks int - RetryBackoff time.Duration - Timeout time.Duration - Partitioner PartitionerConstructor - KeyEncoder Encoder - ValueEncoder Encoder - AckSuccesses bool - - //Retries int //TODO ?? + Clientid string + BrokerList []string + SendBufferSize int + CompressionCodec string + FlushByteCount int + FlushTimeout time.Duration + BatchSize int + MaxMessageBytes int + MaxMessagesPerRequest int + Acks int + RetryBackoff time.Duration + Timeout time.Duration + Partitioner PartitionerConstructor + KeyEncoder Encoder + ValueEncoder Encoder + AckSuccesses bool + + //Retries int //TODO ?? } func DefaultProducerConfig() *ProducerConfig { - return &ProducerConfig{ - Clientid: "mirrormaker", - MaxMessageBytes: 1000000, - Acks: 1, - RetryBackoff: 250 * time.Millisecond, - KeyEncoder: &ByteEncoder{}, - ValueEncoder: &ByteEncoder{}, - Partitioner: NewRandomPartitioner, - Timeout: 10 * time.Second, - BatchSize: 10, - MaxMessagesPerRequest: 100, - FlushByteCount: 65535, - FlushTimeout: 5 * time.Second, - AckSuccesses: false, - SendBufferSize: 1, - CompressionCodec: "none", - } + return &ProducerConfig{ + Clientid: "mirrormaker", + MaxMessageBytes: 1000000, + Acks: 1, + RetryBackoff: 250 * time.Millisecond, + KeyEncoder: &ByteEncoder{}, + ValueEncoder: &ByteEncoder{}, + Partitioner: NewRandomPartitioner, + Timeout: 10 * time.Second, + BatchSize: 10, + MaxMessagesPerRequest: 100, + FlushByteCount: 65535, + FlushTimeout: 5 * time.Second, + AckSuccesses: false, + SendBufferSize: 1, + CompressionCodec: "none", + } } // ProducerConfigFromFile is a helper function that loads a producer's configuration information from file. @@ -113,167 +113,167 @@ func DefaultProducerConfig() *ProducerConfig { // The configuration file entries should be constructed in key=value syntax. A # symbol at the beginning // of a line indicates a comment. Blank lines are ignored. The file should end with a newline character. func ProducerConfigFromFile(filename string) (*ProducerConfig, error) { - p, err := LoadConfiguration(filename) - if err != nil { - return nil, err - } - - config := DefaultProducerConfig() - setStringConfig(&config.Clientid, p["client.id"]) - setStringSliceConfig(&config.BrokerList, p["metadata.broker.list"], ",") - if err := setIntConfig(&config.SendBufferSize, p["send.buffer.size"]); err != nil { - return nil, err - } - setStringConfig(&config.CompressionCodec, p["compression.codec"]) - if err := setIntConfig(&config.FlushByteCount, p["flush.byte.count"]); err != nil { - return nil, err - } - if err := setDurationConfig(&config.FlushTimeout, p["flush.timeout"]); err != nil { - return nil, err - } - if err := setIntConfig(&config.BatchSize, p["batch.size"]); err != nil { - return nil, err - } - if err := setIntConfig(&config.MaxMessageBytes, p["max.message.bytes"]); err != nil { - return nil, err - } - if err := setIntConfig(&config.MaxMessagesPerRequest, p["max.messages.per.request"]); err != nil { - return nil, err - } - if err := setIntConfig(&config.Acks, p["acks"]); err != nil { - return nil, err - } - if err := setDurationConfig(&config.RetryBackoff, p["retry.backoff"]); err != nil { - return nil, err - } - if err := setDurationConfig(&config.Timeout, p["timeout"]); err != nil { - return nil, err - } - - return config, nil + p, err := LoadConfiguration(filename) + if err != nil { + return nil, err + } + + config := DefaultProducerConfig() + setStringConfig(&config.Clientid, p["client.id"]) + setStringSliceConfig(&config.BrokerList, p["metadata.broker.list"], ",") + if err := setIntConfig(&config.SendBufferSize, p["send.buffer.size"]); err != nil { + return nil, err + } + setStringConfig(&config.CompressionCodec, p["compression.codec"]) + if err := setIntConfig(&config.FlushByteCount, p["flush.byte.count"]); err != nil { + return nil, err + } + if err := setDurationConfig(&config.FlushTimeout, p["flush.timeout"]); err != nil { + return nil, err + } + if err := setIntConfig(&config.BatchSize, p["batch.size"]); err != nil { + return nil, err + } + if err := setIntConfig(&config.MaxMessageBytes, p["max.message.bytes"]); err != nil { + return nil, err + } + if err := setIntConfig(&config.MaxMessagesPerRequest, p["max.messages.per.request"]); err != nil { + return nil, err + } + if err := setIntConfig(&config.Acks, p["acks"]); err != nil { + return nil, err + } + if err := setDurationConfig(&config.RetryBackoff, p["retry.backoff"]); err != nil { + return nil, err + } + if err := setDurationConfig(&config.Timeout, p["timeout"]); err != nil { + return nil, err + } + + return config, nil } func (this *ProducerConfig) Validate() error { - if len(this.BrokerList) == 0 { - return errors.New("Broker list cannot be empty") - } + if len(this.BrokerList) == 0 { + return errors.New("Broker list cannot be empty") + } - if this.Partitioner == nil { - return errors.New("Producer partitioner cannot be empty") - } + if this.Partitioner == nil { + return errors.New("Producer partitioner cannot be empty") + } - return nil + return nil } // Partitioner sends messages to partitions that correspond message keys -type FixedPartitioner struct {} +type FixedPartitioner struct{} func NewFixedPartitioner() Partitioner { - return &FixedPartitioner{} + return &FixedPartitioner{} } func (this *FixedPartitioner) Partition(key []byte, numPartitions int32) (int32, error) { - if key == nil { - panic("FixedPartitioner does not work without keys.") - } - partition, err := binary.ReadUvarint(bytes.NewBuffer(key)) - if err != nil { - return -1, err - } - - return int32(partition) % numPartitions, nil + if key == nil { + panic("FixedPartitioner does not work without keys.") + } + partition, err := binary.ReadUvarint(bytes.NewBuffer(key)) + if err != nil { + return -1, err + } + + return int32(partition) % numPartitions, nil } func (this *FixedPartitioner) RequiresConsistency() bool { - return true + return true } // RandomPartitioner implements the Partitioner interface by choosing a random partition each time. type RandomPartitioner struct { - generator *rand.Rand + generator *rand.Rand } func NewRandomPartitioner() Partitioner { - return &RandomPartitioner{ - generator: rand.New(rand.NewSource(time.Now().UTC().UnixNano())), - } + return &RandomPartitioner{ + generator: rand.New(rand.NewSource(time.Now().UTC().UnixNano())), + } } func (this *RandomPartitioner) Partition(key []byte, numPartitions int32) (int32, error) { - return int32(this.generator.Intn(int(numPartitions))), nil + return int32(this.generator.Intn(int(numPartitions))), nil } func (this *RandomPartitioner) RequiresConsistency() bool { - return false + return false } // RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time. type RoundRobinPartitioner struct { - partition int32 + partition int32 } func NewRoundRobinPartitioner() Partitioner { - return &RoundRobinPartitioner{} + return &RoundRobinPartitioner{} } func (this *RoundRobinPartitioner) Partition(key []byte, numPartitions int32) (int32, error) { - if this.partition >= numPartitions { - this.partition = 0 - } - ret := this.partition - this.partition++ - return ret, nil + if this.partition >= numPartitions { + this.partition = 0 + } + ret := this.partition + this.partition++ + return ret, nil } func (this *RoundRobinPartitioner) RequiresConsistency() bool { - return false + return false } // HashPartitioner implements the Partitioner interface. If the key is nil, or fails to encode, then a random partition // is chosen. Otherwise the FNV-1a hash of the encoded bytes is used modulus the number of partitions. This ensures that messages // with the same key always end up on the same partition. type HashPartitioner struct { - random Partitioner - hasher hashing.Hash32 + random Partitioner + hasher hashing.Hash32 } func NewHashPartitioner() Partitioner { - p := new(HashPartitioner) - p.random = NewRandomPartitioner() - p.hasher = fnv.New32a() - return p + p := new(HashPartitioner) + p.random = NewRandomPartitioner() + p.hasher = fnv.New32a() + return p } func (this *HashPartitioner) Partition(key []byte, numPartitions int32) (int32, error) { - if key == nil { - return this.random.Partition(key, numPartitions) - } - - this.hasher.Reset() - _, err := this.hasher.Write(key) - if err != nil { - return -1, err - } - hash := int32(this.hasher.Sum32()) - if hash < 0 { - hash = -hash - } - return hash % numPartitions, nil + if key == nil { + return this.random.Partition(key, numPartitions) + } + + this.hasher.Reset() + _, err := this.hasher.Write(key) + if err != nil { + return -1, err + } + hash := int32(this.hasher.Sum32()) + if hash < 0 { + hash = -hash + } + return hash % numPartitions, nil } func (this *HashPartitioner) RequiresConsistency() bool { - return true + return true } // ConstantPartitioner implements the Partitioner interface by just returning a constant value. type ConstantPartitioner struct { - Constant int32 + Constant int32 } func (p *ConstantPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) { - return p.Constant, nil + return p.Constant, nil } func (p *ConstantPartitioner) RequiresConsistency() bool { - return true + return true } diff --git a/producer_sarama.go b/producer_sarama.go index 209f0f5..97bc645 100644 --- a/producer_sarama.go +++ b/producer_sarama.go @@ -24,9 +24,9 @@ import ( type SaramaProducer struct { saramaProducer sarama.AsyncProducer - input chan *ProducerMessage - successes chan *ProducerMessage - errors chan *FailedMessage + input chan *ProducerMessage + successes chan *ProducerMessage + errors chan *FailedMessage config *ProducerConfig } @@ -70,15 +70,15 @@ func NewSaramaProducer(conf *ProducerConfig) Producer { saramaProducer: producer, config: conf, } - saramaProducer.initSuccesses() - saramaProducer.initErrors() - saramaProducer.initInput() + saramaProducer.initSuccesses() + saramaProducer.initErrors() + saramaProducer.initInput() - return saramaProducer + return saramaProducer } func (this *SaramaProducer) Errors() <-chan *FailedMessage { - return this.errors + return this.errors } func (this *SaramaProducer) initErrors() { @@ -106,7 +106,7 @@ func (this *SaramaProducer) initErrors() { } func (this *SaramaProducer) Successes() <-chan *ProducerMessage { - return this.successes + return this.successes } func (this *SaramaProducer) initSuccesses() { @@ -134,7 +134,7 @@ func (this *SaramaProducer) initSuccesses() { } func (this *SaramaProducer) Input() chan<- *ProducerMessage { - return this.input + return this.input } func (this *SaramaProducer) initInput() { diff --git a/schema_registry_test.go b/schema_registry_test.go index f26259b..065e618 100644 --- a/schema_registry_test.go +++ b/schema_registry_test.go @@ -16,15 +16,15 @@ limitations under the License. */ package go_kafka_client import ( - "testing" - avro "github.com/stealthly/go-avro" + avro "github.com/stealthly/go-avro" + "testing" ) func TestSchemaRegistry(t *testing.T) { client := NewCachedSchemaRegistryClient("http://localhost:8081") rawSchema := "{\"namespace\": \"ly.stealth.kafka.metrics\",\"type\": \"record\",\"name\": \"Timings\",\"fields\": [{\"name\": \"id\", \"type\": \"long\"},{\"name\": \"timings\", \"type\": {\"type\":\"array\", \"items\": \"long\"} }]}" - schema, err := avro.ParseSchema(rawSchema) - assert(t, err, nil) + schema, err := avro.ParseSchema(rawSchema) + assert(t, err, nil) id, err := client.Register("test1", schema) assert(t, err, nil) assertNot(t, id, 0) diff --git a/structs.go b/structs.go index 5ff9328..914663a 100644 --- a/structs.go +++ b/structs.go @@ -233,13 +233,13 @@ const ( // OffsetStorage is used to store and retrieve consumer offsets. type OffsetStorage interface { - // Gets the offset for a given group, topic and partition. - // May return an error if fails to retrieve the offset. - GetOffset(group string, topic string, partition int32) (int64, error) + // Gets the offset for a given group, topic and partition. + // May return an error if fails to retrieve the offset. + GetOffset(group string, topic string, partition int32) (int64, error) - // Commits the given offset for a given group, topic and partition. - // May return an error if fails to commit the offset. - CommitOffset(group string, topic string, partition int32, offset int64) error + // Commits the given offset for a given group, topic and partition. + // May return an error if fails to commit the offset. + CommitOffset(group string, topic string, partition int32, offset int64) error } // Represents a consumer state snapshot. diff --git a/syslog_producer.go b/syslog_producer.go index 5c5bdb4..27976ce 100644 --- a/syslog_producer.go +++ b/syslog_producer.go @@ -175,11 +175,11 @@ func (this *SyslogProducer) startProducers() { config.ClientID = conf.Clientid config.ChannelBufferSize = conf.SendBufferSize switch strings.ToLower(conf.CompressionCodec) { - case "none": + case "none": config.Producer.Compression = sarama.CompressionNone - case "gzip": + case "gzip": config.Producer.Compression = sarama.CompressionGZIP - case "snappy": + case "snappy": config.Producer.Compression = sarama.CompressionSnappy } config.Producer.Flush.Bytes = conf.FlushByteCount diff --git a/testing_utils.go b/testing_utils.go index 64fa257..415ea74 100644 --- a/testing_utils.go +++ b/testing_utils.go @@ -147,7 +147,7 @@ func receiveNoMessages(t *testing.T, timeout time.Duration, from <-chan []*Messa func produceN(t *testing.T, n int, topic string, brokerAddr string) { clientConfig := sarama.NewConfig() - clientConfig.Producer.Timeout = 10 * time.Second + clientConfig.Producer.Timeout = 10 * time.Second client, err := sarama.NewClient([]string{brokerAddr}, clientConfig) if err != nil { t.Fatal(err) @@ -173,7 +173,7 @@ func produceNToTopicPartition(t *testing.T, n int, topic string, partition int, clientConfig := sarama.NewConfig() partitionerFactory := &SaramaPartitionerFactory{NewFixedPartitioner} clientConfig.Producer.Partitioner = partitionerFactory.PartitionerConstructor - clientConfig.Producer.Timeout = 10 * time.Second + clientConfig.Producer.Timeout = 10 * time.Second client, err := sarama.NewClient([]string{brokerAddr}, clientConfig) if err != nil { t.Fatal(err) @@ -199,7 +199,7 @@ func produceNToTopicPartition(t *testing.T, n int, topic string, partition int, func produce(t *testing.T, messages []string, topic string, brokerAddr string, compression sarama.CompressionCodec) { clientConfig := sarama.NewConfig() clientConfig.Producer.Compression = compression - clientConfig.Producer.Timeout = 10 * time.Second + clientConfig.Producer.Timeout = 10 * time.Second client, err := sarama.NewClient([]string{brokerAddr}, clientConfig) if err != nil { t.Fatal(err) diff --git a/zk_coordinator.go b/zk_coordinator.go index 031178a..2f6c3f9 100644 --- a/zk_coordinator.go +++ b/zk_coordinator.go @@ -40,8 +40,8 @@ type ZookeeperCoordinator struct { config *ZookeeperConfig zkConn *zk.Conn unsubscribe chan bool - closed bool - watches map[string]chan CoordinatorEvent + closed bool + watches map[string]chan CoordinatorEvent } func (this *ZookeeperCoordinator) String() string { @@ -54,7 +54,7 @@ func NewZookeeperCoordinator(Config *ZookeeperConfig) *ZookeeperCoordinator { return &ZookeeperCoordinator{ config: Config, unsubscribe: make(chan bool), - watches: make(map[string]chan CoordinatorEvent), + watches: make(map[string]chan CoordinatorEvent), } } @@ -88,18 +88,18 @@ func (this *ZookeeperCoordinator) Disconnect() { func (this *ZookeeperCoordinator) listenConnectionEvents(connectionEvents <-chan zk.Event) { for event := range connectionEvents { - if (this.closed) { + if this.closed { return } - if (event.State == zk.StateExpired && event.Type == zk.EventSession) { + if event.State == zk.StateExpired && event.Type == zk.EventSession { err := this.Connect() - if (err != nil) { + if err != nil { panic(err) } for groupId, watch := range this.watches { _, err := this.SubscribeForChanges(groupId) - if (err != nil) { + if err != nil { panic(err) } watch <- Reinitialize @@ -482,7 +482,6 @@ func (this *ZookeeperCoordinator) trySubscribeForChanges(Groupid string) (<-chan changes = this.watches[Groupid] } - Infof(this, "Subscribing for changes for %s", Groupid) consumersWatcher, err := this.getConsumersInGroupWatcher(Groupid) From 4b871d00873cb6da3639fe233eadcd2ff1bff5e9 Mon Sep 17 00:00:00 2001 From: Marcus Date: Mon, 13 Jul 2015 16:41:51 -0700 Subject: [PATCH 03/13] Updated file perms. --- Dockerfile | 0 Dockerfile.mirrormaker | 0 Godeps | 0 LICENSE | 0 README.md | 0 Vagrantfile | 0 api.go | 0 avro/logline.go | 0 avro_encoder_decoder.go | 0 avro_encoder_decoder_test.go | 0 config/consumer.properties | 0 consumer.go | 0 consumer_config.go | 0 consumer_test.go | 0 consumers/Godeps | 0 consumers/consumers.go | 2 +- consumers/consumers.properties | 0 consumers/seelog.xml | 0 docs/emitters.md | 0 docs/offset_storage.md | 0 emitters_test.go | 0 encoder_decoder.go | 0 fetcher.go | 0 filter.go | 0 log_emitters.go | 0 logger.go | 0 logline.avsc | 0 low_level_client.go | 0 marathon/README.md | 0 marathon/http_request.avsc | 0 marathon/marathon_event_producer.go | 2 +- marathon_event_producer.go | 0 message_buffer.go | 0 message_buffer_test.go | 0 metrics.go | 0 metrics_emitters.go | 0 mirror_maker.go | 0 mirror_maker_test.go | 0 mirrormaker/README.md | 0 mirrormaker/consumer.config | 0 mirrormaker/logline.avsc | 0 mirrormaker/mirror_maker.go | 2 +- mirrormaker/producer.config | 0 partition_assignment.go | 0 partition_assignment_test.go | 0 perf/README.md | 0 perf/avro/timings.avsc | 0 perf/consumer/consumer.go | 2 +- perf/mirror/mirror.go | 2 +- perf/producer/producer.go | 2 +- producer.go | 0 producer_sarama.go | 0 producers/Godeps | 0 producers/producers.go | 2 +- producers/producers.properties | 0 producers/seelog.xml | 0 producers/tailf/tailf.go | 0 schema_registry.go | 0 schema_registry_test.go | 0 structs.go | 0 swtich/switch.go | 2 +- syslog/Dockerfile | 0 syslog/README.md | 0 syslog/Vagrantfile | 0 syslog/producer.properties | 0 syslog/syslog.go | 2 +- syslog/syslog_proto/logline.pb.go | 0 syslog/syslog_proto/logline.proto | 0 syslog_producer.go | 0 syslog_test.go | 0 testing_utils.go | 0 topics.go | 0 utils.go | 0 workers.go | 0 workers_test.go | 0 zk_coordinator.go | 4 ++-- zk_coordinator_test.go | 0 77 files changed, 11 insertions(+), 11 deletions(-) mode change 100644 => 100755 Dockerfile mode change 100644 => 100755 Dockerfile.mirrormaker mode change 100644 => 100755 Godeps mode change 100644 => 100755 LICENSE mode change 100644 => 100755 README.md mode change 100644 => 100755 Vagrantfile mode change 100644 => 100755 api.go mode change 100644 => 100755 avro/logline.go mode change 100644 => 100755 avro_encoder_decoder.go mode change 100644 => 100755 avro_encoder_decoder_test.go mode change 100644 => 100755 config/consumer.properties mode change 100644 => 100755 consumer.go mode change 100644 => 100755 consumer_config.go mode change 100644 => 100755 consumer_test.go mode change 100644 => 100755 consumers/Godeps mode change 100644 => 100755 consumers/consumers.go mode change 100644 => 100755 consumers/consumers.properties mode change 100644 => 100755 consumers/seelog.xml mode change 100644 => 100755 docs/emitters.md mode change 100644 => 100755 docs/offset_storage.md mode change 100644 => 100755 emitters_test.go mode change 100644 => 100755 encoder_decoder.go mode change 100644 => 100755 fetcher.go mode change 100644 => 100755 filter.go mode change 100644 => 100755 log_emitters.go mode change 100644 => 100755 logger.go mode change 100644 => 100755 logline.avsc mode change 100644 => 100755 low_level_client.go mode change 100644 => 100755 marathon/README.md mode change 100644 => 100755 marathon/http_request.avsc mode change 100644 => 100755 marathon/marathon_event_producer.go mode change 100644 => 100755 marathon_event_producer.go mode change 100644 => 100755 message_buffer.go mode change 100644 => 100755 message_buffer_test.go mode change 100644 => 100755 metrics.go mode change 100644 => 100755 metrics_emitters.go mode change 100644 => 100755 mirror_maker.go mode change 100644 => 100755 mirror_maker_test.go mode change 100644 => 100755 mirrormaker/README.md mode change 100644 => 100755 mirrormaker/consumer.config mode change 100644 => 100755 mirrormaker/logline.avsc mode change 100644 => 100755 mirrormaker/mirror_maker.go mode change 100644 => 100755 mirrormaker/producer.config mode change 100644 => 100755 partition_assignment.go mode change 100644 => 100755 partition_assignment_test.go mode change 100644 => 100755 perf/README.md mode change 100644 => 100755 perf/avro/timings.avsc mode change 100644 => 100755 perf/consumer/consumer.go mode change 100644 => 100755 perf/mirror/mirror.go mode change 100644 => 100755 perf/producer/producer.go mode change 100644 => 100755 producer.go mode change 100644 => 100755 producer_sarama.go mode change 100644 => 100755 producers/Godeps mode change 100644 => 100755 producers/producers.go mode change 100644 => 100755 producers/producers.properties mode change 100644 => 100755 producers/seelog.xml mode change 100644 => 100755 producers/tailf/tailf.go mode change 100644 => 100755 schema_registry.go mode change 100644 => 100755 schema_registry_test.go mode change 100644 => 100755 structs.go mode change 100644 => 100755 swtich/switch.go mode change 100644 => 100755 syslog/Dockerfile mode change 100644 => 100755 syslog/README.md mode change 100644 => 100755 syslog/Vagrantfile mode change 100644 => 100755 syslog/producer.properties mode change 100644 => 100755 syslog/syslog.go mode change 100644 => 100755 syslog/syslog_proto/logline.pb.go mode change 100644 => 100755 syslog/syslog_proto/logline.proto mode change 100644 => 100755 syslog_producer.go mode change 100644 => 100755 syslog_test.go mode change 100644 => 100755 testing_utils.go mode change 100644 => 100755 topics.go mode change 100644 => 100755 utils.go mode change 100644 => 100755 workers.go mode change 100644 => 100755 workers_test.go mode change 100644 => 100755 zk_coordinator.go mode change 100644 => 100755 zk_coordinator_test.go diff --git a/Dockerfile b/Dockerfile old mode 100644 new mode 100755 diff --git a/Dockerfile.mirrormaker b/Dockerfile.mirrormaker old mode 100644 new mode 100755 diff --git a/Godeps b/Godeps old mode 100644 new mode 100755 diff --git a/LICENSE b/LICENSE old mode 100644 new mode 100755 diff --git a/README.md b/README.md old mode 100644 new mode 100755 diff --git a/Vagrantfile b/Vagrantfile old mode 100644 new mode 100755 diff --git a/api.go b/api.go old mode 100644 new mode 100755 diff --git a/avro/logline.go b/avro/logline.go old mode 100644 new mode 100755 diff --git a/avro_encoder_decoder.go b/avro_encoder_decoder.go old mode 100644 new mode 100755 diff --git a/avro_encoder_decoder_test.go b/avro_encoder_decoder_test.go old mode 100644 new mode 100755 diff --git a/config/consumer.properties b/config/consumer.properties old mode 100644 new mode 100755 diff --git a/consumer.go b/consumer.go old mode 100644 new mode 100755 diff --git a/consumer_config.go b/consumer_config.go old mode 100644 new mode 100755 diff --git a/consumer_test.go b/consumer_test.go old mode 100644 new mode 100755 diff --git a/consumers/Godeps b/consumers/Godeps old mode 100644 new mode 100755 diff --git a/consumers/consumers.go b/consumers/consumers.go old mode 100644 new mode 100755 index 93b1c3b..e8ae418 --- a/consumers/consumers.go +++ b/consumers/consumers.go @@ -20,7 +20,7 @@ package main import ( "fmt" metrics "github.com/rcrowley/go-metrics" - kafkaClient "github.com/stealthly/go_kafka_client" + kafkaClient "github.com/CrowdStrike/go_kafka_client" "net" "os" "os/signal" diff --git a/consumers/consumers.properties b/consumers/consumers.properties old mode 100644 new mode 100755 diff --git a/consumers/seelog.xml b/consumers/seelog.xml old mode 100644 new mode 100755 diff --git a/docs/emitters.md b/docs/emitters.md old mode 100644 new mode 100755 diff --git a/docs/offset_storage.md b/docs/offset_storage.md old mode 100644 new mode 100755 diff --git a/emitters_test.go b/emitters_test.go old mode 100644 new mode 100755 diff --git a/encoder_decoder.go b/encoder_decoder.go old mode 100644 new mode 100755 diff --git a/fetcher.go b/fetcher.go old mode 100644 new mode 100755 diff --git a/filter.go b/filter.go old mode 100644 new mode 100755 diff --git a/log_emitters.go b/log_emitters.go old mode 100644 new mode 100755 diff --git a/logger.go b/logger.go old mode 100644 new mode 100755 diff --git a/logline.avsc b/logline.avsc old mode 100644 new mode 100755 diff --git a/low_level_client.go b/low_level_client.go old mode 100644 new mode 100755 diff --git a/marathon/README.md b/marathon/README.md old mode 100644 new mode 100755 diff --git a/marathon/http_request.avsc b/marathon/http_request.avsc old mode 100644 new mode 100755 diff --git a/marathon/marathon_event_producer.go b/marathon/marathon_event_producer.go old mode 100644 new mode 100755 index 56ac08e..ef8016a --- a/marathon/marathon_event_producer.go +++ b/marathon/marathon_event_producer.go @@ -19,7 +19,7 @@ import ( "flag" "fmt" "github.com/stealthly/go-avro" - kafka "github.com/stealthly/go_kafka_client" + kafka "github.com/CrowdStrike/go_kafka_client" "os" "os/signal" "runtime" diff --git a/marathon_event_producer.go b/marathon_event_producer.go old mode 100644 new mode 100755 diff --git a/message_buffer.go b/message_buffer.go old mode 100644 new mode 100755 diff --git a/message_buffer_test.go b/message_buffer_test.go old mode 100644 new mode 100755 diff --git a/metrics.go b/metrics.go old mode 100644 new mode 100755 diff --git a/metrics_emitters.go b/metrics_emitters.go old mode 100644 new mode 100755 diff --git a/mirror_maker.go b/mirror_maker.go old mode 100644 new mode 100755 diff --git a/mirror_maker_test.go b/mirror_maker_test.go old mode 100644 new mode 100755 diff --git a/mirrormaker/README.md b/mirrormaker/README.md old mode 100644 new mode 100755 diff --git a/mirrormaker/consumer.config b/mirrormaker/consumer.config old mode 100644 new mode 100755 diff --git a/mirrormaker/logline.avsc b/mirrormaker/logline.avsc old mode 100644 new mode 100755 diff --git a/mirrormaker/mirror_maker.go b/mirrormaker/mirror_maker.go old mode 100644 new mode 100755 index 77ef0fe..fd6da26 --- a/mirrormaker/mirror_maker.go +++ b/mirrormaker/mirror_maker.go @@ -18,7 +18,7 @@ package main import ( "flag" "fmt" - kafka "github.com/stealthly/go_kafka_client" + kafka "github.com/CrowdStrike/go_kafka_client" "os" "os/signal" "runtime" diff --git a/mirrormaker/producer.config b/mirrormaker/producer.config old mode 100644 new mode 100755 diff --git a/partition_assignment.go b/partition_assignment.go old mode 100644 new mode 100755 diff --git a/partition_assignment_test.go b/partition_assignment_test.go old mode 100644 new mode 100755 diff --git a/perf/README.md b/perf/README.md old mode 100644 new mode 100755 diff --git a/perf/avro/timings.avsc b/perf/avro/timings.avsc old mode 100644 new mode 100755 diff --git a/perf/consumer/consumer.go b/perf/consumer/consumer.go old mode 100644 new mode 100755 index b3760a5..8772033 --- a/perf/consumer/consumer.go +++ b/perf/consumer/consumer.go @@ -19,7 +19,7 @@ import ( "flag" "os" "fmt" - kafka "github.com/stealthly/go_kafka_client" + kafka "github.com/CrowdStrike/go_kafka_client" "github.com/stealthly/go-avro" "strings" "os/signal" diff --git a/perf/mirror/mirror.go b/perf/mirror/mirror.go old mode 100644 new mode 100755 index d8fb501..286ccea --- a/perf/mirror/mirror.go +++ b/perf/mirror/mirror.go @@ -19,7 +19,7 @@ import ( "flag" "os" "fmt" - kafka "github.com/stealthly/go_kafka_client" + kafka "github.com/CrowdStrike/go_kafka_client" "github.com/stealthly/go-avro" "strings" "os/signal" diff --git a/perf/producer/producer.go b/perf/producer/producer.go old mode 100644 new mode 100755 index 2798f2a..b90f40f --- a/perf/producer/producer.go +++ b/perf/producer/producer.go @@ -20,7 +20,7 @@ import ( "fmt" "github.com/golang/protobuf/proto" "github.com/stealthly/go-avro" - kafka "github.com/stealthly/go_kafka_client" + kafka "github.com/CrowdStrike/go_kafka_client" sp "github.com/stealthly/go_kafka_client/syslog/syslog_proto" "os" "strings" diff --git a/producer.go b/producer.go old mode 100644 new mode 100755 diff --git a/producer_sarama.go b/producer_sarama.go old mode 100644 new mode 100755 diff --git a/producers/Godeps b/producers/Godeps old mode 100644 new mode 100755 diff --git a/producers/producers.go b/producers/producers.go old mode 100644 new mode 100755 index 5f8d527..82c5394 --- a/producers/producers.go +++ b/producers/producers.go @@ -22,7 +22,7 @@ import ( "fmt" "github.com/Shopify/sarama" metrics "github.com/rcrowley/go-metrics" - kafkaClient "github.com/stealthly/go_kafka_client" + kafkaClient "github.com/CrowdStrike/go_kafka_client" "net" "os" "os/signal" diff --git a/producers/producers.properties b/producers/producers.properties old mode 100644 new mode 100755 diff --git a/producers/seelog.xml b/producers/seelog.xml old mode 100644 new mode 100755 diff --git a/producers/tailf/tailf.go b/producers/tailf/tailf.go old mode 100644 new mode 100755 diff --git a/schema_registry.go b/schema_registry.go old mode 100644 new mode 100755 diff --git a/schema_registry_test.go b/schema_registry_test.go old mode 100644 new mode 100755 diff --git a/structs.go b/structs.go old mode 100644 new mode 100755 diff --git a/swtich/switch.go b/swtich/switch.go old mode 100644 new mode 100755 index 4f3a542..4e67b62 --- a/swtich/switch.go +++ b/swtich/switch.go @@ -1,7 +1,7 @@ package main import ( - kafka "github.com/stealthly/go_kafka_client" + kafka "github.com/CrowdStrike/go_kafka_client" "flag" "os" ) diff --git a/syslog/Dockerfile b/syslog/Dockerfile old mode 100644 new mode 100755 diff --git a/syslog/README.md b/syslog/README.md old mode 100644 new mode 100755 diff --git a/syslog/Vagrantfile b/syslog/Vagrantfile old mode 100644 new mode 100755 diff --git a/syslog/producer.properties b/syslog/producer.properties old mode 100644 new mode 100755 diff --git a/syslog/syslog.go b/syslog/syslog.go old mode 100644 new mode 100755 index 72dffd8..b00b7a7 --- a/syslog/syslog.go +++ b/syslog/syslog.go @@ -20,7 +20,7 @@ import ( "fmt" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" - kafka "github.com/stealthly/go_kafka_client" + kafka "github.com/CrowdStrike/go_kafka_client" sp "github.com/stealthly/go_kafka_client/syslog/syslog_proto" "math" "os" diff --git a/syslog/syslog_proto/logline.pb.go b/syslog/syslog_proto/logline.pb.go old mode 100644 new mode 100755 diff --git a/syslog/syslog_proto/logline.proto b/syslog/syslog_proto/logline.proto old mode 100644 new mode 100755 diff --git a/syslog_producer.go b/syslog_producer.go old mode 100644 new mode 100755 diff --git a/syslog_test.go b/syslog_test.go old mode 100644 new mode 100755 diff --git a/testing_utils.go b/testing_utils.go old mode 100644 new mode 100755 diff --git a/topics.go b/topics.go old mode 100644 new mode 100755 diff --git a/utils.go b/utils.go old mode 100644 new mode 100755 diff --git a/workers.go b/workers.go old mode 100644 new mode 100755 diff --git a/workers_test.go b/workers_test.go old mode 100644 new mode 100755 diff --git a/zk_coordinator.go b/zk_coordinator.go old mode 100644 new mode 100755 index 94cce3f..974fdd4 --- a/zk_coordinator.go +++ b/zk_coordinator.go @@ -707,7 +707,7 @@ func (this *ZookeeperCoordinator) waitForMembersToJoin(barrierPath string, expec blackholeFunc := func(blackhole <-chan zk.Event) { <-blackhole } - + for { select { // Using a priority select to provide precedence to the stop chan @@ -727,7 +727,7 @@ func (this *ZookeeperCoordinator) waitForMembersToJoin(barrierPath string, expec // Haven't seen all expected consumers on this barrier path. Watch for changes to the path... select { case <-stopChan: - go blackholeFunc(zkMemberJoinedWatcher) + go blackholeFunc(zkMemberJoinedWatcher) return case <-zkMemberJoinedWatcher: continue diff --git a/zk_coordinator_test.go b/zk_coordinator_test.go old mode 100644 new mode 100755 From e94ad12053bc242cfadfb137874cc0e6815e04c9 Mon Sep 17 00:00:00 2001 From: Marcus Date: Tue, 28 Jul 2015 10:10:22 -0700 Subject: [PATCH 04/13] Various fixes to make re-balancing and partition ownership more stable. --- consumer.go | 41 ++++++++++++++++++++++++++++++++++++++--- zk_coordinator.go | 42 +++++++++++++++++++++++++++++++----------- 2 files changed, 69 insertions(+), 14 deletions(-) diff --git a/consumer.go b/consumer.go index 2e97e03..2d9d886 100755 --- a/consumer.go +++ b/consumer.go @@ -54,6 +54,7 @@ type Consumer struct { workerManagersLock sync.Mutex stopStreams chan bool close chan bool + stopCleanup chan struct{} topicCount TopicsToNumStreams metrics *ConsumerMetrics @@ -175,6 +176,7 @@ func (c *Consumer) StartStaticPartitions(topicPartitionMap map[string][]int32) { } func (c *Consumer) startStreams() { + c.maintainCleanCoordinator() stopRedirects := make(map[TopicAndPartition]chan bool) for { select { @@ -210,6 +212,27 @@ func (c *Consumer) startStreams() { } } +// maintainCleanCoordinator runs on an interval to make sure that the coordinator removes old API requests to remove bloat from the coordinator over time. +func (c *Consumer) maintainCleanCoordinator() { + if c.stopCleanup != nil { + return + } + + c.stopCleanup = make(chan struct{}) + go func() { + tick := time.NewTicker(5 * time.Minute) + for { + select { + case <-tick.C: + Infof(c, "Starting coordinator cleanup of API reqeusts for %s", c.config.Groupid) + c.config.Coordinator.RemoveOldApiRequests(c.config.Groupid) + case <-c.stopCleanup: + return + } + } + }() +} + func (c *Consumer) pipeChannels(stopRedirects map[TopicAndPartition]chan bool) { inLock(&c.workerManagersLock, func() { Debugf(c, "connect channels registry: %v", c.topicRegistry) @@ -319,10 +342,10 @@ func (c *Consumer) Close() <-chan bool { c.stopStreams <- true - c.releasePartitionOwnership(c.topicRegistry) - Info(c, "Deregistering consumer") c.config.Coordinator.DeregisterConsumer(c.config.Consumerid, c.config.Groupid) + c.stopCleanup <- struct{}{} // Stop the background cleanup job. + c.stopCleanup = nil // Reset it so it can be used again. Info(c, "Successfully deregistered consumer") Info(c, "Closing low-level client") @@ -331,6 +354,10 @@ func (c *Consumer) Close() <-chan bool { c.config.Coordinator.Disconnect() Info(c, "Disconnected from consumer coordinator") + // Other consumers will wait to take partition ownership until the ownership in the coordinator is released + // As such it should be one of the last things we do to prevent duplicate ownership or "released" ownership but the consumer is still running. + c.releasePartitionOwnership(c.topicRegistry) + Info(c, "Unregistering all metrics") c.metrics.close() Info(c, "Unregistered all metrics") @@ -599,7 +626,7 @@ func (c *Consumer) rebalance() { var context *assignmentContext var err error barrierPassed := false - timeLimit := time.Now().Add(3*time.Minute) + timeLimit := time.Now().Add(3 * time.Minute) for !barrierPassed && time.Now().Before(timeLimit) { context, err = newAssignmentContext(c.config.Groupid, c.config.Consumerid, c.config.ExcludeInternalTopics, c.config.Coordinator) @@ -622,6 +649,13 @@ func (c *Consumer) rebalance() { barrierPassed = c.config.Coordinator.AwaitOnStateBarrier(c.config.Consumerid, c.config.Groupid, stateHash, barrierSize, string(Rebalance), barrierTimeout) + if !barrierPassed { + // If the barrier failed to have consensus remove it. + err = c.config.Coordinator.RemoveStateBarrier(c.config.Groupid, stateHash, string(Rebalance)) + if err != nil { + Warnf(c, "Failed to remove state barrier %s due to: %s", stateHash, err.Error()) + } + } } if !barrierPassed { panic("Could not reach consensus on state barrier.") @@ -642,6 +676,7 @@ func (c *Consumer) rebalance() { if !success && !c.isShuttingdown { panic(fmt.Sprintf("Failed to rebalance after %d retries", c.config.RebalanceMaxRetries)) } else { + c.config.Coordinator.RemoveStateBarrier(c.config.Groupid, fmt.Sprintf("%s-ack", c.lastSuccessfulRebalanceHash), string(Rebalance)) c.lastSuccessfulRebalanceHash = stateHash Info(c, "Rebalance has been successfully completed") } diff --git a/zk_coordinator.go b/zk_coordinator.go index 0b897c6..9dac08f 100755 --- a/zk_coordinator.go +++ b/zk_coordinator.go @@ -476,7 +476,7 @@ func (this *ZookeeperCoordinator) SubscribeForChanges(Groupid string) (events <- func (this *ZookeeperCoordinator) trySubscribeForChanges(Groupid string) (<-chan CoordinatorEvent, error) { var changes chan CoordinatorEvent if _, ok := this.watches[Groupid]; !ok { - changes = make(chan CoordinatorEvent) + changes = make(chan CoordinatorEvent, 100) this.watches[Groupid] = changes } else { changes = this.watches[Groupid] @@ -509,9 +509,10 @@ func (this *ZookeeperCoordinator) trySubscribeForChanges(Groupid string) (<-chan go func() { for { select { - case e, ok := <-zkEvents: + case e := <-zkEvents: { - if ok && e.Type != zk.EventNotWatching && e.State != zk.StateDisconnected { + Infof(this, "Received zkEvent Type: %s State: %s Path: %s", e.Type.String(), e.State.String(), e.Path) + if e.Type != zk.EventNotWatching && e.State != zk.StateDisconnected { if strings.HasPrefix(e.Path, fmt.Sprintf("%s/%s", newZKGroupDirs(this.config.Root, Groupid).ConsumerApiDir, BlueGreenDeploymentAPI)) { changes <- BlueGreenRequest @@ -520,7 +521,6 @@ func (this *ZookeeperCoordinator) trySubscribeForChanges(Groupid string) (<-chan } } - Debugf(this, "Event path %s", e.Path) if strings.HasPrefix(e.Path, newZKGroupDirs(this.config.Root, Groupid).ConsumerRegistryDir) { Info(this, "Trying to renew watcher for consumer registry") consumersWatcher, err = this.getConsumersInGroupWatcher(Groupid) @@ -658,6 +658,9 @@ func (this *ZookeeperCoordinator) RemoveOldApiRequests(group string) (err error) func (this *ZookeeperCoordinator) tryRemoveOldApiRequests(group string, api ConsumerGroupApi) error { requests := make([]string, 0) var err error + var data []byte + var t int64 + apiPath := fmt.Sprintf("%s/%s", newZKGroupDirs(this.config.Root, group).ConsumerApiDir, api) for i := 0; i <= this.config.MaxRequestRetries; i++ { requests, _, err = this.zkConn.Children(apiPath) @@ -665,7 +668,18 @@ func (this *ZookeeperCoordinator) tryRemoveOldApiRequests(group string, api Cons continue } for _, request := range requests { - err = this.deleteNode(fmt.Sprintf("%s/%s", apiPath, request)) + childPath := fmt.Sprintf("%s/%s", apiPath, request) + if data, _, err = this.zkConn.Get(childPath); err != nil && err != zk.ErrNoNode { + // It's possible another consumer deleted the node before we could read it's data + break + } + + if t, err = strconv.ParseInt(string(data), 10, 64); err == nil && !time.Unix(t, 0).Before(time.Now()) { + // Don't delete if this zk node has a timestamp as the data and the timestamp is still valid + continue + } + // If the data is not a timestamp or is a timestamp but has reached expiration delete it + err = this.deleteNode(childPath) if err != nil && err != zk.ErrNoNode { break } @@ -689,12 +703,12 @@ func (this *ZookeeperCoordinator) AwaitOnStateBarrier(consumerId string, group s barrierTimeout := barrierExpiration.Sub(time.Now()) go this.waitForMembersToJoin(barrierPath, barrierSize, membershipDoneChan, stopChan) select { - case err = <- membershipDoneChan: + case err = <-membershipDoneChan: // break the select break - case <- time.After(barrierTimeout): + case <-time.After(barrierTimeout): stopChan <- struct{}{} - err = fmt.Errorf("Timedout waiting for consensus on barrier path %s", barrierPath) + err = fmt.Errorf("Timed out waiting for consensus on barrier path %s", barrierPath) } } @@ -716,7 +730,7 @@ func (this *ZookeeperCoordinator) joinStateBarrier(barrierPath, consumerId strin // Attempt to create the barrier path, with a shared deadline _, err = this.zkConn.Create(barrierPath, []byte(strconv.FormatInt(deadline.Unix(), 10)), 0, zk.WorldACL(zk.PermAll)) if err != nil { - if err != zk.ErrNodeExists{ + if err != zk.ErrNodeExists { continue } // If the barrier path already exists, read it's value @@ -729,7 +743,8 @@ func (this *ZookeeperCoordinator) joinStateBarrier(barrierPath, consumerId strin } } // Register our consumerId as a child node on the barrierPath. This should notify other consumers we have joined. - if err = this.createOrUpdatePathParentMayNotExistFailSafe(fmt.Sprintf("%s/%s", barrierPath, consumerId), make([]byte, 0)); err == nil || err == zk.ErrNodeExists { + // Need to join as an ephemeral node to ensure that if the barrier Id is re-used we aren't permanently registered giving false counts. + if _, err = this.zkConn.Create(fmt.Sprintf("%s/%s", barrierPath, consumerId), make([]byte, 0), zk.FlagEphemeral, zk.WorldACL(zk.PermAll)); err == nil || err == zk.ErrNodeExists { Infof(this, "Successfully joined state barrier %s", barrierPath) return deadline, nil } @@ -738,7 +753,7 @@ func (this *ZookeeperCoordinator) joinStateBarrier(barrierPath, consumerId strin return time.Now(), fmt.Errorf("Failed to join state barrier %s after %d retries", barrierPath, this.config.MaxRequestRetries) } -func (this *ZookeeperCoordinator) waitForMembersToJoin(barrierPath string, expected int, doneChan chan <- error, stopChan <- chan struct{}) { +func (this *ZookeeperCoordinator) waitForMembersToJoin(barrierPath string, expected int, doneChan chan<- error, stopChan <-chan struct{}) { // Make sure we clean up the channel. defer close(doneChan) @@ -856,6 +871,11 @@ func (this *ZookeeperCoordinator) tryClaimPartitionOwnership(group string, topic if err != nil { if err == zk.ErrNodeExists { + var data []byte + if data, _, err = this.zkConn.Get(pathToOwn); err == nil && string(data) == consumerThreadId.String() { + // If the current owner of the partition is the same consumer Id as the current one, carry on. + return true, nil + } Debugf(consumerThreadId, "waiting for the partition ownership to be deleted: %d", partition) return false, nil } else { From b2bfe4efa9e718feb3cbe119c01022ffa576d66c Mon Sep 17 00:00:00 2001 From: Marcus Date: Tue, 28 Jul 2015 10:42:13 -0700 Subject: [PATCH 05/13] Fixed file permissions. --- Dockerfile | 0 Dockerfile.mirrormaker | 0 Godeps | 0 LICENSE | 0 README.md | 0 Vagrantfile | 0 api.go | 0 avro_encoder_decoder.go | 0 avro_encoder_decoder_test.go | 0 consumer.go | 0 consumer_config.go | 0 consumer_test.go | 0 emitters_test.go | 0 encoder_decoder.go | 0 fetcher.go | 0 filter.go | 0 log_emitters.go | 0 logger.go | 0 logline.avsc | 0 low_level_client.go | 0 marathon_event_producer.go | 0 message_buffer.go | 0 message_buffer_test.go | 0 metrics.go | 0 metrics_emitters.go | 0 mirror_maker.go | 0 mirror_maker_test.go | 0 partition_assignment.go | 0 partition_assignment_test.go | 0 producer.go | 0 producer_sarama.go | 0 run-tests.sh | 0 schema_registry.go | 0 schema_registry_test.go | 0 structs.go | 0 syslog_producer.go | 0 syslog_test.go | 0 testing_utils.go | 0 topics.go | 0 utils.go | 0 workers.go | 0 workers_test.go | 0 zk_coordinator.go | 0 zk_coordinator_test.go | 0 44 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 Dockerfile mode change 100755 => 100644 Dockerfile.mirrormaker mode change 100755 => 100644 Godeps mode change 100755 => 100644 LICENSE mode change 100755 => 100644 README.md mode change 100755 => 100644 Vagrantfile mode change 100755 => 100644 api.go mode change 100755 => 100644 avro_encoder_decoder.go mode change 100755 => 100644 avro_encoder_decoder_test.go mode change 100755 => 100644 consumer.go mode change 100755 => 100644 consumer_config.go mode change 100755 => 100644 consumer_test.go mode change 100755 => 100644 emitters_test.go mode change 100755 => 100644 encoder_decoder.go mode change 100755 => 100644 fetcher.go mode change 100755 => 100644 filter.go mode change 100755 => 100644 log_emitters.go mode change 100755 => 100644 logger.go mode change 100755 => 100644 logline.avsc mode change 100755 => 100644 low_level_client.go mode change 100755 => 100644 marathon_event_producer.go mode change 100755 => 100644 message_buffer.go mode change 100755 => 100644 message_buffer_test.go mode change 100755 => 100644 metrics.go mode change 100755 => 100644 metrics_emitters.go mode change 100755 => 100644 mirror_maker.go mode change 100755 => 100644 mirror_maker_test.go mode change 100755 => 100644 partition_assignment.go mode change 100755 => 100644 partition_assignment_test.go mode change 100755 => 100644 producer.go mode change 100755 => 100644 producer_sarama.go mode change 100755 => 100644 run-tests.sh mode change 100755 => 100644 schema_registry.go mode change 100755 => 100644 schema_registry_test.go mode change 100755 => 100644 structs.go mode change 100755 => 100644 syslog_producer.go mode change 100755 => 100644 syslog_test.go mode change 100755 => 100644 testing_utils.go mode change 100755 => 100644 topics.go mode change 100755 => 100644 utils.go mode change 100755 => 100644 workers.go mode change 100755 => 100644 workers_test.go mode change 100755 => 100644 zk_coordinator.go mode change 100755 => 100644 zk_coordinator_test.go diff --git a/Dockerfile b/Dockerfile old mode 100755 new mode 100644 diff --git a/Dockerfile.mirrormaker b/Dockerfile.mirrormaker old mode 100755 new mode 100644 diff --git a/Godeps b/Godeps old mode 100755 new mode 100644 diff --git a/LICENSE b/LICENSE old mode 100755 new mode 100644 diff --git a/README.md b/README.md old mode 100755 new mode 100644 diff --git a/Vagrantfile b/Vagrantfile old mode 100755 new mode 100644 diff --git a/api.go b/api.go old mode 100755 new mode 100644 diff --git a/avro_encoder_decoder.go b/avro_encoder_decoder.go old mode 100755 new mode 100644 diff --git a/avro_encoder_decoder_test.go b/avro_encoder_decoder_test.go old mode 100755 new mode 100644 diff --git a/consumer.go b/consumer.go old mode 100755 new mode 100644 diff --git a/consumer_config.go b/consumer_config.go old mode 100755 new mode 100644 diff --git a/consumer_test.go b/consumer_test.go old mode 100755 new mode 100644 diff --git a/emitters_test.go b/emitters_test.go old mode 100755 new mode 100644 diff --git a/encoder_decoder.go b/encoder_decoder.go old mode 100755 new mode 100644 diff --git a/fetcher.go b/fetcher.go old mode 100755 new mode 100644 diff --git a/filter.go b/filter.go old mode 100755 new mode 100644 diff --git a/log_emitters.go b/log_emitters.go old mode 100755 new mode 100644 diff --git a/logger.go b/logger.go old mode 100755 new mode 100644 diff --git a/logline.avsc b/logline.avsc old mode 100755 new mode 100644 diff --git a/low_level_client.go b/low_level_client.go old mode 100755 new mode 100644 diff --git a/marathon_event_producer.go b/marathon_event_producer.go old mode 100755 new mode 100644 diff --git a/message_buffer.go b/message_buffer.go old mode 100755 new mode 100644 diff --git a/message_buffer_test.go b/message_buffer_test.go old mode 100755 new mode 100644 diff --git a/metrics.go b/metrics.go old mode 100755 new mode 100644 diff --git a/metrics_emitters.go b/metrics_emitters.go old mode 100755 new mode 100644 diff --git a/mirror_maker.go b/mirror_maker.go old mode 100755 new mode 100644 diff --git a/mirror_maker_test.go b/mirror_maker_test.go old mode 100755 new mode 100644 diff --git a/partition_assignment.go b/partition_assignment.go old mode 100755 new mode 100644 diff --git a/partition_assignment_test.go b/partition_assignment_test.go old mode 100755 new mode 100644 diff --git a/producer.go b/producer.go old mode 100755 new mode 100644 diff --git a/producer_sarama.go b/producer_sarama.go old mode 100755 new mode 100644 diff --git a/run-tests.sh b/run-tests.sh old mode 100755 new mode 100644 diff --git a/schema_registry.go b/schema_registry.go old mode 100755 new mode 100644 diff --git a/schema_registry_test.go b/schema_registry_test.go old mode 100755 new mode 100644 diff --git a/structs.go b/structs.go old mode 100755 new mode 100644 diff --git a/syslog_producer.go b/syslog_producer.go old mode 100755 new mode 100644 diff --git a/syslog_test.go b/syslog_test.go old mode 100755 new mode 100644 diff --git a/testing_utils.go b/testing_utils.go old mode 100755 new mode 100644 diff --git a/topics.go b/topics.go old mode 100755 new mode 100644 diff --git a/utils.go b/utils.go old mode 100755 new mode 100644 diff --git a/workers.go b/workers.go old mode 100755 new mode 100644 diff --git a/workers_test.go b/workers_test.go old mode 100755 new mode 100644 diff --git a/zk_coordinator.go b/zk_coordinator.go old mode 100755 new mode 100644 diff --git a/zk_coordinator_test.go b/zk_coordinator_test.go old mode 100755 new mode 100644 From f91f84fc75041c0ea6fab7e83d6744465127515d Mon Sep 17 00:00:00 2001 From: Marcus Date: Tue, 28 Jul 2015 10:43:06 -0700 Subject: [PATCH 06/13] Fixed file permissions. --- avro/logline.go | 0 config/consumer.properties | 0 consumers/Godeps | 0 consumers/consumers.go | 0 consumers/consumers.properties | 0 consumers/seelog.xml | 0 docs/emitters.md | 0 docs/offset_storage.md | 0 mirrormaker/README.md | 0 mirrormaker/consumer.config | 0 mirrormaker/logline.avsc | 0 mirrormaker/mirror_maker.go | 0 mirrormaker/producer.config | 0 perf/README.md | 0 producers/Godeps | 0 producers/producers.go | 0 producers/producers.properties | 0 producers/seelog.xml | 0 syslog/Dockerfile | 0 syslog/README.md | 0 syslog/Vagrantfile | 0 syslog/producer.properties | 0 syslog/syslog.go | 0 vagrant/prep.sh | 0 24 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 avro/logline.go mode change 100755 => 100644 config/consumer.properties mode change 100755 => 100644 consumers/Godeps mode change 100755 => 100644 consumers/consumers.go mode change 100755 => 100644 consumers/consumers.properties mode change 100755 => 100644 consumers/seelog.xml mode change 100755 => 100644 docs/emitters.md mode change 100755 => 100644 docs/offset_storage.md mode change 100755 => 100644 mirrormaker/README.md mode change 100755 => 100644 mirrormaker/consumer.config mode change 100755 => 100644 mirrormaker/logline.avsc mode change 100755 => 100644 mirrormaker/mirror_maker.go mode change 100755 => 100644 mirrormaker/producer.config mode change 100755 => 100644 perf/README.md mode change 100755 => 100644 producers/Godeps mode change 100755 => 100644 producers/producers.go mode change 100755 => 100644 producers/producers.properties mode change 100755 => 100644 producers/seelog.xml mode change 100755 => 100644 syslog/Dockerfile mode change 100755 => 100644 syslog/README.md mode change 100755 => 100644 syslog/Vagrantfile mode change 100755 => 100644 syslog/producer.properties mode change 100755 => 100644 syslog/syslog.go mode change 100755 => 100644 vagrant/prep.sh diff --git a/avro/logline.go b/avro/logline.go old mode 100755 new mode 100644 diff --git a/config/consumer.properties b/config/consumer.properties old mode 100755 new mode 100644 diff --git a/consumers/Godeps b/consumers/Godeps old mode 100755 new mode 100644 diff --git a/consumers/consumers.go b/consumers/consumers.go old mode 100755 new mode 100644 diff --git a/consumers/consumers.properties b/consumers/consumers.properties old mode 100755 new mode 100644 diff --git a/consumers/seelog.xml b/consumers/seelog.xml old mode 100755 new mode 100644 diff --git a/docs/emitters.md b/docs/emitters.md old mode 100755 new mode 100644 diff --git a/docs/offset_storage.md b/docs/offset_storage.md old mode 100755 new mode 100644 diff --git a/mirrormaker/README.md b/mirrormaker/README.md old mode 100755 new mode 100644 diff --git a/mirrormaker/consumer.config b/mirrormaker/consumer.config old mode 100755 new mode 100644 diff --git a/mirrormaker/logline.avsc b/mirrormaker/logline.avsc old mode 100755 new mode 100644 diff --git a/mirrormaker/mirror_maker.go b/mirrormaker/mirror_maker.go old mode 100755 new mode 100644 diff --git a/mirrormaker/producer.config b/mirrormaker/producer.config old mode 100755 new mode 100644 diff --git a/perf/README.md b/perf/README.md old mode 100755 new mode 100644 diff --git a/producers/Godeps b/producers/Godeps old mode 100755 new mode 100644 diff --git a/producers/producers.go b/producers/producers.go old mode 100755 new mode 100644 diff --git a/producers/producers.properties b/producers/producers.properties old mode 100755 new mode 100644 diff --git a/producers/seelog.xml b/producers/seelog.xml old mode 100755 new mode 100644 diff --git a/syslog/Dockerfile b/syslog/Dockerfile old mode 100755 new mode 100644 diff --git a/syslog/README.md b/syslog/README.md old mode 100755 new mode 100644 diff --git a/syslog/Vagrantfile b/syslog/Vagrantfile old mode 100755 new mode 100644 diff --git a/syslog/producer.properties b/syslog/producer.properties old mode 100755 new mode 100644 diff --git a/syslog/syslog.go b/syslog/syslog.go old mode 100755 new mode 100644 diff --git a/vagrant/prep.sh b/vagrant/prep.sh old mode 100755 new mode 100644 From e709f072e0c99245a98fd27c2e2eb8edcbe3dedf Mon Sep 17 00:00:00 2001 From: Marcus Date: Tue, 28 Jul 2015 10:51:00 -0700 Subject: [PATCH 07/13] Fixed file permissions. --- marathon/README.md | 0 marathon/http_request.avsc | 0 marathon/marathon_event_producer.go | 0 perf/avro/timings.avsc | 0 perf/consumer/consumer.go | 0 perf/mirror/mirror.go | 0 perf/producer/producer.go | 0 producers/tailf/tailf.go | 0 run-tests.sh | 0 9 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 marathon/README.md mode change 100755 => 100644 marathon/http_request.avsc mode change 100755 => 100644 marathon/marathon_event_producer.go mode change 100755 => 100644 perf/avro/timings.avsc mode change 100755 => 100644 perf/consumer/consumer.go mode change 100755 => 100644 perf/mirror/mirror.go mode change 100755 => 100644 perf/producer/producer.go mode change 100755 => 100644 producers/tailf/tailf.go mode change 100644 => 100755 run-tests.sh diff --git a/marathon/README.md b/marathon/README.md old mode 100755 new mode 100644 diff --git a/marathon/http_request.avsc b/marathon/http_request.avsc old mode 100755 new mode 100644 diff --git a/marathon/marathon_event_producer.go b/marathon/marathon_event_producer.go old mode 100755 new mode 100644 diff --git a/perf/avro/timings.avsc b/perf/avro/timings.avsc old mode 100755 new mode 100644 diff --git a/perf/consumer/consumer.go b/perf/consumer/consumer.go old mode 100755 new mode 100644 diff --git a/perf/mirror/mirror.go b/perf/mirror/mirror.go old mode 100755 new mode 100644 diff --git a/perf/producer/producer.go b/perf/producer/producer.go old mode 100755 new mode 100644 diff --git a/producers/tailf/tailf.go b/producers/tailf/tailf.go old mode 100755 new mode 100644 diff --git a/run-tests.sh b/run-tests.sh old mode 100644 new mode 100755 From 6df590460c7c652532a7f348b05a8afbe0772b1a Mon Sep 17 00:00:00 2001 From: Marcus Date: Tue, 28 Jul 2015 10:52:48 -0700 Subject: [PATCH 08/13] Fixed file permissions. --- swtich/switch.go | 0 syslog/syslog_proto/logline.pb.go | 0 syslog/syslog_proto/logline.proto | 0 3 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 swtich/switch.go mode change 100755 => 100644 syslog/syslog_proto/logline.pb.go mode change 100755 => 100644 syslog/syslog_proto/logline.proto diff --git a/swtich/switch.go b/swtich/switch.go old mode 100755 new mode 100644 diff --git a/syslog/syslog_proto/logline.pb.go b/syslog/syslog_proto/logline.pb.go old mode 100755 new mode 100644 diff --git a/syslog/syslog_proto/logline.proto b/syslog/syslog_proto/logline.proto old mode 100755 new mode 100644 From 895b003e45654f3ed038dbda43fc197d1e6469a8 Mon Sep 17 00:00:00 2001 From: Marcus Date: Tue, 28 Jul 2015 10:53:24 -0700 Subject: [PATCH 09/13] Fixed file permissions. --- vagrant/prep.sh | 0 1 file changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 vagrant/prep.sh diff --git a/vagrant/prep.sh b/vagrant/prep.sh old mode 100644 new mode 100755 From fe4a451e2c58aa0f834750fe1372bd1b5f888b0d Mon Sep 17 00:00:00 2001 From: Marcus Date: Tue, 28 Jul 2015 11:03:39 -0700 Subject: [PATCH 10/13] Synced subfolders imports back to stealthly. --- consumers/consumers.go | 2 +- marathon/marathon_event_producer.go | 2 +- mirrormaker/mirror_maker.go | 2 +- perf/consumer/consumer.go | 2 +- perf/mirror/mirror.go | 2 +- perf/producer/producer.go | 2 +- producers/producers.go | 2 +- swtich/switch.go | 2 +- syslog/syslog.go | 2 +- 9 files changed, 9 insertions(+), 9 deletions(-) diff --git a/consumers/consumers.go b/consumers/consumers.go index e8ae418..93b1c3b 100644 --- a/consumers/consumers.go +++ b/consumers/consumers.go @@ -20,7 +20,7 @@ package main import ( "fmt" metrics "github.com/rcrowley/go-metrics" - kafkaClient "github.com/CrowdStrike/go_kafka_client" + kafkaClient "github.com/stealthly/go_kafka_client" "net" "os" "os/signal" diff --git a/marathon/marathon_event_producer.go b/marathon/marathon_event_producer.go index ef8016a..56ac08e 100644 --- a/marathon/marathon_event_producer.go +++ b/marathon/marathon_event_producer.go @@ -19,7 +19,7 @@ import ( "flag" "fmt" "github.com/stealthly/go-avro" - kafka "github.com/CrowdStrike/go_kafka_client" + kafka "github.com/stealthly/go_kafka_client" "os" "os/signal" "runtime" diff --git a/mirrormaker/mirror_maker.go b/mirrormaker/mirror_maker.go index fd6da26..77ef0fe 100644 --- a/mirrormaker/mirror_maker.go +++ b/mirrormaker/mirror_maker.go @@ -18,7 +18,7 @@ package main import ( "flag" "fmt" - kafka "github.com/CrowdStrike/go_kafka_client" + kafka "github.com/stealthly/go_kafka_client" "os" "os/signal" "runtime" diff --git a/perf/consumer/consumer.go b/perf/consumer/consumer.go index 8772033..b3760a5 100644 --- a/perf/consumer/consumer.go +++ b/perf/consumer/consumer.go @@ -19,7 +19,7 @@ import ( "flag" "os" "fmt" - kafka "github.com/CrowdStrike/go_kafka_client" + kafka "github.com/stealthly/go_kafka_client" "github.com/stealthly/go-avro" "strings" "os/signal" diff --git a/perf/mirror/mirror.go b/perf/mirror/mirror.go index 286ccea..d8fb501 100644 --- a/perf/mirror/mirror.go +++ b/perf/mirror/mirror.go @@ -19,7 +19,7 @@ import ( "flag" "os" "fmt" - kafka "github.com/CrowdStrike/go_kafka_client" + kafka "github.com/stealthly/go_kafka_client" "github.com/stealthly/go-avro" "strings" "os/signal" diff --git a/perf/producer/producer.go b/perf/producer/producer.go index b90f40f..2798f2a 100644 --- a/perf/producer/producer.go +++ b/perf/producer/producer.go @@ -20,7 +20,7 @@ import ( "fmt" "github.com/golang/protobuf/proto" "github.com/stealthly/go-avro" - kafka "github.com/CrowdStrike/go_kafka_client" + kafka "github.com/stealthly/go_kafka_client" sp "github.com/stealthly/go_kafka_client/syslog/syslog_proto" "os" "strings" diff --git a/producers/producers.go b/producers/producers.go index 82c5394..5f8d527 100644 --- a/producers/producers.go +++ b/producers/producers.go @@ -22,7 +22,7 @@ import ( "fmt" "github.com/Shopify/sarama" metrics "github.com/rcrowley/go-metrics" - kafkaClient "github.com/CrowdStrike/go_kafka_client" + kafkaClient "github.com/stealthly/go_kafka_client" "net" "os" "os/signal" diff --git a/swtich/switch.go b/swtich/switch.go index 4e67b62..4f3a542 100644 --- a/swtich/switch.go +++ b/swtich/switch.go @@ -1,7 +1,7 @@ package main import ( - kafka "github.com/CrowdStrike/go_kafka_client" + kafka "github.com/stealthly/go_kafka_client" "flag" "os" ) diff --git a/syslog/syslog.go b/syslog/syslog.go index b00b7a7..72dffd8 100644 --- a/syslog/syslog.go +++ b/syslog/syslog.go @@ -20,7 +20,7 @@ import ( "fmt" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" - kafka "github.com/CrowdStrike/go_kafka_client" + kafka "github.com/stealthly/go_kafka_client" sp "github.com/stealthly/go_kafka_client/syslog/syslog_proto" "math" "os" From e4ebaca9438a47e6b43b85944d7526cbefb4071d Mon Sep 17 00:00:00 2001 From: Marcus Date: Tue, 28 Jul 2015 12:10:03 -0700 Subject: [PATCH 11/13] Fixed bug where releasing partition ownership happened after zk connection was closed. --- consumer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/consumer.go b/consumer.go index d184e6c..a8bda46 100644 --- a/consumer.go +++ b/consumer.go @@ -371,12 +371,12 @@ func (c *Consumer) Close() <-chan bool { Info(c, "Closing low-level client") c.config.LowLevelClient.Close() Info(c, "Disconnecting from consumer coordinator") - c.config.Coordinator.Disconnect() - Info(c, "Disconnected from consumer coordinator") - // Other consumers will wait to take partition ownership until the ownership in the coordinator is released // As such it should be one of the last things we do to prevent duplicate ownership or "released" ownership but the consumer is still running. c.releasePartitionOwnership(c.topicRegistry) + c.config.Coordinator.Disconnect() + Info(c, "Disconnected from consumer coordinator") + Info(c, "Unregistering all metrics") c.metrics.close() From 2ec223fe6de0bdf10a0351ae0f79b3e0b788a586 Mon Sep 17 00:00:00 2001 From: Marcus Date: Tue, 28 Jul 2015 14:47:27 -0700 Subject: [PATCH 12/13] Fixed an issue where blue green switches weren't being handled gracefully. --- zk_coordinator.go | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/zk_coordinator.go b/zk_coordinator.go index 34f17ea..165f45c 100644 --- a/zk_coordinator.go +++ b/zk_coordinator.go @@ -669,19 +669,27 @@ func (this *ZookeeperCoordinator) tryRemoveOldApiRequests(group string, api Cons } for _, request := range requests { childPath := fmt.Sprintf("%s/%s", apiPath, request) - if data, _, err = this.zkConn.Get(childPath); err != nil && err != zk.ErrNoNode { - // It's possible another consumer deleted the node before we could read it's data - break + if api == Rebalance { + if data, _, err = this.zkConn.Get(childPath); err != nil && err != zk.ErrNoNode { + // It's possible another consumer deleted the node before we could read it's data + break + } + if t, err = strconv.ParseInt(string(data), 10, 64); err != nil { + t = int64(0) // If the data isn't a timestamp ensure it will be deleted anyway. + } + } else if api == BlueGreenRequest { + if t, err = strconv.ParseInt(string(request), 10, 64); err != nil { + break + } } - if t, err = strconv.ParseInt(string(data), 10, 64); err == nil && !time.Unix(t, 0).Before(time.Now()) { - // Don't delete if this zk node has a timestamp as the data and the timestamp is still valid - continue - } - // If the data is not a timestamp or is a timestamp but has reached expiration delete it - err = this.deleteNode(childPath) - if err != nil && err != zk.ErrNoNode { - break + // Don't delete if this zk node has a timestamp as the data and the timestamp is still valid + if !time.Unix(t, 0).Before(time.Now().Add(-10*time.Minute)) { + // If the data is not a timestamp or is a timestamp but has reached expiration delete it + err = this.deleteNode(childPath) + if err != nil && err != zk.ErrNoNode { + break + } } } } From 6f2394da5e8de3c1d7054b98c525914559ab8abc Mon Sep 17 00:00:00 2001 From: Marcus Date: Tue, 28 Jul 2015 14:51:23 -0700 Subject: [PATCH 13/13] Fixed an issue where blue green switches weren't being handled gracefully. --- zk_coordinator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zk_coordinator.go b/zk_coordinator.go index 165f45c..c47c1a2 100644 --- a/zk_coordinator.go +++ b/zk_coordinator.go @@ -677,7 +677,7 @@ func (this *ZookeeperCoordinator) tryRemoveOldApiRequests(group string, api Cons if t, err = strconv.ParseInt(string(data), 10, 64); err != nil { t = int64(0) // If the data isn't a timestamp ensure it will be deleted anyway. } - } else if api == BlueGreenRequest { + } else if api == BlueGreenDeploymentAPI { if t, err = strconv.ParseInt(string(request), 10, 64); err != nil { break }