Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Worker#work_off and rake qc:work_off. #256

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ In this scenario, on each iteration of the worker's loop, it will look for jobs
QUEUES="priority_queue,secondary_queue" bundle exec rake qc:work
```

`rake qc:work` spawns a long running worker process waiting for jobs. You can
use `rake qc:work_off` to process all the jobs currently in the queue and exit.

#### Custom Worker
This example is probably not production ready; however, it serves as an example of how to leverage the code in the Worker class to fit your non-default requirements.

Expand Down
35 changes: 22 additions & 13 deletions lib/queue_classic/tasks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,12 @@
namespace :qc do
desc "Start a new worker for the (default or $QUEUE / $QUEUES) queue"
task :work => :environment do
@worker = QC.default_worker_class.new

trap('INT') do
$stderr.puts("Received INT. Shutting down.")
abort("Worker has stopped running. Exit.") unless @worker.running
@worker.stop
end

trap('TERM') do
$stderr.puts("Received Term. Shutting down.")
@worker.stop
end
build_worker.start
end

@worker.start
desc "Work off jobs in the (default or $QUEUE / $QUEUES) queue"
task :work_off => :environment do
build_worker.work_off
end

desc "Returns the number of jobs in the (default or $QUEUE / $QUEUES) queue"
Expand All @@ -45,4 +37,21 @@
task :update => :environment do
QC::Setup.update
end

def build_worker
@worker = QC.default_worker_class.new

trap('INT') do
$stderr.puts("Received INT. Shutting down.")
abort("Worker has stopped running. Exit.") unless @worker.running
@worker.stop
end

trap('TERM') do
$stderr.puts("Received Term. Shutting down.")
@worker.stop
end

@worker
end
end
28 changes: 23 additions & 5 deletions lib/queue_classic/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ def stop
@running = false
end

# Processes jobs until no jobs are left.
def work_off
while result = lock_job_no_wait
queue, job = result
if queue && job
QC.log_yield(:at => "work_off", :job => job[:id]) do
process(queue, job)
end
end
end
end

# Calls Worker#work but after the current process is forked.
# The parent process will wait on the child process to exit.
def fork_and_work
Expand Down Expand Up @@ -90,15 +102,21 @@ def lock_job
log(:at => "lock_job")
job = nil
while @running
@queues.each do |queue|
if job = queue.lock
return [queue, job]
end
end
result = lock_job_no_wait
return *result if result
@conn_adapter.wait(@wait_interval, *@queues.map {|q| q.name})
end
end

def lock_job_no_wait
@queues.each do |queue|
if job = queue.lock
return [queue, job]
end
end
nil
end

# A job is processed by evaluating the target code.
# if the job is evaluated with no exceptions
# then it is deleted from the queue.
Expand Down
1 change: 1 addition & 0 deletions test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

require_relative '../lib/queue_classic'
require "stringio"
require 'timeout'
require "minitest/autorun"

class QCTest < Minitest::Test
Expand Down
16 changes: 16 additions & 0 deletions test/worker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,22 @@ def test_work_with_more_complex_construct
assert_equal(0, worker.failed_count)
end

def test_work_off_until_no_jobs_are_left
QC.enqueue("TestObject.no_args")
QC.enqueue("TestObject.no_args")
QC.enqueue("TestObject.no_args")

assert_equal 3, QC.count
worker = TestWorker.new
Timeout::timeout(2) { worker.work_off }
assert_equal 0, QC.count
end

def test_work_off_does_not_wait_for_job
worker = TestWorker.new
Timeout::timeout(1) { worker.work_off }
end

def test_init_worker_with_database_url
with_database ENV['DATABASE_URL'] || ENV['QC_DATABASE_URL'] do
worker = QC::Worker.new
Expand Down
Loading