diff --git a/workers.go b/workers.go index f13cfa3..2634746 100644 --- a/workers.go +++ b/workers.go @@ -147,6 +147,7 @@ func (wm *WorkerManager) Stop() chan bool { func (wm *WorkerManager) startBatch(batch []*Message) { inLock(&wm.stopLock, func() { + wm.currentBatch = newTaskBatch() wm.batchOrder = make([]TaskId, 0) for _, message := range batch { topicPartition := TopicAndPartition{message.Topic, message.Partition}