From b99266832b93be25e4b46c91441f309cb5333ef1 Mon Sep 17 00:00:00 2001 From: Ben Donaldson Date: Wed, 30 Apr 2014 15:48:53 -0400 Subject: [PATCH 1/9] new test passes, but need to reconnect only after fork closes connection --- lib/queue_classic/conn_adapter.rb | 5 +++++ lib/queue_classic/worker.rb | 14 ++++++++++++-- test/worker_test.rb | 6 ++++++ 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/lib/queue_classic/conn_adapter.rb b/lib/queue_classic/conn_adapter.rb index 06fadc6f..145c1f56 100644 --- a/lib/queue_classic/conn_adapter.rb +++ b/lib/queue_classic/conn_adapter.rb @@ -11,7 +11,12 @@ def initialize(c=nil) @mutex = Mutex.new end + def reconnect_if_closed + @connection = establish_new + end + def execute(stmt, *params) + reconnect_if_closed @mutex.synchronize do QC.log(:at => "exec_sql", :sql => stmt.inspect) begin diff --git a/lib/queue_classic/worker.rb b/lib/queue_classic/worker.rb index 54990855..c6910956 100644 --- a/lib/queue_classic/worker.rb +++ b/lib/queue_classic/worker.rb @@ -41,8 +41,9 @@ def initialize(args={}) def start unlock_jobs_of_dead_workers() while @running - @fork_worker ? fork_and_work : work + result = @fork_worker ? fork_and_work : work end + result end # Signals the worker to stop taking new work. @@ -59,9 +60,18 @@ def stop # Calls Worker#work but after the current process is forked. # The parent process will wait on the child process to exit. def fork_and_work - cpid = fork {setup_child; work} + read, write = IO.pipe + cpid = Process.fork do + setup_child + result = work + read.close + write.puts result + end log(:at => :fork, :pid => cpid) + write.close Process.wait(cpid) + @running = false + read.read.strip end # Blocks on locking a job, and once a job is locked, diff --git a/test/worker_test.rb b/test/worker_test.rb index ea7a27f2..a5e78154 100644 --- a/test/worker_test.rb +++ b/test/worker_test.rb @@ -194,4 +194,10 @@ def test_unlock_jobs_of_dead_workers assert_equal(1, res.count) end + def test_forked_worker + QC.enqueue("TestObject.one_arg", "13") + worker = TestWorker.new(:fork_worker => true) + r = worker.start + assert_equal("13", r) + end end From 5f1b02f4161a05f21120f540bf3c2e163efd6a9d Mon Sep 17 00:00:00 2001 From: Ben Donaldson Date: Wed, 30 Apr 2014 16:04:38 -0400 Subject: [PATCH 2/9] before_fork and after_fork callbacks --- lib/queue_classic/worker.rb | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/lib/queue_classic/worker.rb b/lib/queue_classic/worker.rb index c6910956..4abad0c4 100644 --- a/lib/queue_classic/worker.rb +++ b/lib/queue_classic/worker.rb @@ -61,19 +61,30 @@ def stop # The parent process will wait on the child process to exit. def fork_and_work read, write = IO.pipe + before_fork cpid = Process.fork do setup_child - result = work read.close - write.puts result + write.puts work end log(:at => :fork, :pid => cpid) write.close Process.wait(cpid) @running = false + after_fork(cpid) read.read.strip end + # Override if you want to run code before a forked worker + def before_fork + log(:at => "before_fork") + end + + # Override if you want to run code after a forked process finishes, but before the worker moves on to the next job + def after_fork(cpid) + log(:at => "after_fork", :pid => cpid) + end + # Blocks on locking a job, and once a job is locked, # it will process the job. def work From 73a5b3225a7cc85dd7b0bc9e2e158db94d097f1d Mon Sep 17 00:00:00 2001 From: Ben Donaldson Date: Fri, 2 May 2014 11:35:26 -0400 Subject: [PATCH 3/9] all tests pass, but the fix isn't ideal --- lib/queue_classic/conn_adapter.rb | 39 ++++++++++++++++++++----------- lib/queue_classic/worker.rb | 9 +------ test/worker_test.rb | 4 ++-- 3 files changed, 29 insertions(+), 23 deletions(-) diff --git a/lib/queue_classic/conn_adapter.rb b/lib/queue_classic/conn_adapter.rb index 145c1f56..46e79b38 100644 --- a/lib/queue_classic/conn_adapter.rb +++ b/lib/queue_classic/conn_adapter.rb @@ -11,24 +11,24 @@ def initialize(c=nil) @mutex = Mutex.new end - def reconnect_if_closed - @connection = establish_new - end - def execute(stmt, *params) - reconnect_if_closed @mutex.synchronize do QC.log(:at => "exec_sql", :sql => stmt.inspect) + params = nil if params.empty? begin - params = nil if params.empty? - r = @connection.exec(stmt, params) - result = [] - r.each {|t| result << t} - result.length > 1 ? result : result.pop + query_db(stmt, params) rescue PGError => e - QC.log(:error => e.inspect) - @connection.reset - raise + conn_error = !!/PG::UnableToSend/.match(e.inspect) + if conn_error + @connection = establish_new + begin + query_db(stmt, params) + rescue PGError => e + execute_error(e) + end + else + execute_error(e) + end end end end @@ -107,5 +107,18 @@ def db_url @db_url = URI.parse(url) end + protected + def query_db(stmt, params) + r = @connection.exec(stmt, params) + result = [] + r.each {|t| result << t} + result.length > 1 ? result : result.pop + end + + def execute_error(e) + QC.log(:error => e.inspect) + @connection.reset + raise + end end end diff --git a/lib/queue_classic/worker.rb b/lib/queue_classic/worker.rb index 4abad0c4..d3214d3f 100644 --- a/lib/queue_classic/worker.rb +++ b/lib/queue_classic/worker.rb @@ -60,19 +60,12 @@ def stop # Calls Worker#work but after the current process is forked. # The parent process will wait on the child process to exit. def fork_and_work - read, write = IO.pipe before_fork - cpid = Process.fork do - setup_child - read.close - write.puts work - end + cpid = fork {setup_child; work} log(:at => :fork, :pid => cpid) - write.close Process.wait(cpid) @running = false after_fork(cpid) - read.read.strip end # Override if you want to run code before a forked worker diff --git a/test/worker_test.rb b/test/worker_test.rb index a5e78154..7ad8be9e 100644 --- a/test/worker_test.rb +++ b/test/worker_test.rb @@ -197,7 +197,7 @@ def test_unlock_jobs_of_dead_workers def test_forked_worker QC.enqueue("TestObject.one_arg", "13") worker = TestWorker.new(:fork_worker => true) - r = worker.start - assert_equal("13", r) + cpid = worker.start + assert_equal(0, worker.failed_count) end end From 1b8306943150e1eab111d74e34e0ae099c779c41 Mon Sep 17 00:00:00 2001 From: Ben Donaldson Date: Fri, 2 May 2014 14:26:18 -0400 Subject: [PATCH 4/9] hooks more like unicorn, tests to cover them --- lib/queue_classic.rb | 25 +++++++++++++++++++++++++ lib/queue_classic/worker.rb | 18 ++++-------------- test/helper.rb | 1 + test/worker_test.rb | 25 +++++++++++++++++++++---- 4 files changed, 51 insertions(+), 18 deletions(-) diff --git a/lib/queue_classic.rb b/lib/queue_classic.rb index fb5d7d7d..4f5346f8 100644 --- a/lib/queue_classic.rb +++ b/lib/queue_classic.rb @@ -97,6 +97,31 @@ def self.measure(data) $stdout.puts("measure#qc.#{data}") end end + + # before_fork hook (adapted from Unicorn's implementation) + def self.before_fork(*args, &block) + log(:at => "before_fork") + if block_given? + DEFAULTS[:before_fork] = block + else + DEFAULTS[:before_fork].call(*args) + end + end + + # after_fork hook (adapted from Unicorn's implementation) + def self.after_fork(*args, &block) + log(:at => "after_fork") + if block_given? + DEFAULTS[:after_fork] = block + else + DEFAULTS[:after_fork].call(*args) + end + end + + DEFAULTS = { + :after_fork => lambda {|worker, cpid| }, + :before_fork => lambda {|worker| } + } end require_relative "queue_classic/queue" diff --git a/lib/queue_classic/worker.rb b/lib/queue_classic/worker.rb index d3214d3f..c992baa6 100644 --- a/lib/queue_classic/worker.rb +++ b/lib/queue_classic/worker.rb @@ -41,9 +41,8 @@ def initialize(args={}) def start unlock_jobs_of_dead_workers() while @running - result = @fork_worker ? fork_and_work : work + @fork_worker ? fork_and_work : work end - result end # Signals the worker to stop taking new work. @@ -60,22 +59,13 @@ def stop # Calls Worker#work but after the current process is forked. # The parent process will wait on the child process to exit. def fork_and_work - before_fork + QC.before_fork(self) cpid = fork {setup_child; work} log(:at => :fork, :pid => cpid) Process.wait(cpid) @running = false - after_fork(cpid) - end - - # Override if you want to run code before a forked worker - def before_fork - log(:at => "before_fork") - end - - # Override if you want to run code after a forked process finishes, but before the worker moves on to the next job - def after_fork(cpid) - log(:at => "after_fork", :pid => cpid) + QC.after_fork(self, cpid) + cpid end # Blocks on locking a job, and once a job is locked, diff --git a/test/helper.rb b/test/helper.rb index 09cf05c4..8678b25d 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -5,6 +5,7 @@ require "queue_classic" require "stringio" +require 'minitest' require "minitest/autorun" class QCTest < Minitest::Test diff --git a/test/worker_test.rb b/test/worker_test.rb index 7ad8be9e..8c224e78 100644 --- a/test/worker_test.rb +++ b/test/worker_test.rb @@ -195,9 +195,26 @@ def test_unlock_jobs_of_dead_workers end def test_forked_worker - QC.enqueue("TestObject.one_arg", "13") - worker = TestWorker.new(:fork_worker => true) - cpid = worker.start - assert_equal(0, worker.failed_count) + # create hooks for logging + QC.before_fork do |worker| + QC.log(:before_fork => "true") + end + QC.after_fork do |worker, cpid| + QC.log(:after_fork => cpid) + end + + #run a simple forked job + QC.enqueue("TestObject.no_args") + forking_worker = TestWorker.new(:fork_worker => true) + cpid = nil + output = capture_debug_output do + cpid = forking_worker.fork_and_work + end + + assert_equal(0, forking_worker.failed_count) + expected_output = /lib=queue-classic before_fork=true/ + assert_match(expected_output, output, "=== debug output ===\n #{output}") + expected_output = /lib=queue-classic after_fork=#{cpid}/ + assert_match(expected_output, output, "=== debug output ===\n #{output}") end end From b81d1ff9fec3be0acea5224d0841ef5d2942f8cc Mon Sep 17 00:00:00 2001 From: Ben Donaldson Date: Fri, 2 May 2014 14:27:57 -0400 Subject: [PATCH 5/9] moving log statements to default function --- lib/queue_classic.rb | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/queue_classic.rb b/lib/queue_classic.rb index 4f5346f8..554cc84b 100644 --- a/lib/queue_classic.rb +++ b/lib/queue_classic.rb @@ -100,7 +100,6 @@ def self.measure(data) # before_fork hook (adapted from Unicorn's implementation) def self.before_fork(*args, &block) - log(:at => "before_fork") if block_given? DEFAULTS[:before_fork] = block else @@ -110,7 +109,6 @@ def self.before_fork(*args, &block) # after_fork hook (adapted from Unicorn's implementation) def self.after_fork(*args, &block) - log(:at => "after_fork") if block_given? DEFAULTS[:after_fork] = block else @@ -119,8 +117,8 @@ def self.after_fork(*args, &block) end DEFAULTS = { - :after_fork => lambda {|worker, cpid| }, - :before_fork => lambda {|worker| } + :after_fork => lambda {|worker, cpid| log(:at => "after_fork", :cpid => cpid) }, + :before_fork => lambda {|worker| log(:at => "before_fork") } } end From 9dfb6788d7949cff33d1264eec485cb398950139 Mon Sep 17 00:00:00 2001 From: Ben Donaldson Date: Fri, 2 May 2014 15:09:06 -0400 Subject: [PATCH 6/9] cleaning up now that I understand how @running was used --- lib/queue_classic/worker.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/queue_classic/worker.rb b/lib/queue_classic/worker.rb index c992baa6..12dcd71f 100644 --- a/lib/queue_classic/worker.rb +++ b/lib/queue_classic/worker.rb @@ -41,7 +41,7 @@ def initialize(args={}) def start unlock_jobs_of_dead_workers() while @running - @fork_worker ? fork_and_work : work + @fork_worker ? fork_and_work : work end end @@ -63,7 +63,6 @@ def fork_and_work cpid = fork {setup_child; work} log(:at => :fork, :pid => cpid) Process.wait(cpid) - @running = false QC.after_fork(self, cpid) cpid end From 582a16d3157c4451e5bd97e1e5d5a90b60004569 Mon Sep 17 00:00:00 2001 From: Ben Donaldson Date: Fri, 2 May 2014 15:12:02 -0400 Subject: [PATCH 7/9] and this minitest require was optional --- test/helper.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/test/helper.rb b/test/helper.rb index 8678b25d..09cf05c4 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -5,7 +5,6 @@ require "queue_classic" require "stringio" -require 'minitest' require "minitest/autorun" class QCTest < Minitest::Test From cb2c59aa1c0e1fbade20e7f3b244d269e9c55581 Mon Sep 17 00:00:00 2001 From: Ben Donaldson Date: Fri, 2 May 2014 15:14:35 -0400 Subject: [PATCH 8/9] making test log statements more specific --- test/worker_test.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/worker_test.rb b/test/worker_test.rb index 8c224e78..9587be8e 100644 --- a/test/worker_test.rb +++ b/test/worker_test.rb @@ -197,10 +197,10 @@ def test_unlock_jobs_of_dead_workers def test_forked_worker # create hooks for logging QC.before_fork do |worker| - QC.log(:before_fork => "true") + QC.log(:testing_before_fork => "true") end QC.after_fork do |worker, cpid| - QC.log(:after_fork => cpid) + QC.log(:testing_after_fork => cpid) end #run a simple forked job @@ -212,9 +212,9 @@ def test_forked_worker end assert_equal(0, forking_worker.failed_count) - expected_output = /lib=queue-classic before_fork=true/ + expected_output = /lib=queue-classic testing_before_fork=true/ assert_match(expected_output, output, "=== debug output ===\n #{output}") - expected_output = /lib=queue-classic after_fork=#{cpid}/ + expected_output = /lib=queue-classic testing_after_fork=#{cpid}/ assert_match(expected_output, output, "=== debug output ===\n #{output}") end end From 2292b71aaaff996d10eafd5198b8b3e3fb234b33 Mon Sep 17 00:00:00 2001 From: Ben Donaldson Date: Fri, 2 May 2014 15:48:44 -0400 Subject: [PATCH 9/9] better test to ensure PGError before teardown --- test/worker_test.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/worker_test.rb b/test/worker_test.rb index 9587be8e..6ec52e79 100644 --- a/test/worker_test.rb +++ b/test/worker_test.rb @@ -205,12 +205,13 @@ def test_forked_worker #run a simple forked job QC.enqueue("TestObject.no_args") + QC.enqueue("TestObject.no_args") forking_worker = TestWorker.new(:fork_worker => true) cpid = nil output = capture_debug_output do cpid = forking_worker.fork_and_work end - + forking_worker.fork_and_work assert_equal(0, forking_worker.failed_count) expected_output = /lib=queue-classic testing_before_fork=true/ assert_match(expected_output, output, "=== debug output ===\n #{output}")