Skip to content

Commit

Permalink
feat(clean): use postgres advisory locks to ensure only one process c…
Browse files Browse the repository at this point in the history
…an run a clean at a time (#672)
  • Loading branch information
bethesque authored Mar 27, 2024
1 parent 92a97d5 commit 637c25f
Show file tree
Hide file tree
Showing 7 changed files with 443 additions and 35 deletions.
33 changes: 24 additions & 9 deletions lib/pact_broker/app.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
require "pact_broker/api/authorization/resource_access_policy"
require "pact_broker/api/middleware/http_debug_logs"
require "pact_broker/application_context"
require "pact_broker/db/advisory_lock"

module PactBroker

Expand Down Expand Up @@ -101,22 +102,19 @@ def post_configure

def prepare_database
logger.info "Database schema version is #{PactBroker::DB.version(configuration.database_connection)}"
lock = PactBroker::DB::AdvisoryLock.new(configuration.database_connection, :migrate, :pg_advisory_lock)
if configuration.auto_migrate_db
migration_options = { allow_missing_migration_files: configuration.allow_missing_migration_files }
if PactBroker::DB.is_current?(configuration.database_connection, migration_options)
logger.info "Skipping database migrations as the latest migration has already been applied"
else
logger.info "Migrating database schema"
PactBroker::DB.run_migrations configuration.database_connection, migration_options
logger.info "Database schema version is now #{PactBroker::DB.version(configuration.database_connection)}"
lock.with_lock do
ensure_all_database_migrations_are_applied
end
else
logger.info "Skipping database schema migrations as database auto migrate is disabled"
end

if configuration.auto_migrate_db_data
logger.info "Migrating data"
PactBroker::DB.run_data_migrations configuration.database_connection
lock.with_lock do
run_data_migrations
end
else
logger.info "Skipping data migrations"
end
Expand All @@ -125,6 +123,23 @@ def prepare_database
PactBroker::Webhooks::Service.fail_retrying_triggered_webhooks
end

def ensure_all_database_migrations_are_applied
migration_options = { allow_missing_migration_files: configuration.allow_missing_migration_files }

if PactBroker::DB.is_current?(configuration.database_connection, migration_options)
logger.info "Skipping database migrations as the latest migration has already been applied"
else
logger.info "Migrating database schema"
PactBroker::DB.run_migrations(configuration.database_connection, migration_options)
logger.info "Database schema version is now #{PactBroker::DB.version(configuration.database_connection)}"
end
end

def run_data_migrations
logger.info "Migrating data"
PactBroker::DB.run_data_migrations(configuration.database_connection)
end

def load_configuration_from_database
configuration.load_from_database!
end
Expand Down
58 changes: 58 additions & 0 deletions lib/pact_broker/db/advisory_lock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
require "pact_broker/logging"

# Uses a Postgres advisory lock to ensure that a given block of code can only have ONE
# thread in excution at a time against a given database.
# When the database is not Postgres, the block will yield without any locks, allowing
# this class to be used safely with other database types, but without the locking
# functionality.
#
# This is a wrapper around the actual implementation code in the Sequel extension from https://github.com/yuryroot/sequel-pg_advisory_lock
# which was copied into this codebase and modified for usage in this codebase.
#
# See https://www.postgresql.org/docs/16/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS for docs on lock types
#

module PactBroker
module DB
class AdvisoryLock
include PactBroker::Logging

def initialize(database_connection, name, type = :pg_try_advisory_lock)
@database_connection = database_connection
@name = name
@type = type
@lock_obtained = false
register_advisory_lock if postgres?
end

def with_lock
if postgres?
@database_connection.with_advisory_lock(@name) do
logger.debug("Lock #{@name} obtained")
@lock_obtained = true
yield
end
else
logger.warn("Executing block without lock as this is not a Postgres database")
@lock_obtained = true
yield
end
end

def lock_obtained?
@lock_obtained
end

private

def postgres?
@database_connection.adapter_scheme.to_s =~ /postgres/
end

def register_advisory_lock
@database_connection.extension :pg_advisory_lock
@database_connection.register_advisory_lock(@name, @type)
end
end
end
end
94 changes: 68 additions & 26 deletions lib/pact_broker/tasks/clean_task.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# This task is used to clean up old data in a Pact Broker database
# to stop performance issues from slowing down responses when there is
# too much data.
# See https://docs.pact.io/pact_broker/administration/maintenance

module PactBroker
module DB
class CleanTask < ::Rake::TaskLib
Expand All @@ -7,11 +12,13 @@ class CleanTask < ::Rake::TaskLib
attr_accessor :version_deletion_limit
attr_accessor :logger
attr_accessor :dry_run
attr_accessor :use_lock # allow disabling of postgres lock if it is causing problems

def initialize &block
require "pact_broker/db/clean_incremental"
@version_deletion_limit = 1000
@dry_run = false
@use_lock = true
@keep_version_selectors = PactBroker::DB::CleanIncremental::DEFAULT_KEEP_SELECTORS
rake_task(&block)
end
Expand All @@ -28,42 +35,77 @@ def rake_task &block
namespace :db do
desc "Clean unnecessary pacts and verifications from database"
task :clean do | _t, _args |

instance_eval(&block)

require "pact_broker/db/clean_incremental"
require "pact_broker/error"
require "yaml"
require "benchmark"
with_lock do
perform_clean
end
end
end
end
end

def perform_clean
require "pact_broker/db/clean_incremental"
require "pact_broker/error"
require "yaml"
require "benchmark"

raise PactBroker::Error.new("You must specify the version_deletion_limit") unless version_deletion_limit
raise PactBroker::Error.new("You must specify the version_deletion_limit") unless version_deletion_limit

prefix = dry_run ? "[DRY RUN] " : ""
if keep_version_selectors.nil? || keep_version_selectors.empty?
raise PactBroker::Error.new("You must specify which versions to keep")
else
add_defaults_to_keep_selectors
output "Deleting oldest #{version_deletion_limit} versions, keeping versions that match the configured selectors", keep_version_selectors.collect(&:to_hash)
end

if keep_version_selectors.nil? || keep_version_selectors.empty?
raise PactBroker::Error.new("You must specify which versions to keep")
else
add_defaults_to_keep_selectors
output "#{prefix}Deleting oldest #{version_deletion_limit} versions, keeping versions that match the configured selectors", keep_version_selectors.collect(&:to_hash)
end
start_time = Time.now
results = PactBroker::DB::CleanIncremental.call(database_connection,
keep: keep_version_selectors,
limit: version_deletion_limit,
logger: logger,
dry_run: dry_run
)
end_time = Time.now
elapsed_seconds = (end_time - start_time).to_i
output "Results (#{elapsed_seconds} seconds)", results
end

start_time = Time.now
results = PactBroker::DB::CleanIncremental.call(database_connection,
keep: keep_version_selectors,
limit: version_deletion_limit,
logger: logger,
dry_run: dry_run
)
end_time = Time.now
elapsed_seconds = (end_time - start_time).to_i
output "Results (#{elapsed_seconds} seconds)", results
end
# Use a Postgres advisory lock to ensure that only one clean can run at a time.
# This allows a cron schedule to be used on the Pact Broker Docker image when deployed
# on a multi-instance architecture, without all the instances stepping on each other's toes.
#
# Any tasks that attempt to run while a clean job is running will skip the clean
# and exit with a message and a success code.
#
# To test that the lock works, run:
# script/docker/db-start.sh
# script/docker/db-migrate.sh
# for i in {0..3}; do PACT_BROKER_TEST_DATABASE_URL=postgres://postgres:postgres@localhost/postgres bundle exec rake pact_broker:db:clean &; done;
#
# There will be 3 messages saying "Clean was not performed" and output from one thread showing the clean is being done.
def with_lock
if use_lock
require "pact_broker/db/advisory_lock"

lock = PactBroker::DB::AdvisoryLock.new(database_connection, :clean, :pg_try_advisory_lock)
results = lock.with_lock do
yield
end

if !lock.lock_obtained?
output("Clean was not performed as a clean is already in progress. Exiting.")
end
results
else
yield
end
end

def output string, payload = {}
logger ? logger.info(string, payload) : puts("#{string} #{payload.to_json}")
def output(string, payload = {})
prefix = dry_run ? "[DRY RUN] " : ""
logger ? logger.info("#{prefix}#{string}") : puts("#{prefix}#{string} #{payload.to_json}")
end

def add_defaults_to_keep_selectors
Expand Down
101 changes: 101 additions & 0 deletions lib/sequel/extensions/pg_advisory_lock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Copied with thanks from https://github.com/yuryroot/sequel-pg_advisory_lock/blob/d7509aa/lib/sequel/extensions/pg_advisory_lock.rb
# The reason this is copy/pasted and modified is that I wanted to allow exact duplicate
# locks to be registered because different threads running the same code
# should not cause a Sequel::Error to be raised.
# Also, I wanted it to use Concurrent::Hash for multi-threaded environments.

require "sequel"
require "zlib"
require "concurrent/hash"

module Sequel
module Postgres
module PgAdvisoryLock

SESSION_LEVEL_LOCKS = [
:pg_advisory_lock,
:pg_try_advisory_lock
].freeze

TRANSACTION_LEVEL_LOCKS = [
:pg_advisory_xact_lock,
:pg_try_advisory_xact_lock
].freeze

LOCK_FUNCTIONS = (SESSION_LEVEL_LOCKS + TRANSACTION_LEVEL_LOCKS).freeze

DEFAULT_LOCK_FUNCTION = :pg_advisory_lock
UNLOCK_FUNCTION = :pg_advisory_unlock

class LockAlreadyRegistered < Sequel::Error; end

def registered_advisory_locks
@registered_advisory_locks ||= Concurrent::Hash.new
end

def with_advisory_lock(name, id = nil)
options = registered_advisory_locks.fetch(name.to_sym)

lock_key = options.fetch(:key)
function_params = [lock_key, id].compact

lock_function = options.fetch(:lock_function)
transaction_level_lock = TRANSACTION_LEVEL_LOCKS.include?(lock_function)

if transaction_level_lock
# TODO: It's allowed to specify additional options (in particular, :server)
# while opening database transaction.
# That's why this check must be smarter.
unless in_transaction?
raise Error, "Transaction must be manually opened before using transaction level lock '#{lock_function}'"
end

if get(Sequel.function(lock_function, *function_params))
yield
end
else
synchronize do
if get(Sequel.function(lock_function, *function_params))
begin
result = yield
ensure
get(Sequel.function(UNLOCK_FUNCTION, *function_params))
result
end
end
end
end
end

# Beth: not sure how much extra value this registration provides.
# It turns the name into a number, and makes sure the name/number is unique,
# and that you don't try and use a different lock function with the same name.
def register_advisory_lock(name, lock_function = DEFAULT_LOCK_FUNCTION)
name = name.to_sym

if registered_advisory_locks.key?(name) && registered_advisory_locks[name][:lock_function] != lock_function
raise LockAlreadyRegistered, "Lock with name :#{name} is already registered with a different lock function (#{registered_advisory_locks[name][:lock_function]})"
end

key = advisory_lock_key_for(name)
name_for_key = registered_advisory_locks.keys.find { |n| registered_advisory_locks[n].fetch(:key) == key }
if name_for_key && name_for_key != name
raise Error, "Lock key #{key} is already taken"
end

function = lock_function.to_sym
unless LOCK_FUNCTIONS.include?(function)
raise Error, "Invalid lock function :#{function}"
end

registered_advisory_locks[name] = { key: key, lock_function: function }
end

def advisory_lock_key_for(lock_name)
Zlib.crc32(lock_name.to_s) % 2 ** 31
end
end
end

Database.register_extension(:pg_advisory_lock, Postgres::PgAdvisoryLock)
end
3 changes: 3 additions & 0 deletions script/docker/db-migrate.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash

PACT_BROKER_TEST_DATABASE_URL=postgres://postgres:postgres@localhost/postgres bundle exec rake pact_broker:db:migrate
Loading

0 comments on commit 637c25f

Please sign in to comment.