Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add cursor based incremental refresh #40

Merged
merged 12 commits into from
Apr 16, 2024
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Ignore dotenv file
.env
.env
/.history
2 changes: 1 addition & 1 deletion server/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ gem "interactor", "~> 3.0"

gem "ruby-odbc", git: "https://github.com/Multiwoven/ruby-odbc.git"

gem "multiwoven-integrations", "~> 0.1.55"
gem "multiwoven-integrations", "~> 0.1.58"

gem "temporal-ruby", github: "coinbase/temporal-ruby"

Expand Down
10 changes: 5 additions & 5 deletions server/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ GEM
activerecord (>= 3.2, < 8.0)
rake (>= 10.4, < 14.0)
ast (2.4.2)
async (2.10.1)
async (2.10.2)
console (~> 1.10)
fiber-annotation
io-event (~> 1.5, >= 1.5.1)
Expand Down Expand Up @@ -1862,7 +1862,7 @@ GEM
msgpack (1.7.2)
multi_json (1.15.0)
multipart-post (2.4.0)
multiwoven-integrations (0.1.55)
multiwoven-integrations (0.1.58)
activesupport
async-websocket
csv
Expand Down Expand Up @@ -1925,7 +1925,7 @@ GEM
net-smtp
premailer (~> 1.7, >= 1.7.9)
protocol-hpack (1.4.3)
protocol-http (0.26.2)
protocol-http (0.26.4)
protocol-http1 (0.19.0)
protocol-http (~> 0.22)
protocol-http2 (0.16.0)
Expand Down Expand Up @@ -2071,7 +2071,7 @@ GEM
gli
hashie
stringio (3.1.0)
stripe (10.14.0)
stripe (11.0.0)
thor (1.3.0)
timecop (0.9.8)
timeout (0.4.1)
Expand Down Expand Up @@ -2130,7 +2130,7 @@ DEPENDENCIES
jwt
kaminari
liquid
multiwoven-integrations (~> 0.1.55)
multiwoven-integrations (~> 0.1.58)
newrelic_rpm
parallel
pg (~> 1.1)
Expand Down
1 change: 1 addition & 0 deletions server/app/contracts/sync_contracts.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class Create < Dry::Validation::Contract
required(:sync_interval_unit).filled(:string)
required(:sync_mode).filled(:string)
required(:stream_name).filled(:string)
optional(:cursor_field).maybe(:string)

# update filled with validating array of hashes
required(:configuration).filled
Expand Down
5 changes: 4 additions & 1 deletion server/app/controllers/api/v1/models_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ def set_model
end

def validate_query
query = params.dig(:model, :query)
return if query.blank?

query_type = @model.present? ? @model.connector.connector_query_type : @connector.connector_query_type
Utils::QueryValidator.validate_query(query_type, params.dig(:model, :query))
Utils::QueryValidator.validate_query(query_type, query)
rescue StandardError => e
render_error(
message: "Query validation failed: #{e.message}",
Expand Down
3 changes: 2 additions & 1 deletion server/app/controllers/api/v1/syncs_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def sync_params
:sync_mode,
:sync_interval_unit,
:stream_name,
:cursor_field,
configuration: %i[from
to
mapping_type
Expand All @@ -104,7 +105,7 @@ def sync_params
if params.to_unsafe_h[:sync][:configuration].is_a?(Hash)
strong_params.merge!(configuration: params.to_unsafe_h[:sync][:configuration])
end

strong_params.delete(:cursor_field) if action_name == "update"
strong_params
end
end
Expand Down
4 changes: 4 additions & 0 deletions server/app/interactors/syncs/create_sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ class CreateSync
include Interactor

def call
source = context.workspace.connectors.find_by(id: context.sync_params[:source_id])

default_cursor_field = source.catalog&.default_cursor_field(context.sync_params[:stream_name])
context.sync_params[:cursor_field] = default_cursor_field if default_cursor_field.present?
sync = context
.workspace.syncs
.create(context.sync_params)
Expand Down
5 changes: 5 additions & 0 deletions server/app/models/catalog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@ def stream_to_protocol(stream)
request_rate_concurrency:
)
end

# Returns the source-defined default cursor column for +stream_name+, or nil
# when the stream is unknown or the catalog does not define its own cursor
# (i.e. "source_defined_cursor" is falsy).
def default_cursor_field(stream_name)
  stream = catalog["streams"].find { |entry| entry["name"] == stream_name }
  return nil unless stream && catalog["source_defined_cursor"]

  stream["default_cursor_field"]
end
end
4 changes: 2 additions & 2 deletions server/app/models/connector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ def to_protocol
Multiwoven::Integrations::Protocol::Connector.new(
name: connector_name,
type: connector_type,
connection_specification: configuration
connection_specification: configuration,
query_type: connector_query_type
)
end

Expand All @@ -83,7 +84,6 @@ def connector_query_type
connector_type.to_s.camelize, connector_name.to_s.camelize
).new
connector_spec = client.connector_spec

