diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py
index 2599a442db..578d5b2e0c 100644
--- a/tests/extract/test_incremental.py
+++ b/tests/extract/test_incremental.py
@@ -2706,3 +2706,50 @@ def some_data():
         "created_at"
     ]
     assert s["last_value"] == 2
+
+
+@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
+def test_deduplication_on_write_disposition_not_merge(
+    item_type: TestDataItemFormat,
+) -> None:
+    if item_type in ["pandas", "arrow-table", "arrow-batch"]:
+        os.environ["NORMALIZE__PARQUET_NORMALIZER__ADD_DLT_LOAD_ID"] = "True"
+
+    @dlt.resource(write_disposition="replace", merge_key="id", primary_key="id")
+    def some_data(
+        created_at=dlt.sources.incremental("created_at"),
+    ):
+        yield data_to_item_format(
+            item_type,
+            [
+                {"id": 1, "created_at": 1},
+                {"id": 2, "created_at": 2},
+            ],
+        )
+
+    p = dlt.pipeline(pipeline_name=uniq_id())
+    load_info = p.run(some_data(), destination="duckdb")
+    assert len(load_info.loads_ids) == 1
+    load_id_1 = load_info.loads_ids[0]
+
+    load_info_2 = p.run(some_data(), destination="duckdb")
+    load_id_2 = load_info_2.loads_ids[0]
+    assert load_id_1 != load_id_2
+
+    with p.sql_client() as client:
+        assert_query_data(p, "select id from some_data order by id", [1, 2])
+
+        # no duplicates
+        assert_query_data(p, "select id from some_data order by id", [1, 2])
+
+        load_ids = client.execute_sql("select _dlt_load_id from some_data")
+        assert len(set(load_ids)) == 1, "Expected only a single load in the destination"
+        assert load_id_2 == load_ids[0][0]
+
+        # row with id=2 got updated – even though it has the same merge_key as the first row with id=2
+        assert load_id_1 != load_ids[0][0], "Expected only the second load in the destination"
+
+    s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][
+        "created_at"
+    ]
+    assert s["last_value"] == 2
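
For reviewers who want to poke at the scenario outside the parametrized test harness, here is a minimal standalone sketch. The pipeline name and the final print are illustrative (not part of this change), the expected outcome in the comments is taken directly from the assertions above, and it assumes dlt with the duckdb extra installed:

    import dlt

    # same shape as the test resource: replace disposition plus an incremental cursor
    @dlt.resource(write_disposition="replace", merge_key="id", primary_key="id")
    def some_data(created_at=dlt.sources.incremental("created_at")):
        yield [
            {"id": 1, "created_at": 1},
            {"id": 2, "created_at": 2},
        ]

    p = dlt.pipeline(pipeline_name="dedup_replace_demo")  # illustrative name
    p.run(some_data(), destination="duckdb")
    p.run(some_data(), destination="duckdb")  # second run of the same resource

    with p.sql_client() as client:
        # per the test's assertions: ids [1, 2] with no duplicates, and every
        # row carries the _dlt_load_id of the second run only
        print(client.execute_sql("select id, _dlt_load_id from some_data order by id"))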