Skip to content

Commit

Permalink
backport PR #594 to 1.4.latest (#600)
Browse files Browse the repository at this point in the history
* [ADAP-353]: Fix for #581: Python incremental model (#594)

* Fixes for incremental strategy py model.WIP

* doc string

* Remove extra comment

* Uncomment change schema test

* Update dbt/include/bigquery/macros/materializations/incremental.sql

Add python language exception for time_ingestion_partitioning

Co-authored-by: colin-rogers-dbt <[email protected]>

* Remove tox command flag for test python

* Adding the env vars in integration

* Switch to cluster execution for the python models

* Remove comment and add changie

* Skipping tests since dataproc is unstable. Restoring default to serverless

---------

Co-authored-by: colin-rogers-dbt <[email protected]>
(cherry picked from commit 384f13c)

* Nit newline
  • Loading branch information
nssalian authored Mar 10, 2023
1 parent e2620c6 commit bc01f35
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 12 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230309-181313.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix for Python incremental model regression
time: 2023-03-09T18:13:13.512904-08:00
custom:
Author: nssalian
Issue: "581"
13 changes: 9 additions & 4 deletions dbt/include/bigquery/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@

{% endmacro %}
{% macro bq_create_table_as(is_time_ingestion_partitioning, temporary, relation, compiled_code, language='sql') %}
{% if is_time_ingestion_partitioning %}
{% if is_time_ingestion_partitioning and language == 'python' %}
{% do exceptions.raise_compiler_error(
"Python models do not support ingestion time partitioning"
) %}
{% endif %}
{% if is_time_ingestion_partitioning and language == 'sql' %}
{#-- Create the table before inserting data as ingestion time partitioned tables can't be created with the transformed data --#}
{% do run_query(create_ingestion_time_partitioned_table_as_sql(temporary, relation, sql)) %}
{{ return(bq_insert_into_ingestion_time_partitioned_table_sql(relation, sql)) }}
{% do run_query(create_ingestion_time_partitioned_table_as_sql(temporary, relation, compiled_code)) %}
{{ return(bq_insert_into_ingestion_time_partitioned_table_sql(relation, compiled_code)) }}
{% else %}
{{ return(create_table_as(temporary, relation, sql)) }}
{{ return(create_table_as(temporary, relation, compiled_code, language)) }}
{% endif %}
{% endmacro %}

Expand Down
24 changes: 17 additions & 7 deletions tests/functional/adapter/test_python_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,32 @@
from dbt.tests.util import run_dbt, write_file
import dbt.tests.adapter.python_model.test_python_model as dbt_tests

@pytest.skip("cluster unstable", allow_module_level=True)
class TestPythonIncrementalMatsDataproc(dbt_tests.BasePythonIncrementalTests):
pass
TEST_SKIP_MESSAGE = "Skipping the Tests since Dataproc serverless is not stable. " \
"TODO: Fix later"


@pytest.mark.skip(reason=TEST_SKIP_MESSAGE)
class TestPythonModelDataproc(dbt_tests.BasePythonModelTests):
pass


@pytest.mark.skip(reason=TEST_SKIP_MESSAGE)
class TestPythonIncrementalMatsDataproc(dbt_tests.BasePythonIncrementalTests):
pass


models__simple_python_model = """
import pandas
def model(dbt, spark):
dbt.config(
materialized='table',
)
data = [[1,2]] * 10
return spark.createDataFrame(data, schema=['test', 'test2'])
"""

models__simple_python_model_v2 = """
import pandas
def model(dbt, spark):
dbt.config(
materialized='table',
Expand All @@ -31,13 +37,17 @@ def model(dbt, spark):
return spark.createDataFrame(data, schema=['test1', 'test3'])
"""


@pytest.mark.skip(reason=TEST_SKIP_MESSAGE)
class TestChangingSchemaDataproc:

@pytest.fixture(scope="class")
def models(self):
return {
"simple_python_model.py": models__simple_python_model
}
def test_changing_schema(self,project, logs_dir):
}

def test_changing_schema(self, project, logs_dir):
run_dbt(["run"])
write_file(models__simple_python_model_v2, project.project_root + '/models', "simple_python_model.py")
run_dbt(["run"])
Expand Down
3 changes: 2 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ passenv =
DBT_*
BIGQUERY_TEST_*
PYTEST_ADDOPTS
DATAPROC_*
GCS_BUCKET
commands =
bigquery: {envpython} -m pytest {posargs} -m profile_bigquery tests/integration
bigquery: {envpython} -m pytest {posargs} -vv tests/functional --profile service_account
deps =
-rdev-requirements.txt
Expand Down

0 comments on commit bc01f35

Please sign in to comment.