Feat/1331 disables deduplication for incremental #1892

Open
wants to merge 9 commits into devel
2 changes: 1 addition & 1 deletion dlt/extract/hints.py
@@ -242,7 +242,7 @@ def apply_hints(
             parent_table_name (str, optional): A name of parent table if foreign relation is defined. Please note that if you use merge, you must define `root_key` columns explicitly
             incremental (Incremental, optional): Enables the incremental loading for a resource.

-                Please note that for efficient incremental loading, the resource must be aware of the Incremental by accepting it as one if its arguments and then using are to skip already loaded data.
+                Please note that for efficient incremental loading, the resource must be aware of the Incremental by accepting it as one if its arguments and then using it to skip already loaded data.
                 In non-aware resources, `dlt` will filter out the loaded values, however, the resource will yield all the values again.

         Returns: self for chaining
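For context, a minimal sketch of the "incremental-aware" resource the docstring above describes, i.e. one that accepts the Incremental wrapper and uses it to skip already loaded data. The sample rows and names are illustrative and not part of this PR:

import dlt

# Illustrative stand-in for an API or source table; not part of the PR.
ROWS = [
    {"id": 1, "created_at": "2024-01-01"},
    {"id": 2, "created_at": "2024-01-02"},
]


@dlt.resource(primary_key="id")
def events(created_at=dlt.sources.incremental("created_at", initial_value="2024-01-01")):
    # Incremental-aware: consult the wrapper's last_value to yield only new rows,
    # instead of yielding everything and letting dlt filter afterwards.
    yield from (row for row in ROWS if row["created_at"] >= created_at.last_value)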
9 changes: 8 additions & 1 deletion dlt/extract/resource.py
@@ -475,11 +475,18 @@ def _set_hints(
         if incremental:
             primary_key = table_schema_template.get("primary_key", incremental.primary_key)
             if primary_key is not None:
-                incremental.primary_key = primary_key
+                if table_schema_template.get("write_disposition") == "merge":
+                    self._disable_deduplication(incremental)
+                else:
+                    incremental.primary_key = primary_key

         if table_schema_template.get("validator") is not None:
             self.validator = table_schema_template["validator"]

+    def _disable_deduplication(self, incremental: IncrementalResourceWrapper) -> None:
+        """See IncrementalTransform.deduplication_disabled()"""
+        incremental.primary_key = ()
+
     def bind(self: TDltResourceImpl, *args: Any, **kwargs: Any) -> TDltResourceImpl:
         """Binds the parametrized resource to passed arguments. Modifies resource pipe in place. Does not evaluate generators or iterators."""
         if self._args_bound:
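The new _disable_deduplication helper relies on an existing convention in the incremental transform: an empty primary key tuple means "do not deduplicate". A simplified, paraphrased sketch of the check the docstring points to (not the verbatim dlt source):

class IncrementalTransform:  # simplified stand-in for dlt's internal transform
    primary_key = None

    @property
    def deduplication_disabled(self) -> bool:
        # () is the "dedup off" sentinel that _disable_deduplication() sets above,
        # while None means "fall back to the resource's primary_key hint".
        return isinstance(self.primary_key, (list, tuple)) and len(self.primary_key) == 0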
146 changes: 146 additions & 0 deletions tests/extract/test_incremental.py
@@ -3539,3 +3539,149 @@ def test_apply_lag() -> None:
     assert apply_lag(2, 0, 1, max) == 0
     assert apply_lag(1, 2, 1, min) == 2
     assert apply_lag(2, 2, 1, min) == 2
+
+
+@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
+def test_no_deduplication_on_write_disposition_merge(
+    item_type: TestDataItemFormat,
+) -> None:
+    if item_type in ["pandas", "arrow-table", "arrow-batch"]:
+        os.environ["NORMALIZE__PARQUET_NORMALIZER__ADD_DLT_LOAD_ID"] = "True"
+
+    @dlt.resource(write_disposition="merge", merge_key="id", primary_key="id")
+    def some_data(
+        created_at=dlt.sources.incremental("created_at"),
+    ):
+        yield data_to_item_format(
+            item_type,
+            [
+                {"id": 1, "created_at": 1},
+                {"id": 2, "created_at": 2},
+            ],
+        )
+
+    p = dlt.pipeline(pipeline_name=uniq_id())
+    load_info = p.run(some_data(), destination="duckdb")
+    assert len(load_info.loads_ids) == 1
+    load_id_1 = load_info.loads_ids[0]
+
+    load_info_2 = p.run(some_data(), destination="duckdb")
+    load_id_2 = load_info_2.loads_ids[0]
+    assert load_id_1 != load_id_2
+
+    with p.sql_client() as client:
+        assert_query_data(p, "select id from some_data order by id", [1, 2])
+
+        # no duplicates
+        assert_query_data(p, "select id from some_data order by id", [1, 2])
+
+        load_ids = client.execute_sql("select _dlt_load_id from some_data where id = 2")
+        assert len(load_ids) == 1, "Expected row with id = 2 to be loaded only once"
+        load_id_of_duplicate_row = load_ids[0][0]
+
+        # row with id=2 got updated – even though it has the same merge_key as the first row with id=2
+        assert load_id_1 != load_id_of_duplicate_row, "Expected row with id = 2 to get updated"
+
+        # row with id=1 is not updated
+        assert_query_data(p, "select _dlt_load_id from some_data where id = 1", [load_id_1])
+
+    s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][
+        "created_at"
+    ]
+    assert s["last_value"] == 2
+
+
+@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
+def test_no_deduplication_on_write_disposition_merge_late_binding(
+    item_type: TestDataItemFormat,
+) -> None:
+    if item_type in ["pandas", "arrow-table", "arrow-batch"]:
+        os.environ["NORMALIZE__PARQUET_NORMALIZER__ADD_DLT_LOAD_ID"] = "True"
+
+    @dlt.resource(write_disposition="merge", merge_key="id", primary_key="id")
+    def some_data():
+        yield data_to_item_format(
+            item_type,
+            [
+                {"id": 1, "created_at": 1},
+                {"id": 2, "created_at": 2},
+            ],
+        )
+
+    some_data.apply_hints(incremental=dlt.sources.incremental("created_at"))
+    p = dlt.pipeline(pipeline_name=uniq_id())
+    load_info = p.run(some_data(), destination="duckdb")
+    assert len(load_info.loads_ids) == 1
+    load_id_1 = load_info.loads_ids[0]
+
+    load_info_2 = p.run(some_data(), destination="duckdb")
+    load_id_2 = load_info_2.loads_ids[0]
+    assert load_id_1 != load_id_2
+
+    with p.sql_client() as client:
+        assert_query_data(p, "select id from some_data order by id", [1, 2])
+
+        # no duplicates
+        assert_query_data(p, "select id from some_data order by id", [1, 2])
+
+        load_ids = client.execute_sql("select _dlt_load_id from some_data where id = 2")
+        assert len(load_ids) == 1, "Expected row with id = 2 to be loaded only once"
+        load_id_of_duplicate_row = load_ids[0][0]
+
+        # row with id=2 got updated – even though it has the same merge_key as the first row with id=2
+        assert load_id_1 != load_id_of_duplicate_row, "Expected row with id = 2 to get updated"
+
+        # row with id=1 is not updated
+        assert_query_data(p, "select _dlt_load_id from some_data where id = 1", [load_id_1])
+
+    s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][
+        "created_at"
+    ]
+    assert s["last_value"] == 2
+
+
+@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
+def test_deduplication_on_write_disposition_not_merge(
+    item_type: TestDataItemFormat,
+) -> None:
+    if item_type in ["pandas", "arrow-table", "arrow-batch"]:
+        os.environ["NORMALIZE__PARQUET_NORMALIZER__ADD_DLT_LOAD_ID"] = "True"
+
+    @dlt.resource(write_disposition="replace", merge_key="id", primary_key="id")
+    def some_data(
+        created_at=dlt.sources.incremental("created_at"),
+    ):
+        yield data_to_item_format(
+            item_type,
+            [
+                {"id": 1, "created_at": 1},
+                {"id": 2, "created_at": 2},
+            ],
+        )
+
+    p = dlt.pipeline(pipeline_name=uniq_id())
+    load_info = p.run(some_data(), destination="duckdb")
+    assert len(load_info.loads_ids) == 1
+    load_id_1 = load_info.loads_ids[0]
+
+    load_info_2 = p.run(some_data(), destination="duckdb")
+    load_id_2 = load_info_2.loads_ids[0]
+    assert load_id_1 != load_id_2
+
+    with p.sql_client() as client:
+        assert_query_data(p, "select id from some_data order by id", [1, 2])
+
+        # no duplicates
+        assert_query_data(p, "select id from some_data order by id", [1, 2])
+
+        load_ids = client.execute_sql("select _dlt_load_id from some_data")
+        assert len(set(load_ids)) == 1, "Expected only a single load in the destination"
+        assert load_id_2 == load_ids[0][0]
+
+        # row with id=2 got updated – even though it has the same merge_key as the first row with id=2
+        assert load_id_1 != load_ids[0][0], "Expected only the second load in the destination"
+
+    s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][
+        "created_at"
+    ]
+    assert s["last_value"] == 2
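Taken together, the tests pin down the new default: before this change a merge resource had to opt out of incremental deduplication by hand, typically by giving the incremental itself an empty key, while with this PR the override happens automatically in _set_hints. A hedged before/after sketch (resource names are illustrative, not part of this PR):

import dlt

# Previously: disable incremental deduplication manually by passing an
# empty primary key to the incremental.
@dlt.resource(write_disposition="merge", primary_key="id")
def orders_manual(updated_at=dlt.sources.incremental("updated_at", primary_key=())):
    yield {"id": 1, "updated_at": 1}


# With this PR: a merge resource gets deduplication disabled automatically,
# so the plain incremental is enough and the merge handles duplicates downstream.
@dlt.resource(write_disposition="merge", primary_key="id")
def orders_automatic(updated_at=dlt.sources.incremental("updated_at")):
    yield {"id": 1, "updated_at": 1}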