diff --git a/.github/workflows/deploy-main.yml b/.github/workflows/deploy-main.yml new file mode 100644 index 00000000..a85534ac --- /dev/null +++ b/.github/workflows/deploy-main.yml @@ -0,0 +1,35 @@ +name: Deploy main + +on: + push: + branches: + - main + paths: + - "ui/**" + - "server/**" + +jobs: + deploy: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Set up SSH key + env: + SSH_KEY_BASE64: ${{ secrets.SSH_KEY_BASE64 }} + run: | + echo "$SSH_KEY_BASE64" | base64 --decode > ${HOME}/multiwoven-deployments.pem + chmod 600 ${HOME}/multiwoven-deployments.pem + + - name: Deploy main + env: + SSH_HOST: ${{ secrets.SSH_HOST }} + SSH_USER: ${{ secrets.SSH_USER }} + run: | + ssh -o StrictHostKeyChecking=no -i ${HOME}/multiwoven-deployments.pem $SSH_USER@$SSH_HOST << 'EOF' + cd multiwoven + git pull origin main + docker-compose down + docker-compose up -d --build + EOF diff --git a/.gitignore b/.gitignore index b8911b38..b460a879 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ # Ignore dotenv file -.env \ No newline at end of file +.env +/.history diff --git a/integrations/Gemfile.lock b/integrations/Gemfile.lock index f4506cb9..667e1c59 100644 --- a/integrations/Gemfile.lock +++ b/integrations/Gemfile.lock @@ -7,7 +7,7 @@ GIT PATH remote: . specs: - multiwoven-integrations (0.1.57) + multiwoven-integrations (0.1.59) activesupport async-websocket csv diff --git a/integrations/lib/multiwoven/integrations/protocol/protocol.rb b/integrations/lib/multiwoven/integrations/protocol/protocol.rb index 9a4402d2..25f394a0 100644 --- a/integrations/lib/multiwoven/integrations/protocol/protocol.rb +++ b/integrations/lib/multiwoven/integrations/protocol/protocol.rb @@ -71,6 +71,7 @@ class Connector < ProtocolModel attribute :name, Types::String attribute :type, ConnectorType attribute :connection_specification, Types::Hash + attribute :query_type, Types::String.default("raw_sql").enum(*ConnectorQueryType.values) end class LogMessage < ProtocolModel @@ -114,7 +115,7 @@ class Stream < ProtocolModel attribute? :supported_sync_modes, Types::Array.of(SyncMode).optional.default(["incremental"]) # Applicable for database streams - attribute? :source_defined_cursor, Types::Bool.default(false).optional + attribute :source_defined_cursor, Types::Bool.default(false) attribute? :default_cursor_field, Types::Array.of(Types::String).optional attribute? :source_defined_primary_key, Types::Array.of(Types::Array.of(Types::String)).optional attribute? :namespace, Types::String.optional diff --git a/integrations/lib/multiwoven/integrations/rollout.rb b/integrations/lib/multiwoven/integrations/rollout.rb index d324a350..7c1f05be 100644 --- a/integrations/lib/multiwoven/integrations/rollout.rb +++ b/integrations/lib/multiwoven/integrations/rollout.rb @@ -2,7 +2,7 @@ module Multiwoven module Integrations - VERSION = "0.1.57" + VERSION = "0.1.59" ENABLED_SOURCES = %w[ Snowflake diff --git a/integrations/lib/multiwoven/integrations/source/salesforce_consumer_goods_cloud/client.rb b/integrations/lib/multiwoven/integrations/source/salesforce_consumer_goods_cloud/client.rb index a494f3e7..3b771a6d 100644 --- a/integrations/lib/multiwoven/integrations/source/salesforce_consumer_goods_cloud/client.rb +++ b/integrations/lib/multiwoven/integrations/source/salesforce_consumer_goods_cloud/client.rb @@ -40,12 +40,6 @@ def discover(connection_config) def read(sync_config) connection_config = sync_config.source.connection_specification.with_indifferent_access initialize_client(connection_config) - return [] if sync_config.offset&.> 2000 - - # TODO: Salesforce imposes a limit on the use of OFFSET in SOQL queries, where you cannot skip(offset) more than 2000 records. - # This limitation can hinder the retrieval of large datasets in a single query. - # To overcome this, we need a cursor-based pagination strategy instead of relying on OFFSET. - # query = batched_query(query, sync_config.limit, sync_config.offset) unless sync_config.limit.nil? && sync_config.offset.nil? query = sync_config.model.query query = batched_query(query, sync_config.limit, sync_config.offset) unless sync_config.limit.nil? && sync_config.offset.nil? queried_data = @client.query(query) diff --git a/integrations/lib/multiwoven/integrations/source/salesforce_consumer_goods_cloud/schema_helper.rb b/integrations/lib/multiwoven/integrations/source/salesforce_consumer_goods_cloud/schema_helper.rb index 4ea22a32..6b47c54b 100644 --- a/integrations/lib/multiwoven/integrations/source/salesforce_consumer_goods_cloud/schema_helper.rb +++ b/integrations/lib/multiwoven/integrations/source/salesforce_consumer_goods_cloud/schema_helper.rb @@ -118,9 +118,9 @@ def create_json_schema_for_object(metadata) "json_schema": json_schema, "required": required, "supported_sync_modes": %w[incremental], - "source_defined_cursor": true, - "default_cursor_field": ["updated"], - "source_defined_primary_key": [primary_key] + "source_defined_primary_key": [primary_key], + "source_defined_cursor": false, + "default_cursor_field": nil } end end diff --git a/integrations/spec/multiwoven/integrations/destination/salesforce_consumer_goods_cloud/schema_helper_spec.rb b/integrations/spec/multiwoven/integrations/destination/salesforce_consumer_goods_cloud/schema_helper_spec.rb index 9a2fdc02..cab3f079 100644 --- a/integrations/spec/multiwoven/integrations/destination/salesforce_consumer_goods_cloud/schema_helper_spec.rb +++ b/integrations/spec/multiwoven/integrations/destination/salesforce_consumer_goods_cloud/schema_helper_spec.rb @@ -47,8 +47,6 @@ expect(result[:batch_size]).to eq(0) expect(result[:required]).to contain_exactly("Field1") expect(result[:supported_sync_modes]).to contain_exactly("incremental") - expect(result[:source_defined_cursor]).to be true - expect(result[:default_cursor_field]).to contain_exactly("updated") end end end diff --git a/integrations/spec/multiwoven/integrations/protocol/protocol_spec.rb b/integrations/spec/multiwoven/integrations/protocol/protocol_spec.rb index 257fac9e..47f7c0e3 100644 --- a/integrations/spec/multiwoven/integrations/protocol/protocol_spec.rb +++ b/integrations/spec/multiwoven/integrations/protocol/protocol_spec.rb @@ -371,6 +371,7 @@ module Integrations::Protocol expect(connector).to be_a(described_class) expect(connector.name).to eq("example_connector") expect(connector.type).to eq("source") + expect(connector.query_type).to eq("raw_sql") expect(connector.connection_specification).to eq(key: "value") end end diff --git a/integrations/spec/multiwoven/integrations/source/salesforce_consumer_goods/schema_helper_spec.rb b/integrations/spec/multiwoven/integrations/source/salesforce_consumer_goods/schema_helper_spec.rb index 57039c93..9a818026 100644 --- a/integrations/spec/multiwoven/integrations/source/salesforce_consumer_goods/schema_helper_spec.rb +++ b/integrations/spec/multiwoven/integrations/source/salesforce_consumer_goods/schema_helper_spec.rb @@ -45,8 +45,8 @@ expect(result[:json_schema]).to be_a(Hash) expect(result[:required]).to contain_exactly("Field1") expect(result[:supported_sync_modes]).to contain_exactly("incremental") - expect(result[:source_defined_cursor]).to be true - expect(result[:default_cursor_field]).to contain_exactly("updated") + expect(result[:source_defined_cursor]).to eq(false) + expect(result[:default_cursor_field]).to eq(nil) end end end diff --git a/release-notes.md b/release-notes.md index abe43de6..a42285ba 100644 --- a/release-notes.md +++ b/release-notes.md @@ -2,184 +2,29 @@ All notable changes to this project will be documented in this file. -## [0.2.0] - 2024-04-08 +## [0.3.0] - 2024-04-15 ### 🚀 Features -- Add Filtering by Connector Type to 'Get All Connectors' API -- Add batch support for rETL -- Add default scope for Connector, Model and Sync -- Strong type validation for API -- Enhance DestinationSelector and Source and changed Dashboard UI -- Implemented delete sync -- Start with default custom object template from chakra package -- Try simple layouting -- Add support for specifying colspans -- Add title field for overriding titles in form -- Use overridden title field instead of default one -- Add BaseInputTemplate -- Add title field template -- Add title to all the meta json in all connector -- Add postgresql connector -- Rate limiting -- Configure rate limit for destinations -- Add hubspot destination connector -- Add databricks source connector -- Reporting dashboard api -- Standardization of output -- Implement static, variable and template catalog field mapping -- Enable rate limiting -- Add finite state machine for sync states -- Sync run and sync record controller -- Terminate workflow on sync deletion -- Delete source -- Edit destinations screen and delete destinations -- Add pull request template -- Add env example file -- Configure retry policy for terminate workflows -- Add databricks odbc driver -- Sync run api by sync id -- Support full refresh sync -- Custom liquid helper to_datetime -- Sync mode changes for destination connectors -- Destination/google sheets -- *(destination)* Add airtable -- *(destination)* Add Stripe destination connector -- Support full refresh sync -- Add Salesforce Consumer Goods destination connector -- Add Salesforce Consumer Goods source connector -- Add postgresql destination connector -- Release process automation -- Move integrations gem github actions to root -- Move server and ui github action ci to root -- Added sync records -- Add soql support query_type -- Update salesforce consumer goods authentication -- Flatten salesforce records attributes -- Add connector images to github to serve icons -- Adding batch_support flag and batch_size in catalog for sync -- Add batch support for facebook custom audience connector -- The volume mount paths don't clash with existing volumes and .env file added -- Create sync run when pending sync run is absent -- Duplicate sync record with primary key +- Beautify sql queries +- Add query type support for SOQL +- Add sync run mailer on success or failure (#24) +- Updated Dockerfile and env to run docker-compose on production mode. (#27) +- Added custom dropdown for destination columns (#30) +- Add code climate to github actions (#31) +- Automate integrations gem release (#37) +- Add protocol changes for cursor based (#38) +- Pull request template for monorepo (#39) +- Remove changes associated with multiwoven-integrations repository (#41) +- Added hover text for columns (#36) +- Add github action to deploy main (#44) ### 🐛 Bug Fixes -- Snowflake odbc -- Increment sync run offsets after each batch query -- Model preview queries -- IncrementalDelta primary key downcase issue -- Input background not white when put in not white containers -- Add empty line at the end -- Add types for connector schema response from backend -- Persist configure screen data on navigation -- Edit syncs dropdown values -- Conditionally render footer elements in create sync -- Pre-render components during edit sync -- Redirect routes for adding models and connectors -- Footer buttons missing in source config form -- Spinner style mismatch -- Updated edit model header to include breadcrumbs -- Added breadcrumbs and ui fix in edit query screen -- Multiple mapping bug during create sync -- Update chakra theme with design system font tokens -- Design mismatch between form component and design -- Spacing issues between the input and label -- Style mismatch of description in source config form -- Clicking on add source goes to destinations -- Description spanning fully by adding a max width -- Add connector button hardcoded to navigate to sources page always -- Update text for no connectors found -- Add batch params to stream -- Release github action to use official gem action -- Slack and salesforce svg -- Avoid numbers in path created by gem in server -- Change keytransform from underscore to unaltered -- Update link to resources in readme -- Prevent sync without connector -- Bypass cc-test-reporter steps -- Return complete slices in reporting -- Update condition for domain name -- Update issues link in the readme -- Handle error during listing sync -- Docker file env update for api host -- Soft delete for sync run -- Password policy -- Pagination link invalid -- Update login timeout -- Handle nil for to_datetime filter in liquid -- Databricks connector specification -- Spec password -- Update query and create connection -- Sftp server integration -- Schema_mode changes to enum -- Sftp full refresh -- Add batch support fields in catalog streams -- Batch size in salesforce consumer goods cloud -- Destination path fix in check connection -- Sftp connection spec filename added -- Build and lint errors -- Lint and build errors -- Add primary key validation for extractor - -### 🚜 Refactor - -- Heartbeat timeout for loader and extractor activity -- Remove unwanted code added for debugging -- Change ui schemas to an object so that it only gets applied on pages where the ui schema is to be used -- Extract query method for model preview -- Support multiple connectors in reporting -- Minor changes -- Moved tab filters to new component - -### 📚 Documentation - -- Add comments to explain layout schema -- Add github template for multiwoven server -- Update readme for ui -- Update readme link for contribution guideline -- Update contribution guidelines - -### 🎨 Styling - -- Fix the exit modal as per designs -- Update styling for step 1 for syncs page -- Update the brand color in chakra config -- Fix the designs of the top bar and model list table -- Update table coloumns styling -- Update styles for step 3 in syncs page -- Fix the alignment of the footer container -- Fix footer button alignments -- Update styling for finale sync page -- Update heading text -- Update the color to use from tokens -- Fix weaving soon design -- Ui improvements for edit sync page -- Make status tag consistent -- Update styling for edit model details modal -- Update edit model modal buttons -- Update styling for delete modal -- Update styling for final model submit step -- Update background color for syncs page -- Update bg color for the list screens -- Update all destinations pill -- Update connect your destination form styling -- Update test destination screen -- Fix the padding of table -- Update background colors for edit sync box -- Update padding of the box -- Update the font size -- Update the border color for disabled state -- Update designs for selecting data source -- Update style for testing source screen -- Update design for final source step -- Update top bar for edit source -- Update form design for edit source -- Update side nav as per figma -- Update styling for logout modal -- Align the breadcrumbs on top bar -- Update copy changes for syncs screen -- Update copy changes for models -- Update copy for destinations screen +- Add connector_query_type in spec (#23) +- Remove disable condition for adding more mapping (#26) +- Update sync run mailer to trigger on failure only +- Code climate reporter path not found +- Update back button navigation on syncs configuration (#35) diff --git a/server/Gemfile b/server/Gemfile index 5b794434..7b80f161 100644 --- a/server/Gemfile +++ b/server/Gemfile @@ -12,7 +12,7 @@ gem "interactor", "~> 3.0" gem "ruby-odbc", git: "https://github.com/Multiwoven/ruby-odbc.git" -gem "multiwoven-integrations", "~> 0.1.55" +gem "multiwoven-integrations", "~> 0.1.59" gem "temporal-ruby", github: "coinbase/temporal-ruby" diff --git a/server/Gemfile.lock b/server/Gemfile.lock index 20389f2e..33560de8 100644 --- a/server/Gemfile.lock +++ b/server/Gemfile.lock @@ -106,7 +106,7 @@ GEM activerecord (>= 3.2, < 8.0) rake (>= 10.4, < 14.0) ast (2.4.2) - async (2.10.1) + async (2.10.2) console (~> 1.10) fiber-annotation io-event (~> 1.5, >= 1.5.1) @@ -119,7 +119,7 @@ GEM protocol-http1 (~> 0.19.0) protocol-http2 (~> 0.16.0) traces (>= 0.10.0) - async-io (1.42.0) + async-io (1.42.1) async async-pool (0.4.0) async (>= 1.25) @@ -1862,7 +1862,7 @@ GEM msgpack (1.7.2) multi_json (1.15.0) multipart-post (2.4.0) - multiwoven-integrations (0.1.55) + multiwoven-integrations (0.1.59) activesupport async-websocket csv @@ -1925,7 +1925,7 @@ GEM net-smtp premailer (~> 1.7, >= 1.7.9) protocol-hpack (1.4.3) - protocol-http (0.26.2) + protocol-http (0.26.4) protocol-http1 (0.19.0) protocol-http (~> 0.22) protocol-http2 (0.16.0) @@ -2071,7 +2071,7 @@ GEM gli hashie stringio (3.1.0) - stripe (10.14.0) + stripe (11.1.0) thor (1.3.0) timecop (0.9.8) timeout (0.4.1) @@ -2130,7 +2130,7 @@ DEPENDENCIES jwt kaminari liquid - multiwoven-integrations (~> 0.1.55) + multiwoven-integrations (~> 0.1.59) newrelic_rpm parallel pg (~> 1.1) diff --git a/server/app/contracts/sync_contracts.rb b/server/app/contracts/sync_contracts.rb index b0d8ba3f..2c355389 100644 --- a/server/app/contracts/sync_contracts.rb +++ b/server/app/contracts/sync_contracts.rb @@ -24,6 +24,7 @@ class Create < Dry::Validation::Contract required(:sync_interval_unit).filled(:string) required(:sync_mode).filled(:string) required(:stream_name).filled(:string) + optional(:cursor_field).maybe(:string) # update filled with validating array of hashes required(:configuration).filled diff --git a/server/app/controllers/api/v1/models_controller.rb b/server/app/controllers/api/v1/models_controller.rb index aaae944d..11738348 100644 --- a/server/app/controllers/api/v1/models_controller.rb +++ b/server/app/controllers/api/v1/models_controller.rb @@ -72,8 +72,11 @@ def set_model end def validate_query + query = params.dig(:model, :query) + return if query.blank? + query_type = @model.present? ? @model.connector.connector_query_type : @connector.connector_query_type - Utils::QueryValidator.validate_query(query_type, params.dig(:model, :query)) + Utils::QueryValidator.validate_query(query_type, query) rescue StandardError => e render_error( message: "Query validation failed: #{e.message}", diff --git a/server/app/controllers/api/v1/syncs_controller.rb b/server/app/controllers/api/v1/syncs_controller.rb index 9b445d4f..ec177d27 100644 --- a/server/app/controllers/api/v1/syncs_controller.rb +++ b/server/app/controllers/api/v1/syncs_controller.rb @@ -93,6 +93,7 @@ def sync_params :sync_mode, :sync_interval_unit, :stream_name, + :cursor_field, configuration: %i[from to mapping_type @@ -104,7 +105,7 @@ def sync_params if params.to_unsafe_h[:sync][:configuration].is_a?(Hash) strong_params.merge!(configuration: params.to_unsafe_h[:sync][:configuration]) end - + strong_params.delete(:cursor_field) if action_name == "update" strong_params end end diff --git a/server/app/interactors/syncs/create_sync.rb b/server/app/interactors/syncs/create_sync.rb index 0abd29ec..2cc40164 100644 --- a/server/app/interactors/syncs/create_sync.rb +++ b/server/app/interactors/syncs/create_sync.rb @@ -5,6 +5,10 @@ class CreateSync include Interactor def call + source = context.workspace.connectors.find_by(id: context.sync_params[:source_id]) + + default_cursor_field = source.catalog&.default_cursor_field(context.sync_params[:stream_name]) + context.sync_params[:cursor_field] = default_cursor_field if default_cursor_field.present? sync = context .workspace.syncs .create(context.sync_params) diff --git a/server/app/models/catalog.rb b/server/app/models/catalog.rb index 322c7ea9..fac48440 100644 --- a/server/app/models/catalog.rb +++ b/server/app/models/catalog.rb @@ -50,4 +50,9 @@ def stream_to_protocol(stream) request_rate_concurrency: ) end + + def default_cursor_field(stream_name) + current_stream = catalog["streams"].find { |stream| stream["name"] == stream_name } + current_stream["default_cursor_field"] if current_stream && catalog["source_defined_cursor"] + end end diff --git a/server/app/models/connector.rb b/server/app/models/connector.rb index 2be6bd6f..ac1d7212 100644 --- a/server/app/models/connector.rb +++ b/server/app/models/connector.rb @@ -66,7 +66,8 @@ def to_protocol Multiwoven::Integrations::Protocol::Connector.new( name: connector_name, type: connector_type, - connection_specification: configuration + connection_specification: configuration, + query_type: connector_query_type ) end @@ -83,7 +84,6 @@ def connector_query_type connector_type.to_s.camelize, connector_name.to_s.camelize ).new connector_spec = client.connector_spec - connector_spec&.connector_query_type || "raw_sql" end end diff --git a/server/app/models/sync.rb b/server/app/models/sync.rb index 5cf01ee2..34adbcc9 100644 --- a/server/app/models/sync.rb +++ b/server/app/models/sync.rb @@ -13,6 +13,8 @@ # source_catalog_id :integer # schedule_type :string # status :integer +# cursor_field :string +# current_cursor_field :string # created_at :datetime not null # updated_at :datetime not null # @@ -82,7 +84,9 @@ def to_protocol catalog.find_stream_by_name(stream_name) ), sync_mode: Multiwoven::Integrations::Protocol::SyncMode[sync_mode], - destination_sync_mode: Multiwoven::Integrations::Protocol::DestinationSyncMode["insert"] + destination_sync_mode: Multiwoven::Integrations::Protocol::DestinationSyncMode["insert"], + cursor_field:, + current_cursor_field: ) end diff --git a/server/app/serializers/sync_serializer.rb b/server/app/serializers/sync_serializer.rb index de2a92c4..f78b4292 100644 --- a/server/app/serializers/sync_serializer.rb +++ b/server/app/serializers/sync_serializer.rb @@ -3,7 +3,7 @@ class SyncSerializer < ActiveModel::Serializer attributes :id, :source_id, :destination_id, :model_id, :configuration, :schedule_type, :sync_mode, :sync_interval, :sync_interval_unit, - :stream_name, :status, + :stream_name, :status, :cursor_field, :current_cursor_field, :updated_at, :created_at attribute :source do diff --git a/server/db/migrate/20240412183836_add_cursor_fields_to_syncs.rb b/server/db/migrate/20240412183836_add_cursor_fields_to_syncs.rb new file mode 100644 index 00000000..890ab028 --- /dev/null +++ b/server/db/migrate/20240412183836_add_cursor_fields_to_syncs.rb @@ -0,0 +1,6 @@ +class AddCursorFieldsToSyncs < ActiveRecord::Migration[7.1] + def change + add_column :syncs, :cursor_field, :string + add_column :syncs, :current_cursor_field, :string + end +end diff --git a/server/db/schema.rb b/server/db/schema.rb index 4493d9a5..7d4e66cd 100644 --- a/server/db/schema.rb +++ b/server/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 2024_04_08_061904) do +ActiveRecord::Schema[7.1].define(version: 2024_04_12_183836) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" @@ -33,7 +33,6 @@ t.datetime "updated_at", null: false t.string "connector_name" t.string "description" - t.integer "query_type", default: 0 end create_table "models", force: :cascade do |t| @@ -109,6 +108,8 @@ t.string "stream_name" t.string "workflow_id" t.datetime "discarded_at" + t.string "cursor_field" + t.string "current_cursor_field" t.index ["discarded_at"], name: "index_syncs_on_discarded_at" end diff --git a/server/lib/reverse_etl/extractors/base.rb b/server/lib/reverse_etl/extractors/base.rb index 4277c1e9..7bf9c2f4 100644 --- a/server/lib/reverse_etl/extractors/base.rb +++ b/server/lib/reverse_etl/extractors/base.rb @@ -4,8 +4,8 @@ module ReverseEtl module Extractors class Base DEFAULT_OFFSET = 0 - DEFAULT_BATCH_SIZE = (ENV["SYNC_EXTRACTOR_BATCH_SIZE"] || "10000").to_i - DEFAULT_LIMT = (ENV["SYNC_EXTRACTOR_BATCH_SIZE"] || "10000").to_i + DEFAULT_BATCH_SIZE = (ENV["SYNC_EXTRACTOR_BATCH_SIZE"] || "2000").to_i + DEFAULT_LIMT = (ENV["SYNC_EXTRACTOR_BATCH_SIZE"] || "2000").to_i THREAD_COUNT = (ENV["SYNC_EXTRACTOR_THREAD_POOL_SIZE"] || "5").to_i def read(_sync_run_id) diff --git a/server/lib/reverse_etl/extractors/incremental_delta.rb b/server/lib/reverse_etl/extractors/incremental_delta.rb index f40897aa..458ace47 100644 --- a/server/lib/reverse_etl/extractors/incremental_delta.rb +++ b/server/lib/reverse_etl/extractors/incremental_delta.rb @@ -17,11 +17,14 @@ def read(sync_run_id, activity) batch_query_params = batch_params(source_client, sync_run) model = sync_run.sync.model - ReverseEtl::Utils::BatchQuery.execute_in_batches(batch_query_params) do |records, current_offset| + ReverseEtl::Utils::BatchQuery.execute_in_batches(batch_query_params) do |records, + current_offset, last_cursor_field_value| + total_query_rows += records.count process_records(records, sync_run, model) heartbeat(activity) sync_run.update(current_offset:, total_query_rows:) + sync_run.sync.update(current_cursor_field: last_cursor_field_value) end # change state querying to queued sync_run.queue! diff --git a/server/lib/reverse_etl/utils/batch_query.rb b/server/lib/reverse_etl/utils/batch_query.rb index 80acf9bc..33cf9bc6 100644 --- a/server/lib/reverse_etl/utils/batch_query.rb +++ b/server/lib/reverse_etl/utils/batch_query.rb @@ -6,26 +6,68 @@ class BatchQuery def self.execute_in_batches(params) raise ArgumentError, "Batch size must be greater than 0" if params[:batch_size] <= 0 + initial_sync_config = params[:sync_config] current_offset = params[:offset] - + last_cursor_field_value = params[:sync_config].current_cursor_field loop do # Set the current limit and offset in the sync configuration params[:sync_config].limit = params[:batch_size] params[:sync_config].offset = current_offset + if initial_sync_config.cursor_field + query_with_cursor = CursorQueryBuilder.build_cursor_query(initial_sync_config, last_cursor_field_value) + params[:sync_config] = build_cursor_sync_config(params[:sync_config], query_with_cursor) + end + # Execute the batch query result = params[:client].read(params[:sync_config]) + # Extract the value of the cursor_field column from the last record + last_cursor_field_value = extract_last_cursor_field_value(result, params[:sync_config]) # Increment the offset by the batch size for the next iteration current_offset += params[:batch_size] break if result.empty? - yield result, current_offset if block_given? + yield result, current_offset, last_cursor_field_value if block_given? # Break the loop if the number of records fetched is less than the batch size # break if result.size < params[:batch_size] end end + + def self.extract_last_cursor_field_value(result, sync_config) + return nil unless sync_config.cursor_field && !result.empty? + + last_record = result.last.record.data + last_record[sync_config.cursor_field] + end + + def self.build_cursor_sync_config(sync_config, new_query) + new_model = build_new_model(sync_config.model, new_query) + + modified_sync_config = Multiwoven::Integrations::Protocol::SyncConfig.new( + model: new_model.to_protocol, + source: sync_config.source, + destination: sync_config.destination, + stream: sync_config.stream, + sync_mode: sync_config.sync_mode, + destination_sync_mode: sync_config.destination_sync_mode, + cursor_field: sync_config.cursor_field, + current_cursor_field: sync_config.current_cursor_field + ) + modified_sync_config.offset = 0 + modified_sync_config.limit = sync_config.limit + modified_sync_config + end + + def self.build_new_model(existing_model, new_query) + Model.new( + name: existing_model.name, + query: new_query, + query_type: existing_model.query_type, + primary_key: existing_model.primary_key + ) + end end end end diff --git a/server/lib/reverse_etl/utils/cursor_query_builder.rb b/server/lib/reverse_etl/utils/cursor_query_builder.rb new file mode 100644 index 00000000..cbdf17d6 --- /dev/null +++ b/server/lib/reverse_etl/utils/cursor_query_builder.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module ReverseEtl + module Utils + class CursorQueryBuilder + def self.build_cursor_query(sync_config, current_cursor_field) + existing_query = sync_config.model.query + query_type = sync_config.source.query_type || "raw_sql" + if current_cursor_field + cursor_condition = case query_type.to_sym + when :soql + "#{sync_config.cursor_field} >= #{current_cursor_field}" + when :raw_sql + "#{sync_config.cursor_field} >= '#{current_cursor_field}'" + end + end + if cursor_condition + "#{existing_query} AS subquery " \ + "WHERE #{cursor_condition} " \ + "ORDER BY #{sync_config.cursor_field} ASC" + elsif sync_config.cursor_field + "#{existing_query} AS subquery " \ + "ORDER BY #{sync_config.cursor_field} ASC" + end + end + end + end +end diff --git a/server/spec/interactors/syncs/create_sync_spec.rb b/server/spec/interactors/syncs/create_sync_spec.rb index a4da0714..bb6257b1 100644 --- a/server/spec/interactors/syncs/create_sync_spec.rb +++ b/server/spec/interactors/syncs/create_sync_spec.rb @@ -4,10 +4,13 @@ RSpec.describe Syncs::CreateSync do let(:workspace) { create(:workspace) } - let(:source) { create(:connector, workspace:) } + let(:source) { create(:connector, workspace:, connector_type: "source") } let(:destination) { create(:connector, workspace:) } let(:model) { create(:model, workspace:, connector: source) } - let(:sync) { build(:sync, workspace:, source:, destination:, model:) } + let(:sync) do + build(:sync, workspace:, source:, destination:, model:, cursor_field: "timestamp", + current_cursor_field: "2022-01-01") + end before do create(:catalog, connector: source) @@ -18,24 +21,27 @@ it "creates a sync" do result = described_class.call( workspace:, - sync_params: sync.attributes.except("id", "created_at", "updated_at") + sync_params: sync.attributes.except("id", "created_at", "updated_at").with_indifferent_access ) expect(result.success?).to eq(true) expect(result.sync.persisted?).to eql(true) expect(result.sync.source_id).to eql(source.id) expect(result.sync.destination_id).to eql(destination.id) expect(result.sync.model_id).to eql(model.id) + expect(result.sync.cursor_field).to eql(sync.cursor_field) + expect(result.sync.current_cursor_field).to eql(sync.current_cursor_field) end end context "with invalid params" do let(:sync_params) do - { source_id: nil } + sync.attributes.except("id", "created_at", "destination_id") end it "fails to create sync" do - result = described_class.call(workspace:, sync_params:) + result = described_class.call(workspace:, sync_params: sync_params.with_indifferent_access) expect(result.failure?).to eq(true) + expect(result.sync.persisted?).to eql(false) end end end diff --git a/server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb b/server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb index 3136817b..8a581598 100644 --- a/server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb +++ b/server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb @@ -46,7 +46,7 @@ before do sync.model.update(primary_key: "id") allow(client).to receive(:read).and_return(records) - allow(ReverseEtl::Utils::BatchQuery).to receive(:execute_in_batches).and_yield(records, 1) + allow(ReverseEtl::Utils::BatchQuery).to receive(:execute_in_batches).and_yield(records, 1, nil) allow(sync_run1.sync.source).to receive_message_chain(:connector_client, :new).and_return(client) allow(activity).to receive(:heartbeat) allow(activity).to receive(:cancel_requested).and_return(false) @@ -83,7 +83,8 @@ emitted_at: DateTime.now.to_i ).to_multiwoven_message - allow(ReverseEtl::Utils::BatchQuery).to receive(:execute_in_batches).and_yield([modified_record1, record2], 1) + allow(ReverseEtl::Utils::BatchQuery).to receive(:execute_in_batches).and_yield([modified_record1, record2], 1, + "2022-01-01") # Second sync run expect(sync_run2).to have_state(:started) @@ -91,6 +92,7 @@ subject.read(sync_run2.id, activity) sync_run2.reload expect(sync_run2).to have_state(:queued) + expect(sync_run2.sync.current_cursor_field).to eql("2022-01-01") updated_sync_record = sync_run2.sync_records.find_by(primary_key: record1.record.data["id"]) expect(sync_run2.sync_records.count).to eq(1) @@ -98,7 +100,8 @@ expect(updated_sync_record.action).to eq("destination_update") expect(updated_sync_record.record).to eq(modified_record1.record.data) - allow(ReverseEtl::Utils::BatchQuery).to receive(:execute_in_batches).and_yield([record2, record3], 1) + allow(ReverseEtl::Utils::BatchQuery).to receive(:execute_in_batches).and_yield([record2, record3], 1, + "2022-01-02") # Third sync run with same record expect(sync_run3).to have_state(:started) @@ -107,6 +110,7 @@ sync_run3.reload expect(sync_run3).to have_state(:queued) expect(sync_run3.sync_records.count).to eq(0) + expect(sync_run3.sync.current_cursor_field).to eql("2022-01-02") end end diff --git a/server/spec/lib/reverse_etl/loaders/standard_spec.rb b/server/spec/lib/reverse_etl/loaders/standard_spec.rb index 3eabb334..f73dfb2f 100644 --- a/server/spec/lib/reverse_etl/loaders/standard_spec.rb +++ b/server/spec/lib/reverse_etl/loaders/standard_spec.rb @@ -33,6 +33,18 @@ let!(:sync_record_batch2) { create(:sync_record, sync: sync_batch, sync_run: sync_run_batch, primary_key: "key2") } let!(:sync_record_individual) { create(:sync_record, sync: sync_individual, sync_run: sync_run_individual) } let(:activity) { instance_double("LoaderActivity") } + let(:connector_spec) do + Multiwoven::Integrations::Protocol::ConnectorSpecification.new( + connector_query_type: "raw_sql", + stream_type: "dynamic", + connection_specification: { + :$schema => "http://json-schema.org/draft-07/schema#", + :title => "Snowflake", + :type => "object", + :stream => {} + } + ) + end before do allow(activity).to receive(:heartbeat) @@ -49,6 +61,9 @@ end let(:multiwoven_message) { tracker.to_multiwoven_message } let(:client) { instance_double(sync_batch.destination.connector_client) } + before do + allow(client).to receive(:connector_spec).and_return(connector_spec) + end it "calls process_batch_records method" do allow(sync_batch.destination.connector_client).to receive(:new).and_return(client) allow(client).to receive(:write).with(sync_batch.to_protocol, transform).and_return(multiwoven_message) @@ -76,6 +91,9 @@ end let(:multiwoven_message) { tracker.to_multiwoven_message } let(:client) { instance_double(sync_batch.destination.connector_client) } + before do + allow(client).to receive(:connector_spec).and_return(connector_spec) + end it "calls process_batch_records method" do allow(sync_batch.destination.connector_client).to receive(:new).and_return(client) allow(client).to receive(:write).with(sync_batch.to_protocol, transform).and_return(multiwoven_message) @@ -100,6 +118,9 @@ let(:transform) { transformer.transform(sync_individual, sync_record_individual) } let(:multiwoven_message) { tracker.to_multiwoven_message } let(:client) { instance_double(sync_individual.destination.connector_client) } + before do + allow(client).to receive(:connector_spec).and_return(connector_spec) + end it "calls process_individual_records method" do allow(sync_individual.destination.connector_client).to receive(:new).and_return(client) allow(client).to receive(:write).with(sync_individual.to_protocol, [transform]).and_return(multiwoven_message) @@ -123,9 +144,12 @@ let(:transform) { transformer.transform(sync_individual, sync_record_individual) } let(:multiwoven_message) { tracker.to_multiwoven_message } let(:client) { instance_double(sync_individual.destination.connector_client) } - + before do + allow(client).to receive(:connector_spec).and_return(connector_spec) + end it "calls process_individual_records method" do allow(sync_individual.destination.connector_client).to receive(:new).and_return(client) + allow(client).to receive(:write).with(sync_individual.to_protocol, [transform]).and_return(multiwoven_message) expect(subject).to receive(:heartbeat).once.with(activity) expect(sync_run_individual).to have_state(:queued) @@ -155,7 +179,9 @@ let(:transform) { transformer.transform(sync_individual, sync_record_individual) } let(:multiwoven_message) { tracker.to_multiwoven_message } let(:client) { instance_double(sync_individual.destination.connector_client) } - + before do + allow(client).to receive(:connector_spec).and_return(connector_spec) + end it "sync run started to in_progress" do allow(sync_individual.destination.connector_client).to receive(:new).and_return(client) allow(client).to receive(:write).with(sync_individual.to_protocol, [transform]).and_return(multiwoven_message) @@ -178,6 +204,9 @@ let(:transform) { transformer.transform(sync_individual, sync_record_individual) } let(:multiwoven_message) { control.to_multiwoven_message } let(:client) { instance_double(sync_individual.destination.connector_client) } + before do + allow(client).to receive(:connector_spec).and_return(connector_spec) + end it "sync run started to in_progress" do allow(sync_individual.destination.connector_client).to receive(:new).and_return(client) @@ -207,6 +236,9 @@ end let(:multiwoven_message) { control.to_multiwoven_message } let(:client) { instance_double(sync_batch.destination.connector_client) } + before do + allow(client).to receive(:connector_spec).and_return(connector_spec) + end it "calls process_batch_records method" do allow(sync_batch.destination.connector_client).to receive(:new).and_return(client) allow(client).to receive(:write).with(sync_batch.to_protocol, transform).and_return(multiwoven_message) diff --git a/server/spec/lib/reverse_etl/utils/batch_query_spec.rb b/server/spec/lib/reverse_etl/utils/batch_query_spec.rb index 4ce62f42..8acbfd5f 100644 --- a/server/spec/lib/reverse_etl/utils/batch_query_spec.rb +++ b/server/spec/lib/reverse_etl/utils/batch_query_spec.rb @@ -3,43 +3,173 @@ require "rails_helper" module ReverseEtl - module Utils + module Utils # rubocop:disable Metrics/ModuleLength RSpec.describe BatchQuery do describe ".execute_in_batches" do let(:client) { double("Client") } + context "when neither cursor_field nor current_cursor_field are present" do + let(:destination) { create(:connector, connector_type: "destination") } + let!(:catalog) { create(:catalog, connector: destination) } - let(:destination) { create(:connector, connector_type: "destination") } - let!(:catalog) { create(:catalog, connector: destination) } + let(:sync) { create(:sync, destination:) } + + before do + call_count = 0 + allow(client).to receive(:read) do |_config| + call_count += 1 + call_count < 10 ? Array.new(100, "mock_data") : [] + end + end + + it "executes batches correctly" do + params = { + offset: 0, + limit: 100, + batch_size: 100, + sync_config: sync.to_protocol, + client: + } - let(:sync) { create(:sync, destination:) } + expect(client).to receive(:read).exactly(10).times + + results = [] + BatchQuery.execute_in_batches(params) do |result| + results << result + end + + expect(results.size).to eq(9) + expect(results.first.size).to eq(100) + expect(results.last.size).to eq(100) + end + end + context "when both cursor_field is present" do + let(:existing_query) { "SELECT * FROM table" } + let(:source) { create(:connector, connector_type: "source", connector_name: "Snowflake") } + let(:destination) { create(:connector, connector_type: "destination") } + let!(:catalog) { create(:catalog, connector: destination) } + let(:model) { create(:model, connector: source, query: existing_query) } + let(:sync) do + create(:sync, model:, source:, destination:, cursor_field: "timestamp", + current_cursor_field: "2022-01-01") + end + let(:record) do + Multiwoven::Integrations::Protocol::RecordMessage.new(data: { "id" => 1, "email" => "test1@mail.com", + "first_name" => "John", "Last Name" => "Doe", + "timestamp" => "2022-01-01" }, + emitted_at: DateTime.now.to_i).to_multiwoven_message + end - before do - call_count = 0 - allow(client).to receive(:read) do |_config| - call_count += 1 - call_count < 10 ? Array.new(100, "mock_data") : [] + it "executes batches and call CursorQueryBuilder" do + params = { + offset: 0, + limit: 100, + batch_size: 100, + sync_config: sync.to_protocol, + client: + } + allow(client).to receive(:read).and_return(*Array.new(1, [record]), []) + expect(CursorQueryBuilder).to receive(:build_cursor_query).with(sync.to_protocol, + "2022-01-01") + .and_call_original.twice + results = [] + BatchQuery.execute_in_batches(params) do |result, current_offset, last_cursor_field_value| + expect(result.first).to be_an_instance_of(Multiwoven::Integrations::Protocol::MultiwovenMessage) + expect(current_offset).to eq(100) + expect(last_cursor_field_value).to eq("2022-01-01") + results << result + end end end + end - it "executes batches correctly" do - params = { - offset: 0, - limit: 100, - batch_size: 100, - sync_config: sync.to_protocol, - client: - } + describe ".extract_last_cursor_field_value" do + let(:sync_config) { instance_double(Multiwoven::Integrations::Protocol::SyncConfig, cursor_field: "timestamp") } - expect(client).to receive(:read).exactly(10).times + context "when result is empty" do + it "returns nil" do + result = [] + expect(described_class.extract_last_cursor_field_value(result, sync_config)).to be_nil + end + end - results = [] - BatchQuery.execute_in_batches(params) do |result| - results << result + context "when result is not empty" do + let(:record1) do + Multiwoven::Integrations::Protocol::RecordMessage.new(data: { "id" => 1, "email" => "test1@mail.com", + "first_name" => "John", "Last Name" => "Doe", + "timestamp" => "2022-01-01" }, + emitted_at: DateTime.now.to_i).to_multiwoven_message end + let(:record2) do + Multiwoven::Integrations::Protocol::RecordMessage.new(data: { "id" => 1, "email" => "test1@mail.com", + "first_name" => "John", "Last Name" => "Doe", + "timestamp" => "2022-01-02" }, + emitted_at: DateTime.now.to_i).to_multiwoven_message + end + let(:result) { [record1, record2] } + + it "returns the value of the last record's cursor field" do + expect(described_class.extract_last_cursor_field_value(result, sync_config)).to eq("2022-01-02") + end + end + + context "when sync_config has no cursor field" do + let(:sync_config) { instance_double(Multiwoven::Integrations::Protocol::SyncConfig, cursor_field: nil) } + let(:result) do + [instance_double(Multiwoven::Integrations::Protocol::RecordMessage, data: { "timestamp" => "2022-01-01" })] + end + + it "returns nil" do + expect(described_class.extract_last_cursor_field_value(result, sync_config)).to be_nil + end + end + end + + describe ".build_cursor_sync_config" do + let(:existing_query) { "SELECT * FROM table" } + let(:source) { create(:connector, connector_type: "source", connector_name: "Snowflake") } + let(:destination) { create(:connector, connector_type: "destination") } + let!(:catalog) { create(:catalog, connector: destination) } + let(:model) { create(:model, connector: source, query: existing_query) } + let(:sync) do + create(:sync, model:, source:, destination:, cursor_field: "timestamp", + current_cursor_field: "2022-01-01") + end + + let(:new_query) { "SELECT * FROM table WHERE timestamp >= '2022-01-01'" } + let(:sync_config) { sync.to_protocol } + + it "builds a new SyncConfig with modified query and other attributes" do + modified_sync_config = described_class.build_cursor_sync_config(sync_config, new_query) + + expect(modified_sync_config).to be_a(Multiwoven::Integrations::Protocol::SyncConfig) + expect(modified_sync_config.model.name).to eq(sync_config.model.name) + expect(modified_sync_config.model.query).to eq("SELECT * FROM table WHERE timestamp >= '2022-01-01'") + expect(modified_sync_config.model.query_type).to eq("raw_sql") + expect(modified_sync_config.model.primary_key).to eq("TestPrimaryKey") + expect(modified_sync_config.source).to eq(source.to_protocol) + expect(modified_sync_config.destination).to eq(destination.to_protocol) + expect(modified_sync_config.stream).to eq(sync_config.stream) + expect(modified_sync_config.sync_mode).to eq(sync_config.sync_mode) + expect(modified_sync_config.destination_sync_mode).to eq(sync_config.destination_sync_mode) + expect(modified_sync_config.cursor_field).to eq(sync_config.cursor_field) + expect(modified_sync_config.current_cursor_field).to eq(sync_config.current_cursor_field) + expect(modified_sync_config.limit).to eq(sync_config.limit) + expect(modified_sync_config.offset).to eq(0) + end + end + + describe ".build_new_model" do + let(:existing_model) { instance_double(Model, name: "ExistingModel", query_type: "raw_sql", primary_key: "id") } + let(:new_query) { "SELECT * FROM table WHERE timestamp >= '2022-01-01'" } + + it "builds a new Model with modified query and other attributes" do + new_model = described_class.build_new_model(existing_model, new_query) - expect(results.size).to eq(9) - expect(results.first.size).to eq(100) - expect(results.last.size).to eq(100) + expect(new_model).to be_a(Model) + expect(new_model.name).to eq("ExistingModel") + expect(new_model.query).to eq("SELECT * FROM table WHERE timestamp >= '2022-01-01'") + expect(new_model.query_type).to eq("raw_sql") + expect(new_model.primary_key).to eq("id") end end end diff --git a/server/spec/lib/reverse_etl/utils/cursor_query_builder_spec.rb b/server/spec/lib/reverse_etl/utils/cursor_query_builder_spec.rb new file mode 100644 index 00000000..db2078aa --- /dev/null +++ b/server/spec/lib/reverse_etl/utils/cursor_query_builder_spec.rb @@ -0,0 +1,85 @@ +# frozen_string_literal: true + +require "rails_helper" + +module ReverseEtl + module Utils + RSpec.describe CursorQueryBuilder do + let(:existing_query) { "SELECT * FROM table" } + let(:source) { create(:connector, connector_type: "source", connector_name: "Snowflake") } + let(:source_salesforce) do + create(:connector, connector_type: "source", connector_name: "SalesforceConsumerGoodsCloud") + end + let(:destination) { create(:connector, connector_type: "destination") } + let!(:catalog) { create(:catalog, connector: destination) } + let(:model) { create(:model, connector: source, query: existing_query) } + let(:model_salesforce) { create(:model, connector: source, query: existing_query) } + + describe ".build_cursor_query" do + context "when both cursor_field and current_cursor_field are present" do + let(:sync) do + create(:sync, model:, source:, destination:, cursor_field: "timestamp", current_cursor_field: "2022-01-01") + end + + let(:sync_salesforce) do + create(:sync, model: model_salesforce, source: source_salesforce, destination:, cursor_field: "timestamp", + current_cursor_field: "2022-01-01") + end + let(:sync_config) { sync.to_protocol } + let(:sync_config_salesforce) { sync_salesforce.to_protocol } + + it "updates the query for raw_sql query type with WHERE and ORDER BY clauses" do + query = described_class.build_cursor_query(sync_config, "2022-01-01") + + expected_query = "SELECT * FROM table AS subquery WHERE timestamp >= '2022-01-01' ORDER BY timestamp ASC" + expect(query).to eq(expected_query) + end + + it "updates the query for soql query type with WHERE and ORDER BY clauses" do + query = described_class.build_cursor_query(sync_config_salesforce, "2022-01-01") + + expected_query = "SELECT * FROM table AS subquery WHERE timestamp >= 2022-01-01 ORDER BY timestamp ASC" + expect(query).to eq(expected_query) + end + end + + context "when only cursor_field is present" do + let(:sync) do + create(:sync, model:, source:, destination:, cursor_field: "timestamp") + end + let(:sync_salesforce) do + create(:sync, model: model_salesforce, source: source_salesforce, destination:, cursor_field: "timestamp") + end + let(:sync_config) { sync.to_protocol } + let(:sync_config_salesforce) { sync_salesforce.to_protocol } + + it "updates the query for raw_sql query type with only ORDER BY clause" do + query = described_class.build_cursor_query(sync_config, nil) + + expected_query = "SELECT * FROM table AS subquery ORDER BY timestamp ASC" + expect(query).to eq(expected_query) + end + + it "updates the query for soql query type with only ORDER BY clause" do + query = described_class.build_cursor_query(sync_config_salesforce, nil) + + expected_query = "SELECT * FROM table AS subquery ORDER BY timestamp ASC" + expect(query).to eq(expected_query) + end + end + + context "when neither cursor_field nor current_cursor_field are present" do + let(:sync) do + create(:sync, model:, source:, destination:) + end + let(:sync_config) { sync.to_protocol } + it "does not update the query" do + query = described_class.build_cursor_query(sync_config, nil) + + expect(query).to eq(nil) + end + end + end + end + end +end diff --git a/server/spec/models/catalog_spec.rb b/server/spec/models/catalog_spec.rb index 5ca6e36d..3ddd86d2 100644 --- a/server/spec/models/catalog_spec.rb +++ b/server/spec/models/catalog_spec.rb @@ -100,5 +100,31 @@ end end end + + describe "#default_cursor_field" do + let(:catalog) do + { + "streams" => [ + { "name" => "stream1", "default_cursor_field" => "timestamp" }, + { "name" => "stream2", "default_cursor_field" => "created_at" } + ], + "source_defined_cursor" => true + } + end + + let(:workspace) { create(:workspace) } + let(:connector) { create(:connector) } + let(:catalog_instance) { create(:catalog, workspace:, connector:, catalog:) } + + it "returns the default cursor field for the specified stream" do + stream_name = "stream1" + expect(catalog_instance.default_cursor_field(stream_name)).to eq("timestamp") + end + + it "returns nil if the stream doesn't exist or if source_defined_cursor is false" do + stream_name = "stream3" + expect(catalog_instance.default_cursor_field(stream_name)).to be_nil + end + end end end diff --git a/server/spec/models/sync_spec.rb b/server/spec/models/sync_spec.rb index 9d78aa70..02063013 100644 --- a/server/spec/models/sync_spec.rb +++ b/server/spec/models/sync_spec.rb @@ -59,11 +59,13 @@ }) end - let(:sync) { create(:sync, destination:) } + let(:sync) { create(:sync, destination:, cursor_field: "cursor_field", current_cursor_field: "2024-01-20") } it "returns sync config protocol" do protocol = sync.to_protocol expect(protocol).to be_a(Multiwoven::Integrations::Protocol::SyncConfig) + expect(protocol.cursor_field).to eq("cursor_field") + expect(protocol.current_cursor_field).to eq("2024-01-20") end end diff --git a/server/spec/requests/api/v1/syncs_controller_spec.rb b/server/spec/requests/api/v1/syncs_controller_spec.rb index 1401e672..d9206d3a 100644 --- a/server/spec/requests/api/v1/syncs_controller_spec.rb +++ b/server/spec/requests/api/v1/syncs_controller_spec.rb @@ -102,7 +102,8 @@ sync_interval: 10, sync_interval_unit: "minutes", stream_name: "profile", - sync_mode: "full_refresh" + sync_mode: "full_refresh", + cursor_field: "created_date" } } end @@ -127,6 +128,29 @@ expect(response_hash.dig(:data, :attributes, :model_id)).to eq(request_body.dig(:sync, :model_id)) expect(response_hash.dig(:data, :attributes, :schedule_type)).to eq(request_body.dig(:sync, :schedule_type)) expect(response_hash.dig(:data, :attributes, :stream_name)).to eq(request_body.dig(:sync, :stream_name)) + expect(response_hash.dig(:data, :attributes, :cursor_field)).to eq(request_body.dig(:sync, :cursor_field)) + expect(response_hash.dig(:data, :attributes, :current_cursor_field)).to eq(nil) + expect(response_hash.dig(:data, :attributes, :sync_interval_unit)) + .to eq(request_body.dig(:sync, :sync_interval_unit)) + expect(response_hash.dig(:data, :attributes, :sync_interval)).to eq(request_body.dig(:sync, :sync_interval)) + expect(response_hash.dig(:data, :attributes, :status)).to eq("pending") + end + + it "creates a new sync and returns success with cursor_field nil " do + request_body[:sync][:cursor_field] = nil + post "/api/v1/syncs", params: request_body.to_json, headers: { "Content-Type": "application/json" } + .merge(auth_headers(user)) + expect(response).to have_http_status(:created) + response_hash = JSON.parse(response.body).with_indifferent_access + expect(response_hash.dig(:data, :id)).to be_present + expect(response_hash.dig(:data, :type)).to eq("syncs") + expect(response_hash.dig(:data, :attributes, :source_id)).to eq(request_body.dig(:sync, :source_id)) + expect(response_hash.dig(:data, :attributes, :destination_id)).to eq(request_body.dig(:sync, :destination_id)) + expect(response_hash.dig(:data, :attributes, :model_id)).to eq(request_body.dig(:sync, :model_id)) + expect(response_hash.dig(:data, :attributes, :schedule_type)).to eq(request_body.dig(:sync, :schedule_type)) + expect(response_hash.dig(:data, :attributes, :stream_name)).to eq(request_body.dig(:sync, :stream_name)) + expect(response_hash.dig(:data, :attributes, :cursor_field)).to eq(nil) + expect(response_hash.dig(:data, :attributes, :current_cursor_field)).to eq(nil) expect(response_hash.dig(:data, :attributes, :sync_interval_unit)) .to eq(request_body.dig(:sync, :sync_interval_unit)) expect(response_hash.dig(:data, :attributes, :sync_interval)).to eq(request_body.dig(:sync, :sync_interval)) @@ -166,7 +190,8 @@ }, sync_interval: 10, sync_interval_unit: "minutes", - stream_name: "profile" + stream_name: "profile", + cursor_field: "cursor_field" } } end @@ -188,6 +213,8 @@ expect(response_hash.dig(:data, :id)).to be_present expect(response_hash.dig(:data, :id)).to eq(syncs.first.id.to_s) expect(response_hash.dig(:data, :attributes, :sync_interval)).to eq(30) + expect(response_hash.dig(:data, :attributes, :cursor_field)).to eq(nil) + expect(response_hash.dig(:data, :attributes, :current_cursor_field)).to eq(nil) end it "returns an error response when wrong sync_id" do diff --git a/ui/src/components/Pagination/Pagination.tsx b/ui/src/components/Pagination/Pagination.tsx index ddcac18d..233718df 100644 --- a/ui/src/components/Pagination/Pagination.tsx +++ b/ui/src/components/Pagination/Pagination.tsx @@ -31,6 +31,10 @@ const PageChangeButton = ({ minWidth='0' padding={0} _hover={{ backgroundColor: 'gray.300' }} + _disabled={{ + _hover: { cursor: 'not-allowed' }, + backgroundColor: 'gray.400', + }} isDisabled={!isEnabled} > {type === PAGE_CHANGE_BUTTON_TYPE.PREVIOUS ? ( diff --git a/ui/src/views/Activate/Syncs/SyncForm/ConfigureSyncs/ConfigureSyncs.tsx b/ui/src/views/Activate/Syncs/SyncForm/ConfigureSyncs/ConfigureSyncs.tsx index e68813e1..75fa7133 100644 --- a/ui/src/views/Activate/Syncs/SyncForm/ConfigureSyncs/ConfigureSyncs.tsx +++ b/ui/src/views/Activate/Syncs/SyncForm/ConfigureSyncs/ConfigureSyncs.tsx @@ -18,20 +18,23 @@ type ConfigureSyncsProps = { selectedStream: Stream | null; configuration: FieldMapType[] | null; schemaMode: SchemaMode | null; + selectedSyncMode: string; setSelectedStream: Dispatch>; setConfiguration: Dispatch>; setSchemaMode: Dispatch>; + setSelectedSyncMode: Dispatch>; }; const ConfigureSyncs = ({ selectedStream, configuration, + selectedSyncMode, setSelectedStream, setConfiguration, setSchemaMode, + setSelectedSyncMode, }: ConfigureSyncsProps): JSX.Element | null => { const { state, stepInfo, handleMoveForward } = useContext(SteppedFormContext); - const [selectedSyncMode, setSelectedSyncMode] = useState(''); const [cursorField, setCursorField] = useState(''); const { forms } = state; @@ -66,9 +69,9 @@ const ConfigureSyncs = ({ }; const { data: catalogData } = useQuery({ - queryKey: ['syncs', 'catalog', selectedDestination.id], - queryFn: () => getCatalog(selectedDestination.id), - enabled: !!selectedDestination.id, + queryKey: ['syncs', 'catalog', selectedDestination?.id], + queryFn: () => getCatalog(selectedDestination?.id), + enabled: !!selectedDestination?.id, refetchOnMount: false, refetchOnWindowFocus: false, }); diff --git a/ui/src/views/Activate/Syncs/SyncForm/ConfigureSyncs/MapFields.tsx b/ui/src/views/Activate/Syncs/SyncForm/ConfigureSyncs/MapFields.tsx index ca0ecbd8..fb88fcd5 100644 --- a/ui/src/views/Activate/Syncs/SyncForm/ConfigureSyncs/MapFields.tsx +++ b/ui/src/views/Activate/Syncs/SyncForm/ConfigureSyncs/MapFields.tsx @@ -174,16 +174,20 @@ const MapFields = ({ isDisabled={!stream || isRequired} selectedConfigOptions={configuration} /> - {!isRequired && ( - - handleRemoveMap(index)} - /> - - )} + + handleRemoveMap(index)} + /> + ))} diff --git a/ui/src/views/Activate/Syncs/SyncForm/ConfigureSyncs/TemplateMapping/TemplateMapping.tsx b/ui/src/views/Activate/Syncs/SyncForm/ConfigureSyncs/TemplateMapping/TemplateMapping.tsx index fd6ee152..9fc999d7 100644 --- a/ui/src/views/Activate/Syncs/SyncForm/ConfigureSyncs/TemplateMapping/TemplateMapping.tsx +++ b/ui/src/views/Activate/Syncs/SyncForm/ConfigureSyncs/TemplateMapping/TemplateMapping.tsx @@ -124,6 +124,7 @@ const TemplateMapping = ({ _placeholder={{ color: isDisabled ? 'black.500' : 'gray.600' }} value={selectedConfig} onClick={() => setIsPopOverOpen((prevState) => !prevState)} + autoComplete='off' /> diff --git a/ui/src/views/Activate/Syncs/SyncForm/SyncForm.tsx b/ui/src/views/Activate/Syncs/SyncForm/SyncForm.tsx index 470cc14f..0cb09822 100644 --- a/ui/src/views/Activate/Syncs/SyncForm/SyncForm.tsx +++ b/ui/src/views/Activate/Syncs/SyncForm/SyncForm.tsx @@ -15,6 +15,8 @@ const SyncForm = (): JSX.Element => { const [selectedStream, setSelectedStream] = useState(null); const [configuration, setConfiguration] = useState(null); const [schemaMode, setSchemaMode] = useState(null); + const [selectedSyncMode, setSelectedSyncMode] = useState(''); + const navigate = useNavigate(); const steps = [ { @@ -42,9 +44,11 @@ const SyncForm = (): JSX.Element => { selectedStream={selectedStream} configuration={configuration} schemaMode={schemaMode} + selectedSyncMode={selectedSyncMode} setSelectedStream={setSelectedStream} setConfiguration={setConfiguration} setSchemaMode={setSchemaMode} + setSelectedSyncMode={setSelectedSyncMode} /> ), isRequireContinueCta: false,