From 92a97d5b7aa04b51747049c39869cf39b04c3d14 Mon Sep 17 00:00:00 2001 From: Beth Skurrie Date: Thu, 21 Mar 2024 11:47:57 +1100 Subject: [PATCH] refactor: create/update integrations in batch at end of transaction (#670) * refactor: create/update integrations in batch at end of transaction * docs: add comments --- lib/pact_broker/contracts/service.rb | 8 +++- lib/pact_broker/integrations/integration.rb | 9 ++++- lib/pact_broker/integrations/repository.rb | 14 +++++++ lib/pact_broker/integrations/service.rb | 2 + lib/pact_broker/pacts/repository.rb | 4 +- lib/pact_broker/test/test_data_builder.rb | 11 +++-- .../integrations/repository_spec.rb | 40 +++++++++++++++++++ spec/lib/pact_broker/pacts/repository_spec.rb | 6 --- 8 files changed, 80 insertions(+), 14 deletions(-) diff --git a/lib/pact_broker/contracts/service.rb b/lib/pact_broker/contracts/service.rb index 7776bcecb..698430a65 100644 --- a/lib/pact_broker/contracts/service.rb +++ b/lib/pact_broker/contracts/service.rb @@ -35,7 +35,7 @@ def publish(parsed_contracts, base_url: ) version, version_notices = create_version(parsed_contracts) tags = create_tags(parsed_contracts, version) pacts, pact_notices = create_pacts(parsed_contracts, base_url) - update_integrations(pacts) + create_or_update_integrations(pacts) notices = version_notices + pact_notices ContractsPublicationResults.from_hash( pacticipant: version.pacticipant, @@ -304,7 +304,11 @@ def url_for_triggered_webhook(triggered_webhook, base_url) PactBroker::Api::PactBrokerUrls.triggered_webhook_logs_url(triggered_webhook, base_url) end - def update_integrations(pacts) + # Creating/updating the integrations all at once at the end of the transaction instead + # of one by one, as each pact is created, reduces the amount of time that + # a lock is held on the integrations table, therefore reducing contention + # and potential for deadlocks when there are many pacts being published at once. 
+ def create_or_update_integrations(pacts) integration_service.handle_bulk_contract_data_published(pacts) end diff --git a/lib/pact_broker/integrations/integration.rb b/lib/pact_broker/integrations/integration.rb index 78c6750ab..860d7b9ce 100644 --- a/lib/pact_broker/integrations/integration.rb +++ b/lib/pact_broker/integrations/integration.rb @@ -7,7 +7,14 @@ module PactBroker module Integrations - class Integration < Sequel::Model(Sequel::Model.db[:integrations].select(:id, :consumer_id, :provider_id, :contract_data_updated_at)) + # The columns are explicitly specified for the Integration object so that the consumer_name and provider_name columns aren't included + # in the model. + # Those columns exist in the integrations table because the integrations table used to be an integrations view based on the + # pact_publications table, and those columns existed in the view. + # When the view was migrated to be a table (in db/migrations/20211102_create_table_temp_integrations.rb and the following migrations) + # the columns had to be maintained for backwards compatibility. + # They are not used by the current code, however. + class Integration < Sequel::Model(Sequel::Model.db[:integrations].select(:id, :consumer_id, :provider_id, :created_at, :contract_data_updated_at)) set_primary_key :id plugin :insert_ignore, identifying_columns: [:consumer_id, :provider_id] associate(:many_to_one, :consumer, :class => "PactBroker::Domain::Pacticipant", :key => :consumer_id, :primary_key => :id) diff --git a/lib/pact_broker/integrations/repository.rb b/lib/pact_broker/integrations/repository.rb index 73f6912b4..7568eed66 100644 --- a/lib/pact_broker/integrations/repository.rb +++ b/lib/pact_broker/integrations/repository.rb @@ -28,6 +28,20 @@ def create_for_pact(consumer_id, provider_id) nil end + # Ensure an Integration exists for each consumer/provider pair. + # Using SELECT ... 
INSERT IGNORE rather than just INSERT IGNORE so that we do not + # need to lock the table at all when the integrations already exist, which will + # be the most common use case. New integrations get created incredibly rarely. + # The INSERT IGNORE is used rather than just INSERT to handle race conditions + # when requests come in parallel. + # @param [Array] where each object has a consumer and a provider + def create_for_pacts(objects_with_consumer_and_provider) + published_integrations = objects_with_consumer_and_provider.collect{ |i| { consumer_id: i.consumer.id, provider_id: i.provider.id } } + existing_integrations = Sequel::Model.db[:integrations].select(:consumer_id, :provider_id).where(Sequel.|(*published_integrations) ).all + new_integrations = (published_integrations - existing_integrations).collect{ |i| i.merge(created_at: Sequel.datetime_class.now, contract_data_updated_at: Sequel.datetime_class.now) } + Integration.dataset.insert_ignore.multi_insert(new_integrations) + end + def delete(consumer_id, provider_id) Integration.where(consumer_id: consumer_id, provider_id: provider_id).delete end diff --git a/lib/pact_broker/integrations/service.rb b/lib/pact_broker/integrations/service.rb index d291180cf..5f11c92b0 100644 --- a/lib/pact_broker/integrations/service.rb +++ b/lib/pact_broker/integrations/service.rb @@ -21,6 +21,7 @@ def self.find_all(filter_options = {}, pagination_options = {}, eager_load_assoc # @param [PactBroker::Domain::Pacticipant] consumer or nil # @param [PactBroker::Domain::Pacticipant] provider def self.handle_contract_data_published(consumer, provider) + integration_repository.create_for_pact(consumer.id, provider.id) integration_repository.set_contract_data_updated_at(consumer, provider) end @@ -28,6 +29,7 @@ def self.handle_contract_data_published(consumer, provider) # Callback to invoke when a batch of contract data is published (eg. 
the publish contracts endpoint) # @param [Array] where each object has a consumer and a provider def self.handle_bulk_contract_data_published(objects_with_consumer_and_provider) + integration_repository.create_for_pacts(objects_with_consumer_and_provider) integration_repository.set_contract_data_updated_at_for_multiple_integrations(objects_with_consumer_and_provider) end diff --git a/lib/pact_broker/pacts/repository.rb b/lib/pact_broker/pacts/repository.rb index c5f816b8f..f1e05291d 100644 --- a/lib/pact_broker/pacts/repository.rb +++ b/lib/pact_broker/pacts/repository.rb @@ -33,8 +33,8 @@ def unscoped(scope) scope end - def create params - integration_repository.create_for_pact(params.fetch(:consumer_id), params.fetch(:provider_id)) + # @return [PactBroker::Domain::Pact] + def create(params) pact_version = find_or_create_pact_version( params.fetch(:consumer_id), params.fetch(:provider_id), diff --git a/lib/pact_broker/test/test_data_builder.rb b/lib/pact_broker/test/test_data_builder.rb index 92d6bbcbc..59a2270ea 100644 --- a/lib/pact_broker/test/test_data_builder.rb +++ b/lib/pact_broker/test/test_data_builder.rb @@ -43,7 +43,6 @@ class TestDataBuilder include PactBroker::Services using PactBroker::StringRefinements - attr_reader :pacticipant attr_reader :consumer attr_reader :provider @@ -192,8 +191,11 @@ def create_provider provider_name = "Provider #{model_counter}", params = {} self end + # Create an Integration object for the current consumer and provider + # @return [PactBroker::Test::TestDataBuilder] def create_integration - PactBroker::Integrations::Repository.new.create_for_pact(consumer.id, provider.id) + @integration = PactBroker::Integrations::Repository.new.create_for_pact(consumer.id, provider.id) + set_created_at_if_set(@now, :integrations, { consumer_id: consumer.id, provider_id: provider.id }) self end @@ -280,7 +282,9 @@ def publish_pact(consumer_name:, provider_name:, consumer_version_number: , tags self end - def create_pact params = {} + # 
Creates a pact (and integration if one does not already exist) from the given params + # @return [PactBroker::Test::TestDataBuilder] + def create_pact(params = {}) params.delete(:comment) json_content = params[:json_content] || default_json_content pact_version_sha = params[:pact_version_sha] || generate_pact_version_sha(json_content) @@ -293,6 +297,7 @@ def create_pact params = {} json_content: prepare_json_content(json_content), version: @consumer_version ) + integration_service.handle_bulk_contract_data_published([@pact]) pact_versions_count_after = PactBroker::Pacts::PactVersion.count set_created_at_if_set(params[:created_at], :pact_publications, id: @pact.id) set_created_at_if_set(params[:created_at], :pact_versions, sha: @pact.pact_version_sha) if pact_versions_count_after > pact_versions_count_before diff --git a/spec/lib/pact_broker/integrations/repository_spec.rb b/spec/lib/pact_broker/integrations/repository_spec.rb index 243be5fbf..dd21a882a 100644 --- a/spec/lib/pact_broker/integrations/repository_spec.rb +++ b/spec/lib/pact_broker/integrations/repository_spec.rb @@ -38,6 +38,46 @@ module Integrations end end + describe "#create_for_pacts" do + before do + Timecop.freeze(Date.today - 5) do + td.create_consumer("A") + .create_provider("B") + .create_integration + .create_pacticipant("C") + .create_pacticipant("D") + end + end + + let(:objects_with_consumer_and_provider) do + [ + double("i1", consumer: td.find_pacticipant("A"), provider: td.find_pacticipant("B")), + double("i2", consumer: td.find_pacticipant("C"), provider: td.find_pacticipant("D")) + ] + end + + subject { Repository.new.create_for_pacts(objects_with_consumer_and_provider) } + + it "inserts any missing integrations" do + now = Time.utc(2024) + Timecop.freeze(now) do + subject + end + + integrations = Integration.eager(:consumer, :provider).order(:id).all + expect(integrations).to contain_exactly( + have_attributes(consumer_name: "A", provider_name: "B"), + have_attributes(consumer_name: 
"C", provider_name: "D") + ) + expect(integrations.last.created_at).to be_date_time(now) + expect(integrations.last.contract_data_updated_at).to be_date_time(now) + end + + it "does not change the created_at or contract_data_updated_at of the existing integrations" do + expect { subject }.to_not change { Integration.order(:id).select(:created_at, :contract_data_updated_at).first.created_at } + end + end + describe "#set_contract_data_updated_at" do before do # A -> B diff --git a/spec/lib/pact_broker/pacts/repository_spec.rb b/spec/lib/pact_broker/pacts/repository_spec.rb index 8a9fe65a9..68952e257 100644 --- a/spec/lib/pact_broker/pacts/repository_spec.rb +++ b/spec/lib/pact_broker/pacts/repository_spec.rb @@ -73,12 +73,6 @@ module Pacts expect(PactVersion.order(:id).last.messages_count).to eq 0 end - it "creates an integration" do - expect { subject }.to change { - PactBroker::Integrations::Integration.where(consumer_id: consumer.id, provider_id: provider.id).count - }.from(0).to(1) - end - context "when a pact already exists with exactly the same content" do let(:another_version) { Versions::Repository.new.create number: "2.0.0", pacticipant_id: consumer.id }