From f58011f5635b5c691fa7ed10a5e3d7d1d890bbec Mon Sep 17 00:00:00 2001
From: Tomoki Yamauchi
Date: Tue, 16 Apr 2024 21:32:42 +0900
Subject: [PATCH 1/3] Fix error when setting a large number of properties
 (#312)

* Fix error when setting a large number of properties

Bugfix. Fixes #269. This change greatly reduces the likelihood of an error when specifying a large number of property_ids in `ga4.combine_property_data()`.

* Fixed the following bug
  * Changed the macro to copy tables separately for each property_id

dbt_project.yml

```yml
vars:
  ga4:
    source_project: source-project-id
    property_ids: [000000001, 000000002, ..., 000000040]
    start_date: 20210101
    static_incremental_days: 3
    combined_dataset: combined_dataset_name
```

Before this change, a full refresh failed once the single generated query exceeded BigQuery's size limit:

```shell
$ dbt run -s base_ga4__events --full-refresh
06:51:19  Running with dbt=1.5.0
06:52:05  Found 999 models, 999 tests, 999 snapshots, 999 analyses, 999 macros, 999 operations, 999 seed files, 999 sources, 999 exposures, 999 metrics, 999 groups
06:52:06
06:52:14  Concurrency: 4 threads (target='dev')
06:52:14
06:52:14  1 of 1 START sql view model dataset_name.base_ga4__events ......... [RUN]
06:56:17  BigQuery adapter: https://console.cloud.google.com/bigquery?project=project-id&j=bq:asia-northeast1:????????-????-????-????-????????????&page=queryresults
06:56:17  1 of 1 ERROR creating sql view model dataset_name.base_ga4__events [ERROR in 243.80s]
06:56:18
06:56:18  Finished running 1 view model in 0 hours 4 minutes and 11.62 seconds (251.62s).
06:56:22
06:56:22  Completed with 1 error and 0 warnings:
06:56:22
06:56:23  Database Error in model base_ga4__events (models/staging/base/base_ga4__events.sql)
06:56:23    The query is too large. The maximum standard SQL query length is 1024.00K characters, including comments and white space characters.
06:56:23
06:56:23  Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1
```

After merging this pull request, the same command completes successfully:

```shell
$ dbt run -s base_ga4__events --full-refresh
HH:mm:ss  Running with dbt=1.5.0
HH:mm:ss  Found 999 models, 999 tests, 999 snapshots, 999 analyses, 999 macros, 999 operations, 999 seed files, 999 sources, 999 exposures, 999 metrics, 999 groups
HH:mm:ss
HH:mm:ss  Concurrency: 4 threads (target='dev')
HH:mm:ss
HH:mm:ss  1 of 1 START sql incremental model dataset_name.base_ga4__events ... [RUN]
HH:mm:ss  Cloned from `source-project-id.analytics_000000001.events_*[20210101-20240324]` to `project-id.combined_dataset_name.events_YYYYMMDD000000001`.
HH:mm:ss  Cloned from `source-project-id.analytics_000000002.events_*[20210101-20240324]` to `project-id.combined_dataset_name.events_YYYYMMDD000000002`.
....
HH:mm:ss  Cloned from `source-project-id.analytics_000000040.events_*[20210101-20240324]` to `project-id.combined_dataset_name.events_YYYYMMDD000000040`.
HH:mm:ss  1 of 1 OK created sql incremental model dataset_name.base_ga4__events [CREATE TABLE (? rows, ? processed) in ?]
HH:mm:ss
HH:mm:ss  Finished running 1 incremental model in ? (?).
HH:mm:ss
HH:mm:ss  Completed successfully
HH:mm:ss
HH:mm:ss  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
```

---

Fixed timeout in clone operation

The following timeout should almost never occur now, because the clone statements are executed separately for each property_id.

* Removed https://github.com/Velir/dbt-ga4/blame/6.0.1/README.md#L323-L332 from README.md
* This resolves the workaround that was recommended there:

> Jobs that run a large number of clone operations are prone to timing out.
> As a result, it is recommended that you increase the query timeout if you need to backfill or full-refresh the table, when first setting up or when the base model gets modified. Otherwise, it is best to prevent the base model from rebuilding on full refreshes unless needed to minimize timeouts.
>
> ```
> models:
>   ga4:
>     staging:
>       base:
>         base_ga4__events:
>           +full_refresh: false
> ```

* Kept the changes to the implementation of combine_property_data to the minimum necessary
* Removed latest_shard_to_retrieve

---
 README.md                        | 10 ----------
 macros/combine_property_data.sql | 32 ++++++++++++++++++++------------
 2 files changed, 20 insertions(+), 22 deletions(-)

diff --git a/README.md b/README.md
index 5cf39293..68bcfb66 100644
--- a/README.md
+++ b/README.md
@@ -320,16 +320,6 @@ vars:
 
 With these variables set, the `combine_property_data` macro will run as a pre-hook to `base_ga4_events` and clone shards to the target dataset. The number of days' worth of data to clone during incremental runs will be based on the `static_incremental_days` variable.
 
-Jobs that run a large number of clone operations are prone to timing out. As a result, it is recommended that you increase the query timeout if you need to backfill or full-refresh the table, when first setting up or when the base model gets modified. Otherwise, it is best to prevent the base model from rebuilding on full refreshes unless needed to minimize timeouts.
-
-```
-models:
-  ga4:
-    staging:
-      base:
-        base_ga4__events:
-          +full_refresh: false
-```
 
 # dbt Style Guide
 
 This package attempts to adhere to the Brooklyn Data style guide found [here](https://github.com/brooklyn-data/co/blob/main/sql_style_guide.md). This work is in-progress.

diff --git a/macros/combine_property_data.sql b/macros/combine_property_data.sql
index 67ef31cc..941e879c 100644
--- a/macros/combine_property_data.sql
+++ b/macros/combine_property_data.sql
@@ -3,36 +3,44 @@
 {%- endmacro -%}
 
 {% macro default__combine_property_data() %}
-
-    create schema if not exists `{{target.project}}.{{var('combined_dataset')}}`;
-
-    {# If incremental, then use static_incremental_days variable to find earliest shard to copy #}
     {% if not should_full_refresh() %}
-    {% set earliest_shard_to_retrieve = (modules.datetime.date.today() - modules.datetime.timedelta(days=var('static_incremental_days')))|string|replace("-", "")|int %}
+        {# If incremental, then use static_incremental_days variable to find earliest shard to copy #}
+        {%- set earliest_shard_to_retrieve = (modules.datetime.date.today() - modules.datetime.timedelta(days=var('static_incremental_days')))|string|replace("-", "")|int -%}
     {% else %}
-    {# Otherwise use 'start_date' variable #}
-
-    {% set earliest_shard_to_retrieve = var('start_date')|int %}
+        {# Otherwise use 'start_date' variable #}
+        {%- set earliest_shard_to_retrieve = var('start_date')|int -%}
     {% endif %}
 
     {% for property_id in var('property_ids') %}
         {%- set schema_name = "analytics_" + property_id|string -%}
+
+        {%- set combine_specified_property_data_query -%}
+            create schema if not exists `{{target.project}}.{{var('combined_dataset')}}`;
+
             {# Copy intraday tables #}
             {%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_intraday_%', database=var('source_project')) -%}
             {% for relation in relations %}
                 {%- set relation_suffix = relation.identifier|replace('events_intraday_', '') -%}
                 {%- if relation_suffix|int >= earliest_shard_to_retrieve|int -%}
-                    CREATE OR REPLACE TABLE `{{target.project}}.{{var('combined_dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}` CLONE `{{var('source_project')}}.analytics_{{property_id}}.events_intraday_{{relation_suffix}}`;
+                    create or replace table `{{target.project}}.{{var('combined_dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}` clone `{{var('source_project')}}.analytics_{{property_id}}.events_intraday_{{relation_suffix}}`;
                 {%- endif -%}
             {% endfor %}
+
            {# Copy daily tables and drop old intraday table #}
            {%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_%', exclude='events_intraday_%', database=var('source_project')) -%}
            {% for relation in relations %}
                {%- set relation_suffix = relation.identifier|replace('events_', '') -%}
                {%- if relation_suffix|int >= earliest_shard_to_retrieve|int -%}
-                    CREATE OR REPLACE TABLE `{{target.project}}.{{var('combined_dataset')}}.events_{{relation_suffix}}{{property_id}}` CLONE `{{var('source_project')}}.analytics_{{property_id}}.events_{{relation_suffix}}`;
-                    DROP TABLE IF EXISTS `{{target.project}}.{{var('combined_dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}`;
+                    create or replace table `{{target.project}}.{{var('combined_dataset')}}.events_{{relation_suffix}}{{property_id}}` clone `{{var('source_project')}}.analytics_{{property_id}}.events_{{relation_suffix}}`;
+                    drop table if exists `{{target.project}}.{{var('combined_dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}`;
                {%- endif -%}
            {% endfor %}
+        {%- endset -%}
+
+        {% do run_query(combine_specified_property_data_query) %}
+
+        {% if execute %}
+            {{ log("Cloned from `" ~ var('source_project') ~ ".analytics_" ~ property_id ~ ".events_*` to `" ~ target.project ~ "." ~ var('combined_dataset') ~ ".events_YYYYMMDD" ~ property_id ~ "`.", True) }}
+        {% endif %}
    {% endfor %}
-{% endmacro %}
\ No newline at end of file
+{% endmacro %}
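A brief aside between patches: the shard-date arithmetic in the macro above is easier to verify outside of Jinja. The Python sketch below mirrors the `earliest_shard_to_retrieve` expression (dbt's `modules.datetime` exposes Python's `datetime` module); the `static_incremental_days` value is illustrative, taken from the dbt_project.yml example in the commit message.

```python
from datetime import date, timedelta

# Mirrors the Jinja expression in default__combine_property_data():
#   (modules.datetime.date.today()
#     - modules.datetime.timedelta(days=var('static_incremental_days')))
#   |string|replace("-", "")|int
static_incremental_days = 3  # illustrative value

earliest_shard = int(
    str(date.today() - timedelta(days=static_incremental_days)).replace("-", "")
)
# e.g. on 2024-03-24 this yields 20240321
```

Because both the shard suffixes and this cutoff are plain YYYYMMDD integers, the macro's `relation_suffix|int >= earliest_shard_to_retrieve|int` comparison works without any date parsing.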
From 3ef51b6414b5ea35369b676d01e15d63fb30af1d Mon Sep 17 00:00:00 2001
From: Florian Chabert <65234487+FloDevelops@users.noreply.github.com>
Date: Wed, 5 Jun 2024 13:02:12 +0200
Subject: [PATCH 2/3] feature: add property_id (#321)

* feature: add property_id

* fix: remove "intraday_" for property_id

---
 macros/base_select.sql | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/macros/base_select.sql b/macros/base_select.sql
index 1f374539..8919388f 100644
--- a/macros/base_select.sql
+++ b/macros/base_select.sql
@@ -35,6 +35,9 @@
     , ecommerce.unique_items
     , ecommerce.transaction_id
     , items
+    , {%- if var('combined_dataset', false) != false %} cast(left(regexp_replace(_table_suffix, r'^(intraday_)?\d{8}', ''), 100) as int64)
+      {%- else %} {{ var('property_ids')[0] }}
+      {%- endif %} as property_id
 {% endmacro %}
 
 {% macro base_select_renamed() %}
@@ -136,6 +139,7 @@
         , unnested_items.item_params
         )) from unnest(items) as unnested_items
     ) items
+    , property_id
     , {{ ga4.unnest_key('event_params', 'ga_session_id', 'int_value', 'session_id') }}
     , {{ ga4.unnest_key('event_params', 'page_location') }}
     , {{ ga4.unnest_key('event_params', 'ga_session_number', 'int_value', 'session_number') }}
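Another aside: the `_table_suffix` expression added in PATCH 2/3 is compact, so here is a Python sketch of how it evaluates, assuming the combined-dataset layout from PATCH 1/3 where shards are cloned to `events_YYYYMMDD<property_id>` and `events_intraday_YYYYMMDD<property_id>`. The suffix values below are illustrative.

```python
import re

# Sketch of the BigQuery expression from base_select.sql:
#   cast(left(regexp_replace(_table_suffix, r'^(intraday_)?\d{8}', ''), 100) as int64)
def property_id_from_suffix(table_suffix: str) -> int:
    # Drop the optional "intraday_" prefix and the 8-digit date shard.
    stripped = re.sub(r'^(intraday_)?\d{8}', '', table_suffix)
    # left(..., 100) is a defensive truncation before the int64 cast.
    return int(stripped[:100])

assert property_id_from_suffix('20240101000000001') == 1
assert property_id_from_suffix('intraday_20240102000000002') == 2
```

When no `combined_dataset` is configured, the macro instead falls back to the first (and only) entry in `property_ids`, so the new `property_id` column is populated in both modes.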
From df85449977509e4d224111db3d4b58b04249f09b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=A9r=C3=B4me=20LAURENT?=
Date: Wed, 5 Jun 2024 16:39:29 +0200
Subject: [PATCH 3/3] Fix invalid generated column names in conversion events
 (#327)

* Add valid_column_name macro
* Update models using conversion_events
* Add tests
* Update project name in tests to call macros
* Add new macro to unit tests
* Fix invalid escape character
* Fix page test by harmonizing event name
* Fix jinja range error in test

---------

Co-authored-by: Adam Ribaudo
---
 macros/valid_column_name.sql                  | 14 +++++
 models/marts/core/fct_ga4__client_keys.sql    |  3 +-
 models/marts/core/fct_ga4__sessions.sql       |  3 +-
 models/marts/core/fct_ga4__user_ids.sql       |  3 +-
 models/staging/stg_ga4__page_conversions.sql  |  2 +-
 .../stg_ga4__session_conversions_daily.sql    |  2 +-
 unit_tests/test_stg_ga4__page_conversions.py  | 58 ++++++++++++++++--
 ...test_stg_ga4__session_conversions_daily.py | 61 +++++++++++++++++--
 8 files changed, 131 insertions(+), 15 deletions(-)
 create mode 100644 macros/valid_column_name.sql

diff --git a/macros/valid_column_name.sql b/macros/valid_column_name.sql
new file mode 100644
index 00000000..e045512b
--- /dev/null
+++ b/macros/valid_column_name.sql
@@ -0,0 +1,14 @@
+{% macro valid_column_name(column_name) %}
+    {% set re = modules.re %}
+    {% set pattern = '[^a-zA-Z0-9_]' %}
+    {# a column name can't contain characters other than letters, digits, and underscores #}
+    {% set cleaned_name = re.sub(pattern, '_', column_name|string) %}
+
+    {% if re.match('^\\d', cleaned_name) %}
+        {# a column name can't start with a number #}
+        {{ return("_" ~ cleaned_name) }}
+    {% else %}
+        {{ return(cleaned_name) }}
+    {% endif %}
+
+{% endmacro %}
\ No newline at end of file

diff --git a/models/marts/core/fct_ga4__client_keys.sql b/models/marts/core/fct_ga4__client_keys.sql
index c6f2de3c..b09215eb 100644
--- a/models/marts/core/fct_ga4__client_keys.sql
+++ b/models/marts/core/fct_ga4__client_keys.sql
@@ -10,7 +10,8 @@ select
     count(distinct session_key) as count_sessions
     {% if var('conversion_events', false) %}
     {% for ce in var('conversion_events',[]) %}
-    , sum(count_{{ce}}) as count_{{ce}}
+    {% set clean_ce = ga4.valid_column_name(ce) %}
+    , sum(count_{{clean_ce}}) as count_{{clean_ce}}
     {% endfor %}
     {% endif %}
 from {{ref('fct_ga4__sessions')}}

diff --git a/models/marts/core/fct_ga4__sessions.sql b/models/marts/core/fct_ga4__sessions.sql
index 32a47aaa..29855ec9 100644
--- a/models/marts/core/fct_ga4__sessions.sql
+++ b/models/marts/core/fct_ga4__sessions.sql
@@ -14,7 +14,8 @@ select
     min(session_number) as session_number
     {% if var('conversion_events', false) %}
     {% for ce in var('conversion_events',[]) %}
-    , sum({{ce}}_count) as count_{{ce}}
+    {% set clean_ce = ga4.valid_column_name(ce) %}
+    , sum({{clean_ce}}_count) as count_{{clean_ce}}
     {% endfor %}
     {% endif %}
 from {{ref('fct_ga4__sessions_daily')}}

diff --git a/models/marts/core/fct_ga4__user_ids.sql b/models/marts/core/fct_ga4__user_ids.sql
index aac65ea0..ae52ce11 100644
--- a/models/marts/core/fct_ga4__user_ids.sql
+++ b/models/marts/core/fct_ga4__user_ids.sql
@@ -24,7 +24,8 @@ select
     sum(count_sessions) as count_sessions
     {% if var('conversion_events', false) %}
     {% for ce in var('conversion_events',[]) %}
-    , sum(count_{{ce}}) as count_{{ce}}
+    {% set clean_ce = ga4.valid_column_name(ce) %}
+    , sum(count_{{clean_ce}}) as count_{{clean_ce}}
     {% endfor %}
     {% endif %}
 from user_id_mapped

diff --git a/models/staging/stg_ga4__page_conversions.sql b/models/staging/stg_ga4__page_conversions.sql
index e220cd3e..ea5a7f57 100644
--- a/models/staging/stg_ga4__page_conversions.sql
+++ b/models/staging/stg_ga4__page_conversions.sql
@@ -5,7 +5,7 @@ select
     page_key
     {% for ce in var('conversion_events',[]) %}
-    , countif(event_name = '{{ce}}') as {{ce}}_count
+    , countif(event_name = '{{ce}}') as {{ga4.valid_column_name(ce)}}_count
     {% endfor %}
 from {{ref('stg_ga4__events')}}
 group by 1
\ No newline at end of file
diff --git a/models/staging/stg_ga4__session_conversions_daily.sql b/models/staging/stg_ga4__session_conversions_daily.sql
index f33ea6d5..49b0ed85 100644
--- a/models/staging/stg_ga4__session_conversions_daily.sql
+++ b/models/staging/stg_ga4__session_conversions_daily.sql
@@ -25,7 +25,7 @@ with event_counts as (
         session_partition_key,
         min(event_date_dt) as session_partition_date -- The date of this session partition
         {% for ce in var('conversion_events',[]) %}
-        , countif(event_name = '{{ce}}') as {{ce}}_count
+        , countif(event_name = '{{ce}}') as {{ga4.valid_column_name(ce)}}_count
         {% endfor %}
     from {{ref('stg_ga4__events')}}
     where 1=1

diff --git a/unit_tests/test_stg_ga4__page_conversions.py b/unit_tests/test_stg_ga4__page_conversions.py
index a88c2fa2..6d3cd7da 100644
--- a/unit_tests/test_stg_ga4__page_conversions.py
+++ b/unit_tests/test_stg_ga4__page_conversions.py
@@ -1,5 +1,5 @@
 import pytest
-from dbt.tests.util import read_file,check_relations_equal,run_dbt
+from dbt.tests.util import check_relations_equal, read_file, run_dbt
 
 # Define mocks via CSV (seeds) or SQL (models)
 mock_stg_ga4__events_csv = """event_name,page_key
@@ -8,14 +8,33 @@
 page_view,B
 """.lstrip()
 
+mock_stg_ga4__nonstandard_events_csv = """event_name,page_key
+page-view,A
+page-view,A
+page-view,B
+""".lstrip()
+
 expected_csv = """page_key,page_view_count
 A,2
 B,1
 """.lstrip()
 
-actual = read_file('../models/staging/stg_ga4__page_conversions.sql')
+actual = read_file("../models/staging/stg_ga4__page_conversions.sql")
+
+
+class TestPageConversions:
+    # Update project name to ga4 so we can call macros with ga4.macro_name
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {"name": "ga4"}
+
+    # everything that goes in the "macros"
+    @pytest.fixture(scope="class")
+    def macros(self):
+        return {
+            "valid_column_name.sql": read_file("../macros/valid_column_name.sql"),
+        }
 
-class TestPageConversions():
     # everything that goes in the "seeds" directory (= CSV format)
     @pytest.fixture(scope="class")
     def seeds(self):
         return {
             "stg_ga4__events.csv": mock_stg_ga4__events_csv,
             "expected.csv": expected_csv,
         }
 
     # everything that goes in the "models" directory (= SQL)
     @pytest.fixture(scope="class")
     def models(self):
         return {
             "actual.sql": actual,
         }
-    
+
     def test_mock_run_and_check(self, project):
         run_dbt(["build", "--vars", "conversion_events: ['page_view']"])
-        #breakpoint()
+        # breakpoint()
         check_relations_equal(project.adapter, ["actual", "expected"])
+
+
+class TestPageConversionsNonStandardEventName:
+    # everything that goes in the "seeds" directory (= CSV format)
+    @pytest.fixture(scope="class")
+    def seeds(self):
+        return {
+            "stg_ga4__events.csv": mock_stg_ga4__nonstandard_events_csv,
+            "expected.csv": expected_csv,
+        }
+
+    # everything that goes in the "macros"
+    @pytest.fixture(scope="class")
+    def macros(self):
+        return {
+            "valid_column_name.sql": read_file("../macros/valid_column_name.sql"),
+        }
+
+    # everything that goes in the "models" directory (= SQL)
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "actual.sql": actual,
+        }
+
+    def test_mock_run_and_check(self, project):
+        run_dbt(["build", "--vars", "conversion_events: ['page-view']"])
+        # breakpoint()
+        check_relations_equal(project.adapter, ["actual", "expected"])

diff --git a/unit_tests/test_stg_ga4__session_conversions_daily.py b/unit_tests/test_stg_ga4__session_conversions_daily.py
index bc1cbc65..8ad1e7ae 100644
--- a/unit_tests/test_stg_ga4__session_conversions_daily.py
+++ b/unit_tests/test_stg_ga4__session_conversions_daily.py
@@ -1,5 +1,5 @@
 import pytest
-from dbt.tests.util import read_file,check_relations_equal,run_dbt
+from dbt.tests.util import check_relations_equal, read_file, run_dbt
 
 # Define mocks via CSV (seeds) or SQL (models)
 mock_stg_ga4__events_csv = """session_key,session_partition_key,event_name,event_date_dt
@@ -11,6 +11,15 @@
 A,A2022-01-02,my_conversion,2022-01-02
 """.lstrip()
 
+mock_stg_ga4__nonstandard_events_csv = """session_key,session_partition_key,event_name,event_date_dt
+A,A2022-01-01,page_view,2022-01-01
+A,A2022-01-01,my-conversion,2022-01-01
+A,A2022-01-01,my-conversion,2022-01-01
+B,B2022-01-01,my-conversion,2022-01-01
+C,C2022-01-01,some_other_event,2022-01-01
+A,A2022-01-02,my-conversion,2022-01-02
+""".lstrip()
+
 expected_csv = """session_key,session_partition_key,session_partition_date,my_conversion_count
 A,A2022-01-01,2022-01-01,2
 B,B2022-01-01,2022-01-01,1
@@ -18,9 +27,15 @@
 A,A2022-01-02,2022-01-02,1
 """.lstrip()
 
-actual = read_file('../models/staging/stg_ga4__session_conversions_daily.sql')
+actual = read_file("../models/staging/stg_ga4__session_conversions_daily.sql")
+
+
+class TestUsersFirstLastEvents:
+    # Update project name to ga4 so we can call macros with ga4.macro_name
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {"name": "ga4", "vars": {"static_incremental_days": 3}}
 
-class TestUsersFirstLastEvents():
     # everything that goes in the "seeds" directory (= CSV format)
     @pytest.fixture(scope="class")
     def seeds(self):
         return {
@@ -29,14 +44,50 @@ def seeds(self):
             "stg_ga4__events.csv": mock_stg_ga4__events_csv,
             "expected.csv": expected_csv,
         }
 
+    # everything that goes in the "macros"
+    @pytest.fixture(scope="class")
+    def macros(self):
+        return {
+            "valid_column_name.sql": read_file("../macros/valid_column_name.sql"),
+        }
+
     # everything that goes in the "models" directory (= SQL)
     @pytest.fixture(scope="class")
     def models(self):
         return {
             "actual.sql": actual,
         }
-    
+
     def test_mock_run_and_check(self, project):
         run_dbt(["build", "--vars", "conversion_events: ['my_conversion']"])
-        #breakpoint()
+        # breakpoint()
         check_relations_equal(project.adapter, ["actual", "expected"])
+
+
+class TestUsersNonStandardEventName:
+    # everything that goes in the "seeds" directory (= CSV format)
+    @pytest.fixture(scope="class")
+    def seeds(self):
+        return {
+            "stg_ga4__events.csv": mock_stg_ga4__nonstandard_events_csv,
+            "expected.csv": expected_csv,
+        }
+
+    # everything that goes in the "macros"
+    @pytest.fixture(scope="class")
+    def macros(self):
+        return {
+            "valid_column_name.sql": read_file("../macros/valid_column_name.sql"),
+        }
+
+    # everything that goes in the "models" directory (= SQL)
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "actual.sql": actual,
+        }
+
+    def test_mock_run_and_check(self, project):
+        run_dbt(["build", "--vars", "conversion_events: ['my-conversion']"])
+        # breakpoint()
+        check_relations_equal(project.adapter, ["actual", "expected"])
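Closing aside for PATCH 3/3: the following Python mirror of the new `valid_column_name` macro can be handy for predicting which column names the conversion-event models will generate. It is a sketch only; the event names in the asserts are illustrative, the first one matching the new unit tests above.

```python
import re

def valid_column_name(column_name) -> str:
    # Mirrors the Jinja macro: replace any character that is not
    # alphanumeric or underscore with "_" ...
    cleaned = re.sub('[^a-zA-Z0-9_]', '_', str(column_name))
    # ... and prefix names that start with a digit, since BigQuery
    # column names cannot begin with a number.
    return '_' + cleaned if re.match(r'^\d', cleaned) else cleaned

assert valid_column_name('page-view') == 'page_view'
assert valid_column_name('1st purchase') == '_1st_purchase'
```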