
feat: add cursor based incremental refresh #40

Merged (12 commits) on Apr 16, 2024
1 change: 1 addition & 0 deletions server/app/contracts/sync_contracts.rb
@@ -24,6 +24,7 @@ class Create < Dry::Validation::Contract
required(:sync_interval_unit).filled(:string)
required(:sync_mode).filled(:string)
required(:stream_name).filled(:string)
optional(:cursor_field).maybe(:string)

# update filled with validating array of hashes
required(:configuration).filled
3 changes: 2 additions & 1 deletion server/app/controllers/api/v1/syncs_controller.rb
@@ -93,6 +93,7 @@ def sync_params
:sync_mode,
:sync_interval_unit,
:stream_name,
:cursor_field,
configuration: %i[from
to
mapping_type
@@ -104,7 +105,7 @@
if params.to_unsafe_h[:sync][:configuration].is_a?(Hash)
strong_params.merge!(configuration: params.to_unsafe_h[:sync][:configuration])
end

strong_params.delete(:cursor_field) if action_name == "update"
strong_params
end
end
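The controller change above permits `cursor_field` on create but drops it on update. A minimal plain-Ruby stand-in for that guard (the method name and permitted-key list are illustrative, not the actual strong-parameters call):

```ruby
# Sketch of the sync_params guard: cursor_field is accepted on create but
# stripped on update, so an existing sync's cursor cannot be changed out
# from under its stored current_cursor_field.
def filter_sync_params(raw_params, action_name)
  permitted = %i[sync_mode sync_interval_unit stream_name cursor_field]
  strong_params = raw_params.slice(*permitted)
  strong_params.delete(:cursor_field) if action_name == "update"
  strong_params
end
```

Rejecting the field on update keeps `current_cursor_field` consistent with the cursor it was computed from.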
3 changes: 3 additions & 0 deletions server/app/interactors/syncs/create_sync.rb
@@ -5,6 +5,9 @@ class CreateSync
include Interactor

def call
connector = context.workspace.connectors.find_by(id: context.sync_params[:source_id])
default_cursor_field = connector&.default_cursor_field(context.sync_params[:stream_name])
Collaborator:

How are we identifying the default cursor field from a dynamic catalog?

Collaborator (author):

For a dynamic catalog, all scenarios involve user-defined cursors. Only when a static stream is present can the cursor be marked source_defined and its value set in default_cursor_field.

context.sync_params[:cursor_field] = default_cursor_field if default_cursor_field.present?
sync = context
.workspace.syncs
.create(context.sync_params)
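The interactor's defaulting step can be sketched as follows; `FakeConnector` and `apply_default_cursor` are hypothetical stand-ins for the ActiveRecord model and the interactor body:

```ruby
# Sketch of the create-sync defaulting: when the source connector declares
# a default cursor field for the stream, it overrides the caller-supplied
# value; otherwise the user-defined cursor is kept.
FakeConnector = Struct.new(:defaults) do
  def default_cursor_field(stream_name)
    defaults[stream_name]
  end
end

def apply_default_cursor(sync_params, connector)
  default = connector&.default_cursor_field(sync_params[:stream_name])
  sync_params[:cursor_field] = default if default && !default.empty?
  sync_params
end
```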
5 changes: 5 additions & 0 deletions server/app/models/connector.rb
@@ -86,4 +86,9 @@ def connector_query_type

connector_spec&.connector_query_type || "raw_sql"
end

def default_cursor_field(stream_name)
stream = catalog.catalog["streams"].find { |s| s["name"] == stream_name }
stream["default_cursor_field"] if stream && catalog["source_defined_cursor"]
end
end
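The lookup in `Connector#default_cursor_field` can be sketched against a plain catalog hash. One caveat worth noting: in the Airbyte-style protocol, `source_defined_cursor` is a per-stream flag, so this sketch reads it from the stream entry (the diff above reads it from the catalog object; which is intended depends on how the catalog hash is shaped):

```ruby
# Plain-Ruby sketch of the default-cursor lookup over a catalog hash with
# Airbyte-style stream entries ("name", "source_defined_cursor",
# "default_cursor_field"). Returns nil for unknown or user-cursor streams.
def default_cursor_field(catalog, stream_name)
  stream = catalog["streams"].find { |s| s["name"] == stream_name }
  stream["default_cursor_field"] if stream && stream["source_defined_cursor"]
end
```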
6 changes: 5 additions & 1 deletion server/app/models/sync.rb
@@ -13,6 +13,8 @@
# source_catalog_id :integer
# schedule_type :string
# status :integer
# cursor_field :string
# current_cursor_field :string
# created_at :datetime not null
# updated_at :datetime not null
#
@@ -82,7 +84,9 @@ def to_protocol
catalog.find_stream_by_name(stream_name)
),
sync_mode: Multiwoven::Integrations::Protocol::SyncMode[sync_mode],
destination_sync_mode: Multiwoven::Integrations::Protocol::DestinationSyncMode["insert"]
destination_sync_mode: Multiwoven::Integrations::Protocol::DestinationSyncMode["insert"],
cursor_field:,
current_cursor_field:
)
end

2 changes: 1 addition & 1 deletion server/app/serializers/sync_serializer.rb
@@ -3,7 +3,7 @@
class SyncSerializer < ActiveModel::Serializer
attributes :id, :source_id, :destination_id, :model_id, :configuration,
:schedule_type, :sync_mode, :sync_interval, :sync_interval_unit,
:stream_name, :status,
:stream_name, :status, :cursor_field, :current_cursor_field,
:updated_at, :created_at

attribute :source do
@@ -0,0 +1,6 @@
class AddCursorFieldsToSyncs < ActiveRecord::Migration[7.1]
def change
add_column :syncs, :cursor_field, :string
add_column :syncs, :current_cursor_field, :string
end
end
5 changes: 3 additions & 2 deletions server/db/schema.rb

(generated file; diff not rendered by default)

5 changes: 4 additions & 1 deletion server/lib/reverse_etl/extractors/incremental_delta.rb
@@ -17,11 +17,14 @@ def read(sync_run_id, activity)
batch_query_params = batch_params(source_client, sync_run)
model = sync_run.sync.model

ReverseEtl::Utils::BatchQuery.execute_in_batches(batch_query_params) do |records, current_offset|
ReverseEtl::Utils::BatchQuery.execute_in_batches(batch_query_params) do |records,
current_offset, last_cursor_field_value|
Collaborator:

Why do we need both current_offset and the last cursor field value?

Collaborator (author):

If cursor_field is present, we don't need to consider the current offset. However, if we want to remove duplicates, we can still consider the offset. See airbytehq/airbyte#14732.

Collaborator (author):

As of now, current_offset is not used in the cursor-field path. Passing both current_offset and last_cursor_field_value through the same field would make the database updates confusing, because current_offset is stored on the sync run while last_cursor_field_value is stored on the sync.

Contributor:

How is duplication associated with current_offset or last_cursor_field_value? Won't it be handled by the fingerprint?

Collaborator (author):

We're already deduplicating sync records using fingerprints. The duplication concern here relates to fetching data from the source: because we're using >= in the query it shouldn't be a problem, but there is still a possibility of fetching duplicate rows directly from the source.


total_query_rows += records.count
process_records(records, sync_run, model)
heartbeat(activity)
sync_run.update(current_offset:, total_query_rows:)
sync_run.sync.update(current_cursor_field: last_cursor_field_value)
end
# change state querying to queued
sync_run.queue!
11 changes: 10 additions & 1 deletion server/lib/reverse_etl/utils/batch_query.rb
@@ -13,15 +13,24 @@ def self.execute_in_batches(params)
params[:sync_config].limit = params[:batch_size]
params[:sync_config].offset = current_offset

CursorQueryBuilder.update_query(params[:sync_config])

# Execute the batch query
result = params[:client].read(params[:sync_config])

# Extract the value of the cursor_field column from the last record
last_cursor_field_value = nil
if params[:sync_config].cursor_field && !result.empty?
last_record = result.last.message.record.data
last_cursor_field_value = last_record[params[:sync_config].cursor_field]
end

# Increment the offset by the batch size for the next iteration
current_offset += params[:batch_size]

break if result.empty?

yield result, current_offset if block_given?
yield result, current_offset, last_cursor_field_value if block_given?
# Break the loop if the number of records fetched is less than the batch size
# break if result.size < params[:batch_size]
end
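The batching loop above can be sketched in isolation; the fake client and method signature here are illustrative stand-ins for `params[:client]` and the real `execute_in_batches(params)`:

```ruby
# Simplified sketch of BatchQuery.execute_in_batches: page through the
# source, and when a cursor_field is configured, surface the cursor value
# of each batch's last record alongside the running offset.
def execute_in_batches(client, batch_size, cursor_field)
  current_offset = 0
  loop do
    result = client.read(offset: current_offset, limit: batch_size)
    break if result.empty?

    # Cursor value of the last record in this batch (nil without a cursor)
    last_cursor_field_value = cursor_field ? result.last[cursor_field] : nil
    current_offset += batch_size
    yield result, current_offset, last_cursor_field_value
  end
end
```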
20 changes: 20 additions & 0 deletions server/lib/reverse_etl/utils/cursor_query_builder.rb
@@ -0,0 +1,20 @@
# frozen_string_literal: true

module ReverseEtl
module Utils
class CursorQueryBuilder
def self.update_query(sync_config)
  existing_query = sync_config.model.query
  if sync_config.cursor_field && sync_config.current_cursor_field
# If both cursor_field and current_cursor_field are present
sync_config.model.query = "(#{existing_query}) AS subquery
WHERE #{sync_config.cursor_field} >= '#{sync_config.current_cursor_field}'
ORDER BY #{sync_config.cursor_field} ASC"
elsif sync_config.cursor_field
# If only cursor_field is present but current_cursor_field is not
sync_config.model.query = "(#{existing_query}) AS subquery
ORDER BY #{sync_config.cursor_field} ASC"
end
end
end
end
end
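The query rewrite can be shown as a standalone string builder (the real class mutates `sync_config.model.query`; this sketch just returns the SQL, and wraps the subquery in an explicit `SELECT * FROM` so the result is a complete statement):

```ruby
# Sketch of the cursor query rewrite: wrap the model query in a subquery,
# filter by the stored cursor when one exists, and order by the cursor
# column so each batch advances monotonically.
def build_cursor_query(existing_query, cursor_field, current_cursor_field = nil)
  return existing_query unless cursor_field

  if current_cursor_field
    "SELECT * FROM (#{existing_query}) AS subquery " \
      "WHERE #{cursor_field} >= '#{current_cursor_field}' " \
      "ORDER BY #{cursor_field} ASC"
  else
    "SELECT * FROM (#{existing_query}) AS subquery " \
      "ORDER BY #{cursor_field} ASC"
  end
end
```

Note that interpolating the cursor value directly into the SQL string is injection-prone; binding it as a query parameter would be the safer design.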
4 changes: 3 additions & 1 deletion server/spec/models/sync_spec.rb
@@ -59,11 +59,13 @@
})
end

let(:sync) { create(:sync, destination:) }
let(:sync) { create(:sync, destination:, cursor_field: "cursor_field", current_cursor_field: "2024-01-20") }

it "returns sync config protocol" do
protocol = sync.to_protocol
expect(protocol).to be_a(Multiwoven::Integrations::Protocol::SyncConfig)
expect(protocol.cursor_field).to eq("cursor_field")
expect(protocol.current_cursor_field).to eq("2024-01-20")
end
end

10 changes: 8 additions & 2 deletions server/spec/requests/api/v1/syncs_controller_spec.rb
@@ -102,7 +102,8 @@
sync_interval: 10,
sync_interval_unit: "minutes",
stream_name: "profile",
sync_mode: "full_refresh"
sync_mode: "full_refresh",
cursor_field: "created_date"
}
}
end
@@ -127,6 +128,8 @@
expect(response_hash.dig(:data, :attributes, :model_id)).to eq(request_body.dig(:sync, :model_id))
expect(response_hash.dig(:data, :attributes, :schedule_type)).to eq(request_body.dig(:sync, :schedule_type))
expect(response_hash.dig(:data, :attributes, :stream_name)).to eq(request_body.dig(:sync, :stream_name))
expect(response_hash.dig(:data, :attributes, :cursor_field)).to eq(request_body.dig(:sync, :cursor_field))
expect(response_hash.dig(:data, :attributes, :current_cursor_field)).to eq(nil)
expect(response_hash.dig(:data, :attributes, :sync_interval_unit))
.to eq(request_body.dig(:sync, :sync_interval_unit))
expect(response_hash.dig(:data, :attributes, :sync_interval)).to eq(request_body.dig(:sync, :sync_interval))
@@ -166,7 +169,8 @@
},
sync_interval: 10,
sync_interval_unit: "minutes",
stream_name: "profile"
stream_name: "profile",
cursor_field: "cursor_field"
}
}
end
@@ -188,6 +192,8 @@
expect(response_hash.dig(:data, :id)).to be_present
expect(response_hash.dig(:data, :id)).to eq(syncs.first.id.to_s)
expect(response_hash.dig(:data, :attributes, :sync_interval)).to eq(30)
expect(response_hash.dig(:data, :attributes, :cursor_field)).to eq(nil)
expect(response_hash.dig(:data, :attributes, :current_cursor_field)).to eq(nil)
end

it "returns an error response when wrong sync_id" do