Skip to content

Commit

Permalink
Fix PG::ConnectionBad issue (#312)
Browse files Browse the repository at this point in the history
Let's lazy use the ActiveRecord::Base.connection.raw_connection.
  • Loading branch information
ukd1 authored Oct 18, 2019
1 parent 9fe22cc commit 523cd7c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 19 deletions.
10 changes: 1 addition & 9 deletions lib/queue_classic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,7 @@ def self.has_connection?
end

def self.default_conn_adapter
t = Thread.current
return t[:qc_conn_adapter] if t[:qc_conn_adapter]
adapter = if rails_connection_sharing_enabled?
ConnAdapter.new(ActiveRecord::Base.connection.raw_connection)
else
ConnAdapter.new
end

t[:qc_conn_adapter] = adapter
Thread.current[:qc_conn_adapter] ||= ConnAdapter.new(rails_connection_sharing_enabled?)
end

def self.default_conn_adapter=(conn)
Expand Down
27 changes: 17 additions & 10 deletions lib/queue_classic/conn_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,31 @@
module QC
class ConnAdapter

attr_accessor :connection
def initialize(c=nil)
@connection = c.nil? ? establish_new : validate!(c)
def initialize(active_record_connection_share)
@active_record_connection_share = active_record_connection_share
@mutex = Mutex.new
end

def connection
if @active_record_connection_share
ActiveRecord::Base.connection.raw_connection
else
@_connection ||= establish_new
end
end

def execute(stmt, *params)
@mutex.synchronize do
QC.log(:at => "exec_sql", :sql => stmt.inspect)
begin
params = nil if params.empty?
r = @connection.exec(stmt, params)
r = connection.exec(stmt, params)
result = []
r.each {|t| result << t}
result.length > 1 ? result : result.pop
rescue PG::Error => e
QC.log(:error => e.inspect)
@connection.reset
connection.reset
raise
end
end
Expand All @@ -32,18 +39,18 @@ def execute(stmt, *params)
def wait(time, *channels)
@mutex.synchronize do
listen_cmds = channels.map {|c| 'LISTEN "' + c.to_s + '"'}
@connection.exec(listen_cmds.join(';'))
connection.exec(listen_cmds.join(';'))
wait_for_notify(time)
unlisten_cmds = channels.map {|c| 'UNLISTEN "' + c.to_s + '"'}
@connection.exec(unlisten_cmds.join(';'))
connection.exec(unlisten_cmds.join(';'))
drain_notify
end
end

def disconnect
@mutex.synchronize do
begin
@connection.close
connection.close
rescue => e
QC.log(:at => 'disconnect', :error => e.message)
end
Expand All @@ -61,12 +68,12 @@ def server_version

def wait_for_notify(t)
Array.new.tap do |msgs|
@connection.wait_for_notify(t) {|event, pid, msg| msgs << msg}
connection.wait_for_notify(t) {|event, pid, msg| msgs << msg}
end
end

def drain_notify
until @connection.notifies.nil?
until connection.notifies.nil?
QC.log(:at => "drain_notifications")
end
end
Expand Down

0 comments on commit 523cd7c

Please sign in to comment.