From bf804fa71d7ceea9b7f9c2ed1e526db793121fc4 Mon Sep 17 00:00:00 2001 From: Pablo Fischer Date: Wed, 25 Nov 2015 10:16:17 +0000 Subject: [PATCH 1/2] Let the NumWorkers be set via consumer configs --- mirror_maker.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mirror_maker.go b/mirror_maker.go index db6ffef..02780db 100644 --- a/mirror_maker.go +++ b/mirror_maker.go @@ -157,7 +157,8 @@ func (this *MirrorMaker) startConsumers() { if err != nil { panic(err) } - config.NumWorkers = 1 + //Let the NumWorkers be set via consumer configs + //config.NumWorkers = 1 config.AutoOffsetReset = SmallestOffset config.Coordinator = NewZookeeperCoordinator(zkConfig) config.WorkerFailureCallback = func(_ *WorkerManager) FailedDecision { From d19056cc2ffd5a301097d4a96e02c22c6af7e5b3 Mon Sep 17 00:00:00 2001 From: Pablo Fischer Date: Wed, 25 Nov 2015 10:17:51 +0000 Subject: [PATCH 2/2] ProducerMessage keys and values are in byte slices, so for the key (the partition), use all the slices otherwise the max we can go is 127 for partition number --- producer.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/producer.go b/producer.go index 5d5ce9c..b3d1dce 100644 --- a/producer.go +++ b/producer.go @@ -176,9 +176,12 @@ func (this *FixedPartitioner) Partition(key []byte, numPartitions int32) (int32, if key == nil { panic("FixedPartitioner does not work without keys.") } - partition, err := binary.ReadUvarint(bytes.NewBuffer(key)) - if err != nil { - return -1, err + + var partition int32 + buf := bytes.NewBuffer(key) + binary.Read(buf, binary.LittleEndian, &partition) + if (partition < 0) { + return -1, errors.New("Partition turned to be -1 (too big to be int32 little endian?)") } return int32(partition) % numPartitions, nil