From 83504cf06153e01a4896cbe7d783cf4b239e901c Mon Sep 17 00:00:00 2001 From: Spencer Nelson Date: Fri, 17 Jul 2015 07:09:57 -0400 Subject: [PATCH] Allocate a new taskBatch for each batch of tasks in the WorkerManager --- workers.go | 1 + 1 file changed, 1 insertion(+) diff --git a/workers.go b/workers.go index f13cfa3..2634746 100644 --- a/workers.go +++ b/workers.go @@ -147,6 +147,7 @@ func (wm *WorkerManager) Stop() chan bool { func (wm *WorkerManager) startBatch(batch []*Message) { inLock(&wm.stopLock, func() { + wm.currentBatch = newTaskBatch() wm.batchOrder = make([]TaskId, 0) for _, message := range batch { topicPartition := TopicAndPartition{message.Topic, message.Partition}