Skip to content

Commit

Permalink
refactor: create/update integrations in batch at end of transaction (#…
Browse files Browse the repository at this point in the history
…670)

* refactor: create/update integrations in batch at end of transaction

* docs: add comments
  • Loading branch information
bethesque authored Mar 21, 2024
1 parent 075f5db commit 92a97d5
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 14 deletions.
8 changes: 6 additions & 2 deletions lib/pact_broker/contracts/service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def publish(parsed_contracts, base_url: )
version, version_notices = create_version(parsed_contracts)
tags = create_tags(parsed_contracts, version)
pacts, pact_notices = create_pacts(parsed_contracts, base_url)
update_integrations(pacts)
create_or_update_integrations(pacts)
notices = version_notices + pact_notices
ContractsPublicationResults.from_hash(
pacticipant: version.pacticipant,
Expand Down Expand Up @@ -304,7 +304,11 @@ def url_for_triggered_webhook(triggered_webhook, base_url)
PactBroker::Api::PactBrokerUrls.triggered_webhook_logs_url(triggered_webhook, base_url)
end

def update_integrations(pacts)
# Creating/updating the integrations all at once at the end of the transaction instead
# of one by one, as each pact is created, reduces the amount of time that
# a lock is held on the integrations table, therefore reducing contention
# and potential for deadlocks when there are many pacts being published at once.
def create_or_update_integrations(pacts)
integration_service.handle_bulk_contract_data_published(pacts)
end

Expand Down
9 changes: 8 additions & 1 deletion lib/pact_broker/integrations/integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@

module PactBroker
module Integrations
class Integration < Sequel::Model(Sequel::Model.db[:integrations].select(:id, :consumer_id, :provider_id, :contract_data_updated_at))
# The columns are explicitly specified for the Integration object so that the consumer_name and provider_name columns aren't included
# in the model.
# Those columns exist in the integrations table because the integrations table used to be an integrations view based on the
# pact_publications table, and those columns existed in the view.
# When the view was migrated to be a table (in db/migrations/20211102_create_table_temp_integrations.rb and the following migrations)
# the columns had to be maintained for backwards compatiblity.
# They are not used by the current code, however.
class Integration < Sequel::Model(Sequel::Model.db[:integrations].select(:id, :consumer_id, :provider_id, :created_at, :contract_data_updated_at))
set_primary_key :id
plugin :insert_ignore, identifying_columns: [:consumer_id, :provider_id]
associate(:many_to_one, :consumer, :class => "PactBroker::Domain::Pacticipant", :key => :consumer_id, :primary_key => :id)
Expand Down
14 changes: 14 additions & 0 deletions lib/pact_broker/integrations/repository.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ def create_for_pact(consumer_id, provider_id)
nil
end

# Ensure an Integration exists for each consumer/provider pair.
# Using SELECT ... INSERT IGNORE rather than just INSERT IGNORE so that we do not
# need to lock the table at all when the integrations already exist, which will
# be the most common use case. New integrations get created incredibly rarely.
# The INSERT IGNORE is used rather than just INSERT to handle race conditions
# when requests come in parallel.
# @param [Array<Object>] where each object has a consumer and a provider
def create_for_pacts(objects_with_consumer_and_provider)
published_integrations = objects_with_consumer_and_provider.collect{ |i| { consumer_id: i.consumer.id, provider_id: i.provider.id } }
existing_integrations = Sequel::Model.db[:integrations].select(:consumer_id, :provider_id).where(Sequel.|(*published_integrations) ).all
new_integrations = (published_integrations - existing_integrations).collect{ |i| i.merge(created_at: Sequel.datetime_class.now, contract_data_updated_at: Sequel.datetime_class.now) }
Integration.dataset.insert_ignore.multi_insert(new_integrations)
end

def delete(consumer_id, provider_id)
Integration.where(consumer_id: consumer_id, provider_id: provider_id).delete
end
Expand Down
2 changes: 2 additions & 0 deletions lib/pact_broker/integrations/service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ def self.find_all(filter_options = {}, pagination_options = {}, eager_load_assoc
# @param [PactBroker::Domain::Pacticipant] consumer or nil
# @param [PactBroker::Domain::Pacticipant] provider
def self.handle_contract_data_published(consumer, provider)
integration_repository.create_for_pact(consumer.id, provider.id)
integration_repository.set_contract_data_updated_at(consumer, provider)
end


# Callback to invoke when a batch of contract data is published (eg. the publish contracts endpoint)
# @param [Array<Object>] where each object has a consumer and a provider
def self.handle_bulk_contract_data_published(objects_with_consumer_and_provider)
integration_repository.create_for_pacts(objects_with_consumer_and_provider)
integration_repository.set_contract_data_updated_at_for_multiple_integrations(objects_with_consumer_and_provider)
end

Expand Down
4 changes: 2 additions & 2 deletions lib/pact_broker/pacts/repository.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def unscoped(scope)
scope
end

def create params
integration_repository.create_for_pact(params.fetch(:consumer_id), params.fetch(:provider_id))
# @return [PactBroker::Domain::Pact]
def create(params)
pact_version = find_or_create_pact_version(
params.fetch(:consumer_id),
params.fetch(:provider_id),
Expand Down
11 changes: 8 additions & 3 deletions lib/pact_broker/test/test_data_builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ class TestDataBuilder
include PactBroker::Services
using PactBroker::StringRefinements


attr_reader :pacticipant
attr_reader :consumer
attr_reader :provider
Expand Down Expand Up @@ -192,8 +191,11 @@ def create_provider provider_name = "Provider #{model_counter}", params = {}
self
end

# Create an Integration object for the current consumer and provider
# @return [PactBroker::Test::TestDataBuilder]
def create_integration
PactBroker::Integrations::Repository.new.create_for_pact(consumer.id, provider.id)
@integration = PactBroker::Integrations::Repository.new.create_for_pact(consumer.id, provider.id)
set_created_at_if_set(@now, :integrations, { consumer_id: consumer.id, provider_id: provider.id })
self
end

Expand Down Expand Up @@ -280,7 +282,9 @@ def publish_pact(consumer_name:, provider_name:, consumer_version_number: , tags
self
end

def create_pact params = {}
# Creates a pact (and integration if one does not already exist) from the given params
# @return [PactBroker::Test::TestDataBuilder]
def create_pact(params = {})
params.delete(:comment)
json_content = params[:json_content] || default_json_content
pact_version_sha = params[:pact_version_sha] || generate_pact_version_sha(json_content)
Expand All @@ -293,6 +297,7 @@ def create_pact params = {}
json_content: prepare_json_content(json_content),
version: @consumer_version
)
integration_service.handle_bulk_contract_data_published([@pact])
pact_versions_count_after = PactBroker::Pacts::PactVersion.count
set_created_at_if_set(params[:created_at], :pact_publications, id: @pact.id)
set_created_at_if_set(params[:created_at], :pact_versions, sha: @pact.pact_version_sha) if pact_versions_count_after > pact_versions_count_before
Expand Down
40 changes: 40 additions & 0 deletions spec/lib/pact_broker/integrations/repository_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,46 @@ module Integrations
end
end

describe "#create_for_pacts" do
before do
Timecop.freeze(Date.today - 5) do
td.create_consumer("A")
.create_provider("B")
.create_integration
.create_pacticipant("C")
.create_pacticipant("D")
end
end

let(:objects_with_consumer_and_provider) do
[
double("i1", consumer: td.find_pacticipant("A"), provider: td.find_pacticipant("B")),
double("i2", consumer: td.find_pacticipant("C"), provider: td.find_pacticipant("D"))
]
end

subject { Repository.new.create_for_pacts(objects_with_consumer_and_provider) }

it "inserts any missing integrations" do
now = Time.utc(2024)
Timecop.freeze(now) do
subject
end

integrations = Integration.eager(:consumer, :provider).order(:id).all
expect(integrations).to contain_exactly(
have_attributes(consumer_name: "A", provider_name: "B"),
have_attributes(consumer_name: "C", provider_name: "D")
)
expect(integrations.last.created_at).to be_date_time(now)
expect(integrations.last.contract_data_updated_at).to be_date_time(now)
end

it "does not change the created_at or contract_data_updated_at of the existing integrations" do
expect { subject }.to_not change { Integration.order(:id).select(:created_at, :contract_data_updated_at).first.created_at }
end
end

describe "#set_contract_data_updated_at" do
before do
# A -> B
Expand Down
6 changes: 0 additions & 6 deletions spec/lib/pact_broker/pacts/repository_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,6 @@ module Pacts
expect(PactVersion.order(:id).last.messages_count).to eq 0
end

it "creates an integration" do
expect { subject }.to change {
PactBroker::Integrations::Integration.where(consumer_id: consumer.id, provider_id: provider.id).count
}.from(0).to(1)
end

context "when a pact already exists with exactly the same content" do
let(:another_version) { Versions::Repository.new.create number: "2.0.0", pacticipant_id: consumer.id }

Expand Down

0 comments on commit 92a97d5

Please sign in to comment.