diff --git a/README.md b/README.md index 5cf39293..68bcfb66 100644 --- a/README.md +++ b/README.md @@ -320,16 +320,6 @@ vars: With these variables set, the `combine_property_data` macro will run as a pre-hook to `base_ga4_events` and clone shards to the target dataset. The number of days' worth of data to clone during incremental runs will be based on the `static_incremental_days` variable. -Jobs that run a large number of clone operations are prone to timing out. As a result, it is recommended that you increase the query timeout if you need to backfill or full-refresh the table, when first setting up or when the base model gets modified. Otherwise, it is best to prevent the base model from rebuilding on full refreshes unless needed to minimize timeouts. - -``` -models: - ga4: - staging: - base: - base_ga4__events: - +full_refresh: false -``` # dbt Style Guide This package attempts to adhere to the Brooklyn Data style guide found [here](https://github.com/brooklyn-data/co/blob/main/sql_style_guide.md). This work is in-progress. diff --git a/macros/combine_property_data.sql b/macros/combine_property_data.sql index 67ef31cc..f93c4355 100644 --- a/macros/combine_property_data.sql +++ b/macros/combine_property_data.sql @@ -1,38 +1,9 @@ -{%- macro combine_property_data() -%} - {{ return(adapter.dispatch('combine_property_data', 'ga4')()) }} +{%- macro combine_property_data(property_ids=var('property_ids'), start_date=none, end_date=none) -%} + {{ return(adapter.dispatch('combine_property_data', 'ga4')(property_ids, start_date, end_date)) }} {%- endmacro -%} -{% macro default__combine_property_data() %} - - create schema if not exists `{{target.project}}.{{var('combined_dataset')}}`; - - {# If incremental, then use static_incremental_days variable to find earliest shard to copy #} - {% if not should_full_refresh() %} - {% set earliest_shard_to_retrieve = (modules.datetime.date.today() - modules.datetime.timedelta(days=var('static_incremental_days')))|string|replace("-", "")|int %} - {% else %} - {# Otherwise use 'start_date' variable #} - - {% set earliest_shard_to_retrieve = var('start_date')|int %} - {% endif %} - - {% for property_id in var('property_ids') %} - {%- set schema_name = "analytics_" + property_id|string -%} - {# Copy intraday tables #} - {%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_intraday_%', database=var('source_project')) -%} - {% for relation in relations %} - {%- set relation_suffix = relation.identifier|replace('events_intraday_', '') -%} - {%- if relation_suffix|int >= earliest_shard_to_retrieve|int -%} - CREATE OR REPLACE TABLE `{{target.project}}.{{var('combined_dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}` CLONE `{{var('source_project')}}.analytics_{{property_id}}.events_intraday_{{relation_suffix}}`; - {%- endif -%} - {% endfor %} - {# Copy daily tables and drop old intraday table #} - {%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_%', exclude='events_intraday_%', database=var('source_project')) -%} - {% for relation in relations %} - {%- set relation_suffix = relation.identifier|replace('events_', '') -%} - {%- if relation_suffix|int >= earliest_shard_to_retrieve|int -%} - CREATE OR REPLACE TABLE `{{target.project}}.{{var('combined_dataset')}}.events_{{relation_suffix}}{{property_id}}` CLONE `{{var('source_project')}}.analytics_{{property_id}}.events_{{relation_suffix}}`; - DROP TABLE IF EXISTS `{{target.project}}.{{var('combined_dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}`; - {%- endif -%} - {% endfor %} +{% macro default__combine_property_data(property_ids=var('property_ids'), start_date=none, end_date=none) %} + {% for property_id in property_ids %} + {{ ga4.combine_specified_property_data(property_id, start_date, end_date) }} {% endfor %} -{% endmacro %} \ No newline at end of file +{% endmacro %} diff --git a/macros/combine_specified_property_data.sql b/macros/combine_specified_property_data.sql new file mode 100644 index 00000000..150f9e2e --- /dev/null +++ b/macros/combine_specified_property_data.sql @@ -0,0 +1,63 @@ +{%- macro combine_specified_property_data(property_id, start_date, end_date) -%} + {{ return(adapter.dispatch('combine_specified_property_data', 'ga4')(property_id, start_date, end_date)) }} +{%- endmacro -%} + +{% macro default__combine_specified_property_data(property_id, start_date, end_date) %} + {% if not property_id %} + {{ exceptions.raise_compiler_error("Error: argument `property_id` is required for `combine_specified_property_data` macro.") }} + {% endif %} + + {% if var('combined_dataset', false) == false %} + {{ exceptions.raise_compiler_error("Error: `combined_dataset` variable is required for `combine_specified_property_data` macro.") }} + {% endif %} + + {% if start_date %} + {# If the 'start_date' argument exists, use it. #} + {%- set earliest_shard_to_retrieve = start_date|int -%} + {% elif not should_full_refresh() %} + {# If incremental, then use static_incremental_days variable to find earliest shard to copy #} + {%- set earliest_shard_to_retrieve = (modules.datetime.date.today() - modules.datetime.timedelta(days=var('static_incremental_days')))|string|replace("-", "")|int -%} + {% else %} + {# Otherwise use 'start_date' variable #} + {%- set earliest_shard_to_retrieve = var('start_date')|int -%} + {% endif %} + + {% if end_date %} + {# If the 'end_date' argument exists, use it. #} + {%- set latest_shard_to_retrieve = end_date|int -%} + {% else %} + {# Otherwise use 'end_date' variable #} + {%- set latest_shard_to_retrieve = var('end_date', modules.datetime.date.today()|string|replace("-", ""))|int -%} + {% endif %} + + {%- set schema_name = "analytics_" + property_id|string -%} + + {%- set combine_specified_property_data_query -%} + create schema if not exists `{{target.project}}.{{var('combined_dataset')}}`; + + {# Copy intraday tables #} + {%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_intraday_%', database=var('source_project')) -%} + {% for relation in relations %} + {%- set relation_suffix = relation.identifier|replace('events_intraday_', '') -%} + {%- if earliest_shard_to_retrieve|int <= relation_suffix|int <= latest_shard_to_retrieve|int -%} + create or replace table `{{target.project}}.{{var('combined_dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}` clone `{{var('source_project')}}.analytics_{{property_id}}.events_intraday_{{relation_suffix}}`; + {%- endif -%} + {% endfor %} + + {# Copy daily tables and drop old intraday table #} + {%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_%', exclude='events_intraday_%', database=var('source_project')) -%} + {% for relation in relations %} + {%- set relation_suffix = relation.identifier|replace('events_', '') -%} + {%- if earliest_shard_to_retrieve|int <= relation_suffix|int <= latest_shard_to_retrieve|int -%} + create or replace table `{{target.project}}.{{var('combined_dataset')}}.events_{{relation_suffix}}{{property_id}}` clone `{{var('source_project')}}.analytics_{{property_id}}.events_{{relation_suffix}}`; + drop table if exists `{{target.project}}.{{var('combined_dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}`; + {%- endif -%} + {% endfor %} + {%- endset -%} + + {% do run_query(combine_specified_property_data_query) %} + + {% if execute %} + {{ log("Cloned from `" ~ var('source_project') ~ ".analytics_" ~ property_id ~ ".events_*[" ~ earliest_shard_to_retrieve ~ "-" ~ latest_shard_to_retrieve ~ "]` to `" ~ target.project ~ "." ~ var('combined_dataset') ~ ".events_YYYYMMDD" ~ property_id ~ "`.", True) }} + {% endif %} +{% endmacro %}