From 4d255b2f854d21d5d8871bdaa8d7ab47e7e863a3 Mon Sep 17 00:00:00 2001
From: Michelle Ark
Date: Thu, 5 Dec 2024 16:26:52 -0500
Subject: [PATCH] Fix: Cast to timestamp prior to event time comparison (#1422)

---
 .../unreleased/Fixes-20241204-105846.yaml     |  7 +++++
 dbt/adapters/bigquery/relation.py             | 25 +++++++++++++++-
 .../incremental_strategy_fixtures.py          | 20 ++++++++++++-
 .../test_incremental_microbatch.py            | 30 +++++++++++++++++--
 4 files changed, 78 insertions(+), 4 deletions(-)
 create mode 100644 .changes/unreleased/Fixes-20241204-105846.yaml

diff --git a/.changes/unreleased/Fixes-20241204-105846.yaml b/.changes/unreleased/Fixes-20241204-105846.yaml
new file mode 100644
index 000000000..2693e4513
--- /dev/null
+++ b/.changes/unreleased/Fixes-20241204-105846.yaml
@@ -0,0 +1,7 @@
+kind: Fixes
+body: Cast `event_time` to a timestamp prior to comparing against microbatch start/end
+  time
+time: 2024-12-04T10:58:46.573608-05:00
+custom:
+  Author: michelleark
+  Issue: "1422"
diff --git a/dbt/adapters/bigquery/relation.py b/dbt/adapters/bigquery/relation.py
index 4edc8d7ac..037761918 100644
--- a/dbt/adapters/bigquery/relation.py
+++ b/dbt/adapters/bigquery/relation.py
@@ -4,7 +4,12 @@
 from dbt_common.exceptions import CompilationError
 from dbt_common.utils.dict import filter_null_values
 
-from dbt.adapters.base.relation import BaseRelation, ComponentName, InformationSchema
+from dbt.adapters.base.relation import (
+    BaseRelation,
+    ComponentName,
+    InformationSchema,
+    EventTimeFilter,
+)
 from dbt.adapters.contracts.relation import RelationConfig, RelationType
 from dbt.adapters.relation_configs import RelationConfigChangeAction
 
@@ -116,6 +121,24 @@ def materialized_view_config_changeset(
     def information_schema(self, identifier: Optional[str] = None) -> "BigQueryInformationSchema":
         return BigQueryInformationSchema.from_relation(self, identifier)
 
+    def _render_event_time_filtered(self, event_time_filter: EventTimeFilter) -> str:
+        """
+        Returns "" if start and end are both None
+        """
+        filter = ""
+        if event_time_filter.start and event_time_filter.end:
+            filter = f"cast({event_time_filter.field_name} as timestamp) >= '{event_time_filter.start}' and cast({event_time_filter.field_name} as timestamp) < '{event_time_filter.end}'"
+        elif event_time_filter.start:
+            filter = (
+                f"cast({event_time_filter.field_name} as timestamp) >= '{event_time_filter.start}'"
+            )
+        elif event_time_filter.end:
+            filter = (
+                f"cast({event_time_filter.field_name} as timestamp) < '{event_time_filter.end}'"
+            )
+
+        return filter
+
 
 @dataclass(frozen=True, eq=False, repr=False)
 class BigQueryInformationSchema(InformationSchema):
diff --git a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py
index 02efbb6c2..365aba8c8 100644
--- a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py
+++ b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py
@@ -570,7 +570,7 @@
     begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)
     )
 }}
-select * from {{ ref('input_model') }}
+select id, cast(event_time as timestamp) as event_time from {{ ref('input_model') }}
 """
 
 microbatch_input_sql = """
@@ -582,6 +582,24 @@
 select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time
 """
 
+microbatch_input_event_time_date_sql = """
+{{ config(materialized='table', event_time='event_time') }}
+select 1 as id, DATE '2020-01-01' as event_time
+union all
+select 2 as id, DATE '2020-01-02' as event_time
+union all
+select 3 as id, DATE '2020-01-03' as event_time
+"""
+
+microbatch_input_event_time_datetime_sql = """
+{{ config(materialized='table', event_time='event_time') }}
+select 1 as id, DATETIME '2020-01-01' as event_time
+union all
+select 2 as id, DATETIME '2020-01-02' as event_time
+union all
+select 3 as id, DATETIME '2020-01-03' as event_time
+"""
+
 microbatch_model_no_partition_by_sql = """
 {{ config(
     materialized='incremental',
diff --git a/tests/functional/adapter/incremental/test_incremental_microbatch.py b/tests/functional/adapter/incremental/test_incremental_microbatch.py
index d1bbbcea3..d0f8b62b7 100644
--- a/tests/functional/adapter/incremental/test_incremental_microbatch.py
+++ b/tests/functional/adapter/incremental/test_incremental_microbatch.py
@@ -13,6 +13,8 @@
     microbatch_input_sql,
     microbatch_model_no_partition_by_sql,
     microbatch_model_invalid_partition_by_sql,
+    microbatch_input_event_time_date_sql,
+    microbatch_input_event_time_datetime_sql,
 )
 
 
@@ -22,6 +24,32 @@ def microbatch_model_sql(self) -> str:
         return microbatch_model_no_unique_id_sql
 
 
+class TestBigQueryMicrobatchInputWithDate(TestBigQueryMicrobatch):
+    @pytest.fixture(scope="class")
+    def input_model_sql(self) -> str:
+        return microbatch_input_event_time_date_sql
+
+    @pytest.fixture(scope="class")
+    def insert_two_rows_sql(self, project) -> str:
+        test_schema_relation = project.adapter.Relation.create(
+            database=project.database, schema=project.test_schema
+        )
+        return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, DATE '2020-01-04'), (5, DATE '2020-01-05')"
+
+
+class TestBigQueryMicrobatchInputWithDatetime(TestBigQueryMicrobatch):
+    @pytest.fixture(scope="class")
+    def input_model_sql(self) -> str:
+        return microbatch_input_event_time_datetime_sql
+
+    @pytest.fixture(scope="class")
+    def insert_two_rows_sql(self, project) -> str:
+        test_schema_relation = project.adapter.Relation.create(
+            database=project.database, schema=project.test_schema
+        )
+        return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, DATETIME '2020-01-04'), (5, DATETIME '2020-01-05')"
+
+
 class TestBigQueryMicrobatchMissingPartitionBy:
     @pytest.fixture(scope="class")
     def models(self) -> str:
@@ -30,7 +58,6 @@ def models(self) -> str:
             "input_model.sql": microbatch_input_sql,
         }
 
-    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
     def test_execution_failure_no_partition_by(self, project):
         with patch_microbatch_end_time("2020-01-03 13:57:00"):
             _, stdout = run_dbt_and_capture(["run"], expect_pass=False)
@@ -45,7 +72,6 @@ def models(self) -> str:
             "input_model.sql": microbatch_input_sql,
         }
 
-    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
     def test_execution_failure_no_partition_by(self, project):
         with patch_microbatch_end_time("2020-01-03 13:57:00"):
             _, stdout = run_dbt_and_capture(["run"], expect_pass=False)