Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

207 forking in v3 #216

Closed
wants to merge 9 commits into from
23 changes: 23 additions & 0 deletions lib/queue_classic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,29 @@ def self.measure(data)
$stdout.puts("measure#qc.#{data}")
end
end

# before_fork hook (adapted from Unicorn's implementation)
def self.before_fork(*args, &block)
if block_given?
DEFAULTS[:before_fork] = block
else
DEFAULTS[:before_fork].call(*args)
end
end

# after_fork hook (adapted from Unicorn's implementation)
def self.after_fork(*args, &block)
if block_given?
DEFAULTS[:after_fork] = block
else
DEFAULTS[:after_fork].call(*args)
end
end

DEFAULTS = {
:after_fork => lambda {|worker, cpid| log(:at => "after_fork", :cpid => cpid) },
:before_fork => lambda {|worker| log(:at => "before_fork") }
}
end

require_relative "queue_classic/queue"
Expand Down
34 changes: 26 additions & 8 deletions lib/queue_classic/conn_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,21 @@ def initialize(c=nil)
def execute(stmt, *params)
@mutex.synchronize do
QC.log(:at => "exec_sql", :sql => stmt.inspect)
params = nil if params.empty?
begin
params = nil if params.empty?
r = @connection.exec(stmt, params)
result = []
r.each {|t| result << t}
result.length > 1 ? result : result.pop
query_db(stmt, params)
rescue PGError => e
QC.log(:error => e.inspect)
@connection.reset
raise
conn_error = !!/PG::UnableToSend/.match(e.inspect)
if conn_error
@connection = establish_new
begin
query_db(stmt, params)
rescue PGError => e
execute_error(e)
end
else
execute_error(e)
end
end
end
end
Expand Down Expand Up @@ -102,5 +107,18 @@ def db_url
@db_url = URI.parse(url)
end

protected
def query_db(stmt, params)
r = @connection.exec(stmt, params)
result = []
r.each {|t| result << t}
result.length > 1 ? result : result.pop
end

def execute_error(e)
QC.log(:error => e.inspect)
@connection.reset
raise
end
end
end
3 changes: 3 additions & 0 deletions lib/queue_classic/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,12 @@ def stop
# Calls Worker#work but after the current process is forked.
# The parent process will wait on the child process to exit.
def fork_and_work
QC.before_fork(self)
cpid = fork {setup_child; work}
log(:at => :fork, :pid => cpid)
Process.wait(cpid)
QC.after_fork(self, cpid)
cpid
end

# Blocks on locking a job, and once a job is locked,
Expand Down
24 changes: 24 additions & 0 deletions test/worker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,28 @@ def test_unlock_jobs_of_dead_workers
assert_equal(1, res.count)
end

def test_forked_worker
# create hooks for logging
QC.before_fork do |worker|
QC.log(:testing_before_fork => "true")
end
QC.after_fork do |worker, cpid|
QC.log(:testing_after_fork => cpid)
end

#run a simple forked job
QC.enqueue("TestObject.no_args")
QC.enqueue("TestObject.no_args")
forking_worker = TestWorker.new(:fork_worker => true)
cpid = nil
output = capture_debug_output do
cpid = forking_worker.fork_and_work
end
forking_worker.fork_and_work
assert_equal(0, forking_worker.failed_count)
expected_output = /lib=queue-classic testing_before_fork=true/
assert_match(expected_output, output, "=== debug output ===\n #{output}")
expected_output = /lib=queue-classic testing_after_fork=#{cpid}/
assert_match(expected_output, output, "=== debug output ===\n #{output}")
end
end