From 05f00e9a33f63d5de6857527993effbb0d9cc51e Mon Sep 17 00:00:00 2001 From: ilyutov Date: Tue, 28 Jul 2015 16:47:03 +0300 Subject: [PATCH 1/4] Attempt to reduces amount of worker goroutines. --- workers.go | 80 ++++++++++++++++++++++++++++++++----------------- workers_test.go | 6 ++-- 2 files changed, 55 insertions(+), 31 deletions(-) diff --git a/workers.go b/workers.go index 2634746..2091660 100644 --- a/workers.go +++ b/workers.go @@ -54,9 +54,11 @@ func NewWorkerManager(id string, config *ConsumerConfig, topicPartition TopicAnd availableWorkers := make(chan *Worker, config.NumWorkers) for i := 0; i < config.NumWorkers; i++ { workers[i] = &Worker{ + InputChannel: make(chan *TaskAndStrategy), OutputChannel: make(chan WorkerResult), TaskTimeout: config.WorkerTaskTimeout, } + workers[i].Start() availableWorkers <- workers[i] } @@ -138,6 +140,12 @@ func (wm *WorkerManager) Stop() chan bool { Debug(wm, "Stopped failure counter") finished <- true Debug(wm, "Leaving manager stop") + + Debug(wm, "Stopping workers") + for _, worker := range wm.workers { + worker.Stop() + } + Debug(wm, "Stopped all workers") }) Debugf(wm, "Stopped workerManager") }() @@ -163,7 +171,7 @@ func (wm *WorkerManager) startBatch(batch []*Message) { if wm.shutdownDecision == nil { wm.metrics.activeWorkers().Inc(1) wm.metrics.pendingWMsTasks().Dec(1) - worker.Start(task, wm.config.Strategy) + worker.InputChannel <- &TaskAndStrategy{task, wm.config.Strategy} } else { return } @@ -293,7 +301,9 @@ func (wm *WorkerManager) processBatch() { } else { Debugf(wm, "Retrying worker task %s %dth time", result.Id(), task.Retries) time.Sleep(wm.config.WorkerBackoff) - go task.Callee.Start(task, wm.config.Strategy) + go func() { + task.Callee.InputChannel <- &TaskAndStrategy{task, wm.config.Strategy} + }() } } @@ -354,6 +364,9 @@ func (wm *WorkerManager) UpdateLargestOffset(offset int64) { // Represents a worker that is able to process a single message. type Worker struct { + // Channel to write tasks to. + InputChannel chan *TaskAndStrategy + // Channel to write processing results to. OutputChannel chan WorkerResult @@ -370,39 +383,45 @@ func (w *Worker) String() string { // Starts processing a given task using given strategy with this worker. // Call to this method blocks until the task is done or timed out. -func (w *Worker) Start(task *Task, strategy WorkerStrategy) { - task.Callee = w +func (w *Worker) Start() { go func() { - shouldStop := false - resultChannel := make(chan WorkerResult) - go func() { - result := strategy(w, task.Msg, task.Id()) - for !shouldStop { - timeout := time.NewTimer(5 * time.Second) - select { - case resultChannel <- result: - timeout.Stop() - return - case <-timeout.C: + for taskAndStrategy := range w.InputChannel { + taskAndStrategy.WorkerTask.Callee = w + shouldStop := false + resultChannel := make(chan WorkerResult) + go func() { + result := taskAndStrategy.Strategy(w, taskAndStrategy.WorkerTask.Msg, taskAndStrategy.WorkerTask.Id()) + for !shouldStop { + timeout := time.NewTimer(5 * time.Second) + select { + case resultChannel <- result: + timeout.Stop() + return + case <-timeout.C: + } + } + }() + timeout := time.NewTimer(w.TaskTimeout) + select { + case result := <-resultChannel: + { + w.OutputChannel <- result + } + case <-timeout.C: + { + shouldStop = true + w.OutputChannel <- &TimedOutResult{taskAndStrategy.WorkerTask.Id()} } } - }() - timeout := time.NewTimer(w.TaskTimeout) - select { - case result := <-resultChannel: - { - w.OutputChannel <- result - } - case <-timeout.C: - { - shouldStop = true - w.OutputChannel <- &TimedOutResult{task.Id()} - } + timeout.Stop() } - timeout.Stop() }() } +func (w *Worker) Stop() { + close(w.InputChannel) +} + // Defines what to do with a single Kafka message. Returns a WorkerResult to distinguish successful and unsuccessful processings. type WorkerStrategy func(*Worker, *Message, TaskId) WorkerResult @@ -622,3 +641,8 @@ func (b *taskBatch) numOutstanding() int { func (b *taskBatch) done() bool { return b.numOutstanding() == 0 } + +type TaskAndStrategy struct { + WorkerTask *Task + Strategy WorkerStrategy +} diff --git a/workers_test.go b/workers_test.go index 94cd102..bdf8b42 100644 --- a/workers_test.go +++ b/workers_test.go @@ -75,7 +75,7 @@ func TestWorker(t *testing.T) { OutputChannel: outChannel, TaskTimeout: taskTimeout, } - worker.Start(task, goodStrategy) + worker.InputChannel <- &TaskAndStrategy{task, goodStrategy} result := <-outChannel if !result.Success() { @@ -87,7 +87,7 @@ func TestWorker(t *testing.T) { OutputChannel: outChannel, TaskTimeout: taskTimeout, } - worker2.Start(task, failStrategy) + worker2.InputChannel <- &TaskAndStrategy{task, failStrategy} result = <-outChannel if result.Success() { t.Error("Worker result with fail strategy should be unsuccessful") @@ -98,7 +98,7 @@ func TestWorker(t *testing.T) { OutputChannel: outChannel, TaskTimeout: taskTimeout, } - worker3.Start(task, slowStrategy) + worker3.InputChannel <- &TaskAndStrategy{task, slowStrategy} result = <-outChannel if _, ok := result.(*TimedOutResult); !ok { t.Error("Worker with slow strategy should time out") From 7990bb3f54f4bed8c5034db0fe7840d54b750da3 Mon Sep 17 00:00:00 2001 From: ilyutov Date: Wed, 29 Jul 2015 14:26:18 +0300 Subject: [PATCH 2/4] Attempt to reduces amount of worker goroutines. --- workers.go | 46 ++++++++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/workers.go b/workers.go index 2091660..c9f3714 100644 --- a/workers.go +++ b/workers.go @@ -56,6 +56,8 @@ func NewWorkerManager(id string, config *ConsumerConfig, topicPartition TopicAnd workers[i] = &Worker{ InputChannel: make(chan *TaskAndStrategy), OutputChannel: make(chan WorkerResult), + HandlerInputChannel: make(chan *TaskAndStrategy), + HandlerOutputChannel: make(chan WorkerResult), TaskTimeout: config.WorkerTaskTimeout, } workers[i].Start() @@ -370,6 +372,12 @@ type Worker struct { // Channel to write processing results to. OutputChannel chan WorkerResult + // Intermediate channel for pushing result to strategy handler + HandlerInputChannel chan *TaskAndStrategy + + // Intermediate channel for pushing result from strategy handler + HandlerOutputChannel chan WorkerResult + // Timeout for a single worker task. TaskTimeout time.Duration @@ -384,32 +392,37 @@ func (w *Worker) String() string { // Starts processing a given task using given strategy with this worker. // Call to this method blocks until the task is done or timed out. func (w *Worker) Start() { + handlerInterrupted := false + go func() { + for taskAndStrategy := range w.HandlerInputChannel { + result := taskAndStrategy.Strategy(w, taskAndStrategy.WorkerTask.Msg, taskAndStrategy.WorkerTask.Id()) + Loop: + for !handlerInterrupted { + timeout := time.NewTimer(5 * time.Second) + select { + case w.HandlerOutputChannel <- result: + timeout.Stop() + break Loop + case <-timeout.C: + } + } + handlerInterrupted = false + } + }() + go func() { for taskAndStrategy := range w.InputChannel { taskAndStrategy.WorkerTask.Callee = w - shouldStop := false - resultChannel := make(chan WorkerResult) - go func() { - result := taskAndStrategy.Strategy(w, taskAndStrategy.WorkerTask.Msg, taskAndStrategy.WorkerTask.Id()) - for !shouldStop { - timeout := time.NewTimer(5 * time.Second) - select { - case resultChannel <- result: - timeout.Stop() - return - case <-timeout.C: - } - } - }() + w.HandlerInputChannel <- taskAndStrategy timeout := time.NewTimer(w.TaskTimeout) select { - case result := <-resultChannel: + case result := <-w.HandlerOutputChannel: { w.OutputChannel <- result } case <-timeout.C: { - shouldStop = true + handlerInterrupted = true w.OutputChannel <- &TimedOutResult{taskAndStrategy.WorkerTask.Id()} } } @@ -420,6 +433,7 @@ func (w *Worker) Start() { func (w *Worker) Stop() { close(w.InputChannel) + close(w.HandlerInputChannel) } // Defines what to do with a single Kafka message. Returns a WorkerResult to distinguish successful and unsuccessful processings. From 8fe7f9a4918ca9e40ff651e834a58f226f8d9f25 Mon Sep 17 00:00:00 2001 From: serejja Date: Thu, 17 Sep 2015 12:35:25 +0300 Subject: [PATCH 3/4] Fix broker worker test --- workers_test.go | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/workers_test.go b/workers_test.go index bdf8b42..c000efa 100644 --- a/workers_test.go +++ b/workers_test.go @@ -16,6 +16,7 @@ limitations under the License. */ package go_kafka_client import ( + "fmt" "testing" "time" ) @@ -72,9 +73,13 @@ func TestWorker(t *testing.T) { //test good case worker := &Worker{ - OutputChannel: outChannel, - TaskTimeout: taskTimeout, + InputChannel: make(chan *TaskAndStrategy), + OutputChannel: outChannel, + HandlerInputChannel: make(chan *TaskAndStrategy), + HandlerOutputChannel: make(chan WorkerResult), + TaskTimeout: taskTimeout, } + worker.Start() worker.InputChannel <- &TaskAndStrategy{task, goodStrategy} result := <-outChannel @@ -84,9 +89,13 @@ func TestWorker(t *testing.T) { //test fail case worker2 := &Worker{ - OutputChannel: outChannel, - TaskTimeout: taskTimeout, + InputChannel: make(chan *TaskAndStrategy), + OutputChannel: outChannel, + HandlerInputChannel: make(chan *TaskAndStrategy), + HandlerOutputChannel: make(chan WorkerResult), + TaskTimeout: taskTimeout, } + worker2.Start() worker2.InputChannel <- &TaskAndStrategy{task, failStrategy} result = <-outChannel if result.Success() { @@ -95,9 +104,13 @@ func TestWorker(t *testing.T) { //test timeout worker3 := &Worker{ - OutputChannel: outChannel, - TaskTimeout: taskTimeout, + InputChannel: make(chan *TaskAndStrategy), + OutputChannel: outChannel, + HandlerInputChannel: make(chan *TaskAndStrategy), + HandlerOutputChannel: make(chan WorkerResult), + TaskTimeout: taskTimeout, } + worker3.Start() worker3.InputChannel <- &TaskAndStrategy{task, slowStrategy} result = <-outChannel if _, ok := result.(*TimedOutResult); !ok { From 7a4ef6ef2bc6bb8f689516ae331cd83c9f049588 Mon Sep 17 00:00:00 2001 From: serejja Date: Thu, 17 Sep 2015 12:41:47 +0300 Subject: [PATCH 4/4] fix broken workers test --- workers_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/workers_test.go b/workers_test.go index c000efa..3efc887 100644 --- a/workers_test.go +++ b/workers_test.go @@ -16,7 +16,6 @@ limitations under the License. */ package go_kafka_client import ( - "fmt" "testing" "time" )