feat: add cursor based incremental refresh #40

Merged
merged 12 commits into from
Apr 16, 2024


afthabvp
Collaborator

No description provided.

server/app/interactors/syncs/create_sync.rb (outdated)
@@ -5,6 +5,9 @@ class CreateSync
include Interactor

def call
connector = context.workspace.connectors.find_by(id: context.sync_params[:source_id])
default_cursor_field = connector&.default_cursor_field(context.sync_params[:stream_name])
Collaborator

How are we identifying the default cursor field from a dynamic catalog?

Collaborator Author

For a dynamic catalog, all scenarios involve user-defined cursors.
Only when a static stream is present can we define source_defined and its value in default_cursor_field.
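
A minimal sketch of the lookup described in this reply, assuming a hypothetical catalog shape in which each stream is a hash with `name`, `source_defined_cursor`, and `default_cursor_field` keys. The key names and the standalone method signature are assumptions for illustration; the real `Connector#default_cursor_field` in this PR may be structured differently.

```ruby
# Hypothetical static catalog; the key names are assumptions, not the
# project's confirmed schema.
STATIC_CATALOG = {
  streams: [
    { name: "orders", source_defined_cursor: true, default_cursor_field: ["updated_at"] },
    { name: "events", source_defined_cursor: false }
  ]
}.freeze

# Returns the source-defined cursor field for a static stream, or nil when
# the stream is unknown or leaves the cursor choice to the user (as is
# always the case for a dynamic catalog).
def default_cursor_field(catalog, stream_name)
  stream = catalog[:streams].find { |s| s[:name] == stream_name }
  return nil unless stream && stream[:source_defined_cursor]

  Array(stream[:default_cursor_field]).first
end
```

With this shape, a stream that does not declare a source-defined cursor simply yields `nil`, so the caller can fall back to whatever cursor field the user supplied.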

server/app/models/connector.rb (outdated)
@@ -17,11 +17,14 @@ def read(sync_run_id, activity)
batch_query_params = batch_params(source_client, sync_run)
model = sync_run.sync.model

- ReverseEtl::Utils::BatchQuery.execute_in_batches(batch_query_params) do |records, current_offset|
+ ReverseEtl::Utils::BatchQuery.execute_in_batches(batch_query_params) do |records,
+ current_offset, last_cursor_field_value|
Collaborator

Why do we need both current_offset and last_cursor_field_value?

Collaborator Author

If the cursor_field is present, we don't need to consider the current offset. However, if we want to remove duplicates, we can still consider the offset.
airbytehq/airbyte#14732

Collaborator Author

As of now, current_offset is not used for the cursor field.
If I pass both current_offset and last_cursor_field_value in the same field, it will be confusing to update in the database, because current_offset is used in the sync run and last_cursor_field_value in the sync.
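
To illustrate why the two values are kept apart, here is a rough sketch using plain Structs in place of the real models. `Sync`, `SyncRun`, and `record_batch_progress` are hypothetical stand-ins written for this example; only the names `current_offset`, `cursor_field`, `current_cursor_field`, and `last_cursor_field_value` come from this PR.

```ruby
# Stand-ins for the ActiveRecord models; attribute placement follows the
# comment above (offset on the sync run, cursor value on the sync).
Sync = Struct.new(:cursor_field, :current_cursor_field, keyword_init: true)
SyncRun = Struct.new(:sync, :current_offset, keyword_init: true)

# Hypothetical helper: records progress after one batch.
def record_batch_progress(sync_run, current_offset, last_cursor_field_value)
  # The offset only describes progress within this particular run.
  sync_run.current_offset = current_offset
  return unless sync_run.sync.cursor_field && last_cursor_field_value

  # The cursor value outlives the run, so the next run can resume from it.
  sync_run.sync.current_cursor_field = last_cursor_field_value
end
```

A new run would start again from offset 0 but still see the cursor value stored on the sync, which is the separation the comment is arguing for.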

Contributor

How is duplication associated with current_offset or last_cursor_field_value? Won't it be handled by the fingerprint?

Collaborator Author

We're already dealing with duplicates in the sync records using fingerprints.
The concern about duplicates mentioned here is related to fetching data from the source. Because we're using >= in the query, it shouldn't be a problem. Nonetheless, there's still a possibility of fetching duplicate data directly from the source.
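
The interaction between `>=` and fingerprints can be sketched as follows. This is an in-memory illustration under stated assumptions: the rows, `fetch_since`, `fingerprint`, and `incremental_run` are all hypothetical names invented for the example, and SHA-256 is used only as a convenient stand-in for whatever fingerprinting the project applies to sync records.

```ruby
require "digest"
require "set"

# Hypothetical source rows; two rows share the boundary cursor value.
ROWS = [
  { id: 1, updated_at: "2024-04-01" },
  { id: 2, updated_at: "2024-04-02" },
  { id: 3, updated_at: "2024-04-02" }
].freeze

# Mimics "WHERE cursor >= last_value ORDER BY cursor ASC".
def fetch_since(rows, cursor)
  selected = cursor ? rows.select { |r| r[:updated_at] >= cursor } : rows
  selected.sort_by { |r| r[:updated_at] }
end

# Stable digest of a row's contents (stand-in for the real fingerprint).
def fingerprint(row)
  Digest::SHA256.hexdigest(row.sort_by { |k, _| k.to_s }.to_s)
end

# One incremental run: fetch from the cursor, drop rows already seen,
# and return the fresh rows plus the cursor value for the next run.
def incremental_run(rows, cursor, seen)
  fetched = fetch_since(rows, cursor)
  fresh = fetched.reject { |r| seen.include?(fingerprint(r)) }
  fresh.each { |r| seen << fingerprint(r) }
  next_cursor = fetched.empty? ? cursor : fetched.last[:updated_at]
  [fresh, next_cursor]
end
```

In this sketch the second run re-fetches the rows at the boundary value because of `>=`, and the fingerprint check filters them out again. Using `>` instead would avoid the re-fetch but could miss rows written later with the same cursor value.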

@afthabvp marked this pull request as ready for review on April 15, 2024 11:07
@afthabvp changed the title from "feat: add cursor based incremental refresh" to "feat: add cursor based incremental refresh [Do Not Merge]" on Apr 15, 2024
"WHERE #{sync_config.cursor_field} >= '#{sync_config.current_cursor_field}' " \
"ORDER BY #{sync_config.cursor_field} ASC"
elsif sync_config.cursor_field
new_query = "(#{existing_query}) AS subquery " \
Contributor

Why do we need AS subquery? Can't we just use existing_query?

Collaborator Author

In some cases, adding conditions directly to the existing query without using a subquery could lead to ambiguity or unexpected behavior if the original query already contains conditions or clauses. Wrapping it in a subquery helps to isolate and clarify the scope of each condition.
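
A small sketch of the difference, assuming a hypothetical helper pair (`wrap_with_cursor` and `append_naively` are invented for this example and are not the project's CursorQueryBuilder API). It also assumes the wrapped query is prefixed with `SELECT * FROM`, which the diff fragment does not show.

```ruby
# Wraps the user's query in a subquery so the cursor filter and ordering
# apply to its result, whatever clauses the query already contains.
# NOTE: values are interpolated directly, as in the diff under review;
# this is only safe for trusted input.
def wrap_with_cursor(existing_query, cursor_field, current_cursor = nil)
  base = "SELECT * FROM (#{existing_query}) AS subquery"
  filter = current_cursor ? " WHERE #{cursor_field} >= '#{current_cursor}'" : ""
  "#{base}#{filter} ORDER BY #{cursor_field} ASC"
end

# Naive alternative: appends the filter to the raw query text.
def append_naively(existing_query, cursor_field, current_cursor)
  "#{existing_query} WHERE #{cursor_field} >= '#{current_cursor}'"
end

existing = "SELECT id, updated_at FROM orders WHERE status = 'paid'"
puts wrap_with_cursor(existing, "updated_at", "2024-04-01")
puts append_naively(existing, "updated_at", "2024-04-01")
```

The naive version produces a statement with two `WHERE` clauses at the same level, which is not valid SQL, while the wrapped version keeps the original `WHERE` inside the subquery and adds the cursor condition outside it.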

@afthabvp changed the title from "feat: add cursor based incremental refresh [Do Not Merge]" to "feat: add cursor based incremental refresh" on Apr 16, 2024
Contributor

@karthik-sivadas left a comment

Added a small nitpick; otherwise it LGTM.

@subintp subintp merged commit 0fa80c5 into main Apr 16, 2024
1 check passed
macintushar pushed a commit that referenced this pull request Apr 20, 2024
* feat: add cursor based incremental refresh

* feat: spec for CursorQueryBuilder

* fix: resolve pr comments

* fix: spec failure fixes

* fix: spec for default_cursor_field

* fix: soql cursor based query

* fix: test failures

* fix: pr comments resolved

* chore: update integrations version to 0.1.58

* fix: nitpick fix