Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pause worker before it reserves a job #235

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions lib/qless/worker/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,23 +84,29 @@ def register_signal_handlers
def jobs
return Enumerator.new do |enum|
loop do
begin
job = reserver.reserve
rescue Exception => error
# We want workers to durably stay up, so we don't want errors
# during job reserving (e.g. network timeouts, etc) to kill the
# worker.
log(:error,
"Error reserving job: #{error.class}: #{error.message}")
end

# If we ended up getting a job, yield it. Otherwise, we wait
if job.nil?
no_job_available
# So long as we're paused, we should wait
if paused
log(:debug, 'Paused...')
sleep interval
else
self.current_job = job
enum.yield(job)
self.current_job = nil
begin
job = reserver.reserve
rescue Exception => error
# We want workers to durably stay up, so we don't want errors
# during job reserving (e.g. network timeouts, etc) to kill the
# worker.
log(:error,
"Error reserving job: #{error.class}: #{error.message}")
end

# If we ended up getting a job, yield it. Otherwise, we wait
if job.nil?
no_job_available
else
self.current_job = job
enum.yield(job)
self.current_job = nil
end
end

break if @shutdown
Expand Down
6 changes: 0 additions & 6 deletions lib/qless/worker/serial.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ def run
log(:debug, "Starting job #{job.klass_name} (#{job.jid} from #{job.queue_name})")
perform(job)
log(:debug, "Finished job #{job.klass_name} (#{job.jid} from #{job.queue_name})")

# So long as we're paused, we should wait
while paused
log(:debug, 'Paused...')
sleep interval
end
end
end
end
Expand Down
21 changes: 21 additions & 0 deletions spec/integration/workers/serial_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,27 @@ def self.perform(job)
expect { |b| worker.send(:with_current_job, &b) }.to yield_with_args(nil)
end

it 'can pause a worker and then unpause it' do
job_class = Class.new do
def self.perform(job)
Redis.connect(url: job['redis']).rpush(job['key'], job['word'])
end
end
stub_const('JobClass', job_class)

worker.pause

queue.put('JobClass', { redis: redis.client.id, key: key, word: 'hello' })

run_worker_concurrently_with(worker) do
expect(redis.brpop(key, timeout: 1)).to eq(nil)

worker.unpause

expect(redis.brpop(key, timeout: 1)).to eq([key.to_s, 'hello'])
end
end

context 'when a job times out', :uses_threads do
it 'invokes the given callback when the current job is the one that timed out' do
callback_invoked = false
Expand Down