connector_spec&.connector_query_type || "raw_sql"
end
end
6 changes: 5 additions & 1 deletion server/app/models/sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# source_catalog_id :integer
# schedule_type :string
# status :integer
# cursor_field :string
# current_cursor_field :string
# created_at :datetime not null
# updated_at :datetime not null
#
Expand Down Expand Up @@ -82,7 +84,9 @@ def to_protocol
catalog.find_stream_by_name(stream_name)
),
sync_mode: Multiwoven::Integrations::Protocol::SyncMode[sync_mode],
destination_sync_mode: Multiwoven::Integrations::Protocol::DestinationSyncMode["insert"]
destination_sync_mode: Multiwoven::Integrations::Protocol::DestinationSyncMode["insert"],
cursor_field:,
current_cursor_field:
)
end

Expand Down
2 changes: 1 addition & 1 deletion server/app/serializers/sync_serializer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
class SyncSerializer < ActiveModel::Serializer
attributes :id, :source_id, :destination_id, :model_id, :configuration,
:schedule_type, :sync_mode, :sync_interval, :sync_interval_unit,
:stream_name, :status,
:stream_name, :status, :cursor_field, :current_cursor_field,
:updated_at, :created_at

attribute :source do
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Adds the two columns that back cursor-based incremental refresh on syncs:
# - cursor_field:         the source column configured as the incremental cursor
# - current_cursor_field: the last cursor value observed, used to resume the sync
class AddCursorFieldsToSyncs < ActiveRecord::Migration[7.1]
  def change
    add_column :syncs, :cursor_field, :string
    add_column :syncs, :current_cursor_field, :string
  end
end
5 changes: 3 additions & 2 deletions server/db/schema.rb

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions server/lib/reverse_etl/extractors/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ module ReverseEtl
module Extractors
class Base
DEFAULT_OFFSET = 0
DEFAULT_BATCH_SIZE = (ENV["SYNC_EXTRACTOR_BATCH_SIZE"] || "10000").to_i
DEFAULT_LIMT = (ENV["SYNC_EXTRACTOR_BATCH_SIZE"] || "10000").to_i
DEFAULT_BATCH_SIZE = (ENV["SYNC_EXTRACTOR_BATCH_SIZE"] || "2000").to_i
DEFAULT_LIMT = (ENV["SYNC_EXTRACTOR_BATCH_SIZE"] || "2000").to_i
THREAD_COUNT = (ENV["SYNC_EXTRACTOR_THREAD_POOL_SIZE"] || "5").to_i

def read(_sync_run_id)
Expand Down
5 changes: 4 additions & 1 deletion server/lib/reverse_etl/extractors/incremental_delta.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ def read(sync_run_id, activity)
batch_query_params = batch_params(source_client, sync_run)
model = sync_run.sync.model

