Skip to content

Commit

Permalink
Lock the rows when taking the advisory lock
Browse files Browse the repository at this point in the history
  • Loading branch information
ankithads committed Sep 4, 2024
1 parent 851b629 commit ee42d61
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 24 deletions.
47 changes: 41 additions & 6 deletions lib/que/adapters/active_record_with_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,20 @@
module Que
module Adapters
class ActiveRecordWithLock < Que::Adapters::ActiveRecord
METRICS = [
FindJobSecondsTotal = Prometheus::Client::Counter.new(
:que_find_job_seconds_total,
docstring: "Seconds spent finding a job",
labels: %i[queue],
),

FindJobHitTotal = Prometheus::Client::Counter.new(
:que_find_job_hit_total,
docstring: "total number of job hit and misses when acquiring a lock",
labels: %i[queue job_hit],
),
].freeze

def initialize(job_connection_pool:, lock_connection_pool:)
@job_connection_pool = job_connection_pool
@lock_connection_pool = lock_connection_pool
Expand Down Expand Up @@ -32,17 +46,23 @@ def execute(command, params = [])
end
end

# This method continues looping through the que_jobs table until it either
# locks a job successfully or determines that there are no jobs to process.
def lock_job_with_lock_database(queue, cursor)
result = []
loop do
result = Que.execute(:find_job_to_lock, [queue, cursor])
observe(duration_metric: FindJobSecondsTotal, labels: { queue: queue }) do
Que.transaction do
job_to_lock = Que.execute(:find_job_to_lock, [queue, cursor])
return job_to_lock if job_to_lock.empty?

break if result.empty?
cursor = job_to_lock.first["job_id"]
job_locked = pg_try_advisory_lock?(cursor)

cursor = result.first["job_id"]
break if pg_try_advisory_lock?(cursor)
observe(count_metric: FindJobHitTotal, labels: { queue: queue, job_hit: job_locked })
return job_to_lock if job_locked
end
end
end
result
end

def pg_try_advisory_lock?(job_id)
Expand All @@ -63,6 +83,21 @@ def unlock_job(job_id)
conn.execute("SELECT pg_advisory_unlock(#{job_id})")
end
end

def observe(count_metric: nil, duration_metric: nil, labels: {})
now = monotonic_now
yield if block_given?
ensure
count_metric&.increment(labels: labels)
duration_metric&.increment(
by: monotonic_now - now,
labels: labels,
)
end

def monotonic_now
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
end
end
end
1 change: 1 addition & 0 deletions lib/que/middleware/worker_collector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def initialize(app, options = {})
register(*WorkerGroup::METRICS)
register(*Worker::METRICS)
register(*Locker::METRICS)
register(*Adapters::ActiveRecordWithLock::METRICS)
end

def call(env)
Expand Down
1 change: 1 addition & 0 deletions lib/que/sql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ module Que
AND retryable = true
AND job_id >= $2
ORDER BY priority, run_at, job_id
FOR UPDATE SKIP LOCKED
LIMIT 1
},
}
Expand Down
18 changes: 0 additions & 18 deletions spec/integration/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,6 @@

# rubocop:disable RSpec/DescribeClass
RSpec.describe "multiple workers" do
def with_workers(num, stop_timeout: 5, secondary_queues: [], &block)
Que::WorkerGroup.start(
num,
wake_interval: 0.01,
secondary_queues: secondary_queues,
).tap(&block).stop(stop_timeout)
end

# Wait for a maximum of [timeout] seconds for all jobs to be worked
def wait_for_jobs_to_be_worked(timeout: 10)
start = Time.now
loop do
break if QueJob.count == 0 || Time.now - start > timeout

sleep 0.1
end
end

context "with one worker and many jobs" do
it "works each job exactly once" do
10.times.each { |i| FakeJob.enqueue(i) }
Expand Down
42 changes: 42 additions & 0 deletions spec/lib/que/adapters/active_record_with_lock_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# frozen_string_literal: true

require "spec_helper"

RSpec.describe Que::Adapters::ActiveRecordWithLock, :active_record_with_lock do
subject(:adapter) do
described_class.new(job_connection_pool: JobRecord.connection_pool,
lock_connection_pool: LockDatabaseRecord.connection_pool)
end

before do
described_class::FindJobHitTotal.values.each { |labels, _| labels.clear }
end

context "with enqueued jobs" do
before do
10.times do
FakeJob.enqueue(1)
end
end

it "sets correct metric values" do
expect(QueJob.count).to eq(10)
with_workers(5) { wait_for_jobs_to_be_worked }
expect(QueJob.count).to eq(0)
expect(described_class::FindJobHitTotal.values[{ :queue => "default", :job_hit => "true" }]).to eq(10.0)
end
end

describe ".lock_job_with_lock_database" do
subject(:lock_job) { adapter.lock_job_with_lock_database("default", 0) }

context "with no jobs enqueued" do
it "exists the loop and sets correct metric values" do
expect(QueJob.count).to eq(0)
locked_job = lock_job
expect(locked_job).to eq([])
expect(described_class::FindJobHitTotal.values[{ :queue => "default", :job_hit => "true" }]).to eq(0.0)
end
end
end
end
31 changes: 31 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ def establish_database_connection
Que.logger = Logger.new("/dev/null")

RSpec.configure do |config|
# Run only specific adapter files based on the adapter class
spec_dir = "./spec/lib"
# Construct the path for the adapter spec file
adapter_spec_class_path = File.join(spec_dir, "#{Que.adapter.class.to_s.underscore}_spec.rb")

# Exclude patterns for tests in the que/adapters directory
config.exclude_pattern = "**/que/adapters/*.rb"

# Require the adapter spec file if it exists
if File.exist?(adapter_spec_class_path)
require adapter_spec_class_path
end

config.before do
QueJob.delete_all
FakeJob.log = []
Expand All @@ -57,3 +70,21 @@ def establish_database_connection
Prometheus::Client.registry.instance_eval { @metrics.clear }
end
end

def with_workers(num, stop_timeout: 5, secondary_queues: [], &block)
Que::WorkerGroup.start(
num,
wake_interval: 0.01,
secondary_queues: secondary_queues,
).tap(&block).stop(stop_timeout)
end

# Wait for a maximum of [timeout] seconds for all jobs to be worked
def wait_for_jobs_to_be_worked(timeout: 10)
start = Time.now
loop do
break if QueJob.count == 0 || Time.now - start > timeout

sleep 0.1
end
end

0 comments on commit ee42d61

Please sign in to comment.