Skip to content

Commit

Permalink
fix: cursor query builder skip for empty condition
Browse files Browse the repository at this point in the history
  • Loading branch information
afthabvp committed May 3, 2024
1 parent 47ec952 commit a26b10a
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"documentation_url": "https://docs.multiwoven.com/integrations/destination/klaviyo",
"documentation_url": "https://docs.multiwoven.com/destinations/file-storage/sftp",
"stream_type": "static",
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
Expand Down
2 changes: 1 addition & 1 deletion server/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ gem "interactor", "~> 3.0"

gem "ruby-odbc", git: "https://github.com/Multiwoven/ruby-odbc.git"

gem "multiwoven-integrations", "~> 0.1.63"
gem "multiwoven-integrations", "~> 0.1.64"

gem "temporal-ruby", github: "coinbase/temporal-ruby"

Expand Down
18 changes: 11 additions & 7 deletions server/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1657,9 +1657,9 @@ GEM
activesupport
concurrent-ruby (1.2.3)
connection_pool (2.4.1)
console (1.24.0)
console (1.25.1)
fiber-annotation
fiber-local
fiber-local (~> 1.1)
json
crass (1.0.6)
css_parser (1.17.1)
Expand Down Expand Up @@ -1750,7 +1750,9 @@ GEM
net-http
ffi (1.16.3)
fiber-annotation (0.2.0)
fiber-local (1.0.0)
fiber-local (1.1.0)
fiber-storage
fiber-storage (0.1.0)
fugit (1.10.1)
et-orbi (~> 1, >= 1.2.7)
raabro (~> 1.4)
Expand Down Expand Up @@ -1867,7 +1869,7 @@ GEM
msgpack (1.7.2)
multi_json (1.15.0)
multipart-post (2.4.0)
multiwoven-integrations (0.1.63)
multiwoven-integrations (0.1.64)
activesupport
async-websocket
csv
Expand All @@ -1884,6 +1886,7 @@ GEM
restforce
ruby-limiter
ruby-odbc
rubyzip
sequel
slack-ruby-client
stripe
Expand Down Expand Up @@ -2053,7 +2056,8 @@ GEM
ruby-limiter (2.3.0)
ruby-progressbar (1.13.0)
ruby2_keywords (0.0.5)
sequel (5.79.0)
rubyzip (2.3.2)
sequel (5.80.0)
bigdecimal
shoulda-matchers (5.3.0)
activesupport (>= 5.2.0)
Expand All @@ -2077,7 +2081,7 @@ GEM
gli
hashie
stringio (3.1.0)
stripe (11.2.0)
stripe (11.3.0)
thor (1.3.0)
timecop (0.9.8)
timeout (0.4.1)
Expand Down Expand Up @@ -2137,7 +2141,7 @@ DEPENDENCIES
jwt
kaminari
liquid
multiwoven-integrations (~> 0.1.63)
multiwoven-integrations (~> 0.1.64)
newrelic_rpm
parallel
pg (~> 1.1)
Expand Down
3 changes: 1 addition & 2 deletions server/lib/reverse_etl/utils/batch_query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ def self.execute_in_batches(params)
# Set the current limit and offset in the sync configuration
params[:sync_config].limit = params[:batch_size]
params[:sync_config].offset = current_offset

if initial_sync_config.cursor_field
if initial_sync_config.cursor_field.present?
query_with_cursor = CursorQueryBuilder.build_cursor_query(initial_sync_config, last_cursor_field_value)
params[:sync_config] = build_cursor_sync_config(params[:sync_config], query_with_cursor)
end
Expand Down
36 changes: 36 additions & 0 deletions server/spec/lib/reverse_etl/utils/batch_query_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,42 @@ module Utils # rubocop:disable Metrics/ModuleLength
expect(results.last.size).to eq(100)
end
end
context "when both cursor_field is empty" do
let(:existing_query) { "SELECT * FROM table" }
let(:source) { create(:connector, connector_type: "source", connector_name: "Snowflake") }
let(:destination) { create(:connector, connector_type: "destination") }
let!(:catalog) { create(:catalog, connector: destination) }
let(:model) { create(:model, connector: source, query: existing_query) }
let(:sync) do
create(:sync, model:, source:, destination:, cursor_field: "")
end
let(:record) do
Multiwoven::Integrations::Protocol::RecordMessage.new(data: { "id" => 1, "email" => "[email protected]",
"first_name" => "John", "Last Name" => "Doe" },
emitted_at: DateTime.now.to_i).to_multiwoven_message
end
let(:record1) do
Multiwoven::Integrations::Protocol::RecordMessage.new(data: { "id" => 1, "email" => "[email protected]",
"first_name" => "John", "Last Name" => "Doe" },
emitted_at: DateTime.now.to_i).to_multiwoven_message
end

it "executes batches and not call CursorQueryBuilder for cursor_field is empty" do
params = {
offset: 0,
limit: 100,
batch_size: 100,
sync_config: sync.to_protocol,
client:
}
allow(client).to receive(:read).and_return(*Array.new(1, [record]), [])
expect(CursorQueryBuilder).not_to receive(:build_cursor_query).with(sync.to_protocol, "")
results = []
BatchQuery.execute_in_batches(params) do |result, _current_offset, _last_cursor_field_value|
results << result
end
end
end
context "when both cursor_field is present" do
let(:existing_query) { "SELECT * FROM table" }
let(:source) { create(:connector, connector_type: "source", connector_name: "Snowflake") }
Expand Down

0 comments on commit a26b10a

Please sign in to comment.