Merge pull request #3006 from alphagov/dont-put-entire-document-into-sidekiq-payload

Don't put a large list of entire documents into Sidekiq arguments
davidgisbey authored Oct 8, 2024
2 parents 657365d + 6f78795 commit 6331a88
Showing 11 changed files with 147 additions and 182 deletions.
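
For context on the change: Sidekiq serialises a job's arguments to JSON and stores them in Redis, so enqueueing batches of fully rendered documents inflates every payload. Passing only document IDs keeps jobs small and lets the worker re-fetch each document when it runs. A minimal sketch of the before/after payload shape, with made-up document data and index name (not the repo's real data):

```ruby
require "json"

# A fully rendered search document (truncated) versus its ID alone.
documents = [
  {
    "identifier" => { "_id" => "/vat-rates", "_version" => 3 },
    "document" => { "title" => "VAT rates", "body" => "..." },
  },
]
document_ids = documents.map { |doc| doc["identifier"]["_id"] }

# Before: the whole document batch travelled through Redis as job arguments.
old_payload = JSON.generate([documents, "govuk"])
# After: only the IDs do; the worker looks each document up by ID at run time.
new_payload = JSON.generate([document_ids, "govuk"])

puts "old: #{old_payload.bytesize} bytes, new: #{new_payload.bytesize} bytes"
```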
38 changes: 25 additions & 13 deletions lib/govuk_index/popularity_worker.rb
```diff
@@ -4,41 +4,53 @@ class PopularityWorker < Indexer::BaseWorker
     QUEUE_NAME = "bulk".freeze
     sidekiq_options queue: QUEUE_NAME
 
-    def perform(records, destination_index)
-      actions = Index::ElasticsearchProcessor.new(client: GovukIndex::Client.new(timeout: BULK_INDEX_TIMEOUT, index_name: destination_index))
+    def perform(document_ids, index_name)
+      actions = Index::ElasticsearchProcessor.new(
+        client: GovukIndex::Client.new(timeout: BULK_INDEX_TIMEOUT, index_name:),
+      )
+      index = IndexFinder.by_name(index_name)
+      popularities = retrieve_popularities_for(index_name, document_ids)
+
+      document_ids.each do |document_id|
+        document = index.get_document_by_id(document_id)
+        unless document
+          logger.warn "Skipping #{document_id} as it is not in the index"
+          next
+        end
 
-      popularities = retrieve_popularities_for(destination_index, records)
-      records.each do |record|
         actions.save(
-          process_record(record, popularities),
+          process_document(document, popularities),
         )
       end
 
       actions.commit
     end
 
-    def process_record(record, popularities)
-      base_path = record["identifier"]["_id"]
-      title = record["document"]["title"]
+    def process_document(document, popularities)
+      base_path = document.fetch("_id")
+      title = document.dig("_source", "title")
+      identifier = document.slice("_id", "_version")
       OpenStruct.new(
-        identifier: record["identifier"].merge("version_type" => "external_gte", "_type" => "generic-document"),
-        document: record["document"].merge!(
+        identifier: identifier.merge("version_type" => "external_gte", "_type" => "generic-document"),
+        document: document.fetch("_source").merge(
          "popularity" => popularities.dig(base_path, :popularity_score),
          "popularity_b" => popularities.dig(base_path, :popularity_rank),
          "view_count" => popularities.dig(base_path, :view_count),
-         "autocomplete" => { # Relies on updated popularity. Title is for new records.
+         "autocomplete" => { # Relies on updated popularity. Title is for new documents.
            "input" => title,
            "weight" => popularities.dig(base_path, :popularity_rank),
          },
        ),
      )
    end
 
-    def retrieve_popularities_for(index_name, records)
+    private
+
+    def retrieve_popularities_for(index_name, document_ids)
      # popularity should be consistent across clusters, so look up in
      # the default
      lookup = Indexer::PopularityLookup.new(index_name, SearchConfig.default_instance)
-     lookup.lookup_popularities(records.map { |r| r["identifier"]["_id"] })
+     lookup.lookup_popularities(document_ids)
    end
  end
end
```
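
To make the new `process_document` concrete, here is a self-contained sketch of the same merge with invented values (the `document` hash mimics an Elasticsearch hit as in the diff above; the field names come from the diff, the data does not):

```ruby
require "ostruct"

document = {
  "_id" => "/vat-rates",
  "_version" => 2,
  "_source" => { "title" => "VAT rates" },
}
popularities = {
  "/vat-rates" => { popularity_score: 0.7, popularity_rank: 0.5, view_count: 120 },
}

base_path = document.fetch("_id")
action = OpenStruct.new(
  identifier: document.slice("_id", "_version")
                      .merge("version_type" => "external_gte", "_type" => "generic-document"),
  document: document.fetch("_source").merge(
    "popularity" => popularities.dig(base_path, :popularity_score),
    "popularity_b" => popularities.dig(base_path, :popularity_rank),
    "view_count" => popularities.dig(base_path, :view_count),
    "autocomplete" => {
      "input" => document.dig("_source", "title"),
      "weight" => popularities.dig(base_path, :popularity_rank),
    },
  ),
)

p action.identifier # includes "_id", "_version", "version_type" and "_type"
p action.document   # the _source hash enriched with the popularity fields
```

Note the switch from `merge!` to `merge` in the diff: the fetched `_source` hash is no longer mutated in place.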
32 changes: 23 additions & 9 deletions lib/govuk_index/supertype_worker.rb
```diff
@@ -4,26 +4,40 @@ class SupertypeWorker < Indexer::BaseWorker
     QUEUE_NAME = "bulk".freeze
     sidekiq_options queue: QUEUE_NAME
 
-    def perform(records, destination_index)
-      actions = Index::ElasticsearchProcessor.new(client: GovukIndex::Client.new(timeout: BULK_INDEX_TIMEOUT, index_name: destination_index))
+    def perform(document_ids, index_name)
+      actions = Index::ElasticsearchProcessor.new(client: GovukIndex::Client.new(timeout: BULK_INDEX_TIMEOUT, index_name:))
 
-      updated_records = records.reject do |record|
-        record["document"] == update_document_supertypes(record["document"])
+      index = IndexFinder.by_name(index_name)
+      documents = document_ids.filter_map do |document_id|
+        document = index.get_document_by_id(document_id)
+
+        unless document
+          logger.warn "Skipping #{document_id} as it is not in the index"
+          next
+        end
+        {
+          "identifier" => document.slice("_id", "_type", "_version"),
+          "document" => document.fetch("_source"),
+        }
+      end
+
+      updated_documents = documents.reject do |document|
+        document["document"] == update_document_supertypes(document["document"])
       end
 
-      updated_records.each do |record|
+      updated_documents.each do |document|
        actions.save(
-         process_record(record),
+         process_document(document),
        )
      end
 
      actions.commit
    end
 
-    def process_record(record)
+    def process_document(document)
      OpenStruct.new(
-       identifier: record["identifier"].merge("version_type" => "external_gte"),
-       document: update_document_supertypes(record["document"]),
+       identifier: document["identifier"].merge("version_type" => "external_gte"),
+       document: update_document_supertypes(document["document"]),
      )
    end
```
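
The `filter_map` in the new `perform` fetches and filters in one pass: a missing document logs a warning, and the bare `next` yields `nil`, which `filter_map` discards. A runnable sketch of that pattern, with a plain hash standing in for the index (hypothetical data):

```ruby
# Stand-in index: ID => stored Elasticsearch-style hit.
index = {
  "doc_1" => {
    "_id" => "doc_1", "_type" => "generic-document", "_version" => 1,
    "_source" => { "title" => "One" },
  },
}

documents = %w[doc_1 doc_2].filter_map do |document_id|
  document = index[document_id]
  unless document
    warn "Skipping #{document_id} as it is not in the index"
    next # returns nil, which filter_map drops
  end
  {
    "identifier" => document.slice("_id", "_type", "_version"),
    "document" => document.fetch("_source"),
  }
end

p documents.map { |doc| doc["identifier"]["_id"] } # => ["doc_1"]
```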
47 changes: 0 additions & 47 deletions lib/govuk_index/sync_updater.rb

This file was deleted.

22 changes: 0 additions & 22 deletions lib/govuk_index/sync_worker.rb

This file was deleted.

13 changes: 4 additions & 9 deletions lib/govuk_index/updater.rb
```diff
@@ -13,11 +13,11 @@ def initialize(source_index:, destination_index:)
 
     def run(async: true)
       Clusters.active.each do |cluster|
-        scroll_enumerator(cluster:).each_slice(PROCESSOR_BATCH_SIZE) do |documents|
+        scroll_enumerator(cluster:).each_slice(PROCESSOR_BATCH_SIZE) do |document_id|
           if async
-            worker.perform_async(documents, @destination_index)
+            worker.perform_async(document_id, @destination_index)
           else
-            worker.new.perform(documents, @destination_index)
+            worker.new.perform(document_id, @destination_index)
           end
         end
       end
@@ -43,12 +43,7 @@ def scroll_enumerator(cluster:)
         index_names: @source_index,
         search_body:,
         batch_size: SCROLL_BATCH_SIZE,
-      ) do |record|
-        {
-          "identifier" => record.slice("_id", "_type", "_version"),
-          "document" => record.fetch("_source"),
-        }
-      end
+      ) { |document| document["_id"] }
     end
   end
 end
```
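
This is the enqueue side of the change: the scroll block now reduces each hit to its `_id` before batching, so each Sidekiq job receives a flat array of ID strings. (Note that `each_slice` yields an array per iteration, so the `|document_id|` parameter in the diff is in fact a batch of IDs.) A sketch with hypothetical hits standing in for a real `ScrollEnumerator`:

```ruby
# Hits as a scroll would yield them (invented pages).
hits = [
  { "_id" => "/page-1", "_source" => { "title" => "Page 1" } },
  { "_id" => "/page-2", "_source" => { "title" => "Page 2" } },
  { "_id" => "/page-3", "_source" => { "title" => "Page 3" } },
]

PROCESSOR_BATCH_SIZE = 2

document_ids = hits.map { |document| document["_id"] }
document_ids.each_slice(PROCESSOR_BATCH_SIZE) do |batch|
  p batch # stand-in for worker.perform_async(batch, destination_index)
end
# prints ["/page-1", "/page-2"] then ["/page-3"]
```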
4 changes: 4 additions & 0 deletions lib/indexer/workers/base_worker.rb
```diff
@@ -39,5 +39,9 @@ def indexes(index_name)
         search_server.index(index_name)
       end
     end
+
+    def logger
+      Logging.logger[self]
+    end
   end
 end
```
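
This `logger` helper is what lets both workers emit the skip warnings above. It appears to use the logging gem's class-keyed registry, where `Logging.logger[obj]` returns a logger named after the object's class. A minimal sketch under that assumption (`ExampleWorker` is hypothetical):

```ruby
require "logging" # the `logging` gem

class ExampleWorker
  def logger
    Logging.logger[self] # a logger named after ExampleWorker
  end

  def perform(document_id)
    logger.warn "Skipping #{document_id} as it is not in the index"
  end
end

Logging.logger.root.appenders = Logging.appenders.stdout
Logging.logger.root.level = :warn
ExampleWorker.new.perform("document_1")
```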
2 changes: 0 additions & 2 deletions lib/rummager.rb
```diff
@@ -108,8 +108,6 @@
 require "govuk_index/publishing_event_worker"
 require "govuk_index/supertype_updater"
 require "govuk_index/supertype_worker"
-require "govuk_index/sync_updater"
-require "govuk_index/sync_worker"
 require "govuk_message_queue_consumer"
 
 require "evaluate/ndcg"
```
13 changes: 0 additions & 13 deletions lib/tasks/indices.rake
```diff
@@ -68,19 +68,6 @@ namespace :search do
     end
   end
 
-  desc "Sync unmigrated data into govuk
-  While we are migrating data to govuk, it is important that govuk has
-  all the data from specified index in order for soring to be calculated
-  correctly
-  "
-  task :sync_govuk do
-    raise("Can not migrate multiple indices") if index_names.count > 1
-    raise("Can not migrate for govuk index") if index_names.include?("govuk")
-
-    GovukIndex::SyncUpdater.update(source_index: index_names.first)
-  end
-
   desc "Update popularity data in indices.
   Update all data in the index inplace (without locks) with the new popularity
```
37 changes: 0 additions & 37 deletions spec/integration/govuk_index/sync_data_spec.rb

This file was deleted.

63 changes: 47 additions & 16 deletions spec/unit/govuk_index/popularity_worker_spec.rb
```diff
@@ -2,45 +2,76 @@
 
 RSpec.describe GovukIndex::PopularityWorker do
   subject(:worker) { described_class.new }
+  let(:index) { instance_double("index") }
 
   before do
     allow(GovukDocumentTypes).to receive(:supertypes)
      .with(document_type: "testgroup")
      .and_return("supertype1" => "type1", "supertype2" => "type2")
     @processor = instance_double("processor", save: nil, commit: nil)
     allow(Index::ElasticsearchProcessor).to receive(:new).and_return(@processor)
+    allow(IndexFinder).to receive(:by_name).and_return(index)
   end
 
-  it "saves all records" do
-    stub_popularity_data
-    records = [
-      { "identifier" => { "_id" => "record_1" }, "document" => {} },
-      { "identifier" => { "_id" => "record_2" }, "document" => {} },
+  it "saves the documents with the popularity fields values" do
+    stub_popularity_data(
+      {
+        "document_1" => { popularity_score: 0.7, popularity_rank: 0.5 },
+        "document_2" => { popularity_score: 0.6, popularity_rank: 0.5 },
+      },
+    )
+    documents = [
+      { "_id" => "document_1", "_version" => 1, "_source" => { "title" => "test_doc1" } },
+      { "_id" => "document_2", "_version" => 1, "_source" => { "title" => "test_doc2" } },
     ]
-    worker.perform(records, "govuk_test")
+    stub_document_lookups(documents)
+
+    document_ids = documents.map { |document| document["_id"] }
+    worker.perform(document_ids, "govuk_test")
 
     expect(@processor).to have_received(:save).twice
+    expect(@processor).to have_received(:save).with(
+      having_attributes(
+        identifier: { "_id" => "document_1", "_version" => 1, "version_type" => "external_gte", "_type" => "generic-document" },
+        document: hash_including({ "popularity" => 0.7, "popularity_b" => 0.5, "title" => "test_doc1" }),
+      ),
+    ).once
+    expect(@processor).to have_received(:save).with(
+      having_attributes(
+        identifier: { "_id" => "document_2", "_version" => 1, "version_type" => "external_gte", "_type" => "generic-document" },
+        document: hash_including({ "popularity" => 0.6, "popularity_b" => 0.5, "title" => "test_doc2" }),
+      ),
+    ).once
     expect(@processor).to have_received(:commit)
   end
 
-  it "updates popularity field" do
-    stub_popularity_data("record_1" => { popularity_score: 0.7, popularity_rank: 0.5 })
-
-    @record = { "identifier" => { "_id" => "record_1" },
-                "document" => { "title" => "test_doc" } }
+  it "writes to the logger and continues to the next document if a document is not found" do
+    stub_popularity_data({ "document_2" => { popularity_score: 0.6, popularity_rank: 0.5 } })
+    document = { "_id" => "document_2", "_source" => { "title" => "test_doc2" } }
+    allow(index).to receive(:get_document_by_id).with("document_1").and_return(nil)
+    allow(index).to receive(:get_document_by_id).with("document_2").and_return(document)
+    logger = double(warn: true)
+    allow(worker).to receive(:logger).and_return(logger)
 
-    worker.perform([@record], "govuk_test")
+    document_ids = %w[document_1 document_2]
+    worker.perform(document_ids, "govuk_test")
 
     expect(@processor).to have_received(:save).with(
       having_attributes(
-        identifier: { "_id" => "record_1", "version_type" => "external_gte", "_type" => "generic-document" },
-        document: hash_including({ "popularity" => 0.7, "popularity_b" => 0.5, "title" => "test_doc" }),
+        identifier: { "_id" => "document_2", "version_type" => "external_gte", "_type" => "generic-document" },
+        document: hash_including({ "popularity" => 0.6, "popularity_b" => 0.5, "title" => "test_doc2" }),
       ),
-    )
+    ).once
+    expect(logger).to have_received(:warn).with("Skipping document_1 as it is not in the index")
   end
 
   def stub_popularity_data(data = Hash.new({ popularity_score: 0.5, popularity_rank: 0.5 }))
     popularity_lookup = instance_double("popularity_lookup", lookup_popularities: data)
     allow(Indexer::PopularityLookup).to receive(:new).and_return(popularity_lookup)
   end
+
+  def stub_document_lookups(documents)
+    allow(index).to receive(:get_document_by_id) do |document_id|
+      documents.select { |document| document["_id"] == document_id }.first
+    end
+  end
 end
```