diff --git a/.gitignore b/.gitignore index b8911b38..b460a879 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ # Ignore dotenv file -.env \ No newline at end of file +.env +/.history diff --git a/server/Gemfile b/server/Gemfile index 5b794434..d87ca42e 100644 --- a/server/Gemfile +++ b/server/Gemfile @@ -12,7 +12,7 @@ gem "interactor", "~> 3.0" gem "ruby-odbc", git: "https://github.com/Multiwoven/ruby-odbc.git" -gem "multiwoven-integrations", "~> 0.1.55" +gem "multiwoven-integrations", "~> 0.1.58" gem "temporal-ruby", github: "coinbase/temporal-ruby" diff --git a/server/Gemfile.lock b/server/Gemfile.lock index 20389f2e..79d25423 100644 --- a/server/Gemfile.lock +++ b/server/Gemfile.lock @@ -106,7 +106,7 @@ GEM activerecord (>= 3.2, < 8.0) rake (>= 10.4, < 14.0) ast (2.4.2) - async (2.10.1) + async (2.10.2) console (~> 1.10) fiber-annotation io-event (~> 1.5, >= 1.5.1) @@ -1862,7 +1862,7 @@ GEM msgpack (1.7.2) multi_json (1.15.0) multipart-post (2.4.0) - multiwoven-integrations (0.1.55) + multiwoven-integrations (0.1.58) activesupport async-websocket csv @@ -1925,7 +1925,7 @@ GEM net-smtp premailer (~> 1.7, >= 1.7.9) protocol-hpack (1.4.3) - protocol-http (0.26.2) + protocol-http (0.26.4) protocol-http1 (0.19.0) protocol-http (~> 0.22) protocol-http2 (0.16.0) @@ -2071,7 +2071,7 @@ GEM gli hashie stringio (3.1.0) - stripe (10.14.0) + stripe (11.0.0) thor (1.3.0) timecop (0.9.8) timeout (0.4.1) @@ -2130,7 +2130,7 @@ DEPENDENCIES jwt kaminari liquid - multiwoven-integrations (~> 0.1.55) + multiwoven-integrations (~> 0.1.58) newrelic_rpm parallel pg (~> 1.1) diff --git a/server/app/contracts/sync_contracts.rb b/server/app/contracts/sync_contracts.rb index b0d8ba3f..2c355389 100644 --- a/server/app/contracts/sync_contracts.rb +++ b/server/app/contracts/sync_contracts.rb @@ -24,6 +24,7 @@ class Create < Dry::Validation::Contract required(:sync_interval_unit).filled(:string) required(:sync_mode).filled(:string) required(:stream_name).filled(:string) + optional(:cursor_field).maybe(:string) # update filled with validating array of hashes required(:configuration).filled diff --git a/server/app/controllers/api/v1/models_controller.rb b/server/app/controllers/api/v1/models_controller.rb index aaae944d..11738348 100644 --- a/server/app/controllers/api/v1/models_controller.rb +++ b/server/app/controllers/api/v1/models_controller.rb @@ -72,8 +72,11 @@ def set_model end def validate_query + query = params.dig(:model, :query) + return if query.blank? + query_type = @model.present? ? @model.connector.connector_query_type : @connector.connector_query_type - Utils::QueryValidator.validate_query(query_type, params.dig(:model, :query)) + Utils::QueryValidator.validate_query(query_type, query) rescue StandardError => e render_error( message: "Query validation failed: #{e.message}", diff --git a/server/app/controllers/api/v1/syncs_controller.rb b/server/app/controllers/api/v1/syncs_controller.rb index 9b445d4f..ec177d27 100644 --- a/server/app/controllers/api/v1/syncs_controller.rb +++ b/server/app/controllers/api/v1/syncs_controller.rb @@ -93,6 +93,7 @@ def sync_params :sync_mode, :sync_interval_unit, :stream_name, + :cursor_field, configuration: %i[from to mapping_type @@ -104,7 +105,7 @@ def sync_params if params.to_unsafe_h[:sync][:configuration].is_a?(Hash) strong_params.merge!(configuration: params.to_unsafe_h[:sync][:configuration]) end - + strong_params.delete(:cursor_field) if action_name == "update" strong_params end end diff --git a/server/app/interactors/syncs/create_sync.rb b/server/app/interactors/syncs/create_sync.rb index 0abd29ec..2cc40164 100644 --- a/server/app/interactors/syncs/create_sync.rb +++ b/server/app/interactors/syncs/create_sync.rb @@ -5,6 +5,10 @@ class CreateSync include Interactor def call + source = context.workspace.connectors.find_by(id: context.sync_params[:source_id]) + + default_cursor_field = source.catalog&.default_cursor_field(context.sync_params[:stream_name]) + context.sync_params[:cursor_field] = default_cursor_field if default_cursor_field.present? sync = context .workspace.syncs .create(context.sync_params) diff --git a/server/app/models/catalog.rb b/server/app/models/catalog.rb index 322c7ea9..fac48440 100644 --- a/server/app/models/catalog.rb +++ b/server/app/models/catalog.rb @@ -50,4 +50,9 @@ def stream_to_protocol(stream) request_rate_concurrency: ) end + + def default_cursor_field(stream_name) + current_stream = catalog["streams"].find { |stream| stream["name"] == stream_name } + current_stream["default_cursor_field"] if current_stream && catalog["source_defined_cursor"] + end end diff --git a/server/app/models/connector.rb b/server/app/models/connector.rb index 2be6bd6f..ac1d7212 100644 --- a/server/app/models/connector.rb +++ b/server/app/models/connector.rb @@ -66,7 +66,8 @@ def to_protocol Multiwoven::Integrations::Protocol::Connector.new( name: connector_name, type: connector_type, - connection_specification: configuration + connection_specification: configuration, + query_type: connector_query_type ) end @@ -83,7 +84,6 @@ def connector_query_type connector_type.to_s.camelize, connector_name.to_s.camelize ).new connector_spec = client.connector_spec - connector_spec&.connector_query_type || "raw_sql" end end diff --git a/server/app/models/sync.rb b/server/app/models/sync.rb index 5cf01ee2..34adbcc9 100644 --- a/server/app/models/sync.rb +++ b/server/app/models/sync.rb @@ -13,6 +13,8 @@ # source_catalog_id :integer # schedule_type :string # status :integer +# cursor_field :string +# current_cursor_field :string # created_at :datetime not null # updated_at :datetime not null # @@ -82,7 +84,9 @@ def to_protocol catalog.find_stream_by_name(stream_name) ), sync_mode: Multiwoven::Integrations::Protocol::SyncMode[sync_mode], - destination_sync_mode: Multiwoven::Integrations::Protocol::DestinationSyncMode["insert"] + destination_sync_mode: Multiwoven::Integrations::Protocol::DestinationSyncMode["insert"], + cursor_field:, + current_cursor_field: ) end diff --git a/server/app/serializers/sync_serializer.rb b/server/app/serializers/sync_serializer.rb index de2a92c4..f78b4292 100644 --- a/server/app/serializers/sync_serializer.rb +++ b/server/app/serializers/sync_serializer.rb @@ -3,7 +3,7 @@ class SyncSerializer < ActiveModel::Serializer attributes :id, :source_id, :destination_id, :model_id, :configuration, :schedule_type, :sync_mode, :sync_interval, :sync_interval_unit, - :stream_name, :status, + :stream_name, :status, :cursor_field, :current_cursor_field, :updated_at, :created_at attribute :source do diff --git a/server/db/migrate/20240412183836_add_cursor_fields_to_syncs.rb b/server/db/migrate/20240412183836_add_cursor_fields_to_syncs.rb new file mode 100644 index 00000000..890ab028 --- /dev/null +++ b/server/db/migrate/20240412183836_add_cursor_fields_to_syncs.rb @@ -0,0 +1,6 @@ +class AddCursorFieldsToSyncs < ActiveRecord::Migration[7.1] + def change + add_column :syncs, :cursor_field, :string + add_column :syncs, :current_cursor_field, :string + end +end diff --git a/server/db/schema.rb b/server/db/schema.rb index 4493d9a5..7d4e66cd 100644 --- a/server/db/schema.rb +++ b/server/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 2024_04_08_061904) do +ActiveRecord::Schema[7.1].define(version: 2024_04_12_183836) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" @@ -33,7 +33,6 @@ t.datetime "updated_at", null: false t.string "connector_name" t.string "description" - t.integer "query_type", default: 0 end create_table "models", force: :cascade do |t| @@ -109,6 +108,8 @@ t.string "stream_name" t.string "workflow_id" t.datetime "discarded_at" + t.string "cursor_field" + t.string "current_cursor_field" t.index ["discarded_at"], name: "index_syncs_on_discarded_at" end diff --git a/server/lib/reverse_etl/extractors/base.rb b/server/lib/reverse_etl/extractors/base.rb index 4277c1e9..7bf9c2f4 100644 --- a/server/lib/reverse_etl/extractors/base.rb +++ b/server/lib/reverse_etl/extractors/base.rb @@ -4,8 +4,8 @@ module ReverseEtl module Extractors class Base DEFAULT_OFFSET = 0 - DEFAULT_BATCH_SIZE = (ENV["SYNC_EXTRACTOR_BATCH_SIZE"] || "10000").to_i - DEFAULT_LIMT = (ENV["SYNC_EXTRACTOR_BATCH_SIZE"] || "10000").to_i + DEFAULT_BATCH_SIZE = (ENV["SYNC_EXTRACTOR_BATCH_SIZE"] || "2000").to_i + DEFAULT_LIMT = (ENV["SYNC_EXTRACTOR_BATCH_SIZE"] || "2000").to_i THREAD_COUNT = (ENV["SYNC_EXTRACTOR_THREAD_POOL_SIZE"] || "5").to_i def read(_sync_run_id) diff --git a/server/lib/reverse_etl/extractors/incremental_delta.rb b/server/lib/reverse_etl/extractors/incremental_delta.rb index f40897aa..458ace47 100644 --- a/server/lib/reverse_etl/extractors/incremental_delta.rb +++ b/server/lib/reverse_etl/extractors/incremental_delta.rb @@ -17,11 +17,14 @@ def read(sync_run_id, activity) batch_query_params = batch_params(source_client, sync_run) model = sync_run.sync.model - ReverseEtl::Utils::BatchQuery.execute_in_batches(batch_query_params) do |records, current_offset| + ReverseEtl::Utils::BatchQuery.execute_in_batches(batch_query_params) do |records, + current_offset, last_cursor_field_value| + total_query_rows += records.count process_records(records, sync_run, model) heartbeat(activity) sync_run.update(current_offset:, total_query_rows:) + sync_run.sync.update(current_cursor_field: last_cursor_field_value) end # change state querying to queued sync_run.queue! diff --git a/server/lib/reverse_etl/utils/batch_query.rb b/server/lib/reverse_etl/utils/batch_query.rb index 80acf9bc..33cf9bc6 100644 --- a/server/lib/reverse_etl/utils/batch_query.rb +++ b/server/lib/reverse_etl/utils/batch_query.rb @@ -6,26 +6,68 @@ class BatchQuery def self.execute_in_batches(params) raise ArgumentError, "Batch size must be greater than 0" if params[:batch_size] <= 0 + initial_sync_config = params[:sync_config] current_offset = params[:offset] - + last_cursor_field_value = params[:sync_config].current_cursor_field loop do # Set the current limit and offset in the sync configuration params[:sync_config].limit = params[:batch_size] params[:sync_config].offset = current_offset + if initial_sync_config.cursor_field + query_with_cursor = CursorQueryBuilder.build_cursor_query(initial_sync_config, last_cursor_field_value) + params[:sync_config] = build_cursor_sync_config(params[:sync_config], query_with_cursor) + end + # Execute the batch query result = params[:client].read(params[:sync_config]) + # Extract the value of the cursor_field column from the last record + last_cursor_field_value = extract_last_cursor_field_value(result, params[:sync_config]) # Increment the offset by the batch size for the next iteration current_offset += params[:batch_size] break if result.empty? - yield result, current_offset if block_given? + yield result, current_offset, last_cursor_field_value if block_given? # Break the loop if the number of records fetched is less than the batch size # break if result.size < params[:batch_size] end end + + def self.extract_last_cursor_field_value(result, sync_config) + return nil unless sync_config.cursor_field && !result.empty? + + last_record = result.last.record.data + last_record[sync_config.cursor_field] + end + + def self.build_cursor_sync_config(sync_config, new_query) + new_model = build_new_model(sync_config.model, new_query) + + modified_sync_config = Multiwoven::Integrations::Protocol::SyncConfig.new( + model: new_model.to_protocol, + source: sync_config.source, + destination: sync_config.destination, + stream: sync_config.stream, + sync_mode: sync_config.sync_mode, + destination_sync_mode: sync_config.destination_sync_mode, + cursor_field: sync_config.cursor_field, + current_cursor_field: sync_config.current_cursor_field + ) + modified_sync_config.offset = 0 + modified_sync_config.limit = sync_config.limit + modified_sync_config + end + + def self.build_new_model(existing_model, new_query) + Model.new( + name: existing_model.name, + query: new_query, + query_type: existing_model.query_type, + primary_key: existing_model.primary_key + ) + end end end end diff --git a/server/lib/reverse_etl/utils/cursor_query_builder.rb b/server/lib/reverse_etl/utils/cursor_query_builder.rb new file mode 100644 index 00000000..cbdf17d6 --- /dev/null +++ b/server/lib/reverse_etl/utils/cursor_query_builder.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module ReverseEtl + module Utils + class CursorQueryBuilder + def self.build_cursor_query(sync_config, current_cursor_field) + existing_query = sync_config.model.query + query_type = sync_config.source.query_type || "raw_sql" + if current_cursor_field + cursor_condition = case query_type.to_sym + when :soql + "#{sync_config.cursor_field} >= #{current_cursor_field}" + when :raw_sql + "#{sync_config.cursor_field} >= '#{current_cursor_field}'" + end + end + if cursor_condition + "#{existing_query} AS subquery " \ + "WHERE #{cursor_condition} " \ + "ORDER BY #{sync_config.cursor_field} ASC" + elsif sync_config.cursor_field + "#{existing_query} AS subquery " \ + "ORDER BY #{sync_config.cursor_field} ASC" + end + end + end + end +end diff --git a/server/spec/interactors/syncs/create_sync_spec.rb b/server/spec/interactors/syncs/create_sync_spec.rb index a4da0714..bb6257b1 100644 --- a/server/spec/interactors/syncs/create_sync_spec.rb +++ b/server/spec/interactors/syncs/create_sync_spec.rb @@ -4,10 +4,13 @@ RSpec.describe Syncs::CreateSync do let(:workspace) { create(:workspace) } - let(:source) { create(:connector, workspace:) } + let(:source) { create(:connector, workspace:, connector_type: "source") } let(:destination) { create(:connector, workspace:) } let(:model) { create(:model, workspace:, connector: source) } - let(:sync) { build(:sync, workspace:, source:, destination:, model:) } + let(:sync) do + build(:sync, workspace:, source:, destination:, model:, cursor_field: "timestamp", + current_cursor_field: "2022-01-01") + end before do create(:catalog, connector: source) @@ -18,24 +21,27 @@ it "creates a sync" do result = described_class.call( workspace:, - sync_params: sync.attributes.except("id", "created_at", "updated_at") + sync_params: sync.attributes.except("id", "created_at", "updated_at").with_indifferent_access ) expect(result.success?).to eq(true) expect(result.sync.persisted?).to eql(true) expect(result.sync.source_id).to eql(source.id) expect(result.sync.destination_id).to eql(destination.id) expect(result.sync.model_id).to eql(model.id) + expect(result.sync.cursor_field).to eql(sync.cursor_field) + expect(result.sync.current_cursor_field).to eql(sync.current_cursor_field) end end context "with invalid params" do let(:sync_params) do - { source_id: nil } + sync.attributes.except("id", "created_at", "destination_id") end it "fails to create sync" do - result = described_class.call(workspace:, sync_params:) + result = described_class.call(workspace:, sync_params: sync_params.with_indifferent_access) expect(result.failure?).to eq(true) + expect(result.sync.persisted?).to eql(false) end end end diff --git a/server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb b/server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb index 3136817b..8a581598 100644 --- a/server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb +++ b/server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb @@ -46,7 +46,7 @@ before do sync.model.update(primary_key: "id") allow(client).to receive(:read).and_return(records) - allow(ReverseEtl::Utils::BatchQuery).to receive(:execute_in_batches).and_yield(records, 1) + allow(ReverseEtl::Utils::BatchQuery).to receive(:execute_in_batches).and_yield(records, 1, nil) allow(sync_run1.sync.source).to receive_message_chain(:connector_client, :new).and_return(client) allow(activity).to receive(:heartbeat) allow(activity).to receive(:cancel_requested).and_return(false) @@ -83,7 +83,8 @@ emitted_at: DateTime.now.to_i ).to_multiwoven_message - allow(ReverseEtl::Utils::BatchQuery).to receive(:execute_in_batches).and_yield([modified_record1, record2], 1) + allow(ReverseEtl::Utils::BatchQuery).to receive(:execute_in_batches).and_yield([modified_record1, record2], 1, + "2022-01-01") # Second sync run expect(sync_run2).to have_state(:started) @@ -91,6 +92,7 @@ subject.read(sync_run2.id, activity) sync_run2.reload expect(sync_run2).to have_state(:queued) + expect(sync_run2.sync.current_cursor_field).to eql("2022-01-01") updated_sync_record = sync_run2.sync_records.find_by(primary_key: record1.record.data["id"]) expect(sync_run2.sync_records.count).to eq(1) @@ -98,7 +100,8 @@ expect(updated_sync_record.action).to eq("destination_update") expect(updated_sync_record.record).to eq(modified_record1.record.data) - allow(ReverseEtl::Utils::BatchQuery).to receive(:execute_in_batches).and_yield([record2, record3], 1) + allow(ReverseEtl::Utils::BatchQuery).to receive(:execute_in_batches).and_yield([record2, record3], 1, + "2022-01-02") # Third sync run with same record expect(sync_run3).to have_state(:started) @@ -107,6 +110,7 @@ sync_run3.reload expect(sync_run3).to have_state(:queued) expect(sync_run3.sync_records.count).to eq(0) + expect(sync_run3.sync.current_cursor_field).to eql("2022-01-02") end end diff --git a/server/spec/lib/reverse_etl/loaders/standard_spec.rb b/server/spec/lib/reverse_etl/loaders/standard_spec.rb index 3eabb334..f73dfb2f 100644 --- a/server/spec/lib/reverse_etl/loaders/standard_spec.rb +++ b/server/spec/lib/reverse_etl/loaders/standard_spec.rb @@ -33,6 +33,18 @@ let!(:sync_record_batch2) { create(:sync_record, sync: sync_batch, sync_run: sync_run_batch, primary_key: "key2") } let!(:sync_record_individual) { create(:sync_record, sync: sync_individual, sync_run: sync_run_individual) } let(:activity) { instance_double("LoaderActivity") } + let(:connector_spec) do + Multiwoven::Integrations::Protocol::ConnectorSpecification.new( + connector_query_type: "raw_sql", + stream_type: "dynamic", + connection_specification: { + :$schema => "http://json-schema.org/draft-07/schema#", + :title => "Snowflake", + :type => "object", + :stream => {} + } + ) + end before do allow(activity).to receive(:heartbeat) @@ -49,6 +61,9 @@ end let(:multiwoven_message) { tracker.to_multiwoven_message } let(:client) { instance_double(sync_batch.destination.connector_client) } + before do + allow(client).to receive(:connector_spec).and_return(connector_spec) + end it "calls process_batch_records method" do allow(sync_batch.destination.connector_client).to receive(:new).and_return(client) allow(client).to receive(:write).with(sync_batch.to_protocol, transform).and_return(multiwoven_message) @@ -76,6 +91,9 @@ end let(:multiwoven_message) { tracker.to_multiwoven_message } let(:client) { instance_double(sync_batch.destination.connector_client) } + before do + allow(client).to receive(:connector_spec).and_return(connector_spec) + end it "calls process_batch_records method" do allow(sync_batch.destination.connector_client).to receive(:new).and_return(client) allow(client).to receive(:write).with(sync_batch.to_protocol, transform).and_return(multiwoven_message) @@ -100,6 +118,9 @@ let(:transform) { transformer.transform(sync_individual, sync_record_individual) } let(:multiwoven_message) { tracker.to_multiwoven_message } let(:client) { instance_double(sync_individual.destination.connector_client) } + before do + allow(client).to receive(:connector_spec).and_return(connector_spec) + end it "calls process_individual_records method" do allow(sync_individual.destination.connector_client).to receive(:new).and_return(client) allow(client).to receive(:write).with(sync_individual.to_protocol, [transform]).and_return(multiwoven_message) @@ -123,9 +144,12 @@ let(:transform) { transformer.transform(sync_individual, sync_record_individual) } let(:multiwoven_message) { tracker.to_multiwoven_message } let(:client) { instance_double(sync_individual.destination.connector_client) } - + before do + allow(client).to receive(:connector_spec).and_return(connector_spec) + end it "calls process_individual_records method" do allow(sync_individual.destination.connector_client).to receive(:new).and_return(client) + allow(client).to receive(:write).with(sync_individual.to_protocol, [transform]).and_return(multiwoven_message) expect(subject).to receive(:heartbeat).once.with(activity) expect(sync_run_individual).to have_state(:queued) @@ -155,7 +179,9 @@ let(:transform) { transformer.transform(sync_individual, sync_record_individual) } let(:multiwoven_message) { tracker.to_multiwoven_message } let(:client) { instance_double(sync_individual.destination.connector_client) } - + before do + allow(client).to receive(:connector_spec).and_return(connector_spec) + end it "sync run started to in_progress" do allow(sync_individual.destination.connector_client).to receive(:new).and_return(client) allow(client).to receive(:write).with(sync_individual.to_protocol, [transform]).and_return(multiwoven_message) @@ -178,6 +204,9 @@ let(:transform) { transformer.transform(sync_individual, sync_record_individual) } let(:multiwoven_message) { control.to_multiwoven_message } let(:client) { instance_double(sync_individual.destination.connector_client) } + before do + allow(client).to receive(:connector_spec).and_return(connector_spec) + end it "sync run started to in_progress" do allow(sync_individual.destination.connector_client).to receive(:new).and_return(client) @@ -207,6 +236,9 @@ end let(:multiwoven_message) { control.to_multiwoven_message } let(:client) { instance_double(sync_batch.destination.connector_client) } + before do + allow(client).to receive(:connector_spec).and_return(connector_spec) + end it "calls process_batch_records method" do allow(sync_batch.destination.connector_client).to receive(:new).and_return(client) allow(client).to receive(:write).with(sync_batch.to_protocol, transform).and_return(multiwoven_message) diff --git a/server/spec/lib/reverse_etl/utils/batch_query_spec.rb b/server/spec/lib/reverse_etl/utils/batch_query_spec.rb index 4ce62f42..8acbfd5f 100644 --- a/server/spec/lib/reverse_etl/utils/batch_query_spec.rb +++ b/server/spec/lib/reverse_etl/utils/batch_query_spec.rb @@ -3,43 +3,173 @@ require "rails_helper" module ReverseEtl - module Utils + module Utils # rubocop:disable Metrics/ModuleLength RSpec.describe BatchQuery do describe ".execute_in_batches" do let(:client) { double("Client") } + context "when neither cursor_field nor current_cursor_field are present" do + let(:destination) { create(:connector, connector_type: "destination") } + let!(:catalog) { create(:catalog, connector: destination) } - let(:destination) { create(:connector, connector_type: "destination") } - let!(:catalog) { create(:catalog, connector: destination) } + let(:sync) { create(:sync, destination:) } + + before do + call_count = 0 + allow(client).to receive(:read) do |_config| + call_count += 1 + call_count < 10 ? Array.new(100, "mock_data") : [] + end + end + + it "executes batches correctly" do + params = { + offset: 0, + limit: 100, + batch_size: 100, + sync_config: sync.to_protocol, + client: + } - let(:sync) { create(:sync, destination:) } + expect(client).to receive(:read).exactly(10).times + + results = [] + BatchQuery.execute_in_batches(params) do |result| + results << result + end + + expect(results.size).to eq(9) + expect(results.first.size).to eq(100) + expect(results.last.size).to eq(100) + end + end + context "when both cursor_field is present" do + let(:existing_query) { "SELECT * FROM table" } + let(:source) { create(:connector, connector_type: "source", connector_name: "Snowflake") } + let(:destination) { create(:connector, connector_type: "destination") } + let!(:catalog) { create(:catalog, connector: destination) } + let(:model) { create(:model, connector: source, query: existing_query) } + let(:sync) do + create(:sync, model:, source:, destination:, cursor_field: "timestamp", + current_cursor_field: "2022-01-01") + end + let(:record) do + Multiwoven::Integrations::Protocol::RecordMessage.new(data: { "id" => 1, "email" => "test1@mail.com", + "first_name" => "John", "Last Name" => "Doe", + "timestamp" => "2022-01-01" }, + emitted_at: DateTime.now.to_i).to_multiwoven_message + end - before do - call_count = 0 - allow(client).to receive(:read) do |_config| - call_count += 1 - call_count < 10 ? Array.new(100, "mock_data") : [] + it "executes batches and call CursorQueryBuilder" do + params = { + offset: 0, + limit: 100, + batch_size: 100, + sync_config: sync.to_protocol, + client: + } + allow(client).to receive(:read).and_return(*Array.new(1, [record]), []) + expect(CursorQueryBuilder).to receive(:build_cursor_query).with(sync.to_protocol, + "2022-01-01") + .and_call_original.twice + results = [] + BatchQuery.execute_in_batches(params) do |result, current_offset, last_cursor_field_value| + expect(result.first).to be_an_instance_of(Multiwoven::Integrations::Protocol::MultiwovenMessage) + expect(current_offset).to eq(100) + expect(last_cursor_field_value).to eq("2022-01-01") + results << result + end end end + end - it "executes batches correctly" do - params = { - offset: 0, - limit: 100, - batch_size: 100, - sync_config: sync.to_protocol, - client: - } + describe ".extract_last_cursor_field_value" do + let(:sync_config) { instance_double(Multiwoven::Integrations::Protocol::SyncConfig, cursor_field: "timestamp") } - expect(client).to receive(:read).exactly(10).times + context "when result is empty" do + it "returns nil" do + result = [] + expect(described_class.extract_last_cursor_field_value(result, sync_config)).to be_nil + end + end - results = [] - BatchQuery.execute_in_batches(params) do |result| - results << result + context "when result is not empty" do + let(:record1) do + Multiwoven::Integrations::Protocol::RecordMessage.new(data: { "id" => 1, "email" => "test1@mail.com", + "first_name" => "John", "Last Name" => "Doe", + "timestamp" => "2022-01-01" }, + emitted_at: DateTime.now.to_i).to_multiwoven_message end + let(:record2) do + Multiwoven::Integrations::Protocol::RecordMessage.new(data: { "id" => 1, "email" => "test1@mail.com", + "first_name" => "John", "Last Name" => "Doe", + "timestamp" => "2022-01-02" }, + emitted_at: DateTime.now.to_i).to_multiwoven_message + end + let(:result) { [record1, record2] } + + it "returns the value of the last record's cursor field" do + expect(described_class.extract_last_cursor_field_value(result, sync_config)).to eq("2022-01-02") + end + end + + context "when sync_config has no cursor field" do + let(:sync_config) { instance_double(Multiwoven::Integrations::Protocol::SyncConfig, cursor_field: nil) } + let(:result) do + [instance_double(Multiwoven::Integrations::Protocol::RecordMessage, data: { "timestamp" => "2022-01-01" })] + end + + it "returns nil" do + expect(described_class.extract_last_cursor_field_value(result, sync_config)).to be_nil + end + end + end + + describe ".build_cursor_sync_config" do + let(:existing_query) { "SELECT * FROM table" } + let(:source) { create(:connector, connector_type: "source", connector_name: "Snowflake") } + let(:destination) { create(:connector, connector_type: "destination") } + let!(:catalog) { create(:catalog, connector: destination) } + let(:model) { create(:model, connector: source, query: existing_query) } + let(:sync) do + create(:sync, model:, source:, destination:, cursor_field: "timestamp", + current_cursor_field: "2022-01-01") + end + + let(:new_query) { "SELECT * FROM table WHERE timestamp >= '2022-01-01'" } + let(:sync_config) { sync.to_protocol } + + it "builds a new SyncConfig with modified query and other attributes" do + modified_sync_config = described_class.build_cursor_sync_config(sync_config, new_query) + + expect(modified_sync_config).to be_a(Multiwoven::Integrations::Protocol::SyncConfig) + expect(modified_sync_config.model.name).to eq(sync_config.model.name) + expect(modified_sync_config.model.query).to eq("SELECT * FROM table WHERE timestamp >= '2022-01-01'") + expect(modified_sync_config.model.query_type).to eq("raw_sql") + expect(modified_sync_config.model.primary_key).to eq("TestPrimaryKey") + expect(modified_sync_config.source).to eq(source.to_protocol) + expect(modified_sync_config.destination).to eq(destination.to_protocol) + expect(modified_sync_config.stream).to eq(sync_config.stream) + expect(modified_sync_config.sync_mode).to eq(sync_config.sync_mode) + expect(modified_sync_config.destination_sync_mode).to eq(sync_config.destination_sync_mode) + expect(modified_sync_config.cursor_field).to eq(sync_config.cursor_field) + expect(modified_sync_config.current_cursor_field).to eq(sync_config.current_cursor_field) + expect(modified_sync_config.limit).to eq(sync_config.limit) + expect(modified_sync_config.offset).to eq(0) + end + end + + describe ".build_new_model" do + let(:existing_model) { instance_double(Model, name: "ExistingModel", query_type: "raw_sql", primary_key: "id") } + let(:new_query) { "SELECT * FROM table WHERE timestamp >= '2022-01-01'" } + + it "builds a new Model with modified query and other attributes" do + new_model = described_class.build_new_model(existing_model, new_query) - expect(results.size).to eq(9) - expect(results.first.size).to eq(100) - expect(results.last.size).to eq(100) + expect(new_model).to be_a(Model) + expect(new_model.name).to eq("ExistingModel") + expect(new_model.query).to eq("SELECT * FROM table WHERE timestamp >= '2022-01-01'") + expect(new_model.query_type).to eq("raw_sql") + expect(new_model.primary_key).to eq("id") end end end diff --git a/server/spec/lib/reverse_etl/utils/cursor_query_builder_spec.rb b/server/spec/lib/reverse_etl/utils/cursor_query_builder_spec.rb new file mode 100644 index 00000000..db2078aa --- /dev/null +++ b/server/spec/lib/reverse_etl/utils/cursor_query_builder_spec.rb @@ -0,0 +1,85 @@ +# frozen_string_literal: true + +require "rails_helper" + +module ReverseEtl + module Utils + RSpec.describe CursorQueryBuilder do + let(:existing_query) { "SELECT * FROM table" } + let(:source) { create(:connector, connector_type: "source", connector_name: "Snowflake") } + let(:source_salesforce) do + create(:connector, connector_type: "source", connector_name: "SalesforceConsumerGoodsCloud") + end + let(:destination) { create(:connector, connector_type: "destination") } + let!(:catalog) { create(:catalog, connector: destination) } + let(:model) { create(:model, connector: source, query: existing_query) } + let(:model_salesforce) { create(:model, connector: source, query: existing_query) } + + describe ".build_cursor_query" do + context "when both cursor_field and current_cursor_field are present" do + let(:sync) do + create(:sync, model:, source:, destination:, cursor_field: "timestamp", current_cursor_field: "2022-01-01") + end + + let(:sync_salesforce) do + create(:sync, model: model_salesforce, source: source_salesforce, destination:, cursor_field: "timestamp", + current_cursor_field: "2022-01-01") + end + let(:sync_config) { sync.to_protocol } + let(:sync_config_salesforce) { sync_salesforce.to_protocol } + + it "updates the query for raw_sql query type with WHERE and ORDER BY clauses" do + query = described_class.build_cursor_query(sync_config, "2022-01-01") + + expected_query = "SELECT * FROM table AS subquery WHERE timestamp >= '2022-01-01' ORDER BY timestamp ASC" + expect(query).to eq(expected_query) + end + + it "updates the query for soql query type with WHERE and ORDER BY clauses" do + query = described_class.build_cursor_query(sync_config_salesforce, "2022-01-01") + + expected_query = "SELECT * FROM table AS subquery WHERE timestamp >= 2022-01-01 ORDER BY timestamp ASC" + expect(query).to eq(expected_query) + end + end + + context "when only cursor_field is present" do + let(:sync) do + create(:sync, model:, source:, destination:, cursor_field: "timestamp") + end + let(:sync_salesforce) do + create(:sync, model: model_salesforce, source: source_salesforce, destination:, cursor_field: "timestamp") + end + let(:sync_config) { sync.to_protocol } + let(:sync_config_salesforce) { sync_salesforce.to_protocol } + + it "updates the query for raw_sql query type with only ORDER BY clause" do + query = described_class.build_cursor_query(sync_config, nil) + + expected_query = "SELECT * FROM table AS subquery ORDER BY timestamp ASC" + expect(query).to eq(expected_query) + end + + it "updates the query for soql query type with only ORDER BY clause" do + query = described_class.build_cursor_query(sync_config_salesforce, nil) + + expected_query = "SELECT * FROM table AS subquery ORDER BY timestamp ASC" + expect(query).to eq(expected_query) + end + end + + context "when neither cursor_field nor current_cursor_field are present" do + let(:sync) do + create(:sync, model:, source:, destination:) + end + let(:sync_config) { sync.to_protocol } + it "does not update the query" do + query = described_class.build_cursor_query(sync_config, nil) + + expect(query).to eq(nil) + end + end + end + end + end +end diff --git a/server/spec/models/catalog_spec.rb b/server/spec/models/catalog_spec.rb index 5ca6e36d..3ddd86d2 100644 --- a/server/spec/models/catalog_spec.rb +++ b/server/spec/models/catalog_spec.rb @@ -100,5 +100,31 @@ end end end + + describe "#default_cursor_field" do + let(:catalog) do + { + "streams" => [ + { "name" => "stream1", "default_cursor_field" => "timestamp" }, + { "name" => "stream2", "default_cursor_field" => "created_at" } + ], + "source_defined_cursor" => true + } + end + + let(:workspace) { create(:workspace) } + let(:connector) { create(:connector) } + let(:catalog_instance) { create(:catalog, workspace:, connector:, catalog:) } + + it "returns the default cursor field for the specified stream" do + stream_name = "stream1" + expect(catalog_instance.default_cursor_field(stream_name)).to eq("timestamp") + end + + it "returns nil if the stream doesn't exist or if source_defined_cursor is false" do + stream_name = "stream3" + expect(catalog_instance.default_cursor_field(stream_name)).to be_nil + end + end end end diff --git a/server/spec/models/sync_spec.rb b/server/spec/models/sync_spec.rb index 9d78aa70..02063013 100644 --- a/server/spec/models/sync_spec.rb +++ b/server/spec/models/sync_spec.rb @@ -59,11 +59,13 @@ }) end - let(:sync) { create(:sync, destination:) } + let(:sync) { create(:sync, destination:, cursor_field: "cursor_field", current_cursor_field: "2024-01-20") } it "returns sync config protocol" do protocol = sync.to_protocol expect(protocol).to be_a(Multiwoven::Integrations::Protocol::SyncConfig) + expect(protocol.cursor_field).to eq("cursor_field") + expect(protocol.current_cursor_field).to eq("2024-01-20") end end diff --git a/server/spec/requests/api/v1/syncs_controller_spec.rb b/server/spec/requests/api/v1/syncs_controller_spec.rb index 1401e672..d9206d3a 100644 --- a/server/spec/requests/api/v1/syncs_controller_spec.rb +++ b/server/spec/requests/api/v1/syncs_controller_spec.rb @@ -102,7 +102,8 @@ sync_interval: 10, sync_interval_unit: "minutes", stream_name: "profile", - sync_mode: "full_refresh" + sync_mode: "full_refresh", + cursor_field: "created_date" } } end @@ -127,6 +128,29 @@ expect(response_hash.dig(:data, :attributes, :model_id)).to eq(request_body.dig(:sync, :model_id)) expect(response_hash.dig(:data, :attributes, :schedule_type)).to eq(request_body.dig(:sync, :schedule_type)) expect(response_hash.dig(:data, :attributes, :stream_name)).to eq(request_body.dig(:sync, :stream_name)) + expect(response_hash.dig(:data, :attributes, :cursor_field)).to eq(request_body.dig(:sync, :cursor_field)) + expect(response_hash.dig(:data, :attributes, :current_cursor_field)).to eq(nil) + expect(response_hash.dig(:data, :attributes, :sync_interval_unit)) + .to eq(request_body.dig(:sync, :sync_interval_unit)) + expect(response_hash.dig(:data, :attributes, :sync_interval)).to eq(request_body.dig(:sync, :sync_interval)) + expect(response_hash.dig(:data, :attributes, :status)).to eq("pending") + end + + it "creates a new sync and returns success with cursor_field nil " do + request_body[:sync][:cursor_field] = nil + post "/api/v1/syncs", params: request_body.to_json, headers: { "Content-Type": "application/json" } + .merge(auth_headers(user)) + expect(response).to have_http_status(:created) + response_hash = JSON.parse(response.body).with_indifferent_access + expect(response_hash.dig(:data, :id)).to be_present + expect(response_hash.dig(:data, :type)).to eq("syncs") + expect(response_hash.dig(:data, :attributes, :source_id)).to eq(request_body.dig(:sync, :source_id)) + expect(response_hash.dig(:data, :attributes, :destination_id)).to eq(request_body.dig(:sync, :destination_id)) + expect(response_hash.dig(:data, :attributes, :model_id)).to eq(request_body.dig(:sync, :model_id)) + expect(response_hash.dig(:data, :attributes, :schedule_type)).to eq(request_body.dig(:sync, :schedule_type)) + expect(response_hash.dig(:data, :attributes, :stream_name)).to eq(request_body.dig(:sync, :stream_name)) + expect(response_hash.dig(:data, :attributes, :cursor_field)).to eq(nil) + expect(response_hash.dig(:data, :attributes, :current_cursor_field)).to eq(nil) expect(response_hash.dig(:data, :attributes, :sync_interval_unit)) .to eq(request_body.dig(:sync, :sync_interval_unit)) expect(response_hash.dig(:data, :attributes, :sync_interval)).to eq(request_body.dig(:sync, :sync_interval)) @@ -166,7 +190,8 @@ }, sync_interval: 10, sync_interval_unit: "minutes", - stream_name: "profile" + stream_name: "profile", + cursor_field: "cursor_field" } } end @@ -188,6 +213,8 @@ expect(response_hash.dig(:data, :id)).to be_present expect(response_hash.dig(:data, :id)).to eq(syncs.first.id.to_s) expect(response_hash.dig(:data, :attributes, :sync_interval)).to eq(30) + expect(response_hash.dig(:data, :attributes, :cursor_field)).to eq(nil) + expect(response_hash.dig(:data, :attributes, :current_cursor_field)).to eq(nil) end it "returns an error response when wrong sync_id" do