Skip to content

Commit

Permalink
backport #485 (#591)
Browse files Browse the repository at this point in the history
Co-authored-by: Christophe Oudar <[email protected]>
  • Loading branch information
colin-rogers-dbt and Kayrnt authored Mar 7, 2023
1 parent 78a613f commit eb908d7
Show file tree
Hide file tree
Showing 10 changed files with 1,078 additions and 9 deletions.
8 changes: 8 additions & 0 deletions .changes/unreleased/Fixes-20230202-010332.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
kind: Fixes
body: Fix time ingestion partitioning option regression when combined using `require_partition_filter`
option on incremental run
time: 2023-02-02T01:03:32.577336+01:00
custom:
Author: Kayrnt
Issue: "483"
PR: "485"
27 changes: 20 additions & 7 deletions dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,37 @@ class PartitionConfig(dbtClassMixin):
time_ingestion_partitioning: bool = False
copy_partitions: bool = False

def data_type_for_partition(self):
"""Return the data type of partitions for replacement."""
return self.data_type if not self.time_ingestion_partitioning else "timestamp"

def reject_partition_field_column(self, columns: List[Any]) -> List[str]:
return [c for c in columns if not c.name.upper() == self.field.upper()]

def data_type_should_be_truncated(self):
"""Return true if the data type should be truncated instead of cast to the data type."""
return not (
self.data_type.lower() == "int64"
or (self.data_type.lower() == "date" and self.granularity.lower() == "day")
)

def render(self, alias: Optional[str] = None):
column: str = self.field if not self.time_ingestion_partitioning else "_PARTITIONTIME"
if alias:
column = f"{alias}.{column}"

if self.data_type.lower() == "int64" or (
self.data_type.lower() == "date" and self.granularity.lower() == "day"
):
return column
else:
if self.data_type_should_be_truncated():
return f"{self.data_type}_trunc({column}, {self.granularity})"
else:
return column

def render_wrapped(self, alias: Optional[str] = None):
"""Wrap the partitioning column when time involved to ensure it is properly casted to matching time."""
if self.data_type in ("date", "timestamp", "datetime"):
"""Wrap the partitioning column when time involved to ensure it is properly cast to matching time."""
# if data type is going to be truncated, no need to wrap
if (
self.data_type in ("date", "timestamp", "datetime")
and not self.data_type_should_be_truncated()
):
return f"{self.data_type}({self.render(alias)})"
else:
return self.render(alias)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
{#-- TODO: revisit partitioning with python models --#}
{%- if '_dbt_max_partition' in compiled_code and language == 'sql' -%}

declare _dbt_max_partition {{ partition_by.data_type }} default (
declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default (
select max({{ partition_by.field }}) from {{ this }}
where {{ partition_by.field }} is not null
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
{%- endset -%}

-- generated script to merge partitions into {{ target_relation }}
declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;
declare dbt_partitions_for_replacement array<{{ partition_by.data_type_for_partition() }}>;

{# have we already created the temp table to check for schema changes? #}
{% if not tmp_relation_exists %}
Expand Down
Loading

0 comments on commit eb908d7

Please sign in to comment.