feat: add cursor based incremental refresh (#40)
* feat: add cursor based incremental refresh

* feat: spec for CursorQueryBuilder

* fix: resolve pr comments

* fix: spec failure fixes

* fix: spec for default_cursor_field

* fix: soql cursor based query

* fix: test failures

* fix: pr comments resolved

* chore: update integrations version to 0.1.58

* fix: nitpick fix
afthabvp authored Apr 16, 2024
1 parent b916668 commit 0fa80c5
Showing 25 changed files with 468 additions and 57 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,2 +1,3 @@
# Ignore dotenv file
.env
.env
/.history
2 changes: 1 addition & 1 deletion server/Gemfile
@@ -12,7 +12,7 @@ gem "interactor", "~> 3.0"

gem "ruby-odbc", git: "https://github.com/Multiwoven/ruby-odbc.git"

gem "multiwoven-integrations", "~> 0.1.55"
gem "multiwoven-integrations", "~> 0.1.58"

gem "temporal-ruby", github: "coinbase/temporal-ruby"

10 changes: 5 additions & 5 deletions server/Gemfile.lock
@@ -106,7 +106,7 @@ GEM
activerecord (>= 3.2, < 8.0)
rake (>= 10.4, < 14.0)
ast (2.4.2)
async (2.10.1)
async (2.10.2)
console (~> 1.10)
fiber-annotation
io-event (~> 1.5, >= 1.5.1)
@@ -1862,7 +1862,7 @@ GEM
msgpack (1.7.2)
multi_json (1.15.0)
multipart-post (2.4.0)
multiwoven-integrations (0.1.55)
multiwoven-integrations (0.1.58)
activesupport
async-websocket
csv
@@ -1925,7 +1925,7 @@ GEM
net-smtp
premailer (~> 1.7, >= 1.7.9)
protocol-hpack (1.4.3)
protocol-http (0.26.2)
protocol-http (0.26.4)
protocol-http1 (0.19.0)
protocol-http (~> 0.22)
protocol-http2 (0.16.0)
@@ -2071,7 +2071,7 @@ GEM
gli
hashie
stringio (3.1.0)
stripe (10.14.0)
stripe (11.0.0)
thor (1.3.0)
timecop (0.9.8)
timeout (0.4.1)
@@ -2130,7 +2130,7 @@ DEPENDENCIES
jwt
kaminari
liquid
multiwoven-integrations (~> 0.1.55)
multiwoven-integrations (~> 0.1.58)
newrelic_rpm
parallel
pg (~> 1.1)
1 change: 1 addition & 0 deletions server/app/contracts/sync_contracts.rb
@@ -24,6 +24,7 @@ class Create < Dry::Validation::Contract
required(:sync_interval_unit).filled(:string)
required(:sync_mode).filled(:string)
required(:stream_name).filled(:string)
optional(:cursor_field).maybe(:string)

# update filled with validating array of hashes
required(:configuration).filled
5 changes: 4 additions & 1 deletion server/app/controllers/api/v1/models_controller.rb
@@ -72,8 +72,11 @@ def set_model
end

def validate_query
query = params.dig(:model, :query)
return if query.blank?

query_type = @model.present? ? @model.connector.connector_query_type : @connector.connector_query_type
Utils::QueryValidator.validate_query(query_type, params.dig(:model, :query))
Utils::QueryValidator.validate_query(query_type, query)
rescue StandardError => e
render_error(
message: "Query validation failed: #{e.message}",
3 changes: 2 additions & 1 deletion server/app/controllers/api/v1/syncs_controller.rb
@@ -93,6 +93,7 @@ def sync_params
:sync_mode,
:sync_interval_unit,
:stream_name,
:cursor_field,
configuration: %i[from
to
mapping_type
@@ -104,7 +105,7 @@
if params.to_unsafe_h[:sync][:configuration].is_a?(Hash)
strong_params.merge!(configuration: params.to_unsafe_h[:sync][:configuration])
end

strong_params.delete(:cursor_field) if action_name == "update"
strong_params
end
end
4 changes: 4 additions & 0 deletions server/app/interactors/syncs/create_sync.rb
@@ -5,6 +5,10 @@ class CreateSync
include Interactor

def call
source = context.workspace.connectors.find_by(id: context.sync_params[:source_id])

default_cursor_field = source.catalog&.default_cursor_field(context.sync_params[:stream_name])
context.sync_params[:cursor_field] = default_cursor_field if default_cursor_field.present?
sync = context
.workspace.syncs
.create(context.sync_params)
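In other words, sync creation now consults the source's catalog before persisting the record. A brief sketch of the flow with hypothetical values (the stream name and cursor field are illustrative, not taken from the diff):

```ruby
# Hypothetical sync_params as they arrive from the API layer (indifferent access).
sync_params = { source_id: source.id, stream_name: "contacts", sync_mode: "incremental" }

default_cursor_field = source.catalog&.default_cursor_field(sync_params[:stream_name])
# => e.g. "updated_at" when the catalog defines a source-side cursor for "contacts",
#    nil otherwise (in which case any client-supplied cursor_field is kept as-is).
sync_params[:cursor_field] = default_cursor_field if default_cursor_field.present?
```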
5 changes: 5 additions & 0 deletions server/app/models/catalog.rb
@@ -50,4 +50,9 @@ def stream_to_protocol(stream)
request_rate_concurrency:
)
end

def default_cursor_field(stream_name)
current_stream = catalog["streams"].find { |stream| stream["name"] == stream_name }
current_stream["default_cursor_field"] if current_stream && catalog["source_defined_cursor"]
end
end
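As a reference point, here is a minimal sketch of the catalog JSON shape that `default_cursor_field` assumes. The key names come from the method above; the stream names and values are hypothetical:

```ruby
# Hypothetical catalog payload stored on Catalog#catalog; only the keys
# read by default_cursor_field are shown.
catalog_json = {
  "source_defined_cursor" => true,
  "streams" => [
    { "name" => "contacts", "default_cursor_field" => "updated_at" },
    { "name" => "orders",   "default_cursor_field" => "created_at" }
  ]
}

current_stream = catalog_json["streams"].find { |stream| stream["name"] == "contacts" }
current_stream["default_cursor_field"] if current_stream && catalog_json["source_defined_cursor"]
# => "updated_at"; nil when the stream is missing or source_defined_cursor is false
```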
4 changes: 2 additions & 2 deletions server/app/models/connector.rb
@@ -66,7 +66,8 @@ def to_protocol
Multiwoven::Integrations::Protocol::Connector.new(
name: connector_name,
type: connector_type,
connection_specification: configuration
connection_specification: configuration,
query_type: connector_query_type
)
end

@@ -83,7 +84,6 @@ def connector_query_type
connector_type.to_s.camelize, connector_name.to_s.camelize
).new
connector_spec = client.connector_spec

connector_spec&.connector_query_type || "raw_sql"
end
end
6 changes: 5 additions & 1 deletion server/app/models/sync.rb
@@ -13,6 +13,8 @@
# source_catalog_id :integer
# schedule_type :string
# status :integer
# cursor_field :string
# current_cursor_field :string
# created_at :datetime not null
# updated_at :datetime not null
#
@@ -82,7 +84,9 @@ def to_protocol
catalog.find_stream_by_name(stream_name)
),
sync_mode: Multiwoven::Integrations::Protocol::SyncMode[sync_mode],
destination_sync_mode: Multiwoven::Integrations::Protocol::DestinationSyncMode["insert"]
destination_sync_mode: Multiwoven::Integrations::Protocol::DestinationSyncMode["insert"],
cursor_field:,
current_cursor_field:
)
end

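With illustrative values, the protocol message built by `Sync#to_protocol` now carries the two cursor attributes alongside the existing fields, roughly:

```ruby
# Illustrative values only; the SyncConfig struct comes from multiwoven-integrations.
sync.to_protocol
# => SyncConfig including, among the existing fields:
#      sync_mode:             "incremental"
#      destination_sync_mode: "insert"
#      cursor_field:          "updated_at"   # chosen at sync creation
#      current_cursor_field:  "2024-04-01"   # advanced after each extracted batch
```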
2 changes: 1 addition & 1 deletion server/app/serializers/sync_serializer.rb
@@ -3,7 +3,7 @@
class SyncSerializer < ActiveModel::Serializer
attributes :id, :source_id, :destination_id, :model_id, :configuration,
:schedule_type, :sync_mode, :sync_interval, :sync_interval_unit,
:stream_name, :status,
:stream_name, :status, :cursor_field, :current_cursor_field,
:updated_at, :created_at

attribute :source do
6 changes: 6 additions & 0 deletions server/db/migrate/…_add_cursor_fields_to_syncs.rb
@@ -0,0 +1,6 @@
class AddCursorFieldsToSyncs < ActiveRecord::Migration[7.1]
def change
add_column :syncs, :cursor_field, :string
add_column :syncs, :current_cursor_field, :string
end
end
5 changes: 3 additions & 2 deletions server/db/schema.rb

Some generated files are not rendered by default.
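The schema.rb diff is not rendered above, but given the migration it should reduce to two new string columns on `syncs`, roughly:

```ruby
# Expected shape of the syncs entry in db/schema.rb after the migration
# (a sketch only; the generated file is not shown in this diff).
create_table "syncs", force: :cascade do |t|
  # ...existing columns...
  t.string "cursor_field"
  t.string "current_cursor_field"
end
```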

4 changes: 2 additions & 2 deletions server/lib/reverse_etl/extractors/base.rb
@@ -4,8 +4,8 @@ module ReverseEtl
module Extractors
class Base
DEFAULT_OFFSET = 0
DEFAULT_BATCH_SIZE = (ENV["SYNC_EXTRACTOR_BATCH_SIZE"] || "10000").to_i
DEFAULT_LIMT = (ENV["SYNC_EXTRACTOR_BATCH_SIZE"] || "10000").to_i
DEFAULT_BATCH_SIZE = (ENV["SYNC_EXTRACTOR_BATCH_SIZE"] || "2000").to_i
DEFAULT_LIMT = (ENV["SYNC_EXTRACTOR_BATCH_SIZE"] || "2000").to_i
THREAD_COUNT = (ENV["SYNC_EXTRACTOR_THREAD_POOL_SIZE"] || "5").to_i

def read(_sync_run_id)
5 changes: 4 additions & 1 deletion server/lib/reverse_etl/extractors/incremental_delta.rb
@@ -17,11 +17,14 @@ def read(sync_run_id, activity)
batch_query_params = batch_params(source_client, sync_run)
model = sync_run.sync.model

ReverseEtl::Utils::BatchQuery.execute_in_batches(batch_query_params) do |records, current_offset|
ReverseEtl::Utils::BatchQuery.execute_in_batches(batch_query_params) do |records,
current_offset, last_cursor_field_value|

total_query_rows += records.count
process_records(records, sync_run, model)
heartbeat(activity)
sync_run.update(current_offset:, total_query_rows:)
sync_run.sync.update(current_cursor_field: last_cursor_field_value)
end
# change state querying to queued
sync_run.queue!
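The extra block argument and the second update are the heart of the incremental behaviour: the offset is the resume point inside the current run, while the cursor value is written back to the `Sync` itself so the next run starts from it. Condensed from the diff above:

```ruby
# Per-batch bookkeeping in IncrementalDelta#read (condensed sketch).
sync_run.update(current_offset:, total_query_rows:)                  # resume point for this run
sync_run.sync.update(current_cursor_field: last_cursor_field_value)  # starting cursor for the next run
```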
46 changes: 44 additions & 2 deletions server/lib/reverse_etl/utils/batch_query.rb
@@ -6,26 +6,68 @@ class BatchQuery
def self.execute_in_batches(params)
raise ArgumentError, "Batch size must be greater than 0" if params[:batch_size] <= 0

initial_sync_config = params[:sync_config]
current_offset = params[:offset]

last_cursor_field_value = params[:sync_config].current_cursor_field
loop do
# Set the current limit and offset in the sync configuration
params[:sync_config].limit = params[:batch_size]
params[:sync_config].offset = current_offset

if initial_sync_config.cursor_field
query_with_cursor = CursorQueryBuilder.build_cursor_query(initial_sync_config, last_cursor_field_value)
params[:sync_config] = build_cursor_sync_config(params[:sync_config], query_with_cursor)
end

# Execute the batch query
result = params[:client].read(params[:sync_config])
# Extract the value of the cursor_field column from the last record
last_cursor_field_value = extract_last_cursor_field_value(result, params[:sync_config])

# Increment the offset by the batch size for the next iteration
current_offset += params[:batch_size]

break if result.empty?

yield result, current_offset if block_given?
yield result, current_offset, last_cursor_field_value if block_given?
# Break the loop if the number of records fetched is less than the batch size
# break if result.size < params[:batch_size]
end
end

def self.extract_last_cursor_field_value(result, sync_config)
return nil unless sync_config.cursor_field && !result.empty?

last_record = result.last.record.data
last_record[sync_config.cursor_field]
end

def self.build_cursor_sync_config(sync_config, new_query)
new_model = build_new_model(sync_config.model, new_query)

modified_sync_config = Multiwoven::Integrations::Protocol::SyncConfig.new(
model: new_model.to_protocol,
source: sync_config.source,
destination: sync_config.destination,
stream: sync_config.stream,
sync_mode: sync_config.sync_mode,
destination_sync_mode: sync_config.destination_sync_mode,
cursor_field: sync_config.cursor_field,
current_cursor_field: sync_config.current_cursor_field
)
modified_sync_config.offset = 0
modified_sync_config.limit = sync_config.limit
modified_sync_config
end

def self.build_new_model(existing_model, new_query)
Model.new(
name: existing_model.name,
query: new_query,
query_type: existing_model.query_type,
primary_key: existing_model.primary_key
)
end
end
end
end
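A usage sketch of the extended batching API, assuming a `sync` record and a connector `source_client` as assembled by the extractors (these names are illustrative, not part of the diff):

```ruby
# Hypothetical caller; the params mirror those the extractors pass in.
params = {
  client: source_client,           # connector client from multiwoven-integrations
  sync_config: sync.to_protocol,   # carries cursor_field / current_cursor_field
  batch_size: 2000,
  offset: 0
}

ReverseEtl::Utils::BatchQuery.execute_in_batches(params) do |records, current_offset, last_cursor_value|
  # records           - batch returned by client.read(sync_config)
  # current_offset    - offset to persist for plain offset-based pagination
  # last_cursor_value - cursor_field value of the last record; nil when no cursor is set
end
```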
28 changes: 28 additions & 0 deletions server/lib/reverse_etl/utils/cursor_query_builder.rb
@@ -0,0 +1,28 @@
# frozen_string_literal: true

module ReverseEtl
module Utils
class CursorQueryBuilder
def self.build_cursor_query(sync_config, current_cursor_field)
existing_query = sync_config.model.query
query_type = sync_config.source.query_type || "raw_sql"
if current_cursor_field
cursor_condition = case query_type.to_sym
when :soql
"#{sync_config.cursor_field} >= #{current_cursor_field}"
when :raw_sql
"#{sync_config.cursor_field} >= '#{current_cursor_field}'"
end
end
if cursor_condition
"#{existing_query} AS subquery " \
"WHERE #{cursor_condition} " \
"ORDER BY #{sync_config.cursor_field} ASC"
elsif sync_config.cursor_field
"#{existing_query} AS subquery " \
"ORDER BY #{sync_config.cursor_field} ASC"
end
end
end
end
end
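Tracing the builder with illustrative inputs shows the strings it produces for a raw SQL source, with and without a previously stored cursor value; a SOQL source differs only in that the cursor value is not quoted. The `sync_config` below is a stand-in struct, not a real protocol object:

```ruby
# Illustrative inputs; sync_config is any object exposing the fields the builder reads
# (model.query, source.query_type, cursor_field).
sync_config = Struct.new(:model, :source, :cursor_field).new(
  Struct.new(:query).new("SELECT id, email, updated_at FROM contacts"),
  Struct.new(:query_type).new("raw_sql"),
  "updated_at"
)

ReverseEtl::Utils::CursorQueryBuilder.build_cursor_query(sync_config, "2024-04-01")
# => "SELECT id, email, updated_at FROM contacts AS subquery
#     WHERE updated_at >= '2024-04-01' ORDER BY updated_at ASC"

ReverseEtl::Utils::CursorQueryBuilder.build_cursor_query(sync_config, nil)
# => "SELECT id, email, updated_at FROM contacts AS subquery ORDER BY updated_at ASC"

# With query_type "soql" the condition becomes: WHERE updated_at >= 2024-04-01
```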
16 changes: 11 additions & 5 deletions server/spec/interactors/syncs/create_sync_spec.rb
@@ -4,10 +4,13 @@

RSpec.describe Syncs::CreateSync do
let(:workspace) { create(:workspace) }
let(:source) { create(:connector, workspace:) }
let(:source) { create(:connector, workspace:, connector_type: "source") }
let(:destination) { create(:connector, workspace:) }
let(:model) { create(:model, workspace:, connector: source) }
let(:sync) { build(:sync, workspace:, source:, destination:, model:) }
let(:sync) do
build(:sync, workspace:, source:, destination:, model:, cursor_field: "timestamp",
current_cursor_field: "2022-01-01")
end

before do
create(:catalog, connector: source)
@@ -18,24 +21,27 @@
it "creates a sync" do
result = described_class.call(
workspace:,
sync_params: sync.attributes.except("id", "created_at", "updated_at")
sync_params: sync.attributes.except("id", "created_at", "updated_at").with_indifferent_access
)
expect(result.success?).to eq(true)
expect(result.sync.persisted?).to eql(true)
expect(result.sync.source_id).to eql(source.id)
expect(result.sync.destination_id).to eql(destination.id)
expect(result.sync.model_id).to eql(model.id)
expect(result.sync.cursor_field).to eql(sync.cursor_field)
expect(result.sync.current_cursor_field).to eql(sync.current_cursor_field)
end
end

context "with invalid params" do
let(:sync_params) do
{ source_id: nil }
sync.attributes.except("id", "created_at", "destination_id")
end

it "fails to create sync" do
result = described_class.call(workspace:, sync_params:)
result = described_class.call(workspace:, sync_params: sync_params.with_indifferent_access)
expect(result.failure?).to eq(true)
expect(result.sync.persisted?).to eql(false)
end
end
end