ReverseEtl::Utils::BatchQuery.execute_in_batches(batch_query_params) do |records, current_offset|
ReverseEtl::Utils::BatchQuery.execute_in_batches(batch_query_params) do |records,
current_offset, last_cursor_field_value|
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need both current_offset and last_cursor_field_value?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the cursor_field is present, we don't need to consider the current offset. However, if we want to remove duplicates, we can still consider the offset.
airbytehq/airbyte#14732

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As of now current_offset in the cursor field not using.
If I pass both current_offset and last_cursor_field_value through the same field, updating the database becomes confusing, because current_offset is stored on the sync run while last_cursor_field_value is stored on the sync.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is duplication associated with current_offset or last_cursor_fields_value? Won't it be handled with fingerprint?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're already dealing with duplicates in the sync records using fingerprints.
The concern about duplicates mentioned here is related to fetching data from the source. because we're using >= in the query,
it shouldn't be a problem. Nonetheless, there's still a possibility of fetching duplicate data directly from the source.


total_query_rows += records.count
process_records(records, sync_run, model)
heartbeat(activity)
sync_run.update(current_offset:, total_query_rows:)
sync_run.sync.update(current_cursor_field: last_cursor_field_value)
end
# change state querying to queued
sync_run.queue!
Expand Down
46 changes: 44 additions & 2 deletions server/lib/reverse_etl/utils/batch_query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,68 @@ class BatchQuery
# Reads the source in batches, yielding each batch together with the next
# offset and the last cursor value seen, until the source returns no rows.
#
# params - Hash with :client, :sync_config, :batch_size, :offset.
# Yields (records, current_offset, last_cursor_field_value) per non-empty batch.
# Raises ArgumentError when :batch_size is not positive.
def self.execute_in_batches(params)
  raise ArgumentError, "Batch size must be greater than 0" if params[:batch_size] <= 0

  # Keep the original sync config: the cursor query must always be built from
  # the unmodified model query, not from a previously rewritten one.
  initial_sync_config = params[:sync_config]
  current_offset = params[:offset]
  # Resume from the cursor value persisted on the sync (nil on the first run).
  last_cursor_field_value = initial_sync_config.current_cursor_field
  loop do
    # Set the current limit and offset in the sync configuration
    params[:sync_config].limit = params[:batch_size]
    params[:sync_config].offset = current_offset

    # When a cursor field is configured, rewrite the model query so the source
    # only returns rows at or after the last seen cursor value.
    if initial_sync_config.cursor_field
      query_with_cursor = CursorQueryBuilder.build_cursor_query(initial_sync_config, last_cursor_field_value)
      params[:sync_config] = build_cursor_sync_config(params[:sync_config], query_with_cursor)
    end

    # Execute the batch query
    result = params[:client].read(params[:sync_config])
    # Extract the value of the cursor_field column from the last record
    last_cursor_field_value = extract_last_cursor_field_value(result, params[:sync_config])

    # Increment the offset by the batch size for the next iteration
    current_offset += params[:batch_size]

    break if result.empty?

    # Yield exactly once per batch: the diff residue kept both the old 2-arg
    # and new 3-arg yield, which would have processed every batch twice.
    yield result, current_offset, last_cursor_field_value if block_given?
    # Break the loop if the number of records fetched is less than the batch size
    # break if result.size < params[:batch_size]
  end
end

# Returns the cursor column value from the final record of a batch, or nil
# when no cursor field is configured or the batch is empty.
def self.extract_last_cursor_field_value(result, sync_config)
  cursor = sync_config.cursor_field
  return nil unless cursor
  return nil if result.empty?

  result.last.record.data[cursor]
end

# Returns a fresh SyncConfig whose model runs +new_query+ (the cursor-filtered
# query) while every other attribute is copied from +sync_config+. Offset is
# reset to 0 because, with a cursor condition, paging no longer relies on it.
def self.build_cursor_sync_config(sync_config, new_query)
  cursor_model = build_new_model(sync_config.model, new_query)

  Multiwoven::Integrations::Protocol::SyncConfig.new(
    model: cursor_model.to_protocol,
    source: sync_config.source,
    destination: sync_config.destination,
    stream: sync_config.stream,
    sync_mode: sync_config.sync_mode,
    destination_sync_mode: sync_config.destination_sync_mode,
    cursor_field: sync_config.cursor_field,
    current_cursor_field: sync_config.current_cursor_field
  ).tap do |config|
    config.offset = 0
    config.limit = sync_config.limit
  end
end

