Skip to content

Commit

Permalink
Fix: Cast to timestamp prior to event time comparison (#1422)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk authored Dec 5, 2024
1 parent 26c19e9 commit 4d255b2
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 4 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20241204-105846.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Cast `event_time` to a timestamp prior to comparing against microbatch start/end
time
time: 2024-12-04T10:58:46.573608-05:00
custom:
Author: michelleark
Issue: "1422"
25 changes: 24 additions & 1 deletion dbt/adapters/bigquery/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@

from dbt_common.exceptions import CompilationError
from dbt_common.utils.dict import filter_null_values
from dbt.adapters.base.relation import BaseRelation, ComponentName, InformationSchema
from dbt.adapters.base.relation import (
BaseRelation,
ComponentName,
InformationSchema,
EventTimeFilter,
)
from dbt.adapters.contracts.relation import RelationConfig, RelationType
from dbt.adapters.relation_configs import RelationConfigChangeAction

Expand Down Expand Up @@ -116,6 +121,24 @@ def materialized_view_config_changeset(
def information_schema(self, identifier: Optional[str] = None) -> "BigQueryInformationSchema":
return BigQueryInformationSchema.from_relation(self, identifier)

def _render_event_time_filtered(self, event_time_filter: EventTimeFilter) -> str:
"""
Returns "" if start and end are both None
"""
filter = ""
if event_time_filter.start and event_time_filter.end:
filter = f"cast({event_time_filter.field_name} as timestamp) >= '{event_time_filter.start}' and cast({event_time_filter.field_name} as timestamp) < '{event_time_filter.end}'"
elif event_time_filter.start:
filter = (
f"cast({event_time_filter.field_name} as timestamp) >= '{event_time_filter.start}'"
)
elif event_time_filter.end:
filter = (
f"cast({event_time_filter.field_name} as timestamp) < '{event_time_filter.end}'"
)

return filter


@dataclass(frozen=True, eq=False, repr=False)
class BigQueryInformationSchema(InformationSchema):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@
begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)
)
}}
select * from {{ ref('input_model') }}
select id, cast(event_time as timestamp) as event_time from {{ ref('input_model') }}
"""

microbatch_input_sql = """
Expand All @@ -582,6 +582,24 @@
select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time
"""

# Fixture SQL for a three-row input model whose `event_time` column is typed
# DATE (not TIMESTAMP), exercising the cast-to-timestamp path when microbatch
# start/end bounds are compared against the event time.
microbatch_input_event_time_date_sql = """
{{ config(materialized='table', event_time='event_time') }}
select 1 as id, DATE '2020-01-01' as event_time
union all
select 2 as id, DATE '2020-01-02' as event_time
union all
select 3 as id, DATE '2020-01-03' as event_time
"""

# Fixture SQL for a three-row input model whose `event_time` column is typed
# DATETIME (not TIMESTAMP), exercising the cast-to-timestamp path when
# microbatch start/end bounds are compared against the event time.
microbatch_input_event_time_datetime_sql = """
{{ config(materialized='table', event_time='event_time') }}
select 1 as id, DATETIME '2020-01-01' as event_time
union all
select 2 as id, DATETIME '2020-01-02' as event_time
union all
select 3 as id, DATETIME '2020-01-03' as event_time
"""

microbatch_model_no_partition_by_sql = """
{{ config(
materialized='incremental',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
microbatch_input_sql,
microbatch_model_no_partition_by_sql,
microbatch_model_invalid_partition_by_sql,
microbatch_input_event_time_date_sql,
microbatch_input_event_time_datetime_sql,
)


Expand All @@ -22,6 +24,32 @@ def microbatch_model_sql(self) -> str:
return microbatch_model_no_unique_id_sql


class TestBigQueryMicrobatchInputWithDate(TestBigQueryMicrobatch):
    """Run the inherited microbatch test suite against an input model whose
    `event_time` column is a DATE rather than a TIMESTAMP."""

    @pytest.fixture(scope="class")
    def input_model_sql(self) -> str:
        # Override the base fixture with the DATE-typed event_time model.
        return microbatch_input_event_time_date_sql

    @pytest.fixture(scope="class")
    def insert_two_rows_sql(self, project) -> str:
        # Build a relation for the test schema so the insert targets the
        # correct project database/schema; rows 4 and 5 use DATE literals to
        # match the input model's column type.
        test_schema_relation = project.adapter.Relation.create(
            database=project.database, schema=project.test_schema
        )
        return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, DATE '2020-01-04'), (5, DATE '2020-01-05')"


class TestBigQueryMicrobatchInputWithDatetime(TestBigQueryMicrobatch):
    """Run the inherited microbatch test suite against an input model whose
    `event_time` column is a DATETIME rather than a TIMESTAMP."""

    @pytest.fixture(scope="class")
    def input_model_sql(self) -> str:
        # Override the base fixture with the DATETIME-typed event_time model.
        return microbatch_input_event_time_datetime_sql

    @pytest.fixture(scope="class")
    def insert_two_rows_sql(self, project) -> str:
        # Build a relation for the test schema so the insert targets the
        # correct project database/schema; rows 4 and 5 use DATETIME literals
        # to match the input model's column type.
        test_schema_relation = project.adapter.Relation.create(
            database=project.database, schema=project.test_schema
        )
        return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, DATETIME '2020-01-04'), (5, DATETIME '2020-01-05')"


class TestBigQueryMicrobatchMissingPartitionBy:
@pytest.fixture(scope="class")
def models(self) -> str:
Expand All @@ -30,7 +58,6 @@ def models(self) -> str:
"input_model.sql": microbatch_input_sql,
}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_execution_failure_no_partition_by(self, project):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, stdout = run_dbt_and_capture(["run"], expect_pass=False)
Expand All @@ -45,7 +72,6 @@ def models(self) -> str:
"input_model.sql": microbatch_input_sql,
}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_execution_failure_no_partition_by(self, project):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, stdout = run_dbt_and_capture(["run"], expect_pass=False)
Expand Down

0 comments on commit 4d255b2

Please sign in to comment.