Skip to content

Commit

Permalink
Merge branch 'main'
Browse files Browse the repository at this point in the history
  • Loading branch information
dgitis committed Jul 20, 2024
2 parents 982cae7 + df85449 commit 6312c46
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 25 deletions.
10 changes: 0 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -376,16 +376,6 @@ vars:

With these variables set, the `combine_property_data` macro will run as a pre-hook to `base_ga4_events` and clone shards to the target dataset. The number of days' worth of data to clone during incremental runs will be based on the `static_incremental_days` variable.

Jobs that run a large number of clone operations are prone to timing out. As a result, it is recommended that you increase the query timeout when you need to backfill or full-refresh the table — for example, when first setting up the package or after the base model has been modified. Otherwise, it is best to prevent the base model from rebuilding on full refreshes unless needed, in order to minimize timeouts.

```
models:
ga4:
staging:
base:
base_ga4__events:
+full_refresh: false
```
# dbt Style Guide

This package attempts to adhere to the Brooklyn Data style guide found [here](https://github.com/brooklyn-data/co/blob/main/sql_style_guide.md). This work is in progress.
4 changes: 4 additions & 0 deletions macros/base_select.sql
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
, ecommerce.unique_items
, ecommerce.transaction_id
, items
, {%- if var('combined_dataset', false) != false %} cast(left(regexp_replace(_table_suffix, r'^(intraday_)?\d{8}', ''), 100) as int64)
{%- else %} {{ var('property_ids')[0] }}
{%- endif %} as property_id
{% endmacro %}

{% macro base_select_renamed() %}
Expand Down Expand Up @@ -136,6 +139,7 @@
, unnested_items.item_params
)) from unnest(items) as unnested_items
) items
, property_id
, {{ ga4.unnest_key('event_params', 'ga_session_id', 'int_value', 'session_id') }}
, {{ ga4.unnest_key('event_params', 'page_location') }}
, {{ ga4.unnest_key('event_params', 'ga_session_number', 'int_value', 'session_number') }}
Expand Down
4 changes: 4 additions & 0 deletions macros/combine_property_data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
{% if not should_full_refresh() %}
{# If incremental, then use static_incremental_days variable to find earliest shard to copy #}
{%- set earliest_shard_to_retrieve = (modules.datetime.date.today() - modules.datetime.timedelta(days=var('static_incremental_days')))|string|replace("-", "")|int -%}
{# If incremental, then use static_incremental_days variable to find earliest shard to copy #}
{%- set earliest_shard_to_retrieve = (modules.datetime.date.today() - modules.datetime.timedelta(days=var('static_incremental_days')))|string|replace("-", "")|int -%}
{% else %}
{# Otherwise use 'start_date' variable #}
{%- set earliest_shard_to_retrieve = var('start_date')|int -%}
{# Otherwise use 'start_date' variable #}
{%- set earliest_shard_to_retrieve = var('start_date')|int -%}
{% endif %}
{% for property_id in var('property_ids') %}
{%- set schema_name = "analytics_" + property_id|string -%}
Expand Down
14 changes: 14 additions & 0 deletions macros/valid_column_name.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{% macro valid_column_name(column_name) %}
    {#- Sanitize an arbitrary string into a legal BigQuery column name. -#}
    {% set re = modules.re %}
    {# every character other than a letter, digit, or "_" is replaced with "_" #}
    {% set sanitized = re.sub('[^a-zA-Z0-9_]', '_', column_name|string) %}
    {# a column name cannot begin with a digit, so prefix those with "_" #}
    {% if re.match('^\d', sanitized) %}
        {{ return('_' ~ sanitized) }}
    {% endif %}
    {{ return(sanitized) }}
{% endmacro %}
3 changes: 2 additions & 1 deletion models/marts/core/fct_ga4__client_keys.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ select
count(distinct session_key) as count_sessions
{% if var('conversion_events', false) %}
{% for ce in var('conversion_events',[]) %}
, sum(count_{{ce}}) as count_{{ce}}
{% set clean_ce = ga4.valid_column_name(ce) %}
, sum(count_{{clean_ce}}) as count_{{clean_ce}}
{% endfor %}
{% endif %}
from {{ref('fct_ga4__sessions')}}
Expand Down
3 changes: 2 additions & 1 deletion models/marts/core/fct_ga4__sessions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ select
min(session_number) as session_number
{% if var('conversion_events', false) %}
{% for ce in var('conversion_events',[]) %}
, sum({{ce}}_count) as count_{{ce}}
{% set clean_ce = ga4.valid_column_name(ce) %}
, sum({{clean_ce}}_count) as count_{{clean_ce}}
{% endfor %}
{% endif %}
from {{ref('fct_ga4__sessions_daily')}}
Expand Down
3 changes: 2 additions & 1 deletion models/marts/core/fct_ga4__user_ids.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ select
sum(count_sessions) as count_sessions
{% if var('conversion_events', false) %}
{% for ce in var('conversion_events',[]) %}
, sum(count_{{ce}}) as count_{{ce}}
{% set clean_ce = ga4.valid_column_name(ce) %}
, sum(count_{{clean_ce}}) as count_{{clean_ce}}
{% endfor %}
{% endif %}
from user_id_mapped
Expand Down
2 changes: 1 addition & 1 deletion models/staging/stg_ga4__page_conversions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
select
page_key
{% for ce in var('conversion_events',[]) %}
, countif(event_name = '{{ce}}') as {{ce}}_count
, countif(event_name = '{{ce}}') as {{ga4.valid_column_name(ce)}}_count
{% endfor %}
from {{ref('stg_ga4__events')}}
group by 1
2 changes: 1 addition & 1 deletion models/staging/stg_ga4__session_conversions_daily.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ with event_counts as (
session_partition_key,
min(event_date_dt) as session_partition_date -- The date of this session partition
{% for ce in var('conversion_events',[]) %}
, countif(event_name = '{{ce}}') as {{ce}}_count
, countif(event_name = '{{ce}}') as {{ga4.valid_column_name(ce)}}_count
{% endfor %}
from {{ref('stg_ga4__events')}}
where 1=1
Expand Down
58 changes: 53 additions & 5 deletions unit_tests/test_stg_ga4__page_conversions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pytest
from dbt.tests.util import read_file,check_relations_equal,run_dbt
from dbt.tests.util import check_relations_equal, read_file, run_dbt

# Define mocks via CSV (seeds) or SQL (models)
mock_stg_ga4__events_csv = """event_name,page_key
Expand All @@ -8,14 +8,33 @@
page_view,B
""".lstrip()

mock_stg_ga4__nonstandard_events_csv = """event_name,page_key
page-view,A
page-view,A
page-view,B
""".lstrip()

expected_csv = """page_key,page_view_count
A,2
B,1
""".lstrip()

actual = read_file('../models/staging/stg_ga4__page_conversions.sql')
actual = read_file("../models/staging/stg_ga4__page_conversions.sql")


class TestPageConversions:
# Update project name to ga4 so we can call macros with ga4.macro_name
@pytest.fixture(scope="class")
def project_config_update(self):
return {"name": "ga4"}

# everything that goes in the "macros"
@pytest.fixture(scope="class")
def macros(self):
return {
"valid_column_name.sql": read_file("../macros/valid_column_name.sql"),
}

class TestPageConversions():
# everything that goes in the "seeds" directory (= CSV format)
@pytest.fixture(scope="class")
def seeds(self):
Expand All @@ -30,8 +49,37 @@ def models(self):
return {
"actual.sql": actual,
}

def test_mock_run_and_check(self, project):
run_dbt(["build", "--vars", "conversion_events: ['page_view']"])
#breakpoint()
# breakpoint()
check_relations_equal(project.adapter, ["actual", "expected"])


class TestPageConversionsNonStandardEventName:
    """Page-conversion counts when a conversion event name contains characters
    that are invalid in a BigQuery column name (here: 'page-view').

    The model routes the name through ga4.valid_column_name(), so the output
    column is the sanitized ``page_view_count`` asserted in ``expected_csv``.
    """

    # Update project name to ga4 so we can call macros with ga4.macro_name.
    # NOTE(review): the sibling TestPageConversions class sets this fixture and
    # explains why; without it the test project keeps its default name and the
    # model's `ga4.valid_column_name` reference cannot resolve.
    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {"name": "ga4"}

    # everything that goes in the "seeds" directory (= CSV format)
    @pytest.fixture(scope="class")
    def seeds(self):
        return {
            "stg_ga4__events.csv": mock_stg_ga4__nonstandard_events_csv,
            "expected.csv": expected_csv,
        }

    # everything that goes in the "macros" directory
    @pytest.fixture(scope="class")
    def macros(self):
        return {
            "valid_column_name.sql": read_file("../macros/valid_column_name.sql"),
        }

    # everything that goes in the "models" directory (= SQL)
    @pytest.fixture(scope="class")
    def models(self):
        return {
            "actual.sql": actual,
        }

    def test_mock_run_and_check(self, project):
        # Build with a conversion event whose name requires sanitizing.
        run_dbt(["build", "--vars", "conversion_events: ['page-view']"])
        check_relations_equal(project.adapter, ["actual", "expected"])
61 changes: 56 additions & 5 deletions unit_tests/test_stg_ga4__session_conversions_daily.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pytest
from dbt.tests.util import read_file,check_relations_equal,run_dbt
from dbt.tests.util import check_relations_equal, read_file, run_dbt

# Define mocks via CSV (seeds) or SQL (models)
mock_stg_ga4__events_csv = """session_key,session_partition_key,event_name,event_date_dt
Expand All @@ -11,16 +11,31 @@
A,A2022-01-02,my_conversion,2022-01-02
""".lstrip()

mock_stg_ga4__nonstandard_events_csv = """session_key,session_partition_key,event_name,event_date_dt
A,A2022-01-01,page_view,2022-01-01
A,A2022-01-01,my-conversion,2022-01-01
A,A2022-01-01,my-conversion,2022-01-01
B,B2022-01-01,my-conversion,2022-01-01
C,C2022-01-01,some_other_event,2022-01-01
A,A2022-01-02,my-conversion,2022-01-02
""".lstrip()

expected_csv = """session_key,session_partition_key,session_partition_date,my_conversion_count
A,A2022-01-01,2022-01-01,2
B,B2022-01-01,2022-01-01,1
C,C2022-01-01,2022-01-01,0
A,A2022-01-02,2022-01-02,1
""".lstrip()

actual = read_file('../models/staging/stg_ga4__session_conversions_daily.sql')
actual = read_file("../models/staging/stg_ga4__session_conversions_daily.sql")


class TestUsersFirstLastEvents:
# Update project name to ga4 so we can call macros with ga4.macro_name
@pytest.fixture(scope="class")
def project_config_update(self):
return {"name": "ga4", "vars": {"static_incremental_days": 3}}

class TestUsersFirstLastEvents():
# everything that goes in the "seeds" directory (= CSV format)
@pytest.fixture(scope="class")
def seeds(self):
Expand All @@ -29,14 +44,50 @@ def seeds(self):
"expected.csv": expected_csv,
}

# everything that goes in the "macros"
@pytest.fixture(scope="class")
def macros(self):
return {
"valid_column_name.sql": read_file("../macros/valid_column_name.sql"),
}

# everything that goes in the "models" directory (= SQL)
@pytest.fixture(scope="class")
def models(self):
return {
"actual.sql": actual,
}

def test_mock_run_and_check(self, project):
run_dbt(["build", "--vars", "conversion_events: ['my_conversion']"])
#breakpoint()
# breakpoint()
check_relations_equal(project.adapter, ["actual", "expected"])


class TestUsersNonStandardEventName:
    """Daily session-conversion counts when a conversion event name contains
    characters that are invalid in a BigQuery column name (here: 'my-conversion').

    The model routes the name through ga4.valid_column_name(), so the output
    column is the sanitized ``my_conversion_count`` asserted in ``expected_csv``.
    """

    # Update project name to ga4 so we can call macros with ga4.macro_name, and
    # supply static_incremental_days, mirroring TestUsersFirstLastEvents.
    # NOTE(review): without this fixture the default project name is used and
    # the model's `ga4.valid_column_name` reference cannot resolve.
    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {"name": "ga4", "vars": {"static_incremental_days": 3}}

    # everything that goes in the "seeds" directory (= CSV format)
    @pytest.fixture(scope="class")
    def seeds(self):
        return {
            "stg_ga4__events.csv": mock_stg_ga4__nonstandard_events_csv,
            "expected.csv": expected_csv,
        }

    # everything that goes in the "macros" directory
    @pytest.fixture(scope="class")
    def macros(self):
        return {
            "valid_column_name.sql": read_file("../macros/valid_column_name.sql"),
        }

    # everything that goes in the "models" directory (= SQL)
    @pytest.fixture(scope="class")
    def models(self):
        return {
            "actual.sql": actual,
        }

    def test_mock_run_and_check(self, project):
        # Build with a conversion event whose name requires sanitizing.
        run_dbt(["build", "--vars", "conversion_events: ['my-conversion']"])
        check_relations_equal(project.adapter, ["actual", "expected"])

0 comments on commit 6312c46

Please sign in to comment.