diff --git a/server/lib/reverse_etl/extractors/incremental_delta.rb b/server/lib/reverse_etl/extractors/incremental_delta.rb index 991e8a69..5a2ebf7c 100644 --- a/server/lib/reverse_etl/extractors/incremental_delta.rb +++ b/server/lib/reverse_etl/extractors/incremental_delta.rb @@ -40,6 +40,8 @@ def process_record(message, sync_run, model) fingerprint = generate_fingerprint(record.data) primary_key = record.data.with_indifferent_access[model.primary_key] + return if primary_key.blank? + sync_record = find_or_initialize_sync_record(sync_run, primary_key) update_or_create_sync_record(sync_record, record, sync_run, fingerprint) rescue StandardError => e diff --git a/server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb b/server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb index d314d39d..d1b68da3 100644 --- a/server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb +++ b/server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb @@ -126,4 +126,18 @@ # TODO: test for partial recovery via currrent offset end + + describe "#process_record" do + let(:sync_run) do + create(:sync_run, sync:, workspace: sync.workspace, source:, destination:, model: sync.model, status: "started") + end + let(:model) { create(:model) } + + context "when the primary key is blank" do + it "does not create or update sync record" do + message = double("Message", record: double("Record", data: { "id" => nil })) + expect { subject.send(:process_record, message, sync_run, model) }.not_to(change { SyncRecord.count }) + end + end + end end