diff --git a/.changes/unreleased/Fixes-20230309-181313.yaml b/.changes/unreleased/Fixes-20230309-181313.yaml new file mode 100644 index 000000000..8681f5eaf --- /dev/null +++ b/.changes/unreleased/Fixes-20230309-181313.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Fix for Python incremental model regression +time: 2023-03-09T18:13:13.512904-08:00 +custom: + Author: nssalian + Issue: "581" diff --git a/dbt/include/bigquery/macros/materializations/incremental.sql b/dbt/include/bigquery/macros/materializations/incremental.sql index 88fc91eae..9f1479749 100644 --- a/dbt/include/bigquery/macros/materializations/incremental.sql +++ b/dbt/include/bigquery/macros/materializations/incremental.sql @@ -23,12 +23,17 @@ {% endmacro %} {% macro bq_create_table_as(is_time_ingestion_partitioning, temporary, relation, compiled_code, language='sql') %} - {% if is_time_ingestion_partitioning %} + {% if is_time_ingestion_partitioning and language == 'python' %} + {% do exceptions.raise_compiler_error( + "Python models do not support ingestion time partitioning" + ) %} + {% endif %} + {% if is_time_ingestion_partitioning and language == 'sql' %} {#-- Create the table before inserting data as ingestion time partitioned tables can't be created with the transformed data --#} - {% do run_query(create_ingestion_time_partitioned_table_as_sql(temporary, relation, sql)) %} - {{ return(bq_insert_into_ingestion_time_partitioned_table_sql(relation, sql)) }} + {% do run_query(create_ingestion_time_partitioned_table_as_sql(temporary, relation, compiled_code)) %} + {{ return(bq_insert_into_ingestion_time_partitioned_table_sql(relation, compiled_code)) }} {% else %} - {{ return(create_table_as(temporary, relation, sql)) }} + {{ return(create_table_as(temporary, relation, compiled_code, language)) }} {% endif %} {% endmacro %} diff --git a/tests/functional/adapter/test_python_model.py b/tests/functional/adapter/test_python_model.py index 03ad871e2..38c1fcb21 100644 --- a/tests/functional/adapter/test_python_model.py +++ b/tests/functional/adapter/test_python_model.py @@ -3,16 +3,22 @@ from dbt.tests.util import run_dbt, write_file import dbt.tests.adapter.python_model.test_python_model as dbt_tests -@pytest.skip("cluster unstable", allow_module_level=True) -class TestPythonIncrementalMatsDataproc(dbt_tests.BasePythonIncrementalTests): - pass +TEST_SKIP_MESSAGE = "Skipping the Tests since Dataproc serverless is not stable. " \ + "TODO: Fix later" + +@pytest.mark.skip(reason=TEST_SKIP_MESSAGE) class TestPythonModelDataproc(dbt_tests.BasePythonModelTests): pass + +@pytest.mark.skip(reason=TEST_SKIP_MESSAGE) +class TestPythonIncrementalMatsDataproc(dbt_tests.BasePythonIncrementalTests): + pass + + models__simple_python_model = """ import pandas - def model(dbt, spark): dbt.config( materialized='table', @@ -20,9 +26,9 @@ def model(dbt, spark): data = [[1,2]] * 10 return spark.createDataFrame(data, schema=['test', 'test2']) """ + models__simple_python_model_v2 = """ import pandas - def model(dbt, spark): dbt.config( materialized='table', @@ -31,13 +37,17 @@ def model(dbt, spark): return spark.createDataFrame(data, schema=['test1', 'test3']) """ + +@pytest.mark.skip(reason=TEST_SKIP_MESSAGE) class TestChangingSchemaDataproc: + @pytest.fixture(scope="class") def models(self): return { "simple_python_model.py": models__simple_python_model - } - def test_changing_schema(self,project, logs_dir): + } + + def test_changing_schema(self, project, logs_dir): run_dbt(["run"]) write_file(models__simple_python_model_v2, project.project_root + '/models', "simple_python_model.py") run_dbt(["run"]) diff --git a/tox.ini b/tox.ini index 3785f1976..4d552ab44 100644 --- a/tox.ini +++ b/tox.ini @@ -20,8 +20,9 @@ passenv = DBT_* BIGQUERY_TEST_* PYTEST_ADDOPTS + DATAPROC_* + GCS_BUCKET commands = - bigquery: {envpython} -m pytest {posargs} -m profile_bigquery tests/integration bigquery: {envpython} -m pytest {posargs} -vv tests/functional --profile service_account deps = -rdev-requirements.txt