From a73aa07817f924a3e3b1e3b033c6c9ce7ad962dc Mon Sep 17 00:00:00 2001 From: Yves Senn Date: Fri, 6 Mar 2015 15:07:13 +0100 Subject: [PATCH] add `Worker#work_off` and `rake qc:work_off`. This provides a way to work off all the jobs in the queue and exit afterwards. Closes #191. --- README.md | 3 +++ lib/queue_classic/tasks.rb | 41 ++++++++++++++++++++++--------------- lib/queue_classic/worker.rb | 28 ++++++++++++++++++++----- test/helper.rb | 1 + test/worker_test.rb | 16 +++++++++++++++ 5 files changed, 68 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 3417bcd8..3fa85528 100644 --- a/README.md +++ b/README.md @@ -117,6 +117,9 @@ $ QUEUES="priority_queue,secondary_queue" bundle exec rake qc:work ``` In this scenario, on each iteration of the worker's loop, it will look for jobs in the first queue prior to looking at the second queue. This means that the first queue must be empty before the worker will look at the second queue. +`rake qc:work` spawns a long running worker process waiting for jobs. You can +use `rake qc:work_off` to process all the jobs currently in the queue and exit. + #### Custom Worker This example is probably not production ready; however, it serves as an example of how to leverage the code in the Worker class to fit your non-default requirements. diff --git a/lib/queue_classic/tasks.rb b/lib/queue_classic/tasks.rb index fa21bb15..1ca78289 100644 --- a/lib/queue_classic/tasks.rb +++ b/lib/queue_classic/tasks.rb @@ -8,23 +8,12 @@ namespace :qc do desc "Start a new worker for the (default or $QUEUE / $QUEUES) queue" task :work => :environment do - @worker = QC.default_worker_class.new - - trap('INT') do - $stderr.puts("Received INT. Shutting down.") - if !@worker.running - $stderr.puts("Worker has stopped running. Exit.") - exit(1) - end - @worker.stop - end - - trap('TERM') do - $stderr.puts("Received Term. Shutting down.") - @worker.stop - end + build_worker.start + end - @worker.start + desc "Work off jobs in the (default or $QUEUE / $QUEUES) queue" + task :work_off => :environment do + build_worker.work_off end desc "Returns the number of jobs in the (default or $QUEUE / $QUEUES) queue" @@ -46,4 +35,24 @@ task :update => :environment do QC::Setup.update end + + def build_worker + @worker = QC.default_worker_class.new + + trap('INT') do + $stderr.puts("Received INT. Shutting down.") + if !@worker.running + $stderr.puts("Worker has stopped running. Exit.") + exit(1) + end + @worker.stop + end + + trap('TERM') do + $stderr.puts("Received Term. Shutting down.") + @worker.stop + end + + @worker + end end diff --git a/lib/queue_classic/worker.rb b/lib/queue_classic/worker.rb index 8aeca314..a09a611c 100644 --- a/lib/queue_classic/worker.rb +++ b/lib/queue_classic/worker.rb @@ -58,6 +58,18 @@ def stop @running = false end + # Processes jobs until no jobs are left. + def work_off + while result = lock_job_no_wait + queue, job = result + if queue && job + QC.log_yield(:at => "work_off", :job => job[:id]) do + process(queue, job) + end + end + end + end + # Calls Worker#work but after the current process is forked. # The parent process will wait on the child process to exit. def fork_and_work @@ -88,15 +100,21 @@ def lock_job log(:at => "lock_job") job = nil while @running - @queues.each do |queue| - if job = queue.lock - return [queue, job] - end - end + result = lock_job_no_wait + return *result if result @conn_adapter.wait(@wait_interval, *@queues.map {|q| q.name}) end end + def lock_job_no_wait + @queues.each do |queue| + if job = queue.lock + return [queue, job] + end + end + nil + end + # A job is processed by evaluating the target code. # if the job is evaluated with no exceptions # then it is deleted from the queue. diff --git a/test/helper.rb b/test/helper.rb index d03d8595..a956551e 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -8,6 +8,7 @@ require "queue_classic" require "stringio" +require 'timeout' require "minitest/autorun" class QCTest < Minitest::Test diff --git a/test/worker_test.rb b/test/worker_test.rb index 65df8b8e..75a92513 100644 --- a/test/worker_test.rb +++ b/test/worker_test.rb @@ -176,6 +176,22 @@ def test_work_with_more_complex_construct assert_equal(0, worker.failed_count) end + def test_work_off_until_no_jobs_are_left + QC.enqueue("TestObject.no_args") + QC.enqueue("TestObject.no_args") + QC.enqueue("TestObject.no_args") + + assert_equal 3, QC.count + worker = TestWorker.new + Timeout::timeout(2) { worker.work_off } + assert_equal 0, QC.count + end + + def test_work_off_does_not_wait_for_job + worker = TestWorker.new + Timeout::timeout(1) { worker.work_off } + end + def test_init_worker_with_arg with_database 'postgres:///invalid' do conn = PG::Connection.connect(dbname: 'queue_classic_test')