Skip to content

Commit

Permalink
feat: add contract_data_updated_at to integrations table to speed up …
Browse files Browse the repository at this point in the history
…dashboard query (#617)

PACT-1070
  • Loading branch information
bethesque authored Jun 16, 2023
1 parent 5615fd5 commit e43c10f
Show file tree
Hide file tree
Showing 18 changed files with 321 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Sequel.migration do
up do
alter_table(:integrations) do
add_column(:contract_data_updated_at, DateTime)
end
end

down do
alter_table(:integrations) do
drop_column(:contract_data_updated_at)
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
require "pact_broker/db/data_migrations/set_contract_data_updated_at_for_integrations"

Sequel.migration do
up do
PactBroker::DB::DataMigrations::SetContractDataUpdatedAtForIntegrations.call(self)
end

down do

end
end
1 change: 1 addition & 0 deletions lib/pact_broker/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require "pact_broker/api/contracts"
require "pact_broker/application_context"
require "pact_broker/feature_toggle"
require "pact_broker/initializers/subscriptions"

module Webmachine
class Request
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Populate the newly created contract_data_updated_at date in the integrations table
# using the latest created_at date from the pact_publications or verifications tables.
module PactBroker
module DB
module DataMigrations
class SetContractDataUpdatedAtForIntegrations
def self.call(connection)
join = {
Sequel[:integrations][:consumer_id] => Sequel[:target][:consumer_id],
Sequel[:integrations][:provider_id] => Sequel[:target][:provider_id]
}

max_created_at_for_each_integration = integrations_max_created_at(connection).from_self(alias: :target).select(:created_at).where(join)

connection[:integrations]
.where(contract_data_updated_at: nil)
.update(contract_data_updated_at: max_created_at_for_each_integration)
end

# @return [Sequel::Dataset] the overall max created_at from the union of the pact_publications and verifications tables,
# for each integration keyed by consumer_id/provider_id
def self.integrations_max_created_at(connection)
pact_publication_max_created_at(connection)
.union(verification_max_created_at(connection))
.select_group(:consumer_id, :provider_id)
.select_append{ max(:created_at).as(:created_at) }
end

# @return [Sequel::Dataset] the max created_at from the pact_publications table
# for each integration keyed by consumer_id/provider_id
def self.pact_publication_max_created_at(connection)
connection[:pact_publications]
.select_group(:consumer_id, :provider_id)
.select_append{ max(:created_at).as(:created_at) }
end

# @return [Sequel::Dataset] the max created_at from the verifications table
# for each integration keyed by consumer_id/provider_id
def self.verification_max_created_at(connection)
connection[:verifications]
.select_group(:consumer_id, :provider_id)
.select_append{ max(:created_at).as(:created_at) }
end
end
end
end
end
1 change: 1 addition & 0 deletions lib/pact_broker/db/migrate_data.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def self.call database_connection, _options = {}
DataMigrations::CreateBranches.call(database_connection)
DataMigrations::MigrateIntegrations.call(database_connection)
DataMigrations::MigratePactVersionProviderTagSuccessfulVerifications.call(database_connection)
DataMigrations::SetContractDataUpdatedAtForIntegrations.call(database_connection)
end
end
end
Expand Down
16 changes: 12 additions & 4 deletions lib/pact_broker/events/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,19 @@ module Events
extend self

def subscribe(*args)
result = nil
TemporaryListeners.subscribe(*args) do
result = yield
if block_given?
result = nil
TemporaryListeners.subscribe(*args) do
result = yield
end
result
else
Wisper.subscribe(*args)
end
result
end

def unsubscribe(*args)
Wisper.unsubscribe(*args)
end
end
end
4 changes: 4 additions & 0 deletions lib/pact_broker/initializers/subscriptions.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
require "pact_broker/events/subscriber"
require "pact_broker/integrations/event_listener"

PactBroker::Events.subscribe(PactBroker::Integrations::EventListener.new)
23 changes: 23 additions & 0 deletions lib/pact_broker/integrations/event_listener.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
require "pact_broker/services"

# Listens for events that happen in the Pact Broker that are relevant to the Integrations objects.

module PactBroker
module Integrations
class EventListener
include PactBroker::Services

# @param [Hash] params the params from the broadcast event
# @option params [PactBroker::Domain::Pact] :pact the newly published pact
def contract_published(params)
integration_service.handle_contract_data_published(params.fetch(:pact).consumer, params.fetch(:pact).provider)
end

# @param [Hash] params the params from the broadcast event
# @option params [PactBroker::Domain::Verification] :verification the newly published verification
def provider_verification_published(params)
integration_service.handle_contract_data_published(params.fetch(:verification).consumer, params.fetch(:verification).provider)
end
end
end
end
5 changes: 4 additions & 1 deletion lib/pact_broker/integrations/integration.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require "pact_broker/dataset"
require "pact_broker/verifications/pseudo_branch_status"
require "pact_broker/domain/verification"
require "pact_broker/webhooks/latest_triggered_webhook"
Expand All @@ -6,7 +7,7 @@

module PactBroker
module Integrations
class Integration < Sequel::Model(Sequel::Model.db[:integrations].select(:id, :consumer_id, :provider_id))
class Integration < Sequel::Model(Sequel::Model.db[:integrations].select(:id, :consumer_id, :provider_id, :contract_data_updated_at))
set_primary_key :id
plugin :insert_ignore, identifying_columns: [:consumer_id, :provider_id]
associate(:many_to_one, :consumer, :class => "PactBroker::Domain::Pacticipant", :key => :consumer_id, :primary_key => :id)
Expand Down Expand Up @@ -75,6 +76,8 @@ class Integration < Sequel::Model(Sequel::Model.db[:integrations].select(:id, :c
end)

dataset_module do
include PactBroker::Dataset

def including_pacticipant_id(pacticipant_id)
where(consumer_id: pacticipant_id).or(provider_id: pacticipant_id)
end
Expand Down
9 changes: 9 additions & 0 deletions lib/pact_broker/integrations/repository.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ def create_for_pact(consumer_id, provider_id)
def delete(consumer_id, provider_id)
Integration.where(consumer_id: consumer_id, provider_id: provider_id).delete
end

# Sets the contract_data_updated_at for the integration(s) as specified by the consumer and provider
# @param [PactBroker::Domain::Pacticipant, nil] consumer the consumer for the integration, or nil if for a provider-only event (eg. Pactflow provider contract published)
# @param [PactBroker::Domain::Pacticipant] provider the provider for the integration
def set_contract_data_updated_at(consumer, provider)
Integration
.where({ consumer_id: consumer&.id, provider_id: provider.id }.compact )
.update(contract_data_updated_at: Sequel.datetime_class.now)
end
end
end
end
7 changes: 7 additions & 0 deletions lib/pact_broker/integrations/service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ def self.find_all
.sort { | a, b| Integration.compare_by_last_action_date(a, b) }
end

# Callback to invoke when a consumer contract, verification result (or provider contract in Pactflow) is published
# @param [PactBroker::Domain::Pacticipant] consumer or nil
# @param [PactBroker::Domain::Pacticipant] provider
def self.handle_contract_data_published(consumer, provider)
integration_repository.set_contract_data_updated_at(consumer, provider)
end

def self.delete(consumer_name, provider_name)
consumer = pacticipant_service.find_pacticipant_by_name!(consumer_name)
provider = pacticipant_service.find_pacticipant_by_name!(provider_name)
Expand Down
4 changes: 4 additions & 0 deletions lib/pact_broker/test/test_data_builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,10 @@ def and_return instance_variable_name
instance_variable_get("@#{instance_variable_name}")
end

def clear_now
@now = nil
end

def set_now date
@now = date.to_date
self
Expand Down
18 changes: 0 additions & 18 deletions spec/features/publish_verification_results_and_version_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,4 @@
expect(last_response.status).to be 200
expect(JSON.parse(subject.body)).to include JSON.parse(verification_content)
end

context "with a webhook configured", job: true do
before do
td.create_webhook(
method: "POST",
url: "http://example.org",
events: [{ name: PactBroker::Webhooks::WebhookEvent::VERIFICATION_PUBLISHED }]
)
end
let!(:request) do
stub_request(:post, "http://example.org").to_return(:status => 200)
end

it "executes the webhook" do
subject
expect(request).to have_been_made
end
end
end
6 changes: 6 additions & 0 deletions spec/integration/endpoints/publish_contracts_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
let(:rack_headers) { { "CONTENT_TYPE" => "application/json", "HTTP_ACCEPT" => "application/hal+json" } }
let(:encoded_contract) { Base64.strict_encode64(contract) }
let(:path) { "/contracts/publish" }
let(:contract) { td.fixed_json_content("Foo", "Bar", "1") }

subject { post(path, request_body_hash.to_json, rack_headers) }

Expand All @@ -30,4 +31,9 @@
its(:status) { is_expected.to eq 400 }
its(:body) { is_expected.to include("non UTF-8 character") }
end

it "sets the contract_data_updated_at on the integration" do
subject
expect(PactBroker::Integrations::Integration.last.contract_data_updated_at).to_not be nil
end
end
52 changes: 52 additions & 0 deletions spec/integration/endpoints/publish_verification_results_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
require "pact_broker/domain/verification"
require "timecop"

describe "Publishing a pact verification" do
let(:path) { "/pacts/provider/Provider/consumer/Consumer/pact-version/#{pact.pact_version_sha}/verification-results" }
let(:verification_content) { load_fixture("verification.json") }
let(:parsed_response_body) { JSON.parse(subject.body) }
let(:pact) { td.pact }
let(:rack_env) do
{
"CONTENT_TYPE" => "application/json",
"HTTP_ACCEPT" => "application/hal+json",
"pactbroker.database_connector" => lambda { |&block| block.call }
}
end

subject { post(path, verification_content, rack_env) }

before do
Timecop.freeze(Date.today - 2) do
td.create_provider("Provider")
.create_consumer("Consumer")
.create_consumer_version("1.0.0")
.create_pact
.create_consumer_version("1.2.3")
.create_pact
.revise_pact
end
end

it "updates the contract_data_updated_at on the integration" do
expect { subject }.to change { PactBroker::Integrations::Integration.last.contract_data_updated_at }
end

context "with a webhook configured", job: true do
before do
td.create_webhook(
method: "POST",
url: "http://example.org",
events: [{ name: PactBroker::Webhooks::WebhookEvent::VERIFICATION_PUBLISHED }]
)
end
let!(:request) do
stub_request(:post, "http://example.org").to_return(:status => 200)
end

it "executes the webhook" do
subject
expect(request).to have_been_made
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
require "pact_broker/db/data_migrations/set_contract_data_updated_at_for_integrations"
require "timecop"
require "tzinfo"

module PactBroker
module DB
module DataMigrations
describe SetContractDataUpdatedAtForIntegrations do
before do
td.clear_now # use timecop instead of the TestDataBuilder @now

Timecop.freeze(day_1) do
td.publish_pact(consumer_name: "Foo", provider_name: "Bar", consumer_version_number: "1")
end

Timecop.freeze(day_2) do
td.publish_pact(consumer_name: "Foo", provider_name: "Bar", consumer_version_number: "2")
end

Timecop.freeze(day_3) do
td.create_verification(provider_version: "2")
end

Timecop.freeze(day_4) do
td.publish_pact(consumer_name: "Cat", provider_name: "Dog", consumer_version_number: "2")
end

db[:integrations].update(contract_data_updated_at: nil)
end

let(:day_1) { td.in_utc{ DateTime.new(2023, 6, 11) } }
let(:day_2) { td.in_utc{ DateTime.new(2023, 6, 12) } }
let(:day_3) { td.in_utc{ DateTime.new(2023, 6, 13) } }
let(:day_4) { td.in_utc{ DateTime.new(2023, 6, 14) } }

let(:db) { PactBroker::Domain::Version.db }

subject { SetContractDataUpdatedAtForIntegrations.call(db) }

it "sets the contract_data_updated_at to the latest of the pact publication and verification publication dates for that integration" do
subject
integrations = db[:integrations].order(:id)
expect(integrations.first[:contract_data_updated_at]).to be_date_time(day_3)
expect(integrations.last[:contract_data_updated_at]).to be_date_time(day_4)
end
end
end
end
end
Loading

0 comments on commit e43c10f

Please sign in to comment.