diff --git a/lib/govuk_index/popularity_worker.rb b/lib/govuk_index/popularity_worker.rb
index 03634508c..9124118a4 100644
--- a/lib/govuk_index/popularity_worker.rb
+++ b/lib/govuk_index/popularity_worker.rb
@@ -4,29 +4,39 @@ class PopularityWorker < Indexer::BaseWorker
     QUEUE_NAME = "bulk".freeze
     sidekiq_options queue: QUEUE_NAME
 
-    def perform(records, destination_index)
-      actions = Index::ElasticsearchProcessor.new(client: GovukIndex::Client.new(timeout: BULK_INDEX_TIMEOUT, index_name: destination_index))
+    def perform(document_ids, index_name)
+      actions = Index::ElasticsearchProcessor.new(
+        client: GovukIndex::Client.new(timeout: BULK_INDEX_TIMEOUT, index_name:),
+      )
+      index = IndexFinder.by_name(index_name)
+      popularities = retrieve_popularities_for(index_name, document_ids)
+
+      document_ids.each do |document_id|
+        document = index.get_document_by_id(document_id)
+        unless document
+          logger.warn "Skipping #{document_id} as it is not in the index"
+          next
+        end
 
-      popularities = retrieve_popularities_for(destination_index, records)
-      records.each do |record|
         actions.save(
-          process_record(record, popularities),
+          process_document(document, popularities),
         )
       end
 
       actions.commit
     end
 
