Allow distinct source and target projects #294

Merged
merged 7 commits into from Jan 4, 2024
README.md (23 changes: 18 additions & 5 deletions)
@@ -77,12 +77,25 @@ This package assumes that you have an existing DBT project with a BigQuery profile
```
vars:
  ga4:
    project: "your_gcp_project"
    dataset: "your_ga4_dataset"
    source_project: "my_source_gcp_project" # Project that contains raw GA4 data
    property_ids: [11111111] # Array of properties to process
    start_date: "YYYYMMDD" # Earliest date to load
    static_incremental_days: 3 # Number of days to scan and reprocess on each run
```

## Required Variables (Multi-Project Instance)

When processing multiple properties at a time, the required variables change slightly. See [Multi-Property Support](#multi-property-support) section for details on configuring multiple GA4 properties as a source.

```
vars:
  ga4:
    source_project: "my_source_gcp_project" # Project that contains raw GA4 data
    combined_dataset: "my_combined_data" # Dataset where multi-property data is cloned
    property_ids: [11111111,2222222] # Array of properties to process
    start_date: "YYYYMMDD" # Earliest date to load
    static_incremental_days: 3 # Number of days to scan and reprocess on each run
```
See [Multi-Property Support](#multi-property-support) section for details on configuring multiple GA4 properties as a source.

## Optional Variables

@@ -295,14 +308,14 @@ Overriding the package's default channel mapping makes use of dbt's dispatch override

# Multi-Property Support

Multiple GA4 properties are supported by listing out the project IDs in the `property_ids` variable. In this scenario, the `static_incremental_days` variable is required and the `dataset` variable will define the target dataset where source data will be copied.
Multiple GA4 properties are supported by listing out the property IDs in the `property_ids` variable. In this scenario, the `static_incremental_days` variable is required and the `combined_dataset` variable will define the dataset (in your profile's target project) where source data will be copied.

```
vars:
  ga4:
    property_ids: [11111111, 22222222, 33333333]
    static_incremental_days: 3
    dataset: "my_combined_dataset"
    combined_dataset: "my_combined_dataset"
```

With these variables set, the `combine_property_data` macro will run as a pre-hook to `base_ga4__events` and clone shards to the target dataset. The number of days' worth of data to clone during incremental runs is based on the `static_incremental_days` variable.
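
For illustration only: with the example values above (`source_project: "my_source_gcp_project"`, `combined_dataset: "my_combined_dataset"`, property `11111111`) and a daily shard from 2024-01-02 falling inside the `static_incremental_days` window, the pre-hook would emit BigQuery statements roughly like the sketch below. `my-target-project` is a placeholder for your profile's target project, and the shard date is made up.

```
-- Rough sketch of what combine_property_data() generates on an incremental run;
-- project, dataset, property ID, and shard date are placeholders.
create schema if not exists `my-target-project.my_combined_dataset`;

-- Clone each daily shard in the lookback window into the combined dataset,
-- appending the property ID to the shard name.
CREATE OR REPLACE TABLE `my-target-project.my_combined_dataset.events_2024010211111111`
CLONE `my_source_gcp_project.analytics_11111111.events_20240102`;

-- Drop the intraday clone once the corresponding daily shard has landed.
DROP TABLE IF EXISTS `my-target-project.my_combined_dataset.events_intraday_2024010211111111`;
```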
macros/combine_property_data.sql (12 changes: 6 additions & 6 deletions)
@@ -4,7 +4,7 @@

{% macro default__combine_property_data() %}

create schema if not exists `{{var('project')}}.{{var('dataset')}}`;
create schema if not exists `{{target.project}}.{{var('combined_dataset')}}`;

{# If incremental, then use static_incremental_days variable to find earliest shard to copy #}
{% if not should_full_refresh() %}
@@ -18,20 +18,20 @@
{% for property_id in var('property_ids') %}
{%- set schema_name = "analytics_" + property_id|string -%}
{# Copy intraday tables #}
{%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_intraday_%', database=var('project')) -%}
{%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_intraday_%', database=var('source_project')) -%}
{% for relation in relations %}
{%- set relation_suffix = relation.identifier|replace('events_intraday_', '') -%}
{%- if relation_suffix|int >= earliest_shard_to_retrieve|int -%}
CREATE OR REPLACE TABLE `{{var('project')}}.{{var('dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}` CLONE `{{var('project')}}.analytics_{{property_id}}.events_intraday_{{relation_suffix}}`;
CREATE OR REPLACE TABLE `{{target.project}}.{{var('combined_dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}` CLONE `{{var('source_project')}}.analytics_{{property_id}}.events_intraday_{{relation_suffix}}`;
{%- endif -%}
{% endfor %}
{# Copy daily tables and drop old intraday table #}
{%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_%', exclude='events_intraday_%', database=var('project')) -%}
{%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_%', exclude='events_intraday_%', database=var('source_project')) -%}
{% for relation in relations %}
{%- set relation_suffix = relation.identifier|replace('events_', '') -%}
{%- if relation_suffix|int >= earliest_shard_to_retrieve|int -%}
CREATE OR REPLACE TABLE `{{var('project')}}.{{var('dataset')}}.events_{{relation_suffix}}{{property_id}}` CLONE `{{var('project')}}.analytics_{{property_id}}.events_{{relation_suffix}}`;
DROP TABLE IF EXISTS `{{var('project')}}.{{var('dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}`;
CREATE OR REPLACE TABLE `{{target.project}}.{{var('combined_dataset')}}.events_{{relation_suffix}}{{property_id}}` CLONE `{{var('source_project')}}.analytics_{{property_id}}.events_{{relation_suffix}}`;
DROP TABLE IF EXISTS `{{target.project}}.{{var('combined_dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}`;
{%- endif -%}
{% endfor %}
{% endfor %}
models/staging/base/base_ga4__events.sql (2 changes: 1 addition & 1 deletion)
@@ -5,7 +5,7 @@

{{
config(
pre_hook="{{ ga4.combine_property_data() }}" if var('property_ids', false) else "",
pre_hook="{{ ga4.combine_property_data() }}" if var('combined_dataset', false) else "",
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
partition_by={
models/staging/src_ga4.yml (10 changes: 8 additions & 2 deletions)
@@ -2,8 +2,14 @@ version: 2

sources:
  - name: ga4
    database: "{{var('project')}}"
    schema: "{{var('dataset')}}"
    database: | # Source from target.project if multi-property, otherwise source from source_project
      {%- if var('combined_dataset', false) != false -%} {{target.project}}
      {%- else -%} {{var('source_project')}}
      {%- endif -%}
    schema: | # Source from combined property dataset if set, otherwise source from original GA4 property
      {%- if var('combined_dataset', false) != false -%} {{var('combined_dataset')}}
      {%- else -%} analytics_{{var('property_ids')[0]}}
      {%- endif -%}
    tables:
      - name: events
        identifier: events_* # Scan across all sharded event tables. Use the 'start_date' variable to limit this scan
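
As a rough illustration of how these conditionals resolve, a model selecting from `{{ source('ga4', 'events') }}` would scan one of the following table patterns. `my-target-project` stands in for the profile's target project, and the dataset and property values echo the README examples above.

```
-- Multi-property run (combined_dataset is set): read the cloned shards from the target project.
select * from `my-target-project.my_combined_data.events_*`;

-- Single-property run (no combined_dataset): read the raw GA4 export in source_project
-- for the first (and only) entry in property_ids.
select * from `my_source_gcp_project.analytics_11111111.events_*`;
```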