# Clones +existing_model+ as an unsaved Model whose query is replaced with
# +new_query+ (the cursor-filtered query); other attributes are carried over.
def self.build_new_model(existing_model, new_query)
  attributes = {
    name: existing_model.name,
    query: new_query,
    query_type: existing_model.query_type,
    primary_key: existing_model.primary_key
  }
  Model.new(**attributes)
end
end
end
end
28 changes: 28 additions & 0 deletions server/lib/reverse_etl/utils/cursor_query_builder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# frozen_string_literal: true

module ReverseEtl
  module Utils
    # Builds the query used for cursor-based incremental reads: wraps the
    # model's base query with a cursor filter and a deterministic ordering.
    class CursorQueryBuilder
      # sync_config          - exposes #model (with #query), #source (#query_type)
      #                        and #cursor_field.
      # current_cursor_field - last seen cursor value (String) or nil on first run.
      #
      # Returns the rewritten query String, or nil when no cursor_field is set.
      # Fixed here: review-UI residue lines that had been pasted into the body
      # (breaking the syntax) were removed.
      def self.build_cursor_query(sync_config, current_cursor_field)
        existing_query = sync_config.model.query
        query_type = sync_config.source.query_type || "raw_sql"
        if current_cursor_field
          # SOQL date/datetime literals are unquoted; raw SQL values are quoted.
          # NOTE(review): the cursor value is interpolated directly into the
          # query string — confirm upstream that it cannot carry untrusted
          # input (SQL-injection risk); prefer parameterized queries if possible.
          cursor_condition = case query_type.to_sym
                             when :soql
                               "#{sync_config.cursor_field} >= #{current_cursor_field}"
                             when :raw_sql
                               "#{sync_config.cursor_field} >= '#{current_cursor_field}'"
                             end
        end
        # ">=" keeps the boundary row, so the record matching the saved cursor
        # can be fetched again; duplicates are deduplicated downstream.
        if cursor_condition
          "#{existing_query} AS subquery " \
            "WHERE #{cursor_condition} " \
            "ORDER BY #{sync_config.cursor_field} ASC"
        elsif sync_config.cursor_field
          "#{existing_query} AS subquery " \
            "ORDER BY #{sync_config.cursor_field} ASC"
        end
      end
    end
  end
end
16 changes: 11 additions & 5 deletions server/spec/interactors/syncs/create_sync_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

RSpec.describe Syncs::CreateSync do
let(:workspace) { create(:workspace) }
let(:source) { create(:connector, workspace:) }
let(:source) { create(:connector, workspace:, connector_type: "source") }
let(:destination) { create(:connector, workspace:) }
let(:model) { create(:model, workspace:, connector: source) }
let(:sync) { build(:sync, workspace:, source:, destination:, model:) }
let(:sync) do
build(:sync, workspace:, source:, destination:, model:, cursor_field: "timestamp",
current_cursor_field: "2022-01-01")
end

before do
create(:catalog, connector: source)
Expand All @@ -18,24 +21,27 @@
it "creates a sync" do
result = described_class.call(
workspace:,
sync_params: sync.attributes.except("id", "created_at", "updated_at")
sync_params: sync.attributes.except("id", "created_at", "updated_at").with_indifferent_access
)
expect(result.success?).to eq(true)
expect(result.sync.persisted?).to eql(true)
expect(result.sync.source_id).to eql(source.id)
expect(result.sync.destination_id).to eql(destination.id)
expect(result.sync.model_id).to eql(model.id)
expect(result.sync.cursor_field).to eql(sync.cursor_field)
expect(result.sync.current_cursor_field).to eql(sync.current_cursor_field)
end
end

context "with invalid params" do
let(:sync_params) do
{ source_id: nil }
sync.attributes.except("id", "created_at", "destination_id")
end

it "fails to create sync" do
result = described_class.call(workspace:, sync_params:)
result = described_class.call(workspace:, sync_params: sync_params.with_indifferent_access)
expect(result.failure?).to eq(true)
afthabvp marked this conversation as resolved.
Show resolved Hide resolved
expect(result.sync.persisted?).to eql(false)
end
end
end
Loading
Loading