-    def process_record(record, popularities)
-      base_path = record["identifier"]["_id"]
-      title = record["document"]["title"]
+    def process_document(document, popularities)
+      base_path = document.fetch("_id")
+      title = document.dig("_source", "title")
+      identifier = document.slice("_id", "_version")
 
       OpenStruct.new(
-        identifier: record["identifier"].merge("version_type" => "external_gte", "_type" => "generic-document"),
-        document: record["document"].merge!(
+        identifier: identifier.merge("version_type" => "external_gte", "_type" => "generic-document"),
+        document: document.fetch("_source").merge(
           "popularity" => popularities.dig(base_path, :popularity_score),
           "popularity_b" => popularities.dig(base_path, :popularity_rank),
           "view_count" => popularities.dig(base_path, :view_count),
-          "autocomplete" => { # Relies on updated popularity. Title is for new records.
+          "autocomplete" => { # Relies on updated popularity. Title is for new documents.
             "input" => title,
             "weight" => popularities.dig(base_path, :popularity_rank),
@@ -34,11 +44,13 @@ def process_record(record, popularities)
       )
     end
 
-    def retrieve_popularities_for(index_name, records)
+    private
+
+    def retrieve_popularities_for(index_name, document_ids)
       # popularity should be consistent across clusters, so look up in
       # the default
       lookup = Indexer::PopularityLookup.new(index_name, SearchConfig.default_instance)
-      lookup.lookup_popularities(records.map { |r| r["identifier"]["_id"] })
+      lookup.lookup_popularities(document_ids)
     end
   end
 end
diff --git a/lib/govuk_index/supertype_worker.rb b/lib/govuk_index/supertype_worker.rb
index f966fdf7c..f55aea7ba 100644
--- a/lib/govuk_index/supertype_worker.rb
+++ b/lib/govuk_index/supertype_worker.rb
@@ -4,26 +4,40 @@ class SupertypeWorker < Indexer::BaseWorker
     QUEUE_NAME = "bulk".freeze
     sidekiq_options queue: QUEUE_NAME
 
-    def perform(records, destination_index)
-      actions = Index::ElasticsearchProcessor.new(client: GovukIndex::Client.new(timeout: BULK_INDEX_TIMEOUT, index_name: destination_index))
+    def perform(document_ids, index_name)
+      actions = Index::ElasticsearchProcessor.new(client: GovukIndex::Client.new(timeout: BULK_INDEX_TIMEOUT, index_name:))
 
-      updated_records = records.reject do |record|
-        record["document"] == update_document_supertypes(record["document"])
+      index = IndexFinder.by_name(index_name)
+      documents = document_ids.filter_map do |document_id|
+        document = index.get_document_by_id(document_id)
+
+        unless document
+          logger.warn "Skipping #{document_id} as it is not in the index"
+          next
+        end
+        {
+          "identifier" => document.slice("_id", "_type", "_version"),
+          "document" => document.fetch("_source"),
+        }
+      end
+
+      updated_documents = documents.reject do |document|
+        document["document"] == update_document_supertypes(document["document"])
       end
 
-      updated_records.each do |record|
+      updated_documents.each do |document|
         actions.save(
-          process_record(record),
+          process_document(document),
         )
       end
 
       actions.commit
     end
 
-    def process_record(record)
+    def process_document(document)
       OpenStruct.new(
-        identifier: record["identifier"].merge("version_type" => "external_gte"),
-        document: update_document_supertypes(record["document"]),
+        identifier: document["identifier"].merge("version_type" => "external_gte"),
+        document: update_document_supertypes(document["document"]),
       )
     end
 
diff --git a/lib/govuk_index/sync_updater.rb b/lib/govuk_index/sync_updater.rb
deleted file mode 100644
index a57d74794..000000000
--- a/lib/govuk_index/sync_updater.rb
+++ /dev/null
@@ -1,47 +0,0 @@
-module GovukIndex
-  class SyncUpdater < Updater
-    def self.update(source_index:, destination_index: "govuk")
-      new(
-        source_index:,
-        destination_index:,
-      ).run
-    end
-
-    def self.update_immediately(format_override:, source_index:, destination_index: "govuk")
-      new(
-        source_index:,
-        destination_index:,
-        format_override:,
-      ).run(async: false)
-    end
-
-    def self.worker
-      SyncWorker
-    end
-
-    def initialize(source_index:, destination_index:, format_override: nil)
-      super(
-        source_index:,
-        destination_index:,
-      )
-      @format_override = format_override
-    end
-
-    private
-
-    def search_body
-      clause = @format_override ? :must : :must_not
-      {
-        query: {
-          bool: {
-            clause => {
-              terms: {
-                format: Array(@format_override || MigratedFormats.indexable_formats.keys),
-              },
-            },
-          },
-        },
-      }
-    end
-  end
-end
diff --git a/lib/govuk_index/sync_worker.rb b/lib/govuk_index/sync_worker.rb
deleted file mode 100644
index 26869bcc2..000000000
--- a/lib/govuk_index/sync_worker.rb
+++ /dev/null
@@ -1,22 +0,0 @@
-module GovukIndex
-  class SyncWorker < Indexer::BaseWorker
-    BULK_INDEX_TIMEOUT = 60
-    QUEUE_NAME = "bulk".freeze
-    sidekiq_options queue: QUEUE_NAME
-
-    def perform(records, destination_index)
-      actions = Index::ElasticsearchProcessor.new(client: GovukIndex::Client.new(timeout: BULK_INDEX_TIMEOUT, index_name: destination_index))
-
-      records.each do |record|
-        actions.save(
-          OpenStruct.new(
-            identifier: record["identifier"].merge("version_type" => "external_gte"),
-            document: record["document"],
-          ),
-        )
-      end
-
-      actions.commit
-    end
-  end
-end
diff --git a/lib/govuk_index/updater.rb b/lib/govuk_index/updater.rb
index 51f50cd55..adeb068c1 100644
--- a/lib/govuk_index/updater.rb
+++ b/lib/govuk_index/updater.rb
@@ -13,11 +13,11 @@ def initialize(source_index:, destination_index:)
 
     def run(async: true)
       Clusters.active.each do |cluster|
-        scroll_enumerator(cluster:).each_slice(PROCESSOR_BATCH_SIZE) do |documents|
+        scroll_enumerator(cluster:).each_slice(PROCESSOR_BATCH_SIZE) do |document_id|
           if async
-            worker.perform_async(documents, @destination_index)
+            worker.perform_async(document_id, @destination_index)
           else
-            worker.new.perform(documents, @destination_index)
+            worker.new.perform(document_id, @destination_index)
           end
         end
       end
@@ -43,12 +43,7 @@ def scroll_enumerator(cluster:)
         index_names: @source_index,
         search_body:,
         batch_size: SCROLL_BATCH_SIZE,
-      ) do |record|
-        {
-          "identifier" => record.slice("_id", "_type", "_version"),
-          "document" => record.fetch("_source"),
-        }
-      end
+      ) { |document| document["_id"] }
     end
   end
 end
diff --git a/lib/indexer/workers/base_worker.rb b/lib/indexer/workers/base_worker.rb
index 2b9498eb6..444d9963d 100644
--- a/lib/indexer/workers/base_worker.rb
+++ b/lib/indexer/workers/base_worker.rb
@@ -39,5 +39,9 @@ def indexes(index_name)
         search_server.index(index_name)
       end
     end
+
+    def logger
+      Logging.logger[self]
+    end
   end
 end
diff --git a/lib/rummager.rb b/lib/rummager.rb
index 40f6031af..a13bd6550 100644
--- a/lib/rummager.rb
+++ b/lib/rummager.rb
@@ -108,8 +108,6 @@
 require "govuk_index/publishing_event_worker"
 require "govuk_index/supertype_updater"
 require "govuk_index/supertype_worker"
-require "govuk_index/sync_updater"
-require "govuk_index/sync_worker"
 require "govuk_message_queue_consumer"
 
 require "evaluate/ndcg"
diff --git a/lib/tasks/indices.rake b/lib/tasks/indices.rake
index 5e06115fb..63be64c8d 100644
--- a/lib/tasks/indices.rake
+++ b/lib/tasks/indices.rake
@@ -68,19 +68,6 @@ namespace :search do
     end
   end
 
-  desc "Sync unmigrated data into govuk
-
-While we are migrating data to govuk, it is important that govuk has
-all the data from specified index in order for soring to be calculated
-correctly
-"
-  task :sync_govuk do
-    raise("Can not migrate multiple indices") if index_names.count > 1
-    raise("Can not migrate for govuk index") if index_names.include?("govuk")
-
-    GovukIndex::SyncUpdater.update(source_index: index_names.first)
-  end
-
   desc "Update popularity data in indices.
 
 Update all data in the index inplace (without locks) with the new popularity
diff --git a/spec/integration/govuk_index/sync_data_spec.rb b/spec/integration/govuk_index/sync_data_spec.rb
deleted file mode 100644
index 091aa0924..000000000
--- a/spec/integration/govuk_index/sync_data_spec.rb
+++ /dev/null
@@ -1,37 +0,0 @@
-require "spec_helper"
-
-RSpec.describe "GovukIndex::SyncDataTest" do
-  before do
-    allow(GovukIndex::MigratedFormats).to receive(:indexable_formats).and_return("help_page" => :all)
-  end
-
-  it "syncs records for non indexable formats" do
-    commit_document("government_test", { link: "/test", popularity: 0.3, format: "edition" }, id: "/test", type: "edition")
-
-    GovukIndex::SyncUpdater.update(source_index: "government_test", destination_index: "govuk_test")
-
-    expect_document_is_in_rummager({ "link" => "/test" }, type: "edition", index: "govuk_test")
-  end
-
-  it "syncs will overwrite existing data" do
-    insert_document("government_test", { link: "/test", popularity: 0.3, format: "edition" }, id: "/test", type: "edition")
-    commit_index("government_test")
-    insert_document("govuk_test", { link: "/test", popularity: 0.4, format: "edition" }, id: "/test", type: "edition")
-    commit_index("govuk_test")
-
-    GovukIndex::SyncUpdater.update(source_index: "government_test", destination_index: "govuk_test")
-
-    expect_document_is_in_rummager({ "link" => "/test", "popularity" => 0.3 }, type: "edition", index: "govuk_test")
-  end
-
-  it "will not syncs records for indexable formats" do
-    insert_document("government_test", { link: "/test", popularity: 0.3, format: "help_page" }, id: "/test", type: "edition")
-    commit_index("government_test")
-    insert_document("govuk_test", { link: "/test", popularity: 0.4, format: "help_page" }, id: "/test", type: "edition")
-    commit_index("govuk_test")
-
-    GovukIndex::SyncUpdater.update(source_index: "government_test", destination_index: "govuk_test")
-
-    expect_document_is_in_rummager({ "link" => "/test", "popularity" => 0.4 }, type: "edition", index: "govuk_test")
-  end
-end
diff --git a/spec/unit/govuk_index/popularity_worker_spec.rb b/spec/unit/govuk_index/popularity_worker_spec.rb
index ba44e2652..e0dc31139 100644
--- a/spec/unit/govuk_index/popularity_worker_spec.rb
+++ b/spec/unit/govuk_index/popularity_worker_spec.rb
@@ -2,6 +2,7 @@
 
 RSpec.describe GovukIndex::PopularityWorker do
   subject(:worker) { described_class.new }
+  let(:index) { instance_double("index") }
 
   before do
     allow(GovukDocumentTypes).to receive(:supertypes)
@@ -9,38 +10,68 @@
       .and_return("supertype1" => "type1", "supertype2" => "type2")
     @processor = instance_double("processor", save: nil, commit: nil)
     allow(Index::ElasticsearchProcessor).to receive(:new).and_return(@processor)
+    allow(IndexFinder).to receive(:by_name).and_return(index)
   end
 
-  it "saves all records" do
-    stub_popularity_data
-    records = [
-      { "identifier" => { "_id" => "record_1" }, "document" => {} },
-      { "identifier" => { "_id" => "record_2" }, "document" => {} },
+  it "saves the documents with the popularity fields values" do
+    stub_popularity_data(
+      {
+        "document_1" => { popularity_score: 0.7, popularity_rank: 0.5 },
+        "document_2" => { popularity_score: 0.6, popularity_rank: 0.5 },
+      },
+    )
+    documents = [
+      { "_id" => "document_1", "_version" => 1, "_source" => { "title" => "test_doc1" } },
+      { "_id" => "document_2", "_version" => 1, "_source" => { "title" => "test_doc2" } },
     ]
-    worker.perform(records, "govuk_test")
+    stub_document_lookups(documents)
+
+    document_ids = documents.map { |document| document["_id"] }
+    worker.perform(document_ids, "govuk_test")
 
-    expect(@processor).to have_received(:save).twice
+    expect(@processor).to have_received(:save).with(
+      having_attributes(
+        identifier: { "_id" => "document_1", "_version" => 1, "version_type" => "external_gte", "_type" => "generic-document" },
+        document: hash_including({ "popularity" => 0.7, "popularity_b" => 0.5, "title" => "test_doc1" }),
+      ),
+    ).once
+    expect(@processor).to have_received(:save).with(
+      having_attributes(
+        identifier: { "_id" => "document_2", "_version" => 1, "version_type" => "external_gte", "_type" => "generic-document" },
+        document: hash_including({ "popularity" => 0.6, "popularity_b" => 0.5, "title" => "test_doc2" }),
+      ),
+    ).once
     expect(@processor).to have_received(:commit)
   end
 
-  it "updates popularity field" do
-    stub_popularity_data("record_1" => { popularity_score: 0.7, popularity_rank: 0.5 })
-
-    @record = { "identifier" => { "_id" => "record_1" },
-                "document" => { "title" => "test_doc" } }
+  it "writes to the logger and continues to the next document if a document is not found" do
+    stub_popularity_data({ "document_2" => { popularity_score: 0.6, popularity_rank: 0.5 } })
+    document = { "_id" => "document_2", "_source" => { "title" => "test_doc2" } }
+    allow(index).to receive(:get_document_by_id).with("document_1").and_return(nil)
+    allow(index).to receive(:get_document_by_id).with("document_2").and_return(document)
+    logger = double(warn: true)
+    allow(worker).to receive(:logger).and_return(logger)
 
-    worker.perform([@record], "govuk_test")
+    document_ids = %w[document_1 document_2]
+    worker.perform(document_ids, "govuk_test")
 
     expect(@processor).to have_received(:save).with(
       having_attributes(
-        identifier: { "_id" => "record_1", "version_type" => "external_gte", "_type" => "generic-document" },
-        document: hash_including({ "popularity" => 0.7, "popularity_b" => 0.5, "title" => "test_doc" }),
+        identifier: { "_id" => "document_2", "version_type" => "external_gte", "_type" => "generic-document" },
+        document: hash_including({ "popularity" => 0.6, "popularity_b" => 0.5, "title" => "test_doc2" }),
       ),
-    )
+    ).once
+    expect(logger).to have_received(:warn).with("Skipping document_1 as it is not in the index")
   end
 
   def stub_popularity_data(data = Hash.new({ popularity_score: 0.5, popularity_rank: 0.5 }))
    popularity_lookup = instance_double("popularity_lookup", lookup_popularities: data)
    allow(Indexer::PopularityLookup).to receive(:new).and_return(popularity_lookup)
   end
+
+  def stub_document_lookups(documents)
+    allow(index).to receive(:get_document_by_id) do |document_id|
+      documents.select { |document| document["_id"] == document_id }.first
+    end
+  end
 end
diff --git a/spec/unit/govuk_index/supertype_worker_spec.rb b/spec/unit/govuk_index/supertype_worker_spec.rb
index d50927bea..6a74bb71b 100644
--- a/spec/unit/govuk_index/supertype_worker_spec.rb
+++ b/spec/unit/govuk_index/supertype_worker_spec.rb
@@ -2,6 +2,7 @@
 
 RSpec.describe GovukIndex::SupertypeWorker do
   subject(:worker) { described_class.new }
+  let(:index) { instance_double("index") }
 
   before do
     allow(GovukDocumentTypes).to receive(:supertypes)
@@ -9,23 +10,27 @@
       .and_return("supertype1" => "type1", "supertype2" => "type2")
     @processor = instance_double("processor", save: nil, commit: nil)
     allow(Index::ElasticsearchProcessor).to receive(:new).and_return(@processor)
+    allow(IndexFinder).to receive(:by_name).and_return(index)
   end
 
-  it "saves all records" do
-    records = [
-      { "identifier" => { "_id" => "record_1" }, "document" => { "content_store_document_type" => "testgroup" } },
-      { "identifier" => { "_id" => "record_2" }, "document" => { "content_store_document_type" => "testgroup" } },
+  it "saves all documents" do
+    documents = [
+      { "_id" => "document_1", "_source" => { "content_store_document_type" => "testgroup" } },
+      { "_id" => "document_2", "_source" => { "content_store_document_type" => "testgroup" } },
     ]
-    worker.perform(records, "govuk_test")
+    stub_document_lookups(documents)
+
+    document_ids = documents.map { |document| document["_id"] }
+    worker.perform(document_ids, "govuk_test")
 
     expect(@processor).to have_received(:save).twice
     expect(@processor).to have_received(:commit)
   end
 
   it "updates supertype fields" do
-    record = { "identifier" => { "_id" => "record_1" },
-               "document" => { "title" => "test_doc", "content_store_document_type" => "testgroup" } }
-    worker.perform([record], "govuk_test")
+    document = { "_id" => "document_1", "_source" => { "title" => "test_doc", "content_store_document_type" => "testgroup" } }
+    stub_document_lookups([document])
+    worker.perform([document["_id"]], "govuk_test")
 
     expect(@processor).to have_received(:save).with(
       having_attributes(
@@ -35,13 +40,38 @@
   end
 
   it "does not save if the supertype fields do not need to be updated" do
-    record = { "identifier" => { "_id" => "record_1" },
-               "document" => { "title" => "test_doc",
-                               "content_store_document_type" => "testgroup",
-                               "supertype1" => "type1",
-                               "supertype2" => "type2" } }
-    worker.perform([record], "govuk_test")
+    document = {
+      "_id" => "document_1",
+      "_source" => {
+        "title" => "test_doc",
+        "content_store_document_type" => "testgroup",
+        "supertype1" => "type1",
+        "supertype2" => "type2",
+      },
+    }
+    stub_document_lookups([document])
+    worker.perform([document["_id"]], "govuk_test")
 
     expect(@processor).not_to have_received(:save)
   end
+
+  it "writes to the logger and continues to the next document if a document is not found" do
+    document = { "_id" => "document_2", "_source" => { "content_store_document_type" => "testgroup" } }
+    allow(index).to receive(:get_document_by_id).with("document_1").and_return(nil)
+    allow(index).to receive(:get_document_by_id).with("document_2").and_return(document)
+    logger = double(warn: true)
+    allow(worker).to receive(:logger).and_return(logger)
+
+    document_ids = %w[document_1 document_2]
+    worker.perform(document_ids, "govuk_test")
+
+    expect(@processor).to have_received(:save).once
+    expect(logger).to have_received(:warn).with("Skipping document_1 as it is not in the index")
+  end
+
+  def stub_document_lookups(documents)
+    allow(index).to receive(:get_document_by_id) do |document_id|
+      documents.select { |document| document["_id"] == document_id }.first
+    end
+  end
 end