Fix error when setting a large number of properties
Bugfix

Fix #269.

This change greatly reduces the likelihood of an error when specifying a large number of property_ids in `ga4.combine_property_data()`.

* Fixed the following bug
  * Changed the macro to copy tables separately for each property_id

dbt_project.yml

```yml
vars:
    ga4:
        source_project: source-project-id
        property_ids: [
          000000001
          , 000000002
          , ...
          , 000000040
        ]
        start_date: 20210101
        static_incremental_days: 3
        combined_dataset: combined_dataset_name
```

```shell
$ dbt run -s base_ga4__events --full-refresh
06:51:19  Running with dbt=1.5.0
06:52:05  Found 999 models, 999 tests, 999 snapshots, 999 analyses, 999 macros, 999 operations, 999 seed files, 999 sources, 999 exposures, 999 metrics, 999 groups
06:52:06
06:52:14  Concurrency: 4 threads (target='dev')
06:52:14
06:52:14  1 of 1 START sql view model dataset_name.base_ga4__events ......... [RUN]
06:56:17  BigQuery adapter: https://console.cloud.google.com/bigquery?project=project-id&j=bq:asia-northeast1:????????-????-????-????-????????????&page=queryresults
06:56:17  1 of 1 ERROR creating sql view model dataset_name.base_ga4__events  [ERROR in 243.80s]
06:56:18
06:56:18  Finished running 1 view model in 0 hours 4 minutes and 11.62 seconds (251.62s).
06:56:22
06:56:22  Completed with 1 error and 0 warnings:
06:56:22
06:56:23  Database Error in model base_ga4__events (models/staging/base/base_ga4__events.sql)
06:56:23    The query is too large. The maximum standard SQL query length is 1024.00K characters, including comments and white space characters.
06:56:23
06:56:23  Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1
```
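For a rough sense of scale (illustrative numbers, not measured from this repository): cloning about three years of daily shards for 40 properties produces on the order of 40 × 1,100 ≈ 44,000 `CREATE OR REPLACE TABLE ... CLONE` statements. At roughly 200 characters each, the single script rendered by the old pre-hook comes to ~8.8 million characters, far beyond the 1,024.00K-character limit in the error above.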

After merging this pull request, the same command completes successfully:

```shell
$ dbt run -s base_ga4__events --full-refresh
HH:mm:ss  Running with dbt=1.5.0
HH:mm:ss  Found 999 models, 999 tests, 999 snapshots, 999 analyses, 999 macros, 999 operations, 999 seed files, 999 sources, 999 exposures, 999 metrics, 999 groups
HH:mm:ss
HH:mm:ss  Concurrency: 4 threads (target='dev')
HH:mm:ss
HH:mm:ss  1 of 1 START sql incremental model dataset_name.base_ga4__events ... [RUN]
HH:mm:ss  Cloned from `source-project-id.analytics_000000001.events_*[20210101-20240324]` to `project-id.combined_dataset_name.events_YYYYMMDD000000001`.
HH:mm:ss  Cloned from `source-project-id.analytics_000000002.events_*[20210101-20240324]` to `project-id.combined_dataset_name.events_YYYYMMDD000000002`.
....
HH:mm:ss  Cloned from `source-project-id.analytics_000000040.events_*[20210101-20240324]` to `project-id.combined_dataset_name.events_YYYYMMDD000000040`.
HH:mm:ss  1 of 1 OK created sql incremental model dataset_name.base_ga4__events  [CREATE TABLE (? rows, ? processed) in ?]
HH:mm:ss
HH:mm:ss  Finished running 1 incremental model in ? (?).
HH:mm:ss
HH:mm:ss  Completed successfully
HH:mm:ss
HH:mm:ss  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
```

---

Fixed timeouts in the clone operation

Because clone operations are now issued separately for each property_id, the timeouts described below should almost never occur.

* Removed the following guidance (https://github.com/Velir/dbt-ga4/blame/6.0.1/README.md#L323-L332) from README.md, since the workaround is no longer needed:

> Jobs that run a large number of clone operations are prone to timing out. As a result, it is recommended that you increase the query timeout if you need to backfill or full-refresh the table, when first setting up or when the base model gets modified. Otherwise, it is best to prevent the base model from rebuilding on full refreshes unless needed to minimize timeouts.
>
> ```
> models:
>   ga4:
>     staging:
>       base:
>         base_ga4__events:
>           +full_refresh: false
> ```
yamotech committed Mar 24, 2024
1 parent fd6fe8b commit 22b2d96
Showing 3 changed files with 69 additions and 45 deletions.
10 changes: 0 additions & 10 deletions README.md
````diff
@@ -320,16 +320,6 @@ vars:
 
 With these variables set, the `combine_property_data` macro will run as a pre-hook to `base_ga4_events` and clone shards to the target dataset. The number of days' worth of data to clone during incremental runs will be based on the `static_incremental_days` variable.
 
-Jobs that run a large number of clone operations are prone to timing out. As a result, it is recommended that you increase the query timeout if you need to backfill or full-refresh the table, when first setting up or when the base model gets modified. Otherwise, it is best to prevent the base model from rebuilding on full refreshes unless needed to minimize timeouts.
-
-```
-models:
-  ga4:
-    staging:
-      base:
-        base_ga4__events:
-          +full_refresh: false
-```
 # dbt Style Guide
 
 This package attempts to adhere to the Brooklyn Data style guide found [here](https://github.com/brooklyn-data/co/blob/main/sql_style_guide.md). This work is in-progress.
````
41 changes: 6 additions & 35 deletions macros/combine_property_data.sql
````diff
@@ -1,38 +1,9 @@
-{%- macro combine_property_data() -%}
-    {{ return(adapter.dispatch('combine_property_data', 'ga4')()) }}
+{%- macro combine_property_data(property_ids=var('property_ids'), start_date=none, end_date=none) -%}
+    {{ return(adapter.dispatch('combine_property_data', 'ga4')(property_ids, start_date, end_date)) }}
 {%- endmacro -%}
 
-{% macro default__combine_property_data() %}
-
-create schema if not exists `{{target.project}}.{{var('combined_dataset')}}`;
-
-{# If incremental, then use static_incremental_days variable to find earliest shard to copy #}
-{% if not should_full_refresh() %}
-    {% set earliest_shard_to_retrieve = (modules.datetime.date.today() - modules.datetime.timedelta(days=var('static_incremental_days')))|string|replace("-", "")|int %}
-{% else %}
-    {# Otherwise use 'start_date' variable #}
-
-    {% set earliest_shard_to_retrieve = var('start_date')|int %}
-{% endif %}
-
-{% for property_id in var('property_ids') %}
-    {%- set schema_name = "analytics_" + property_id|string -%}
-    {# Copy intraday tables #}
-    {%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_intraday_%', database=var('source_project')) -%}
-    {% for relation in relations %}
-        {%- set relation_suffix = relation.identifier|replace('events_intraday_', '') -%}
-        {%- if relation_suffix|int >= earliest_shard_to_retrieve|int -%}
-            CREATE OR REPLACE TABLE `{{target.project}}.{{var('combined_dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}` CLONE `{{var('source_project')}}.analytics_{{property_id}}.events_intraday_{{relation_suffix}}`;
-        {%- endif -%}
-    {% endfor %}
-    {# Copy daily tables and drop old intraday table #}
-    {%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_%', exclude='events_intraday_%', database=var('source_project')) -%}
-    {% for relation in relations %}
-        {%- set relation_suffix = relation.identifier|replace('events_', '') -%}
-        {%- if relation_suffix|int >= earliest_shard_to_retrieve|int -%}
-            CREATE OR REPLACE TABLE `{{target.project}}.{{var('combined_dataset')}}.events_{{relation_suffix}}{{property_id}}` CLONE `{{var('source_project')}}.analytics_{{property_id}}.events_{{relation_suffix}}`;
-            DROP TABLE IF EXISTS `{{target.project}}.{{var('combined_dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}`;
-        {%- endif -%}
-    {% endfor %}
-{% endfor %}
+{% macro default__combine_property_data(property_ids=var('property_ids'), start_date=none, end_date=none) %}
+    {% for property_id in property_ids %}
+        {{ ga4.combine_specified_property_data(property_id, start_date, end_date) }}
+    {% endfor %}
 {% endmacro %}
````
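The dispatcher now only loops over `property_ids` and delegates to `ga4.combine_specified_property_data` (new file below), so the clone statements for each property are rendered and executed as a separate query job. The SQL submitted in any single job no longer grows with the number of properties.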
63 changes: 63 additions & 0 deletions macros/combine_specified_property_data.sql
```sql
{%- macro combine_specified_property_data(property_id, start_date, end_date) -%}
    {{ return(adapter.dispatch('combine_specified_property_data', 'ga4')(property_id, start_date, end_date)) }}
{%- endmacro -%}

{% macro default__combine_specified_property_data(property_id, start_date, end_date) %}
    {% if not property_id %}
        {{ exceptions.raise_compiler_error("Error: argument `property_id` is required for `combine_specified_property_data` macro.") }}
    {% endif %}

    {% if var('combined_dataset', false) == false %}
        {{ exceptions.raise_compiler_error("Error: `combined_dataset` variable is required for `combine_specified_property_data` macro.") }}
    {% endif %}

    {% if start_date %}
        {# If the 'start_date' argument exists, use it. #}
        {%- set earliest_shard_to_retrieve = start_date|int -%}
    {% elif not should_full_refresh() %}
        {# If incremental, then use static_incremental_days variable to find earliest shard to copy #}
        {%- set earliest_shard_to_retrieve = (modules.datetime.date.today() - modules.datetime.timedelta(days=var('static_incremental_days')))|string|replace("-", "")|int -%}
    {% else %}
        {# Otherwise use 'start_date' variable #}
        {%- set earliest_shard_to_retrieve = var('start_date')|int -%}
    {% endif %}

    {% if end_date %}
        {# If the 'end_date' argument exists, use it. #}
        {%- set latest_shard_to_retrieve = end_date|int -%}
    {% else %}
        {# Otherwise use 'end_date' variable #}
        {%- set latest_shard_to_retrieve = var('end_date', modules.datetime.date.today()|string|replace("-", ""))|int -%}
    {% endif %}

    {%- set schema_name = "analytics_" + property_id|string -%}

    {%- set combine_specified_property_data_query -%}
        create schema if not exists `{{target.project}}.{{var('combined_dataset')}}`;

        {# Copy intraday tables #}
        {%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_intraday_%', database=var('source_project')) -%}
        {% for relation in relations %}
            {%- set relation_suffix = relation.identifier|replace('events_intraday_', '') -%}
            {%- if earliest_shard_to_retrieve|int <= relation_suffix|int <= latest_shard_to_retrieve|int -%}
                create or replace table `{{target.project}}.{{var('combined_dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}` clone `{{var('source_project')}}.analytics_{{property_id}}.events_intraday_{{relation_suffix}}`;
            {%- endif -%}
        {% endfor %}

        {# Copy daily tables and drop old intraday table #}
        {%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_%', exclude='events_intraday_%', database=var('source_project')) -%}
        {% for relation in relations %}
            {%- set relation_suffix = relation.identifier|replace('events_', '') -%}
            {%- if earliest_shard_to_retrieve|int <= relation_suffix|int <= latest_shard_to_retrieve|int -%}
                create or replace table `{{target.project}}.{{var('combined_dataset')}}.events_{{relation_suffix}}{{property_id}}` clone `{{var('source_project')}}.analytics_{{property_id}}.events_{{relation_suffix}}`;
                drop table if exists `{{target.project}}.{{var('combined_dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}`;
            {%- endif -%}
        {% endfor %}
    {%- endset -%}

    {% do run_query(combine_specified_property_data_query) %}

    {% if execute %}
        {{ log("Cloned from `" ~ var('source_project') ~ ".analytics_" ~ property_id ~ ".events_*[" ~ earliest_shard_to_retrieve ~ "-" ~ latest_shard_to_retrieve ~ "]` to `" ~ target.project ~ "." ~ var('combined_dataset') ~ ".events_YYYYMMDD" ~ property_id ~ "`.", True) }}
    {% endif %}
{% endmacro %}
```
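With the new explicit arguments, a single property can also be backfilled ad hoc via `dbt run-operation`. The invocation below is a sketch, not part of this commit: the property ID and dates are placeholders, and it assumes `source_project` and `combined_dataset` are set in `dbt_project.yml`.

```shell
# Hypothetical backfill of one property for January 2021.
$ dbt run-operation combine_specified_property_data --args '{property_id: 000000001, start_date: 20210101, end_date: 20210131}'
```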
